2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
21 from nepi.execution.trace import TraceAttr
# Default delay (time string) used as the initial 'delay' value by the
# condition-checking methods before they reschedule themselves.
reschedule_delay = "0.5s"
53 # Decorator to invoke class initialization method
55 class ResourceManager(object):
def _register_filter(cls, attr):
    """ Resource subclasses will invoke this method to add a
    resource filter.

    :param attr: Filter to register; it is stored in cls._filters
        under the key attr.name, replacing any previous entry with
        the same name
    """
    cls._filters[attr.name] = attr
def _register_attribute(cls, attr):
    """ Resource subclasses will invoke this method to add a
    resource attribute.

    :param attr: Attribute to register; it is stored in cls._attributes
        under the key attr.name, replacing any previous entry with
        the same name
    """
    cls._attributes[attr.name] = attr
def _register_trace(cls, trace):
    """ Resource subclasses will invoke this method to add a
    resource trace.

    :param trace: Trace to register; it is stored in cls._traces
        under the key trace.name, replacing any previous entry with
        the same name
    """
    cls._traces[trace.name] = trace
87 def _register_filters(cls):
88 """ Resource subclasses will invoke this method to register
95 def _register_attributes(cls):
96 """ Resource subclasses will invoke this method to register
103 def _register_traces(cls):
104 """ Resource subclasses will invoke this method to register
112 """ Create a new dictionary instance of the dictionary
113 with the same template.
115 Each resource should have the same registration dictionary
116 template with different instances.
118 # static template for resource filters
119 cls._filters = dict()
120 cls._register_filters()
122 # static template for resource attributes
123 cls._attributes = dict()
124 cls._register_attributes()
126 # static template for resource traces
128 cls._register_traces()
def get_filters(cls):
    """ Returns a copy of the filters

    The result is a new list holding deep copies of every filter
    registered in cls._filters, so callers can modify it without
    altering the class-level template.
    """
    # Materialize the values first: on Python 3, dict.values() is a view
    # object that copy.deepcopy() cannot copy (raises TypeError).
    return copy.deepcopy(list(cls._filters.values()))
def get_attributes(cls):
    """ Returns a copy of the attributes

    The result is a new list holding deep copies of every attribute
    registered in cls._attributes, so callers can modify it without
    altering the class-level template.
    """
    # Materialize the values first: on Python 3, dict.values() is a view
    # object that copy.deepcopy() cannot copy (raises TypeError).
    return copy.deepcopy(list(cls._attributes.values()))
150 """ Returns a copy of the traces
153 return copy.deepcopy(cls._traces.values())
155 def __init__(self, ec, guid):
157 self._ec = weakref.ref(ec)
158 self._connections = set()
159 self._conditions = dict()
161 # the resource instance gets a copy of all attributes
162 self._attrs = copy.deepcopy(self._attributes)
164 # the resource instance gets a copy of all traces
165 self._trcs = copy.deepcopy(self._traces)
167 self._state = ResourceState.NEW
169 self._start_time = None
170 self._stop_time = None
171 self._discover_time = None
172 self._provision_time = None
173 self._ready_time = None
174 self._release_time = None
177 self._logger = logging.getLogger("Resource")
def debug(self, msg, out = None, err = None):
    """ Log 'msg' with severity DEBUG; 'out' and 'err' are forwarded to log() """
    level = logging.DEBUG
    self.log(msg, level, out, err)
def error(self, msg, out = None, err = None):
    """ Log 'msg' with severity ERROR; 'out' and 'err' are forwarded to log() """
    level = logging.ERROR
    self.log(msg, level, out, err)
def warn(self, msg, out = None, err = None):
    """ Log 'msg' with severity WARNING; 'out' and 'err' are forwarded to log() """
    level = logging.WARNING
    self.log(msg, level, out, err)
def info(self, msg, out = None, err = None):
    """ Log 'msg' with severity INFO; 'out' and 'err' are forwarded to log() """
    level = logging.INFO
    self.log(msg, level, out, err)
191 def log(self, msg, level, out = None, err = None):
193 msg += " - OUT: %s " % out
196 msg += " - ERROR: %s " % err
198 msg = self.log_message(msg)
200 self.logger.log(level, msg)
def log_message(self, msg):
    """ Prefix 'msg' with the resource type and guid of this RM

    :param msg: Text to decorate
    :return: The formatted log line
    """
    fields = (self._rtype, self.guid, msg)
    return " %s guid: %d - %s " % fields
def connections(self):
    """ Returns the set of guids added through connect().

    The internal set itself is returned, not a copy.
    """
    return self._connections
def conditions(self):
    """ Returns the dictionary of registered conditions.

    Keys are actions; each value is the list of (group, state, time)
    tuples appended by register_condition(). The internal dictionary
    itself is returned, not a copy.
    """
    return self._conditions
def start_time(self):
    """ Returns timestamp with the time the RM started

    The value is None until the RM is started; it is then taken
    from strfnow().
    """
    return self._start_time
232 """ Returns timestamp with the time the RM stopped """
233 return self._stop_time
def discover_time(self):
    """ Returns timestamp with the time the RM passed to state discovered

    The value is None until discover() runs; it is then taken
    from strfnow().
    """
    return self._discover_time
def provision_time(self):
    """ Returns timestamp with the time the RM passed to state provisioned

    The value is None until provision() runs; it is then taken
    from strfnow().
    """
    return self._provision_time
def ready_time(self):
    """ Returns timestamp with the time the RM passed to state ready

    The value is None until the RM reaches state READY; it is then
    taken from strfnow().
    """
    return self._ready_time
def release_time(self):
    """ Returns timestamp with the time the RM was released

    The value is None until the RM is released; it is then taken
    from strfnow().
    """
    return self._release_time
def connect(self, guid):
    """ Record a connection to the RM identified by 'guid'

    The guid is added to the connections set only when
    valid_connection() accepts it; otherwise nothing happens.

    :param guid: Guid of the RM to connect to
    """
    if not self.valid_connection(guid):
        return
    self._connections.add(guid)
def discover(self, filters = None):
    """ Record the discovery timestamp and move the RM to state DISCOVERED

    :param filters: Not used by this implementation
    """
    now = strfnow()
    self._discover_time = now
    self._state = ResourceState.DISCOVERED
def provision(self, filters = None):
    """ Record the provision timestamp and move the RM to state PROVISIONED

    :param filters: Not used by this implementation
    """
    now = strfnow()
    self._provision_time = now
    self._state = ResourceState.PROVISIONED
272 """ Start the Resource Manager
275 if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
276 self.error("Wrong state %s for start" % self.state)
279 self._start_time = strfnow()
280 self._state = ResourceState.STARTED
283 """ Stop the Resource Manager
286 if not self._state in [ResourceState.STARTED]:
287 self.error("Wrong state %s for stop" % self.state)
290 self._stop_time = strfnow()
291 self._state = ResourceState.STOPPED
293 def set(self, name, value):
294 """ Set the value of the attribute
296 :param name: Name of the attribute
298 :param value: Value of the attribute
301 attr = self._attrs[name]
305 """ Get the value of the attribute
307 :param name: Name of the attribute
311 attr = self._attrs[name]
314 def register_trace(self, name):
317 :param name: Name of the trace
320 trace = self._trcs[name]
323 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
324 """ Get information on collected trace
326 :param name: Name of the trace
329 :param attr: Can be one of:
330 - TraceAttr.ALL (complete trace content),
331 - TraceAttr.STREAM (block in bytes to read starting at offset),
332 - TraceAttr.PATH (full path to the trace file),
333 - TraceAttr.SIZE (size of trace file).
336 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
339 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
346 def register_condition(self, action, group, state,
348 """ Registers a condition on the resource manager to allow execution
349 of 'action' only after 'time' has elapsed from the moment all resources
350 in 'group' reached state 'state'
352 :param action: Action to restrict to condition (either 'START' or 'STOP')
354 :param group: Group of RMs to wait for (list of guids)
355 :type group: int or list of int
356 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
358 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
362 conditions = self.conditions.get(action)
365 self._conditions[action] = conditions
367 # For each condition to register a tuple of (group, state, time) is
368 # added to the 'action' list
369 if not isinstance(group, list):
372 conditions.append((group, state, time))
374 def get_connected(self, rtype):
376 for guid in self.connections:
377 rm = self.ec.get_resource(guid)
378 if rm.rtype() == rtype:
382 def _needs_reschedule(self, group, state, time):
383 """ Internal method that verifies whether 'time' has elapsed since
384 all elements in 'group' have reached state 'state'.
386 :param group: Group of RMs to wait for (list of guids)
387 :type group: int or list of int
388 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
390 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
393 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
394 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
395 For the moment, 2m30s is not a correct syntax.
399 delay = reschedule_delay
401 # check state and time elapsed on all RMs
403 rm = self.ec.get_resource(guid)
404 # If the RM state is lower than the requested state we must
405 # reschedule (e.g. if RM is READY but we required STARTED)
410 # If there is a time restriction, we must verify the
411 # restriction is satisfied
413 if state == ResourceState.DISCOVERED:
415 if state == ResourceState.PROVISIONED:
416 t = rm.provision_time
417 elif state == ResourceState.READY:
419 elif state == ResourceState.STARTED:
421 elif state == ResourceState.STOPPED:
424 # Only keep time information for START and STOP
427 d = strfdiff(strfnow(), t)
428 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
433 return reschedule, delay
435 def set_with_conditions(self, name, value, group, state, time):
436 """ Set value 'value' on attribute with name 'name' when 'time'
437 has elapsed since all elements in 'group' have reached state
440 :param name: Name of the attribute to set
442 :param value: Value of the attribute to set
444 :param group: Group of RMs to wait for (list of guids)
445 :type group: int or list of int
446 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
448 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
454 delay = reschedule_delay
456 ## evaluate if set conditions are met
458 # only can set with conditions after the RM is started
459 if self.state != ResourceState.STARTED:
462 reschedule, delay = self._needs_reschedule(group, state, time)
465 callback = functools.partial(self.set_with_conditions,
466 name, value, group, state, time)
467 self.ec.schedule(delay, callback)
469 self.set(name, value)
471 def start_with_conditions(self):
472 """ Starts RM when all the conditions in self.conditions for
473 action 'START' are satisfied.
477 delay = reschedule_delay
479 ## evaluate if set conditions are met
481 # only can start when RM is either STOPPED or READY
482 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
484 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
486 start_conditions = self.conditions.get(ResourceAction.START, [])
488 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
490 # Verify all start conditions are met
491 for (group, state, time) in start_conditions:
492 # Uncomment for debug
495 # rm = self.ec.get_resource(guid)
496 # unmet.append((guid, rm._state))
498 #self.debug("---- WAITED STATES ---- %s" % unmet )
500 reschedule, delay = self._needs_reschedule(group, state, time)
505 self.ec.schedule(delay, self.start_with_conditions)
507 self.debug("----- STARTING ---- ")
510 def stop_with_conditions(self):
511 """ Stops RM when all the conditions in self.conditions for
512 action 'STOP' are satisfied.
516 delay = reschedule_delay
518 ## evaluate if set conditions are met
520 # only can stop when RM is STARTED
521 if self.state != ResourceState.STARTED:
524 self.debug(" ---- STOP CONDITIONS ---- %s" %
525 self.conditions.get(ResourceAction.STOP))
527 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
528 for (group, state, time) in stop_conditions:
529 reschedule, delay = self._needs_reschedule(group, state, time)
534 callback = functools.partial(self.stop_with_conditions)
535 self.ec.schedule(delay, callback)
537 self.logger.debug(" ----- STOPPING ---- ")
541 """ Execute all steps required for the RM to reach the state READY
544 if self._state > ResourceState.READY:
545 self.error("Wrong state %s for deploy" % self.state)
548 self.debug("----- DEPLOYING ---- ")
549 self._ready_time = strfnow()
550 self._state = ResourceState.READY
553 """Clean the resource at the end of the Experiment and change the status
556 self._release_time = strfnow()
557 self._state = ResourceState.RELEASED
559 def valid_connection(self, guid):
560 """Check if the connection is available.
562 :param guid: Guid of the current Resource Manager
class ResourceFactory(object):
    """ Registry that maps resource type names to ResourceManager
    classes and instantiates them on demand.

    All state is kept at class level, so the factory is used directly
    through the class (e.g. ResourceFactory.register_type(rclass)).
    """
    # Maps rtype (as returned by rclass.rtype()) -> resource class
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """ Returns the dictionary of registered resource types.

        The internal dictionary itself is returned, not a copy.
        """
        return cls._resource_types

    @classmethod
    def register_type(cls, rclass):
        """ Registers 'rclass' under the name given by rclass.rtype().

        A later registration with the same rtype replaces the earlier one.

        :param rclass: Resource class exposing a class-level rtype()
        """
        cls._resource_types[rclass.rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Instantiates the resource class registered for 'rtype'.

        :param rtype: Name of a registered resource type
        :param ec: Passed through as first constructor argument
        :param guid: Passed through as second constructor argument
        :return: A new instance, built as rclass(ec, guid)
        :raises KeyError: If 'rtype' has not been registered
        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)
def populate_factory():
    """ Register in ResourceFactory every resource class returned
    by find_types().
    """
    discovered = find_types()
    for resource_class in discovered:
        ResourceFactory.register_type(resource_class)
591 search_path = os.environ.get("NEPI_SEARCH_PATH", "")
592 search_path = set(search_path.split(" "))
594 import nepi.resources
595 path = os.path.dirname(nepi.resources.__file__)
596 search_path.add(path)
600 for importer, modname, ispkg in pkgutil.walk_packages(search_path):
601 loader = importer.find_module(modname)
603 module = loader.load_module(loader.fullname)
604 for attrname in dir(module):
605 if attrname.startswith("_"):
608 attr = getattr(module, attrname)
610 if attr == ResourceManager:
613 if not inspect.isclass(attr):
616 if issubclass(attr, ResourceManager):
620 err = traceback.format_exc()
621 logger = logging.getLogger("Resource.find_types()")
622 logger.error("Error while lading Resource Managers %s" % err)