c403d797b0137c3f7d4202dddd2cdc4cd0eff54a
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from six import next
20
21 from nepi.util import guid
22 from nepi.util.parallel import ParallelRun
23 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
24 from nepi.execution.resource import ResourceFactory, ResourceAction, \
25         ResourceState, ResourceState2str
26 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
27 from nepi.execution.trace import TraceAttr
28 from nepi.util.serializer import ECSerializer, SFormats
29 from nepi.util.plotter import ECPlotter, PFormats
30 from nepi.util.netgraph import NetGraph, TopologyType 
31
32 # TODO: use multiprocessing instead of threading
33 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
34
35 import functools
36 import logging
37 import os
38 import sys
39 import tempfile
40 import time
41 import threading
42 import weakref
43
44 class FailureLevel(object):
45     """ Possible failure states for the experiment """
46     OK = 1
47     RM_FAILURE = 2
48     EC_FAILURE = 3
49
50 class FailureManager(object):
51     """ The FailureManager is responsible for handling errors
52     and deciding whether an experiment should be aborted or not
53     """
54
55     def __init__(self):
56         self._ec = None
57         self._failure_level = FailureLevel.OK
58         self._abort = False
59
60     def set_ec(self, ec):
61         self._ec = weakref.ref(ec)
62
63     @property
64     def ec(self):
65         """ Returns the ExperimentController associated to this FailureManager 
66         """
67         return self._ec()
68
69     @property
70     def abort(self):
71         return self._abort
72
73     def eval_failure(self, guid):
74         """ Implements failure policy and sets the abort state of the
75         experiment based on the failure state and criticality of
76         the RM
77
78         :param guid: Guid of the RM upon which the failure of the experiment
79             is evaluated
80         :type guid: int
81
82         """
83         if self._failure_level == FailureLevel.OK:
84             rm = self.ec.get_resource(guid)
85             state = rm.state
86             critical = rm.get("critical")
87
88             if state == ResourceState.FAILED and critical:
89                 self._failure_level = FailureLevel.RM_FAILURE
90                 self._abort = True
91                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
92                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
93
94     def set_ec_failure(self):
95         self._failure_level = FailureLevel.EC_FAILURE
96
97 class ECState(object):
98     """ Possible states of the ExperimentController
99    
100     """
101     RUNNING = 1
102     FAILED = 2
103     RELEASED = 3
104     TERMINATED = 4
105
106 class ExperimentController(object):
107     """
108     .. note::
109
110     An experiment, or scenario, is defined by a concrete set of resources,
111     and the behavior, configuration and interconnection of those resources. 
112     The Experiment Description (ED) is a detailed representation of a
113     single experiment. It contains all the necessary information to 
114     allow repeating the experiment. NEPI allows to describe
115     experiments by registering components (resources), configuring them
116     and interconnecting them.
117     
118     A same experiment (scenario) can be executed many times, generating 
119     different results. We call an experiment execution (instance) a 'run'.
120
121     The ExperimentController (EC), is the entity responsible of
122     managing an experiment run. The same scenario can be 
123     recreated (and re-run) by instantiating an EC and recreating 
124     the same experiment description. 
125
126     An experiment is represented as a graph of interconnected
127     resources. A resource is a generic concept in the sense that any
128     component taking part of an experiment, whether physical of
129     virtual, is considered a resource. A resources could be a host, 
130     a virtual machine, an application, a simulator, a IP address.
131
132     A ResourceManager (RM), is the entity responsible for managing a 
133     single resource. ResourceManagers are specific to a resource
134     type (i.e. An RM to control a Linux application will not be
135     the same as the RM used to control a ns-3 simulation).
136     To support a new type of resource, a new RM must be implemented. 
137     NEPI already provides a variety of RMs to control basic resources, 
138     and new can be extended from the existing ones.
139
140     Through the EC interface the user can create ResourceManagers (RMs),
141     configure them and interconnect them, to describe an experiment.
142     Describing an experiment through the EC does not run the experiment.
143     Only when the 'deploy()' method is invoked on the EC, the EC will take 
144     actions to transform the 'described' experiment into a 'running' experiment.
145
146     While the experiment is running, it is possible to continue to
147     create/configure/connect RMs, and to deploy them to involve new
148     resources in the experiment (this is known as 'interactive' deployment).
149     
150     An experiments in NEPI is identified by a string id, 
151     which is either given by the user, or automatically generated by NEPI.  
152     The purpose of this identifier is to separate files and results that 
153     belong to different experiment scenarios. 
154     However, since a same 'experiment' can be run many times, the experiment
155     id is not enough to identify an experiment instance (run).
156     For this reason, the ExperimentController has two identifier, the 
157     exp_id, which can be re-used in different ExperimentController,
158     and the run_id, which is unique to one ExperimentController instance, and
159     is automatically generated by NEPI.
160    
161     """
162
163     @classmethod
164     def load(cls, filepath, format = SFormats.XML):
165         serializer = ECSerializer()
166         ec = serializer.load(filepath)
167         return ec
168
169     def __init__(self, exp_id = None, local_dir = None, persist = False,
170             fm = None, add_node_callback = None, add_edge_callback = None, 
171             **kwargs):
172         """ ExperimentController entity to model an execute a network 
173         experiment.
174         
175         :param exp_id: Human readable name to identify the experiment
176         :type exp_id: str
177
178         :param local_dir: Path to local directory where to store experiment
179             related files
180         :type local_dir: str
181
182         :param persist: Save an XML description of the experiment after 
183         completion at local_dir
184         :type persist: bool
185
186         :param fm: FailureManager object. If None is given, the default 
187             FailureManager class will be used
188         :type fm: FailureManager
189
190         :param add_node_callback: Callback to invoke for node instantiation
191         when automatic topology creation mode is used 
192         :type add_node_callback: function
193
194         :param add_edge_callback: Callback to invoke for edge instantiation 
195         when automatic topology creation mode is used 
196         :type add_edge_callback: function
197
198         """
199         super(ExperimentController, self).__init__()
200
201         # Logging
202         self._logger = logging.getLogger("ExperimentController")
203
204         # Run identifier. It identifies a concrete execution instance (run) 
205         # of an experiment.
206         # Since a same experiment (same configuration) can be executed many 
207         # times, this run_id permits to separate result files generated on 
208         # different experiment executions
209         self._run_id = tsformat()
210
211         # Experiment identifier. Usually assigned by the user
212         # Identifies the experiment scenario (i.e. configuration, 
213         # resources used, etc)
214         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
215
216         # Local path where to store experiment related files (results, etc)
217         if not local_dir:
218             local_dir = tempfile.gettempdir() # /tmp
219
220         self._local_dir = local_dir
221         self._exp_dir = os.path.join(local_dir, self.exp_id)
222         self._run_dir = os.path.join(self.exp_dir, self.run_id)
223
224         # If True persist the experiment controller in XML format, after completion
225         self._persist = persist
226
227         # generator of globally unique ids
228         self._guid_generator = guid.GuidGenerator()
229         
230         # Resource managers
231         self._resources = dict()
232
233         # Scheduler. It a queue that holds tasks scheduled for
234         # execution, and yields the next task to be executed 
235         # ordered by execution and arrival time
236         self._scheduler = HeapScheduler()
237
238         # Tasks
239         self._tasks = dict()
240
241         # RM groups (for deployment) 
242         self._groups = dict()
243
244         # generator of globally unique id for groups
245         self._group_id_generator = guid.GuidGenerator()
246
247         # Flag to stop processing thread
248         self._stop = False
249     
250         # Entity in charge of managing system failures
251         if not fm:
252             self._fm = FailureManager()
253         self._fm.set_ec(self)
254
255         # EC state
256         self._state = ECState.RUNNING
257
258         # Automatically construct experiment description 
259         self._netgraph = None
260         if add_node_callback or add_edge_callback or kwargs.get("topology"):
261             self._build_from_netgraph(add_node_callback, add_edge_callback, 
262                     **kwargs)
263
264         # The runner is a pool of threads used to parallelize 
265         # execution of tasks
266         self._nthreads = 20
267         self._runner = None
268
269         # Event processing thread
270         self._cond = threading.Condition()
271         self._thread = threading.Thread(target = self._process)
272         self._thread.setDaemon(True)
273         self._thread.start()
274         
275     @property
276     def logger(self):
277         """ Returns the logger instance of the Experiment Controller
278
279         """
280         return self._logger
281
282     @property
283     def fm(self):
284         """ Returns the failure manager
285
286         """
287
288         return self._fm
289
290     @property
291     def failure_level(self):
292         """ Returns the level of FAILURE of th experiment
293
294         """
295
296         return self._fm._failure_level
297
298     @property
299     def ecstate(self):
300         """ Returns the state of the Experiment Controller
301
302         """
303         return self._state
304
305     @property
306     def exp_id(self):
307         """ Returns the experiment id assigned by the user
308
309         """
310         return self._exp_id
311
312     @property
313     def run_id(self):
314         """ Returns the experiment instance (run) identifier (automatically 
315         generated)
316
317         """
318         return self._run_id
319
320     @property
321     def nthreads(self):
322         """ Returns the number of processing nthreads used
323
324         """
325         return self._nthreads
326
327     @property
328     def local_dir(self):
329         """ Root local directory for experiment files
330
331         """
332         return self._local_dir
333
334     @property
335     def exp_dir(self):
336         """ Local directory to store results and other files related to the 
337         experiment.
338
339         """
340         return self._exp_dir
341
342     @property
343     def run_dir(self):
344         """ Local directory to store results and other files related to the 
345         experiment run.
346
347         """
348         return self._run_dir
349
350     @property
351     def persist(self):
352         """ If True, persists the ExperimentController to XML format upon 
353         experiment completion
354
355         """
356         return self._persist
357
358     @property
359     def netgraph(self):
360         """ Return NetGraph instance if experiment description was automatically 
361         generated
362
363         """
364         return self._netgraph
365
366     @property
367     def abort(self):
368         """ Returns True if the experiment has failed and should be interrupted,
369         False otherwise.
370
371         """
372         return self._fm.abort
373
374     def inform_failure(self, guid):
375         """ Reports a failure in a RM to the EC for evaluation
376
377             :param guid: Resource id
378             :type guid: int
379
380         """
381
382         return self._fm.eval_failure(guid)
383
384     def wait_finished(self, guids):
385         """ Blocking method that waits until all RMs in the 'guids' list 
386         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
387         RELEASED ), or until a failure in the experiment occurs 
388         (i.e. abort == True) 
389         
390             :param guids: List of guids
391             :type guids: list
392
393         """
394
395         def quit():
396             return self.abort
397
398         return self.wait(guids, state = ResourceState.STOPPED, 
399                 quit = quit)
400
401     def wait_started(self, guids):
402         """ Blocking method that waits until all RMs in the 'guids' list 
403         have reached a state >= STARTED, or until a failure in the 
404         experiment occurs (i.e. abort == True) 
405         
406             :param guids: List of guids
407             :type guids: list
408
409         """
410
411         def quit():
412             return self.abort
413
414         return self.wait(guids, state = ResourceState.STARTED, 
415                 quit = quit)
416
417     def wait_released(self, guids):
418         """ Blocking method that waits until all RMs in the 'guids' list 
419         have reached a state == RELEASED, or until the EC fails 
420         
421             :param guids: List of guids
422             :type guids: list
423
424         """
425
426         def quit():
427             return self._state == ECState.FAILED
428
429         return self.wait(guids, state = ResourceState.RELEASED, 
430                 quit = quit)
431
432     def wait_deployed(self, guids):
433         """ Blocking method that waits until all RMs in the 'guids' list 
434         have reached a state >= READY, or until a failure in the 
435         experiment occurs (i.e. abort == True) 
436         
437             :param guids: List of guids
438             :type guids: list
439
440         """
441
442         def quit():
443             return self.abort
444
445         return self.wait(guids, state = ResourceState.READY, 
446                 quit = quit)
447
448     def wait(self, guids, state, quit):
449         """ Blocking method that waits until all RMs in the 'guids' list 
450         have reached a state >= 'state', or until the 'quit' callback
451         yields True
452            
453             :param guids: List of guids
454             :type guids: list
455         
456         """
457         if isinstance(guids, int):
458             guids = [guids]
459
460         # Make a copy to avoid modifying the original guids list
461         guids = list(guids)
462
463         while True:
464             # If there are no more guids to wait for
465             # or the quit function returns True, exit the loop
466             if len(guids) == 0 or quit():
467                 break
468
469             # If a guid reached one of the target states, remove it from list
470             guid = guids.pop()
471             rm = self.get_resource(guid)
472             rstate = rm.state
473             
474             if rstate >= state:
475                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
476                     rm.get_rtype(), guid, rstate, state))
477             else:
478                 # Debug...
479                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
480                     guid, rstate, state))
481
482                 guids.append(guid)
483
484                 time.sleep(0.5)
485
486     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
487         plotter = ECPlotter()
488         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
489                 show = show)
490         return fpath
491
492     def serialize(self, format = SFormats.XML):
493         serializer = ECSerializer()
494         sec = serializer.load(self, format = format)
495         return sec
496
497     def save(self, dirpath = None, format = SFormats.XML):
498         if dirpath == None:
499             dirpath = self.run_dir
500
501         try:
502             os.makedirs(dirpath)
503         except OSError:
504             pass
505
506         serializer = ECSerializer()
507         path = serializer.save(self, dirpath, format = format)
508         return path
509
510     def get_task(self, tid):
511         """ Returns a task by its id
512
513             :param tid: Id of the task
514             :type tid: int
515             
516             :rtype: Task
517             
518         """
519         return self._tasks.get(tid)
520
521     def get_resource(self, guid):
522         """ Returns a registered ResourceManager by its guid
523
524             :param guid: Id of the resource
525             :type guid: int
526             
527             :rtype: ResourceManager
528             
529         """
530         rm = self._resources.get(guid)
531         return rm
532
533     def get_resources_by_type(self, rtype):
534         """ Returns the ResourceManager objects of type rtype
535
536             :param rtype: Resource type
537             :type rtype: string
538             
539             :rtype: list of ResourceManagers
540             
541         """
542         rms = []
543         for guid, rm in self._resources.items():
544             if rm.get_rtype() == rtype: 
545                 rms.append(rm)
546         return rms
547
548     def remove_resource(self, guid):
549         del self._resources[guid]
550
551     @property
552     def resources(self):
553         """ Returns the guids of all ResourceManagers 
554
555             :return: Set of all RM guids
556             :rtype: list
557
558         """
559         keys = list(self._resources.keys())
560
561         return keys
562
563     def filter_resources(self, rtype):
564         """ Returns the guids of all ResourceManagers of type rtype
565
566             :param rtype: Resource type
567             :type rtype: string
568             
569             :rtype: list of guids
570             
571         """
572         rms = []
573         for guid, rm in self._resources.items():
574             if rm.get_rtype() == rtype: 
575                 rms.append(rm.guid)
576         return rms
577
578     def register_resource(self, rtype, guid = None):
579         """ Registers a new ResourceManager of type 'rtype' in the experiment
580         
581         This method will assign a new 'guid' for the RM, if no guid
582         is specified.
583
584             :param rtype: Type of the RM
585             :type rtype: str
586
587             :return: Guid of the RM
588             :rtype: int
589             
590         """
591         # Get next available guid
592         # xxx_next_hiccup
593         guid = self._guid_generator.next(guid)
594         
595         # Instantiate RM
596         rm = ResourceFactory.create(rtype, self, guid)
597
598         # Store RM
599         self._resources[guid] = rm
600
601         return guid
602
603     def get_attributes(self, guid):
604         """ Returns all the attributes of the RM with guid 'guid'
605
606             :param guid: Guid of the RM
607             :type guid: int
608
609             :return: List of attributes
610             :rtype: list
611
612         """
613         rm = self.get_resource(guid)
614         return rm.get_attributes()
615
616     def get_attribute(self, guid, name):
617         """ Returns the attribute 'name' of the RM with guid 'guid'
618
619             :param guid: Guid of the RM
620             :type guid: int
621
622             :param name: Name of the attribute
623             :type name: str
624
625             :return: The attribute with name 'name'
626             :rtype: Attribute
627
628         """
629         rm = self.get_resource(guid)
630         return rm.get_attribute(name)
631
632     def register_connection(self, guid1, guid2):
633         """ Registers a connection between a RM with guid 'guid1'
634         and another RM with guid 'guid2'. 
635     
636         The order of the in which the two guids are provided is not
637         important, since the connection relationship is symmetric.
638
639             :param guid1: First guid to connect
640             :type guid1: ResourceManager
641
642             :param guid2: Second guid to connect
643             :type guid: ResourceManager
644
645         """
646         rm1 = self.get_resource(guid1)
647         rm2 = self.get_resource(guid2)
648
649         rm1.register_connection(guid2)
650         rm2.register_connection(guid1)
651
652     def register_condition(self, guids1, action, guids2, state,
653             time = None):
654         """ Registers an action START, STOP or DEPLOY for all RM on list
655         guids1 to occur at time 'time' after all elements in list guids2 
656         have reached state 'state'.
657
658             :param guids1: List of guids of RMs subjected to action
659             :type guids1: list
660
661             :param action: Action to perform (either START, STOP or DEPLOY)
662             :type action: ResourceAction
663
664             :param guids2: List of guids of RMs to we waited for
665             :type guids2: list
666
667             :param state: State to wait for on RMs of list guids2 (STARTED,
668                 STOPPED, etc)
669             :type state: ResourceState
670
671             :param time: Time to wait after guids2 has reached status 
672             :type time: string
673
674         """
675         if isinstance(guids1, int):
676             guids1 = [guids1]
677         if isinstance(guids2, int):
678             guids2 = [guids2]
679
680         for guid1 in guids1:
681             rm = self.get_resource(guid1)
682             rm.register_condition(action, guids2, state, time)
683
684     def enable_trace(self, guid, name):
685         """ Enables a trace to be collected during the experiment run
686
687             :param name: Name of the trace
688             :type name: str
689
690         """
691         rm = self.get_resource(guid)
692         rm.enable_trace(name)
693
694     def trace_enabled(self, guid, name):
695         """ Returns True if the trace of name 'name' is enabled
696
697             :param name: Name of the trace
698             :type name: str
699
700         """
701         rm = self.get_resource(guid)
702         return rm.trace_enabled(name)
703
704     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
705         """ Returns information on a collected trace, the trace stream or 
706         blocks (chunks) of the trace stream
707
708             :param name: Name of the trace
709             :type name: str
710
711             :param attr: Can be one of:
712                          - TraceAttr.ALL (complete trace content), 
713                          - TraceAttr.STREAM (block in bytes to read starting 
714                                 at offset),
715                          - TraceAttr.PATH (full path to the trace file),
716                          - TraceAttr.SIZE (size of trace file). 
717             :type attr: str
718
719             :param block: Number of bytes to retrieve from trace, when attr is 
720                 TraceAttr.STREAM 
721             :type name: int
722
723             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
724             :type name: int
725
726             :rtype: str
727
728         """
729         rm = self.get_resource(guid)
730         return rm.trace(name, attr, block, offset)
731
732     def get_traces(self, guid):
733         """ Returns the list of the trace names of the RM with guid 'guid'
734
735             :param guid: Guid of the RM
736             :type guid: int
737
738             :return: List of trace names
739             :rtype: list
740
741         """
742         rm = self.get_resource(guid)
743         return rm.get_traces()
744
745
746     def discover(self, guid):
747         """ Discovers an available resource matching the criteria defined
748         by the RM with guid 'guid', and associates that resource to the RM
749
750         Not all RM types require (or are capable of) performing resource 
751         discovery. For the RM types which are not capable of doing so, 
752         invoking this method does not have any consequences. 
753
754             :param guid: Guid of the RM
755             :type guid: int
756
757         """
758         rm = self.get_resource(guid)
759         return rm.discover()
760
761     def provision(self, guid):
762         """ Provisions the resource associated to the RM with guid 'guid'.
763
764         Provisioning means making a resource 'accessible' to the user. 
765         Not all RM types require (or are capable of) performing resource 
766         provisioning. For the RM types which are not capable of doing so, 
767         invoking this method does not have any consequences. 
768
769             :param guid: Guid of the RM
770             :type guid: int
771
772         """
773         rm = self.get_resource(guid)
774         return rm.provision()
775
776     def get(self, guid, name):
777         """ Returns the value of the attribute with name 'name' on the
778         RM with guid 'guid'
779
780             :param guid: Guid of the RM
781             :type guid: int
782
783             :param name: Name of the attribute 
784             :type name: str
785
786             :return: The value of the attribute with name 'name'
787
788         """
789         rm = self.get_resource(guid)
790         return rm.get(name)
791
792     def set(self, guid, name, value):
793         """ Modifies the value of the attribute with name 'name' on the 
794         RM with guid 'guid'.
795
796             :param guid: Guid of the RM
797             :type guid: int
798
799             :param name: Name of the attribute
800             :type name: str
801
802             :param value: Value of the attribute
803
804         """
805         rm = self.get_resource(guid)
806         rm.set(name, value)
807
808     def get_global(self, rtype, name):
809         """ Returns the value of the global attribute with name 'name' on the
810         RMs of rtype 'rtype'.
811
812             :param guid: Guid of the RM
813             :type guid: int
814
815             :param name: Name of the attribute 
816             :type name: str
817
818             :return: The value of the attribute with name 'name'
819
820         """
821         rclass = ResourceFactory.get_resource_type(rtype)
822         return rclass.get_global(name)
823
824     def set_global(self, rtype, name, value):
825         """ Modifies the value of the global attribute with name 'name' on the 
826         RMs of with rtype 'rtype'.
827
828             :param guid: Guid of the RM
829             :type guid: int
830
831             :param name: Name of the attribute
832             :type name: str
833
834             :param value: Value of the attribute
835
836         """
837         rclass = ResourceFactory.get_resource_type(rtype)
838         return rclass.set_global(name, value)
839
840     def state(self, guid, hr = False):
841         """ Returns the state of a resource
842
843             :param guid: Resource guid
844             :type guid: integer
845
846             :param hr: Human readable. Forces return of a 
847                 status string instead of a number 
848             :type hr: boolean
849
850         """
851         rm = self.get_resource(guid)
852         state = rm.state
853
854         if hr:
855             return ResourceState2str.get(state)
856
857         return state
858
859     def stop(self, guid):
860         """ Stops the RM with guid 'guid'
861
862         Stopping a RM means that the resource it controls will
863         no longer take part of the experiment.
864
865             :param guid: Guid of the RM
866             :type guid: int
867
868         """
869         rm = self.get_resource(guid)
870         return rm.stop()
871
872     def start(self, guid):
873         """ Starts the RM with guid 'guid'
874
875         Starting a RM means that the resource it controls will
876         begin taking part of the experiment.
877
878             :param guid: Guid of the RM
879             :type guid: int
880
881         """
882         rm = self.get_resource(guid)
883         return rm.start()
884
885     def get_start_time(self, guid):
886         """ Returns the start time of the RM as a timestamp """
887         rm = self.get_resource(guid)
888         return rm.start_time
889
890     def get_stop_time(self, guid):
891         """ Returns the stop time of the RM as a timestamp """
892         rm = self.get_resource(guid)
893         return rm.stop_time
894
895     def get_discover_time(self, guid):
896         """ Returns the discover time of the RM as a timestamp """
897         rm = self.get_resource(guid)
898         return rm.discover_time
899
900     def get_provision_time(self, guid):
901         """ Returns the provision time of the RM as a timestamp """
902         rm = self.get_resource(guid)
903         return rm.provision_time
904
905     def get_ready_time(self, guid):
906         """ Returns the deployment time of the RM as a timestamp """
907         rm = self.get_resource(guid)
908         return rm.ready_time
909
910     def get_release_time(self, guid):
911         """ Returns the release time of the RM as a timestamp """
912         rm = self.get_resource(guid)
913         return rm.release_time
914
915     def get_failed_time(self, guid):
916         """ Returns the time failure occured for the RM as a timestamp """
917         rm = self.get_resource(guid)
918         return rm.failed_time
919
920     def set_with_conditions(self, name, value, guids1, guids2, state,
921             time = None):
922         """ Modifies the value of attribute with name 'name' on all RMs 
923         on the guids1 list when time 'time' has elapsed since all 
924         elements in guids2 list have reached state 'state'.
925
926             :param name: Name of attribute to set in RM
927             :type name: string
928
929             :param value: Value of attribute to set in RM
930             :type name: string
931
932             :param guids1: List of guids of RMs subjected to action
933             :type guids1: list
934
935             :param action: Action to register (either START or STOP)
936             :type action: ResourceAction
937
938             :param guids2: List of guids of RMs to we waited for
939             :type guids2: list
940
941             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
942             :type state: ResourceState
943
944             :param time: Time to wait after guids2 has reached status 
945             :type time: string
946
947         """
948         if isinstance(guids1, int):
949             guids1 = [guids1]
950         if isinstance(guids2, int):
951             guids2 = [guids2]
952
953         for guid1 in guids1:
954             rm = self.get_resource(guid)
955             rm.set_with_conditions(name, value, guids2, state, time)
956
957     def deploy(self, guids = None, wait_all_ready = True, group = None):
958         """ Deploys all ResourceManagers in the guids list. 
959         
960         If the argument 'guids' is not given, all RMs with state NEW
961         are deployed.
962
963             :param guids: List of guids of RMs to deploy
964             :type guids: list
965
966             :param wait_all_ready: Wait until all RMs are ready in
967                 order to start the RMs
968             :type guid: int
969
970             :param group: Id of deployment group in which to deploy RMs
971             :type group: int
972
973         """
974         self.logger.debug(" ------- DEPLOY START ------ ")
975
976         if not guids:
977             # If no guids list was passed, all 'NEW' RMs will be deployed
978             guids = []
979             for guid, rm in self._resources.items():
980                 if rm.state == ResourceState.NEW:
981                     guids.append(guid)
982                 
983         if isinstance(guids, int):
984             guids = [guids]
985
986         # Create deployment group
987         # New guids can be added to a same deployment group later on
988         new_group = False
989         if not group:
990             new_group = True
991             # xxx_next_hiccup
992             group = self._group_id_generator.next()
993
994         if group not in self._groups:
995             self._groups[group] = []
996
997         self._groups[group].extend(guids)
998
999         def wait_all_and_start(group):
1000             # Function that checks if all resources are READY
1001             # before scheduling a start_with_conditions for each RM
1002             reschedule = False
1003             
1004             # Get all guids in group
1005             guids = self._groups[group]
1006
1007             for guid in guids:
1008                 if self.state(guid) < ResourceState.READY:
1009                     reschedule = True
1010                     break
1011
1012             if reschedule:
1013                 callback = functools.partial(wait_all_and_start, group)
1014                 self.schedule("1s", callback)
1015             else:
1016                 # If all resources are ready, we schedule the start
1017                 for guid in guids:
1018                     rm = self.get_resource(guid)
1019                     self.schedule("0s", rm.start_with_conditions)
1020
1021                     if rm.conditions.get(ResourceAction.STOP):
1022                         # Only if the RM has STOP conditions we
1023                         # schedule a stop. Otherwise the RM will stop immediately
1024                         self.schedule("0s", rm.stop_with_conditions)
1025
1026         if wait_all_ready and new_group:
1027             # Schedule a function to check that all resources are
1028             # READY, and only then schedule the start.
1029             # This aims at reducing the number of tasks looping in the 
1030             # scheduler. 
1031             # Instead of having many start tasks, we will have only one for 
1032             # the whole group.
1033             callback = functools.partial(wait_all_and_start, group)
1034             self.schedule("0s", callback)
1035
1036         for guid in guids:
1037             rm = self.get_resource(guid)
1038             rm.deployment_group = group
1039             self.schedule("0s", rm.deploy_with_conditions)
1040
1041             if not wait_all_ready:
1042                 self.schedule("0s", rm.start_with_conditions)
1043
1044                 if rm.conditions.get(ResourceAction.STOP):
1045                     # Only if the RM has STOP conditions we
1046                     # schedule a stop. Otherwise the RM will stop immediately
1047                     self.schedule("0s", rm.stop_with_conditions)
1048
1049     def release(self, guids = None):
1050         """ Releases all ResourceManagers in the guids list.
1051
1052         If the argument 'guids' is not given, all RMs registered
1053         in the experiment are released.
1054
1055             :param guids: List of RM guids
1056             :type guids: list
1057
1058         """
1059         if self._state == ECState.RELEASED:
1060             return 
1061
1062         if isinstance(guids, int):
1063             guids = [guids]
1064
1065         if not guids:
1066             guids = self.resources
1067
1068         for guid in guids:
1069             rm = self.get_resource(guid)
1070             self.schedule("0s", rm.release)
1071
1072         self.wait_released(guids)
1073
1074         if self.persist:
1075             self.save()
1076
1077         for guid in guids:
1078             if self.get(guid, "hardRelease"):
1079                 self.remove_resource(guid)\
1080
1081         # Mark the EC state as RELEASED
1082         self._state = ECState.RELEASED
1083         
1084     def shutdown(self):
1085         """ Releases all resources and stops the ExperimentController
1086
1087         """
1088         # If there was a major failure we can't exit gracefully
1089         if self._state == ECState.FAILED:
1090             raise RuntimeError("EC failure. Can not exit gracefully")
1091
1092         # Remove all pending tasks from the scheduler queue
1093         for tid in list(self._scheduler.pending):
1094             self._scheduler.remove(tid)
1095
1096         # Remove pending tasks from the workers queue
1097         self._runner.empty()
1098
1099         self.release()
1100
1101         # Mark the EC state as TERMINATED
1102         self._state = ECState.TERMINATED
1103
1104         # Stop processing thread
1105         self._stop = True
1106
1107         # Notify condition to wake up the processing thread
1108         self._notify()
1109         
1110         if self._thread.is_alive():
1111            self._thread.join()
1112
1113     def schedule(self, date, callback, track = False):
1114         """ Schedules a callback to be executed at time 'date'.
1115
1116             :param date: string containing execution time for the task.
1117                     It can be expressed as an absolute time, using
1118                     timestamp format, or as a relative time matching
1119                     ^\d+.\d+(h|m|s|ms|us)$
1120
1121             :param callback: code to be executed for the task. Must be a
1122                         Python function, and receives args and kwargs
1123                         as arguments.
1124
1125             :param track: if set to True, the task will be retrievable with
1126                     the get_task() method
1127
1128             :return : The Id of the task
1129             :rtype: int
1130             
1131         """
1132         timestamp = stabsformat(date)
1133         task = Task(timestamp, callback)
1134         task = self._scheduler.schedule(task)
1135
1136         if track:
1137             self._tasks[task.id] = task
1138
1139         # Notify condition to wake up the processing thread
1140         self._notify()
1141
1142         return task.id
1143      
1144     def _process(self):
1145         """ Process scheduled tasks.
1146
1147         .. note::
1148         
1149         Tasks are scheduled by invoking the schedule method with a target 
1150         callback and an execution time. 
1151         The schedule method creates a new Task object with that callback 
1152         and execution time, and pushes it into the '_scheduler' queue. 
1153         The execution time and the order of arrival of tasks are used 
1154         to order the tasks in the queue.
1155
1156         The _process method is executed in an independent thread held by 
1157         the ExperimentController for as long as the experiment is running.
1158         This method takes tasks from the '_scheduler' queue in a loop 
1159         and processes them in parallel using multithreading. 
1160         The environmental variable NEPI_NTHREADS can be used to control
1161         the number of threads used to process tasks. The default value is 
1162         50.
1163
1164         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1165         This object keeps a pool of threads (workers), and a queue of tasks
1166         scheduled for 'immediate' execution. 
1167         
1168         On each iteration, the '_process' loop will take the next task that 
1169         is scheduled for 'future' execution from the '_scheduler' queue, 
1170         and if the execution time of that task is >= to the current time, 
1171         it will push that task into the PR for 'immediate execution'. 
1172         As soon as a worker is free, the PR will assign the next task to
1173         that worker.
1174
1175         Upon receiving a task to execute, each PR worker (thread) will 
1176         invoke the  _execute method of the EC, passing the task as 
1177         argument.         
1178         The _execute method will then invoke task.callback inside a 
1179         try/except block. If an exception is raised by the tasks.callback, 
1180         it will be trapped by the try block, logged to standard error 
1181         (usually the console), and the task will be marked as failed.
1182
1183         """
1184
1185         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1186         self._runner = ParallelRun(maxthreads = self.nthreads)
1187         self._runner.start()
1188
1189         while not self._stop:
1190             try:
1191                 self._cond.acquire()
1192
1193                 task = next(self._scheduler)
1194                 
1195                 if not task:
1196                     # No task to execute. Wait for a new task to be scheduled.
1197                     self._cond.wait()
1198                 else:
1199                     # The task timestamp is in the future. Wait for timeout 
1200                     # or until another task is scheduled.
1201                     now = tnow()
1202                     if now < task.timestamp:
1203                         # Calculate timeout in seconds
1204                         timeout = tdiffsec(task.timestamp, now)
1205
1206                         # Re-schedule task with the same timestamp
1207                         self._scheduler.schedule(task)
1208                         
1209                         task = None
1210
1211                         # Wait timeout or until a new task awakes the condition
1212                         self._cond.wait(timeout)
1213                
1214                 self._cond.release()
1215
1216                 if task:
1217                     # Process tasks in parallel
1218                     self._runner.put(self._execute, task)
1219             except: 
1220                 import traceback
1221                 err = traceback.format_exc()
1222                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1223
1224                 # Set the EC to FAILED state 
1225                 self._state = ECState.FAILED
1226             
1227                 # Set the FailureManager failure level to EC failure
1228                 self._fm.set_ec_failure()
1229
1230         self.logger.debug("Exiting the task processing loop ... ")
1231         
1232         self._runner.sync()
1233         self._runner.destroy()
1234
1235     def _execute(self, task):
1236         """ Executes a single task. 
1237
1238             :param task: Object containing the callback to execute
1239             :type task: Task
1240
1241         """
1242         try:
1243             # Invoke callback
1244             task.result = task.callback()
1245             task.status = TaskStatus.DONE
1246         except:
1247             import traceback
1248             err = traceback.format_exc()
1249             task.result = err
1250             task.status = TaskStatus.ERROR
1251             
1252             self.logger.error("Error occurred while executing task: %s" % err)
1253
1254     def _notify(self):
1255         """ Awakes the processing thread if it is blocked waiting 
1256         for new tasks to arrive
1257         
1258         """
1259         self._cond.acquire()
1260         self._cond.notify()
1261         self._cond.release()
1262
1263     def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
1264             **kwargs):
1265         """ Automates experiment description using a NetGraph instance.
1266         """
1267         self._netgraph = NetGraph(**kwargs)
1268
1269         if add_node_callback:
1270             ### Add resources to the EC
1271             for nid in self.netgraph.nodes():
1272                 add_node_callback(self, nid)
1273
1274         if add_edge_callback:
1275             #### Add connections between resources
1276             for nid1, nid2 in self.netgraph.edges():
1277                 add_edge_callback(self, nid1, nid2)
1278