Bugfixes for LinuxApplication and LinuxNode
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 14 Jun 2013 00:46:31 +0000 (17:46 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 14 Jun 2013 00:46:31 +0000 (17:46 -0700)
examples/linux/ccnx/vlc_2_hosts.py
src/nepi/execution/ec.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccnd.py [new file with mode: 0644]
src/nepi/resources/linux/interface.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/rpmfuncs.py
test/execution/resource.py
test/resources/linux/application.py

index f880fca..3235572 100755 (executable)
@@ -22,6 +22,7 @@
 from nepi.execution.ec import ExperimentController, ECState 
 from nepi.execution.resource import ResourceState, ResourceAction, \
         populate_factory
+from nepi.resources.linux.node import OSType
 
 from optparse import OptionParser, SUPPRESS_HELP
 
@@ -33,16 +34,16 @@ def add_node(ec, host, user, ssh_key = None):
     ec.set(node, "hostname", host)
     ec.set(node, "username", user)
     ec.set(node, "identity", ssh_key)
-    ec.set(node, "cleanHome", True)
+    #ec.set(node, "cleanHome", True)
     ec.set(node, "cleanProcesses", True)
     return node
 
 def add_ccnd(ec, os_type, peers):
-    if os_type == "f12":
+    if os_type == OSType.FEDORA:
         depends = ( " autoconf openssl-devel  expat-devel libpcap-devel "
                 " ecryptfs-utils-devel libxml2-devel automake gawk " 
                 " gcc gcc-c++ git pcre-devel make ")
-    elif os_type == "ubuntu":
+    else: # UBUNTU
         depends = ( " autoconf libssl-dev libexpat-dev libpcap-dev "
                 " libecryptfs0 libxml2-utils automake gawk gcc g++ "
                 " git-core pkg-config libpcre3-dev make ")
@@ -61,7 +62,7 @@ def add_ccnd(ec, os_type, peers):
              " ) && "
                 "cd ${SOURCES}/ccnx && "
                 # Just execute and silence warnings...
-                "(  ( ./configure && make )  2>&1 )"
+                "( ./configure && make ) "
          " )") 
 
     install = (
@@ -77,10 +78,10 @@ def add_ccnd(ec, os_type, peers):
     env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
 
     # BASH command -> ' ccndstart ; ccndc add ccnx:/ udp  host ;  ccnr '
-    command = "ccndstart ; "
+    command = "ccndstart && "
     peers = map(lambda peer: "ccndc add ccnx:/ udp  %s" % peer, peers)
-    command += " ; ".join(peers) + " ; "
-    command += " ccnr "
+    command += " ; ".join(peers) + " && "
+    command += " ccnr "
 
     app = ec.register_resource("LinuxApplication")
     ec.set(app, "depends", depends)
@@ -170,7 +171,7 @@ if __name__ == '__main__':
     node1 = add_node(ec, host1, pl_slice, pl_ssh_key)
     
     peers = [host2]
-    ccnd1 = add_ccnd(ec, "f12", peers)
+    ccnd1 = add_ccnd(ec, OSType.FEDORA, peers)
 
     ec.register_connection(ccnd1, node1)
 
index 5d0363e..c5dc10f 100644 (file)
@@ -138,22 +138,40 @@ class ExperimentController(object):
 
     def wait_finished(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reach the state FINISHED
+            reached the state FINISHED
 
+        :param guids: List of guids
+        :type guids: list
+        """
+        return self.wait(guids)
+
+    def wait_started(self, guids):
+        """ Blocking method that wait until all the RM from the 'guid' list 
+            reached the state STARTED
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
+
+    def wait(self, guids, states = [ResourceState.FINISHED]):
+        """ Blocking method that waits until all the RM from the 'guid' list 
+            reached state 'state' or until a failure occurs
+            
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
-        while not all([self.state(guid) in [ResourceState.FINISHED, 
-            ResourceState.STOPPED, 
-            ResourceState.FAILED] \
-                for guid in guids]) and not self.finished:
-            # We keep the sleep as large as possible to 
-            # decrese the number of RM state requests
+        while not all([self.state(guid) in states for guid in guids]) and \
+                not any([self.state(guid) in [
+                        ResourceState.STOPPED, 
+                        ResourceState.FAILED] for guid in guids]) and \
+                not self.finished:
+            # We keep the sleep big to decrease the number of RM state queries
             time.sleep(2)
-    
+   
     def get_task(self, tid):
         """ Get a specific task
 
@@ -448,8 +466,13 @@ class ExperimentController(object):
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not group:
-            group = self.resources
-
+            # By default, if not deployment group is indicated, 
+            # all RMs that are undeployed will be deployed
+            group = []
+            for guid in self.resources:
+                if self.state(guid) == ResourceState.NEW:
+                    group.append(guid)
+                
         if isinstance(group, int):
             group = [group]
 
@@ -651,6 +674,7 @@ class ExperimentController(object):
 
             self._state = ECState.FAILED
         finally:   
+            self._logger.info("Exiting the task processing loop ... ")
             runner.sync()
 
     def _execute(self, task):
index 5f4c998..e975cd2 100644 (file)
@@ -102,11 +102,9 @@ class LinuxApplication(ResourceManager):
     def _register_traces(cls):
         stdout = Trace("stdout", "Standard output stream")
         stderr = Trace("stderr", "Standard error stream")
-        buildlog = Trace("buildlog", "Output of the build process")
 
         cls._register_trace(stdout)
         cls._register_trace(stderr)
-        cls._register_trace(buildlog)
 
     def __init__(self, ec, guid):
         super(LinuxApplication, self).__init__(ec, guid)
@@ -222,6 +220,7 @@ class LinuxApplication(ResourceManager):
 
             # replace application specific paths in the command
             command = self.replace_paths(command)
+            env = env and self.replace_paths(env)
 
             self.node.upload_command(command, self.app_home, 
                     shfile = "app.sh",
@@ -248,13 +247,16 @@ class LinuxApplication(ResourceManager):
 
             # Download http sources remotely
             if http_sources:
-                command = " wget -c --directory-prefix=${SOURCES} "
-                check = ""
+                command = [" wget -c --directory-prefix=${SOURCES} "]
+                check = []
 
                 for source in http_sources:
-                    command += " %s " % (source)
-                    check += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+                    command.append(" %s " % (source))
+                    check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
                 
+                command = " ".join(command)
+                check = " ; ".join(check)
+
                 # Append the command to check that the sources were downloaded
                 command += " ; %s " % check
 
@@ -307,7 +309,7 @@ class LinuxApplication(ResourceManager):
             self.node.mkdir(self.build_dir)
 
             # replace application specific paths in the command
-            command = self.replace_paths(command)
+            command = self.replace_paths(build)
 
             # Upload the command to a file, and execute asynchronously
             self.node.run_and_wait(command, self.app_home,
@@ -323,7 +325,7 @@ class LinuxApplication(ResourceManager):
             self.info(" Installing sources ")
 
             # replace application specific paths in the command
-            command = self.replace_paths(command)
+            command = self.replace_paths(install)
 
             # Upload the command to a file, and execute asynchronously
             self.node.run_and_wait(command, self.app_home,
@@ -388,7 +390,7 @@ class LinuxApplication(ResourceManager):
                 for var in env.split(" "):
                     environ += ' %s ' % var
 
-                command = "(" + environ + " ; " + command + ")"
+                command = "{" + environ + " ; " + command + " ; }"
                 command = self.replace_paths(command)
 
             # If the command requires X11 forwarding, we
@@ -515,8 +517,4 @@ class LinuxApplication(ResourceManager):
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
-        # XXX: What if it is connected to more than one node?
-        resources = self.find_resources(exact_tags = [tags.NODE])
-        self._node = resources[0] if len(resources) == 1 else None
-        return self._node
 
diff --git a/src/nepi/resources/linux/ccnd.py b/src/nepi/resources/linux/ccnd.py
new file mode 100644 (file)
index 0000000..06a735e
--- /dev/null
@@ -0,0 +1,287 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.trace import Trace, TraceAttr
+from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.resources.linux.application import LinuxApplication
+from nepi.resources.linux.node import OSType
+
+import os
+
+@clsinit_copy
+class LinuxCCND(LinuxApplication):
+    _rtype = "LinuxCCND"
+
+    @classmethod
+    def _register_attributes(cls):
+        debug = Attribute("debug", "Sets the CCND_DEBUG environmental variable. "
+            " Allowed values are : \n"
+            "  0 - no messages \n"
+            "  1 - basic messages (any non-zero value gets these) \n"
+            "  2 - interest messages \n"
+            "  4 - content messages \n"
+            "  8 - matching details \n"
+            "  16 - interest details \n"
+            "  32 - gory interest details \n"
+            "  64 - log occasional human-readable timestamps \n"
+            "  128 - face registration debugging \n"
+            "  -1 - max logging \n"
+            "  Or apply bitwise OR to these values to get combinations of them",
+            flags = Flags.ExecReadOnly)
+
+        port = Attribute("port", "Sets the CCN_LOCAL_PORT environmental variable. "
+            "Defaults to 9695 ", 
+            flags = Flags.ExecReadOnly)
+        sockname = Attribute("sockname",
+            "Sets the CCN_LOCAL_SCOKNAME environmental variable. "
+            "Defaults to /tmp/.ccnd.sock", 
+            flags = Flags.ExecReadOnly)
+
+        capacity = Attribute("capacity",
+            "Sets the CCND_CAP environmental variable. "
+            "Capacity limit in terms of ContentObjects",
+            flags = Flags.ExecReadOnly)
+
+        mtu = Attribute("mtu", "Sets the CCND_MTU environmental variable. ",
+            flags = Flags.ExecReadOnly)
+  
+        data_pause = Attribute("dataPauseMicrosec",
+            "Sets the CCND_DATA_PAUSE_MICROSEC environmental variable. ",
+            flags = Flags.ExecReadOnly)
+
+        default_stale = Attribute("defaultTimeToStale",
+             "Sets the CCND_DEFAULT_TIME_TO_STALE environmental variable. ",
+            flags = Flags.ExecReadOnly)
+
+        max_stale = Attribute("maxTimeToStale",
+            "Sets the CCND_MAX_TIME_TO_STALE environmental variable. ",
+            flags = Flags.ExecReadOnly)
+
+        max_rte = Attribute("maxRteMicrosec",
+            "Sets the CCND_MAX_RTE_MICROSEC environmental variable. ",
+            flags = Flags.ExecReadOnly)
+
+        keystore = Attribute("keyStoreDirectory",
+            "Sets the CCND_KEYSTORE_DIRECTORY environmental variable. ",
+            flags = Flags.ExecReadOnly)
+
+        listen_on = Attribute("listenOn",
+            "Sets the CCND_LISTEN_ON environmental variable. ",
+            flags = Flags.ExecReadOnly)
+
+        autoreg = Attribute("autoreg",
+            "Sets the CCND_AUTOREG environmental variable. ",
+            flags = Flags.ExecReadOnly)
+
+        prefix = Attribute("prefix",
+            "Sets the CCND_PREFIX environmental variable. ",
+            flags = Flags.ExecReadOnly)
+
+        cls._register_attribute(debug)
+        cls._register_attribute(port)
+        cls._register_attribute(sockname)
+        cls._register_attribute(capacity)
+        cls._register_attribute(mtu)
+        cls._register_attribute(data_pause)
+        cls._register_attribute(default_stale)
+        cls._register_attribute(max_stale)
+        cls._register_attribute(max_rte)
+        cls._register_attribute(keystore)
+        cls._register_attribute(listen_on)
+        cls._register_attribute(autoreg)
+        cls._register_attribute(prefix)
+
+    @classmethod
+    def _register_traces(cls):
+        log = Trace("log", "CCND log output")
+        status = Trace("status", "ccndstatus output")
+
+        cls._register_trace(log)
+        cls._register_trace(status)
+
+    def __init__(self, ec, guid):
+        super(LinuxCCND, self).__init__(ec, guid)
+
+    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+        self.info("Retrieving '%s' trace %s " % (name, attr))
+
+        path = os.path.join(self.app_home, name)
+        
+        command = "(test -f %s && echo 'success') || echo 'error'" % path
+        (out, err), proc = self.node.execute(command)
+
+        if (err and proc.poll()) or out.find("error") != -1:
+            msg = " Couldn't find trace %s " % name
+            self.error(msg, out, err)
+            return None
+    
+        if attr == TraceAttr.PATH:
+            return path
+
+        if attr == TraceAttr.ALL:
+            (out, err), proc = self.node.check_output(self.app_home, name)
+            
+            if err and proc.poll():
+                msg = " Couldn't read trace %s " % name
+                self.error(msg, out, err)
+                return None
+
+            return out
+
+        if attr == TraceAttr.STREAM:
+            cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
+        elif attr == TraceAttr.SIZE:
+            cmd = "stat -c%%s %s " % path
+
+        (out, err), proc = self.node.execute(cmd)
+
+        if err and proc.poll():
+            msg = " Couldn't find trace %s " % name
+            self.error(msg, out, err)
+            return None
+        
+        if attr == TraceAttr.SIZE:
+            out = int(out.strip())
+
+        return out
+            
+    def deploy(self):
+        if not self.get("command"):
+            self.set("command", self._default_command)
+        
+        if not self.get("depends"):
+            self.set("depends", self._default_dependencies)
+
+        if not self.get("sources"):
+            self.set("sources", self._default_sources)
+
+        if not self.get("build"):
+            self.set("build", self._default_build)
+
+        if not self.get("install"):
+            self.set("install", self._default_install)
+
+        if not self.get("env"):
+            self.set("env", self._default_environment)
+
+        super(LinuxCCND, self).deploy()
+
+    def stop(self):
+        command = self.get('command') or ''
+        state = self.state
+        
+        if state == ResourceState.STARTED:
+            self.info("Stopping command '%s'" % command)
+
+            (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+            if out or err:
+                # check if execution errors occurred
+                msg = " Failed to STOP command '%s' " % self.get("command")
+                self.error(msg, out, err)
+                self._state = ResourceState.FAILED
+                stopped = False
+            else:
+                super(LinuxApplication, self).stop()
+
+
+    @property
+    def _default_command(self):
+        return "ccndstart"
+
+    @property
+    def _default_dependencies(self):
+        if self.node.os in [ OSType.FEDORA_12 , OSType.FEDORA_14 ]:
+            return ( " autoconf openssl-devel  expat-devel libpcap-devel "
+                " ecryptfs-utils-devel libxml2-devel automake gawk " 
+                " gcc gcc-c++ git pcre-devel make ")
+        elif self.node.os in [ OSType.UBUNTU , OSType.DEBIAN]:
+            return ( " autoconf libssl-dev libexpat-dev libpcap-dev "
+                " libecryptfs0 libxml2-utils automake gawk gcc g++ "
+                " git-core pkg-config libpcre3-dev make ")
+        return ""
+
+    @property
+    def _default_sources(self):
+        return "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz"
+
+    @property
+    def _default_build(self):
+        sources = self.get("sources").split(" ")[0]
+        sources = os.path.basename(sources)
+
+        return (
+            # Evaluate if ccnx binaries are already installed
+            " ( "
+                "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
+            " ) || ( "
+            # If not, untar and build
+                " ( "
+                    " mkdir -p ${SOURCES}/ccnx && "
+                    " tar xf ${SOURCES}/%(sources)s --strip-components=1 -C ${SOURCES}/ccnx "
+                 " ) && "
+                    "cd ${SOURCES}/ccnx && "
+                    # Just execute and silence warnings...
+                    " ( ./configure && make ) "
+             " )") % ({ 'sources': sources })
+
+    @property
+    def _default_install(self):
+        return (
+            # Evaluate if ccnx binaries are already installed
+            " ( "
+                "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
+            " ) || ( "
+                "  mkdir -p ${EXP_HOME}/ccnx/bin && "
+                "  cp -r ${SOURCES}/ccnx ${EXP_HOME}"
+            " )"
+            )
+
+    @property
+    def _default_environment(self):
+        envs = dict({
+            "debug": "CCND_DEBUG",
+            "port": "CCN_LOCAL_PORT",
+            "sockname" : "CCN_LOCAL_SOCKNAME",
+            "capacity" : "CCND_CAP",
+            "mtu" : "CCND_MTU",
+            "dataPauseMicrosec" : "CCND_DATA_PAUSE_MICROSEC",
+            "defaultTimeToStale" : "CCND_DEFAULT_TIME_TO_STALE",
+            "maxTimeToStale" : "CCND_MAX_TIME_TO_STALE",
+            "maxRteMicrosec" : "CCND_MAX_RTE_MICROSEC",
+            "keyStoreDirectory" : "CCND_KEYSTORE_DIRECTORY",
+            "listenOn" : "CCND_LISTEN_ON",
+            "autoreg" : "CCND_AUTOREG",
+            "prefix" : "CCND_PREFIX",
+            })
+
+        env = "PATH=$PATH:${EXP_HOME}/ccnx/bin"
+        for key in envs.keys():
+            val = self.get(key)
+            if val:
+                env += " %s=%s" % (key, val)
+        
+        return env            
+        
+    def valid_connection(self, guid):
+        # TODO: Validate!
+        return True
+
index 4708477..6fab54a 100644 (file)
@@ -40,7 +40,7 @@ class LinuxInterface(ResourceManager):
     @classmethod
     def _register_attributes(cls):
         ip4 = Attribute("ip4", "IPv4 Address",
-                flags = Flags.ExecReadOnly)
+              flags = Flags.ExecReadOnly)
 
         ip6 = Attribute("ip6", "IPv6 Address",
                 flags = Flags.ExecReadOnly)
@@ -56,7 +56,7 @@ class LinuxInterface(ResourceManager):
                 flags = Flags.ExecReadOnly)
 
         mtu = Attribute("mtu", "Maximum transmition unit for device",
-            type = Types.Integer)
+                type = Types.Integer)
 
         devname = Attribute("deviceName", 
                 "Name of the network interface (e.g. eth0, wlan0, etc)",
index 0f3a01c..50dd226 100644 (file)
@@ -48,6 +48,16 @@ class ExitCode:
     ERROR = -3
     OK = 0
 
+class OSType:
+    """
+    Supported flavors of Linux OS
+    """
+    FEDORA_12 = "f12"
+    FEDORA_14 = "f14"
+    FEDORA = "fedora"
+    UBUNTU = "ubuntu"
+    DEBIAN = "debian"
+
 @clsinit
 class LinuxNode(ResourceManager):
     _rtype = "LinuxNode"
@@ -136,13 +146,13 @@ class LinuxNode(ResourceManager):
             raise RuntimeError, "%s - %s - %s" %( msg, out, err )
 
         if out.find("Fedora release 12") == 0:
-            self._os = "f12"
+            self._os = OSType.FEDORA_12
         elif out.find("Fedora release 14") == 0:
-            self._os = "f14"
+            self._os = OSType.FEDORA_14
         elif out.find("Debian") == 0: 
-            self._os = "debian"
+            self._os = OSType.DEBIAN
         elif out.find("Ubuntu") ==0:
-            self._os = "ubuntu"
+            self._os = OSType.UBUNTU
         else:
             msg = "Unsupported OS"
             self.error(msg, out)
@@ -276,9 +286,9 @@ class LinuxNode(ResourceManager):
 
     def install_packages(self, packages, home):
         command = ""
-        if self.os in ["f12", "f14"]:
+        if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
             command = rpmfuncs.install_packages_command(self.os, packages)
-        elif self.os in ["debian", "ubuntu"]:
+        elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
             command = debfuncs.install_packages_command(self.os, packages)
         else:
             msg = "Error installing packages ( OS not known ) "
@@ -298,9 +308,9 @@ class LinuxNode(ResourceManager):
 
     def remove_packages(self, packages, home):
         command = ""
-        if self.os in ["f12", "f14"]:
+        if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
             command = rpmfuncs.remove_packages_command(self.os, packages)
-        elif self.os in ["debian", "ubuntu"]:
+        elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
             command = debfuncs.remove_packages_command(self.os, packages)
         else:
             msg = "Error removing packages ( OS not known ) "
@@ -411,7 +421,7 @@ class LinuxNode(ResourceManager):
             ecodefile = "exitcode",
             env = None):
 
-        command = "{ ( %(command)s ) ; } ; echo $? > %(ecodefile)s " % {
+        command = " ( %(command)s ) ; echo $? > %(ecodefile)s " % {
                 'command': command,
                 'ecodefile': ecodefile,
                 } 
@@ -450,8 +460,9 @@ class LinuxNode(ResourceManager):
             
             # If the stderr file was not found, assume nothing happened.
             # We just ignore the error.
-            if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: # cat - No such file or directory
-                err = ""
+            # (cat returns 1 for error "No such file or directory")
+            if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
+                out = err = ""
        
         return (out, err), proc
  
index d832fb6..e42e7b8 100644 (file)
@@ -47,11 +47,13 @@ def remove_packages_command(os, packages):
     return cmd 
 
 def install_rpmfusion_command(os):
+    from nepi.resources.linux.node import OSType
+
     cmd = "rpm -q rpmfusion-free-release || sudo -S rpm -i %(package)s"
 
-    if os == "f12":
+    if os in [OSType.FEDORA, OSType.FEDORA_12]:
         cmd =  cmd %  {'package': RPM_FUSION_URL_F12}
-    elif os == "f14":
+    elif os == OSType.FEDORA_14:
         # This one works for f13+
         cmd = cmd %  {'package': RPM_FUSION_URL}
     else:
index 74e560e..bb1de3e 100755 (executable)
@@ -23,6 +23,7 @@ from nepi.execution.attribute import Attribute
 from nepi.execution.ec import ExperimentController 
 from nepi.execution.resource import ResourceManager, ResourceState, clsinit
 
+import random
 import time
 import unittest
 
@@ -126,9 +127,15 @@ class Application(ResourceManager):
         if node.state < ResourceState.READY:
             self.ec.schedule("0.5s", self.deploy)
         else:
+            time.sleep(random.random() * 5)
             super(Application, self).deploy()
             self.logger.debug(" -------- DEPLOYED ------- ")
 
+    def start(self):
+        super(Application, self).start()
+        time.sleep(random.random() * 5)
+        self._state = ResourceState.FINISHED
+
 class ResourceManagerTestCase(unittest.TestCase):
     def test_deploy_in_order(self):
         """
@@ -169,10 +176,8 @@ class ResourceManagerTestCase(unittest.TestCase):
 
         ec.deploy()
 
-        while not all([ ec.state(guid) == ResourceState.STARTED \
-                for guid in [app1, app2, node1, node2, iface1, iface2, chan]]) \
-                and not ec.finished:
-            time.sleep(0.5)
+        guids = [app1, app2]
+        ec.wait_finished(guids)
 
         ec.shutdown()
 
@@ -201,6 +206,36 @@ class ResourceManagerTestCase(unittest.TestCase):
         self.assertTrue(rmchan.ready_time < rmiface1.ready_time)
         self.assertTrue(rmchan.ready_time < rmiface2.ready_time)
 
+    def test_concurrency(self):
+        from nepi.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(Application)
+        ResourceFactory.register_type(Node)
+        ResourceFactory.register_type(Interface)
+        ResourceFactory.register_type(Channel)
+
+        ec = ExperimentController()
+
+        node = ec.register_resource("Node")
+
+        apps = list()
+        for i in xrange(5000):
+            app = ec.register_resource("Application")
+            ec.register_connection(app, node)
+            apps.append(app)
+
+        ec.deploy()
+
+        ec.wait_finished(apps)
+        
+        self.assertTrue(ec.state(node) == ResourceState.STARTED)
+        self.assertTrue(
+               all([ec.state(guid) == ResourceState.FINISHED \
+                for guid in apps])
+                )
+
+        ec.shutdown()
+
     def test_start_with_condition(self):
         # TODO!!!
         pass
index 1393f6a..0c69f71 100755 (executable)
@@ -34,13 +34,13 @@ import unittest
 
 class LinuxApplicationTestCase(unittest.TestCase):
     def setUp(self):
-        self.fedora_host = "nepi2.pl.sophia.inria.fr"
+        self.fedora_host = "nepi5.pl.sophia.inria.fr"
         self.fedora_user = "inria_nepi"
 
         self.ubuntu_host = "roseval.pl.sophia.inria.fr"
         self.ubuntu_user = "alina"
         
-        self.target = "nepi5.pl.sophia.inria.fr"
+        self.target = "nepi3.pl.sophia.inria.fr"
 
     @skipIfNotAlive
     def t_stdout(self, host, user):
@@ -258,10 +258,10 @@ class LinuxApplicationTestCase(unittest.TestCase):
     def test_ping_ubuntu(self):
         self.t_ping(self.ubuntu_host, self.ubuntu_user)
 
-    def test_concurrency_fedora(self):
+    def ztest_concurrency_fedora(self):
         self.t_concurrency(self.fedora_host, self.fedora_user)
 
-    def test_concurrency_ubuntu(self):
+    def ztest_concurrency_ubuntu(self):
         self.t_concurrency(self.ubuntu_host, self.ubuntu_user)
 
     def test_condition_fedora(self):