1 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
2 from neco.execution.trace import TraceAttr
# Default delay handed to ``self.ec.schedule`` when a conditional action
# (set/start/stop with conditions) has to be retried later. It is a duration
# string in the same notation as the ``time`` arguments (e.g. "2s").
12 reschedule_delay = "0.5s"
34 # Decorator to invoke class initialization method
# NOTE(review): the decorator line itself is not visible in this listing;
# presumably it runs the class-level initialiser that rebuilds the
# ``_filters`` / ``_attributes`` / ``_traces`` templates -- confirm.
36 class ResourceManager(object):
# Base class for resource managers (RMs). Subclasses register their filters,
# attributes and traces through the ``_register_*`` hooks; each instance keeps
# its own copies of attributes and traces, its connections, its conditions,
# its ResourceState and the timestamp of each state transition.
@classmethod
def _register_filter(cls, attr):
    """ Register ``attr`` as a filter of this resource class.

    Resource subclasses invoke this method to add a filter attribute to
    the class-level ``_filters`` template. Entries are keyed by
    ``attr.name``, so registering another attribute with the same name
    replaces the previous one.

    :param attr: Filter attribute to register; must expose ``name``.
    """
    cls._filters[attr.name] = attr

@classmethod
def _register_attribute(cls, attr):
    """ Register ``attr`` as an attribute of this resource class.

    Resource subclasses invoke this method to add an attribute to the
    class-level ``_attributes`` template (copied into every instance).
    Entries are keyed by ``attr.name``; a later registration with the
    same name replaces the earlier one.

    :param attr: Attribute to register; must expose ``name``.
    """
    cls._attributes[attr.name] = attr

@classmethod
def _register_trace(cls, trace):
    """ Register ``trace`` as a trace of this resource class.

    Resource subclasses invoke this method to add a trace to the
    class-level ``_traces`` template (copied into every instance).
    Entries are keyed by ``trace.name``; a later registration with the
    same name replaces the earlier one.

    :param trace: Trace to register; must expose ``name``.
    """
    cls._traces[trace.name] = trace
# Class-level registration hooks (the receiver is ``cls``). The class
# initialiser resets the matching template and then calls
# ``cls._register_filters()`` / ``cls._register_attributes()`` /
# ``cls._register_traces()``.
# NOTE(review): the rest of each body is not visible in this listing;
# presumably subclasses override these hooks -- confirm.
68 def _register_filters(cls):
69 """ Resource subclasses will invoke this method to register
their resource filters on the class-level ``_filters`` template.
"""
76 def _register_attributes(cls):
77 """ Resource subclasses will invoke this method to register
their resource attributes on the class-level ``_attributes`` template.
"""
84 def _register_traces(cls):
85 """ Resource subclasses will invoke this method to register
their resource traces on the class-level ``_traces`` template.
"""
# NOTE(review): the ``def`` line of this class initialiser is not visible in
# this listing.
93 """ Create a new dictionary instance of the dictionary
94 with the same template.
96 Each resource should have the same registration dictionary
97 template with different instances.
In practice every class gets fresh ``_filters`` and ``_attributes`` dicts
(and presumably ``_traces``), which the ``_register_*`` hooks then fill.
"""
99 # static template for resource filters
100 cls._filters = dict()
101 cls._register_filters()
103 # static template for resource attributes
104 cls._attributes = dict()
105 cls._register_attributes()
107 # static template for resource traces
# NOTE(review): unlike the two templates above, no ``cls._traces = dict()``
# reset is visible before the call below; if it is really absent, a subclass
# would register its traces into its parent's dict -- confirm.
109 cls._register_traces()
@classmethod
def get_filters(cls):
    """ Return a deep copy of the filters registered on this class.

    :returns: list of filter attributes; mutating them does not affect
        the class-level template.
    """
    # On Python 3 ``dict.values()`` is a view that ``copy.deepcopy`` cannot
    # copy (TypeError), so materialise it as a list first. On Python 2
    # ``values()`` already returns a list, so the result is unchanged there.
    return copy.deepcopy(list(cls._filters.values()))

@classmethod
def get_attributes(cls):
    """ Return a deep copy of the attributes registered on this class.

    :returns: list of attributes; mutating them does not affect the
        class-level template.
    """
    # See get_filters: list() keeps deepcopy working on Python 3.
    return copy.deepcopy(list(cls._attributes.values()))
# NOTE(review): the ``def`` line of this accessor (presumably a class-level
# ``get_traces(cls)``) is not visible in this listing.
131 """ Returns a copy of the traces
registered on the class (a deep copy, so callers cannot mutate the template).
"""
# NOTE(review): on Python 3 ``dict.values()`` is a view that copy.deepcopy
# cannot copy (TypeError); ``copy.deepcopy(list(...))`` works on 2 and 3.
134 return copy.deepcopy(cls._traces.values())
136 def __init__(self, ec, guid):
""" Initialise a resource manager instance.

:param ec: Owner of this RM (it must provide ``get_resource`` and
    ``schedule``, presumably the experiment controller); only a weak
    reference to it is kept.
:param guid: Identifier of this RM.
"""
# NOTE(review): the line storing ``guid`` on the instance is not visible in
# this listing; ``log_message`` reads ``self.guid``.
# Weak reference, so this RM does not keep ``ec`` alive.
138 self._ec = weakref.ref(ec)
# guids added by ``connect`` and, per action, the (group, state, time)
# tuples added by ``register_condition``.
139 self._connections = set()
140 self._conditions = dict()
142 # the resource instance gets a copy of all attributes
143 self._attrs = copy.deepcopy(self._attributes)
145 # the resource instance gets a copy of all traces
146 self._trcs = copy.deepcopy(self._traces)
# Lifecycle state and the timestamp of each transition, all initially unset.
148 self._state = ResourceState.NEW
150 self._start_time = None
151 self._stop_time = None
152 self._discover_time = None
153 self._provision_time = None
154 self._ready_time = None
155 self._release_time = None
# All RMs share the "Resource" logger; the prefix added by log_message is
# what tells instances apart in the output.
158 self._logger = logging.getLogger("Resource")
def debug(self, msg, out=None, err=None):
    """ Log ``msg`` at DEBUG level, forwarding optional ``out``/``err``. """
    level = logging.DEBUG
    self.log(msg, level, out, err)

def error(self, msg, out=None, err=None):
    """ Log ``msg`` at ERROR level, forwarding optional ``out``/``err``. """
    level = logging.ERROR
    self.log(msg, level, out, err)

def warn(self, msg, out=None, err=None):
    """ Log ``msg`` at WARNING level, forwarding optional ``out``/``err``. """
    level = logging.WARNING
    self.log(msg, level, out, err)

def info(self, msg, out=None, err=None):
    """ Log ``msg`` at INFO level, forwarding optional ``out``/``err``. """
    level = logging.INFO
    self.log(msg, level, out, err)
172 def log(self, msg, level, out = None, err = None):
""" Log ``msg`` at ``level`` through this RM's logger.

``out`` and ``err`` are appended as " - OUT: ..." / " - ERROR: ..."
suffixes, and the result is prefixed by ``log_message`` before emitting.
"""
# NOTE(review): the guards around the two appends are not visible in this
# listing; presumably each suffix is only added when ``out``/``err`` is
# given -- confirm.
174 msg += " - OUT: %s " % out
177 msg += " - ERROR: %s " % err
179 msg = self.log_message(msg)
181 self.logger.log(level, msg)
def log_message(self, msg):
    """ Prefix ``msg`` with this RM's type and guid for log output. """
    template = " %s guid: %d - %s "
    return template % (self._rtype, self.guid, msg)
# Callers read these as attributes (``for guid in self.connections``,
# ``self.conditions.get(...)``), so they must be properties.
@property
def connections(self):
    """ Set of guids of the RMs this RM is connected to. """
    return self._connections

@property
def conditions(self):
    """ Dict mapping an action to its list of (group, state, time) conditions. """
    return self._conditions

@property
def start_time(self):
    """ Returns timestamp with the time the RM started """
    return self._start_time
# NOTE(review): the ``def`` line (and any decorator) of this accessor is not
# visible in this listing; judging by the docstring and ``_stop_time`` it is
# the stop-time accessor, read like the other ``*_time`` properties.
213 """ Returns timestamp with the time the RM stopped """
214 return self._stop_time
# Read as attributes by callers (e.g. ``t = rm.provision_time``), so these
# timestamp accessors must be properties.
@property
def discover_time(self):
    """ Returns timestamp with the time the RM passed to state discovered """
    return self._discover_time

@property
def provision_time(self):
    """ Returns timestamp with the time the RM passed to state provisioned """
    return self._provision_time

@property
def ready_time(self):
    """ Returns timestamp with the time the RM passed to state ready """
    return self._ready_time

@property
def release_time(self):
    """ Returns timestamp with the time the RM was released """
    return self._release_time
def connect(self, guid):
    """ Connect this RM to the RM identified by ``guid``.

    The guid is recorded only when ``valid_connection`` accepts it;
    otherwise nothing happens.
    """
    if not self.valid_connection(guid):
        return
    self._connections.add(guid)
def discover(self, filters=None):
    """ Record the discovery time and move this RM to DISCOVERED.

    :param filters: Accepted for subclasses; unused here.
    """
    self._discover_time = strfnow()
    self._state = ResourceState.DISCOVERED

def provision(self, filters=None):
    """ Record the provisioning time and move this RM to PROVISIONED.

    :param filters: Accepted for subclasses; unused here.
    """
    self._provision_time = strfnow()
    self._state = ResourceState.PROVISIONED
# NOTE(review): the ``def`` line of this method (presumably ``start(self)``)
# is not visible in this listing.
253 """ Start the Resource Manager
Only allowed from READY or STOPPED; otherwise an error is logged.
On success the start time is recorded and the state becomes STARTED.
"""
256 if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
257 self.error("Wrong state %s for start" % self.state)
# NOTE(review): the lines after the error call are not visible; presumably
# the method returns there without changing state -- confirm.
260 self._start_time = strfnow()
261 self._state = ResourceState.STARTED
# NOTE(review): the ``def`` line of this method (presumably ``stop(self)``)
# is not visible in this listing.
264 """ Stop the Resource Manager
Only allowed from STARTED; otherwise an error is logged.
On success the stop time is recorded and the state becomes STOPPED.
"""
267 if not self._state in [ResourceState.STARTED]:
268 self.error("Wrong state %s for stop" % self.state)
# NOTE(review): the lines after the error call are not visible; presumably
# the method returns there without changing state -- confirm.
271 self._stop_time = strfnow()
272 self._state = ResourceState.STOPPED
274 def set(self, name, value):
275 """ Set the value of the attribute
277 :param name: Name of the attribute
279 :param value: Value of the attribute
"""
# Lookup in this RM's own copy of the attributes; an unknown name raises
# KeyError (the copy is a deepcopy of the class-level dict).
# NOTE(review): the line that stores ``value`` on ``attr`` is not visible in
# this listing.
282 attr = self._attrs[name]
# NOTE(review): the ``def`` line of this method is not visible in this
# listing; from the parameter docs it is the attribute getter paired with
# ``set``.
286 """ Look up the attribute ``name`` of this RM
(presumably returning its value -- the return line is not visible here).
288 :param name: Name of the attribute
"""
292 attr = self._attrs[name]
295 def register_trace(self, name):
""" Look up the trace ``name`` in this RM's own (per-instance) traces.

298 :param name: Name of the trace
"""
# NOTE(review): what is done with ``trace`` after the lookup is not visible in
# this listing. ``_trcs`` is a deepcopy of the class template, so an unknown
# name presumably raises KeyError.
301 trace = self._trcs[name]
304 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
305 """ Get information on collected trace
307 :param name: Name of the trace
310 :param attr: Can be one of:
311 - TraceAttr.ALL (complete trace content),
312 - TraceAttr.STREAM (block in bytes to read starting at offset),
313 - TraceAttr.PATH (full path to the trace file),
314 - TraceAttr.SIZE (size of trace file).
317 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
320 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
Defaults: ``attr=TraceAttr.ALL``, ``block=512``, ``offset=0``.
"""
# NOTE(review): the body of this method is not visible in this listing.
# NOTE(review): the continuation of this signature (the ``time`` parameter and
# closing parenthesis) is not visible in this listing.
327 def register_condition(self, action, group, state,
329 """ Registers a condition on the resource manager to allow execution
330 of 'action' only after 'time' has elapsed from the moment all resources
331 in 'group' reached state 'state'
333 :param action: Action to restrict to condition (either 'START' or 'STOP')
335 :param group: Group of RMs to wait for (list of guids)
336 :type group: int or list of int
337 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
339 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
"""
# Fetch the conditions already registered for ``action`` and store the list
# back under that key. The ``conditions`` accessor returns ``self._conditions``,
# so reading through it and writing ``_conditions`` touch the same dict.
# NOTE(review): the lines creating an empty list when none exists yet are not
# visible here.
343 conditions = self.conditions.get(action)
346 self._conditions[action] = conditions
348 # For each condition to register a tuple of (group, state, time) is
349 # added to the 'action' list
# NOTE(review): the branch body is not visible; presumably a single guid is
# wrapped into a one-element list, matching ``:type group: int or list of int``.
350 if not isinstance(group, list):
353 conditions.append((group, state, time))
355 def get_connected(self, rtype):
""" Collect the connected RMs whose ``rtype()`` equals ``rtype``.

:param rtype: Resource type name to match.
"""
# NOTE(review): the accumulator, the append and the return statement are not
# visible in this listing; presumably the matching RMs are returned as a list.
357 for guid in self.connections:
358 rm = self.ec.get_resource(guid)
359 if rm.rtype() == rtype:
363 def _needs_reschedule(self, group, state, time):
364 """ Internal method that verifies if 'time' has elapsed since
365 all elements in 'group' have reached state 'state'.
367 :param group: Group of RMs to wait for (list of guids)
368 :type group: int or list of int
369 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
(The code below also compares against DISCOVERED, PROVISIONED and READY.)
371 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
:returns: tuple ``(reschedule, delay)``; ``delay`` starts as the module
    default ``reschedule_delay``.
374 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
375 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
376 For the moment, 2m30s is not a correct syntax.
"""
# NOTE(review): several lines of this method are not visible in this listing,
# among them the loop over ``group`` that binds ``guid``, the initial value of
# ``reschedule`` and most branch bodies. The comments below describe only
# what is visible.
380 delay = reschedule_delay
382 # check state and time elapsed on all RMs
384 rm = self.ec.get_resource(guid)
385 # If the RM state is lower than the requested state we must
386 # reschedule (e.g. if RM is READY but we required STARTED)
391 # If there is a time restriction, we must verify the
392 # restriction is satisfied
# Pick the timestamp of the transition into the requested state.
# NOTE(review): the PROVISIONED test starts a new ``if`` instead of an
# ``elif``. For DISCOVERED the second chain is therefore also evaluated, and
# none of its branches match. This very likely should be ``elif``; confirm
# against the lines not visible here.
394 if state == ResourceState.DISCOVERED:
396 if state == ResourceState.PROVISIONED:
397 t = rm.provision_time
398 elif state == ResourceState.READY:
400 elif state == ResourceState.STARTED:
402 elif state == ResourceState.STOPPED:
405 # Only keep time information for START and STOP
# Presumably ``d`` is the time elapsed since ``t`` and ``wait`` is what remains
# of ``time`` after ``d`` (units depend on the strf* helpers, not visible here).
408 d = strfdiff(strfnow(), t)
409 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
414 return reschedule, delay
416 def set_with_conditions(self, name, value, group, state, time):
417 """ Set value 'value' on attribute with name 'name' when 'time'
418 has elapsed since all elements in 'group' have reached state
'state'.
421 :param name: Name of the attribute to set
423 :param value: Value of the attribute to set
425 :param group: Group of RMs to wait for (list of guids)
426 :type group: int or list of int
427 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
429 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
"""
# NOTE(review): the ``reschedule`` initialisation and the branch bodies are not
# visible in this listing. Presumably the call is rescheduled while the RM is
# not STARTED or while ``_needs_reschedule`` says so; confirm.
435 delay = reschedule_delay
437 ## evaluate if set conditions are met
439 # only can set with conditions after the RM is started
440 if self.state != ResourceState.STARTED:
443 reschedule, delay = self._needs_reschedule(group, state, time)
# Retry later with exactly the same arguments.
446 callback = functools.partial(self.set_with_conditions,
447 name, value, group, state, time)
448 self.ec.schedule(delay, callback)
# Presumably reached only when no reschedule is needed: apply the value now.
450 self.set(name, value)
452 def start_with_conditions(self):
453 """ Starts RM when all the conditions in self.conditions for
454 action 'START' are satisfied.
While they are not, the method reschedules itself through ``self.ec.schedule``.
"""
458 delay = reschedule_delay
460 ## evaluate if start conditions are met
462 # only can start when RM is either STOPPED or READY
463 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
465 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
467 start_conditions = self.conditions.get(ResourceAction.START, [])
469 self.debug("---- START CONDITIONS ---- %s" % start_conditions)
471 # Verify all start conditions are met
472 for (group, state, time) in start_conditions:
473 # Uncomment for debug
476 # rm = self.ec.get_resource(guid)
477 # unmet.append((guid, rm._state))
479 #self.debug("---- WAITED STATES ---- %s" % unmet )
481 reschedule, delay = self._needs_reschedule(group, state, time)
# NOTE(review): the lines deciding between rescheduling and starting (and the
# actual start call) are not visible in this listing.
486 self.ec.schedule(delay, self.start_with_conditions)
488 self.debug("----- STARTING ---- ")
491 def stop_with_conditions(self):
492 """ Stops RM when all the conditions in self.conditions for
493 action 'STOP' are satisfied.
While they are not, the method reschedules itself through ``self.ec.schedule``.
"""
497 delay = reschedule_delay
499 ## evaluate if stop conditions are met
501 # only can stop when RM is STARTED
502 if self.state != ResourceState.STARTED:
505 self.debug(" ---- STOP CONDITIONS ---- %s" %
506 self.conditions.get(ResourceAction.STOP))
508 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
509 for (group, state, time) in stop_conditions:
510 reschedule, delay = self._needs_reschedule(group, state, time)
# A partial with no bound arguments is equivalent to scheduling the bound
# method directly, as start_with_conditions does.
515 callback = functools.partial(self.stop_with_conditions)
516 self.ec.schedule(delay, callback)
# NOTE(review): unlike the other messages this goes straight to
# ``self.logger`` and so skips the "<rtype> guid: <n>" prefix added by
# ``log_message``; ``self.debug`` is probably intended. The actual stop call
# is not visible in this listing.
518 self.logger.debug(" ----- STOPPING ---- ")
# NOTE(review): the ``def`` line of this method (presumably ``deploy(self)``)
# is not visible in this listing.
522 """ Execute all steps required for the RM to reach the state READY
Refused (an error is logged) once the RM is past READY; otherwise the ready
time is recorded and the state becomes READY.
"""
# Relies on ResourceState values being ordered, so that states after READY
# compare greater.
525 if self._state > ResourceState.READY:
526 self.error("Wrong state %s for deploy" % self.state)
# NOTE(review): the lines after the error call are not visible; presumably
# the method returns there -- confirm.
529 self.debug("----- DEPLOYING ---- ")
530 self._ready_time = strfnow()
531 self._state = ResourceState.READY
# NOTE(review): the ``def`` line of this method (presumably ``release(self)``)
# is not visible in this listing.
534 """Clean the resource at the end of the Experiment and change the status
to RELEASED, recording the release time.
"""
537 self._release_time = strfnow()
538 self._state = ResourceState.RELEASED
540 def valid_connection(self, guid):
541 """Check if the connection is available.
543 :param guid: Guid of the Resource Manager this RM would connect to
``connect`` only records the guid when this check is truthy.
"""
# NOTE(review): the body of this check is not visible in this listing.
class ResourceFactory(object):
    """ Registry mapping resource type names to RM classes.

    All methods are classmethods: callers use ``ResourceFactory.<method>``
    directly, without creating an instance.
    """
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """ Return the (live) dict of registered RM classes, keyed by rtype. """
        return cls._resource_types

    @classmethod
    def register_type(cls, rclass):
        """ Register ``rclass`` under the name returned by ``rclass.rtype()``.

        A later registration with the same rtype replaces the earlier one.
        """
        cls._resource_types[rclass.rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Instantiate the RM class registered for ``rtype``.

        :raises KeyError: if no class is registered for ``rtype``.
        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)
def populate_factory():
    """ Register every RM class returned by ``find_types`` with ResourceFactory. """
    discovered = find_types()
    for resource_class in discovered:
        ResourceFactory.register_type(resource_class)
# NOTE(review): the ``def`` line of this function (called as ``find_types()``)
# and several body lines are not visible in this listing. Those include the
# surrounding try/except and the collection and return of matching classes.
# Extra directories to scan come from NECO_SEARCH_PATH, separated by spaces.
572 search_path = os.environ.get("NECO_SEARCH_PATH", "")
# NOTE(review): ``"".split(" ")`` is ``[""]``, so when NECO_SEARCH_PATH is unset
# the set contains an empty-string entry, which pkgutil may resolve to the
# current directory. ``split()`` without an argument drops empty entries;
# confirm intent.
573 search_path = set(search_path.split(" "))
# Always scan the directory of the bundled ``neco.resources`` package too.
575 import neco.resources
576 path = os.path.dirname(neco.resources.__file__)
577 search_path.add(path)
# NOTE(review): ``importer.find_module`` / ``loader.load_module`` are deprecated
# import APIs; the standard path finders no longer provide ``find_module`` in
# recent Python 3 releases (``importlib.import_module`` is the replacement).
581 for importer, modname, ispkg in pkgutil.walk_packages(search_path):
582 loader = importer.find_module(modname)
584 module = loader.load_module(loader.fullname)
585 for attrname in dir(module):
# Skip private names, the ResourceManager base class itself and non-class
# attributes; subclasses of ResourceManager are the matches (the branch
# bodies are not visible here).
586 if attrname.startswith("_"):
589 attr = getattr(module, attrname)
591 if attr == ResourceManager:
594 if not inspect.isclass(attr):
597 if issubclass(attr, ResourceManager):
# Import failures are logged with the full traceback instead of propagating.
# NOTE(review): the log message below contains the typo "lading" (loading).
601 err = traceback.format_exc()
602 logger = logging.getLogger("Resource.find_types()")
603 logger.error("Error while lading Resource Managers %s" % err)