2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
31 reschedule_delay = "1s"
34 """ Action that a user can order to a Resource Manager
42 """ State of a Resource Manager
# Printable name for every ResourceState value
ResourceState2str = {
    ResourceState.NEW: "NEW",
    ResourceState.DISCOVERED: "DISCOVERED",
    ResourceState.PROVISIONED: "PROVISIONED",
    ResourceState.READY: "READY",
    ResourceState.STARTED: "STARTED",
    ResourceState.STOPPED: "STOPPED",
    ResourceState.FINISHED: "FINISHED",
    ResourceState.FAILED: "FAILED",
    ResourceState.RELEASED: "RELEASED",
}
68 """ Initializes template information (i.e. attributes and traces)
69 for the ResourceManager class
74 def clsinit_copy(cls):
75 """ Initializes template information (i.e. attributes and traces)
76 for the ResourceManager class, inheriting attributes and traces
82 # Decorator to invoke class initialization method
84 class ResourceManager(Logger):
def _register_attribute(cls, attr):
    """ Adds an attribute to the class-level attribute template.

    Meant to be invoked by Resource subclasses. An attribute already
    registered under the same name is replaced.

    :param attr: Attribute to register, stored under 'attr.name'
    """
    key = attr.name
    cls._attributes[key] = attr
def _remove_attribute(cls, name):
    """ Removes an attribute from the class-level attribute template.

    Meant to be invoked by Resource subclasses.

    :param name: Name of the attribute to remove
    :raises KeyError: if no attribute is registered under 'name'
    """
    template = cls._attributes
    del template[name]
def _register_trace(cls, trace):
    """ Adds a trace to the class-level trace template.

    Meant to be invoked by Resource subclasses. A trace already
    registered under the same name is replaced.

    :param trace: Trace to register, stored under 'trace.name'
    """
    key = trace.name
    cls._traces[key] = trace
def _remove_trace(cls, name):
    """ Removes a trace from the class-level trace template.

    Meant to be invoked by Resource subclasses.

    :param name: Name of the trace to remove
    :raises KeyError: if no trace is registered under 'name'
    """
    template = cls._traces
    del template[name]
122 def _register_attributes(cls):
123 """ Resource subclasses will invoke this method to register
130 def _register_traces(cls):
131 """ Resource subclasses will invoke this method to register
139 """ ResourceManager child classes have different attributes and traces.
140 Since the templates that hold the information of attributes and traces
141 are 'class attribute' dictionaries, initially they all point to the
142 parent class ResourceManager instances of those dictionaries.
143 In order to make these templates independent from the parent's one,
144 it is necessary to re-initialize the corresponding dictionaries.
145 This is the objective of the _clsinit method
147 # static template for resource attributes
148 cls._attributes = dict()
149 cls._register_attributes()
151 # static template for resource traces
153 cls._register_traces()
def _clsinit_copy(cls):
    """ Gives the class its own attribute and trace templates, seeded
    with deep copies of the ones it currently sees (i.e. the parent's),
    and then lets the class register its own entries on top.
    """
    # Attribute template: detach from the parent's dictionary first,
    # then add the attributes declared by this class
    parent_attributes = cls._attributes
    cls._attributes = copy.deepcopy(parent_attributes)
    cls._register_attributes()

    # Trace template: same procedure
    parent_traces = cls._traces
    cls._traces = copy.deepcopy(parent_traces)
    cls._register_traces()
170 """ Returns the type of the Resource Manager
def get_attributes(cls):
    """ Returns a copy of the attributes registered in the class template

    :return: list with a deep copy of every registered attribute
    :rtype: list
    """
    # dict.values() is a view on Python 3 and views cannot be deep-copied,
    # so materialise a list first; on Python 2 the result is unchanged.
    return copy.deepcopy(list(cls._attributes.values()))
184 """ Returns a copy of the traces
187 return copy.deepcopy(cls._traces.values())
189 def __init__(self, ec, guid):
190 super(ResourceManager, self).__init__(self.rtype())
193 self._ec = weakref.ref(ec)
194 self._connections = set()
195 self._conditions = dict()
197 # the resource instance gets a copy of all attributes
198 self._attrs = copy.deepcopy(self._attributes)
200 # the resource instance gets a copy of all traces
201 self._trcs = copy.deepcopy(self._traces)
203 self._state = ResourceState.NEW
205 self._start_time = None
206 self._stop_time = None
207 self._discover_time = None
208 self._provision_time = None
209 self._ready_time = None
210 self._release_time = None
211 self._finish_time = None
212 self._failed_time = None
216 """ Returns the global unique identifier of the RM """
221 """ Returns the Experiment Controller """
def connections(self):
    """ Guids of the RMs this RM is connected to

    :return: the internal set of connected guids (not a copy)
    """
    return self._connections
def conditions(self):
    """ Conditions registered on the RM

    :return: the internal dictionary of conditions (not a copy); it is
        indexed by action and each entry lists (group, state, time) tuples
    """
    return self._conditions
def start_time(self):
    """ Timestamp at which the RM was started

    :return: the stored start timestamp, None if never set
    """
    return self._start_time
244 """ Returns the stop time of the RM as a timestamp"""
245 return self._stop_time
def discover_time(self):
    """ Timestamp at which discovery finished for the RM

    :return: the stored discovery timestamp, None if never set
    """
    return self._discover_time
def provision_time(self):
    """ Timestamp at which provisioning finished for the RM

    :return: the stored provisioning timestamp, None if never set
    """
    return self._provision_time
def ready_time(self):
    """ Timestamp at which deployment finished for the RM

    :return: the stored ready timestamp, None if never set
    """
    return self._ready_time
def release_time(self):
    """ Timestamp at which the RM was released

    :return: the stored release timestamp, None if never set
    """
    return self._release_time
def finish_time(self):
    """ Timestamp at which the RM was marked as finished

    :return: the stored finalization timestamp, None if never set
    """
    return self._finish_time
def failed_time(self):
    """ Timestamp at which a failure occurred for the RM

    :return: the stored failure timestamp, None if never set
    """
    return self._failed_time
279 """ Get the state of the current RM """
def log_message(self, msg):
    """ Builds the log line for this RM: the message prefixed with the
    resource type and the guid of the RM.

    :param msg: text message
    :return: the formatted message
    :rtype: str
    """
    fields = (self._rtype, self.guid, msg)
    return " %s guid: %d - %s " % fields
291 def register_connection(self, guid):
292 """ Registers a connection to the RM identified by guid
294 :param guid: Global unique identifier of the RM to connect to
297 if self.valid_connection(guid):
299 self._connections.add(guid)
def unregister_connection(self, guid):
    """ Drops the connection to the RM identified by guid, if registered

    disconnect() is invoked before the guid is removed from the set of
    connections. Unknown guids are silently ignored.

    :param guid: Global unique identifier of the RM to disconnect from
    """
    if guid not in self._connections:
        return

    self.disconnect(guid)
    self._connections.remove(guid)
312 """ Performs resource discovery.
314 This method is responsible for selecting an individual resource
315 matching user requirements.
316 This method should be redefined when necessary in child classes.
318 self._discover_time = tnow()
319 self._state = ResourceState.DISCOVERED
322 """ Performs resource provisioning.
324 This method is responsible for provisioning one resource.
325 After this method has been successfully invoked, the resource
326 should be accessible/controllable by the RM.
327 This method should be redefined when necessary in child classes.
329 self._provision_time = tnow()
330 self._state = ResourceState.PROVISIONED
333 """ Starts the resource.
335 There is no generic start behavior for all resources.
336 This method should be redefined when necessary in child classes.
338 if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
339 self.error("Wrong state %s for start" % self.state)
342 self._start_time = tnow()
343 self._state = ResourceState.STARTED
346 """ Stops the resource.
348 There is no generic stop behavior for all resources.
349 This method should be redefined when necessary in child classes.
351 if not self._state in [ResourceState.STARTED]:
352 self.error("Wrong state %s for stop" % self.state)
355 self._stop_time = tnow()
356 self._state = ResourceState.STOPPED
358 def set(self, name, value):
359 """ Set the value of the attribute
361 :param name: Name of the attribute
363 :param value: Value of the attribute
366 attr = self._attrs[name]
370 """ Returns the value of the attribute
372 :param name: Name of the attribute
376 attr = self._attrs[name]
379 def enable_trace(self, name):
380 """ Explicitly enable trace generation
382 :param name: Name of the trace
385 trace = self._trcs[name]
388 def trace_enabled(self, name):
389 """Returns True if trace is enabled
391 :param name: Name of the trace
394 trace = self._trcs[name]
397 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
398 """ Get information on collected trace
400 :param name: Name of the trace
403 :param attr: Can be one of:
404 - TraceAttr.ALL (complete trace content),
405 - TraceAttr.STREAM (block in bytes to read starting at offset),
406 - TraceAttr.PATH (full path to the trace file),
407 - TraceAttr.SIZE (size of trace file).
410 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
413 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
420 def register_condition(self, action, group, state, time = None):
421 """ Registers a condition on the resource manager to allow execution
422 of 'action' only after 'time' has elapsed from the moment all resources
423 in 'group' reached state 'state'
425 :param action: Action to restrict to condition (either 'START' or 'STOP')
427 :param group: Group of RMs to wait for (list of guids)
428 :type group: int or list of int
429 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
431 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
436 if not action in self.conditions:
437 self._conditions[action] = list()
439 conditions = self.conditions.get(action)
441 # For each condition to register a tuple of (group, state, time) is
442 # added to the 'action' list
443 if not isinstance(group, list):
446 conditions.append((group, state, time))
448 def unregister_condition(self, group, action = None):
449 """ Removes conditions for a certain group of guids
451 :param action: Action to restrict to condition (either 'START' or 'STOP')
454 :param group: Group of RMs to wait for (list of guids)
455 :type group: int or list of int
458 # For each condition a tuple of (group, state, time) is
459 # added to the 'action' list
460 if not isinstance(group, list):
463 for act, conditions in self.conditions.iteritems():
464 if action and act != action:
467 for condition in list(conditions):
468 (grp, state, time) = condition
470 # If there is an intersection between grp and group,
471 # then remove intersected elements
472 intsec = set(group).intersection(set(grp))
474 idx = conditions.index(condition)
476 newgrp.difference_update(intsec)
477 conditions[idx] = (newgrp, state, time)
479 def get_connected(self, rtype = None):
480 """ Returns the list of RM with the type 'rtype'
482 :param rtype: Type of the RM we look for
484 :return: list of guid
487 rclass = ResourceFactory.get_resource_type(rtype)
488 for guid in self.connections:
489 rm = self.ec.get_resource(guid)
490 if not rtype or isinstance(rm, rclass):
494 def _needs_reschedule(self, group, state, time):
495 """ Internal method that verifies if 'time' has elapsed since
496 all elements in 'group' have reached state 'state'.
498 :param group: Group of RMs to wait for (list of guids)
499 :type group: int or list of int
500 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
502 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
505 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
506 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
507 For the moment, 2m30s is not a correct syntax.
511 delay = reschedule_delay
513 # check state and time elapsed on all RMs
515 rm = self.ec.get_resource(guid)
516 # If the RM state is lower than the requested state we must
517 # reschedule (e.g. if RM is READY but we required STARTED).
522 # If there is a time restriction, we must verify the
523 # restriction is satisfied
525 if state == ResourceState.DISCOVERED:
527 if state == ResourceState.PROVISIONED:
528 t = rm.provision_time
529 elif state == ResourceState.READY:
531 elif state == ResourceState.STARTED:
533 elif state == ResourceState.STOPPED:
536 # Only keep time information for START and STOP
539 # time already elapsed since RM changed state
540 waited = "%fs" % tdiffsec(tnow(), t)
543 wait = tdiffsec(stabsformat(time), stabsformat(waited))
550 return reschedule, delay
552 def set_with_conditions(self, name, value, group, state, time):
553 """ Set value 'value' on attribute with name 'name' when 'time'
554 has elapsed since all elements in 'group' have reached state
557 :param name: Name of the attribute to set
559 :param value: Value of the attribute to set
561 :param group: Group of RMs to wait for (list of guids)
562 :type group: int or list of int
563 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
565 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
570 delay = reschedule_delay
572 ## evaluate if set conditions are met
574 # only can set with conditions after the RM is started
575 if self.state != ResourceState.STARTED:
578 reschedule, delay = self._needs_reschedule(group, state, time)
581 callback = functools.partial(self.set_with_conditions,
582 name, value, group, state, time)
583 self.ec.schedule(delay, callback)
585 self.set(name, value)
587 def start_with_conditions(self):
588 """ Starts RM when all the conditions in self.conditions for
589 action 'START' are satisfied.
593 delay = reschedule_delay
595 ## evaluate if set conditions are met
597 # only can start when RM is either STOPPED or READY
598 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
600 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
602 start_conditions = self.conditions.get(ResourceAction.START, [])
604 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
606 # Verify all start conditions are met
607 for (group, state, time) in start_conditions:
608 # Uncomment for debug
611 # rm = self.ec.get_resource(guid)
612 # unmet.append((guid, rm._state))
614 #self.debug("---- WAITED STATES ---- %s" % unmet )
616 reschedule, delay = self._needs_reschedule(group, state, time)
621 self.ec.schedule(delay, self.start_with_conditions)
623 self.debug("----- STARTING ---- ")
626 def stop_with_conditions(self):
627 """ Stops RM when all the conditions in self.conditions for
628 action 'STOP' are satisfied.
632 delay = reschedule_delay
634 ## evaluate if set conditions are met
636 # only can stop when RM is STARTED
637 if self.state != ResourceState.STARTED:
640 self.debug(" ---- STOP CONDITIONS ---- %s" %
641 self.conditions.get(ResourceAction.STOP))
643 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
644 for (group, state, time) in stop_conditions:
645 reschedule, delay = self._needs_reschedule(group, state, time)
650 callback = functools.partial(self.stop_with_conditions)
651 self.ec.schedule(delay, callback)
653 self.debug(" ----- STOPPING ---- ")
657 """ Execute all steps required for the RM to reach the state READY
660 if self._state > ResourceState.READY:
661 self.error("Wrong state %s for deploy" % self.state)
664 self.debug("----- READY ---- ")
665 self._ready_time = tnow()
666 self._state = ResourceState.READY
669 """Release any resources used by this RM
672 self._release_time = tnow()
673 self._state = ResourceState.RELEASED
676 """ Mark ResourceManager as FINISHED
679 self._finish_time = tnow()
680 self._state = ResourceState.FINISHED
683 """ Mark ResourceManager as FAILED
686 self._failed_time = tnow()
687 self._state = ResourceState.FAILED
689 def connect(self, guid):
690 """ Performs actions that need to be taken upon associating RMs.
691 This method should be redefined when necessary in child classes.
695 def disconnect(self, guid):
696 """ Performs actions that need to be taken upon disassociating RMs.
697 This method should be redefined when necessary in child classes.
701 def valid_connection(self, guid):
702 """Checks whether a connection with the other RM
704 This method needs to be redefined by each new Resource Manager.
706 :param guid: Guid of the current Resource Manager
class ResourceFactory(object):
    """ Registry of Resource Manager classes, indexed by resource type,
    that also instantiates them on demand.

    The registry is a class-level dictionary, so it is shared by every
    user of the factory. All methods are class methods and are meant to
    be invoked on the class itself.
    """
    # Maps rtype (the value returned by rclass.rtype()) to the class
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """ Returns the dictionary of registered Resource Manager classes

        The object returned is the registry itself, not a copy.
        """
        return cls._resource_types

    @classmethod
    def get_resource_type(cls, rtype):
        """ Returns the class registered for 'rtype'

        :param rtype: Type of the Resource Manager
        :return: the registered class, or None if 'rtype' is not registered
        """
        return cls._resource_types.get(rtype)

    @classmethod
    def register_type(cls, rclass):
        """ Registers a new Resource Manager class

        The class is stored under the value returned by rclass.rtype();
        a class previously registered under that type is replaced.

        :param rclass: Resource Manager class to register
        """
        cls._resource_types[rclass.rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Creates a new instance of a Resource Manager

        :param rtype: Type of the Resource Manager to instantiate
        :param ec: Experiment Controller, passed to the constructor
        :param guid: Guid, passed to the constructor
        :return: the new instance, built as rclass(ec, guid)
        :raises KeyError: if no class is registered for 'rtype'
        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)
def populate_factory():
    """ Registers in the ResourceFactory every Resource Manager class
    returned by find_types().

    Nothing is done when the factory already holds registered types.
    """
    if ResourceFactory.resource_types():
        # Already populated, do not register again
        return

    for rclass in find_types():
        ResourceFactory.register_type(rclass)
747 """Look into the different folders to find all the
748 available Resource Managers
750 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
751 search_path = set(search_path.split(" "))
754 import nepi.resources
755 path = os.path.dirname(nepi.resources.__file__)
756 search_path.add(path)
760 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
761 prefix = "nepi.resources."):
763 loader = importer.find_module(modname)
766 # Notice: Repeated calls to load_module will act as a reload of the module
767 module = loader.load_module(modname)
769 for attrname in dir(module):
770 if attrname.startswith("_"):
773 attr = getattr(module, attrname)
775 if attr == ResourceManager:
778 if not inspect.isclass(attr):
781 if issubclass(attr, ResourceManager):
786 err = traceback.format_exc()
787 logger = logging.getLogger("Resource.find_types()")
788 logger.error("Error while lading Resource Managers %s" % err)