c389cfe23f9b516c33100f8301abeacd215db1ca
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
24
25 import copy
26 import functools
27 import logging
28 import os
29 import pkgutil
30 import sys
31 import threading
32 import weakref
33
34 reschedule_delay = "0.5s"
35
36 class ResourceAction:
37     """ Action that a user can order to a Resource Manager
38    
39     """
40     DEPLOY = 0
41     START = 1
42     STOP = 2
43
44 class ResourceState:
45     """ State of a Resource Manager
46    
47     """
48     NEW = 0
49     DISCOVERED = 1
50     PROVISIONED = 2
51     READY = 3
52     STARTED = 4
53     STOPPED = 5
54     FAILED = 6
55     RELEASED = 7
56
57 ResourceState2str = dict({
58     ResourceState.NEW : "NEW",
59     ResourceState.DISCOVERED : "DISCOVERED",
60     ResourceState.PROVISIONED : "PROVISIONED",
61     ResourceState.READY : "READY",
62     ResourceState.STARTED : "STARTED",
63     ResourceState.STOPPED : "STOPPED",
64     ResourceState.FAILED : "FAILED",
65     ResourceState.RELEASED : "RELEASED",
66     })
67
68 def clsinit(cls):
69     """ Initializes template information (i.e. attributes and traces)
70     on classes derived from the ResourceManager class.
71
72     It is used as a decorator in the class declaration as follows:
73
74         @clsinit
75         class MyResourceManager(ResourceManager):
76         
77             ...
78
79      """
80
81     cls._clsinit()
82     return cls
83
84 def clsinit_copy(cls):
85     """ Initializes template information (i.e. attributes and traces)
86     on classes direved from the ResourceManager class.
87     It differs from the clsinit method in that it forces inheritance
88     of attributes and traces from the parent class.
89
90     It is used as a decorator in the class declaration as follows:
91
92         @clsinit
93         class MyResourceManager(ResourceManager):
94         
95             ...
96
97
98     clsinit_copy should be prefered to clsinit when creating new
99     ResourceManager child classes.
100
101     """
102     
103     cls._clsinit_copy()
104     return cls
105
106 def failtrap(func):
107     """ Decorator function for instance methods that should set the 
108     RM state to FAILED when an error is raised. The methods that must be
109     decorated are: discover, provision, deploy, start, stop.
110
111     """
112     def wrapped(self, *args, **kwargs):
113         try:
114             return func(self, *args, **kwargs)
115         except:
116             import traceback
117             err = traceback.format_exc()
118             self.error(err)
119             self.debug("SETTING guid %d to state FAILED" % self.guid)
120             self.fail()
121             raise
122     
123     return wrapped
124
125 @clsinit
126 class ResourceManager(Logger):
127     """ Base clase for all ResourceManagers. 
128     
129     A ResourceManger is specific to a resource type (e.g. Node, 
130     Switch, Application, etc) on a specific backend (e.g. PlanetLab, 
131     OMF, etc).
132
133     The ResourceManager instances are responsible for interacting with
134     and controlling concrete (physical or virtual) resources in the 
135     experimental backends.
136     
137     """
138     _rtype = "Resource"
139     _attributes = None
140     _traces = None
141     _help = None
142     _backend = None
143
144     @classmethod
145     def _register_attribute(cls, attr):
146         """ Resource subclasses will invoke this method to add a 
147         resource attribute
148
149         """
150         
151         cls._attributes[attr.name] = attr
152
153     @classmethod
154     def _remove_attribute(cls, name):
155         """ Resource subclasses will invoke this method to remove a 
156         resource attribute
157
158         """
159         
160         del cls._attributes[name]
161
162     @classmethod
163     def _register_trace(cls, trace):
164         """ Resource subclasses will invoke this method to add a 
165         resource trace
166
167         """
168         
169         cls._traces[trace.name] = trace
170
171     @classmethod
172     def _remove_trace(cls, name):
173         """ Resource subclasses will invoke this method to remove a 
174         resource trace
175
176         """
177         
178         del cls._traces[name]
179
180     @classmethod
181     def _register_attributes(cls):
182         """ Resource subclasses will invoke this method to register
183         resource attributes.
184
185         This method should be overriden in the RMs that define
186         attributes.
187
188         """
189         critical = Attribute("critical", 
190                 "Defines whether the resource is critical. "
191                 "A failure on a critical resource will interrupt "
192                 "the experiment. ",
193                 type = Types.Bool,
194                 default = True,
195                 flags = Flags.Design)
196         hard_release = Attribute("hardRelease", 
197                 "Forces removal of all result files and directories associated "
198                 "to the RM upon resource release. After release the RM will "
199                 "be removed from the EC and the results will not longer be "
200                 "accessible",
201                 type = Types.Bool,
202                 default = False,
203                 flags = Flags.Design)
204
205         cls._register_attribute(critical)
206         cls._register_attribute(hard_release)
207         
208     @classmethod
209     def _register_traces(cls):
210         """ Resource subclasses will invoke this method to register
211         resource traces
212
213         This method should be overriden in the RMs that define traces.
214         
215         """
216         
217         pass
218
219     @classmethod
220     def _clsinit(cls):
221         """ ResourceManager classes have different attributes and traces.
222         Attribute and traces are stored in 'class attribute' dictionaries.
223         When a new ResourceManager class is created, the _clsinit method is 
224         called to create a new instance of those dictionaries and initialize 
225         them.
226         
227         The _clsinit method is called by the clsinit decorator method.
228         
229         """
230         
231         # static template for resource attributes
232         cls._attributes = dict()
233         cls._register_attributes()
234
235         # static template for resource traces
236         cls._traces = dict()
237         cls._register_traces()
238
239     @classmethod
240     def _clsinit_copy(cls):
241         """ Same as _clsinit, except that after creating new instances of the
242         dictionaries it copies all the attributes and traces from the parent 
243         class.
244         
245         The _clsinit_copy method is called by the clsinit_copy decorator method.
246         
247         """
248         # static template for resource attributes
249         cls._attributes = copy.deepcopy(cls._attributes)
250         cls._register_attributes()
251
252         # static template for resource traces
253         cls._traces = copy.deepcopy(cls._traces)
254         cls._register_traces()
255
256     @classmethod
257     def get_rtype(cls):
258         """ Returns the type of the Resource Manager
259
260         """
261         return cls._rtype
262
263     @classmethod
264     def get_attributes(cls):
265         """ Returns a copy of the attributes
266
267         """
268         return copy.deepcopy(cls._attributes.values())
269
270     @classmethod
271     def get_attribute(cls, name):
272         """ Returns a copy of the attribute with name 'name'
273
274         """
275         return copy.deepcopy(cls._attributes[name])
276
277     @classmethod
278     def get_traces(cls):
279         """ Returns a copy of the traces
280
281         """
282         return copy.deepcopy(cls._traces.values())
283
284     @classmethod
285     def get_help(cls):
286         """ Returns the description of the type of Resource
287
288         """
289         return cls._help
290
291     @classmethod
292     def get_backend(cls):
293         """ Returns the identified of the backend (i.e. testbed, environment)
294         for the Resource
295
296         """
297         return cls._backend
298
299     @classmethod
300     def get_global(cls, name):
301         """ Returns the value of a global attribute
302             Global attribute meaning an attribute for 
303             all the resources from a rtype
304
305         :param name: Name of the attribute
306         :type name: str
307         :rtype: str
308         """
309         global_attr = cls._attributes[name]
310         return global_attr.value
311
312     @classmethod
313     def set_global(cls, name, value):
314         """ Set value for a global attribute
315
316         :param name: Name of the attribute
317         :type name: str
318         :param name: Value of the attribute
319         :type name: str
320         """
321         global_attr = cls._attributes[name]
322         global_attr.value = value
323         return value
324
325     def __init__(self, ec, guid):
326         super(ResourceManager, self).__init__(self.get_rtype())
327         
328         self._guid = guid
329         self._ec = weakref.ref(ec)
330         self._connections = set()
331         self._conditions = dict() 
332
333         # the resource instance gets a copy of all attributes
334         self._attrs = copy.deepcopy(self._attributes)
335
336         # the resource instance gets a copy of all traces
337         self._trcs = copy.deepcopy(self._traces)
338
339         # Each resource is placed on a deployment group by the EC
340         # during deployment
341         self.deployment_group = None
342
343         self._start_time = None
344         self._stop_time = None
345         self._discover_time = None
346         self._provision_time = None
347         self._ready_time = None
348         self._release_time = None
349         self._failed_time = None
350
351         self._state = ResourceState.NEW
352
353         # instance lock to synchronize exclusive state change methods (such
354         # as deploy and release methods), in order to prevent them from being 
355         # executed at the same time and corrupt internal resource state
356         self._release_lock = threading.Lock()
357
358     @property
359     def guid(self):
360         """ Returns the global unique identifier of the RM """
361         return self._guid
362
363     @property
364     def ec(self):
365         """ Returns the Experiment Controller of the RM """
366         return self._ec()
367
368     @property
369     def connections(self):
370         """ Returns the set of guids of connected RMs """
371         return self._connections
372
373     @property
374     def conditions(self):
375         """ Returns the conditions to which the RM is subjected to.
376         
377         This method returns a dictionary of conditions lists indexed by
378         a ResourceAction.
379         
380         """
381         return self._conditions
382
383     @property
384     def start_time(self):
385         """ Returns the start time of the RM as a timestamp """
386         return self._start_time
387
388     @property
389     def stop_time(self):
390         """ Returns the stop time of the RM as a timestamp """
391         return self._stop_time
392
393     @property
394     def discover_time(self):
395         """ Returns the discover time of the RM as a timestamp """
396         return self._discover_time
397
398     @property
399     def provision_time(self):
400         """ Returns the provision time of the RM as a timestamp """
401         return self._provision_time
402
403     @property
404     def ready_time(self):
405         """ Returns the deployment time of the RM as a timestamp """
406         return self._ready_time
407
408     @property
409     def release_time(self):
410         """ Returns the release time of the RM as a timestamp """
411         return self._release_time
412
413     @property
414     def failed_time(self):
415         """ Returns the time failure occured for the RM as a timestamp """
416         return self._failed_time
417
418     @property
419     def state(self):
420         """ Get the current state of the RM """
421         return self._state
422
423     def log_message(self, msg):
424         """ Returns the log message formatted with added information.
425
426         :param msg: text message
427         :type msg: str
428         :rtype: str
429
430         """
431         return " %s guid %d - %s " % (self._rtype, self.guid, msg)
432
433     def register_connection(self, guid):
434         """ Registers a connection to the RM identified by guid
435
436         This method should not be overriden. Specific functionality
437         should be added in the do_connect method.
438
439         :param guid: Global unique identified of the RM to connect to
440         :type guid: int
441
442         """
443         if self.valid_connection(guid):
444             self.do_connect(guid)
445             self._connections.add(guid)
446
447     def unregister_connection(self, guid):
448         """ Removes a registered connection to the RM identified by guid
449         
450         This method should not be overriden. Specific functionality
451         should be added in the do_disconnect method.
452
453         :param guid: Global unique identified of the RM to connect to
454         :type guid: int
455
456         """
457         if guid in self._connections:
458             self.do_disconnect(guid)
459             self._connections.remove(guid)
460
461     @failtrap
462     def discover(self):
463         """ Performs resource discovery.
464         
465         This  method is responsible for selecting an individual resource
466         matching user requirements.
467
468         This method should not be overriden directly. Specific functionality
469         should be added in the do_discover method.
470
471         """
472         with self._release_lock:
473             if self._state != ResourceState.RELEASED:
474                 self.do_discover()
475
476     @failtrap
477     def provision(self):
478         """ Performs resource provisioning.
479
480         This  method is responsible for provisioning one resource.
481         After this method has been successfully invoked, the resource
482         should be accessible/controllable by the RM.
483
484         This method should not be overriden directly. Specific functionality
485         should be added in the do_provision method.
486
487         """
488         with self._release_lock:
489             if self._state != ResourceState.RELEASED:
490                 self.do_provision()
491
492     @failtrap
493     def start(self):
494         """ Starts the RM (e.g. launch remote process).
495     
496         There is no standard start behavior. Some RMs will not need to perform
497         any actions upon start.
498
499         This method should not be overriden directly. Specific functionality
500         should be added in the do_start method.
501
502         """
503
504         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
505             self.error("Wrong state %s for start" % self.state)
506             return
507
508         with self._release_lock:
509             if self._state != ResourceState.RELEASED:
510                 self.do_start()
511
512     @failtrap
513     def stop(self):
514         """ Interrupts the RM, stopping any tasks the RM was performing.
515      
516         There is no standard stop behavior. Some RMs will not need to perform
517         any actions upon stop.
518     
519         This method should not be overriden directly. Specific functionality
520         should be added in the do_stop method.
521       
522         """
523         if not self.state in [ResourceState.STARTED]:
524             self.error("Wrong state %s for stop" % self.state)
525             return
526         
527         with self._release_lock:
528             self.do_stop()
529
530     @failtrap
531     def deploy(self):
532         """ Execute all steps required for the RM to reach the state READY.
533
534         This method is responsible for deploying the resource (and invoking 
535         the discover and provision methods).
536  
537         This method should not be overriden directly. Specific functionality
538         should be added in the do_deploy method.
539        
540         """
541         if self.state > ResourceState.READY:
542             self.error("Wrong state %s for deploy" % self.state)
543             return
544
545         with self._release_lock:
546             if self._state != ResourceState.RELEASED:
547                 self.do_deploy()
548
549     def release(self):
550         """ Perform actions to free resources used by the RM.
551   
552         This  method is responsible for releasing resources that were
553         used during the experiment by the RM.
554
555         This method should not be overriden directly. Specific functionality
556         should be added in the do_release method.
557       
558         """
559         with self._release_lock:
560             try:
561                 self.do_release()
562             except:
563                 import traceback
564                 err = traceback.format_exc()
565                 self.error(err)
566
567                 self.set_released()
568
569     def fail(self):
570         """ Sets the RM to state FAILED.
571
572         This method should not be overriden directly. Specific functionality
573         should be added in the do_fail method.
574
575         """
576         with self._release_lock:
577             if self._state != ResourceState.RELEASED:
578                 self.do_fail()
579
580     def set(self, name, value):
581         """ Set the value of the attribute
582
583         :param name: Name of the attribute
584         :type name: str
585         :param name: Value of the attribute
586         :type name: str
587         """
588         attr = self._attrs[name]
589         attr.value = value
590         return value
591
592     def get(self, name):
593         """ Returns the value of the attribute
594
595         :param name: Name of the attribute
596         :type name: str
597         :rtype: str
598         """
599         attr = self._attrs[name]
600
601         """
602         A.Q. Commenting due to performance impact
603         if attr.has_flag(Flags.Global):
604             self.warning( "Attribute %s is global. Use get_global instead." % name)
605         """
606             
607         return attr.value
608
609     def has_changed(self, name):
610         """ Returns the True is the value of the attribute
611             has been modified by the user.
612
613         :param name: Name of the attribute
614         :type name: str
615         :rtype: str
616         """
617         attr = self._attrs[name]
618         return attr.has_changed
619
620     def has_flag(self, name, flag):
621         """ Returns true if the attribute has the flag 'flag'
622
623         :param flag: Flag to be checked
624         :type flag: Flags
625         """
626         attr = self._attrs[name]
627         return attr.has_flag(flag)
628
629     def has_attribute(self, name):
630         """ Returns true if the RM has an attribute with name
631
632         :param name: name of the attribute
633         :type name: string
634         """
635         return name in self._attrs
636
637     def enable_trace(self, name):
638         """ Explicitly enable trace generation
639
640         :param name: Name of the trace
641         :type name: str
642         """
643         trace = self._trcs[name]
644         trace.enabled = True
645     
646     def trace_enabled(self, name):
647         """Returns True if trace is enables 
648
649         :param name: Name of the trace
650         :type name: str
651         """
652         trace = self._trcs[name]
653         return trace.enabled
654  
655     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
656         """ Get information on collected trace
657
658         :param name: Name of the trace
659         :type name: str
660
661         :param attr: Can be one of:
662                          - TraceAttr.ALL (complete trace content), 
663                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
664                          - TraceAttr.PATH (full path to the trace file),
665                          - TraceAttr.SIZE (size of trace file). 
666         :type attr: str
667
668         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
669         :type name: int
670
671         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
672         :type name: int
673
674         :rtype: str
675         """
676         pass
677
678     def register_condition(self, action, group, state, time = None):
679         """ Registers a condition on the resource manager to allow execution 
680         of 'action' only after 'time' has elapsed from the moment all resources 
681         in 'group' reached state 'state'
682
683         :param action: Action to restrict to condition (either 'START' or 'STOP')
684         :type action: str
685         :param group: Group of RMs to wait for (list of guids)
686         :type group: int or list of int
687         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
688         :type state: str
689         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
690         :type time: str
691
692         """
693
694         if not action in self.conditions:
695             self._conditions[action] = list()
696         
697         conditions = self.conditions.get(action)
698
699         # For each condition to register a tuple of (group, state, time) is 
700         # added to the 'action' list
701         if not isinstance(group, list):
702             group = [group]
703
704         conditions.append((group, state, time))
705
706     def unregister_condition(self, group, action = None):
707         """ Removed conditions for a certain group of guids
708
709         :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
710         :type action: str
711
712         :param group: Group of RMs to wait for (list of guids)
713         :type group: int or list of int
714
715         """
716         # For each condition a tuple of (group, state, time) is 
717         # added to the 'action' list
718         if not isinstance(group, list):
719             group = [group]
720
721         for act, conditions in self.conditions.iteritems():
722             if action and act != action:
723                 continue
724
725             for condition in list(conditions):
726                 (grp, state, time) = condition
727
728                 # If there is an intersection between grp and group,
729                 # then remove intersected elements
730                 intsec = set(group).intersection(set(grp))
731                 if intsec:
732                     idx = conditions.index(condition)
733                     newgrp = set(grp)
734                     newgrp.difference_update(intsec)
735                     conditions[idx] = (newgrp, state, time)
736                  
737     def get_connected(self, rtype = None):
738         """ Returns the list of RM with the type 'rtype'
739
740         :param rtype: Type of the RM we look for
741         :type rtype: str
742         :return: list of guid
743         """
744         connected = []
745         rclass = ResourceFactory.get_resource_type(rtype)
746         for guid in self.connections:
747             rm = self.ec.get_resource(guid)
748             if not rtype or isinstance(rm, rclass):
749                 connected.append(rm)
750         return connected
751
752     def is_rm_instance(self, rtype):
753         """ Returns True if the RM is instance of 'rtype'
754
755         :param rtype: Type of the RM we look for
756         :type rtype: str
757         :return: True|False
758         """
759         rclass = ResourceFactory.get_resource_type(rtype)
760         if isinstance(self, rclass):
761             return True
762         return False
763
764     @failtrap
765     def _needs_reschedule(self, group, state, time):
766         """ Internal method that verify if 'time' has elapsed since 
767         all elements in 'group' have reached state 'state'.
768
769         :param group: Group of RMs to wait for (list of guids)
770         :type group: int or list of int
771         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
772         :type state: str
773         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
774         :type time: str
775
776         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
777         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
778         For the moment, 2m30s is not a correct syntax.
779
780         """
781         reschedule = False
782         delay = reschedule_delay 
783
784         # check state and time elapsed on all RMs
785         for guid in group:
786             rm = self.ec.get_resource(guid)
787             
788             # If one of the RMs this resource needs to wait for has FAILED
789             # and is critical we raise an exception
790             if rm.state == ResourceState.FAILED:
791                 if not rm.get('critical'):
792                     continue
793                 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
794                 raise RuntimeError, msg
795
796             # If the RM state is lower than the requested state we must
797             # reschedule (e.g. if RM is READY but we required STARTED).
798             if rm.state < state:
799                 reschedule = True
800                 break
801
802             # If there is a time restriction, we must verify the
803             # restriction is satisfied 
804             if time:
805                 if state == ResourceState.DISCOVERED:
806                     t = rm.discover_time
807                 if state == ResourceState.PROVISIONED:
808                     t = rm.provision_time
809                 elif state == ResourceState.READY:
810                     t = rm.ready_time
811                 elif state == ResourceState.STARTED:
812                     t = rm.start_time
813                 elif state == ResourceState.STOPPED:
814                     t = rm.stop_time
815                 elif state == ResourceState.RELEASED:
816                     t = rm.release_time
817                 else:
818                     break
819
820                 # time already elapsed since RM changed state
821                 waited = "%fs" % tdiffsec(tnow(), t)
822
823                 # time still to wait
824                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
825
826                 if wait > 0.001:
827                     reschedule = True
828                     delay = "%fs" % wait
829                     break
830
831         return reschedule, delay
832
833     def set_with_conditions(self, name, value, group, state, time):
834         """ Set value 'value' on attribute with name 'name' when 'time' 
835         has elapsed since all elements in 'group' have reached state
836         'state'
837
838         :param name: Name of the attribute to set
839         :type name: str
840         :param name: Value of the attribute to set
841         :type name: str
842         :param group: Group of RMs to wait for (list of guids)
843         :type group: int or list of int
844         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
845         :type state: str
846         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
847         :type time: str
848         """
849
850         reschedule = False
851         delay = reschedule_delay 
852
853         ## evaluate if set conditions are met
854
855         # only can set with conditions after the RM is started
856         if self.state != ResourceState.STARTED:
857             reschedule = True
858         else:
859             reschedule, delay = self._needs_reschedule(group, state, time)
860
861         if reschedule:
862             callback = functools.partial(self.set_with_conditions, 
863                     name, value, group, state, time)
864             self.ec.schedule(delay, callback)
865         else:
866             self.set(name, value)
867
868     def start_with_conditions(self):
869         """ Starts RM when all the conditions in self.conditions for
870         action 'START' are satisfied.
871
872         """
873         #import pdb;pdb.set_trace()
874
875         reschedule = False
876         delay = reschedule_delay 
877
878
879         ## evaluate if conditions to start are met
880         if self.ec.abort:
881             return 
882
883         # Can only start when RM is either STOPPED or READY
884         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
885             reschedule = True
886             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
887         else:
888             start_conditions = self.conditions.get(ResourceAction.START, [])
889             
890             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
891             
892             # Verify all start conditions are met
893             for (group, state, time) in start_conditions:
894                 # Uncomment for debug
895                 #unmet = []
896                 #for guid in group:
897                 #    rm = self.ec.get_resource(guid)
898                 #    unmet.append((guid, rm._state))
899                 #
900                 #self.debug("---- WAITED STATES ---- %s" % unmet )
901
902                 reschedule, delay = self._needs_reschedule(group, state, time)
903                 if reschedule:
904                     break
905
906         if reschedule:
907             self.ec.schedule(delay, self.start_with_conditions)
908         else:
909             self.debug("----- STARTING ---- ")
910             self.start()
911
912     def stop_with_conditions(self):
913         """ Stops RM when all the conditions in self.conditions for
914         action 'STOP' are satisfied.
915
916         """
917         reschedule = False
918         delay = reschedule_delay 
919
920         ## evaluate if conditions to stop are met
921         if self.ec.abort:
922             return 
923
924         # only can stop when RM is STARTED
925         if self.state != ResourceState.STARTED:
926             reschedule = True
927             self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
928         else:
929             self.debug(" ---- STOP CONDITIONS ---- %s" % 
930                     self.conditions.get(ResourceAction.STOP))
931
932             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
933             for (group, state, time) in stop_conditions:
934                 reschedule, delay = self._needs_reschedule(group, state, time)
935                 if reschedule:
936                     break
937
938         if reschedule:
939             callback = functools.partial(self.stop_with_conditions)
940             self.ec.schedule(delay, callback)
941         else:
942             self.debug(" ----- STOPPING ---- ") 
943             self.stop()
944
945     def deploy_with_conditions(self):
946         """ Deploy RM when all the conditions in self.conditions for
947         action 'READY' are satisfied.
948
949         """
950         reschedule = False
951         delay = reschedule_delay 
952
953         ## evaluate if conditions to deploy are met
954         if self.ec.abort:
955             return 
956
957         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
958         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
959                 ResourceState.PROVISIONED]:
960             #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES!
961             reschedule = True
962             self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
963         else:
964             deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
965             
966             self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) 
967             
968             # Verify all start conditions are met
969             for (group, state, time) in deploy_conditions:
970                 # Uncomment for debug
971                 #unmet = []
972                 #for guid in group:
973                 #    rm = self.ec.get_resource(guid)
974                 #    unmet.append((guid, rm._state))
975                 
976                 #self.debug("---- WAITED STATES ---- %s" % unmet )
977
978                 reschedule, delay = self._needs_reschedule(group, state, time)
979                 if reschedule:
980                     break
981
982         if reschedule:
983             self.ec.schedule(delay, self.deploy_with_conditions)
984         else:
985             self.debug("----- DEPLOYING ---- ")
986             self.deploy()
987
988     def do_connect(self, guid):
989         """ Performs actions that need to be taken upon associating RMs.
990         This method should be redefined when necessary in child classes.
991         """
992         pass
993
994     def do_disconnect(self, guid):
995         """ Performs actions that need to be taken upon disassociating RMs.
996         This method should be redefined when necessary in child classes.
997         """
998         pass
999
1000     def valid_connection(self, guid):
1001         """Checks whether a connection with the other RM
1002         is valid.
1003         This method need to be redefined by each new Resource Manager.
1004
1005         :param guid: Guid of the current Resource Manager
1006         :type guid: int
1007         :rtype:  Boolean
1008
1009         """
1010         # TODO: Validate!
1011         return True
1012
1013     def do_discover(self):
1014         self.set_discovered()
1015
1016     def do_provision(self):
1017         self.set_provisioned()
1018
1019     def do_start(self):
1020         self.set_started()
1021
1022     def do_stop(self):
1023         self.set_stopped()
1024
1025     def do_deploy(self):
1026         self.set_ready()
1027
1028     def do_release(self):
1029         self.set_released()
1030
1031     def do_fail(self):
1032         self.set_failed()
1033         self.ec.inform_failure(self.guid)
1034
1035     def set_started(self, time = None):
1036         """ Mark ResourceManager as STARTED """
1037         self.set_state(ResourceState.STARTED, "_start_time", time)
1038         self.debug("----- STARTED ---- ")
1039
1040     def set_stopped(self, time = None):
1041         """ Mark ResourceManager as STOPPED """
1042         self.set_state(ResourceState.STOPPED, "_stop_time", time)
1043         self.debug("----- STOPPED ---- ")
1044
1045     def set_ready(self, time = None):
1046         """ Mark ResourceManager as READY """
1047         self.set_state(ResourceState.READY, "_ready_time", time)
1048         self.debug("----- READY ---- ")
1049
1050     def set_released(self, time = None):
1051         """ Mark ResourceManager as REALEASED """
1052         self.set_state(ResourceState.RELEASED, "_release_time", time)
1053         self.debug("----- RELEASED ---- ")
1054
1055     def set_failed(self, time = None):
1056         """ Mark ResourceManager as FAILED """
1057         self.set_state(ResourceState.FAILED, "_failed_time", time)
1058         self.debug("----- FAILED ---- ")
1059
1060     def set_discovered(self, time = None):
1061         """ Mark ResourceManager as DISCOVERED """
1062         self.set_state(ResourceState.DISCOVERED, "_discover_time", time)
1063         self.debug("----- DISCOVERED ---- ")
1064
1065     def set_provisioned(self, time = None):
1066         """ Mark ResourceManager as PROVISIONED """
1067         self.set_state(ResourceState.PROVISIONED, "_provision_time", time)
1068         self.debug("----- PROVISIONED ---- ")
1069
1070     def set_state(self, state, state_time_attr, time = None):
1071         """ Set the state of the RM while keeping a trace of the time """
1072
1073         # Ensure that RM state will not change after released
1074         if self._state == ResourceState.RELEASED:
1075             return 
1076
1077         time = time or tnow()
1078         self.set_state_time(state, state_time_attr, time)
1079   
1080     def set_state_time(self, state, state_time_attr, time):
1081         """ Set the time for the RM state change """
1082         setattr(self, state_time_attr, time)
1083         self._state = state
1084
1085 class ResourceFactory(object):
1086     _resource_types = dict()
1087
1088     @classmethod
1089     def resource_types(cls):
1090         """Return the type of the Class"""
1091         return cls._resource_types
1092
1093     @classmethod
1094     def get_resource_type(cls, rtype):
1095         """Return the type of the Class"""
1096         return cls._resource_types.get(rtype)
1097
1098     @classmethod
1099     def register_type(cls, rclass):
1100         """Register a new Ressource Manager"""
1101         cls._resource_types[rclass.get_rtype()] = rclass
1102
1103     @classmethod
1104     def create(cls, rtype, ec, guid):
1105         """Create a new instance of a Ressource Manager"""
1106         rclass = cls._resource_types[rtype]
1107         return rclass(ec, guid)
1108
1109 def populate_factory():
1110     """Find and rgister all available RMs
1111     """
1112     # Once the factory is populated, don't repopulate
1113     if not ResourceFactory.resource_types():
1114         for rclass in find_types():
1115             ResourceFactory.register_type(rclass)
1116
1117 def find_types():
1118     """Look into the different folders to find all the 
1119     availables Resources Managers
1120     """
1121     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1122     search_path = set(search_path.split(" "))
1123    
1124     import inspect
1125     import nepi.resources 
1126     path = os.path.dirname(nepi.resources.__file__)
1127     search_path.add(path)
1128
1129     types = set()
1130
1131     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
1132             prefix = "nepi.resources."):
1133
1134         loader = importer.find_module(modname)
1135         
1136         try:
1137             # Notice: Repeated calls to load_module will act as a reload of the module
1138             if modname in sys.modules:
1139                 module = sys.modules.get(modname)
1140             else:
1141                 module = loader.load_module(modname)
1142
1143             for attrname in dir(module):
1144                 if attrname.startswith("_"):
1145                     continue
1146
1147                 attr = getattr(module, attrname)
1148
1149                 if attr == ResourceManager:
1150                     continue
1151
1152                 if not inspect.isclass(attr):
1153                     continue
1154
1155                 if issubclass(attr, ResourceManager):
1156                     types.add(attr)
1157
1158                     if not modname in sys.modules:
1159                         sys.modules[modname] = module
1160
1161         except:
1162             import traceback
1163             import logging
1164             err = traceback.format_exc()
1165             logger = logging.getLogger("Resource.find_types()")
1166             logger.error("Error while loading Resource Managers %s" % err)
1167
1168     return types
1169