2 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
10 _reschedule_delay = "1s"
29 # Decorator to invoke class initialization method
31 class ResourceManager(object):
38 def _register_filter(cls, attr):
39 """ Resource subclasses will invoke this method to add a
43 cls._filters[attr.name] = attr
46 def _register_attribute(cls, attr):
47 """ Resource subclasses will invoke this method to add a
51 cls._attributes[attr.name] = attr
54 def _register_filters(cls):
55 """ Resource subclasses will invoke this method to add a
62 def _register_attributes(cls):
63 """ Resource subclasses will invoke this method to add a
71 """ Create a new dictionary instance of the dictionary
72 with the same template.
74 Each resource should have the same registration dictionary
75 template with different instances.
77 # static template for resource filters
79 cls._register_filters()
81 # static template for resource attributes
82 cls._attributes = dict()
83 cls._register_attributes()
95 """ Returns a copy of the filters
98 return copy.deepcopy(cls._filters.values())
101 def get_attributes(cls):
102 """ Returns a copy of the attributes
105 return copy.deepcopy(cls._attributes.values())
107 def __init__(self, ec, guid):
109 self._ec = weakref.ref(ec)
110 self._connections = set()
111 self._conditions = dict()
113 # the resource instance gets a copy of all attributes
115 self._attrs = copy.deepcopy(self._attributes)
117 self._state = ResourceState.NEW
119 self._start_time = None
120 self._stop_time = None
123 self._logger = logging.getLogger("neco.execution.resource.Resource.%s" %
139 def connections(self):
140 return self._connections
143 def conditions(self):
144 return self._conditions
147 def start_time(self):
148 """ timestamp with """
149 return self._start_time
153 return self._stop_time
156 def deploy_time(self):
157 return self._deploy_time
163 def connect(self, guid):
164 if (self._validate_connection(guid)):
165 self._connections.add(guid)
167 def discover(self, filters = None):
170 def provision(self, filters = None):
174 """ Start the Resource Manager
177 if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]:
178 self.logger.error("Wrong state %s for start" % self.state)
180 self._start_time = strfnow()
181 self._state = ResourceState.STARTED
184 """ Stop the Resource Manager
187 if not self._state in [ResourceState.STARTED]:
188 self.logger.error("Wrong state %s for stop" % self.state)
190 self._stop_time = strfnow()
191 self._state = ResourceState.STOPPED
193 def set(self, name, value):
194 """ Set the value of the attribute
196 :param name: Name of the attribute
198 :param value: Value of the attribute
202 attr = self._attrs[name]
206 """ Get the value of the attribute
208 :param name: Name of the attribute
212 attr = self._attrs[name]
215 def register_condition(self, action, group, state,
217 """ Do the 'action' after 'time' on the current RM when 'group'
218 reaches the state 'state'
220 :param action: Action to do. Either 'START' or 'STOP'
222 :param group: group of RM
224 :param state: State that the group needs to reach for the condition
226 :param time: Time to wait after the state is reached (ex : '2s' )
230 if action not in self.conditions:
231 self._conditions[action] = set()
233 # Set elements must be hashable, so a list cannot be stored in the set.
234 # As group is a list, we convert it to a tuple.
235 #print (tuple(group), state, time)
236 self.conditions.get(action).add((tuple(group), state, time))
238 def _needs_reschedule(self, group, state, time):
239 """ Internal method that verifies whether 'time' has elapsed since
240 all elements in 'group' have reached state 'state'.
242 :param group: RM that are part of the condition
244 :param state: State that the group needs to reach for the condition
246 :param time: time to wait after the state
249 .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
250 If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
251 For the moment, 2m30s is not a correct syntax.
255 delay = _reschedule_delay
257 # check state and time elapsed on all RMs
259 rm = self.ec.get_resource(guid)
260 # If the RM's state is lower than the requested state we must
261 # reschedule (e.g. if RM is DEPLOYED but we required STARTED)
267 if state == ResourceState.DEPLOYED:
269 elif state == ResourceState.STARTED:
271 elif state == ResourceState.STOPPED:
274 # Only keep time information for START and STOP
277 d = strfdiff(strfnow(), t)
278 #print "This is the value of d : " + str(d) + " // With the value of t : " + str(t) + " // With the value of time : " + str(time)
279 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
284 return reschedule, delay
286 def set_with_conditions(self, name, value, group, state, time):
287 """ Set value 'value' on attribute with name 'name' when 'time'
288 has elapsed since all elements in 'group' have reached state
291 :param name: Name of the attribute
293 :param value: Value of the attribute
295 :param group: RM that are part of the condition
297 :param state: State that group need to reach before set
299 :param time: Time to wait after the state is reached (ex : '2s' )
305 delay = _reschedule_delay
307 ## evaluate if set conditions are met
309 # only can set with conditions after the RM is started
310 if self.state != ResourceState.STARTED:
313 reschedule, delay = self._needs_reschedule(group, state, time)
316 callback = functools.partial(self.set_with_conditions,
317 name, value, group, state, time)
318 self.ec.schedule(delay, callback)
320 self.set(name, value)
322 def start_with_conditions(self):
323 """ Starts when all the conditions are reached
327 delay = _reschedule_delay
329 ## evaluate if set conditions are met
331 # only can start when RM is either STOPPED or DEPLOYED
332 if self.state not in [ResourceState.STOPPED, ResourceState.DEPLOYED]:
335 print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----- start condition : " + str(self.conditions.items())
336 # Need to separate because there could be more than one tuple of conditions
337 # for the same action.
338 conditions_start = self.conditions.get(ResourceAction.START, [])
339 for (group, state, time) in conditions_start:
340 reschedule, delay = self._needs_reschedule(group, state, time)
345 callback = functools.partial(self.start_with_conditions)
346 self.ec.schedule(delay, callback)
348 print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
349 ------------------------------------------------------------------------------\
350 ---------------------------------------------------------------- STARTING -- "
353 def stop_with_conditions(self):
354 """ Stop when all the conditions are reached
358 delay = _reschedule_delay
360 ## evaluate if set conditions are met
362 # only can stop when RM is STARTED
363 if self.state != ResourceState.STARTED:
366 print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + "\
367 (Guid : "+ str(self.guid) +") ---- stop condition : " + str(self.conditions.items())
368 conditions_stop = self.conditions.get(ResourceAction.STOP, [])
369 for (group, state, time) in conditions_stop:
370 reschedule, delay = self._needs_reschedule(group, state, time)
374 callback = functools.partial(self.stop_with_conditions)
375 self.ec.schedule(delay, callback)
377 print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
378 ------------------------------------------------------------------------------\
379 ---------------------------------------------------------------- STOPPING -- "
383 """Execute all the different steps required to reach the state DEPLOYED
386 self.deploy_restriction()
389 self.deploy_with_conditions()
391 def deploy_restriction(self):
393 for guid in self.connections:
394 if self.ec.get_resource(guid).rtype() in self.__class__._waiters:
396 self.register_condition(ResourceAction.DEPLOYED, dep, ResourceState.DEPLOYED)
399 def deploy_with_conditions(self):
400 """ Deploys when all the conditions are reached
404 delay = _reschedule_delay
406 ## evaluate if set conditions are met
408 # only can deploy when RM is NEW
409 if not self._state in [ResourceState.NEW]:
410 self.logger.error("Wrong state %s for stop" % self.state)
413 print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----- deploy condition : " + str(self.conditions.items())
414 # Need to separate because there could be more than one tuple of conditions
415 # for the same action.
416 conditions_deployed = self.conditions.get(ResourceAction.DEPLOYED, [])
417 for (group, state, time) in conditions_deployed:
418 reschedule, delay = self._needs_reschedule(group, state, time)
423 callback = functools.partial(self.deploy_with_conditions)
424 self.ec.schedule(delay, callback)
426 print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
427 ------------------------------------------------------------------------------\
428 ---------------------------------------------------------------- DEPLOY -- "
431 def deploy_action(self):
433 self._deploy_time = strfnow()
434 self._state = ResourceState.DEPLOYED
438 """Clean the resource at the end of the Experiment and change the status
441 self._state = ResourceState.RELEASED
443 def _validate_connection(self, guid):
444 """Check if the connection is available.
446 :param guid: Guid of the current Resource Manager
454 class ResourceFactory(object):
455 _resource_types = dict()
458 def resource_types(cls):
459 return cls._resource_types
462 def register_type(cls, rclass):
463 cls._resource_types[rclass.rtype()] = rclass
466 def create(cls, rtype, ec, guid):
467 rclass = cls._resource_types[rtype]
468 return rclass(ec, guid)