978f0eeb39e3d0a90a68b91bcd819c8a4d1e8c83
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
23
24 import copy
25 import functools
26 import logging
27 import os
28 import pkgutil
29 import weakref
30
31 reschedule_delay = "1s"
32
33 class ResourceAction:
34     """ Action that a user can order to a Resource Manager
35    
36     """
37     DEPLOY = 0
38     START = 1
39     STOP = 2
40
41 class ResourceState:
42     """ State of a Resource Manager
43    
44     """
45     NEW = 0
46     DISCOVERED = 1
47     PROVISIONED = 2
48     READY = 3
49     STARTED = 4
50     STOPPED = 5
51     FINISHED = 6
52     FAILED = 7
53     RELEASED = 8
54
55 ResourceState2str = dict({
56     ResourceState.NEW : "NEW",
57     ResourceState.DISCOVERED : "DISCOVERED",
58     ResourceState.PROVISIONED : "PROVISIONED",
59     ResourceState.READY : "READY",
60     ResourceState.STARTED : "STARTED",
61     ResourceState.STOPPED : "STOPPED",
62     ResourceState.FINISHED : "FINISHED",
63     ResourceState.FAILED : "FAILED",
64     ResourceState.RELEASED : "RELEASED",
65     })
66
67 def clsinit(cls):
68     """ Initializes template information (i.e. attributes and traces)
69     for the ResourceManager class
70     """
71     cls._clsinit()
72     return cls
73
74 def clsinit_copy(cls):
75     """ Initializes template information (i.e. attributes and traces)
76     for the ResourceManager class, inheriting attributes and traces
77     from the parent class
78     """
79     cls._clsinit_copy()
80     return cls
81
82 # Decorator to invoke class initialization method
83 @clsinit
84 class ResourceManager(Logger):
85     _rtype = "Resource"
86     _attributes = None
87     _traces = None
88
89     @classmethod
90     def _register_attribute(cls, attr):
91         """ Resource subclasses will invoke this method to add a 
92         resource attribute
93
94         """
95         cls._attributes[attr.name] = attr
96
97     @classmethod
98     def _remove_attribute(cls, name):
99         """ Resource subclasses will invoke this method to remove a 
100         resource attribute
101
102         """
103         del cls._attributes[name]
104
105     @classmethod
106     def _register_trace(cls, trace):
107         """ Resource subclasses will invoke this method to add a 
108         resource trace
109
110         """
111         cls._traces[trace.name] = trace
112
113     @classmethod
114     def _remove_trace(cls, name):
115         """ Resource subclasses will invoke this method to remove a 
116         resource trace
117
118         """
119         del cls._traces[name]
120
121     @classmethod
122     def _register_attributes(cls):
123         """ Resource subclasses will invoke this method to register
124         resource attributes
125
126         """
127         pass
128
129     @classmethod
130     def _register_traces(cls):
131         """ Resource subclasses will invoke this method to register
132         resource traces
133
134         """
135         pass
136
137     @classmethod
138     def _clsinit(cls):
139         """ ResourceManager child classes have different attributes and traces.
140         Since the templates that hold the information of attributes and traces
141         are 'class attribute' dictionaries, initially they all point to the 
142         parent class ResourceManager instances of those dictionaries. 
143         In order to make these templates independent from the parent's one,
144         it is necessary re-initialize the corresponding dictionaries. 
145         This is the objective of the _clsinit method
146         """
147         # static template for resource attributes
148         cls._attributes = dict()
149         cls._register_attributes()
150
151         # static template for resource traces
152         cls._traces = dict()
153         cls._register_traces()
154
155     @classmethod
156     def _clsinit_copy(cls):
157         """ Same as _clsinit, except that it also inherits all attributes and traces
158         from the parent class.
159         """
160         # static template for resource attributes
161         cls._attributes = copy.deepcopy(cls._attributes)
162         cls._register_attributes()
163
164         # static template for resource traces
165         cls._traces = copy.deepcopy(cls._traces)
166         cls._register_traces()
167
168     @classmethod
169     def rtype(cls):
170         """ Returns the type of the Resource Manager
171
172         """
173         return cls._rtype
174
175     @classmethod
176     def get_attributes(cls):
177         """ Returns a copy of the attributes
178
179         """
180         return copy.deepcopy(cls._attributes.values())
181
182     @classmethod
183     def get_traces(cls):
184         """ Returns a copy of the traces
185
186         """
187         return copy.deepcopy(cls._traces.values())
188
189     def __init__(self, ec, guid):
190         super(ResourceManager, self).__init__(self.rtype())
191         
192         self._guid = guid
193         self._ec = weakref.ref(ec)
194         self._connections = set()
195         self._conditions = dict() 
196
197         # the resource instance gets a copy of all attributes
198         self._attrs = copy.deepcopy(self._attributes)
199
200         # the resource instance gets a copy of all traces
201         self._trcs = copy.deepcopy(self._traces)
202
203         self._state = ResourceState.NEW
204
205         self.deployment_group = None
206
207         self._start_time = None
208         self._stop_time = None
209         self._discover_time = None
210         self._provision_time = None
211         self._ready_time = None
212         self._release_time = None
213         self._finish_time = None
214         self._failed_time = None
215
216     @property
217     def guid(self):
218         """ Returns the global unique identifier of the RM """
219         return self._guid
220
221     @property
222     def ec(self):
223         """ Returns the Experiment Controller """
224         return self._ec()
225
226     @property
227     def connections(self):
228         """ Returns the set of guids of connected RMs"""
229         return self._connections
230
231     @property
232     def conditions(self):
233         """ Returns the conditions to which the RM is subjected to.
234         
235         The object returned by this method is a dictionary indexed by
236         ResourceAction."""
237         return self._conditions
238
239     @property
240     def start_time(self):
241         """ Returns the start time of the RM as a timestamp"""
242         return self._start_time
243
244     @property
245     def stop_time(self):
246         """ Returns the stop time of the RM as a timestamp"""
247         return self._stop_time
248
249     @property
250     def discover_time(self):
251         """ Returns the time discovering was finished for the RM as a timestamp"""
252         return self._discover_time
253
254     @property
255     def provision_time(self):
256         """ Returns the time provisioning was finished for the RM as a timestamp"""
257         return self._provision_time
258
259     @property
260     def ready_time(self):
261         """ Returns the time deployment was finished for the RM as a timestamp"""
262         return self._ready_time
263
264     @property
265     def release_time(self):
266         """ Returns the release time of the RM as a timestamp"""
267         return self._release_time
268
269     @property
270     def finish_time(self):
271         """ Returns the finalization time of the RM as a timestamp"""
272         return self._finish_time
273
274     @property
275     def failed_time(self):
276         """ Returns the time failure occured for the RM as a timestamp"""
277         return self._failed_time
278
279     @property
280     def state(self):
281         """ Get the state of the current RM """
282         return self._state
283
284     def log_message(self, msg):
285         """ Returns the log message formatted with added information.
286
287         :param msg: text message
288         :type msg: str
289         :rtype: str
290         """
291         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
292
293     def register_connection(self, guid):
294         """ Registers a connection to the RM identified by guid
295
296         :param guid: Global unique identified of the RM to connect to
297         :type guid: int
298         """
299         if self.valid_connection(guid):
300             self.connect(guid)
301             self._connections.add(guid)
302
303     def unregister_connection(self, guid):
304         """ Removes a registered connection to the RM identified by guid
305
306         :param guid: Global unique identified of the RM to connect to
307         :type guid: int
308         """
309         if guid in self._connections:
310             self.disconnect(guid)
311             self._connections.remove(guid)
312
313     def discover(self):
314         """ Performs resource discovery.
315
316         This  method is resposible for selecting an individual resource
317         matching user requirements.
318         This method should be redefined when necessary in child classes.
319         """ 
320         self._discover_time = tnow()
321         self._state = ResourceState.DISCOVERED
322
323     def provision(self):
324         """ Performs resource provisioning.
325
326         This  method is resposible for provisioning one resource.
327         After this method has been successfully invoked, the resource
328         should be acccesible/controllable by the RM.
329         This method should be redefined when necessary in child classes.
330         """ 
331         self._provision_time = tnow()
332         self._state = ResourceState.PROVISIONED
333
334     def start(self):
335         """ Starts the resource.
336         
337         There is no generic start behavior for all resources.
338         This method should be redefined when necessary in child classes.
339         """
340         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
341             self.error("Wrong state %s for start" % self.state)
342             return
343
344         self._start_time = tnow()
345         self._state = ResourceState.STARTED
346
347     def stop(self):
348         """ Stops the resource.
349         
350         There is no generic stop behavior for all resources.
351         This method should be redefined when necessary in child classes.
352         """
353         if not self._state in [ResourceState.STARTED]:
354             self.error("Wrong state %s for stop" % self.state)
355             return
356
357         self._stop_time = tnow()
358         self._state = ResourceState.STOPPED
359
360     def set(self, name, value):
361         """ Set the value of the attribute
362
363         :param name: Name of the attribute
364         :type name: str
365         :param name: Value of the attribute
366         :type name: str
367         """
368         attr = self._attrs[name]
369         attr.value = value
370
371     def get(self, name):
372         """ Returns the value of the attribute
373
374         :param name: Name of the attribute
375         :type name: str
376         :rtype: str
377         """
378         attr = self._attrs[name]
379         return attr.value
380
381     def enable_trace(self, name):
382         """ Explicitly enable trace generation
383
384         :param name: Name of the trace
385         :type name: str
386         """
387         trace = self._trcs[name]
388         trace.enabled = True
389     
390     def trace_enabled(self, name):
391         """Returns True if trace is enables 
392
393         :param name: Name of the trace
394         :type name: str
395         """
396         trace = self._trcs[name]
397         return trace.enabled
398  
399     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
400         """ Get information on collected trace
401
402         :param name: Name of the trace
403         :type name: str
404
405         :param attr: Can be one of:
406                          - TraceAttr.ALL (complete trace content), 
407                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
408                          - TraceAttr.PATH (full path to the trace file),
409                          - TraceAttr.SIZE (size of trace file). 
410         :type attr: str
411
412         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
413         :type name: int
414
415         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
416         :type name: int
417
418         :rtype: str
419         """
420         pass
421
422     def register_condition(self, action, group, state, time = None):
423         """ Registers a condition on the resource manager to allow execution 
424         of 'action' only after 'time' has elapsed from the moment all resources 
425         in 'group' reached state 'state'
426
427         :param action: Action to restrict to condition (either 'START' or 'STOP')
428         :type action: str
429         :param group: Group of RMs to wait for (list of guids)
430         :type group: int or list of int
431         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
432         :type state: str
433         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
434         :type time: str
435
436         """
437
438         if not action in self.conditions:
439             self._conditions[action] = list()
440         
441         conditions = self.conditions.get(action)
442
443         # For each condition to register a tuple of (group, state, time) is 
444         # added to the 'action' list
445         if not isinstance(group, list):
446             group = [group]
447
448         conditions.append((group, state, time))
449
450     def unregister_condition(self, group, action = None):
451         """ Removed conditions for a certain group of guids
452
453         :param action: Action to restrict to condition (either 'START' or 'STOP')
454         :type action: str
455
456         :param group: Group of RMs to wait for (list of guids)
457         :type group: int or list of int
458
459         """
460         # For each condition a tuple of (group, state, time) is 
461         # added to the 'action' list
462         if not isinstance(group, list):
463             group = [group]
464
465         for act, conditions in self.conditions.iteritems():
466             if action and act != action:
467                 continue
468
469             for condition in list(conditions):
470                 (grp, state, time) = condition
471
472                 # If there is an intersection between grp and group,
473                 # then remove intersected elements
474                 intsec = set(group).intersection(set(grp))
475                 if intsec:
476                     idx = conditions.index(condition)
477                     newgrp = set(grp)
478                     newgrp.difference_update(intsec)
479                     conditions[idx] = (newgrp, state, time)
480                  
481     def get_connected(self, rtype = None):
482         """ Returns the list of RM with the type 'rtype'
483
484         :param rtype: Type of the RM we look for
485         :type rtype: str
486         :return: list of guid
487         """
488         connected = []
489         rclass = ResourceFactory.get_resource_type(rtype)
490         for guid in self.connections:
491             rm = self.ec.get_resource(guid)
492             if not rtype or isinstance(rm, rclass):
493                 connected.append(rm)
494         return connected
495
496     def _needs_reschedule(self, group, state, time):
497         """ Internal method that verify if 'time' has elapsed since 
498         all elements in 'group' have reached state 'state'.
499
500         :param group: Group of RMs to wait for (list of guids)
501         :type group: int or list of int
502         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
503         :type state: str
504         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
505         :type time: str
506
507         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
508         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
509         For the moment, 2m30s is not a correct syntax.
510
511         """
512         reschedule = False
513         delay = reschedule_delay 
514
515         # check state and time elapsed on all RMs
516         for guid in group:
517             rm = self.ec.get_resource(guid)
518             # If the RM state is lower than the requested state we must
519             # reschedule (e.g. if RM is READY but we required STARTED).
520             if rm.state < state:
521                 reschedule = True
522                 break
523
524             # If there is a time restriction, we must verify the
525             # restriction is satisfied 
526             if time:
527                 if state == ResourceState.DISCOVERED:
528                     t = rm.discover_time
529                 if state == ResourceState.PROVISIONED:
530                     t = rm.provision_time
531                 elif state == ResourceState.READY:
532                     t = rm.ready_time
533                 elif state == ResourceState.STARTED:
534                     t = rm.start_time
535                 elif state == ResourceState.STOPPED:
536                     t = rm.stop_time
537                 else:
538                     # Only keep time information for START and STOP
539                     break
540
541                 # time already elapsed since RM changed state
542                 waited = "%fs" % tdiffsec(tnow(), t)
543
544                 # time still to wait
545                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
546
547                 if wait > 0.001:
548                     reschedule = True
549                     delay = "%fs" % wait
550                     break
551
552         return reschedule, delay
553
554     def set_with_conditions(self, name, value, group, state, time):
555         """ Set value 'value' on attribute with name 'name' when 'time' 
556         has elapsed since all elements in 'group' have reached state
557         'state'
558
559         :param name: Name of the attribute to set
560         :type name: str
561         :param name: Value of the attribute to set
562         :type name: str
563         :param group: Group of RMs to wait for (list of guids)
564         :type group: int or list of int
565         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
566         :type state: str
567         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
568         :type time: str
569         """
570
571         reschedule = False
572         delay = reschedule_delay 
573
574         ## evaluate if set conditions are met
575
576         # only can set with conditions after the RM is started
577         if self.state != ResourceState.STARTED:
578             reschedule = True
579         else:
580             reschedule, delay = self._needs_reschedule(group, state, time)
581
582         if reschedule:
583             callback = functools.partial(self.set_with_conditions, 
584                     name, value, group, state, time)
585             self.ec.schedule(delay, callback)
586         else:
587             self.set(name, value)
588
589     def start_with_conditions(self):
590         """ Starts RM when all the conditions in self.conditions for
591         action 'START' are satisfied.
592
593         """
594         reschedule = False
595         delay = reschedule_delay 
596
597         ## evaluate if set conditions are met
598
599         # only can start when RM is either STOPPED or READY
600         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
601             reschedule = True
602             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
603         else:
604             start_conditions = self.conditions.get(ResourceAction.START, [])
605             
606             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
607             
608             # Verify all start conditions are met
609             for (group, state, time) in start_conditions:
610                 # Uncomment for debug
611                 #unmet = []
612                 #for guid in group:
613                 #    rm = self.ec.get_resource(guid)
614                 #    unmet.append((guid, rm._state))
615                 #
616                 #self.debug("---- WAITED STATES ---- %s" % unmet )
617
618                 reschedule, delay = self._needs_reschedule(group, state, time)
619                 if reschedule:
620                     break
621
622         if reschedule:
623             self.ec.schedule(delay, self.start_with_conditions)
624         else:
625             self.debug("----- STARTING ---- ")
626             self.start()
627
628     def stop_with_conditions(self):
629         """ Stops RM when all the conditions in self.conditions for
630         action 'STOP' are satisfied.
631
632         """
633         reschedule = False
634         delay = reschedule_delay 
635
636         ## evaluate if set conditions are met
637
638         # only can stop when RM is STARTED
639         if self.state != ResourceState.STARTED:
640             reschedule = True
641         else:
642             self.debug(" ---- STOP CONDITIONS ---- %s" % 
643                     self.conditions.get(ResourceAction.STOP))
644
645             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
646             for (group, state, time) in stop_conditions:
647                 reschedule, delay = self._needs_reschedule(group, state, time)
648                 if reschedule:
649                     break
650
651         if reschedule:
652             callback = functools.partial(self.stop_with_conditions)
653             self.ec.schedule(delay, callback)
654         else:
655             self.debug(" ----- STOPPING ---- ") 
656             self.stop()
657
658     def deploy(self):
659         """ Execute all steps required for the RM to reach the state READY
660
661         """
662         if self._state > ResourceState.READY:
663             self.error("Wrong state %s for deploy" % self.state)
664             return
665
666         self.debug("----- READY ---- ")
667         self._ready_time = tnow()
668         self._state = ResourceState.READY
669
670     def release(self):
671         """Release any resources used by this RM
672
673         """
674         self._release_time = tnow()
675         self._state = ResourceState.RELEASED
676
677     def finish(self):
678         """ Mark ResourceManager as FINISHED
679
680         """
681         self._finish_time = tnow()
682         self._state = ResourceState.FINISHED
683
684     def fail(self):
685         """ Mark ResourceManager as FAILED
686
687         """
688         self._failed_time = tnow()
689         self._state = ResourceState.FAILED
690
691     def connect(self, guid):
692         """ Performs actions that need to be taken upon associating RMs.
693         This method should be redefined when necessary in child classes.
694         """
695         pass
696
697     def disconnect(self, guid):
698         """ Performs actions that need to be taken upon disassociating RMs.
699         This method should be redefined when necessary in child classes.
700         """
701         pass
702
703     def valid_connection(self, guid):
704         """Checks whether a connection with the other RM
705         is valid.
706         This method need to be redefined by each new Resource Manager.
707
708         :param guid: Guid of the current Resource Manager
709         :type guid: int
710         :rtype:  Boolean
711
712         """
713         # TODO: Validate!
714         return True
715
716 class ResourceFactory(object):
717     _resource_types = dict()
718
719     @classmethod
720     def resource_types(cls):
721         """Return the type of the Class"""
722         return cls._resource_types
723
724     @classmethod
725     def get_resource_type(cls, rtype):
726         """Return the type of the Class"""
727         return cls._resource_types.get(rtype)
728
729     @classmethod
730     def register_type(cls, rclass):
731         """Register a new Ressource Manager"""
732         cls._resource_types[rclass.rtype()] = rclass
733
734     @classmethod
735     def create(cls, rtype, ec, guid):
736         """Create a new instance of a Ressource Manager"""
737         rclass = cls._resource_types[rtype]
738         return rclass(ec, guid)
739
740 def populate_factory():
741     """Register all the possible RM that exists in the current version of Nepi.
742     """
743     # Once the factory is populated, don't repopulate
744     if not ResourceFactory.resource_types():
745         for rclass in find_types():
746             ResourceFactory.register_type(rclass)
747
748 def find_types():
749     """Look into the different folders to find all the 
750     availables Resources Managers
751     """
752     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
753     search_path = set(search_path.split(" "))
754    
755     import inspect
756     import nepi.resources 
757     path = os.path.dirname(nepi.resources.__file__)
758     search_path.add(path)
759
760     types = []
761
762     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
763             prefix = "nepi.resources."):
764
765         loader = importer.find_module(modname)
766         
767         try:
768             # Notice: Repeated calls to load_module will act as a reload of teh module
769             module = loader.load_module(modname)
770
771             for attrname in dir(module):
772                 if attrname.startswith("_"):
773                     continue
774
775                 attr = getattr(module, attrname)
776
777                 if attr == ResourceManager:
778                     continue
779
780                 if not inspect.isclass(attr):
781                     continue
782
783                 if issubclass(attr, ResourceManager):
784                     types.append(attr)
785         except:
786             import traceback
787             import logging
788             err = traceback.format_exc()
789             logger = logging.getLogger("Resource.find_types()")
790             logger.error("Error while loading Resource Managers %s" % err)
791
792     return types
793
794