58ece229e25b4fe8076376c46587806c607a2742
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29 from nepi.util.netgraph import NetGraph, TopologyType 
30
31 # TODO: use multiprocessing instead of threading
32 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
33
34 import functools
35 import logging
36 import os
37 import sys
38 import tempfile
39 import time
40 import threading
41 import weakref
42
43 class FailureLevel(object):
44     """ Describes the system failure state """
45     OK = 1
46     RM_FAILURE = 2
47     EC_FAILURE = 3
48
49 class FailureManager(object):
50     """ The FailureManager is responsible for handling errors
51     and deciding whether an experiment should be aborted or not
52
53     """
54
55     def __init__(self, ec):
56         self._ec = weakref.ref(ec)
57         self._failure_level = FailureLevel.OK
58         self._abort = False
59
60     @property
61     def ec(self):
62         """ Returns the ExperimentController associated to this FailureManager 
63         
64         """
65         
66         return self._ec()
67
68     @property
69     def abort(self):
70         return self._abort
71
72     def eval_failure(self, guid):
73         if self._failure_level == FailureLevel.OK:
74             rm = self.ec.get_resource(guid)
75             state = rm.state
76             critical = rm.get("critical")
77
78             if state == ResourceState.FAILED and critical:
79                 self._failure_level = FailureLevel.RM_FAILURE
80                 self._abort = True
81                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
82                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
83
84     def set_ec_failure(self):
85         self._failure_level = FailureLevel.EC_FAILURE
86
87 class ECState(object):
88     """ Possible states for an ExperimentController
89    
90     """
91     RUNNING = 1
92     FAILED = 2
93     RELEASED = 3
94     TERMINATED = 4
95
96 class ExperimentController(object):
97     """
98     .. class:: Class Args :
99       
100         :param exp_id: Human readable identifier for the experiment scenario. 
101         :type exp_id: str
102
103     .. note::
104
105     An experiment, or scenario, is defined by a concrete set of resources,
106     and the behavior, configuration and interconnection of those resources. 
107     The Experiment Description (ED) is a detailed representation of a
108     single experiment. It contains all the necessary information to 
109     allow repeating the experiment. NEPI allows to describe
110     experiments by registering components (resources), configuring them
111     and interconnecting them.
112     
113     A same experiment (scenario) can be executed many times, generating 
114     different results. We call an experiment execution (instance) a 'run'.
115
116     The ExperimentController (EC), is the entity responsible of
117     managing an experiment run. The same scenario can be 
118     recreated (and re-run) by instantiating an EC and recreating 
119     the same experiment description. 
120
121     An experiment is represented as a graph of interconnected
122     resources. A resource is a generic concept in the sense that any
123     component taking part of an experiment, whether physical of
124     virtual, is considered a resource. A resources could be a host, 
125     a virtual machine, an application, a simulator, a IP address.
126
127     A ResourceManager (RM), is the entity responsible for managing a 
128     single resource. ResourceManagers are specific to a resource
129     type (i.e. An RM to control a Linux application will not be
130     the same as the RM used to control a ns-3 simulation).
131     To support a new type of resource, a new RM must be implemented. 
132     NEPI already provides a variety of RMs to control basic resources, 
133     and new can be extended from the existing ones.
134
135     Through the EC interface the user can create ResourceManagers (RMs),
136     configure them and interconnect them, to describe an experiment.
137     Describing an experiment through the EC does not run the experiment.
138     Only when the 'deploy()' method is invoked on the EC, the EC will take 
139     actions to transform the 'described' experiment into a 'running' experiment.
140
141     While the experiment is running, it is possible to continue to
142     create/configure/connect RMs, and to deploy them to involve new
143     resources in the experiment (this is known as 'interactive' deployment).
144     
145     An experiments in NEPI is identified by a string id, 
146     which is either given by the user, or automatically generated by NEPI.  
147     The purpose of this identifier is to separate files and results that 
148     belong to different experiment scenarios. 
149     However, since a same 'experiment' can be run many times, the experiment
150     id is not enough to identify an experiment instance (run).
151     For this reason, the ExperimentController has two identifier, the 
152     exp_id, which can be re-used in different ExperimentController,
153     and the run_id, which is unique to one ExperimentController instance, and
154     is automatically generated by NEPI.
155    
156     """
157
158     @classmethod
159     def load(cls, filepath, format = SFormats.XML):
160         serializer = ECSerializer()
161         ec = serializer.load(filepath)
162         return ec
163
164     def __init__(self, exp_id = None, local_dir = None, persist = False,
165             add_node_callback = None, add_edge_callback = None, **kwargs):
166         """ ExperimentController entity to model an execute a network 
167         experiment.
168         
169         :param exp_id: Human readable name to identify the experiment
170         :type exp_id: str
171
172         :param local_dir: Path to local directory where to store experiment
173             related files
174         :type local_dir: str
175
176         :param persist: Save an XML description of the experiment after 
177         completion at local_dir
178         :type persist: bool
179
180         :param add_node_callback: Callback to invoke for node instantiation
181         when automatic topology creation mode is used 
182         :type add_node_callback: function
183
184         :param add_edge_callback: Callback to invoke for edge instantiation 
185         when automatic topology creation mode is used 
186         :type add_edge_callback: function
187
188         """
189         super(ExperimentController, self).__init__()
190
191         # Logging
192         self._logger = logging.getLogger("ExperimentController")
193
194         # Run identifier. It identifies a concrete execution instance (run) 
195         # of an experiment.
196         # Since a same experiment (same configuration) can be executed many 
197         # times, this run_id permits to separate result files generated on 
198         # different experiment executions
199         self._run_id = tsformat()
200
201         # Experiment identifier. Usually assigned by the user
202         # Identifies the experiment scenario (i.e. configuration, 
203         # resources used, etc)
204         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
205
206         # Local path where to store experiment related files (results, etc)
207         if not local_dir:
208             local_dir = tempfile.gettempdir() # /tmp
209
210         self._local_dir = local_dir
211         self._exp_dir = os.path.join(local_dir, self.exp_id)
212         self._run_dir = os.path.join(self.exp_dir, self.run_id)
213
214         # If True persist the experiment controller in XML format, after completion
215         self._persist = persist
216
217         # generator of globally unique ids
218         self._guid_generator = guid.GuidGenerator()
219         
220         # Resource managers
221         self._resources = dict()
222
223         # Scheduler. It a queue that holds tasks scheduled for
224         # execution, and yields the next task to be executed 
225         # ordered by execution and arrival time
226         self._scheduler = HeapScheduler()
227
228         # Tasks
229         self._tasks = dict()
230
231         # RM groups (for deployment) 
232         self._groups = dict()
233
234         # generator of globally unique id for groups
235         self._group_id_generator = guid.GuidGenerator()
236
237         # Flag to stop processing thread
238         self._stop = False
239     
240         # Entity in charge of managing system failures
241         self._fm = FailureManager(self)
242
243         # EC state
244         self._state = ECState.RUNNING
245
246         # Automatically construct experiment description 
247         self._netgraph = None
248         if add_node_callback or add_edge_callback or kwargs.get("topology"):
249             self._build_from_netgraph(add_node_callback, add_edge_callback, 
250                     **kwargs)
251
252         # The runner is a pool of threads used to parallelize 
253         # execution of tasks
254         self._nthreads = 20
255         self._runner = None
256
257         # Event processing thread
258         self._cond = threading.Condition()
259         self._thread = threading.Thread(target = self._process)
260         self._thread.setDaemon(True)
261         self._thread.start()
262         
263     @property
264     def logger(self):
265         """ Returns the logger instance of the Experiment Controller
266
267         """
268         return self._logger
269
270     @property
271     def failure_level(self):
272         """ Returns the level of FAILURE of th experiment
273
274         """
275
276         return self._fm._failure_level
277
278     @property
279     def ecstate(self):
280         """ Returns the state of the Experiment Controller
281
282         """
283         return self._state
284
285     @property
286     def exp_id(self):
287         """ Returns the experiment id assigned by the user
288
289         """
290         return self._exp_id
291
292     @property
293     def run_id(self):
294         """ Returns the experiment instance (run) identifier (automatically 
295         generated)
296
297         """
298         return self._run_id
299
300     @property
301     def nthreads(self):
302         """ Returns the number of processing nthreads used
303
304         """
305         return self._nthreads
306
307     @property
308     def local_dir(self):
309         """ Root local directory for experiment files
310
311         """
312         return self._local_dir
313
314     @property
315     def exp_dir(self):
316         """ Local directory to store results and other files related to the 
317         experiment.
318
319         """
320         return self._exp_dir
321
322     @property
323     def run_dir(self):
324         """ Local directory to store results and other files related to the 
325         experiment run.
326
327         """
328         return self._run_dir
329
330     @property
331     def persist(self):
332         """ If True, persists the ExperimentController to XML format upon 
333         experiment completion
334
335         """
336         return self._persist
337
338     @property
339     def netgraph(self):
340         """ Return NetGraph instance if experiment description was automatically 
341         generated
342
343         """
344         return self._netgraph
345
346     @property
347     def abort(self):
348         """ Returns True if the experiment has failed and should be interrupted,
349         False otherwise.
350
351         """
352         return self._fm.abort
353
354     def inform_failure(self, guid):
355         """ Reports a failure in a RM to the EC for evaluation
356
357             :param guid: Resource id
358             :type guid: int
359
360         """
361
362         return self._fm.eval_failure(guid)
363
364     def wait_finished(self, guids):
365         """ Blocking method that waits until all RMs in the 'guids' list 
366         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
367         RELEASED ), or until a failure in the experiment occurs 
368         (i.e. abort == True) 
369         
370             :param guids: List of guids
371             :type guids: list
372
373         """
374
375         def quit():
376             return self.abort
377
378         return self.wait(guids, state = ResourceState.STOPPED, 
379                 quit = quit)
380
381     def wait_started(self, guids):
382         """ Blocking method that waits until all RMs in the 'guids' list 
383         have reached a state >= STARTED, or until a failure in the 
384         experiment occurs (i.e. abort == True) 
385         
386             :param guids: List of guids
387             :type guids: list
388
389         """
390
391         def quit():
392             return self.abort
393
394         return self.wait(guids, state = ResourceState.STARTED, 
395                 quit = quit)
396
397     def wait_released(self, guids):
398         """ Blocking method that waits until all RMs in the 'guids' list 
399         have reached a state == RELEASED, or until the EC fails 
400         
401             :param guids: List of guids
402             :type guids: list
403
404         """
405
406         def quit():
407             return self._state == ECState.FAILED
408
409         return self.wait(guids, state = ResourceState.RELEASED, 
410                 quit = quit)
411
412     def wait_deployed(self, guids):
413         """ Blocking method that waits until all RMs in the 'guids' list 
414         have reached a state >= READY, or until a failure in the 
415         experiment occurs (i.e. abort == True) 
416         
417             :param guids: List of guids
418             :type guids: list
419
420         """
421
422         def quit():
423             return self.abort
424
425         return self.wait(guids, state = ResourceState.READY, 
426                 quit = quit)
427
428     def wait(self, guids, state, quit):
429         """ Blocking method that waits until all RMs in the 'guids' list 
430         have reached a state >= 'state', or until the 'quit' callback
431         yields True
432            
433             :param guids: List of guids
434             :type guids: list
435         
436         """
437         if isinstance(guids, int):
438             guids = [guids]
439
440         # Make a copy to avoid modifying the original guids list
441         guids = list(guids)
442
443         while True:
444             # If there are no more guids to wait for
445             # or the quit function returns True, exit the loop
446             if len(guids) == 0 or quit():
447                 break
448
449             # If a guid reached one of the target states, remove it from list
450             guid = guids.pop()
451             rm = self.get_resource(guid)
452             rstate = rm.state
453             
454             if rstate >= state:
455                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
456                     rm.get_rtype(), guid, rstate, state))
457             else:
458                 # Debug...
459                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
460                     guid, rstate, state))
461
462                 guids.append(guid)
463
464                 time.sleep(0.5)
465
466     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
467         plotter = ECPlotter()
468         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
469                 show = show)
470         return fpath
471
472     def serialize(self, format = SFormats.XML):
473         serializer = ECSerializer()
474         sec = serializer.load(self, format = format)
475         return sec
476
477     def save(self, dirpath = None, format = SFormats.XML):
478         if dirpath == None:
479             dirpath = self.run_dir
480
481         try:
482             os.makedirs(dirpath)
483         except OSError:
484             pass
485
486         serializer = ECSerializer()
487         path = serializer.save(self, dirpath, format = format)
488         return path
489
490     def get_task(self, tid):
491         """ Returns a task by its id
492
493             :param tid: Id of the task
494             :type tid: int
495             
496             :rtype: Task
497             
498         """
499         return self._tasks.get(tid)
500
501     def get_resource(self, guid):
502         """ Returns a registered ResourceManager by its guid
503
504             :param guid: Id of the resource
505             :type guid: int
506             
507             :rtype: ResourceManager
508             
509         """
510         rm = self._resources.get(guid)
511         return rm
512
513     def get_resources_by_type(self, rtype):
514         """ Returns the ResourceManager objects of type rtype
515
516             :param rtype: Resource type
517             :type rtype: string
518             
519             :rtype: list of ResourceManagers
520             
521         """
522         rms = []
523         for guid, rm in self._resources.iteritems():
524             if rm.get_rtype() == rtype: 
525                 rms.append(rm)
526         return rms
527
528     def remove_resource(self, guid):
529         del self._resources[guid]
530
531     @property
532     def resources(self):
533         """ Returns the guids of all ResourceManagers 
534
535             :return: Set of all RM guids
536             :rtype: list
537
538         """
539         keys = self._resources.keys()
540
541         return keys
542
543     def filter_resources(self, rtype):
544         """ Returns the guids of all ResourceManagers of type rtype
545
546             :param rtype: Resource type
547             :type rtype: string
548             
549             :rtype: list of guids
550             
551         """
552         rms = []
553         for guid, rm in self._resources.iteritems():
554             if rm.get_rtype() == rtype: 
555                 rms.append(rm.guid)
556         return rms
557
558     def register_resource(self, rtype, guid = None):
559         """ Registers a new ResourceManager of type 'rtype' in the experiment
560         
561         This method will assign a new 'guid' for the RM, if no guid
562         is specified.
563
564             :param rtype: Type of the RM
565             :type rtype: str
566
567             :return: Guid of the RM
568             :rtype: int
569             
570         """
571         # Get next available guid
572         guid = self._guid_generator.next(guid)
573         
574         # Instantiate RM
575         rm = ResourceFactory.create(rtype, self, guid)
576
577         # Store RM
578         self._resources[guid] = rm
579
580         return guid
581
582     def get_attributes(self, guid):
583         """ Returns all the attributes of the RM with guid 'guid'
584
585             :param guid: Guid of the RM
586             :type guid: int
587
588             :return: List of attributes
589             :rtype: list
590
591         """
592         rm = self.get_resource(guid)
593         return rm.get_attributes()
594
595     def get_attribute(self, guid, name):
596         """ Returns the attribute 'name' of the RM with guid 'guid'
597
598             :param guid: Guid of the RM
599             :type guid: int
600
601             :param name: Name of the attribute
602             :type name: str
603
604             :return: The attribute with name 'name'
605             :rtype: Attribute
606
607         """
608         rm = self.get_resource(guid)
609         return rm.get_attribute(name)
610
611     def register_connection(self, guid1, guid2):
612         """ Registers a connection between a RM with guid 'guid1'
613         and another RM with guid 'guid2'. 
614     
615         The order of the in which the two guids are provided is not
616         important, since the connection relationship is symmetric.
617
618             :param guid1: First guid to connect
619             :type guid1: ResourceManager
620
621             :param guid2: Second guid to connect
622             :type guid: ResourceManager
623
624         """
625         rm1 = self.get_resource(guid1)
626         rm2 = self.get_resource(guid2)
627
628         rm1.register_connection(guid2)
629         rm2.register_connection(guid1)
630
631     def register_condition(self, guids1, action, guids2, state,
632             time = None):
633         """ Registers an action START, STOP or DEPLOY for all RM on list
634         guids1 to occur at time 'time' after all elements in list guids2 
635         have reached state 'state'.
636
637             :param guids1: List of guids of RMs subjected to action
638             :type guids1: list
639
640             :param action: Action to perform (either START, STOP or DEPLOY)
641             :type action: ResourceAction
642
643             :param guids2: List of guids of RMs to we waited for
644             :type guids2: list
645
646             :param state: State to wait for on RMs of list guids2 (STARTED,
647                 STOPPED, etc)
648             :type state: ResourceState
649
650             :param time: Time to wait after guids2 has reached status 
651             :type time: string
652
653         """
654         if isinstance(guids1, int):
655             guids1 = [guids1]
656         if isinstance(guids2, int):
657             guids2 = [guids2]
658
659         for guid1 in guids1:
660             rm = self.get_resource(guid1)
661             rm.register_condition(action, guids2, state, time)
662
663     def enable_trace(self, guid, name):
664         """ Enables a trace to be collected during the experiment run
665
666             :param name: Name of the trace
667             :type name: str
668
669         """
670         rm = self.get_resource(guid)
671         rm.enable_trace(name)
672
673     def trace_enabled(self, guid, name):
674         """ Returns True if the trace of name 'name' is enabled
675
676             :param name: Name of the trace
677             :type name: str
678
679         """
680         rm = self.get_resource(guid)
681         return rm.trace_enabled(name)
682
683     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
684         """ Returns information on a collected trace, the trace stream or 
685         blocks (chunks) of the trace stream
686
687             :param name: Name of the trace
688             :type name: str
689
690             :param attr: Can be one of:
691                          - TraceAttr.ALL (complete trace content), 
692                          - TraceAttr.STREAM (block in bytes to read starting 
693                                 at offset),
694                          - TraceAttr.PATH (full path to the trace file),
695                          - TraceAttr.SIZE (size of trace file). 
696             :type attr: str
697
698             :param block: Number of bytes to retrieve from trace, when attr is 
699                 TraceAttr.STREAM 
700             :type name: int
701
702             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
703             :type name: int
704
705             :rtype: str
706
707         """
708         rm = self.get_resource(guid)
709         return rm.trace(name, attr, block, offset)
710
711     def get_traces(self, guid):
712         """ Returns the list of the trace names of the RM with guid 'guid'
713
714             :param guid: Guid of the RM
715             :type guid: int
716
717             :return: List of trace names
718             :rtype: list
719
720         """
721         rm = self.get_resource(guid)
722         return rm.get_traces()
723
724
725     def discover(self, guid):
726         """ Discovers an available resource matching the criteria defined
727         by the RM with guid 'guid', and associates that resource to the RM
728
729         Not all RM types require (or are capable of) performing resource 
730         discovery. For the RM types which are not capable of doing so, 
731         invoking this method does not have any consequences. 
732
733             :param guid: Guid of the RM
734             :type guid: int
735
736         """
737         rm = self.get_resource(guid)
738         return rm.discover()
739
740     def provision(self, guid):
741         """ Provisions the resource associated to the RM with guid 'guid'.
742
743         Provisioning means making a resource 'accessible' to the user. 
744         Not all RM types require (or are capable of) performing resource 
745         provisioning. For the RM types which are not capable of doing so, 
746         invoking this method does not have any consequences. 
747
748             :param guid: Guid of the RM
749             :type guid: int
750
751         """
752         rm = self.get_resource(guid)
753         return rm.provision()
754
755     def get(self, guid, name):
756         """ Returns the value of the attribute with name 'name' on the
757         RM with guid 'guid'
758
759             :param guid: Guid of the RM
760             :type guid: int
761
762             :param name: Name of the attribute 
763             :type name: str
764
765             :return: The value of the attribute with name 'name'
766
767         """
768         rm = self.get_resource(guid)
769         return rm.get(name)
770
771     def set(self, guid, name, value):
772         """ Modifies the value of the attribute with name 'name' on the 
773         RM with guid 'guid'.
774
775             :param guid: Guid of the RM
776             :type guid: int
777
778             :param name: Name of the attribute
779             :type name: str
780
781             :param value: Value of the attribute
782
783         """
784         rm = self.get_resource(guid)
785         rm.set(name, value)
786
787     def get_global(self, rtype, name):
788         """ Returns the value of the global attribute with name 'name' on the
789         RMs of rtype 'rtype'.
790
791             :param guid: Guid of the RM
792             :type guid: int
793
794             :param name: Name of the attribute 
795             :type name: str
796
797             :return: The value of the attribute with name 'name'
798
799         """
800         rclass = ResourceFactory.get_resource_type(rtype)
801         return rclass.get_global(name)
802
803     def set_global(self, rtype, name, value):
804         """ Modifies the value of the global attribute with name 'name' on the 
805         RMs of with rtype 'rtype'.
806
807             :param guid: Guid of the RM
808             :type guid: int
809
810             :param name: Name of the attribute
811             :type name: str
812
813             :param value: Value of the attribute
814
815         """
816         rclass = ResourceFactory.get_resource_type(rtype)
817         return rclass.set_global(name, value)
818
819     def state(self, guid, hr = False):
820         """ Returns the state of a resource
821
822             :param guid: Resource guid
823             :type guid: integer
824
825             :param hr: Human readable. Forces return of a 
826                 status string instead of a number 
827             :type hr: boolean
828
829         """
830         rm = self.get_resource(guid)
831         state = rm.state
832
833         if hr:
834             return ResourceState2str.get(state)
835
836         return state
837
838     def stop(self, guid):
839         """ Stops the RM with guid 'guid'
840
841         Stopping a RM means that the resource it controls will
842         no longer take part of the experiment.
843
844             :param guid: Guid of the RM
845             :type guid: int
846
847         """
848         rm = self.get_resource(guid)
849         return rm.stop()
850
851     def start(self, guid):
852         """ Starts the RM with guid 'guid'
853
854         Starting a RM means that the resource it controls will
855         begin taking part of the experiment.
856
857             :param guid: Guid of the RM
858             :type guid: int
859
860         """
861         rm = self.get_resource(guid)
862         return rm.start()
863
864     def get_start_time(self, guid):
865         """ Returns the start time of the RM as a timestamp """
866         rm = self.get_resource(guid)
867         return rm.start_time
868
869     def get_stop_time(self, guid):
870         """ Returns the stop time of the RM as a timestamp """
871         rm = self.get_resource(guid)
872         return rm.stop_time
873
874     def get_discover_time(self, guid):
875         """ Returns the discover time of the RM as a timestamp """
876         rm = self.get_resource(guid)
877         return rm.discover_time
878
879     def get_provision_time(self, guid):
880         """ Returns the provision time of the RM as a timestamp """
881         rm = self.get_resource(guid)
882         return rm.provision_time
883
884     def get_ready_time(self, guid):
885         """ Returns the deployment time of the RM as a timestamp """
886         rm = self.get_resource(guid)
887         return rm.ready_time
888
889     def get_release_time(self, guid):
890         """ Returns the release time of the RM as a timestamp """
891         rm = self.get_resource(guid)
892         return rm.release_time
893
894     def get_failed_time(self, guid):
895         """ Returns the time failure occured for the RM as a timestamp """
896         rm = self.get_resource(guid)
897         return rm.failed_time
898
899     def set_with_conditions(self, name, value, guids1, guids2, state,
900             time = None):
901         """ Modifies the value of attribute with name 'name' on all RMs 
902         on the guids1 list when time 'time' has elapsed since all 
903         elements in guids2 list have reached state 'state'.
904
905             :param name: Name of attribute to set in RM
906             :type name: string
907
908             :param value: Value of attribute to set in RM
909             :type name: string
910
911             :param guids1: List of guids of RMs subjected to action
912             :type guids1: list
913
914             :param action: Action to register (either START or STOP)
915             :type action: ResourceAction
916
917             :param guids2: List of guids of RMs to we waited for
918             :type guids2: list
919
920             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
921             :type state: ResourceState
922
923             :param time: Time to wait after guids2 has reached status 
924             :type time: string
925
926         """
927         if isinstance(guids1, int):
928             guids1 = [guids1]
929         if isinstance(guids2, int):
930             guids2 = [guids2]
931
932         for guid1 in guids1:
933             rm = self.get_resource(guid)
934             rm.set_with_conditions(name, value, guids2, state, time)
935
936     def deploy(self, guids = None, wait_all_ready = True, group = None):
937         """ Deploys all ResourceManagers in the guids list. 
938         
939         If the argument 'guids' is not given, all RMs with state NEW
940         are deployed.
941
942             :param guids: List of guids of RMs to deploy
943             :type guids: list
944
945             :param wait_all_ready: Wait until all RMs are ready in
946                 order to start the RMs
947             :type guid: int
948
949             :param group: Id of deployment group in which to deploy RMs
950             :type group: int
951
952         """
953         self.logger.debug(" ------- DEPLOY START ------ ")
954
955         if not guids:
956             # If no guids list was passed, all 'NEW' RMs will be deployed
957             guids = []
958             for guid, rm in self._resources.iteritems():
959                 if rm.state == ResourceState.NEW:
960                     guids.append(guid)
961                 
962         if isinstance(guids, int):
963             guids = [guids]
964
965         # Create deployment group
966         # New guids can be added to a same deployment group later on
967         new_group = False
968         if not group:
969             new_group = True
970             group = self._group_id_generator.next()
971
972         if group not in self._groups:
973             self._groups[group] = []
974
975         self._groups[group].extend(guids)
976
977         def wait_all_and_start(group):
978             # Function that checks if all resources are READY
979             # before scheduling a start_with_conditions for each RM
980             reschedule = False
981             
982             # Get all guids in group
983             guids = self._groups[group]
984
985             for guid in guids:
986                 if self.state(guid) < ResourceState.READY:
987                     reschedule = True
988                     break
989
990             if reschedule:
991                 callback = functools.partial(wait_all_and_start, group)
992                 self.schedule("1s", callback)
993             else:
994                 # If all resources are ready, we schedule the start
995                 for guid in guids:
996                     rm = self.get_resource(guid)
997                     self.schedule("0s", rm.start_with_conditions)
998
999                     if rm.conditions.get(ResourceAction.STOP):
1000                         # Only if the RM has STOP conditions we
1001                         # schedule a stop. Otherwise the RM will stop immediately
1002                         self.schedule("0s", rm.stop_with_conditions)
1003
1004         if wait_all_ready and new_group:
1005             # Schedule a function to check that all resources are
1006             # READY, and only then schedule the start.
1007             # This aims at reducing the number of tasks looping in the 
1008             # scheduler. 
1009             # Instead of having many start tasks, we will have only one for 
1010             # the whole group.
1011             callback = functools.partial(wait_all_and_start, group)
1012             self.schedule("0s", callback)
1013
1014         for guid in guids:
1015             rm = self.get_resource(guid)
1016             rm.deployment_group = group
1017             self.schedule("0s", rm.deploy_with_conditions)
1018
1019             if not wait_all_ready:
1020                 self.schedule("0s", rm.start_with_conditions)
1021
1022                 if rm.conditions.get(ResourceAction.STOP):
1023                     # Only if the RM has STOP conditions we
1024                     # schedule a stop. Otherwise the RM will stop immediately
1025                     self.schedule("0s", rm.stop_with_conditions)
1026
1027     def release(self, guids = None):
1028         """ Releases all ResourceManagers in the guids list.
1029
1030         If the argument 'guids' is not given, all RMs registered
1031         in the experiment are released.
1032
1033             :param guids: List of RM guids
1034             :type guids: list
1035
1036         """
1037         if self._state == ECState.RELEASED:
1038             return 
1039
1040         if isinstance(guids, int):
1041             guids = [guids]
1042
1043         if not guids:
1044             guids = self.resources
1045
1046         for guid in guids:
1047             rm = self.get_resource(guid)
1048             self.schedule("0s", rm.release)
1049
1050         self.wait_released(guids)
1051
1052         if self.persist:
1053             self.save()
1054
1055         for guid in guids:
1056             if self.get(guid, "hardRelease"):
1057                 self.remove_resource(guid)\
1058
1059         # Mark the EC state as RELEASED
1060         self._state = ECState.RELEASED
1061         
1062     def shutdown(self):
1063         """ Releases all resources and stops the ExperimentController
1064
1065         """
1066         # If there was a major failure we can't exit gracefully
1067         if self._state == ECState.FAILED:
1068             raise RuntimeError("EC failure. Can not exit gracefully")
1069
1070         # Remove all pending tasks from the scheduler queue
1071         for tid in list(self._scheduler.pending):
1072             self._scheduler.remove(tid)
1073
1074         # Remove pending tasks from the workers queue
1075         self._runner.empty()
1076
1077         self.release()
1078
1079         # Mark the EC state as TERMINATED
1080         self._state = ECState.TERMINATED
1081
1082         # Stop processing thread
1083         self._stop = True
1084
1085         # Notify condition to wake up the processing thread
1086         self._notify()
1087         
1088         if self._thread.is_alive():
1089            self._thread.join()
1090
1091     def schedule(self, date, callback, track = False):
1092         """ Schedules a callback to be executed at time 'date'.
1093
1094             :param date: string containing execution time for the task.
1095                     It can be expressed as an absolute time, using
1096                     timestamp format, or as a relative time matching
1097                     ^\d+.\d+(h|m|s|ms|us)$
1098
1099             :param callback: code to be executed for the task. Must be a
1100                         Python function, and receives args and kwargs
1101                         as arguments.
1102
1103             :param track: if set to True, the task will be retrievable with
1104                     the get_task() method
1105
1106             :return : The Id of the task
1107             :rtype: int
1108             
1109         """
1110         timestamp = stabsformat(date)
1111         task = Task(timestamp, callback)
1112         task = self._scheduler.schedule(task)
1113
1114         if track:
1115             self._tasks[task.id] = task
1116
1117         # Notify condition to wake up the processing thread
1118         self._notify()
1119
1120         return task.id
1121      
1122     def _process(self):
1123         """ Process scheduled tasks.
1124
1125         .. note::
1126         
1127         Tasks are scheduled by invoking the schedule method with a target 
1128         callback and an execution time. 
1129         The schedule method creates a new Task object with that callback 
1130         and execution time, and pushes it into the '_scheduler' queue. 
1131         The execution time and the order of arrival of tasks are used 
1132         to order the tasks in the queue.
1133
1134         The _process method is executed in an independent thread held by 
1135         the ExperimentController for as long as the experiment is running.
1136         This method takes tasks from the '_scheduler' queue in a loop 
1137         and processes them in parallel using multithreading. 
1138         The environmental variable NEPI_NTHREADS can be used to control
1139         the number of threads used to process tasks. The default value is 
1140         50.
1141
1142         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1143         This object keeps a pool of threads (workers), and a queue of tasks
1144         scheduled for 'immediate' execution. 
1145         
1146         On each iteration, the '_process' loop will take the next task that 
1147         is scheduled for 'future' execution from the '_scheduler' queue, 
1148         and if the execution time of that task is >= to the current time, 
1149         it will push that task into the PR for 'immediate execution'. 
1150         As soon as a worker is free, the PR will assign the next task to
1151         that worker.
1152
1153         Upon receiving a task to execute, each PR worker (thread) will 
1154         invoke the  _execute method of the EC, passing the task as 
1155         argument.         
1156         The _execute method will then invoke task.callback inside a 
1157         try/except block. If an exception is raised by the tasks.callback, 
1158         it will be trapped by the try block, logged to standard error 
1159         (usually the console), and the task will be marked as failed.
1160
1161         """
1162
1163         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1164         self._runner = ParallelRun(maxthreads = self.nthreads)
1165         self._runner.start()
1166
1167         while not self._stop:
1168             try:
1169                 self._cond.acquire()
1170
1171                 task = self._scheduler.next()
1172                 
1173                 if not task:
1174                     # No task to execute. Wait for a new task to be scheduled.
1175                     self._cond.wait()
1176                 else:
1177                     # The task timestamp is in the future. Wait for timeout 
1178                     # or until another task is scheduled.
1179                     now = tnow()
1180                     if now < task.timestamp:
1181                         # Calculate timeout in seconds
1182                         timeout = tdiffsec(task.timestamp, now)
1183
1184                         # Re-schedule task with the same timestamp
1185                         self._scheduler.schedule(task)
1186                         
1187                         task = None
1188
1189                         # Wait timeout or until a new task awakes the condition
1190                         self._cond.wait(timeout)
1191                
1192                 self._cond.release()
1193
1194                 if task:
1195                     # Process tasks in parallel
1196                     self._runner.put(self._execute, task)
1197             except: 
1198                 import traceback
1199                 err = traceback.format_exc()
1200                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1201
1202                 # Set the EC to FAILED state 
1203                 self._state = ECState.FAILED
1204             
1205                 # Set the FailureManager failure level to EC failure
1206                 self._fm.set_ec_failure()
1207
1208         self.logger.debug("Exiting the task processing loop ... ")
1209         
1210         self._runner.sync()
1211         self._runner.destroy()
1212
1213     def _execute(self, task):
1214         """ Executes a single task. 
1215
1216             :param task: Object containing the callback to execute
1217             :type task: Task
1218
1219         """
1220         try:
1221             # Invoke callback
1222             task.result = task.callback()
1223             task.status = TaskStatus.DONE
1224         except:
1225             import traceback
1226             err = traceback.format_exc()
1227             task.result = err
1228             task.status = TaskStatus.ERROR
1229             
1230             self.logger.error("Error occurred while executing task: %s" % err)
1231
1232     def _notify(self):
1233         """ Awakes the processing thread if it is blocked waiting 
1234         for new tasks to arrive
1235         
1236         """
1237         self._cond.acquire()
1238         self._cond.notify()
1239         self._cond.release()
1240
1241     def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
1242             **kwargs):
1243         """ Automates experiment description using a NetGraph instance.
1244         """
1245         self._netgraph = NetGraph(**kwargs)
1246
1247         if add_node_callback:
1248             ### Add resources to the EC
1249             for nid in self.netgraph.nodes():
1250                 add_node_callback(self, nid)
1251
1252         if add_edge_callback:
1253             #### Add connections between resources
1254             for nid1, nid2 in self.netgraph.edges():
1255                 add_edge_callback(self, nid1, nid2)
1256