import logging
import os
+import traceback
-LOGLEVEL = os.environ.get("NEPI_LOGLEVEL", "DEBUG").upper()
+LOGLEVEL = os.environ.get("NEPI_LOGLEVEL", "INFO").upper()
LOGLEVEL = getattr(logging, LOGLEVEL)
-FORMAT = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
-logging.basicConfig(format = FORMAT, level = LOGLEVEL)
+#FORMAT = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
+FORMAT = "%(asctime)s %(name)s %(levelname)-4s %(message)s"
+
+# NEPI_LOG variable contains space separated components
+# on which logging should be enabled
+LOG = os.environ.get("NEPI_LOG", "ALL").upper()
+
+if LOG != 'ALL':
+ # Set by default loglevel to error
+ logging.basicConfig(format = FORMAT, level = logging.ERROR)
+
+ # Set logging level to that defined by the user
+ # only for the enabled components
+ for component in LOG.split(" "):
+ try:
+ log = logging.getLogger(component)
+ log.setLevel(LOGLEVEL)
+ except:
+ err = traceback.format_exc()
+ print "ERROR ", err
+else:
+ # Set the logging level defined by the user for all
+ # components
+ logging.basicConfig(format = FORMAT, level = LOGLEVEL)
self._thread.start()
# Logging
- self._logger = logging.getLogger("neco.execution.ec")
+ self._logger = logging.getLogger("ExperimentController")
@property
def logger(self):
self._release_time = None
# Logging
- self._logger = logging.getLogger("neco.execution.resource.Resource %s.%d " % (self._rtype, self.guid))
+ self._logger = logging.getLogger("Resource")
+
+ def debug(self, msg, out = None, err = None):
+ self.log(msg, logging.DEBUG, out, err)
+
+ def error(self, msg, out = None, err = None):
+ self.log(msg, logging.ERROR, out, err)
+
+ def warn(self, msg, out = None, err = None):
+ self.log(msg, logging.WARNING, out, err)
+
+ def info(self, msg, out = None, err = None):
+ self.log(msg, logging.INFO, out, err)
+
+ def log(self, msg, level, out = None, err = None):
+ if out:
+ msg += " - OUT: %s " % out
+
+ if err:
+ msg += " - ERROR: %s " % err
+
+ msg = self.log_message(msg)
+
+ self.logger.log(level, msg)
+
+ def log_message(self, msg):
+ return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
@property
def logger(self):
"""
if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
- self.logger.error("Wrong state %s for start" % self.state)
+ self.error("Wrong state %s for start" % self.state)
return
self._start_time = strfnow()
"""
if not self._state in [ResourceState.STARTED]:
- self.logger.error("Wrong state %s for stop" % self.state)
+ self.error("Wrong state %s for stop" % self.state)
return
self._stop_time = strfnow()
# only can start when RM is either STOPPED or READY
if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
reschedule = True
- self.logger.debug("---- RESCHEDULING START ---- state %s " % self.state )
+ self.debug("---- RESCHEDULING START ---- state %s " % self.state )
else:
- self.logger.debug("---- START CONDITIONS ---- %s" %
+ self.debug("---- START CONDITIONS ---- %s" %
self.conditions.get(ResourceAction.START))
# Verify all start conditions are met
if reschedule:
self.ec.schedule(delay, self.start_with_conditions)
else:
- self.logger.debug("----- STARTING ---- ")
+ self.debug("----- STARTING ---- ")
self.start()
def stop_with_conditions(self):
if self.state != ResourceState.STARTED:
reschedule = True
else:
- self.logger.debug(" ---- STOP CONDITIONS ---- %s" %
+ self.debug(" ---- STOP CONDITIONS ---- %s" %
self.conditions.get(ResourceAction.STOP))
stop_conditions = self.conditions.get(ResourceAction.STOP, [])
"""
if self._state > ResourceState.READY:
- self.logger.error("Wrong state %s for deploy" % self.state)
+ self.error("Wrong state %s for deploy" % self.state)
return
+ self.debug("----- DEPLOYING ---- ")
self._ready_time = strfnow()
self._state = ResourceState.READY
self._ppid = None
self._home = "app-%s" % self.guid
- self._logger = logging.getLogger("neco.linux.Application.%d" % guid)
+ self._logger = logging.getLogger("LinuxApplication")
+
+ def log_message(self, msg):
+ return " guid %d - host %s - %s " % (self.guid,
+ self.node.get("hostname"), msg)
@property
def node(self):
(out, err), proc = self.node.execute(cmd)
if (err and proc.poll()) or out.find("error") != -1:
- err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
- name, self.node.get("hostname"), err)
- self.logger.error(err_msg)
+ msg = " Couldn't find trace %s " % name
+ self.error(msg, out, err)
return None
if attr == TraceAttr.PATH:
(out, err), proc = self.node.check_output(self.home, name)
if err and proc.poll():
- err_msg = " Couldn't read trace %s on host %s. Error: %s" % (
- name, self.node.get("hostname"), err)
- self.logger.error(err_msg)
+ msg = " Couldn't read trace %s " % name
+ self.error(msg, out, err)
return None
return out
(out, err), proc = self.node.execute(cmd)
if err and proc.poll():
- err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
- name, self.node.get("hostname"), err)
- self.logger.error(err_msg)
+ msg = " Couldn't find trace %s " % name
+ self.error(msg, out, err)
return None
if attr == TraceAttr.SIZE:
# check if sources need to be uploaded and upload them
sources = self.get("sources")
if sources:
- self.logger.debug(" Uploading sources %s" % sources)
+ self.info(" Uploading sources ")
# create dir for sources
self.node.mkdir(self.src_dir)
# create dir for sources
self.node.mkdir(self.src_dir)
- self.logger.debug(" Uploading code '%s'" % code)
+ self.info(" Uploading code ")
dst = os.path.join(self.src_dir, "code")
self.node.upload(sources, dst, text = True)
def install_dependencies(self):
depends = self.get("depends")
if depends:
- self.logger.debug(" Installing dependencies %s" % depends)
+ self.info(" Installing dependencies %s" % depends)
self.node.install_packages(depends, home = self.home)
def build(self):
build = self.get("build")
if build:
- self.logger.debug(" Building sources '%s'" % build)
+ self.info(" Building sources ")
# create dir for build
self.node.mkdir(self.build_dir)
def install(self):
install = self.get("install")
if install:
- self.logger.debug(" Installing sources '%s'" % install)
+ self.info(" Installing sources ")
cmd = self.replace_paths(install)
# Wait until node is associated and deployed
node = self.node
if not node or node.state < ResourceState.READY:
+ self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
self.ec.schedule(DELAY, self.deploy)
else:
- self.discover()
- self.provision()
+ try:
+ self.discover()
+ self.provision()
+ except:
+ self._state = ResourceState.FAILED
+ raise
super(LinuxApplication, self).deploy()
stdin = 'stdin' if self.get("stdin") else None
sudo = self.get('sudo') or False
x11 = self.get("forwardX11") or False
- err_msg = "Failed to run command %s on host %s" % (
- command, self.node.get("hostname"))
failed = False
super(LinuxApplication, self).start()
+ self.info("Starting command %s" % command)
+
if x11:
(out, err), proc = self.node.execute(command,
sudo = sudo,
if failed or out or chkerr:
# check if execution errors occurred
+ msg = " Failed to start command '%s' " % command
+ out = out
if err:
- err_msg = "%s. Proc error: %s" % (err_msg, err)
+ err = err
+ elif chkerr:
+ err = chkerr
- err_msg = "%s. Run error: %s " % (err_msg, out)
+ self.error(msg, out, err)
- if chkerr:
- err_msg = "%s. Failed to check error: %s" % (err_msg, chkerr)
+ msg2 = " Setting state to Failed"
+ self.debug(msg2)
+ self._state = ResourceState.FAILED
- self.logger.error(err_msg)
- self.state = ResourceState.FAILED
+ raise RuntimeError, msg
def stop(self):
state = self.state
if state == ResourceState.STARTED:
+ self.info("Stopping command %s" % command)
+
(out, err), proc = self.node.kill(self.pid, self.ppid)
if out or err:
# check if execution errors occurred
- err_msg = " Failed to STOP command '%s' on host %s. Check error: %s. Run error: %s" % (
- self.get("command"), self.node.get("hostname"), err, out)
- self.logger.error(err_msg)
+ msg = " Failed to STOP command '%s' " % self.get("command")
+ self.error(msg, out, err)
self._state = ResourceState.FAILED
stopped = False
else:
super(LinuxApplication, self).stop()
def release(self):
+ self.info("Releasing resource")
+
tear_down = self.get("tearDown")
if tear_down:
self.node.execute(tear_down)
(out, err), proc = self.node.check_output(self.home, 'stderr')
if out or err:
+ if err.find("No such file or directory") >= 0 :
+ # The resource is marked as started, but the
+ # command was not yet executed
+ return ResourceState.READY
+
# check if execution errors occurred
- err_msg = " Failed to execute command '%s' on host %s. Check error: %s. Run error: %s" % (
- self.get("command"), self.node.get("hostname"), err, out)
- self.logger.error(err_msg)
+ msg = " Failed to execute command '%s'" % self.get("command")
+ self.error(msg, out, err)
self._state = ResourceState.FAILED
elif self.pid and self.ppid:
def __init__(self, ec, guid):
super(LinuxChannel, self).__init__(ec, guid)
- self._logger = logging.getLogger("neco.linux.Channel.%d " % self.guid)
+ self._logger = logging.getLogger("LinuxChannel")
+
+ def log_message(self, msg):
+ return " guid %d - %s " % (self.guid, msg)
def valid_connection(self, guid):
# TODO: Validate!
import threading
# TODO: Verify files and dirs exists already
-# TODO: Blacklist node!
+# TODO: Blacklist nodes!
DELAY ="1s"
# lock to avoid concurrency issues on methods used by applications
self._lock = threading.Lock()
- self._logger = logging.getLogger("neco.linux.Node.%d " % self.guid)
+ self._logger = logging.getLogger("LinuxNode")
+
+ def log_message(self, msg):
+ return " guid %d - host %s - %s " % (self.guid,
+ self.get("hostname"), msg)
@property
def home(self):
return self._os
if (not self.get("hostname") or not self.get("username")):
- msg = "Can't resolve OS for guid %d. Insufficient data." % self.guid
- self.logger.error(msg)
+ msg = "Can't resolve OS, insufficient data "
+ self.error(msg)
raise RuntimeError, msg
- (out, err), proc = self.execute("cat /etc/issue")
+ (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
if err and proc.poll():
- msg = "Error detecting OS for host %s. err: %s " % (self.get("hostname"), err)
- self.logger.error(msg)
- raise RuntimeError, msg
+ msg = "Error detecting OS "
+ self.error(msg, out, err)
+ raise RuntimeError, "%s - %s - %s" %( msg, out, err )
if out.find("Fedora release 12") == 0:
self._os = "f12"
elif out.find("Ubuntu") ==0:
self._os = "ubuntu"
else:
- msg = "Unsupported OS %s for host %s" % (out, self.get("hostname"))
- self.logger.error(msg)
- raise RuntimeError, msg
+ msg = "Unsupported OS"
+ self.error(msg, out)
+ raise RuntimeError, "%s - %s " %( msg, out )
return self._os
def provision(self, filters = None):
if not self.is_alive():
self._state = ResourceState.FAILED
- self.logger.error("Deploy failed. Unresponsive node")
+ self.error("Deploy failed. Unresponsive node")
return
if self.get("cleanProcesses"):
def deploy(self):
if self.state == ResourceState.NEW:
- self.discover()
- self.provision()
+ try:
+ self.discover()
+ self.provision()
+ except:
+ self._state = ResourceState.FAILED
+ raise
# Node needs to wait until all associated interfaces are
# ready before it can finalize deployment
# TODO: Validate!
return True
- def clean_processes(self):
- self.logger.info("Cleaning up processes")
+ def clean_processes(self, killer = False):
+ self.info("Cleaning up processes")
- cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
- "sudo -S killall python tcpdump || /bin/true ; " +
- "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
- "sudo -S killall -u root || /bin/true ; " +
- "sudo -S killall -u root || /bin/true ; ")
+ if killer:
+ # Hardcore kill
+ cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
+ "sudo -S killall python tcpdump || /bin/true ; " +
+ "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
+ "sudo -S killall -u root || /bin/true ; " +
+ "sudo -S killall -u root || /bin/true ; ")
+ else:
+ # Be gentler...
+ cmd = ("sudo -S killall tcpdump || /bin/true ; " +
+ "sudo -S killall tcpdump || /bin/true ; " +
+ "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
+ "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
+
out = err = ""
- with self._lock:
- (out, err), proc = self.execute(cmd)
+ (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
def clean_home(self):
- self.logger.info("Cleaning up home")
+ self.info("Cleaning up home")
cmd = ("cd %s ; " % self.home +
"find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
" -execdir rm -rf {} + ")
out = err = ""
- with self._lock:
- (out, err), proc = self.execute(cmd)
+ (out, err), proc = self.execute(cmd, with_lock = True)
def upload(self, src, dst, text = False):
""" Copy content to destination
elif self.os in ["debian", "ubuntu"]:
cmd = debfuncs.install_packages_command(self.os, packages)
else:
- msg = "Error installing packages. OS not known for host %s " % (
- self.get("hostname"))
- self.logger.error(msg)
+ msg = "Error installing packages ( OS not known ) "
+ self.error(msg, self.os)
raise RuntimeError, msg
out = err = ""
- with self._lock:
- (out, err), proc = self.run_and_wait(cmd, home,
- pidfile = "instpkg_pid",
- stdout = "instpkg_log",
- stderr = "instpkg_err",
- raise_on_error = True)
+ (out, err), proc = self.run_and_wait(cmd, home,
+ pidfile = "instpkg_pid",
+ stdout = "instpkg_log",
+ stderr = "instpkg_err",
+ raise_on_error = True)
return (out, err), proc
elif self.os in ["debian", "ubuntu"]:
cmd = debfuncs.remove_packages_command(self.os, packages)
else:
- msg = "Error removing packages. OS not known for host %s " % (
- self.get("hostname"))
- self.logger.error(msg)
+ msg = "Error removing packages ( OS not known ) "
+ self.error(msg)
raise RuntimeError, msg
out = err = ""
- with self._lock:
- (out, err), proc = self.run_and_wait(cmd, home,
- pidfile = "rmpkg_pid",
- stdout = "rmpkg_log",
- stderr = "rmpkg_err",
- raise_on_error = True)
+ (out, err), proc = self.run_and_wait(cmd, home,
+ pidfile = "rmpkg_pid",
+ stdout = "rmpkg_log",
+ stderr = "rmpkg_err",
+ raise_on_error = True)
return (out, err), proc
if clean:
self.rmdir(path)
- return self.execute("mkdir -p %s" % path)
+ return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
- return self.execute("rm -rf %s" % path)
+ return self.execute("rm -rf %s" % path, with_lock = True)
def run_and_wait(self, command,
home = ".",
# check no errors occurred
if proc.poll() and err:
- msg = " Failed to run command %s on host %s" % (
- command, self.get("hostname"))
- self.logger.error(msg)
+ msg = " Failed to run command '%s' " % command
+ self.error(msg, out, err)
if raise_on_error:
raise RuntimeError, msg
(out, err), proc = self.check_output(home, stderr)
if err or out:
- msg = "Error while running command %s on host %s. error output: %s" % (
- command, self.get("hostname"), out)
- if err:
- msg += " . err: %s" % err
+ msg = " Failed to run command '%s' " % command
+ self.error(msg, out, err)
- self.logger.error(msg)
if raise_on_error:
raise RuntimeError, msg
time.sleep(delay)
delay = min(30,delay*1.2)
else:
- msg = " Failed to get pid for pidfile %s/%s on host %s" % (
- home, pidfile, self.get("hostname"))
- self.logger.error(msg)
+ msg = " Failed to get pid for pidfile %s/%s " % (
+ home, pidfile )
+ self.error(msg)
+
if raise_on_error:
raise RuntimeError, msg
def check_output(self, home, filename):
""" checks file content """
(out, err), proc = self.execute("cat %s" %
- os.path.join(home, filename))
+ os.path.join(home, filename), with_lock = True)
return (out, err), proc
def is_alive(self):
out = err = ""
try:
- (out, err), proc = self.execute("echo 'ALIVE'")
+ (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
except:
import traceback
trace = traceback.format_exc()
- self.logger.warn("Unresponsive host %s. got:\n out: %s err: %s\n traceback: %s",
- self.get("hostname"), out, err, trace)
+ msg = "Unresponsive host "
+ self.warn(msg, out, trace)
return False
if out.strip().startswith('ALIVE'):
return True
else:
- self.logger.warn("Unresponsive host %s. got:\n%s%s",
- self.get("hostname"), out, err)
+ msg = "Unresponsive host "
+ self.warn(msg, out, err)
return False
# TODO!
(out, err), proc = execfuncs.lcopy(source, dest,
recursive = True)
else:
- (out, err), proc = self.safe_retry(sshfuncs.rcopy)(
- src, dst,
- port = self.get("port"),
- identity = self.get("identity"),
- server_key = self.get("serverKey"),
- recursive = True)
+ with self._lock:
+ (out, err), proc = sshfuncs.rcopy(
+ src, dst,
+ port = self.get("port"),
+ identity = self.get("identity"),
+ server_key = self.get("serverKey"),
+ recursive = True)
return (out, err), proc
tty = False,
forward_x11 = False,
timeout = None,
- retry = 0,
+ retry = 3,
err_on_timeout = True,
connect_timeout = 30,
- persistent = True
+ persistent = True,
+ with_lock = False
):
""" Notice that this invocation will block until the
execution finishes. If this is not the desired behavior,
stdin = stdin,
env = env)
else:
- (out, err), proc = self.safe_retry(sshfuncs.rexec)(
+ if with_lock:
+ with self._lock:
+ (out, err), proc = sshfuncs.rexec(
+ command,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ sudo = sudo,
+ stdin = stdin,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey"),
+ env = env,
+ tty = tty,
+ forward_x11 = forward_x11,
+ timeout = timeout,
+ retry = retry,
+ err_on_timeout = err_on_timeout,
+ connect_timeout = connect_timeout,
+ persistent = persistent
+ )
+ else:
+ (out, err), proc = sshfuncs.rexec(
command,
host = self.get("hostname"),
user = self.get("username"),
stderr = 'stderr',
sudo = False):
- self.logger.info("Running %s", command)
+ self.debug("Running %s" % command)
if self.localhost:
(out, err), proc = execfuncs.lspawn(command, pidfile,
else:
# Start process in a "daemonized" way, using nohup and heavy
# stdin/out redirection to avoid connection issues
- (out,err), proc = self.safe_retry(sshfuncs.rspawn)(
- command,
- pidfile = pidfile,
- home = home,
- create_home = create_home,
- stdin = stdin if stdin is not None else '/dev/null',
- stdout = stdout if stdout else '/dev/null',
- stderr = stderr if stderr else '/dev/null',
- sudo = sudo,
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- identity = self.get("identity"),
- server_key = self.get("serverKey")
- )
+ with self._lock:
+ (out,err), proc = sshfuncs.rspawn(
+ command,
+ pidfile = pidfile,
+ home = home,
+ create_home = create_home,
+ stdin = stdin if stdin is not None else '/dev/null',
+ stdout = stdout if stdout else '/dev/null',
+ stderr = stderr if stderr else '/dev/null',
+ sudo = sudo,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey")
+ )
return (out, err), proc
if self.localhost:
pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
else:
- pidtuple = sshfuncs.rcheckpid(
- os.path.join(home, pidfile),
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- identity = self.get("identity"),
- server_key = self.get("serverKey")
- )
-
- return pidtuple
-
- def status(self, pid, ppid):
- if self.localhost:
- status = execfuncs.lstatus(pid, ppid)
- else:
- status = sshfuncs.rstatus(
- pid, ppid,
+ with self._lock:
+ pidtuple = sshfuncs.rcheckpid(
+ os.path.join(home, pidfile),
host = self.get("hostname"),
user = self.get("username"),
port = self.get("port"),
identity = self.get("identity"),
server_key = self.get("serverKey")
)
+
+ return pidtuple
+
+ def status(self, pid, ppid):
+ if self.localhost:
+ status = execfuncs.lstatus(pid, ppid)
+ else:
+ with self._lock:
+ status = sshfuncs.rstatus(
+ pid, ppid,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey")
+ )
return status
if self.localhost:
(out, err), proc = execfuncs.lkill(pid, ppid, sudo)
else:
- (out, err), proc = self.safe_retry(sshfuncs.rkill)(
- pid, ppid,
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- sudo = sudo,
- identity = self.get("identity"),
- server_key = self.get("serverKey")
- )
+ with self._lock:
+ (out, err), proc = sshfuncs.rkill(
+ pid, ppid,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ sudo = sudo,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey")
+ )
return (out, err), proc
def check_bad_host(self, out, err):
def blacklist(self):
# TODO!!!!
- self.logger.warn("Blacklisting malfunctioning node %s", self.hostname)
+ self.warn(" Blacklisting malfunctioning node ")
#import util
#util.appendBlacklist(self.hostname)
- def safe_retry(self, func):
- """Retries a function invocation using a lock"""
- import functools
- @functools.wraps(func)
- def rv(*p, **kw):
- fail_msg = " Failed to execute function %s(%s, %s) at host %s" % (
- func.__name__, p, kw, self.get("hostname"))
- retry = kw.pop("_retry", False)
- wlock = kw.pop("_with_lock", False)
-
- out = err = ""
- proc = None
- for i in xrange(0 if retry else 4):
- try:
- if wlock:
- with self._lock:
- (out, err), proc = func(*p, **kw)
- else:
- (out, err), proc = func(*p, **kw)
-
- if proc.poll():
- if retry:
- time.sleep(i*15)
- continue
- else:
- self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
- break
- except RuntimeError, e:
- if i >= 3:
- self.logger.error("%s. error: %s", fail_msg, e.args)
- return (out, err), proc
-
- return rv
-
import time
import tempfile
-logger = logging.getLogger("neco.execution.utils.sshfuncs")
+
+logger = logging.getLogger("sshfuncs")
+
+def log(msg, level, out = None, err = None):
+ if out:
+ msg += " - OUT: %s " % out
+
+ if err:
+ msg += " - ERROR: %s " % err
+
+ logger.log(level, msg)
+
if hasattr(os, "devnull"):
DEV_NULL = os.devnull
env = None,
tty = False,
timeout = None,
- retry = 0,
+ retry = 3,
err_on_timeout = True,
connect_timeout = 30,
persistent = True,
args.append(command)
- for x in xrange(retry or 3):
+ for x in xrange(retry):
# connects to the remote host and starts a remote connection
proc = subprocess.Popen(args,
stdout = subprocess.PIPE,
try:
out, err = _communicate(proc, stdin, timeout, err_on_timeout)
- logger.debug("COMMAND host %s, command %s, out %s, error %s" % (
- host, " ".join(args), out, err))
+ msg = " rexec - host %s - command %s " % (host, " ".join(args))
+ log(msg, logging.DEBUG, out, err)
if proc.poll():
+ skip = False
+
if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
# SSH error, can safely retry
- continue
+ skip = True
elif retry:
# Probably timed out or plain failed but can retry
+ skip = True
+
+ if skip:
+ t = x*2
+ msg = "SLEEPING %d ... ATEMP %d - host %s - command %s " % (
+ t, x, host, " ".join(args))
+ log(msg, logging.DEBUG)
+
+ time.sleep(t)
continue
break
except RuntimeError, e:
- logger.debug("EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT -> %s" % (
- host, " ".join(args), out, err, e.args))
+ msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
+ log(msg, logging.DEBUG, out, err)
if retry <= 0:
raise
in which case it is advised that the destination be a folder.
"""
- logger.debug("SCP %s %s" % (source, dest))
+ msg = " rcopy - scp %s %s " % (source, dest)
+ log(msg, logging.DEBUG)
if isinstance(source, file) and source.tell() == 0:
source = source.name
finally:
ec.shutdown()
- def test_deploy_fedora(self):
+ @skipIfNotAlive
+ def t_concurrency(self, host, user):
+ from neco.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(LinuxNode)
+ ResourceFactory.register_type(LinuxApplication)
+
+ ec = ExperimentController()
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+
+ apps = list()
+ for i in xrange(50):
+ app = ec.register_resource("LinuxApplication")
+ cmd = "ping -c5 %s" % self.target
+ ec.set(app, "command", cmd)
+ ec.register_connection(app, node)
+ apps.append(app)
+
+ try:
+ ec.deploy()
+
+ while not all([ec.state(guid) == ResourceState.FINISHED \
+ for guid in apps]):
+ time.sleep(0.5)
+
+ self.assertTrue(ec.state(node) == ResourceState.STARTED)
+ self.assertTrue(
+ all([ec.state(guid) == ResourceState.FINISHED \
+ for guid in apps])
+ )
+
+ for app in apps:
+ stdout = ec.trace(app, 'stdout')
+ size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
+ self.assertEquals(len(stdout), size)
+
+ block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1)
+ self.assertEquals(block, stdout[5:10])
+
+ path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
+ rm = ec.get_resource(app)
+ p = os.path.join(rm.home, 'stdout')
+ self.assertEquals(path, p)
+
+ finally:
+ ec.shutdown()
+
+ def test_ping_fedora(self):
self.t_ping(self.fedora_host, self.fedora_user)
- def test_deploy_ubuntu(self):
+ def test_fing_ubuntu(self):
self.t_ping(self.ubuntu_host, self.ubuntu_user)
+ def test_concurrency_fedora(self):
+ self.t_concurrency(self.fedora_host, self.fedora_user)
+
+ def test_concurrency_ubuntu(self):
+ self.t_concurrency(self.ubuntu_host, self.ubuntu_user)
+
if __name__ == '__main__':
unittest.main()
def skipInteractive(func):
name = func.__name__
def wrapped(*args, **kwargs):
- mode = os.environ.get("NEPI_INTERACTIVE", False) in ['True', 'true', 'yes', 'YES']
+ mode = os.environ.get("NEPI_INTERACTIVE", False).lower() in ['true', 'yes']
if not mode:
print "*** WARNING: Skipping test %s: Interactive mode off \n" % name
return