Supporting many concurrent LinuxApplications on same LinuxNode
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 6 May 2013 18:08:21 +0000 (20:08 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 6 May 2013 18:08:21 +0000 (20:08 +0200)
src/neco/__init__.py
src/neco/execution/ec.py
src/neco/execution/resource.py
src/neco/resources/linux/application.py
src/neco/resources/linux/channel.py
src/neco/resources/linux/node.py
src/neco/util/sshfuncs.py
test/resources/linux/application.py
test/resources/linux/test_utils.py

index d6f1dc0..00e54fe 100644 (file)
@@ -1,8 +1,31 @@
 import logging
 import os
+import traceback
 
-LOGLEVEL = os.environ.get("NEPI_LOGLEVEL", "DEBUG").upper()
+LOGLEVEL = os.environ.get("NEPI_LOGLEVEL", "INFO").upper()
 LOGLEVEL = getattr(logging, LOGLEVEL)
-FORMAT = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
-logging.basicConfig(format = FORMAT, level = LOGLEVEL)
+#FORMAT = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
+FORMAT = "%(asctime)s %(name)s %(levelname)-4s %(message)s"
+
+# NEPI_LOG variable contains space separated components 
+# on which logging should be enabled
+LOG = os.environ.get("NEPI_LOG", "ALL").upper()
+
+if LOG != 'ALL':
+    # Set by default loglevel to error
+    logging.basicConfig(format = FORMAT, level = logging.ERROR)
+
+    # Set logging level to that defined by the user
+    # only for the enabled components
+    for component in LOG.split(" "):
+        try:
+           log = logging.getLogger(component)
+           log.setLevel(LOGLEVEL)
+        except:
+            err = traceback.format_exc()
+            print "ERROR ", err
+else:
+    # Set the logging level defined by the user for all
+    # components
+    logging.basicConfig(format = FORMAT, level = LOGLEVEL)
 
index 07a4ee6..c9e4e06 100644 (file)
@@ -45,7 +45,7 @@ class ExperimentController(object):
         self._thread.start()
 
         # Logging
-        self._logger = logging.getLogger("neco.execution.ec")
+        self._logger = logging.getLogger("ExperimentController")
 
     @property
     def logger(self):
index 7669804..322e276 100644 (file)
@@ -152,7 +152,33 @@ class ResourceManager(object):
         self._release_time = None
 
         # Logging
-        self._logger = logging.getLogger("neco.execution.resource.Resource %s.%d " %  (self._rtype, self.guid))
+        self._logger = logging.getLogger("Resource")
+
+    def debug(self, msg, out = None, err = None):
+        self.log(msg, logging.DEBUG, out, err)
+
+    def error(self, msg, out = None, err = None):
+        self.log(msg, logging.ERROR, out, err)
+
+    def warn(self, msg, out = None, err = None):
+        self.log(msg, logging.WARNING, out, err)
+
+    def info(self, msg, out = None, err = None):
+        self.log(msg, logging.INFO, out, err)
+
+    def log(self, msg, level, out = None, err = None):
+        if out:
+            msg += " - OUT: %s " % out
+
+        if err:
+            msg += " - ERROR: %s " % err
+
+        msg = self.log_message(msg)
+
+        self.logger.log(level, msg)
+
+    def log_message(self, msg):
+        return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
 
     @property
     def logger(self):
@@ -225,7 +251,7 @@ class ResourceManager(object):
 
         """
         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
-            self.logger.error("Wrong state %s for start" % self.state)
+            self.error("Wrong state %s for start" % self.state)
             return
 
         self._start_time = strfnow()
@@ -236,7 +262,7 @@ class ResourceManager(object):
 
         """
         if not self._state in [ResourceState.STARTED]:
-            self.logger.error("Wrong state %s for stop" % self.state)
+            self.error("Wrong state %s for stop" % self.state)
             return
 
         self._stop_time = strfnow()
@@ -433,9 +459,9 @@ class ResourceManager(object):
         # only can start when RM is either STOPPED or READY
         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
             reschedule = True
-            self.logger.debug("---- RESCHEDULING START ---- state %s " % self.state )
+            self.debug("---- RESCHEDULING START ---- state %s " % self.state )
         else:
-            self.logger.debug("---- START CONDITIONS ---- %s" % 
+            self.debug("---- START CONDITIONS ---- %s" % 
                     self.conditions.get(ResourceAction.START))
             
             # Verify all start conditions are met
@@ -448,7 +474,7 @@ class ResourceManager(object):
         if reschedule:
             self.ec.schedule(delay, self.start_with_conditions)
         else:
-            self.logger.debug("----- STARTING ---- ")
+            self.debug("----- STARTING ---- ")
             self.start()
 
     def stop_with_conditions(self):
@@ -465,7 +491,7 @@ class ResourceManager(object):
         if self.state != ResourceState.STARTED:
             reschedule = True
         else:
-            self.logger.debug(" ---- STOP CONDITIONS ---- %s" % 
+            self.debug(" ---- STOP CONDITIONS ---- %s" % 
                     self.conditions.get(ResourceAction.STOP))
 
             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
@@ -486,9 +512,10 @@ class ResourceManager(object):
 
         """
         if self._state > ResourceState.READY:
-            self.logger.error("Wrong state %s for deploy" % self.state)
+            self.error("Wrong state %s for deploy" % self.state)
             return
 
+        self.debug("----- DEPLOYING ---- ")
         self._ready_time = strfnow()
         self._state = ResourceState.READY
 
index 321c696..bdd271b 100644 (file)
@@ -103,7 +103,11 @@ class LinuxApplication(ResourceManager):
         self._ppid = None
         self._home = "app-%s" % self.guid
 
-        self._logger = logging.getLogger("neco.linux.Application.%d" % guid)
+        self._logger = logging.getLogger("LinuxApplication")
+    
+    def log_message(self, msg):
+        return " guid %d - host %s - %s " % (self.guid, 
+                self.node.get("hostname"), msg)
 
     @property
     def node(self):
@@ -138,9 +142,8 @@ class LinuxApplication(ResourceManager):
         (out, err), proc = self.node.execute(cmd)
 
         if (err and proc.poll()) or out.find("error") != -1:
-            err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
-                    name, self.node.get("hostname"), err)
-            self.logger.error(err_msg)
+            msg = " Couldn't find trace %s " % name
+            self.error(msg, out, err)
             return None
     
         if attr == TraceAttr.PATH:
@@ -150,9 +153,8 @@ class LinuxApplication(ResourceManager):
             (out, err), proc = self.node.check_output(self.home, name)
             
             if err and proc.poll():
-                err_msg = " Couldn't read trace %s on host %s. Error: %s" % (
-                            name, self.node.get("hostname"), err)
-                self.logger.error(err_msg)
+                msg = " Couldn't read trace %s " % name
+                self.error(msg, out, err)
                 return None
 
             return out
@@ -165,9 +167,8 @@ class LinuxApplication(ResourceManager):
         (out, err), proc = self.node.execute(cmd)
 
         if err and proc.poll():
-            err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
-                    name, self.node.get("hostname"), err)
-            self.logger.error(err_msg)
+            msg = " Couldn't find trace %s " % name
+            self.error(msg, out, err)
             return None
         
         if attr == TraceAttr.SIZE:
@@ -202,7 +203,7 @@ class LinuxApplication(ResourceManager):
         # check if sources need to be uploaded and upload them
         sources = self.get("sources")
         if sources:
-            self.logger.debug(" Uploading sources %s" % sources)
+            self.info(" Uploading sources ")
 
             # create dir for sources
             self.node.mkdir(self.src_dir)
@@ -229,7 +230,7 @@ class LinuxApplication(ResourceManager):
             # create dir for sources
             self.node.mkdir(self.src_dir)
 
-            self.logger.debug(" Uploading code '%s'" % code)
+            self.info(" Uploading code ")
 
             dst = os.path.join(self.src_dir, "code")
             self.node.upload(sources, dst, text = True)
@@ -237,13 +238,13 @@ class LinuxApplication(ResourceManager):
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
-            self.logger.debug(" Installing dependencies %s" % depends)
+            self.info(" Installing dependencies %s" % depends)
             self.node.install_packages(depends, home = self.home)
 
     def build(self):
         build = self.get("build")
         if build:
-            self.logger.debug(" Building sources '%s'" % build)
+            self.info(" Building sources ")
             
             # create dir for build
             self.node.mkdir(self.build_dir)
@@ -259,7 +260,7 @@ class LinuxApplication(ResourceManager):
     def install(self):
         install = self.get("install")
         if install:
-            self.logger.debug(" Installing sources '%s'" % install)
+            self.info(" Installing sources ")
 
             cmd = self.replace_paths(install)
 
@@ -273,10 +274,15 @@ class LinuxApplication(ResourceManager):
         # Wait until node is associated and deployed
         node = self.node
         if not node or node.state < ResourceState.READY:
+            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
             self.ec.schedule(DELAY, self.deploy)
         else:
-            self.discover()
-            self.provision()
+            try:
+                self.discover()
+                self.provision()
+            except:
+                self._state = ResourceState.FAILED
+                raise
 
             super(LinuxApplication, self).deploy()
 
@@ -286,12 +292,12 @@ class LinuxApplication(ResourceManager):
         stdin = 'stdin' if self.get("stdin") else None
         sudo = self.get('sudo') or False
         x11 = self.get("forwardX11") or False
-        err_msg = "Failed to run command %s on host %s" % (
-                     command, self.node.get("hostname"))
         failed = False
 
         super(LinuxApplication, self).start()
 
+        self.info("Starting command %s" % command)
+
         if x11:
             (out, err), proc = self.node.execute(command,
                     sudo = sudo,
@@ -323,33 +329,40 @@ class LinuxApplication(ResourceManager):
 
         if failed or out or chkerr:
             # check if execution errors occurred
+            msg = " Failed to start command '%s' " % command
+            out = out
             if err:
-                err_msg = "%s. Proc error: %s" % (err_msg, err)
+                err = err
+            elif chkerr:
+                err = chkerr
 
-            err_msg = "%s. Run error: %s " % (err_msg, out)
+            self.error(msg, out, err)
 
-            if chkerr:
-                err_msg = "%s. Failed to check error: %s" % (err_msg, chkerr)
+            msg2 = " Setting state to Failed"
+            self.debug(msg2)
+            self._state = ResourceState.FAILED
 
-            self.logger.error(err_msg)
-            self.state = ResourceState.FAILED
+            raise RuntimeError, msg
 
     def stop(self):
         state = self.state
         if state == ResourceState.STARTED:
+            self.info("Stopping command %s" % command)
+
             (out, err), proc = self.node.kill(self.pid, self.ppid)
 
             if out or err:
                 # check if execution errors occurred
-                err_msg = " Failed to STOP command '%s' on host %s. Check error: %s. Run error: %s" % (
-                     self.get("command"), self.node.get("hostname"), err, out)
-                self.logger.error(err_msg)
+                msg = " Failed to STOP command '%s' " % self.get("command")
+                self.error(msg, out, err)
                 self._state = ResourceState.FAILED
                 stopped = False
             else:
                 super(LinuxApplication, self).stop()
 
     def release(self):
+        self.info("Releasing resource")
+
         tear_down = self.get("tearDown")
         if tear_down:
             self.node.execute(tear_down)
@@ -364,10 +377,14 @@ class LinuxApplication(ResourceManager):
             (out, err), proc = self.node.check_output(self.home, 'stderr')
 
             if out or err:
+                if err.find("No such file or directory") >= 0 :
+                    # The resource is marked as started, but the
+                    # command was not yet executed
+                    return ResourceState.READY
+
                 # check if execution errors occurred
-                err_msg = " Failed to execute command '%s' on host %s. Check error: %s. Run error: %s" % (
-                        self.get("command"), self.node.get("hostname"), err, out)
-                self.logger.error(err_msg)
+                msg = " Failed to execute command '%s'" % self.get("command")
+                self.error(msg, out, err)
                 self._state = ResourceState.FAILED
 
             elif self.pid and self.ppid:
index 23f87b5..f4c1cf2 100644 (file)
@@ -17,7 +17,10 @@ class LinuxChannel(ResourceManager):
 
     def __init__(self, ec, guid):
         super(LinuxChannel, self).__init__(ec, guid)
-        self._logger = logging.getLogger("neco.linux.Channel.%d " % self.guid)
+        self._logger = logging.getLogger("LinuxChannel")
+
+    def log_message(self, msg):
+        return " guid %d - %s " % (self.guid, msg)
 
     def valid_connection(self, guid):
         # TODO: Validate!
index a030eb4..f7c9daa 100644 (file)
@@ -13,7 +13,7 @@ import time
 import threading
 
 # TODO: Verify files and dirs exists already
-# TODO: Blacklist node!
+# TODO: Blacklist nodes!
 
 DELAY ="1s"
 
@@ -70,7 +70,11 @@ class LinuxNode(ResourceManager):
         # lock to avoid concurrency issues on methods used by applications 
         self._lock = threading.Lock()
 
-        self._logger = logging.getLogger("neco.linux.Node.%d " % self.guid)
+        self._logger = logging.getLogger("LinuxNode")
+    
+    def log_message(self, msg):
+        return " guid %d - host %s - %s " % (self.guid, 
+                self.get("hostname"), msg)
 
     @property
     def home(self):
@@ -92,16 +96,16 @@ class LinuxNode(ResourceManager):
             return self._os
 
         if (not self.get("hostname") or not self.get("username")):
-            msg = "Can't resolve OS for guid %d. Insufficient data." % self.guid
-            self.logger.error(msg)
+            msg = "Can't resolve OS, insufficient data "
+            self.error(msg)
             raise RuntimeError, msg
 
-        (out, err), proc = self.execute("cat /etc/issue")
+        (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
 
         if err and proc.poll():
-            msg = "Error detecting OS for host %s. err: %s " % (self.get("hostname"), err)
-            self.logger.error(msg)
-            raise RuntimeError, msg
+            msg = "Error detecting OS "
+            self.error(msg, out, err)
+            raise RuntimeError, "%s - %s - %s" %( msg, out, err )
 
         if out.find("Fedora release 12") == 0:
             self._os = "f12"
@@ -112,9 +116,9 @@ class LinuxNode(ResourceManager):
         elif out.find("Ubuntu") ==0:
             self._os = "ubuntu"
         else:
-            msg = "Unsupported OS %s for host %s" % (out, self.get("hostname"))
-            self.logger.error(msg)
-            raise RuntimeError, msg
+            msg = "Unsupported OS"
+            self.error(msg, out)
+            raise RuntimeError, "%s - %s " %( msg, out )
 
         return self._os
 
@@ -125,7 +129,7 @@ class LinuxNode(ResourceManager):
     def provision(self, filters = None):
         if not self.is_alive():
             self._state = ResourceState.FAILED
-            self.logger.error("Deploy failed. Unresponsive node")
+            self.error("Deploy failed. Unresponsive node")
             return
 
         if self.get("cleanProcesses"):
@@ -140,8 +144,12 @@ class LinuxNode(ResourceManager):
 
     def deploy(self):
         if self.state == ResourceState.NEW:
-            self.discover()
-            self.provision()
+            try:
+               self.discover()
+               self.provision()
+            except:
+                self._state = ResourceState.FAILED
+                raise
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
@@ -165,29 +173,36 @@ class LinuxNode(ResourceManager):
         # TODO: Validate!
         return True
 
-    def clean_processes(self):
-        self.logger.info("Cleaning up processes")
+    def clean_processes(self, killer = False):
+        self.info("Cleaning up processes")
         
-        cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
-            "sudo -S killall python tcpdump || /bin/true ; " +
-            "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
-            "sudo -S killall -u root || /bin/true ; " +
-            "sudo -S killall -u root || /bin/true ; ")
+        if killer:
+            # Hardcore kill
+            cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
+                "sudo -S killall python tcpdump || /bin/true ; " +
+                "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
+                "sudo -S killall -u root || /bin/true ; " +
+                "sudo -S killall -u root || /bin/true ; ")
+        else:
+            # Be gentler...
+            cmd = ("sudo -S killall tcpdump || /bin/true ; " +
+                "sudo -S killall tcpdump || /bin/true ; " +
+                "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
+                "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
+
 
         out = err = ""
-        with self._lock:
-           (out, err), proc = self.execute(cmd) 
+        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
             
     def clean_home(self):
-        self.logger.info("Cleaning up home")
+        self.info("Cleaning up home")
 
         cmd = ("cd %s ; " % self.home +
             "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
             " -execdir rm -rf {} + ")
 
         out = err = ""
-        with self._lock:
-            (out, err), proc = self.execute(cmd)
+        (out, err), proc = self.execute(cmd, with_lock = True)
 
     def upload(self, src, dst, text = False):
         """ Copy content to destination
@@ -235,18 +250,16 @@ class LinuxNode(ResourceManager):
         elif self.os in ["debian", "ubuntu"]:
             cmd = debfuncs.install_packages_command(self.os, packages)
         else:
-            msg = "Error installing packages. OS not known for host %s " % (
-                    self.get("hostname"))
-            self.logger.error(msg)
+            msg = "Error installing packages ( OS not known ) "
+            self.error(msg, self.os)
             raise RuntimeError, msg
 
         out = err = ""
-        with self._lock:
-            (out, err), proc = self.run_and_wait(cmd, home, 
-                pidfile = "instpkg_pid",
-                stdout = "instpkg_log", 
-                stderr = "instpkg_err", 
-                raise_on_error = True)
+        (out, err), proc = self.run_and_wait(cmd, home, 
+            pidfile = "instpkg_pid",
+            stdout = "instpkg_log", 
+            stderr = "instpkg_err", 
+            raise_on_error = True)
 
         return (out, err), proc 
 
@@ -259,18 +272,16 @@ class LinuxNode(ResourceManager):
         elif self.os in ["debian", "ubuntu"]:
             cmd = debfuncs.remove_packages_command(self.os, packages)
         else:
-            msg = "Error removing packages. OS not known for host %s " % (
-                    self.get("hostname"))
-            self.logger.error(msg)
+            msg = "Error removing packages ( OS not known ) "
+            self.error(msg)
             raise RuntimeError, msg
 
         out = err = ""
-        with self._lock:
-            (out, err), proc = self.run_and_wait(cmd, home, 
-                pidfile = "rmpkg_pid",
-                stdout = "rmpkg_log", 
-                stderr = "rmpkg_err", 
-                raise_on_error = True)
+        (out, err), proc = self.run_and_wait(cmd, home, 
+            pidfile = "rmpkg_pid",
+            stdout = "rmpkg_log", 
+            stderr = "rmpkg_err", 
+            raise_on_error = True)
          
         return (out, err), proc 
 
@@ -278,10 +289,10 @@ class LinuxNode(ResourceManager):
         if clean:
             self.rmdir(path)
 
-        return self.execute("mkdir -p %s" % path)
+        return self.execute("mkdir -p %s" % path, with_lock = True)
 
     def rmdir(self, path):
-        return self.execute("rm -rf %s" % path)
+        return self.execute("rm -rf %s" % path, with_lock = True)
 
     def run_and_wait(self, command, 
             home = ".", 
@@ -307,9 +318,8 @@ class LinuxNode(ResourceManager):
 
         # check no errors occurred
         if proc.poll() and err:
-            msg = " Failed to run command %s on host %s" % (
-                    command, self.get("hostname"))
-            self.logger.error(msg)
+            msg = " Failed to run command '%s' " % command
+            self.error(msg, out, err)
             if raise_on_error:
                 raise RuntimeError, msg
 
@@ -326,12 +336,9 @@ class LinuxNode(ResourceManager):
         (out, err), proc = self.check_output(home, stderr)
 
         if err or out:
-            msg = "Error while running command %s on host %s. error output: %s" % (
-                    command, self.get("hostname"), out)
-            if err:
-                msg += " . err: %s" % err
+            msg = " Failed to run command '%s' " % command
+            self.error(msg, out, err)
 
-            self.logger.error(msg)
             if raise_on_error:
                 raise RuntimeError, msg
         
@@ -352,9 +359,10 @@ class LinuxNode(ResourceManager):
                 time.sleep(delay)
                 delay = min(30,delay*1.2)
         else:
-            msg = " Failed to get pid for pidfile %s/%s on host %s" % (
-                    home, pidfile, self.get("hostname"))
-            self.logger.error(msg)
+            msg = " Failed to get pid for pidfile %s/%s " % (
+                    home, pidfile )
+            self.error(msg)
+            
             if raise_on_error:
                 raise RuntimeError, msg
 
@@ -387,7 +395,7 @@ class LinuxNode(ResourceManager):
     def check_output(self, home, filename):
         """ checks file content """
         (out, err), proc = self.execute("cat %s" % 
-                os.path.join(home, filename))
+            os.path.join(home, filename), with_lock = True)
         return (out, err), proc
 
     def is_alive(self):
@@ -396,19 +404,19 @@ class LinuxNode(ResourceManager):
 
         out = err = ""
         try:
-            (out, err), proc = self.execute("echo 'ALIVE'")
+            (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
         except:
             import traceback
             trace = traceback.format_exc()
-            self.logger.warn("Unresponsive host %s. got:\n out: %s err: %s\n traceback: %s", 
-                    self.get("hostname"), out, err, trace)
+            msg = "Unresponsive host "
+            self.warn(msg, out, trace)
             return False
 
         if out.strip().startswith('ALIVE'):
             return True
         else:
-            self.logger.warn("Unresponsive host %s. got:\n%s%s", 
-                    self.get("hostname"), out, err)
+            msg = "Unresponsive host "
+            self.warn(msg, out, err)
             return False
 
             # TODO!
@@ -420,12 +428,13 @@ class LinuxNode(ResourceManager):
             (out, err), proc =  execfuncs.lcopy(source, dest, 
                     recursive = True)
         else:
-            (out, err), proc = self.safe_retry(sshfuncs.rcopy)(
-                src, dst, 
-                port = self.get("port"),
-                identity = self.get("identity"),
-                server_key = self.get("serverKey"),
-                recursive = True)
+            with self._lock:
+                (out, err), proc = sshfuncs.rcopy(
+                    src, dst, 
+                    port = self.get("port"),
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    recursive = True)
 
         return (out, err), proc
 
@@ -436,10 +445,11 @@ class LinuxNode(ResourceManager):
             tty = False,
             forward_x11 = False,
             timeout = None,
-            retry = 0,
+            retry = 3,
             err_on_timeout = True,
             connect_timeout = 30,
-            persistent = True
+            persistent = True,
+            with_lock = False
             ):
         """ Notice that this invocation will block until the
         execution finishes. If this is not the desired behavior,
@@ -452,7 +462,29 @@ class LinuxNode(ResourceManager):
                     stdin = stdin,
                     env = env)
         else:
-            (out, err), proc = self.safe_retry(sshfuncs.rexec)(
+            if with_lock:
+                with self._lock:
+                    (out, err), proc = sshfuncs.rexec(
+                        command, 
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        sudo = sudo,
+                        stdin = stdin,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey"),
+                        env = env,
+                        tty = tty,
+                        forward_x11 = forward_x11,
+                        timeout = timeout,
+                        retry = retry,
+                        err_on_timeout = err_on_timeout,
+                        connect_timeout = connect_timeout,
+                        persistent = persistent
+                        )
+            else:
+                (out, err), proc = sshfuncs.rexec(
                     command, 
                     host = self.get("hostname"),
                     user = self.get("username"),
@@ -483,7 +515,7 @@ class LinuxNode(ResourceManager):
             stderr = 'stderr', 
             sudo = False):
 
-        self.logger.info("Running %s", command)
+        self.debug("Running %s" % command)
         
         if self.localhost:
             (out, err), proc = execfuncs.lspawn(command, pidfile, 
@@ -497,22 +529,23 @@ class LinuxNode(ResourceManager):
         else:
             # Start process in a "daemonized" way, using nohup and heavy
             # stdin/out redirection to avoid connection issues
-            (out,err), proc = self.safe_retry(sshfuncs.rspawn)(
-                command,
-                pidfile = pidfile,
-                home = home,
-                create_home = create_home,
-                stdin = stdin if stdin is not None else '/dev/null',
-                stdout = stdout if stdout else '/dev/null',
-                stderr = stderr if stderr else '/dev/null',
-                sudo = sudo,
-                host = self.get("hostname"),
-                user = self.get("username"),
-                port = self.get("port"),
-                agent = True,
-                identity = self.get("identity"),
-                server_key = self.get("serverKey")
-                )
+            with self._lock:
+                (out,err), proc = sshfuncs.rspawn(
+                    command,
+                    pidfile = pidfile,
+                    home = home,
+                    create_home = create_home,
+                    stdin = stdin if stdin is not None else '/dev/null',
+                    stdout = stdout if stdout else '/dev/null',
+                    stderr = stderr if stderr else '/dev/null',
+                    sudo = sudo,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey")
+                    )
 
         return (out, err), proc
 
@@ -520,24 +553,9 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
         else:
-            pidtuple = sshfuncs.rcheckpid(
-                os.path.join(home, pidfile),
-                host = self.get("hostname"),
-                user = self.get("username"),
-                port = self.get("port"),
-                agent = True,
-                identity = self.get("identity"),
-                server_key = self.get("serverKey")
-                )
-        
-        return pidtuple
-    
-    def status(self, pid, ppid):
-        if self.localhost:
-            status = execfuncs.lstatus(pid, ppid)
-        else:
-            status = sshfuncs.rstatus(
-                    pid, ppid,
+            with self._lock:
+                pidtuple = sshfuncs.rcheckpid(
+                    os.path.join(home, pidfile),
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
@@ -545,6 +563,23 @@ class LinuxNode(ResourceManager):
                     identity = self.get("identity"),
                     server_key = self.get("serverKey")
                     )
+        
+        return pidtuple
+    
+    def status(self, pid, ppid):
+        if self.localhost:
+            status = execfuncs.lstatus(pid, ppid)
+        else:
+            with self._lock:
+                status = sshfuncs.rstatus(
+                        pid, ppid,
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey")
+                        )
            
         return status
     
@@ -557,16 +592,17 @@ class LinuxNode(ResourceManager):
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
-                (out, err), proc = self.safe_retry(sshfuncs.rkill)(
-                    pid, ppid,
-                    host = self.get("hostname"),
-                    user = self.get("username"),
-                    port = self.get("port"),
-                    agent = True,
-                    sudo = sudo,
-                    identity = self.get("identity"),
-                    server_key = self.get("serverKey")
-                    )
+                with self._lock:
+                    (out, err), proc = sshfuncs.rkill(
+                        pid, ppid,
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        sudo = sudo,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey")
+                        )
         return (out, err), proc
 
     def check_bad_host(self, out, err):
@@ -578,41 +614,7 @@ class LinuxNode(ResourceManager):
 
     def blacklist(self):
         # TODO!!!!
-        self.logger.warn("Blacklisting malfunctioning node %s", self.hostname)
+        self.warn(" Blacklisting malfunctioning node ")
         #import util
         #util.appendBlacklist(self.hostname)
 
-    def safe_retry(self, func):
-        """Retries a function invocation using a lock"""
-        import functools
-        @functools.wraps(func)
-        def rv(*p, **kw):
-            fail_msg = " Failed to execute function %s(%s, %s) at host %s" % (
-                func.__name__, p, kw, self.get("hostname"))
-            retry = kw.pop("_retry", False)
-            wlock = kw.pop("_with_lock", False)
-
-            out = err = ""
-            proc = None
-            for i in xrange(0 if retry else 4):
-                try:
-                    if wlock:
-                        with self._lock:
-                            (out, err), proc = func(*p, **kw)
-                    else:
-                        (out, err), proc = func(*p, **kw)
-                        
-                    if proc.poll():
-                        if retry:
-                            time.sleep(i*15)
-                            continue
-                        else:
-                            self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
-                    break
-                except RuntimeError, e:
-                    if i >= 3:
-                        self.logger.error("%s. error: %s", fail_msg, e.args)
-            return (out, err), proc
-
-        return rv
-
index 0589fb8..b5d8f0e 100644 (file)
@@ -12,7 +12,18 @@ import subprocess
 import time
 import tempfile
 
-logger = logging.getLogger("neco.execution.utils.sshfuncs")
+
+logger = logging.getLogger("sshfuncs")
+
+def log(msg, level, out = None, err = None):
+    if out:
+        msg += " - OUT: %s " % out
+
+    if err:
+        msg += " - ERROR: %s " % err
+
+    logger.log(level, msg)
+
 
 if hasattr(os, "devnull"):
     DEV_NULL = os.devnull
@@ -176,7 +187,7 @@ def rexec(command, host, user,
         env = None,
         tty = False,
         timeout = None,
-        retry = 0,
+        retry = 3,
         err_on_timeout = True,
         connect_timeout = 30,
         persistent = True,
@@ -226,7 +237,7 @@ def rexec(command, host, user,
 
     args.append(command)
 
-    for x in xrange(retry or 3):
+    for x in xrange(retry):
         # connects to the remote host and starts a remote connection
         proc = subprocess.Popen(args, 
                 stdout = subprocess.PIPE,
@@ -239,20 +250,31 @@ def rexec(command, host, user,
     
         try:
             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
-            logger.debug("COMMAND host %s, command %s, out %s, error %s" % (
-                host, " ".join(args), out, err))
+            msg = " rexec - host %s - command %s " % (host, " ".join(args))
+            log(msg, logging.DEBUG, out, err)
 
             if proc.poll():
+                skip = False
+
                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
                     # SSH error, can safely retry
-                    continue
+                    skip = True 
                 elif retry:
                     # Probably timed out or plain failed but can retry
+                    skip = True 
+                
+                if skip:
+                    t = x*2
+                    msg = "SLEEPING %d ... ATEMP %d - host %s - command %s " % ( 
+                            t, x, host, " ".join(args))
+                    log(msg, logging.DEBUG)
+
+                    time.sleep(t)
                     continue
             break
         except RuntimeError, e:
-            logger.debug("EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT ->  %s" % (
-                        host, " ".join(args), out, err, e.args))
+            msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
+            log(msg, logging.DEBUG, out, err)
 
             if retry <= 0:
                 raise
@@ -284,7 +306,8 @@ def rcopy(source, dest,
     in which case it is advised that the destination be a folder.
     """
     
-    logger.debug("SCP %s %s" % (source, dest))
+    msg = " rcopy - scp %s %s " % (source, dest)
+    log(msg, logging.DEBUG)
     
     if isinstance(source, file) and source.tell() == 0:
         source = source.name
index 17beb5a..445cef7 100644 (file)
@@ -67,12 +67,70 @@ class LinuxApplicationTestCase(unittest.TestCase):
         finally:
             ec.shutdown()
 
-    def test_deploy_fedora(self):
+    @skipIfNotAlive
+    def t_concurrency(self, host, user):
+        from neco.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(LinuxNode)
+        ResourceFactory.register_type(LinuxApplication)
+
+        ec = ExperimentController()
+        
+        node = ec.register_resource("LinuxNode")
+        ec.set(node, "hostname", host)
+        ec.set(node, "username", user)
+        ec.set(node, "cleanHome", True)
+        ec.set(node, "cleanProcesses", True)
+
+        apps = list()
+        for i in xrange(50):
+            app = ec.register_resource("LinuxApplication")
+            cmd = "ping -c5 %s" % self.target 
+            ec.set(app, "command", cmd)
+            ec.register_connection(app, node)
+            apps.append(app)
+
+        try:
+            ec.deploy()
+
+            while not all([ec.state(guid) == ResourceState.FINISHED \
+                    for guid in apps]):
+                time.sleep(0.5)
+
+            self.assertTrue(ec.state(node) == ResourceState.STARTED)
+            self.assertTrue(
+                   all([ec.state(guid) == ResourceState.FINISHED \
+                    for guid in apps])
+                    )
+
+            for app in apps:
+                stdout = ec.trace(app, 'stdout')
+                size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
+                self.assertEquals(len(stdout), size)
+                
+                block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1)
+                self.assertEquals(block, stdout[5:10])
+
+                path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
+                rm = ec.get_resource(app)
+                p = os.path.join(rm.home, 'stdout')
+                self.assertEquals(path, p)
+
+        finally:
+            ec.shutdown()
+
+    def test_ping_fedora(self):
         self.t_ping(self.fedora_host, self.fedora_user)
 
-    def test_deploy_ubuntu(self):
+    def test_fing_ubuntu(self):
         self.t_ping(self.ubuntu_host, self.ubuntu_user)
 
+    def test_concurrency_fedora(self):
+        self.t_concurrency(self.fedora_host, self.fedora_user)
+
+    def test_concurrency_ubuntu(self):
+        self.t_concurrency(self.ubuntu_host, self.ubuntu_user)
+
 
 if __name__ == '__main__':
     unittest.main()
index 885af79..369e022 100644 (file)
@@ -35,7 +35,7 @@ def skipIfNotAlive(func):
 def skipInteractive(func):
     name = func.__name__
     def wrapped(*args, **kwargs):
-        mode = os.environ.get("NEPI_INTERACTIVE", False) in ['True', 'true', 'yes', 'YES']
+        mode = os.environ.get("NEPI_INTERACTIVE", False).lower() in ['true', 'yes']
         if not mode:
             print "*** WARNING: Skipping test %s: Interactive mode off \n" % name
             return