Added serialization to EC
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28
29 # TODO: use multiprocessing instead of threading
30 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
31
32 import functools
33 import logging
34 import os
35 import sys
36 import time
37 import threading
38 import weakref
39
40 class FailureLevel(object):
41     """ Describes the system failure state """
42     OK = 1
43     RM_FAILURE = 2
44     EC_FAILURE = 3
45
46 class FailureManager(object):
47     """ The FailureManager is responsible for handling errors
48     and deciding whether an experiment should be aborted or not
49
50     """
51
52     def __init__(self, ec):
53         self._ec = weakref.ref(ec)
54         self._failure_level = FailureLevel.OK
55         self._abort = False
56
57     @property
58     def ec(self):
59         """ Returns the ExperimentController associated to this FailureManager 
60         
61         """
62         
63         return self._ec()
64
65     @property
66     def abort(self):
67         return self._abort
68
69     def eval_failure(self, guid):
70         if self._failure_level == FailureLevel.OK:
71             rm = self.ec.get_resource(guid)
72             state = rm.state
73             critical = rm.get("critical")
74
75             if state == ResourceState.FAILED and critical:
76                 self._failure_level = FailureLevel.RM_FAILURE
77                 self._abort = True
78                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
79                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
80
81     def set_ec_failure(self):
82         self._failure_level = FailureLevel.EC_FAILURE
83
84 class ECState(object):
85     """ Possible states for an ExperimentController
86    
87     """
88     RUNNING = 1
89     FAILED = 2
90     TERMINATED = 3
91
92 class ExperimentController(object):
93     """
94     .. class:: Class Args :
95       
96         :param exp_id: Human readable identifier for the experiment scenario. 
97         :type exp_id: str
98
99     .. note::
100
101     An experiment, or scenario, is defined by a concrete set of resources,
102     behavior, configuration and interconnection of those resources. 
103     The Experiment Description (ED) is a detailed representation of a
104     single experiment. It contains all the necessary information to 
105     allow repeating the experiment. NEPI allows to describe
106     experiments by registering components (resources), configuring them
107     and interconnecting them.
108     
109     A same experiment (scenario) can be executed many times, generating 
110     different results. We call an experiment execution (instance) a 'run'.
111
112     The ExperimentController (EC), is the entity responsible of
113     managing an experiment run. The same scenario can be 
114     recreated (and re-run) by instantiating an EC and recreating 
115     the same experiment description. 
116
117     In NEPI, an experiment is represented as a graph of interconnected
118     resources. A resource is a generic concept in the sense that any
119     component taking part of an experiment, whether physical of
120     virtual, is considered a resource. A resources could be a host, 
121     a virtual machine, an application, a simulator, a IP address.
122
123     A ResourceManager (RM), is the entity responsible for managing a 
124     single resource. ResourceManagers are specific to a resource
125     type (i.e. An RM to control a Linux application will not be
126     the same as the RM used to control a ns-3 simulation).
127     To support a new type of resource in NEPI, a new RM must be 
128     implemented. NEPI already provides a variety of
129     RMs to control basic resources, and new can be extended from
130     the existing ones.
131
132     Through the EC interface the user can create ResourceManagers (RMs),
133     configure them and interconnect them, to describe an experiment.
134     Describing an experiment through the EC does not run the experiment.
135     Only when the 'deploy()' method is invoked on the EC, the EC will take 
136     actions to transform the 'described' experiment into a 'running' experiment.
137
138     While the experiment is running, it is possible to continue to
139     create/configure/connect RMs, and to deploy them to involve new
140     resources in the experiment (this is known as 'interactive' deployment).
141     
142     An experiments in NEPI is identified by a string id, 
143     which is either given by the user, or automatically generated by NEPI.  
144     The purpose of this identifier is to separate files and results that 
145     belong to different experiment scenarios. 
146     However, since a same 'experiment' can be run many times, the experiment
147     id is not enough to identify an experiment instance (run).
148     For this reason, the ExperimentController has two identifier, the 
149     exp_id, which can be re-used in different ExperimentController,
150     and the run_id, which is unique to one ExperimentController instance, and
151     is automatically generated by NEPI.
152         
153     """
154
155     @classmethod
156     def load(cls, path, format = SFormats.XML):
157         serializer = ECSerializer()
158         ec = serializer.load(path)
159         return ec
160
161     def __init__(self, exp_id = None): 
162         super(ExperimentController, self).__init__()
163
164         # Logging
165         self._logger = logging.getLogger("ExperimentController")
166
167         # Run identifier. It identifies a concrete execution instance (run) 
168         # of an experiment.
169         # Since a same experiment (same configuration) can be executed many 
170         # times, this run_id permits to separate result files generated on 
171         # different experiment executions
172         self._run_id = tsformat()
173
174         # Experiment identifier. Usually assigned by the user
175         # Identifies the experiment scenario (i.e. configuration, 
176         # resources used, etc)
177         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
178
179         # generator of globally unique ids
180         self._guid_generator = guid.GuidGenerator()
181         
182         # Resource managers
183         self._resources = dict()
184
185         # Scheduler. It a queue that holds tasks scheduled for
186         # execution, and yields the next task to be executed 
187         # ordered by execution and arrival time
188         self._scheduler = HeapScheduler()
189
190         # Tasks
191         self._tasks = dict()
192
193         # RM groups (for deployment) 
194         self._groups = dict()
195
196         # generator of globally unique id for groups
197         self._group_id_generator = guid.GuidGenerator()
198
199         # Flag to stop processing thread
200         self._stop = False
201     
202         # Entity in charge of managing system failures
203         self._fm = FailureManager(self)
204
205         # EC state
206         self._state = ECState.RUNNING
207
208         # The runner is a pool of threads used to parallelize 
209         # execution of tasks
210         self._nthreads = 20
211         self._runner = None
212
213         # Event processing thread
214         self._cond = threading.Condition()
215         self._thread = threading.Thread(target = self._process)
216         self._thread.setDaemon(True)
217         self._thread.start()
218         
219     @property
220     def logger(self):
221         """ Returns the logger instance of the Experiment Controller
222
223         """
224         return self._logger
225
226     @property
227     def failure_level(self):
228         """ Returns the level of FAILURE of th experiment
229
230         """
231
232         return self._fm._failure_level
233
234     @property
235     def ecstate(self):
236         """ Returns the state of the Experiment Controller
237
238         """
239         return self._state
240
241     @property
242     def exp_id(self):
243         """ Returns the experiment id assigned by the user
244
245         """
246         return self._exp_id
247
248     @property
249     def run_id(self):
250         """ Returns the experiment instance (run) identifier (automatically 
251         generated)
252
253         """
254         return self._run_id
255
256     @property
257     def nthreads(self):
258         """ Returns the number of processing nthreads used
259
260         """
261         return self._nthreads
262
263  
264     @property
265     def abort(self):
266         """ Returns True if the experiment has failed and should be interrupted,
267         False otherwise.
268
269         """
270         return self._fm.abort
271
272     def inform_failure(self, guid):
273         """ Reports a failure in a RM to the EC for evaluation
274
275             :param guid: Resource id
276             :type guid: int
277
278         """
279
280         return self._fm.eval_failure(guid)
281
282     def wait_finished(self, guids):
283         """ Blocking method that waits until all RMs in the 'guids' list 
284         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
285         RELEASED ), or until a failure in the experiment occurs 
286         (i.e. abort == True) 
287         
288             :param guids: List of guids
289             :type guids: list
290
291         """
292
293         def quit():
294             return self.abort
295
296         return self.wait(guids, state = ResourceState.STOPPED, 
297                 quit = quit)
298
299     def wait_started(self, guids):
300         """ Blocking method that waits until all RMs in the 'guids' list 
301         have reached a state >= STARTED, or until a failure in the 
302         experiment occurs (i.e. abort == True) 
303         
304             :param guids: List of guids
305             :type guids: list
306
307         """
308
309         def quit():
310             return self.abort
311
312         return self.wait(guids, state = ResourceState.STARTED, 
313                 quit = quit)
314
315     def wait_released(self, guids):
316         """ Blocking method that waits until all RMs in the 'guids' list 
317         have reached a state == RELEASED, or until the EC fails 
318         
319             :param guids: List of guids
320             :type guids: list
321
322         """
323
324         def quit():
325             return self._state == ECState.FAILED
326
327         return self.wait(guids, state = ResourceState.RELEASED, 
328                 quit = quit)
329
330     def wait_deployed(self, guids):
331         """ Blocking method that waits until all RMs in the 'guids' list 
332         have reached a state >= READY, or until a failure in the 
333         experiment occurs (i.e. abort == True) 
334         
335             :param guids: List of guids
336             :type guids: list
337
338         """
339
340         def quit():
341             return self.abort
342
343         return self.wait(guids, state = ResourceState.READY, 
344                 quit = quit)
345
346     def wait(self, guids, state, quit):
347         """ Blocking method that waits until all RMs in the 'guids' list 
348         have reached a state >= 'state', or until the 'quit' callback
349         yields True
350            
351             :param guids: List of guids
352             :type guids: list
353         
354         """
355         if isinstance(guids, int):
356             guids = [guids]
357
358         # Make a copy to avoid modifying the original guids list
359         guids = list(guids)
360
361         while True:
362             # If there are no more guids to wait for
363             # or the quit function returns True, exit the loop
364             if len(guids) == 0 or quit():
365                 break
366
367             # If a guid reached one of the target states, remove it from list
368             guid = guids.pop()
369             rm = self.get_resource(guid)
370             rstate = rm.state
371             
372             if rstate >= state:
373                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
374                     rm.get_rtype(), guid, rstate, state))
375             else:
376                 # Debug...
377                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
378                     guid, rstate, state))
379
380                 guids.append(guid)
381
382                 time.sleep(0.5)
383
384     def serialize(self, format = SFormats.XML):
385         serializer = ECSerializer()
386         sec = serializer.load(self, format = format)
387         return sec
388
389     def save(self, path, format = SFormats.XML):
390         serializer = ECSerializer()
391         path = serializer.save(self, path, format = format)
392         return path
393
394     def get_task(self, tid):
395         """ Returns a task by its id
396
397             :param tid: Id of the task
398             :type tid: int
399             
400             :rtype: Task
401             
402         """
403         return self._tasks.get(tid)
404
405     def get_resource(self, guid):
406         """ Returns a registered ResourceManager by its guid
407
408             :param guid: Id of the resource
409             :type guid: int
410             
411             :rtype: ResourceManager
412             
413         """
414         rm = self._resources.get(guid)
415         return rm
416
417     def get_resources_by_type(self, rtype):
418         """ Returns a registered ResourceManager by its guid
419
420             :param rtype: Resource type
421             :type rtype: string
422             
423             :rtype: list of ResourceManagers
424             
425         """
426         rms = []
427         for guid, rm in self._resources.iteritems():
428             if rm.get_rtype() == type: 
429                 rms.append(rm)
430         return rms
431
432     def remove_resource(self, guid):
433         del self._resources[guid]
434
435     @property
436     def resources(self):
437         """ Returns the set() of guids of all the ResourceManager
438
439             :return: Set of all RM guids
440             :rtype: set
441
442         """
443         keys = self._resources.keys()
444
445         return keys
446
447     def register_resource(self, rtype, guid = None):
448         """ Registers a new ResourceManager of type 'rtype' in the experiment
449         
450         This method will assign a new 'guid' for the RM, if no guid
451         is specified.
452
453             :param rtype: Type of the RM
454             :type rtype: str
455
456             :return: Guid of the RM
457             :rtype: int
458             
459         """
460         # Get next available guid
461         guid = self._guid_generator.next(guid)
462         
463         # Instantiate RM
464         rm = ResourceFactory.create(rtype, self, guid)
465
466         # Store RM
467         self._resources[guid] = rm
468
469         return guid
470
471     def get_attributes(self, guid):
472         """ Returns all the attributes of the RM with guid 'guid'
473
474             :param guid: Guid of the RM
475             :type guid: int
476
477             :return: List of attributes
478             :rtype: list
479
480         """
481         rm = self.get_resource(guid)
482         return rm.get_attributes()
483
484     def get_attribute(self, guid, name):
485         """ Returns the attribute 'name' of the RM with guid 'guid'
486
487             :param guid: Guid of the RM
488             :type guid: int
489
490             :param name: Name of the attribute
491             :type name: str
492
493             :return: The attribute with name 'name'
494             :rtype: Attribute
495
496         """
497         rm = self.get_resource(guid)
498         return rm.get_attribute(name)
499
500     def register_connection(self, guid1, guid2):
501         """ Registers a connection between a RM with guid 'guid1'
502         and another RM with guid 'guid2'. 
503     
504         The order of the in which the two guids are provided is not
505         important, since the connection relationship is symmetric.
506
507             :param guid1: First guid to connect
508             :type guid1: ResourceManager
509
510             :param guid2: Second guid to connect
511             :type guid: ResourceManager
512
513         """
514         rm1 = self.get_resource(guid1)
515         rm2 = self.get_resource(guid2)
516
517         rm1.register_connection(guid2)
518         rm2.register_connection(guid1)
519
520     def register_condition(self, guids1, action, guids2, state,
521             time = None):
522         """ Registers an action START, STOP or DEPLOY for all RM on list
523         guids1 to occur at time 'time' after all elements in list guids2 
524         have reached state 'state'.
525
526             :param guids1: List of guids of RMs subjected to action
527             :type guids1: list
528
529             :param action: Action to perform (either START, STOP or DEPLOY)
530             :type action: ResourceAction
531
532             :param guids2: List of guids of RMs to we waited for
533             :type guids2: list
534
535             :param state: State to wait for on RMs of list guids2 (STARTED,
536                 STOPPED, etc)
537             :type state: ResourceState
538
539             :param time: Time to wait after guids2 has reached status 
540             :type time: string
541
542         """
543         if isinstance(guids1, int):
544             guids1 = [guids1]
545         if isinstance(guids2, int):
546             guids2 = [guids2]
547
548         for guid1 in guids1:
549             rm = self.get_resource(guid1)
550             rm.register_condition(action, guids2, state, time)
551
552     def enable_trace(self, guid, name):
553         """ Enables a trace to be collected during the experiment run
554
555             :param name: Name of the trace
556             :type name: str
557
558         """
559         rm = self.get_resource(guid)
560         rm.enable_trace(name)
561
562     def trace_enabled(self, guid, name):
563         """ Returns True if the trace of name 'name' is enabled
564
565             :param name: Name of the trace
566             :type name: str
567
568         """
569         rm = self.get_resource(guid)
570         return rm.trace_enabled(name)
571
572     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
573         """ Returns information on a collected trace, the trace stream or 
574         blocks (chunks) of the trace stream
575
576             :param name: Name of the trace
577             :type name: str
578
579             :param attr: Can be one of:
580                          - TraceAttr.ALL (complete trace content), 
581                          - TraceAttr.STREAM (block in bytes to read starting 
582                                 at offset),
583                          - TraceAttr.PATH (full path to the trace file),
584                          - TraceAttr.SIZE (size of trace file). 
585             :type attr: str
586
587             :param block: Number of bytes to retrieve from trace, when attr is 
588                 TraceAttr.STREAM 
589             :type name: int
590
591             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
592             :type name: int
593
594             :rtype: str
595
596         """
597         rm = self.get_resource(guid)
598         return rm.trace(name, attr, block, offset)
599
600     def get_traces(self, guid):
601         """ Returns the list of the trace names of the RM with guid 'guid'
602
603             :param guid: Guid of the RM
604             :type guid: int
605
606             :return: List of trace names
607             :rtype: list
608
609         """
610         rm = self.get_resource(guid)
611         return rm.get_traces()
612
613
614     def discover(self, guid):
615         """ Discovers an available resource matching the criteria defined
616         by the RM with guid 'guid', and associates that resource to the RM
617
618         Not all RM types require (or are capable of) performing resource 
619         discovery. For the RM types which are not capable of doing so, 
620         invoking this method does not have any consequences. 
621
622             :param guid: Guid of the RM
623             :type guid: int
624
625         """
626         rm = self.get_resource(guid)
627         return rm.discover()
628
629     def provision(self, guid):
630         """ Provisions the resource associated to the RM with guid 'guid'.
631
632         Provisioning means making a resource 'accessible' to the user. 
633         Not all RM types require (or are capable of) performing resource 
634         provisioning. For the RM types which are not capable of doing so, 
635         invoking this method does not have any consequences. 
636
637             :param guid: Guid of the RM
638             :type guid: int
639
640         """
641         rm = self.get_resource(guid)
642         return rm.provision()
643
644     def get(self, guid, name):
645         """ Returns the value of the attribute with name 'name' on the
646         RM with guid 'guid'
647
648             :param guid: Guid of the RM
649             :type guid: int
650
651             :param name: Name of the attribute 
652             :type name: str
653
654             :return: The value of the attribute with name 'name'
655
656         """
657         rm = self.get_resource(guid)
658         return rm.get(name)
659
660     def set(self, guid, name, value):
661         """ Modifies the value of the attribute with name 'name' on the 
662         RM with guid 'guid'.
663
664             :param guid: Guid of the RM
665             :type guid: int
666
667             :param name: Name of the attribute
668             :type name: str
669
670             :param value: Value of the attribute
671
672         """
673         rm = self.get_resource(guid)
674         rm.set(name, value)
675
676     def get_global(self, rtype, name):
677         """ Returns the value of the global attribute with name 'name' on the
678         RMs of rtype 'rtype'.
679
680             :param guid: Guid of the RM
681             :type guid: int
682
683             :param name: Name of the attribute 
684             :type name: str
685
686             :return: The value of the attribute with name 'name'
687
688         """
689         rclass = ResourceFactory.get_resource_type(rtype)
690         return rclass.get_global(name)
691
692     def set_global(self, rtype, name, value):
693         """ Modifies the value of the global attribute with name 'name' on the 
694         RMs of with rtype 'rtype'.
695
696             :param guid: Guid of the RM
697             :type guid: int
698
699             :param name: Name of the attribute
700             :type name: str
701
702             :param value: Value of the attribute
703
704         """
705         rclass = ResourceFactory.get_resource_type(rtype)
706         return rclass.set_global(name, value)
707
708     def state(self, guid, hr = False):
709         """ Returns the state of a resource
710
711             :param guid: Resource guid
712             :type guid: integer
713
714             :param hr: Human readable. Forces return of a 
715                 status string instead of a number 
716             :type hr: boolean
717
718         """
719         rm = self.get_resource(guid)
720         state = rm.state
721
722         if hr:
723             return ResourceState2str.get(state)
724
725         return state
726
727     def stop(self, guid):
728         """ Stops the RM with guid 'guid'
729
730         Stopping a RM means that the resource it controls will
731         no longer take part of the experiment.
732
733             :param guid: Guid of the RM
734             :type guid: int
735
736         """
737         rm = self.get_resource(guid)
738         return rm.stop()
739
740     def start(self, guid):
741         """ Starts the RM with guid 'guid'
742
743         Starting a RM means that the resource it controls will
744         begin taking part of the experiment.
745
746             :param guid: Guid of the RM
747             :type guid: int
748
749         """
750         rm = self.get_resource(guid)
751         return rm.start()
752
753     def get_start_time(self, guid):
754         """ Returns the start time of the RM as a timestamp """
755         rm = self.get_resource(guid)
756         return rm.start_time
757
758     def get_stop_time(self, guid):
759         """ Returns the stop time of the RM as a timestamp """
760         rm = self.get_resource(guid)
761         return rm.stop_time
762
763     def get_discover_time(self, guid):
764         """ Returns the discover time of the RM as a timestamp """
765         rm = self.get_resource(guid)
766         return rm.discover_time
767
768     def get_provision_time(self, guid):
769         """ Returns the provision time of the RM as a timestamp """
770         rm = self.get_resource(guid)
771         return rm.provision_time
772
773     def get_ready_time(self, guid):
774         """ Returns the deployment time of the RM as a timestamp """
775         rm = self.get_resource(guid)
776         return rm.ready_time
777
778     def get_release_time(self, guid):
779         """ Returns the release time of the RM as a timestamp """
780         rm = self.get_resource(guid)
781         return rm.release_time
782
783     def get_failed_time(self, guid):
784         """ Returns the time failure occured for the RM as a timestamp """
785         rm = self.get_resource(guid)
786         return rm.failed_time
787
788     def set_with_conditions(self, name, value, guids1, guids2, state,
789             time = None):
790         """ Modifies the value of attribute with name 'name' on all RMs 
791         on the guids1 list when time 'time' has elapsed since all 
792         elements in guids2 list have reached state 'state'.
793
794             :param name: Name of attribute to set in RM
795             :type name: string
796
797             :param value: Value of attribute to set in RM
798             :type name: string
799
800             :param guids1: List of guids of RMs subjected to action
801             :type guids1: list
802
803             :param action: Action to register (either START or STOP)
804             :type action: ResourceAction
805
806             :param guids2: List of guids of RMs to we waited for
807             :type guids2: list
808
809             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
810             :type state: ResourceState
811
812             :param time: Time to wait after guids2 has reached status 
813             :type time: string
814
815         """
816         if isinstance(guids1, int):
817             guids1 = [guids1]
818         if isinstance(guids2, int):
819             guids2 = [guids2]
820
821         for guid1 in guids1:
822             rm = self.get_resource(guid)
823             rm.set_with_conditions(name, value, guids2, state, time)
824
825     def deploy(self, guids = None, wait_all_ready = True, group = None):
826         """ Deploys all ResourceManagers in the guids list. 
827         
828         If the argument 'guids' is not given, all RMs with state NEW
829         are deployed.
830
831             :param guids: List of guids of RMs to deploy
832             :type guids: list
833
834             :param wait_all_ready: Wait until all RMs are ready in
835                 order to start the RMs
836             :type guid: int
837
838             :param group: Id of deployment group in which to deploy RMs
839             :type group: int
840
841         """
842         self.logger.debug(" ------- DEPLOY START ------ ")
843
844         if not guids:
845             # If no guids list was passed, all 'NEW' RMs will be deployed
846             guids = []
847             for guid, rm in self._resources.iteritems():
848                 if rm.state == ResourceState.NEW:
849                     guids.append(guid)
850                 
851         if isinstance(guids, int):
852             guids = [guids]
853
854         # Create deployment group
855         # New guids can be added to a same deployment group later on
856         new_group = False
857         if not group:
858             new_group = True
859             group = self._group_id_generator.next()
860
861         if group not in self._groups:
862             self._groups[group] = []
863
864         self._groups[group].extend(guids)
865
866         def wait_all_and_start(group):
867             # Function that checks if all resources are READY
868             # before scheduling a start_with_conditions for each RM
869             reschedule = False
870             
871             # Get all guids in group
872             guids = self._groups[group]
873
874             for guid in guids:
875                 if self.state(guid) < ResourceState.READY:
876                     reschedule = True
877                     break
878
879             if reschedule:
880                 callback = functools.partial(wait_all_and_start, group)
881                 self.schedule("1s", callback)
882             else:
883                 # If all resources are ready, we schedule the start
884                 for guid in guids:
885                     rm = self.get_resource(guid)
886                     self.schedule("0s", rm.start_with_conditions)
887
888                     if rm.conditions.get(ResourceAction.STOP):
889                         # Only if the RM has STOP conditions we
890                         # schedule a stop. Otherwise the RM will stop immediately
891                         self.schedule("0s", rm.stop_with_conditions)
892
893         if wait_all_ready and new_group:
894             # Schedule a function to check that all resources are
895             # READY, and only then schedule the start.
896             # This aims at reducing the number of tasks looping in the 
897             # scheduler. 
898             # Instead of having many start tasks, we will have only one for 
899             # the whole group.
900             callback = functools.partial(wait_all_and_start, group)
901             self.schedule("0s", callback)
902
903         for guid in guids:
904             rm = self.get_resource(guid)
905             rm.deployment_group = group
906             self.schedule("0s", rm.deploy_with_conditions)
907
908             if not wait_all_ready:
909                 self.schedule("0s", rm.start_with_conditions)
910
911                 if rm.conditions.get(ResourceAction.STOP):
912                     # Only if the RM has STOP conditions we
913                     # schedule a stop. Otherwise the RM will stop immediately
914                     self.schedule("0s", rm.stop_with_conditions)
915
916     def release(self, guids = None):
917         """ Releases all ResourceManagers in the guids list.
918
919         If the argument 'guids' is not given, all RMs registered
920         in the experiment are released.
921
922             :param guids: List of RM guids
923             :type guids: list
924
925         """
926         if isinstance(guids, int):
927             guids = [guids]
928
929         if not guids:
930             guids = self.resources
931
932         for guid in guids:
933             rm = self.get_resource(guid)
934             self.schedule("0s", rm.release)
935
936         self.wait_released(guids)
937
938         for guid in guids:
939             if self.get(guid, "hardRelease"):
940                 self.remove_resource(guid)
941         
942     def shutdown(self):
943         """ Releases all resources and stops the ExperimentController
944
945         """
946         # If there was a major failure we can't exit gracefully
947         if self._state == ECState.FAILED:
948             raise RuntimeError("EC failure. Can not exit gracefully")
949
950         # Remove all pending tasks from the scheduler queue
951         for tid in list(self._scheduler.pending):
952             self._scheduler.remove(tid)
953
954         # Remove pending tasks from the workers queue
955         self._runner.empty()
956
957         self.release()
958
959         # Mark the EC state as TERMINATED
960         self._state = ECState.TERMINATED
961
962         # Stop processing thread
963         self._stop = True
964
965         # Notify condition to wake up the processing thread
966         self._notify()
967         
968         if self._thread.is_alive():
969            self._thread.join()
970
971     def schedule(self, date, callback, track = False):
972         """ Schedules a callback to be executed at time 'date'.
973
974             :param date: string containing execution time for the task.
975                     It can be expressed as an absolute time, using
976                     timestamp format, or as a relative time matching
977                     ^\d+.\d+(h|m|s|ms|us)$
978
979             :param callback: code to be executed for the task. Must be a
980                         Python function, and receives args and kwargs
981                         as arguments.
982
983             :param track: if set to True, the task will be retrievable with
984                     the get_task() method
985
986             :return : The Id of the task
987             :rtype: int
988             
989         """
990         timestamp = stabsformat(date)
991         task = Task(timestamp, callback)
992         task = self._scheduler.schedule(task)
993
994         if track:
995             self._tasks[task.id] = task
996
997         # Notify condition to wake up the processing thread
998         self._notify()
999
1000         return task.id
1001      
1002     def _process(self):
1003         """ Process scheduled tasks.
1004
1005         .. note::
1006         
1007         Tasks are scheduled by invoking the schedule method with a target 
1008         callback and an execution time. 
1009         The schedule method creates a new Task object with that callback 
1010         and execution time, and pushes it into the '_scheduler' queue. 
1011         The execution time and the order of arrival of tasks are used 
1012         to order the tasks in the queue.
1013
1014         The _process method is executed in an independent thread held by 
1015         the ExperimentController for as long as the experiment is running.
1016         This method takes tasks from the '_scheduler' queue in a loop 
1017         and processes them in parallel using multithreading. 
1018         The environmental variable NEPI_NTHREADS can be used to control
1019         the number of threads used to process tasks. The default value is 
1020         50.
1021
1022         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1023         This object keeps a pool of threads (workers), and a queue of tasks
1024         scheduled for 'immediate' execution. 
1025         
1026         On each iteration, the '_process' loop will take the next task that 
1027         is scheduled for 'future' execution from the '_scheduler' queue, 
1028         and if the execution time of that task is >= to the current time, 
1029         it will push that task into the PR for 'immediate execution'. 
1030         As soon as a worker is free, the PR will assign the next task to
1031         that worker.
1032
1033         Upon receiving a task to execute, each PR worker (thread) will 
1034         invoke the  _execute method of the EC, passing the task as 
1035         argument.         
1036         The _execute method will then invoke task.callback inside a 
1037         try/except block. If an exception is raised by the tasks.callback, 
1038         it will be trapped by the try block, logged to standard error 
1039         (usually the console), and the task will be marked as failed.
1040
1041         """
1042
1043         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1044         self._runner = ParallelRun(maxthreads = self.nthreads)
1045         self._runner.start()
1046
1047         while not self._stop:
1048             try:
1049                 self._cond.acquire()
1050
1051                 task = self._scheduler.next()
1052                 
1053                 if not task:
1054                     # No task to execute. Wait for a new task to be scheduled.
1055                     self._cond.wait()
1056                 else:
1057                     # The task timestamp is in the future. Wait for timeout 
1058                     # or until another task is scheduled.
1059                     now = tnow()
1060                     if now < task.timestamp:
1061                         # Calculate timeout in seconds
1062                         timeout = tdiffsec(task.timestamp, now)
1063
1064                         # Re-schedule task with the same timestamp
1065                         self._scheduler.schedule(task)
1066                         
1067                         task = None
1068
1069                         # Wait timeout or until a new task awakes the condition
1070                         self._cond.wait(timeout)
1071                
1072                 self._cond.release()
1073
1074                 if task:
1075                     # Process tasks in parallel
1076                     self._runner.put(self._execute, task)
1077             except: 
1078                 import traceback
1079                 err = traceback.format_exc()
1080                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1081
1082                 # Set the EC to FAILED state 
1083                 self._state = ECState.FAILED
1084             
1085                 # Set the FailureManager failure level to EC failure
1086                 self._fm.set_ec_failure()
1087
1088         self.logger.debug("Exiting the task processing loop ... ")
1089         
1090         self._runner.sync()
1091         self._runner.destroy()
1092
1093     def _execute(self, task):
1094         """ Executes a single task. 
1095
1096             :param task: Object containing the callback to execute
1097             :type task: Task
1098
1099         """
1100         try:
1101             # Invoke callback
1102             task.result = task.callback()
1103             task.status = TaskStatus.DONE
1104         except:
1105             import traceback
1106             err = traceback.format_exc()
1107             task.result = err
1108             task.status = TaskStatus.ERROR
1109             
1110             self.logger.error("Error occurred while executing task: %s" % err)
1111
1112     def _notify(self):
1113         """ Awakes the processing thread if it is blocked waiting 
1114         for new tasks to arrive
1115         
1116         """
1117         self._cond.acquire()
1118         self._cond.notify()
1119         self._cond.release()
1120