From: Alina Quereilhac
Date: Sun, 3 Aug 2014 14:56:31 +0000 (+0200)
Subject: Adding the ExperimentRunner
X-Git-Tag: nepi-3.2.0~105^2~1
X-Git-Url: http://git.onelab.eu/?p=nepi.git;a=commitdiff_plain;h=6c439bea6cf4d6af7512fc746dca75118bf39d39

Adding the ExperimentRunner
---

diff --git a/src/nepi/execution/runner.py b/src/nepi/execution/runner.py
new file mode 100644
index 00000000..6757d7b5
--- /dev/null
+++ b/src/nepi/execution/runner.py
@@ -0,0 +1,160 @@
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac
+
+from nepi.execution.ec import ExperimentController
+
+import math
+import numpy
+import os
+import tempfile
+import time
+
+class ExperimentRunner(object):
+    """ The ExperimentRunner is responsible for running the experiment
+    described by an ExperimentController multiple times.
+
+    """
+    def __init__(self):
+        super(ExperimentRunner, self).__init__()
+
+    def run(self, ec, min_runs = 1, max_runs = -1, wait_time = 0,
+            wait_guids = [], compute_metric_callback = None,
+            evaluate_convergence_callback = None):
+        """ Runs the same experiment multiple times
+
+        :param ec: Description of the experiment to run
+        :type ec: ExperimentController
+
+        :param min_runs: Minimum number of repetitions of the experiment
+        :type min_runs: int
+
+        :param max_runs: Maximum number of repetitions of the experiment
+        :type max_runs: int
+
+        :param wait_time: Time in seconds to wait after ec.wait_finished()
+            returns and before invoking ec.release()
+        :type wait_time: float
+
+        :param wait_guids: List of guids to pass to ec.wait_finished
+            after invoking ec.deploy()
+        :type wait_guids: list of int
+
+        :param compute_metric_callback: function invoked after each
+            experiment run to compute an experiment metric. It is invoked
+            with the ec and the run count as arguments, and it must return
+            the value of the computed metric:
+
+                metric = compute_metric_callback(ec, run)
+
+        :type compute_metric_callback: function
+
+        :param evaluate_convergence_callback: function invoked to evaluate
+            whether the collected metric samples have converged and the
+            experiment runner can stop. It is invoked with the ec, the run
+            count and the list of collected metric samples as arguments,
+            and it must return either True or False:
+
+                stop = evaluate_convergence_callback(ec, run, metrics)
+
+            If stop is True, the runner will exit.
+
+        :type evaluate_convergence_callback: function
+
+        :return: Number of runs performed
+        :rtype: int
+
+        """
+
+        if (not max_runs or max_runs < 0) and not compute_metric_callback:
+            msg = "Undefined STOP condition, set compute_metric_callback or max_runs"
+            raise RuntimeError(msg)
+
+        if compute_metric_callback and not evaluate_convergence_callback:
+            evaluate_convergence_callback = self.evaluate_normal_convergence
+            ec.logger.info(" Assuming normally distributed samples to "
+                "evaluate convergence. The experiment will stop when the "
+                "standard error of the mean, at 95% confidence, is <= 5% "
+                "of the mean of the collected samples ")
+
+        # Set useRunId = True in Collectors to make sure the results of
+        # each run are stored independently.
+        collectors = ec.get_resources_by_type("Collector")
+        for collector in collectors:
+            collector.set("useRunId", True)
+
+        # Persist the experiment description, so a fresh EC can be
+        # loaded from it for every run.
+        dirpath = tempfile.mkdtemp()
+        filepath = ec.save(dirpath)
+
+        samples = []
+        run = 0
+        while True:
+            run += 1
+
+            ec = self.run_experiment(filepath, wait_time, wait_guids)
+
+            ec.logger.info(" RUN %d \n" % run)
+
+            if run >= min_runs and max_runs > -1 and run >= max_runs:
+                break
+
+            if compute_metric_callback:
+                metric = compute_metric_callback(ec, run)
+                samples.append(metric)
+
+                if run >= min_runs and evaluate_convergence_callback:
+                    if evaluate_convergence_callback(ec, run, samples):
+                        break
+
+            del ec
+
+        return run
+
+    def evaluate_normal_convergence(self, ec, run, samples):
+        if len(samples) == 0:
+            msg = "0 samples collected"
+            raise RuntimeError(msg)
+
+        x = numpy.array(samples)
+        n = len(samples)
+        std = x.std()
+        se = std / math.sqrt(n)
+        m = x.mean()
+        se95 = se * 2
+
+        ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f SE95%% %.2f \n" % (
+            run, n, m, std, se95 ) )
+
+        # Converged when twice the standard error of the mean (taken as
+        # an approximation of the 95% confidence interval) is within 5%
+        # of the sample mean.
+        return m * 0.05 >= se95
+
+    def run_experiment(self, filepath, wait_time, wait_guids):
+        # Load a fresh EC from the saved description, deploy it, wait
+        # for the given guids to finish, then release all resources.
+        ec = ExperimentController.load(filepath)
+
+        ec.deploy()
+
+        ec.wait_finished(wait_guids)
+        time.sleep(wait_time)
+
+        ec.release()
+
+        return ec
+
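For reference, the stopping rule implemented by evaluate_normal_convergence can be replayed outside of NEPI. The sketch below is not part of the commit; it only re-applies the same approximation (twice the standard error of the mean taken as the 95% confidence interval) to a list of samples:

    import math
    import numpy

    def converged(samples):
        # Same rule as evaluate_normal_convergence: stop once twice the
        # standard error of the mean (~95% CI) is within 5% of the mean.
        x = numpy.array(samples)
        se95 = 2 * x.std() / math.sqrt(len(samples))
        return x.mean() * 0.05 >= se95

    # With the samples used by the convergence test below, and checking
    # only from min_runs = 5 onwards as the runner does, the rule first
    # holds after the 10th sample:
    samples = [10, 10, 10, 10, 12, 10, 12, 10, 10, 11]
    print [n for n in range(5, len(samples) + 1) if converged(samples[:n])]
    # -> [10]
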
" + "Experiment will stop when the standard error with 95% " + "confidence interval is >= 5% of the mean of the collected samples ") + + # Set useRunId = True in Collectors to make sure results are + # independently stored. + collectors = ec.get_resources_by_type("Collector") + for collector in collectors: + collector.set("useRunId", True) + + dirpath = tempfile.mkdtemp() + filepath = ec.save(dirpath) + + samples = [] + run = 0 + while True: + run += 1 + + ec = self.run_experiment(filepath, wait_time, wait_guids) + + ec.logger.info(" RUN %d \n" % run) + + if run >= min_runs and max_runs > -1 and run >= max_runs : + break + + if compute_metric_callback: + metric = compute_metric_callback(ec, run) + samples.append(metric) + + if run >= min_runs and evaluate_convergence_callback: + if evaluate_convergence_callback(ec, run, samples): + break + del ec + + return run + + def evaluate_normal_convergence(self, ec, run, samples): + if len(samples) == 0: + msg = "0 samples collected" + raise RuntimeError, msg + + x = numpy.array(samples) + n = len(samples) + std = x.std() + se = std / math.sqrt(n) + m = x.mean() + se95 = se * 2 + + ec.logger.info(" RUN %d - SAMPLES %d MEAN %.2f STD %.2f SE95%% %.2f \n" % ( + run, n, m, std, se95 ) ) + + return m * 0.05 >= se95 + + def run_experiment(self, filepath, wait_time, wait_guids): + ec = ExperimentController.load(filepath) + + ec.deploy() + + ec.wait_finished(wait_guids) + time.sleep(wait_time) + + ec.release() + + return ec + + diff --git a/src/nepi/resources/all/collector.py b/src/nepi/resources/all/collector.py index bb8c1c8d..af0811cf 100644 --- a/src/nepi/resources/all/collector.py +++ b/src/nepi/resources/all/collector.py @@ -28,7 +28,7 @@ import tempfile @clsinit_copy class Collector(ResourceManager): - """ The collector is reponsible of collecting traces + """ The collector entity is reponsible of collecting traces of a same type associated to RMs into a local directory. .. class:: Class Args : diff --git a/test/execution/runner.py b/test/execution/runner.py new file mode 100755 index 00000000..b06d3832 --- /dev/null +++ b/test/execution/runner.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
diff --git a/test/execution/runner.py b/test/execution/runner.py
new file mode 100755
index 00000000..b06d3832
--- /dev/null
+++ b/test/execution/runner.py
@@ -0,0 +1,175 @@
+#!/usr/bin/env python
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac
+
+from nepi.execution.ec import ExperimentController
+from nepi.execution.resource import ResourceManager, ResourceState, \
+        clsinit_copy, ResourceAction, ResourceFactory
+from nepi.execution.runner import ExperimentRunner
+
+import functools
+import os
+import shutil
+import tempfile
+import time
+import unittest
+
+reschedule_delay = "0.5s"
+deploy_time = 0
+run_time = 0
+
+class Link(ResourceManager):
+    _rtype = "dummy::Link"
+
+    def do_deploy(self):
+        time.sleep(deploy_time)
+        super(Link, self).do_deploy()
+        self.logger.debug(" -------- DEPLOYED ------- ")
+
+class Interface(ResourceManager):
+    _rtype = "dummy::Interface"
+
+    def do_deploy(self):
+        node = self.get_connected(Node.get_rtype())[0]
+        link = self.get_connected(Link.get_rtype())[0]
+
+        # Wait until both the node and the link are ready before
+        # deploying the interface.
+        if node.state < ResourceState.READY or \
+                link.state < ResourceState.READY:
+            self.ec.schedule(reschedule_delay, self.deploy)
+            self.logger.debug(" -------- RESCHEDULING ------- ")
+        else:
+            time.sleep(deploy_time)
+            super(Interface, self).do_deploy()
+            self.logger.debug(" -------- DEPLOYED ------- ")
+
+class Node(ResourceManager):
+    _rtype = "dummy::Node"
+
+    def do_deploy(self):
+        self.logger.debug(" -------- DO_DEPLOY ------- ")
+        time.sleep(deploy_time)
+        super(Node, self).do_deploy()
+        self.logger.debug(" -------- DEPLOYED ------- ")
+
+class Application(ResourceManager):
+    _rtype = "dummy::Application"
+
+    def do_deploy(self):
+        node = self.get_connected(Node.get_rtype())[0]
+
+        # Applications can only be deployed once their node is ready.
+        if node.state < ResourceState.READY:
+            self.ec.schedule(reschedule_delay, self.deploy)
+            self.logger.debug(" -------- RESCHEDULING ------- ")
+        else:
+            time.sleep(deploy_time)
+            super(Application, self).do_deploy()
+            self.logger.debug(" -------- DEPLOYED ------- ")
+
+    def do_start(self):
+        super(Application, self).do_start()
+        time.sleep(run_time)
+        self.ec.schedule("0s", self.stop)
+
+ResourceFactory.register_type(Application)
+ResourceFactory.register_type(Node)
+ResourceFactory.register_type(Interface)
+ResourceFactory.register_type(Link)
+
+class RunnerTestCase(unittest.TestCase):
+    def test_runner_max_runs(self):
+        node_count = 4
+        app_count = 2
+
+        ec = ExperimentController(exp_id = "max-runs-test")
+
+        # Add simulated nodes and applications
+        nodes = list()
+        apps = list()
+        ifaces = list()
+
+        for i in xrange(node_count):
+            node = ec.register_resource("dummy::Node")
+            nodes.append(node)
+
+            iface = ec.register_resource("dummy::Interface")
+            ec.register_connection(node, iface)
+            ifaces.append(iface)
+
+            for i in xrange(app_count):
+                app = ec.register_resource("dummy::Application")
+                ec.register_connection(node, app)
+                apps.append(app)
+
+        link = ec.register_resource("dummy::Link")
+
+        for iface in ifaces:
+            ec.register_connection(link, iface)
+
+        rnr = ExperimentRunner()
+        runs = rnr.run(ec, min_runs = 5, max_runs = 10, wait_guids = apps,
+                wait_time = 0)
+
+        self.assertEquals(runs, 10)
+
+    def test_runner_convergence(self):
+        node_count = 4
+        app_count = 2
+
+        ec = ExperimentController(exp_id = "convergence-test")
+
+        # Add simulated nodes and applications
+        nodes = list()
+        apps = list()
+        ifaces = list()
+
+        for i in xrange(node_count):
+            node = ec.register_resource("dummy::Node")
+            nodes.append(node)
+
+            iface = ec.register_resource("dummy::Interface")
+            ec.register_connection(node, iface)
+            ifaces.append(iface)
+
+            for i in xrange(app_count):
+                app = ec.register_resource("dummy::Application")
+                ec.register_connection(node, app)
+                apps.append(app)
+
+        link = ec.register_resource("dummy::Link")
+
+        for iface in ifaces:
+            ec.register_connection(link, iface)
+
+        # One metric sample is returned per run; convergence is first
+        # reached at run 10 (SE95 <= 5% of the mean).
+        samples = [10, 10, 10, 10, 12, 10, 12, 10, 10, 11]
+
+        def compute_metric_callback(samples, ec, run):
+            return samples[run - 1]
+
+        metric_callback = functools.partial(compute_metric_callback, samples)
+
+        rnr = ExperimentRunner()
+        runs = rnr.run(ec, min_runs = 5,
+                compute_metric_callback = metric_callback,
+                wait_guids = apps,
+                wait_time = 0)
+
+        self.assertEquals(runs, 10)
+
+if __name__ == '__main__':
+    unittest.main()
+
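Putting the pieces together, a minimal usage sketch of the new runner, reusing the dummy resource types registered by the test above (the metric is a placeholder; a real callback would parse collected traces):

    from nepi.execution.ec import ExperimentController
    from nepi.execution.runner import ExperimentRunner

    ec = ExperimentController(exp_id = "runner-example")
    node = ec.register_resource("dummy::Node")
    app = ec.register_resource("dummy::Application")
    ec.register_connection(node, app)

    def metric(ec, run):
        return run  # placeholder metric

    rnr = ExperimentRunner()
    # Stops at max_runs = 10, or earlier if the default normal
    # convergence rule (SE95 <= 5% of the mean) holds after run 5.
    runs = rnr.run(ec, min_runs = 5, max_runs = 10,
            wait_guids = [app], wait_time = 0,
            compute_metric_callback = metric)
    print "experiment was run %d times" % runs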