Adding NS3 FDNetDevice RM
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29 from nepi.util.netgraph import NetGraph, TopologyType 
30
31 # TODO: use multiprocessing instead of threading
32 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
33
34 import functools
35 import logging
36 import os
37 import sys
38 import tempfile
39 import time
40 import threading
41 import weakref
42
43 class FailureLevel(object):
44     """ Possible failure states for the experiment """
45     OK = 1
46     RM_FAILURE = 2
47     EC_FAILURE = 3
48
49 class FailureManager(object):
50     """ The FailureManager is responsible for handling errors
51     and deciding whether an experiment should be aborted or not
52     """
53
54     def __init__(self):
55         self._ec = None
56         self._failure_level = FailureLevel.OK
57         self._abort = False
58
59     def set_ec(self, ec):
60         self._ec = weakref.ref(ec)
61
62     @property
63     def ec(self):
64         """ Returns the ExperimentController associated to this FailureManager 
65         """
66         return self._ec()
67
68     @property
69     def abort(self):
70         return self._abort
71
72     def eval_failure(self, guid):
73         """ Implements failure policy and sets the abort state of the
74         experiment based on the failure state and criticality of
75         the RM
76
77         :param guid: Guid of the RM upon which the failure of the experiment
78             is evaluated
79         :type guid: int
80
81         """
82         if self._failure_level == FailureLevel.OK:
83             rm = self.ec.get_resource(guid)
84             state = rm.state
85             critical = rm.get("critical")
86
87             if state == ResourceState.FAILED and critical:
88                 self._failure_level = FailureLevel.RM_FAILURE
89                 self._abort = True
90                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
91                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
92
93     def set_ec_failure(self):
94         self._failure_level = FailureLevel.EC_FAILURE
95
96 class ECState(object):
97     """ Possible states of the ExperimentController
98    
99     """
100     RUNNING = 1
101     FAILED = 2
102     RELEASED = 3
103     TERMINATED = 4
104
105 class ExperimentController(object):
106     """
107     .. note::
108
109     An experiment, or scenario, is defined by a concrete set of resources,
110     and the behavior, configuration and interconnection of those resources. 
111     The Experiment Description (ED) is a detailed representation of a
112     single experiment. It contains all the necessary information to 
113     allow repeating the experiment. NEPI allows to describe
114     experiments by registering components (resources), configuring them
115     and interconnecting them.
116     
117     A same experiment (scenario) can be executed many times, generating 
118     different results. We call an experiment execution (instance) a 'run'.
119
120     The ExperimentController (EC), is the entity responsible of
121     managing an experiment run. The same scenario can be 
122     recreated (and re-run) by instantiating an EC and recreating 
123     the same experiment description. 
124
125     An experiment is represented as a graph of interconnected
126     resources. A resource is a generic concept in the sense that any
127     component taking part of an experiment, whether physical of
128     virtual, is considered a resource. A resources could be a host, 
129     a virtual machine, an application, a simulator, a IP address.
130
131     A ResourceManager (RM), is the entity responsible for managing a 
132     single resource. ResourceManagers are specific to a resource
133     type (i.e. An RM to control a Linux application will not be
134     the same as the RM used to control a ns-3 simulation).
135     To support a new type of resource, a new RM must be implemented. 
136     NEPI already provides a variety of RMs to control basic resources, 
137     and new can be extended from the existing ones.
138
139     Through the EC interface the user can create ResourceManagers (RMs),
140     configure them and interconnect them, to describe an experiment.
141     Describing an experiment through the EC does not run the experiment.
142     Only when the 'deploy()' method is invoked on the EC, the EC will take 
143     actions to transform the 'described' experiment into a 'running' experiment.
144
145     While the experiment is running, it is possible to continue to
146     create/configure/connect RMs, and to deploy them to involve new
147     resources in the experiment (this is known as 'interactive' deployment).
148     
149     An experiments in NEPI is identified by a string id, 
150     which is either given by the user, or automatically generated by NEPI.  
151     The purpose of this identifier is to separate files and results that 
152     belong to different experiment scenarios. 
153     However, since a same 'experiment' can be run many times, the experiment
154     id is not enough to identify an experiment instance (run).
155     For this reason, the ExperimentController has two identifier, the 
156     exp_id, which can be re-used in different ExperimentController,
157     and the run_id, which is unique to one ExperimentController instance, and
158     is automatically generated by NEPI.
159    
160     """
161
162     @classmethod
163     def load(cls, filepath, format = SFormats.XML):
164         serializer = ECSerializer()
165         ec = serializer.load(filepath)
166         return ec
167
168     def __init__(self, exp_id = None, local_dir = None, persist = False,
169             fm = None, add_node_callback = None, add_edge_callback = None, 
170             **kwargs):
171         """ ExperimentController entity to model an execute a network 
172         experiment.
173         
174         :param exp_id: Human readable name to identify the experiment
175         :type exp_id: str
176
177         :param local_dir: Path to local directory where to store experiment
178             related files
179         :type local_dir: str
180
181         :param persist: Save an XML description of the experiment after 
182         completion at local_dir
183         :type persist: bool
184
185         :param fm: FailureManager object. If None is given, the default 
186             FailureManager class will be used
187         :type fm: FailureManager
188
189         :param add_node_callback: Callback to invoke for node instantiation
190         when automatic topology creation mode is used 
191         :type add_node_callback: function
192
193         :param add_edge_callback: Callback to invoke for edge instantiation 
194         when automatic topology creation mode is used 
195         :type add_edge_callback: function
196
197         """
198         super(ExperimentController, self).__init__()
199
200         # Logging
201         self._logger = logging.getLogger("ExperimentController")
202
203         # Run identifier. It identifies a concrete execution instance (run) 
204         # of an experiment.
205         # Since a same experiment (same configuration) can be executed many 
206         # times, this run_id permits to separate result files generated on 
207         # different experiment executions
208         self._run_id = tsformat()
209
210         # Experiment identifier. Usually assigned by the user
211         # Identifies the experiment scenario (i.e. configuration, 
212         # resources used, etc)
213         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
214
215         # Local path where to store experiment related files (results, etc)
216         if not local_dir:
217             local_dir = tempfile.gettempdir() # /tmp
218
219         self._local_dir = local_dir
220         self._exp_dir = os.path.join(local_dir, self.exp_id)
221         self._run_dir = os.path.join(self.exp_dir, self.run_id)
222
223         # If True persist the experiment controller in XML format, after completion
224         self._persist = persist
225
226         # generator of globally unique ids
227         self._guid_generator = guid.GuidGenerator()
228         
229         # Resource managers
230         self._resources = dict()
231
232         # Scheduler. It a queue that holds tasks scheduled for
233         # execution, and yields the next task to be executed 
234         # ordered by execution and arrival time
235         self._scheduler = HeapScheduler()
236
237         # Tasks
238         self._tasks = dict()
239
240         # RM groups (for deployment) 
241         self._groups = dict()
242
243         # generator of globally unique id for groups
244         self._group_id_generator = guid.GuidGenerator()
245
246         # Flag to stop processing thread
247         self._stop = False
248     
249         # Entity in charge of managing system failures
250         if not fm:
251             self._fm = FailureManager()
252         self._fm.set_ec(self)
253
254         # EC state
255         self._state = ECState.RUNNING
256
257         # Automatically construct experiment description 
258         self._netgraph = None
259         if add_node_callback or add_edge_callback or kwargs.get("topology"):
260             self._build_from_netgraph(add_node_callback, add_edge_callback, 
261                     **kwargs)
262
263         # The runner is a pool of threads used to parallelize 
264         # execution of tasks
265         self._nthreads = 20
266         self._runner = None
267
268         # Event processing thread
269         self._cond = threading.Condition()
270         self._thread = threading.Thread(target = self._process)
271         self._thread.setDaemon(True)
272         self._thread.start()
273         
274     @property
275     def logger(self):
276         """ Returns the logger instance of the Experiment Controller
277
278         """
279         return self._logger
280
281     @property
282     def failure_level(self):
283         """ Returns the level of FAILURE of th experiment
284
285         """
286
287         return self._fm._failure_level
288
289     @property
290     def ecstate(self):
291         """ Returns the state of the Experiment Controller
292
293         """
294         return self._state
295
296     @property
297     def exp_id(self):
298         """ Returns the experiment id assigned by the user
299
300         """
301         return self._exp_id
302
303     @property
304     def run_id(self):
305         """ Returns the experiment instance (run) identifier (automatically 
306         generated)
307
308         """
309         return self._run_id
310
311     @property
312     def nthreads(self):
313         """ Returns the number of processing nthreads used
314
315         """
316         return self._nthreads
317
318     @property
319     def local_dir(self):
320         """ Root local directory for experiment files
321
322         """
323         return self._local_dir
324
325     @property
326     def exp_dir(self):
327         """ Local directory to store results and other files related to the 
328         experiment.
329
330         """
331         return self._exp_dir
332
333     @property
334     def run_dir(self):
335         """ Local directory to store results and other files related to the 
336         experiment run.
337
338         """
339         return self._run_dir
340
341     @property
342     def persist(self):
343         """ If True, persists the ExperimentController to XML format upon 
344         experiment completion
345
346         """
347         return self._persist
348
349     @property
350     def netgraph(self):
351         """ Return NetGraph instance if experiment description was automatically 
352         generated
353
354         """
355         return self._netgraph
356
357     @property
358     def abort(self):
359         """ Returns True if the experiment has failed and should be interrupted,
360         False otherwise.
361
362         """
363         return self._fm.abort
364
365     def inform_failure(self, guid):
366         """ Reports a failure in a RM to the EC for evaluation
367
368             :param guid: Resource id
369             :type guid: int
370
371         """
372
373         return self._fm.eval_failure(guid)
374
375     def wait_finished(self, guids):
376         """ Blocking method that waits until all RMs in the 'guids' list 
377         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
378         RELEASED ), or until a failure in the experiment occurs 
379         (i.e. abort == True) 
380         
381             :param guids: List of guids
382             :type guids: list
383
384         """
385
386         def quit():
387             return self.abort
388
389         return self.wait(guids, state = ResourceState.STOPPED, 
390                 quit = quit)
391
392     def wait_started(self, guids):
393         """ Blocking method that waits until all RMs in the 'guids' list 
394         have reached a state >= STARTED, or until a failure in the 
395         experiment occurs (i.e. abort == True) 
396         
397             :param guids: List of guids
398             :type guids: list
399
400         """
401
402         def quit():
403             return self.abort
404
405         return self.wait(guids, state = ResourceState.STARTED, 
406                 quit = quit)
407
408     def wait_released(self, guids):
409         """ Blocking method that waits until all RMs in the 'guids' list 
410         have reached a state == RELEASED, or until the EC fails 
411         
412             :param guids: List of guids
413             :type guids: list
414
415         """
416
417         def quit():
418             return self._state == ECState.FAILED
419
420         return self.wait(guids, state = ResourceState.RELEASED, 
421                 quit = quit)
422
423     def wait_deployed(self, guids):
424         """ Blocking method that waits until all RMs in the 'guids' list 
425         have reached a state >= READY, or until a failure in the 
426         experiment occurs (i.e. abort == True) 
427         
428             :param guids: List of guids
429             :type guids: list
430
431         """
432
433         def quit():
434             return self.abort
435
436         return self.wait(guids, state = ResourceState.READY, 
437                 quit = quit)
438
439     def wait(self, guids, state, quit):
440         """ Blocking method that waits until all RMs in the 'guids' list 
441         have reached a state >= 'state', or until the 'quit' callback
442         yields True
443            
444             :param guids: List of guids
445             :type guids: list
446         
447         """
448         if isinstance(guids, int):
449             guids = [guids]
450
451         # Make a copy to avoid modifying the original guids list
452         guids = list(guids)
453
454         while True:
455             # If there are no more guids to wait for
456             # or the quit function returns True, exit the loop
457             if len(guids) == 0 or quit():
458                 break
459
460             # If a guid reached one of the target states, remove it from list
461             guid = guids.pop()
462             rm = self.get_resource(guid)
463             rstate = rm.state
464             
465             if rstate >= state:
466                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
467                     rm.get_rtype(), guid, rstate, state))
468             else:
469                 # Debug...
470                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
471                     guid, rstate, state))
472
473                 guids.append(guid)
474
475                 time.sleep(0.5)
476
477     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
478         plotter = ECPlotter()
479         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
480                 show = show)
481         return fpath
482
483     def serialize(self, format = SFormats.XML):
484         serializer = ECSerializer()
485         sec = serializer.load(self, format = format)
486         return sec
487
488     def save(self, dirpath = None, format = SFormats.XML):
489         if dirpath == None:
490             dirpath = self.run_dir
491
492         try:
493             os.makedirs(dirpath)
494         except OSError:
495             pass
496
497         serializer = ECSerializer()
498         path = serializer.save(self, dirpath, format = format)
499         return path
500
501     def get_task(self, tid):
502         """ Returns a task by its id
503
504             :param tid: Id of the task
505             :type tid: int
506             
507             :rtype: Task
508             
509         """
510         return self._tasks.get(tid)
511
512     def get_resource(self, guid):
513         """ Returns a registered ResourceManager by its guid
514
515             :param guid: Id of the resource
516             :type guid: int
517             
518             :rtype: ResourceManager
519             
520         """
521         rm = self._resources.get(guid)
522         return rm
523
524     def get_resources_by_type(self, rtype):
525         """ Returns the ResourceManager objects of type rtype
526
527             :param rtype: Resource type
528             :type rtype: string
529             
530             :rtype: list of ResourceManagers
531             
532         """
533         rms = []
534         for guid, rm in self._resources.iteritems():
535             if rm.get_rtype() == rtype: 
536                 rms.append(rm)
537         return rms
538
539     def remove_resource(self, guid):
540         del self._resources[guid]
541
542     @property
543     def resources(self):
544         """ Returns the guids of all ResourceManagers 
545
546             :return: Set of all RM guids
547             :rtype: list
548
549         """
550         keys = self._resources.keys()
551
552         return keys
553
554     def filter_resources(self, rtype):
555         """ Returns the guids of all ResourceManagers of type rtype
556
557             :param rtype: Resource type
558             :type rtype: string
559             
560             :rtype: list of guids
561             
562         """
563         rms = []
564         for guid, rm in self._resources.iteritems():
565             if rm.get_rtype() == rtype: 
566                 rms.append(rm.guid)
567         return rms
568
569     def register_resource(self, rtype, guid = None):
570         """ Registers a new ResourceManager of type 'rtype' in the experiment
571         
572         This method will assign a new 'guid' for the RM, if no guid
573         is specified.
574
575             :param rtype: Type of the RM
576             :type rtype: str
577
578             :return: Guid of the RM
579             :rtype: int
580             
581         """
582         # Get next available guid
583         guid = self._guid_generator.next(guid)
584         
585         # Instantiate RM
586         rm = ResourceFactory.create(rtype, self, guid)
587
588         # Store RM
589         self._resources[guid] = rm
590
591         return guid
592
593     def get_attributes(self, guid):
594         """ Returns all the attributes of the RM with guid 'guid'
595
596             :param guid: Guid of the RM
597             :type guid: int
598
599             :return: List of attributes
600             :rtype: list
601
602         """
603         rm = self.get_resource(guid)
604         return rm.get_attributes()
605
606     def get_attribute(self, guid, name):
607         """ Returns the attribute 'name' of the RM with guid 'guid'
608
609             :param guid: Guid of the RM
610             :type guid: int
611
612             :param name: Name of the attribute
613             :type name: str
614
615             :return: The attribute with name 'name'
616             :rtype: Attribute
617
618         """
619         rm = self.get_resource(guid)
620         return rm.get_attribute(name)
621
622     def register_connection(self, guid1, guid2):
623         """ Registers a connection between a RM with guid 'guid1'
624         and another RM with guid 'guid2'. 
625     
626         The order of the in which the two guids are provided is not
627         important, since the connection relationship is symmetric.
628
629             :param guid1: First guid to connect
630             :type guid1: ResourceManager
631
632             :param guid2: Second guid to connect
633             :type guid: ResourceManager
634
635         """
636         rm1 = self.get_resource(guid1)
637         rm2 = self.get_resource(guid2)
638
639         rm1.register_connection(guid2)
640         rm2.register_connection(guid1)
641
642     def register_condition(self, guids1, action, guids2, state,
643             time = None):
644         """ Registers an action START, STOP or DEPLOY for all RM on list
645         guids1 to occur at time 'time' after all elements in list guids2 
646         have reached state 'state'.
647
648             :param guids1: List of guids of RMs subjected to action
649             :type guids1: list
650
651             :param action: Action to perform (either START, STOP or DEPLOY)
652             :type action: ResourceAction
653
654             :param guids2: List of guids of RMs to we waited for
655             :type guids2: list
656
657             :param state: State to wait for on RMs of list guids2 (STARTED,
658                 STOPPED, etc)
659             :type state: ResourceState
660
661             :param time: Time to wait after guids2 has reached status 
662             :type time: string
663
664         """
665         if isinstance(guids1, int):
666             guids1 = [guids1]
667         if isinstance(guids2, int):
668             guids2 = [guids2]
669
670         for guid1 in guids1:
671             rm = self.get_resource(guid1)
672             rm.register_condition(action, guids2, state, time)
673
674     def enable_trace(self, guid, name):
675         """ Enables a trace to be collected during the experiment run
676
677             :param name: Name of the trace
678             :type name: str
679
680         """
681         rm = self.get_resource(guid)
682         rm.enable_trace(name)
683
684     def trace_enabled(self, guid, name):
685         """ Returns True if the trace of name 'name' is enabled
686
687             :param name: Name of the trace
688             :type name: str
689
690         """
691         rm = self.get_resource(guid)
692         return rm.trace_enabled(name)
693
694     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
695         """ Returns information on a collected trace, the trace stream or 
696         blocks (chunks) of the trace stream
697
698             :param name: Name of the trace
699             :type name: str
700
701             :param attr: Can be one of:
702                          - TraceAttr.ALL (complete trace content), 
703                          - TraceAttr.STREAM (block in bytes to read starting 
704                                 at offset),
705                          - TraceAttr.PATH (full path to the trace file),
706                          - TraceAttr.SIZE (size of trace file). 
707             :type attr: str
708
709             :param block: Number of bytes to retrieve from trace, when attr is 
710                 TraceAttr.STREAM 
711             :type name: int
712
713             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
714             :type name: int
715
716             :rtype: str
717
718         """
719         rm = self.get_resource(guid)
720         return rm.trace(name, attr, block, offset)
721
722     def get_traces(self, guid):
723         """ Returns the list of the trace names of the RM with guid 'guid'
724
725             :param guid: Guid of the RM
726             :type guid: int
727
728             :return: List of trace names
729             :rtype: list
730
731         """
732         rm = self.get_resource(guid)
733         return rm.get_traces()
734
735
736     def discover(self, guid):
737         """ Discovers an available resource matching the criteria defined
738         by the RM with guid 'guid', and associates that resource to the RM
739
740         Not all RM types require (or are capable of) performing resource 
741         discovery. For the RM types which are not capable of doing so, 
742         invoking this method does not have any consequences. 
743
744             :param guid: Guid of the RM
745             :type guid: int
746
747         """
748         rm = self.get_resource(guid)
749         return rm.discover()
750
751     def provision(self, guid):
752         """ Provisions the resource associated to the RM with guid 'guid'.
753
754         Provisioning means making a resource 'accessible' to the user. 
755         Not all RM types require (or are capable of) performing resource 
756         provisioning. For the RM types which are not capable of doing so, 
757         invoking this method does not have any consequences. 
758
759             :param guid: Guid of the RM
760             :type guid: int
761
762         """
763         rm = self.get_resource(guid)
764         return rm.provision()
765
766     def get(self, guid, name):
767         """ Returns the value of the attribute with name 'name' on the
768         RM with guid 'guid'
769
770             :param guid: Guid of the RM
771             :type guid: int
772
773             :param name: Name of the attribute 
774             :type name: str
775
776             :return: The value of the attribute with name 'name'
777
778         """
779         rm = self.get_resource(guid)
780         return rm.get(name)
781
782     def set(self, guid, name, value):
783         """ Modifies the value of the attribute with name 'name' on the 
784         RM with guid 'guid'.
785
786             :param guid: Guid of the RM
787             :type guid: int
788
789             :param name: Name of the attribute
790             :type name: str
791
792             :param value: Value of the attribute
793
794         """
795         rm = self.get_resource(guid)
796         rm.set(name, value)
797
798     def get_global(self, rtype, name):
799         """ Returns the value of the global attribute with name 'name' on the
800         RMs of rtype 'rtype'.
801
802             :param guid: Guid of the RM
803             :type guid: int
804
805             :param name: Name of the attribute 
806             :type name: str
807
808             :return: The value of the attribute with name 'name'
809
810         """
811         rclass = ResourceFactory.get_resource_type(rtype)
812         return rclass.get_global(name)
813
814     def set_global(self, rtype, name, value):
815         """ Modifies the value of the global attribute with name 'name' on the 
816         RMs of with rtype 'rtype'.
817
818             :param guid: Guid of the RM
819             :type guid: int
820
821             :param name: Name of the attribute
822             :type name: str
823
824             :param value: Value of the attribute
825
826         """
827         rclass = ResourceFactory.get_resource_type(rtype)
828         return rclass.set_global(name, value)
829
830     def state(self, guid, hr = False):
831         """ Returns the state of a resource
832
833             :param guid: Resource guid
834             :type guid: integer
835
836             :param hr: Human readable. Forces return of a 
837                 status string instead of a number 
838             :type hr: boolean
839
840         """
841         rm = self.get_resource(guid)
842         state = rm.state
843
844         if hr:
845             return ResourceState2str.get(state)
846
847         return state
848
849     def stop(self, guid):
850         """ Stops the RM with guid 'guid'
851
852         Stopping a RM means that the resource it controls will
853         no longer take part of the experiment.
854
855             :param guid: Guid of the RM
856             :type guid: int
857
858         """
859         rm = self.get_resource(guid)
860         return rm.stop()
861
862     def start(self, guid):
863         """ Starts the RM with guid 'guid'
864
865         Starting a RM means that the resource it controls will
866         begin taking part of the experiment.
867
868             :param guid: Guid of the RM
869             :type guid: int
870
871         """
872         rm = self.get_resource(guid)
873         return rm.start()
874
875     def get_start_time(self, guid):
876         """ Returns the start time of the RM as a timestamp """
877         rm = self.get_resource(guid)
878         return rm.start_time
879
880     def get_stop_time(self, guid):
881         """ Returns the stop time of the RM as a timestamp """
882         rm = self.get_resource(guid)
883         return rm.stop_time
884
885     def get_discover_time(self, guid):
886         """ Returns the discover time of the RM as a timestamp """
887         rm = self.get_resource(guid)
888         return rm.discover_time
889
890     def get_provision_time(self, guid):
891         """ Returns the provision time of the RM as a timestamp """
892         rm = self.get_resource(guid)
893         return rm.provision_time
894
895     def get_ready_time(self, guid):
896         """ Returns the deployment time of the RM as a timestamp """
897         rm = self.get_resource(guid)
898         return rm.ready_time
899
900     def get_release_time(self, guid):
901         """ Returns the release time of the RM as a timestamp """
902         rm = self.get_resource(guid)
903         return rm.release_time
904
905     def get_failed_time(self, guid):
906         """ Returns the time failure occured for the RM as a timestamp """
907         rm = self.get_resource(guid)
908         return rm.failed_time
909
910     def set_with_conditions(self, name, value, guids1, guids2, state,
911             time = None):
912         """ Modifies the value of attribute with name 'name' on all RMs 
913         on the guids1 list when time 'time' has elapsed since all 
914         elements in guids2 list have reached state 'state'.
915
916             :param name: Name of attribute to set in RM
917             :type name: string
918
919             :param value: Value of attribute to set in RM
920             :type name: string
921
922             :param guids1: List of guids of RMs subjected to action
923             :type guids1: list
924
925             :param action: Action to register (either START or STOP)
926             :type action: ResourceAction
927
928             :param guids2: List of guids of RMs to we waited for
929             :type guids2: list
930
931             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
932             :type state: ResourceState
933
934             :param time: Time to wait after guids2 has reached status 
935             :type time: string
936
937         """
938         if isinstance(guids1, int):
939             guids1 = [guids1]
940         if isinstance(guids2, int):
941             guids2 = [guids2]
942
943         for guid1 in guids1:
944             rm = self.get_resource(guid)
945             rm.set_with_conditions(name, value, guids2, state, time)
946
947     def deploy(self, guids = None, wait_all_ready = True, group = None):
948         """ Deploys all ResourceManagers in the guids list. 
949         
950         If the argument 'guids' is not given, all RMs with state NEW
951         are deployed.
952
953             :param guids: List of guids of RMs to deploy
954             :type guids: list
955
956             :param wait_all_ready: Wait until all RMs are ready in
957                 order to start the RMs
958             :type guid: int
959
960             :param group: Id of deployment group in which to deploy RMs
961             :type group: int
962
963         """
964         self.logger.debug(" ------- DEPLOY START ------ ")
965
966         if not guids:
967             # If no guids list was passed, all 'NEW' RMs will be deployed
968             guids = []
969             for guid, rm in self._resources.iteritems():
970                 if rm.state == ResourceState.NEW:
971                     guids.append(guid)
972                 
973         if isinstance(guids, int):
974             guids = [guids]
975
976         # Create deployment group
977         # New guids can be added to a same deployment group later on
978         new_group = False
979         if not group:
980             new_group = True
981             group = self._group_id_generator.next()
982
983         if group not in self._groups:
984             self._groups[group] = []
985
986         self._groups[group].extend(guids)
987
988         def wait_all_and_start(group):
989             # Function that checks if all resources are READY
990             # before scheduling a start_with_conditions for each RM
991             reschedule = False
992             
993             # Get all guids in group
994             guids = self._groups[group]
995
996             for guid in guids:
997                 if self.state(guid) < ResourceState.READY:
998                     reschedule = True
999                     break
1000
1001             if reschedule:
1002                 callback = functools.partial(wait_all_and_start, group)
1003                 self.schedule("1s", callback)
1004             else:
1005                 # If all resources are ready, we schedule the start
1006                 for guid in guids:
1007                     rm = self.get_resource(guid)
1008                     self.schedule("0s", rm.start_with_conditions)
1009
1010                     if rm.conditions.get(ResourceAction.STOP):
1011                         # Only if the RM has STOP conditions we
1012                         # schedule a stop. Otherwise the RM will stop immediately
1013                         self.schedule("0s", rm.stop_with_conditions)
1014
1015         if wait_all_ready and new_group:
1016             # Schedule a function to check that all resources are
1017             # READY, and only then schedule the start.
1018             # This aims at reducing the number of tasks looping in the 
1019             # scheduler. 
1020             # Instead of having many start tasks, we will have only one for 
1021             # the whole group.
1022             callback = functools.partial(wait_all_and_start, group)
1023             self.schedule("0s", callback)
1024
1025         for guid in guids:
1026             rm = self.get_resource(guid)
1027             rm.deployment_group = group
1028             self.schedule("0s", rm.deploy_with_conditions)
1029
1030             if not wait_all_ready:
1031                 self.schedule("0s", rm.start_with_conditions)
1032
1033                 if rm.conditions.get(ResourceAction.STOP):
1034                     # Only if the RM has STOP conditions we
1035                     # schedule a stop. Otherwise the RM will stop immediately
1036                     self.schedule("0s", rm.stop_with_conditions)
1037
1038     def release(self, guids = None):
1039         """ Releases all ResourceManagers in the guids list.
1040
1041         If the argument 'guids' is not given, all RMs registered
1042         in the experiment are released.
1043
1044             :param guids: List of RM guids
1045             :type guids: list
1046
1047         """
1048         if self._state == ECState.RELEASED:
1049             return 
1050
1051         if isinstance(guids, int):
1052             guids = [guids]
1053
1054         if not guids:
1055             guids = self.resources
1056
1057         for guid in guids:
1058             rm = self.get_resource(guid)
1059             self.schedule("0s", rm.release)
1060
1061         self.wait_released(guids)
1062
1063         if self.persist:
1064             self.save()
1065
1066         for guid in guids:
1067             if self.get(guid, "hardRelease"):
1068                 self.remove_resource(guid)\
1069
1070         # Mark the EC state as RELEASED
1071         self._state = ECState.RELEASED
1072         
1073     def shutdown(self):
1074         """ Releases all resources and stops the ExperimentController
1075
1076         """
1077         # If there was a major failure we can't exit gracefully
1078         if self._state == ECState.FAILED:
1079             raise RuntimeError("EC failure. Can not exit gracefully")
1080
1081         # Remove all pending tasks from the scheduler queue
1082         for tid in list(self._scheduler.pending):
1083             self._scheduler.remove(tid)
1084
1085         # Remove pending tasks from the workers queue
1086         self._runner.empty()
1087
1088         self.release()
1089
1090         # Mark the EC state as TERMINATED
1091         self._state = ECState.TERMINATED
1092
1093         # Stop processing thread
1094         self._stop = True
1095
1096         # Notify condition to wake up the processing thread
1097         self._notify()
1098         
1099         if self._thread.is_alive():
1100            self._thread.join()
1101
1102     def schedule(self, date, callback, track = False):
1103         """ Schedules a callback to be executed at time 'date'.
1104
1105             :param date: string containing execution time for the task.
1106                     It can be expressed as an absolute time, using
1107                     timestamp format, or as a relative time matching
1108                     ^\d+.\d+(h|m|s|ms|us)$
1109
1110             :param callback: code to be executed for the task. Must be a
1111                         Python function, and receives args and kwargs
1112                         as arguments.
1113
1114             :param track: if set to True, the task will be retrievable with
1115                     the get_task() method
1116
1117             :return : The Id of the task
1118             :rtype: int
1119             
1120         """
1121         timestamp = stabsformat(date)
1122         task = Task(timestamp, callback)
1123         task = self._scheduler.schedule(task)
1124
1125         if track:
1126             self._tasks[task.id] = task
1127
1128         # Notify condition to wake up the processing thread
1129         self._notify()
1130
1131         return task.id
1132      
1133     def _process(self):
1134         """ Process scheduled tasks.
1135
1136         .. note::
1137         
1138         Tasks are scheduled by invoking the schedule method with a target 
1139         callback and an execution time. 
1140         The schedule method creates a new Task object with that callback 
1141         and execution time, and pushes it into the '_scheduler' queue. 
1142         The execution time and the order of arrival of tasks are used 
1143         to order the tasks in the queue.
1144
1145         The _process method is executed in an independent thread held by 
1146         the ExperimentController for as long as the experiment is running.
1147         This method takes tasks from the '_scheduler' queue in a loop 
1148         and processes them in parallel using multithreading. 
1149         The environmental variable NEPI_NTHREADS can be used to control
1150         the number of threads used to process tasks. The default value is 
1151         50.
1152
1153         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1154         This object keeps a pool of threads (workers), and a queue of tasks
1155         scheduled for 'immediate' execution. 
1156         
1157         On each iteration, the '_process' loop will take the next task that 
1158         is scheduled for 'future' execution from the '_scheduler' queue, 
1159         and if the execution time of that task is >= to the current time, 
1160         it will push that task into the PR for 'immediate execution'. 
1161         As soon as a worker is free, the PR will assign the next task to
1162         that worker.
1163
1164         Upon receiving a task to execute, each PR worker (thread) will 
1165         invoke the  _execute method of the EC, passing the task as 
1166         argument.         
1167         The _execute method will then invoke task.callback inside a 
1168         try/except block. If an exception is raised by the tasks.callback, 
1169         it will be trapped by the try block, logged to standard error 
1170         (usually the console), and the task will be marked as failed.
1171
1172         """
1173
1174         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1175         self._runner = ParallelRun(maxthreads = self.nthreads)
1176         self._runner.start()
1177
1178         while not self._stop:
1179             try:
1180                 self._cond.acquire()
1181
1182                 task = self._scheduler.next()
1183                 
1184                 if not task:
1185                     # No task to execute. Wait for a new task to be scheduled.
1186                     self._cond.wait()
1187                 else:
1188                     # The task timestamp is in the future. Wait for timeout 
1189                     # or until another task is scheduled.
1190                     now = tnow()
1191                     if now < task.timestamp:
1192                         # Calculate timeout in seconds
1193                         timeout = tdiffsec(task.timestamp, now)
1194
1195                         # Re-schedule task with the same timestamp
1196                         self._scheduler.schedule(task)
1197                         
1198                         task = None
1199
1200                         # Wait timeout or until a new task awakes the condition
1201                         self._cond.wait(timeout)
1202                
1203                 self._cond.release()
1204
1205                 if task:
1206                     # Process tasks in parallel
1207                     self._runner.put(self._execute, task)
1208             except: 
1209                 import traceback
1210                 err = traceback.format_exc()
1211                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1212
1213                 # Set the EC to FAILED state 
1214                 self._state = ECState.FAILED
1215             
1216                 # Set the FailureManager failure level to EC failure
1217                 self._fm.set_ec_failure()
1218
1219         self.logger.debug("Exiting the task processing loop ... ")
1220         
1221         self._runner.sync()
1222         self._runner.destroy()
1223
1224     def _execute(self, task):
1225         """ Executes a single task. 
1226
1227             :param task: Object containing the callback to execute
1228             :type task: Task
1229
1230         """
1231         try:
1232             # Invoke callback
1233             task.result = task.callback()
1234             task.status = TaskStatus.DONE
1235         except:
1236             import traceback
1237             err = traceback.format_exc()
1238             task.result = err
1239             task.status = TaskStatus.ERROR
1240             
1241             self.logger.error("Error occurred while executing task: %s" % err)
1242
1243     def _notify(self):
1244         """ Awakes the processing thread if it is blocked waiting 
1245         for new tasks to arrive
1246         
1247         """
1248         self._cond.acquire()
1249         self._cond.notify()
1250         self._cond.release()
1251
1252     def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
1253             **kwargs):
1254         """ Automates experiment description using a NetGraph instance.
1255         """
1256         self._netgraph = NetGraph(**kwargs)
1257
1258         if add_node_callback:
1259             ### Add resources to the EC
1260             for nid in self.netgraph.nodes():
1261                 add_node_callback(self, nid)
1262
1263         if add_edge_callback:
1264             #### Add connections between resources
1265             for nid1, nid2 in self.netgraph.edges():
1266                 add_edge_callback(self, nid1, nid2)
1267