2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
32 reschedule_delay = "1s"
35 """ Action that a user can order to a Resource Manager
43 """ State of a Resource Manager
56 ResourceState2str = dict({
57 ResourceState.NEW : "NEW",
58 ResourceState.DISCOVERED : "DISCOVERED",
59 ResourceState.PROVISIONED : "PROVISIONED",
60 ResourceState.READY : "READY",
61 ResourceState.STARTED : "STARTED",
62 ResourceState.STOPPED : "STOPPED",
63 ResourceState.FINISHED : "FINISHED",
64 ResourceState.FAILED : "FAILED",
65 ResourceState.RELEASED : "RELEASED",
69 """ Initializes template information (i.e. attributes and traces)
70 for the ResourceManager class
75 def clsinit_copy(cls):
76 """ Initializes template information (i.e. attributes and traces)
77 for the ResourceManager class, inheriting attributes and traces
83 # Decorator to invoke class initialization method
85 class ResourceManager(Logger):
86 """ Base clase for all ResourceManagers.
88 A ResourceManger is specific to a resource type (e.g. Node,
89 Switch, Application, etc) on a specific backend (e.g. PlanetLab,
92 The ResourceManager instances are responsible for interacting with
93 and controlling concrete (physical or virtual) resources in the
94 experimental backends.
104 def _register_attribute(cls, attr):
105 """ Resource subclasses will invoke this method to add a
109 cls._attributes[attr.name] = attr
112 def _remove_attribute(cls, name):
113 """ Resource subclasses will invoke this method to remove a
117 del cls._attributes[name]
120 def _register_trace(cls, trace):
121 """ Resource subclasses will invoke this method to add a
125 cls._traces[trace.name] = trace
128 def _remove_trace(cls, name):
129 """ Resource subclasses will invoke this method to remove a
133 del cls._traces[name]
136 def _register_attributes(cls):
137 """ Resource subclasses will invoke this method to register
144 def _register_traces(cls):
145 """ Resource subclasses will invoke this method to register
153 """ ResourceManager child classes have different attributes and traces.
154 Since the templates that hold the information of attributes and traces
155 are 'class attribute' dictionaries, initially they all point to the
156 parent class ResourceManager instances of those dictionaries.
157 In order to make these templates independent from the parent's one,
158 it is necessary re-initialize the corresponding dictionaries.
159 This is the objective of the _clsinit method
161 # static template for resource attributes
162 cls._attributes = dict()
163 cls._register_attributes()
165 # static template for resource traces
167 cls._register_traces()
170 def _clsinit_copy(cls):
171 """ Same as _clsinit, except that it also inherits all attributes and traces
172 from the parent class.
174 # static template for resource attributes
175 cls._attributes = copy.deepcopy(cls._attributes)
176 cls._register_attributes()
178 # static template for resource traces
179 cls._traces = copy.deepcopy(cls._traces)
180 cls._register_traces()
184 """ Returns the type of the Resource Manager
190 def get_attributes(cls):
191 """ Returns a copy of the attributes
194 return copy.deepcopy(cls._attributes.values())
198 """ Returns a copy of the traces
201 return copy.deepcopy(cls._traces.values())
205 """ Returns the description of the type of Resource
211 def get_backend(cls):
212 """ Returns the identified of the backend (i.e. testbed, environment)
218 def __init__(self, ec, guid):
219 super(ResourceManager, self).__init__(self.rtype())
222 self._ec = weakref.ref(ec)
223 self._connections = set()
224 self._conditions = dict()
226 # the resource instance gets a copy of all attributes
227 self._attrs = copy.deepcopy(self._attributes)
229 # the resource instance gets a copy of all traces
230 self._trcs = copy.deepcopy(self._traces)
232 # Each resource is placed on a deployment group by the EC
234 self.deployment_group = None
236 self._start_time = None
237 self._stop_time = None
238 self._discover_time = None
239 self._provision_time = None
240 self._ready_time = None
241 self._release_time = None
242 self._finish_time = None
243 self._failed_time = None
245 self._state = ResourceState.NEW
249 """ Returns the global unique identifier of the RM """
254 """ Returns the Experiment Controller """
258 def connections(self):
259 """ Returns the set of guids of connected RMs"""
260 return self._connections
263 def conditions(self):
264 """ Returns the conditions to which the RM is subjected to.
266 The object returned by this method is a dictionary indexed by
268 return self._conditions
271 def start_time(self):
272 """ Returns the start time of the RM as a timestamp"""
273 return self._start_time
277 """ Returns the stop time of the RM as a timestamp"""
278 return self._stop_time
281 def discover_time(self):
282 """ Returns the time discovering was finished for the RM as a timestamp"""
283 return self._discover_time
286 def provision_time(self):
287 """ Returns the time provisioning was finished for the RM as a timestamp"""
288 return self._provision_time
291 def ready_time(self):
292 """ Returns the time deployment was finished for the RM as a timestamp"""
293 return self._ready_time
296 def release_time(self):
297 """ Returns the release time of the RM as a timestamp"""
298 return self._release_time
301 def finish_time(self):
302 """ Returns the finalization time of the RM as a timestamp"""
303 return self._finish_time
306 def failed_time(self):
307 """ Returns the time failure occured for the RM as a timestamp"""
308 return self._failed_time
312 """ Get the state of the current RM """
315 def log_message(self, msg):
316 """ Returns the log message formatted with added information.
318 :param msg: text message
322 return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
324 def register_connection(self, guid):
325 """ Registers a connection to the RM identified by guid
327 :param guid: Global unique identified of the RM to connect to
330 if self.valid_connection(guid):
332 self._connections.add(guid)
334 def unregister_connection(self, guid):
335 """ Removes a registered connection to the RM identified by guid
337 :param guid: Global unique identified of the RM to connect to
340 if guid in self._connections:
341 self.disconnect(guid)
342 self._connections.remove(guid)
345 """ Performs resource discovery.
347 This method is resposible for selecting an individual resource
348 matching user requirements.
349 This method should be redefined when necessary in child classes.
351 self.set_discovered()
354 """ Performs resource provisioning.
356 This method is resposible for provisioning one resource.
357 After this method has been successfully invoked, the resource
358 should be acccesible/controllable by the RM.
359 This method should be redefined when necessary in child classes.
361 self.set_provisioned()
364 """ Starts the resource.
366 There is no generic start behavior for all resources.
367 This method should be redefined when necessary in child classes.
369 if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
370 self.error("Wrong state %s for start" % self.state)
376 """ Stops the resource.
378 There is no generic stop behavior for all resources.
379 This method should be redefined when necessary in child classes.
381 if not self.state in [ResourceState.STARTED]:
382 self.error("Wrong state %s for stop" % self.state)
388 """ Execute all steps required for the RM to reach the state READY
391 if self.state > ResourceState.READY:
392 self.error("Wrong state %s for deploy" % self.state)
395 self.debug("----- READY ---- ")
407 def set(self, name, value):
408 """ Set the value of the attribute
410 :param name: Name of the attribute
412 :param name: Value of the attribute
415 attr = self._attrs[name]
419 """ Returns the value of the attribute
421 :param name: Name of the attribute
425 attr = self._attrs[name]
428 def enable_trace(self, name):
429 """ Explicitly enable trace generation
431 :param name: Name of the trace
434 trace = self._trcs[name]
437 def trace_enabled(self, name):
438 """Returns True if trace is enables
440 :param name: Name of the trace
443 trace = self._trcs[name]
446 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
447 """ Get information on collected trace
449 :param name: Name of the trace
452 :param attr: Can be one of:
453 - TraceAttr.ALL (complete trace content),
454 - TraceAttr.STREAM (block in bytes to read starting at offset),
455 - TraceAttr.PATH (full path to the trace file),
456 - TraceAttr.SIZE (size of trace file).
459 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
462 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
469 def register_condition(self, action, group, state, time = None):
470 """ Registers a condition on the resource manager to allow execution
471 of 'action' only after 'time' has elapsed from the moment all resources
472 in 'group' reached state 'state'
474 :param action: Action to restrict to condition (either 'START' or 'STOP')
476 :param group: Group of RMs to wait for (list of guids)
477 :type group: int or list of int
478 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
480 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
485 if not action in self.conditions:
486 self._conditions[action] = list()
488 conditions = self.conditions.get(action)
490 # For each condition to register a tuple of (group, state, time) is
491 # added to the 'action' list
492 if not isinstance(group, list):
495 conditions.append((group, state, time))
497 def unregister_condition(self, group, action = None):
498 """ Removed conditions for a certain group of guids
500 :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
503 :param group: Group of RMs to wait for (list of guids)
504 :type group: int or list of int
507 # For each condition a tuple of (group, state, time) is
508 # added to the 'action' list
509 if not isinstance(group, list):
512 for act, conditions in self.conditions.iteritems():
513 if action and act != action:
516 for condition in list(conditions):
517 (grp, state, time) = condition
519 # If there is an intersection between grp and group,
520 # then remove intersected elements
521 intsec = set(group).intersection(set(grp))
523 idx = conditions.index(condition)
525 newgrp.difference_update(intsec)
526 conditions[idx] = (newgrp, state, time)
528 def get_connected(self, rtype = None):
529 """ Returns the list of RM with the type 'rtype'
531 :param rtype: Type of the RM we look for
533 :return: list of guid
536 rclass = ResourceFactory.get_resource_type(rtype)
537 for guid in self.connections:
538 rm = self.ec.get_resource(guid)
539 if not rtype or isinstance(rm, rclass):
543 def _needs_reschedule(self, group, state, time):
544 """ Internal method that verify if 'time' has elapsed since
545 all elements in 'group' have reached state 'state'.
547 :param group: Group of RMs to wait for (list of guids)
548 :type group: int or list of int
549 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
551 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
554 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
555 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
556 For the moment, 2m30s is not a correct syntax.
560 delay = reschedule_delay
562 # check state and time elapsed on all RMs
564 rm = self.ec.get_resource(guid)
565 # If the RM state is lower than the requested state we must
566 # reschedule (e.g. if RM is READY but we required STARTED).
571 # If there is a time restriction, we must verify the
572 # restriction is satisfied
574 if state == ResourceState.DISCOVERED:
576 if state == ResourceState.PROVISIONED:
577 t = rm.provision_time
578 elif state == ResourceState.READY:
580 elif state == ResourceState.STARTED:
582 elif state == ResourceState.STOPPED:
587 # time already elapsed since RM changed state
588 waited = "%fs" % tdiffsec(tnow(), t)
591 wait = tdiffsec(stabsformat(time), stabsformat(waited))
598 return reschedule, delay
600 def set_with_conditions(self, name, value, group, state, time):
601 """ Set value 'value' on attribute with name 'name' when 'time'
602 has elapsed since all elements in 'group' have reached state
605 :param name: Name of the attribute to set
607 :param name: Value of the attribute to set
609 :param group: Group of RMs to wait for (list of guids)
610 :type group: int or list of int
611 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
613 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
618 delay = reschedule_delay
620 ## evaluate if set conditions are met
622 # only can set with conditions after the RM is started
623 if self.state != ResourceState.STARTED:
626 reschedule, delay = self._needs_reschedule(group, state, time)
629 callback = functools.partial(self.set_with_conditions,
630 name, value, group, state, time)
631 self.ec.schedule(delay, callback)
633 self.set(name, value)
635 def start_with_conditions(self):
636 """ Starts RM when all the conditions in self.conditions for
637 action 'START' are satisfied.
641 delay = reschedule_delay
643 ## evaluate if set conditions are met
645 # only can start when RM is either STOPPED or READY
646 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
648 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
650 start_conditions = self.conditions.get(ResourceAction.START, [])
652 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
654 # Verify all start conditions are met
655 for (group, state, time) in start_conditions:
656 # Uncomment for debug
659 # rm = self.ec.get_resource(guid)
660 # unmet.append((guid, rm._state))
662 #self.debug("---- WAITED STATES ---- %s" % unmet )
664 reschedule, delay = self._needs_reschedule(group, state, time)
669 self.ec.schedule(delay, self.start_with_conditions)
671 self.debug("----- STARTING ---- ")
674 def stop_with_conditions(self):
675 """ Stops RM when all the conditions in self.conditions for
676 action 'STOP' are satisfied.
680 delay = reschedule_delay
682 ## evaluate if set conditions are met
684 # only can stop when RM is STARTED
685 if self.state != ResourceState.STARTED:
688 self.debug(" ---- STOP CONDITIONS ---- %s" %
689 self.conditions.get(ResourceAction.STOP))
691 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
692 for (group, state, time) in stop_conditions:
693 reschedule, delay = self._needs_reschedule(group, state, time)
698 callback = functools.partial(self.stop_with_conditions)
699 self.ec.schedule(delay, callback)
701 self.debug(" ----- STOPPING ---- ")
704 def deploy_with_conditions(self):
705 """ Deploy RM when all the conditions in self.conditions for
706 action 'READY' are satisfied.
710 delay = reschedule_delay
712 ## evaluate if set conditions are met
714 # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED
715 if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
716 ResourceState.PROVISIONED]:
718 self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
720 deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
722 self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions)
724 # Verify all start conditions are met
725 for (group, state, time) in deploy_conditions:
726 # Uncomment for debug
729 # rm = self.ec.get_resource(guid)
730 # unmet.append((guid, rm._state))
732 #self.debug("---- WAITED STATES ---- %s" % unmet )
734 reschedule, delay = self._needs_reschedule(group, state, time)
739 self.ec.schedule(delay, self.deploy_with_conditions)
741 self.debug("----- STARTING ---- ")
745 def connect(self, guid):
746 """ Performs actions that need to be taken upon associating RMs.
747 This method should be redefined when necessary in child classes.
751 def disconnect(self, guid):
752 """ Performs actions that need to be taken upon disassociating RMs.
753 This method should be redefined when necessary in child classes.
757 def valid_connection(self, guid):
758 """Checks whether a connection with the other RM
760 This method need to be redefined by each new Resource Manager.
762 :param guid: Guid of the current Resource Manager
770 def set_started(self):
771 """ Mark ResourceManager as STARTED """
772 self._start_time = tnow()
773 self._state = ResourceState.STARTED
775 def set_stopped(self):
776 """ Mark ResourceManager as STOPPED """
777 self._stop_time = tnow()
778 self._state = ResourceState.STOPPED
781 """ Mark ResourceManager as READY """
782 self._ready_time = tnow()
783 self._state = ResourceState.READY
785 def set_released(self):
786 """ Mark ResourceManager as REALEASED """
787 self._release_time = tnow()
788 self._state = ResourceState.RELEASED
790 def set_finished(self):
791 """ Mark ResourceManager as FINISHED """
792 self._finish_time = tnow()
793 self._state = ResourceState.FINISHED
795 def set_failed(self):
796 """ Mark ResourceManager as FAILED """
797 self._failed_time = tnow()
798 self._state = ResourceState.FAILED
800 def set_discovered(self):
801 """ Mark ResourceManager as DISCOVERED """
802 self._discover_time = tnow()
803 self._state = ResourceState.DISCOVERED
805 def set_provisioned(self):
806 """ Mark ResourceManager as PROVISIONED """
807 self._provision_time = tnow()
808 self._state = ResourceState.PROVISIONED
810 class ResourceFactory(object):
811 _resource_types = dict()
814 def resource_types(cls):
815 """Return the type of the Class"""
816 return cls._resource_types
819 def get_resource_type(cls, rtype):
820 """Return the type of the Class"""
821 return cls._resource_types.get(rtype)
824 def register_type(cls, rclass):
825 """Register a new Ressource Manager"""
826 cls._resource_types[rclass.rtype()] = rclass
829 def create(cls, rtype, ec, guid):
830 """Create a new instance of a Ressource Manager"""
831 rclass = cls._resource_types[rtype]
832 return rclass(ec, guid)
834 def populate_factory():
835 """Register all the possible RM that exists in the current version of Nepi.
837 # Once the factory is populated, don't repopulate
838 if not ResourceFactory.resource_types():
839 for rclass in find_types():
840 ResourceFactory.register_type(rclass)
843 """Look into the different folders to find all the
844 availables Resources Managers
846 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
847 search_path = set(search_path.split(" "))
850 import nepi.resources
851 path = os.path.dirname(nepi.resources.__file__)
852 search_path.add(path)
856 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
857 prefix = "nepi.resources."):
859 loader = importer.find_module(modname)
862 # Notice: Repeated calls to load_module will act as a reload of teh module
863 if modname in sys.modules:
864 module = sys.modules.get(modname)
866 module = loader.load_module(modname)
868 for attrname in dir(module):
869 if attrname.startswith("_"):
872 attr = getattr(module, attrname)
874 if attr == ResourceManager:
877 if not inspect.isclass(attr):
880 if issubclass(attr, ResourceManager):
883 if not modname in sys.modules:
884 sys.modules[modname] = module
889 err = traceback.format_exc()
890 logger = logging.getLogger("Resource.find_types()")
891 logger.error("Error while loading Resource Managers %s" % err)