Adding unit test for RM 'critical' attribute
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import random
35 import sys
36 import time
37 import threading
38 import weakref
39
40 class FailureLevel(object):
41     """ Describes the system failure state
42     """
43     OK = 1
44     RM_FAILURE = 2
45     EC_FAILURE = 3
46
47 class FailureManager(object):
48     """ The FailureManager is responsible for handling errors,
49     and deciding whether an experiment should be aborted
50     """
51
52     def __init__(self, ec):
53         self._ec = weakref.ref(ec)
54         self._failure_level = FailureLevel.OK
55
56     @property
57     def ec(self):
58         """ Returns the Experiment Controller """
59         return self._ec()
60
61     @property
62     def abort(self):
63         if self._failure_level == FailureLevel.OK:
64             for guid in self.ec.resources:
65                 state = self.ec.state(guid)
66                 critical = self.ec.get(guid, "critical")
67
68                 if state == ResourceState.FAILED and critical:
69                     self._failure_level = FailureLevel.RM_FAILURE
70                     self.ec.logger.debug("RM critical failure occurred on guid %d." \
71                             " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
72                     break
73
74         return self._failure_level != FailureLevel.OK
75
76     def set_ec_failure(self):
77         self._failure_level = FailureLevel.EC_FAILURE
78
79
80 class ECState(object):
81     """ State of the Experiment Controller
82    
83     """
84     RUNNING = 1
85     FAILED = 2
86     TERMINATED = 3
87
88 class ExperimentController(object):
89     """
90     .. class:: Class Args :
91       
92         :param exp_id: Human readable identifier for the experiment scenario. 
93         :type exp_id: str
94
95     .. note::
96
97         An experiment, or scenario, is defined by a concrete set of resources,
98         behavior, configuration and interconnection of those resources. 
99         The Experiment Description (ED) is a detailed representation of a
100         single experiment. It contains all the necessary information to 
101         allow repeating the experiment. NEPI allows to describe
102         experiments by registering components (resources), configuring them
103         and interconnecting them.
104         
105         A same experiment (scenario) can be executed many times, generating 
106         different results. We call an experiment execution (instance) a 'run'.
107
108         The ExperimentController (EC), is the entity responsible of
109         managing an experiment run. The same scenario can be 
110         recreated (and re-run) by instantiating an EC and recreating 
111         the same experiment description. 
112
113         In NEPI, an experiment is represented as a graph of interconnected
114         resources. A resource is a generic concept in the sense that any
115         component taking part of an experiment, whether physical of
116         virtual, is considered a resource. A resources could be a host, 
117         a virtual machine, an application, a simulator, a IP address.
118
119         A ResourceManager (RM), is the entity responsible for managing a 
120         single resource. ResourceManagers are specific to a resource
121         type (i.e. An RM to control a Linux application will not be
122         the same as the RM used to control a ns-3 simulation).
123         To support a new type of resource in NEPI, a new RM must be 
124         implemented. NEPI already provides a variety of
125         RMs to control basic resources, and new can be extended from
126         the existing ones.
127
128         Through the EC interface the user can create ResourceManagers (RMs),
129         configure them and interconnect them, to describe an experiment.
130         Describing an experiment through the EC does not run the experiment.
131         Only when the 'deploy()' method is invoked on the EC, the EC will take 
132         actions to transform the 'described' experiment into a 'running' experiment.
133
134         While the experiment is running, it is possible to continue to
135         create/configure/connect RMs, and to deploy them to involve new
136         resources in the experiment (this is known as 'interactive' deployment).
137         
138         An experiments in NEPI is identified by a string id, 
139         which is either given by the user, or automatically generated by NEPI.  
140         The purpose of this identifier is to separate files and results that 
141         belong to different experiment scenarios. 
142         However, since a same 'experiment' can be run many times, the experiment
143         id is not enough to identify an experiment instance (run).
144         For this reason, the ExperimentController has two identifier, the 
145         exp_id, which can be re-used in different ExperimentController,
146         and the run_id, which is unique to one ExperimentController instance, and
147         is automatically generated by NEPI.
148         
149     """
150
151     def __init__(self, exp_id = None): 
152         super(ExperimentController, self).__init__()
153         # Logging
154         self._logger = logging.getLogger("ExperimentController")
155
156         # Run identifier. It identifies a concrete execution instance (run) 
157         # of an experiment.
158         # Since a same experiment (same configuration) can be executed many 
159         # times, this run_id permits to separate result files generated on 
160         # different experiment executions
161         self._run_id = tsformat()
162
163         # Experiment identifier. Usually assigned by the user
164         # Identifies the experiment scenario (i.e. configuration, 
165         # resources used, etc)
166         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
167
168         # generator of globally unique ids
169         self._guid_generator = guid.GuidGenerator()
170         
171         # Resource managers
172         self._resources = dict()
173
174         # Scheduler. It a queue that holds tasks scheduled for
175         # execution, and yields the next task to be executed 
176         # ordered by execution and arrival time
177         self._scheduler = HeapScheduler()
178
179         # Tasks
180         self._tasks = dict()
181
182         # RM groups (for deployment) 
183         self._groups = dict()
184
185         # generator of globally unique id for groups
186         self._group_id_generator = guid.GuidGenerator()
187
188         # Flag to stop processing thread
189         self._stop = False
190     
191         # Entity in charge of managing system failures
192         self._fm = FailureManager(self)
193
194         # EC state
195         self._state = ECState.RUNNING
196
197         # The runner is a pool of threads used to parallelize 
198         # execution of tasks
199         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
200         self._runner = ParallelRun(maxthreads = nthreads)
201
202         # Event processing thread
203         self._cond = threading.Condition()
204         self._thread = threading.Thread(target = self._process)
205         self._thread.setDaemon(True)
206         self._thread.start()
207
208     @property
209     def logger(self):
210         """ Return the logger of the Experiment Controller
211
212         """
213         return self._logger
214
215     @property
216     def ecstate(self):
217         """ Return the state of the Experiment Controller
218
219         """
220         return self._state
221
222     @property
223     def exp_id(self):
224         """ Return the experiment id assigned by the user
225
226         """
227         return self._exp_id
228
229     @property
230     def run_id(self):
231         """ Return the experiment instance (run) identifier  
232
233         """
234         return self._run_id
235
236     @property
237     def abort(self):
238         return self._fm.abort
239
240     def wait_finished(self, guids):
241         """ Blocking method that wait until all RMs in the 'guid' list 
242             reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
243             RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
244
245         :param guids: List of guids
246         :type guids: list
247
248         """
249
250         def quit():
251             return self.abort
252
253         return self.wait(guids, state = ResourceState.STOPPED, 
254                 quit = quit)
255
256     def wait_started(self, guids):
257         """ Blocking method that wait until all RMs in the 'guid' list 
258             reach a state >= STARTED or until a System Failure occurs 
259             (e.g. Task Failure) 
260
261         :param guids: List of guids
262         :type guids: list
263         """
264
265         def quit():
266             return self.abort
267
268         return self.wait(guids, state = ResourceState.STARTED, 
269                 quit = quit)
270
271     def wait_released(self, guids):
272         """ Blocking method that wait until all RMs in the 'guid' list 
273             reach a state = RELEASED or until the EC fails
274
275         :param guids: List of guids
276         :type guids: list
277         """
278
279         def quit():
280             return self._state == ECState.FAILED
281
282         return self.wait(guids, state = ResourceState.RELEASED, 
283                 quit = quit)
284
285     def wait_deployed(self, guids):
286         """ Blocking method that wait until all RMs in the 'guid' list 
287             reach a state >= READY or until a System Failure occurs 
288             (e.g. Task Failure) 
289
290         :param guids: List of guids
291         :type guids: list
292         """
293
294         def quit():
295             return self.abort
296
297         return self.wait(guids, state = ResourceState.READY, 
298                 quit = quit)
299
300     def wait(self, guids, state, quit):
301         """ Blocking method that wait until all RMs in the 'guid' list 
302             reach a state >= 'state' or until quit yileds True
303            
304         :param guids: List of guids
305         :type guids: list
306         """
307         if isinstance(guids, int):
308             guids = [guids]
309
310         # Make a copy to avoid modifying the original guids list
311         guids = list(guids)
312
313         while True:
314             # If there are no more guids to wait for
315             # or the quit function returns True, exit the loop
316             if len(guids) == 0 or quit():
317                 break
318
319             # If a guid reached one of the target states, remove it from list
320             guid = guids[0]
321             rstate = self.state(guid)
322             
323             hrrstate = ResourceState2str.get(rstate)
324             hrstate = ResourceState2str.get(state)
325
326             if rstate >= state:
327                 guids.remove(guid)
328                 self.logger.debug(" guid %d DONE - state is %s, required is >= %s " % (
329                     guid, hrrstate, hrstate))
330             else:
331                 # Debug...
332                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
333                     guid, hrrstate, hrstate))
334                 time.sleep(0.5)
335   
336     def get_task(self, tid):
337         """ Get a specific task
338
339         :param tid: Id of the task
340         :type tid: int
341         :rtype: Task
342         """
343         return self._tasks.get(tid)
344
345     def get_resource(self, guid):
346         """ Get a specific Resource Manager
347
348         :param guid: Id of the task
349         :type guid: int
350         :rtype: ResourceManager
351         """
352         return self._resources.get(guid)
353
354     @property
355     def resources(self):
356         """ Returns the list of all the Resource Manager Id
357
358         :rtype: set
359
360         """
361         return self._resources.keys()
362
363     def register_resource(self, rtype, guid = None):
364         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
365         for the RM of type 'rtype' and add it to the list of Resources.
366
367         :param rtype: Type of the RM
368         :type rtype: str
369         :return: Id of the RM
370         :rtype: int
371         """
372         # Get next available guid
373         guid = self._guid_generator.next(guid)
374         
375         # Instantiate RM
376         rm = ResourceFactory.create(rtype, self, guid)
377
378         # Store RM
379         self._resources[guid] = rm
380
381         return guid
382
383     def get_attributes(self, guid):
384         """ Return all the attibutes of a specific RM
385
386         :param guid: Guid of the RM
387         :type guid: int
388         :return: List of attributes
389         :rtype: list
390         """
391         rm = self.get_resource(guid)
392         return rm.get_attributes()
393
394     def register_connection(self, guid1, guid2):
395         """ Registers a guid1 with a guid2. 
396             The declaration order is not important
397
398             :param guid1: First guid to connect
399             :type guid1: ResourceManager
400
401             :param guid2: Second guid to connect
402             :type guid: ResourceManager
403         """
404         rm1 = self.get_resource(guid1)
405         rm2 = self.get_resource(guid2)
406
407         rm1.register_connection(guid2)
408         rm2.register_connection(guid1)
409
410     def register_condition(self, guids1, action, guids2, state,
411             time = None):
412         """ Registers an action START or STOP for all RM on guids1 to occur 
413             time 'time' after all elements in guids2 reached state 'state'.
414
415             :param guids1: List of guids of RMs subjected to action
416             :type guids1: list
417
418             :param action: Action to register (either START or STOP)
419             :type action: ResourceAction
420
421             :param guids2: List of guids of RMs to we waited for
422             :type guids2: list
423
424             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
425             :type state: ResourceState
426
427             :param time: Time to wait after guids2 has reached status 
428             :type time: string
429
430         """
431         if isinstance(guids1, int):
432             guids1 = [guids1]
433         if isinstance(guids2, int):
434             guids2 = [guids2]
435
436         for guid1 in guids1:
437             rm = self.get_resource(guid1)
438             rm.register_condition(action, guids2, state, time)
439
440     def enable_trace(self, guid, name):
441         """ Enable trace
442
443         :param name: Name of the trace
444         :type name: str
445         """
446         rm = self.get_resource(guid)
447         rm.enable_trace(name)
448
449     def trace_enabled(self, guid, name):
450         """ Returns True if trace is enabled
451
452         :param name: Name of the trace
453         :type name: str
454         """
455         rm = self.get_resource(guid)
456         return rm.trace_enabled(name)
457
458     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
459         """ Get information on collected trace
460
461         :param name: Name of the trace
462         :type name: str
463
464         :param attr: Can be one of:
465                          - TraceAttr.ALL (complete trace content), 
466                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
467                          - TraceAttr.PATH (full path to the trace file),
468                          - TraceAttr.SIZE (size of trace file). 
469         :type attr: str
470
471         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
472         :type name: int
473
474         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
475         :type name: int
476
477         :rtype: str
478         """
479         rm = self.get_resource(guid)
480         return rm.trace(name, attr, block, offset)
481
482     def discover(self, guid):
483         """ Discover a specific RM defined by its 'guid'
484
485             :param guid: Guid of the RM
486             :type guid: int
487
488         """
489         rm = self.get_resource(guid)
490         return rm.discover()
491
492     def provision(self, guid):
493         """ Provision a specific RM defined by its 'guid'
494
495             :param guid: Guid of the RM
496             :type guid: int
497
498         """
499         rm = self.get_resource(guid)
500         return rm.provision()
501
502     def get(self, guid, name):
503         """ Get a specific attribute 'name' from the RM 'guid'
504
505             :param guid: Guid of the RM
506             :type guid: int
507
508             :param name: attribute's name
509             :type name: str
510
511         """
512         rm = self.get_resource(guid)
513         return rm.get(name)
514
515     def set(self, guid, name, value):
516         """ Set a specific attribute 'name' from the RM 'guid' 
517             with the value 'value' 
518
519             :param guid: Guid of the RM
520             :type guid: int
521
522             :param name: attribute's name
523             :type name: str
524
525             :param value: attribute's value
526
527         """
528         rm = self.get_resource(guid)
529         return rm.set(name, value)
530
531     def state(self, guid, hr = False):
532         """ Returns the state of a resource
533
534             :param guid: Resource guid
535             :type guid: integer
536
537             :param hr: Human readable. Forces return of a 
538                 status string instead of a number 
539             :type hr: boolean
540
541         """
542         rm = self.get_resource(guid)
543         state = rm.state
544
545         if hr:
546             return ResourceState2str.get(state)
547
548         return state
549
550     def stop(self, guid):
551         """ Stop a specific RM defined by its 'guid'
552
553             :param guid: Guid of the RM
554             :type guid: int
555
556         """
557         rm = self.get_resource(guid)
558         return rm.stop()
559
560     def start(self, guid):
561         """ Start a specific RM defined by its 'guid'
562
563             :param guid: Guid of the RM
564             :type guid: int
565
566         """
567         rm = self.get_resource(guid)
568         return rm.start()
569
570     def set_with_conditions(self, name, value, guids1, guids2, state,
571             time = None):
572         """ Set value 'value' on attribute with name 'name' on all RMs of
573             guids1 when 'time' has elapsed since all elements in guids2 
574             have reached state 'state'.
575
576             :param name: Name of attribute to set in RM
577             :type name: string
578
579             :param value: Value of attribute to set in RM
580             :type name: string
581
582             :param guids1: List of guids of RMs subjected to action
583             :type guids1: list
584
585             :param action: Action to register (either START or STOP)
586             :type action: ResourceAction
587
588             :param guids2: List of guids of RMs to we waited for
589             :type guids2: list
590
591             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
592             :type state: ResourceState
593
594             :param time: Time to wait after guids2 has reached status 
595             :type time: string
596
597         """
598         if isinstance(guids1, int):
599             guids1 = [guids1]
600         if isinstance(guids2, int):
601             guids2 = [guids2]
602
603         for guid1 in guids1:
604             rm = self.get_resource(guid)
605             rm.set_with_conditions(name, value, guids2, state, time)
606
607     def deploy(self, guids = None, wait_all_ready = True, group = None):
608         """ Deploy all resource manager in guids list
609
610         :param guids: List of guids of RMs to deploy
611         :type guids: list
612
613         :param wait_all_ready: Wait until all RMs are ready in
614             order to start the RMs
615         :type guid: int
616
617         :param group: Id of deployment group in which to deploy RMs
618         :type group: int
619
620         """
621         self.logger.debug(" ------- DEPLOY START ------ ")
622
623         if not guids:
624             # If no guids list was passed, all 'NEW' RMs will be deployed
625             guids = []
626             for guid in self.resources:
627                 if self.state(guid) == ResourceState.NEW:
628                     guids.append(guid)
629                 
630         if isinstance(guids, int):
631             guids = [guids]
632
633         # Create deployment group
634         # New guids can be added to a same deployment group later on
635         new_group = False
636         if not group:
637             new_group = True
638             group = self._group_id_generator.next()
639
640         if group not in self._groups:
641             self._groups[group] = []
642
643         self._groups[group].extend(guids)
644
645         def wait_all_and_start(group):
646             # Function that checks if all resources are READY
647             # before scheduling a start_with_conditions for each RM
648             reschedule = False
649             
650             # Get all guids in group
651             guids = self._groups[group]
652
653             for guid in guids:
654                 if self.state(guid) < ResourceState.READY:
655                     reschedule = True
656                     break
657
658             if reschedule:
659                 callback = functools.partial(wait_all_and_start, group)
660                 self.schedule("1s", callback)
661             else:
662                 # If all resources are ready, we schedule the start
663                 for guid in guids:
664                     rm = self.get_resource(guid)
665                     self.schedule("0s", rm.start_with_conditions)
666
667         if wait_all_ready and new_group:
668             # Schedule a function to check that all resources are
669             # READY, and only then schedule the start.
670             # This aims at reducing the number of tasks looping in the 
671             # scheduler. 
672             # Instead of having many start tasks, we will have only one for 
673             # the whole group.
674             callback = functools.partial(wait_all_and_start, group)
675             self.schedule("0s", callback)
676
677         for guid in guids:
678             rm = self.get_resource(guid)
679             rm.deployment_group = group
680             self.schedule("0s", rm.deploy_with_conditions)
681
682             if not wait_all_ready:
683                 self.schedule("0s", rm.start_with_conditions)
684
685             if rm.conditions.get(ResourceAction.STOP):
686                 # Only if the RM has STOP conditions we
687                 # schedule a stop. Otherwise the RM will stop immediately
688                 self.schedule("0s", rm.stop_with_conditions)
689
690     def release(self, guids = None):
691         """ Release al RMs on the guids list or 
692         all the resources if no list is specified
693
694             :param guids: List of RM guids
695             :type guids: list
696
697         """
698         if not guids:
699             guids = self.resources
700
701         # Remove all pending tasks from the scheduler queue
702         for tid in list(self._scheduler.pending):
703             self._scheduler.remove(tid)
704
705         self._runner.empty()
706
707         for guid in guids:
708             rm = self.get_resource(guid)
709             self.schedule("0s", rm.release)
710
711         self.wait_released(guids)
712         
713     def shutdown(self):
714         """ Shutdown the Experiment Controller. 
715         Releases all the resources and stops task processing thread
716
717         """
718         # If there was a major failure we can't exit gracefully
719         if self._state == ECState.FAILED:
720             raise RuntimeError("EC failure. Can not exit gracefully")
721
722         self.release()
723
724         # Mark the EC state as TERMINATED
725         self._state = ECState.TERMINATED
726
727         # Stop processing thread
728         self._stop = True
729
730         # Notify condition to wake up the processing thread
731         self._notify()
732         
733         if self._thread.is_alive():
734            self._thread.join()
735
736     def schedule(self, date, callback, track = False):
737         """ Schedule a callback to be executed at time date.
738
739             :param date: string containing execution time for the task.
740                     It can be expressed as an absolute time, using
741                     timestamp format, or as a relative time matching
742                     ^\d+.\d+(h|m|s|ms|us)$
743
744             :param callback: code to be executed for the task. Must be a
745                         Python function, and receives args and kwargs
746                         as arguments.
747
748             :param track: if set to True, the task will be retrivable with
749                     the get_task() method
750
751             :return : The Id of the task
752         """
753         timestamp = stabsformat(date)
754         task = Task(timestamp, callback)
755         task = self._scheduler.schedule(task)
756
757         if track:
758             self._tasks[task.id] = task
759
760         # Notify condition to wake up the processing thread
761         self._notify()
762
763         return task.id
764      
765     def _process(self):
766         """ Process scheduled tasks.
767
768         .. note::
769
770         The _process method is executed in an independent thread held by the 
771         ExperimentController for as long as the experiment is running.
772         
773         Tasks are scheduled by invoking the schedule method with a target callback. 
774         The schedule method is given a execution time which controls the
775         order in which tasks are processed. 
776
777         Tasks are processed in parallel using multithreading. 
778         The environmental variable NEPI_NTHREADS can be used to control
779         the number of threads used to process tasks. The default value is 50.
780
781         Exception handling:
782
783         To execute tasks in parallel, an ParallelRunner (PR) object, holding
784         a pool of threads (workers), is used.
785         For each available thread in the PR, the next task popped from 
786         the scheduler queue is 'put' in the PR.
787         Upon receiving a task to execute, each PR worker (thread) invokes the 
788         _execute method of the EC, passing the task as argument. 
789         This method, calls task.callback inside a try/except block. If an 
790         exception is raised by the tasks.callback, it will be trapped by the 
791         try block, logged to standard error (usually the console), and the EC 
792         state will be set to ECState.FAILED.
793         The invocation of _notify immediately after, forces the processing
794         loop in the _process method, to wake up if it was blocked waiting for new 
795         tasks to arrived, and to check the EC state.
796         As the EC is in FAILED state, the processing loop exits and the 
797         'finally' block is invoked. In the 'finally' block, the 'sync' method
798         of the PR is invoked, which forces the PR to raise any unchecked errors
799         that might have been raised by the workers.
800
801         """
802
803         self._runner.start()
804
805         while not self._stop:
806             try:
807                 self._cond.acquire()
808
809                 task = self._scheduler.next()
810                 
811                 if not task:
812                     # No task to execute. Wait for a new task to be scheduled.
813                     self._cond.wait()
814                 else:
815                     # The task timestamp is in the future. Wait for timeout 
816                     # or until another task is scheduled.
817                     now = tnow()
818                     if now < task.timestamp:
819                         # Calculate timeout in seconds
820                         timeout = tdiffsec(task.timestamp, now)
821
822                         # Re-schedule task with the same timestamp
823                         self._scheduler.schedule(task)
824                         
825                         task = None
826
827                         # Wait timeout or until a new task awakes the condition
828                         self._cond.wait(timeout)
829                
830                 self._cond.release()
831
832                 if task:
833                     # Process tasks in parallel
834                     self._runner.put(self._execute, task)
835             except: 
836                 import traceback
837                 err = traceback.format_exc()
838                 self.logger.error("Error while processing tasks in the EC: %s" % err)
839
840                 # Set the EC to FAILED state 
841                 self._state = ECState.FAILED
842             
843                 # Set the FailureManager failure level to EC failure
844                 self._fm.set_ec_failure()
845
846         self.logger.debug("Exiting the task processing loop ... ")
847         
848         self._runner.sync()
849         self._runner.destroy()
850
851     def _execute(self, task):
852         """ Executes a single task. 
853
854             :param task: Object containing the callback to execute
855             :type task: Task
856
857         .. note::
858
859         If the invokation of the task callback raises an
860         exception, the processing thread of the ExperimentController
861         will be stopped and the experiment will be aborted.
862
863         """
864         # Invoke callback
865         task.status = TaskStatus.DONE
866
867         try:
868             task.result = task.callback()
869         except:
870             import traceback
871             err = traceback.format_exc()
872             task.result = err
873             task.status = TaskStatus.ERROR
874             
875             self.logger.error("Error occurred while executing task: %s" % err)
876
877     def _notify(self):
878         """ Awakes the processing thread in case it is blocked waiting
879         for a new task to be scheduled.
880         """
881         self._cond.acquire()
882         self._cond.notify()
883         self._cond.release()
884