X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Frunner.py;h=dd2f7dabb01faf6b2f09f76842b50c074afa4ce7;hb=6285ca51026efb69642eea9dfc7c480e722d84a9;hp=6757d7b5a6827578f78d6a784863870617901365;hpb=6c439bea6cf4d6af7512fc746dca75118bf39d39;p=nepi.git

diff --git a/src/nepi/execution/runner.py b/src/nepi/execution/runner.py
index 6757d7b5..dd2f7dab 100644
--- a/src/nepi/execution/runner.py
+++ b/src/nepi/execution/runner.py
@@ -3,9 +3,8 @@
 # Copyright (C) 2013 INRIA
 #
 # This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation;
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -17,18 +16,17 @@
 #
 # Author: Alina Quereilhac
 
-from nepi.execution.ec import ExperimentController
+from nepi.execution.ec import ExperimentController, ECState
 
 import math
 import numpy
 import os
-import tempfile
 import time
 
 class ExperimentRunner(object):
-    """ The ExperimentRunner entity is reponsible of
+    """ The ExperimentRunner entity is responsible for
     re-running an experiment described by an ExperimentController
-    multiple time.
+    multiple times.
 
     """
 
     def __init__(self):
@@ -37,58 +35,61 @@ class ExperimentRunner(object):
     def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0,
             wait_guids = [], compute_metric_callback = None,
             evaluate_convergence_callback = None ):
-        """ Re-runs a same experiment multiple times
-
-        :param ec: Experiment description of experiment to run
-        :type name: ExperimentController
-        :rtype: EperimentController
-
-        :param min_runs: Minimum number of repetitions for experiment
-        :type name: int
-        :rtype: int
-
-        :param max_runs: Maximum number of repetitions for experiment
-        :type name: int
-        :rtype: int
-
-        :param wait_time: Time to wait in seconds between invoking
-            ec.deploy() and ec.release()
-        :type name: float
-        :rtype: float
-
-        :param wait_guids: List of guids to pass to ec.wait_finished
-            after invoking ec.deploy()
-        :type name: list
-        :rtype: list of int
-
-        :param compute_metric_callback: function to invoke after each
-            experiment run, to compute an experiment metric.
-            It will be invoked with the ec and the run count as arguments,
-            and it must return the value of the computed metric:
+        """ Run the same experiment independently multiple times, until the
+        evaluate_convergence_callback function returns True
+
+        :param ec: Description of the experiment to replicate.
+            The runner takes care of deploying the EC, so ec.deploy()
+            must not be invoked directly before or after invoking
+            runner.run().
+        :type ec: ExperimentController
+
+        :param min_runs: Minimum number of times the experiment must be
+            replicated
+        :type min_runs: int
+
+        :param max_runs: Maximum number of times the experiment can be
+            replicated
+        :type max_runs: int
+
+        :param wait_time: Time to wait in seconds on each run between invoking
+            ec.deploy() and ec.release().
+        :type wait_time: float
+
+        :param wait_guids: List of guids to wait for finalization on each run.
+            This list is passed to ec.wait_finished()
+        :type wait_guids: list
+
+        :param compute_metric_callback: User-defined function invoked after
+            each experiment run to compute a metric.
+            The metric is usually
+            a network measurement obtained from the data collected
+            during experiment execution.
+            The function is invoked with the ec and the run number as arguments.
+            It must return the value of the computed metric(s) (usually a single
+            numerical value, but it can be several).
 
                 metric = compute_metric_callback(ec, run)
 
-        :type name: function
-        :rtype: function
+        :type compute_metric_callback: function
 
-        :param evaluate_convergence_callback: function to evaluate whether the
-            collected metric samples have converged and the experiment runner
-            can stop. It will be invoked with the ec, the run count and the
-            list of collected metric samples as argument, and it must return
-            either True or False:
+        :param evaluate_convergence_callback: User-defined function invoked after
+            computing the metric on each run, to evaluate whether the experiment
+            has been run enough times. It takes the list of accumulated metrics
+            produced by compute_metric_callback up to the current run, and decides
+            whether the metrics have statistically converged to a meaningful value
+            or not. It must return either True or False.
 
                 stop = evaluate_convergence_callback(ec, run, metrics)
 
             If stop is True, then the runner will exit.
 
-        :type name: function
-        :rtype: function
+        :type evaluate_convergence_callback: function
 
         """
 
         if (not max_runs or max_runs < 0) and not compute_metric_callback:
             msg = "Undefined STOP condition, set stop_callback or max_runs"
-            raise RuntimeError, msg
+            raise RuntimeError(msg)
 
         if compute_metric_callback and not evaluate_convergence_callback:
             evaluate_convergence_callback = self.evaluate_normal_convergence
@@ -96,65 +97,80 @@ class ExperimentRunner(object):
             "Experiment will stop when the standard error with 95% "
             "confidence interval is >= 5% of the mean of the collected samples ")
 
-        # Set useRunId = True in Collectors to make sure results are
-        # independently stored.
- collectors = ec.get_resources_by_type("Collector") - for collector in collectors: - collector.set("useRunId", True) + # Force persistence of experiment controller + ec._persist = True - dirpath = tempfile.mkdtemp() - filepath = ec.save(dirpath) + filepath = ec.save(dirpath = ec.exp_dir) samples = [] run = 0 - while True: + stop = False + + while not stop: run += 1 ec = self.run_experiment(filepath, wait_time, wait_guids) ec.logger.info(" RUN %d \n" % run) - if run >= min_runs and max_runs > -1 and run >= max_runs : - break - if compute_metric_callback: metric = compute_metric_callback(ec, run) - samples.append(metric) + if metric is not None: + samples.append(metric) - if run >= min_runs and evaluate_convergence_callback: - if evaluate_convergence_callback(ec, run, samples): - break + if run >= min_runs and evaluate_convergence_callback: + if evaluate_convergence_callback(ec, run, samples): + stop = True + + if run >= min_runs and max_runs > -1 and run >= max_runs : + stop = True + + ec.shutdown() del ec return run - def evaluate_normal_convergence(self, ec, run, samples): - if len(samples) == 0: + def evaluate_normal_convergence(self, ec, run, metrics): + """ Returns True when the confidence interval of the sample mean is + less than 5% of the mean value, for a 95% confidence level, + assuming normal distribution of the data + """ + + if len(metrics) == 0: msg = "0 samples collected" - raise RuntimeError, msg + raise RuntimeError(msg) - x = numpy.array(samples) - n = len(samples) + x = numpy.array(metrics) + n = len(metrics) std = x.std() se = std / math.sqrt(n) m = x.mean() - se95 = se * 2 + + # Confidence interval for 95% confidence level, + # assuming normally distributed data. + ci95 = se * 2 - ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f SE95%% %.2f \n" % ( - run, n, m, std, se95 ) ) + ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f CI (95%%) %.2f \n" % ( + run, n, m, std, ci95 ) ) + + return m * 0.05 >= ci95 - return m * 0.05 >= se95 + def run_experiment(self, filepath, wait_time, wait_guids): + """ Run an experiment based on the description stored + in filepath. - def run_experiment(self, filepath, wait_time, wait_guids): + """ ec = ExperimentController.load(filepath) ec.deploy() - + ec.wait_finished(wait_guids) time.sleep(wait_time) ec.release() - return ec + if ec.state == ECState.FAILED: + raise RuntimeError("Experiment failed") + return ec
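Taken together, the new run() docstring and loop describe a callback-driven replication workflow: the runner persists the EC, reloads and deploys it on every run, collects one metric sample per run, and stops once the convergence callback or the run limits say so. The sketch below illustrates how the updated API might be driven from user code; it assumes only the ExperimentRunner.run() signature shown in the diff above, while the experiment setup, the exp_id value, and the constant stand-in metric are hypothetical placeholders.

from nepi.execution.ec import ExperimentController
from nepi.execution.runner import ExperimentRunner

# Describe the experiment once; the runner deploys and releases it on every
# run, so ec.deploy() is never called directly by the user.
ec = ExperimentController(exp_id = "runner-demo")
# ... register resources (nodes, applications, collectors) on ec here ...

def compute_metric_callback(ec, run):
    # Hypothetical metric: a real callback would parse the traces collected
    # during this run (persisted under ec.exp_dir). Returning None makes the
    # runner discard the sample for this run.
    return 42.0

def evaluate_convergence_callback(ec, run, metrics):
    # Simplistic stop rule: at least 10 samples collected and the last sample
    # within 5% of the running mean. Omit this argument to fall back to the
    # default evaluate_normal_convergence criterion.
    mean = sum(metrics) / len(metrics)
    return len(metrics) >= 10 and abs(metrics[-1] - mean) <= 0.05 * mean

runner = ExperimentRunner()
runs = runner.run(ec, min_runs = 5, max_runs = 50, wait_time = 30,
        wait_guids = [],
        compute_metric_callback = compute_metric_callback,
        evaluate_convergence_callback = evaluate_convergence_callback)

With compute_metric_callback set and evaluate_convergence_callback omitted, the runner falls back to evaluate_normal_convergence, which stops once twice the standard error of the mean (the 95% confidence interval assumed in the diff) drops to 5% of the sample mean or less.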