Adding deployment group
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 24 Jul 2013 17:25:53 +0000 (10:25 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 24 Jul 2013 17:25:53 +0000 (10:25 -0700)
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/linux/ccn/fibentry.py

index 7ee7a47..e1d6cb0 100644 (file)
@@ -128,6 +128,12 @@ class ExperimentController(object):
         # Tasks
         self._tasks = dict()
 
+        # RM groups 
+        self._groups = dict()
+
+        # generator of globally unique id for groups
+        self._group_id_generator = guid.GuidGenerator()
         # Event processing thread
         self._cond = threading.Condition()
         self._thread = threading.Thread(target = self._process)
@@ -334,35 +340,35 @@ class ExperimentController(object):
         rm1.register_connection(guid2)
         rm2.register_connection(guid1)
 
-    def register_condition(self, group1, action, group2, state,
+    def register_condition(self, guids1, action, guids2, state,
             time = None):
-        """ Registers an action START or STOP for all RM on group1 to occur 
-            time 'time' after all elements in group2 reached state 'state'.
+        """ Registers an action START or STOP for all RM on guids1 to occur 
+            time 'time' after all elements in guids2 reached state 'state'.
 
-            :param group1: List of guids of RMs subjected to action
-            :type group1: list
+            :param guids1: List of guids of RMs subjected to action
+            :type guids1: list
 
             :param action: Action to register (either START or STOP)
             :type action: ResourceAction
 
-            :param group2: List of guids of RMs to we waited for
-            :type group2: list
+            :param guids2: List of guids of RMs to we waited for
+            :type guids2: list
 
             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
             :type state: ResourceState
 
-            :param time: Time to wait after group2 has reached status 
+            :param time: Time to wait after guids2 has reached status 
             :type time: string
 
         """
-        if isinstance(group1, int):
-            group1 = [group1]
-        if isinstance(group2, int):
-            group2 = [group2]
+        if isinstance(guids1, int):
+            guids1 = [guids1]
+        if isinstance(guids2, int):
+            guids2 = [guids2]
 
-        for guid1 in group1:
+        for guid1 in guids1:
             rm = self.get_resource(guid1)
-            rm.register_condition(action, group2, state, time)
+            rm.register_condition(action, guids2, state, time)
 
     def enable_trace(self, guid, name):
         """ Enable trace
@@ -494,10 +500,10 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.start()
 
-    def set_with_conditions(self, name, value, group1, group2, state,
+    def set_with_conditions(self, name, value, guids1, guids2, state,
             time = None):
         """ Set value 'value' on attribute with name 'name' on all RMs of
-            group1 when 'time' has elapsed since all elements in group
+            guids1 when 'time' has elapsed since all elements in guids
             have reached state 'state'.
 
             :param name: Name of attribute to set in RM
@@ -506,30 +512,30 @@ class ExperimentController(object):
             :param value: Value of attribute to set in RM
             :type name: string
 
-            :param group1: List of guids of RMs subjected to action
-            :type group1: list
+            :param guids1: List of guids of RMs subjected to action
+            :type guids1: list
 
             :param action: Action to register (either START or STOP)
             :type action: ResourceAction
 
-            :param group2: List of guids of RMs to we waited for
-            :type group2: list
+            :param guids2: List of guids of RMs to we waited for
+            :type guids2: list
 
             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
             :type state: ResourceState
 
-            :param time: Time to wait after group2 has reached status 
+            :param time: Time to wait after guids2 has reached status 
             :type time: string
 
         """
-        if isinstance(group1, int):
-            group1 = [group1]
-        if isinstance(group2, int):
-            group2 = [group2]
+        if isinstance(guids1, int):
+            guids1 = [guids1]
+        if isinstance(guids2, int):
+            guids2 = [guids2]
 
-        for guid1 in group1:
+        for guid1 in guids1:
             rm = self.get_resource(guid)
-            rm.set_with_conditions(name, value, group2, state, time)
+            rm.set_with_conditions(name, value, guids2, state, time)
 
     def stop_with_conditions(self, guid):
         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
@@ -551,33 +557,44 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.start_with_conditions()
 
-    def deploy(self, group = None, wait_all_ready = True):
-        """ Deploy all resource manager in group
+    def deploy(self, guids = None, wait_all_ready = True, group = None):
+        """ Deploy all resource manager in guids list
 
-        :param group: List of guids of RMs to deploy
-        :type group: list
+        :param guids: List of guids of RMs to deploy
+        :type guids: list
 
         :param wait_all_ready: Wait until all RMs are ready in
             order to start the RMs
         :type guid: int
 
+        :param group: Id of deployment group in which to deploy RMs
+        :type group: int
+
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
-        if not group:
-            # By default, if not deployment group is indicated, 
-            # all RMs that are undeployed will be deployed
-            group = []
+        if not guids:
+            # If no guids list was indicated, all 'NEW' RMs will be deployed
+            guids = []
             for guid in self.resources:
                 if self.state(guid) == ResourceState.NEW:
-                    group.append(guid)
+                    guids.append(guid)
                 
-        if isinstance(group, int):
-            group = [group]
+        if isinstance(guids, int):
+            guids = [guids]
+
+        # Create deployment group 
+        if not group:
+            group = self._group_id_generator.next(guid)
+
+        if group not in self._groups:
+            self._groups[group] = []
 
-        # Before starting deployment we disorder the group list with the
+        self._groups[group].extend(guids)
+
+        # Before starting deployment we disorder the guids list with the
         # purpose of speeding up the whole deployment process.
-        # It is likely that the user inserted in the 'group' list closely
+        # It is likely that the user inserted in the 'guids' list closely
         # resources one after another (e.g. all applications
         # connected to the same node can likely appear one after another).
         # This can originate a slow down in the deployment since the N 
@@ -585,12 +602,16 @@ class ExperimentController(object):
         # be taken up by the same family of resources waiting for the 
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
-        # If we disorder the group list, this problem can be mitigated.
-        random.shuffle(group)
+        # If we disorder the guids list, this problem can be mitigated.
+        random.shuffle(guids)
 
         def wait_all_and_start(group):
             reschedule = False
-            for guid in group:
+            
+            # Get all guids in group
+            guids = self._groups[group]
+
+            for guid in guids:
                 if self.state(guid) < ResourceState.READY:
                     reschedule = True
                     break
@@ -600,20 +621,23 @@ class ExperimentController(object):
                 self.schedule("1s", callback)
             else:
                 # If all resources are read, we schedule the start
-                for guid in group:
+                for guid in guids:
                     rm = self.get_resource(guid)
                     self.schedule("0s", rm.start_with_conditions)
 
         if wait_all_ready:
-            # Schedule the function that will check all resources are
-            # READY, and only then it will schedule the start.
-            # This is aimed to reduce the number of tasks looping in the scheduler.
-            # Intead of having N start tasks, we will have only one
+            # Schedule a function to check that all resources are
+            # READY, and only then schedule the start.
+            # This aimes at reducing the number of tasks looping in the 
+            # scheduler. 
+            # Intead of having N start tasks, we will have only one for 
+            # the whole group.
             callback = functools.partial(wait_all_and_start, group)
             self.schedule("1s", callback)
 
-        for guid in group:
+        for guid in guids:
             rm = self.get_resource(guid)
+            rm.deployment_group = group
             self.schedule("0s", rm.deploy)
 
             if not wait_all_ready:
@@ -624,22 +648,22 @@ class ExperimentController(object):
                 # schedule a stop. Otherwise the RM will stop immediately
                 self.schedule("2s", rm.stop_with_conditions)
 
-    def release(self, group = None):
-        """ Release the elements of the list 'group' or 
-        all the resources if any group is specified
+    def release(self, guids = None):
+        """ Release al RMs on the guids list or 
+        all the resources if no list is specified
 
-            :param group: List of RM
-            :type group: list
+            :param guids: List of RM guids
+            :type guids: list
 
         """
-        if not group:
-            group = self.resources
+        if not guids:
+            guids = self.resources
 
-        for guid in group:
+        for guid in guids:
             rm = self.get_resource(guid)
             self.schedule("0s", rm.release)
 
-        self.wait_released(group)
+        self.wait_released(guids)
         
     def shutdown(self):
         """ Shutdown the Experiment Controller. 
index 8cb3673..978f0ee 100644 (file)
@@ -202,6 +202,8 @@ class ResourceManager(Logger):
 
         self._state = ResourceState.NEW
 
+        self.deployment_group = None
+
         self._start_time = None
         self._stop_time = None
         self._discover_time = None
index fe0330e..1f39a98 100644 (file)
@@ -158,7 +158,7 @@ class LinuxFIBEntry(LinuxApplication):
             self.ec.set(self._ping, "target", self.get("host"))
             self.ec.register_connection(self._ping, self.node.guid)
             # schedule ping deploy
-            self.ec.deploy(group=[self._ping])
+            self.ec.deploy(guids=[self._ping], group = self.deployment_group)
 
         if self.trace_enabled("mtr"):
             self.info("Configuring MTR trace")
@@ -169,7 +169,7 @@ class LinuxFIBEntry(LinuxApplication):
             self.ec.set(self._mtr, "target", self.get("host"))
             self.ec.register_connection(self._mtr, self.node.guid)
             # schedule mtr deploy
-            self.ec.deploy(group=[self._mtr])
+            self.ec.deploy(guids=[self._mtr], group = self.deployment_group)
 
         if self.trace_enabled("traceroute"):
             self.info("Configuring TRACEROUTE trace")
@@ -179,7 +179,7 @@ class LinuxFIBEntry(LinuxApplication):
             self.ec.set(self._traceroute, "target", self.get("host"))
             self.ec.register_connection(self._traceroute, self.node.guid)
             # schedule mtr deploy
-            self.ec.deploy(group=[self._traceroute])
+            self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
 
     def start(self):
         if self._state in [ResourceState.READY, ResourceState.STARTED]: