From: Alina Quereilhac Date: Wed, 25 Sep 2013 14:17:57 +0000 (+0200) Subject: Adding method deploy_with_conditions to Resource X-Git-Tag: nepi-3.0.0~40 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=0016ed76a0c9b5d058510df63d42e71c346a4e64;p=nepi.git Adding method deploy_with_conditions to Resource --- diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 6beb3d76..e59e626f 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -587,7 +587,7 @@ class ExperimentController(object): new_group = False if not group: new_group = True - group = self._group_id_generator.next(guid) + group = self._group_id_generator.next() if group not in self._groups: self._groups[group] = [] @@ -640,7 +640,7 @@ class ExperimentController(object): for guid in guids: rm = self.get_resource(guid) rm.deployment_group = group - self.schedule("0s", rm.deploy) + self.schedule("0s", rm.deploy_with_conditions) if not wait_all_ready: self.schedule("1s", rm.start_with_conditions) diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index f4aa1695..95e31ff9 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -447,7 +447,7 @@ class ResourceManager(Logger): :type action: str :param group: Group of RMs to wait for (list of guids) :type group: int or list of int - :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED') + :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY') :type state: str :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s') :type time: str @@ -469,7 +469,7 @@ class ResourceManager(Logger): def unregister_condition(self, group, action = None): """ Removed conditions for a certain group of guids - :param action: Action to restrict to condition (either 'START' or 'STOP') + :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY') :type action: str :param group: Group of RMs to wait for (list of guids) @@ -518,7 +518,7 @@ class ResourceManager(Logger): :param group: Group of RMs to wait for (list of guids) :type group: int or list of int - :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED') + :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY') :type state: str :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s') :type time: str @@ -554,7 +554,6 @@ class ResourceManager(Logger): elif state == ResourceState.STOPPED: t = rm.stop_time else: - # Only keep time information for START and STOP break # time already elapsed since RM changed state @@ -674,6 +673,47 @@ class ResourceManager(Logger): self.debug(" ----- STOPPING ---- ") self.stop() + def deploy_with_conditions(self): + """ Deploy RM when all the conditions in self.conditions for + action 'READY' are satisfied. + + """ + reschedule = False + delay = reschedule_delay + + ## evaluate if set conditions are met + + # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED + if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, + ResourceState.PROVISIONED]: + reschedule = True + self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state ) + else: + deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, []) + + self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) + + # Verify all start conditions are met + for (group, state, time) in deploy_conditions: + # Uncomment for debug + #unmet = [] + #for guid in group: + # rm = self.ec.get_resource(guid) + # unmet.append((guid, rm._state)) + # + #self.debug("---- WAITED STATES ---- %s" % unmet ) + + reschedule, delay = self._needs_reschedule(group, state, time) + if reschedule: + break + + if reschedule: + self.ec.schedule(delay, self.deploy_with_conditions) + else: + self.debug("----- STARTING ---- ") + self.deploy() + + def connect(self, guid): """ Performs actions that need to be taken upon associating RMs. This method should be redefined when necessary in child classes. diff --git a/src/nepi/resources/linux/ccn/ccnping.py b/src/nepi/resources/linux/ccn/ccnping.py index 2e3f4854..78215974 100644 --- a/src/nepi/resources/linux/ccn/ccnping.py +++ b/src/nepi/resources/linux/ccn/ccnping.py @@ -38,7 +38,7 @@ class LinuxCCNPing(LinuxCCNPingServer): count = Attribute("c", "Total number of pings", - type = Types.Integer, + type = Types.Double, flags = Flags.ExecReadOnly) number = Attribute("n", @@ -84,7 +84,7 @@ class LinuxCCNPing(LinuxCCNPingServer): if self.get("n"): args.append("-n %d" % self.get("n")) if self.get("i"): - args.append("-i %d" % self.get("i")) + args.append("-i %.2f" % self.get("i")) command = " ".join(args) diff --git a/src/nepi/resources/linux/ccn/fibentry.py b/src/nepi/resources/linux/ccn/fibentry.py index 9d8e8c25..aad03caf 100644 --- a/src/nepi/resources/linux/ccn/fibentry.py +++ b/src/nepi/resources/linux/ccn/fibentry.py @@ -215,13 +215,17 @@ class LinuxFIBEntry(LinuxApplication): self.info("Stopping command '%s'" % command) command = self._stop_command - (out, err), proc = self.execute_command(command, env) - - if proc.poll(): - pass + (out, err), proc = self.execute_command(command, env, + blocking = True) self.set_stopped() + if err: + msg = " Failed to execute command '%s'" % command + self.error(msg, out, err) + self.fail() + raise RuntimeError, msg + @property def _start_command(self): uri = self.get("uri") or ""