Minor typos
[nepi.git] / src / nepi / execution / resource.py
1 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
2 from neco.execution.trace import TraceAttr
3
4 import copy
5 import functools
6 import inspect
7 import logging
8 import os
9 import pkgutil
10 import weakref
11
12 reschedule_delay = "0.5s"
13
14 class ResourceAction:
15     DEPLOY = 0
16     START = 1
17     STOP = 2
18
19 class ResourceState:
20     NEW = 0
21     DISCOVERED = 1
22     PROVISIONED = 2
23     READY = 3
24     STARTED = 4
25     STOPPED = 5
26     FINISHED = 6
27     FAILED = 7
28     RELEASED = 8
29
30 def clsinit(cls):
31     cls._clsinit()
32     return cls
33
34 # Decorator to invoke class initialization method
35 @clsinit
36 class ResourceManager(object):
37     _rtype = "Resource"
38     _filters = None
39     _attributes = None
40     _traces = None
41
42     @classmethod
43     def _register_filter(cls, attr):
44         """ Resource subclasses will invoke this method to add a 
45         filter attribute
46
47         """
48         cls._filters[attr.name] = attr
49
50     @classmethod
51     def _register_attribute(cls, attr):
52         """ Resource subclasses will invoke this method to add a 
53         resource attribute
54
55         """
56         cls._attributes[attr.name] = attr
57
58     @classmethod
59     def _register_trace(cls, trace):
60         """ Resource subclasses will invoke this method to add a 
61         resource trace
62
63         """
64         cls._traces[trace.name] = trace
65
66
67     @classmethod
68     def _register_filters(cls):
69         """ Resource subclasses will invoke this method to register 
70         resource filters
71
72         """
73         pass
74
75     @classmethod
76     def _register_attributes(cls):
77         """ Resource subclasses will invoke this method to register
78         resource attributes
79
80         """
81         pass
82
83     @classmethod
84     def _register_traces(cls):
85         """ Resource subclasses will invoke this method to register
86         resource traces
87
88         """
89         pass
90
91     @classmethod
92     def _clsinit(cls):
93         """ Create a new dictionnary instance of the dictionnary 
94         with the same template.
95  
96         Each ressource should have the same registration dictionary
97         template with different instances.
98         """
99         # static template for resource filters
100         cls._filters = dict()
101         cls._register_filters()
102
103         # static template for resource attributes
104         cls._attributes = dict()
105         cls._register_attributes()
106
107         # static template for resource traces
108         cls._traces = dict()
109         cls._register_traces()
110
111     @classmethod
112     def rtype(cls):
113         return cls._rtype
114
115     @classmethod
116     def get_filters(cls):
117         """ Returns a copy of the filters
118
119         """
120         return copy.deepcopy(cls._filters.values())
121
122     @classmethod
123     def get_attributes(cls):
124         """ Returns a copy of the attributes
125
126         """
127         return copy.deepcopy(cls._attributes.values())
128
129     @classmethod
130     def get_traces(cls):
131         """ Returns a copy of the traces
132
133         """
134         return copy.deepcopy(cls._traces.values())
135
136     def __init__(self, ec, guid):
137         self._guid = guid
138         self._ec = weakref.ref(ec)
139         self._connections = set()
140         self._conditions = dict() 
141
142         # the resource instance gets a copy of all attributes
143         self._attrs = copy.deepcopy(self._attributes)
144
145         # the resource instance gets a copy of all traces
146         self._trcs = copy.deepcopy(self._traces)
147
148         self._state = ResourceState.NEW
149
150         self._start_time = None
151         self._stop_time = None
152         self._discover_time = None
153         self._provision_time = None
154         self._ready_time = None
155         self._release_time = None
156
157         # Logging
158         self._logger = logging.getLogger("Resource")
159
160     def debug(self, msg, out = None, err = None):
161         self.log(msg, logging.DEBUG, out, err)
162
163     def error(self, msg, out = None, err = None):
164         self.log(msg, logging.ERROR, out, err)
165
166     def warn(self, msg, out = None, err = None):
167         self.log(msg, logging.WARNING, out, err)
168
169     def info(self, msg, out = None, err = None):
170         self.log(msg, logging.INFO, out, err)
171
172     def log(self, msg, level, out = None, err = None):
173         if out:
174             msg += " - OUT: %s " % out
175
176         if err:
177             msg += " - ERROR: %s " % err
178
179         msg = self.log_message(msg)
180
181         self.logger.log(level, msg)
182
183     def log_message(self, msg):
184         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
185
186     @property
187     def logger(self):
188         return self._logger
189
190     @property
191     def guid(self):
192         return self._guid
193
194     @property
195     def ec(self):
196         return self._ec()
197
198     @property
199     def connections(self):
200         return self._connections
201
202     @property
203     def conditions(self):
204         return self._conditions
205
206     @property
207     def start_time(self):
208         """ Returns timestamp with the time the RM started """
209         return self._start_time
210
211     @property
212     def stop_time(self):
213         """ Returns timestamp with the time the RM stopped """
214         return self._stop_time
215
216     @property
217     def discover_time(self):
218         """ Returns timestamp with the time the RM passed to state discovered """
219         return self._discover_time
220
221     @property
222     def provision_time(self):
223         """ Returns timestamp with the time the RM passed to state provisioned """
224         return self._provision_time
225
226     @property
227     def ready_time(self):
228         """ Returns timestamp with the time the RM passed to state ready  """
229         return self._ready_time
230
231     @property
232     def release_time(self):
233         """ Returns timestamp with the time the RM was released """
234         return self._release_time
235
236     @property
237     def state(self):
238         return self._state
239
240     def connect(self, guid):
241         if self.valid_connection(guid):
242             self._connections.add(guid)
243
244     def discover(self, filters = None):
245         self._discover_time = strfnow()
246         self._state = ResourceState.DISCOVERED
247
248     def provision(self, filters = None):
249         self._provision_time = strfnow()
250         self._state = ResourceState.PROVISIONED
251
252     def start(self):
253         """ Start the Resource Manager
254
255         """
256         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
257             self.error("Wrong state %s for start" % self.state)
258             return
259
260         self._start_time = strfnow()
261         self._state = ResourceState.STARTED
262
263     def stop(self):
264         """ Start the Resource Manager
265
266         """
267         if not self._state in [ResourceState.STARTED]:
268             self.error("Wrong state %s for stop" % self.state)
269             return
270
271         self._stop_time = strfnow()
272         self._state = ResourceState.STOPPED
273
274     def set(self, name, value):
275         """ Set the value of the attribute
276
277         :param name: Name of the attribute
278         :type name: str
279         :param name: Value of the attribute
280         :type name: str
281         """
282         attr = self._attrs[name]
283         attr.value = value
284
285     def get(self, name):
286         """ Start the Resource Manager
287
288         :param name: Name of the attribute
289         :type name: str
290         :rtype: str
291         """
292         attr = self._attrs[name]
293         return attr.value
294
295     def register_trace(self, name):
296         """ Enable trace
297
298         :param name: Name of the trace
299         :type name: str
300         """
301         trace = self._trcs[name]
302         trace.enabled = True
303
304     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
305         """ Get information on collected trace
306
307         :param name: Name of the trace
308         :type name: str
309
310         :param attr: Can be one of:
311                          - TraceAttr.ALL (complete trace content), 
312                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
313                          - TraceAttr.PATH (full path to the trace file),
314                          - TraceAttr.SIZE (size of trace file). 
315         :type attr: str
316
317         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
318         :type name: int
319
320         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
321         :type name: int
322
323         :rtype: str
324         """
325         pass
326
327     def register_condition(self, action, group, state, 
328             time = None):
329         """ Registers a condition on the resource manager to allow execution 
330         of 'action' only after 'time' has elapsed from the moment all resources 
331         in 'group' reached state 'state'
332
333         :param action: Action to restrict to condition (either 'START' or 'STOP')
334         :type action: str
335         :param group: Group of RMs to wait for (list of guids)
336         :type group: int or list of int
337         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
338         :type state: str
339         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
340         :type time: str
341
342         """
343         conditions = self.conditions.get(action)
344         if not conditions:
345             conditions = list()
346             self._conditions[action] = conditions
347
348         # For each condition to register a tuple of (group, state, time) is 
349         # added to the 'action' list
350         if not isinstance(group, list):
351             group = [group]
352
353         conditions.append((group, state, time))
354
355     def get_connected(self, rtype):
356         connected = []
357         for guid in self.connections:
358             rm = self.ec.get_resource(guid)
359             if rm.rtype() == rtype:
360                 connected.append(rm)
361         return connected
362
363     def _needs_reschedule(self, group, state, time):
364         """ Internal method that verify if 'time' has elapsed since 
365         all elements in 'group' have reached state 'state'.
366
367         :param group: Group of RMs to wait for (list of guids)
368         :type group: int or list of int
369         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
370         :type state: str
371         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
372         :type time: str
373
374         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
375         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
376         For the moment, 2m30s is not a correct syntax.
377
378         """
379         reschedule = False
380         delay = reschedule_delay 
381
382         # check state and time elapsed on all RMs
383         for guid in group:
384             rm = self.ec.get_resource(guid)
385             # If the RM state is lower than the requested state we must
386             # reschedule (e.g. if RM is READY but we required STARTED)
387             if rm.state < state:
388                 reschedule = True
389                 break
390
391             # If there is a time restriction, we must verify the
392             # restriction is satisfied 
393             if time:
394                 if state == ResourceState.DISCOVERED:
395                     t = rm.discover_time
396                 if state == ResourceState.PROVISIONED:
397                     t = rm.provision_time
398                 elif state == ResourceState.READY:
399                     t = rm.ready_time
400                 elif state == ResourceState.STARTED:
401                     t = rm.start_time
402                 elif state == ResourceState.STOPPED:
403                     t = rm.stop_time
404                 else:
405                     # Only keep time information for START and STOP
406                     break
407
408                 d = strfdiff(strfnow(), t)
409                 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
410                 if wait > 0.001:
411                     reschedule = True
412                     delay = "%fs" % wait
413                     break
414         return reschedule, delay
415
416     def set_with_conditions(self, name, value, group, state, time):
417         """ Set value 'value' on attribute with name 'name' when 'time' 
418             has elapsed since all elements in 'group' have reached state
419            'state'
420
421         :param name: Name of the attribute to set
422         :type name: str
423         :param name: Value of the attribute to set
424         :type name: str
425         :param group: Group of RMs to wait for (list of guids)
426         :type group: int or list of int
427         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
428         :type state: str
429         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
430         :type time: str
431
432         """
433
434         reschedule = False
435         delay = reschedule_delay 
436
437         ## evaluate if set conditions are met
438
439         # only can set with conditions after the RM is started
440         if self.state != ResourceState.STARTED:
441             reschedule = True
442         else:
443             reschedule, delay = self._needs_reschedule(group, state, time)
444
445         if reschedule:
446             callback = functools.partial(self.set_with_conditions, 
447                     name, value, group, state, time)
448             self.ec.schedule(delay, callback)
449         else:
450             self.set(name, value)
451
452     def start_with_conditions(self):
453         """ Starts RM when all the conditions in self.conditions for
454         action 'START' are satisfied.
455
456         """
457         reschedule = False
458         delay = reschedule_delay 
459
460         ## evaluate if set conditions are met
461
462         # only can start when RM is either STOPPED or READY
463         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
464             reschedule = True
465             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
466         else:
467             start_conditions = self.conditions.get(ResourceAction.START, [])
468             
469             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
470             
471             # Verify all start conditions are met
472             for (group, state, time) in start_conditions:
473                 # Uncomment for debug
474                 #unmet = []
475                 #for guid in group:
476                 #    rm = self.ec.get_resource(guid)
477                 #    unmet.append((guid, rm._state))
478                 #
479                 #self.debug("---- WAITED STATES ---- %s" % unmet )
480
481                 reschedule, delay = self._needs_reschedule(group, state, time)
482                 if reschedule:
483                     break
484
485         if reschedule:
486             self.ec.schedule(delay, self.start_with_conditions)
487         else:
488             self.debug("----- STARTING ---- ")
489             self.start()
490
491     def stop_with_conditions(self):
492         """ Stops RM when all the conditions in self.conditions for
493         action 'STOP' are satisfied.
494
495         """
496         reschedule = False
497         delay = reschedule_delay 
498
499         ## evaluate if set conditions are met
500
501         # only can stop when RM is STARTED
502         if self.state != ResourceState.STARTED:
503             reschedule = True
504         else:
505             self.debug(" ---- STOP CONDITIONS ---- %s" % 
506                     self.conditions.get(ResourceAction.STOP))
507
508             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
509             for (group, state, time) in stop_conditions:
510                 reschedule, delay = self._needs_reschedule(group, state, time)
511                 if reschedule:
512                     break
513
514         if reschedule:
515             callback = functools.partial(self.stop_with_conditions)
516             self.ec.schedule(delay, callback)
517         else:
518             self.logger.debug(" ----- STOPPING ---- ") 
519             self.stop()
520
521     def deploy(self):
522         """ Execute all steps required for the RM to reach the state READY
523
524         """
525         if self._state > ResourceState.READY:
526             self.error("Wrong state %s for deploy" % self.state)
527             return
528
529         self.debug("----- DEPLOYING ---- ")
530         self._ready_time = strfnow()
531         self._state = ResourceState.READY
532
533     def release(self):
534         """Clean the resource at the end of the Experiment and change the status
535
536         """
537         self._release_time = strfnow()
538         self._state = ResourceState.RELEASED
539
540     def valid_connection(self, guid):
541         """Check if the connection is available.
542
543         :param guid: Guid of the current Resource Manager
544         :type guid: int
545         :rtype:  Boolean
546
547         """
548         # TODO: Validate!
549         return True
550
551 class ResourceFactory(object):
552     _resource_types = dict()
553
554     @classmethod
555     def resource_types(cls):
556         return cls._resource_types
557
558     @classmethod
559     def register_type(cls, rclass):
560         cls._resource_types[rclass.rtype()] = rclass
561
562     @classmethod
563     def create(cls, rtype, ec, guid):
564         rclass = cls._resource_types[rtype]
565         return rclass(ec, guid)
566
567 def populate_factory():
568     for rclass in find_types():
569         ResourceFactory.register_type(rclass)
570
571 def find_types():
572     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
573     search_path = set(search_path.split(" "))
574    
575     import neco.resources 
576     path = os.path.dirname(neco.resources.__file__)
577     search_path.add(path)
578
579     types = []
580
581     for importer, modname, ispkg in pkgutil.walk_packages(search_path):
582         loader = importer.find_module(modname)
583         try:
584             module = loader.load_module(loader.fullname)
585             for attrname in dir(module):
586                 if attrname.startswith("_"):
587                     continue
588
589                 attr = getattr(module, attrname)
590
591                 if attr == ResourceManager:
592                     continue
593
594                 if not inspect.isclass(attr):
595                     continue
596
597                 if issubclass(attr, ResourceManager):
598                     types.append(attr)
599         except:
600             import traceback
601             err = traceback.format_exc()
602             logger = logging.getLogger("Resource.find_types()")
603             logger.error("Error while lading Resource Managers %s" % err)
604
605     return types
606
607