upgrade documentation
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
23
24 import copy
25 import functools
26 import logging
27 import os
28 import pkgutil
29 import weakref
30
31 reschedule_delay = "0.5s"
32
33 class ResourceAction:
34     """ Action that a user can order to a Resource Manager
35    
36     """
37     DEPLOY = 0
38     START = 1
39     STOP = 2
40
41 class ResourceState:
42     """ State of a Resource Manager
43    
44     """
45     NEW = 0
46     DISCOVERED = 1
47     PROVISIONED = 2
48     READY = 3
49     STARTED = 4
50     STOPPED = 5
51     FINISHED = 6
52     FAILED = 7
53     RELEASED = 8
54
55 ResourceState2str = dict({
56     ResourceState.NEW : "NEW",
57     ResourceState.DISCOVERED : "DISCOVERED",
58     ResourceState.PROVISIONED : "PROVISIONED",
59     ResourceState.READY : "READY",
60     ResourceState.STARTED : "STARTED",
61     ResourceState.STOPPED : "STOPPED",
62     ResourceState.FINISHED : "FINISHED",
63     ResourceState.FAILED : "FAILED",
64     ResourceState.RELEASED : "RELEASED",
65     })
66
67 def clsinit(cls):
68     """ Initializes template information (i.e. attributes and traces)
69     for the ResourceManager class
70     """
71     cls._clsinit()
72     return cls
73
74 def clsinit_copy(cls):
75     """ Initializes template information (i.e. attributes and traces)
76     for the ResourceManager class, inheriting attributes and traces
77     from the parent class
78     """
79     cls._clsinit_copy()
80     return cls
81
82 # Decorator to invoke class initialization method
83 @clsinit
84 class ResourceManager(Logger):
85     _rtype = "Resource"
86     _attributes = None
87     _traces = None
88
89     @classmethod
90     def _register_attribute(cls, attr):
91         """ Resource subclasses will invoke this method to add a 
92         resource attribute
93
94         """
95         cls._attributes[attr.name] = attr
96
97     @classmethod
98     def _remove_attribute(cls, name):
99         """ Resource subclasses will invoke this method to remove a 
100         resource attribute
101
102         """
103         del cls._attributes[name]
104
105     @classmethod
106     def _register_trace(cls, trace):
107         """ Resource subclasses will invoke this method to add a 
108         resource trace
109
110         """
111         cls._traces[trace.name] = trace
112
113     @classmethod
114     def _remove_trace(cls, name):
115         """ Resource subclasses will invoke this method to remove a 
116         resource trace
117
118         """
119         del cls._traces[name]
120
121     @classmethod
122     def _register_attributes(cls):
123         """ Resource subclasses will invoke this method to register
124         resource attributes
125
126         """
127         pass
128
129     @classmethod
130     def _register_traces(cls):
131         """ Resource subclasses will invoke this method to register
132         resource traces
133
134         """
135         pass
136
137     @classmethod
138     def _clsinit(cls):
139         """ ResourceManager child classes have different attributes and traces.
140         Since the templates that hold the information of attributes and traces
141         are 'class attribute' dictionaries, initially they all point to the 
142         parent class ResourceManager instances of those dictionaries. 
143         In order to make these templates independent from the parent's one,
144         it is necessary re-initialize the corresponding dictionaries. 
145         This is the objective of the _clsinit method
146         """
147         # static template for resource attributes
148         cls._attributes = dict()
149         cls._register_attributes()
150
151         # static template for resource traces
152         cls._traces = dict()
153         cls._register_traces()
154
155     @classmethod
156     def _clsinit_copy(cls):
157         """ Same as _clsinit, except that it also inherits all attributes and traces
158         from the parent class.
159         """
160         # static template for resource attributes
161         cls._attributes = copy.deepcopy(cls._attributes)
162         cls._register_attributes()
163
164         # static template for resource traces
165         cls._traces = copy.deepcopy(cls._traces)
166         cls._register_traces()
167
168     @classmethod
169     def rtype(cls):
170         """ Returns the type of the Resource Manager
171
172         """
173         return cls._rtype
174
175     @classmethod
176     def get_attributes(cls):
177         """ Returns a copy of the attributes
178
179         """
180         return copy.deepcopy(cls._attributes.values())
181
182     @classmethod
183     def get_traces(cls):
184         """ Returns a copy of the traces
185
186         """
187         return copy.deepcopy(cls._traces.values())
188
189     def __init__(self, ec, guid):
190         super(ResourceManager, self).__init__(self.rtype())
191         
192         self._guid = guid
193         self._ec = weakref.ref(ec)
194         self._connections = set()
195         self._conditions = dict() 
196
197         # the resource instance gets a copy of all attributes
198         self._attrs = copy.deepcopy(self._attributes)
199
200         # the resource instance gets a copy of all traces
201         self._trcs = copy.deepcopy(self._traces)
202
203         self._state = ResourceState.NEW
204
205         self._start_time = None
206         self._stop_time = None
207         self._discover_time = None
208         self._provision_time = None
209         self._ready_time = None
210         self._release_time = None
211
212     @property
213     def guid(self):
214         """ Returns the guid of the current RM """
215         return self._guid
216
217     @property
218     def ec(self):
219         """ Returns the Experiment Controller """
220         return self._ec()
221
222     @property
223     def connections(self):
224         """ Returns the set of connection for this RM"""
225         return self._connections
226
227     @property
228     def conditions(self):
229         """ Returns the list of conditions for this RM
230         The list is a dictionary with for each action, a list of tuple 
231         describing the conditions. """
232         return self._conditions
233
234     @property
235     def start_time(self):
236         """ Returns timestamp with the time the RM started """
237         return self._start_time
238
239     @property
240     def stop_time(self):
241         """ Returns timestamp with the time the RM stopped """
242         return self._stop_time
243
244     @property
245     def discover_time(self):
246         """ Returns timestamp with the time the RM passed to state discovered """
247         return self._discover_time
248
249     @property
250     def provision_time(self):
251         """ Returns timestamp with the time the RM passed to state provisioned """
252         return self._provision_time
253
254     @property
255     def ready_time(self):
256         """ Returns timestamp with the time the RM passed to state ready  """
257         return self._ready_time
258
259     @property
260     def release_time(self):
261         """ Returns timestamp with the time the RM was released """
262         return self._release_time
263
264     @property
265     def state(self):
266         """ Get the state of the current RM """
267         return self._state
268
269     def log_message(self, msg):
270         """ Improve debugging message by adding more information 
271             as the guid and the type of the RM
272
273         :param msg: Message to log
274         :type msg: str
275         :rtype: str
276         """
277         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
278
279     def connect(self, guid):
280         """ Connect the current RM with the RM 'guid'
281
282         :param guid: Guid of the RM the current RM will be connected
283         :type guid: int
284         """
285         if self.valid_connection(guid):
286             self._connections.add(guid)
287
288     def discover(self):
289         """ Discover the Resource. As it is specific for each RM, 
290         this method take the time when the RM become DISCOVERED and
291         change the status """
292         self._discover_time = strfnow()
293         self._state = ResourceState.DISCOVERED
294
295     def provision(self):
296         """ Provision the Resource. As it is specific for each RM, 
297         this method take the time when the RM become PROVISIONNED and
298         change the status """
299         self._provision_time = strfnow()
300         self._state = ResourceState.PROVISIONED
301
302     def start(self):
303         """ Start the Resource Manager. As it is specific to each RM, this methods
304         just change, after some verifications, the status to STARTED and save the time.
305
306         """
307         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
308             self.error("Wrong state %s for start" % self.state)
309             return
310
311         self._start_time = strfnow()
312         self._state = ResourceState.STARTED
313
314     def stop(self):
315         """ Stop the Resource Manager. As it is specific to each RM, this methods
316         just change, after some verifications, the status to STOPPED and save the time.
317
318         """
319         if not self._state in [ResourceState.STARTED]:
320             self.error("Wrong state %s for stop" % self.state)
321             return
322
323         self._stop_time = strfnow()
324         self._state = ResourceState.STOPPED
325
326     def set(self, name, value):
327         """ Set the value of the attribute
328
329         :param name: Name of the attribute
330         :type name: str
331         :param name: Value of the attribute
332         :type name: str
333         """
334         attr = self._attrs[name]
335         attr.value = value
336
337     def get(self, name):
338         """ Start the Resource Manager
339
340         :param name: Name of the attribute
341         :type name: str
342         :rtype: str
343         """
344         attr = self._attrs[name]
345         return attr.value
346
347     def register_trace(self, name):
348         """ Enable trace
349
350         :param name: Name of the trace
351         :type name: str
352         """
353         trace = self._trcs[name]
354         trace.enabled = True
355
356     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
357         """ Get information on collected trace
358
359         :param name: Name of the trace
360         :type name: str
361
362         :param attr: Can be one of:
363                          - TraceAttr.ALL (complete trace content), 
364                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
365                          - TraceAttr.PATH (full path to the trace file),
366                          - TraceAttr.SIZE (size of trace file). 
367         :type attr: str
368
369         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
370         :type name: int
371
372         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
373         :type name: int
374
375         :rtype: str
376         """
377         pass
378
379     def register_condition(self, action, group, state, 
380             time = None):
381         """ Registers a condition on the resource manager to allow execution 
382         of 'action' only after 'time' has elapsed from the moment all resources 
383         in 'group' reached state 'state'
384
385         :param action: Action to restrict to condition (either 'START' or 'STOP')
386         :type action: str
387         :param group: Group of RMs to wait for (list of guids)
388         :type group: int or list of int
389         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
390         :type state: str
391         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
392         :type time: str
393
394         """
395         conditions = self.conditions.get(action)
396         if not conditions:
397             conditions = list()
398             self._conditions[action] = conditions
399
400         # For each condition to register a tuple of (group, state, time) is 
401         # added to the 'action' list
402         if not isinstance(group, list):
403             group = [group]
404
405         conditions.append((group, state, time))
406
407     def get_connected(self, rtype):
408         """ Return the list of RM with the type 'rtype'
409
410         :param rtype: Type of the RM we look for
411         :type rtype: str
412         :return: list of guid
413         """
414         connected = []
415         for guid in self.connections:
416             rm = self.ec.get_resource(guid)
417             if rm.rtype() == rtype:
418                 connected.append(rm)
419         return connected
420
421     def _needs_reschedule(self, group, state, time):
422         """ Internal method that verify if 'time' has elapsed since 
423         all elements in 'group' have reached state 'state'.
424
425         :param group: Group of RMs to wait for (list of guids)
426         :type group: int or list of int
427         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
428         :type state: str
429         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
430         :type time: str
431
432         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
433         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
434         For the moment, 2m30s is not a correct syntax.
435
436         """
437         reschedule = False
438         delay = reschedule_delay 
439
440         # check state and time elapsed on all RMs
441         for guid in group:
442             rm = self.ec.get_resource(guid)
443             # If the RM state is lower than the requested state we must
444             # reschedule (e.g. if RM is READY but we required STARTED)
445             if rm.state < state:
446                 reschedule = True
447                 break
448
449             # If there is a time restriction, we must verify the
450             # restriction is satisfied 
451             if time:
452                 if state == ResourceState.DISCOVERED:
453                     t = rm.discover_time
454                 if state == ResourceState.PROVISIONED:
455                     t = rm.provision_time
456                 elif state == ResourceState.READY:
457                     t = rm.ready_time
458                 elif state == ResourceState.STARTED:
459                     t = rm.start_time
460                 elif state == ResourceState.STOPPED:
461                     t = rm.stop_time
462                 else:
463                     # Only keep time information for START and STOP
464                     break
465
466                 d = strfdiff(strfnow(), t)
467                 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
468                 if wait > 0.001:
469                     reschedule = True
470                     delay = "%fs" % wait
471                     break
472         return reschedule, delay
473
474     def set_with_conditions(self, name, value, group, state, time):
475         """ Set value 'value' on attribute with name 'name' when 'time' 
476         has elapsed since all elements in 'group' have reached state
477         'state'
478
479         :param name: Name of the attribute to set
480         :type name: str
481         :param name: Value of the attribute to set
482         :type name: str
483         :param group: Group of RMs to wait for (list of guids)
484         :type group: int or list of int
485         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
486         :type state: str
487         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
488         :type time: str
489         """
490
491         reschedule = False
492         delay = reschedule_delay 
493
494         ## evaluate if set conditions are met
495
496         # only can set with conditions after the RM is started
497         if self.state != ResourceState.STARTED:
498             reschedule = True
499         else:
500             reschedule, delay = self._needs_reschedule(group, state, time)
501
502         if reschedule:
503             callback = functools.partial(self.set_with_conditions, 
504                     name, value, group, state, time)
505             self.ec.schedule(delay, callback)
506         else:
507             self.set(name, value)
508
509     def start_with_conditions(self):
510         """ Starts RM when all the conditions in self.conditions for
511         action 'START' are satisfied.
512
513         """
514         reschedule = False
515         delay = reschedule_delay 
516
517         ## evaluate if set conditions are met
518
519         # only can start when RM is either STOPPED or READY
520         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
521             reschedule = True
522             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
523         else:
524             start_conditions = self.conditions.get(ResourceAction.START, [])
525             
526             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
527             
528             # Verify all start conditions are met
529             for (group, state, time) in start_conditions:
530                 # Uncomment for debug
531                 #unmet = []
532                 #for guid in group:
533                 #    rm = self.ec.get_resource(guid)
534                 #    unmet.append((guid, rm._state))
535                 #
536                 #self.debug("---- WAITED STATES ---- %s" % unmet )
537
538                 reschedule, delay = self._needs_reschedule(group, state, time)
539                 if reschedule:
540                     break
541
542         if reschedule:
543             self.ec.schedule(delay, self.start_with_conditions)
544         else:
545             self.debug("----- STARTING ---- ")
546             self.start()
547
548     def stop_with_conditions(self):
549         """ Stops RM when all the conditions in self.conditions for
550         action 'STOP' are satisfied.
551
552         """
553         reschedule = False
554         delay = reschedule_delay 
555
556         ## evaluate if set conditions are met
557
558         # only can stop when RM is STARTED
559         if self.state != ResourceState.STARTED:
560             reschedule = True
561         else:
562             self.debug(" ---- STOP CONDITIONS ---- %s" % 
563                     self.conditions.get(ResourceAction.STOP))
564
565             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
566             for (group, state, time) in stop_conditions:
567                 reschedule, delay = self._needs_reschedule(group, state, time)
568                 if reschedule:
569                     break
570
571         if reschedule:
572             callback = functools.partial(self.stop_with_conditions)
573             self.ec.schedule(delay, callback)
574         else:
575             self.debug(" ----- STOPPING ---- ") 
576             self.stop()
577
578     def deploy(self):
579         """ Execute all steps required for the RM to reach the state READY
580
581         """
582         if self._state > ResourceState.READY:
583             self.error("Wrong state %s for deploy" % self.state)
584             return
585
586         self.debug("----- READY ---- ")
587         self._ready_time = strfnow()
588         self._state = ResourceState.READY
589
590     def release(self):
591         """Clean the resource at the end of the Experiment and change the status
592
593         """
594         self._release_time = strfnow()
595         self._state = ResourceState.RELEASED
596
597     def valid_connection(self, guid):
598         """Check if the connection is available. This method need to be 
599         redefined by each new Resource Manager.
600
601         :param guid: Guid of the current Resource Manager
602         :type guid: int
603         :rtype:  Boolean
604
605         """
606         # TODO: Validate!
607         return True
608
609 class ResourceFactory(object):
610     _resource_types = dict()
611
612     @classmethod
613     def resource_types(cls):
614         """Return the type of the Class"""
615         return cls._resource_types
616
617     @classmethod
618     def register_type(cls, rclass):
619         """Register a new Ressource Manager"""
620         cls._resource_types[rclass.rtype()] = rclass
621
622     @classmethod
623     def create(cls, rtype, ec, guid):
624         """Create a new instance of a Ressource Manager"""
625         rclass = cls._resource_types[rtype]
626         return rclass(ec, guid)
627
628 def populate_factory():
629     """Register all the possible RM that exists in the current version of Nepi.
630     """
631     for rclass in find_types():
632         ResourceFactory.register_type(rclass)
633
634 def find_types():
635     """Look into the different folders to find all the 
636     availables Resources Managers
637
638     """
639     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
640     search_path = set(search_path.split(" "))
641    
642     import inspect
643     import nepi.resources 
644     path = os.path.dirname(nepi.resources.__file__)
645     search_path.add(path)
646
647     types = []
648
649     for importer, modname, ispkg in pkgutil.walk_packages(search_path):
650         loader = importer.find_module(modname)
651         try:
652             module = loader.load_module(loader.fullname)
653             for attrname in dir(module):
654                 if attrname.startswith("_"):
655                     continue
656
657                 attr = getattr(module, attrname)
658
659                 if attr == ResourceManager:
660                     continue
661
662                 if not inspect.isclass(attr):
663                     continue
664
665                 if issubclass(attr, ResourceManager):
666                     types.append(attr)
667         except:
668             import traceback
669             import logging
670             err = traceback.format_exc()
671             logger = logging.getLogger("Resource.find_types()")
672             logger.error("Error while lading Resource Managers %s" % err)
673
674     return types
675
676