Changing reschedule_delay internals
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
24
25 import copy
26 import functools
27 import logging
28 import os
29 import pkgutil
30 import sys
31 import threading
32 import weakref
33
34 class ResourceAction:
35     """ Action that a user can order to a Resource Manager
36    
37     """
38     DEPLOY = 0
39     START = 1
40     STOP = 2
41
42 class ResourceState:
43     """ State of a Resource Manager
44    
45     """
46     NEW = 0
47     DISCOVERED = 1
48     PROVISIONED = 2
49     READY = 3
50     STARTED = 4
51     STOPPED = 5
52     FAILED = 6
53     RELEASED = 7
54
55 ResourceState2str = dict({
56     ResourceState.NEW : "NEW",
57     ResourceState.DISCOVERED : "DISCOVERED",
58     ResourceState.PROVISIONED : "PROVISIONED",
59     ResourceState.READY : "READY",
60     ResourceState.STARTED : "STARTED",
61     ResourceState.STOPPED : "STOPPED",
62     ResourceState.FAILED : "FAILED",
63     ResourceState.RELEASED : "RELEASED",
64     })
65
66 def clsinit(cls):
67     """ Initializes template information (i.e. attributes and traces)
68     on classes derived from the ResourceManager class.
69
70     It is used as a decorator in the class declaration as follows:
71
72         @clsinit
73         class MyResourceManager(ResourceManager):
74         
75             ...
76
77      """
78
79     cls._clsinit()
80     return cls
81
82 def clsinit_copy(cls):
83     """ Initializes template information (i.e. attributes and traces)
84     on classes direved from the ResourceManager class.
85     It differs from the clsinit method in that it forces inheritance
86     of attributes and traces from the parent class.
87
88     It is used as a decorator in the class declaration as follows:
89
90         @clsinit
91         class MyResourceManager(ResourceManager):
92         
93             ...
94
95
96     clsinit_copy should be prefered to clsinit when creating new
97     ResourceManager child classes.
98
99     """
100     
101     cls._clsinit_copy()
102     return cls
103
104 def failtrap(func):
105     """ Decorator function for instance methods that should set the 
106     RM state to FAILED when an error is raised. The methods that must be
107     decorated are: discover, provision, deploy, start, stop.
108
109     """
110     def wrapped(self, *args, **kwargs):
111         try:
112             return func(self, *args, **kwargs)
113         except:
114             self.fail()
115             
116             import traceback
117             err = traceback.format_exc()
118             logger = Logger(self._rtype)
119             logger.error(err)
120             logger.error("SETTING guid %d to state FAILED" % self.guid)
121             raise
122     
123     return wrapped
124
125 @clsinit
126 class ResourceManager(Logger):
127     """ Base clase for all ResourceManagers. 
128     
129     A ResourceManger is specific to a resource type (e.g. Node, 
130     Switch, Application, etc) on a specific backend (e.g. PlanetLab, 
131     OMF, etc).
132
133     The ResourceManager instances are responsible for interacting with
134     and controlling concrete (physical or virtual) resources in the 
135     experimental backends.
136     
137     """
138     _rtype = "Resource"
139     _attributes = None
140     _traces = None
141     _help = None
142     _backend = None
143     _reschedule_delay = "0.5s"
144
145     @classmethod
146     def _register_attribute(cls, attr):
147         """ Resource subclasses will invoke this method to add a 
148         resource attribute
149
150         """
151         
152         cls._attributes[attr.name] = attr
153
154     @classmethod
155     def _remove_attribute(cls, name):
156         """ Resource subclasses will invoke this method to remove a 
157         resource attribute
158
159         """
160         
161         del cls._attributes[name]
162
163     @classmethod
164     def _register_trace(cls, trace):
165         """ Resource subclasses will invoke this method to add a 
166         resource trace
167
168         """
169         
170         cls._traces[trace.name] = trace
171
172     @classmethod
173     def _remove_trace(cls, name):
174         """ Resource subclasses will invoke this method to remove a 
175         resource trace
176
177         """
178         
179         del cls._traces[name]
180
181     @classmethod
182     def _register_attributes(cls):
183         """ Resource subclasses will invoke this method to register
184         resource attributes.
185
186         This method should be overriden in the RMs that define
187         attributes.
188
189         """
190         critical = Attribute("critical", 
191                 "Defines whether the resource is critical. "
192                 "A failure on a critical resource will interrupt "
193                 "the experiment. ",
194                 type = Types.Bool,
195                 default = True,
196                 flags = Flags.Design)
197         hard_release = Attribute("hardRelease", 
198                 "Forces removal of all result files and directories associated "
199                 "to the RM upon resource release. After release the RM will "
200                 "be removed from the EC and the results will not longer be "
201                 "accessible",
202                 type = Types.Bool,
203                 default = False,
204                 flags = Flags.Design)
205
206         cls._register_attribute(critical)
207         cls._register_attribute(hard_release)
208         
209     @classmethod
210     def _register_traces(cls):
211         """ Resource subclasses will invoke this method to register
212         resource traces
213
214         This method should be overriden in the RMs that define traces.
215         
216         """
217         
218         pass
219
220     @classmethod
221     def _clsinit(cls):
222         """ ResourceManager classes have different attributes and traces.
223         Attribute and traces are stored in 'class attribute' dictionaries.
224         When a new ResourceManager class is created, the _clsinit method is 
225         called to create a new instance of those dictionaries and initialize 
226         them.
227         
228         The _clsinit method is called by the clsinit decorator method.
229         
230         """
231         
232         # static template for resource attributes
233         cls._attributes = dict()
234         cls._register_attributes()
235
236         # static template for resource traces
237         cls._traces = dict()
238         cls._register_traces()
239
240     @classmethod
241     def _clsinit_copy(cls):
242         """ Same as _clsinit, except that after creating new instances of the
243         dictionaries it copies all the attributes and traces from the parent 
244         class.
245         
246         The _clsinit_copy method is called by the clsinit_copy decorator method.
247         
248         """
249         # static template for resource attributes
250         cls._attributes = copy.deepcopy(cls._attributes)
251         cls._register_attributes()
252
253         # static template for resource traces
254         cls._traces = copy.deepcopy(cls._traces)
255         cls._register_traces()
256
257     @classmethod
258     def get_rtype(cls):
259         """ Returns the type of the Resource Manager
260
261         """
262         return cls._rtype
263
264     @classmethod
265     def get_attributes(cls):
266         """ Returns a copy of the attributes
267
268         """
269         return copy.deepcopy(cls._attributes.values())
270
271     @classmethod
272     def get_attribute(cls, name):
273         """ Returns a copy of the attribute with name 'name'
274
275         """
276         return copy.deepcopy(cls._attributes[name])
277
278     @classmethod
279     def get_traces(cls):
280         """ Returns a copy of the traces
281
282         """
283         return copy.deepcopy(cls._traces.values())
284
285     @classmethod
286     def get_help(cls):
287         """ Returns the description of the type of Resource
288
289         """
290         return cls._help
291
292     @classmethod
293     def get_backend(cls):
294         """ Returns the identified of the backend (i.e. testbed, environment)
295         for the Resource
296
297         """
298         return cls._backend
299
300     @classmethod
301     def get_global(cls, name):
302         """ Returns the value of a global attribute
303             Global attribute meaning an attribute for 
304             all the resources from a rtype
305
306         :param name: Name of the attribute
307         :type name: str
308         :rtype: str
309         """
310         global_attr = cls._attributes[name]
311         return global_attr.value
312
313     @classmethod
314     def set_global(cls, name, value):
315         """ Set value for a global attribute
316
317         :param name: Name of the attribute
318         :type name: str
319         :param name: Value of the attribute
320         :type name: str
321         """
322         global_attr = cls._attributes[name]
323         global_attr.value = value
324         return value
325
326     def __init__(self, ec, guid):
327         super(ResourceManager, self).__init__(self.get_rtype())
328         
329         self._guid = guid
330         self._ec = weakref.ref(ec)
331         self._connections = set()
332         self._conditions = dict() 
333
334         # the resource instance gets a copy of all attributes
335         self._attrs = copy.deepcopy(self._attributes)
336
337         # the resource instance gets a copy of all traces
338         self._trcs = copy.deepcopy(self._traces)
339
340         # Each resource is placed on a deployment group by the EC
341         # during deployment
342         self.deployment_group = None
343
344         self._start_time = None
345         self._stop_time = None
346         self._discover_time = None
347         self._provision_time = None
348         self._ready_time = None
349         self._release_time = None
350         self._failed_time = None
351
352         self._state = ResourceState.NEW
353
354         # instance lock to synchronize exclusive state change methods (such
355         # as deploy and release methods), in order to prevent them from being 
356         # executed at the same time and corrupt internal resource state
357         self._release_lock = threading.Lock()
358
359     @property
360     def guid(self):
361         """ Returns the global unique identifier of the RM """
362         return self._guid
363
364     @property
365     def ec(self):
366         """ Returns the Experiment Controller of the RM """
367         return self._ec()
368
369     @property
370     def connections(self):
371         """ Returns the set of guids of connected RMs """
372         return self._connections
373
374     @property
375     def conditions(self):
376         """ Returns the conditions to which the RM is subjected to.
377         
378         This method returns a dictionary of conditions lists indexed by
379         a ResourceAction.
380         
381         """
382         return self._conditions
383
384     @property
385     def start_time(self):
386         """ Returns the start time of the RM as a timestamp """
387         return self._start_time
388
389     @property
390     def stop_time(self):
391         """ Returns the stop time of the RM as a timestamp """
392         return self._stop_time
393
394     @property
395     def discover_time(self):
396         """ Returns the discover time of the RM as a timestamp """
397         return self._discover_time
398
399     @property
400     def provision_time(self):
401         """ Returns the provision time of the RM as a timestamp """
402         return self._provision_time
403
404     @property
405     def ready_time(self):
406         """ Returns the deployment time of the RM as a timestamp """
407         return self._ready_time
408
409     @property
410     def release_time(self):
411         """ Returns the release time of the RM as a timestamp """
412         return self._release_time
413
414     @property
415     def failed_time(self):
416         """ Returns the time failure occured for the RM as a timestamp """
417         return self._failed_time
418
419     @property
420     def state(self):
421         """ Get the current state of the RM """
422         return self._state
423
424     @property
425     def reschedule_delay(self):
426         """ Returns default reschedule delay """
427         return self._reschedule_delay
428
429     def log_message(self, msg):
430         """ Returns the log message formatted with added information.
431
432         :param msg: text message
433         :type msg: str
434         :rtype: str
435
436         """
437         return " %s guid %d - %s " % (self._rtype, self.guid, msg)
438
439     def register_connection(self, guid):
440         """ Registers a connection to the RM identified by guid
441
442         This method should not be overriden. Specific functionality
443         should be added in the do_connect method.
444
445         :param guid: Global unique identified of the RM to connect to
446         :type guid: int
447
448         """
449         if self.valid_connection(guid):
450             self.do_connect(guid)
451             self._connections.add(guid)
452
453     def unregister_connection(self, guid):
454         """ Removes a registered connection to the RM identified by guid
455         
456         This method should not be overriden. Specific functionality
457         should be added in the do_disconnect method.
458
459         :param guid: Global unique identified of the RM to connect to
460         :type guid: int
461
462         """
463         if guid in self._connections:
464             self.do_disconnect(guid)
465             self._connections.remove(guid)
466
467     @failtrap
468     def discover(self):
469         """ Performs resource discovery.
470         
471         This  method is responsible for selecting an individual resource
472         matching user requirements.
473
474         This method should not be overriden directly. Specific functionality
475         should be added in the do_discover method.
476
477         """
478         with self._release_lock:
479             if self._state != ResourceState.RELEASED:
480                 self.do_discover()
481
482     @failtrap
483     def provision(self):
484         """ Performs resource provisioning.
485
486         This  method is responsible for provisioning one resource.
487         After this method has been successfully invoked, the resource
488         should be accessible/controllable by the RM.
489
490         This method should not be overriden directly. Specific functionality
491         should be added in the do_provision method.
492
493         """
494         with self._release_lock:
495             if self._state != ResourceState.RELEASED:
496                 self.do_provision()
497
498     @failtrap
499     def start(self):
500         """ Starts the RM (e.g. launch remote process).
501     
502         There is no standard start behavior. Some RMs will not need to perform
503         any actions upon start.
504
505         This method should not be overriden directly. Specific functionality
506         should be added in the do_start method.
507
508         """
509
510         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
511             self.error("Wrong state %s for start" % self.state)
512             return
513
514         with self._release_lock:
515             if self._state != ResourceState.RELEASED:
516                 self.do_start()
517
518     @failtrap
519     def stop(self):
520         """ Interrupts the RM, stopping any tasks the RM was performing.
521      
522         There is no standard stop behavior. Some RMs will not need to perform
523         any actions upon stop.
524     
525         This method should not be overriden directly. Specific functionality
526         should be added in the do_stop method.
527       
528         """
529         if not self.state in [ResourceState.STARTED]:
530             self.error("Wrong state %s for stop" % self.state)
531             return
532         
533         with self._release_lock:
534             self.do_stop()
535
536     @failtrap
537     def deploy(self):
538         """ Execute all steps required for the RM to reach the state READY.
539
540         This method is responsible for deploying the resource (and invoking 
541         the discover and provision methods).
542  
543         This method should not be overriden directly. Specific functionality
544         should be added in the do_deploy method.
545        
546         """
547         if self.state > ResourceState.READY:
548             self.error("Wrong state %s for deploy" % self.state)
549             return
550
551         with self._release_lock:
552             if self._state != ResourceState.RELEASED:
553                 self.do_deploy()
554
555     def release(self):
556         """ Perform actions to free resources used by the RM.
557   
558         This  method is responsible for releasing resources that were
559         used during the experiment by the RM.
560
561         This method should not be overriden directly. Specific functionality
562         should be added in the do_release method.
563       
564         """
565         with self._release_lock:
566             try:
567                 self.do_release()
568             except:
569                 self.set_released()
570
571                 import traceback
572                 err = traceback.format_exc()
573                 msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % (
574                         self._rtype, self.guid, err)
575                 logger = Logger(self._rtype)
576                 logger.debug(msg)
577
578     def fail(self):
579         """ Sets the RM to state FAILED.
580
581         This method should not be overriden directly. Specific functionality
582         should be added in the do_fail method.
583
584         """
585         with self._release_lock:
586             if self._state != ResourceState.RELEASED:
587                 self.do_fail()
588
589     def set(self, name, value):
590         """ Set the value of the attribute
591
592         :param name: Name of the attribute
593         :type name: str
594         :param name: Value of the attribute
595         :type name: str
596         """
597         attr = self._attrs[name]
598         attr.value = value
599         return value
600
601     def get(self, name):
602         """ Returns the value of the attribute
603
604         :param name: Name of the attribute
605         :type name: str
606         :rtype: str
607         """
608         attr = self._attrs[name]
609
610         """
611         A.Q. Commenting due to performance impact
612         if attr.has_flag(Flags.Global):
613             self.warning( "Attribute %s is global. Use get_global instead." % name)
614         """
615             
616         return attr.value
617
618     def has_changed(self, name):
619         """ Returns the True is the value of the attribute
620             has been modified by the user.
621
622         :param name: Name of the attribute
623         :type name: str
624         :rtype: str
625         """
626         attr = self._attrs[name]
627         return attr.has_changed
628
629     def has_flag(self, name, flag):
630         """ Returns true if the attribute has the flag 'flag'
631
632         :param flag: Flag to be checked
633         :type flag: Flags
634         """
635         attr = self._attrs[name]
636         return attr.has_flag(flag)
637
638     def has_attribute(self, name):
639         """ Returns true if the RM has an attribute with name
640
641         :param name: name of the attribute
642         :type name: string
643         """
644         return name in self._attrs
645
646     def enable_trace(self, name):
647         """ Explicitly enable trace generation
648
649         :param name: Name of the trace
650         :type name: str
651         """
652         trace = self._trcs[name]
653         trace.enabled = True
654     
655     def trace_enabled(self, name):
656         """Returns True if trace is enables 
657
658         :param name: Name of the trace
659         :type name: str
660         """
661         trace = self._trcs[name]
662         return trace.enabled
663  
664     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
665         """ Get information on collected trace
666
667         :param name: Name of the trace
668         :type name: str
669
670         :param attr: Can be one of:
671                          - TraceAttr.ALL (complete trace content), 
672                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
673                          - TraceAttr.PATH (full path to the trace file),
674                          - TraceAttr.SIZE (size of trace file). 
675         :type attr: str
676
677         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
678         :type name: int
679
680         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
681         :type name: int
682
683         :rtype: str
684         """
685         pass
686
687     def register_condition(self, action, group, state, time = None):
688         """ Registers a condition on the resource manager to allow execution 
689         of 'action' only after 'time' has elapsed from the moment all resources 
690         in 'group' reached state 'state'
691
692         :param action: Action to restrict to condition (either 'START' or 'STOP')
693         :type action: str
694         :param group: Group of RMs to wait for (list of guids)
695         :type group: int or list of int
696         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
697         :type state: str
698         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
699         :type time: str
700
701         """
702
703         if not action in self.conditions:
704             self._conditions[action] = list()
705         
706         conditions = self.conditions.get(action)
707
708         # For each condition to register a tuple of (group, state, time) is 
709         # added to the 'action' list
710         if not isinstance(group, list):
711             group = [group]
712
713         conditions.append((group, state, time))
714
715     def unregister_condition(self, group, action = None):
716         """ Removed conditions for a certain group of guids
717
718         :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
719         :type action: str
720
721         :param group: Group of RMs to wait for (list of guids)
722         :type group: int or list of int
723
724         """
725         # For each condition a tuple of (group, state, time) is 
726         # added to the 'action' list
727         if not isinstance(group, list):
728             group = [group]
729
730         for act, conditions in self.conditions.iteritems():
731             if action and act != action:
732                 continue
733
734             for condition in list(conditions):
735                 (grp, state, time) = condition
736
737                 # If there is an intersection between grp and group,
738                 # then remove intersected elements
739                 intsec = set(group).intersection(set(grp))
740                 if intsec:
741                     idx = conditions.index(condition)
742                     newgrp = set(grp)
743                     newgrp.difference_update(intsec)
744                     conditions[idx] = (newgrp, state, time)
745                  
746     def get_connected(self, rtype = None):
747         """ Returns the list of RM with the type 'rtype'
748
749         :param rtype: Type of the RM we look for
750         :type rtype: str
751         :return: list of guid
752         """
753         connected = []
754         rclass = ResourceFactory.get_resource_type(rtype)
755         for guid in self.connections:
756             rm = self.ec.get_resource(guid)
757             if not rtype or isinstance(rm, rclass):
758                 connected.append(rm)
759         return connected
760
761     def is_rm_instance(self, rtype):
762         """ Returns True if the RM is instance of 'rtype'
763
764         :param rtype: Type of the RM we look for
765         :type rtype: str
766         :return: True|False
767         """
768         rclass = ResourceFactory.get_resource_type(rtype)
769         if isinstance(self, rclass):
770             return True
771         return False
772
773     @failtrap
774     def _needs_reschedule(self, group, state, time):
775         """ Internal method that verify if 'time' has elapsed since 
776         all elements in 'group' have reached state 'state'.
777
778         :param group: Group of RMs to wait for (list of guids)
779         :type group: int or list of int
780         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
781         :type state: str
782         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
783         :type time: str
784
785         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
786         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
787         For the moment, 2m30s is not a correct syntax.
788
789         """
790         reschedule = False
791         delay = self.reschedule_delay 
792
793         # check state and time elapsed on all RMs
794         for guid in group:
795             rm = self.ec.get_resource(guid)
796             
797             # If one of the RMs this resource needs to wait for has FAILED
798             # and is critical we raise an exception
799             if rm.state == ResourceState.FAILED:
800                 if not rm.get('critical'):
801                     continue
802                 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
803                 raise RuntimeError, msg
804
805             # If the RM state is lower than the requested state we must
806             # reschedule (e.g. if RM is READY but we required STARTED).
807             if rm.state < state:
808                 reschedule = True
809                 break
810
811             # If there is a time restriction, we must verify the
812             # restriction is satisfied 
813             if time:
814                 if state == ResourceState.DISCOVERED:
815                     t = rm.discover_time
816                 if state == ResourceState.PROVISIONED:
817                     t = rm.provision_time
818                 elif state == ResourceState.READY:
819                     t = rm.ready_time
820                 elif state == ResourceState.STARTED:
821                     t = rm.start_time
822                 elif state == ResourceState.STOPPED:
823                     t = rm.stop_time
824                 elif state == ResourceState.RELEASED:
825                     t = rm.release_time
826                 else:
827                     break
828
829                 # time already elapsed since RM changed state
830                 waited = "%fs" % tdiffsec(tnow(), t)
831
832                 # time still to wait
833                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
834
835                 if wait > 0.001:
836                     reschedule = True
837                     delay = "%fs" % wait
838                     break
839
840         return reschedule, delay
841
842     def set_with_conditions(self, name, value, group, state, time):
843         """ Set value 'value' on attribute with name 'name' when 'time' 
844         has elapsed since all elements in 'group' have reached state
845         'state'
846
847         :param name: Name of the attribute to set
848         :type name: str
849         :param name: Value of the attribute to set
850         :type name: str
851         :param group: Group of RMs to wait for (list of guids)
852         :type group: int or list of int
853         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
854         :type state: str
855         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
856         :type time: str
857         """
858
859         reschedule = False
860         delay = self.reschedule_delay 
861
862         ## evaluate if set conditions are met
863
864         # only can set with conditions after the RM is started
865         if self.state != ResourceState.STARTED:
866             reschedule = True
867         else:
868             reschedule, delay = self._needs_reschedule(group, state, time)
869
870         if reschedule:
871             callback = functools.partial(self.set_with_conditions, 
872                     name, value, group, state, time)
873             self.ec.schedule(delay, callback)
874         else:
875             self.set(name, value)
876
877     def start_with_conditions(self):
878         """ Starts RM when all the conditions in self.conditions for
879         action 'START' are satisfied.
880
881         """
882         #import pdb;pdb.set_trace()
883
884         reschedule = False
885         delay = self.reschedule_delay 
886
887
888         ## evaluate if conditions to start are met
889         if self.ec.abort:
890             return 
891
892         # Can only start when RM is either STOPPED or READY
893         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
894             reschedule = True
895             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
896         else:
897             start_conditions = self.conditions.get(ResourceAction.START, [])
898             
899             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
900             
901             # Verify all start conditions are met
902             for (group, state, time) in start_conditions:
903                 # Uncomment for debug
904                 #unmet = []
905                 #for guid in group:
906                 #    rm = self.ec.get_resource(guid)
907                 #    unmet.append((guid, rm._state))
908                 #
909                 #self.debug("---- WAITED STATES ---- %s" % unmet )
910
911                 reschedule, delay = self._needs_reschedule(group, state, time)
912                 if reschedule:
913                     break
914
915         if reschedule:
916             self.ec.schedule(delay, self.start_with_conditions)
917         else:
918             self.debug("----- STARTING ---- ")
919             self.start()
920
921     def stop_with_conditions(self):
922         """ Stops RM when all the conditions in self.conditions for
923         action 'STOP' are satisfied.
924
925         """
926         reschedule = False
927         delay = self.reschedule_delay 
928
929         ## evaluate if conditions to stop are met
930         if self.ec.abort:
931             return 
932
933         # only can stop when RM is STARTED
934         if self.state != ResourceState.STARTED:
935             reschedule = True
936             self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
937         else:
938             self.debug(" ---- STOP CONDITIONS ---- %s" % 
939                     self.conditions.get(ResourceAction.STOP))
940
941             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
942             for (group, state, time) in stop_conditions:
943                 reschedule, delay = self._needs_reschedule(group, state, time)
944                 if reschedule:
945                     break
946
947         if reschedule:
948             callback = functools.partial(self.stop_with_conditions)
949             self.ec.schedule(delay, callback)
950         else:
951             self.debug(" ----- STOPPING ---- ") 
952             self.stop()
953
954     def deploy_with_conditions(self):
955         """ Deploy RM when all the conditions in self.conditions for
956         action 'READY' are satisfied.
957
958         """
959         reschedule = False
960         delay = self.reschedule_delay 
961
962         ## evaluate if conditions to deploy are met
963         if self.ec.abort:
964             return 
965
966         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
967         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
968                 ResourceState.PROVISIONED]:
969             #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES!
970             reschedule = True
971             self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
972         else:
973             deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
974             
975             self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) 
976             
977             # Verify all start conditions are met
978             for (group, state, time) in deploy_conditions:
979                 # Uncomment for debug
980                 #unmet = []
981                 #for guid in group:
982                 #    rm = self.ec.get_resource(guid)
983                 #    unmet.append((guid, rm._state))
984                 
985                 #self.debug("---- WAITED STATES ---- %s" % unmet )
986
987                 reschedule, delay = self._needs_reschedule(group, state, time)
988                 if reschedule:
989                     break
990
991         if reschedule:
992             self.ec.schedule(delay, self.deploy_with_conditions)
993         else:
994             self.debug("----- DEPLOYING ---- ")
995             self.deploy()
996
997     def do_connect(self, guid):
998         """ Performs actions that need to be taken upon associating RMs.
999         This method should be redefined when necessary in child classes.
1000         """
1001         pass
1002
1003     def do_disconnect(self, guid):
1004         """ Performs actions that need to be taken upon disassociating RMs.
1005         This method should be redefined when necessary in child classes.
1006         """
1007         pass
1008
1009     def valid_connection(self, guid):
1010         """Checks whether a connection with the other RM
1011         is valid.
1012         This method need to be redefined by each new Resource Manager.
1013
1014         :param guid: Guid of the current Resource Manager
1015         :type guid: int
1016         :rtype:  Boolean
1017
1018         """
1019         # TODO: Validate!
1020         return True
1021
1022     def do_discover(self):
1023         self.set_discovered()
1024
1025     def do_provision(self):
1026         self.set_provisioned()
1027
1028     def do_start(self):
1029         self.set_started()
1030
1031     def do_stop(self):
1032         self.set_stopped()
1033
1034     def do_deploy(self):
1035         self.set_ready()
1036
1037     def do_release(self):
1038         self.set_released()
1039
1040     def do_fail(self):
1041         self.set_failed()
1042         self.ec.inform_failure(self.guid)
1043
1044     def set_started(self, time = None):
1045         """ Mark ResourceManager as STARTED """
1046         self.set_state(ResourceState.STARTED, "_start_time", time)
1047         self.debug("----- STARTED ---- ")
1048
1049     def set_stopped(self, time = None):
1050         """ Mark ResourceManager as STOPPED """
1051         self.set_state(ResourceState.STOPPED, "_stop_time", time)
1052         self.debug("----- STOPPED ---- ")
1053
1054     def set_ready(self, time = None):
1055         """ Mark ResourceManager as READY """
1056         self.set_state(ResourceState.READY, "_ready_time", time)
1057         self.debug("----- READY ---- ")
1058
1059     def set_released(self, time = None):
1060         """ Mark ResourceManager as REALEASED """
1061         self.set_state(ResourceState.RELEASED, "_release_time", time)
1062
1063         msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid)
1064         logger = Logger(self._rtype)
1065         logger.debug(msg)
1066
1067     def set_failed(self, time = None):
1068         """ Mark ResourceManager as FAILED """
1069         self.set_state(ResourceState.FAILED, "_failed_time", time)
1070
1071         msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid)
1072         logger = Logger(self._rtype)
1073         logger.debug(msg)
1074
1075     def set_discovered(self, time = None):
1076         """ Mark ResourceManager as DISCOVERED """
1077         self.set_state(ResourceState.DISCOVERED, "_discover_time", time)
1078         self.debug("----- DISCOVERED ---- ")
1079
1080     def set_provisioned(self, time = None):
1081         """ Mark ResourceManager as PROVISIONED """
1082         self.set_state(ResourceState.PROVISIONED, "_provision_time", time)
1083         self.debug("----- PROVISIONED ---- ")
1084
1085     def set_state(self, state, state_time_attr, time = None):
1086         """ Set the state of the RM while keeping a trace of the time """
1087
1088         # Ensure that RM state will not change after released
1089         if self._state == ResourceState.RELEASED:
1090             return 
1091
1092         time = time or tnow()
1093         self.set_state_time(state, state_time_attr, time)
1094   
1095     def set_state_time(self, state, state_time_attr, time):
1096         """ Set the time for the RM state change """
1097         setattr(self, state_time_attr, time)
1098         self._state = state
1099
1100 class ResourceFactory(object):
1101     _resource_types = dict()
1102
1103     @classmethod
1104     def resource_types(cls):
1105         """Return the type of the Class"""
1106         return cls._resource_types
1107
1108     @classmethod
1109     def get_resource_type(cls, rtype):
1110         """Return the type of the Class"""
1111         return cls._resource_types.get(rtype)
1112
1113     @classmethod
1114     def register_type(cls, rclass):
1115         """Register a new Ressource Manager"""
1116         cls._resource_types[rclass.get_rtype()] = rclass
1117
1118     @classmethod
1119     def create(cls, rtype, ec, guid):
1120         """Create a new instance of a Ressource Manager"""
1121         rclass = cls._resource_types[rtype]
1122         return rclass(ec, guid)
1123
1124 def populate_factory():
1125     """Find and rgister all available RMs
1126     """
1127     # Once the factory is populated, don't repopulate
1128     if not ResourceFactory.resource_types():
1129         for rclass in find_types():
1130             ResourceFactory.register_type(rclass)
1131
1132 def find_types():
1133     """Look into the different folders to find all the 
1134     availables Resources Managers
1135     """
1136     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1137     search_path = set(search_path.split(" "))
1138    
1139     import inspect
1140     import nepi.resources 
1141     path = os.path.dirname(nepi.resources.__file__)
1142     search_path.add(path)
1143
1144     types = set()
1145
1146     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
1147             prefix = "nepi.resources."):
1148
1149         loader = importer.find_module(modname)
1150         
1151         try:
1152             # Notice: Repeated calls to load_module will act as a reload of the module
1153             if modname in sys.modules:
1154                 module = sys.modules.get(modname)
1155             else:
1156                 module = loader.load_module(modname)
1157
1158             for attrname in dir(module):
1159                 if attrname.startswith("_"):
1160                     continue
1161
1162                 attr = getattr(module, attrname)
1163
1164                 if attr == ResourceManager:
1165                     continue
1166
1167                 if not inspect.isclass(attr):
1168                     continue
1169
1170                 if issubclass(attr, ResourceManager):
1171                     types.add(attr)
1172
1173                     if not modname in sys.modules:
1174                         sys.modules[modname] = module
1175
1176         except:
1177             import traceback
1178             import logging
1179             err = traceback.format_exc()
1180             logger = logging.getLogger("Resource.find_types()")
1181             logger.error("Error while loading Resource Managers %s" % err)
1182
1183     return types
1184