Replacing RM.rtype() for RM.get_type() for consistency
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
24
25 import copy
26 import functools
27 import logging
28 import os
29 import pkgutil
30 import sys
31 import threading
32 import weakref
33
34 reschedule_delay = "1s"
35
36 class ResourceAction:
37     """ Action that a user can order to a Resource Manager
38    
39     """
40     DEPLOY = 0
41     START = 1
42     STOP = 2
43
44 class ResourceState:
45     """ State of a Resource Manager
46    
47     """
48     NEW = 0
49     DISCOVERED = 1
50     PROVISIONED = 2
51     READY = 3
52     STARTED = 4
53     STOPPED = 5
54     FINISHED = 6
55     FAILED = 7
56     RELEASED = 8
57
58 ResourceState2str = dict({
59     ResourceState.NEW : "NEW",
60     ResourceState.DISCOVERED : "DISCOVERED",
61     ResourceState.PROVISIONED : "PROVISIONED",
62     ResourceState.READY : "READY",
63     ResourceState.STARTED : "STARTED",
64     ResourceState.STOPPED : "STOPPED",
65     ResourceState.FINISHED : "FINISHED",
66     ResourceState.FAILED : "FAILED",
67     ResourceState.RELEASED : "RELEASED",
68     })
69
70 def clsinit(cls):
71     """ Initializes template information (i.e. attributes and traces)
72     on classes derived from the ResourceManager class.
73
74     It is used as a decorator in the class declaration as follows:
75
76         @clsinit
77         class MyResourceManager(ResourceManager):
78         
79             ...
80
81      """
82
83     cls._clsinit()
84     return cls
85
86 def clsinit_copy(cls):
87     """ Initializes template information (i.e. attributes and traces)
88     on classes direved from the ResourceManager class.
89     It differs from the clsinit method in that it forces inheritance
90     of attributes and traces from the parent class.
91
92     It is used as a decorator in the class declaration as follows:
93
94         @clsinit
95         class MyResourceManager(ResourceManager):
96         
97             ...
98
99
100     clsinit_copy should be prefered to clsinit when creating new
101     ResourceManager child classes.
102
103     """
104     
105     cls._clsinit_copy()
106     return cls
107
108 def failtrap(func):
109     """ Decorator function for instance methods that should set the 
110     RM state to FAILED when an error is raised. The methods that must be
111     decorated are: discover, provision, deploy, start, stop and finish.
112
113     """
114     def wrapped(self, *args, **kwargs):
115         try:
116             return func(self, *args, **kwargs)
117         except:
118             import traceback
119             err = traceback.format_exc()
120             self.error(err)
121             self.debug("SETTING guid %d to state FAILED" % self.guid)
122             self.fail()
123             raise
124     
125     return wrapped
126
127 @clsinit
128 class ResourceManager(Logger):
129     """ Base clase for all ResourceManagers. 
130     
131     A ResourceManger is specific to a resource type (e.g. Node, 
132     Switch, Application, etc) on a specific backend (e.g. PlanetLab, 
133     OMF, etc).
134
135     The ResourceManager instances are responsible for interacting with
136     and controlling concrete (physical or virtual) resources in the 
137     experimental backends.
138     
139     """
140     _rtype = "Resource"
141     _attributes = None
142     _traces = None
143     _help = None
144     _backend = None
145
146     @classmethod
147     def _register_attribute(cls, attr):
148         """ Resource subclasses will invoke this method to add a 
149         resource attribute
150
151         """
152         
153         cls._attributes[attr.name] = attr
154
155     @classmethod
156     def _remove_attribute(cls, name):
157         """ Resource subclasses will invoke this method to remove a 
158         resource attribute
159
160         """
161         
162         del cls._attributes[name]
163
164     @classmethod
165     def _register_trace(cls, trace):
166         """ Resource subclasses will invoke this method to add a 
167         resource trace
168
169         """
170         
171         cls._traces[trace.name] = trace
172
173     @classmethod
174     def _remove_trace(cls, name):
175         """ Resource subclasses will invoke this method to remove a 
176         resource trace
177
178         """
179         
180         del cls._traces[name]
181
182     @classmethod
183     def _register_attributes(cls):
184         """ Resource subclasses will invoke this method to register
185         resource attributes.
186
187         This method should be overriden in the RMs that define
188         attributes.
189
190         """
191         
192         critical = Attribute("critical", 
193                 "Defines whether the resource is critical. "
194                 "A failure on a critical resource will interrupt "
195                 "the experiment. ",
196                 type = Types.Bool,
197                 default = True,
198                 flags = Flags.ExecReadOnly)
199
200         cls._register_attribute(critical)
201         
202     @classmethod
203     def _register_traces(cls):
204         """ Resource subclasses will invoke this method to register
205         resource traces
206
207         This method should be overriden in the RMs that define traces.
208         
209         """
210         
211         pass
212
213     @classmethod
214     def _clsinit(cls):
215         """ ResourceManager classes have different attributes and traces.
216         Attribute and traces are stored in 'class attribute' dictionaries.
217         When a new ResourceManager class is created, the _clsinit method is 
218         called to create a new instance of those dictionaries and initialize 
219         them.
220         
221         The _clsinit method is called by the clsinit decorator method.
222         
223         """
224         
225         # static template for resource attributes
226         cls._attributes = dict()
227         cls._register_attributes()
228
229         # static template for resource traces
230         cls._traces = dict()
231         cls._register_traces()
232
233     @classmethod
234     def _clsinit_copy(cls):
235         """ Same as _clsinit, except that after creating new instances of the
236         dictionaries it copies all the attributes and traces from the parent 
237         class.
238         
239         The _clsinit_copy method is called by the clsinit_copy decorator method.
240         
241         """
242         # static template for resource attributes
243         cls._attributes = copy.deepcopy(cls._attributes)
244         cls._register_attributes()
245
246         # static template for resource traces
247         cls._traces = copy.deepcopy(cls._traces)
248         cls._register_traces()
249
250     @classmethod
251     def get_rtype(cls):
252         """ Returns the type of the Resource Manager
253
254         """
255         return cls._rtype
256
257     @classmethod
258     def get_attributes(cls):
259         """ Returns a copy of the attributes
260
261         """
262         return copy.deepcopy(cls._attributes.values())
263
264     @classmethod
265     def get_traces(cls):
266         """ Returns a copy of the traces
267
268         """
269         return copy.deepcopy(cls._traces.values())
270
271     @classmethod
272     def get_help(cls):
273         """ Returns the description of the type of Resource
274
275         """
276         return cls._help
277
278     @classmethod
279     def get_backend(cls):
280         """ Returns the identified of the backend (i.e. testbed, environment)
281         for the Resource
282
283         """
284         return cls._backend
285
286     def __init__(self, ec, guid):
287         super(ResourceManager, self).__init__(self.get_rtype())
288         
289         self._guid = guid
290         self._ec = weakref.ref(ec)
291         self._connections = set()
292         self._conditions = dict() 
293
294         # the resource instance gets a copy of all attributes
295         self._attrs = copy.deepcopy(self._attributes)
296
297         # the resource instance gets a copy of all traces
298         self._trcs = copy.deepcopy(self._traces)
299
300         # Each resource is placed on a deployment group by the EC
301         # during deployment
302         self.deployment_group = None
303
304         self._start_time = None
305         self._stop_time = None
306         self._discover_time = None
307         self._provision_time = None
308         self._ready_time = None
309         self._release_time = None
310         self._finish_time = None
311         self._failed_time = None
312
313         self._state = ResourceState.NEW
314
315         # instance lock to synchronize exclusive state change methods (such
316         # as deploy and release methods), in order to prevent them from being 
317         # executed at the same time
318         self._release_lock = threading.Lock()
319
320     @property
321     def guid(self):
322         """ Returns the global unique identifier of the RM """
323         return self._guid
324
325     @property
326     def ec(self):
327         """ Returns the Experiment Controller of the RM """
328         return self._ec()
329
330     @property
331     def connections(self):
332         """ Returns the set of guids of connected RMs """
333         return self._connections
334
335     @property
336     def conditions(self):
337         """ Returns the conditions to which the RM is subjected to.
338         
339         This method returns a dictionary of conditions lists indexed by
340         a ResourceAction.
341         
342         """
343         return self._conditions
344
345     @property
346     def start_time(self):
347         """ Returns the start time of the RM as a timestamp """
348         return self._start_time
349
350     @property
351     def stop_time(self):
352         """ Returns the stop time of the RM as a timestamp """
353         return self._stop_time
354
355     @property
356     def discover_time(self):
357         """ Returns the discover time of the RM as a timestamp """
358         return self._discover_time
359
360     @property
361     def provision_time(self):
362         """ Returns the provision time of the RM as a timestamp """
363         return self._provision_time
364
365     @property
366     def ready_time(self):
367         """ Returns the deployment time of the RM as a timestamp """
368         return self._ready_time
369
370     @property
371     def release_time(self):
372         """ Returns the release time of the RM as a timestamp """
373         return self._release_time
374
375     @property
376     def finish_time(self):
377         """ Returns the finalization time of the RM as a timestamp """
378         return self._finish_time
379
380     @property
381     def failed_time(self):
382         """ Returns the time failure occured for the RM as a timestamp """
383         return self._failed_time
384
385     @property
386     def state(self):
387         """ Get the current state of the RM """
388         return self._state
389
390     def log_message(self, msg):
391         """ Returns the log message formatted with added information.
392
393         :param msg: text message
394         :type msg: str
395         :rtype: str
396
397         """
398         return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
399
400     def register_connection(self, guid):
401         """ Registers a connection to the RM identified by guid
402
403         This method should not be overriden. Specific functionality
404         should be added in the do_connect method.
405
406         :param guid: Global unique identified of the RM to connect to
407         :type guid: int
408
409         """
410         if self.valid_connection(guid):
411             self.do_connect(guid)
412             self._connections.add(guid)
413
414     def unregister_connection(self, guid):
415         """ Removes a registered connection to the RM identified by guid
416         
417         This method should not be overriden. Specific functionality
418         should be added in the do_disconnect method.
419
420         :param guid: Global unique identified of the RM to connect to
421         :type guid: int
422
423         """
424         if guid in self._connections:
425             self.do_disconnect(guid)
426             self._connections.remove(guid)
427
428     @failtrap
429     def discover(self):
430         """ Performs resource discovery.
431         
432         This  method is responsible for selecting an individual resource
433         matching user requirements.
434
435         This method should not be overriden directly. Specific functionality
436         should be added in the do_discover method.
437
438         """
439         with self._release_lock:
440             if self._state != ResourceState.RELEASED:
441                 self.do_discover()
442
443     @failtrap
444     def provision(self):
445         """ Performs resource provisioning.
446
447         This  method is responsible for provisioning one resource.
448         After this method has been successfully invoked, the resource
449         should be accessible/controllable by the RM.
450
451         This method should not be overriden directly. Specific functionality
452         should be added in the do_provision method.
453
454         """
455         with self._release_lock:
456             if self._state != ResourceState.RELEASED:
457                 self.do_provision()
458
459     @failtrap
460     def start(self):
461         """ Starts the RM (e.g. launch remote process).
462     
463         There is no standard start behavior. Some RMs will not need to perform
464         any actions upon start.
465
466         This method should not be overriden directly. Specific functionality
467         should be added in the do_start method.
468
469         """
470
471         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
472             self.error("Wrong state %s for start" % self.state)
473             return
474
475         with self._release_lock:
476             if self._state != ResourceState.RELEASED:
477                 self.do_start()
478
479     @failtrap
480     def stop(self):
481         """ Interrupts the RM, stopping any tasks the RM was performing.
482      
483         There is no standard stop behavior. Some RMs will not need to perform
484         any actions upon stop.
485     
486         This method should not be overriden directly. Specific functionality
487         should be added in the do_stop method.
488       
489         """
490         if not self.state in [ResourceState.STARTED]:
491             self.error("Wrong state %s for stop" % self.state)
492             return
493         
494         with self._release_lock:
495             self.do_stop()
496
497     @failtrap
498     def deploy(self):
499         """ Execute all steps required for the RM to reach the state READY.
500
501         This method is responsible for deploying the resource (and invoking 
502         the discover and provision methods).
503  
504         This method should not be overriden directly. Specific functionality
505         should be added in the do_deploy method.
506        
507         """
508         if self.state > ResourceState.READY:
509             self.error("Wrong state %s for deploy" % self.state)
510             return
511
512         with self._release_lock:
513             if self._state != ResourceState.RELEASED:
514                 self.do_deploy()
515                 self.debug("----- READY ---- ")
516
517     def release(self):
518         """ Perform actions to free resources used by the RM.
519   
520         This  method is responsible for releasing resources that were
521         used during the experiment by the RM.
522
523         This method should not be overriden directly. Specific functionality
524         should be added in the do_release method.
525       
526         """
527         with self._release_lock:
528             try:
529                 self.do_release()
530             except:
531                 import traceback
532                 err = traceback.format_exc()
533                 self.error(err)
534
535             self.set_released()
536             self.debug("----- RELEASED ---- ")
537
538     @failtrap
539     def finish(self):
540         """ Sets the RM to state FINISHED. 
541      
542         The FINISHED state is different from STOPPED state in that it 
543         should not be directly invoked by the user.
544         STOPPED indicates that the user interrupted the RM, FINISHED means
545         that the RM concluded normally the actions it was supposed to perform.
546     
547         This method should not be overriden directly. Specific functionality
548         should be added in the do_finish method.
549         
550         """
551         with self._release_lock:
552             if self._state != ResourceState.RELEASED:
553                 self.do_finish()
554
555     def fail(self):
556         """ Sets the RM to state FAILED.
557
558         This method should not be overriden directly. Specific functionality
559         should be added in the do_fail method.
560
561         """
562         with self._release_lock:
563             if self._state != ResourceState.RELEASED:
564                 self.do_fail()
565
566     def set(self, name, value):
567         """ Set the value of the attribute
568
569         :param name: Name of the attribute
570         :type name: str
571         :param name: Value of the attribute
572         :type name: str
573         """
574         attr = self._attrs[name]
575         attr.value = value
576
577     def get(self, name):
578         """ Returns the value of the attribute
579
580         :param name: Name of the attribute
581         :type name: str
582         :rtype: str
583         """
584         attr = self._attrs[name]
585         return attr.value
586
587     def enable_trace(self, name):
588         """ Explicitly enable trace generation
589
590         :param name: Name of the trace
591         :type name: str
592         """
593         trace = self._trcs[name]
594         trace.enabled = True
595     
596     def trace_enabled(self, name):
597         """Returns True if trace is enables 
598
599         :param name: Name of the trace
600         :type name: str
601         """
602         trace = self._trcs[name]
603         return trace.enabled
604  
605     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
606         """ Get information on collected trace
607
608         :param name: Name of the trace
609         :type name: str
610
611         :param attr: Can be one of:
612                          - TraceAttr.ALL (complete trace content), 
613                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
614                          - TraceAttr.PATH (full path to the trace file),
615                          - TraceAttr.SIZE (size of trace file). 
616         :type attr: str
617
618         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
619         :type name: int
620
621         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
622         :type name: int
623
624         :rtype: str
625         """
626         pass
627
628     def register_condition(self, action, group, state, time = None):
629         """ Registers a condition on the resource manager to allow execution 
630         of 'action' only after 'time' has elapsed from the moment all resources 
631         in 'group' reached state 'state'
632
633         :param action: Action to restrict to condition (either 'START' or 'STOP')
634         :type action: str
635         :param group: Group of RMs to wait for (list of guids)
636         :type group: int or list of int
637         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
638         :type state: str
639         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
640         :type time: str
641
642         """
643
644         if not action in self.conditions:
645             self._conditions[action] = list()
646         
647         conditions = self.conditions.get(action)
648
649         # For each condition to register a tuple of (group, state, time) is 
650         # added to the 'action' list
651         if not isinstance(group, list):
652             group = [group]
653
654         conditions.append((group, state, time))
655
656     def unregister_condition(self, group, action = None):
657         """ Removed conditions for a certain group of guids
658
659         :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
660         :type action: str
661
662         :param group: Group of RMs to wait for (list of guids)
663         :type group: int or list of int
664
665         """
666         # For each condition a tuple of (group, state, time) is 
667         # added to the 'action' list
668         if not isinstance(group, list):
669             group = [group]
670
671         for act, conditions in self.conditions.iteritems():
672             if action and act != action:
673                 continue
674
675             for condition in list(conditions):
676                 (grp, state, time) = condition
677
678                 # If there is an intersection between grp and group,
679                 # then remove intersected elements
680                 intsec = set(group).intersection(set(grp))
681                 if intsec:
682                     idx = conditions.index(condition)
683                     newgrp = set(grp)
684                     newgrp.difference_update(intsec)
685                     conditions[idx] = (newgrp, state, time)
686                  
687     def get_connected(self, rtype = None):
688         """ Returns the list of RM with the type 'rtype'
689
690         :param rtype: Type of the RM we look for
691         :type rtype: str
692         :return: list of guid
693         """
694         connected = []
695         rclass = ResourceFactory.get_resource_type(rtype)
696         for guid in self.connections:
697             rm = self.ec.get_resource(guid)
698             if not rtype or isinstance(rm, rclass):
699                 connected.append(rm)
700         return connected
701
702     @failtrap
703     def _needs_reschedule(self, group, state, time):
704         """ Internal method that verify if 'time' has elapsed since 
705         all elements in 'group' have reached state 'state'.
706
707         :param group: Group of RMs to wait for (list of guids)
708         :type group: int or list of int
709         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
710         :type state: str
711         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
712         :type time: str
713
714         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
715         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
716         For the moment, 2m30s is not a correct syntax.
717
718         """
719         reschedule = False
720         delay = reschedule_delay 
721
722         # check state and time elapsed on all RMs
723         for guid in group:
724             rm = self.ec.get_resource(guid)
725             
726             # If one of the RMs this resource needs to wait for has FAILED
727             # we raise an exception
728             if rm.state == ResourceState.FAILED:
729                 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
730                 raise RuntimeError, msg
731
732             # If the RM state is lower than the requested state we must
733             # reschedule (e.g. if RM is READY but we required STARTED).
734             if rm.state < state:
735                 reschedule = True
736                 break
737
738             # If there is a time restriction, we must verify the
739             # restriction is satisfied 
740             if time:
741                 if state == ResourceState.DISCOVERED:
742                     t = rm.discover_time
743                 if state == ResourceState.PROVISIONED:
744                     t = rm.provision_time
745                 elif state == ResourceState.READY:
746                     t = rm.ready_time
747                 elif state == ResourceState.STARTED:
748                     t = rm.start_time
749                 elif state == ResourceState.STOPPED:
750                     t = rm.stop_time
751                 elif state == ResourceState.FINISHED:
752                     t = rm.finish_time
753                 elif state == ResourceState.RELEASED:
754                     t = rm.release_time
755                 else:
756                     break
757
758                 # time already elapsed since RM changed state
759                 waited = "%fs" % tdiffsec(tnow(), t)
760
761                 # time still to wait
762                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
763
764                 if wait > 0.001:
765                     reschedule = True
766                     delay = "%fs" % wait
767                     break
768
769         return reschedule, delay
770
771     def set_with_conditions(self, name, value, group, state, time):
772         """ Set value 'value' on attribute with name 'name' when 'time' 
773         has elapsed since all elements in 'group' have reached state
774         'state'
775
776         :param name: Name of the attribute to set
777         :type name: str
778         :param name: Value of the attribute to set
779         :type name: str
780         :param group: Group of RMs to wait for (list of guids)
781         :type group: int or list of int
782         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
783         :type state: str
784         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
785         :type time: str
786         """
787
788         reschedule = False
789         delay = reschedule_delay 
790
791         ## evaluate if set conditions are met
792
793         # only can set with conditions after the RM is started
794         if self.state != ResourceState.STARTED:
795             reschedule = True
796         else:
797             reschedule, delay = self._needs_reschedule(group, state, time)
798
799         if reschedule:
800             callback = functools.partial(self.set_with_conditions, 
801                     name, value, group, state, time)
802             self.ec.schedule(delay, callback)
803         else:
804             self.set(name, value)
805
806     def start_with_conditions(self):
807         """ Starts RM when all the conditions in self.conditions for
808         action 'START' are satisfied.
809
810         """
811         #import pdb;pdb.set_trace()
812
813         reschedule = False
814         delay = reschedule_delay 
815
816
817         ## evaluate if conditions to start are met
818         if self.ec.abort:
819             return 
820
821         # Can only start when RM is either STOPPED or READY
822         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
823             reschedule = True
824             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
825         else:
826             start_conditions = self.conditions.get(ResourceAction.START, [])
827             
828             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
829             
830             # Verify all start conditions are met
831             for (group, state, time) in start_conditions:
832                 # Uncomment for debug
833                 unmet = []
834                 for guid in group:
835                     rm = self.ec.get_resource(guid)
836                     unmet.append((guid, rm._state))
837                 
838                 self.debug("---- WAITED STATES ---- %s" % unmet )
839
840                 reschedule, delay = self._needs_reschedule(group, state, time)
841                 if reschedule:
842                     break
843
844         if reschedule:
845             self.ec.schedule(delay, self.start_with_conditions)
846         else:
847             self.debug("----- STARTING ---- ")
848             self.start()
849
850     def stop_with_conditions(self):
851         """ Stops RM when all the conditions in self.conditions for
852         action 'STOP' are satisfied.
853
854         """
855         reschedule = False
856         delay = reschedule_delay 
857
858         ## evaluate if conditions to stop are met
859         if self.ec.abort:
860             return 
861
862         # only can stop when RM is STARTED
863         if self.state != ResourceState.STARTED:
864             reschedule = True
865             self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
866         else:
867             self.debug(" ---- STOP CONDITIONS ---- %s" % 
868                     self.conditions.get(ResourceAction.STOP))
869
870             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
871             for (group, state, time) in stop_conditions:
872                 reschedule, delay = self._needs_reschedule(group, state, time)
873                 if reschedule:
874                     break
875
876         if reschedule:
877             callback = functools.partial(self.stop_with_conditions)
878             self.ec.schedule(delay, callback)
879         else:
880             self.debug(" ----- STOPPING ---- ") 
881             self.stop()
882
883     def deploy_with_conditions(self):
884         """ Deploy RM when all the conditions in self.conditions for
885         action 'READY' are satisfied.
886
887         """
888         reschedule = False
889         delay = reschedule_delay 
890
891         ## evaluate if conditions to deploy are met
892         if self.ec.abort:
893             return 
894
895         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
896         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
897                 ResourceState.PROVISIONED]:
898             reschedule = True
899             self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
900         else:
901             deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
902             
903             self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) 
904             
905             # Verify all start conditions are met
906             for (group, state, time) in deploy_conditions:
907                 # Uncomment for debug
908                 #unmet = []
909                 #for guid in group:
910                 #    rm = self.ec.get_resource(guid)
911                 #    unmet.append((guid, rm._state))
912                 #
913                 #self.debug("---- WAITED STATES ---- %s" % unmet )
914
915                 reschedule, delay = self._needs_reschedule(group, state, time)
916                 if reschedule:
917                     break
918
919         if reschedule:
920             self.ec.schedule(delay, self.deploy_with_conditions)
921         else:
922             self.debug("----- DEPLOYING ---- ")
923             self.deploy()
924
925     def do_connect(self, guid):
926         """ Performs actions that need to be taken upon associating RMs.
927         This method should be redefined when necessary in child classes.
928         """
929         pass
930
931     def do_disconnect(self, guid):
932         """ Performs actions that need to be taken upon disassociating RMs.
933         This method should be redefined when necessary in child classes.
934         """
935         pass
936
937     def valid_connection(self, guid):
938         """Checks whether a connection with the other RM
939         is valid.
940         This method need to be redefined by each new Resource Manager.
941
942         :param guid: Guid of the current Resource Manager
943         :type guid: int
944         :rtype:  Boolean
945
946         """
947         # TODO: Validate!
948         return True
949
950     def do_discover(self):
951         self.set_discovered()
952
953     def do_provision(self):
954         self.set_provisioned()
955
956     def do_start(self):
957         self.set_started()
958
959     def do_stop(self):
960         self.set_stopped()
961
962     def do_deploy(self):
963         self.set_ready()
964
965     def do_release(self):
966         pass
967
968     def do_finish(self):
969         # In case the RM passed from STARTED directly to FINISHED,
970         # we set the stop_time for consistency
971         if self.stop_time == None:
972             self.set_stopped()
973
974         self.set_finished()
975
976     def do_fail(self):
977         self.set_failed()
978
979     def set_started(self):
980         """ Mark ResourceManager as STARTED """
981         self.set_state(ResourceState.STARTED, "_start_time")
982         
983     def set_stopped(self):
984         """ Mark ResourceManager as STOPPED """
985         self.set_state(ResourceState.STOPPED, "_stop_time")
986
987     def set_ready(self):
988         """ Mark ResourceManager as READY """
989         self.set_state(ResourceState.READY, "_ready_time")
990
991     def set_released(self):
992         """ Mark ResourceManager as REALEASED """
993         self.set_state(ResourceState.RELEASED, "_release_time")
994
995     def set_finished(self):
996         """ Mark ResourceManager as FINISHED """
997         self.set_state(ResourceState.FINISHED, "_finish_time")
998
999     def set_failed(self):
1000         """ Mark ResourceManager as FAILED """
1001         self.set_state(ResourceState.FAILED, "_failed_time")
1002
1003     def set_discovered(self):
1004         """ Mark ResourceManager as DISCOVERED """
1005         self.set_state(ResourceState.DISCOVERED, "_discover_time")
1006
1007     def set_provisioned(self):
1008         """ Mark ResourceManager as PROVISIONED """
1009         self.set_state(ResourceState.PROVISIONED, "_provision_time")
1010
1011     def set_state(self, state, state_time_attr):
1012         # Ensure that RM state will not change after released
1013         if self._state == ResourceState.RELEASED:
1014             return 
1015    
1016         setattr(self, state_time_attr, tnow())
1017         self._state = state
1018
1019 class ResourceFactory(object):
1020     _resource_types = dict()
1021
1022     @classmethod
1023     def resource_types(cls):
1024         """Return the type of the Class"""
1025         return cls._resource_types
1026
1027     @classmethod
1028     def get_resource_type(cls, rtype):
1029         """Return the type of the Class"""
1030         return cls._resource_types.get(rtype)
1031
1032     @classmethod
1033     def register_type(cls, rclass):
1034         """Register a new Ressource Manager"""
1035         cls._resource_types[rclass.get_rtype()] = rclass
1036
1037     @classmethod
1038     def create(cls, rtype, ec, guid):
1039         """Create a new instance of a Ressource Manager"""
1040         rclass = cls._resource_types[rtype]
1041         return rclass(ec, guid)
1042
1043 def populate_factory():
1044     """Register all the possible RM that exists in the current version of Nepi.
1045     """
1046     # Once the factory is populated, don't repopulate
1047     if not ResourceFactory.resource_types():
1048         for rclass in find_types():
1049             ResourceFactory.register_type(rclass)
1050
1051 def find_types():
1052     """Look into the different folders to find all the 
1053     availables Resources Managers
1054     """
1055     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1056     search_path = set(search_path.split(" "))
1057    
1058     import inspect
1059     import nepi.resources 
1060     path = os.path.dirname(nepi.resources.__file__)
1061     search_path.add(path)
1062
1063     types = []
1064
1065     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
1066             prefix = "nepi.resources."):
1067
1068         loader = importer.find_module(modname)
1069         
1070         try:
1071             # Notice: Repeated calls to load_module will act as a reload of teh module
1072             if modname in sys.modules:
1073                 module = sys.modules.get(modname)
1074             else:
1075                 module = loader.load_module(modname)
1076
1077             for attrname in dir(module):
1078                 if attrname.startswith("_"):
1079                     continue
1080
1081                 attr = getattr(module, attrname)
1082
1083                 if attr == ResourceManager:
1084                     continue
1085
1086                 if not inspect.isclass(attr):
1087                     continue
1088
1089                 if issubclass(attr, ResourceManager):
1090                     types.append(attr)
1091
1092                     if not modname in sys.modules:
1093                         sys.modules[modname] = module
1094
1095         except:
1096             import traceback
1097             import logging
1098             err = traceback.format_exc()
1099             logger = logging.getLogger("Resource.find_types()")
1100             logger.error("Error while loading Resource Managers %s" % err)
1101
1102     return types
1103
1104