Adding the ExperimentRunner
author Alina Quereilhac <alina.quereilhac@inria.fr>
Sun, 3 Aug 2014 14:56:31 +0000 (16:56 +0200)
committer Alina Quereilhac <alina.quereilhac@inria.fr>
Sun, 3 Aug 2014 14:56:31 +0000 (16:56 +0200)
src/nepi/execution/runner.py [new file with mode: 0644]
src/nepi/resources/all/collector.py
test/execution/runner.py [new file with mode: 0755]

diff --git a/src/nepi/execution/runner.py b/src/nepi/execution/runner.py
new file mode 100644 (file)
index 0000000..6757d7b
--- /dev/null
@@ -0,0 +1,160 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.ec import ExperimentController
+
+import math
+import numpy
+import shutil
+import tempfile
+import time
+
+class ExperimentRunner(object):
+    """ The ExperimentRunner entity is reponsible of
+    re-running an experiment described by an ExperimentController 
+    multiple time.
+
+    """
+    def __init__(self):
+        super(ExperimentRunner, self).__init__()
+    
+    def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0, 
+            wait_guids = None, compute_metric_callback = None, 
+            evaluate_convergence_callback = None):
+        """ Re-runs the same experiment multiple times.
+
+        :param ec: Description of the experiment to run
+        :type ec: ExperimentController
+
+        :param min_runs: Minimum number of repetitions of the experiment
+        :type min_runs: int
+
+        :param max_runs: Maximum number of repetitions of the experiment.
+            A negative value means no maximum.
+        :type max_runs: int
+
+        :param wait_time: Time to wait in seconds between invoking
+            ec.deploy() and ec.release()
+        :type wait_time: float
+
+        :param wait_guids: List of guids to pass to ec.wait_finished
+            after invoking ec.deploy()
+        :type wait_guids: list of int
+
+        :param compute_metric_callback: Function invoked after each
+            experiment run to compute an experiment metric.
+            It is invoked with the ec and the run count as arguments,
+            and it must return the value of the computed metric:
+
+                metric = compute_metric_callback(ec, run)
+
+        :type compute_metric_callback: function
+
+        :param evaluate_convergence_callback: Function that evaluates
+            whether the collected metric samples have converged and the
+            runner can stop. It is invoked with the ec, the run count and
+            the list of collected metric samples as arguments, and it must
+            return either True or False:
+
+                stop = evaluate_convergence_callback(ec, run, metrics)
+
+            If stop is True, the runner will exit.
+
+        :type evaluate_convergence_callback: function
+
+        :returns: The number of runs performed
+        :rtype: int
+
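+        Example (a minimal usage sketch; ``parse_delay`` and ``apps``
+        are hypothetical placeholders, not part of this module)::
+
+            def delay_metric(ec, run):
+                # e.g. extract a mean delay value from a collected trace
+                return parse_delay(ec, run)
+
+            runner = ExperimentRunner()
+            runs = runner.run(ec, min_runs = 5, max_runs = 20,
+                    wait_guids = apps,
+                    compute_metric_callback = delay_metric)
+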
+        """
+
+        if wait_guids is None:
+            wait_guids = []
+
+        if (not max_runs or max_runs < 0) and not compute_metric_callback:
+            msg = "Undefined STOP condition, set max_runs or " \
+                    "compute_metric_callback"
+            raise RuntimeError(msg)
+
+        if compute_metric_callback and not evaluate_convergence_callback:
+            evaluate_convergence_callback = self.evaluate_normal_convergence
+            ec.logger.info(" Treating data as normal to evaluate convergence. "
+                    "Experiment will stop when the standard error with 95% "
+                    "confidence interval falls below 5% of the mean of the "
+                    "collected samples ")
+        
+        # Set useRunId = True in Collectors to make sure results are
+        # independently stored.
+        collectors = ec.get_resources_by_type("Collector")
+        for collector in collectors:
+            collector.set("useRunId", True)
+
+        dirpath = tempfile.mkdtemp()
+        filepath = ec.save(dirpath)
+
+        samples = []
+        run = 0
+        while True:
+            run += 1
+
+            ec = self.run_experiment(filepath, wait_time, wait_guids)
+
+            ec.logger.info(" RUN %d \n" % run)
+
+            # Compute the metric before evaluating the stop conditions,
+            # so the sample from the last run is also recorded
+            if compute_metric_callback:
+                metric = compute_metric_callback(ec, run)
+                samples.append(metric)
+
+            if run >= min_runs and evaluate_convergence_callback:
+                if evaluate_convergence_callback(ec, run, samples):
+                    break
+
+            if run >= min_runs and max_runs > -1 and run >= max_runs:
+                break
+
+            del ec
+
+        # Clean up the temporary directory used to persist the
+        # experiment description between runs
+        shutil.rmtree(dirpath)
+
+        return run
+
+    def evaluate_normal_convergence(self, ec, run, samples):
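+        """ Stops the runner when twice the standard error of the mean
+        (an approximation of the 95% confidence half-interval) falls
+        below 5% of the sample mean.
+
+        For example, samples = [10, 10, 12, 10, 10] give
+        mean = 10.4 and se95 = 2 * 0.8 / sqrt(5) ~ 0.72, which is
+        above 0.05 * 10.4 = 0.52, so the runner keeps going.
+
+        """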
+        if not samples:
+            msg = "0 samples collected"
+            raise RuntimeError(msg)
+
+        x = numpy.array(samples)
+        n = len(samples)
+        std = x.std()
+        # Standard error of the mean
+        se = std / math.sqrt(n)
+        m = x.mean()
+        # Approximate 95% confidence half-interval as 2 * SEM
+        se95 = se * 2
+
+        ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f SE95%% %.2f \n" % (
+            run, n, m, std, se95))
+
+        # Converged when the 95% interval is within 5% of the mean
+        return m * 0.05 >= se95
+
+    def run_experiment(self, filepath, wait_time, wait_guids): 
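+        """ Loads the experiment description from filepath into a new
+        ExperimentController, deploys it, waits for the RMs in
+        wait_guids to finish plus wait_time seconds, and releases it.
+
+        """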
+        ec = ExperimentController.load(filepath)
+
+        ec.deploy()
+
+        ec.wait_finished(wait_guids)
+        time.sleep(wait_time)
+
+        ec.release()
+
+        return ec
+
+
diff --git a/src/nepi/resources/all/collector.py b/src/nepi/resources/all/collector.py
index bb8c1c8..af0811c 100644 (file)
@@ -28,7 +28,7 @@ import tempfile
 
 @clsinit_copy
 class Collector(ResourceManager):
-    """ The collector is reponsible of collecting traces
+    """ The collector entity is reponsible of collecting traces
     of a same type associated to RMs into a local directory.
 
     .. class:: Class Args :
diff --git a/test/execution/runner.py b/test/execution/runner.py
new file mode 100755 (executable)
index 0000000..b06d383
--- /dev/null
@@ -0,0 +1,175 @@
+#!/usr/bin/env python
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.ec import ExperimentController
+from nepi.execution.resource import ResourceManager, ResourceState, \
+        clsinit_copy, ResourceAction, ResourceFactory
+from nepi.execution.runner import ExperimentRunner
+
+import functools
+import os
+import shutil
+import tempfile
+import time
+import unittest
+
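+# Timing knobs for the dummy resources below (kept at 0 so the tests run fast)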
+reschedule_delay = "0.5s"
+deploy_time = 0
+run_time = 0
+
+class Link(ResourceManager):
+    _rtype = "dummy::Link"
+    def do_deploy(self):
+        time.sleep(deploy_time)
+        super(Link, self).do_deploy()
+        self.logger.debug(" -------- DEPLOYED ------- ")
+
+class Interface(ResourceManager):
+    _rtype = "dummy::Interface"
+
+    def do_deploy(self):
+        node = self.get_connected(Node.get_rtype())[0]
+        link = self.get_connected(Link.get_rtype())[0]
+
+        if node.state < ResourceState.READY or \
+                link.state < ResourceState.READY:
+            self.ec.schedule(reschedule_delay, self.deploy)
+            self.logger.debug(" -------- RESCHEDULING ------- ")
+        else:
+            time.sleep(deploy_time)
+            super(Interface, self).do_deploy()
+            self.logger.debug(" -------- DEPLOYED ------- ")
+
+class Node(ResourceManager):
+    _rtype = "dummy::Node"
+
+    def do_deploy(self):
+        self.logger.debug(" -------- DO_DEPLOY ------- ")
+        time.sleep(deploy_time)
+        super(Node, self).do_deploy()
+        self.logger.debug(" -------- DEPLOYED ------- ")
+
+class Application(ResourceManager):
+    _rtype = "dummy::Application"
+
+    def do_deploy(self):
+        node = self.get_connected(Node.get_rtype())[0]
+
+        if node.state < ResourceState.READY: 
+            self.ec.schedule(reschedule_delay, self.deploy)
+            self.logger.debug(" -------- RESCHEDULING ------- ")
+        else:
+            time.sleep(deploy_time)
+            super(Application, self).do_deploy()
+            self.logger.debug(" -------- DEPLOYED ------- ")
+
+    def do_start(self):
+        super(Application, self).do_start()
+        time.sleep(run_time)
+        self.ec.schedule("0s", self.stop)
+
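+# Register the dummy resource types so the EC can instantiate them by name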
+ResourceFactory.register_type(Application)
+ResourceFactory.register_type(Node)
+ResourceFactory.register_type(Interface)
+ResourceFactory.register_type(Link)
+
+class RunnerTestCase(unittest.TestCase):
+    def test_runner_max_runs(self):
+        node_count = 4
+        app_count = 2
+
+        ec = ExperimentController(exp_id = "max-runs-test")
+       
+        # Add simulated nodes and applications
+        nodes = list()
+        apps = list()
+        ifaces = list()
+
+        for i in xrange(node_count):
+            node = ec.register_resource("dummy::Node")
+            nodes.append(node)
+            
+            iface = ec.register_resource("dummy::Interface")
+            ec.register_connection(node, iface)
+            ifaces.append(iface)
+
+            for j in xrange(app_count):
+                app = ec.register_resource("dummy::Application")
+                ec.register_connection(node, app)
+                apps.append(app)
+
+        link = ec.register_resource("dummy::Link")
+
+        for iface in ifaces:
+            ec.register_connection(link, iface)
+
+        rnr = ExperimentRunner()
+        runs = rnr.run(ec, min_runs = 5, max_runs = 10, wait_guids = apps, 
+                wait_time = 0)
+
+        self.assertEqual(runs, 10)
+
+    def test_runner_convergence(self):
+        node_count = 4
+        app_count = 2
+
+        ec = ExperimentController(exp_id = "convergence-test")
+       
+        # Add simulated nodes and applications
+        nodes = list()
+        apps = list()
+        ifaces = list()
+
+        for i in xrange(node_count):
+            node = ec.register_resource("dummy::Node")
+            nodes.append(node)
+            
+            iface = ec.register_resource("dummy::Interface")
+            ec.register_connection(node, iface)
+            ifaces.append(iface)
+
+            for j in xrange(app_count):
+                app = ec.register_resource("dummy::Application")
+                ec.register_connection(node, app)
+                apps.append(app)
+
+        link = ec.register_resource("dummy::Link")
+
+        for iface in ifaces:
+            ec.register_connection(link, iface)
+
+        samples = [10, 10, 10, 10, 12, 10, 12, 10, 10, 11]
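+        # evaluate_normal_convergence first succeeds at run 10 for these
+        # samples (se95 ~ 0.51 <= 0.05 * mean = 0.525), so the runner is
+        # expected to stop there.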
+        
+        def compute_metric_callback(samples, ec, run):
+            return samples[run-1]
+
+        metric_callback = functools.partial(compute_metric_callback, samples)
+
+        rnr = ExperimentRunner()
+        runs = rnr.run(ec, min_runs = 5, 
+                compute_metric_callback = metric_callback,
+                wait_guids = apps, 
+                wait_time = 0)
+
+        self.assertEqual(runs, 10)
+                       
+if __name__ == '__main__':
+    unittest.main()
+