SFA for PL without specifing hostname
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import sys
35 import time
36 import threading
37 import weakref
38
39 class FailureLevel(object):
40     """ Describes the system failure state """
41     OK = 1
42     RM_FAILURE = 2
43     EC_FAILURE = 3
44
45 class FailureManager(object):
46     """ The FailureManager is responsible for handling errors
47     and deciding whether an experiment should be aborted or not
48
49     """
50
51     def __init__(self, ec):
52         self._ec = weakref.ref(ec)
53         self._failure_level = FailureLevel.OK
54
55     @property
56     def ec(self):
57         """ Returns the ExperimentController associated to this FailureManager 
58         
59         """
60         
61         return self._ec()
62
63     @property
64     def abort(self):
65         if self._failure_level == FailureLevel.OK:
66             for guid in self.ec.resources:
67                 try:
68                     state = self.ec.state(guid)
69                     critical = self.ec.get(guid, "critical")
70                     if state == ResourceState.FAILED and critical:
71                         self._failure_level = FailureLevel.RM_FAILURE
72                         self.ec.logger.debug("RM critical failure occurred on guid %d." \
73                                 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
74                         break
75                 except:
76                     # An error might occure because a RM was deleted abruptly.
77                     # In this case the error should be ignored.
78                     if guid in self.ec._resources:
79                         raise
80
81         return self._failure_level != FailureLevel.OK
82
83     def set_ec_failure(self):
84         self._failure_level = FailureLevel.EC_FAILURE
85
86 class ECState(object):
87     """ Possible states for an ExperimentController
88    
89     """
90     RUNNING = 1
91     FAILED = 2
92     TERMINATED = 3
93
94 class ExperimentController(object):
95     """
96     .. class:: Class Args :
97       
98         :param exp_id: Human readable identifier for the experiment scenario. 
99         :type exp_id: str
100
101     .. note::
102
103     An experiment, or scenario, is defined by a concrete set of resources,
104     behavior, configuration and interconnection of those resources. 
105     The Experiment Description (ED) is a detailed representation of a
106     single experiment. It contains all the necessary information to 
107     allow repeating the experiment. NEPI allows to describe
108     experiments by registering components (resources), configuring them
109     and interconnecting them.
110     
111     A same experiment (scenario) can be executed many times, generating 
112     different results. We call an experiment execution (instance) a 'run'.
113
114     The ExperimentController (EC), is the entity responsible of
115     managing an experiment run. The same scenario can be 
116     recreated (and re-run) by instantiating an EC and recreating 
117     the same experiment description. 
118
119     In NEPI, an experiment is represented as a graph of interconnected
120     resources. A resource is a generic concept in the sense that any
121     component taking part of an experiment, whether physical of
122     virtual, is considered a resource. A resources could be a host, 
123     a virtual machine, an application, a simulator, a IP address.
124
125     A ResourceManager (RM), is the entity responsible for managing a 
126     single resource. ResourceManagers are specific to a resource
127     type (i.e. An RM to control a Linux application will not be
128     the same as the RM used to control a ns-3 simulation).
129     To support a new type of resource in NEPI, a new RM must be 
130     implemented. NEPI already provides a variety of
131     RMs to control basic resources, and new can be extended from
132     the existing ones.
133
134     Through the EC interface the user can create ResourceManagers (RMs),
135     configure them and interconnect them, to describe an experiment.
136     Describing an experiment through the EC does not run the experiment.
137     Only when the 'deploy()' method is invoked on the EC, the EC will take 
138     actions to transform the 'described' experiment into a 'running' experiment.
139
140     While the experiment is running, it is possible to continue to
141     create/configure/connect RMs, and to deploy them to involve new
142     resources in the experiment (this is known as 'interactive' deployment).
143     
144     An experiments in NEPI is identified by a string id, 
145     which is either given by the user, or automatically generated by NEPI.  
146     The purpose of this identifier is to separate files and results that 
147     belong to different experiment scenarios. 
148     However, since a same 'experiment' can be run many times, the experiment
149     id is not enough to identify an experiment instance (run).
150     For this reason, the ExperimentController has two identifier, the 
151     exp_id, which can be re-used in different ExperimentController,
152     and the run_id, which is unique to one ExperimentController instance, and
153     is automatically generated by NEPI.
154         
155     """
156
157     def __init__(self, exp_id = None): 
158         super(ExperimentController, self).__init__()
159
160         # Logging
161         self._logger = logging.getLogger("ExperimentController")
162
163         # Run identifier. It identifies a concrete execution instance (run) 
164         # of an experiment.
165         # Since a same experiment (same configuration) can be executed many 
166         # times, this run_id permits to separate result files generated on 
167         # different experiment executions
168         self._run_id = tsformat()
169
170         # Experiment identifier. Usually assigned by the user
171         # Identifies the experiment scenario (i.e. configuration, 
172         # resources used, etc)
173         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
174
175         # generator of globally unique ids
176         self._guid_generator = guid.GuidGenerator()
177         
178         # Resource managers
179         self._resources = dict()
180
181         # Scheduler. It a queue that holds tasks scheduled for
182         # execution, and yields the next task to be executed 
183         # ordered by execution and arrival time
184         self._scheduler = HeapScheduler()
185
186         # Tasks
187         self._tasks = dict()
188
189         # RM groups (for deployment) 
190         self._groups = dict()
191
192         # generator of globally unique id for groups
193         self._group_id_generator = guid.GuidGenerator()
194
195         # Flag to stop processing thread
196         self._stop = False
197     
198         # Entity in charge of managing system failures
199         self._fm = FailureManager(self)
200
201         # EC state
202         self._state = ECState.RUNNING
203
204         # The runner is a pool of threads used to parallelize 
205         # execution of tasks
206         nthreads = int(os.environ.get("NEPI_NTHREADS", "3"))
207         self._runner = ParallelRun(maxthreads = nthreads)
208
209         # Event processing thread
210         self._cond = threading.Condition()
211         self._thread = threading.Thread(target = self._process)
212         self._thread.setDaemon(True)
213         self._thread.start()
214         
215     @property
216     def logger(self):
217         """ Returns the logger instance of the Experiment Controller
218
219         """
220         return self._logger
221
222     @property
223     def ecstate(self):
224         """ Returns the state of the Experiment Controller
225
226         """
227         return self._state
228
229     @property
230     def exp_id(self):
231         """ Returns the experiment id assigned by the user
232
233         """
234         return self._exp_id
235
236     @property
237     def run_id(self):
238         """ Returns the experiment instance (run) identifier (automatically 
239         generated)
240
241         """
242         return self._run_id
243
244     @property
245     def abort(self):
246         """ Returns True if the experiment has failed and should be interrupted,
247         False otherwise.
248
249         """
250         return self._fm.abort
251
252     def wait_finished(self, guids):
253         """ Blocking method that waits until all RMs in the 'guids' list 
254         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
255         RELEASED ), or until a failure in the experiment occurs 
256         (i.e. abort == True) 
257         
258             :param guids: List of guids
259             :type guids: list
260
261         """
262
263         def quit():
264             return self.abort
265
266         return self.wait(guids, state = ResourceState.STOPPED, 
267                 quit = quit)
268
269     def wait_started(self, guids):
270         """ Blocking method that waits until all RMs in the 'guids' list 
271         have reached a state >= STARTED, or until a failure in the 
272         experiment occurs (i.e. abort == True) 
273         
274             :param guids: List of guids
275             :type guids: list
276
277         """
278
279         def quit():
280             return self.abort
281
282         return self.wait(guids, state = ResourceState.STARTED, 
283                 quit = quit)
284
285     def wait_released(self, guids):
286         """ Blocking method that waits until all RMs in the 'guids' list 
287         have reached a state == RELEASED, or until the EC fails 
288         
289             :param guids: List of guids
290             :type guids: list
291
292         """
293
294         def quit():
295             return self._state == ECState.FAILED
296
297         return self.wait(guids, state = ResourceState.RELEASED, 
298                 quit = quit)
299
300     def wait_deployed(self, guids):
301         """ Blocking method that waits until all RMs in the 'guids' list 
302         have reached a state >= READY, or until a failure in the 
303         experiment occurs (i.e. abort == True) 
304         
305             :param guids: List of guids
306             :type guids: list
307
308         """
309
310         def quit():
311             return self.abort
312
313         return self.wait(guids, state = ResourceState.READY, 
314                 quit = quit)
315
316     def wait(self, guids, state, quit):
317         """ Blocking method that waits until all RMs in the 'guids' list 
318         have reached a state >= 'state', or until the 'quit' callback
319         yields True
320            
321             :param guids: List of guids
322             :type guids: list
323         
324         """
325         if isinstance(guids, int):
326             guids = [guids]
327
328         # Make a copy to avoid modifying the original guids list
329         guids = list(guids)
330
331         while True:
332             # If there are no more guids to wait for
333             # or the quit function returns True, exit the loop
334             if len(guids) == 0 or quit():
335                 break
336
337             # If a guid reached one of the target states, remove it from list
338             guid = guids[0]
339             rstate = self.state(guid)
340             
341             hrrstate = ResourceState2str.get(rstate)
342             hrstate = ResourceState2str.get(state)
343
344             if rstate >= state:
345                 guids.remove(guid)
346                 rm = self.get_resource(guid)
347                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
348                     rm.get_rtype(), guid, hrrstate, hrstate))
349             else:
350                 # Debug...
351                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
352                     guid, hrrstate, hrstate))
353                 time.sleep(0.5)
354   
355     def get_task(self, tid):
356         """ Returns a task by its id
357
358             :param tid: Id of the task
359             :type tid: int
360             
361             :rtype: Task
362             
363         """
364         return self._tasks.get(tid)
365
366     def get_resource(self, guid):
367         """ Returns a registered ResourceManager by its guid
368
369             :param guid: Id of the task
370             :type guid: int
371             
372             :rtype: ResourceManager
373             
374         """
375         rm = self._resources.get(guid)
376         return rm
377
378     def remove_resource(self, guid):
379         del self._resources[guid]
380
381     @property
382     def resources(self):
383         """ Returns the set() of guids of all the ResourceManager
384
385             :return: Set of all RM guids
386             :rtype: set
387
388         """
389         keys = self._resources.keys()
390
391         return keys
392
393     def register_resource(self, rtype, guid = None):
394         """ Registers a new ResourceManager of type 'rtype' in the experiment
395         
396         This method will assign a new 'guid' for the RM, if no guid
397         is specified.
398
399             :param rtype: Type of the RM
400             :type rtype: str
401
402             :return: Guid of the RM
403             :rtype: int
404             
405         """
406         # Get next available guid
407         guid = self._guid_generator.next(guid)
408         
409         # Instantiate RM
410         rm = ResourceFactory.create(rtype, self, guid)
411
412         # Store RM
413         self._resources[guid] = rm
414
415         return guid
416
417     def get_attributes(self, guid):
418         """ Returns all the attributes of the RM with guid 'guid'
419
420             :param guid: Guid of the RM
421             :type guid: int
422
423             :return: List of attributes
424             :rtype: list
425
426         """
427         rm = self.get_resource(guid)
428         return rm.get_attributes()
429
430     def get_attribute(self, guid, name):
431         """ Returns the attribute 'name' of the RM with guid 'guid'
432
433             :param guid: Guid of the RM
434             :type guid: int
435
436             :param name: Name of the attribute
437             :type name: str
438
439             :return: The attribute with name 'name'
440             :rtype: Attribute
441
442         """
443         rm = self.get_resource(guid)
444         return rm.get_attribute(name)
445
446     def register_connection(self, guid1, guid2):
447         """ Registers a connection between a RM with guid 'guid1'
448         and another RM with guid 'guid2'. 
449     
450         The order of the in which the two guids are provided is not
451         important, since the connection relationship is symmetric.
452
453             :param guid1: First guid to connect
454             :type guid1: ResourceManager
455
456             :param guid2: Second guid to connect
457             :type guid: ResourceManager
458
459         """
460         rm1 = self.get_resource(guid1)
461         rm2 = self.get_resource(guid2)
462
463         rm1.register_connection(guid2)
464         rm2.register_connection(guid1)
465
466     def register_condition(self, guids1, action, guids2, state,
467             time = None):
468         """ Registers an action START, STOP or DEPLOY for all RM on list
469         guids1 to occur at time 'time' after all elements in list guids2 
470         have reached state 'state'.
471
472             :param guids1: List of guids of RMs subjected to action
473             :type guids1: list
474
475             :param action: Action to perform (either START, STOP or DEPLOY)
476             :type action: ResourceAction
477
478             :param guids2: List of guids of RMs to we waited for
479             :type guids2: list
480
481             :param state: State to wait for on RMs of list guids2 (STARTED,
482                 STOPPED, etc)
483             :type state: ResourceState
484
485             :param time: Time to wait after guids2 has reached status 
486             :type time: string
487
488         """
489         if isinstance(guids1, int):
490             guids1 = [guids1]
491         if isinstance(guids2, int):
492             guids2 = [guids2]
493
494         for guid1 in guids1:
495             rm = self.get_resource(guid1)
496             rm.register_condition(action, guids2, state, time)
497
498     def enable_trace(self, guid, name):
499         """ Enables a trace to be collected during the experiment run
500
501             :param name: Name of the trace
502             :type name: str
503
504         """
505         rm = self.get_resource(guid)
506         rm.enable_trace(name)
507
508     def trace_enabled(self, guid, name):
509         """ Returns True if the trace of name 'name' is enabled
510
511             :param name: Name of the trace
512             :type name: str
513
514         """
515         rm = self.get_resource(guid)
516         return rm.trace_enabled(name)
517
518     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
519         """ Returns information on a collected trace, the trace stream or 
520         blocks (chunks) of the trace stream
521
522             :param name: Name of the trace
523             :type name: str
524
525             :param attr: Can be one of:
526                          - TraceAttr.ALL (complete trace content), 
527                          - TraceAttr.STREAM (block in bytes to read starting 
528                                 at offset),
529                          - TraceAttr.PATH (full path to the trace file),
530                          - TraceAttr.SIZE (size of trace file). 
531             :type attr: str
532
533             :param block: Number of bytes to retrieve from trace, when attr is 
534                 TraceAttr.STREAM 
535             :type name: int
536
537             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
538             :type name: int
539
540             :rtype: str
541
542         """
543         rm = self.get_resource(guid)
544         return rm.trace(name, attr, block, offset)
545
546     def get_traces(self, guid):
547         """ Returns the list of the trace names of the RM with guid 'guid'
548
549             :param guid: Guid of the RM
550             :type guid: int
551
552             :return: List of trace names
553             :rtype: list
554
555         """
556         rm = self.get_resource(guid)
557         return rm.get_traces()
558
559
560     def discover(self, guid):
561         """ Discovers an available resource matching the criteria defined
562         by the RM with guid 'guid', and associates that resource to the RM
563
564         Not all RM types require (or are capable of) performing resource 
565         discovery. For the RM types which are not capable of doing so, 
566         invoking this method does not have any consequences. 
567
568             :param guid: Guid of the RM
569             :type guid: int
570
571         """
572         rm = self.get_resource(guid)
573         return rm.discover()
574
575     def provision(self, guid):
576         """ Provisions the resource associated to the RM with guid 'guid'.
577
578         Provisioning means making a resource 'accessible' to the user. 
579         Not all RM types require (or are capable of) performing resource 
580         provisioning. For the RM types which are not capable of doing so, 
581         invoking this method does not have any consequences. 
582
583             :param guid: Guid of the RM
584             :type guid: int
585
586         """
587         rm = self.get_resource(guid)
588         return rm.provision()
589
590     def get(self, guid, name):
591         """ Returns the value of the attribute with name 'name' on the
592         RM with guid 'guid'
593
594             :param guid: Guid of the RM
595             :type guid: int
596
597             :param name: Name of the attribute 
598             :type name: str
599
600             :return: The value of the attribute with name 'name'
601
602         """
603         rm = self.get_resource(guid)
604         return rm.get(name)
605
606     def set(self, guid, name, value):
607         """ Modifies the value of the attribute with name 'name' on the 
608         RM with guid 'guid'.
609
610             :param guid: Guid of the RM
611             :type guid: int
612
613             :param name: Name of the attribute
614             :type name: str
615
616             :param value: Value of the attribute
617
618         """
619         rm = self.get_resource(guid)
620         rm.set(name, value)
621
622     def get_global(self, rtype, name):
623         """ Returns the value of the global attribute with name 'name' on the
624         RMs of rtype 'rtype'.
625
626             :param guid: Guid of the RM
627             :type guid: int
628
629             :param name: Name of the attribute 
630             :type name: str
631
632             :return: The value of the attribute with name 'name'
633
634         """
635         rclass = ResourceFactory.get_resource_type(rtype)
636         return rclass.get_global(name)
637
638     def set_global(self, rtype, name, value):
639         """ Modifies the value of the global attribute with name 'name' on the 
640         RMs of with rtype 'rtype'.
641
642             :param guid: Guid of the RM
643             :type guid: int
644
645             :param name: Name of the attribute
646             :type name: str
647
648             :param value: Value of the attribute
649
650         """
651         rclass = ResourceFactory.get_resource_type(rtype)
652         return rclass.set_global(name, value)
653
654     def state(self, guid, hr = False):
655         """ Returns the state of a resource
656
657             :param guid: Resource guid
658             :type guid: integer
659
660             :param hr: Human readable. Forces return of a 
661                 status string instead of a number 
662             :type hr: boolean
663
664         """
665         rm = self.get_resource(guid)
666         state = rm.state
667
668         if hr:
669             return ResourceState2str.get(state)
670
671         return state
672
673     def stop(self, guid):
674         """ Stops the RM with guid 'guid'
675
676         Stopping a RM means that the resource it controls will
677         no longer take part of the experiment.
678
679             :param guid: Guid of the RM
680             :type guid: int
681
682         """
683         rm = self.get_resource(guid)
684         return rm.stop()
685
686     def start(self, guid):
687         """ Starts the RM with guid 'guid'
688
689         Starting a RM means that the resource it controls will
690         begin taking part of the experiment.
691
692             :param guid: Guid of the RM
693             :type guid: int
694
695         """
696         rm = self.get_resource(guid)
697         return rm.start()
698
699     def get_start_time(self, guid):
700         """ Returns the start time of the RM as a timestamp """
701         rm = self.get_resource(guid)
702         return rm.start_time
703
704     def get_stop_time(self, guid):
705         """ Returns the stop time of the RM as a timestamp """
706         rm = self.get_resource(guid)
707         return rm.stop_time
708
709     def get_discover_time(self, guid):
710         """ Returns the discover time of the RM as a timestamp """
711         rm = self.get_resource(guid)
712         return rm.discover_time
713
714     def get_provision_time(self, guid):
715         """ Returns the provision time of the RM as a timestamp """
716         rm = self.get_resource(guid)
717         return rm.provision_time
718
719     def get_ready_time(self, guid):
720         """ Returns the deployment time of the RM as a timestamp """
721         rm = self.get_resource(guid)
722         return rm.ready_time
723
724     def get_release_time(self, guid):
725         """ Returns the release time of the RM as a timestamp """
726         rm = self.get_resource(guid)
727         return rm.release_time
728
729     def get_failed_time(self, guid):
730         """ Returns the time failure occured for the RM as a timestamp """
731         rm = self.get_resource(guid)
732         return rm.failed_time
733
734     def set_with_conditions(self, name, value, guids1, guids2, state,
735             time = None):
736         """ Modifies the value of attribute with name 'name' on all RMs 
737         on the guids1 list when time 'time' has elapsed since all 
738         elements in guids2 list have reached state 'state'.
739
740             :param name: Name of attribute to set in RM
741             :type name: string
742
743             :param value: Value of attribute to set in RM
744             :type name: string
745
746             :param guids1: List of guids of RMs subjected to action
747             :type guids1: list
748
749             :param action: Action to register (either START or STOP)
750             :type action: ResourceAction
751
752             :param guids2: List of guids of RMs to we waited for
753             :type guids2: list
754
755             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
756             :type state: ResourceState
757
758             :param time: Time to wait after guids2 has reached status 
759             :type time: string
760
761         """
762         if isinstance(guids1, int):
763             guids1 = [guids1]
764         if isinstance(guids2, int):
765             guids2 = [guids2]
766
767         for guid1 in guids1:
768             rm = self.get_resource(guid)
769             rm.set_with_conditions(name, value, guids2, state, time)
770
771     def deploy(self, guids = None, wait_all_ready = True, group = None):
772         """ Deploys all ResourceManagers in the guids list. 
773         
774         If the argument 'guids' is not given, all RMs with state NEW
775         are deployed.
776
777             :param guids: List of guids of RMs to deploy
778             :type guids: list
779
780             :param wait_all_ready: Wait until all RMs are ready in
781                 order to start the RMs
782             :type guid: int
783
784             :param group: Id of deployment group in which to deploy RMs
785             :type group: int
786
787         """
788         self.logger.debug(" ------- DEPLOY START ------ ")
789
790         if not guids:
791             # If no guids list was passed, all 'NEW' RMs will be deployed
792             guids = []
793             for guid in self.resources:
794                 if self.state(guid) == ResourceState.NEW:
795                     guids.append(guid)
796                 
797         if isinstance(guids, int):
798             guids = [guids]
799
800         # Create deployment group
801         # New guids can be added to a same deployment group later on
802         new_group = False
803         if not group:
804             new_group = True
805             group = self._group_id_generator.next()
806
807         if group not in self._groups:
808             self._groups[group] = []
809
810         self._groups[group].extend(guids)
811
812         def wait_all_and_start(group):
813             # Function that checks if all resources are READY
814             # before scheduling a start_with_conditions for each RM
815             reschedule = False
816             
817             # Get all guids in group
818             guids = self._groups[group]
819
820             for guid in guids:
821                 if self.state(guid) < ResourceState.READY:
822                     reschedule = True
823                     break
824
825             if reschedule:
826                 callback = functools.partial(wait_all_and_start, group)
827                 self.schedule("1s", callback)
828             else:
829                 # If all resources are ready, we schedule the start
830                 for guid in guids:
831                     rm = self.get_resource(guid)
832                     self.schedule("0s", rm.start_with_conditions)
833
834                     if rm.conditions.get(ResourceAction.STOP):
835                         # Only if the RM has STOP conditions we
836                         # schedule a stop. Otherwise the RM will stop immediately
837                         self.schedule("0s", rm.stop_with_conditions)
838
839         if wait_all_ready and new_group:
840             # Schedule a function to check that all resources are
841             # READY, and only then schedule the start.
842             # This aims at reducing the number of tasks looping in the 
843             # scheduler. 
844             # Instead of having many start tasks, we will have only one for 
845             # the whole group.
846             callback = functools.partial(wait_all_and_start, group)
847             self.schedule("0s", callback)
848
849         for guid in guids:
850             rm = self.get_resource(guid)
851             rm.deployment_group = group
852             self.schedule("0s", rm.deploy_with_conditions)
853
854             if not wait_all_ready:
855                 self.schedule("0s", rm.start_with_conditions)
856
857                 if rm.conditions.get(ResourceAction.STOP):
858                     # Only if the RM has STOP conditions we
859                     # schedule a stop. Otherwise the RM will stop immediately
860                     self.schedule("0s", rm.stop_with_conditions)
861
862     def release(self, guids = None):
863         """ Releases all ResourceManagers in the guids list.
864
865         If the argument 'guids' is not given, all RMs registered
866         in the experiment are released.
867
868             :param guids: List of RM guids
869             :type guids: list
870
871         """
872         if isinstance(guids, int):
873             guids = [guids]
874
875         if not guids:
876             guids = self.resources
877
878         for guid in guids:
879             rm = self.get_resource(guid)
880             self.schedule("0s", rm.release)
881
882         self.wait_released(guids)
883
884         for guid in guids:
885             if self.get(guid, "hardRelease"):
886                 self.remove_resource(guid)
887         
888     def shutdown(self):
889         """ Releases all resources and stops the ExperimentController
890
891         """
892         # If there was a major failure we can't exit gracefully
893         if self._state == ECState.FAILED:
894             raise RuntimeError("EC failure. Can not exit gracefully")
895
896         # Remove all pending tasks from the scheduler queue
897         for tid in list(self._scheduler.pending):
898             self._scheduler.remove(tid)
899
900         # Remove pending tasks from the workers queue
901         self._runner.empty()
902
903         self.release()
904
905         # Mark the EC state as TERMINATED
906         self._state = ECState.TERMINATED
907
908         # Stop processing thread
909         self._stop = True
910
911         # Notify condition to wake up the processing thread
912         self._notify()
913         
914         if self._thread.is_alive():
915            self._thread.join()
916
917     def schedule(self, date, callback, track = False):
918         """ Schedules a callback to be executed at time 'date'.
919
920             :param date: string containing execution time for the task.
921                     It can be expressed as an absolute time, using
922                     timestamp format, or as a relative time matching
923                     ^\d+.\d+(h|m|s|ms|us)$
924
925             :param callback: code to be executed for the task. Must be a
926                         Python function, and receives args and kwargs
927                         as arguments.
928
929             :param track: if set to True, the task will be retrievable with
930                     the get_task() method
931
932             :return : The Id of the task
933             :rtype: int
934             
935         """
936         timestamp = stabsformat(date)
937         task = Task(timestamp, callback)
938         task = self._scheduler.schedule(task)
939
940         if track:
941             self._tasks[task.id] = task
942
943         # Notify condition to wake up the processing thread
944         self._notify()
945
946         return task.id
947      
948     def _process(self):
949         """ Process scheduled tasks.
950
951         .. note::
952         
953         Tasks are scheduled by invoking the schedule method with a target 
954         callback and an execution time. 
955         The schedule method creates a new Task object with that callback 
956         and execution time, and pushes it into the '_scheduler' queue. 
957         The execution time and the order of arrival of tasks are used 
958         to order the tasks in the queue.
959
960         The _process method is executed in an independent thread held by 
961         the ExperimentController for as long as the experiment is running.
962         This method takes tasks from the '_scheduler' queue in a loop 
963         and processes them in parallel using multithreading. 
964         The environmental variable NEPI_NTHREADS can be used to control
965         the number of threads used to process tasks. The default value is 
966         50.
967
968         To execute tasks in parallel, a ParallelRunner (PR) object is used.
969         This object keeps a pool of threads (workers), and a queue of tasks
970         scheduled for 'immediate' execution. 
971         
972         On each iteration, the '_process' loop will take the next task that 
973         is scheduled for 'future' execution from the '_scheduler' queue, 
974         and if the execution time of that task is >= to the current time, 
975         it will push that task into the PR for 'immediate execution'. 
976         As soon as a worker is free, the PR will assign the next task to
977         that worker.
978
979         Upon receiving a task to execute, each PR worker (thread) will 
980         invoke the  _execute method of the EC, passing the task as 
981         argument.         
982         The _execute method will then invoke task.callback inside a 
983         try/except block. If an exception is raised by the tasks.callback, 
984         it will be trapped by the try block, logged to standard error 
985         (usually the console), and the task will be marked as failed.
986
987         """
988
989         self._runner.start()
990
991         while not self._stop:
992             try:
993                 self._cond.acquire()
994
995                 task = self._scheduler.next()
996                 
997                 if not task:
998                     # No task to execute. Wait for a new task to be scheduled.
999                     self._cond.wait()
1000                 else:
1001                     # The task timestamp is in the future. Wait for timeout 
1002                     # or until another task is scheduled.
1003                     now = tnow()
1004                     if now < task.timestamp:
1005                         # Calculate timeout in seconds
1006                         timeout = tdiffsec(task.timestamp, now)
1007
1008                         # Re-schedule task with the same timestamp
1009                         self._scheduler.schedule(task)
1010                         
1011                         task = None
1012
1013                         # Wait timeout or until a new task awakes the condition
1014                         self._cond.wait(timeout)
1015                
1016                 self._cond.release()
1017
1018                 if task:
1019                     # Process tasks in parallel
1020                     self._runner.put(self._execute, task)
1021             except: 
1022                 import traceback
1023                 err = traceback.format_exc()
1024                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1025
1026                 # Set the EC to FAILED state 
1027                 self._state = ECState.FAILED
1028             
1029                 # Set the FailureManager failure level to EC failure
1030                 self._fm.set_ec_failure()
1031
1032         self.logger.debug("Exiting the task processing loop ... ")
1033         
1034         self._runner.sync()
1035         self._runner.destroy()
1036
1037     def _execute(self, task):
1038         """ Executes a single task. 
1039
1040             :param task: Object containing the callback to execute
1041             :type task: Task
1042
1043         """
1044         try:
1045             # Invoke callback
1046             task.result = task.callback()
1047             task.status = TaskStatus.DONE
1048         except:
1049             import traceback
1050             err = traceback.format_exc()
1051             task.result = err
1052             task.status = TaskStatus.ERROR
1053             
1054             self.logger.error("Error occurred while executing task: %s" % err)
1055
1056     def _notify(self):
1057         """ Awakes the processing thread if it is blocked waiting 
1058         for new tasks to arrive
1059         
1060         """
1061         self._cond.acquire()
1062         self._cond.notify()
1063         self._cond.release()
1064