From: Alina Quereilhac Date: Fri, 8 Aug 2014 17:36:34 +0000 (+0200) Subject: Commiting improvements to Collector. Local_dir added to ExperimentController X-Git-Tag: nepi-3.2.0~99^2~6 X-Git-Url: http://git.onelab.eu/?p=nepi.git;a=commitdiff_plain;h=bac63fdc5983e2ade1902f711c1e7899d82ca4ae Commiting improvements to Collector. Local_dir added to ExperimentController --- diff --git a/examples/ccn_flooding/flooding.py b/examples/ccn_flooding/flooding.py new file mode 100644 index 00000000..c35d7ef4 --- /dev/null +++ b/examples/ccn_flooding/flooding.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python + +############################################################################### +# +# NEPI, a framework to manage network experiments +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# +############################################################################### + +from nepi.execution.ec import ExperimentController +from nepi.execution.resource import ResourceState, ResourceAction +from nepi.util.netgraph import NetGraph, TopologyType + +import os +import tempfile + +from dminer import ccn + +PL_NODES = dict({ + "0": "iraplab1.iralab.uni-karlsruhe.de", + "1": "planetlab1.informatik.uni-goettingen.de", + "2": "dfn-ple1.x-win.dfn.de", + "3": "mars.planetlab.haw-hamburg.de", + "4": "planetlab2.unineuchatel.ch", + "5": "planetlab-node3.it-sudparis.eu", + "6": "planetlab2.extern.kuleuven.be", + "7": "node2pl.planet-lab.telecom-lille1.eu", + "8": "planetvs2.informatik.uni-stuttgart.de", + "9": "planetlab1.informatik.uni-wuerzburg.de", + "10": "planet1.l3s.uni-hannover.de", + "11": "planetlab1.wiwi.hu-berlin.de", + "12": "pl2.uni-rostock.de", + "13": "planetlab1.u-strasbg.fr", + "14": "peeramidion.irisa.fr" + }) + +pl_slice = os.environ.get("PL_SLICE") +pl_user = os.environ.get("PL_USER") +pl_password = os.environ.get("PL_PASS") +pl_ssh_key = os.environ.get("PL_SSHKEY") + +content_name = "ccnx:/test/bunny.ts" + +pipeline = 4 # Default value for ccncat + +operating_system = "f14" + +country = "germany" + +repofile = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "..", "repoFile1.0.8.2") + +def add_collector(ec, trace_name, store_dir, sub_dir, rename = None): + collector = ec.register_resource("Collector") + ec.set(collector, "traceName", trace_name) + ec.set(collector, "storeDir", store_dir) + ec.set(collector, "subDir", sub_dir) + if rename: + ec.set(collector, "rename", rename) + + return collector + +def add_node(ec, n): + hostname = PL_NODES[n] + + node = ec.register_resource("PlanetlabNode") + ec.set(node, "hostname", hostname) + ec.set(node, "username", username) + ec.set(node, "identity", identity) + ec.set(node, "pluser", pl_user) + ec.set(node, "plpassword", pl_password) + #ec.set(node, "country", country) + #ec.set(node, "operatingSystem", operating_system) + ec.set(node, "cleanExperiment", True) + ec.set(node, "cleanProcesses", True) + + return node + +def add_ccnd(ec, node, n): + global PL_NODES + + ccnd = ec.register_resource("LinuxCCND") + ec.set(ccnd, "debug", 7) + ec.register_connection(ccnd, node) + + # collector for ccnd trace + hostname = PL_NODES[n] + collector = add_collector(ec, "stderr", hostname, "log") + ec.register_connection(collector, ccnd) + + PL_NODES[n] = (hostname, node, ccnd) + return ccnd + +def add_ccnr(ec, ccnd): + ccnr = ec.register_resource("LinuxCCNR") + + ec.set(ccnr, "repoFile1", repofile) + ec.register_connection(ccnr, ccnd) + + return ccnr + +def add_fib_entry(ec, n1, n2): + (hostname1, node1, ccnd1) = PL_NODES[n1] + (hostname2, node2, ccnd2) = PL_NODES[n2] + + entry = ec.register_resource("LinuxFIBEntry") + ec.set(entry, "host", peer_host) + + ec.register_connection(entry, ccnd1) + + ec.enable_trace(entry, "ping") + collector = add_collector(ec, "ping", hostname2) + ec.register_connection(collector, entry) + + return entry + +def add_ccncat(ec, ccnd): + ccncat = ec.register_resource("LinuxCCNCat") + ec.set(ccncat, "pipeline", pipeline) + ec.set(ccncat, "contentName", content_name) + ec.register_connection(ccncat, ccnd) + + return ccncat + +def compute_metric_callback(ec, run): + ## Process logs and analyse data + try: + graph = ccn.parse_ccndlogs(graph = graph, + parse_ping_logs = True) + except: + print "Skipping: Error parsing ccnd logs", run_dir + raise + + source = ccn.consumers(graph)[0] + target = ccn.producers(graph)[0] + + # Process the data from the ccnd logs, but do not re compute + # the link delay. + try: + (content_names, + interest_expiry_count, + interest_dupnonce_count, + interest_count, + content_count) = ccn.process_ccn_data(graph, source) + except: + print "Skipping: Error processing ccn data", run_dir + raise + + # Compute the shortest path + shortest_path = ccn.shortest_path(graph, source, target) + + # Compute the load coefficient + lcoeff = ccn.load_coefficient(graph, shortest_path, content_names) + + return lcoeff + +if __name__ == '__main__': + + #### Generate a LADDER network topology + net_graph = NetGraph(topo_type = TopologyType.LADDER, + node_count = 6, + assign_st = True, + assign_ips = True) + + target = net_graph.targets()[0] + source = net_graph.sources()[0] + + wait_guids = [] + + #### Create NEPI Experiment Description (EC) + ec = ExperimentController(exp_id) + + ### Add CCN nodes to the (EC) + for n in graph.nodes(): + node = add_node(ec, n) + ccnd = add_ccnd(ec, node, n) + + if n == target: + ccnr = add_ccnr(ec, ccnd) + + ## Add content retrival application + if n == source: + ccncat = add_ccncat(ec, ccnd) + wait_guids.append(ccncat) + + #### Add connections between CCN nodes + for n1, n2 in graph.edges(): + add_fib_entry(ec, n1, n2) + add_fib_entry(ec, n2, n1) + + #### Define the callback to compute experiment metric + metric_callback = functools.partial(compute_metric_callback, ping) + + #### Run experiment until metric convergence + rnr = ExperimentRunner() + + runs = rnr.run(ec, min_runs = 10, max_runs = 300 + compute_metric_callback = metric_callback, + wait_guids = wait_guids, + wait_time = 0) + diff --git a/examples/ccn_flooding/repoFile1.0.8.2 b/examples/ccn_flooding/repoFile1.0.8.2 new file mode 100644 index 00000000..a90b0d96 Binary files /dev/null and b/examples/ccn_flooding/repoFile1.0.8.2 differ diff --git a/examples/linux/ccn/ccncat_extended_ring_topo.py b/examples/linux/ccn/ccncat_extended_ring_topo.py index a67075b3..af823a26 100644 --- a/examples/linux/ccn/ccncat_extended_ring_topo.py +++ b/examples/linux/ccn/ccncat_extended_ring_topo.py @@ -92,10 +92,9 @@ def add_stream(ec, ccnd, content_name): return app -def add_collector(ec, trace_name, store_dir): +def add_collector(ec, trace_name): collector = ec.register_resource("Collector") ec.set(collector, "traceName", trace_name) - ec.set(collector, "storeDir", store_dir) return collector @@ -136,7 +135,7 @@ if __name__ == '__main__': ( pl_user, movie, exp_id, pl_ssh_key, results_dir ) = get_options() - ec = ExperimentController(exp_id = exp_id) + ec = ExperimentController(exp_id = exp_id, local_dir = results_dir) # hosts in the US #host1 = "planetlab4.wail.wisc.edu" @@ -214,7 +213,7 @@ if __name__ == '__main__': app, ResourceState.STARTED, time = "10s") # Register a collector to automatically collect traces - collector = add_collector(ec, "stderr", results_dir) + collector = add_collector(ec, "stderr") for ccnd in ccnds.values(): ec.register_connection(collector, ccnd) diff --git a/examples/linux/ccn/two_nodes_file_retrieval.py b/examples/linux/ccn/two_nodes_file_retrieval.py index 8e40c3bd..10814ae7 100644 --- a/examples/linux/ccn/two_nodes_file_retrieval.py +++ b/examples/linux/ccn/two_nodes_file_retrieval.py @@ -34,8 +34,10 @@ import os ssh_key = ####### <<< ASSING the absolute path to the private SSH key to login into the remote host >>> ssh_user = ####### <<< ASSING the SSH username >>> +results_dir = "/tmp/demo_CCN_results" + ## Create the experiment controller -ec = ExperimentController(exp_id = "demo_CCN") +ec = ExperimentController(exp_id = "demo_CCN", local_dir = results_dir) ## Register node 1 node1 = ec.register_resource("LinuxNode") @@ -113,16 +115,13 @@ ec.register_connection(app, ccnd2) # Register a collector to automatically collect the ccnd logs # to a local directory -results_dir = "/tmp/demo_CCN_results" col1 = ec.register_resource("Collector") ec.set(col1, "traceName", "stderr") -ec.set(col1, "storeDir", results_dir) ec.set(col1, "subDir", hostname1) ec.register_connection(col1, ccnd1) col2 = ec.register_resource("Collector") ec.set(col2, "traceName", "stderr") -ec.set(col2, "storeDir", results_dir) ec.set(col2, "subDir", hostname2) ec.register_connection(col2, ccnd2) diff --git a/examples/planetlab/ccn/two_nodes_file_retrieval.py b/examples/planetlab/ccn/two_nodes_file_retrieval.py index 096268f7..57c58089 100644 --- a/examples/planetlab/ccn/two_nodes_file_retrieval.py +++ b/examples/planetlab/ccn/two_nodes_file_retrieval.py @@ -36,8 +36,11 @@ pl_pass = ######## <<< ASSIGN the password used to login to the PlanetLab websit pl_ssh_key = ####### <<< ASSING the absolute path to the private SSH key used for Planetlab >>> slicename = ####### <<< ASSING the PlanetLab slicename >>> +results_dir = "/tmp/demo_CCN_results" + ## Create the experiment controller -ec = ExperimentController(exp_id = "demo_CCN") +ec = ExperimentController(exp_id = "demo_CCN", + local_dir = results_dir) ## Register node 1 node1 = ec.register_resource("PlanetlabNode") @@ -137,16 +140,13 @@ ec.register_connection(app, ccnd2) # Register a collector to automatically collect the ccnd logs # to a local directory -results_dir = "/tmp/demo_CCN_results" col1 = ec.register_resource("Collector") ec.set(col1, "traceName", "stderr") -ec.set(col1, "storeDir", results_dir) ec.set(col1, "subDir", hostname1) ec.register_connection(col1, ccnd1) col2 = ec.register_resource("Collector") ec.set(col2, "traceName", "stderr") -ec.set(col2, "storeDir", results_dir) ec.set(col2, "subDir", hostname2) ec.register_connection(col2, ccnd2) diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 2c6557ff..1357e4f7 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -34,6 +34,7 @@ import functools import logging import os import sys +import tempfile import time import threading import weakref @@ -150,7 +151,7 @@ class ExperimentController(object): exp_id, which can be re-used in different ExperimentController, and the run_id, which is unique to one ExperimentController instance, and is automatically generated by NEPI. - + """ @classmethod @@ -159,7 +160,22 @@ class ExperimentController(object): ec = serializer.load(filepath) return ec - def __init__(self, exp_id = None): + def __init__(self, exp_id = None, local_dir = None, persist = False): + """ ExperimentController entity to model an execute a network experiment. + + :param exp_id: Human readable name to identify the experiment + :type name: str + + :param local_dir: Path to local directory where to store experiment + related files + :type name: str + + :param persist: Save an XML description of the experiment after + completion at local_dir + :type name: bool + + """ + super(ExperimentController, self).__init__() # Logging @@ -177,6 +193,17 @@ class ExperimentController(object): # resources used, etc) self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex') + # Local path where to store experiment related files (results, etc) + if not local_dir: + local_dir = tempfile.mkdtemp() + + self._local_dir = local_dir + self._exp_dir = os.path.join(local_dir, self.exp_id) + self._run_dir = os.path.join(self.exp_dir, self.run_id) + + # If True persist the experiment controller in XML format, after completion + self._persist = persist + # generator of globally unique ids self._guid_generator = guid.GuidGenerator() @@ -261,7 +288,36 @@ class ExperimentController(object): """ return self._nthreads - + @property + def local_dir(self): + """ Root local directory for experiment files + + """ + return self._local_dir + + @property + def exp_dir(self): + """ Local directory to store results and other files related to the + experiment. + + """ + return self._exp_dir + + @property + def run_dir(self): + """ Local directory to store results and other files related to the + experiment run. + + """ + return self._run_dir + + @property + def persist(self): + """ If Trie persist the ExperimentController to XML format upon completion + + """ + return self._persist + @property def abort(self): """ Returns True if the experiment has failed and should be interrupted, @@ -942,6 +998,9 @@ class ExperimentController(object): self.wait_released(guids) + if self.persist: + self.save(dirpath = self.run_dir) + for guid in guids: if self.get(guid, "hardRelease"): self.remove_resource(guid) diff --git a/src/nepi/execution/runner.py b/src/nepi/execution/runner.py index 60e75a4f..75e766f4 100644 --- a/src/nepi/execution/runner.py +++ b/src/nepi/execution/runner.py @@ -40,26 +40,21 @@ class ExperimentRunner(object): """ Re-runs a same experiment multiple times :param ec: Experiment description of experiment to run - :type name: ExperimentController - :rtype: EperimentController + :type ec: ExperimentController :param min_runs: Minimum number of repetitions for experiment - :type name: int - :rtype: int + :type min_runs: int :param max_runs: Maximum number of repetitions for experiment - :type name: int - :rtype: int + :type max_runs: int :param wait_time: Time to wait in seconds between invoking ec.deploy() and ec.release() - :type name: float - :rtype: float + :type wait_time: float :param wait_guids: List of guids to pass to ec.wait_finished after invoking ec.deploy() - :type name: list - :rtype: list of int + :type wait_guids: list :param compute_metric_callback: function to invoke after each experiment run, to compute an experiment metric. @@ -68,8 +63,7 @@ class ExperimentRunner(object): metric = compute_metric_callback(ec, run) - :type name: function - :rtype: function + :type compute_metric_callback: function :param evaluate_convergence_callback: function to evaluate whether the collected metric samples have converged and the experiment runner @@ -81,8 +75,7 @@ class ExperimentRunner(object): If stop is True, then the runner will exit. - :type name: function - :rtype: function + :type evaluate_convergence_callback: function """ @@ -96,11 +89,8 @@ class ExperimentRunner(object): "Experiment will stop when the standard error with 95% " "confidence interval is >= 5% of the mean of the collected samples ") - # Set useRunId = True in Collectors to make sure results are - # independently stored. - collectors = ec.get_resources_by_type("Collector") - for collector in collectors: - collector.set("useRunId", True) + # Force persistence of experiment controller + ec._persist = True dirpath = tempfile.mkdtemp() filepath = ec.save(dirpath) diff --git a/src/nepi/resources/all/collector.py b/src/nepi/resources/all/collector.py index af0811cf..792e25fb 100644 --- a/src/nepi/resources/all/collector.py +++ b/src/nepi/resources/all/collector.py @@ -51,18 +51,6 @@ class Collector(ResourceManager): "Name of the trace to be collected", flags = Flags.Design) - store_dir = Attribute("storeDir", - "Path to local directory to store trace results", - default = tempfile.gettempdir(), - flags = Flags.Design) - - use_run_id = Attribute("useRunId", - "If set to True stores traces into a sub directory named after " - "the RUN ID assigned by the EC", - type = Types.Bool, - default = False, - flags = Flags.Design) - sub_dir = Attribute("subDir", "Sub directory to collect traces into", flags = Flags.Design) @@ -72,10 +60,8 @@ class Collector(ResourceManager): flags = Flags.Design) cls._register_attribute(trace_name) - cls._register_attribute(store_dir) cls._register_attribute(sub_dir) cls._register_attribute(rename) - cls._register_attribute(use_run_id) def __init__(self, ec, guid): super(Collector, self).__init__(ec, guid) @@ -94,17 +80,14 @@ class Collector(ResourceManager): self.error(msg) raise RuntimeError, msg - self._store_path = self.get("storeDir") - - if self.get("useRunId"): - self._store_path = os.path.join(self._store_path, self.ec.run_id) + self._store_path = self.ec.run_dir subdir = self.get("subDir") if subdir: - self._store_path = os.path.join(self._store_path, subdir) + self._store_path = os.path.join(self.store_path, subdir) msg = "Creating local directory at %s to store %s traces " % ( - self._store_path, trace_name) + self.store_path, trace_name) self.info(msg) try: diff --git a/src/nepi/util/netgraph.py b/src/nepi/util/netgraph.py new file mode 100644 index 00000000..c3ba8144 --- /dev/null +++ b/src/nepi/util/netgraph.py @@ -0,0 +1,286 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +import ipaddr +import networkx +import random + +class TopologyType: + LINEAR = "linear" + LADDER = "ladder" + MESH = "mesh" + TREE = "tree" + STAR = "star" + ADHOC = "adhoc" + +## TODO: +## - AQ: Add support for hypergraphs (to be able to add hyper edges to +## model CSMA or wireless networks) + +class NetGraph(object): + """ NetGraph represents a network topology. + Network graphs are internally using the networkx library. + + """ + + def __init__(self, *args, **kwargs): + """ A graph can be generated using a specified pattern + (LADDER, MESH, TREE, etc), or provided as an argument. + + :param graph: Undirected graph to use as internal representation + :type graph: networkx.Graph + + :param topo_type: One of TopologyType.{LINEAR,LADDER,MESH,TREE,STAR} + used to automatically generate the topology graph. + :type topo_type: TopologyType + + :param node_count: Number of nodes in the topology to be generated. + :type node_count: int + + :param branches: Number of branches (arms) for the STAR topology. + :type branches: int + + + :param assign_ips: Automatically assign IP addresses to each node. + :type assign_ips: bool + + :param network: Base network segment for IP address assignment. + :type network: str + + :param prefix: Base network prefix for IP address assignment. + :type prefix: int + + :param version: IP version for IP address assignment. + :type version: int + + + :param assign_st: Select source and target nodes on the graph. + :type assign_st: bool + + NOTE: Only point-to-point like network topologies are supported for now. + (Wireless and Ethernet networks were several nodes share the same + edge (hyperedge) can not be modeled for the moment). + + """ + self._graph = kwargs.get("graph") + self._topo_type = TopologyType.ADHOC + + if not self._graph and kwargs.get("topo_type") and \ + kwargs.get("node_count"): + topo_type = kwargs["topo_type"] + node_count = kwargs["node_count"] + branches = kwargs.get("branches") + + self._topo_type = topo_type + self._graph = self.generate_grap(topo_type, node_count, + branches = branches) + + if kwargs.get("assign_ips"): + network = kwargs.get("network", "10.0.0.0") + prefix = kwargs.get("prefix", 8) + version = kwargs.get("version", 4) + + self.assign_p2p_ips(self, network = network, prefix = prefix, + version = version) + + if kwargs.get("assign_st"): + self.select_target_zero() + self.select_random_leaf_source() + + @property + def graph(self): + return self._graph + + @property + def topo_type(self): + return self._topo_type + + @property + def order(self): + return self.graph.order() + + @property + def nodes(self): + return self.graph.nodes() + + @property + def edges(self): + return self.graph.edges() + + def generate_graph(self, topo_type, node_count, branches = None): + if topo_type == LADDER: + total_nodes = node_count/2 + graph = networkx.ladder_graph(total_nodes) + + elif topo_type == LINEAR: + graph = networkx.path_graph(node_count) + + elif topo_type == MESH: + graph = networkx.complete_graph(node_count) + + elif topo_type == TREE: + h = math.log(node_count + 1)/math.log(2) - 1 + graph = networkx.balanced_tree(2, h) + + elif topo_type == STAR: + graph = networkx.Graph() + graph.add_node(0) + + nodesinbranch = (node_count - 1)/ BRANCHES + c = 1 + + for i in xrange(BRANCHES): + prev = 0 + for n in xrange(1, nodesinbranch + 1): + graph.add_node(c) + graph.add_edge(prev, c) + prev = c + c += 1 + + # node ids are int, make them str + g = networkx.Graph() + g.add_nodes_from(map(lambda nid: NODES[str(nid)], + graph.nodes())) + g.add_edges_from(map(lambda t: (NODES[str(t[0])], NODES[str(t[1])]), + graph.edges())) + + return g + + def add_node(self, nid): + nid = str(nid) + + if nid not in self.graph: + self.graph.add_node(nid) + + def add_edge(self, nid1, nid2): + nid1 = str(nid1) + nid2 = str(nid2) + + self.add_node(nid1) + self.add_node( nid2) + + if nid1 not in self.graph[nid2]: + self.graph.add_edge(nid2, nid1) + + # The weight of the edge is the delay of the link + self.graph.edge[nid1][nid2]["weight"] = None + # confidence interval of the mean RTT + self.graph.edge[nid1][nid2]["weight_ci"] = None + + def assign_p2p_ips(self, network = "10.0.0.0", prefix = 8, version = 4): + """ Assign IP addresses to each end of each edge of the network graph, + computing all the point to point subnets and addresses in the network + representation. + + :param network: Base network address used for subnetting. + :type network: str + + :param prefix: Prefix for the base network address used for subnetting. + :type prefixt: int + + :param version: IP version (either 4 or 6). + :type version: int + + """ + if len(networkx.connected_components(self.graph)) > 1: + raise RuntimeError("Disconnected graph!!") + + # Assign IP addresses to host + netblock = "%s/%d" % (network, prefix) + if version == 4: + net = ipaddr.IPv4Network(netblock) + new_prefix = 31 + elif version == 6: + net = ipaddr.IPv6Network(netblock) + new_prefix = 31 + else: + raise RuntimeError, "Invalid IP version %d" % version + + sub_itr = net.iter_subnets(new_prefix = new_prefix) + + for nid1, nid2 in self.graph.edges(): + #### Compute subnets for each link + + # get a subnet of base_add with prefix /30 + subnet = sub_itr.next() + mask = subnet.netmask.exploded + network = subnet.network.exploded + prefixlen = subnet.prefixlen + + # get host addresses in that subnet + i = subnet.iterhosts() + addr1 = i.next() + addr2 = i.next() + + ip1 = addr1.exploded + ip2 = addr2.exploded + self.graph.edge[nid1][nid2]["net"] = dict() + self.graph.edge[nid1][nid2]["net"][nid1] = ip1 + self.graph.edge[nid1][nid2]["net"][nid2] = ip2 + self.graph.edge[nid1][nid2]["net"]["mask"] = mask + self.graph.edge[nid1][nid2]["net"]["network"] = mask + self.graph.edge[nid1][nid2]["net"]["prefix"] = prefixlen + + def get_p2p_info(self, nid1, nid2): + net = self.graph.edge[nid1][nid2]["net"] + return ( net[nid1], net[nid2], net["mask"], net["network"], + net["prefixlen"] ) + + def set_source(self, nid): + graph.node[nid]["source"] = True + + def set_target(self, nid): + graph.node[nid]["target"] = True + + def targets(self): + """ Returns the nodes that are targets """ + return [nid for nid in self.graph.nodes() \ + if self.graph.node[nid].get("target")] + + def sources(self): + """ Returns the nodes that are sources """ + return [nid for nid in self.graph.nodes() \ + if self.graph.node[nid].get("sources")] + + def select_target_zero(self): + """ Marks the node 0 as target + """ + self.set_target("0") + + def select_random_leaf_source(self): + """ Marks a random leaf node as source. + """ + + # The ladder is a special case because is not symmetric. + if self.topo_type == TopologyType.LADDER: + total_nodes = self.order/2 + leaf1 = str(total_nodes - 1) + leaf2 = str(nodes - 1) + leaves = [leaf1, leaf2] + source = leaves.pop(random.randint(0, len(leaves) - 1)) + else: + # options must not be already sources or targets + options = [ k for k,v in graph.degree().iteritems() \ + if v == 1 and not graph.node[k].get("source") \ + and not graph.node[k].get("target")] + + source = options.pop(random.randint(0, len(options) - 1)) + + self.set_source(source) + diff --git a/src/nepi/util/parsers/xml_parser.py b/src/nepi/util/parsers/xml_parser.py index d5bc8a3f..125360e3 100644 --- a/src/nepi/util/parsers/xml_parser.py +++ b/src/nepi/util/parsers/xml_parser.py @@ -91,6 +91,8 @@ class ECXMLParser(object): ecnode.setAttribute("exp_id", xmlencode(ec.exp_id)) ecnode.setAttribute("run_id", xmlencode(ec.run_id)) ecnode.setAttribute("nthreads", xmlencode(ec.nthreads)) + ecnode.setAttribute("local_dir", xmlencode(ec.local_dir)) + ecnode.setAttribute("exp_dir", xmlencode(ec.exp_dir)) doc.appendChild(ecnode) for guid, rm in ec._resources.iteritems(): @@ -188,10 +190,11 @@ class ECXMLParser(object): if ecnode.nodeType == doc.ELEMENT_NODE: exp_id = xmldecode(ecnode.getAttribute("exp_id")) run_id = xmldecode(ecnode.getAttribute("run_id")) + local_dir = xmldecode(ecnode.getAttribute("local_dir")) nthreads = xmldecode(ecnode.getAttribute("nthreads")) os.environ["NEPI_NTHREADS"] = nthreads - ec = ExperimentController(exp_id = exp_id) + ec = ExperimentController(exp_id = exp_id, local_dir = local_dir) connections = set() diff --git a/test/resources/linux/multirun.py b/test/resources/linux/multirun.py index d548bd2c..dbee7df8 100755 --- a/test/resources/linux/multirun.py +++ b/test/resources/linux/multirun.py @@ -49,7 +49,8 @@ class LinuxMultiRunTestCase(unittest.TestCase): dirpath = tempfile.mkdtemp() - ec = ExperimentController(exp_id="test-condition-multirun") + ec = ExperimentController(exp_id = "test-condition-multirun", + local_dir = dirpath) node = ec.register_resource("LinuxNode") ec.set(node, "hostname", host) @@ -63,8 +64,6 @@ class LinuxMultiRunTestCase(unittest.TestCase): collector = ec.register_resource("Collector") ec.set(collector, "traceName", "stdout") - ec.set(collector, "storeDir", dirpath) - ec.set(collector, "useRunId", True) ec.register_connection(ping, collector) def compute_metric_callback(ping, ec, run): @@ -87,8 +86,9 @@ class LinuxMultiRunTestCase(unittest.TestCase): self.assertTrue(runs >= 5) dircount = 0 - for d in os.listdir(dirpath): - path = os.path.join(dirpath, d) + + for d in os.listdir(ec.exp_dir): + path = os.path.join(ec.exp_dir, d) if os.path.isdir(path): dircount += 1 logs = glob.glob(os.path.join(path, "*.stdout"))