Bugfixing LinuxNode and LinuxApplication
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
23
24 import copy
25 import functools
26 import logging
27 import os
28 import pkgutil
29 import weakref
30
31 reschedule_delay = "0.5s"
32
33 class ResourceAction:
34     """ Action that a user can order to a Resource Manager
35    
36     """
37     DEPLOY = 0
38     START = 1
39     STOP = 2
40
41 class ResourceState:
42     """ State of a Resource Manager
43    
44     """
45     NEW = 0
46     DISCOVERED = 1
47     PROVISIONED = 2
48     READY = 3
49     STARTED = 4
50     STOPPED = 5
51     FINISHED = 6
52     FAILED = 7
53     RELEASED = 8
54
55 ResourceState2str = dict({
56     ResourceState.NEW : "NEW",
57     ResourceState.DISCOVERED : "DISCOVERED",
58     ResourceState.PROVISIONED : "PROVISIONED",
59     ResourceState.READY : "READY",
60     ResourceState.STARTED : "STARTED",
61     ResourceState.STOPPED : "STOPPED",
62     ResourceState.FINISHED : "FINISHED",
63     ResourceState.FAILED : "FAILED",
64     ResourceState.RELEASED : "RELEASED",
65     })
66
67 def clsinit(cls):
68     """ Initializes template information (i.e. attributes and traces)
69     for the ResourceManager class
70     """
71     cls._clsinit()
72     return cls
73
74 def clsinit_copy(cls):
75     """ Initializes template information (i.e. attributes and traces)
76     for the ResourceManager class, inheriting attributes and traces
77     from the parent class
78     """
79     cls._clsinit_copy()
80     return cls
81
82 # Decorator to invoke class initialization method
83 @clsinit
84 class ResourceManager(Logger):
85     _rtype = "Resource"
86     _attributes = None
87     _traces = None
88
89     @classmethod
90     def _register_attribute(cls, attr):
91         """ Resource subclasses will invoke this method to add a 
92         resource attribute
93
94         """
95         cls._attributes[attr.name] = attr
96
97     @classmethod
98     def _remove_attribute(cls, name):
99         """ Resource subclasses will invoke this method to remove a 
100         resource attribute
101
102         """
103         del cls._attributes[name]
104
105     @classmethod
106     def _register_trace(cls, trace):
107         """ Resource subclasses will invoke this method to add a 
108         resource trace
109
110         """
111         cls._traces[trace.name] = trace
112
113     @classmethod
114     def _remove_trace(cls, name):
115         """ Resource subclasses will invoke this method to remove a 
116         resource trace
117
118         """
119         del cls._traces[name]
120
121     @classmethod
122     def _register_attributes(cls):
123         """ Resource subclasses will invoke this method to register
124         resource attributes
125
126         """
127         pass
128
129     @classmethod
130     def _register_traces(cls):
131         """ Resource subclasses will invoke this method to register
132         resource traces
133
134         """
135         pass
136
137     @classmethod
138     def _clsinit(cls):
139         """ ResourceManager child classes have different attributes and traces.
140         Since the templates that hold the information of attributes and traces
141         are 'class attribute' dictionaries, initially they all point to the 
142         parent class ResourceManager instances of those dictionaries. 
143         In order to make these templates independent from the parent's one,
144         it is necessary re-initialize the corresponding dictionaries. 
145         This is the objective of the _clsinit method
146         """
147         # static template for resource attributes
148         cls._attributes = dict()
149         cls._register_attributes()
150
151         # static template for resource traces
152         cls._traces = dict()
153         cls._register_traces()
154
155     @classmethod
156     def _clsinit_copy(cls):
157         """ Same as _clsinit, except that it also inherits all attributes and traces
158         from the parent class.
159         """
160         # static template for resource attributes
161         cls._attributes = copy.deepcopy(cls._attributes)
162         cls._register_attributes()
163
164         # static template for resource traces
165         cls._traces = copy.deepcopy(cls._traces)
166         cls._register_traces()
167
168     @classmethod
169     def rtype(cls):
170         """ Returns the type of the Resource Manager
171
172         """
173         return cls._rtype
174
175     @classmethod
176     def get_attributes(cls):
177         """ Returns a copy of the attributes
178
179         """
180         return copy.deepcopy(cls._attributes.values())
181
182     @classmethod
183     def get_traces(cls):
184         """ Returns a copy of the traces
185
186         """
187         return copy.deepcopy(cls._traces.values())
188
189     def __init__(self, ec, guid):
190         super(ResourceManager, self).__init__(self.rtype())
191         
192         self._guid = guid
193         self._ec = weakref.ref(ec)
194         self._connections = set()
195         self._conditions = dict() 
196
197         # the resource instance gets a copy of all attributes
198         self._attrs = copy.deepcopy(self._attributes)
199
200         # the resource instance gets a copy of all traces
201         self._trcs = copy.deepcopy(self._traces)
202
203         self._state = ResourceState.NEW
204
205         self._start_time = None
206         self._stop_time = None
207         self._discover_time = None
208         self._provision_time = None
209         self._ready_time = None
210         self._release_time = None
211
212     @property
213     def guid(self):
214         """ Returns the guid of the current RM """
215         return self._guid
216
217     @property
218     def ec(self):
219         """ Returns the Experiment Controller """
220         return self._ec()
221
222     @property
223     def connections(self):
224         """ Returns the set of connection for this RM"""
225         return self._connections
226
227     @property
228     def conditions(self):
229         """ Returns the list of conditions for this RM
230         The list is a dictionary with for each action, a list of tuple 
231         describing the conditions. """
232         return self._conditions
233
234     @property
235     def start_time(self):
236         """ Returns timestamp with the time the RM started """
237         return self._start_time
238
239     @property
240     def stop_time(self):
241         """ Returns timestamp with the time the RM stopped """
242         return self._stop_time
243
244     @property
245     def discover_time(self):
246         """ Returns timestamp with the time the RM passed to state discovered """
247         return self._discover_time
248
249     @property
250     def provision_time(self):
251         """ Returns timestamp with the time the RM passed to state provisioned """
252         return self._provision_time
253
254     @property
255     def ready_time(self):
256         """ Returns timestamp with the time the RM passed to state ready  """
257         return self._ready_time
258
259     @property
260     def release_time(self):
261         """ Returns timestamp with the time the RM was released """
262         return self._release_time
263
264     @property
265     def state(self):
266         """ Get the state of the current RM """
267         return self._state
268
269     def log_message(self, msg):
270         """ Improve debugging message by adding more information 
271             as the guid and the type of the RM
272
273         :param msg: Message to log
274         :type msg: str
275         :rtype: str
276         """
277         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
278
279     def connect(self, guid):
280         """ Connect the current RM with the RM 'guid'
281
282         :param guid: Guid of the RM the current RM will be connected
283         :type guid: int
284         """
285         if self.valid_connection(guid):
286             self._connections.add(guid)
287
288     def discover(self):
289         """ Discover the Resource. As it is specific for each RM, 
290         this method take the time when the RM become DISCOVERED and
291         change the status """
292         self._discover_time = strfnow()
293         self._state = ResourceState.DISCOVERED
294
295     def provision(self):
296         """ Provision the Resource. As it is specific for each RM, 
297         this method take the time when the RM become PROVISIONNED and
298         change the status """
299         self._provision_time = strfnow()
300         self._state = ResourceState.PROVISIONED
301
302     def start(self):
303         """ Start the Resource Manager. As it is specific to each RM, this methods
304         just change, after some verifications, the status to STARTED and save the time.
305
306         """
307         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
308             self.error("Wrong state %s for start" % self.state)
309             return
310
311         self._start_time = strfnow()
312         self._state = ResourceState.STARTED
313
314     def stop(self):
315         """ Stop the Resource Manager. As it is specific to each RM, this methods
316         just change, after some verifications, the status to STOPPED and save the time.
317
318         """
319         if not self._state in [ResourceState.STARTED]:
320             self.error("Wrong state %s for stop" % self.state)
321             return
322
323         self._stop_time = strfnow()
324         self._state = ResourceState.STOPPED
325
326     def set(self, name, value):
327         """ Set the value of the attribute
328
329         :param name: Name of the attribute
330         :type name: str
331         :param name: Value of the attribute
332         :type name: str
333         """
334         attr = self._attrs[name]
335         attr.value = value
336
337     def get(self, name):
338         """ Start the Resource Manager
339
340         :param name: Name of the attribute
341         :type name: str
342         :rtype: str
343         """
344         attr = self._attrs[name]
345         return attr.value
346
347     def register_trace(self, name):
348         """ Enable trace
349
350         :param name: Name of the trace
351         :type name: str
352         """
353         trace = self._trcs[name]
354         trace.enabled = True
355
356     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
357         """ Get information on collected trace
358
359         :param name: Name of the trace
360         :type name: str
361
362         :param attr: Can be one of:
363                          - TraceAttr.ALL (complete trace content), 
364                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
365                          - TraceAttr.PATH (full path to the trace file),
366                          - TraceAttr.SIZE (size of trace file). 
367         :type attr: str
368
369         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
370         :type name: int
371
372         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
373         :type name: int
374
375         :rtype: str
376         """
377         pass
378
379     def register_condition(self, action, group, state, 
380             time = None):
381         """ Registers a condition on the resource manager to allow execution 
382         of 'action' only after 'time' has elapsed from the moment all resources 
383         in 'group' reached state 'state'
384
385         :param action: Action to restrict to condition (either 'START' or 'STOP')
386         :type action: str
387         :param group: Group of RMs to wait for (list of guids)
388         :type group: int or list of int
389         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
390         :type state: str
391         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
392         :type time: str
393
394         """
395         conditions = self.conditions.get(action)
396         if not conditions:
397             conditions = list()
398             self._conditions[action] = conditions
399
400         # For each condition to register a tuple of (group, state, time) is 
401         # added to the 'action' list
402         if not isinstance(group, list):
403             group = [group]
404
405         conditions.append((group, state, time))
406
407     def get_connected(self, rtype):
408         """ Return the list of RM with the type 'rtype' 
409
410         :param rtype: Type of the RM we look for
411         :type rtype: str
412         :return : list of guid
413         """
414         connected = []
415         for guid in self.connections:
416             rm = self.ec.get_resource(guid)
417             if rm.rtype() == rtype:
418                 connected.append(rm)
419         return connected
420
421     def _needs_reschedule(self, group, state, time):
422         """ Internal method that verify if 'time' has elapsed since 
423         all elements in 'group' have reached state 'state'.
424
425         :param group: Group of RMs to wait for (list of guids)
426         :type group: int or list of int
427         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
428         :type state: str
429         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
430         :type time: str
431
432         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
433         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
434         For the moment, 2m30s is not a correct syntax.
435
436         """
437         reschedule = False
438         delay = reschedule_delay 
439
440         # check state and time elapsed on all RMs
441         for guid in group:
442             rm = self.ec.get_resource(guid)
443             # If the RM state is lower than the requested state we must
444             # reschedule (e.g. if RM is READY but we required STARTED)
445             if rm.state < state:
446                 reschedule = True
447                 break
448
449             # If there is a time restriction, we must verify the
450             # restriction is satisfied 
451             if time:
452                 if state == ResourceState.DISCOVERED:
453                     t = rm.discover_time
454                 if state == ResourceState.PROVISIONED:
455                     t = rm.provision_time
456                 elif state == ResourceState.READY:
457                     t = rm.ready_time
458                 elif state == ResourceState.STARTED:
459                     t = rm.start_time
460                 elif state == ResourceState.STOPPED:
461                     t = rm.stop_time
462                 else:
463                     # Only keep time information for START and STOP
464                     break
465
466                 d = strfdiff(strfnow(), t)
467                 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
468                 if wait > 0.001:
469                     reschedule = True
470                     delay = "%fs" % wait
471                     break
472         return reschedule, delay
473
474     def set_with_conditions(self, name, value, group, state, time):
475         """ Set value 'value' on attribute with name 'name' when 'time' 
476             has elapsed since all elements in 'group' have reached state
477            'state'
478
479         :param name: Name of the attribute to set
480         :type name: str
481         :param name: Value of the attribute to set
482         :type name: str
483         :param group: Group of RMs to wait for (list of guids)
484         :type group: int or list of int
485         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
486         :type state: str
487         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
488         :type time: str
489
490         """
491
492         reschedule = False
493         delay = reschedule_delay 
494
495         ## evaluate if set conditions are met
496
497         # only can set with conditions after the RM is started
498         if self.state != ResourceState.STARTED:
499             reschedule = True
500         else:
501             reschedule, delay = self._needs_reschedule(group, state, time)
502
503         if reschedule:
504             callback = functools.partial(self.set_with_conditions, 
505                     name, value, group, state, time)
506             self.ec.schedule(delay, callback)
507         else:
508             self.set(name, value)
509
510     def start_with_conditions(self):
511         """ Starts RM when all the conditions in self.conditions for
512         action 'START' are satisfied.
513
514         """
515         reschedule = False
516         delay = reschedule_delay 
517
518         ## evaluate if set conditions are met
519
520         # only can start when RM is either STOPPED or READY
521         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
522             reschedule = True
523             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
524         else:
525             start_conditions = self.conditions.get(ResourceAction.START, [])
526             
527             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
528             
529             # Verify all start conditions are met
530             for (group, state, time) in start_conditions:
531                 # Uncomment for debug
532                 #unmet = []
533                 #for guid in group:
534                 #    rm = self.ec.get_resource(guid)
535                 #    unmet.append((guid, rm._state))
536                 #
537                 #self.debug("---- WAITED STATES ---- %s" % unmet )
538
539                 reschedule, delay = self._needs_reschedule(group, state, time)
540                 if reschedule:
541                     break
542
543         if reschedule:
544             self.ec.schedule(delay, self.start_with_conditions)
545         else:
546             self.debug("----- STARTING ---- ")
547             self.start()
548
549     def stop_with_conditions(self):
550         """ Stops RM when all the conditions in self.conditions for
551         action 'STOP' are satisfied.
552
553         """
554         reschedule = False
555         delay = reschedule_delay 
556
557         ## evaluate if set conditions are met
558
559         # only can stop when RM is STARTED
560         if self.state != ResourceState.STARTED:
561             reschedule = True
562         else:
563             self.debug(" ---- STOP CONDITIONS ---- %s" % 
564                     self.conditions.get(ResourceAction.STOP))
565
566             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
567             for (group, state, time) in stop_conditions:
568                 reschedule, delay = self._needs_reschedule(group, state, time)
569                 if reschedule:
570                     break
571
572         if reschedule:
573             callback = functools.partial(self.stop_with_conditions)
574             self.ec.schedule(delay, callback)
575         else:
576             self.debug(" ----- STOPPING ---- ") 
577             self.stop()
578
579     def deploy(self):
580         """ Execute all steps required for the RM to reach the state READY
581
582         """
583         if self._state > ResourceState.READY:
584             self.error("Wrong state %s for deploy" % self.state)
585             return
586
587         self.debug("----- READY ---- ")
588         self._ready_time = strfnow()
589         self._state = ResourceState.READY
590
591     def release(self):
592         """Clean the resource at the end of the Experiment and change the status
593
594         """
595         self._release_time = strfnow()
596         self._state = ResourceState.RELEASED
597
598     def valid_connection(self, guid):
599         """Check if the connection is available. This method need to be 
600         redefined by each new Resource Manager.
601
602         :param guid: Guid of the current Resource Manager
603         :type guid: int
604         :rtype:  Boolean
605
606         """
607         # TODO: Validate!
608         return True
609
610 class ResourceFactory(object):
611     _resource_types = dict()
612
613     @classmethod
614     def resource_types(cls):
615         """Return the type of the Class"""
616         return cls._resource_types
617
618     @classmethod
619     def register_type(cls, rclass):
620         """Register a new Ressource Manager"""
621         cls._resource_types[rclass.rtype()] = rclass
622
623     @classmethod
624     def create(cls, rtype, ec, guid):
625         """Create a new instance of a Ressource Manager"""
626         rclass = cls._resource_types[rtype]
627         return rclass(ec, guid)
628
629 def populate_factory():
630     """Register all the possible RM that exists in the current version of Nepi.
631     """
632     for rclass in find_types():
633         ResourceFactory.register_type(rclass)
634
635 def find_types():
636     """Look into the different folders to find all the 
637     availables Resources Managers
638
639     """
640     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
641     search_path = set(search_path.split(" "))
642    
643     import inspect
644     import nepi.resources 
645     path = os.path.dirname(nepi.resources.__file__)
646     search_path.add(path)
647
648     types = []
649
650     for importer, modname, ispkg in pkgutil.walk_packages(search_path):
651         loader = importer.find_module(modname)
652         try:
653             module = loader.load_module(loader.fullname)
654             for attrname in dir(module):
655                 if attrname.startswith("_"):
656                     continue
657
658                 attr = getattr(module, attrname)
659
660                 if attr == ResourceManager:
661                     continue
662
663                 if not inspect.isclass(attr):
664                     continue
665
666                 if issubclass(attr, ResourceManager):
667                     types.append(attr)
668         except:
669             import traceback
670             import logging
671             err = traceback.format_exc()
672             logger = logging.getLogger("Resource.find_types()")
673             logger.error("Error while lading Resource Managers %s" % err)
674
675     return types
676
677