2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
31 reschedule_delay = "0.5s"
49 ResourceState2str = dict({
50 ResourceState.NEW : "NEW",
51 ResourceState.DISCOVERED : "DISCOVERED",
52 ResourceState.PROVISIONED : "PROVISIONED",
53 ResourceState.READY : "READY",
54 ResourceState.STARTED : "STARTED",
55 ResourceState.STOPPED : "STOPPED",
56 ResourceState.FINISHED : "FINISHED",
57 ResourceState.FAILED : "FAILED",
58 ResourceState.RELEASED : "RELEASED",
62 """ Initializes template information (i.e. attributes and traces)
63 for the ResourceManager class
68 def clsinit_copy(cls):
69 """ Initializes template information (i.e. attributes and traces)
70 for the ResourceManager class, inheriting attributes and traces
76 # Decorator to invoke class initialization method
78 class ResourceManager(Logger):
84 def _register_attribute(cls, attr):
85 """ Resource subclasses will invoke this method to add a
89 cls._attributes[attr.name] = attr
92 def _remove_attribute(cls, name):
93 """ Resource subclasses will invoke this method to remove a
97 del cls._attributes[name]
100 def _register_trace(cls, trace):
101 """ Resource subclasses will invoke this method to add a
105 cls._traces[trace.name] = trace
108 def _remove_trace(cls, name):
109 """ Resource subclasses will invoke this method to remove a
113 del cls._traces[name]
116 def _register_attributes(cls):
117 """ Resource subclasses will invoke this method to register
124 def _register_traces(cls):
125 """ Resource subclasses will invoke this method to register
133 """ ResourceManager child classes have different attributes and traces.
134 Since the templates that hold the information of attributes and traces
135 are 'class attribute' dictionaries, initially they all point to the
136 parent class ResourceManager instances of those dictionaries.
137 In order to make these templates independent from the parent's one,
138 it is necessary re-initialize the corresponding dictionaries.
139 This is the objective of the _clsinit method
141 # static template for resource attributes
142 cls._attributes = dict()
143 cls._register_attributes()
145 # static template for resource traces
147 cls._register_traces()
150 def _clsinit_copy(cls):
151 """ Same as _clsinit, except that it also inherits all attributes and traces
152 from the parent class.
154 # static template for resource attributes
155 cls._attributes = copy.deepcopy(cls._attributes)
156 cls._register_attributes()
158 # static template for resource traces
159 cls._traces = copy.deepcopy(cls._traces)
160 cls._register_traces()
167 def get_attributes(cls):
168 """ Returns a copy of the attributes
171 return copy.deepcopy(cls._attributes.values())
175 """ Returns a copy of the traces
178 return copy.deepcopy(cls._traces.values())
180 def __init__(self, ec, guid):
181 super(ResourceManager, self).__init__(self.rtype())
184 self._ec = weakref.ref(ec)
185 self._connections = set()
186 self._conditions = dict()
188 # the resource instance gets a copy of all attributes
189 self._attrs = copy.deepcopy(self._attributes)
191 # the resource instance gets a copy of all traces
192 self._trcs = copy.deepcopy(self._traces)
194 self._state = ResourceState.NEW
196 self._start_time = None
197 self._stop_time = None
198 self._discover_time = None
199 self._provision_time = None
200 self._ready_time = None
201 self._release_time = None
212 def connections(self):
213 return self._connections
216 def conditions(self):
217 return self._conditions
220 def start_time(self):
221 """ Returns timestamp with the time the RM started """
222 return self._start_time
226 """ Returns timestamp with the time the RM stopped """
227 return self._stop_time
230 def discover_time(self):
231 """ Returns timestamp with the time the RM passed to state discovered """
232 return self._discover_time
235 def provision_time(self):
236 """ Returns timestamp with the time the RM passed to state provisioned """
237 return self._provision_time
240 def ready_time(self):
241 """ Returns timestamp with the time the RM passed to state ready """
242 return self._ready_time
245 def release_time(self):
246 """ Returns timestamp with the time the RM was released """
247 return self._release_time
253 def log_message(self, msg):
254 return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
256 def connect(self, guid):
257 if self.valid_connection(guid):
258 self._connections.add(guid)
261 self._discover_time = strfnow()
262 self._state = ResourceState.DISCOVERED
265 self._provision_time = strfnow()
266 self._state = ResourceState.PROVISIONED
269 """ Start the Resource Manager
272 if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
273 self.error("Wrong state %s for start" % self.state)
276 self._start_time = strfnow()
277 self._state = ResourceState.STARTED
280 """ Start the Resource Manager
283 if not self._state in [ResourceState.STARTED]:
284 self.error("Wrong state %s for stop" % self.state)
287 self._stop_time = strfnow()
288 self._state = ResourceState.STOPPED
290 def set(self, name, value):
291 """ Set the value of the attribute
293 :param name: Name of the attribute
295 :param name: Value of the attribute
298 attr = self._attrs[name]
302 """ Start the Resource Manager
304 :param name: Name of the attribute
308 attr = self._attrs[name]
311 def register_trace(self, name):
314 :param name: Name of the trace
317 trace = self._trcs[name]
320 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
321 """ Get information on collected trace
323 :param name: Name of the trace
326 :param attr: Can be one of:
327 - TraceAttr.ALL (complete trace content),
328 - TraceAttr.STREAM (block in bytes to read starting at offset),
329 - TraceAttr.PATH (full path to the trace file),
330 - TraceAttr.SIZE (size of trace file).
333 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
336 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
343 def register_condition(self, action, group, state,
345 """ Registers a condition on the resource manager to allow execution
346 of 'action' only after 'time' has elapsed from the moment all resources
347 in 'group' reached state 'state'
349 :param action: Action to restrict to condition (either 'START' or 'STOP')
351 :param group: Group of RMs to wait for (list of guids)
352 :type group: int or list of int
353 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
355 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
359 conditions = self.conditions.get(action)
362 self._conditions[action] = conditions
364 # For each condition to register a tuple of (group, state, time) is
365 # added to the 'action' list
366 if not isinstance(group, list):
369 conditions.append((group, state, time))
371 def get_connected(self, rtype):
373 for guid in self.connections:
374 rm = self.ec.get_resource(guid)
375 if rm.rtype() == rtype:
379 def _needs_reschedule(self, group, state, time):
380 """ Internal method that verify if 'time' has elapsed since
381 all elements in 'group' have reached state 'state'.
383 :param group: Group of RMs to wait for (list of guids)
384 :type group: int or list of int
385 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
387 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
390 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
391 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
392 For the moment, 2m30s is not a correct syntax.
396 delay = reschedule_delay
398 # check state and time elapsed on all RMs
400 rm = self.ec.get_resource(guid)
401 # If the RM state is lower than the requested state we must
402 # reschedule (e.g. if RM is READY but we required STARTED)
407 # If there is a time restriction, we must verify the
408 # restriction is satisfied
410 if state == ResourceState.DISCOVERED:
412 if state == ResourceState.PROVISIONED:
413 t = rm.provision_time
414 elif state == ResourceState.READY:
416 elif state == ResourceState.STARTED:
418 elif state == ResourceState.STOPPED:
421 # Only keep time information for START and STOP
424 d = strfdiff(strfnow(), t)
425 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
430 return reschedule, delay
432 def set_with_conditions(self, name, value, group, state, time):
433 """ Set value 'value' on attribute with name 'name' when 'time'
434 has elapsed since all elements in 'group' have reached state
437 :param name: Name of the attribute to set
439 :param name: Value of the attribute to set
441 :param group: Group of RMs to wait for (list of guids)
442 :type group: int or list of int
443 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
445 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
451 delay = reschedule_delay
453 ## evaluate if set conditions are met
455 # only can set with conditions after the RM is started
456 if self.state != ResourceState.STARTED:
459 reschedule, delay = self._needs_reschedule(group, state, time)
462 callback = functools.partial(self.set_with_conditions,
463 name, value, group, state, time)
464 self.ec.schedule(delay, callback)
466 self.set(name, value)
468 def start_with_conditions(self):
469 """ Starts RM when all the conditions in self.conditions for
470 action 'START' are satisfied.
474 delay = reschedule_delay
476 ## evaluate if set conditions are met
478 # only can start when RM is either STOPPED or READY
479 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
481 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
483 start_conditions = self.conditions.get(ResourceAction.START, [])
485 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
487 # Verify all start conditions are met
488 for (group, state, time) in start_conditions:
489 # Uncomment for debug
492 # rm = self.ec.get_resource(guid)
493 # unmet.append((guid, rm._state))
495 #self.debug("---- WAITED STATES ---- %s" % unmet )
497 reschedule, delay = self._needs_reschedule(group, state, time)
502 self.ec.schedule(delay, self.start_with_conditions)
504 self.debug("----- STARTING ---- ")
507 def stop_with_conditions(self):
508 """ Stops RM when all the conditions in self.conditions for
509 action 'STOP' are satisfied.
513 delay = reschedule_delay
515 ## evaluate if set conditions are met
517 # only can stop when RM is STARTED
518 if self.state != ResourceState.STARTED:
521 self.debug(" ---- STOP CONDITIONS ---- %s" %
522 self.conditions.get(ResourceAction.STOP))
524 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
525 for (group, state, time) in stop_conditions:
526 reschedule, delay = self._needs_reschedule(group, state, time)
531 callback = functools.partial(self.stop_with_conditions)
532 self.ec.schedule(delay, callback)
534 self.debug(" ----- STOPPING ---- ")
538 """ Execute all steps required for the RM to reach the state READY
541 if self._state > ResourceState.READY:
542 self.error("Wrong state %s for deploy" % self.state)
545 self.debug("----- DEPLOYING ---- ")
546 self._ready_time = strfnow()
547 self._state = ResourceState.READY
550 """Clean the resource at the end of the Experiment and change the status
553 self._release_time = strfnow()
554 self._state = ResourceState.RELEASED
556 def valid_connection(self, guid):
557 """Check if the connection is available.
559 :param guid: Guid of the current Resource Manager
567 class ResourceFactory(object):
568 _resource_types = dict()
571 def resource_types(cls):
572 return cls._resource_types
575 def register_type(cls, rclass):
576 cls._resource_types[rclass.rtype()] = rclass
579 def create(cls, rtype, ec, guid):
580 rclass = cls._resource_types[rtype]
581 return rclass(ec, guid)
583 def populate_factory():
584 for rclass in find_types():
585 ResourceFactory.register_type(rclass)
588 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
589 search_path = set(search_path.split(" "))
591 import nepi.resources
592 path = os.path.dirname(nepi.resources.__file__)
593 search_path.add(path)
597 for importer, modname, ispkg in pkgutil.walk_packages(search_path):
598 loader = importer.find_module(modname)
600 module = loader.load_module(loader.fullname)
601 for attrname in dir(module):
602 if attrname.startswith("_"):
605 attr = getattr(module, attrname)
607 if attr == ResourceManager:
610 if not inspect.isclass(attr):
613 if issubclass(attr, ResourceManager):
618 err = traceback.format_exc()
619 logger = logging.getLogger("Resource.find_types()")
620 logger.error("Error while lading Resource Managers %s" % err)