2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
# Default delay, written as a time string ("1s": 's' stands for seconds).
# The condition-driven methods in this file initialise their local `delay`
# from it before handing a retry to the EC scheduler.
34 reschedule_delay = "1s"
37 """ Action that a user can order to a Resource Manager
45 """ State of a Resource Manager
# Printable label for each ResourceState value.
ResourceState2str = {
    ResourceState.NEW: "NEW",
    ResourceState.DISCOVERED: "DISCOVERED",
    ResourceState.PROVISIONED: "PROVISIONED",
    ResourceState.READY: "READY",
    ResourceState.STARTED: "STARTED",
    ResourceState.STOPPED: "STOPPED",
    ResourceState.FINISHED: "FINISHED",
    ResourceState.FAILED: "FAILED",
    ResourceState.RELEASED: "RELEASED",
}
71 """ Initializes template information (i.e. attributes and traces)
72 on classes derived from the ResourceManager class.
74 It is used as a decorator in the class declaration as follows:
77 class MyResourceManager(ResourceManager):
86 def clsinit_copy(cls):
87 """ Initializes template information (i.e. attributes and traces)
88 on classes direved from the ResourceManager class.
89 It differs from the clsinit method in that it forces inheritance
90 of attributes and traces from the parent class.
92 It is used as a decorator in the class declaration as follows:
95 class MyResourceManager(ResourceManager):
100 clsinit_copy should be prefered to clsinit when creating new
101 ResourceManager child classes.
109 """ Decorator function for instance methods that should set the
110 RM state to FAILED when an error is raised. The methods that must be
111 decorated are: discover, provision, deploy, start, stop and finish.
114 def wrapped(self, *args, **kwargs):
116 return func(self, *args, **kwargs)
119 err = traceback.format_exc()
121 self.debug("SETTING guid %d to state FAILED" % self.guid)
128 class ResourceManager(Logger):
129 """ Base clase for all ResourceManagers.
131 A ResourceManger is specific to a resource type (e.g. Node,
132 Switch, Application, etc) on a specific backend (e.g. PlanetLab,
135 The ResourceManager instances are responsible for interacting with
136 and controlling concrete (physical or virtual) resources in the
137 experimental backends.
147 def _register_attribute(cls, attr):
148 """ Resource subclasses will invoke this method to add a
153 cls._attributes[attr.name] = attr
156 def _remove_attribute(cls, name):
157 """ Resource subclasses will invoke this method to remove a
162 del cls._attributes[name]
165 def _register_trace(cls, trace):
166 """ Resource subclasses will invoke this method to add a
171 cls._traces[trace.name] = trace
174 def _remove_trace(cls, name):
175 """ Resource subclasses will invoke this method to remove a
180 del cls._traces[name]
183 def _register_attributes(cls):
184 """ Resource subclasses will invoke this method to register
187 This method should be overriden in the RMs that define
192 critical = Attribute("critical",
193 "Defines whether the resource is critical. "
194 "A failure on a critical resource will interrupt "
198 flags = Flags.ExecReadOnly)
200 cls._register_attribute(critical)
203 def _register_traces(cls):
204 """ Resource subclasses will invoke this method to register
207 This method should be overriden in the RMs that define traces.
215 """ ResourceManager classes have different attributes and traces.
216 Attribute and traces are stored in 'class attribute' dictionaries.
217 When a new ResourceManager class is created, the _clsinit method is
218 called to create a new instance of those dictionaries and initialize
221 The _clsinit method is called by the clsinit decorator method.
225 # static template for resource attributes
226 cls._attributes = dict()
227 cls._register_attributes()
229 # static template for resource traces
231 cls._register_traces()
234 def _clsinit_copy(cls):
235 """ Same as _clsinit, except that after creating new instances of the
236 dictionaries it copies all the attributes and traces from the parent
239 The _clsinit_copy method is called by the clsinit_copy decorator method.
242 # static template for resource attributes
243 cls._attributes = copy.deepcopy(cls._attributes)
244 cls._register_attributes()
246 # static template for resource traces
247 cls._traces = copy.deepcopy(cls._traces)
248 cls._register_traces()
252 """ Returns the type of the Resource Manager
def get_attributes(cls):
    """ Returns a copy of the attributes

    :return: list holding a deep copy of every attribute registered
        in cls._attributes
    :rtype: list

    """
    # Materialise the values first: on Python 3 dict.values() is a view
    # object, which copy.deepcopy() cannot copy (TypeError). On Python 2
    # values() is already a list, so the result is unchanged there.
    return copy.deepcopy(list(cls._attributes.values()))
266 """ Returns a copy of the traces
269 return copy.deepcopy(cls._traces.values())
273 """ Returns the description of the type of Resource
279 def get_backend(cls):
280 """ Returns the identified of the backend (i.e. testbed, environment)
286 def __init__(self, ec, guid):
287 super(ResourceManager, self).__init__(self.rtype())
290 self._ec = weakref.ref(ec)
291 self._connections = set()
292 self._conditions = dict()
294 # the resource instance gets a copy of all attributes
295 self._attrs = copy.deepcopy(self._attributes)
297 # the resource instance gets a copy of all traces
298 self._trcs = copy.deepcopy(self._traces)
300 # Each resource is placed on a deployment group by the EC
302 self.deployment_group = None
304 self._start_time = None
305 self._stop_time = None
306 self._discover_time = None
307 self._provision_time = None
308 self._ready_time = None
309 self._release_time = None
310 self._finish_time = None
311 self._failed_time = None
313 self._state = ResourceState.NEW
315 # instance lock to synchronize exclusive state change methods (such
316 # as deploy and release methods), in order to prevent them from being
317 # executed at the same time
318 self._release_lock = threading.Lock()
322 """ Returns the global unique identifier of the RM """
327 """ Returns the Experiment Controller of the RM """
def connections(self):
    """ Returns the set of guids of the RMs connected to this RM.

    The set itself is returned, not a copy.

    """
    connected_guids = self._connections
    return connected_guids
def conditions(self):
    """ Returns the conditions to which the RM is subjected.

    The returned dictionary holds one list of conditions per action.
    The dictionary itself is returned, not a copy.

    """
    registered = self._conditions
    return registered
def start_time(self):
    """ Returns the start time of the RM as a timestamp.

    The value stored in self._start_time is returned unchanged.

    """
    timestamp = self._start_time
    return timestamp
352 """ Returns the stop time of the RM as a timestamp """
353 return self._stop_time
def discover_time(self):
    """ Returns the discover time of the RM as a timestamp.

    The value stored in self._discover_time is returned unchanged.

    """
    timestamp = self._discover_time
    return timestamp
def provision_time(self):
    """ Returns the provision time of the RM as a timestamp.

    The value stored in self._provision_time is returned unchanged.

    """
    timestamp = self._provision_time
    return timestamp
def ready_time(self):
    """ Returns the deployment (READY) time of the RM as a timestamp.

    The value stored in self._ready_time is returned unchanged.

    """
    timestamp = self._ready_time
    return timestamp
def release_time(self):
    """ Returns the release time of the RM as a timestamp.

    The value stored in self._release_time is returned unchanged.

    """
    timestamp = self._release_time
    return timestamp
def finish_time(self):
    """ Returns the finalization time of the RM as a timestamp.

    The value stored in self._finish_time is returned unchanged.

    """
    timestamp = self._finish_time
    return timestamp
def failed_time(self):
    """ Returns the time at which the RM failed, as a timestamp.

    The value stored in self._failed_time is returned unchanged.

    """
    timestamp = self._failed_time
    return timestamp
387 """ Get the current state of the RM """
def log_message(self, msg):
    """ Returns the log message prefixed with the RM type and guid.

    :param msg: text message
    :type msg: str
    :rtype: str

    """
    template = " %s guid: %d - %s "
    fields = (self._rtype, self.guid, msg)
    return template % fields
def register_connection(self, guid):
    """ Registers a connection to the RM identified by guid.

    The connection is only recorded when valid_connection(guid) is
    true. In that case do_connect(guid) runs first and the guid is
    then added to the set of connections.

    This method should not be overridden. Specific functionality
    should be added in the do_connect method.

    :param guid: Global unique identifier of the RM to connect to
    :type guid: int

    """
    if not self.valid_connection(guid):
        return

    self.do_connect(guid)
    self._connections.add(guid)
def unregister_connection(self, guid):
    """ Removes a registered connection to the RM identified by guid.

    Nothing happens when guid is not a registered connection.
    Otherwise do_disconnect(guid) runs first and the guid is then
    removed from the set of connections.

    This method should not be overridden. Specific functionality
    should be added in the do_disconnect method.

    :param guid: Global unique identifier of the RM to disconnect from
    :type guid: int

    """
    if guid not in self._connections:
        return

    self.do_disconnect(guid)
    self._connections.remove(guid)
430 """ Performs resource discovery.
432 This method is responsible for selecting an individual resource
433 matching user requirements.
435 This method should not be overriden directly. Specific functionality
436 should be added in the do_discover method.
439 with self._release_lock:
440 if self._state != ResourceState.RELEASED:
445 """ Performs resource provisioning.
447 This method is responsible for provisioning one resource.
448 After this method has been successfully invoked, the resource
449 should be accessible/controllable by the RM.
451 This method should not be overriden directly. Specific functionality
452 should be added in the do_provision method.
455 with self._release_lock:
456 if self._state != ResourceState.RELEASED:
461 """ Starts the RM (e.g. launch remote process).
463 There is no standard start behavior. Some RMs will not need to perform
464 any actions upon start.
466 This method should not be overriden directly. Specific functionality
467 should be added in the do_start method.
470 if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
471 self.error("Wrong state %s for start" % self.state)
474 with self._release_lock:
475 if self._state != ResourceState.RELEASED:
480 """ Interrupts the RM, stopping any tasks the RM was performing.
482 There is no standard stop behavior. Some RMs will not need to perform
483 any actions upon stop.
485 This method should not be overriden directly. Specific functionality
486 should be added in the do_stop method.
489 if not self.state in [ResourceState.STARTED]:
490 self.error("Wrong state %s for stop" % self.state)
493 with self._release_lock:
498 """ Execute all steps required for the RM to reach the state READY.
500 This method is responsible for deploying the resource (and invoking
501 the discover and provision methods).
503 This method should not be overriden directly. Specific functionality
504 should be added in the do_deploy method.
507 if self.state > ResourceState.READY:
508 self.error("Wrong state %s for deploy" % self.state)
511 with self._release_lock:
512 if self._state != ResourceState.RELEASED:
514 self.debug("----- READY ---- ")
517 """ Perform actions to free resources used by the RM.
519 This method is responsible for releasing resources that were
520 used during the experiment by the RM.
522 This method should not be overriden directly. Specific functionality
523 should be added in the do_release method.
526 with self._release_lock:
531 err = traceback.format_exc()
535 self.debug("----- RELEASED ---- ")
539 """ Sets the RM to state FINISHED.
541 The FINISHED state is different from STOPPED state in that it
542 should not be directly invoked by the user.
543 STOPPED indicates that the user interrupted the RM, FINISHED means
544 that the RM concluded normally the actions it was supposed to perform.
546 This method should not be overriden directly. Specific functionality
547 should be added in the do_finish method.
550 with self._release_lock:
551 if self._state != ResourceState.RELEASED:
555 """ Sets the RM to state FAILED.
557 This method should not be overriden directly. Specific functionality
558 should be added in the do_fail method.
561 with self._release_lock:
562 if self._state != ResourceState.RELEASED:
565 def set(self, name, value):
566 """ Set the value of the attribute
568 :param name: Name of the attribute
570 :param name: Value of the attribute
573 attr = self._attrs[name]
577 """ Returns the value of the attribute
579 :param name: Name of the attribute
583 attr = self._attrs[name]
586 def enable_trace(self, name):
587 """ Explicitly enable trace generation
589 :param name: Name of the trace
592 trace = self._trcs[name]
595 def trace_enabled(self, name):
596 """Returns True if trace is enables
598 :param name: Name of the trace
601 trace = self._trcs[name]
604 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
605 """ Get information on collected trace
607 :param name: Name of the trace
610 :param attr: Can be one of:
611 - TraceAttr.ALL (complete trace content),
612 - TraceAttr.STREAM (block in bytes to read starting at offset),
613 - TraceAttr.PATH (full path to the trace file),
614 - TraceAttr.SIZE (size of trace file).
617 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
620 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
627 def register_condition(self, action, group, state, time = None):
628 """ Registers a condition on the resource manager to allow execution
629 of 'action' only after 'time' has elapsed from the moment all resources
630 in 'group' reached state 'state'
632 :param action: Action to restrict to condition (either 'START' or 'STOP')
634 :param group: Group of RMs to wait for (list of guids)
635 :type group: int or list of int
636 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
638 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
643 if not action in self.conditions:
644 self._conditions[action] = list()
646 conditions = self.conditions.get(action)
648 # For each condition to register a tuple of (group, state, time) is
649 # added to the 'action' list
650 if not isinstance(group, list):
653 conditions.append((group, state, time))
655 def unregister_condition(self, group, action = None):
656 """ Removed conditions for a certain group of guids
658 :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
661 :param group: Group of RMs to wait for (list of guids)
662 :type group: int or list of int
665 # For each condition a tuple of (group, state, time) is
666 # added to the 'action' list
667 if not isinstance(group, list):
670 for act, conditions in self.conditions.iteritems():
671 if action and act != action:
674 for condition in list(conditions):
675 (grp, state, time) = condition
677 # If there is an intersection between grp and group,
678 # then remove intersected elements
679 intsec = set(group).intersection(set(grp))
681 idx = conditions.index(condition)
683 newgrp.difference_update(intsec)
684 conditions[idx] = (newgrp, state, time)
686 def get_connected(self, rtype = None):
687 """ Returns the list of RM with the type 'rtype'
689 :param rtype: Type of the RM we look for
691 :return: list of guid
694 rclass = ResourceFactory.get_resource_type(rtype)
695 for guid in self.connections:
696 rm = self.ec.get_resource(guid)
697 if not rtype or isinstance(rm, rclass):
# Decides whether the caller has to be rescheduled; the result is the pair
# (reschedule, delay) returned on the last line of this method.
# NOTE(review): several statements of this method are not visible in this
# view, so the notes below only cover the lines that are.
702 def _needs_reschedule(self, group, state, time):
703 """ Internal method that verify if 'time' has elapsed since
704 all elements in 'group' have reached state 'state'.
706 :param group: Group of RMs to wait for (list of guids)
707 :type group: int or list of int
708 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
710 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
713 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
714 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
715 For the moment, 2m30s is not a correct syntax.
719 delay = reschedule_delay
721 # check state and time elapsed on all RMs
723 rm = self.ec.get_resource(guid)
725 # If one of the RMs this resource needs to wait for has FAILED
726 # we raise an exception
727 if rm.state == ResourceState.FAILED:
# NOTE(review): msg keeps a literal "%d" -- no guid is interpolated on the
# visible lines before it is raised; it looks like "% guid" is missing.
728 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
# NOTE(review): "raise RuntimeError, msg" is Python 2-only statement syntax;
# "raise RuntimeError(msg)" is accepted by both Python 2 and Python 3.
729 raise RuntimeError, msg
731 # If the RM state is lower than the requested state we must
732 # reschedule (e.g. if RM is READY but we required STARTED).
737 # If there is a time restriction, we must verify the
738 # restriction is satisfied
740 if state == ResourceState.DISCOVERED:
# NOTE(review): this is a plain "if" while every later branch is "elif", so
# a DISCOVERED state is also evaluated by the chain that starts here.
# Confirm that any fall-through branch of that chain is harmless for
# DISCOVERED, or turn this into "elif".
742 if state == ResourceState.PROVISIONED:
743 t = rm.provision_time
744 elif state == ResourceState.READY:
746 elif state == ResourceState.STARTED:
748 elif state == ResourceState.STOPPED:
750 elif state == ResourceState.FINISHED:
752 elif state == ResourceState.RELEASED:
757 # time already elapsed since RM changed state
758 waited = "%fs" % tdiffsec(tnow(), t)
# Remaining wait: requested 'time' minus the time already waited, both
# converted with stabsformat before being diffed.
761 wait = tdiffsec(stabsformat(time), stabsformat(waited))
768 return reschedule, delay
770 def set_with_conditions(self, name, value, group, state, time):
771 """ Set value 'value' on attribute with name 'name' when 'time'
772 has elapsed since all elements in 'group' have reached state
775 :param name: Name of the attribute to set
777 :param name: Value of the attribute to set
779 :param group: Group of RMs to wait for (list of guids)
780 :type group: int or list of int
781 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
783 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
788 delay = reschedule_delay
790 ## evaluate if set conditions are met
792 # only can set with conditions after the RM is started
793 if self.state != ResourceState.STARTED:
796 reschedule, delay = self._needs_reschedule(group, state, time)
799 callback = functools.partial(self.set_with_conditions,
800 name, value, group, state, time)
801 self.ec.schedule(delay, callback)
803 self.set(name, value)
805 def start_with_conditions(self):
806 """ Starts RM when all the conditions in self.conditions for
807 action 'START' are satisfied.
811 delay = reschedule_delay
813 ## evaluate if conditions to start are met
817 # Can only start when RM is either STOPPED or READY
818 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
820 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
822 start_conditions = self.conditions.get(ResourceAction.START, [])
824 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
826 # Verify all start conditions are met
827 for (group, state, time) in start_conditions:
828 # Uncomment for debug
831 # rm = self.ec.get_resource(guid)
832 # unmet.append((guid, rm._state))
834 #self.debug("---- WAITED STATES ---- %s" % unmet )
836 reschedule, delay = self._needs_reschedule(group, state, time)
841 self.ec.schedule(delay, self.start_with_conditions)
843 self.debug("----- STARTING ---- ")
846 def stop_with_conditions(self):
847 """ Stops RM when all the conditions in self.conditions for
848 action 'STOP' are satisfied.
852 delay = reschedule_delay
854 ## evaluate if conditions to stop are met
858 # only can stop when RM is STARTED
859 if self.state != ResourceState.STARTED:
861 self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
863 self.debug(" ---- STOP CONDITIONS ---- %s" %
864 self.conditions.get(ResourceAction.STOP))
866 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
867 for (group, state, time) in stop_conditions:
868 reschedule, delay = self._needs_reschedule(group, state, time)
873 callback = functools.partial(self.stop_with_conditions)
874 self.ec.schedule(delay, callback)
876 self.debug(" ----- STOPPING ---- ")
879 def deploy_with_conditions(self):
880 """ Deploy RM when all the conditions in self.conditions for
881 action 'READY' are satisfied.
885 delay = reschedule_delay
887 ## evaluate if conditions to deploy are met
891 # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
892 if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
893 ResourceState.PROVISIONED]:
895 self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
897 deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
899 self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions)
901 # Verify all start conditions are met
902 for (group, state, time) in deploy_conditions:
903 # Uncomment for debug
906 # rm = self.ec.get_resource(guid)
907 # unmet.append((guid, rm._state))
909 #self.debug("---- WAITED STATES ---- %s" % unmet )
911 reschedule, delay = self._needs_reschedule(group, state, time)
916 self.ec.schedule(delay, self.deploy_with_conditions)
918 self.debug("----- STARTING ---- ")
921 def do_connect(self, guid):
922 """ Performs actions that need to be taken upon associating RMs.
923 This method should be redefined when necessary in child classes.
927 def do_disconnect(self, guid):
928 """ Performs actions that need to be taken upon disassociating RMs.
929 This method should be redefined when necessary in child classes.
933 def valid_connection(self, guid):
934 """Checks whether a connection with the other RM
936 This method need to be redefined by each new Resource Manager.
938 :param guid: Guid of the current Resource Manager
def do_discover(self):
    """ Default discovery step: only marks the RM as DISCOVERED. """
    mark_discovered = self.set_discovered
    mark_discovered()
def do_provision(self):
    """ Default provisioning step: only marks the RM as PROVISIONED. """
    mark_provisioned = self.set_provisioned
    mark_provisioned()
961 def do_release(self):
def set_started(self):
    """ Mark ResourceManager as STARTED.

    Delegates to set_state, naming "_start_time" as the attribute
    that receives the time of the change.

    """
    target_state = ResourceState.STARTED
    self.set_state(target_state, "_start_time")
def set_stopped(self):
    """ Mark ResourceManager as STOPPED.

    Delegates to set_state, naming "_stop_time" as the attribute
    that receives the time of the change.

    """
    target_state = ResourceState.STOPPED
    self.set_state(target_state, "_stop_time")
979 """ Mark ResourceManager as READY """
980 self.set_state(ResourceState.READY, "_ready_time")
def set_released(self):
    """ Mark ResourceManager as RELEASED.

    Delegates to set_state, naming "_release_time" as the attribute
    that receives the time of the change.

    """
    target_state = ResourceState.RELEASED
    self.set_state(target_state, "_release_time")
def set_finished(self):
    """ Mark ResourceManager as FINISHED.

    Delegates to set_state, naming "_finish_time" as the attribute
    that receives the time of the change.

    """
    target_state = ResourceState.FINISHED
    self.set_state(target_state, "_finish_time")
def set_failed(self):
    """ Mark ResourceManager as FAILED.

    Delegates to set_state, naming "_failed_time" as the attribute
    that receives the time of the change.

    """
    target_state = ResourceState.FAILED
    self.set_state(target_state, "_failed_time")
def set_discovered(self):
    """ Mark ResourceManager as DISCOVERED.

    Delegates to set_state, naming "_discover_time" as the attribute
    that receives the time of the change.

    """
    target_state = ResourceState.DISCOVERED
    self.set_state(target_state, "_discover_time")
def set_provisioned(self):
    """ Mark ResourceManager as PROVISIONED.

    Delegates to set_state, naming "_provision_time" as the attribute
    that receives the time of the change.

    """
    target_state = ResourceState.PROVISIONED
    self.set_state(target_state, "_provision_time")
1002 def set_state(self, state, state_time_attr):
1003 # Ensure that RM state will not change after released
1004 if self._state == ResourceState.RELEASED:
1007 setattr(self, state_time_attr, tnow())
class ResourceFactory(object):
    """ Registry that maps a resource type identifier to the
    ResourceManager class implementing it, and builds instances
    from that mapping.

    All methods are class methods: the registry is shared and the
    class is used directly, never instantiated.

    """
    # rtype identifier -> registered ResourceManager class
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """Return the registry of resource types.

        :return: dictionary mapping each registered rtype to its class
            (the registry itself, not a copy)
        :rtype: dict

        """
        return cls._resource_types

    @classmethod
    def get_resource_type(cls, rtype):
        """Return the class registered for 'rtype'.

        :param rtype: Type identifier of the Resource Manager
        :return: the registered class, or None when 'rtype' is unknown

        """
        return cls._resource_types.get(rtype)

    @classmethod
    def register_type(cls, rclass):
        """Register a new Resource Manager class.

        The class is stored under the identifier returned by
        rclass.rtype(), replacing any class already registered
        under that identifier.

        :param rclass: Resource Manager class to register

        """
        cls._resource_types[rclass.rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """Create a new instance of a Resource Manager.

        :param rtype: Type identifier of the Resource Manager
        :param ec: Experiment Controller handed to the new instance
        :param guid: Global unique identifier handed to the new instance
        :return: rclass(ec, guid) for the class registered as 'rtype'
        :raises KeyError: if 'rtype' has not been registered

        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)
def populate_factory():
    """Register in ResourceFactory every class returned by find_types().

    Nothing is done when the factory already holds at least one
    resource type, so repeated calls do not repopulate it.

    """
    already_populated = bool(ResourceFactory.resource_types())
    if already_populated:
        return

    for rclass in find_types():
        ResourceFactory.register_type(rclass)
1043 """Look into the different folders to find all the
1044 availables Resources Managers
1046 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1047 search_path = set(search_path.split(" "))
1050 import nepi.resources
1051 path = os.path.dirname(nepi.resources.__file__)
1052 search_path.add(path)
1056 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
1057 prefix = "nepi.resources."):
1059 loader = importer.find_module(modname)
1062 # Notice: Repeated calls to load_module will act as a reload of teh module
1063 if modname in sys.modules:
1064 module = sys.modules.get(modname)
1066 module = loader.load_module(modname)
1068 for attrname in dir(module):
1069 if attrname.startswith("_"):
1072 attr = getattr(module, attrname)
1074 if attr == ResourceManager:
1077 if not inspect.isclass(attr):
1080 if issubclass(attr, ResourceManager):
1083 if not modname in sys.modules:
1084 sys.modules[modname] = module
1089 err = traceback.format_exc()
1090 logger = logging.getLogger("Resource.find_types()")
1091 logger.error("Error while loading Resource Managers %s" % err)