--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.ec import ExperimentController
+
+import math
+import numpy
+import os
+import tempfile
+import time
+
class ExperimentRunner(object):
    """ The ExperimentRunner entity is responsible for re-running an
    experiment, described by an ExperimentController, multiple times.

    """
    def __init__(self):
        super(ExperimentRunner, self).__init__()

    def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0,
            wait_guids = None, compute_metric_callback = None,
            evaluate_convergence_callback = None ):
        """ Re-runs a same experiment multiple times.

        :param ec: Experiment description of experiment to run
        :type ec: ExperimentController

        :param min_runs: Minimum number of repetitions for experiment
        :type min_runs: int

        :param max_runs: Maximum number of repetitions for experiment.
            A negative value means 'no maximum'.
        :type max_runs: int

        :param wait_time: Time to wait in seconds between invoking
            ec.deploy() and ec.release()
        :type wait_time: float

        :param wait_guids: List of guids to pass to ec.wait_finished
            after invoking ec.deploy()
        :type wait_guids: list of int

        :param compute_metric_callback: function to invoke after each
            experiment run, to compute an experiment metric.
            It will be invoked with the ec and the run count as arguments,
            and it must return the value of the computed metric:

                metric = compute_metric_callback(ec, run)

        :type compute_metric_callback: function

        :param evaluate_convergence_callback: function to evaluate whether the
            collected metric samples have converged and the experiment runner
            can stop. It will be invoked with the ec, the run count and the
            list of collected metric samples as argument, and it must return
            either True or False:

                stop = evaluate_convergence_callback(ec, run, metrics)

            If stop is True, then the runner will exit.
        :type evaluate_convergence_callback: function

        :returns: Total number of runs executed
        :rtype: int

        """
        # Normalize here instead of using a mutable default argument.
        if wait_guids is None:
            wait_guids = []

        if (not max_runs or max_runs < 0) and not compute_metric_callback:
            msg = "Undefined STOP condition, set stop_callback or max_runs"
            raise RuntimeError(msg)

        if compute_metric_callback and not evaluate_convergence_callback:
            evaluate_convergence_callback = self.evaluate_normal_convergence
            ec.logger.info(" Treating data as normal to evaluate convergence. "
                "Experiment will stop when the standard error with 95% "
                "confidence interval is <= 5% of the mean of the collected samples ")

        # Set useRunId = True in Collectors to make sure results are
        # independently stored.
        collectors = ec.get_resources_by_type("Collector")
        for collector in collectors:
            collector.set("useRunId", True)

        # Snapshot the experiment description; each run re-loads it into
        # a fresh ExperimentController.
        import shutil  # local import: only needed for temp dir cleanup
        dirpath = tempfile.mkdtemp()

        try:
            filepath = ec.save(dirpath)

            samples = []
            run = 0
            while True:
                run += 1

                ec = self.run_experiment(filepath, wait_time, wait_guids)

                ec.logger.info(" RUN %d \n" % run)

                if compute_metric_callback:
                    # Collect the metric BEFORE evaluating any stop
                    # condition, so the final run is never discarded.
                    metric = compute_metric_callback(ec, run)
                    samples.append(metric)

                    if run >= min_runs and evaluate_convergence_callback:
                        if evaluate_convergence_callback(ec, run, samples):
                            break

                if run >= min_runs and max_runs > -1 and run >= max_runs:
                    break

                del ec

            return run
        finally:
            # Remove the temporary snapshot of the experiment description.
            shutil.rmtree(dirpath, ignore_errors = True)

    def evaluate_normal_convergence(self, ec, run, samples):
        """ Decide convergence assuming normally distributed samples.

        Returns True (converged) when twice the standard error (~95%
        confidence interval) is no larger than 5% of the sample mean.

        :raises RuntimeError: if no samples were collected
        """
        if len(samples) == 0:
            msg = "0 samples collected"
            raise RuntimeError(msg)

        x = numpy.array(samples)
        n = len(samples)
        std = x.std()
        se = std / math.sqrt(n)
        m = x.mean()
        se95 = se * 2

        ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f SE95%% %.2f \n" % (
            run, n, m, std, se95 ) )

        return m * 0.05 >= se95

    def run_experiment(self, filepath, wait_time, wait_guids):
        """ Perform a single experiment run: load the saved description,
        deploy it, wait for the given guids to finish plus wait_time
        seconds, then release.

        :param filepath: path to a saved experiment description
        :returns: the (released) ExperimentController used for the run
        """
        ec = ExperimentController.load(filepath)

        ec.deploy()

        ec.wait_finished(wait_guids)
        time.sleep(wait_time)

        ec.release()

        return ec
+
+
--- /dev/null
+#!/usr/bin/env python
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.ec import ExperimentController
+from nepi.execution.resource import ResourceManager, ResourceState, \
+ clsinit_copy, ResourceAction, ResourceFactory
+from nepi.execution.runner import ExperimentRunner
+
+import functools
+import os
+import shutil
+import tempfile
+import time
+import unittest
+
# Delay string handed to ec.schedule() when a resource must wait for its
# dependencies before deploying.
reschedule_delay = "0.5s"
# Simulated seconds spent inside each resource's do_deploy().
deploy_time = 0
# Simulated seconds an Application spends "running" inside do_start().
run_time = 0
+
class Link(ResourceManager):
    """Dummy link resource used to exercise the ExperimentRunner."""
    _rtype = "dummy::Link"
    def do_deploy(self):
        """Simulate deployment latency, then delegate to the framework."""
        time.sleep(deploy_time)
        super(Link, self).do_deploy()
        self.logger.debug(" -------- DEPLOYED ------- ")
+
class Interface(ResourceManager):
    """Dummy interface that deploys only once its node and link are READY."""

    _rtype = "dummy::Interface"

    def do_deploy(self):
        node = self.get_connected(Node.get_rtype())[0]
        link = self.get_connected(Link.get_rtype())[0]

        # Both endpoints must be READY before this interface can deploy.
        dependencies_ready = (node.state >= ResourceState.READY and
                link.state >= ResourceState.READY)

        if not dependencies_ready:
            self.ec.schedule(reschedule_delay, self.deploy)
            self.logger.debug(" -------- RESCHEDULING ------- ")
            return

        time.sleep(deploy_time)
        super(Interface, self).do_deploy()
        self.logger.debug(" -------- DEPLOYED ------- ")
+
class Node(ResourceManager):
    """Dummy node resource used to exercise the ExperimentRunner."""
    _rtype = "dummy::Node"

    def do_deploy(self):
        """Simulate deployment latency, then delegate to the framework."""
        self.logger.debug(" -------- DO_DEPLOY ------- ")
        time.sleep(deploy_time)
        super(Node, self).do_deploy()
        self.logger.debug(" -------- DEPLOYED ------- ")
+
class Application(ResourceManager):
    """Dummy application: deploys once its node is READY and stops itself
    right after starting."""

    _rtype = "dummy::Application"

    def do_deploy(self):
        node = self.get_connected(Node.get_rtype())[0]

        if node.state < ResourceState.READY:
            # Host node is not ready yet: retry the deploy later.
            self.ec.schedule(reschedule_delay, self.deploy)
            self.logger.debug(" -------- RESCHEDULING ------- ")
            return

        time.sleep(deploy_time)
        super(Application, self).do_deploy()
        self.logger.debug(" -------- DEPLOYED ------- ")

    def do_start(self):
        super(Application, self).do_start()
        # Simulate the application running, then schedule an immediate stop.
        time.sleep(run_time)
        self.ec.schedule("0s", self.stop)
+
# Make the dummy resource types available to the ExperimentController.
for _dummy_rm in (Application, Node, Interface, Link):
    ResourceFactory.register_type(_dummy_rm)
+
class RunnerTestCase(unittest.TestCase):
    """Exercises the ExperimentRunner stop conditions (max_runs and
    metric convergence) against a dummy resource topology."""

    def _build_dummy_experiment(self, exp_id, node_count, app_count):
        """ Register a dummy topology (nodes with interfaces and
        applications, all interfaces on one link) and return the
        (ec, application guids) pair.
        """
        ec = ExperimentController(exp_id = exp_id)

        apps = list()
        ifaces = list()

        for i in range(node_count):
            node = ec.register_resource("dummy::Node")

            iface = ec.register_resource("dummy::Interface")
            ec.register_connection(node, iface)
            ifaces.append(iface)

            for j in range(app_count):
                app = ec.register_resource("dummy::Application")
                ec.register_connection(node, app)
                apps.append(app)

        link = ec.register_resource("dummy::Link")

        for iface in ifaces:
            ec.register_connection(link, iface)

        return ec, apps

    def test_runner_max_runs(self):
        """ Without a metric callback the runner must stop exactly at
        max_runs. """
        ec, apps = self._build_dummy_experiment("max-runs-test", 4, 2)

        rnr = ExperimentRunner()
        runs = rnr.run(ec, min_runs = 5, max_runs = 10, wait_guids = apps,
                wait_time = 0)

        self.assertEqual(runs, 10)

    def test_runner_convergence(self):
        """ With a metric callback the runner must stop when the collected
        samples converge (here at run 10). """
        ec, apps = self._build_dummy_experiment("convergence-test", 4, 2)

        # Pre-canned metric values returned run by run; the sequence first
        # satisfies the normal-convergence criterion at run 10.
        samples = [10, 10, 10, 10, 12, 10, 12, 10, 10, 11]

        def compute_metric_callback(samples, ec, run):
            return samples[run - 1]

        metric_callback = functools.partial(compute_metric_callback, samples)

        rnr = ExperimentRunner()
        runs = rnr.run(ec, min_runs = 5,
                compute_metric_callback = metric_callback,
                wait_guids = apps,
                wait_time = 0)

        self.assertEqual(runs, 10)
+
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()
+