BugFix in LinuxNode unit test
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
21 from nepi.util.logger import Logger
22 from nepi.execution.trace import TraceAttr
23
24 import copy
25 import functools
26 import logging
27 import os
28 import pkgutil
29 import weakref
30
31 reschedule_delay = "0.5s"
32
33 class ResourceAction:
34     DEPLOY = 0
35     START = 1
36     STOP = 2
37
38 class ResourceState:
39     NEW = 0
40     DISCOVERED = 1
41     PROVISIONED = 2
42     READY = 3
43     STARTED = 4
44     STOPPED = 5
45     FINISHED = 6
46     FAILED = 7
47     RELEASED = 8
48
49 ResourceState2str = dict({
50     ResourceState.NEW : "NEW",
51     ResourceState.DISCOVERED : "DISCOVERED",
52     ResourceState.PROVISIONED : "PROVISIONED",
53     ResourceState.READY : "READY",
54     ResourceState.STARTED : "STARTED",
55     ResourceState.STOPPED : "STOPPED",
56     ResourceState.FINISHED : "FINISHED",
57     ResourceState.FAILED : "FAILED",
58     ResourceState.RELEASED : "RELEASED",
59     })
60
61 def clsinit(cls):
62     """ Initializes template information (i.e. attributes and traces)
63     for the ResourceManager class
64     """
65     cls._clsinit()
66     return cls
67
68 def clsinit_copy(cls):
69     """ Initializes template information (i.e. attributes and traces)
70     for the ResourceManager class, inheriting attributes and traces
71     from the parent class
72     """
73     cls._clsinit_copy()
74     return cls
75
76 # Decorator to invoke class initialization method
77 @clsinit
78 class ResourceManager(Logger):
79     _rtype = "Resource"
80     _attributes = None
81     _traces = None
82
83     @classmethod
84     def _register_attribute(cls, attr):
85         """ Resource subclasses will invoke this method to add a 
86         resource attribute
87
88         """
89         cls._attributes[attr.name] = attr
90
91     @classmethod
92     def _remove_attribute(cls, name):
93         """ Resource subclasses will invoke this method to remove a 
94         resource attribute
95
96         """
97         del cls._attributes[name]
98
99     @classmethod
100     def _register_trace(cls, trace):
101         """ Resource subclasses will invoke this method to add a 
102         resource trace
103
104         """
105         cls._traces[trace.name] = trace
106
107     @classmethod
108     def _remove_trace(cls, name):
109         """ Resource subclasses will invoke this method to remove a 
110         resource trace
111
112         """
113         del cls._traces[name]
114
115     @classmethod
116     def _register_attributes(cls):
117         """ Resource subclasses will invoke this method to register
118         resource attributes
119
120         """
121         pass
122
123     @classmethod
124     def _register_traces(cls):
125         """ Resource subclasses will invoke this method to register
126         resource traces
127
128         """
129         pass
130
131     @classmethod
132     def _clsinit(cls):
133         """ ResourceManager child classes have different attributes and traces.
134         Since the templates that hold the information of attributes and traces
135         are 'class attribute' dictionaries, initially they all point to the 
136         parent class ResourceManager instances of those dictionaries. 
137         In order to make these templates independent from the parent's one,
138         it is necessary re-initialize the corresponding dictionaries. 
139         This is the objective of the _clsinit method
140         """
141         # static template for resource attributes
142         cls._attributes = dict()
143         cls._register_attributes()
144
145         # static template for resource traces
146         cls._traces = dict()
147         cls._register_traces()
148
149     @classmethod
150     def _clsinit_copy(cls):
151         """ Same as _clsinit, except that it also inherits all attributes and traces
152         from the parent class.
153         """
154         # static template for resource attributes
155         cls._attributes = copy.deepcopy(cls._attributes)
156         cls._register_attributes()
157
158         # static template for resource traces
159         cls._traces = copy.deepcopy(cls._traces)
160         cls._register_traces()
161
162     @classmethod
163     def rtype(cls):
164         return cls._rtype
165
166     @classmethod
167     def get_attributes(cls):
168         """ Returns a copy of the attributes
169
170         """
171         return copy.deepcopy(cls._attributes.values())
172
173     @classmethod
174     def get_traces(cls):
175         """ Returns a copy of the traces
176
177         """
178         return copy.deepcopy(cls._traces.values())
179
180     def __init__(self, ec, guid):
181         super(ResourceManager, self).__init__(self.rtype())
182         
183         self._guid = guid
184         self._ec = weakref.ref(ec)
185         self._connections = set()
186         self._conditions = dict() 
187
188         # the resource instance gets a copy of all attributes
189         self._attrs = copy.deepcopy(self._attributes)
190
191         # the resource instance gets a copy of all traces
192         self._trcs = copy.deepcopy(self._traces)
193
194         self._state = ResourceState.NEW
195
196         self._start_time = None
197         self._stop_time = None
198         self._discover_time = None
199         self._provision_time = None
200         self._ready_time = None
201         self._release_time = None
202
203     @property
204     def guid(self):
205         return self._guid
206
207     @property
208     def ec(self):
209         return self._ec()
210
211     @property
212     def connections(self):
213         return self._connections
214
215     @property
216     def conditions(self):
217         return self._conditions
218
219     @property
220     def start_time(self):
221         """ Returns timestamp with the time the RM started """
222         return self._start_time
223
224     @property
225     def stop_time(self):
226         """ Returns timestamp with the time the RM stopped """
227         return self._stop_time
228
229     @property
230     def discover_time(self):
231         """ Returns timestamp with the time the RM passed to state discovered """
232         return self._discover_time
233
234     @property
235     def provision_time(self):
236         """ Returns timestamp with the time the RM passed to state provisioned """
237         return self._provision_time
238
239     @property
240     def ready_time(self):
241         """ Returns timestamp with the time the RM passed to state ready  """
242         return self._ready_time
243
244     @property
245     def release_time(self):
246         """ Returns timestamp with the time the RM was released """
247         return self._release_time
248
249     @property
250     def state(self):
251         return self._state
252
253     def log_message(self, msg):
254         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
255
256     def connect(self, guid):
257         if self.valid_connection(guid):
258             self._connections.add(guid)
259
260     def discover(self):
261         self._discover_time = strfnow()
262         self._state = ResourceState.DISCOVERED
263
264     def provision(self):
265         self._provision_time = strfnow()
266         self._state = ResourceState.PROVISIONED
267
268     def start(self):
269         """ Start the Resource Manager
270
271         """
272         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
273             self.error("Wrong state %s for start" % self.state)
274             return
275
276         self._start_time = strfnow()
277         self._state = ResourceState.STARTED
278
279     def stop(self):
280         """ Start the Resource Manager
281
282         """
283         if not self._state in [ResourceState.STARTED]:
284             self.error("Wrong state %s for stop" % self.state)
285             return
286
287         self._stop_time = strfnow()
288         self._state = ResourceState.STOPPED
289
290     def set(self, name, value):
291         """ Set the value of the attribute
292
293         :param name: Name of the attribute
294         :type name: str
295         :param name: Value of the attribute
296         :type name: str
297         """
298         attr = self._attrs[name]
299         attr.value = value
300
301     def get(self, name):
302         """ Start the Resource Manager
303
304         :param name: Name of the attribute
305         :type name: str
306         :rtype: str
307         """
308         attr = self._attrs[name]
309         return attr.value
310
311     def register_trace(self, name):
312         """ Enable trace
313
314         :param name: Name of the trace
315         :type name: str
316         """
317         trace = self._trcs[name]
318         trace.enabled = True
319
320     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
321         """ Get information on collected trace
322
323         :param name: Name of the trace
324         :type name: str
325
326         :param attr: Can be one of:
327                          - TraceAttr.ALL (complete trace content), 
328                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
329                          - TraceAttr.PATH (full path to the trace file),
330                          - TraceAttr.SIZE (size of trace file). 
331         :type attr: str
332
333         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
334         :type name: int
335
336         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
337         :type name: int
338
339         :rtype: str
340         """
341         pass
342
343     def register_condition(self, action, group, state, 
344             time = None):
345         """ Registers a condition on the resource manager to allow execution 
346         of 'action' only after 'time' has elapsed from the moment all resources 
347         in 'group' reached state 'state'
348
349         :param action: Action to restrict to condition (either 'START' or 'STOP')
350         :type action: str
351         :param group: Group of RMs to wait for (list of guids)
352         :type group: int or list of int
353         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
354         :type state: str
355         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
356         :type time: str
357
358         """
359         conditions = self.conditions.get(action)
360         if not conditions:
361             conditions = list()
362             self._conditions[action] = conditions
363
364         # For each condition to register a tuple of (group, state, time) is 
365         # added to the 'action' list
366         if not isinstance(group, list):
367             group = [group]
368
369         conditions.append((group, state, time))
370
371     def get_connected(self, rtype):
372         connected = []
373         for guid in self.connections:
374             rm = self.ec.get_resource(guid)
375             if rm.rtype() == rtype:
376                 connected.append(rm)
377         return connected
378
379     def _needs_reschedule(self, group, state, time):
380         """ Internal method that verify if 'time' has elapsed since 
381         all elements in 'group' have reached state 'state'.
382
383         :param group: Group of RMs to wait for (list of guids)
384         :type group: int or list of int
385         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
386         :type state: str
387         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
388         :type time: str
389
390         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
391         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
392         For the moment, 2m30s is not a correct syntax.
393
394         """
395         reschedule = False
396         delay = reschedule_delay 
397
398         # check state and time elapsed on all RMs
399         for guid in group:
400             rm = self.ec.get_resource(guid)
401             # If the RM state is lower than the requested state we must
402             # reschedule (e.g. if RM is READY but we required STARTED)
403             if rm.state < state:
404                 reschedule = True
405                 break
406
407             # If there is a time restriction, we must verify the
408             # restriction is satisfied 
409             if time:
410                 if state == ResourceState.DISCOVERED:
411                     t = rm.discover_time
412                 if state == ResourceState.PROVISIONED:
413                     t = rm.provision_time
414                 elif state == ResourceState.READY:
415                     t = rm.ready_time
416                 elif state == ResourceState.STARTED:
417                     t = rm.start_time
418                 elif state == ResourceState.STOPPED:
419                     t = rm.stop_time
420                 else:
421                     # Only keep time information for START and STOP
422                     break
423
424                 d = strfdiff(strfnow(), t)
425                 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
426                 if wait > 0.001:
427                     reschedule = True
428                     delay = "%fs" % wait
429                     break
430         return reschedule, delay
431
432     def set_with_conditions(self, name, value, group, state, time):
433         """ Set value 'value' on attribute with name 'name' when 'time' 
434             has elapsed since all elements in 'group' have reached state
435            'state'
436
437         :param name: Name of the attribute to set
438         :type name: str
439         :param name: Value of the attribute to set
440         :type name: str
441         :param group: Group of RMs to wait for (list of guids)
442         :type group: int or list of int
443         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
444         :type state: str
445         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
446         :type time: str
447
448         """
449
450         reschedule = False
451         delay = reschedule_delay 
452
453         ## evaluate if set conditions are met
454
455         # only can set with conditions after the RM is started
456         if self.state != ResourceState.STARTED:
457             reschedule = True
458         else:
459             reschedule, delay = self._needs_reschedule(group, state, time)
460
461         if reschedule:
462             callback = functools.partial(self.set_with_conditions, 
463                     name, value, group, state, time)
464             self.ec.schedule(delay, callback)
465         else:
466             self.set(name, value)
467
468     def start_with_conditions(self):
469         """ Starts RM when all the conditions in self.conditions for
470         action 'START' are satisfied.
471
472         """
473         reschedule = False
474         delay = reschedule_delay 
475
476         ## evaluate if set conditions are met
477
478         # only can start when RM is either STOPPED or READY
479         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
480             reschedule = True
481             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
482         else:
483             start_conditions = self.conditions.get(ResourceAction.START, [])
484             
485             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
486             
487             # Verify all start conditions are met
488             for (group, state, time) in start_conditions:
489                 # Uncomment for debug
490                 #unmet = []
491                 #for guid in group:
492                 #    rm = self.ec.get_resource(guid)
493                 #    unmet.append((guid, rm._state))
494                 #
495                 #self.debug("---- WAITED STATES ---- %s" % unmet )
496
497                 reschedule, delay = self._needs_reschedule(group, state, time)
498                 if reschedule:
499                     break
500
501         if reschedule:
502             self.ec.schedule(delay, self.start_with_conditions)
503         else:
504             self.debug("----- STARTING ---- ")
505             self.start()
506
507     def stop_with_conditions(self):
508         """ Stops RM when all the conditions in self.conditions for
509         action 'STOP' are satisfied.
510
511         """
512         reschedule = False
513         delay = reschedule_delay 
514
515         ## evaluate if set conditions are met
516
517         # only can stop when RM is STARTED
518         if self.state != ResourceState.STARTED:
519             reschedule = True
520         else:
521             self.debug(" ---- STOP CONDITIONS ---- %s" % 
522                     self.conditions.get(ResourceAction.STOP))
523
524             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
525             for (group, state, time) in stop_conditions:
526                 reschedule, delay = self._needs_reschedule(group, state, time)
527                 if reschedule:
528                     break
529
530         if reschedule:
531             callback = functools.partial(self.stop_with_conditions)
532             self.ec.schedule(delay, callback)
533         else:
534             self.debug(" ----- STOPPING ---- ") 
535             self.stop()
536
537     def deploy(self):
538         """ Execute all steps required for the RM to reach the state READY
539
540         """
541         if self._state > ResourceState.READY:
542             self.error("Wrong state %s for deploy" % self.state)
543             return
544
545         self.debug("----- READY ---- ")
546         self._ready_time = strfnow()
547         self._state = ResourceState.READY
548
549     def release(self):
550         """Clean the resource at the end of the Experiment and change the status
551
552         """
553         self._release_time = strfnow()
554         self._state = ResourceState.RELEASED
555
556     def valid_connection(self, guid):
557         """Check if the connection is available.
558
559         :param guid: Guid of the current Resource Manager
560         :type guid: int
561         :rtype:  Boolean
562
563         """
564         # TODO: Validate!
565         return True
566
567 class ResourceFactory(object):
568     _resource_types = dict()
569
570     @classmethod
571     def resource_types(cls):
572         return cls._resource_types
573
574     @classmethod
575     def register_type(cls, rclass):
576         cls._resource_types[rclass.rtype()] = rclass
577
578     @classmethod
579     def create(cls, rtype, ec, guid):
580         rclass = cls._resource_types[rtype]
581         return rclass(ec, guid)
582
583 def populate_factory():
584     for rclass in find_types():
585         ResourceFactory.register_type(rclass)
586
587 def find_types():
588     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
589     search_path = set(search_path.split(" "))
590    
591     import inspect
592     import nepi.resources 
593     path = os.path.dirname(nepi.resources.__file__)
594     search_path.add(path)
595
596     types = []
597
598     for importer, modname, ispkg in pkgutil.walk_packages(search_path):
599         loader = importer.find_module(modname)
600         try:
601             module = loader.load_module(loader.fullname)
602             for attrname in dir(module):
603                 if attrname.startswith("_"):
604                     continue
605
606                 attr = getattr(module, attrname)
607
608                 if attr == ResourceManager:
609                     continue
610
611                 if not inspect.isclass(attr):
612                     continue
613
614                 if issubclass(attr, ResourceManager):
615                     types.append(attr)
616         except:
617             import traceback
618             import logging
619             err = traceback.format_exc()
620             logger = logging.getLogger("Resource.find_types()")
621             logger.error("Error while lading Resource Managers %s" % err)
622
623     return types
624
625