60e75a4ff8a5eecdf1208739d59f9500c58377be
[nepi.git] / src / nepi / execution / runner.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.ec import ExperimentController
21
22 import math
23 import numpy
24 import os
25 import tempfile
26 import time
27
28 class ExperimentRunner(object):
29     """ The ExperimentRunner entity is reponsible of
30     re-running an experiment described by an ExperimentController 
31     multiple time.
32
33     """
34     def __init__(self):
35         super(ExperimentRunner, self).__init__()
36     
37     def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0, 
38             wait_guids = [], compute_metric_callback = None, 
39             evaluate_convergence_callback = None ):
40         """ Re-runs a same experiment multiple times
41
42         :param ec: Experiment description of experiment to run
43         :type name: ExperimentController
44         :rtype: EperimentController
45
46         :param min_runs: Minimum number of repetitions for experiment
47         :type name: int
48         :rtype: int
49
50         :param max_runs: Maximum number of repetitions for experiment
51         :type name: int
52         :rtype: int
53
54         :param wait_time: Time to wait in seconds between invoking
55             ec.deploy() and ec.release()
56         :type name: float
57         :rtype: float
58
59         :param wait_guids: List of guids to pass to ec.wait_finished
60             after invoking ec.deploy()
61         :type name: list 
62         :rtype: list of int
63
64         :param compute_metric_callback: function to invoke after each 
65             experiment run, to compute an experiment metric. 
66             It will be invoked with the ec and the run count as arguments,
67             and it must return a numeric value for the computed metric:
68
69                 metric = compute_metric_callback(ec, run)
70             
71         :type name: function 
72         :rtype: function
73
74         :param evaluate_convergence_callback: function to evaluate whether the 
75             collected metric samples have converged and the experiment runner
76             can stop. It will be invoked with the ec, the run count and the
77             list of collected metric samples as argument, and it must return
78             either True or False:
79
80                 stop = evaluate_convergence_callback(ec, run, metrics)
81
82             If stop is True, then the runner will exit.
83             
84         :type name: function 
85         :rtype: function
86
87         """
88
89         if (not max_runs or max_runs < 0) and not compute_metric_callback:
90             msg = "Undefined STOP condition, set stop_callback or max_runs"
91             raise RuntimeError, msg
92
93         if compute_metric_callback and not evaluate_convergence_callback:
94             evaluate_convergence_callback = self.evaluate_normal_convergence
95             ec.logger.info(" Treating data as normal to evaluate convergence. "
96                     "Experiment will stop when the standard error with 95% "
97                     "confidence interval is >= 5% of the mean of the collected samples ")
98         
99         # Set useRunId = True in Collectors to make sure results are
100         # independently stored.
101         collectors = ec.get_resources_by_type("Collector")
102         for collector in collectors:
103             collector.set("useRunId", True)
104
105         dirpath = tempfile.mkdtemp()
106         filepath = ec.save(dirpath)
107
108         samples = []
109         run = 0
110         while True: 
111             run += 1
112
113             ec = self.run_experiment(filepath, wait_time, wait_guids)
114             
115             ec.logger.info(" RUN %d \n" % run)
116
117             if run >= min_runs and max_runs > -1 and run >= max_runs :
118                 break
119
120             if compute_metric_callback:
121                 metric = compute_metric_callback(ec, run)
122                 if metric is not None:
123                     samples.append(metric)
124
125                     if run >= min_runs and evaluate_convergence_callback:
126                         if evaluate_convergence_callback(ec, run, samples):
127                             break
128             del ec
129
130         return run
131
132     def evaluate_normal_convergence(self, ec, run, samples):
133         if len(samples) == 0:
134             msg = "0 samples collected"
135             raise RuntimeError, msg
136         
137         x = numpy.array(samples)
138         n = len(samples)
139         std = x.std()
140         se = std / math.sqrt(n)
141         m = x.mean()
142         se95 = se * 2
143         
144         ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f SE95%% %.2f \n" % (
145             run, n, m, std, se95 ) )
146
147         return m * 0.05 >= se95
148
149     def run_experiment(self, filepath, wait_time, wait_guids): 
150         ec = ExperimentController.load(filepath)
151
152         ec.deploy()
153
154         ec.wait_finished(wait_guids)
155         time.sleep(wait_time)
156
157         ec.release()
158
159         return ec
160
161