2c6557ffd99c709e43f2c32ac5ce1e61a7e09baa
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29
30 # TODO: use multiprocessing instead of threading
31 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
32
33 import functools
34 import logging
35 import os
36 import sys
37 import time
38 import threading
39 import weakref
40
41 class FailureLevel(object):
42     """ Describes the system failure state """
43     OK = 1
44     RM_FAILURE = 2
45     EC_FAILURE = 3
46
47 class FailureManager(object):
48     """ The FailureManager is responsible for handling errors
49     and deciding whether an experiment should be aborted or not
50
51     """
52
53     def __init__(self, ec):
54         self._ec = weakref.ref(ec)
55         self._failure_level = FailureLevel.OK
56         self._abort = False
57
58     @property
59     def ec(self):
60         """ Returns the ExperimentController associated to this FailureManager 
61         
62         """
63         
64         return self._ec()
65
66     @property
67     def abort(self):
68         return self._abort
69
70     def eval_failure(self, guid):
71         if self._failure_level == FailureLevel.OK:
72             rm = self.ec.get_resource(guid)
73             state = rm.state
74             critical = rm.get("critical")
75
76             if state == ResourceState.FAILED and critical:
77                 self._failure_level = FailureLevel.RM_FAILURE
78                 self._abort = True
79                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
80                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
81
82     def set_ec_failure(self):
83         self._failure_level = FailureLevel.EC_FAILURE
84
85 class ECState(object):
86     """ Possible states for an ExperimentController
87    
88     """
89     RUNNING = 1
90     FAILED = 2
91     TERMINATED = 3
92
93 class ExperimentController(object):
94     """
95     .. class:: Class Args :
96       
97         :param exp_id: Human readable identifier for the experiment scenario. 
98         :type exp_id: str
99
100     .. note::
101
102     An experiment, or scenario, is defined by a concrete set of resources,
103     behavior, configuration and interconnection of those resources. 
104     The Experiment Description (ED) is a detailed representation of a
105     single experiment. It contains all the necessary information to 
106     allow repeating the experiment. NEPI allows to describe
107     experiments by registering components (resources), configuring them
108     and interconnecting them.
109     
110     A same experiment (scenario) can be executed many times, generating 
111     different results. We call an experiment execution (instance) a 'run'.
112
113     The ExperimentController (EC), is the entity responsible of
114     managing an experiment run. The same scenario can be 
115     recreated (and re-run) by instantiating an EC and recreating 
116     the same experiment description. 
117
118     In NEPI, an experiment is represented as a graph of interconnected
119     resources. A resource is a generic concept in the sense that any
120     component taking part of an experiment, whether physical of
121     virtual, is considered a resource. A resources could be a host, 
122     a virtual machine, an application, a simulator, a IP address.
123
124     A ResourceManager (RM), is the entity responsible for managing a 
125     single resource. ResourceManagers are specific to a resource
126     type (i.e. An RM to control a Linux application will not be
127     the same as the RM used to control a ns-3 simulation).
128     To support a new type of resource in NEPI, a new RM must be 
129     implemented. NEPI already provides a variety of
130     RMs to control basic resources, and new can be extended from
131     the existing ones.
132
133     Through the EC interface the user can create ResourceManagers (RMs),
134     configure them and interconnect them, to describe an experiment.
135     Describing an experiment through the EC does not run the experiment.
136     Only when the 'deploy()' method is invoked on the EC, the EC will take 
137     actions to transform the 'described' experiment into a 'running' experiment.
138
139     While the experiment is running, it is possible to continue to
140     create/configure/connect RMs, and to deploy them to involve new
141     resources in the experiment (this is known as 'interactive' deployment).
142     
143     An experiments in NEPI is identified by a string id, 
144     which is either given by the user, or automatically generated by NEPI.  
145     The purpose of this identifier is to separate files and results that 
146     belong to different experiment scenarios. 
147     However, since a same 'experiment' can be run many times, the experiment
148     id is not enough to identify an experiment instance (run).
149     For this reason, the ExperimentController has two identifier, the 
150     exp_id, which can be re-used in different ExperimentController,
151     and the run_id, which is unique to one ExperimentController instance, and
152     is automatically generated by NEPI.
153         
154     """
155
156     @classmethod
157     def load(cls, filepath, format = SFormats.XML):
158         serializer = ECSerializer()
159         ec = serializer.load(filepath)
160         return ec
161
162     def __init__(self, exp_id = None): 
163         super(ExperimentController, self).__init__()
164
165         # Logging
166         self._logger = logging.getLogger("ExperimentController")
167
168         # Run identifier. It identifies a concrete execution instance (run) 
169         # of an experiment.
170         # Since a same experiment (same configuration) can be executed many 
171         # times, this run_id permits to separate result files generated on 
172         # different experiment executions
173         self._run_id = tsformat()
174
175         # Experiment identifier. Usually assigned by the user
176         # Identifies the experiment scenario (i.e. configuration, 
177         # resources used, etc)
178         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
179
180         # generator of globally unique ids
181         self._guid_generator = guid.GuidGenerator()
182         
183         # Resource managers
184         self._resources = dict()
185
186         # Scheduler. It a queue that holds tasks scheduled for
187         # execution, and yields the next task to be executed 
188         # ordered by execution and arrival time
189         self._scheduler = HeapScheduler()
190
191         # Tasks
192         self._tasks = dict()
193
194         # RM groups (for deployment) 
195         self._groups = dict()
196
197         # generator of globally unique id for groups
198         self._group_id_generator = guid.GuidGenerator()
199
200         # Flag to stop processing thread
201         self._stop = False
202     
203         # Entity in charge of managing system failures
204         self._fm = FailureManager(self)
205
206         # EC state
207         self._state = ECState.RUNNING
208
209         # The runner is a pool of threads used to parallelize 
210         # execution of tasks
211         self._nthreads = 20
212         self._runner = None
213
214         # Event processing thread
215         self._cond = threading.Condition()
216         self._thread = threading.Thread(target = self._process)
217         self._thread.setDaemon(True)
218         self._thread.start()
219         
220     @property
221     def logger(self):
222         """ Returns the logger instance of the Experiment Controller
223
224         """
225         return self._logger
226
227     @property
228     def failure_level(self):
229         """ Returns the level of FAILURE of th experiment
230
231         """
232
233         return self._fm._failure_level
234
235     @property
236     def ecstate(self):
237         """ Returns the state of the Experiment Controller
238
239         """
240         return self._state
241
242     @property
243     def exp_id(self):
244         """ Returns the experiment id assigned by the user
245
246         """
247         return self._exp_id
248
249     @property
250     def run_id(self):
251         """ Returns the experiment instance (run) identifier (automatically 
252         generated)
253
254         """
255         return self._run_id
256
257     @property
258     def nthreads(self):
259         """ Returns the number of processing nthreads used
260
261         """
262         return self._nthreads
263
264  
265     @property
266     def abort(self):
267         """ Returns True if the experiment has failed and should be interrupted,
268         False otherwise.
269
270         """
271         return self._fm.abort
272
273     def inform_failure(self, guid):
274         """ Reports a failure in a RM to the EC for evaluation
275
276             :param guid: Resource id
277             :type guid: int
278
279         """
280
281         return self._fm.eval_failure(guid)
282
283     def wait_finished(self, guids):
284         """ Blocking method that waits until all RMs in the 'guids' list 
285         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
286         RELEASED ), or until a failure in the experiment occurs 
287         (i.e. abort == True) 
288         
289             :param guids: List of guids
290             :type guids: list
291
292         """
293
294         def quit():
295             return self.abort
296
297         return self.wait(guids, state = ResourceState.STOPPED, 
298                 quit = quit)
299
300     def wait_started(self, guids):
301         """ Blocking method that waits until all RMs in the 'guids' list 
302         have reached a state >= STARTED, or until a failure in the 
303         experiment occurs (i.e. abort == True) 
304         
305             :param guids: List of guids
306             :type guids: list
307
308         """
309
310         def quit():
311             return self.abort
312
313         return self.wait(guids, state = ResourceState.STARTED, 
314                 quit = quit)
315
316     def wait_released(self, guids):
317         """ Blocking method that waits until all RMs in the 'guids' list 
318         have reached a state == RELEASED, or until the EC fails 
319         
320             :param guids: List of guids
321             :type guids: list
322
323         """
324
325         def quit():
326             return self._state == ECState.FAILED
327
328         return self.wait(guids, state = ResourceState.RELEASED, 
329                 quit = quit)
330
331     def wait_deployed(self, guids):
332         """ Blocking method that waits until all RMs in the 'guids' list 
333         have reached a state >= READY, or until a failure in the 
334         experiment occurs (i.e. abort == True) 
335         
336             :param guids: List of guids
337             :type guids: list
338
339         """
340
341         def quit():
342             return self.abort
343
344         return self.wait(guids, state = ResourceState.READY, 
345                 quit = quit)
346
347     def wait(self, guids, state, quit):
348         """ Blocking method that waits until all RMs in the 'guids' list 
349         have reached a state >= 'state', or until the 'quit' callback
350         yields True
351            
352             :param guids: List of guids
353             :type guids: list
354         
355         """
356         if isinstance(guids, int):
357             guids = [guids]
358
359         # Make a copy to avoid modifying the original guids list
360         guids = list(guids)
361
362         while True:
363             # If there are no more guids to wait for
364             # or the quit function returns True, exit the loop
365             if len(guids) == 0 or quit():
366                 break
367
368             # If a guid reached one of the target states, remove it from list
369             guid = guids.pop()
370             rm = self.get_resource(guid)
371             rstate = rm.state
372             
373             if rstate >= state:
374                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
375                     rm.get_rtype(), guid, rstate, state))
376             else:
377                 # Debug...
378                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
379                     guid, rstate, state))
380
381                 guids.append(guid)
382
383                 time.sleep(0.5)
384
385     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
386         plotter = ECPlotter()
387         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
388                 show = show)
389         return fpath
390
391     def serialize(self, format = SFormats.XML):
392         serializer = ECSerializer()
393         sec = serializer.load(self, format = format)
394         return sec
395
396     def save(self, dirpath = None, format = SFormats.XML):
397         serializer = ECSerializer()
398         path = serializer.save(self, dirpath  = None, format = format)
399         return path
400
401     def get_task(self, tid):
402         """ Returns a task by its id
403
404             :param tid: Id of the task
405             :type tid: int
406             
407             :rtype: Task
408             
409         """
410         return self._tasks.get(tid)
411
412     def get_resource(self, guid):
413         """ Returns a registered ResourceManager by its guid
414
415             :param guid: Id of the resource
416             :type guid: int
417             
418             :rtype: ResourceManager
419             
420         """
421         rm = self._resources.get(guid)
422         return rm
423
424     def get_resources_by_type(self, rtype):
425         """ Returns a registered ResourceManager by its guid
426
427             :param rtype: Resource type
428             :type rtype: string
429             
430             :rtype: list of ResourceManagers
431             
432         """
433         rms = []
434         for guid, rm in self._resources.iteritems():
435             if rm.get_rtype() == type: 
436                 rms.append(rm)
437         return rms
438
439     def remove_resource(self, guid):
440         del self._resources[guid]
441
442     @property
443     def resources(self):
444         """ Returns the set() of guids of all the ResourceManager
445
446             :return: Set of all RM guids
447             :rtype: set
448
449         """
450         keys = self._resources.keys()
451
452         return keys
453
454     def register_resource(self, rtype, guid = None):
455         """ Registers a new ResourceManager of type 'rtype' in the experiment
456         
457         This method will assign a new 'guid' for the RM, if no guid
458         is specified.
459
460             :param rtype: Type of the RM
461             :type rtype: str
462
463             :return: Guid of the RM
464             :rtype: int
465             
466         """
467         # Get next available guid
468         guid = self._guid_generator.next(guid)
469         
470         # Instantiate RM
471         rm = ResourceFactory.create(rtype, self, guid)
472
473         # Store RM
474         self._resources[guid] = rm
475
476         return guid
477
478     def get_attributes(self, guid):
479         """ Returns all the attributes of the RM with guid 'guid'
480
481             :param guid: Guid of the RM
482             :type guid: int
483
484             :return: List of attributes
485             :rtype: list
486
487         """
488         rm = self.get_resource(guid)
489         return rm.get_attributes()
490
491     def get_attribute(self, guid, name):
492         """ Returns the attribute 'name' of the RM with guid 'guid'
493
494             :param guid: Guid of the RM
495             :type guid: int
496
497             :param name: Name of the attribute
498             :type name: str
499
500             :return: The attribute with name 'name'
501             :rtype: Attribute
502
503         """
504         rm = self.get_resource(guid)
505         return rm.get_attribute(name)
506
507     def register_connection(self, guid1, guid2):
508         """ Registers a connection between a RM with guid 'guid1'
509         and another RM with guid 'guid2'. 
510     
511         The order of the in which the two guids are provided is not
512         important, since the connection relationship is symmetric.
513
514             :param guid1: First guid to connect
515             :type guid1: ResourceManager
516
517             :param guid2: Second guid to connect
518             :type guid: ResourceManager
519
520         """
521         rm1 = self.get_resource(guid1)
522         rm2 = self.get_resource(guid2)
523
524         rm1.register_connection(guid2)
525         rm2.register_connection(guid1)
526
527     def register_condition(self, guids1, action, guids2, state,
528             time = None):
529         """ Registers an action START, STOP or DEPLOY for all RM on list
530         guids1 to occur at time 'time' after all elements in list guids2 
531         have reached state 'state'.
532
533             :param guids1: List of guids of RMs subjected to action
534             :type guids1: list
535
536             :param action: Action to perform (either START, STOP or DEPLOY)
537             :type action: ResourceAction
538
539             :param guids2: List of guids of RMs to we waited for
540             :type guids2: list
541
542             :param state: State to wait for on RMs of list guids2 (STARTED,
543                 STOPPED, etc)
544             :type state: ResourceState
545
546             :param time: Time to wait after guids2 has reached status 
547             :type time: string
548
549         """
550         if isinstance(guids1, int):
551             guids1 = [guids1]
552         if isinstance(guids2, int):
553             guids2 = [guids2]
554
555         for guid1 in guids1:
556             rm = self.get_resource(guid1)
557             rm.register_condition(action, guids2, state, time)
558
559     def enable_trace(self, guid, name):
560         """ Enables a trace to be collected during the experiment run
561
562             :param name: Name of the trace
563             :type name: str
564
565         """
566         rm = self.get_resource(guid)
567         rm.enable_trace(name)
568
569     def trace_enabled(self, guid, name):
570         """ Returns True if the trace of name 'name' is enabled
571
572             :param name: Name of the trace
573             :type name: str
574
575         """
576         rm = self.get_resource(guid)
577         return rm.trace_enabled(name)
578
579     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
580         """ Returns information on a collected trace, the trace stream or 
581         blocks (chunks) of the trace stream
582
583             :param name: Name of the trace
584             :type name: str
585
586             :param attr: Can be one of:
587                          - TraceAttr.ALL (complete trace content), 
588                          - TraceAttr.STREAM (block in bytes to read starting 
589                                 at offset),
590                          - TraceAttr.PATH (full path to the trace file),
591                          - TraceAttr.SIZE (size of trace file). 
592             :type attr: str
593
594             :param block: Number of bytes to retrieve from trace, when attr is 
595                 TraceAttr.STREAM 
596             :type name: int
597
598             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
599             :type name: int
600
601             :rtype: str
602
603         """
604         rm = self.get_resource(guid)
605         return rm.trace(name, attr, block, offset)
606
607     def get_traces(self, guid):
608         """ Returns the list of the trace names of the RM with guid 'guid'
609
610             :param guid: Guid of the RM
611             :type guid: int
612
613             :return: List of trace names
614             :rtype: list
615
616         """
617         rm = self.get_resource(guid)
618         return rm.get_traces()
619
620
621     def discover(self, guid):
622         """ Discovers an available resource matching the criteria defined
623         by the RM with guid 'guid', and associates that resource to the RM
624
625         Not all RM types require (or are capable of) performing resource 
626         discovery. For the RM types which are not capable of doing so, 
627         invoking this method does not have any consequences. 
628
629             :param guid: Guid of the RM
630             :type guid: int
631
632         """
633         rm = self.get_resource(guid)
634         return rm.discover()
635
636     def provision(self, guid):
637         """ Provisions the resource associated to the RM with guid 'guid'.
638
639         Provisioning means making a resource 'accessible' to the user. 
640         Not all RM types require (or are capable of) performing resource 
641         provisioning. For the RM types which are not capable of doing so, 
642         invoking this method does not have any consequences. 
643
644             :param guid: Guid of the RM
645             :type guid: int
646
647         """
648         rm = self.get_resource(guid)
649         return rm.provision()
650
651     def get(self, guid, name):
652         """ Returns the value of the attribute with name 'name' on the
653         RM with guid 'guid'
654
655             :param guid: Guid of the RM
656             :type guid: int
657
658             :param name: Name of the attribute 
659             :type name: str
660
661             :return: The value of the attribute with name 'name'
662
663         """
664         rm = self.get_resource(guid)
665         return rm.get(name)
666
667     def set(self, guid, name, value):
668         """ Modifies the value of the attribute with name 'name' on the 
669         RM with guid 'guid'.
670
671             :param guid: Guid of the RM
672             :type guid: int
673
674             :param name: Name of the attribute
675             :type name: str
676
677             :param value: Value of the attribute
678
679         """
680         rm = self.get_resource(guid)
681         rm.set(name, value)
682
683     def get_global(self, rtype, name):
684         """ Returns the value of the global attribute with name 'name' on the
685         RMs of rtype 'rtype'.
686
687             :param guid: Guid of the RM
688             :type guid: int
689
690             :param name: Name of the attribute 
691             :type name: str
692
693             :return: The value of the attribute with name 'name'
694
695         """
696         rclass = ResourceFactory.get_resource_type(rtype)
697         return rclass.get_global(name)
698
699     def set_global(self, rtype, name, value):
700         """ Modifies the value of the global attribute with name 'name' on the 
701         RMs of with rtype 'rtype'.
702
703             :param guid: Guid of the RM
704             :type guid: int
705
706             :param name: Name of the attribute
707             :type name: str
708
709             :param value: Value of the attribute
710
711         """
712         rclass = ResourceFactory.get_resource_type(rtype)
713         return rclass.set_global(name, value)
714
715     def state(self, guid, hr = False):
716         """ Returns the state of a resource
717
718             :param guid: Resource guid
719             :type guid: integer
720
721             :param hr: Human readable. Forces return of a 
722                 status string instead of a number 
723             :type hr: boolean
724
725         """
726         rm = self.get_resource(guid)
727         state = rm.state
728
729         if hr:
730             return ResourceState2str.get(state)
731
732         return state
733
734     def stop(self, guid):
735         """ Stops the RM with guid 'guid'
736
737         Stopping a RM means that the resource it controls will
738         no longer take part of the experiment.
739
740             :param guid: Guid of the RM
741             :type guid: int
742
743         """
744         rm = self.get_resource(guid)
745         return rm.stop()
746
747     def start(self, guid):
748         """ Starts the RM with guid 'guid'
749
750         Starting a RM means that the resource it controls will
751         begin taking part of the experiment.
752
753             :param guid: Guid of the RM
754             :type guid: int
755
756         """
757         rm = self.get_resource(guid)
758         return rm.start()
759
760     def get_start_time(self, guid):
761         """ Returns the start time of the RM as a timestamp """
762         rm = self.get_resource(guid)
763         return rm.start_time
764
765     def get_stop_time(self, guid):
766         """ Returns the stop time of the RM as a timestamp """
767         rm = self.get_resource(guid)
768         return rm.stop_time
769
770     def get_discover_time(self, guid):
771         """ Returns the discover time of the RM as a timestamp """
772         rm = self.get_resource(guid)
773         return rm.discover_time
774
775     def get_provision_time(self, guid):
776         """ Returns the provision time of the RM as a timestamp """
777         rm = self.get_resource(guid)
778         return rm.provision_time
779
780     def get_ready_time(self, guid):
781         """ Returns the deployment time of the RM as a timestamp """
782         rm = self.get_resource(guid)
783         return rm.ready_time
784
785     def get_release_time(self, guid):
786         """ Returns the release time of the RM as a timestamp """
787         rm = self.get_resource(guid)
788         return rm.release_time
789
790     def get_failed_time(self, guid):
791         """ Returns the time failure occured for the RM as a timestamp """
792         rm = self.get_resource(guid)
793         return rm.failed_time
794
795     def set_with_conditions(self, name, value, guids1, guids2, state,
796             time = None):
797         """ Modifies the value of attribute with name 'name' on all RMs 
798         on the guids1 list when time 'time' has elapsed since all 
799         elements in guids2 list have reached state 'state'.
800
801             :param name: Name of attribute to set in RM
802             :type name: string
803
804             :param value: Value of attribute to set in RM
805             :type name: string
806
807             :param guids1: List of guids of RMs subjected to action
808             :type guids1: list
809
810             :param action: Action to register (either START or STOP)
811             :type action: ResourceAction
812
813             :param guids2: List of guids of RMs to we waited for
814             :type guids2: list
815
816             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
817             :type state: ResourceState
818
819             :param time: Time to wait after guids2 has reached status 
820             :type time: string
821
822         """
823         if isinstance(guids1, int):
824             guids1 = [guids1]
825         if isinstance(guids2, int):
826             guids2 = [guids2]
827
828         for guid1 in guids1:
829             rm = self.get_resource(guid)
830             rm.set_with_conditions(name, value, guids2, state, time)
831
832     def deploy(self, guids = None, wait_all_ready = True, group = None):
833         """ Deploys all ResourceManagers in the guids list. 
834         
835         If the argument 'guids' is not given, all RMs with state NEW
836         are deployed.
837
838             :param guids: List of guids of RMs to deploy
839             :type guids: list
840
841             :param wait_all_ready: Wait until all RMs are ready in
842                 order to start the RMs
843             :type guid: int
844
845             :param group: Id of deployment group in which to deploy RMs
846             :type group: int
847
848         """
849         self.logger.debug(" ------- DEPLOY START ------ ")
850
851         if not guids:
852             # If no guids list was passed, all 'NEW' RMs will be deployed
853             guids = []
854             for guid, rm in self._resources.iteritems():
855                 if rm.state == ResourceState.NEW:
856                     guids.append(guid)
857                 
858         if isinstance(guids, int):
859             guids = [guids]
860
861         # Create deployment group
862         # New guids can be added to a same deployment group later on
863         new_group = False
864         if not group:
865             new_group = True
866             group = self._group_id_generator.next()
867
868         if group not in self._groups:
869             self._groups[group] = []
870
871         self._groups[group].extend(guids)
872
873         def wait_all_and_start(group):
874             # Function that checks if all resources are READY
875             # before scheduling a start_with_conditions for each RM
876             reschedule = False
877             
878             # Get all guids in group
879             guids = self._groups[group]
880
881             for guid in guids:
882                 if self.state(guid) < ResourceState.READY:
883                     reschedule = True
884                     break
885
886             if reschedule:
887                 callback = functools.partial(wait_all_and_start, group)
888                 self.schedule("1s", callback)
889             else:
890                 # If all resources are ready, we schedule the start
891                 for guid in guids:
892                     rm = self.get_resource(guid)
893                     self.schedule("0s", rm.start_with_conditions)
894
895                     if rm.conditions.get(ResourceAction.STOP):
896                         # Only if the RM has STOP conditions we
897                         # schedule a stop. Otherwise the RM will stop immediately
898                         self.schedule("0s", rm.stop_with_conditions)
899
900         if wait_all_ready and new_group:
901             # Schedule a function to check that all resources are
902             # READY, and only then schedule the start.
903             # This aims at reducing the number of tasks looping in the 
904             # scheduler. 
905             # Instead of having many start tasks, we will have only one for 
906             # the whole group.
907             callback = functools.partial(wait_all_and_start, group)
908             self.schedule("0s", callback)
909
910         for guid in guids:
911             rm = self.get_resource(guid)
912             rm.deployment_group = group
913             self.schedule("0s", rm.deploy_with_conditions)
914
915             if not wait_all_ready:
916                 self.schedule("0s", rm.start_with_conditions)
917
918                 if rm.conditions.get(ResourceAction.STOP):
919                     # Only if the RM has STOP conditions we
920                     # schedule a stop. Otherwise the RM will stop immediately
921                     self.schedule("0s", rm.stop_with_conditions)
922
923     def release(self, guids = None):
924         """ Releases all ResourceManagers in the guids list.
925
926         If the argument 'guids' is not given, all RMs registered
927         in the experiment are released.
928
929             :param guids: List of RM guids
930             :type guids: list
931
932         """
933         if isinstance(guids, int):
934             guids = [guids]
935
936         if not guids:
937             guids = self.resources
938
939         for guid in guids:
940             rm = self.get_resource(guid)
941             self.schedule("0s", rm.release)
942
943         self.wait_released(guids)
944
945         for guid in guids:
946             if self.get(guid, "hardRelease"):
947                 self.remove_resource(guid)
948         
949     def shutdown(self):
950         """ Releases all resources and stops the ExperimentController
951
952         """
953         # If there was a major failure we can't exit gracefully
954         if self._state == ECState.FAILED:
955             raise RuntimeError("EC failure. Can not exit gracefully")
956
957         # Remove all pending tasks from the scheduler queue
958         for tid in list(self._scheduler.pending):
959             self._scheduler.remove(tid)
960
961         # Remove pending tasks from the workers queue
962         self._runner.empty()
963
964         self.release()
965
966         # Mark the EC state as TERMINATED
967         self._state = ECState.TERMINATED
968
969         # Stop processing thread
970         self._stop = True
971
972         # Notify condition to wake up the processing thread
973         self._notify()
974         
975         if self._thread.is_alive():
976            self._thread.join()
977
978     def schedule(self, date, callback, track = False):
979         """ Schedules a callback to be executed at time 'date'.
980
981             :param date: string containing execution time for the task.
982                     It can be expressed as an absolute time, using
983                     timestamp format, or as a relative time matching
984                     ^\d+.\d+(h|m|s|ms|us)$
985
986             :param callback: code to be executed for the task. Must be a
987                         Python function, and receives args and kwargs
988                         as arguments.
989
990             :param track: if set to True, the task will be retrievable with
991                     the get_task() method
992
993             :return : The Id of the task
994             :rtype: int
995             
996         """
997         timestamp = stabsformat(date)
998         task = Task(timestamp, callback)
999         task = self._scheduler.schedule(task)
1000
1001         if track:
1002             self._tasks[task.id] = task
1003
1004         # Notify condition to wake up the processing thread
1005         self._notify()
1006
1007         return task.id
1008      
1009     def _process(self):
1010         """ Process scheduled tasks.
1011
1012         .. note::
1013         
1014         Tasks are scheduled by invoking the schedule method with a target 
1015         callback and an execution time. 
1016         The schedule method creates a new Task object with that callback 
1017         and execution time, and pushes it into the '_scheduler' queue. 
1018         The execution time and the order of arrival of tasks are used 
1019         to order the tasks in the queue.
1020
1021         The _process method is executed in an independent thread held by 
1022         the ExperimentController for as long as the experiment is running.
1023         This method takes tasks from the '_scheduler' queue in a loop 
1024         and processes them in parallel using multithreading. 
1025         The environmental variable NEPI_NTHREADS can be used to control
1026         the number of threads used to process tasks. The default value is 
1027         50.
1028
1029         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1030         This object keeps a pool of threads (workers), and a queue of tasks
1031         scheduled for 'immediate' execution. 
1032         
1033         On each iteration, the '_process' loop will take the next task that 
1034         is scheduled for 'future' execution from the '_scheduler' queue, 
1035         and if the execution time of that task is >= to the current time, 
1036         it will push that task into the PR for 'immediate execution'. 
1037         As soon as a worker is free, the PR will assign the next task to
1038         that worker.
1039
1040         Upon receiving a task to execute, each PR worker (thread) will 
1041         invoke the  _execute method of the EC, passing the task as 
1042         argument.         
1043         The _execute method will then invoke task.callback inside a 
1044         try/except block. If an exception is raised by the tasks.callback, 
1045         it will be trapped by the try block, logged to standard error 
1046         (usually the console), and the task will be marked as failed.
1047
1048         """
1049
1050         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1051         self._runner = ParallelRun(maxthreads = self.nthreads)
1052         self._runner.start()
1053
1054         while not self._stop:
1055             try:
1056                 self._cond.acquire()
1057
1058                 task = self._scheduler.next()
1059                 
1060                 if not task:
1061                     # No task to execute. Wait for a new task to be scheduled.
1062                     self._cond.wait()
1063                 else:
1064                     # The task timestamp is in the future. Wait for timeout 
1065                     # or until another task is scheduled.
1066                     now = tnow()
1067                     if now < task.timestamp:
1068                         # Calculate timeout in seconds
1069                         timeout = tdiffsec(task.timestamp, now)
1070
1071                         # Re-schedule task with the same timestamp
1072                         self._scheduler.schedule(task)
1073                         
1074                         task = None
1075
1076                         # Wait timeout or until a new task awakes the condition
1077                         self._cond.wait(timeout)
1078                
1079                 self._cond.release()
1080
1081                 if task:
1082                     # Process tasks in parallel
1083                     self._runner.put(self._execute, task)
1084             except: 
1085                 import traceback
1086                 err = traceback.format_exc()
1087                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1088
1089                 # Set the EC to FAILED state 
1090                 self._state = ECState.FAILED
1091             
1092                 # Set the FailureManager failure level to EC failure
1093                 self._fm.set_ec_failure()
1094
1095         self.logger.debug("Exiting the task processing loop ... ")
1096         
1097         self._runner.sync()
1098         self._runner.destroy()
1099
1100     def _execute(self, task):
1101         """ Executes a single task. 
1102
1103             :param task: Object containing the callback to execute
1104             :type task: Task
1105
1106         """
1107         try:
1108             # Invoke callback
1109             task.result = task.callback()
1110             task.status = TaskStatus.DONE
1111         except:
1112             import traceback
1113             err = traceback.format_exc()
1114             task.result = err
1115             task.status = TaskStatus.ERROR
1116             
1117             self.logger.error("Error occurred while executing task: %s" % err)
1118
1119     def _notify(self):
1120         """ Awakes the processing thread if it is blocked waiting 
1121         for new tasks to arrive
1122         
1123         """
1124         self._cond.acquire()
1125         self._cond.notify()
1126         self._cond.release()
1127