register_resource now also recognizes the connectedTo keyword
[nepi.git] / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from six import next
20
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29 from nepi.util.netgraph import NetGraph, TopologyType 
30
31 # TODO: use multiprocessing instead of threading
32 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
33
34 import functools
35 import logging
36 import os
37 import sys
38 import tempfile
39 import time
40 import threading
41 import weakref
42
43 class FailureLevel(object):
44     """ Possible failure states for the experiment """
45     OK = 1
46     RM_FAILURE = 2
47     EC_FAILURE = 3
48
49 class FailureManager(object):
50     """ The FailureManager is responsible for handling errors
51     and deciding whether an experiment should be aborted or not
52     """
53
54     def __init__(self):
55         self._ec = None
56         self._failure_level = FailureLevel.OK
57         self._abort = False
58
59     def set_ec(self, ec):
60         self._ec = weakref.ref(ec)
61
62     @property
63     def ec(self):
64         """ Returns the ExperimentController associated to this FailureManager 
65         """
66         return self._ec()
67
68     @property
69     def abort(self):
70         return self._abort
71
72     def eval_failure(self, guid):
73         """ Implements failure policy and sets the abort state of the
74         experiment based on the failure state and criticality of
75         the RM
76
77         :param guid: Guid of the RM upon which the failure of the experiment
78             is evaluated
79         :type guid: int
80
81         """
82         if self._failure_level == FailureLevel.OK:
83             rm = self.ec.get_resource(guid)
84             state = rm.state
85             critical = rm.get("critical")
86
87             if state == ResourceState.FAILED and critical:
88                 self._failure_level = FailureLevel.RM_FAILURE
89                 self._abort = True
90                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
91                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
92
93     def set_ec_failure(self):
94         self._failure_level = FailureLevel.EC_FAILURE
95
96 class ECState(object):
97     """ Possible states of the ExperimentController
98    
99     """
100     RUNNING = 1
101     FAILED = 2
102     RELEASED = 3
103     TERMINATED = 4
104
105 # historical note: this class used to be in util/guid.py but is used only here
106 # FIXME: This class is not thread-safe. Should it be made thread-safe?
107 class GuidGenerator(object):
108     def __init__(self):
109         self._last_guid = 0
110
111     # historical note: this used to be called `next`
112     # which confused 2to3 - and me - while it has
113     # nothing to do at all with the iteration protocol
114     def generate(self, guid = None):
115         if guid == None:
116             guid = self._last_guid + 1
117
118         self._last_guid = self._last_guid if guid <= self._last_guid else guid
119
120         return guid
121
122 class ExperimentController(object):
123     """
124     .. note::
125
126     An experiment, or scenario, is defined by a concrete set of resources,
127     and the behavior, configuration and interconnection of those resources. 
128     The Experiment Description (ED) is a detailed representation of a
129     single experiment. It contains all the necessary information to 
130     allow repeating the experiment. NEPI allows to describe
131     experiments by registering components (resources), configuring them
132     and interconnecting them.
133     
134     A same experiment (scenario) can be executed many times, generating 
135     different results. We call an experiment execution (instance) a 'run'.
136
137     The ExperimentController (EC), is the entity responsible of
138     managing an experiment run. The same scenario can be 
139     recreated (and re-run) by instantiating an EC and recreating 
140     the same experiment description. 
141
142     An experiment is represented as a graph of interconnected
143     resources. A resource is a generic concept in the sense that any
144     component taking part of an experiment, whether physical of
145     virtual, is considered a resource. A resources could be a host, 
146     a virtual machine, an application, a simulator, a IP address.
147
148     A ResourceManager (RM), is the entity responsible for managing a 
149     single resource. ResourceManagers are specific to a resource
150     type (i.e. An RM to control a Linux application will not be
151     the same as the RM used to control a ns-3 simulation).
152     To support a new type of resource, a new RM must be implemented. 
153     NEPI already provides a variety of RMs to control basic resources, 
154     and new can be extended from the existing ones.
155
156     Through the EC interface the user can create ResourceManagers (RMs),
157     configure them and interconnect them, to describe an experiment.
158     Describing an experiment through the EC does not run the experiment.
159     Only when the 'deploy()' method is invoked on the EC, the EC will take 
160     actions to transform the 'described' experiment into a 'running' experiment.
161
162     While the experiment is running, it is possible to continue to
163     create/configure/connect RMs, and to deploy them to involve new
164     resources in the experiment (this is known as 'interactive' deployment).
165     
166     An experiments in NEPI is identified by a string id, 
167     which is either given by the user, or automatically generated by NEPI.  
168     The purpose of this identifier is to separate files and results that 
169     belong to different experiment scenarios. 
170     However, since a same 'experiment' can be run many times, the experiment
171     id is not enough to identify an experiment instance (run).
172     For this reason, the ExperimentController has two identifier, the 
173     exp_id, which can be re-used in different ExperimentController,
174     and the run_id, which is unique to one ExperimentController instance, and
175     is automatically generated by NEPI.
176    
177     """
178
179     @classmethod
180     def load(cls, filepath, format = SFormats.XML):
181         serializer = ECSerializer()
182         ec = serializer.load(filepath)
183         return ec
184
185     def __init__(self, exp_id = None, local_dir = None, persist = False,
186             fm = None, add_node_callback = None, add_edge_callback = None, 
187             **kwargs):
188         """ ExperimentController entity to model an execute a network 
189         experiment.
190         
191         :param exp_id: Human readable name to identify the experiment
192         :type exp_id: str
193
194         :param local_dir: Path to local directory where to store experiment
195             related files
196         :type local_dir: str
197
198         :param persist: Save an XML description of the experiment after 
199         completion at local_dir
200         :type persist: bool
201
202         :param fm: FailureManager object. If None is given, the default 
203             FailureManager class will be used
204         :type fm: FailureManager
205
206         :param add_node_callback: Callback to invoke for node instantiation
207         when automatic topology creation mode is used 
208         :type add_node_callback: function
209
210         :param add_edge_callback: Callback to invoke for edge instantiation 
211         when automatic topology creation mode is used 
212         :type add_edge_callback: function
213
214         """
215         super(ExperimentController, self).__init__()
216
217         # Logging
218         self._logger = logging.getLogger("ExperimentController")
219
220         # Run identifier. It identifies a concrete execution instance (run) 
221         # of an experiment.
222         # Since a same experiment (same configuration) can be executed many 
223         # times, this run_id permits to separate result files generated on 
224         # different experiment executions
225         self._run_id = tsformat()
226
227         # Experiment identifier. Usually assigned by the user
228         # Identifies the experiment scenario (i.e. configuration, 
229         # resources used, etc)
230         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
231
232         # Local path where to store experiment related files (results, etc)
233         if not local_dir:
234             local_dir = tempfile.gettempdir() # /tmp
235
236         self._local_dir = local_dir
237         self._exp_dir = os.path.join(local_dir, self.exp_id)
238         self._run_dir = os.path.join(self.exp_dir, self.run_id)
239
240         # If True persist the experiment controller in XML format, after completion
241         self._persist = persist
242
243         # generator of globally unique ids
244         self._guid_generator = GuidGenerator()
245         
246         # Resource managers
247         self._resources = dict()
248
249         # Scheduler. It a queue that holds tasks scheduled for
250         # execution, and yields the next task to be executed 
251         # ordered by execution and arrival time
252         self._scheduler = HeapScheduler()
253
254         # Tasks
255         self._tasks = dict()
256
257         # RM groups (for deployment) 
258         self._groups = dict()
259
260         # generator of globally unique id for groups
261         self._group_id_generator = GuidGenerator()
262
263         # Flag to stop processing thread
264         self._stop = False
265     
266         # Entity in charge of managing system failures
267         if not fm:
268             self._fm = FailureManager()
269         self._fm.set_ec(self)
270
271         # EC state
272         self._state = ECState.RUNNING
273
274         # Automatically construct experiment description 
275         self._netgraph = None
276         if add_node_callback or add_edge_callback or kwargs.get("topology"):
277             self._build_from_netgraph(add_node_callback, add_edge_callback, 
278                     **kwargs)
279
280         # The runner is a pool of threads used to parallelize 
281         # execution of tasks
282         self._nthreads = 20
283         self._runner = None
284
285         # Event processing thread
286         self._cond = threading.Condition()
287         self._thread = threading.Thread(target = self._process)
288         self._thread.setDaemon(True)
289         self._thread.start()
290         
291     @property
292     def logger(self):
293         """ Returns the logger instance of the Experiment Controller
294
295         """
296         return self._logger
297
298     @property
299     def fm(self):
300         """ Returns the failure manager
301
302         """
303
304         return self._fm
305
306     @property
307     def failure_level(self):
308         """ Returns the level of FAILURE of th experiment
309
310         """
311
312         return self._fm._failure_level
313
314     @property
315     def ecstate(self):
316         """ Returns the state of the Experiment Controller
317
318         """
319         return self._state
320
321     @property
322     def exp_id(self):
323         """ Returns the experiment id assigned by the user
324
325         """
326         return self._exp_id
327
328     @property
329     def run_id(self):
330         """ Returns the experiment instance (run) identifier (automatically 
331         generated)
332
333         """
334         return self._run_id
335
336     @property
337     def nthreads(self):
338         """ Returns the number of processing nthreads used
339
340         """
341         return self._nthreads
342
343     @property
344     def local_dir(self):
345         """ Root local directory for experiment files
346
347         """
348         return self._local_dir
349
350     @property
351     def exp_dir(self):
352         """ Local directory to store results and other files related to the 
353         experiment.
354
355         """
356         return self._exp_dir
357
358     @property
359     def run_dir(self):
360         """ Local directory to store results and other files related to the 
361         experiment run.
362
363         """
364         return self._run_dir
365
366     @property
367     def persist(self):
368         """ If True, persists the ExperimentController to XML format upon 
369         experiment completion
370
371         """
372         return self._persist
373
374     @property
375     def netgraph(self):
376         """ Return NetGraph instance if experiment description was automatically 
377         generated
378
379         """
380         return self._netgraph
381
382     @property
383     def abort(self):
384         """ Returns True if the experiment has failed and should be interrupted,
385         False otherwise.
386
387         """
388         return self._fm.abort
389
390     def inform_failure(self, guid):
391         """ Reports a failure in a RM to the EC for evaluation
392
393             :param guid: Resource id
394             :type guid: int
395
396         """
397
398         return self._fm.eval_failure(guid)
399
400     def wait_finished(self, guids):
401         """ Blocking method that waits until all RMs in the 'guids' list 
402         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
403         RELEASED ), or until a failure in the experiment occurs 
404         (i.e. abort == True) 
405         
406             :param guids: List of guids
407             :type guids: list
408
409         """
410
411         def quit():
412             return self.abort
413
414         return self.wait(guids, state = ResourceState.STOPPED, 
415                 quit = quit)
416
417     def wait_started(self, guids):
418         """ Blocking method that waits until all RMs in the 'guids' list 
419         have reached a state >= STARTED, or until a failure in the 
420         experiment occurs (i.e. abort == True) 
421         
422             :param guids: List of guids
423             :type guids: list
424
425         """
426
427         def quit():
428             return self.abort
429
430         return self.wait(guids, state = ResourceState.STARTED, 
431                 quit = quit)
432
433     def wait_released(self, guids):
434         """ Blocking method that waits until all RMs in the 'guids' list 
435         have reached a state == RELEASED, or until the EC fails 
436         
437             :param guids: List of guids
438             :type guids: list
439
440         """
441
442         def quit():
443             return self._state == ECState.FAILED
444
445         return self.wait(guids, state = ResourceState.RELEASED, 
446                 quit = quit)
447
448     def wait_deployed(self, guids):
449         """ Blocking method that waits until all RMs in the 'guids' list 
450         have reached a state >= READY, or until a failure in the 
451         experiment occurs (i.e. abort == True) 
452         
453             :param guids: List of guids
454             :type guids: list
455
456         """
457
458         def quit():
459             return self.abort
460
461         return self.wait(guids, state = ResourceState.READY, 
462                 quit = quit)
463
464     def wait(self, guids, state, quit):
465         """ Blocking method that waits until all RMs in the 'guids' list 
466         have reached a state >= 'state', or until the 'quit' callback
467         yields True
468            
469             :param guids: List of guids
470             :type guids: list
471         
472         """
473         if isinstance(guids, int):
474             guids = [guids]
475
476         # Make a copy to avoid modifying the original guids list
477         guids = list(guids)
478
479         while True:
480             # If there are no more guids to wait for
481             # or the quit function returns True, exit the loop
482             if len(guids) == 0 or quit():
483                 break
484
485             # If a guid reached one of the target states, remove it from list
486             guid = guids.pop()
487             rm = self.get_resource(guid)
488             rstate = rm.state
489             
490             if rstate >= state:
491                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
492                     rm.get_rtype(), guid, rstate, state))
493             else:
494                 # Debug...
495                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
496                     guid, rstate, state))
497
498                 guids.append(guid)
499
500                 time.sleep(0.5)
501
502     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
503         plotter = ECPlotter()
504         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
505                 show = show)
506         return fpath
507
508     def serialize(self, format = SFormats.XML):
509         serializer = ECSerializer()
510         sec = serializer.load(self, format = format)
511         return sec
512
513     def save(self, dirpath = None, format = SFormats.XML):
514         if dirpath == None:
515             dirpath = self.run_dir
516
517         try:
518             os.makedirs(dirpath)
519         except OSError:
520             pass
521
522         serializer = ECSerializer()
523         path = serializer.save(self, dirpath, format = format)
524         return path
525
526     def get_task(self, tid):
527         """ Returns a task by its id
528
529             :param tid: Id of the task
530             :type tid: int
531             
532             :rtype: Task
533             
534         """
535         return self._tasks.get(tid)
536
537     def get_resource(self, guid):
538         """ Returns a registered ResourceManager by its guid
539
540             :param guid: Id of the resource
541             :type guid: int
542             
543             :rtype: ResourceManager
544             
545         """
546         rm = self._resources.get(guid)
547         return rm
548
549     def get_resources_by_type(self, rtype):
550         """ Returns the ResourceManager objects of type rtype
551
552             :param rtype: Resource type
553             :type rtype: string
554             
555             :rtype: list of ResourceManagers
556             
557         """
558         rms = []
559         for guid, rm in self._resources.items():
560             if rm.get_rtype() == rtype: 
561                 rms.append(rm)
562         return rms
563
564     def remove_resource(self, guid):
565         del self._resources[guid]
566
567     @property
568     def resources(self):
569         """ Returns the guids of all ResourceManagers 
570
571             :return: Set of all RM guids
572             :rtype: list
573
574         """
575         keys = list(self._resources.keys())
576
577         return keys
578
579     def filter_resources(self, rtype):
580         """ Returns the guids of all ResourceManagers of type rtype
581
582             :param rtype: Resource type
583             :type rtype: string
584             
585             :rtype: list of guids
586             
587         """
588         rms = []
589         for guid, rm in self._resources.items():
590             if rm.get_rtype() == rtype: 
591                 rms.append(rm.guid)
592         return rms
593
594     def register_resource(self, rtype, guid = None, **keywords):
595         """ Registers a new ResourceManager of type 'rtype' in the experiment
596         
597         This method will assign a new 'guid' for the RM, if no guid
598         is specified.
599
600             :param rtype: Type of the RM
601             :type rtype: str
602
603             :return: Guid of the RM
604             :rtype: int
605             
606         Specifying additional keywords results in the following actions
607         * autoDeploy:
608           boolean: if set, causes the created object to be `deploy`ed
609           before returning from register_resource
610         * connectedTo:
611           resourceObject: if set, causes the `register_connection` method
612           to be called before returning and after auto-deployment if relevant
613         * other keywords are used to call `set` to set attributes
614
615         Example:
616         app = ec.register_resource("linux::Application",
617                                     command = "systemctl start httpd",
618                                     autoDeploy = True,
619                                     connectedTo = node)
620         ### instead of
621         app = ec.register_resource("linux::Application",
622         ec.set(app, "command", "systemctl start httpd")
623         ec.deploy(app)
624         ec.register_connection(app, node) 
625
626         """
627         # Get next available guid
628         # xxx_next_hiccup
629         guid = self._guid_generator.generate(guid)
630         
631         # Instantiate RM
632         rm = ResourceFactory.create(rtype, self, guid)
633
634         # Store RM
635         self._resources[guid] = rm
636
637         ### special keywords
638         specials = []
639
640         # is there a need to call deploy
641         special = 'autoDeploy'
642         specials.append(special)
643         auto_deploy = special in keywords and keywords[special]
644
645         # is there a need to call register_connection, and if so to what
646         special = 'connectedTo'
647         specials.append(special)
648         connected_to = special in keywords and keywords[special]
649
650         ### now we can do all the calls to 'set'
651         for name, value in keywords.items():
652             # specials are handled locally and not propagated to 'set'
653             if name not in specials:
654                 self.set(guid, name, value)
655
656         ### deal with specials
657         if auto_deploy:
658             self.deploy(guid)
659         if connected_to:
660             self.register_connection(guid, connected_to)
661
662         return guid
663
664     def get_attributes(self, guid):
665         """ Returns all the attributes of the RM with guid 'guid'
666
667             :param guid: Guid of the RM
668             :type guid: int
669
670             :return: List of attributes
671             :rtype: list
672
673         """
674         rm = self.get_resource(guid)
675         return rm.get_attributes()
676
677     def get_attribute(self, guid, name):
678         """ Returns the attribute 'name' of the RM with guid 'guid'
679
680             :param guid: Guid of the RM
681             :type guid: int
682
683             :param name: Name of the attribute
684             :type name: str
685
686             :return: The attribute with name 'name'
687             :rtype: Attribute
688
689         """
690         rm = self.get_resource(guid)
691         return rm.get_attribute(name)
692
693     def register_connection(self, guid1, guid2):
694         """ Registers a connection between a RM with guid 'guid1'
695         and another RM with guid 'guid2'. 
696     
697         The order of the in which the two guids are provided is not
698         important, since the connection relationship is symmetric.
699
700             :param guid1: First guid to connect
701             :type guid1: ResourceManager
702
703             :param guid2: Second guid to connect
704             :type guid: ResourceManager
705
706         """
707         rm1 = self.get_resource(guid1)
708         rm2 = self.get_resource(guid2)
709
710         rm1.register_connection(guid2)
711         rm2.register_connection(guid1)
712
713     def register_condition(self, guids1, action, guids2, state,
714             time = None):
715         """ Registers an action START, STOP or DEPLOY for all RM on list
716         guids1 to occur at time 'time' after all elements in list guids2 
717         have reached state 'state'.
718
719             :param guids1: List of guids of RMs subjected to action
720             :type guids1: list
721
722             :param action: Action to perform (either START, STOP or DEPLOY)
723             :type action: ResourceAction
724
725             :param guids2: List of guids of RMs to we waited for
726             :type guids2: list
727
728             :param state: State to wait for on RMs of list guids2 (STARTED,
729                 STOPPED, etc)
730             :type state: ResourceState
731
732             :param time: Time to wait after guids2 has reached status 
733             :type time: string
734
735         """
736         if isinstance(guids1, int):
737             guids1 = [guids1]
738         if isinstance(guids2, int):
739             guids2 = [guids2]
740
741         for guid1 in guids1:
742             rm = self.get_resource(guid1)
743             rm.register_condition(action, guids2, state, time)
744
745     def enable_trace(self, guid, name):
746         """ Enables a trace to be collected during the experiment run
747
748             :param name: Name of the trace
749             :type name: str
750
751         """
752         rm = self.get_resource(guid)
753         rm.enable_trace(name)
754
755     def trace_enabled(self, guid, name):
756         """ Returns True if the trace of name 'name' is enabled
757
758             :param name: Name of the trace
759             :type name: str
760
761         """
762         rm = self.get_resource(guid)
763         return rm.trace_enabled(name)
764
765     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
766         """ Returns information on a collected trace, the trace stream or 
767         blocks (chunks) of the trace stream
768
769             :param name: Name of the trace
770             :type name: str
771
772             :param attr: Can be one of:
773                          - TraceAttr.ALL (complete trace content), 
774                          - TraceAttr.STREAM (block in bytes to read starting 
775                                 at offset),
776                          - TraceAttr.PATH (full path to the trace file),
777                          - TraceAttr.SIZE (size of trace file). 
778             :type attr: str
779
780             :param block: Number of bytes to retrieve from trace, when attr is 
781                 TraceAttr.STREAM 
782             :type name: int
783
784             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
785             :type name: int
786
787             :rtype: str
788
789         """
790         rm = self.get_resource(guid)
791         return rm.trace(name, attr, block, offset)
792
793     def get_traces(self, guid):
794         """ Returns the list of the trace names of the RM with guid 'guid'
795
796             :param guid: Guid of the RM
797             :type guid: int
798
799             :return: List of trace names
800             :rtype: list
801
802         """
803         rm = self.get_resource(guid)
804         return rm.get_traces()
805
806
807     def discover(self, guid):
808         """ Discovers an available resource matching the criteria defined
809         by the RM with guid 'guid', and associates that resource to the RM
810
811         Not all RM types require (or are capable of) performing resource 
812         discovery. For the RM types which are not capable of doing so, 
813         invoking this method does not have any consequences. 
814
815             :param guid: Guid of the RM
816             :type guid: int
817
818         """
819         rm = self.get_resource(guid)
820         return rm.discover()
821
822     def provision(self, guid):
823         """ Provisions the resource associated to the RM with guid 'guid'.
824
825         Provisioning means making a resource 'accessible' to the user. 
826         Not all RM types require (or are capable of) performing resource 
827         provisioning. For the RM types which are not capable of doing so, 
828         invoking this method does not have any consequences. 
829
830             :param guid: Guid of the RM
831             :type guid: int
832
833         """
834         rm = self.get_resource(guid)
835         return rm.provision()
836
837     def get(self, guid, name):
838         """ Returns the value of the attribute with name 'name' on the
839         RM with guid 'guid'
840
841             :param guid: Guid of the RM
842             :type guid: int
843
844             :param name: Name of the attribute 
845             :type name: str
846
847             :return: The value of the attribute with name 'name'
848
849         """
850         rm = self.get_resource(guid)
851         return rm.get(name)
852
853     def set(self, guid, name, value):
854         """ Modifies the value of the attribute with name 'name' on the 
855         RM with guid 'guid'.
856
857             :param guid: Guid of the RM
858             :type guid: int
859
860             :param name: Name of the attribute
861             :type name: str
862
863             :param value: Value of the attribute
864
865         """
866         rm = self.get_resource(guid)
867         rm.set(name, value)
868
869     def get_global(self, rtype, name):
870         """ Returns the value of the global attribute with name 'name' on the
871         RMs of rtype 'rtype'.
872
873             :param guid: Guid of the RM
874             :type guid: int
875
876             :param name: Name of the attribute 
877             :type name: str
878
879             :return: The value of the attribute with name 'name'
880
881         """
882         rclass = ResourceFactory.get_resource_type(rtype)
883         return rclass.get_global(name)
884
885     def set_global(self, rtype, name, value):
886         """ Modifies the value of the global attribute with name 'name' on the 
887         RMs of with rtype 'rtype'.
888
889             :param guid: Guid of the RM
890             :type guid: int
891
892             :param name: Name of the attribute
893             :type name: str
894
895             :param value: Value of the attribute
896
897         """
898         rclass = ResourceFactory.get_resource_type(rtype)
899         return rclass.set_global(name, value)
900
901     def state(self, guid, hr = False):
902         """ Returns the state of a resource
903
904             :param guid: Resource guid
905             :type guid: integer
906
907             :param hr: Human readable. Forces return of a 
908                 status string instead of a number 
909             :type hr: boolean
910
911         """
912         rm = self.get_resource(guid)
913         state = rm.state
914
915         if hr:
916             return ResourceState2str.get(state)
917
918         return state
919
920     def stop(self, guid):
921         """ Stops the RM with guid 'guid'
922
923         Stopping a RM means that the resource it controls will
924         no longer take part of the experiment.
925
926             :param guid: Guid of the RM
927             :type guid: int
928
929         """
930         rm = self.get_resource(guid)
931         return rm.stop()
932
933     def start(self, guid):
934         """ Starts the RM with guid 'guid'
935
936         Starting a RM means that the resource it controls will
937         begin taking part of the experiment.
938
939             :param guid: Guid of the RM
940             :type guid: int
941
942         """
943         rm = self.get_resource(guid)
944         return rm.start()
945
946     def get_start_time(self, guid):
947         """ Returns the start time of the RM as a timestamp """
948         rm = self.get_resource(guid)
949         return rm.start_time
950
951     def get_stop_time(self, guid):
952         """ Returns the stop time of the RM as a timestamp """
953         rm = self.get_resource(guid)
954         return rm.stop_time
955
956     def get_discover_time(self, guid):
957         """ Returns the discover time of the RM as a timestamp """
958         rm = self.get_resource(guid)
959         return rm.discover_time
960
961     def get_provision_time(self, guid):
962         """ Returns the provision time of the RM as a timestamp """
963         rm = self.get_resource(guid)
964         return rm.provision_time
965
966     def get_ready_time(self, guid):
967         """ Returns the deployment time of the RM as a timestamp """
968         rm = self.get_resource(guid)
969         return rm.ready_time
970
971     def get_release_time(self, guid):
972         """ Returns the release time of the RM as a timestamp """
973         rm = self.get_resource(guid)
974         return rm.release_time
975
976     def get_failed_time(self, guid):
977         """ Returns the time failure occured for the RM as a timestamp """
978         rm = self.get_resource(guid)
979         return rm.failed_time
980
981     def set_with_conditions(self, name, value, guids1, guids2, state,
982             time = None):
983         """ Modifies the value of attribute with name 'name' on all RMs 
984         on the guids1 list when time 'time' has elapsed since all 
985         elements in guids2 list have reached state 'state'.
986
987             :param name: Name of attribute to set in RM
988             :type name: string
989
990             :param value: Value of attribute to set in RM
991             :type name: string
992
993             :param guids1: List of guids of RMs subjected to action
994             :type guids1: list
995
996             :param action: Action to register (either START or STOP)
997             :type action: ResourceAction
998
999             :param guids2: List of guids of RMs to we waited for
1000             :type guids2: list
1001
1002             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
1003             :type state: ResourceState
1004
1005             :param time: Time to wait after guids2 has reached status 
1006             :type time: string
1007
1008         """
1009         if isinstance(guids1, int):
1010             guids1 = [guids1]
1011         if isinstance(guids2, int):
1012             guids2 = [guids2]
1013
1014         for guid1 in guids1:
1015             rm = self.get_resource(guid)
1016             rm.set_with_conditions(name, value, guids2, state, time)
1017
1018     def deploy(self, guids = None, wait_all_ready = True, group = None):
1019         """ Deploys all ResourceManagers in the guids list. 
1020         
1021         If the argument 'guids' is not given, all RMs with state NEW
1022         are deployed.
1023
1024             :param guids: List of guids of RMs to deploy
1025             :type guids: list
1026
1027             :param wait_all_ready: Wait until all RMs are ready in
1028                 order to start the RMs
1029             :type guid: int
1030
1031             :param group: Id of deployment group in which to deploy RMs
1032             :type group: int
1033
1034         """
1035         self.logger.debug(" ------- DEPLOY START ------ ")
1036
1037         if not guids:
1038             # If no guids list was passed, all 'NEW' RMs will be deployed
1039             guids = []
1040             for guid, rm in self._resources.items():
1041                 if rm.state == ResourceState.NEW:
1042                     guids.append(guid)
1043                 
1044         if isinstance(guids, int):
1045             guids = [guids]
1046
1047         # Create deployment group
1048         # New guids can be added to a same deployment group later on
1049         new_group = False
1050         if not group:
1051             new_group = True
1052             # xxx_next_hiccup
1053             group = self._group_id_generator.generate()
1054
1055         if group not in self._groups:
1056             self._groups[group] = []
1057
1058         self._groups[group].extend(guids)
1059
1060         def wait_all_and_start(group):
1061             # Function that checks if all resources are READY
1062             # before scheduling a start_with_conditions for each RM
1063             reschedule = False
1064             
1065             # Get all guids in group
1066             guids = self._groups[group]
1067
1068             for guid in guids:
1069                 if self.state(guid) < ResourceState.READY:
1070                     reschedule = True
1071                     break
1072
1073             if reschedule:
1074                 callback = functools.partial(wait_all_and_start, group)
1075                 self.schedule("1s", callback)
1076             else:
1077                 # If all resources are ready, we schedule the start
1078                 for guid in guids:
1079                     rm = self.get_resource(guid)
1080                     self.schedule("0s", rm.start_with_conditions)
1081
1082                     if rm.conditions.get(ResourceAction.STOP):
1083                         # Only if the RM has STOP conditions we
1084                         # schedule a stop. Otherwise the RM will stop immediately
1085                         self.schedule("0s", rm.stop_with_conditions)
1086
1087         if wait_all_ready and new_group:
1088             # Schedule a function to check that all resources are
1089             # READY, and only then schedule the start.
1090             # This aims at reducing the number of tasks looping in the 
1091             # scheduler. 
1092             # Instead of having many start tasks, we will have only one for 
1093             # the whole group.
1094             callback = functools.partial(wait_all_and_start, group)
1095             self.schedule("0s", callback)
1096
1097         for guid in guids:
1098             rm = self.get_resource(guid)
1099             rm.deployment_group = group
1100             self.schedule("0s", rm.deploy_with_conditions)
1101
1102             if not wait_all_ready:
1103                 self.schedule("0s", rm.start_with_conditions)
1104
1105                 if rm.conditions.get(ResourceAction.STOP):
1106                     # Only if the RM has STOP conditions we
1107                     # schedule a stop. Otherwise the RM will stop immediately
1108                     self.schedule("0s", rm.stop_with_conditions)
1109
1110     def release(self, guids = None):
1111         """ Releases all ResourceManagers in the guids list.
1112
1113         If the argument 'guids' is not given, all RMs registered
1114         in the experiment are released.
1115
1116             :param guids: List of RM guids
1117             :type guids: list
1118
1119         """
1120         if self._state == ECState.RELEASED:
1121             return 
1122
1123         if isinstance(guids, int):
1124             guids = [guids]
1125
1126         if not guids:
1127             guids = self.resources
1128
1129         for guid in guids:
1130             rm = self.get_resource(guid)
1131             self.schedule("0s", rm.release)
1132
1133         self.wait_released(guids)
1134
1135         if self.persist:
1136             self.save()
1137
1138         for guid in guids:
1139             if self.get(guid, "hardRelease"):
1140                 self.remove_resource(guid)\
1141
1142         # Mark the EC state as RELEASED
1143         self._state = ECState.RELEASED
1144         
1145     def shutdown(self):
1146         """ Releases all resources and stops the ExperimentController
1147
1148         """
1149         # If there was a major failure we can't exit gracefully
1150         if self._state == ECState.FAILED:
1151             raise RuntimeError("EC failure. Can not exit gracefully")
1152
1153         # Remove all pending tasks from the scheduler queue
1154         for tid in list(self._scheduler.pending):
1155             self._scheduler.remove(tid)
1156
1157         # Remove pending tasks from the workers queue
1158         self._runner.empty()
1159
1160         self.release()
1161
1162         # Mark the EC state as TERMINATED
1163         self._state = ECState.TERMINATED
1164
1165         # Stop processing thread
1166         self._stop = True
1167
1168         # Notify condition to wake up the processing thread
1169         self._notify()
1170         
1171         if self._thread.is_alive():
1172            self._thread.join()
1173
1174     def schedule(self, date, callback, track = False):
1175         """ Schedules a callback to be executed at time 'date'.
1176
1177             :param date: string containing execution time for the task.
1178                     It can be expressed as an absolute time, using
1179                     timestamp format, or as a relative time matching
1180                     ^\d+.\d+(h|m|s|ms|us)$
1181
1182             :param callback: code to be executed for the task. Must be a
1183                         Python function, and receives args and kwargs
1184                         as arguments.
1185
1186             :param track: if set to True, the task will be retrievable with
1187                     the get_task() method
1188
1189             :return : The Id of the task
1190             :rtype: int
1191             
1192         """
1193         timestamp = stabsformat(date)
1194         task = Task(timestamp, callback)
1195         task = self._scheduler.schedule(task)
1196
1197         if track:
1198             self._tasks[task.id] = task
1199
1200         # Notify condition to wake up the processing thread
1201         self._notify()
1202
1203         return task.id
1204      
1205     def _process(self):
1206         """ Process scheduled tasks.
1207
1208         .. note::
1209         
1210         Tasks are scheduled by invoking the schedule method with a target 
1211         callback and an execution time. 
1212         The schedule method creates a new Task object with that callback 
1213         and execution time, and pushes it into the '_scheduler' queue. 
1214         The execution time and the order of arrival of tasks are used 
1215         to order the tasks in the queue.
1216
1217         The _process method is executed in an independent thread held by 
1218         the ExperimentController for as long as the experiment is running.
1219         This method takes tasks from the '_scheduler' queue in a loop 
1220         and processes them in parallel using multithreading. 
1221         The environmental variable NEPI_NTHREADS can be used to control
1222         the number of threads used to process tasks. The default value is 
1223         50.
1224
1225         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1226         This object keeps a pool of threads (workers), and a queue of tasks
1227         scheduled for 'immediate' execution. 
1228         
1229         On each iteration, the '_process' loop will take the next task that 
1230         is scheduled for 'future' execution from the '_scheduler' queue, 
1231         and if the execution time of that task is >= to the current time, 
1232         it will push that task into the PR for 'immediate execution'. 
1233         As soon as a worker is free, the PR will assign the next task to
1234         that worker.
1235
1236         Upon receiving a task to execute, each PR worker (thread) will 
1237         invoke the  _execute method of the EC, passing the task as 
1238         argument.         
1239         The _execute method will then invoke task.callback inside a 
1240         try/except block. If an exception is raised by the tasks.callback, 
1241         it will be trapped by the try block, logged to standard error 
1242         (usually the console), and the task will be marked as failed.
1243
1244         """
1245
1246         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1247         self._runner = ParallelRun(maxthreads = self.nthreads)
1248         self._runner.start()
1249
1250         while not self._stop:
1251             try:
1252                 self._cond.acquire()
1253
1254                 task = next(self._scheduler)
1255                 
1256                 if not task:
1257                     # No task to execute. Wait for a new task to be scheduled.
1258                     self._cond.wait()
1259                 else:
1260                     # The task timestamp is in the future. Wait for timeout 
1261                     # or until another task is scheduled.
1262                     now = tnow()
1263                     if now < task.timestamp:
1264                         # Calculate timeout in seconds
1265                         timeout = tdiffsec(task.timestamp, now)
1266
1267                         # Re-schedule task with the same timestamp
1268                         self._scheduler.schedule(task)
1269                         
1270                         task = None
1271
1272                         # Wait timeout or until a new task awakes the condition
1273                         self._cond.wait(timeout)
1274                
1275                 self._cond.release()
1276
1277                 if task:
1278                     # Process tasks in parallel
1279                     self._runner.put(self._execute, task)
1280             except: 
1281                 import traceback
1282                 err = traceback.format_exc()
1283                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1284
1285                 # Set the EC to FAILED state 
1286                 self._state = ECState.FAILED
1287             
1288                 # Set the FailureManager failure level to EC failure
1289                 self._fm.set_ec_failure()
1290
1291         self.logger.debug("Exiting the task processing loop ... ")
1292         
1293         self._runner.sync()
1294         self._runner.destroy()
1295
1296     def _execute(self, task):
1297         """ Executes a single task. 
1298
1299             :param task: Object containing the callback to execute
1300             :type task: Task
1301
1302         """
1303         try:
1304             # Invoke callback
1305             task.result = task.callback()
1306             task.status = TaskStatus.DONE
1307         except:
1308             import traceback
1309             err = traceback.format_exc()
1310             task.result = err
1311             task.status = TaskStatus.ERROR
1312             
1313             self.logger.error("Error occurred while executing task: %s" % err)
1314
1315     def _notify(self):
1316         """ Awakes the processing thread if it is blocked waiting 
1317         for new tasks to arrive
1318         
1319         """
1320         self._cond.acquire()
1321         self._cond.notify()
1322         self._cond.release()
1323
1324     def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
1325             **kwargs):
1326         """ Automates experiment description using a NetGraph instance.
1327         """
1328         self._netgraph = NetGraph(**kwargs)
1329
1330         if add_node_callback:
1331             ### Add resources to the EC
1332             for nid in self.netgraph.nodes():
1333                 add_node_callback(self, nid)
1334
1335         if add_edge_callback:
1336             #### Add connections between resources
1337             for nid1, nid2 in self.netgraph.edges():
1338                 add_edge_callback(self, nid1, nid2)
1339