From: Alina Quereilhac Date: Thu, 21 Aug 2014 22:12:05 +0000 (+0200) Subject: Merging nepi-3.1-icn into nepi-3-dev X-Git-Tag: nepi-3.2.0~99 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=ec3460fce064ea44366cb417dea0f9e148d3d804;hp=74bb6c4d720bab1e65a71db74525a1bf55cfcb2d;p=nepi.git Merging nepi-3.1-icn into nepi-3-dev --- diff --git a/examples/ccn_emu_live/ccn_flooding/dce.py b/examples/ccn_emu_live/ccn_flooding/dce.py new file mode 100644 index 00000000..3386e792 --- /dev/null +++ b/examples/ccn_emu_live/ccn_flooding/dce.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python + +############################################################################### +# +# NEPI, a framework to manage network experiments +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# +############################################################################### + +from nepi.execution.ec import ExperimentController +from nepi.execution.runner import ExperimentRunner +from nepi.util.netgraph import NetGraph, TopologyType +import nepi.data.processing.ccn.parser as ccn_parser + +import networkx +import socket +import os + +content_name = "ccnx:/test/bunny.ts" + +STOP_TIME = "5000s" + +repofile = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "repoFile1.0.8.2") + +def get_simulator(ec): + simulator = ec.filter_resources("LinuxNS3Simulation") + + if not simulator: + node = ec.register_resource("LinuxNode") + ec.set(node, "hostname", "localhost") + + simu = ec.register_resource("LinuxNS3Simulation") + ec.register_connection(simu, node) + return simu + + return simulator[0] + +def add_collector(ec, trace_name, subdir, newname = None): + collector = ec.register_resource("Collector") + ec.set(collector, "traceName", trace_name) + ec.set(collector, "subDir", subdir) + if newname: + ec.set(collector, "rename", newname) + + return collector + +def add_dce_host(ec, nid): + simu = get_simulator(ec) + + host = ec.register_resource("ns3::Node") + ec.set(host, "enableStack", True) + ec.register_connection(host, simu) + + # Annotate the graph + ec.netgraph.annotate_node(nid, "host", host) + +def add_dce_ccnd(ec, nid): + # Retrieve annotation from netgraph + host = ec.netgraph.node_annotation(nid, "host") + + # Add dce ccnd to the dce node + ccnd = ec.register_resource("ns3::LinuxDceCCND") + ec.set (ccnd, "stackSize", 1<<20) + ec.set (ccnd, "debug", 7) + ec.set (ccnd, "capacity", 50000) + ec.set (ccnd, "StartTime", "1s") + ec.set (ccnd, "StopTime", STOP_TIME) + ec.register_connection(ccnd, host) + + # Collector to retrieve ccnd log + collector = add_collector(ec, "stderr", nid, "log") + ec.register_connection(collector, ccnd) + + # Annotate the graph + ec.netgraph.annotate_node(nid, "ccnd", ccnd) + +def add_dce_ccnr(ec, nid): + # Retrieve annotation from netgraph + host = ec.netgraph.node_annotation(nid, "host") + + # Add a CCN content repository to the dce node + ccnr = ec.register_resource("ns3::LinuxDceCCNR") + ec.set (ccnr, "repoFile1", repofile) + ec.set (ccnr, "stackSize", 1<<20) + ec.set (ccnr, "StartTime", "2s") + ec.set (ccnr, "StopTime", STOP_TIME) + ec.register_connection(ccnr, host) + +def add_dce_ccncat(ec, nid): + # Retrieve annotation from netgraph + host = ec.netgraph.node_annotation(nid, "host") + + ccnpeek = ec.register_resource("ns3::LinuxDceCCNPeek") + #ec.set (ccnpeek, "contentName", "ccnx:/chunk0") + ec.set (ccnpeek, "contentName", content_name) + ec.set (ccnpeek, "stackSize", 1<<20) + ec.set (ccnpeek, "StartTime", "5s") + ec.set (ccnpeek, "StopTime", STOP_TIME) + ec.register_connection(ccnpeek, host) + + collector = add_collector(ec, "stdout", nid, "peek") + ec.register_connection(collector, ccnpeek) + + # Add a ccncat application to the dce host + ccncat = ec.register_resource("ns3::LinuxDceCCNCat") + ec.set (ccncat, "contentName", content_name) + ec.set (ccncat, "stackSize", 1<<20) + ec.set (ccncat, "StartTime", "8s") + ec.set (ccncat, "StopTime", STOP_TIME) + ec.register_connection(ccncat, host) + +def add_dce_fib_entry(ec, nid1, nid2): + # Retrieve annotations from netgraph + host1 = ec.netgraph.node_annotation(nid1, "host") + net = ec.netgraph.edge_net_annotation(nid1, nid2) + ip2 = net[nid2] + + # Add FIB entry between peer hosts + ccndc = ec.register_resource("ns3::LinuxDceFIBEntry") + ec.set (ccndc, "protocol", "udp") + ec.set (ccndc, "uri", "ccnx:/") + ec.set (ccndc, "host", ip2) + ec.set (ccndc, "stackSize", 1<<20) + ec.set (ccndc, "StartTime", "2s") + ec.set (ccndc, "StopTime", STOP_TIME) + ec.register_connection(ccndc, host1) + +def add_dce_net_iface(ec, nid1, nid2): + # Retrieve annotations from netgraph + host = ec.netgraph.node_annotation(nid1, "host") + net = ec.netgraph.edge_net_annotation(nid1, nid2) + ip1 = net[nid1] + prefix = net["prefix"] + + dev = ec.register_resource("ns3::PointToPointNetDevice") + ec.set(dev,"DataRate", "5Mbps") + ec.set(dev, "ip", ip1) + ec.set(dev, "prefix", prefix) + ec.register_connection(host, dev) + + queue = ec.register_resource("ns3::DropTailQueue") + ec.register_connection(dev, queue) + + return dev + +def avg_interests(ec, run): + ## Process logs + logs_dir = ec.run_dir + + (graph, + content_names, + interest_expiry_count, + interest_dupnonce_count, + interest_count, + content_count) = ccn_parser.process_content_history_logs( + logs_dir, + ec.netgraph.topology) + + shortest_path = networkx.shortest_path(graph, + source = ec.netgraph.sources()[0], + target = ec.netgraph.targets()[0]) + + ### Compute metric: Avg number of Interests seen per content name + ### normalized by the number of nodes in the shortest path + content_name_count = len(content_names.values()) + nodes_in_shortest_path = len(shortest_path) - 1 + metric = interest_count / (float(content_name_count) * float(nodes_in_shortest_path)) + + # TODO: DUMP RESULTS TO FILE + # TODO: DUMP GRAPH DELAYS! + f = open("/tmp/metric", "w+") + f.write("%.2f\n" % metric) + f.close() + print " METRIC", metric + + return metric + +def add_dce_edge(ec, nid1, nid2): + ### Add network interfaces to hosts + p2p1 = add_dce_net_iface(ec, nid1, nid2) + p2p2 = add_dce_net_iface(ec, nid2, nid1) + + # Create point to point link between interfaces + chan = ec.register_resource("ns3::PointToPointChannel") + ec.set(chan, "Delay", "0ms") + + ec.register_connection(chan, p2p1) + ec.register_connection(chan, p2p2) + + #### Add routing between CCN nodes + add_dce_fib_entry(ec, nid1, nid2) + add_dce_fib_entry(ec, nid2, nid1) + +def add_dce_node(ec, nid): + ### Add CCN nodes (ec.netgraph holds the topology graph) + add_dce_host(ec, nid) + add_dce_ccnd(ec, nid) + + if nid == ec.netgraph.targets()[0]: + add_dce_ccnr(ec, nid) + + if nid == ec.netgraph.sources()[0]: + add_dce_ccncat(ec, nid) + +if __name__ == '__main__': + + #### Create NEPI Experiment Description with LINEAR topology + ec = ExperimentController("dce_ccn", + topo_type = TopologyType.LINEAR, + node_count = 4, + assign_st = True, + assign_ips = True, + add_node_callback = add_dce_node, + add_edge_callback = add_dce_edge) + + print "Results stored at", ec.exp_dir + + #### Retrieve the consumer to wait for ot to finish + ccncat = ec.filter_resources("ns3::LinuxDceCCNCat") + + #### Run experiment until metric convergences + rnr = ExperimentRunner() + runs = rnr.run(ec, min_runs = 1, max_runs = 1, + compute_metric_callback = avg_interests, + wait_guids = ccncat, + wait_time = 0) + diff --git a/examples/ccn_emu_live/ccn_flooding/planetlab.py b/examples/ccn_emu_live/ccn_flooding/planetlab.py new file mode 100644 index 00000000..766e96b2 --- /dev/null +++ b/examples/ccn_emu_live/ccn_flooding/planetlab.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python + +############################################################################### +# +# NEPI, a framework to manage network experiments +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# +############################################################################### + +from nepi.execution.ec import ExperimentController +from nepi.execution.runner import ExperimentRunner +from nepi.util.netgraph import NetGraph, TopologyType +import nepi.data.processing.ccn.parser as ccn_parser + +import networkx +import socket +import os + +PL_NODES = dict({ + "0": "iraplab1.iralab.uni-karlsruhe.de", + "1": "planetlab1.informatik.uni-goettingen.de", + "2": "dfn-ple1.x-win.dfn.de", + "3": "mars.planetlab.haw-hamburg.de", + "4": "planetlab2.unineuchatel.ch", + "5": "planetlab-node3.it-sudparis.eu", + "6": "planetlab2.extern.kuleuven.be", + "7": "node2pl.planet-lab.telecom-lille1.eu", + "8": "planetvs2.informatik.uni-stuttgart.de", + "9": "planetlab1.informatik.uni-wuerzburg.de", + "10": "planet1.l3s.uni-hannover.de", + "11": "planetlab1.wiwi.hu-berlin.de", + "12": "pl2.uni-rostock.de", + "13": "planetlab1.u-strasbg.fr", + "14": "peeramidion.irisa.fr" + }) + +pl_slice = os.environ.get("PL_SLICE") +pl_user = os.environ.get("PL_USER") +pl_password = os.environ.get("PL_PASS") +pl_ssh_key = os.environ.get("PL_SSHKEY") + +content_name = "ccnx:/test/bunny.ts" + +pipeline = 4 # Default value for ccncat + +operating_system = "f14" + +country = "germany" + +repofile = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "repoFile1.0.8.2") + +def add_collector(ec, trace_name, subdir, newname = None): + collector = ec.register_resource("Collector") + ec.set(collector, "traceName", trace_name) + ec.set(collector, "subDir", subdir) + if newname: + ec.set(collector, "rename", newname) + + return collector + +def add_pl_host(ec, nid): + hostname = PL_NODES[nid] + + # Add a planetlab host to the experiment description + host = ec.register_resource("PlanetlabNode") + ec.set(host, "hostname", hostname) + ec.set(host, "username", pl_slice) + ec.set(host, "identity", pl_ssh_key) + #ec.set(host, "pluser", pl_user) + #ec.set(host, "plpassword", pl_password) + #ec.set(host, "country", country) + #ec.set(host, "operatingSystem", operating_system) + ec.set(host, "cleanExperiment", True) + ec.set(host, "cleanProcesses", True) + + # Annotate the graph + ec.netgraph.annotate_node(nid, "hostname", hostname) + ec.netgraph.annotate_node(nid, "host", host) + + # Annotate the graph node with an ip address + ip = socket.gethostbyname(hostname) + ec.netgraph.annotate_node_ip(nid, ip) + +def add_pl_ccnd(ec, nid): + # Retrieve annotation from netgraph + host = ec.netgraph.node_annotation(nid, "host") + + # Add a CCN daemon to the planetlab node + ccnd = ec.register_resource("LinuxCCND") + ec.set(ccnd, "debug", 7) + ec.register_connection(ccnd, host) + + # Collector to retrieve ccnd log + collector = add_collector(ec, "stderr", nid, "log") + ec.register_connection(collector, ccnd) + + # Annotate the graph + ec.netgraph.annotate_node(nid, "ccnd", ccnd) + +def add_pl_ccnr(ec, nid): + # Retrieve annotation from netgraph + ccnd = ec.netgraph.node_annotation(nid, "ccnd") + + # Add a CCN content repository to the planetlab node + ccnr = ec.register_resource("LinuxCCNR") + + ec.set(ccnr, "repoFile1", repofile) + ec.register_connection(ccnr, ccnd) + +def add_pl_ccncat(ec, nid): + # Retrieve annotation from netgraph + ccnd = ec.netgraph.node_annotation(nid, "ccnd") + + # Add a CCN cat application to the planetlab node + ccncat = ec.register_resource("LinuxCCNCat") + ec.set(ccncat, "pipeline", pipeline) + ec.set(ccncat, "contentName", content_name) + ec.register_connection(ccncat, ccnd) + +def add_pl_fib_entry(ec, nid1, nid2): + # Retrieve annotations from netgraph + ccnd1 = ec.netgraph.node_annotation(nid1, "ccnd") + hostname2 = ec.netgraph.node_annotation(nid2, "hostname") + + # Add a FIB entry between one planetlab node and its peer + entry = ec.register_resource("LinuxFIBEntry") + ec.set(entry, "host", hostname2) + ec.register_connection(entry, ccnd1) + + # Collector to retrieve peering ping output (to measure neighbors delay) + ec.enable_trace(entry, "ping") + collector = add_collector(ec, "ping", nid1) + ec.register_connection(collector, entry) + + return entry + +def avg_interests(ec, run): + ## Process logs + logs_dir = ec.run_dir + + (graph, + content_names, + interest_expiry_count, + interest_dupnonce_count, + interest_count, + content_count) = ccn_parser.process_content_history_logs( + logs_dir, + ec.netgraph.topology) + + shortest_path = networkx.shortest_path(graph, + source = ec.netgraph.sources()[0], + target = ec.netgraph.targets()[0]) + + ### Compute metric: Avg number of Interests seen per content name + ### normalized by the number of nodes in the shortest path + content_name_count = len(content_names.values()) + nodes_in_shortest_path = len(shortest_path) - 1 + metric = interest_count / (float(content_name_count) * float(nodes_in_shortest_path)) + + # TODO: DUMP RESULTS TO FILE + # TODO: DUMP GRAPH DELAYS! + f = open("/tmp/metric", "w+") + f.write("%.2f\n" % metric) + f.close() + print " METRIC", metric + + return metric + +def add_pl_edge(ec, nid1, nid2): + #### Add connections between CCN nodes + add_pl_fib_entry(ec, nid1, nid2) + add_pl_fib_entry(ec, nid2, nid1) + +def add_pl_node(ec, nid): + ### Add CCN nodes (ec.netgraph holds the topology graph) + add_pl_host(ec, nid) + add_pl_ccnd(ec, nid) + + if nid == ec.netgraph.targets()[0]: + add_pl_ccnr(ec, nid) + + if nid == ec.netgraph.sources()[0]: + add_pl_ccncat(ec, nid) + +if __name__ == '__main__': + + #### Create NEPI Experiment Description with LINEAR topology + ec = ExperimentController("pl_ccn", + topo_type = TopologyType.LINEAR, + node_count = 4, + #assign_ips = True, + assign_st = True, + add_node_callback = add_pl_node, + add_edge_callback = add_pl_edge) + + print "Results stored at", ec.exp_dir + + #### Retrieve the content producing resource to wait for ot to finish + ccncat = ec.filter_resources("LinuxCCNCat") + + #### Run experiment until metric convergences + rnr = ExperimentRunner() + runs = rnr.run(ec, min_runs = 10, max_runs = 300, + compute_metric_callback = avg_interests, + wait_guids = ccncat, + wait_time = 0) + diff --git a/examples/ccn_emu_live/ccn_flooding/repoFile1.0.8.2 b/examples/ccn_emu_live/ccn_flooding/repoFile1.0.8.2 new file mode 100644 index 00000000..a90b0d96 Binary files /dev/null and b/examples/ccn_emu_live/ccn_flooding/repoFile1.0.8.2 differ diff --git a/examples/linux/ccn/ccncat_extended_ring_topo.py b/examples/linux/ccn/ccncat_extended_ring_topo.py index a67075b3..af823a26 100644 --- a/examples/linux/ccn/ccncat_extended_ring_topo.py +++ b/examples/linux/ccn/ccncat_extended_ring_topo.py @@ -92,10 +92,9 @@ def add_stream(ec, ccnd, content_name): return app -def add_collector(ec, trace_name, store_dir): +def add_collector(ec, trace_name): collector = ec.register_resource("Collector") ec.set(collector, "traceName", trace_name) - ec.set(collector, "storeDir", store_dir) return collector @@ -136,7 +135,7 @@ if __name__ == '__main__': ( pl_user, movie, exp_id, pl_ssh_key, results_dir ) = get_options() - ec = ExperimentController(exp_id = exp_id) + ec = ExperimentController(exp_id = exp_id, local_dir = results_dir) # hosts in the US #host1 = "planetlab4.wail.wisc.edu" @@ -214,7 +213,7 @@ if __name__ == '__main__': app, ResourceState.STARTED, time = "10s") # Register a collector to automatically collect traces - collector = add_collector(ec, "stderr", results_dir) + collector = add_collector(ec, "stderr") for ccnd in ccnds.values(): ec.register_connection(collector, ccnd) diff --git a/examples/linux/ccn/two_nodes_file_retrieval.py b/examples/linux/ccn/two_nodes_file_retrieval.py index 8e40c3bd..10814ae7 100644 --- a/examples/linux/ccn/two_nodes_file_retrieval.py +++ b/examples/linux/ccn/two_nodes_file_retrieval.py @@ -34,8 +34,10 @@ import os ssh_key = ####### <<< ASSING the absolute path to the private SSH key to login into the remote host >>> ssh_user = ####### <<< ASSING the SSH username >>> +results_dir = "/tmp/demo_CCN_results" + ## Create the experiment controller -ec = ExperimentController(exp_id = "demo_CCN") +ec = ExperimentController(exp_id = "demo_CCN", local_dir = results_dir) ## Register node 1 node1 = ec.register_resource("LinuxNode") @@ -113,16 +115,13 @@ ec.register_connection(app, ccnd2) # Register a collector to automatically collect the ccnd logs # to a local directory -results_dir = "/tmp/demo_CCN_results" col1 = ec.register_resource("Collector") ec.set(col1, "traceName", "stderr") -ec.set(col1, "storeDir", results_dir) ec.set(col1, "subDir", hostname1) ec.register_connection(col1, ccnd1) col2 = ec.register_resource("Collector") ec.set(col2, "traceName", "stderr") -ec.set(col2, "storeDir", results_dir) ec.set(col2, "subDir", hostname2) ec.register_connection(col2, ccnd2) diff --git a/examples/planetlab/ccn/two_nodes_file_retrieval.py b/examples/planetlab/ccn/two_nodes_file_retrieval.py index 096268f7..57c58089 100644 --- a/examples/planetlab/ccn/two_nodes_file_retrieval.py +++ b/examples/planetlab/ccn/two_nodes_file_retrieval.py @@ -36,8 +36,11 @@ pl_pass = ######## <<< ASSIGN the password used to login to the PlanetLab websit pl_ssh_key = ####### <<< ASSING the absolute path to the private SSH key used for Planetlab >>> slicename = ####### <<< ASSING the PlanetLab slicename >>> +results_dir = "/tmp/demo_CCN_results" + ## Create the experiment controller -ec = ExperimentController(exp_id = "demo_CCN") +ec = ExperimentController(exp_id = "demo_CCN", + local_dir = results_dir) ## Register node 1 node1 = ec.register_resource("PlanetlabNode") @@ -137,16 +140,13 @@ ec.register_connection(app, ccnd2) # Register a collector to automatically collect the ccnd logs # to a local directory -results_dir = "/tmp/demo_CCN_results" col1 = ec.register_resource("Collector") ec.set(col1, "traceName", "stderr") -ec.set(col1, "storeDir", results_dir) ec.set(col1, "subDir", hostname1) ec.register_connection(col1, ccnd1) col2 = ec.register_resource("Collector") ec.set(col2, "traceName", "stderr") -ec.set(col2, "storeDir", results_dir) ec.set(col2, "subDir", hostname2) ec.register_connection(col2, ccnd2) diff --git a/src/nepi/data/__init__.py b/src/nepi/data/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/nepi/data/processing/__init__.py b/src/nepi/data/processing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/nepi/data/processing/ccn/__init__.py b/src/nepi/data/processing/ccn/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/nepi/data/processing/ccn/parser.py b/src/nepi/data/processing/ccn/parser.py new file mode 100644 index 00000000..e3c1007a --- /dev/null +++ b/src/nepi/data/processing/ccn/parser.py @@ -0,0 +1,402 @@ +#!/usr/bin/env python + +############################################################################### +# +# CCNX benchmark +# Copyright (C) 2014 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# +# Author: Alina Quereilhac +# +############################################################################### + +# +# This library contains functions to parse (CCNx) ccnd logs. +# +# Results from experiments must be stored in a directory +# named with the experiment run id. +# ccnd logs are stored in .log files in a subdirectory per node. +# The following diagram exemplifies the experiment result directory +# structure (nidi is the unique identifier assigned to node i): +# +# run_id +# \ nid1 +# \ nid2.log +# \ nid2 +# \ nid1.log +# \ nid3 +# \ nid3.log +# + +import collections +import functools +import networkx +import os +import pickle +import tempfile + +from nepi.util.timefuncs import compute_delay_ms +from nepi.util.statfuncs import compute_mean +import nepi.data.processing.ping.parser as ping_parser + +def is_control(content_name): + return content_name.startswith("ccnx:/%C1") or \ + content_name.startswith("ccnx:/ccnx") or \ + content_name.startswith("ccnx:/...") + + +def parse_file(filename): + """ Parses message information from ccnd log files + + filename: path to ccndlog file + + """ + + faces = dict() + sep = " " + + f = open(filename, "r") + + data = [] + + for line in f: + cols = line.strip().split(sep) + + # CCN_PEEK + # MESSAGE interest_from + # 1374181938.808523 ccnd[9245]: debug.4352 interest_from 6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7) + # + # MESSAGE interest_to + # 1374181938.812750 ccnd[9245]: debug.3502 interest_to 5 ccnx:/test/bunny.ts (39 bytes,i=2844,sim=0CDCC1D7) + # + # MESSAGE CONTENT FROM + # 1374181938.868682 ccnd[9245]: debug.4643 content_from 5 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8 + # + # MESSAGE CONTENT_TO + # 1374181938.868772 ccnd[9245]: debug.1619 content_to 6 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8 + # + # 1375596708.222304 ccnd[9758]: debug.3692 interest_expiry ccnx:/test/bunny.ts/%FD%05%1E%86%B1GS/%00%0A%F7 (44 bytes,c=0:1,i=2819,sim=49FA8048) + + # External face creation + # 1374181452.965961 ccnd[9245]: accepted datagram client id=5 (flags=0x40012) 204.85.191.10 port 9695 + + if line.find("accepted datagram client") > -1: + face_id = (cols[5]).replace("id=",'') + ip = cols[7] + port = cols[9] + faces[face_id] = (ip, port) + continue + + # 1374181452.985296 ccnd[9245]: releasing face id 4 (slot 4) + if line.find("releasing face id") > -1: + face_id = cols[5] + if face_id in faces: + del faces[face_id] + continue + + if len(cols) < 6: + continue + + timestamp = cols[0] + message_type = cols[3] + + if message_type not in ["interest_from", "interest_to", "content_from", + "content_to", "interest_dupnonce", "interest_expiry"]: + continue + + face_id = cols[4] + content_name = cols[5] + + # Interest Nonce ? -> 412A74-0844-0008-50AA-F6EAD4 + nonce = "" + if message_type in ["interest_from", "interest_to", "interest_dupnonce"]: + last = cols[-1] + if len(last.split("-")) == 5: + nonce = last + + try: + size = int((cols[6]).replace('(','')) + except: + print "interest_expiry without face id!", line + continue + + # If no external IP address was identified for this face + # asume it is a local face + peer = "localhost" + + if face_id in faces: + peer, port = faces[face_id] + + data.append((content_name, timestamp, message_type, peer, face_id, + size, nonce, line)) + + f.close() + + return data + +def dump_content_history(content_history): + f = tempfile.NamedTemporaryFile(delete=False) + pickle.dump(content_history, f) + f.close() + return f.name + +def load_content_history(fname): + f = open(fname, "r") + content_history = pickle.load(f) + f.close() + + os.remove(fname) + return content_history + +def annotate_cn_node(graph, nid, ips2nid, data, content_history): + for (content_name, timestamp, message_type, peer, face_id, + size, nonce, line) in data: + + # Ignore control messages for the time being + if is_control(content_name): + continue + + if message_type == "interest_from" and \ + peer == "localhost": + graph.node[nid]["ccn_consumer"] = True + elif message_type == "content_from" and \ + peer == "localhost": + graph.node[nid]["ccn_producer"] = True + + # Ignore local messages for the time being. + # They could later be used to calculate the processing times + # of messages. + if peer == "localhost": + continue + + # remove digest + if message_type in ["content_from", "content_to"]: + content_name = "/".join(content_name.split("/")[:-1]) + + if content_name not in content_history: + content_history[content_name] = list() + + peernid = ips2nid[peer] + graph.add_edge(nid, peernid) + + content_history[content_name].append((timestamp, message_type, nid, + peernid, nonce, size, line)) + +def annotate_cn_graph(logs_dir, graph, parse_ping_logs = False): + """ Adds CCN content history for each node in the topology graph. + + """ + # Make a copy of the graph to ensure integrity + graph = graph.copy() + + ips2nid = dict() + + for nid in graph.nodes(): + ips = graph.node[nid]["ips"] + for ip in ips: + ips2nid[ip] = nid + + # Now walk through the ccnd logs... + for dirpath, dnames, fnames in os.walk(logs_dir): + # continue if we are not at the leaf level (if there are subdirectories) + if dnames: + continue + + # Each dirpath correspond to a different node + nid = os.path.basename(dirpath) + + content_history = dict() + + for fname in fnames: + if fname.endswith(".log"): + filename = os.path.join(dirpath, fname) + data = parse_file(filename) + annotate_cn_node(graph, nid, ips2nid, data, content_history) + + # Avoid storing everything in memory, instead dump to a file + # and reference the file + fname = dump_content_history(content_history) + graph.node[nid]["history"] = fname + + if parse_ping_logs: + ping_parser.annotate_cn_graph(logs_dir, graph) + + return graph + +def ccn_producers(graph): + """ Returns the nodes that are content providers """ + return [nid for nid in graph.nodes() \ + if graph.node[nid].get("ccn_producer")] + +def ccn_consumers(graph): + """ Returns the nodes that are content consumers """ + return [nid for nid in graph.nodes() \ + if graph.node[nid].get("ccn_consumer")] + +def process_content_history(graph): + """ Compute CCN message counts and aggregates content historical + information in the content_names dictionary + + """ + + ## Assume single source + source = ccn_consumers(graph)[0] + + interest_expiry_count = 0 + interest_dupnonce_count = 0 + interest_count = 0 + content_count = 0 + content_names = dict() + + # Collect information about exchanged messages by content name and + # link delay info. + for nid in graph.nodes(): + # Load the data collected from the node's ccnd log + fname = graph.node[nid]["history"] + history = load_content_history(fname) + + for content_name in history.keys(): + hist = history[content_name] + + for (timestamp, message_type, nid1, nid2, nonce, size, line) in hist: + if message_type in ["content_from", "content_to"]: + # The first Interest sent will not have a version or chunk number. + # The first Content sent back in reply, will end in /=00 or /%00. + # Make sure to map the first Content to the first Interest. + if content_name.endswith("/=00"): + content_name = "/".join(content_name.split("/")[0:-2]) + + # Add content name to dictionary + if content_name not in content_names: + content_names[content_name] = dict() + content_names[content_name]["interest"] = dict() + content_names[content_name]["content"] = list() + + # Classify interests by replica + if message_type in ["interest_from"] and \ + nonce not in content_names[content_name]["interest"]: + content_names[content_name]["interest"][nonce] = list() + + # Add consumer history + if nid == source: + if message_type in ["interest_to", "content_from"]: + # content name history as seen by the source + if "consumer_history" not in content_names[content_name]: + content_names[content_name]["consumer_history"] = list() + + content_names[content_name]["consumer_history"].append( + (timestamp, message_type)) + + # Add messages per content name and cumulate totals by message type + if message_type == "interest_dupnonce": + interest_dupnonce_count += 1 + elif message_type == "interest_expiry": + interest_expiry_count += 1 + elif message_type == "interest_from": + interest_count += 1 + # Append to interest history of the content name + content_names[content_name]["interest"][nonce].append( + (timestamp, nid2, nid1)) + elif message_type == "content_from": + content_count += 1 + # Append to content history of the content name + content_names[content_name]["content"].append((timestamp, nid2, nid1)) + else: + continue + del hist + del history + + # Compute the time elapsed between the time an interest is sent + # in the consumer node and when the content is received back + for content_name in content_names.keys(): + # order content and interest messages by timestamp + content_names[content_name]["content"] = sorted( + content_names[content_name]["content"]) + + for nonce, timestamps in content_names[content_name][ + "interest"].iteritems(): + content_names[content_name]["interest"][nonce] = sorted( + timestamps) + + history = sorted(content_names[content_name]["consumer_history"]) + content_names[content_name]["consumer_history"] = history + + # compute the rtt time of the message + rtt = None + waiting_content = False + interest_timestamp = None + content_timestamp = None + + for (timestamp, message_type) in history: + if not waiting_content and message_type == "interest_to": + waiting_content = True + interest_timestamp = timestamp + continue + + if waiting_content and message_type == "content_from": + content_timestamp = timestamp + break + + # If we can't determine who sent the interest, discard it + rtt = -1 + if interest_timestamp and content_timestamp: + rtt = compute_delay_ms(content_timestamp, interest_timestamp) + + content_names[content_name]["rtt"] = rtt + content_names[content_name]["lapse"] = (interest_timestamp, content_timestamp) + + return (graph, + content_names, + interest_expiry_count, + interest_dupnonce_count, + interest_count, + content_count) + +def process_content_history_logs(logs_dir, graph): + """ Parse CCN logs and aggregate content history information in graph. + Returns annotated graph and message countn and content names history. + + """ + ## Process logs and analyse data + try: + graph = annotate_cn_graph(logs_dir, graph, + parse_ping_logs = True) + except: + print "Skipping: Error parsing ccnd logs", logs_dir + raise + + source = ccn_consumers(graph)[0] + target = ccn_producers(graph)[0] + + # Process the data from the ccnd logs, but do not re compute + # the link delay. + try: + (graph, + content_names, + interest_expiry_count, + interest_dupnonce_count, + interest_count, + content_count) = process_content_history(graph) + except: + print "Skipping: Error processing ccn data", logs_dir + raise + + return (graph, + content_names, + interest_expiry_count, + interest_dupnonce_count, + interest_count, + content_count) diff --git a/src/nepi/data/processing/ping/__init__.py b/src/nepi/data/processing/ping/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/nepi/data/processing/ping/parser.py b/src/nepi/data/processing/ping/parser.py new file mode 100644 index 00000000..0248c8cf --- /dev/null +++ b/src/nepi/data/processing/ping/parser.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python + +############################################################################### +# +# CCNX benchmark +# Copyright (C) 2014 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# +# Author: Alina Quereilhac +# +############################################################################### + +# +# This library contains functions to parse log files generated using ping. +# + +import collections +import re +import os + +# RE to match line starting "traceroute to" +_rre = re.compile("\d+ bytes from ((?P[^\s]+) )?\(?(?P[^\s]+)\)??: icmp_.eq=\d+ ttl=\d+ time=(?P