Added LinuxCCNPing
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
23
24 import copy
25 import functools
26 import logging
27 import os
28 import pkgutil
29 import sys
30 import weakref
31
32 reschedule_delay = "1s"
33
34 class ResourceAction:
35     """ Action that a user can order to a Resource Manager
36    
37     """
38     DEPLOY = 0
39     START = 1
40     STOP = 2
41
42 class ResourceState:
43     """ State of a Resource Manager
44    
45     """
46     NEW = 0
47     DISCOVERED = 1
48     PROVISIONED = 2
49     READY = 3
50     STARTED = 4
51     STOPPED = 5
52     FINISHED = 6
53     FAILED = 7
54     RELEASED = 8
55
56 ResourceState2str = dict({
57     ResourceState.NEW : "NEW",
58     ResourceState.DISCOVERED : "DISCOVERED",
59     ResourceState.PROVISIONED : "PROVISIONED",
60     ResourceState.READY : "READY",
61     ResourceState.STARTED : "STARTED",
62     ResourceState.STOPPED : "STOPPED",
63     ResourceState.FINISHED : "FINISHED",
64     ResourceState.FAILED : "FAILED",
65     ResourceState.RELEASED : "RELEASED",
66     })
67
68 def clsinit(cls):
69     """ Initializes template information (i.e. attributes and traces)
70     for the ResourceManager class
71     """
72     cls._clsinit()
73     return cls
74
75 def clsinit_copy(cls):
76     """ Initializes template information (i.e. attributes and traces)
77     for the ResourceManager class, inheriting attributes and traces
78     from the parent class
79     """
80     cls._clsinit_copy()
81     return cls
82
83 # Decorator to invoke class initialization method
84 @clsinit
85 class ResourceManager(Logger):
86     _rtype = "Resource"
87     _attributes = None
88     _traces = None
89
90     @classmethod
91     def _register_attribute(cls, attr):
92         """ Resource subclasses will invoke this method to add a 
93         resource attribute
94
95         """
96         cls._attributes[attr.name] = attr
97
98     @classmethod
99     def _remove_attribute(cls, name):
100         """ Resource subclasses will invoke this method to remove a 
101         resource attribute
102
103         """
104         del cls._attributes[name]
105
106     @classmethod
107     def _register_trace(cls, trace):
108         """ Resource subclasses will invoke this method to add a 
109         resource trace
110
111         """
112         cls._traces[trace.name] = trace
113
114     @classmethod
115     def _remove_trace(cls, name):
116         """ Resource subclasses will invoke this method to remove a 
117         resource trace
118
119         """
120         del cls._traces[name]
121
122     @classmethod
123     def _register_attributes(cls):
124         """ Resource subclasses will invoke this method to register
125         resource attributes
126
127         """
128         pass
129
130     @classmethod
131     def _register_traces(cls):
132         """ Resource subclasses will invoke this method to register
133         resource traces
134
135         """
136         pass
137
138     @classmethod
139     def _clsinit(cls):
140         """ ResourceManager child classes have different attributes and traces.
141         Since the templates that hold the information of attributes and traces
142         are 'class attribute' dictionaries, initially they all point to the 
143         parent class ResourceManager instances of those dictionaries. 
144         In order to make these templates independent from the parent's one,
145         it is necessary re-initialize the corresponding dictionaries. 
146         This is the objective of the _clsinit method
147         """
148         # static template for resource attributes
149         cls._attributes = dict()
150         cls._register_attributes()
151
152         # static template for resource traces
153         cls._traces = dict()
154         cls._register_traces()
155
156     @classmethod
157     def _clsinit_copy(cls):
158         """ Same as _clsinit, except that it also inherits all attributes and traces
159         from the parent class.
160         """
161         # static template for resource attributes
162         cls._attributes = copy.deepcopy(cls._attributes)
163         cls._register_attributes()
164
165         # static template for resource traces
166         cls._traces = copy.deepcopy(cls._traces)
167         cls._register_traces()
168
169     @classmethod
170     def rtype(cls):
171         """ Returns the type of the Resource Manager
172
173         """
174         return cls._rtype
175
176     @classmethod
177     def get_attributes(cls):
178         """ Returns a copy of the attributes
179
180         """
181         return copy.deepcopy(cls._attributes.values())
182
183     @classmethod
184     def get_traces(cls):
185         """ Returns a copy of the traces
186
187         """
188         return copy.deepcopy(cls._traces.values())
189
190     def __init__(self, ec, guid):
191         super(ResourceManager, self).__init__(self.rtype())
192         
193         self._guid = guid
194         self._ec = weakref.ref(ec)
195         self._connections = set()
196         self._conditions = dict() 
197
198         # the resource instance gets a copy of all attributes
199         self._attrs = copy.deepcopy(self._attributes)
200
201         # the resource instance gets a copy of all traces
202         self._trcs = copy.deepcopy(self._traces)
203
204         # Each resource is placed on a deployment group by the EC
205         # during deployment
206         self.deployment_group = None
207
208         self._start_time = None
209         self._stop_time = None
210         self._discover_time = None
211         self._provision_time = None
212         self._ready_time = None
213         self._release_time = None
214         self._finish_time = None
215         self._failed_time = None
216
217         self._state = ResourceState.NEW
218
219     @property
220     def guid(self):
221         """ Returns the global unique identifier of the RM """
222         return self._guid
223
224     @property
225     def ec(self):
226         """ Returns the Experiment Controller """
227         return self._ec()
228
229     @property
230     def connections(self):
231         """ Returns the set of guids of connected RMs"""
232         return self._connections
233
234     @property
235     def conditions(self):
236         """ Returns the conditions to which the RM is subjected to.
237         
238         The object returned by this method is a dictionary indexed by
239         ResourceAction."""
240         return self._conditions
241
242     @property
243     def start_time(self):
244         """ Returns the start time of the RM as a timestamp"""
245         return self._start_time
246
247     @property
248     def stop_time(self):
249         """ Returns the stop time of the RM as a timestamp"""
250         return self._stop_time
251
252     @property
253     def discover_time(self):
254         """ Returns the time discovering was finished for the RM as a timestamp"""
255         return self._discover_time
256
257     @property
258     def provision_time(self):
259         """ Returns the time provisioning was finished for the RM as a timestamp"""
260         return self._provision_time
261
262     @property
263     def ready_time(self):
264         """ Returns the time deployment was finished for the RM as a timestamp"""
265         return self._ready_time
266
267     @property
268     def release_time(self):
269         """ Returns the release time of the RM as a timestamp"""
270         return self._release_time
271
272     @property
273     def finish_time(self):
274         """ Returns the finalization time of the RM as a timestamp"""
275         return self._finish_time
276
277     @property
278     def failed_time(self):
279         """ Returns the time failure occured for the RM as a timestamp"""
280         return self._failed_time
281
282     @property
283     def state(self):
284         """ Get the state of the current RM """
285         return self._state
286
287     def log_message(self, msg):
288         """ Returns the log message formatted with added information.
289
290         :param msg: text message
291         :type msg: str
292         :rtype: str
293         """
294         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
295
296     def register_connection(self, guid):
297         """ Registers a connection to the RM identified by guid
298
299         :param guid: Global unique identified of the RM to connect to
300         :type guid: int
301         """
302         if self.valid_connection(guid):
303             self.connect(guid)
304             self._connections.add(guid)
305
306     def unregister_connection(self, guid):
307         """ Removes a registered connection to the RM identified by guid
308
309         :param guid: Global unique identified of the RM to connect to
310         :type guid: int
311         """
312         if guid in self._connections:
313             self.disconnect(guid)
314             self._connections.remove(guid)
315
316     def discover(self):
317         """ Performs resource discovery.
318
319         This  method is resposible for selecting an individual resource
320         matching user requirements.
321         This method should be redefined when necessary in child classes.
322         """
323         self.set_discovered()
324
325     def provision(self):
326         """ Performs resource provisioning.
327
328         This  method is resposible for provisioning one resource.
329         After this method has been successfully invoked, the resource
330         should be acccesible/controllable by the RM.
331         This method should be redefined when necessary in child classes.
332         """
333         self.set_provisioned()
334
335     def start(self):
336         """ Starts the resource.
337         
338         There is no generic start behavior for all resources.
339         This method should be redefined when necessary in child classes.
340         """
341         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
342             self.error("Wrong state %s for start" % self.state)
343             return
344
345         self.set_started()
346
347     def stop(self):
348         """ Stops the resource.
349         
350         There is no generic stop behavior for all resources.
351         This method should be redefined when necessary in child classes.
352         """
353         if not self.state in [ResourceState.STARTED]:
354             self.error("Wrong state %s for stop" % self.state)
355             return
356         
357         self.set_stopped()
358
359     def deploy(self):
360         """ Execute all steps required for the RM to reach the state READY
361
362         """
363         if self.state > ResourceState.READY:
364             self.error("Wrong state %s for deploy" % self.state)
365             return
366
367         self.debug("----- READY ---- ")
368         self.set_ready()
369
370     def release(self):
371         self.set_released()
372
373     def finish(self):
374         self.set_finished()
375  
376     def fail(self):
377         self.set_failed()
378
379     def set(self, name, value):
380         """ Set the value of the attribute
381
382         :param name: Name of the attribute
383         :type name: str
384         :param name: Value of the attribute
385         :type name: str
386         """
387         attr = self._attrs[name]
388         attr.value = value
389
390     def get(self, name):
391         """ Returns the value of the attribute
392
393         :param name: Name of the attribute
394         :type name: str
395         :rtype: str
396         """
397         attr = self._attrs[name]
398         return attr.value
399
400     def enable_trace(self, name):
401         """ Explicitly enable trace generation
402
403         :param name: Name of the trace
404         :type name: str
405         """
406         trace = self._trcs[name]
407         trace.enabled = True
408     
409     def trace_enabled(self, name):
410         """Returns True if trace is enables 
411
412         :param name: Name of the trace
413         :type name: str
414         """
415         trace = self._trcs[name]
416         return trace.enabled
417  
418     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
419         """ Get information on collected trace
420
421         :param name: Name of the trace
422         :type name: str
423
424         :param attr: Can be one of:
425                          - TraceAttr.ALL (complete trace content), 
426                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
427                          - TraceAttr.PATH (full path to the trace file),
428                          - TraceAttr.SIZE (size of trace file). 
429         :type attr: str
430
431         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
432         :type name: int
433
434         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
435         :type name: int
436
437         :rtype: str
438         """
439         pass
440
441     def register_condition(self, action, group, state, time = None):
442         """ Registers a condition on the resource manager to allow execution 
443         of 'action' only after 'time' has elapsed from the moment all resources 
444         in 'group' reached state 'state'
445
446         :param action: Action to restrict to condition (either 'START' or 'STOP')
447         :type action: str
448         :param group: Group of RMs to wait for (list of guids)
449         :type group: int or list of int
450         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
451         :type state: str
452         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
453         :type time: str
454
455         """
456
457         if not action in self.conditions:
458             self._conditions[action] = list()
459         
460         conditions = self.conditions.get(action)
461
462         # For each condition to register a tuple of (group, state, time) is 
463         # added to the 'action' list
464         if not isinstance(group, list):
465             group = [group]
466
467         conditions.append((group, state, time))
468
469     def unregister_condition(self, group, action = None):
470         """ Removed conditions for a certain group of guids
471
472         :param action: Action to restrict to condition (either 'START' or 'STOP')
473         :type action: str
474
475         :param group: Group of RMs to wait for (list of guids)
476         :type group: int or list of int
477
478         """
479         # For each condition a tuple of (group, state, time) is 
480         # added to the 'action' list
481         if not isinstance(group, list):
482             group = [group]
483
484         for act, conditions in self.conditions.iteritems():
485             if action and act != action:
486                 continue
487
488             for condition in list(conditions):
489                 (grp, state, time) = condition
490
491                 # If there is an intersection between grp and group,
492                 # then remove intersected elements
493                 intsec = set(group).intersection(set(grp))
494                 if intsec:
495                     idx = conditions.index(condition)
496                     newgrp = set(grp)
497                     newgrp.difference_update(intsec)
498                     conditions[idx] = (newgrp, state, time)
499                  
500     def get_connected(self, rtype = None):
501         """ Returns the list of RM with the type 'rtype'
502
503         :param rtype: Type of the RM we look for
504         :type rtype: str
505         :return: list of guid
506         """
507         connected = []
508         rclass = ResourceFactory.get_resource_type(rtype)
509         for guid in self.connections:
510             rm = self.ec.get_resource(guid)
511             if not rtype or isinstance(rm, rclass):
512                 connected.append(rm)
513         return connected
514
515     def _needs_reschedule(self, group, state, time):
516         """ Internal method that verify if 'time' has elapsed since 
517         all elements in 'group' have reached state 'state'.
518
519         :param group: Group of RMs to wait for (list of guids)
520         :type group: int or list of int
521         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
522         :type state: str
523         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
524         :type time: str
525
526         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
527         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
528         For the moment, 2m30s is not a correct syntax.
529
530         """
531         reschedule = False
532         delay = reschedule_delay 
533
534         # check state and time elapsed on all RMs
535         for guid in group:
536             rm = self.ec.get_resource(guid)
537             # If the RM state is lower than the requested state we must
538             # reschedule (e.g. if RM is READY but we required STARTED).
539             if rm.state < state:
540                 reschedule = True
541                 break
542
543             # If there is a time restriction, we must verify the
544             # restriction is satisfied 
545             if time:
546                 if state == ResourceState.DISCOVERED:
547                     t = rm.discover_time
548                 if state == ResourceState.PROVISIONED:
549                     t = rm.provision_time
550                 elif state == ResourceState.READY:
551                     t = rm.ready_time
552                 elif state == ResourceState.STARTED:
553                     t = rm.start_time
554                 elif state == ResourceState.STOPPED:
555                     t = rm.stop_time
556                 else:
557                     # Only keep time information for START and STOP
558                     break
559
560                 # time already elapsed since RM changed state
561                 waited = "%fs" % tdiffsec(tnow(), t)
562
563                 # time still to wait
564                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
565
566                 if wait > 0.001:
567                     reschedule = True
568                     delay = "%fs" % wait
569                     break
570
571         return reschedule, delay
572
573     def set_with_conditions(self, name, value, group, state, time):
574         """ Set value 'value' on attribute with name 'name' when 'time' 
575         has elapsed since all elements in 'group' have reached state
576         'state'
577
578         :param name: Name of the attribute to set
579         :type name: str
580         :param name: Value of the attribute to set
581         :type name: str
582         :param group: Group of RMs to wait for (list of guids)
583         :type group: int or list of int
584         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
585         :type state: str
586         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
587         :type time: str
588         """
589
590         reschedule = False
591         delay = reschedule_delay 
592
593         ## evaluate if set conditions are met
594
595         # only can set with conditions after the RM is started
596         if self.state != ResourceState.STARTED:
597             reschedule = True
598         else:
599             reschedule, delay = self._needs_reschedule(group, state, time)
600
601         if reschedule:
602             callback = functools.partial(self.set_with_conditions, 
603                     name, value, group, state, time)
604             self.ec.schedule(delay, callback)
605         else:
606             self.set(name, value)
607
608     def start_with_conditions(self):
609         """ Starts RM when all the conditions in self.conditions for
610         action 'START' are satisfied.
611
612         """
613         reschedule = False
614         delay = reschedule_delay 
615
616         ## evaluate if set conditions are met
617
618         # only can start when RM is either STOPPED or READY
619         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
620             reschedule = True
621             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
622         else:
623             start_conditions = self.conditions.get(ResourceAction.START, [])
624             
625             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
626             
627             # Verify all start conditions are met
628             for (group, state, time) in start_conditions:
629                 # Uncomment for debug
630                 #unmet = []
631                 #for guid in group:
632                 #    rm = self.ec.get_resource(guid)
633                 #    unmet.append((guid, rm._state))
634                 #
635                 #self.debug("---- WAITED STATES ---- %s" % unmet )
636
637                 reschedule, delay = self._needs_reschedule(group, state, time)
638                 if reschedule:
639                     break
640
641         if reschedule:
642             self.ec.schedule(delay, self.start_with_conditions)
643         else:
644             self.debug("----- STARTING ---- ")
645             self.start()
646
647     def stop_with_conditions(self):
648         """ Stops RM when all the conditions in self.conditions for
649         action 'STOP' are satisfied.
650
651         """
652         reschedule = False
653         delay = reschedule_delay 
654
655         ## evaluate if set conditions are met
656
657         # only can stop when RM is STARTED
658         if self.state != ResourceState.STARTED:
659             reschedule = True
660         else:
661             self.debug(" ---- STOP CONDITIONS ---- %s" % 
662                     self.conditions.get(ResourceAction.STOP))
663
664             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
665             for (group, state, time) in stop_conditions:
666                 reschedule, delay = self._needs_reschedule(group, state, time)
667                 if reschedule:
668                     break
669
670         if reschedule:
671             callback = functools.partial(self.stop_with_conditions)
672             self.ec.schedule(delay, callback)
673         else:
674             self.debug(" ----- STOPPING ---- ") 
675             self.stop()
676
677     def connect(self, guid):
678         """ Performs actions that need to be taken upon associating RMs.
679         This method should be redefined when necessary in child classes.
680         """
681         pass
682
683     def disconnect(self, guid):
684         """ Performs actions that need to be taken upon disassociating RMs.
685         This method should be redefined when necessary in child classes.
686         """
687         pass
688
689     def valid_connection(self, guid):
690         """Checks whether a connection with the other RM
691         is valid.
692         This method need to be redefined by each new Resource Manager.
693
694         :param guid: Guid of the current Resource Manager
695         :type guid: int
696         :rtype:  Boolean
697
698         """
699         # TODO: Validate!
700         return True
701     
702     def set_started(self):
703         """ Mark ResourceManager as STARTED """
704         self._start_time = tnow()
705         self._state = ResourceState.STARTED
706         
707     def set_stopped(self):
708         """ Mark ResourceManager as STOPPED """
709         self._stop_time = tnow()
710         self._state = ResourceState.STOPPED
711
712     def set_ready(self):
713         """ Mark ResourceManager as READY """
714         self._ready_time = tnow()
715         self._state = ResourceState.READY
716
717     def set_released(self):
718         """ Mark ResourceManager as REALEASED """
719         self._release_time = tnow()
720         self._state = ResourceState.RELEASED
721
722     def set_finished(self):
723         """ Mark ResourceManager as FINISHED """
724         self._finish_time = tnow()
725         self._state = ResourceState.FINISHED
726
727     def set_failed(self):
728         """ Mark ResourceManager as FAILED """
729         self._failed_time = tnow()
730         self._state = ResourceState.FAILED
731
732     def set_discovered(self):
733         """ Mark ResourceManager as DISCOVERED """
734         self._discover_time = tnow()
735         self._state = ResourceState.DISCOVERED
736
737     def set_provisioned(self):
738         """ Mark ResourceManager as PROVISIONED """
739         self._provision_time = tnow()
740         self._state = ResourceState.PROVISIONED
741
742 class ResourceFactory(object):
743     _resource_types = dict()
744
745     @classmethod
746     def resource_types(cls):
747         """Return the type of the Class"""
748         return cls._resource_types
749
750     @classmethod
751     def get_resource_type(cls, rtype):
752         """Return the type of the Class"""
753         return cls._resource_types.get(rtype)
754
755     @classmethod
756     def register_type(cls, rclass):
757         """Register a new Ressource Manager"""
758         cls._resource_types[rclass.rtype()] = rclass
759
760     @classmethod
761     def create(cls, rtype, ec, guid):
762         """Create a new instance of a Ressource Manager"""
763         rclass = cls._resource_types[rtype]
764         return rclass(ec, guid)
765
766 def populate_factory():
767     """Register all the possible RM that exists in the current version of Nepi.
768     """
769     # Once the factory is populated, don't repopulate
770     if not ResourceFactory.resource_types():
771         for rclass in find_types():
772             ResourceFactory.register_type(rclass)
773
774 def find_types():
775     """Look into the different folders to find all the 
776     availables Resources Managers
777     """
778     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
779     search_path = set(search_path.split(" "))
780    
781     import inspect
782     import nepi.resources 
783     path = os.path.dirname(nepi.resources.__file__)
784     search_path.add(path)
785
786     types = []
787
788     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
789             prefix = "nepi.resources."):
790
791         loader = importer.find_module(modname)
792         
793         try:
794             # Notice: Repeated calls to load_module will act as a reload of teh module
795             if modname in sys.modules:
796                 module = sys.modules.get(modname)
797             else:
798                 module = loader.load_module(modname)
799
800             for attrname in dir(module):
801                 if attrname.startswith("_"):
802                     continue
803
804                 attr = getattr(module, attrname)
805
806                 if attr == ResourceManager:
807                     continue
808
809                 if not inspect.isclass(attr):
810                     continue
811
812                 if issubclass(attr, ResourceManager):
813                     types.append(attr)
814
815                     if not modname in sys.modules:
816                         sys.modules[modname] = module
817
818         except:
819             import traceback
820             import logging
821             err = traceback.format_exc()
822             logger = logging.getLogger("Resource.find_types()")
823             logger.error("Error while loading Resource Managers %s" % err)
824
825     return types
826
827