Replace use of string formatted dates by datetime objects for Taks scheduling
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
23
24 import copy
25 import functools
26 import logging
27 import os
28 import pkgutil
29 import weakref
30
31 reschedule_delay = "0.5s"
32
33 class ResourceAction:
34     """ Action that a user can order to a Resource Manager
35    
36     """
37     DEPLOY = 0
38     START = 1
39     STOP = 2
40
41 class ResourceState:
42     """ State of a Resource Manager
43    
44     """
45     NEW = 0
46     DISCOVERED = 1
47     PROVISIONED = 2
48     READY = 3
49     STARTED = 4
50     STOPPED = 5
51     FINISHED = 6
52     FAILED = 7
53     RELEASED = 8
54
55 ResourceState2str = dict({
56     ResourceState.NEW : "NEW",
57     ResourceState.DISCOVERED : "DISCOVERED",
58     ResourceState.PROVISIONED : "PROVISIONED",
59     ResourceState.READY : "READY",
60     ResourceState.STARTED : "STARTED",
61     ResourceState.STOPPED : "STOPPED",
62     ResourceState.FINISHED : "FINISHED",
63     ResourceState.FAILED : "FAILED",
64     ResourceState.RELEASED : "RELEASED",
65     })
66
67 def clsinit(cls):
68     """ Initializes template information (i.e. attributes and traces)
69     for the ResourceManager class
70     """
71     cls._clsinit()
72     return cls
73
74 def clsinit_copy(cls):
75     """ Initializes template information (i.e. attributes and traces)
76     for the ResourceManager class, inheriting attributes and traces
77     from the parent class
78     """
79     cls._clsinit_copy()
80     return cls
81
82 # Decorator to invoke class initialization method
83 @clsinit
84 class ResourceManager(Logger):
85     _rtype = "Resource"
86     _attributes = None
87     _traces = None
88
89     @classmethod
90     def _register_attribute(cls, attr):
91         """ Resource subclasses will invoke this method to add a 
92         resource attribute
93
94         """
95         cls._attributes[attr.name] = attr
96
97     @classmethod
98     def _remove_attribute(cls, name):
99         """ Resource subclasses will invoke this method to remove a 
100         resource attribute
101
102         """
103         del cls._attributes[name]
104
105     @classmethod
106     def _register_trace(cls, trace):
107         """ Resource subclasses will invoke this method to add a 
108         resource trace
109
110         """
111         cls._traces[trace.name] = trace
112
113     @classmethod
114     def _remove_trace(cls, name):
115         """ Resource subclasses will invoke this method to remove a 
116         resource trace
117
118         """
119         del cls._traces[name]
120
121     @classmethod
122     def _register_attributes(cls):
123         """ Resource subclasses will invoke this method to register
124         resource attributes
125
126         """
127         pass
128
129     @classmethod
130     def _register_traces(cls):
131         """ Resource subclasses will invoke this method to register
132         resource traces
133
134         """
135         pass
136
137     @classmethod
138     def _clsinit(cls):
139         """ ResourceManager child classes have different attributes and traces.
140         Since the templates that hold the information of attributes and traces
141         are 'class attribute' dictionaries, initially they all point to the 
142         parent class ResourceManager instances of those dictionaries. 
143         In order to make these templates independent from the parent's one,
144         it is necessary re-initialize the corresponding dictionaries. 
145         This is the objective of the _clsinit method
146         """
147         # static template for resource attributes
148         cls._attributes = dict()
149         cls._register_attributes()
150
151         # static template for resource traces
152         cls._traces = dict()
153         cls._register_traces()
154
155     @classmethod
156     def _clsinit_copy(cls):
157         """ Same as _clsinit, except that it also inherits all attributes and traces
158         from the parent class.
159         """
160         # static template for resource attributes
161         cls._attributes = copy.deepcopy(cls._attributes)
162         cls._register_attributes()
163
164         # static template for resource traces
165         cls._traces = copy.deepcopy(cls._traces)
166         cls._register_traces()
167
168     @classmethod
169     def rtype(cls):
170         """ Returns the type of the Resource Manager
171
172         """
173         return cls._rtype
174
175     @classmethod
176     def get_attributes(cls):
177         """ Returns a copy of the attributes
178
179         """
180         return copy.deepcopy(cls._attributes.values())
181
182     @classmethod
183     def get_traces(cls):
184         """ Returns a copy of the traces
185
186         """
187         return copy.deepcopy(cls._traces.values())
188
189     def __init__(self, ec, guid):
190         super(ResourceManager, self).__init__(self.rtype())
191         
192         self._guid = guid
193         self._ec = weakref.ref(ec)
194         self._connections = set()
195         self._conditions = dict() 
196
197         # the resource instance gets a copy of all attributes
198         self._attrs = copy.deepcopy(self._attributes)
199
200         # the resource instance gets a copy of all traces
201         self._trcs = copy.deepcopy(self._traces)
202
203         self._state = ResourceState.NEW
204
205         self._start_time = None
206         self._stop_time = None
207         self._discover_time = None
208         self._provision_time = None
209         self._ready_time = None
210         self._release_time = None
211         self._finish_time = None
212         self._failed_time = None
213
214     @property
215     def guid(self):
216         """ Returns the global unique identifier of the RM """
217         return self._guid
218
219     @property
220     def ec(self):
221         """ Returns the Experiment Controller """
222         return self._ec()
223
224     @property
225     def connections(self):
226         """ Returns the set of guids of connected RMs"""
227         return self._connections
228
229     @property
230     def conditions(self):
231         """ Returns the conditions to which the RM is subjected to.
232         
233         The object returned by this method is a dictionary indexed by
234         ResourceAction."""
235         return self._conditions
236
237     @property
238     def start_time(self):
239         """ Returns the start time of the RM as a timestamp"""
240         return self._start_time
241
242     @property
243     def stop_time(self):
244         """ Returns the stop time of the RM as a timestamp"""
245         return self._stop_time
246
247     @property
248     def discover_time(self):
249         """ Returns the time discovering was finished for the RM as a timestamp"""
250         return self._discover_time
251
252     @property
253     def provision_time(self):
254         """ Returns the time provisioning was finished for the RM as a timestamp"""
255         return self._provision_time
256
257     @property
258     def ready_time(self):
259         """ Returns the time deployment was finished for the RM as a timestamp"""
260         return self._ready_time
261
262     @property
263     def release_time(self):
264         """ Returns the release time of the RM as a timestamp"""
265         return self._release_time
266
267     @property
268     def finish_time(self):
269         """ Returns the finalization time of the RM as a timestamp"""
270         return self._finish_time
271
272     @property
273     def failed_time(self):
274         """ Returns the time failure occured for the RM as a timestamp"""
275         return self._failed_time
276
277     @property
278     def state(self):
279         """ Get the state of the current RM """
280         return self._state
281
282     def log_message(self, msg):
283         """ Returns the log message formatted with added information.
284
285         :param msg: text message
286         :type msg: str
287         :rtype: str
288         """
289         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
290
291     def connect(self, guid):
292         """ Establishes a connection to the RM identified by guid
293
294         :param guid: Global unique identified of the RM to connect to
295         :type guid: int
296         """
297         if self.valid_connection(guid):
298             self._connections.add(guid)
299
300     def disconnect(self, guid):
301         """ Removes connection to the RM identified by guid
302
303         :param guid: Global unique identified of the RM to connect to
304         :type guid: int
305         """
306         if guid in self._connections:
307             self._connections.remove(guid)
308
309     def discover(self):
310         """ Performs resource discovery.
311
312         This  method is resposible for selecting an individual resource
313         matching user requirements.
314         This method should be redefined when necessary in child classes.
315         """ 
316         self._discover_time = tnow()
317         self._state = ResourceState.DISCOVERED
318
319     def provision(self):
320         """ Performs resource provisioning.
321
322         This  method is resposible for provisioning one resource.
323         After this method has been successfully invoked, the resource
324         should be acccesible/controllable by the RM.
325         This method should be redefined when necessary in child classes.
326         """ 
327         self._provision_time = tnow()
328         self._state = ResourceState.PROVISIONED
329
330     def start(self):
331         """ Starts the resource.
332         
333         There is no generic start behavior for all resources.
334         This method should be redefined when necessary in child classes.
335         """
336         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
337             self.error("Wrong state %s for start" % self.state)
338             return
339
340         self._start_time = tnow()
341         self._state = ResourceState.STARTED
342
343     def stop(self):
344         """ Stops the resource.
345         
346         There is no generic stop behavior for all resources.
347         This method should be redefined when necessary in child classes.
348         """
349         if not self._state in [ResourceState.STARTED]:
350             self.error("Wrong state %s for stop" % self.state)
351             return
352
353         self._stop_time = tnow()
354         self._state = ResourceState.STOPPED
355
356     def set(self, name, value):
357         """ Set the value of the attribute
358
359         :param name: Name of the attribute
360         :type name: str
361         :param name: Value of the attribute
362         :type name: str
363         """
364         attr = self._attrs[name]
365         attr.value = value
366
367     def get(self, name):
368         """ Returns the value of the attribute
369
370         :param name: Name of the attribute
371         :type name: str
372         :rtype: str
373         """
374         attr = self._attrs[name]
375         return attr.value
376
377     def register_trace(self, name):
378         """ Explicitly enable trace generation
379
380         :param name: Name of the trace
381         :type name: str
382         """
383         trace = self._trcs[name]
384         trace.enabled = True
385
386     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
387         """ Get information on collected trace
388
389         :param name: Name of the trace
390         :type name: str
391
392         :param attr: Can be one of:
393                          - TraceAttr.ALL (complete trace content), 
394                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
395                          - TraceAttr.PATH (full path to the trace file),
396                          - TraceAttr.SIZE (size of trace file). 
397         :type attr: str
398
399         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
400         :type name: int
401
402         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
403         :type name: int
404
405         :rtype: str
406         """
407         pass
408
409     def register_condition(self, action, group, state, 
410             time = None):
411         """ Registers a condition on the resource manager to allow execution 
412         of 'action' only after 'time' has elapsed from the moment all resources 
413         in 'group' reached state 'state'
414
415         :param action: Action to restrict to condition (either 'START' or 'STOP')
416         :type action: str
417         :param group: Group of RMs to wait for (list of guids)
418         :type group: int or list of int
419         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
420         :type state: str
421         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
422         :type time: str
423
424         """
425         conditions = self.conditions.get(action)
426         if not conditions:
427             conditions = list()
428             self._conditions[action] = conditions
429
430         # For each condition to register a tuple of (group, state, time) is 
431         # added to the 'action' list
432         if not isinstance(group, list):
433             group = [group]
434
435         conditions.append((group, state, time))
436
437     def get_connected(self, rtype = None):
438         """ Returns the list of RM with the type 'rtype'
439
440         :param rtype: Type of the RM we look for
441         :type rtype: str
442         :return: list of guid
443         """
444         connected = []
445         for guid in self.connections:
446             rm = self.ec.get_resource(guid)
447             if not rtype or rm.rtype() == rtype:
448                 connected.append(rm)
449         return connected
450
451     def _needs_reschedule(self, group, state, time):
452         """ Internal method that verify if 'time' has elapsed since 
453         all elements in 'group' have reached state 'state'.
454
455         :param group: Group of RMs to wait for (list of guids)
456         :type group: int or list of int
457         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
458         :type state: str
459         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
460         :type time: str
461
462         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
463         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
464         For the moment, 2m30s is not a correct syntax.
465
466         """
467         reschedule = False
468         delay = reschedule_delay 
469
470         # check state and time elapsed on all RMs
471         for guid in group:
472             rm = self.ec.get_resource(guid)
473             # If the RM state is lower than the requested state we must
474             # reschedule (e.g. if RM is READY but we required STARTED)
475             if rm.state < state:
476                 reschedule = True
477                 break
478
479             # If there is a time restriction, we must verify the
480             # restriction is satisfied 
481             if time:
482                 if state == ResourceState.DISCOVERED:
483                     t = rm.discover_time
484                 if state == ResourceState.PROVISIONED:
485                     t = rm.provision_time
486                 elif state == ResourceState.READY:
487                     t = rm.ready_time
488                 elif state == ResourceState.STARTED:
489                     t = rm.start_time
490                 elif state == ResourceState.STOPPED:
491                     t = rm.stop_time
492                 else:
493                     # Only keep time information for START and STOP
494                     break
495
496                 # time already elapsed since RM changed state
497                 waited = "%fs" % tdiffsec(tnow(), t)
498
499                 # time still to wait
500                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
501
502                 if wait > 0:
503                     reschedule = True
504                     delay = "%fs" % wait
505                     break
506
507         return reschedule, delay
508
509     def set_with_conditions(self, name, value, group, state, time):
510         """ Set value 'value' on attribute with name 'name' when 'time' 
511         has elapsed since all elements in 'group' have reached state
512         'state'
513
514         :param name: Name of the attribute to set
515         :type name: str
516         :param name: Value of the attribute to set
517         :type name: str
518         :param group: Group of RMs to wait for (list of guids)
519         :type group: int or list of int
520         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
521         :type state: str
522         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
523         :type time: str
524         """
525
526         reschedule = False
527         delay = reschedule_delay 
528
529         ## evaluate if set conditions are met
530
531         # only can set with conditions after the RM is started
532         if self.state != ResourceState.STARTED:
533             reschedule = True
534         else:
535             reschedule, delay = self._needs_reschedule(group, state, time)
536
537         if reschedule:
538             callback = functools.partial(self.set_with_conditions, 
539                     name, value, group, state, time)
540             self.ec.schedule(delay, callback)
541         else:
542             self.set(name, value)
543
544     def start_with_conditions(self):
545         """ Starts RM when all the conditions in self.conditions for
546         action 'START' are satisfied.
547
548         """
549         reschedule = False
550         delay = reschedule_delay 
551
552         ## evaluate if set conditions are met
553
554         # only can start when RM is either STOPPED or READY
555         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
556             reschedule = True
557             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
558         else:
559             start_conditions = self.conditions.get(ResourceAction.START, [])
560             
561             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
562             
563             # Verify all start conditions are met
564             for (group, state, time) in start_conditions:
565                 # Uncomment for debug
566                 #unmet = []
567                 #for guid in group:
568                 #    rm = self.ec.get_resource(guid)
569                 #    unmet.append((guid, rm._state))
570                 #
571                 #self.debug("---- WAITED STATES ---- %s" % unmet )
572
573                 reschedule, delay = self._needs_reschedule(group, state, time)
574                 if reschedule:
575                     break
576
577         if reschedule:
578             self.ec.schedule(delay, self.start_with_conditions)
579         else:
580             self.debug("----- STARTING ---- ")
581             self.start()
582
583     def stop_with_conditions(self):
584         """ Stops RM when all the conditions in self.conditions for
585         action 'STOP' are satisfied.
586
587         """
588         reschedule = False
589         delay = reschedule_delay 
590
591         ## evaluate if set conditions are met
592
593         # only can stop when RM is STARTED
594         if self.state != ResourceState.STARTED:
595             reschedule = True
596         else:
597             self.debug(" ---- STOP CONDITIONS ---- %s" % 
598                     self.conditions.get(ResourceAction.STOP))
599
600             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
601             for (group, state, time) in stop_conditions:
602                 reschedule, delay = self._needs_reschedule(group, state, time)
603                 if reschedule:
604                     break
605
606         if reschedule:
607             callback = functools.partial(self.stop_with_conditions)
608             self.ec.schedule(delay, callback)
609         else:
610             self.debug(" ----- STOPPING ---- ") 
611             self.stop()
612
613     def deploy(self):
614         """ Execute all steps required for the RM to reach the state READY
615
616         """
617         if self._state > ResourceState.READY:
618             self.error("Wrong state %s for deploy" % self.state)
619             return
620
621         self.debug("----- READY ---- ")
622         self._ready_time = tnow()
623         self._state = ResourceState.READY
624
625     def release(self):
626         """Release any resources used by this RM
627
628         """
629         self._release_time = tnow()
630         self._state = ResourceState.RELEASED
631
632     def finish(self):
633         """ Mark ResourceManager as FINISHED
634
635         """
636         self._finish_time = tnow()
637         self._state = ResourceState.FINISHED
638
639     def fail(self):
640         """ Mark ResourceManager as FAILED
641
642         """
643         self._failed_time = tnow()
644         self._state = ResourceState.FAILED
645
646     def valid_connection(self, guid):
647         """Checks whether a connection with the other RM
648         is valid.
649         This method need to be redefined by each new Resource Manager.
650
651         :param guid: Guid of the current Resource Manager
652         :type guid: int
653         :rtype:  Boolean
654
655         """
656         # TODO: Validate!
657         return True
658
659 class ResourceFactory(object):
660     _resource_types = dict()
661
662     @classmethod
663     def resource_types(cls):
664         """Return the type of the Class"""
665         return cls._resource_types
666
667     @classmethod
668     def register_type(cls, rclass):
669         """Register a new Ressource Manager"""
670         cls._resource_types[rclass.rtype()] = rclass
671
672     @classmethod
673     def create(cls, rtype, ec, guid):
674         """Create a new instance of a Ressource Manager"""
675         rclass = cls._resource_types[rtype]
676         return rclass(ec, guid)
677
678 def populate_factory():
679     """Register all the possible RM that exists in the current version of Nepi.
680     """
681     for rclass in find_types():
682         ResourceFactory.register_type(rclass)
683
684 def find_types():
685     """Look into the different folders to find all the 
686     availables Resources Managers
687
688     """
689     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
690     search_path = set(search_path.split(" "))
691    
692     import inspect
693     import nepi.resources 
694     path = os.path.dirname(nepi.resources.__file__)
695     search_path.add(path)
696
697     types = []
698
699     for importer, modname, ispkg in pkgutil.walk_packages(search_path):
700         loader = importer.find_module(modname)
701         try:
702             module = loader.load_module(loader.fullname)
703             for attrname in dir(module):
704                 if attrname.startswith("_"):
705                     continue
706
707                 attr = getattr(module, attrname)
708
709                 if attr == ResourceManager:
710                     continue
711
712                 if not inspect.isclass(attr):
713                     continue
714
715                 if issubclass(attr, ResourceManager):
716                     types.append(attr)
717         except:
718             import traceback
719             import logging
720             err = traceback.format_exc()
721             logger = logging.getLogger("Resource.find_types()")
722             logger.error("Error while lading Resource Managers %s" % err)
723
724     return types
725
726