2e573dd91f0da821b7fb6df0ec4c974e973bc07f
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29 from nepi.util.netgraph import NetGraph, TopologyType 
30
31 # TODO: use multiprocessing instead of threading
32 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
33
34 import functools
35 import logging
36 import os
37 import sys
38 import tempfile
39 import time
40 import threading
41 import weakref
42
43 class FailureLevel(object):
44     """ Describes the system failure state """
45     OK = 1
46     RM_FAILURE = 2
47     EC_FAILURE = 3
48
49 class FailureManager(object):
50     """ The FailureManager is responsible for handling errors
51     and deciding whether an experiment should be aborted or not
52
53     """
54
55     def __init__(self, ec):
56         self._ec = weakref.ref(ec)
57         self._failure_level = FailureLevel.OK
58         self._abort = False
59
60     @property
61     def ec(self):
62         """ Returns the ExperimentController associated to this FailureManager 
63         
64         """
65         
66         return self._ec()
67
68     @property
69     def abort(self):
70         return self._abort
71
72     def eval_failure(self, guid):
73         if self._failure_level == FailureLevel.OK:
74             rm = self.ec.get_resource(guid)
75             state = rm.state
76             critical = rm.get("critical")
77
78             if state == ResourceState.FAILED and critical:
79                 self._failure_level = FailureLevel.RM_FAILURE
80                 self._abort = True
81                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
82                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
83
84     def set_ec_failure(self):
85         self._failure_level = FailureLevel.EC_FAILURE
86
87 class ECState(object):
88     """ Possible states for an ExperimentController
89    
90     """
91     RUNNING = 1
92     FAILED = 2
93     TERMINATED = 3
94
95 class ExperimentController(object):
96     """
97     .. class:: Class Args :
98       
99         :param exp_id: Human readable identifier for the experiment scenario. 
100         :type exp_id: str
101
102     .. note::
103
104     An experiment, or scenario, is defined by a concrete set of resources,
105     and the behavior, configuration and interconnection of those resources. 
106     The Experiment Description (ED) is a detailed representation of a
107     single experiment. It contains all the necessary information to 
108     allow repeating the experiment. NEPI allows to describe
109     experiments by registering components (resources), configuring them
110     and interconnecting them.
111     
112     A same experiment (scenario) can be executed many times, generating 
113     different results. We call an experiment execution (instance) a 'run'.
114
115     The ExperimentController (EC), is the entity responsible of
116     managing an experiment run. The same scenario can be 
117     recreated (and re-run) by instantiating an EC and recreating 
118     the same experiment description. 
119
120     An experiment is represented as a graph of interconnected
121     resources. A resource is a generic concept in the sense that any
122     component taking part of an experiment, whether physical of
123     virtual, is considered a resource. A resources could be a host, 
124     a virtual machine, an application, a simulator, a IP address.
125
126     A ResourceManager (RM), is the entity responsible for managing a 
127     single resource. ResourceManagers are specific to a resource
128     type (i.e. An RM to control a Linux application will not be
129     the same as the RM used to control a ns-3 simulation).
130     To support a new type of resource, a new RM must be implemented. 
131     NEPI already provides a variety of RMs to control basic resources, 
132     and new can be extended from the existing ones.
133
134     Through the EC interface the user can create ResourceManagers (RMs),
135     configure them and interconnect them, to describe an experiment.
136     Describing an experiment through the EC does not run the experiment.
137     Only when the 'deploy()' method is invoked on the EC, the EC will take 
138     actions to transform the 'described' experiment into a 'running' experiment.
139
140     While the experiment is running, it is possible to continue to
141     create/configure/connect RMs, and to deploy them to involve new
142     resources in the experiment (this is known as 'interactive' deployment).
143     
144     An experiments in NEPI is identified by a string id, 
145     which is either given by the user, or automatically generated by NEPI.  
146     The purpose of this identifier is to separate files and results that 
147     belong to different experiment scenarios. 
148     However, since a same 'experiment' can be run many times, the experiment
149     id is not enough to identify an experiment instance (run).
150     For this reason, the ExperimentController has two identifier, the 
151     exp_id, which can be re-used in different ExperimentController,
152     and the run_id, which is unique to one ExperimentController instance, and
153     is automatically generated by NEPI.
154    
155     """
156
157     @classmethod
158     def load(cls, filepath, format = SFormats.XML):
159         serializer = ECSerializer()
160         ec = serializer.load(filepath)
161         return ec
162
163     def __init__(self, exp_id = None, local_dir = None, persist = False,
164             add_node_callback = None, add_edge_callback = None, **kwargs):
165         """ ExperimentController entity to model an execute a network 
166         experiment.
167         
168         :param exp_id: Human readable name to identify the experiment
169         :type exp_id: str
170
171         :param local_dir: Path to local directory where to store experiment
172             related files
173         :type local_dir: str
174
175         :param persist: Save an XML description of the experiment after 
176         completion at local_dir
177         :type persist: bool
178
179         :param add_node_callback: Callback to invoke for node instantiation
180         when automatic topology creation mode is used 
181         :type add_node_callback: function
182
183         :param add_edge_callback: Callback to invoke for edge instantiation 
184         when automatic topology creation mode is used 
185         :type add_edge_callback: function
186
187         """
188         super(ExperimentController, self).__init__()
189
190         # Logging
191         self._logger = logging.getLogger("ExperimentController")
192
193         # Run identifier. It identifies a concrete execution instance (run) 
194         # of an experiment.
195         # Since a same experiment (same configuration) can be executed many 
196         # times, this run_id permits to separate result files generated on 
197         # different experiment executions
198         self._run_id = tsformat()
199
200         # Experiment identifier. Usually assigned by the user
201         # Identifies the experiment scenario (i.e. configuration, 
202         # resources used, etc)
203         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
204
205         # Local path where to store experiment related files (results, etc)
206         if not local_dir:
207             local_dir = tempfile.gettempdir() # /tmp
208
209         self._local_dir = local_dir
210         self._exp_dir = os.path.join(local_dir, self.exp_id)
211         self._run_dir = os.path.join(self.exp_dir, self.run_id)
212
213         # If True persist the experiment controller in XML format, after completion
214         self._persist = persist
215
216         # generator of globally unique ids
217         self._guid_generator = guid.GuidGenerator()
218         
219         # Resource managers
220         self._resources = dict()
221
222         # Scheduler. It a queue that holds tasks scheduled for
223         # execution, and yields the next task to be executed 
224         # ordered by execution and arrival time
225         self._scheduler = HeapScheduler()
226
227         # Tasks
228         self._tasks = dict()
229
230         # RM groups (for deployment) 
231         self._groups = dict()
232
233         # generator of globally unique id for groups
234         self._group_id_generator = guid.GuidGenerator()
235
236         # Flag to stop processing thread
237         self._stop = False
238     
239         # Entity in charge of managing system failures
240         self._fm = FailureManager(self)
241
242         # EC state
243         self._state = ECState.RUNNING
244
245         # Automatically construct experiment description 
246         self._netgraph = None
247         if add_node_callback or add_edge_callback or kwargs.get("topology"):
248             self._build_from_netgraph(add_node_callback, add_edge_callback, 
249                     **kwargs)
250
251         # The runner is a pool of threads used to parallelize 
252         # execution of tasks
253         self._nthreads = 20
254         self._runner = None
255
256         # Event processing thread
257         self._cond = threading.Condition()
258         self._thread = threading.Thread(target = self._process)
259         self._thread.setDaemon(True)
260         self._thread.start()
261         
262     @property
263     def logger(self):
264         """ Returns the logger instance of the Experiment Controller
265
266         """
267         return self._logger
268
269     @property
270     def failure_level(self):
271         """ Returns the level of FAILURE of th experiment
272
273         """
274
275         return self._fm._failure_level
276
277     @property
278     def ecstate(self):
279         """ Returns the state of the Experiment Controller
280
281         """
282         return self._state
283
284     @property
285     def exp_id(self):
286         """ Returns the experiment id assigned by the user
287
288         """
289         return self._exp_id
290
291     @property
292     def run_id(self):
293         """ Returns the experiment instance (run) identifier (automatically 
294         generated)
295
296         """
297         return self._run_id
298
299     @property
300     def nthreads(self):
301         """ Returns the number of processing nthreads used
302
303         """
304         return self._nthreads
305
306     @property
307     def local_dir(self):
308         """ Root local directory for experiment files
309
310         """
311         return self._local_dir
312
313     @property
314     def exp_dir(self):
315         """ Local directory to store results and other files related to the 
316         experiment.
317
318         """
319         return self._exp_dir
320
321     @property
322     def run_dir(self):
323         """ Local directory to store results and other files related to the 
324         experiment run.
325
326         """
327         return self._run_dir
328
329     @property
330     def persist(self):
331         """ If True, persists the ExperimentController to XML format upon 
332         experiment completion
333
334         """
335         return self._persist
336
337     @property
338     def netgraph(self):
339         """ Return NetGraph instance if experiment description was automatically 
340         generated
341
342         """
343         return self._netgraph
344
345     @property
346     def abort(self):
347         """ Returns True if the experiment has failed and should be interrupted,
348         False otherwise.
349
350         """
351         return self._fm.abort
352
353     def inform_failure(self, guid):
354         """ Reports a failure in a RM to the EC for evaluation
355
356             :param guid: Resource id
357             :type guid: int
358
359         """
360
361         return self._fm.eval_failure(guid)
362
363     def wait_finished(self, guids):
364         """ Blocking method that waits until all RMs in the 'guids' list 
365         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
366         RELEASED ), or until a failure in the experiment occurs 
367         (i.e. abort == True) 
368         
369             :param guids: List of guids
370             :type guids: list
371
372         """
373
374         def quit():
375             return self.abort
376
377         return self.wait(guids, state = ResourceState.STOPPED, 
378                 quit = quit)
379
380     def wait_started(self, guids):
381         """ Blocking method that waits until all RMs in the 'guids' list 
382         have reached a state >= STARTED, or until a failure in the 
383         experiment occurs (i.e. abort == True) 
384         
385             :param guids: List of guids
386             :type guids: list
387
388         """
389
390         def quit():
391             return self.abort
392
393         return self.wait(guids, state = ResourceState.STARTED, 
394                 quit = quit)
395
396     def wait_released(self, guids):
397         """ Blocking method that waits until all RMs in the 'guids' list 
398         have reached a state == RELEASED, or until the EC fails 
399         
400             :param guids: List of guids
401             :type guids: list
402
403         """
404
405         def quit():
406             return self._state == ECState.FAILED
407
408         return self.wait(guids, state = ResourceState.RELEASED, 
409                 quit = quit)
410
411     def wait_deployed(self, guids):
412         """ Blocking method that waits until all RMs in the 'guids' list 
413         have reached a state >= READY, or until a failure in the 
414         experiment occurs (i.e. abort == True) 
415         
416             :param guids: List of guids
417             :type guids: list
418
419         """
420
421         def quit():
422             return self.abort
423
424         return self.wait(guids, state = ResourceState.READY, 
425                 quit = quit)
426
427     def wait(self, guids, state, quit):
428         """ Blocking method that waits until all RMs in the 'guids' list 
429         have reached a state >= 'state', or until the 'quit' callback
430         yields True
431            
432             :param guids: List of guids
433             :type guids: list
434         
435         """
436         if isinstance(guids, int):
437             guids = [guids]
438
439         # Make a copy to avoid modifying the original guids list
440         guids = list(guids)
441
442         while True:
443             # If there are no more guids to wait for
444             # or the quit function returns True, exit the loop
445             if len(guids) == 0 or quit():
446                 break
447
448             # If a guid reached one of the target states, remove it from list
449             guid = guids.pop()
450             rm = self.get_resource(guid)
451             rstate = rm.state
452             
453             if rstate >= state:
454                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
455                     rm.get_rtype(), guid, rstate, state))
456             else:
457                 # Debug...
458                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
459                     guid, rstate, state))
460
461                 guids.append(guid)
462
463                 time.sleep(0.5)
464
465     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
466         plotter = ECPlotter()
467         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
468                 show = show)
469         return fpath
470
471     def serialize(self, format = SFormats.XML):
472         serializer = ECSerializer()
473         sec = serializer.load(self, format = format)
474         return sec
475
476     def save(self, dirpath = None, format = SFormats.XML):
477         if dirpath == None:
478             dirpath = self.run_dir
479
480         try:
481             os.makedirs(dirpath)
482         except OSError:
483             pass
484
485         serializer = ECSerializer()
486         path = serializer.save(self, dirpath, format = format)
487         return path
488
489     def get_task(self, tid):
490         """ Returns a task by its id
491
492             :param tid: Id of the task
493             :type tid: int
494             
495             :rtype: Task
496             
497         """
498         return self._tasks.get(tid)
499
500     def get_resource(self, guid):
501         """ Returns a registered ResourceManager by its guid
502
503             :param guid: Id of the resource
504             :type guid: int
505             
506             :rtype: ResourceManager
507             
508         """
509         rm = self._resources.get(guid)
510         return rm
511
512     def get_resources_by_type(self, rtype):
513         """ Returns the ResourceManager objects of type rtype
514
515             :param rtype: Resource type
516             :type rtype: string
517             
518             :rtype: list of ResourceManagers
519             
520         """
521         rms = []
522         for guid, rm in self._resources.iteritems():
523             if rm.get_rtype() == rtype: 
524                 rms.append(rm)
525         return rms
526
527     def remove_resource(self, guid):
528         del self._resources[guid]
529
530     @property
531     def resources(self):
532         """ Returns the guids of all ResourceManagers 
533
534             :return: Set of all RM guids
535             :rtype: list
536
537         """
538         keys = self._resources.keys()
539
540         return keys
541
542     def filter_resources(self, rtype):
543         """ Returns the guids of all ResourceManagers of type rtype
544
545             :param rtype: Resource type
546             :type rtype: string
547             
548             :rtype: list of guids
549             
550         """
551         rms = []
552         for guid, rm in self._resources.iteritems():
553             if rm.get_rtype() == rtype: 
554                 rms.append(rm.guid)
555         return rms
556
557     def register_resource(self, rtype, guid = None):
558         """ Registers a new ResourceManager of type 'rtype' in the experiment
559         
560         This method will assign a new 'guid' for the RM, if no guid
561         is specified.
562
563             :param rtype: Type of the RM
564             :type rtype: str
565
566             :return: Guid of the RM
567             :rtype: int
568             
569         """
570         # Get next available guid
571         guid = self._guid_generator.next(guid)
572         
573         # Instantiate RM
574         rm = ResourceFactory.create(rtype, self, guid)
575
576         # Store RM
577         self._resources[guid] = rm
578
579         return guid
580
581     def get_attributes(self, guid):
582         """ Returns all the attributes of the RM with guid 'guid'
583
584             :param guid: Guid of the RM
585             :type guid: int
586
587             :return: List of attributes
588             :rtype: list
589
590         """
591         rm = self.get_resource(guid)
592         return rm.get_attributes()
593
594     def get_attribute(self, guid, name):
595         """ Returns the attribute 'name' of the RM with guid 'guid'
596
597             :param guid: Guid of the RM
598             :type guid: int
599
600             :param name: Name of the attribute
601             :type name: str
602
603             :return: The attribute with name 'name'
604             :rtype: Attribute
605
606         """
607         rm = self.get_resource(guid)
608         return rm.get_attribute(name)
609
610     def register_connection(self, guid1, guid2):
611         """ Registers a connection between a RM with guid 'guid1'
612         and another RM with guid 'guid2'. 
613     
614         The order of the in which the two guids are provided is not
615         important, since the connection relationship is symmetric.
616
617             :param guid1: First guid to connect
618             :type guid1: ResourceManager
619
620             :param guid2: Second guid to connect
621             :type guid: ResourceManager
622
623         """
624         rm1 = self.get_resource(guid1)
625         rm2 = self.get_resource(guid2)
626
627         rm1.register_connection(guid2)
628         rm2.register_connection(guid1)
629
630     def register_condition(self, guids1, action, guids2, state,
631             time = None):
632         """ Registers an action START, STOP or DEPLOY for all RM on list
633         guids1 to occur at time 'time' after all elements in list guids2 
634         have reached state 'state'.
635
636             :param guids1: List of guids of RMs subjected to action
637             :type guids1: list
638
639             :param action: Action to perform (either START, STOP or DEPLOY)
640             :type action: ResourceAction
641
642             :param guids2: List of guids of RMs to we waited for
643             :type guids2: list
644
645             :param state: State to wait for on RMs of list guids2 (STARTED,
646                 STOPPED, etc)
647             :type state: ResourceState
648
649             :param time: Time to wait after guids2 has reached status 
650             :type time: string
651
652         """
653         if isinstance(guids1, int):
654             guids1 = [guids1]
655         if isinstance(guids2, int):
656             guids2 = [guids2]
657
658         for guid1 in guids1:
659             rm = self.get_resource(guid1)
660             rm.register_condition(action, guids2, state, time)
661
662     def enable_trace(self, guid, name):
663         """ Enables a trace to be collected during the experiment run
664
665             :param name: Name of the trace
666             :type name: str
667
668         """
669         rm = self.get_resource(guid)
670         rm.enable_trace(name)
671
672     def trace_enabled(self, guid, name):
673         """ Returns True if the trace of name 'name' is enabled
674
675             :param name: Name of the trace
676             :type name: str
677
678         """
679         rm = self.get_resource(guid)
680         return rm.trace_enabled(name)
681
682     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
683         """ Returns information on a collected trace, the trace stream or 
684         blocks (chunks) of the trace stream
685
686             :param name: Name of the trace
687             :type name: str
688
689             :param attr: Can be one of:
690                          - TraceAttr.ALL (complete trace content), 
691                          - TraceAttr.STREAM (block in bytes to read starting 
692                                 at offset),
693                          - TraceAttr.PATH (full path to the trace file),
694                          - TraceAttr.SIZE (size of trace file). 
695             :type attr: str
696
697             :param block: Number of bytes to retrieve from trace, when attr is 
698                 TraceAttr.STREAM 
699             :type name: int
700
701             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
702             :type name: int
703
704             :rtype: str
705
706         """
707         rm = self.get_resource(guid)
708         return rm.trace(name, attr, block, offset)
709
710     def get_traces(self, guid):
711         """ Returns the list of the trace names of the RM with guid 'guid'
712
713             :param guid: Guid of the RM
714             :type guid: int
715
716             :return: List of trace names
717             :rtype: list
718
719         """
720         rm = self.get_resource(guid)
721         return rm.get_traces()
722
723
724     def discover(self, guid):
725         """ Discovers an available resource matching the criteria defined
726         by the RM with guid 'guid', and associates that resource to the RM
727
728         Not all RM types require (or are capable of) performing resource 
729         discovery. For the RM types which are not capable of doing so, 
730         invoking this method does not have any consequences. 
731
732             :param guid: Guid of the RM
733             :type guid: int
734
735         """
736         rm = self.get_resource(guid)
737         return rm.discover()
738
739     def provision(self, guid):
740         """ Provisions the resource associated to the RM with guid 'guid'.
741
742         Provisioning means making a resource 'accessible' to the user. 
743         Not all RM types require (or are capable of) performing resource 
744         provisioning. For the RM types which are not capable of doing so, 
745         invoking this method does not have any consequences. 
746
747             :param guid: Guid of the RM
748             :type guid: int
749
750         """
751         rm = self.get_resource(guid)
752         return rm.provision()
753
754     def get(self, guid, name):
755         """ Returns the value of the attribute with name 'name' on the
756         RM with guid 'guid'
757
758             :param guid: Guid of the RM
759             :type guid: int
760
761             :param name: Name of the attribute 
762             :type name: str
763
764             :return: The value of the attribute with name 'name'
765
766         """
767         rm = self.get_resource(guid)
768         return rm.get(name)
769
770     def set(self, guid, name, value):
771         """ Modifies the value of the attribute with name 'name' on the 
772         RM with guid 'guid'.
773
774             :param guid: Guid of the RM
775             :type guid: int
776
777             :param name: Name of the attribute
778             :type name: str
779
780             :param value: Value of the attribute
781
782         """
783         rm = self.get_resource(guid)
784         rm.set(name, value)
785
786     def get_global(self, rtype, name):
787         """ Returns the value of the global attribute with name 'name' on the
788         RMs of rtype 'rtype'.
789
790             :param guid: Guid of the RM
791             :type guid: int
792
793             :param name: Name of the attribute 
794             :type name: str
795
796             :return: The value of the attribute with name 'name'
797
798         """
799         rclass = ResourceFactory.get_resource_type(rtype)
800         return rclass.get_global(name)
801
802     def set_global(self, rtype, name, value):
803         """ Modifies the value of the global attribute with name 'name' on the 
804         RMs of with rtype 'rtype'.
805
806             :param guid: Guid of the RM
807             :type guid: int
808
809             :param name: Name of the attribute
810             :type name: str
811
812             :param value: Value of the attribute
813
814         """
815         rclass = ResourceFactory.get_resource_type(rtype)
816         return rclass.set_global(name, value)
817
818     def state(self, guid, hr = False):
819         """ Returns the state of a resource
820
821             :param guid: Resource guid
822             :type guid: integer
823
824             :param hr: Human readable. Forces return of a 
825                 status string instead of a number 
826             :type hr: boolean
827
828         """
829         rm = self.get_resource(guid)
830         state = rm.state
831
832         if hr:
833             return ResourceState2str.get(state)
834
835         return state
836
837     def stop(self, guid):
838         """ Stops the RM with guid 'guid'
839
840         Stopping a RM means that the resource it controls will
841         no longer take part of the experiment.
842
843             :param guid: Guid of the RM
844             :type guid: int
845
846         """
847         rm = self.get_resource(guid)
848         return rm.stop()
849
850     def start(self, guid):
851         """ Starts the RM with guid 'guid'
852
853         Starting a RM means that the resource it controls will
854         begin taking part of the experiment.
855
856             :param guid: Guid of the RM
857             :type guid: int
858
859         """
860         rm = self.get_resource(guid)
861         return rm.start()
862
863     def get_start_time(self, guid):
864         """ Returns the start time of the RM as a timestamp """
865         rm = self.get_resource(guid)
866         return rm.start_time
867
868     def get_stop_time(self, guid):
869         """ Returns the stop time of the RM as a timestamp """
870         rm = self.get_resource(guid)
871         return rm.stop_time
872
873     def get_discover_time(self, guid):
874         """ Returns the discover time of the RM as a timestamp """
875         rm = self.get_resource(guid)
876         return rm.discover_time
877
878     def get_provision_time(self, guid):
879         """ Returns the provision time of the RM as a timestamp """
880         rm = self.get_resource(guid)
881         return rm.provision_time
882
883     def get_ready_time(self, guid):
884         """ Returns the deployment time of the RM as a timestamp """
885         rm = self.get_resource(guid)
886         return rm.ready_time
887
888     def get_release_time(self, guid):
889         """ Returns the release time of the RM as a timestamp """
890         rm = self.get_resource(guid)
891         return rm.release_time
892
893     def get_failed_time(self, guid):
894         """ Returns the time failure occured for the RM as a timestamp """
895         rm = self.get_resource(guid)
896         return rm.failed_time
897
898     def set_with_conditions(self, name, value, guids1, guids2, state,
899             time = None):
900         """ Modifies the value of attribute with name 'name' on all RMs 
901         on the guids1 list when time 'time' has elapsed since all 
902         elements in guids2 list have reached state 'state'.
903
904             :param name: Name of attribute to set in RM
905             :type name: string
906
907             :param value: Value of attribute to set in RM
908             :type name: string
909
910             :param guids1: List of guids of RMs subjected to action
911             :type guids1: list
912
913             :param action: Action to register (either START or STOP)
914             :type action: ResourceAction
915
916             :param guids2: List of guids of RMs to we waited for
917             :type guids2: list
918
919             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
920             :type state: ResourceState
921
922             :param time: Time to wait after guids2 has reached status 
923             :type time: string
924
925         """
926         if isinstance(guids1, int):
927             guids1 = [guids1]
928         if isinstance(guids2, int):
929             guids2 = [guids2]
930
931         for guid1 in guids1:
932             rm = self.get_resource(guid)
933             rm.set_with_conditions(name, value, guids2, state, time)
934
935     def deploy(self, guids = None, wait_all_ready = True, group = None):
936         """ Deploys all ResourceManagers in the guids list. 
937         
938         If the argument 'guids' is not given, all RMs with state NEW
939         are deployed.
940
941             :param guids: List of guids of RMs to deploy
942             :type guids: list
943
944             :param wait_all_ready: Wait until all RMs are ready in
945                 order to start the RMs
946             :type guid: int
947
948             :param group: Id of deployment group in which to deploy RMs
949             :type group: int
950
951         """
952         self.logger.debug(" ------- DEPLOY START ------ ")
953
954         if not guids:
955             # If no guids list was passed, all 'NEW' RMs will be deployed
956             guids = []
957             for guid, rm in self._resources.iteritems():
958                 if rm.state == ResourceState.NEW:
959                     guids.append(guid)
960                 
961         if isinstance(guids, int):
962             guids = [guids]
963
964         # Create deployment group
965         # New guids can be added to a same deployment group later on
966         new_group = False
967         if not group:
968             new_group = True
969             group = self._group_id_generator.next()
970
971         if group not in self._groups:
972             self._groups[group] = []
973
974         self._groups[group].extend(guids)
975
976         def wait_all_and_start(group):
977             # Function that checks if all resources are READY
978             # before scheduling a start_with_conditions for each RM
979             reschedule = False
980             
981             # Get all guids in group
982             guids = self._groups[group]
983
984             for guid in guids:
985                 if self.state(guid) < ResourceState.READY:
986                     reschedule = True
987                     break
988
989             if reschedule:
990                 callback = functools.partial(wait_all_and_start, group)
991                 self.schedule("1s", callback)
992             else:
993                 # If all resources are ready, we schedule the start
994                 for guid in guids:
995                     rm = self.get_resource(guid)
996                     self.schedule("0s", rm.start_with_conditions)
997
998                     if rm.conditions.get(ResourceAction.STOP):
999                         # Only if the RM has STOP conditions we
1000                         # schedule a stop. Otherwise the RM will stop immediately
1001                         self.schedule("0s", rm.stop_with_conditions)
1002
1003         if wait_all_ready and new_group:
1004             # Schedule a function to check that all resources are
1005             # READY, and only then schedule the start.
1006             # This aims at reducing the number of tasks looping in the 
1007             # scheduler. 
1008             # Instead of having many start tasks, we will have only one for 
1009             # the whole group.
1010             callback = functools.partial(wait_all_and_start, group)
1011             self.schedule("0s", callback)
1012
1013         for guid in guids:
1014             rm = self.get_resource(guid)
1015             rm.deployment_group = group
1016             self.schedule("0s", rm.deploy_with_conditions)
1017
1018             if not wait_all_ready:
1019                 self.schedule("0s", rm.start_with_conditions)
1020
1021                 if rm.conditions.get(ResourceAction.STOP):
1022                     # Only if the RM has STOP conditions we
1023                     # schedule a stop. Otherwise the RM will stop immediately
1024                     self.schedule("0s", rm.stop_with_conditions)
1025
1026     def release(self, guids = None):
1027         """ Releases all ResourceManagers in the guids list.
1028
1029         If the argument 'guids' is not given, all RMs registered
1030         in the experiment are released.
1031
1032             :param guids: List of RM guids
1033             :type guids: list
1034
1035         """
1036         if isinstance(guids, int):
1037             guids = [guids]
1038
1039         if not guids:
1040             guids = self.resources
1041
1042         for guid in guids:
1043             rm = self.get_resource(guid)
1044             self.schedule("0s", rm.release)
1045
1046         self.wait_released(guids)
1047
1048         if self.persist:
1049             self.save()
1050
1051         for guid in guids:
1052             if self.get(guid, "hardRelease"):
1053                 self.remove_resource(guid)
1054         
1055     def shutdown(self):
1056         """ Releases all resources and stops the ExperimentController
1057
1058         """
1059         # If there was a major failure we can't exit gracefully
1060         if self._state == ECState.FAILED:
1061             raise RuntimeError("EC failure. Can not exit gracefully")
1062
1063         # Remove all pending tasks from the scheduler queue
1064         for tid in list(self._scheduler.pending):
1065             self._scheduler.remove(tid)
1066
1067         # Remove pending tasks from the workers queue
1068         self._runner.empty()
1069
1070         self.release()
1071
1072         # Mark the EC state as TERMINATED
1073         self._state = ECState.TERMINATED
1074
1075         # Stop processing thread
1076         self._stop = True
1077
1078         # Notify condition to wake up the processing thread
1079         self._notify()
1080         
1081         if self._thread.is_alive():
1082            self._thread.join()
1083
1084     def schedule(self, date, callback, track = False):
1085         """ Schedules a callback to be executed at time 'date'.
1086
1087             :param date: string containing execution time for the task.
1088                     It can be expressed as an absolute time, using
1089                     timestamp format, or as a relative time matching
1090                     ^\d+.\d+(h|m|s|ms|us)$
1091
1092             :param callback: code to be executed for the task. Must be a
1093                         Python function, and receives args and kwargs
1094                         as arguments.
1095
1096             :param track: if set to True, the task will be retrievable with
1097                     the get_task() method
1098
1099             :return : The Id of the task
1100             :rtype: int
1101             
1102         """
1103         timestamp = stabsformat(date)
1104         task = Task(timestamp, callback)
1105         task = self._scheduler.schedule(task)
1106
1107         if track:
1108             self._tasks[task.id] = task
1109
1110         # Notify condition to wake up the processing thread
1111         self._notify()
1112
1113         return task.id
1114      
1115     def _process(self):
1116         """ Process scheduled tasks.
1117
1118         .. note::
1119         
1120         Tasks are scheduled by invoking the schedule method with a target 
1121         callback and an execution time. 
1122         The schedule method creates a new Task object with that callback 
1123         and execution time, and pushes it into the '_scheduler' queue. 
1124         The execution time and the order of arrival of tasks are used 
1125         to order the tasks in the queue.
1126
1127         The _process method is executed in an independent thread held by 
1128         the ExperimentController for as long as the experiment is running.
1129         This method takes tasks from the '_scheduler' queue in a loop 
1130         and processes them in parallel using multithreading. 
1131         The environmental variable NEPI_NTHREADS can be used to control
1132         the number of threads used to process tasks. The default value is 
1133         50.
1134
1135         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1136         This object keeps a pool of threads (workers), and a queue of tasks
1137         scheduled for 'immediate' execution. 
1138         
1139         On each iteration, the '_process' loop will take the next task that 
1140         is scheduled for 'future' execution from the '_scheduler' queue, 
1141         and if the execution time of that task is >= to the current time, 
1142         it will push that task into the PR for 'immediate execution'. 
1143         As soon as a worker is free, the PR will assign the next task to
1144         that worker.
1145
1146         Upon receiving a task to execute, each PR worker (thread) will 
1147         invoke the  _execute method of the EC, passing the task as 
1148         argument.         
1149         The _execute method will then invoke task.callback inside a 
1150         try/except block. If an exception is raised by the tasks.callback, 
1151         it will be trapped by the try block, logged to standard error 
1152         (usually the console), and the task will be marked as failed.
1153
1154         """
1155
1156         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1157         self._runner = ParallelRun(maxthreads = self.nthreads)
1158         self._runner.start()
1159
1160         while not self._stop:
1161             try:
1162                 self._cond.acquire()
1163
1164                 task = self._scheduler.next()
1165                 
1166                 if not task:
1167                     # No task to execute. Wait for a new task to be scheduled.
1168                     self._cond.wait()
1169                 else:
1170                     # The task timestamp is in the future. Wait for timeout 
1171                     # or until another task is scheduled.
1172                     now = tnow()
1173                     if now < task.timestamp:
1174                         # Calculate timeout in seconds
1175                         timeout = tdiffsec(task.timestamp, now)
1176
1177                         # Re-schedule task with the same timestamp
1178                         self._scheduler.schedule(task)
1179                         
1180                         task = None
1181
1182                         # Wait timeout or until a new task awakes the condition
1183                         self._cond.wait(timeout)
1184                
1185                 self._cond.release()
1186
1187                 if task:
1188                     # Process tasks in parallel
1189                     self._runner.put(self._execute, task)
1190             except: 
1191                 import traceback
1192                 err = traceback.format_exc()
1193                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1194
1195                 # Set the EC to FAILED state 
1196                 self._state = ECState.FAILED
1197             
1198                 # Set the FailureManager failure level to EC failure
1199                 self._fm.set_ec_failure()
1200
1201         self.logger.debug("Exiting the task processing loop ... ")
1202         
1203         self._runner.sync()
1204         self._runner.destroy()
1205
1206     def _execute(self, task):
1207         """ Executes a single task. 
1208
1209             :param task: Object containing the callback to execute
1210             :type task: Task
1211
1212         """
1213         try:
1214             # Invoke callback
1215             task.result = task.callback()
1216             task.status = TaskStatus.DONE
1217         except:
1218             import traceback
1219             err = traceback.format_exc()
1220             task.result = err
1221             task.status = TaskStatus.ERROR
1222             
1223             self.logger.error("Error occurred while executing task: %s" % err)
1224
1225     def _notify(self):
1226         """ Awakes the processing thread if it is blocked waiting 
1227         for new tasks to arrive
1228         
1229         """
1230         self._cond.acquire()
1231         self._cond.notify()
1232         self._cond.release()
1233
1234     def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
1235             **kwargs):
1236         """ Automates experiment description using a NetGraph instance.
1237         """
1238         self._netgraph = NetGraph(**kwargs)
1239
1240         if add_node_callback:
1241             ### Add resources to the EC
1242             for nid in self.netgraph.nodes():
1243                 add_node_callback(self, nid)
1244
1245         if add_edge_callback:
1246             #### Add connections between resources
1247             for nid1, nid2 in self.netgraph.edges():
1248                 add_edge_callback(self, nid1, nid2)
1249