2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
34 reschedule_delay = "1s"
37 """ Action that a user can order to a Resource Manager
45 """ State of a Resource Manager
# Maps every ResourceState value to its name as a string.
ResourceState2str = {
    getattr(ResourceState, label): label
    for label in (
        "NEW",
        "DISCOVERED",
        "PROVISIONED",
        "READY",
        "STARTED",
        "STOPPED",
        "FAILED",
        "RELEASED",
    )
}
69 """ Initializes template information (i.e. attributes and traces)
70 on classes derived from the ResourceManager class.
72 It is used as a decorator in the class declaration as follows:
75 class MyResourceManager(ResourceManager):
84 def clsinit_copy(cls):
85 """ Initializes template information (i.e. attributes and traces)
86 on classes direved from the ResourceManager class.
87 It differs from the clsinit method in that it forces inheritance
88 of attributes and traces from the parent class.
90 It is used as a decorator in the class declaration as follows:
93 class MyResourceManager(ResourceManager):
98 clsinit_copy should be prefered to clsinit when creating new
99 ResourceManager child classes.
107 """ Decorator function for instance methods that should set the
108 RM state to FAILED when an error is raised. The methods that must be
109 decorated are: discover, provision, deploy, start, stop.
112 def wrapped(self, *args, **kwargs):
114 return func(self, *args, **kwargs)
117 err = traceback.format_exc()
119 self.debug("SETTING guid %d to state FAILED" % self.guid)
126 class ResourceManager(Logger):
127 """ Base clase for all ResourceManagers.
129 A ResourceManger is specific to a resource type (e.g. Node,
130 Switch, Application, etc) on a specific backend (e.g. PlanetLab,
133 The ResourceManager instances are responsible for interacting with
134 and controlling concrete (physical or virtual) resources in the
135 experimental backends.
def _register_attribute(cls, attr):
    """ Resource subclasses will invoke this method to add a
    resource attribute to the class-level attribute template.

    :param attr: Attribute to add; it is indexed by its 'name'
    """
    cls._attributes.update({attr.name: attr})
def _remove_attribute(cls, name):
    """ Resource subclasses will invoke this method to remove a
    resource attribute from the class-level attribute template.

    :param name: Name of the attribute to remove
    :raises KeyError: if no attribute called 'name' is registered
    """
    cls._attributes.pop(name)
def _register_trace(cls, trace):
    """ Resource subclasses will invoke this method to add a
    resource trace to the class-level trace template.

    :param trace: Trace to add; it is indexed by its 'name'
    """
    cls._traces.update({trace.name: trace})
def _remove_trace(cls, name):
    """ Resource subclasses will invoke this method to remove a
    resource trace from the class-level trace template.

    :param name: Name of the trace to remove
    :raises KeyError: if no trace called 'name' is registered
    """
    cls._traces.pop(name)
181 def _register_attributes(cls):
182 """ Resource subclasses will invoke this method to register
185 This method should be overriden in the RMs that define
189 critical = Attribute("critical",
190 "Defines whether the resource is critical. "
191 "A failure on a critical resource will interrupt "
195 flags = Flags.Design)
197 cls._register_attribute(critical)
200 def _register_traces(cls):
201 """ Resource subclasses will invoke this method to register
204 This method should be overriden in the RMs that define traces.
212 """ ResourceManager classes have different attributes and traces.
213 Attribute and traces are stored in 'class attribute' dictionaries.
214 When a new ResourceManager class is created, the _clsinit method is
215 called to create a new instance of those dictionaries and initialize
218 The _clsinit method is called by the clsinit decorator method.
222 # static template for resource attributes
223 cls._attributes = dict()
224 cls._register_attributes()
226 # static template for resource traces
228 cls._register_traces()
def _clsinit_copy(cls):
    """ Same as _clsinit, except that the new dictionaries start as deep
    copies of the attributes and traces visible on the class (i.e. the
    ones inherited from the parent class).

    The _clsinit_copy method is called by the clsinit_copy decorator method.
    """
    templates = (
        # static template for resource attributes
        ("_attributes", cls._register_attributes),
        # static template for resource traces
        ("_traces", cls._register_traces),
    )

    for template_name, register in templates:
        # Rebinding on cls gives the class its own dictionary, so the
        # registrations that follow do not touch the parent's template.
        setattr(cls, template_name, copy.deepcopy(getattr(cls, template_name)))
        register()
249 """ Returns the type of the Resource Manager
def get_attributes(cls):
    """ Returns a copy of the attributes

    :return: deep copies of every attribute in the class-level template,
        so callers can not alter the class definition through them
    :rtype: list
    """
    # Materialise the values before copying: on Python 3 dict.values()
    # is a view object, which copy.deepcopy can not copy (TypeError).
    # On Python 2 values() is already a list, so the result is unchanged.
    return copy.deepcopy(list(cls._attributes.values()))
def get_attribute(cls, name):
    """ Returns a copy of the attribute with name 'name'

    :param name: Name of the attribute
    :return: deep copy of the attribute held in the class-level template
    :raises KeyError: if no attribute called 'name' is registered
    """
    template = cls._attributes[name]
    return copy.deepcopy(template)
271 """ Returns a copy of the traces
274 return copy.deepcopy(cls._traces.values())
278 """ Returns the description of the type of Resource
284 def get_backend(cls):
285 """ Returns the identified of the backend (i.e. testbed, environment)
def get_global(cls, name):
    """ Returns the value of a global attribute

    A global attribute is an attribute shared by all the resources
    of a rtype; its value is read from the class-level template.

    :param name: Name of the attribute
    :return: current value of the attribute
    :raises KeyError: if no attribute called 'name' is registered
    """
    return cls._attributes[name].value
def set_global(cls, name, value):
    """ Set value for a global attribute

    The value is written on the class-level attribute template.

    :param name: Name of the attribute
    :param value: Value of the attribute
    :raises KeyError: if no attribute called 'name' is registered
    """
    setattr(cls._attributes[name], "value", value)
317 def __init__(self, ec, guid):
318 super(ResourceManager, self).__init__(self.get_rtype())
321 self._ec = weakref.ref(ec)
322 self._connections = set()
323 self._conditions = dict()
325 # the resource instance gets a copy of all attributes
326 self._attrs = copy.deepcopy(self._attributes)
328 # the resource instance gets a copy of all traces
329 self._trcs = copy.deepcopy(self._traces)
331 # Each resource is placed on a deployment group by the EC
333 self.deployment_group = None
335 self._start_time = None
336 self._stop_time = None
337 self._discover_time = None
338 self._provision_time = None
339 self._ready_time = None
340 self._release_time = None
341 self._failed_time = None
343 self._state = ResourceState.NEW
345 # instance lock to synchronize exclusive state change methods (such
346 # as deploy and release methods), in order to prevent them from being
347 # executed at the same time
348 self._release_lock = threading.Lock()
352 """ Returns the global unique identifier of the RM """
357 """ Returns the Experiment Controller of the RM """
def connections(self):
    """ Returns the set of guids of the RMs connected to this RM.

    The internal set is returned, not a copy.
    """
    return self._connections
def conditions(self):
    """ Returns the conditions the RM is subjected to.

    The result is a dictionary indexed by action; each value is the
    list of (group, state, time) tuples registered for that action.
    The internal dictionary is returned, not a copy.
    """
    return self._conditions
def start_time(self):
    """ Returns the timestamp recorded when the RM was marked STARTED
    (None while no start time has been recorded) """
    return self._start_time
382 """ Returns the stop time of the RM as a timestamp """
383 return self._stop_time
def discover_time(self):
    """ Returns the timestamp recorded when the RM was marked DISCOVERED
    (None while no discover time has been recorded) """
    return self._discover_time
def provision_time(self):
    """ Returns the timestamp recorded when the RM was marked PROVISIONED
    (None while no provision time has been recorded) """
    return self._provision_time
def ready_time(self):
    """ Returns the timestamp recorded when the RM was marked READY,
    i.e. its deployment time (None while none has been recorded) """
    return self._ready_time
def release_time(self):
    """ Returns the timestamp recorded when the RM was marked RELEASED
    (None while no release time has been recorded) """
    return self._release_time
def failed_time(self):
    """ Returns the timestamp recorded when the RM was marked FAILED
    (None while no failure time has been recorded) """
    return self._failed_time
412 """ Get the current state of the RM """
def log_message(self, msg):
    """ Returns the log message prefixed with the resource type and the
    guid of the RM.

    :param msg: text message
    :return: the formatted message
    :rtype: str
    """
    fields = {"rtype": self._rtype, "guid": self.guid, "msg": msg}
    return " %(rtype)s guid: %(guid)d - %(msg)s " % fields
def register_connection(self, guid):
    """ Registers a connection to the RM identified by guid

    This method should not be overridden. Specific functionality
    should be added in the do_connect method.

    The connection is silently ignored when valid_connection(guid)
    is false.

    :param guid: Global unique identifier of the RM to connect to
    """
    if not self.valid_connection(guid):
        return

    # Run the RM specific hook first; the guid is only recorded if
    # the hook did not raise.
    self.do_connect(guid)
    self._connections.add(guid)
def unregister_connection(self, guid):
    """ Removes a registered connection to the RM identified by guid

    This method should not be overridden. Specific functionality
    should be added in the do_disconnect method.

    Nothing happens when guid is not a registered connection.

    :param guid: Global unique identifier of the RM to disconnect from
    """
    if guid not in self._connections:
        return

    # Run the RM specific hook first; the guid is only forgotten if
    # the hook did not raise.
    self.do_disconnect(guid)
    self._connections.remove(guid)
455 """ Performs resource discovery.
457 This method is responsible for selecting an individual resource
458 matching user requirements.
460 This method should not be overriden directly. Specific functionality
461 should be added in the do_discover method.
464 with self._release_lock:
465 if self._state != ResourceState.RELEASED:
470 """ Performs resource provisioning.
472 This method is responsible for provisioning one resource.
473 After this method has been successfully invoked, the resource
474 should be accessible/controllable by the RM.
476 This method should not be overriden directly. Specific functionality
477 should be added in the do_provision method.
480 with self._release_lock:
481 if self._state != ResourceState.RELEASED:
486 """ Starts the RM (e.g. launch remote process).
488 There is no standard start behavior. Some RMs will not need to perform
489 any actions upon start.
491 This method should not be overriden directly. Specific functionality
492 should be added in the do_start method.
496 if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
497 self.error("Wrong state %s for start" % self.state)
500 with self._release_lock:
501 if self._state != ResourceState.RELEASED:
506 """ Interrupts the RM, stopping any tasks the RM was performing.
508 There is no standard stop behavior. Some RMs will not need to perform
509 any actions upon stop.
511 This method should not be overriden directly. Specific functionality
512 should be added in the do_stop method.
515 if not self.state in [ResourceState.STARTED]:
516 self.error("Wrong state %s for stop" % self.state)
519 with self._release_lock:
524 """ Execute all steps required for the RM to reach the state READY.
526 This method is responsible for deploying the resource (and invoking
527 the discover and provision methods).
529 This method should not be overriden directly. Specific functionality
530 should be added in the do_deploy method.
533 if self.state > ResourceState.READY:
534 self.error("Wrong state %s for deploy" % self.state)
537 with self._release_lock:
538 if self._state != ResourceState.RELEASED:
542 """ Perform actions to free resources used by the RM.
544 This method is responsible for releasing resources that were
545 used during the experiment by the RM.
547 This method should not be overriden directly. Specific functionality
548 should be added in the do_release method.
551 with self._release_lock:
556 err = traceback.format_exc()
562 """ Sets the RM to state FAILED.
564 This method should not be overriden directly. Specific functionality
565 should be added in the do_fail method.
568 with self._release_lock:
569 if self._state != ResourceState.RELEASED:
572 def set(self, name, value):
573 """ Set the value of the attribute
575 :param name: Name of the attribute
577 :param name: Value of the attribute
580 attr = self._attrs[name]
585 """ Returns the value of the attribute
587 :param name: Name of the attribute
591 attr = self._attrs[name]
592 if attr.has_flag(Flags.Global):
593 self.warning( "Attribute %s is global. Use get_global instead." % name)
def has_changed(self, name):
    """ Returns True if the value of the attribute
    has been modified by the user.

    :param name: Name of the attribute
    :return: result of the attribute's own has_changed() check
    :raises KeyError: if the RM has no attribute called 'name'
    """
    return self._attrs[name].has_changed()
def has_flag(self, name, flag):
    """ Returns true if the attribute has the flag 'flag'

    :param name: Name of the attribute
    :param flag: Flag to be checked
    :return: result of the attribute's own has_flag(flag) check
    :raises KeyError: if the RM has no attribute called 'name'
    """
    return self._attrs[name].has_flag(flag)
def has_attribute(self, name):
    """ Returns true if the RM has an attribute with name 'name'

    :param name: name of the attribute
    :rtype: bool
    """
    known = name in self._attrs
    return known
625 def enable_trace(self, name):
626 """ Explicitly enable trace generation
628 :param name: Name of the trace
631 trace = self._trcs[name]
634 def trace_enabled(self, name):
635 """Returns True if trace is enables
637 :param name: Name of the trace
640 trace = self._trcs[name]
643 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
644 """ Get information on collected trace
646 :param name: Name of the trace
649 :param attr: Can be one of:
650 - TraceAttr.ALL (complete trace content),
651 - TraceAttr.STREAM (block in bytes to read starting at offset),
652 - TraceAttr.PATH (full path to the trace file),
653 - TraceAttr.SIZE (size of trace file).
656 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
659 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
666 def register_condition(self, action, group, state, time = None):
667 """ Registers a condition on the resource manager to allow execution
668 of 'action' only after 'time' has elapsed from the moment all resources
669 in 'group' reached state 'state'
671 :param action: Action to restrict to condition (either 'START' or 'STOP')
673 :param group: Group of RMs to wait for (list of guids)
674 :type group: int or list of int
675 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
677 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
682 if not action in self.conditions:
683 self._conditions[action] = list()
685 conditions = self.conditions.get(action)
687 # For each condition to register a tuple of (group, state, time) is
688 # added to the 'action' list
689 if not isinstance(group, list):
692 conditions.append((group, state, time))
694 def unregister_condition(self, group, action = None):
695 """ Removed conditions for a certain group of guids
697 :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
700 :param group: Group of RMs to wait for (list of guids)
701 :type group: int or list of int
704 # For each condition a tuple of (group, state, time) is
705 # added to the 'action' list
706 if not isinstance(group, list):
709 for act, conditions in self.conditions.iteritems():
710 if action and act != action:
713 for condition in list(conditions):
714 (grp, state, time) = condition
716 # If there is an intersection between grp and group,
717 # then remove intersected elements
718 intsec = set(group).intersection(set(grp))
720 idx = conditions.index(condition)
722 newgrp.difference_update(intsec)
723 conditions[idx] = (newgrp, state, time)
725 def get_connected(self, rtype = None):
726 """ Returns the list of RM with the type 'rtype'
728 :param rtype: Type of the RM we look for
730 :return: list of guid
733 rclass = ResourceFactory.get_resource_type(rtype)
734 for guid in self.connections:
736 rm = self.ec.get_resource(guid)
737 if not rtype or isinstance(rm, rclass):
742 def _needs_reschedule(self, group, state, time):
743 """ Internal method that verify if 'time' has elapsed since
744 all elements in 'group' have reached state 'state'.
746 :param group: Group of RMs to wait for (list of guids)
747 :type group: int or list of int
748 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
750 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
753 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
754 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
755 For the moment, 2m30s is not a correct syntax.
759 delay = reschedule_delay
761 # check state and time elapsed on all RMs
763 rm = self.ec.get_resource(guid)
765 # If one of the RMs this resource needs to wait for has FAILED
766 # and is critical we raise an exception
767 if rm.state == ResourceState.FAILED:
768 if not rm.get('critical'):
770 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
771 raise RuntimeError, msg
773 # If the RM state is lower than the requested state we must
774 # reschedule (e.g. if RM is READY but we required STARTED).
779 # If there is a time restriction, we must verify the
780 # restriction is satisfied
782 if state == ResourceState.DISCOVERED:
784 if state == ResourceState.PROVISIONED:
785 t = rm.provision_time
786 elif state == ResourceState.READY:
788 elif state == ResourceState.STARTED:
790 elif state == ResourceState.STOPPED:
792 elif state == ResourceState.RELEASED:
797 # time already elapsed since RM changed state
798 waited = "%fs" % tdiffsec(tnow(), t)
801 wait = tdiffsec(stabsformat(time), stabsformat(waited))
808 return reschedule, delay
810 def set_with_conditions(self, name, value, group, state, time):
811 """ Set value 'value' on attribute with name 'name' when 'time'
812 has elapsed since all elements in 'group' have reached state
815 :param name: Name of the attribute to set
817 :param name: Value of the attribute to set
819 :param group: Group of RMs to wait for (list of guids)
820 :type group: int or list of int
821 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
823 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
828 delay = reschedule_delay
830 ## evaluate if set conditions are met
832 # only can set with conditions after the RM is started
833 if self.state != ResourceState.STARTED:
836 reschedule, delay = self._needs_reschedule(group, state, time)
839 callback = functools.partial(self.set_with_conditions,
840 name, value, group, state, time)
841 self.ec.schedule(delay, callback)
843 self.set(name, value)
845 def start_with_conditions(self):
846 """ Starts RM when all the conditions in self.conditions for
847 action 'START' are satisfied.
850 #import pdb;pdb.set_trace()
853 delay = reschedule_delay
856 ## evaluate if conditions to start are met
860 # Can only start when RM is either STOPPED or READY
861 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
863 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
865 start_conditions = self.conditions.get(ResourceAction.START, [])
867 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
869 # Verify all start conditions are met
870 for (group, state, time) in start_conditions:
871 # Uncomment for debug
874 rm = self.ec.get_resource(guid)
875 unmet.append((guid, rm._state))
877 self.debug("---- WAITED STATES ---- %s" % unmet )
879 reschedule, delay = self._needs_reschedule(group, state, time)
884 self.ec.schedule(delay, self.start_with_conditions)
886 self.debug("----- STARTING ---- ")
889 def stop_with_conditions(self):
890 """ Stops RM when all the conditions in self.conditions for
891 action 'STOP' are satisfied.
895 delay = reschedule_delay
897 ## evaluate if conditions to stop are met
901 # only can stop when RM is STARTED
902 if self.state != ResourceState.STARTED:
904 self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
906 self.debug(" ---- STOP CONDITIONS ---- %s" %
907 self.conditions.get(ResourceAction.STOP))
909 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
910 for (group, state, time) in stop_conditions:
911 reschedule, delay = self._needs_reschedule(group, state, time)
916 callback = functools.partial(self.stop_with_conditions)
917 self.ec.schedule(delay, callback)
919 self.debug(" ----- STOPPING ---- ")
922 def deploy_with_conditions(self):
923 """ Deploy RM when all the conditions in self.conditions for
924 action 'READY' are satisfied.
928 delay = reschedule_delay
930 ## evaluate if conditions to deploy are met
934 # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
935 if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
936 ResourceState.PROVISIONED]:
938 self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
940 deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
942 self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions)
944 # Verify all start conditions are met
945 for (group, state, time) in deploy_conditions:
946 # Uncomment for debug
949 # rm = self.ec.get_resource(guid)
950 # unmet.append((guid, rm._state))
952 #self.debug("---- WAITED STATES ---- %s" % unmet )
954 reschedule, delay = self._needs_reschedule(group, state, time)
959 self.ec.schedule(delay, self.deploy_with_conditions)
961 self.debug("----- DEPLOYING ---- ")
964 def do_connect(self, guid):
965 """ Performs actions that need to be taken upon associating RMs.
966 This method should be redefined when necessary in child classes.
970 def do_disconnect(self, guid):
971 """ Performs actions that need to be taken upon disassociating RMs.
972 This method should be redefined when necessary in child classes.
976 def valid_connection(self, guid):
977 """Checks whether a connection with the other RM
979 This method need to be redefined by each new Resource Manager.
981 :param guid: Guid of the current Resource Manager
def do_discover(self):
    """ Default discovery step: only marks the RM as DISCOVERED. """
    self.set_discovered()
def do_provision(self):
    """ Default provisioning step: only marks the RM as PROVISIONED. """
    self.set_provisioned()
1001 def do_deploy(self):
1004 def do_release(self):
def set_started(self):
    """ Mark ResourceManager as STARTED, recording the time of the
    change in _start_time """
    target = ResourceState.STARTED
    self.set_state(target, "_start_time")
    self.debug("----- STARTED ---- ")
def set_stopped(self):
    """ Mark ResourceManager as STOPPED, recording the time of the
    change in _stop_time """
    target = ResourceState.STOPPED
    self.set_state(target, "_stop_time")
    self.debug("----- STOPPED ---- ")
def set_ready(self):
    """ Mark ResourceManager as READY, recording the time of the
    change in _ready_time """
    target = ResourceState.READY
    self.set_state(target, "_ready_time")
    self.debug("----- READY ---- ")
def set_released(self):
    """ Mark ResourceManager as RELEASED, recording the time of the
    change in _release_time """
    target = ResourceState.RELEASED
    self.set_state(target, "_release_time")
    self.debug("----- RELEASED ---- ")
def set_failed(self):
    """ Mark ResourceManager as FAILED, recording the time of the
    change in _failed_time """
    target = ResourceState.FAILED
    self.set_state(target, "_failed_time")
    self.debug("----- FAILED ---- ")
def set_discovered(self):
    """ Mark ResourceManager as DISCOVERED, recording the time of the
    change in _discover_time """
    target = ResourceState.DISCOVERED
    self.set_state(target, "_discover_time")
    self.debug("----- DISCOVERED ---- ")
def set_provisioned(self):
    """ Mark ResourceManager as PROVISIONED, recording the time of the
    change in _provision_time """
    target = ResourceState.PROVISIONED
    self.set_state(target, "_provision_time")
    self.debug("----- PROVISIONED ---- ")
1045 def set_state(self, state, state_time_attr):
1046 """ Set the state of the RM while keeping a trace of the time """
1048 # Ensure that RM state will not change after released
1049 if self._state == ResourceState.RELEASED:
1052 setattr(self, state_time_attr, tnow())
class ResourceFactory(object):
    """ Registry of Resource Manager classes, indexed by resource type.

    The registry is a class-level dictionary, so it is shared by every
    user of the factory.
    """
    # rtype (as returned by rclass.get_rtype()) -> Resource Manager class
    _resource_types = {}

    @classmethod
    def resource_types(cls):
        """Return the dictionary of registered Resource Manager classes"""
        return cls._resource_types

    @classmethod
    def get_resource_type(cls, rtype):
        """Return the class registered for 'rtype', or None if there is none"""
        return cls._resource_types.get(rtype)

    @classmethod
    def register_type(cls, rclass):
        """Register a Resource Manager class under its get_rtype() value"""
        key = rclass.get_rtype()
        cls._resource_types[key] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """Create a new instance of the Resource Manager registered for 'rtype'

        :raises KeyError: if 'rtype' has not been registered
        """
        return cls._resource_types[rtype](ec, guid)
def populate_factory():
    """Register in the ResourceFactory every RM class returned by find_types().

    Does nothing when the factory already holds at least one type.
    """
    # Once the factory is populated, don't repopulate
    if ResourceFactory.resource_types():
        return

    for rclass in find_types():
        ResourceFactory.register_type(rclass)
1088 """Look into the different folders to find all the
1089 availables Resources Managers
1091 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1092 search_path = set(search_path.split(" "))
1095 import nepi.resources
1096 path = os.path.dirname(nepi.resources.__file__)
1097 search_path.add(path)
1101 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
1102 prefix = "nepi.resources."):
1104 loader = importer.find_module(modname)
1107 # Notice: Repeated calls to load_module will act as a reload of the module
1108 if modname in sys.modules:
1109 module = sys.modules.get(modname)
1111 module = loader.load_module(modname)
1113 for attrname in dir(module):
1114 if attrname.startswith("_"):
1117 attr = getattr(module, attrname)
1119 if attr == ResourceManager:
1122 if not inspect.isclass(attr):
1125 if issubclass(attr, ResourceManager):
1128 if not modname in sys.modules:
1129 sys.modules[modname] = module
1134 err = traceback.format_exc()
1135 logger = logging.getLogger("Resource.find_types()")
1136 logger.error("Error while loading Resource Managers %s" % err)