Commiting merged branch nepi-3-dev
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 24 Jul 2013 17:26:18 +0000 (10:26 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 24 Jul 2013 17:26:18 +0000 (10:26 -0700)
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/all/collector.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccncontent.py
src/nepi/resources/linux/ccn/ccnr.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/resources/linux/traceroute.py

index 7ee7a47..e1d6cb0 100644 (file)
@@ -128,6 +128,12 @@ class ExperimentController(object):
         # Tasks
         self._tasks = dict()
 
+        # RM groups 
+        self._groups = dict()
+
+        # generator of globally unique id for groups
+        self._group_id_generator = guid.GuidGenerator()
         # Event processing thread
         self._cond = threading.Condition()
         self._thread = threading.Thread(target = self._process)
@@ -334,35 +340,35 @@ class ExperimentController(object):
         rm1.register_connection(guid2)
         rm2.register_connection(guid1)
 
-    def register_condition(self, group1, action, group2, state,
+    def register_condition(self, guids1, action, guids2, state,
             time = None):
-        """ Registers an action START or STOP for all RM on group1 to occur 
-            time 'time' after all elements in group2 reached state 'state'.
+        """ Registers an action START or STOP for all RM on guids1 to occur 
+            time 'time' after all elements in guids2 reached state 'state'.
 
-            :param group1: List of guids of RMs subjected to action
-            :type group1: list
+            :param guids1: List of guids of RMs subjected to action
+            :type guids1: list
 
             :param action: Action to register (either START or STOP)
             :type action: ResourceAction
 
-            :param group2: List of guids of RMs to we waited for
-            :type group2: list
+            :param guids2: List of guids of RMs to we waited for
+            :type guids2: list
 
             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
             :type state: ResourceState
 
-            :param time: Time to wait after group2 has reached status 
+            :param time: Time to wait after guids2 has reached status 
             :type time: string
 
         """
-        if isinstance(group1, int):
-            group1 = [group1]
-        if isinstance(group2, int):
-            group2 = [group2]
+        if isinstance(guids1, int):
+            guids1 = [guids1]
+        if isinstance(guids2, int):
+            guids2 = [guids2]
 
-        for guid1 in group1:
+        for guid1 in guids1:
             rm = self.get_resource(guid1)
-            rm.register_condition(action, group2, state, time)
+            rm.register_condition(action, guids2, state, time)
 
     def enable_trace(self, guid, name):
         """ Enable trace
@@ -494,10 +500,10 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.start()
 
-    def set_with_conditions(self, name, value, group1, group2, state,
+    def set_with_conditions(self, name, value, guids1, guids2, state,
             time = None):
         """ Set value 'value' on attribute with name 'name' on all RMs of
-            group1 when 'time' has elapsed since all elements in group
+            guids1 when 'time' has elapsed since all elements in guids
             have reached state 'state'.
 
             :param name: Name of attribute to set in RM
@@ -506,30 +512,30 @@ class ExperimentController(object):
             :param value: Value of attribute to set in RM
             :type name: string
 
-            :param group1: List of guids of RMs subjected to action
-            :type group1: list
+            :param guids1: List of guids of RMs subjected to action
+            :type guids1: list
 
             :param action: Action to register (either START or STOP)
             :type action: ResourceAction
 
-            :param group2: List of guids of RMs to we waited for
-            :type group2: list
+            :param guids2: List of guids of RMs to we waited for
+            :type guids2: list
 
             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
             :type state: ResourceState
 
-            :param time: Time to wait after group2 has reached status 
+            :param time: Time to wait after guids2 has reached status 
             :type time: string
 
         """
-        if isinstance(group1, int):
-            group1 = [group1]
-        if isinstance(group2, int):
-            group2 = [group2]
+        if isinstance(guids1, int):
+            guids1 = [guids1]
+        if isinstance(guids2, int):
+            guids2 = [guids2]
 
-        for guid1 in group1:
+        for guid1 in guids1:
             rm = self.get_resource(guid)
-            rm.set_with_conditions(name, value, group2, state, time)
+            rm.set_with_conditions(name, value, guids2, state, time)
 
     def stop_with_conditions(self, guid):
         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
@@ -551,33 +557,44 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.start_with_conditions()
 
-    def deploy(self, group = None, wait_all_ready = True):
-        """ Deploy all resource manager in group
+    def deploy(self, guids = None, wait_all_ready = True, group = None):
+        """ Deploy all resource manager in guids list
 
-        :param group: List of guids of RMs to deploy
-        :type group: list
+        :param guids: List of guids of RMs to deploy
+        :type guids: list
 
         :param wait_all_ready: Wait until all RMs are ready in
             order to start the RMs
         :type guid: int
 
+        :param group: Id of deployment group in which to deploy RMs
+        :type group: int
+
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
-        if not group:
-            # By default, if not deployment group is indicated, 
-            # all RMs that are undeployed will be deployed
-            group = []
+        if not guids:
+            # If no guids list was indicated, all 'NEW' RMs will be deployed
+            guids = []
             for guid in self.resources:
                 if self.state(guid) == ResourceState.NEW:
-                    group.append(guid)
+                    guids.append(guid)
                 
-        if isinstance(group, int):
-            group = [group]
+        if isinstance(guids, int):
+            guids = [guids]
+
+        # Create deployment group 
+        if not group:
+            group = self._group_id_generator.next(guid)
+
+        if group not in self._groups:
+            self._groups[group] = []
 
-        # Before starting deployment we disorder the group list with the
+        self._groups[group].extend(guids)
+
+        # Before starting deployment we disorder the guids list with the
         # purpose of speeding up the whole deployment process.
-        # It is likely that the user inserted in the 'group' list closely
+        # It is likely that the user inserted in the 'guids' list closely
         # resources one after another (e.g. all applications
         # connected to the same node can likely appear one after another).
         # This can originate a slow down in the deployment since the N 
@@ -585,12 +602,16 @@ class ExperimentController(object):
         # be taken up by the same family of resources waiting for the 
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
-        # If we disorder the group list, this problem can be mitigated.
-        random.shuffle(group)
+        # If we disorder the guids list, this problem can be mitigated.
+        random.shuffle(guids)
 
         def wait_all_and_start(group):
             reschedule = False
-            for guid in group:
+            
+            # Get all guids in group
+            guids = self._groups[group]
+
+            for guid in guids:
                 if self.state(guid) < ResourceState.READY:
                     reschedule = True
                     break
@@ -600,20 +621,23 @@ class ExperimentController(object):
                 self.schedule("1s", callback)
             else:
                 # If all resources are read, we schedule the start
-                for guid in group:
+                for guid in guids:
                     rm = self.get_resource(guid)
                     self.schedule("0s", rm.start_with_conditions)
 
         if wait_all_ready:
-            # Schedule the function that will check all resources are
-            # READY, and only then it will schedule the start.
-            # This is aimed to reduce the number of tasks looping in the scheduler.
-            # Intead of having N start tasks, we will have only one
+            # Schedule a function to check that all resources are
+            # READY, and only then schedule the start.
+            # This aimes at reducing the number of tasks looping in the 
+            # scheduler. 
+            # Intead of having N start tasks, we will have only one for 
+            # the whole group.
             callback = functools.partial(wait_all_and_start, group)
             self.schedule("1s", callback)
 
-        for guid in group:
+        for guid in guids:
             rm = self.get_resource(guid)
+            rm.deployment_group = group
             self.schedule("0s", rm.deploy)
 
             if not wait_all_ready:
@@ -624,22 +648,22 @@ class ExperimentController(object):
                 # schedule a stop. Otherwise the RM will stop immediately
                 self.schedule("2s", rm.stop_with_conditions)
 
-    def release(self, group = None):
-        """ Release the elements of the list 'group' or 
-        all the resources if any group is specified
+    def release(self, guids = None):
+        """ Release al RMs on the guids list or 
+        all the resources if no list is specified
 
-            :param group: List of RM
-            :type group: list
+            :param guids: List of RM guids
+            :type guids: list
 
         """
-        if not group:
-            group = self.resources
+        if not guids:
+            guids = self.resources
 
-        for guid in group:
+        for guid in guids:
             rm = self.get_resource(guid)
             self.schedule("0s", rm.release)
 
-        self.wait_released(group)
+        self.wait_released(guids)
         
     def shutdown(self):
         """ Shutdown the Experiment Controller. 
index 8cb3673..978f0ee 100644 (file)
@@ -202,6 +202,8 @@ class ResourceManager(Logger):
 
         self._state = ResourceState.NEW
 
+        self.deployment_group = None
+
         self._start_time = None
         self._stop_time = None
         self._discover_time = None
index 61efa22..6101cdd 100644 (file)
@@ -118,9 +118,15 @@ class Collector(ResourceManager):
             result = self.ec.trace(rm.guid, trace_name)
             fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, 
                 rename))
-            f = open(fpath, "w")
-            f.write(result)
-            f.close()
+            try:
+                f = open(fpath, "w")
+                f.write(result)
+                f.close()
+            except:
+                msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, 
+                        rm.guid, fpath)
+                self.error(msg)
+                continue
 
         super(Collector, self).release()
 
index e874bcf..380470c 100644 (file)
@@ -585,14 +585,13 @@ class LinuxApplication(ResourceManager):
 
         if self.state == ResourceState.STARTED:
         
-            stopped = True
-
             self.info("Stopping command '%s'" % command)
         
             # If the command is running in foreground (it was launched using
             # the node 'execute' method), then we use the handler to the Popen
             # process to kill it. Else we send a kill signal using the pid and ppid
             # retrieved after running the command with the node 'run' method
+            stopped = True
 
             if self._proc:
                 self._proc.kill()
@@ -608,10 +607,9 @@ class LinuxApplication(ResourceManager):
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
                         self.fail()
-                        stopped = False
 
-            if stopped:
-                super(LinuxApplication, self).stop()
+        if self.state == ResourceState.STARTED:
+            super(LinuxApplication, self).stop()
 
     def release(self):
         self.info("Releasing resource")
@@ -623,6 +621,8 @@ class LinuxApplication(ResourceManager):
         self.stop()
 
         if self.state == ResourceState.STOPPED:
+            self.info("Resource released")
+
             super(LinuxApplication, self).release()
     
     @property
index 35d1563..9d531de 100644 (file)
@@ -108,25 +108,24 @@ class LinuxCCNContent(LinuxApplication):
         command = self.get("command")
         env = self.get("env")
 
-        if command:
-            self.info("Uploading command '%s'" % command)
-
-            # We want to make sure the content is published
-            # before the experiment starts.
-            # Run the command as a bash script in the background, 
-            # in the host ( but wait until the command has
-            # finished to continue )
-            env = self.replace_paths(env)
-            command = self.replace_paths(command)
-
-            (out, err), proc = self.execute_command(command, env, 
-                    blocking = True)
-
-            if proc.poll():
-                self.fail()
-                msg = "Failed to execute command"
-                self.error(msg, out, err)
-                raise RuntimeError, msg
+        self.info("Uploading command '%s'" % command)
+
+        # We want to make sure the content is published
+        # before the experiment starts.
+        # Run the command as a bash script in the background, 
+        # in the host ( but wait until the command has
+        # finished to continue )
+        env = self.replace_paths(env)
+        command = self.replace_paths(command)
+
+        (out, err), proc = self.execute_command(command, env, 
+                blocking = True)
+
+        if proc.poll():
+            self.fail()
+            msg = "Failed to execute command"
+            self.error(msg, out, err)
+            raise RuntimeError, msg
 
     def start(self):
         if self._state == ResourceState.READY:
index 7ed260c..ac10840 100644 (file)
@@ -146,6 +146,11 @@ class LinuxCCNR(LinuxApplication):
             "Sets the CCNS_SYNC_SCOPE environmental variable. ",
             flags = Flags.ExecReadOnly)
 
+        repo_file = Attribute("repoFile1",
+            "The Repository uses $CCNR_DIRECTORY/repoFile1 for "
+            "persistent storage of CCN Content Objects",
+            flags = Flags.ExecReadOnly)
+
         cls._register_attribute(max_fanout)
         cls._register_attribute(max_leaf_entries)
         cls._register_attribute(max_node_bytes)
@@ -172,6 +177,7 @@ class LinuxCCNR(LinuxApplication):
         cls._register_attribute(ccns_root_advise_lifetime)
         cls._register_attribute(ccns_stable_enabled)
         cls._register_attribute(ccns_sync_scope)
+        cls._register_attribute(repo_file)
 
     @classmethod
     def _register_traces(cls):
@@ -226,22 +232,30 @@ class LinuxCCNR(LinuxApplication):
         command = self.get("command")
         env = self.get("env")
 
-        if command:
-            # We want to make sure the repository is running
-            # before the experiment starts.
-            # Run the command as a bash script in background,
-            # in the host ( but wait until the command has
-            # finished to continue )
-            env = self.replace_paths(env)
-            command = self.replace_paths(command)
-
-            shfile = os.path.join(self.app_home, "start.sh")
-            self.node.run_and_wait(command, self.run_home,
-                    shfile = shfile,
-                    overwrite = False,
-                    env = env,
-                    raise_on_error = True)
+        if self.get("repoFile1"):
+            # upload repoFile1
+            local_file = self.get("repoFile1")
+            remote_file = "${RUN_HOME}/repoFile1"
+            remote_file = self.replace_paths(remote_file)
+            self.node.upload(local_file,
+                    remote_file,
+                    overwrite = False)
+
+        # We want to make sure the repository is running
+        # before the experiment starts.
+        # Run the command as a bash script in background,
+        # in the host ( but wait until the command has
+        # finished to continue )
+        env = self.replace_paths(env)
+        command = self.replace_paths(command)
+
+        shfile = os.path.join(self.app_home, "start.sh")
+        self.node.run_and_wait(command, self.run_home,
+                shfile = shfile,
+                overwrite = False,
+                env = env,
+                raise_on_error = True)
+
     def start(self):
         if self._state == ResourceState.READY:
             command = self.get("command")
index 0e4a687..1f39a98 100644 (file)
@@ -134,22 +134,21 @@ class LinuxFIBEntry(LinuxApplication):
         command = self.get("command")
         env = self.get("env")
 
-        if command:
-            # We want to make sure the FIB entries are created
-            # before the experiment starts.
-            # Run the command as a bash script in the background, 
-            # in the host ( but wait until the command has
-            # finished to continue )
-            env = env and self.replace_paths(env)
-            command = self.replace_paths(command)
+        # We want to make sure the FIB entries are created
+        # before the experiment starts.
+        # Run the command as a bash script in the background, 
+        # in the host ( but wait until the command has
+        # finished to continue )
+        env = env and self.replace_paths(env)
+        command = self.replace_paths(command)
 
-            (out, err), proc = self.execute_command(command, env)
+        (out, err), proc = self.execute_command(command, env)
 
-            if proc.poll():
-                self._state = ResourceState.FAILED
-                msg = "Failed to execute command"
-                self.error(msg, out, err)
-                raise RuntimeError, msg
+        if proc.poll():
+            self._state = ResourceState.FAILED
+            msg = "Failed to execute command"
+            self.error(msg, out, err)
+            raise RuntimeError, msg
 
     def configure(self):
         if self.trace_enabled("ping"):
@@ -158,11 +157,8 @@ class LinuxFIBEntry(LinuxApplication):
             self.ec.set(self._ping, "printTimestamp", True)
             self.ec.set(self._ping, "target", self.get("host"))
             self.ec.register_connection(self._ping, self.node.guid)
-            # force waiting until ping is READY before we starting the FIB
-            self.ec.register_condition(self.guid, ResourceAction.START, 
-                    self._ping, ResourceState.READY)
             # schedule ping deploy
-            self.ec.deploy(group=[self._ping])
+            self.ec.deploy(guids=[self._ping], group = self.deployment_group)
 
         if self.trace_enabled("mtr"):
             self.info("Configuring MTR trace")
@@ -172,11 +168,8 @@ class LinuxFIBEntry(LinuxApplication):
             self.ec.set(self._mtr, "continuous", True)
             self.ec.set(self._mtr, "target", self.get("host"))
             self.ec.register_connection(self._mtr, self.node.guid)
-            # force waiting until mtr is READY before we starting the FIB
-            self.ec.register_condition(self.guid, ResourceAction.START, 
-                    self._mtr, ResourceState.READY)
             # schedule mtr deploy
-            self.ec.deploy(group=[self._mtr])
+            self.ec.deploy(guids=[self._mtr], group = self.deployment_group)
 
         if self.trace_enabled("traceroute"):
             self.info("Configuring TRACEROUTE trace")
@@ -185,11 +178,8 @@ class LinuxFIBEntry(LinuxApplication):
             self.ec.set(self._traceroute, "continuous", True)
             self.ec.set(self._traceroute, "target", self.get("host"))
             self.ec.register_connection(self._traceroute, self.node.guid)
-            # force waiting until mtr is READY before we starting the FIB
-            self.ec.register_condition(self.guid, ResourceAction.START, 
-                    self._traceroute, ResourceState.READY)
             # schedule mtr deploy
-            self.ec.deploy(group=[self._traceroute])
+            self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
 
     def start(self):
         if self._state in [ResourceState.READY, ResourceState.STARTED]:
index ea42514..4d55eb1 100644 (file)
@@ -73,7 +73,7 @@ class LinuxTraceroute(LinuxApplication):
         args.append("traceroute")
         args.append(self.get("target"))
         if self.get("continuous") == True:
-            args.append("; sleep 5 ; done ")
+            args.append("; sleep 2 ; done ")
 
         command = " ".join(args)