no change - cosmetic only - various typos in comments
[nepi.git] / nepi / execution / resource.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
20 from nepi.util.logger import Logger
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.trace import TraceAttr
23
24 import copy
25 import functools
26 import logging
27 import os
28 import pkgutil
29 import sys
30 import threading
31 import weakref
32
33 class ResourceAction:
34     """ Action that a user can order to a Resource Manager
35    
36     """
37     DEPLOY = 0
38     START = 1
39     STOP = 2
40
41 class ResourceState:
42     """ State of a Resource Manager
43    
44     """
45     NEW = 0
46     DISCOVERED = 1
47     RESERVED = 2
48     PROVISIONED = 3
49     READY = 4
50     STARTED = 5
51     STOPPED = 6
52     FAILED = 7
53     RELEASED = 8
54
55 ResourceState2str = dict({
56     ResourceState.NEW : "NEW",
57     ResourceState.DISCOVERED : "DISCOVERED",
58     ResourceState.RESERVED : "RESERVED",
59     ResourceState.PROVISIONED : "PROVISIONED",
60     ResourceState.READY : "READY",
61     ResourceState.STARTED : "STARTED",
62     ResourceState.STOPPED : "STOPPED",
63     ResourceState.FAILED : "FAILED",
64     ResourceState.RELEASED : "RELEASED",
65     })
66
67 def clsinit(cls):
68     """ Initializes template information (i.e. attributes and traces)
69     on classes derived from the ResourceManager class.
70
71     It is used as a decorator in the class declaration as follows:
72
73         @clsinit
74         class MyResourceManager(ResourceManager):
75         
76             ...
77
78      """
79
80     cls._clsinit()
81     return cls
82
83 def clsinit_copy(cls):
84     """ Initializes template information (i.e. attributes and traces)
85     on classes derived from the ResourceManager class.
86     It differs from the clsinit method in that it forces inheritance
87     of attributes and traces from the parent class.
88
89     It is used as a decorator in the class declaration as follows:
90
91         @clsinit
92         class MyResourceManager(ResourceManager):
93         
94             ...
95
96
97     clsinit_copy should be prefered to clsinit when creating new
98     ResourceManager child classes.
99
100     """
101     
102     cls._clsinit_copy()
103     return cls
104
105 def failtrap(func):
106     """ Decorator function for instance methods that should set the 
107     RM state to FAILED when an error is raised. The methods that must be
108     decorated are: discover, reserved, provision, deploy, start, stop.
109
110     """
111     def wrapped(self, *args, **kwargs):
112         try:
113             return func(self, *args, **kwargs)
114         except:
115             self.fail()
116             
117             import traceback
118             err = traceback.format_exc()
119             logger = Logger(self._rtype)
120             logger.error(err)
121             logger.error("SETTING guid %d to state FAILED" % self.guid)
122             raise
123     
124     return wrapped
125
126 @clsinit
127 class ResourceManager(Logger):
128     """ Base clase for all ResourceManagers. 
129     
130     A ResourceManger is specific to a resource type (e.g. Node, 
131     Switch, Application, etc) on a specific platform (e.g. PlanetLab, 
132     OMF, etc).
133
134     The ResourceManager instances are responsible for interacting with
135     and controlling concrete (physical or virtual) resources in the 
136     experimental platforms.
137     
138     """
139     _rtype = "Resource"
140     _attributes = None
141     _traces = None
142     _help = None
143     _platform = None
144     _reschedule_delay = "0.5s"
145
146     @classmethod
147     def _register_attribute(cls, attr):
148         """ Resource subclasses will invoke this method to add a 
149         resource attribute
150
151         """
152         
153         cls._attributes[attr.name] = attr
154
155     @classmethod
156     def _remove_attribute(cls, name):
157         """ Resource subclasses will invoke this method to remove a 
158         resource attribute
159
160         """
161         
162         del cls._attributes[name]
163
164     @classmethod
165     def _register_trace(cls, trace):
166         """ Resource subclasses will invoke this method to add a 
167         resource trace
168
169         """
170         
171         cls._traces[trace.name] = trace
172
173     @classmethod
174     def _remove_trace(cls, name):
175         """ Resource subclasses will invoke this method to remove a 
176         resource trace
177
178         """
179         
180         del cls._traces[name]
181
182     @classmethod
183     def _register_attributes(cls):
184         """ Resource subclasses will invoke this method to register
185         resource attributes.
186
187         This method should be overriden in the RMs that define
188         attributes.
189
190         """
191         critical = Attribute("critical", 
192                 "Defines whether the resource is critical. "
193                 "A failure on a critical resource will interrupt "
194                 "the experiment. ",
195                 type = Types.Bool,
196                 default = True,
197                 flags = Flags.Design)
198         hard_release = Attribute("hardRelease", 
199                 "Forces removal of all result files and directories associated "
200                 "to the RM upon resource release. After release the RM will "
201                 "be removed from the EC and the results will not longer be "
202                 "accessible",
203                 type = Types.Bool,
204                 default = False,
205                 flags = Flags.Design)
206
207         cls._register_attribute(critical)
208         cls._register_attribute(hard_release)
209         
210     @classmethod
211     def _register_traces(cls):
212         """ Resource subclasses will invoke this method to register
213         resource traces
214
215         This method should be overridden in the RMs that define traces.
216         
217         """
218         
219         pass
220
221     @classmethod
222     def _clsinit(cls):
223         """ ResourceManager classes have different attributes and traces.
224         Attribute and traces are stored in 'class attribute' dictionaries.
225         When a new ResourceManager class is created, the _clsinit method is 
226         called to create a new instance of those dictionaries and initialize 
227         them.
228         
229         The _clsinit method is called by the clsinit decorator method.
230         
231         """
232         
233         # static template for resource attributes
234         cls._attributes = dict()
235         cls._register_attributes()
236
237         # static template for resource traces
238         cls._traces = dict()
239         cls._register_traces()
240
241     @classmethod
242     def _clsinit_copy(cls):
243         """ Same as _clsinit, except that after creating new instances of the
244         dictionaries it copies all the attributes and traces from the parent 
245         class.
246         
247         The _clsinit_copy method is called by the clsinit_copy decorator method.
248         
249         """
250         # static template for resource attributes
251         cls._attributes = copy.deepcopy(cls._attributes)
252         cls._register_attributes()
253
254         # static template for resource traces
255         cls._traces = copy.deepcopy(cls._traces)
256         cls._register_traces()
257
258     @classmethod
259     def get_rtype(cls):
260         """ Returns the type of the Resource Manager
261
262         """
263         return cls._rtype
264
265     @classmethod
266     def get_attributes(cls):
267         """ Returns a copy of the attributes
268
269         """
270         return copy.deepcopy(list(cls._attributes.values()))
271
272     @classmethod
273     def get_attribute(cls, name):
274         """ Returns a copy of the attribute with name 'name'
275
276         """
277         return copy.deepcopy(cls._attributes[name])
278
279     @classmethod
280     def get_traces(cls):
281         """ Returns a copy of the traces
282
283         """
284         return copy.deepcopy(list(cls._traces.values()))
285
286     @classmethod
287     def get_help(cls):
288         """ Returns the description of the type of Resource
289
290         """
291         return cls._help
292
293     @classmethod
294     def get_platform(cls):
295         """ Returns the identified of the platform (i.e. testbed type)
296         for the Resource
297
298         """
299         return cls._platform
300
301     @classmethod
302     def get_global(cls, name):
303         """ Returns the value of a global attribute
304             Global attribute meaning an attribute for 
305             all the resources from a rtype
306
307         :param name: Name of the attribute
308         :type name: str
309         :rtype: str
310         """
311         global_attr = cls._attributes[name]
312         return global_attr.value
313
314     @classmethod
315     def set_global(cls, name, value):
316         """ Set value for a global attribute
317
318         :param name: Name of the attribute
319         :type name: str
320         :param name: Value of the attribute
321         :type name: str
322         """
323         global_attr = cls._attributes[name]
324         global_attr.value = value
325         return value
326
327     def __init__(self, ec, guid):
328         super(ResourceManager, self).__init__(self.get_rtype())
329         
330         self._guid = guid
331         self._ec = weakref.ref(ec)
332         self._connections = set()
333         self._conditions = dict() 
334
335         # the resource instance gets a copy of all attributes
336         self._attrs = copy.deepcopy(self._attributes)
337
338         # the resource instance gets a copy of all traces
339         self._trcs = copy.deepcopy(self._traces)
340
341         # Each resource is placed on a deployment group by the EC
342         # during deployment
343         self.deployment_group = None
344
345         self._start_time = None
346         self._stop_time = None
347         self._discover_time = None
348         self._reserved_time = None
349         self._provision_time = None
350         self._ready_time = None
351         self._release_time = None
352         self._failed_time = None
353
354         self._state = ResourceState.NEW
355
356         # instance lock to synchronize exclusive state change methods (such
357         # as deploy and release methods), in order to prevent them from being 
358         # executed at the same time and corrupt internal resource state
359         self._release_lock = threading.Lock()
360
361     @property
362     def guid(self):
363         """ Returns the global unique identifier of the RM """
364         return self._guid
365
366     @property
367     def ec(self):
368         """ Returns the Experiment Controller of the RM """
369         return self._ec()
370
371     @property
372     def connections(self):
373         """ Returns the set of guids of connected RMs """
374         return self._connections
375
376     @property
377     def conditions(self):
378         """ Returns the conditions to which the RM is subjected to.
379         
380         This method returns a dictionary of conditions lists indexed by
381         a ResourceAction.
382         
383         """
384         return self._conditions
385
386     @property
387     def start_time(self):
388         """ Returns the start time of the RM as a timestamp """
389         return self._start_time
390
391     @property
392     def stop_time(self):
393         """ Returns the stop time of the RM as a timestamp """
394         return self._stop_time
395
396     @property
397     def discover_time(self):
398         """ Returns the discover time of the RM as a timestamp """
399         return self._discover_time
400
401     @property
402     def reserved_time(self):
403         """ Returns the reserved time of the RM as a timestamp """
404         return self._reserved_time
405
406     @property
407     def provision_time(self):
408         """ Returns the provision time of the RM as a timestamp """
409         return self._provision_time
410
411     @property
412     def ready_time(self):
413         """ Returns the deployment time of the RM as a timestamp """
414         return self._ready_time
415
416     @property
417     def release_time(self):
418         """ Returns the release time of the RM as a timestamp """
419         return self._release_time
420
421     @property
422     def failed_time(self):
423         """ Returns the time failure occurred for the RM as a timestamp """
424         return self._failed_time
425
426     @property
427     def state(self):
428         """ Get the current state of the RM """
429         return self._state
430
431     @property
432     def reschedule_delay(self):
433         """ Returns default reschedule delay """
434         return self._reschedule_delay
435
436     def log_message(self, msg):
437         """ Returns the log message formatted with added information.
438
439         :param msg: text message
440         :type msg: str
441         :rtype: str
442
443         """
444         return " %s guid %d - %s " % (self._rtype, self.guid, msg)
445
446     def register_connection(self, guid):
447         """ Registers a connection to the RM identified by guid
448
449         This method should not be overridden. Specific functionality
450         should be added in the do_connect method.
451
452         :param guid: Global unique identified of the RM to connect to
453         :type guid: int
454
455         """
456         if self.valid_connection(guid):
457             self.do_connect(guid)
458             self._connections.add(guid)
459
460     def unregister_connection(self, guid):
461         """ Removes a registered connection to the RM identified by guid
462         
463         This method should not be overridden. Specific functionality
464         should be added in the do_disconnect method.
465
466         :param guid: Global unique identified of the RM to connect to
467         :type guid: int
468
469         """
470         if guid in self._connections:
471             self.do_disconnect(guid)
472             self._connections.remove(guid)
473
474     @failtrap
475     def discover(self):
476         """ Performs resource discovery.
477         
478         This  method is responsible for selecting an individual resource
479         matching user requirements.
480
481         This method should not be overridden directly. Specific functionality
482         should be added in the do_discover method.
483
484         """
485         with self._release_lock:
486             if self._state != ResourceState.RELEASED:
487                 self.do_discover()
488
489     @failtrap
490     def reserve(self):
491         """ Performs resource reserve.
492         
493         This  method is responsible for reserving an individual resource
494         matching user requirements.
495
496         This method should not be overridden directly. Specific functionality
497         should be added in the do_reserved method.
498
499         """
500         with self._release_lock:
501             if self._state != ResourceState.RELEASED:
502                 self.do_reserve()
503
504     @failtrap
505     def provision(self):
506         """ Performs resource provisioning.
507
508         This  method is responsible for provisioning one resource.
509         After this method has been successfully invoked, the resource
510         should be accessible/controllable by the RM.
511
512         This method should not be overridden directly. Specific functionality
513         should be added in the do_provision method.
514
515         """
516         with self._release_lock:
517             if self._state != ResourceState.RELEASED:
518                 self.do_provision()
519
520     @failtrap
521     def configure(self):
522         """ Performs resource configuration.
523
524         This  method is responsible for configuring one resource.
525         After this method has been successfully invoked, the resource
526         should be set up to start the experimentation.
527
528         This method should not be overridden directly. Specific functionality
529         should be added in the do_configure method.
530
531         """
532         with self._release_lock:
533             if self._state != ResourceState.RELEASED:
534                 self.do_configure()
535
536     @failtrap
537     def start(self):
538         """ Starts the RM (e.g. launch remote process).
539     
540         There is no standard start behavior. Some RMs will not need to perform
541         any actions upon start.
542
543         This method should not be overridden directly. Specific functionality
544         should be added in the do_start method.
545
546         """
547
548         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
549             self.error("Wrong state %s for start" % self.state)
550             return
551
552         with self._release_lock:
553             if self._state != ResourceState.RELEASED:
554                 self.do_start()
555
556     @failtrap
557     def stop(self):
558         """ Interrupts the RM, stopping any tasks the RM was performing.
559      
560         There is no standard stop behavior. Some RMs will not need to perform
561         any actions upon stop.
562     
563         This method should not be overridden directly. Specific functionality
564         should be added in the do_stop method.
565       
566         """
567         if not self.state in [ResourceState.STARTED]:
568             self.error("Wrong state %s for stop" % self.state)
569             return
570         
571         with self._release_lock:
572             self.do_stop()
573
574     @failtrap
575     def deploy(self):
576         """ Execute all steps required for the RM to reach the state READY.
577
578         This method is responsible for deploying the resource (and invoking 
579         the discover and provision methods).
580  
581         This method should not be overridden directly. Specific functionality
582         should be added in the do_deploy method.
583        
584         """
585         if self.state > ResourceState.READY:
586             self.error("Wrong state %s for deploy" % self.state)
587             return
588
589         with self._release_lock:
590             if self._state != ResourceState.RELEASED:
591                 self.do_deploy()
592
593     def release(self):
594         """ Perform actions to free resources used by the RM.
595   
596         This  method is responsible for releasing resources that were
597         used during the experiment by the RM.
598
599         This method should not be overridden directly. Specific functionality
600         should be added in the do_release method.
601       
602         """
603         with self._release_lock:
604             try:
605                 self.do_release()
606             except:
607                 self.set_released()
608
609                 import traceback
610                 err = traceback.format_exc()
611                 msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % (
612                         self._rtype, self.guid, err)
613                 logger = Logger(self._rtype)
614                 logger.debug(msg)
615
616     def fail(self):
617         """ Sets the RM to state FAILED.
618
619         This method should not be overridden directly. Specific functionality
620         should be added in the do_fail method.
621
622         """
623         with self._release_lock:
624             if self._state != ResourceState.RELEASED:
625                 self.do_fail()
626
627     def set(self, name, value):
628         """ Set the value of the attribute
629
630         :param name: Name of the attribute
631         :type name: str
632         :param name: Value of the attribute
633         :type name: str
634         """
635         attr = self._attrs[name]
636         attr.value = value
637         return value
638
639     def get(self, name):
640         """ Returns the value of the attribute
641
642         :param name: Name of the attribute
643         :type name: str
644         :rtype: str
645         """
646         attr = self._attrs[name]
647
648         """
649         A.Q. Commenting due to performance impact
650         if attr.has_flag(Flags.Global):
651             self.warning( "Attribute %s is global. Use get_global instead." % name)
652         """
653             
654         return attr.value
655
656     def has_changed(self, name):
657         """ Returns the True is the value of the attribute
658             has been modified by the user.
659
660         :param name: Name of the attribute
661         :type name: str
662         :rtype: str
663         """
664         attr = self._attrs[name]
665         return attr.has_changed
666
667     def has_flag(self, name, flag):
668         """ Returns true if the attribute has the flag 'flag'
669
670         :param flag: Flag to be checked
671         :type flag: Flags
672         """
673         attr = self._attrs[name]
674         return attr.has_flag(flag)
675
676     def has_attribute(self, name):
677         """ Returns true if the RM has an attribute with name
678
679         :param name: name of the attribute
680         :type name: string
681         """
682         return name in self._attrs
683
684     def enable_trace(self, name):
685         """ Explicitly enable trace generation
686
687         :param name: Name of the trace
688         :type name: str
689         """
690         trace = self._trcs[name]
691         trace.enabled = True
692     
693     def trace_enabled(self, name):
694         """Returns True if trace is enables 
695
696         :param name: Name of the trace
697         :type name: str
698         """
699         trace = self._trcs[name]
700         return trace.enabled
701  
702     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
703         """ Get information on collected trace
704
705         :param name: Name of the trace
706         :type name: str
707
708         :param attr: Can be one of:
709                          - TraceAttr.ALL (complete trace content), 
710                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
711                          - TraceAttr.PATH (full path to the trace file),
712                          - TraceAttr.SIZE (size of trace file). 
713         :type attr: str
714
715         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
716         :type name: int
717
718         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
719         :type name: int
720
721         :rtype: str
722         """
723         pass
724
725     def register_condition(self, action, group, state, time = None):
726         """ Registers a condition on the resource manager to allow execution 
727         of 'action' only after 'time' has elapsed from the moment all resources 
728         in 'group' reached state 'state'
729
730         :param action: Action to restrict to condition (either 'START' or 'STOP')
731         :type action: str
732         :param group: Group of RMs to wait for (list of guids)
733         :type group: int or list of int
734         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
735         :type state: str
736         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
737         :type time: str
738
739         """
740
741         if not action in self.conditions:
742             self._conditions[action] = list()
743         
744         conditions = self.conditions.get(action)
745
746         # For each condition to register a tuple of (group, state, time) is 
747         # added to the 'action' list
748         if not isinstance(group, list):
749             group = [group]
750
751         conditions.append((group, state, time))
752
753     def unregister_condition(self, group, action = None):
754         """ Removed conditions for a certain group of guids
755
756         :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY')
757         :type action: str
758
759         :param group: Group of RMs to wait for (list of guids)
760         :type group: int or list of int
761
762         """
763         # For each condition a tuple of (group, state, time) is 
764         # added to the 'action' list
765         if not isinstance(group, list):
766             group = [group]
767
768         for act, conditions in self.conditions.items():
769             if action and act != action:
770                 continue
771
772             for condition in list(conditions):
773                 (grp, state, time) = condition
774
775                 # If there is an intersection between grp and group,
776                 # then remove intersected elements
777                 intsec = set(group).intersection(set(grp))
778                 if intsec:
779                     idx = conditions.index(condition)
780                     newgrp = set(grp)
781                     newgrp.difference_update(intsec)
782                     conditions[idx] = (newgrp, state, time)
783                  
784     def get_connected(self, rtype = None):
785         """ Returns the list of RM with the type 'rtype'
786
787         :param rtype: Type of the RM we look for
788         :type rtype: str
789         :return: list of guid
790         """
791         connected = []
792         rclass = ResourceFactory.get_resource_type(rtype)
793         for guid in self.connections:
794             rm = self.ec.get_resource(guid)
795             if not rtype or isinstance(rm, rclass):
796                 connected.append(rm)
797         return connected
798
799     def is_rm_instance(self, rtype):
800         """ Returns True if the RM is instance of 'rtype'
801
802         :param rtype: Type of the RM we look for
803         :type rtype: str
804         :return: True|False
805         """
806         rclass = ResourceFactory.get_resource_type(rtype)
807         if isinstance(self, rclass):
808             return True
809         return False
810
811     @failtrap
812     def _needs_reschedule(self, group, state, time):
813         """ Internal method that verify if 'time' has elapsed since 
814         all elements in 'group' have reached state 'state'.
815
816         :param group: Group of RMs to wait for (list of guids)
817         :type group: int or list of int
818         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
819         :type state: str
820         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
821         :type time: str
822
823         .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
824         If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
825         For the moment, 2m30s is not a correct syntax.
826
827         """
828         reschedule = False
829         delay = self.reschedule_delay 
830
831         # check state and time elapsed on all RMs
832         for guid in group:
833             rm = self.ec.get_resource(guid)
834             
835             # If one of the RMs this resource needs to wait for has FAILED
836             # and is critical we raise an exception
837             if rm.state == ResourceState.FAILED:
838                 if not rm.get('critical'):
839                     continue
840                 msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED"
841                 raise RuntimeError(msg)
842
843             # If the RM state is lower than the requested state we must
844             # reschedule (e.g. if RM is READY but we required STARTED).
845             if rm.state < state:
846                 reschedule = True
847                 break
848
849             # If there is a time restriction, we must verify the
850             # restriction is satisfied 
851             if time:
852                 if state == ResourceState.DISCOVERED:
853                     t = rm.discover_time
854                 elif state == ResourceState.RESERVED:
855                     t = rm.reserved_time
856                 elif state == ResourceState.PROVISIONED:
857                     t = rm.provision_time
858                 elif state == ResourceState.READY:
859                     t = rm.ready_time
860                 elif state == ResourceState.STARTED:
861                     t = rm.start_time
862                 elif state == ResourceState.STOPPED:
863                     t = rm.stop_time
864                 elif state == ResourceState.RELEASED:
865                     t = rm.release_time
866                 else:
867                     break
868
869                 # time already elapsed since RM changed state
870                 waited = "%fs" % tdiffsec(tnow(), t)
871
872                 # time still to wait
873                 wait = tdiffsec(stabsformat(time), stabsformat(waited))
874
875                 if wait > 0.001:
876                     reschedule = True
877                     delay = "%fs" % wait
878                     break
879
880         return reschedule, delay
881
882     def set_with_conditions(self, name, value, group, state, time):
883         """ Set value 'value' on attribute with name 'name' when 'time' 
884         has elapsed since all elements in 'group' have reached state
885         'state'
886
887         :param name: Name of the attribute to set
888         :type name: str
889         :param name: Value of the attribute to set
890         :type name: str
891         :param group: Group of RMs to wait for (list of guids)
892         :type group: int or list of int
893         :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY')
894         :type state: str
895         :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s')
896         :type time: str
897         """
898
899         reschedule = False
900         delay = self.reschedule_delay 
901
902         ## evaluate if set conditions are met
903
904         # only can set with conditions after the RM is started
905         if self.state != ResourceState.STARTED:
906             reschedule = True
907         else:
908             reschedule, delay = self._needs_reschedule(group, state, time)
909
910         if reschedule:
911             callback = functools.partial(self.set_with_conditions, 
912                     name, value, group, state, time)
913             self.ec.schedule(delay, callback)
914         else:
915             self.set(name, value)
916
917     def start_with_conditions(self):
918         """ Starts RM when all the conditions in self.conditions for
919         action 'START' are satisfied.
920
921         """
922         #import pdb;pdb.set_trace()
923
924         reschedule = False
925         delay = self.reschedule_delay 
926
927
928         ## evaluate if conditions to start are met
929         if self.ec.abort:
930             return 
931
932         # Can only start when RM is either STOPPED or READY
933         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
934             reschedule = True
935             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
936         else:
937             start_conditions = self.conditions.get(ResourceAction.START, [])
938             
939             self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
940             
941             # Verify all start conditions are met
942             for (group, state, time) in start_conditions:
943                 # Uncomment for debug
944                 #unmet = []
945                 #for guid in group:
946                 #    rm = self.ec.get_resource(guid)
947                 #    unmet.append((guid, rm._state))
948                 #
949                 #self.debug("---- WAITED STATES ---- %s" % unmet )
950
951                 reschedule, delay = self._needs_reschedule(group, state, time)
952                 if reschedule:
953                     break
954
955         if reschedule:
956             self.ec.schedule(delay, self.start_with_conditions)
957         else:
958             self.debug("----- STARTING ---- ")
959             self.start()
960
961     def stop_with_conditions(self):
962         """ Stops RM when all the conditions in self.conditions for
963         action 'STOP' are satisfied.
964
965         """
966         reschedule = False
967         delay = self.reschedule_delay 
968
969         ## evaluate if conditions to stop are met
970         if self.ec.abort:
971             return 
972
973         # only can stop when RM is STARTED
974         if self.state != ResourceState.STARTED:
975             reschedule = True
976             self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
977         else:
978             self.debug(" ---- STOP CONDITIONS ---- %s" % 
979                     self.conditions.get(ResourceAction.STOP))
980
981             stop_conditions = self.conditions.get(ResourceAction.STOP, []) 
982             for (group, state, time) in stop_conditions:
983                 reschedule, delay = self._needs_reschedule(group, state, time)
984                 if reschedule:
985                     break
986
987         if reschedule:
988             callback = functools.partial(self.stop_with_conditions)
989             self.ec.schedule(delay, callback)
990         else:
991             self.debug(" ----- STOPPING ---- ") 
992             self.stop()
993
994     def deploy_with_conditions(self):
995         """ Deploy RM when all the conditions in self.conditions for
996         action 'READY' are satisfied.
997
998         """
999         reschedule = False
1000         delay = self.reschedule_delay 
1001
1002         ## evaluate if conditions to deploy are met
1003         if self.ec.abort:
1004             return 
1005
1006         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
1007         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED,
1008                 ResourceState.RESERVED, ResourceState.PROVISIONED]:
1009             #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES!
1010             reschedule = True
1011             self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
1012         else:
1013             deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, [])
1014             
1015             self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) 
1016             
1017             # Verify all start conditions are met
1018             for (group, state, time) in deploy_conditions:
1019                 # Uncomment for debug
1020                 #unmet = []
1021                 #for guid in group:
1022                 #    rm = self.ec.get_resource(guid)
1023                 #    unmet.append((guid, rm._state))
1024                 
1025                 #self.debug("---- WAITED STATES ---- %s" % unmet )
1026
1027                 reschedule, delay = self._needs_reschedule(group, state, time)
1028                 if reschedule:
1029                     break
1030
1031         if reschedule:
1032             self.ec.schedule(delay, self.deploy_with_conditions)
1033         else:
1034             self.debug("----- DEPLOYING ---- ")
1035             self.deploy()
1036
1037     def do_connect(self, guid):
1038         """ Performs actions that need to be taken upon associating RMs.
1039         This method should be redefined when necessary in child classes.
1040         """
1041         pass
1042
1043     def do_disconnect(self, guid):
1044         """ Performs actions that need to be taken upon disassociating RMs.
1045         This method should be redefined when necessary in child classes.
1046         """
1047         pass
1048
1049     def valid_connection(self, guid):
1050         """Checks whether a connection with the other RM
1051         is valid.
1052         This method need to be redefined by each new Resource Manager.
1053
1054         :param guid: Guid of the current Resource Manager
1055         :type guid: int
1056         :rtype:  Boolean
1057
1058         """
1059         # TODO: Validate!
1060         return True
1061
1062     def do_discover(self):
1063         self.set_discovered()
1064
1065     def do_reserve(self):
1066         self.set_reserved()
1067
1068     def do_provision(self):
1069         self.set_provisioned()
1070
1071     def do_configure(self):
1072         pass
1073
1074     def do_start(self):
1075         self.set_started()
1076
1077     def do_stop(self):
1078         self.set_stopped()
1079
1080     def do_deploy(self):
1081         self.set_ready()
1082
1083     def do_release(self):
1084         self.set_released()
1085
1086     def do_fail(self):
1087         self.set_failed()
1088         self.ec.inform_failure(self.guid)
1089
1090     def set_started(self, time = None):
1091         """ Mark ResourceManager as STARTED """
1092         self.set_state(ResourceState.STARTED, "_start_time", time)
1093         self.debug("----- STARTED ---- ")
1094
1095     def set_stopped(self, time = None):
1096         """ Mark ResourceManager as STOPPED """
1097         self.set_state(ResourceState.STOPPED, "_stop_time", time)
1098         self.debug("----- STOPPED ---- ")
1099
1100     def set_ready(self, time = None):
1101         """ Mark ResourceManager as READY """
1102         self.set_state(ResourceState.READY, "_ready_time", time)
1103         self.debug("----- READY ---- ")
1104
1105     def set_released(self, time = None):
1106         """ Mark ResourceManager as REALEASED """
1107         self.set_state(ResourceState.RELEASED, "_release_time", time)
1108
1109         msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid)
1110         logger = Logger(self._rtype)
1111         logger.debug(msg)
1112
1113     def set_failed(self, time = None):
1114         """ Mark ResourceManager as FAILED """
1115         self.set_state(ResourceState.FAILED, "_failed_time", time)
1116
1117         msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid)
1118         logger = Logger(self._rtype)
1119         logger.debug(msg)
1120
1121     def set_discovered(self, time = None):
1122         """ Mark ResourceManager as DISCOVERED """
1123         self.set_state(ResourceState.DISCOVERED, "_discover_time", time)
1124         self.debug("----- DISCOVERED ---- ")
1125
1126     def set_reserved(self, time = None):
1127         """ Mark ResourceManager as RESERVED """
1128         self.set_state(ResourceState.RESERVED, "_reserved_time", time)
1129         self.debug("----- RESERVED ---- ")
1130
1131     def set_provisioned(self, time = None):
1132         """ Mark ResourceManager as PROVISIONED """
1133         self.set_state(ResourceState.PROVISIONED, "_provision_time", time)
1134         self.debug("----- PROVISIONED ---- ")
1135
1136     def set_state(self, state, state_time_attr, time = None):
1137         """ Set the state of the RM while keeping a trace of the time """
1138
1139         # Ensure that RM state will not change after released
1140         if self._state == ResourceState.RELEASED:
1141             return 
1142
1143         time = time or tnow()
1144         self.set_state_time(state, state_time_attr, time)
1145   
1146     def set_state_time(self, state, state_time_attr, time):
1147         """ Set the time for the RM state change """
1148         setattr(self, state_time_attr, time)
1149         self._state = state
1150
1151 class ResourceFactory(object):
1152     _resource_types = dict()
1153
1154     @classmethod
1155     def resource_types(cls):
1156         """Return the type of the Class"""
1157         return cls._resource_types
1158
1159     @classmethod
1160     def get_resource_type(cls, rtype):
1161         """Return the type of the Class"""
1162         return cls._resource_types.get(rtype)
1163
1164     @classmethod
1165     def register_type(cls, rclass):
1166         """Register a new Ressource Manager"""
1167         cls._resource_types[rclass.get_rtype()] = rclass
1168
1169     @classmethod
1170     def create(cls, rtype, ec, guid):
1171         """Create a new instance of a Resource Manager"""
1172         rclass = cls._resource_types[rtype]
1173         return rclass(ec, guid)
1174
1175 def populate_factory():
1176     """Find and rgister all available RMs
1177     """
1178     # Once the factory is populated, don't repopulate
1179     if not ResourceFactory.resource_types():
1180         for rclass in find_types():
1181             ResourceFactory.register_type(rclass)
1182
1183 def find_types():
1184     """Look into the different folders to find all the 
1185     availables Resources Managers
1186     """
1187
1188     # look in NEPI_SEARCH_PATH
1189     # separated with colons, like usual paths (was SPACE in nepi-3.x)
1190     # add it to search path only if relevant to avoid loading modules in '.'
1191     search_path = set()
1192     environ = os.environ.get("NEPI_SEARCH_PATH", None)
1193     if environ:
1194         search_path = set(environ.split(":"))
1195    
1196     import inspect
1197     import nepi.resources 
1198     path = os.path.dirname(nepi.resources.__file__)
1199     search_path.add(path)
1200
1201     types = set()
1202
1203     for importer, modname, ispkg in pkgutil.walk_packages(search_path, 
1204             prefix = "nepi.resources."):
1205
1206         loader = importer.find_module(modname)
1207         
1208         try:
1209             # Notice: Repeated calls to load_module will act as a reload of the module
1210             if modname in sys.modules:
1211                 module = sys.modules.get(modname)
1212             else:
1213                 module = loader.load_module(modname)
1214
1215             for attrname in dir(module):
1216                 if attrname.startswith("_"):
1217                     continue
1218
1219                 attr = getattr(module, attrname)
1220
1221                 if attr == ResourceManager:
1222                     continue
1223
1224                 if not inspect.isclass(attr):
1225                     continue
1226
1227                 if issubclass(attr, ResourceManager):
1228                     types.add(attr)
1229
1230                     if not modname in sys.modules:
1231                         sys.modules[modname] = module
1232
1233         except:
1234             import traceback
1235             import logging
1236             err = traceback.format_exc()
1237             logger = logging.getLogger("Resource.find_types()")
1238             logger.error("Error while loading Resource Managers %s" % err)
1239
1240     return types
1241