1 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
2 from neco.execution.trace import TraceAttr
9 _reschedule_delay = "1s"
31 # Decorator to invoke class initialization method
33 class ResourceManager(object):
40 def _register_filter(cls, attr):
41 """ Resource subclasses will invoke this method to add a
45 cls._filters[attr.name] = attr
48 def _register_attribute(cls, attr):
49 """ Resource subclasses will invoke this method to add a
53 cls._attributes[attr.name] = attr
56 def _register_trace(cls, trace):
57 """ Resource subclasses will invoke this method to add a
61 cls._traces[trace.name] = trace
65 def _register_filters(cls):
66 """ Resource subclasses will invoke this method to register
73 def _register_attributes(cls):
74 """ Resource subclasses will invoke this method to register
81 def _register_traces(cls):
82 """ Resource subclasses will invoke this method to register
90 """ Create a new dictionary instance of the dictionary
91 with the same template.
93 Each resource should have the same registration dictionary
94 template with different instances.
96 # static template for resource filters
98 cls._register_filters()
100 # static template for resource attributes
101 cls._attributes = dict()
102 cls._register_attributes()
104 # static template for resource traces
106 cls._register_traces()
113 def get_filters(cls):
114 """ Returns a copy of the filters
117 return copy.deepcopy(cls._filters.values())
120 def get_attributes(cls):
121 """ Returns a copy of the attributes
124 return copy.deepcopy(cls._attributes.values())
128 """ Returns a copy of the traces
131 return copy.deepcopy(cls._traces.values())
133 def __init__(self, ec, guid):
135 self._ec = weakref.ref(ec)
136 self._connections = set()
137 self._conditions = dict()
139 # the resource instance gets a copy of all attributes
140 self._attrs = copy.deepcopy(self._attributes)
142 # the resource instance gets a copy of all traces
143 self._trcs = copy.deepcopy(self._traces)
145 self._state = ResourceState.NEW
147 self._start_time = None
148 self._stop_time = None
149 self._discover_time = None
150 self._provision_time = None
151 self._ready_time = None
152 self._release_time = None
155 self._logger = logging.getLogger("Resource")
def debug(self, msg, out=None, err=None):
    """ Log 'msg' with DEBUG severity

    :param msg: Message to log
    :param out: Optional output text, handed over to log()
    :param err: Optional error text, handed over to log()
    """
    level = logging.DEBUG
    self.log(msg, level, out, err)
def error(self, msg, out=None, err=None):
    """ Log 'msg' with ERROR severity

    :param msg: Message to log
    :param out: Optional output text, handed over to log()
    :param err: Optional error text, handed over to log()
    """
    level = logging.ERROR
    self.log(msg, level, out, err)
def warn(self, msg, out=None, err=None):
    """ Log 'msg' with WARNING severity

    :param msg: Message to log
    :param out: Optional output text, handed over to log()
    :param err: Optional error text, handed over to log()
    """
    level = logging.WARNING
    self.log(msg, level, out, err)
def info(self, msg, out=None, err=None):
    """ Log 'msg' with INFO severity

    :param msg: Message to log
    :param out: Optional output text, handed over to log()
    :param err: Optional error text, handed over to log()
    """
    level = logging.INFO
    self.log(msg, level, out, err)
169 def log(self, msg, level, out = None, err = None):
171 msg += " - OUT: %s " % out
174 msg += " - ERROR: %s " % err
176 msg = self.log_message(msg)
178 self.logger.log(level, msg)
def log_message(self, msg):
    """ Prefix 'msg' with the resource type and guid of this RM

    :param msg: Message text to decorate
    :returns: The decorated message string
    """
    # the guid is rendered with %d, so it is expected to be numeric
    fields = (self._rtype, self.guid, msg)
    return " %s guid: %d - %s " % fields
def connections(self):
    """ Returns the set of guids this RM has been connected to

    The internal set itself is returned, not a copy.
    """
    return self._connections
def conditions(self):
    """ Returns the dictionary of conditions registered on this RM

    The internal dictionary itself is returned, not a copy.
    """
    return self._conditions
def start_time(self):
    """ Returns the timestamp recorded when the RM was started

    The value is None while no start timestamp has been recorded.
    """
    return self._start_time
210 """ Returns timestamp with the time the RM stopped """
211 return self._stop_time
def discover_time(self):
    """ Returns the timestamp recorded when the RM reached state DISCOVERED

    The value is None while no discovery timestamp has been recorded.
    """
    return self._discover_time
def provision_time(self):
    """ Returns the timestamp recorded when the RM reached state PROVISIONED

    The value is None while no provision timestamp has been recorded.
    """
    return self._provision_time
def ready_time(self):
    """ Returns the timestamp recorded when the RM reached state READY

    The value is None while no ready timestamp has been recorded.
    """
    return self._ready_time
def release_time(self):
    """ Returns the timestamp recorded when the RM was released

    The value is None while no release timestamp has been recorded.
    """
    return self._release_time
def connect(self, guid):
    """ Record a connection to the RM identified by 'guid'

    The guid is stored only when valid_connection(guid) accepts it;
    otherwise the call does nothing.

    :param guid: Guid of the RM to connect to
    """
    if not self.valid_connection(guid):
        return
    self._connections.add(guid)
def discover(self, filters=None):
    """ Record the discovery timestamp and move the RM to state DISCOVERED

    :param filters: Not used by this implementation
    """
    now = strfnow()
    self._discover_time = now
    self._state = ResourceState.DISCOVERED
def provision(self, filters=None):
    """ Record the provision timestamp and move the RM to state PROVISIONED

    :param filters: Not used by this implementation
    """
    now = strfnow()
    self._provision_time = now
    self._state = ResourceState.PROVISIONED
250 """ Start the Resource Manager
253 if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
254 self.error("Wrong state %s for start" % self.state)
257 self._start_time = strfnow()
258 self._state = ResourceState.STARTED
261 """ Stop the Resource Manager
264 if not self._state in [ResourceState.STARTED]:
265 self.error("Wrong state %s for stop" % self.state)
268 self._stop_time = strfnow()
269 self._state = ResourceState.STOPPED
271 def set(self, name, value):
272 """ Set the value of the attribute
274 :param name: Name of the attribute
276 :param value: Value of the attribute
279 attr = self._attrs[name]
283 """ Get the value of the attribute
285 :param name: Name of the attribute
289 attr = self._attrs[name]
292 def register_trace(self, name):
295 :param name: Name of the trace
298 trace = self._trcs[name]
301 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
302 """ Get information on collected trace
304 :param name: Name of the trace
307 :param attr: Can be one of:
308 - TraceAttr.ALL (complete trace content),
309 - TraceAttr.STREAM (block in bytes to read starting at offset),
310 - TraceAttr.PATH (full path to the trace file),
311 - TraceAttr.SIZE (size of trace file).
314 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
317 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
324 def register_condition(self, action, group, state,
326 """ Registers a condition on the resource manager to allow execution
327 of 'action' only after 'time' has elapsed from the moment all resources
328 in 'group' reached state 'state'
330 :param action: Action to restrict to condition (either 'START' or 'STOP')
332 :param group: Group of RMs to wait for (list of guids)
333 :type group: int or list of int
334 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
336 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
340 conditions = self.conditions.get(action)
343 self._conditions[action] = conditions
345 # For each condition to register a tuple of (group, state, time) is
346 # added to the 'action' list
347 if not isinstance(group, list):
350 conditions.append((group, state, time))
352 def get_connected(self, rtype):
354 for guid in self.connections:
355 rm = self.ec.get_resource(guid)
356 if rm.rtype() == rtype:
360 def _needs_reschedule(self, group, state, time):
361 """ Internal method that verifies whether 'time' has elapsed since
362 all elements in 'group' have reached state 'state'.
364 :param group: Group of RMs to wait for (list of guids)
365 :type group: int or list of int
366 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
368 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
371 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
372 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
373 For the moment, 2m30s is not a correct syntax.
377 delay = _reschedule_delay
379 # check state and time elapsed on all RMs
381 rm = self.ec.get_resource(guid)
382 # If the RM state is lower than the requested state we must
383 # reschedule (e.g. if RM is READY but we required STARTED)
388 # If there is a time restriction, we must verify the
389 # restriction is satisfied
391 if state == ResourceState.DISCOVERED:
393 if state == ResourceState.PROVISIONED:
394 t = rm.provision_time
395 elif state == ResourceState.READY:
397 elif state == ResourceState.STARTED:
399 elif state == ResourceState.STOPPED:
402 # Only keep time information for START and STOP
405 d = strfdiff(strfnow(), t)
406 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
411 return reschedule, delay
413 def set_with_conditions(self, name, value, group, state, time):
414 """ Set value 'value' on attribute with name 'name' when 'time'
415 has elapsed since all elements in 'group' have reached state
418 :param name: Name of the attribute to set
420 :param value: Value of the attribute to set
422 :param group: Group of RMs to wait for (list of guids)
423 :type group: int or list of int
424 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
426 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
432 delay = _reschedule_delay
434 ## evaluate if set conditions are met
436 # only can set with conditions after the RM is started
437 if self.state != ResourceState.STARTED:
440 reschedule, delay = self._needs_reschedule(group, state, time)
443 callback = functools.partial(self.set_with_conditions,
444 name, value, group, state, time)
445 self.ec.schedule(delay, callback)
447 self.set(name, value)
449 def start_with_conditions(self):
450 """ Starts RM when all the conditions in self.conditions for
451 action 'START' are satisfied.
455 delay = _reschedule_delay
457 ## evaluate if set conditions are met
459 # only can start when RM is either STOPPED or READY
460 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
462 self.debug("---- RESCHEDULING START ---- state %s " % self.state )
464 self.debug("---- START CONDITIONS ---- %s" %
465 self.conditions.get(ResourceAction.START))
467 # Verify all start conditions are met
468 start_conditions = self.conditions.get(ResourceAction.START, [])
469 for (group, state, time) in start_conditions:
470 reschedule, delay = self._needs_reschedule(group, state, time)
475 self.ec.schedule(delay, self.start_with_conditions)
477 self.debug("----- STARTING ---- ")
480 def stop_with_conditions(self):
481 """ Stops RM when all the conditions in self.conditions for
482 action 'STOP' are satisfied.
486 delay = _reschedule_delay
488 ## evaluate if set conditions are met
490 # only can stop when RM is STARTED
491 if self.state != ResourceState.STARTED:
494 self.debug(" ---- STOP CONDITIONS ---- %s" %
495 self.conditions.get(ResourceAction.STOP))
497 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
498 for (group, state, time) in stop_conditions:
499 reschedule, delay = self._needs_reschedule(group, state, time)
504 callback = functools.partial(self.stop_with_conditions)
505 self.ec.schedule(delay, callback)
507 self.logger.debug(" ----- STOPPING ---- ")
511 """ Execute all steps required for the RM to reach the state READY
514 if self._state > ResourceState.READY:
515 self.error("Wrong state %s for deploy" % self.state)
518 self.debug("----- DEPLOYING ---- ")
519 self._ready_time = strfnow()
520 self._state = ResourceState.READY
523 """Clean the resource at the end of the Experiment and change the status
526 self._release_time = strfnow()
527 self._state = ResourceState.RELEASED
529 def valid_connection(self, guid):
530 """Check if the connection is available.
532 :param guid: Guid of the current Resource Manager
class ResourceFactory(object):
    """ Class-level registry of resource manager classes

    Resource classes are stored under the key returned by their rtype()
    method and can later be instantiated through create(). All methods
    operate on the class itself, so no ResourceFactory instance is needed.
    """

    # registry shared by the whole process: rclass.rtype() -> rclass
    _resource_types = dict()

    @classmethod
    def resource_types(cls):
        """ Returns the registry dictionary (the dictionary itself, not a copy) """
        return cls._resource_types

    @classmethod
    def register_type(cls, rclass):
        """ Register a resource class under the key rclass.rtype()

        An entry already stored under the same key is replaced.

        :param rclass: Resource class exposing a callable rtype()
        """
        cls._resource_types[rclass.rtype()] = rclass

    @classmethod
    def create(cls, rtype, ec, guid):
        """ Instantiate the resource class registered under 'rtype'

        :param rtype: Key under which the resource class was registered
        :param ec: Passed as first argument to the resource class constructor
        :param guid: Passed as second argument to the resource class constructor
        :returns: The result of calling the registered class with (ec, guid)
        :raises KeyError: If no class is registered under 'rtype'
        """
        rclass = cls._resource_types[rtype]
        return rclass(ec, guid)