1357e4f7575525a59c5387ce2c40a6243f817957
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29
30 # TODO: use multiprocessing instead of threading
31 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
32
33 import functools
34 import logging
35 import os
36 import sys
37 import tempfile
38 import time
39 import threading
40 import weakref
41
42 class FailureLevel(object):
43     """ Describes the system failure state """
44     OK = 1
45     RM_FAILURE = 2
46     EC_FAILURE = 3
47
48 class FailureManager(object):
49     """ The FailureManager is responsible for handling errors
50     and deciding whether an experiment should be aborted or not
51
52     """
53
54     def __init__(self, ec):
55         self._ec = weakref.ref(ec)
56         self._failure_level = FailureLevel.OK
57         self._abort = False
58
59     @property
60     def ec(self):
61         """ Returns the ExperimentController associated to this FailureManager 
62         
63         """
64         
65         return self._ec()
66
67     @property
68     def abort(self):
69         return self._abort
70
71     def eval_failure(self, guid):
72         if self._failure_level == FailureLevel.OK:
73             rm = self.ec.get_resource(guid)
74             state = rm.state
75             critical = rm.get("critical")
76
77             if state == ResourceState.FAILED and critical:
78                 self._failure_level = FailureLevel.RM_FAILURE
79                 self._abort = True
80                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
81                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
82
83     def set_ec_failure(self):
84         self._failure_level = FailureLevel.EC_FAILURE
85
86 class ECState(object):
87     """ Possible states for an ExperimentController
88    
89     """
90     RUNNING = 1
91     FAILED = 2
92     TERMINATED = 3
93
94 class ExperimentController(object):
95     """
96     .. class:: Class Args :
97       
98         :param exp_id: Human readable identifier for the experiment scenario. 
99         :type exp_id: str
100
101     .. note::
102
103     An experiment, or scenario, is defined by a concrete set of resources,
104     behavior, configuration and interconnection of those resources. 
105     The Experiment Description (ED) is a detailed representation of a
106     single experiment. It contains all the necessary information to 
107     allow repeating the experiment. NEPI allows to describe
108     experiments by registering components (resources), configuring them
109     and interconnecting them.
110     
111     A same experiment (scenario) can be executed many times, generating 
112     different results. We call an experiment execution (instance) a 'run'.
113
114     The ExperimentController (EC), is the entity responsible of
115     managing an experiment run. The same scenario can be 
116     recreated (and re-run) by instantiating an EC and recreating 
117     the same experiment description. 
118
119     In NEPI, an experiment is represented as a graph of interconnected
120     resources. A resource is a generic concept in the sense that any
121     component taking part of an experiment, whether physical of
122     virtual, is considered a resource. A resources could be a host, 
123     a virtual machine, an application, a simulator, a IP address.
124
125     A ResourceManager (RM), is the entity responsible for managing a 
126     single resource. ResourceManagers are specific to a resource
127     type (i.e. An RM to control a Linux application will not be
128     the same as the RM used to control a ns-3 simulation).
129     To support a new type of resource in NEPI, a new RM must be 
130     implemented. NEPI already provides a variety of
131     RMs to control basic resources, and new can be extended from
132     the existing ones.
133
134     Through the EC interface the user can create ResourceManagers (RMs),
135     configure them and interconnect them, to describe an experiment.
136     Describing an experiment through the EC does not run the experiment.
137     Only when the 'deploy()' method is invoked on the EC, the EC will take 
138     actions to transform the 'described' experiment into a 'running' experiment.
139
140     While the experiment is running, it is possible to continue to
141     create/configure/connect RMs, and to deploy them to involve new
142     resources in the experiment (this is known as 'interactive' deployment).
143     
144     An experiments in NEPI is identified by a string id, 
145     which is either given by the user, or automatically generated by NEPI.  
146     The purpose of this identifier is to separate files and results that 
147     belong to different experiment scenarios. 
148     However, since a same 'experiment' can be run many times, the experiment
149     id is not enough to identify an experiment instance (run).
150     For this reason, the ExperimentController has two identifier, the 
151     exp_id, which can be re-used in different ExperimentController,
152     and the run_id, which is unique to one ExperimentController instance, and
153     is automatically generated by NEPI.
154    
155     """
156
157     @classmethod
158     def load(cls, filepath, format = SFormats.XML):
159         serializer = ECSerializer()
160         ec = serializer.load(filepath)
161         return ec
162
163     def __init__(self, exp_id = None, local_dir = None, persist = False):
164         """ ExperimentController entity to model an execute a network experiment.
165         
166         :param exp_id: Human readable name to identify the experiment
167         :type name: str
168
169         :param local_dir: Path to local directory where to store experiment
170             related files
171         :type name: str
172
173         :param persist: Save an XML description of the experiment after 
174         completion at local_dir
175         :type name: bool
176
177         """
178
179         super(ExperimentController, self).__init__()
180
181         # Logging
182         self._logger = logging.getLogger("ExperimentController")
183
184         # Run identifier. It identifies a concrete execution instance (run) 
185         # of an experiment.
186         # Since a same experiment (same configuration) can be executed many 
187         # times, this run_id permits to separate result files generated on 
188         # different experiment executions
189         self._run_id = tsformat()
190
191         # Experiment identifier. Usually assigned by the user
192         # Identifies the experiment scenario (i.e. configuration, 
193         # resources used, etc)
194         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
195
196         # Local path where to store experiment related files (results, etc)
197         if not local_dir:
198             local_dir = tempfile.mkdtemp()
199
200         self._local_dir = local_dir
201         self._exp_dir = os.path.join(local_dir, self.exp_id)
202         self._run_dir = os.path.join(self.exp_dir, self.run_id)
203
204         # If True persist the experiment controller in XML format, after completion
205         self._persist = persist
206
207         # generator of globally unique ids
208         self._guid_generator = guid.GuidGenerator()
209         
210         # Resource managers
211         self._resources = dict()
212
213         # Scheduler. It a queue that holds tasks scheduled for
214         # execution, and yields the next task to be executed 
215         # ordered by execution and arrival time
216         self._scheduler = HeapScheduler()
217
218         # Tasks
219         self._tasks = dict()
220
221         # RM groups (for deployment) 
222         self._groups = dict()
223
224         # generator of globally unique id for groups
225         self._group_id_generator = guid.GuidGenerator()
226
227         # Flag to stop processing thread
228         self._stop = False
229     
230         # Entity in charge of managing system failures
231         self._fm = FailureManager(self)
232
233         # EC state
234         self._state = ECState.RUNNING
235
236         # The runner is a pool of threads used to parallelize 
237         # execution of tasks
238         self._nthreads = 20
239         self._runner = None
240
241         # Event processing thread
242         self._cond = threading.Condition()
243         self._thread = threading.Thread(target = self._process)
244         self._thread.setDaemon(True)
245         self._thread.start()
246         
247     @property
248     def logger(self):
249         """ Returns the logger instance of the Experiment Controller
250
251         """
252         return self._logger
253
254     @property
255     def failure_level(self):
256         """ Returns the level of FAILURE of th experiment
257
258         """
259
260         return self._fm._failure_level
261
262     @property
263     def ecstate(self):
264         """ Returns the state of the Experiment Controller
265
266         """
267         return self._state
268
269     @property
270     def exp_id(self):
271         """ Returns the experiment id assigned by the user
272
273         """
274         return self._exp_id
275
276     @property
277     def run_id(self):
278         """ Returns the experiment instance (run) identifier (automatically 
279         generated)
280
281         """
282         return self._run_id
283
284     @property
285     def nthreads(self):
286         """ Returns the number of processing nthreads used
287
288         """
289         return self._nthreads
290
291     @property
292     def local_dir(self):
293         """ Root local directory for experiment files
294
295         """
296         return self._local_dir
297
298     @property
299     def exp_dir(self):
300         """ Local directory to store results and other files related to the 
301         experiment.
302
303         """
304         return self._exp_dir
305
306     @property
307     def run_dir(self):
308         """ Local directory to store results and other files related to the 
309         experiment run.
310
311         """
312         return self._run_dir
313
314     @property
315     def persist(self):
316         """ If Trie persist the ExperimentController to XML format upon completion
317
318         """
319         return self._persist
320
321     @property
322     def abort(self):
323         """ Returns True if the experiment has failed and should be interrupted,
324         False otherwise.
325
326         """
327         return self._fm.abort
328
329     def inform_failure(self, guid):
330         """ Reports a failure in a RM to the EC for evaluation
331
332             :param guid: Resource id
333             :type guid: int
334
335         """
336
337         return self._fm.eval_failure(guid)
338
339     def wait_finished(self, guids):
340         """ Blocking method that waits until all RMs in the 'guids' list 
341         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
342         RELEASED ), or until a failure in the experiment occurs 
343         (i.e. abort == True) 
344         
345             :param guids: List of guids
346             :type guids: list
347
348         """
349
350         def quit():
351             return self.abort
352
353         return self.wait(guids, state = ResourceState.STOPPED, 
354                 quit = quit)
355
356     def wait_started(self, guids):
357         """ Blocking method that waits until all RMs in the 'guids' list 
358         have reached a state >= STARTED, or until a failure in the 
359         experiment occurs (i.e. abort == True) 
360         
361             :param guids: List of guids
362             :type guids: list
363
364         """
365
366         def quit():
367             return self.abort
368
369         return self.wait(guids, state = ResourceState.STARTED, 
370                 quit = quit)
371
372     def wait_released(self, guids):
373         """ Blocking method that waits until all RMs in the 'guids' list 
374         have reached a state == RELEASED, or until the EC fails 
375         
376             :param guids: List of guids
377             :type guids: list
378
379         """
380
381         def quit():
382             return self._state == ECState.FAILED
383
384         return self.wait(guids, state = ResourceState.RELEASED, 
385                 quit = quit)
386
387     def wait_deployed(self, guids):
388         """ Blocking method that waits until all RMs in the 'guids' list 
389         have reached a state >= READY, or until a failure in the 
390         experiment occurs (i.e. abort == True) 
391         
392             :param guids: List of guids
393             :type guids: list
394
395         """
396
397         def quit():
398             return self.abort
399
400         return self.wait(guids, state = ResourceState.READY, 
401                 quit = quit)
402
403     def wait(self, guids, state, quit):
404         """ Blocking method that waits until all RMs in the 'guids' list 
405         have reached a state >= 'state', or until the 'quit' callback
406         yields True
407            
408             :param guids: List of guids
409             :type guids: list
410         
411         """
412         if isinstance(guids, int):
413             guids = [guids]
414
415         # Make a copy to avoid modifying the original guids list
416         guids = list(guids)
417
418         while True:
419             # If there are no more guids to wait for
420             # or the quit function returns True, exit the loop
421             if len(guids) == 0 or quit():
422                 break
423
424             # If a guid reached one of the target states, remove it from list
425             guid = guids.pop()
426             rm = self.get_resource(guid)
427             rstate = rm.state
428             
429             if rstate >= state:
430                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
431                     rm.get_rtype(), guid, rstate, state))
432             else:
433                 # Debug...
434                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
435                     guid, rstate, state))
436
437                 guids.append(guid)
438
439                 time.sleep(0.5)
440
441     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
442         plotter = ECPlotter()
443         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
444                 show = show)
445         return fpath
446
447     def serialize(self, format = SFormats.XML):
448         serializer = ECSerializer()
449         sec = serializer.load(self, format = format)
450         return sec
451
452     def save(self, dirpath = None, format = SFormats.XML):
453         serializer = ECSerializer()
454         path = serializer.save(self, dirpath  = None, format = format)
455         return path
456
457     def get_task(self, tid):
458         """ Returns a task by its id
459
460             :param tid: Id of the task
461             :type tid: int
462             
463             :rtype: Task
464             
465         """
466         return self._tasks.get(tid)
467
468     def get_resource(self, guid):
469         """ Returns a registered ResourceManager by its guid
470
471             :param guid: Id of the resource
472             :type guid: int
473             
474             :rtype: ResourceManager
475             
476         """
477         rm = self._resources.get(guid)
478         return rm
479
480     def get_resources_by_type(self, rtype):
481         """ Returns a registered ResourceManager by its guid
482
483             :param rtype: Resource type
484             :type rtype: string
485             
486             :rtype: list of ResourceManagers
487             
488         """
489         rms = []
490         for guid, rm in self._resources.iteritems():
491             if rm.get_rtype() == type: 
492                 rms.append(rm)
493         return rms
494
495     def remove_resource(self, guid):
496         del self._resources[guid]
497
498     @property
499     def resources(self):
500         """ Returns the set() of guids of all the ResourceManager
501
502             :return: Set of all RM guids
503             :rtype: set
504
505         """
506         keys = self._resources.keys()
507
508         return keys
509
510     def register_resource(self, rtype, guid = None):
511         """ Registers a new ResourceManager of type 'rtype' in the experiment
512         
513         This method will assign a new 'guid' for the RM, if no guid
514         is specified.
515
516             :param rtype: Type of the RM
517             :type rtype: str
518
519             :return: Guid of the RM
520             :rtype: int
521             
522         """
523         # Get next available guid
524         guid = self._guid_generator.next(guid)
525         
526         # Instantiate RM
527         rm = ResourceFactory.create(rtype, self, guid)
528
529         # Store RM
530         self._resources[guid] = rm
531
532         return guid
533
534     def get_attributes(self, guid):
535         """ Returns all the attributes of the RM with guid 'guid'
536
537             :param guid: Guid of the RM
538             :type guid: int
539
540             :return: List of attributes
541             :rtype: list
542
543         """
544         rm = self.get_resource(guid)
545         return rm.get_attributes()
546
547     def get_attribute(self, guid, name):
548         """ Returns the attribute 'name' of the RM with guid 'guid'
549
550             :param guid: Guid of the RM
551             :type guid: int
552
553             :param name: Name of the attribute
554             :type name: str
555
556             :return: The attribute with name 'name'
557             :rtype: Attribute
558
559         """
560         rm = self.get_resource(guid)
561         return rm.get_attribute(name)
562
563     def register_connection(self, guid1, guid2):
564         """ Registers a connection between a RM with guid 'guid1'
565         and another RM with guid 'guid2'. 
566     
567         The order of the in which the two guids are provided is not
568         important, since the connection relationship is symmetric.
569
570             :param guid1: First guid to connect
571             :type guid1: ResourceManager
572
573             :param guid2: Second guid to connect
574             :type guid: ResourceManager
575
576         """
577         rm1 = self.get_resource(guid1)
578         rm2 = self.get_resource(guid2)
579
580         rm1.register_connection(guid2)
581         rm2.register_connection(guid1)
582
583     def register_condition(self, guids1, action, guids2, state,
584             time = None):
585         """ Registers an action START, STOP or DEPLOY for all RM on list
586         guids1 to occur at time 'time' after all elements in list guids2 
587         have reached state 'state'.
588
589             :param guids1: List of guids of RMs subjected to action
590             :type guids1: list
591
592             :param action: Action to perform (either START, STOP or DEPLOY)
593             :type action: ResourceAction
594
595             :param guids2: List of guids of RMs to we waited for
596             :type guids2: list
597
598             :param state: State to wait for on RMs of list guids2 (STARTED,
599                 STOPPED, etc)
600             :type state: ResourceState
601
602             :param time: Time to wait after guids2 has reached status 
603             :type time: string
604
605         """
606         if isinstance(guids1, int):
607             guids1 = [guids1]
608         if isinstance(guids2, int):
609             guids2 = [guids2]
610
611         for guid1 in guids1:
612             rm = self.get_resource(guid1)
613             rm.register_condition(action, guids2, state, time)
614
615     def enable_trace(self, guid, name):
616         """ Enables a trace to be collected during the experiment run
617
618             :param name: Name of the trace
619             :type name: str
620
621         """
622         rm = self.get_resource(guid)
623         rm.enable_trace(name)
624
625     def trace_enabled(self, guid, name):
626         """ Returns True if the trace of name 'name' is enabled
627
628             :param name: Name of the trace
629             :type name: str
630
631         """
632         rm = self.get_resource(guid)
633         return rm.trace_enabled(name)
634
635     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
636         """ Returns information on a collected trace, the trace stream or 
637         blocks (chunks) of the trace stream
638
639             :param name: Name of the trace
640             :type name: str
641
642             :param attr: Can be one of:
643                          - TraceAttr.ALL (complete trace content), 
644                          - TraceAttr.STREAM (block in bytes to read starting 
645                                 at offset),
646                          - TraceAttr.PATH (full path to the trace file),
647                          - TraceAttr.SIZE (size of trace file). 
648             :type attr: str
649
650             :param block: Number of bytes to retrieve from trace, when attr is 
651                 TraceAttr.STREAM 
652             :type name: int
653
654             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
655             :type name: int
656
657             :rtype: str
658
659         """
660         rm = self.get_resource(guid)
661         return rm.trace(name, attr, block, offset)
662
663     def get_traces(self, guid):
664         """ Returns the list of the trace names of the RM with guid 'guid'
665
666             :param guid: Guid of the RM
667             :type guid: int
668
669             :return: List of trace names
670             :rtype: list
671
672         """
673         rm = self.get_resource(guid)
674         return rm.get_traces()
675
676
677     def discover(self, guid):
678         """ Discovers an available resource matching the criteria defined
679         by the RM with guid 'guid', and associates that resource to the RM
680
681         Not all RM types require (or are capable of) performing resource 
682         discovery. For the RM types which are not capable of doing so, 
683         invoking this method does not have any consequences. 
684
685             :param guid: Guid of the RM
686             :type guid: int
687
688         """
689         rm = self.get_resource(guid)
690         return rm.discover()
691
692     def provision(self, guid):
693         """ Provisions the resource associated to the RM with guid 'guid'.
694
695         Provisioning means making a resource 'accessible' to the user. 
696         Not all RM types require (or are capable of) performing resource 
697         provisioning. For the RM types which are not capable of doing so, 
698         invoking this method does not have any consequences. 
699
700             :param guid: Guid of the RM
701             :type guid: int
702
703         """
704         rm = self.get_resource(guid)
705         return rm.provision()
706
707     def get(self, guid, name):
708         """ Returns the value of the attribute with name 'name' on the
709         RM with guid 'guid'
710
711             :param guid: Guid of the RM
712             :type guid: int
713
714             :param name: Name of the attribute 
715             :type name: str
716
717             :return: The value of the attribute with name 'name'
718
719         """
720         rm = self.get_resource(guid)
721         return rm.get(name)
722
723     def set(self, guid, name, value):
724         """ Modifies the value of the attribute with name 'name' on the 
725         RM with guid 'guid'.
726
727             :param guid: Guid of the RM
728             :type guid: int
729
730             :param name: Name of the attribute
731             :type name: str
732
733             :param value: Value of the attribute
734
735         """
736         rm = self.get_resource(guid)
737         rm.set(name, value)
738
739     def get_global(self, rtype, name):
740         """ Returns the value of the global attribute with name 'name' on the
741         RMs of rtype 'rtype'.
742
743             :param guid: Guid of the RM
744             :type guid: int
745
746             :param name: Name of the attribute 
747             :type name: str
748
749             :return: The value of the attribute with name 'name'
750
751         """
752         rclass = ResourceFactory.get_resource_type(rtype)
753         return rclass.get_global(name)
754
755     def set_global(self, rtype, name, value):
756         """ Modifies the value of the global attribute with name 'name' on the 
757         RMs of with rtype 'rtype'.
758
759             :param guid: Guid of the RM
760             :type guid: int
761
762             :param name: Name of the attribute
763             :type name: str
764
765             :param value: Value of the attribute
766
767         """
768         rclass = ResourceFactory.get_resource_type(rtype)
769         return rclass.set_global(name, value)
770
771     def state(self, guid, hr = False):
772         """ Returns the state of a resource
773
774             :param guid: Resource guid
775             :type guid: integer
776
777             :param hr: Human readable. Forces return of a 
778                 status string instead of a number 
779             :type hr: boolean
780
781         """
782         rm = self.get_resource(guid)
783         state = rm.state
784
785         if hr:
786             return ResourceState2str.get(state)
787
788         return state
789
790     def stop(self, guid):
791         """ Stops the RM with guid 'guid'
792
793         Stopping a RM means that the resource it controls will
794         no longer take part of the experiment.
795
796             :param guid: Guid of the RM
797             :type guid: int
798
799         """
800         rm = self.get_resource(guid)
801         return rm.stop()
802
803     def start(self, guid):
804         """ Starts the RM with guid 'guid'
805
806         Starting a RM means that the resource it controls will
807         begin taking part of the experiment.
808
809             :param guid: Guid of the RM
810             :type guid: int
811
812         """
813         rm = self.get_resource(guid)
814         return rm.start()
815
816     def get_start_time(self, guid):
817         """ Returns the start time of the RM as a timestamp """
818         rm = self.get_resource(guid)
819         return rm.start_time
820
821     def get_stop_time(self, guid):
822         """ Returns the stop time of the RM as a timestamp """
823         rm = self.get_resource(guid)
824         return rm.stop_time
825
826     def get_discover_time(self, guid):
827         """ Returns the discover time of the RM as a timestamp """
828         rm = self.get_resource(guid)
829         return rm.discover_time
830
831     def get_provision_time(self, guid):
832         """ Returns the provision time of the RM as a timestamp """
833         rm = self.get_resource(guid)
834         return rm.provision_time
835
836     def get_ready_time(self, guid):
837         """ Returns the deployment time of the RM as a timestamp """
838         rm = self.get_resource(guid)
839         return rm.ready_time
840
841     def get_release_time(self, guid):
842         """ Returns the release time of the RM as a timestamp """
843         rm = self.get_resource(guid)
844         return rm.release_time
845
846     def get_failed_time(self, guid):
847         """ Returns the time failure occured for the RM as a timestamp """
848         rm = self.get_resource(guid)
849         return rm.failed_time
850
851     def set_with_conditions(self, name, value, guids1, guids2, state,
852             time = None):
853         """ Modifies the value of attribute with name 'name' on all RMs 
854         on the guids1 list when time 'time' has elapsed since all 
855         elements in guids2 list have reached state 'state'.
856
857             :param name: Name of attribute to set in RM
858             :type name: string
859
860             :param value: Value of attribute to set in RM
861             :type name: string
862
863             :param guids1: List of guids of RMs subjected to action
864             :type guids1: list
865
866             :param action: Action to register (either START or STOP)
867             :type action: ResourceAction
868
869             :param guids2: List of guids of RMs to we waited for
870             :type guids2: list
871
872             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
873             :type state: ResourceState
874
875             :param time: Time to wait after guids2 has reached status 
876             :type time: string
877
878         """
879         if isinstance(guids1, int):
880             guids1 = [guids1]
881         if isinstance(guids2, int):
882             guids2 = [guids2]
883
884         for guid1 in guids1:
885             rm = self.get_resource(guid)
886             rm.set_with_conditions(name, value, guids2, state, time)
887
888     def deploy(self, guids = None, wait_all_ready = True, group = None):
889         """ Deploys all ResourceManagers in the guids list. 
890         
891         If the argument 'guids' is not given, all RMs with state NEW
892         are deployed.
893
894             :param guids: List of guids of RMs to deploy
895             :type guids: list
896
897             :param wait_all_ready: Wait until all RMs are ready in
898                 order to start the RMs
899             :type guid: int
900
901             :param group: Id of deployment group in which to deploy RMs
902             :type group: int
903
904         """
905         self.logger.debug(" ------- DEPLOY START ------ ")
906
907         if not guids:
908             # If no guids list was passed, all 'NEW' RMs will be deployed
909             guids = []
910             for guid, rm in self._resources.iteritems():
911                 if rm.state == ResourceState.NEW:
912                     guids.append(guid)
913                 
914         if isinstance(guids, int):
915             guids = [guids]
916
917         # Create deployment group
918         # New guids can be added to a same deployment group later on
919         new_group = False
920         if not group:
921             new_group = True
922             group = self._group_id_generator.next()
923
924         if group not in self._groups:
925             self._groups[group] = []
926
927         self._groups[group].extend(guids)
928
929         def wait_all_and_start(group):
930             # Function that checks if all resources are READY
931             # before scheduling a start_with_conditions for each RM
932             reschedule = False
933             
934             # Get all guids in group
935             guids = self._groups[group]
936
937             for guid in guids:
938                 if self.state(guid) < ResourceState.READY:
939                     reschedule = True
940                     break
941
942             if reschedule:
943                 callback = functools.partial(wait_all_and_start, group)
944                 self.schedule("1s", callback)
945             else:
946                 # If all resources are ready, we schedule the start
947                 for guid in guids:
948                     rm = self.get_resource(guid)
949                     self.schedule("0s", rm.start_with_conditions)
950
951                     if rm.conditions.get(ResourceAction.STOP):
952                         # Only if the RM has STOP conditions we
953                         # schedule a stop. Otherwise the RM will stop immediately
954                         self.schedule("0s", rm.stop_with_conditions)
955
956         if wait_all_ready and new_group:
957             # Schedule a function to check that all resources are
958             # READY, and only then schedule the start.
959             # This aims at reducing the number of tasks looping in the 
960             # scheduler. 
961             # Instead of having many start tasks, we will have only one for 
962             # the whole group.
963             callback = functools.partial(wait_all_and_start, group)
964             self.schedule("0s", callback)
965
966         for guid in guids:
967             rm = self.get_resource(guid)
968             rm.deployment_group = group
969             self.schedule("0s", rm.deploy_with_conditions)
970
971             if not wait_all_ready:
972                 self.schedule("0s", rm.start_with_conditions)
973
974                 if rm.conditions.get(ResourceAction.STOP):
975                     # Only if the RM has STOP conditions we
976                     # schedule a stop. Otherwise the RM will stop immediately
977                     self.schedule("0s", rm.stop_with_conditions)
978
979     def release(self, guids = None):
980         """ Releases all ResourceManagers in the guids list.
981
982         If the argument 'guids' is not given, all RMs registered
983         in the experiment are released.
984
985             :param guids: List of RM guids
986             :type guids: list
987
988         """
989         if isinstance(guids, int):
990             guids = [guids]
991
992         if not guids:
993             guids = self.resources
994
995         for guid in guids:
996             rm = self.get_resource(guid)
997             self.schedule("0s", rm.release)
998
999         self.wait_released(guids)
1000
1001         if self.persist:
1002             self.save(dirpath = self.run_dir)
1003
1004         for guid in guids:
1005             if self.get(guid, "hardRelease"):
1006                 self.remove_resource(guid)
1007         
1008     def shutdown(self):
1009         """ Releases all resources and stops the ExperimentController
1010
1011         """
1012         # If there was a major failure we can't exit gracefully
1013         if self._state == ECState.FAILED:
1014             raise RuntimeError("EC failure. Can not exit gracefully")
1015
1016         # Remove all pending tasks from the scheduler queue
1017         for tid in list(self._scheduler.pending):
1018             self._scheduler.remove(tid)
1019
1020         # Remove pending tasks from the workers queue
1021         self._runner.empty()
1022
1023         self.release()
1024
1025         # Mark the EC state as TERMINATED
1026         self._state = ECState.TERMINATED
1027
1028         # Stop processing thread
1029         self._stop = True
1030
1031         # Notify condition to wake up the processing thread
1032         self._notify()
1033         
1034         if self._thread.is_alive():
1035            self._thread.join()
1036
1037     def schedule(self, date, callback, track = False):
1038         """ Schedules a callback to be executed at time 'date'.
1039
1040             :param date: string containing execution time for the task.
1041                     It can be expressed as an absolute time, using
1042                     timestamp format, or as a relative time matching
1043                     ^\d+.\d+(h|m|s|ms|us)$
1044
1045             :param callback: code to be executed for the task. Must be a
1046                         Python function, and receives args and kwargs
1047                         as arguments.
1048
1049             :param track: if set to True, the task will be retrievable with
1050                     the get_task() method
1051
1052             :return : The Id of the task
1053             :rtype: int
1054             
1055         """
1056         timestamp = stabsformat(date)
1057         task = Task(timestamp, callback)
1058         task = self._scheduler.schedule(task)
1059
1060         if track:
1061             self._tasks[task.id] = task
1062
1063         # Notify condition to wake up the processing thread
1064         self._notify()
1065
1066         return task.id
1067      
1068     def _process(self):
1069         """ Process scheduled tasks.
1070
1071         .. note::
1072         
1073         Tasks are scheduled by invoking the schedule method with a target 
1074         callback and an execution time. 
1075         The schedule method creates a new Task object with that callback 
1076         and execution time, and pushes it into the '_scheduler' queue. 
1077         The execution time and the order of arrival of tasks are used 
1078         to order the tasks in the queue.
1079
1080         The _process method is executed in an independent thread held by 
1081         the ExperimentController for as long as the experiment is running.
1082         This method takes tasks from the '_scheduler' queue in a loop 
1083         and processes them in parallel using multithreading. 
1084         The environmental variable NEPI_NTHREADS can be used to control
1085         the number of threads used to process tasks. The default value is 
1086         50.
1087
1088         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1089         This object keeps a pool of threads (workers), and a queue of tasks
1090         scheduled for 'immediate' execution. 
1091         
1092         On each iteration, the '_process' loop will take the next task that 
1093         is scheduled for 'future' execution from the '_scheduler' queue, 
1094         and if the execution time of that task is >= to the current time, 
1095         it will push that task into the PR for 'immediate execution'. 
1096         As soon as a worker is free, the PR will assign the next task to
1097         that worker.
1098
1099         Upon receiving a task to execute, each PR worker (thread) will 
1100         invoke the  _execute method of the EC, passing the task as 
1101         argument.         
1102         The _execute method will then invoke task.callback inside a 
1103         try/except block. If an exception is raised by the tasks.callback, 
1104         it will be trapped by the try block, logged to standard error 
1105         (usually the console), and the task will be marked as failed.
1106
1107         """
1108
1109         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1110         self._runner = ParallelRun(maxthreads = self.nthreads)
1111         self._runner.start()
1112
1113         while not self._stop:
1114             try:
1115                 self._cond.acquire()
1116
1117                 task = self._scheduler.next()
1118                 
1119                 if not task:
1120                     # No task to execute. Wait for a new task to be scheduled.
1121                     self._cond.wait()
1122                 else:
1123                     # The task timestamp is in the future. Wait for timeout 
1124                     # or until another task is scheduled.
1125                     now = tnow()
1126                     if now < task.timestamp:
1127                         # Calculate timeout in seconds
1128                         timeout = tdiffsec(task.timestamp, now)
1129
1130                         # Re-schedule task with the same timestamp
1131                         self._scheduler.schedule(task)
1132                         
1133                         task = None
1134
1135                         # Wait timeout or until a new task awakes the condition
1136                         self._cond.wait(timeout)
1137                
1138                 self._cond.release()
1139
1140                 if task:
1141                     # Process tasks in parallel
1142                     self._runner.put(self._execute, task)
1143             except: 
1144                 import traceback
1145                 err = traceback.format_exc()
1146                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1147
1148                 # Set the EC to FAILED state 
1149                 self._state = ECState.FAILED
1150             
1151                 # Set the FailureManager failure level to EC failure
1152                 self._fm.set_ec_failure()
1153
1154         self.logger.debug("Exiting the task processing loop ... ")
1155         
1156         self._runner.sync()
1157         self._runner.destroy()
1158
1159     def _execute(self, task):
1160         """ Executes a single task. 
1161
1162             :param task: Object containing the callback to execute
1163             :type task: Task
1164
1165         """
1166         try:
1167             # Invoke callback
1168             task.result = task.callback()
1169             task.status = TaskStatus.DONE
1170         except:
1171             import traceback
1172             err = traceback.format_exc()
1173             task.result = err
1174             task.status = TaskStatus.ERROR
1175             
1176             self.logger.error("Error occurred while executing task: %s" % err)
1177
1178     def _notify(self):
1179         """ Awakes the processing thread if it is blocked waiting 
1180         for new tasks to arrive
1181         
1182         """
1183         self._cond.acquire()
1184         self._cond.notify()
1185         self._cond.release()
1186