2d738c5d07d270ad85a847f9f01b3cce5942c20d
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
23
24 import copy
25 import functools
26 import logging
27 import os
28 import pkgutil
29 import weakref
30
31 reschedule_delay = "1s"
32
33 class ResourceAction:
34     """ Action that a user can order to a Resource Manager
35    
36     """
37     DEPLOY = 0
38     START = 1
39     STOP = 2
40
41 class ResourceState:
42     """ State of a Resource Manager
43    
44     """
45     NEW = 0
46     DISCOVERED = 1
47     PROVISIONED = 2
48     READY = 3
49     STARTED = 4
50     STOPPED = 5
51     FINISHED = 6
52     FAILED = 7
53     RELEASED = 8
54
55 ResourceState2str = dict({
56     ResourceState.NEW : "NEW",
57     ResourceState.DISCOVERED : "DISCOVERED",
58     ResourceState.PROVISIONED : "PROVISIONED",
59     ResourceState.READY : "READY",
60     ResourceState.STARTED : "STARTED",
61     ResourceState.STOPPED : "STOPPED",
62     ResourceState.FINISHED : "FINISHED",
63     ResourceState.FAILED : "FAILED",
64     ResourceState.RELEASED : "RELEASED",
65     })
66
67 def clsinit(cls):
68     """ Initializes template information (i.e. attributes and traces)
69     for the ResourceManager class
70     """
71     cls._clsinit()
72     return cls
73
74 def clsinit_copy(cls):
75     """ Initializes template information (i.e. attributes and traces)
76     for the ResourceManager class, inheriting attributes and traces
77     from the parent class
78     """
79     cls._clsinit_copy()
80     return cls
81
82 # Decorator to invoke class initialization method
83 @clsinit
84 class ResourceManager(Logger):
85     _rtype = "Resource"
86     _attributes = None
87     _traces = None
88
89     @classmethod
90     def _register_attribute(cls, attr):
91         """ Resource subclasses will invoke this method to add a 
92         resource attribute
93
94         """
95         cls._attributes[attr.name] = attr
96
97     @classmethod
98     def _remove_attribute(cls, name):
99         """ Resource subclasses will invoke this method to remove a 
100         resource attribute
101
102         """
103         del cls._attributes[name]
104
105     @classmethod
106     def _register_trace(cls, trace):
107         """ Resource subclasses will invoke this method to add a 
108         resource trace
109
110         """
111         cls._traces[trace.name] = trace
112
113     @classmethod
114     def _remove_trace(cls, name):
115         """ Resource subclasses will invoke this method to remove a 
116         resource trace
117
118         """
119         del cls._traces[name]
120
121     @classmethod
122     def _register_attributes(cls):
123         """ Resource subclasses will invoke this method to register
124         resource attributes
125
126         """
127         pass
128
129     @classmethod
130     def _register_traces(cls):
131         """ Resource subclasses will invoke this method to register
132         resource traces
133
134         """
135         pass
136
137     @classmethod
138     def _clsinit(cls):
139         """ ResourceManager child classes have different attributes and traces.
140         Since the templates that hold the information of attributes and traces
141         are 'class attribute' dictionaries, initially they all point to the 
142         parent class ResourceManager instances of those dictionaries. 
143         In order to make these templates independent from the parent's one,
144         it is necessary re-initialize the corresponding dictionaries. 
145         This is the objective of the _clsinit method
146         """
147         # static template for resource attributes
148         cls._attributes = dict()
149         cls._register_attributes()
150
151         # static template for resource traces
152         cls._traces = dict()
153         cls._register_traces()
154
155     @classmethod
156     def _clsinit_copy(cls):
157         """ Same as _clsinit, except that it also inherits all attributes and traces
158         from the parent class.
159         """
160         # static template for resource attributes
161         cls._attributes = copy.deepcopy(cls._attributes)
162         cls._register_attributes()
163
164         # static template for resource traces
165         cls._traces = copy.deepcopy(cls._traces)
166         cls._register_traces()
167
168     @classmethod
169     def rtype(cls):
170         """ Returns the type of the Resource Manager
171
172         """
173         return cls._rtype
174
175     @classmethod
176     def get_attributes(cls):
177         """ Returns a copy of the attributes
178
179         """
180         return copy.deepcopy(cls._attributes.values())
181
182     @classmethod
183     def get_traces(cls):
184         """ Returns a copy of the traces
185
186         """
187         return copy.deepcopy(cls._traces.values())
188
189     def __init__(self, ec, guid):
190         super(ResourceManager, self).__init__(self.rtype())
191         
192         self._guid = guid
193         self._ec = weakref.ref(ec)
194         self._connections = set()
195         self._conditions = dict() 
196
197         # the resource instance gets a copy of all attributes
198         self._attrs = copy.deepcopy(self._attributes)
199
200         # the resource instance gets a copy of all traces
201         self._trcs = copy.deepcopy(self._traces)
202
203         # Each resource is placed on a deployment group by the EC
204         # during deployment
205         self.deployment_group = None
206
207         self._start_time = None
208         self._stop_time = None
209         self._discover_time = None
210         self._provision_time = None
211         self._ready_time = None
212         self._release_time = None
213         self._finish_time = None
214         self._failed_time = None
215
216         self._state = ResourceState.NEW
217
218     @property
219     def guid(self):
220         """ Returns the global unique identifier of the RM """
221         return self._guid
222
223     @property
224     def ec(self):
225         """ Returns the Experiment Controller """
226         return self._ec()
227
228     @property
229     def connections(self):
230         """ Returns the set of guids of connected RMs"""
231         return self._connections
232
233     @property
234     def conditions(self):
235         """ Returns the conditions to which the RM is subjected to.
236         
237         The object returned by this method is a dictionary indexed by
238         ResourceAction."""
239         return self._conditions
240
241     @property
242     def start_time(self):
243         """ Returns the start time of the RM as a timestamp"""
244         return self._start_time
245
246     @property
247     def stop_time(self):
248         """ Returns the stop time of the RM as a timestamp"""
249         return self._stop_time
250
251     @property
252     def discover_time(self):
253         """ Returns the time discovering was finished for the RM as a timestamp"""
254         return self._discover_time
255
256     @property
257     def provision_time(self):
258         """ Returns the time provisioning was finished for the RM as a timestamp"""
259         return self._provision_time
260
261     @property
262     def ready_time(self):
263         """ Returns the time deployment was finished for the RM as a timestamp"""
264         return self._ready_time
265
266     @property
267     def release_time(self):
268         """ Returns the release time of the RM as a timestamp"""
269         return self._release_time
270
271     @property
272     def finish_time(self):
273         """ Returns the finalization time of the RM as a timestamp"""
274         return self._finish_time
275
276     @property
277     def failed_time(self):
278         """ Returns the time failure occured for the RM as a timestamp"""
279         return self._failed_time
280
281     @property
282     def state(self):
283         """ Get the state of the current RM """
284         return self._state
285
286     def log_message(self, msg):
287         """ Returns the log message formatted with added information.
288
289         :param msg: text message
290         :type msg: str
291         :rtype: str
292         """
293         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
294
295     def register_connection(self, guid):
296         """ Registers a connection to the RM identified by guid
297
298         :param guid: Global unique identified of the RM to connect to
299         :type guid: int
300         """
301         if self.valid_connection(guid):
302             self.connect(guid)
303             self._connections.add(guid)
304
305     def unregister_connection(self, guid):
306         """ Removes a registered connection to the RM identified by guid
307
308         :param guid: Global unique identified of the RM to connect to
309         :type guid: int
310         """
311         if guid in self._connections:
312             self.disconnect(guid)
313             self._connections.remove(guid)
314
315     def discover(self):
316         """ Performs resource discovery.
317
318         This  method is resposible for selecting an individual resource
319         matching user requirements.
320         This method should be redefined when necessary in child classes.
321         """
322         self.set_discovered()
323
324     def provision(self):
325         """ Performs resource provisioning.
326
327         This  method is resposible for provisioning one resource.
328         After this method has been successfully invoked, the resource
329         should be acccesible/controllable by the RM.
330         This method should be redefined when necessary in child classes.
331         """
332         self.set_provisioned()
333
334     def start(self):
335         """ Starts the resource.
336         
337         There is no generic start behavior for all resources.
338         This method should be redefined when necessary in child classes.
339         """
340         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
341             self.error("Wrong state %s for start" % self.state)
342             return
343
344         self.set_started()
345
346     def stop(self):
347         """ Stops the resource.
348         
349         There is no generic stop behavior for all resources.
350         This method should be redefined when necessary in child classes.
351         """
352         if not self.state in [ResourceState.STARTED]:
353             self.error("Wrong state %s for stop" % self.state)
354             return
355         
356         self.set_stopped()
357
358     def deploy(self):
359         """ Execute all steps required for the RM to reach the state READY
360
361         """
362         if self.state > ResourceState.READY:
363             self.error("Wrong state %s for deploy" % self.state)
364             return
365
366         self.debug("----- READY ---- ")
367         self.set_ready()
368
369     def release(self):
370         self.set_released()
371
372     def finish(self):
373         self.set_finished()
374  
375     def fail(self):
376         self.set_failed()
377
378     def set(self, name, value):
379         """ Set the value of the attribute
380
381         :param name: Name of the attribute
382         :type name: str
383         :param name: Value of the attribute
384         :type name: str
385         """
386         attr = self._attrs[name]
387         attr.value = value
388
389     def get(self, name):
390         """ Returns the value of the attribute
391
392         :param name: Name of the attribute
393         :type name: str
394         :rtype: str
395         """
396         attr = self._attrs[name]
397         return attr.value
398
399     def enable_trace(self, name):
400         """ Explicitly enable trace generation
401
402         :param name: Name of the trace
403         :type name: str
404         """
405         trace = self._trcs[name]
406         trace.enabled = True
407     
408     def trace_enabled(self, name):
409         """Returns True if trace is enables 
410
411         :param name: Name of the trace
412         :type name: str
413         """
414         trace = self._trcs[name]
415         return trace.enabled
416  
417     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
418         """ Get information on collected trace
419
420         :param name: Name of the trace
421         :type name: str
422
423         :param attr: Can be one of:
424                          - TraceAttr.ALL (complete trace content), 
425                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
426                          - TraceAttr.PATH (full path to the trace file),
427                          - TraceAttr.SIZE (size of trace file). 
428         :type attr: str
429
430         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
431         :type name: int
432
433         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
434         :type name: int
435
436         :rtype: str
437         """
438         pass
439
440     def register_condition(self, action, group, state, time = None):
441         """ Registers a condition on the resource manager to allow execution 
442         of 'action' only after 'time' has elapsed from the moment all resources 
443         in 'group' reached state 'state'
444
445         :param action: Action to restrict to condition (either 'START' or 'STOP')
446         :type action: str
447         :param group: Group of RMs to wait for (list of guids)
448         :type group: int or list of int
449         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
450         :type state: str
451         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
452         :type time: str
453
454         """
455
456         if not action in self.conditions:
457             self._conditions[action] = list()
458         
459         conditions = self.conditions.get(action)
460
461         # For each condition to register a tuple of (group, state, time) is 
462         # added to the 'action' list
463         if not isinstance(group, list):
464             group = [group]
465
466         conditions.append((group, state, time))
467
468     def unregister_condition(self, group, action = None):
469         """ Removed conditions for a certain group of guids
470
471         :param action: Action to restrict to condition (either 'START' or 'STOP')
472         :type action: str
473
474         :param group: Group of RMs to wait for (list of guids)
475         :type group: int or list of int
476
477         """
478         # For each condition a tuple of (group, state, time) is 
479         # added to the 'action' list
480         if not isinstance(group, list):
481             group = [group]
482
483         for act, conditions in self.conditions.iteritems():
484             if action and act != action:
485                 continue
486
487             for condition in list(conditions):
488                 (grp, state, time) = condition
489
490                 # If there is an intersection between grp and group,
491                 # then remove intersected elements
492                 intsec = set(group).intersection(set(grp))
493                 if intsec:
494                     idx = conditions.index(condition)
495                     newgrp = set(grp)
496                     newgrp.difference_update(intsec)
497                     conditions[idx] = (newgrp, state, time)
498                  
499     def get_connected(self, rtype = None):
500         """ Returns the list of RM with the type 'rtype'
501
502         :param rtype: Type of the RM we look for
503         :type rtype: str
504         :return: list of guid
505         """
506         connected = []
507         rclass = ResourceFactory.get_resource_type(rtype)
508         for guid in self.connections:
509             rm = self.ec.get_resource(guid)
510             if not rtype or isinstance(rm, rclass):
511                 connected.append(rm)
512         return connected
513
514     def _needs_reschedule(self, group, state, time):
515         """ Internal method that verify if 'time' has elapsed since 
516         all elements in 'group' have reached state 'state'.
517
518         :param group: Group of RMs to wait for (list of guids)
519         :type group: int or list of int
520         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
521         :type state: str
522         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
523         :type time: str
524
525         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
526         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
527         For the moment, 2m30s is not a correct syntax.
528
529         """
530         reschedule = False
531         delay = reschedule_delay 
532
533         # check state and time elapsed on all RMs
534         for guid in group:
535             rm = self.ec.get_resource(guid)
536             # If the RM state is lower than the requested state we must
537             # reschedule (e.g. if RM is READY but we required STARTED).
538             if rm.state < state:
539                 reschedule = True
540                 break
541
542             # If there is a time restriction, we must verify the
543             # restriction is satisfied 
544             if time:
545                 if state == ResourceState.DISCOVERED:
546                     t = rm.discover_time
547                 if state == ResourceState.PROVISIONED:
548                     t = rm.provision_time
549                 elif state == ResourceState.READY:
550                     t = rm.ready_time
551                 elif state == ResourceState.STARTED:
552                     t = rm.start_time
553                 elif state == ResourceState.STOPPED:
554                     t = rm.stop_time
555                 else:
556                     # Only keep time information for START and STOP
557                     break
558
559                 # time already elapsed since RM changed state
560                 waited = "%fs" % tdiffsec(tnow(), t)
561
562                 # time still to wait
563                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
564
565                 if wait > 0.001:
566                     reschedule = True
567                     delay = "%fs" % wait
568                     break
569
570         return reschedule, delay
571
572     def set_with_conditions(self, name, value, group, state, time):
573         """ Set value 'value' on attribute with name 'name' when 'time' 
574         has elapsed since all elements in 'group' have reached state
575         'state'
576
577         :param name: Name of the attribute to set
578         :type name: str
579         :param name: Value of the attribute to set
580         :type name: str
581         :param group: Group of RMs to wait for (list of guids)
582         :type group: int or list of int
583         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
584         :type state: str
585         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
586         :type time: str
587         """
588
589         reschedule = False
590         delay = reschedule_delay 
591
592         ## evaluate if set conditions are met
593
594         # only can set with conditions after the RM is started
595         if self.state != ResourceState.STARTED:
596             reschedule = True
597         else:
598             reschedule, delay = self._needs_reschedule(group, state, time)
599
600         if reschedule:
601             callback = functools.partial(self.set_with_conditions, 
602                     name, value, group, state, time)
603             self.ec.schedule(delay, callback)
604         else:
605             self.set(name, value)
606
607     def start_with_conditions(self):
608         """ Starts RM when all the conditions in self.conditions for
609         action 'START' are satisfied.
610
611         """
612         reschedule = False
613         delay = reschedule_delay 
614
615         ## evaluate if set conditions are met
616
617         # only can start when RM is either STOPPED or READY
618         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
619             reschedule = True
620             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
621         else:
622             start_conditions = self.conditions.get(ResourceAction.START, [])
623             
624             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
625             
626             # Verify all start conditions are met
627             for (group, state, time) in start_conditions:
628                 # Uncomment for debug
629                 #unmet = []
630                 #for guid in group:
631                 #    rm = self.ec.get_resource(guid)
632                 #    unmet.append((guid, rm._state))
633                 #
634                 #self.debug("---- WAITED STATES ---- %s" % unmet )
635
636                 reschedule, delay = self._needs_reschedule(group, state, time)
637                 if reschedule:
638                     break
639
640         if reschedule:
641             self.ec.schedule(delay, self.start_with_conditions)
642         else:
643             self.debug("----- STARTING ---- ")
644             self.start()
645
646     def stop_with_conditions(self):
647         """ Stops RM when all the conditions in self.conditions for
648         action 'STOP' are satisfied.
649
650         """
651         reschedule = False
652         delay = reschedule_delay 
653
654         ## evaluate if set conditions are met
655
656         # only can stop when RM is STARTED
657         if self.state != ResourceState.STARTED:
658             reschedule = True
659         else:
660             self.debug(" ---- STOP CONDITIONS ---- %s" % 
661                     self.conditions.get(ResourceAction.STOP))
662
663             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
664             for (group, state, time) in stop_conditions:
665                 reschedule, delay = self._needs_reschedule(group, state, time)
666                 if reschedule:
667                     break
668
669         if reschedule:
670             callback = functools.partial(self.stop_with_conditions)
671             self.ec.schedule(delay, callback)
672         else:
673             self.debug(" ----- STOPPING ---- ") 
674             self.stop()
675
676     def connect(self, guid):
677         """ Performs actions that need to be taken upon associating RMs.
678         This method should be redefined when necessary in child classes.
679         """
680         pass
681
682     def disconnect(self, guid):
683         """ Performs actions that need to be taken upon disassociating RMs.
684         This method should be redefined when necessary in child classes.
685         """
686         pass
687
688     def valid_connection(self, guid):
689         """Checks whether a connection with the other RM
690         is valid.
691         This method need to be redefined by each new Resource Manager.
692
693         :param guid: Guid of the current Resource Manager
694         :type guid: int
695         :rtype:  Boolean
696
697         """
698         # TODO: Validate!
699         return True
700     
701     def set_started(self):
702         """ Mark ResourceManager as STARTED """
703         self._start_time = tnow()
704         self._state = ResourceState.STARTED
705         
706     def set_stopped(self):
707         """ Mark ResourceManager as STOPPED """
708         self._stop_time = tnow()
709         self._state = ResourceState.STOPPED
710
711     def set_ready(self):
712         """ Mark ResourceManager as READY """
713         self._ready_time = tnow()
714         self._state = ResourceState.READY
715
716     def set_released(self):
717         """ Mark ResourceManager as REALEASED """
718         self._release_time = tnow()
719         self._state = ResourceState.RELEASED
720
721     def set_finished(self):
722         """ Mark ResourceManager as FINISHED """
723         self._finish_time = tnow()
724         self._state = ResourceState.FINISHED
725
726     def set_failed(self):
727         """ Mark ResourceManager as FAILED """
728         self._failed_time = tnow()
729         self._state = ResourceState.FAILED
730
731     def set_discovered(self):
732         """ Mark ResourceManager as DISCOVERED """
733         self._discover_time = tnow()
734         self._state = ResourceState.DISCOVERED
735
736     def set_provisioned(self):
737         """ Mark ResourceManager as PROVISIONED """
738         self._provision_time = tnow()
739         self._state = ResourceState.PROVISIONED
740
741 class ResourceFactory(object):
742     _resource_types = dict()
743
744     @classmethod
745     def resource_types(cls):
746         """Return the type of the Class"""
747         return cls._resource_types
748
749     @classmethod
750     def get_resource_type(cls, rtype):
751         """Return the type of the Class"""
752         return cls._resource_types.get(rtype)
753
754     @classmethod
755     def register_type(cls, rclass):
756         """Register a new Ressource Manager"""
757         cls._resource_types[rclass.rtype()] = rclass
758
759     @classmethod
760     def create(cls, rtype, ec, guid):
761         """Create a new instance of a Ressource Manager"""
762         rclass = cls._resource_types[rtype]
763         return rclass(ec, guid)
764
765 def populate_factory():
766     """Register all the possible RM that exists in the current version of Nepi.
767     """
768     # Once the factory is populated, don't repopulate
769     if not ResourceFactory.resource_types():
770         for rclass in find_types():
771             ResourceFactory.register_type(rclass)
772
773 def find_types():
774     """Look into the different folders to find all the 
775     availables Resources Managers
776     """
777     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
778     search_path = set(search_path.split(" "))
779    
780     import inspect
781     import nepi.resources 
782     path = os.path.dirname(nepi.resources.__file__)
783     search_path.add(path)
784
785     types = []
786
787     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
788             prefix = "nepi.resources."):
789
790         loader = importer.find_module(modname)
791         
792         try:
793             # Notice: Repeated calls to load_module will act as a reload of teh module
794             module = loader.load_module(modname)
795
796             for attrname in dir(module):
797                 if attrname.startswith("_"):
798                     continue
799
800                 attr = getattr(module, attrname)
801
802                 if attr == ResourceManager:
803                     continue
804
805                 if not inspect.isclass(attr):
806                     continue
807
808                 if issubclass(attr, ResourceManager):
809                     types.append(attr)
810         except:
811             import traceback
812             import logging
813             err = traceback.format_exc()
814             logger = logging.getLogger("Resource.find_types()")
815             logger.error("Error while loading Resource Managers %s" % err)
816
817     return types
818
819