applied the except and raise fixers to the master branch to close the gap with py3
[nepi.git] / src / nepi / execution / runner.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.ec import ExperimentController, ECState
20
21 import math
22 import numpy
23 import os
24 import time
25
26 class ExperimentRunner(object):
27     """ The ExperimentRunner entity is responsible of
28     re-running an experiment described by an ExperimentController 
29     multiple time
30
31     """
32     def __init__(self):
33         super(ExperimentRunner, self).__init__()
34     
35     def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0, 
36             wait_guids = [], compute_metric_callback = None, 
37             evaluate_convergence_callback = None ):
38         """ Run a same experiment independently multiple times, until the 
39         evaluate_convergence_callback function returns True
40
41         :param ec: Description of experiment to replicate. 
42             The runner takes care of deploying the EC, so ec.deploy()
43             must not be invoked directly before or after invoking
44             runner.run().
45         :type ec: ExperimentController
46
47         :param min_runs: Minimum number of times the experiment must be 
48             replicated
49         :type min_runs: int
50
51         :param max_runs: Maximum number of times the experiment can be 
52             replicated
53         :type max_runs: int
54
55         :param wait_time: Time to wait in seconds on each run between invoking
56             ec.deploy() and ec.release().
57         :type wait_time: float
58
59         :param wait_guids: List of guids wait for finalization on each run.
60             This list is passed to ec.wait_finished()
61         :type wait_guids: list 
62
63         :param compute_metric_callback: User defined function invoked after 
64             each experiment run to compute a metric. The metric is usually
65             a network measurement obtained from the data collected 
66             during experiment execution.
67             The function is invoked passing the ec and the run number as arguments. 
68             It must return the value for the computed metric(s) (usually a single 
69             numerical value, but it can be several).
70
71                 metric = compute_metric_callback(ec, run)
72             
73         :type compute_metric_callback: function 
74
75         :param evaluate_convergence_callback: User defined function invoked after
76             computing the metric on each run, to evaluate the experiment was
77             run enough times. It takes the list of cumulated metrics produced by 
78             the compute_metric_callback up to the current run, and decided 
79             whether the metrics have statistically converged to a meaningful value
80             or not. It must return either True or False. 
81
82                 stop = evaluate_convergence_callback(ec, run, metrics)
83
84             If stop is True, then the runner will exit.
85             
86         :type evaluate_convergence_callback: function 
87
88         """
89
90         if (not max_runs or max_runs < 0) and not compute_metric_callback:
91             msg = "Undefined STOP condition, set stop_callback or max_runs"
92             raise RuntimeError(msg)
93
94         if compute_metric_callback and not evaluate_convergence_callback:
95             evaluate_convergence_callback = self.evaluate_normal_convergence
96             ec.logger.info(" Treating data as normal to evaluate convergence. "
97                     "Experiment will stop when the standard error with 95% "
98                     "confidence interval is >= 5% of the mean of the collected samples ")
99         
100         # Force persistence of experiment controller
101         ec._persist = True
102
103         filepath = ec.save(dirpath = ec.exp_dir)
104
105         samples = []
106         run = 0
107         stop = False
108
109         while not stop: 
110             run += 1
111
112             ec = self.run_experiment(filepath, wait_time, wait_guids)
113             
114             ec.logger.info(" RUN %d \n" % run)
115
116             if compute_metric_callback:
117                 metric = compute_metric_callback(ec, run)
118                 if metric is not None:
119                     samples.append(metric)
120
121                     if run >= min_runs and evaluate_convergence_callback:
122                         if evaluate_convergence_callback(ec, run, samples):
123                             stop = True
124
125             if run >= min_runs and max_runs > -1 and run >= max_runs :
126                 stop = True
127
128             ec.shutdown()
129             del ec
130
131         return run
132
133     def evaluate_normal_convergence(self, ec, run, metrics):
134         """ Returns True when the confidence interval of the sample mean is
135         less than 5% of the mean value, for a 95% confidence level,
136         assuming normal distribution of the data
137         """
138
139         if len(metrics) == 0:
140             msg = "0 samples collected"
141             raise RuntimeError(msg)
142
143         x = numpy.array(metrics)
144         n = len(metrics)
145         std = x.std()
146         se = std / math.sqrt(n)
147         m = x.mean()
148
149         # Confidence interval for 95% confidence level, 
150         # assuming normally distributed data.
151         ci95 = se * 2
152         
153         ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f CI (95%%) %.2f \n" % (
154             run, n, m, std, ci95 ) )
155
156         return m * 0.05 >= ci95
157
158     def run_experiment(self, filepath, wait_time, wait_guids):
159         """ Run an experiment based on the description stored
160         in filepath.
161
162         """
163         ec = ExperimentController.load(filepath)
164
165         ec.deploy()
166     
167         ec.wait_finished(wait_guids)
168         time.sleep(wait_time)
169
170         ec.release()
171
172         if ec.state == ECState.FAILED:
173             raise RuntimeError("Experiment failed")
174
175         return ec
176