Adding the ExperimentRunner
[nepi.git] / src / nepi / execution / runner.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.ec import ExperimentController
21
22 import math
23 import numpy
24 import os
25 import tempfile
26 import time
27
28 class ExperimentRunner(object):
29     """ The ExperimentRunner entity is reponsible of
30     re-running an experiment described by an ExperimentController 
31     multiple time.
32
33     """
34     def __init__(self):
35         super(ExperimentRunner, self).__init__()
36     
37     def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0, 
38             wait_guids = [], compute_metric_callback = None, 
39             evaluate_convergence_callback = None ):
40         """ Re-runs a same experiment multiple times
41
42         :param ec: Experiment description of experiment to run
43         :type name: ExperimentController
44         :rtype: EperimentController
45
46         :param min_runs: Minimum number of repetitions for experiment
47         :type name: int
48         :rtype: int
49
50         :param max_runs: Maximum number of repetitions for experiment
51         :type name: int
52         :rtype: int
53
54         :param wait_time: Time to wait in seconds between invoking
55             ec.deploy() and ec.release()
56         :type name: float
57         :rtype: float
58
59         :param wait_guids: List of guids to pass to ec.wait_finished
60             after invoking ec.deploy()
61         :type name: list 
62         :rtype: list of int
63
64         :param compute_metric_callback: function to invoke after each 
65             experiment run, to compute an experiment metric. 
66             It will be invoked with the ec and the run count as arguments,
67             and it must return the value of the computed metric:
68
69                 metric = compute_metric_callback(ec, run)
70             
71         :type name: function 
72         :rtype: function
73
74         :param evaluate_convergence_callback: function to evaluate whether the 
75             collected metric samples have converged and the experiment runner
76             can stop. It will be invoked with the ec, the run count and the
77             list of collected metric samples as argument, and it must return
78             either True or False:
79
80                 stop = evaluate_convergence_callback(ec, run, metrics)
81
82             If stop is True, then the runner will exit.
83             
84         :type name: function 
85         :rtype: function
86
87         """
88
89         if (not max_runs or max_runs < 0) and not compute_metric_callback:
90             msg = "Undefined STOP condition, set stop_callback or max_runs"
91             raise RuntimeError, msg
92
93         if compute_metric_callback and not evaluate_convergence_callback:
94             evaluate_convergence_callback = self.evaluate_normal_convergence
95             ec.logger.info(" Treating data as normal to evaluate convergence. "
96                     "Experiment will stop when the standard error with 95% "
97                     "confidence interval is >= 5% of the mean of the collected samples ")
98         
99         # Set useRunId = True in Collectors to make sure results are
100         # independently stored.
101         collectors = ec.get_resources_by_type("Collector")
102         for collector in collectors:
103             collector.set("useRunId", True)
104
105         dirpath = tempfile.mkdtemp()
106         filepath = ec.save(dirpath)
107
108         samples = []
109         run = 0
110         while True: 
111             run += 1
112
113             ec = self.run_experiment(filepath, wait_time, wait_guids)
114             
115             ec.logger.info(" RUN %d \n" % run)
116
117             if run >= min_runs and max_runs > -1 and run >= max_runs :
118                 break
119
120             if compute_metric_callback:
121                 metric = compute_metric_callback(ec, run)
122                 samples.append(metric)
123
124             if run >= min_runs and evaluate_convergence_callback:
125                 if evaluate_convergence_callback(ec, run, samples):
126                     break
127             del ec
128
129         return run
130
131     def evaluate_normal_convergence(self, ec, run, samples):
132         if len(samples) == 0:
133             msg = "0 samples collected"
134             raise RuntimeError, msg
135
136         x = numpy.array(samples)
137         n = len(samples)
138         std = x.std()
139         se = std / math.sqrt(n)
140         m = x.mean()
141         se95 = se * 2
142         
143         ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f SE95%% %.2f \n" % (
144             run, n, m, std, se95 ) )
145
146         return m * 0.05 >= se95
147
148     def run_experiment(self, filepath, wait_time, wait_guids): 
149         ec = ExperimentController.load(filepath)
150
151         ec.deploy()
152
153         ec.wait_finished(wait_guids)
154         time.sleep(wait_time)
155
156         ec.release()
157
158         return ec
159
160