various formatting, including mixes of tab and spaces detected in py3
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.util import guid
20 from nepi.util.parallel import ParallelRun
21 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
22 from nepi.execution.resource import ResourceFactory, ResourceAction, \
23         ResourceState, ResourceState2str
24 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
25 from nepi.execution.trace import TraceAttr
26 from nepi.util.serializer import ECSerializer, SFormats
27 from nepi.util.plotter import ECPlotter, PFormats
28 from nepi.util.netgraph import NetGraph, TopologyType 
29
30 # TODO: use multiprocessing instead of threading
31 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
32
33 import functools
34 import logging
35 import os
36 import sys
37 import tempfile
38 import time
39 import threading
40 import weakref
41
42 class FailureLevel(object):
43     """ Possible failure states for the experiment """
44     OK = 1
45     RM_FAILURE = 2
46     EC_FAILURE = 3
47
48 class FailureManager(object):
49     """ The FailureManager is responsible for handling errors
50     and deciding whether an experiment should be aborted or not
51     """
52
53     def __init__(self):
54         self._ec = None
55         self._failure_level = FailureLevel.OK
56         self._abort = False
57
58     def set_ec(self, ec):
59         self._ec = weakref.ref(ec)
60
61     @property
62     def ec(self):
63         """ Returns the ExperimentController associated to this FailureManager 
64         """
65         return self._ec()
66
67     @property
68     def abort(self):
69         return self._abort
70
71     def eval_failure(self, guid):
72         """ Implements failure policy and sets the abort state of the
73         experiment based on the failure state and criticality of
74         the RM
75
76         :param guid: Guid of the RM upon which the failure of the experiment
77             is evaluated
78         :type guid: int
79
80         """
81         if self._failure_level == FailureLevel.OK:
82             rm = self.ec.get_resource(guid)
83             state = rm.state
84             critical = rm.get("critical")
85
86             if state == ResourceState.FAILED and critical:
87                 self._failure_level = FailureLevel.RM_FAILURE
88                 self._abort = True
89                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
90                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
91
92     def set_ec_failure(self):
93         self._failure_level = FailureLevel.EC_FAILURE
94
95 class ECState(object):
96     """ Possible states of the ExperimentController
97    
98     """
99     RUNNING = 1
100     FAILED = 2
101     RELEASED = 3
102     TERMINATED = 4
103
104 class ExperimentController(object):
105     """
106     .. note::
107
108     An experiment, or scenario, is defined by a concrete set of resources,
109     and the behavior, configuration and interconnection of those resources. 
110     The Experiment Description (ED) is a detailed representation of a
111     single experiment. It contains all the necessary information to 
112     allow repeating the experiment. NEPI allows to describe
113     experiments by registering components (resources), configuring them
114     and interconnecting them.
115     
116     A same experiment (scenario) can be executed many times, generating 
117     different results. We call an experiment execution (instance) a 'run'.
118
119     The ExperimentController (EC), is the entity responsible of
120     managing an experiment run. The same scenario can be 
121     recreated (and re-run) by instantiating an EC and recreating 
122     the same experiment description. 
123
124     An experiment is represented as a graph of interconnected
125     resources. A resource is a generic concept in the sense that any
126     component taking part of an experiment, whether physical of
127     virtual, is considered a resource. A resources could be a host, 
128     a virtual machine, an application, a simulator, a IP address.
129
130     A ResourceManager (RM), is the entity responsible for managing a 
131     single resource. ResourceManagers are specific to a resource
132     type (i.e. An RM to control a Linux application will not be
133     the same as the RM used to control a ns-3 simulation).
134     To support a new type of resource, a new RM must be implemented. 
135     NEPI already provides a variety of RMs to control basic resources, 
136     and new can be extended from the existing ones.
137
138     Through the EC interface the user can create ResourceManagers (RMs),
139     configure them and interconnect them, to describe an experiment.
140     Describing an experiment through the EC does not run the experiment.
141     Only when the 'deploy()' method is invoked on the EC, the EC will take 
142     actions to transform the 'described' experiment into a 'running' experiment.
143
144     While the experiment is running, it is possible to continue to
145     create/configure/connect RMs, and to deploy them to involve new
146     resources in the experiment (this is known as 'interactive' deployment).
147     
148     An experiments in NEPI is identified by a string id, 
149     which is either given by the user, or automatically generated by NEPI.  
150     The purpose of this identifier is to separate files and results that 
151     belong to different experiment scenarios. 
152     However, since a same 'experiment' can be run many times, the experiment
153     id is not enough to identify an experiment instance (run).
154     For this reason, the ExperimentController has two identifier, the 
155     exp_id, which can be re-used in different ExperimentController,
156     and the run_id, which is unique to one ExperimentController instance, and
157     is automatically generated by NEPI.
158    
159     """
160
161     @classmethod
162     def load(cls, filepath, format = SFormats.XML):
163         serializer = ECSerializer()
164         ec = serializer.load(filepath)
165         return ec
166
167     def __init__(self, exp_id = None, local_dir = None, persist = False,
168             fm = None, add_node_callback = None, add_edge_callback = None, 
169             **kwargs):
170         """ ExperimentController entity to model an execute a network 
171         experiment.
172         
173         :param exp_id: Human readable name to identify the experiment
174         :type exp_id: str
175
176         :param local_dir: Path to local directory where to store experiment
177             related files
178         :type local_dir: str
179
180         :param persist: Save an XML description of the experiment after 
181         completion at local_dir
182         :type persist: bool
183
184         :param fm: FailureManager object. If None is given, the default 
185             FailureManager class will be used
186         :type fm: FailureManager
187
188         :param add_node_callback: Callback to invoke for node instantiation
189         when automatic topology creation mode is used 
190         :type add_node_callback: function
191
192         :param add_edge_callback: Callback to invoke for edge instantiation 
193         when automatic topology creation mode is used 
194         :type add_edge_callback: function
195
196         """
197         super(ExperimentController, self).__init__()
198
199         # Logging
200         self._logger = logging.getLogger("ExperimentController")
201
202         # Run identifier. It identifies a concrete execution instance (run) 
203         # of an experiment.
204         # Since a same experiment (same configuration) can be executed many 
205         # times, this run_id permits to separate result files generated on 
206         # different experiment executions
207         self._run_id = tsformat()
208
209         # Experiment identifier. Usually assigned by the user
210         # Identifies the experiment scenario (i.e. configuration, 
211         # resources used, etc)
212         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
213
214         # Local path where to store experiment related files (results, etc)
215         if not local_dir:
216             local_dir = tempfile.gettempdir() # /tmp
217
218         self._local_dir = local_dir
219         self._exp_dir = os.path.join(local_dir, self.exp_id)
220         self._run_dir = os.path.join(self.exp_dir, self.run_id)
221
222         # If True persist the experiment controller in XML format, after completion
223         self._persist = persist
224
225         # generator of globally unique ids
226         self._guid_generator = guid.GuidGenerator()
227         
228         # Resource managers
229         self._resources = dict()
230
231         # Scheduler. It a queue that holds tasks scheduled for
232         # execution, and yields the next task to be executed 
233         # ordered by execution and arrival time
234         self._scheduler = HeapScheduler()
235
236         # Tasks
237         self._tasks = dict()
238
239         # RM groups (for deployment) 
240         self._groups = dict()
241
242         # generator of globally unique id for groups
243         self._group_id_generator = guid.GuidGenerator()
244
245         # Flag to stop processing thread
246         self._stop = False
247     
248         # Entity in charge of managing system failures
249         if not fm:
250             self._fm = FailureManager()
251         self._fm.set_ec(self)
252
253         # EC state
254         self._state = ECState.RUNNING
255
256         # Automatically construct experiment description 
257         self._netgraph = None
258         if add_node_callback or add_edge_callback or kwargs.get("topology"):
259             self._build_from_netgraph(add_node_callback, add_edge_callback, 
260                     **kwargs)
261
262         # The runner is a pool of threads used to parallelize 
263         # execution of tasks
264         self._nthreads = 20
265         self._runner = None
266
267         # Event processing thread
268         self._cond = threading.Condition()
269         self._thread = threading.Thread(target = self._process)
270         self._thread.setDaemon(True)
271         self._thread.start()
272         
273     @property
274     def logger(self):
275         """ Returns the logger instance of the Experiment Controller
276
277         """
278         return self._logger
279
280     @property
281     def fm(self):
282         """ Returns the failure manager
283
284         """
285
286         return self._fm
287
288     @property
289     def failure_level(self):
290         """ Returns the level of FAILURE of th experiment
291
292         """
293
294         return self._fm._failure_level
295
296     @property
297     def ecstate(self):
298         """ Returns the state of the Experiment Controller
299
300         """
301         return self._state
302
303     @property
304     def exp_id(self):
305         """ Returns the experiment id assigned by the user
306
307         """
308         return self._exp_id
309
310     @property
311     def run_id(self):
312         """ Returns the experiment instance (run) identifier (automatically 
313         generated)
314
315         """
316         return self._run_id
317
318     @property
319     def nthreads(self):
320         """ Returns the number of processing nthreads used
321
322         """
323         return self._nthreads
324
325     @property
326     def local_dir(self):
327         """ Root local directory for experiment files
328
329         """
330         return self._local_dir
331
332     @property
333     def exp_dir(self):
334         """ Local directory to store results and other files related to the 
335         experiment.
336
337         """
338         return self._exp_dir
339
340     @property
341     def run_dir(self):
342         """ Local directory to store results and other files related to the 
343         experiment run.
344
345         """
346         return self._run_dir
347
348     @property
349     def persist(self):
350         """ If True, persists the ExperimentController to XML format upon 
351         experiment completion
352
353         """
354         return self._persist
355
356     @property
357     def netgraph(self):
358         """ Return NetGraph instance if experiment description was automatically 
359         generated
360
361         """
362         return self._netgraph
363
364     @property
365     def abort(self):
366         """ Returns True if the experiment has failed and should be interrupted,
367         False otherwise.
368
369         """
370         return self._fm.abort
371
372     def inform_failure(self, guid):
373         """ Reports a failure in a RM to the EC for evaluation
374
375             :param guid: Resource id
376             :type guid: int
377
378         """
379
380         return self._fm.eval_failure(guid)
381
382     def wait_finished(self, guids):
383         """ Blocking method that waits until all RMs in the 'guids' list 
384         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
385         RELEASED ), or until a failure in the experiment occurs 
386         (i.e. abort == True) 
387         
388             :param guids: List of guids
389             :type guids: list
390
391         """
392
393         def quit():
394             return self.abort
395
396         return self.wait(guids, state = ResourceState.STOPPED, 
397                 quit = quit)
398
399     def wait_started(self, guids):
400         """ Blocking method that waits until all RMs in the 'guids' list 
401         have reached a state >= STARTED, or until a failure in the 
402         experiment occurs (i.e. abort == True) 
403         
404             :param guids: List of guids
405             :type guids: list
406
407         """
408
409         def quit():
410             return self.abort
411
412         return self.wait(guids, state = ResourceState.STARTED, 
413                 quit = quit)
414
415     def wait_released(self, guids):
416         """ Blocking method that waits until all RMs in the 'guids' list 
417         have reached a state == RELEASED, or until the EC fails 
418         
419             :param guids: List of guids
420             :type guids: list
421
422         """
423
424         def quit():
425             return self._state == ECState.FAILED
426
427         return self.wait(guids, state = ResourceState.RELEASED, 
428                 quit = quit)
429
430     def wait_deployed(self, guids):
431         """ Blocking method that waits until all RMs in the 'guids' list 
432         have reached a state >= READY, or until a failure in the 
433         experiment occurs (i.e. abort == True) 
434         
435             :param guids: List of guids
436             :type guids: list
437
438         """
439
440         def quit():
441             return self.abort
442
443         return self.wait(guids, state = ResourceState.READY, 
444                 quit = quit)
445
446     def wait(self, guids, state, quit):
447         """ Blocking method that waits until all RMs in the 'guids' list 
448         have reached a state >= 'state', or until the 'quit' callback
449         yields True
450            
451             :param guids: List of guids
452             :type guids: list
453         
454         """
455         if isinstance(guids, int):
456             guids = [guids]
457
458         # Make a copy to avoid modifying the original guids list
459         guids = list(guids)
460
461         while True:
462             # If there are no more guids to wait for
463             # or the quit function returns True, exit the loop
464             if len(guids) == 0 or quit():
465                 break
466
467             # If a guid reached one of the target states, remove it from list
468             guid = guids.pop()
469             rm = self.get_resource(guid)
470             rstate = rm.state
471             
472             if rstate >= state:
473                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
474                     rm.get_rtype(), guid, rstate, state))
475             else:
476                 # Debug...
477                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
478                     guid, rstate, state))
479
480                 guids.append(guid)
481
482                 time.sleep(0.5)
483
484     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
485         plotter = ECPlotter()
486         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
487                 show = show)
488         return fpath
489
490     def serialize(self, format = SFormats.XML):
491         serializer = ECSerializer()
492         sec = serializer.load(self, format = format)
493         return sec
494
495     def save(self, dirpath = None, format = SFormats.XML):
496         if dirpath == None:
497             dirpath = self.run_dir
498
499         try:
500             os.makedirs(dirpath)
501         except OSError:
502             pass
503
504         serializer = ECSerializer()
505         path = serializer.save(self, dirpath, format = format)
506         return path
507
508     def get_task(self, tid):
509         """ Returns a task by its id
510
511             :param tid: Id of the task
512             :type tid: int
513             
514             :rtype: Task
515             
516         """
517         return self._tasks.get(tid)
518
519     def get_resource(self, guid):
520         """ Returns a registered ResourceManager by its guid
521
522             :param guid: Id of the resource
523             :type guid: int
524             
525             :rtype: ResourceManager
526             
527         """
528         rm = self._resources.get(guid)
529         return rm
530
531     def get_resources_by_type(self, rtype):
532         """ Returns the ResourceManager objects of type rtype
533
534             :param rtype: Resource type
535             :type rtype: string
536             
537             :rtype: list of ResourceManagers
538             
539         """
540         rms = []
541         for guid, rm in self._resources.iteritems():
542             if rm.get_rtype() == rtype: 
543                 rms.append(rm)
544         return rms
545
546     def remove_resource(self, guid):
547         del self._resources[guid]
548
549     @property
550     def resources(self):
551         """ Returns the guids of all ResourceManagers 
552
553             :return: Set of all RM guids
554             :rtype: list
555
556         """
557         keys = self._resources.keys()
558
559         return keys
560
561     def filter_resources(self, rtype):
562         """ Returns the guids of all ResourceManagers of type rtype
563
564             :param rtype: Resource type
565             :type rtype: string
566             
567             :rtype: list of guids
568             
569         """
570         rms = []
571         for guid, rm in self._resources.iteritems():
572             if rm.get_rtype() == rtype: 
573                 rms.append(rm.guid)
574         return rms
575
576     def register_resource(self, rtype, guid = None):
577         """ Registers a new ResourceManager of type 'rtype' in the experiment
578         
579         This method will assign a new 'guid' for the RM, if no guid
580         is specified.
581
582             :param rtype: Type of the RM
583             :type rtype: str
584
585             :return: Guid of the RM
586             :rtype: int
587             
588         """
589         # Get next available guid
590         guid = self._guid_generator.next(guid)
591         
592         # Instantiate RM
593         rm = ResourceFactory.create(rtype, self, guid)
594
595         # Store RM
596         self._resources[guid] = rm
597
598         return guid
599
600     def get_attributes(self, guid):
601         """ Returns all the attributes of the RM with guid 'guid'
602
603             :param guid: Guid of the RM
604             :type guid: int
605
606             :return: List of attributes
607             :rtype: list
608
609         """
610         rm = self.get_resource(guid)
611         return rm.get_attributes()
612
613     def get_attribute(self, guid, name):
614         """ Returns the attribute 'name' of the RM with guid 'guid'
615
616             :param guid: Guid of the RM
617             :type guid: int
618
619             :param name: Name of the attribute
620             :type name: str
621
622             :return: The attribute with name 'name'
623             :rtype: Attribute
624
625         """
626         rm = self.get_resource(guid)
627         return rm.get_attribute(name)
628
629     def register_connection(self, guid1, guid2):
630         """ Registers a connection between a RM with guid 'guid1'
631         and another RM with guid 'guid2'. 
632     
633         The order of the in which the two guids are provided is not
634         important, since the connection relationship is symmetric.
635
636             :param guid1: First guid to connect
637             :type guid1: ResourceManager
638
639             :param guid2: Second guid to connect
640             :type guid: ResourceManager
641
642         """
643         rm1 = self.get_resource(guid1)
644         rm2 = self.get_resource(guid2)
645
646         rm1.register_connection(guid2)
647         rm2.register_connection(guid1)
648
649     def register_condition(self, guids1, action, guids2, state,
650             time = None):
651         """ Registers an action START, STOP or DEPLOY for all RM on list
652         guids1 to occur at time 'time' after all elements in list guids2 
653         have reached state 'state'.
654
655             :param guids1: List of guids of RMs subjected to action
656             :type guids1: list
657
658             :param action: Action to perform (either START, STOP or DEPLOY)
659             :type action: ResourceAction
660
661             :param guids2: List of guids of RMs to we waited for
662             :type guids2: list
663
664             :param state: State to wait for on RMs of list guids2 (STARTED,
665                 STOPPED, etc)
666             :type state: ResourceState
667
668             :param time: Time to wait after guids2 has reached status 
669             :type time: string
670
671         """
672         if isinstance(guids1, int):
673             guids1 = [guids1]
674         if isinstance(guids2, int):
675             guids2 = [guids2]
676
677         for guid1 in guids1:
678             rm = self.get_resource(guid1)
679             rm.register_condition(action, guids2, state, time)
680
681     def enable_trace(self, guid, name):
682         """ Enables a trace to be collected during the experiment run
683
684             :param name: Name of the trace
685             :type name: str
686
687         """
688         rm = self.get_resource(guid)
689         rm.enable_trace(name)
690
691     def trace_enabled(self, guid, name):
692         """ Returns True if the trace of name 'name' is enabled
693
694             :param name: Name of the trace
695             :type name: str
696
697         """
698         rm = self.get_resource(guid)
699         return rm.trace_enabled(name)
700
701     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
702         """ Returns information on a collected trace, the trace stream or 
703         blocks (chunks) of the trace stream
704
705             :param name: Name of the trace
706             :type name: str
707
708             :param attr: Can be one of:
709                          - TraceAttr.ALL (complete trace content), 
710                          - TraceAttr.STREAM (block in bytes to read starting 
711                                 at offset),
712                          - TraceAttr.PATH (full path to the trace file),
713                          - TraceAttr.SIZE (size of trace file). 
714             :type attr: str
715
716             :param block: Number of bytes to retrieve from trace, when attr is 
717                 TraceAttr.STREAM 
718             :type name: int
719
720             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
721             :type name: int
722
723             :rtype: str
724
725         """
726         rm = self.get_resource(guid)
727         return rm.trace(name, attr, block, offset)
728
729     def get_traces(self, guid):
730         """ Returns the list of the trace names of the RM with guid 'guid'
731
732             :param guid: Guid of the RM
733             :type guid: int
734
735             :return: List of trace names
736             :rtype: list
737
738         """
739         rm = self.get_resource(guid)
740         return rm.get_traces()
741
742
743     def discover(self, guid):
744         """ Discovers an available resource matching the criteria defined
745         by the RM with guid 'guid', and associates that resource to the RM
746
747         Not all RM types require (or are capable of) performing resource 
748         discovery. For the RM types which are not capable of doing so, 
749         invoking this method does not have any consequences. 
750
751             :param guid: Guid of the RM
752             :type guid: int
753
754         """
755         rm = self.get_resource(guid)
756         return rm.discover()
757
758     def provision(self, guid):
759         """ Provisions the resource associated to the RM with guid 'guid'.
760
761         Provisioning means making a resource 'accessible' to the user. 
762         Not all RM types require (or are capable of) performing resource 
763         provisioning. For the RM types which are not capable of doing so, 
764         invoking this method does not have any consequences. 
765
766             :param guid: Guid of the RM
767             :type guid: int
768
769         """
770         rm = self.get_resource(guid)
771         return rm.provision()
772
773     def get(self, guid, name):
774         """ Returns the value of the attribute with name 'name' on the
775         RM with guid 'guid'
776
777             :param guid: Guid of the RM
778             :type guid: int
779
780             :param name: Name of the attribute 
781             :type name: str
782
783             :return: The value of the attribute with name 'name'
784
785         """
786         rm = self.get_resource(guid)
787         return rm.get(name)
788
789     def set(self, guid, name, value):
790         """ Modifies the value of the attribute with name 'name' on the 
791         RM with guid 'guid'.
792
793             :param guid: Guid of the RM
794             :type guid: int
795
796             :param name: Name of the attribute
797             :type name: str
798
799             :param value: Value of the attribute
800
801         """
802         rm = self.get_resource(guid)
803         rm.set(name, value)
804
805     def get_global(self, rtype, name):
806         """ Returns the value of the global attribute with name 'name' on the
807         RMs of rtype 'rtype'.
808
809             :param guid: Guid of the RM
810             :type guid: int
811
812             :param name: Name of the attribute 
813             :type name: str
814
815             :return: The value of the attribute with name 'name'
816
817         """
818         rclass = ResourceFactory.get_resource_type(rtype)
819         return rclass.get_global(name)
820
821     def set_global(self, rtype, name, value):
822         """ Modifies the value of the global attribute with name 'name' on the 
823         RMs of with rtype 'rtype'.
824
825             :param guid: Guid of the RM
826             :type guid: int
827
828             :param name: Name of the attribute
829             :type name: str
830
831             :param value: Value of the attribute
832
833         """
834         rclass = ResourceFactory.get_resource_type(rtype)
835         return rclass.set_global(name, value)
836
837     def state(self, guid, hr = False):
838         """ Returns the state of a resource
839
840             :param guid: Resource guid
841             :type guid: integer
842
843             :param hr: Human readable. Forces return of a 
844                 status string instead of a number 
845             :type hr: boolean
846
847         """
848         rm = self.get_resource(guid)
849         state = rm.state
850
851         if hr:
852             return ResourceState2str.get(state)
853
854         return state
855
856     def stop(self, guid):
857         """ Stops the RM with guid 'guid'
858
859         Stopping a RM means that the resource it controls will
860         no longer take part of the experiment.
861
862             :param guid: Guid of the RM
863             :type guid: int
864
865         """
866         rm = self.get_resource(guid)
867         return rm.stop()
868
869     def start(self, guid):
870         """ Starts the RM with guid 'guid'
871
872         Starting a RM means that the resource it controls will
873         begin taking part of the experiment.
874
875             :param guid: Guid of the RM
876             :type guid: int
877
878         """
879         rm = self.get_resource(guid)
880         return rm.start()
881
882     def get_start_time(self, guid):
883         """ Returns the start time of the RM as a timestamp """
884         rm = self.get_resource(guid)
885         return rm.start_time
886
887     def get_stop_time(self, guid):
888         """ Returns the stop time of the RM as a timestamp """
889         rm = self.get_resource(guid)
890         return rm.stop_time
891
892     def get_discover_time(self, guid):
893         """ Returns the discover time of the RM as a timestamp """
894         rm = self.get_resource(guid)
895         return rm.discover_time
896
897     def get_provision_time(self, guid):
898         """ Returns the provision time of the RM as a timestamp """
899         rm = self.get_resource(guid)
900         return rm.provision_time
901
902     def get_ready_time(self, guid):
903         """ Returns the deployment time of the RM as a timestamp """
904         rm = self.get_resource(guid)
905         return rm.ready_time
906
907     def get_release_time(self, guid):
908         """ Returns the release time of the RM as a timestamp """
909         rm = self.get_resource(guid)
910         return rm.release_time
911
912     def get_failed_time(self, guid):
913         """ Returns the time failure occured for the RM as a timestamp """
914         rm = self.get_resource(guid)
915         return rm.failed_time
916
917     def set_with_conditions(self, name, value, guids1, guids2, state,
918             time = None):
919         """ Modifies the value of attribute with name 'name' on all RMs 
920         on the guids1 list when time 'time' has elapsed since all 
921         elements in guids2 list have reached state 'state'.
922
923             :param name: Name of attribute to set in RM
924             :type name: string
925
926             :param value: Value of attribute to set in RM
927             :type name: string
928
929             :param guids1: List of guids of RMs subjected to action
930             :type guids1: list
931
932             :param action: Action to register (either START or STOP)
933             :type action: ResourceAction
934
935             :param guids2: List of guids of RMs to we waited for
936             :type guids2: list
937
938             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
939             :type state: ResourceState
940
941             :param time: Time to wait after guids2 has reached status 
942             :type time: string
943
944         """
945         if isinstance(guids1, int):
946             guids1 = [guids1]
947         if isinstance(guids2, int):
948             guids2 = [guids2]
949
950         for guid1 in guids1:
951             rm = self.get_resource(guid)
952             rm.set_with_conditions(name, value, guids2, state, time)
953
954     def deploy(self, guids = None, wait_all_ready = True, group = None):
955         """ Deploys all ResourceManagers in the guids list. 
956         
957         If the argument 'guids' is not given, all RMs with state NEW
958         are deployed.
959
960             :param guids: List of guids of RMs to deploy
961             :type guids: list
962
963             :param wait_all_ready: Wait until all RMs are ready in
964                 order to start the RMs
965             :type guid: int
966
967             :param group: Id of deployment group in which to deploy RMs
968             :type group: int
969
970         """
971         self.logger.debug(" ------- DEPLOY START ------ ")
972
973         if not guids:
974             # If no guids list was passed, all 'NEW' RMs will be deployed
975             guids = []
976             for guid, rm in self._resources.iteritems():
977                 if rm.state == ResourceState.NEW:
978                     guids.append(guid)
979                 
980         if isinstance(guids, int):
981             guids = [guids]
982
983         # Create deployment group
984         # New guids can be added to a same deployment group later on
985         new_group = False
986         if not group:
987             new_group = True
988             group = self._group_id_generator.next()
989
990         if group not in self._groups:
991             self._groups[group] = []
992
993         self._groups[group].extend(guids)
994
995         def wait_all_and_start(group):
996             # Function that checks if all resources are READY
997             # before scheduling a start_with_conditions for each RM
998             reschedule = False
999             
1000             # Get all guids in group
1001             guids = self._groups[group]
1002
1003             for guid in guids:
1004                 if self.state(guid) < ResourceState.READY:
1005                     reschedule = True
1006                     break
1007
1008             if reschedule:
1009                 callback = functools.partial(wait_all_and_start, group)
1010                 self.schedule("1s", callback)
1011             else:
1012                 # If all resources are ready, we schedule the start
1013                 for guid in guids:
1014                     rm = self.get_resource(guid)
1015                     self.schedule("0s", rm.start_with_conditions)
1016
1017                     if rm.conditions.get(ResourceAction.STOP):
1018                         # Only if the RM has STOP conditions we
1019                         # schedule a stop. Otherwise the RM will stop immediately
1020                         self.schedule("0s", rm.stop_with_conditions)
1021
1022         if wait_all_ready and new_group:
1023             # Schedule a function to check that all resources are
1024             # READY, and only then schedule the start.
1025             # This aims at reducing the number of tasks looping in the 
1026             # scheduler. 
1027             # Instead of having many start tasks, we will have only one for 
1028             # the whole group.
1029             callback = functools.partial(wait_all_and_start, group)
1030             self.schedule("0s", callback)
1031
1032         for guid in guids:
1033             rm = self.get_resource(guid)
1034             rm.deployment_group = group
1035             self.schedule("0s", rm.deploy_with_conditions)
1036
1037             if not wait_all_ready:
1038                 self.schedule("0s", rm.start_with_conditions)
1039
1040                 if rm.conditions.get(ResourceAction.STOP):
1041                     # Only if the RM has STOP conditions we
1042                     # schedule a stop. Otherwise the RM will stop immediately
1043                     self.schedule("0s", rm.stop_with_conditions)
1044
1045     def release(self, guids = None):
1046         """ Releases all ResourceManagers in the guids list.
1047
1048         If the argument 'guids' is not given, all RMs registered
1049         in the experiment are released.
1050
1051             :param guids: List of RM guids
1052             :type guids: list
1053
1054         """
1055         if self._state == ECState.RELEASED:
1056             return 
1057
1058         if isinstance(guids, int):
1059             guids = [guids]
1060
1061         if not guids:
1062             guids = self.resources
1063
1064         for guid in guids:
1065             rm = self.get_resource(guid)
1066             self.schedule("0s", rm.release)
1067
1068         self.wait_released(guids)
1069
1070         if self.persist:
1071             self.save()
1072
1073         for guid in guids:
1074             if self.get(guid, "hardRelease"):
1075                 self.remove_resource(guid)\
1076
1077         # Mark the EC state as RELEASED
1078         self._state = ECState.RELEASED
1079         
1080     def shutdown(self):
1081         """ Releases all resources and stops the ExperimentController
1082
1083         """
1084         # If there was a major failure we can't exit gracefully
1085         if self._state == ECState.FAILED:
1086             raise RuntimeError("EC failure. Can not exit gracefully")
1087
1088         # Remove all pending tasks from the scheduler queue
1089         for tid in list(self._scheduler.pending):
1090             self._scheduler.remove(tid)
1091
1092         # Remove pending tasks from the workers queue
1093         self._runner.empty()
1094
1095         self.release()
1096
1097         # Mark the EC state as TERMINATED
1098         self._state = ECState.TERMINATED
1099
1100         # Stop processing thread
1101         self._stop = True
1102
1103         # Notify condition to wake up the processing thread
1104         self._notify()
1105         
1106         if self._thread.is_alive():
1107            self._thread.join()
1108
1109     def schedule(self, date, callback, track = False):
1110         """ Schedules a callback to be executed at time 'date'.
1111
1112             :param date: string containing execution time for the task.
1113                     It can be expressed as an absolute time, using
1114                     timestamp format, or as a relative time matching
1115                     ^\d+.\d+(h|m|s|ms|us)$
1116
1117             :param callback: code to be executed for the task. Must be a
1118                         Python function, and receives args and kwargs
1119                         as arguments.
1120
1121             :param track: if set to True, the task will be retrievable with
1122                     the get_task() method
1123
1124             :return : The Id of the task
1125             :rtype: int
1126             
1127         """
1128         timestamp = stabsformat(date)
1129         task = Task(timestamp, callback)
1130         task = self._scheduler.schedule(task)
1131
1132         if track:
1133             self._tasks[task.id] = task
1134
1135         # Notify condition to wake up the processing thread
1136         self._notify()
1137
1138         return task.id
1139      
1140     def _process(self):
1141         """ Process scheduled tasks.
1142
1143         .. note::
1144         
1145         Tasks are scheduled by invoking the schedule method with a target 
1146         callback and an execution time. 
1147         The schedule method creates a new Task object with that callback 
1148         and execution time, and pushes it into the '_scheduler' queue. 
1149         The execution time and the order of arrival of tasks are used 
1150         to order the tasks in the queue.
1151
1152         The _process method is executed in an independent thread held by 
1153         the ExperimentController for as long as the experiment is running.
1154         This method takes tasks from the '_scheduler' queue in a loop 
1155         and processes them in parallel using multithreading. 
1156         The environmental variable NEPI_NTHREADS can be used to control
1157         the number of threads used to process tasks. The default value is 
1158         50.
1159
1160         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1161         This object keeps a pool of threads (workers), and a queue of tasks
1162         scheduled for 'immediate' execution. 
1163         
1164         On each iteration, the '_process' loop will take the next task that 
1165         is scheduled for 'future' execution from the '_scheduler' queue, 
1166         and if the execution time of that task is >= to the current time, 
1167         it will push that task into the PR for 'immediate execution'. 
1168         As soon as a worker is free, the PR will assign the next task to
1169         that worker.
1170
1171         Upon receiving a task to execute, each PR worker (thread) will 
1172         invoke the  _execute method of the EC, passing the task as 
1173         argument.         
1174         The _execute method will then invoke task.callback inside a 
1175         try/except block. If an exception is raised by the tasks.callback, 
1176         it will be trapped by the try block, logged to standard error 
1177         (usually the console), and the task will be marked as failed.
1178
1179         """
1180
1181         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1182         self._runner = ParallelRun(maxthreads = self.nthreads)
1183         self._runner.start()
1184
1185         while not self._stop:
1186             try:
1187                 self._cond.acquire()
1188
1189                 task = self._scheduler.next()
1190                 
1191                 if not task:
1192                     # No task to execute. Wait for a new task to be scheduled.
1193                     self._cond.wait()
1194                 else:
1195                     # The task timestamp is in the future. Wait for timeout 
1196                     # or until another task is scheduled.
1197                     now = tnow()
1198                     if now < task.timestamp:
1199                         # Calculate timeout in seconds
1200                         timeout = tdiffsec(task.timestamp, now)
1201
1202                         # Re-schedule task with the same timestamp
1203                         self._scheduler.schedule(task)
1204                         
1205                         task = None
1206
1207                         # Wait timeout or until a new task awakes the condition
1208                         self._cond.wait(timeout)
1209                
1210                 self._cond.release()
1211
1212                 if task:
1213                     # Process tasks in parallel
1214                     self._runner.put(self._execute, task)
1215             except: 
1216                 import traceback
1217                 err = traceback.format_exc()
1218                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1219
1220                 # Set the EC to FAILED state 
1221                 self._state = ECState.FAILED
1222             
1223                 # Set the FailureManager failure level to EC failure
1224                 self._fm.set_ec_failure()
1225
1226         self.logger.debug("Exiting the task processing loop ... ")
1227         
1228         self._runner.sync()
1229         self._runner.destroy()
1230
1231     def _execute(self, task):
1232         """ Executes a single task. 
1233
1234             :param task: Object containing the callback to execute
1235             :type task: Task
1236
1237         """
1238         try:
1239             # Invoke callback
1240             task.result = task.callback()
1241             task.status = TaskStatus.DONE
1242         except:
1243             import traceback
1244             err = traceback.format_exc()
1245             task.result = err
1246             task.status = TaskStatus.ERROR
1247             
1248             self.logger.error("Error occurred while executing task: %s" % err)
1249
1250     def _notify(self):
1251         """ Awakes the processing thread if it is blocked waiting 
1252         for new tasks to arrive
1253         
1254         """
1255         self._cond.acquire()
1256         self._cond.notify()
1257         self._cond.release()
1258
1259     def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
1260             **kwargs):
1261         """ Automates experiment description using a NetGraph instance.
1262         """
1263         self._netgraph = NetGraph(**kwargs)
1264
1265         if add_node_callback:
1266             ### Add resources to the EC
1267             for nid in self.netgraph.nodes():
1268                 add_node_callback(self, nid)
1269
1270         if add_edge_callback:
1271             #### Add connections between resources
1272             for nid1, nid2 in self.netgraph.edges():
1273                 add_edge_callback(self, nid1, nid2)
1274