5584c681b28143c06c6ab76b0ebe421b0cac7035
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import sys
35 import time
36 import threading
37 import weakref
38
39 class FailureLevel(object):
40     """ Describes the system failure state """
41     OK = 1
42     RM_FAILURE = 2
43     EC_FAILURE = 3
44
45 class FailureManager(object):
46     """ The FailureManager is responsible for handling errors
47     and deciding whether an experiment should be aborted or not
48
49     """
50
51     def __init__(self, ec):
52         self._ec = weakref.ref(ec)
53         self._failure_level = FailureLevel.OK
54
55     @property
56     def ec(self):
57         """ Returns the ExperimentController associated to this FailureManager 
58         
59         """
60         
61         return self._ec()
62
63     @property
64     def abort(self):
65         if self._failure_level == FailureLevel.OK:
66             for guid in self.ec.resources:
67                 try:
68                     state = self.ec.state(guid)
69                     critical = self.ec.get(guid, "critical")
70                     if state == ResourceState.FAILED and critical:
71                         self._failure_level = FailureLevel.RM_FAILURE
72                         self.ec.logger.debug("RM critical failure occurred on guid %d." \
73                                 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
74                         break
75                 except:
76                     # An error might occure because a RM was deleted abruptly.
77                     # In this case the error should be ignored.
78                     if guid in self.ec._resources:
79                         raise
80
81         return self._failure_level != FailureLevel.OK
82
83     def set_ec_failure(self):
84         self._failure_level = FailureLevel.EC_FAILURE
85
86 class ECState(object):
87     """ Possible states for an ExperimentController
88    
89     """
90     RUNNING = 1
91     FAILED = 2
92     TERMINATED = 3
93
94 class ExperimentController(object):
95     """
96     .. class:: Class Args :
97       
98         :param exp_id: Human readable identifier for the experiment scenario. 
99         :type exp_id: str
100
101     .. note::
102
103     An experiment, or scenario, is defined by a concrete set of resources,
104     behavior, configuration and interconnection of those resources. 
105     The Experiment Description (ED) is a detailed representation of a
106     single experiment. It contains all the necessary information to 
107     allow repeating the experiment. NEPI allows to describe
108     experiments by registering components (resources), configuring them
109     and interconnecting them.
110     
111     A same experiment (scenario) can be executed many times, generating 
112     different results. We call an experiment execution (instance) a 'run'.
113
114     The ExperimentController (EC), is the entity responsible of
115     managing an experiment run. The same scenario can be 
116     recreated (and re-run) by instantiating an EC and recreating 
117     the same experiment description. 
118
119     In NEPI, an experiment is represented as a graph of interconnected
120     resources. A resource is a generic concept in the sense that any
121     component taking part of an experiment, whether physical of
122     virtual, is considered a resource. A resources could be a host, 
123     a virtual machine, an application, a simulator, a IP address.
124
125     A ResourceManager (RM), is the entity responsible for managing a 
126     single resource. ResourceManagers are specific to a resource
127     type (i.e. An RM to control a Linux application will not be
128     the same as the RM used to control a ns-3 simulation).
129     To support a new type of resource in NEPI, a new RM must be 
130     implemented. NEPI already provides a variety of
131     RMs to control basic resources, and new can be extended from
132     the existing ones.
133
134     Through the EC interface the user can create ResourceManagers (RMs),
135     configure them and interconnect them, to describe an experiment.
136     Describing an experiment through the EC does not run the experiment.
137     Only when the 'deploy()' method is invoked on the EC, the EC will take 
138     actions to transform the 'described' experiment into a 'running' experiment.
139
140     While the experiment is running, it is possible to continue to
141     create/configure/connect RMs, and to deploy them to involve new
142     resources in the experiment (this is known as 'interactive' deployment).
143     
144     An experiments in NEPI is identified by a string id, 
145     which is either given by the user, or automatically generated by NEPI.  
146     The purpose of this identifier is to separate files and results that 
147     belong to different experiment scenarios. 
148     However, since a same 'experiment' can be run many times, the experiment
149     id is not enough to identify an experiment instance (run).
150     For this reason, the ExperimentController has two identifier, the 
151     exp_id, which can be re-used in different ExperimentController,
152     and the run_id, which is unique to one ExperimentController instance, and
153     is automatically generated by NEPI.
154         
155     """
156
157     def __init__(self, exp_id = None): 
158         super(ExperimentController, self).__init__()
159
160         # Logging
161         self._logger = logging.getLogger("ExperimentController")
162
163         # Run identifier. It identifies a concrete execution instance (run) 
164         # of an experiment.
165         # Since a same experiment (same configuration) can be executed many 
166         # times, this run_id permits to separate result files generated on 
167         # different experiment executions
168         self._run_id = tsformat()
169
170         # Experiment identifier. Usually assigned by the user
171         # Identifies the experiment scenario (i.e. configuration, 
172         # resources used, etc)
173         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
174
175         # generator of globally unique ids
176         self._guid_generator = guid.GuidGenerator()
177         
178         # Resource managers
179         self._resources = dict()
180
181         # Scheduler. It a queue that holds tasks scheduled for
182         # execution, and yields the next task to be executed 
183         # ordered by execution and arrival time
184         self._scheduler = HeapScheduler()
185
186         # Tasks
187         self._tasks = dict()
188
189         # RM groups (for deployment) 
190         self._groups = dict()
191
192         # generator of globally unique id for groups
193         self._group_id_generator = guid.GuidGenerator()
194
195         # Flag to stop processing thread
196         self._stop = False
197     
198         # Entity in charge of managing system failures
199         self._fm = FailureManager(self)
200
201         # EC state
202         self._state = ECState.RUNNING
203
204         # Blacklist file for PL nodes
205         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
206         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
207         if not os.path.exists(plblacklist_file):
208             if os.path.isdir(nepi_home):
209                 open(plblacklist_file, 'w').close()
210             else:
211                 os.makedirs(nepi_home)
212                 open(plblacklist_file, 'w').close()
213                     
214         # The runner is a pool of threads used to parallelize 
215         # execution of tasks
216         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
217         self._runner = ParallelRun(maxthreads = nthreads)
218
219         # Event processing thread
220         self._cond = threading.Condition()
221         self._thread = threading.Thread(target = self._process)
222         self._thread.setDaemon(True)
223         self._thread.start()
224         
225     @property
226     def logger(self):
227         """ Returns the logger instance of the Experiment Controller
228
229         """
230         return self._logger
231
232     @property
233     def ecstate(self):
234         """ Returns the state of the Experiment Controller
235
236         """
237         return self._state
238
239     @property
240     def exp_id(self):
241         """ Returns the experiment id assigned by the user
242
243         """
244         return self._exp_id
245
246     @property
247     def run_id(self):
248         """ Returns the experiment instance (run) identifier (automatically 
249         generated)
250
251         """
252         return self._run_id
253
254     @property
255     def abort(self):
256         """ Returns True if the experiment has failed and should be interrupted,
257         False otherwise.
258
259         """
260         return self._fm.abort
261
262     def wait_finished(self, guids):
263         """ Blocking method that waits until all RMs in the 'guids' list 
264         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
265         RELEASED ), or until a failure in the experiment occurs 
266         (i.e. abort == True) 
267         
268             :param guids: List of guids
269             :type guids: list
270
271         """
272
273         def quit():
274             return self.abort
275
276         return self.wait(guids, state = ResourceState.STOPPED, 
277                 quit = quit)
278
279     def wait_started(self, guids):
280         """ Blocking method that waits until all RMs in the 'guids' list 
281         have reached a state >= STARTED, or until a failure in the 
282         experiment occurs (i.e. abort == True) 
283         
284             :param guids: List of guids
285             :type guids: list
286
287         """
288
289         def quit():
290             return self.abort
291
292         return self.wait(guids, state = ResourceState.STARTED, 
293                 quit = quit)
294
295     def wait_released(self, guids):
296         """ Blocking method that waits until all RMs in the 'guids' list 
297         have reached a state == RELEASED, or until the EC fails 
298         
299             :param guids: List of guids
300             :type guids: list
301
302         """
303
304         def quit():
305             return self._state == ECState.FAILED
306
307         return self.wait(guids, state = ResourceState.RELEASED, 
308                 quit = quit)
309
310     def wait_deployed(self, guids):
311         """ Blocking method that waits until all RMs in the 'guids' list 
312         have reached a state >= READY, or until a failure in the 
313         experiment occurs (i.e. abort == True) 
314         
315             :param guids: List of guids
316             :type guids: list
317
318         """
319
320         def quit():
321             return self.abort
322
323         return self.wait(guids, state = ResourceState.READY, 
324                 quit = quit)
325
326     def wait(self, guids, state, quit):
327         """ Blocking method that waits until all RMs in the 'guids' list 
328         have reached a state >= 'state', or until the 'quit' callback
329         yields True
330            
331             :param guids: List of guids
332             :type guids: list
333         
334         """
335         
336         if isinstance(guids, int):
337             guids = [guids]
338
339         # Make a copy to avoid modifying the original guids list
340         guids = list(guids)
341
342         while True:
343             # If there are no more guids to wait for
344             # or the quit function returns True, exit the loop
345             if len(guids) == 0 or quit():
346                 break
347
348             # If a guid reached one of the target states, remove it from list
349             guid = guids[0]
350             rstate = self.state(guid)
351             
352             hrrstate = ResourceState2str.get(rstate)
353             hrstate = ResourceState2str.get(state)
354
355             if rstate >= state:
356                 guids.remove(guid)
357                 rm = self.get_resource(guid)
358                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
359                     rm.get_rtype(), guid, hrrstate, hrstate))
360             else:
361                 # Debug...
362                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
363                     guid, hrrstate, hrstate))
364                 time.sleep(0.5)
365   
366     def get_task(self, tid):
367         """ Returns a task by its id
368
369             :param tid: Id of the task
370             :type tid: int
371             
372             :rtype: Task
373             
374         """
375         return self._tasks.get(tid)
376
377     def get_resource(self, guid):
378         """ Returns a registered ResourceManager by its guid
379
380             :param guid: Id of the task
381             :type guid: int
382             
383             :rtype: ResourceManager
384             
385         """
386         rm = self._resources.get(guid)
387         return rm
388
389     def remove_resource(self, guid):
390         del self._resources[guid]
391
392     @property
393     def resources(self):
394         """ Returns the set() of guids of all the ResourceManager
395
396             :return: Set of all RM guids
397             :rtype: set
398
399         """
400         keys = self._resources.keys()
401
402         return keys
403
404     def register_resource(self, rtype, guid = None):
405         """ Registers a new ResourceManager of type 'rtype' in the experiment
406         
407         This method will assign a new 'guid' for the RM, if no guid
408         is specified.
409
410             :param rtype: Type of the RM
411             :type rtype: str
412
413             :return: Guid of the RM
414             :rtype: int
415             
416         """
417         # Get next available guid
418         guid = self._guid_generator.next(guid)
419         
420         # Instantiate RM
421         rm = ResourceFactory.create(rtype, self, guid)
422
423         # Store RM
424         self._resources[guid] = rm
425
426         return guid
427
428     def get_attributes(self, guid):
429         """ Returns all the attributes of the RM with guid 'guid'
430
431             :param guid: Guid of the RM
432             :type guid: int
433
434             :return: List of attributes
435             :rtype: list
436
437         """
438         rm = self.get_resource(guid)
439         return rm.get_attributes()
440
441     def get_attribute(self, guid, name):
442         """ Returns the attribute 'name' of the RM with guid 'guid'
443
444             :param guid: Guid of the RM
445             :type guid: int
446
447             :param name: Name of the attribute
448             :type name: str
449
450             :return: The attribute with name 'name'
451             :rtype: Attribute
452
453         """
454         rm = self.get_resource(guid)
455         return rm.get_attribute(name)
456
457     def register_connection(self, guid1, guid2):
458         """ Registers a connection between a RM with guid 'guid1'
459         and another RM with guid 'guid2'. 
460     
461         The order of the in which the two guids are provided is not
462         important, since the connection relationship is symmetric.
463
464             :param guid1: First guid to connect
465             :type guid1: ResourceManager
466
467             :param guid2: Second guid to connect
468             :type guid: ResourceManager
469
470         """
471         rm1 = self.get_resource(guid1)
472         rm2 = self.get_resource(guid2)
473
474         rm1.register_connection(guid2)
475         rm2.register_connection(guid1)
476
477     def register_condition(self, guids1, action, guids2, state,
478             time = None):
479         """ Registers an action START, STOP or DEPLOY for all RM on list
480         guids1 to occur at time 'time' after all elements in list guids2 
481         have reached state 'state'.
482
483             :param guids1: List of guids of RMs subjected to action
484             :type guids1: list
485
486             :param action: Action to perform (either START, STOP or DEPLOY)
487             :type action: ResourceAction
488
489             :param guids2: List of guids of RMs to we waited for
490             :type guids2: list
491
492             :param state: State to wait for on RMs of list guids2 (STARTED,
493                 STOPPED, etc)
494             :type state: ResourceState
495
496             :param time: Time to wait after guids2 has reached status 
497             :type time: string
498
499         """
500         if isinstance(guids1, int):
501             guids1 = [guids1]
502         if isinstance(guids2, int):
503             guids2 = [guids2]
504
505         for guid1 in guids1:
506             rm = self.get_resource(guid1)
507             rm.register_condition(action, guids2, state, time)
508
509     def enable_trace(self, guid, name):
510         """ Enables a trace to be collected during the experiment run
511
512             :param name: Name of the trace
513             :type name: str
514
515         """
516         rm = self.get_resource(guid)
517         rm.enable_trace(name)
518
519     def trace_enabled(self, guid, name):
520         """ Returns True if the trace of name 'name' is enabled
521
522             :param name: Name of the trace
523             :type name: str
524
525         """
526         rm = self.get_resource(guid)
527         return rm.trace_enabled(name)
528
529     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
530         """ Returns information on a collected trace, the trace stream or 
531         blocks (chunks) of the trace stream
532
533             :param name: Name of the trace
534             :type name: str
535
536             :param attr: Can be one of:
537                          - TraceAttr.ALL (complete trace content), 
538                          - TraceAttr.STREAM (block in bytes to read starting 
539                                 at offset),
540                          - TraceAttr.PATH (full path to the trace file),
541                          - TraceAttr.SIZE (size of trace file). 
542             :type attr: str
543
544             :param block: Number of bytes to retrieve from trace, when attr is 
545                 TraceAttr.STREAM 
546             :type name: int
547
548             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
549             :type name: int
550
551             :rtype: str
552
553         """
554         rm = self.get_resource(guid)
555         return rm.trace(name, attr, block, offset)
556
557     def get_traces(self, guid):
558         """ Returns the list of the trace names of the RM with guid 'guid'
559
560             :param guid: Guid of the RM
561             :type guid: int
562
563             :return: List of trace names
564             :rtype: list
565
566         """
567         rm = self.get_resource(guid)
568         return rm.get_traces()
569
570
571     def discover(self, guid):
572         """ Discovers an available resource matching the criteria defined
573         by the RM with guid 'guid', and associates that resource to the RM
574
575         Not all RM types require (or are capable of) performing resource 
576         discovery. For the RM types which are not capable of doing so, 
577         invoking this method does not have any consequences. 
578
579             :param guid: Guid of the RM
580             :type guid: int
581
582         """
583         rm = self.get_resource(guid)
584         return rm.discover()
585
586     def provision(self, guid):
587         """ Provisions the resource associated to the RM with guid 'guid'.
588
589         Provisioning means making a resource 'accessible' to the user. 
590         Not all RM types require (or are capable of) performing resource 
591         provisioning. For the RM types which are not capable of doing so, 
592         invoking this method does not have any consequences. 
593
594             :param guid: Guid of the RM
595             :type guid: int
596
597         """
598         rm = self.get_resource(guid)
599         return rm.provision()
600
601     def get(self, guid, name):
602         """ Returns the value of the attribute with name 'name' on the
603         RM with guid 'guid'
604
605             :param guid: Guid of the RM
606             :type guid: int
607
608             :param name: Name of the attribute 
609             :type name: str
610
611             :return: The value of the attribute with name 'name'
612
613         """
614         rm = self.get_resource(guid)
615         return rm.get(name)
616
617     def set(self, guid, name, value):
618         """ Modifies the value of the attribute with name 'name' on the 
619         RM with guid 'guid'.
620
621             :param guid: Guid of the RM
622             :type guid: int
623
624             :param name: Name of the attribute
625             :type name: str
626
627             :param value: Value of the attribute
628
629         """
630         rm = self.get_resource(guid)
631         rm.set(name, value)
632
633     def get_global(self, rtype, name):
634         """ Returns the value of the global attribute with name 'name' on the
635         RMs of rtype 'rtype'.
636
637             :param guid: Guid of the RM
638             :type guid: int
639
640             :param name: Name of the attribute 
641             :type name: str
642
643             :return: The value of the attribute with name 'name'
644
645         """
646         rclass = ResourceFactory.get_resource_type(rtype)
647         return rclass.get_global(name)
648
649     def set_global(self, rtype, name, value):
650         """ Modifies the value of the global attribute with name 'name' on the 
651         RMs of with rtype 'rtype'.
652
653             :param guid: Guid of the RM
654             :type guid: int
655
656             :param name: Name of the attribute
657             :type name: str
658
659             :param value: Value of the attribute
660
661         """
662         rclass = ResourceFactory.get_resource_type(rtype)
663         return rclass.set_global(name, value)
664
665     def state(self, guid, hr = False):
666         """ Returns the state of a resource
667
668             :param guid: Resource guid
669             :type guid: integer
670
671             :param hr: Human readable. Forces return of a 
672                 status string instead of a number 
673             :type hr: boolean
674
675         """
676         rm = self.get_resource(guid)
677         state = rm.state
678
679         if hr:
680             return ResourceState2str.get(state)
681
682         return state
683
684     def stop(self, guid):
685         """ Stops the RM with guid 'guid'
686
687         Stopping a RM means that the resource it controls will
688         no longer take part of the experiment.
689
690             :param guid: Guid of the RM
691             :type guid: int
692
693         """
694         rm = self.get_resource(guid)
695         return rm.stop()
696
697     def start(self, guid):
698         """ Starts the RM with guid 'guid'
699
700         Starting a RM means that the resource it controls will
701         begin taking part of the experiment.
702
703             :param guid: Guid of the RM
704             :type guid: int
705
706         """
707         rm = self.get_resource(guid)
708         return rm.start()
709
710     def get_start_time(self, guid):
711         """ Returns the start time of the RM as a timestamp """
712         rm = self.get_resource(guid)
713         return rm.start_time
714
715     def get_stop_time(self, guid):
716         """ Returns the stop time of the RM as a timestamp """
717         rm = self.get_resource(guid)
718         return rm.stop_time
719
720     def get_discover_time(self, guid):
721         """ Returns the discover time of the RM as a timestamp """
722         rm = self.get_resource(guid)
723         return rm.discover_time
724
725     def get_provision_time(self, guid):
726         """ Returns the provision time of the RM as a timestamp """
727         rm = self.get_resource(guid)
728         return rm.provision_time
729
730     def get_ready_time(self, guid):
731         """ Returns the deployment time of the RM as a timestamp """
732         rm = self.get_resource(guid)
733         return rm.ready_time
734
735     def get_release_time(self, guid):
736         """ Returns the release time of the RM as a timestamp """
737         rm = self.get_resource(guid)
738         return rm.release_time
739
740     def get_failed_time(self, guid):
741         """ Returns the time failure occured for the RM as a timestamp """
742         rm = self.get_resource(guid)
743         return rm.failed_time
744
745     def set_with_conditions(self, name, value, guids1, guids2, state,
746             time = None):
747         """ Modifies the value of attribute with name 'name' on all RMs 
748         on the guids1 list when time 'time' has elapsed since all 
749         elements in guids2 list have reached state 'state'.
750
751             :param name: Name of attribute to set in RM
752             :type name: string
753
754             :param value: Value of attribute to set in RM
755             :type name: string
756
757             :param guids1: List of guids of RMs subjected to action
758             :type guids1: list
759
760             :param action: Action to register (either START or STOP)
761             :type action: ResourceAction
762
763             :param guids2: List of guids of RMs to we waited for
764             :type guids2: list
765
766             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
767             :type state: ResourceState
768
769             :param time: Time to wait after guids2 has reached status 
770             :type time: string
771
772         """
773         if isinstance(guids1, int):
774             guids1 = [guids1]
775         if isinstance(guids2, int):
776             guids2 = [guids2]
777
778         for guid1 in guids1:
779             rm = self.get_resource(guid)
780             rm.set_with_conditions(name, value, guids2, state, time)
781
782     def deploy(self, guids = None, wait_all_ready = True, group = None):
783         """ Deploys all ResourceManagers in the guids list. 
784         
785         If the argument 'guids' is not given, all RMs with state NEW
786         are deployed.
787
788             :param guids: List of guids of RMs to deploy
789             :type guids: list
790
791             :param wait_all_ready: Wait until all RMs are ready in
792                 order to start the RMs
793             :type guid: int
794
795             :param group: Id of deployment group in which to deploy RMs
796             :type group: int
797
798         """
799         self.logger.debug(" ------- DEPLOY START ------ ")
800
801         if not guids:
802             # If no guids list was passed, all 'NEW' RMs will be deployed
803             guids = []
804             for guid in self.resources:
805                 if self.state(guid) == ResourceState.NEW:
806                     guids.append(guid)
807                 
808         if isinstance(guids, int):
809             guids = [guids]
810
811         # Create deployment group
812         # New guids can be added to a same deployment group later on
813         new_group = False
814         if not group:
815             new_group = True
816             group = self._group_id_generator.next()
817
818         if group not in self._groups:
819             self._groups[group] = []
820
821         self._groups[group].extend(guids)
822
823         def wait_all_and_start(group):
824             # Function that checks if all resources are READY
825             # before scheduling a start_with_conditions for each RM
826             reschedule = False
827             
828             # Get all guids in group
829             guids = self._groups[group]
830
831             for guid in guids:
832                 if self.state(guid) < ResourceState.READY:
833                     reschedule = True
834                     break
835
836             if reschedule:
837                 callback = functools.partial(wait_all_and_start, group)
838                 self.schedule("1s", callback)
839             else:
840                 # If all resources are ready, we schedule the start
841                 for guid in guids:
842                     rm = self.get_resource(guid)
843                     self.schedule("0s", rm.start_with_conditions)
844
845                     if rm.conditions.get(ResourceAction.STOP):
846                         # Only if the RM has STOP conditions we
847                         # schedule a stop. Otherwise the RM will stop immediately
848                         self.schedule("0s", rm.stop_with_conditions)
849
850         if wait_all_ready and new_group:
851             # Schedule a function to check that all resources are
852             # READY, and only then schedule the start.
853             # This aims at reducing the number of tasks looping in the 
854             # scheduler. 
855             # Instead of having many start tasks, we will have only one for 
856             # the whole group.
857             callback = functools.partial(wait_all_and_start, group)
858             self.schedule("0s", callback)
859
860         for guid in guids:
861             rm = self.get_resource(guid)
862             rm.deployment_group = group
863             self.schedule("0s", rm.deploy_with_conditions)
864
865             if not wait_all_ready:
866                 self.schedule("0s", rm.start_with_conditions)
867
868                 if rm.conditions.get(ResourceAction.STOP):
869                     # Only if the RM has STOP conditions we
870                     # schedule a stop. Otherwise the RM will stop immediately
871                     self.schedule("0s", rm.stop_with_conditions)
872
873     def release(self, guids = None):
874         """ Releases all ResourceManagers in the guids list.
875
876         If the argument 'guids' is not given, all RMs registered
877         in the experiment are released.
878
879             :param guids: List of RM guids
880             :type guids: list
881
882         """
883         if isinstance(guids, int):
884             guids = [guids]
885
886         if not guids:
887             guids = self.resources
888
889         for guid in guids:
890             rm = self.get_resource(guid)
891             self.schedule("0s", rm.release)
892
893         self.wait_released(guids)
894
895         for guid in guids:
896             if self.get(guid, "hardRelease"):
897                 self.remove_resource(guid)
898         
899     def shutdown(self):
900         """ Releases all resources and stops the ExperimentController
901
902         """
903         # If there was a major failure we can't exit gracefully
904         if self._state == ECState.FAILED:
905             raise RuntimeError("EC failure. Can not exit gracefully")
906
907         # Remove all pending tasks from the scheduler queue
908         for tid in list(self._scheduler.pending):
909             self._scheduler.remove(tid)
910
911         # Remove pending tasks from the workers queue
912         self._runner.empty()
913
914         self.release()
915
916         # Mark the EC state as TERMINATED
917         self._state = ECState.TERMINATED
918
919         # Stop processing thread
920         self._stop = True
921
922         # Notify condition to wake up the processing thread
923         self._notify()
924         
925         if self._thread.is_alive():
926            self._thread.join()
927
928     def schedule(self, date, callback, track = False):
929         """ Schedules a callback to be executed at time 'date'.
930
931             :param date: string containing execution time for the task.
932                     It can be expressed as an absolute time, using
933                     timestamp format, or as a relative time matching
934                     ^\d+.\d+(h|m|s|ms|us)$
935
936             :param callback: code to be executed for the task. Must be a
937                         Python function, and receives args and kwargs
938                         as arguments.
939
940             :param track: if set to True, the task will be retrievable with
941                     the get_task() method
942
943             :return : The Id of the task
944             :rtype: int
945             
946         """
947         timestamp = stabsformat(date)
948         task = Task(timestamp, callback)
949         task = self._scheduler.schedule(task)
950
951         if track:
952             self._tasks[task.id] = task
953
954         # Notify condition to wake up the processing thread
955         self._notify()
956
957         return task.id
958      
959     def _process(self):
960         """ Process scheduled tasks.
961
962         .. note::
963         
964         Tasks are scheduled by invoking the schedule method with a target 
965         callback and an execution time. 
966         The schedule method creates a new Task object with that callback 
967         and execution time, and pushes it into the '_scheduler' queue. 
968         The execution time and the order of arrival of tasks are used 
969         to order the tasks in the queue.
970
971         The _process method is executed in an independent thread held by 
972         the ExperimentController for as long as the experiment is running.
973         This method takes tasks from the '_scheduler' queue in a loop 
974         and processes them in parallel using multithreading. 
975         The environmental variable NEPI_NTHREADS can be used to control
976         the number of threads used to process tasks. The default value is 
977         50.
978
979         To execute tasks in parallel, a ParallelRunner (PR) object is used.
980         This object keeps a pool of threads (workers), and a queue of tasks
981         scheduled for 'immediate' execution. 
982         
983         On each iteration, the '_process' loop will take the next task that 
984         is scheduled for 'future' execution from the '_scheduler' queue, 
985         and if the execution time of that task is >= to the current time, 
986         it will push that task into the PR for 'immediate execution'. 
987         As soon as a worker is free, the PR will assign the next task to
988         that worker.
989
990         Upon receiving a task to execute, each PR worker (thread) will 
991         invoke the  _execute method of the EC, passing the task as 
992         argument.         
993         The _execute method will then invoke task.callback inside a 
994         try/except block. If an exception is raised by the tasks.callback, 
995         it will be trapped by the try block, logged to standard error 
996         (usually the console), and the task will be marked as failed.
997
998         """
999
1000         self._runner.start()
1001
1002         while not self._stop:
1003             try:
1004                 self._cond.acquire()
1005
1006                 task = self._scheduler.next()
1007                 
1008                 if not task:
1009                     # No task to execute. Wait for a new task to be scheduled.
1010                     self._cond.wait()
1011                 else:
1012                     # The task timestamp is in the future. Wait for timeout 
1013                     # or until another task is scheduled.
1014                     now = tnow()
1015                     if now < task.timestamp:
1016                         # Calculate timeout in seconds
1017                         timeout = tdiffsec(task.timestamp, now)
1018
1019                         # Re-schedule task with the same timestamp
1020                         self._scheduler.schedule(task)
1021                         
1022                         task = None
1023
1024                         # Wait timeout or until a new task awakes the condition
1025                         self._cond.wait(timeout)
1026                
1027                 self._cond.release()
1028
1029                 if task:
1030                     # Process tasks in parallel
1031                     self._runner.put(self._execute, task)
1032             except: 
1033                 import traceback
1034                 err = traceback.format_exc()
1035                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1036
1037                 # Set the EC to FAILED state 
1038                 self._state = ECState.FAILED
1039             
1040                 # Set the FailureManager failure level to EC failure
1041                 self._fm.set_ec_failure()
1042
1043         self.logger.debug("Exiting the task processing loop ... ")
1044         
1045         self._runner.sync()
1046         self._runner.destroy()
1047
1048     def _execute(self, task):
1049         """ Executes a single task. 
1050
1051             :param task: Object containing the callback to execute
1052             :type task: Task
1053
1054         """
1055         try:
1056             # Invoke callback
1057             task.result = task.callback()
1058             task.status = TaskStatus.DONE
1059         except:
1060             import traceback
1061             err = traceback.format_exc()
1062             task.result = err
1063             task.status = TaskStatus.ERROR
1064             
1065             self.logger.error("Error occurred while executing task: %s" % err)
1066
1067     def _notify(self):
1068         """ Awakes the processing thread if it is blocked waiting 
1069         for new tasks to arrive
1070         
1071         """
1072         self._cond.acquire()
1073         self._cond.notify()
1074         self._cond.release()
1075