cf0e0bb701167cb810a07a589c95c38b77dbfa85
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import sys
35 import time
36 import threading
37 import weakref
38
39 class FailureLevel(object):
40     """ Describes the system failure state """
41     OK = 1
42     RM_FAILURE = 2
43     EC_FAILURE = 3
44
45 class FailureManager(object):
46     """ The FailureManager is responsible for handling errors
47     and deciding whether an experiment should be aborted or not
48
49     """
50
51     def __init__(self, ec):
52         self._ec = weakref.ref(ec)
53         self._failure_level = FailureLevel.OK
54         self._abort = False
55
56     @property
57     def ec(self):
58         """ Returns the ExperimentController associated to this FailureManager 
59         
60         """
61         
62         return self._ec()
63
64     @property
65     def abort(self):
66         return self._abort
67
68     def eval_failure(self, guid):
69         if self._failure_level == FailureLevel.OK:
70             rm = self.get_resource(guid)
71             state = rm.state
72             critical = rm.get("critical")
73
74             if state == ResourceState.FAILED and critical:
75                 self._failure_level = FailureLevel.RM_FAILURE
76                 self._abort = True
77                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
78                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
79
80     def set_ec_failure(self):
81         self._failure_level = FailureLevel.EC_FAILURE
82
83 class ECState(object):
84     """ Possible states for an ExperimentController
85    
86     """
87     RUNNING = 1
88     FAILED = 2
89     TERMINATED = 3
90
91 class ExperimentController(object):
92     """
93     .. class:: Class Args :
94       
95         :param exp_id: Human readable identifier for the experiment scenario. 
96         :type exp_id: str
97
98     .. note::
99
100     An experiment, or scenario, is defined by a concrete set of resources,
101     behavior, configuration and interconnection of those resources. 
102     The Experiment Description (ED) is a detailed representation of a
103     single experiment. It contains all the necessary information to 
104     allow repeating the experiment. NEPI allows to describe
105     experiments by registering components (resources), configuring them
106     and interconnecting them.
107     
108     A same experiment (scenario) can be executed many times, generating 
109     different results. We call an experiment execution (instance) a 'run'.
110
111     The ExperimentController (EC), is the entity responsible of
112     managing an experiment run. The same scenario can be 
113     recreated (and re-run) by instantiating an EC and recreating 
114     the same experiment description. 
115
116     In NEPI, an experiment is represented as a graph of interconnected
117     resources. A resource is a generic concept in the sense that any
118     component taking part of an experiment, whether physical of
119     virtual, is considered a resource. A resources could be a host, 
120     a virtual machine, an application, a simulator, a IP address.
121
122     A ResourceManager (RM), is the entity responsible for managing a 
123     single resource. ResourceManagers are specific to a resource
124     type (i.e. An RM to control a Linux application will not be
125     the same as the RM used to control a ns-3 simulation).
126     To support a new type of resource in NEPI, a new RM must be 
127     implemented. NEPI already provides a variety of
128     RMs to control basic resources, and new can be extended from
129     the existing ones.
130
131     Through the EC interface the user can create ResourceManagers (RMs),
132     configure them and interconnect them, to describe an experiment.
133     Describing an experiment through the EC does not run the experiment.
134     Only when the 'deploy()' method is invoked on the EC, the EC will take 
135     actions to transform the 'described' experiment into a 'running' experiment.
136
137     While the experiment is running, it is possible to continue to
138     create/configure/connect RMs, and to deploy them to involve new
139     resources in the experiment (this is known as 'interactive' deployment).
140     
141     An experiments in NEPI is identified by a string id, 
142     which is either given by the user, or automatically generated by NEPI.  
143     The purpose of this identifier is to separate files and results that 
144     belong to different experiment scenarios. 
145     However, since a same 'experiment' can be run many times, the experiment
146     id is not enough to identify an experiment instance (run).
147     For this reason, the ExperimentController has two identifier, the 
148     exp_id, which can be re-used in different ExperimentController,
149     and the run_id, which is unique to one ExperimentController instance, and
150     is automatically generated by NEPI.
151         
152     """
153
154     def __init__(self, exp_id = None): 
155         super(ExperimentController, self).__init__()
156
157         # Logging
158         self._logger = logging.getLogger("ExperimentController")
159
160         # Run identifier. It identifies a concrete execution instance (run) 
161         # of an experiment.
162         # Since a same experiment (same configuration) can be executed many 
163         # times, this run_id permits to separate result files generated on 
164         # different experiment executions
165         self._run_id = tsformat()
166
167         # Experiment identifier. Usually assigned by the user
168         # Identifies the experiment scenario (i.e. configuration, 
169         # resources used, etc)
170         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
171
172         # generator of globally unique ids
173         self._guid_generator = guid.GuidGenerator()
174         
175         # Resource managers
176         self._resources = dict()
177
178         # Scheduler. It a queue that holds tasks scheduled for
179         # execution, and yields the next task to be executed 
180         # ordered by execution and arrival time
181         self._scheduler = HeapScheduler()
182
183         # Tasks
184         self._tasks = dict()
185
186         # RM groups (for deployment) 
187         self._groups = dict()
188
189         # generator of globally unique id for groups
190         self._group_id_generator = guid.GuidGenerator()
191
192         # Flag to stop processing thread
193         self._stop = False
194     
195         # Entity in charge of managing system failures
196         self._fm = FailureManager(self)
197
198         # EC state
199         self._state = ECState.RUNNING
200
201         # The runner is a pool of threads used to parallelize 
202         # execution of tasks
203         nthreads = int(os.environ.get("NEPI_NTHREADS", "20"))
204         self._runner = ParallelRun(maxthreads = nthreads)
205
206         # Event processing thread
207         self._cond = threading.Condition()
208         self._thread = threading.Thread(target = self._process)
209         self._thread.setDaemon(True)
210         self._thread.start()
211         
212     @property
213     def logger(self):
214         """ Returns the logger instance of the Experiment Controller
215
216         """
217         return self._logger
218
219     @property
220     def failure_level(self):
221         """ Returns the level of FAILURE of th experiment
222
223         """
224
225         return self._fm._failure_level
226
227     @property
228     def ecstate(self):
229         """ Returns the state of the Experiment Controller
230
231         """
232         return self._state
233
234     @property
235     def exp_id(self):
236         """ Returns the experiment id assigned by the user
237
238         """
239         return self._exp_id
240
241     @property
242     def run_id(self):
243         """ Returns the experiment instance (run) identifier (automatically 
244         generated)
245
246         """
247         return self._run_id
248
249     @property
250     def abort(self):
251         """ Returns True if the experiment has failed and should be interrupted,
252         False otherwise.
253
254         """
255         return self._fm.abort
256
257     def inform_failure(self, guid):
258         """ Reports a failure in a RM to the EC for evaluation
259
260             :param guid: Resource id
261             :type guid: int
262
263         """
264
265         return self._fm.eval_failure(guid)
266
267     def wait_finished(self, guids):
268         """ Blocking method that waits until all RMs in the 'guids' list 
269         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
270         RELEASED ), or until a failure in the experiment occurs 
271         (i.e. abort == True) 
272         
273             :param guids: List of guids
274             :type guids: list
275
276         """
277
278         def quit():
279             return self.abort
280
281         return self.wait(guids, state = ResourceState.STOPPED, 
282                 quit = quit)
283
284     def wait_started(self, guids):
285         """ Blocking method that waits until all RMs in the 'guids' list 
286         have reached a state >= STARTED, or until a failure in the 
287         experiment occurs (i.e. abort == True) 
288         
289             :param guids: List of guids
290             :type guids: list
291
292         """
293
294         def quit():
295             return self.abort
296
297         return self.wait(guids, state = ResourceState.STARTED, 
298                 quit = quit)
299
300     def wait_released(self, guids):
301         """ Blocking method that waits until all RMs in the 'guids' list 
302         have reached a state == RELEASED, or until the EC fails 
303         
304             :param guids: List of guids
305             :type guids: list
306
307         """
308
309         def quit():
310             return self._state == ECState.FAILED
311
312         return self.wait(guids, state = ResourceState.RELEASED, 
313                 quit = quit)
314
315     def wait_deployed(self, guids):
316         """ Blocking method that waits until all RMs in the 'guids' list 
317         have reached a state >= READY, or until a failure in the 
318         experiment occurs (i.e. abort == True) 
319         
320             :param guids: List of guids
321             :type guids: list
322
323         """
324
325         def quit():
326             return self.abort
327
328         return self.wait(guids, state = ResourceState.READY, 
329                 quit = quit)
330
331     def wait(self, guids, state, quit):
332         """ Blocking method that waits until all RMs in the 'guids' list 
333         have reached a state >= 'state', or until the 'quit' callback
334         yields True
335            
336             :param guids: List of guids
337             :type guids: list
338         
339         """
340         if isinstance(guids, int):
341             guids = [guids]
342
343         # Make a copy to avoid modifying the original guids list
344         guids = list(guids)
345
346         while True:
347             # If there are no more guids to wait for
348             # or the quit function returns True, exit the loop
349             if len(guids) == 0 or quit():
350                 break
351
352             # If a guid reached one of the target states, remove it from list
353             guid = guids.pop()
354             rm = self.get_resource(guid)
355             rstate = rm.state
356             
357             if rstate >= state:
358                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
359                     rm.get_rtype(), guid, rstate, state))
360             else:
361                 # Debug...
362                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
363                     guid, rstate, state))
364
365                 guids.append(guid)
366
367                 time.sleep(0.5)
368   
369     def get_task(self, tid):
370         """ Returns a task by its id
371
372             :param tid: Id of the task
373             :type tid: int
374             
375             :rtype: Task
376             
377         """
378         return self._tasks.get(tid)
379
380     def get_resource(self, guid):
381         """ Returns a registered ResourceManager by its guid
382
383             :param guid: Id of the task
384             :type guid: int
385             
386             :rtype: ResourceManager
387             
388         """
389         rm = self._resources.get(guid)
390         return rm
391
392     def remove_resource(self, guid):
393         del self._resources[guid]
394
395     @property
396     def resources(self):
397         """ Returns the set() of guids of all the ResourceManager
398
399             :return: Set of all RM guids
400             :rtype: set
401
402         """
403         keys = self._resources.keys()
404
405         return keys
406
407     def register_resource(self, rtype, guid = None):
408         """ Registers a new ResourceManager of type 'rtype' in the experiment
409         
410         This method will assign a new 'guid' for the RM, if no guid
411         is specified.
412
413             :param rtype: Type of the RM
414             :type rtype: str
415
416             :return: Guid of the RM
417             :rtype: int
418             
419         """
420         # Get next available guid
421         guid = self._guid_generator.next(guid)
422         
423         # Instantiate RM
424         rm = ResourceFactory.create(rtype, self, guid)
425
426         # Store RM
427         self._resources[guid] = rm
428
429         return guid
430
431     def get_attributes(self, guid):
432         """ Returns all the attributes of the RM with guid 'guid'
433
434             :param guid: Guid of the RM
435             :type guid: int
436
437             :return: List of attributes
438             :rtype: list
439
440         """
441         rm = self.get_resource(guid)
442         return rm.get_attributes()
443
444     def get_attribute(self, guid, name):
445         """ Returns the attribute 'name' of the RM with guid 'guid'
446
447             :param guid: Guid of the RM
448             :type guid: int
449
450             :param name: Name of the attribute
451             :type name: str
452
453             :return: The attribute with name 'name'
454             :rtype: Attribute
455
456         """
457         rm = self.get_resource(guid)
458         return rm.get_attribute(name)
459
460     def register_connection(self, guid1, guid2):
461         """ Registers a connection between a RM with guid 'guid1'
462         and another RM with guid 'guid2'. 
463     
464         The order of the in which the two guids are provided is not
465         important, since the connection relationship is symmetric.
466
467             :param guid1: First guid to connect
468             :type guid1: ResourceManager
469
470             :param guid2: Second guid to connect
471             :type guid: ResourceManager
472
473         """
474         rm1 = self.get_resource(guid1)
475         rm2 = self.get_resource(guid2)
476
477         rm1.register_connection(guid2)
478         rm2.register_connection(guid1)
479
480     def register_condition(self, guids1, action, guids2, state,
481             time = None):
482         """ Registers an action START, STOP or DEPLOY for all RM on list
483         guids1 to occur at time 'time' after all elements in list guids2 
484         have reached state 'state'.
485
486             :param guids1: List of guids of RMs subjected to action
487             :type guids1: list
488
489             :param action: Action to perform (either START, STOP or DEPLOY)
490             :type action: ResourceAction
491
492             :param guids2: List of guids of RMs to we waited for
493             :type guids2: list
494
495             :param state: State to wait for on RMs of list guids2 (STARTED,
496                 STOPPED, etc)
497             :type state: ResourceState
498
499             :param time: Time to wait after guids2 has reached status 
500             :type time: string
501
502         """
503         if isinstance(guids1, int):
504             guids1 = [guids1]
505         if isinstance(guids2, int):
506             guids2 = [guids2]
507
508         for guid1 in guids1:
509             rm = self.get_resource(guid1)
510             rm.register_condition(action, guids2, state, time)
511
512     def enable_trace(self, guid, name):
513         """ Enables a trace to be collected during the experiment run
514
515             :param name: Name of the trace
516             :type name: str
517
518         """
519         rm = self.get_resource(guid)
520         rm.enable_trace(name)
521
522     def trace_enabled(self, guid, name):
523         """ Returns True if the trace of name 'name' is enabled
524
525             :param name: Name of the trace
526             :type name: str
527
528         """
529         rm = self.get_resource(guid)
530         return rm.trace_enabled(name)
531
532     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
533         """ Returns information on a collected trace, the trace stream or 
534         blocks (chunks) of the trace stream
535
536             :param name: Name of the trace
537             :type name: str
538
539             :param attr: Can be one of:
540                          - TraceAttr.ALL (complete trace content), 
541                          - TraceAttr.STREAM (block in bytes to read starting 
542                                 at offset),
543                          - TraceAttr.PATH (full path to the trace file),
544                          - TraceAttr.SIZE (size of trace file). 
545             :type attr: str
546
547             :param block: Number of bytes to retrieve from trace, when attr is 
548                 TraceAttr.STREAM 
549             :type name: int
550
551             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
552             :type name: int
553
554             :rtype: str
555
556         """
557         rm = self.get_resource(guid)
558         return rm.trace(name, attr, block, offset)
559
560     def get_traces(self, guid):
561         """ Returns the list of the trace names of the RM with guid 'guid'
562
563             :param guid: Guid of the RM
564             :type guid: int
565
566             :return: List of trace names
567             :rtype: list
568
569         """
570         rm = self.get_resource(guid)
571         return rm.get_traces()
572
573
574     def discover(self, guid):
575         """ Discovers an available resource matching the criteria defined
576         by the RM with guid 'guid', and associates that resource to the RM
577
578         Not all RM types require (or are capable of) performing resource 
579         discovery. For the RM types which are not capable of doing so, 
580         invoking this method does not have any consequences. 
581
582             :param guid: Guid of the RM
583             :type guid: int
584
585         """
586         rm = self.get_resource(guid)
587         return rm.discover()
588
589     def provision(self, guid):
590         """ Provisions the resource associated to the RM with guid 'guid'.
591
592         Provisioning means making a resource 'accessible' to the user. 
593         Not all RM types require (or are capable of) performing resource 
594         provisioning. For the RM types which are not capable of doing so, 
595         invoking this method does not have any consequences. 
596
597             :param guid: Guid of the RM
598             :type guid: int
599
600         """
601         rm = self.get_resource(guid)
602         return rm.provision()
603
604     def get(self, guid, name):
605         """ Returns the value of the attribute with name 'name' on the
606         RM with guid 'guid'
607
608             :param guid: Guid of the RM
609             :type guid: int
610
611             :param name: Name of the attribute 
612             :type name: str
613
614             :return: The value of the attribute with name 'name'
615
616         """
617         rm = self.get_resource(guid)
618         return rm.get(name)
619
620     def set(self, guid, name, value):
621         """ Modifies the value of the attribute with name 'name' on the 
622         RM with guid 'guid'.
623
624             :param guid: Guid of the RM
625             :type guid: int
626
627             :param name: Name of the attribute
628             :type name: str
629
630             :param value: Value of the attribute
631
632         """
633         rm = self.get_resource(guid)
634         rm.set(name, value)
635
636     def get_global(self, rtype, name):
637         """ Returns the value of the global attribute with name 'name' on the
638         RMs of rtype 'rtype'.
639
640             :param guid: Guid of the RM
641             :type guid: int
642
643             :param name: Name of the attribute 
644             :type name: str
645
646             :return: The value of the attribute with name 'name'
647
648         """
649         rclass = ResourceFactory.get_resource_type(rtype)
650         return rclass.get_global(name)
651
652     def set_global(self, rtype, name, value):
653         """ Modifies the value of the global attribute with name 'name' on the 
654         RMs of with rtype 'rtype'.
655
656             :param guid: Guid of the RM
657             :type guid: int
658
659             :param name: Name of the attribute
660             :type name: str
661
662             :param value: Value of the attribute
663
664         """
665         rclass = ResourceFactory.get_resource_type(rtype)
666         return rclass.set_global(name, value)
667
668     def state(self, guid, hr = False):
669         """ Returns the state of a resource
670
671             :param guid: Resource guid
672             :type guid: integer
673
674             :param hr: Human readable. Forces return of a 
675                 status string instead of a number 
676             :type hr: boolean
677
678         """
679         rm = self.get_resource(guid)
680         state = rm.state
681
682         if hr:
683             return ResourceState2str.get(state)
684
685         return state
686
687     def stop(self, guid):
688         """ Stops the RM with guid 'guid'
689
690         Stopping a RM means that the resource it controls will
691         no longer take part of the experiment.
692
693             :param guid: Guid of the RM
694             :type guid: int
695
696         """
697         rm = self.get_resource(guid)
698         return rm.stop()
699
700     def start(self, guid):
701         """ Starts the RM with guid 'guid'
702
703         Starting a RM means that the resource it controls will
704         begin taking part of the experiment.
705
706             :param guid: Guid of the RM
707             :type guid: int
708
709         """
710         rm = self.get_resource(guid)
711         return rm.start()
712
713     def get_start_time(self, guid):
714         """ Returns the start time of the RM as a timestamp """
715         rm = self.get_resource(guid)
716         return rm.start_time
717
718     def get_stop_time(self, guid):
719         """ Returns the stop time of the RM as a timestamp """
720         rm = self.get_resource(guid)
721         return rm.stop_time
722
723     def get_discover_time(self, guid):
724         """ Returns the discover time of the RM as a timestamp """
725         rm = self.get_resource(guid)
726         return rm.discover_time
727
728     def get_provision_time(self, guid):
729         """ Returns the provision time of the RM as a timestamp """
730         rm = self.get_resource(guid)
731         return rm.provision_time
732
733     def get_ready_time(self, guid):
734         """ Returns the deployment time of the RM as a timestamp """
735         rm = self.get_resource(guid)
736         return rm.ready_time
737
738     def get_release_time(self, guid):
739         """ Returns the release time of the RM as a timestamp """
740         rm = self.get_resource(guid)
741         return rm.release_time
742
743     def get_failed_time(self, guid):
744         """ Returns the time failure occured for the RM as a timestamp """
745         rm = self.get_resource(guid)
746         return rm.failed_time
747
748     def set_with_conditions(self, name, value, guids1, guids2, state,
749             time = None):
750         """ Modifies the value of attribute with name 'name' on all RMs 
751         on the guids1 list when time 'time' has elapsed since all 
752         elements in guids2 list have reached state 'state'.
753
754             :param name: Name of attribute to set in RM
755             :type name: string
756
757             :param value: Value of attribute to set in RM
758             :type name: string
759
760             :param guids1: List of guids of RMs subjected to action
761             :type guids1: list
762
763             :param action: Action to register (either START or STOP)
764             :type action: ResourceAction
765
766             :param guids2: List of guids of RMs to we waited for
767             :type guids2: list
768
769             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
770             :type state: ResourceState
771
772             :param time: Time to wait after guids2 has reached status 
773             :type time: string
774
775         """
776         if isinstance(guids1, int):
777             guids1 = [guids1]
778         if isinstance(guids2, int):
779             guids2 = [guids2]
780
781         for guid1 in guids1:
782             rm = self.get_resource(guid)
783             rm.set_with_conditions(name, value, guids2, state, time)
784
785     def deploy(self, guids = None, wait_all_ready = True, group = None):
786         """ Deploys all ResourceManagers in the guids list. 
787         
788         If the argument 'guids' is not given, all RMs with state NEW
789         are deployed.
790
791             :param guids: List of guids of RMs to deploy
792             :type guids: list
793
794             :param wait_all_ready: Wait until all RMs are ready in
795                 order to start the RMs
796             :type guid: int
797
798             :param group: Id of deployment group in which to deploy RMs
799             :type group: int
800
801         """
802         self.logger.debug(" ------- DEPLOY START ------ ")
803
804         if not guids:
805             # If no guids list was passed, all 'NEW' RMs will be deployed
806             guids = []
807             for guid, rm in self._resources.iteritems():
808                 if rm.state == ResourceState.NEW:
809                     guids.append(guid)
810                 
811         if isinstance(guids, int):
812             guids = [guids]
813
814         # Create deployment group
815         # New guids can be added to a same deployment group later on
816         new_group = False
817         if not group:
818             new_group = True
819             group = self._group_id_generator.next()
820
821         if group not in self._groups:
822             self._groups[group] = []
823
824         self._groups[group].extend(guids)
825
826         def wait_all_and_start(group):
827             # Function that checks if all resources are READY
828             # before scheduling a start_with_conditions for each RM
829             reschedule = False
830             
831             # Get all guids in group
832             guids = self._groups[group]
833
834             for guid in guids:
835                 if self.state(guid) < ResourceState.READY:
836                     reschedule = True
837                     break
838
839             if reschedule:
840                 callback = functools.partial(wait_all_and_start, group)
841                 self.schedule("1s", callback)
842             else:
843                 # If all resources are ready, we schedule the start
844                 for guid in guids:
845                     rm = self.get_resource(guid)
846                     self.schedule("0s", rm.start_with_conditions)
847
848                     if rm.conditions.get(ResourceAction.STOP):
849                         # Only if the RM has STOP conditions we
850                         # schedule a stop. Otherwise the RM will stop immediately
851                         self.schedule("0s", rm.stop_with_conditions)
852
853         if wait_all_ready and new_group:
854             # Schedule a function to check that all resources are
855             # READY, and only then schedule the start.
856             # This aims at reducing the number of tasks looping in the 
857             # scheduler. 
858             # Instead of having many start tasks, we will have only one for 
859             # the whole group.
860             callback = functools.partial(wait_all_and_start, group)
861             self.schedule("0s", callback)
862
863         for guid in guids:
864             rm = self.get_resource(guid)
865             rm.deployment_group = group
866             self.schedule("0s", rm.deploy_with_conditions)
867
868             if not wait_all_ready:
869                 self.schedule("0s", rm.start_with_conditions)
870
871                 if rm.conditions.get(ResourceAction.STOP):
872                     # Only if the RM has STOP conditions we
873                     # schedule a stop. Otherwise the RM will stop immediately
874                     self.schedule("0s", rm.stop_with_conditions)
875
876     def release(self, guids = None):
877         """ Releases all ResourceManagers in the guids list.
878
879         If the argument 'guids' is not given, all RMs registered
880         in the experiment are released.
881
882             :param guids: List of RM guids
883             :type guids: list
884
885         """
886         if isinstance(guids, int):
887             guids = [guids]
888
889         if not guids:
890             guids = self.resources
891
892         for guid in guids:
893             rm = self.get_resource(guid)
894             self.schedule("0s", rm.release)
895
896         self.wait_released(guids)
897
898         for guid in guids:
899             if self.get(guid, "hardRelease"):
900                 self.remove_resource(guid)
901         
902     def shutdown(self):
903         """ Releases all resources and stops the ExperimentController
904
905         """
906         # If there was a major failure we can't exit gracefully
907         if self._state == ECState.FAILED:
908             raise RuntimeError("EC failure. Can not exit gracefully")
909
910         # Remove all pending tasks from the scheduler queue
911         for tid in list(self._scheduler.pending):
912             self._scheduler.remove(tid)
913
914         # Remove pending tasks from the workers queue
915         self._runner.empty()
916
917         self.release()
918
919         # Mark the EC state as TERMINATED
920         self._state = ECState.TERMINATED
921
922         # Stop processing thread
923         self._stop = True
924
925         # Notify condition to wake up the processing thread
926         self._notify()
927         
928         if self._thread.is_alive():
929            self._thread.join()
930
931     def schedule(self, date, callback, track = False):
932         """ Schedules a callback to be executed at time 'date'.
933
934             :param date: string containing execution time for the task.
935                     It can be expressed as an absolute time, using
936                     timestamp format, or as a relative time matching
937                     ^\d+.\d+(h|m|s|ms|us)$
938
939             :param callback: code to be executed for the task. Must be a
940                         Python function, and receives args and kwargs
941                         as arguments.
942
943             :param track: if set to True, the task will be retrievable with
944                     the get_task() method
945
946             :return : The Id of the task
947             :rtype: int
948             
949         """
950         timestamp = stabsformat(date)
951         task = Task(timestamp, callback)
952         task = self._scheduler.schedule(task)
953
954         if track:
955             self._tasks[task.id] = task
956
957         # Notify condition to wake up the processing thread
958         self._notify()
959
960         return task.id
961      
962     def _process(self):
963         """ Process scheduled tasks.
964
965         .. note::
966         
967         Tasks are scheduled by invoking the schedule method with a target 
968         callback and an execution time. 
969         The schedule method creates a new Task object with that callback 
970         and execution time, and pushes it into the '_scheduler' queue. 
971         The execution time and the order of arrival of tasks are used 
972         to order the tasks in the queue.
973
974         The _process method is executed in an independent thread held by 
975         the ExperimentController for as long as the experiment is running.
976         This method takes tasks from the '_scheduler' queue in a loop 
977         and processes them in parallel using multithreading. 
978         The environmental variable NEPI_NTHREADS can be used to control
979         the number of threads used to process tasks. The default value is 
980         50.
981
982         To execute tasks in parallel, a ParallelRunner (PR) object is used.
983         This object keeps a pool of threads (workers), and a queue of tasks
984         scheduled for 'immediate' execution. 
985         
986         On each iteration, the '_process' loop will take the next task that 
987         is scheduled for 'future' execution from the '_scheduler' queue, 
988         and if the execution time of that task is >= to the current time, 
989         it will push that task into the PR for 'immediate execution'. 
990         As soon as a worker is free, the PR will assign the next task to
991         that worker.
992
993         Upon receiving a task to execute, each PR worker (thread) will 
994         invoke the  _execute method of the EC, passing the task as 
995         argument.         
996         The _execute method will then invoke task.callback inside a 
997         try/except block. If an exception is raised by the tasks.callback, 
998         it will be trapped by the try block, logged to standard error 
999         (usually the console), and the task will be marked as failed.
1000
1001         """
1002
1003         self._runner.start()
1004
1005         while not self._stop:
1006             try:
1007                 self._cond.acquire()
1008
1009                 task = self._scheduler.next()
1010                 
1011                 if not task:
1012                     # No task to execute. Wait for a new task to be scheduled.
1013                     self._cond.wait()
1014                 else:
1015                     # The task timestamp is in the future. Wait for timeout 
1016                     # or until another task is scheduled.
1017                     now = tnow()
1018                     if now < task.timestamp:
1019                         # Calculate timeout in seconds
1020                         timeout = tdiffsec(task.timestamp, now)
1021
1022                         # Re-schedule task with the same timestamp
1023                         self._scheduler.schedule(task)
1024                         
1025                         task = None
1026
1027                         # Wait timeout or until a new task awakes the condition
1028                         self._cond.wait(timeout)
1029                
1030                 self._cond.release()
1031
1032                 if task:
1033                     # Process tasks in parallel
1034                     self._runner.put(self._execute, task)
1035             except: 
1036                 import traceback
1037                 err = traceback.format_exc()
1038                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1039
1040                 # Set the EC to FAILED state 
1041                 self._state = ECState.FAILED
1042             
1043                 # Set the FailureManager failure level to EC failure
1044                 self._fm.set_ec_failure()
1045
1046         self.logger.debug("Exiting the task processing loop ... ")
1047         
1048         self._runner.sync()
1049         self._runner.destroy()
1050
1051     def _execute(self, task):
1052         """ Executes a single task. 
1053
1054             :param task: Object containing the callback to execute
1055             :type task: Task
1056
1057         """
1058         try:
1059             # Invoke callback
1060             task.result = task.callback()
1061             task.status = TaskStatus.DONE
1062         except:
1063             import traceback
1064             err = traceback.format_exc()
1065             task.result = err
1066             task.status = TaskStatus.ERROR
1067             
1068             self.logger.error("Error occurred while executing task: %s" % err)
1069
1070     def _notify(self):
1071         """ Awakes the processing thread if it is blocked waiting 
1072         for new tasks to arrive
1073         
1074         """
1075         self._cond.acquire()
1076         self._cond.notify()
1077         self._cond.release()
1078