790a3381284990599d2022b9ccb02976a6755f4e
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import random
35 import sys
36 import time
37 import threading
38
39 class FailurePolicy(object):
40     """ Defines how to respond to experiment failures  
41     """
42     IGNORE_RM_FAILURE = 1
43     ABORT_ON_RM_FAILURE = 2
44
45 class FailureLevel(object):
46     """ Describe the system failure state
47     """
48     OK = 1
49     RM_FAILURE = 2
50     TASK_FAILURE = 3
51     EC_FAILURE = 4
52
53 class FailureManager(object):
54     """ The FailureManager is responsible for handling errors,
55     and deciding whether an experiment should be aborted
56     """
57
58     def __init__(self, failure_policy = None):
59         self._failure_level = FailureLevel.OK
60         self._failure_policy = failure_policy or \
61                 FailurePolicy.ABORT_ON_RM_FAILURE
62
63     @property
64     def abort(self):
65         if self._failure_level == FailureLevel.EC_FAILURE:
66             return True
67
68         if self._failure_level in [FailureLevel.TASK_FAILURE, 
69                 FailureLevel.RM_FAILURE] and \
70                         self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE:
71             return True
72
73         return False
74
75     def set_rm_failure(self):
76         self._failure_level = FailureLevel.RM_FAILURE
77
78     def set_task_failure(self):
79         self._failure_level = FailureLevel.TASK_FAILURE
80
81     def set_ec_failure(self):
82         self._failure_level = FailureLevel.EC_FAILURE
83
84 class ECState(object):
85     """ State of the Experiment Controller
86    
87     """
88     RUNNING = 1
89     FAILED = 2
90     TERMINATED = 3
91
92 class ExperimentController(object):
93     """
94     .. class:: Class Args :
95       
96         :param exp_id: Human readable identifier for the experiment scenario. 
97         :type exp_id: str
98
99     .. note::
100
101         An experiment, or scenario, is defined by a concrete set of resources,
102         behavior, configuration and interconnection of those resources. 
103         The Experiment Description (ED) is a detailed representation of a
104         single experiment. It contains all the necessary information to 
105         allow repeating the experiment. NEPI allows to describe
106         experiments by registering components (resources), configuring them
107         and interconnecting them.
108         
109         A same experiment (scenario) can be executed many times, generating 
110         different results. We call an experiment execution (instance) a 'run'.
111
112         The ExperimentController (EC), is the entity responsible of
113         managing an experiment run. The same scenario can be 
114         recreated (and re-run) by instantiating an EC and recreating 
115         the same experiment description. 
116
117         In NEPI, an experiment is represented as a graph of interconnected
118         resources. A resource is a generic concept in the sense that any
119         component taking part of an experiment, whether physical of
120         virtual, is considered a resource. A resources could be a host, 
121         a virtual machine, an application, a simulator, a IP address.
122
123         A ResourceManager (RM), is the entity responsible for managing a 
124         single resource. ResourceManagers are specific to a resource
125         type (i.e. An RM to control a Linux application will not be
126         the same as the RM used to control a ns-3 simulation).
127         To support a new type of resource in NEPI, a new RM must be 
128         implemented. NEPI already provides a variety of
129         RMs to control basic resources, and new can be extended from
130         the existing ones.
131
132         Through the EC interface the user can create ResourceManagers (RMs),
133         configure them and interconnect them, to describe an experiment.
134         Describing an experiment through the EC does not run the experiment.
135         Only when the 'deploy()' method is invoked on the EC, the EC will take 
136         actions to transform the 'described' experiment into a 'running' experiment.
137
138         While the experiment is running, it is possible to continue to
139         create/configure/connect RMs, and to deploy them to involve new
140         resources in the experiment (this is known as 'interactive' deployment).
141         
142         An experiments in NEPI is identified by a string id, 
143         which is either given by the user, or automatically generated by NEPI.  
144         The purpose of this identifier is to separate files and results that 
145         belong to different experiment scenarios. 
146         However, since a same 'experiment' can be run many times, the experiment
147         id is not enough to identify an experiment instance (run).
148         For this reason, the ExperimentController has two identifier, the 
149         exp_id, which can be re-used in different ExperimentController,
150         and the run_id, which is unique to one ExperimentController instance, and
151         is automatically generated by NEPI.
152         
153     """
154
155     def __init__(self, exp_id = None): 
156         super(ExperimentController, self).__init__()
157         # Logging
158         self._logger = logging.getLogger("ExperimentController")
159
160         # Run identifier. It identifies a concrete execution instance (run) 
161         # of an experiment.
162         # Since a same experiment (same configuration) can be executed many 
163         # times, this run_id permits to separate result files generated on 
164         # different experiment executions
165         self._run_id = tsformat()
166
167         # Experiment identifier. Usually assigned by the user
168         # Identifies the experiment scenario (i.e. configuration, 
169         # resources used, etc)
170         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
171
172         # generator of globally unique ids
173         self._guid_generator = guid.GuidGenerator()
174         
175         # Resource managers
176         self._resources = dict()
177
178         # Scheduler
179         self._scheduler = HeapScheduler()
180
181         # Tasks
182         self._tasks = dict()
183
184         # RM groups (for deployment) 
185         self._groups = dict()
186
187         # generator of globally unique id for groups
188         self._group_id_generator = guid.GuidGenerator()
189  
190         # Event processing thread
191         self._cond = threading.Condition()
192         self._thread = threading.Thread(target = self._process)
193         self._thread.setDaemon(True)
194         self._thread.start()
195
196         # Flag to stop processing thread
197         self._stop = False
198     
199         # Entity in charge of managing system failures
200         self._fm = FailureManager()
201
202         # EC state
203         self._state = ECState.RUNNING
204
205     @property
206     def logger(self):
207         """ Return the logger of the Experiment Controller
208
209         """
210         return self._logger
211
212     @property
213     def ecstate(self):
214         """ Return the state of the Experiment Controller
215
216         """
217         return self._state
218
219     @property
220     def exp_id(self):
221         """ Return the experiment id assigned by the user
222
223         """
224         return self._exp_id
225
226     @property
227     def run_id(self):
228         """ Return the experiment instance (run) identifier  
229
230         """
231         return self._run_id
232
233     @property
234     def abort(self):
235         return self._fm.abort
236
237     def set_rm_failure(self):
238         self._fm.set_rm_failure()
239
240     def wait_finished(self, guids):
241         """ Blocking method that wait until all RMs in the 'guid' list 
242             reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
243             RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
244
245         :param guids: List of guids
246         :type guids: list
247
248         """
249
250         def quit():
251             return self.abort
252
253         return self.wait(guids, state = ResourceState.STOPPED, 
254                 quit = quit)
255
256     def wait_started(self, guids):
257         """ Blocking method that wait until all RMs in the 'guid' list 
258             reach a state >= STARTED or until a System Failure occurs 
259             (e.g. Task Failure) 
260
261         :param guids: List of guids
262         :type guids: list
263         """
264
265         def quit():
266             return self.abort
267
268         return self.wait(guids, state = ResourceState.STARTED, 
269                 quit = quit)
270
271     def wait_released(self, guids):
272         """ Blocking method that wait until all RMs in the 'guid' list 
273             reach a state = RELEASED or until the EC fails
274
275         :param guids: List of guids
276         :type guids: list
277         """
278
279         def quit():
280             return self._state == ECState.FAILED
281
282         return self.wait(guids, state = ResourceState.RELEASED, 
283                 quit = quit)
284
285     def wait_deployed(self, guids):
286         """ Blocking method that wait until all RMs in the 'guid' list 
287             reach a state >= READY or until a System Failure occurs 
288             (e.g. Task Failure) 
289
290         :param guids: List of guids
291         :type guids: list
292         """
293
294         def quit():
295             return self.abort
296
297         return self.wait(guids, state = ResourceState.READY, 
298                 quit = quit)
299
300     def wait(self, guids, state, quit):
301         """ Blocking method that wait until all RMs in the 'guid' list 
302             reach a state >= 'state' or until quit yileds True
303            
304         :param guids: List of guids
305         :type guids: list
306         """
307         if isinstance(guids, int):
308             guids = [guids]
309
310         while True:
311             # If there are no more guids to wait for
312             # or the quit function returns True, exit the loop
313             if len(guids) == 0 or quit():
314                 break
315
316             # If a guid reached one of the target states, remove it from list
317             guid = guids[0]
318             rstate = self.state(guid)
319
320             if rstate >= state:
321                 guids.remove(guid)
322             else:
323                 # Debug...
324                 hrrstate = ResourceState2str.get(rstate)
325                 hrstate = ResourceState2str.get(state)
326                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
327                     guid, hrrstate, hrstate))
328
329             time.sleep(0.5)
330   
331     def get_task(self, tid):
332         """ Get a specific task
333
334         :param tid: Id of the task
335         :type tid: int
336         :rtype: Task
337         """
338         return self._tasks.get(tid)
339
340     def get_resource(self, guid):
341         """ Get a specific Resource Manager
342
343         :param guid: Id of the task
344         :type guid: int
345         :rtype: ResourceManager
346         """
347         return self._resources.get(guid)
348
349     @property
350     def resources(self):
351         """ Returns the list of all the Resource Manager Id
352
353         :rtype: set
354
355         """
356         return self._resources.keys()
357
358     def register_resource(self, rtype, guid = None):
359         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
360         for the RM of type 'rtype' and add it to the list of Resources.
361
362         :param rtype: Type of the RM
363         :type rtype: str
364         :return: Id of the RM
365         :rtype: int
366         """
367         # Get next available guid
368         guid = self._guid_generator.next(guid)
369         
370         # Instantiate RM
371         rm = ResourceFactory.create(rtype, self, guid)
372
373         # Store RM
374         self._resources[guid] = rm
375
376         return guid
377
378     def get_attributes(self, guid):
379         """ Return all the attibutes of a specific RM
380
381         :param guid: Guid of the RM
382         :type guid: int
383         :return: List of attributes
384         :rtype: list
385         """
386         rm = self.get_resource(guid)
387         return rm.get_attributes()
388
389     def register_connection(self, guid1, guid2):
390         """ Registers a guid1 with a guid2. 
391             The declaration order is not important
392
393             :param guid1: First guid to connect
394             :type guid1: ResourceManager
395
396             :param guid2: Second guid to connect
397             :type guid: ResourceManager
398         """
399         rm1 = self.get_resource(guid1)
400         rm2 = self.get_resource(guid2)
401
402         rm1.register_connection(guid2)
403         rm2.register_connection(guid1)
404
405     def register_condition(self, guids1, action, guids2, state,
406             time = None):
407         """ Registers an action START or STOP for all RM on guids1 to occur 
408             time 'time' after all elements in guids2 reached state 'state'.
409
410             :param guids1: List of guids of RMs subjected to action
411             :type guids1: list
412
413             :param action: Action to register (either START or STOP)
414             :type action: ResourceAction
415
416             :param guids2: List of guids of RMs to we waited for
417             :type guids2: list
418
419             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
420             :type state: ResourceState
421
422             :param time: Time to wait after guids2 has reached status 
423             :type time: string
424
425         """
426         if isinstance(guids1, int):
427             guids1 = [guids1]
428         if isinstance(guids2, int):
429             guids2 = [guids2]
430
431         for guid1 in guids1:
432             rm = self.get_resource(guid1)
433             rm.register_condition(action, guids2, state, time)
434
435     def enable_trace(self, guid, name):
436         """ Enable trace
437
438         :param name: Name of the trace
439         :type name: str
440         """
441         rm = self.get_resource(guid)
442         rm.enable_trace(name)
443
444     def trace_enabled(self, guid, name):
445         """ Returns True if trace is enabled
446
447         :param name: Name of the trace
448         :type name: str
449         """
450         rm = self.get_resource(guid)
451         return rm.trace_enabled(name)
452
453     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
454         """ Get information on collected trace
455
456         :param name: Name of the trace
457         :type name: str
458
459         :param attr: Can be one of:
460                          - TraceAttr.ALL (complete trace content), 
461                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
462                          - TraceAttr.PATH (full path to the trace file),
463                          - TraceAttr.SIZE (size of trace file). 
464         :type attr: str
465
466         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
467         :type name: int
468
469         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
470         :type name: int
471
472         :rtype: str
473         """
474         rm = self.get_resource(guid)
475         return rm.trace(name, attr, block, offset)
476
477     def discover(self, guid):
478         """ Discover a specific RM defined by its 'guid'
479
480             :param guid: Guid of the RM
481             :type guid: int
482
483         """
484         rm = self.get_resource(guid)
485         return rm.discover()
486
487     def provision(self, guid):
488         """ Provision a specific RM defined by its 'guid'
489
490             :param guid: Guid of the RM
491             :type guid: int
492
493         """
494         rm = self.get_resource(guid)
495         return rm.provision()
496
497     def get(self, guid, name):
498         """ Get a specific attribute 'name' from the RM 'guid'
499
500             :param guid: Guid of the RM
501             :type guid: int
502
503             :param name: attribute's name
504             :type name: str
505
506         """
507         rm = self.get_resource(guid)
508         return rm.get(name)
509
510     def set(self, guid, name, value):
511         """ Set a specific attribute 'name' from the RM 'guid' 
512             with the value 'value' 
513
514             :param guid: Guid of the RM
515             :type guid: int
516
517             :param name: attribute's name
518             :type name: str
519
520             :param value: attribute's value
521
522         """
523         rm = self.get_resource(guid)
524         return rm.set(name, value)
525
526     def state(self, guid, hr = False):
527         """ Returns the state of a resource
528
529             :param guid: Resource guid
530             :type guid: integer
531
532             :param hr: Human readable. Forces return of a 
533                 status string instead of a number 
534             :type hr: boolean
535
536         """
537         rm = self.get_resource(guid)
538         state = rm.state
539
540         if hr:
541             return ResourceState2str.get(state)
542
543         return state
544
545     def stop(self, guid):
546         """ Stop a specific RM defined by its 'guid'
547
548             :param guid: Guid of the RM
549             :type guid: int
550
551         """
552         rm = self.get_resource(guid)
553         return rm.stop()
554
555     def start(self, guid):
556         """ Start a specific RM defined by its 'guid'
557
558             :param guid: Guid of the RM
559             :type guid: int
560
561         """
562         rm = self.get_resource(guid)
563         return rm.start()
564
565     def set_with_conditions(self, name, value, guids1, guids2, state,
566             time = None):
567         """ Set value 'value' on attribute with name 'name' on all RMs of
568             guids1 when 'time' has elapsed since all elements in guids2 
569             have reached state 'state'.
570
571             :param name: Name of attribute to set in RM
572             :type name: string
573
574             :param value: Value of attribute to set in RM
575             :type name: string
576
577             :param guids1: List of guids of RMs subjected to action
578             :type guids1: list
579
580             :param action: Action to register (either START or STOP)
581             :type action: ResourceAction
582
583             :param guids2: List of guids of RMs to we waited for
584             :type guids2: list
585
586             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
587             :type state: ResourceState
588
589             :param time: Time to wait after guids2 has reached status 
590             :type time: string
591
592         """
593         if isinstance(guids1, int):
594             guids1 = [guids1]
595         if isinstance(guids2, int):
596             guids2 = [guids2]
597
598         for guid1 in guids1:
599             rm = self.get_resource(guid)
600             rm.set_with_conditions(name, value, guids2, state, time)
601
602     def deploy(self, guids = None, wait_all_ready = True, group = None):
603         """ Deploy all resource manager in guids list
604
605         :param guids: List of guids of RMs to deploy
606         :type guids: list
607
608         :param wait_all_ready: Wait until all RMs are ready in
609             order to start the RMs
610         :type guid: int
611
612         :param group: Id of deployment group in which to deploy RMs
613         :type group: int
614
615         """
616         self.logger.debug(" ------- DEPLOY START ------ ")
617
618         if not guids:
619             # If no guids list was passed, all 'NEW' RMs will be deployed
620             guids = []
621             for guid in self.resources:
622                 if self.state(guid) == ResourceState.NEW:
623                     guids.append(guid)
624                 
625         if isinstance(guids, int):
626             guids = [guids]
627
628         # Create deployment group
629         # New guids can be added to a same deployment group later on
630         new_group = False
631         if not group:
632             new_group = True
633             group = self._group_id_generator.next()
634
635         if group not in self._groups:
636             self._groups[group] = []
637
638         self._groups[group].extend(guids)
639
640         def wait_all_and_start(group):
641             # Function that checks if all resources are READY
642             # before scheduling a start_with_conditions for each RM
643             reschedule = False
644             
645             # Get all guids in group
646             guids = self._groups[group]
647
648             for guid in guids:
649                 if self.state(guid) < ResourceState.READY:
650                     reschedule = True
651                     break
652
653             if reschedule:
654                 callback = functools.partial(wait_all_and_start, group)
655                 self.schedule("1s", callback)
656             else:
657                 # If all resources are ready, we schedule the start
658                 for guid in guids:
659                     rm = self.get_resource(guid)
660                     self.schedule("0s", rm.start_with_conditions)
661
662         if wait_all_ready and new_group:
663             # Schedule a function to check that all resources are
664             # READY, and only then schedule the start.
665             # This aims at reducing the number of tasks looping in the 
666             # scheduler. 
667             # Instead of having many start tasks, we will have only one for 
668             # the whole group.
669             callback = functools.partial(wait_all_and_start, group)
670             self.schedule("0s", callback)
671
672         for guid in guids:
673             rm = self.get_resource(guid)
674             rm.deployment_group = group
675             self.schedule("0s", rm.deploy_with_conditions)
676
677             if not wait_all_ready:
678                 self.schedule("0s", rm.start_with_conditions)
679
680             if rm.conditions.get(ResourceAction.STOP):
681                 # Only if the RM has STOP conditions we
682                 # schedule a stop. Otherwise the RM will stop immediately
683                 self.schedule("0s", rm.stop_with_conditions)
684
685     def release(self, guids = None):
686         """ Release al RMs on the guids list or 
687         all the resources if no list is specified
688
689             :param guids: List of RM guids
690             :type guids: list
691
692         """
693         if not guids:
694             guids = self.resources
695
696         # Remove all pending tasks from the scheduler queue
697         for tis in self._scheduler.pending:
698             self._scheduler.remove(tid)
699
700         for guid in guids:
701             rm = self.get_resource(guid)
702             self.schedule("0s", rm.release)
703
704         self.wait_released(guids)
705         
706     def shutdown(self):
707         """ Shutdown the Experiment Controller. 
708         Releases all the resources and stops task processing thread
709
710         """
711         self.release()
712
713         # Mark the EC state as TERMINATED
714         self._state = ECState.TERMINATED
715
716         # Stop processing thread
717         self._stop = True
718
719         # Notify condition to wake up the processing thread
720         self._notify()
721         
722         if self._thread.is_alive():
723            self._thread.join()
724
725     def schedule(self, date, callback, track = False):
726         """ Schedule a callback to be executed at time date.
727
728             :param date: string containing execution time for the task.
729                     It can be expressed as an absolute time, using
730                     timestamp format, or as a relative time matching
731                     ^\d+.\d+(h|m|s|ms|us)$
732
733             :param callback: code to be executed for the task. Must be a
734                         Python function, and receives args and kwargs
735                         as arguments.
736
737             :param track: if set to True, the task will be retrivable with
738                     the get_task() method
739
740             :return : The Id of the task
741         """
742         timestamp = stabsformat(date)
743         task = Task(timestamp, callback)
744         task = self._scheduler.schedule(task)
745
746         if track:
747             self._tasks[task.id] = task
748
749         # Notify condition to wake up the processing thread
750         self._notify()
751
752         return task.id
753      
754     def _process(self):
755         """ Process scheduled tasks.
756
757         .. note::
758
759         The _process method is executed in an independent thread held by the 
760         ExperimentController for as long as the experiment is running.
761         
762         Tasks are scheduled by invoking the schedule method with a target callback. 
763         The schedule method is given a execution time which controls the
764         order in which tasks are processed. 
765
766         Tasks are processed in parallel using multithreading. 
767         The environmental variable NEPI_NTHREADS can be used to control
768         the number of threads used to process tasks. The default value is 50.
769
770         Exception handling:
771
772         To execute tasks in parallel, an ParallelRunner (PR) object, holding
773         a pool of threads (workers), is used.
774         For each available thread in the PR, the next task popped from 
775         the scheduler queue is 'put' in the PR.
776         Upon receiving a task to execute, each PR worker (thread) invokes the 
777         _execute method of the EC, passing the task as argument. 
778         This method, calls task.callback inside a try/except block. If an 
779         exception is raised by the tasks.callback, it will be trapped by the 
780         try block, logged to standard error (usually the console), and the EC 
781         state will be set to ECState.FAILED.
782         The invocation of _notify immediately after, forces the processing
783         loop in the _process method, to wake up if it was blocked waiting for new 
784         tasks to arrived, and to check the EC state.
785         As the EC is in FAILED state, the processing loop exits and the 
786         'finally' block is invoked. In the 'finally' block, the 'sync' method
787         of the PR is invoked, which forces the PR to raise any unchecked errors
788         that might have been raised by the workers.
789
790         """
791         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
792
793         runner = ParallelRun(maxthreads = nthreads)
794         runner.start()
795
796         while not self._stop:
797             try:
798                 self._cond.acquire()
799
800                 task = self._scheduler.next()
801                 
802                 if not task:
803                     # No task to execute. Wait for a new task to be scheduled.
804                     self._cond.wait()
805                 else:
806                     # The task timestamp is in the future. Wait for timeout 
807                     # or until another task is scheduled.
808                     now = tnow()
809                     if now < task.timestamp:
810                         # Calculate timeout in seconds
811                         timeout = tdiffsec(task.timestamp, now)
812
813                         # Re-schedule task with the same timestamp
814                         self._scheduler.schedule(task)
815                         
816                         task = None
817
818                         # Wait timeout or until a new task awakes the condition
819                         self._cond.wait(timeout)
820                
821                 self._cond.release()
822
823                 if task:
824                     # Process tasks in parallel
825                     runner.put(self._execute, task)
826             except: 
827                 import traceback
828                 err = traceback.format_exc()
829                 self.logger.error("Error while processing tasks in the EC: %s" % err)
830
831                 # Set the EC to FAILED state 
832                 self._state = ECState.FAILED
833
834                 # Set the FailureManager failure level
835                 self._fm.set_ec_failure()
836
837         self.logger.debug("Exiting the task processing loop ... ")
838         runner.sync()
839         runner.destroy()
840
841     def _execute(self, task):
842         """ Executes a single task. 
843
844             :param task: Object containing the callback to execute
845             :type task: Task
846
847         .. note::
848
849         If the invokation of the task callback raises an
850         exception, the processing thread of the ExperimentController
851         will be stopped and the experiment will be aborted.
852
853         """
854         # Invoke callback
855         task.status = TaskStatus.DONE
856
857         try:
858             task.result = task.callback()
859         except:
860             import traceback
861             err = traceback.format_exc()
862             task.result = err
863             task.status = TaskStatus.ERROR
864             
865             self.logger.error("Error occurred while executing task: %s" % err)
866
867             # Set the FailureManager failure level
868             self._fm.set_task_failure()
869
870     def _notify(self):
871         """ Awakes the processing thread in case it is blocked waiting
872         for a new task to be scheduled.
873         """
874         self._cond.acquire()
875         self._cond.notify()
876         self._cond.release()
877