2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
# Retry delay (a time string in the same "<number><unit>" form used for
# condition times, e.g. "2s") that the *_with_conditions methods hand to
# ec.schedule() when they must try again later. _needs_reschedule starts
# from this value but may return a different delay.
reschedule_delay = "1s"
37 """ Action that a user can order to a Resource Manager
45 """ State of a Resource Manager
# Maps every ResourceState value to its name. Each name is spelled exactly
# like the corresponding ResourceState attribute, so the table is built by
# looking those attributes up.
ResourceState2str = dict(
    (getattr(ResourceState, state_name), state_name)
    for state_name in ("NEW", "DISCOVERED", "PROVISIONED", "READY",
                       "STARTED", "STOPPED", "FAILED", "RELEASED"))
69 """ Initializes template information (i.e. attributes and traces)
70 on classes derived from the ResourceManager class.
72 It is used as a decorator in the class declaration as follows:
75 class MyResourceManager(ResourceManager):
84 def clsinit_copy(cls):
85 """ Initializes template information (i.e. attributes and traces)
86 on classes direved from the ResourceManager class.
87 It differs from the clsinit method in that it forces inheritance
88 of attributes and traces from the parent class.
90 It is used as a decorator in the class declaration as follows:
93 class MyResourceManager(ResourceManager):
98 clsinit_copy should be prefered to clsinit when creating new
99 ResourceManager child classes.
107 """ Decorator function for instance methods that should set the
108 RM state to FAILED when an error is raised. The methods that must be
109 decorated are: discover, provision, deploy, start, stop.
112 def wrapped(self, *args, **kwargs):
114 return func(self, *args, **kwargs)
117 err = traceback.format_exc()
119 self.debug("SETTING guid %d to state FAILED" % self.guid)
126 class ResourceManager(Logger):
127 """ Base clase for all ResourceManagers.
129 A ResourceManger is specific to a resource type (e.g. Node,
130 Switch, Application, etc) on a specific backend (e.g. PlanetLab,
133 The ResourceManager instances are responsible for interacting with
134 and controlling concrete (physical or virtual) resources in the
135 experimental backends.
145 def _register_attribute(cls, attr):
146 """ Resource subclasses will invoke this method to add a
151 cls._attributes[attr.name] = attr
154 def _remove_attribute(cls, name):
155 """ Resource subclasses will invoke this method to remove a
160 del cls._attributes[name]
163 def _register_trace(cls, trace):
164 """ Resource subclasses will invoke this method to add a
169 cls._traces[trace.name] = trace
172 def _remove_trace(cls, name):
173 """ Resource subclasses will invoke this method to remove a
178 del cls._traces[name]
181 def _register_attributes(cls):
182 """ Resource subclasses will invoke this method to register
185 This method should be overriden in the RMs that define
190 critical = Attribute("critical",
191 "Defines whether the resource is critical. "
192 "A failure on a critical resource will interrupt "
196 flags = Flags.ExecReadOnly)
198 cls._register_attribute(critical)
201 def _register_traces(cls):
202 """ Resource subclasses will invoke this method to register
205 This method should be overriden in the RMs that define traces.
213 """ ResourceManager classes have different attributes and traces.
214 Attribute and traces are stored in 'class attribute' dictionaries.
215 When a new ResourceManager class is created, the _clsinit method is
216 called to create a new instance of those dictionaries and initialize
219 The _clsinit method is called by the clsinit decorator method.
223 # static template for resource attributes
224 cls._attributes = dict()
225 cls._register_attributes()
227 # static template for resource traces
229 cls._register_traces()
232 def _clsinit_copy(cls):
233 """ Same as _clsinit, except that after creating new instances of the
234 dictionaries it copies all the attributes and traces from the parent
237 The _clsinit_copy method is called by the clsinit_copy decorator method.
240 # static template for resource attributes
241 cls._attributes = copy.deepcopy(cls._attributes)
242 cls._register_attributes()
244 # static template for resource traces
245 cls._traces = copy.deepcopy(cls._traces)
246 cls._register_traces()
250 """ Returns the type of the Resource Manager
256 def get_attributes(cls):
257 """ Returns a copy of the attributes
260 return copy.deepcopy(cls._attributes.values())
264 """ Returns a copy of the traces
267 return copy.deepcopy(cls._traces.values())
271 """ Returns the description of the type of Resource
277 def get_backend(cls):
278 """ Returns the identified of the backend (i.e. testbed, environment)
284 def __init__(self, ec, guid):
285 super(ResourceManager, self).__init__(self.get_rtype())
288 self._ec = weakref.ref(ec)
289 self._connections = set()
290 self._conditions = dict()
292 # the resource instance gets a copy of all attributes
293 self._attrs = copy.deepcopy(self._attributes)
295 # the resource instance gets a copy of all traces
296 self._trcs = copy.deepcopy(self._traces)
298 # Each resource is placed on a deployment group by the EC
300 self.deployment_group = None
302 self._start_time = None
303 self._stop_time = None
304 self._discover_time = None
305 self._provision_time = None
306 self._ready_time = None
307 self._release_time = None
308 self._failed_time = None
310 self._state = ResourceState.NEW
312 # instance lock to synchronize exclusive state change methods (such
313 # as deploy and release methods), in order to prevent them from being
314 # executed at the same time
315 self._release_lock = threading.Lock()
319 """ Returns the global unique identifier of the RM """
324 """ Returns the Experiment Controller of the RM """
328 def connections(self):
329 """ Returns the set of guids of connected RMs """
330 return self._connections
333 def conditions(self):
334 """ Returns the conditions to which the RM is subjected to.
336 This method returns a dictionary of conditions lists indexed by
340 return self._conditions
343 def start_time(self):
344 """ Returns the start time of the RM as a timestamp """
345 return self._start_time
349 """ Returns the stop time of the RM as a timestamp """
350 return self._stop_time
353 def discover_time(self):
354 """ Returns the discover time of the RM as a timestamp """
355 return self._discover_time
358 def provision_time(self):
359 """ Returns the provision time of the RM as a timestamp """
360 return self._provision_time
363 def ready_time(self):
364 """ Returns the deployment time of the RM as a timestamp """
365 return self._ready_time
368 def release_time(self):
369 """ Returns the release time of the RM as a timestamp """
370 return self._release_time
373 def failed_time(self):
374 """ Returns the time failure occured for the RM as a timestamp """
375 return self._failed_time
379 """ Get the current state of the RM """
382 def log_message(self, msg):
383 """ Returns the log message formatted with added information.
385 :param msg: text message
390 return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
392 def register_connection(self, guid):
393 """ Registers a connection to the RM identified by guid
395 This method should not be overriden. Specific functionality
396 should be added in the do_connect method.
398 :param guid: Global unique identified of the RM to connect to
402 if self.valid_connection(guid):
403 self.do_connect(guid)
404 self._connections.add(guid)
406 def unregister_connection(self, guid):
407 """ Removes a registered connection to the RM identified by guid
409 This method should not be overriden. Specific functionality
410 should be added in the do_disconnect method.
412 :param guid: Global unique identified of the RM to connect to
416 if guid in self._connections:
417 self.do_disconnect(guid)
418 self._connections.remove(guid)
422 """ Performs resource discovery.
424 This method is responsible for selecting an individual resource
425 matching user requirements.
427 This method should not be overriden directly. Specific functionality
428 should be added in the do_discover method.
431 with self._release_lock:
432 if self._state != ResourceState.RELEASED:
437 """ Performs resource provisioning.
439 This method is responsible for provisioning one resource.
440 After this method has been successfully invoked, the resource
441 should be accessible/controllable by the RM.
443 This method should not be overriden directly. Specific functionality
444 should be added in the do_provision method.
447 with self._release_lock:
448 if self._state != ResourceState.RELEASED:
453 """ Starts the RM (e.g. launch remote process).
455 There is no standard start behavior. Some RMs will not need to perform
456 any actions upon start.
458 This method should not be overriden directly. Specific functionality
459 should be added in the do_start method.
463 if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
464 self.error("Wrong state %s for start" % self.state)
467 with self._release_lock:
468 if self._state != ResourceState.RELEASED:
473 """ Interrupts the RM, stopping any tasks the RM was performing.
475 There is no standard stop behavior. Some RMs will not need to perform
476 any actions upon stop.
478 This method should not be overriden directly. Specific functionality
479 should be added in the do_stop method.
482 if not self.state in [ResourceState.STARTED]:
483 self.error("Wrong state %s for stop" % self.state)
486 with self._release_lock:
491 """ Execute all steps required for the RM to reach the state READY.
493 This method is responsible for deploying the resource (and invoking
494 the discover and provision methods).
496 This method should not be overriden directly. Specific functionality
497 should be added in the do_deploy method.
500 if self.state > ResourceState.READY:
501 self.error("Wrong state %s for deploy" % self.state)
504 with self._release_lock:
505 if self._state != ResourceState.RELEASED:
507 self.debug("----- READY ---- ")
510 """ Perform actions to free resources used by the RM.
512 This method is responsible for releasing resources that were
513 used during the experiment by the RM.
515 This method should not be overriden directly. Specific functionality
516 should be added in the do_release method.
519 with self._release_lock:
524 err = traceback.format_exc()
528 self.debug("----- RELEASED ---- ")
531 """ Sets the RM to state FAILED.
533 This method should not be overriden directly. Specific functionality
534 should be added in the do_fail method.
537 with self._release_lock:
538 if self._state != ResourceState.RELEASED:
541 def set(self, name, value):
542 """ Set the value of the attribute
544 :param name: Name of the attribute
546 :param name: Value of the attribute
549 attr = self._attrs[name]
553 """ Returns the value of the attribute
555 :param name: Name of the attribute
559 attr = self._attrs[name]
562 def enable_trace(self, name):
563 """ Explicitly enable trace generation
565 :param name: Name of the trace
568 trace = self._trcs[name]
571 def trace_enabled(self, name):
572 """Returns True if trace is enables
574 :param name: Name of the trace
577 trace = self._trcs[name]
580 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
581 """ Get information on collected trace
583 :param name: Name of the trace
586 :param attr: Can be one of:
587 - TraceAttr.ALL (complete trace content),
588 - TraceAttr.STREAM (block in bytes to read starting at offset),
589 - TraceAttr.PATH (full path to the trace file),
590 - TraceAttr.SIZE (size of trace file).
593 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
596 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
603 def register_condition(self, action, group, state, time = None):
604 """ Registers a condition on the resource manager to allow execution
605 of 'action' only after 'time' has elapsed from the moment all resources
606 in 'group' reached state 'state'
608 :param action: Action to restrict to condition (either 'START' or 'STOP')
610 :param group: Group of RMs to wait for (list of guids)
611 :type group: int or list of int
612 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
614 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
619 if not action in self.conditions:
620 self._conditions[action] = list()
622 conditions = self.conditions.get(action)
624 # For each condition to register a tuple of (group, state, time) is
625 # added to the 'action' list
626 if not isinstance(group, list):
629 conditions.append((group, state, time))
631 def unregister_condition(self, group, action = None):
632 """ Removed conditions for a certain group of guids
634 :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
637 :param group: Group of RMs to wait for (list of guids)
638 :type group: int or list of int
641 # For each condition a tuple of (group, state, time) is
642 # added to the 'action' list
643 if not isinstance(group, list):
646 for act, conditions in self.conditions.iteritems():
647 if action and act != action:
650 for condition in list(conditions):
651 (grp, state, time) = condition
653 # If there is an intersection between grp and group,
654 # then remove intersected elements
655 intsec = set(group).intersection(set(grp))
657 idx = conditions.index(condition)
659 newgrp.difference_update(intsec)
660 conditions[idx] = (newgrp, state, time)
662 def get_connected(self, rtype = None):
663 """ Returns the list of RM with the type 'rtype'
665 :param rtype: Type of the RM we look for
667 :return: list of guid
670 rclass = ResourceFactory.get_resource_type(rtype)
671 for guid in self.connections:
672 rm = self.ec.get_resource(guid)
673 if not rtype or isinstance(rm, rclass):
678 def _needs_reschedule(self, group, state, time):
679 """ Internal method that verify if 'time' has elapsed since
680 all elements in 'group' have reached state 'state'.
682 :param group: Group of RMs to wait for (list of guids)
683 :type group: int or list of int
684 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
686 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
689 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
690 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
691 For the moment, 2m30s is not a correct syntax.
695 delay = reschedule_delay
697 # check state and time elapsed on all RMs
699 rm = self.ec.get_resource(guid)
701 # If one of the RMs this resource needs to wait for has FAILED
702 # we raise an exception
703 if rm.state == ResourceState.FAILED:
704 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
705 raise RuntimeError, msg
707 # If the RM state is lower than the requested state we must
708 # reschedule (e.g. if RM is READY but we required STARTED).
713 # If there is a time restriction, we must verify the
714 # restriction is satisfied
716 if state == ResourceState.DISCOVERED:
718 if state == ResourceState.PROVISIONED:
719 t = rm.provision_time
720 elif state == ResourceState.READY:
722 elif state == ResourceState.STARTED:
724 elif state == ResourceState.STOPPED:
726 elif state == ResourceState.RELEASED:
731 # time already elapsed since RM changed state
732 waited = "%fs" % tdiffsec(tnow(), t)
735 wait = tdiffsec(stabsformat(time), stabsformat(waited))
742 return reschedule, delay
744 def set_with_conditions(self, name, value, group, state, time):
745 """ Set value 'value' on attribute with name 'name' when 'time'
746 has elapsed since all elements in 'group' have reached state
749 :param name: Name of the attribute to set
751 :param name: Value of the attribute to set
753 :param group: Group of RMs to wait for (list of guids)
754 :type group: int or list of int
755 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
757 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
762 delay = reschedule_delay
764 ## evaluate if set conditions are met
766 # only can set with conditions after the RM is started
767 if self.state != ResourceState.STARTED:
770 reschedule, delay = self._needs_reschedule(group, state, time)
773 callback = functools.partial(self.set_with_conditions,
774 name, value, group, state, time)
775 self.ec.schedule(delay, callback)
777 self.set(name, value)
779 def start_with_conditions(self):
780 """ Starts RM when all the conditions in self.conditions for
781 action 'START' are satisfied.
784 #import pdb;pdb.set_trace()
787 delay = reschedule_delay
790 ## evaluate if conditions to start are met
794 # Can only start when RM is either STOPPED or READY
795 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
797 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
799 start_conditions = self.conditions.get(ResourceAction.START, [])
801 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
803 # Verify all start conditions are met
804 for (group, state, time) in start_conditions:
805 # Uncomment for debug
808 rm = self.ec.get_resource(guid)
809 unmet.append((guid, rm._state))
811 self.debug("---- WAITED STATES ---- %s" % unmet )
813 reschedule, delay = self._needs_reschedule(group, state, time)
818 self.ec.schedule(delay, self.start_with_conditions)
820 self.debug("----- STARTING ---- ")
823 def stop_with_conditions(self):
824 """ Stops RM when all the conditions in self.conditions for
825 action 'STOP' are satisfied.
829 delay = reschedule_delay
831 ## evaluate if conditions to stop are met
835 # only can stop when RM is STARTED
836 if self.state != ResourceState.STARTED:
838 self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
840 self.debug(" ---- STOP CONDITIONS ---- %s" %
841 self.conditions.get(ResourceAction.STOP))
843 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
844 for (group, state, time) in stop_conditions:
845 reschedule, delay = self._needs_reschedule(group, state, time)
850 callback = functools.partial(self.stop_with_conditions)
851 self.ec.schedule(delay, callback)
853 self.debug(" ----- STOPPING ---- ")
856 def deploy_with_conditions(self):
857 """ Deploy RM when all the conditions in self.conditions for
858 action 'READY' are satisfied.
862 delay = reschedule_delay
864 ## evaluate if conditions to deploy are met
868 # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
869 if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
870 ResourceState.PROVISIONED]:
872 self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
874 deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
876 self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions)
878 # Verify all start conditions are met
879 for (group, state, time) in deploy_conditions:
880 # Uncomment for debug
883 # rm = self.ec.get_resource(guid)
884 # unmet.append((guid, rm._state))
886 #self.debug("---- WAITED STATES ---- %s" % unmet )
888 reschedule, delay = self._needs_reschedule(group, state, time)
893 self.ec.schedule(delay, self.deploy_with_conditions)
895 self.debug("----- DEPLOYING ---- ")
898 def do_connect(self, guid):
899 """ Performs actions that need to be taken upon associating RMs.
900 This method should be redefined when necessary in child classes.
904 def do_disconnect(self, guid):
905 """ Performs actions that need to be taken upon disassociating RMs.
906 This method should be redefined when necessary in child classes.
910 def valid_connection(self, guid):
911 """Checks whether a connection with the other RM
913 This method need to be redefined by each new Resource Manager.
915 :param guid: Guid of the current Resource Manager
923 def do_discover(self):
924 self.set_discovered()
926 def do_provision(self):
927 self.set_provisioned()
938 def do_release(self):
944 def set_started(self):
945 """ Mark ResourceManager as STARTED """
946 self.set_state(ResourceState.STARTED, "_start_time")
948 def set_stopped(self):
949 """ Mark ResourceManager as STOPPED """
950 self.set_state(ResourceState.STOPPED, "_stop_time")
953 """ Mark ResourceManager as READY """
954 self.set_state(ResourceState.READY, "_ready_time")
956 def set_released(self):
957 """ Mark ResourceManager as REALEASED """
958 self.set_state(ResourceState.RELEASED, "_release_time")
960 def set_failed(self):
961 """ Mark ResourceManager as FAILED """
962 self.set_state(ResourceState.FAILED, "_failed_time")
964 def set_discovered(self):
965 """ Mark ResourceManager as DISCOVERED """
966 self.set_state(ResourceState.DISCOVERED, "_discover_time")
968 def set_provisioned(self):
969 """ Mark ResourceManager as PROVISIONED """
970 self.set_state(ResourceState.PROVISIONED, "_provision_time")
972 def set_state(self, state, state_time_attr):
973 # Ensure that RM state will not change after released
974 if self._state == ResourceState.RELEASED:
977 setattr(self, state_time_attr, tnow())
class ResourceFactory(object):
    """ Registry of the available ResourceManager classes, indexed by the
    rtype each class reports through its get_rtype() method.

    """
    # rtype -> ResourceManager class. This is a class attribute, so every
    # caller shares the same registry.
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """ Returns the registry dictionary mapping each registered rtype
        to its ResourceManager class.

        The dictionary itself is returned (not a copy), so changes the
        caller makes to it affect the registry.

        """
        return cls._resource_types

    @classmethod
    def get_resource_type(cls, rtype):
        """ Returns the ResourceManager class registered under 'rtype',
        or None if no class is registered under that name.

        :param rtype: Type identifier of the ResourceManager

        """
        return cls._resource_types.get(rtype)

    @classmethod
    def register_type(cls, rclass):
        """ Registers a ResourceManager class under the rtype returned by
        rclass.get_rtype(). Registering another class under the same rtype
        replaces the previous entry.

        :param rclass: ResourceManager class to register

        """
        cls._resource_types[rclass.get_rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Creates a new instance of the ResourceManager class registered
        under 'rtype'.

        :param rtype: Type identifier of the ResourceManager to create
        :param ec: Experiment Controller passed to the RM constructor
        :param guid: Global unique identifier passed to the RM constructor
        :type guid: int
        :raises KeyError: if no class is registered under 'rtype'

        """
        rclass = cls._resource_types.get(rtype)
        if rclass is None:
            # Same exception type as a plain dict lookup, but the message
            # names the offending rtype.
            raise KeyError("Unknown ResourceManager type: %r" % (rtype,))
        return rclass(ec, guid)
def populate_factory():
    """ Fills the ResourceFactory registry with every ResourceManager class
    returned by find_types().

    The registry is populated only once: if it already holds at least one
    type, this function returns without doing anything.

    """
    if ResourceFactory.resource_types():
        # Already populated; do not register everything a second time.
        return

    for rm_class in find_types():
        ResourceFactory.register_type(rm_class)
1013 """Look into the different folders to find all the
1014 availables Resources Managers
1016 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1017 search_path = set(search_path.split(" "))
1020 import nepi.resources
1021 path = os.path.dirname(nepi.resources.__file__)
1022 search_path.add(path)
1026 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
1027 prefix = "nepi.resources."):
1029 loader = importer.find_module(modname)
1032 # Notice: Repeated calls to load_module will act as a reload of teh module
1033 if modname in sys.modules:
1034 module = sys.modules.get(modname)
1036 module = loader.load_module(modname)
1038 for attrname in dir(module):
1039 if attrname.startswith("_"):
1042 attr = getattr(module, attrname)
1044 if attr == ResourceManager:
1047 if not inspect.isclass(attr):
1050 if issubclass(attr, ResourceManager):
1053 if not modname in sys.modules:
1054 sys.modules[modname] = module
1059 err = traceback.format_exc()
1060 logger = logging.getLogger("Resource.find_types()")
1061 logger.error("Error while loading Resource Managers %s" % err)