#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
+from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
from nepi.util.logger import Logger
from nepi.execution.trace import TraceAttr
import pkgutil
import weakref
-reschedule_delay = "0.5s"
+reschedule_delay = "1s"
class ResourceAction:
""" Action that a user can order to a Resource Manager
self._provision_time = None
self._ready_time = None
self._release_time = None
+ self._finish_time = None
+ self._failed_time = None
@property
def guid(self):
- """ Returns the guid of the current RM """
+ """ Returns the global unique identifier of the RM """
return self._guid
@property
@property
def connections(self):
- """ Returns the set of connection for this RM"""
+ """ Returns the set of guids of connected RMs"""
return self._connections
@property
def conditions(self):
- """ Returns the list of conditions for this RM
- The list is a dictionary with for each action, a list of tuple
- describing the conditions. """
+ """ Returns the conditions to which the RM is subjected.
+
+ The object returned by this method is a dictionary indexed by
+ ResourceAction."""
return self._conditions
@property
def start_time(self):
- """ Returns timestamp with the time the RM started """
+ """ Returns the start time of the RM as a timestamp"""
return self._start_time
@property
def stop_time(self):
- """ Returns timestamp with the time the RM stopped """
+ """ Returns the stop time of the RM as a timestamp"""
return self._stop_time
@property
def discover_time(self):
- """ Returns timestamp with the time the RM passed to state discovered """
+ """ Returns the time discovering was finished for the RM as a timestamp"""
return self._discover_time
@property
def provision_time(self):
- """ Returns timestamp with the time the RM passed to state provisioned """
+ """ Returns the time provisioning was finished for the RM as a timestamp"""
return self._provision_time
@property
def ready_time(self):
- """ Returns timestamp with the time the RM passed to state ready """
+ """ Returns the time deployment was finished for the RM as a timestamp"""
return self._ready_time
@property
def release_time(self):
- """ Returns timestamp with the time the RM was released """
+ """ Returns the release time of the RM as a timestamp"""
return self._release_time
+ @property
+ def finish_time(self):
+ """ Returns the finalization time of the RM as a timestamp"""
+ return self._finish_time
+
+ @property
+ def failed_time(self):
+ """ Returns the time failure occurred for the RM as a timestamp"""
+ return self._failed_time
+
@property
def state(self):
""" Get the state of the current RM """
return self._state
def log_message(self, msg):
- """ Improve debugging message by adding more information
- as the guid and the type of the RM
+ """ Returns the log message formatted with added information.
- :param msg: Message to log
+ :param msg: text message
:type msg: str
:rtype: str
"""
return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
- def connect(self, guid):
- """ Connect the current RM with the RM 'guid'
+ def register_connection(self, guid):
+ """ Registers a connection to the RM identified by guid
- :param guid: Guid of the RM the current RM will be connected
+ :param guid: Global unique identifier of the RM to connect to
:type guid: int
"""
if self.valid_connection(guid):
+ self.connect(guid)
self._connections.add(guid)
+ def unregister_connection(self, guid):
+ """ Removes a registered connection to the RM identified by guid
+
+ :param guid: Global unique identifier of the RM to disconnect from
+ :type guid: int
+ """
+ if guid in self._connections:
+ self.disconnect(guid)
+ self._connections.remove(guid)
+
def discover(self):
- """ Discover the Resource. As it is specific for each RM,
- this method take the time when the RM become DISCOVERED and
- change the status """
- self._discover_time = strfnow()
+ """ Performs resource discovery.
+
+ This method is responsible for selecting an individual resource
+ matching user requirements.
+ This method should be redefined when necessary in child classes.
+ """
+ self._discover_time = tnow()
self._state = ResourceState.DISCOVERED
def provision(self):
- """ Provision the Resource. As it is specific for each RM,
- this method take the time when the RM become PROVISIONNED and
- change the status """
- self._provision_time = strfnow()
+ """ Performs resource provisioning.
+
+ This method is responsible for provisioning one resource.
+ After this method has been successfully invoked, the resource
+ should be accessible/controllable by the RM.
+ This method should be redefined when necessary in child classes.
+ """
+ self._provision_time = tnow()
self._state = ResourceState.PROVISIONED
def start(self):
- """ Start the Resource Manager. As it is specific to each RM, this methods
- just change, after some verifications, the status to STARTED and save the time.
-
+ """ Starts the resource.
+
+ There is no generic start behavior for all resources.
+ This method should be redefined when necessary in child classes.
"""
if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
self.error("Wrong state %s for start" % self.state)
return
- self._start_time = strfnow()
+ self._start_time = tnow()
self._state = ResourceState.STARTED
def stop(self):
- """ Stop the Resource Manager. As it is specific to each RM, this methods
- just change, after some verifications, the status to STOPPED and save the time.
-
+ """ Stops the resource.
+
+ There is no generic stop behavior for all resources.
+ This method should be redefined when necessary in child classes.
"""
if not self._state in [ResourceState.STARTED]:
self.error("Wrong state %s for stop" % self.state)
return
- self._stop_time = strfnow()
+ self._stop_time = tnow()
self._state = ResourceState.STOPPED
def set(self, name, value):
attr.value = value
def get(self, name):
- """ Start the Resource Manager
+ """ Returns the value of the attribute
:param name: Name of the attribute
:type name: str
return attr.value
def register_trace(self, name):
- """ Enable trace
+ """ Explicitly enable trace generation
:param name: Name of the trace
:type name: str
"""
pass
- def register_condition(self, action, group, state,
- time = None):
+ def register_condition(self, action, group, state, time = None):
""" Registers a condition on the resource manager to allow execution
of 'action' only after 'time' has elapsed from the moment all resources
in 'group' reached state 'state'
:type time: str
"""
+
+ if not action in self.conditions:
+ self._conditions[action] = list()
+
conditions = self.conditions.get(action)
- if not conditions:
- conditions = list()
- self._conditions[action] = conditions
# For each condition to register a tuple of (group, state, time) is
# added to the 'action' list
conditions.append((group, state, time))
- def get_connected(self, rtype):
- """ Return the list of RM with the type 'rtype'
+ def unregister_condition(self, group, action = None):
+ """ Removes conditions for a certain group of guids
+
+ :param action: Restrict removal to conditions of this action (either 'START' or 'STOP'); if not given, conditions of every action are considered
+ :type action: str
+
+ :param group: Group of RMs to wait for (list of guids)
+ :type group: int or list of int
+
+ """
+ # Each condition is stored as a (group, state, time) tuple in the
+ # list registered for its action
+ if not isinstance(group, list):
+ group = [group]
+
+ for act, conditions in self.conditions.iteritems():
+ if action and act != action:
+ continue
+
+ for condition in list(conditions):
+ (grp, state, time) = condition
+
+ # If there is an intersection between grp and group,
+ # then remove intersected elements
+ intsec = set(group).intersection(set(grp))
+ if intsec:
+ idx = conditions.index(condition)
+ newgrp = set(grp)
+ newgrp.difference_update(intsec)
+ conditions[idx] = (newgrp, state, time)
+
+ def get_connected(self, rtype = None):
+ """ Returns the list of connected RMs, optionally restricted to type 'rtype'
:param rtype: Type of the RM we look for
:type rtype: str
- :return : list of guid
+ :return: list of RMs
"""
connected = []
for guid in self.connections:
rm = self.ec.get_resource(guid)
- if rm.rtype() == rtype:
+ if not rtype or rm.rtype() == rtype:
connected.append(rm)
return connected
for guid in group:
rm = self.ec.get_resource(guid)
# If the RM state is lower than the requested state we must
- # reschedule (e.g. if RM is READY but we required STARTED)
+ # reschedule (e.g. if RM is READY but we required STARTED).
if rm.state < state:
reschedule = True
break
# Only keep time information for START and STOP
break
- d = strfdiff(strfnow(), t)
- wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
+ # time already elapsed since RM changed state
+ waited = "%fs" % tdiffsec(tnow(), t)
+
+ # time still to wait
+ wait = tdiffsec(stabsformat(time), stabsformat(waited))
+
if wait > 0.001:
reschedule = True
delay = "%fs" % wait
break
+
return reschedule, delay
def set_with_conditions(self, name, value, group, state, time):
""" Set value 'value' on attribute with name 'name' when 'time'
- has elapsed since all elements in 'group' have reached state
- 'state'
+ has elapsed since all elements in 'group' have reached state
+ 'state'
:param name: Name of the attribute to set
:type name: str
:type state: str
:param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
:type time: str
-
"""
reschedule = False
return
self.debug("----- READY ---- ")
- self._ready_time = strfnow()
+ self._ready_time = tnow()
self._state = ResourceState.READY
def release(self):
- """Clean the resource at the end of the Experiment and change the status
+ """Release any resources used by this RM
"""
- self._release_time = strfnow()
+ self._release_time = tnow()
self._state = ResourceState.RELEASED
+ def finish(self):
+ """ Mark ResourceManager as FINISHED
+
+ """
+ self._finish_time = tnow()
+ self._state = ResourceState.FINISHED
+
+ def fail(self):
+ """ Mark ResourceManager as FAILED
+
+ """
+ self._failed_time = tnow()
+ self._state = ResourceState.FAILED
+
+ def connect(self, guid):
+ """ Performs actions that need to be taken upon associating RMs.
+ This method should be redefined when necessary in child classes.
+ """
+ pass
+
+ def disconnect(self, guid):
+ """ Performs actions that need to be taken upon disassociating RMs.
+ This method should be redefined when necessary in child classes.
+ """
+ pass
+
def valid_connection(self, guid):
- """Check if the connection is available. This method need to be
- redefined by each new Resource Manager.
+ """Checks whether a connection with the other RM
+ is valid.
+ This method needs to be redefined by each new Resource Manager.
:param guid: Guid of the current Resource Manager
:type guid: int
return rclass(ec, guid)
def populate_factory():
- """Register all the possible RM that exists in the current version of Nepi.
-
- """
+ """Register all the possible RMs that exist in the current version of Nepi.
+ """
for rclass in find_types():
ResourceFactory.register_type(rclass)
def find_types():
- """Look into the different folders to find all the
- availables Resources Managers
+ """Look into the different folders to find all the
+ available Resource Managers
- """
+ """
search_path = os.environ.get("NEPI_SEARCH_PATH", "")
search_path = set(search_path.split(" "))