2d24dd1e5926374a41d684e47f7917e771eb6f2c
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
24
25 import copy
26 import functools
27 import logging
28 import os
29 import pkgutil
30 import sys
31 import threading
32 import weakref
33
34 reschedule_delay = "1s"
35
36 class ResourceAction:
37     """ Action that a user can order to a Resource Manager
38    
39     """
40     DEPLOY = 0
41     START = 1
42     STOP = 2
43
44 class ResourceState:
45     """ State of a Resource Manager
46    
47     """
48     NEW = 0
49     DISCOVERED = 1
50     PROVISIONED = 2
51     READY = 3
52     STARTED = 4
53     STOPPED = 5
54     FAILED = 6
55     RELEASED = 7
56
57 ResourceState2str = dict({
58     ResourceState.NEW : "NEW",
59     ResourceState.DISCOVERED : "DISCOVERED",
60     ResourceState.PROVISIONED : "PROVISIONED",
61     ResourceState.READY : "READY",
62     ResourceState.STARTED : "STARTED",
63     ResourceState.STOPPED : "STOPPED",
64     ResourceState.FAILED : "FAILED",
65     ResourceState.RELEASED : "RELEASED",
66     })
67
68 def clsinit(cls):
69     """ Initializes template information (i.e. attributes and traces)
70     on classes derived from the ResourceManager class.
71
72     It is used as a decorator in the class declaration as follows:
73
74         @clsinit
75         class MyResourceManager(ResourceManager):
76         
77             ...
78
79      """
80
81     cls._clsinit()
82     return cls
83
84 def clsinit_copy(cls):
85     """ Initializes template information (i.e. attributes and traces)
86     on classes direved from the ResourceManager class.
87     It differs from the clsinit method in that it forces inheritance
88     of attributes and traces from the parent class.
89
90     It is used as a decorator in the class declaration as follows:
91
92         @clsinit
93         class MyResourceManager(ResourceManager):
94         
95             ...
96
97
98     clsinit_copy should be prefered to clsinit when creating new
99     ResourceManager child classes.
100
101     """
102     
103     cls._clsinit_copy()
104     return cls
105
106 def failtrap(func):
107     """ Decorator function for instance methods that should set the 
108     RM state to FAILED when an error is raised. The methods that must be
109     decorated are: discover, provision, deploy, start, stop.
110
111     """
112     def wrapped(self, *args, **kwargs):
113         try:
114             return func(self, *args, **kwargs)
115         except:
116             import traceback
117             err = traceback.format_exc()
118             self.error(err)
119             self.debug("SETTING guid %d to state FAILED" % self.guid)
120             self.fail()
121             raise
122     
123     return wrapped
124
125 @clsinit
126 class ResourceManager(Logger):
127     """ Base clase for all ResourceManagers. 
128     
129     A ResourceManger is specific to a resource type (e.g. Node, 
130     Switch, Application, etc) on a specific backend (e.g. PlanetLab, 
131     OMF, etc).
132
133     The ResourceManager instances are responsible for interacting with
134     and controlling concrete (physical or virtual) resources in the 
135     experimental backends.
136     
137     """
138     _rtype = "Resource"
139     _attributes = None
140     _traces = None
141     _help = None
142     _backend = None
143
144     @classmethod
145     def _register_attribute(cls, attr):
146         """ Resource subclasses will invoke this method to add a 
147         resource attribute
148
149         """
150         
151         cls._attributes[attr.name] = attr
152
153     @classmethod
154     def _remove_attribute(cls, name):
155         """ Resource subclasses will invoke this method to remove a 
156         resource attribute
157
158         """
159         
160         del cls._attributes[name]
161
162     @classmethod
163     def _register_trace(cls, trace):
164         """ Resource subclasses will invoke this method to add a 
165         resource trace
166
167         """
168         
169         cls._traces[trace.name] = trace
170
171     @classmethod
172     def _remove_trace(cls, name):
173         """ Resource subclasses will invoke this method to remove a 
174         resource trace
175
176         """
177         
178         del cls._traces[name]
179
180     @classmethod
181     def _register_attributes(cls):
182         """ Resource subclasses will invoke this method to register
183         resource attributes.
184
185         This method should be overriden in the RMs that define
186         attributes.
187
188         """
189         critical = Attribute("critical", 
190                 "Defines whether the resource is critical. "
191                 "A failure on a critical resource will interrupt "
192                 "the experiment. ",
193                 type = Types.Bool,
194                 default = True,
195                 flags = Flags.Design)
196         hard_release = Attribute("hardRelease", 
197                 "Forces removal of all result files and directories associated "
198                 "to the RM upon resource release. After release the RM will "
199                 "be removed from the EC and the results will not longer be "
200                 "accessible",
201                 type = Types.Bool,
202                 default = False,
203                 flags = Flags.Design)
204
205         cls._register_attribute(critical)
206         cls._register_attribute(hard_release)
207         
208     @classmethod
209     def _register_traces(cls):
210         """ Resource subclasses will invoke this method to register
211         resource traces
212
213         This method should be overriden in the RMs that define traces.
214         
215         """
216         
217         pass
218
219     @classmethod
220     def _clsinit(cls):
221         """ ResourceManager classes have different attributes and traces.
222         Attribute and traces are stored in 'class attribute' dictionaries.
223         When a new ResourceManager class is created, the _clsinit method is 
224         called to create a new instance of those dictionaries and initialize 
225         them.
226         
227         The _clsinit method is called by the clsinit decorator method.
228         
229         """
230         
231         # static template for resource attributes
232         cls._attributes = dict()
233         cls._register_attributes()
234
235         # static template for resource traces
236         cls._traces = dict()
237         cls._register_traces()
238
239     @classmethod
240     def _clsinit_copy(cls):
241         """ Same as _clsinit, except that after creating new instances of the
242         dictionaries it copies all the attributes and traces from the parent 
243         class.
244         
245         The _clsinit_copy method is called by the clsinit_copy decorator method.
246         
247         """
248         # static template for resource attributes
249         cls._attributes = copy.deepcopy(cls._attributes)
250         cls._register_attributes()
251
252         # static template for resource traces
253         cls._traces = copy.deepcopy(cls._traces)
254         cls._register_traces()
255
256     @classmethod
257     def get_rtype(cls):
258         """ Returns the type of the Resource Manager
259
260         """
261         return cls._rtype
262
263     @classmethod
264     def get_attributes(cls):
265         """ Returns a copy of the attributes
266
267         """
268         return copy.deepcopy(cls._attributes.values())
269
270     @classmethod
271     def get_attribute(cls, name):
272         """ Returns a copy of the attribute with name 'name'
273
274         """
275         return copy.deepcopy(cls._attributes[name])
276
277
278     @classmethod
279     def get_traces(cls):
280         """ Returns a copy of the traces
281
282         """
283         return copy.deepcopy(cls._traces.values())
284
285     @classmethod
286     def get_help(cls):
287         """ Returns the description of the type of Resource
288
289         """
290         return cls._help
291
292     @classmethod
293     def get_backend(cls):
294         """ Returns the identified of the backend (i.e. testbed, environment)
295         for the Resource
296
297         """
298         return cls._backend
299
300     @classmethod
301     def get_global(cls, name):
302         """ Returns the value of a global attribute
303             Global attribute meaning an attribute for 
304             all the resources from a rtype
305
306         :param name: Name of the attribute
307         :type name: str
308         :rtype: str
309         """
310         global_attr = cls._attributes[name]
311         return global_attr.value
312
313     @classmethod
314     def set_global(cls, name, value):
315         """ Set value for a global attribute
316
317         :param name: Name of the attribute
318         :type name: str
319         :param name: Value of the attribute
320         :type name: str
321         """
322         global_attr = cls._attributes[name]
323         global_attr.value = value
324         return value
325
326     def __init__(self, ec, guid):
327         super(ResourceManager, self).__init__(self.get_rtype())
328         
329         self._guid = guid
330         self._ec = weakref.ref(ec)
331         self._connections = set()
332         self._conditions = dict() 
333
334         # the resource instance gets a copy of all attributes
335         self._attrs = copy.deepcopy(self._attributes)
336
337         # the resource instance gets a copy of all traces
338         self._trcs = copy.deepcopy(self._traces)
339
340         # Each resource is placed on a deployment group by the EC
341         # during deployment
342         self.deployment_group = None
343
344         self._start_time = None
345         self._stop_time = None
346         self._discover_time = None
347         self._provision_time = None
348         self._ready_time = None
349         self._release_time = None
350         self._failed_time = None
351
352         self._state = ResourceState.NEW
353
354         # instance lock to synchronize exclusive state change methods (such
355         # as deploy and release methods), in order to prevent them from being 
356         # executed at the same time
357         self._release_lock = threading.Lock()
358
359     @property
360     def guid(self):
361         """ Returns the global unique identifier of the RM """
362         return self._guid
363
364     @property
365     def ec(self):
366         """ Returns the Experiment Controller of the RM """
367         return self._ec()
368
369     @property
370     def connections(self):
371         """ Returns the set of guids of connected RMs """
372         return self._connections
373
374     @property
375     def conditions(self):
376         """ Returns the conditions to which the RM is subjected to.
377         
378         This method returns a dictionary of conditions lists indexed by
379         a ResourceAction.
380         
381         """
382         return self._conditions
383
384     @property
385     def start_time(self):
386         """ Returns the start time of the RM as a timestamp """
387         return self._start_time
388
389     @property
390     def stop_time(self):
391         """ Returns the stop time of the RM as a timestamp """
392         return self._stop_time
393
394     @property
395     def discover_time(self):
396         """ Returns the discover time of the RM as a timestamp """
397         return self._discover_time
398
399     @property
400     def provision_time(self):
401         """ Returns the provision time of the RM as a timestamp """
402         return self._provision_time
403
404     @property
405     def ready_time(self):
406         """ Returns the deployment time of the RM as a timestamp """
407         return self._ready_time
408
409     @property
410     def release_time(self):
411         """ Returns the release time of the RM as a timestamp """
412         return self._release_time
413
414     @property
415     def failed_time(self):
416         """ Returns the time failure occured for the RM as a timestamp """
417         return self._failed_time
418
419     @property
420     def state(self):
421         """ Get the current state of the RM """
422         return self._state
423
424     def log_message(self, msg):
425         """ Returns the log message formatted with added information.
426
427         :param msg: text message
428         :type msg: str
429         :rtype: str
430
431         """
432         return " %s guid %d - %s " % (self._rtype, self.guid, msg)
433
434     def register_connection(self, guid):
435         """ Registers a connection to the RM identified by guid
436
437         This method should not be overriden. Specific functionality
438         should be added in the do_connect method.
439
440         :param guid: Global unique identified of the RM to connect to
441         :type guid: int
442
443         """
444         if self.valid_connection(guid):
445             self.do_connect(guid)
446             self._connections.add(guid)
447
448     def unregister_connection(self, guid):
449         """ Removes a registered connection to the RM identified by guid
450         
451         This method should not be overriden. Specific functionality
452         should be added in the do_disconnect method.
453
454         :param guid: Global unique identified of the RM to connect to
455         :type guid: int
456
457         """
458         if guid in self._connections:
459             self.do_disconnect(guid)
460             self._connections.remove(guid)
461
462     @failtrap
463     def discover(self):
464         """ Performs resource discovery.
465         
466         This  method is responsible for selecting an individual resource
467         matching user requirements.
468
469         This method should not be overriden directly. Specific functionality
470         should be added in the do_discover method.
471
472         """
473         with self._release_lock:
474             if self._state != ResourceState.RELEASED:
475                 self.do_discover()
476
477     @failtrap
478     def provision(self):
479         """ Performs resource provisioning.
480
481         This  method is responsible for provisioning one resource.
482         After this method has been successfully invoked, the resource
483         should be accessible/controllable by the RM.
484
485         This method should not be overriden directly. Specific functionality
486         should be added in the do_provision method.
487
488         """
489         with self._release_lock:
490             if self._state != ResourceState.RELEASED:
491                 self.do_provision()
492
493     @failtrap
494     def start(self):
495         """ Starts the RM (e.g. launch remote process).
496     
497         There is no standard start behavior. Some RMs will not need to perform
498         any actions upon start.
499
500         This method should not be overriden directly. Specific functionality
501         should be added in the do_start method.
502
503         """
504
505         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
506             self.error("Wrong state %s for start" % self.state)
507             return
508
509         with self._release_lock:
510             if self._state != ResourceState.RELEASED:
511                 self.do_start()
512
513     @failtrap
514     def stop(self):
515         """ Interrupts the RM, stopping any tasks the RM was performing.
516      
517         There is no standard stop behavior. Some RMs will not need to perform
518         any actions upon stop.
519     
520         This method should not be overriden directly. Specific functionality
521         should be added in the do_stop method.
522       
523         """
524         if not self.state in [ResourceState.STARTED]:
525             self.error("Wrong state %s for stop" % self.state)
526             return
527         
528         with self._release_lock:
529             self.do_stop()
530
531     @failtrap
532     def deploy(self):
533         """ Execute all steps required for the RM to reach the state READY.
534
535         This method is responsible for deploying the resource (and invoking 
536         the discover and provision methods).
537  
538         This method should not be overriden directly. Specific functionality
539         should be added in the do_deploy method.
540        
541         """
542         if self.state > ResourceState.READY:
543             self.error("Wrong state %s for deploy" % self.state)
544             return
545
546         with self._release_lock:
547             if self._state != ResourceState.RELEASED:
548                 self.do_deploy()
549
550     def release(self):
551         """ Perform actions to free resources used by the RM.
552   
553         This  method is responsible for releasing resources that were
554         used during the experiment by the RM.
555
556         This method should not be overriden directly. Specific functionality
557         should be added in the do_release method.
558       
559         """
560         with self._release_lock:
561             try:
562                 self.do_release()
563             except:
564                 import traceback
565                 err = traceback.format_exc()
566                 self.error(err)
567
568                 self.set_released()
569
570     def fail(self):
571         """ Sets the RM to state FAILED.
572
573         This method should not be overriden directly. Specific functionality
574         should be added in the do_fail method.
575
576         """
577         with self._release_lock:
578             if self._state != ResourceState.RELEASED:
579                 self.do_fail()
580
581     def set(self, name, value):
582         """ Set the value of the attribute
583
584         :param name: Name of the attribute
585         :type name: str
586         :param name: Value of the attribute
587         :type name: str
588         """
589         attr = self._attrs[name]
590         attr.value = value
591         return value
592
593     def get(self, name):
594         """ Returns the value of the attribute
595
596         :param name: Name of the attribute
597         :type name: str
598         :rtype: str
599         """
600         attr = self._attrs[name]
601         if attr.has_flag(Flags.Global):
602             self.warning( "Attribute %s is global. Use get_global instead." % name)
603             
604         return attr.value
605
606     def has_changed(self, name):
607         """ Returns the True is the value of the attribute
608             has been modified by the user.
609
610         :param name: Name of the attribute
611         :type name: str
612         :rtype: str
613         """
614         attr = self._attrs[name]
615         return attr.has_changed()
616
617     def has_flag(self, name, flag):
618         """ Returns true if the attribute has the flag 'flag'
619
620         :param flag: Flag to be checked
621         :type flag: Flags
622         """
623         attr = self._attrs[name]
624         return attr.has_flag(flag)
625
626     def has_attribute(self, name):
627         """ Returns true if the RM has an attribute with name
628
629         :param name: name of the attribute
630         :type name: string
631         """
632         return name in self._attrs
633
634     def enable_trace(self, name):
635         """ Explicitly enable trace generation
636
637         :param name: Name of the trace
638         :type name: str
639         """
640         trace = self._trcs[name]
641         trace.enabled = True
642     
643     def trace_enabled(self, name):
644         """Returns True if trace is enables 
645
646         :param name: Name of the trace
647         :type name: str
648         """
649         trace = self._trcs[name]
650         return trace.enabled
651  
652     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
653         """ Get information on collected trace
654
655         :param name: Name of the trace
656         :type name: str
657
658         :param attr: Can be one of:
659                          - TraceAttr.ALL (complete trace content), 
660                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
661                          - TraceAttr.PATH (full path to the trace file),
662                          - TraceAttr.SIZE (size of trace file). 
663         :type attr: str
664
665         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
666         :type name: int
667
668         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
669         :type name: int
670
671         :rtype: str
672         """
673         pass
674
675     def register_condition(self, action, group, state, time = None):
676         """ Registers a condition on the resource manager to allow execution 
677         of 'action' only after 'time' has elapsed from the moment all resources 
678         in 'group' reached state 'state'
679
680         :param action: Action to restrict to condition (either 'START' or 'STOP')
681         :type action: str
682         :param group: Group of RMs to wait for (list of guids)
683         :type group: int or list of int
684         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
685         :type state: str
686         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
687         :type time: str
688
689         """
690
691         if not action in self.conditions:
692             self._conditions[action] = list()
693         
694         conditions = self.conditions.get(action)
695
696         # For each condition to register a tuple of (group, state, time) is 
697         # added to the 'action' list
698         if not isinstance(group, list):
699             group = [group]
700
701         conditions.append((group, state, time))
702
703     def unregister_condition(self, group, action = None):
704         """ Removed conditions for a certain group of guids
705
706         :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
707         :type action: str
708
709         :param group: Group of RMs to wait for (list of guids)
710         :type group: int or list of int
711
712         """
713         # For each condition a tuple of (group, state, time) is 
714         # added to the 'action' list
715         if not isinstance(group, list):
716             group = [group]
717
718         for act, conditions in self.conditions.iteritems():
719             if action and act != action:
720                 continue
721
722             for condition in list(conditions):
723                 (grp, state, time) = condition
724
725                 # If there is an intersection between grp and group,
726                 # then remove intersected elements
727                 intsec = set(group).intersection(set(grp))
728                 if intsec:
729                     idx = conditions.index(condition)
730                     newgrp = set(grp)
731                     newgrp.difference_update(intsec)
732                     conditions[idx] = (newgrp, state, time)
733                  
734     def get_connected(self, rtype = None):
735         """ Returns the list of RM with the type 'rtype'
736
737         :param rtype: Type of the RM we look for
738         :type rtype: str
739         :return: list of guid
740         """
741         connected = []
742         rclass = ResourceFactory.get_resource_type(rtype)
743         for guid in self.connections:
744
745             rm = self.ec.get_resource(guid)
746             if not rtype or isinstance(rm, rclass):
747                 connected.append(rm)
748         return connected
749
750     @failtrap
751     def _needs_reschedule(self, group, state, time):
752         """ Internal method that verify if 'time' has elapsed since 
753         all elements in 'group' have reached state 'state'.
754
755         :param group: Group of RMs to wait for (list of guids)
756         :type group: int or list of int
757         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
758         :type state: str
759         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
760         :type time: str
761
762         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
763         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
764         For the moment, 2m30s is not a correct syntax.
765
766         """
767         reschedule = False
768         delay = reschedule_delay 
769
770         # check state and time elapsed on all RMs
771         for guid in group:
772             rm = self.ec.get_resource(guid)
773             
774             # If one of the RMs this resource needs to wait for has FAILED
775             # and is critical we raise an exception
776             if rm.state == ResourceState.FAILED:
777                 if not rm.get('critical'):
778                     continue
779                 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
780                 raise RuntimeError, msg
781
782             # If the RM state is lower than the requested state we must
783             # reschedule (e.g. if RM is READY but we required STARTED).
784             if rm.state < state:
785                 reschedule = True
786                 break
787
788             # If there is a time restriction, we must verify the
789             # restriction is satisfied 
790             if time:
791                 if state == ResourceState.DISCOVERED:
792                     t = rm.discover_time
793                 if state == ResourceState.PROVISIONED:
794                     t = rm.provision_time
795                 elif state == ResourceState.READY:
796                     t = rm.ready_time
797                 elif state == ResourceState.STARTED:
798                     t = rm.start_time
799                 elif state == ResourceState.STOPPED:
800                     t = rm.stop_time
801                 elif state == ResourceState.RELEASED:
802                     t = rm.release_time
803                 else:
804                     break
805
806                 # time already elapsed since RM changed state
807                 waited = "%fs" % tdiffsec(tnow(), t)
808
809                 # time still to wait
810                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
811
812                 if wait > 0.001:
813                     reschedule = True
814                     delay = "%fs" % wait
815                     break
816
817         return reschedule, delay
818
819     def set_with_conditions(self, name, value, group, state, time):
820         """ Set value 'value' on attribute with name 'name' when 'time' 
821         has elapsed since all elements in 'group' have reached state
822         'state'
823
824         :param name: Name of the attribute to set
825         :type name: str
826         :param name: Value of the attribute to set
827         :type name: str
828         :param group: Group of RMs to wait for (list of guids)
829         :type group: int or list of int
830         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
831         :type state: str
832         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
833         :type time: str
834         """
835
836         reschedule = False
837         delay = reschedule_delay 
838
839         ## evaluate if set conditions are met
840
841         # only can set with conditions after the RM is started
842         if self.state != ResourceState.STARTED:
843             reschedule = True
844         else:
845             reschedule, delay = self._needs_reschedule(group, state, time)
846
847         if reschedule:
848             callback = functools.partial(self.set_with_conditions, 
849                     name, value, group, state, time)
850             self.ec.schedule(delay, callback)
851         else:
852             self.set(name, value)
853
854     def start_with_conditions(self):
855         """ Starts RM when all the conditions in self.conditions for
856         action 'START' are satisfied.
857
858         """
859         #import pdb;pdb.set_trace()
860
861         reschedule = False
862         delay = reschedule_delay 
863
864
865         ## evaluate if conditions to start are met
866         if self.ec.abort:
867             return 
868
869         # Can only start when RM is either STOPPED or READY
870         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
871             reschedule = True
872             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
873         else:
874             start_conditions = self.conditions.get(ResourceAction.START, [])
875             
876             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
877             
878             # Verify all start conditions are met
879             for (group, state, time) in start_conditions:
880                 # Uncomment for debug
881                 unmet = []
882                 for guid in group:
883                     rm = self.ec.get_resource(guid)
884                     unmet.append((guid, rm._state))
885                 
886                 self.debug("---- WAITED STATES ---- %s" % unmet )
887
888                 reschedule, delay = self._needs_reschedule(group, state, time)
889                 if reschedule:
890                     break
891
892         if reschedule:
893             self.ec.schedule(delay, self.start_with_conditions)
894         else:
895             self.debug("----- STARTING ---- ")
896             self.start()
897
898     def stop_with_conditions(self):
899         """ Stops RM when all the conditions in self.conditions for
900         action 'STOP' are satisfied.
901
902         """
903         reschedule = False
904         delay = reschedule_delay 
905
906         ## evaluate if conditions to stop are met
907         if self.ec.abort:
908             return 
909
910         # only can stop when RM is STARTED
911         if self.state != ResourceState.STARTED:
912             reschedule = True
913             self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
914         else:
915             self.debug(" ---- STOP CONDITIONS ---- %s" % 
916                     self.conditions.get(ResourceAction.STOP))
917
918             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
919             for (group, state, time) in stop_conditions:
920                 reschedule, delay = self._needs_reschedule(group, state, time)
921                 if reschedule:
922                     break
923
924         if reschedule:
925             callback = functools.partial(self.stop_with_conditions)
926             self.ec.schedule(delay, callback)
927         else:
928             self.debug(" ----- STOPPING ---- ") 
929             self.stop()
930
931     def deploy_with_conditions(self):
932         """ Deploy RM when all the conditions in self.conditions for
933         action 'READY' are satisfied.
934
935         """
936         reschedule = False
937         delay = reschedule_delay 
938
939         ## evaluate if conditions to deploy are met
940         if self.ec.abort:
941             return 
942
943         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
944         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
945                 ResourceState.PROVISIONED]:
946             reschedule = True
947             self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
948         else:
949             deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
950             
951             self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) 
952             
953             # Verify all start conditions are met
954             for (group, state, time) in deploy_conditions:
955                 # Uncomment for debug
956                 #unmet = []
957                 #for guid in group:
958                 #    rm = self.ec.get_resource(guid)
959                 #    unmet.append((guid, rm._state))
960                 
961                 #self.debug("---- WAITED STATES ---- %s" % unmet )
962
963                 reschedule, delay = self._needs_reschedule(group, state, time)
964                 if reschedule:
965                     break
966
967         if reschedule:
968             self.ec.schedule(delay, self.deploy_with_conditions)
969         else:
970             self.debug("----- DEPLOYING ---- ")
971             self.deploy()
972
973     def do_connect(self, guid):
974         """ Performs actions that need to be taken upon associating RMs.
975         This method should be redefined when necessary in child classes.
976         """
977         pass
978
979     def do_disconnect(self, guid):
980         """ Performs actions that need to be taken upon disassociating RMs.
981         This method should be redefined when necessary in child classes.
982         """
983         pass
984
985     def valid_connection(self, guid):
986         """Checks whether a connection with the other RM
987         is valid.
988         This method need to be redefined by each new Resource Manager.
989
990         :param guid: Guid of the current Resource Manager
991         :type guid: int
992         :rtype:  Boolean
993
994         """
995         # TODO: Validate!
996         return True
997
998     def do_discover(self):
999         self.set_discovered()
1000
1001     def do_provision(self):
1002         self.set_provisioned()
1003
1004     def do_start(self):
1005         self.set_started()
1006
1007     def do_stop(self):
1008         self.set_stopped()
1009
1010     def do_deploy(self):
1011         self.set_ready()
1012
1013     def do_release(self):
1014         self.set_released()
1015
1016     def do_fail(self):
1017         self.set_failed()
1018
1019     def set_started(self):
1020         """ Mark ResourceManager as STARTED """
1021         self.set_state(ResourceState.STARTED, "_start_time")
1022         self.debug("----- STARTED ---- ")
1023
1024     def set_stopped(self):
1025         """ Mark ResourceManager as STOPPED """
1026         self.set_state(ResourceState.STOPPED, "_stop_time")
1027         self.debug("----- STOPPED ---- ")
1028
1029     def set_ready(self):
1030         """ Mark ResourceManager as READY """
1031         self.set_state(ResourceState.READY, "_ready_time")
1032         self.debug("----- READY ---- ")
1033
1034     def set_released(self):
1035         """ Mark ResourceManager as REALEASED """
1036         self.set_state(ResourceState.RELEASED, "_release_time")
1037         self.debug("----- RELEASED ---- ")
1038
1039     def set_failed(self):
1040         """ Mark ResourceManager as FAILED """
1041         self.set_state(ResourceState.FAILED, "_failed_time")
1042         self.debug("----- FAILED ---- ")
1043
1044     def set_discovered(self):
1045         """ Mark ResourceManager as DISCOVERED """
1046         self.set_state(ResourceState.DISCOVERED, "_discover_time")
1047         self.debug("----- DISCOVERED ---- ")
1048
1049     def set_provisioned(self):
1050         """ Mark ResourceManager as PROVISIONED """
1051         self.set_state(ResourceState.PROVISIONED, "_provision_time")
1052         self.debug("----- PROVISIONED ---- ")
1053
1054     def set_state(self, state, state_time_attr):
1055         """ Set the state of the RM while keeping a trace of the time """
1056
1057         # Ensure that RM state will not change after released
1058         if self._state == ResourceState.RELEASED:
1059             return 
1060    
1061         setattr(self, state_time_attr, tnow())
1062         self._state = state
1063
1064 class ResourceFactory(object):
1065     _resource_types = dict()
1066
1067     @classmethod
1068     def resource_types(cls):
1069         """Return the type of the Class"""
1070         return cls._resource_types
1071
1072     @classmethod
1073     def get_resource_type(cls, rtype):
1074         """Return the type of the Class"""
1075         return cls._resource_types.get(rtype)
1076
1077     @classmethod
1078     def register_type(cls, rclass):
1079         """Register a new Ressource Manager"""
1080         cls._resource_types[rclass.get_rtype()] = rclass
1081
1082     @classmethod
1083     def create(cls, rtype, ec, guid):
1084         """Create a new instance of a Ressource Manager"""
1085         rclass = cls._resource_types[rtype]
1086         return rclass(ec, guid)
1087
1088 def populate_factory():
1089     """Register all the possible RM that exists in the current version of Nepi.
1090     """
1091     # Once the factory is populated, don't repopulate
1092     if not ResourceFactory.resource_types():
1093         for rclass in find_types():
1094             ResourceFactory.register_type(rclass)
1095
1096 def find_types():
1097     """Look into the different folders to find all the 
1098     availables Resources Managers
1099     """
1100     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1101     search_path = set(search_path.split(" "))
1102    
1103     import inspect
1104     import nepi.resources 
1105     path = os.path.dirname(nepi.resources.__file__)
1106     search_path.add(path)
1107
1108     types = set()
1109
1110     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
1111             prefix = "nepi.resources."):
1112
1113         loader = importer.find_module(modname)
1114         
1115         try:
1116             # Notice: Repeated calls to load_module will act as a reload of the module
1117             if modname in sys.modules:
1118                 module = sys.modules.get(modname)
1119             else:
1120                 module = loader.load_module(modname)
1121
1122             for attrname in dir(module):
1123                 if attrname.startswith("_"):
1124                     continue
1125
1126                 attr = getattr(module, attrname)
1127
1128                 if attr == ResourceManager:
1129                     continue
1130
1131                 if not inspect.isclass(attr):
1132                     continue
1133
1134                 if issubclass(attr, ResourceManager):
1135                     types.add(attr)
1136
1137                     if not modname in sys.modules:
1138                         sys.modules[modname] = module
1139
1140         except:
1141             import traceback
1142             import logging
1143             err = traceback.format_exc()
1144             logger = logging.getLogger("Resource.find_types()")
1145             logger.error("Error while loading Resource Managers %s" % err)
1146
1147     return types
1148