Fixing tickets http://newyans.pl.sophia.inria.fr/trac/ticket/37 and http://newyans...
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import random
35 import sys
36 import time
37 import threading
38 import weakref
39
40 class FailureLevel(object):
41     """ Describes the system failure state
42     """
43     OK = 1
44     RM_FAILURE = 2
45     EC_FAILURE = 3
46
47 class FailureManager(object):
48     """ The FailureManager is responsible for handling errors,
49     and deciding whether an experiment should be aborted
50     """
51
52     def __init__(self, ec):
53         self._ec = weakref.ref(ec)
54         self._failure_level = FailureLevel.OK
55
56     @property
57     def ec(self):
58         """ Returns the Experiment Controller """
59         return self._ec()
60
61     @property
62     def abort(self):
63         if self._failure_level == FailureLevel.OK:
64             for guid in self.ec.resources:
65                 state = self.ec.state(guid)
66                 critical = self.ec.get(guid, "critical")
67
68                 if state == ResourceState.FAILED and critical:
69                     self._failure_level = FailureLevel.RM_FAILURE
70                     self.ec.logger.debug("RM critical failure occurred on guid %d." \
71                             " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
72                     break
73
74         return self._failure_level != FailureLevel.OK
75
76     def set_ec_failure(self):
77         self._failure_level = FailureLevel.EC_FAILURE
78
79
80 class ECState(object):
81     """ State of the Experiment Controller
82    
83     """
84     RUNNING = 1
85     FAILED = 2
86     TERMINATED = 3
87
88 class ExperimentController(object):
89     """
90     .. class:: Class Args :
91       
92         :param exp_id: Human readable identifier for the experiment scenario. 
93         :type exp_id: str
94
95     .. note::
96
97         An experiment, or scenario, is defined by a concrete set of resources,
98         behavior, configuration and interconnection of those resources. 
99         The Experiment Description (ED) is a detailed representation of a
100         single experiment. It contains all the necessary information to 
101         allow repeating the experiment. NEPI allows to describe
102         experiments by registering components (resources), configuring them
103         and interconnecting them.
104         
105         A same experiment (scenario) can be executed many times, generating 
106         different results. We call an experiment execution (instance) a 'run'.
107
108         The ExperimentController (EC), is the entity responsible of
109         managing an experiment run. The same scenario can be 
110         recreated (and re-run) by instantiating an EC and recreating 
111         the same experiment description. 
112
113         In NEPI, an experiment is represented as a graph of interconnected
114         resources. A resource is a generic concept in the sense that any
115         component taking part of an experiment, whether physical of
116         virtual, is considered a resource. A resources could be a host, 
117         a virtual machine, an application, a simulator, a IP address.
118
119         A ResourceManager (RM), is the entity responsible for managing a 
120         single resource. ResourceManagers are specific to a resource
121         type (i.e. An RM to control a Linux application will not be
122         the same as the RM used to control a ns-3 simulation).
123         To support a new type of resource in NEPI, a new RM must be 
124         implemented. NEPI already provides a variety of
125         RMs to control basic resources, and new can be extended from
126         the existing ones.
127
128         Through the EC interface the user can create ResourceManagers (RMs),
129         configure them and interconnect them, to describe an experiment.
130         Describing an experiment through the EC does not run the experiment.
131         Only when the 'deploy()' method is invoked on the EC, the EC will take 
132         actions to transform the 'described' experiment into a 'running' experiment.
133
134         While the experiment is running, it is possible to continue to
135         create/configure/connect RMs, and to deploy them to involve new
136         resources in the experiment (this is known as 'interactive' deployment).
137         
138         An experiments in NEPI is identified by a string id, 
139         which is either given by the user, or automatically generated by NEPI.  
140         The purpose of this identifier is to separate files and results that 
141         belong to different experiment scenarios. 
142         However, since a same 'experiment' can be run many times, the experiment
143         id is not enough to identify an experiment instance (run).
144         For this reason, the ExperimentController has two identifier, the 
145         exp_id, which can be re-used in different ExperimentController,
146         and the run_id, which is unique to one ExperimentController instance, and
147         is automatically generated by NEPI.
148         
149     """
150
151     def __init__(self, exp_id = None): 
152         super(ExperimentController, self).__init__()
153         # Logging
154         self._logger = logging.getLogger("ExperimentController")
155
156         # Run identifier. It identifies a concrete execution instance (run) 
157         # of an experiment.
158         # Since a same experiment (same configuration) can be executed many 
159         # times, this run_id permits to separate result files generated on 
160         # different experiment executions
161         self._run_id = tsformat()
162
163         # Experiment identifier. Usually assigned by the user
164         # Identifies the experiment scenario (i.e. configuration, 
165         # resources used, etc)
166         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
167
168         # generator of globally unique ids
169         self._guid_generator = guid.GuidGenerator()
170         
171         # Resource managers
172         self._resources = dict()
173
174         # Scheduler. It a queue that holds tasks scheduled for
175         # execution, and yields the next task to be executed 
176         # ordered by execution and arrival time
177         self._scheduler = HeapScheduler()
178
179         # Tasks
180         self._tasks = dict()
181
182         # RM groups (for deployment) 
183         self._groups = dict()
184
185         # generator of globally unique id for groups
186         self._group_id_generator = guid.GuidGenerator()
187
188         # Flag to stop processing thread
189         self._stop = False
190     
191         # Entity in charge of managing system failures
192         self._fm = FailureManager(self)
193
194         # EC state
195         self._state = ECState.RUNNING
196
197         # The runner is a pool of threads used to parallelize 
198         # execution of tasks
199         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
200         self._runner = ParallelRun(maxthreads = nthreads)
201
202         # Event processing thread
203         self._cond = threading.Condition()
204         self._thread = threading.Thread(target = self._process)
205         self._thread.setDaemon(True)
206         self._thread.start()
207
208     @property
209     def logger(self):
210         """ Return the logger of the Experiment Controller
211
212         """
213         return self._logger
214
215     @property
216     def ecstate(self):
217         """ Return the state of the Experiment Controller
218
219         """
220         return self._state
221
222     @property
223     def exp_id(self):
224         """ Return the experiment id assigned by the user
225
226         """
227         return self._exp_id
228
229     @property
230     def run_id(self):
231         """ Return the experiment instance (run) identifier  
232
233         """
234         return self._run_id
235
236     @property
237     def abort(self):
238         return self._fm.abort
239
240     def wait_finished(self, guids):
241         """ Blocking method that wait until all RMs in the 'guid' list 
242             reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
243             RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
244
245         :param guids: List of guids
246         :type guids: list
247
248         """
249
250         def quit():
251             return self.abort
252
253         return self.wait(guids, state = ResourceState.STOPPED, 
254                 quit = quit)
255
256     def wait_started(self, guids):
257         """ Blocking method that wait until all RMs in the 'guid' list 
258             reach a state >= STARTED or until a System Failure occurs 
259             (e.g. Task Failure) 
260
261         :param guids: List of guids
262         :type guids: list
263         """
264
265         def quit():
266             return self.abort
267
268         return self.wait(guids, state = ResourceState.STARTED, 
269                 quit = quit)
270
271     def wait_released(self, guids):
272         """ Blocking method that wait until all RMs in the 'guid' list 
273             reach a state = RELEASED or until the EC fails
274
275         :param guids: List of guids
276         :type guids: list
277         """
278
279         def quit():
280             return self._state == ECState.FAILED
281
282         return self.wait(guids, state = ResourceState.RELEASED, 
283                 quit = quit)
284
285     def wait_deployed(self, guids):
286         """ Blocking method that wait until all RMs in the 'guid' list 
287             reach a state >= READY or until a System Failure occurs 
288             (e.g. Task Failure) 
289
290         :param guids: List of guids
291         :type guids: list
292         """
293
294         def quit():
295             return self.abort
296
297         return self.wait(guids, state = ResourceState.READY, 
298                 quit = quit)
299
300     def wait(self, guids, state, quit):
301         """ Blocking method that wait until all RMs in the 'guid' list 
302             reach a state >= 'state' or until quit yileds True
303            
304         :param guids: List of guids
305         :type guids: list
306         """
307         if isinstance(guids, int):
308             guids = [guids]
309
310         # Make a copy to avoid modifying the original guids list
311         guids = list(guids)
312
313         while True:
314             # If there are no more guids to wait for
315             # or the quit function returns True, exit the loop
316             if len(guids) == 0 or quit():
317                 break
318
319             # If a guid reached one of the target states, remove it from list
320             guid = guids[0]
321             rstate = self.state(guid)
322             
323             hrrstate = ResourceState2str.get(rstate)
324             hrstate = ResourceState2str.get(state)
325
326             if rstate >= state:
327                 guids.remove(guid)
328                 rm = self.get_resource(guid)
329                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
330                     rm.rtype(), guid, hrrstate, hrstate))
331             else:
332                 # Debug...
333                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
334                     guid, hrrstate, hrstate))
335                 time.sleep(0.5)
336   
337     def get_task(self, tid):
338         """ Get a specific task
339
340         :param tid: Id of the task
341         :type tid: int
342         :rtype: Task
343         """
344         return self._tasks.get(tid)
345
346     def get_resource(self, guid):
347         """ Get a specific Resource Manager
348
349         :param guid: Id of the task
350         :type guid: int
351         :rtype: ResourceManager
352         """
353         return self._resources.get(guid)
354
355     @property
356     def resources(self):
357         """ Returns the list of all the Resource Manager Id
358
359         :rtype: set
360
361         """
362         return self._resources.keys()
363
364     def register_resource(self, rtype, guid = None):
365         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
366         for the RM of type 'rtype' and add it to the list of Resources.
367
368         :param rtype: Type of the RM
369         :type rtype: str
370         :return: Id of the RM
371         :rtype: int
372         """
373         # Get next available guid
374         guid = self._guid_generator.next(guid)
375         
376         # Instantiate RM
377         rm = ResourceFactory.create(rtype, self, guid)
378
379         # Store RM
380         self._resources[guid] = rm
381
382         return guid
383
384     def get_attributes(self, guid):
385         """ Return all the attibutes of a specific RM
386
387         :param guid: Guid of the RM
388         :type guid: int
389         :return: List of attributes
390         :rtype: list
391         """
392         rm = self.get_resource(guid)
393         return rm.get_attributes()
394
395     def register_connection(self, guid1, guid2):
396         """ Registers a guid1 with a guid2. 
397             The declaration order is not important
398
399             :param guid1: First guid to connect
400             :type guid1: ResourceManager
401
402             :param guid2: Second guid to connect
403             :type guid: ResourceManager
404         """
405         rm1 = self.get_resource(guid1)
406         rm2 = self.get_resource(guid2)
407
408         rm1.register_connection(guid2)
409         rm2.register_connection(guid1)
410
411     def register_condition(self, guids1, action, guids2, state,
412             time = None):
413         """ Registers an action START or STOP for all RM on guids1 to occur 
414             time 'time' after all elements in guids2 reached state 'state'.
415
416             :param guids1: List of guids of RMs subjected to action
417             :type guids1: list
418
419             :param action: Action to register (either START or STOP)
420             :type action: ResourceAction
421
422             :param guids2: List of guids of RMs to we waited for
423             :type guids2: list
424
425             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
426             :type state: ResourceState
427
428             :param time: Time to wait after guids2 has reached status 
429             :type time: string
430
431         """
432         if isinstance(guids1, int):
433             guids1 = [guids1]
434         if isinstance(guids2, int):
435             guids2 = [guids2]
436
437         for guid1 in guids1:
438             rm = self.get_resource(guid1)
439             rm.register_condition(action, guids2, state, time)
440
441     def enable_trace(self, guid, name):
442         """ Enable trace
443
444         :param name: Name of the trace
445         :type name: str
446         """
447         rm = self.get_resource(guid)
448         rm.enable_trace(name)
449
450     def trace_enabled(self, guid, name):
451         """ Returns True if trace is enabled
452
453         :param name: Name of the trace
454         :type name: str
455         """
456         rm = self.get_resource(guid)
457         return rm.trace_enabled(name)
458
459     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
460         """ Get information on collected trace
461
462         :param name: Name of the trace
463         :type name: str
464
465         :param attr: Can be one of:
466                          - TraceAttr.ALL (complete trace content), 
467                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
468                          - TraceAttr.PATH (full path to the trace file),
469                          - TraceAttr.SIZE (size of trace file). 
470         :type attr: str
471
472         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
473         :type name: int
474
475         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
476         :type name: int
477
478         :rtype: str
479         """
480         rm = self.get_resource(guid)
481         return rm.trace(name, attr, block, offset)
482
483     def discover(self, guid):
484         """ Discover a specific RM defined by its 'guid'
485
486             :param guid: Guid of the RM
487             :type guid: int
488
489         """
490         rm = self.get_resource(guid)
491         return rm.discover()
492
493     def provision(self, guid):
494         """ Provision a specific RM defined by its 'guid'
495
496             :param guid: Guid of the RM
497             :type guid: int
498
499         """
500         rm = self.get_resource(guid)
501         return rm.provision()
502
503     def get(self, guid, name):
504         """ Get a specific attribute 'name' from the RM 'guid'
505
506             :param guid: Guid of the RM
507             :type guid: int
508
509             :param name: attribute's name
510             :type name: str
511
512         """
513         rm = self.get_resource(guid)
514         return rm.get(name)
515
516     def set(self, guid, name, value):
517         """ Set a specific attribute 'name' from the RM 'guid' 
518             with the value 'value' 
519
520             :param guid: Guid of the RM
521             :type guid: int
522
523             :param name: attribute's name
524             :type name: str
525
526             :param value: attribute's value
527
528         """
529         rm = self.get_resource(guid)
530         return rm.set(name, value)
531
532     def state(self, guid, hr = False):
533         """ Returns the state of a resource
534
535             :param guid: Resource guid
536             :type guid: integer
537
538             :param hr: Human readable. Forces return of a 
539                 status string instead of a number 
540             :type hr: boolean
541
542         """
543         rm = self.get_resource(guid)
544         state = rm.state
545
546         if hr:
547             return ResourceState2str.get(state)
548
549         return state
550
551     def stop(self, guid):
552         """ Stop a specific RM defined by its 'guid'
553
554             :param guid: Guid of the RM
555             :type guid: int
556
557         """
558         rm = self.get_resource(guid)
559         return rm.stop()
560
561     def start(self, guid):
562         """ Start a specific RM defined by its 'guid'
563
564             :param guid: Guid of the RM
565             :type guid: int
566
567         """
568         rm = self.get_resource(guid)
569         return rm.start()
570
571     def set_with_conditions(self, name, value, guids1, guids2, state,
572             time = None):
573         """ Set value 'value' on attribute with name 'name' on all RMs of
574             guids1 when 'time' has elapsed since all elements in guids2 
575             have reached state 'state'.
576
577             :param name: Name of attribute to set in RM
578             :type name: string
579
580             :param value: Value of attribute to set in RM
581             :type name: string
582
583             :param guids1: List of guids of RMs subjected to action
584             :type guids1: list
585
586             :param action: Action to register (either START or STOP)
587             :type action: ResourceAction
588
589             :param guids2: List of guids of RMs to we waited for
590             :type guids2: list
591
592             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
593             :type state: ResourceState
594
595             :param time: Time to wait after guids2 has reached status 
596             :type time: string
597
598         """
599         if isinstance(guids1, int):
600             guids1 = [guids1]
601         if isinstance(guids2, int):
602             guids2 = [guids2]
603
604         for guid1 in guids1:
605             rm = self.get_resource(guid)
606             rm.set_with_conditions(name, value, guids2, state, time)
607
608     def deploy(self, guids = None, wait_all_ready = True, group = None):
609         """ Deploy all resource manager in guids list
610
611         :param guids: List of guids of RMs to deploy
612         :type guids: list
613
614         :param wait_all_ready: Wait until all RMs are ready in
615             order to start the RMs
616         :type guid: int
617
618         :param group: Id of deployment group in which to deploy RMs
619         :type group: int
620
621         """
622         self.logger.debug(" ------- DEPLOY START ------ ")
623
624         if not guids:
625             # If no guids list was passed, all 'NEW' RMs will be deployed
626             guids = []
627             for guid in self.resources:
628                 if self.state(guid) == ResourceState.NEW:
629                     guids.append(guid)
630                 
631         if isinstance(guids, int):
632             guids = [guids]
633
634         # Create deployment group
635         # New guids can be added to a same deployment group later on
636         new_group = False
637         if not group:
638             new_group = True
639             group = self._group_id_generator.next()
640
641         if group not in self._groups:
642             self._groups[group] = []
643
644         self._groups[group].extend(guids)
645
646         def wait_all_and_start(group):
647             # Function that checks if all resources are READY
648             # before scheduling a start_with_conditions for each RM
649             reschedule = False
650             
651             # Get all guids in group
652             guids = self._groups[group]
653
654             for guid in guids:
655                 if self.state(guid) < ResourceState.READY:
656                     reschedule = True
657                     break
658
659             if reschedule:
660                 callback = functools.partial(wait_all_and_start, group)
661                 self.schedule("1s", callback)
662             else:
663                 # If all resources are ready, we schedule the start
664                 for guid in guids:
665                     rm = self.get_resource(guid)
666                     self.schedule("0s", rm.start_with_conditions)
667
668         if wait_all_ready and new_group:
669             # Schedule a function to check that all resources are
670             # READY, and only then schedule the start.
671             # This aims at reducing the number of tasks looping in the 
672             # scheduler. 
673             # Instead of having many start tasks, we will have only one for 
674             # the whole group.
675             callback = functools.partial(wait_all_and_start, group)
676             self.schedule("0s", callback)
677
678         for guid in guids:
679             rm = self.get_resource(guid)
680             rm.deployment_group = group
681             self.schedule("0s", rm.deploy_with_conditions)
682
683             if not wait_all_ready:
684                 self.schedule("0s", rm.start_with_conditions)
685
686             if rm.conditions.get(ResourceAction.STOP):
687                 # Only if the RM has STOP conditions we
688                 # schedule a stop. Otherwise the RM will stop immediately
689                 self.schedule("0s", rm.stop_with_conditions)
690
691     def release(self, guids = None):
692         """ Release al RMs on the guids list or 
693         all the resources if no list is specified
694
695             :param guids: List of RM guids
696             :type guids: list
697
698         """
699         if not guids:
700             guids = self.resources
701
702         # Remove all pending tasks from the scheduler queue
703         for tid in list(self._scheduler.pending):
704             self._scheduler.remove(tid)
705
706         self._runner.empty()
707
708         for guid in guids:
709             rm = self.get_resource(guid)
710             self.schedule("0s", rm.release)
711
712         self.wait_released(guids)
713         
714     def shutdown(self):
715         """ Shutdown the Experiment Controller. 
716         Releases all the resources and stops task processing thread
717
718         """
719         # If there was a major failure we can't exit gracefully
720         if self._state == ECState.FAILED:
721             raise RuntimeError("EC failure. Can not exit gracefully")
722
723         self.release()
724
725         # Mark the EC state as TERMINATED
726         self._state = ECState.TERMINATED
727
728         # Stop processing thread
729         self._stop = True
730
731         # Notify condition to wake up the processing thread
732         self._notify()
733         
734         if self._thread.is_alive():
735            self._thread.join()
736
737     def schedule(self, date, callback, track = False):
738         """ Schedule a callback to be executed at time date.
739
740             :param date: string containing execution time for the task.
741                     It can be expressed as an absolute time, using
742                     timestamp format, or as a relative time matching
743                     ^\d+.\d+(h|m|s|ms|us)$
744
745             :param callback: code to be executed for the task. Must be a
746                         Python function, and receives args and kwargs
747                         as arguments.
748
749             :param track: if set to True, the task will be retrivable with
750                     the get_task() method
751
752             :return : The Id of the task
753         """
754         timestamp = stabsformat(date)
755         task = Task(timestamp, callback)
756         task = self._scheduler.schedule(task)
757
758         if track:
759             self._tasks[task.id] = task
760
761         # Notify condition to wake up the processing thread
762         self._notify()
763
764         return task.id
765      
766     def _process(self):
767         """ Process scheduled tasks.
768
769         .. note::
770
771         The _process method is executed in an independent thread held by the 
772         ExperimentController for as long as the experiment is running.
773         
774         Tasks are scheduled by invoking the schedule method with a target callback. 
775         The schedule method is given a execution time which controls the
776         order in which tasks are processed. 
777
778         Tasks are processed in parallel using multithreading. 
779         The environmental variable NEPI_NTHREADS can be used to control
780         the number of threads used to process tasks. The default value is 50.
781
782         Exception handling:
783
784         To execute tasks in parallel, an ParallelRunner (PR) object, holding
785         a pool of threads (workers), is used.
786         For each available thread in the PR, the next task popped from 
787         the scheduler queue is 'put' in the PR.
788         Upon receiving a task to execute, each PR worker (thread) invokes the 
789         _execute method of the EC, passing the task as argument. 
790         This method, calls task.callback inside a try/except block. If an 
791         exception is raised by the tasks.callback, it will be trapped by the 
792         try block, logged to standard error (usually the console), and the EC 
793         state will be set to ECState.FAILED.
794         The invocation of _notify immediately after, forces the processing
795         loop in the _process method, to wake up if it was blocked waiting for new 
796         tasks to arrived, and to check the EC state.
797         As the EC is in FAILED state, the processing loop exits and the 
798         'finally' block is invoked. In the 'finally' block, the 'sync' method
799         of the PR is invoked, which forces the PR to raise any unchecked errors
800         that might have been raised by the workers.
801
802         """
803
804         self._runner.start()
805
806         while not self._stop:
807             try:
808                 self._cond.acquire()
809
810                 task = self._scheduler.next()
811                 
812                 if not task:
813                     # No task to execute. Wait for a new task to be scheduled.
814                     self._cond.wait()
815                 else:
816                     # The task timestamp is in the future. Wait for timeout 
817                     # or until another task is scheduled.
818                     now = tnow()
819                     if now < task.timestamp:
820                         # Calculate timeout in seconds
821                         timeout = tdiffsec(task.timestamp, now)
822
823                         # Re-schedule task with the same timestamp
824                         self._scheduler.schedule(task)
825                         
826                         task = None
827
828                         # Wait timeout or until a new task awakes the condition
829                         self._cond.wait(timeout)
830                
831                 self._cond.release()
832
833                 if task:
834                     # Process tasks in parallel
835                     self._runner.put(self._execute, task)
836             except: 
837                 import traceback
838                 err = traceback.format_exc()
839                 self.logger.error("Error while processing tasks in the EC: %s" % err)
840
841                 # Set the EC to FAILED state 
842                 self._state = ECState.FAILED
843             
844                 # Set the FailureManager failure level to EC failure
845                 self._fm.set_ec_failure()
846
847         self.logger.debug("Exiting the task processing loop ... ")
848         
849         self._runner.sync()
850         self._runner.destroy()
851
852     def _execute(self, task):
853         """ Executes a single task. 
854
855             :param task: Object containing the callback to execute
856             :type task: Task
857
858         .. note::
859
860         If the invokation of the task callback raises an
861         exception, the processing thread of the ExperimentController
862         will be stopped and the experiment will be aborted.
863
864         """
865         # Invoke callback
866         task.status = TaskStatus.DONE
867
868         try:
869             task.result = task.callback()
870         except:
871             import traceback
872             err = traceback.format_exc()
873             task.result = err
874             task.status = TaskStatus.ERROR
875             
876             self.logger.error("Error occurred while executing task: %s" % err)
877
878     def _notify(self):
879         """ Awakes the processing thread in case it is blocked waiting
880         for a new task to be scheduled.
881         """
882         self._cond.acquire()
883         self._cond.notify()
884         self._cond.release()
885