2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
# Default delay (a time string, e.g. "1s") after which a conditional action
# (set/start/stop "with conditions") is re-scheduled while its conditions
# are not yet satisfied.
reschedule_delay = "1s"
34 """ Action that a user can order to a Resource Manager
42 """ State of a Resource Manager
55 ResourceState2str = dict({
56 ResourceState.NEW : "NEW",
57 ResourceState.DISCOVERED : "DISCOVERED",
58 ResourceState.PROVISIONED : "PROVISIONED",
59 ResourceState.READY : "READY",
60 ResourceState.STARTED : "STARTED",
61 ResourceState.STOPPED : "STOPPED",
62 ResourceState.FINISHED : "FINISHED",
63 ResourceState.FAILED : "FAILED",
64 ResourceState.RELEASED : "RELEASED",
68 """ Initializes template information (i.e. attributes and traces)
69 for the ResourceManager class
74 def clsinit_copy(cls):
75 """ Initializes template information (i.e. attributes and traces)
76 for the ResourceManager class, inheriting attributes and traces
82 # Decorator to invoke class initialization method
84 class ResourceManager(Logger):
90 def _register_attribute(cls, attr):
91 """ Resource subclasses will invoke this method to add a
95 cls._attributes[attr.name] = attr
98 def _remove_attribute(cls, name):
99 """ Resource subclasses will invoke this method to remove a
103 del cls._attributes[name]
106 def _register_trace(cls, trace):
107 """ Resource subclasses will invoke this method to add a
111 cls._traces[trace.name] = trace
114 def _remove_trace(cls, name):
115 """ Resource subclasses will invoke this method to remove a
119 del cls._traces[name]
122 def _register_attributes(cls):
123 """ Resource subclasses will invoke this method to register
130 def _register_traces(cls):
131 """ Resource subclasses will invoke this method to register
139 """ ResourceManager child classes have different attributes and traces.
140 Since the templates that hold the information of attributes and traces
141 are 'class attribute' dictionaries, initially they all point to the
142 parent class ResourceManager instances of those dictionaries.
143 In order to make these templates independent from the parent's one,
144 it is necessary re-initialize the corresponding dictionaries.
145 This is the objective of the _clsinit method
147 # static template for resource attributes
148 cls._attributes = dict()
149 cls._register_attributes()
151 # static template for resource traces
153 cls._register_traces()
156 def _clsinit_copy(cls):
157 """ Same as _clsinit, except that it also inherits all attributes and traces
158 from the parent class.
160 # static template for resource attributes
161 cls._attributes = copy.deepcopy(cls._attributes)
162 cls._register_attributes()
164 # static template for resource traces
165 cls._traces = copy.deepcopy(cls._traces)
166 cls._register_traces()
170 """ Returns the type of the Resource Manager
176 def get_attributes(cls):
177 """ Returns a copy of the attributes
180 return copy.deepcopy(cls._attributes.values())
184 """ Returns a copy of the traces
187 return copy.deepcopy(cls._traces.values())
189 def __init__(self, ec, guid):
190 super(ResourceManager, self).__init__(self.rtype())
193 self._ec = weakref.ref(ec)
194 self._connections = set()
195 self._conditions = dict()
197 # the resource instance gets a copy of all attributes
198 self._attrs = copy.deepcopy(self._attributes)
200 # the resource instance gets a copy of all traces
201 self._trcs = copy.deepcopy(self._traces)
203 self._state = ResourceState.NEW
205 self.deployment_group = None
207 self._start_time = None
208 self._stop_time = None
209 self._discover_time = None
210 self._provision_time = None
211 self._ready_time = None
212 self._release_time = None
213 self._finish_time = None
214 self._failed_time = None
218 """ Returns the global unique identifier of the RM """
223 """ Returns the Experiment Controller """
def connections(self):
    """ Set of guids of the RMs registered as connected to this RM.

    The RM's own internal set is returned (not a copy), so callers see
    connections added or removed later.
    """
    connected_guids = self._connections
    return connected_guids
232 def conditions(self):
233 """ Returns the conditions to which the RM is subjected to.
235 The object returned by this method is a dictionary indexed by
237 return self._conditions
def start_time(self):
    """ Timestamp recorded when the RM was started.

    :return: the stored start timestamp, or None if it was never set
    """
    started_at = self._start_time
    return started_at
246 """ Returns the stop time of the RM as a timestamp"""
247 return self._stop_time
def discover_time(self):
    """ Timestamp recorded when resource discovery finished for the RM.

    :return: the stored discovery timestamp, or None if it was never set
    """
    discovered_at = self._discover_time
    return discovered_at
def provision_time(self):
    """ Timestamp recorded when resource provisioning finished for the RM.

    :return: the stored provisioning timestamp, or None if it was never set
    """
    provisioned_at = self._provision_time
    return provisioned_at
def ready_time(self):
    """ Timestamp recorded when deployment finished and the RM became READY.

    :return: the stored ready timestamp, or None if it was never set
    """
    ready_at = self._ready_time
    return ready_at
def release_time(self):
    """ Timestamp recorded when the RM released its resources.

    :return: the stored release timestamp, or None if it was never set
    """
    released_at = self._release_time
    return released_at
def finish_time(self):
    """ Timestamp recorded when the RM was marked as FINISHED.

    :return: the stored finalization timestamp, or None if it was never set
    """
    finished_at = self._finish_time
    return finished_at
def failed_time(self):
    """ Timestamp recorded when the RM was marked as FAILED.

    :return: the stored failure timestamp, or None if no failure occurred
    """
    failed_at = self._failed_time
    return failed_at
281 """ Get the state of the current RM """
284 def log_message(self, msg):
285 """ Returns the log message formatted with added information.
287 :param msg: text message
291 return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
293 def register_connection(self, guid):
294 """ Registers a connection to the RM identified by guid
296 :param guid: Global unique identified of the RM to connect to
299 if self.valid_connection(guid):
301 self._connections.add(guid)
303 def unregister_connection(self, guid):
304 """ Removes a registered connection to the RM identified by guid
306 :param guid: Global unique identified of the RM to connect to
309 if guid in self._connections:
310 self.disconnect(guid)
311 self._connections.remove(guid)
314 """ Performs resource discovery.
316 This method is resposible for selecting an individual resource
317 matching user requirements.
318 This method should be redefined when necessary in child classes.
320 self._discover_time = tnow()
321 self._state = ResourceState.DISCOVERED
324 """ Performs resource provisioning.
326 This method is resposible for provisioning one resource.
327 After this method has been successfully invoked, the resource
328 should be acccesible/controllable by the RM.
329 This method should be redefined when necessary in child classes.
331 self._provision_time = tnow()
332 self._state = ResourceState.PROVISIONED
335 """ Starts the resource.
337 There is no generic start behavior for all resources.
338 This method should be redefined when necessary in child classes.
340 if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
341 self.error("Wrong state %s for start" % self.state)
344 self._start_time = tnow()
345 self._state = ResourceState.STARTED
348 """ Stops the resource.
350 There is no generic stop behavior for all resources.
351 This method should be redefined when necessary in child classes.
353 if not self._state in [ResourceState.STARTED]:
354 self.error("Wrong state %s for stop" % self.state)
357 self._stop_time = tnow()
358 self._state = ResourceState.STOPPED
360 def set(self, name, value):
361 """ Set the value of the attribute
363 :param name: Name of the attribute
365 :param name: Value of the attribute
368 attr = self._attrs[name]
372 """ Returns the value of the attribute
374 :param name: Name of the attribute
378 attr = self._attrs[name]
381 def enable_trace(self, name):
382 """ Explicitly enable trace generation
384 :param name: Name of the trace
387 trace = self._trcs[name]
390 def trace_enabled(self, name):
391 """Returns True if trace is enables
393 :param name: Name of the trace
396 trace = self._trcs[name]
399 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
400 """ Get information on collected trace
402 :param name: Name of the trace
405 :param attr: Can be one of:
406 - TraceAttr.ALL (complete trace content),
407 - TraceAttr.STREAM (block in bytes to read starting at offset),
408 - TraceAttr.PATH (full path to the trace file),
409 - TraceAttr.SIZE (size of trace file).
412 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
415 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
422 def register_condition(self, action, group, state, time = None):
423 """ Registers a condition on the resource manager to allow execution
424 of 'action' only after 'time' has elapsed from the moment all resources
425 in 'group' reached state 'state'
427 :param action: Action to restrict to condition (either 'START' or 'STOP')
429 :param group: Group of RMs to wait for (list of guids)
430 :type group: int or list of int
431 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
433 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
438 if not action in self.conditions:
439 self._conditions[action] = list()
441 conditions = self.conditions.get(action)
443 # For each condition to register a tuple of (group, state, time) is
444 # added to the 'action' list
445 if not isinstance(group, list):
448 conditions.append((group, state, time))
450 def unregister_condition(self, group, action = None):
451 """ Removed conditions for a certain group of guids
453 :param action: Action to restrict to condition (either 'START' or 'STOP')
456 :param group: Group of RMs to wait for (list of guids)
457 :type group: int or list of int
460 # For each condition a tuple of (group, state, time) is
461 # added to the 'action' list
462 if not isinstance(group, list):
465 for act, conditions in self.conditions.iteritems():
466 if action and act != action:
469 for condition in list(conditions):
470 (grp, state, time) = condition
472 # If there is an intersection between grp and group,
473 # then remove intersected elements
474 intsec = set(group).intersection(set(grp))
476 idx = conditions.index(condition)
478 newgrp.difference_update(intsec)
479 conditions[idx] = (newgrp, state, time)
481 def get_connected(self, rtype = None):
482 """ Returns the list of RM with the type 'rtype'
484 :param rtype: Type of the RM we look for
486 :return: list of guid
489 rclass = ResourceFactory.get_resource_type(rtype)
490 for guid in self.connections:
491 rm = self.ec.get_resource(guid)
492 if not rtype or isinstance(rm, rclass):
496 def _needs_reschedule(self, group, state, time):
497 """ Internal method that verify if 'time' has elapsed since
498 all elements in 'group' have reached state 'state'.
500 :param group: Group of RMs to wait for (list of guids)
501 :type group: int or list of int
502 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
504 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
507 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
508 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
509 For the moment, 2m30s is not a correct syntax.
513 delay = reschedule_delay
515 # check state and time elapsed on all RMs
517 rm = self.ec.get_resource(guid)
518 # If the RM state is lower than the requested state we must
519 # reschedule (e.g. if RM is READY but we required STARTED).
524 # If there is a time restriction, we must verify the
525 # restriction is satisfied
527 if state == ResourceState.DISCOVERED:
529 if state == ResourceState.PROVISIONED:
530 t = rm.provision_time
531 elif state == ResourceState.READY:
533 elif state == ResourceState.STARTED:
535 elif state == ResourceState.STOPPED:
538 # Only keep time information for START and STOP
541 # time already elapsed since RM changed state
542 waited = "%fs" % tdiffsec(tnow(), t)
545 wait = tdiffsec(stabsformat(time), stabsformat(waited))
552 return reschedule, delay
554 def set_with_conditions(self, name, value, group, state, time):
555 """ Set value 'value' on attribute with name 'name' when 'time'
556 has elapsed since all elements in 'group' have reached state
559 :param name: Name of the attribute to set
561 :param name: Value of the attribute to set
563 :param group: Group of RMs to wait for (list of guids)
564 :type group: int or list of int
565 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
567 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
572 delay = reschedule_delay
574 ## evaluate if set conditions are met
576 # only can set with conditions after the RM is started
577 if self.state != ResourceState.STARTED:
580 reschedule, delay = self._needs_reschedule(group, state, time)
583 callback = functools.partial(self.set_with_conditions,
584 name, value, group, state, time)
585 self.ec.schedule(delay, callback)
587 self.set(name, value)
589 def start_with_conditions(self):
590 """ Starts RM when all the conditions in self.conditions for
591 action 'START' are satisfied.
595 delay = reschedule_delay
597 ## evaluate if set conditions are met
599 # only can start when RM is either STOPPED or READY
600 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
602 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
604 start_conditions = self.conditions.get(ResourceAction.START, [])
606 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
608 # Verify all start conditions are met
609 for (group, state, time) in start_conditions:
610 # Uncomment for debug
613 # rm = self.ec.get_resource(guid)
614 # unmet.append((guid, rm._state))
616 #self.debug("---- WAITED STATES ---- %s" % unmet )
618 reschedule, delay = self._needs_reschedule(group, state, time)
623 self.ec.schedule(delay, self.start_with_conditions)
625 self.debug("----- STARTING ---- ")
628 def stop_with_conditions(self):
629 """ Stops RM when all the conditions in self.conditions for
630 action 'STOP' are satisfied.
634 delay = reschedule_delay
636 ## evaluate if set conditions are met
638 # only can stop when RM is STARTED
639 if self.state != ResourceState.STARTED:
642 self.debug(" ---- STOP CONDITIONS ---- %s" %
643 self.conditions.get(ResourceAction.STOP))
645 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
646 for (group, state, time) in stop_conditions:
647 reschedule, delay = self._needs_reschedule(group, state, time)
652 callback = functools.partial(self.stop_with_conditions)
653 self.ec.schedule(delay, callback)
655 self.debug(" ----- STOPPING ---- ")
659 """ Execute all steps required for the RM to reach the state READY
662 if self._state > ResourceState.READY:
663 self.error("Wrong state %s for deploy" % self.state)
666 self.debug("----- READY ---- ")
667 self._ready_time = tnow()
668 self._state = ResourceState.READY
671 """Release any resources used by this RM
674 self._release_time = tnow()
675 self._state = ResourceState.RELEASED
678 """ Mark ResourceManager as FINISHED
681 self._finish_time = tnow()
682 self._state = ResourceState.FINISHED
685 """ Mark ResourceManager as FAILED
688 self._failed_time = tnow()
689 self._state = ResourceState.FAILED
691 def connect(self, guid):
692 """ Performs actions that need to be taken upon associating RMs.
693 This method should be redefined when necessary in child classes.
697 def disconnect(self, guid):
698 """ Performs actions that need to be taken upon disassociating RMs.
699 This method should be redefined when necessary in child classes.
703 def valid_connection(self, guid):
704 """Checks whether a connection with the other RM
706 This method need to be redefined by each new Resource Manager.
708 :param guid: Guid of the current Resource Manager
716 class ResourceFactory(object):
717 _resource_types = dict()
def resource_types(cls):
    """ Returns the registry of Resource Manager classes known to the factory.

    :return: the factory's own dict (not a copy), mapping the value of each
        registered class's rtype() to that class
    """
    registry = cls._resource_types
    return registry
def get_resource_type(cls, rtype):
    """ Looks up the Resource Manager class registered under 'rtype'.

    :param rtype: Type identifier of the RM (the value of its rtype())
    :return: the registered class, or None when 'rtype' is not registered
    """
    registry = cls._resource_types
    found = registry.get(rtype, None)
    return found
def register_type(cls, rclass):
    """ Registers a Resource Manager class in the factory.

    The class is stored under the key returned by rclass.rtype(); any class
    previously registered under the same key is replaced.

    :param rclass: Resource Manager class to register
    """
    type_name = rclass.rtype()
    cls._resource_types[type_name] = rclass
def create(cls, rtype, ec, guid):
    """ Creates a new instance of the Resource Manager registered for 'rtype'.

    :param rtype: Type identifier of the RM to instantiate
    :param ec: Experiment Controller, passed to the RM constructor
    :param guid: Global unique identifier, passed to the RM constructor
    :return: a new instance of the registered RM class
    :raises KeyError: if no RM class is registered under 'rtype'
    """
    registry = cls._resource_types
    if rtype not in registry:
        # Still a KeyError (as before), but with enough context to tell a
        # misspelled type from an unpopulated factory.
        known = ", ".join(sorted(str(t) for t in registry))
        raise KeyError("Unknown resource type %r (registered types: %s)"
                % (rtype, known or "none"))
    rclass = registry[rtype]
    return rclass(ec, guid)
740 def populate_factory():
741 """Register all the possible RM that exists in the current version of Nepi.
743 # Once the factory is populated, don't repopulate
744 if not ResourceFactory.resource_types():
745 for rclass in find_types():
746 ResourceFactory.register_type(rclass)
749 """Look into the different folders to find all the
750 availables Resources Managers
752 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
753 search_path = set(search_path.split(" "))
756 import nepi.resources
757 path = os.path.dirname(nepi.resources.__file__)
758 search_path.add(path)
762 for importer, modname, ispkg in pkgutil.walk_packages(search_path,
763 prefix = "nepi.resources."):
765 loader = importer.find_module(modname)
768 # Notice: Repeated calls to load_module will act as a reload of teh module
769 module = loader.load_module(modname)
771 for attrname in dir(module):
772 if attrname.startswith("_"):
775 attr = getattr(module, attrname)
777 if attr == ResourceManager:
780 if not inspect.isclass(attr):
783 if issubclass(attr, ResourceManager):
788 err = traceback.format_exc()
789 logger = logging.getLogger("Resource.find_types()")
790 logger.error("Error while loading Resource Managers %s" % err)