util/guid.py merged into ec.py that is the only one using it
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from six import next
20
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29 from nepi.util.netgraph import NetGraph, TopologyType 
30
31 # TODO: use multiprocessing instead of threading
32 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
33
34 import functools
35 import logging
36 import os
37 import sys
38 import tempfile
39 import time
40 import threading
41 import weakref
42
43 class FailureLevel(object):
44     """ Possible failure states for the experiment """
45     OK = 1
46     RM_FAILURE = 2
47     EC_FAILURE = 3
48
49 class FailureManager(object):
50     """ The FailureManager is responsible for handling errors
51     and deciding whether an experiment should be aborted or not
52     """
53
54     def __init__(self):
55         self._ec = None
56         self._failure_level = FailureLevel.OK
57         self._abort = False
58
59     def set_ec(self, ec):
60         self._ec = weakref.ref(ec)
61
62     @property
63     def ec(self):
64         """ Returns the ExperimentController associated to this FailureManager 
65         """
66         return self._ec()
67
68     @property
69     def abort(self):
70         return self._abort
71
72     def eval_failure(self, guid):
73         """ Implements failure policy and sets the abort state of the
74         experiment based on the failure state and criticality of
75         the RM
76
77         :param guid: Guid of the RM upon which the failure of the experiment
78             is evaluated
79         :type guid: int
80
81         """
82         if self._failure_level == FailureLevel.OK:
83             rm = self.ec.get_resource(guid)
84             state = rm.state
85             critical = rm.get("critical")
86
87             if state == ResourceState.FAILED and critical:
88                 self._failure_level = FailureLevel.RM_FAILURE
89                 self._abort = True
90                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
91                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
92
93     def set_ec_failure(self):
94         self._failure_level = FailureLevel.EC_FAILURE
95
96 class ECState(object):
97     """ Possible states of the ExperimentController
98    
99     """
100     RUNNING = 1
101     FAILED = 2
102     RELEASED = 3
103     TERMINATED = 4
104
105 # historical note: this class used to be in util/guid.py but is used only here
106 # FIXME: This class is not thread-safe. Should it be made thread-safe?
107 class GuidGenerator(object):
108     def __init__(self):
109         self._last_guid = 0
110
111     # historical note: this used to be called `next`
112     # which confused 2to3 - and me - while it has
113     # nothing to do at all with the iteration protocol
114     def generate(self, guid = None):
115         if guid == None:
116             guid = self._last_guid + 1
117
118         self._last_guid = self._last_guid if guid <= self._last_guid else guid
119
120         return guid
121
122 class ExperimentController(object):
123     """
124     .. note::
125
126     An experiment, or scenario, is defined by a concrete set of resources,
127     and the behavior, configuration and interconnection of those resources. 
128     The Experiment Description (ED) is a detailed representation of a
129     single experiment. It contains all the necessary information to 
130     allow repeating the experiment. NEPI allows to describe
131     experiments by registering components (resources), configuring them
132     and interconnecting them.
133     
134     A same experiment (scenario) can be executed many times, generating 
135     different results. We call an experiment execution (instance) a 'run'.
136
137     The ExperimentController (EC), is the entity responsible of
138     managing an experiment run. The same scenario can be 
139     recreated (and re-run) by instantiating an EC and recreating 
140     the same experiment description. 
141
142     An experiment is represented as a graph of interconnected
143     resources. A resource is a generic concept in the sense that any
144     component taking part of an experiment, whether physical of
145     virtual, is considered a resource. A resources could be a host, 
146     a virtual machine, an application, a simulator, a IP address.
147
148     A ResourceManager (RM), is the entity responsible for managing a 
149     single resource. ResourceManagers are specific to a resource
150     type (i.e. An RM to control a Linux application will not be
151     the same as the RM used to control a ns-3 simulation).
152     To support a new type of resource, a new RM must be implemented. 
153     NEPI already provides a variety of RMs to control basic resources, 
154     and new can be extended from the existing ones.
155
156     Through the EC interface the user can create ResourceManagers (RMs),
157     configure them and interconnect them, to describe an experiment.
158     Describing an experiment through the EC does not run the experiment.
159     Only when the 'deploy()' method is invoked on the EC, the EC will take 
160     actions to transform the 'described' experiment into a 'running' experiment.
161
162     While the experiment is running, it is possible to continue to
163     create/configure/connect RMs, and to deploy them to involve new
164     resources in the experiment (this is known as 'interactive' deployment).
165     
166     An experiments in NEPI is identified by a string id, 
167     which is either given by the user, or automatically generated by NEPI.  
168     The purpose of this identifier is to separate files and results that 
169     belong to different experiment scenarios. 
170     However, since a same 'experiment' can be run many times, the experiment
171     id is not enough to identify an experiment instance (run).
172     For this reason, the ExperimentController has two identifier, the 
173     exp_id, which can be re-used in different ExperimentController,
174     and the run_id, which is unique to one ExperimentController instance, and
175     is automatically generated by NEPI.
176    
177     """
178
179     @classmethod
180     def load(cls, filepath, format = SFormats.XML):
181         serializer = ECSerializer()
182         ec = serializer.load(filepath)
183         return ec
184
185     def __init__(self, exp_id = None, local_dir = None, persist = False,
186             fm = None, add_node_callback = None, add_edge_callback = None, 
187             **kwargs):
188         """ ExperimentController entity to model an execute a network 
189         experiment.
190         
191         :param exp_id: Human readable name to identify the experiment
192         :type exp_id: str
193
194         :param local_dir: Path to local directory where to store experiment
195             related files
196         :type local_dir: str
197
198         :param persist: Save an XML description of the experiment after 
199         completion at local_dir
200         :type persist: bool
201
202         :param fm: FailureManager object. If None is given, the default 
203             FailureManager class will be used
204         :type fm: FailureManager
205
206         :param add_node_callback: Callback to invoke for node instantiation
207         when automatic topology creation mode is used 
208         :type add_node_callback: function
209
210         :param add_edge_callback: Callback to invoke for edge instantiation 
211         when automatic topology creation mode is used 
212         :type add_edge_callback: function
213
214         """
215         super(ExperimentController, self).__init__()
216
217         # Logging
218         self._logger = logging.getLogger("ExperimentController")
219
220         # Run identifier. It identifies a concrete execution instance (run) 
221         # of an experiment.
222         # Since a same experiment (same configuration) can be executed many 
223         # times, this run_id permits to separate result files generated on 
224         # different experiment executions
225         self._run_id = tsformat()
226
227         # Experiment identifier. Usually assigned by the user
228         # Identifies the experiment scenario (i.e. configuration, 
229         # resources used, etc)
230         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
231
232         # Local path where to store experiment related files (results, etc)
233         if not local_dir:
234             local_dir = tempfile.gettempdir() # /tmp
235
236         self._local_dir = local_dir
237         self._exp_dir = os.path.join(local_dir, self.exp_id)
238         self._run_dir = os.path.join(self.exp_dir, self.run_id)
239
240         # If True persist the experiment controller in XML format, after completion
241         self._persist = persist
242
243         # generator of globally unique ids
244         self._guid_generator = GuidGenerator()
245         
246         # Resource managers
247         self._resources = dict()
248
249         # Scheduler. It a queue that holds tasks scheduled for
250         # execution, and yields the next task to be executed 
251         # ordered by execution and arrival time
252         self._scheduler = HeapScheduler()
253
254         # Tasks
255         self._tasks = dict()
256
257         # RM groups (for deployment) 
258         self._groups = dict()
259
260         # generator of globally unique id for groups
261         self._group_id_generator = GuidGenerator()
262
263         # Flag to stop processing thread
264         self._stop = False
265     
266         # Entity in charge of managing system failures
267         if not fm:
268             self._fm = FailureManager()
269         self._fm.set_ec(self)
270
271         # EC state
272         self._state = ECState.RUNNING
273
274         # Automatically construct experiment description 
275         self._netgraph = None
276         if add_node_callback or add_edge_callback or kwargs.get("topology"):
277             self._build_from_netgraph(add_node_callback, add_edge_callback, 
278                     **kwargs)
279
280         # The runner is a pool of threads used to parallelize 
281         # execution of tasks
282         self._nthreads = 20
283         self._runner = None
284
285         # Event processing thread
286         self._cond = threading.Condition()
287         self._thread = threading.Thread(target = self._process)
288         self._thread.setDaemon(True)
289         self._thread.start()
290         
291     @property
292     def logger(self):
293         """ Returns the logger instance of the Experiment Controller
294
295         """
296         return self._logger
297
298     @property
299     def fm(self):
300         """ Returns the failure manager
301
302         """
303
304         return self._fm
305
306     @property
307     def failure_level(self):
308         """ Returns the level of FAILURE of th experiment
309
310         """
311
312         return self._fm._failure_level
313
314     @property
315     def ecstate(self):
316         """ Returns the state of the Experiment Controller
317
318         """
319         return self._state
320
321     @property
322     def exp_id(self):
323         """ Returns the experiment id assigned by the user
324
325         """
326         return self._exp_id
327
328     @property
329     def run_id(self):
330         """ Returns the experiment instance (run) identifier (automatically 
331         generated)
332
333         """
334         return self._run_id
335
336     @property
337     def nthreads(self):
338         """ Returns the number of processing nthreads used
339
340         """
341         return self._nthreads
342
343     @property
344     def local_dir(self):
345         """ Root local directory for experiment files
346
347         """
348         return self._local_dir
349
350     @property
351     def exp_dir(self):
352         """ Local directory to store results and other files related to the 
353         experiment.
354
355         """
356         return self._exp_dir
357
358     @property
359     def run_dir(self):
360         """ Local directory to store results and other files related to the 
361         experiment run.
362
363         """
364         return self._run_dir
365
366     @property
367     def persist(self):
368         """ If True, persists the ExperimentController to XML format upon 
369         experiment completion
370
371         """
372         return self._persist
373
374     @property
375     def netgraph(self):
376         """ Return NetGraph instance if experiment description was automatically 
377         generated
378
379         """
380         return self._netgraph
381
382     @property
383     def abort(self):
384         """ Returns True if the experiment has failed and should be interrupted,
385         False otherwise.
386
387         """
388         return self._fm.abort
389
390     def inform_failure(self, guid):
391         """ Reports a failure in a RM to the EC for evaluation
392
393             :param guid: Resource id
394             :type guid: int
395
396         """
397
398         return self._fm.eval_failure(guid)
399
400     def wait_finished(self, guids):
401         """ Blocking method that waits until all RMs in the 'guids' list 
402         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
403         RELEASED ), or until a failure in the experiment occurs 
404         (i.e. abort == True) 
405         
406             :param guids: List of guids
407             :type guids: list
408
409         """
410
411         def quit():
412             return self.abort
413
414         return self.wait(guids, state = ResourceState.STOPPED, 
415                 quit = quit)
416
417     def wait_started(self, guids):
418         """ Blocking method that waits until all RMs in the 'guids' list 
419         have reached a state >= STARTED, or until a failure in the 
420         experiment occurs (i.e. abort == True) 
421         
422             :param guids: List of guids
423             :type guids: list
424
425         """
426
427         def quit():
428             return self.abort
429
430         return self.wait(guids, state = ResourceState.STARTED, 
431                 quit = quit)
432
433     def wait_released(self, guids):
434         """ Blocking method that waits until all RMs in the 'guids' list 
435         have reached a state == RELEASED, or until the EC fails 
436         
437             :param guids: List of guids
438             :type guids: list
439
440         """
441
442         def quit():
443             return self._state == ECState.FAILED
444
445         return self.wait(guids, state = ResourceState.RELEASED, 
446                 quit = quit)
447
448     def wait_deployed(self, guids):
449         """ Blocking method that waits until all RMs in the 'guids' list 
450         have reached a state >= READY, or until a failure in the 
451         experiment occurs (i.e. abort == True) 
452         
453             :param guids: List of guids
454             :type guids: list
455
456         """
457
458         def quit():
459             return self.abort
460
461         return self.wait(guids, state = ResourceState.READY, 
462                 quit = quit)
463
464     def wait(self, guids, state, quit):
465         """ Blocking method that waits until all RMs in the 'guids' list 
466         have reached a state >= 'state', or until the 'quit' callback
467         yields True
468            
469             :param guids: List of guids
470             :type guids: list
471         
472         """
473         if isinstance(guids, int):
474             guids = [guids]
475
476         # Make a copy to avoid modifying the original guids list
477         guids = list(guids)
478
479         while True:
480             # If there are no more guids to wait for
481             # or the quit function returns True, exit the loop
482             if len(guids) == 0 or quit():
483                 break
484
485             # If a guid reached one of the target states, remove it from list
486             guid = guids.pop()
487             rm = self.get_resource(guid)
488             rstate = rm.state
489             
490             if rstate >= state:
491                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
492                     rm.get_rtype(), guid, rstate, state))
493             else:
494                 # Debug...
495                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
496                     guid, rstate, state))
497
498                 guids.append(guid)
499
500                 time.sleep(0.5)
501
502     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
503         plotter = ECPlotter()
504         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
505                 show = show)
506         return fpath
507
508     def serialize(self, format = SFormats.XML):
509         serializer = ECSerializer()
510         sec = serializer.load(self, format = format)
511         return sec
512
513     def save(self, dirpath = None, format = SFormats.XML):
514         if dirpath == None:
515             dirpath = self.run_dir
516
517         try:
518             os.makedirs(dirpath)
519         except OSError:
520             pass
521
522         serializer = ECSerializer()
523         path = serializer.save(self, dirpath, format = format)
524         return path
525
526     def get_task(self, tid):
527         """ Returns a task by its id
528
529             :param tid: Id of the task
530             :type tid: int
531             
532             :rtype: Task
533             
534         """
535         return self._tasks.get(tid)
536
537     def get_resource(self, guid):
538         """ Returns a registered ResourceManager by its guid
539
540             :param guid: Id of the resource
541             :type guid: int
542             
543             :rtype: ResourceManager
544             
545         """
546         rm = self._resources.get(guid)
547         return rm
548
549     def get_resources_by_type(self, rtype):
550         """ Returns the ResourceManager objects of type rtype
551
552             :param rtype: Resource type
553             :type rtype: string
554             
555             :rtype: list of ResourceManagers
556             
557         """
558         rms = []
559         for guid, rm in self._resources.items():
560             if rm.get_rtype() == rtype: 
561                 rms.append(rm)
562         return rms
563
564     def remove_resource(self, guid):
565         del self._resources[guid]
566
567     @property
568     def resources(self):
569         """ Returns the guids of all ResourceManagers 
570
571             :return: Set of all RM guids
572             :rtype: list
573
574         """
575         keys = list(self._resources.keys())
576
577         return keys
578
579     def filter_resources(self, rtype):
580         """ Returns the guids of all ResourceManagers of type rtype
581
582             :param rtype: Resource type
583             :type rtype: string
584             
585             :rtype: list of guids
586             
587         """
588         rms = []
589         for guid, rm in self._resources.items():
590             if rm.get_rtype() == rtype: 
591                 rms.append(rm.guid)
592         return rms
593
594     def register_resource(self, rtype, guid = None):
595         """ Registers a new ResourceManager of type 'rtype' in the experiment
596         
597         This method will assign a new 'guid' for the RM, if no guid
598         is specified.
599
600             :param rtype: Type of the RM
601             :type rtype: str
602
603             :return: Guid of the RM
604             :rtype: int
605             
606         """
607         # Get next available guid
608         # xxx_next_hiccup
609         guid = self._guid_generator.generate(guid)
610         
611         # Instantiate RM
612         rm = ResourceFactory.create(rtype, self, guid)
613
614         # Store RM
615         self._resources[guid] = rm
616
617         return guid
618
619     def get_attributes(self, guid):
620         """ Returns all the attributes of the RM with guid 'guid'
621
622             :param guid: Guid of the RM
623             :type guid: int
624
625             :return: List of attributes
626             :rtype: list
627
628         """
629         rm = self.get_resource(guid)
630         return rm.get_attributes()
631
632     def get_attribute(self, guid, name):
633         """ Returns the attribute 'name' of the RM with guid 'guid'
634
635             :param guid: Guid of the RM
636             :type guid: int
637
638             :param name: Name of the attribute
639             :type name: str
640
641             :return: The attribute with name 'name'
642             :rtype: Attribute
643
644         """
645         rm = self.get_resource(guid)
646         return rm.get_attribute(name)
647
648     def register_connection(self, guid1, guid2):
649         """ Registers a connection between a RM with guid 'guid1'
650         and another RM with guid 'guid2'. 
651     
652         The order of the in which the two guids are provided is not
653         important, since the connection relationship is symmetric.
654
655             :param guid1: First guid to connect
656             :type guid1: ResourceManager
657
658             :param guid2: Second guid to connect
659             :type guid: ResourceManager
660
661         """
662         rm1 = self.get_resource(guid1)
663         rm2 = self.get_resource(guid2)
664
665         rm1.register_connection(guid2)
666         rm2.register_connection(guid1)
667
668     def register_condition(self, guids1, action, guids2, state,
669             time = None):
670         """ Registers an action START, STOP or DEPLOY for all RM on list
671         guids1 to occur at time 'time' after all elements in list guids2 
672         have reached state 'state'.
673
674             :param guids1: List of guids of RMs subjected to action
675             :type guids1: list
676
677             :param action: Action to perform (either START, STOP or DEPLOY)
678             :type action: ResourceAction
679
680             :param guids2: List of guids of RMs to we waited for
681             :type guids2: list
682
683             :param state: State to wait for on RMs of list guids2 (STARTED,
684                 STOPPED, etc)
685             :type state: ResourceState
686
687             :param time: Time to wait after guids2 has reached status 
688             :type time: string
689
690         """
691         if isinstance(guids1, int):
692             guids1 = [guids1]
693         if isinstance(guids2, int):
694             guids2 = [guids2]
695
696         for guid1 in guids1:
697             rm = self.get_resource(guid1)
698             rm.register_condition(action, guids2, state, time)
699
700     def enable_trace(self, guid, name):
701         """ Enables a trace to be collected during the experiment run
702
703             :param name: Name of the trace
704             :type name: str
705
706         """
707         rm = self.get_resource(guid)
708         rm.enable_trace(name)
709
710     def trace_enabled(self, guid, name):
711         """ Returns True if the trace of name 'name' is enabled
712
713             :param name: Name of the trace
714             :type name: str
715
716         """
717         rm = self.get_resource(guid)
718         return rm.trace_enabled(name)
719
720     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
721         """ Returns information on a collected trace, the trace stream or 
722         blocks (chunks) of the trace stream
723
724             :param name: Name of the trace
725             :type name: str
726
727             :param attr: Can be one of:
728                          - TraceAttr.ALL (complete trace content), 
729                          - TraceAttr.STREAM (block in bytes to read starting 
730                                 at offset),
731                          - TraceAttr.PATH (full path to the trace file),
732                          - TraceAttr.SIZE (size of trace file). 
733             :type attr: str
734
735             :param block: Number of bytes to retrieve from trace, when attr is 
736                 TraceAttr.STREAM 
737             :type name: int
738
739             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
740             :type name: int
741
742             :rtype: str
743
744         """
745         rm = self.get_resource(guid)
746         return rm.trace(name, attr, block, offset)
747
748     def get_traces(self, guid):
749         """ Returns the list of the trace names of the RM with guid 'guid'
750
751             :param guid: Guid of the RM
752             :type guid: int
753
754             :return: List of trace names
755             :rtype: list
756
757         """
758         rm = self.get_resource(guid)
759         return rm.get_traces()
760
761
762     def discover(self, guid):
763         """ Discovers an available resource matching the criteria defined
764         by the RM with guid 'guid', and associates that resource to the RM
765
766         Not all RM types require (or are capable of) performing resource 
767         discovery. For the RM types which are not capable of doing so, 
768         invoking this method does not have any consequences. 
769
770             :param guid: Guid of the RM
771             :type guid: int
772
773         """
774         rm = self.get_resource(guid)
775         return rm.discover()
776
777     def provision(self, guid):
778         """ Provisions the resource associated to the RM with guid 'guid'.
779
780         Provisioning means making a resource 'accessible' to the user. 
781         Not all RM types require (or are capable of) performing resource 
782         provisioning. For the RM types which are not capable of doing so, 
783         invoking this method does not have any consequences. 
784
785             :param guid: Guid of the RM
786             :type guid: int
787
788         """
789         rm = self.get_resource(guid)
790         return rm.provision()
791
792     def get(self, guid, name):
793         """ Returns the value of the attribute with name 'name' on the
794         RM with guid 'guid'
795
796             :param guid: Guid of the RM
797             :type guid: int
798
799             :param name: Name of the attribute 
800             :type name: str
801
802             :return: The value of the attribute with name 'name'
803
804         """
805         rm = self.get_resource(guid)
806         return rm.get(name)
807
808     def set(self, guid, name, value):
809         """ Modifies the value of the attribute with name 'name' on the 
810         RM with guid 'guid'.
811
812             :param guid: Guid of the RM
813             :type guid: int
814
815             :param name: Name of the attribute
816             :type name: str
817
818             :param value: Value of the attribute
819
820         """
821         rm = self.get_resource(guid)
822         rm.set(name, value)
823
824     def get_global(self, rtype, name):
825         """ Returns the value of the global attribute with name 'name' on the
826         RMs of rtype 'rtype'.
827
828             :param guid: Guid of the RM
829             :type guid: int
830
831             :param name: Name of the attribute 
832             :type name: str
833
834             :return: The value of the attribute with name 'name'
835
836         """
837         rclass = ResourceFactory.get_resource_type(rtype)
838         return rclass.get_global(name)
839
840     def set_global(self, rtype, name, value):
841         """ Modifies the value of the global attribute with name 'name' on the 
842         RMs of with rtype 'rtype'.
843
844             :param guid: Guid of the RM
845             :type guid: int
846
847             :param name: Name of the attribute
848             :type name: str
849
850             :param value: Value of the attribute
851
852         """
853         rclass = ResourceFactory.get_resource_type(rtype)
854         return rclass.set_global(name, value)
855
856     def state(self, guid, hr = False):
857         """ Returns the state of a resource
858
859             :param guid: Resource guid
860             :type guid: integer
861
862             :param hr: Human readable. Forces return of a 
863                 status string instead of a number 
864             :type hr: boolean
865
866         """
867         rm = self.get_resource(guid)
868         state = rm.state
869
870         if hr:
871             return ResourceState2str.get(state)
872
873         return state
874
875     def stop(self, guid):
876         """ Stops the RM with guid 'guid'
877
878         Stopping a RM means that the resource it controls will
879         no longer take part of the experiment.
880
881             :param guid: Guid of the RM
882             :type guid: int
883
884         """
885         rm = self.get_resource(guid)
886         return rm.stop()
887
888     def start(self, guid):
889         """ Starts the RM with guid 'guid'
890
891         Starting a RM means that the resource it controls will
892         begin taking part of the experiment.
893
894             :param guid: Guid of the RM
895             :type guid: int
896
897         """
898         rm = self.get_resource(guid)
899         return rm.start()
900
901     def get_start_time(self, guid):
902         """ Returns the start time of the RM as a timestamp """
903         rm = self.get_resource(guid)
904         return rm.start_time
905
906     def get_stop_time(self, guid):
907         """ Returns the stop time of the RM as a timestamp """
908         rm = self.get_resource(guid)
909         return rm.stop_time
910
911     def get_discover_time(self, guid):
912         """ Returns the discover time of the RM as a timestamp """
913         rm = self.get_resource(guid)
914         return rm.discover_time
915
916     def get_provision_time(self, guid):
917         """ Returns the provision time of the RM as a timestamp """
918         rm = self.get_resource(guid)
919         return rm.provision_time
920
921     def get_ready_time(self, guid):
922         """ Returns the deployment time of the RM as a timestamp """
923         rm = self.get_resource(guid)
924         return rm.ready_time
925
926     def get_release_time(self, guid):
927         """ Returns the release time of the RM as a timestamp """
928         rm = self.get_resource(guid)
929         return rm.release_time
930
931     def get_failed_time(self, guid):
932         """ Returns the time failure occured for the RM as a timestamp """
933         rm = self.get_resource(guid)
934         return rm.failed_time
935
936     def set_with_conditions(self, name, value, guids1, guids2, state,
937             time = None):
938         """ Modifies the value of attribute with name 'name' on all RMs 
939         on the guids1 list when time 'time' has elapsed since all 
940         elements in guids2 list have reached state 'state'.
941
942             :param name: Name of attribute to set in RM
943             :type name: string
944
945             :param value: Value of attribute to set in RM
946             :type name: string
947
948             :param guids1: List of guids of RMs subjected to action
949             :type guids1: list
950
951             :param action: Action to register (either START or STOP)
952             :type action: ResourceAction
953
954             :param guids2: List of guids of RMs to we waited for
955             :type guids2: list
956
957             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
958             :type state: ResourceState
959
960             :param time: Time to wait after guids2 has reached status 
961             :type time: string
962
963         """
964         if isinstance(guids1, int):
965             guids1 = [guids1]
966         if isinstance(guids2, int):
967             guids2 = [guids2]
968
969         for guid1 in guids1:
970             rm = self.get_resource(guid)
971             rm.set_with_conditions(name, value, guids2, state, time)
972
973     def deploy(self, guids = None, wait_all_ready = True, group = None):
974         """ Deploys all ResourceManagers in the guids list. 
975         
976         If the argument 'guids' is not given, all RMs with state NEW
977         are deployed.
978
979             :param guids: List of guids of RMs to deploy
980             :type guids: list
981
982             :param wait_all_ready: Wait until all RMs are ready in
983                 order to start the RMs
984             :type guid: int
985
986             :param group: Id of deployment group in which to deploy RMs
987             :type group: int
988
989         """
990         self.logger.debug(" ------- DEPLOY START ------ ")
991
992         if not guids:
993             # If no guids list was passed, all 'NEW' RMs will be deployed
994             guids = []
995             for guid, rm in self._resources.items():
996                 if rm.state == ResourceState.NEW:
997                     guids.append(guid)
998                 
999         if isinstance(guids, int):
1000             guids = [guids]
1001
1002         # Create deployment group
1003         # New guids can be added to a same deployment group later on
1004         new_group = False
1005         if not group:
1006             new_group = True
1007             # xxx_next_hiccup
1008             group = self._group_id_generator.generate()
1009
1010         if group not in self._groups:
1011             self._groups[group] = []
1012
1013         self._groups[group].extend(guids)
1014
1015         def wait_all_and_start(group):
1016             # Function that checks if all resources are READY
1017             # before scheduling a start_with_conditions for each RM
1018             reschedule = False
1019             
1020             # Get all guids in group
1021             guids = self._groups[group]
1022
1023             for guid in guids:
1024                 if self.state(guid) < ResourceState.READY:
1025                     reschedule = True
1026                     break
1027
1028             if reschedule:
1029                 callback = functools.partial(wait_all_and_start, group)
1030                 self.schedule("1s", callback)
1031             else:
1032                 # If all resources are ready, we schedule the start
1033                 for guid in guids:
1034                     rm = self.get_resource(guid)
1035                     self.schedule("0s", rm.start_with_conditions)
1036
1037                     if rm.conditions.get(ResourceAction.STOP):
1038                         # Only if the RM has STOP conditions we
1039                         # schedule a stop. Otherwise the RM will stop immediately
1040                         self.schedule("0s", rm.stop_with_conditions)
1041
1042         if wait_all_ready and new_group:
1043             # Schedule a function to check that all resources are
1044             # READY, and only then schedule the start.
1045             # This aims at reducing the number of tasks looping in the 
1046             # scheduler. 
1047             # Instead of having many start tasks, we will have only one for 
1048             # the whole group.
1049             callback = functools.partial(wait_all_and_start, group)
1050             self.schedule("0s", callback)
1051
1052         for guid in guids:
1053             rm = self.get_resource(guid)
1054             rm.deployment_group = group
1055             self.schedule("0s", rm.deploy_with_conditions)
1056
1057             if not wait_all_ready:
1058                 self.schedule("0s", rm.start_with_conditions)
1059
1060                 if rm.conditions.get(ResourceAction.STOP):
1061                     # Only if the RM has STOP conditions we
1062                     # schedule a stop. Otherwise the RM will stop immediately
1063                     self.schedule("0s", rm.stop_with_conditions)
1064
1065     def release(self, guids = None):
1066         """ Releases all ResourceManagers in the guids list.
1067
1068         If the argument 'guids' is not given, all RMs registered
1069         in the experiment are released.
1070
1071             :param guids: List of RM guids
1072             :type guids: list
1073
1074         """
1075         if self._state == ECState.RELEASED:
1076             return 
1077
1078         if isinstance(guids, int):
1079             guids = [guids]
1080
1081         if not guids:
1082             guids = self.resources
1083
1084         for guid in guids:
1085             rm = self.get_resource(guid)
1086             self.schedule("0s", rm.release)
1087
1088         self.wait_released(guids)
1089
1090         if self.persist:
1091             self.save()
1092
1093         for guid in guids:
1094             if self.get(guid, "hardRelease"):
1095                 self.remove_resource(guid)\
1096
1097         # Mark the EC state as RELEASED
1098         self._state = ECState.RELEASED
1099         
1100     def shutdown(self):
1101         """ Releases all resources and stops the ExperimentController
1102
1103         """
1104         # If there was a major failure we can't exit gracefully
1105         if self._state == ECState.FAILED:
1106             raise RuntimeError("EC failure. Can not exit gracefully")
1107
1108         # Remove all pending tasks from the scheduler queue
1109         for tid in list(self._scheduler.pending):
1110             self._scheduler.remove(tid)
1111
1112         # Remove pending tasks from the workers queue
1113         self._runner.empty()
1114
1115         self.release()
1116
1117         # Mark the EC state as TERMINATED
1118         self._state = ECState.TERMINATED
1119
1120         # Stop processing thread
1121         self._stop = True
1122
1123         # Notify condition to wake up the processing thread
1124         self._notify()
1125         
1126         if self._thread.is_alive():
1127            self._thread.join()
1128
1129     def schedule(self, date, callback, track = False):
1130         """ Schedules a callback to be executed at time 'date'.
1131
1132             :param date: string containing execution time for the task.
1133                     It can be expressed as an absolute time, using
1134                     timestamp format, or as a relative time matching
1135                     ^\d+.\d+(h|m|s|ms|us)$
1136
1137             :param callback: code to be executed for the task. Must be a
1138                         Python function, and receives args and kwargs
1139                         as arguments.
1140
1141             :param track: if set to True, the task will be retrievable with
1142                     the get_task() method
1143
1144             :return : The Id of the task
1145             :rtype: int
1146             
1147         """
1148         timestamp = stabsformat(date)
1149         task = Task(timestamp, callback)
1150         task = self._scheduler.schedule(task)
1151
1152         if track:
1153             self._tasks[task.id] = task
1154
1155         # Notify condition to wake up the processing thread
1156         self._notify()
1157
1158         return task.id
1159      
1160     def _process(self):
1161         """ Process scheduled tasks.
1162
1163         .. note::
1164         
1165         Tasks are scheduled by invoking the schedule method with a target 
1166         callback and an execution time. 
1167         The schedule method creates a new Task object with that callback 
1168         and execution time, and pushes it into the '_scheduler' queue. 
1169         The execution time and the order of arrival of tasks are used 
1170         to order the tasks in the queue.
1171
1172         The _process method is executed in an independent thread held by 
1173         the ExperimentController for as long as the experiment is running.
1174         This method takes tasks from the '_scheduler' queue in a loop 
1175         and processes them in parallel using multithreading. 
1176         The environmental variable NEPI_NTHREADS can be used to control
1177         the number of threads used to process tasks. The default value is 
1178         50.
1179
1180         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1181         This object keeps a pool of threads (workers), and a queue of tasks
1182         scheduled for 'immediate' execution. 
1183         
1184         On each iteration, the '_process' loop will take the next task that 
1185         is scheduled for 'future' execution from the '_scheduler' queue, 
1186         and if the execution time of that task is >= to the current time, 
1187         it will push that task into the PR for 'immediate execution'. 
1188         As soon as a worker is free, the PR will assign the next task to
1189         that worker.
1190
1191         Upon receiving a task to execute, each PR worker (thread) will 
1192         invoke the  _execute method of the EC, passing the task as 
1193         argument.         
1194         The _execute method will then invoke task.callback inside a 
1195         try/except block. If an exception is raised by the tasks.callback, 
1196         it will be trapped by the try block, logged to standard error 
1197         (usually the console), and the task will be marked as failed.
1198
1199         """
1200
1201         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1202         self._runner = ParallelRun(maxthreads = self.nthreads)
1203         self._runner.start()
1204
1205         while not self._stop:
1206             try:
1207                 self._cond.acquire()
1208
1209                 task = next(self._scheduler)
1210                 
1211                 if not task:
1212                     # No task to execute. Wait for a new task to be scheduled.
1213                     self._cond.wait()
1214                 else:
1215                     # The task timestamp is in the future. Wait for timeout 
1216                     # or until another task is scheduled.
1217                     now = tnow()
1218                     if now < task.timestamp:
1219                         # Calculate timeout in seconds
1220                         timeout = tdiffsec(task.timestamp, now)
1221
1222                         # Re-schedule task with the same timestamp
1223                         self._scheduler.schedule(task)
1224                         
1225                         task = None
1226
1227                         # Wait timeout or until a new task awakes the condition
1228                         self._cond.wait(timeout)
1229                
1230                 self._cond.release()
1231
1232                 if task:
1233                     # Process tasks in parallel
1234                     self._runner.put(self._execute, task)
1235             except: 
1236                 import traceback
1237                 err = traceback.format_exc()
1238                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1239
1240                 # Set the EC to FAILED state 
1241                 self._state = ECState.FAILED
1242             
1243                 # Set the FailureManager failure level to EC failure
1244                 self._fm.set_ec_failure()
1245
1246         self.logger.debug("Exiting the task processing loop ... ")
1247         
1248         self._runner.sync()
1249         self._runner.destroy()
1250
1251     def _execute(self, task):
1252         """ Executes a single task. 
1253
1254             :param task: Object containing the callback to execute
1255             :type task: Task
1256
1257         """
1258         try:
1259             # Invoke callback
1260             task.result = task.callback()
1261             task.status = TaskStatus.DONE
1262         except:
1263             import traceback
1264             err = traceback.format_exc()
1265             task.result = err
1266             task.status = TaskStatus.ERROR
1267             
1268             self.logger.error("Error occurred while executing task: %s" % err)
1269
1270     def _notify(self):
1271         """ Awakes the processing thread if it is blocked waiting 
1272         for new tasks to arrive
1273         
1274         """
1275         self._cond.acquire()
1276         self._cond.notify()
1277         self._cond.release()
1278
1279     def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
1280             **kwargs):
1281         """ Automates experiment description using a NetGraph instance.
1282         """
1283         self._netgraph = NetGraph(**kwargs)
1284
1285         if add_node_callback:
1286             ### Add resources to the EC
1287             for nid in self.netgraph.nodes():
1288                 add_node_callback(self, nid)
1289
1290         if add_edge_callback:
1291             #### Add connections between resources
1292             for nid1, nid2 in self.netgraph.edges():
1293                 add_edge_callback(self, nid1, nid2)
1294