16b434fbddb8c730010f329a842ea011756c9b38
[nepi.git] / src / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
21 from nepi.util.logger import Logger
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.trace import TraceAttr
24
25 import copy
26 import functools
27 import logging
28 import os
29 import pkgutil
30 import sys
31 import threading
32 import weakref
33
34 class ResourceAction:
35     """ Action that a user can order to a Resource Manager
36    
37     """
38     DEPLOY = 0
39     START = 1
40     STOP = 2
41
42 class ResourceState:
43     """ State of a Resource Manager
44    
45     """
46     NEW = 0
47     DISCOVERED = 1
48     RESERVED = 2
49     PROVISIONED = 3
50     READY = 4
51     STARTED = 5
52     STOPPED = 6
53     FAILED = 7
54     RELEASED = 8
55
56 ResourceState2str = dict({
57     ResourceState.NEW : "NEW",
58     ResourceState.DISCOVERED : "DISCOVERED",
59     ResourceState.RESERVED : "RESERVED",
60     ResourceState.PROVISIONED : "PROVISIONED",
61     ResourceState.READY : "READY",
62     ResourceState.STARTED : "STARTED",
63     ResourceState.STOPPED : "STOPPED",
64     ResourceState.FAILED : "FAILED",
65     ResourceState.RELEASED : "RELEASED",
66     })
67
68 def clsinit(cls):
69     """ Initializes template information (i.e. attributes and traces)
70     on classes derived from the ResourceManager class.
71
72     It is used as a decorator in the class declaration as follows:
73
74         @clsinit
75         class MyResourceManager(ResourceManager):
76         
77             ...
78
79      """
80
81     cls._clsinit()
82     return cls
83
84 def clsinit_copy(cls):
85     """ Initializes template information (i.e. attributes and traces)
86     on classes direved from the ResourceManager class.
87     It differs from the clsinit method in that it forces inheritance
88     of attributes and traces from the parent class.
89
90     It is used as a decorator in the class declaration as follows:
91
92         @clsinit
93         class MyResourceManager(ResourceManager):
94         
95             ...
96
97
98     clsinit_copy should be prefered to clsinit when creating new
99     ResourceManager child classes.
100
101     """
102     
103     cls._clsinit_copy()
104     return cls
105
106 def failtrap(func):
107     """ Decorator function for instance methods that should set the 
108     RM state to FAILED when an error is raised. The methods that must be
109     decorated are: discover, reserved, provision, deploy, start, stop.
110
111     """
112     def wrapped(self, *args, **kwargs):
113         try:
114             return func(self, *args, **kwargs)
115         except:
116             self.fail()
117             
118             import traceback
119             err = traceback.format_exc()
120             logger = Logger(self._rtype)
121             logger.error(err)
122             logger.error("SETTING guid %d to state FAILED" % self.guid)
123             raise
124     
125     return wrapped
126
127 @clsinit
128 class ResourceManager(Logger):
129     """ Base clase for all ResourceManagers. 
130     
131     A ResourceManger is specific to a resource type (e.g. Node, 
132     Switch, Application, etc) on a specific platform (e.g. PlanetLab, 
133     OMF, etc).
134
135     The ResourceManager instances are responsible for interacting with
136     and controlling concrete (physical or virtual) resources in the 
137     experimental platforms.
138     
139     """
140     _rtype = "Resource"
141     _attributes = None
142     _traces = None
143     _help = None
144     _platform = None
145     _reschedule_delay = "0.5s"
146
147     @classmethod
148     def _register_attribute(cls, attr):
149         """ Resource subclasses will invoke this method to add a 
150         resource attribute
151
152         """
153         
154         cls._attributes[attr.name] = attr
155
156     @classmethod
157     def _remove_attribute(cls, name):
158         """ Resource subclasses will invoke this method to remove a 
159         resource attribute
160
161         """
162         
163         del cls._attributes[name]
164
165     @classmethod
166     def _register_trace(cls, trace):
167         """ Resource subclasses will invoke this method to add a 
168         resource trace
169
170         """
171         
172         cls._traces[trace.name] = trace
173
174     @classmethod
175     def _remove_trace(cls, name):
176         """ Resource subclasses will invoke this method to remove a 
177         resource trace
178
179         """
180         
181         del cls._traces[name]
182
183     @classmethod
184     def _register_attributes(cls):
185         """ Resource subclasses will invoke this method to register
186         resource attributes.
187
188         This method should be overriden in the RMs that define
189         attributes.
190
191         """
192         critical = Attribute("critical", 
193                 "Defines whether the resource is critical. "
194                 "A failure on a critical resource will interrupt "
195                 "the experiment. ",
196                 type = Types.Bool,
197                 default = True,
198                 flags = Flags.Design)
199         hard_release = Attribute("hardRelease", 
200                 "Forces removal of all result files and directories associated "
201                 "to the RM upon resource release. After release the RM will "
202                 "be removed from the EC and the results will not longer be "
203                 "accessible",
204                 type = Types.Bool,
205                 default = False,
206                 flags = Flags.Design)
207
208         cls._register_attribute(critical)
209         cls._register_attribute(hard_release)
210         
211     @classmethod
212     def _register_traces(cls):
213         """ Resource subclasses will invoke this method to register
214         resource traces
215
216         This method should be overriden in the RMs that define traces.
217         
218         """
219         
220         pass
221
222     @classmethod
223     def _clsinit(cls):
224         """ ResourceManager classes have different attributes and traces.
225         Attribute and traces are stored in 'class attribute' dictionaries.
226         When a new ResourceManager class is created, the _clsinit method is 
227         called to create a new instance of those dictionaries and initialize 
228         them.
229         
230         The _clsinit method is called by the clsinit decorator method.
231         
232         """
233         
234         # static template for resource attributes
235         cls._attributes = dict()
236         cls._register_attributes()
237
238         # static template for resource traces
239         cls._traces = dict()
240         cls._register_traces()
241
242     @classmethod
243     def _clsinit_copy(cls):
244         """ Same as _clsinit, except that after creating new instances of the
245         dictionaries it copies all the attributes and traces from the parent 
246         class.
247         
248         The _clsinit_copy method is called by the clsinit_copy decorator method.
249         
250         """
251         # static template for resource attributes
252         cls._attributes = copy.deepcopy(cls._attributes)
253         cls._register_attributes()
254
255         # static template for resource traces
256         cls._traces = copy.deepcopy(cls._traces)
257         cls._register_traces()
258
259     @classmethod
260     def get_rtype(cls):
261         """ Returns the type of the Resource Manager
262
263         """
264         return cls._rtype
265
266     @classmethod
267     def get_attributes(cls):
268         """ Returns a copy of the attributes
269
270         """
271         return copy.deepcopy(cls._attributes.values())
272
273     @classmethod
274     def get_attribute(cls, name):
275         """ Returns a copy of the attribute with name 'name'
276
277         """
278         return copy.deepcopy(cls._attributes[name])
279
280     @classmethod
281     def get_traces(cls):
282         """ Returns a copy of the traces
283
284         """
285         return copy.deepcopy(cls._traces.values())
286
287     @classmethod
288     def get_help(cls):
289         """ Returns the description of the type of Resource
290
291         """
292         return cls._help
293
294     @classmethod
295     def get_platform(cls):
296         """ Returns the identified of the platform (i.e. testbed type)
297         for the Resource
298
299         """
300         return cls._platform
301
302     @classmethod
303     def get_global(cls, name):
304         """ Returns the value of a global attribute
305             Global attribute meaning an attribute for 
306             all the resources from a rtype
307
308         :param name: Name of the attribute
309         :type name: str
310         :rtype: str
311         """
312         global_attr = cls._attributes[name]
313         return global_attr.value
314
315     @classmethod
316     def set_global(cls, name, value):
317         """ Set value for a global attribute
318
319         :param name: Name of the attribute
320         :type name: str
321         :param name: Value of the attribute
322         :type name: str
323         """
324         global_attr = cls._attributes[name]
325         global_attr.value = value
326         return value
327
328     def __init__(self, ec, guid):
329         super(ResourceManager, self).__init__(self.get_rtype())
330         
331         self._guid = guid
332         self._ec = weakref.ref(ec)
333         self._connections = set()
334         self._conditions = dict() 
335
336         # the resource instance gets a copy of all attributes
337         self._attrs = copy.deepcopy(self._attributes)
338
339         # the resource instance gets a copy of all traces
340         self._trcs = copy.deepcopy(self._traces)
341
342         # Each resource is placed on a deployment group by the EC
343         # during deployment
344         self.deployment_group = None
345
346         self._start_time = None
347         self._stop_time = None
348         self._discover_time = None
349         self._reserved_time = None
350         self._provision_time = None
351         self._ready_time = None
352         self._release_time = None
353         self._failed_time = None
354
355         self._state = ResourceState.NEW
356
357         # instance lock to synchronize exclusive state change methods (such
358         # as deploy and release methods), in order to prevent them from being 
359         # executed at the same time and corrupt internal resource state
360         self._release_lock = threading.Lock()
361
362     @property
363     def guid(self):
364         """ Returns the global unique identifier of the RM """
365         return self._guid
366
367     @property
368     def ec(self):
369         """ Returns the Experiment Controller of the RM """
370         return self._ec()
371
372     @property
373     def connections(self):
374         """ Returns the set of guids of connected RMs """
375         return self._connections
376
377     @property
378     def conditions(self):
379         """ Returns the conditions to which the RM is subjected to.
380         
381         This method returns a dictionary of conditions lists indexed by
382         a ResourceAction.
383         
384         """
385         return self._conditions
386
387     @property
388     def start_time(self):
389         """ Returns the start time of the RM as a timestamp """
390         return self._start_time
391
392     @property
393     def stop_time(self):
394         """ Returns the stop time of the RM as a timestamp """
395         return self._stop_time
396
397     @property
398     def discover_time(self):
399         """ Returns the discover time of the RM as a timestamp """
400         return self._discover_time
401
402     @property
403     def reserved_time(self):
404         """ Returns the resreved time of the RM as a timestamp """
405         return self._reserved_time
406
407     @property
408     def provision_time(self):
409         """ Returns the provision time of the RM as a timestamp """
410         return self._provision_time
411
412     @property
413     def ready_time(self):
414         """ Returns the deployment time of the RM as a timestamp """
415         return self._ready_time
416
417     @property
418     def release_time(self):
419         """ Returns the release time of the RM as a timestamp """
420         return self._release_time
421
422     @property
423     def failed_time(self):
424         """ Returns the time failure occured for the RM as a timestamp """
425         return self._failed_time
426
427     @property
428     def state(self):
429         """ Get the current state of the RM """
430         return self._state
431
432     @property
433     def reschedule_delay(self):
434         """ Returns default reschedule delay """
435         return self._reschedule_delay
436
437     def log_message(self, msg):
438         """ Returns the log message formatted with added information.
439
440         :param msg: text message
441         :type msg: str
442         :rtype: str
443
444         """
445         return " %s guid %d - %s " % (self._rtype, self.guid, msg)
446
447     def register_connection(self, guid):
448         """ Registers a connection to the RM identified by guid
449
450         This method should not be overriden. Specific functionality
451         should be added in the do_connect method.
452
453         :param guid: Global unique identified of the RM to connect to
454         :type guid: int
455
456         """
457         if self.valid_connection(guid):
458             self.do_connect(guid)
459             self._connections.add(guid)
460
461     def unregister_connection(self, guid):
462         """ Removes a registered connection to the RM identified by guid
463         
464         This method should not be overriden. Specific functionality
465         should be added in the do_disconnect method.
466
467         :param guid: Global unique identified of the RM to connect to
468         :type guid: int
469
470         """
471         if guid in self._connections:
472             self.do_disconnect(guid)
473             self._connections.remove(guid)
474
475     @failtrap
476     def discover(self):
477         """ Performs resource discovery.
478         
479         This  method is responsible for selecting an individual resource
480         matching user requirements.
481
482         This method should not be overriden directly. Specific functionality
483         should be added in the do_discover method.
484
485         """
486         with self._release_lock:
487             if self._state != ResourceState.RELEASED:
488                 self.do_discover()
489
490     @failtrap
491     def reserve(self):
492         """ Performs resource reserve.
493         
494         This  method is responsible for reserving an individual resource
495         matching user requirements.
496
497         This method should not be overriden directly. Specific functionality
498         should be added in the do_reserved method.
499
500         """
501         with self._release_lock:
502             if self._state != ResourceState.RELEASED:
503                 self.do_reserve()
504
505     @failtrap
506     def provision(self):
507         """ Performs resource provisioning.
508
509         This  method is responsible for provisioning one resource.
510         After this method has been successfully invoked, the resource
511         should be accessible/controllable by the RM.
512
513         This method should not be overriden directly. Specific functionality
514         should be added in the do_provision method.
515
516         """
517         with self._release_lock:
518             if self._state != ResourceState.RELEASED:
519                 self.do_provision()
520
521     @failtrap
522     def start(self):
523         """ Starts the RM (e.g. launch remote process).
524     
525         There is no standard start behavior. Some RMs will not need to perform
526         any actions upon start.
527
528         This method should not be overriden directly. Specific functionality
529         should be added in the do_start method.
530
531         """
532
533         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
534             self.error("Wrong state %s for start" % self.state)
535             return
536
537         with self._release_lock:
538             if self._state != ResourceState.RELEASED:
539                 self.do_start()
540
541     @failtrap
542     def stop(self):
543         """ Interrupts the RM, stopping any tasks the RM was performing.
544      
545         There is no standard stop behavior. Some RMs will not need to perform
546         any actions upon stop.
547     
548         This method should not be overriden directly. Specific functionality
549         should be added in the do_stop method.
550       
551         """
552         if not self.state in [ResourceState.STARTED]:
553             self.error("Wrong state %s for stop" % self.state)
554             return
555         
556         with self._release_lock:
557             self.do_stop()
558
559     @failtrap
560     def deploy(self):
561         """ Execute all steps required for the RM to reach the state READY.
562
563         This method is responsible for deploying the resource (and invoking 
564         the discover and provision methods).
565  
566         This method should not be overriden directly. Specific functionality
567         should be added in the do_deploy method.
568        
569         """
570         if self.state > ResourceState.READY:
571             self.error("Wrong state %s for deploy" % self.state)
572             return
573
574         with self._release_lock:
575             if self._state != ResourceState.RELEASED:
576                 self.do_deploy()
577
578     def release(self):
579         """ Perform actions to free resources used by the RM.
580   
581         This  method is responsible for releasing resources that were
582         used during the experiment by the RM.
583
584         This method should not be overriden directly. Specific functionality
585         should be added in the do_release method.
586       
587         """
588         with self._release_lock:
589             try:
590                 self.do_release()
591             except:
592                 self.set_released()
593
594                 import traceback
595                 err = traceback.format_exc()
596                 msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % (
597                         self._rtype, self.guid, err)
598                 logger = Logger(self._rtype)
599                 logger.debug(msg)
600
601     def fail(self):
602         """ Sets the RM to state FAILED.
603
604         This method should not be overriden directly. Specific functionality
605         should be added in the do_fail method.
606
607         """
608         with self._release_lock:
609             if self._state != ResourceState.RELEASED:
610                 self.do_fail()
611
612     def set(self, name, value):
613         """ Set the value of the attribute
614
615         :param name: Name of the attribute
616         :type name: str
617         :param name: Value of the attribute
618         :type name: str
619         """
620         attr = self._attrs[name]
621         attr.value = value
622         return value
623
624     def get(self, name):
625         """ Returns the value of the attribute
626
627         :param name: Name of the attribute
628         :type name: str
629         :rtype: str
630         """
631         attr = self._attrs[name]
632
633         """
634         A.Q. Commenting due to performance impact
635         if attr.has_flag(Flags.Global):
636             self.warning( "Attribute %s is global. Use get_global instead." % name)
637         """
638             
639         return attr.value
640
641     def has_changed(self, name):
642         """ Returns the True is the value of the attribute
643             has been modified by the user.
644
645         :param name: Name of the attribute
646         :type name: str
647         :rtype: str
648         """
649         attr = self._attrs[name]
650         return attr.has_changed
651
652     def has_flag(self, name, flag):
653         """ Returns true if the attribute has the flag 'flag'
654
655         :param flag: Flag to be checked
656         :type flag: Flags
657         """
658         attr = self._attrs[name]
659         return attr.has_flag(flag)
660
661     def has_attribute(self, name):
662         """ Returns true if the RM has an attribute with name
663
664         :param name: name of the attribute
665         :type name: string
666         """
667         return name in self._attrs
668
669     def enable_trace(self, name):
670         """ Explicitly enable trace generation
671
672         :param name: Name of the trace
673         :type name: str
674         """
675         trace = self._trcs[name]
676         trace.enabled = True
677     
678     def trace_enabled(self, name):
679         """Returns True if trace is enables 
680
681         :param name: Name of the trace
682         :type name: str
683         """
684         trace = self._trcs[name]
685         return trace.enabled
686  
687     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
688         """ Get information on collected trace
689
690         :param name: Name of the trace
691         :type name: str
692
693         :param attr: Can be one of:
694                          - TraceAttr.ALL (complete trace content), 
695                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
696                          - TraceAttr.PATH (full path to the trace file),
697                          - TraceAttr.SIZE (size of trace file). 
698         :type attr: str
699
700         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
701         :type name: int
702
703         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
704         :type name: int
705
706         :rtype: str
707         """
708         pass
709
710     def register_condition(self, action, group, state, time = None):
711         """ Registers a condition on the resource manager to allow execution 
712         of 'action' only after 'time' has elapsed from the moment all resources 
713         in 'group' reached state 'state'
714
715         :param action: Action to restrict to condition (either 'START' or 'STOP')
716         :type action: str
717         :param group: Group of RMs to wait for (list of guids)
718         :type group: int or list of int
719         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
720         :type state: str
721         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
722         :type time: str
723
724         """
725
726         if not action in self.conditions:
727             self._conditions[action] = list()
728         
729         conditions = self.conditions.get(action)
730
731         # For each condition to register a tuple of (group, state, time) is 
732         # added to the 'action' list
733         if not isinstance(group, list):
734             group = [group]
735
736         conditions.append((group, state, time))
737
738     def unregister_condition(self, group, action = None):
739         """ Removed conditions for a certain group of guids
740
741         :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
742         :type action: str
743
744         :param group: Group of RMs to wait for (list of guids)
745         :type group: int or list of int
746
747         """
748         # For each condition a tuple of (group, state, time) is 
749         # added to the 'action' list
750         if not isinstance(group, list):
751             group = [group]
752
753         for act, conditions in self.conditions.iteritems():
754             if action and act != action:
755                 continue
756
757             for condition in list(conditions):
758                 (grp, state, time) = condition
759
760                 # If there is an intersection between grp and group,
761                 # then remove intersected elements
762                 intsec = set(group).intersection(set(grp))
763                 if intsec:
764                     idx = conditions.index(condition)
765                     newgrp = set(grp)
766                     newgrp.difference_update(intsec)
767                     conditions[idx] = (newgrp, state, time)
768                  
769     def get_connected(self, rtype = None):
770         """ Returns the list of RM with the type 'rtype'
771
772         :param rtype: Type of the RM we look for
773         :type rtype: str
774         :return: list of guid
775         """
776         connected = []
777         rclass = ResourceFactory.get_resource_type(rtype)
778         for guid in self.connections:
779             rm = self.ec.get_resource(guid)
780             if not rtype or isinstance(rm, rclass):
781                 connected.append(rm)
782         return connected
783
784     def is_rm_instance(self, rtype):
785         """ Returns True if the RM is instance of 'rtype'
786
787         :param rtype: Type of the RM we look for
788         :type rtype: str
789         :return: True|False
790         """
791         rclass = ResourceFactory.get_resource_type(rtype)
792         if isinstance(self, rclass):
793             return True
794         return False
795
796     @failtrap
797     def _needs_reschedule(self, group, state, time):
798         """ Internal method that verify if 'time' has elapsed since 
799         all elements in 'group' have reached state 'state'.
800
801         :param group: Group of RMs to wait for (list of guids)
802         :type group: int or list of int
803         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
804         :type state: str
805         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
806         :type time: str
807
808         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
809         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
810         For the moment, 2m30s is not a correct syntax.
811
812         """
813         reschedule = False
814         delay = self.reschedule_delay 
815
816         # check state and time elapsed on all RMs
817         for guid in group:
818             rm = self.ec.get_resource(guid)
819             
820             # If one of the RMs this resource needs to wait for has FAILED
821             # and is critical we raise an exception
822             if rm.state == ResourceState.FAILED:
823                 if not rm.get('critical'):
824                     continue
825                 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
826                 raise RuntimeError, msg
827
828             # If the RM state is lower than the requested state we must
829             # reschedule (e.g. if RM is READY but we required STARTED).
830             if rm.state < state:
831                 reschedule = True
832                 break
833
834             # If there is a time restriction, we must verify the
835             # restriction is satisfied 
836             if time:
837                 if state == ResourceState.DISCOVERED:
838                     t = rm.discover_time
839                 elif state == ResourceState.RESERVED:
840                     t = rm.reserved_time
841                 elif state == ResourceState.PROVISIONED:
842                     t = rm.provision_time
843                 elif state == ResourceState.READY:
844                     t = rm.ready_time
845                 elif state == ResourceState.STARTED:
846                     t = rm.start_time
847                 elif state == ResourceState.STOPPED:
848                     t = rm.stop_time
849                 elif state == ResourceState.RELEASED:
850                     t = rm.release_time
851                 else:
852                     break
853
854                 # time already elapsed since RM changed state
855                 waited = "%fs" % tdiffsec(tnow(), t)
856
857                 # time still to wait
858                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
859
860                 if wait > 0.001:
861                     reschedule = True
862                     delay = "%fs" % wait
863                     break
864
865         return reschedule, delay
866
867     def set_with_conditions(self, name, value, group, state, time):
868         """ Set value 'value' on attribute with name 'name' when 'time' 
869         has elapsed since all elements in 'group' have reached state
870         'state'
871
872         :param name: Name of the attribute to set
873         :type name: str
874         :param name: Value of the attribute to set
875         :type name: str
876         :param group: Group of RMs to wait for (list of guids)
877         :type group: int or list of int
878         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
879         :type state: str
880         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
881         :type time: str
882         """
883
884         reschedule = False
885         delay = self.reschedule_delay 
886
887         ## evaluate if set conditions are met
888
889         # only can set with conditions after the RM is started
890         if self.state != ResourceState.STARTED:
891             reschedule = True
892         else:
893             reschedule, delay = self._needs_reschedule(group, state, time)
894
895         if reschedule:
896             callback = functools.partial(self.set_with_conditions, 
897                     name, value, group, state, time)
898             self.ec.schedule(delay, callback)
899         else:
900             self.set(name, value)
901
902     def start_with_conditions(self):
903         """ Starts RM when all the conditions in self.conditions for
904         action 'START' are satisfied.
905
906         """
907         #import pdb;pdb.set_trace()
908
909         reschedule = False
910         delay = self.reschedule_delay 
911
912
913         ## evaluate if conditions to start are met
914         if self.ec.abort:
915             return 
916
917         # Can only start when RM is either STOPPED or READY
918         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
919             reschedule = True
920             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
921         else:
922             start_conditions = self.conditions.get(ResourceAction.START, [])
923             
924             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
925             
926             # Verify all start conditions are met
927             for (group, state, time) in start_conditions:
928                 # Uncomment for debug
929                 #unmet = []
930                 #for guid in group:
931                 #    rm = self.ec.get_resource(guid)
932                 #    unmet.append((guid, rm._state))
933                 #
934                 #self.debug("---- WAITED STATES ---- %s" % unmet )
935
936                 reschedule, delay = self._needs_reschedule(group, state, time)
937                 if reschedule:
938                     break
939
940         if reschedule:
941             self.ec.schedule(delay, self.start_with_conditions)
942         else:
943             self.debug("----- STARTING ---- ")
944             self.start()
945
946     def stop_with_conditions(self):
947         """ Stops RM when all the conditions in self.conditions for
948         action 'STOP' are satisfied.
949
950         """
951         reschedule = False
952         delay = self.reschedule_delay 
953
954         ## evaluate if conditions to stop are met
955         if self.ec.abort:
956             return 
957
958         # only can stop when RM is STARTED
959         if self.state != ResourceState.STARTED:
960             reschedule = True
961             self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
962         else:
963             self.debug(" ---- STOP CONDITIONS ---- %s" % 
964                     self.conditions.get(ResourceAction.STOP))
965
966             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
967             for (group, state, time) in stop_conditions:
968                 reschedule, delay = self._needs_reschedule(group, state, time)
969                 if reschedule:
970                     break
971
972         if reschedule:
973             callback = functools.partial(self.stop_with_conditions)
974             self.ec.schedule(delay, callback)
975         else:
976             self.debug(" ----- STOPPING ---- ") 
977             self.stop()
978
979     def deploy_with_conditions(self):
980         """ Deploy RM when all the conditions in self.conditions for
981         action 'READY' are satisfied.
982
983         """
984         reschedule = False
985         delay = self.reschedule_delay 
986
987         ## evaluate if conditions to deploy are met
988         if self.ec.abort:
989             return 
990
991         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
992         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
993                 ResourceState.RESERVED, ResourceState.PROVISIONED]:
994             #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES!
995             reschedule = True
996             self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
997         else:
998             deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
999             
1000             self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) 
1001             
1002             # Verify all start conditions are met
1003             for (group, state, time) in deploy_conditions:
1004                 # Uncomment for debug
1005                 #unmet = []
1006                 #for guid in group:
1007                 #    rm = self.ec.get_resource(guid)
1008                 #    unmet.append((guid, rm._state))
1009                 
1010                 #self.debug("---- WAITED STATES ---- %s" % unmet )
1011
1012                 reschedule, delay = self._needs_reschedule(group, state, time)
1013                 if reschedule:
1014                     break
1015
1016         if reschedule:
1017             self.ec.schedule(delay, self.deploy_with_conditions)
1018         else:
1019             self.debug("----- DEPLOYING ---- ")
1020             self.deploy()
1021
1022     def do_connect(self, guid):
1023         """ Performs actions that need to be taken upon associating RMs.
1024         This method should be redefined when necessary in child classes.
1025         """
1026         pass
1027
1028     def do_disconnect(self, guid):
1029         """ Performs actions that need to be taken upon disassociating RMs.
1030         This method should be redefined when necessary in child classes.
1031         """
1032         pass
1033
1034     def valid_connection(self, guid):
1035         """Checks whether a connection with the other RM
1036         is valid.
1037         This method need to be redefined by each new Resource Manager.
1038
1039         :param guid: Guid of the current Resource Manager
1040         :type guid: int
1041         :rtype:  Boolean
1042
1043         """
1044         # TODO: Validate!
1045         return True
1046
1047     def do_discover(self):
1048         self.set_discovered()
1049
1050     def do_reserve(self):
1051         self.set_reserved()
1052
1053     def do_provision(self):
1054         self.set_provisioned()
1055
1056     def do_start(self):
1057         self.set_started()
1058
1059     def do_stop(self):
1060         self.set_stopped()
1061
1062     def do_deploy(self):
1063         self.set_ready()
1064
1065     def do_release(self):
1066         self.set_released()
1067
1068     def do_fail(self):
1069         self.set_failed()
1070         self.ec.inform_failure(self.guid)
1071
1072     def set_started(self, time = None):
1073         """ Mark ResourceManager as STARTED """
1074         self.set_state(ResourceState.STARTED, "_start_time", time)
1075         self.debug("----- STARTED ---- ")
1076
1077     def set_stopped(self, time = None):
1078         """ Mark ResourceManager as STOPPED """
1079         self.set_state(ResourceState.STOPPED, "_stop_time", time)
1080         self.debug("----- STOPPED ---- ")
1081
1082     def set_ready(self, time = None):
1083         """ Mark ResourceManager as READY """
1084         self.set_state(ResourceState.READY, "_ready_time", time)
1085         self.debug("----- READY ---- ")
1086
1087     def set_released(self, time = None):
1088         """ Mark ResourceManager as REALEASED """
1089         self.set_state(ResourceState.RELEASED, "_release_time", time)
1090
1091         msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid)
1092         logger = Logger(self._rtype)
1093         logger.debug(msg)
1094
1095     def set_failed(self, time = None):
1096         """ Mark ResourceManager as FAILED """
1097         self.set_state(ResourceState.FAILED, "_failed_time", time)
1098
1099         msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid)
1100         logger = Logger(self._rtype)
1101         logger.debug(msg)
1102
1103     def set_discovered(self, time = None):
1104         """ Mark ResourceManager as DISCOVERED """
1105         self.set_state(ResourceState.DISCOVERED, "_discover_time", time)
1106         self.debug("----- DISCOVERED ---- ")
1107
1108     def set_reserved(self, time = None):
1109         """ Mark ResourceManager as RESERVED """
1110         self.set_state(ResourceState.RESERVED, "_reserved_time", time)
1111         self.debug("----- RESERVED ---- ")
1112
1113     def set_provisioned(self, time = None):
1114         """ Mark ResourceManager as PROVISIONED """
1115         self.set_state(ResourceState.PROVISIONED, "_provision_time", time)
1116         self.debug("----- PROVISIONED ---- ")
1117
1118     def set_state(self, state, state_time_attr, time = None):
1119         """ Set the state of the RM while keeping a trace of the time """
1120
1121         # Ensure that RM state will not change after released
1122         if self._state == ResourceState.RELEASED:
1123             return 
1124
1125         time = time or tnow()
1126         self.set_state_time(state, state_time_attr, time)
1127   
1128     def set_state_time(self, state, state_time_attr, time):
1129         """ Set the time for the RM state change """
1130         setattr(self, state_time_attr, time)
1131         self._state = state
1132
1133 class ResourceFactory(object):
1134     _resource_types = dict()
1135
1136     @classmethod
1137     def resource_types(cls):
1138         """Return the type of the Class"""
1139         return cls._resource_types
1140
1141     @classmethod
1142     def get_resource_type(cls, rtype):
1143         """Return the type of the Class"""
1144         return cls._resource_types.get(rtype)
1145
1146     @classmethod
1147     def register_type(cls, rclass):
1148         """Register a new Ressource Manager"""
1149         cls._resource_types[rclass.get_rtype()] = rclass
1150
1151     @classmethod
1152     def create(cls, rtype, ec, guid):
1153         """Create a new instance of a Ressource Manager"""
1154         rclass = cls._resource_types[rtype]
1155         return rclass(ec, guid)
1156
1157 def populate_factory():
1158     """Find and rgister all available RMs
1159     """
1160     # Once the factory is populated, don't repopulate
1161     if not ResourceFactory.resource_types():
1162         for rclass in find_types():
1163             ResourceFactory.register_type(rclass)
1164
1165 def find_types():
1166     """Look into the different folders to find all the 
1167     availables Resources Managers
1168     """
1169     search_path = os.environ.get("NEPI_SEARCH_PATH", "")
1170     search_path = set(search_path.split(" "))
1171    
1172     import inspect
1173     import nepi.resources 
1174     path = os.path.dirname(nepi.resources.__file__)
1175     search_path.add(path)
1176
1177     types = set()
1178
1179     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
1180             prefix = "nepi.resources."):
1181
1182         loader = importer.find_module(modname)
1183         
1184         try:
1185             # Notice: Repeated calls to load_module will act as a reload of the module
1186             if modname in sys.modules:
1187                 module = sys.modules.get(modname)
1188             else:
1189                 module = loader.load_module(modname)
1190
1191             for attrname in dir(module):
1192                 if attrname.startswith("_"):
1193                     continue
1194
1195                 attr = getattr(module, attrname)
1196
1197                 if attr == ResourceManager:
1198                     continue
1199
1200                 if not inspect.isclass(attr):
1201                     continue
1202
1203                 if issubclass(attr, ResourceManager):
1204                     types.add(attr)
1205
1206                     if not modname in sys.modules:
1207                         sys.modules[modname] = module
1208
1209         except:
1210             import traceback
1211             import logging
1212             err = traceback.format_exc()
1213             logger = logging.getLogger("Resource.find_types()")
1214             logger.error("Error while loading Resource Managers %s" % err)
1215
1216     return types
1217