2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
# Default delay used when an action subject to conditions cannot run yet and
# has to be re-scheduled (it is the fallback 'delay' handed to ec.schedule by
# the *_with_conditions methods and returned by _needs_reschedule).
reschedule_delay = "1s"
35 """ Action that a user can order to a Resource Manager
43 """ State of a Resource Manager
56 ResourceState2str = dict({
57 ResourceState.NEW : "NEW",
58 ResourceState.DISCOVERED : "DISCOVERED",
59 ResourceState.PROVISIONED : "PROVISIONED",
60 ResourceState.READY : "READY",
61 ResourceState.STARTED : "STARTED",
62 ResourceState.STOPPED : "STOPPED",
63 ResourceState.FINISHED : "FINISHED",
64 ResourceState.FAILED : "FAILED",
65 ResourceState.RELEASED : "RELEASED",
69 """ Initializes template information (i.e. attributes and traces)
70 for the ResourceManager class
75 def clsinit_copy(cls):
76 """ Initializes template information (i.e. attributes and traces)
77 for the ResourceManager class, inheriting attributes and traces
83 # Decorator to invoke class initialization method
85 class ResourceManager(Logger):
86 """ Base clase for all ResourceManagers.
88 A ResourceManger is specific to a resource type (e.g. Node,
89 Switch, Application, etc) on a specific backend (e.g. PlanetLab,
92 The ResourceManager instances are responsible for interacting with
93 and controlling concrete (physical or virtual) resources in the
94 experimental backends.
def _register_attribute(cls, attr):
    """ Add an attribute template to the class-level attribute registry.

    Resource subclasses call this while registering their attributes.
    An attribute with the same name as an existing one replaces it.

    :param attr: Attribute template to register (its ``name`` is the key)
    """
    key = attr.name
    cls._attributes[key] = attr
def _remove_attribute(cls, name):
    """ Drop the attribute template called 'name' from the class-level
    attribute registry. Intended for Resource subclasses that do not
    want to expose an inherited attribute.

    :param name: Name of the attribute to remove
    :raises KeyError: if no attribute with that name is registered
    """
    cls._attributes.pop(name)
def _register_trace(cls, trace):
    """ Add a trace template to the class-level trace registry.

    Resource subclasses call this while registering their traces.
    A trace with the same name as an existing one replaces it.

    :param trace: Trace template to register (its ``name`` is the key)
    """
    key = trace.name
    cls._traces[key] = trace
def _remove_trace(cls, name):
    """ Drop the trace template called 'name' from the class-level
    trace registry. Intended for Resource subclasses that do not
    want to expose an inherited trace.

    :param name: Name of the trace to remove
    :raises KeyError: if no trace with that name is registered
    """
    cls._traces.pop(name)
136 def _register_attributes(cls):
137 """ Resource subclasses will invoke this method to register
144 def _register_traces(cls):
145 """ Resource subclasses will invoke this method to register
153 """ ResourceManager child classes have different attributes and traces.
154 Since the templates that hold the information of attributes and traces
155 are 'class attribute' dictionaries, initially they all point to the
156 parent class ResourceManager instances of those dictionaries.
157 In order to make these templates independent from the parent's one,
158 it is necessary re-initialize the corresponding dictionaries.
159 This is the objective of the _clsinit method
161 # static template for resource attributes
162 cls._attributes = dict()
163 cls._register_attributes()
165 # static template for resource traces
167 cls._register_traces()
def _clsinit_copy(cls):
    """ Like _clsinit, but instead of starting from empty templates the
    class first takes private deep copies of the attribute and trace
    templates it currently sees (i.e. the parent's). Registering new
    entries afterwards therefore extends, without altering, the parent's
    templates.
    """
    # Detach the attribute template from the parent, then extend it
    inherited_attrs = copy.deepcopy(cls._attributes)
    cls._attributes = inherited_attrs
    cls._register_attributes()

    # Same procedure for the trace template
    inherited_traces = copy.deepcopy(cls._traces)
    cls._traces = inherited_traces
    cls._register_traces()
184 """ Returns the type of the Resource Manager
def get_attributes(cls):
    """ Returns a copy of the attributes

    :return: list holding a deep copy of every attribute template
        registered on the class, so callers cannot mutate the templates
    :rtype: list
    """
    # list() first: on Python 3 dict.values() is a view object, which
    # copy.deepcopy cannot handle (TypeError). On Python 2 values() is
    # already a list, so the result is unchanged there.
    return copy.deepcopy(list(cls._attributes.values()))
198 """ Returns a copy of the traces
201 return copy.deepcopy(cls._traces.values())
205 """ Returns the description of the type of Resource
211 def get_backend(cls):
212 """ Returns the identified of the backend (i.e. testbed, environment)
218 def __init__(self, ec, guid):
219 super(ResourceManager, self).__init__(self.rtype())
222 self._ec = weakref.ref(ec)
223 self._connections = set()
224 self._conditions = dict()
226 # the resource instance gets a copy of all attributes
227 self._attrs = copy.deepcopy(self._attributes)
229 # the resource instance gets a copy of all traces
230 self._trcs = copy.deepcopy(self._traces)
232 # Each resource is placed on a deployment group by the EC
234 self.deployment_group = None
236 self._start_time = None
237 self._stop_time = None
238 self._discover_time = None
239 self._provision_time = None
240 self._ready_time = None
241 self._release_time = None
242 self._finish_time = None
243 self._failed_time = None
245 self._state = ResourceState.NEW
249 """ Returns the global unique identifier of the RM """
254 """ Returns the Experiment Controller """
def connections(self):
    """ Set of guids of the RMs this RM is connected to.

    The internal set is returned as is, not a copy.
    """
    conns = self._connections
    return conns
def conditions(self):
    """ Conditions this RM is subjected to.

    The returned object is the internal dictionary. It is indexed by
    action, and each value is a list of (group, state, time) tuples as
    added by register_condition.
    """
    conds = self._conditions
    return conds
def start_time(self):
    """ Timestamp of the moment the RM was marked STARTED (None if never) """
    stamp = self._start_time
    return stamp
277 """ Returns the stop time of the RM as a timestamp"""
278 return self._stop_time
def discover_time(self):
    """ Timestamp of the moment discovery finished for the RM """
    stamp = self._discover_time
    return stamp
def provision_time(self):
    """ Timestamp of the moment provisioning finished for the RM """
    stamp = self._provision_time
    return stamp
def ready_time(self):
    """ Timestamp of the moment deployment finished (RM became READY) """
    stamp = self._ready_time
    return stamp
def release_time(self):
    """ Timestamp of the moment the RM was marked RELEASED """
    stamp = self._release_time
    return stamp
def finish_time(self):
    """ Timestamp of the moment the RM was marked FINISHED """
    stamp = self._finish_time
    return stamp
def failed_time(self):
    """ Timestamp of the moment the RM was marked FAILED """
    stamp = self._failed_time
    return stamp
312 """ Get the state of the current RM """
def log_message(self, msg):
    """ Prefix a log message with the RM type and guid.

    :param msg: text message
    :type msg: str
    :rtype: str
    """
    fields = (self._rtype, self.guid, msg)
    return " %s guid: %d - %s " % fields
324 def register_connection(self, guid):
325 """ Registers a connection to the RM identified by guid
327 :param guid: Global unique identified of the RM to connect to
330 if self.valid_connection(guid):
332 self._connections.add(guid)
def unregister_connection(self, guid):
    """ Remove a previously registered connection to the RM 'guid'.

    disconnect(guid) is invoked before the guid is dropped from the
    connection set. Unknown guids are silently ignored.

    :param guid: Global unique identifier of the connected RM
    :type guid: int
    """
    if guid not in self._connections:
        return

    self.disconnect(guid)
    self._connections.remove(guid)
345 """ Performs resource discovery.
347 This method is resposible for selecting an individual resource
348 matching user requirements.
349 This method should be redefined when necessary in child classes.
351 self.set_discovered()
354 """ Performs resource provisioning.
356 This method is resposible for provisioning one resource.
357 After this method has been successfully invoked, the resource
358 should be acccesible/controllable by the RM.
359 This method should be redefined when necessary in child classes.
361 self.set_provisioned()
364 """ Starts the resource.
366 There is no generic start behavior for all resources.
367 This method should be redefined when necessary in child classes.
369 if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
370 self.error("Wrong state %s for start" % self.state)
376 """ Stops the resource.
378 There is no generic stop behavior for all resources.
379 This method should be redefined when necessary in child classes.
381 if not self.state in [ResourceState.STARTED]:
382 self.error("Wrong state %s for stop" % self.state)
388 """ Execute all steps required for the RM to reach the state READY
391 if self.state > ResourceState.READY:
392 self.error("Wrong state %s for deploy" % self.state)
395 self.debug("----- READY ---- ")
406 self.ec.set_rm_failure()
408 def set(self, name, value):
409 """ Set the value of the attribute
411 :param name: Name of the attribute
413 :param name: Value of the attribute
416 attr = self._attrs[name]
420 """ Returns the value of the attribute
422 :param name: Name of the attribute
426 attr = self._attrs[name]
429 def enable_trace(self, name):
430 """ Explicitly enable trace generation
432 :param name: Name of the trace
435 trace = self._trcs[name]
438 def trace_enabled(self, name):
439 """Returns True if trace is enables
441 :param name: Name of the trace
444 trace = self._trcs[name]
447 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
448 """ Get information on collected trace
450 :param name: Name of the trace
453 :param attr: Can be one of:
454 - TraceAttr.ALL (complete trace content),
455 - TraceAttr.STREAM (block in bytes to read starting at offset),
456 - TraceAttr.PATH (full path to the trace file),
457 - TraceAttr.SIZE (size of trace file).
460 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
463 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
470 def register_condition(self, action, group, state, time = None):
471 """ Registers a condition on the resource manager to allow execution
472 of 'action' only after 'time' has elapsed from the moment all resources
473 in 'group' reached state 'state'
475 :param action: Action to restrict to condition (either 'START' or 'STOP')
477 :param group: Group of RMs to wait for (list of guids)
478 :type group: int or list of int
479 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
481 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
486 if not action in self.conditions:
487 self._conditions[action] = list()
489 conditions = self.conditions.get(action)
491 # For each condition to register a tuple of (group, state, time) is
492 # added to the 'action' list
493 if not isinstance(group, list):
496 conditions.append((group, state, time))
498 def unregister_condition(self, group, action = None):
499 """ Removed conditions for a certain group of guids
501 :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
504 :param group: Group of RMs to wait for (list of guids)
505 :type group: int or list of int
508 # For each condition a tuple of (group, state, time) is
509 # added to the 'action' list
510 if not isinstance(group, list):
513 for act, conditions in self.conditions.iteritems():
514 if action and act != action:
517 for condition in list(conditions):
518 (grp, state, time) = condition
520 # If there is an intersection between grp and group,
521 # then remove intersected elements
522 intsec = set(group).intersection(set(grp))
524 idx = conditions.index(condition)
526 newgrp.difference_update(intsec)
527 conditions[idx] = (newgrp, state, time)
529 def get_connected(self, rtype = None):
530 """ Returns the list of RM with the type 'rtype'
532 :param rtype: Type of the RM we look for
534 :return: list of guid
537 rclass = ResourceFactory.get_resource_type(rtype)
538 for guid in self.connections:
539 rm = self.ec.get_resource(guid)
540 if not rtype or isinstance(rm, rclass):
544 def _needs_reschedule(self, group, state, time):
545 """ Internal method that verify if 'time' has elapsed since
546 all elements in 'group' have reached state 'state'.
548 :param group: Group of RMs to wait for (list of guids)
549 :type group: int or list of int
550 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
552 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
555 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
556 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
557 For the moment, 2m30s is not a correct syntax.
561 delay = reschedule_delay
563 # check state and time elapsed on all RMs
565 rm = self.ec.get_resource(guid)
566 # If the RM state is lower than the requested state we must
567 # reschedule (e.g. if RM is READY but we required STARTED).
572 # If there is a time restriction, we must verify the
573 # restriction is satisfied
575 if state == ResourceState.DISCOVERED:
577 if state == ResourceState.PROVISIONED:
578 t = rm.provision_time
579 elif state == ResourceState.READY:
581 elif state == ResourceState.STARTED:
583 elif state == ResourceState.STOPPED:
588 # time already elapsed since RM changed state
589 waited = "%fs" % tdiffsec(tnow(), t)
592 wait = tdiffsec(stabsformat(time), stabsformat(waited))
599 return reschedule, delay
601 def set_with_conditions(self, name, value, group, state, time):
602 """ Set value 'value' on attribute with name 'name' when 'time'
603 has elapsed since all elements in 'group' have reached state
606 :param name: Name of the attribute to set
608 :param name: Value of the attribute to set
610 :param group: Group of RMs to wait for (list of guids)
611 :type group: int or list of int
612 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
614 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
619 delay = reschedule_delay
621 ## evaluate if set conditions are met
623 # only can set with conditions after the RM is started
624 if self.state != ResourceState.STARTED:
627 reschedule, delay = self._needs_reschedule(group, state, time)
630 callback = functools.partial(self.set_with_conditions,
631 name, value, group, state, time)
632 self.ec.schedule(delay, callback)
634 self.set(name, value)
636 def start_with_conditions(self):
637 """ Starts RM when all the conditions in self.conditions for
638 action 'START' are satisfied.
642 delay = reschedule_delay
644 ## evaluate if set conditions are met
646 # only can start when RM is either STOPPED or READY
647 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
649 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
651 start_conditions = self.conditions.get(ResourceAction.START, [])
653 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
655 # Verify all start conditions are met
656 for (group, state, time) in start_conditions:
657 # Uncomment for debug
660 # rm = self.ec.get_resource(guid)
661 # unmet.append((guid, rm._state))
663 #self.debug("---- WAITED STATES ---- %s" % unmet )
665 reschedule, delay = self._needs_reschedule(group, state, time)
670 self.ec.schedule(delay, self.start_with_conditions)
672 self.debug("----- STARTING ---- ")
675 def stop_with_conditions(self):
676 """ Stops RM when all the conditions in self.conditions for
677 action 'STOP' are satisfied.
681 delay = reschedule_delay
683 ## evaluate if set conditions are met
685 # only can stop when RM is STARTED
686 if self.state != ResourceState.STARTED:
689 self.debug(" ---- STOP CONDITIONS ---- %s" %
690 self.conditions.get(ResourceAction.STOP))
692 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
693 for (group, state, time) in stop_conditions:
694 reschedule, delay = self._needs_reschedule(group, state, time)
699 callback = functools.partial(self.stop_with_conditions)
700 self.ec.schedule(delay, callback)
702 self.debug(" ----- STOPPING ---- ")
705 def deploy_with_conditions(self):
706 """ Deploy RM when all the conditions in self.conditions for
707 action 'READY' are satisfied.
711 delay = reschedule_delay
713 ## evaluate if set conditions are met
715 # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
716 if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
717 ResourceState.PROVISIONED]:
719 self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
721 deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
723 self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions)
725 # Verify all start conditions are met
726 for (group, state, time) in deploy_conditions:
727 # Uncomment for debug
730 # rm = self.ec.get_resource(guid)
731 # unmet.append((guid, rm._state))
733 #self.debug("---- WAITED STATES ---- %s" % unmet )
735 reschedule, delay = self._needs_reschedule(group, state, time)
740 self.ec.schedule(delay, self.deploy_with_conditions)
742 self.debug("----- STARTING ---- ")
745 def connect(self, guid):
746 """ Performs actions that need to be taken upon associating RMs.
747 This method should be redefined when necessary in child classes.
751 def disconnect(self, guid):
752 """ Performs actions that need to be taken upon disassociating RMs.
753 This method should be redefined when necessary in child classes.
757 def valid_connection(self, guid):
758 """Checks whether a connection with the other RM
760 This method need to be redefined by each new Resource Manager.
762 :param guid: Guid of the current Resource Manager
def set_started(self):
    """ Move the ResourceManager to STARTED, recording when it happened """
    timestamp = tnow()
    self._start_time = timestamp
    self._state = ResourceState.STARTED
def set_stopped(self):
    """ Move the ResourceManager to STOPPED, recording when it happened """
    timestamp = tnow()
    self._stop_time = timestamp
    self._state = ResourceState.STOPPED
781 """ Mark ResourceManager as READY """
782 self._ready_time = tnow()
783 self._state = ResourceState.READY
def set_released(self):
    """ Move the ResourceManager to RELEASED, recording when it happened """
    timestamp = tnow()
    self._release_time = timestamp
    self._state = ResourceState.RELEASED
def set_finished(self):
    """ Move the ResourceManager to FINISHED, recording when it happened """
    timestamp = tnow()
    self._finish_time = timestamp
    self._state = ResourceState.FINISHED
def set_failed(self):
    """ Move the ResourceManager to FAILED, recording when it happened """
    timestamp = tnow()
    self._failed_time = timestamp
    self._state = ResourceState.FAILED
def set_discovered(self):
    """ Move the ResourceManager to DISCOVERED, recording when it happened """
    timestamp = tnow()
    self._discover_time = timestamp
    self._state = ResourceState.DISCOVERED
def set_provisioned(self):
    """ Move the ResourceManager to PROVISIONED, recording when it happened """
    timestamp = tnow()
    self._provision_time = timestamp
    self._state = ResourceState.PROVISIONED
class ResourceFactory(object):
    """ Registry of the available ResourceManager classes, indexed by
    their resource type name (the value returned by ``rclass.rtype()``).

    All methods are classmethods operating on a single registry that is
    shared through the class attribute ``_resource_types``.
    """
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """ Return the dictionary mapping resource type names to
        ResourceManager classes.

        The registry itself is returned, not a copy.
        """
        return cls._resource_types

    @classmethod
    def get_resource_type(cls, rtype):
        """ Return the ResourceManager class registered for 'rtype',
        or None when no class is registered under that name.
        """
        return cls._resource_types.get(rtype)

    @classmethod
    def register_type(cls, rclass):
        """ Register a new Resource Manager class under the name returned
        by ``rclass.rtype()``. Registering a second class under the same
        name replaces the first one.
        """
        cls._resource_types[rclass.rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Create a new instance of the Resource Manager registered for
        'rtype', built as ``rclass(ec, guid)``.

        :raises KeyError: if no class is registered for 'rtype'
        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)
def populate_factory():
    """ Register in the ResourceFactory every Resource Manager class
    returned by find_types().

    If the factory already holds at least one type, nothing is done.
    """
    # Once the factory is populated, don't repopulate
    if ResourceFactory.resource_types():
        return

    for rclass in find_types():
        ResourceFactory.register_type(rclass)
843 """Look into the different folders to find all the
844 availables Resources Managers
846 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
847 search_path = set(search_path.split(" "))
850 import nepi.resources
851 path = os.path.dirname(nepi.resources.__file__)
852 search_path.add(path)
856 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
857 prefix = "nepi.resources."):
859 loader = importer.find_module(modname)
862 # Notice: Repeated calls to load_module will act as a reload of teh module
863 if modname in sys.modules:
864 module = sys.modules.get(modname)
866 module = loader.load_module(modname)
868 for attrname in dir(module):
869 if attrname.startswith("_"):
872 attr = getattr(module, attrname)
874 if attr == ResourceManager:
877 if not inspect.isclass(attr):
880 if issubclass(attr, ResourceManager):
883 if not modname in sys.modules:
884 sys.modules[modname] = module
889 err = traceback.format_exc()
890 logger = logging.getLogger("Resource.find_types()")
891 logger.error("Error while loading Resource Managers %s" % err)