2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
31 reschedule_delay = "1s"
34 """ Action that a user can order to a Resource Manager
42 """ State of a Resource Manager
55 ResourceState2str = dict({
56 ResourceState.NEW : "NEW",
57 ResourceState.DISCOVERED : "DISCOVERED",
58 ResourceState.PROVISIONED : "PROVISIONED",
59 ResourceState.READY : "READY",
60 ResourceState.STARTED : "STARTED",
61 ResourceState.STOPPED : "STOPPED",
62 ResourceState.FINISHED : "FINISHED",
63 ResourceState.FAILED : "FAILED",
64 ResourceState.RELEASED : "RELEASED",
68 """ Initializes template information (i.e. attributes and traces)
69 for the ResourceManager class
74 def clsinit_copy(cls):
75 """ Initializes template information (i.e. attributes and traces)
76 for the ResourceManager class, inheriting attributes and traces
82 # Decorator to invoke class initialization method
84 class ResourceManager(Logger):
90 def _register_attribute(cls, attr):
91 """ Resource subclasses will invoke this method to add a
95 cls._attributes[attr.name] = attr
98 def _remove_attribute(cls, name):
99 """ Resource subclasses will invoke this method to remove a
103 del cls._attributes[name]
106 def _register_trace(cls, trace):
107 """ Resource subclasses will invoke this method to add a
111 cls._traces[trace.name] = trace
114 def _remove_trace(cls, name):
115 """ Resource subclasses will invoke this method to remove a
119 del cls._traces[name]
122 def _register_attributes(cls):
123 """ Resource subclasses will invoke this method to register
130 def _register_traces(cls):
131 """ Resource subclasses will invoke this method to register
139 """ ResourceManager child classes have different attributes and traces.
140 Since the templates that hold the information of attributes and traces
141 are 'class attribute' dictionaries, initially they all point to the
142 parent class ResourceManager instances of those dictionaries.
143 In order to make these templates independent from the parent's one,
144 it is necessary re-initialize the corresponding dictionaries.
145 This is the objective of the _clsinit method
147 # static template for resource attributes
148 cls._attributes = dict()
149 cls._register_attributes()
151 # static template for resource traces
153 cls._register_traces()
156 def _clsinit_copy(cls):
157 """ Same as _clsinit, except that it also inherits all attributes and traces
158 from the parent class.
160 # static template for resource attributes
161 cls._attributes = copy.deepcopy(cls._attributes)
162 cls._register_attributes()
164 # static template for resource traces
165 cls._traces = copy.deepcopy(cls._traces)
166 cls._register_traces()
170 """ Returns the type of the Resource Manager
@classmethod
def get_attributes(cls):
    """ Returns a copy of the attributes

    :return: list holding a deep copy of every attribute registered in
        the class template ``cls._attributes``
    :rtype: list
    """
    # Materialize the values first: on Python 3 ``dict.values()`` is a
    # view object, which ``copy.deepcopy`` cannot copy.
    return copy.deepcopy(list(cls._attributes.values()))
184 """ Returns a copy of the traces
187 return copy.deepcopy(cls._traces.values())
189 def __init__(self, ec, guid):
190 super(ResourceManager, self).__init__(self.rtype())
193 self._ec = weakref.ref(ec)
194 self._connections = set()
195 self._conditions = dict()
197 # the resource instance gets a copy of all attributes
198 self._attrs = copy.deepcopy(self._attributes)
200 # the resource instance gets a copy of all traces
201 self._trcs = copy.deepcopy(self._traces)
203 # Each resource is placed on a deployment group by the EC
205 self.deployment_group = None
207 self._start_time = None
208 self._stop_time = None
209 self._discover_time = None
210 self._provision_time = None
211 self._ready_time = None
212 self._release_time = None
213 self._finish_time = None
214 self._failed_time = None
216 self._state = ResourceState.NEW
220 """ Returns the global unique identifier of the RM """
225 """ Returns the Experiment Controller """
@property
def connections(self):
    """ Returns the set of guids of connected RMs

    The internal set itself is returned, not a copy.
    """
    return self._connections
@property
def conditions(self):
    """ Returns the conditions to which the RM is subjected to.

    The object returned by this method is the internal dictionary (not
    a copy), indexed by action.
    """
    return self._conditions
@property
def start_time(self):
    """ Returns the start time of the RM as a timestamp"""
    return self._start_time
248 """ Returns the stop time of the RM as a timestamp"""
249 return self._stop_time
@property
def discover_time(self):
    """ Returns the time discovery was finished for the RM as a timestamp"""
    return self._discover_time
@property
def provision_time(self):
    """ Returns the time provisioning was finished for the RM as a timestamp"""
    return self._provision_time
@property
def ready_time(self):
    """ Returns the time deployment was finished for the RM as a timestamp"""
    return self._ready_time
@property
def release_time(self):
    """ Returns the release time of the RM as a timestamp"""
    return self._release_time
@property
def finish_time(self):
    """ Returns the finalization time of the RM as a timestamp"""
    return self._finish_time
@property
def failed_time(self):
    """ Returns the time failure occurred for the RM as a timestamp"""
    return self._failed_time
283 """ Get the state of the current RM """
def log_message(self, msg):
    """ Returns the log message formatted with added information.

    The RM type (``self._rtype``) and guid are prepended to the text.

    :param msg: text message
    :return: string of the form ``" <rtype> guid: <guid> - <msg> "``
    """
    return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
295 def register_connection(self, guid):
296 """ Registers a connection to the RM identified by guid
298 :param guid: Global unique identified of the RM to connect to
301 if self.valid_connection(guid):
303 self._connections.add(guid)
def unregister_connection(self, guid):
    """ Removes a registered connection to the RM identified by guid

    Nothing happens when ``guid`` is not a registered connection.
    Otherwise ``disconnect`` is invoked before the guid is dropped from
    the set of connections.

    :param guid: Global unique identifier of the RM to disconnect from
    """
    if guid not in self._connections:
        return

    self.disconnect(guid)
    self._connections.remove(guid)
316 """ Performs resource discovery.
318 This method is responsible for selecting an individual resource
319 matching user requirements.
320 This method should be redefined when necessary in child classes.
322 self.set_discovered()
325 """ Performs resource provisioning.
327 This method is responsible for provisioning one resource.
328 After this method has been successfully invoked, the resource
329 should be accessible/controllable by the RM.
330 This method should be redefined when necessary in child classes.
332 self.set_provisioned()
335 """ Starts the resource.
337 There is no generic start behavior for all resources.
338 This method should be redefined when necessary in child classes.
340 if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
341 self.error("Wrong state %s for start" % self.state)
347 """ Stops the resource.
349 There is no generic stop behavior for all resources.
350 This method should be redefined when necessary in child classes.
352 if not self.state in [ResourceState.STARTED]:
353 self.error("Wrong state %s for stop" % self.state)
359 """ Execute all steps required for the RM to reach the state READY
362 if self.state > ResourceState.READY:
363 self.error("Wrong state %s for deploy" % self.state)
366 self.debug("----- READY ---- ")
378 def set(self, name, value):
379 """ Set the value of the attribute
381 :param name: Name of the attribute
383 :param name: Value of the attribute
386 attr = self._attrs[name]
390 """ Returns the value of the attribute
392 :param name: Name of the attribute
396 attr = self._attrs[name]
399 def enable_trace(self, name):
400 """ Explicitly enable trace generation
402 :param name: Name of the trace
405 trace = self._trcs[name]
408 def trace_enabled(self, name):
409 """Returns True if trace is enabled
411 :param name: Name of the trace
414 trace = self._trcs[name]
417 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
418 """ Get information on collected trace
420 :param name: Name of the trace
423 :param attr: Can be one of:
424 - TraceAttr.ALL (complete trace content),
425 - TraceAttr.STREAM (block in bytes to read starting at offset),
426 - TraceAttr.PATH (full path to the trace file),
427 - TraceAttr.SIZE (size of trace file).
430 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
433 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
440 def register_condition(self, action, group, state, time = None):
441 """ Registers a condition on the resource manager to allow execution
442 of 'action' only after 'time' has elapsed from the moment all resources
443 in 'group' reached state 'state'
445 :param action: Action to restrict to condition (either 'START' or 'STOP')
447 :param group: Group of RMs to wait for (list of guids)
448 :type group: int or list of int
449 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
451 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
456 if not action in self.conditions:
457 self._conditions[action] = list()
459 conditions = self.conditions.get(action)
461 # For each condition to register a tuple of (group, state, time) is
462 # added to the 'action' list
463 if not isinstance(group, list):
466 conditions.append((group, state, time))
468 def unregister_condition(self, group, action = None):
469 """ Removed conditions for a certain group of guids
471 :param action: Action to restrict to condition (either 'START' or 'STOP')
474 :param group: Group of RMs to wait for (list of guids)
475 :type group: int or list of int
478 # For each condition a tuple of (group, state, time) is
479 # added to the 'action' list
480 if not isinstance(group, list):
483 for act, conditions in self.conditions.iteritems():
484 if action and act != action:
487 for condition in list(conditions):
488 (grp, state, time) = condition
490 # If there is an intersection between grp and group,
491 # then remove intersected elements
492 intsec = set(group).intersection(set(grp))
494 idx = conditions.index(condition)
496 newgrp.difference_update(intsec)
497 conditions[idx] = (newgrp, state, time)
499 def get_connected(self, rtype = None):
500 """ Returns the list of RM with the type 'rtype'
502 :param rtype: Type of the RM we look for
504 :return: list of guid
507 rclass = ResourceFactory.get_resource_type(rtype)
508 for guid in self.connections:
509 rm = self.ec.get_resource(guid)
510 if not rtype or isinstance(rm, rclass):
514 def _needs_reschedule(self, group, state, time):
515 """ Internal method that verify if 'time' has elapsed since
516 all elements in 'group' have reached state 'state'.
518 :param group: Group of RMs to wait for (list of guids)
519 :type group: int or list of int
520 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
522 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
525 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
526 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
527 For the moment, 2m30s is not a correct syntax.
531 delay = reschedule_delay
533 # check state and time elapsed on all RMs
535 rm = self.ec.get_resource(guid)
536 # If the RM state is lower than the requested state we must
537 # reschedule (e.g. if RM is READY but we required STARTED).
542 # If there is a time restriction, we must verify the
543 # restriction is satisfied
545 if state == ResourceState.DISCOVERED:
547 if state == ResourceState.PROVISIONED:
548 t = rm.provision_time
549 elif state == ResourceState.READY:
551 elif state == ResourceState.STARTED:
553 elif state == ResourceState.STOPPED:
556 # Only keep time information for START and STOP
559 # time already elapsed since RM changed state
560 waited = "%fs" % tdiffsec(tnow(), t)
563 wait = tdiffsec(stabsformat(time), stabsformat(waited))
570 return reschedule, delay
572 def set_with_conditions(self, name, value, group, state, time):
573 """ Set value 'value' on attribute with name 'name' when 'time'
574 has elapsed since all elements in 'group' have reached state
577 :param name: Name of the attribute to set
579 :param name: Value of the attribute to set
581 :param group: Group of RMs to wait for (list of guids)
582 :type group: int or list of int
583 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
585 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
590 delay = reschedule_delay
592 ## evaluate if set conditions are met
594 # only can set with conditions after the RM is started
595 if self.state != ResourceState.STARTED:
598 reschedule, delay = self._needs_reschedule(group, state, time)
601 callback = functools.partial(self.set_with_conditions,
602 name, value, group, state, time)
603 self.ec.schedule(delay, callback)
605 self.set(name, value)
607 def start_with_conditions(self):
608 """ Starts RM when all the conditions in self.conditions for
609 action 'START' are satisfied.
613 delay = reschedule_delay
615 ## evaluate if set conditions are met
617 # only can start when RM is either STOPPED or READY
618 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
620 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
622 start_conditions = self.conditions.get(ResourceAction.START, [])
624 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
626 # Verify all start conditions are met
627 for (group, state, time) in start_conditions:
628 # Uncomment for debug
631 # rm = self.ec.get_resource(guid)
632 # unmet.append((guid, rm._state))
634 #self.debug("---- WAITED STATES ---- %s" % unmet )
636 reschedule, delay = self._needs_reschedule(group, state, time)
641 self.ec.schedule(delay, self.start_with_conditions)
643 self.debug("----- STARTING ---- ")
646 def stop_with_conditions(self):
647 """ Stops RM when all the conditions in self.conditions for
648 action 'STOP' are satisfied.
652 delay = reschedule_delay
654 ## evaluate if set conditions are met
656 # only can stop when RM is STARTED
657 if self.state != ResourceState.STARTED:
660 self.debug(" ---- STOP CONDITIONS ---- %s" %
661 self.conditions.get(ResourceAction.STOP))
663 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
664 for (group, state, time) in stop_conditions:
665 reschedule, delay = self._needs_reschedule(group, state, time)
670 callback = functools.partial(self.stop_with_conditions)
671 self.ec.schedule(delay, callback)
673 self.debug(" ----- STOPPING ---- ")
676 def connect(self, guid):
677 """ Performs actions that need to be taken upon associating RMs.
678 This method should be redefined when necessary in child classes.
682 def disconnect(self, guid):
683 """ Performs actions that need to be taken upon disassociating RMs.
684 This method should be redefined when necessary in child classes.
688 def valid_connection(self, guid):
689 """Checks whether a connection with the other RM
691 This method need to be redefined by each new Resource Manager.
693 :param guid: Guid of the current Resource Manager
def set_started(self):
    """ Mark ResourceManager as STARTED

    Records the current time (``tnow()``) as the start time before
    switching the state.
    """
    self._start_time = tnow()
    self._state = ResourceState.STARTED
def set_stopped(self):
    """ Mark ResourceManager as STOPPED

    Records the current time (``tnow()``) as the stop time before
    switching the state.
    """
    self._stop_time = tnow()
    self._state = ResourceState.STOPPED
712 """ Mark ResourceManager as READY """
713 self._ready_time = tnow()
714 self._state = ResourceState.READY
def set_released(self):
    """ Mark ResourceManager as RELEASED

    Records the current time (``tnow()``) as the release time before
    switching the state.
    """
    self._release_time = tnow()
    self._state = ResourceState.RELEASED
def set_finished(self):
    """ Mark ResourceManager as FINISHED

    Records the current time (``tnow()``) as the finish time before
    switching the state.
    """
    self._finish_time = tnow()
    self._state = ResourceState.FINISHED
def set_failed(self):
    """ Mark ResourceManager as FAILED

    Records the current time (``tnow()``) as the failure time before
    switching the state.
    """
    self._failed_time = tnow()
    self._state = ResourceState.FAILED
def set_discovered(self):
    """ Mark ResourceManager as DISCOVERED

    Records the current time (``tnow()``) as the discovery time before
    switching the state.
    """
    self._discover_time = tnow()
    self._state = ResourceState.DISCOVERED
def set_provisioned(self):
    """ Mark ResourceManager as PROVISIONED

    Records the current time (``tnow()``) as the provisioning time
    before switching the state.
    """
    self._provision_time = tnow()
    self._state = ResourceState.PROVISIONED
class ResourceFactory(object):
    """ Registry of Resource Manager classes, indexed by resource type.

    All state lives on the class itself and every method is a class
    method, so the factory is used without being instantiated.
    """

    # Maps the value returned by rclass.rtype() to the RM class
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """Return the registry of RM classes.

        :return: the internal dictionary (not a copy) mapping resource
            type to Resource Manager class
        :rtype: dict
        """
        return cls._resource_types

    @classmethod
    def get_resource_type(cls, rtype):
        """Return the RM class registered for a resource type.

        :param rtype: Resource type to look up
        :return: the registered class, or None if ``rtype`` is unknown
        """
        return cls._resource_types.get(rtype)

    @classmethod
    def register_type(cls, rclass):
        """Register a new Resource Manager class.

        The class is stored under the key returned by ``rclass.rtype()``;
        a class previously registered under that key is replaced.

        :param rclass: Resource Manager class exposing ``rtype()``
        """
        cls._resource_types[rclass.rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """Create a new instance of a Resource Manager.

        :param rtype: Resource type of the RM to instantiate
        :param ec: Passed as first argument to the RM constructor
        :param guid: Passed as second argument to the RM constructor
        :return: ``rclass(ec, guid)`` for the class registered as ``rtype``
        :raises KeyError: if no class is registered for ``rtype``
        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)
def populate_factory():
    """Register all the RM classes returned by ``find_types`` in the
    ResourceFactory.

    The factory is populated only once: when it already holds
    registered types, the call does nothing.
    """
    # Once the factory is populated, don't repopulate
    if ResourceFactory.resource_types():
        return

    for rclass in find_types():
        ResourceFactory.register_type(rclass)
774 """Look into the different folders to find all the
775 availables Resources Managers
777 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
778 search_path = set(search_path.split(" "))
781 import nepi.resources
782 path = os.path.dirname(nepi.resources.__file__)
783 search_path.add(path)
787 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
788 prefix = "nepi.resources."):
790 loader = importer.find_module(modname)
793 # Notice: Repeated calls to load_module will act as a reload of the module
794 module = loader.load_module(modname)
796 for attrname in dir(module):
797 if attrname.startswith("_"):
800 attr = getattr(module, attrname)
802 if attr == ResourceManager:
805 if not inspect.isclass(attr):
808 if issubclass(attr, ResourceManager):
813 err = traceback.format_exc()
814 logger = logging.getLogger("Resource.find_types()")
815 logger.error("Error while loading Resource Managers %s" % err)