popolate_factory no longer requires to be invoked explicitly by the user
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
23
24 import copy
25 import functools
26 import logging
27 import os
28 import pkgutil
29 import weakref
30
31 reschedule_delay = "1s"
32
33 class ResourceAction:
34     """ Action that a user can order to a Resource Manager
35    
36     """
37     DEPLOY = 0
38     START = 1
39     STOP = 2
40
41 class ResourceState:
42     """ State of a Resource Manager
43    
44     """
45     NEW = 0
46     DISCOVERED = 1
47     PROVISIONED = 2
48     READY = 3
49     STARTED = 4
50     STOPPED = 5
51     FINISHED = 6
52     FAILED = 7
53     RELEASED = 8
54
55 ResourceState2str = dict({
56     ResourceState.NEW : "NEW",
57     ResourceState.DISCOVERED : "DISCOVERED",
58     ResourceState.PROVISIONED : "PROVISIONED",
59     ResourceState.READY : "READY",
60     ResourceState.STARTED : "STARTED",
61     ResourceState.STOPPED : "STOPPED",
62     ResourceState.FINISHED : "FINISHED",
63     ResourceState.FAILED : "FAILED",
64     ResourceState.RELEASED : "RELEASED",
65     })
66
67 def clsinit(cls):
68     """ Initializes template information (i.e. attributes and traces)
69     for the ResourceManager class
70     """
71     cls._clsinit()
72     return cls
73
74 def clsinit_copy(cls):
75     """ Initializes template information (i.e. attributes and traces)
76     for the ResourceManager class, inheriting attributes and traces
77     from the parent class
78     """
79     cls._clsinit_copy()
80     return cls
81
82 # Decorator to invoke class initialization method
83 @clsinit
84 class ResourceManager(Logger):
85     _rtype = "Resource"
86     _attributes = None
87     _traces = None
88
89     @classmethod
90     def _register_attribute(cls, attr):
91         """ Resource subclasses will invoke this method to add a 
92         resource attribute
93
94         """
95         cls._attributes[attr.name] = attr
96
97     @classmethod
98     def _remove_attribute(cls, name):
99         """ Resource subclasses will invoke this method to remove a 
100         resource attribute
101
102         """
103         del cls._attributes[name]
104
105     @classmethod
106     def _register_trace(cls, trace):
107         """ Resource subclasses will invoke this method to add a 
108         resource trace
109
110         """
111         cls._traces[trace.name] = trace
112
113     @classmethod
114     def _remove_trace(cls, name):
115         """ Resource subclasses will invoke this method to remove a 
116         resource trace
117
118         """
119         del cls._traces[name]
120
121     @classmethod
122     def _register_attributes(cls):
123         """ Resource subclasses will invoke this method to register
124         resource attributes
125
126         """
127         pass
128
129     @classmethod
130     def _register_traces(cls):
131         """ Resource subclasses will invoke this method to register
132         resource traces
133
134         """
135         pass
136
137     @classmethod
138     def _clsinit(cls):
139         """ ResourceManager child classes have different attributes and traces.
140         Since the templates that hold the information of attributes and traces
141         are 'class attribute' dictionaries, initially they all point to the 
142         parent class ResourceManager instances of those dictionaries. 
143         In order to make these templates independent from the parent's one,
144         it is necessary re-initialize the corresponding dictionaries. 
145         This is the objective of the _clsinit method
146         """
147         # static template for resource attributes
148         cls._attributes = dict()
149         cls._register_attributes()
150
151         # static template for resource traces
152         cls._traces = dict()
153         cls._register_traces()
154
155     @classmethod
156     def _clsinit_copy(cls):
157         """ Same as _clsinit, except that it also inherits all attributes and traces
158         from the parent class.
159         """
160         # static template for resource attributes
161         cls._attributes = copy.deepcopy(cls._attributes)
162         cls._register_attributes()
163
164         # static template for resource traces
165         cls._traces = copy.deepcopy(cls._traces)
166         cls._register_traces()
167
168     @classmethod
169     def rtype(cls):
170         """ Returns the type of the Resource Manager
171
172         """
173         return cls._rtype
174
175     @classmethod
176     def get_attributes(cls):
177         """ Returns a copy of the attributes
178
179         """
180         return copy.deepcopy(cls._attributes.values())
181
182     @classmethod
183     def get_traces(cls):
184         """ Returns a copy of the traces
185
186         """
187         return copy.deepcopy(cls._traces.values())
188
189     def __init__(self, ec, guid):
190         super(ResourceManager, self).__init__(self.rtype())
191         
192         self._guid = guid
193         self._ec = weakref.ref(ec)
194         self._connections = set()
195         self._conditions = dict() 
196
197         # the resource instance gets a copy of all attributes
198         self._attrs = copy.deepcopy(self._attributes)
199
200         # the resource instance gets a copy of all traces
201         self._trcs = copy.deepcopy(self._traces)
202
203         self._state = ResourceState.NEW
204
205         self._start_time = None
206         self._stop_time = None
207         self._discover_time = None
208         self._provision_time = None
209         self._ready_time = None
210         self._release_time = None
211         self._finish_time = None
212         self._failed_time = None
213
214     @property
215     def guid(self):
216         """ Returns the global unique identifier of the RM """
217         return self._guid
218
219     @property
220     def ec(self):
221         """ Returns the Experiment Controller """
222         return self._ec()
223
224     @property
225     def connections(self):
226         """ Returns the set of guids of connected RMs"""
227         return self._connections
228
229     @property
230     def conditions(self):
231         """ Returns the conditions to which the RM is subjected to.
232         
233         The object returned by this method is a dictionary indexed by
234         ResourceAction."""
235         return self._conditions
236
237     @property
238     def start_time(self):
239         """ Returns the start time of the RM as a timestamp"""
240         return self._start_time
241
242     @property
243     def stop_time(self):
244         """ Returns the stop time of the RM as a timestamp"""
245         return self._stop_time
246
247     @property
248     def discover_time(self):
249         """ Returns the time discovering was finished for the RM as a timestamp"""
250         return self._discover_time
251
252     @property
253     def provision_time(self):
254         """ Returns the time provisioning was finished for the RM as a timestamp"""
255         return self._provision_time
256
257     @property
258     def ready_time(self):
259         """ Returns the time deployment was finished for the RM as a timestamp"""
260         return self._ready_time
261
262     @property
263     def release_time(self):
264         """ Returns the release time of the RM as a timestamp"""
265         return self._release_time
266
267     @property
268     def finish_time(self):
269         """ Returns the finalization time of the RM as a timestamp"""
270         return self._finish_time
271
272     @property
273     def failed_time(self):
274         """ Returns the time failure occured for the RM as a timestamp"""
275         return self._failed_time
276
277     @property
278     def state(self):
279         """ Get the state of the current RM """
280         return self._state
281
282     def log_message(self, msg):
283         """ Returns the log message formatted with added information.
284
285         :param msg: text message
286         :type msg: str
287         :rtype: str
288         """
289         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
290
291     def register_connection(self, guid):
292         """ Registers a connection to the RM identified by guid
293
294         :param guid: Global unique identified of the RM to connect to
295         :type guid: int
296         """
297         if self.valid_connection(guid):
298             self.connect(guid)
299             self._connections.add(guid)
300
301     def unregister_connection(self, guid):
302         """ Removes a registered connection to the RM identified by guid
303
304         :param guid: Global unique identified of the RM to connect to
305         :type guid: int
306         """
307         if guid in self._connections:
308             self.disconnect(guid)
309             self._connections.remove(guid)
310
311     def discover(self):
312         """ Performs resource discovery.
313
314         This  method is resposible for selecting an individual resource
315         matching user requirements.
316         This method should be redefined when necessary in child classes.
317         """ 
318         self._discover_time = tnow()
319         self._state = ResourceState.DISCOVERED
320
321     def provision(self):
322         """ Performs resource provisioning.
323
324         This  method is resposible for provisioning one resource.
325         After this method has been successfully invoked, the resource
326         should be acccesible/controllable by the RM.
327         This method should be redefined when necessary in child classes.
328         """ 
329         self._provision_time = tnow()
330         self._state = ResourceState.PROVISIONED
331
332     def start(self):
333         """ Starts the resource.
334         
335         There is no generic start behavior for all resources.
336         This method should be redefined when necessary in child classes.
337         """
338         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
339             self.error("Wrong state %s for start" % self.state)
340             return
341
342         self._start_time = tnow()
343         self._state = ResourceState.STARTED
344
345     def stop(self):
346         """ Stops the resource.
347         
348         There is no generic stop behavior for all resources.
349         This method should be redefined when necessary in child classes.
350         """
351         if not self._state in [ResourceState.STARTED]:
352             self.error("Wrong state %s for stop" % self.state)
353             return
354
355         self._stop_time = tnow()
356         self._state = ResourceState.STOPPED
357
358     def set(self, name, value):
359         """ Set the value of the attribute
360
361         :param name: Name of the attribute
362         :type name: str
363         :param name: Value of the attribute
364         :type name: str
365         """
366         attr = self._attrs[name]
367         attr.value = value
368
369     def get(self, name):
370         """ Returns the value of the attribute
371
372         :param name: Name of the attribute
373         :type name: str
374         :rtype: str
375         """
376         attr = self._attrs[name]
377         return attr.value
378
379     def enable_trace(self, name):
380         """ Explicitly enable trace generation
381
382         :param name: Name of the trace
383         :type name: str
384         """
385         trace = self._trcs[name]
386         trace.enabled = True
387     
388     def trace_enabled(self, name):
389         """Returns True if trace is enables 
390
391         :param name: Name of the trace
392         :type name: str
393         """
394         trace = self._trcs[name]
395         return trace.enabled
396  
397     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
398         """ Get information on collected trace
399
400         :param name: Name of the trace
401         :type name: str
402
403         :param attr: Can be one of:
404                          - TraceAttr.ALL (complete trace content), 
405                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
406                          - TraceAttr.PATH (full path to the trace file),
407                          - TraceAttr.SIZE (size of trace file). 
408         :type attr: str
409
410         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
411         :type name: int
412
413         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
414         :type name: int
415
416         :rtype: str
417         """
418         pass
419
420     def register_condition(self, action, group, state, time = None):
421         """ Registers a condition on the resource manager to allow execution 
422         of 'action' only after 'time' has elapsed from the moment all resources 
423         in 'group' reached state 'state'
424
425         :param action: Action to restrict to condition (either 'START' or 'STOP')
426         :type action: str
427         :param group: Group of RMs to wait for (list of guids)
428         :type group: int or list of int
429         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
430         :type state: str
431         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
432         :type time: str
433
434         """
435
436         if not action in self.conditions:
437             self._conditions[action] = list()
438         
439         conditions = self.conditions.get(action)
440
441         # For each condition to register a tuple of (group, state, time) is 
442         # added to the 'action' list
443         if not isinstance(group, list):
444             group = [group]
445
446         conditions.append((group, state, time))
447
448     def unregister_condition(self, group, action = None):
449         """ Removed conditions for a certain group of guids
450
451         :param action: Action to restrict to condition (either 'START' or 'STOP')
452         :type action: str
453
454         :param group: Group of RMs to wait for (list of guids)
455         :type group: int or list of int
456
457         """
458         # For each condition a tuple of (group, state, time) is 
459         # added to the 'action' list
460         if not isinstance(group, list):
461             group = [group]
462
463         for act, conditions in self.conditions.iteritems():
464             if action and act != action:
465                 continue
466
467             for condition in list(conditions):
468                 (grp, state, time) = condition
469
470                 # If there is an intersection between grp and group,
471                 # then remove intersected elements
472                 intsec = set(group).intersection(set(grp))
473                 if intsec:
474                     idx = conditions.index(condition)
475                     newgrp = set(grp)
476                     newgrp.difference_update(intsec)
477                     conditions[idx] = (newgrp, state, time)
478                  
479     def get_connected(self, rtype = None):
480         """ Returns the list of RM with the type 'rtype'
481
482         :param rtype: Type of the RM we look for
483         :type rtype: str
484         :return: list of guid
485         """
486         connected = []
487         rclass = ResourceFactory.get_resource_type(rtype)
488         for guid in self.connections:
489             rm = self.ec.get_resource(guid)
490             if not rtype or isinstance(rm, rclass):
491                 connected.append(rm)
492         return connected
493
494     def _needs_reschedule(self, group, state, time):
495         """ Internal method that verify if 'time' has elapsed since 
496         all elements in 'group' have reached state 'state'.
497
498         :param group: Group of RMs to wait for (list of guids)
499         :type group: int or list of int
500         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
501         :type state: str
502         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
503         :type time: str
504
505         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
506         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
507         For the moment, 2m30s is not a correct syntax.
508
509         """
510         reschedule = False
511         delay = reschedule_delay 
512
513         # check state and time elapsed on all RMs
514         for guid in group:
515             rm = self.ec.get_resource(guid)
516             # If the RM state is lower than the requested state we must
517             # reschedule (e.g. if RM is READY but we required STARTED).
518             if rm.state < state:
519                 reschedule = True
520                 break
521
522             # If there is a time restriction, we must verify the
523             # restriction is satisfied 
524             if time:
525                 if state == ResourceState.DISCOVERED:
526                     t = rm.discover_time
527                 if state == ResourceState.PROVISIONED:
528                     t = rm.provision_time
529                 elif state == ResourceState.READY:
530                     t = rm.ready_time
531                 elif state == ResourceState.STARTED:
532                     t = rm.start_time
533                 elif state == ResourceState.STOPPED:
534                     t = rm.stop_time
535                 else:
536                     # Only keep time information for START and STOP
537                     break
538
539                 # time already elapsed since RM changed state
540                 waited = "%fs" % tdiffsec(tnow(), t)
541
542                 # time still to wait
543                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
544
545                 if wait > 0.001:
546                     reschedule = True
547                     delay = "%fs" % wait
548                     break
549
550         return reschedule, delay
551
552     def set_with_conditions(self, name, value, group, state, time):
553         """ Set value 'value' on attribute with name 'name' when 'time' 
554         has elapsed since all elements in 'group' have reached state
555         'state'
556
557         :param name: Name of the attribute to set
558         :type name: str
559         :param name: Value of the attribute to set
560         :type name: str
561         :param group: Group of RMs to wait for (list of guids)
562         :type group: int or list of int
563         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
564         :type state: str
565         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
566         :type time: str
567         """
568
569         reschedule = False
570         delay = reschedule_delay 
571
572         ## evaluate if set conditions are met
573
574         # only can set with conditions after the RM is started
575         if self.state != ResourceState.STARTED:
576             reschedule = True
577         else:
578             reschedule, delay = self._needs_reschedule(group, state, time)
579
580         if reschedule:
581             callback = functools.partial(self.set_with_conditions, 
582                     name, value, group, state, time)
583             self.ec.schedule(delay, callback)
584         else:
585             self.set(name, value)
586
587     def start_with_conditions(self):
588         """ Starts RM when all the conditions in self.conditions for
589         action 'START' are satisfied.
590
591         """
592         reschedule = False
593         delay = reschedule_delay 
594
595         ## evaluate if set conditions are met
596
597         # only can start when RM is either STOPPED or READY
598         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
599             reschedule = True
600             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
601         else:
602             start_conditions = self.conditions.get(ResourceAction.START, [])
603             
604             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
605             
606             # Verify all start conditions are met
607             for (group, state, time) in start_conditions:
608                 # Uncomment for debug
609                 #unmet = []
610                 #for guid in group:
611                 #    rm = self.ec.get_resource(guid)
612                 #    unmet.append((guid, rm._state))
613                 #
614                 #self.debug("---- WAITED STATES ---- %s" % unmet )
615
616                 reschedule, delay = self._needs_reschedule(group, state, time)
617                 if reschedule:
618                     break
619
620         if reschedule:
621             self.ec.schedule(delay, self.start_with_conditions)
622         else:
623             self.debug("----- STARTING ---- ")
624             self.start()
625
626     def stop_with_conditions(self):
627         """ Stops RM when all the conditions in self.conditions for
628         action 'STOP' are satisfied.
629
630         """
631         reschedule = False
632         delay = reschedule_delay 
633
634         ## evaluate if set conditions are met
635
636         # only can stop when RM is STARTED
637         if self.state != ResourceState.STARTED:
638             reschedule = True
639         else:
640             self.debug(" ---- STOP CONDITIONS ---- %s" % 
641                     self.conditions.get(ResourceAction.STOP))
642
643             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
644             for (group, state, time) in stop_conditions:
645                 reschedule, delay = self._needs_reschedule(group, state, time)
646                 if reschedule:
647                     break
648
649         if reschedule:
650             callback = functools.partial(self.stop_with_conditions)
651             self.ec.schedule(delay, callback)
652         else:
653             self.debug(" ----- STOPPING ---- ") 
654             self.stop()
655
656     def deploy(self):
657         """ Execute all steps required for the RM to reach the state READY
658
659         """
660         if self._state > ResourceState.READY:
661             self.error("Wrong state %s for deploy" % self.state)
662             return
663
664         self.debug("----- READY ---- ")
665         self._ready_time = tnow()
666         self._state = ResourceState.READY
667
668     def release(self):
669         """Release any resources used by this RM
670
671         """
672         self._release_time = tnow()
673         self._state = ResourceState.RELEASED
674
675     def finish(self):
676         """ Mark ResourceManager as FINISHED
677
678         """
679         self._finish_time = tnow()
680         self._state = ResourceState.FINISHED
681
682     def fail(self):
683         """ Mark ResourceManager as FAILED
684
685         """
686         self._failed_time = tnow()
687         self._state = ResourceState.FAILED
688
689     def connect(self, guid):
690         """ Performs actions that need to be taken upon associating RMs.
691         This method should be redefined when necessary in child classes.
692         """
693         pass
694
695     def disconnect(self, guid):
696         """ Performs actions that need to be taken upon disassociating RMs.
697         This method should be redefined when necessary in child classes.
698         """
699         pass
700
701     def valid_connection(self, guid):
702         """Checks whether a connection with the other RM
703         is valid.
704         This method need to be redefined by each new Resource Manager.
705
706         :param guid: Guid of the current Resource Manager
707         :type guid: int
708         :rtype:  Boolean
709
710         """
711         # TODO: Validate!
712         return True
713
714 class ResourceFactory(object):
715     _resource_types = dict()
716
717     @classmethod
718     def resource_types(cls):
719         """Return the type of the Class"""
720         return cls._resource_types
721
722     @classmethod
723     def get_resource_type(cls, rtype):
724         """Return the type of the Class"""
725         return cls._resource_types.get(rtype)
726
727     @classmethod
728     def register_type(cls, rclass):
729         """Register a new Ressource Manager"""
730         cls._resource_types[rclass.rtype()] = rclass
731
732     @classmethod
733     def create(cls, rtype, ec, guid):
734         """Create a new instance of a Ressource Manager"""
735         rclass = cls._resource_types[rtype]
736         return rclass(ec, guid)
737
738 def populate_factory():
739     """Register all the possible RM that exists in the current version of Nepi.
740     """
741     # Once the factory is populated, don't repopulate
742     if not ResourceFactory.resource_types():
743         for rclass in find_types():
744             ResourceFactory.register_type(rclass)
745
746 def find_types():
747     """Look into the different folders to find all the 
748     availables Resources Managers
749     """
750     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
751     search_path = set(search_path.split(" "))
752    
753     import inspect
754     import nepi.resources 
755     path = os.path.dirname(nepi.resources.__file__)
756     search_path.add(path)
757
758     types = []
759
760     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
761             prefix = "nepi.resources."):
762
763         loader = importer.find_module(modname)
764         
765         try:
766             # Notice: Repeated calls to load_module will act as a reload of teh module
767             module = loader.load_module(modname)
768
769             for attrname in dir(module):
770                 if attrname.startswith("_"):
771                     continue
772
773                 attr = getattr(module, attrname)
774
775                 if attr == ResourceManager:
776                     continue
777
778                 if not inspect.isclass(attr):
779                     continue
780
781                 if issubclass(attr, ResourceManager):
782                     types.append(attr)
783         except:
784             import traceback
785             import logging
786             err = traceback.format_exc()
787             logger = logging.getLogger("Resource.find_types()")
788             logger.error("Error while lading Resource Managers %s" % err)
789
790     return types
791
792