2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
# Default delay (time string) handed to the EC scheduler when a conditional
# set/start/stop has to be retried later.
reschedule_delay = "0.5s"
34 """ Action that a user can order to a Resource Manager
42 """ State of a Resource Manager
# Printable name of every ResourceState value
ResourceState2str = {
    ResourceState.NEW: "NEW",
    ResourceState.DISCOVERED: "DISCOVERED",
    ResourceState.PROVISIONED: "PROVISIONED",
    ResourceState.READY: "READY",
    ResourceState.STARTED: "STARTED",
    ResourceState.STOPPED: "STOPPED",
    ResourceState.FINISHED: "FINISHED",
    ResourceState.FAILED: "FAILED",
    ResourceState.RELEASED: "RELEASED",
}
68 """ Initializes template information (i.e. attributes and traces)
69 for the ResourceManager class
74 def clsinit_copy(cls):
75 """ Initializes template information (i.e. attributes and traces)
76 for the ResourceManager class, inheriting attributes and traces
82 # Decorator to invoke class initialization method
84 class ResourceManager(Logger):
def _register_attribute(cls, attr):
    """ Adds an attribute to the class-level attribute template.

    Resource subclasses invoke this method to declare a resource
    attribute. The attribute is stored under its ``name``; an entry
    already registered with the same name is replaced.

    :param attr: Attribute to register (must expose ``name``)

    """
    template = cls._attributes
    template[attr.name] = attr
def _remove_attribute(cls, name):
    """ Deletes an attribute from the class-level attribute template.

    Resource subclasses invoke this method to drop a resource attribute.

    :param name: Name of the attribute to remove
    :raises KeyError: if no attribute is registered under ``name``

    """
    template = cls._attributes
    del template[name]
def _register_trace(cls, trace):
    """ Adds a trace to the class-level trace template.

    Resource subclasses invoke this method to declare a resource trace.
    The trace is stored under its ``name``; an entry already registered
    with the same name is replaced.

    :param trace: Trace to register (must expose ``name``)

    """
    template = cls._traces
    template[trace.name] = trace
def _remove_trace(cls, name):
    """ Deletes a trace from the class-level trace template.

    Resource subclasses invoke this method to drop a resource trace.

    :param name: Name of the trace to remove
    :raises KeyError: if no trace is registered under ``name``

    """
    template = cls._traces
    del template[name]
122 def _register_attributes(cls):
123 """ Resource subclasses will invoke this method to register
130 def _register_traces(cls):
131 """ Resource subclasses will invoke this method to register
139 """ ResourceManager child classes have different attributes and traces.
140 Since the templates that hold the information of attributes and traces
141 are 'class attribute' dictionaries, initially they all point to the
142 parent class ResourceManager instances of those dictionaries.
143 In order to make these templates independent from the parent's one,
144 it is necessary re-initialize the corresponding dictionaries.
145 This is the objective of the _clsinit method
147 # static template for resource attributes
148 cls._attributes = dict()
149 cls._register_attributes()
151 # static template for resource traces
153 cls._register_traces()
def _clsinit_copy(cls):
    """ Gives the class its own attribute and trace templates, seeded
    with deep copies of the ones currently visible on the class (i.e.
    those inherited from the parent), and then lets the class register
    its own attributes and traces on top.

    """
    # attribute template: copy what is inherited, then add our own
    inherited_attributes = copy.deepcopy(cls._attributes)
    cls._attributes = inherited_attributes
    cls._register_attributes()

    # trace template: copy what is inherited, then add our own
    inherited_traces = copy.deepcopy(cls._traces)
    cls._traces = inherited_traces
    cls._register_traces()
170 """ Returns the type of the Resource Manager
def get_attributes(cls):
    """ Returns a deep copy of the attributes registered on the class.

    :return: independent copies of every entry in the attribute template
    :rtype: list

    """
    # On Python 3 dict.values() is a view object, which copy.deepcopy()
    # cannot handle; turn it into a list first. On Python 2 values() is
    # already a list, so this only adds a cheap shallow copy.
    return copy.deepcopy(list(cls._attributes.values()))
184 """ Returns a copy of the traces
187 return copy.deepcopy(cls._traces.values())
189 def __init__(self, ec, guid):
190 super(ResourceManager, self).__init__(self.rtype())
193 self._ec = weakref.ref(ec)
194 self._connections = set()
195 self._conditions = dict()
197 # the resource instance gets a copy of all attributes
198 self._attrs = copy.deepcopy(self._attributes)
200 # the resource instance gets a copy of all traces
201 self._trcs = copy.deepcopy(self._traces)
203 self._state = ResourceState.NEW
205 self._start_time = None
206 self._stop_time = None
207 self._discover_time = None
208 self._provision_time = None
209 self._ready_time = None
210 self._release_time = None
211 self._finish_time = None
212 self._failed_time = None
216 """ Returns the global unique identifier of the RM """
221 """ Returns the Experiment Controller """
def connections(self):
    """ Returns the set holding the guids of the RMs this RM is
    connected to (the internal set itself, not a copy)
    """
    return self._connections
def conditions(self):
    """ Returns the conditions the RM is subjected to.

    The returned object is the internal dictionary, indexed by action;
    each entry collects the (group, state, time) tuples registered for
    that action.

    """
    return self._conditions
def start_time(self):
    """ Returns the timestamp recorded when the RM was started,
    or None if that has not happened
    """
    return self._start_time
244 """ Returns the stop time of the RM as a timestamp"""
245 return self._stop_time
def discover_time(self):
    """ Returns the timestamp recorded when discovery finished for the RM,
    or None if that has not happened
    """
    return self._discover_time
def provision_time(self):
    """ Returns the timestamp recorded when provisioning finished for
    the RM, or None if that has not happened
    """
    return self._provision_time
def ready_time(self):
    """ Returns the timestamp recorded when deployment finished for
    the RM, or None if that has not happened
    """
    return self._ready_time
def release_time(self):
    """ Returns the timestamp recorded when the RM was released,
    or None if that has not happened
    """
    return self._release_time
def finish_time(self):
    """ Returns the timestamp recorded when the RM was marked as
    finished, or None if that has not happened
    """
    return self._finish_time
def failed_time(self):
    """ Returns the timestamp recorded when the RM was marked as
    failed, or None if no failure occurred
    """
    return self._failed_time
279 """ Get the state of the current RM """
def log_message(self, msg):
    """ Builds the text to log for this RM.

    The message is prefixed with the resource type and the guid of
    the RM.

    :param msg: text message
    :return: the message with the RM information prepended

    """
    fields = (self._rtype, self.guid, msg)
    return " %s guid: %d - %s " % fields
def connect(self, guid):
    """ Establishes a connection to the RM identified by guid.

    The guid is only recorded when valid_connection() accepts it;
    otherwise the call has no effect.

    :param guid: Global unique identifier of the RM to connect to

    """
    if not self.valid_connection(guid):
        return

    self._connections.add(guid)
def disconnect(self, guid):
    """ Removes the connection to the RM identified by guid.

    Nothing happens when there is no connection to that RM.

    :param guid: Global unique identifier of the RM to disconnect from

    """
    # discard() is a no-op for guids that are not in the set
    self._connections.discard(guid)
310 """ Performs resource discovery.
312 This method is responsible for selecting an individual resource
313 matching user requirements.
314 This method should be redefined when necessary in child classes.
316 self._discover_time = tnow()
317 self._state = ResourceState.DISCOVERED
320 """ Performs resource provisioning.
322 This method is responsible for provisioning one resource.
323 After this method has been successfully invoked, the resource
324 should be accessible/controllable by the RM.
325 This method should be redefined when necessary in child classes.
327 self._provision_time = tnow()
328 self._state = ResourceState.PROVISIONED
331 """ Starts the resource.
333 There is no generic start behavior for all resources.
334 This method should be redefined when necessary in child classes.
336 if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
337 self.error("Wrong state %s for start" % self.state)
340 self._start_time = tnow()
341 self._state = ResourceState.STARTED
344 """ Stops the resource.
346 There is no generic stop behavior for all resources.
347 This method should be redefined when necessary in child classes.
349 if not self._state in [ResourceState.STARTED]:
350 self.error("Wrong state %s for stop" % self.state)
353 self._stop_time = tnow()
354 self._state = ResourceState.STOPPED
356 def set(self, name, value):
357 """ Set the value of the attribute
359 :param name: Name of the attribute
361 :param value: Value of the attribute
364 attr = self._attrs[name]
368 """ Returns the value of the attribute
370 :param name: Name of the attribute
374 attr = self._attrs[name]
377 def register_trace(self, name):
378 """ Explicitly enable trace generation
380 :param name: Name of the trace
383 trace = self._trcs[name]
386 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
387 """ Get information on collected trace
389 :param name: Name of the trace
392 :param attr: Can be one of:
393 - TraceAttr.ALL (complete trace content),
394 - TraceAttr.STREAM (block in bytes to read starting at offset),
395 - TraceAttr.PATH (full path to the trace file),
396 - TraceAttr.SIZE (size of trace file).
399 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
402 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
409 def register_condition(self, action, group, state,
411 """ Registers a condition on the resource manager to allow execution
412 of 'action' only after 'time' has elapsed from the moment all resources
413 in 'group' reached state 'state'
415 :param action: Action to restrict to condition (either 'START' or 'STOP')
417 :param group: Group of RMs to wait for (list of guids)
418 :type group: int or list of int
419 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
421 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
425 conditions = self.conditions.get(action)
428 self._conditions[action] = conditions
430 # For each condition to register a tuple of (group, state, time) is
431 # added to the 'action' list
432 if not isinstance(group, list):
435 conditions.append((group, state, time))
437 def get_connected(self, rtype = None):
438 """ Returns the list of RM with the type 'rtype'
440 :param rtype: Type of the RM we look for
442 :return: list of guid
445 for guid in self.connections:
446 rm = self.ec.get_resource(guid)
447 if not rtype or rm.rtype() == rtype:
451 def _needs_reschedule(self, group, state, time):
452 """ Internal method that verify if 'time' has elapsed since
453 all elements in 'group' have reached state 'state'.
455 :param group: Group of RMs to wait for (list of guids)
456 :type group: int or list of int
457 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
459 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
462 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
463 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
464 For the moment, 2m30s is not a correct syntax.
468 delay = reschedule_delay
470 # check state and time elapsed on all RMs
472 rm = self.ec.get_resource(guid)
473 # If the RM state is lower than the requested state we must
474 # reschedule (e.g. if RM is READY but we required STARTED)
479 # If there is a time restriction, we must verify the
480 # restriction is satisfied
482 if state == ResourceState.DISCOVERED:
484 if state == ResourceState.PROVISIONED:
485 t = rm.provision_time
486 elif state == ResourceState.READY:
488 elif state == ResourceState.STARTED:
490 elif state == ResourceState.STOPPED:
493 # Only keep time information for START and STOP
496 # time already elapsed since RM changed state
497 waited = "%fs" % tdiffsec(tnow(), t)
500 wait = tdiffsec(stabsformat(time), stabsformat(waited))
507 return reschedule, delay
509 def set_with_conditions(self, name, value, group, state, time):
510 """ Set value 'value' on attribute with name 'name' when 'time'
511 has elapsed since all elements in 'group' have reached state
514 :param name: Name of the attribute to set
516 :param value: Value of the attribute to set
518 :param group: Group of RMs to wait for (list of guids)
519 :type group: int or list of int
520 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
522 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
527 delay = reschedule_delay
529 ## evaluate if set conditions are met
531 # only can set with conditions after the RM is started
532 if self.state != ResourceState.STARTED:
535 reschedule, delay = self._needs_reschedule(group, state, time)
538 callback = functools.partial(self.set_with_conditions,
539 name, value, group, state, time)
540 self.ec.schedule(delay, callback)
542 self.set(name, value)
544 def start_with_conditions(self):
545 """ Starts RM when all the conditions in self.conditions for
546 action 'START' are satisfied.
550 delay = reschedule_delay
552 ## evaluate if set conditions are met
554 # only can start when RM is either STOPPED or READY
555 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
557 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
559 start_conditions = self.conditions.get(ResourceAction.START, [])
561 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
563 # Verify all start conditions are met
564 for (group, state, time) in start_conditions:
565 # Uncomment for debug
568 # rm = self.ec.get_resource(guid)
569 # unmet.append((guid, rm._state))
571 #self.debug("---- WAITED STATES ---- %s" % unmet )
573 reschedule, delay = self._needs_reschedule(group, state, time)
578 self.ec.schedule(delay, self.start_with_conditions)
580 self.debug("----- STARTING ---- ")
583 def stop_with_conditions(self):
584 """ Stops RM when all the conditions in self.conditions for
585 action 'STOP' are satisfied.
589 delay = reschedule_delay
591 ## evaluate if set conditions are met
593 # only can stop when RM is STARTED
594 if self.state != ResourceState.STARTED:
597 self.debug(" ---- STOP CONDITIONS ---- %s" %
598 self.conditions.get(ResourceAction.STOP))
600 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
601 for (group, state, time) in stop_conditions:
602 reschedule, delay = self._needs_reschedule(group, state, time)
607 callback = functools.partial(self.stop_with_conditions)
608 self.ec.schedule(delay, callback)
610 self.debug(" ----- STOPPING ---- ")
614 """ Execute all steps required for the RM to reach the state READY
617 if self._state > ResourceState.READY:
618 self.error("Wrong state %s for deploy" % self.state)
621 self.debug("----- READY ---- ")
622 self._ready_time = tnow()
623 self._state = ResourceState.READY
626 """Release any resources used by this RM
629 self._release_time = tnow()
630 self._state = ResourceState.RELEASED
633 """ Mark ResourceManager as FINISHED
636 self._finish_time = tnow()
637 self._state = ResourceState.FINISHED
640 """ Mark ResourceManager as FAILED
643 self._failed_time = tnow()
644 self._state = ResourceState.FAILED
646 def valid_connection(self, guid):
647 """Checks whether a connection with the other RM
649 This method need to be redefined by each new Resource Manager.
651 :param guid: Guid of the current Resource Manager
class ResourceFactory(object):
    """ Registry of Resource Manager classes, indexed by resource type. """

    # Class-level table shared by every user of the factory:
    # resource type -> Resource Manager class
    _resource_types = {}

    @classmethod
    def resource_types(cls):
        """ Returns the dictionary of registered Resource Manager
        classes, indexed by resource type (the internal dictionary,
        not a copy)
        """
        return cls._resource_types

    @classmethod
    def register_type(cls, rclass):
        """ Registers a Resource Manager class under the type reported
        by its rtype() method, replacing any class already registered
        for that type

        :param rclass: Resource Manager class to register
        """
        type_name = rclass.rtype()
        cls._resource_types[type_name] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Creates a new instance of the Resource Manager class
        registered for rtype

        :param rtype: Type of the Resource Manager to instantiate
        :param ec: Passed as first argument to the class constructor
        :param guid: Passed as second argument to the class constructor
        :raises KeyError: if no class is registered for rtype
        """
        rm_class = cls._resource_types[rtype]
        return rm_class(ec, guid)
def populate_factory():
    """ Registers in the ResourceFactory every Resource Manager class
    returned by find_types().
    """
    for rm_class in find_types():
        ResourceFactory.register_type(rm_class)
685 """Look into the different folders to find all the
686 available Resource Managers
689 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
690 search_path = set(search_path.split(" "))
693 import nepi.resources
694 path = os.path.dirname(nepi.resources.__file__)
695 search_path.add(path)
699 for importer, modname, ispkg in pkgutil.walk_packages(search_path):
700 loader = importer.find_module(modname)
702 module = loader.load_module(loader.fullname)
703 for attrname in dir(module):
704 if attrname.startswith("_"):
707 attr = getattr(module, attrname)
709 if attr == ResourceManager:
712 if not inspect.isclass(attr):
715 if issubclass(attr, ResourceManager):
720 err = traceback.format_exc()
721 logger = logging.getLogger("Resource.find_types()")
722 logger.error("Error while lading Resource Managers %s" % err)