5664d2ff09b1ec9e79020cdd66befab8648fb38f
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29 from nepi.util.netgraph import NetGraph, TopologyType 
30
31 # TODO: use multiprocessing instead of threading
32 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
33
34 import functools
35 import logging
36 import os
37 import sys
38 import tempfile
39 import time
40 import threading
41 import weakref
42
43 class FailureLevel(object):
44     """ Describes the system failure state """
45     OK = 1
46     RM_FAILURE = 2
47     EC_FAILURE = 3
48
49 class FailureManager(object):
50     """ The FailureManager is responsible for handling errors
51     and deciding whether an experiment should be aborted or not
52
53     """
54
55     def __init__(self, ec):
56         self._ec = weakref.ref(ec)
57         self._failure_level = FailureLevel.OK
58         self._abort = False
59
60     @property
61     def ec(self):
62         """ Returns the ExperimentController associated to this FailureManager 
63         
64         """
65         
66         return self._ec()
67
68     @property
69     def abort(self):
70         return self._abort
71
72     def eval_failure(self, guid):
73         if self._failure_level == FailureLevel.OK:
74             rm = self.ec.get_resource(guid)
75             state = rm.state
76             critical = rm.get("critical")
77
78             if state == ResourceState.FAILED and critical:
79                 self._failure_level = FailureLevel.RM_FAILURE
80                 self._abort = True
81                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
82                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
83
84     def set_ec_failure(self):
85         self._failure_level = FailureLevel.EC_FAILURE
86
87 class ECState(object):
88     """ Possible states for an ExperimentController
89    
90     """
91     RUNNING = 1
92     FAILED = 2
93     TERMINATED = 3
94
95 class ExperimentController(object):
96     """
97     .. class:: Class Args :
98       
99         :param exp_id: Human readable identifier for the experiment scenario. 
100         :type exp_id: str
101
102     .. note::
103
104     An experiment, or scenario, is defined by a concrete set of resources,
105     and the behavior, configuration and interconnection of those resources. 
106     The Experiment Description (ED) is a detailed representation of a
107     single experiment. It contains all the necessary information to 
108     allow repeating the experiment. NEPI allows to describe
109     experiments by registering components (resources), configuring them
110     and interconnecting them.
111     
112     A same experiment (scenario) can be executed many times, generating 
113     different results. We call an experiment execution (instance) a 'run'.
114
115     The ExperimentController (EC), is the entity responsible of
116     managing an experiment run. The same scenario can be 
117     recreated (and re-run) by instantiating an EC and recreating 
118     the same experiment description. 
119
120     An experiment is represented as a graph of interconnected
121     resources. A resource is a generic concept in the sense that any
122     component taking part of an experiment, whether physical of
123     virtual, is considered a resource. A resources could be a host, 
124     a virtual machine, an application, a simulator, a IP address.
125
126     A ResourceManager (RM), is the entity responsible for managing a 
127     single resource. ResourceManagers are specific to a resource
128     type (i.e. An RM to control a Linux application will not be
129     the same as the RM used to control a ns-3 simulation).
130     To support a new type of resource, a new RM must be implemented. 
131     NEPI already provides a variety of RMs to control basic resources, 
132     and new can be extended from the existing ones.
133
134     Through the EC interface the user can create ResourceManagers (RMs),
135     configure them and interconnect them, to describe an experiment.
136     Describing an experiment through the EC does not run the experiment.
137     Only when the 'deploy()' method is invoked on the EC, the EC will take 
138     actions to transform the 'described' experiment into a 'running' experiment.
139
140     While the experiment is running, it is possible to continue to
141     create/configure/connect RMs, and to deploy them to involve new
142     resources in the experiment (this is known as 'interactive' deployment).
143     
144     An experiments in NEPI is identified by a string id, 
145     which is either given by the user, or automatically generated by NEPI.  
146     The purpose of this identifier is to separate files and results that 
147     belong to different experiment scenarios. 
148     However, since a same 'experiment' can be run many times, the experiment
149     id is not enough to identify an experiment instance (run).
150     For this reason, the ExperimentController has two identifier, the 
151     exp_id, which can be re-used in different ExperimentController,
152     and the run_id, which is unique to one ExperimentController instance, and
153     is automatically generated by NEPI.
154    
155     """
156
157     @classmethod
158     def load(cls, filepath, format = SFormats.XML):
159         serializer = ECSerializer()
160         ec = serializer.load(filepath)
161         return ec
162
163     def __init__(self, exp_id = None, local_dir = None, persist = False,
164             add_node_callback = None, add_edge_callback = None, **kwargs):
165         """ ExperimentController entity to model an execute a network 
166         experiment.
167         
168         :param exp_id: Human readable name to identify the experiment
169         :type exp_id: str
170
171         :param local_dir: Path to local directory where to store experiment
172             related files
173         :type local_dir: str
174
175         :param persist: Save an XML description of the experiment after 
176         completion at local_dir
177         :type persist: bool
178
179         :param add_node_callback: Callback to invoke for node instantiation
180         when automatic topology creation mode is used 
181         :type add_node_callback: function
182
183         :param add_edge_callback: Callback to invoke for edge instantiation 
184         when automatic topology creation mode is used 
185         :type add_edge_callback: function
186
187         """
188         super(ExperimentController, self).__init__()
189
190         # Logging
191         self._logger = logging.getLogger("ExperimentController")
192
193         # Run identifier. It identifies a concrete execution instance (run) 
194         # of an experiment.
195         # Since a same experiment (same configuration) can be executed many 
196         # times, this run_id permits to separate result files generated on 
197         # different experiment executions
198         self._run_id = tsformat()
199
200         # Experiment identifier. Usually assigned by the user
201         # Identifies the experiment scenario (i.e. configuration, 
202         # resources used, etc)
203         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
204
205         # Local path where to store experiment related files (results, etc)
206         if not local_dir:
207             local_dir = tempfile.mkdtemp()
208
209         self._local_dir = local_dir
210         self._exp_dir = os.path.join(local_dir, self.exp_id)
211         self._run_dir = os.path.join(self.exp_dir, self.run_id)
212
213         # If True persist the experiment controller in XML format, after completion
214         self._persist = persist
215
216         # generator of globally unique ids
217         self._guid_generator = guid.GuidGenerator()
218         
219         # Resource managers
220         self._resources = dict()
221
222         # Scheduler. It a queue that holds tasks scheduled for
223         # execution, and yields the next task to be executed 
224         # ordered by execution and arrival time
225         self._scheduler = HeapScheduler()
226
227         # Tasks
228         self._tasks = dict()
229
230         # RM groups (for deployment) 
231         self._groups = dict()
232
233         # generator of globally unique id for groups
234         self._group_id_generator = guid.GuidGenerator()
235
236         # Flag to stop processing thread
237         self._stop = False
238     
239         # Entity in charge of managing system failures
240         self._fm = FailureManager(self)
241
242         # EC state
243         self._state = ECState.RUNNING
244
245         # Automatically construct experiment description 
246         self._netgraph = None
247         if add_node_callback and add_edge_callback:
248             self._build_from_netgraph(add_node_callback, add_edge_callback, 
249                     **kwargs)
250
251         # The runner is a pool of threads used to parallelize 
252         # execution of tasks
253         self._nthreads = 20
254         self._runner = None
255
256         # Event processing thread
257         self._cond = threading.Condition()
258         self._thread = threading.Thread(target = self._process)
259         self._thread.setDaemon(True)
260         self._thread.start()
261         
262     @property
263     def logger(self):
264         """ Returns the logger instance of the Experiment Controller
265
266         """
267         return self._logger
268
269     @property
270     def failure_level(self):
271         """ Returns the level of FAILURE of th experiment
272
273         """
274
275         return self._fm._failure_level
276
277     @property
278     def ecstate(self):
279         """ Returns the state of the Experiment Controller
280
281         """
282         return self._state
283
284     @property
285     def exp_id(self):
286         """ Returns the experiment id assigned by the user
287
288         """
289         return self._exp_id
290
291     @property
292     def run_id(self):
293         """ Returns the experiment instance (run) identifier (automatically 
294         generated)
295
296         """
297         return self._run_id
298
299     @property
300     def nthreads(self):
301         """ Returns the number of processing nthreads used
302
303         """
304         return self._nthreads
305
306     @property
307     def local_dir(self):
308         """ Root local directory for experiment files
309
310         """
311         return self._local_dir
312
313     @property
314     def exp_dir(self):
315         """ Local directory to store results and other files related to the 
316         experiment.
317
318         """
319         return self._exp_dir
320
321     @property
322     def run_dir(self):
323         """ Local directory to store results and other files related to the 
324         experiment run.
325
326         """
327         return self._run_dir
328
329     @property
330     def persist(self):
331         """ If True, persists the ExperimentController to XML format upon 
332         experiment completion
333
334         """
335         return self._persist
336
337     @property
338     def netgraph(self):
339         """ Return NetGraph instance if experiment description was automatically 
340         generated
341
342         """
343         return self._netgraph
344
345     @property
346     def abort(self):
347         """ Returns True if the experiment has failed and should be interrupted,
348         False otherwise.
349
350         """
351         return self._fm.abort
352
353     def inform_failure(self, guid):
354         """ Reports a failure in a RM to the EC for evaluation
355
356             :param guid: Resource id
357             :type guid: int
358
359         """
360
361         return self._fm.eval_failure(guid)
362
363     def wait_finished(self, guids):
364         """ Blocking method that waits until all RMs in the 'guids' list 
365         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
366         RELEASED ), or until a failure in the experiment occurs 
367         (i.e. abort == True) 
368         
369             :param guids: List of guids
370             :type guids: list
371
372         """
373
374         def quit():
375             return self.abort
376
377         return self.wait(guids, state = ResourceState.STOPPED, 
378                 quit = quit)
379
380     def wait_started(self, guids):
381         """ Blocking method that waits until all RMs in the 'guids' list 
382         have reached a state >= STARTED, or until a failure in the 
383         experiment occurs (i.e. abort == True) 
384         
385             :param guids: List of guids
386             :type guids: list
387
388         """
389
390         def quit():
391             return self.abort
392
393         return self.wait(guids, state = ResourceState.STARTED, 
394                 quit = quit)
395
396     def wait_released(self, guids):
397         """ Blocking method that waits until all RMs in the 'guids' list 
398         have reached a state == RELEASED, or until the EC fails 
399         
400             :param guids: List of guids
401             :type guids: list
402
403         """
404
405         def quit():
406             return self._state == ECState.FAILED
407
408         return self.wait(guids, state = ResourceState.RELEASED, 
409                 quit = quit)
410
411     def wait_deployed(self, guids):
412         """ Blocking method that waits until all RMs in the 'guids' list 
413         have reached a state >= READY, or until a failure in the 
414         experiment occurs (i.e. abort == True) 
415         
416             :param guids: List of guids
417             :type guids: list
418
419         """
420
421         def quit():
422             return self.abort
423
424         return self.wait(guids, state = ResourceState.READY, 
425                 quit = quit)
426
427     def wait(self, guids, state, quit):
428         """ Blocking method that waits until all RMs in the 'guids' list 
429         have reached a state >= 'state', or until the 'quit' callback
430         yields True
431            
432             :param guids: List of guids
433             :type guids: list
434         
435         """
436         if isinstance(guids, int):
437             guids = [guids]
438
439         # Make a copy to avoid modifying the original guids list
440         guids = list(guids)
441
442         while True:
443             # If there are no more guids to wait for
444             # or the quit function returns True, exit the loop
445             if len(guids) == 0 or quit():
446                 break
447
448             # If a guid reached one of the target states, remove it from list
449             guid = guids.pop()
450             rm = self.get_resource(guid)
451             rstate = rm.state
452             
453             if rstate >= state:
454                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
455                     rm.get_rtype(), guid, rstate, state))
456             else:
457                 # Debug...
458                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
459                     guid, rstate, state))
460
461                 guids.append(guid)
462
463                 time.sleep(0.5)
464
465     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
466         plotter = ECPlotter()
467         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
468                 show = show)
469         return fpath
470
471     def serialize(self, format = SFormats.XML):
472         serializer = ECSerializer()
473         sec = serializer.load(self, format = format)
474         return sec
475
476     def save(self, dirpath = None, format = SFormats.XML):
477         serializer = ECSerializer()
478         path = serializer.save(self, dirpath  = None, format = format)
479         return path
480
481     def get_task(self, tid):
482         """ Returns a task by its id
483
484             :param tid: Id of the task
485             :type tid: int
486             
487             :rtype: Task
488             
489         """
490         return self._tasks.get(tid)
491
492     def get_resource(self, guid):
493         """ Returns a registered ResourceManager by its guid
494
495             :param guid: Id of the resource
496             :type guid: int
497             
498             :rtype: ResourceManager
499             
500         """
501         rm = self._resources.get(guid)
502         return rm
503
504     def get_resources_by_type(self, rtype):
505         """ Returns the ResourceManager objects of type rtype
506
507             :param rtype: Resource type
508             :type rtype: string
509             
510             :rtype: list of ResourceManagers
511             
512         """
513         rms = []
514         for guid, rm in self._resources.iteritems():
515             if rm.get_rtype() == rtype: 
516                 rms.append(rm)
517         return rms
518
519     def remove_resource(self, guid):
520         del self._resources[guid]
521
522     @property
523     def resources(self):
524         """ Returns the guids of all ResourceManagers 
525
526             :return: Set of all RM guids
527             :rtype: list
528
529         """
530         keys = self._resources.keys()
531
532         return keys
533
534     def filter_resources(self, rtype):
535         """ Returns the guids of all ResourceManagers of type rtype
536
537             :param rtype: Resource type
538             :type rtype: string
539             
540             :rtype: list of guids
541             
542         """
543         rms = []
544         for guid, rm in self._resources.iteritems():
545             if rm.get_rtype() == rtype: 
546                 rms.append(rm.guid)
547         return rms
548
549     def register_resource(self, rtype, guid = None):
550         """ Registers a new ResourceManager of type 'rtype' in the experiment
551         
552         This method will assign a new 'guid' for the RM, if no guid
553         is specified.
554
555             :param rtype: Type of the RM
556             :type rtype: str
557
558             :return: Guid of the RM
559             :rtype: int
560             
561         """
562         # Get next available guid
563         guid = self._guid_generator.next(guid)
564         
565         # Instantiate RM
566         rm = ResourceFactory.create(rtype, self, guid)
567
568         # Store RM
569         self._resources[guid] = rm
570
571         return guid
572
573     def get_attributes(self, guid):
574         """ Returns all the attributes of the RM with guid 'guid'
575
576             :param guid: Guid of the RM
577             :type guid: int
578
579             :return: List of attributes
580             :rtype: list
581
582         """
583         rm = self.get_resource(guid)
584         return rm.get_attributes()
585
586     def get_attribute(self, guid, name):
587         """ Returns the attribute 'name' of the RM with guid 'guid'
588
589             :param guid: Guid of the RM
590             :type guid: int
591
592             :param name: Name of the attribute
593             :type name: str
594
595             :return: The attribute with name 'name'
596             :rtype: Attribute
597
598         """
599         rm = self.get_resource(guid)
600         return rm.get_attribute(name)
601
602     def register_connection(self, guid1, guid2):
603         """ Registers a connection between a RM with guid 'guid1'
604         and another RM with guid 'guid2'. 
605     
606         The order of the in which the two guids are provided is not
607         important, since the connection relationship is symmetric.
608
609             :param guid1: First guid to connect
610             :type guid1: ResourceManager
611
612             :param guid2: Second guid to connect
613             :type guid: ResourceManager
614
615         """
616         rm1 = self.get_resource(guid1)
617         rm2 = self.get_resource(guid2)
618
619         rm1.register_connection(guid2)
620         rm2.register_connection(guid1)
621
622     def register_condition(self, guids1, action, guids2, state,
623             time = None):
624         """ Registers an action START, STOP or DEPLOY for all RM on list
625         guids1 to occur at time 'time' after all elements in list guids2 
626         have reached state 'state'.
627
628             :param guids1: List of guids of RMs subjected to action
629             :type guids1: list
630
631             :param action: Action to perform (either START, STOP or DEPLOY)
632             :type action: ResourceAction
633
634             :param guids2: List of guids of RMs to we waited for
635             :type guids2: list
636
637             :param state: State to wait for on RMs of list guids2 (STARTED,
638                 STOPPED, etc)
639             :type state: ResourceState
640
641             :param time: Time to wait after guids2 has reached status 
642             :type time: string
643
644         """
645         if isinstance(guids1, int):
646             guids1 = [guids1]
647         if isinstance(guids2, int):
648             guids2 = [guids2]
649
650         for guid1 in guids1:
651             rm = self.get_resource(guid1)
652             rm.register_condition(action, guids2, state, time)
653
654     def enable_trace(self, guid, name):
655         """ Enables a trace to be collected during the experiment run
656
657             :param name: Name of the trace
658             :type name: str
659
660         """
661         rm = self.get_resource(guid)
662         rm.enable_trace(name)
663
664     def trace_enabled(self, guid, name):
665         """ Returns True if the trace of name 'name' is enabled
666
667             :param name: Name of the trace
668             :type name: str
669
670         """
671         rm = self.get_resource(guid)
672         return rm.trace_enabled(name)
673
674     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
675         """ Returns information on a collected trace, the trace stream or 
676         blocks (chunks) of the trace stream
677
678             :param name: Name of the trace
679             :type name: str
680
681             :param attr: Can be one of:
682                          - TraceAttr.ALL (complete trace content), 
683                          - TraceAttr.STREAM (block in bytes to read starting 
684                                 at offset),
685                          - TraceAttr.PATH (full path to the trace file),
686                          - TraceAttr.SIZE (size of trace file). 
687             :type attr: str
688
689             :param block: Number of bytes to retrieve from trace, when attr is 
690                 TraceAttr.STREAM 
691             :type name: int
692
693             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
694             :type name: int
695
696             :rtype: str
697
698         """
699         rm = self.get_resource(guid)
700         return rm.trace(name, attr, block, offset)
701
702     def get_traces(self, guid):
703         """ Returns the list of the trace names of the RM with guid 'guid'
704
705             :param guid: Guid of the RM
706             :type guid: int
707
708             :return: List of trace names
709             :rtype: list
710
711         """
712         rm = self.get_resource(guid)
713         return rm.get_traces()
714
715
716     def discover(self, guid):
717         """ Discovers an available resource matching the criteria defined
718         by the RM with guid 'guid', and associates that resource to the RM
719
720         Not all RM types require (or are capable of) performing resource 
721         discovery. For the RM types which are not capable of doing so, 
722         invoking this method does not have any consequences. 
723
724             :param guid: Guid of the RM
725             :type guid: int
726
727         """
728         rm = self.get_resource(guid)
729         return rm.discover()
730
731     def provision(self, guid):
732         """ Provisions the resource associated to the RM with guid 'guid'.
733
734         Provisioning means making a resource 'accessible' to the user. 
735         Not all RM types require (or are capable of) performing resource 
736         provisioning. For the RM types which are not capable of doing so, 
737         invoking this method does not have any consequences. 
738
739             :param guid: Guid of the RM
740             :type guid: int
741
742         """
743         rm = self.get_resource(guid)
744         return rm.provision()
745
746     def get(self, guid, name):
747         """ Returns the value of the attribute with name 'name' on the
748         RM with guid 'guid'
749
750             :param guid: Guid of the RM
751             :type guid: int
752
753             :param name: Name of the attribute 
754             :type name: str
755
756             :return: The value of the attribute with name 'name'
757
758         """
759         rm = self.get_resource(guid)
760         return rm.get(name)
761
762     def set(self, guid, name, value):
763         """ Modifies the value of the attribute with name 'name' on the 
764         RM with guid 'guid'.
765
766             :param guid: Guid of the RM
767             :type guid: int
768
769             :param name: Name of the attribute
770             :type name: str
771
772             :param value: Value of the attribute
773
774         """
775         rm = self.get_resource(guid)
776         rm.set(name, value)
777
778     def get_global(self, rtype, name):
779         """ Returns the value of the global attribute with name 'name' on the
780         RMs of rtype 'rtype'.
781
782             :param guid: Guid of the RM
783             :type guid: int
784
785             :param name: Name of the attribute 
786             :type name: str
787
788             :return: The value of the attribute with name 'name'
789
790         """
791         rclass = ResourceFactory.get_resource_type(rtype)
792         return rclass.get_global(name)
793
794     def set_global(self, rtype, name, value):
795         """ Modifies the value of the global attribute with name 'name' on the 
796         RMs of with rtype 'rtype'.
797
798             :param guid: Guid of the RM
799             :type guid: int
800
801             :param name: Name of the attribute
802             :type name: str
803
804             :param value: Value of the attribute
805
806         """
807         rclass = ResourceFactory.get_resource_type(rtype)
808         return rclass.set_global(name, value)
809
810     def state(self, guid, hr = False):
811         """ Returns the state of a resource
812
813             :param guid: Resource guid
814             :type guid: integer
815
816             :param hr: Human readable. Forces return of a 
817                 status string instead of a number 
818             :type hr: boolean
819
820         """
821         rm = self.get_resource(guid)
822         state = rm.state
823
824         if hr:
825             return ResourceState2str.get(state)
826
827         return state
828
829     def stop(self, guid):
830         """ Stops the RM with guid 'guid'
831
832         Stopping a RM means that the resource it controls will
833         no longer take part of the experiment.
834
835             :param guid: Guid of the RM
836             :type guid: int
837
838         """
839         rm = self.get_resource(guid)
840         return rm.stop()
841
842     def start(self, guid):
843         """ Starts the RM with guid 'guid'
844
845         Starting a RM means that the resource it controls will
846         begin taking part of the experiment.
847
848             :param guid: Guid of the RM
849             :type guid: int
850
851         """
852         rm = self.get_resource(guid)
853         return rm.start()
854
855     def get_start_time(self, guid):
856         """ Returns the start time of the RM as a timestamp """
857         rm = self.get_resource(guid)
858         return rm.start_time
859
860     def get_stop_time(self, guid):
861         """ Returns the stop time of the RM as a timestamp """
862         rm = self.get_resource(guid)
863         return rm.stop_time
864
865     def get_discover_time(self, guid):
866         """ Returns the discover time of the RM as a timestamp """
867         rm = self.get_resource(guid)
868         return rm.discover_time
869
870     def get_provision_time(self, guid):
871         """ Returns the provision time of the RM as a timestamp """
872         rm = self.get_resource(guid)
873         return rm.provision_time
874
875     def get_ready_time(self, guid):
876         """ Returns the deployment time of the RM as a timestamp """
877         rm = self.get_resource(guid)
878         return rm.ready_time
879
880     def get_release_time(self, guid):
881         """ Returns the release time of the RM as a timestamp """
882         rm = self.get_resource(guid)
883         return rm.release_time
884
885     def get_failed_time(self, guid):
886         """ Returns the time failure occured for the RM as a timestamp """
887         rm = self.get_resource(guid)
888         return rm.failed_time
889
890     def set_with_conditions(self, name, value, guids1, guids2, state,
891             time = None):
892         """ Modifies the value of attribute with name 'name' on all RMs 
893         on the guids1 list when time 'time' has elapsed since all 
894         elements in guids2 list have reached state 'state'.
895
896             :param name: Name of attribute to set in RM
897             :type name: string
898
899             :param value: Value of attribute to set in RM
900             :type name: string
901
902             :param guids1: List of guids of RMs subjected to action
903             :type guids1: list
904
905             :param action: Action to register (either START or STOP)
906             :type action: ResourceAction
907
908             :param guids2: List of guids of RMs to we waited for
909             :type guids2: list
910
911             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
912             :type state: ResourceState
913
914             :param time: Time to wait after guids2 has reached status 
915             :type time: string
916
917         """
918         if isinstance(guids1, int):
919             guids1 = [guids1]
920         if isinstance(guids2, int):
921             guids2 = [guids2]
922
923         for guid1 in guids1:
924             rm = self.get_resource(guid)
925             rm.set_with_conditions(name, value, guids2, state, time)
926
927     def deploy(self, guids = None, wait_all_ready = True, group = None):
928         """ Deploys all ResourceManagers in the guids list. 
929         
930         If the argument 'guids' is not given, all RMs with state NEW
931         are deployed.
932
933             :param guids: List of guids of RMs to deploy
934             :type guids: list
935
936             :param wait_all_ready: Wait until all RMs are ready in
937                 order to start the RMs
938             :type guid: int
939
940             :param group: Id of deployment group in which to deploy RMs
941             :type group: int
942
943         """
944         self.logger.debug(" ------- DEPLOY START ------ ")
945
946         if not guids:
947             # If no guids list was passed, all 'NEW' RMs will be deployed
948             guids = []
949             for guid, rm in self._resources.iteritems():
950                 if rm.state == ResourceState.NEW:
951                     guids.append(guid)
952                 
953         if isinstance(guids, int):
954             guids = [guids]
955
956         # Create deployment group
957         # New guids can be added to a same deployment group later on
958         new_group = False
959         if not group:
960             new_group = True
961             group = self._group_id_generator.next()
962
963         if group not in self._groups:
964             self._groups[group] = []
965
966         self._groups[group].extend(guids)
967
968         def wait_all_and_start(group):
969             # Function that checks if all resources are READY
970             # before scheduling a start_with_conditions for each RM
971             reschedule = False
972             
973             # Get all guids in group
974             guids = self._groups[group]
975
976             for guid in guids:
977                 if self.state(guid) < ResourceState.READY:
978                     reschedule = True
979                     break
980
981             if reschedule:
982                 callback = functools.partial(wait_all_and_start, group)
983                 self.schedule("1s", callback)
984             else:
985                 # If all resources are ready, we schedule the start
986                 for guid in guids:
987                     rm = self.get_resource(guid)
988                     self.schedule("0s", rm.start_with_conditions)
989
990                     if rm.conditions.get(ResourceAction.STOP):
991                         # Only if the RM has STOP conditions we
992                         # schedule a stop. Otherwise the RM will stop immediately
993                         self.schedule("0s", rm.stop_with_conditions)
994
995         if wait_all_ready and new_group:
996             # Schedule a function to check that all resources are
997             # READY, and only then schedule the start.
998             # This aims at reducing the number of tasks looping in the 
999             # scheduler. 
1000             # Instead of having many start tasks, we will have only one for 
1001             # the whole group.
1002             callback = functools.partial(wait_all_and_start, group)
1003             self.schedule("0s", callback)
1004
1005         for guid in guids:
1006             rm = self.get_resource(guid)
1007             rm.deployment_group = group
1008             self.schedule("0s", rm.deploy_with_conditions)
1009
1010             if not wait_all_ready:
1011                 self.schedule("0s", rm.start_with_conditions)
1012
1013                 if rm.conditions.get(ResourceAction.STOP):
1014                     # Only if the RM has STOP conditions we
1015                     # schedule a stop. Otherwise the RM will stop immediately
1016                     self.schedule("0s", rm.stop_with_conditions)
1017
1018     def release(self, guids = None):
1019         """ Releases all ResourceManagers in the guids list.
1020
1021         If the argument 'guids' is not given, all RMs registered
1022         in the experiment are released.
1023
1024             :param guids: List of RM guids
1025             :type guids: list
1026
1027         """
1028         if isinstance(guids, int):
1029             guids = [guids]
1030
1031         if not guids:
1032             guids = self.resources
1033
1034         for guid in guids:
1035             rm = self.get_resource(guid)
1036             self.schedule("0s", rm.release)
1037
1038         self.wait_released(guids)
1039
1040         if self.persist:
1041             self.save(dirpath = self.run_dir)
1042
1043         for guid in guids:
1044             if self.get(guid, "hardRelease"):
1045                 self.remove_resource(guid)
1046         
1047     def shutdown(self):
1048         """ Releases all resources and stops the ExperimentController
1049
1050         """
1051         # If there was a major failure we can't exit gracefully
1052         if self._state == ECState.FAILED:
1053             raise RuntimeError("EC failure. Can not exit gracefully")
1054
1055         # Remove all pending tasks from the scheduler queue
1056         for tid in list(self._scheduler.pending):
1057             self._scheduler.remove(tid)
1058
1059         # Remove pending tasks from the workers queue
1060         self._runner.empty()
1061
1062         self.release()
1063
1064         # Mark the EC state as TERMINATED
1065         self._state = ECState.TERMINATED
1066
1067         # Stop processing thread
1068         self._stop = True
1069
1070         # Notify condition to wake up the processing thread
1071         self._notify()
1072         
1073         if self._thread.is_alive():
1074            self._thread.join()
1075
1076     def schedule(self, date, callback, track = False):
1077         """ Schedules a callback to be executed at time 'date'.
1078
1079             :param date: string containing execution time for the task.
1080                     It can be expressed as an absolute time, using
1081                     timestamp format, or as a relative time matching
1082                     ^\d+.\d+(h|m|s|ms|us)$
1083
1084             :param callback: code to be executed for the task. Must be a
1085                         Python function, and receives args and kwargs
1086                         as arguments.
1087
1088             :param track: if set to True, the task will be retrievable with
1089                     the get_task() method
1090
1091             :return : The Id of the task
1092             :rtype: int
1093             
1094         """
1095         timestamp = stabsformat(date)
1096         task = Task(timestamp, callback)
1097         task = self._scheduler.schedule(task)
1098
1099         if track:
1100             self._tasks[task.id] = task
1101
1102         # Notify condition to wake up the processing thread
1103         self._notify()
1104
1105         return task.id
1106      
1107     def _process(self):
1108         """ Process scheduled tasks.
1109
1110         .. note::
1111         
1112         Tasks are scheduled by invoking the schedule method with a target 
1113         callback and an execution time. 
1114         The schedule method creates a new Task object with that callback 
1115         and execution time, and pushes it into the '_scheduler' queue. 
1116         The execution time and the order of arrival of tasks are used 
1117         to order the tasks in the queue.
1118
1119         The _process method is executed in an independent thread held by 
1120         the ExperimentController for as long as the experiment is running.
1121         This method takes tasks from the '_scheduler' queue in a loop 
1122         and processes them in parallel using multithreading. 
1123         The environmental variable NEPI_NTHREADS can be used to control
1124         the number of threads used to process tasks. The default value is 
1125         50.
1126
1127         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1128         This object keeps a pool of threads (workers), and a queue of tasks
1129         scheduled for 'immediate' execution. 
1130         
1131         On each iteration, the '_process' loop will take the next task that 
1132         is scheduled for 'future' execution from the '_scheduler' queue, 
1133         and if the execution time of that task is >= to the current time, 
1134         it will push that task into the PR for 'immediate execution'. 
1135         As soon as a worker is free, the PR will assign the next task to
1136         that worker.
1137
1138         Upon receiving a task to execute, each PR worker (thread) will 
1139         invoke the  _execute method of the EC, passing the task as 
1140         argument.         
1141         The _execute method will then invoke task.callback inside a 
1142         try/except block. If an exception is raised by the tasks.callback, 
1143         it will be trapped by the try block, logged to standard error 
1144         (usually the console), and the task will be marked as failed.
1145
1146         """
1147
1148         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1149         self._runner = ParallelRun(maxthreads = self.nthreads)
1150         self._runner.start()
1151
1152         while not self._stop:
1153             try:
1154                 self._cond.acquire()
1155
1156                 task = self._scheduler.next()
1157                 
1158                 if not task:
1159                     # No task to execute. Wait for a new task to be scheduled.
1160                     self._cond.wait()
1161                 else:
1162                     # The task timestamp is in the future. Wait for timeout 
1163                     # or until another task is scheduled.
1164                     now = tnow()
1165                     if now < task.timestamp:
1166                         # Calculate timeout in seconds
1167                         timeout = tdiffsec(task.timestamp, now)
1168
1169                         # Re-schedule task with the same timestamp
1170                         self._scheduler.schedule(task)
1171                         
1172                         task = None
1173
1174                         # Wait timeout or until a new task awakes the condition
1175                         self._cond.wait(timeout)
1176                
1177                 self._cond.release()
1178
1179                 if task:
1180                     # Process tasks in parallel
1181                     self._runner.put(self._execute, task)
1182             except: 
1183                 import traceback
1184                 err = traceback.format_exc()
1185                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1186
1187                 # Set the EC to FAILED state 
1188                 self._state = ECState.FAILED
1189             
1190                 # Set the FailureManager failure level to EC failure
1191                 self._fm.set_ec_failure()
1192
1193         self.logger.debug("Exiting the task processing loop ... ")
1194         
1195         self._runner.sync()
1196         self._runner.destroy()
1197
1198     def _execute(self, task):
1199         """ Executes a single task. 
1200
1201             :param task: Object containing the callback to execute
1202             :type task: Task
1203
1204         """
1205         try:
1206             # Invoke callback
1207             task.result = task.callback()
1208             task.status = TaskStatus.DONE
1209         except:
1210             import traceback
1211             err = traceback.format_exc()
1212             task.result = err
1213             task.status = TaskStatus.ERROR
1214             
1215             self.logger.error("Error occurred while executing task: %s" % err)
1216
1217     def _notify(self):
1218         """ Awakes the processing thread if it is blocked waiting 
1219         for new tasks to arrive
1220         
1221         """
1222         self._cond.acquire()
1223         self._cond.notify()
1224         self._cond.release()
1225
1226     def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
1227             **kwargs):
1228         """ Automates experiment description using a NetGraph instance.
1229         """
1230         self._netgraph = NetGraph(**kwargs)
1231
1232         ### Add resources to the EC
1233         for nid in self.netgraph.graph.nodes():
1234             add_node_callback(self, nid)
1235
1236         #### Add connections between resources
1237         for nid1, nid2 in self.netgraph.graph.edges():
1238             add_edge_callback(self, nid1, nid2)
1239
1240