Added LICENSE
[nepi.git] / src / nepi / execution / resource.py
1 """
2     NEPI, a framework to manage network experiments
3     Copyright (C) 2013 INRIA
4
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 """
19
20 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
21 from nepi.execution.trace import TraceAttr
22
23 import copy
24 import functools
25 import inspect
26 import logging
27 import os
28 import pkgutil
29 import weakref
30
31 reschedule_delay = "0.5s"
32
33 class ResourceAction:
34     DEPLOY = 0
35     START = 1
36     STOP = 2
37
38 class ResourceState:
39     NEW = 0
40     DISCOVERED = 1
41     PROVISIONED = 2
42     READY = 3
43     STARTED = 4
44     STOPPED = 5
45     FINISHED = 6
46     FAILED = 7
47     RELEASED = 8
48
49 def clsinit(cls):
50     cls._clsinit()
51     return cls
52
53 # Decorator to invoke class initialization method
54 @clsinit
55 class ResourceManager(object):
56     _rtype = "Resource"
57     _filters = None
58     _attributes = None
59     _traces = None
60
61     @classmethod
62     def _register_filter(cls, attr):
63         """ Resource subclasses will invoke this method to add a 
64         filter attribute
65
66         """
67         cls._filters[attr.name] = attr
68
69     @classmethod
70     def _register_attribute(cls, attr):
71         """ Resource subclasses will invoke this method to add a 
72         resource attribute
73
74         """
75         cls._attributes[attr.name] = attr
76
77     @classmethod
78     def _register_trace(cls, trace):
79         """ Resource subclasses will invoke this method to add a 
80         resource trace
81
82         """
83         cls._traces[trace.name] = trace
84
85
86     @classmethod
87     def _register_filters(cls):
88         """ Resource subclasses will invoke this method to register 
89         resource filters
90
91         """
92         pass
93
94     @classmethod
95     def _register_attributes(cls):
96         """ Resource subclasses will invoke this method to register
97         resource attributes
98
99         """
100         pass
101
102     @classmethod
103     def _register_traces(cls):
104         """ Resource subclasses will invoke this method to register
105         resource traces
106
107         """
108         pass
109
110     @classmethod
111     def _clsinit(cls):
112         """ Create a new dictionnary instance of the dictionnary 
113         with the same template.
114  
115         Each ressource should have the same registration dictionary
116         template with different instances.
117         """
118         # static template for resource filters
119         cls._filters = dict()
120         cls._register_filters()
121
122         # static template for resource attributes
123         cls._attributes = dict()
124         cls._register_attributes()
125
126         # static template for resource traces
127         cls._traces = dict()
128         cls._register_traces()
129
130     @classmethod
131     def rtype(cls):
132         return cls._rtype
133
134     @classmethod
135     def get_filters(cls):
136         """ Returns a copy of the filters
137
138         """
139         return copy.deepcopy(cls._filters.values())
140
141     @classmethod
142     def get_attributes(cls):
143         """ Returns a copy of the attributes
144
145         """
146         return copy.deepcopy(cls._attributes.values())
147
148     @classmethod
149     def get_traces(cls):
150         """ Returns a copy of the traces
151
152         """
153         return copy.deepcopy(cls._traces.values())
154
155     def __init__(self, ec, guid):
156         self._guid = guid
157         self._ec = weakref.ref(ec)
158         self._connections = set()
159         self._conditions = dict() 
160
161         # the resource instance gets a copy of all attributes
162         self._attrs = copy.deepcopy(self._attributes)
163
164         # the resource instance gets a copy of all traces
165         self._trcs = copy.deepcopy(self._traces)
166
167         self._state = ResourceState.NEW
168
169         self._start_time = None
170         self._stop_time = None
171         self._discover_time = None
172         self._provision_time = None
173         self._ready_time = None
174         self._release_time = None
175
176         # Logging
177         self._logger = logging.getLogger("Resource")
178
179     def debug(self, msg, out = None, err = None):
180         self.log(msg, logging.DEBUG, out, err)
181
182     def error(self, msg, out = None, err = None):
183         self.log(msg, logging.ERROR, out, err)
184
185     def warn(self, msg, out = None, err = None):
186         self.log(msg, logging.WARNING, out, err)
187
188     def info(self, msg, out = None, err = None):
189         self.log(msg, logging.INFO, out, err)
190
191     def log(self, msg, level, out = None, err = None):
192         if out:
193             msg += " - OUT: %s " % out
194
195         if err:
196             msg += " - ERROR: %s " % err
197
198         msg = self.log_message(msg)
199
200         self.logger.log(level, msg)
201
202     def log_message(self, msg):
203         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
204
205     @property
206     def logger(self):
207         return self._logger
208
209     @property
210     def guid(self):
211         return self._guid
212
213     @property
214     def ec(self):
215         return self._ec()
216
217     @property
218     def connections(self):
219         return self._connections
220
221     @property
222     def conditions(self):
223         return self._conditions
224
225     @property
226     def start_time(self):
227         """ Returns timestamp with the time the RM started """
228         return self._start_time
229
230     @property
231     def stop_time(self):
232         """ Returns timestamp with the time the RM stopped """
233         return self._stop_time
234
235     @property
236     def discover_time(self):
237         """ Returns timestamp with the time the RM passed to state discovered """
238         return self._discover_time
239
240     @property
241     def provision_time(self):
242         """ Returns timestamp with the time the RM passed to state provisioned """
243         return self._provision_time
244
245     @property
246     def ready_time(self):
247         """ Returns timestamp with the time the RM passed to state ready  """
248         return self._ready_time
249
250     @property
251     def release_time(self):
252         """ Returns timestamp with the time the RM was released """
253         return self._release_time
254
255     @property
256     def state(self):
257         return self._state
258
259     def connect(self, guid):
260         if self.valid_connection(guid):
261             self._connections.add(guid)
262
263     def discover(self, filters = None):
264         self._discover_time = strfnow()
265         self._state = ResourceState.DISCOVERED
266
267     def provision(self, filters = None):
268         self._provision_time = strfnow()
269         self._state = ResourceState.PROVISIONED
270
271     def start(self):
272         """ Start the Resource Manager
273
274         """
275         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
276             self.error("Wrong state %s for start" % self.state)
277             return
278
279         self._start_time = strfnow()
280         self._state = ResourceState.STARTED
281
282     def stop(self):
283         """ Start the Resource Manager
284
285         """
286         if not self._state in [ResourceState.STARTED]:
287             self.error("Wrong state %s for stop" % self.state)
288             return
289
290         self._stop_time = strfnow()
291         self._state = ResourceState.STOPPED
292
293     def set(self, name, value):
294         """ Set the value of the attribute
295
296         :param name: Name of the attribute
297         :type name: str
298         :param name: Value of the attribute
299         :type name: str
300         """
301         attr = self._attrs[name]
302         attr.value = value
303
304     def get(self, name):
305         """ Start the Resource Manager
306
307         :param name: Name of the attribute
308         :type name: str
309         :rtype: str
310         """
311         attr = self._attrs[name]
312         return attr.value
313
314     def register_trace(self, name):
315         """ Enable trace
316
317         :param name: Name of the trace
318         :type name: str
319         """
320         trace = self._trcs[name]
321         trace.enabled = True
322
323     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
324         """ Get information on collected trace
325
326         :param name: Name of the trace
327         :type name: str
328
329         :param attr: Can be one of:
330                          - TraceAttr.ALL (complete trace content), 
331                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
332                          - TraceAttr.PATH (full path to the trace file),
333                          - TraceAttr.SIZE (size of trace file). 
334         :type attr: str
335
336         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
337         :type name: int
338
339         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
340         :type name: int
341
342         :rtype: str
343         """
344         pass
345
346     def register_condition(self, action, group, state, 
347             time = None):
348         """ Registers a condition on the resource manager to allow execution 
349         of 'action' only after 'time' has elapsed from the moment all resources 
350         in 'group' reached state 'state'
351
352         :param action: Action to restrict to condition (either 'START' or 'STOP')
353         :type action: str
354         :param group: Group of RMs to wait for (list of guids)
355         :type group: int or list of int
356         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
357         :type state: str
358         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
359         :type time: str
360
361         """
362         conditions = self.conditions.get(action)
363         if not conditions:
364             conditions = list()
365             self._conditions[action] = conditions
366
367         # For each condition to register a tuple of (group, state, time) is 
368         # added to the 'action' list
369         if not isinstance(group, list):
370             group = [group]
371
372         conditions.append((group, state, time))
373
374     def get_connected(self, rtype):
375         connected = []
376         for guid in self.connections:
377             rm = self.ec.get_resource(guid)
378             if rm.rtype() == rtype:
379                 connected.append(rm)
380         return connected
381
382     def _needs_reschedule(self, group, state, time):
383         """ Internal method that verify if 'time' has elapsed since 
384         all elements in 'group' have reached state 'state'.
385
386         :param group: Group of RMs to wait for (list of guids)
387         :type group: int or list of int
388         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
389         :type state: str
390         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
391         :type time: str
392
393         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
394         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
395         For the moment, 2m30s is not a correct syntax.
396
397         """
398         reschedule = False
399         delay = reschedule_delay 
400
401         # check state and time elapsed on all RMs
402         for guid in group:
403             rm = self.ec.get_resource(guid)
404             # If the RM state is lower than the requested state we must
405             # reschedule (e.g. if RM is READY but we required STARTED)
406             if rm.state < state:
407                 reschedule = True
408                 break
409
410             # If there is a time restriction, we must verify the
411             # restriction is satisfied 
412             if time:
413                 if state == ResourceState.DISCOVERED:
414                     t = rm.discover_time
415                 if state == ResourceState.PROVISIONED:
416                     t = rm.provision_time
417                 elif state == ResourceState.READY:
418                     t = rm.ready_time
419                 elif state == ResourceState.STARTED:
420                     t = rm.start_time
421                 elif state == ResourceState.STOPPED:
422                     t = rm.stop_time
423                 else:
424                     # Only keep time information for START and STOP
425                     break
426
427                 d = strfdiff(strfnow(), t)
428                 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
429                 if wait > 0.001:
430                     reschedule = True
431                     delay = "%fs" % wait
432                     break
433         return reschedule, delay
434
435     def set_with_conditions(self, name, value, group, state, time):
436         """ Set value 'value' on attribute with name 'name' when 'time' 
437             has elapsed since all elements in 'group' have reached state
438            'state'
439
440         :param name: Name of the attribute to set
441         :type name: str
442         :param name: Value of the attribute to set
443         :type name: str
444         :param group: Group of RMs to wait for (list of guids)
445         :type group: int or list of int
446         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
447         :type state: str
448         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
449         :type time: str
450
451         """
452
453         reschedule = False
454         delay = reschedule_delay 
455
456         ## evaluate if set conditions are met
457
458         # only can set with conditions after the RM is started
459         if self.state != ResourceState.STARTED:
460             reschedule = True
461         else:
462             reschedule, delay = self._needs_reschedule(group, state, time)
463
464         if reschedule:
465             callback = functools.partial(self.set_with_conditions, 
466                     name, value, group, state, time)
467             self.ec.schedule(delay, callback)
468         else:
469             self.set(name, value)
470
471     def start_with_conditions(self):
472         """ Starts RM when all the conditions in self.conditions for
473         action 'START' are satisfied.
474
475         """
476         reschedule = False
477         delay = reschedule_delay 
478
479         ## evaluate if set conditions are met
480
481         # only can start when RM is either STOPPED or READY
482         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
483             reschedule = True
484             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
485         else:
486             start_conditions = self.conditions.get(ResourceAction.START, [])
487             
488             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
489             
490             # Verify all start conditions are met
491             for (group, state, time) in start_conditions:
492                 # Uncomment for debug
493                 #unmet = []
494                 #for guid in group:
495                 #    rm = self.ec.get_resource(guid)
496                 #    unmet.append((guid, rm._state))
497                 #
498                 #self.debug("---- WAITED STATES ---- %s" % unmet )
499
500                 reschedule, delay = self._needs_reschedule(group, state, time)
501                 if reschedule:
502                     break
503
504         if reschedule:
505             self.ec.schedule(delay, self.start_with_conditions)
506         else:
507             self.debug("----- STARTING ---- ")
508             self.start()
509
510     def stop_with_conditions(self):
511         """ Stops RM when all the conditions in self.conditions for
512         action 'STOP' are satisfied.
513
514         """
515         reschedule = False
516         delay = reschedule_delay 
517
518         ## evaluate if set conditions are met
519
520         # only can stop when RM is STARTED
521         if self.state != ResourceState.STARTED:
522             reschedule = True
523         else:
524             self.debug(" ---- STOP CONDITIONS ---- %s" % 
525                     self.conditions.get(ResourceAction.STOP))
526
527             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
528             for (group, state, time) in stop_conditions:
529                 reschedule, delay = self._needs_reschedule(group, state, time)
530                 if reschedule:
531                     break
532
533         if reschedule:
534             callback = functools.partial(self.stop_with_conditions)
535             self.ec.schedule(delay, callback)
536         else:
537             self.logger.debug(" ----- STOPPING ---- ") 
538             self.stop()
539
540     def deploy(self):
541         """ Execute all steps required for the RM to reach the state READY
542
543         """
544         if self._state > ResourceState.READY:
545             self.error("Wrong state %s for deploy" % self.state)
546             return
547
548         self.debug("----- DEPLOYING ---- ")
549         self._ready_time = strfnow()
550         self._state = ResourceState.READY
551
552     def release(self):
553         """Clean the resource at the end of the Experiment and change the status
554
555         """
556         self._release_time = strfnow()
557         self._state = ResourceState.RELEASED
558
559     def valid_connection(self, guid):
560         """Check if the connection is available.
561
562         :param guid: Guid of the current Resource Manager
563         :type guid: int
564         :rtype:  Boolean
565
566         """
567         # TODO: Validate!
568         return True
569
570 class ResourceFactory(object):
571     _resource_types = dict()
572
573     @classmethod
574     def resource_types(cls):
575         return cls._resource_types
576
577     @classmethod
578     def register_type(cls, rclass):
579         cls._resource_types[rclass.rtype()] = rclass
580
581     @classmethod
582     def create(cls, rtype, ec, guid):
583         rclass = cls._resource_types[rtype]
584         return rclass(ec, guid)
585
586 def populate_factory():
587     for rclass in find_types():
588         ResourceFactory.register_type(rclass)
589
590 def find_types():
591     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
592     search_path = set(search_path.split(" "))
593    
594     import nepi.resources 
595     path = os.path.dirname(nepi.resources.__file__)
596     search_path.add(path)
597
598     types = []
599
600     for importer, modname, ispkg in pkgutil.walk_packages(search_path):
601         loader = importer.find_module(modname)
602         try:
603             module = loader.load_module(loader.fullname)
604             for attrname in dir(module):
605                 if attrname.startswith("_"):
606                     continue
607
608                 attr = getattr(module, attrname)
609
610                 if attr == ResourceManager:
611                     continue
612
613                 if not inspect.isclass(attr):
614                     continue
615
616                 if issubclass(attr, ResourceManager):
617                     types.append(attr)
618         except:
619             import traceback
620             err = traceback.format_exc()
621             logger = logging.getLogger("Resource.find_types()")
622             logger.error("Error while lading Resource Managers %s" % err)
623
624     return types
625
626