#
# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
# Author: Alina Quereilhac
from nepi.execution.ec import ExperimentController
import math
import numpy
import os
import tempfile
import time
class ExperimentRunner(object):
""" The ExperimentRunner entity is reponsible of
re-running an experiment described by an ExperimentController
multiple time.
"""
def __init__(self):
super(ExperimentRunner, self).__init__()
def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0,
wait_guids = [], compute_metric_callback = None,
evaluate_convergence_callback = None ):
""" Re-runs a same experiment multiple times
:param ec: Experiment description of experiment to run
:type name: ExperimentController
:rtype: EperimentController
:param min_runs: Minimum number of repetitions for experiment
:type name: int
:rtype: int
:param max_runs: Maximum number of repetitions for experiment
:type name: int
:rtype: int
:param wait_time: Time to wait in seconds between invoking
ec.deploy() and ec.release()
:type name: float
:rtype: float
:param wait_guids: List of guids to pass to ec.wait_finished
after invoking ec.deploy()
:type name: list
:rtype: list of int
:param compute_metric_callback: function to invoke after each
experiment run, to compute an experiment metric.
It will be invoked with the ec and the run count as arguments,
and it must return a numeric value for the computed metric:
metric = compute_metric_callback(ec, run)
:type name: function
:rtype: function
:param evaluate_convergence_callback: function to evaluate whether the
collected metric samples have converged and the experiment runner
can stop. It will be invoked with the ec, the run count and the
list of collected metric samples as argument, and it must return
either True or False:
stop = evaluate_convergence_callback(ec, run, metrics)
If stop is True, then the runner will exit.
:type name: function
:rtype: function
"""
if (not max_runs or max_runs < 0) and not compute_metric_callback:
msg = "Undefined STOP condition, set stop_callback or max_runs"
raise RuntimeError, msg
if compute_metric_callback and not evaluate_convergence_callback:
evaluate_convergence_callback = self.evaluate_normal_convergence
ec.logger.info(" Treating data as normal to evaluate convergence. "
"Experiment will stop when the standard error with 95% "
"confidence interval is >= 5% of the mean of the collected samples ")
# Set useRunId = True in Collectors to make sure results are
# independently stored.
collectors = ec.get_resources_by_type("Collector")
for collector in collectors:
collector.set("useRunId", True)
dirpath = tempfile.mkdtemp()
filepath = ec.save(dirpath)
samples = []
run = 0
while True:
run += 1
ec = self.run_experiment(filepath, wait_time, wait_guids)
ec.logger.info(" RUN %d \n" % run)
if run >= min_runs and max_runs > -1 and run >= max_runs :
break
if compute_metric_callback:
metric = compute_metric_callback(ec, run)
if metric is not None:
samples.append(metric)
if run >= min_runs and evaluate_convergence_callback:
if evaluate_convergence_callback(ec, run, samples):
break
del ec
return run
def evaluate_normal_convergence(self, ec, run, samples):
if len(samples) == 0:
msg = "0 samples collected"
raise RuntimeError, msg
x = numpy.array(samples)
n = len(samples)
std = x.std()
se = std / math.sqrt(n)
m = x.mean()
se95 = se * 2
ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f SE95%% %.2f \n" % (
run, n, m, std, se95 ) )
return m * 0.05 >= se95
def run_experiment(self, filepath, wait_time, wait_guids):
ec = ExperimentController.load(filepath)
ec.deploy()
ec.wait_finished(wait_guids)
time.sleep(wait_time)
ec.release()
return ec