From: Alina Quereilhac Date: Mon, 6 May 2013 18:08:21 +0000 (+0200) Subject: Supporting many concurrent LinuxApplications on same LinuxNode X-Git-Tag: nepi-3.0.0~122^2~8 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;ds=sidebyside;h=b849b5d9bd2569b1db4dce2a65296e1c619bd0a7;p=nepi.git Supporting many concurrent LinuxApplications on same LinuxNode --- diff --git a/src/neco/__init__.py b/src/neco/__init__.py index d6f1dc03..00e54fea 100644 --- a/src/neco/__init__.py +++ b/src/neco/__init__.py @@ -1,8 +1,31 @@ import logging import os +import traceback -LOGLEVEL = os.environ.get("NEPI_LOGLEVEL", "DEBUG").upper() +LOGLEVEL = os.environ.get("NEPI_LOGLEVEL", "INFO").upper() LOGLEVEL = getattr(logging, LOGLEVEL) -FORMAT = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s" -logging.basicConfig(format = FORMAT, level = LOGLEVEL) +#FORMAT = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s" +FORMAT = "%(asctime)s %(name)s %(levelname)-4s %(message)s" + +# NEPI_LOG variable contains space separated components +# on which logging should be enabled +LOG = os.environ.get("NEPI_LOG", "ALL").upper() + +if LOG != 'ALL': + # Set by default loglevel to error + logging.basicConfig(format = FORMAT, level = logging.ERROR) + + # Set logging level to that defined by the user + # only for the enabled components + for component in LOG.split(" "): + try: + log = logging.getLogger(component) + log.setLevel(LOGLEVEL) + except: + err = traceback.format_exc() + print "ERROR ", err +else: + # Set the logging level defined by the user for all + # components + logging.basicConfig(format = FORMAT, level = LOGLEVEL) diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index 07a4ee6d..c9e4e069 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -45,7 +45,7 @@ class ExperimentController(object): self._thread.start() # Logging - self._logger = logging.getLogger("neco.execution.ec") + self._logger = logging.getLogger("ExperimentController") @property def logger(self): diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py index 7669804d..322e2764 100644 --- a/src/neco/execution/resource.py +++ b/src/neco/execution/resource.py @@ -152,7 +152,33 @@ class ResourceManager(object): self._release_time = None # Logging - self._logger = logging.getLogger("neco.execution.resource.Resource %s.%d " % (self._rtype, self.guid)) + self._logger = logging.getLogger("Resource") + + def debug(self, msg, out = None, err = None): + self.log(msg, logging.DEBUG, out, err) + + def error(self, msg, out = None, err = None): + self.log(msg, logging.ERROR, out, err) + + def warn(self, msg, out = None, err = None): + self.log(msg, logging.WARNING, out, err) + + def info(self, msg, out = None, err = None): + self.log(msg, logging.INFO, out, err) + + def log(self, msg, level, out = None, err = None): + if out: + msg += " - OUT: %s " % out + + if err: + msg += " - ERROR: %s " % err + + msg = self.log_message(msg) + + self.logger.log(level, msg) + + def log_message(self, msg): + return " %s guid: %d - %s " % (self._rtype, self.guid, msg) @property def logger(self): @@ -225,7 +251,7 @@ class ResourceManager(object): """ if not self._state in [ResourceState.READY, ResourceState.STOPPED]: - self.logger.error("Wrong state %s for start" % self.state) + self.error("Wrong state %s for start" % self.state) return self._start_time = strfnow() @@ -236,7 +262,7 @@ class ResourceManager(object): """ if not self._state in [ResourceState.STARTED]: - self.logger.error("Wrong state %s for stop" % self.state) + self.error("Wrong state %s for stop" % self.state) return self._stop_time = strfnow() @@ -433,9 +459,9 @@ class ResourceManager(object): # only can start when RM is either STOPPED or READY if self.state not in [ResourceState.STOPPED, ResourceState.READY]: reschedule = True - self.logger.debug("---- RESCHEDULING START ---- state %s " % self.state ) + self.debug("---- RESCHEDULING START ---- state %s " % self.state ) else: - self.logger.debug("---- START CONDITIONS ---- %s" % + self.debug("---- START CONDITIONS ---- %s" % self.conditions.get(ResourceAction.START)) # Verify all start conditions are met @@ -448,7 +474,7 @@ class ResourceManager(object): if reschedule: self.ec.schedule(delay, self.start_with_conditions) else: - self.logger.debug("----- STARTING ---- ") + self.debug("----- STARTING ---- ") self.start() def stop_with_conditions(self): @@ -465,7 +491,7 @@ class ResourceManager(object): if self.state != ResourceState.STARTED: reschedule = True else: - self.logger.debug(" ---- STOP CONDITIONS ---- %s" % + self.debug(" ---- STOP CONDITIONS ---- %s" % self.conditions.get(ResourceAction.STOP)) stop_conditions = self.conditions.get(ResourceAction.STOP, []) @@ -486,9 +512,10 @@ class ResourceManager(object): """ if self._state > ResourceState.READY: - self.logger.error("Wrong state %s for deploy" % self.state) + self.error("Wrong state %s for deploy" % self.state) return + self.debug("----- DEPLOYING ---- ") self._ready_time = strfnow() self._state = ResourceState.READY diff --git a/src/neco/resources/linux/application.py b/src/neco/resources/linux/application.py index 321c696c..bdd271b9 100644 --- a/src/neco/resources/linux/application.py +++ b/src/neco/resources/linux/application.py @@ -103,7 +103,11 @@ class LinuxApplication(ResourceManager): self._ppid = None self._home = "app-%s" % self.guid - self._logger = logging.getLogger("neco.linux.Application.%d" % guid) + self._logger = logging.getLogger("LinuxApplication") + + def log_message(self, msg): + return " guid %d - host %s - %s " % (self.guid, + self.node.get("hostname"), msg) @property def node(self): @@ -138,9 +142,8 @@ class LinuxApplication(ResourceManager): (out, err), proc = self.node.execute(cmd) if (err and proc.poll()) or out.find("error") != -1: - err_msg = " Couldn't find trace %s on host %s. Error: %s" % ( - name, self.node.get("hostname"), err) - self.logger.error(err_msg) + msg = " Couldn't find trace %s " % name + self.error(msg, out, err) return None if attr == TraceAttr.PATH: @@ -150,9 +153,8 @@ class LinuxApplication(ResourceManager): (out, err), proc = self.node.check_output(self.home, name) if err and proc.poll(): - err_msg = " Couldn't read trace %s on host %s. Error: %s" % ( - name, self.node.get("hostname"), err) - self.logger.error(err_msg) + msg = " Couldn't read trace %s " % name + self.error(msg, out, err) return None return out @@ -165,9 +167,8 @@ class LinuxApplication(ResourceManager): (out, err), proc = self.node.execute(cmd) if err and proc.poll(): - err_msg = " Couldn't find trace %s on host %s. Error: %s" % ( - name, self.node.get("hostname"), err) - self.logger.error(err_msg) + msg = " Couldn't find trace %s " % name + self.error(msg, out, err) return None if attr == TraceAttr.SIZE: @@ -202,7 +203,7 @@ class LinuxApplication(ResourceManager): # check if sources need to be uploaded and upload them sources = self.get("sources") if sources: - self.logger.debug(" Uploading sources %s" % sources) + self.info(" Uploading sources ") # create dir for sources self.node.mkdir(self.src_dir) @@ -229,7 +230,7 @@ class LinuxApplication(ResourceManager): # create dir for sources self.node.mkdir(self.src_dir) - self.logger.debug(" Uploading code '%s'" % code) + self.info(" Uploading code ") dst = os.path.join(self.src_dir, "code") self.node.upload(sources, dst, text = True) @@ -237,13 +238,13 @@ class LinuxApplication(ResourceManager): def install_dependencies(self): depends = self.get("depends") if depends: - self.logger.debug(" Installing dependencies %s" % depends) + self.info(" Installing dependencies %s" % depends) self.node.install_packages(depends, home = self.home) def build(self): build = self.get("build") if build: - self.logger.debug(" Building sources '%s'" % build) + self.info(" Building sources ") # create dir for build self.node.mkdir(self.build_dir) @@ -259,7 +260,7 @@ class LinuxApplication(ResourceManager): def install(self): install = self.get("install") if install: - self.logger.debug(" Installing sources '%s'" % install) + self.info(" Installing sources ") cmd = self.replace_paths(install) @@ -273,10 +274,15 @@ class LinuxApplication(ResourceManager): # Wait until node is associated and deployed node = self.node if not node or node.state < ResourceState.READY: + self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) self.ec.schedule(DELAY, self.deploy) else: - self.discover() - self.provision() + try: + self.discover() + self.provision() + except: + self._state = ResourceState.FAILED + raise super(LinuxApplication, self).deploy() @@ -286,12 +292,12 @@ class LinuxApplication(ResourceManager): stdin = 'stdin' if self.get("stdin") else None sudo = self.get('sudo') or False x11 = self.get("forwardX11") or False - err_msg = "Failed to run command %s on host %s" % ( - command, self.node.get("hostname")) failed = False super(LinuxApplication, self).start() + self.info("Starting command %s" % command) + if x11: (out, err), proc = self.node.execute(command, sudo = sudo, @@ -323,33 +329,40 @@ class LinuxApplication(ResourceManager): if failed or out or chkerr: # check if execution errors occurred + msg = " Failed to start command '%s' " % command + out = out if err: - err_msg = "%s. Proc error: %s" % (err_msg, err) + err = err + elif chkerr: + err = chkerr - err_msg = "%s. Run error: %s " % (err_msg, out) + self.error(msg, out, err) - if chkerr: - err_msg = "%s. Failed to check error: %s" % (err_msg, chkerr) + msg2 = " Setting state to Failed" + self.debug(msg2) + self._state = ResourceState.FAILED - self.logger.error(err_msg) - self.state = ResourceState.FAILED + raise RuntimeError, msg def stop(self): state = self.state if state == ResourceState.STARTED: + self.info("Stopping command %s" % command) + (out, err), proc = self.node.kill(self.pid, self.ppid) if out or err: # check if execution errors occurred - err_msg = " Failed to STOP command '%s' on host %s. Check error: %s. Run error: %s" % ( - self.get("command"), self.node.get("hostname"), err, out) - self.logger.error(err_msg) + msg = " Failed to STOP command '%s' " % self.get("command") + self.error(msg, out, err) self._state = ResourceState.FAILED stopped = False else: super(LinuxApplication, self).stop() def release(self): + self.info("Releasing resource") + tear_down = self.get("tearDown") if tear_down: self.node.execute(tear_down) @@ -364,10 +377,14 @@ class LinuxApplication(ResourceManager): (out, err), proc = self.node.check_output(self.home, 'stderr') if out or err: + if err.find("No such file or directory") >= 0 : + # The resource is marked as started, but the + # command was not yet executed + return ResourceState.READY + # check if execution errors occurred - err_msg = " Failed to execute command '%s' on host %s. Check error: %s. Run error: %s" % ( - self.get("command"), self.node.get("hostname"), err, out) - self.logger.error(err_msg) + msg = " Failed to execute command '%s'" % self.get("command") + self.error(msg, out, err) self._state = ResourceState.FAILED elif self.pid and self.ppid: diff --git a/src/neco/resources/linux/channel.py b/src/neco/resources/linux/channel.py index 23f87b5a..f4c1cf22 100644 --- a/src/neco/resources/linux/channel.py +++ b/src/neco/resources/linux/channel.py @@ -17,7 +17,10 @@ class LinuxChannel(ResourceManager): def __init__(self, ec, guid): super(LinuxChannel, self).__init__(ec, guid) - self._logger = logging.getLogger("neco.linux.Channel.%d " % self.guid) + self._logger = logging.getLogger("LinuxChannel") + + def log_message(self, msg): + return " guid %d - %s " % (self.guid, msg) def valid_connection(self, guid): # TODO: Validate! diff --git a/src/neco/resources/linux/node.py b/src/neco/resources/linux/node.py index a030eb43..f7c9daac 100644 --- a/src/neco/resources/linux/node.py +++ b/src/neco/resources/linux/node.py @@ -13,7 +13,7 @@ import time import threading # TODO: Verify files and dirs exists already -# TODO: Blacklist node! +# TODO: Blacklist nodes! DELAY ="1s" @@ -70,7 +70,11 @@ class LinuxNode(ResourceManager): # lock to avoid concurrency issues on methods used by applications self._lock = threading.Lock() - self._logger = logging.getLogger("neco.linux.Node.%d " % self.guid) + self._logger = logging.getLogger("LinuxNode") + + def log_message(self, msg): + return " guid %d - host %s - %s " % (self.guid, + self.get("hostname"), msg) @property def home(self): @@ -92,16 +96,16 @@ class LinuxNode(ResourceManager): return self._os if (not self.get("hostname") or not self.get("username")): - msg = "Can't resolve OS for guid %d. Insufficient data." % self.guid - self.logger.error(msg) + msg = "Can't resolve OS, insufficient data " + self.error(msg) raise RuntimeError, msg - (out, err), proc = self.execute("cat /etc/issue") + (out, err), proc = self.execute("cat /etc/issue", with_lock = True) if err and proc.poll(): - msg = "Error detecting OS for host %s. err: %s " % (self.get("hostname"), err) - self.logger.error(msg) - raise RuntimeError, msg + msg = "Error detecting OS " + self.error(msg, out, err) + raise RuntimeError, "%s - %s - %s" %( msg, out, err ) if out.find("Fedora release 12") == 0: self._os = "f12" @@ -112,9 +116,9 @@ class LinuxNode(ResourceManager): elif out.find("Ubuntu") ==0: self._os = "ubuntu" else: - msg = "Unsupported OS %s for host %s" % (out, self.get("hostname")) - self.logger.error(msg) - raise RuntimeError, msg + msg = "Unsupported OS" + self.error(msg, out) + raise RuntimeError, "%s - %s " %( msg, out ) return self._os @@ -125,7 +129,7 @@ class LinuxNode(ResourceManager): def provision(self, filters = None): if not self.is_alive(): self._state = ResourceState.FAILED - self.logger.error("Deploy failed. Unresponsive node") + self.error("Deploy failed. Unresponsive node") return if self.get("cleanProcesses"): @@ -140,8 +144,12 @@ class LinuxNode(ResourceManager): def deploy(self): if self.state == ResourceState.NEW: - self.discover() - self.provision() + try: + self.discover() + self.provision() + except: + self._state = ResourceState.FAILED + raise # Node needs to wait until all associated interfaces are # ready before it can finalize deployment @@ -165,29 +173,36 @@ class LinuxNode(ResourceManager): # TODO: Validate! return True - def clean_processes(self): - self.logger.info("Cleaning up processes") + def clean_processes(self, killer = False): + self.info("Cleaning up processes") - cmd = ("sudo -S killall python tcpdump || /bin/true ; " + - "sudo -S killall python tcpdump || /bin/true ; " + - "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " + - "sudo -S killall -u root || /bin/true ; " + - "sudo -S killall -u root || /bin/true ; ") + if killer: + # Hardcore kill + cmd = ("sudo -S killall python tcpdump || /bin/true ; " + + "sudo -S killall python tcpdump || /bin/true ; " + + "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " + + "sudo -S killall -u root || /bin/true ; " + + "sudo -S killall -u root || /bin/true ; ") + else: + # Be gentler... + cmd = ("sudo -S killall tcpdump || /bin/true ; " + + "sudo -S killall tcpdump || /bin/true ; " + + "sudo -S killall -u %s || /bin/true ; " % self.get("username") + + "sudo -S killall -u %s || /bin/true ; " % self.get("username")) + out = err = "" - with self._lock: - (out, err), proc = self.execute(cmd) + (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) def clean_home(self): - self.logger.info("Cleaning up home") + self.info("Cleaning up home") cmd = ("cd %s ; " % self.home + "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+ " -execdir rm -rf {} + ") out = err = "" - with self._lock: - (out, err), proc = self.execute(cmd) + (out, err), proc = self.execute(cmd, with_lock = True) def upload(self, src, dst, text = False): """ Copy content to destination @@ -235,18 +250,16 @@ class LinuxNode(ResourceManager): elif self.os in ["debian", "ubuntu"]: cmd = debfuncs.install_packages_command(self.os, packages) else: - msg = "Error installing packages. OS not known for host %s " % ( - self.get("hostname")) - self.logger.error(msg) + msg = "Error installing packages ( OS not known ) " + self.error(msg, self.os) raise RuntimeError, msg out = err = "" - with self._lock: - (out, err), proc = self.run_and_wait(cmd, home, - pidfile = "instpkg_pid", - stdout = "instpkg_log", - stderr = "instpkg_err", - raise_on_error = True) + (out, err), proc = self.run_and_wait(cmd, home, + pidfile = "instpkg_pid", + stdout = "instpkg_log", + stderr = "instpkg_err", + raise_on_error = True) return (out, err), proc @@ -259,18 +272,16 @@ class LinuxNode(ResourceManager): elif self.os in ["debian", "ubuntu"]: cmd = debfuncs.remove_packages_command(self.os, packages) else: - msg = "Error removing packages. OS not known for host %s " % ( - self.get("hostname")) - self.logger.error(msg) + msg = "Error removing packages ( OS not known ) " + self.error(msg) raise RuntimeError, msg out = err = "" - with self._lock: - (out, err), proc = self.run_and_wait(cmd, home, - pidfile = "rmpkg_pid", - stdout = "rmpkg_log", - stderr = "rmpkg_err", - raise_on_error = True) + (out, err), proc = self.run_and_wait(cmd, home, + pidfile = "rmpkg_pid", + stdout = "rmpkg_log", + stderr = "rmpkg_err", + raise_on_error = True) return (out, err), proc @@ -278,10 +289,10 @@ class LinuxNode(ResourceManager): if clean: self.rmdir(path) - return self.execute("mkdir -p %s" % path) + return self.execute("mkdir -p %s" % path, with_lock = True) def rmdir(self, path): - return self.execute("rm -rf %s" % path) + return self.execute("rm -rf %s" % path, with_lock = True) def run_and_wait(self, command, home = ".", @@ -307,9 +318,8 @@ class LinuxNode(ResourceManager): # check no errors occurred if proc.poll() and err: - msg = " Failed to run command %s on host %s" % ( - command, self.get("hostname")) - self.logger.error(msg) + msg = " Failed to run command '%s' " % command + self.error(msg, out, err) if raise_on_error: raise RuntimeError, msg @@ -326,12 +336,9 @@ class LinuxNode(ResourceManager): (out, err), proc = self.check_output(home, stderr) if err or out: - msg = "Error while running command %s on host %s. error output: %s" % ( - command, self.get("hostname"), out) - if err: - msg += " . err: %s" % err + msg = " Failed to run command '%s' " % command + self.error(msg, out, err) - self.logger.error(msg) if raise_on_error: raise RuntimeError, msg @@ -352,9 +359,10 @@ class LinuxNode(ResourceManager): time.sleep(delay) delay = min(30,delay*1.2) else: - msg = " Failed to get pid for pidfile %s/%s on host %s" % ( - home, pidfile, self.get("hostname")) - self.logger.error(msg) + msg = " Failed to get pid for pidfile %s/%s " % ( + home, pidfile ) + self.error(msg) + if raise_on_error: raise RuntimeError, msg @@ -387,7 +395,7 @@ class LinuxNode(ResourceManager): def check_output(self, home, filename): """ checks file content """ (out, err), proc = self.execute("cat %s" % - os.path.join(home, filename)) + os.path.join(home, filename), with_lock = True) return (out, err), proc def is_alive(self): @@ -396,19 +404,19 @@ class LinuxNode(ResourceManager): out = err = "" try: - (out, err), proc = self.execute("echo 'ALIVE'") + (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True) except: import traceback trace = traceback.format_exc() - self.logger.warn("Unresponsive host %s. got:\n out: %s err: %s\n traceback: %s", - self.get("hostname"), out, err, trace) + msg = "Unresponsive host " + self.warn(msg, out, trace) return False if out.strip().startswith('ALIVE'): return True else: - self.logger.warn("Unresponsive host %s. got:\n%s%s", - self.get("hostname"), out, err) + msg = "Unresponsive host " + self.warn(msg, out, err) return False # TODO! @@ -420,12 +428,13 @@ class LinuxNode(ResourceManager): (out, err), proc = execfuncs.lcopy(source, dest, recursive = True) else: - (out, err), proc = self.safe_retry(sshfuncs.rcopy)( - src, dst, - port = self.get("port"), - identity = self.get("identity"), - server_key = self.get("serverKey"), - recursive = True) + with self._lock: + (out, err), proc = sshfuncs.rcopy( + src, dst, + port = self.get("port"), + identity = self.get("identity"), + server_key = self.get("serverKey"), + recursive = True) return (out, err), proc @@ -436,10 +445,11 @@ class LinuxNode(ResourceManager): tty = False, forward_x11 = False, timeout = None, - retry = 0, + retry = 3, err_on_timeout = True, connect_timeout = 30, - persistent = True + persistent = True, + with_lock = False ): """ Notice that this invocation will block until the execution finishes. If this is not the desired behavior, @@ -452,7 +462,29 @@ class LinuxNode(ResourceManager): stdin = stdin, env = env) else: - (out, err), proc = self.safe_retry(sshfuncs.rexec)( + if with_lock: + with self._lock: + (out, err), proc = sshfuncs.rexec( + command, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + stdin = stdin, + identity = self.get("identity"), + server_key = self.get("serverKey"), + env = env, + tty = tty, + forward_x11 = forward_x11, + timeout = timeout, + retry = retry, + err_on_timeout = err_on_timeout, + connect_timeout = connect_timeout, + persistent = persistent + ) + else: + (out, err), proc = sshfuncs.rexec( command, host = self.get("hostname"), user = self.get("username"), @@ -483,7 +515,7 @@ class LinuxNode(ResourceManager): stderr = 'stderr', sudo = False): - self.logger.info("Running %s", command) + self.debug("Running %s" % command) if self.localhost: (out, err), proc = execfuncs.lspawn(command, pidfile, @@ -497,22 +529,23 @@ class LinuxNode(ResourceManager): else: # Start process in a "daemonized" way, using nohup and heavy # stdin/out redirection to avoid connection issues - (out,err), proc = self.safe_retry(sshfuncs.rspawn)( - command, - pidfile = pidfile, - home = home, - create_home = create_home, - stdin = stdin if stdin is not None else '/dev/null', - stdout = stdout if stdout else '/dev/null', - stderr = stderr if stderr else '/dev/null', - sudo = sudo, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) + with self._lock: + (out,err), proc = sshfuncs.rspawn( + command, + pidfile = pidfile, + home = home, + create_home = create_home, + stdin = stdin if stdin is not None else '/dev/null', + stdout = stdout if stdout else '/dev/null', + stderr = stderr if stderr else '/dev/null', + sudo = sudo, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) return (out, err), proc @@ -520,24 +553,9 @@ class LinuxNode(ResourceManager): if self.localhost: pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile)) else: - pidtuple = sshfuncs.rcheckpid( - os.path.join(home, pidfile), - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) - - return pidtuple - - def status(self, pid, ppid): - if self.localhost: - status = execfuncs.lstatus(pid, ppid) - else: - status = sshfuncs.rstatus( - pid, ppid, + with self._lock: + pidtuple = sshfuncs.rcheckpid( + os.path.join(home, pidfile), host = self.get("hostname"), user = self.get("username"), port = self.get("port"), @@ -545,6 +563,23 @@ class LinuxNode(ResourceManager): identity = self.get("identity"), server_key = self.get("serverKey") ) + + return pidtuple + + def status(self, pid, ppid): + if self.localhost: + status = execfuncs.lstatus(pid, ppid) + else: + with self._lock: + status = sshfuncs.rstatus( + pid, ppid, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) return status @@ -557,16 +592,17 @@ class LinuxNode(ResourceManager): if self.localhost: (out, err), proc = execfuncs.lkill(pid, ppid, sudo) else: - (out, err), proc = self.safe_retry(sshfuncs.rkill)( - pid, ppid, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) + with self._lock: + (out, err), proc = sshfuncs.rkill( + pid, ppid, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) return (out, err), proc def check_bad_host(self, out, err): @@ -578,41 +614,7 @@ class LinuxNode(ResourceManager): def blacklist(self): # TODO!!!! - self.logger.warn("Blacklisting malfunctioning node %s", self.hostname) + self.warn(" Blacklisting malfunctioning node ") #import util #util.appendBlacklist(self.hostname) - def safe_retry(self, func): - """Retries a function invocation using a lock""" - import functools - @functools.wraps(func) - def rv(*p, **kw): - fail_msg = " Failed to execute function %s(%s, %s) at host %s" % ( - func.__name__, p, kw, self.get("hostname")) - retry = kw.pop("_retry", False) - wlock = kw.pop("_with_lock", False) - - out = err = "" - proc = None - for i in xrange(0 if retry else 4): - try: - if wlock: - with self._lock: - (out, err), proc = func(*p, **kw) - else: - (out, err), proc = func(*p, **kw) - - if proc.poll(): - if retry: - time.sleep(i*15) - continue - else: - self.logger.error("%s. out: %s error: %s", fail_msg, out, err) - break - except RuntimeError, e: - if i >= 3: - self.logger.error("%s. error: %s", fail_msg, e.args) - return (out, err), proc - - return rv - diff --git a/src/neco/util/sshfuncs.py b/src/neco/util/sshfuncs.py index 0589fb8b..b5d8f0ed 100644 --- a/src/neco/util/sshfuncs.py +++ b/src/neco/util/sshfuncs.py @@ -12,7 +12,18 @@ import subprocess import time import tempfile -logger = logging.getLogger("neco.execution.utils.sshfuncs") + +logger = logging.getLogger("sshfuncs") + +def log(msg, level, out = None, err = None): + if out: + msg += " - OUT: %s " % out + + if err: + msg += " - ERROR: %s " % err + + logger.log(level, msg) + if hasattr(os, "devnull"): DEV_NULL = os.devnull @@ -176,7 +187,7 @@ def rexec(command, host, user, env = None, tty = False, timeout = None, - retry = 0, + retry = 3, err_on_timeout = True, connect_timeout = 30, persistent = True, @@ -226,7 +237,7 @@ def rexec(command, host, user, args.append(command) - for x in xrange(retry or 3): + for x in xrange(retry): # connects to the remote host and starts a remote connection proc = subprocess.Popen(args, stdout = subprocess.PIPE, @@ -239,20 +250,31 @@ def rexec(command, host, user, try: out, err = _communicate(proc, stdin, timeout, err_on_timeout) - logger.debug("COMMAND host %s, command %s, out %s, error %s" % ( - host, " ".join(args), out, err)) + msg = " rexec - host %s - command %s " % (host, " ".join(args)) + log(msg, logging.DEBUG, out, err) if proc.poll(): + skip = False + if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '): # SSH error, can safely retry - continue + skip = True elif retry: # Probably timed out or plain failed but can retry + skip = True + + if skip: + t = x*2 + msg = "SLEEPING %d ... ATEMP %d - host %s - command %s " % ( + t, x, host, " ".join(args)) + log(msg, logging.DEBUG) + + time.sleep(t) continue break except RuntimeError, e: - logger.debug("EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT -> %s" % ( - host, " ".join(args), out, err, e.args)) + msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) + log(msg, logging.DEBUG, out, err) if retry <= 0: raise @@ -284,7 +306,8 @@ def rcopy(source, dest, in which case it is advised that the destination be a folder. """ - logger.debug("SCP %s %s" % (source, dest)) + msg = " rcopy - scp %s %s " % (source, dest) + log(msg, logging.DEBUG) if isinstance(source, file) and source.tell() == 0: source = source.name diff --git a/test/resources/linux/application.py b/test/resources/linux/application.py index 17beb5af..445cef7d 100644 --- a/test/resources/linux/application.py +++ b/test/resources/linux/application.py @@ -67,12 +67,70 @@ class LinuxApplicationTestCase(unittest.TestCase): finally: ec.shutdown() - def test_deploy_fedora(self): + @skipIfNotAlive + def t_concurrency(self, host, user): + from neco.execution.resource import ResourceFactory + + ResourceFactory.register_type(LinuxNode) + ResourceFactory.register_type(LinuxApplication) + + ec = ExperimentController() + + node = ec.register_resource("LinuxNode") + ec.set(node, "hostname", host) + ec.set(node, "username", user) + ec.set(node, "cleanHome", True) + ec.set(node, "cleanProcesses", True) + + apps = list() + for i in xrange(50): + app = ec.register_resource("LinuxApplication") + cmd = "ping -c5 %s" % self.target + ec.set(app, "command", cmd) + ec.register_connection(app, node) + apps.append(app) + + try: + ec.deploy() + + while not all([ec.state(guid) == ResourceState.FINISHED \ + for guid in apps]): + time.sleep(0.5) + + self.assertTrue(ec.state(node) == ResourceState.STARTED) + self.assertTrue( + all([ec.state(guid) == ResourceState.FINISHED \ + for guid in apps]) + ) + + for app in apps: + stdout = ec.trace(app, 'stdout') + size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE) + self.assertEquals(len(stdout), size) + + block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1) + self.assertEquals(block, stdout[5:10]) + + path = ec.trace(app, 'stdout', attr = TraceAttr.PATH) + rm = ec.get_resource(app) + p = os.path.join(rm.home, 'stdout') + self.assertEquals(path, p) + + finally: + ec.shutdown() + + def test_ping_fedora(self): self.t_ping(self.fedora_host, self.fedora_user) - def test_deploy_ubuntu(self): + def test_fing_ubuntu(self): self.t_ping(self.ubuntu_host, self.ubuntu_user) + def test_concurrency_fedora(self): + self.t_concurrency(self.fedora_host, self.fedora_user) + + def test_concurrency_ubuntu(self): + self.t_concurrency(self.ubuntu_host, self.ubuntu_user) + if __name__ == '__main__': unittest.main() diff --git a/test/resources/linux/test_utils.py b/test/resources/linux/test_utils.py index 885af79c..369e022a 100644 --- a/test/resources/linux/test_utils.py +++ b/test/resources/linux/test_utils.py @@ -35,7 +35,7 @@ def skipIfNotAlive(func): def skipInteractive(func): name = func.__name__ def wrapped(*args, **kwargs): - mode = os.environ.get("NEPI_INTERACTIVE", False) in ['True', 'true', 'yes', 'YES'] + mode = os.environ.get("NEPI_INTERACTIVE", False).lower() in ['true', 'yes'] if not mode: print "*** WARNING: Skipping test %s: Interactive mode off \n" % name return