Merging nepi-3.1-icn into nepi-3-dev
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 21 Aug 2014 22:12:05 +0000 (00:12 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 21 Aug 2014 22:12:05 +0000 (00:12 +0200)
28 files changed:
examples/ccn_emu_live/ccn_flooding/dce.py [new file with mode: 0644]
examples/ccn_emu_live/ccn_flooding/planetlab.py [new file with mode: 0644]
examples/ccn_emu_live/ccn_flooding/repoFile1.0.8.2 [new file with mode: 0644]
examples/linux/ccn/ccncat_extended_ring_topo.py
examples/linux/ccn/two_nodes_file_retrieval.py
examples/planetlab/ccn/two_nodes_file_retrieval.py
src/nepi/data/__init__.py [new file with mode: 0644]
src/nepi/data/processing/__init__.py [new file with mode: 0644]
src/nepi/data/processing/ccn/__init__.py [new file with mode: 0644]
src/nepi/data/processing/ccn/parser.py [new file with mode: 0644]
src/nepi/data/processing/ping/__init__.py [new file with mode: 0644]
src/nepi/data/processing/ping/parser.py [new file with mode: 0644]
src/nepi/execution/attribute.py
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/execution/runner.py
src/nepi/resources/all/collector.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/util/guid.py
src/nepi/util/netgraph.py [new file with mode: 0644]
src/nepi/util/parsers/xml_parser.py
src/nepi/util/plotter.py
src/nepi/util/serializer.py
src/nepi/util/statfuncs.py [new file with mode: 0644]
src/nepi/util/timefuncs.py
test/resources/linux/multirun.py
test/util/serializer.py

diff --git a/examples/ccn_emu_live/ccn_flooding/dce.py b/examples/ccn_emu_live/ccn_flooding/dce.py
new file mode 100644 (file)
index 0000000..3386e79
--- /dev/null
@@ -0,0 +1,242 @@
+#!/usr/bin/env python
+
+###############################################################################
+#
+#    NEPI, a framework to manage network experiments
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+from nepi.execution.ec import ExperimentController 
+from nepi.execution.runner import ExperimentRunner
+from nepi.util.netgraph import NetGraph, TopologyType
+import nepi.data.processing.ccn.parser as ccn_parser
+
+import networkx
+import socket
+import os
+
+content_name = "ccnx:/test/bunny.ts"
+
+STOP_TIME = "5000s"
+
+repofile = os.path.join(
+        os.path.dirname(os.path.realpath(__file__)), "repoFile1.0.8.2")
+
+def get_simulator(ec):
+    simulator = ec.filter_resources("LinuxNS3Simulation")
+
+    if not simulator:
+        node = ec.register_resource("LinuxNode")
+        ec.set(node, "hostname", "localhost")
+
+        simu = ec.register_resource("LinuxNS3Simulation")
+        ec.register_connection(simu, node)
+        return simu
+
+    return simulator[0]
+
+def add_collector(ec, trace_name, subdir, newname = None):
+    collector = ec.register_resource("Collector")
+    ec.set(collector, "traceName", trace_name)
+    ec.set(collector, "subDir", subdir)
+    if newname:
+        ec.set(collector, "rename", newname)
+
+    return collector
+
+def add_dce_host(ec, nid):
+    simu = get_simulator(ec)
+    
+    host = ec.register_resource("ns3::Node")
+    ec.set(host, "enableStack", True)
+    ec.register_connection(host, simu)
+
+    # Annotate the graph
+    ec.netgraph.annotate_node(nid, "host", host)
+    
+def add_dce_ccnd(ec, nid):
+    # Retrieve annotation from netgraph
+    host = ec.netgraph.node_annotation(nid, "host")
+    
+    # Add dce ccnd to the dce node
+    ccnd = ec.register_resource("ns3::LinuxDceCCND")
+    ec.set (ccnd, "stackSize", 1<<20)
+    ec.set (ccnd, "debug", 7)
+    ec.set (ccnd, "capacity", 50000)
+    ec.set (ccnd, "StartTime", "1s")
+    ec.set (ccnd, "StopTime", STOP_TIME)
+    ec.register_connection(ccnd, host)
+
+    # Collector to retrieve ccnd log
+    collector = add_collector(ec, "stderr", nid, "log")
+    ec.register_connection(collector, ccnd)
+
+    # Annotate the graph
+    ec.netgraph.annotate_node(nid, "ccnd", ccnd)
+
+def add_dce_ccnr(ec, nid):
+    # Retrieve annotation from netgraph
+    host = ec.netgraph.node_annotation(nid, "host")
+    
+    # Add a CCN content repository to the dce node
+    ccnr = ec.register_resource("ns3::LinuxDceCCNR")
+    ec.set (ccnr, "repoFile1", repofile) 
+    ec.set (ccnr, "stackSize", 1<<20)
+    ec.set (ccnr, "StartTime", "2s")
+    ec.set (ccnr, "StopTime", STOP_TIME)
+    ec.register_connection(ccnr, host)
+
+def add_dce_ccncat(ec, nid):
+    # Retrieve annotation from netgraph
+    host = ec.netgraph.node_annotation(nid, "host")
+   
+    ccnpeek = ec.register_resource("ns3::LinuxDceCCNPeek")
+    #ec.set (ccnpeek, "contentName", "ccnx:/chunk0")
+    ec.set (ccnpeek, "contentName", content_name)
+    ec.set (ccnpeek, "stackSize", 1<<20)
+    ec.set (ccnpeek, "StartTime", "5s")
+    ec.set (ccnpeek, "StopTime", STOP_TIME)
+    ec.register_connection(ccnpeek, host)
+
+    collector = add_collector(ec, "stdout", nid, "peek")
+    ec.register_connection(collector, ccnpeek)
+
+    # Add a ccncat application to the dce host
+    ccncat = ec.register_resource("ns3::LinuxDceCCNCat")
+    ec.set (ccncat, "contentName", content_name)
+    ec.set (ccncat, "stackSize", 1<<20)
+    ec.set (ccncat, "StartTime", "8s")
+    ec.set (ccncat, "StopTime", STOP_TIME)
+    ec.register_connection(ccncat, host)
+
+def add_dce_fib_entry(ec, nid1, nid2):
+    # Retrieve annotations from netgraph
+    host1 = ec.netgraph.node_annotation(nid1, "host")
+    net = ec.netgraph.edge_net_annotation(nid1, nid2)
+    ip2 = net[nid2]
+
+    # Add FIB entry between peer hosts
+    ccndc = ec.register_resource("ns3::LinuxDceFIBEntry")
+    ec.set (ccndc, "protocol", "udp") 
+    ec.set (ccndc, "uri", "ccnx:/") 
+    ec.set (ccndc, "host", ip2)
+    ec.set (ccndc, "stackSize", 1<<20)
+    ec.set (ccndc, "StartTime", "2s")
+    ec.set (ccndc, "StopTime", STOP_TIME)
+    ec.register_connection(ccndc, host1)
+
+def add_dce_net_iface(ec, nid1, nid2):
+    # Retrieve annotations from netgraph
+    host = ec.netgraph.node_annotation(nid1, "host")
+    net = ec.netgraph.edge_net_annotation(nid1, nid2)
+    ip1 = net[nid1]
+    prefix = net["prefix"]
+
+    dev = ec.register_resource("ns3::PointToPointNetDevice")
+    ec.set(dev,"DataRate", "5Mbps")
+    ec.set(dev, "ip", ip1)
+    ec.set(dev, "prefix", prefix)
+    ec.register_connection(host, dev)
+
+    queue = ec.register_resource("ns3::DropTailQueue")
+    ec.register_connection(dev, queue)
+
+    return dev
+
+def avg_interests(ec, run):
+    ## Process logs
+    logs_dir = ec.run_dir
+
+    (graph,
+        content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count) = ccn_parser.process_content_history_logs(
+                logs_dir, 
+                ec.netgraph.topology)
+
+    shortest_path = networkx.shortest_path(graph, 
+            source = ec.netgraph.sources()[0], 
+            target = ec.netgraph.targets()[0])
+
+    ### Compute metric: Avg number of Interests seen per content name
+    ###                 normalized by the number of nodes in the shortest path
+    content_name_count = len(content_names.values())
+    nodes_in_shortest_path = len(shortest_path) - 1
+    metric = interest_count / (float(content_name_count) * float(nodes_in_shortest_path))
+
+    # TODO: DUMP RESULTS TO FILE
+    # TODO: DUMP GRAPH DELAYS!
+    f = open("/tmp/metric", "w+")
+    f.write("%.2f\n" % metric)
+    f.close()
+    print " METRIC", metric
+
+    return metric
+
+def add_dce_edge(ec, nid1, nid2):
+    ### Add network interfaces to hosts
+    p2p1 = add_dce_net_iface(ec, nid1, nid2)
+    p2p2 = add_dce_net_iface(ec, nid2, nid1)
+
+    # Create point to point link between interfaces
+    chan = ec.register_resource("ns3::PointToPointChannel")
+    ec.set(chan, "Delay", "0ms")
+
+    ec.register_connection(chan, p2p1)
+    ec.register_connection(chan, p2p2)
+
+    #### Add routing between CCN nodes
+    add_dce_fib_entry(ec, nid1, nid2)
+    add_dce_fib_entry(ec, nid2, nid1)
+
+def add_dce_node(ec, nid):
+    ### Add CCN nodes (ec.netgraph holds the topology graph)
+    add_dce_host(ec, nid)
+    add_dce_ccnd(ec, nid)
+        
+    if nid == ec.netgraph.targets()[0]:
+        add_dce_ccnr(ec, nid)
+
+    if nid == ec.netgraph.sources()[0]:
+        add_dce_ccncat(ec, nid)
+
+if __name__ == '__main__':
+
+    #### Create NEPI Experiment Description with LINEAR topology 
+    ec = ExperimentController("dce_ccn", 
+            topo_type = TopologyType.LINEAR, 
+            node_count = 4,
+            assign_st = True,
+            assign_ips = True,
+            add_node_callback = add_dce_node, 
+            add_edge_callback = add_dce_edge)
+    
+    print "Results stored at", ec.exp_dir
+
+    #### Retrieve the consumer to wait for ot to finish
+    ccncat = ec.filter_resources("ns3::LinuxDceCCNCat")
+   
+    #### Run experiment until metric convergences
+    rnr = ExperimentRunner()
+    runs = rnr.run(ec, min_runs = 1, max_runs = 1, 
+            compute_metric_callback = avg_interests,
+            wait_guids = ccncat,
+            wait_time = 0)
+
diff --git a/examples/ccn_emu_live/ccn_flooding/planetlab.py b/examples/ccn_emu_live/ccn_flooding/planetlab.py
new file mode 100644 (file)
index 0000000..766e96b
--- /dev/null
@@ -0,0 +1,222 @@
+#!/usr/bin/env python
+
+###############################################################################
+#
+#    NEPI, a framework to manage network experiments
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+from nepi.execution.ec import ExperimentController 
+from nepi.execution.runner import ExperimentRunner
+from nepi.util.netgraph import NetGraph, TopologyType
+import nepi.data.processing.ccn.parser as ccn_parser
+
+import networkx
+import socket
+import os
+
+PL_NODES = dict({
+    "0": "iraplab1.iralab.uni-karlsruhe.de",
+    "1": "planetlab1.informatik.uni-goettingen.de",
+    "2": "dfn-ple1.x-win.dfn.de",
+    "3": "mars.planetlab.haw-hamburg.de",
+    "4": "planetlab2.unineuchatel.ch", 
+    "5": "planetlab-node3.it-sudparis.eu",
+    "6": "planetlab2.extern.kuleuven.be",
+    "7": "node2pl.planet-lab.telecom-lille1.eu",
+    "8": "planetvs2.informatik.uni-stuttgart.de",
+    "9": "planetlab1.informatik.uni-wuerzburg.de",
+    "10": "planet1.l3s.uni-hannover.de",
+    "11": "planetlab1.wiwi.hu-berlin.de",
+    "12": "pl2.uni-rostock.de", 
+    "13": "planetlab1.u-strasbg.fr",
+    "14": "peeramidion.irisa.fr"
+    })
+
+pl_slice = os.environ.get("PL_SLICE")
+pl_user = os.environ.get("PL_USER")
+pl_password = os.environ.get("PL_PASS")
+pl_ssh_key = os.environ.get("PL_SSHKEY")
+
+content_name = "ccnx:/test/bunny.ts"
+
+pipeline = 4 # Default value for ccncat
+
+operating_system = "f14"
+
+country = "germany"
+
+repofile = os.path.join(
+        os.path.dirname(os.path.realpath(__file__)), "repoFile1.0.8.2")
+
+def add_collector(ec, trace_name, subdir, newname = None):
+    collector = ec.register_resource("Collector")
+    ec.set(collector, "traceName", trace_name)
+    ec.set(collector, "subDir", subdir)
+    if newname:
+        ec.set(collector, "rename", newname)
+
+    return collector
+
+def add_pl_host(ec, nid):
+    hostname = PL_NODES[nid]
+
+    # Add a planetlab host to the experiment description
+    host = ec.register_resource("PlanetlabNode")
+    ec.set(host, "hostname", hostname)
+    ec.set(host, "username", pl_slice)
+    ec.set(host, "identity", pl_ssh_key)
+    #ec.set(host, "pluser", pl_user)
+    #ec.set(host, "plpassword", pl_password)
+    #ec.set(host, "country", country)
+    #ec.set(host, "operatingSystem", operating_system)
+    ec.set(host, "cleanExperiment", True)
+    ec.set(host, "cleanProcesses", True)
+
+    # Annotate the graph
+    ec.netgraph.annotate_node(nid, "hostname", hostname)
+    ec.netgraph.annotate_node(nid, "host", host)
+    
+    # Annotate the graph node with an ip address
+    ip = socket.gethostbyname(hostname)
+    ec.netgraph.annotate_node_ip(nid, ip)
+
+def add_pl_ccnd(ec, nid):
+    # Retrieve annotation from netgraph
+    host = ec.netgraph.node_annotation(nid, "host")
+    
+    # Add a CCN daemon to the planetlab node
+    ccnd = ec.register_resource("LinuxCCND")
+    ec.set(ccnd, "debug", 7)
+    ec.register_connection(ccnd, host)
+    
+    # Collector to retrieve ccnd log
+    collector = add_collector(ec, "stderr", nid, "log")
+    ec.register_connection(collector, ccnd)
+
+    # Annotate the graph
+    ec.netgraph.annotate_node(nid, "ccnd", ccnd)
+
+def add_pl_ccnr(ec, nid):
+    # Retrieve annotation from netgraph
+    ccnd = ec.netgraph.node_annotation(nid, "ccnd")
+    
+    # Add a CCN content repository to the planetlab node
+    ccnr = ec.register_resource("LinuxCCNR")
+
+    ec.set(ccnr, "repoFile1", repofile)
+    ec.register_connection(ccnr, ccnd)
+
+def add_pl_ccncat(ec, nid):
+    # Retrieve annotation from netgraph
+    ccnd = ec.netgraph.node_annotation(nid, "ccnd")
+    
+    # Add a CCN cat application to the planetlab node
+    ccncat = ec.register_resource("LinuxCCNCat")
+    ec.set(ccncat, "pipeline", pipeline)
+    ec.set(ccncat, "contentName", content_name)
+    ec.register_connection(ccncat, ccnd)
+
+def add_pl_fib_entry(ec, nid1, nid2):
+    # Retrieve annotations from netgraph
+    ccnd1 = ec.netgraph.node_annotation(nid1, "ccnd")
+    hostname2 = ec.netgraph.node_annotation(nid2, "hostname")
+    
+    # Add a FIB entry between one planetlab node and its peer
+    entry = ec.register_resource("LinuxFIBEntry")
+    ec.set(entry, "host", hostname2)
+    ec.register_connection(entry, ccnd1)
+
+    # Collector to retrieve peering ping output (to measure neighbors delay)
+    ec.enable_trace(entry, "ping")
+    collector = add_collector(ec, "ping", nid1)
+    ec.register_connection(collector, entry)
+
+    return entry
+
+def avg_interests(ec, run):
+    ## Process logs
+    logs_dir = ec.run_dir
+
+    (graph,
+        content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count) = ccn_parser.process_content_history_logs(
+                logs_dir, 
+                ec.netgraph.topology)
+
+    shortest_path = networkx.shortest_path(graph, 
+            source = ec.netgraph.sources()[0], 
+            target = ec.netgraph.targets()[0])
+
+    ### Compute metric: Avg number of Interests seen per content name
+    ###                 normalized by the number of nodes in the shortest path
+    content_name_count = len(content_names.values())
+    nodes_in_shortest_path = len(shortest_path) - 1
+    metric = interest_count / (float(content_name_count) * float(nodes_in_shortest_path))
+
+    # TODO: DUMP RESULTS TO FILE
+    # TODO: DUMP GRAPH DELAYS!
+    f = open("/tmp/metric", "w+")
+    f.write("%.2f\n" % metric)
+    f.close()
+    print " METRIC", metric
+
+    return metric
+
+def add_pl_edge(ec, nid1, nid2):
+    #### Add connections between CCN nodes
+    add_pl_fib_entry(ec, nid1, nid2)
+    add_pl_fib_entry(ec, nid2, nid1)
+
+def add_pl_node(ec, nid):
+    ### Add CCN nodes (ec.netgraph holds the topology graph)
+    add_pl_host(ec, nid)
+    add_pl_ccnd(ec, nid)
+        
+    if nid == ec.netgraph.targets()[0]:
+        add_pl_ccnr(ec, nid)
+
+    if nid == ec.netgraph.sources()[0]:
+        add_pl_ccncat(ec, nid)
+
+if __name__ == '__main__':
+
+    #### Create NEPI Experiment Description with LINEAR topology 
+    ec = ExperimentController("pl_ccn", 
+            topo_type = TopologyType.LINEAR, 
+            node_count = 4, 
+            #assign_ips = True,
+            assign_st = True,
+            add_node_callback = add_pl_node, 
+            add_edge_callback = add_pl_edge)
+    
+    print "Results stored at", ec.exp_dir
+
+    #### Retrieve the content producing resource to wait for ot to finish
+    ccncat = ec.filter_resources("LinuxCCNCat")
+   
+    #### Run experiment until metric convergences
+    rnr = ExperimentRunner()
+    runs = rnr.run(ec, min_runs = 10, max_runs = 300, 
+            compute_metric_callback = avg_interests,
+            wait_guids = ccncat,
+            wait_time = 0)
+
diff --git a/examples/ccn_emu_live/ccn_flooding/repoFile1.0.8.2 b/examples/ccn_emu_live/ccn_flooding/repoFile1.0.8.2
new file mode 100644 (file)
index 0000000..a90b0d9
Binary files /dev/null and b/examples/ccn_emu_live/ccn_flooding/repoFile1.0.8.2 differ
index a67075b..af823a2 100644 (file)
@@ -92,10 +92,9 @@ def add_stream(ec, ccnd, content_name):
 
     return app
 
-def add_collector(ec, trace_name, store_dir):
+def add_collector(ec, trace_name):
     collector = ec.register_resource("Collector")
     ec.set(collector, "traceName", trace_name)
-    ec.set(collector, "storeDir", store_dir)
 
     return collector
 
@@ -136,7 +135,7 @@ if __name__ == '__main__':
     
     ( pl_user, movie, exp_id, pl_ssh_key, results_dir ) = get_options()
 
-    ec = ExperimentController(exp_id = exp_id)
+    ec = ExperimentController(exp_id = exp_id, local_dir = results_dir)
 
     # hosts in the US
     #host1 = "planetlab4.wail.wisc.edu"
@@ -214,7 +213,7 @@ if __name__ == '__main__':
             app, ResourceState.STARTED, time = "10s")
 
     # Register a collector to automatically collect traces
-    collector = add_collector(ec, "stderr", results_dir)
+    collector = add_collector(ec, "stderr")
     for ccnd in ccnds.values():
         ec.register_connection(collector, ccnd)
 
index 8e40c3b..10814ae 100644 (file)
@@ -34,8 +34,10 @@ import os
 ssh_key = ####### <<< ASSING the absolute path to the private SSH key to login into the remote host >>>
 ssh_user = ####### <<< ASSING the SSH username >>>
 
+results_dir = "/tmp/demo_CCN_results"
+
 ## Create the experiment controller
-ec = ExperimentController(exp_id = "demo_CCN")
+ec = ExperimentController(exp_id = "demo_CCN", local_dir = results_dir)
 
 ## Register node 1
 node1 = ec.register_resource("LinuxNode")
@@ -113,16 +115,13 @@ ec.register_connection(app, ccnd2)
 
 # Register a collector to automatically collect the ccnd logs
 # to a local directory
-results_dir = "/tmp/demo_CCN_results"
 col1 = ec.register_resource("Collector")
 ec.set(col1, "traceName", "stderr")
-ec.set(col1, "storeDir", results_dir)
 ec.set(col1, "subDir", hostname1)
 ec.register_connection(col1, ccnd1)
 
 col2 = ec.register_resource("Collector")
 ec.set(col2, "traceName", "stderr")
-ec.set(col2, "storeDir", results_dir)
 ec.set(col2, "subDir", hostname2)
 ec.register_connection(col2, ccnd2)
 
index 096268f..57c5808 100644 (file)
@@ -36,8 +36,11 @@ pl_pass = ######## <<< ASSIGN the password used to login to the PlanetLab websit
 pl_ssh_key = ####### <<< ASSING the absolute path to the private SSH key used for Planetlab >>>
 slicename = ####### <<< ASSING the PlanetLab slicename >>>
 
+results_dir = "/tmp/demo_CCN_results"
+
 ## Create the experiment controller
-ec = ExperimentController(exp_id = "demo_CCN")
+ec = ExperimentController(exp_id = "demo_CCN", 
+        local_dir = results_dir)
 
 ## Register node 1
 node1 = ec.register_resource("PlanetlabNode")
@@ -137,16 +140,13 @@ ec.register_connection(app, ccnd2)
 
 # Register a collector to automatically collect the ccnd logs
 # to a local directory
-results_dir = "/tmp/demo_CCN_results"
 col1 = ec.register_resource("Collector")
 ec.set(col1, "traceName", "stderr")
-ec.set(col1, "storeDir", results_dir)
 ec.set(col1, "subDir", hostname1)
 ec.register_connection(col1, ccnd1)
 
 col2 = ec.register_resource("Collector")
 ec.set(col2, "traceName", "stderr")
-ec.set(col2, "storeDir", results_dir)
 ec.set(col2, "subDir", hostname2)
 ec.register_connection(col2, ccnd2)
 
diff --git a/src/nepi/data/__init__.py b/src/nepi/data/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/__init__.py b/src/nepi/data/processing/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/ccn/__init__.py b/src/nepi/data/processing/ccn/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/ccn/parser.py b/src/nepi/data/processing/ccn/parser.py
new file mode 100644 (file)
index 0000000..e3c1007
--- /dev/null
@@ -0,0 +1,402 @@
+#!/usr/bin/env python
+
+###############################################################################
+#
+#    CCNX benchmark
+#    Copyright (C) 2014 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+#
+# This library contains functions to parse (CCNx) ccnd logs.
+#
+# Results from experiments must be stored in a directory
+# named with the experiment run id.
+# ccnd logs are stored in .log files in a subdirectory per node.
+# The following diagram exemplifies the experiment result directory
+# structure (nidi is the unique identifier assigned to node i):
+#
+#    run_id
+#               \   nid1
+#                        \ nid2.log
+#               \   nid2
+#                        \ nid1.log
+#               \   nid3
+#                        \ nid3.log
+#
+
+import collections
+import functools
+import networkx
+import os
+import pickle
+import tempfile
+
+from nepi.util.timefuncs import compute_delay_ms
+from nepi.util.statfuncs import compute_mean
+import nepi.data.processing.ping.parser as ping_parser
+
+def is_control(content_name):
+    return content_name.startswith("ccnx:/%C1") or \
+            content_name.startswith("ccnx:/ccnx") or \
+            content_name.startswith("ccnx:/...")
+
+
+def parse_file(filename):
+    """ Parses message information from ccnd log files
+
+        filename: path to ccndlog file
+
+    """
+
+    faces = dict()
+    sep = " "
+
+    f = open(filename, "r")
+
+    data = []
+
+    for line in f:
+        cols =  line.strip().split(sep)
+
+        # CCN_PEEK
+        # MESSAGE interest_from
+        # 1374181938.808523 ccnd[9245]: debug.4352 interest_from 6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7)
+        #
+        # MESSAGE interest_to
+        # 1374181938.812750 ccnd[9245]: debug.3502 interest_to 5 ccnx:/test/bunny.ts (39 bytes,i=2844,sim=0CDCC1D7)
+        #
+        # MESSAGE CONTENT FROM
+        # 1374181938.868682 ccnd[9245]: debug.4643 content_from 5 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
+        #
+        # MESSAGE CONTENT_TO
+        # 1374181938.868772 ccnd[9245]: debug.1619 content_to 6 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
+        #
+        # 1375596708.222304 ccnd[9758]: debug.3692 interest_expiry ccnx:/test/bunny.ts/%FD%05%1E%86%B1GS/%00%0A%F7 (44 bytes,c=0:1,i=2819,sim=49FA8048)
+
+        # External face creation
+        # 1374181452.965961 ccnd[9245]: accepted datagram client id=5 (flags=0x40012) 204.85.191.10 port 9695
+
+        if line.find("accepted datagram client") > -1:
+            face_id = (cols[5]).replace("id=",'')
+            ip = cols[7] 
+            port = cols[9]
+            faces[face_id] = (ip, port)
+            continue
+
+        # 1374181452.985296 ccnd[9245]: releasing face id 4 (slot 4)
+        if line.find("releasing face id") > -1:
+            face_id = cols[5]
+            if face_id in faces:
+                del faces[face_id]
+            continue
+
+        if len(cols) < 6:
+            continue
+
+        timestamp = cols[0]
+        message_type = cols[3]
+
+        if message_type not in ["interest_from", "interest_to", "content_from", 
+                "content_to", "interest_dupnonce", "interest_expiry"]:
+            continue
+
+        face_id = cols[4] 
+        content_name = cols[5]
+
+        # Interest Nonce ? -> 412A74-0844-0008-50AA-F6EAD4
+        nonce = ""
+        if message_type in ["interest_from", "interest_to", "interest_dupnonce"]:
+            last = cols[-1]
+            if len(last.split("-")) == 5:
+                nonce = last
+
+        try:
+            size = int((cols[6]).replace('(',''))
+        except:
+            print "interest_expiry without face id!", line
+            continue
+
+        # If no external IP address was identified for this face
+        # asume it is a local face
+        peer = "localhost"
+
+        if face_id in faces:
+            peer, port = faces[face_id]
+
+        data.append((content_name, timestamp, message_type, peer, face_id, 
+            size, nonce, line))
+
+    f.close()
+
+    return data
+
+def dump_content_history(content_history):
+    f = tempfile.NamedTemporaryFile(delete=False)
+    pickle.dump(content_history, f)
+    f.close()
+    return f.name
+
+def load_content_history(fname):
+    f = open(fname, "r")
+    content_history = pickle.load(f)
+    f.close()
+
+    os.remove(fname)
+    return content_history
+
+def annotate_cn_node(graph, nid, ips2nid, data, content_history):
+    for (content_name, timestamp, message_type, peer, face_id, 
+            size, nonce, line) in data:
+
+        # Ignore control messages for the time being
+        if is_control(content_name):
+            continue
+
+        if message_type == "interest_from" and \
+                peer == "localhost":
+            graph.node[nid]["ccn_consumer"] = True
+        elif message_type == "content_from" and \
+                peer == "localhost":
+            graph.node[nid]["ccn_producer"] = True
+
+        # Ignore local messages for the time being. 
+        # They could later be used to calculate the processing times
+        # of messages.
+        if peer == "localhost":
+            continue
+
+        # remove digest
+        if message_type in ["content_from", "content_to"]:
+            content_name = "/".join(content_name.split("/")[:-1])
+           
+        if content_name not in content_history:
+            content_history[content_name] = list()
+      
+        peernid = ips2nid[peer]
+        graph.add_edge(nid, peernid)
+
+        content_history[content_name].append((timestamp, message_type, nid, 
+            peernid, nonce, size, line))
+
+def annotate_cn_graph(logs_dir, graph, parse_ping_logs = False):
+    """ Adds CCN content history for each node in the topology graph.
+
+    """
+    # Make a copy of the graph to ensure integrity
+    graph = graph.copy()
+
+    ips2nid = dict()
+
+    for nid in graph.nodes():
+        ips = graph.node[nid]["ips"]
+        for ip in ips:
+            ips2nid[ip] = nid
+
+    # Now walk through the ccnd logs...
+    for dirpath, dnames, fnames in os.walk(logs_dir):
+        # continue if we are not at the leaf level (if there are subdirectories)
+        if dnames: 
+            continue
+        
+        # Each dirpath correspond to a different node
+        nid = os.path.basename(dirpath)
+    
+        content_history = dict()
+
+        for fname in fnames:
+            if fname.endswith(".log"):
+                filename = os.path.join(dirpath, fname)
+                data = parse_file(filename)
+                annotate_cn_node(graph, nid, ips2nid, data, content_history)
+
+        # Avoid storing everything in memory, instead dump to a file
+        # and reference the file
+        fname = dump_content_history(content_history)
+        graph.node[nid]["history"] = fname
+
+    if parse_ping_logs:
+        ping_parser.annotate_cn_graph(logs_dir, graph)
+
+    return graph
+
+def ccn_producers(graph):
+    """ Returns the nodes that are content providers """
+    return [nid for nid in graph.nodes() \
+            if graph.node[nid].get("ccn_producer")]
+
+def ccn_consumers(graph):
+    """ Returns the nodes that are content consumers """
+    return [nid for nid in graph.nodes() \
+            if graph.node[nid].get("ccn_consumer")]
+
+def process_content_history(graph):
+    """ Compute CCN message counts and aggregates content historical 
+    information in the content_names dictionary 
+    
+    """
+
+    ## Assume single source
+    source = ccn_consumers(graph)[0]
+
+    interest_expiry_count = 0
+    interest_dupnonce_count = 0
+    interest_count = 0
+    content_count = 0
+    content_names = dict()
+
+    # Collect information about exchanged messages by content name and
+    # link delay info.
+    for nid in graph.nodes():
+        # Load the data collected from the node's ccnd log
+        fname = graph.node[nid]["history"]
+        history = load_content_history(fname)
+
+        for content_name in history.keys():
+            hist = history[content_name]
+
+            for (timestamp, message_type, nid1, nid2, nonce, size, line) in hist:
+                if message_type in ["content_from", "content_to"]:
+                    # The first Interest sent will not have a version or chunk number.
+                    # The first Content sent back in reply, will end in /=00 or /%00.
+                    # Make sure to map the first Content to the first Interest.
+                    if content_name.endswith("/=00"):
+                        content_name = "/".join(content_name.split("/")[0:-2])
+
+                # Add content name to dictionary
+                if content_name not in content_names:
+                    content_names[content_name] = dict()
+                    content_names[content_name]["interest"] = dict()
+                    content_names[content_name]["content"] = list()
+
+                # Classify interests by replica
+                if message_type in ["interest_from"] and \
+                        nonce not in content_names[content_name]["interest"]:
+                    content_names[content_name]["interest"][nonce] = list()
+     
+                # Add consumer history
+                if nid == source:
+                    if message_type in ["interest_to", "content_from"]:
+                        # content name history as seen by the source
+                        if "consumer_history" not in content_names[content_name]:
+                            content_names[content_name]["consumer_history"] = list()
+
+                        content_names[content_name]["consumer_history"].append(
+                                (timestamp, message_type)) 
+
+                # Add messages per content name and cumulate totals by message type
+                if message_type == "interest_dupnonce":
+                    interest_dupnonce_count += 1
+                elif message_type == "interest_expiry":
+                    interest_expiry_count += 1
+                elif message_type == "interest_from":
+                    interest_count += 1
+                    # Append to interest history of the content name
+                    content_names[content_name]["interest"][nonce].append(
+                            (timestamp, nid2, nid1))
+                elif message_type == "content_from":
+                    content_count += 1
+                    # Append to content history of the content name
+                    content_names[content_name]["content"].append((timestamp, nid2, nid1))
+                else:
+                    continue
+            del hist
+        del history
+
+    # Compute the time elapsed between the time an interest is sent
+    # in the consumer node and when the content is received back
+    for content_name in content_names.keys():
+        # order content and interest messages by timestamp
+        content_names[content_name]["content"] = sorted(
+              content_names[content_name]["content"])
+        
+        for nonce, timestamps in content_names[content_name][
+                    "interest"].iteritems():
+              content_names[content_name]["interest"][nonce] = sorted(
+                        timestamps)
+      
+        history = sorted(content_names[content_name]["consumer_history"])
+        content_names[content_name]["consumer_history"] = history
+
+        # compute the rtt time of the message
+        rtt = None
+        waiting_content = False 
+        interest_timestamp = None
+        content_timestamp = None
+        
+        for (timestamp, message_type) in history:
+            if not waiting_content and message_type == "interest_to":
+                waiting_content = True
+                interest_timestamp = timestamp
+                continue
+
+            if waiting_content and message_type == "content_from":
+                content_timestamp = timestamp
+                break
+    
+        # If we can't determine who sent the interest, discard it
+        rtt = -1
+        if interest_timestamp and content_timestamp:
+            rtt = compute_delay_ms(content_timestamp, interest_timestamp)
+
+        content_names[content_name]["rtt"] = rtt
+        content_names[content_name]["lapse"] = (interest_timestamp, content_timestamp)
+
+    return (graph,
+        content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count)
+
+def process_content_history_logs(logs_dir, graph):
+    """ Parse CCN logs and aggregate content history information in graph.
+    Returns annotated graph and message countn and content names history.
+
+    """
+    ## Process logs and analyse data
+    try:
+        graph = annotate_cn_graph(logs_dir, graph, 
+                parse_ping_logs = True)
+    except:
+        print "Skipping: Error parsing ccnd logs", logs_dir
+        raise
+
+    source = ccn_consumers(graph)[0]
+    target = ccn_producers(graph)[0]
+
+    # Process the data from the ccnd logs, but do not re compute
+    # the link delay. 
+    try:
+        (graph,
+        content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count) = process_content_history(graph)
+    except:
+        print "Skipping: Error processing ccn data", logs_dir
+        raise
+
+    return (graph,
+            content_names,
+            interest_expiry_count,
+            interest_dupnonce_count,
+            interest_count,
+            content_count) 
diff --git a/src/nepi/data/processing/ping/__init__.py b/src/nepi/data/processing/ping/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/nepi/data/processing/ping/parser.py b/src/nepi/data/processing/ping/parser.py
new file mode 100644 (file)
index 0000000..0248c8c
--- /dev/null
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+
+###############################################################################
+#
+#    CCNX benchmark
+#    Copyright (C) 2014 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+#
+# This library contains functions to parse log files generated using ping. 
+#
+
+import collections
+import re
+import os
+
+# RE to match line starting "traceroute to"
+_rre = re.compile("\d+ bytes from ((?P<hostname>[^\s]+) )?\(?(?P<ip>[^\s]+)\)??: icmp_.eq=\d+ ttl=\d+ time=(?P<time>[^\s]+) ms")
+
+def parse_file(filename):
+    """
+        filename: path to traceroute file
+
+    """
+
+    f = open(filename, "r")
+
+    # Traceroute info
+    target_ip = None
+    target_hostname = None
+   
+    data = []
+
+    for line in f:
+        # match traceroute to ...
+        m = re.match(_rre, line)
+        if not m:
+            continue
+
+        target_ip = m.groupdict()["ip"]
+        # FIX THIS: Make sure the regular expression does not inlcude 
+        # the ')' in the ip group 
+        target_ip = target_ip.replace(")","")
+        target_hostname = m.groupdict()["hostname"]
+        time = m.groupdict()["time"]
+        data.append((target_ip, target_hostname, time))
+
+    f.close()
+
+    return data
+
+def annotate_cn_node(graph, nid1, ips2nid, data):
+    for (target_ip, target_hostname, time) in data:
+        nid2 = ips2nid[target_ip]
+
+        if "delays" not in graph.edge[nid1][nid2]:
+            graph.edge[nid1][nid2]["delays"] = []
+
+        time = float(time.replace("ms", "").replace(" ",""))
+
+        graph.edge[nid1][nid2]["delays"].append(time)
+
+def annotate_cn_graph(logs_dir, graph): 
+    """ Add delay inormation to graph using data collected using
+    ping.
+
+    """
+    ips2nid = dict()
+
+    for nid in graph.nodes():
+        ips = graph.node[nid]["ips"]
+        for ip in ips:
+            ips2nid[ip] = nid
+
+    # Walk through the ping logs...
+    for dirpath, dnames, fnames in os.walk(logs_dir):
+        # continue if we are not at the leaf level (if there are subdirectories)
+        if dnames: 
+            continue
+        
+        # Each dirpath correspond to a different host
+        nid = os.path.basename(dirpath)
+    
+        for fname in fnames:
+            if fname.endswith(".ping"):
+                filename = os.path.join(dirpath, fname)
+                data = parse_file(filename)
+                annotate_cn_node(graph, nid, ips2nid, data)
+
+    # Take as weight the most frequent value
+    for nid1, nid2 in graph.edges():
+        delays = collections.Counter(graph.edge[nid1][nid2]["delays"])
+        weight = delays.most_common(1)[0][0]
+        del graph.edge[nid1][nid2]["delays"]
+        graph.edge[nid1][nid2]["weight"] = weight
+
+    return graph
+
+
index 19fbfc2..bf0a853 100644 (file)
@@ -55,6 +55,7 @@ class Flags:
     # Attribute global is set to all resources of rtype
     Global  = 1 << 7 # 128
 
+
 class Attribute(object):
     """
     .. class:: Class Args :
index 2c6557f..2e573dd 100644 (file)
@@ -26,6 +26,7 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 from nepi.util.serializer import ECSerializer, SFormats
 from nepi.util.plotter import ECPlotter, PFormats
+from nepi.util.netgraph import NetGraph, TopologyType 
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
@@ -34,6 +35,7 @@ import functools
 import logging
 import os
 import sys
+import tempfile
 import time
 import threading
 import weakref
@@ -100,7 +102,7 @@ class ExperimentController(object):
     .. note::
 
     An experiment, or scenario, is defined by a concrete set of resources,
-    behavior, configuration and interconnection of those resources. 
+    and the behavior, configuration and interconnection of those resources. 
     The Experiment Description (ED) is a detailed representation of a
     single experiment. It contains all the necessary information to 
     allow repeating the experiment. NEPI allows to describe
@@ -115,7 +117,7 @@ class ExperimentController(object):
     recreated (and re-run) by instantiating an EC and recreating 
     the same experiment description. 
 
-    In NEPI, an experiment is represented as a graph of interconnected
+    An experiment is represented as a graph of interconnected
     resources. A resource is a generic concept in the sense that any
     component taking part of an experiment, whether physical of
     virtual, is considered a resource. A resources could be a host, 
@@ -125,10 +127,9 @@ class ExperimentController(object):
     single resource. ResourceManagers are specific to a resource
     type (i.e. An RM to control a Linux application will not be
     the same as the RM used to control a ns-3 simulation).
-    To support a new type of resource in NEPI, a new RM must be 
-    implemented. NEPI already provides a variety of
-    RMs to control basic resources, and new can be extended from
-    the existing ones.
+    To support a new type of resource, a new RM must be implemented. 
+    NEPI already provides a variety of RMs to control basic resources, 
+    and new can be extended from the existing ones.
 
     Through the EC interface the user can create ResourceManagers (RMs),
     configure them and interconnect them, to describe an experiment.
@@ -150,7 +151,7 @@ class ExperimentController(object):
     exp_id, which can be re-used in different ExperimentController,
     and the run_id, which is unique to one ExperimentController instance, and
     is automatically generated by NEPI.
-        
+   
     """
 
     @classmethod
@@ -159,7 +160,31 @@ class ExperimentController(object):
         ec = serializer.load(filepath)
         return ec
 
-    def __init__(self, exp_id = None): 
+    def __init__(self, exp_id = None, local_dir = None, persist = False,
+            add_node_callback = None, add_edge_callback = None, **kwargs):
+        """ ExperimentController entity to model an execute a network 
+        experiment.
+        
+        :param exp_id: Human readable name to identify the experiment
+        :type exp_id: str
+
+        :param local_dir: Path to local directory where to store experiment
+            related files
+        :type local_dir: str
+
+        :param persist: Save an XML description of the experiment after 
+        completion at local_dir
+        :type persist: bool
+
+        :param add_node_callback: Callback to invoke for node instantiation
+        when automatic topology creation mode is used 
+        :type add_node_callback: function
+
+        :param add_edge_callback: Callback to invoke for edge instantiation 
+        when automatic topology creation mode is used 
+        :type add_edge_callback: function
+
+        """
         super(ExperimentController, self).__init__()
 
         # Logging
@@ -177,6 +202,17 @@ class ExperimentController(object):
         # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
+        # Local path where to store experiment related files (results, etc)
+        if not local_dir:
+            local_dir = tempfile.gettempdir() # /tmp
+
+        self._local_dir = local_dir
+        self._exp_dir = os.path.join(local_dir, self.exp_id)
+        self._run_dir = os.path.join(self.exp_dir, self.run_id)
+
+        # If True persist the experiment controller in XML format, after completion
+        self._persist = persist
+
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
@@ -206,6 +242,12 @@ class ExperimentController(object):
         # EC state
         self._state = ECState.RUNNING
 
+        # Automatically construct experiment description 
+        self._netgraph = None
+        if add_node_callback or add_edge_callback or kwargs.get("topology"):
+            self._build_from_netgraph(add_node_callback, add_edge_callback, 
+                    **kwargs)
+
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
         self._nthreads = 20
@@ -261,7 +303,45 @@ class ExperimentController(object):
         """
         return self._nthreads
 
+    @property
+    def local_dir(self):
+        """ Root local directory for experiment files
+
+        """
+        return self._local_dir
+
+    @property
+    def exp_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment.
+
+        """
+        return self._exp_dir
+
+    @property
+    def run_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment run.
+
+        """
+        return self._run_dir
+
+    @property
+    def persist(self):
+        """ If True, persists the ExperimentController to XML format upon 
+        experiment completion
+
+        """
+        return self._persist
+
+    @property
+    def netgraph(self):
+        """ Return NetGraph instance if experiment description was automatically 
+        generated
+
+        """
+        return self._netgraph
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -394,8 +474,16 @@ class ExperimentController(object):
         return sec
 
     def save(self, dirpath = None, format = SFormats.XML):
+        if dirpath == None:
+            dirpath = self.run_dir
+
+        try:
+            os.makedirs(dirpath)
+        except OSError:
+            pass
+
         serializer = ECSerializer()
-        path = serializer.save(self, dirpath  = None, format = format)
+        path = serializer.save(self, dirpath, format = format)
         return path
 
     def get_task(self, tid):
@@ -422,7 +510,7 @@ class ExperimentController(object):
         return rm
 
     def get_resources_by_type(self, rtype):
-        """ Returns a registered ResourceManager by its guid
+        """ Returns the ResourceManager objects of type rtype
 
             :param rtype: Resource type
             :type rtype: string
@@ -432,7 +520,7 @@ class ExperimentController(object):
         """
         rms = []
         for guid, rm in self._resources.iteritems():
-            if rm.get_rtype() == type: 
+            if rm.get_rtype() == rtype: 
                 rms.append(rm)
         return rms
 
@@ -441,16 +529,31 @@ class ExperimentController(object):
 
     @property
     def resources(self):
-        """ Returns the set() of guids of all the ResourceManager
+        """ Returns the guids of all ResourceManagers 
 
             :return: Set of all RM guids
-            :rtype: set
+            :rtype: list
 
         """
         keys = self._resources.keys()
 
         return keys
 
+    def filter_resources(self, rtype):
+        """ Returns the guids of all ResourceManagers of type rtype
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of guids
+            
+        """
+        rms = []
+        for guid, rm in self._resources.iteritems():
+            if rm.get_rtype() == rtype: 
+                rms.append(rm.guid)
+        return rms
+
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
         
@@ -942,6 +1045,9 @@ class ExperimentController(object):
 
         self.wait_released(guids)
 
+        if self.persist:
+            self.save()
+
         for guid in guids:
             if self.get(guid, "hardRelease"):
                 self.remove_resource(guid)
@@ -1125,3 +1231,19 @@ class ExperimentController(object):
         self._cond.notify()
         self._cond.release()
 
+    def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
+            **kwargs):
+        """ Automates experiment description using a NetGraph instance.
+        """
+        self._netgraph = NetGraph(**kwargs)
+
+        if add_node_callback:
+            ### Add resources to the EC
+            for nid in self.netgraph.nodes():
+                add_node_callback(self, nid)
+
+        if add_edge_callback:
+            #### Add connections between resources
+            for nid1, nid2 in self.netgraph.edges():
+                add_edge_callback(self, nid1, nid2)
+
index c389cfe..0f75e9a 100644 (file)
@@ -113,11 +113,13 @@ def failtrap(func):
         try:
             return func(self, *args, **kwargs)
         except:
+            self.fail()
+            
             import traceback
             err = traceback.format_exc()
-            self.error(err)
-            self.debug("SETTING guid %d to state FAILED" % self.guid)
-            self.fail()
+            logger = Logger(self._rtype)
+            logger.error(err)
+            logger.error("SETTING guid %d to state FAILED" % self.guid)
             raise
     
     return wrapped
@@ -560,11 +562,14 @@ class ResourceManager(Logger):
             try:
                 self.do_release()
             except:
+                self.set_released()
+
                 import traceback
                 err = traceback.format_exc()
-                self.error(err)
-
-                self.set_released()
+                msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % (
+                        self._rtype, self.guid, err)
+                logger = Logger(self._rtype)
+                logger.debug(msg)
 
     def fail(self):
         """ Sets the RM to state FAILED.
@@ -1050,12 +1055,18 @@ class ResourceManager(Logger):
     def set_released(self, time = None):
         """ Mark ResourceManager as REALEASED """
         self.set_state(ResourceState.RELEASED, "_release_time", time)
-        self.debug("----- RELEASED ---- ")
+
+        msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid)
+        logger = Logger(self._rtype)
+        logger.debug(msg)
 
     def set_failed(self, time = None):
         """ Mark ResourceManager as FAILED """
         self.set_state(ResourceState.FAILED, "_failed_time", time)
-        self.debug("----- FAILED ---- ")
+
+        msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid)
+        logger = Logger(self._rtype)
+        logger.debug(msg)
 
     def set_discovered(self, time = None):
         """ Mark ResourceManager as DISCOVERED """
index 60e75a4..3a09ff4 100644 (file)
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.ec import ExperimentController
+from nepi.execution.ec import ExperimentController, ECState
 
 import math
 import numpy
 import os
-import tempfile
 import time
 
 class ExperimentRunner(object):
@@ -40,26 +39,21 @@ class ExperimentRunner(object):
         """ Re-runs a same experiment multiple times
 
         :param ec: Experiment description of experiment to run
-        :type name: ExperimentController
-        :rtype: EperimentController
+        :type ec: ExperimentController
 
         :param min_runs: Minimum number of repetitions for experiment
-        :type name: int
-        :rtype: int
+        :type min_runs: int
 
         :param max_runs: Maximum number of repetitions for experiment
-        :type name: int
-        :rtype: int
+        :type max_runs: int
 
         :param wait_time: Time to wait in seconds between invoking
             ec.deploy() and ec.release()
-        :type name: float
-        :rtype: float
+        :type wait_time: float
 
         :param wait_guids: List of guids to pass to ec.wait_finished
             after invoking ec.deploy()
-        :type name: list 
-        :rtype: list of int
+        :type wait_guids: list 
 
         :param compute_metric_callback: function to invoke after each 
             experiment run, to compute an experiment metric. 
@@ -68,8 +62,7 @@ class ExperimentRunner(object):
 
                 metric = compute_metric_callback(ec, run)
             
-        :type name: function 
-        :rtype: function
+        :type compute_metric_callback: function 
 
         :param evaluate_convergence_callback: function to evaluate whether the 
             collected metric samples have converged and the experiment runner
@@ -81,8 +74,7 @@ class ExperimentRunner(object):
 
             If stop is True, then the runner will exit.
             
-        :type name: function 
-        :rtype: function
+        :type evaluate_convergence_callback: function 
 
         """
 
@@ -96,27 +88,22 @@ class ExperimentRunner(object):
                     "Experiment will stop when the standard error with 95% "
                     "confidence interval is >= 5% of the mean of the collected samples ")
         
-        # Set useRunId = True in Collectors to make sure results are
-        # independently stored.
-        collectors = ec.get_resources_by_type("Collector")
-        for collector in collectors:
-            collector.set("useRunId", True)
+        # Force persistence of experiment controller
+        ec._persist = True
 
-        dirpath = tempfile.mkdtemp()
-        filepath = ec.save(dirpath)
+        filepath = ec.save(dirpath = ec.exp_dir)
 
         samples = []
         run = 0
-        while True: 
+        stop = False
+
+        while not stop: 
             run += 1
 
             ec = self.run_experiment(filepath, wait_time, wait_guids)
             
             ec.logger.info(" RUN %d \n" % run)
 
-            if run >= min_runs and max_runs > -1 and run >= max_runs :
-                break
-
             if compute_metric_callback:
                 metric = compute_metric_callback(ec, run)
                 if metric is not None:
@@ -124,7 +111,11 @@ class ExperimentRunner(object):
 
                     if run >= min_runs and evaluate_convergence_callback:
                         if evaluate_convergence_callback(ec, run, samples):
-                            break
+                            stop = True
+
+            if run >= min_runs and max_runs > -1 and run >= max_runs :
+                stop = True
+
             del ec
 
         return run
@@ -150,12 +141,15 @@ class ExperimentRunner(object):
         ec = ExperimentController.load(filepath)
 
         ec.deploy()
-
+    
         ec.wait_finished(wait_guids)
         time.sleep(wait_time)
 
         ec.release()
 
+        if ec.state == ECState.FAILED:
+            raise RuntimeError, "Experiment failed"
+
         return ec
 
 
index af0811c..6742fc8 100644 (file)
@@ -51,18 +51,6 @@ class Collector(ResourceManager):
                 "Name of the trace to be collected", 
                 flags = Flags.Design)
 
-        store_dir = Attribute("storeDir", 
-                "Path to local directory to store trace results", 
-                default = tempfile.gettempdir(),
-                flags = Flags.Design)
-
-        use_run_id = Attribute("useRunId", 
-                "If set to True stores traces into a sub directory named after "
-                "the RUN ID assigned by the EC", 
-                type = Types.Bool,
-                default = False,
-                flags = Flags.Design)
-
         sub_dir = Attribute("subDir", 
                 "Sub directory to collect traces into", 
                 flags = Flags.Design)
@@ -72,10 +60,8 @@ class Collector(ResourceManager):
                 flags = Flags.Design)
 
         cls._register_attribute(trace_name)
-        cls._register_attribute(store_dir)
         cls._register_attribute(sub_dir)
         cls._register_attribute(rename)
-        cls._register_attribute(use_run_id)
 
     def __init__(self, ec, guid):
         super(Collector, self).__init__(ec, guid)
@@ -94,17 +80,14 @@ class Collector(ResourceManager):
             self.error(msg)
             raise RuntimeError, msg
 
-        self._store_path = self.get("storeDir")
-
-        if self.get("useRunId"):
-            self._store_path = os.path.join(self._store_path, self.ec.run_id)
+        self._store_path = self.ec.run_dir
 
         subdir = self.get("subDir")
         if subdir:
-            self._store_path = os.path.join(self._store_path, subdir)
+            self._store_path = os.path.join(self.store_path, subdir)
         
         msg = "Creating local directory at %s to store %s traces " % (
-            self._store_path, trace_name)
+                self.store_path, trace_name)
         self.info(msg)
 
         try:
@@ -130,10 +113,11 @@ class Collector(ResourceManager):
 
         rms = self.get_connected()
         for rm in rms:
-            result = self.ec.trace(rm.guid, trace_name)
             fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, 
-                rename))
+                 rename))
+
             try:
+                result = self.ec.trace(rm.guid, trace_name)
                 f = open(fpath, "w")
                 f.write(result)
                 f.close()
index 5e06dab..ca9772d 100644 (file)
@@ -173,7 +173,9 @@ class LinuxApplication(ResourceManager):
         super(LinuxApplication, self).__init__(ec, guid)
         self._pid = None
         self._ppid = None
+        self._node = None
         self._home = "app-%s" % self.guid
+
         # whether the command should run in foreground attached
         # to a terminal
         self._in_foreground = False
@@ -194,9 +196,16 @@ class LinuxApplication(ResourceManager):
 
     @property
     def node(self):
-        node = self.get_connected(LinuxNode.get_rtype())
-        if node: return node[0]
-        raise RuntimeError, "Application must be connected to Node"
+        if not self._node:
+            node = self.get_connected(LinuxNode.get_rtype())
+            if not node: 
+                msg = "Application %s guid %d NOT connected to Node" % (
+                        self._rtype, self.guid)
+                raise RuntimeError, msg
+
+            self._node = node[0]
+
+        return self._node
 
     @property
     def app_home(self):
@@ -639,7 +648,7 @@ class LinuxApplication(ResourceManager):
                     if (proc and proc.poll()) or err:
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
-        
+            
             super(LinuxApplication, self).do_stop()
 
     def do_release(self):
index 8bf9bd1..4b7e9c4 100644 (file)
@@ -85,14 +85,37 @@ class LinuxFIBEntry(LinuxApplication):
         super(LinuxFIBEntry, self).__init__(ec, guid)
         self._home = "fib-%s" % self.guid
         self._ping = None
-        self._mtr = None
         self._traceroute = None
+        self._ccnd = None
 
     @property
     def ccnd(self):
-        ccnd = self.get_connected(LinuxCCND.get_rtype())
-        if ccnd: return ccnd[0]
-        return None
+        if not self._ccnd:
+            ccnd = self.get_connected(LinuxCCND.get_rtype())
+            if ccnd: 
+                self._ccnd = ccnd[0]
+            
+        return self._ccnd
+
+    @property
+    def ping(self):
+        if not self._ping:
+            from nepi.resources.linux.ping import LinuxPing
+            ping = self.get_connected(LinuxPing.get_rtype())
+            if ping: 
+                self._ping = ping[0]
+            
+        return self._ping
+
+    @property
+    def traceroute(self):
+        if not self._traceroute:
+            from nepi.resources.linux.traceroute import LinuxTraceroute
+            traceroute = self.get_connected(LinuxTraceroute.get_rtype())
+            if traceroute: 
+                self._traceroute = traceroute[0]
+            
+        return self._traceroute
 
     @property
     def node(self):
@@ -101,11 +124,14 @@ class LinuxFIBEntry(LinuxApplication):
 
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         if name == "ping":
-            return self.ec.trace(self._ping, "stdout", attr, block, offset)
-        if name == "mtr":
-            return self.ec.trace(self._mtr, "stdout", attr, block, offset)
+            if not self.ping:
+                return None
+            return self.ec.trace(self.ping.guid, "stdout", attr, block, offset)
+
         if name == "traceroute":
-            return self.ec.trace(self._traceroute, "stdout", attr, block, offset)
+            if not self.traceroute:
+                return None
+            return self.ec.trace(self.traceroute.guid, "stdout", attr, block, offset)
 
         return super(LinuxFIBEntry, self).trace(name, attr, block, offset)
     
@@ -159,38 +185,28 @@ class LinuxFIBEntry(LinuxApplication):
             raise RuntimeError, msg
         
     def configure(self):
-        if self.trace_enabled("ping"):
+        if self.trace_enabled("ping") and not self.ping:
             self.info("Configuring PING trace")
-            self._ping = self.ec.register_resource("LinuxPing")
-            self.ec.set(self._ping, "printTimestamp", True)
-            self.ec.set(self._ping, "target", self.get("host"))
-            self.ec.set(self._ping, "earlyStart", True)
-            self.ec.register_connection(self._ping, self.node.guid)
+            ping = self.ec.register_resource("LinuxPing")
+            self.ec.set(ping, "printTimestamp", True)
+            self.ec.set(ping, "target", self.get("host"))
+            self.ec.set(ping, "earlyStart", True)
+            self.ec.register_connection(ping, self.node.guid)
+            self.ec.register_connection(ping, self.guid)
             # schedule ping deploy
-            self.ec.deploy(guids=[self._ping], group = self.deployment_group)
-
-        if self.trace_enabled("mtr"):
-            self.info("Configuring MTR trace")
-            self._mtr = self.ec.register_resource("LinuxMtr")
-            self.ec.set(self._mtr, "noDns", True)
-            self.ec.set(self._mtr, "printTimestamp", True)
-            self.ec.set(self._mtr, "continuous", True)
-            self.ec.set(self._mtr, "target", self.get("host"))
-            self.ec.set(self._mtr, "earlyStart", True)
-            self.ec.register_connection(self._mtr, self.node.guid)
-            # schedule mtr deploy
-            self.ec.deploy(guids=[self._mtr], group = self.deployment_group)
+            self.ec.deploy(guids=[ping], group = self.deployment_group)
 
-        if self.trace_enabled("traceroute"):
+        if self.trace_enabled("traceroute") and not self.traceroute:
             self.info("Configuring TRACEROUTE trace")
-            self._traceroute = self.ec.register_resource("LinuxTraceroute")
-            self.ec.set(self._traceroute, "printTimestamp", True)
-            self.ec.set(self._traceroute, "continuous", True)
-            self.ec.set(self._traceroute, "target", self.get("host"))
-            self.ec.set(self._traceroute, "earlyStart", True)
-            self.ec.register_connection(self._traceroute, self.node.guid)
+            traceroute = self.ec.register_resource("LinuxTraceroute")
+            self.ec.set(traceroute, "printTimestamp", True)
+            self.ec.set(traceroute, "continuous", True)
+            self.ec.set(traceroute, "target", self.get("host"))
+            self.ec.set(traceroute, "earlyStart", True)
+            self.ec.register_connection(traceroute, self.node.guid)
+            self.ec.register_connection(traceroute, self.guid)
             # schedule mtr deploy
-            self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
+            self.ec.deploy(guids=[traceroute], group = self.deployment_group)
 
     def do_start(self):
         if self.state == ResourceState.READY:
index 522a174..44624a2 100644 (file)
 # Should it be made thread-safe?
 class GuidGenerator(object):
     def __init__(self):
-        self._guids = list()
+        self._last_guid = 0
 
     def next(self, guid = None):
-        if guid != None:
-            return guid
-        else:
-            last_guid = 0 if len(self._guids) == 0 else self._guids[-1]
-            guid = last_guid + 1 
-        self._guids.append(guid)
-        self._guids.sort()
+        if guid == None:
+            guid = self._last_guid + 1
+
+        self._last_guid = self._last_guid if guid <= self._last_guid else guid
+
         return guid
 
diff --git a/src/nepi/util/netgraph.py b/src/nepi/util/netgraph.py
new file mode 100644 (file)
index 0000000..6d68622
--- /dev/null
@@ -0,0 +1,341 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import ipaddr
+import networkx
+import random
+
+class TopologyType:
+    LINEAR = "linear"
+    LADDER = "ladder"
+    MESH = "mesh"
+    TREE = "tree"
+    STAR = "star"
+    ADHOC = "adhoc"
+
+## TODO: 
+##      - AQ: Add support for hypergraphs (to be able to add hyper edges to 
+##        model CSMA or wireless networks)
+
+class NetGraph(object):
+    """ NetGraph represents a network topology. 
+    Network graphs are internally using the networkx library.
+
+    """
+
+    def __init__(self, **kwargs):
+        """ A graph can be generated using a specified pattern 
+        (LADDER, MESH, TREE, etc), or provided as an argument.
+
+            :param topology: Undirected graph to use as internal representation 
+            :type topology: networkx.Graph
+
+            :param topo_type: One of TopologyType.{LINEAR,LADDER,MESH,TREE,STAR}
+            used to automatically generate the topology graph. 
+            :type topo_type: TopologyType
+
+            :param node_count: Number of nodes in the topology to be generated. 
+            :type node_count: int
+
+            :param branches: Number of branches (arms) for the STAR topology. 
+            :type branches: int
+
+
+            :param assign_ips: Automatically assign IP addresses to each node. 
+            :type assign_ips: bool
+
+            :param network: Base network segment for IP address assignment.
+            :type network: str
+
+            :param prefix: Base network prefix for IP address assignment.
+            :type prefix: int
+
+            :param version: IP version for IP address assignment.
+            :type version: int
+
+
+            :param assign_st: Select source and target nodes on the graph.
+            :type assign_st: bool
+
+        NOTE: Only point-to-point like network topologies are supported for now.
+                (Wireless and Ethernet networks were several nodes share the same
+                edge (hyperedge) can not be modeled for the moment).
+
+        """
+        self._topology = kwargs.get("topology")
+        self._topo_type = kwargs.get("topo_type", TopologyType.ADHOC)
+
+        if not self.topology:
+            if kwargs.get("node_count"):
+                node_count = kwargs["node_count"]
+                branches = kwargs.get("branches")
+
+                self._topology = self.generate_topology(self.topo_type, 
+                        node_count, branches = branches)
+            else:
+                self._topology = networkx.Graph()
+
+        if kwargs.get("assign_ips"):
+            network = kwargs.get("network", "10.0.0.0")
+            prefix = kwargs.get("prefix", 8)
+            version = kwargs.get("version", 4)
+
+            self.assign_p2p_ips(network = network, prefix = prefix, 
+                    version = version)
+
+        if kwargs.get("assign_st"):
+            self.select_target_zero()
+            self.select_random_leaf_source()
+
+    @property
+    def topology(self):
+        return self._topology
+
+    @property
+    def topo_type(self):
+        return self._topo_type
+
+    @property
+    def order(self):
+        return self.topology.order()
+
+    def nodes(self):
+        return self.topology.nodes()
+
+    def edges(self):
+        return self.topology.edges()
+
+    def generate_topology(self, topo_type, node_count, branches = None):
+        if topo_type == TopologyType.LADDER:
+            total_nodes = node_count/2
+            graph = networkx.ladder_graph(total_nodes)
+
+        elif topo_type == TopologyType.LINEAR:
+            graph = networkx.path_graph(node_count)
+
+        elif topo_type == TopologyType.MESH:
+            graph = networkx.complete_graph(node_count)
+
+        elif topo_type == TopologyType.TREE:
+            h = math.log(node_count + 1)/math.log(2) - 1
+            graph = networkx.balanced_tree(2, h)
+
+        elif topo_type == TopologyType.STAR:
+            graph = networkx.Graph()
+            graph.add_node(0)
+
+            nodesinbranch = (node_count - 1)/ BRANCHES
+            c = 1
+
+            for i in xrange(BRANCHES):
+                prev = 0
+                for n in xrange(1, nodesinbranch + 1):
+                    graph.add_node(c)
+                    graph.add_edge(prev, c)
+                    prev = c
+                    c += 1
+
+        # node ids are int, make them str
+        g = networkx.Graph()
+        g.add_nodes_from(map(lambda nid: str(nid), graph.nodes()))
+        g.add_edges_from(map(lambda t: (str(t[0]), str(t[1])), 
+            graph.edges()))
+
+        return g
+
+    def add_node(self, nid):
+        nid = str(nid)
+
+        if nid not in self.topology: 
+            self.topology.add_node(nid)
+
+    def add_edge(self, nid1, nid2):
+        nid1 = str(nid1)
+        nid2 = str(nid2)
+
+        self.add_node(nid1)
+        self.add_node( nid2)
+
+        if nid1 not in self.topology[nid2]:
+            self.topology.add_edge(nid2, nid1)
+
+    def annotate_node_ip(self, nid, ip):
+        if "ips" not in self.topology.node[nid]:
+            self.topology.node[nid]["ips"] = list()
+
+        self.topology.node[nid]["ips"].append(ip)
+    def node_ip_annotations(self, nid):
+        return self.topology.node[nid].get("ips", [])
+   
+    def annotate_node(self, nid, name, value):
+        if not isinstance(value, str) and not isinstance(value, int) and \
+                not isinstance(value, float) and not isinstance(value, bool):
+            raise RuntimeError, "Non-serializable annotation"
+
+        self.topology.node[nid][name] = value
+    
+    def node_annotation(self, nid, name):
+        return self.topology.node[nid].get(name)
+
+    def node_annotations(self, nid):
+        return self.topology.node[nid].keys()
+    
+    def del_node_annotation(self, nid, name):
+        del self.topology.node[nid][name]
+
+    def annotate_edge(self, nid1, nid2, name, value):
+        if not isinstance(value, str) and not isinstance(value, int) and \
+                not isinstance(value, float) and not isinstance(value, bool):
+            raise RuntimeError, "Non-serializable annotation"
+
+        self.topology.edge[nid1][nid2][name] = value
+   
+    def annotate_edge_net(self, nid1, nid2, ip1, ip2, mask, network, 
+            prefixlen):
+        self.topology.edge[nid1][nid2]["net"] = dict()
+        self.topology.edge[nid1][nid2]["net"][nid1] = ip1
+        self.topology.edge[nid1][nid2]["net"][nid2] = ip2
+        self.topology.edge[nid1][nid2]["net"]["mask"] = mask
+        self.topology.edge[nid1][nid2]["net"]["network"] = network
+        self.topology.edge[nid1][nid2]["net"]["prefix"] = prefixlen
+
+    def edge_net_annotation(self, nid1, nid2):
+        return self.topology.edge[nid1][nid2].get("net", dict())
+    def edge_annotation(self, nid1, nid2, name):
+        return self.topology.edge[nid1][nid2].get(name)
+    def edge_annotations(self, nid1, nid2):
+        return self.topology.edge[nid1][nid2].keys()
+    
+    def del_edge_annotation(self, nid1, nid2, name):
+        del self.topology.edge[nid1][nid2][name]
+
+    def assign_p2p_ips(self, network = "10.0.0.0", prefix = 8, version = 4):
+        """ Assign IP addresses to each end of each edge of the network graph,
+        computing all the point to point subnets and addresses in the network
+        representation.
+
+            :param network: Base network address used for subnetting. 
+            :type network: str
+
+            :param prefix: Prefix for the base network address used for subnetting.
+            :type prefixt: int
+
+            :param version: IP version (either 4 or 6).
+            :type version: int
+
+        """
+        if networkx.number_connected_components(self.topology) > 1:
+            raise RuntimeError("Disconnected graph!!")
+
+        # Assign IP addresses to host
+        netblock = "%s/%d" % (network, prefix)
+        if version == 4:
+            net = ipaddr.IPv4Network(netblock)
+            new_prefix = 30
+        elif version == 6:
+            net = ipaddr.IPv6Network(netblock)
+            new_prefix = 30
+        else:
+            raise RuntimeError, "Invalid IP version %d" % version
+        
+        ## Clear all previusly assigned IPs
+        for nid in self.topology.nodes():
+            self.topology.node[nid]["ips"] = list()
+
+        ## Generate and assign new IPs
+        sub_itr = net.iter_subnets(new_prefix = new_prefix)
+        
+        for nid1, nid2 in self.topology.edges():
+            #### Compute subnets for each link
+            
+            # get a subnet of base_add with prefix /30
+            subnet = sub_itr.next()
+            mask = subnet.netmask.exploded
+            network = subnet.network.exploded
+            prefixlen = subnet.prefixlen
+
+            # get host addresses in that subnet
+            i = subnet.iterhosts()
+            addr1 = i.next()
+            addr2 = i.next()
+
+            ip1 = addr1.exploded
+            ip2 = addr2.exploded
+            self.annotate_edge_net(nid1, nid2, ip1, ip2, mask, network, 
+                    prefixlen)
+
+            self.annotate_node_ip(nid1, ip1)
+            self.annotate_node_ip(nid2, ip2)
+
+    def get_p2p_info(self, nid1, nid2):
+        net = self.topology.edge[nid1][nid2]["net"]
+        return ( net[nid1], net[nid2], net["mask"], net["network"], 
+                net["prefixlen"] )
+
+    def set_source(self, nid):
+        self.topology.node[nid]["source"] = True
+
+    def is_source(self, nid):
+        return self.topology.node[nid].get("source")
+
+    def set_target(self, nid):
+        self.topology.node[nid]["target"] = True
+
+    def is_target(self, nid):
+        return self.topology.node[nid].get("target")
+
+    def targets(self):
+        """ Returns the nodes that are targets """
+        return [nid for nid in self.topology.nodes() \
+                if self.topology.node[nid].get("target")]
+
+    def sources(self):
+        """ Returns the nodes that are sources """
+        return [nid for nid in self.topology.nodes() \
+                if self.topology.node[nid].get("source")]
+
+    def select_target_zero(self):
+        """ Marks the node 0 as target
+        """
+        self.set_target("0")
+
+    def select_random_leaf_source(self):
+        """  Marks a random leaf node as source. 
+        """
+
+        # The ladder is a special case because is not symmetric.
+        if self.topo_type == TopologyType.LADDER:
+            total_nodes = self.order/2
+            leaf1 = str(total_nodes - 1)
+            leaf2 = str(nodes - 1)
+            leaves = [leaf1, leaf2]
+            source = leaves.pop(random.randint(0, len(leaves) - 1))
+        else:
+            # options must not be already sources or targets
+            options = [ k for k,v in self.topology.degree().iteritems() \
+                    if v == 1 and not self.topology.node[k].get("source") \
+                        and not self.topology.node[k].get("target")]
+
+            source = options.pop(random.randint(0, len(options) - 1))
+        
+        self.set_source(source)
+
index d5bc8a3..ca68be1 100644 (file)
@@ -17,6 +17,7 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
+from nepi.util.netgraph import NetGraph, TopologyType 
 from nepi.util.timefuncs import stformat, tsformat
 
 from xml.dom import minidom
@@ -91,14 +92,85 @@ class ECXMLParser(object):
         ecnode.setAttribute("exp_id", xmlencode(ec.exp_id))
         ecnode.setAttribute("run_id", xmlencode(ec.run_id))
         ecnode.setAttribute("nthreads", xmlencode(ec.nthreads))
+        ecnode.setAttribute("local_dir", xmlencode(ec.local_dir))
         doc.appendChild(ecnode)
 
+        if ec.netgraph != None:
+            self._netgraph_to_xml(doc, ecnode, ec)
+
+        rmsnode = doc.createElement("rms")
+        ecnode.appendChild(rmsnode)
+
         for guid, rm in ec._resources.iteritems():
-            self._rm_to_xml(doc, ecnode, ec, guid, rm)
+            self._rm_to_xml(doc, rmsnode, ec, guid, rm)
 
         return doc
-
-    def _rm_to_xml(self, doc, ecnode, ec, guid, rm):
+    
+    def _netgraph_to_xml(self, doc, ecnode, ec):
+        ngnode = doc.createElement("topology")
+        ngnode.setAttribute("topo-type", xmlencode(ec.netgraph.topo_type))
+        ecnode.appendChild(ngnode)
+        
+        self. _netgraph_nodes_to_xml(doc, ngnode, ec)
+        self. _netgraph_edges_to_xml(doc, ngnode, ec)
+        
+    def _netgraph_nodes_to_xml(self, doc, ngnode, ec):
+        ngnsnode = doc.createElement("nodes")
+        ngnode.appendChild(ngnsnode)
+
+        for nid in ec.netgraph.nodes():
+            ngnnode = doc.createElement("node")
+            ngnnode.setAttribute("nid", xmlencode(nid))
+            ngnsnode.appendChild(ngnnode)
+
+            # Mark ources and targets
+            if ec.netgraph.is_source(nid):
+                ngnnode.setAttribute("source", xmlencode(True))
+
+            if ec.netgraph.is_target(nid):
+                ngnnode.setAttribute("target", xmlencode(True))
+
+            # Node annotations
+            annosnode = doc.createElement("node-annotations")
+            add_annotations = False
+            for name in ec.netgraph.node_annotations(nid):
+                add_annotations = True
+                value = ec.netgraph.node_annotation(nid, name)
+                annonode = doc.createElement("node-annotation")
+                annonode.setAttribute("name", xmlencode(name))
+                annonode.setAttribute("value", xmlencode(value))
+                annonode.setAttribute("type", from_type(value))
+                annosnode.appendChild(annonode)
+
+            if add_annotations:
+                ngnnode.appendChild(annosnode)
+
+    def _netgraph_edges_to_xml(self, doc, ngnode, ec):
+        ngesnode = doc.createElement("edges")
+        ngnode.appendChild(ngesnode)
+
+        for nid1, nid2 in ec.netgraph.edges():
+            ngenode = doc.createElement("edge")
+            ngenode.setAttribute("nid1", xmlencode(nid1))
+            ngenode.setAttribute("nid2", xmlencode(nid2))
+            ngesnode.appendChild(ngenode)
+
+            # Edge annotations
+            annosnode = doc.createElement("edge-annotations")
+            add_annotations = False
+            for name in ec.netgraph.edge_annotations(nid1, nid2):
+                add_annotations = True
+                value = ec.netgraph.edge_annotation(nid1, nid2, name)
+                annonode = doc.createElement("edge-annotation")
+                annonode.setAttribute("name", xmlencode(name))
+                annonode.setAttribute("value", xmlencode(value))
+                annonode.setAttribute("type", from_type(value))
+                annosnode.appendChild(annonode)
+
+            if add_annotations:
+                ngenode.appendChild(annosnode)
+
+    def _rm_to_xml(self, doc, rmsnode, ec, guid, rm):
         rmnode = doc.createElement("rm")
         rmnode.setAttribute("guid", xmlencode(guid))
         rmnode.setAttribute("rtype", xmlencode(rm._rtype))
@@ -117,7 +189,7 @@ class ECXMLParser(object):
             rmnode.setAttribute("release_time", xmlencode(rm._release_time))
         if rm._failed_time:
             rmnode.setAttribute("failed_time", xmlencode(rm._failed_time))
-        ecnode.appendChild(rmnode)
+        rmsnode.appendChild(rmnode)
 
         anode = doc.createElement("attributes")
         attributes = False
@@ -188,17 +260,28 @@ class ECXMLParser(object):
             if ecnode.nodeType == doc.ELEMENT_NODE:
                 exp_id = xmldecode(ecnode.getAttribute("exp_id"))
                 run_id = xmldecode(ecnode.getAttribute("run_id"))
+                local_dir = xmldecode(ecnode.getAttribute("local_dir"))
+
+                # Configure number of preocessing threads
                 nthreads = xmldecode(ecnode.getAttribute("nthreads"))
-            
                 os.environ["NEPI_NTHREADS"] = nthreads
-                ec = ExperimentController(exp_id = exp_id)
+
+                # Deserialize netgraph
+                netgraph = self._netgraph_from_xml(doc, ecnode)
+                topo_type = netgraph.topo_type if netgraph else None
+
+                # Instantiate EC
+                ec = ExperimentController(exp_id = exp_id, local_dir = local_dir, 
+                        topology = netgraph.topology, topo_type = topo_type)
 
                 connections = set()
 
-                rmnode_list = ecnode.getElementsByTagName("rm")
-                for rmnode in rmnode_list:
-                    if rmnode.nodeType == doc.ELEMENT_NODE:
-                        self._rm_from_xml(doc, rmnode, ec, connections)
+                rmsnode_list = ecnode.getElementsByTagName("rms")
+                if rmsnode_list:
+                    rmnode_list = rmsnode_list[0].getElementsByTagName("rm") 
+                    for rmnode in rmnode_list:
+                        if rmnode.nodeType == doc.ELEMENT_NODE:
+                            self._rm_from_xml(doc, rmnode, ec, connections)
 
                 for (guid1, guid2) in connections:
                     ec.register_connection(guid1, guid2)
@@ -207,6 +290,70 @@ class ECXMLParser(object):
 
         return ec
 
+    def _netgraph_from_xml(self, doc, ecnode):
+        netgraph = None
+
+        topology = ecnode.getElementsByTagName("topology")
+        if topology:
+            topology = topology[0]
+            topo_type = xmldecode(topology.getAttribute("topo-type"))
+
+            netgraph = NetGraph(topo_type = topo_type)
+
+            ngnsnode_list = topology.getElementsByTagName("nodes")
+            if ngnsnode_list:
+                ngnsnode = ngnsnode_list[0].getElementsByTagName("node") 
+                for ngnnode in ngnsnode:
+                    nid = xmldecode(ngnnode.getAttribute("nid"))
+                    netgraph.add_node(nid)
+
+                    if ngnnode.hasAttribute("source"):
+                        netgraph.set_source(nid)
+                    if ngnnode.hasAttribute("target"):
+                        netgraph.set_target(nid)
+
+                    annosnode_list = ngnnode.getElementsByTagName("node-annotations")
+                    
+                    if annosnode_list:
+                        annosnode = annosnode_list[0].getElementsByTagName("node-annotation") 
+                        for annonode in annosnode:
+                            name = xmldecode(annonode.getAttribute("name"))
+
+                            if name == "ips":
+                                ips = xmldecode(annonode.getAttribute("value"), eval) # list
+                                for ip in ips:
+                                    netgraph.annotate_node_ip(nid, ip)
+                            else:
+                                value = xmldecode(annonode.getAttribute("value"))
+                                tipe = xmldecode(annonode.getAttribute("type"))
+                                value = to_type(tipe, value)
+                                netgraph.annotate_node(nid, name, value)
+
+            ngesnode_list = topology.getElementsByTagName("edges") 
+            if ngesnode_list:
+                ngesnode = ngesnode_list[0].getElementsByTagName("edge") 
+                for ngenode in ngesnode:
+                    nid1 = xmldecode(ngenode.getAttribute("nid1"))
+                    nid2 = xmldecode(ngenode.getAttribute("nid2"))
+                    netgraph.add_edge(nid1, nid2)
+
+                    annosnode_list = ngenode.getElementsByTagName("edge-annotations")
+                    if annosnode_list:
+                        annosnode = annosnode_list[0].getElementsByTagName("edge-annotation") 
+                        for annonode in annosnode:
+                            name = xmldecode(annonode.getAttribute("name"))
+
+                            if name == "net":
+                                net = xmldecode(annonode.getAttribute("value"), eval) # dict
+                                netgraph.annotate_edge_net(nid1, nid2, net[nid1], net[nid2], 
+                                        net["mask"], net["network"], net["prefix"])
+                            else:
+                                value = xmldecode(annonode.getAttribute("value"))
+                                tipe = xmldecode(annonode.getAttribute("type"))
+                                value = to_type(tipe, value)
+                                netgraph.annotate_edge(nid1, nid2, name, value)
+        return netgraph
+
     def _rm_from_xml(self, doc, rmnode, ec, connections):
         start_time = None
         stop_time = None
@@ -286,7 +433,7 @@ class ECXMLParser(object):
             ccnnode_list = cnnode_list[0].getElementsByTagName("condition") 
             for ccnnode in ccnnode_list:
                 action = xmldecode(ccnnode.getAttribute("action"), int)
-                group = xmldecode(ccnnode.getAttribute("group"), eval)
+                group = xmldecode(ccnnode.getAttribute("group"), eval) # list
                 state = xmldecode(ccnnode.getAttribute("state"), int)
                 time = xmldecode(ccnnode.getAttribute("time"))
                 time = to_type('STRING', time)
index 48b826e..6b5622e 100644 (file)
@@ -23,7 +23,7 @@ import os
 try:
     import networkx
 except ImportError:
-    msg = ("Networkx library is not installed, you will not be able to plot.")
+    msg = "Networkx library is not installed, you will not be able to plot."
     logger = Logger("Plotter")
     logger.debug(msg)
 
index aaf1aff..7347c5b 100644 (file)
@@ -46,11 +46,7 @@ class ECSerializer(object):
 
         return sec
 
-    def save(self, ec, dirpath = None, format = SFormats.XML):
-        if not dirpath:
-            import tempfile
-            dirpath = tempfile.mkdtemp()
+    def save(self, ec, dirpath, format = SFormats.XML):
         date = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
         filename = "%s_%s" % (ec.exp_id, date)
 
diff --git a/src/nepi/util/statfuncs.py b/src/nepi/util/statfuncs.py
new file mode 100644 (file)
index 0000000..5c0b2ca
--- /dev/null
@@ -0,0 +1,46 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import math
+import numpy
+from scipy import stats
+
+def compute_mean(sample):
+    # TODO: Discard outliers !!!!
+
+    if not sample:
+        print " CANNOT COMPUTE STATS for ", sample
+        return (0, 0, 0, 0)
+
+    x = numpy.array(sample)
+
+    # sample mean and standard deviation
+    n, min_max, mean, var, skew, kurt = stats.describe(x)
+    std = math.sqrt(var)
+
+    # for the population mean and std ...
+    # mean = x.mean()
+    # std = x.std()
+    
+    # Calculate confidence interval t-distribution
+    ## BUG: Use quantil of NORMAL distribution, not t-student quantil distribution
+    ci = stats.t.interval(0.95, n-1, loc = mean, scale = std/math.sqrt(n))
+
+    return (mean, std, ci[0], ci[1])
+
index 3d7c366..f7fbc85 100644 (file)
@@ -113,3 +113,11 @@ def stabsformat(sdate, dbase = None):
 
     return None
 
+def compute_delay_ms(timestamp2, timestamp1):
+    d1 = datetime.datetime.fromtimestamp(float(timestamp1))
+    d2 = datetime.datetime.fromtimestamp(float(timestamp2))
+    delay = d2 - d1
+
+    # round up resolution - round up to miliseconds
+    return delay.total_seconds() * 1000
+
index d548bd2..dbee7df 100755 (executable)
@@ -49,7 +49,8 @@ class LinuxMultiRunTestCase(unittest.TestCase):
 
         dirpath = tempfile.mkdtemp()
 
-        ec = ExperimentController(exp_id="test-condition-multirun")
+        ec = ExperimentController(exp_id = "test-condition-multirun", 
+                local_dir = dirpath)
         
         node = ec.register_resource("LinuxNode")
         ec.set(node, "hostname", host)
@@ -63,8 +64,6 @@ class LinuxMultiRunTestCase(unittest.TestCase):
 
         collector = ec.register_resource("Collector")
         ec.set(collector, "traceName", "stdout")
-        ec.set(collector, "storeDir", dirpath)
-        ec.set(collector, "useRunId", True)
         ec.register_connection(ping, collector)
 
         def compute_metric_callback(ping, ec, run):
@@ -87,8 +86,9 @@ class LinuxMultiRunTestCase(unittest.TestCase):
         self.assertTrue(runs >= 5)
 
         dircount = 0
-        for d in os.listdir(dirpath):
-            path = os.path.join(dirpath, d)
+
+        for d in os.listdir(ec.exp_dir):
+            path = os.path.join(ec.exp_dir, d)
             if os.path.isdir(path):
                 dircount += 1
                 logs = glob.glob(os.path.join(path, "*.stdout"))
index 3896ea2..993a0f4 100755 (executable)
@@ -132,7 +132,7 @@ class SerializerTestCase(unittest.TestCase):
 
         # Load serialized experiment
         ec2 = ExperimentController.load(filepath)
-        apps = ec2.get_resources_by_type("dummy::Application")
+        apps = ec2.filter_resources("dummy::Application")
         ec2.deploy()
         ec2.wait_finished(apps)
         ec2.shutdown()