#
# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation;
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
# Author: Alina Quereilhac
from nepi.execution.ec import ExperimentController, ECState
import math
import numpy
import os
import time
class ExperimentRunner(object):
""" The ExperimentRunner entity is responsible of
re-running an experiment described by an ExperimentController
multiple time
"""
def __init__(self):
super(ExperimentRunner, self).__init__()
def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0,
wait_guids = [], compute_metric_callback = None,
evaluate_convergence_callback = None ):
""" Run a same experiment independently multiple times, until the
evaluate_convergence_callback function returns True
:param ec: Description of experiment to replicate.
The runner takes care of deploying the EC, so ec.deploy()
must not be invoked directly before or after invoking
runner.run().
:type ec: ExperimentController
:param min_runs: Minimum number of times the experiment must be
replicated
:type min_runs: int
:param max_runs: Maximum number of times the experiment can be
replicated
:type max_runs: int
:param wait_time: Time to wait in seconds on each run between invoking
ec.deploy() and ec.release().
:type wait_time: float
:param wait_guids: List of guids wait for finalization on each run.
This list is passed to ec.wait_finished()
:type wait_guids: list
:param compute_metric_callback: User defined function invoked after
each experiment run to compute a metric. The metric is usually
a network measurement obtained from the data collected
during experiment execution.
The function is invoked passing the ec and the run number as arguments.
It must return the value for the computed metric(s) (usually a single
numerical value, but it can be several).
metric = compute_metric_callback(ec, run)
:type compute_metric_callback: function
:param evaluate_convergence_callback: User defined function invoked after
computing the metric on each run, to evaluate the experiment was
run enough times. It takes the list of cumulated metrics produced by
the compute_metric_callback up to the current run, and decided
whether the metrics have statistically converged to a meaningful value
or not. It must return either True or False.
stop = evaluate_convergence_callback(ec, run, metrics)
If stop is True, then the runner will exit.
:type evaluate_convergence_callback: function
"""
if (not max_runs or max_runs < 0) and not compute_metric_callback:
msg = "Undefined STOP condition, set stop_callback or max_runs"
raise RuntimeError(msg)
if compute_metric_callback and not evaluate_convergence_callback:
evaluate_convergence_callback = self.evaluate_normal_convergence
ec.logger.info(" Treating data as normal to evaluate convergence. "
"Experiment will stop when the standard error with 95% "
"confidence interval is >= 5% of the mean of the collected samples ")
# Force persistence of experiment controller
ec._persist = True
filepath = ec.save(dirpath = ec.exp_dir)
samples = []
run = 0
stop = False
while not stop:
run += 1
ec = self.run_experiment(filepath, wait_time, wait_guids)
ec.logger.info(" RUN %d \n" % run)
if compute_metric_callback:
metric = compute_metric_callback(ec, run)
if metric is not None:
samples.append(metric)
if run >= min_runs and evaluate_convergence_callback:
if evaluate_convergence_callback(ec, run, samples):
stop = True
if run >= min_runs and max_runs > -1 and run >= max_runs :
stop = True
ec.shutdown()
del ec
return run
def evaluate_normal_convergence(self, ec, run, metrics):
""" Returns True when the confidence interval of the sample mean is
less than 5% of the mean value, for a 95% confidence level,
assuming normal distribution of the data
"""
if len(metrics) == 0:
msg = "0 samples collected"
raise RuntimeError(msg)
x = numpy.array(metrics)
n = len(metrics)
std = x.std()
se = std / math.sqrt(n)
m = x.mean()
# Confidence interval for 95% confidence level,
# assuming normally distributed data.
ci95 = se * 2
ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f CI (95%%) %.2f \n" % (
run, n, m, std, ci95 ) )
return m * 0.05 >= ci95
def run_experiment(self, filepath, wait_time, wait_guids):
""" Run an experiment based on the description stored
in filepath.
"""
ec = ExperimentController.load(filepath)
ec.deploy()
ec.wait_finished(wait_guids)
time.sleep(wait_time)
ec.release()
if ec.state == ECState.FAILED:
raise RuntimeError("Experiment failed")
return ec