2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
# Default delay used when an action whose conditions are not met yet has to
# be re-scheduled on the experiment controller: the *_with_conditions methods
# start from this value and pass the delay to self.ec.schedule().
# It uses the same "<number><unit>" string format as condition times (e.g. "2s").
reschedule_delay = "1s"
35 """ Action that a user can order to a Resource Manager
43 """ State of a Resource Manager
56 ResourceState2str = dict({
57 ResourceState.NEW : "NEW",
58 ResourceState.DISCOVERED : "DISCOVERED",
59 ResourceState.PROVISIONED : "PROVISIONED",
60 ResourceState.READY : "READY",
61 ResourceState.STARTED : "STARTED",
62 ResourceState.STOPPED : "STOPPED",
63 ResourceState.FINISHED : "FINISHED",
64 ResourceState.FAILED : "FAILED",
65 ResourceState.RELEASED : "RELEASED",
69 """ Initializes template information (i.e. attributes and traces)
70 for the ResourceManager class
75 def clsinit_copy(cls):
76 """ Initializes template information (i.e. attributes and traces)
77 for the ResourceManager class, inheriting attributes and traces
83 # Decorator to invoke class initialization method
85 class ResourceManager(Logger):
91 def _register_attribute(cls, attr):
92 """ Resource subclasses will invoke this method to add a
96 cls._attributes[attr.name] = attr
99 def _remove_attribute(cls, name):
100 """ Resource subclasses will invoke this method to remove a
104 del cls._attributes[name]
107 def _register_trace(cls, trace):
108 """ Resource subclasses will invoke this method to add a
112 cls._traces[trace.name] = trace
115 def _remove_trace(cls, name):
116 """ Resource subclasses will invoke this method to remove a
120 del cls._traces[name]
123 def _register_attributes(cls):
124 """ Resource subclasses will invoke this method to register
131 def _register_traces(cls):
132 """ Resource subclasses will invoke this method to register
140 """ ResourceManager child classes have different attributes and traces.
141 Since the templates that hold the information of attributes and traces
142 are 'class attribute' dictionaries, initially they all point to the
143 parent class ResourceManager instances of those dictionaries.
144 In order to make these templates independent from the parent's one,
it is necessary to re-initialize the corresponding dictionaries.
146 This is the objective of the _clsinit method
148 # static template for resource attributes
149 cls._attributes = dict()
150 cls._register_attributes()
152 # static template for resource traces
154 cls._register_traces()
157 def _clsinit_copy(cls):
158 """ Same as _clsinit, except that it also inherits all attributes and traces
159 from the parent class.
161 # static template for resource attributes
162 cls._attributes = copy.deepcopy(cls._attributes)
163 cls._register_attributes()
165 # static template for resource traces
166 cls._traces = copy.deepcopy(cls._traces)
167 cls._register_traces()
171 """ Returns the type of the Resource Manager
177 def get_attributes(cls):
178 """ Returns a copy of the attributes
181 return copy.deepcopy(cls._attributes.values())
185 """ Returns a copy of the traces
188 return copy.deepcopy(cls._traces.values())
190 def __init__(self, ec, guid):
191 super(ResourceManager, self).__init__(self.rtype())
194 self._ec = weakref.ref(ec)
195 self._connections = set()
196 self._conditions = dict()
198 # the resource instance gets a copy of all attributes
199 self._attrs = copy.deepcopy(self._attributes)
201 # the resource instance gets a copy of all traces
202 self._trcs = copy.deepcopy(self._traces)
204 # Each resource is placed on a deployment group by the EC
206 self.deployment_group = None
208 self._start_time = None
209 self._stop_time = None
210 self._discover_time = None
211 self._provision_time = None
212 self._ready_time = None
213 self._release_time = None
214 self._finish_time = None
215 self._failed_time = None
217 self._state = ResourceState.NEW
221 """ Returns the global unique identifier of the RM """
226 """ Returns the Experiment Controller """
def connections(self):
    """ Set of guids of the RMs currently connected to this RM.

    Guids are added by register_connection() and removed by
    unregister_connection(). The internal set itself is returned
    (not a copy), so callers should treat it as read-only.
    """
    return self._connections
235 def conditions(self):
236 """ Returns the conditions to which the RM is subjected to.
238 The object returned by this method is a dictionary indexed by
240 return self._conditions
def start_time(self):
    """ Time at which the RM was last marked as STARTED.

    Holds the value tnow() returned when set_started() last ran,
    or None if the RM has never been started.
    """
    return self._start_time
249 """ Returns the stop time of the RM as a timestamp"""
250 return self._stop_time
def discover_time(self):
    """ Time at which discovery of the RM finished.

    Holds the value tnow() returned when set_discovered() last ran,
    or None if the RM has not been discovered yet.
    """
    return self._discover_time
def provision_time(self):
    """ Time at which provisioning of the RM finished.

    Holds the value tnow() returned when set_provisioned() last ran,
    or None if the RM has not been provisioned yet.
    """
    return self._provision_time
def ready_time(self):
    """ Time at which the RM reached the READY state (deployment finished).

    Holds the value tnow() returned when the RM was last marked READY,
    or None if it has not reached READY yet.
    """
    return self._ready_time
def release_time(self):
    """ Time at which the RM was marked as RELEASED.

    Holds the value tnow() returned when set_released() last ran,
    or None if the RM has not been released.
    """
    return self._release_time
def finish_time(self):
    """ Time at which the RM was marked as FINISHED.

    Holds the value tnow() returned when set_finished() last ran,
    or None if the RM has not finished.
    """
    return self._finish_time
def failed_time(self):
    """ Time at which the RM was marked as FAILED.

    Holds the value tnow() returned when set_failed() last ran,
    or None if no failure has occurred.
    """
    return self._failed_time
284 """ Get the state of the current RM """
287 def log_message(self, msg):
288 """ Returns the log message formatted with added information.
290 :param msg: text message
294 return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
296 def register_connection(self, guid):
297 """ Registers a connection to the RM identified by guid
299 :param guid: Global unique identified of the RM to connect to
302 if self.valid_connection(guid):
304 self._connections.add(guid)
306 def unregister_connection(self, guid):
307 """ Removes a registered connection to the RM identified by guid
309 :param guid: Global unique identified of the RM to connect to
312 if guid in self._connections:
313 self.disconnect(guid)
314 self._connections.remove(guid)
317 """ Performs resource discovery.
This method is responsible for selecting an individual resource
320 matching user requirements.
321 This method should be redefined when necessary in child classes.
323 self.set_discovered()
326 """ Performs resource provisioning.
This method is responsible for provisioning one resource.
329 After this method has been successfully invoked, the resource
should be accessible/controllable by the RM.
331 This method should be redefined when necessary in child classes.
333 self.set_provisioned()
336 """ Starts the resource.
338 There is no generic start behavior for all resources.
339 This method should be redefined when necessary in child classes.
341 if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
342 self.error("Wrong state %s for start" % self.state)
348 """ Stops the resource.
350 There is no generic stop behavior for all resources.
351 This method should be redefined when necessary in child classes.
353 if not self.state in [ResourceState.STARTED]:
354 self.error("Wrong state %s for stop" % self.state)
360 """ Execute all steps required for the RM to reach the state READY
363 if self.state > ResourceState.READY:
364 self.error("Wrong state %s for deploy" % self.state)
367 self.debug("----- READY ---- ")
379 def set(self, name, value):
380 """ Set the value of the attribute
382 :param name: Name of the attribute
384 :param name: Value of the attribute
387 attr = self._attrs[name]
391 """ Returns the value of the attribute
393 :param name: Name of the attribute
397 attr = self._attrs[name]
400 def enable_trace(self, name):
401 """ Explicitly enable trace generation
403 :param name: Name of the trace
406 trace = self._trcs[name]
409 def trace_enabled(self, name):
410 """Returns True if trace is enables
412 :param name: Name of the trace
415 trace = self._trcs[name]
418 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
419 """ Get information on collected trace
421 :param name: Name of the trace
424 :param attr: Can be one of:
425 - TraceAttr.ALL (complete trace content),
426 - TraceAttr.STREAM (block in bytes to read starting at offset),
427 - TraceAttr.PATH (full path to the trace file),
428 - TraceAttr.SIZE (size of trace file).
431 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
434 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
441 def register_condition(self, action, group, state, time = None):
442 """ Registers a condition on the resource manager to allow execution
443 of 'action' only after 'time' has elapsed from the moment all resources
444 in 'group' reached state 'state'
446 :param action: Action to restrict to condition (either 'START' or 'STOP')
448 :param group: Group of RMs to wait for (list of guids)
449 :type group: int or list of int
450 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
452 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
457 if not action in self.conditions:
458 self._conditions[action] = list()
460 conditions = self.conditions.get(action)
462 # For each condition to register a tuple of (group, state, time) is
463 # added to the 'action' list
464 if not isinstance(group, list):
467 conditions.append((group, state, time))
469 def unregister_condition(self, group, action = None):
470 """ Removed conditions for a certain group of guids
472 :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
475 :param group: Group of RMs to wait for (list of guids)
476 :type group: int or list of int
479 # For each condition a tuple of (group, state, time) is
480 # added to the 'action' list
481 if not isinstance(group, list):
484 for act, conditions in self.conditions.iteritems():
485 if action and act != action:
488 for condition in list(conditions):
489 (grp, state, time) = condition
491 # If there is an intersection between grp and group,
492 # then remove intersected elements
493 intsec = set(group).intersection(set(grp))
495 idx = conditions.index(condition)
497 newgrp.difference_update(intsec)
498 conditions[idx] = (newgrp, state, time)
500 def get_connected(self, rtype = None):
501 """ Returns the list of RM with the type 'rtype'
503 :param rtype: Type of the RM we look for
505 :return: list of guid
508 rclass = ResourceFactory.get_resource_type(rtype)
509 for guid in self.connections:
510 rm = self.ec.get_resource(guid)
511 if not rtype or isinstance(rm, rclass):
515 def _needs_reschedule(self, group, state, time):
516 """ Internal method that verify if 'time' has elapsed since
517 all elements in 'group' have reached state 'state'.
519 :param group: Group of RMs to wait for (list of guids)
520 :type group: int or list of int
521 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
523 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
526 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
527 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
528 For the moment, 2m30s is not a correct syntax.
532 delay = reschedule_delay
534 # check state and time elapsed on all RMs
536 rm = self.ec.get_resource(guid)
537 # If the RM state is lower than the requested state we must
538 # reschedule (e.g. if RM is READY but we required STARTED).
543 # If there is a time restriction, we must verify the
544 # restriction is satisfied
546 if state == ResourceState.DISCOVERED:
548 if state == ResourceState.PROVISIONED:
549 t = rm.provision_time
550 elif state == ResourceState.READY:
552 elif state == ResourceState.STARTED:
554 elif state == ResourceState.STOPPED:
559 # time already elapsed since RM changed state
560 waited = "%fs" % tdiffsec(tnow(), t)
563 wait = tdiffsec(stabsformat(time), stabsformat(waited))
570 return reschedule, delay
572 def set_with_conditions(self, name, value, group, state, time):
573 """ Set value 'value' on attribute with name 'name' when 'time'
574 has elapsed since all elements in 'group' have reached state
577 :param name: Name of the attribute to set
579 :param name: Value of the attribute to set
581 :param group: Group of RMs to wait for (list of guids)
582 :type group: int or list of int
583 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
585 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
590 delay = reschedule_delay
592 ## evaluate if set conditions are met
594 # only can set with conditions after the RM is started
595 if self.state != ResourceState.STARTED:
598 reschedule, delay = self._needs_reschedule(group, state, time)
601 callback = functools.partial(self.set_with_conditions,
602 name, value, group, state, time)
603 self.ec.schedule(delay, callback)
605 self.set(name, value)
607 def start_with_conditions(self):
608 """ Starts RM when all the conditions in self.conditions for
609 action 'START' are satisfied.
613 delay = reschedule_delay
615 ## evaluate if set conditions are met
617 # only can start when RM is either STOPPED or READY
618 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
620 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
622 start_conditions = self.conditions.get(ResourceAction.START, [])
624 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
626 # Verify all start conditions are met
627 for (group, state, time) in start_conditions:
628 # Uncomment for debug
631 # rm = self.ec.get_resource(guid)
632 # unmet.append((guid, rm._state))
634 #self.debug("---- WAITED STATES ---- %s" % unmet )
636 reschedule, delay = self._needs_reschedule(group, state, time)
641 self.ec.schedule(delay, self.start_with_conditions)
643 self.debug("----- STARTING ---- ")
646 def stop_with_conditions(self):
647 """ Stops RM when all the conditions in self.conditions for
648 action 'STOP' are satisfied.
652 delay = reschedule_delay
654 ## evaluate if set conditions are met
656 # only can stop when RM is STARTED
657 if self.state != ResourceState.STARTED:
660 self.debug(" ---- STOP CONDITIONS ---- %s" %
661 self.conditions.get(ResourceAction.STOP))
663 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
664 for (group, state, time) in stop_conditions:
665 reschedule, delay = self._needs_reschedule(group, state, time)
670 callback = functools.partial(self.stop_with_conditions)
671 self.ec.schedule(delay, callback)
673 self.debug(" ----- STOPPING ---- ")
676 def deploy_with_conditions(self):
677 """ Deploy RM when all the conditions in self.conditions for
678 action 'READY' are satisfied.
682 delay = reschedule_delay
684 ## evaluate if set conditions are met
686 # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
687 if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
688 ResourceState.PROVISIONED]:
690 self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
692 deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
694 self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions)
# Verify all deploy conditions are met
697 for (group, state, time) in deploy_conditions:
698 # Uncomment for debug
701 # rm = self.ec.get_resource(guid)
702 # unmet.append((guid, rm._state))
704 #self.debug("---- WAITED STATES ---- %s" % unmet )
706 reschedule, delay = self._needs_reschedule(group, state, time)
711 self.ec.schedule(delay, self.deploy_with_conditions)
713 self.debug("----- STARTING ---- ")
717 def connect(self, guid):
718 """ Performs actions that need to be taken upon associating RMs.
719 This method should be redefined when necessary in child classes.
723 def disconnect(self, guid):
724 """ Performs actions that need to be taken upon disassociating RMs.
725 This method should be redefined when necessary in child classes.
729 def valid_connection(self, guid):
730 """Checks whether a connection with the other RM
732 This method need to be redefined by each new Resource Manager.
734 :param guid: Guid of the current Resource Manager
def set_started(self):
    """ Mark the ResourceManager as STARTED.

    Stores the current tnow() value as the start time, then moves the
    RM to ResourceState.STARTED. No check is made on the previous state.
    """
    now = tnow()
    self._start_time = now
    self._state = ResourceState.STARTED
def set_stopped(self):
    """ Mark the ResourceManager as STOPPED.

    Stores the current tnow() value as the stop time, then moves the
    RM to ResourceState.STOPPED. No check is made on the previous state.
    """
    now = tnow()
    self._stop_time = now
    self._state = ResourceState.STOPPED
753 """ Mark ResourceManager as READY """
754 self._ready_time = tnow()
755 self._state = ResourceState.READY
def set_released(self):
    """ Mark the ResourceManager as RELEASED.

    Stores the current tnow() value as the release time, then moves the
    RM to ResourceState.RELEASED. No check is made on the previous state.
    """
    now = tnow()
    self._release_time = now
    self._state = ResourceState.RELEASED
def set_finished(self):
    """ Mark the ResourceManager as FINISHED.

    Stores the current tnow() value as the finish time, then moves the
    RM to ResourceState.FINISHED. No check is made on the previous state.
    """
    now = tnow()
    self._finish_time = now
    self._state = ResourceState.FINISHED
def set_failed(self):
    """ Mark the ResourceManager as FAILED.

    Stores the current tnow() value as the failure time, then moves the
    RM to ResourceState.FAILED. No check is made on the previous state.
    """
    now = tnow()
    self._failed_time = now
    self._state = ResourceState.FAILED
def set_discovered(self):
    """ Mark the ResourceManager as DISCOVERED.

    Stores the current tnow() value as the discovery time, then moves the
    RM to ResourceState.DISCOVERED. No check is made on the previous state.
    """
    now = tnow()
    self._discover_time = now
    self._state = ResourceState.DISCOVERED
def set_provisioned(self):
    """ Mark the ResourceManager as PROVISIONED.

    Stores the current tnow() value as the provisioning time, then moves
    the RM to ResourceState.PROVISIONED. No check is made on the previous
    state.
    """
    now = tnow()
    self._provision_time = now
    self._state = ResourceState.PROVISIONED
class ResourceFactory(object):
    """ Registry of Resource Manager classes, indexed by their rtype().

    The registry is a single class-level dictionary shared by every caller.
    All methods are classmethods and are meant to be invoked on the class
    itself (e.g. ResourceFactory.get_resource_type(rtype)).
    """
    # Shared registry: rclass.rtype() -> rclass
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """ Return the registry mapping each rtype() value to its class.

        NOTE: the live dictionary is returned, not a copy.
        """
        return cls._resource_types

    @classmethod
    def get_resource_type(cls, rtype):
        """ Return the Resource Manager class registered under 'rtype'.

        :param rtype: Type identifier, as returned by the class' rtype()
        :return: the registered class, or None if 'rtype' is unknown
        """
        return cls._resource_types.get(rtype)

    @classmethod
    def register_type(cls, rclass):
        """ Register a new Resource Manager class under rclass.rtype().

        Registering another class with the same rtype() replaces the
        previous entry.

        :param rclass: Resource Manager class exposing an rtype() method
        """
        cls._resource_types[rclass.rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Create a new instance of the Resource Manager registered as 'rtype'.

        :param rtype: Type identifier of the Resource Manager to instantiate
        :param ec: Experiment Controller handed to the new instance
        :param guid: Global unique identifier handed to the new instance
        :return: a new instance built as rclass(ec, guid)
        :raises KeyError: if no class is registered under 'rtype'
        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)
806 def populate_factory():
807 """Register all the possible RM that exists in the current version of Nepi.
809 # Once the factory is populated, don't repopulate
810 if not ResourceFactory.resource_types():
811 for rclass in find_types():
812 ResourceFactory.register_type(rclass)
815 """Look into the different folders to find all the
available Resource Managers
818 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
819 search_path = set(search_path.split(" "))
822 import nepi.resources
823 path = os.path.dirname(nepi.resources.__file__)
824 search_path.add(path)
828 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
829 prefix = "nepi.resources."):
831 loader = importer.find_module(modname)
# Notice: Repeated calls to load_module will act as a reload of the module
835 if modname in sys.modules:
836 module = sys.modules.get(modname)
838 module = loader.load_module(modname)
840 for attrname in dir(module):
841 if attrname.startswith("_"):
844 attr = getattr(module, attrname)
846 if attr == ResourceManager:
849 if not inspect.isclass(attr):
852 if issubclass(attr, ResourceManager):
855 if not modname in sys.modules:
856 sys.modules[modname] = module
861 err = traceback.format_exc()
862 logger = logging.getLogger("Resource.find_types()")
863 logger.error("Error while loading Resource Managers %s" % err)