2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
# Default delay (time string in the same "<number><unit>" format used for
# condition times, e.g. '2s') after which an action whose conditions are not
# yet satisfied is scheduled again through ec.schedule().
reschedule_delay = "1s"
36 """ Action that a user can order to a Resource Manager
44 """ State of a Resource Manager
57 ResourceState2str = dict({
58 ResourceState.NEW : "NEW",
59 ResourceState.DISCOVERED : "DISCOVERED",
60 ResourceState.PROVISIONED : "PROVISIONED",
61 ResourceState.READY : "READY",
62 ResourceState.STARTED : "STARTED",
63 ResourceState.STOPPED : "STOPPED",
64 ResourceState.FINISHED : "FINISHED",
65 ResourceState.FAILED : "FAILED",
66 ResourceState.RELEASED : "RELEASED",
70 """ Initializes template information (i.e. attributes and traces)
71 for the ResourceManager class
76 def clsinit_copy(cls):
77 """ Initializes template information (i.e. attributes and traces)
78 for the ResourceManager class, inheriting attributes and traces
85 def wrapped(self, *args, **kwargs):
87 return func(self, *args, **kwargs)
90 err = traceback.format_exc()
92 self.debug("SETTING guid %d to state FAILED" % self.guid)
98 # Decorator to invoke class initialization method
100 class ResourceManager(Logger):
101 """ Base clase for all ResourceManagers.
103 A ResourceManger is specific to a resource type (e.g. Node,
104 Switch, Application, etc) on a specific backend (e.g. PlanetLab,
107 The ResourceManager instances are responsible for interacting with
108 and controlling concrete (physical or virtual) resources in the
109 experimental backends.
119 def _register_attribute(cls, attr):
120 """ Resource subclasses will invoke this method to add a
124 cls._attributes[attr.name] = attr
127 def _remove_attribute(cls, name):
128 """ Resource subclasses will invoke this method to remove a
132 del cls._attributes[name]
135 def _register_trace(cls, trace):
136 """ Resource subclasses will invoke this method to add a
140 cls._traces[trace.name] = trace
143 def _remove_trace(cls, name):
144 """ Resource subclasses will invoke this method to remove a
148 del cls._traces[name]
151 def _register_attributes(cls):
152 """ Resource subclasses will invoke this method to register
156 critical = Attribute("critical", "Defines whether the resource is critical. "
157 " A failure on a critical resource will interrupt the experiment. ",
160 flags = Flags.ExecReadOnly)
162 cls._register_attribute(critical)
165 def _register_traces(cls):
166 """ Resource subclasses will invoke this method to register
174 """ ResourceManager child classes have different attributes and traces.
175 Since the templates that hold the information of attributes and traces
176 are 'class attribute' dictionaries, initially they all point to the
177 parent class ResourceManager instances of those dictionaries.
178 In order to make these templates independent from the parent's one,
179 it is necessary re-initialize the corresponding dictionaries.
180 This is the objective of the _clsinit method
182 # static template for resource attributes
183 cls._attributes = dict()
184 cls._register_attributes()
186 # static template for resource traces
188 cls._register_traces()
191 def _clsinit_copy(cls):
192 """ Same as _clsinit, except that it also inherits all attributes and traces
193 from the parent class.
195 # static template for resource attributes
196 cls._attributes = copy.deepcopy(cls._attributes)
197 cls._register_attributes()
199 # static template for resource traces
200 cls._traces = copy.deepcopy(cls._traces)
201 cls._register_traces()
205 """ Returns the type of the Resource Manager
211 def get_attributes(cls):
212 """ Returns a copy of the attributes
215 return copy.deepcopy(cls._attributes.values())
219 """ Returns a copy of the traces
222 return copy.deepcopy(cls._traces.values())
226 """ Returns the description of the type of Resource
232 def get_backend(cls):
233 """ Returns the identified of the backend (i.e. testbed, environment)
239 def __init__(self, ec, guid):
240 super(ResourceManager, self).__init__(self.rtype())
243 self._ec = weakref.ref(ec)
244 self._connections = set()
245 self._conditions = dict()
247 # the resource instance gets a copy of all attributes
248 self._attrs = copy.deepcopy(self._attributes)
250 # the resource instance gets a copy of all traces
251 self._trcs = copy.deepcopy(self._traces)
253 # Each resource is placed on a deployment group by the EC
255 self.deployment_group = None
257 self._start_time = None
258 self._stop_time = None
259 self._discover_time = None
260 self._provision_time = None
261 self._ready_time = None
262 self._release_time = None
263 self._finish_time = None
264 self._failed_time = None
266 self._state = ResourceState.NEW
270 """ Returns the global unique identifier of the RM """
275 """ Returns the Experiment Controller """
279 def connections(self):
280 """ Returns the set of guids of connected RMs"""
281 return self._connections
284 def conditions(self):
285 """ Returns the conditions to which the RM is subjected to.
287 The object returned by this method is a dictionary indexed by
289 return self._conditions
292 def start_time(self):
293 """ Returns the start time of the RM as a timestamp"""
294 return self._start_time
298 """ Returns the stop time of the RM as a timestamp"""
299 return self._stop_time
302 def discover_time(self):
303 """ Returns the time discovering was finished for the RM as a timestamp"""
304 return self._discover_time
307 def provision_time(self):
308 """ Returns the time provisioning was finished for the RM as a timestamp"""
309 return self._provision_time
312 def ready_time(self):
313 """ Returns the time deployment was finished for the RM as a timestamp"""
314 return self._ready_time
317 def release_time(self):
318 """ Returns the release time of the RM as a timestamp"""
319 return self._release_time
322 def finish_time(self):
323 """ Returns the finalization time of the RM as a timestamp"""
324 return self._finish_time
327 def failed_time(self):
328 """ Returns the time failure occured for the RM as a timestamp"""
329 return self._failed_time
333 """ Get the current state of the RM """
336 def log_message(self, msg):
337 """ Returns the log message formatted with added information.
339 :param msg: text message
343 return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
345 def register_connection(self, guid):
346 """ Registers a connection to the RM identified by guid
348 :param guid: Global unique identified of the RM to connect to
351 if self.valid_connection(guid):
353 self._connections.add(guid)
355 def unregister_connection(self, guid):
356 """ Removes a registered connection to the RM identified by guid
358 :param guid: Global unique identified of the RM to connect to
361 if guid in self._connections:
362 self.disconnect(guid)
363 self._connections.remove(guid)
366 """ Performs resource discovery.
368 This method is responsible for selecting an individual resource
369 matching user requirements.
370 This method should be redefined when necessary in child classes.
372 If overridden in child classes, make sure to use the failtrap
373 decorator to ensure the RM state will be set to FAILED in the event
377 self.set_discovered()
380 """ Performs resource provisioning.
382 This method is responsible for provisioning one resource.
383 After this method has been successfully invoked, the resource
384 should be accessible/controllable by the RM.
385 This method should be redefined when necessary in child classes.
387 If overridden in child classes, make sure to use the failtrap
388 decorator to ensure the RM state will be set to FAILED in the event
392 self.set_provisioned()
397 There is no generic start behavior for all resources.
398 This method should be redefined when necessary in child classes.
400 If overridden in child classes, make sure to use the failtrap
401 decorator to ensure the RM state will be set to FAILED in the event
405 if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
406 self.error("Wrong state %s for start" % self.state)
412 """ Interrupts the RM, stopping any tasks the RM was performing.
414 There is no generic stop behavior for all resources.
415 This method should be redefined when necessary in child classes.
417 If overridden in child classes, make sure to use the failtrap
418 decorator to ensure the RM state will be set to FAILED in the event
422 if not self.state in [ResourceState.STARTED]:
423 self.error("Wrong state %s for stop" % self.state)
429 """ Execute all steps required for the RM to reach the state READY.
431 This method is responsible for deploying the resource (and invoking the
432 discover and provision methods).
433 This method should be redefined when necessary in child classes.
435 If overridden in child classes, make sure to use the failtrap
436 decorator to ensure the RM state will be set to FAILED in the event
440 if self.state > ResourceState.READY:
441 self.error("Wrong state %s for deploy" % self.state)
444 self.debug("----- READY ---- ")
448 """ Perform actions to free resources used by the RM.
450 This method is responsible for releasing resources that were
451 used during the experiment by the RM.
452 This method should be redefined when necessary in child classes.
454 If overridden in child classes, this method should never
455 raise an error and it must ensure the RM is set to state RELEASED.
461 """ Sets the RM to state FINISHED.
463 The FINISHED state is different from STOPPED in that it should not be
464 directly invoked by the user.
465 STOPPED indicates that the user interrupted the RM, FINISHED means
466 that the RM concluded normally the actions it was supposed to perform.
467 This method should be redefined when necessary in child classes.
469 If overridden in child classes, make sure to use the failtrap
470 decorator to ensure the RM state will be set to FAILED in the event
478 """ Sets the RM to state FAILED.
484 def set(self, name, value):
485 """ Set the value of the attribute
487 :param name: Name of the attribute
489 :param name: Value of the attribute
492 attr = self._attrs[name]
496 """ Returns the value of the attribute
498 :param name: Name of the attribute
502 attr = self._attrs[name]
505 def enable_trace(self, name):
506 """ Explicitly enable trace generation
508 :param name: Name of the trace
511 trace = self._trcs[name]
514 def trace_enabled(self, name):
515 """Returns True if trace is enables
517 :param name: Name of the trace
520 trace = self._trcs[name]
523 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
524 """ Get information on collected trace
526 :param name: Name of the trace
529 :param attr: Can be one of:
530 - TraceAttr.ALL (complete trace content),
531 - TraceAttr.STREAM (block in bytes to read starting at offset),
532 - TraceAttr.PATH (full path to the trace file),
533 - TraceAttr.SIZE (size of trace file).
536 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
539 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
546 def register_condition(self, action, group, state, time = None):
547 """ Registers a condition on the resource manager to allow execution
548 of 'action' only after 'time' has elapsed from the moment all resources
549 in 'group' reached state 'state'
551 :param action: Action to restrict to condition (either 'START' or 'STOP')
553 :param group: Group of RMs to wait for (list of guids)
554 :type group: int or list of int
555 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
557 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
562 if not action in self.conditions:
563 self._conditions[action] = list()
565 conditions = self.conditions.get(action)
567 # For each condition to register a tuple of (group, state, time) is
568 # added to the 'action' list
569 if not isinstance(group, list):
572 conditions.append((group, state, time))
574 def unregister_condition(self, group, action = None):
575 """ Removed conditions for a certain group of guids
577 :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
580 :param group: Group of RMs to wait for (list of guids)
581 :type group: int or list of int
584 # For each condition a tuple of (group, state, time) is
585 # added to the 'action' list
586 if not isinstance(group, list):
589 for act, conditions in self.conditions.iteritems():
590 if action and act != action:
593 for condition in list(conditions):
594 (grp, state, time) = condition
596 # If there is an intersection between grp and group,
597 # then remove intersected elements
598 intsec = set(group).intersection(set(grp))
600 idx = conditions.index(condition)
602 newgrp.difference_update(intsec)
603 conditions[idx] = (newgrp, state, time)
605 def get_connected(self, rtype = None):
606 """ Returns the list of RM with the type 'rtype'
608 :param rtype: Type of the RM we look for
610 :return: list of guid
613 rclass = ResourceFactory.get_resource_type(rtype)
614 for guid in self.connections:
615 rm = self.ec.get_resource(guid)
616 if not rtype or isinstance(rm, rclass):
620 def _needs_reschedule(self, group, state, time):
621 """ Internal method that verify if 'time' has elapsed since
622 all elements in 'group' have reached state 'state'.
624 :param group: Group of RMs to wait for (list of guids)
625 :type group: int or list of int
626 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
628 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
631 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
632 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
633 For the moment, 2m30s is not a correct syntax.
637 delay = reschedule_delay
639 # check state and time elapsed on all RMs
641 rm = self.ec.get_resource(guid)
642 # If the RM state is lower than the requested state we must
643 # reschedule (e.g. if RM is READY but we required STARTED).
648 # If there is a time restriction, we must verify the
649 # restriction is satisfied
651 if state == ResourceState.DISCOVERED:
653 if state == ResourceState.PROVISIONED:
654 t = rm.provision_time
655 elif state == ResourceState.READY:
657 elif state == ResourceState.STARTED:
659 elif state == ResourceState.STOPPED:
664 # time already elapsed since RM changed state
665 waited = "%fs" % tdiffsec(tnow(), t)
668 wait = tdiffsec(stabsformat(time), stabsformat(waited))
675 return reschedule, delay
677 def set_with_conditions(self, name, value, group, state, time):
678 """ Set value 'value' on attribute with name 'name' when 'time'
679 has elapsed since all elements in 'group' have reached state
682 :param name: Name of the attribute to set
684 :param name: Value of the attribute to set
686 :param group: Group of RMs to wait for (list of guids)
687 :type group: int or list of int
688 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
690 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
695 delay = reschedule_delay
697 ## evaluate if set conditions are met
699 # only can set with conditions after the RM is started
700 if self.state != ResourceState.STARTED:
703 reschedule, delay = self._needs_reschedule(group, state, time)
706 callback = functools.partial(self.set_with_conditions,
707 name, value, group, state, time)
708 self.ec.schedule(delay, callback)
710 self.set(name, value)
712 def start_with_conditions(self):
713 """ Starts RM when all the conditions in self.conditions for
714 action 'START' are satisfied.
718 delay = reschedule_delay
720 ## evaluate if conditions to start are met
724 # Can only start when RM is either STOPPED or READY
725 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
727 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
729 start_conditions = self.conditions.get(ResourceAction.START, [])
731 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
733 # Verify all start conditions are met
734 for (group, state, time) in start_conditions:
735 # Uncomment for debug
738 # rm = self.ec.get_resource(guid)
739 # unmet.append((guid, rm._state))
741 #self.debug("---- WAITED STATES ---- %s" % unmet )
743 reschedule, delay = self._needs_reschedule(group, state, time)
748 self.ec.schedule(delay, self.start_with_conditions)
750 self.debug("----- STARTING ---- ")
753 def stop_with_conditions(self):
754 """ Stops RM when all the conditions in self.conditions for
755 action 'STOP' are satisfied.
759 delay = reschedule_delay
761 ## evaluate if conditions to stop are met
765 # only can stop when RM is STARTED
766 if self.state != ResourceState.STARTED:
768 self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
770 self.debug(" ---- STOP CONDITIONS ---- %s" %
771 self.conditions.get(ResourceAction.STOP))
773 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
774 for (group, state, time) in stop_conditions:
775 reschedule, delay = self._needs_reschedule(group, state, time)
780 callback = functools.partial(self.stop_with_conditions)
781 self.ec.schedule(delay, callback)
783 self.debug(" ----- STOPPING ---- ")
786 def deploy_with_conditions(self):
787 """ Deploy RM when all the conditions in self.conditions for
788 action 'READY' are satisfied.
792 delay = reschedule_delay
794 ## evaluate if conditions to deploy are met
798 # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
799 if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
800 ResourceState.PROVISIONED]:
802 self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
804 deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
806 self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions)
808 # Verify all start conditions are met
809 for (group, state, time) in deploy_conditions:
810 # Uncomment for debug
813 # rm = self.ec.get_resource(guid)
814 # unmet.append((guid, rm._state))
816 #self.debug("---- WAITED STATES ---- %s" % unmet )
818 reschedule, delay = self._needs_reschedule(group, state, time)
823 self.ec.schedule(delay, self.deploy_with_conditions)
825 self.debug("----- STARTING ---- ")
828 def connect(self, guid):
829 """ Performs actions that need to be taken upon associating RMs.
830 This method should be redefined when necessary in child classes.
834 def disconnect(self, guid):
835 """ Performs actions that need to be taken upon disassociating RMs.
836 This method should be redefined when necessary in child classes.
840 def valid_connection(self, guid):
841 """Checks whether a connection with the other RM
843 This method need to be redefined by each new Resource Manager.
845 :param guid: Guid of the current Resource Manager
853 def set_started(self):
854 """ Mark ResourceManager as STARTED """
855 self.set_state(ResourceState.STARTED, "_start_time")
857 def set_stopped(self):
858 """ Mark ResourceManager as STOPPED """
859 self.set_state(ResourceState.STOPPED, "_stop_time")
862 """ Mark ResourceManager as READY """
863 self.set_state(ResourceState.READY, "_ready_time")
865 def set_released(self):
866 """ Mark ResourceManager as REALEASED """
867 self.set_state(ResourceState.RELEASED, "_release_time")
869 def set_finished(self):
870 """ Mark ResourceManager as FINISHED """
871 self.set_state(ResourceState.FINISHED, "_finish_time")
873 def set_failed(self):
874 """ Mark ResourceManager as FAILED """
875 self.set_state(ResourceState.FAILED, "_failed_time")
877 def set_discovered(self):
878 """ Mark ResourceManager as DISCOVERED """
879 self.set_state(ResourceState.DISCOVERED, "_discover_time")
881 def set_provisioned(self):
882 """ Mark ResourceManager as PROVISIONED """
883 self.set_state(ResourceState.PROVISIONED, "_provision_time")
885 def set_state(self, state, state_time_attr):
886 # Ensure that RM state will not change after released
887 if self._state == ResourceState.RELEASED:
890 setattr(self, state_time_attr, tnow())
893 class ResourceFactory(object):
894 _resource_types = dict()
897 def resource_types(cls):
898 """Return the type of the Class"""
899 return cls._resource_types
902 def get_resource_type(cls, rtype):
903 """Return the type of the Class"""
904 return cls._resource_types.get(rtype)
907 def register_type(cls, rclass):
908 """Register a new Ressource Manager"""
909 cls._resource_types[rclass.rtype()] = rclass
912 def create(cls, rtype, ec, guid):
913 """Create a new instance of a Ressource Manager"""
914 rclass = cls._resource_types[rtype]
915 return rclass(ec, guid)
917 def populate_factory():
918 """Register all the possible RM that exists in the current version of Nepi.
920 # Once the factory is populated, don't repopulate
921 if not ResourceFactory.resource_types():
922 for rclass in find_types():
923 ResourceFactory.register_type(rclass)
926 """Look into the different folders to find all the
927 availables Resources Managers
929 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
930 search_path = set(search_path.split(" "))
933 import nepi.resources
934 path = os.path.dirname(nepi.resources.__file__)
935 search_path.add(path)
939 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
940 prefix = "nepi.resources."):
942 loader = importer.find_module(modname)
945 # Notice: Repeated calls to load_module will act as a reload of teh module
946 if modname in sys.modules:
947 module = sys.modules.get(modname)
949 module = loader.load_module(modname)
951 for attrname in dir(module):
952 if attrname.startswith("_"):
955 attr = getattr(module, attrname)
957 if attr == ResourceManager:
960 if not inspect.isclass(attr):
963 if issubclass(attr, ResourceManager):
966 if not modname in sys.modules:
967 sys.modules[modname] = module
972 err = traceback.format_exc()
973 logger = logging.getLogger("Resource.find_types()")
974 logger.error("Error while loading Resource Managers %s" % err)