from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
from nepi.util.logger import Logger
+from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import TraceAttr
import copy
import logging
import os
import pkgutil
+import sys
import weakref
reschedule_delay = "1s"
cls._clsinit_copy()
return cls
+def failtrap(func):
+ """ Decorator for ResourceManager methods.
+
+ If the wrapped method raises, the traceback is logged through
+ self.error(), the RM is set to FAILED by calling self.fail(), and
+ the original exception is re-raised to the caller.
+
+ :param func: Method to wrap; it is invoked as func(self, *args, **kwargs)
+ """
+ def wrapped(self, *args, **kwargs):
+ try:
+ return func(self, *args, **kwargs)
+ except:
+ # Any exception marks the RM as failed; it is re-raised below
+ import traceback
+ err = traceback.format_exc()
+ self.error(err)
+ self.debug("SETTING guid %d to state FAILED" % self.guid)
+ self.fail()
+ raise
+
+ return wrapped
+
# Decorator to invoke class initialization method
@clsinit
class ResourceManager(Logger):
+ """ Base class for all ResourceManagers.
+
+ A ResourceManager is specific to a resource type (e.g. Node,
+ Switch, Application, etc) on a specific backend (e.g. PlanetLab,
+ OMF, etc).
+
+ The ResourceManager instances are responsible for interacting with
+ and controlling concrete (physical or virtual) resources in the
+ experimental backends.
+
+ """
_rtype = "Resource"
_attributes = None
_traces = None
+ _help = None
+ _backend = None
@classmethod
def _register_attribute(cls, attr):
resource attributes
"""
- pass
+ critical = Attribute("critical", "Defines whether the resource is critical. "
+ " A failure on a critical resource will interrupt the experiment. ",
+ type = Types.Bool,
+ default = True,
+ flags = Flags.ExecReadOnly)
+ cls._register_attribute(critical)
+
@classmethod
def _register_traces(cls):
""" Resource subclasses will invoke this method to register
"""
return copy.deepcopy(cls._traces.values())
+ @classmethod
+ def get_help(cls):
+ """ Returns the description of the type of Resource
+
+ """
+ return cls._help
+
+ @classmethod
+ def get_backend(cls):
+ """ Returns the identifier of the backend (i.e. testbed, environment)
+ for the Resource
+
+ """
+ return cls._backend
+
def __init__(self, ec, guid):
super(ResourceManager, self).__init__(self.rtype())
# the resource instance gets a copy of all traces
self._trcs = copy.deepcopy(self._traces)
- self._state = ResourceState.NEW
+ # Each resource is placed on a deployment group by the EC
+ # during deployment
+ self.deployment_group = None
self._start_time = None
self._stop_time = None
self._finish_time = None
self._failed_time = None
+ self._state = ResourceState.NEW
+
@property
def guid(self):
""" Returns the global unique identifier of the RM """
@property
def state(self):
- """ Get the state of the current RM """
+ """ Get the current state of the RM """
return self._state
def log_message(self, msg):
def discover(self):
""" Performs resource discovery.
- This method is resposible for selecting an individual resource
+ This method is responsible for selecting an individual resource
matching user requirements.
This method should be redefined when necessary in child classes.
- """
- self._discover_time = tnow()
- self._state = ResourceState.DISCOVERED
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
+ """
+ self.set_discovered()
def provision(self):
""" Performs resource provisioning.
- This method is resposible for provisioning one resource.
+ This method is responsible for provisioning one resource.
After this method has been successfully invoked, the resource
- should be acccesible/controllable by the RM.
+ should be accessible/controllable by the RM.
This method should be redefined when necessary in child classes.
- """
- self._provision_time = tnow()
- self._state = ResourceState.PROVISIONED
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
+ """
+ self.set_provisioned()
def start(self):
- """ Starts the resource.
+ """ Starts the RM.
There is no generic start behavior for all resources.
This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
"""
- if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
+ if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
self.error("Wrong state %s for start" % self.state)
return
- self._start_time = tnow()
- self._state = ResourceState.STARTED
+ self.set_started()
def stop(self):
- """ Stops the resource.
+ """ Interrupts the RM, stopping any tasks the RM was performing.
There is no generic stop behavior for all resources.
This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
"""
- if not self._state in [ResourceState.STARTED]:
+ if not self.state in [ResourceState.STARTED]:
self.error("Wrong state %s for stop" % self.state)
return
+
+ self.set_stopped()
+
+ def deploy(self):
+ """ Execute all steps required for the RM to reach the state READY.
+
+ This method is responsible for deploying the resource (and invoking the
+ discover and provision methods).
+ This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
+ """
+ if self.state > ResourceState.READY:
+ self.error("Wrong state %s for deploy" % self.state)
+ return
+
+ self.debug("----- READY ---- ")
+ self.set_ready()
+
+ def release(self):
+ """ Perform actions to free resources used by the RM.
+
+ This method is responsible for releasing resources that were
+ used during the experiment by the RM.
+ This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, this method should never
+ raise an error and it must ensure the RM is set to state RELEASED.
+
+ """
+ self.set_released()
+
+ def finish(self):
+ """ Sets the RM to state FINISHED.
+
+ The FINISHED state is different from STOPPED in that it should not be
+ directly invoked by the user.
+ STOPPED indicates that the user interrupted the RM, FINISHED means
+ that the RM concluded normally the actions it was supposed to perform.
+ This method should be redefined when necessary in child classes.
+
+ If overridden in child classes, make sure to use the failtrap
+ decorator to ensure the RM state will be set to FAILED in the event
+ of an exception.
+
+ """
+
+ self.set_finished()
+
+ def fail(self):
+ """ Sets the RM to state FAILED.
+
+ """
- self._stop_time = tnow()
- self._state = ResourceState.STOPPED
+ self.set_failed()
def set(self, name, value):
""" Set the value of the attribute
attr = self._attrs[name]
return attr.value
- def register_trace(self, name):
+ def enable_trace(self, name):
""" Explicitly enable trace generation
:param name: Name of the trace
"""
trace = self._trcs[name]
trace.enabled = True
+
+ def trace_enabled(self, name):
+ """Returns True if trace is enabled
+ :param name: Name of the trace
+ :type name: str
+ """
+ trace = self._trcs[name]
+ return trace.enabled
+
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
""" Get information on collected trace
:type action: str
:param group: Group of RMs to wait for (list of guids)
:type group: int or list of int
- :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
+ :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
:type state: str
:param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
:type time: str
def unregister_condition(self, group, action = None):
""" Removed conditions for a certain group of guids
- :param action: Action to restrict to condition (either 'START' or 'STOP')
+ :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
:type action: str
:param group: Group of RMs to wait for (list of guids)
newgrp.difference_update(intsec)
conditions[idx] = (newgrp, state, time)
- def get_connected(self, rclass = None):
+ def get_connected(self, rtype = None):
""" Returns the list of RM with the type 'rtype'
:param rtype: Type of the RM we look for
:return: list of guid
"""
connected = []
+ # get_resource_type() returns None for an rtype that is not registered
+ rclass = ResourceFactory.get_resource_type(rtype)
for guid in self.connections:
rm = self.ec.get_resource(guid)
- if not rclass or isinstance(rm, rclass):
+ # Guard the None case: isinstance(rm, None) raises TypeError, and
+ # no RM can be an instance of an unknown type anyway
+ if not rtype or (rclass is not None and isinstance(rm, rclass)):
connected.append(rm)
return connected
:param group: Group of RMs to wait for (list of guids)
:type group: int or list of int
- :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
+ :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
:type state: str
:param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
:type time: str
elif state == ResourceState.STOPPED:
t = rm.stop_time
else:
- # Only keep time information for START and STOP
break
# time already elapsed since RM changed state
reschedule = False
delay = reschedule_delay
- ## evaluate if set conditions are met
+ ## evaluate if conditions to start are met
+ if self.ec.abort:
+ return
- # only can start when RM is either STOPPED or READY
+ # Can only start when RM is either STOPPED or READY
if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
reschedule = True
self.debug("---- RESCHEDULING START ---- state %s " % self.state )
reschedule = False
delay = reschedule_delay
- ## evaluate if set conditions are met
+ ## evaluate if conditions to stop are met
+ if self.ec.abort:
+ return
# only can stop when RM is STARTED
if self.state != ResourceState.STARTED:
reschedule = True
+ self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
else:
self.debug(" ---- STOP CONDITIONS ---- %s" %
self.conditions.get(ResourceAction.STOP))
self.debug(" ----- STOPPING ---- ")
self.stop()
- def deploy(self):
- """ Execute all steps required for the RM to reach the state READY
-
- """
- if self._state > ResourceState.READY:
- self.error("Wrong state %s for deploy" % self.state)
- return
-
- self.debug("----- READY ---- ")
- self._ready_time = tnow()
- self._state = ResourceState.READY
-
- def release(self):
- """Release any resources used by this RM
+ def deploy_with_conditions(self):
+ """ Deploy RM when all the conditions in self.conditions for
+ action 'READY' are satisfied.
"""
- self._release_time = tnow()
- self._state = ResourceState.RELEASED
+ reschedule = False
+ delay = reschedule_delay
- def finish(self):
- """ Mark ResourceManager as FINISHED
+ ## evaluate if conditions to deploy are met
+ if self.ec.abort:
+ return
- """
- self._finish_time = tnow()
- self._state = ResourceState.FINISHED
+ # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
+ if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
+ ResourceState.PROVISIONED]:
+ reschedule = True
+ self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
+ else:
+ deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
+
+ self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions)
+
+ # Verify all deploy conditions are met
+ for (group, state, time) in deploy_conditions:
+ # Uncomment for debug
+ #unmet = []
+ #for guid in group:
+ # rm = self.ec.get_resource(guid)
+ # unmet.append((guid, rm._state))
+ #
+ #self.debug("---- WAITED STATES ---- %s" % unmet )
- def fail(self):
- """ Mark ResourceManager as FAILED
+ reschedule, delay = self._needs_reschedule(group, state, time)
+ if reschedule:
+ break
- """
- self._failed_time = tnow()
- self._state = ResourceState.FAILED
+ if reschedule:
+ # Conditions not met yet: try again after 'delay'
+ self.ec.schedule(delay, self.deploy_with_conditions)
+ else:
+ self.debug("----- DEPLOYING ---- ")
+ self.deploy()
def connect(self, guid):
""" Performs actions that need to be taken upon associating RMs.
"""
# TODO: Validate!
return True
+
+ def set_started(self):
+ """ Mark ResourceManager as STARTED """
+ self.set_state(ResourceState.STARTED, "_start_time")
+
+ def set_stopped(self):
+ """ Mark ResourceManager as STOPPED """
+ self.set_state(ResourceState.STOPPED, "_stop_time")
+
+ def set_ready(self):
+ """ Mark ResourceManager as READY """
+ self.set_state(ResourceState.READY, "_ready_time")
+
+ def set_released(self):
+ """ Mark ResourceManager as RELEASED """
+ self.set_state(ResourceState.RELEASED, "_release_time")
+
+ def set_finished(self):
+ """ Mark ResourceManager as FINISHED """
+ self.set_state(ResourceState.FINISHED, "_finish_time")
+
+ def set_failed(self):
+ """ Mark ResourceManager as FAILED """
+ self.set_state(ResourceState.FAILED, "_failed_time")
+
+ def set_discovered(self):
+ """ Mark ResourceManager as DISCOVERED """
+ self.set_state(ResourceState.DISCOVERED, "_discover_time")
+
+ def set_provisioned(self):
+ """ Mark ResourceManager as PROVISIONED """
+ self.set_state(ResourceState.PROVISIONED, "_provision_time")
+
+ def set_state(self, state, state_time_attr):
+ """ Set the RM state and record the time of the change.
+
+ The call is ignored if the RM is already in state RELEASED.
+
+ :param state: New state for the RM
+ :param state_time_attr: Name of the instance attribute that
+ receives the current time (e.g. "_start_time")
+ :type state_time_attr: str
+ """
+ # Ensure that RM state will not change after released
+ if self._state == ResourceState.RELEASED:
+ return
+
+ # Store the timestamp first, then switch the state
+ setattr(self, state_time_attr, tnow())
+ self._state = state
class ResourceFactory(object):
_resource_types = dict()
"""Return the type of the Class"""
return cls._resource_types
+ @classmethod
+ def get_resource_type(cls, rtype):
+ """Return the class registered for 'rtype', or None if it is not registered"""
+ return cls._resource_types.get(rtype)
+
@classmethod
def register_type(cls, rclass):
"""Register a new Ressource Manager"""
def populate_factory():
"""Register all the possible RM that exists in the current version of Nepi.
"""
- for rclass in find_types():
- ResourceFactory.register_type(rclass)
+ # Once the factory is populated, don't repopulate
+ if not ResourceFactory.resource_types():
+ for rclass in find_types():
+ ResourceFactory.register_type(rclass)
def find_types():
"""Look into the different folders to find all the
availables Resources Managers
-
"""
search_path = os.environ.get("NEPI_SEARCH_PATH", "")
search_path = set(search_path.split(" "))
types = []
- for importer, modname, ispkg in pkgutil.walk_packages(search_path):
+ for importer, modname, ispkg in pkgutil.walk_packages(search_path,
+ prefix = "nepi.resources."):
+
loader = importer.find_module(modname)
+
try:
- module = loader.load_module(loader.fullname)
+ # Notice: Repeated calls to load_module will act as a reload of the module
+ if modname in sys.modules:
+ module = sys.modules.get(modname)
+ else:
+ module = loader.load_module(modname)
+
for attrname in dir(module):
if attrname.startswith("_"):
continue
if issubclass(attr, ResourceManager):
types.append(attr)
+
+ if not modname in sys.modules:
+ sys.modules[modname] = module
+
except:
import traceback
import logging
err = traceback.format_exc()
logger = logging.getLogger("Resource.find_types()")
- logger.error("Error while lading Resource Managers %s" % err)
+ logger.error("Error while loading Resource Managers %s" % err)
return types