Changing total_seconds in timefuncs, fixing _execute in ec, adding unittest
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import sys
35 import time
36 import threading
37 import weakref
38
39 class FailureLevel(object):
40     """ Describes the system failure state """
41     OK = 1
42     RM_FAILURE = 2
43     EC_FAILURE = 3
44
45 class FailureManager(object):
46     """ The FailureManager is responsible for handling errors
47     and deciding whether an experiment should be aborted or not
48
49     """
50
51     def __init__(self, ec):
52         self._ec = weakref.ref(ec)
53         self._failure_level = FailureLevel.OK
54
55     @property
56     def ec(self):
57         """ Returns the ExperimentController associated to this FailureManager 
58         
59         """
60         
61         return self._ec()
62
63     @property
64     def abort(self):
65         if self._failure_level == FailureLevel.OK:
66             for guid in self.ec.resources:
67                 state = self.ec.state(guid)
68                 critical = self.ec.get(guid, "critical")
69                 if state == ResourceState.FAILED and critical:
70                     self._failure_level = FailureLevel.RM_FAILURE
71                     self.ec.logger.debug("RM critical failure occurred on guid %d." \
72                             " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
73                     break
74
75         return self._failure_level != FailureLevel.OK
76
77     def set_ec_failure(self):
78         self._failure_level = FailureLevel.EC_FAILURE
79
80
81 class ECState(object):
82     """ Possible states for an ExperimentController
83    
84     """
85     RUNNING = 1
86     FAILED = 2
87     TERMINATED = 3
88
89 class ExperimentController(object):
90     """
91     .. class:: Class Args :
92       
93         :param exp_id: Human readable identifier for the experiment scenario. 
94         :type exp_id: str
95
96     .. note::
97
98     An experiment, or scenario, is defined by a concrete set of resources,
99     behavior, configuration and interconnection of those resources. 
100     The Experiment Description (ED) is a detailed representation of a
101     single experiment. It contains all the necessary information to 
102     allow repeating the experiment. NEPI allows to describe
103     experiments by registering components (resources), configuring them
104     and interconnecting them.
105     
106     A same experiment (scenario) can be executed many times, generating 
107     different results. We call an experiment execution (instance) a 'run'.
108
109     The ExperimentController (EC), is the entity responsible of
110     managing an experiment run. The same scenario can be 
111     recreated (and re-run) by instantiating an EC and recreating 
112     the same experiment description. 
113
114     In NEPI, an experiment is represented as a graph of interconnected
115     resources. A resource is a generic concept in the sense that any
116     component taking part of an experiment, whether physical of
117     virtual, is considered a resource. A resources could be a host, 
118     a virtual machine, an application, a simulator, a IP address.
119
120     A ResourceManager (RM), is the entity responsible for managing a 
121     single resource. ResourceManagers are specific to a resource
122     type (i.e. An RM to control a Linux application will not be
123     the same as the RM used to control a ns-3 simulation).
124     To support a new type of resource in NEPI, a new RM must be 
125     implemented. NEPI already provides a variety of
126     RMs to control basic resources, and new can be extended from
127     the existing ones.
128
129     Through the EC interface the user can create ResourceManagers (RMs),
130     configure them and interconnect them, to describe an experiment.
131     Describing an experiment through the EC does not run the experiment.
132     Only when the 'deploy()' method is invoked on the EC, the EC will take 
133     actions to transform the 'described' experiment into a 'running' experiment.
134
135     While the experiment is running, it is possible to continue to
136     create/configure/connect RMs, and to deploy them to involve new
137     resources in the experiment (this is known as 'interactive' deployment).
138     
139     An experiments in NEPI is identified by a string id, 
140     which is either given by the user, or automatically generated by NEPI.  
141     The purpose of this identifier is to separate files and results that 
142     belong to different experiment scenarios. 
143     However, since a same 'experiment' can be run many times, the experiment
144     id is not enough to identify an experiment instance (run).
145     For this reason, the ExperimentController has two identifier, the 
146     exp_id, which can be re-used in different ExperimentController,
147     and the run_id, which is unique to one ExperimentController instance, and
148     is automatically generated by NEPI.
149         
150     """
151
152     def __init__(self, exp_id = None): 
153         super(ExperimentController, self).__init__()
154
155         # Logging
156         self._logger = logging.getLogger("ExperimentController")
157
158         # Run identifier. It identifies a concrete execution instance (run) 
159         # of an experiment.
160         # Since a same experiment (same configuration) can be executed many 
161         # times, this run_id permits to separate result files generated on 
162         # different experiment executions
163         self._run_id = tsformat()
164
165         # Experiment identifier. Usually assigned by the user
166         # Identifies the experiment scenario (i.e. configuration, 
167         # resources used, etc)
168         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
169
170         # generator of globally unique ids
171         self._guid_generator = guid.GuidGenerator()
172         
173         # Resource managers
174         self._resources = dict()
175
176         # Scheduler. It a queue that holds tasks scheduled for
177         # execution, and yields the next task to be executed 
178         # ordered by execution and arrival time
179         self._scheduler = HeapScheduler()
180
181         # Tasks
182         self._tasks = dict()
183
184         # RM groups (for deployment) 
185         self._groups = dict()
186
187         # generator of globally unique id for groups
188         self._group_id_generator = guid.GuidGenerator()
189
190         # Flag to stop processing thread
191         self._stop = False
192     
193         # Entity in charge of managing system failures
194         self._fm = FailureManager(self)
195
196         # EC state
197         self._state = ECState.RUNNING
198
199         # The runner is a pool of threads used to parallelize 
200         # execution of tasks
201         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
202         self._runner = ParallelRun(maxthreads = nthreads)
203
204         # Event processing thread
205         self._cond = threading.Condition()
206         self._thread = threading.Thread(target = self._process)
207         self._thread.setDaemon(True)
208         self._thread.start()
209
210     @property
211     def logger(self):
212         """ Returns the logger instance of the Experiment Controller
213
214         """
215         return self._logger
216
217     @property
218     def ecstate(self):
219         """ Returns the state of the Experiment Controller
220
221         """
222         return self._state
223
224     @property
225     def exp_id(self):
226         """ Returns the experiment id assigned by the user
227
228         """
229         return self._exp_id
230
231     @property
232     def run_id(self):
233         """ Returns the experiment instance (run) identifier (automatically 
234         generated)
235
236         """
237         return self._run_id
238
239     @property
240     def abort(self):
241         """ Returns True if the experiment has failed and should be interrupted,
242         False otherwise.
243
244         """
245         return self._fm.abort
246
247     def wait_finished(self, guids):
248         """ Blocking method that waits until all RMs in the 'guids' list 
249         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
250         RELEASED ), or until a failure in the experiment occurs 
251         (i.e. abort == True) 
252         
253             :param guids: List of guids
254             :type guids: list
255
256         """
257
258         def quit():
259             return self.abort
260
261         return self.wait(guids, state = ResourceState.STOPPED, 
262                 quit = quit)
263
264     def wait_started(self, guids):
265         """ Blocking method that waits until all RMs in the 'guids' list 
266         have reached a state >= STARTED, or until a failure in the 
267         experiment occurs (i.e. abort == True) 
268         
269             :param guids: List of guids
270             :type guids: list
271
272         """
273
274         def quit():
275             return self.abort
276
277         return self.wait(guids, state = ResourceState.STARTED, 
278                 quit = quit)
279
280     def wait_released(self, guids):
281         """ Blocking method that waits until all RMs in the 'guids' list 
282         have reached a state == RELEASED, or until the EC fails 
283         
284             :param guids: List of guids
285             :type guids: list
286
287         """
288
289         def quit():
290             return self._state == ECState.FAILED
291
292         return self.wait(guids, state = ResourceState.RELEASED, 
293                 quit = quit)
294
295     def wait_deployed(self, guids):
296         """ Blocking method that waits until all RMs in the 'guids' list 
297         have reached a state >= READY, or until a failure in the 
298         experiment occurs (i.e. abort == True) 
299         
300             :param guids: List of guids
301             :type guids: list
302
303         """
304
305         def quit():
306             return self.abort
307
308         return self.wait(guids, state = ResourceState.READY, 
309                 quit = quit)
310
311     def wait(self, guids, state, quit):
312         """ Blocking method that waits until all RMs in the 'guids' list 
313         have reached a state >= 'state', or until the 'quit' callback
314         yields True
315            
316             :param guids: List of guids
317             :type guids: list
318         
319         """
320         
321         if isinstance(guids, int):
322             guids = [guids]
323
324         # Make a copy to avoid modifying the original guids list
325         guids = list(guids)
326
327         while True:
328             # If there are no more guids to wait for
329             # or the quit function returns True, exit the loop
330             if len(guids) == 0 or quit():
331                 break
332
333             # If a guid reached one of the target states, remove it from list
334             guid = guids[0]
335             rstate = self.state(guid)
336             
337             hrrstate = ResourceState2str.get(rstate)
338             hrstate = ResourceState2str.get(state)
339
340             if rstate >= state:
341                 guids.remove(guid)
342                 rm = self.get_resource(guid)
343                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
344                     rm.get_rtype(), guid, hrrstate, hrstate))
345             else:
346                 # Debug...
347                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
348                     guid, hrrstate, hrstate))
349                 time.sleep(0.5)
350   
351     def get_task(self, tid):
352         """ Returns a task by its id
353
354             :param tid: Id of the task
355             :type tid: int
356             
357             :rtype: Task
358             
359         """
360         return self._tasks.get(tid)
361
362     def get_resource(self, guid):
363         """ Returns a registered ResourceManager by its guid
364
365             :param guid: Id of the task
366             :type guid: int
367             
368             :rtype: ResourceManager
369             
370         """
371         return self._resources.get(guid)
372
373     @property
374     def resources(self):
375         """ Returns the set() of guids of all the ResourceManager
376
377             :return: Set of all RM guids
378             :rtype: set
379
380         """
381         return self._resources.keys()
382
383     def register_resource(self, rtype, guid = None):
384         """ Registers a new ResourceManager of type 'rtype' in the experiment
385         
386         This method will assign a new 'guid' for the RM, if no guid
387         is specified.
388
389             :param rtype: Type of the RM
390             :type rtype: str
391
392             :return: Guid of the RM
393             :rtype: int
394             
395         """
396         # Get next available guid
397         guid = self._guid_generator.next(guid)
398         
399         # Instantiate RM
400         rm = ResourceFactory.create(rtype, self, guid)
401
402         # Store RM
403         self._resources[guid] = rm
404
405         return guid
406
407     def get_attributes(self, guid):
408         """ Returns all the attributes of the RM with guid 'guid'
409
410             :param guid: Guid of the RM
411             :type guid: int
412
413             :return: List of attributes
414             :rtype: list
415
416         """
417         rm = self.get_resource(guid)
418         return rm.get_attributes()
419
420     def get_attribute(self, guid, name):
421         """ Returns the attribute 'name' of the RM with guid 'guid'
422
423             :param guid: Guid of the RM
424             :type guid: int
425
426             :param name: Name of the attribute
427             :type name: str
428
429             :return: The attribute with name 'name'
430             :rtype: Attribute
431
432         """
433         rm = self.get_resource(guid)
434         return rm.get_attribute(name)
435
436     def register_connection(self, guid1, guid2):
437         """ Registers a connection between a RM with guid 'guid1'
438         and another RM with guid 'guid2'. 
439     
440         The order of the in which the two guids are provided is not
441         important, since the connection relationship is symmetric.
442
443             :param guid1: First guid to connect
444             :type guid1: ResourceManager
445
446             :param guid2: Second guid to connect
447             :type guid: ResourceManager
448
449         """
450         rm1 = self.get_resource(guid1)
451         rm2 = self.get_resource(guid2)
452
453         rm1.register_connection(guid2)
454         rm2.register_connection(guid1)
455
456     def register_condition(self, guids1, action, guids2, state,
457             time = None):
458         """ Registers an action START, STOP or DEPLOY for all RM on list
459         guids1 to occur at time 'time' after all elements in list guids2 
460         have reached state 'state'.
461
462             :param guids1: List of guids of RMs subjected to action
463             :type guids1: list
464
465             :param action: Action to perform (either START, STOP or DEPLOY)
466             :type action: ResourceAction
467
468             :param guids2: List of guids of RMs to we waited for
469             :type guids2: list
470
471             :param state: State to wait for on RMs of list guids2 (STARTED,
472                 STOPPED, etc)
473             :type state: ResourceState
474
475             :param time: Time to wait after guids2 has reached status 
476             :type time: string
477
478         """
479         if isinstance(guids1, int):
480             guids1 = [guids1]
481         if isinstance(guids2, int):
482             guids2 = [guids2]
483
484         for guid1 in guids1:
485             rm = self.get_resource(guid1)
486             rm.register_condition(action, guids2, state, time)
487
488     def enable_trace(self, guid, name):
489         """ Enables a trace to be collected during the experiment run
490
491             :param name: Name of the trace
492             :type name: str
493
494         """
495         rm = self.get_resource(guid)
496         rm.enable_trace(name)
497
498     def trace_enabled(self, guid, name):
499         """ Returns True if the trace of name 'name' is enabled
500
501             :param name: Name of the trace
502             :type name: str
503
504         """
505         rm = self.get_resource(guid)
506         return rm.trace_enabled(name)
507
508     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
509         """ Returns information on a collected trace, the trace stream or 
510         blocks (chunks) of the trace stream
511
512             :param name: Name of the trace
513             :type name: str
514
515             :param attr: Can be one of:
516                          - TraceAttr.ALL (complete trace content), 
517                          - TraceAttr.STREAM (block in bytes to read starting 
518                                 at offset),
519                          - TraceAttr.PATH (full path to the trace file),
520                          - TraceAttr.SIZE (size of trace file). 
521             :type attr: str
522
523             :param block: Number of bytes to retrieve from trace, when attr is 
524                 TraceAttr.STREAM 
525             :type name: int
526
527             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
528             :type name: int
529
530             :rtype: str
531
532         """
533         rm = self.get_resource(guid)
534         return rm.trace(name, attr, block, offset)
535
536     def get_traces(self, guid):
537         """ Returns the list of the trace names of the RM with guid 'guid'
538
539             :param guid: Guid of the RM
540             :type guid: int
541
542             :return: List of trace names
543             :rtype: list
544
545         """
546         rm = self.get_resource(guid)
547         return rm.get_traces()
548
549
550     def discover(self, guid):
551         """ Discovers an available resource matching the criteria defined
552         by the RM with guid 'guid', and associates that resource to the RM
553
554         Not all RM types require (or are capable of) performing resource 
555         discovery. For the RM types which are not capable of doing so, 
556         invoking this method does not have any consequences. 
557
558             :param guid: Guid of the RM
559             :type guid: int
560
561         """
562         rm = self.get_resource(guid)
563         return rm.discover()
564
565     def provision(self, guid):
566         """ Provisions the resource associated to the RM with guid 'guid'.
567
568         Provisioning means making a resource 'accessible' to the user. 
569         Not all RM types require (or are capable of) performing resource 
570         provisioning. For the RM types which are not capable of doing so, 
571         invoking this method does not have any consequences. 
572
573             :param guid: Guid of the RM
574             :type guid: int
575
576         """
577         rm = self.get_resource(guid)
578         return rm.provision()
579
580     def get(self, guid, name):
581         """ Returns the value of the attribute with name 'name' on the
582         RM with guid 'guid'
583
584             :param guid: Guid of the RM
585             :type guid: int
586
587             :param name: Name of the attribute 
588             :type name: str
589
590             :return: The value of the attribute with name 'name'
591
592         """
593         rm = self.get_resource(guid)
594         return rm.get(name)
595
596     def set(self, guid, name, value):
597         """ Modifies the value of the attribute with name 'name' on the 
598         RM with guid 'guid'.
599
600             :param guid: Guid of the RM
601             :type guid: int
602
603             :param name: Name of the attribute
604             :type name: str
605
606             :param value: Value of the attribute
607
608         """
609         rm = self.get_resource(guid)
610         return rm.set(name, value)
611
612     def state(self, guid, hr = False):
613         """ Returns the state of a resource
614
615             :param guid: Resource guid
616             :type guid: integer
617
618             :param hr: Human readable. Forces return of a 
619                 status string instead of a number 
620             :type hr: boolean
621
622         """
623         rm = self.get_resource(guid)
624         state = rm.state
625
626         if hr:
627             return ResourceState2str.get(state)
628
629         return state
630
631     def stop(self, guid):
632         """ Stops the RM with guid 'guid'
633
634         Stopping a RM means that the resource it controls will
635         no longer take part of the experiment.
636
637             :param guid: Guid of the RM
638             :type guid: int
639
640         """
641         rm = self.get_resource(guid)
642         return rm.stop()
643
644     def start(self, guid):
645         """ Starts the RM with guid 'guid'
646
647         Starting a RM means that the resource it controls will
648         begin taking part of the experiment.
649
650             :param guid: Guid of the RM
651             :type guid: int
652
653         """
654         rm = self.get_resource(guid)
655         return rm.start()
656
657     def set_with_conditions(self, name, value, guids1, guids2, state,
658             time = None):
659         """ Modifies the value of attribute with name 'name' on all RMs 
660         on the guids1 list when time 'time' has elapsed since all 
661         elements in guids2 list have reached state 'state'.
662
663             :param name: Name of attribute to set in RM
664             :type name: string
665
666             :param value: Value of attribute to set in RM
667             :type name: string
668
669             :param guids1: List of guids of RMs subjected to action
670             :type guids1: list
671
672             :param action: Action to register (either START or STOP)
673             :type action: ResourceAction
674
675             :param guids2: List of guids of RMs to we waited for
676             :type guids2: list
677
678             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
679             :type state: ResourceState
680
681             :param time: Time to wait after guids2 has reached status 
682             :type time: string
683
684         """
685         if isinstance(guids1, int):
686             guids1 = [guids1]
687         if isinstance(guids2, int):
688             guids2 = [guids2]
689
690         for guid1 in guids1:
691             rm = self.get_resource(guid)
692             rm.set_with_conditions(name, value, guids2, state, time)
693
694     def deploy(self, guids = None, wait_all_ready = True, group = None):
695         """ Deploys all ResourceManagers in the guids list. 
696         
697         If the argument 'guids' is not given, all RMs with state NEW
698         are deployed.
699
700             :param guids: List of guids of RMs to deploy
701             :type guids: list
702
703             :param wait_all_ready: Wait until all RMs are ready in
704                 order to start the RMs
705             :type guid: int
706
707             :param group: Id of deployment group in which to deploy RMs
708             :type group: int
709
710         """
711         self.logger.debug(" ------- DEPLOY START ------ ")
712
713         if not guids:
714             # If no guids list was passed, all 'NEW' RMs will be deployed
715             guids = []
716             for guid in self.resources:
717                 if self.state(guid) == ResourceState.NEW:
718                     guids.append(guid)
719                 
720         if isinstance(guids, int):
721             guids = [guids]
722
723         # Create deployment group
724         # New guids can be added to a same deployment group later on
725         new_group = False
726         if not group:
727             new_group = True
728             group = self._group_id_generator.next()
729
730         if group not in self._groups:
731             self._groups[group] = []
732
733         self._groups[group].extend(guids)
734
735         def wait_all_and_start(group):
736             # Function that checks if all resources are READY
737             # before scheduling a start_with_conditions for each RM
738             reschedule = False
739             
740             # Get all guids in group
741             guids = self._groups[group]
742
743             for guid in guids:
744                 if self.state(guid) < ResourceState.READY:
745                     reschedule = True
746                     break
747
748             if reschedule:
749                 callback = functools.partial(wait_all_and_start, group)
750                 self.schedule("1s", callback)
751             else:
752                 # If all resources are ready, we schedule the start
753                 for guid in guids:
754                     rm = self.get_resource(guid)
755                     self.schedule("0s", rm.start_with_conditions)
756
757                     if rm.conditions.get(ResourceAction.STOP):
758                         # Only if the RM has STOP conditions we
759                         # schedule a stop. Otherwise the RM will stop immediately
760                         self.schedule("0s", rm.stop_with_conditions)
761
762         if wait_all_ready and new_group:
763             # Schedule a function to check that all resources are
764             # READY, and only then schedule the start.
765             # This aims at reducing the number of tasks looping in the 
766             # scheduler. 
767             # Instead of having many start tasks, we will have only one for 
768             # the whole group.
769             callback = functools.partial(wait_all_and_start, group)
770             self.schedule("0s", callback)
771
772         for guid in guids:
773             rm = self.get_resource(guid)
774             rm.deployment_group = group
775             self.schedule("0s", rm.deploy_with_conditions)
776
777             if not wait_all_ready:
778                 self.schedule("0s", rm.start_with_conditions)
779
780                 if rm.conditions.get(ResourceAction.STOP):
781                     # Only if the RM has STOP conditions we
782                     # schedule a stop. Otherwise the RM will stop immediately
783                     self.schedule("0s", rm.stop_with_conditions)
784
785     def release(self, guids = None):
786         """ Releases all ResourceManagers in the guids list.
787
788         If the argument 'guids' is not given, all RMs registered
789         in the experiment are released.
790
791             :param guids: List of RM guids
792             :type guids: list
793
794         """
795         if not guids:
796             guids = self.resources
797
798         # Remove all pending tasks from the scheduler queue
799         for tid in list(self._scheduler.pending):
800             self._scheduler.remove(tid)
801
802         self._runner.empty()
803
804         for guid in guids:
805             rm = self.get_resource(guid)
806             self.schedule("0s", rm.release)
807
808         self.wait_released(guids)
809         
810     def shutdown(self):
811         """ Releases all resources and stops the ExperimentController
812
813         """
814         # If there was a major failure we can't exit gracefully
815         if self._state == ECState.FAILED:
816             raise RuntimeError("EC failure. Can not exit gracefully")
817
818         self.release()
819
820         # Mark the EC state as TERMINATED
821         self._state = ECState.TERMINATED
822
823         # Stop processing thread
824         self._stop = True
825
826         # Notify condition to wake up the processing thread
827         self._notify()
828         
829         if self._thread.is_alive():
830            self._thread.join()
831
832     def schedule(self, date, callback, track = False):
833         """ Schedules a callback to be executed at time 'date'.
834
835             :param date: string containing execution time for the task.
836                     It can be expressed as an absolute time, using
837                     timestamp format, or as a relative time matching
838                     ^\d+.\d+(h|m|s|ms|us)$
839
840             :param callback: code to be executed for the task. Must be a
841                         Python function, and receives args and kwargs
842                         as arguments.
843
844             :param track: if set to True, the task will be retrievable with
845                     the get_task() method
846
847             :return : The Id of the task
848             :rtype: int
849             
850         """
851         timestamp = stabsformat(date)
852         task = Task(timestamp, callback)
853         task = self._scheduler.schedule(task)
854
855         if track:
856             self._tasks[task.id] = task
857
858         # Notify condition to wake up the processing thread
859         self._notify()
860
861         return task.id
862      
863     def _process(self):
864         """ Process scheduled tasks.
865
866         .. note::
867         
868         Tasks are scheduled by invoking the schedule method with a target 
869         callback and an execution time. 
870         The schedule method creates a new Task object with that callback 
871         and execution time, and pushes it into the '_scheduler' queue. 
872         The execution time and the order of arrival of tasks are used 
873         to order the tasks in the queue.
874
875         The _process method is executed in an independent thread held by 
876         the ExperimentController for as long as the experiment is running.
877         This method takes tasks from the '_scheduler' queue in a loop 
878         and processes them in parallel using multithreading. 
879         The environmental variable NEPI_NTHREADS can be used to control
880         the number of threads used to process tasks. The default value is 
881         50.
882
883         To execute tasks in parallel, a ParallelRunner (PR) object is used.
884         This object keeps a pool of threads (workers), and a queue of tasks
885         scheduled for 'immediate' execution. 
886         
887         On each iteration, the '_process' loop will take the next task that 
888         is scheduled for 'future' execution from the '_scheduler' queue, 
889         and if the execution time of that task is >= to the current time, 
890         it will push that task into the PR for 'immediate execution'. 
891         As soon as a worker is free, the PR will assign the next task to
892         that worker.
893
894         Upon receiving a task to execute, each PR worker (thread) will 
895         invoke the  _execute method of the EC, passing the task as 
896         argument.         
897         The _execute method will then invoke task.callback inside a 
898         try/except block. If an exception is raised by the tasks.callback, 
899         it will be trapped by the try block, logged to standard error 
900         (usually the console), and the task will be marked as failed.
901
902         """
903
904         self._runner.start()
905
906         while not self._stop:
907             try:
908                 self._cond.acquire()
909
910                 task = self._scheduler.next()
911                 
912                 if not task:
913                     # No task to execute. Wait for a new task to be scheduled.
914                     self._cond.wait()
915                 else:
916                     # The task timestamp is in the future. Wait for timeout 
917                     # or until another task is scheduled.
918                     now = tnow()
919                     if now < task.timestamp:
920                         # Calculate timeout in seconds
921                         timeout = tdiffsec(task.timestamp, now)
922
923                         # Re-schedule task with the same timestamp
924                         self._scheduler.schedule(task)
925                         
926                         task = None
927
928                         # Wait timeout or until a new task awakes the condition
929                         self._cond.wait(timeout)
930                
931                 self._cond.release()
932
933                 if task:
934                     # Process tasks in parallel
935                     self._runner.put(self._execute, task)
936             except: 
937                 import traceback
938                 err = traceback.format_exc()
939                 self.logger.error("Error while processing tasks in the EC: %s" % err)
940
941                 # Set the EC to FAILED state 
942                 self._state = ECState.FAILED
943             
944                 # Set the FailureManager failure level to EC failure
945                 self._fm.set_ec_failure()
946
947         self.logger.debug("Exiting the task processing loop ... ")
948         
949         self._runner.sync()
950         self._runner.destroy()
951
952     def _execute(self, task):
953         """ Executes a single task. 
954
955             :param task: Object containing the callback to execute
956             :type task: Task
957
958         """
959         try:
960             # Invoke callback
961             task.result = task.callback()
962             task.status = TaskStatus.DONE
963         except:
964             import traceback
965             err = traceback.format_exc()
966             task.result = err
967             task.status = TaskStatus.ERROR
968             
969             self.logger.error("Error occurred while executing task: %s" % err)
970
971     def _notify(self):
972         """ Awakes the processing thread if it is blocked waiting 
973         for new tasks to arrive
974         
975         """
976         self._cond.acquire()
977         self._cond.notify()
978         self._cond.release()
979