2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
34 reschedule_delay = "1s"
37 """ Action that a user can order to a Resource Manager
45 """ State of a Resource Manager
57 ResourceState2str = dict({
58 ResourceState.NEW : "NEW",
59 ResourceState.DISCOVERED : "DISCOVERED",
60 ResourceState.PROVISIONED : "PROVISIONED",
61 ResourceState.READY : "READY",
62 ResourceState.STARTED : "STARTED",
63 ResourceState.STOPPED : "STOPPED",
64 ResourceState.FAILED : "FAILED",
65 ResourceState.RELEASED : "RELEASED",
69 """ Initializes template information (i.e. attributes and traces)
70 on classes derived from the ResourceManager class.
72 It is used as a decorator in the class declaration as follows:
75 class MyResourceManager(ResourceManager):
84 def clsinit_copy(cls):
85 """ Initializes template information (i.e. attributes and traces)
86 on classes derived from the ResourceManager class.
87 It differs from the clsinit method in that it forces inheritance
88 of attributes and traces from the parent class.
90 It is used as a decorator in the class declaration as follows:
93 class MyResourceManager(ResourceManager):
98 clsinit_copy should be preferred to clsinit when creating new
99 ResourceManager child classes.
107 """ Decorator function for instance methods that should set the
108 RM state to FAILED when an error is raised. The methods that must be
109 decorated are: discover, provision, deploy, start, stop.
112 def wrapped(self, *args, **kwargs):
114 return func(self, *args, **kwargs)
117 err = traceback.format_exc()
119 self.debug("SETTING guid %d to state FAILED" % self.guid)
126 class ResourceManager(Logger):
127 """ Base class for all ResourceManagers.
129 A ResourceManager is specific to a resource type (e.g. Node,
130 Switch, Application, etc) on a specific backend (e.g. PlanetLab,
133 The ResourceManager instances are responsible for interacting with
134 and controlling concrete (physical or virtual) resources in the
135 experimental backends.
145 def _register_attribute(cls, attr):
146 """ Resource subclasses will invoke this method to add a
151 cls._attributes[attr.name] = attr
154 def _remove_attribute(cls, name):
155 """ Resource subclasses will invoke this method to remove a
160 del cls._attributes[name]
163 def _register_trace(cls, trace):
164 """ Resource subclasses will invoke this method to add a
169 cls._traces[trace.name] = trace
172 def _remove_trace(cls, name):
173 """ Resource subclasses will invoke this method to remove a
178 del cls._traces[name]
181 def _register_attributes(cls):
182 """ Resource subclasses will invoke this method to register
185 This method should be overridden in the RMs that define
190 critical = Attribute("critical",
191 "Defines whether the resource is critical. "
192 "A failure on a critical resource will interrupt "
196 flags = Flags.Design)
198 cls._register_attribute(critical)
201 def _register_traces(cls):
202 """ Resource subclasses will invoke this method to register
205 This method should be overridden in the RMs that define traces.
213 """ ResourceManager classes have different attributes and traces.
214 Attribute and traces are stored in 'class attribute' dictionaries.
215 When a new ResourceManager class is created, the _clsinit method is
216 called to create a new instance of those dictionaries and initialize
219 The _clsinit method is called by the clsinit decorator method.
223 # static template for resource attributes
224 cls._attributes = dict()
225 cls._register_attributes()
227 # static template for resource traces
229 cls._register_traces()
232 def _clsinit_copy(cls):
233 """ Same as _clsinit, except that after creating new instances of the
234 dictionaries it copies all the attributes and traces from the parent
237 The _clsinit_copy method is called by the clsinit_copy decorator method.
240 # static template for resource attributes
241 cls._attributes = copy.deepcopy(cls._attributes)
242 cls._register_attributes()
244 # static template for resource traces
245 cls._traces = copy.deepcopy(cls._traces)
246 cls._register_traces()
250 """ Returns the type of the Resource Manager
def get_attributes(cls):
    """ Returns a copy of the attributes

    :return: list with a deep copy of every attribute registered in
        the class level template, so callers can not alter the template
    :rtype: list

    """
    # Materialize the values before copying: on Python 3 values() is a
    # dictionary view, which copy.deepcopy is not able to duplicate.
    # On Python 2 values() is already a list, so the result is the same.
    return copy.deepcopy(list(cls._attributes.values()))
def get_attribute(cls, name):
    """ Returns a copy of the attribute with name 'name'

    The attribute is looked up in the class level template and deep
    copied, so the caller can not alter the template.

    :param name: Name of the attribute
    :raises KeyError: if no attribute is registered under 'name'

    """
    template = cls._attributes[name]
    return copy.deepcopy(template)
272 """ Returns a copy of the traces
275 return copy.deepcopy(cls._traces.values())
279 """ Returns the description of the type of Resource
285 def get_backend(cls):
286 """ Returns the identifier of the backend (i.e. testbed, environment)
292 def __init__(self, ec, guid):
293 super(ResourceManager, self).__init__(self.get_rtype())
296 self._ec = weakref.ref(ec)
297 self._connections = set()
298 self._conditions = dict()
300 # the resource instance gets a copy of all attributes
301 self._attrs = copy.deepcopy(self._attributes)
303 # the resource instance gets a copy of all traces
304 self._trcs = copy.deepcopy(self._traces)
306 # Each resource is placed on a deployment group by the EC
308 self.deployment_group = None
310 self._start_time = None
311 self._stop_time = None
312 self._discover_time = None
313 self._provision_time = None
314 self._ready_time = None
315 self._release_time = None
316 self._failed_time = None
318 self._state = ResourceState.NEW
320 # instance lock to synchronize exclusive state change methods (such
321 # as deploy and release methods), in order to prevent them from being
322 # executed at the same time
323 self._release_lock = threading.Lock()
327 """ Returns the global unique identifier of the RM """
332 """ Returns the Experiment Controller of the RM """
def connections(self):
    """ Returns the set with the guids of the RMs connected to this RM.

    The internal set is returned, not a copy.

    """
    return self._connections
def conditions(self):
    """ Returns the conditions the RM is subject to.

    The result is the internal dictionary of condition lists indexed
    by action. The dictionary itself is returned, not a copy.

    """
    return self._conditions
def start_time(self):
    """ Returns the timestamp of the moment the RM was started
    (None when no start time was recorded)

    """
    return self._start_time
357 """ Returns the stop time of the RM as a timestamp """
358 return self._stop_time
def discover_time(self):
    """ Returns the timestamp of the moment the RM was discovered
    (None when no discover time was recorded)

    """
    return self._discover_time
def provision_time(self):
    """ Returns the timestamp of the moment the RM was provisioned
    (None when no provision time was recorded)

    """
    return self._provision_time
def ready_time(self):
    """ Returns the timestamp of the moment the RM was deployed,
    i.e. reached state READY (None when no ready time was recorded)

    """
    return self._ready_time
def release_time(self):
    """ Returns the timestamp of the moment the RM was released
    (None when no release time was recorded)

    """
    return self._release_time
def failed_time(self):
    """ Returns the timestamp of the moment the failure occurred
    for the RM (None when no failure time was recorded)

    """
    return self._failed_time
387 """ Get the current state of the RM """
def log_message(self, msg):
    """ Returns the log message prefixed with the resource type and
    the guid of the RM.

    :param msg: text message
    :type msg: str
    :rtype: str

    """
    fields = (self._rtype, self.guid, msg)
    return " %s guid: %d - %s " % fields
def register_connection(self, guid):
    """ Registers a connection to the RM identified by guid

    The connection is only registered when valid_connection accepts
    the guid. In that case do_connect is invoked first and then the
    guid is added to the set of connections.

    This method should not be overridden. Specific functionality
    should be added in the do_connect method.

    :param guid: Global unique identifier of the RM to connect to
    :type guid: int

    """
    if not self.valid_connection(guid):
        return

    self.do_connect(guid)
    self._connections.add(guid)
def unregister_connection(self, guid):
    """ Removes a registered connection to the RM identified by guid

    Nothing is done when the guid is not a registered connection.
    Otherwise do_disconnect is invoked first and then the guid is
    removed from the set of connections.

    This method should not be overridden. Specific functionality
    should be added in the do_disconnect method.

    :param guid: Global unique identifier of the RM to disconnect from
    :type guid: int

    """
    if guid not in self._connections:
        return

    self.do_disconnect(guid)
    self._connections.remove(guid)
430 """ Performs resource discovery.
432 This method is responsible for selecting an individual resource
433 matching user requirements.
435 This method should not be overridden directly. Specific functionality
436 should be added in the do_discover method.
439 with self._release_lock:
440 if self._state != ResourceState.RELEASED:
445 """ Performs resource provisioning.
447 This method is responsible for provisioning one resource.
448 After this method has been successfully invoked, the resource
449 should be accessible/controllable by the RM.
451 This method should not be overridden directly. Specific functionality
452 should be added in the do_provision method.
455 with self._release_lock:
456 if self._state != ResourceState.RELEASED:
461 """ Starts the RM (e.g. launch remote process).
463 There is no standard start behavior. Some RMs will not need to perform
464 any actions upon start.
466 This method should not be overridden directly. Specific functionality
467 should be added in the do_start method.
471 if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
472 self.error("Wrong state %s for start" % self.state)
475 with self._release_lock:
476 if self._state != ResourceState.RELEASED:
481 """ Interrupts the RM, stopping any tasks the RM was performing.
483 There is no standard stop behavior. Some RMs will not need to perform
484 any actions upon stop.
486 This method should not be overridden directly. Specific functionality
487 should be added in the do_stop method.
490 if not self.state in [ResourceState.STARTED]:
491 self.error("Wrong state %s for stop" % self.state)
494 with self._release_lock:
499 """ Execute all steps required for the RM to reach the state READY.
501 This method is responsible for deploying the resource (and invoking
502 the discover and provision methods).
504 This method should not be overridden directly. Specific functionality
505 should be added in the do_deploy method.
508 if self.state > ResourceState.READY:
509 self.error("Wrong state %s for deploy" % self.state)
512 with self._release_lock:
513 if self._state != ResourceState.RELEASED:
517 """ Perform actions to free resources used by the RM.
519 This method is responsible for releasing resources that were
520 used during the experiment by the RM.
522 This method should not be overridden directly. Specific functionality
523 should be added in the do_release method.
526 with self._release_lock:
531 err = traceback.format_exc()
537 """ Sets the RM to state FAILED.
539 This method should not be overridden directly. Specific functionality
540 should be added in the do_fail method.
543 with self._release_lock:
544 if self._state != ResourceState.RELEASED:
547 def set(self, name, value):
548 """ Set the value of the attribute
550 :param name: Name of the attribute
552 :param value: Value of the attribute
555 attr = self._attrs[name]
559 """ Returns the value of the attribute
561 :param name: Name of the attribute
565 attr = self._attrs[name]
def has_changed(self, name):
    """ Returns True if the value of the attribute has been
    modified by the user.

    The answer is delegated to the has_changed method of the
    attribute instance held by this RM.

    :param name: Name of the attribute
    :type name: str
    :rtype: bool
    :raises KeyError: if the RM has no attribute called 'name'

    """
    return self._attrs[name].has_changed()
579 def enable_trace(self, name):
580 """ Explicitly enable trace generation
582 :param name: Name of the trace
585 trace = self._trcs[name]
588 def trace_enabled(self, name):
589 """Returns True if trace is enabled
591 :param name: Name of the trace
594 trace = self._trcs[name]
597 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
598 """ Get information on collected trace
600 :param name: Name of the trace
603 :param attr: Can be one of:
604 - TraceAttr.ALL (complete trace content),
605 - TraceAttr.STREAM (block in bytes to read starting at offset),
606 - TraceAttr.PATH (full path to the trace file),
607 - TraceAttr.SIZE (size of trace file).
610 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
613 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
620 def register_condition(self, action, group, state, time = None):
621 """ Registers a condition on the resource manager to allow execution
622 of 'action' only after 'time' has elapsed from the moment all resources
623 in 'group' reached state 'state'
625 :param action: Action to restrict to condition (either 'START' or 'STOP')
627 :param group: Group of RMs to wait for (list of guids)
628 :type group: int or list of int
629 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
631 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
636 if not action in self.conditions:
637 self._conditions[action] = list()
639 conditions = self.conditions.get(action)
641 # For each condition to register a tuple of (group, state, time) is
642 # added to the 'action' list
643 if not isinstance(group, list):
646 conditions.append((group, state, time))
648 def unregister_condition(self, group, action = None):
649 """ Removes conditions for a certain group of guids
651 :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
654 :param group: Group of RMs to wait for (list of guids)
655 :type group: int or list of int
658 # For each condition a tuple of (group, state, time) is
659 # added to the 'action' list
660 if not isinstance(group, list):
663 for act, conditions in self.conditions.iteritems():
664 if action and act != action:
667 for condition in list(conditions):
668 (grp, state, time) = condition
670 # If there is an intersection between grp and group,
671 # then remove intersected elements
672 intsec = set(group).intersection(set(grp))
674 idx = conditions.index(condition)
676 newgrp.difference_update(intsec)
677 conditions[idx] = (newgrp, state, time)
679 def get_connected(self, rtype = None):
680 """ Returns the list of RM with the type 'rtype'
682 :param rtype: Type of the RM we look for
684 :return: list of guid
687 rclass = ResourceFactory.get_resource_type(rtype)
688 for guid in self.connections:
689 rm = self.ec.get_resource(guid)
691 if not rtype or isinstance(rm, rclass):
696 def _needs_reschedule(self, group, state, time):
697 """ Internal method that verify if 'time' has elapsed since
698 all elements in 'group' have reached state 'state'.
700 :param group: Group of RMs to wait for (list of guids)
701 :type group: int or list of int
702 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
704 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
707 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
708 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
709 For the moment, 2m30s is not a correct syntax.
713 delay = reschedule_delay
715 # check state and time elapsed on all RMs
717 rm = self.ec.get_resource(guid)
719 # If one of the RMs this resource needs to wait for has FAILED
720 # and is critical we raise an exception
721 if rm.state == ResourceState.FAILED:
722 if not rm.get('critical'):
724 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
725 raise RuntimeError, msg
727 # If the RM state is lower than the requested state we must
728 # reschedule (e.g. if RM is READY but we required STARTED).
733 # If there is a time restriction, we must verify the
734 # restriction is satisfied
736 if state == ResourceState.DISCOVERED:
738 if state == ResourceState.PROVISIONED:
739 t = rm.provision_time
740 elif state == ResourceState.READY:
742 elif state == ResourceState.STARTED:
744 elif state == ResourceState.STOPPED:
746 elif state == ResourceState.RELEASED:
751 # time already elapsed since RM changed state
752 waited = "%fs" % tdiffsec(tnow(), t)
755 wait = tdiffsec(stabsformat(time), stabsformat(waited))
762 return reschedule, delay
764 def set_with_conditions(self, name, value, group, state, time):
765 """ Set value 'value' on attribute with name 'name' when 'time'
766 has elapsed since all elements in 'group' have reached state
769 :param name: Name of the attribute to set
771 :param value: Value of the attribute to set
773 :param group: Group of RMs to wait for (list of guids)
774 :type group: int or list of int
775 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
777 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
782 delay = reschedule_delay
784 ## evaluate if set conditions are met
786 # only can set with conditions after the RM is started
787 if self.state != ResourceState.STARTED:
790 reschedule, delay = self._needs_reschedule(group, state, time)
793 callback = functools.partial(self.set_with_conditions,
794 name, value, group, state, time)
795 self.ec.schedule(delay, callback)
797 self.set(name, value)
799 def start_with_conditions(self):
800 """ Starts RM when all the conditions in self.conditions for
801 action 'START' are satisfied.
804 #import pdb;pdb.set_trace()
807 delay = reschedule_delay
810 ## evaluate if conditions to start are met
814 # Can only start when RM is either STOPPED or READY
815 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
817 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
819 start_conditions = self.conditions.get(ResourceAction.START, [])
821 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
823 # Verify all start conditions are met
824 for (group, state, time) in start_conditions:
825 # Uncomment for debug
828 rm = self.ec.get_resource(guid)
829 unmet.append((guid, rm._state))
831 self.debug("---- WAITED STATES ---- %s" % unmet )
833 reschedule, delay = self._needs_reschedule(group, state, time)
838 self.ec.schedule(delay, self.start_with_conditions)
840 self.debug("----- STARTING ---- ")
843 def stop_with_conditions(self):
844 """ Stops RM when all the conditions in self.conditions for
845 action 'STOP' are satisfied.
849 delay = reschedule_delay
851 ## evaluate if conditions to stop are met
855 # only can stop when RM is STARTED
856 if self.state != ResourceState.STARTED:
858 self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
860 self.debug(" ---- STOP CONDITIONS ---- %s" %
861 self.conditions.get(ResourceAction.STOP))
863 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
864 for (group, state, time) in stop_conditions:
865 reschedule, delay = self._needs_reschedule(group, state, time)
870 callback = functools.partial(self.stop_with_conditions)
871 self.ec.schedule(delay, callback)
873 self.debug(" ----- STOPPING ---- ")
876 def deploy_with_conditions(self):
877 """ Deploy RM when all the conditions in self.conditions for
878 action 'DEPLOY' are satisfied.
882 delay = reschedule_delay
884 ## evaluate if conditions to deploy are met
888 # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
889 if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
890 ResourceState.PROVISIONED]:
892 self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
894 deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
896 self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions)
898 # Verify all deploy conditions are met
899 for (group, state, time) in deploy_conditions:
900 # Uncomment for debug
903 # rm = self.ec.get_resource(guid)
904 # unmet.append((guid, rm._state))
906 #self.debug("---- WAITED STATES ---- %s" % unmet )
908 reschedule, delay = self._needs_reschedule(group, state, time)
913 self.ec.schedule(delay, self.deploy_with_conditions)
915 self.debug("----- DEPLOYING ---- ")
918 def do_connect(self, guid):
919 """ Performs actions that need to be taken upon associating RMs.
920 This method should be redefined when necessary in child classes.
924 def do_disconnect(self, guid):
925 """ Performs actions that need to be taken upon disassociating RMs.
926 This method should be redefined when necessary in child classes.
930 def valid_connection(self, guid):
931 """Checks whether a connection with the other RM
933 This method need to be redefined by each new Resource Manager.
935 :param guid: Guid of the current Resource Manager
def do_discover(self):
    """ Default discovery step: only marks the RM as DISCOVERED.

    Child classes add their specific discovery functionality by
    redefining this method.

    """
    self.set_discovered()
def do_provision(self):
    """ Default provisioning step: only marks the RM as PROVISIONED.

    Child classes add their specific provisioning functionality by
    redefining this method.

    """
    self.set_provisioned()
958 def do_release(self):
def set_started(self):
    """ Mark ResourceManager as STARTED.

    The state change is delegated to set_state, which is asked to
    record the time of the change in the _start_time attribute.

    """
    self.set_state(ResourceState.STARTED, "_start_time")
    self.debug("----- STARTED ---- ")
def set_stopped(self):
    """ Mark ResourceManager as STOPPED.

    The state change is delegated to set_state, which is asked to
    record the time of the change in the _stop_time attribute.

    """
    self.set_state(ResourceState.STOPPED, "_stop_time")
    self.debug("----- STOPPED ---- ")
975 """ Mark ResourceManager as READY """
976 self.set_state(ResourceState.READY, "_ready_time")
977 self.debug("----- READY ---- ")
def set_released(self):
    """ Mark ResourceManager as RELEASED.

    The state change is delegated to set_state, which is asked to
    record the time of the change in the _release_time attribute.

    """
    self.set_state(ResourceState.RELEASED, "_release_time")
    self.debug("----- RELEASED ---- ")
def set_failed(self):
    """ Mark ResourceManager as FAILED.

    The state change is delegated to set_state, which is asked to
    record the time of the change in the _failed_time attribute.

    """
    self.set_state(ResourceState.FAILED, "_failed_time")
    self.debug("----- FAILED ---- ")
def set_discovered(self):
    """ Mark ResourceManager as DISCOVERED.

    The state change is delegated to set_state, which is asked to
    record the time of the change in the _discover_time attribute.

    """
    self.set_state(ResourceState.DISCOVERED, "_discover_time")
    self.debug("----- DISCOVERED ---- ")
def set_provisioned(self):
    """ Mark ResourceManager as PROVISIONED.

    The state change is delegated to set_state, which is asked to
    record the time of the change in the _provision_time attribute.

    """
    self.set_state(ResourceState.PROVISIONED, "_provision_time")
    self.debug("----- PROVISIONED ---- ")
999 def set_state(self, state, state_time_attr):
1000 """ Set the state of the RM while keeping a trace of the time """
1002 # Ensure that RM state will not change after released
1003 if self._state == ResourceState.RELEASED:
1006 setattr(self, state_time_attr, tnow())
class ResourceFactory(object):
    """ Registry of ResourceManager classes indexed by resource type,
    and factory of ResourceManager instances.

    All the methods are class methods: the factory is used through
    the class itself (e.g. ResourceFactory.register_type(rclass)) and
    is never instantiated.

    """
    # Registered ResourceManager classes indexed by the value returned
    # by their get_rtype() method. The dictionary lives on the class,
    # so it is shared by every user of the factory.
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """ Returns the dictionary of registered ResourceManager
        classes, indexed by resource type.

        The internal dictionary is returned, not a copy.

        """
        return cls._resource_types

    @classmethod
    def get_resource_type(cls, rtype):
        """ Returns the ResourceManager class registered for 'rtype'

        :param rtype: Resource type to look for
        :return: the registered class, or None when no class is
            registered for 'rtype'

        """
        return cls._resource_types.get(rtype)

    @classmethod
    def register_type(cls, rclass):
        """ Registers a new ResourceManager class.

        The class is stored under the type returned by
        rclass.get_rtype(), replacing any class previously registered
        for that type.

        :param rclass: ResourceManager class to register

        """
        cls._resource_types[rclass.get_rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Creates a new instance of the ResourceManager class
        registered for 'rtype'.

        :param rtype: Resource type of the RM to instantiate
        :param ec: Experiment Controller, passed to the RM constructor
        :param guid: Global unique identifier, passed to the RM constructor
        :return: the new ResourceManager instance
        :raises KeyError: if no class is registered for 'rtype'

        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)
def populate_factory():
    """ Registers in the ResourceFactory every ResourceManager class
    returned by find_types().

    The factory is populated only once: when it already holds
    registered types this function does nothing.

    """
    if ResourceFactory.resource_types():
        # Already populated, don't repopulate
        return

    for rclass in find_types():
        ResourceFactory.register_type(rclass)
1042 """Look into the different folders to find all the
1043 available Resource Managers
1045 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1046 search_path = set(search_path.split(" "))
1049 import nepi.resources
1050 path = os.path.dirname(nepi.resources.__file__)
1051 search_path.add(path)
1055 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
1056 prefix = "nepi.resources."):
1058 loader = importer.find_module(modname)
1061 # Notice: Repeated calls to load_module will act as a reload of the module
1062 if modname in sys.modules:
1063 module = sys.modules.get(modname)
1065 module = loader.load_module(modname)
1067 for attrname in dir(module):
1068 if attrname.startswith("_"):
1071 attr = getattr(module, attrname)
1073 if attr == ResourceManager:
1076 if not inspect.isclass(attr):
1079 if issubclass(attr, ResourceManager):
1082 if not modname in sys.modules:
1083 sys.modules[modname] = module
1088 err = traceback.format_exc()
1089 logger = logging.getLogger("Resource.find_types()")
1090 logger.error("Error while loading Resource Managers %s" % err)