f7010b0b89f34cdf026742bc68e50e7dbbfd8511
[nepi.git] / src / neco / execution / resource.py
1 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
2
3 import copy
4 import functools
5 import logging
6 import weakref
7
8 _reschedule_delay = "1s"
9
10 class ResourceAction:
11     DEPLOY = 0
12     START = 1
13     STOP = 2
14
15 class ResourceState:
16     NEW = 0
17     DISCOVERED = 1
18     PROVISIONED = 2
19     READY = 3
20     STARTED = 4
21     STOPPED = 5
22     FAILED = 6
23     RELEASED = 7
24
25 def clsinit(cls):
26     cls._clsinit()
27     return cls
28
29 # Decorator to invoke class initialization method
30 @clsinit
31 class ResourceManager(object):
32     _rtype = "Resource"
33     _filters = None
34     _attributes = None
35
36     @classmethod
37     def _register_filter(cls, attr):
38         """ Resource subclasses will invoke this method to add a 
39         filter attribute
40
41         """
42         cls._filters[attr.name] = attr
43
44     @classmethod
45     def _register_attribute(cls, attr):
46         """ Resource subclasses will invoke this method to add a 
47         resource attribute
48
49         """
50         cls._attributes[attr.name] = attr
51
52     @classmethod
53     def _register_filters(cls):
54         """ Resource subclasses will invoke this method to add a 
55         filter attribute
56
57         """
58         pass
59
60     @classmethod
61     def _register_attributes(cls):
62         """ Resource subclasses will invoke this method to add a 
63         resource attribute
64
65         """
66         pass
67
68     @classmethod
69     def _clsinit(cls):
70         """ Create a new dictionnary instance of the dictionnary 
71         with the same template.
72  
73         Each ressource should have the same registration dictionary
74         template with different instances.
75         """
76         # static template for resource filters
77         cls._filters = dict()
78         cls._register_filters()
79
80         # static template for resource attributes
81         cls._attributes = dict()
82         cls._register_attributes()
83
84     @classmethod
85     def rtype(cls):
86         return cls._rtype
87
88     @classmethod
89     def get_filters(cls):
90         """ Returns a copy of the filters
91
92         """
93         return copy.deepcopy(cls._filters.values())
94
95     @classmethod
96     def get_attributes(cls):
97         """ Returns a copy of the attributes
98
99         """
100         return copy.deepcopy(cls._attributes.values())
101
102     def __init__(self, ec, guid):
103         self._guid = guid
104         self._ec = weakref.ref(ec)
105         self._connections = set()
106         self._conditions = dict() 
107
108         # the resource instance gets a copy of all attributes
109         # that can modify
110         self._attrs = copy.deepcopy(self._attributes)
111
112         self._state = ResourceState.NEW
113
114         self._start_time = None
115         self._stop_time = None
116         self._discover_time = None
117         self._provision_time = None
118         self._ready_time = None
119         self._release_time = None
120
121         # Logging
122         self._logger = logging.getLogger("neco.execution.resource.Resource %s.%d " %  (self._rtype, self.guid))
123
124     @property
125     def logger(self):
126         return self._logger
127
128     @property
129     def guid(self):
130         return self._guid
131
132     @property
133     def ec(self):
134         return self._ec()
135
136     @property
137     def connections(self):
138         return self._connections
139
140     @property
141     def conditions(self):
142         return self._conditions
143
144     @property
145     def start_time(self):
146         """ Returns timestamp with the time the RM started """
147         return self._start_time
148
149     @property
150     def stop_time(self):
151         """ Returns timestamp with the time the RM stopped """
152         return self._stop_time
153
154     @property
155     def discover_time(self):
156         """ Returns timestamp with the time the RM passed to state discovered """
157         return self._discover_time
158
159     @property
160     def provision_time(self):
161         """ Returns timestamp with the time the RM passed to state provisioned """
162         return self._provision_time
163
164     @property
165     def ready_time(self):
166         """ Returns timestamp with the time the RM passed to state ready  """
167         return self._ready_time
168
169     @property
170     def release_time(self):
171         """ Returns timestamp with the time the RM was released """
172         return self._release_time
173
174     @property
175     def state(self):
176         return self._state
177
178     def connect(self, guid):
179         if self.valid_connection(guid):
180             self._connections.add(guid)
181
182     def discover(self, filters = None):
183         self._discover_time = strfnow()
184         self._state = ResourceState.DISCOVERED
185
186     def provision(self, filters = None):
187         self._provision_time = strfnow()
188         self._state = ResourceState.PROVISIONED
189
190     def start(self):
191         """ Start the Resource Manager
192
193         """
194         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
195             self.logger.error("Wrong state %s for start" % self.state)
196             return
197
198         self._start_time = strfnow()
199         self._state = ResourceState.STARTED
200
201     def stop(self):
202         """ Start the Resource Manager
203
204         """
205         if not self._state in [ResourceState.STARTED]:
206             self.logger.error("Wrong state %s for stop" % self.state)
207             return
208
209         self._stop_time = strfnow()
210         self._state = ResourceState.STOPPED
211
212     def set(self, name, value):
213         """ Set the value of the attribute
214
215         :param name: Name of the attribute
216         :type name: str
217         :param name: Value of the attribute
218         :type name: str
219         :rtype:  Boolean
220         """
221         attr = self._attrs[name]
222         attr.value = value
223
224     def get(self, name):
225         """ Start the Resource Manager
226
227         :param name: Name of the attribute
228         :type name: str
229         :rtype: str
230         """
231         attr = self._attrs[name]
232         return attr.value
233
234     def register_condition(self, action, group, state, 
235             time = None):
236         """ Registers a condition on the resource manager to allow execution 
237         of 'action' only after 'time' has elapsed from the moment all resources 
238         in 'group' reached state 'state'
239
240         :param action: Action to restrict to condition (either 'START' or 'STOP')
241         :type action: str
242         :param group: Group of RMs to wait for (list of guids)
243         :type group: int or list of int
244         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
245         :type state: str
246         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
247         :type time: str
248
249         """
250         conditions = self.conditions.get(action)
251         if not conditions:
252             conditions = list()
253             self._conditions[action] = conditions
254
255         # For each condition to register a tuple of (group, state, time) is 
256         # added to the 'action' list
257         if not isinstance(group, list):
258             group = [group]
259
260         conditions.append((group, state, time))
261
262     def _needs_reschedule(self, group, state, time):
263         """ Internal method that verify if 'time' has elapsed since 
264         all elements in 'group' have reached state 'state'.
265
266         :param group: Group of RMs to wait for (list of guids)
267         :type group: int or list of int
268         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
269         :type state: str
270         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
271         :type time: str
272
273         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
274         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
275         For the moment, 2m30s is not a correct syntax.
276
277         """
278         reschedule = False
279         delay = _reschedule_delay 
280
281         # check state and time elapsed on all RMs
282         for guid in group:
283             rm = self.ec.get_resource(guid)
284             # If the RM state is lower than the requested state we must
285             # reschedule (e.g. if RM is READY but we required STARTED)
286             if rm.state < state:
287                 reschedule = True
288                 break
289
290             # If there is a time restriction, we must verify the
291             # restriction is satisfied 
292             if time:
293                 if state == ResourceState.DISCOVERED:
294                     t = rm.discover_time
295                 if state == ResourceState.PROVISIONED:
296                     t = rm.provision_time
297                 elif state == ResourceState.READY:
298                     t = rm.ready_time
299                 elif state == ResourceState.STARTED:
300                     t = rm.start_time
301                 elif state == ResourceState.STOPPED:
302                     t = rm.stop_time
303                 else:
304                     # Only keep time information for START and STOP
305                     break
306
307                 d = strfdiff(strfnow(), t)
308                 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
309                 if wait > 0.001:
310                     reschedule = True
311                     delay = "%fs" % wait
312                     break
313         return reschedule, delay
314
315     def set_with_conditions(self, name, value, group, state, time):
316         """ Set value 'value' on attribute with name 'name' when 'time' 
317             has elapsed since all elements in 'group' have reached state
318            'state'
319
320         :param name: Name of the attribute to set
321         :type name: str
322         :param name: Value of the attribute to set
323         :type name: str
324         :param group: Group of RMs to wait for (list of guids)
325         :type group: int or list of int
326         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
327         :type state: str
328         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
329         :type time: str
330
331         """
332
333         reschedule = False
334         delay = _reschedule_delay 
335
336         ## evaluate if set conditions are met
337
338         # only can set with conditions after the RM is started
339         if self.state != ResourceState.STARTED:
340             reschedule = True
341         else:
342             reschedule, delay = self._needs_reschedule(group, state, time)
343
344         if reschedule:
345             callback = functools.partial(self.set_with_conditions, 
346                     name, value, group, state, time)
347             self.ec.schedule(delay, callback)
348         else:
349             self.set(name, value)
350
351     def start_with_conditions(self):
352         """ Starts RM when all the conditions in self.conditions for
353         action 'START' are satisfied.
354
355         """
356         reschedule = False
357         delay = _reschedule_delay 
358
359         ## evaluate if set conditions are met
360
361         # only can start when RM is either STOPPED or READY
362         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
363             reschedule = True
364         else:
365             self.logger.debug("---- START CONDITIONS ---- %s" % 
366                     self.conditions.get(ResourceAction.START))
367             
368             # Verify all start conditions are met
369             start_conditions = self.conditions.get(ResourceAction.START, [])
370             for (group, state, time) in start_conditions:
371                 reschedule, delay = self._needs_reschedule(group, state, time)
372                 if reschedule:
373                     break
374
375         if reschedule:
376             self.ec.schedule(delay, self.start_with_conditions)
377         else:
378             self.logger.debug("----- STARTING ---- ")
379             self.start()
380
381     def stop_with_conditions(self):
382         """ Stops RM when all the conditions in self.conditions for
383         action 'STOP' are satisfied.
384
385         """
386         reschedule = False
387         delay = _reschedule_delay 
388
389         ## evaluate if set conditions are met
390
391         # only can stop when RM is STARTED
392         if self.state != ResourceState.STARTED:
393             reschedule = True
394         else:
395             self.logger.debug(" ---- STOP CONDITIONS ---- %s" % 
396                     self.conditions.get(ResourceAction.STOP))
397
398             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
399             for (group, state, time) in stop_conditions:
400                 reschedule, delay = self._needs_reschedule(group, state, time)
401                 if reschedule:
402                     break
403
404         if reschedule:
405             callback = functools.partial(self.stop_with_conditions)
406             self.ec.schedule(delay, callback)
407         else:
408             self.logger.debug(" ----- STOPPING ---- ") 
409             self.stop()
410
411     def deploy(self):
412         """ Execute all steps required for the RM to reach the state READY
413
414         """
415         if self._state > ResourceState.READY:
416             self.logger.error("Wrong state %s for deploy" % self.state)
417             return
418
419         self._ready_time = strfnow()
420         self._state = ResourceState.READY
421
422     def release(self):
423         """Clean the resource at the end of the Experiment and change the status
424
425         """
426         self._release_time = strfnow()
427         self._state = ResourceState.RELEASED
428
429     def valid_connection(self, guid):
430         """Check if the connection is available.
431
432         :param guid: Guid of the current Resource Manager
433         :type guid: int
434         :rtype:  Boolean
435
436         """
437         # TODO: Validate!
438         return True
439
440 class ResourceFactory(object):
441     _resource_types = dict()
442
443     @classmethod
444     def resource_types(cls):
445         return cls._resource_types
446
447     @classmethod
448     def register_type(cls, rclass):
449         cls._resource_types[rclass.rtype()] = rclass
450
451     @classmethod
452     def create(cls, rtype, ec, guid):
453         rclass = cls._resource_types[rtype]
454         return rclass(ec, guid)
455