--- /dev/null
+#!/usr/bin/env python
+
+###############################################################################
+#
+# NEPI, a framework to manage network experiments
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+from nepi.execution.ec import ExperimentController
+from nepi.execution.resource import ResourceState, ResourceAction
+from nepi.execution.runner import ExperimentRunner
+from nepi.util.netgraph import NetGraph, TopologyType
+
+import os
+import tempfile
+
+from dminer import ccn
+
+# Node id (as used by the NetGraph topology) -> PlanetLab hostname
+PL_NODES = dict({
+    "0": "iraplab1.iralab.uni-karlsruhe.de",
+    "1": "planetlab1.informatik.uni-goettingen.de",
+    "2": "dfn-ple1.x-win.dfn.de",
+    "3": "mars.planetlab.haw-hamburg.de",
+    "4": "planetlab2.unineuchatel.ch",
+    "5": "planetlab-node3.it-sudparis.eu",
+    "6": "planetlab2.extern.kuleuven.be",
+    "7": "node2pl.planet-lab.telecom-lille1.eu",
+    "8": "planetvs2.informatik.uni-stuttgart.de",
+    "9": "planetlab1.informatik.uni-wuerzburg.de",
+    "10": "planet1.l3s.uni-hannover.de",
+    "11": "planetlab1.wiwi.hu-berlin.de",
+    "12": "pl2.uni-rostock.de",
+    "13": "planetlab1.u-strasbg.fr",
+    "14": "peeramidion.irisa.fr"
+    })
+
+# PlanetLab credentials are taken from the environment
+pl_slice = os.environ.get("PL_SLICE")
+pl_user = os.environ.get("PL_USER")
+pl_password = os.environ.get("PL_PASS")
+pl_ssh_key = os.environ.get("PL_SSHKEY")
+
+# CCN name of the content retrieved during the experiment
+content_name = "ccnx:/test/bunny.ts"
+
+pipeline = 4 # Default value for ccncat
+
+# Node filters (currently unused: the corresponding ec.set calls in
+# add_node are commented out)
+operating_system = "f14"
+
+country = "germany"
+
+# Prebuilt CCN repository file shipped next to this script
+repofile = os.path.join(
+    os.path.dirname(os.path.realpath(__file__)),
+    "..", "repoFile1.0.8.2")
+
+def add_collector(ec, trace_name, store_dir, sub_dir = None, rename = None):
+    """ Register a Collector RM that gathers the trace named trace_name
+    into store_dir (optionally under sub_dir, optionally renamed).
+
+    sub_dir is now optional: some call sites in this script (e.g. the
+    ping trace in add_fib_entry) only pass three arguments, which was a
+    TypeError with the previous required parameter.
+    """
+    collector = ec.register_resource("Collector")
+    ec.set(collector, "traceName", trace_name)
+    ec.set(collector, "storeDir", store_dir)
+    # Only set the optional attributes when actually provided
+    if sub_dir:
+        ec.set(collector, "subDir", sub_dir)
+    if rename:
+        ec.set(collector, "rename", rename)
+
+    return collector
+
+def add_node(ec, n):
+    """ Register a PlanetlabNode RM for node id n, using the PlanetLab
+    credentials taken from the environment (PL_SLICE, PL_SSHKEY,
+    PL_USER, PL_PASS).
+    """
+    hostname = PL_NODES[n]
+
+    node = ec.register_resource("PlanetlabNode")
+    ec.set(node, "hostname", hostname)
+    # (was: username / identity, both undefined - NameError. The slice
+    # name is the SSH login on PlanetLab nodes and pl_ssh_key the SSH
+    # identity file - TODO confirm against the PlanetlabNode RM)
+    ec.set(node, "username", pl_slice)
+    ec.set(node, "identity", pl_ssh_key)
+    ec.set(node, "pluser", pl_user)
+    ec.set(node, "plpassword", pl_password)
+    #ec.set(node, "country", country)
+    #ec.set(node, "operatingSystem", operating_system)
+    ec.set(node, "cleanExperiment", True)
+    ec.set(node, "cleanProcesses", True)
+
+    return node
+
+def add_ccnd(ec, node, n):
+    """ Register a CCN daemon (LinuxCCND) on node, attach a collector
+    for its stderr trace, and replace PL_NODES[n] with the
+    (hostname, node, ccnd) tuple used later by add_fib_entry.
+    """
+    global PL_NODES
+
+    # enable ccnd debug logging (level 7) so the logs can be analysed
+    ccnd = ec.register_resource("LinuxCCND")
+    ec.set(ccnd, "debug", 7)
+    ec.register_connection(ccnd, node)
+
+    # collector for ccnd trace
+    hostname = PL_NODES[n]
+    # NOTE(review): add_collector's signature is (ec, trace_name,
+    # store_dir, sub_dir, ...); here hostname lands in store_dir and
+    # "log" in sub_dir - the two arguments look swapped; confirm the
+    # intended on-disk layout.
+    collector = add_collector(ec, "stderr", hostname, "log")
+    ec.register_connection(collector, ccnd)
+
+    # PL_NODES[n] mutates from plain hostname to a tuple here
+    PL_NODES[n] = (hostname, node, ccnd)
+    return ccnd
+
+def add_ccnr(ec, ccnd):
+    """ Register a CCN repository (LinuxCCNR) serving the module-level
+    repofile, connected to the given ccnd daemon.
+    """
+    ccnr = ec.register_resource("LinuxCCNR")
+
+    # NOTE(review): attribute name "repoFile1" looks odd - verify it
+    # matches the attribute registered by the LinuxCCNR RM.
+    ec.set(ccnr, "repoFile1", repofile)
+    ec.register_connection(ccnr, ccnd)
+
+    return ccnr
+
+def add_fib_entry(ec, n1, n2):
+    """ Register a FIB entry on n1's ccnd pointing at peer node n2,
+    with the entry's ping trace collected under n2's hostname.
+    """
+    (hostname1, node1, ccnd1) = PL_NODES[n1]
+    (hostname2, node2, ccnd2) = PL_NODES[n2]
+
+    entry = ec.register_resource("LinuxFIBEntry")
+    # (was: peer_host, an undefined name - NameError. The entry on n1
+    # must point at the peer, so use n2's hostname - TODO confirm the
+    # LinuxFIBEntry "host" attribute accepts a hostname)
+    ec.set(entry, "host", hostname2)
+
+    ec.register_connection(entry, ccnd1)
+
+    ec.enable_trace(entry, "ping")
+    collector = add_collector(ec, "ping", hostname2)
+    ec.register_connection(collector, entry)
+
+    return entry
+
+def add_ccncat(ec, ccnd):
+    """ Register a ccncat application (LinuxCCNCat) that retrieves
+    content_name through the given ccnd daemon.
+    """
+    ccncat = ec.register_resource("LinuxCCNCat")
+    # pipeline size for ccncat (module-level default is 4)
+    ec.set(ccncat, "pipeline", pipeline)
+    ec.set(ccncat, "contentName", content_name)
+    ec.register_connection(ccncat, ccnd)
+
+    return ccncat
+
+def compute_metric_callback(ec, run):
+    """ Callback invoked by the ExperimentRunner after each run.
+    Parses the collected ccnd/ping logs and returns the load
+    coefficient metric for the run.
+    """
+    ## Process logs and analyse data
+    try:
+        # (was: graph = graph with graph undefined - NameError; the
+        # topology lives in the module-level net_graph.
+        # NOTE(review): dminer.ccn's API is not visible here - confirm
+        # parse_ccndlogs does not also need the run directory)
+        graph = ccn.parse_ccndlogs(graph = net_graph,
+                parse_ping_logs = True)
+    except:
+        # (was: run_dir, undefined - use the EC's per-run directory)
+        print "Skipping: Error parsing ccnd logs", ec.run_dir
+        raise
+
+    source = ccn.consumers(graph)[0]
+    target = ccn.producers(graph)[0]
+
+    # Process the data from the ccnd logs, but do not re compute
+    # the link delay.
+    try:
+        (content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count) = ccn.process_ccn_data(graph, source)
+    except:
+        print "Skipping: Error processing ccn data", ec.run_dir
+        raise
+
+    # Compute the shortest path
+    shortest_path = ccn.shortest_path(graph, source, target)
+
+    # Compute the load coefficient
+    lcoeff = ccn.load_coefficient(graph, shortest_path, content_names)
+
+    return lcoeff
+
+if __name__ == '__main__':
+
+    #### Generate a LADDER network topology
+    net_graph = NetGraph(topo_type = TopologyType.LADDER,
+            node_count = 6,
+            assign_st = True,
+            assign_ips = True)
+
+    target = net_graph.targets()[0]
+    source = net_graph.sources()[0]
+
+    wait_guids = []
+
+    #### Create NEPI Experiment Description (EC)
+    # (was: exp_id, an undefined name - give the experiment a fixed
+    # human readable identifier)
+    ec = ExperimentController(exp_id = "ccn_flooding")
+
+    ### Add CCN nodes to the (EC)
+    # (was: graph.nodes() - NameError; the topology object is net_graph
+    # and NetGraph exposes nodes/edges as properties, not methods)
+    for n in net_graph.nodes:
+        node = add_node(ec, n)
+        ccnd = add_ccnd(ec, node, n)
+
+        if n == target:
+            ccnr = add_ccnr(ec, ccnd)
+
+        ## Add content retrival application
+        if n == source:
+            ccncat = add_ccncat(ec, ccnd)
+            wait_guids.append(ccncat)
+
+    #### Add connections between CCN nodes
+    for n1, n2 in net_graph.edges:
+        add_fib_entry(ec, n1, n2)
+        add_fib_entry(ec, n2, n1)
+
+    #### Define the callback to compute experiment metric
+    # compute_metric_callback already has the (ec, run) signature the
+    # runner expects (was: functools.partial over an undefined "ping",
+    # with functools never imported)
+    metric_callback = compute_metric_callback
+
+    #### Run experiment until metric convergence
+    rnr = ExperimentRunner()
+
+    # (a comma was missing after max_runs = 300 - SyntaxError)
+    runs = rnr.run(ec, min_runs = 10, max_runs = 300,
+            compute_metric_callback = metric_callback,
+            wait_guids = wait_guids,
+            wait_time = 0)
+
return app
-def add_collector(ec, trace_name, store_dir):
+def add_collector(ec, trace_name):
collector = ec.register_resource("Collector")
ec.set(collector, "traceName", trace_name)
- ec.set(collector, "storeDir", store_dir)
return collector
( pl_user, movie, exp_id, pl_ssh_key, results_dir ) = get_options()
- ec = ExperimentController(exp_id = exp_id)
+ ec = ExperimentController(exp_id = exp_id, local_dir = results_dir)
# hosts in the US
#host1 = "planetlab4.wail.wisc.edu"
app, ResourceState.STARTED, time = "10s")
# Register a collector to automatically collect traces
- collector = add_collector(ec, "stderr", results_dir)
+ collector = add_collector(ec, "stderr")
for ccnd in ccnds.values():
ec.register_connection(collector, ccnd)
ssh_key = ####### <<< ASSING the absolute path to the private SSH key to login into the remote host >>>
ssh_user = ####### <<< ASSING the SSH username >>>
+results_dir = "/tmp/demo_CCN_results"
+
## Create the experiment controller
-ec = ExperimentController(exp_id = "demo_CCN")
+ec = ExperimentController(exp_id = "demo_CCN", local_dir = results_dir)
## Register node 1
node1 = ec.register_resource("LinuxNode")
# Register a collector to automatically collect the ccnd logs
# to a local directory
-results_dir = "/tmp/demo_CCN_results"
col1 = ec.register_resource("Collector")
ec.set(col1, "traceName", "stderr")
-ec.set(col1, "storeDir", results_dir)
ec.set(col1, "subDir", hostname1)
ec.register_connection(col1, ccnd1)
col2 = ec.register_resource("Collector")
ec.set(col2, "traceName", "stderr")
-ec.set(col2, "storeDir", results_dir)
ec.set(col2, "subDir", hostname2)
ec.register_connection(col2, ccnd2)
pl_ssh_key = ####### <<< ASSING the absolute path to the private SSH key used for Planetlab >>>
slicename = ####### <<< ASSING the PlanetLab slicename >>>
+results_dir = "/tmp/demo_CCN_results"
+
## Create the experiment controller
-ec = ExperimentController(exp_id = "demo_CCN")
+ec = ExperimentController(exp_id = "demo_CCN",
+ local_dir = results_dir)
## Register node 1
node1 = ec.register_resource("PlanetlabNode")
# Register a collector to automatically collect the ccnd logs
# to a local directory
-results_dir = "/tmp/demo_CCN_results"
col1 = ec.register_resource("Collector")
ec.set(col1, "traceName", "stderr")
-ec.set(col1, "storeDir", results_dir)
ec.set(col1, "subDir", hostname1)
ec.register_connection(col1, ccnd1)
col2 = ec.register_resource("Collector")
ec.set(col2, "traceName", "stderr")
-ec.set(col2, "storeDir", results_dir)
ec.set(col2, "subDir", hostname2)
ec.register_connection(col2, ccnd2)
import logging
import os
import sys
+import tempfile
import time
import threading
import weakref
exp_id, which can be re-used in different ExperimentController,
and the run_id, which is unique to one ExperimentController instance, and
is automatically generated by NEPI.
-
+
"""
@classmethod
ec = serializer.load(filepath)
return ec
- def __init__(self, exp_id = None):
+ def __init__(self, exp_id = None, local_dir = None, persist = False):
+        """ ExperimentController entity to model and execute a network experiment.
+
+        :param exp_id: Human readable name to identify the experiment
+        :type exp_id: str
+
+        :param local_dir: Path to local directory where to store experiment
+            related files
+        :type local_dir: str
+
+        :param persist: Save an XML description of the experiment after
+            completion at local_dir
+        :type persist: bool
+
+ """
+
super(ExperimentController, self).__init__()
# Logging
# resources used, etc)
self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
+ # Local path where to store experiment related files (results, etc)
+ if not local_dir:
+ local_dir = tempfile.mkdtemp()
+
+ self._local_dir = local_dir
+ self._exp_dir = os.path.join(local_dir, self.exp_id)
+ self._run_dir = os.path.join(self.exp_dir, self.run_id)
+
+ # If True persist the experiment controller in XML format, after completion
+ self._persist = persist
+
# generator of globally unique ids
self._guid_generator = guid.GuidGenerator()
"""
return self._nthreads
-
+ @property
+ def local_dir(self):
+ """ Root local directory for experiment files
+
+ """
+ return self._local_dir
+
+ @property
+ def exp_dir(self):
+ """ Local directory to store results and other files related to the
+ experiment.
+
+ """
+ return self._exp_dir
+
+ @property
+ def run_dir(self):
+ """ Local directory to store results and other files related to the
+ experiment run.
+
+ """
+ return self._run_dir
+
+ @property
+ def persist(self):
+        """ If True, persist the ExperimentController to XML format upon completion
+
+ """
+ return self._persist
+
@property
def abort(self):
""" Returns True if the experiment has failed and should be interrupted,
self.wait_released(guids)
+ if self.persist:
+ self.save(dirpath = self.run_dir)
+
for guid in guids:
if self.get(guid, "hardRelease"):
self.remove_resource(guid)
""" Re-runs a same experiment multiple times
:param ec: Experiment description of experiment to run
- :type name: ExperimentController
- :rtype: EperimentController
+ :type ec: ExperimentController
:param min_runs: Minimum number of repetitions for experiment
- :type name: int
- :rtype: int
+ :type min_runs: int
:param max_runs: Maximum number of repetitions for experiment
- :type name: int
- :rtype: int
+ :type max_runs: int
:param wait_time: Time to wait in seconds between invoking
ec.deploy() and ec.release()
- :type name: float
- :rtype: float
+ :type wait_time: float
:param wait_guids: List of guids to pass to ec.wait_finished
after invoking ec.deploy()
- :type name: list
- :rtype: list of int
+ :type wait_guids: list
:param compute_metric_callback: function to invoke after each
experiment run, to compute an experiment metric.
metric = compute_metric_callback(ec, run)
- :type name: function
- :rtype: function
+ :type compute_metric_callback: function
:param evaluate_convergence_callback: function to evaluate whether the
collected metric samples have converged and the experiment runner
If stop is True, then the runner will exit.
- :type name: function
- :rtype: function
+ :type evaluate_convergence_callback: function
"""
"Experiment will stop when the standard error with 95% "
"confidence interval is >= 5% of the mean of the collected samples ")
- # Set useRunId = True in Collectors to make sure results are
- # independently stored.
- collectors = ec.get_resources_by_type("Collector")
- for collector in collectors:
- collector.set("useRunId", True)
+ # Force persistence of experiment controller
+ ec._persist = True
dirpath = tempfile.mkdtemp()
filepath = ec.save(dirpath)
"Name of the trace to be collected",
flags = Flags.Design)
- store_dir = Attribute("storeDir",
- "Path to local directory to store trace results",
- default = tempfile.gettempdir(),
- flags = Flags.Design)
-
- use_run_id = Attribute("useRunId",
- "If set to True stores traces into a sub directory named after "
- "the RUN ID assigned by the EC",
- type = Types.Bool,
- default = False,
- flags = Flags.Design)
-
sub_dir = Attribute("subDir",
"Sub directory to collect traces into",
flags = Flags.Design)
flags = Flags.Design)
cls._register_attribute(trace_name)
- cls._register_attribute(store_dir)
cls._register_attribute(sub_dir)
cls._register_attribute(rename)
- cls._register_attribute(use_run_id)
def __init__(self, ec, guid):
super(Collector, self).__init__(ec, guid)
self.error(msg)
raise RuntimeError, msg
- self._store_path = self.get("storeDir")
-
- if self.get("useRunId"):
- self._store_path = os.path.join(self._store_path, self.ec.run_id)
+ self._store_path = self.ec.run_dir
subdir = self.get("subDir")
if subdir:
- self._store_path = os.path.join(self._store_path, subdir)
+ self._store_path = os.path.join(self.store_path, subdir)
msg = "Creating local directory at %s to store %s traces " % (
- self._store_path, trace_name)
+ self.store_path, trace_name)
self.info(msg)
try:
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import ipaddr
+import networkx
+import random
+
+class TopologyType:
+    """ Enumeration of the topology patterns NetGraph can generate
+    (ADHOC denotes a user-provided graph).
+    """
+    LINEAR = "linear"
+    LADDER = "ladder"
+    MESH = "mesh"
+    TREE = "tree"
+    STAR = "star"
+    ADHOC = "adhoc"
+
+## TODO:
+## - AQ: Add support for hypergraphs (to be able to add hyper edges to
+## model CSMA or wireless networks)
+
+class NetGraph(object):
+    """ NetGraph represents a network topology.
+    Network graphs are internally using the networkx library.
+
+    """
+
+    def __init__(self, *args, **kwargs):
+        """ A graph can be generated using a specified pattern
+        (LADDER, MESH, TREE, etc), or provided as an argument.
+
+        :param graph: Undirected graph to use as internal representation
+        :type graph: networkx.Graph
+
+        :param topo_type: One of TopologyType.{LINEAR,LADDER,MESH,TREE,STAR}
+            used to automatically generate the topology graph.
+        :type topo_type: TopologyType
+
+        :param node_count: Number of nodes in the topology to be generated.
+        :type node_count: int
+
+        :param branches: Number of branches (arms) for the STAR topology.
+        :type branches: int
+
+        :param assign_ips: Automatically assign IP addresses to each node.
+        :type assign_ips: bool
+
+        :param network: Base network segment for IP address assignment.
+        :type network: str
+
+        :param prefix: Base network prefix for IP address assignment.
+        :type prefix: int
+
+        :param version: IP version for IP address assignment.
+        :type version: int
+
+        :param assign_st: Select source and target nodes on the graph.
+        :type assign_st: bool
+
+        NOTE: Only point-to-point like network topologies are supported for now.
+        (Wireless and Ethernet networks where several nodes share the same
+        edge (hyperedge) can not be modeled for the moment).
+
+        """
+        self._graph = kwargs.get("graph")
+        self._topo_type = TopologyType.ADHOC
+
+        if not self._graph and kwargs.get("topo_type") and \
+                kwargs.get("node_count"):
+            topo_type = kwargs["topo_type"]
+            node_count = kwargs["node_count"]
+            branches = kwargs.get("branches")
+
+            self._topo_type = topo_type
+            # (was: self.generate_grap - AttributeError from typo)
+            self._graph = self.generate_graph(topo_type, node_count,
+                    branches = branches)
+
+        if kwargs.get("assign_ips"):
+            network = kwargs.get("network", "10.0.0.0")
+            prefix = kwargs.get("prefix", 8)
+            version = kwargs.get("version", 4)
+
+            # (was: self.assign_p2p_ips(self, ...) - self passed twice)
+            self.assign_p2p_ips(network = network, prefix = prefix,
+                    version = version)
+
+        if kwargs.get("assign_st"):
+            self.select_target_zero()
+            self.select_random_leaf_source()
+
+    @property
+    def graph(self):
+        # Internal networkx.Graph representation
+        return self._graph
+
+    @property
+    def topo_type(self):
+        # TopologyType used to generate the graph (ADHOC if user-provided)
+        return self._topo_type
+
+    @property
+    def order(self):
+        # Number of nodes in the graph
+        return self.graph.order()
+
+    @property
+    def nodes(self):
+        return self.graph.nodes()
+
+    @property
+    def edges(self):
+        return self.graph.edges()
+
+    def generate_graph(self, topo_type, node_count, branches = None):
+        """ Generate a networkx graph with node_count nodes following
+        the given TopologyType pattern. Node ids are converted to str.
+        """
+        # (was: bare LADDER/LINEAR/... names - NameErrors; the constants
+        # live on TopologyType)
+        if topo_type == TopologyType.LADDER:
+            total_nodes = node_count/2
+            graph = networkx.ladder_graph(total_nodes)
+
+        elif topo_type == TopologyType.LINEAR:
+            graph = networkx.path_graph(node_count)
+
+        elif topo_type == TopologyType.MESH:
+            graph = networkx.complete_graph(node_count)
+
+        elif topo_type == TopologyType.TREE:
+            # Height of a complete binary tree with node_count nodes,
+            # computed with integer arithmetic (was a float expression,
+            # which networkx.balanced_tree does not accept)
+            h = (node_count + 1).bit_length() - 2
+            graph = networkx.balanced_tree(2, h)
+
+        elif topo_type == TopologyType.STAR:
+            graph = networkx.Graph()
+            graph.add_node(0)
+
+            # (was: BRANCHES, an undefined name - use the branches arg)
+            nodesinbranch = (node_count - 1)/ branches
+            c = 1
+
+            for i in xrange(branches):
+                prev = 0
+                for n in xrange(1, nodesinbranch + 1):
+                    graph.add_node(c)
+                    graph.add_edge(prev, c)
+                    prev = c
+                    c += 1
+
+        # node ids are int, make them str
+        # (was mapping through an undefined NODES dict)
+        g = networkx.Graph()
+        g.add_nodes_from(map(lambda nid: str(nid), graph.nodes()))
+        g.add_edges_from(map(lambda t: (str(t[0]), str(t[1])),
+            graph.edges()))
+
+        return g
+
+    def add_node(self, nid):
+        """ Add node nid (coerced to str) if not already present """
+        nid = str(nid)
+
+        if nid not in self.graph:
+            self.graph.add_node(nid)
+
+    def add_edge(self, nid1, nid2):
+        """ Add an undirected edge nid1-nid2, creating missing nodes """
+        nid1 = str(nid1)
+        nid2 = str(nid2)
+
+        self.add_node(nid1)
+        self.add_node(nid2)
+
+        if nid1 not in self.graph[nid2]:
+            self.graph.add_edge(nid2, nid1)
+
+            # The weight of the edge is the delay of the link
+            self.graph.edge[nid1][nid2]["weight"] = None
+            # confidence interval of the mean RTT
+            self.graph.edge[nid1][nid2]["weight_ci"] = None
+
+    def assign_p2p_ips(self, network = "10.0.0.0", prefix = 8, version = 4):
+        """ Assign IP addresses to each end of each edge of the network graph,
+        computing all the point to point subnets and addresses in the network
+        representation.
+
+        :param network: Base network address used for subnetting.
+        :type network: str
+
+        :param prefix: Prefix for the base network address used for subnetting.
+        :type prefix: int
+
+        :param version: IP version (either 4 or 6).
+        :type version: int
+
+        """
+        if len(networkx.connected_components(self.graph)) > 1:
+            raise RuntimeError("Disconnected graph!!")
+
+        # Assign IP addresses to host
+        netblock = "%s/%d" % (network, prefix)
+        if version == 4:
+            net = ipaddr.IPv4Network(netblock)
+            new_prefix = 31
+        elif version == 6:
+            net = ipaddr.IPv6Network(netblock)
+            # NOTE(review): /31 mirrors the IPv4 case; point-to-point
+            # IPv6 links conventionally use /126 or /127 - confirm.
+            new_prefix = 31
+        else:
+            raise RuntimeError, "Invalid IP version %d" % version
+
+        sub_itr = net.iter_subnets(new_prefix = new_prefix)
+
+        for nid1, nid2 in self.graph.edges():
+            #### Compute subnets for each link
+
+            # get a point-to-point subnet of the base address
+            subnet = sub_itr.next()
+            mask = subnet.netmask.exploded
+            network = subnet.network.exploded
+            prefixlen = subnet.prefixlen
+
+            # get host addresses in that subnet
+            i = subnet.iterhosts()
+            addr1 = i.next()
+            addr2 = i.next()
+
+            ip1 = addr1.exploded
+            ip2 = addr2.exploded
+            self.graph.edge[nid1][nid2]["net"] = dict()
+            self.graph.edge[nid1][nid2]["net"][nid1] = ip1
+            self.graph.edge[nid1][nid2]["net"][nid2] = ip2
+            self.graph.edge[nid1][nid2]["net"]["mask"] = mask
+            # (was assigned mask by mistake)
+            self.graph.edge[nid1][nid2]["net"]["network"] = network
+            # key renamed to match get_p2p_info() (was "prefix")
+            self.graph.edge[nid1][nid2]["net"]["prefixlen"] = prefixlen
+
+    def get_p2p_info(self, nid1, nid2):
+        """ Return (ip1, ip2, mask, network, prefixlen) for edge nid1-nid2 """
+        net = self.graph.edge[nid1][nid2]["net"]
+        return ( net[nid1], net[nid2], net["mask"], net["network"],
+                net["prefixlen"] )
+
+    def set_source(self, nid):
+        # (was: graph.node - NameError; operate on self.graph)
+        self.graph.node[nid]["source"] = True
+
+    def set_target(self, nid):
+        # (was: graph.node - NameError; operate on self.graph)
+        self.graph.node[nid]["target"] = True
+
+    def targets(self):
+        """ Returns the nodes that are targets """
+        return [nid for nid in self.graph.nodes() \
+                if self.graph.node[nid].get("target")]
+
+    def sources(self):
+        """ Returns the nodes that are sources """
+        # (was: get("sources") - set_source stores the "source" key)
+        return [nid for nid in self.graph.nodes() \
+                if self.graph.node[nid].get("source")]
+
+    def select_target_zero(self):
+        """ Marks the node 0 as target
+        """
+        self.set_target("0")
+
+    def select_random_leaf_source(self):
+        """ Marks a random leaf node as source.
+        """
+
+        # The ladder is a special case because is not symmetric.
+        if self.topo_type == TopologyType.LADDER:
+            total_nodes = self.order/2
+            leaf1 = str(total_nodes - 1)
+            # (was: str(nodes - 1) - NameError; the last node id is
+            # order - 1)
+            leaf2 = str(self.order - 1)
+            leaves = [leaf1, leaf2]
+            source = leaves.pop(random.randint(0, len(leaves) - 1))
+        else:
+            # options must not be already sources or targets
+            # (was: bare graph - NameError; use self.graph)
+            options = [ k for k,v in self.graph.degree().iteritems() \
+                if v == 1 and not self.graph.node[k].get("source") \
+                and not self.graph.node[k].get("target")]
+
+            source = options.pop(random.randint(0, len(options) - 1))
+
+        self.set_source(source)
+
ecnode.setAttribute("exp_id", xmlencode(ec.exp_id))
ecnode.setAttribute("run_id", xmlencode(ec.run_id))
ecnode.setAttribute("nthreads", xmlencode(ec.nthreads))
+ ecnode.setAttribute("local_dir", xmlencode(ec.local_dir))
+ ecnode.setAttribute("exp_dir", xmlencode(ec.exp_dir))
doc.appendChild(ecnode)
for guid, rm in ec._resources.iteritems():
if ecnode.nodeType == doc.ELEMENT_NODE:
exp_id = xmldecode(ecnode.getAttribute("exp_id"))
run_id = xmldecode(ecnode.getAttribute("run_id"))
+ local_dir = xmldecode(ecnode.getAttribute("local_dir"))
nthreads = xmldecode(ecnode.getAttribute("nthreads"))
os.environ["NEPI_NTHREADS"] = nthreads
- ec = ExperimentController(exp_id = exp_id)
+ ec = ExperimentController(exp_id = exp_id, local_dir = local_dir)
connections = set()
dirpath = tempfile.mkdtemp()
- ec = ExperimentController(exp_id="test-condition-multirun")
+ ec = ExperimentController(exp_id = "test-condition-multirun",
+ local_dir = dirpath)
node = ec.register_resource("LinuxNode")
ec.set(node, "hostname", host)
collector = ec.register_resource("Collector")
ec.set(collector, "traceName", "stdout")
- ec.set(collector, "storeDir", dirpath)
- ec.set(collector, "useRunId", True)
ec.register_connection(ping, collector)
def compute_metric_callback(ping, ec, run):
self.assertTrue(runs >= 5)
dircount = 0
- for d in os.listdir(dirpath):
- path = os.path.join(dirpath, d)
+
+ for d in os.listdir(ec.exp_dir):
+ path = os.path.join(ec.exp_dir, d)
if os.path.isdir(path):
dircount += 1
logs = glob.glob(os.path.join(path, "*.stdout"))