Modified FailureManager to abort only when critical resources fail
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import random
35 import sys
36 import time
37 import threading
38 import weakref
39
40 class FailureLevel(object):
41     """ Describes the system failure state
42     """
43     OK = 1
44     RM_FAILURE = 2
45     EC_FAILURE = 3
46
47 class FailureManager(object):
48     """ The FailureManager is responsible for handling errors,
49     and deciding whether an experiment should be aborted
50     """
51
52     def __init__(self, ec):
53         self._ec = weakref.ref(ec)
54         self._failure_level = FailureLevel.OK
55
56     @property
57     def ec(self):
58         """ Returns the Experiment Controller """
59         return self._ec()
60
61     @property
62     def abort(self):
63         if self._failure_level == FailureLevel.OK:
64             for guid in self.ec.resources:
65                 state = self.ec.state(guid)
66                 critical = self.ec.get(guid, "critical")
67
68                 if state == ResourceState.FAILED and critical:
69                     self._failure_level = FailureLevel.RM_FAILURE
70                     self.ec.logger.debug("RM critical failure occurred on guid %d." \
71                             " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
72                     break
73
74         return self._failure_level != FailureLevel.OK
75
76     def set_ec_failure(self):
77         self._failure_level = FailureLevel.EC_FAILURE
78
79
80 class ECState(object):
81     """ State of the Experiment Controller
82    
83     """
84     RUNNING = 1
85     FAILED = 2
86     TERMINATED = 3
87
88 class ExperimentController(object):
89     """
90     .. class:: Class Args :
91       
92         :param exp_id: Human readable identifier for the experiment scenario. 
93         :type exp_id: str
94
95     .. note::
96
97         An experiment, or scenario, is defined by a concrete set of resources,
98         behavior, configuration and interconnection of those resources. 
99         The Experiment Description (ED) is a detailed representation of a
100         single experiment. It contains all the necessary information to 
101         allow repeating the experiment. NEPI allows to describe
102         experiments by registering components (resources), configuring them
103         and interconnecting them.
104         
105         A same experiment (scenario) can be executed many times, generating 
106         different results. We call an experiment execution (instance) a 'run'.
107
108         The ExperimentController (EC), is the entity responsible of
109         managing an experiment run. The same scenario can be 
110         recreated (and re-run) by instantiating an EC and recreating 
111         the same experiment description. 
112
113         In NEPI, an experiment is represented as a graph of interconnected
114         resources. A resource is a generic concept in the sense that any
115         component taking part of an experiment, whether physical of
116         virtual, is considered a resource. A resources could be a host, 
117         a virtual machine, an application, a simulator, a IP address.
118
119         A ResourceManager (RM), is the entity responsible for managing a 
120         single resource. ResourceManagers are specific to a resource
121         type (i.e. An RM to control a Linux application will not be
122         the same as the RM used to control a ns-3 simulation).
123         To support a new type of resource in NEPI, a new RM must be 
124         implemented. NEPI already provides a variety of
125         RMs to control basic resources, and new can be extended from
126         the existing ones.
127
128         Through the EC interface the user can create ResourceManagers (RMs),
129         configure them and interconnect them, to describe an experiment.
130         Describing an experiment through the EC does not run the experiment.
131         Only when the 'deploy()' method is invoked on the EC, the EC will take 
132         actions to transform the 'described' experiment into a 'running' experiment.
133
134         While the experiment is running, it is possible to continue to
135         create/configure/connect RMs, and to deploy them to involve new
136         resources in the experiment (this is known as 'interactive' deployment).
137         
138         An experiments in NEPI is identified by a string id, 
139         which is either given by the user, or automatically generated by NEPI.  
140         The purpose of this identifier is to separate files and results that 
141         belong to different experiment scenarios. 
142         However, since a same 'experiment' can be run many times, the experiment
143         id is not enough to identify an experiment instance (run).
144         For this reason, the ExperimentController has two identifier, the 
145         exp_id, which can be re-used in different ExperimentController,
146         and the run_id, which is unique to one ExperimentController instance, and
147         is automatically generated by NEPI.
148         
149     """
150
151     def __init__(self, exp_id = None): 
152         super(ExperimentController, self).__init__()
153         # Logging
154         self._logger = logging.getLogger("ExperimentController")
155
156         # Run identifier. It identifies a concrete execution instance (run) 
157         # of an experiment.
158         # Since a same experiment (same configuration) can be executed many 
159         # times, this run_id permits to separate result files generated on 
160         # different experiment executions
161         self._run_id = tsformat()
162
163         # Experiment identifier. Usually assigned by the user
164         # Identifies the experiment scenario (i.e. configuration, 
165         # resources used, etc)
166         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
167
168         # generator of globally unique ids
169         self._guid_generator = guid.GuidGenerator()
170         
171         # Resource managers
172         self._resources = dict()
173
174         # Scheduler. It a queue that holds tasks scheduled for
175         # execution, and yields the next task to be executed 
176         # ordered by execution and arrival time
177         self._scheduler = HeapScheduler()
178
179         # Tasks
180         self._tasks = dict()
181
182         # RM groups (for deployment) 
183         self._groups = dict()
184
185         # generator of globally unique id for groups
186         self._group_id_generator = guid.GuidGenerator()
187
188         # Flag to stop processing thread
189         self._stop = False
190     
191         # Entity in charge of managing system failures
192         self._fm = FailureManager(self)
193
194         # EC state
195         self._state = ECState.RUNNING
196
197         # The runner is a pool of threads used to parallelize 
198         # execution of tasks
199         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
200         self._runner = ParallelRun(maxthreads = nthreads)
201
202         # Event processing thread
203         self._cond = threading.Condition()
204         self._thread = threading.Thread(target = self._process)
205         self._thread.setDaemon(True)
206         self._thread.start()
207
208     @property
209     def logger(self):
210         """ Return the logger of the Experiment Controller
211
212         """
213         return self._logger
214
215     @property
216     def ecstate(self):
217         """ Return the state of the Experiment Controller
218
219         """
220         return self._state
221
222     @property
223     def exp_id(self):
224         """ Return the experiment id assigned by the user
225
226         """
227         return self._exp_id
228
229     @property
230     def run_id(self):
231         """ Return the experiment instance (run) identifier  
232
233         """
234         return self._run_id
235
236     @property
237     def abort(self):
238         return self._fm.abort
239
240     def wait_finished(self, guids):
241         """ Blocking method that wait until all RMs in the 'guid' list 
242             reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
243             RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
244
245         :param guids: List of guids
246         :type guids: list
247
248         """
249
250         def quit():
251             return self.abort
252
253         return self.wait(guids, state = ResourceState.STOPPED, 
254                 quit = quit)
255
256     def wait_started(self, guids):
257         """ Blocking method that wait until all RMs in the 'guid' list 
258             reach a state >= STARTED or until a System Failure occurs 
259             (e.g. Task Failure) 
260
261         :param guids: List of guids
262         :type guids: list
263         """
264
265         def quit():
266             return self.abort
267
268         return self.wait(guids, state = ResourceState.STARTED, 
269                 quit = quit)
270
271     def wait_released(self, guids):
272         """ Blocking method that wait until all RMs in the 'guid' list 
273             reach a state = RELEASED or until the EC fails
274
275         :param guids: List of guids
276         :type guids: list
277         """
278
279         def quit():
280             return self._state == ECState.FAILED
281
282         return self.wait(guids, state = ResourceState.RELEASED, 
283                 quit = quit)
284
285     def wait_deployed(self, guids):
286         """ Blocking method that wait until all RMs in the 'guid' list 
287             reach a state >= READY or until a System Failure occurs 
288             (e.g. Task Failure) 
289
290         :param guids: List of guids
291         :type guids: list
292         """
293
294         def quit():
295             return self.abort
296
297         return self.wait(guids, state = ResourceState.READY, 
298                 quit = quit)
299
300     def wait(self, guids, state, quit):
301         """ Blocking method that wait until all RMs in the 'guid' list 
302             reach a state >= 'state' or until quit yileds True
303            
304         :param guids: List of guids
305         :type guids: list
306         """
307         if isinstance(guids, int):
308             guids = [guids]
309
310         while True:
311             # If there are no more guids to wait for
312             # or the quit function returns True, exit the loop
313             if len(guids) == 0 or quit():
314                 break
315
316             # If a guid reached one of the target states, remove it from list
317             guid = guids[0]
318             rstate = self.state(guid)
319             
320             hrrstate = ResourceState2str.get(rstate)
321             hrstate = ResourceState2str.get(state)
322
323             if rstate >= state:
324                 guids.remove(guid)
325                 self.logger.debug(" guid %d DONE - state is %s, required is >= %s " % (
326                     guid, hrrstate, hrstate))
327             else:
328                 # Debug...
329                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
330                     guid, hrrstate, hrstate))
331                 time.sleep(0.5)
332   
333     def get_task(self, tid):
334         """ Get a specific task
335
336         :param tid: Id of the task
337         :type tid: int
338         :rtype: Task
339         """
340         return self._tasks.get(tid)
341
342     def get_resource(self, guid):
343         """ Get a specific Resource Manager
344
345         :param guid: Id of the task
346         :type guid: int
347         :rtype: ResourceManager
348         """
349         return self._resources.get(guid)
350
351     @property
352     def resources(self):
353         """ Returns the list of all the Resource Manager Id
354
355         :rtype: set
356
357         """
358         return self._resources.keys()
359
360     def register_resource(self, rtype, guid = None):
361         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
362         for the RM of type 'rtype' and add it to the list of Resources.
363
364         :param rtype: Type of the RM
365         :type rtype: str
366         :return: Id of the RM
367         :rtype: int
368         """
369         # Get next available guid
370         guid = self._guid_generator.next(guid)
371         
372         # Instantiate RM
373         rm = ResourceFactory.create(rtype, self, guid)
374
375         # Store RM
376         self._resources[guid] = rm
377
378         return guid
379
380     def get_attributes(self, guid):
381         """ Return all the attibutes of a specific RM
382
383         :param guid: Guid of the RM
384         :type guid: int
385         :return: List of attributes
386         :rtype: list
387         """
388         rm = self.get_resource(guid)
389         return rm.get_attributes()
390
391     def register_connection(self, guid1, guid2):
392         """ Registers a guid1 with a guid2. 
393             The declaration order is not important
394
395             :param guid1: First guid to connect
396             :type guid1: ResourceManager
397
398             :param guid2: Second guid to connect
399             :type guid: ResourceManager
400         """
401         rm1 = self.get_resource(guid1)
402         rm2 = self.get_resource(guid2)
403
404         rm1.register_connection(guid2)
405         rm2.register_connection(guid1)
406
407     def register_condition(self, guids1, action, guids2, state,
408             time = None):
409         """ Registers an action START or STOP for all RM on guids1 to occur 
410             time 'time' after all elements in guids2 reached state 'state'.
411
412             :param guids1: List of guids of RMs subjected to action
413             :type guids1: list
414
415             :param action: Action to register (either START or STOP)
416             :type action: ResourceAction
417
418             :param guids2: List of guids of RMs to we waited for
419             :type guids2: list
420
421             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
422             :type state: ResourceState
423
424             :param time: Time to wait after guids2 has reached status 
425             :type time: string
426
427         """
428         if isinstance(guids1, int):
429             guids1 = [guids1]
430         if isinstance(guids2, int):
431             guids2 = [guids2]
432
433         for guid1 in guids1:
434             rm = self.get_resource(guid1)
435             rm.register_condition(action, guids2, state, time)
436
437     def enable_trace(self, guid, name):
438         """ Enable trace
439
440         :param name: Name of the trace
441         :type name: str
442         """
443         rm = self.get_resource(guid)
444         rm.enable_trace(name)
445
446     def trace_enabled(self, guid, name):
447         """ Returns True if trace is enabled
448
449         :param name: Name of the trace
450         :type name: str
451         """
452         rm = self.get_resource(guid)
453         return rm.trace_enabled(name)
454
455     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
456         """ Get information on collected trace
457
458         :param name: Name of the trace
459         :type name: str
460
461         :param attr: Can be one of:
462                          - TraceAttr.ALL (complete trace content), 
463                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
464                          - TraceAttr.PATH (full path to the trace file),
465                          - TraceAttr.SIZE (size of trace file). 
466         :type attr: str
467
468         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
469         :type name: int
470
471         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
472         :type name: int
473
474         :rtype: str
475         """
476         rm = self.get_resource(guid)
477         return rm.trace(name, attr, block, offset)
478
479     def discover(self, guid):
480         """ Discover a specific RM defined by its 'guid'
481
482             :param guid: Guid of the RM
483             :type guid: int
484
485         """
486         rm = self.get_resource(guid)
487         return rm.discover()
488
489     def provision(self, guid):
490         """ Provision a specific RM defined by its 'guid'
491
492             :param guid: Guid of the RM
493             :type guid: int
494
495         """
496         rm = self.get_resource(guid)
497         return rm.provision()
498
499     def get(self, guid, name):
500         """ Get a specific attribute 'name' from the RM 'guid'
501
502             :param guid: Guid of the RM
503             :type guid: int
504
505             :param name: attribute's name
506             :type name: str
507
508         """
509         rm = self.get_resource(guid)
510         return rm.get(name)
511
512     def set(self, guid, name, value):
513         """ Set a specific attribute 'name' from the RM 'guid' 
514             with the value 'value' 
515
516             :param guid: Guid of the RM
517             :type guid: int
518
519             :param name: attribute's name
520             :type name: str
521
522             :param value: attribute's value
523
524         """
525         rm = self.get_resource(guid)
526         return rm.set(name, value)
527
528     def state(self, guid, hr = False):
529         """ Returns the state of a resource
530
531             :param guid: Resource guid
532             :type guid: integer
533
534             :param hr: Human readable. Forces return of a 
535                 status string instead of a number 
536             :type hr: boolean
537
538         """
539         rm = self.get_resource(guid)
540         state = rm.state
541
542         if hr:
543             return ResourceState2str.get(state)
544
545         return state
546
547     def stop(self, guid):
548         """ Stop a specific RM defined by its 'guid'
549
550             :param guid: Guid of the RM
551             :type guid: int
552
553         """
554         rm = self.get_resource(guid)
555         return rm.stop()
556
557     def start(self, guid):
558         """ Start a specific RM defined by its 'guid'
559
560             :param guid: Guid of the RM
561             :type guid: int
562
563         """
564         rm = self.get_resource(guid)
565         return rm.start()
566
567     def set_with_conditions(self, name, value, guids1, guids2, state,
568             time = None):
569         """ Set value 'value' on attribute with name 'name' on all RMs of
570             guids1 when 'time' has elapsed since all elements in guids2 
571             have reached state 'state'.
572
573             :param name: Name of attribute to set in RM
574             :type name: string
575
576             :param value: Value of attribute to set in RM
577             :type name: string
578
579             :param guids1: List of guids of RMs subjected to action
580             :type guids1: list
581
582             :param action: Action to register (either START or STOP)
583             :type action: ResourceAction
584
585             :param guids2: List of guids of RMs to we waited for
586             :type guids2: list
587
588             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
589             :type state: ResourceState
590
591             :param time: Time to wait after guids2 has reached status 
592             :type time: string
593
594         """
595         if isinstance(guids1, int):
596             guids1 = [guids1]
597         if isinstance(guids2, int):
598             guids2 = [guids2]
599
600         for guid1 in guids1:
601             rm = self.get_resource(guid)
602             rm.set_with_conditions(name, value, guids2, state, time)
603
604     def deploy(self, guids = None, wait_all_ready = True, group = None):
605         """ Deploy all resource manager in guids list
606
607         :param guids: List of guids of RMs to deploy
608         :type guids: list
609
610         :param wait_all_ready: Wait until all RMs are ready in
611             order to start the RMs
612         :type guid: int
613
614         :param group: Id of deployment group in which to deploy RMs
615         :type group: int
616
617         """
618         self.logger.debug(" ------- DEPLOY START ------ ")
619
620         if not guids:
621             # If no guids list was passed, all 'NEW' RMs will be deployed
622             guids = []
623             for guid in self.resources:
624                 if self.state(guid) == ResourceState.NEW:
625                     guids.append(guid)
626                 
627         if isinstance(guids, int):
628             guids = [guids]
629
630         # Create deployment group
631         # New guids can be added to a same deployment group later on
632         new_group = False
633         if not group:
634             new_group = True
635             group = self._group_id_generator.next()
636
637         if group not in self._groups:
638             self._groups[group] = []
639
640         self._groups[group].extend(guids)
641
642         def wait_all_and_start(group):
643             # Function that checks if all resources are READY
644             # before scheduling a start_with_conditions for each RM
645             reschedule = False
646             
647             # Get all guids in group
648             guids = self._groups[group]
649
650             for guid in guids:
651                 if self.state(guid) < ResourceState.READY:
652                     reschedule = True
653                     break
654
655             if reschedule:
656                 callback = functools.partial(wait_all_and_start, group)
657                 self.schedule("1s", callback)
658             else:
659                 # If all resources are ready, we schedule the start
660                 for guid in guids:
661                     rm = self.get_resource(guid)
662                     self.schedule("0s", rm.start_with_conditions)
663
664         if wait_all_ready and new_group:
665             # Schedule a function to check that all resources are
666             # READY, and only then schedule the start.
667             # This aims at reducing the number of tasks looping in the 
668             # scheduler. 
669             # Instead of having many start tasks, we will have only one for 
670             # the whole group.
671             callback = functools.partial(wait_all_and_start, group)
672             self.schedule("0s", callback)
673
674         for guid in guids:
675             rm = self.get_resource(guid)
676             rm.deployment_group = group
677             self.schedule("0s", rm.deploy_with_conditions)
678
679             if not wait_all_ready:
680                 self.schedule("0s", rm.start_with_conditions)
681
682             if rm.conditions.get(ResourceAction.STOP):
683                 # Only if the RM has STOP conditions we
684                 # schedule a stop. Otherwise the RM will stop immediately
685                 self.schedule("0s", rm.stop_with_conditions)
686
687     def release(self, guids = None):
688         """ Release al RMs on the guids list or 
689         all the resources if no list is specified
690
691             :param guids: List of RM guids
692             :type guids: list
693
694         """
695         if not guids:
696             guids = self.resources
697
698         # Remove all pending tasks from the scheduler queue
699         for tid in list(self._scheduler.pending):
700             self._scheduler.remove(tid)
701
702         self._runner.empty()
703
704         for guid in guids:
705             rm = self.get_resource(guid)
706             self.schedule("0s", rm.release)
707
708         self.wait_released(guids)
709         
710     def shutdown(self):
711         """ Shutdown the Experiment Controller. 
712         Releases all the resources and stops task processing thread
713
714         """
715         # If there was a major failure we can't exit gracefully
716         if self._state == ECState.FAILED:
717             raise RuntimeError("EC failure. Can not exit gracefully")
718
719         self.release()
720
721         # Mark the EC state as TERMINATED
722         self._state = ECState.TERMINATED
723
724         # Stop processing thread
725         self._stop = True
726
727         # Notify condition to wake up the processing thread
728         self._notify()
729         
730         if self._thread.is_alive():
731            self._thread.join()
732
733     def schedule(self, date, callback, track = False):
734         """ Schedule a callback to be executed at time date.
735
736             :param date: string containing execution time for the task.
737                     It can be expressed as an absolute time, using
738                     timestamp format, or as a relative time matching
739                     ^\d+.\d+(h|m|s|ms|us)$
740
741             :param callback: code to be executed for the task. Must be a
742                         Python function, and receives args and kwargs
743                         as arguments.
744
745             :param track: if set to True, the task will be retrivable with
746                     the get_task() method
747
748             :return : The Id of the task
749         """
750         timestamp = stabsformat(date)
751         task = Task(timestamp, callback)
752         task = self._scheduler.schedule(task)
753
754         if track:
755             self._tasks[task.id] = task
756
757         # Notify condition to wake up the processing thread
758         self._notify()
759
760         return task.id
761      
762     def _process(self):
763         """ Process scheduled tasks.
764
765         .. note::
766
767         The _process method is executed in an independent thread held by the 
768         ExperimentController for as long as the experiment is running.
769         
770         Tasks are scheduled by invoking the schedule method with a target callback. 
771         The schedule method is given a execution time which controls the
772         order in which tasks are processed. 
773
774         Tasks are processed in parallel using multithreading. 
775         The environmental variable NEPI_NTHREADS can be used to control
776         the number of threads used to process tasks. The default value is 50.
777
778         Exception handling:
779
780         To execute tasks in parallel, an ParallelRunner (PR) object, holding
781         a pool of threads (workers), is used.
782         For each available thread in the PR, the next task popped from 
783         the scheduler queue is 'put' in the PR.
784         Upon receiving a task to execute, each PR worker (thread) invokes the 
785         _execute method of the EC, passing the task as argument. 
786         This method, calls task.callback inside a try/except block. If an 
787         exception is raised by the tasks.callback, it will be trapped by the 
788         try block, logged to standard error (usually the console), and the EC 
789         state will be set to ECState.FAILED.
790         The invocation of _notify immediately after, forces the processing
791         loop in the _process method, to wake up if it was blocked waiting for new 
792         tasks to arrived, and to check the EC state.
793         As the EC is in FAILED state, the processing loop exits and the 
794         'finally' block is invoked. In the 'finally' block, the 'sync' method
795         of the PR is invoked, which forces the PR to raise any unchecked errors
796         that might have been raised by the workers.
797
798         """
799
800         self._runner.start()
801
802         while not self._stop:
803             try:
804                 self._cond.acquire()
805
806                 task = self._scheduler.next()
807                 
808                 if not task:
809                     # No task to execute. Wait for a new task to be scheduled.
810                     self._cond.wait()
811                 else:
812                     # The task timestamp is in the future. Wait for timeout 
813                     # or until another task is scheduled.
814                     now = tnow()
815                     if now < task.timestamp:
816                         # Calculate timeout in seconds
817                         timeout = tdiffsec(task.timestamp, now)
818
819                         # Re-schedule task with the same timestamp
820                         self._scheduler.schedule(task)
821                         
822                         task = None
823
824                         # Wait timeout or until a new task awakes the condition
825                         self._cond.wait(timeout)
826                
827                 self._cond.release()
828
829                 if task:
830                     # Process tasks in parallel
831                     self._runner.put(self._execute, task)
832             except: 
833                 import traceback
834                 err = traceback.format_exc()
835                 self.logger.error("Error while processing tasks in the EC: %s" % err)
836
837                 # Set the EC to FAILED state 
838                 self._state = ECState.FAILED
839             
840                 # Set the FailureManager failure level to EC failure
841                 self._fm.set_ec_failure()
842
843         self.logger.debug("Exiting the task processing loop ... ")
844         
845         self._runner.sync()
846         self._runner.destroy()
847
848     def _execute(self, task):
849         """ Executes a single task. 
850
851             :param task: Object containing the callback to execute
852             :type task: Task
853
854         .. note::
855
856         If the invokation of the task callback raises an
857         exception, the processing thread of the ExperimentController
858         will be stopped and the experiment will be aborted.
859
860         """
861         # Invoke callback
862         task.status = TaskStatus.DONE
863
864         try:
865             task.result = task.callback()
866         except:
867             import traceback
868             err = traceback.format_exc()
869             task.result = err
870             task.status = TaskStatus.ERROR
871             
872             self.logger.error("Error occurred while executing task: %s" % err)
873
874     def _notify(self):
875         """ Awakes the processing thread in case it is blocked waiting
876         for a new task to be scheduled.
877         """
878         self._cond.acquire()
879         self._cond.notify()
880         self._cond.release()
881