adcb438c4d9ddc9c2499e2457b7f429a9b1256f4
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
24
25 import copy
26 import functools
27 import logging
28 import os
29 import pkgutil
30 import sys
31 import threading
32 import weakref
33
34 class ResourceAction:
35     """ Action that a user can order to a Resource Manager
36    
37     """
38     DEPLOY = 0
39     START = 1
40     STOP = 2
41
42 class ResourceState:
43     """ State of a Resource Manager
44    
45     """
46     NEW = 0
47     DISCOVERED = 1
48     RESERVED = 2
49     PROVISIONED = 3
50     READY = 4
51     STARTED = 5
52     STOPPED = 6
53     FAILED = 7
54     RELEASED = 8
55
56 ResourceState2str = dict({
57     ResourceState.NEW : "NEW",
58     ResourceState.DISCOVERED : "DISCOVERED",
59     ResourceState.RESERVED : "RESERVED",
60     ResourceState.PROVISIONED : "PROVISIONED",
61     ResourceState.READY : "READY",
62     ResourceState.STARTED : "STARTED",
63     ResourceState.STOPPED : "STOPPED",
64     ResourceState.FAILED : "FAILED",
65     ResourceState.RELEASED : "RELEASED",
66     })
67
68 def clsinit(cls):
69     """ Initializes template information (i.e. attributes and traces)
70     on classes derived from the ResourceManager class.
71
72     It is used as a decorator in the class declaration as follows:
73
74         @clsinit
75         class MyResourceManager(ResourceManager):
76         
77             ...
78
79      """
80
81     cls._clsinit()
82     return cls
83
84 def clsinit_copy(cls):
85     """ Initializes template information (i.e. attributes and traces)
86     on classes derived from the ResourceManager class.
87     It differs from the clsinit method in that it forces inheritance
88     of attributes and traces from the parent class.
89
90     It is used as a decorator in the class declaration as follows:
91
92         @clsinit
93         class MyResourceManager(ResourceManager):
94         
95             ...
96
97
98     clsinit_copy should be prefered to clsinit when creating new
99     ResourceManager child classes.
100
101     """
102     
103     cls._clsinit_copy()
104     return cls
105
106 def failtrap(func):
107     """ Decorator function for instance methods that should set the 
108     RM state to FAILED when an error is raised. The methods that must be
109     decorated are: discover, reserved, provision, deploy, start, stop.
110
111     """
112     def wrapped(self, *args, **kwargs):
113         try:
114             return func(self, *args, **kwargs)
115         except:
116             self.fail()
117             
118             import traceback
119             err = traceback.format_exc()
120             logger = Logger(self._rtype)
121             logger.error(err)
122             logger.error("SETTING guid %d to state FAILED" % self.guid)
123             raise
124     
125     return wrapped
126
127 @clsinit
128 class ResourceManager(Logger):
129     """ Base clase for all ResourceManagers. 
130     
131     A ResourceManger is specific to a resource type (e.g. Node, 
132     Switch, Application, etc) on a specific platform (e.g. PlanetLab, 
133     OMF, etc).
134
135     The ResourceManager instances are responsible for interacting with
136     and controlling concrete (physical or virtual) resources in the 
137     experimental platforms.
138     
139     """
140     _rtype = "Resource"
141     _attributes = None
142     _traces = None
143     _help = None
144     _platform = None
145     _reschedule_delay = "0.5s"
146
147     @classmethod
148     def _register_attribute(cls, attr):
149         """ Resource subclasses will invoke this method to add a 
150         resource attribute
151
152         """
153         
154         cls._attributes[attr.name] = attr
155
156     @classmethod
157     def _remove_attribute(cls, name):
158         """ Resource subclasses will invoke this method to remove a 
159         resource attribute
160
161         """
162         
163         del cls._attributes[name]
164
165     @classmethod
166     def _register_trace(cls, trace):
167         """ Resource subclasses will invoke this method to add a 
168         resource trace
169
170         """
171         
172         cls._traces[trace.name] = trace
173
174     @classmethod
175     def _remove_trace(cls, name):
176         """ Resource subclasses will invoke this method to remove a 
177         resource trace
178
179         """
180         
181         del cls._traces[name]
182
183     @classmethod
184     def _register_attributes(cls):
185         """ Resource subclasses will invoke this method to register
186         resource attributes.
187
188         This method should be overriden in the RMs that define
189         attributes.
190
191         """
192         critical = Attribute("critical", 
193                 "Defines whether the resource is critical. "
194                 "A failure on a critical resource will interrupt "
195                 "the experiment. ",
196                 type = Types.Bool,
197                 default = True,
198                 flags = Flags.Design)
199         hard_release = Attribute("hardRelease", 
200                 "Forces removal of all result files and directories associated "
201                 "to the RM upon resource release. After release the RM will "
202                 "be removed from the EC and the results will not longer be "
203                 "accessible",
204                 type = Types.Bool,
205                 default = False,
206                 flags = Flags.Design)
207
208         cls._register_attribute(critical)
209         cls._register_attribute(hard_release)
210         
211     @classmethod
212     def _register_traces(cls):
213         """ Resource subclasses will invoke this method to register
214         resource traces
215
216         This method should be overridden in the RMs that define traces.
217         
218         """
219         
220         pass
221
222     @classmethod
223     def _clsinit(cls):
224         """ ResourceManager classes have different attributes and traces.
225         Attribute and traces are stored in 'class attribute' dictionaries.
226         When a new ResourceManager class is created, the _clsinit method is 
227         called to create a new instance of those dictionaries and initialize 
228         them.
229         
230         The _clsinit method is called by the clsinit decorator method.
231         
232         """
233         
234         # static template for resource attributes
235         cls._attributes = dict()
236         cls._register_attributes()
237
238         # static template for resource traces
239         cls._traces = dict()
240         cls._register_traces()
241
242     @classmethod
243     def _clsinit_copy(cls):
244         """ Same as _clsinit, except that after creating new instances of the
245         dictionaries it copies all the attributes and traces from the parent 
246         class.
247         
248         The _clsinit_copy method is called by the clsinit_copy decorator method.
249         
250         """
251         # static template for resource attributes
252         cls._attributes = copy.deepcopy(cls._attributes)
253         cls._register_attributes()
254
255         # static template for resource traces
256         cls._traces = copy.deepcopy(cls._traces)
257         cls._register_traces()
258
259     @classmethod
260     def get_rtype(cls):
261         """ Returns the type of the Resource Manager
262
263         """
264         return cls._rtype
265
266     @classmethod
267     def get_attributes(cls):
268         """ Returns a copy of the attributes
269
270         """
271         return copy.deepcopy(cls._attributes.values())
272
273     @classmethod
274     def get_attribute(cls, name):
275         """ Returns a copy of the attribute with name 'name'
276
277         """
278         return copy.deepcopy(cls._attributes[name])
279
280     @classmethod
281     def get_traces(cls):
282         """ Returns a copy of the traces
283
284         """
285         return copy.deepcopy(cls._traces.values())
286
287     @classmethod
288     def get_help(cls):
289         """ Returns the description of the type of Resource
290
291         """
292         return cls._help
293
294     @classmethod
295     def get_platform(cls):
296         """ Returns the identified of the platform (i.e. testbed type)
297         for the Resource
298
299         """
300         return cls._platform
301
302     @classmethod
303     def get_global(cls, name):
304         """ Returns the value of a global attribute
305             Global attribute meaning an attribute for 
306             all the resources from a rtype
307
308         :param name: Name of the attribute
309         :type name: str
310         :rtype: str
311         """
312         global_attr = cls._attributes[name]
313         return global_attr.value
314
315     @classmethod
316     def set_global(cls, name, value):
317         """ Set value for a global attribute
318
319         :param name: Name of the attribute
320         :type name: str
321         :param name: Value of the attribute
322         :type name: str
323         """
324         global_attr = cls._attributes[name]
325         global_attr.value = value
326         return value
327
328     def __init__(self, ec, guid):
329         super(ResourceManager, self).__init__(self.get_rtype())
330         
331         self._guid = guid
332         self._ec = weakref.ref(ec)
333         self._connections = set()
334         self._conditions = dict() 
335
336         # the resource instance gets a copy of all attributes
337         self._attrs = copy.deepcopy(self._attributes)
338
339         # the resource instance gets a copy of all traces
340         self._trcs = copy.deepcopy(self._traces)
341
342         # Each resource is placed on a deployment group by the EC
343         # during deployment
344         self.deployment_group = None
345
346         self._start_time = None
347         self._stop_time = None
348         self._discover_time = None
349         self._reserved_time = None
350         self._provision_time = None
351         self._ready_time = None
352         self._release_time = None
353         self._failed_time = None
354
355         self._state = ResourceState.NEW
356
357         # instance lock to synchronize exclusive state change methods (such
358         # as deploy and release methods), in order to prevent them from being 
359         # executed at the same time and corrupt internal resource state
360         self._release_lock = threading.Lock()
361
362     @property
363     def guid(self):
364         """ Returns the global unique identifier of the RM """
365         return self._guid
366
367     @property
368     def ec(self):
369         """ Returns the Experiment Controller of the RM """
370         return self._ec()
371
372     @property
373     def connections(self):
374         """ Returns the set of guids of connected RMs """
375         return self._connections
376
377     @property
378     def conditions(self):
379         """ Returns the conditions to which the RM is subjected to.
380         
381         This method returns a dictionary of conditions lists indexed by
382         a ResourceAction.
383         
384         """
385         return self._conditions
386
387     @property
388     def start_time(self):
389         """ Returns the start time of the RM as a timestamp """
390         return self._start_time
391
392     @property
393     def stop_time(self):
394         """ Returns the stop time of the RM as a timestamp """
395         return self._stop_time
396
397     @property
398     def discover_time(self):
399         """ Returns the discover time of the RM as a timestamp """
400         return self._discover_time
401
402     @property
403     def reserved_time(self):
404         """ Returns the reserved time of the RM as a timestamp """
405         return self._reserved_time
406
407     @property
408     def provision_time(self):
409         """ Returns the provision time of the RM as a timestamp """
410         return self._provision_time
411
412     @property
413     def ready_time(self):
414         """ Returns the deployment time of the RM as a timestamp """
415         return self._ready_time
416
417     @property
418     def release_time(self):
419         """ Returns the release time of the RM as a timestamp """
420         return self._release_time
421
422     @property
423     def failed_time(self):
424         """ Returns the time failure occurred for the RM as a timestamp """
425         return self._failed_time
426
427     @property
428     def state(self):
429         """ Get the current state of the RM """
430         return self._state
431
432     @property
433     def reschedule_delay(self):
434         """ Returns default reschedule delay """
435         return self._reschedule_delay
436
437     def log_message(self, msg):
438         """ Returns the log message formatted with added information.
439
440         :param msg: text message
441         :type msg: str
442         :rtype: str
443
444         """
445         return " %s guid %d - %s " % (self._rtype, self.guid, msg)
446
447     def register_connection(self, guid):
448         """ Registers a connection to the RM identified by guid
449
450         This method should not be overridden. Specific functionality
451         should be added in the do_connect method.
452
453         :param guid: Global unique identified of the RM to connect to
454         :type guid: int
455
456         """
457         if self.valid_connection(guid):
458             self.do_connect(guid)
459             self._connections.add(guid)
460
461     def unregister_connection(self, guid):
462         """ Removes a registered connection to the RM identified by guid
463         
464         This method should not be overridden. Specific functionality
465         should be added in the do_disconnect method.
466
467         :param guid: Global unique identified of the RM to connect to
468         :type guid: int
469
470         """
471         if guid in self._connections:
472             self.do_disconnect(guid)
473             self._connections.remove(guid)
474
475     @failtrap
476     def discover(self):
477         """ Performs resource discovery.
478         
479         This  method is responsible for selecting an individual resource
480         matching user requirements.
481
482         This method should not be overridden directly. Specific functionality
483         should be added in the do_discover method.
484
485         """
486         with self._release_lock:
487             if self._state != ResourceState.RELEASED:
488                 self.do_discover()
489
490     @failtrap
491     def reserve(self):
492         """ Performs resource reserve.
493         
494         This  method is responsible for reserving an individual resource
495         matching user requirements.
496
497         This method should not be overridden directly. Specific functionality
498         should be added in the do_reserved method.
499
500         """
501         with self._release_lock:
502             if self._state != ResourceState.RELEASED:
503                 self.do_reserve()
504
505     @failtrap
506     def provision(self):
507         """ Performs resource provisioning.
508
509         This  method is responsible for provisioning one resource.
510         After this method has been successfully invoked, the resource
511         should be accessible/controllable by the RM.
512
513         This method should not be overridden directly. Specific functionality
514         should be added in the do_provision method.
515
516         """
517         with self._release_lock:
518             if self._state != ResourceState.RELEASED:
519                 self.do_provision()
520
521     @failtrap
522     def configure(self):
523         """ Performs resource configuration.
524
525         This  method is responsible for configuring one resource.
526         After this method has been successfully invoked, the resource
527         should be set up to start the experimentation.
528
529         This method should not be overridden directly. Specific functionality
530         should be added in the do_configure method.
531
532         """
533         with self._release_lock:
534             if self._state != ResourceState.RELEASED:
535                 self.do_configure()
536
537     @failtrap
538     def start(self):
539         """ Starts the RM (e.g. launch remote process).
540     
541         There is no standard start behavior. Some RMs will not need to perform
542         any actions upon start.
543
544         This method should not be overridden directly. Specific functionality
545         should be added in the do_start method.
546
547         """
548
549         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
550             self.error("Wrong state %s for start" % self.state)
551             return
552
553         with self._release_lock:
554             if self._state != ResourceState.RELEASED:
555                 self.do_start()
556
557     @failtrap
558     def stop(self):
559         """ Interrupts the RM, stopping any tasks the RM was performing.
560      
561         There is no standard stop behavior. Some RMs will not need to perform
562         any actions upon stop.
563     
564         This method should not be overridden directly. Specific functionality
565         should be added in the do_stop method.
566       
567         """
568         if not self.state in [ResourceState.STARTED]:
569             self.error("Wrong state %s for stop" % self.state)
570             return
571         
572         with self._release_lock:
573             self.do_stop()
574
575     @failtrap
576     def deploy(self):
577         """ Execute all steps required for the RM to reach the state READY.
578
579         This method is responsible for deploying the resource (and invoking 
580         the discover and provision methods).
581  
582         This method should not be overridden directly. Specific functionality
583         should be added in the do_deploy method.
584        
585         """
586         if self.state > ResourceState.READY:
587             self.error("Wrong state %s for deploy" % self.state)
588             return
589
590         with self._release_lock:
591             if self._state != ResourceState.RELEASED:
592                 self.do_deploy()
593
594     def release(self):
595         """ Perform actions to free resources used by the RM.
596   
597         This  method is responsible for releasing resources that were
598         used during the experiment by the RM.
599
600         This method should not be overridden directly. Specific functionality
601         should be added in the do_release method.
602       
603         """
604         with self._release_lock:
605             try:
606                 self.do_release()
607             except:
608                 self.set_released()
609
610                 import traceback
611                 err = traceback.format_exc()
612                 msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % (
613                         self._rtype, self.guid, err)
614                 logger = Logger(self._rtype)
615                 logger.debug(msg)
616
617     def fail(self):
618         """ Sets the RM to state FAILED.
619
620         This method should not be overridden directly. Specific functionality
621         should be added in the do_fail method.
622
623         """
624         with self._release_lock:
625             if self._state != ResourceState.RELEASED:
626                 self.do_fail()
627
628     def set(self, name, value):
629         """ Set the value of the attribute
630
631         :param name: Name of the attribute
632         :type name: str
633         :param name: Value of the attribute
634         :type name: str
635         """
636         attr = self._attrs[name]
637         attr.value = value
638         return value
639
640     def get(self, name):
641         """ Returns the value of the attribute
642
643         :param name: Name of the attribute
644         :type name: str
645         :rtype: str
646         """
647         attr = self._attrs[name]
648
649         """
650         A.Q. Commenting due to performance impact
651         if attr.has_flag(Flags.Global):
652             self.warning( "Attribute %s is global. Use get_global instead." % name)
653         """
654             
655         return attr.value
656
657     def has_changed(self, name):
658         """ Returns the True is the value of the attribute
659             has been modified by the user.
660
661         :param name: Name of the attribute
662         :type name: str
663         :rtype: str
664         """
665         attr = self._attrs[name]
666         return attr.has_changed
667
668     def has_flag(self, name, flag):
669         """ Returns true if the attribute has the flag 'flag'
670
671         :param flag: Flag to be checked
672         :type flag: Flags
673         """
674         attr = self._attrs[name]
675         return attr.has_flag(flag)
676
677     def has_attribute(self, name):
678         """ Returns true if the RM has an attribute with name
679
680         :param name: name of the attribute
681         :type name: string
682         """
683         return name in self._attrs
684
685     def enable_trace(self, name):
686         """ Explicitly enable trace generation
687
688         :param name: Name of the trace
689         :type name: str
690         """
691         trace = self._trcs[name]
692         trace.enabled = True
693     
694     def trace_enabled(self, name):
695         """Returns True if trace is enables 
696
697         :param name: Name of the trace
698         :type name: str
699         """
700         trace = self._trcs[name]
701         return trace.enabled
702  
703     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
704         """ Get information on collected trace
705
706         :param name: Name of the trace
707         :type name: str
708
709         :param attr: Can be one of:
710                          - TraceAttr.ALL (complete trace content), 
711                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
712                          - TraceAttr.PATH (full path to the trace file),
713                          - TraceAttr.SIZE (size of trace file). 
714         :type attr: str
715
716         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
717         :type name: int
718
719         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
720         :type name: int
721
722         :rtype: str
723         """
724         pass
725
726     def register_condition(self, action, group, state, time = None):
727         """ Registers a condition on the resource manager to allow execution 
728         of 'action' only after 'time' has elapsed from the moment all resources 
729         in 'group' reached state 'state'
730
731         :param action: Action to restrict to condition (either 'START' or 'STOP')
732         :type action: str
733         :param group: Group of RMs to wait for (list of guids)
734         :type group: int or list of int
735         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
736         :type state: str
737         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
738         :type time: str
739
740         """
741
742         if not action in self.conditions:
743             self._conditions[action] = list()
744         
745         conditions = self.conditions.get(action)
746
747         # For each condition to register a tuple of (group, state, time) is 
748         # added to the 'action' list
749         if not isinstance(group, list):
750             group = [group]
751
752         conditions.append((group, state, time))
753
754     def unregister_condition(self, group, action = None):
755         """ Removed conditions for a certain group of guids
756
757         :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
758         :type action: str
759
760         :param group: Group of RMs to wait for (list of guids)
761         :type group: int or list of int
762
763         """
764         # For each condition a tuple of (group, state, time) is 
765         # added to the 'action' list
766         if not isinstance(group, list):
767             group = [group]
768
769         for act, conditions in self.conditions.iteritems():
770             if action and act != action:
771                 continue
772
773             for condition in list(conditions):
774                 (grp, state, time) = condition
775
776                 # If there is an intersection between grp and group,
777                 # then remove intersected elements
778                 intsec = set(group).intersection(set(grp))
779                 if intsec:
780                     idx = conditions.index(condition)
781                     newgrp = set(grp)
782                     newgrp.difference_update(intsec)
783                     conditions[idx] = (newgrp, state, time)
784                  
785     def get_connected(self, rtype = None):
786         """ Returns the list of RM with the type 'rtype'
787
788         :param rtype: Type of the RM we look for
789         :type rtype: str
790         :return: list of guid
791         """
792         connected = []
793         rclass = ResourceFactory.get_resource_type(rtype)
794         for guid in self.connections:
795             rm = self.ec.get_resource(guid)
796             if not rtype or isinstance(rm, rclass):
797                 connected.append(rm)
798         return connected
799
800     def is_rm_instance(self, rtype):
801         """ Returns True if the RM is instance of 'rtype'
802
803         :param rtype: Type of the RM we look for
804         :type rtype: str
805         :return: True|False
806         """
807         rclass = ResourceFactory.get_resource_type(rtype)
808         if isinstance(self, rclass):
809             return True
810         return False
811
812     @failtrap
813     def _needs_reschedule(self, group, state, time):
814         """ Internal method that verify if 'time' has elapsed since 
815         all elements in 'group' have reached state 'state'.
816
817         :param group: Group of RMs to wait for (list of guids)
818         :type group: int or list of int
819         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
820         :type state: str
821         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
822         :type time: str
823
824         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
825         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
826         For the moment, 2m30s is not a correct syntax.
827
828         """
829         reschedule = False
830         delay = self.reschedule_delay 
831
832         # check state and time elapsed on all RMs
833         for guid in group:
834             rm = self.ec.get_resource(guid)
835             
836             # If one of the RMs this resource needs to wait for has FAILED
837             # and is critical we raise an exception
838             if rm.state == ResourceState.FAILED:
839                 if not rm.get('critical'):
840                     continue
841                 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
842                 raise RuntimeError, msg
843
844             # If the RM state is lower than the requested state we must
845             # reschedule (e.g. if RM is READY but we required STARTED).
846             if rm.state < state:
847                 reschedule = True
848                 break
849
850             # If there is a time restriction, we must verify the
851             # restriction is satisfied 
852             if time:
853                 if state == ResourceState.DISCOVERED:
854                     t = rm.discover_time
855                 elif state == ResourceState.RESERVED:
856                     t = rm.reserved_time
857                 elif state == ResourceState.PROVISIONED:
858                     t = rm.provision_time
859                 elif state == ResourceState.READY:
860                     t = rm.ready_time
861                 elif state == ResourceState.STARTED:
862                     t = rm.start_time
863                 elif state == ResourceState.STOPPED:
864                     t = rm.stop_time
865                 elif state == ResourceState.RELEASED:
866                     t = rm.release_time
867                 else:
868                     break
869
870                 # time already elapsed since RM changed state
871                 waited = "%fs" % tdiffsec(tnow(), t)
872
873                 # time still to wait
874                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
875
876                 if wait > 0.001:
877                     reschedule = True
878                     delay = "%fs" % wait
879                     break
880
881         return reschedule, delay
882
883     def set_with_conditions(self, name, value, group, state, time):
884         """ Set value 'value' on attribute with name 'name' when 'time' 
885         has elapsed since all elements in 'group' have reached state
886         'state'
887
888         :param name: Name of the attribute to set
889         :type name: str
890         :param name: Value of the attribute to set
891         :type name: str
892         :param group: Group of RMs to wait for (list of guids)
893         :type group: int or list of int
894         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
895         :type state: str
896         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
897         :type time: str
898         """
899
900         reschedule = False
901         delay = self.reschedule_delay 
902
903         ## evaluate if set conditions are met
904
905         # only can set with conditions after the RM is started
906         if self.state != ResourceState.STARTED:
907             reschedule = True
908         else:
909             reschedule, delay = self._needs_reschedule(group, state, time)
910
911         if reschedule:
912             callback = functools.partial(self.set_with_conditions, 
913                     name, value, group, state, time)
914             self.ec.schedule(delay, callback)
915         else:
916             self.set(name, value)
917
918     def start_with_conditions(self):
919         """ Starts RM when all the conditions in self.conditions for
920         action 'START' are satisfied.
921
922         """
923         #import pdb;pdb.set_trace()
924
925         reschedule = False
926         delay = self.reschedule_delay 
927
928
929         ## evaluate if conditions to start are met
930         if self.ec.abort:
931             return 
932
933         # Can only start when RM is either STOPPED or READY
934         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
935             reschedule = True
936             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
937         else:
938             start_conditions = self.conditions.get(ResourceAction.START, [])
939             
940             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
941             
942             # Verify all start conditions are met
943             for (group, state, time) in start_conditions:
944                 # Uncomment for debug
945                 #unmet = []
946                 #for guid in group:
947                 #    rm = self.ec.get_resource(guid)
948                 #    unmet.append((guid, rm._state))
949                 #
950                 #self.debug("---- WAITED STATES ---- %s" % unmet )
951
952                 reschedule, delay = self._needs_reschedule(group, state, time)
953                 if reschedule:
954                     break
955
956         if reschedule:
957             self.ec.schedule(delay, self.start_with_conditions)
958         else:
959             self.debug("----- STARTING ---- ")
960             self.start()
961
962     def stop_with_conditions(self):
963         """ Stops RM when all the conditions in self.conditions for
964         action 'STOP' are satisfied.
965
966         """
967         reschedule = False
968         delay = self.reschedule_delay 
969
970         ## evaluate if conditions to stop are met
971         if self.ec.abort:
972             return 
973
974         # only can stop when RM is STARTED
975         if self.state != ResourceState.STARTED:
976             reschedule = True
977             self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
978         else:
979             self.debug(" ---- STOP CONDITIONS ---- %s" % 
980                     self.conditions.get(ResourceAction.STOP))
981
982             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
983             for (group, state, time) in stop_conditions:
984                 reschedule, delay = self._needs_reschedule(group, state, time)
985                 if reschedule:
986                     break
987
988         if reschedule:
989             callback = functools.partial(self.stop_with_conditions)
990             self.ec.schedule(delay, callback)
991         else:
992             self.debug(" ----- STOPPING ---- ") 
993             self.stop()
994
995     def deploy_with_conditions(self):
996         """ Deploy RM when all the conditions in self.conditions for
997         action 'READY' are satisfied.
998
999         """
1000         reschedule = False
1001         delay = self.reschedule_delay 
1002
1003         ## evaluate if conditions to deploy are met
1004         if self.ec.abort:
1005             return 
1006
1007         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
1008         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
1009                 ResourceState.RESERVED, ResourceState.PROVISIONED]:
1010             #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES!
1011             reschedule = True
1012             self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
1013         else:
1014             deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
1015             
1016             self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) 
1017             
1018             # Verify all start conditions are met
1019             for (group, state, time) in deploy_conditions:
1020                 # Uncomment for debug
1021                 #unmet = []
1022                 #for guid in group:
1023                 #    rm = self.ec.get_resource(guid)
1024                 #    unmet.append((guid, rm._state))
1025                 
1026                 #self.debug("---- WAITED STATES ---- %s" % unmet )
1027
1028                 reschedule, delay = self._needs_reschedule(group, state, time)
1029                 if reschedule:
1030                     break
1031
1032         if reschedule:
1033             self.ec.schedule(delay, self.deploy_with_conditions)
1034         else:
1035             self.debug("----- DEPLOYING ---- ")
1036             self.deploy()
1037
1038     def do_connect(self, guid):
1039         """ Performs actions that need to be taken upon associating RMs.
1040         This method should be redefined when necessary in child classes.
1041         """
1042         pass
1043
1044     def do_disconnect(self, guid):
1045         """ Performs actions that need to be taken upon disassociating RMs.
1046         This method should be redefined when necessary in child classes.
1047         """
1048         pass
1049
1050     def valid_connection(self, guid):
1051         """Checks whether a connection with the other RM
1052         is valid.
1053         This method need to be redefined by each new Resource Manager.
1054
1055         :param guid: Guid of the current Resource Manager
1056         :type guid: int
1057         :rtype:  Boolean
1058
1059         """
1060         # TODO: Validate!
1061         return True
1062
1063     def do_discover(self):
1064         self.set_discovered()
1065
1066     def do_reserve(self):
1067         self.set_reserved()
1068
1069     def do_provision(self):
1070         self.set_provisioned()
1071
1072     def do_configure(self):
1073         pass
1074
1075     def do_start(self):
1076         self.set_started()
1077
1078     def do_stop(self):
1079         self.set_stopped()
1080
1081     def do_deploy(self):
1082         self.set_ready()
1083
1084     def do_release(self):
1085         self.set_released()
1086
1087     def do_fail(self):
1088         self.set_failed()
1089         self.ec.inform_failure(self.guid)
1090
1091     def set_started(self, time = None):
1092         """ Mark ResourceManager as STARTED """
1093         self.set_state(ResourceState.STARTED, "_start_time", time)
1094         self.debug("----- STARTED ---- ")
1095
1096     def set_stopped(self, time = None):
1097         """ Mark ResourceManager as STOPPED """
1098         self.set_state(ResourceState.STOPPED, "_stop_time", time)
1099         self.debug("----- STOPPED ---- ")
1100
1101     def set_ready(self, time = None):
1102         """ Mark ResourceManager as READY """
1103         self.set_state(ResourceState.READY, "_ready_time", time)
1104         self.debug("----- READY ---- ")
1105
1106     def set_released(self, time = None):
1107         """ Mark ResourceManager as REALEASED """
1108         self.set_state(ResourceState.RELEASED, "_release_time", time)
1109
1110         msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid)
1111         logger = Logger(self._rtype)
1112         logger.debug(msg)
1113
1114     def set_failed(self, time = None):
1115         """ Mark ResourceManager as FAILED """
1116         self.set_state(ResourceState.FAILED, "_failed_time", time)
1117
1118         msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid)
1119         logger = Logger(self._rtype)
1120         logger.debug(msg)
1121
1122     def set_discovered(self, time = None):
1123         """ Mark ResourceManager as DISCOVERED """
1124         self.set_state(ResourceState.DISCOVERED, "_discover_time", time)
1125         self.debug("----- DISCOVERED ---- ")
1126
1127     def set_reserved(self, time = None):
1128         """ Mark ResourceManager as RESERVED """
1129         self.set_state(ResourceState.RESERVED, "_reserved_time", time)
1130         self.debug("----- RESERVED ---- ")
1131
1132     def set_provisioned(self, time = None):
1133         """ Mark ResourceManager as PROVISIONED """
1134         self.set_state(ResourceState.PROVISIONED, "_provision_time", time)
1135         self.debug("----- PROVISIONED ---- ")
1136
1137     def set_state(self, state, state_time_attr, time = None):
1138         """ Set the state of the RM while keeping a trace of the time """
1139
1140         # Ensure that RM state will not change after released
1141         if self._state == ResourceState.RELEASED:
1142             return 
1143
1144         time = time or tnow()
1145         self.set_state_time(state, state_time_attr, time)
1146   
1147     def set_state_time(self, state, state_time_attr, time):
1148         """ Set the time for the RM state change """
1149         setattr(self, state_time_attr, time)
1150         self._state = state
1151
1152 class ResourceFactory(object):
1153     _resource_types = dict()
1154
1155     @classmethod
1156     def resource_types(cls):
1157         """Return the type of the Class"""
1158         return cls._resource_types
1159
1160     @classmethod
1161     def get_resource_type(cls, rtype):
1162         """Return the type of the Class"""
1163         return cls._resource_types.get(rtype)
1164
1165     @classmethod
1166     def register_type(cls, rclass):
1167         """Register a new Ressource Manager"""
1168         cls._resource_types[rclass.get_rtype()] = rclass
1169
1170     @classmethod
1171     def create(cls, rtype, ec, guid):
1172         """Create a new instance of a Ressource Manager"""
1173         rclass = cls._resource_types[rtype]
1174         return rclass(ec, guid)
1175
1176 def populate_factory():
1177     """Find and rgister all available RMs
1178     """
1179     # Once the factory is populated, don't repopulate
1180     if not ResourceFactory.resource_types():
1181         for rclass in find_types():
1182             ResourceFactory.register_type(rclass)
1183
1184 def find_types():
1185     """Look into the different folders to find all the 
1186     availables Resources Managers
1187     """
1188     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1189     search_path = set(search_path.split(" "))
1190    
1191     import inspect
1192     import nepi.resources 
1193     path = os.path.dirname(nepi.resources.__file__)
1194     search_path.add(path)
1195
1196     types = set()
1197
1198     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
1199             prefix = "nepi.resources."):
1200
1201         loader = importer.find_module(modname)
1202         
1203         try:
1204             # Notice: Repeated calls to load_module will act as a reload of the module
1205             if modname in sys.modules:
1206                 module = sys.modules.get(modname)
1207             else:
1208                 module = loader.load_module(modname)
1209
1210             for attrname in dir(module):
1211                 if attrname.startswith("_"):
1212                     continue
1213
1214                 attr = getattr(module, attrname)
1215
1216                 if attr == ResourceManager:
1217                     continue
1218
1219                 if not inspect.isclass(attr):
1220                     continue
1221
1222                 if issubclass(attr, ResourceManager):
1223                     types.add(attr)
1224
1225                     if not modname in sys.modules:
1226                         sys.modules[modname] = module
1227
1228         except:
1229             import traceback
1230             import logging
1231             err = traceback.format_exc()
1232             logger = logging.getLogger("Resource.find_types()")
1233             logger.error("Error while loading Resource Managers %s" % err)
1234
1235     return types
1236