2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
21 from nepi.execution.trace import TraceAttr
# Default delay (time string) used when a conditional action has to be
# re-evaluated later by the experiment controller
reschedule_delay = "0.5s"
# Maps every ResourceState value to its name. The keys must be the
# ResourceState attributes themselves: "NAME = value" entries are not valid
# inside a dict literal.
ResourceState2str = {
    ResourceState.NEW: "NEW",
    ResourceState.DISCOVERED: "DISCOVERED",
    ResourceState.PROVISIONED: "PROVISIONED",
    ResourceState.READY: "READY",
    ResourceState.STARTED: "STARTED",
    ResourceState.STOPPED: "STOPPED",
    ResourceState.FINISHED: "FINISHED",
    ResourceState.FAILED: "FAILED",
    ResourceState.RELEASED: "RELEASED",
}
def clsinit(cls):
    """ Class decorator that initializes template information (i.e.
    attributes and traces) for a ResourceManager class.

    It delegates to cls._clsinit(), which gives the class its own, fresh
    attribute and trace templates, and returns the same class object.
    """
    cls._clsinit()
    return cls
def clsinit_copy(cls):
    """ Class decorator that initializes template information (i.e.
    attributes and traces) for a ResourceManager class, inheriting the
    attributes and traces of the parent class.

    It delegates to cls._clsinit_copy() and returns the same class object.
    """
    cls._clsinit_copy()
    return cls
# Decorator to invoke class initialization method
@clsinit
class ResourceManager(object):
    """ Base class for resource managers (RMs).

    An RM follows the life cycle of a single resource, identified by 'guid',
    through the ResourceState values and records when each state was reached.
    It holds only a weak reference to the experiment controller 'ec', which
    it uses to look up other RMs (get_resource) and to schedule delayed
    callbacks (schedule).

    Attribute and trace templates live in the class-level dictionaries
    '_attributes' and '_traces'. The clsinit / clsinit_copy decorators give
    each subclass its own dictionaries, and every instance deep-copies them
    when it is constructed.
    """
    _rtype = "Resource"
    _attributes = None
    _traces = None

    @classmethod
    def _register_attribute(cls, attr):
        """ Resource subclasses will invoke this method to add a
        resource attribute to the class template.

        :param attr: Attribute to register; it is stored under attr.name
        """
        cls._attributes[attr.name] = attr

    @classmethod
    def _remove_attribute(cls, name):
        """ Resource subclasses will invoke this method to remove a
        resource attribute from the class template.

        :param name: Name of the attribute to remove
        :type name: str
        """
        del cls._attributes[name]

    @classmethod
    def _register_trace(cls, trace):
        """ Resource subclasses will invoke this method to add a
        resource trace to the class template.

        :param trace: Trace to register; it is stored under trace.name
        """
        cls._traces[trace.name] = trace

    @classmethod
    def _remove_trace(cls, name):
        """ Resource subclasses will invoke this method to remove a
        resource trace from the class template.

        :param name: Name of the trace to remove
        :type name: str
        """
        del cls._traces[name]

    @classmethod
    def _register_attributes(cls):
        """ Hook for resource subclasses to register their attributes
        (through _register_attribute). The base class registers none.
        """
        pass

    @classmethod
    def _register_traces(cls):
        """ Hook for resource subclasses to register their traces
        (through _register_trace). The base class registers none.
        """
        pass

    @classmethod
    def _clsinit(cls):
        """ ResourceManager child classes have different attributes and traces.
        Since the templates that hold the information of attributes and traces
        are 'class attribute' dictionaries, initially they all point to the
        parent class ResourceManager instances of those dictionaries.
        In order to make these templates independent from the parent's one,
        it is necessary to re-initialize the corresponding dictionaries.
        This is the objective of the _clsinit method.
        """
        # static template for resource attributes
        cls._attributes = dict()
        cls._register_attributes()

        # static template for resource traces
        cls._traces = dict()
        cls._register_traces()

    @classmethod
    def _clsinit_copy(cls):
        """ Same as _clsinit, except that it also inherits all attributes and
        traces from the parent class.
        """
        # Start from deep copies of the inherited templates so that
        # registering or removing entries never touches the parent class.
        cls._attributes = copy.deepcopy(cls._attributes)
        cls._register_attributes()

        cls._traces = copy.deepcopy(cls._traces)
        cls._register_traces()

    @classmethod
    def rtype(cls):
        """ Returns the resource type name of this RM class """
        return cls._rtype

    @classmethod
    def get_attributes(cls):
        """ Returns a deep copy of the attribute templates, as a list """
        # copy.deepcopy cannot copy a dict view (what values() returns on
        # Python 3), so materialize it as a list first.
        return copy.deepcopy(list(cls._attributes.values()))

    @classmethod
    def get_traces(cls):
        """ Returns a deep copy of the trace templates, as a list """
        return copy.deepcopy(list(cls._traces.values()))

    def __init__(self, ec, guid):
        """
        :param ec: Experiment controller managing this RM; only a weak
            reference to it is kept
        :param guid: Identifier of this RM (formatted with %d in log messages)
        :type guid: int
        """
        self._guid = guid
        self._ec = weakref.ref(ec)
        self._connections = set()
        self._conditions = dict()

        # the resource instance gets a copy of all attributes
        self._attrs = copy.deepcopy(self._attributes)

        # the resource instance gets a copy of all traces
        self._trcs = copy.deepcopy(self._traces)

        self._state = ResourceState.NEW

        # Timestamps (strfnow() values) of the state transitions; each one
        # stays None until the corresponding transition happens.
        self._start_time = None
        self._stop_time = None
        self._discover_time = None
        self._provision_time = None
        self._ready_time = None
        self._release_time = None

        self._logger = logging.getLogger("Resource")

    def debug(self, msg, out = None, err = None):
        """ Logs msg at DEBUG level (see log) """
        self.log(msg, logging.DEBUG, out, err)

    def error(self, msg, out = None, err = None):
        """ Logs msg at ERROR level (see log) """
        self.log(msg, logging.ERROR, out, err)

    def warn(self, msg, out = None, err = None):
        """ Logs msg at WARNING level (see log) """
        self.log(msg, logging.WARNING, out, err)

    def info(self, msg, out = None, err = None):
        """ Logs msg at INFO level (see log) """
        self.log(msg, logging.INFO, out, err)

    def log(self, msg, level, out = None, err = None):
        """ Logs 'msg' at 'level' through this RM's logger, prefixed with the
        RM type and guid. Non-empty 'out' and 'err' values are appended to
        the message.
        """
        if out:
            msg += " - OUT: %s " % out

        if err:
            msg += " - ERROR: %s " % err

        msg = self.log_message(msg)

        self.logger.log(level, msg)

    def log_message(self, msg):
        """ Returns 'msg' prefixed with the RM type and guid """
        return " %s guid: %d - %s " % (self._rtype, self.guid, msg)

    @property
    def logger(self):
        return self._logger

    @property
    def guid(self):
        return self._guid

    @property
    def ec(self):
        """ Returns the experiment controller, or None once it no longer
        exists (it is only weakly referenced) """
        return self._ec()

    @property
    def connections(self):
        """ Set of guids of the RMs connected to this one """
        return self._connections

    @property
    def conditions(self):
        """ Registered conditions: action -> list of (group, state, time) """
        return self._conditions

    @property
    def start_time(self):
        """ Returns timestamp with the time the RM started """
        return self._start_time

    @property
    def stop_time(self):
        """ Returns timestamp with the time the RM stopped """
        return self._stop_time

    @property
    def discover_time(self):
        """ Returns timestamp with the time the RM passed to state discovered """
        return self._discover_time

    @property
    def provision_time(self):
        """ Returns timestamp with the time the RM passed to state provisioned """
        return self._provision_time

    @property
    def ready_time(self):
        """ Returns timestamp with the time the RM passed to state ready """
        return self._ready_time

    @property
    def release_time(self):
        """ Returns timestamp with the time the RM was released """
        return self._release_time

    @property
    def state(self):
        """ Returns the current ResourceState of the RM """
        return self._state

    def connect(self, guid):
        """ Connects this RM to the RM identified by 'guid', provided
        valid_connection accepts it.
        """
        if self.valid_connection(guid):
            self._connections.add(guid)

    def discover(self):
        """ Records the discovery time and moves the RM to DISCOVERED """
        self._discover_time = strfnow()
        self._state = ResourceState.DISCOVERED

    def provision(self):
        """ Records the provisioning time and moves the RM to PROVISIONED """
        self._provision_time = strfnow()
        self._state = ResourceState.PROVISIONED

    def start(self):
        """ Start the Resource Manager.

        Only allowed from the READY or STOPPED states; otherwise an error is
        logged and the state is left unchanged.
        """
        if self._state not in [ResourceState.READY, ResourceState.STOPPED]:
            self.error("Wrong state %s for start" % self.state)
            return

        self._start_time = strfnow()
        self._state = ResourceState.STARTED

    def stop(self):
        """ Stop the Resource Manager.

        Only allowed from the STARTED state; otherwise an error is logged
        and the state is left unchanged.
        """
        if self._state not in [ResourceState.STARTED]:
            self.error("Wrong state %s for stop" % self.state)
            return

        self._stop_time = strfnow()
        self._state = ResourceState.STOPPED

    def set(self, name, value):
        """ Set the value of the attribute

        :param name: Name of the attribute
        :type name: str
        :param value: Value of the attribute
        :raises KeyError: if the RM has no attribute called 'name'
        """
        attr = self._attrs[name]
        attr.value = value

    def get(self, name):
        """ Returns the value of the attribute

        :param name: Name of the attribute
        :type name: str
        :raises KeyError: if the RM has no attribute called 'name'
        """
        attr = self._attrs[name]
        return attr.value

    def register_trace(self, name):
        """ Enable trace

        :param name: Name of the trace
        :type name: str
        :raises KeyError: if the RM has no trace called 'name'
        """
        trace = self._trcs[name]
        trace.enabled = True

    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """ Get information on collected trace.

        The base implementation does nothing and returns None.

        :param name: Name of the trace
        :type name: str

        :param attr: Can be one of:
                         - TraceAttr.ALL (complete trace content),
                         - TraceAttr.STREAM (block in bytes to read starting at offset),
                         - TraceAttr.PATH (full path to the trace file),
                         - TraceAttr.SIZE (size of trace file).
        :type attr: str

        :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
        :type block: int

        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
        :type offset: int

        :rtype: str
        """
        pass

    @staticmethod
    def _group_as_list(group):
        """ Normalizes a group of guids to a list: lists are returned as-is,
        tuples and sets are converted, and any other value (a single guid)
        becomes a one-item list.
        """
        if isinstance(group, list):
            return group
        if isinstance(group, (tuple, set, frozenset)):
            return list(group)
        return [group]

    def register_condition(self, action, group, state, time = None):
        """ Registers a condition on the resource manager to allow execution
        of 'action' only after 'time' has elapsed from the moment all resources
        in 'group' reached state 'state'

        :param action: Action to restrict to condition
            (ResourceAction.START or ResourceAction.STOP)
        :param group: Group of RMs to wait for (guid or list of guids)
        :type group: int or list of int
        :param state: ResourceState value to wait for on all RMs in group
            (e.g. ResourceState.STARTED or ResourceState.STOPPED)
        :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
        :type time: str
        """
        group = self._group_as_list(group)

        # For each condition to register a tuple of (group, state, time) is
        # added to the 'action' list
        self._conditions.setdefault(action, []).append((group, state, time))

    def get_connected(self, rtype):
        """ Returns the connected RMs whose resource type is 'rtype'

        :param rtype: Resource type name to filter by
        :type rtype: str
        :rtype: list
        """
        rms = (self.ec.get_resource(guid) for guid in self.connections)
        return [rm for rm in rms if rm.rtype() == rtype]

    def _needs_reschedule(self, group, state, time):
        """ Internal method that verifies if 'time' has elapsed since
        all elements in 'group' have reached state 'state'.

        :param group: Group of RMs to wait for (guid or list of guids)
        :type group: int or list of int
        :param state: ResourceState value to wait for on all RMs in group
        :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
        :type time: str
        :return: (reschedule, delay) -- reschedule is True when the condition
            is not yet satisfied; delay is the time string after which to
            check again (the remaining wait time when known, otherwise
            reschedule_delay)

        .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
                If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
                For the moment, 2m30s is not a correct syntax.
        """
        reschedule = False
        delay = reschedule_delay

        # check state and time elapsed on all RMs
        for guid in self._group_as_list(group):
            rm = self.ec.get_resource(guid)
            # If the RM state is lower than the requested state we must
            # reschedule (e.g. if RM is READY but we required STARTED)
            if rm.state < state:
                reschedule = True
                break

            # If there is a time restriction, we must verify the
            # restriction is satisfied
            if not time:
                continue

            if state == ResourceState.DISCOVERED:
                t = rm.discover_time
            elif state == ResourceState.PROVISIONED:
                t = rm.provision_time
            elif state == ResourceState.READY:
                t = rm.ready_time
            elif state == ResourceState.STARTED:
                t = rm.start_time
            elif state == ResourceState.STOPPED:
                t = rm.stop_time
            else:
                # No timestamp is kept for other states: skip the time check
                # for this RM but keep checking the state of the others.
                continue

            if t is None:
                # The RM reached 'state' without recording the matching
                # timestamp (e.g. the base deploy() never sets the discover
                # or provision times), so there is nothing to measure from.
                continue

            d = strfdiff(strfnow(), t)
            wait = strfdiff(strfvalid(time), strfvalid(str(d) + "s"))
            if wait > 0.001:
                reschedule = True
                delay = "%fs" % wait
                break

        return reschedule, delay

    def set_with_conditions(self, name, value, group, state, time):
        """ Set value 'value' on attribute with name 'name' when 'time'
        has elapsed since all elements in 'group' have reached state
        'state'. The RM must be STARTED; until then, and while the
        condition is not met, the call is rescheduled.

        :param name: Name of the attribute to set
        :type name: str
        :param value: Value of the attribute to set
        :param group: Group of RMs to wait for (guid or list of guids)
        :type group: int or list of int
        :param state: ResourceState value to wait for on all RMs in group
            (e.g. STARTED, STOPPED or READY)
        :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
        :type time: str
        """
        reschedule = False
        delay = reschedule_delay

        # only can set with conditions after the RM is started
        if self.state != ResourceState.STARTED:
            reschedule = True
        else:
            reschedule, delay = self._needs_reschedule(group, state, time)

        if reschedule:
            callback = functools.partial(self.set_with_conditions,
                    name, value, group, state, time)
            self.ec.schedule(delay, callback)
        else:
            self.set(name, value)

    def start_with_conditions(self):
        """ Starts RM when all the conditions in self.conditions for
        action 'START' are satisfied.

        While the RM is not READY or STOPPED, or some condition is not met,
        the check is rescheduled on the experiment controller.
        """
        reschedule = False
        delay = reschedule_delay

        # only can start when RM is either STOPPED or READY
        if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
            reschedule = True
            self.debug("---- RESCHEDULING START ---- state %s " % self.state )
        else:
            start_conditions = self.conditions.get(ResourceAction.START, [])

            self.debug("---- START CONDITIONS ---- %s" % start_conditions)

            # Verify all start conditions are met
            for (group, state, time) in start_conditions:
                reschedule, delay = self._needs_reschedule(group, state, time)
                if reschedule:
                    break

        if reschedule:
            self.ec.schedule(delay, self.start_with_conditions)
        else:
            self.debug("----- STARTING ---- ")
            self.start()

    def stop_with_conditions(self):
        """ Stops RM when all the conditions in self.conditions for
        action 'STOP' are satisfied.

        While the RM is not STARTED, or some condition is not met, the check
        is rescheduled on the experiment controller.
        """
        reschedule = False
        delay = reschedule_delay

        # only can stop when RM is STARTED
        if self.state != ResourceState.STARTED:
            reschedule = True
        else:
            self.debug(" ---- STOP CONDITIONS ---- %s" %
                    self.conditions.get(ResourceAction.STOP))

            stop_conditions = self.conditions.get(ResourceAction.STOP, [])
            for (group, state, time) in stop_conditions:
                reschedule, delay = self._needs_reschedule(group, state, time)
                if reschedule:
                    break

        if reschedule:
            self.ec.schedule(delay, self.stop_with_conditions)
        else:
            # self.debug (not self.logger.debug) so the message carries the
            # RM type and guid like every other message of this class
            self.debug(" ----- STOPPING ---- ")
            self.stop()

    def deploy(self):
        """ Execute all steps required for the RM to reach the state READY.

        The base implementation only records the ready time. Deploying an RM
        whose state is already beyond READY logs an error and does nothing.
        """
        if self._state > ResourceState.READY:
            self.error("Wrong state %s for deploy" % self.state)
            return

        self.debug("----- DEPLOYING ---- ")
        self._ready_time = strfnow()
        self._state = ResourceState.READY

    def release(self):
        """ Clean the resource at the end of the Experiment and change the
        status to RELEASED.
        """
        self._release_time = strfnow()
        self._state = ResourceState.RELEASED

    def valid_connection(self, guid):
        """ Check if the connection is available.

        The base implementation performs no validation and accepts any guid.

        :param guid: Guid of the Resource Manager to connect to
        :type guid: int
        :rtype: bool
        """
        return True
class ResourceFactory(object):
    """ Registry of ResourceManager classes, keyed by resource type name """

    # rtype name -> ResourceManager class
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """ Returns the registry dictionary itself (not a copy) """
        return cls._resource_types

    @classmethod
    def register_type(cls, rclass):
        """ Registers 'rclass' under the name returned by rclass.rtype(),
        replacing any class previously registered under that name.
        """
        type_name = rclass.rtype()
        cls._resource_types[type_name] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Instantiates the class registered for 'rtype' as rclass(ec, guid).

        :raises KeyError: if no class is registered for 'rtype'
        """
        return cls._resource_types[rtype](ec, guid)
def populate_factory():
    """ Registers every ResourceManager subclass returned by find_types()
    in the ResourceFactory.
    """
    discovered = find_types()
    for rclass in discovered:
        ResourceFactory.register_type(rclass)
def find_types():
    """ Discovers the ResourceManager subclasses available for registration.

    Packages are searched in the whitespace-separated directories listed in
    the NEPI_SEARCH_PATH environment variable plus the directory of the
    nepi.resources package. Every module found is loaded, and every public
    class in it that subclasses ResourceManager (other than ResourceManager
    itself) is collected. Modules that fail to load are logged and skipped.

    :rtype: list of ResourceManager subclasses
    """
    # split() without a separator drops empty entries. Splitting an unset
    # variable on " " yields [""], which pkgutil (at least on Python 3)
    # resolves to the current working directory, importing whatever lives
    # there.
    search_path = set(os.environ.get("NEPI_SEARCH_PATH", "").split())

    import nepi.resources
    path = os.path.dirname(nepi.resources.__file__)
    search_path.add(path)

    types = []

    for importer, modname, ispkg in pkgutil.walk_packages(search_path):
        try:
            # find_module may fail or return None; both are handled here
            loader = importer.find_module(modname)
            module = loader.load_module(loader.fullname)
            for attrname in dir(module):
                if attrname.startswith("_"):
                    continue

                attr = getattr(module, attrname)

                if attr is ResourceManager:
                    continue

                if not inspect.isclass(attr):
                    continue

                if issubclass(attr, ResourceManager):
                    types.append(attr)
        except Exception:
            # Best effort: a broken module must not prevent the others from
            # loading, but KeyboardInterrupt/SystemExit still propagate.
            import traceback
            err = traceback.format_exc()
            logger = logging.getLogger("Resource.find_types()")
            logger.error("Error while loading Resource Managers %s" % err)

    return types