Adding trace Collector RM
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
23
24 import copy
25 import functools
26 import logging
27 import os
28 import pkgutil
29 import weakref
30
31 reschedule_delay = "1s"
32
33 class ResourceAction:
34     """ Action that a user can order to a Resource Manager
35    
36     """
37     DEPLOY = 0
38     START = 1
39     STOP = 2
40
41 class ResourceState:
42     """ State of a Resource Manager
43    
44     """
45     NEW = 0
46     DISCOVERED = 1
47     PROVISIONED = 2
48     READY = 3
49     STARTED = 4
50     STOPPED = 5
51     FINISHED = 6
52     FAILED = 7
53     RELEASED = 8
54
55 ResourceState2str = dict({
56     ResourceState.NEW : "NEW",
57     ResourceState.DISCOVERED : "DISCOVERED",
58     ResourceState.PROVISIONED : "PROVISIONED",
59     ResourceState.READY : "READY",
60     ResourceState.STARTED : "STARTED",
61     ResourceState.STOPPED : "STOPPED",
62     ResourceState.FINISHED : "FINISHED",
63     ResourceState.FAILED : "FAILED",
64     ResourceState.RELEASED : "RELEASED",
65     })
66
67 def clsinit(cls):
68     """ Initializes template information (i.e. attributes and traces)
69     for the ResourceManager class
70     """
71     cls._clsinit()
72     return cls
73
74 def clsinit_copy(cls):
75     """ Initializes template information (i.e. attributes and traces)
76     for the ResourceManager class, inheriting attributes and traces
77     from the parent class
78     """
79     cls._clsinit_copy()
80     return cls
81
82 # Decorator to invoke class initialization method
83 @clsinit
84 class ResourceManager(Logger):
85     _rtype = "Resource"
86     _attributes = None
87     _traces = None
88
89     @classmethod
90     def _register_attribute(cls, attr):
91         """ Resource subclasses will invoke this method to add a 
92         resource attribute
93
94         """
95         cls._attributes[attr.name] = attr
96
97     @classmethod
98     def _remove_attribute(cls, name):
99         """ Resource subclasses will invoke this method to remove a 
100         resource attribute
101
102         """
103         del cls._attributes[name]
104
105     @classmethod
106     def _register_trace(cls, trace):
107         """ Resource subclasses will invoke this method to add a 
108         resource trace
109
110         """
111         cls._traces[trace.name] = trace
112
113     @classmethod
114     def _remove_trace(cls, name):
115         """ Resource subclasses will invoke this method to remove a 
116         resource trace
117
118         """
119         del cls._traces[name]
120
121     @classmethod
122     def _register_attributes(cls):
123         """ Resource subclasses will invoke this method to register
124         resource attributes
125
126         """
127         pass
128
129     @classmethod
130     def _register_traces(cls):
131         """ Resource subclasses will invoke this method to register
132         resource traces
133
134         """
135         pass
136
137     @classmethod
138     def _clsinit(cls):
139         """ ResourceManager child classes have different attributes and traces.
140         Since the templates that hold the information of attributes and traces
141         are 'class attribute' dictionaries, initially they all point to the 
142         parent class ResourceManager instances of those dictionaries. 
143         In order to make these templates independent from the parent's one,
144         it is necessary re-initialize the corresponding dictionaries. 
145         This is the objective of the _clsinit method
146         """
147         # static template for resource attributes
148         cls._attributes = dict()
149         cls._register_attributes()
150
151         # static template for resource traces
152         cls._traces = dict()
153         cls._register_traces()
154
155     @classmethod
156     def _clsinit_copy(cls):
157         """ Same as _clsinit, except that it also inherits all attributes and traces
158         from the parent class.
159         """
160         # static template for resource attributes
161         cls._attributes = copy.deepcopy(cls._attributes)
162         cls._register_attributes()
163
164         # static template for resource traces
165         cls._traces = copy.deepcopy(cls._traces)
166         cls._register_traces()
167
168     @classmethod
169     def rtype(cls):
170         """ Returns the type of the Resource Manager
171
172         """
173         return cls._rtype
174
175     @classmethod
176     def get_attributes(cls):
177         """ Returns a copy of the attributes
178
179         """
180         return copy.deepcopy(cls._attributes.values())
181
182     @classmethod
183     def get_traces(cls):
184         """ Returns a copy of the traces
185
186         """
187         return copy.deepcopy(cls._traces.values())
188
189     def __init__(self, ec, guid):
190         super(ResourceManager, self).__init__(self.rtype())
191         
192         self._guid = guid
193         self._ec = weakref.ref(ec)
194         self._connections = set()
195         self._conditions = dict() 
196
197         # the resource instance gets a copy of all attributes
198         self._attrs = copy.deepcopy(self._attributes)
199
200         # the resource instance gets a copy of all traces
201         self._trcs = copy.deepcopy(self._traces)
202
203         self._state = ResourceState.NEW
204
205         self._start_time = None
206         self._stop_time = None
207         self._discover_time = None
208         self._provision_time = None
209         self._ready_time = None
210         self._release_time = None
211         self._finish_time = None
212         self._failed_time = None
213
214     @property
215     def guid(self):
216         """ Returns the global unique identifier of the RM """
217         return self._guid
218
219     @property
220     def ec(self):
221         """ Returns the Experiment Controller """
222         return self._ec()
223
224     @property
225     def connections(self):
226         """ Returns the set of guids of connected RMs"""
227         return self._connections
228
229     @property
230     def conditions(self):
231         """ Returns the conditions to which the RM is subjected to.
232         
233         The object returned by this method is a dictionary indexed by
234         ResourceAction."""
235         return self._conditions
236
237     @property
238     def start_time(self):
239         """ Returns the start time of the RM as a timestamp"""
240         return self._start_time
241
242     @property
243     def stop_time(self):
244         """ Returns the stop time of the RM as a timestamp"""
245         return self._stop_time
246
247     @property
248     def discover_time(self):
249         """ Returns the time discovering was finished for the RM as a timestamp"""
250         return self._discover_time
251
252     @property
253     def provision_time(self):
254         """ Returns the time provisioning was finished for the RM as a timestamp"""
255         return self._provision_time
256
257     @property
258     def ready_time(self):
259         """ Returns the time deployment was finished for the RM as a timestamp"""
260         return self._ready_time
261
262     @property
263     def release_time(self):
264         """ Returns the release time of the RM as a timestamp"""
265         return self._release_time
266
267     @property
268     def finish_time(self):
269         """ Returns the finalization time of the RM as a timestamp"""
270         return self._finish_time
271
272     @property
273     def failed_time(self):
274         """ Returns the time failure occured for the RM as a timestamp"""
275         return self._failed_time
276
277     @property
278     def state(self):
279         """ Get the state of the current RM """
280         return self._state
281
282     def log_message(self, msg):
283         """ Returns the log message formatted with added information.
284
285         :param msg: text message
286         :type msg: str
287         :rtype: str
288         """
289         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
290
291     def register_connection(self, guid):
292         """ Registers a connection to the RM identified by guid
293
294         :param guid: Global unique identified of the RM to connect to
295         :type guid: int
296         """
297         if self.valid_connection(guid):
298             self.connect(guid)
299             self._connections.add(guid)
300
301     def unregister_connection(self, guid):
302         """ Removes a registered connection to the RM identified by guid
303
304         :param guid: Global unique identified of the RM to connect to
305         :type guid: int
306         """
307         if guid in self._connections:
308             self.disconnect(guid)
309             self._connections.remove(guid)
310
311     def discover(self):
312         """ Performs resource discovery.
313
314         This  method is resposible for selecting an individual resource
315         matching user requirements.
316         This method should be redefined when necessary in child classes.
317         """ 
318         self._discover_time = tnow()
319         self._state = ResourceState.DISCOVERED
320
321     def provision(self):
322         """ Performs resource provisioning.
323
324         This  method is resposible for provisioning one resource.
325         After this method has been successfully invoked, the resource
326         should be acccesible/controllable by the RM.
327         This method should be redefined when necessary in child classes.
328         """ 
329         self._provision_time = tnow()
330         self._state = ResourceState.PROVISIONED
331
332     def start(self):
333         """ Starts the resource.
334         
335         There is no generic start behavior for all resources.
336         This method should be redefined when necessary in child classes.
337         """
338         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
339             self.error("Wrong state %s for start" % self.state)
340             return
341
342         self._start_time = tnow()
343         self._state = ResourceState.STARTED
344
345     def stop(self):
346         """ Stops the resource.
347         
348         There is no generic stop behavior for all resources.
349         This method should be redefined when necessary in child classes.
350         """
351         if not self._state in [ResourceState.STARTED]:
352             self.error("Wrong state %s for stop" % self.state)
353             return
354
355         self._stop_time = tnow()
356         self._state = ResourceState.STOPPED
357
358     def set(self, name, value):
359         """ Set the value of the attribute
360
361         :param name: Name of the attribute
362         :type name: str
363         :param name: Value of the attribute
364         :type name: str
365         """
366         attr = self._attrs[name]
367         attr.value = value
368
369     def get(self, name):
370         """ Returns the value of the attribute
371
372         :param name: Name of the attribute
373         :type name: str
374         :rtype: str
375         """
376         attr = self._attrs[name]
377         return attr.value
378
379     def register_trace(self, name):
380         """ Explicitly enable trace generation
381
382         :param name: Name of the trace
383         :type name: str
384         """
385         trace = self._trcs[name]
386         trace.enabled = True
387
388     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
389         """ Get information on collected trace
390
391         :param name: Name of the trace
392         :type name: str
393
394         :param attr: Can be one of:
395                          - TraceAttr.ALL (complete trace content), 
396                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
397                          - TraceAttr.PATH (full path to the trace file),
398                          - TraceAttr.SIZE (size of trace file). 
399         :type attr: str
400
401         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
402         :type name: int
403
404         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
405         :type name: int
406
407         :rtype: str
408         """
409         pass
410
411     def register_condition(self, action, group, state, time = None):
412         """ Registers a condition on the resource manager to allow execution 
413         of 'action' only after 'time' has elapsed from the moment all resources 
414         in 'group' reached state 'state'
415
416         :param action: Action to restrict to condition (either 'START' or 'STOP')
417         :type action: str
418         :param group: Group of RMs to wait for (list of guids)
419         :type group: int or list of int
420         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
421         :type state: str
422         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
423         :type time: str
424
425         """
426
427         if not action in self.conditions:
428             self._conditions[action] = list()
429         
430         conditions = self.conditions.get(action)
431
432         # For each condition to register a tuple of (group, state, time) is 
433         # added to the 'action' list
434         if not isinstance(group, list):
435             group = [group]
436
437         conditions.append((group, state, time))
438
439     def unregister_condition(self, group, action = None):
440         """ Removed conditions for a certain group of guids
441
442         :param action: Action to restrict to condition (either 'START' or 'STOP')
443         :type action: str
444
445         :param group: Group of RMs to wait for (list of guids)
446         :type group: int or list of int
447
448         """
449         # For each condition a tuple of (group, state, time) is 
450         # added to the 'action' list
451         if not isinstance(group, list):
452             group = [group]
453
454         for act, conditions in self.conditions.iteritems():
455             if action and act != action:
456                 continue
457
458             for condition in list(conditions):
459                 (grp, state, time) = condition
460
461                 # If there is an intersection between grp and group,
462                 # then remove intersected elements
463                 intsec = set(group).intersection(set(grp))
464                 if intsec:
465                     idx = conditions.index(condition)
466                     newgrp = set(grp)
467                     newgrp.difference_update(intsec)
468                     conditions[idx] = (newgrp, state, time)
469                  
470     def get_connected(self, rtype = None):
471         """ Returns the list of RM with the type 'rtype'
472
473         :param rtype: Type of the RM we look for
474         :type rtype: str
475         :return: list of guid
476         """
477         connected = []
478         for guid in self.connections:
479             rm = self.ec.get_resource(guid)
480             if not rtype or rm.rtype() == rtype:
481                 connected.append(rm)
482         return connected
483
484     def _needs_reschedule(self, group, state, time):
485         """ Internal method that verify if 'time' has elapsed since 
486         all elements in 'group' have reached state 'state'.
487
488         :param group: Group of RMs to wait for (list of guids)
489         :type group: int or list of int
490         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
491         :type state: str
492         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
493         :type time: str
494
495         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
496         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
497         For the moment, 2m30s is not a correct syntax.
498
499         """
500         reschedule = False
501         delay = reschedule_delay 
502
503         # check state and time elapsed on all RMs
504         for guid in group:
505             rm = self.ec.get_resource(guid)
506             # If the RM state is lower than the requested state we must
507             # reschedule (e.g. if RM is READY but we required STARTED).
508             if rm.state < state:
509                 reschedule = True
510                 break
511
512             # If there is a time restriction, we must verify the
513             # restriction is satisfied 
514             if time:
515                 if state == ResourceState.DISCOVERED:
516                     t = rm.discover_time
517                 if state == ResourceState.PROVISIONED:
518                     t = rm.provision_time
519                 elif state == ResourceState.READY:
520                     t = rm.ready_time
521                 elif state == ResourceState.STARTED:
522                     t = rm.start_time
523                 elif state == ResourceState.STOPPED:
524                     t = rm.stop_time
525                 else:
526                     # Only keep time information for START and STOP
527                     break
528
529                 # time already elapsed since RM changed state
530                 waited = "%fs" % tdiffsec(tnow(), t)
531
532                 # time still to wait
533                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
534
535                 if wait > 0.001:
536                     reschedule = True
537                     delay = "%fs" % wait
538                     break
539
540         return reschedule, delay
541
542     def set_with_conditions(self, name, value, group, state, time):
543         """ Set value 'value' on attribute with name 'name' when 'time' 
544         has elapsed since all elements in 'group' have reached state
545         'state'
546
547         :param name: Name of the attribute to set
548         :type name: str
549         :param name: Value of the attribute to set
550         :type name: str
551         :param group: Group of RMs to wait for (list of guids)
552         :type group: int or list of int
553         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
554         :type state: str
555         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
556         :type time: str
557         """
558
559         reschedule = False
560         delay = reschedule_delay 
561
562         ## evaluate if set conditions are met
563
564         # only can set with conditions after the RM is started
565         if self.state != ResourceState.STARTED:
566             reschedule = True
567         else:
568             reschedule, delay = self._needs_reschedule(group, state, time)
569
570         if reschedule:
571             callback = functools.partial(self.set_with_conditions, 
572                     name, value, group, state, time)
573             self.ec.schedule(delay, callback)
574         else:
575             self.set(name, value)
576
577     def start_with_conditions(self):
578         """ Starts RM when all the conditions in self.conditions for
579         action 'START' are satisfied.
580
581         """
582         reschedule = False
583         delay = reschedule_delay 
584
585         ## evaluate if set conditions are met
586
587         # only can start when RM is either STOPPED or READY
588         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
589             reschedule = True
590             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
591         else:
592             start_conditions = self.conditions.get(ResourceAction.START, [])
593             
594             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
595             
596             # Verify all start conditions are met
597             for (group, state, time) in start_conditions:
598                 # Uncomment for debug
599                 #unmet = []
600                 #for guid in group:
601                 #    rm = self.ec.get_resource(guid)
602                 #    unmet.append((guid, rm._state))
603                 #
604                 #self.debug("---- WAITED STATES ---- %s" % unmet )
605
606                 reschedule, delay = self._needs_reschedule(group, state, time)
607                 if reschedule:
608                     break
609
610         if reschedule:
611             self.ec.schedule(delay, self.start_with_conditions)
612         else:
613             self.debug("----- STARTING ---- ")
614             self.start()
615
616     def stop_with_conditions(self):
617         """ Stops RM when all the conditions in self.conditions for
618         action 'STOP' are satisfied.
619
620         """
621         reschedule = False
622         delay = reschedule_delay 
623
624         ## evaluate if set conditions are met
625
626         # only can stop when RM is STARTED
627         if self.state != ResourceState.STARTED:
628             reschedule = True
629         else:
630             self.debug(" ---- STOP CONDITIONS ---- %s" % 
631                     self.conditions.get(ResourceAction.STOP))
632
633             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
634             for (group, state, time) in stop_conditions:
635                 reschedule, delay = self._needs_reschedule(group, state, time)
636                 if reschedule:
637                     break
638
639         if reschedule:
640             callback = functools.partial(self.stop_with_conditions)
641             self.ec.schedule(delay, callback)
642         else:
643             self.debug(" ----- STOPPING ---- ") 
644             self.stop()
645
646     def deploy(self):
647         """ Execute all steps required for the RM to reach the state READY
648
649         """
650         if self._state > ResourceState.READY:
651             self.error("Wrong state %s for deploy" % self.state)
652             return
653
654         self.debug("----- READY ---- ")
655         self._ready_time = tnow()
656         self._state = ResourceState.READY
657
658     def release(self):
659         """Release any resources used by this RM
660
661         """
662         self._release_time = tnow()
663         self._state = ResourceState.RELEASED
664
665     def finish(self):
666         """ Mark ResourceManager as FINISHED
667
668         """
669         self._finish_time = tnow()
670         self._state = ResourceState.FINISHED
671
672     def fail(self):
673         """ Mark ResourceManager as FAILED
674
675         """
676         self._failed_time = tnow()
677         self._state = ResourceState.FAILED
678
679     def connect(self, guid):
680         """ Performs actions that need to be taken upon associating RMs.
681         This method should be redefined when necessary in child classes.
682         """
683         pass
684
685     def disconnect(self, guid):
686         """ Performs actions that need to be taken upon disassociating RMs.
687         This method should be redefined when necessary in child classes.
688         """
689         pass
690
691     def valid_connection(self, guid):
692         """Checks whether a connection with the other RM
693         is valid.
694         This method need to be redefined by each new Resource Manager.
695
696         :param guid: Guid of the current Resource Manager
697         :type guid: int
698         :rtype:  Boolean
699
700         """
701         # TODO: Validate!
702         return True
703
704 class ResourceFactory(object):
705     _resource_types = dict()
706
707     @classmethod
708     def resource_types(cls):
709         """Return the type of the Class"""
710         return cls._resource_types
711
712     @classmethod
713     def register_type(cls, rclass):
714         """Register a new Ressource Manager"""
715         cls._resource_types[rclass.rtype()] = rclass
716
717     @classmethod
718     def create(cls, rtype, ec, guid):
719         """Create a new instance of a Ressource Manager"""
720         rclass = cls._resource_types[rtype]
721         return rclass(ec, guid)
722
723 def populate_factory():
724     """Register all the possible RM that exists in the current version of Nepi.
725     """
726     for rclass in find_types():
727         ResourceFactory.register_type(rclass)
728
729 def find_types():
730     """Look into the different folders to find all the 
731     availables Resources Managers
732
733     """
734     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
735     search_path = set(search_path.split(" "))
736    
737     import inspect
738     import nepi.resources 
739     path = os.path.dirname(nepi.resources.__file__)
740     search_path.add(path)
741
742     types = []
743
744     for importer, modname, ispkg in pkgutil.walk_packages(search_path):
745         loader = importer.find_module(modname)
746         try:
747             module = loader.load_module(loader.fullname)
748             for attrname in dir(module):
749                 if attrname.startswith("_"):
750                     continue
751
752                 attr = getattr(module, attrname)
753
754                 if attr == ResourceManager:
755                     continue
756
757                 if not inspect.isclass(attr):
758                     continue
759
760                 if issubclass(attr, ResourceManager):
761                     types.append(attr)
762         except:
763             import traceback
764             import logging
765             err = traceback.format_exc()
766             logger = logging.getLogger("Resource.find_types()")
767             logger.error("Error while lading Resource Managers %s" % err)
768
769     return types
770
771