From f072f25afac82e5a878fbc5566108a4206d3f5c7 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac <alina.quereilhac@inria.fr> Date: Wed, 24 Jul 2013 10:25:53 -0700 Subject: [PATCH] Adding deployment group --- src/nepi/execution/ec.py | 140 +++++++++++++---------- src/nepi/execution/resource.py | 2 + src/nepi/resources/linux/ccn/fibentry.py | 6 +- 3 files changed, 87 insertions(+), 61 deletions(-) diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 7ee7a470..e1d6cb01 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -128,6 +128,12 @@ class ExperimentController(object): # Tasks self._tasks = dict() + # RM groups + self._groups = dict() + + # generator of globally unique id for groups + self._group_id_generator = guid.GuidGenerator() + # Event processing thread self._cond = threading.Condition() self._thread = threading.Thread(target = self._process) @@ -334,35 +340,35 @@ class ExperimentController(object): rm1.register_connection(guid2) rm2.register_connection(guid1) - def register_condition(self, group1, action, group2, state, + def register_condition(self, guids1, action, guids2, state, time = None): - """ Registers an action START or STOP for all RM on group1 to occur - time 'time' after all elements in group2 reached state 'state'. + """ Registers an action START or STOP for all RM on guids1 to occur + time 'time' after all elements in guids2 reached state 'state'. 
- :param group1: List of guids of RMs subjected to action - :type group1: list + :param guids1: List of guids of RMs subjected to action + :type guids1: list :param action: Action to register (either START or STOP) :type action: ResourceAction - :param group2: List of guids of RMs to we waited for - :type group2: list + :param guids2: List of guids of RMs to be waited for + :type guids2: list :param state: State to wait for on RMs (STARTED, STOPPED, etc) :type state: ResourceState - :param time: Time to wait after group2 has reached status + :param time: Time to wait after guids2 has reached status :type time: string """ - if isinstance(group1, int): - group1 = [group1] - if isinstance(group2, int): - group2 = [group2] + if isinstance(guids1, int): + guids1 = [guids1] + if isinstance(guids2, int): + guids2 = [guids2] - for guid1 in group1: + for guid1 in guids1: rm = self.get_resource(guid1) - rm.register_condition(action, group2, state, time) + rm.register_condition(action, guids2, state, time) def enable_trace(self, guid, name): """ Enable trace @@ -494,10 +500,10 @@ class ExperimentController(object): rm = self.get_resource(guid) return rm.start() - def set_with_conditions(self, name, value, group1, group2, state, + def set_with_conditions(self, name, value, guids1, guids2, state, time = None): """ Set value 'value' on attribute with name 'name' on all RMs of - group1 when 'time' has elapsed since all elements in group2 + guids1 when 'time' has elapsed since all elements in guids2 have reached state 'state'. 
:param name: Name of attribute to set in RM @@ -506,30 +512,30 @@ class ExperimentController(object): :param value: Value of attribute to set in RM :type name: string - :param group1: List of guids of RMs subjected to action - :type group1: list + :param guids1: List of guids of RMs subjected to action + :type guids1: list :param action: Action to register (either START or STOP) :type action: ResourceAction - :param group2: List of guids of RMs to we waited for - :type group2: list + :param guids2: List of guids of RMs to be waited for + :type guids2: list :param state: State to wait for on RMs (STARTED, STOPPED, etc) :type state: ResourceState - :param time: Time to wait after group2 has reached status + :param time: Time to wait after guids2 has reached status :type time: string """ - if isinstance(group1, int): - group1 = [group1] - if isinstance(group2, int): - group2 = [group2] + if isinstance(guids1, int): + guids1 = [guids1] + if isinstance(guids2, int): + guids2 = [guids2] - for guid1 in group1: + for guid1 in guids1: rm = self.get_resource(guid) - rm.set_with_conditions(name, value, group2, state, time) + rm.set_with_conditions(name, value, guids2, state, time) def stop_with_conditions(self, guid): """ Stop a specific RM defined by its 'guid' only if all the conditions are true @@ -551,33 +557,44 @@ class ExperimentController(object): rm = self.get_resource(guid) return rm.start_with_conditions() - def deploy(self, group = None, wait_all_ready = True): - """ Deploy all resource manager in group + def deploy(self, guids = None, wait_all_ready = True, group = None): + """ Deploy all resource managers in the guids list - :param group: List of guids of RMs to deploy - :type group: list + :param guids: List of guids of RMs to deploy + :type guids: list :param wait_all_ready: Wait until all RMs are ready in order to start the RMs :type guid: int + :param group: Id of deployment group in which to deploy RMs + :type group: int + """ self.logger.debug(" ------- DEPLOY 
START ------ ") - if not group: - # By default, if not deployment group is indicated, - # all RMs that are undeployed will be deployed - group = [] + if not guids: + # If no guids list was indicated, all 'NEW' RMs will be deployed + guids = [] for guid in self.resources: if self.state(guid) == ResourceState.NEW: - group.append(guid) + guids.append(guid) - if isinstance(group, int): - group = [group] + if isinstance(guids, int): + guids = [guids] + + # Create deployment group + if not group: + group = self._group_id_generator.next(guid) + + if group not in self._groups: + self._groups[group] = [] - # Before starting deployment we disorder the group list with the + self._groups[group].extend(guids) + + # Before starting deployment we disorder the guids list with the # purpose of speeding up the whole deployment process. - # It is likely that the user inserted in the 'group' list closely + # It is likely that the user inserted in the 'guids' list closely # resources one after another (e.g. all applications # connected to the same node can likely appear one after another). # This can originate a slow down in the deployment since the N @@ -585,12 +602,16 @@ class ExperimentController(object): # be taken up by the same family of resources waiting for the # same conditions (e.g. LinuxApplications running on a same # node share a single lock, so they will tend to be serialized). - # If we disorder the group list, this problem can be mitigated. - random.shuffle(group) + # If we disorder the guids list, this problem can be mitigated. 
+ random.shuffle(guids) def wait_all_and_start(group): reschedule = False - for guid in group: + + # Get all guids in group + guids = self._groups[group] + + for guid in guids: if self.state(guid) < ResourceState.READY: reschedule = True break @@ -600,20 +621,23 @@ class ExperimentController(object): self.schedule("1s", callback) else: # If all resources are read, we schedule the start - for guid in group: + for guid in guids: rm = self.get_resource(guid) self.schedule("0s", rm.start_with_conditions) if wait_all_ready: - # Schedule the function that will check all resources are - # READY, and only then it will schedule the start. - # This is aimed to reduce the number of tasks looping in the scheduler. - # Intead of having N start tasks, we will have only one + # Schedule a function to check that all resources are + # READY, and only then schedule the start. + # This aims at reducing the number of tasks looping in the + # scheduler. + # Instead of having N start tasks, we will have only one for + # the whole group. callback = functools.partial(wait_all_and_start, group) self.schedule("1s", callback) - for guid in group: + for guid in guids: rm = self.get_resource(guid) + rm.deployment_group = group self.schedule("0s", rm.deploy) if not wait_all_ready: @@ -624,22 +648,22 @@ class ExperimentController(object): # schedule a stop. 
Otherwise the RM will stop immediately self.schedule("2s", rm.stop_with_conditions) - def release(self, group = None): - """ Release the elements of the list 'group' or - all the resources if any group is specified + def release(self, guids = None): + """ Release all RMs on the guids list or + all the resources if no list is specified - :param group: List of RM - :type group: list + :param guids: List of RM guids + :type guids: list """ - if not group: - group = self.resources + if not guids: + guids = self.resources - for guid in group: + for guid in guids: rm = self.get_resource(guid) self.schedule("0s", rm.release) - self.wait_released(group) + self.wait_released(guids) def shutdown(self): """ Shutdown the Experiment Controller. diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index 8cb36730..978f0eeb 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -202,6 +202,8 @@ class ResourceManager(Logger): self._state = ResourceState.NEW + self.deployment_group = None + self._start_time = None self._stop_time = None self._discover_time = None diff --git a/src/nepi/resources/linux/ccn/fibentry.py b/src/nepi/resources/linux/ccn/fibentry.py index fe0330e8..1f39a986 100644 --- a/src/nepi/resources/linux/ccn/fibentry.py +++ b/src/nepi/resources/linux/ccn/fibentry.py @@ -158,7 +158,7 @@ class LinuxFIBEntry(LinuxApplication): self.ec.set(self._ping, "target", self.get("host")) self.ec.register_connection(self._ping, self.node.guid) # schedule ping deploy - self.ec.deploy(group=[self._ping]) + self.ec.deploy(guids=[self._ping], group = self.deployment_group) if self.trace_enabled("mtr"): self.info("Configuring MTR trace") @@ -169,7 +169,7 @@ class LinuxFIBEntry(LinuxApplication): self.ec.set(self._mtr, "target", self.get("host")) self.ec.register_connection(self._mtr, self.node.guid) # schedule mtr deploy - self.ec.deploy(group=[self._mtr]) + self.ec.deploy(guids=[self._mtr], group = self.deployment_group) if 
self.trace_enabled("traceroute"): self.info("Configuring TRACEROUTE trace") @@ -179,7 +179,7 @@ class LinuxFIBEntry(LinuxApplication): self.ec.set(self._traceroute, "target", self.get("host")) self.ec.register_connection(self._traceroute, self.node.guid) # schedule mtr deploy - self.ec.deploy(group=[self._traceroute]) + self.ec.deploy(guids=[self._traceroute], group = self.deployment_group) def start(self): if self._state in [ResourceState.READY, ResourceState.STARTED]: -- 2.47.0