Removing ResourceState.FINISHED beacuse it is redundant with STOPPED
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import random
35 import sys
36 import time
37 import threading
38 import weakref
39
40 class FailureLevel(object):
41     """ Describes the system failure state
42     """
43     OK = 1
44     RM_FAILURE = 2
45     EC_FAILURE = 3
46
47 class FailureManager(object):
48     """ The FailureManager is responsible for handling errors,
49     and deciding whether an experiment should be aborted
50     """
51
52     def __init__(self, ec):
53         self._ec = weakref.ref(ec)
54         self._failure_level = FailureLevel.OK
55
56     @property
57     def ec(self):
58         """ Returns the Experiment Controller """
59         return self._ec()
60
61     @property
62     def abort(self):
63         if self._failure_level == FailureLevel.OK:
64             for guid in self.ec.resources:
65                 state = self.ec.state(guid)
66                 critical = self.ec.get(guid, "critical")
67                 if state == ResourceState.FAILED and critical:
68                     self._failure_level = FailureLevel.RM_FAILURE
69                     self.ec.logger.debug("RM critical failure occurred on guid %d." \
70                             " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
71                     break
72
73         return self._failure_level != FailureLevel.OK
74
75     def set_ec_failure(self):
76         self._failure_level = FailureLevel.EC_FAILURE
77
78
79 class ECState(object):
80     """ State of the Experiment Controller
81    
82     """
83     RUNNING = 1
84     FAILED = 2
85     TERMINATED = 3
86
87 class ExperimentController(object):
88     """
89     .. class:: Class Args :
90       
91         :param exp_id: Human readable identifier for the experiment scenario. 
92         :type exp_id: str
93
94     .. note::
95
96         An experiment, or scenario, is defined by a concrete set of resources,
97         behavior, configuration and interconnection of those resources. 
98         The Experiment Description (ED) is a detailed representation of a
99         single experiment. It contains all the necessary information to 
100         allow repeating the experiment. NEPI allows to describe
101         experiments by registering components (resources), configuring them
102         and interconnecting them.
103         
104         A same experiment (scenario) can be executed many times, generating 
105         different results. We call an experiment execution (instance) a 'run'.
106
107         The ExperimentController (EC), is the entity responsible of
108         managing an experiment run. The same scenario can be 
109         recreated (and re-run) by instantiating an EC and recreating 
110         the same experiment description. 
111
112         In NEPI, an experiment is represented as a graph of interconnected
113         resources. A resource is a generic concept in the sense that any
114         component taking part of an experiment, whether physical of
115         virtual, is considered a resource. A resources could be a host, 
116         a virtual machine, an application, a simulator, a IP address.
117
118         A ResourceManager (RM), is the entity responsible for managing a 
119         single resource. ResourceManagers are specific to a resource
120         type (i.e. An RM to control a Linux application will not be
121         the same as the RM used to control a ns-3 simulation).
122         To support a new type of resource in NEPI, a new RM must be 
123         implemented. NEPI already provides a variety of
124         RMs to control basic resources, and new can be extended from
125         the existing ones.
126
127         Through the EC interface the user can create ResourceManagers (RMs),
128         configure them and interconnect them, to describe an experiment.
129         Describing an experiment through the EC does not run the experiment.
130         Only when the 'deploy()' method is invoked on the EC, the EC will take 
131         actions to transform the 'described' experiment into a 'running' experiment.
132
133         While the experiment is running, it is possible to continue to
134         create/configure/connect RMs, and to deploy them to involve new
135         resources in the experiment (this is known as 'interactive' deployment).
136         
137         An experiments in NEPI is identified by a string id, 
138         which is either given by the user, or automatically generated by NEPI.  
139         The purpose of this identifier is to separate files and results that 
140         belong to different experiment scenarios. 
141         However, since a same 'experiment' can be run many times, the experiment
142         id is not enough to identify an experiment instance (run).
143         For this reason, the ExperimentController has two identifier, the 
144         exp_id, which can be re-used in different ExperimentController,
145         and the run_id, which is unique to one ExperimentController instance, and
146         is automatically generated by NEPI.
147         
148     """
149
150     def __init__(self, exp_id = None): 
151         super(ExperimentController, self).__init__()
152         # Logging
153         self._logger = logging.getLogger("ExperimentController")
154
155         # Run identifier. It identifies a concrete execution instance (run) 
156         # of an experiment.
157         # Since a same experiment (same configuration) can be executed many 
158         # times, this run_id permits to separate result files generated on 
159         # different experiment executions
160         self._run_id = tsformat()
161
162         # Experiment identifier. Usually assigned by the user
163         # Identifies the experiment scenario (i.e. configuration, 
164         # resources used, etc)
165         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
166
167         # generator of globally unique ids
168         self._guid_generator = guid.GuidGenerator()
169         
170         # Resource managers
171         self._resources = dict()
172
173         # Scheduler. It a queue that holds tasks scheduled for
174         # execution, and yields the next task to be executed 
175         # ordered by execution and arrival time
176         self._scheduler = HeapScheduler()
177
178         # Tasks
179         self._tasks = dict()
180
181         # RM groups (for deployment) 
182         self._groups = dict()
183
184         # generator of globally unique id for groups
185         self._group_id_generator = guid.GuidGenerator()
186
187         # Flag to stop processing thread
188         self._stop = False
189     
190         # Entity in charge of managing system failures
191         self._fm = FailureManager(self)
192
193         # EC state
194         self._state = ECState.RUNNING
195
196         # The runner is a pool of threads used to parallelize 
197         # execution of tasks
198         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
199         self._runner = ParallelRun(maxthreads = nthreads)
200
201         # Event processing thread
202         self._cond = threading.Condition()
203         self._thread = threading.Thread(target = self._process)
204         self._thread.setDaemon(True)
205         self._thread.start()
206
207     @property
208     def logger(self):
209         """ Return the logger of the Experiment Controller
210
211         """
212         return self._logger
213
214     @property
215     def ecstate(self):
216         """ Return the state of the Experiment Controller
217
218         """
219         return self._state
220
221     @property
222     def exp_id(self):
223         """ Return the experiment id assigned by the user
224
225         """
226         return self._exp_id
227
228     @property
229     def run_id(self):
230         """ Return the experiment instance (run) identifier  
231
232         """
233         return self._run_id
234
235     @property
236     def abort(self):
237         return self._fm.abort
238
239     def wait_finished(self, guids):
240         """ Blocking method that wait until all RMs in the 'guid' list 
241             reach a state >= STOPPED (i.e. STOPPED, FAILED or 
242             RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
243
244         :param guids: List of guids
245         :type guids: list
246
247         """
248
249         def quit():
250             return self.abort
251
252         return self.wait(guids, state = ResourceState.STOPPED, 
253                 quit = quit)
254
255     def wait_started(self, guids):
256         """ Blocking method that wait until all RMs in the 'guid' list 
257             reach a state >= STARTED or until a System Failure occurs 
258             (e.g. Task Failure) 
259
260         :param guids: List of guids
261         :type guids: list
262         """
263
264         def quit():
265             return self.abort
266
267         return self.wait(guids, state = ResourceState.STARTED, 
268                 quit = quit)
269
270     def wait_released(self, guids):
271         """ Blocking method that wait until all RMs in the 'guid' list 
272             reach a state = RELEASED or until the EC fails
273
274         :param guids: List of guids
275         :type guids: list
276         """
277
278         def quit():
279             return self._state == ECState.FAILED
280
281         return self.wait(guids, state = ResourceState.RELEASED, 
282                 quit = quit)
283
284     def wait_deployed(self, guids):
285         """ Blocking method that wait until all RMs in the 'guid' list 
286             reach a state >= READY or until a System Failure occurs 
287             (e.g. Task Failure) 
288
289         :param guids: List of guids
290         :type guids: list
291         """
292
293         def quit():
294             return self.abort
295
296         return self.wait(guids, state = ResourceState.READY, 
297                 quit = quit)
298
299     def wait(self, guids, state, quit):
300         """ Blocking method that wait until all RMs in the 'guid' list 
301             reach a state >= 'state' or until quit yileds True
302            
303         :param guids: List of guids
304         :type guids: list
305         """
306         if isinstance(guids, int):
307             guids = [guids]
308
309         # Make a copy to avoid modifying the original guids list
310         guids = list(guids)
311
312         while True:
313             # If there are no more guids to wait for
314             # or the quit function returns True, exit the loop
315             if len(guids) == 0 or quit():
316                 break
317
318             # If a guid reached one of the target states, remove it from list
319             guid = guids[0]
320             rstate = self.state(guid)
321             
322             hrrstate = ResourceState2str.get(rstate)
323             hrstate = ResourceState2str.get(state)
324
325             if rstate >= state:
326                 guids.remove(guid)
327                 rm = self.get_resource(guid)
328                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
329                     rm.get_rtype(), guid, hrrstate, hrstate))
330             else:
331                 # Debug...
332                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
333                     guid, hrrstate, hrstate))
334                 time.sleep(0.5)
335   
336     def get_task(self, tid):
337         """ Get a specific task
338
339         :param tid: Id of the task
340         :type tid: int
341         :rtype: Task
342         """
343         return self._tasks.get(tid)
344
345     def get_resource(self, guid):
346         """ Get a specific Resource Manager
347
348         :param guid: Id of the task
349         :type guid: int
350         :rtype: ResourceManager
351         """
352         return self._resources.get(guid)
353
354     @property
355     def resources(self):
356         """ Returns the list of all the Resource Manager Id
357
358         :rtype: set
359
360         """
361         return self._resources.keys()
362
363     def register_resource(self, rtype, guid = None):
364         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
365         for the RM of type 'rtype' and add it to the list of Resources.
366
367         :param rtype: Type of the RM
368         :type rtype: str
369         :return: Id of the RM
370         :rtype: int
371         """
372         # Get next available guid
373         guid = self._guid_generator.next(guid)
374         
375         # Instantiate RM
376         rm = ResourceFactory.create(rtype, self, guid)
377
378         # Store RM
379         self._resources[guid] = rm
380
381         return guid
382
383     def get_attributes(self, guid):
384         """ Return all the attibutes of a specific RM
385
386         :param guid: Guid of the RM
387         :type guid: int
388         :return: List of attributes
389         :rtype: list
390         """
391         rm = self.get_resource(guid)
392         return rm.get_attributes()
393
394     def register_connection(self, guid1, guid2):
395         """ Registers a guid1 with a guid2. 
396             The declaration order is not important
397
398             :param guid1: First guid to connect
399             :type guid1: ResourceManager
400
401             :param guid2: Second guid to connect
402             :type guid: ResourceManager
403         """
404         rm1 = self.get_resource(guid1)
405         rm2 = self.get_resource(guid2)
406
407         rm1.register_connection(guid2)
408         rm2.register_connection(guid1)
409
410     def register_condition(self, guids1, action, guids2, state,
411             time = None):
412         """ Registers an action START or STOP for all RM on guids1 to occur 
413             time 'time' after all elements in guids2 reached state 'state'.
414
415             :param guids1: List of guids of RMs subjected to action
416             :type guids1: list
417
418             :param action: Action to register (either START or STOP)
419             :type action: ResourceAction
420
421             :param guids2: List of guids of RMs to we waited for
422             :type guids2: list
423
424             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
425             :type state: ResourceState
426
427             :param time: Time to wait after guids2 has reached status 
428             :type time: string
429
430         """
431         if isinstance(guids1, int):
432             guids1 = [guids1]
433         if isinstance(guids2, int):
434             guids2 = [guids2]
435
436         for guid1 in guids1:
437             rm = self.get_resource(guid1)
438             rm.register_condition(action, guids2, state, time)
439
440     def enable_trace(self, guid, name):
441         """ Enable trace
442
443         :param name: Name of the trace
444         :type name: str
445         """
446         rm = self.get_resource(guid)
447         rm.enable_trace(name)
448
449     def trace_enabled(self, guid, name):
450         """ Returns True if trace is enabled
451
452         :param name: Name of the trace
453         :type name: str
454         """
455         rm = self.get_resource(guid)
456         return rm.trace_enabled(name)
457
458     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
459         """ Get information on collected trace
460
461         :param name: Name of the trace
462         :type name: str
463
464         :param attr: Can be one of:
465                          - TraceAttr.ALL (complete trace content), 
466                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
467                          - TraceAttr.PATH (full path to the trace file),
468                          - TraceAttr.SIZE (size of trace file). 
469         :type attr: str
470
471         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
472         :type name: int
473
474         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
475         :type name: int
476
477         :rtype: str
478         """
479         rm = self.get_resource(guid)
480         return rm.trace(name, attr, block, offset)
481
482     def discover(self, guid):
483         """ Discover a specific RM defined by its 'guid'
484
485             :param guid: Guid of the RM
486             :type guid: int
487
488         """
489         rm = self.get_resource(guid)
490         return rm.discover()
491
492     def provision(self, guid):
493         """ Provision a specific RM defined by its 'guid'
494
495             :param guid: Guid of the RM
496             :type guid: int
497
498         """
499         rm = self.get_resource(guid)
500         return rm.provision()
501
502     def get(self, guid, name):
503         """ Get a specific attribute 'name' from the RM 'guid'
504
505             :param guid: Guid of the RM
506             :type guid: int
507
508             :param name: attribute's name
509             :type name: str
510
511         """
512         rm = self.get_resource(guid)
513         return rm.get(name)
514
515     def set(self, guid, name, value):
516         """ Set a specific attribute 'name' from the RM 'guid' 
517             with the value 'value' 
518
519             :param guid: Guid of the RM
520             :type guid: int
521
522             :param name: attribute's name
523             :type name: str
524
525             :param value: attribute's value
526
527         """
528         rm = self.get_resource(guid)
529         return rm.set(name, value)
530
531     def state(self, guid, hr = False):
532         """ Returns the state of a resource
533
534             :param guid: Resource guid
535             :type guid: integer
536
537             :param hr: Human readable. Forces return of a 
538                 status string instead of a number 
539             :type hr: boolean
540
541         """
542         rm = self.get_resource(guid)
543         state = rm.state
544
545         if hr:
546             return ResourceState2str.get(state)
547
548         return state
549
550     def stop(self, guid):
551         """ Stop a specific RM defined by its 'guid'
552
553             :param guid: Guid of the RM
554             :type guid: int
555
556         """
557         rm = self.get_resource(guid)
558         return rm.stop()
559
560     def start(self, guid):
561         """ Start a specific RM defined by its 'guid'
562
563             :param guid: Guid of the RM
564             :type guid: int
565
566         """
567         rm = self.get_resource(guid)
568         return rm.start()
569
570     def set_with_conditions(self, name, value, guids1, guids2, state,
571             time = None):
572         """ Set value 'value' on attribute with name 'name' on all RMs of
573             guids1 when 'time' has elapsed since all elements in guids2 
574             have reached state 'state'.
575
576             :param name: Name of attribute to set in RM
577             :type name: string
578
579             :param value: Value of attribute to set in RM
580             :type name: string
581
582             :param guids1: List of guids of RMs subjected to action
583             :type guids1: list
584
585             :param action: Action to register (either START or STOP)
586             :type action: ResourceAction
587
588             :param guids2: List of guids of RMs to we waited for
589             :type guids2: list
590
591             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
592             :type state: ResourceState
593
594             :param time: Time to wait after guids2 has reached status 
595             :type time: string
596
597         """
598         if isinstance(guids1, int):
599             guids1 = [guids1]
600         if isinstance(guids2, int):
601             guids2 = [guids2]
602
603         for guid1 in guids1:
604             rm = self.get_resource(guid)
605             rm.set_with_conditions(name, value, guids2, state, time)
606
607     def deploy(self, guids = None, wait_all_ready = True, group = None):
608         """ Deploy all resource manager in guids list
609
610         :param guids: List of guids of RMs to deploy
611         :type guids: list
612
613         :param wait_all_ready: Wait until all RMs are ready in
614             order to start the RMs
615         :type guid: int
616
617         :param group: Id of deployment group in which to deploy RMs
618         :type group: int
619
620         """
621         self.logger.debug(" ------- DEPLOY START ------ ")
622
623         if not guids:
624             # If no guids list was passed, all 'NEW' RMs will be deployed
625             guids = []
626             for guid in self.resources:
627                 if self.state(guid) == ResourceState.NEW:
628                     guids.append(guid)
629                 
630         if isinstance(guids, int):
631             guids = [guids]
632
633         # Create deployment group
634         # New guids can be added to a same deployment group later on
635         new_group = False
636         if not group:
637             new_group = True
638             group = self._group_id_generator.next()
639
640         if group not in self._groups:
641             self._groups[group] = []
642
643         self._groups[group].extend(guids)
644
645         def wait_all_and_start(group):
646             # Function that checks if all resources are READY
647             # before scheduling a start_with_conditions for each RM
648             reschedule = False
649             
650             # Get all guids in group
651             guids = self._groups[group]
652
653             for guid in guids:
654                 if self.state(guid) < ResourceState.READY:
655                     reschedule = True
656                     break
657
658             if reschedule:
659
660                 callback = functools.partial(wait_all_and_start, group)
661                 self.schedule("1s", callback)
662             else:
663                 # If all resources are ready, we schedule the start
664                 for guid in guids:
665                     rm = self.get_resource(guid)
666                     self.schedule("0s", rm.start_with_conditions)
667
668         if wait_all_ready and new_group:
669             # Schedule a function to check that all resources are
670             # READY, and only then schedule the start.
671             # This aims at reducing the number of tasks looping in the 
672             # scheduler. 
673             # Instead of having many start tasks, we will have only one for 
674             # the whole group.
675             callback = functools.partial(wait_all_and_start, group)
676             self.schedule("0s", callback)
677
678         for guid in guids:
679             rm = self.get_resource(guid)
680             rm.deployment_group = group
681             self.schedule("0s", rm.deploy_with_conditions)
682
683             if not wait_all_ready:
684                 self.schedule("0s", rm.start_with_conditions)
685
686             if rm.conditions.get(ResourceAction.STOP):
687                 # Only if the RM has STOP conditions we
688                 # schedule a stop. Otherwise the RM will stop immediately
689                 self.schedule("0s", rm.stop_with_conditions)
690
691     def release(self, guids = None):
692         """ Release al RMs on the guids list or 
693         all the resources if no list is specified
694
695             :param guids: List of RM guids
696             :type guids: list
697
698         """
699         if not guids:
700             guids = self.resources
701
702         # Remove all pending tasks from the scheduler queue
703         for tid in list(self._scheduler.pending):
704             self._scheduler.remove(tid)
705
706         self._runner.empty()
707
708         for guid in guids:
709             rm = self.get_resource(guid)
710             self.schedule("0s", rm.release)
711
712         self.wait_released(guids)
713         
714     def shutdown(self):
715         """ Shutdown the Experiment Controller. 
716         Releases all the resources and stops task processing thread
717
718         """
719         # If there was a major failure we can't exit gracefully
720         if self._state == ECState.FAILED:
721             raise RuntimeError("EC failure. Can not exit gracefully")
722
723         self.release()
724
725         # Mark the EC state as TERMINATED
726         self._state = ECState.TERMINATED
727
728         # Stop processing thread
729         self._stop = True
730
731         # Notify condition to wake up the processing thread
732         self._notify()
733         
734         if self._thread.is_alive():
735            self._thread.join()
736
737     def schedule(self, date, callback, track = False):
738         """ Schedule a callback to be executed at time date.
739
740             :param date: string containing execution time for the task.
741                     It can be expressed as an absolute time, using
742                     timestamp format, or as a relative time matching
743                     ^\d+.\d+(h|m|s|ms|us)$
744
745             :param callback: code to be executed for the task. Must be a
746                         Python function, and receives args and kwargs
747                         as arguments.
748
749             :param track: if set to True, the task will be retrivable with
750                     the get_task() method
751
752             :return : The Id of the task
753         """
754         timestamp = stabsformat(date)
755         task = Task(timestamp, callback)
756         task = self._scheduler.schedule(task)
757
758         if track:
759             self._tasks[task.id] = task
760
761         # Notify condition to wake up the processing thread
762         self._notify()
763
764         return task.id
765      
766     def _process(self):
767         """ Process scheduled tasks.
768
769         .. note::
770
771         The _process method is executed in an independent thread held by the 
772         ExperimentController for as long as the experiment is running.
773         
774         Tasks are scheduled by invoking the schedule method with a target callback. 
775         The schedule method is given a execution time which controls the
776         order in which tasks are processed. 
777
778         Tasks are processed in parallel using multithreading. 
779         The environmental variable NEPI_NTHREADS can be used to control
780         the number of threads used to process tasks. The default value is 50.
781
782         Exception handling:
783
784         To execute tasks in parallel, an ParallelRunner (PR) object, holding
785         a pool of threads (workers), is used.
786         For each available thread in the PR, the next task popped from 
787         the scheduler queue is 'put' in the PR.
788         Upon receiving a task to execute, each PR worker (thread) invokes the 
789         _execute method of the EC, passing the task as argument. 
790         This method, calls task.callback inside a try/except block. If an 
791         exception is raised by the tasks.callback, it will be trapped by the 
792         try block, logged to standard error (usually the console), and the EC 
793         state will be set to ECState.FAILED.
794         The invocation of _notify immediately after, forces the processing
795         loop in the _process method, to wake up if it was blocked waiting for new 
796         tasks to arrived, and to check the EC state.
797         As the EC is in FAILED state, the processing loop exits and the 
798         'finally' block is invoked. In the 'finally' block, the 'sync' method
799         of the PR is invoked, which forces the PR to raise any unchecked errors
800         that might have been raised by the workers.
801
802         """
803
804         self._runner.start()
805
806         while not self._stop:
807             try:
808                 self._cond.acquire()
809
810                 task = self._scheduler.next()
811                 
812                 if not task:
813                     # No task to execute. Wait for a new task to be scheduled.
814                     self._cond.wait()
815                 else:
816                     # The task timestamp is in the future. Wait for timeout 
817                     # or until another task is scheduled.
818                     now = tnow()
819                     if now < task.timestamp:
820                         # Calculate timeout in seconds
821                         timeout = tdiffsec(task.timestamp, now)
822
823                         # Re-schedule task with the same timestamp
824                         self._scheduler.schedule(task)
825                         
826                         task = None
827
828                         # Wait timeout or until a new task awakes the condition
829                         self._cond.wait(timeout)
830                
831                 self._cond.release()
832
833                 if task:
834                     # Process tasks in parallel
835                     self._runner.put(self._execute, task)
836             except: 
837                 import traceback
838                 err = traceback.format_exc()
839                 self.logger.error("Error while processing tasks in the EC: %s" % err)
840
841                 # Set the EC to FAILED state 
842                 self._state = ECState.FAILED
843             
844                 # Set the FailureManager failure level to EC failure
845                 self._fm.set_ec_failure()
846
847         self.logger.debug("Exiting the task processing loop ... ")
848         
849         self._runner.sync()
850         self._runner.destroy()
851
852     def _execute(self, task):
853         """ Executes a single task. 
854
855             :param task: Object containing the callback to execute
856             :type task: Task
857
858         .. note::
859
860         If the invokation of the task callback raises an
861         exception, the processing thread of the ExperimentController
862         will be stopped and the experiment will be aborted.
863
864         """
865         # Invoke callback
866         task.status = TaskStatus.DONE
867
868         try:
869             task.result = task.callback()
870         except:
871             import traceback
872             err = traceback.format_exc()
873             task.result = err
874             task.status = TaskStatus.ERROR
875             
876             self.logger.error("Error occurred while executing task: %s" % err)
877
878     def _notify(self):
879         """ Awakes the processing thread in case it is blocked waiting
880         for a new task to be scheduled.
881         """
882         self._cond.acquire()
883         self._cond.notify()
884         self._cond.release()
885