Adding start_with_condition, stop_with_condition and set_with_condition to the EC...
[nepi.git] / src / neco / execution / ec.py
index 076e4dd..1b7d4be 100644 (file)
@@ -6,10 +6,13 @@ import threading
 
 from neco.util import guid
 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
-from neco.execution.resource import ResourceFactory
+from neco.execution.resource import ResourceFactory, ResourceAction, \
+        ResourceState
 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
 from neco.util.parallel import ParallelRun
 
+# TODO: use multiprocessing instead of threading
+
 class ExperimentController(object):
     def __init__(self, root_dir = "/tmp", loglevel = 'error'): 
         super(ExperimentController, self).__init__()
@@ -75,46 +78,43 @@ class ExperimentController(object):
         rm1.connect(guid2)
         rm2.connect(guid1)
 
-    def discover_resource(self, guid, filters):
-        rm = self.get_resource(guid)
-        return rm.discover(filters)
+    def register_condition(self, group1, action, group2, state,
+            time = None):
+        """ Registers an action START or STOP for all RM on group1 to occur 
+            time 'time' after all elements in group2 reached state 'state'.
 
-    def provision_resource(self, guid, filters):
-        rm = self.get_resource(guid)
-        return rm.provision(filters)
+            :param group1: List of guids of RMs subjected to action
+            :type group1: list
 
-    def register_start(self, group1, time, after_status, group2):
-        if isinstance(group1, int):
-            group1 = list[group1]
-        if isinstance(group2, int):
-            group2 = list[group2]
+            :param action: Action to register (either START or STOP)
+            :type action: ResourceAction
 
-        for guid1 in group1:
-            for guid2 in group2:
-                rm = self.get_resource(guid)
-                rm.start_after(time, after_status, guid2)
+            :param group2: List of guids of RMs to we waited for
+            :type group2: list
 
-    def register_stop(self, group1, time, after_status, group2):
-        if isinstance(group1, int):
-            group1 = list[group1]
-        if isinstance(group2, int):
-            group2 = list[group2]
+            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+            :type state: ResourceState
 
-        for guid1 in group1:
-            for guid2 in group2:
-                rm = self.get_resource(guid)
-                rm.stop_after(time, after_status, guid2)
+            :param time: Time to wait after group2 has reached status 
+            :type time: string
 
-    def register_set(self, name, value, group1, time, after_status, group2):
+        """
         if isinstance(group1, int):
             group1 = list[group1]
         if isinstance(group2, int):
             group2 = list[group2]
 
         for guid1 in group1:
-            for guid2 in group2:
-                rm = self.get_resource(guid)
-                rm.set_after(name, value, time, after_status, guid2)
+            rm = self.get_resource(guid)
+            rm.register_condition(action, group2, state, time)
+
+    def discover(self, guid, filters):
+        rm = self.get_resource(guid)
+        return rm.discover(filters)
+
+    def provision(self, guid, filters):
+        rm = self.get_resource(guid)
+        return rm.provision(filters)
 
     def get(self, guid, name):
         rm = self.get_resource(guid)
@@ -124,15 +124,83 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.set(name, value)
 
-    def status(self, guid):
+    def state(self, guid):
         rm = self.get_resource(guid)
-        return rm.status()
+        return rm.state
 
     def stop(self, guid):
         rm = self.get_resource(guid)
         return rm.stop()
 
-    def deploy(self, group = None, start_when_all_ready = True):
+    def start(self, guid):
+        rm = self.get_resource(guid)
+        return rm.start()
+
+    def set_with_conditions(self, name, value, group1, group2, state,
+            time = None):
+        """ Set value 'value' on attribute with name 'name' on all RMs of
+            group1 when 'time' has elapsed since all elements in group2 
+            have reached state 'state'.
+
+            :param name: Name of attribute to set in RM
+            :type name: string
+
+            :param value: Value of attribute to set in RM
+            :type name: string
+
+            :param group1: List of guids of RMs subjected to action
+            :type group1: list
+
+            :param action: Action to register (either START or STOP)
+            :type action: ResourceAction
+
+            :param group2: List of guids of RMs to we waited for
+            :type group2: list
+
+            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+            :type state: ResourceState
+
+            :param time: Time to wait after group2 has reached status 
+            :type time: string
+
+        """
+        if isinstance(group1, int):
+            group1 = list[group1]
+        if isinstance(group2, int):
+            group2 = list[group2]
+
+        for guid1 in group1:
+            rm = self.get_resource(guid)
+            rm.set_with_conditions(name, value, group2, state, time)
+
+    def stop_with_conditions(self, guid):
+        rm = self.get_resource(guid)
+        return rm.stop_with_conditions()
+
+    def start_with_conditions(self, guid):
+        rm = self.get_resource(guid)
+        return rm.start_with_condition()
+
+    def deploy(self, group = None, wait_all_ready = True):
+        """ Deploy all resource manager in group
+
+        :param group: List of guids of RMs to deploy
+        :type group: list
+
+        :param wait_all_ready: Wait until all RMs are deployed in
+            order to start the RMs
+        :type guid: int
+
+        """
+        def steps(rm):
+            rm.deploy()
+            rm.start_with_conditions()
+
+            # Only if the RM has STOP consitions we
+            # schedule a stop. Otherwise the RM will stop immediately
+            if rm.conditions.get(ResourceAction.STOP):
+                rm.stop_with_conditions()
+
         if not group:
             group = self.resources
 
@@ -140,13 +208,13 @@ class ExperimentController(object):
         for guid in group:
             rm = self.get_resource(guid)
 
-            kwargs = {'target': rm.deploy}
-            if start_when_all_ready:
+            if wait_all_ready:
                 towait = list(group)
                 towait.remove(guid)
-                kwargs['args'] = towait
+                self.register_condition(guid, ResourceAction.START, 
+                        towait, ResourceState.DEPLOYED)
 
-            thread = threading.Thread(kwargs)
+            thread = threading.Thread(target = steps, args = (rm))
             threads.append(thread)
             thread.start()