Adding PlanetLab resources
[nepi.git] / src / nepi / execution / resource.py
1 """
2     NEPI, a framework to manage network experiments
3     Copyright (C) 2013 INRIA
4
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 """
19
20 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
21 from nepi.execution.trace import TraceAttr
22
23 import copy
24 import functools
25 import inspect
26 import logging
27 import os
28 import pkgutil
29 import weakref
30
31 reschedule_delay = "0.5s"
32
33 class ResourceAction:
34     DEPLOY = 0
35     START = 1
36     STOP = 2
37
38 class ResourceState:
39     NEW = 0
40     DISCOVERED = 1
41     PROVISIONED = 2
42     READY = 3
43     STARTED = 4
44     STOPPED = 5
45     FINISHED = 6
46     FAILED = 7
47     RELEASED = 8
48
49 ResourceState2str = dict({
50     NEW = "NEW",
51     DISCOVERED = "DISCOVERED",
52     PROVISIONED = "PROVISIONED",
53     READY = "READY",
54     STARTED = "STARTED",
55     STOPPED = "STOPPED",
56     FINISHED = "FINISHED",
57     FAILED = "FAILED",
58     RELEASED = "RELEASED",
59     })
60
61 def clsinit(cls):
62     """ Initializes template information (i.e. attributes and traces)
63     for the ResourceManager class
64     """
65     cls._clsinit()
66     return cls
67
68 def clsinit_copy(cls):
69     """ Initializes template information (i.e. attributes and traces)
70     for the ResourceManager class, inheriting attributes and traces
71     from the parent class
72     """
73     cls._clsinit_copy()
74     return cls
75
76 # Decorator to invoke class initialization method
77 @clsinit
78 class ResourceManager(object):
79     _rtype = "Resource"
80     _attributes = None
81     _traces = None
82
83     @classmethod
84     def _register_attribute(cls, attr):
85         """ Resource subclasses will invoke this method to add a 
86         resource attribute
87
88         """
89         cls._attributes[attr.name] = attr
90
91     @classmethod
92     def _remove_attribute(cls, name):
93         """ Resource subclasses will invoke this method to remove a 
94         resource attribute
95
96         """
97         del cls._attributes[name]
98
99     @classmethod
100     def _register_trace(cls, trace):
101         """ Resource subclasses will invoke this method to add a 
102         resource trace
103
104         """
105         cls._traces[trace.name] = trace
106
107     @classmethod
108     def _remove_trace(cls, name):
109         """ Resource subclasses will invoke this method to remove a 
110         resource trace
111
112         """
113         del cls._traces[name]
114
115     @classmethod
116     def _register_attributes(cls):
117         """ Resource subclasses will invoke this method to register
118         resource attributes
119
120         """
121         pass
122
123     @classmethod
124     def _register_traces(cls):
125         """ Resource subclasses will invoke this method to register
126         resource traces
127
128         """
129         pass
130
131     @classmethod
132     def _clsinit(cls):
133         """ ResourceManager child classes have different attributes and traces.
134         Since the templates that hold the information of attributes and traces
135         are 'class attribute' dictionaries, initially they all point to the 
136         parent class ResourceManager instances of those dictionaries. 
137         In order to make these templates independent from the parent's one,
138         it is necessary re-initialize the corresponding dictionaries. 
139         This is the objective of the _clsinit method
140         """
141         # static template for resource attributes
142         cls._attributes = dict()
143         cls._register_attributes()
144
145         # static template for resource traces
146         cls._traces = dict()
147         cls._register_traces()
148
149     @classmethod
150     def _clsinit_copy(cls):
151         """ Same as _clsinit, except that it also inherits all attributes and traces
152         from the parent class.
153         """
154         # static template for resource attributes
155         cls._attributes = copy.deepcopy(cls._attributes)
156         cls._register_attributes()
157
158         # static template for resource traces
159         cls._traces = copy.deepcopy(cls._traces)
160         cls._register_traces()
161
162     @classmethod
163     def rtype(cls):
164         return cls._rtype
165
166     @classmethod
167     def get_attributes(cls):
168         """ Returns a copy of the attributes
169
170         """
171         return copy.deepcopy(cls._attributes.values())
172
173     @classmethod
174     def get_traces(cls):
175         """ Returns a copy of the traces
176
177         """
178         return copy.deepcopy(cls._traces.values())
179
180     def __init__(self, ec, guid):
181         self._guid = guid
182         self._ec = weakref.ref(ec)
183         self._connections = set()
184         self._conditions = dict() 
185
186         # the resource instance gets a copy of all attributes
187         self._attrs = copy.deepcopy(self._attributes)
188
189         # the resource instance gets a copy of all traces
190         self._trcs = copy.deepcopy(self._traces)
191
192         self._state = ResourceState.NEW
193
194         self._start_time = None
195         self._stop_time = None
196         self._discover_time = None
197         self._provision_time = None
198         self._ready_time = None
199         self._release_time = None
200
201         # Logging
202         self._logger = logging.getLogger("Resource")
203
204     def debug(self, msg, out = None, err = None):
205         self.log(msg, logging.DEBUG, out, err)
206
207     def error(self, msg, out = None, err = None):
208         self.log(msg, logging.ERROR, out, err)
209
210     def warn(self, msg, out = None, err = None):
211         self.log(msg, logging.WARNING, out, err)
212
213     def info(self, msg, out = None, err = None):
214         self.log(msg, logging.INFO, out, err)
215
216     def log(self, msg, level, out = None, err = None):
217         if out:
218             msg += " - OUT: %s " % out
219
220         if err:
221             msg += " - ERROR: %s " % err
222
223         msg = self.log_message(msg)
224
225         self.logger.log(level, msg)
226
227     def log_message(self, msg):
228         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
229
230     @property
231     def logger(self):
232         return self._logger
233
234     @property
235     def guid(self):
236         return self._guid
237
238     @property
239     def ec(self):
240         return self._ec()
241
242     @property
243     def connections(self):
244         return self._connections
245
246     @property
247     def conditions(self):
248         return self._conditions
249
250     @property
251     def start_time(self):
252         """ Returns timestamp with the time the RM started """
253         return self._start_time
254
255     @property
256     def stop_time(self):
257         """ Returns timestamp with the time the RM stopped """
258         return self._stop_time
259
260     @property
261     def discover_time(self):
262         """ Returns timestamp with the time the RM passed to state discovered """
263         return self._discover_time
264
265     @property
266     def provision_time(self):
267         """ Returns timestamp with the time the RM passed to state provisioned """
268         return self._provision_time
269
270     @property
271     def ready_time(self):
272         """ Returns timestamp with the time the RM passed to state ready  """
273         return self._ready_time
274
275     @property
276     def release_time(self):
277         """ Returns timestamp with the time the RM was released """
278         return self._release_time
279
280     @property
281     def state(self):
282         return self._state
283
284     def connect(self, guid):
285         if self.valid_connection(guid):
286             self._connections.add(guid)
287
288     def discover(self):
289         self._discover_time = strfnow()
290         self._state = ResourceState.DISCOVERED
291
292     def provision(self):
293         self._provision_time = strfnow()
294         self._state = ResourceState.PROVISIONED
295
296     def start(self):
297         """ Start the Resource Manager
298
299         """
300         if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
301             self.error("Wrong state %s for start" % self.state)
302             return
303
304         self._start_time = strfnow()
305         self._state = ResourceState.STARTED
306
307     def stop(self):
308         """ Start the Resource Manager
309
310         """
311         if not self._state in [ResourceState.STARTED]:
312             self.error("Wrong state %s for stop" % self.state)
313             return
314
315         self._stop_time = strfnow()
316         self._state = ResourceState.STOPPED
317
318     def set(self, name, value):
319         """ Set the value of the attribute
320
321         :param name: Name of the attribute
322         :type name: str
323         :param name: Value of the attribute
324         :type name: str
325         """
326         attr = self._attrs[name]
327         attr.value = value
328
329     def get(self, name):
330         """ Start the Resource Manager
331
332         :param name: Name of the attribute
333         :type name: str
334         :rtype: str
335         """
336         attr = self._attrs[name]
337         return attr.value
338
339     def register_trace(self, name):
340         """ Enable trace
341
342         :param name: Name of the trace
343         :type name: str
344         """
345         trace = self._trcs[name]
346         trace.enabled = True
347
348     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
349         """ Get information on collected trace
350
351         :param name: Name of the trace
352         :type name: str
353
354         :param attr: Can be one of:
355                          - TraceAttr.ALL (complete trace content), 
356                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
357                          - TraceAttr.PATH (full path to the trace file),
358                          - TraceAttr.SIZE (size of trace file). 
359         :type attr: str
360
361         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
362         :type name: int
363
364         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
365         :type name: int
366
367         :rtype: str
368         """
369         pass
370
371     def register_condition(self, action, group, state, 
372             time = None):
373         """ Registers a condition on the resource manager to allow execution 
374         of 'action' only after 'time' has elapsed from the moment all resources 
375         in 'group' reached state 'state'
376
377         :param action: Action to restrict to condition (either 'START' or 'STOP')
378         :type action: str
379         :param group: Group of RMs to wait for (list of guids)
380         :type group: int or list of int
381         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
382         :type state: str
383         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
384         :type time: str
385
386         """
387         conditions = self.conditions.get(action)
388         if not conditions:
389             conditions = list()
390             self._conditions[action] = conditions
391
392         # For each condition to register a tuple of (group, state, time) is 
393         # added to the 'action' list
394         if not isinstance(group, list):
395             group = [group]
396
397         conditions.append((group, state, time))
398
399     def get_connected(self, rtype):
400         connected = []
401         for guid in self.connections:
402             rm = self.ec.get_resource(guid)
403             if rm.rtype() == rtype:
404                 connected.append(rm)
405         return connected
406
407     def _needs_reschedule(self, group, state, time):
408         """ Internal method that verify if 'time' has elapsed since 
409         all elements in 'group' have reached state 'state'.
410
411         :param group: Group of RMs to wait for (list of guids)
412         :type group: int or list of int
413         :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED')
414         :type state: str
415         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
416         :type time: str
417
418         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
419         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
420         For the moment, 2m30s is not a correct syntax.
421
422         """
423         reschedule = False
424         delay = reschedule_delay 
425
426         # check state and time elapsed on all RMs
427         for guid in group:
428             rm = self.ec.get_resource(guid)
429             # If the RM state is lower than the requested state we must
430             # reschedule (e.g. if RM is READY but we required STARTED)
431             if rm.state < state:
432                 reschedule = True
433                 break
434
435             # If there is a time restriction, we must verify the
436             # restriction is satisfied 
437             if time:
438                 if state == ResourceState.DISCOVERED:
439                     t = rm.discover_time
440                 if state == ResourceState.PROVISIONED:
441                     t = rm.provision_time
442                 elif state == ResourceState.READY:
443                     t = rm.ready_time
444                 elif state == ResourceState.STARTED:
445                     t = rm.start_time
446                 elif state == ResourceState.STOPPED:
447                     t = rm.stop_time
448                 else:
449                     # Only keep time information for START and STOP
450                     break
451
452                 d = strfdiff(strfnow(), t)
453                 wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
454                 if wait > 0.001:
455                     reschedule = True
456                     delay = "%fs" % wait
457                     break
458         return reschedule, delay
459
460     def set_with_conditions(self, name, value, group, state, time):
461         """ Set value 'value' on attribute with name 'name' when 'time' 
462             has elapsed since all elements in 'group' have reached state
463            'state'
464
465         :param name: Name of the attribute to set
466         :type name: str
467         :param name: Value of the attribute to set
468         :type name: str
469         :param group: Group of RMs to wait for (list of guids)
470         :type group: int or list of int
471         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
472         :type state: str
473         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
474         :type time: str
475
476         """
477
478         reschedule = False
479         delay = reschedule_delay 
480
481         ## evaluate if set conditions are met
482
483         # only can set with conditions after the RM is started
484         if self.state != ResourceState.STARTED:
485             reschedule = True
486         else:
487             reschedule, delay = self._needs_reschedule(group, state, time)
488
489         if reschedule:
490             callback = functools.partial(self.set_with_conditions, 
491                     name, value, group, state, time)
492             self.ec.schedule(delay, callback)
493         else:
494             self.set(name, value)
495
496     def start_with_conditions(self):
497         """ Starts RM when all the conditions in self.conditions for
498         action 'START' are satisfied.
499
500         """
501         reschedule = False
502         delay = reschedule_delay 
503
504         ## evaluate if set conditions are met
505
506         # only can start when RM is either STOPPED or READY
507         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
508             reschedule = True
509             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
510         else:
511             start_conditions = self.conditions.get(ResourceAction.START, [])
512             
513             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
514             
515             # Verify all start conditions are met
516             for (group, state, time) in start_conditions:
517                 # Uncomment for debug
518                 #unmet = []
519                 #for guid in group:
520                 #    rm = self.ec.get_resource(guid)
521                 #    unmet.append((guid, rm._state))
522                 #
523                 #self.debug("---- WAITED STATES ---- %s" % unmet )
524
525                 reschedule, delay = self._needs_reschedule(group, state, time)
526                 if reschedule:
527                     break
528
529         if reschedule:
530             self.ec.schedule(delay, self.start_with_conditions)
531         else:
532             self.debug("----- STARTING ---- ")
533             self.start()
534
535     def stop_with_conditions(self):
536         """ Stops RM when all the conditions in self.conditions for
537         action 'STOP' are satisfied.
538
539         """
540         reschedule = False
541         delay = reschedule_delay 
542
543         ## evaluate if set conditions are met
544
545         # only can stop when RM is STARTED
546         if self.state != ResourceState.STARTED:
547             reschedule = True
548         else:
549             self.debug(" ---- STOP CONDITIONS ---- %s" % 
550                     self.conditions.get(ResourceAction.STOP))
551
552             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
553             for (group, state, time) in stop_conditions:
554                 reschedule, delay = self._needs_reschedule(group, state, time)
555                 if reschedule:
556                     break
557
558         if reschedule:
559             callback = functools.partial(self.stop_with_conditions)
560             self.ec.schedule(delay, callback)
561         else:
562             self.logger.debug(" ----- STOPPING ---- ") 
563             self.stop()
564
565     def deploy(self):
566         """ Execute all steps required for the RM to reach the state READY
567
568         """
569         if self._state > ResourceState.READY:
570             self.error("Wrong state %s for deploy" % self.state)
571             return
572
573         self.debug("----- DEPLOYING ---- ")
574         self._ready_time = strfnow()
575         self._state = ResourceState.READY
576
577     def release(self):
578         """Clean the resource at the end of the Experiment and change the status
579
580         """
581         self._release_time = strfnow()
582         self._state = ResourceState.RELEASED
583
584     def valid_connection(self, guid):
585         """Check if the connection is available.
586
587         :param guid: Guid of the current Resource Manager
588         :type guid: int
589         :rtype:  Boolean
590
591         """
592         # TODO: Validate!
593         return True
594
595 class ResourceFactory(object):
596     _resource_types = dict()
597
598     @classmethod
599     def resource_types(cls):
600         return cls._resource_types
601
602     @classmethod
603     def register_type(cls, rclass):
604         cls._resource_types[rclass.rtype()] = rclass
605
606     @classmethod
607     def create(cls, rtype, ec, guid):
608         rclass = cls._resource_types[rtype]
609         return rclass(ec, guid)
610
611 def populate_factory():
612     for rclass in find_types():
613         ResourceFactory.register_type(rclass)
614
615 def find_types():
616     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
617     search_path = set(search_path.split(" "))
618    
619     import nepi.resources 
620     path = os.path.dirname(nepi.resources.__file__)
621     search_path.add(path)
622
623     types = []
624
625     for importer, modname, ispkg in pkgutil.walk_packages(search_path):
626         loader = importer.find_module(modname)
627         try:
628             module = loader.load_module(loader.fullname)
629             for attrname in dir(module):
630                 if attrname.startswith("_"):
631                     continue
632
633                 attr = getattr(module, attrname)
634
635                 if attr == ResourceManager:
636                     continue
637
638                 if not inspect.isclass(attr):
639                     continue
640
641                 if issubclass(attr, ResourceManager):
642                     types.append(attr)
643         except:
644             import traceback
645             err = traceback.format_exc()
646             logger = logging.getLogger("Resource.find_types()")
647             logger.error("Error while lading Resource Managers %s" % err)
648
649     return types
650
651