Supporting many concurrent LinuxApplications on same LinuxNode
[nepi.git] / src / neco / execution / resource.py
index f7010b0..322e276 100644 (file)
@@ -1,4 +1,5 @@
 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
+from neco.execution.trace import TraceAttr
 
 import copy
 import functools
@@ -19,8 +20,9 @@ class ResourceState:
     READY = 3
     STARTED = 4
     STOPPED = 5
-    FAILED = 6
-    RELEASED = 7
+    FINISHED = 6
+    FAILED = 7
+    RELEASED = 8
 
 def clsinit(cls):
     cls._clsinit()
@@ -32,6 +34,7 @@ class ResourceManager(object):
     _rtype = "Resource"
     _filters = None
     _attributes = None
+    _traces = None
 
     @classmethod
     def _register_filter(cls, attr):
@@ -50,17 +53,34 @@ class ResourceManager(object):
         cls._attributes[attr.name] = attr
 
     @classmethod
-    def _register_filters(cls):
+    def _register_trace(cls, trace):
         """ Resource subclasses will invoke this method to add a 
-        filter attribute
+        resource trace
+
+        """
+        cls._traces[trace.name] = trace
+
+
+    @classmethod
+    def _register_filters(cls):
+        """ Resource subclasses will invoke this method to register 
+        resource filters
 
         """
         pass
 
     @classmethod
     def _register_attributes(cls):
-        """ Resource subclasses will invoke this method to add a 
-        resource attribute
+        """ Resource subclasses will invoke this method to register
+        resource attributes
+
+        """
+        pass
+
+    @classmethod
+    def _register_traces(cls):
+        """ Resource subclasses will invoke this method to register
+        resource traces
 
         """
         pass
@@ -81,6 +101,10 @@ class ResourceManager(object):
         cls._attributes = dict()
         cls._register_attributes()
 
+        # static template for resource traces
+        cls._traces = dict()
+        cls._register_traces()
+
     @classmethod
     def rtype(cls):
         return cls._rtype
@@ -99,6 +123,13 @@ class ResourceManager(object):
         """
         return copy.deepcopy(cls._attributes.values())
 
+    @classmethod
+    def get_traces(cls):
+        """ Returns a copy of the traces
+
+        """
+        return copy.deepcopy(cls._traces.values())
+
     def __init__(self, ec, guid):
         self._guid = guid
         self._ec = weakref.ref(ec)
@@ -106,9 +137,11 @@ class ResourceManager(object):
         self._conditions = dict() 
 
         # the resource instance gets a copy of all attributes
-        # that can modify
         self._attrs = copy.deepcopy(self._attributes)
 
+        # the resource instance gets a copy of all traces
+        self._trcs = copy.deepcopy(self._traces)
+
         self._state = ResourceState.NEW
 
         self._start_time = None
@@ -119,7 +152,33 @@ class ResourceManager(object):
         self._release_time = None
 
         # Logging
-        self._logger = logging.getLogger("neco.execution.resource.Resource %s.%d " %  (self._rtype, self.guid))
+        self._logger = logging.getLogger("Resource")
+
+    def debug(self, msg, out = None, err = None):
+        self.log(msg, logging.DEBUG, out, err)
+
+    def error(self, msg, out = None, err = None):
+        self.log(msg, logging.ERROR, out, err)
+
+    def warn(self, msg, out = None, err = None):
+        self.log(msg, logging.WARNING, out, err)
+
+    def info(self, msg, out = None, err = None):
+        self.log(msg, logging.INFO, out, err)
+
+    def log(self, msg, level, out = None, err = None):
+        if out:
+            msg += " - OUT: %s " % out
+
+        if err:
+            msg += " - ERROR: %s " % err
+
+        msg = self.log_message(msg)
+
+        self.logger.log(level, msg)
+
+    def log_message(self, msg):
+        return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
 
     @property
     def logger(self):
@@ -192,7 +251,7 @@ class ResourceManager(object):
 
         """
         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
-            self.logger.error("Wrong state %s for start" % self.state)
+            self.error("Wrong state %s for start" % self.state)
             return
 
         self._start_time = strfnow()
@@ -203,7 +262,7 @@ class ResourceManager(object):
 
         """
         if not self._state in [ResourceState.STARTED]:
-            self.logger.error("Wrong state %s for stop" % self.state)
+            self.error("Wrong state %s for stop" % self.state)
             return
 
         self._stop_time = strfnow()
@@ -216,7 +275,6 @@ class ResourceManager(object):
         :type name: str
         :param name: Value of the attribute
         :type name: str
-        :rtype:  Boolean
         """
         attr = self._attrs[name]
         attr.value = value
@@ -231,6 +289,38 @@ class ResourceManager(object):
         attr = self._attrs[name]
         return attr.value
 
+    def register_trace(self, name):
+        """ Enable trace
+
+        :param name: Name of the trace
+        :type name: str
+        """
+        trace = self._trcs[name]
+        trace.enabled = True
+
+    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+        """ Get information on collected trace
+
+        :param name: Name of the trace
+        :type name: str
+
+        :param attr: Can be one of:
+                         - TraceAttr.ALL (complete trace content), 
+                         - TraceAttr.STREAM (block in bytes to read starting at offset), 
+                         - TraceAttr.PATH (full path to the trace file),
+                         - TraceAttr.SIZE (size of trace file). 
+        :type attr: str
+
+        :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :rtype: str
+        """
+        pass
+
     def register_condition(self, action, group, state, 
             time = None):
         """ Registers a condition on the resource manager to allow execution 
@@ -259,6 +349,14 @@ class ResourceManager(object):
 
         conditions.append((group, state, time))
 
+    def get_connected(self, rtype):
+        connected = []
+        for guid in self.connections:
+            rm = self.ec.get_resource(guid)
+            if rm.rtype() == rtype:
+                connected.append(rm)
+        return connected
+
     def _needs_reschedule(self, group, state, time):
         """ Internal method that verify if 'time' has elapsed since 
         all elements in 'group' have reached state 'state'.
@@ -361,8 +459,9 @@ class ResourceManager(object):
         # only can start when RM is either STOPPED or READY
         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
             reschedule = True
+            self.debug("---- RESCHEDULING START ---- state %s " % self.state )
         else:
-            self.logger.debug("---- START CONDITIONS ---- %s" % 
+            self.debug("---- START CONDITIONS ---- %s" % 
                     self.conditions.get(ResourceAction.START))
             
             # Verify all start conditions are met
@@ -375,7 +474,7 @@ class ResourceManager(object):
         if reschedule:
             self.ec.schedule(delay, self.start_with_conditions)
         else:
-            self.logger.debug("----- STARTING ---- ")
+            self.debug("----- STARTING ---- ")
             self.start()
 
     def stop_with_conditions(self):
@@ -392,7 +491,7 @@ class ResourceManager(object):
         if self.state != ResourceState.STARTED:
             reschedule = True
         else:
-            self.logger.debug(" ---- STOP CONDITIONS ---- %s" % 
+            self.debug(" ---- STOP CONDITIONS ---- %s" % 
                     self.conditions.get(ResourceAction.STOP))
 
             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
@@ -413,9 +512,10 @@ class ResourceManager(object):
 
         """
         if self._state > ResourceState.READY:
-            self.logger.error("Wrong state %s for deploy" % self.state)
+            self.error("Wrong state %s for deploy" % self.state)
             return
 
+        self.debug("----- DEPLOYING ---- ")
         self._ready_time = strfnow()
         self._state = ResourceState.READY