#
# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac
from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
from nepi.util.logger import Logger
from nepi.execution.trace import TraceAttr
import copy
import functools
import logging
import os
import pkgutil
import sys
import weakref
# Default delay handed to the EC scheduler when an action on a RM cannot
# be executed yet and has to be retried later (used by _needs_reschedule
# and the *_with_conditions methods of ResourceManager).
reschedule_delay = "1s"
class ResourceAction:
    """ Actions a user can request from a Resource Manager.

    The constants are plain integers and are used as keys of the
    per-RM conditions dictionary.
    """
    # DEPLOY == 0, START == 1, STOP == 2
    DEPLOY, START, STOP = range(3)
class ResourceState:
    """ States a Resource Manager can be in.

    The constants are consecutive integers; code in this module compares
    them with ``<`` and ``>``, so the declaration order below matters.
    """
    # NEW == 0 ... RELEASED == 8
    (NEW,
     DISCOVERED,
     PROVISIONED,
     READY,
     STARTED,
     STOPPED,
     FINISHED,
     FAILED,
     RELEASED) = range(9)
# Maps every ResourceState constant to its printable name. The mapping is
# derived from the constant names so that label and constant cannot diverge.
ResourceState2str = dict(
    (getattr(ResourceState, state_name), state_name)
    for state_name in (
        "NEW",
        "DISCOVERED",
        "PROVISIONED",
        "READY",
        "STARTED",
        "STOPPED",
        "FINISHED",
        "FAILED",
        "RELEASED",
    )
)
def clsinit(cls):
    """ Class decorator that runs the ``_clsinit`` hook of the decorated
    class, giving it fresh attribute and trace templates.

    :param cls: Class being decorated (must provide ``_clsinit``)
    :return: The same class object, so it can be used as a decorator

    """
    initialize = cls._clsinit
    initialize()
    return cls
def clsinit_copy(cls):
    """ Class decorator that runs the ``_clsinit_copy`` hook of the
    decorated class, so that its attribute and trace templates start
    as copies of the ones inherited from the parent class.

    :param cls: Class being decorated (must provide ``_clsinit_copy``)
    :return: The same class object, so it can be used as a decorator

    """
    initialize = cls._clsinit_copy
    initialize()
    return cls
# Decorator to invoke class initialization method
@clsinit
class ResourceManager(Logger):
    """ Base class of every Resource Manager (RM).

    The class keeps templates of attributes and traces at class level
    (``_attributes`` and ``_traces``). Every instance receives its own
    deep copy of those templates, tracks the current state of the
    resource and records the time at which each state was reached.

    """
    # Type name returned by rtype(); it is also passed to the Logger
    # constructor
    _rtype = "Resource"
    # Class level templates, (re)created by _clsinit / _clsinit_copy
    _attributes = None
    _traces = None

    @classmethod
    def _register_attribute(cls, attr):
        """ Resource subclasses will invoke this method to add a
        resource attribute

        :param attr: Attribute to add, stored under ``attr.name``

        """
        cls._attributes[attr.name] = attr

    @classmethod
    def _remove_attribute(cls, name):
        """ Resource subclasses will invoke this method to remove a
        resource attribute

        :param name: Name of the attribute to remove
        :type name: str
        :raises KeyError: if no attribute is registered under 'name'

        """
        del cls._attributes[name]

    @classmethod
    def _register_trace(cls, trace):
        """ Resource subclasses will invoke this method to add a
        resource trace

        :param trace: Trace to add, stored under ``trace.name``

        """
        cls._traces[trace.name] = trace

    @classmethod
    def _remove_trace(cls, name):
        """ Resource subclasses will invoke this method to remove a
        resource trace

        :param name: Name of the trace to remove
        :type name: str
        :raises KeyError: if no trace is registered under 'name'

        """
        del cls._traces[name]

    @classmethod
    def _register_attributes(cls):
        """ Resource subclasses will redefine this method to register
        their resource attributes. The base class registers none.

        """
        pass

    @classmethod
    def _register_traces(cls):
        """ Resource subclasses will redefine this method to register
        their resource traces. The base class registers none.

        """
        pass

    @classmethod
    def _clsinit(cls):
        """ ResourceManager child classes have different attributes and
        traces. Since the templates that hold the information of
        attributes and traces are 'class attribute' dictionaries,
        initially they all point to the parent class ResourceManager
        instances of those dictionaries.
        In order to make these templates independent from the parent's
        ones, it is necessary to re-initialize the corresponding
        dictionaries. This is the objective of the _clsinit method.

        """
        # static template for resource attributes
        cls._attributes = dict()
        cls._register_attributes()

        # static template for resource traces
        cls._traces = dict()
        cls._register_traces()

    @classmethod
    def _clsinit_copy(cls):
        """ Same as _clsinit, except that it also inherits all attributes
        and traces from the parent class.

        """
        # static template for resource attributes, starting from a copy
        # of the inherited one
        cls._attributes = copy.deepcopy(cls._attributes)
        cls._register_attributes()

        # static template for resource traces, starting from a copy
        # of the inherited one
        cls._traces = copy.deepcopy(cls._traces)
        cls._register_traces()

    @classmethod
    def rtype(cls):
        """ Returns the type of the Resource Manager

        :rtype: str

        """
        return cls._rtype

    @classmethod
    def get_attributes(cls):
        """ Returns a copy of the attributes

        :rtype: list

        """
        # list() makes the copy work where dict.values() returns a view
        # object instead of a list
        return copy.deepcopy(list(cls._attributes.values()))

    @classmethod
    def get_traces(cls):
        """ Returns a copy of the traces

        :rtype: list

        """
        # list() makes the copy work where dict.values() returns a view
        # object instead of a list
        return copy.deepcopy(list(cls._traces.values()))

    def __init__(self, ec, guid):
        """ Creates a RM in state NEW.

        :param ec: Experiment Controller owning the RM. Only a weak
            reference to it is kept.
        :param guid: Global unique identifier of the RM
        :type guid: int

        """
        super(ResourceManager, self).__init__(self.rtype())

        self._guid = guid
        self._ec = weakref.ref(ec)
        self._connections = set()
        self._conditions = dict()

        # the resource instance gets a copy of all attributes
        self._attrs = copy.deepcopy(self._attributes)

        # the resource instance gets a copy of all traces
        self._trcs = copy.deepcopy(self._traces)

        # Each resource is placed on a deployment group by the EC
        # during deployment
        self.deployment_group = None

        self._start_time = None
        self._stop_time = None
        self._discover_time = None
        self._provision_time = None
        self._ready_time = None
        self._release_time = None
        self._finish_time = None
        self._failed_time = None

        self._state = ResourceState.NEW

    @property
    def guid(self):
        """ Returns the global unique identifier of the RM """
        return self._guid

    @property
    def ec(self):
        """ Returns the Experiment Controller (None once the weakly
        referenced EC has been garbage collected) """
        return self._ec()

    @property
    def connections(self):
        """ Returns the set of guids of connected RMs """
        return self._connections

    @property
    def conditions(self):
        """ Returns the conditions to which the RM is subjected to.

        The object returned by this method is a dictionary indexed by
        ResourceAction, whose values are lists of
        (group, state, time) tuples.

        """
        return self._conditions

    @property
    def start_time(self):
        """ Returns the start time of the RM as a timestamp """
        return self._start_time

    @property
    def stop_time(self):
        """ Returns the stop time of the RM as a timestamp """
        return self._stop_time

    @property
    def discover_time(self):
        """ Returns the time discovering was finished for the RM as a
        timestamp """
        return self._discover_time

    @property
    def provision_time(self):
        """ Returns the time provisioning was finished for the RM as a
        timestamp """
        return self._provision_time

    @property
    def ready_time(self):
        """ Returns the time deployment was finished for the RM as a
        timestamp """
        return self._ready_time

    @property
    def release_time(self):
        """ Returns the release time of the RM as a timestamp """
        return self._release_time

    @property
    def finish_time(self):
        """ Returns the finalization time of the RM as a timestamp """
        return self._finish_time

    @property
    def failed_time(self):
        """ Returns the time failure occurred for the RM as a timestamp """
        return self._failed_time

    @property
    def state(self):
        """ Get the state of the current RM """
        return self._state

    def log_message(self, msg):
        """ Returns the log message formatted with added information.

        :param msg: text message
        :type msg: str
        :rtype: str

        """
        return " %s guid: %d - %s " % (self._rtype, self.guid, msg)

    def register_connection(self, guid):
        """ Registers a connection to the RM identified by guid.

        Nothing is registered when valid_connection() rejects the guid.

        :param guid: Global unique identifier of the RM to connect to
        :type guid: int

        """
        if self.valid_connection(guid):
            self.connect(guid)
            self._connections.add(guid)

    def unregister_connection(self, guid):
        """ Removes a registered connection to the RM identified by guid.

        Unknown guids are ignored.

        :param guid: Global unique identifier of the RM to disconnect from
        :type guid: int

        """
        if guid in self._connections:
            self.disconnect(guid)
            self._connections.remove(guid)

    def discover(self):
        """ Performs resource discovery.

        This method is responsible for selecting an individual resource
        matching user requirements.
        This method should be redefined when necessary in child classes.
        The base implementation only marks the RM as DISCOVERED.

        """
        self.set_discovered()

    def provision(self):
        """ Performs resource provisioning.

        This method is responsible for provisioning one resource.
        After this method has been successfully invoked, the resource
        should be accessible/controllable by the RM.
        This method should be redefined when necessary in child classes.
        The base implementation only marks the RM as PROVISIONED.

        """
        self.set_provisioned()

    def start(self):
        """ Starts the resource.

        There is no generic start behavior for all resources.
        This method should be redefined when necessary in child classes.
        The base implementation marks the RM as STARTED when it is READY
        or STOPPED, and logs an error otherwise.

        """
        if self.state not in (ResourceState.READY, ResourceState.STOPPED):
            self.error("Wrong state %s for start" % self.state)
            return

        self.set_started()

    def stop(self):
        """ Stops the resource.

        There is no generic stop behavior for all resources.
        This method should be redefined when necessary in child classes.
        The base implementation marks the RM as STOPPED when it is
        STARTED, and logs an error otherwise.

        """
        if self.state != ResourceState.STARTED:
            self.error("Wrong state %s for stop" % self.state)
            return

        self.set_stopped()

    def deploy(self):
        """ Execute all steps required for the RM to reach the state READY.

        An error is logged, and nothing else is done, when the RM is
        already past READY.

        """
        if self.state > ResourceState.READY:
            self.error("Wrong state %s for deploy" % self.state)
            return

        self.debug("----- READY ---- ")
        self.set_ready()

    def release(self):
        """ Releases the resource. The base implementation only marks
        the RM as RELEASED. """
        self.set_released()

    def finish(self):
        """ Finalizes the resource. The base implementation only marks
        the RM as FINISHED. """
        self.set_finished()

    def fail(self):
        """ Signals a failure. The base implementation only marks the RM
        as FAILED. """
        self.set_failed()

    def set(self, name, value):
        """ Set the value of the attribute

        :param name: Name of the attribute
        :type name: str
        :param value: Value of the attribute
        :raises KeyError: if the RM has no attribute called 'name'

        """
        attr = self._attrs[name]
        attr.value = value

    def get(self, name):
        """ Returns the value of the attribute

        :param name: Name of the attribute
        :type name: str
        :raises KeyError: if the RM has no attribute called 'name'

        """
        attr = self._attrs[name]
        return attr.value

    def enable_trace(self, name):
        """ Explicitly enable trace generation

        :param name: Name of the trace
        :type name: str
        :raises KeyError: if the RM has no trace called 'name'

        """
        trace = self._trcs[name]
        trace.enabled = True

    def trace_enabled(self, name):
        """ Returns True if the trace is enabled

        :param name: Name of the trace
        :type name: str
        :raises KeyError: if the RM has no trace called 'name'

        """
        trace = self._trcs[name]
        return trace.enabled

    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """ Get information on collected trace

        The base implementation does nothing and returns None; child
        classes are expected to redefine it.

        :param name: Name of the trace
        :type name: str

        :param attr: Can be one of:
                         - TraceAttr.ALL (complete trace content),
                         - TraceAttr.STREAM (block in bytes to read starting at offset),
                         - TraceAttr.PATH (full path to the trace file),
                         - TraceAttr.SIZE (size of trace file).
        :type attr: str

        :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
        :type block: int

        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
        :type offset: int

        :rtype: str

        """
        pass

    def register_condition(self, action, group, state, time = None):
        """ Registers a condition on the resource manager to allow execution
        of 'action' only after 'time' has elapsed from the moment all resources
        in 'group' reached state 'state'

        :param action: Action to restrict to condition (a ResourceAction
            constant, e.g. ResourceAction.START or ResourceAction.STOP)
        :type action: int
        :param group: Group of RMs to wait for (list of guids)
        :type group: int or list of int
        :param state: State to wait for on all RM in group (a ResourceState
            constant, e.g. ResourceState.STARTED or ResourceState.STOPPED)
        :type state: int
        :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
        :type time: str

        """
        if not isinstance(group, list):
            group = [group]

        # For each condition to register a tuple of (group, state, time) is
        # added to the 'action' list
        self._conditions.setdefault(action, []).append((group, state, time))

    def unregister_condition(self, group, action = None):
        """ Removes the given guids from the groups of the registered
        conditions

        :param group: RMs to stop waiting for (guid or list of guids)
        :type group: int or list of int
        :param action: Only touch the conditions registered for this
            action (a ResourceAction constant). When None, the
            conditions of every action are updated.
        :type action: int

        """
        if not isinstance(group, list):
            group = [group]

        removed = set(group)

        for act, conditions in self.conditions.items():
            # Compare against None: ResourceAction.DEPLOY is 0 and would
            # otherwise be mistaken for "no action given"
            if action is not None and act != action:
                continue

            # Each condition is a tuple of (group, state, time)
            for idx, (grp, state, time) in enumerate(conditions):
                # If there is an intersection between grp and group,
                # then remove intersected elements. The group stays a
                # list, as stored by register_condition.
                if removed.intersection(grp):
                    remaining = [guid for guid in grp if guid not in removed]
                    conditions[idx] = (remaining, state, time)

    def get_connected(self, rtype = None):
        """ Returns the connected RMs of type 'rtype'

        :param rtype: Type of the RMs we look for. When None (or empty)
            all connected RMs are returned.
        :type rtype: str
        :return: list of RM instances (not guids)

        """
        connected = []
        rclass = ResourceFactory.get_resource_type(rtype)
        for guid in self.connections:
            rm = self.ec.get_resource(guid)
            if not rtype or isinstance(rm, rclass):
                connected.append(rm)
        return connected

    def _needs_reschedule(self, group, state, time):
        """ Internal method that verifies if 'time' has elapsed since
        all elements in 'group' have reached state 'state'.

        :param group: Group of RMs to wait for (list of guids)
        :type group: int or list of int
        :param state: State to wait for on all RM in group (a ResourceState
            constant)
        :type state: int
        :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
        :type time: str
        :return: tuple (reschedule, delay), where 'reschedule' tells
            whether the caller must retry later and 'delay' is the time
            to wait before retrying (e.g. '1s')

        .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
        If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
        For the moment, 2m30s is not a correct syntax.

        """
        reschedule = False
        delay = reschedule_delay

        # Name of the RM property holding the moment 'state' was reached.
        # The time restriction is only evaluated for the states listed
        # here; for any other state just the state itself is verified.
        time_attr = {
            ResourceState.DISCOVERED: "discover_time",
            ResourceState.PROVISIONED: "provision_time",
            ResourceState.READY: "ready_time",
            ResourceState.STARTED: "start_time",
            ResourceState.STOPPED: "stop_time",
        }.get(state)

        # check state and time elapsed on all RMs
        for guid in group:
            rm = self.ec.get_resource(guid)

            # If the RM state is lower than the requested state we must
            # reschedule (e.g. if RM is READY but we required STARTED).
            if rm.state < state:
                reschedule = True
                break

            # Without a time restriction to evaluate, keep verifying the
            # state of the remaining RMs in the group
            if not time or time_attr is None:
                continue

            t = getattr(rm, time_attr)

            # time already elapsed since RM changed state
            waited = "%fs" % tdiffsec(tnow(), t)

            # time still to wait
            wait = tdiffsec(stabsformat(time), stabsformat(waited))

            if wait > 0.001:
                reschedule = True
                delay = "%fs" % wait
                break

        return reschedule, delay

    def set_with_conditions(self, name, value, group, state, time):
        """ Set value 'value' on attribute with name 'name' when 'time'
        has elapsed since all elements in 'group' have reached state
        'state'

        :param name: Name of the attribute to set
        :type name: str
        :param value: Value of the attribute to set
        :param group: Group of RMs to wait for (list of guids)
        :type group: int or list of int
        :param state: State to wait for on all RM in group (a ResourceState
            constant, e.g. STARTED, STOPPED or READY)
        :type state: int
        :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
        :type time: str

        """
        reschedule = False
        delay = reschedule_delay

        ## evaluate if set conditions are met

        # only can set with conditions after the RM is started
        if self.state != ResourceState.STARTED:
            reschedule = True
        else:
            reschedule, delay = self._needs_reschedule(group, state, time)

        if reschedule:
            # Try again later with exactly the same arguments
            callback = functools.partial(self.set_with_conditions,
                    name, value, group, state, time)
            self.ec.schedule(delay, callback)
        else:
            self.set(name, value)

    def start_with_conditions(self):
        """ Starts RM when all the conditions in self.conditions for
        action 'START' are satisfied. Otherwise the call is rescheduled
        on the EC.

        """
        reschedule = False
        delay = reschedule_delay

        ## evaluate if set conditions are met

        # only can start when RM is either STOPPED or READY
        if self.state not in (ResourceState.STOPPED, ResourceState.READY):
            reschedule = True
            self.debug("---- RESCHEDULING START ---- state %s " % self.state )
        else:
            start_conditions = self.conditions.get(ResourceAction.START, [])

            self.debug("---- START CONDITIONS ---- %s" % start_conditions)

            # Verify all start conditions are met
            for (group, state, time) in start_conditions:
                reschedule, delay = self._needs_reschedule(group, state, time)
                if reschedule:
                    break

        if reschedule:
            self.ec.schedule(delay, self.start_with_conditions)
        else:
            self.debug("----- STARTING ---- ")
            self.start()

    def stop_with_conditions(self):
        """ Stops RM when all the conditions in self.conditions for
        action 'STOP' are satisfied. Otherwise the call is rescheduled
        on the EC.

        """
        reschedule = False
        delay = reschedule_delay

        ## evaluate if set conditions are met

        # only can stop when RM is STARTED
        if self.state != ResourceState.STARTED:
            reschedule = True
        else:
            self.debug(" ---- STOP CONDITIONS ---- %s" %
                    self.conditions.get(ResourceAction.STOP))

            # Verify all stop conditions are met
            stop_conditions = self.conditions.get(ResourceAction.STOP, [])
            for (group, state, time) in stop_conditions:
                reschedule, delay = self._needs_reschedule(group, state, time)
                if reschedule:
                    break

        if reschedule:
            callback = functools.partial(self.stop_with_conditions)
            self.ec.schedule(delay, callback)
        else:
            self.debug(" ----- STOPPING ---- ")
            self.stop()

    def connect(self, guid):
        """ Performs actions that need to be taken upon associating RMs.
        This method should be redefined when necessary in child classes.

        :param guid: Guid of the RM being connected
        :type guid: int

        """
        pass

    def disconnect(self, guid):
        """ Performs actions that need to be taken upon disassociating RMs.
        This method should be redefined when necessary in child classes.

        :param guid: Guid of the RM being disconnected
        :type guid: int

        """
        pass

    def valid_connection(self, guid):
        """ Checks whether a connection with the other RM
        is valid.
        This method needs to be redefined by each new Resource Manager.
        The base implementation accepts every connection.

        :param guid: Guid of the RM to connect to
        :type guid: int
        :rtype: Boolean

        """
        # TODO: Validate!
        return True

    def set_started(self):
        """ Mark ResourceManager as STARTED """
        self._start_time = tnow()
        self._state = ResourceState.STARTED

    def set_stopped(self):
        """ Mark ResourceManager as STOPPED """
        self._stop_time = tnow()
        self._state = ResourceState.STOPPED

    def set_ready(self):
        """ Mark ResourceManager as READY """
        self._ready_time = tnow()
        self._state = ResourceState.READY

    def set_released(self):
        """ Mark ResourceManager as RELEASED """
        self._release_time = tnow()
        self._state = ResourceState.RELEASED

    def set_finished(self):
        """ Mark ResourceManager as FINISHED """
        self._finish_time = tnow()
        self._state = ResourceState.FINISHED

    def set_failed(self):
        """ Mark ResourceManager as FAILED """
        self._failed_time = tnow()
        self._state = ResourceState.FAILED

    def set_discovered(self):
        """ Mark ResourceManager as DISCOVERED """
        self._discover_time = tnow()
        self._state = ResourceState.DISCOVERED

    def set_provisioned(self):
        """ Mark ResourceManager as PROVISIONED """
        self._provision_time = tnow()
        self._state = ResourceState.PROVISIONED
class ResourceFactory(object):
    """ Registry of Resource Manager classes, indexed by their rtype. """
    # rtype (str) -> Resource Manager class; shared at class level
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """ Returns the dictionary of registered Resource Manager
        classes, indexed by rtype (the registry itself, not a copy) """
        return cls._resource_types

    @classmethod
    def get_resource_type(cls, rtype):
        """ Returns the Resource Manager class registered for 'rtype',
        or None when no class is registered under that name """
        registry = cls._resource_types
        return registry.get(rtype)

    @classmethod
    def register_type(cls, rclass):
        """ Registers a new Resource Manager class under the name
        returned by its rtype() method """
        type_name = rclass.rtype()
        cls._resource_types[type_name] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Creates a new instance of the Resource Manager class
        registered for 'rtype'

        :raises KeyError: if no class is registered for 'rtype'

        """
        registry = cls._resource_types
        return registry[rtype](ec, guid)
def populate_factory():
    """ Registers in the ResourceFactory every Resource Manager class
    returned by find_types(). Does nothing when the factory already
    holds at least one type.
    """
    # Once the factory is populated, don't repopulate
    if ResourceFactory.resource_types():
        return

    for rm_class in find_types():
        ResourceFactory.register_type(rm_class)
def find_types():
    """ Looks into the different folders to find all the available
    Resource Managers.

    The folders searched are the directory of the ``nepi.resources``
    package plus the space separated entries of the NEPI_SEARCH_PATH
    environment variable. Every module found is loaded (unless it is
    already in ``sys.modules``) and inspected for public classes that
    derive from ResourceManager.

    A module that fails to load is logged and skipped, so it does not
    prevent the discovery of the remaining ones.

    :return: list of ResourceManager subclasses (ResourceManager itself
        excluded)
    :rtype: list

    """
    import inspect
    import traceback

    import nepi.resources

    # Drop empty entries: an unset or blank NEPI_SEARCH_PATH would
    # otherwise add "" to the search path
    raw_path = os.environ.get("NEPI_SEARCH_PATH", "")
    search_path = set(entry for entry in raw_path.split(" ") if entry)
    search_path.add(os.path.dirname(nepi.resources.__file__))

    types = []

    for importer, modname, ispkg in pkgutil.walk_packages(search_path,
            prefix = "nepi.resources."):
        try:
            # Notice: Repeated calls to load_module will act as a reload
            # of the module, so modules already loaded are reused
            if modname in sys.modules:
                module = sys.modules.get(modname)
            else:
                loader = importer.find_module(modname)
                module = loader.load_module(modname)

            for attrname in dir(module):
                # Private names are not considered
                if attrname.startswith("_"):
                    continue

                attr = getattr(module, attrname)

                # The base class itself is not a usable resource type
                if attr is ResourceManager:
                    continue

                if not inspect.isclass(attr):
                    continue

                if issubclass(attr, ResourceManager):
                    types.append(attr)

            if modname not in sys.modules:
                sys.modules[modname] = module

        except Exception:
            # Best effort: report the failure and go on with the next
            # module. KeyboardInterrupt and SystemExit are not caught.
            err = traceback.format_exc()
            logger = logging.getLogger("Resource.find_types()")
            logger.error("Error while loading Resource Managers %s" % err)

    return types