Adding state RELEASED in EC to allow two stage termination from the experiment runner
[nepi.git] / src / nepi / execution / runner.py
index 60e75a4..21a20cb 100644 (file)
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.ec import ExperimentController
+from nepi.execution.ec import ExperimentController, ECState
 
 import math
 import numpy
 import os
-import tempfile
 import time
 
 class ExperimentRunner(object):
@@ -40,49 +39,42 @@ class ExperimentRunner(object):
         """ Re-runs a same experiment multiple times
 
         :param ec: Experiment description of experiment to run
-        :type name: ExperimentController
-        :rtype: EperimentController
+        :type ec: ExperimentController
 
         :param min_runs: Minimum number of repetitions for experiment
-        :type name: int
-        :rtype: int
+        :type min_runs: int
 
         :param max_runs: Maximum number of repetitions for experiment
-        :type name: int
-        :rtype: int
+        :type max_runs: int
 
         :param wait_time: Time to wait in seconds between invoking
             ec.deploy() and ec.release()
-        :type name: float
-        :rtype: float
+        :type wait_time: float
 
         :param wait_guids: List of guids to pass to ec.wait_finished
             after invoking ec.deploy()
-        :type name: list 
-        :rtype: list of int
+        :type wait_guids: list 
 
         :param compute_metric_callback: function to invoke after each 
             experiment run, to compute an experiment metric. 
             It will be invoked with the ec and the run count as arguments,
-            and it must return a numeric value for the computed metric:
+            and it must return the metric value(s) computed for the run
 
                 metric = compute_metric_callback(ec, run)
             
-        :type name: function 
-        :rtype: function
+        :type compute_metric_callback: function 
 
         :param evaluate_convergence_callback: function to evaluate whether the 
             collected metric samples have converged and the experiment runner
             can stop. It will be invoked with the ec, the run count and the
             list of collected metric samples as argument, and it must return
-            either True or False:
+            either True or False
 
                 stop = evaluate_convergence_callback(ec, run, metrics)
 
             If stop is True, then the runner will exit.
             
-        :type name: function 
-        :rtype: function
+        :type evaluate_convergence_callback: function 
 
         """
 
@@ -96,27 +88,22 @@ class ExperimentRunner(object):
                     "Experiment will stop when the standard error with 95% "
                     "confidence interval is >= 5% of the mean of the collected samples ")
         
-        # Set useRunId = True in Collectors to make sure results are
-        # independently stored.
-        collectors = ec.get_resources_by_type("Collector")
-        for collector in collectors:
-            collector.set("useRunId", True)
+        # Force persistence of experiment controller
+        ec._persist = True
 
-        dirpath = tempfile.mkdtemp()
-        filepath = ec.save(dirpath)
+        filepath = ec.save(dirpath = ec.exp_dir)
 
         samples = []
         run = 0
-        while True: 
+        stop = False
+
+        while not stop: 
             run += 1
 
             ec = self.run_experiment(filepath, wait_time, wait_guids)
             
             ec.logger.info(" RUN %d \n" % run)
 
-            if run >= min_runs and max_runs > -1 and run >= max_runs :
-                break
-
             if compute_metric_callback:
                 metric = compute_metric_callback(ec, run)
                 if metric is not None:
@@ -124,7 +111,12 @@ class ExperimentRunner(object):
 
                     if run >= min_runs and evaluate_convergence_callback:
                         if evaluate_convergence_callback(ec, run, samples):
-                            break
+                            stop = True
+
+            if run >= min_runs and max_runs > -1 and run >= max_runs :
+                stop = True
+
+            ec.shutdown()
             del ec
 
         return run
@@ -133,29 +125,34 @@ class ExperimentRunner(object):
         if len(samples) == 0:
             msg = "0 samples collected"
             raise RuntimeError, msg
-        
+
         x = numpy.array(samples)
         n = len(samples)
         std = x.std()
         se = std / math.sqrt(n)
         m = x.mean()
-        se95 = se * 2
+
+        # confidence interval for 95% confidence level.
+        # Asuming samples are normally distributed
+        ci95 = se * 2
         
-        ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f SE95%% %.2f \n" % (
-            run, n, m, std, se95 ) )
+        ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f CI (95%%) %.2f \n" % (
+            run, n, m, std, ci95 ) )
 
-        return m * 0.05 >= se95
+        return m * 0.05 >= ci95
 
     def run_experiment(self, filepath, wait_time, wait_guids): 
         ec = ExperimentController.load(filepath)
 
         ec.deploy()
-
+    
         ec.wait_finished(wait_guids)
         time.sleep(wait_time)
 
         ec.release()
 
-        return ec
+        if ec.state == ECState.FAILED:
+            raise RuntimeError, "Experiment failed"
 
+        return ec