Adding trace Collector RM
[nepi.git] / src / nepi / execution / resource.py
index 423ca92..3882523 100644 (file)
@@ -17,7 +17,7 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
+from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
 from nepi.util.logger import Logger
 from nepi.execution.trace import TraceAttr
 
@@ -28,7 +28,7 @@ import os
 import pkgutil
 import weakref
 
-reschedule_delay = "0.5s"
+reschedule_delay = "1s"
 
 class ResourceAction:
     """ Action that a user can order to a Resource Manager
@@ -208,10 +208,12 @@ class ResourceManager(Logger):
         self._provision_time = None
         self._ready_time = None
         self._release_time = None
+        self._finish_time = None
+        self._failed_time = None
 
     @property
     def guid(self):
-        """ Returns the guid of the current RM """
+        """ Returns the global unique identifier of the RM """
         return self._guid
 
     @property
@@ -221,106 +223,136 @@ class ResourceManager(Logger):
 
     @property
     def connections(self):
-        """ Returns the set of connection for this RM"""
+        """ Returns the set of guids of connected RMs"""
         return self._connections
 
     @property
     def conditions(self):
-        """ Returns the list of conditions for this RM
-        The list is a dictionary with for each action, a list of tuple 
-        describing the conditions. """
+        """ Returns the conditions to which the RM is subjected to.
+        
+        The object returned by this method is a dictionary indexed by
+        ResourceAction."""
         return self._conditions
 
     @property
     def start_time(self):
-        """ Returns timestamp with the time the RM started """
+        """ Returns the start time of the RM as a timestamp"""
         return self._start_time
 
     @property
     def stop_time(self):
-        """ Returns timestamp with the time the RM stopped """
+        """ Returns the stop time of the RM as a timestamp"""
         return self._stop_time
 
     @property
     def discover_time(self):
-        """ Returns timestamp with the time the RM passed to state discovered """
+        """ Returns the time discovering was finished for the RM as a timestamp"""
         return self._discover_time
 
     @property
     def provision_time(self):
-        """ Returns timestamp with the time the RM passed to state provisioned """
+        """ Returns the time provisioning was finished for the RM as a timestamp"""
         return self._provision_time
 
     @property
     def ready_time(self):
-        """ Returns timestamp with the time the RM passed to state ready  """
+        """ Returns the time deployment was finished for the RM as a timestamp"""
         return self._ready_time
 
     @property
     def release_time(self):
-        """ Returns timestamp with the time the RM was released """
+        """ Returns the release time of the RM as a timestamp"""
         return self._release_time
 
+    @property
+    def finish_time(self):
+        """ Returns the finalization time of the RM as a timestamp"""
+        return self._finish_time
+
+    @property
+    def failed_time(self):
+        """ Returns the time failure occured for the RM as a timestamp"""
+        return self._failed_time
+
     @property
     def state(self):
         """ Get the state of the current RM """
         return self._state
 
     def log_message(self, msg):
-        """ Improve debugging message by adding more information 
-            as the guid and the type of the RM
+        """ Returns the log message formatted with added information.
 
-        :param msg: Message to log
+        :param msg: text message
         :type msg: str
         :rtype: str
         """
         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
 
-    def connect(self, guid):
-        """ Connect the current RM with the RM 'guid'
+    def register_connection(self, guid):
+        """ Registers a connection to the RM identified by guid
 
-        :param guid: Guid of the RM the current RM will be connected
+        :param guid: Global unique identified of the RM to connect to
         :type guid: int
         """
         if self.valid_connection(guid):
+            self.connect(guid)
             self._connections.add(guid)
 
+    def unregister_connection(self, guid):
+        """ Removes a registered connection to the RM identified by guid
+
+        :param guid: Global unique identified of the RM to connect to
+        :type guid: int
+        """
+        if guid in self._connections:
+            self.disconnect(guid)
+            self._connections.remove(guid)
+
     def discover(self):
-        """ Discover the Resource. As it is specific for each RM, 
-        this method take the time when the RM become DISCOVERED and
-        change the status """
-        self._discover_time = strfnow()
+        """ Performs resource discovery.
+
+        This  method is resposible for selecting an individual resource
+        matching user requirements.
+        This method should be redefined when necessary in child classes.
+        """ 
+        self._discover_time = tnow()
         self._state = ResourceState.DISCOVERED
 
     def provision(self):
-        """ Provision the Resource. As it is specific for each RM, 
-        this method take the time when the RM become PROVISIONNED and
-        change the status """
-        self._provision_time = strfnow()
+        """ Performs resource provisioning.
+
+        This  method is resposible for provisioning one resource.
+        After this method has been successfully invoked, the resource
+        should be acccesible/controllable by the RM.
+        This method should be redefined when necessary in child classes.
+        """ 
+        self._provision_time = tnow()
         self._state = ResourceState.PROVISIONED
 
     def start(self):
-        """ Start the Resource Manager. As it is specific to each RM, this methods
-        just change, after some verifications, the status to STARTED and save the time.
-
+        """ Starts the resource.
+        
+        There is no generic start behavior for all resources.
+        This method should be redefined when necessary in child classes.
         """
         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
             self.error("Wrong state %s for start" % self.state)
             return
 
-        self._start_time = strfnow()
+        self._start_time = tnow()
         self._state = ResourceState.STARTED
 
     def stop(self):
-        """ Stop the Resource Manager. As it is specific to each RM, this methods
-        just change, after some verifications, the status to STOPPED and save the time.
-
+        """ Stops the resource.
+        
+        There is no generic stop behavior for all resources.
+        This method should be redefined when necessary in child classes.
         """
         if not self._state in [ResourceState.STARTED]:
             self.error("Wrong state %s for stop" % self.state)
             return
 
-        self._stop_time = strfnow()
+        self._stop_time = tnow()
         self._state = ResourceState.STOPPED
 
     def set(self, name, value):
@@ -335,7 +367,7 @@ class ResourceManager(Logger):
         attr.value = value
 
     def get(self, name):
-        """ Start the Resource Manager
+        """ Returns the value of the attribute
 
         :param name: Name of the attribute
         :type name: str
@@ -345,7 +377,7 @@ class ResourceManager(Logger):
         return attr.value
 
     def register_trace(self, name):
-        """ Enable trace
+        """ Explicitly enable trace generation
 
         :param name: Name of the trace
         :type name: str
@@ -376,8 +408,7 @@ class ResourceManager(Logger):
         """
         pass
 
-    def register_condition(self, action, group, state, 
-            time = None):
+    def register_condition(self, action, group, state, time = None):
         """ Registers a condition on the resource manager to allow execution 
         of 'action' only after 'time' has elapsed from the moment all resources 
         in 'group' reached state 'state'
@@ -392,10 +423,11 @@ class ResourceManager(Logger):
         :type time: str
 
         """
+
+        if not action in self.conditions:
+            self._conditions[action] = list()
+        
         conditions = self.conditions.get(action)
-        if not conditions:
-            conditions = list()
-            self._conditions[action] = conditions
 
         # For each condition to register a tuple of (group, state, time) is 
         # added to the 'action' list
@@ -404,17 +436,48 @@ class ResourceManager(Logger):
 
         conditions.append((group, state, time))
 
-    def get_connected(self, rtype):
-        """ Return the list of RM with the type 'rtype' 
+    def unregister_condition(self, group, action = None):
+        """ Removed conditions for a certain group of guids
+
+        :param action: Action to restrict to condition (either 'START' or 'STOP')
+        :type action: str
+
+        :param group: Group of RMs to wait for (list of guids)
+        :type group: int or list of int
+
+        """
+        # For each condition a tuple of (group, state, time) is 
+        # added to the 'action' list
+        if not isinstance(group, list):
+            group = [group]
+
+        for act, conditions in self.conditions.iteritems():
+            if action and act != action:
+                continue
+
+            for condition in list(conditions):
+                (grp, state, time) = condition
+
+                # If there is an intersection between grp and group,
+                # then remove intersected elements
+                intsec = set(group).intersection(set(grp))
+                if intsec:
+                    idx = conditions.index(condition)
+                    newgrp = set(grp)
+                    newgrp.difference_update(intsec)
+                    conditions[idx] = (newgrp, state, time)
+                 
+    def get_connected(self, rtype = None):
+        """ Returns the list of RM with the type 'rtype'
 
         :param rtype: Type of the RM we look for
         :type rtype: str
-        :return : list of guid
+        :return: list of guid
         """
         connected = []
         for guid in self.connections:
             rm = self.ec.get_resource(guid)
-            if rm.rtype() == rtype:
+            if not rtype or rm.rtype() == rtype:
                 connected.append(rm)
         return connected
 
@@ -441,7 +504,7 @@ class ResourceManager(Logger):
         for guid in group:
             rm = self.ec.get_resource(guid)
             # If the RM state is lower than the requested state we must
-            # reschedule (e.g. if RM is READY but we required STARTED)
+            # reschedule (e.g. if RM is READY but we required STARTED).
             if rm.state < state:
                 reschedule = True
                 break
@@ -463,18 +526,23 @@ class ResourceManager(Logger):
                     # Only keep time information for START and STOP
                     break
 
-                d = strfdiff(strfnow(), t)
-                wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
+                # time already elapsed since RM changed state
+                waited = "%fs" % tdiffsec(tnow(), t)
+
+                # time still to wait
+                wait = tdiffsec(stabsformat(time), stabsformat(waited))
+
                 if wait > 0.001:
                     reschedule = True
                     delay = "%fs" % wait
                     break
+
         return reschedule, delay
 
     def set_with_conditions(self, name, value, group, state, time):
         """ Set value 'value' on attribute with name 'name' when 'time' 
-            has elapsed since all elements in 'group' have reached state
-           'state'
+        has elapsed since all elements in 'group' have reached state
+        'state'
 
         :param name: Name of the attribute to set
         :type name: str
@@ -486,7 +554,6 @@ class ResourceManager(Logger):
         :type state: str
         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
         :type time: str
-
         """
 
         reschedule = False
@@ -585,19 +652,46 @@ class ResourceManager(Logger):
             return
 
         self.debug("----- READY ---- ")
-        self._ready_time = strfnow()
+        self._ready_time = tnow()
         self._state = ResourceState.READY
 
     def release(self):
-        """Clean the resource at the end of the Experiment and change the status
+        """Release any resources used by this RM
 
         """
-        self._release_time = strfnow()
+        self._release_time = tnow()
         self._state = ResourceState.RELEASED
 
+    def finish(self):
+        """ Mark ResourceManager as FINISHED
+
+        """
+        self._finish_time = tnow()
+        self._state = ResourceState.FINISHED
+
+    def fail(self):
+        """ Mark ResourceManager as FAILED
+
+        """
+        self._failed_time = tnow()
+        self._state = ResourceState.FAILED
+
+    def connect(self, guid):
+        """ Performs actions that need to be taken upon associating RMs.
+        This method should be redefined when necessary in child classes.
+        """
+        pass
+
+    def disconnect(self, guid):
+        """ Performs actions that need to be taken upon disassociating RMs.
+        This method should be redefined when necessary in child classes.
+        """
+        pass
+
     def valid_connection(self, guid):
-        """Check if the connection is available. This method need to be 
-        redefined by each new Resource Manager.
+        """Checks whether a connection with the other RM
+        is valid.
+        This method need to be redefined by each new Resource Manager.
 
         :param guid: Guid of the current Resource Manager
         :type guid: int
@@ -627,17 +721,16 @@ class ResourceFactory(object):
         return rclass(ec, guid)
 
 def populate_factory():
-        """Register all the possible RM that exists in the current version of Nepi.
-
-        """
+    """Register all the possible RM that exists in the current version of Nepi.
+    """
     for rclass in find_types():
         ResourceFactory.register_type(rclass)
 
 def find_types():
-        """Look into the different folders to find all the 
-        availables Resources Managers
+    """Look into the different folders to find all the 
+    availables Resources Managers
 
-        """
+    """
     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
     search_path = set(search_path.split(" "))