Supporting many concurrent LinuxApplications on same LinuxNode
[nepi.git] / src / neco / execution / resource.py
1 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
2 from neco.execution.trace import TraceAttr
3
4 import copy
5 import functools
6 import logging
7 import weakref
8
9 _reschedule_delay = "1s"
10
11 class ResourceAction:
12     DEPLOY = 0
13     START = 1
14     STOP = 2
15
16 class ResourceState:
17     NEW = 0
18     DISCOVERED = 1
19     PROVISIONED = 2
20     READY = 3
21     STARTED = 4
22     STOPPED = 5
23     FINISHED = 6
24     FAILED = 7
25     RELEASED = 8
26
27 def clsinit(cls):
28     cls._clsinit()
29     return cls
30
31 # Decorator to invoke class initialization method
32 @clsinit
33 class ResourceManager(object):
34     _rtype = "Resource"
35     _filters = None
36     _attributes = None
37     _traces = None
38
39     @classmethod
40     def _register_filter(cls, attr):
41         """ Resource subclasses will invoke this method to add a 
42         filter attribute
43
44         """
45         cls._filters[attr.name] = attr
46
47     @classmethod
48     def _register_attribute(cls, attr):
49         """ Resource subclasses will invoke this method to add a 
50         resource attribute
51
52         """
53         cls._attributes[attr.name] = attr
54
55     @classmethod
56     def _register_trace(cls, trace):
57         """ Resource subclasses will invoke this method to add a 
58         resource trace
59
60         """
61         cls._traces[trace.name] = trace
62
63
64     @classmethod
65     def _register_filters(cls):
66         """ Resource subclasses will invoke this method to register 
67         resource filters
68
69         """
70         pass
71
72     @classmethod
73     def _register_attributes(cls):
74         """ Resource subclasses will invoke this method to register
75         resource attributes
76
77         """
78         pass
79
80     @classmethod
81     def _register_traces(cls):
82         """ Resource subclasses will invoke this method to register
83         resource traces
84
85         """
86         pass
87
88     @classmethod
89     def _clsinit(cls):
90         """ Create a new dictionnary instance of the dictionnary 
91         with the same template.
92  
93         Each ressource should have the same registration dictionary
94         template with different instances.
95         """
96         # static template for resource filters
97         cls._filters = dict()
98         cls._register_filters()
99
100         # static template for resource attributes
101         cls._attributes = dict()
102         cls._register_attributes()
103
104         # static template for resource traces
105         cls._traces = dict()
106         cls._register_traces()
107
108     @classmethod
109     def rtype(cls):
110         return cls._rtype
111
112     @classmethod
113     def get_filters(cls):
114         """ Returns a copy of the filters
115
116         """
117         return copy.deepcopy(cls._filters.values())
118
119     @classmethod
120     def get_attributes(cls):
121         """ Returns a copy of the attributes
122
123         """
124         return copy.deepcopy(cls._attributes.values())
125
126     @classmethod
127     def get_traces(cls):
128         """ Returns a copy of the traces
129
130         """
131         return copy.deepcopy(cls._traces.values())
132
133     def __init__(self, ec, guid):
134         self._guid = guid
135         self._ec = weakref.ref(ec)
136         self._connections = set()
137         self._conditions = dict() 
138
139         # the resource instance gets a copy of all attributes
140         self._attrs = copy.deepcopy(self._attributes)
141
142         # the resource instance gets a copy of all traces
143         self._trcs = copy.deepcopy(self._traces)
144
145         self._state = ResourceState.NEW
146
147         self._start_time = None
148         self._stop_time = None
149         self._discover_time = None
150         self._provision_time = None
151         self._ready_time = None
152         self._release_time = None
153
154         # Logging
155         self._logger = logging.getLogger("Resource")
156
157     def debug(self, msg, out = None, err = None):
158         self.log(msg, logging.DEBUG, out, err)
159
160     def error(self, msg, out = None, err = None):
161         self.log(msg, logging.ERROR, out, err)
162
163     def warn(self, msg, out = None, err = None):
164         self.log(msg, logging.WARNING, out, err)
165
166     def info(self, msg, out = None, err = None):
167         self.log(msg, logging.INFO, out, err)
168
169     def log(self, msg, level, out = None, err = None):
170         if out:
171             msg += " - OUT: %s " % out
172
173         if err:
174             msg += " - ERROR: %s " % err
175
176         msg = self.log_message(msg)
177
178         self.logger.log(level, msg)
179
180     def log_message(self, msg):
181         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
182
183     @property
184     def logger(self):
185         return self._logger
186
187     @property
188     def guid(self):
189         return self._guid
190
191     @property
192     def ec(self):
193         return self._ec()
194
195     @property
196     def connections(self):
197         return self._connections
198
199     @property
200     def conditions(self):
201         return self._conditions
202
203     @property
204     def start_time(self):
205         """ Returns timestamp with the time the RM started """
206         return self._start_time
207
208     @property
209     def stop_time(self):
210         """ Returns timestamp with the time the RM stopped """
211         return self._stop_time
212
213     @property
214     def discover_time(self):
215         """ Returns timestamp with the time the RM passed to state discovered """
216         return self._discover_time
217
218     @property
219     def provision_time(self):
220         """ Returns timestamp with the time the RM passed to state provisioned """
221         return self._provision_time
222
223     @property
224     def ready_time(self):
225         """ Returns timestamp with the time the RM passed to state ready  """
226         return self._ready_time
227
228     @property
229     def release_time(self):
230         """ Returns timestamp with the time the RM was released """
231         return self._release_time
232
233     @property
234     def state(self):
235         return self._state
236
237     def connect(self, guid):
238         if self.valid_connection(guid):
239             self._connections.add(guid)
240
241     def discover(self, filters = None):
242         self._discover_time = strfnow()
243         self._state = ResourceState.DISCOVERED
244
245     def provision(self, filters = None):
246         self._provision_time = strfnow()
247         self._state = ResourceState.PROVISIONED
248
249     def start(self):
250         """ Start the Resource Manager
251
252         """
253         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
254             self.error("Wrong state %s for start" % self.state)
255             return
256
257         self._start_time = strfnow()
258         self._state = ResourceState.STARTED
259
260     def stop(self):
261         """ Start the Resource Manager
262
263         """
264         if not self._state in [ResourceState.STARTED]:
265             self.error("Wrong state %s for stop" % self.state)
266             return
267
268         self._stop_time = strfnow()
269         self._state = ResourceState.STOPPED
270
271     def set(self, name, value):
272         """ Set the value of the attribute
273
274         :param name: Name of the attribute
275         :type name: str
276         :param name: Value of the attribute
277         :type name: str
278         """
279         attr = self._attrs[name]
280         attr.value = value
281
282     def get(self, name):
283         """ Start the Resource Manager
284
285         :param name: Name of the attribute
286         :type name: str
287         :rtype: str
288         """
289         attr = self._attrs[name]
290         return attr.value
291
292     def register_trace(self, name):
293         """ Enable trace
294
295         :param name: Name of the trace
296         :type name: str
297         """
298         trace = self._trcs[name]
299         trace.enabled = True
300
301     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
302         """ Get information on collected trace
303
304         :param name: Name of the trace
305         :type name: str
306
307         :param attr: Can be one of:
308                          - TraceAttr.ALL (complete trace content), 
309                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
310                          - TraceAttr.PATH (full path to the trace file),
311                          - TraceAttr.SIZE (size of trace file). 
312         :type attr: str
313
314         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
315         :type name: int
316
317         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
318         :type name: int
319
320         :rtype: str
321         """
322         pass
323
324     def register_condition(self, action, group, state, 
325             time = None):
326         """ Registers a condition on the resource manager to allow execution 
327         of 'action' only after 'time' has elapsed from the moment all resources 
328         in 'group' reached state 'state'
329
330         :param action: Action to restrict to condition (either 'START' or 'STOP')
331         :type action: str
332         :param group: Group of RMs to wait for (list of guids)
333         :type group: int or list of int
334         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
335         :type state: str
336         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
337         :type time: str
338
339         """
340         conditions = self.conditions.get(action)
341         if not conditions:
342             conditions = list()
343             self._conditions[action] = conditions
344
345         # For each condition to register a tuple of (group, state, time) is 
346         # added to the 'action' list
347         if not isinstance(group, list):
348             group = [group]
349
350         conditions.append((group, state, time))
351
352     def get_connected(self, rtype):
353         connected = []
354         for guid in self.connections:
355             rm = self.ec.get_resource(guid)
356             if rm.rtype() == rtype:
357                 connected.append(rm)
358         return connected
359
360     def _needs_reschedule(self, group, state, time):
361         """ Internal method that verify if 'time' has elapsed since 
362         all elements in 'group' have reached state 'state'.
363
364         :param group: Group of RMs to wait for (list of guids)
365         :type group: int or list of int
366         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
367         :type state: str
368         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
369         :type time: str
370
371         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
372         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
373         For the moment, 2m30s is not a correct syntax.
374
375         """
376         reschedule = False
377         delay = _reschedule_delay 
378
379         # check state and time elapsed on all RMs
380         for guid in group:
381             rm = self.ec.get_resource(guid)
382             # If the RM state is lower than the requested state we must
383             # reschedule (e.g. if RM is READY but we required STARTED)
384             if rm.state < state:
385                 reschedule = True
386                 break
387
388             # If there is a time restriction, we must verify the
389             # restriction is satisfied 
390             if time:
391                 if state == ResourceState.DISCOVERED:
392                     t = rm.discover_time
393                 if state == ResourceState.PROVISIONED:
394                     t = rm.provision_time
395                 elif state == ResourceState.READY:
396                     t = rm.ready_time
397                 elif state == ResourceState.STARTED:
398                     t = rm.start_time
399                 elif state == ResourceState.STOPPED:
400                     t = rm.stop_time
401                 else:
402                     # Only keep time information for START and STOP
403                     break
404
405                 d = strfdiff(strfnow(), t)
406                 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
407                 if wait > 0.001:
408                     reschedule = True
409                     delay = "%fs" % wait
410                     break
411         return reschedule, delay
412
413     def set_with_conditions(self, name, value, group, state, time):
414         """ Set value 'value' on attribute with name 'name' when 'time' 
415             has elapsed since all elements in 'group' have reached state
416            'state'
417
418         :param name: Name of the attribute to set
419         :type name: str
420         :param name: Value of the attribute to set
421         :type name: str
422         :param group: Group of RMs to wait for (list of guids)
423         :type group: int or list of int
424         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
425         :type state: str
426         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
427         :type time: str
428
429         """
430
431         reschedule = False
432         delay = _reschedule_delay 
433
434         ## evaluate if set conditions are met
435
436         # only can set with conditions after the RM is started
437         if self.state != ResourceState.STARTED:
438             reschedule = True
439         else:
440             reschedule, delay = self._needs_reschedule(group, state, time)
441
442         if reschedule:
443             callback = functools.partial(self.set_with_conditions, 
444                     name, value, group, state, time)
445             self.ec.schedule(delay, callback)
446         else:
447             self.set(name, value)
448
449     def start_with_conditions(self):
450         """ Starts RM when all the conditions in self.conditions for
451         action 'START' are satisfied.
452
453         """
454         reschedule = False
455         delay = _reschedule_delay 
456
457         ## evaluate if set conditions are met
458
459         # only can start when RM is either STOPPED or READY
460         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
461             reschedule = True
462             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
463         else:
464             self.debug("---- START CONDITIONS ---- %s" % 
465                     self.conditions.get(ResourceAction.START))
466             
467             # Verify all start conditions are met
468             start_conditions = self.conditions.get(ResourceAction.START, [])
469             for (group, state, time) in start_conditions:
470                 reschedule, delay = self._needs_reschedule(group, state, time)
471                 if reschedule:
472                     break
473
474         if reschedule:
475             self.ec.schedule(delay, self.start_with_conditions)
476         else:
477             self.debug("----- STARTING ---- ")
478             self.start()
479
480     def stop_with_conditions(self):
481         """ Stops RM when all the conditions in self.conditions for
482         action 'STOP' are satisfied.
483
484         """
485         reschedule = False
486         delay = _reschedule_delay 
487
488         ## evaluate if set conditions are met
489
490         # only can stop when RM is STARTED
491         if self.state != ResourceState.STARTED:
492             reschedule = True
493         else:
494             self.debug(" ---- STOP CONDITIONS ---- %s" % 
495                     self.conditions.get(ResourceAction.STOP))
496
497             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
498             for (group, state, time) in stop_conditions:
499                 reschedule, delay = self._needs_reschedule(group, state, time)
500                 if reschedule:
501                     break
502
503         if reschedule:
504             callback = functools.partial(self.stop_with_conditions)
505             self.ec.schedule(delay, callback)
506         else:
507             self.logger.debug(" ----- STOPPING ---- ") 
508             self.stop()
509
510     def deploy(self):
511         """ Execute all steps required for the RM to reach the state READY
512
513         """
514         if self._state > ResourceState.READY:
515             self.error("Wrong state %s for deploy" % self.state)
516             return
517
518         self.debug("----- DEPLOYING ---- ")
519         self._ready_time = strfnow()
520         self._state = ResourceState.READY
521
522     def release(self):
523         """Clean the resource at the end of the Experiment and change the status
524
525         """
526         self._release_time = strfnow()
527         self._state = ResourceState.RELEASED
528
529     def valid_connection(self, guid):
530         """Check if the connection is available.
531
532         :param guid: Guid of the current Resource Manager
533         :type guid: int
534         :rtype:  Boolean
535
536         """
537         # TODO: Validate!
538         return True
539
540 class ResourceFactory(object):
541     _resource_types = dict()
542
543     @classmethod
544     def resource_types(cls):
545         return cls._resource_types
546
547     @classmethod
548     def register_type(cls, rclass):
549         cls._resource_types[rclass.rtype()] = rclass
550
551     @classmethod
552     def create(cls, rtype, ec, guid):
553         rclass = cls._resource_types[rtype]
554         return rclass(ec, guid)
555