52b4bde22395e3c2e0879e029ac28dff74e5ad8c
[nepi.git] / src / nepi / execution / runner.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.ec import ExperimentController, ECState
21
22 import math
23 import numpy
24 import os
25 import tempfile
26 import time
27
28 class ExperimentRunner(object):
29     """ The ExperimentRunner entity is reponsible of
30     re-running an experiment described by an ExperimentController 
31     multiple time.
32
33     """
34     def __init__(self):
35         super(ExperimentRunner, self).__init__()
36     
37     def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0, 
38             wait_guids = [], compute_metric_callback = None, 
39             evaluate_convergence_callback = None ):
40         """ Re-runs a same experiment multiple times
41
42         :param ec: Experiment description of experiment to run
43         :type ec: ExperimentController
44
45         :param min_runs: Minimum number of repetitions for experiment
46         :type min_runs: int
47
48         :param max_runs: Maximum number of repetitions for experiment
49         :type max_runs: int
50
51         :param wait_time: Time to wait in seconds between invoking
52             ec.deploy() and ec.release()
53         :type wait_time: float
54
55         :param wait_guids: List of guids to pass to ec.wait_finished
56             after invoking ec.deploy()
57         :type wait_guids: list 
58
59         :param compute_metric_callback: function to invoke after each 
60             experiment run, to compute an experiment metric. 
61             It will be invoked with the ec and the run count as arguments,
62             and it must return a numeric value for the computed metric:
63
64                 metric = compute_metric_callback(ec, run)
65             
66         :type compute_metric_callback: function 
67
68         :param evaluate_convergence_callback: function to evaluate whether the 
69             collected metric samples have converged and the experiment runner
70             can stop. It will be invoked with the ec, the run count and the
71             list of collected metric samples as argument, and it must return
72             either True or False:
73
74                 stop = evaluate_convergence_callback(ec, run, metrics)
75
76             If stop is True, then the runner will exit.
77             
78         :type evaluate_convergence_callback: function 
79
80         """
81
82         if (not max_runs or max_runs < 0) and not compute_metric_callback:
83             msg = "Undefined STOP condition, set stop_callback or max_runs"
84             raise RuntimeError, msg
85
86         if compute_metric_callback and not evaluate_convergence_callback:
87             evaluate_convergence_callback = self.evaluate_normal_convergence
88             ec.logger.info(" Treating data as normal to evaluate convergence. "
89                     "Experiment will stop when the standard error with 95% "
90                     "confidence interval is >= 5% of the mean of the collected samples ")
91         
92         # Force persistence of experiment controller
93         ec._persist = True
94
95         dirpath = tempfile.mkdtemp()
96         filepath = ec.save(dirpath)
97
98         samples = []
99         run = 0
100         while True: 
101             run += 1
102
103             ec = self.run_experiment(filepath, wait_time, wait_guids)
104             
105             ec.logger.info(" RUN %d \n" % run)
106
107             if run >= min_runs and max_runs > -1 and run >= max_runs :
108                 break
109
110             if compute_metric_callback:
111                 metric = compute_metric_callback(ec, run)
112                 if metric is not None:
113                     samples.append(metric)
114
115                     if run >= min_runs and evaluate_convergence_callback:
116                         if evaluate_convergence_callback(ec, run, samples):
117                             break
118             del ec
119
120         return run
121
122     def evaluate_normal_convergence(self, ec, run, samples):
123         if len(samples) == 0:
124             msg = "0 samples collected"
125             raise RuntimeError, msg
126         
127         x = numpy.array(samples)
128         n = len(samples)
129         std = x.std()
130         se = std / math.sqrt(n)
131         m = x.mean()
132         se95 = se * 2
133         
134         ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f SE95%% %.2f \n" % (
135             run, n, m, std, se95 ) )
136
137         return m * 0.05 >= se95
138
139     def run_experiment(self, filepath, wait_time, wait_guids): 
140         ec = ExperimentController.load(filepath)
141
142         ec.deploy()
143     
144         ec.wait_finished(wait_guids)
145         time.sleep(wait_time)
146
147         ec.release()
148
149         if ec.state == ECState.FAILED:
150             raise RuntimeError, "Experiment failed"
151
152         return ec
153
154