ec_shutdown
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import random
35 import sys
36 import time
37 import threading
38
39 class FailurePolicy(object):
40     """ Defines how to respond to experiment failures  
41     """
42     IGNORE_RM_FAILURE = 1
43     ABORT_ON_RM_FAILURE = 2
44
45 class FailureLevel(object):
46     """ Describe the system failure state
47     """
48     OK = 1
49     RM_FAILURE = 2
50     TASK_FAILURE = 3
51     EC_FAILURE = 4
52
53 class FailureManager(object):
54     """ The FailureManager is responsible for handling errors,
55     and deciding whether an experiment should be aborted
56     """
57
58     def __init__(self, failure_policy = None):
59         self._failure_level = FailureLevel.OK
60         self._failure_policy = failure_policy or \
61                 FailurePolicy.ABORT_ON_RM_FAILURE
62
63     @property
64     def abort(self):
65         if self._failure_level == FailureLevel.EC_FAILURE:
66             return True
67
68         if self._failure_level in [FailureLevel.TASK_FAILURE, 
69                 FailureLevel.RM_FAILURE] and \
70                         self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE:
71             return True
72
73         return False
74
75     def set_rm_failure(self):
76         self._failure_level = FailureLevel.RM_FAILURE
77
78     def set_task_failure(self):
79         self._failure_level = FailureLevel.TASK_FAILURE
80
81     def set_ec_failure(self):
82         self._failure_level = FailureLevel.EC_FAILURE
83
84 class ECState(object):
85     """ State of the Experiment Controller
86    
87     """
88     RUNNING = 1
89     FAILED = 2
90     TERMINATED = 3
91
92 class ExperimentController(object):
93     """
94     .. class:: Class Args :
95       
96         :param exp_id: Human readable identifier for the experiment scenario. 
97         :type exp_id: str
98
99     .. note::
100
101         An experiment, or scenario, is defined by a concrete set of resources,
102         behavior, configuration and interconnection of those resources. 
103         The Experiment Description (ED) is a detailed representation of a
104         single experiment. It contains all the necessary information to 
105         allow repeating the experiment. NEPI allows to describe
106         experiments by registering components (resources), configuring them
107         and interconnecting them.
108         
109         A same experiment (scenario) can be executed many times, generating 
110         different results. We call an experiment execution (instance) a 'run'.
111
112         The ExperimentController (EC), is the entity responsible of
113         managing an experiment run. The same scenario can be 
114         recreated (and re-run) by instantiating an EC and recreating 
115         the same experiment description. 
116
117         In NEPI, an experiment is represented as a graph of interconnected
118         resources. A resource is a generic concept in the sense that any
119         component taking part of an experiment, whether physical of
120         virtual, is considered a resource. A resources could be a host, 
121         a virtual machine, an application, a simulator, a IP address.
122
123         A ResourceManager (RM), is the entity responsible for managing a 
124         single resource. ResourceManagers are specific to a resource
125         type (i.e. An RM to control a Linux application will not be
126         the same as the RM used to control a ns-3 simulation).
127         To support a new type of resource in NEPI, a new RM must be 
128         implemented. NEPI already provides a variety of
129         RMs to control basic resources, and new can be extended from
130         the existing ones.
131
132         Through the EC interface the user can create ResourceManagers (RMs),
133         configure them and interconnect them, to describe an experiment.
134         Describing an experiment through the EC does not run the experiment.
135         Only when the 'deploy()' method is invoked on the EC, the EC will take 
136         actions to transform the 'described' experiment into a 'running' experiment.
137
138         While the experiment is running, it is possible to continue to
139         create/configure/connect RMs, and to deploy them to involve new
140         resources in the experiment (this is known as 'interactive' deployment).
141         
142         An experiments in NEPI is identified by a string id, 
143         which is either given by the user, or automatically generated by NEPI.  
144         The purpose of this identifier is to separate files and results that 
145         belong to different experiment scenarios. 
146         However, since a same 'experiment' can be run many times, the experiment
147         id is not enough to identify an experiment instance (run).
148         For this reason, the ExperimentController has two identifier, the 
149         exp_id, which can be re-used in different ExperimentController,
150         and the run_id, which is unique to one ExperimentController instance, and
151         is automatically generated by NEPI.
152         
153     """
154
155     def __init__(self, exp_id = None): 
156         super(ExperimentController, self).__init__()
157         # Logging
158         self._logger = logging.getLogger("ExperimentController")
159
160         # Run identifier. It identifies a concrete execution instance (run) 
161         # of an experiment.
162         # Since a same experiment (same configuration) can be executed many 
163         # times, this run_id permits to separate result files generated on 
164         # different experiment executions
165         self._run_id = tsformat()
166
167         # Experiment identifier. Usually assigned by the user
168         # Identifies the experiment scenario (i.e. configuration, 
169         # resources used, etc)
170         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
171
172         # generator of globally unique ids
173         self._guid_generator = guid.GuidGenerator()
174         
175         # Resource managers
176         self._resources = dict()
177
178         # Scheduler
179         self._scheduler = HeapScheduler()
180
181         # Tasks
182         self._tasks = dict()
183
184         # RM groups (for deployment) 
185         self._groups = dict()
186
187         # generator of globally unique id for groups
188         self._group_id_generator = guid.GuidGenerator()
189  
190         # Event processing thread
191         self._cond = threading.Condition()
192         self._thread = threading.Thread(target = self._process)
193         self._thread.setDaemon(True)
194         self._thread.start()
195
196         # Flag to stop processing thread
197         self._stop = False
198     
199         # Entity in charge of managing system failures
200         self._fm = FailureManager()
201
202         # EC state
203         self._state = ECState.RUNNING
204
205     @property
206     def logger(self):
207         """ Return the logger of the Experiment Controller
208
209         """
210         return self._logger
211
212     @property
213     def ecstate(self):
214         """ Return the state of the Experiment Controller
215
216         """
217         return self._state
218
219     @property
220     def exp_id(self):
221         """ Return the experiment id assigned by the user
222
223         """
224         return self._exp_id
225
226     @property
227     def run_id(self):
228         """ Return the experiment instance (run) identifier  
229
230         """
231         return self._run_id
232
233     @property
234     def abort(self):
235         return self._fm.abort
236
237     def set_rm_failure(self):
238         self._fm.set_rm_failure()
239
240     def wait_finished(self, guids):
241         """ Blocking method that wait until all RMs in the 'guid' list 
242             reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
243             RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
244
245         :param guids: List of guids
246         :type guids: list
247
248         """
249
250         def quit():
251             return self.abort
252
253         return self.wait(guids, state = ResourceState.STOPPED, 
254                 quit = quit)
255
256     def wait_started(self, guids):
257         """ Blocking method that wait until all RMs in the 'guid' list 
258             reach a state >= STARTED or until a System Failure occurs 
259             (e.g. Task Failure) 
260
261         :param guids: List of guids
262         :type guids: list
263         """
264
265         def quit():
266             return self.abort
267
268         return self.wait(guids, state = ResourceState.STARTED, 
269                 quit = quit)
270
271     def wait_released(self, guids):
272         """ Blocking method that wait until all RMs in the 'guid' list 
273             reach a state = RELEASED or until the EC fails
274
275         :param guids: List of guids
276         :type guids: list
277         """
278
279         def quit():
280             return self._state == ECState.FAILED
281
282         return self.wait(guids, state = ResourceState.RELEASED, 
283                 quit = quit)
284
285     def wait_deployed(self, guids):
286         """ Blocking method that wait until all RMs in the 'guid' list 
287             reach a state >= READY or until a System Failure occurs 
288             (e.g. Task Failure) 
289
290         :param guids: List of guids
291         :type guids: list
292         """
293
294         def quit():
295             return self.abort
296
297         return self.wait(guids, state = ResourceState.READY, 
298                 quit = quit)
299
300     def wait(self, guids, state, quit):
301         """ Blocking method that wait until all RMs in the 'guid' list 
302             reach a state >= 'state' or until quit yileds True
303            
304         :param guids: List of guids
305         :type guids: list
306         """
307         if isinstance(guids, int):
308             guids = [guids]
309
310         while True:
311             # If there are no more guids to wait for
312             # or the quit function returns True, exit the loop
313             if len(guids) == 0 or quit():
314                 break
315
316             # If a guid reached one of the target states, remove it from list
317             guid = guids[0]
318             rstate = self.state(guid)
319
320             if rstate >= state:
321                 guids.remove(guid)
322             else:
323                 # Debug...
324                 hrstate = ResourceState2str.get(rstate)
325                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
326                     guid, rstate, state))
327
328             time.sleep(0.5)
329   
330     def get_task(self, tid):
331         """ Get a specific task
332
333         :param tid: Id of the task
334         :type tid: int
335         :rtype: Task
336         """
337         return self._tasks.get(tid)
338
339     def get_resource(self, guid):
340         """ Get a specific Resource Manager
341
342         :param guid: Id of the task
343         :type guid: int
344         :rtype: ResourceManager
345         """
346         return self._resources.get(guid)
347
348     @property
349     def resources(self):
350         """ Returns the list of all the Resource Manager Id
351
352         :rtype: set
353
354         """
355         return self._resources.keys()
356
357     def register_resource(self, rtype, guid = None):
358         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
359         for the RM of type 'rtype' and add it to the list of Resources.
360
361         :param rtype: Type of the RM
362         :type rtype: str
363         :return: Id of the RM
364         :rtype: int
365         """
366         # Get next available guid
367         guid = self._guid_generator.next(guid)
368         
369         # Instantiate RM
370         rm = ResourceFactory.create(rtype, self, guid)
371
372         # Store RM
373         self._resources[guid] = rm
374
375         return guid
376
377     def get_attributes(self, guid):
378         """ Return all the attibutes of a specific RM
379
380         :param guid: Guid of the RM
381         :type guid: int
382         :return: List of attributes
383         :rtype: list
384         """
385         rm = self.get_resource(guid)
386         return rm.get_attributes()
387
388     def register_connection(self, guid1, guid2):
389         """ Registers a guid1 with a guid2. 
390             The declaration order is not important
391
392             :param guid1: First guid to connect
393             :type guid1: ResourceManager
394
395             :param guid2: Second guid to connect
396             :type guid: ResourceManager
397         """
398         rm1 = self.get_resource(guid1)
399         rm2 = self.get_resource(guid2)
400
401         rm1.register_connection(guid2)
402         rm2.register_connection(guid1)
403
404     def register_condition(self, guids1, action, guids2, state,
405             time = None):
406         """ Registers an action START or STOP for all RM on guids1 to occur 
407             time 'time' after all elements in guids2 reached state 'state'.
408
409             :param guids1: List of guids of RMs subjected to action
410             :type guids1: list
411
412             :param action: Action to register (either START or STOP)
413             :type action: ResourceAction
414
415             :param guids2: List of guids of RMs to we waited for
416             :type guids2: list
417
418             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
419             :type state: ResourceState
420
421             :param time: Time to wait after guids2 has reached status 
422             :type time: string
423
424         """
425         if isinstance(guids1, int):
426             guids1 = [guids1]
427         if isinstance(guids2, int):
428             guids2 = [guids2]
429
430         for guid1 in guids1:
431             rm = self.get_resource(guid1)
432             rm.register_condition(action, guids2, state, time)
433
434     def enable_trace(self, guid, name):
435         """ Enable trace
436
437         :param name: Name of the trace
438         :type name: str
439         """
440         rm = self.get_resource(guid)
441         rm.enable_trace(name)
442
443     def trace_enabled(self, guid, name):
444         """ Returns True if trace is enabled
445
446         :param name: Name of the trace
447         :type name: str
448         """
449         rm = self.get_resource(guid)
450         return rm.trace_enabled(name)
451
452     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
453         """ Get information on collected trace
454
455         :param name: Name of the trace
456         :type name: str
457
458         :param attr: Can be one of:
459                          - TraceAttr.ALL (complete trace content), 
460                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
461                          - TraceAttr.PATH (full path to the trace file),
462                          - TraceAttr.SIZE (size of trace file). 
463         :type attr: str
464
465         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
466         :type name: int
467
468         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
469         :type name: int
470
471         :rtype: str
472         """
473         rm = self.get_resource(guid)
474         return rm.trace(name, attr, block, offset)
475
476     def discover(self, guid):
477         """ Discover a specific RM defined by its 'guid'
478
479             :param guid: Guid of the RM
480             :type guid: int
481
482         """
483         rm = self.get_resource(guid)
484         return rm.discover()
485
486     def provision(self, guid):
487         """ Provision a specific RM defined by its 'guid'
488
489             :param guid: Guid of the RM
490             :type guid: int
491
492         """
493         rm = self.get_resource(guid)
494         return rm.provision()
495
496     def get(self, guid, name):
497         """ Get a specific attribute 'name' from the RM 'guid'
498
499             :param guid: Guid of the RM
500             :type guid: int
501
502             :param name: attribute's name
503             :type name: str
504
505         """
506         rm = self.get_resource(guid)
507         return rm.get(name)
508
509     def set(self, guid, name, value):
510         """ Set a specific attribute 'name' from the RM 'guid' 
511             with the value 'value' 
512
513             :param guid: Guid of the RM
514             :type guid: int
515
516             :param name: attribute's name
517             :type name: str
518
519             :param value: attribute's value
520
521         """
522         rm = self.get_resource(guid)
523         return rm.set(name, value)
524
525     def state(self, guid, hr = False):
526         """ Returns the state of a resource
527
528             :param guid: Resource guid
529             :type guid: integer
530
531             :param hr: Human readable. Forces return of a 
532                 status string instead of a number 
533             :type hr: boolean
534
535         """
536         rm = self.get_resource(guid)
537         state = rm.state
538
539         if hr:
540             return ResourceState2str.get(state)
541
542         return state
543
544     def stop(self, guid):
545         """ Stop a specific RM defined by its 'guid'
546
547             :param guid: Guid of the RM
548             :type guid: int
549
550         """
551         rm = self.get_resource(guid)
552         return rm.stop()
553
554     def start(self, guid):
555         """ Start a specific RM defined by its 'guid'
556
557             :param guid: Guid of the RM
558             :type guid: int
559
560         """
561         rm = self.get_resource(guid)
562         return rm.start()
563
564     def set_with_conditions(self, name, value, guids1, guids2, state,
565             time = None):
566         """ Set value 'value' on attribute with name 'name' on all RMs of
567             guids1 when 'time' has elapsed since all elements in guids2 
568             have reached state 'state'.
569
570             :param name: Name of attribute to set in RM
571             :type name: string
572
573             :param value: Value of attribute to set in RM
574             :type name: string
575
576             :param guids1: List of guids of RMs subjected to action
577             :type guids1: list
578
579             :param action: Action to register (either START or STOP)
580             :type action: ResourceAction
581
582             :param guids2: List of guids of RMs to we waited for
583             :type guids2: list
584
585             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
586             :type state: ResourceState
587
588             :param time: Time to wait after guids2 has reached status 
589             :type time: string
590
591         """
592         if isinstance(guids1, int):
593             guids1 = [guids1]
594         if isinstance(guids2, int):
595             guids2 = [guids2]
596
597         for guid1 in guids1:
598             rm = self.get_resource(guid)
599             rm.set_with_conditions(name, value, guids2, state, time)
600
601     def stop_with_conditions(self, guid):
602         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
603
604             :param guid: Guid of the RM
605             :type guid: int
606
607         """
608         rm = self.get_resource(guid)
609         return rm.stop_with_conditions()
610
611     def start_with_conditions(self, guid):
612         """ Start a specific RM defined by its 'guid' only if all the conditions are true
613
614             :param guid: Guid of the RM
615             :type guid: int
616
617         """
618         rm = self.get_resource(guid)
619         return rm.start_with_conditions()
620
621     def deploy(self, guids = None, wait_all_ready = True, group = None):
622         """ Deploy all resource manager in guids list
623
624         :param guids: List of guids of RMs to deploy
625         :type guids: list
626
627         :param wait_all_ready: Wait until all RMs are ready in
628             order to start the RMs
629         :type guid: int
630
631         :param group: Id of deployment group in which to deploy RMs
632         :type group: int
633
634         """
635         self.logger.debug(" ------- DEPLOY START ------ ")
636
637         if not guids:
638             # If no guids list was passed, all 'NEW' RMs will be deployed
639             guids = []
640             for guid in self.resources:
641                 if self.state(guid) == ResourceState.NEW:
642                     guids.append(guid)
643                 
644         if isinstance(guids, int):
645             guids = [guids]
646
647         # Create deployment group
648         # New guids can be added to a same deployment group later on
649         new_group = False
650         if not group:
651             new_group = True
652             group = self._group_id_generator.next()
653
654         if group not in self._groups:
655             self._groups[group] = []
656
657         self._groups[group].extend(guids)
658
659         def wait_all_and_start(group):
660             # Function that checks if all resources are READY
661             # before scheduling a start_with_conditions for each RM
662             reschedule = False
663             
664             # Get all guids in group
665             guids = self._groups[group]
666
667             for guid in guids:
668                 if self.state(guid) < ResourceState.READY:
669                     reschedule = True
670                     break
671
672             if reschedule:
673                 callback = functools.partial(wait_all_and_start, group)
674                 self.schedule("1s", callback)
675             else:
676                 # If all resources are read, we schedule the start
677                 for guid in guids:
678                     rm = self.get_resource(guid)
679                     self.schedule("0s", rm.start_with_conditions)
680
681         if wait_all_ready and new_group:
682             # Schedule a function to check that all resources are
683             # READY, and only then schedule the start.
684             # This aims at reducing the number of tasks looping in the 
685             # scheduler. 
686             # Instead of having many start tasks, we will have only one for 
687             # the whole group.
688             callback = functools.partial(wait_all_and_start, group)
689             self.schedule("1s", callback)
690
691         for guid in guids:
692             rm = self.get_resource(guid)
693             rm.deployment_group = group
694             self.schedule("0s", rm.deploy_with_conditions)
695
696             if not wait_all_ready:
697                 self.schedule("1s", rm.start_with_conditions)
698
699             if rm.conditions.get(ResourceAction.STOP):
700                 # Only if the RM has STOP conditions we
701                 # schedule a stop. Otherwise the RM will stop immediately
702                 self.schedule("2s", rm.stop_with_conditions)
703
704     def release(self, guids = None):
705         """ Release al RMs on the guids list or 
706         all the resources if no list is specified
707
708             :param guids: List of RM guids
709             :type guids: list
710
711         """
712         if not guids:
713             guids = self.resources
714
715         for guid in guids:
716             rm = self.get_resource(guid)
717             self.schedule("0s", rm.release)
718
719         self.wait_released(guids)
720         
721     def shutdown(self):
722         """ Shutdown the Experiment Controller. 
723         Releases all the resources and stops task processing thread
724
725         """
726         # TODO: Clean the parallel runner!! STOP all ongoing tasks
727         ####
728
729         self.release()
730
731         # Mark the EC state as TERMINATED
732         self._state = ECState.TERMINATED
733
734         # Stop processing thread
735         self._stop = True
736
737         # Notify condition to wake up the processing thread
738         self._notify()
739         
740         if self._thread.is_alive():
741            self._thread.join()
742
743     def schedule(self, date, callback, track = False):
744         """ Schedule a callback to be executed at time date.
745
746             :param date: string containing execution time for the task.
747                     It can be expressed as an absolute time, using
748                     timestamp format, or as a relative time matching
749                     ^\d+.\d+(h|m|s|ms|us)$
750
751             :param callback: code to be executed for the task. Must be a
752                         Python function, and receives args and kwargs
753                         as arguments.
754
755             :param track: if set to True, the task will be retrivable with
756                     the get_task() method
757
758             :return : The Id of the task
759         """
760         timestamp = stabsformat(date)
761         task = Task(timestamp, callback)
762         task = self._scheduler.schedule(task)
763
764         if track:
765             self._tasks[task.id] = task
766
767         # Notify condition to wake up the processing thread
768         self._notify()
769
770         return task.id
771      
772     def _process(self):
773         """ Process scheduled tasks.
774
775         .. note::
776
777         The _process method is executed in an independent thread held by the 
778         ExperimentController for as long as the experiment is running.
779         
780         Tasks are scheduled by invoking the schedule method with a target callback. 
781         The schedule method is given a execution time which controls the
782         order in which tasks are processed. 
783
784         Tasks are processed in parallel using multithreading. 
785         The environmental variable NEPI_NTHREADS can be used to control
786         the number of threads used to process tasks. The default value is 50.
787
788         Exception handling:
789
790         To execute tasks in parallel, an ParallelRunner (PR) object, holding
791         a pool of threads (workers), is used.
792         For each available thread in the PR, the next task popped from 
793         the scheduler queue is 'put' in the PR.
794         Upon receiving a task to execute, each PR worker (thread) invokes the 
795         _execute method of the EC, passing the task as argument. 
796         This method, calls task.callback inside a try/except block. If an 
797         exception is raised by the tasks.callback, it will be trapped by the 
798         try block, logged to standard error (usually the console), and the EC 
799         state will be set to ECState.FAILED.
800         The invocation of _notify immediately after, forces the processing
801         loop in the _process method, to wake up if it was blocked waiting for new 
802         tasks to arrived, and to check the EC state.
803         As the EC is in FAILED state, the processing loop exits and the 
804         'finally' block is invoked. In the 'finally' block, the 'sync' method
805         of the PR is invoked, which forces the PR to raise any unchecked errors
806         that might have been raised by the workers.
807
808         """
809         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
810
811         runner = ParallelRun(maxthreads = nthreads)
812         runner.start()
813
814         while not self._stop:
815             try:
816                 self._cond.acquire()
817
818                 task = self._scheduler.next()
819                 
820                 if not task:
821                     # No task to execute. Wait for a new task to be scheduled.
822                     self._cond.wait()
823                 else:
824                     # The task timestamp is in the future. Wait for timeout 
825                     # or until another task is scheduled.
826                     now = tnow()
827                     if now < task.timestamp:
828                         # Calculate timeout in seconds
829                         timeout = tdiffsec(task.timestamp, now)
830
831                         # Re-schedule task with the same timestamp
832                         self._scheduler.schedule(task)
833                         
834                         task = None
835
836                         # Wait timeout or until a new task awakes the condition
837                         self._cond.wait(timeout)
838                
839                 self._cond.release()
840
841                 if task:
842                     # Process tasks in parallel
843                     runner.put(self._execute, task)
844             except: 
845                 import traceback
846                 err = traceback.format_exc()
847                 self.logger.error("Error while processing tasks in the EC: %s" % err)
848
849                 # Set the EC to FAILED state 
850                 self._state = ECState.FAILED
851
852                 # Set the FailureManager failure level
853                 self._fm.set_ec_failure()
854
855         self.logger.debug("Exiting the task processing loop ... ")
856         runner.sync()
857         runner.destroy()
858
859     def _execute(self, task):
860         """ Executes a single task. 
861
862             :param task: Object containing the callback to execute
863             :type task: Task
864
865         .. note::
866
867         If the invokation of the task callback raises an
868         exception, the processing thread of the ExperimentController
869         will be stopped and the experiment will be aborted.
870
871         """
872         # Invoke callback
873         task.status = TaskStatus.DONE
874
875         try:
876             task.result = task.callback()
877         except:
878             import traceback
879             err = traceback.format_exc()
880             task.result = err
881             task.status = TaskStatus.ERROR
882             
883             self.logger.error("Error occurred while executing task: %s" % err)
884
885             # Set the FailureManager failure level
886             self._fm.set_task_failure()
887
888     def _notify(self):
889         """ Awakes the processing thread in case it is blocked waiting
890         for a new task to be scheduled.
891         """
892         self._cond.acquire()
893         self._cond.notify()
894         self._cond.release()
895