1 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
8 _reschedule_delay = "1s"
29 # Decorator to invoke class initialization method
31 class ResourceManager(object):
37 def _register_filter(cls, attr):
38 """ Resource subclasses will invoke this method to add a
42 cls._filters[attr.name] = attr
45 def _register_attribute(cls, attr):
46 """ Resource subclasses will invoke this method to add a
50 cls._attributes[attr.name] = attr
53 def _register_filters(cls):
54 """ Resource subclasses will invoke this method to add a
61 def _register_attributes(cls):
62 """ Resource subclasses will invoke this method to add a
70 """ Create a new dictionary instance of the dictionary
71 with the same template.
73 Each resource should have the same registration dictionary
74 template with different instances.
76 # static template for resource filters
78 cls._register_filters()
80 # static template for resource attributes
81 cls._attributes = dict()
82 cls._register_attributes()
90 """ Returns a copy of the filters
93 return copy.deepcopy(cls._filters.values())
96 def get_attributes(cls):
97 """ Returns a copy of the attributes
100 return copy.deepcopy(cls._attributes.values())
102 def __init__(self, ec, guid):
104 self._ec = weakref.ref(ec)
105 self._connections = set()
106 self._conditions = dict()
108 # the resource instance gets a copy of all attributes
110 self._attrs = copy.deepcopy(self._attributes)
112 self._state = ResourceState.NEW
114 self._start_time = None
115 self._stop_time = None
116 self._discover_time = None
117 self._provision_time = None
118 self._ready_time = None
119 self._release_time = None
122 self._logger = logging.getLogger("neco.execution.resource.Resource %s.%d " % (self._rtype, self.guid))
def connections(self):
    """ Returns the collection of guids registered through connect().

    The internal object is handed back as is (no copy is made), so
    changes made by the caller are visible to the resource.
    """
    return self._connections
def conditions(self):
    """ Returns the dictionary of registered conditions.

    Keys are actions; each value is the list of (group, state, time)
    tuples added by register_condition(). The internal dictionary is
    returned directly, not a copy.
    """
    return self._conditions
def start_time(self):
    """ Returns the time recorded when the RM was last started,
    or None if it has not been started yet """
    return self._start_time
151 """ Returns timestamp with the time the RM stopped """
152 return self._stop_time
def discover_time(self):
    """ Returns the time recorded when the RM reached state discovered,
    or None if that has not happened yet """
    return self._discover_time
def provision_time(self):
    """ Returns the time recorded when the RM reached state provisioned,
    or None if that has not happened yet """
    return self._provision_time
def ready_time(self):
    """ Returns the time recorded when the RM reached state ready,
    or None if that has not happened yet """
    return self._ready_time
def release_time(self):
    """ Returns the time recorded when the RM was released,
    or None if it has not been released yet """
    return self._release_time
def connect(self, guid):
    """ Registers a connection to the RM identified by 'guid'

    :param guid: Guid of the RM to connect to

    The guid is stored only when valid_connection() accepts it;
    otherwise the call does nothing.
    """
    if not self.valid_connection(guid):
        return

    self._connections.add(guid)
def discover(self, filters=None):
    """ Marks the RM as discovered

    :param filters: Accepted for interface compatibility; this
        implementation does not read it

    Stores the current time as the discover time and moves the RM
    to state DISCOVERED.
    """
    now = strfnow()
    self._discover_time = now
    self._state = ResourceState.DISCOVERED
def provision(self, filters=None):
    """ Marks the RM as provisioned

    :param filters: Accepted for interface compatibility; this
        implementation does not read it

    Stores the current time as the provision time and moves the RM
    to state PROVISIONED.
    """
    now = strfnow()
    self._provision_time = now
    self._state = ResourceState.PROVISIONED
191 """ Start the Resource Manager
194 if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
195 self.logger.error("Wrong state %s for start" % self.state)
198 self._start_time = strfnow()
199 self._state = ResourceState.STARTED
202 """ Stop the Resource Manager
205 if not self._state in [ResourceState.STARTED]:
206 self.logger.error("Wrong state %s for stop" % self.state)
209 self._stop_time = strfnow()
210 self._state = ResourceState.STOPPED
212 def set(self, name, value):
213 """ Set the value of the attribute
215 :param name: Name of the attribute
217 :param value: Value of the attribute
221 attr = self._attrs[name]
225 """ Returns the value of the attribute
227 :param name: Name of the attribute
231 attr = self._attrs[name]
234 def register_condition(self, action, group, state,
236 """ Registers a condition on the resource manager to allow execution
237 of 'action' only after 'time' has elapsed from the moment all resources
238 in 'group' reached state 'state'
240 :param action: Action to restrict to condition (either 'START' or 'STOP')
242 :param group: Group of RMs to wait for (list of guids)
243 :type group: int or list of int
244 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
246 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
250 conditions = self.conditions.get(action)
253 self._conditions[action] = conditions
255 # For each condition to register a tuple of (group, state, time) is
256 # added to the 'action' list
257 if not isinstance(group, list):
260 conditions.append((group, state, time))
262 def _needs_reschedule(self, group, state, time):
263 """ Internal method that verifies if 'time' has elapsed since
264 all elements in 'group' have reached state 'state'.
266 :param group: Group of RMs to wait for (list of guids)
267 :type group: int or list of int
268 :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
270 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
273 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
274 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
275 For the moment, 2m30s is not a correct syntax.
279 delay = _reschedule_delay
281 # check state and time elapsed on all RMs
283 rm = self.ec.get_resource(guid)
284 # If the RM state is lower than the requested state we must
285 # reschedule (e.g. if RM is READY but we required STARTED)
290 # If there is a time restriction, we must verify the
291 # restriction is satisfied
293 if state == ResourceState.DISCOVERED:
295 if state == ResourceState.PROVISIONED:
296 t = rm.provision_time
297 elif state == ResourceState.READY:
299 elif state == ResourceState.STARTED:
301 elif state == ResourceState.STOPPED:
304 # Only keep time information for START and STOP
307 d = strfdiff(strfnow(), t)
308 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
313 return reschedule, delay
315 def set_with_conditions(self, name, value, group, state, time):
316 """ Set value 'value' on attribute with name 'name' when 'time'
317 has elapsed since all elements in 'group' have reached state
320 :param name: Name of the attribute to set
322 :param value: Value of the attribute to set
324 :param group: Group of RMs to wait for (list of guids)
325 :type group: int or list of int
326 :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
328 :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
334 delay = _reschedule_delay
336 ## evaluate if set conditions are met
338 # only can set with conditions after the RM is started
339 if self.state != ResourceState.STARTED:
342 reschedule, delay = self._needs_reschedule(group, state, time)
345 callback = functools.partial(self.set_with_conditions,
346 name, value, group, state, time)
347 self.ec.schedule(delay, callback)
349 self.set(name, value)
351 def start_with_conditions(self):
352 """ Starts RM when all the conditions in self.conditions for
353 action 'START' are satisfied.
357 delay = _reschedule_delay
359 ## evaluate if start conditions are met
361 # only can start when RM is either STOPPED or READY
362 if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
365 self.logger.debug("---- START CONDITIONS ---- %s" %
366 self.conditions.get(ResourceAction.START))
368 # Verify all start conditions are met
369 start_conditions = self.conditions.get(ResourceAction.START, [])
370 for (group, state, time) in start_conditions:
371 reschedule, delay = self._needs_reschedule(group, state, time)
376 self.ec.schedule(delay, self.start_with_conditions)
378 self.logger.debug("----- STARTING ---- ")
381 def stop_with_conditions(self):
382 """ Stops RM when all the conditions in self.conditions for
383 action 'STOP' are satisfied.
387 delay = _reschedule_delay
389 ## evaluate if stop conditions are met
391 # only can stop when RM is STARTED
392 if self.state != ResourceState.STARTED:
395 self.logger.debug(" ---- STOP CONDITIONS ---- %s" %
396 self.conditions.get(ResourceAction.STOP))
398 stop_conditions = self.conditions.get(ResourceAction.STOP, [])
399 for (group, state, time) in stop_conditions:
400 reschedule, delay = self._needs_reschedule(group, state, time)
405 callback = functools.partial(self.stop_with_conditions)
406 self.ec.schedule(delay, callback)
408 self.logger.debug(" ----- STOPPING ---- ")
412 """ Execute all steps required for the RM to reach the state READY
415 if self._state > ResourceState.READY:
416 self.logger.error("Wrong state %s for deploy" % self.state)
419 self._ready_time = strfnow()
420 self._state = ResourceState.READY
423 """Clean the resource at the end of the Experiment and change the status
426 self._release_time = strfnow()
427 self._state = ResourceState.RELEASED
429 def valid_connection(self, guid):
430 """Check if the connection is available.
432 :param guid: Guid of the current Resource Manager
440 class ResourceFactory(object):
441 _resource_types = dict()
def resource_types(cls):
    """ Returns the registry dictionary of resource classes.

    The dictionary is keyed by the value of each class's rtype()
    and is returned directly, not as a copy.
    """
    return cls._resource_types
def register_type(cls, rclass):
    """ Adds a resource class to the registry.

    :param rclass: Resource class to register; it must provide rtype()

    The class is stored under the key returned by rclass.rtype();
    an entry already stored under that key is replaced.
    """
    type_key = rclass.rtype()
    cls._resource_types[type_key] = rclass
def create(cls, rtype, ec, guid):
    """ Instantiates the resource class registered for 'rtype'.

    :param rtype: Key under which the class was registered
    :param ec: First argument passed to the class constructor
    :param guid: Second argument passed to the class constructor

    Raises KeyError when no class is registered under 'rtype'.
    """
    return cls._resource_types[rtype](ec, guid)