import os
import pkgutil
import sys
+import threading
import weakref
-reschedule_delay = "1s"
+reschedule_delay = "0.5s"
class ResourceAction:
""" Action that a user can order to a Resource Manager
READY = 3
STARTED = 4
STOPPED = 5
- FINISHED = 6
- FAILED = 7
- RELEASED = 8
+ FAILED = 6
+ RELEASED = 7
ResourceState2str = dict({
ResourceState.NEW : "NEW",
ResourceState.READY : "READY",
ResourceState.STARTED : "STARTED",
ResourceState.STOPPED : "STOPPED",
- ResourceState.FINISHED : "FINISHED",
ResourceState.FAILED : "FAILED",
ResourceState.RELEASED : "RELEASED",
})
def clsinit(cls):
""" Initializes template information (i.e. attributes and traces)
- for the ResourceManager class
- """
+ on classes derived from the ResourceManager class.
+
+ It is used as a decorator in the class declaration as follows:
+
+ @clsinit
+ class MyResourceManager(ResourceManager):
+
+ ...
+
+ """
+
cls._clsinit()
return cls
def clsinit_copy(cls):
""" Initializes template information (i.e. attributes and traces)
- for the ResourceManager class, inheriting attributes and traces
- from the parent class
+ on classes direved from the ResourceManager class.
+ It differs from the clsinit method in that it forces inheritance
+ of attributes and traces from the parent class.
+
+ It is used as a decorator in the class declaration as follows:
+
+ @clsinit
+ class MyResourceManager(ResourceManager):
+
+ ...
+
+
+ clsinit_copy should be prefered to clsinit when creating new
+ ResourceManager child classes.
+
"""
+
cls._clsinit_copy()
return cls
def failtrap(func):
+ """ Decorator function for instance methods that should set the
+ RM state to FAILED when an error is raised. The methods that must be
+ decorated are: discover, provision, deploy, start, stop.
+
+ """
def wrapped(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except:
+ self.fail()
+
import traceback
err = traceback.format_exc()
- self.error(err)
- self.debug("SETTING guid %d to state FAILED" % self.guid)
- self.fail()
+ logger = Logger(self._rtype)
+ logger.error(err)
+ logger.error("SETTING guid %d to state FAILED" % self.guid)
raise
return wrapped
-# Decorator to invoke class initialization method
@clsinit
class ResourceManager(Logger):
""" Base clase for all ResourceManagers.
resource attribute
"""
+
cls._attributes[attr.name] = attr
@classmethod
resource attribute
"""
+
del cls._attributes[name]
@classmethod
resource trace
"""
+
cls._traces[trace.name] = trace
@classmethod
resource trace
"""
+
del cls._traces[name]
@classmethod
def _register_attributes(cls):
""" Resource subclasses will invoke this method to register
- resource attributes
+ resource attributes.
+
+ This method should be overriden in the RMs that define
+ attributes.
"""
- critical = Attribute("critical", "Defines whether the resource is critical. "
- " A failure on a critical resource will interrupt the experiment. ",
+ critical = Attribute("critical",
+ "Defines whether the resource is critical. "
+ "A failure on a critical resource will interrupt "
+ "the experiment. ",
type = Types.Bool,
default = True,
- flags = Flags.ExecReadOnly)
+ flags = Flags.Design)
+ hard_release = Attribute("hardRelease",
+ "Forces removal of all result files and directories associated "
+ "to the RM upon resource release. After release the RM will "
+ "be removed from the EC and the results will not longer be "
+ "accessible",
+ type = Types.Bool,
+ default = False,
+ flags = Flags.Design)
cls._register_attribute(critical)
+ cls._register_attribute(hard_release)
@classmethod
def _register_traces(cls):
""" Resource subclasses will invoke this method to register
resource traces
+ This method should be overriden in the RMs that define traces.
+
"""
+
pass
@classmethod
def _clsinit(cls):
- """ ResourceManager child classes have different attributes and traces.
- Since the templates that hold the information of attributes and traces
- are 'class attribute' dictionaries, initially they all point to the
- parent class ResourceManager instances of those dictionaries.
- In order to make these templates independent from the parent's one,
- it is necessary re-initialize the corresponding dictionaries.
- This is the objective of the _clsinit method
+ """ ResourceManager classes have different attributes and traces.
+ Attribute and traces are stored in 'class attribute' dictionaries.
+ When a new ResourceManager class is created, the _clsinit method is
+ called to create a new instance of those dictionaries and initialize
+ them.
+
+ The _clsinit method is called by the clsinit decorator method.
+
"""
+
# static template for resource attributes
cls._attributes = dict()
cls._register_attributes()
@classmethod
def _clsinit_copy(cls):
- """ Same as _clsinit, except that it also inherits all attributes and traces
- from the parent class.
+ """ Same as _clsinit, except that after creating new instances of the
+ dictionaries it copies all the attributes and traces from the parent
+ class.
+
+ The _clsinit_copy method is called by the clsinit_copy decorator method.
+
"""
# static template for resource attributes
cls._attributes = copy.deepcopy(cls._attributes)
cls._register_traces()
@classmethod
- def rtype(cls):
+ def get_rtype(cls):
""" Returns the type of the Resource Manager
"""
"""
return copy.deepcopy(cls._attributes.values())
+ @classmethod
+ def get_attribute(cls, name):
+ """ Returns a copy of the attribute with name 'name'
+
+ """
+ return copy.deepcopy(cls._attributes[name])
+
@classmethod
def get_traces(cls):
""" Returns a copy of the traces
"""
return cls._backend
+ @classmethod
+ def get_global(cls, name):
+ """ Returns the value of a global attribute
+ Global attribute meaning an attribute for
+ all the resources from a rtype
+
+ :param name: Name of the attribute
+ :type name: str
+ :rtype: str
+ """
+ global_attr = cls._attributes[name]
+ return global_attr.value
+
+ @classmethod
+ def set_global(cls, name, value):
+ """ Set value for a global attribute
+
+ :param name: Name of the attribute
+ :type name: str
+ :param name: Value of the attribute
+ :type name: str
+ """
+ global_attr = cls._attributes[name]
+ global_attr.value = value
+ return value
+
def __init__(self, ec, guid):
- super(ResourceManager, self).__init__(self.rtype())
+ super(ResourceManager, self).__init__(self.get_rtype())
self._guid = guid
self._ec = weakref.ref(ec)
self._provision_time = None
self._ready_time = None
self._release_time = None
- self._finish_time = None
self._failed_time = None
self._state = ResourceState.NEW
+ # instance lock to synchronize exclusive state change methods (such
+ # as deploy and release methods), in order to prevent them from being
+ # executed at the same time and corrupt internal resource state
+ self._release_lock = threading.Lock()
+
@property
def guid(self):
""" Returns the global unique identifier of the RM """
@property
def ec(self):
- """ Returns the Experiment Controller """
+ """ Returns the Experiment Controller of the RM """
return self._ec()
@property
def connections(self):
- """ Returns the set of guids of connected RMs"""
+ """ Returns the set of guids of connected RMs """
return self._connections
@property
def conditions(self):
""" Returns the conditions to which the RM is subjected to.
- The object returned by this method is a dictionary indexed by
- ResourceAction."""
+ This method returns a dictionary of conditions lists indexed by
+ a ResourceAction.
+
+ """
return self._conditions
@property
def start_time(self):
- """ Returns the start time of the RM as a timestamp"""
+ """ Returns the start time of the RM as a timestamp """
return self._start_time
@property
def stop_time(self):
- """ Returns the stop time of the RM as a timestamp"""
+ """ Returns the stop time of the RM as a timestamp """
return self._stop_time
@property
def discover_time(self):
- """ Returns the time discovering was finished for the RM as a timestamp"""
+ """ Returns the discover time of the RM as a timestamp """
return self._discover_time
@property
def provision_time(self):
- """ Returns the time provisioning was finished for the RM as a timestamp"""
+ """ Returns the provision time of the RM as a timestamp """
return self._provision_time
@property
def ready_time(self):
- """ Returns the time deployment was finished for the RM as a timestamp"""
+ """ Returns the deployment time of the RM as a timestamp """
return self._ready_time
@property
def release_time(self):
- """ Returns the release time of the RM as a timestamp"""
+ """ Returns the release time of the RM as a timestamp """
return self._release_time
- @property
- def finish_time(self):
- """ Returns the finalization time of the RM as a timestamp"""
- return self._finish_time
-
@property
def failed_time(self):
- """ Returns the time failure occured for the RM as a timestamp"""
+ """ Returns the time failure occured for the RM as a timestamp """
return self._failed_time
@property
:param msg: text message
:type msg: str
:rtype: str
+
"""
- return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
+ return " %s guid %d - %s " % (self._rtype, self.guid, msg)
def register_connection(self, guid):
""" Registers a connection to the RM identified by guid
+ This method should not be overriden. Specific functionality
+ should be added in the do_connect method.
+
:param guid: Global unique identified of the RM to connect to
:type guid: int
+
"""
if self.valid_connection(guid):
- self.connect(guid)
+ self.do_connect(guid)
self._connections.add(guid)
def unregister_connection(self, guid):
""" Removes a registered connection to the RM identified by guid
+
+ This method should not be overriden. Specific functionality
+ should be added in the do_disconnect method.
:param guid: Global unique identified of the RM to connect to
:type guid: int
+
"""
if guid in self._connections:
- self.disconnect(guid)
+ self.do_disconnect(guid)
self._connections.remove(guid)
+ @failtrap
def discover(self):
""" Performs resource discovery.
-
+
This method is responsible for selecting an individual resource
matching user requirements.
- This method should be redefined when necessary in child classes.
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_discover method.
"""
- self.set_discovered()
+ with self._release_lock:
+ if self._state != ResourceState.RELEASED:
+ self.do_discover()
+ @failtrap
def provision(self):
""" Performs resource provisioning.
This method is responsible for provisioning one resource.
After this method has been successfully invoked, the resource
should be accessible/controllable by the RM.
- This method should be redefined when necessary in child classes.
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_provision method.
"""
- self.set_provisioned()
+ with self._release_lock:
+ if self._state != ResourceState.RELEASED:
+ self.do_provision()
+ @failtrap
def start(self):
- """ Starts the RM.
-
- There is no generic start behavior for all resources.
- This method should be redefined when necessary in child classes.
+ """ Starts the RM (e.g. launch remote process).
+
+ There is no standard start behavior. Some RMs will not need to perform
+ any actions upon start.
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_start method.
"""
+
if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
self.error("Wrong state %s for start" % self.state)
return
- self.set_started()
+ with self._release_lock:
+ if self._state != ResourceState.RELEASED:
+ self.do_start()
+ @failtrap
def stop(self):
""" Interrupts the RM, stopping any tasks the RM was performing.
-
- There is no generic stop behavior for all resources.
- This method should be redefined when necessary in child classes.
-
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
-
+
+ There is no standard stop behavior. Some RMs will not need to perform
+ any actions upon stop.
+
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_stop method.
+
"""
if not self.state in [ResourceState.STARTED]:
self.error("Wrong state %s for stop" % self.state)
return
- self.set_stopped()
+ with self._release_lock:
+ self.do_stop()
+ @failtrap
def deploy(self):
""" Execute all steps required for the RM to reach the state READY.
- This method is responsible for deploying the resource (and invoking the
- discover and provision methods).
- This method should be redefined when necessary in child classes.
-
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
-
+ This method is responsible for deploying the resource (and invoking
+ the discover and provision methods).
+
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_deploy method.
+
"""
if self.state > ResourceState.READY:
self.error("Wrong state %s for deploy" % self.state)
return
- self.debug("----- READY ---- ")
- self.set_ready()
+ with self._release_lock:
+ if self._state != ResourceState.RELEASED:
+ self.do_deploy()
def release(self):
""" Perform actions to free resources used by the RM.
-
+
This method is responsible for releasing resources that were
used during the experiment by the RM.
- This method should be redefined when necessary in child classes.
-
- If overridden in child classes, this method should never
- raise an error and it must ensure the RM is set to state RELEASED.
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_release method.
+
"""
- self.set_released()
-
- def finish(self):
- """ Sets the RM to state FINISHED.
-
- The FINISHED state is different from STOPPED in that it should not be
- directly invoked by the user.
- STOPPED indicates that the user interrupted the RM, FINISHED means
- that the RM concluded normally the actions it was supposed to perform.
- This method should be redefined when necessary in child classes.
-
- If overridden in child classes, make sure to use the failtrap
- decorator to ensure the RM state will be set to FAILED in the event
- of an exception.
+ with self._release_lock:
+ try:
+ self.do_release()
+ except:
+ self.set_released()
+
+ import traceback
+ err = traceback.format_exc()
+ msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % (
+ self._rtype, self.guid, err)
+ logger = Logger(self._rtype)
+ logger.debug(msg)
- """
-
- self.set_finished()
-
def fail(self):
""" Sets the RM to state FAILED.
- """
+ This method should not be overriden directly. Specific functionality
+ should be added in the do_fail method.
- self.set_failed()
+ """
+ with self._release_lock:
+ if self._state != ResourceState.RELEASED:
+ self.do_fail()
def set(self, name, value):
""" Set the value of the attribute
"""
attr = self._attrs[name]
attr.value = value
+ return value
def get(self, name):
""" Returns the value of the attribute
:rtype: str
"""
attr = self._attrs[name]
+
+ """
+ A.Q. Commenting due to performance impact
+ if attr.has_flag(Flags.Global):
+ self.warning( "Attribute %s is global. Use get_global instead." % name)
+ """
+
return attr.value
+ def has_changed(self, name):
+ """ Returns the True is the value of the attribute
+ has been modified by the user.
+
+ :param name: Name of the attribute
+ :type name: str
+ :rtype: str
+ """
+ attr = self._attrs[name]
+ return attr.has_changed
+
+ def has_flag(self, name, flag):
+ """ Returns true if the attribute has the flag 'flag'
+
+ :param flag: Flag to be checked
+ :type flag: Flags
+ """
+ attr = self._attrs[name]
+ return attr.has_flag(flag)
+
+ def has_attribute(self, name):
+ """ Returns true if the RM has an attribute with name
+
+ :param name: name of the attribute
+ :type name: string
+ """
+ return name in self._attrs
+
def enable_trace(self, name):
""" Explicitly enable trace generation
connected.append(rm)
return connected
+ def is_rm_instance(self, rtype):
+ """ Returns True if the RM is instance of 'rtype'
+
+ :param rtype: Type of the RM we look for
+ :type rtype: str
+ :return: True|False
+ """
+ rclass = ResourceFactory.get_resource_type(rtype)
+ if isinstance(self, rclass):
+ return True
+ return False
+
+ @failtrap
def _needs_reschedule(self, group, state, time):
""" Internal method that verify if 'time' has elapsed since
all elements in 'group' have reached state 'state'.
# check state and time elapsed on all RMs
for guid in group:
rm = self.ec.get_resource(guid)
+
+ # If one of the RMs this resource needs to wait for has FAILED
+ # and is critical we raise an exception
+ if rm.state == ResourceState.FAILED:
+ if not rm.get('critical'):
+ continue
+ msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
+ raise RuntimeError, msg
+
# If the RM state is lower than the requested state we must
# reschedule (e.g. if RM is READY but we required STARTED).
if rm.state < state:
t = rm.start_time
elif state == ResourceState.STOPPED:
t = rm.stop_time
+ elif state == ResourceState.RELEASED:
+ t = rm.release_time
else:
break
action 'START' are satisfied.
"""
+ #import pdb;pdb.set_trace()
+
reschedule = False
delay = reschedule_delay
+
## evaluate if conditions to start are met
if self.ec.abort:
return
# only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
ResourceState.PROVISIONED]:
+ #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES!
reschedule = True
self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
else:
#for guid in group:
# rm = self.ec.get_resource(guid)
# unmet.append((guid, rm._state))
- #
+
#self.debug("---- WAITED STATES ---- %s" % unmet )
reschedule, delay = self._needs_reschedule(group, state, time)
if reschedule:
self.ec.schedule(delay, self.deploy_with_conditions)
else:
- self.debug("----- STARTING ---- ")
+ self.debug("----- DEPLOYING ---- ")
self.deploy()
- def connect(self, guid):
+ def do_connect(self, guid):
""" Performs actions that need to be taken upon associating RMs.
This method should be redefined when necessary in child classes.
"""
pass
- def disconnect(self, guid):
+ def do_disconnect(self, guid):
""" Performs actions that need to be taken upon disassociating RMs.
This method should be redefined when necessary in child classes.
"""
"""
# TODO: Validate!
return True
-
- def set_started(self):
+
+ def do_discover(self):
+ self.set_discovered()
+
+ def do_provision(self):
+ self.set_provisioned()
+
+ def do_start(self):
+ self.set_started()
+
+ def do_stop(self):
+ self.set_stopped()
+
+ def do_deploy(self):
+ self.set_ready()
+
+ def do_release(self):
+ self.set_released()
+
+ def do_fail(self):
+ self.set_failed()
+ self.ec.inform_failure(self.guid)
+
+ def set_started(self, time = None):
""" Mark ResourceManager as STARTED """
- self.set_state(ResourceState.STARTED, "_start_time")
-
- def set_stopped(self):
+ self.set_state(ResourceState.STARTED, "_start_time", time)
+ self.debug("----- STARTED ---- ")
+
+ def set_stopped(self, time = None):
""" Mark ResourceManager as STOPPED """
- self.set_state(ResourceState.STOPPED, "_stop_time")
+ self.set_state(ResourceState.STOPPED, "_stop_time", time)
+ self.debug("----- STOPPED ---- ")
- def set_ready(self):
+ def set_ready(self, time = None):
""" Mark ResourceManager as READY """
- self.set_state(ResourceState.READY, "_ready_time")
+ self.set_state(ResourceState.READY, "_ready_time", time)
+ self.debug("----- READY ---- ")
- def set_released(self):
+ def set_released(self, time = None):
""" Mark ResourceManager as REALEASED """
- self.set_state(ResourceState.RELEASED, "_release_time")
+ self.set_state(ResourceState.RELEASED, "_release_time", time)
- def set_finished(self):
- """ Mark ResourceManager as FINISHED """
- self.set_state(ResourceState.FINISHED, "_finish_time")
+ msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid)
+ logger = Logger(self._rtype)
+ logger.debug(msg)
- def set_failed(self):
+ def set_failed(self, time = None):
""" Mark ResourceManager as FAILED """
- self.set_state(ResourceState.FAILED, "_failed_time")
+ self.set_state(ResourceState.FAILED, "_failed_time", time)
+
+ msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid)
+ logger = Logger(self._rtype)
+ logger.debug(msg)
- def set_discovered(self):
+ def set_discovered(self, time = None):
""" Mark ResourceManager as DISCOVERED """
- self.set_state(ResourceState.DISCOVERED, "_discover_time")
+ self.set_state(ResourceState.DISCOVERED, "_discover_time", time)
+ self.debug("----- DISCOVERED ---- ")
- def set_provisioned(self):
+ def set_provisioned(self, time = None):
""" Mark ResourceManager as PROVISIONED """
- self.set_state(ResourceState.PROVISIONED, "_provision_time")
+ self.set_state(ResourceState.PROVISIONED, "_provision_time", time)
+ self.debug("----- PROVISIONED ---- ")
+
+ def set_state(self, state, state_time_attr, time = None):
+ """ Set the state of the RM while keeping a trace of the time """
- def set_state(self, state, state_time_attr):
# Ensure that RM state will not change after released
if self._state == ResourceState.RELEASED:
return
-
- setattr(self, state_time_attr, tnow())
+
+ time = time or tnow()
+ self.set_state_time(state, state_time_attr, time)
+
+ def set_state_time(self, state, state_time_attr, time):
+ """ Set the time for the RM state change """
+ setattr(self, state_time_attr, time)
self._state = state
class ResourceFactory(object):
@classmethod
def register_type(cls, rclass):
"""Register a new Ressource Manager"""
- cls._resource_types[rclass.rtype()] = rclass
+ cls._resource_types[rclass.get_rtype()] = rclass
@classmethod
def create(cls, rtype, ec, guid):
return rclass(ec, guid)
def populate_factory():
- """Register all the possible RM that exists in the current version of Nepi.
+ """Find and rgister all available RMs
"""
# Once the factory is populated, don't repopulate
if not ResourceFactory.resource_types():
path = os.path.dirname(nepi.resources.__file__)
search_path.add(path)
- types = []
+ types = set()
for importer, modname, ispkg in pkgutil.walk_packages(search_path,
prefix = "nepi.resources."):
loader = importer.find_module(modname)
try:
- # Notice: Repeated calls to load_module will act as a reload of teh module
+ # Notice: Repeated calls to load_module will act as a reload of the module
if modname in sys.modules:
module = sys.modules.get(modname)
else:
continue
if issubclass(attr, ResourceManager):
- types.append(attr)
+ types.add(attr)
if not modname in sys.modules:
sys.modules[modname] = module
return types
-