5f34f8268c0da3fc35bf16d7fc21c782bd586bda
[nepi.git] / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from six import next
20
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29 from nepi.util.netgraph import NetGraph, TopologyType 
30
31 # TODO: use multiprocessing instead of threading
32 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
33
34 import functools
35 import logging
36 import os
37 import sys
38 import tempfile
39 import time
40 import threading
41 import weakref
42
43 class FailureLevel(object):
44     """ Possible failure states for the experiment """
45     OK = 1
46     RM_FAILURE = 2
47     EC_FAILURE = 3
48
49 class FailureManager(object):
50     """ The FailureManager is responsible for handling errors
51     and deciding whether an experiment should be aborted or not
52     """
53
54     def __init__(self):
55         self._ec = None
56         self._failure_level = FailureLevel.OK
57         self._abort = False
58
59     def set_ec(self, ec):
60         self._ec = weakref.ref(ec)
61
62     @property
63     def ec(self):
64         """ Returns the ExperimentController associated to this FailureManager 
65         """
66         return self._ec()
67
68     @property
69     def abort(self):
70         return self._abort
71
72     def eval_failure(self, guid):
73         """ Implements failure policy and sets the abort state of the
74         experiment based on the failure state and criticality of
75         the RM
76
77         :param guid: Guid of the RM upon which the failure of the experiment
78             is evaluated
79         :type guid: int
80
81         """
82         if self._failure_level == FailureLevel.OK:
83             rm = self.ec.get_resource(guid)
84             state = rm.state
85             critical = rm.get("critical")
86
87             if state == ResourceState.FAILED and critical:
88                 self._failure_level = FailureLevel.RM_FAILURE
89                 self._abort = True
90                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
91                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
92
93     def set_ec_failure(self):
94         self._failure_level = FailureLevel.EC_FAILURE
95
96 class ECState(object):
97     """ Possible states of the ExperimentController
98    
99     """
100     RUNNING = 1
101     FAILED = 2
102     RELEASED = 3
103     TERMINATED = 4
104
105 # historical note: this class used to be in util/guid.py but is used only here
106 # FIXME: This class is not thread-safe. Should it be made thread-safe?
107 class GuidGenerator(object):
108     def __init__(self):
109         self._last_guid = 0
110
111     # historical note: this used to be called `next`
112     # which confused 2to3 - and me - while it has
113     # nothing to do at all with the iteration protocol
114     def generate(self, guid = None):
115         if guid == None:
116             guid = self._last_guid + 1
117
118         self._last_guid = self._last_guid if guid <= self._last_guid else guid
119
120         return guid
121
122 class ExperimentController(object):
123     """
124     .. note::
125
126     An experiment, or scenario, is defined by a concrete set of resources,
127     and the behavior, configuration and interconnection of those resources. 
128     The Experiment Description (ED) is a detailed representation of a
129     single experiment. It contains all the necessary information to 
130     allow repeating the experiment. NEPI allows to describe
131     experiments by registering components (resources), configuring them
132     and interconnecting them.
133     
134     A same experiment (scenario) can be executed many times, generating 
135     different results. We call an experiment execution (instance) a 'run'.
136
137     The ExperimentController (EC), is the entity responsible of
138     managing an experiment run. The same scenario can be 
139     recreated (and re-run) by instantiating an EC and recreating 
140     the same experiment description. 
141
142     An experiment is represented as a graph of interconnected
143     resources. A resource is a generic concept in the sense that any
144     component taking part of an experiment, whether physical of
145     virtual, is considered a resource. A resources could be a host, 
146     a virtual machine, an application, a simulator, a IP address.
147
148     A ResourceManager (RM), is the entity responsible for managing a 
149     single resource. ResourceManagers are specific to a resource
150     type (i.e. An RM to control a Linux application will not be
151     the same as the RM used to control a ns-3 simulation).
152     To support a new type of resource, a new RM must be implemented. 
153     NEPI already provides a variety of RMs to control basic resources, 
154     and new can be extended from the existing ones.
155
156     Through the EC interface the user can create ResourceManagers (RMs),
157     configure them and interconnect them, to describe an experiment.
158     Describing an experiment through the EC does not run the experiment.
159     Only when the 'deploy()' method is invoked on the EC, the EC will take 
160     actions to transform the 'described' experiment into a 'running' experiment.
161
162     While the experiment is running, it is possible to continue to
163     create/configure/connect RMs, and to deploy them to involve new
164     resources in the experiment (this is known as 'interactive' deployment).
165     
166     An experiments in NEPI is identified by a string id, 
167     which is either given by the user, or automatically generated by NEPI.  
168     The purpose of this identifier is to separate files and results that 
169     belong to different experiment scenarios. 
170     However, since a same 'experiment' can be run many times, the experiment
171     id is not enough to identify an experiment instance (run).
172     For this reason, the ExperimentController has two identifier, the 
173     exp_id, which can be re-used in different ExperimentController,
174     and the run_id, which is unique to one ExperimentController instance, and
175     is automatically generated by NEPI.
176    
177     """
178
179     @classmethod
180     def load(cls, filepath, format = SFormats.XML):
181         serializer = ECSerializer()
182         ec = serializer.load(filepath)
183         return ec
184
185     def __init__(self, exp_id = None, local_dir = None, persist = False,
186             fm = None, add_node_callback = None, add_edge_callback = None, 
187             **kwargs):
188         """ ExperimentController entity to model an execute a network 
189         experiment.
190         
191         :param exp_id: Human readable name to identify the experiment
192         :type exp_id: str
193
194         :param local_dir: Path to local directory where to store experiment
195             related files
196         :type local_dir: str
197
198         :param persist: Save an XML description of the experiment after 
199         completion at local_dir
200         :type persist: bool
201
202         :param fm: FailureManager object. If None is given, the default 
203             FailureManager class will be used
204         :type fm: FailureManager
205
206         :param add_node_callback: Callback to invoke for node instantiation
207         when automatic topology creation mode is used 
208         :type add_node_callback: function
209
210         :param add_edge_callback: Callback to invoke for edge instantiation 
211         when automatic topology creation mode is used 
212         :type add_edge_callback: function
213
214         """
215         super(ExperimentController, self).__init__()
216
217         # Logging
218         self._logger = logging.getLogger("ExperimentController")
219
220         # Run identifier. It identifies a concrete execution instance (run) 
221         # of an experiment.
222         # Since a same experiment (same configuration) can be executed many 
223         # times, this run_id permits to separate result files generated on 
224         # different experiment executions
225         self._run_id = tsformat()
226
227         # Experiment identifier. Usually assigned by the user
228         # Identifies the experiment scenario (i.e. configuration, 
229         # resources used, etc)
230         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
231
232         # Local path where to store experiment related files (results, etc)
233         if not local_dir:
234             local_dir = tempfile.gettempdir() # /tmp
235
236         self._local_dir = local_dir
237         self._exp_dir = os.path.join(local_dir, self.exp_id)
238         self._run_dir = os.path.join(self.exp_dir, self.run_id)
239
240         # If True persist the experiment controller in XML format, after completion
241         self._persist = persist
242
243         # generator of globally unique ids
244         self._guid_generator = GuidGenerator()
245         
246         # Resource managers
247         self._resources = dict()
248
249         # Scheduler. It a queue that holds tasks scheduled for
250         # execution, and yields the next task to be executed 
251         # ordered by execution and arrival time
252         self._scheduler = HeapScheduler()
253
254         # Tasks
255         self._tasks = dict()
256
257         # RM groups (for deployment) 
258         self._groups = dict()
259
260         # generator of globally unique id for groups
261         self._group_id_generator = GuidGenerator()
262
263         # Flag to stop processing thread
264         self._stop = False
265     
266         # Entity in charge of managing system failures
267         if not fm:
268             self._fm = FailureManager()
269         self._fm.set_ec(self)
270
271         # EC state
272         self._state = ECState.RUNNING
273
274         # Automatically construct experiment description 
275         self._netgraph = None
276         if add_node_callback or add_edge_callback or kwargs.get("topology"):
277             self._build_from_netgraph(add_node_callback, add_edge_callback, 
278                     **kwargs)
279
280         # The runner is a pool of threads used to parallelize 
281         # execution of tasks
282         self._nthreads = 20
283         self._runner = None
284
285         # Event processing thread
286         self._cond = threading.Condition()
287         self._thread = threading.Thread(target = self._process)
288         self._thread.setDaemon(True)
289         self._thread.start()
290         
291     @property
292     def logger(self):
293         """ Returns the logger instance of the Experiment Controller
294
295         """
296         return self._logger
297
298     @property
299     def fm(self):
300         """ Returns the failure manager
301
302         """
303
304         return self._fm
305
306     @property
307     def failure_level(self):
308         """ Returns the level of FAILURE of th experiment
309
310         """
311
312         return self._fm._failure_level
313
314     @property
315     def ecstate(self):
316         """ Returns the state of the Experiment Controller
317
318         """
319         return self._state
320
321     @property
322     def exp_id(self):
323         """ Returns the experiment id assigned by the user
324
325         """
326         return self._exp_id
327
328     @property
329     def run_id(self):
330         """ Returns the experiment instance (run) identifier (automatically 
331         generated)
332
333         """
334         return self._run_id
335
336     @property
337     def nthreads(self):
338         """ Returns the number of processing nthreads used
339
340         """
341         return self._nthreads
342
343     @property
344     def local_dir(self):
345         """ Root local directory for experiment files
346
347         """
348         return self._local_dir
349
350     @property
351     def exp_dir(self):
352         """ Local directory to store results and other files related to the 
353         experiment.
354
355         """
356         return self._exp_dir
357
358     @property
359     def run_dir(self):
360         """ Local directory to store results and other files related to the 
361         experiment run.
362
363         """
364         return self._run_dir
365
366     @property
367     def persist(self):
368         """ If True, persists the ExperimentController to XML format upon 
369         experiment completion
370
371         """
372         return self._persist
373
374     @property
375     def netgraph(self):
376         """ Return NetGraph instance if experiment description was automatically 
377         generated
378
379         """
380         return self._netgraph
381
382     @property
383     def abort(self):
384         """ Returns True if the experiment has failed and should be interrupted,
385         False otherwise.
386
387         """
388         return self._fm.abort
389
390     def inform_failure(self, guid):
391         """ Reports a failure in a RM to the EC for evaluation
392
393             :param guid: Resource id
394             :type guid: int
395
396         """
397
398         return self._fm.eval_failure(guid)
399
400     def wait_finished(self, guids):
401         """ Blocking method that waits until all RMs in the 'guids' list 
402         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
403         RELEASED ), or until a failure in the experiment occurs 
404         (i.e. abort == True) 
405         
406             :param guids: List of guids
407             :type guids: list
408
409         """
410
411         def quit():
412             return self.abort
413
414         return self.wait(guids, state = ResourceState.STOPPED, 
415                 quit = quit)
416
417     def wait_started(self, guids):
418         """ Blocking method that waits until all RMs in the 'guids' list 
419         have reached a state >= STARTED, or until a failure in the 
420         experiment occurs (i.e. abort == True) 
421         
422             :param guids: List of guids
423             :type guids: list
424
425         """
426
427         def quit():
428             return self.abort
429
430         return self.wait(guids, state = ResourceState.STARTED, 
431                 quit = quit)
432
433     def wait_released(self, guids):
434         """ Blocking method that waits until all RMs in the 'guids' list 
435         have reached a state == RELEASED, or until the EC fails 
436         
437             :param guids: List of guids
438             :type guids: list
439
440         """
441
442         def quit():
443             return self._state == ECState.FAILED
444
445         return self.wait(guids, state = ResourceState.RELEASED, 
446                 quit = quit)
447
448     def wait_deployed(self, guids):
449         """ Blocking method that waits until all RMs in the 'guids' list 
450         have reached a state >= READY, or until a failure in the 
451         experiment occurs (i.e. abort == True) 
452         
453             :param guids: List of guids
454             :type guids: list
455
456         """
457
458         def quit():
459             return self.abort
460
461         return self.wait(guids, state = ResourceState.READY, 
462                 quit = quit)
463
464     def wait(self, guids, state, quit):
465         """ Blocking method that waits until all RMs in the 'guids' list 
466         have reached a state >= 'state', or until the 'quit' callback
467         yields True
468            
469             :param guids: List of guids
470             :type guids: list
471         
472         """
473         if isinstance(guids, int):
474             guids = [guids]
475
476         # Make a copy to avoid modifying the original guids list
477         guids = list(guids)
478
479         while True:
480             # If there are no more guids to wait for
481             # or the quit function returns True, exit the loop
482             if len(guids) == 0 or quit():
483                 break
484
485             # If a guid reached one of the target states, remove it from list
486             guid = guids.pop()
487             rm = self.get_resource(guid)
488             rstate = rm.state
489             
490             if rstate >= state:
491                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
492                     rm.get_rtype(), guid, rstate, state))
493             else:
494                 # Debug...
495                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
496                     guid, rstate, state))
497
498                 guids.append(guid)
499
500                 time.sleep(0.5)
501
502     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
503         plotter = ECPlotter()
504         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
505                 show = show)
506         return fpath
507
508     def serialize(self, format = SFormats.XML):
509         serializer = ECSerializer()
510         sec = serializer.load(self, format = format)
511         return sec
512
513     def save(self, dirpath = None, format = SFormats.XML):
514         if dirpath == None:
515             dirpath = self.run_dir
516
517         try:
518             os.makedirs(dirpath)
519         except OSError:
520             pass
521
522         serializer = ECSerializer()
523         path = serializer.save(self, dirpath, format = format)
524         return path
525
526     def get_task(self, tid):
527         """ Returns a task by its id
528
529             :param tid: Id of the task
530             :type tid: int
531             
532             :rtype: Task
533             
534         """
535         return self._tasks.get(tid)
536
537     def get_resource(self, guid):
538         """ Returns a registered ResourceManager by its guid
539
540             :param guid: Id of the resource
541             :type guid: int
542             
543             :rtype: ResourceManager
544             
545         """
546         rm = self._resources.get(guid)
547         return rm
548
549     def get_resources_by_type(self, rtype):
550         """ Returns the ResourceManager objects of type rtype
551
552             :param rtype: Resource type
553             :type rtype: string
554             
555             :rtype: list of ResourceManagers
556             
557         """
558         rms = []
559         for guid, rm in self._resources.items():
560             if rm.get_rtype() == rtype: 
561                 rms.append(rm)
562         return rms
563
564     def remove_resource(self, guid):
565         del self._resources[guid]
566
567     @property
568     def resources(self):
569         """ Returns the guids of all ResourceManagers 
570
571             :return: Set of all RM guids
572             :rtype: list
573
574         """
575         keys = list(self._resources.keys())
576
577         return keys
578
579     def filter_resources(self, rtype):
580         """ Returns the guids of all ResourceManagers of type rtype
581
582             :param rtype: Resource type
583             :type rtype: string
584             
585             :rtype: list of guids
586             
587         """
588         rms = []
589         for guid, rm in self._resources.items():
590             if rm.get_rtype() == rtype: 
591                 rms.append(rm.guid)
592         return rms
593
594     def register_resource(self, rtype, guid = None, **keywords):
595         """ Registers a new ResourceManager of type 'rtype' in the experiment
596         
597         This method will assign a new 'guid' for the RM, if no guid
598         is specified.
599
600             :param rtype: Type of the RM
601             :type rtype: str
602
603             :return: Guid of the RM
604             :rtype: int
605             
606         """
607         # Get next available guid
608         # xxx_next_hiccup
609         guid = self._guid_generator.generate(guid)
610         
611         # Instantiate RM
612         rm = ResourceFactory.create(rtype, self, guid)
613
614         # Store RM
615         self._resources[guid] = rm
616
617         ### so we can do something like
618         # node = ec.register_resource("linux::Node",
619         #                             username = user,
620         #                             hostname = host)
621         ### instead of
622         # node = ec.register_resource("linux::Node")
623         # ec.set(node, "username", user)
624         # ec.set(node, "hostname", host)
625
626         for name, value in keywords.items():
627             self.set(guid, name, value)
628
629         return guid
630
631     def get_attributes(self, guid):
632         """ Returns all the attributes of the RM with guid 'guid'
633
634             :param guid: Guid of the RM
635             :type guid: int
636
637             :return: List of attributes
638             :rtype: list
639
640         """
641         rm = self.get_resource(guid)
642         return rm.get_attributes()
643
644     def get_attribute(self, guid, name):
645         """ Returns the attribute 'name' of the RM with guid 'guid'
646
647             :param guid: Guid of the RM
648             :type guid: int
649
650             :param name: Name of the attribute
651             :type name: str
652
653             :return: The attribute with name 'name'
654             :rtype: Attribute
655
656         """
657         rm = self.get_resource(guid)
658         return rm.get_attribute(name)
659
660     def register_connection(self, guid1, guid2):
661         """ Registers a connection between a RM with guid 'guid1'
662         and another RM with guid 'guid2'. 
663     
664         The order of the in which the two guids are provided is not
665         important, since the connection relationship is symmetric.
666
667             :param guid1: First guid to connect
668             :type guid1: ResourceManager
669
670             :param guid2: Second guid to connect
671             :type guid: ResourceManager
672
673         """
674         rm1 = self.get_resource(guid1)
675         rm2 = self.get_resource(guid2)
676
677         rm1.register_connection(guid2)
678         rm2.register_connection(guid1)
679
680     def register_condition(self, guids1, action, guids2, state,
681             time = None):
682         """ Registers an action START, STOP or DEPLOY for all RM on list
683         guids1 to occur at time 'time' after all elements in list guids2 
684         have reached state 'state'.
685
686             :param guids1: List of guids of RMs subjected to action
687             :type guids1: list
688
689             :param action: Action to perform (either START, STOP or DEPLOY)
690             :type action: ResourceAction
691
692             :param guids2: List of guids of RMs to we waited for
693             :type guids2: list
694
695             :param state: State to wait for on RMs of list guids2 (STARTED,
696                 STOPPED, etc)
697             :type state: ResourceState
698
699             :param time: Time to wait after guids2 has reached status 
700             :type time: string
701
702         """
703         if isinstance(guids1, int):
704             guids1 = [guids1]
705         if isinstance(guids2, int):
706             guids2 = [guids2]
707
708         for guid1 in guids1:
709             rm = self.get_resource(guid1)
710             rm.register_condition(action, guids2, state, time)
711
712     def enable_trace(self, guid, name):
713         """ Enables a trace to be collected during the experiment run
714
715             :param name: Name of the trace
716             :type name: str
717
718         """
719         rm = self.get_resource(guid)
720         rm.enable_trace(name)
721
722     def trace_enabled(self, guid, name):
723         """ Returns True if the trace of name 'name' is enabled
724
725             :param name: Name of the trace
726             :type name: str
727
728         """
729         rm = self.get_resource(guid)
730         return rm.trace_enabled(name)
731
732     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
733         """ Returns information on a collected trace, the trace stream or 
734         blocks (chunks) of the trace stream
735
736             :param name: Name of the trace
737             :type name: str
738
739             :param attr: Can be one of:
740                          - TraceAttr.ALL (complete trace content), 
741                          - TraceAttr.STREAM (block in bytes to read starting 
742                                 at offset),
743                          - TraceAttr.PATH (full path to the trace file),
744                          - TraceAttr.SIZE (size of trace file). 
745             :type attr: str
746
747             :param block: Number of bytes to retrieve from trace, when attr is 
748                 TraceAttr.STREAM 
749             :type name: int
750
751             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
752             :type name: int
753
754             :rtype: str
755
756         """
757         rm = self.get_resource(guid)
758         return rm.trace(name, attr, block, offset)
759
760     def get_traces(self, guid):
761         """ Returns the list of the trace names of the RM with guid 'guid'
762
763             :param guid: Guid of the RM
764             :type guid: int
765
766             :return: List of trace names
767             :rtype: list
768
769         """
770         rm = self.get_resource(guid)
771         return rm.get_traces()
772
773
774     def discover(self, guid):
775         """ Discovers an available resource matching the criteria defined
776         by the RM with guid 'guid', and associates that resource to the RM
777
778         Not all RM types require (or are capable of) performing resource 
779         discovery. For the RM types which are not capable of doing so, 
780         invoking this method does not have any consequences. 
781
782             :param guid: Guid of the RM
783             :type guid: int
784
785         """
786         rm = self.get_resource(guid)
787         return rm.discover()
788
789     def provision(self, guid):
790         """ Provisions the resource associated to the RM with guid 'guid'.
791
792         Provisioning means making a resource 'accessible' to the user. 
793         Not all RM types require (or are capable of) performing resource 
794         provisioning. For the RM types which are not capable of doing so, 
795         invoking this method does not have any consequences. 
796
797             :param guid: Guid of the RM
798             :type guid: int
799
800         """
801         rm = self.get_resource(guid)
802         return rm.provision()
803
804     def get(self, guid, name):
805         """ Returns the value of the attribute with name 'name' on the
806         RM with guid 'guid'
807
808             :param guid: Guid of the RM
809             :type guid: int
810
811             :param name: Name of the attribute 
812             :type name: str
813
814             :return: The value of the attribute with name 'name'
815
816         """
817         rm = self.get_resource(guid)
818         return rm.get(name)
819
820     def set(self, guid, name, value):
821         """ Modifies the value of the attribute with name 'name' on the 
822         RM with guid 'guid'.
823
824             :param guid: Guid of the RM
825             :type guid: int
826
827             :param name: Name of the attribute
828             :type name: str
829
830             :param value: Value of the attribute
831
832         """
833         rm = self.get_resource(guid)
834         rm.set(name, value)
835
836     def get_global(self, rtype, name):
837         """ Returns the value of the global attribute with name 'name' on the
838         RMs of rtype 'rtype'.
839
840             :param guid: Guid of the RM
841             :type guid: int
842
843             :param name: Name of the attribute 
844             :type name: str
845
846             :return: The value of the attribute with name 'name'
847
848         """
849         rclass = ResourceFactory.get_resource_type(rtype)
850         return rclass.get_global(name)
851
852     def set_global(self, rtype, name, value):
853         """ Modifies the value of the global attribute with name 'name' on the 
854         RMs of with rtype 'rtype'.
855
856             :param guid: Guid of the RM
857             :type guid: int
858
859             :param name: Name of the attribute
860             :type name: str
861
862             :param value: Value of the attribute
863
864         """
865         rclass = ResourceFactory.get_resource_type(rtype)
866         return rclass.set_global(name, value)
867
868     def state(self, guid, hr = False):
869         """ Returns the state of a resource
870
871             :param guid: Resource guid
872             :type guid: integer
873
874             :param hr: Human readable. Forces return of a 
875                 status string instead of a number 
876             :type hr: boolean
877
878         """
879         rm = self.get_resource(guid)
880         state = rm.state
881
882         if hr:
883             return ResourceState2str.get(state)
884
885         return state
886
887     def stop(self, guid):
888         """ Stops the RM with guid 'guid'
889
890         Stopping a RM means that the resource it controls will
891         no longer take part of the experiment.
892
893             :param guid: Guid of the RM
894             :type guid: int
895
896         """
897         rm = self.get_resource(guid)
898         return rm.stop()
899
900     def start(self, guid):
901         """ Starts the RM with guid 'guid'
902
903         Starting a RM means that the resource it controls will
904         begin taking part of the experiment.
905
906             :param guid: Guid of the RM
907             :type guid: int
908
909         """
910         rm = self.get_resource(guid)
911         return rm.start()
912
913     def get_start_time(self, guid):
914         """ Returns the start time of the RM as a timestamp """
915         rm = self.get_resource(guid)
916         return rm.start_time
917
918     def get_stop_time(self, guid):
919         """ Returns the stop time of the RM as a timestamp """
920         rm = self.get_resource(guid)
921         return rm.stop_time
922
923     def get_discover_time(self, guid):
924         """ Returns the discover time of the RM as a timestamp """
925         rm = self.get_resource(guid)
926         return rm.discover_time
927
928     def get_provision_time(self, guid):
929         """ Returns the provision time of the RM as a timestamp """
930         rm = self.get_resource(guid)
931         return rm.provision_time
932
933     def get_ready_time(self, guid):
934         """ Returns the deployment time of the RM as a timestamp """
935         rm = self.get_resource(guid)
936         return rm.ready_time
937
938     def get_release_time(self, guid):
939         """ Returns the release time of the RM as a timestamp """
940         rm = self.get_resource(guid)
941         return rm.release_time
942
943     def get_failed_time(self, guid):
944         """ Returns the time failure occured for the RM as a timestamp """
945         rm = self.get_resource(guid)
946         return rm.failed_time
947
948     def set_with_conditions(self, name, value, guids1, guids2, state,
949             time = None):
950         """ Modifies the value of attribute with name 'name' on all RMs 
951         on the guids1 list when time 'time' has elapsed since all 
952         elements in guids2 list have reached state 'state'.
953
954             :param name: Name of attribute to set in RM
955             :type name: string
956
957             :param value: Value of attribute to set in RM
958             :type name: string
959
960             :param guids1: List of guids of RMs subjected to action
961             :type guids1: list
962
963             :param action: Action to register (either START or STOP)
964             :type action: ResourceAction
965
966             :param guids2: List of guids of RMs to we waited for
967             :type guids2: list
968
969             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
970             :type state: ResourceState
971
972             :param time: Time to wait after guids2 has reached status 
973             :type time: string
974
975         """
976         if isinstance(guids1, int):
977             guids1 = [guids1]
978         if isinstance(guids2, int):
979             guids2 = [guids2]
980
981         for guid1 in guids1:
982             rm = self.get_resource(guid)
983             rm.set_with_conditions(name, value, guids2, state, time)
984
985     def deploy(self, guids = None, wait_all_ready = True, group = None):
986         """ Deploys all ResourceManagers in the guids list. 
987         
988         If the argument 'guids' is not given, all RMs with state NEW
989         are deployed.
990
991             :param guids: List of guids of RMs to deploy
992             :type guids: list
993
994             :param wait_all_ready: Wait until all RMs are ready in
995                 order to start the RMs
996             :type guid: int
997
998             :param group: Id of deployment group in which to deploy RMs
999             :type group: int
1000
1001         """
1002         self.logger.debug(" ------- DEPLOY START ------ ")
1003
1004         if not guids:
1005             # If no guids list was passed, all 'NEW' RMs will be deployed
1006             guids = []
1007             for guid, rm in self._resources.items():
1008                 if rm.state == ResourceState.NEW:
1009                     guids.append(guid)
1010                 
1011         if isinstance(guids, int):
1012             guids = [guids]
1013
1014         # Create deployment group
1015         # New guids can be added to a same deployment group later on
1016         new_group = False
1017         if not group:
1018             new_group = True
1019             # xxx_next_hiccup
1020             group = self._group_id_generator.generate()
1021
1022         if group not in self._groups:
1023             self._groups[group] = []
1024
1025         self._groups[group].extend(guids)
1026
1027         def wait_all_and_start(group):
1028             # Function that checks if all resources are READY
1029             # before scheduling a start_with_conditions for each RM
1030             reschedule = False
1031             
1032             # Get all guids in group
1033             guids = self._groups[group]
1034
1035             for guid in guids:
1036                 if self.state(guid) < ResourceState.READY:
1037                     reschedule = True
1038                     break
1039
1040             if reschedule:
1041                 callback = functools.partial(wait_all_and_start, group)
1042                 self.schedule("1s", callback)
1043             else:
1044                 # If all resources are ready, we schedule the start
1045                 for guid in guids:
1046                     rm = self.get_resource(guid)
1047                     self.schedule("0s", rm.start_with_conditions)
1048
1049                     if rm.conditions.get(ResourceAction.STOP):
1050                         # Only if the RM has STOP conditions we
1051                         # schedule a stop. Otherwise the RM will stop immediately
1052                         self.schedule("0s", rm.stop_with_conditions)
1053
1054         if wait_all_ready and new_group:
1055             # Schedule a function to check that all resources are
1056             # READY, and only then schedule the start.
1057             # This aims at reducing the number of tasks looping in the 
1058             # scheduler. 
1059             # Instead of having many start tasks, we will have only one for 
1060             # the whole group.
1061             callback = functools.partial(wait_all_and_start, group)
1062             self.schedule("0s", callback)
1063
1064         for guid in guids:
1065             rm = self.get_resource(guid)
1066             rm.deployment_group = group
1067             self.schedule("0s", rm.deploy_with_conditions)
1068
1069             if not wait_all_ready:
1070                 self.schedule("0s", rm.start_with_conditions)
1071
1072                 if rm.conditions.get(ResourceAction.STOP):
1073                     # Only if the RM has STOP conditions we
1074                     # schedule a stop. Otherwise the RM will stop immediately
1075                     self.schedule("0s", rm.stop_with_conditions)
1076
1077     def release(self, guids = None):
1078         """ Releases all ResourceManagers in the guids list.
1079
1080         If the argument 'guids' is not given, all RMs registered
1081         in the experiment are released.
1082
1083             :param guids: List of RM guids
1084             :type guids: list
1085
1086         """
1087         if self._state == ECState.RELEASED:
1088             return 
1089
1090         if isinstance(guids, int):
1091             guids = [guids]
1092
1093         if not guids:
1094             guids = self.resources
1095
1096         for guid in guids:
1097             rm = self.get_resource(guid)
1098             self.schedule("0s", rm.release)
1099
1100         self.wait_released(guids)
1101
1102         if self.persist:
1103             self.save()
1104
1105         for guid in guids:
1106             if self.get(guid, "hardRelease"):
1107                 self.remove_resource(guid)\
1108
1109         # Mark the EC state as RELEASED
1110         self._state = ECState.RELEASED
1111         
1112     def shutdown(self):
1113         """ Releases all resources and stops the ExperimentController
1114
1115         """
1116         # If there was a major failure we can't exit gracefully
1117         if self._state == ECState.FAILED:
1118             raise RuntimeError("EC failure. Can not exit gracefully")
1119
1120         # Remove all pending tasks from the scheduler queue
1121         for tid in list(self._scheduler.pending):
1122             self._scheduler.remove(tid)
1123
1124         # Remove pending tasks from the workers queue
1125         self._runner.empty()
1126
1127         self.release()
1128
1129         # Mark the EC state as TERMINATED
1130         self._state = ECState.TERMINATED
1131
1132         # Stop processing thread
1133         self._stop = True
1134
1135         # Notify condition to wake up the processing thread
1136         self._notify()
1137         
1138         if self._thread.is_alive():
1139            self._thread.join()
1140
1141     def schedule(self, date, callback, track = False):
1142         """ Schedules a callback to be executed at time 'date'.
1143
1144             :param date: string containing execution time for the task.
1145                     It can be expressed as an absolute time, using
1146                     timestamp format, or as a relative time matching
1147                     ^\d+.\d+(h|m|s|ms|us)$
1148
1149             :param callback: code to be executed for the task. Must be a
1150                         Python function, and receives args and kwargs
1151                         as arguments.
1152
1153             :param track: if set to True, the task will be retrievable with
1154                     the get_task() method
1155
1156             :return : The Id of the task
1157             :rtype: int
1158             
1159         """
1160         timestamp = stabsformat(date)
1161         task = Task(timestamp, callback)
1162         task = self._scheduler.schedule(task)
1163
1164         if track:
1165             self._tasks[task.id] = task
1166
1167         # Notify condition to wake up the processing thread
1168         self._notify()
1169
1170         return task.id
1171      
1172     def _process(self):
1173         """ Process scheduled tasks.
1174
1175         .. note::
1176         
1177         Tasks are scheduled by invoking the schedule method with a target 
1178         callback and an execution time. 
1179         The schedule method creates a new Task object with that callback 
1180         and execution time, and pushes it into the '_scheduler' queue. 
1181         The execution time and the order of arrival of tasks are used 
1182         to order the tasks in the queue.
1183
1184         The _process method is executed in an independent thread held by 
1185         the ExperimentController for as long as the experiment is running.
1186         This method takes tasks from the '_scheduler' queue in a loop 
1187         and processes them in parallel using multithreading. 
1188         The environmental variable NEPI_NTHREADS can be used to control
1189         the number of threads used to process tasks. The default value is 
1190         50.
1191
1192         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1193         This object keeps a pool of threads (workers), and a queue of tasks
1194         scheduled for 'immediate' execution. 
1195         
1196         On each iteration, the '_process' loop will take the next task that 
1197         is scheduled for 'future' execution from the '_scheduler' queue, 
1198         and if the execution time of that task is >= to the current time, 
1199         it will push that task into the PR for 'immediate execution'. 
1200         As soon as a worker is free, the PR will assign the next task to
1201         that worker.
1202
1203         Upon receiving a task to execute, each PR worker (thread) will 
1204         invoke the  _execute method of the EC, passing the task as 
1205         argument.         
1206         The _execute method will then invoke task.callback inside a 
1207         try/except block. If an exception is raised by the tasks.callback, 
1208         it will be trapped by the try block, logged to standard error 
1209         (usually the console), and the task will be marked as failed.
1210
1211         """
1212
1213         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1214         self._runner = ParallelRun(maxthreads = self.nthreads)
1215         self._runner.start()
1216
1217         while not self._stop:
1218             try:
1219                 self._cond.acquire()
1220
1221                 task = next(self._scheduler)
1222                 
1223                 if not task:
1224                     # No task to execute. Wait for a new task to be scheduled.
1225                     self._cond.wait()
1226                 else:
1227                     # The task timestamp is in the future. Wait for timeout 
1228                     # or until another task is scheduled.
1229                     now = tnow()
1230                     if now < task.timestamp:
1231                         # Calculate timeout in seconds
1232                         timeout = tdiffsec(task.timestamp, now)
1233
1234                         # Re-schedule task with the same timestamp
1235                         self._scheduler.schedule(task)
1236                         
1237                         task = None
1238
1239                         # Wait timeout or until a new task awakes the condition
1240                         self._cond.wait(timeout)
1241                
1242                 self._cond.release()
1243
1244                 if task:
1245                     # Process tasks in parallel
1246                     self._runner.put(self._execute, task)
1247             except: 
1248                 import traceback
1249                 err = traceback.format_exc()
1250                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1251
1252                 # Set the EC to FAILED state 
1253                 self._state = ECState.FAILED
1254             
1255                 # Set the FailureManager failure level to EC failure
1256                 self._fm.set_ec_failure()
1257
1258         self.logger.debug("Exiting the task processing loop ... ")
1259         
1260         self._runner.sync()
1261         self._runner.destroy()
1262
1263     def _execute(self, task):
1264         """ Executes a single task. 
1265
1266             :param task: Object containing the callback to execute
1267             :type task: Task
1268
1269         """
1270         try:
1271             # Invoke callback
1272             task.result = task.callback()
1273             task.status = TaskStatus.DONE
1274         except:
1275             import traceback
1276             err = traceback.format_exc()
1277             task.result = err
1278             task.status = TaskStatus.ERROR
1279             
1280             self.logger.error("Error occurred while executing task: %s" % err)
1281
1282     def _notify(self):
1283         """ Awakes the processing thread if it is blocked waiting 
1284         for new tasks to arrive
1285         
1286         """
1287         self._cond.acquire()
1288         self._cond.notify()
1289         self._cond.release()
1290
1291     def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
1292             **kwargs):
1293         """ Automates experiment description using a NetGraph instance.
1294         """
1295         self._netgraph = NetGraph(**kwargs)
1296
1297         if add_node_callback:
1298             ### Add resources to the EC
1299             for nid in self.netgraph.nodes():
1300                 add_node_callback(self, nid)
1301
1302         if add_edge_callback:
1303             #### Add connections between resources
1304             for nid1, nid2 in self.netgraph.edges():
1305                 add_edge_callback(self, nid1, nid2)
1306