2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
31 reschedule_delay = "0.5s"
34 """ Action that a user can order to a Resource Manager
42 """ State of a Resource Manager
# Human-readable label for every ResourceState value.
ResourceState2str = {
    ResourceState.NEW: "NEW",
    ResourceState.DISCOVERED: "DISCOVERED",
    ResourceState.PROVISIONED: "PROVISIONED",
    ResourceState.READY: "READY",
    ResourceState.STARTED: "STARTED",
    ResourceState.STOPPED: "STOPPED",
    ResourceState.FINISHED: "FINISHED",
    ResourceState.FAILED: "FAILED",
    ResourceState.RELEASED: "RELEASED",
}
68 """ Initializes template information (i.e. attributes and traces)
69 for the ResourceManager class
74 def clsinit_copy(cls):
75 """ Initializes template information (i.e. attributes and traces)
76 for the ResourceManager class, inheriting attributes and traces
82 # Decorator to invoke class initialization method
84 class ResourceManager(Logger):
90 def _register_attribute(cls, attr):
91 """ Resource subclasses will invoke this method to add a
95 cls._attributes[attr.name] = attr
98 def _remove_attribute(cls, name):
99 """ Resource subclasses will invoke this method to remove a
103 del cls._attributes[name]
106 def _register_trace(cls, trace):
107 """ Resource subclasses will invoke this method to add a
111 cls._traces[trace.name] = trace
114 def _remove_trace(cls, name):
115 """ Resource subclasses will invoke this method to remove a
119 del cls._traces[name]
122 def _register_attributes(cls):
123 """ Resource subclasses will invoke this method to register
130 def _register_traces(cls):
131 """ Resource subclasses will invoke this method to register
139 """ ResourceManager child classes have different attributes and traces.
140 Since the templates that hold the information of attributes and traces
141 are 'class attribute' dictionaries, initially they all point to the
142 parent class ResourceManager instances of those dictionaries.
143 In order to make these templates independent from the parent's one,
144 it is necessary to re-initialize the corresponding dictionaries.
145 This is the objective of the _clsinit method
147 # static template for resource attributes
148 cls._attributes = dict()
149 cls._register_attributes()
151 # static template for resource traces
153 cls._register_traces()
156 def _clsinit_copy(cls):
157 """ Same as _clsinit, except that it also inherits all attributes and traces
158 from the parent class.
160 # static template for resource attributes
161 cls._attributes = copy.deepcopy(cls._attributes)
162 cls._register_attributes()
164 # static template for resource traces
165 cls._traces = copy.deepcopy(cls._traces)
166 cls._register_traces()
170 """ Returns the type of the Resource Manager
def get_attributes(cls):
    """ Returns a copy of the attributes

    :return: deep copies of the attributes registered in the class
        template, so callers can modify them without affecting the class
    :rtype: list
    """
    # On Python 3 dict.values() is a view object, which copy.deepcopy
    # cannot copy (TypeError). Materialise it as a list first; on
    # Python 2 values() already returns a list, so this is harmless.
    return copy.deepcopy(list(cls._attributes.values()))
184 """ Returns a copy of the traces
187 return copy.deepcopy(cls._traces.values())
189 def __init__(self, ec, guid):
190 super(ResourceManager, self).__init__(self.rtype())
193 self._ec = weakref.ref(ec)
194 self._connections = set()
195 self._conditions = dict()
197 # the resource instance gets a copy of all attributes
198 self._attrs = copy.deepcopy(self._attributes)
200 # the resource instance gets a copy of all traces
201 self._trcs = copy.deepcopy(self._traces)
203 self._state = ResourceState.NEW
205 self._start_time = None
206 self._stop_time = None
207 self._discover_time = None
208 self._provision_time = None
209 self._ready_time = None
210 self._release_time = None
214 """ Returns the guid of the current RM """
219 """ Returns the Experiment Controller """
def connections(self):
    """ Returns the set of guids this RM is connected to.

    The set itself is returned, not a copy.
    """
    return self._connections
def conditions(self):
    """ Returns the conditions registered for this RM.

    The result is a dictionary that maps each action to a list of
    tuples, one tuple per condition. The dictionary itself is returned,
    not a copy.
    """
    return self._conditions
def start_time(self):
    """ Returns the timestamp recorded when the RM started
    (None until a start time has been recorded) """
    return self._start_time
241 """ Returns timestamp with the time the RM stopped """
242 return self._stop_time
def discover_time(self):
    """ Returns the timestamp recorded when the RM reached the
    DISCOVERED state (None until that time has been recorded) """
    return self._discover_time
def provision_time(self):
    """ Returns the timestamp recorded when the RM reached the
    PROVISIONED state (None until that time has been recorded) """
    return self._provision_time
def ready_time(self):
    """ Returns the timestamp recorded when the RM reached the
    READY state (None until that time has been recorded) """
    return self._ready_time
def release_time(self):
    """ Returns the timestamp recorded when the RM was released
    (None until that time has been recorded) """
    return self._release_time
266 """ Get the state of the current RM """
def log_message(self, msg):
    """ Decorate a log message with the type and the guid of the RM,
    to make debugging output easier to attribute.

    :param msg: Message to log
    :return: the message prefixed with the RM type and guid
    :rtype: str
    """
    fields = (self._rtype, self.guid, msg)
    return " %s guid: %d - %s " % fields
def connect(self, guid):
    """ Connect the current RM with the RM 'guid'.

    The guid is recorded only when valid_connection() accepts it;
    otherwise the call does nothing.

    :param guid: Guid of the RM the current RM will be connected to
    """
    if not self.valid_connection(guid):
        return
    self._connections.add(guid)
289 """ Discover the Resource. As it is specific for each RM,
290 this method takes the time when the RM becomes DISCOVERED and
291 changes the status """
292 self._discover_time = strfnow()
293 self._state = ResourceState.DISCOVERED
296 """ Provision the Resource. As it is specific for each RM,
297 this method takes the time when the RM becomes PROVISIONED and
298 changes the status """
299 self._provision_time = strfnow()
300 self._state = ResourceState.PROVISIONED
303 """ Start the Resource Manager. As it is specific to each RM, this method
304 just changes, after some verifications, the status to STARTED and saves the time.
307 if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
308 self.error("Wrong state %s for start" % self.state)
311 self._start_time = strfnow()
312 self._state = ResourceState.STARTED
315 """ Stop the Resource Manager. As it is specific to each RM, this method
316 just changes, after some verifications, the status to STOPPED and saves the time.
319 if not self._state in [ResourceState.STARTED]:
320 self.error("Wrong state %s for stop" % self.state)
323 self._stop_time = strfnow()
324 self._state = ResourceState.STOPPED
326 def set(self, name, value):
327 """ Set the value of the attribute
329 :param name: Name of the attribute
331 :param value: Value of the attribute
334 attr = self._attrs[name]
338 """ Get the value of the attribute
340 :param name: Name of the attribute
344 attr = self._attrs[name]
347 def register_trace(self, name):
350 :param name: Name of the trace
353 trace = self._trcs[name]
356 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
357 """ Get information on collected trace
359 :param name: Name of the trace
362 :param attr: Can be one of:
363 - TraceAttr.ALL (complete trace content),
364 - TraceAttr.STREAM (block in bytes to read starting at offset),
365 - TraceAttr.PATH (full path to the trace file),
366 - TraceAttr.SIZE (size of trace file).
369 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
372 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
379 def register_condition(self, action, group, state,
381 """ Registers a condition on the resource manager to allow execution
382 of 'action' only after 'time' has elapsed from the moment all resources
383 in 'group' reached state 'state'
385 :param action: Action to restrict to condition (either 'START' or 'STOP')
387 :param group: Group of RMs to wait for (list of guids)
388 :type group: int or list of int
389 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
391 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
395 conditions = self.conditions.get(action)
398 self._conditions[action] = conditions
400 # For each condition to register a tuple of (group, state, time) is
401 # added to the 'action' list
402 if not isinstance(group, list):
405 conditions.append((group, state, time))
407 def get_connected(self, rtype):
408 """ Return the list of RM with the type 'rtype'
410 :param rtype: Type of the RM we look for
412 :return: list of guid
415 for guid in self.connections:
416 rm = self.ec.get_resource(guid)
417 if rm.rtype() == rtype:
421 def _needs_reschedule(self, group, state, time):
422 """ Internal method that verifies if 'time' has elapsed since
423 all elements in 'group' have reached state 'state'.
425 :param group: Group of RMs to wait for (list of guids)
426 :type group: int or list of int
427 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
429 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
432 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
433 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
434 For the moment, 2m30s is not a correct syntax.
438 delay = reschedule_delay
440 # check state and time elapsed on all RMs
442 rm = self.ec.get_resource(guid)
443 # If the RM state is lower than the requested state we must
444 # reschedule (e.g. if RM is READY but we required STARTED)
449 # If there is a time restriction, we must verify the
450 # restriction is satisfied
452 if state == ResourceState.DISCOVERED:
454 if state == ResourceState.PROVISIONED:
455 t = rm.provision_time
456 elif state == ResourceState.READY:
458 elif state == ResourceState.STARTED:
460 elif state == ResourceState.STOPPED:
463 # Only keep time information for START and STOP
466 d = strfdiff(strfnow(), t)
467 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
472 return reschedule, delay
474 def set_with_conditions(self, name, value, group, state, time):
475 """ Set value 'value' on attribute with name 'name' when 'time'
476 has elapsed since all elements in 'group' have reached state
479 :param name: Name of the attribute to set
481 :param value: Value of the attribute to set
483 :param group: Group of RMs to wait for (list of guids)
484 :type group: int or list of int
485 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
487 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
492 delay = reschedule_delay
494 ## evaluate if set conditions are met
496 # only can set with conditions after the RM is started
497 if self.state != ResourceState.STARTED:
500 reschedule, delay = self._needs_reschedule(group, state, time)
503 callback = functools.partial(self.set_with_conditions,
504 name, value, group, state, time)
505 self.ec.schedule(delay, callback)
507 self.set(name, value)
509 def start_with_conditions(self):
510 """ Starts RM when all the conditions in self.conditions for
511 action 'START' are satisfied.
515 delay = reschedule_delay
517 ## evaluate if set conditions are met
519 # only can start when RM is either STOPPED or READY
520 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
522 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
524 start_conditions = self.conditions.get(ResourceAction.START, [])
526 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
528 # Verify all start conditions are met
529 for (group, state, time) in start_conditions:
530 # Uncomment for debug
533 # rm = self.ec.get_resource(guid)
534 # unmet.append((guid, rm._state))
536 #self.debug("---- WAITED STATES ---- %s" % unmet )
538 reschedule, delay = self._needs_reschedule(group, state, time)
543 self.ec.schedule(delay, self.start_with_conditions)
545 self.debug("----- STARTING ---- ")
548 def stop_with_conditions(self):
549 """ Stops RM when all the conditions in self.conditions for
550 action 'STOP' are satisfied.
554 delay = reschedule_delay
556 ## evaluate if set conditions are met
558 # only can stop when RM is STARTED
559 if self.state != ResourceState.STARTED:
562 self.debug(" ---- STOP CONDITIONS ---- %s" %
563 self.conditions.get(ResourceAction.STOP))
565 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
566 for (group, state, time) in stop_conditions:
567 reschedule, delay = self._needs_reschedule(group, state, time)
572 callback = functools.partial(self.stop_with_conditions)
573 self.ec.schedule(delay, callback)
575 self.debug(" ----- STOPPING ---- ")
579 """ Execute all steps required for the RM to reach the state READY
582 if self._state > ResourceState.READY:
583 self.error("Wrong state %s for deploy" % self.state)
586 self.debug("----- READY ---- ")
587 self._ready_time = strfnow()
588 self._state = ResourceState.READY
591 """Clean the resource at the end of the Experiment and change the status
594 self._release_time = strfnow()
595 self._state = ResourceState.RELEASED
597 def valid_connection(self, guid):
598 """Check if the connection is available. This method needs to be
599 redefined by each new Resource Manager.
601 :param guid: Guid of the current Resource Manager
class ResourceFactory(object):
    """Registry of Resource Manager classes, indexed by resource type,
    used to instantiate Resource Managers from their type name.
    """

    # Maps rtype -> Resource Manager class. Class-level, hence shared by
    # every user of the factory.
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """Return the registry that maps each registered resource type
        to its Resource Manager class (the dictionary itself, not a copy)"""
        return cls._resource_types

    @classmethod
    def register_type(cls, rclass):
        """Register a new Resource Manager class

        The class is stored under the value returned by its rtype(),
        replacing any class previously registered for that type.

        :param rclass: Resource Manager class to register
        """
        cls._resource_types[rclass.rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """Create a new instance of a Resource Manager

        :param rtype: Type of the Resource Manager to instantiate
        :param ec: Experiment Controller handed to the RM constructor
        :param guid: Guid handed to the RM constructor
        :return: a new instance of the class registered for 'rtype'
        :raises KeyError: if no class is registered for 'rtype'
        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)
def populate_factory():
    """Register in the ResourceFactory every Resource Manager class
    reported by find_types().
    """
    discovered = find_types()
    register = ResourceFactory.register_type
    for resource_class in discovered:
        register(resource_class)
635 """Look into the different folders to find all the
636 available Resource Managers
639 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
640 search_path = set(search_path.split(" "))
643 import nepi.resources
644 path = os.path.dirname(nepi.resources.__file__)
645 search_path.add(path)
649 for importer, modname, ispkg in pkgutil.walk_packages(search_path):
650 loader = importer.find_module(modname)
652 module = loader.load_module(loader.fullname)
653 for attrname in dir(module):
654 if attrname.startswith("_"):
657 attr = getattr(module, attrname)
659 if attr == ResourceManager:
662 if not inspect.isclass(attr):
665 if issubclass(attr, ResourceManager):
670 err = traceback.format_exc()
671 logger = logging.getLogger("Resource.find_types()")
672 logger.error("Error while lading Resource Managers %s" % err)