2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
32 reschedule_delay = "1s"
35 """ Action that a user can order to a Resource Manager
43 """ State of a Resource Manager
# Maps every ResourceState value to its printable name.
ResourceState2str = {
    getattr(ResourceState, label): label
    for label in (
        "NEW",
        "DISCOVERED",
        "PROVISIONED",
        "READY",
        "STARTED",
        "STOPPED",
        "FINISHED",
        "FAILED",
        "RELEASED",
    )
}
69 """ Initializes template information (i.e. attributes and traces)
70 for the ResourceManager class
75 def clsinit_copy(cls):
76 """ Initializes template information (i.e. attributes and traces)
77 for the ResourceManager class, inheriting attributes and traces
83 # Decorator to invoke class initialization method
85 class ResourceManager(Logger):
def _register_attribute(cls, attr):
    """ Adds 'attr' to the attribute template of the class.

    The attribute is stored under its own name, replacing any attribute
    previously registered with that name.

    :param attr: Attribute to register (must expose a 'name')
    """
    key = attr.name
    cls._attributes[key] = attr
def _remove_attribute(cls, name):
    """ Drops the attribute called 'name' from the attribute template
    of the class.

    :param name: Name of the attribute to remove
    :raises KeyError: if no attribute with that name is registered
    """
    cls._attributes.pop(name)
def _register_trace(cls, trace):
    """ Adds 'trace' to the trace template of the class.

    The trace is stored under its own name, replacing any trace
    previously registered with that name.

    :param trace: Trace to register (must expose a 'name')
    """
    key = trace.name
    cls._traces[key] = trace
def _remove_trace(cls, name):
    """ Drops the trace called 'name' from the trace template of the class.

    :param name: Name of the trace to remove
    :raises KeyError: if no trace with that name is registered
    """
    cls._traces.pop(name)
123 def _register_attributes(cls):
124 """ Resource subclasses will invoke this method to register
131 def _register_traces(cls):
132 """ Resource subclasses will invoke this method to register
140 """ ResourceManager child classes have different attributes and traces.
141 Since the templates that hold the information of attributes and traces
142 are 'class attribute' dictionaries, initially they all point to the
143 parent class ResourceManager instances of those dictionaries.
144 In order to make these templates independent from the parent's one,
145 it is necessary to re-initialize the corresponding dictionaries.
146 This is the objective of the _clsinit method
148 # static template for resource attributes
149 cls._attributes = dict()
150 cls._register_attributes()
152 # static template for resource traces
154 cls._register_traces()
def _clsinit_copy(cls):
    """ Initializes the attribute and trace templates of the class while
    keeping everything inherited from the parent class.

    The inherited templates are deep-copied before the class registers
    its own entries, so registrations made here never modify the
    parent's dictionaries.
    """
    # attribute template: inherited entries first, then the class' own
    inherited_attributes = copy.deepcopy(cls._attributes)
    cls._attributes = inherited_attributes
    cls._register_attributes()

    # trace template: inherited entries first, then the class' own
    inherited_traces = copy.deepcopy(cls._traces)
    cls._traces = inherited_traces
    cls._register_traces()
171 """ Returns the type of the Resource Manager
def get_attributes(cls):
    """ Returns a copy of the attributes

    :return: list holding a deep copy of every attribute in the class
        template; changing the returned objects does not affect the
        template
    :rtype: list
    """
    # dict.values() is a view on Python 3 and cannot be deep-copied
    # directly; materializing a list gives the same result on Python 2
    # and Python 3.
    return copy.deepcopy(list(cls._attributes.values()))
185 """ Returns a copy of the traces
188 return copy.deepcopy(cls._traces.values())
def __init__(self, ec, guid):
    """ Creates a Resource Manager in state NEW.

    :param ec: Experiment Controller that owns this RM; only a weak
        reference to it is kept
    :param guid: Global unique identifier assigned to this RM
    """
    super(ResourceManager, self).__init__(self.rtype())

    # Keep the identifier: log_message() formats self.guid.
    # NOTE(review): stored as '_guid', following the '_name' convention
    # of the other fields -- confirm the 'guid' accessor reads this name.
    self._guid = guid
    self._ec = weakref.ref(ec)
    self._connections = set()
    self._conditions = dict()

    # the resource instance gets a copy of all attributes
    self._attrs = copy.deepcopy(self._attributes)

    # the resource instance gets a copy of all traces
    self._trcs = copy.deepcopy(self._traces)

    # Each resource is placed on a deployment group by the EC
    self.deployment_group = None

    # timestamps of the state transitions, unset until each one happens
    self._start_time = None
    self._stop_time = None
    self._discover_time = None
    self._provision_time = None
    self._ready_time = None
    self._release_time = None
    self._finish_time = None
    self._failed_time = None

    self._state = ResourceState.NEW
221 """ Returns the global unique identifier of the RM """
226 """ Returns the Experiment Controller """
def connections(self):
    """ Gives the set holding the guids of the RMs connected to this one """
    connected_guids = self._connections
    return connected_guids
def conditions(self):
    """ Gives the conditions registered on the RM.

    The returned object is the internal dictionary of conditions,
    not a copy of it.
    """
    registered = self._conditions
    return registered
def start_time(self):
    """ Gives the timestamp recorded when the RM was started """
    stamp = self._start_time
    return stamp
249 """ Returns the stop time of the RM as a timestamp"""
250 return self._stop_time
def discover_time(self):
    """ Gives the timestamp recorded when discovery finished for the RM """
    stamp = self._discover_time
    return stamp
def provision_time(self):
    """ Gives the timestamp recorded when provisioning finished for the RM """
    stamp = self._provision_time
    return stamp
def ready_time(self):
    """ Gives the timestamp recorded when deployment finished for the RM """
    stamp = self._ready_time
    return stamp
def release_time(self):
    """ Gives the timestamp recorded when the RM was released """
    stamp = self._release_time
    return stamp
def finish_time(self):
    """ Gives the timestamp recorded when the RM finished """
    stamp = self._finish_time
    return stamp
def failed_time(self):
    """ Gives the timestamp recorded when the RM failed """
    stamp = self._failed_time
    return stamp
284 """ Get the state of the current RM """
def log_message(self, msg):
    """ Builds the text to log, prefixing 'msg' with the resource type
    and the guid of the RM.

    :param msg: text message
    :return: the decorated message
    """
    fields = (self._rtype, self.guid, msg)
    return " %s guid: %d - %s " % fields
def register_connection(self, guid):
    """ Registers a connection to the RM identified by guid

    Nothing happens when valid_connection() rejects the connection.
    Otherwise the connect() hook runs and guid is then added to the set
    of connections, mirroring unregister_connection(), which runs
    disconnect() before dropping the guid.

    :param guid: Global unique identifier of the RM to connect to
    """
    if self.valid_connection(guid):
        # let the RM react to the association before recording it
        self.connect(guid)
        self._connections.add(guid)
def unregister_connection(self, guid):
    """ Drops the connection to the RM identified by guid, if there is one

    The disconnect() hook runs before the guid is removed from the set of
    connections. Unknown guids are ignored.

    :param guid: Global unique identifier of the RM to disconnect from
    """
    if guid not in self._connections:
        return

    self.disconnect(guid)
    self._connections.remove(guid)
317 """ Performs resource discovery.
319 This method is responsible for selecting an individual resource
320 matching user requirements.
321 This method should be redefined when necessary in child classes.
323 self.set_discovered()
326 """ Performs resource provisioning.
328 This method is responsible for provisioning one resource.
329 After this method has been successfully invoked, the resource
330 should be accessible/controllable by the RM.
331 This method should be redefined when necessary in child classes.
333 self.set_provisioned()
336 """ Starts the resource.
338 There is no generic start behavior for all resources.
339 This method should be redefined when necessary in child classes.
341 if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
342 self.error("Wrong state %s for start" % self.state)
348 """ Stops the resource.
350 There is no generic stop behavior for all resources.
351 This method should be redefined when necessary in child classes.
353 if not self.state in [ResourceState.STARTED]:
354 self.error("Wrong state %s for stop" % self.state)
360 """ Execute all steps required for the RM to reach the state READY
363 if self.state > ResourceState.READY:
364 self.error("Wrong state %s for deploy" % self.state)
367 self.debug("----- READY ---- ")
379 def set(self, name, value):
380 """ Set the value of the attribute
382 :param name: Name of the attribute
384 :param value: Value of the attribute
387 attr = self._attrs[name]
391 """ Returns the value of the attribute
393 :param name: Name of the attribute
397 attr = self._attrs[name]
400 def enable_trace(self, name):
401 """ Explicitly enable trace generation
403 :param name: Name of the trace
406 trace = self._trcs[name]
409 def trace_enabled(self, name):
410 """Returns True if trace is enabled
412 :param name: Name of the trace
415 trace = self._trcs[name]
418 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
419 """ Get information on collected trace
421 :param name: Name of the trace
424 :param attr: Can be one of:
425 - TraceAttr.ALL (complete trace content),
426 - TraceAttr.STREAM (block in bytes to read starting at offset),
427 - TraceAttr.PATH (full path to the trace file),
428 - TraceAttr.SIZE (size of trace file).
431 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
434 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
def register_condition(self, action, group, state, time = None):
    """ Registers a condition on the resource manager to allow execution
    of 'action' only after 'time' has elapsed from the moment all resources
    in 'group' reached state 'state'

    :param action: Action to restrict to condition (either 'START' or 'STOP')
    :param group: Group of RMs to wait for (list of guids); a single guid
        is accepted and is wrapped in a one-element list
    :type group: int or list of int
    :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
    :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
    """
    # Conditions always store the group as a list, so a lone guid is
    # normalized here.
    if not isinstance(group, list):
        group = [group]

    if action not in self.conditions:
        self._conditions[action] = list()

    # For each condition to register a tuple of (group, state, time) is
    # added to the 'action' list
    self.conditions[action].append((group, state, time))
469 def unregister_condition(self, group, action = None):
470 """ Removes conditions for a certain group of guids
472 :param action: Action to restrict to condition (either 'START' or 'STOP')
475 :param group: Group of RMs to wait for (list of guids)
476 :type group: int or list of int
479 # For each condition a tuple of (group, state, time) is
480 # added to the 'action' list
481 if not isinstance(group, list):
484 for act, conditions in self.conditions.iteritems():
485 if action and act != action:
488 for condition in list(conditions):
489 (grp, state, time) = condition
491 # If there is an intersection between grp and group,
492 # then remove intersected elements
493 intsec = set(group).intersection(set(grp))
495 idx = conditions.index(condition)
497 newgrp.difference_update(intsec)
498 conditions[idx] = (newgrp, state, time)
500 def get_connected(self, rtype = None):
501 """ Returns the list of RM with the type 'rtype'
503 :param rtype: Type of the RM we look for
505 :return: list of guid
508 rclass = ResourceFactory.get_resource_type(rtype)
509 for guid in self.connections:
510 rm = self.ec.get_resource(guid)
511 if not rtype or isinstance(rm, rclass):
515 def _needs_reschedule(self, group, state, time):
516 """ Internal method that verifies if 'time' has elapsed since
517 all elements in 'group' have reached state 'state'.
519 :param group: Group of RMs to wait for (list of guids)
520 :type group: int or list of int
521 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
523 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
526 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
527 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
528 For the moment, 2m30s is not a correct syntax.
532 delay = reschedule_delay
534 # check state and time elapsed on all RMs
536 rm = self.ec.get_resource(guid)
537 # If the RM state is lower than the requested state we must
538 # reschedule (e.g. if RM is READY but we required STARTED).
543 # If there is a time restriction, we must verify the
544 # restriction is satisfied
546 if state == ResourceState.DISCOVERED:
548 if state == ResourceState.PROVISIONED:
549 t = rm.provision_time
550 elif state == ResourceState.READY:
552 elif state == ResourceState.STARTED:
554 elif state == ResourceState.STOPPED:
557 # Only keep time information for START and STOP
560 # time already elapsed since RM changed state
561 waited = "%fs" % tdiffsec(tnow(), t)
564 wait = tdiffsec(stabsformat(time), stabsformat(waited))
571 return reschedule, delay
573 def set_with_conditions(self, name, value, group, state, time):
574 """ Set value 'value' on attribute with name 'name' when 'time'
575 has elapsed since all elements in 'group' have reached state
578 :param name: Name of the attribute to set
580 :param value: Value of the attribute to set
582 :param group: Group of RMs to wait for (list of guids)
583 :type group: int or list of int
584 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
586 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
591 delay = reschedule_delay
593 ## evaluate if set conditions are met
595 # only can set with conditions after the RM is started
596 if self.state != ResourceState.STARTED:
599 reschedule, delay = self._needs_reschedule(group, state, time)
602 callback = functools.partial(self.set_with_conditions,
603 name, value, group, state, time)
604 self.ec.schedule(delay, callback)
606 self.set(name, value)
608 def start_with_conditions(self):
609 """ Starts RM when all the conditions in self.conditions for
610 action 'START' are satisfied.
614 delay = reschedule_delay
616 ## evaluate if set conditions are met
618 # only can start when RM is either STOPPED or READY
619 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
621 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
623 start_conditions = self.conditions.get(ResourceAction.START, [])
625 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
627 # Verify all start conditions are met
628 for (group, state, time) in start_conditions:
629 # Uncomment for debug
632 # rm = self.ec.get_resource(guid)
633 # unmet.append((guid, rm._state))
635 #self.debug("---- WAITED STATES ---- %s" % unmet )
637 reschedule, delay = self._needs_reschedule(group, state, time)
642 self.ec.schedule(delay, self.start_with_conditions)
644 self.debug("----- STARTING ---- ")
647 def stop_with_conditions(self):
648 """ Stops RM when all the conditions in self.conditions for
649 action 'STOP' are satisfied.
653 delay = reschedule_delay
655 ## evaluate if set conditions are met
657 # only can stop when RM is STARTED
658 if self.state != ResourceState.STARTED:
661 self.debug(" ---- STOP CONDITIONS ---- %s" %
662 self.conditions.get(ResourceAction.STOP))
664 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
665 for (group, state, time) in stop_conditions:
666 reschedule, delay = self._needs_reschedule(group, state, time)
671 callback = functools.partial(self.stop_with_conditions)
672 self.ec.schedule(delay, callback)
674 self.debug(" ----- STOPPING ---- ")
def connect(self, guid):
    """ Hook run when this RM gets associated with another RM.

    Does nothing by default; child classes redefine it when they need
    to act upon the association.

    :param guid: Guid of the RM being associated
    """
    pass
def disconnect(self, guid):
    """ Hook run when this RM gets disassociated from another RM.

    Does nothing by default; child classes redefine it when they need
    to act upon the disassociation.

    :param guid: Guid of the RM being disassociated
    """
    pass
689 def valid_connection(self, guid):
690 """Checks whether a connection with the other RM
692 This method needs to be redefined by each new Resource Manager.
694 :param guid: Guid of the current Resource Manager
def set_started(self):
    """ Records the current time as start time and moves the RM to STARTED """
    now = tnow()
    self._start_time = now
    self._state = ResourceState.STARTED
def set_stopped(self):
    """ Records the current time as stop time and moves the RM to STOPPED """
    now = tnow()
    self._stop_time = now
    self._state = ResourceState.STOPPED
713 """ Mark ResourceManager as READY """
714 self._ready_time = tnow()
715 self._state = ResourceState.READY
def set_released(self):
    """ Records the current time as release time and moves the RM to RELEASED """
    now = tnow()
    self._release_time = now
    self._state = ResourceState.RELEASED
def set_finished(self):
    """ Records the current time as finish time and moves the RM to FINISHED """
    now = tnow()
    self._finish_time = now
    self._state = ResourceState.FINISHED
def set_failed(self):
    """ Records the current time as failure time and moves the RM to FAILED """
    now = tnow()
    self._failed_time = now
    self._state = ResourceState.FAILED
def set_discovered(self):
    """ Records the current time as discovery time and moves the RM to DISCOVERED """
    now = tnow()
    self._discover_time = now
    self._state = ResourceState.DISCOVERED
def set_provisioned(self):
    """ Records the current time as provision time and moves the RM to PROVISIONED """
    now = tnow()
    self._provision_time = now
    self._state = ResourceState.PROVISIONED
class ResourceFactory(object):
    """ Registry of Resource Manager classes, indexed by resource type.

    All methods work on the class itself, so the factory is used
    without being instantiated.
    """
    # registered RM classes, keyed by the value of their rtype()
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """Return the dictionary of registered RM classes, keyed by type"""
        return cls._resource_types

    @classmethod
    def get_resource_type(cls, rtype):
        """Return the RM class registered for 'rtype', or None if unknown"""
        return cls._resource_types.get(rtype)

    @classmethod
    def register_type(cls, rclass):
        """Register a new Resource Manager class under its rtype()"""
        cls._resource_types[rclass.rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """Create a new instance of the Resource Manager registered for 'rtype'

        :param rtype: Registered resource type
        :param ec: Experiment Controller handed to the new RM
        :param guid: Guid handed to the new RM
        :raises KeyError: if 'rtype' was never registered
        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)
def populate_factory():
    """Fill the ResourceFactory with every RM class returned by find_types().

    Calling it again once the factory holds at least one type is a no-op.
    """
    # Once the factory is populated, don't repopulate
    if ResourceFactory.resource_types():
        return

    for rclass in find_types():
        ResourceFactory.register_type(rclass)
775 """Look into the different folders to find all the
776 available Resource Managers
778 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
779 search_path = set(search_path.split(" "))
782 import nepi.resources
783 path = os.path.dirname(nepi.resources.__file__)
784 search_path.add(path)
788 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
789 prefix = "nepi.resources."):
791 loader = importer.find_module(modname)
794 # Notice: Repeated calls to load_module will act as a reload of the module
795 if modname in sys.modules:
796 module = sys.modules.get(modname)
798 module = loader.load_module(modname)
800 for attrname in dir(module):
801 if attrname.startswith("_"):
804 attr = getattr(module, attrname)
806 if attr == ResourceManager:
809 if not inspect.isclass(attr):
812 if issubclass(attr, ResourceManager):
815 if not modname in sys.modules:
816 sys.modules[modname] = module
821 err = traceback.format_exc()
822 logger = logging.getLogger("Resource.find_types()")
823 logger.error("Error while loading Resource Managers %s" % err)