little changes
[nepi.git] / src / neco / execution / resource.py
1
2 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
3
4 import copy
5 import functools
6 import logging
7 import weakref
8 import time as TIME
9
10 _reschedule_delay = "1s"
11
12 class ResourceAction:
13     DEPLOYED = 0
14     START = 1
15     STOP = 2
16
17 class ResourceState:
18     NEW = 0
19     DEPLOYED = 1
20     STARTED = 2
21     STOPPED = 3
22     FAILED = 4
23     RELEASED = 5
24
25 def clsinit(cls):
26     cls._clsinit()
27     return cls
28
29 # Decorator to invoke class initialization method
30 @clsinit
31 class ResourceManager(object):
32     _rtype = "Resource"
33     _filters = None
34     _attributes = None
35     _waiters = []
36
37     @classmethod
38     def _register_filter(cls, attr):
39         """ Resource subclasses will invoke this method to add a 
40         filter attribute
41
42         """
43         cls._filters[attr.name] = attr
44
45     @classmethod
46     def _register_attribute(cls, attr):
47         """ Resource subclasses will invoke this method to add a 
48         resource attribute
49
50         """
51         cls._attributes[attr.name] = attr
52
53     @classmethod
54     def _register_filters(cls):
55         """ Resource subclasses will invoke this method to add a 
56         filter attribute
57
58         """
59         pass
60
61     @classmethod
62     def _register_attributes(cls):
63         """ Resource subclasses will invoke this method to add a 
64         resource attribute
65
66         """
67         pass
68
69     @classmethod
70     def _clsinit(cls):
71         """ Create a new dictionnary instance of the dictionnary 
72         with the same template.
73  
74         Each ressource should have the same registration dictionary
75         template with different instances.
76         """
77         # static template for resource filters
78         cls._filters = dict()
79         cls._register_filters()
80
81         # static template for resource attributes
82         cls._attributes = dict()
83         cls._register_attributes()
84
85     @classmethod
86     def rtype(cls):
87         return cls._rtype
88
89     @classmethod
90     def waiters(cls):
91         return cls._waiters
92
93     @classmethod
94     def get_filters(cls):
95         """ Returns a copy of the filters
96
97         """
98         return copy.deepcopy(cls._filters.values())
99
100     @classmethod
101     def get_attributes(cls):
102         """ Returns a copy of the attributes
103
104         """
105         return copy.deepcopy(cls._attributes.values())
106
107     def __init__(self, ec, guid):
108         self._guid = guid
109         self._ec = weakref.ref(ec)
110         self._connections = set()
111         self._conditions = dict() 
112
113         # the resource instance gets a copy of all attributes
114         # that can modify
115         self._attrs = copy.deepcopy(self._attributes)
116
117         self._state = ResourceState.NEW
118
119         self._start_time = None
120         self._stop_time = None
121
122         # Logging
123         self._logger = logging.getLogger("neco.execution.resource.Resource.%s" % 
124             self.guid)
125
126     @property
127     def logger(self):
128         return self._logger
129
130     @property
131     def guid(self):
132         return self._guid
133
134     @property
135     def ec(self):
136         return self._ec()
137
138     @property
139     def connections(self):
140         return self._connections
141
142     @property
143     def conditions(self):
144         return self._conditions
145
146     @property
147     def start_time(self):
148         """ timestamp with  """
149         return self._start_time
150
151     @property
152     def stop_time(self):
153         return self._stop_time
154
155     @property
156     def deploy_time(self):
157         return self._deploy_time
158
159     @property
160     def state(self):
161         return self._state
162
163     def connect(self, guid):
164         if (self._validate_connection(guid)):
165             self._connections.add(guid)
166
167     def discover(self, filters = None):
168         pass
169
170     def provision(self, filters = None):
171         pass
172
173     def start(self):
174         """ Start the Resource Manager
175
176         """
177         if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]:
178             self.logger.error("Wrong state %s for start" % self.state)
179
180         self._start_time = strfnow()
181         self._state = ResourceState.STARTED
182
183     def stop(self):
184         """ Start the Resource Manager
185
186         """
187         if not self._state in [ResourceState.STARTED]:
188             self.logger.error("Wrong state %s for stop" % self.state)
189
190         self._stop_time = strfnow()
191         self._state = ResourceState.STOPPED
192
193     def set(self, name, value):
194         """ Set the value of the attribute
195
196         :param name: Name of the attribute
197         :type name: str
198         :param name: Value of the attribute
199         :type name: str
200         :rtype:  Boolean
201         """
202         attr = self._attrs[name]
203         attr.value = value
204
205     def get(self, name):
206         """ Start the Resource Manager
207
208         :param name: Name of the attribute
209         :type name: str
210         :rtype: str
211         """
212         attr = self._attrs[name]
213         return attr.value
214
215     def register_condition(self, action, group, state, 
216             time = None):
217         """ Do the 'action' after 'time' on the current RM when 'group' 
218          reach the state 'state'
219
220         :param action: Action to do. Either 'START' or 'STOP'
221         :type action: str
222         :param group: group of RM
223         :type group: str
224         :param state: RM that are part of the condition
225         :type state: list
226         :param time: Time to wait after the state is reached (ex : '2s' )
227         :type time: str
228
229         """
230         if action not in self.conditions:
231             self._conditions[action] = set()
232
233         # We need to use only sequence inside a set and not a list. 
234         # As group is a list, we need to change it.
235         #print (tuple(group), state, time)
236         self.conditions.get(action).add((tuple(group), state, time))
237
238     def _needs_reschedule(self, group, state, time):
239         """ Internal method that verify if 'time' has elapsed since 
240         all elements in 'group' have reached state 'state'.
241
242         :param group: RM that are part of the condition
243         :type group: list
244         :param state: State that group need to reach for the condtion
245         :type state: str
246         :param time: time to wait after the state
247         :type time: str
248
249         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
250         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
251         For the moment, 2m30s is not a correct syntax.
252
253         """
254         reschedule = False
255         delay = _reschedule_delay 
256
257         # check state and time elapsed on all RMs
258         for guid in group:
259             rm = self.ec.get_resource(guid)
260             # If the RMs is lower than the requested state we must
261             # reschedule (e.g. if RM is DEPLOYED but we required STARTED)
262             if rm.state < state:
263                 reschedule = True
264                 break
265
266             if time:
267                 if state == ResourceState.DEPLOYED:
268                     t = rm.deploy_time
269                 elif state == ResourceState.STARTED:
270                     t = rm.start_time
271                 elif state == ResourceState.STOPPED:
272                     t = rm.stop_time
273                 else:
274                     # Only keep time information for START and STOP
275                     break
276
277                 d = strfdiff(strfnow(), t)
278                 #print "This is the value of d : " + str(d) + " // With the value of t : " + str(t) + " // With the value of time : " + str(time)
279                 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
280                 if wait > 0.001:
281                     reschedule = True
282                     delay = "%fs" % wait
283                     break
284         return reschedule, delay
285
286     def set_with_conditions(self, name, value, group, state, time):
287         """ Set value 'value' on attribute with name 'name' when 'time' 
288             has elapsed since all elements in 'group' have reached state
289            'state'.
290
291         :param name: Name of the attribute
292         :type name: str
293         :param name: Value of the attribute
294         :type name: str
295         :param group: RM that are part of the condition
296         :type group: list
297         :param state: State that group need to reach before set
298         :type state: str
299         :param time: Time to wait after the state is reached (ex : '2s' )
300         :type time: str
301
302         """
303
304         reschedule = False
305         delay = _reschedule_delay 
306
307         ## evaluate if set conditions are met
308
309         # only can set with conditions after the RM is started
310         if self.state != ResourceState.STARTED:
311             reschedule = True
312         else:
313             reschedule, delay = self._needs_reschedule(group, state, time)
314
315         if reschedule:
316             callback = functools.partial(self.set_with_conditions, 
317                     name, value, group, state, time)
318             self.ec.schedule(delay, callback)
319         else:
320             self.set(name, value)
321
322     def start_with_conditions(self):
323         """ Starts when all the conditions are reached
324
325         """
326         reschedule = False
327         delay = _reschedule_delay 
328
329         ## evaluate if set conditions are met
330
331         # only can start when RM is either STOPPED or DEPLOYED
332         if self.state not in [ResourceState.STOPPED, ResourceState.DEPLOYED]:
333             reschedule = True
334         else:
335             print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") -----  start condition : " + str(self.conditions.items())
336             # Need to separate because it could have more that tuple of condition 
337             # for the same action.
338             conditions_start = self.conditions.get(ResourceAction.START, [])
339             for (group, state, time) in conditions_start:
340                 reschedule, delay = self._needs_reschedule(group, state, time)
341                 if reschedule:
342                     break
343
344         if reschedule:
345             callback = functools.partial(self.start_with_conditions)
346             self.ec.schedule(delay, callback)
347         else:
348             print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
349 ------------------------------------------------------------------------------\
350 ----------------------------------------------------------------  STARTING -- "
351             self.start()
352
353     def stop_with_conditions(self):
354         """ Stop when all the conditions are reached
355
356         """
357         reschedule = False
358         delay = _reschedule_delay 
359
360         ## evaluate if set conditions are met
361
362         # only can stop when RM is STARTED
363         if self.state != ResourceState.STARTED:
364             reschedule = True
365         else:
366             print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + "\
367  (Guid : "+ str(self.guid) +")  ----  stop condition : " + str(self.conditions.items())
368             conditions_stop = self.conditions.get(ResourceAction.STOP, []) 
369             for (group, state, time) in conditions_stop:
370                 reschedule, delay = self._needs_reschedule(group, state, time)
371                 if reschedule:
372                     break
373         if reschedule:
374             callback = functools.partial(self.stop_with_conditions)
375             self.ec.schedule(delay, callback)
376         else:
377             print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
378 ------------------------------------------------------------------------------\
379 ----------------------------------------------------------------  STOPPING -- "
380             self.stop()
381
382     def deploy(self):
383         """Execute all the differents steps required to reach the state DEPLOYED
384
385         """
386         self.deploy_restriction()
387         self.discover()
388         self.provision()
389         self.deploy_with_conditions()
390
391     def deploy_restriction(self):
392         dep = set()
393         for guid in self.connections:
394             if self.ec.get_resource(guid).rtype() in self.__class__._waiters:
395                 dep.add(guid)
396         self.register_condition(ResourceAction.DEPLOYED, dep, ResourceState.DEPLOYED)
397
398
399     def deploy_with_conditions(self):
400         """ Starts when all the conditions are reached
401
402         """
403         reschedule = False
404         delay = _reschedule_delay 
405
406         ## evaluate if set conditions are met
407
408         # only can deploy when RM is NEW
409         if not self._state in [ResourceState.NEW]:
410             self.logger.error("Wrong state %s for stop" % self.state)
411             return
412         else:
413             print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") -----  deploy condition : " + str(self.conditions.items())
414             # Need to separate because it could have more that tuple of condition 
415             # for the same action.
416             conditions_deployed = self.conditions.get(ResourceAction.DEPLOYED, [])
417             for (group, state, time) in conditions_deployed:
418                 reschedule, delay = self._needs_reschedule(group, state, time)
419                 if reschedule:
420                     break
421
422         if reschedule:
423             callback = functools.partial(self.deploy_with_conditions)
424             self.ec.schedule(delay, callback)
425         else:
426             print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
427 ------------------------------------------------------------------------------\
428 ----------------------------------------------------------------  DEPLOY -- "
429             self.deploy_action()
430
431     def deploy_action(self):
432
433         self._deploy_time = strfnow()
434         self._state = ResourceState.DEPLOYED
435
436
437     def release(self):
438         """Clean the resource at the end of the Experiment and change the status
439
440         """
441         self._state = ResourceState.RELEASED
442
443     def _validate_connection(self, guid):
444         """Check if the connection is available.
445
446         :param guid: Guid of the current Resource Manager
447         :type guid: int
448         :rtype:  Boolean
449
450         """
451         # TODO: Validate!
452         return True
453
454 class ResourceFactory(object):
455     _resource_types = dict()
456
457     @classmethod
458     def resource_types(cls):
459         return cls._resource_types
460
461     @classmethod
462     def register_type(cls, rclass):
463         cls._resource_types[rclass.rtype()] = rclass
464
465     @classmethod
466     def create(cls, rtype, ec, guid):
467         rclass = cls._resource_types[rtype]
468         return rclass(ec, guid)
469