e82ced043a006ebc4ba0b32a7de5e42388a4573f
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
24
25 import copy
26 import functools
27 import logging
28 import os
29 import pkgutil
30 import sys
31 import threading
32 import weakref
33
34 reschedule_delay = "1s"
35
36 class ResourceAction:
37     """ Action that a user can order to a Resource Manager
38    
39     """
40     DEPLOY = 0
41     START = 1
42     STOP = 2
43
44 class ResourceState:
45     """ State of a Resource Manager
46    
47     """
48     NEW = 0
49     DISCOVERED = 1
50     PROVISIONED = 2
51     READY = 3
52     STARTED = 4
53     STOPPED = 5
54     FAILED = 6
55     RELEASED = 7
56
57 ResourceState2str = dict({
58     ResourceState.NEW : "NEW",
59     ResourceState.DISCOVERED : "DISCOVERED",
60     ResourceState.PROVISIONED : "PROVISIONED",
61     ResourceState.READY : "READY",
62     ResourceState.STARTED : "STARTED",
63     ResourceState.STOPPED : "STOPPED",
64     ResourceState.FAILED : "FAILED",
65     ResourceState.RELEASED : "RELEASED",
66     })
67
68 def clsinit(cls):
69     """ Initializes template information (i.e. attributes and traces)
70     on classes derived from the ResourceManager class.
71
72     It is used as a decorator in the class declaration as follows:
73
74         @clsinit
75         class MyResourceManager(ResourceManager):
76         
77             ...
78
79      """
80
81     cls._clsinit()
82     return cls
83
84 def clsinit_copy(cls):
85     """ Initializes template information (i.e. attributes and traces)
86     on classes direved from the ResourceManager class.
87     It differs from the clsinit method in that it forces inheritance
88     of attributes and traces from the parent class.
89
90     It is used as a decorator in the class declaration as follows:
91
92         @clsinit
93         class MyResourceManager(ResourceManager):
94         
95             ...
96
97
98     clsinit_copy should be prefered to clsinit when creating new
99     ResourceManager child classes.
100
101     """
102     
103     cls._clsinit_copy()
104     return cls
105
106 def failtrap(func):
107     """ Decorator function for instance methods that should set the 
108     RM state to FAILED when an error is raised. The methods that must be
109     decorated are: discover, provision, deploy, start, stop.
110
111     """
112     def wrapped(self, *args, **kwargs):
113         try:
114             return func(self, *args, **kwargs)
115         except:
116             import traceback
117             err = traceback.format_exc()
118             self.error(err)
119             self.debug("SETTING guid %d to state FAILED" % self.guid)
120             self.fail()
121             raise
122     
123     return wrapped
124
125 @clsinit
126 class ResourceManager(Logger):
127     """ Base clase for all ResourceManagers. 
128     
129     A ResourceManger is specific to a resource type (e.g. Node, 
130     Switch, Application, etc) on a specific backend (e.g. PlanetLab, 
131     OMF, etc).
132
133     The ResourceManager instances are responsible for interacting with
134     and controlling concrete (physical or virtual) resources in the 
135     experimental backends.
136     
137     """
138     _rtype = "Resource"
139     _attributes = None
140     _traces = None
141     _help = None
142     _backend = None
143
144     @classmethod
145     def _register_attribute(cls, attr):
146         """ Resource subclasses will invoke this method to add a 
147         resource attribute
148
149         """
150         
151         cls._attributes[attr.name] = attr
152
153     @classmethod
154     def _remove_attribute(cls, name):
155         """ Resource subclasses will invoke this method to remove a 
156         resource attribute
157
158         """
159         
160         del cls._attributes[name]
161
162     @classmethod
163     def _register_trace(cls, trace):
164         """ Resource subclasses will invoke this method to add a 
165         resource trace
166
167         """
168         
169         cls._traces[trace.name] = trace
170
171     @classmethod
172     def _remove_trace(cls, name):
173         """ Resource subclasses will invoke this method to remove a 
174         resource trace
175
176         """
177         
178         del cls._traces[name]
179
180     @classmethod
181     def _register_attributes(cls):
182         """ Resource subclasses will invoke this method to register
183         resource attributes.
184
185         This method should be overriden in the RMs that define
186         attributes.
187
188         """
189         critical = Attribute("critical", 
190                 "Defines whether the resource is critical. "
191                 "A failure on a critical resource will interrupt "
192                 "the experiment. ",
193                 type = Types.Bool,
194                 default = True,
195                 flags = Flags.Design)
196         hard_release = Attribute("hardRelease", 
197                 "Forces removal of all result files and directories associated "
198                 "to the RM upon resource release. After release the RM will "
199                 "be removed from the EC and the results will not longer be "
200                 "accessible",
201                 type = Types.Bool,
202                 default = False,
203                 flags = Flags.Design)
204
205         cls._register_attribute(critical)
206         cls._register_attribute(hard_release)
207         
208     @classmethod
209     def _register_traces(cls):
210         """ Resource subclasses will invoke this method to register
211         resource traces
212
213         This method should be overriden in the RMs that define traces.
214         
215         """
216         
217         pass
218
219     @classmethod
220     def _clsinit(cls):
221         """ ResourceManager classes have different attributes and traces.
222         Attribute and traces are stored in 'class attribute' dictionaries.
223         When a new ResourceManager class is created, the _clsinit method is 
224         called to create a new instance of those dictionaries and initialize 
225         them.
226         
227         The _clsinit method is called by the clsinit decorator method.
228         
229         """
230         
231         # static template for resource attributes
232         cls._attributes = dict()
233         cls._register_attributes()
234
235         # static template for resource traces
236         cls._traces = dict()
237         cls._register_traces()
238
239     @classmethod
240     def _clsinit_copy(cls):
241         """ Same as _clsinit, except that after creating new instances of the
242         dictionaries it copies all the attributes and traces from the parent 
243         class.
244         
245         The _clsinit_copy method is called by the clsinit_copy decorator method.
246         
247         """
248         # static template for resource attributes
249         cls._attributes = copy.deepcopy(cls._attributes)
250         cls._register_attributes()
251
252         # static template for resource traces
253         cls._traces = copy.deepcopy(cls._traces)
254         cls._register_traces()
255
256     @classmethod
257     def get_rtype(cls):
258         """ Returns the type of the Resource Manager
259
260         """
261         return cls._rtype
262
263     @classmethod
264     def get_attributes(cls):
265         """ Returns a copy of the attributes
266
267         """
268         return copy.deepcopy(cls._attributes.values())
269
270     @classmethod
271     def get_attribute(cls, name):
272         """ Returns a copy of the attribute with name 'name'
273
274         """
275         return copy.deepcopy(cls._attributes[name])
276
277     @classmethod
278     def get_traces(cls):
279         """ Returns a copy of the traces
280
281         """
282         return copy.deepcopy(cls._traces.values())
283
284     @classmethod
285     def get_help(cls):
286         """ Returns the description of the type of Resource
287
288         """
289         return cls._help
290
291     @classmethod
292     def get_backend(cls):
293         """ Returns the identified of the backend (i.e. testbed, environment)
294         for the Resource
295
296         """
297         return cls._backend
298
299     @classmethod
300     def get_global(cls, name):
301         """ Returns the value of a global attribute
302             Global attribute meaning an attribute for 
303             all the resources from a rtype
304
305         :param name: Name of the attribute
306         :type name: str
307         :rtype: str
308         """
309         global_attr = cls._attributes[name]
310         return global_attr.value
311
312     @classmethod
313     def set_global(cls, name, value):
314         """ Set value for a global attribute
315
316         :param name: Name of the attribute
317         :type name: str
318         :param name: Value of the attribute
319         :type name: str
320         """
321         global_attr = cls._attributes[name]
322         global_attr.value = value
323         return value
324
325     def __init__(self, ec, guid):
326         super(ResourceManager, self).__init__(self.get_rtype())
327         
328         self._guid = guid
329         self._ec = weakref.ref(ec)
330         self._connections = set()
331         self._conditions = dict() 
332
333         # the resource instance gets a copy of all attributes
334         self._attrs = copy.deepcopy(self._attributes)
335
336         # the resource instance gets a copy of all traces
337         self._trcs = copy.deepcopy(self._traces)
338
339         # Each resource is placed on a deployment group by the EC
340         # during deployment
341         self.deployment_group = None
342
343         self._start_time = None
344         self._stop_time = None
345         self._discover_time = None
346         self._provision_time = None
347         self._ready_time = None
348         self._release_time = None
349         self._failed_time = None
350
351         self._state = ResourceState.NEW
352
353         # instance lock to synchronize exclusive state change methods (such
354         # as deploy and release methods), in order to prevent them from being 
355         # executed at the same time and corrupt internal resource state
356         self._release_lock = threading.Lock()
357
358     @property
359     def guid(self):
360         """ Returns the global unique identifier of the RM """
361         return self._guid
362
363     @property
364     def ec(self):
365         """ Returns the Experiment Controller of the RM """
366         return self._ec()
367
368     @property
369     def connections(self):
370         """ Returns the set of guids of connected RMs """
371         return self._connections
372
373     @property
374     def conditions(self):
375         """ Returns the conditions to which the RM is subjected to.
376         
377         This method returns a dictionary of conditions lists indexed by
378         a ResourceAction.
379         
380         """
381         return self._conditions
382
383     @property
384     def start_time(self):
385         """ Returns the start time of the RM as a timestamp """
386         return self._start_time
387
388     @property
389     def stop_time(self):
390         """ Returns the stop time of the RM as a timestamp """
391         return self._stop_time
392
393     @property
394     def discover_time(self):
395         """ Returns the discover time of the RM as a timestamp """
396         return self._discover_time
397
398     @property
399     def provision_time(self):
400         """ Returns the provision time of the RM as a timestamp """
401         return self._provision_time
402
403     @property
404     def ready_time(self):
405         """ Returns the deployment time of the RM as a timestamp """
406         return self._ready_time
407
408     @property
409     def release_time(self):
410         """ Returns the release time of the RM as a timestamp """
411         return self._release_time
412
413     @property
414     def failed_time(self):
415         """ Returns the time failure occured for the RM as a timestamp """
416         return self._failed_time
417
418     @property
419     def state(self):
420         """ Get the current state of the RM """
421         return self._state
422
423     def log_message(self, msg):
424         """ Returns the log message formatted with added information.
425
426         :param msg: text message
427         :type msg: str
428         :rtype: str
429
430         """
431         return " %s guid %d - %s " % (self._rtype, self.guid, msg)
432
433     def register_connection(self, guid):
434         """ Registers a connection to the RM identified by guid
435
436         This method should not be overriden. Specific functionality
437         should be added in the do_connect method.
438
439         :param guid: Global unique identified of the RM to connect to
440         :type guid: int
441
442         """
443         if self.valid_connection(guid):
444             self.do_connect(guid)
445             self._connections.add(guid)
446
447     def unregister_connection(self, guid):
448         """ Removes a registered connection to the RM identified by guid
449         
450         This method should not be overriden. Specific functionality
451         should be added in the do_disconnect method.
452
453         :param guid: Global unique identified of the RM to connect to
454         :type guid: int
455
456         """
457         if guid in self._connections:
458             self.do_disconnect(guid)
459             self._connections.remove(guid)
460
461     @failtrap
462     def discover(self):
463         """ Performs resource discovery.
464         
465         This  method is responsible for selecting an individual resource
466         matching user requirements.
467
468         This method should not be overriden directly. Specific functionality
469         should be added in the do_discover method.
470
471         """
472         with self._release_lock:
473             if self._state != ResourceState.RELEASED:
474                 self.do_discover()
475
476     @failtrap
477     def provision(self):
478         """ Performs resource provisioning.
479
480         This  method is responsible for provisioning one resource.
481         After this method has been successfully invoked, the resource
482         should be accessible/controllable by the RM.
483
484         This method should not be overriden directly. Specific functionality
485         should be added in the do_provision method.
486
487         """
488         with self._release_lock:
489             if self._state != ResourceState.RELEASED:
490                 self.do_provision()
491
492     @failtrap
493     def start(self):
494         """ Starts the RM (e.g. launch remote process).
495     
496         There is no standard start behavior. Some RMs will not need to perform
497         any actions upon start.
498
499         This method should not be overriden directly. Specific functionality
500         should be added in the do_start method.
501
502         """
503
504         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
505             self.error("Wrong state %s for start" % self.state)
506             return
507
508         with self._release_lock:
509             if self._state != ResourceState.RELEASED:
510                 self.do_start()
511
512     @failtrap
513     def stop(self):
514         """ Interrupts the RM, stopping any tasks the RM was performing.
515      
516         There is no standard stop behavior. Some RMs will not need to perform
517         any actions upon stop.
518     
519         This method should not be overriden directly. Specific functionality
520         should be added in the do_stop method.
521       
522         """
523         if not self.state in [ResourceState.STARTED]:
524             self.error("Wrong state %s for stop" % self.state)
525             return
526         
527         with self._release_lock:
528             self.do_stop()
529
530     @failtrap
531     def deploy(self):
532         """ Execute all steps required for the RM to reach the state READY.
533
534         This method is responsible for deploying the resource (and invoking 
535         the discover and provision methods).
536  
537         This method should not be overriden directly. Specific functionality
538         should be added in the do_deploy method.
539        
540         """
541         if self.state > ResourceState.READY:
542             self.error("Wrong state %s for deploy" % self.state)
543             return
544
545         with self._release_lock:
546             if self._state != ResourceState.RELEASED:
547                 self.do_deploy()
548
549     def release(self):
550         """ Perform actions to free resources used by the RM.
551   
552         This  method is responsible for releasing resources that were
553         used during the experiment by the RM.
554
555         This method should not be overriden directly. Specific functionality
556         should be added in the do_release method.
557       
558         """
559         with self._release_lock:
560             try:
561                 self.do_release()
562             except:
563                 import traceback
564                 err = traceback.format_exc()
565                 self.error(err)
566
567                 self.set_released()
568
569     def fail(self):
570         """ Sets the RM to state FAILED.
571
572         This method should not be overriden directly. Specific functionality
573         should be added in the do_fail method.
574
575         """
576         with self._release_lock:
577             if self._state != ResourceState.RELEASED:
578                 self.do_fail()
579
580     def set(self, name, value):
581         """ Set the value of the attribute
582
583         :param name: Name of the attribute
584         :type name: str
585         :param name: Value of the attribute
586         :type name: str
587         """
588         attr = self._attrs[name]
589         attr.value = value
590         return value
591
592     def get(self, name):
593         """ Returns the value of the attribute
594
595         :param name: Name of the attribute
596         :type name: str
597         :rtype: str
598         """
599         attr = self._attrs[name]
600
601         """
602         A.Q. Commenting due to performance impact
603         if attr.has_flag(Flags.Global):
604             self.warning( "Attribute %s is global. Use get_global instead." % name)
605         """
606             
607         return attr.value
608
609     def has_changed(self, name):
610         """ Returns the True is the value of the attribute
611             has been modified by the user.
612
613         :param name: Name of the attribute
614         :type name: str
615         :rtype: str
616         """
617         attr = self._attrs[name]
618         return attr.has_changed()
619
620     def has_flag(self, name, flag):
621         """ Returns true if the attribute has the flag 'flag'
622
623         :param flag: Flag to be checked
624         :type flag: Flags
625         """
626         attr = self._attrs[name]
627         return attr.has_flag(flag)
628
629     def has_attribute(self, name):
630         """ Returns true if the RM has an attribute with name
631
632         :param name: name of the attribute
633         :type name: string
634         """
635         return name in self._attrs
636
637     def enable_trace(self, name):
638         """ Explicitly enable trace generation
639
640         :param name: Name of the trace
641         :type name: str
642         """
643         trace = self._trcs[name]
644         trace.enabled = True
645     
646     def trace_enabled(self, name):
647         """Returns True if trace is enables 
648
649         :param name: Name of the trace
650         :type name: str
651         """
652         trace = self._trcs[name]
653         return trace.enabled
654  
655     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
656         """ Get information on collected trace
657
658         :param name: Name of the trace
659         :type name: str
660
661         :param attr: Can be one of:
662                          - TraceAttr.ALL (complete trace content), 
663                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
664                          - TraceAttr.PATH (full path to the trace file),
665                          - TraceAttr.SIZE (size of trace file). 
666         :type attr: str
667
668         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
669         :type name: int
670
671         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
672         :type name: int
673
674         :rtype: str
675         """
676         pass
677
678     def register_condition(self, action, group, state, time = None):
679         """ Registers a condition on the resource manager to allow execution 
680         of 'action' only after 'time' has elapsed from the moment all resources 
681         in 'group' reached state 'state'
682
683         :param action: Action to restrict to condition (either 'START' or 'STOP')
684         :type action: str
685         :param group: Group of RMs to wait for (list of guids)
686         :type group: int or list of int
687         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
688         :type state: str
689         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
690         :type time: str
691
692         """
693
694         if not action in self.conditions:
695             self._conditions[action] = list()
696         
697         conditions = self.conditions.get(action)
698
699         # For each condition to register a tuple of (group, state, time) is 
700         # added to the 'action' list
701         if not isinstance(group, list):
702             group = [group]
703
704         conditions.append((group, state, time))
705
706     def unregister_condition(self, group, action = None):
707         """ Removed conditions for a certain group of guids
708
709         :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
710         :type action: str
711
712         :param group: Group of RMs to wait for (list of guids)
713         :type group: int or list of int
714
715         """
716         # For each condition a tuple of (group, state, time) is 
717         # added to the 'action' list
718         if not isinstance(group, list):
719             group = [group]
720
721         for act, conditions in self.conditions.iteritems():
722             if action and act != action:
723                 continue
724
725             for condition in list(conditions):
726                 (grp, state, time) = condition
727
728                 # If there is an intersection between grp and group,
729                 # then remove intersected elements
730                 intsec = set(group).intersection(set(grp))
731                 if intsec:
732                     idx = conditions.index(condition)
733                     newgrp = set(grp)
734                     newgrp.difference_update(intsec)
735                     conditions[idx] = (newgrp, state, time)
736                  
737     def get_connected(self, rtype = None):
738         """ Returns the list of RM with the type 'rtype'
739
740         :param rtype: Type of the RM we look for
741         :type rtype: str
742         :return: list of guid
743         """
744         connected = []
745         rclass = ResourceFactory.get_resource_type(rtype)
746         for guid in self.connections:
747             rm = self.ec.get_resource(guid)
748             if not rtype or isinstance(rm, rclass):
749                 connected.append(rm)
750         return connected
751
752     @failtrap
753     def _needs_reschedule(self, group, state, time):
754         """ Internal method that verify if 'time' has elapsed since 
755         all elements in 'group' have reached state 'state'.
756
757         :param group: Group of RMs to wait for (list of guids)
758         :type group: int or list of int
759         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
760         :type state: str
761         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
762         :type time: str
763
764         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
765         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
766         For the moment, 2m30s is not a correct syntax.
767
768         """
769         reschedule = False
770         delay = reschedule_delay 
771
772         # check state and time elapsed on all RMs
773         for guid in group:
774             rm = self.ec.get_resource(guid)
775             
776             # If one of the RMs this resource needs to wait for has FAILED
777             # and is critical we raise an exception
778             if rm.state == ResourceState.FAILED:
779                 if not rm.get('critical'):
780                     continue
781                 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
782                 raise RuntimeError, msg
783
784             # If the RM state is lower than the requested state we must
785             # reschedule (e.g. if RM is READY but we required STARTED).
786             if rm.state < state:
787                 reschedule = True
788                 break
789
790             # If there is a time restriction, we must verify the
791             # restriction is satisfied 
792             if time:
793                 if state == ResourceState.DISCOVERED:
794                     t = rm.discover_time
795                 if state == ResourceState.PROVISIONED:
796                     t = rm.provision_time
797                 elif state == ResourceState.READY:
798                     t = rm.ready_time
799                 elif state == ResourceState.STARTED:
800                     t = rm.start_time
801                 elif state == ResourceState.STOPPED:
802                     t = rm.stop_time
803                 elif state == ResourceState.RELEASED:
804                     t = rm.release_time
805                 else:
806                     break
807
808                 # time already elapsed since RM changed state
809                 waited = "%fs" % tdiffsec(tnow(), t)
810
811                 # time still to wait
812                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
813
814                 if wait > 0.001:
815                     reschedule = True
816                     delay = "%fs" % wait
817                     break
818
819         return reschedule, delay
820
821     def set_with_conditions(self, name, value, group, state, time):
822         """ Set value 'value' on attribute with name 'name' when 'time' 
823         has elapsed since all elements in 'group' have reached state
824         'state'
825
826         :param name: Name of the attribute to set
827         :type name: str
828         :param name: Value of the attribute to set
829         :type name: str
830         :param group: Group of RMs to wait for (list of guids)
831         :type group: int or list of int
832         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
833         :type state: str
834         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
835         :type time: str
836         """
837
838         reschedule = False
839         delay = reschedule_delay 
840
841         ## evaluate if set conditions are met
842
843         # only can set with conditions after the RM is started
844         if self.state != ResourceState.STARTED:
845             reschedule = True
846         else:
847             reschedule, delay = self._needs_reschedule(group, state, time)
848
849         if reschedule:
850             callback = functools.partial(self.set_with_conditions, 
851                     name, value, group, state, time)
852             self.ec.schedule(delay, callback)
853         else:
854             self.set(name, value)
855
856     def start_with_conditions(self):
857         """ Starts RM when all the conditions in self.conditions for
858         action 'START' are satisfied.
859
860         """
861         #import pdb;pdb.set_trace()
862
863         reschedule = False
864         delay = reschedule_delay 
865
866
867         ## evaluate if conditions to start are met
868         if self.ec.abort:
869             return 
870
871         # Can only start when RM is either STOPPED or READY
872         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
873             reschedule = True
874             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
875         else:
876             start_conditions = self.conditions.get(ResourceAction.START, [])
877             
878             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
879             
880             # Verify all start conditions are met
881             for (group, state, time) in start_conditions:
882                 # Uncomment for debug
883                 #unmet = []
884                 #for guid in group:
885                 #    rm = self.ec.get_resource(guid)
886                 #    unmet.append((guid, rm._state))
887                 #
888                 #self.debug("---- WAITED STATES ---- %s" % unmet )
889
890                 reschedule, delay = self._needs_reschedule(group, state, time)
891                 if reschedule:
892                     break
893
894         if reschedule:
895             self.ec.schedule(delay, self.start_with_conditions)
896         else:
897             self.debug("----- STARTING ---- ")
898             self.start()
899
900     def stop_with_conditions(self):
901         """ Stops RM when all the conditions in self.conditions for
902         action 'STOP' are satisfied.
903
904         """
905         reschedule = False
906         delay = reschedule_delay 
907
908         ## evaluate if conditions to stop are met
909         if self.ec.abort:
910             return 
911
912         # only can stop when RM is STARTED
913         if self.state != ResourceState.STARTED:
914             reschedule = True
915             self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
916         else:
917             self.debug(" ---- STOP CONDITIONS ---- %s" % 
918                     self.conditions.get(ResourceAction.STOP))
919
920             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
921             for (group, state, time) in stop_conditions:
922                 reschedule, delay = self._needs_reschedule(group, state, time)
923                 if reschedule:
924                     break
925
926         if reschedule:
927             callback = functools.partial(self.stop_with_conditions)
928             self.ec.schedule(delay, callback)
929         else:
930             self.debug(" ----- STOPPING ---- ") 
931             self.stop()
932
933     def deploy_with_conditions(self):
934         """ Deploy RM when all the conditions in self.conditions for
935         action 'READY' are satisfied.
936
937         """
938         reschedule = False
939         delay = reschedule_delay 
940
941         ## evaluate if conditions to deploy are met
942         if self.ec.abort:
943             return 
944
945         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
946         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
947                 ResourceState.PROVISIONED]:
948             #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES!
949             reschedule = True
950             self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
951         else:
952             deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
953             
954             self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) 
955             
956             # Verify all start conditions are met
957             for (group, state, time) in deploy_conditions:
958                 # Uncomment for debug
959                 #unmet = []
960                 #for guid in group:
961                 #    rm = self.ec.get_resource(guid)
962                 #    unmet.append((guid, rm._state))
963                 
964                 #self.debug("---- WAITED STATES ---- %s" % unmet )
965
966                 reschedule, delay = self._needs_reschedule(group, state, time)
967                 if reschedule:
968                     break
969
970         if reschedule:
971             self.ec.schedule(delay, self.deploy_with_conditions)
972         else:
973             self.debug("----- DEPLOYING ---- ")
974             self.deploy()
975
976     def do_connect(self, guid):
977         """ Performs actions that need to be taken upon associating RMs.
978         This method should be redefined when necessary in child classes.
979         """
980         pass
981
982     def do_disconnect(self, guid):
983         """ Performs actions that need to be taken upon disassociating RMs.
984         This method should be redefined when necessary in child classes.
985         """
986         pass
987
988     def valid_connection(self, guid):
989         """Checks whether a connection with the other RM
990         is valid.
991         This method need to be redefined by each new Resource Manager.
992
993         :param guid: Guid of the current Resource Manager
994         :type guid: int
995         :rtype:  Boolean
996
997         """
998         # TODO: Validate!
999         return True
1000
1001     def do_discover(self):
1002         self.set_discovered()
1003
1004     def do_provision(self):
1005         self.set_provisioned()
1006
1007     def do_start(self):
1008         self.set_started()
1009
1010     def do_stop(self):
1011         self.set_stopped()
1012
1013     def do_deploy(self):
1014         self.set_ready()
1015
1016     def do_release(self):
1017         self.set_released()
1018
1019     def do_fail(self):
1020         self.set_failed()
1021         self.ec.inform_failure(self.guid)
1022
1023     def set_started(self, time = None):
1024         """ Mark ResourceManager as STARTED """
1025         self.set_state(ResourceState.STARTED, "_start_time", time)
1026         self.debug("----- STARTED ---- ")
1027
1028     def set_stopped(self, time = None):
1029         """ Mark ResourceManager as STOPPED """
1030         self.set_state(ResourceState.STOPPED, "_stop_time", time)
1031         self.debug("----- STOPPED ---- ")
1032
1033     def set_ready(self, time = None):
1034         """ Mark ResourceManager as READY """
1035         self.set_state(ResourceState.READY, "_ready_time", time)
1036         self.debug("----- READY ---- ")
1037
1038     def set_released(self, time = None):
1039         """ Mark ResourceManager as REALEASED """
1040         self.set_state(ResourceState.RELEASED, "_release_time", time)
1041         self.debug("----- RELEASED ---- ")
1042
1043     def set_failed(self, time = None):
1044         """ Mark ResourceManager as FAILED """
1045         self.set_state(ResourceState.FAILED, "_failed_time", time)
1046         self.debug("----- FAILED ---- ")
1047
1048     def set_discovered(self, time = None):
1049         """ Mark ResourceManager as DISCOVERED """
1050         self.set_state(ResourceState.DISCOVERED, "_discover_time", time)
1051         self.debug("----- DISCOVERED ---- ")
1052
1053     def set_provisioned(self, time = None):
1054         """ Mark ResourceManager as PROVISIONED """
1055         self.set_state(ResourceState.PROVISIONED, "_provision_time", time)
1056         self.debug("----- PROVISIONED ---- ")
1057
1058     def set_state(self, state, state_time_attr, time = None):
1059         """ Set the state of the RM while keeping a trace of the time """
1060
1061         # Ensure that RM state will not change after released
1062         if self._state == ResourceState.RELEASED:
1063             return 
1064
1065         time = time or tnow()
1066         self.set_state_time(state, state_time_attr, time)
1067   
1068     def set_state_time(self, state, state_time_attr, time):
1069         """ Set the time for the RM state change """
1070         setattr(self, state_time_attr, time)
1071         self._state = state
1072
1073 class ResourceFactory(object):
1074     _resource_types = dict()
1075
1076     @classmethod
1077     def resource_types(cls):
1078         """Return the type of the Class"""
1079         return cls._resource_types
1080
1081     @classmethod
1082     def get_resource_type(cls, rtype):
1083         """Return the type of the Class"""
1084         return cls._resource_types.get(rtype)
1085
1086     @classmethod
1087     def register_type(cls, rclass):
1088         """Register a new Ressource Manager"""
1089         cls._resource_types[rclass.get_rtype()] = rclass
1090
1091     @classmethod
1092     def create(cls, rtype, ec, guid):
1093         """Create a new instance of a Ressource Manager"""
1094         rclass = cls._resource_types[rtype]
1095         return rclass(ec, guid)
1096
1097 def populate_factory():
1098     """Find and rgister all available RMs
1099     """
1100     # Once the factory is populated, don't repopulate
1101     if not ResourceFactory.resource_types():
1102         for rclass in find_types():
1103             ResourceFactory.register_type(rclass)
1104
1105 def find_types():
1106     """Look into the different folders to find all the 
1107     availables Resources Managers
1108     """
1109     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1110     search_path = set(search_path.split(" "))
1111    
1112     import inspect
1113     import nepi.resources 
1114     path = os.path.dirname(nepi.resources.__file__)
1115     search_path.add(path)
1116
1117     types = set()
1118
1119     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
1120             prefix = "nepi.resources."):
1121
1122         loader = importer.find_module(modname)
1123         
1124         try:
1125             # Notice: Repeated calls to load_module will act as a reload of the module
1126             if modname in sys.modules:
1127                 module = sys.modules.get(modname)
1128             else:
1129                 module = loader.load_module(modname)
1130
1131             for attrname in dir(module):
1132                 if attrname.startswith("_"):
1133                     continue
1134
1135                 attr = getattr(module, attrname)
1136
1137                 if attr == ResourceManager:
1138                     continue
1139
1140                 if not inspect.isclass(attr):
1141                     continue
1142
1143                 if issubclass(attr, ResourceManager):
1144                     types.add(attr)
1145
1146                     if not modname in sys.modules:
1147                         sys.modules[modname] = module
1148
1149         except:
1150             import traceback
1151             import logging
1152             err = traceback.format_exc()
1153             logger = logging.getLogger("Resource.find_types()")
1154             logger.error("Error while loading Resource Managers %s" % err)
1155
1156     return types
1157