Added Linux Application
[nepi.git] / src / neco / execution / resource.py
1 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
2 from neco.execution.trace import TraceAttr
3
4 import copy
5 import functools
6 import logging
7 import weakref
8
9 _reschedule_delay = "1s"
10
11 class ResourceAction:
12     DEPLOY = 0
13     START = 1
14     STOP = 2
15
16 class ResourceState:
17     NEW = 0
18     DISCOVERED = 1
19     PROVISIONED = 2
20     READY = 3
21     STARTED = 4
22     STOPPED = 5
23     FINISHED = 6
24     FAILED = 7
25     RELEASED = 8
26
27 def clsinit(cls):
28     cls._clsinit()
29     return cls
30
31 # Decorator to invoke class initialization method
32 @clsinit
33 class ResourceManager(object):
34     _rtype = "Resource"
35     _filters = None
36     _attributes = None
37     _traces = None
38
39     @classmethod
40     def _register_filter(cls, attr):
41         """ Resource subclasses will invoke this method to add a 
42         filter attribute
43
44         """
45         cls._filters[attr.name] = attr
46
47     @classmethod
48     def _register_attribute(cls, attr):
49         """ Resource subclasses will invoke this method to add a 
50         resource attribute
51
52         """
53         cls._attributes[attr.name] = attr
54
55     @classmethod
56     def _register_trace(cls, trace):
57         """ Resource subclasses will invoke this method to add a 
58         resource trace
59
60         """
61         cls._traces[trace.name] = trace
62
63
64     @classmethod
65     def _register_filters(cls):
66         """ Resource subclasses will invoke this method to register 
67         resource filters
68
69         """
70         pass
71
72     @classmethod
73     def _register_attributes(cls):
74         """ Resource subclasses will invoke this method to register
75         resource attributes
76
77         """
78         pass
79
80     @classmethod
81     def _register_traces(cls):
82         """ Resource subclasses will invoke this method to register
83         resource traces
84
85         """
86         pass
87
88     @classmethod
89     def _clsinit(cls):
90         """ Create a new dictionnary instance of the dictionnary 
91         with the same template.
92  
93         Each ressource should have the same registration dictionary
94         template with different instances.
95         """
96         # static template for resource filters
97         cls._filters = dict()
98         cls._register_filters()
99
100         # static template for resource attributes
101         cls._attributes = dict()
102         cls._register_attributes()
103
104         # static template for resource traces
105         cls._traces = dict()
106         cls._register_traces()
107
108     @classmethod
109     def rtype(cls):
110         return cls._rtype
111
112     @classmethod
113     def get_filters(cls):
114         """ Returns a copy of the filters
115
116         """
117         return copy.deepcopy(cls._filters.values())
118
119     @classmethod
120     def get_attributes(cls):
121         """ Returns a copy of the attributes
122
123         """
124         return copy.deepcopy(cls._attributes.values())
125
126     @classmethod
127     def get_traces(cls):
128         """ Returns a copy of the traces
129
130         """
131         return copy.deepcopy(cls._traces.values())
132
133     def __init__(self, ec, guid):
134         self._guid = guid
135         self._ec = weakref.ref(ec)
136         self._connections = set()
137         self._conditions = dict() 
138
139         # the resource instance gets a copy of all attributes
140         self._attrs = copy.deepcopy(self._attributes)
141
142         # the resource instance gets a copy of all traces
143         self._trcs = copy.deepcopy(self._traces)
144
145         self._state = ResourceState.NEW
146
147         self._start_time = None
148         self._stop_time = None
149         self._discover_time = None
150         self._provision_time = None
151         self._ready_time = None
152         self._release_time = None
153
154         # Logging
155         self._logger = logging.getLogger("neco.execution.resource.Resource %s.%d " %  (self._rtype, self.guid))
156
157     @property
158     def logger(self):
159         return self._logger
160
161     @property
162     def guid(self):
163         return self._guid
164
165     @property
166     def ec(self):
167         return self._ec()
168
169     @property
170     def connections(self):
171         return self._connections
172
173     @property
174     def conditions(self):
175         return self._conditions
176
177     @property
178     def start_time(self):
179         """ Returns timestamp with the time the RM started """
180         return self._start_time
181
182     @property
183     def stop_time(self):
184         """ Returns timestamp with the time the RM stopped """
185         return self._stop_time
186
187     @property
188     def discover_time(self):
189         """ Returns timestamp with the time the RM passed to state discovered """
190         return self._discover_time
191
192     @property
193     def provision_time(self):
194         """ Returns timestamp with the time the RM passed to state provisioned """
195         return self._provision_time
196
197     @property
198     def ready_time(self):
199         """ Returns timestamp with the time the RM passed to state ready  """
200         return self._ready_time
201
202     @property
203     def release_time(self):
204         """ Returns timestamp with the time the RM was released """
205         return self._release_time
206
207     @property
208     def state(self):
209         return self._state
210
211     def connect(self, guid):
212         if self.valid_connection(guid):
213             self._connections.add(guid)
214
215     def discover(self, filters = None):
216         self._discover_time = strfnow()
217         self._state = ResourceState.DISCOVERED
218
219     def provision(self, filters = None):
220         self._provision_time = strfnow()
221         self._state = ResourceState.PROVISIONED
222
223     def start(self):
224         """ Start the Resource Manager
225
226         """
227         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
228             self.logger.error("Wrong state %s for start" % self.state)
229             return
230
231         self._start_time = strfnow()
232         self._state = ResourceState.STARTED
233
234     def stop(self):
235         """ Start the Resource Manager
236
237         """
238         if not self._state in [ResourceState.STARTED]:
239             self.logger.error("Wrong state %s for stop" % self.state)
240             return
241
242         self._stop_time = strfnow()
243         self._state = ResourceState.STOPPED
244
245     def set(self, name, value):
246         """ Set the value of the attribute
247
248         :param name: Name of the attribute
249         :type name: str
250         :param name: Value of the attribute
251         :type name: str
252         """
253         attr = self._attrs[name]
254         attr.value = value
255
256     def get(self, name):
257         """ Start the Resource Manager
258
259         :param name: Name of the attribute
260         :type name: str
261         :rtype: str
262         """
263         attr = self._attrs[name]
264         return attr.value
265
266     def register_trace(self, name):
267         """ Enable trace
268
269         :param name: Name of the trace
270         :type name: str
271         """
272         trace = self._trcs[name]
273         trace.enabled = True
274
275     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
276         """ Get information on collected trace
277
278         :param name: Name of the trace
279         :type name: str
280
281         :param attr: Can be one of:
282                          - TraceAttr.ALL (complete trace content), 
283                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
284                          - TraceAttr.PATH (full path to the trace file),
285                          - TraceAttr.SIZE (size of trace file). 
286         :type attr: str
287
288         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
289         :type name: int
290
291         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
292         :type name: int
293
294         :rtype: str
295         """
296         pass
297
298     def register_condition(self, action, group, state, 
299             time = None):
300         """ Registers a condition on the resource manager to allow execution 
301         of 'action' only after 'time' has elapsed from the moment all resources 
302         in 'group' reached state 'state'
303
304         :param action: Action to restrict to condition (either 'START' or 'STOP')
305         :type action: str
306         :param group: Group of RMs to wait for (list of guids)
307         :type group: int or list of int
308         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
309         :type state: str
310         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
311         :type time: str
312
313         """
314         conditions = self.conditions.get(action)
315         if not conditions:
316             conditions = list()
317             self._conditions[action] = conditions
318
319         # For each condition to register a tuple of (group, state, time) is 
320         # added to the 'action' list
321         if not isinstance(group, list):
322             group = [group]
323
324         conditions.append((group, state, time))
325
326     def get_connected(self, rtype):
327         connected = []
328         for guid in self.connections:
329             rm = self.ec.get_resource(guid)
330             if rm.rtype() == rtype:
331                 connected.append(rm)
332         return connected
333
334     def _needs_reschedule(self, group, state, time):
335         """ Internal method that verify if 'time' has elapsed since 
336         all elements in 'group' have reached state 'state'.
337
338         :param group: Group of RMs to wait for (list of guids)
339         :type group: int or list of int
340         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
341         :type state: str
342         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
343         :type time: str
344
345         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
346         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
347         For the moment, 2m30s is not a correct syntax.
348
349         """
350         reschedule = False
351         delay = _reschedule_delay 
352
353         # check state and time elapsed on all RMs
354         for guid in group:
355             rm = self.ec.get_resource(guid)
356             # If the RM state is lower than the requested state we must
357             # reschedule (e.g. if RM is READY but we required STARTED)
358             if rm.state < state:
359                 reschedule = True
360                 break
361
362             # If there is a time restriction, we must verify the
363             # restriction is satisfied 
364             if time:
365                 if state == ResourceState.DISCOVERED:
366                     t = rm.discover_time
367                 if state == ResourceState.PROVISIONED:
368                     t = rm.provision_time
369                 elif state == ResourceState.READY:
370                     t = rm.ready_time
371                 elif state == ResourceState.STARTED:
372                     t = rm.start_time
373                 elif state == ResourceState.STOPPED:
374                     t = rm.stop_time
375                 else:
376                     # Only keep time information for START and STOP
377                     break
378
379                 d = strfdiff(strfnow(), t)
380                 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
381                 if wait > 0.001:
382                     reschedule = True
383                     delay = "%fs" % wait
384                     break
385         return reschedule, delay
386
387     def set_with_conditions(self, name, value, group, state, time):
388         """ Set value 'value' on attribute with name 'name' when 'time' 
389             has elapsed since all elements in 'group' have reached state
390            'state'
391
392         :param name: Name of the attribute to set
393         :type name: str
394         :param name: Value of the attribute to set
395         :type name: str
396         :param group: Group of RMs to wait for (list of guids)
397         :type group: int or list of int
398         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
399         :type state: str
400         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
401         :type time: str
402
403         """
404
405         reschedule = False
406         delay = _reschedule_delay 
407
408         ## evaluate if set conditions are met
409
410         # only can set with conditions after the RM is started
411         if self.state != ResourceState.STARTED:
412             reschedule = True
413         else:
414             reschedule, delay = self._needs_reschedule(group, state, time)
415
416         if reschedule:
417             callback = functools.partial(self.set_with_conditions, 
418                     name, value, group, state, time)
419             self.ec.schedule(delay, callback)
420         else:
421             self.set(name, value)
422
423     def start_with_conditions(self):
424         """ Starts RM when all the conditions in self.conditions for
425         action 'START' are satisfied.
426
427         """
428         reschedule = False
429         delay = _reschedule_delay 
430
431         ## evaluate if set conditions are met
432
433         # only can start when RM is either STOPPED or READY
434         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
435             reschedule = True
436             self.logger.debug("---- RESCHEDULING START ---- state %s " % self.state )
437         else:
438             self.logger.debug("---- START CONDITIONS ---- %s" % 
439                     self.conditions.get(ResourceAction.START))
440             
441             # Verify all start conditions are met
442             start_conditions = self.conditions.get(ResourceAction.START, [])
443             for (group, state, time) in start_conditions:
444                 reschedule, delay = self._needs_reschedule(group, state, time)
445                 if reschedule:
446                     break
447
448         if reschedule:
449             self.ec.schedule(delay, self.start_with_conditions)
450         else:
451             self.logger.debug("----- STARTING ---- ")
452             self.start()
453
454     def stop_with_conditions(self):
455         """ Stops RM when all the conditions in self.conditions for
456         action 'STOP' are satisfied.
457
458         """
459         reschedule = False
460         delay = _reschedule_delay 
461
462         ## evaluate if set conditions are met
463
464         # only can stop when RM is STARTED
465         if self.state != ResourceState.STARTED:
466             reschedule = True
467         else:
468             self.logger.debug(" ---- STOP CONDITIONS ---- %s" % 
469                     self.conditions.get(ResourceAction.STOP))
470
471             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
472             for (group, state, time) in stop_conditions:
473                 reschedule, delay = self._needs_reschedule(group, state, time)
474                 if reschedule:
475                     break
476
477         if reschedule:
478             callback = functools.partial(self.stop_with_conditions)
479             self.ec.schedule(delay, callback)
480         else:
481             self.logger.debug(" ----- STOPPING ---- ") 
482             self.stop()
483
484     def deploy(self):
485         """ Execute all steps required for the RM to reach the state READY
486
487         """
488         if self._state > ResourceState.READY:
489             self.logger.error("Wrong state %s for deploy" % self.state)
490             return
491
492         self._ready_time = strfnow()
493         self._state = ResourceState.READY
494
495     def release(self):
496         """Clean the resource at the end of the Experiment and change the status
497
498         """
499         self._release_time = strfnow()
500         self._state = ResourceState.RELEASED
501
502     def valid_connection(self, guid):
503         """Check if the connection is available.
504
505         :param guid: Guid of the current Resource Manager
506         :type guid: int
507         :rtype:  Boolean
508
509         """
510         # TODO: Validate!
511         return True
512
513 class ResourceFactory(object):
514     _resource_types = dict()
515
516     @classmethod
517     def resource_types(cls):
518         return cls._resource_types
519
520     @classmethod
521     def register_type(cls, rclass):
522         cls._resource_types[rclass.rtype()] = rclass
523
524     @classmethod
525     def create(cls, rtype, ec, guid):
526         rclass = cls._resource_types[rtype]
527         return rclass(ec, guid)
528