register_resource(autoDeploy=True)
[nepi.git] / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from six import next
20
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29 from nepi.util.netgraph import NetGraph, TopologyType 
30
31 # TODO: use multiprocessing instead of threading
32 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
33
34 import functools
35 import logging
36 import os
37 import sys
38 import tempfile
39 import time
40 import threading
41 import weakref
42
43 class FailureLevel(object):
44     """ Possible failure states for the experiment """
45     OK = 1
46     RM_FAILURE = 2
47     EC_FAILURE = 3
48
49 class FailureManager(object):
50     """ The FailureManager is responsible for handling errors
51     and deciding whether an experiment should be aborted or not
52     """
53
54     def __init__(self):
55         self._ec = None
56         self._failure_level = FailureLevel.OK
57         self._abort = False
58
59     def set_ec(self, ec):
60         self._ec = weakref.ref(ec)
61
62     @property
63     def ec(self):
64         """ Returns the ExperimentController associated to this FailureManager 
65         """
66         return self._ec()
67
68     @property
69     def abort(self):
70         return self._abort
71
72     def eval_failure(self, guid):
73         """ Implements failure policy and sets the abort state of the
74         experiment based on the failure state and criticality of
75         the RM
76
77         :param guid: Guid of the RM upon which the failure of the experiment
78             is evaluated
79         :type guid: int
80
81         """
82         if self._failure_level == FailureLevel.OK:
83             rm = self.ec.get_resource(guid)
84             state = rm.state
85             critical = rm.get("critical")
86
87             if state == ResourceState.FAILED and critical:
88                 self._failure_level = FailureLevel.RM_FAILURE
89                 self._abort = True
90                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
91                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
92
93     def set_ec_failure(self):
94         self._failure_level = FailureLevel.EC_FAILURE
95
96 class ECState(object):
97     """ Possible states of the ExperimentController
98    
99     """
100     RUNNING = 1
101     FAILED = 2
102     RELEASED = 3
103     TERMINATED = 4
104
105 # historical note: this class used to be in util/guid.py but is used only here
106 # FIXME: This class is not thread-safe. Should it be made thread-safe?
107 class GuidGenerator(object):
108     def __init__(self):
109         self._last_guid = 0
110
111     # historical note: this used to be called `next`
112     # which confused 2to3 - and me - while it has
113     # nothing to do at all with the iteration protocol
114     def generate(self, guid = None):
115         if guid == None:
116             guid = self._last_guid + 1
117
118         self._last_guid = self._last_guid if guid <= self._last_guid else guid
119
120         return guid
121
122 class ExperimentController(object):
123     """
124     .. note::
125
126     An experiment, or scenario, is defined by a concrete set of resources,
127     and the behavior, configuration and interconnection of those resources. 
128     The Experiment Description (ED) is a detailed representation of a
129     single experiment. It contains all the necessary information to 
130     allow repeating the experiment. NEPI allows to describe
131     experiments by registering components (resources), configuring them
132     and interconnecting them.
133     
134     A same experiment (scenario) can be executed many times, generating 
135     different results. We call an experiment execution (instance) a 'run'.
136
137     The ExperimentController (EC), is the entity responsible of
138     managing an experiment run. The same scenario can be 
139     recreated (and re-run) by instantiating an EC and recreating 
140     the same experiment description. 
141
142     An experiment is represented as a graph of interconnected
143     resources. A resource is a generic concept in the sense that any
144     component taking part of an experiment, whether physical of
145     virtual, is considered a resource. A resources could be a host, 
146     a virtual machine, an application, a simulator, a IP address.
147
148     A ResourceManager (RM), is the entity responsible for managing a 
149     single resource. ResourceManagers are specific to a resource
150     type (i.e. An RM to control a Linux application will not be
151     the same as the RM used to control a ns-3 simulation).
152     To support a new type of resource, a new RM must be implemented. 
153     NEPI already provides a variety of RMs to control basic resources, 
154     and new can be extended from the existing ones.
155
156     Through the EC interface the user can create ResourceManagers (RMs),
157     configure them and interconnect them, to describe an experiment.
158     Describing an experiment through the EC does not run the experiment.
159     Only when the 'deploy()' method is invoked on the EC, the EC will take 
160     actions to transform the 'described' experiment into a 'running' experiment.
161
162     While the experiment is running, it is possible to continue to
163     create/configure/connect RMs, and to deploy them to involve new
164     resources in the experiment (this is known as 'interactive' deployment).
165     
166     An experiments in NEPI is identified by a string id, 
167     which is either given by the user, or automatically generated by NEPI.  
168     The purpose of this identifier is to separate files and results that 
169     belong to different experiment scenarios. 
170     However, since a same 'experiment' can be run many times, the experiment
171     id is not enough to identify an experiment instance (run).
172     For this reason, the ExperimentController has two identifier, the 
173     exp_id, which can be re-used in different ExperimentController,
174     and the run_id, which is unique to one ExperimentController instance, and
175     is automatically generated by NEPI.
176    
177     """
178
179     @classmethod
180     def load(cls, filepath, format = SFormats.XML):
181         serializer = ECSerializer()
182         ec = serializer.load(filepath)
183         return ec
184
185     def __init__(self, exp_id = None, local_dir = None, persist = False,
186             fm = None, add_node_callback = None, add_edge_callback = None, 
187             **kwargs):
188         """ ExperimentController entity to model an execute a network 
189         experiment.
190         
191         :param exp_id: Human readable name to identify the experiment
192         :type exp_id: str
193
194         :param local_dir: Path to local directory where to store experiment
195             related files
196         :type local_dir: str
197
198         :param persist: Save an XML description of the experiment after 
199         completion at local_dir
200         :type persist: bool
201
202         :param fm: FailureManager object. If None is given, the default 
203             FailureManager class will be used
204         :type fm: FailureManager
205
206         :param add_node_callback: Callback to invoke for node instantiation
207         when automatic topology creation mode is used 
208         :type add_node_callback: function
209
210         :param add_edge_callback: Callback to invoke for edge instantiation 
211         when automatic topology creation mode is used 
212         :type add_edge_callback: function
213
214         """
215         super(ExperimentController, self).__init__()
216
217         # Logging
218         self._logger = logging.getLogger("ExperimentController")
219
220         # Run identifier. It identifies a concrete execution instance (run) 
221         # of an experiment.
222         # Since a same experiment (same configuration) can be executed many 
223         # times, this run_id permits to separate result files generated on 
224         # different experiment executions
225         self._run_id = tsformat()
226
227         # Experiment identifier. Usually assigned by the user
228         # Identifies the experiment scenario (i.e. configuration, 
229         # resources used, etc)
230         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
231
232         # Local path where to store experiment related files (results, etc)
233         if not local_dir:
234             local_dir = tempfile.gettempdir() # /tmp
235
236         self._local_dir = local_dir
237         self._exp_dir = os.path.join(local_dir, self.exp_id)
238         self._run_dir = os.path.join(self.exp_dir, self.run_id)
239
240         # If True persist the experiment controller in XML format, after completion
241         self._persist = persist
242
243         # generator of globally unique ids
244         self._guid_generator = GuidGenerator()
245         
246         # Resource managers
247         self._resources = dict()
248
249         # Scheduler. It a queue that holds tasks scheduled for
250         # execution, and yields the next task to be executed 
251         # ordered by execution and arrival time
252         self._scheduler = HeapScheduler()
253
254         # Tasks
255         self._tasks = dict()
256
257         # RM groups (for deployment) 
258         self._groups = dict()
259
260         # generator of globally unique id for groups
261         self._group_id_generator = GuidGenerator()
262
263         # Flag to stop processing thread
264         self._stop = False
265     
266         # Entity in charge of managing system failures
267         if not fm:
268             self._fm = FailureManager()
269         self._fm.set_ec(self)
270
271         # EC state
272         self._state = ECState.RUNNING
273
274         # Automatically construct experiment description 
275         self._netgraph = None
276         if add_node_callback or add_edge_callback or kwargs.get("topology"):
277             self._build_from_netgraph(add_node_callback, add_edge_callback, 
278                     **kwargs)
279
280         # The runner is a pool of threads used to parallelize 
281         # execution of tasks
282         self._nthreads = 20
283         self._runner = None
284
285         # Event processing thread
286         self._cond = threading.Condition()
287         self._thread = threading.Thread(target = self._process)
288         self._thread.setDaemon(True)
289         self._thread.start()
290         
291     @property
292     def logger(self):
293         """ Returns the logger instance of the Experiment Controller
294
295         """
296         return self._logger
297
298     @property
299     def fm(self):
300         """ Returns the failure manager
301
302         """
303
304         return self._fm
305
306     @property
307     def failure_level(self):
308         """ Returns the level of FAILURE of th experiment
309
310         """
311
312         return self._fm._failure_level
313
314     @property
315     def ecstate(self):
316         """ Returns the state of the Experiment Controller
317
318         """
319         return self._state
320
321     @property
322     def exp_id(self):
323         """ Returns the experiment id assigned by the user
324
325         """
326         return self._exp_id
327
328     @property
329     def run_id(self):
330         """ Returns the experiment instance (run) identifier (automatically 
331         generated)
332
333         """
334         return self._run_id
335
336     @property
337     def nthreads(self):
338         """ Returns the number of processing nthreads used
339
340         """
341         return self._nthreads
342
343     @property
344     def local_dir(self):
345         """ Root local directory for experiment files
346
347         """
348         return self._local_dir
349
350     @property
351     def exp_dir(self):
352         """ Local directory to store results and other files related to the 
353         experiment.
354
355         """
356         return self._exp_dir
357
358     @property
359     def run_dir(self):
360         """ Local directory to store results and other files related to the 
361         experiment run.
362
363         """
364         return self._run_dir
365
366     @property
367     def persist(self):
368         """ If True, persists the ExperimentController to XML format upon 
369         experiment completion
370
371         """
372         return self._persist
373
374     @property
375     def netgraph(self):
376         """ Return NetGraph instance if experiment description was automatically 
377         generated
378
379         """
380         return self._netgraph
381
382     @property
383     def abort(self):
384         """ Returns True if the experiment has failed and should be interrupted,
385         False otherwise.
386
387         """
388         return self._fm.abort
389
390     def inform_failure(self, guid):
391         """ Reports a failure in a RM to the EC for evaluation
392
393             :param guid: Resource id
394             :type guid: int
395
396         """
397
398         return self._fm.eval_failure(guid)
399
400     def wait_finished(self, guids):
401         """ Blocking method that waits until all RMs in the 'guids' list 
402         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
403         RELEASED ), or until a failure in the experiment occurs 
404         (i.e. abort == True) 
405         
406             :param guids: List of guids
407             :type guids: list
408
409         """
410
411         def quit():
412             return self.abort
413
414         return self.wait(guids, state = ResourceState.STOPPED, 
415                 quit = quit)
416
417     def wait_started(self, guids):
418         """ Blocking method that waits until all RMs in the 'guids' list 
419         have reached a state >= STARTED, or until a failure in the 
420         experiment occurs (i.e. abort == True) 
421         
422             :param guids: List of guids
423             :type guids: list
424
425         """
426
427         def quit():
428             return self.abort
429
430         return self.wait(guids, state = ResourceState.STARTED, 
431                 quit = quit)
432
433     def wait_released(self, guids):
434         """ Blocking method that waits until all RMs in the 'guids' list 
435         have reached a state == RELEASED, or until the EC fails 
436         
437             :param guids: List of guids
438             :type guids: list
439
440         """
441
442         def quit():
443             return self._state == ECState.FAILED
444
445         return self.wait(guids, state = ResourceState.RELEASED, 
446                 quit = quit)
447
448     def wait_deployed(self, guids):
449         """ Blocking method that waits until all RMs in the 'guids' list 
450         have reached a state >= READY, or until a failure in the 
451         experiment occurs (i.e. abort == True) 
452         
453             :param guids: List of guids
454             :type guids: list
455
456         """
457
458         def quit():
459             return self.abort
460
461         return self.wait(guids, state = ResourceState.READY, 
462                 quit = quit)
463
464     def wait(self, guids, state, quit):
465         """ Blocking method that waits until all RMs in the 'guids' list 
466         have reached a state >= 'state', or until the 'quit' callback
467         yields True
468            
469             :param guids: List of guids
470             :type guids: list
471         
472         """
473         if isinstance(guids, int):
474             guids = [guids]
475
476         # Make a copy to avoid modifying the original guids list
477         guids = list(guids)
478
479         while True:
480             # If there are no more guids to wait for
481             # or the quit function returns True, exit the loop
482             if len(guids) == 0 or quit():
483                 break
484
485             # If a guid reached one of the target states, remove it from list
486             guid = guids.pop()
487             rm = self.get_resource(guid)
488             rstate = rm.state
489             
490             if rstate >= state:
491                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
492                     rm.get_rtype(), guid, rstate, state))
493             else:
494                 # Debug...
495                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
496                     guid, rstate, state))
497
498                 guids.append(guid)
499
500                 time.sleep(0.5)
501
502     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
503         plotter = ECPlotter()
504         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
505                 show = show)
506         return fpath
507
508     def serialize(self, format = SFormats.XML):
509         serializer = ECSerializer()
510         sec = serializer.load(self, format = format)
511         return sec
512
513     def save(self, dirpath = None, format = SFormats.XML):
514         if dirpath == None:
515             dirpath = self.run_dir
516
517         try:
518             os.makedirs(dirpath)
519         except OSError:
520             pass
521
522         serializer = ECSerializer()
523         path = serializer.save(self, dirpath, format = format)
524         return path
525
526     def get_task(self, tid):
527         """ Returns a task by its id
528
529             :param tid: Id of the task
530             :type tid: int
531             
532             :rtype: Task
533             
534         """
535         return self._tasks.get(tid)
536
537     def get_resource(self, guid):
538         """ Returns a registered ResourceManager by its guid
539
540             :param guid: Id of the resource
541             :type guid: int
542             
543             :rtype: ResourceManager
544             
545         """
546         rm = self._resources.get(guid)
547         return rm
548
549     def get_resources_by_type(self, rtype):
550         """ Returns the ResourceManager objects of type rtype
551
552             :param rtype: Resource type
553             :type rtype: string
554             
555             :rtype: list of ResourceManagers
556             
557         """
558         rms = []
559         for guid, rm in self._resources.items():
560             if rm.get_rtype() == rtype: 
561                 rms.append(rm)
562         return rms
563
564     def remove_resource(self, guid):
565         del self._resources[guid]
566
567     @property
568     def resources(self):
569         """ Returns the guids of all ResourceManagers 
570
571             :return: Set of all RM guids
572             :rtype: list
573
574         """
575         keys = list(self._resources.keys())
576
577         return keys
578
579     def filter_resources(self, rtype):
580         """ Returns the guids of all ResourceManagers of type rtype
581
582             :param rtype: Resource type
583             :type rtype: string
584             
585             :rtype: list of guids
586             
587         """
588         rms = []
589         for guid, rm in self._resources.items():
590             if rm.get_rtype() == rtype: 
591                 rms.append(rm.guid)
592         return rms
593
594     def register_resource(self, rtype, guid = None, **keywords):
595         """ Registers a new ResourceManager of type 'rtype' in the experiment
596         
597         This method will assign a new 'guid' for the RM, if no guid
598         is specified.
599
600             :param rtype: Type of the RM
601             :type rtype: str
602
603             :return: Guid of the RM
604             :rtype: int
605             
606         """
607         # Get next available guid
608         # xxx_next_hiccup
609         guid = self._guid_generator.generate(guid)
610         
611         # Instantiate RM
612         rm = ResourceFactory.create(rtype, self, guid)
613
614         # Store RM
615         self._resources[guid] = rm
616
617         ### so we can do something like
618         # node = ec.register_resource("linux::Node",
619         #                             username = user,
620         #                             hostname = host,
621         #                             autoDeploy = True)
622         ### instead of
623         # node = ec.register_resource("linux::Node")
624         # ec.set(node, "username", user)
625         # ec.set(node, "hostname", host)
626         # ec.deploy(node)
627
628         auto_deploy = 'autoDeploy' in keywords and keywords['autoDeploy']
629
630         # now we can do all the calls to 'set'
631         for name, value in keywords.items():
632             # autoDeploy is handled locally and not propagated to 'set'
633             if name != 'autoDeploy':
634                 self.set(guid, name, value)
635
636         if auto_deploy:
637             self.deploy(guid)
638
639         return guid
640
641     def get_attributes(self, guid):
642         """ Returns all the attributes of the RM with guid 'guid'
643
644             :param guid: Guid of the RM
645             :type guid: int
646
647             :return: List of attributes
648             :rtype: list
649
650         """
651         rm = self.get_resource(guid)
652         return rm.get_attributes()
653
654     def get_attribute(self, guid, name):
655         """ Returns the attribute 'name' of the RM with guid 'guid'
656
657             :param guid: Guid of the RM
658             :type guid: int
659
660             :param name: Name of the attribute
661             :type name: str
662
663             :return: The attribute with name 'name'
664             :rtype: Attribute
665
666         """
667         rm = self.get_resource(guid)
668         return rm.get_attribute(name)
669
670     def register_connection(self, guid1, guid2):
671         """ Registers a connection between a RM with guid 'guid1'
672         and another RM with guid 'guid2'. 
673     
674         The order of the in which the two guids are provided is not
675         important, since the connection relationship is symmetric.
676
677             :param guid1: First guid to connect
678             :type guid1: ResourceManager
679
680             :param guid2: Second guid to connect
681             :type guid: ResourceManager
682
683         """
684         rm1 = self.get_resource(guid1)
685         rm2 = self.get_resource(guid2)
686
687         rm1.register_connection(guid2)
688         rm2.register_connection(guid1)
689
690     def register_condition(self, guids1, action, guids2, state,
691             time = None):
692         """ Registers an action START, STOP or DEPLOY for all RM on list
693         guids1 to occur at time 'time' after all elements in list guids2 
694         have reached state 'state'.
695
696             :param guids1: List of guids of RMs subjected to action
697             :type guids1: list
698
699             :param action: Action to perform (either START, STOP or DEPLOY)
700             :type action: ResourceAction
701
702             :param guids2: List of guids of RMs to we waited for
703             :type guids2: list
704
705             :param state: State to wait for on RMs of list guids2 (STARTED,
706                 STOPPED, etc)
707             :type state: ResourceState
708
709             :param time: Time to wait after guids2 has reached status 
710             :type time: string
711
712         """
713         if isinstance(guids1, int):
714             guids1 = [guids1]
715         if isinstance(guids2, int):
716             guids2 = [guids2]
717
718         for guid1 in guids1:
719             rm = self.get_resource(guid1)
720             rm.register_condition(action, guids2, state, time)
721
722     def enable_trace(self, guid, name):
723         """ Enables a trace to be collected during the experiment run
724
725             :param name: Name of the trace
726             :type name: str
727
728         """
729         rm = self.get_resource(guid)
730         rm.enable_trace(name)
731
732     def trace_enabled(self, guid, name):
733         """ Returns True if the trace of name 'name' is enabled
734
735             :param name: Name of the trace
736             :type name: str
737
738         """
739         rm = self.get_resource(guid)
740         return rm.trace_enabled(name)
741
742     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
743         """ Returns information on a collected trace, the trace stream or 
744         blocks (chunks) of the trace stream
745
746             :param name: Name of the trace
747             :type name: str
748
749             :param attr: Can be one of:
750                          - TraceAttr.ALL (complete trace content), 
751                          - TraceAttr.STREAM (block in bytes to read starting 
752                                 at offset),
753                          - TraceAttr.PATH (full path to the trace file),
754                          - TraceAttr.SIZE (size of trace file). 
755             :type attr: str
756
757             :param block: Number of bytes to retrieve from trace, when attr is 
758                 TraceAttr.STREAM 
759             :type name: int
760
761             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
762             :type name: int
763
764             :rtype: str
765
766         """
767         rm = self.get_resource(guid)
768         return rm.trace(name, attr, block, offset)
769
770     def get_traces(self, guid):
771         """ Returns the list of the trace names of the RM with guid 'guid'
772
773             :param guid: Guid of the RM
774             :type guid: int
775
776             :return: List of trace names
777             :rtype: list
778
779         """
780         rm = self.get_resource(guid)
781         return rm.get_traces()
782
783
784     def discover(self, guid):
785         """ Discovers an available resource matching the criteria defined
786         by the RM with guid 'guid', and associates that resource to the RM
787
788         Not all RM types require (or are capable of) performing resource 
789         discovery. For the RM types which are not capable of doing so, 
790         invoking this method does not have any consequences. 
791
792             :param guid: Guid of the RM
793             :type guid: int
794
795         """
796         rm = self.get_resource(guid)
797         return rm.discover()
798
799     def provision(self, guid):
800         """ Provisions the resource associated to the RM with guid 'guid'.
801
802         Provisioning means making a resource 'accessible' to the user. 
803         Not all RM types require (or are capable of) performing resource 
804         provisioning. For the RM types which are not capable of doing so, 
805         invoking this method does not have any consequences. 
806
807             :param guid: Guid of the RM
808             :type guid: int
809
810         """
811         rm = self.get_resource(guid)
812         return rm.provision()
813
814     def get(self, guid, name):
815         """ Returns the value of the attribute with name 'name' on the
816         RM with guid 'guid'
817
818             :param guid: Guid of the RM
819             :type guid: int
820
821             :param name: Name of the attribute 
822             :type name: str
823
824             :return: The value of the attribute with name 'name'
825
826         """
827         rm = self.get_resource(guid)
828         return rm.get(name)
829
830     def set(self, guid, name, value):
831         """ Modifies the value of the attribute with name 'name' on the 
832         RM with guid 'guid'.
833
834             :param guid: Guid of the RM
835             :type guid: int
836
837             :param name: Name of the attribute
838             :type name: str
839
840             :param value: Value of the attribute
841
842         """
843         rm = self.get_resource(guid)
844         rm.set(name, value)
845
846     def get_global(self, rtype, name):
847         """ Returns the value of the global attribute with name 'name' on the
848         RMs of rtype 'rtype'.
849
850             :param guid: Guid of the RM
851             :type guid: int
852
853             :param name: Name of the attribute 
854             :type name: str
855
856             :return: The value of the attribute with name 'name'
857
858         """
859         rclass = ResourceFactory.get_resource_type(rtype)
860         return rclass.get_global(name)
861
862     def set_global(self, rtype, name, value):
863         """ Modifies the value of the global attribute with name 'name' on the 
864         RMs of with rtype 'rtype'.
865
866             :param guid: Guid of the RM
867             :type guid: int
868
869             :param name: Name of the attribute
870             :type name: str
871
872             :param value: Value of the attribute
873
874         """
875         rclass = ResourceFactory.get_resource_type(rtype)
876         return rclass.set_global(name, value)
877
878     def state(self, guid, hr = False):
879         """ Returns the state of a resource
880
881             :param guid: Resource guid
882             :type guid: integer
883
884             :param hr: Human readable. Forces return of a 
885                 status string instead of a number 
886             :type hr: boolean
887
888         """
889         rm = self.get_resource(guid)
890         state = rm.state
891
892         if hr:
893             return ResourceState2str.get(state)
894
895         return state
896
897     def stop(self, guid):
898         """ Stops the RM with guid 'guid'
899
900         Stopping a RM means that the resource it controls will
901         no longer take part of the experiment.
902
903             :param guid: Guid of the RM
904             :type guid: int
905
906         """
907         rm = self.get_resource(guid)
908         return rm.stop()
909
910     def start(self, guid):
911         """ Starts the RM with guid 'guid'
912
913         Starting a RM means that the resource it controls will
914         begin taking part of the experiment.
915
916             :param guid: Guid of the RM
917             :type guid: int
918
919         """
920         rm = self.get_resource(guid)
921         return rm.start()
922
923     def get_start_time(self, guid):
924         """ Returns the start time of the RM as a timestamp """
925         rm = self.get_resource(guid)
926         return rm.start_time
927
928     def get_stop_time(self, guid):
929         """ Returns the stop time of the RM as a timestamp """
930         rm = self.get_resource(guid)
931         return rm.stop_time
932
933     def get_discover_time(self, guid):
934         """ Returns the discover time of the RM as a timestamp """
935         rm = self.get_resource(guid)
936         return rm.discover_time
937
938     def get_provision_time(self, guid):
939         """ Returns the provision time of the RM as a timestamp """
940         rm = self.get_resource(guid)
941         return rm.provision_time
942
943     def get_ready_time(self, guid):
944         """ Returns the deployment time of the RM as a timestamp """
945         rm = self.get_resource(guid)
946         return rm.ready_time
947
948     def get_release_time(self, guid):
949         """ Returns the release time of the RM as a timestamp """
950         rm = self.get_resource(guid)
951         return rm.release_time
952
953     def get_failed_time(self, guid):
954         """ Returns the time failure occured for the RM as a timestamp """
955         rm = self.get_resource(guid)
956         return rm.failed_time
957
958     def set_with_conditions(self, name, value, guids1, guids2, state,
959             time = None):
960         """ Modifies the value of attribute with name 'name' on all RMs 
961         on the guids1 list when time 'time' has elapsed since all 
962         elements in guids2 list have reached state 'state'.
963
964             :param name: Name of attribute to set in RM
965             :type name: string
966
967             :param value: Value of attribute to set in RM
968             :type name: string
969
970             :param guids1: List of guids of RMs subjected to action
971             :type guids1: list
972
973             :param action: Action to register (either START or STOP)
974             :type action: ResourceAction
975
976             :param guids2: List of guids of RMs to we waited for
977             :type guids2: list
978
979             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
980             :type state: ResourceState
981
982             :param time: Time to wait after guids2 has reached status 
983             :type time: string
984
985         """
986         if isinstance(guids1, int):
987             guids1 = [guids1]
988         if isinstance(guids2, int):
989             guids2 = [guids2]
990
991         for guid1 in guids1:
992             rm = self.get_resource(guid)
993             rm.set_with_conditions(name, value, guids2, state, time)
994
995     def deploy(self, guids = None, wait_all_ready = True, group = None):
996         """ Deploys all ResourceManagers in the guids list. 
997         
998         If the argument 'guids' is not given, all RMs with state NEW
999         are deployed.
1000
1001             :param guids: List of guids of RMs to deploy
1002             :type guids: list
1003
1004             :param wait_all_ready: Wait until all RMs are ready in
1005                 order to start the RMs
1006             :type guid: int
1007
1008             :param group: Id of deployment group in which to deploy RMs
1009             :type group: int
1010
1011         """
1012         self.logger.debug(" ------- DEPLOY START ------ ")
1013
1014         if not guids:
1015             # If no guids list was passed, all 'NEW' RMs will be deployed
1016             guids = []
1017             for guid, rm in self._resources.items():
1018                 if rm.state == ResourceState.NEW:
1019                     guids.append(guid)
1020                 
1021         if isinstance(guids, int):
1022             guids = [guids]
1023
1024         # Create deployment group
1025         # New guids can be added to a same deployment group later on
1026         new_group = False
1027         if not group:
1028             new_group = True
1029             # xxx_next_hiccup
1030             group = self._group_id_generator.generate()
1031
1032         if group not in self._groups:
1033             self._groups[group] = []
1034
1035         self._groups[group].extend(guids)
1036
1037         def wait_all_and_start(group):
1038             # Function that checks if all resources are READY
1039             # before scheduling a start_with_conditions for each RM
1040             reschedule = False
1041             
1042             # Get all guids in group
1043             guids = self._groups[group]
1044
1045             for guid in guids:
1046                 if self.state(guid) < ResourceState.READY:
1047                     reschedule = True
1048                     break
1049
1050             if reschedule:
1051                 callback = functools.partial(wait_all_and_start, group)
1052                 self.schedule("1s", callback)
1053             else:
1054                 # If all resources are ready, we schedule the start
1055                 for guid in guids:
1056                     rm = self.get_resource(guid)
1057                     self.schedule("0s", rm.start_with_conditions)
1058
1059                     if rm.conditions.get(ResourceAction.STOP):
1060                         # Only if the RM has STOP conditions we
1061                         # schedule a stop. Otherwise the RM will stop immediately
1062                         self.schedule("0s", rm.stop_with_conditions)
1063
1064         if wait_all_ready and new_group:
1065             # Schedule a function to check that all resources are
1066             # READY, and only then schedule the start.
1067             # This aims at reducing the number of tasks looping in the 
1068             # scheduler. 
1069             # Instead of having many start tasks, we will have only one for 
1070             # the whole group.
1071             callback = functools.partial(wait_all_and_start, group)
1072             self.schedule("0s", callback)
1073
1074         for guid in guids:
1075             rm = self.get_resource(guid)
1076             rm.deployment_group = group
1077             self.schedule("0s", rm.deploy_with_conditions)
1078
1079             if not wait_all_ready:
1080                 self.schedule("0s", rm.start_with_conditions)
1081
1082                 if rm.conditions.get(ResourceAction.STOP):
1083                     # Only if the RM has STOP conditions we
1084                     # schedule a stop. Otherwise the RM will stop immediately
1085                     self.schedule("0s", rm.stop_with_conditions)
1086
1087     def release(self, guids = None):
1088         """ Releases all ResourceManagers in the guids list.
1089
1090         If the argument 'guids' is not given, all RMs registered
1091         in the experiment are released.
1092
1093             :param guids: List of RM guids
1094             :type guids: list
1095
1096         """
1097         if self._state == ECState.RELEASED:
1098             return 
1099
1100         if isinstance(guids, int):
1101             guids = [guids]
1102
1103         if not guids:
1104             guids = self.resources
1105
1106         for guid in guids:
1107             rm = self.get_resource(guid)
1108             self.schedule("0s", rm.release)
1109
1110         self.wait_released(guids)
1111
1112         if self.persist:
1113             self.save()
1114
1115         for guid in guids:
1116             if self.get(guid, "hardRelease"):
1117                 self.remove_resource(guid)\
1118
1119         # Mark the EC state as RELEASED
1120         self._state = ECState.RELEASED
1121         
1122     def shutdown(self):
1123         """ Releases all resources and stops the ExperimentController
1124
1125         """
1126         # If there was a major failure we can't exit gracefully
1127         if self._state == ECState.FAILED:
1128             raise RuntimeError("EC failure. Can not exit gracefully")
1129
1130         # Remove all pending tasks from the scheduler queue
1131         for tid in list(self._scheduler.pending):
1132             self._scheduler.remove(tid)
1133
1134         # Remove pending tasks from the workers queue
1135         self._runner.empty()
1136
1137         self.release()
1138
1139         # Mark the EC state as TERMINATED
1140         self._state = ECState.TERMINATED
1141
1142         # Stop processing thread
1143         self._stop = True
1144
1145         # Notify condition to wake up the processing thread
1146         self._notify()
1147         
1148         if self._thread.is_alive():
1149            self._thread.join()
1150
1151     def schedule(self, date, callback, track = False):
1152         """ Schedules a callback to be executed at time 'date'.
1153
1154             :param date: string containing execution time for the task.
1155                     It can be expressed as an absolute time, using
1156                     timestamp format, or as a relative time matching
1157                     ^\d+.\d+(h|m|s|ms|us)$
1158
1159             :param callback: code to be executed for the task. Must be a
1160                         Python function, and receives args and kwargs
1161                         as arguments.
1162
1163             :param track: if set to True, the task will be retrievable with
1164                     the get_task() method
1165
1166             :return : The Id of the task
1167             :rtype: int
1168             
1169         """
1170         timestamp = stabsformat(date)
1171         task = Task(timestamp, callback)
1172         task = self._scheduler.schedule(task)
1173
1174         if track:
1175             self._tasks[task.id] = task
1176
1177         # Notify condition to wake up the processing thread
1178         self._notify()
1179
1180         return task.id
1181      
1182     def _process(self):
1183         """ Process scheduled tasks.
1184
1185         .. note::
1186         
1187         Tasks are scheduled by invoking the schedule method with a target 
1188         callback and an execution time. 
1189         The schedule method creates a new Task object with that callback 
1190         and execution time, and pushes it into the '_scheduler' queue. 
1191         The execution time and the order of arrival of tasks are used 
1192         to order the tasks in the queue.
1193
1194         The _process method is executed in an independent thread held by 
1195         the ExperimentController for as long as the experiment is running.
1196         This method takes tasks from the '_scheduler' queue in a loop 
1197         and processes them in parallel using multithreading. 
1198         The environmental variable NEPI_NTHREADS can be used to control
1199         the number of threads used to process tasks. The default value is 
1200         50.
1201
1202         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1203         This object keeps a pool of threads (workers), and a queue of tasks
1204         scheduled for 'immediate' execution. 
1205         
1206         On each iteration, the '_process' loop will take the next task that 
1207         is scheduled for 'future' execution from the '_scheduler' queue, 
1208         and if the execution time of that task is >= to the current time, 
1209         it will push that task into the PR for 'immediate execution'. 
1210         As soon as a worker is free, the PR will assign the next task to
1211         that worker.
1212
1213         Upon receiving a task to execute, each PR worker (thread) will 
1214         invoke the  _execute method of the EC, passing the task as 
1215         argument.         
1216         The _execute method will then invoke task.callback inside a 
1217         try/except block. If an exception is raised by the tasks.callback, 
1218         it will be trapped by the try block, logged to standard error 
1219         (usually the console), and the task will be marked as failed.
1220
1221         """
1222
1223         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1224         self._runner = ParallelRun(maxthreads = self.nthreads)
1225         self._runner.start()
1226
1227         while not self._stop:
1228             try:
1229                 self._cond.acquire()
1230
1231                 task = next(self._scheduler)
1232                 
1233                 if not task:
1234                     # No task to execute. Wait for a new task to be scheduled.
1235                     self._cond.wait()
1236                 else:
1237                     # The task timestamp is in the future. Wait for timeout 
1238                     # or until another task is scheduled.
1239                     now = tnow()
1240                     if now < task.timestamp:
1241                         # Calculate timeout in seconds
1242                         timeout = tdiffsec(task.timestamp, now)
1243
1244                         # Re-schedule task with the same timestamp
1245                         self._scheduler.schedule(task)
1246                         
1247                         task = None
1248
1249                         # Wait timeout or until a new task awakes the condition
1250                         self._cond.wait(timeout)
1251                
1252                 self._cond.release()
1253
1254                 if task:
1255                     # Process tasks in parallel
1256                     self._runner.put(self._execute, task)
1257             except: 
1258                 import traceback
1259                 err = traceback.format_exc()
1260                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1261
1262                 # Set the EC to FAILED state 
1263                 self._state = ECState.FAILED
1264             
1265                 # Set the FailureManager failure level to EC failure
1266                 self._fm.set_ec_failure()
1267
1268         self.logger.debug("Exiting the task processing loop ... ")
1269         
1270         self._runner.sync()
1271         self._runner.destroy()
1272
1273     def _execute(self, task):
1274         """ Executes a single task. 
1275
1276             :param task: Object containing the callback to execute
1277             :type task: Task
1278
1279         """
1280         try:
1281             # Invoke callback
1282             task.result = task.callback()
1283             task.status = TaskStatus.DONE
1284         except:
1285             import traceback
1286             err = traceback.format_exc()
1287             task.result = err
1288             task.status = TaskStatus.ERROR
1289             
1290             self.logger.error("Error occurred while executing task: %s" % err)
1291
1292     def _notify(self):
1293         """ Awakes the processing thread if it is blocked waiting 
1294         for new tasks to arrive
1295         
1296         """
1297         self._cond.acquire()
1298         self._cond.notify()
1299         self._cond.release()
1300
1301     def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
1302             **kwargs):
1303         """ Automates experiment description using a NetGraph instance.
1304         """
1305         self._netgraph = NetGraph(**kwargs)
1306
1307         if add_node_callback:
1308             ### Add resources to the EC
1309             for nid in self.netgraph.nodes():
1310                 add_node_callback(self, nid)
1311
1312         if add_edge_callback:
1313             #### Add connections between resources
1314             for nid1, nid2 in self.netgraph.edges():
1315                 add_edge_callback(self, nid1, nid2)
1316