small modification to th experiment runner
[nepi.git] / src / nepi / execution / runner.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.ec import ExperimentController, ECState
21
22 import math
23 import numpy
24 import os
25 import time
26
27 class ExperimentRunner(object):
28     """ The ExperimentRunner entity is reponsible of
29     re-running an experiment described by an ExperimentController 
30     multiple time.
31
32     """
33     def __init__(self):
34         super(ExperimentRunner, self).__init__()
35     
36     def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0, 
37             wait_guids = [], compute_metric_callback = None, 
38             evaluate_convergence_callback = None ):
39         """ Re-runs a same experiment multiple times
40
41         :param ec: Experiment description of experiment to run
42         :type ec: ExperimentController
43
44         :param min_runs: Minimum number of repetitions for experiment
45         :type min_runs: int
46
47         :param max_runs: Maximum number of repetitions for experiment
48         :type max_runs: int
49
50         :param wait_time: Time to wait in seconds between invoking
51             ec.deploy() and ec.release()
52         :type wait_time: float
53
54         :param wait_guids: List of guids to pass to ec.wait_finished
55             after invoking ec.deploy()
56         :type wait_guids: list 
57
58         :param compute_metric_callback: function to invoke after each 
59             experiment run, to compute an experiment metric. 
60             It will be invoked with the ec and the run count as arguments,
61             and it must return the metric value(s) computed for the run
62
63                 metric = compute_metric_callback(ec, run)
64             
65         :type compute_metric_callback: function 
66
67         :param evaluate_convergence_callback: function to evaluate whether the 
68             collected metric samples have converged and the experiment runner
69             can stop. It will be invoked with the ec, the run count and the
70             list of collected metric samples as argument, and it must return
71             either True or False
72
73                 stop = evaluate_convergence_callback(ec, run, metrics)
74
75             If stop is True, then the runner will exit.
76             
77         :type evaluate_convergence_callback: function 
78
79         """
80
81         if (not max_runs or max_runs < 0) and not compute_metric_callback:
82             msg = "Undefined STOP condition, set stop_callback or max_runs"
83             raise RuntimeError, msg
84
85         if compute_metric_callback and not evaluate_convergence_callback:
86             evaluate_convergence_callback = self.evaluate_normal_convergence
87             ec.logger.info(" Treating data as normal to evaluate convergence. "
88                     "Experiment will stop when the standard error with 95% "
89                     "confidence interval is >= 5% of the mean of the collected samples ")
90         
91         # Force persistence of experiment controller
92         ec._persist = True
93
94         filepath = ec.save(dirpath = ec.exp_dir)
95
96         samples = []
97         run = 0
98         stop = False
99
100         while not stop: 
101             run += 1
102
103             ec = self.run_experiment(filepath, wait_time, wait_guids)
104             
105             ec.logger.info(" RUN %d \n" % run)
106
107             if compute_metric_callback:
108                 metric = compute_metric_callback(ec, run)
109                 if metric is not None:
110                     samples.append(metric)
111
112                     if run >= min_runs and evaluate_convergence_callback:
113                         if evaluate_convergence_callback(ec, run, samples):
114                             stop = True
115
116             if run >= min_runs and max_runs > -1 and run >= max_runs :
117                 stop = True
118
119             del ec
120
121         return run
122
123     def evaluate_normal_convergence(self, ec, run, samples):
124         if len(samples) == 0:
125             msg = "0 samples collected"
126             raise RuntimeError, msg
127
128         x = numpy.array(samples)
129         n = len(samples)
130         std = x.std()
131         se = std / math.sqrt(n)
132         m = x.mean()
133
134         # confidence interval for 95% confidence level.
135         # Asuming samples are normally distributed
136         ci95 = se * 2
137         
138         ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f CI (95%%) %.2f \n" % (
139             run, n, m, std, ci95 ) )
140
141         return m * 0.05 >= ci95
142
143     def run_experiment(self, filepath, wait_time, wait_guids): 
144         ec = ExperimentController.load(filepath)
145
146         ec.deploy()
147     
148         ec.wait_finished(wait_guids)
149         time.sleep(wait_time)
150
151         ec.release()
152
153         if ec.state == ECState.FAILED:
154             raise RuntimeError, "Experiment failed"
155
156         return ec
157