Adding method deploy_with_conditions to Resource
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 25 Sep 2013 14:17:57 +0000 (16:17 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 25 Sep 2013 14:17:57 +0000 (16:17 +0200)
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/linux/ccn/ccnping.py
src/nepi/resources/linux/ccn/fibentry.py

index 6beb3d7..e59e626 100644 (file)
@@ -587,7 +587,7 @@ class ExperimentController(object):
         new_group = False
         if not group:
             new_group = True
-            group = self._group_id_generator.next(guid)
+            group = self._group_id_generator.next()
 
         if group not in self._groups:
             self._groups[group] = []
@@ -640,7 +640,7 @@ class ExperimentController(object):
         for guid in guids:
             rm = self.get_resource(guid)
             rm.deployment_group = group
-            self.schedule("0s", rm.deploy)
+            self.schedule("0s", rm.deploy_with_conditions)
 
             if not wait_all_ready:
                 self.schedule("1s", rm.start_with_conditions)
index f4aa169..95e31ff 100644 (file)
@@ -447,7 +447,7 @@ class ResourceManager(Logger):
         :type action: str
         :param group: Group of RMs to wait for (list of guids)
         :type group: int or list of int
-        :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
+        :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
         :type state: str
         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
         :type time: str
@@ -469,7 +469,7 @@ class ResourceManager(Logger):
     def unregister_condition(self, group, action = None):
         """ Removed conditions for a certain group of guids
 
-        :param action: Action to restrict to condition (either 'START' or 'STOP')
+        :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
         :type action: str
 
         :param group: Group of RMs to wait for (list of guids)
@@ -518,7 +518,7 @@ class ResourceManager(Logger):
 
         :param group: Group of RMs to wait for (list of guids)
         :type group: int or list of int
-        :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
+        :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
         :type state: str
         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
         :type time: str
@@ -554,7 +554,6 @@ class ResourceManager(Logger):
                 elif state == ResourceState.STOPPED:
                     t = rm.stop_time
                 else:
-                    # Only keep time information for START and STOP
                     break
 
                 # time already elapsed since RM changed state
@@ -674,6 +673,47 @@ class ResourceManager(Logger):
             self.debug(" ----- STOPPING ---- ") 
             self.stop()
 
+    def deploy_with_conditions(self):
+        """ Deploy RM when all the conditions in self.conditions for
+        action 'READY' are satisfied.
+
+        """
+        reschedule = False
+        delay = reschedule_delay 
+
+        ## evaluate if set conditions are met
+
+        # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
+        if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
+                ResourceState.PROVISIONED]:
+            reschedule = True
+            self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
+        else:
+            deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
+            
+            self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) 
+            
+            # Verify all start conditions are met
+            for (group, state, time) in deploy_conditions:
+                # Uncomment for debug
+                #unmet = []
+                #for guid in group:
+                #    rm = self.ec.get_resource(guid)
+                #    unmet.append((guid, rm._state))
+                #
+                #self.debug("---- WAITED STATES ---- %s" % unmet )
+
+                reschedule, delay = self._needs_reschedule(group, state, time)
+                if reschedule:
+                    break
+
+        if reschedule:
+            self.ec.schedule(delay, self.deploy_with_conditions)
+        else:
+            self.debug("----- STARTING ---- ")
+            self.deploy()
+
+
     def connect(self, guid):
         """ Performs actions that need to be taken upon associating RMs.
         This method should be redefined when necessary in child classes.
index 2e3f485..7821597 100644 (file)
@@ -38,7 +38,7 @@ class LinuxCCNPing(LinuxCCNPingServer):
 
         count = Attribute("c",
             "Total number of pings",
-            type = Types.Integer,
+            type = Types.Double,
             flags = Flags.ExecReadOnly)
 
         number = Attribute("n",
@@ -84,7 +84,7 @@ class LinuxCCNPing(LinuxCCNPingServer):
         if self.get("n"):
             args.append("-n %d" % self.get("n"))
         if self.get("i"):
-            args.append("-i %d" % self.get("i"))
+            args.append("-i %.2f" % self.get("i"))
 
         command = " ".join(args)
 
index 9d8e8c2..aad03ca 100644 (file)
@@ -215,13 +215,17 @@ class LinuxFIBEntry(LinuxApplication):
             self.info("Stopping command '%s'" % command)
 
             command = self._stop_command
-            (out, err), proc = self.execute_command(command, env)
-
-            if proc.poll():
-                pass
+            (out, err), proc = self.execute_command(command, env,
+                    blocking = True)
 
             self.set_stopped()
 
+            if err:
+                msg = " Failed to execute command '%s'" % command
+                self.error(msg, out, err)
+                self.fail()
+                raise RuntimeError, msg
+
     @property
     def _start_command(self):
         uri = self.get("uri") or ""