enable dce in simulation is now automatic
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
24
25 import copy
26 import functools
27 import logging
28 import os
29 import pkgutil
30 import sys
31 import threading
32 import weakref
33
34 reschedule_delay = "1s"
35
36 class ResourceAction:
37     """ Action that a user can order to a Resource Manager
38    
39     """
40     DEPLOY = 0
41     START = 1
42     STOP = 2
43
44 class ResourceState:
45     """ State of a Resource Manager
46    
47     """
48     NEW = 0
49     DISCOVERED = 1
50     PROVISIONED = 2
51     READY = 3
52     STARTED = 4
53     STOPPED = 5
54     FAILED = 6
55     RELEASED = 7
56
57 ResourceState2str = dict({
58     ResourceState.NEW : "NEW",
59     ResourceState.DISCOVERED : "DISCOVERED",
60     ResourceState.PROVISIONED : "PROVISIONED",
61     ResourceState.READY : "READY",
62     ResourceState.STARTED : "STARTED",
63     ResourceState.STOPPED : "STOPPED",
64     ResourceState.FAILED : "FAILED",
65     ResourceState.RELEASED : "RELEASED",
66     })
67
68 def clsinit(cls):
69     """ Initializes template information (i.e. attributes and traces)
70     on classes derived from the ResourceManager class.
71
72     It is used as a decorator in the class declaration as follows:
73
74         @clsinit
75         class MyResourceManager(ResourceManager):
76         
77             ...
78
79      """
80
81     cls._clsinit()
82     return cls
83
84 def clsinit_copy(cls):
85     """ Initializes template information (i.e. attributes and traces)
86     on classes direved from the ResourceManager class.
87     It differs from the clsinit method in that it forces inheritance
88     of attributes and traces from the parent class.
89
90     It is used as a decorator in the class declaration as follows:
91
92         @clsinit
93         class MyResourceManager(ResourceManager):
94         
95             ...
96
97
98     clsinit_copy should be prefered to clsinit when creating new
99     ResourceManager child classes.
100
101     """
102     
103     cls._clsinit_copy()
104     return cls
105
106 def failtrap(func):
107     """ Decorator function for instance methods that should set the 
108     RM state to FAILED when an error is raised. The methods that must be
109     decorated are: discover, provision, deploy, start, stop.
110
111     """
112     def wrapped(self, *args, **kwargs):
113         try:
114             return func(self, *args, **kwargs)
115         except:
116             import traceback
117             err = traceback.format_exc()
118             self.error(err)
119             self.debug("SETTING guid %d to state FAILED" % self.guid)
120             self.fail()
121             raise
122     
123     return wrapped
124
125 @clsinit
126 class ResourceManager(Logger):
127     """ Base clase for all ResourceManagers. 
128     
129     A ResourceManger is specific to a resource type (e.g. Node, 
130     Switch, Application, etc) on a specific backend (e.g. PlanetLab, 
131     OMF, etc).
132
133     The ResourceManager instances are responsible for interacting with
134     and controlling concrete (physical or virtual) resources in the 
135     experimental backends.
136     
137     """
138     _rtype = "Resource"
139     _attributes = None
140     _traces = None
141     _help = None
142     _backend = None
143
144     @classmethod
145     def _register_attribute(cls, attr):
146         """ Resource subclasses will invoke this method to add a 
147         resource attribute
148
149         """
150         
151         cls._attributes[attr.name] = attr
152
153     @classmethod
154     def _remove_attribute(cls, name):
155         """ Resource subclasses will invoke this method to remove a 
156         resource attribute
157
158         """
159         
160         del cls._attributes[name]
161
162     @classmethod
163     def _register_trace(cls, trace):
164         """ Resource subclasses will invoke this method to add a 
165         resource trace
166
167         """
168         
169         cls._traces[trace.name] = trace
170
171     @classmethod
172     def _remove_trace(cls, name):
173         """ Resource subclasses will invoke this method to remove a 
174         resource trace
175
176         """
177         
178         del cls._traces[name]
179
180     @classmethod
181     def _register_attributes(cls):
182         """ Resource subclasses will invoke this method to register
183         resource attributes.
184
185         This method should be overriden in the RMs that define
186         attributes.
187
188         """
189         critical = Attribute("critical", 
190                 "Defines whether the resource is critical. "
191                 "A failure on a critical resource will interrupt "
192                 "the experiment. ",
193                 type = Types.Bool,
194                 default = True,
195                 flags = Flags.Design)
196         hard_release = Attribute("hardRelease", 
197                 "Forces removal of all result files and directories associated "
198                 "to the RM upon resource release. After release the RM will "
199                 "be removed from the EC and the results will not longer be "
200                 "accessible",
201                 type = Types.Bool,
202                 default = False,
203                 flags = Flags.Design)
204
205         cls._register_attribute(critical)
206         cls._register_attribute(hard_release)
207         
208     @classmethod
209     def _register_traces(cls):
210         """ Resource subclasses will invoke this method to register
211         resource traces
212
213         This method should be overriden in the RMs that define traces.
214         
215         """
216         
217         pass
218
219     @classmethod
220     def _clsinit(cls):
221         """ ResourceManager classes have different attributes and traces.
222         Attribute and traces are stored in 'class attribute' dictionaries.
223         When a new ResourceManager class is created, the _clsinit method is 
224         called to create a new instance of those dictionaries and initialize 
225         them.
226         
227         The _clsinit method is called by the clsinit decorator method.
228         
229         """
230         
231         # static template for resource attributes
232         cls._attributes = dict()
233         cls._register_attributes()
234
235         # static template for resource traces
236         cls._traces = dict()
237         cls._register_traces()
238
239     @classmethod
240     def _clsinit_copy(cls):
241         """ Same as _clsinit, except that after creating new instances of the
242         dictionaries it copies all the attributes and traces from the parent 
243         class.
244         
245         The _clsinit_copy method is called by the clsinit_copy decorator method.
246         
247         """
248         # static template for resource attributes
249         cls._attributes = copy.deepcopy(cls._attributes)
250         cls._register_attributes()
251
252         # static template for resource traces
253         cls._traces = copy.deepcopy(cls._traces)
254         cls._register_traces()
255
256     @classmethod
257     def get_rtype(cls):
258         """ Returns the type of the Resource Manager
259
260         """
261         return cls._rtype
262
263     @classmethod
264     def get_attributes(cls):
265         """ Returns a copy of the attributes
266
267         """
268         return copy.deepcopy(cls._attributes.values())
269
270     @classmethod
271     def get_attribute(cls, name):
272         """ Returns a copy of the attribute with name 'name'
273
274         """
275         return copy.deepcopy(cls._attributes[name])
276
277
278     @classmethod
279     def get_traces(cls):
280         """ Returns a copy of the traces
281
282         """
283         return copy.deepcopy(cls._traces.values())
284
285     @classmethod
286     def get_help(cls):
287         """ Returns the description of the type of Resource
288
289         """
290         return cls._help
291
292     @classmethod
293     def get_backend(cls):
294         """ Returns the identified of the backend (i.e. testbed, environment)
295         for the Resource
296
297         """
298         return cls._backend
299
300     @classmethod
301     def get_global(cls, name):
302         """ Returns the value of a global attribute
303             Global attribute meaning an attribute for 
304             all the resources from a rtype
305
306         :param name: Name of the attribute
307         :type name: str
308         :rtype: str
309         """
310         global_attr = cls._attributes[name]
311         return global_attr.value
312
313     @classmethod
314     def set_global(cls, name, value):
315         """ Set value for a global attribute
316
317         :param name: Name of the attribute
318         :type name: str
319         :param name: Value of the attribute
320         :type name: str
321         """
322         global_attr = cls._attributes[name]
323         global_attr.value = value
324         return value
325
326     def __init__(self, ec, guid):
327         super(ResourceManager, self).__init__(self.get_rtype())
328         
329         self._guid = guid
330         self._ec = weakref.ref(ec)
331         self._connections = set()
332         self._conditions = dict() 
333
334         # the resource instance gets a copy of all attributes
335         self._attrs = copy.deepcopy(self._attributes)
336
337         # the resource instance gets a copy of all traces
338         self._trcs = copy.deepcopy(self._traces)
339
340         # Each resource is placed on a deployment group by the EC
341         # during deployment
342         self.deployment_group = None
343
344         self._start_time = None
345         self._stop_time = None
346         self._discover_time = None
347         self._provision_time = None
348         self._ready_time = None
349         self._release_time = None
350         self._failed_time = None
351
352         self._state = ResourceState.NEW
353
354         # instance lock to synchronize exclusive state change methods (such
355         # as deploy and release methods), in order to prevent them from being 
356         # executed at the same time
357         self._release_lock = threading.Lock()
358
359     @property
360     def guid(self):
361         """ Returns the global unique identifier of the RM """
362         return self._guid
363
364     @property
365     def ec(self):
366         """ Returns the Experiment Controller of the RM """
367         return self._ec()
368
369     @property
370     def connections(self):
371         """ Returns the set of guids of connected RMs """
372         return self._connections
373
374     @property
375     def conditions(self):
376         """ Returns the conditions to which the RM is subjected to.
377         
378         This method returns a dictionary of conditions lists indexed by
379         a ResourceAction.
380         
381         """
382         return self._conditions
383
384     @property
385     def start_time(self):
386         """ Returns the start time of the RM as a timestamp """
387         return self._start_time
388
389     @property
390     def stop_time(self):
391         """ Returns the stop time of the RM as a timestamp """
392         return self._stop_time
393
394     @property
395     def discover_time(self):
396         """ Returns the discover time of the RM as a timestamp """
397         return self._discover_time
398
399     @property
400     def provision_time(self):
401         """ Returns the provision time of the RM as a timestamp """
402         return self._provision_time
403
404     @property
405     def ready_time(self):
406         """ Returns the deployment time of the RM as a timestamp """
407         return self._ready_time
408
409     @property
410     def release_time(self):
411         """ Returns the release time of the RM as a timestamp """
412         return self._release_time
413
414     @property
415     def failed_time(self):
416         """ Returns the time failure occured for the RM as a timestamp """
417         return self._failed_time
418
419     @property
420     def state(self):
421         """ Get the current state of the RM """
422         return self._state
423
424     def log_message(self, msg):
425         """ Returns the log message formatted with added information.
426
427         :param msg: text message
428         :type msg: str
429         :rtype: str
430
431         """
432         return " %s guid %d - %s " % (self._rtype, self.guid, msg)
433
434     def register_connection(self, guid):
435         """ Registers a connection to the RM identified by guid
436
437         This method should not be overriden. Specific functionality
438         should be added in the do_connect method.
439
440         :param guid: Global unique identified of the RM to connect to
441         :type guid: int
442
443         """
444         if self.valid_connection(guid):
445             self.do_connect(guid)
446             self._connections.add(guid)
447
448     def unregister_connection(self, guid):
449         """ Removes a registered connection to the RM identified by guid
450         
451         This method should not be overriden. Specific functionality
452         should be added in the do_disconnect method.
453
454         :param guid: Global unique identified of the RM to connect to
455         :type guid: int
456
457         """
458         if guid in self._connections:
459             self.do_disconnect(guid)
460             self._connections.remove(guid)
461
462     @failtrap
463     def discover(self):
464         """ Performs resource discovery.
465         
466         This  method is responsible for selecting an individual resource
467         matching user requirements.
468
469         This method should not be overriden directly. Specific functionality
470         should be added in the do_discover method.
471
472         """
473         with self._release_lock:
474             if self._state != ResourceState.RELEASED:
475                 self.do_discover()
476
477     @failtrap
478     def provision(self):
479         """ Performs resource provisioning.
480
481         This  method is responsible for provisioning one resource.
482         After this method has been successfully invoked, the resource
483         should be accessible/controllable by the RM.
484
485         This method should not be overriden directly. Specific functionality
486         should be added in the do_provision method.
487
488         """
489         with self._release_lock:
490             if self._state != ResourceState.RELEASED:
491                 self.do_provision()
492
493     @failtrap
494     def start(self):
495         """ Starts the RM (e.g. launch remote process).
496     
497         There is no standard start behavior. Some RMs will not need to perform
498         any actions upon start.
499
500         This method should not be overriden directly. Specific functionality
501         should be added in the do_start method.
502
503         """
504
505         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
506             self.error("Wrong state %s for start" % self.state)
507             return
508
509         with self._release_lock:
510             if self._state != ResourceState.RELEASED:
511                 self.do_start()
512
513     @failtrap
514     def stop(self):
515         """ Interrupts the RM, stopping any tasks the RM was performing.
516      
517         There is no standard stop behavior. Some RMs will not need to perform
518         any actions upon stop.
519     
520         This method should not be overriden directly. Specific functionality
521         should be added in the do_stop method.
522       
523         """
524         if not self.state in [ResourceState.STARTED]:
525             self.error("Wrong state %s for stop" % self.state)
526             return
527         
528         with self._release_lock:
529             self.do_stop()
530
531     @failtrap
532     def deploy(self):
533         """ Execute all steps required for the RM to reach the state READY.
534
535         This method is responsible for deploying the resource (and invoking 
536         the discover and provision methods).
537  
538         This method should not be overriden directly. Specific functionality
539         should be added in the do_deploy method.
540        
541         """
542         if self.state > ResourceState.READY:
543             self.error("Wrong state %s for deploy" % self.state)
544             return
545
546         with self._release_lock:
547             if self._state != ResourceState.RELEASED:
548                 self.do_deploy()
549
550     def release(self):
551         """ Perform actions to free resources used by the RM.
552   
553         This  method is responsible for releasing resources that were
554         used during the experiment by the RM.
555
556         This method should not be overriden directly. Specific functionality
557         should be added in the do_release method.
558       
559         """
560         with self._release_lock:
561             try:
562                 self.do_release()
563             except:
564                 import traceback
565                 err = traceback.format_exc()
566                 self.error(err)
567
568                 self.set_released()
569
570     def fail(self):
571         """ Sets the RM to state FAILED.
572
573         This method should not be overriden directly. Specific functionality
574         should be added in the do_fail method.
575
576         """
577         with self._release_lock:
578             if self._state != ResourceState.RELEASED:
579                 self.do_fail()
580
581     def set(self, name, value):
582         """ Set the value of the attribute
583
584         :param name: Name of the attribute
585         :type name: str
586         :param name: Value of the attribute
587         :type name: str
588         """
589         attr = self._attrs[name]
590         attr.value = value
591         return value
592
593     def get(self, name):
594         """ Returns the value of the attribute
595
596         :param name: Name of the attribute
597         :type name: str
598         :rtype: str
599         """
600         attr = self._attrs[name]
601         if attr.has_flag(Flags.Global):
602             self.warning( "Attribute %s is global. Use get_global instead." % name)
603             
604         return attr.value
605
606     def has_changed(self, name):
607         """ Returns the True is the value of the attribute
608             has been modified by the user.
609
610         :param name: Name of the attribute
611         :type name: str
612         :rtype: str
613         """
614         attr = self._attrs[name]
615         return attr.has_changed()
616
617     def has_flag(self, name, flag):
618         """ Returns true if the attribute has the flag 'flag'
619
620         :param flag: Flag to be checked
621         :type flag: Flags
622         """
623         attr = self._attrs[name]
624         return attr.has_flag(flag)
625
626     def has_attribute(self, name):
627         """ Returns true if the RM has an attribute with name
628
629         :param name: name of the attribute
630         :type name: string
631         """
632         return name in self._attrs
633
634     def enable_trace(self, name):
635         """ Explicitly enable trace generation
636
637         :param name: Name of the trace
638         :type name: str
639         """
640         trace = self._trcs[name]
641         trace.enabled = True
642     
643     def trace_enabled(self, name):
644         """Returns True if trace is enables 
645
646         :param name: Name of the trace
647         :type name: str
648         """
649         trace = self._trcs[name]
650         return trace.enabled
651  
652     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
653         """ Get information on collected trace
654
655         :param name: Name of the trace
656         :type name: str
657
658         :param attr: Can be one of:
659                          - TraceAttr.ALL (complete trace content), 
660                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
661                          - TraceAttr.PATH (full path to the trace file),
662                          - TraceAttr.SIZE (size of trace file). 
663         :type attr: str
664
665         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
666         :type name: int
667
668         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
669         :type name: int
670
671         :rtype: str
672         """
673         pass
674
675     def register_condition(self, action, group, state, time = None):
676         """ Registers a condition on the resource manager to allow execution 
677         of 'action' only after 'time' has elapsed from the moment all resources 
678         in 'group' reached state 'state'
679
680         :param action: Action to restrict to condition (either 'START' or 'STOP')
681         :type action: str
682         :param group: Group of RMs to wait for (list of guids)
683         :type group: int or list of int
684         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
685         :type state: str
686         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
687         :type time: str
688
689         """
690
691         if not action in self.conditions:
692             self._conditions[action] = list()
693         
694         conditions = self.conditions.get(action)
695
696         # For each condition to register a tuple of (group, state, time) is 
697         # added to the 'action' list
698         if not isinstance(group, list):
699             group = [group]
700
701         conditions.append((group, state, time))
702
703     def unregister_condition(self, group, action = None):
704         """ Removed conditions for a certain group of guids
705
706         :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
707         :type action: str
708
709         :param group: Group of RMs to wait for (list of guids)
710         :type group: int or list of int
711
712         """
713         # For each condition a tuple of (group, state, time) is 
714         # added to the 'action' list
715         if not isinstance(group, list):
716             group = [group]
717
718         for act, conditions in self.conditions.iteritems():
719             if action and act != action:
720                 continue
721
722             for condition in list(conditions):
723                 (grp, state, time) = condition
724
725                 # If there is an intersection between grp and group,
726                 # then remove intersected elements
727                 intsec = set(group).intersection(set(grp))
728                 if intsec:
729                     idx = conditions.index(condition)
730                     newgrp = set(grp)
731                     newgrp.difference_update(intsec)
732                     conditions[idx] = (newgrp, state, time)
733                  
734     def get_connected(self, rtype = None):
735         """ Returns the list of RM with the type 'rtype'
736
737         :param rtype: Type of the RM we look for
738         :type rtype: str
739         :return: list of guid
740         """
741         connected = []
742         rclass = ResourceFactory.get_resource_type(rtype)
743         for guid in self.connections:
744             rm = self.ec.get_resource(guid)
745             if not rtype or isinstance(rm, rclass):
746                 connected.append(rm)
747         return connected
748
749     @failtrap
750     def _needs_reschedule(self, group, state, time):
751         """ Internal method that verify if 'time' has elapsed since 
752         all elements in 'group' have reached state 'state'.
753
754         :param group: Group of RMs to wait for (list of guids)
755         :type group: int or list of int
756         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
757         :type state: str
758         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
759         :type time: str
760
761         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
762         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
763         For the moment, 2m30s is not a correct syntax.
764
765         """
766         reschedule = False
767         delay = reschedule_delay 
768
769         # check state and time elapsed on all RMs
770         for guid in group:
771             rm = self.ec.get_resource(guid)
772             
773             # If one of the RMs this resource needs to wait for has FAILED
774             # and is critical we raise an exception
775             if rm.state == ResourceState.FAILED:
776                 if not rm.get('critical'):
777                     continue
778                 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
779                 raise RuntimeError, msg
780
781             # If the RM state is lower than the requested state we must
782             # reschedule (e.g. if RM is READY but we required STARTED).
783             if rm.state < state:
784                 reschedule = True
785                 break
786
787             # If there is a time restriction, we must verify the
788             # restriction is satisfied 
789             if time:
790                 if state == ResourceState.DISCOVERED:
791                     t = rm.discover_time
792                 if state == ResourceState.PROVISIONED:
793                     t = rm.provision_time
794                 elif state == ResourceState.READY:
795                     t = rm.ready_time
796                 elif state == ResourceState.STARTED:
797                     t = rm.start_time
798                 elif state == ResourceState.STOPPED:
799                     t = rm.stop_time
800                 elif state == ResourceState.RELEASED:
801                     t = rm.release_time
802                 else:
803                     break
804
805                 # time already elapsed since RM changed state
806                 waited = "%fs" % tdiffsec(tnow(), t)
807
808                 # time still to wait
809                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
810
811                 if wait > 0.001:
812                     reschedule = True
813                     delay = "%fs" % wait
814                     break
815
816         return reschedule, delay
817
818     def set_with_conditions(self, name, value, group, state, time):
819         """ Set value 'value' on attribute with name 'name' when 'time' 
820         has elapsed since all elements in 'group' have reached state
821         'state'
822
823         :param name: Name of the attribute to set
824         :type name: str
825         :param name: Value of the attribute to set
826         :type name: str
827         :param group: Group of RMs to wait for (list of guids)
828         :type group: int or list of int
829         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
830         :type state: str
831         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
832         :type time: str
833         """
834
835         reschedule = False
836         delay = reschedule_delay 
837
838         ## evaluate if set conditions are met
839
840         # only can set with conditions after the RM is started
841         if self.state != ResourceState.STARTED:
842             reschedule = True
843         else:
844             reschedule, delay = self._needs_reschedule(group, state, time)
845
846         if reschedule:
847             callback = functools.partial(self.set_with_conditions, 
848                     name, value, group, state, time)
849             self.ec.schedule(delay, callback)
850         else:
851             self.set(name, value)
852
853     def start_with_conditions(self):
854         """ Starts RM when all the conditions in self.conditions for
855         action 'START' are satisfied.
856
857         """
858         #import pdb;pdb.set_trace()
859
860         reschedule = False
861         delay = reschedule_delay 
862
863
864         ## evaluate if conditions to start are met
865         if self.ec.abort:
866             return 
867
868         # Can only start when RM is either STOPPED or READY
869         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
870             reschedule = True
871             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
872         else:
873             start_conditions = self.conditions.get(ResourceAction.START, [])
874             
875             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
876             
877             # Verify all start conditions are met
878             for (group, state, time) in start_conditions:
879                 # Uncomment for debug
880                 unmet = []
881                 for guid in group:
882                     rm = self.ec.get_resource(guid)
883                     unmet.append((guid, rm._state))
884                 
885                 self.debug("---- WAITED STATES ---- %s" % unmet )
886
887                 reschedule, delay = self._needs_reschedule(group, state, time)
888                 if reschedule:
889                     break
890
891         if reschedule:
892             self.ec.schedule(delay, self.start_with_conditions)
893         else:
894             self.debug("----- STARTING ---- ")
895             self.start()
896
897     def stop_with_conditions(self):
898         """ Stops RM when all the conditions in self.conditions for
899         action 'STOP' are satisfied.
900
901         """
902         reschedule = False
903         delay = reschedule_delay 
904
905         ## evaluate if conditions to stop are met
906         if self.ec.abort:
907             return 
908
909         # only can stop when RM is STARTED
910         if self.state != ResourceState.STARTED:
911             reschedule = True
912             self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
913         else:
914             self.debug(" ---- STOP CONDITIONS ---- %s" % 
915                     self.conditions.get(ResourceAction.STOP))
916
917             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
918             for (group, state, time) in stop_conditions:
919                 reschedule, delay = self._needs_reschedule(group, state, time)
920                 if reschedule:
921                     break
922
923         if reschedule:
924             callback = functools.partial(self.stop_with_conditions)
925             self.ec.schedule(delay, callback)
926         else:
927             self.debug(" ----- STOPPING ---- ") 
928             self.stop()
929
930     def deploy_with_conditions(self):
931         """ Deploy RM when all the conditions in self.conditions for
932         action 'READY' are satisfied.
933
934         """
935         reschedule = False
936         delay = reschedule_delay 
937
938         ## evaluate if conditions to deploy are met
939         if self.ec.abort:
940             return 
941
942         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
943         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
944                 ResourceState.PROVISIONED]:
945             reschedule = True
946             self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
947         else:
948             deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
949             
950             self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) 
951             
952             # Verify all start conditions are met
953             for (group, state, time) in deploy_conditions:
954                 # Uncomment for debug
955                 #unmet = []
956                 #for guid in group:
957                 #    rm = self.ec.get_resource(guid)
958                 #    unmet.append((guid, rm._state))
959                 
960                 #self.debug("---- WAITED STATES ---- %s" % unmet )
961
962                 reschedule, delay = self._needs_reschedule(group, state, time)
963                 if reschedule:
964                     break
965
966         if reschedule:
967             self.ec.schedule(delay, self.deploy_with_conditions)
968         else:
969             self.debug("----- DEPLOYING ---- ")
970             self.deploy()
971
972     def do_connect(self, guid):
973         """ Performs actions that need to be taken upon associating RMs.
974         This method should be redefined when necessary in child classes.
975         """
976         pass
977
978     def do_disconnect(self, guid):
979         """ Performs actions that need to be taken upon disassociating RMs.
980         This method should be redefined when necessary in child classes.
981         """
982         pass
983
984     def valid_connection(self, guid):
985         """Checks whether a connection with the other RM
986         is valid.
987         This method need to be redefined by each new Resource Manager.
988
989         :param guid: Guid of the current Resource Manager
990         :type guid: int
991         :rtype:  Boolean
992
993         """
994         # TODO: Validate!
995         return True
996
997     def do_discover(self):
998         self.set_discovered()
999
1000     def do_provision(self):
1001         self.set_provisioned()
1002
1003     def do_start(self):
1004         self.set_started()
1005
1006     def do_stop(self):
1007         self.set_stopped()
1008
1009     def do_deploy(self):
1010         self.set_ready()
1011
1012     def do_release(self):
1013         self.set_released()
1014
1015     def do_fail(self):
1016         self.set_failed()
1017
1018     def set_started(self):
1019         """ Mark ResourceManager as STARTED """
1020         self.set_state(ResourceState.STARTED, "_start_time")
1021         self.debug("----- STARTED ---- ")
1022
1023     def set_stopped(self):
1024         """ Mark ResourceManager as STOPPED """
1025         self.set_state(ResourceState.STOPPED, "_stop_time")
1026         self.debug("----- STOPPED ---- ")
1027
1028     def set_ready(self):
1029         """ Mark ResourceManager as READY """
1030         self.set_state(ResourceState.READY, "_ready_time")
1031         self.debug("----- READY ---- ")
1032
1033     def set_released(self):
1034         """ Mark ResourceManager as REALEASED """
1035         self.set_state(ResourceState.RELEASED, "_release_time")
1036         self.debug("----- RELEASED ---- ")
1037
1038     def set_failed(self):
1039         """ Mark ResourceManager as FAILED """
1040         self.set_state(ResourceState.FAILED, "_failed_time")
1041         self.debug("----- FAILED ---- ")
1042
1043     def set_discovered(self):
1044         """ Mark ResourceManager as DISCOVERED """
1045         self.set_state(ResourceState.DISCOVERED, "_discover_time")
1046         self.debug("----- DISCOVERED ---- ")
1047
1048     def set_provisioned(self):
1049         """ Mark ResourceManager as PROVISIONED """
1050         self.set_state(ResourceState.PROVISIONED, "_provision_time")
1051         self.debug("----- PROVISIONED ---- ")
1052
1053     def set_state(self, state, state_time_attr):
1054         """ Set the state of the RM while keeping a trace of the time """
1055
1056         # Ensure that RM state will not change after released
1057         if self._state == ResourceState.RELEASED:
1058             return 
1059    
1060         setattr(self, state_time_attr, tnow())
1061         self._state = state
1062
1063 class ResourceFactory(object):
1064     _resource_types = dict()
1065
1066     @classmethod
1067     def resource_types(cls):
1068         """Return the type of the Class"""
1069         return cls._resource_types
1070
1071     @classmethod
1072     def get_resource_type(cls, rtype):
1073         """Return the type of the Class"""
1074         return cls._resource_types.get(rtype)
1075
1076     @classmethod
1077     def register_type(cls, rclass):
1078         """Register a new Ressource Manager"""
1079         cls._resource_types[rclass.get_rtype()] = rclass
1080
1081     @classmethod
1082     def create(cls, rtype, ec, guid):
1083         """Create a new instance of a Ressource Manager"""
1084         rclass = cls._resource_types[rtype]
1085         return rclass(ec, guid)
1086
1087 def populate_factory():
1088     """Register all the possible RM that exists in the current version of Nepi.
1089     """
1090     # Once the factory is populated, don't repopulate
1091     if not ResourceFactory.resource_types():
1092         for rclass in find_types():
1093             ResourceFactory.register_type(rclass)
1094
1095 def find_types():
1096     """Look into the different folders to find all the 
1097     availables Resources Managers
1098     """
1099     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1100     search_path = set(search_path.split(" "))
1101    
1102     import inspect
1103     import nepi.resources 
1104     path = os.path.dirname(nepi.resources.__file__)
1105     search_path.add(path)
1106
1107     types = set()
1108
1109     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
1110             prefix = "nepi.resources."):
1111
1112         loader = importer.find_module(modname)
1113         
1114         try:
1115             # Notice: Repeated calls to load_module will act as a reload of the module
1116             if modname in sys.modules:
1117                 module = sys.modules.get(modname)
1118             else:
1119                 module = loader.load_module(modname)
1120
1121             for attrname in dir(module):
1122                 if attrname.startswith("_"):
1123                     continue
1124
1125                 attr = getattr(module, attrname)
1126
1127                 if attr == ResourceManager:
1128                     continue
1129
1130                 if not inspect.isclass(attr):
1131                     continue
1132
1133                 if issubclass(attr, ResourceManager):
1134                     types.add(attr)
1135
1136                     if not modname in sys.modules:
1137                         sys.modules[modname] = module
1138
1139         except:
1140             import traceback
1141             import logging
1142             err = traceback.format_exc()
1143             logger = logging.getLogger("Resource.find_types()")
1144             logger.error("Error while loading Resource Managers %s" % err)
1145
1146     return types
1147