applied the except and raise fixers to the master branch to close the gap with py3
[nepi.git] / src / nepi / execution / runner.py
index 6757d7b..dd2f7da 100644 (file)
@@ -3,9 +3,8 @@
 #    Copyright (C) 2013 INRIA
 #
 #    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
+#    it under the terms of the GNU General Public License version 2 as
+#    published by the Free Software Foundation;
 #
 #    This program is distributed in the hope that it will be useful,
 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.ec import ExperimentController
+from nepi.execution.ec import ExperimentController, ECState
 
 import math
 import numpy
 import os
-import tempfile
 import time
 
 class ExperimentRunner(object):
-    """ The ExperimentRunner entity is reponsible of
+    """ The ExperimentRunner entity is responsible of
     re-running an experiment described by an ExperimentController 
-    multiple time.
+    multiple time
 
     """
     def __init__(self):
@@ -37,58 +35,61 @@ class ExperimentRunner(object):
     def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0, 
             wait_guids = [], compute_metric_callback = None, 
             evaluate_convergence_callback = None ):
-        """ Re-runs a same experiment multiple times
-
-        :param ec: Experiment description of experiment to run
-        :type name: ExperimentController
-        :rtype: EperimentController
-
-        :param min_runs: Minimum number of repetitions for experiment
-        :type name: int
-        :rtype: int
-
-        :param max_runs: Maximum number of repetitions for experiment
-        :type name: int
-        :rtype: int
-
-        :param wait_time: Time to wait in seconds between invoking
-            ec.deploy() and ec.release()
-        :type name: float
-        :rtype: float
-
-        :param wait_guids: List of guids to pass to ec.wait_finished
-            after invoking ec.deploy()
-        :type name: list 
-        :rtype: list of int
-
-        :param compute_metric_callback: function to invoke after each 
-            experiment run, to compute an experiment metric. 
-            It will be invoked with the ec and the run count as arguments,
-            and it must return the value of the computed metric:
+        """ Run a same experiment independently multiple times, until the 
+        evaluate_convergence_callback function returns True
+
+        :param ec: Description of experiment to replicate. 
+            The runner takes care of deploying the EC, so ec.deploy()
+            must not be invoked directly before or after invoking
+            runner.run().
+        :type ec: ExperimentController
+
+        :param min_runs: Minimum number of times the experiment must be 
+            replicated
+        :type min_runs: int
+
+        :param max_runs: Maximum number of times the experiment can be 
+            replicated
+        :type max_runs: int
+
+        :param wait_time: Time to wait in seconds on each run between invoking
+            ec.deploy() and ec.release().
+        :type wait_time: float
+
+        :param wait_guids: List of guids wait for finalization on each run.
+            This list is passed to ec.wait_finished()
+        :type wait_guids: list 
+
+        :param compute_metric_callback: User defined function invoked after 
+            each experiment run to compute a metric. The metric is usually
+            a network measurement obtained from the data collected 
+            during experiment execution.
+            The function is invoked passing the ec and the run number as arguments. 
+            It must return the value for the computed metric(s) (usually a single 
+            numerical value, but it can be several).
 
                 metric = compute_metric_callback(ec, run)
             
-        :type name: function 
-        :rtype: function
+        :type compute_metric_callback: function 
 
-        :param evaluate_convergence_callback: function to evaluate whether the 
-            collected metric samples have converged and the experiment runner
-            can stop. It will be invoked with the ec, the run count and the
-            list of collected metric samples as argument, and it must return
-            either True or False:
+        :param evaluate_convergence_callback: User defined function invoked after
+            computing the metric on each run, to evaluate the experiment was
+            run enough times. It takes the list of cumulated metrics produced by 
+            the compute_metric_callback up to the current run, and decided 
+            whether the metrics have statistically converged to a meaningful value
+            or not. It must return either True or False. 
 
                 stop = evaluate_convergence_callback(ec, run, metrics)
 
             If stop is True, then the runner will exit.
             
-        :type name: function 
-        :rtype: function
+        :type evaluate_convergence_callback: function 
 
         """
 
         if (not max_runs or max_runs < 0) and not compute_metric_callback:
             msg = "Undefined STOP condition, set stop_callback or max_runs"
-            raise RuntimeError, msg
+            raise RuntimeError(msg)
 
         if compute_metric_callback and not evaluate_convergence_callback:
             evaluate_convergence_callback = self.evaluate_normal_convergence
@@ -96,65 +97,80 @@ class ExperimentRunner(object):
                     "Experiment will stop when the standard error with 95% "
                     "confidence interval is >= 5% of the mean of the collected samples ")
         
-        # Set useRunId = True in Collectors to make sure results are
-        # independently stored.
-        collectors = ec.get_resources_by_type("Collector")
-        for collector in collectors:
-            collector.set("useRunId", True)
+        # Force persistence of experiment controller
+        ec._persist = True
 
-        dirpath = tempfile.mkdtemp()
-        filepath = ec.save(dirpath)
+        filepath = ec.save(dirpath = ec.exp_dir)
 
         samples = []
         run = 0
-        while True: 
+        stop = False
+
+        while not stop: 
             run += 1
 
             ec = self.run_experiment(filepath, wait_time, wait_guids)
             
             ec.logger.info(" RUN %d \n" % run)
 
-            if run >= min_runs and max_runs > -1 and run >= max_runs :
-                break
-
             if compute_metric_callback:
                 metric = compute_metric_callback(ec, run)
-                samples.append(metric)
+                if metric is not None:
+                    samples.append(metric)
 
-            if run >= min_runs and evaluate_convergence_callback:
-                if evaluate_convergence_callback(ec, run, samples):
-                    break
+                    if run >= min_runs and evaluate_convergence_callback:
+                        if evaluate_convergence_callback(ec, run, samples):
+                            stop = True
+
+            if run >= min_runs and max_runs > -1 and run >= max_runs :
+                stop = True
+
+            ec.shutdown()
             del ec
 
         return run
 
-    def evaluate_normal_convergence(self, ec, run, samples):
-        if len(samples) == 0:
+    def evaluate_normal_convergence(self, ec, run, metrics):
+        """ Returns True when the confidence interval of the sample mean is
+        less than 5% of the mean value, for a 95% confidence level,
+        assuming normal distribution of the data
+        """
+
+        if len(metrics) == 0:
             msg = "0 samples collected"
-            raise RuntimeError, msg
+            raise RuntimeError(msg)
 
-        x = numpy.array(samples)
-        n = len(samples)
+        x = numpy.array(metrics)
+        n = len(metrics)
         std = x.std()
         se = std / math.sqrt(n)
         m = x.mean()
-        se95 = se * 2
+
+        # Confidence interval for 95% confidence level, 
+        # assuming normally distributed data.
+        ci95 = se * 2
         
-        ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f SE95%% %.2f \n" % (
-            run, n, m, std, se95 ) )
+        ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f CI (95%%) %.2f \n" % (
+            run, n, m, std, ci95 ) )
+
+        return m * 0.05 >= ci95
 
-        return m * 0.05 >= se95
+    def run_experiment(self, filepath, wait_time, wait_guids):
+        """ Run an experiment based on the description stored
+        in filepath.
 
-    def run_experiment(self, filepath, wait_time, wait_guids): 
+        """
         ec = ExperimentController.load(filepath)
 
         ec.deploy()
-
+    
         ec.wait_finished(wait_guids)
         time.sleep(wait_time)
 
         ec.release()
 
-        return ec
+        if ec.state == ECState.FAILED:
+            raise RuntimeError("Experiment failed")
 
+        return ec