2 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
10 _reschedule_delay = "1s"
28 # Decorator to invoke class initialization method
30 class ResourceManager(object):
36 def _register_filter(cls, attr):
37 """ Resource subclasses will invoke this method to add a
41 cls._filters[attr.name] = attr
44 def _register_attribute(cls, attr):
45 """ Resource subclasses will invoke this method to add a
49 cls._attributes[attr.name] = attr
52 def _register_filters(cls):
53 """ Resource subclasses will invoke this method to add a
60 def _register_attributes(cls):
61 """ Resource subclasses will invoke this method to add a
69 """ Create a new dictionnary instance of the dictionnary
70 with the same template.
72 Each ressource should have the same registration dictionary
73 template with different instances.
75 # static template for resource filters
77 cls._register_filters()
79 # static template for resource attributes
80 cls._attributes = dict()
81 cls._register_attributes()
89 """ Returns a copy of the filters
92 return copy.deepcopy(cls._filters.values())
95 def get_attributes(cls):
96 """ Returns a copy of the attributes
99 return copy.deepcopy(cls._attributes.values())
101 def __init__(self, ec, guid):
103 self._ec = weakref.ref(ec)
104 self._connections = set()
105 self._conditions = dict()
107 # the resource instance gets a copy of all attributes
109 self._attrs = copy.deepcopy(self._attributes)
111 self._state = ResourceState.NEW
113 self._start_time = None
114 self._stop_time = None
117 self._logger = logging.getLogger("neco.execution.resource.Resource.%s" %
133 def connections(self):
134 return self._connections
137 def conditions(self):
138 return self._conditions
141 def start_time(self):
142 """ timestamp with """
143 return self._start_time
147 return self._stop_time
153 def connect(self, guid):
154 if (self._validate_connection(guid)):
155 self._connections.add(guid)
157 def discover(self, filters = None):
160 def provision(self, filters = None):
164 """ Start the Resource Manager
167 if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]:
168 self.logger.error("Wrong state %s for start" % self.state)
170 self._start_time = strfnow()
171 self._state = ResourceState.STARTED
174 """ Start the Resource Manager
177 if not self._state in [ResourceState.STARTED]:
178 self.logger.error("Wrong state %s for stop" % self.state)
180 self._stop_time = strfnow()
181 self._state = ResourceState.STOPPED
183 def set(self, name, value):
184 """ Set the value of the attribute
186 :param name: Name of the attribute
188 :param name: Value of the attribute
192 attr = self._attrs[name]
196 """ Start the Resource Manager
198 :param name: Name of the attribute
202 attr = self._attrs[name]
205 def register_condition(self, action, group, state,
207 """ Do the 'action' after 'time' on the current RM when 'group'
208 reach the state 'state'
210 :param action: Action to do. Either 'START' or 'STOP'
212 :param group: group of RM
214 :param state: RM that are part of the condition
216 :param time: Time to wait after the state is reached (ex : '2s' )
220 if action not in self.conditions:
221 self._conditions[action] = set()
223 # We need to use only sequence inside a set and not a list.
224 # As group is a list, we need to change it.
225 #print (tuple(group), state, time)
226 self.conditions.get(action).add((tuple(group), state, time))
228 def _needs_reschedule(self, group, state, time):
229 """ Internal method that verify if 'time' has elapsed since
230 all elements in 'group' have reached state 'state'.
232 :param group: RM that are part of the condition
234 :param state: State that group need to reach for the condtion
236 :param time: time to wait after the state
242 delay = _reschedule_delay
244 # check state and time elapsed on all RMs
246 rm = self.ec.get_resource(guid)
247 # If the RMs is lower than the requested state we must
248 # reschedule (e.g. if RM is DEPLOYED but we required STARTED)
254 if state == ResourceState.STARTED:
256 elif state == ResourceState.STOPPED:
259 # Only keep time information for START and STOP
262 d = strfdiff(strfnow(), t)
265 delay = "%ds" % (int(time - d) +1)
267 return reschedule, delay
269 def set_with_conditions(self, name, value, group, state, time):
270 """ Set value 'value' on attribute with name 'name' when 'time'
271 has elapsed since all elements in 'group' have reached state
274 :param name: Name of the attribute
276 :param name: Value of the attribute
278 :param group: RM that are part of the condition
280 :param state: State that group need to reach before set
282 :param time: Time to wait after the state is reached (ex : '2s' )
288 delay = _reschedule_delay
290 ## evaluate if set conditions are met
292 # only can set with conditions after the RM is started
293 if self.state != ResourceState.STARTED:
296 reschedule, delay = self._needs_reschedule(group, state, time)
299 callback = functools.partial(self.set_with_conditions,
300 name, value, group, state, time)
301 self.ec.schedule(delay, callback)
303 self.set(name, value)
305 def start_with_conditions(self):
306 """ Starts when all the conditions are reached
310 delay = _reschedule_delay
312 ## evaluate if set conditions are met
314 # only can start when RM is either STOPPED or DEPLOYED
315 if self.state not in [ResourceState.STOPPED, ResourceState.DEPLOYED]:
318 print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----- start condition : " + str(self.conditions.items())
319 # Need to separate because it could have more that tuple of condition
320 # for the same action.
321 conditions_start = self.conditions.get(ResourceAction.START, []):
322 for (group, state, time) in conditions_start:
323 reschedule, delay = self._needs_reschedule(group, state, time)
328 callback = functools.partial(self.start_with_conditions)
329 self.ec.schedule(delay, callback)
331 print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
332 ------------------------------------------------------------------------------\
333 ---------------------------------------------------------------- STARTING -- "
336 def stop_with_conditions(self):
337 """ Stop when all the conditions are reached
341 delay = _reschedule_delay
343 ## evaluate if set conditions are met
345 # only can stop when RM is STARTED
346 if self.state != ResourceState.STARTED:
349 #print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + "\
350 (Guid : "+ str(self.guid) +") ---- stop condition : " + str(self.conditions.items())
351 conditions_stop = self.conditions.get(ResourceAction.STOP, [])
352 for (group, state, time) in conditions_stop:
353 reschedule, delay = self._needs_reschedule(group, state, time)
357 callback = functools.partial(self.stop_with_conditions)
358 self.ec.schedule(delay, callback)
363 """Execute all the differents steps required to reach the state DEPLOYED
368 self._state = ResourceState.DEPLOYED
371 """Clean the resource at the end of the Experiment and change the status
374 self._state = ResourceState.RELEASED
376 def _validate_connection(self, guid):
377 """Check if the connection is available.
379 :param guid: Guid of the current Resource Manager
387 class ResourceFactory(object):
388 _resource_types = dict()
391 def resource_types(cls):
392 return cls._resource_types
395 def register_type(cls, rclass):
396 cls._resource_types[rclass.rtype()] = rclass
399 def create(cls, rtype, ec, guid):
400 rclass = cls._resource_types[rtype]
401 return rclass(ec, guid)