Added unit tests for linux application
author Alina Quereilhac <alina.quereilhac@inria.fr>
Thu, 9 May 2013 18:02:51 +0000 (20:02 +0200)
committer Alina Quereilhac <alina.quereilhac@inria.fr>
Thu, 9 May 2013 18:02:51 +0000 (20:02 +0200)
14 files changed:
src/neco/execution/ec.py
src/neco/execution/resource.py
src/neco/resources/linux/application.py
src/neco/resources/linux/debfuncs.py
src/neco/resources/linux/node.py
src/neco/resources/linux/rpmfuncs.py
src/neco/resources/omf/omf_interface.py
src/neco/util/sshfuncs.py
test/execution/ec.py
test/execution/resource.py
test/resources/linux/application.py
test/resources/linux/interface.py
test/resources/linux/node.py
test/resources/linux/test_utils.py

index c9e4e06..ba064d4 100644 (file)
@@ -13,6 +13,12 @@ from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
 from neco.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
+# TODO: Improve speed. Too slow... !!
+
+class ECState(object):
+    RUNNING = 1
+    FAILED = 2
+    TERMINATED = 3
 
 class ExperimentController(object):
     def __init__(self, exp_id = None, root_dir = "/tmp"): 
@@ -29,9 +35,6 @@ class ExperimentController(object):
         # Resource managers
         self._resources = dict()
 
-        # Resource managers
-        self._group = dict()
-
         # Scheduler
         self._scheduler = HeapScheduler()
 
@@ -39,11 +42,14 @@ class ExperimentController(object):
         self._tasks = dict()
 
         # Event processing thread
-        self._stop = False
         self._cond = threading.Condition()
         self._thread = threading.Thread(target = self._process)
+        self._thread.setDaemon(True)
         self._thread.start()
 
+        # EC state
+        self._state = ECState.RUNNING
+
         # Logging
         self._logger = logging.getLogger("ExperimentController")
 
@@ -51,6 +57,10 @@ class ExperimentController(object):
     def logger(self):
         return self._logger
 
+    @property
+    def ecstate(self):
+        return self._state
+
     @property
     def exp_id(self):
         exp_id = self._exp_id
@@ -58,6 +68,15 @@ class ExperimentController(object):
             exp_id = "nepi-" + exp_id
         return exp_id
 
+    @property
+    def finished(self):
+        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+
+    def wait_finished(self, guids):
+        while not all([self.state(guid) == ResourceState.FINISHED \
+                for guid in guids]) and not self.finished:
+            time.sleep(1)
+    
     def get_task(self, tid):
         return self._tasks.get(tid)
 
@@ -80,16 +99,6 @@ class ExperimentController(object):
 
         return guid
 
-    def register_group(self, group):
-        guid = self._guid_generator.next()
-
-        if not isinstance(group, list):
-            group = [group] 
-
-        self._groups[guid] = group
-
-        return guid
-
     def get_attributes(self, guid):
         rm = self.get_resource(guid)
         return rm.get_attributes()
@@ -278,10 +287,15 @@ class ExperimentController(object):
 
             thread = threading.Thread(target = steps, args = (rm,))
             threads.append(thread)
+            thread.setDaemon(True)
             thread.start()
 
-        for thread in threads:
-            thread.join()
+        while list(threads) and not self.finished:
+            thread = threads[0]
+            # Time out after 5 seconds to check EC not terminated
+            thread.join(5)
+            if not thread.is_alive():
+                threads.remove(thread)
 
     def release(self, group = None):
         if not group:
@@ -292,18 +306,25 @@ class ExperimentController(object):
             rm = self.get_resource(guid)
             thread = threading.Thread(target=rm.release)
             threads.append(thread)
+            thread.setDaemon(True)
             thread.start()
 
-        for thread in threads:
-            thread.join()
+        while list(threads) and not self.finished:
+            thread = threads[0]
+            # Time out after 5 seconds to check EC not terminated
+            thread.join(5)
+            if not thread.is_alive():
+                threads.remove(thread)
+        
+        self._state = ECState.TERMINATED
 
     def shutdown(self):
         self.release()
         
-        self._stop = True
         self._cond.acquire()
         self._cond.notify()
         self._cond.release()
+
         if self._thread.is_alive():
            self._thread.join()
 
@@ -342,7 +363,7 @@ class ExperimentController(object):
         runner.start()
 
         try:
-            while not self._stop:
+            while not self.finished:
                 self._cond.acquire()
                 task = self._scheduler.next()
                 self._cond.release()
@@ -369,11 +390,19 @@ class ExperimentController(object):
                     else:
                         # Process tasks in parallel
                         runner.put(self._execute, task)
-        except:  
+                
+        except: 
             import traceback
             err = traceback.format_exc()
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
+            self._state = ECState.FAILED
+            return
+   
+        # Mark EC state as terminated
+        if self.ecstate == ECState.RUNNING:
+            self._state = ECState.TERMINATED
+
     def _execute(self, task):
         # Invoke callback
         task.status = TaskStatus.DONE
@@ -383,8 +412,19 @@ class ExperimentController(object):
         except:
             import traceback
             err = traceback.format_exc()
-            self._logger.error("Error while executing event: %s" % err)
-
             task.result = err
             task.status = TaskStatus.ERROR
+            
+            self._logger.error("Error occurred while executing task: %s" % err)
+
+            # Mark the EC as failed
+            self._state = ECState.FAILED
+
+            # Wake up the EC in case it was sleeping
+            self._cond.acquire()
+            self._cond.notify()
+            self._cond.release()
+
+            # Propagate error to the ParallelRunner
+            raise
 
index 322e276..0eb0d64 100644 (file)
@@ -3,10 +3,13 @@ from neco.execution.trace import TraceAttr
 
 import copy
 import functools
+import inspect
 import logging
+import os
+import pkgutil
 import weakref
 
-_reschedule_delay = "1s"
+reschedule_delay = "0.5s"
 
 class ResourceAction:
     DEPLOY = 0
@@ -374,7 +377,7 @@ class ResourceManager(object):
 
         """
         reschedule = False
-        delay = _reschedule_delay 
+        delay = reschedule_delay 
 
         # check state and time elapsed on all RMs
         for guid in group:
@@ -429,7 +432,7 @@ class ResourceManager(object):
         """
 
         reschedule = False
-        delay = _reschedule_delay 
+        delay = reschedule_delay 
 
         ## evaluate if set conditions are met
 
@@ -452,7 +455,7 @@ class ResourceManager(object):
 
         """
         reschedule = False
-        delay = _reschedule_delay 
+        delay = reschedule_delay 
 
         ## evaluate if set conditions are met
 
@@ -483,7 +486,7 @@ class ResourceManager(object):
 
         """
         reschedule = False
-        delay = _reschedule_delay 
+        delay = reschedule_delay 
 
         ## evaluate if set conditions are met
 
@@ -553,3 +556,44 @@ class ResourceFactory(object):
         rclass = cls._resource_types[rtype]
         return rclass(ec, guid)
 
+def populate_factory():
+    for rclass in find_types():
+        ResourceFactory.register_type(rclass)
+
+def find_types():
+    search_path = os.environ.get("NECO_SEARCH_PATH", "")
+    search_path = set(search_path.split(" "))
+   
+    import neco.resources 
+    path = os.path.dirname(neco.resources.__file__)
+    search_path.add(path)
+
+    types = []
+
+    for importer, modname, ispkg in pkgutil.walk_packages(search_path):
+        loader = importer.find_module(modname)
+        try:
+            module = loader.load_module(loader.fullname)
+            for attrname in dir(module):
+                if attrname.startswith("_"):
+                    continue
+
+                attr = getattr(module, attrname)
+
+                if attr == ResourceManager:
+                    continue
+
+                if not inspect.isclass(attr):
+                    continue
+
+                if issubclass(attr, ResourceManager):
+                    types.append(attr)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            logger = logging.getLogger("Resource.find_types()")
+            logger.error("Error while lading Resource Managers %s" % err)
+
+    return types
+
+
index bdd271b..b7b3f4e 100644 (file)
@@ -7,7 +7,7 @@ from neco.util import sshfuncs
 import logging
 import os
 
-DELAY ="1s"
+reschedule_delay = "0.5s"
 
 # TODO: Resolve wildcards in commands!! 
 
@@ -62,12 +62,6 @@ class LinuxApplication(ResourceManager):
         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
-        update_home = Attribute("updateHome", "If application hash has changed remove old directory and"
-                "re-upload before starting experiment. If not keep the same directory", 
-                default = True,
-                type = Types.Bool, 
-                flags = Flags.ExecReadOnly)
-
         tear_down = Attribute("tearDown", "Bash script to be executed before "
                 "releasing the resource", 
                 flags = Flags.ReadOnly)
@@ -84,7 +78,6 @@ class LinuxApplication(ResourceManager):
         cls._register_attribute(stdin)
         cls._register_attribute(stdout)
         cls._register_attribute(stderr)
-        cls._register_attribute(update_home)
         cls._register_attribute(tear_down)
 
     @classmethod
@@ -116,16 +109,16 @@ class LinuxApplication(ResourceManager):
         return None
 
     @property
-    def home(self):
+    def app_home(self):
         return os.path.join(self.node.exp_dir, self._home)
 
     @property
     def src_dir(self):
-        return os.path.join(self.home, 'src')
+        return os.path.join(self.app_home, 'src')
 
     @property
     def build_dir(self):
-        return os.path.join(self.home, 'build')
+        return os.path.join(self.app_home, 'build')
 
     @property
     def pid(self):
@@ -136,7 +129,9 @@ class LinuxApplication(ResourceManager):
         return self._ppid
 
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
-        path = os.path.join(self.home, name)
+        self.info("Retrieving '%s' trace %s " % (name, attr))
+
+        path = os.path.join(self.app_home, name)
         
         cmd = "(test -f %s && echo 'success') || echo 'error'" % path
         (out, err), proc = self.node.execute(cmd)
@@ -150,7 +145,7 @@ class LinuxApplication(ResourceManager):
             return path
 
         if attr == TraceAttr.ALL:
-            (out, err), proc = self.node.check_output(self.home, name)
+            (out, err), proc = self.node.check_output(self.app_home, name)
             
             if err and proc.poll():
                 msg = " Couldn't read trace %s " % name
@@ -177,10 +172,8 @@ class LinuxApplication(ResourceManager):
         return out
             
     def provision(self, filters = None):
-        # TODO: verify home hash or clean home
-
         # create home dir for application
-        self.node.mkdir(self.home)
+        self.node.mkdir(self.app_home)
 
         # upload sources
         self.upload_sources()
@@ -197,10 +190,20 @@ class LinuxApplication(ResourceManager):
         # Install
         self.install()
 
+        command = self.replace_paths(self.get("command"))
+        x11 = self.get("forwardX11") or False
+        if not x11:
+            self.info("Uploading command '%s'" % command)
+            
+            # If the command runs asynchronously, pre-upload the command
+            # to the app.sh file on the remote host
+            dst = os.path.join(self.app_home, "app.sh")
+            self.node.upload(command, dst, text = True)
+
         super(LinuxApplication, self).provision()
 
     def upload_sources(self):
-        # check if sources need to be uploaded and upload them
+        # TODO: check if sources need to be uploaded and upload them
         sources = self.get("sources")
         if sources:
             self.info(" Uploading sources ")
@@ -208,7 +211,7 @@ class LinuxApplication(ResourceManager):
             # create dir for sources
             self.node.mkdir(self.src_dir)
 
-            sources = self.sources.split(' ')
+            sources = sources.split(' ')
 
             http_sources = list()
             for source in list(sources):
@@ -219,6 +222,8 @@ class LinuxApplication(ResourceManager):
             # Download http sources
             for source in http_sources:
                 dst = os.path.join(self.src_dir, source.split("/")[-1])
+                # TODO: Check if the tar.gz is already downloaded using a hash
+                # and don't download twice !!
                 command = "wget -o %s %s" % (dst, source)
                 self.node.execute(command)
 
@@ -239,7 +244,7 @@ class LinuxApplication(ResourceManager):
         depends = self.get("depends")
         if depends:
             self.info(" Installing dependencies %s" % depends)
-            self.node.install_packages(depends, home = self.home)
+            self.node.install_packages(depends, home = self.app_home)
 
     def build(self):
         build = self.get("build")
@@ -251,9 +256,9 @@ class LinuxApplication(ResourceManager):
 
             cmd = self.replace_paths(build)
 
-            (out, err), proc = self.run_and_wait(cmd, self.home,
+            (out, err), proc = self.run_and_wait(cmd, self.app_home,
                 pidfile = "build_pid",
-                stdout = "build_log", 
+                stdout = "build_out", 
                 stderr = "build_err", 
                 raise_on_error = True)
  
@@ -264,18 +269,22 @@ class LinuxApplication(ResourceManager):
 
             cmd = self.replace_paths(install)
 
-            (out, err), proc = self.run_and_wait(cmd, self.home, 
+            (out, err), proc = self.run_and_wait(cmd, self.app_home, 
                 pidfile = "install_pid",
-                stdout = "install_log", 
+                stdout = "install_out", 
                 stderr = "install_err", 
                 raise_on_error = True)
 
     def deploy(self):
+        command = self.replace_paths(self.get("command"))
+        
+        self.info(" Deploying command '%s' " % command)
+
         # Wait until node is associated and deployed
         node = self.node
         if not node or node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
-            self.ec.schedule(DELAY, self.deploy)
+            self.ec.schedule(reschedule_delay, self.deploy)
         else:
             try:
                 self.discover()
@@ -296,7 +305,7 @@ class LinuxApplication(ResourceManager):
 
         super(LinuxApplication, self).start()
 
-        self.info("Starting command %s" % command)
+        self.info("Starting command '%s'" % command)
 
         if x11:
             (out, err), proc = self.node.execute(command,
@@ -310,7 +319,9 @@ class LinuxApplication(ResourceManager):
             if proc.poll() and err:
                 failed = True
         else:
-            (out, err), proc = self.node.run(command, self.home, 
+            # Run the command asynchronously
+            command = "bash ./app.sh"
+            (out, err), proc = self.node.run(command, self.app_home, 
                 stdin = stdin, 
                 sudo = sudo)
 
@@ -318,14 +329,14 @@ class LinuxApplication(ResourceManager):
                 failed = True
         
             if not failed:
-                pid, ppid = self.node.wait_pid(home = self.home)
+                pid, ppid = self.node.wait_pid(home = self.app_home)
                 if pid: self._pid = int(pid)
                 if ppid: self._ppid = int(ppid)
 
             if not self.pid or not self.ppid:
                 failed = True
  
-        (out, chkerr), proc = self.node.check_output(self.home, 'stderr')
+        (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
 
         if failed or out or chkerr:
             # check if execution errors occurred
@@ -374,7 +385,7 @@ class LinuxApplication(ResourceManager):
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
-            (out, err), proc = self.node.check_output(self.home, 'stderr')
+            (out, err), proc = self.node.check_output(self.app_home, 'stderr')
 
             if out or err:
                 if err.find("No such file or directory") >= 0 :
@@ -432,7 +443,7 @@ class LinuxApplication(ResourceManager):
         return ( command
             .replace("${SOURCES}", self.src_dir)
             .replace("${BUILD}", self.build_dir) 
-            .replace("${APPHOME}", self.home) 
+            .replace("${APPHOME}", self.app_home) 
             .replace("${NODEHOME}", self.node.home) )
 
 
index 7cf7260..fdc8d3c 100644 (file)
@@ -6,7 +6,7 @@ def install_packages_command(os, packages):
 
     cmd = ""
     for p in packages:
-        cmd += " ( dpkg -s %(package)s || sudo apt-get -y install %(package)s ) ; " % {
+        cmd += " ( dpkg -s %(package)s || sudo -S apt-get -y install %(package)s ) ; " % {
                 'package': p}
    
     #cmd = (dpkg -s vim || sudo dpkg -s install vim) ; (...)
@@ -18,7 +18,7 @@ def remove_packages_command(os, packages):
 
     cmd = ""
     for p in packages:
-        cmd += " ( dpkg -s %(package)s && sudo apt-get -y purge %(package)s ) ; " % {
+        cmd += " ( dpkg -s %(package)s && sudo -S apt-get -y purge %(package)s ) ; " % {
                 'package': p}
     
     #cmd = (dpkg -s vim || sudo apt-get -y purge vim) ; (...)
index f7c9daa..4907c21 100644 (file)
@@ -14,8 +14,9 @@ import threading
 
 # TODO: Verify files and dirs exists already
 # TODO: Blacklist nodes!
+# TODO: Unify delays!!
 
-DELAY ="1s"
+reschedule_delay = "0.5s"
 
 @clsinit
 class LinuxNode(ResourceManager):
@@ -83,12 +84,13 @@ class LinuxNode(ResourceManager):
     @property
     def exp_dir(self):
         exp_dir = os.path.join(self.home, self.ec.exp_id)
-        return exp_dir if exp_dir.startswith('/') else "${HOME}/"
+        return exp_dir if exp_dir.startswith('/') or \
+                exp_dir.startswith("~/") else "~/"
 
     @property
-    def node_dir(self):
-        node_dir = "node-%d" % self.guid
-        return os.path.join(self.exp_dir, node_dir)
+    def node_home(self):
+        node_home = "node-%d" % self.guid
+        return os.path.join(self.exp_dir, node_home)
 
     @property
     def os(self):
@@ -138,7 +140,7 @@ class LinuxNode(ResourceManager):
         if self.get("cleanHome"):
             self.clean_home()
        
-        self.mkdir(self.node_dir)
+        self.mkdir(self.node_home)
 
         super(LinuxNode, self).provision()
 
@@ -157,7 +159,7 @@ class LinuxNode(ResourceManager):
         ifaces = self.get_connected(LinuxInterface.rtype())
         for iface in ifaces:
             if iface.state < ResourceState.READY:
-                self.ec.schedule(DELAY, self.deploy)
+                self.ec.schedule(reschedule_delay, self.deploy)
                 return 
 
         super(LinuxNode, self).deploy()
@@ -190,7 +192,6 @@ class LinuxNode(ResourceManager):
                 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
 
-
         out = err = ""
         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
             
@@ -242,7 +243,7 @@ class LinuxNode(ResourceManager):
         return self.copy(src, dst)
 
     def install_packages(self, packages, home = None):
-        home = home or self.node_dir
+        home = home or self.node_home
 
         cmd = ""
         if self.os in ["f12", "f14"]:
@@ -257,14 +258,14 @@ class LinuxNode(ResourceManager):
         out = err = ""
         (out, err), proc = self.run_and_wait(cmd, home, 
             pidfile = "instpkg_pid",
-            stdout = "instpkg_log", 
-            stderr = "instpkg_err", 
+            stdout = "instpkg_out", 
+            stderr = "instpkg_err",
             raise_on_error = True)
 
         return (out, err), proc 
 
     def remove_packages(self, packages, home = None):
-        home = home or self.node_dir
+        home = home or self.node_home
 
         cmd = ""
         if self.os in ["f12", "f14"]:
@@ -279,8 +280,8 @@ class LinuxNode(ResourceManager):
         out = err = ""
         (out, err), proc = self.run_and_wait(cmd, home, 
             pidfile = "rmpkg_pid",
-            stdout = "rmpkg_log", 
-            stderr = "rmpkg_err", 
+            stdout = "rmpkg_out", 
+            stderr = "rmpkg_err",
             raise_on_error = True)
          
         return (out, err), proc 
@@ -301,6 +302,7 @@ class LinuxNode(ResourceManager):
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
+            tty = False,
             raise_on_error = False):
         """ runs a command in background on the remote host, but waits
             until the command finishes execution.
@@ -314,7 +316,8 @@ class LinuxNode(ResourceManager):
                 stdin = stdin, 
                 stdout = stdout, 
                 stderr = stderr, 
-                sudo = sudo)
+                sudo = sudo,
+                tty = tty)
 
         # check no errors occurred
         if proc.poll() and err:
@@ -395,7 +398,7 @@ class LinuxNode(ResourceManager):
     def check_output(self, home, filename):
         """ checks file content """
         (out, err), proc = self.execute("cat %s" % 
-            os.path.join(home, filename), with_lock = True)
+            os.path.join(home, filename), retry = 1, with_lock = True)
         return (out, err), proc
 
     def is_alive(self):
@@ -513,9 +516,10 @@ class LinuxNode(ResourceManager):
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
-            sudo = False):
+            sudo = False,
+            tty = False):
 
-        self.debug("Running %s" % command)
+        self.debug("Running command '%s'" % command)
         
         if self.localhost:
             (out, err), proc = execfuncs.lspawn(command, pidfile, 
@@ -544,7 +548,8 @@ class LinuxNode(ResourceManager):
                     port = self.get("port"),
                     agent = True,
                     identity = self.get("identity"),
-                    server_key = self.get("serverKey")
+                    server_key = self.get("serverKey"),
+                    tty = tty
                     )
 
         return (out, err), proc
index 7f44887..b5a7b3d 100644 (file)
@@ -9,7 +9,7 @@ def install_packages_command(os, packages):
 
     cmd = "( %s )" % install_rpmfusion_command(os)
     for p in packages:
-        cmd += " ; ( rpm -q %(package)s || sudo yum -y install %(package)s ) " % {
+        cmd += " ; ( rpm -q %(package)s || sudo -S yum -y install %(package)s ) " % {
             'package': p}
     
     #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim))
@@ -21,7 +21,7 @@ def remove_packages_command(os, packages):
 
     cmd = ""
     for p in packages:
-        cmd += " ( rpm -q %(package)s && sudo yum -y remove %(package)s ) ; " % {
+        cmd += " ( rpm -q %(package)s && sudo -S yum -y remove %(package)s ) ; " % {
                     'package': p}
     
     #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...)
index d436635..1bc0fc4 100644 (file)
@@ -35,7 +35,7 @@ class OMFWifiInterface(ResourceManager):
         """Register the attributes of an OMF interface 
 
         """
-        alias = Attribute("alias","Alias of the interface", default_value = "w0")
+        alias = Attribute("alias","Alias of the interface", default = "w0")
         mode = Attribute("mode","Mode of the interface")
         type = Attribute("type","Type of the interface")
         essid = Attribute("essid","Essid of the interface")
index b5d8f0e..043bc64 100644 (file)
@@ -265,7 +265,7 @@ def rexec(command, host, user,
                 
                 if skip:
                     t = x*2
-                    msg = "SLEEPING %d ... ATEMP %d - host %s - command %s " % ( 
+                    msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
                             t, x, host, " ".join(args))
                     log(msg, logging.DEBUG)
 
index fba3cd5..9322540 100755 (executable)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 
-from neco.execution.ec import ExperimentController 
+from neco.execution.ec import ExperimentController, ECState 
 from neco.execution.scheduler import TaskStatus
 
 import datetime
@@ -13,21 +13,19 @@ class ExecuteControllersTestCase(unittest.TestCase):
             return 'hola!' 
 
         ec = ExperimentController()
+    
+        tid = ec.schedule("0s", myfunc, track=True)
         
-        try:
-            tid = ec.schedule("0s", myfunc, track=True)
-            
-            while True:
-                task = ec.get_task(tid)
-                if task.status != TaskStatus.NEW:
-                    break
+        while True:
+            task = ec.get_task(tid)
+            if task.status != TaskStatus.NEW:
+                break
 
-                time.sleep(1)
+            time.sleep(1)
 
-            self.assertEquals('hola!', task.result)
+        self.assertEquals('hola!', task.result)
 
-        finally:
-            ec.shutdown()
+        ec.shutdown()
 
     def test_schedule_date(self):
         def get_time():
@@ -35,25 +33,36 @@ class ExecuteControllersTestCase(unittest.TestCase):
 
         ec = ExperimentController()
 
-        try:
-            schedule_time = datetime.datetime.now()
-            
-            tid = ec.schedule("4s", get_time, track=True)
+        schedule_time = datetime.datetime.now()
+        
+        tid = ec.schedule("4s", get_time, track=True)
+
+        while True:
+            task = ec.get_task(tid)
+            if task.status != TaskStatus.NEW:
+                break
+
+            time.sleep(1)
 
-            while True:
-                task = ec.get_task(tid)
-                if task.status != TaskStatus.NEW:
-                    break
+        execution_time = task.result
+        delta = execution_time - schedule_time
+        self.assertTrue(delta > datetime.timedelta(seconds=4))
+        self.assertTrue(delta < datetime.timedelta(seconds=5))
 
-                time.sleep(1)
+        ec.shutdown()
 
-            execution_time = task.result
-            delta = execution_time - schedule_time
-            self.assertTrue(delta > datetime.timedelta(seconds=4))
-            self.assertTrue(delta < datetime.timedelta(seconds=5))
+    def test_schedule_exception(self):
+        def raise_error():
+            raise RuntimeError, "the error"
 
-        finally:
-            ec.shutdown()
+        ec = ExperimentController()
+        ec.schedule("2s", raise_error)
+
+        while ec.ecstate not in [ECState.FAILED, ECState.TERMINATED]:
+           time.sleep(1)
+        
+        self.assertEquals(ec.ecstate, ECState.FAILED)
+        ec.shutdown()
 
 
 if __name__ == '__main__':
index 8f5a2b5..128c6df 100755 (executable)
@@ -147,15 +147,14 @@ class ResourceManagerTestCase(unittest.TestCase):
         ec.register_connection(iface1, chan)
         ec.register_connection(iface2, chan)
 
-        try:
-            ec.deploy()
+        ec.deploy()
 
-            while not all([ ec.state(guid) == ResourceState.STARTED \
-                    for guid in [app1, app2, node1, node2, iface1, iface2, chan]]):
-                time.sleep(0.5)
+        while not all([ ec.state(guid) == ResourceState.STARTED \
+                for guid in [app1, app2, node1, node2, iface1, iface2, chan]]) \
+                and not ec.finished:
+            time.sleep(0.5)
 
-        finally:
-            ec.shutdown()
+        ec.shutdown()
 
         rmapp1 = ec.get_resource(app1)
         rmapp2 = ec.get_resource(app2)
index 445cef7..b6066f1 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 from neco.execution.ec import ExperimentController 
-from neco.execution.resource import ResourceState
+from neco.execution.resource import ResourceState, ResourceAction
 from neco.execution.trace import TraceAttr
 from neco.resources.linux.node import LinuxNode
 from neco.resources.linux.application import LinuxApplication
@@ -22,6 +22,38 @@ class LinuxApplicationTestCase(unittest.TestCase):
         
         self.target = 'nepi5.pl.sophia.inria.fr'
 
+    @skipIfNotAlive
+    def t_stdout(self, host, user):
+        from neco.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(LinuxNode)
+        ResourceFactory.register_type(LinuxApplication)
+
+        ec = ExperimentController()
+        
+        node = ec.register_resource("LinuxNode")
+        ec.set(node, "hostname", host)
+        ec.set(node, "username", user)
+        ec.set(node, "cleanHome", True)
+        ec.set(node, "cleanProcesses", True)
+
+        app = ec.register_resource("LinuxApplication")
+        cmd = "echo 'HOLA'"
+        ec.set(app, "command", cmd)
+        ec.register_connection(app, node)
+
+        ec.deploy()
+
+        ec.wait_finished([app])
+
+        self.assertTrue(ec.state(node) == ResourceState.STARTED)
+        self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+
+        stdout = ec.trace(app, 'stdout')
+        self.assertTrue(stdout.strip() == "HOLA")
+
+        ec.shutdown()
+
     @skipIfNotAlive
     def t_ping(self, host, user):
         from neco.execution.resource import ResourceFactory
@@ -43,15 +75,61 @@ class LinuxApplicationTestCase(unittest.TestCase):
         
         ec.register_connection(app, node)
 
-        try:
-            ec.deploy()
+        ec.deploy()
+
+        ec.wait_finished([app])
 
-            while not ec.state(app) == ResourceState.FINISHED:
-                time.sleep(0.5)
+        self.assertTrue(ec.state(node) == ResourceState.STARTED)
+        self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+
+        stdout = ec.trace(app, 'stdout')
+        size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
+        self.assertEquals(len(stdout), size)
+        
+        block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1)
+        self.assertEquals(block, stdout[5:10])
 
-            self.assertTrue(ec.state(node) == ResourceState.STARTED)
-            self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+        path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
+        rm = ec.get_resource(app)
+        p = os.path.join(rm.app_home, 'stdout')
+        self.assertEquals(path, p)
+
+        ec.shutdown()
+
+    @skipIfNotAlive
+    def t_concurrency(self, host, user):
+        from neco.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(LinuxNode)
+        ResourceFactory.register_type(LinuxApplication)
 
+        ec = ExperimentController()
+        
+        node = ec.register_resource("LinuxNode")
+        ec.set(node, "hostname", host)
+        ec.set(node, "username", user)
+        ec.set(node, "cleanHome", True)
+        ec.set(node, "cleanProcesses", True)
+
+        apps = list()
+        for i in xrange(50):
+            app = ec.register_resource("LinuxApplication")
+            cmd = "ping -c5 %s" % self.target 
+            ec.set(app, "command", cmd)
+            ec.register_connection(app, node)
+            apps.append(app)
+
+        ec.deploy()
+
+        ec.wait_finished(apps)
+
+        self.assertTrue(ec.state(node) == ResourceState.STARTED)
+        self.assertTrue(
+               all([ec.state(guid) == ResourceState.FINISHED \
+                for guid in apps])
+                )
+
+        for app in apps:
             stdout = ec.trace(app, 'stdout')
             size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
             self.assertEquals(len(stdout), size)
@@ -61,14 +139,13 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
             path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
             rm = ec.get_resource(app)
-            p = os.path.join(rm.home, 'stdout')
+            p = os.path.join(rm.app_home, 'stdout')
             self.assertEquals(path, p)
 
-        finally:
-            ec.shutdown()
+        ec.shutdown()
 
     @skipIfNotAlive
-    def t_concurrency(self, host, user):
+    def t_condition(self, host, user, depends):
         from neco.execution.resource import ResourceFactory
         
         ResourceFactory.register_type(LinuxNode)
@@ -82,47 +159,44 @@ class LinuxApplicationTestCase(unittest.TestCase):
         ec.set(node, "cleanHome", True)
         ec.set(node, "cleanProcesses", True)
 
-        apps = list()
-        for i in xrange(50):
-            app = ec.register_resource("LinuxApplication")
-            cmd = "ping -c5 %s" % self.target 
-            ec.set(app, "command", cmd)
-            ec.register_connection(app, node)
-            apps.append(app)
+        server = ec.register_resource("LinuxApplication")
+        cmd = "echo 'HOLA' | nc -l 3333"
+        ec.set(server, "command", cmd)
+        ec.set(server, "depends", depends)
+        ec.register_connection(server, node)
+
+        client = ec.register_resource("LinuxApplication")
+        cmd = "nc 127.0.0.1 3333"
+        ec.set(client, "command", cmd)
+        ec.register_connection(client, node)
+
+        ec.register_condition(client, ResourceAction.START, server, ResourceState.STARTED)
 
-        try:
-            ec.deploy()
+        apps = [client, server]
+        
+        ec.deploy()
+
+        ec.wait_finished(apps)
 
-            while not all([ec.state(guid) == ResourceState.FINISHED \
-                    for guid in apps]):
-                time.sleep(0.5)
+        self.assertTrue(ec.state(node) == ResourceState.STARTED)
+        self.assertTrue(ec.state(server) == ResourceState.FINISHED)
+        self.assertTrue(ec.state(client) == ResourceState.FINISHED)
 
-            self.assertTrue(ec.state(node) == ResourceState.STARTED)
-            self.assertTrue(
-                   all([ec.state(guid) == ResourceState.FINISHED \
-                    for guid in apps])
-                    )
+        stdout = ec.trace(client, 'stdout')
+        self.assertTrue(stdout.strip() == "HOLA")
 
-            for app in apps:
-                stdout = ec.trace(app, 'stdout')
-                size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
-                self.assertEquals(len(stdout), size)
-                
-                block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1)
-                self.assertEquals(block, stdout[5:10])
+        ec.shutdown()
 
-                path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
-                rm = ec.get_resource(app)
-                p = os.path.join(rm.home, 'stdout')
-                self.assertEquals(path, p)
+    def test_stdout_fedora(self):
+        self.t_stdout(self.fedora_host, self.fedora_user)
 
-        finally:
-            ec.shutdown()
+    def test_stdout_ubuntu(self):
+        self.t_stdout(self.ubuntu_host, self.ubuntu_user)
 
     def test_ping_fedora(self):
         self.t_ping(self.fedora_host, self.fedora_user)
 
-    def test_fing_ubuntu(self):
+    def test_ping_ubuntu(self):
         self.t_ping(self.ubuntu_host, self.ubuntu_user)
 
     def test_concurrency_fedora(self):
@@ -131,6 +205,13 @@ class LinuxApplicationTestCase(unittest.TestCase):
     def test_concurrency_ubuntu(self):
         self.t_concurrency(self.ubuntu_host, self.ubuntu_user)
 
+    def test_condition_fedora(self):
+        self.t_condition(self.fedora_host, self.fedora_user, "nc")
+
+    def test_condition_ubuntu(self):
+        self.t_condition(self.ubuntu_host, self.ubuntu_user, "netcat")
+
+    # TODO: add tests for compilation, sources, dependencies, etc.
 
 if __name__ == '__main__':
     unittest.main()
index 7b914d2..f94c7e6 100644 (file)
@@ -41,19 +41,17 @@ class LinuxInterfaceTestCase(unittest.TestCase):
         ec.register_connection(iface, node)
         ec.register_connection(iface, chan)
 
-        try:
-            ec.deploy()
+        ec.deploy()
 
-            while not all([ ec.state(guid) == ResourceState.STARTED \
-                    for guid in [node, iface]]):
-                time.sleep(0.5)
+        while not all([ ec.state(guid) == ResourceState.STARTED \
+                for guid in [node, iface]]) and not ec.finished:
+            time.sleep(0.5)
 
-            self.assertTrue(ec.state(node) == ResourceState.STARTED)
-            self.assertTrue(ec.state(iface) == ResourceState.STARTED)
-            self.assertTrue(ec.get(iface, "deviceName") == "eth0")
+        self.assertTrue(ec.state(node) == ResourceState.STARTED)
+        self.assertTrue(ec.state(iface) == ResourceState.STARTED)
+        self.assertTrue(ec.get(iface, "deviceName") == "eth0")
 
-        finally:
-            ec.shutdown()
+        ec.shutdown()
 
     def test_deploy_fedora(self):
         self.t_deploy(self.fedora_host, self.fedora_user)
index 075c353..cd56757 100644 (file)
@@ -75,6 +75,18 @@ class LinuxNodeTestCase(unittest.TestCase):
     def t_install(self, host, user):
         node, ec = create_node(host, user)
 
+        (out, err), proc = node.install_packages('gcc')
+        self.assertEquals(out, "")
+
+        (out, err), proc = node.remove_packages('gcc')
+        
+        self.assertEquals(out, "")
+
+
+    @skipIfNotAlive
+    def t_compile(self, host, user):
+        node, ec = create_node(host, user)
+
         app_home = os.path.join(node.exp_dir, "my-app")
         node.mkdir(app_home, clean = True)
 
@@ -142,6 +154,12 @@ main (void)
 
     def test_install_ubuntu(self):
         self.t_install(self.ubuntu_host, self.ubuntu_user)
+
+    def test_compile_fedora(self):
+        self.t_compile(self.fedora_host, self.fedora_user)
+
+    def test_compile_ubuntu(self):
+        self.t_compile(self.ubuntu_host, self.ubuntu_user)
     
     @skipInteractive
     def test_xterm_ubuntu(self):
index 369e022..1cf8aba 100644 (file)
@@ -35,7 +35,8 @@ def skipIfNotAlive(func):
 def skipInteractive(func):
     name = func.__name__
     def wrapped(*args, **kwargs):
-        mode = os.environ.get("NEPI_INTERACTIVE", False).lower() in ['true', 'yes']
+        mode = os.environ.get("NEPI_INTERACTIVE", False)
+        mode = mode and  mode.lower() in ['true', 'yes']
         if not mode:
             print "*** WARNING: Skipping test %s: Interactive mode off \n" % name
             return