Adding trace Collector RM
[nepi.git] / src / nepi / execution / resource.py
index 0fcc198..3882523 100644 (file)
@@ -28,7 +28,7 @@ import os
 import pkgutil
 import weakref
 
-reschedule_delay = "0.5s"
+reschedule_delay = "1s"
 
 class ResourceAction:
     """ Action that a user can order to a Resource Manager
@@ -288,22 +288,24 @@ class ResourceManager(Logger):
         """
         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
 
-    def connect(self, guid):
-        """ Establishes a connection to the RM identified by guid
+    def register_connection(self, guid):
+        """ Registers a connection to the RM identified by guid
 
         :param guid: Global unique identified of the RM to connect to
         :type guid: int
         """
         if self.valid_connection(guid):
+            self.connect(guid)
             self._connections.add(guid)
 
-    def disconnect(self, guid):
-        """ Removes connection to the RM identified by guid
+    def unregister_connection(self, guid):
+        """ Removes a registered connection to the RM identified by guid
 
         :param guid: Global unique identified of the RM to connect to
         :type guid: int
         """
         if guid in self._connections:
+            self.disconnect(guid)
             self._connections.remove(guid)
 
     def discover(self):
@@ -406,8 +408,7 @@ class ResourceManager(Logger):
         """
         pass
 
-    def register_condition(self, action, group, state, 
-            time = None):
+    def register_condition(self, action, group, state, time = None):
         """ Registers a condition on the resource manager to allow execution 
         of 'action' only after 'time' has elapsed from the moment all resources 
         in 'group' reached state 'state'
@@ -422,10 +423,11 @@ class ResourceManager(Logger):
         :type time: str
 
         """
+
+        if not action in self.conditions:
+            self._conditions[action] = list()
+        
         conditions = self.conditions.get(action)
-        if not conditions:
-            conditions = list()
-            self._conditions[action] = conditions
 
         # For each condition to register a tuple of (group, state, time) is 
         # added to the 'action' list
@@ -434,6 +436,37 @@ class ResourceManager(Logger):
 
         conditions.append((group, state, time))
 
+    def unregister_condition(self, group, action = None):
+        """ Removed conditions for a certain group of guids
+
+        :param action: Action to restrict to condition (either 'START' or 'STOP')
+        :type action: str
+
+        :param group: Group of RMs to wait for (list of guids)
+        :type group: int or list of int
+
+        """
+        # For each condition a tuple of (group, state, time) is 
+        # added to the 'action' list
+        if not isinstance(group, list):
+            group = [group]
+
+        for act, conditions in self.conditions.iteritems():
+            if action and act != action:
+                continue
+
+            for condition in list(conditions):
+                (grp, state, time) = condition
+
+                # If there is an intersection between grp and group,
+                # then remove intersected elements
+                intsec = set(group).intersection(set(grp))
+                if intsec:
+                    idx = conditions.index(condition)
+                    newgrp = set(grp)
+                    newgrp.difference_update(intsec)
+                    conditions[idx] = (newgrp, state, time)
+                 
     def get_connected(self, rtype = None):
         """ Returns the list of RM with the type 'rtype'
 
@@ -471,7 +504,7 @@ class ResourceManager(Logger):
         for guid in group:
             rm = self.ec.get_resource(guid)
             # If the RM state is lower than the requested state we must
-            # reschedule (e.g. if RM is READY but we required STARTED)
+            # reschedule (e.g. if RM is READY but we required STARTED).
             if rm.state < state:
                 reschedule = True
                 break
@@ -499,7 +532,7 @@ class ResourceManager(Logger):
                 # time still to wait
                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
 
-                if wait > 0:
+                if wait > 0.001:
                     reschedule = True
                     delay = "%fs" % wait
                     break
@@ -643,6 +676,18 @@ class ResourceManager(Logger):
         self._failed_time = tnow()
         self._state = ResourceState.FAILED
 
+    def connect(self, guid):
+        """ Performs actions that need to be taken upon associating RMs.
+        This method should be redefined when necessary in child classes.
+        """
+        pass
+
+    def disconnect(self, guid):
+        """ Performs actions that need to be taken upon disassociating RMs.
+        This method should be redefined when necessary in child classes.
+        """
+        pass
+
     def valid_connection(self, guid):
         """Checks whether a connection with the other RM
         is valid.