2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
31 reschedule_delay = "1s"
34 """ Action that a user can order to a Resource Manager
42 """ State of a Resource Manager
55 ResourceState2str = dict({
56 ResourceState.NEW : "NEW",
57 ResourceState.DISCOVERED : "DISCOVERED",
58 ResourceState.PROVISIONED : "PROVISIONED",
59 ResourceState.READY : "READY",
60 ResourceState.STARTED : "STARTED",
61 ResourceState.STOPPED : "STOPPED",
62 ResourceState.FINISHED : "FINISHED",
63 ResourceState.FAILED : "FAILED",
64 ResourceState.RELEASED : "RELEASED",
68 """ Initializes template information (i.e. attributes and traces)
69 for the ResourceManager class
74 def clsinit_copy(cls):
75 """ Initializes template information (i.e. attributes and traces)
76 for the ResourceManager class, inheriting attributes and traces
82 # Decorator to invoke class initialization method
84 class ResourceManager(Logger):
90 def _register_attribute(cls, attr):
91 """ Resource subclasses will invoke this method to add a
95 cls._attributes[attr.name] = attr
98 def _remove_attribute(cls, name):
99 """ Resource subclasses will invoke this method to remove a
103 del cls._attributes[name]
106 def _register_trace(cls, trace):
107 """ Resource subclasses will invoke this method to add a
111 cls._traces[trace.name] = trace
114 def _remove_trace(cls, name):
115 """ Resource subclasses will invoke this method to remove a
119 del cls._traces[name]
122 def _register_attributes(cls):
123 """ Resource subclasses will invoke this method to register
130 def _register_traces(cls):
131 """ Resource subclasses will invoke this method to register
139 """ ResourceManager child classes have different attributes and traces.
140 Since the templates that hold the information of attributes and traces
141 are 'class attribute' dictionaries, initially they all point to the
142 parent class ResourceManager instances of those dictionaries.
143 In order to make these templates independent from the parent's one,
144 it is necessary to re-initialize the corresponding dictionaries.
145 This is the objective of the _clsinit method
147 # static template for resource attributes
148 cls._attributes = dict()
149 cls._register_attributes()
151 # static template for resource traces
153 cls._register_traces()
156 def _clsinit_copy(cls):
157 """ Same as _clsinit, except that it also inherits all attributes and traces
158 from the parent class.
160 # static template for resource attributes
161 cls._attributes = copy.deepcopy(cls._attributes)
162 cls._register_attributes()
164 # static template for resource traces
165 cls._traces = copy.deepcopy(cls._traces)
166 cls._register_traces()
170 """ Returns the type of the Resource Manager
176 def get_attributes(cls):
177 """ Returns a copy of the attributes
180 return copy.deepcopy(cls._attributes.values())
184 """ Returns a copy of the traces
187 return copy.deepcopy(cls._traces.values())
189 def __init__(self, ec, guid):
190 super(ResourceManager, self).__init__(self.rtype())
193 self._ec = weakref.ref(ec)
194 self._connections = set()
195 self._conditions = dict()
197 # the resource instance gets a copy of all attributes
198 self._attrs = copy.deepcopy(self._attributes)
200 # the resource instance gets a copy of all traces
201 self._trcs = copy.deepcopy(self._traces)
203 self._state = ResourceState.NEW
205 self._start_time = None
206 self._stop_time = None
207 self._discover_time = None
208 self._provision_time = None
209 self._ready_time = None
210 self._release_time = None
211 self._finish_time = None
212 self._failed_time = None
216 """ Returns the global unique identifier of the RM """
221 """ Returns the Experiment Controller """
225 def connections(self):
226 """ Returns the set of guids of connected RMs"""
227 return self._connections
230 def conditions(self):
231 """ Returns the conditions to which the RM is subjected.
233 The object returned by this method is a dictionary indexed by
235 return self._conditions
238 def start_time(self):
239 """ Returns the start time of the RM as a timestamp"""
240 return self._start_time
244 """ Returns the stop time of the RM as a timestamp"""
245 return self._stop_time
248 def discover_time(self):
249 """ Returns the time discovering was finished for the RM as a timestamp"""
250 return self._discover_time
253 def provision_time(self):
254 """ Returns the time provisioning was finished for the RM as a timestamp"""
255 return self._provision_time
258 def ready_time(self):
259 """ Returns the time deployment was finished for the RM as a timestamp"""
260 return self._ready_time
263 def release_time(self):
264 """ Returns the release time of the RM as a timestamp"""
265 return self._release_time
268 def finish_time(self):
269 """ Returns the finalization time of the RM as a timestamp"""
270 return self._finish_time
273 def failed_time(self):
274 """ Returns the time failure occurred for the RM as a timestamp"""
275 return self._failed_time
279 """ Get the state of the current RM """
def log_message(self, msg):
    """ Returns the log message formatted with added information.

    The message is prefixed with the resource type and the guid of
    this RM.

    :param msg: text message
    :return: string of the form " <rtype> guid: <guid> - <msg> "
    """
    # guid is rendered with %d, so it must be an integer
    return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
291 def register_connection(self, guid):
292 """ Registers a connection to the RM identified by guid
294 :param guid: Global unique identifier of the RM to connect to
297 if self.valid_connection(guid):
299 self._connections.add(guid)
def unregister_connection(self, guid):
    """ Removes a registered connection to the RM identified by guid

    Nothing happens when guid is not a registered connection.

    :param guid: Global unique identifier of the RM to disconnect from
    """
    if guid in self._connections:
        # disconnect() runs first: if it raises, the guid stays registered
        self.disconnect(guid)
        self._connections.remove(guid)
312 """ Performs resource discovery.
314 This method is responsible for selecting an individual resource
315 matching user requirements.
316 This method should be redefined when necessary in child classes.
318 self._discover_time = tnow()
319 self._state = ResourceState.DISCOVERED
322 """ Performs resource provisioning.
324 This method is responsible for provisioning one resource.
325 After this method has been successfully invoked, the resource
326 should be accessible/controllable by the RM.
327 This method should be redefined when necessary in child classes.
329 self._provision_time = tnow()
330 self._state = ResourceState.PROVISIONED
333 """ Starts the resource.
335 There is no generic start behavior for all resources.
336 This method should be redefined when necessary in child classes.
338 if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
339 self.error("Wrong state %s for start" % self.state)
342 self._start_time = tnow()
343 self._state = ResourceState.STARTED
346 """ Stops the resource.
348 There is no generic stop behavior for all resources.
349 This method should be redefined when necessary in child classes.
351 if not self._state in [ResourceState.STARTED]:
352 self.error("Wrong state %s for stop" % self.state)
355 self._stop_time = tnow()
356 self._state = ResourceState.STOPPED
358 def set(self, name, value):
359 """ Set the value of the attribute
361 :param name: Name of the attribute
363 :param value: Value of the attribute
366 attr = self._attrs[name]
370 """ Returns the value of the attribute
372 :param name: Name of the attribute
376 attr = self._attrs[name]
379 def register_trace(self, name):
380 """ Explicitly enable trace generation
382 :param name: Name of the trace
385 trace = self._trcs[name]
388 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
389 """ Get information on collected trace
391 :param name: Name of the trace
394 :param attr: Can be one of:
395 - TraceAttr.ALL (complete trace content),
396 - TraceAttr.STREAM (block in bytes to read starting at offset),
397 - TraceAttr.PATH (full path to the trace file),
398 - TraceAttr.SIZE (size of trace file).
401 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
404 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
411 def register_condition(self, action, group, state, time = None):
412 """ Registers a condition on the resource manager to allow execution
413 of 'action' only after 'time' has elapsed from the moment all resources
414 in 'group' reached state 'state'
416 :param action: Action to restrict to condition (either 'START' or 'STOP')
418 :param group: Group of RMs to wait for (list of guids)
419 :type group: int or list of int
420 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
422 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
427 if not action in self.conditions:
428 self._conditions[action] = list()
430 conditions = self.conditions.get(action)
432 # For each condition to register a tuple of (group, state, time) is
433 # added to the 'action' list
434 if not isinstance(group, list):
437 conditions.append((group, state, time))
439 def unregister_condition(self, group, action = None):
440 """ Removes conditions for a certain group of guids
442 :param action: Action to restrict to condition (either 'START' or 'STOP')
445 :param group: Group of RMs to wait for (list of guids)
446 :type group: int or list of int
449 # For each condition a tuple of (group, state, time) is
450 # added to the 'action' list
451 if not isinstance(group, list):
454 for act, conditions in self.conditions.iteritems():
455 if action and act != action:
458 for condition in list(conditions):
459 (grp, state, time) = condition
461 # If there is an intersection between grp and group,
462 # then remove intersected elements
463 intsec = set(group).intersection(set(grp))
465 idx = conditions.index(condition)
467 newgrp.difference_update(intsec)
468 conditions[idx] = (newgrp, state, time)
470 def get_connected(self, rtype = None):
471 """ Returns the list of RM with the type 'rtype'
473 :param rtype: Type of the RM we look for
475 :return: list of guid
478 for guid in self.connections:
479 rm = self.ec.get_resource(guid)
480 if not rtype or rm.rtype() == rtype:
484 def _needs_reschedule(self, group, state, time):
485 """ Internal method that verifies if 'time' has elapsed since
486 all elements in 'group' have reached state 'state'.
488 :param group: Group of RMs to wait for (list of guids)
489 :type group: int or list of int
490 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
492 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
495 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
496 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
497 For the moment, 2m30s is not a correct syntax.
501 delay = reschedule_delay
503 # check state and time elapsed on all RMs
505 rm = self.ec.get_resource(guid)
506 # If the RM state is lower than the requested state we must
507 # reschedule (e.g. if RM is READY but we required STARTED).
512 # If there is a time restriction, we must verify the
513 # restriction is satisfied
515 if state == ResourceState.DISCOVERED:
517 if state == ResourceState.PROVISIONED:
518 t = rm.provision_time
519 elif state == ResourceState.READY:
521 elif state == ResourceState.STARTED:
523 elif state == ResourceState.STOPPED:
526 # Only keep time information for START and STOP
529 # time already elapsed since RM changed state
530 waited = "%fs" % tdiffsec(tnow(), t)
533 wait = tdiffsec(stabsformat(time), stabsformat(waited))
540 return reschedule, delay
542 def set_with_conditions(self, name, value, group, state, time):
543 """ Set value 'value' on attribute with name 'name' when 'time'
544 has elapsed since all elements in 'group' have reached state
547 :param name: Name of the attribute to set
549 :param value: Value of the attribute to set
551 :param group: Group of RMs to wait for (list of guids)
552 :type group: int or list of int
553 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
555 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
560 delay = reschedule_delay
562 ## evaluate if set conditions are met
564 # only can set with conditions after the RM is started
565 if self.state != ResourceState.STARTED:
568 reschedule, delay = self._needs_reschedule(group, state, time)
571 callback = functools.partial(self.set_with_conditions,
572 name, value, group, state, time)
573 self.ec.schedule(delay, callback)
575 self.set(name, value)
577 def start_with_conditions(self):
578 """ Starts RM when all the conditions in self.conditions for
579 action 'START' are satisfied.
583 delay = reschedule_delay
585 ## evaluate if set conditions are met
587 # only can start when RM is either STOPPED or READY
588 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
590 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
592 start_conditions = self.conditions.get(ResourceAction.START, [])
594 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
596 # Verify all start conditions are met
597 for (group, state, time) in start_conditions:
598 # Uncomment for debug
601 # rm = self.ec.get_resource(guid)
602 # unmet.append((guid, rm._state))
604 #self.debug("---- WAITED STATES ---- %s" % unmet )
606 reschedule, delay = self._needs_reschedule(group, state, time)
611 self.ec.schedule(delay, self.start_with_conditions)
613 self.debug("----- STARTING ---- ")
616 def stop_with_conditions(self):
617 """ Stops RM when all the conditions in self.conditions for
618 action 'STOP' are satisfied.
622 delay = reschedule_delay
624 ## evaluate if set conditions are met
626 # only can stop when RM is STARTED
627 if self.state != ResourceState.STARTED:
630 self.debug(" ---- STOP CONDITIONS ---- %s" %
631 self.conditions.get(ResourceAction.STOP))
633 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
634 for (group, state, time) in stop_conditions:
635 reschedule, delay = self._needs_reschedule(group, state, time)
640 callback = functools.partial(self.stop_with_conditions)
641 self.ec.schedule(delay, callback)
643 self.debug(" ----- STOPPING ---- ")
647 """ Execute all steps required for the RM to reach the state READY
650 if self._state > ResourceState.READY:
651 self.error("Wrong state %s for deploy" % self.state)
654 self.debug("----- READY ---- ")
655 self._ready_time = tnow()
656 self._state = ResourceState.READY
659 """Release any resources used by this RM
662 self._release_time = tnow()
663 self._state = ResourceState.RELEASED
666 """ Mark ResourceManager as FINISHED
669 self._finish_time = tnow()
670 self._state = ResourceState.FINISHED
673 """ Mark ResourceManager as FAILED
676 self._failed_time = tnow()
677 self._state = ResourceState.FAILED
679 def connect(self, guid):
680 """ Performs actions that need to be taken upon associating RMs.
681 This method should be redefined when necessary in child classes.
685 def disconnect(self, guid):
686 """ Performs actions that need to be taken upon disassociating RMs.
687 This method should be redefined when necessary in child classes.
691 def valid_connection(self, guid):
692 """Checks whether a connection with the other RM
694 This method needs to be redefined by each new Resource Manager.
696 :param guid: Guid of the current Resource Manager
class ResourceFactory(object):
    """Registry that maps resource type names to Resource Manager classes.

    The registry is a single class-level dictionary; all methods are
    class methods and operate on that shared dictionary.
    """

    # rtype string (as returned by rclass.rtype()) -> Resource Manager class
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """Return the dictionary of registered Resource Manager classes.

        The dictionary is indexed by resource type name. The object
        returned is the registry itself, not a copy.
        """
        return cls._resource_types

    @classmethod
    def register_type(cls, rclass):
        """Register a new Resource Manager class.

        The class is stored under the name returned by rclass.rtype();
        a class previously registered under that name is replaced.
        """
        cls._resource_types[rclass.rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """Create a new instance of a Resource Manager.

        :param rtype: Name under which the class was registered
        :param ec: Passed as first argument to the class constructor
        :param guid: Passed as second argument to the class constructor
        :raises KeyError: if no class is registered under rtype
        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)
def populate_factory():
    """Register all the Resource Manager classes that exist in the
    current version of NEPI.

    Every class returned by find_types() is registered in the
    ResourceFactory.
    """
    for rclass in find_types():
        ResourceFactory.register_type(rclass)
730 """Look into the different folders to find all the
731 available Resource Managers
734 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
735 search_path = set(search_path.split(" "))
738 import nepi.resources
739 path = os.path.dirname(nepi.resources.__file__)
740 search_path.add(path)
744 for importer, modname, ispkg in pkgutil.walk_packages(search_path):
745 loader = importer.find_module(modname)
747 module = loader.load_module(loader.fullname)
748 for attrname in dir(module):
749 if attrname.startswith("_"):
752 attr = getattr(module, attrname)
754 if attr == ResourceManager:
757 if not inspect.isclass(attr):
760 if issubclass(attr, ResourceManager):
765 err = traceback.format_exc()
766 logger = logging.getLogger("Resource.find_types()")
767 logger.error("Error while lading Resource Managers %s" % err)