Adding data processing functions for CCN
[nepi.git] / src / nepi / execution / resource.py
index ed16e08..0f75e9a 100644 (file)
@@ -31,7 +31,7 @@ import sys
 import threading
 import weakref
 
-reschedule_delay = "1s"
+reschedule_delay = "0.5s"
 
 class ResourceAction:
     """ Action that a user can order to a Resource Manager
@@ -51,9 +51,8 @@ class ResourceState:
     READY = 3
     STARTED = 4
     STOPPED = 5
-    FINISHED = 6
-    FAILED = 7
-    RELEASED = 8
+    FAILED = 6
+    RELEASED = 7
 
 ResourceState2str = dict({
     ResourceState.NEW : "NEW",
@@ -62,7 +61,6 @@ ResourceState2str = dict({
     ResourceState.READY : "READY",
     ResourceState.STARTED : "STARTED",
     ResourceState.STOPPED : "STOPPED",
-    ResourceState.FINISHED : "FINISHED",
     ResourceState.FAILED : "FAILED",
     ResourceState.RELEASED : "RELEASED",
     })
@@ -108,18 +106,20 @@ def clsinit_copy(cls):
 def failtrap(func):
     """ Decorator function for instance methods that should set the 
     RM state to FAILED when an error is raised. The methods that must be
-    decorated are: discover, provision, deploy, start, stop and finish.
+    decorated are: discover, provision, deploy, start, stop.
 
     """
     def wrapped(self, *args, **kwargs):
         try:
             return func(self, *args, **kwargs)
         except:
+            self.fail()
+            
             import traceback
             err = traceback.format_exc()
-            self.error(err)
-            self.debug("SETTING guid %d to state FAILED" % self.guid)
-            self.fail()
+            logger = Logger(self._rtype)
+            logger.error(err)
+            logger.error("SETTING guid %d to state FAILED" % self.guid)
             raise
     
     return wrapped
@@ -188,16 +188,24 @@ class ResourceManager(Logger):
         attributes.
 
         """
-        
         critical = Attribute("critical", 
                 "Defines whether the resource is critical. "
                 "A failure on a critical resource will interrupt "
                 "the experiment. ",
                 type = Types.Bool,
                 default = True,
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
+        hard_release = Attribute("hardRelease", 
+                "Forces removal of all result files and directories associated "
+                "to the RM upon resource release. After release the RM will "
+                "be removed from the EC and the results will not longer be "
+                "accessible",
+                type = Types.Bool,
+                default = False,
+                flags = Flags.Design)
 
         cls._register_attribute(critical)
+        cls._register_attribute(hard_release)
         
     @classmethod
     def _register_traces(cls):
@@ -248,7 +256,7 @@ class ResourceManager(Logger):
         cls._register_traces()
 
     @classmethod
-    def rtype(cls):
+    def get_rtype(cls):
         """ Returns the type of the Resource Manager
 
         """
@@ -261,6 +269,13 @@ class ResourceManager(Logger):
         """
         return copy.deepcopy(cls._attributes.values())
 
+    @classmethod
+    def get_attribute(cls, name):
+        """ Returns a copy of the attribute with name 'name'
+
+        """
+        return copy.deepcopy(cls._attributes[name])
+
     @classmethod
     def get_traces(cls):
         """ Returns a copy of the traces
@@ -283,8 +298,34 @@ class ResourceManager(Logger):
         """
         return cls._backend
 
+    @classmethod
+    def get_global(cls, name):
+        """ Returns the value of a global attribute
+            A global attribute is an attribute shared by
+            all the resources of the same rtype
+
+        :param name: Name of the attribute
+        :type name: str
+        :rtype: str
+        """
+        global_attr = cls._attributes[name]
+        return global_attr.value
+
+    @classmethod
+    def set_global(cls, name, value):
+        """ Set value for a global attribute
+
+        :param name: Name of the attribute
+        :type name: str
+        :param value: Value of the attribute
+        :type value: str
+        """
+        global_attr = cls._attributes[name]
+        global_attr.value = value
+        return value
+
     def __init__(self, ec, guid):
-        super(ResourceManager, self).__init__(self.rtype())
+        super(ResourceManager, self).__init__(self.get_rtype())
         
         self._guid = guid
         self._ec = weakref.ref(ec)
@@ -307,14 +348,13 @@ class ResourceManager(Logger):
         self._provision_time = None
         self._ready_time = None
         self._release_time = None
-        self._finish_time = None
         self._failed_time = None
 
         self._state = ResourceState.NEW
 
         # instance lock to synchronize exclusive state change methods (such
         # as deploy and release methods), in order to prevent them from being 
-        # executed at the same time
+        # executed at the same time and corrupting internal resource state
         self._release_lock = threading.Lock()
 
     @property
@@ -372,11 +412,6 @@ class ResourceManager(Logger):
         """ Returns the release time of the RM as a timestamp """
         return self._release_time
 
-    @property
-    def finish_time(self):
-        """ Returns the finalization time of the RM as a timestamp """
-        return self._finish_time
-
     @property
     def failed_time(self):
         """ Returns the time failure occured for the RM as a timestamp """
@@ -395,7 +430,7 @@ class ResourceManager(Logger):
         :rtype: str
 
         """
-        return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
+        return " %s guid %d - %s " % (self._rtype, self.guid, msg)
 
     def register_connection(self, guid):
         """ Registers a connection to the RM identified by guid
@@ -467,6 +502,7 @@ class ResourceManager(Logger):
         should be added in the do_start method.
 
         """
+
         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
             self.error("Wrong state %s for start" % self.state)
             return
@@ -511,7 +547,6 @@ class ResourceManager(Logger):
         with self._release_lock:
             if self._state != ResourceState.RELEASED:
                 self.do_deploy()
-                self.debug("----- READY ---- ")
 
     def release(self):
         """ Perform actions to free resources used by the RM.
@@ -527,29 +562,14 @@ class ResourceManager(Logger):
             try:
                 self.do_release()
             except:
+                self.set_released()
+
                 import traceback
                 err = traceback.format_exc()
-                self.error(err)
-
-            self.set_released()
-            self.debug("----- RELEASED ---- ")
-
-    @failtrap
-    def finish(self):
-        """ Sets the RM to state FINISHED. 
-     
-        The FINISHED state is different from STOPPED state in that it 
-        should not be directly invoked by the user.
-        STOPPED indicates that the user interrupted the RM, FINISHED means
-        that the RM concluded normally the actions it was supposed to perform.
-    
-        This method should not be overriden directly. Specific functionality
-        should be added in the do_finish method.
-        
-        """
-        with self._release_lock:
-            if self._state != ResourceState.RELEASED:
-                self.do_finish()
+                msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % (
+                        self._rtype, self.guid, err)
+                logger = Logger(self._rtype)
+                logger.debug(msg)
 
     def fail(self):
         """ Sets the RM to state FAILED.
@@ -572,6 +592,7 @@ class ResourceManager(Logger):
         """
         attr = self._attrs[name]
         attr.value = value
+        return value
 
     def get(self, name):
         """ Returns the value of the attribute
@@ -581,8 +602,43 @@ class ResourceManager(Logger):
         :rtype: str
         """
         attr = self._attrs[name]
+
+        """
+        A.Q. Commenting due to performance impact
+        if attr.has_flag(Flags.Global):
+            self.warning( "Attribute %s is global. Use get_global instead." % name)
+        """
+            
         return attr.value
 
+    def has_changed(self, name):
+        """ Returns True if the value of the attribute
+            has been modified by the user.
+
+        :param name: Name of the attribute
+        :type name: str
+        :rtype: bool
+        """
+        attr = self._attrs[name]
+        return attr.has_changed
+
+    def has_flag(self, name, flag):
+        """ Returns true if the attribute has the flag 'flag'
+
+        :param flag: Flag to be checked
+        :type flag: Flags
+        """
+        attr = self._attrs[name]
+        return attr.has_flag(flag)
+
+    def has_attribute(self, name):
+        """ Returns true if the RM has an attribute with name
+
+        :param name: name of the attribute
+        :type name: string
+        """
+        return name in self._attrs
+
     def enable_trace(self, name):
         """ Explicitly enable trace generation
 
@@ -698,6 +754,19 @@ class ResourceManager(Logger):
                 connected.append(rm)
         return connected
 
+    def is_rm_instance(self, rtype):
+        """ Returns True if the RM is instance of 'rtype'
+
+        :param rtype: Type of the RM we look for
+        :type rtype: str
+        :return: True|False
+        """
+        rclass = ResourceFactory.get_resource_type(rtype)
+        if isinstance(self, rclass):
+            return True
+        return False
+
+    @failtrap
     def _needs_reschedule(self, group, state, time):
         """ Internal method that verify if 'time' has elapsed since 
         all elements in 'group' have reached state 'state'.
@@ -720,6 +789,15 @@ class ResourceManager(Logger):
         # check state and time elapsed on all RMs
         for guid in group:
             rm = self.ec.get_resource(guid)
+            
+            # If one of the RMs this resource needs to wait for has FAILED
+            # and is critical we raise an exception
+            if rm.state == ResourceState.FAILED:
+                if not rm.get('critical'):
+                    continue
+                msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
+                raise RuntimeError, msg
+
             # If the RM state is lower than the requested state we must
             # reschedule (e.g. if RM is READY but we required STARTED).
             if rm.state < state:
@@ -739,6 +817,8 @@ class ResourceManager(Logger):
                     t = rm.start_time
                 elif state == ResourceState.STOPPED:
                     t = rm.stop_time
+                elif state == ResourceState.RELEASED:
+                    t = rm.release_time
                 else:
                     break
 
@@ -795,9 +875,12 @@ class ResourceManager(Logger):
         action 'START' are satisfied.
 
         """
+        #import pdb;pdb.set_trace()
+
         reschedule = False
         delay = reschedule_delay 
 
+
         ## evaluate if conditions to start are met
         if self.ec.abort:
             return 
@@ -879,6 +962,7 @@ class ResourceManager(Logger):
         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
                 ResourceState.PROVISIONED]:
+            #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES!
             reschedule = True
             self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
         else:
@@ -893,7 +977,7 @@ class ResourceManager(Logger):
                 #for guid in group:
                 #    rm = self.ec.get_resource(guid)
                 #    unmet.append((guid, rm._state))
-                #
+                
                 #self.debug("---- WAITED STATES ---- %s" % unmet )
 
                 reschedule, delay = self._needs_reschedule(group, state, time)
@@ -903,7 +987,7 @@ class ResourceManager(Logger):
         if reschedule:
             self.ec.schedule(delay, self.deploy_with_conditions)
         else:
-            self.debug("----- STARTING ---- ")
+            self.debug("----- DEPLOYING ---- ")
             self.deploy()
 
     def do_connect(self, guid):
@@ -947,52 +1031,66 @@ class ResourceManager(Logger):
         self.set_ready()
 
     def do_release(self):
-        pass
-
-    def do_finish(self):
-        self.set_finished()
+        self.set_released()
 
     def do_fail(self):
         self.set_failed()
+        self.ec.inform_failure(self.guid)
 
-    def set_started(self):
+    def set_started(self, time = None):
         """ Mark ResourceManager as STARTED """
-        self.set_state(ResourceState.STARTED, "_start_time")
-        
-    def set_stopped(self):
+        self.set_state(ResourceState.STARTED, "_start_time", time)
+        self.debug("----- STARTED ---- ")
+
+    def set_stopped(self, time = None):
         """ Mark ResourceManager as STOPPED """
-        self.set_state(ResourceState.STOPPED, "_stop_time")
+        self.set_state(ResourceState.STOPPED, "_stop_time", time)
+        self.debug("----- STOPPED ---- ")
 
-    def set_ready(self):
+    def set_ready(self, time = None):
         """ Mark ResourceManager as READY """
-        self.set_state(ResourceState.READY, "_ready_time")
+        self.set_state(ResourceState.READY, "_ready_time", time)
+        self.debug("----- READY ---- ")
 
-    def set_released(self):
+    def set_released(self, time = None):
         """ Mark ResourceManager as REALEASED """
-        self.set_state(ResourceState.RELEASED, "_release_time")
+        self.set_state(ResourceState.RELEASED, "_release_time", time)
 
-    def set_finished(self):
-        """ Mark ResourceManager as FINISHED """
-        self.set_state(ResourceState.FINISHED, "_finish_time")
+        msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid)
+        logger = Logger(self._rtype)
+        logger.debug(msg)
 
-    def set_failed(self):
+    def set_failed(self, time = None):
         """ Mark ResourceManager as FAILED """
-        self.set_state(ResourceState.FAILED, "_failed_time")
+        self.set_state(ResourceState.FAILED, "_failed_time", time)
+
+        msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid)
+        logger = Logger(self._rtype)
+        logger.debug(msg)
 
-    def set_discovered(self):
+    def set_discovered(self, time = None):
         """ Mark ResourceManager as DISCOVERED """
-        self.set_state(ResourceState.DISCOVERED, "_discover_time")
+        self.set_state(ResourceState.DISCOVERED, "_discover_time", time)
+        self.debug("----- DISCOVERED ---- ")
 
-    def set_provisioned(self):
+    def set_provisioned(self, time = None):
         """ Mark ResourceManager as PROVISIONED """
-        self.set_state(ResourceState.PROVISIONED, "_provision_time")
+        self.set_state(ResourceState.PROVISIONED, "_provision_time", time)
+        self.debug("----- PROVISIONED ---- ")
+
+    def set_state(self, state, state_time_attr, time = None):
+        """ Set the state of the RM while keeping a trace of the time """
 
-    def set_state(self, state, state_time_attr):
         # Ensure that RM state will not change after released
         if self._state == ResourceState.RELEASED:
             return 
-   
-        setattr(self, state_time_attr, tnow())
+
+        time = time or tnow()
+        self.set_state_time(state, state_time_attr, time)
+  
+    def set_state_time(self, state, state_time_attr, time):
+        """ Set the time for the RM state change """
+        setattr(self, state_time_attr, time)
         self._state = state
 
 class ResourceFactory(object):
@@ -1011,7 +1109,7 @@ class ResourceFactory(object):
     @classmethod
     def register_type(cls, rclass):
         """Register a new Ressource Manager"""
-        cls._resource_types[rclass.rtype()] = rclass
+        cls._resource_types[rclass.get_rtype()] = rclass
 
     @classmethod
     def create(cls, rtype, ec, guid):
@@ -1020,7 +1118,7 @@ class ResourceFactory(object):
         return rclass(ec, guid)
 
 def populate_factory():
-    """Register all the possible RM that exists in the current version of Nepi.
+    """Find and register all available RMs
     """
     # Once the factory is populated, don't repopulate
     if not ResourceFactory.resource_types():
@@ -1039,7 +1137,7 @@ def find_types():
     path = os.path.dirname(nepi.resources.__file__)
     search_path.add(path)
 
-    types = []
+    types = set()
 
     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
             prefix = "nepi.resources."):
@@ -1047,7 +1145,7 @@ def find_types():
         loader = importer.find_module(modname)
         
         try:
-            # Notice: Repeated calls to load_module will act as a reload of teh module
+            # Notice: Repeated calls to load_module will act as a reload of the module
             if modname in sys.modules:
                 module = sys.modules.get(modname)
             else:
@@ -1066,7 +1164,7 @@ def find_types():
                     continue
 
                 if issubclass(attr, ResourceManager):
-                    types.append(attr)
+                    types.add(attr)
 
                     if not modname in sys.modules:
                         sys.modules[modname] = module
@@ -1080,4 +1178,3 @@ def find_types():
 
     return types
 
-