-from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
+from nepi.util.logger import Logger
from nepi.execution.trace import TraceAttr
import copy
import functools
-import inspect
import logging
import os
import pkgutil
+import sys
import weakref
-reschedule_delay = "0.5s"
+reschedule_delay = "1s"
class ResourceAction:
+ """ Action that a user can order to a Resource Manager
+
+ """
DEPLOY = 0
START = 1
STOP = 2
class ResourceState:
+ """ State of a Resource Manager
+
+ """
NEW = 0
DISCOVERED = 1
PROVISIONED = 2
FAILED = 7
RELEASED = 8
+ResourceState2str = dict({
+ ResourceState.NEW : "NEW",
+ ResourceState.DISCOVERED : "DISCOVERED",
+ ResourceState.PROVISIONED : "PROVISIONED",
+ ResourceState.READY : "READY",
+ ResourceState.STARTED : "STARTED",
+ ResourceState.STOPPED : "STOPPED",
+ ResourceState.FINISHED : "FINISHED",
+ ResourceState.FAILED : "FAILED",
+ ResourceState.RELEASED : "RELEASED",
+ })
+
def clsinit(cls):
+ """ Initializes template information (i.e. attributes and traces)
+ for the ResourceManager class
+ """
cls._clsinit()
return cls
+def clsinit_copy(cls):
+ """ Initializes template information (i.e. attributes and traces)
+ for the ResourceManager class, inheriting attributes and traces
+ from the parent class
+ """
+ cls._clsinit_copy()
+ return cls
+
# Decorator to invoke class initialization method
@clsinit
-class ResourceManager(object):
+class ResourceManager(Logger):
+ """ Base clase for all ResourceManagers.
+
+ A ResourceManger is specific to a resource type (e.g. Node,
+ Switch, Application, etc) on a specific backend (e.g. PlanetLab,
+ OMF, etc).
+
+ The ResourceManager instances are responsible for interacting with
+ and controlling concrete (physical or virtual) resources in the
+ experimental backends.
+
+ """
_rtype = "Resource"
- _filters = None
_attributes = None
_traces = None
+ _help = None
+ _backend = None
@classmethod
- def _register_filter(cls, attr):
+ def _register_attribute(cls, attr):
""" Resource subclasses will invoke this method to add a
- filter attribute
+ resource attribute
"""
- cls._filters[attr.name] = attr
+ cls._attributes[attr.name] = attr
@classmethod
- def _register_attribute(cls, attr):
- """ Resource subclasses will invoke this method to add a
+ def _remove_attribute(cls, name):
+ """ Resource subclasses will invoke this method to remove a
resource attribute
"""
- cls._attributes[attr.name] = attr
+ del cls._attributes[name]
@classmethod
def _register_trace(cls, trace):
"""
cls._traces[trace.name] = trace
-
@classmethod
- def _register_filters(cls):
- """ Resource subclasses will invoke this method to register
- resource filters
+ def _remove_trace(cls, name):
+ """ Resource subclasses will invoke this method to remove a
+ resource trace
"""
- pass
+ del cls._traces[name]
@classmethod
def _register_attributes(cls):
@classmethod
def _clsinit(cls):
- """ Create a new dictionnary instance of the dictionnary
- with the same template.
-
- Each ressource should have the same registration dictionary
- template with different instances.
+ """ ResourceManager child classes have different attributes and traces.
+ Since the templates that hold the information of attributes and traces
+ are 'class attribute' dictionaries, initially they all point to the
+ parent class ResourceManager instances of those dictionaries.
+ In order to make these templates independent from the parent's one,
+ it is necessary re-initialize the corresponding dictionaries.
+ This is the objective of the _clsinit method
"""
- # static template for resource filters
- cls._filters = dict()
- cls._register_filters()
-
# static template for resource attributes
cls._attributes = dict()
cls._register_attributes()
cls._register_traces()
@classmethod
- def rtype(cls):
- return cls._rtype
+ def _clsinit_copy(cls):
+ """ Same as _clsinit, except that it also inherits all attributes and traces
+ from the parent class.
+ """
+ # static template for resource attributes
+ cls._attributes = copy.deepcopy(cls._attributes)
+ cls._register_attributes()
+
+ # static template for resource traces
+ cls._traces = copy.deepcopy(cls._traces)
+ cls._register_traces()
@classmethod
- def get_filters(cls):
- """ Returns a copy of the filters
+ def rtype(cls):
+ """ Returns the type of the Resource Manager
"""
- return copy.deepcopy(cls._filters.values())
+ return cls._rtype
@classmethod
def get_attributes(cls):
"""
return copy.deepcopy(cls._traces.values())
+ @classmethod
+ def get_help(cls):
+ """ Returns the description of the type of Resource
+
+ """
+ return cls._help
+
+ @classmethod
+ def get_backend(cls):
+ """ Returns the identified of the backend (i.e. testbed, environment)
+ for the Resource
+
+ """
+ return cls._backend
+
def __init__(self, ec, guid):
+ super(ResourceManager, self).__init__(self.rtype())
+
self._guid = guid
self._ec = weakref.ref(ec)
self._connections = set()
# the resource instance gets a copy of all traces
self._trcs = copy.deepcopy(self._traces)
- self._state = ResourceState.NEW
+ # Each resource is placed on a deployment group by the EC
+ # during deployment
+ self.deployment_group = None
self._start_time = None
self._stop_time = None
self._provision_time = None
self._ready_time = None
self._release_time = None
+ self._finish_time = None
+ self._failed_time = None
- # Logging
- self._logger = logging.getLogger("Resource")
-
- def debug(self, msg, out = None, err = None):
- self.log(msg, logging.DEBUG, out, err)
-
- def error(self, msg, out = None, err = None):
- self.log(msg, logging.ERROR, out, err)
-
- def warn(self, msg, out = None, err = None):
- self.log(msg, logging.WARNING, out, err)
-
- def info(self, msg, out = None, err = None):
- self.log(msg, logging.INFO, out, err)
-
- def log(self, msg, level, out = None, err = None):
- if out:
- msg += " - OUT: %s " % out
-
- if err:
- msg += " - ERROR: %s " % err
-
- msg = self.log_message(msg)
-
- self.logger.log(level, msg)
-
- def log_message(self, msg):
- return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
-
- @property
- def logger(self):
- return self._logger
+ self._state = ResourceState.NEW
@property
def guid(self):
+ """ Returns the global unique identifier of the RM """
return self._guid
@property
def ec(self):
+ """ Returns the Experiment Controller """
return self._ec()
@property
def connections(self):
+ """ Returns the set of guids of connected RMs"""
return self._connections
@property
def conditions(self):
+ """ Returns the conditions to which the RM is subjected to.
+
+ The object returned by this method is a dictionary indexed by
+ ResourceAction."""
return self._conditions
@property
def start_time(self):
- """ Returns timestamp with the time the RM started """
+ """ Returns the start time of the RM as a timestamp"""
return self._start_time
@property
def stop_time(self):
- """ Returns timestamp with the time the RM stopped """
+ """ Returns the stop time of the RM as a timestamp"""
return self._stop_time
@property
def discover_time(self):
- """ Returns timestamp with the time the RM passed to state discovered """
+ """ Returns the time discovering was finished for the RM as a timestamp"""
return self._discover_time
@property
def provision_time(self):
- """ Returns timestamp with the time the RM passed to state provisioned """
+ """ Returns the time provisioning was finished for the RM as a timestamp"""
return self._provision_time
@property
def ready_time(self):
- """ Returns timestamp with the time the RM passed to state ready """
+ """ Returns the time deployment was finished for the RM as a timestamp"""
return self._ready_time
@property
def release_time(self):
- """ Returns timestamp with the time the RM was released """
+ """ Returns the release time of the RM as a timestamp"""
return self._release_time
+ @property
+ def finish_time(self):
+ """ Returns the finalization time of the RM as a timestamp"""
+ return self._finish_time
+
+ @property
+ def failed_time(self):
+ """ Returns the time failure occured for the RM as a timestamp"""
+ return self._failed_time
+
@property
def state(self):
+ """ Get the state of the current RM """
return self._state
- def connect(self, guid):
+ def log_message(self, msg):
+ """ Returns the log message formatted with added information.
+
+ :param msg: text message
+ :type msg: str
+ :rtype: str
+ """
+ return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
+
+ def register_connection(self, guid):
+ """ Registers a connection to the RM identified by guid
+
+ :param guid: Global unique identified of the RM to connect to
+ :type guid: int
+ """
if self.valid_connection(guid):
+ self.connect(guid)
self._connections.add(guid)
- def discover(self, filters = None):
- self._discover_time = strfnow()
- self._state = ResourceState.DISCOVERED
+ def unregister_connection(self, guid):
+ """ Removes a registered connection to the RM identified by guid
- def provision(self, filters = None):
- self._provision_time = strfnow()
- self._state = ResourceState.PROVISIONED
+ :param guid: Global unique identified of the RM to connect to
+ :type guid: int
+ """
+ if guid in self._connections:
+ self.disconnect(guid)
+ self._connections.remove(guid)
- def start(self):
- """ Start the Resource Manager
+ def discover(self):
+ """ Performs resource discovery.
+
+ This method is resposible for selecting an individual resource
+ matching user requirements.
+ This method should be redefined when necessary in child classes.
+ """
+ self.set_discovered()
+
+ def provision(self):
+ """ Performs resource provisioning.
+
+ This method is resposible for provisioning one resource.
+ After this method has been successfully invoked, the resource
+ should be acccesible/controllable by the RM.
+ This method should be redefined when necessary in child classes.
+ """
+ self.set_provisioned()
+ def start(self):
+ """ Starts the resource.
+
+ There is no generic start behavior for all resources.
+ This method should be redefined when necessary in child classes.
"""
- if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
+ if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
self.error("Wrong state %s for start" % self.state)
return
- self._start_time = strfnow()
- self._state = ResourceState.STARTED
+ self.set_started()
def stop(self):
- """ Start the Resource Manager
-
+ """ Stops the resource.
+
+ There is no generic stop behavior for all resources.
+ This method should be redefined when necessary in child classes.
"""
- if not self._state in [ResourceState.STARTED]:
+ if not self.state in [ResourceState.STARTED]:
self.error("Wrong state %s for stop" % self.state)
return
+
+ self.set_stopped()
- self._stop_time = strfnow()
- self._state = ResourceState.STOPPED
+ def deploy(self):
+ """ Execute all steps required for the RM to reach the state READY
+
+ """
+ if self.state > ResourceState.READY:
+ self.error("Wrong state %s for deploy" % self.state)
+ return
+
+ self.debug("----- READY ---- ")
+ self.set_ready()
+
+ def release(self):
+ self.set_released()
+
+ def finish(self):
+ self.set_finished()
+
+ def fail(self):
+ self.set_failed()
def set(self, name, value):
""" Set the value of the attribute
attr.value = value
def get(self, name):
- """ Start the Resource Manager
+ """ Returns the value of the attribute
:param name: Name of the attribute
:type name: str
attr = self._attrs[name]
return attr.value
- def register_trace(self, name):
- """ Enable trace
+ def enable_trace(self, name):
+ """ Explicitly enable trace generation
:param name: Name of the trace
:type name: str
"""
trace = self._trcs[name]
trace.enabled = True
+
+ def trace_enabled(self, name):
+ """Returns True if trace is enables
+ :param name: Name of the trace
+ :type name: str
+ """
+ trace = self._trcs[name]
+ return trace.enabled
+
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
""" Get information on collected trace
"""
pass
- def register_condition(self, action, group, state,
- time = None):
+ def register_condition(self, action, group, state, time = None):
""" Registers a condition on the resource manager to allow execution
of 'action' only after 'time' has elapsed from the moment all resources
in 'group' reached state 'state'
:type action: str
:param group: Group of RMs to wait for (list of guids)
:type group: int or list of int
- :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
+ :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
:type state: str
:param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
:type time: str
"""
+
+ if not action in self.conditions:
+ self._conditions[action] = list()
+
conditions = self.conditions.get(action)
- if not conditions:
- conditions = list()
- self._conditions[action] = conditions
# For each condition to register a tuple of (group, state, time) is
# added to the 'action' list
conditions.append((group, state, time))
- def get_connected(self, rtype):
+ def unregister_condition(self, group, action = None):
+ """ Removed conditions for a certain group of guids
+
+ :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
+ :type action: str
+
+ :param group: Group of RMs to wait for (list of guids)
+ :type group: int or list of int
+
+ """
+ # For each condition a tuple of (group, state, time) is
+ # added to the 'action' list
+ if not isinstance(group, list):
+ group = [group]
+
+ for act, conditions in self.conditions.iteritems():
+ if action and act != action:
+ continue
+
+ for condition in list(conditions):
+ (grp, state, time) = condition
+
+ # If there is an intersection between grp and group,
+ # then remove intersected elements
+ intsec = set(group).intersection(set(grp))
+ if intsec:
+ idx = conditions.index(condition)
+ newgrp = set(grp)
+ newgrp.difference_update(intsec)
+ conditions[idx] = (newgrp, state, time)
+
+ def get_connected(self, rtype = None):
+ """ Returns the list of RM with the type 'rtype'
+
+ :param rtype: Type of the RM we look for
+ :type rtype: str
+ :return: list of guid
+ """
connected = []
+ rclass = ResourceFactory.get_resource_type(rtype)
for guid in self.connections:
rm = self.ec.get_resource(guid)
- if rm.rtype() == rtype:
+ if not rtype or isinstance(rm, rclass):
connected.append(rm)
return connected
:param group: Group of RMs to wait for (list of guids)
:type group: int or list of int
- :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
+ :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
:type state: str
:param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
:type time: str
for guid in group:
rm = self.ec.get_resource(guid)
# If the RM state is lower than the requested state we must
- # reschedule (e.g. if RM is READY but we required STARTED)
+ # reschedule (e.g. if RM is READY but we required STARTED).
if rm.state < state:
reschedule = True
break
elif state == ResourceState.STOPPED:
t = rm.stop_time
else:
- # Only keep time information for START and STOP
break
- d = strfdiff(strfnow(), t)
- wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
+ # time already elapsed since RM changed state
+ waited = "%fs" % tdiffsec(tnow(), t)
+
+ # time still to wait
+ wait = tdiffsec(stabsformat(time), stabsformat(waited))
+
if wait > 0.001:
reschedule = True
delay = "%fs" % wait
break
+
return reschedule, delay
def set_with_conditions(self, name, value, group, state, time):
""" Set value 'value' on attribute with name 'name' when 'time'
- has elapsed since all elements in 'group' have reached state
- 'state'
+ has elapsed since all elements in 'group' have reached state
+ 'state'
:param name: Name of the attribute to set
:type name: str
:type state: str
:param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
:type time: str
-
"""
reschedule = False
callback = functools.partial(self.stop_with_conditions)
self.ec.schedule(delay, callback)
else:
- self.logger.debug(" ----- STOPPING ---- ")
+ self.debug(" ----- STOPPING ---- ")
self.stop()
- def deploy(self):
- """ Execute all steps required for the RM to reach the state READY
+ def deploy_with_conditions(self):
+ """ Deploy RM when all the conditions in self.conditions for
+ action 'READY' are satisfied.
"""
- if self._state > ResourceState.READY:
- self.error("Wrong state %s for deploy" % self.state)
- return
+ reschedule = False
+ delay = reschedule_delay
- self.debug("----- DEPLOYING ---- ")
- self._ready_time = strfnow()
- self._state = ResourceState.READY
+ ## evaluate if set conditions are met
- def release(self):
- """Clean the resource at the end of the Experiment and change the status
+ # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
+ if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
+ ResourceState.PROVISIONED]:
+ reschedule = True
+ self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
+ else:
+ deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
+
+ self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions)
+
+ # Verify all start conditions are met
+ for (group, state, time) in deploy_conditions:
+ # Uncomment for debug
+ #unmet = []
+ #for guid in group:
+ # rm = self.ec.get_resource(guid)
+ # unmet.append((guid, rm._state))
+ #
+ #self.debug("---- WAITED STATES ---- %s" % unmet )
+ reschedule, delay = self._needs_reschedule(group, state, time)
+ if reschedule:
+ break
+
+ if reschedule:
+ self.ec.schedule(delay, self.deploy_with_conditions)
+ else:
+ self.debug("----- STARTING ---- ")
+ self.deploy()
+
+
+ def connect(self, guid):
+ """ Performs actions that need to be taken upon associating RMs.
+ This method should be redefined when necessary in child classes.
"""
- self._release_time = strfnow()
- self._state = ResourceState.RELEASED
+ pass
+
+ def disconnect(self, guid):
+ """ Performs actions that need to be taken upon disassociating RMs.
+ This method should be redefined when necessary in child classes.
+ """
+ pass
def valid_connection(self, guid):
- """Check if the connection is available.
+ """Checks whether a connection with the other RM
+ is valid.
+ This method need to be redefined by each new Resource Manager.
:param guid: Guid of the current Resource Manager
:type guid: int
"""
# TODO: Validate!
return True
+
+ def set_started(self):
+ """ Mark ResourceManager as STARTED """
+ self._start_time = tnow()
+ self._state = ResourceState.STARTED
+
+ def set_stopped(self):
+ """ Mark ResourceManager as STOPPED """
+ self._stop_time = tnow()
+ self._state = ResourceState.STOPPED
+
+ def set_ready(self):
+ """ Mark ResourceManager as READY """
+ self._ready_time = tnow()
+ self._state = ResourceState.READY
+
+ def set_released(self):
+ """ Mark ResourceManager as REALEASED """
+ self._release_time = tnow()
+ self._state = ResourceState.RELEASED
+
+ def set_finished(self):
+ """ Mark ResourceManager as FINISHED """
+ self._finish_time = tnow()
+ self._state = ResourceState.FINISHED
+
+ def set_failed(self):
+ """ Mark ResourceManager as FAILED """
+ self._failed_time = tnow()
+ self._state = ResourceState.FAILED
+
+ def set_discovered(self):
+ """ Mark ResourceManager as DISCOVERED """
+ self._discover_time = tnow()
+ self._state = ResourceState.DISCOVERED
+
+ def set_provisioned(self):
+ """ Mark ResourceManager as PROVISIONED """
+ self._provision_time = tnow()
+ self._state = ResourceState.PROVISIONED
class ResourceFactory(object):
_resource_types = dict()
@classmethod
def resource_types(cls):
+ """Return the type of the Class"""
return cls._resource_types
+ @classmethod
+ def get_resource_type(cls, rtype):
+ """Return the type of the Class"""
+ return cls._resource_types.get(rtype)
+
@classmethod
def register_type(cls, rclass):
+ """Register a new Ressource Manager"""
cls._resource_types[rclass.rtype()] = rclass
@classmethod
def create(cls, rtype, ec, guid):
+ """Create a new instance of a Ressource Manager"""
rclass = cls._resource_types[rtype]
return rclass(ec, guid)
def populate_factory():
- for rclass in find_types():
- ResourceFactory.register_type(rclass)
+ """Register all the possible RM that exists in the current version of Nepi.
+ """
+ # Once the factory is populated, don't repopulate
+ if not ResourceFactory.resource_types():
+ for rclass in find_types():
+ ResourceFactory.register_type(rclass)
def find_types():
+ """Look into the different folders to find all the
+ availables Resources Managers
+ """
search_path = os.environ.get("NEPI_SEARCH_PATH", "")
search_path = set(search_path.split(" "))
+ import inspect
import nepi.resources
path = os.path.dirname(nepi.resources.__file__)
search_path.add(path)
types = []
- for importer, modname, ispkg in pkgutil.walk_packages(search_path):
+ for importer, modname, ispkg in pkgutil.walk_packages(search_path,
+ prefix = "nepi.resources."):
+
loader = importer.find_module(modname)
+
try:
- module = loader.load_module(loader.fullname)
+ # Notice: Repeated calls to load_module will act as a reload of teh module
+ if modname in sys.modules:
+ module = sys.modules.get(modname)
+ else:
+ module = loader.load_module(modname)
+
for attrname in dir(module):
if attrname.startswith("_"):
continue
if issubclass(attr, ResourceManager):
types.append(attr)
+
+ if not modname in sys.modules:
+ sys.modules[modname] = module
+
except:
import traceback
+ import logging
err = traceback.format_exc()
logger = logging.getLogger("Resource.find_types()")
- logger.error("Error while lading Resource Managers %s" % err)
+ logger.error("Error while loading Resource Managers %s" % err)
return types