Ignoring error on NETNSWrapper when NETNS is not instaled
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import sys
35 import time
36 import threading
37 import weakref
38
39 class FailureLevel(object):
40     """ Describes the system failure state """
41     OK = 1
42     RM_FAILURE = 2
43     EC_FAILURE = 3
44
45 class FailureManager(object):
46     """ The FailureManager is responsible for handling errors
47     and deciding whether an experiment should be aborted or not
48
49     """
50
51     def __init__(self, ec):
52         self._ec = weakref.ref(ec)
53         self._failure_level = FailureLevel.OK
54
55     @property
56     def ec(self):
57         """ Returns the ExperimentController associated to this FailureManager 
58         
59         """
60         
61         return self._ec()
62
63     @property
64     def abort(self):
65         if self._failure_level == FailureLevel.OK:
66             for guid in self.ec.resources:
67                 try:
68                     state = self.ec.state(guid)
69                     critical = self.ec.get(guid, "critical")
70                     if state == ResourceState.FAILED and critical:
71                         self._failure_level = FailureLevel.RM_FAILURE
72                         self.ec.logger.debug("RM critical failure occurred on guid %d." \
73                                 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
74                         break
75                 except:
76                     # An error might occure because a RM was deleted abruptly.
77                     # In this case the error should be ignored.
78                     if guid in self.ec._resources:
79                         raise
80
81         return self._failure_level != FailureLevel.OK
82
83     def set_ec_failure(self):
84         self._failure_level = FailureLevel.EC_FAILURE
85
86 class ECState(object):
87     """ Possible states for an ExperimentController
88    
89     """
90     RUNNING = 1
91     FAILED = 2
92     TERMINATED = 3
93
94 class ExperimentController(object):
95     """
96     .. class:: Class Args :
97       
98         :param exp_id: Human readable identifier for the experiment scenario. 
99         :type exp_id: str
100
101     .. note::
102
103     An experiment, or scenario, is defined by a concrete set of resources,
104     behavior, configuration and interconnection of those resources. 
105     The Experiment Description (ED) is a detailed representation of a
106     single experiment. It contains all the necessary information to 
107     allow repeating the experiment. NEPI allows to describe
108     experiments by registering components (resources), configuring them
109     and interconnecting them.
110     
111     A same experiment (scenario) can be executed many times, generating 
112     different results. We call an experiment execution (instance) a 'run'.
113
114     The ExperimentController (EC), is the entity responsible of
115     managing an experiment run. The same scenario can be 
116     recreated (and re-run) by instantiating an EC and recreating 
117     the same experiment description. 
118
119     In NEPI, an experiment is represented as a graph of interconnected
120     resources. A resource is a generic concept in the sense that any
121     component taking part of an experiment, whether physical of
122     virtual, is considered a resource. A resources could be a host, 
123     a virtual machine, an application, a simulator, a IP address.
124
125     A ResourceManager (RM), is the entity responsible for managing a 
126     single resource. ResourceManagers are specific to a resource
127     type (i.e. An RM to control a Linux application will not be
128     the same as the RM used to control a ns-3 simulation).
129     To support a new type of resource in NEPI, a new RM must be 
130     implemented. NEPI already provides a variety of
131     RMs to control basic resources, and new can be extended from
132     the existing ones.
133
134     Through the EC interface the user can create ResourceManagers (RMs),
135     configure them and interconnect them, to describe an experiment.
136     Describing an experiment through the EC does not run the experiment.
137     Only when the 'deploy()' method is invoked on the EC, the EC will take 
138     actions to transform the 'described' experiment into a 'running' experiment.
139
140     While the experiment is running, it is possible to continue to
141     create/configure/connect RMs, and to deploy them to involve new
142     resources in the experiment (this is known as 'interactive' deployment).
143     
144     An experiments in NEPI is identified by a string id, 
145     which is either given by the user, or automatically generated by NEPI.  
146     The purpose of this identifier is to separate files and results that 
147     belong to different experiment scenarios. 
148     However, since a same 'experiment' can be run many times, the experiment
149     id is not enough to identify an experiment instance (run).
150     For this reason, the ExperimentController has two identifier, the 
151     exp_id, which can be re-used in different ExperimentController,
152     and the run_id, which is unique to one ExperimentController instance, and
153     is automatically generated by NEPI.
154         
155     """
156
157     def __init__(self, exp_id = None): 
158         super(ExperimentController, self).__init__()
159
160         # Logging
161         self._logger = logging.getLogger("ExperimentController")
162
163         # Run identifier. It identifies a concrete execution instance (run) 
164         # of an experiment.
165         # Since a same experiment (same configuration) can be executed many 
166         # times, this run_id permits to separate result files generated on 
167         # different experiment executions
168         self._run_id = tsformat()
169
170         # Experiment identifier. Usually assigned by the user
171         # Identifies the experiment scenario (i.e. configuration, 
172         # resources used, etc)
173         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
174
175         # generator of globally unique ids
176         self._guid_generator = guid.GuidGenerator()
177         
178         # Resource managers
179         self._resources = dict()
180
181         # Scheduler. It a queue that holds tasks scheduled for
182         # execution, and yields the next task to be executed 
183         # ordered by execution and arrival time
184         self._scheduler = HeapScheduler()
185
186         # Tasks
187         self._tasks = dict()
188
189         # RM groups (for deployment) 
190         self._groups = dict()
191
192         # generator of globally unique id for groups
193         self._group_id_generator = guid.GuidGenerator()
194
195         # Flag to stop processing thread
196         self._stop = False
197     
198         # Entity in charge of managing system failures
199         self._fm = FailureManager(self)
200
201         # EC state
202         self._state = ECState.RUNNING
203
204         # The runner is a pool of threads used to parallelize 
205         # execution of tasks
206         nthreads = int(os.environ.get("NEPI_NTHREADS", "20"))
207         self._runner = ParallelRun(maxthreads = nthreads)
208
209         # Event processing thread
210         self._cond = threading.Condition()
211         self._thread = threading.Thread(target = self._process)
212         self._thread.setDaemon(True)
213         self._thread.start()
214         
215     @property
216     def logger(self):
217         """ Returns the logger instance of the Experiment Controller
218
219         """
220         return self._logger
221
222     @property
223     def failure_level(self):
224         """ Returns the level of FAILURE of th experiment
225
226         """
227
228         return self._fm._failure_level
229
230     @property
231     def ecstate(self):
232         """ Returns the state of the Experiment Controller
233
234         """
235         return self._state
236
237     @property
238     def exp_id(self):
239         """ Returns the experiment id assigned by the user
240
241         """
242         return self._exp_id
243
244     @property
245     def run_id(self):
246         """ Returns the experiment instance (run) identifier (automatically 
247         generated)
248
249         """
250         return self._run_id
251
252     @property
253     def abort(self):
254         """ Returns True if the experiment has failed and should be interrupted,
255         False otherwise.
256
257         """
258         return self._fm.abort
259
260     def wait_finished(self, guids):
261         """ Blocking method that waits until all RMs in the 'guids' list 
262         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
263         RELEASED ), or until a failure in the experiment occurs 
264         (i.e. abort == True) 
265         
266             :param guids: List of guids
267             :type guids: list
268
269         """
270
271         def quit():
272             return self.abort
273
274         return self.wait(guids, state = ResourceState.STOPPED, 
275                 quit = quit)
276
277     def wait_started(self, guids):
278         """ Blocking method that waits until all RMs in the 'guids' list 
279         have reached a state >= STARTED, or until a failure in the 
280         experiment occurs (i.e. abort == True) 
281         
282             :param guids: List of guids
283             :type guids: list
284
285         """
286
287         def quit():
288             return self.abort
289
290         return self.wait(guids, state = ResourceState.STARTED, 
291                 quit = quit)
292
293     def wait_released(self, guids):
294         """ Blocking method that waits until all RMs in the 'guids' list 
295         have reached a state == RELEASED, or until the EC fails 
296         
297             :param guids: List of guids
298             :type guids: list
299
300         """
301
302         def quit():
303             return self._state == ECState.FAILED
304
305         return self.wait(guids, state = ResourceState.RELEASED, 
306                 quit = quit)
307
308     def wait_deployed(self, guids):
309         """ Blocking method that waits until all RMs in the 'guids' list 
310         have reached a state >= READY, or until a failure in the 
311         experiment occurs (i.e. abort == True) 
312         
313             :param guids: List of guids
314             :type guids: list
315
316         """
317
318         def quit():
319             return self.abort
320
321         return self.wait(guids, state = ResourceState.READY, 
322                 quit = quit)
323
324     def wait(self, guids, state, quit):
325         """ Blocking method that waits until all RMs in the 'guids' list 
326         have reached a state >= 'state', or until the 'quit' callback
327         yields True
328            
329             :param guids: List of guids
330             :type guids: list
331         
332         """
333         if isinstance(guids, int):
334             guids = [guids]
335
336         # Make a copy to avoid modifying the original guids list
337         guids = list(guids)
338
339         while True:
340             # If there are no more guids to wait for
341             # or the quit function returns True, exit the loop
342             if len(guids) == 0 or quit():
343                 break
344
345             # If a guid reached one of the target states, remove it from list
346             guid = guids[0]
347             rstate = self.state(guid)
348             
349             hrrstate = ResourceState2str.get(rstate)
350             hrstate = ResourceState2str.get(state)
351
352             if rstate >= state:
353                 guids.remove(guid)
354                 rm = self.get_resource(guid)
355                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
356                     rm.get_rtype(), guid, hrrstate, hrstate))
357             else:
358                 # Debug...
359                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
360                     guid, hrrstate, hrstate))
361                 time.sleep(0.5)
362   
363     def get_task(self, tid):
364         """ Returns a task by its id
365
366             :param tid: Id of the task
367             :type tid: int
368             
369             :rtype: Task
370             
371         """
372         return self._tasks.get(tid)
373
374     def get_resource(self, guid):
375         """ Returns a registered ResourceManager by its guid
376
377             :param guid: Id of the task
378             :type guid: int
379             
380             :rtype: ResourceManager
381             
382         """
383         rm = self._resources.get(guid)
384         return rm
385
386     def remove_resource(self, guid):
387         del self._resources[guid]
388
389     @property
390     def resources(self):
391         """ Returns the set() of guids of all the ResourceManager
392
393             :return: Set of all RM guids
394             :rtype: set
395
396         """
397         keys = self._resources.keys()
398
399         return keys
400
401     def register_resource(self, rtype, guid = None):
402         """ Registers a new ResourceManager of type 'rtype' in the experiment
403         
404         This method will assign a new 'guid' for the RM, if no guid
405         is specified.
406
407             :param rtype: Type of the RM
408             :type rtype: str
409
410             :return: Guid of the RM
411             :rtype: int
412             
413         """
414         # Get next available guid
415         guid = self._guid_generator.next(guid)
416         
417         # Instantiate RM
418         rm = ResourceFactory.create(rtype, self, guid)
419
420         # Store RM
421         self._resources[guid] = rm
422
423         return guid
424
425     def get_attributes(self, guid):
426         """ Returns all the attributes of the RM with guid 'guid'
427
428             :param guid: Guid of the RM
429             :type guid: int
430
431             :return: List of attributes
432             :rtype: list
433
434         """
435         rm = self.get_resource(guid)
436         return rm.get_attributes()
437
438     def get_attribute(self, guid, name):
439         """ Returns the attribute 'name' of the RM with guid 'guid'
440
441             :param guid: Guid of the RM
442             :type guid: int
443
444             :param name: Name of the attribute
445             :type name: str
446
447             :return: The attribute with name 'name'
448             :rtype: Attribute
449
450         """
451         rm = self.get_resource(guid)
452         return rm.get_attribute(name)
453
454     def register_connection(self, guid1, guid2):
455         """ Registers a connection between a RM with guid 'guid1'
456         and another RM with guid 'guid2'. 
457     
458         The order of the in which the two guids are provided is not
459         important, since the connection relationship is symmetric.
460
461             :param guid1: First guid to connect
462             :type guid1: ResourceManager
463
464             :param guid2: Second guid to connect
465             :type guid: ResourceManager
466
467         """
468         rm1 = self.get_resource(guid1)
469         rm2 = self.get_resource(guid2)
470
471         rm1.register_connection(guid2)
472         rm2.register_connection(guid1)
473
474     def register_condition(self, guids1, action, guids2, state,
475             time = None):
476         """ Registers an action START, STOP or DEPLOY for all RM on list
477         guids1 to occur at time 'time' after all elements in list guids2 
478         have reached state 'state'.
479
480             :param guids1: List of guids of RMs subjected to action
481             :type guids1: list
482
483             :param action: Action to perform (either START, STOP or DEPLOY)
484             :type action: ResourceAction
485
486             :param guids2: List of guids of RMs to we waited for
487             :type guids2: list
488
489             :param state: State to wait for on RMs of list guids2 (STARTED,
490                 STOPPED, etc)
491             :type state: ResourceState
492
493             :param time: Time to wait after guids2 has reached status 
494             :type time: string
495
496         """
497         if isinstance(guids1, int):
498             guids1 = [guids1]
499         if isinstance(guids2, int):
500             guids2 = [guids2]
501
502         for guid1 in guids1:
503             rm = self.get_resource(guid1)
504             rm.register_condition(action, guids2, state, time)
505
506     def enable_trace(self, guid, name):
507         """ Enables a trace to be collected during the experiment run
508
509             :param name: Name of the trace
510             :type name: str
511
512         """
513         rm = self.get_resource(guid)
514         rm.enable_trace(name)
515
516     def trace_enabled(self, guid, name):
517         """ Returns True if the trace of name 'name' is enabled
518
519             :param name: Name of the trace
520             :type name: str
521
522         """
523         rm = self.get_resource(guid)
524         return rm.trace_enabled(name)
525
526     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
527         """ Returns information on a collected trace, the trace stream or 
528         blocks (chunks) of the trace stream
529
530             :param name: Name of the trace
531             :type name: str
532
533             :param attr: Can be one of:
534                          - TraceAttr.ALL (complete trace content), 
535                          - TraceAttr.STREAM (block in bytes to read starting 
536                                 at offset),
537                          - TraceAttr.PATH (full path to the trace file),
538                          - TraceAttr.SIZE (size of trace file). 
539             :type attr: str
540
541             :param block: Number of bytes to retrieve from trace, when attr is 
542                 TraceAttr.STREAM 
543             :type name: int
544
545             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
546             :type name: int
547
548             :rtype: str
549
550         """
551         rm = self.get_resource(guid)
552         return rm.trace(name, attr, block, offset)
553
554     def get_traces(self, guid):
555         """ Returns the list of the trace names of the RM with guid 'guid'
556
557             :param guid: Guid of the RM
558             :type guid: int
559
560             :return: List of trace names
561             :rtype: list
562
563         """
564         rm = self.get_resource(guid)
565         return rm.get_traces()
566
567
568     def discover(self, guid):
569         """ Discovers an available resource matching the criteria defined
570         by the RM with guid 'guid', and associates that resource to the RM
571
572         Not all RM types require (or are capable of) performing resource 
573         discovery. For the RM types which are not capable of doing so, 
574         invoking this method does not have any consequences. 
575
576             :param guid: Guid of the RM
577             :type guid: int
578
579         """
580         rm = self.get_resource(guid)
581         return rm.discover()
582
583     def provision(self, guid):
584         """ Provisions the resource associated to the RM with guid 'guid'.
585
586         Provisioning means making a resource 'accessible' to the user. 
587         Not all RM types require (or are capable of) performing resource 
588         provisioning. For the RM types which are not capable of doing so, 
589         invoking this method does not have any consequences. 
590
591             :param guid: Guid of the RM
592             :type guid: int
593
594         """
595         rm = self.get_resource(guid)
596         return rm.provision()
597
598     def get(self, guid, name):
599         """ Returns the value of the attribute with name 'name' on the
600         RM with guid 'guid'
601
602             :param guid: Guid of the RM
603             :type guid: int
604
605             :param name: Name of the attribute 
606             :type name: str
607
608             :return: The value of the attribute with name 'name'
609
610         """
611         rm = self.get_resource(guid)
612         return rm.get(name)
613
614     def set(self, guid, name, value):
615         """ Modifies the value of the attribute with name 'name' on the 
616         RM with guid 'guid'.
617
618             :param guid: Guid of the RM
619             :type guid: int
620
621             :param name: Name of the attribute
622             :type name: str
623
624             :param value: Value of the attribute
625
626         """
627         rm = self.get_resource(guid)
628         rm.set(name, value)
629
630     def get_global(self, rtype, name):
631         """ Returns the value of the global attribute with name 'name' on the
632         RMs of rtype 'rtype'.
633
634             :param guid: Guid of the RM
635             :type guid: int
636
637             :param name: Name of the attribute 
638             :type name: str
639
640             :return: The value of the attribute with name 'name'
641
642         """
643         rclass = ResourceFactory.get_resource_type(rtype)
644         return rclass.get_global(name)
645
646     def set_global(self, rtype, name, value):
647         """ Modifies the value of the global attribute with name 'name' on the 
648         RMs of with rtype 'rtype'.
649
650             :param guid: Guid of the RM
651             :type guid: int
652
653             :param name: Name of the attribute
654             :type name: str
655
656             :param value: Value of the attribute
657
658         """
659         rclass = ResourceFactory.get_resource_type(rtype)
660         return rclass.set_global(name, value)
661
662     def state(self, guid, hr = False):
663         """ Returns the state of a resource
664
665             :param guid: Resource guid
666             :type guid: integer
667
668             :param hr: Human readable. Forces return of a 
669                 status string instead of a number 
670             :type hr: boolean
671
672         """
673         rm = self.get_resource(guid)
674         state = rm.state
675
676         if hr:
677             return ResourceState2str.get(state)
678
679         return state
680
681     def stop(self, guid):
682         """ Stops the RM with guid 'guid'
683
684         Stopping a RM means that the resource it controls will
685         no longer take part of the experiment.
686
687             :param guid: Guid of the RM
688             :type guid: int
689
690         """
691         rm = self.get_resource(guid)
692         return rm.stop()
693
694     def start(self, guid):
695         """ Starts the RM with guid 'guid'
696
697         Starting a RM means that the resource it controls will
698         begin taking part of the experiment.
699
700             :param guid: Guid of the RM
701             :type guid: int
702
703         """
704         rm = self.get_resource(guid)
705         return rm.start()
706
707     def get_start_time(self, guid):
708         """ Returns the start time of the RM as a timestamp """
709         rm = self.get_resource(guid)
710         return rm.start_time
711
712     def get_stop_time(self, guid):
713         """ Returns the stop time of the RM as a timestamp """
714         rm = self.get_resource(guid)
715         return rm.stop_time
716
717     def get_discover_time(self, guid):
718         """ Returns the discover time of the RM as a timestamp """
719         rm = self.get_resource(guid)
720         return rm.discover_time
721
722     def get_provision_time(self, guid):
723         """ Returns the provision time of the RM as a timestamp """
724         rm = self.get_resource(guid)
725         return rm.provision_time
726
727     def get_ready_time(self, guid):
728         """ Returns the deployment time of the RM as a timestamp """
729         rm = self.get_resource(guid)
730         return rm.ready_time
731
732     def get_release_time(self, guid):
733         """ Returns the release time of the RM as a timestamp """
734         rm = self.get_resource(guid)
735         return rm.release_time
736
737     def get_failed_time(self, guid):
738         """ Returns the time failure occured for the RM as a timestamp """
739         rm = self.get_resource(guid)
740         return rm.failed_time
741
742     def set_with_conditions(self, name, value, guids1, guids2, state,
743             time = None):
744         """ Modifies the value of attribute with name 'name' on all RMs 
745         on the guids1 list when time 'time' has elapsed since all 
746         elements in guids2 list have reached state 'state'.
747
748             :param name: Name of attribute to set in RM
749             :type name: string
750
751             :param value: Value of attribute to set in RM
752             :type name: string
753
754             :param guids1: List of guids of RMs subjected to action
755             :type guids1: list
756
757             :param action: Action to register (either START or STOP)
758             :type action: ResourceAction
759
760             :param guids2: List of guids of RMs to we waited for
761             :type guids2: list
762
763             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
764             :type state: ResourceState
765
766             :param time: Time to wait after guids2 has reached status 
767             :type time: string
768
769         """
770         if isinstance(guids1, int):
771             guids1 = [guids1]
772         if isinstance(guids2, int):
773             guids2 = [guids2]
774
775         for guid1 in guids1:
776             rm = self.get_resource(guid)
777             rm.set_with_conditions(name, value, guids2, state, time)
778
779     def deploy(self, guids = None, wait_all_ready = True, group = None):
780         """ Deploys all ResourceManagers in the guids list. 
781         
782         If the argument 'guids' is not given, all RMs with state NEW
783         are deployed.
784
785             :param guids: List of guids of RMs to deploy
786             :type guids: list
787
788             :param wait_all_ready: Wait until all RMs are ready in
789                 order to start the RMs
790             :type guid: int
791
792             :param group: Id of deployment group in which to deploy RMs
793             :type group: int
794
795         """
796         self.logger.debug(" ------- DEPLOY START ------ ")
797
798         if not guids:
799             # If no guids list was passed, all 'NEW' RMs will be deployed
800             guids = []
801             for guid in self.resources:
802                 if self.state(guid) == ResourceState.NEW:
803                     guids.append(guid)
804                 
805         if isinstance(guids, int):
806             guids = [guids]
807
808         # Create deployment group
809         # New guids can be added to a same deployment group later on
810         new_group = False
811         if not group:
812             new_group = True
813             group = self._group_id_generator.next()
814
815         if group not in self._groups:
816             self._groups[group] = []
817
818         self._groups[group].extend(guids)
819
820         def wait_all_and_start(group):
821             # Function that checks if all resources are READY
822             # before scheduling a start_with_conditions for each RM
823             reschedule = False
824             
825             # Get all guids in group
826             guids = self._groups[group]
827
828             for guid in guids:
829                 if self.state(guid) < ResourceState.READY:
830                     reschedule = True
831                     break
832
833             if reschedule:
834                 callback = functools.partial(wait_all_and_start, group)
835                 self.schedule("1s", callback)
836             else:
837                 # If all resources are ready, we schedule the start
838                 for guid in guids:
839                     rm = self.get_resource(guid)
840                     self.schedule("0s", rm.start_with_conditions)
841
842                     if rm.conditions.get(ResourceAction.STOP):
843                         # Only if the RM has STOP conditions we
844                         # schedule a stop. Otherwise the RM will stop immediately
845                         self.schedule("0s", rm.stop_with_conditions)
846
847         if wait_all_ready and new_group:
848             # Schedule a function to check that all resources are
849             # READY, and only then schedule the start.
850             # This aims at reducing the number of tasks looping in the 
851             # scheduler. 
852             # Instead of having many start tasks, we will have only one for 
853             # the whole group.
854             callback = functools.partial(wait_all_and_start, group)
855             self.schedule("0s", callback)
856
857         for guid in guids:
858             rm = self.get_resource(guid)
859             rm.deployment_group = group
860             self.schedule("0s", rm.deploy_with_conditions)
861
862             if not wait_all_ready:
863                 self.schedule("0s", rm.start_with_conditions)
864
865                 if rm.conditions.get(ResourceAction.STOP):
866                     # Only if the RM has STOP conditions we
867                     # schedule a stop. Otherwise the RM will stop immediately
868                     self.schedule("0s", rm.stop_with_conditions)
869
870     def release(self, guids = None):
871         """ Releases all ResourceManagers in the guids list.
872
873         If the argument 'guids' is not given, all RMs registered
874         in the experiment are released.
875
876             :param guids: List of RM guids
877             :type guids: list
878
879         """
880         if isinstance(guids, int):
881             guids = [guids]
882
883         if not guids:
884             guids = self.resources
885
886         for guid in guids:
887             rm = self.get_resource(guid)
888             self.schedule("0s", rm.release)
889
890         self.wait_released(guids)
891
892         for guid in guids:
893             if self.get(guid, "hardRelease"):
894                 self.remove_resource(guid)
895         
896     def shutdown(self):
897         """ Releases all resources and stops the ExperimentController
898
899         """
900         # If there was a major failure we can't exit gracefully
901         if self._state == ECState.FAILED:
902             raise RuntimeError("EC failure. Can not exit gracefully")
903
904         # Remove all pending tasks from the scheduler queue
905         for tid in list(self._scheduler.pending):
906             self._scheduler.remove(tid)
907
908         # Remove pending tasks from the workers queue
909         self._runner.empty()
910
911         self.release()
912
913         # Mark the EC state as TERMINATED
914         self._state = ECState.TERMINATED
915
916         # Stop processing thread
917         self._stop = True
918
919         # Notify condition to wake up the processing thread
920         self._notify()
921         
922         if self._thread.is_alive():
923            self._thread.join()
924
925     def schedule(self, date, callback, track = False):
926         """ Schedules a callback to be executed at time 'date'.
927
928             :param date: string containing execution time for the task.
929                     It can be expressed as an absolute time, using
930                     timestamp format, or as a relative time matching
931                     ^\d+.\d+(h|m|s|ms|us)$
932
933             :param callback: code to be executed for the task. Must be a
934                         Python function, and receives args and kwargs
935                         as arguments.
936
937             :param track: if set to True, the task will be retrievable with
938                     the get_task() method
939
940             :return : The Id of the task
941             :rtype: int
942             
943         """
944         timestamp = stabsformat(date)
945         task = Task(timestamp, callback)
946         task = self._scheduler.schedule(task)
947
948         if track:
949             self._tasks[task.id] = task
950
951         # Notify condition to wake up the processing thread
952         self._notify()
953
954         return task.id
955      
956     def _process(self):
957         """ Process scheduled tasks.
958
959         .. note::
960         
961         Tasks are scheduled by invoking the schedule method with a target 
962         callback and an execution time. 
963         The schedule method creates a new Task object with that callback 
964         and execution time, and pushes it into the '_scheduler' queue. 
965         The execution time and the order of arrival of tasks are used 
966         to order the tasks in the queue.
967
968         The _process method is executed in an independent thread held by 
969         the ExperimentController for as long as the experiment is running.
970         This method takes tasks from the '_scheduler' queue in a loop 
971         and processes them in parallel using multithreading. 
972         The environmental variable NEPI_NTHREADS can be used to control
973         the number of threads used to process tasks. The default value is 
974         50.
975
976         To execute tasks in parallel, a ParallelRunner (PR) object is used.
977         This object keeps a pool of threads (workers), and a queue of tasks
978         scheduled for 'immediate' execution. 
979         
980         On each iteration, the '_process' loop will take the next task that 
981         is scheduled for 'future' execution from the '_scheduler' queue, 
982         and if the execution time of that task is >= to the current time, 
983         it will push that task into the PR for 'immediate execution'. 
984         As soon as a worker is free, the PR will assign the next task to
985         that worker.
986
987         Upon receiving a task to execute, each PR worker (thread) will 
988         invoke the  _execute method of the EC, passing the task as 
989         argument.         
990         The _execute method will then invoke task.callback inside a 
991         try/except block. If an exception is raised by the tasks.callback, 
992         it will be trapped by the try block, logged to standard error 
993         (usually the console), and the task will be marked as failed.
994
995         """
996
997         self._runner.start()
998
999         while not self._stop:
1000             try:
1001                 self._cond.acquire()
1002
1003                 task = self._scheduler.next()
1004                 
1005                 if not task:
1006                     # No task to execute. Wait for a new task to be scheduled.
1007                     self._cond.wait()
1008                 else:
1009                     # The task timestamp is in the future. Wait for timeout 
1010                     # or until another task is scheduled.
1011                     now = tnow()
1012                     if now < task.timestamp:
1013                         # Calculate timeout in seconds
1014                         timeout = tdiffsec(task.timestamp, now)
1015
1016                         # Re-schedule task with the same timestamp
1017                         self._scheduler.schedule(task)
1018                         
1019                         task = None
1020
1021                         # Wait timeout or until a new task awakes the condition
1022                         self._cond.wait(timeout)
1023                
1024                 self._cond.release()
1025
1026                 if task:
1027                     # Process tasks in parallel
1028                     self._runner.put(self._execute, task)
1029             except: 
1030                 import traceback
1031                 err = traceback.format_exc()
1032                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1033
1034                 # Set the EC to FAILED state 
1035                 self._state = ECState.FAILED
1036             
1037                 # Set the FailureManager failure level to EC failure
1038                 self._fm.set_ec_failure()
1039
1040         self.logger.debug("Exiting the task processing loop ... ")
1041         
1042         self._runner.sync()
1043         self._runner.destroy()
1044
1045     def _execute(self, task):
1046         """ Executes a single task. 
1047
1048             :param task: Object containing the callback to execute
1049             :type task: Task
1050
1051         """
1052         try:
1053             # Invoke callback
1054             task.result = task.callback()
1055             task.status = TaskStatus.DONE
1056         except:
1057             import traceback
1058             err = traceback.format_exc()
1059             task.result = err
1060             task.status = TaskStatus.ERROR
1061             
1062             self.logger.error("Error occurred while executing task: %s" % err)
1063
1064     def _notify(self):
1065         """ Awakes the processing thread if it is blocked waiting 
1066         for new tasks to arrive
1067         
1068         """
1069         self._cond.acquire()
1070         self._cond.notify()
1071         self._cond.release()
1072