Commiting improvements to Collector. Local_dir added to ExperimentController
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 8 Aug 2014 17:36:34 +0000 (19:36 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 8 Aug 2014 17:36:34 +0000 (19:36 +0200)
examples/ccn_flooding/flooding.py [new file with mode: 0644]
examples/ccn_flooding/repoFile1.0.8.2 [new file with mode: 0644]
examples/linux/ccn/ccncat_extended_ring_topo.py
examples/linux/ccn/two_nodes_file_retrieval.py
examples/planetlab/ccn/two_nodes_file_retrieval.py
src/nepi/execution/ec.py
src/nepi/execution/runner.py
src/nepi/resources/all/collector.py
src/nepi/util/netgraph.py [new file with mode: 0644]
src/nepi/util/parsers/xml_parser.py
test/resources/linux/multirun.py

diff --git a/examples/ccn_flooding/flooding.py b/examples/ccn_flooding/flooding.py
new file mode 100644 (file)
index 0000000..c35d7ef
--- /dev/null
@@ -0,0 +1,216 @@
+#!/usr/bin/env python
+
+###############################################################################
+#
+#    NEPI, a framework to manage network experiments
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+from nepi.execution.ec import ExperimentController 
+from nepi.execution.resource import ResourceState, ResourceAction
+from nepi.util.netgraph import NetGraph, TopologyType
+
+import os
+import tempfile
+
+from dminer import ccn
+
+PL_NODES = dict({
+    "0": "iraplab1.iralab.uni-karlsruhe.de",
+    "1": "planetlab1.informatik.uni-goettingen.de",
+    "2": "dfn-ple1.x-win.dfn.de",
+    "3": "mars.planetlab.haw-hamburg.de",
+    "4": "planetlab2.unineuchatel.ch", 
+    "5": "planetlab-node3.it-sudparis.eu",
+    "6": "planetlab2.extern.kuleuven.be",
+    "7": "node2pl.planet-lab.telecom-lille1.eu",
+    "8": "planetvs2.informatik.uni-stuttgart.de",
+    "9": "planetlab1.informatik.uni-wuerzburg.de",
+    "10": "planet1.l3s.uni-hannover.de",
+    "11": "planetlab1.wiwi.hu-berlin.de",
+    "12": "pl2.uni-rostock.de", 
+    "13": "planetlab1.u-strasbg.fr",
+    "14": "peeramidion.irisa.fr"
+    })
+
+pl_slice = os.environ.get("PL_SLICE")
+pl_user = os.environ.get("PL_USER")
+pl_password = os.environ.get("PL_PASS")
+pl_ssh_key = os.environ.get("PL_SSHKEY")
+
+content_name = "ccnx:/test/bunny.ts"
+
+pipeline = 4 # Default value for ccncat
+
+operating_system = "f14"
+
+country = "germany"
+
+repofile = os.path.join(
+        os.path.dirname(os.path.realpath(__file__)),
+            "..", "repoFile1.0.8.2")
+
+def add_collector(ec, trace_name, store_dir, sub_dir, rename = None):
+    collector = ec.register_resource("Collector")
+    ec.set(collector, "traceName", trace_name)
+    ec.set(collector, "storeDir", store_dir)
+    ec.set(collector, "subDir", sub_dir)
+    if rename:
+        ec.set(collector, "rename", rename)
+
+    return collector
+
+def add_node(ec, n):
+    hostname = PL_NODES[n]
+
+    node = ec.register_resource("PlanetlabNode")
+    ec.set(node, "hostname", hostname)
+    ec.set(node, "username", username)
+    ec.set(node, "identity", identity)
+    ec.set(node, "pluser", pl_user)
+    ec.set(node, "plpassword", pl_password)
+    #ec.set(node, "country", country)
+    #ec.set(node, "operatingSystem", operating_system)
+    ec.set(node, "cleanExperiment", True)
+    ec.set(node, "cleanProcesses", True)
+
+    return node
+
+def add_ccnd(ec, node, n):
+    global PL_NODES
+
+    ccnd = ec.register_resource("LinuxCCND")
+    ec.set(ccnd, "debug", 7)
+    ec.register_connection(ccnd, node)
+
+    # collector for ccnd trace
+    hostname = PL_NODES[n]
+    collector = add_collector(ec, "stderr", hostname, "log")
+    ec.register_connection(collector, ccnd)
+
+    PL_NODES[n] = (hostname, node, ccnd)
+    return ccnd
+
+def add_ccnr(ec, ccnd):
+    ccnr = ec.register_resource("LinuxCCNR")
+
+    ec.set(ccnr, "repoFile1", repofile)
+    ec.register_connection(ccnr, ccnd)
+
+    return ccnr
+
+def add_fib_entry(ec, n1, n2):
+    (hostname1, node1, ccnd1) = PL_NODES[n1] 
+    (hostname2, node2, ccnd2) = PL_NODES[n2]
+
+    entry = ec.register_resource("LinuxFIBEntry")
+    ec.set(entry, "host", peer_host)
+
+    ec.register_connection(entry, ccnd1)
+
+    ec.enable_trace(entry, "ping")
+    collector = add_collector(ec, "ping", hostname2)
+    ec.register_connection(collector, entry)
+
+    return entry
+
+def add_ccncat(ec, ccnd):
+    ccncat = ec.register_resource("LinuxCCNCat")
+    ec.set(ccncat, "pipeline", pipeline)
+    ec.set(ccncat, "contentName", content_name)
+    ec.register_connection(ccncat, ccnd)
+
+    return ccncat
+
+def compute_metric_callback(ec, run):
+    ## Process logs and analyse data
+    try:
+        graph = ccn.parse_ccndlogs(graph = graph, 
+                parse_ping_logs = True)
+    except:
+        print "Skipping: Error parsing ccnd logs", run_dir
+        raise
+
+    source = ccn.consumers(graph)[0]
+    target = ccn.producers(graph)[0]
+
+    # Process the data from the ccnd logs, but do not re compute
+    # the link delay. 
+    try:
+        (content_names,
+            interest_expiry_count,
+            interest_dupnonce_count,
+            interest_count,
+            content_count) = ccn.process_ccn_data(graph, source)
+    except:
+        print "Skipping: Error processing ccn data", run_dir
+        raise
+
+    # Compute the shortest path
+    shortest_path = ccn.shortest_path(graph, source, target)
+
+    # Compute the load coefficient
+    lcoeff = ccn.load_coefficient(graph, shortest_path, content_names)
+
+    return lcoeff
+             
+if __name__ == '__main__':
+    
+    #### Generate a LADDER network topology 
+    net_graph = NetGraph(topo_type = TopologyType.LADDER, 
+            node_count = 6, 
+            assign_st = True, 
+            assign_ips = True)
+   
+    target = net_graph.targets()[0]
+    source = net_graph.sources()[0]
+    
+    wait_guids = []
+
+    #### Create NEPI Experiment Description (EC)
+    ec = ExperimentController(exp_id)
+
+    ### Add CCN nodes to the (EC)
+    for n in graph.nodes():
+        node = add_node(ec, n)
+        ccnd = add_ccnd(ec, node, n)
+        
+        if n == target:
+            ccnr = add_ccnr(ec, ccnd)
+
+        ## Add content retrival application
+        if n == source:
+            ccncat = add_ccncat(ec, ccnd)
+            wait_guids.append(ccncat)
+
+    #### Add connections between CCN nodes
+    for n1, n2 in graph.edges():
+        add_fib_entry(ec, n1, n2)
+        add_fib_entry(ec, n2, n1)
+
+    #### Define the callback to compute experiment metric
+    metric_callback = functools.partial(compute_metric_callback, ping)
+
+    #### Run experiment until metric convergence
+    rnr = ExperimentRunner()
+
+    runs = rnr.run(ec, min_runs = 10, max_runs = 300 
+            compute_metric_callback = metric_callback,
+            wait_guids = wait_guids,
+            wait_time = 0)
+
diff --git a/examples/ccn_flooding/repoFile1.0.8.2 b/examples/ccn_flooding/repoFile1.0.8.2
new file mode 100644 (file)
index 0000000..a90b0d9
Binary files /dev/null and b/examples/ccn_flooding/repoFile1.0.8.2 differ
index a67075b..af823a2 100644 (file)
@@ -92,10 +92,9 @@ def add_stream(ec, ccnd, content_name):
 
     return app
 
-def add_collector(ec, trace_name, store_dir):
+def add_collector(ec, trace_name):
     collector = ec.register_resource("Collector")
     ec.set(collector, "traceName", trace_name)
-    ec.set(collector, "storeDir", store_dir)
 
     return collector
 
@@ -136,7 +135,7 @@ if __name__ == '__main__':
     
     ( pl_user, movie, exp_id, pl_ssh_key, results_dir ) = get_options()
 
-    ec = ExperimentController(exp_id = exp_id)
+    ec = ExperimentController(exp_id = exp_id, local_dir = results_dir)
 
     # hosts in the US
     #host1 = "planetlab4.wail.wisc.edu"
@@ -214,7 +213,7 @@ if __name__ == '__main__':
             app, ResourceState.STARTED, time = "10s")
 
     # Register a collector to automatically collect traces
-    collector = add_collector(ec, "stderr", results_dir)
+    collector = add_collector(ec, "stderr")
     for ccnd in ccnds.values():
         ec.register_connection(collector, ccnd)
 
index 8e40c3b..10814ae 100644 (file)
@@ -34,8 +34,10 @@ import os
 ssh_key = ####### <<< ASSING the absolute path to the private SSH key to login into the remote host >>>
 ssh_user = ####### <<< ASSING the SSH username >>>
 
+results_dir = "/tmp/demo_CCN_results"
+
 ## Create the experiment controller
-ec = ExperimentController(exp_id = "demo_CCN")
+ec = ExperimentController(exp_id = "demo_CCN", local_dir = results_dir)
 
 ## Register node 1
 node1 = ec.register_resource("LinuxNode")
@@ -113,16 +115,13 @@ ec.register_connection(app, ccnd2)
 
 # Register a collector to automatically collect the ccnd logs
 # to a local directory
-results_dir = "/tmp/demo_CCN_results"
 col1 = ec.register_resource("Collector")
 ec.set(col1, "traceName", "stderr")
-ec.set(col1, "storeDir", results_dir)
 ec.set(col1, "subDir", hostname1)
 ec.register_connection(col1, ccnd1)
 
 col2 = ec.register_resource("Collector")
 ec.set(col2, "traceName", "stderr")
-ec.set(col2, "storeDir", results_dir)
 ec.set(col2, "subDir", hostname2)
 ec.register_connection(col2, ccnd2)
 
index 096268f..57c5808 100644 (file)
@@ -36,8 +36,11 @@ pl_pass = ######## <<< ASSIGN the password used to login to the PlanetLab websit
 pl_ssh_key = ####### <<< ASSING the absolute path to the private SSH key used for Planetlab >>>
 slicename = ####### <<< ASSING the PlanetLab slicename >>>
 
+results_dir = "/tmp/demo_CCN_results"
+
 ## Create the experiment controller
-ec = ExperimentController(exp_id = "demo_CCN")
+ec = ExperimentController(exp_id = "demo_CCN", 
+        local_dir = results_dir)
 
 ## Register node 1
 node1 = ec.register_resource("PlanetlabNode")
@@ -137,16 +140,13 @@ ec.register_connection(app, ccnd2)
 
 # Register a collector to automatically collect the ccnd logs
 # to a local directory
-results_dir = "/tmp/demo_CCN_results"
 col1 = ec.register_resource("Collector")
 ec.set(col1, "traceName", "stderr")
-ec.set(col1, "storeDir", results_dir)
 ec.set(col1, "subDir", hostname1)
 ec.register_connection(col1, ccnd1)
 
 col2 = ec.register_resource("Collector")
 ec.set(col2, "traceName", "stderr")
-ec.set(col2, "storeDir", results_dir)
 ec.set(col2, "subDir", hostname2)
 ec.register_connection(col2, ccnd2)
 
index 2c6557f..1357e4f 100644 (file)
@@ -34,6 +34,7 @@ import functools
 import logging
 import os
 import sys
+import tempfile
 import time
 import threading
 import weakref
@@ -150,7 +151,7 @@ class ExperimentController(object):
     exp_id, which can be re-used in different ExperimentController,
     and the run_id, which is unique to one ExperimentController instance, and
     is automatically generated by NEPI.
-        
+   
     """
 
     @classmethod
@@ -159,7 +160,22 @@ class ExperimentController(object):
         ec = serializer.load(filepath)
         return ec
 
-    def __init__(self, exp_id = None): 
+    def __init__(self, exp_id = None, local_dir = None, persist = False):
+        """ ExperimentController entity to model an execute a network experiment.
+        
+        :param exp_id: Human readable name to identify the experiment
+        :type name: str
+
+        :param local_dir: Path to local directory where to store experiment
+            related files
+        :type name: str
+
+        :param persist: Save an XML description of the experiment after 
+        completion at local_dir
+        :type name: bool
+
+        """
+
         super(ExperimentController, self).__init__()
 
         # Logging
@@ -177,6 +193,17 @@ class ExperimentController(object):
         # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
+        # Local path where to store experiment related files (results, etc)
+        if not local_dir:
+            local_dir = tempfile.mkdtemp()
+
+        self._local_dir = local_dir
+        self._exp_dir = os.path.join(local_dir, self.exp_id)
+        self._run_dir = os.path.join(self.exp_dir, self.run_id)
+
+        # If True persist the experiment controller in XML format, after completion
+        self._persist = persist
+
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
@@ -261,7 +288,36 @@ class ExperimentController(object):
         """
         return self._nthreads
 
+    @property
+    def local_dir(self):
+        """ Root local directory for experiment files
+
+        """
+        return self._local_dir
+
+    @property
+    def exp_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment.
+
+        """
+        return self._exp_dir
+
+    @property
+    def run_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment run.
+
+        """
+        return self._run_dir
+
+    @property
+    def persist(self):
+        """ If Trie persist the ExperimentController to XML format upon completion
+
+        """
+        return self._persist
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -942,6 +998,9 @@ class ExperimentController(object):
 
         self.wait_released(guids)
 
+        if self.persist:
+            self.save(dirpath = self.run_dir)
+
         for guid in guids:
             if self.get(guid, "hardRelease"):
                 self.remove_resource(guid)
index 60e75a4..75e766f 100644 (file)
@@ -40,26 +40,21 @@ class ExperimentRunner(object):
         """ Re-runs a same experiment multiple times
 
         :param ec: Experiment description of experiment to run
-        :type name: ExperimentController
-        :rtype: EperimentController
+        :type ec: ExperimentController
 
         :param min_runs: Minimum number of repetitions for experiment
-        :type name: int
-        :rtype: int
+        :type min_runs: int
 
         :param max_runs: Maximum number of repetitions for experiment
-        :type name: int
-        :rtype: int
+        :type max_runs: int
 
         :param wait_time: Time to wait in seconds between invoking
             ec.deploy() and ec.release()
-        :type name: float
-        :rtype: float
+        :type wait_time: float
 
         :param wait_guids: List of guids to pass to ec.wait_finished
             after invoking ec.deploy()
-        :type name: list 
-        :rtype: list of int
+        :type wait_guids: list 
 
         :param compute_metric_callback: function to invoke after each 
             experiment run, to compute an experiment metric. 
@@ -68,8 +63,7 @@ class ExperimentRunner(object):
 
                 metric = compute_metric_callback(ec, run)
             
-        :type name: function 
-        :rtype: function
+        :type compute_metric_callback: function 
 
         :param evaluate_convergence_callback: function to evaluate whether the 
             collected metric samples have converged and the experiment runner
@@ -81,8 +75,7 @@ class ExperimentRunner(object):
 
             If stop is True, then the runner will exit.
             
-        :type name: function 
-        :rtype: function
+        :type evaluate_convergence_callback: function 
 
         """
 
@@ -96,11 +89,8 @@ class ExperimentRunner(object):
                     "Experiment will stop when the standard error with 95% "
                     "confidence interval is >= 5% of the mean of the collected samples ")
         
-        # Set useRunId = True in Collectors to make sure results are
-        # independently stored.
-        collectors = ec.get_resources_by_type("Collector")
-        for collector in collectors:
-            collector.set("useRunId", True)
+        # Force persistence of experiment controller
+        ec._persist = True
 
         dirpath = tempfile.mkdtemp()
         filepath = ec.save(dirpath)
index af0811c..792e25f 100644 (file)
@@ -51,18 +51,6 @@ class Collector(ResourceManager):
                 "Name of the trace to be collected", 
                 flags = Flags.Design)
 
-        store_dir = Attribute("storeDir", 
-                "Path to local directory to store trace results", 
-                default = tempfile.gettempdir(),
-                flags = Flags.Design)
-
-        use_run_id = Attribute("useRunId", 
-                "If set to True stores traces into a sub directory named after "
-                "the RUN ID assigned by the EC", 
-                type = Types.Bool,
-                default = False,
-                flags = Flags.Design)
-
         sub_dir = Attribute("subDir", 
                 "Sub directory to collect traces into", 
                 flags = Flags.Design)
@@ -72,10 +60,8 @@ class Collector(ResourceManager):
                 flags = Flags.Design)
 
         cls._register_attribute(trace_name)
-        cls._register_attribute(store_dir)
         cls._register_attribute(sub_dir)
         cls._register_attribute(rename)
-        cls._register_attribute(use_run_id)
 
     def __init__(self, ec, guid):
         super(Collector, self).__init__(ec, guid)
@@ -94,17 +80,14 @@ class Collector(ResourceManager):
             self.error(msg)
             raise RuntimeError, msg
 
-        self._store_path = self.get("storeDir")
-
-        if self.get("useRunId"):
-            self._store_path = os.path.join(self._store_path, self.ec.run_id)
+        self._store_path = self.ec.run_dir
 
         subdir = self.get("subDir")
         if subdir:
-            self._store_path = os.path.join(self._store_path, subdir)
+            self._store_path = os.path.join(self.store_path, subdir)
         
         msg = "Creating local directory at %s to store %s traces " % (
-            self._store_path, trace_name)
+                self.store_path, trace_name)
         self.info(msg)
 
         try:
diff --git a/src/nepi/util/netgraph.py b/src/nepi/util/netgraph.py
new file mode 100644 (file)
index 0000000..c3ba814
--- /dev/null
@@ -0,0 +1,286 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import ipaddr
+import networkx
+import random
+
+class TopologyType:
+    LINEAR = "linear"
+    LADDER = "ladder"
+    MESH = "mesh"
+    TREE = "tree"
+    STAR = "star"
+    ADHOC = "adhoc"
+
+## TODO: 
+##      - AQ: Add support for hypergraphs (to be able to add hyper edges to 
+##        model CSMA or wireless networks)
+
+class NetGraph(object):
+    """ NetGraph represents a network topology. 
+    Network graphs are internally using the networkx library.
+
+    """
+
+    def __init__(self, *args, **kwargs):
+        """ A graph can be generated using a specified pattern 
+        (LADDER, MESH, TREE, etc), or provided as an argument.
+
+            :param graph: Undirected graph to use as internal representation 
+            :type graph: networkx.Graph
+
+            :param topo_type: One of TopologyType.{LINEAR,LADDER,MESH,TREE,STAR}
+            used to automatically generate the topology graph. 
+            :type topo_type: TopologyType
+
+            :param node_count: Number of nodes in the topology to be generated. 
+            :type node_count: int
+
+            :param branches: Number of branches (arms) for the STAR topology. 
+            :type branches: int
+
+
+            :param assign_ips: Automatically assign IP addresses to each node. 
+            :type assign_ips: bool
+
+            :param network: Base network segment for IP address assignment.
+            :type network: str
+
+            :param prefix: Base network prefix for IP address assignment.
+            :type prefix: int
+
+            :param version: IP version for IP address assignment.
+            :type version: int
+
+
+            :param assign_st: Select source and target nodes on the graph.
+            :type assign_st: bool
+
+        NOTE: Only point-to-point like network topologies are supported for now.
+                (Wireless and Ethernet networks were several nodes share the same
+                edge (hyperedge) can not be modeled for the moment).
+
+        """
+        self._graph = kwargs.get("graph") 
+        self._topo_type = TopologyType.ADHOC
+
+        if not self._graph and kwargs.get("topo_type") and \
+                kwargs.get("node_count"):
+            topo_type = kwargs["topo_type"]
+            node_count = kwargs["node_count"]
+            branches = kwargs.get("branches")
+
+            self._topo_type = topo_type
+            self._graph = self.generate_grap(topo_type, node_count, 
+                    branches = branches)
+
+        if kwargs.get("assign_ips"):
+            network = kwargs.get("network", "10.0.0.0")
+            prefix = kwargs.get("prefix", 8)
+            version = kwargs.get("version", 4)
+
+            self.assign_p2p_ips(self, network = network, prefix = prefix, 
+                    version = version)
+
+        if kwargs.get("assign_st"):
+            self.select_target_zero()
+            self.select_random_leaf_source()
+
+    @property
+    def graph(self):
+        return self._graph
+
+    @property
+    def topo_type(self):
+        return self._topo_type
+
+    @property
+    def order(self):
+        return self.graph.order()
+
+    @property
+    def nodes(self):
+        return self.graph.nodes()
+
+    @property
+    def edges(self):
+        return self.graph.edges()
+
+    def generate_graph(self, topo_type, node_count, branches = None):
+        if topo_type == LADDER:
+            total_nodes = node_count/2
+            graph = networkx.ladder_graph(total_nodes)
+
+        elif topo_type == LINEAR:
+            graph = networkx.path_graph(node_count)
+
+        elif topo_type == MESH:
+            graph = networkx.complete_graph(node_count)
+
+        elif topo_type == TREE:
+            h = math.log(node_count + 1)/math.log(2) - 1
+            graph = networkx.balanced_tree(2, h)
+
+        elif topo_type == STAR:
+            graph = networkx.Graph()
+            graph.add_node(0)
+
+            nodesinbranch = (node_count - 1)/ BRANCHES
+            c = 1
+
+            for i in xrange(BRANCHES):
+                prev = 0
+                for n in xrange(1, nodesinbranch + 1):
+                    graph.add_node(c)
+                    graph.add_edge(prev, c)
+                    prev = c
+                    c += 1
+
+        # node ids are int, make them str
+        g = networkx.Graph()
+        g.add_nodes_from(map(lambda nid: NODES[str(nid)], 
+            graph.nodes()))
+        g.add_edges_from(map(lambda t: (NODES[str(t[0])], NODES[str(t[1])]), 
+            graph.edges()))
+
+        return g
+
+    def add_node(self, nid):
+        nid = str(nid)
+
+        if nid not in self.graph:
+            self.graph.add_node(nid)
+
+    def add_edge(self, nid1, nid2):
+        nid1 = str(nid1)
+        nid2 = str(nid2)
+
+        self.add_node(nid1)
+        self.add_node( nid2)
+
+        if nid1 not in self.graph[nid2]:
+            self.graph.add_edge(nid2, nid1)
+
+            # The weight of the edge is the delay of the link
+            self.graph.edge[nid1][nid2]["weight"] = None
+            # confidence interval of the mean RTT
+            self.graph.edge[nid1][nid2]["weight_ci"] = None
+
+    def assign_p2p_ips(self, network = "10.0.0.0", prefix = 8, version = 4):
+        """ Assign IP addresses to each end of each edge of the network graph,
+        computing all the point to point subnets and addresses in the network
+        representation.
+
+            :param network: Base network address used for subnetting. 
+            :type network: str
+
+            :param prefix: Prefix for the base network address used for subnetting.
+            :type prefixt: int
+
+            :param version: IP version (either 4 or 6).
+            :type version: int
+
+        """
+        if len(networkx.connected_components(self.graph)) > 1:
+            raise RuntimeError("Disconnected graph!!")
+
+        # Assign IP addresses to host
+        netblock = "%s/%d" % (network, prefix)
+        if version == 4:
+            net = ipaddr.IPv4Network(netblock)
+            new_prefix = 31
+        elif version == 6:
+            net = ipaddr.IPv6Network(netblock)
+            new_prefix = 31
+        else:
+            raise RuntimeError, "Invalid IP version %d" % version
+
+        sub_itr = net.iter_subnets(new_prefix = new_prefix)
+
+        for nid1, nid2 in self.graph.edges():
+            #### Compute subnets for each link
+            
+            # get a subnet of base_add with prefix /30
+            subnet = sub_itr.next()
+            mask = subnet.netmask.exploded
+            network = subnet.network.exploded
+            prefixlen = subnet.prefixlen
+
+            # get host addresses in that subnet
+            i = subnet.iterhosts()
+            addr1 = i.next()
+            addr2 = i.next()
+
+            ip1 = addr1.exploded
+            ip2 = addr2.exploded
+            self.graph.edge[nid1][nid2]["net"] = dict()
+            self.graph.edge[nid1][nid2]["net"][nid1] = ip1
+            self.graph.edge[nid1][nid2]["net"][nid2] = ip2
+            self.graph.edge[nid1][nid2]["net"]["mask"] = mask
+            self.graph.edge[nid1][nid2]["net"]["network"] = mask
+            self.graph.edge[nid1][nid2]["net"]["prefix"] = prefixlen
+
+    def get_p2p_info(self, nid1, nid2):
+        net = self.graph.edge[nid1][nid2]["net"]
+        return ( net[nid1], net[nid2], net["mask"], net["network"], 
+                net["prefixlen"] )
+
+    def set_source(self, nid):
+        graph.node[nid]["source"] = True
+
+    def set_target(self, nid):
+        graph.node[nid]["target"] = True
+
+    def targets(self):
+        """ Returns the nodes that are targets """
+        return [nid for nid in self.graph.nodes() \
+                if self.graph.node[nid].get("target")]
+
+    def sources(self):
+        """ Returns the nodes that are sources """
+        return [nid for nid in self.graph.nodes() \
+                if self.graph.node[nid].get("sources")]
+
+    def select_target_zero(self):
+        """ Marks the node 0 as target
+        """
+        self.set_target("0")
+
+    def select_random_leaf_source(self):
+        """  Marks a random leaf node as source. 
+        """
+
+        # The ladder is a special case because is not symmetric.
+        if self.topo_type == TopologyType.LADDER:
+            total_nodes = self.order/2
+            leaf1 = str(total_nodes - 1)
+            leaf2 = str(nodes - 1)
+            leaves = [leaf1, leaf2]
+            source = leaves.pop(random.randint(0, len(leaves) - 1))
+        else:
+            # options must not be already sources or targets
+            options = [ k for k,v in graph.degree().iteritems() \
+                    if v == 1 and not graph.node[k].get("source") \
+                        and not graph.node[k].get("target")]
+
+            source = options.pop(random.randint(0, len(options) - 1))
+        
+        self.set_source(source)
+
index d5bc8a3..125360e 100644 (file)
@@ -91,6 +91,8 @@ class ECXMLParser(object):
         ecnode.setAttribute("exp_id", xmlencode(ec.exp_id))
         ecnode.setAttribute("run_id", xmlencode(ec.run_id))
         ecnode.setAttribute("nthreads", xmlencode(ec.nthreads))
+        ecnode.setAttribute("local_dir", xmlencode(ec.local_dir))
+        ecnode.setAttribute("exp_dir", xmlencode(ec.exp_dir))
         doc.appendChild(ecnode)
 
         for guid, rm in ec._resources.iteritems():
@@ -188,10 +190,11 @@ class ECXMLParser(object):
             if ecnode.nodeType == doc.ELEMENT_NODE:
                 exp_id = xmldecode(ecnode.getAttribute("exp_id"))
                 run_id = xmldecode(ecnode.getAttribute("run_id"))
+                local_dir = xmldecode(ecnode.getAttribute("local_dir"))
                 nthreads = xmldecode(ecnode.getAttribute("nthreads"))
             
                 os.environ["NEPI_NTHREADS"] = nthreads
-                ec = ExperimentController(exp_id = exp_id)
+                ec = ExperimentController(exp_id = exp_id, local_dir = local_dir)
 
                 connections = set()
 
index d548bd2..dbee7df 100755 (executable)
@@ -49,7 +49,8 @@ class LinuxMultiRunTestCase(unittest.TestCase):
 
         dirpath = tempfile.mkdtemp()
 
-        ec = ExperimentController(exp_id="test-condition-multirun")
+        ec = ExperimentController(exp_id = "test-condition-multirun", 
+                local_dir = dirpath)
         
         node = ec.register_resource("LinuxNode")
         ec.set(node, "hostname", host)
@@ -63,8 +64,6 @@ class LinuxMultiRunTestCase(unittest.TestCase):
 
         collector = ec.register_resource("Collector")
         ec.set(collector, "traceName", "stdout")
-        ec.set(collector, "storeDir", dirpath)
-        ec.set(collector, "useRunId", True)
         ec.register_connection(ping, collector)
 
         def compute_metric_callback(ping, ec, run):
@@ -87,8 +86,9 @@ class LinuxMultiRunTestCase(unittest.TestCase):
         self.assertTrue(runs >= 5)
 
         dircount = 0
-        for d in os.listdir(dirpath):
-            path = os.path.join(dirpath, d)
+
+        for d in os.listdir(ec.exp_dir):
+            path = os.path.join(ec.exp_dir, d)
             if os.path.isdir(path):
                 dircount += 1
                 logs = glob.glob(os.path.join(path, "*.stdout"))