Supporting many concurrent LinuxApplications on same LinuxNode
[nepi.git] / src / neco / execution / resource.py
index ad9a5c5..322e276 100644 (file)
@@ -1,26 +1,28 @@
-
 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
+from neco.execution.trace import TraceAttr
 
 import copy
 import functools
 import logging
 import weakref
-import time as TIME
 
 _reschedule_delay = "1s"
 
 class ResourceAction:
-    DEPLOYED = 0
+    DEPLOY = 0
     START = 1
     STOP = 2
 
 class ResourceState:
     NEW = 0
-    DEPLOYED = 1
-    STARTED = 2
-    STOPPED = 3
-    FAILED = 4
-    RELEASED = 5
+    DISCOVERED = 1
+    PROVISIONED = 2
+    READY = 3
+    STARTED = 4
+    STOPPED = 5
+    FINISHED = 6
+    FAILED = 7
+    RELEASED = 8
 
 def clsinit(cls):
     cls._clsinit()
@@ -32,7 +34,7 @@ class ResourceManager(object):
     _rtype = "Resource"
     _filters = None
     _attributes = None
-    _waiters = []
+    _traces = None
 
     @classmethod
     def _register_filter(cls, attr):
@@ -51,17 +53,34 @@ class ResourceManager(object):
         cls._attributes[attr.name] = attr
 
     @classmethod
-    def _register_filters(cls):
+    def _register_trace(cls, trace):
         """ Resource subclasses will invoke this method to add a 
-        filter attribute
+        resource trace
+
+        """
+        cls._traces[trace.name] = trace
+
+
+    @classmethod
+    def _register_filters(cls):
+        """ Resource subclasses will invoke this method to register 
+        resource filters
 
         """
         pass
 
     @classmethod
     def _register_attributes(cls):
-        """ Resource subclasses will invoke this method to add a 
-        resource attribute
+        """ Resource subclasses will invoke this method to register
+        resource attributes
+
+        """
+        pass
+
+    @classmethod
+    def _register_traces(cls):
+        """ Resource subclasses will invoke this method to register
+        resource traces
 
         """
         pass
@@ -82,14 +101,14 @@ class ResourceManager(object):
         cls._attributes = dict()
         cls._register_attributes()
 
+        # static template for resource traces
+        cls._traces = dict()
+        cls._register_traces()
+
     @classmethod
     def rtype(cls):
         return cls._rtype
 
-    @classmethod
-    def waiters(cls):
-        return cls._waiters
-
     @classmethod
     def get_filters(cls):
         """ Returns a copy of the filters
@@ -104,6 +123,13 @@ class ResourceManager(object):
         """
         return copy.deepcopy(cls._attributes.values())
 
+    @classmethod
+    def get_traces(cls):
+        """ Returns a copy of the traces
+
+        """
+        return copy.deepcopy(cls._traces.values())
+
     def __init__(self, ec, guid):
         self._guid = guid
         self._ec = weakref.ref(ec)
@@ -111,17 +137,48 @@ class ResourceManager(object):
         self._conditions = dict() 
 
         # the resource instance gets a copy of all attributes
-        # that can modify
         self._attrs = copy.deepcopy(self._attributes)
 
+        # the resource instance gets a copy of all traces
+        self._trcs = copy.deepcopy(self._traces)
+
         self._state = ResourceState.NEW
 
         self._start_time = None
         self._stop_time = None
+        self._discover_time = None
+        self._provision_time = None
+        self._ready_time = None
+        self._release_time = None
 
         # Logging
-        self._logger = logging.getLogger("neco.execution.resource.Resource.%s" % 
-            self.guid)
+        self._logger = logging.getLogger("Resource")
+
+    def debug(self, msg, out = None, err = None):
+        self.log(msg, logging.DEBUG, out, err)
+
+    def error(self, msg, out = None, err = None):
+        self.log(msg, logging.ERROR, out, err)
+
+    def warn(self, msg, out = None, err = None):
+        self.log(msg, logging.WARNING, out, err)
+
+    def info(self, msg, out = None, err = None):
+        self.log(msg, logging.INFO, out, err)
+
+    def log(self, msg, level, out = None, err = None):
+        if out:
+            msg += " - OUT: %s " % out
+
+        if err:
+            msg += " - ERROR: %s " % err
+
+        msg = self.log_message(msg)
+
+        self.logger.log(level, msg)
+
+    def log_message(self, msg):
+        return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
 
     @property
     def logger(self):
@@ -145,37 +202,57 @@ class ResourceManager(object):
 
     @property
     def start_time(self):
-        """ timestamp with  """
+        """ Returns timestamp with the time the RM started """
         return self._start_time
 
     @property
     def stop_time(self):
+        """ Returns timestamp with the time the RM stopped """
         return self._stop_time
 
     @property
-    def deploy_time(self):
-        return self._deploy_time
+    def discover_time(self):
+        """ Returns timestamp with the time the RM passed to state discovered """
+        return self._discover_time
+
+    @property
+    def provision_time(self):
+        """ Returns timestamp with the time the RM passed to state provisioned """
+        return self._provision_time
+
+    @property
+    def ready_time(self):
+        """ Returns timestamp with the time the RM passed to state ready  """
+        return self._ready_time
+
+    @property
+    def release_time(self):
+        """ Returns timestamp with the time the RM was released """
+        return self._release_time
 
     @property
     def state(self):
         return self._state
 
     def connect(self, guid):
-        if (self._validate_connection(guid)):
+        if self.valid_connection(guid):
             self._connections.add(guid)
 
     def discover(self, filters = None):
-        pass
+        self._discover_time = strfnow()
+        self._state = ResourceState.DISCOVERED
 
     def provision(self, filters = None):
-        pass
+        self._provision_time = strfnow()
+        self._state = ResourceState.PROVISIONED
 
     def start(self):
         """ Start the Resource Manager
 
         """
-        if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]:
-            self.logger.error("Wrong state %s for start" % self.state)
+        if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
+            self.error("Wrong state %s for start" % self.state)
+            return
 
         self._start_time = strfnow()
         self._state = ResourceState.STARTED
@@ -185,7 +262,8 @@ class ResourceManager(object):
 
         """
         if not self._state in [ResourceState.STARTED]:
-            self.logger.error("Wrong state %s for stop" % self.state)
+            self.error("Wrong state %s for stop" % self.state)
+            return
 
         self._stop_time = strfnow()
         self._state = ResourceState.STOPPED
@@ -197,7 +275,6 @@ class ResourceManager(object):
         :type name: str
         :param name: Value of the attribute
         :type name: str
-        :rtype:  Boolean
         """
         attr = self._attrs[name]
         attr.value = value
@@ -212,38 +289,83 @@ class ResourceManager(object):
         attr = self._attrs[name]
         return attr.value
 
+    def register_trace(self, name):
+        """ Enable trace
+
+        :param name: Name of the trace
+        :type name: str
+        """
+        trace = self._trcs[name]
+        trace.enabled = True
+
+    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+        """ Get information on collected trace
+
+        :param name: Name of the trace
+        :type name: str
+
+        :param attr: Can be one of:
+                         - TraceAttr.ALL (complete trace content), 
+                         - TraceAttr.STREAM (block in bytes to read starting at offset), 
+                         - TraceAttr.PATH (full path to the trace file),
+                         - TraceAttr.SIZE (size of trace file). 
+        :type attr: str
+
+        :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :rtype: str
+        """
+        pass
+
     def register_condition(self, action, group, state, 
             time = None):
-        """ Do the 'action' after 'time' on the current RM when 'group' 
-         reach the state 'state'
+        """ Registers a condition on the resource manager to allow execution 
+        of 'action' only after 'time' has elapsed from the moment all resources 
+        in 'group' reached state 'state'
 
-        :param action: Action to do. Either 'START' or 'STOP'
+        :param action: Action to restrict to condition (either 'START' or 'STOP')
         :type action: str
-        :param group: group of RM
-        :type group: str
-        :param state: RM that are part of the condition
-        :type state: list
-        :param time: Time to wait after the state is reached (ex : '2s' )
+        :param group: Group of RMs to wait for (list of guids)
+        :type group: int or list of int
+        :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
+        :type state: str
+        :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
         :type time: str
 
         """
-        if action not in self.conditions:
-            self._conditions[action] = set()
+        conditions = self.conditions.get(action)
+        if not conditions:
+            conditions = list()
+            self._conditions[action] = conditions
 
-        # We need to use only sequence inside a set and not a list. 
-        # As group is a list, we need to change it.
-        #print (tuple(group), state, time)
-        self.conditions.get(action).add((tuple(group), state, time))
+        # For each condition to register a tuple of (group, state, time) is 
+        # added to the 'action' list
+        if not isinstance(group, list):
+            group = [group]
+
+        conditions.append((group, state, time))
+
+    def get_connected(self, rtype):
+        connected = []
+        for guid in self.connections:
+            rm = self.ec.get_resource(guid)
+            if rm.rtype() == rtype:
+                connected.append(rm)
+        return connected
 
     def _needs_reschedule(self, group, state, time):
         """ Internal method that verify if 'time' has elapsed since 
         all elements in 'group' have reached state 'state'.
 
-        :param group: RM that are part of the condition
-        :type group: list
-        :param state: State that group need to reach for the condtion
+        :param group: Group of RMs to wait for (list of guids)
+        :type group: int or list of int
+        :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
         :type state: str
-        :param time: time to wait after the state
+        :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
         :type time: str
 
         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
@@ -257,15 +379,21 @@ class ResourceManager(object):
         # check state and time elapsed on all RMs
         for guid in group:
             rm = self.ec.get_resource(guid)
-            # If the RMs is lower than the requested state we must
-            # reschedule (e.g. if RM is DEPLOYED but we required STARTED)
+            # If the RM state is lower than the requested state we must
+            # reschedule (e.g. if RM is READY but we required STARTED)
             if rm.state < state:
                 reschedule = True
                 break
 
+            # If there is a time restriction, we must verify the
+            # restriction is satisfied 
             if time:
-                if state == ResourceState.DEPLOYED:
-                    t = rm.deploy_time
+                if state == ResourceState.DISCOVERED:
+                    t = rm.discover_time
+                if state == ResourceState.PROVISIONED:
+                    t = rm.provision_time
+                elif state == ResourceState.READY:
+                    t = rm.ready_time
                 elif state == ResourceState.STARTED:
                     t = rm.start_time
                 elif state == ResourceState.STOPPED:
@@ -275,7 +403,6 @@ class ResourceManager(object):
                     break
 
                 d = strfdiff(strfnow(), t)
-                #print "This is the value of d : " + str(d) + " // With the value of t : " + str(t) + " // With the value of time : " + str(time)
                 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
                 if wait > 0.001:
                     reschedule = True
@@ -286,17 +413,17 @@ class ResourceManager(object):
     def set_with_conditions(self, name, value, group, state, time):
         """ Set value 'value' on attribute with name 'name' when 'time' 
             has elapsed since all elements in 'group' have reached state
-           'state'.
+           'state'
 
-        :param name: Name of the attribute
+        :param name: Name of the attribute to set
         :type name: str
-        :param name: Value of the attribute
+        :param name: Value of the attribute to set
         :type name: str
-        :param group: RM that are part of the condition
-        :type group: list
-        :param state: State that group need to reach before set
+        :param group: Group of RMs to wait for (list of guids)
+        :type group: int or list of int
+        :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
         :type state: str
-        :param time: Time to wait after the state is reached (ex : '2s' )
+        :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
         :type time: str
 
         """
@@ -320,7 +447,8 @@ class ResourceManager(object):
             self.set(name, value)
 
     def start_with_conditions(self):
-        """ Starts when all the conditions are reached
+        """ Starts RM when all the conditions in self.conditions for
+        action 'START' are satisfied.
 
         """
         reschedule = False
@@ -328,30 +456,30 @@ class ResourceManager(object):
 
         ## evaluate if set conditions are met
 
-        # only can start when RM is either STOPPED or DEPLOYED
-        if self.state not in [ResourceState.STOPPED, ResourceState.DEPLOYED]:
+        # only can start when RM is either STOPPED or READY
+        if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
             reschedule = True
+            self.debug("---- RESCHEDULING START ---- state %s " % self.state )
         else:
-            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") -----  start condition : " + str(self.conditions.items())
-            # Need to separate because it could have more that tuple of condition 
-            # for the same action.
-            conditions_start = self.conditions.get(ResourceAction.START, [])
-            for (group, state, time) in conditions_start:
+            self.debug("---- START CONDITIONS ---- %s" % 
+                    self.conditions.get(ResourceAction.START))
+            
+            # Verify all start conditions are met
+            start_conditions = self.conditions.get(ResourceAction.START, [])
+            for (group, state, time) in start_conditions:
                 reschedule, delay = self._needs_reschedule(group, state, time)
                 if reschedule:
                     break
 
         if reschedule:
-            callback = functools.partial(self.start_with_conditions)
-            self.ec.schedule(delay, callback)
+            self.ec.schedule(delay, self.start_with_conditions)
         else:
-            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
-------------------------------------------------------------------------------\
-----------------------------------------------------------------  STARTING -- "
+            self.debug("----- STARTING ---- ")
             self.start()
 
     def stop_with_conditions(self):
-        """ Stop when all the conditions are reached
+        """ Stops RM when all the conditions in self.conditions for
+        action 'STOP' are satisfied.
 
         """
         reschedule = False
@@ -363,84 +491,42 @@ class ResourceManager(object):
         if self.state != ResourceState.STARTED:
             reschedule = True
         else:
-            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + "\
- (Guid : "+ str(self.guid) +")  ----  stop condition : " + str(self.conditions.items())
-            conditions_stop = self.conditions.get(ResourceAction.STOP, []) 
-            for (group, state, time) in conditions_stop:
+            self.debug(" ---- STOP CONDITIONS ---- %s" % 
+                    self.conditions.get(ResourceAction.STOP))
+
+            stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
+            for (group, state, time) in stop_conditions:
                 reschedule, delay = self._needs_reschedule(group, state, time)
                 if reschedule:
                     break
+
         if reschedule:
             callback = functools.partial(self.stop_with_conditions)
             self.ec.schedule(delay, callback)
         else:
-            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
-------------------------------------------------------------------------------\
-----------------------------------------------------------------  STOPPING -- "
+            self.logger.debug(" ----- STOPPING ---- ") 
             self.stop()
 
     def deploy(self):
-        """Execute all the differents steps required to reach the state DEPLOYED
+        """ Execute all steps required for the RM to reach the state READY
 
         """
-        self.deploy_restriction()
-        self.discover()
-        self.provision()
-        self.deploy_with_conditions()
-
-    def deploy_restriction(self):
-        dep = set()
-        for guid in self.connections:
-            if self.ec.get_resource(guid).rtype() in self.__class__._waiters:
-                dep.add(guid)
-        self.register_condition(ResourceAction.DEPLOYED, dep, ResourceState.DEPLOYED)
-
-
-    def deploy_with_conditions(self):
-        """ Starts when all the conditions are reached
-
-        """
-        reschedule = False
-        delay = _reschedule_delay 
-
-        ## evaluate if set conditions are met
-
-        # only can deploy when RM is NEW
-        if not self._state in [ResourceState.NEW]:
-            self.logger.error("Wrong state %s for stop" % self.state)
+        if self._state > ResourceState.READY:
+            self.error("Wrong state %s for deploy" % self.state)
             return
-        else:
-            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") -----  deploy condition : " + str(self.conditions.items())
-            # Need to separate because it could have more that tuple of condition 
-            # for the same action.
-            conditions_deployed = self.conditions.get(ResourceAction.DEPLOYED, [])
-            for (group, state, time) in conditions_deployed:
-                reschedule, delay = self._needs_reschedule(group, state, time)
-                if reschedule:
-                    break
-
-        if reschedule:
-            callback = functools.partial(self.deploy_with_conditions)
-            self.ec.schedule(delay, callback)
-        else:
-            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
-------------------------------------------------------------------------------\
-----------------------------------------------------------------  DEPLOY -- "
-            self.deploy_action()
-
-    def deploy_action(self):
-
-        self._deploy_time = strfnow()
-        self._state = ResourceState.DEPLOYED
 
+        self.debug("----- DEPLOYING ---- ")
+        self._ready_time = strfnow()
+        self._state = ResourceState.READY
 
     def release(self):
         """Clean the resource at the end of the Experiment and change the status
 
         """
+        self._release_time = strfnow()
         self._state = ResourceState.RELEASED
 
-    def _validate_connection(self, guid):
+    def valid_connection(self, guid):
         """Check if the connection is available.
 
         :param guid: Guid of the current Resource Manager