update OF code and examples
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import random
35 import sys
36 import time
37 import threading
38 import weakref
39
40 class FailureLevel(object):
41     """ Describes the system failure state """
42     OK = 1
43     RM_FAILURE = 2
44     EC_FAILURE = 3
45
46 class FailureManager(object):
47     """ The FailureManager is responsible for handling errors
48     and deciding whether an experiment should be aborted or not
49
50     """
51
52     def __init__(self, ec):
53         self._ec = weakref.ref(ec)
54         self._failure_level = FailureLevel.OK
55
56     @property
57     def ec(self):
58         """ Returns the ExperimentController associated to this FailureManager 
59         
60         """
61         
62         return self._ec()
63
64     @property
65     def abort(self):
66         if self._failure_level == FailureLevel.OK:
67             for guid in self.ec.resources:
68                 state = self.ec.state(guid)
69                 critical = self.ec.get(guid, "critical")
70                 if state == ResourceState.FAILED and critical:
71                     self._failure_level = FailureLevel.RM_FAILURE
72                     self.ec.logger.debug("RM critical failure occurred on guid %d." \
73                             " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
74                     break
75
76         return self._failure_level != FailureLevel.OK
77
78     def set_ec_failure(self):
79         self._failure_level = FailureLevel.EC_FAILURE
80
81
82 class ECState(object):
83     """ Possible states for an ExperimentController
84    
85     """
86     RUNNING = 1
87     FAILED = 2
88     TERMINATED = 3
89
90 class ExperimentController(object):
91     """
92     .. class:: Class Args :
93       
94         :param exp_id: Human readable identifier for the experiment scenario. 
95         :type exp_id: str
96
97     .. note::
98
99     An experiment, or scenario, is defined by a concrete set of resources,
100     behavior, configuration and interconnection of those resources. 
101     The Experiment Description (ED) is a detailed representation of a
102     single experiment. It contains all the necessary information to 
103     allow repeating the experiment. NEPI allows to describe
104     experiments by registering components (resources), configuring them
105     and interconnecting them.
106     
107     A same experiment (scenario) can be executed many times, generating 
108     different results. We call an experiment execution (instance) a 'run'.
109
110     The ExperimentController (EC), is the entity responsible of
111     managing an experiment run. The same scenario can be 
112     recreated (and re-run) by instantiating an EC and recreating 
113     the same experiment description. 
114
115     In NEPI, an experiment is represented as a graph of interconnected
116     resources. A resource is a generic concept in the sense that any
117     component taking part of an experiment, whether physical of
118     virtual, is considered a resource. A resources could be a host, 
119     a virtual machine, an application, a simulator, a IP address.
120
121     A ResourceManager (RM), is the entity responsible for managing a 
122     single resource. ResourceManagers are specific to a resource
123     type (i.e. An RM to control a Linux application will not be
124     the same as the RM used to control a ns-3 simulation).
125     To support a new type of resource in NEPI, a new RM must be 
126     implemented. NEPI already provides a variety of
127     RMs to control basic resources, and new can be extended from
128     the existing ones.
129
130     Through the EC interface the user can create ResourceManagers (RMs),
131     configure them and interconnect them, to describe an experiment.
132     Describing an experiment through the EC does not run the experiment.
133     Only when the 'deploy()' method is invoked on the EC, the EC will take 
134     actions to transform the 'described' experiment into a 'running' experiment.
135
136     While the experiment is running, it is possible to continue to
137     create/configure/connect RMs, and to deploy them to involve new
138     resources in the experiment (this is known as 'interactive' deployment).
139     
140     An experiments in NEPI is identified by a string id, 
141     which is either given by the user, or automatically generated by NEPI.  
142     The purpose of this identifier is to separate files and results that 
143     belong to different experiment scenarios. 
144     However, since a same 'experiment' can be run many times, the experiment
145     id is not enough to identify an experiment instance (run).
146     For this reason, the ExperimentController has two identifier, the 
147     exp_id, which can be re-used in different ExperimentController,
148     and the run_id, which is unique to one ExperimentController instance, and
149     is automatically generated by NEPI.
150         
151     """
152
153     def __init__(self, exp_id = None): 
154         super(ExperimentController, self).__init__()
155
156         # Logging
157         self._logger = logging.getLogger("ExperimentController")
158
159         # Run identifier. It identifies a concrete execution instance (run) 
160         # of an experiment.
161         # Since a same experiment (same configuration) can be executed many 
162         # times, this run_id permits to separate result files generated on 
163         # different experiment executions
164         self._run_id = tsformat()
165
166         # Experiment identifier. Usually assigned by the user
167         # Identifies the experiment scenario (i.e. configuration, 
168         # resources used, etc)
169         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
170
171         # generator of globally unique ids
172         self._guid_generator = guid.GuidGenerator()
173         
174         # Resource managers
175         self._resources = dict()
176
177         # Scheduler. It a queue that holds tasks scheduled for
178         # execution, and yields the next task to be executed 
179         # ordered by execution and arrival time
180         self._scheduler = HeapScheduler()
181
182         # Tasks
183         self._tasks = dict()
184
185         # RM groups (for deployment) 
186         self._groups = dict()
187
188         # generator of globally unique id for groups
189         self._group_id_generator = guid.GuidGenerator()
190
191         # Flag to stop processing thread
192         self._stop = False
193     
194         # Entity in charge of managing system failures
195         self._fm = FailureManager(self)
196
197         # EC state
198         self._state = ECState.RUNNING
199
200         # The runner is a pool of threads used to parallelize 
201         # execution of tasks
202         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
203         self._runner = ParallelRun(maxthreads = nthreads)
204
205         # Event processing thread
206         self._cond = threading.Condition()
207         self._thread = threading.Thread(target = self._process)
208         self._thread.setDaemon(True)
209         self._thread.start()
210
211     @property
212     def logger(self):
213         """ Returns the logger instance of the Experiment Controller
214
215         """
216         return self._logger
217
218     @property
219     def ecstate(self):
220         """ Returns the state of the Experiment Controller
221
222         """
223         return self._state
224
225     @property
226     def exp_id(self):
227         """ Returns the experiment id assigned by the user
228
229         """
230         return self._exp_id
231
232     @property
233     def run_id(self):
234         """ Returns the experiment instance (run) identifier (automatically 
235         generated)
236
237         """
238         return self._run_id
239
240     @property
241     def abort(self):
242         """ Returns True if the experiment has failed and should be interrupted,
243         False otherwise.
244
245         """
246         return self._fm.abort
247
248     def wait_finished(self, guids):
249         """ Blocking method that waits until all RMs in the 'guids' list 
250         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
251         RELEASED ), or until a failure in the experiment occurs 
252         (i.e. abort == True) 
253         
254             :param guids: List of guids
255             :type guids: list
256
257         """
258
259         def quit():
260             return self.abort
261
262         return self.wait(guids, state = ResourceState.STOPPED, 
263                 quit = quit)
264
265     def wait_started(self, guids):
266         """ Blocking method that waits until all RMs in the 'guids' list 
267         have reached a state >= STARTED, or until a failure in the 
268         experiment occurs (i.e. abort == True) 
269         
270             :param guids: List of guids
271             :type guids: list
272
273         """
274
275         def quit():
276             return self.abort
277
278         return self.wait(guids, state = ResourceState.STARTED, 
279                 quit = quit)
280
281     def wait_released(self, guids):
282         """ Blocking method that waits until all RMs in the 'guids' list 
283         have reached a state == RELEASED, or until the EC fails 
284         
285             :param guids: List of guids
286             :type guids: list
287
288         """
289
290         def quit():
291             return self._state == ECState.FAILED
292
293         return self.wait(guids, state = ResourceState.RELEASED, 
294                 quit = quit)
295
296     def wait_deployed(self, guids):
297         """ Blocking method that waits until all RMs in the 'guids' list 
298         have reached a state >= READY, or until a failure in the 
299         experiment occurs (i.e. abort == True) 
300         
301             :param guids: List of guids
302             :type guids: list
303
304         """
305
306         def quit():
307             return self.abort
308
309         return self.wait(guids, state = ResourceState.READY, 
310                 quit = quit)
311
312     def wait(self, guids, state, quit):
313         """ Blocking method that waits until all RMs in the 'guids' list 
314         have reached a state >= 'state', or until the 'quit' callback
315         yields True
316            
317             :param guids: List of guids
318             :type guids: list
319         
320         """
321         
322         if isinstance(guids, int):
323             guids = [guids]
324
325         # Make a copy to avoid modifying the original guids list
326         guids = list(guids)
327
328         while True:
329             # If there are no more guids to wait for
330             # or the quit function returns True, exit the loop
331             if len(guids) == 0 or quit():
332                 break
333
334             # If a guid reached one of the target states, remove it from list
335             guid = guids[0]
336             rstate = self.state(guid)
337             
338             hrrstate = ResourceState2str.get(rstate)
339             hrstate = ResourceState2str.get(state)
340
341             if rstate >= state:
342                 guids.remove(guid)
343                 rm = self.get_resource(guid)
344                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
345                     rm.get_rtype(), guid, hrrstate, hrstate))
346             else:
347                 # Debug...
348                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
349                     guid, hrrstate, hrstate))
350                 time.sleep(0.5)
351   
352     def get_task(self, tid):
353         """ Returns a task by its id
354
355             :param tid: Id of the task
356             :type tid: int
357             
358             :rtype: Task
359             
360         """
361         return self._tasks.get(tid)
362
363     def get_resource(self, guid):
364         """ Returns a registered ResourceManager by its guid
365
366             :param guid: Id of the task
367             :type guid: int
368             
369             :rtype: ResourceManager
370             
371         """
372         return self._resources.get(guid)
373
374     @property
375     def resources(self):
376         """ Returns the set() of guids of all the ResourceManager
377
378             :return: Set of all RM guids
379             :rtype: set
380
381         """
382         return self._resources.keys()
383
384     def register_resource(self, rtype, guid = None):
385         """ Registers a new ResourceManager of type 'rtype' in the experiment
386         
387         This method will assign a new 'guid' for the RM, if no guid
388         is specified.
389
390             :param rtype: Type of the RM
391             :type rtype: str
392
393             :return: Guid of the RM
394             :rtype: int
395             
396         """
397         # Get next available guid
398         guid = self._guid_generator.next(guid)
399         
400         # Instantiate RM
401         rm = ResourceFactory.create(rtype, self, guid)
402
403         # Store RM
404         self._resources[guid] = rm
405
406         return guid
407
408     def get_attributes(self, guid):
409         """ Returns all the attributes of the RM with guid 'guid'
410
411             :param guid: Guid of the RM
412             :type guid: int
413
414             :return: List of attributes
415             :rtype: list
416
417         """
418         rm = self.get_resource(guid)
419         return rm.get_attributes()
420
421     def get_attribute(self, guid, name):
422         """ Returns the attribute 'name' of the RM with guid 'guid'
423
424             :param guid: Guid of the RM
425             :type guid: int
426
427             :param name: Name of the attribute
428             :type name: str
429
430             :return: The attribute with name 'name'
431             :rtype: Attribute
432
433         """
434         rm = self.get_resource(guid)
435         return rm.get_attribute(name)
436
437     def register_connection(self, guid1, guid2):
438         """ Registers a connection between a RM with guid 'guid1'
439         and another RM with guid 'guid2'. 
440     
441         The order of the in which the two guids are provided is not
442         important, since the connection relationship is symmetric.
443
444             :param guid1: First guid to connect
445             :type guid1: ResourceManager
446
447             :param guid2: Second guid to connect
448             :type guid: ResourceManager
449
450         """
451         rm1 = self.get_resource(guid1)
452         rm2 = self.get_resource(guid2)
453
454         rm1.register_connection(guid2)
455         rm2.register_connection(guid1)
456
457     def register_condition(self, guids1, action, guids2, state,
458             time = None):
459         """ Registers an action START, STOP or DEPLOY for all RM on list
460         guids1 to occur at time 'time' after all elements in list guids2 
461         have reached state 'state'.
462
463             :param guids1: List of guids of RMs subjected to action
464             :type guids1: list
465
466             :param action: Action to perform (either START, STOP or DEPLOY)
467             :type action: ResourceAction
468
469             :param guids2: List of guids of RMs to we waited for
470             :type guids2: list
471
472             :param state: State to wait for on RMs of list guids2 (STARTED,
473                 STOPPED, etc)
474             :type state: ResourceState
475
476             :param time: Time to wait after guids2 has reached status 
477             :type time: string
478
479         """
480         if isinstance(guids1, int):
481             guids1 = [guids1]
482         if isinstance(guids2, int):
483             guids2 = [guids2]
484
485         for guid1 in guids1:
486             rm = self.get_resource(guid1)
487             rm.register_condition(action, guids2, state, time)
488
489     def enable_trace(self, guid, name):
490         """ Enables a trace to be collected during the experiment run
491
492             :param name: Name of the trace
493             :type name: str
494
495         """
496         rm = self.get_resource(guid)
497         rm.enable_trace(name)
498
499     def trace_enabled(self, guid, name):
500         """ Returns True if the trace of name 'name' is enabled
501
502             :param name: Name of the trace
503             :type name: str
504
505         """
506         rm = self.get_resource(guid)
507         return rm.trace_enabled(name)
508
509     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
510         """ Returns information on a collected trace, the trace stream or 
511         blocks (chunks) of the trace stream
512
513             :param name: Name of the trace
514             :type name: str
515
516             :param attr: Can be one of:
517                          - TraceAttr.ALL (complete trace content), 
518                          - TraceAttr.STREAM (block in bytes to read starting 
519                                 at offset),
520                          - TraceAttr.PATH (full path to the trace file),
521                          - TraceAttr.SIZE (size of trace file). 
522             :type attr: str
523
524             :param block: Number of bytes to retrieve from trace, when attr is 
525                 TraceAttr.STREAM 
526             :type name: int
527
528             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
529             :type name: int
530
531             :rtype: str
532
533         """
534         rm = self.get_resource(guid)
535         return rm.trace(name, attr, block, offset)
536
537     def get_traces(self, guid):
538         """ Returns the list of the trace names of the RM with guid 'guid'
539
540             :param guid: Guid of the RM
541             :type guid: int
542
543             :return: List of trace names
544             :rtype: list
545
546         """
547         rm = self.get_resource(guid)
548         return rm.get_traces()
549
550
551     def discover(self, guid):
552         """ Discovers an available resource matching the criteria defined
553         by the RM with guid 'guid', and associates that resource to the RM
554
555         Not all RM types require (or are capable of) performing resource 
556         discovery. For the RM types which are not capable of doing so, 
557         invoking this method does not have any consequences. 
558
559             :param guid: Guid of the RM
560             :type guid: int
561
562         """
563         rm = self.get_resource(guid)
564         return rm.discover()
565
566     def provision(self, guid):
567         """ Provisions the resource associated to the RM with guid 'guid'.
568
569         Provisioning means making a resource 'accessible' to the user. 
570         Not all RM types require (or are capable of) performing resource 
571         provisioning. For the RM types which are not capable of doing so, 
572         invoking this method does not have any consequences. 
573
574             :param guid: Guid of the RM
575             :type guid: int
576
577         """
578         rm = self.get_resource(guid)
579         return rm.provision()
580
581     def get(self, guid, name):
582         """ Returns the value of the attribute with name 'name' on the
583         RM with guid 'guid'
584
585             :param guid: Guid of the RM
586             :type guid: int
587
588             :param name: Name of the attribute 
589             :type name: str
590
591             :return: The value of the attribute with name 'name'
592
593         """
594         rm = self.get_resource(guid)
595         return rm.get(name)
596
597     def set(self, guid, name, value):
598         """ Modifies the value of the attribute with name 'name' on the 
599         RM with guid 'guid'.
600
601             :param guid: Guid of the RM
602             :type guid: int
603
604             :param name: Name of the attribute
605             :type name: str
606
607             :param value: Value of the attribute
608
609         """
610         rm = self.get_resource(guid)
611         return rm.set(name, value)
612
613     def state(self, guid, hr = False):
614         """ Returns the state of a resource
615
616             :param guid: Resource guid
617             :type guid: integer
618
619             :param hr: Human readable. Forces return of a 
620                 status string instead of a number 
621             :type hr: boolean
622
623         """
624         rm = self.get_resource(guid)
625         state = rm.state
626
627         if hr:
628             return ResourceState2str.get(state)
629
630         return state
631
632     def stop(self, guid):
633         """ Stops the RM with guid 'guid'
634
635         Stopping a RM means that the resource it controls will
636         no longer take part of the experiment.
637
638             :param guid: Guid of the RM
639             :type guid: int
640
641         """
642         rm = self.get_resource(guid)
643         return rm.stop()
644
645     def start(self, guid):
646         """ Starts the RM with guid 'guid'
647
648         Starting a RM means that the resource it controls will
649         begin taking part of the experiment.
650
651             :param guid: Guid of the RM
652             :type guid: int
653
654         """
655         rm = self.get_resource(guid)
656         return rm.start()
657
658     def set_with_conditions(self, name, value, guids1, guids2, state,
659             time = None):
660         """ Modifies the value of attribute with name 'name' on all RMs 
661         on the guids1 list when time 'time' has elapsed since all 
662         elements in guids2 list have reached state 'state'.
663
664             :param name: Name of attribute to set in RM
665             :type name: string
666
667             :param value: Value of attribute to set in RM
668             :type name: string
669
670             :param guids1: List of guids of RMs subjected to action
671             :type guids1: list
672
673             :param action: Action to register (either START or STOP)
674             :type action: ResourceAction
675
676             :param guids2: List of guids of RMs to we waited for
677             :type guids2: list
678
679             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
680             :type state: ResourceState
681
682             :param time: Time to wait after guids2 has reached status 
683             :type time: string
684
685         """
686         if isinstance(guids1, int):
687             guids1 = [guids1]
688         if isinstance(guids2, int):
689             guids2 = [guids2]
690
691         for guid1 in guids1:
692             rm = self.get_resource(guid)
693             rm.set_with_conditions(name, value, guids2, state, time)
694
695     def deploy(self, guids = None, wait_all_ready = True, group = None):
696         """ Deploys all ResourceManagers in the guids list. 
697         
698         If the argument 'guids' is not given, all RMs with state NEW
699         are deployed.
700
701             :param guids: List of guids of RMs to deploy
702             :type guids: list
703
704             :param wait_all_ready: Wait until all RMs are ready in
705                 order to start the RMs
706             :type guid: int
707
708             :param group: Id of deployment group in which to deploy RMs
709             :type group: int
710
711         """
712         self.logger.debug(" ------- DEPLOY START ------ ")
713
714         if not guids:
715             # If no guids list was passed, all 'NEW' RMs will be deployed
716             guids = []
717             for guid in self.resources:
718                 if self.state(guid) == ResourceState.NEW:
719                     guids.append(guid)
720                 
721         if isinstance(guids, int):
722             guids = [guids]
723
724         # Create deployment group
725         # New guids can be added to a same deployment group later on
726         new_group = False
727         if not group:
728             new_group = True
729             group = self._group_id_generator.next()
730
731         if group not in self._groups:
732             self._groups[group] = []
733
734         self._groups[group].extend(guids)
735
736         def wait_all_and_start(group):
737             # Function that checks if all resources are READY
738             # before scheduling a start_with_conditions for each RM
739             reschedule = False
740             
741             # Get all guids in group
742             guids = self._groups[group]
743
744             for guid in guids:
745                 if self.state(guid) < ResourceState.READY:
746                     reschedule = True
747                     break
748
749             if reschedule:
750
751                 callback = functools.partial(wait_all_and_start, group)
752                 self.schedule("1s", callback)
753             else:
754                 # If all resources are ready, we schedule the start
755                 for guid in guids:
756                     rm = self.get_resource(guid)
757                     self.schedule("0s", rm.start_with_conditions)
758
759         if wait_all_ready and new_group:
760             # Schedule a function to check that all resources are
761             # READY, and only then schedule the start.
762             # This aims at reducing the number of tasks looping in the 
763             # scheduler. 
764             # Instead of having many start tasks, we will have only one for 
765             # the whole group.
766             callback = functools.partial(wait_all_and_start, group)
767             self.schedule("0s", callback)
768
769         for guid in guids:
770             rm = self.get_resource(guid)
771             rm.deployment_group = group
772             self.schedule("0s", rm.deploy_with_conditions)
773
774             if not wait_all_ready:
775                 self.schedule("0s", rm.start_with_conditions)
776
777             if rm.conditions.get(ResourceAction.STOP):
778                 # Only if the RM has STOP conditions we
779                 # schedule a stop. Otherwise the RM will stop immediately
780                 self.schedule("0s", rm.stop_with_conditions)
781
782     def release(self, guids = None):
783         """ Releases all ResourceManagers in the guids list.
784
785         If the argument 'guids' is not given, all RMs registered
786         in the experiment are released.
787
788             :param guids: List of RM guids
789             :type guids: list
790
791         """
792         if not guids:
793             guids = self.resources
794
795         # Remove all pending tasks from the scheduler queue
796         for tid in list(self._scheduler.pending):
797             self._scheduler.remove(tid)
798
799         self._runner.empty()
800
801         for guid in guids:
802             rm = self.get_resource(guid)
803             self.schedule("0s", rm.release)
804
805         self.wait_released(guids)
806         
807     def shutdown(self):
808         """ Releases all resources and stops the ExperimentController
809
810         """
811         # If there was a major failure we can't exit gracefully
812         if self._state == ECState.FAILED:
813             raise RuntimeError("EC failure. Can not exit gracefully")
814
815         self.release()
816
817         # Mark the EC state as TERMINATED
818         self._state = ECState.TERMINATED
819
820         # Stop processing thread
821         self._stop = True
822
823         # Notify condition to wake up the processing thread
824         self._notify()
825         
826         if self._thread.is_alive():
827            self._thread.join()
828
829     def schedule(self, date, callback, track = False):
830         """ Schedules a callback to be executed at time 'date'.
831
832             :param date: string containing execution time for the task.
833                     It can be expressed as an absolute time, using
834                     timestamp format, or as a relative time matching
835                     ^\d+.\d+(h|m|s|ms|us)$
836
837             :param callback: code to be executed for the task. Must be a
838                         Python function, and receives args and kwargs
839                         as arguments.
840
841             :param track: if set to True, the task will be retrievable with
842                     the get_task() method
843
844             :return : The Id of the task
845             :rtype: int
846             
847         """
848         timestamp = stabsformat(date)
849         task = Task(timestamp, callback)
850         task = self._scheduler.schedule(task)
851
852         if track:
853             self._tasks[task.id] = task
854
855         # Notify condition to wake up the processing thread
856         self._notify()
857
858         return task.id
859      
860     def _process(self):
861         """ Process scheduled tasks.
862
863         .. note::
864         
865         Tasks are scheduled by invoking the schedule method with a target 
866         callback and an execution time. 
867         The schedule method creates a new Task object with that callback 
868         and execution time, and pushes it into the '_scheduler' queue. 
869         The execution time and the order of arrival of tasks are used 
870         to order the tasks in the queue.
871
872         The _process method is executed in an independent thread held by 
873         the ExperimentController for as long as the experiment is running.
874         This method takes tasks from the '_scheduler' queue in a loop 
875         and processes them in parallel using multithreading. 
876         The environmental variable NEPI_NTHREADS can be used to control
877         the number of threads used to process tasks. The default value is 
878         50.
879
880         To execute tasks in parallel, a ParallelRunner (PR) object is used.
881         This object keeps a pool of threads (workers), and a queue of tasks
882         scheduled for 'immediate' execution. 
883         
884         On each iteration, the '_process' loop will take the next task that 
885         is scheduled for 'future' execution from the '_scheduler' queue, 
886         and if the execution time of that task is >= to the current time, 
887         it will push that task into the PR for 'immediate execution'. 
888         As soon as a worker is free, the PR will assign the next task to
889         that worker.
890
891         Upon receiving a task to execute, each PR worker (thread) will 
892         invoke the  _execute method of the EC, passing the task as 
893         argument.         
894         The _execute method will then invoke task.callback inside a 
895         try/except block. If an exception is raised by the tasks.callback, 
896         it will be trapped by the try block, logged to standard error 
897         (usually the console), and the task will be marked as failed.
898
899         """
900
901         self._runner.start()
902
903         while not self._stop:
904             try:
905                 self._cond.acquire()
906
907                 task = self._scheduler.next()
908                 
909                 if not task:
910                     # No task to execute. Wait for a new task to be scheduled.
911                     self._cond.wait()
912                 else:
913                     # The task timestamp is in the future. Wait for timeout 
914                     # or until another task is scheduled.
915                     now = tnow()
916                     if now < task.timestamp:
917                         # Calculate timeout in seconds
918                         timeout = tdiffsec(task.timestamp, now)
919
920                         # Re-schedule task with the same timestamp
921                         self._scheduler.schedule(task)
922                         
923                         task = None
924
925                         # Wait timeout or until a new task awakes the condition
926                         self._cond.wait(timeout)
927                
928                 self._cond.release()
929
930                 if task:
931                     # Process tasks in parallel
932                     self._runner.put(self._execute, task)
933             except: 
934                 import traceback
935                 err = traceback.format_exc()
936                 self.logger.error("Error while processing tasks in the EC: %s" % err)
937
938                 # Set the EC to FAILED state 
939                 self._state = ECState.FAILED
940             
941                 # Set the FailureManager failure level to EC failure
942                 self._fm.set_ec_failure()
943
944         self.logger.debug("Exiting the task processing loop ... ")
945         
946         self._runner.sync()
947         self._runner.destroy()
948
949     def _execute(self, task):
950         """ Executes a single task. 
951
952             :param task: Object containing the callback to execute
953             :type task: Task
954
955         """
956         # Invoke callback
957         task.status = TaskStatus.DONE
958
959         try:
960             task.result = task.callback()
961         except:
962             import traceback
963             err = traceback.format_exc()
964             task.result = err
965             task.status = TaskStatus.ERROR
966             
967             self.logger.error("Error occurred while executing task: %s" % err)
968
969     def _notify(self):
970         """ Awakes the processing thread if it is blocked waiting 
971         for new tasks to arrive
972         
973         """
974         self._cond.acquire()
975         self._cond.notify()
976         self._cond.release()
977