From: Alina Quereilhac Date: Sun, 10 Aug 2014 02:56:51 +0000 (+0200) Subject: Fixing issues with serialization X-Git-Tag: nepi-3.2.0~99^2~5 X-Git-Url: http://git.onelab.eu/?p=nepi.git;a=commitdiff_plain;h=d1c731d0fbf6c0b8a21607795fb1101a46a2518d Fixing issues with serialization --- diff --git a/examples/ccn_flooding/flooding.py b/examples/ccn_flooding/flooding.py deleted file mode 100644 index c35d7ef4..00000000 --- a/examples/ccn_flooding/flooding.py +++ /dev/null @@ -1,216 +0,0 @@ -#!/usr/bin/env python - -############################################################################### -# -# NEPI, a framework to manage network experiments -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -# Author: Alina Quereilhac -# -############################################################################### - -from nepi.execution.ec import ExperimentController -from nepi.execution.resource import ResourceState, ResourceAction -from nepi.util.netgraph import NetGraph, TopologyType - -import os -import tempfile - -from dminer import ccn - -PL_NODES = dict({ - "0": "iraplab1.iralab.uni-karlsruhe.de", - "1": "planetlab1.informatik.uni-goettingen.de", - "2": "dfn-ple1.x-win.dfn.de", - "3": "mars.planetlab.haw-hamburg.de", - "4": "planetlab2.unineuchatel.ch", - "5": "planetlab-node3.it-sudparis.eu", - "6": "planetlab2.extern.kuleuven.be", - "7": "node2pl.planet-lab.telecom-lille1.eu", - "8": "planetvs2.informatik.uni-stuttgart.de", - "9": "planetlab1.informatik.uni-wuerzburg.de", - "10": "planet1.l3s.uni-hannover.de", - "11": "planetlab1.wiwi.hu-berlin.de", - "12": "pl2.uni-rostock.de", - "13": "planetlab1.u-strasbg.fr", - "14": "peeramidion.irisa.fr" - }) - -pl_slice = os.environ.get("PL_SLICE") -pl_user = os.environ.get("PL_USER") -pl_password = os.environ.get("PL_PASS") -pl_ssh_key = os.environ.get("PL_SSHKEY") - -content_name = "ccnx:/test/bunny.ts" - -pipeline = 4 # Default value for ccncat - -operating_system = "f14" - -country = "germany" - -repofile = os.path.join( - os.path.dirname(os.path.realpath(__file__)), - "..", "repoFile1.0.8.2") - -def add_collector(ec, trace_name, store_dir, sub_dir, rename = None): - collector = ec.register_resource("Collector") - ec.set(collector, "traceName", trace_name) - ec.set(collector, "storeDir", store_dir) - ec.set(collector, "subDir", sub_dir) - if rename: - ec.set(collector, "rename", rename) - - return collector - -def add_node(ec, n): - hostname = PL_NODES[n] - - node = ec.register_resource("PlanetlabNode") - ec.set(node, "hostname", hostname) - ec.set(node, "username", username) - ec.set(node, "identity", identity) - ec.set(node, "pluser", pl_user) - ec.set(node, "plpassword", pl_password) - #ec.set(node, "country", country) - #ec.set(node, "operatingSystem", operating_system) - ec.set(node, "cleanExperiment", True) - ec.set(node, "cleanProcesses", True) - - return node - -def add_ccnd(ec, node, n): - global PL_NODES - - ccnd = ec.register_resource("LinuxCCND") - ec.set(ccnd, "debug", 7) - ec.register_connection(ccnd, node) - - # collector for ccnd trace - hostname = PL_NODES[n] - collector = add_collector(ec, "stderr", hostname, "log") - ec.register_connection(collector, ccnd) - - PL_NODES[n] = (hostname, node, ccnd) - return ccnd - -def add_ccnr(ec, ccnd): - ccnr = ec.register_resource("LinuxCCNR") - - ec.set(ccnr, "repoFile1", repofile) - ec.register_connection(ccnr, ccnd) - - return ccnr - -def add_fib_entry(ec, n1, n2): - (hostname1, node1, ccnd1) = PL_NODES[n1] - (hostname2, node2, ccnd2) = PL_NODES[n2] - - entry = ec.register_resource("LinuxFIBEntry") - ec.set(entry, "host", peer_host) - - ec.register_connection(entry, ccnd1) - - ec.enable_trace(entry, "ping") - collector = add_collector(ec, "ping", hostname2) - ec.register_connection(collector, entry) - - return entry - -def add_ccncat(ec, ccnd): - ccncat = ec.register_resource("LinuxCCNCat") - ec.set(ccncat, "pipeline", pipeline) - ec.set(ccncat, "contentName", content_name) - ec.register_connection(ccncat, ccnd) - - return ccncat - -def compute_metric_callback(ec, run): - ## Process logs and analyse data - try: - graph = ccn.parse_ccndlogs(graph = graph, - parse_ping_logs = True) - except: - print "Skipping: Error parsing ccnd logs", run_dir - raise - - source = ccn.consumers(graph)[0] - target = ccn.producers(graph)[0] - - # Process the data from the ccnd logs, but do not re compute - # the link delay. - try: - (content_names, - interest_expiry_count, - interest_dupnonce_count, - interest_count, - content_count) = ccn.process_ccn_data(graph, source) - except: - print "Skipping: Error processing ccn data", run_dir - raise - - # Compute the shortest path - shortest_path = ccn.shortest_path(graph, source, target) - - # Compute the load coefficient - lcoeff = ccn.load_coefficient(graph, shortest_path, content_names) - - return lcoeff - -if __name__ == '__main__': - - #### Generate a LADDER network topology - net_graph = NetGraph(topo_type = TopologyType.LADDER, - node_count = 6, - assign_st = True, - assign_ips = True) - - target = net_graph.targets()[0] - source = net_graph.sources()[0] - - wait_guids = [] - - #### Create NEPI Experiment Description (EC) - ec = ExperimentController(exp_id) - - ### Add CCN nodes to the (EC) - for n in graph.nodes(): - node = add_node(ec, n) - ccnd = add_ccnd(ec, node, n) - - if n == target: - ccnr = add_ccnr(ec, ccnd) - - ## Add content retrival application - if n == source: - ccncat = add_ccncat(ec, ccnd) - wait_guids.append(ccncat) - - #### Add connections between CCN nodes - for n1, n2 in graph.edges(): - add_fib_entry(ec, n1, n2) - add_fib_entry(ec, n2, n1) - - #### Define the callback to compute experiment metric - metric_callback = functools.partial(compute_metric_callback, ping) - - #### Run experiment until metric convergence - rnr = ExperimentRunner() - - runs = rnr.run(ec, min_runs = 10, max_runs = 300 - compute_metric_callback = metric_callback, - wait_guids = wait_guids, - wait_time = 0) - diff --git a/examples/ccn_flooding/planetlab.py b/examples/ccn_flooding/planetlab.py new file mode 100644 index 00000000..5834a85b --- /dev/null +++ b/examples/ccn_flooding/planetlab.py @@ -0,0 +1,219 @@ +#!/usr/bin/env python + +############################################################################### +# +# NEPI, a framework to manage network experiments +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# +############################################################################### + +from nepi.execution.ec import ExperimentController +from nepi.execution.runner import ExperimentRunner +from nepi.util.netgraph import NetGraph, TopologyType +import nepi.data.processing.ccn.parser as ccn_parser + +import socket +import os + +PL_NODES = dict({ + "0": "iraplab1.iralab.uni-karlsruhe.de", + "1": "planetlab1.informatik.uni-goettingen.de", + "2": "dfn-ple1.x-win.dfn.de", + "3": "mars.planetlab.haw-hamburg.de", + "4": "planetlab2.unineuchatel.ch", + "5": "planetlab-node3.it-sudparis.eu", + "6": "planetlab2.extern.kuleuven.be", + "7": "node2pl.planet-lab.telecom-lille1.eu", + "8": "planetvs2.informatik.uni-stuttgart.de", + "9": "planetlab1.informatik.uni-wuerzburg.de", + "10": "planet1.l3s.uni-hannover.de", + "11": "planetlab1.wiwi.hu-berlin.de", + "12": "pl2.uni-rostock.de", + "13": "planetlab1.u-strasbg.fr", + "14": "peeramidion.irisa.fr" + }) + +pl_slice = os.environ.get("PL_SLICE") +pl_user = os.environ.get("PL_USER") +pl_password = os.environ.get("PL_PASS") +pl_ssh_key = os.environ.get("PL_SSHKEY") + +content_name = "ccnx:/test/bunny.ts" + +pipeline = 4 # Default value for ccncat + +operating_system = "f14" + +country = "germany" + +repofile = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "repoFile1.0.8.2") + +def add_collector(ec, trace_name, subdir, newname = None): + collector = ec.register_resource("Collector") + ec.set(collector, "traceName", trace_name) + ec.set(collector, "subDir", subdir) + if newname: + ec.set(collector, "rename", newname) + + return collector + +def add_pl_host(ec, nid): + hostname = PL_NODES[nid] + + # Add a planetlab host to the experiment description + host = ec.register_resource("PlanetlabNode") + ec.set(host, "hostname", hostname) + ec.set(host, "username", pl_slice) + ec.set(host, "identity", pl_ssh_key) + #ec.set(host, "pluser", pl_user) + #ec.set(host, "plpassword", pl_password) + #ec.set(host, "country", country) + #ec.set(host, "operatingSystem", operating_system) + ec.set(host, "cleanExperiment", True) + ec.set(host, "cleanProcesses", True) + + # Annotate the graph + ec.netgraph.annotate_node(nid, "hostname", hostname) + ec.netgraph.annotate_node(nid, "host", host) + + # Annotate the graph node with an ip address + ip = socket.gethostbyname(hostname) + ec.netgraph.annotate_node_ip(nid, ip) + +def add_pl_ccnd(ec, nid): + # Retrieve annotation from netgraph + host = ec.netgraph.node_annotation(nid, "host") + + # Add a CCN daemon to the planetlab node + ccnd = ec.register_resource("LinuxCCND") + ec.set(ccnd, "debug", 7) + ec.register_connection(ccnd, host) + + # Collector to retrieve ccnd log + collector = add_collector(ec, "stderr", nid, "log") + ec.register_connection(collector, ccnd) + + # Annotate the graph + ec.netgraph.annotate_node(nid, "ccnd", ccnd) + +def add_pl_ccnr(ec, nid): + # Retrieve annotation from netgraph + ccnd = ec.netgraph.node_annotation(nid, "ccnd") + + # Add a CCN content repository to the planetlab node + ccnr = ec.register_resource("LinuxCCNR") + + ec.set(ccnr, "repoFile1", repofile) + ec.register_connection(ccnr, ccnd) + +def add_pl_ccncat(ec, nid): + # Retrieve annotation from netgraph + ccnd = ec.netgraph.node_annotation(nid, "ccnd") + + # Add a CCN cat application to the planetlab node + ccncat = ec.register_resource("LinuxCCNCat") + ec.set(ccncat, "pipeline", pipeline) + ec.set(ccncat, "contentName", content_name) + ec.register_connection(ccncat, ccnd) + +def add_pl_fib_entry(ec, nid1, nid2): + # Retrieve annotations from netgraph + ccnd1 = ec.netgraph.node_annotation(nid1, "ccnd") + hostname2 = ec.netgraph.node_annotation(nid2, "hostname") + + # Add a FIB entry between one planetlab node and its peer + entry = ec.register_resource("LinuxFIBEntry") + ec.set(entry, "host", hostname2) + ec.register_connection(entry, ccnd1) + + # Collector to retrieve peering ping output (to measure neighbors delay) + ec.enable_trace(entry, "ping") + collector = add_collector(ec, "ping", nid1) + ec.register_connection(collector, entry) + + return entry + +def avg_interests(ec, run): + ## Process logs + logs_dir = ec.run_dir + + print ec.netgraph + + (graph, + content_names, + interest_expiry_count, + interest_dupnonce_count, + interest_count, + content_count) = ccn_parser.process_content_history_logs( + logs_dir, + ec.netgraph) + + shortest_path = networkx.shortest_path(graph, + source = ec.netgraph.sources()[0], + target = ec.netgraph.targets()[0]) + + ### Compute metric: Avg number of Interests seen per content name + ### normalized by the number of nodes in the shortest path + content_name_count = len(content_names.values()) + nodes_in_shortest_path = len(content_names.values()) + metric = interest_count / float(content_name_count) / float(nodes_in_shortest_path) + + # TODO: DUMP RESULTS TO FILE + # TODO: DUMP GRAPH DELAYS! + + return metric + +def add_pl_edge(ec, nid1, nid2): + #### Add connections between CCN nodes + add_pl_fib_entry(ec, nid1, nid2) + add_pl_fib_entry(ec, nid2, nid1) + +def add_pl_node(ec, nid): + ### Add CCN nodes (ec.netgraph holds the topology graph) + add_pl_host(ec, nid) + add_pl_ccnd(ec, nid) + + if nid == ec.netgraph.targets()[0]: + add_pl_ccnr(ec, nid) + + if nid == ec.netgraph.sources()[0]: + add_pl_ccncat(ec, nid) + +if __name__ == '__main__': + + #### Create NEPI Experiment Description with LINEAR topology + ec = ExperimentController("pl_ccn", + topo_type = TopologyType.LINEAR, + node_count = 4, + #assign_ips = True, + assign_st = True, + add_node_callback = add_pl_node, + add_edge_callback = add_pl_edge) + + print "Results stored at", ec.exp_dir + + #### Retrieve the content producing resource to wait for ot to finish + ccncat = ec.filter_resources("LinuxCCNCat") + + #### Run experiment until metric convergences + rnr = ExperimentRunner() + runs = rnr.run(ec, min_runs = 10, max_runs = 300, + compute_metric_callback = avg_interests, + wait_guids = ccncat, + wait_time = 0) + diff --git a/src/nepi/data/__init__.py b/src/nepi/data/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/nepi/data/processing/__init__.py b/src/nepi/data/processing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/nepi/data/processing/ccn/__init__.py b/src/nepi/data/processing/ccn/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/nepi/data/processing/ccn/parser.py b/src/nepi/data/processing/ccn/parser.py new file mode 100644 index 00000000..1a658307 --- /dev/null +++ b/src/nepi/data/processing/ccn/parser.py @@ -0,0 +1,401 @@ +#!/usr/bin/env python + +############################################################################### +# +# CCNX benchmark +# Copyright (C) 2014 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# +# Author: Alina Quereilhac +# +############################################################################### + +# +# This library contains functions to parse (CCNx) ccnd logs. +# +# Results from experiments must be stored in a directory +# named with the experiment run id. +# ccnd logs are stored in .log files in a subdirectory per node. +# The following diagram exemplifies the experiment result directory +# structure (nidi is the unique identifier assigned to node i): +# +# run_id +# \ nid1 +# \ nid2.log +# \ nid2 +# \ nid1.log +# \ nid3 +# \ nid3.log +# + +import collections +import functools +import networkx +import os +import pickle +import tempfile + +from nepi.util.timefuncs import compute_delay_ms +from nepi.util.statfuncs import compute_mean +import nepi.data.processing.ping.parser as ping_parser + +def is_control(content_name): + return content_name.startswith("ccnx:/%C1") or \ + content_name.startswith("ccnx:/ccnx") or \ + content_name.startswith("ccnx:/...") + + +def parse_file(filename): + """ Parses message information from ccnd log files + + filename: path to ccndlog file + + """ + + faces = dict() + sep = " " + + f = open(filename, "r") + + data = [] + + for line in f: + cols = line.strip().split(sep) + + # CCN_PEEK + # MESSAGE interest_from + # 1374181938.808523 ccnd[9245]: debug.4352 interest_from 6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7) + # + # MESSAGE interest_to + # 1374181938.812750 ccnd[9245]: debug.3502 interest_to 5 ccnx:/test/bunny.ts (39 bytes,i=2844,sim=0CDCC1D7) + # + # MESSAGE CONTENT FROM + # 1374181938.868682 ccnd[9245]: debug.4643 content_from 5 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8 + # + # MESSAGE CONTENT_TO + # 1374181938.868772 ccnd[9245]: debug.1619 content_to 6 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8 + # + # 1375596708.222304 ccnd[9758]: debug.3692 interest_expiry ccnx:/test/bunny.ts/%FD%05%1E%86%B1GS/%00%0A%F7 (44 bytes,c=0:1,i=2819,sim=49FA8048) + + # External face creation + # 1374181452.965961 ccnd[9245]: accepted datagram client id=5 (flags=0x40012) 204.85.191.10 port 9695 + + if line.find("accepted datagram client") > -1: + face_id = (cols[5]).replace("id=",'') + ip = cols[7] + port = cols[9] + faces[face_id] = (ip, port) + continue + + # 1374181452.985296 ccnd[9245]: releasing face id 4 (slot 4) + if line.find("releasing face id") > -1: + face_id = cols[5] + if face_id in faces: + del faces[face_id] + continue + + if len(cols) < 6: + continue + + timestamp = cols[0] + message_type = cols[3] + + if message_type not in ["interest_from", "interest_to", "content_from", + "content_to", "interest_dupnonce", "interest_expiry"]: + continue + + face_id = cols[4] + content_name = cols[5] + + # Interest Nonce ? -> 412A74-0844-0008-50AA-F6EAD4 + nonce = "" + if message_type in ["interest_from", "interest_to", "interest_dupnonce"]: + last = cols[-1] + if len(last.split("-")) == 5: + nonce = last + + try: + size = int((cols[6]).replace('(','')) + except: + print "interest_expiry without face id!", line + continue + + # If no external IP address was identified for this face + # asume it is a local face + hostip = "localhost" + + if face_id in faces: + hostip, port = faces[face_id] + + data.append((content_name, timestamp, message_type, hostip, face_id, + size, nonce, line)) + + f.close() + + return data + +def dump_content_history(content_history): + f = tempfile.NamedTemporaryFile(delete=False) + pickle.dump(content_history, f) + f.close() + return f.name + +def load_content_history(fname): + f = open(fname, "r") + content_history = pickle.load(f) + f.close() + + os.remove(fname) + return content_history + +def annotate_cn_node(graph, nid, ips2nid, data, content_history): + for (content_name, timestamp, message_type, hostip, face_id, + size, nonce, line) in data: + + # Ignore local messages for the time being. + # They could later be used to calculate the processing times + # of messages. + if peer == "localhost": + return + + # Ignore control messages for the time being + if is_control(content_name): + return + + if message_type == "interest_from" and \ + peer == "localhost": + graph.node[nid]["ccn_consumer"] = True + elif message_type == "content_from" and \ + peer == "localhost": + graph.node[nid]["ccn_producer"] = True + + # remove digest + if message_type in ["content_from", "content_to"]: + content_name = "/".join(content_name.split("/")[:-1]) + + if content_name not in content_history: + content_history[content_name] = list() + + peernid = ips2nid[peer] + add_edge(graph, nid, peernid) + + content_history[content_name].append((timestamp, message_type, nid, + peernid, nonce, size, line)) + +def annotate_cn_graph(logs_dir, graph, parse_ping_logs = False): + """ Adds CCN content history for each node in the topology graph. + + """ + # Make a copy of the graph to ensure integrity + graph = graph.copy() + + ips2nids = dict() + + for nid in graph.nodes(): + ips = graph.node[nid]["ips"] + for ip in ips: + ips2nids[ip] = nid + + # Now walk through the ccnd logs... + for dirpath, dnames, fnames in os.walk(logs_dir): + # continue if we are not at the leaf level (if there are subdirectories) + if dnames: + continue + + # Each dirpath correspond to a different node + nid = os.path.basename(dirpath) + + content_history = dict() + + for fname in fnames: + if fname.endswith(".log"): + filename = os.path.join(dirpath, fname) + data = parse_file(filename) + annotate_cn_node(graph, nid, ips2nids, data, content_history) + + # Avoid storing everything in memory, instead dump to a file + # and reference the file + fname = dump_content_history(content_history) + graph.node[nid]["history"] = fname + + if parse_ping_logs: + ping_parser.annotate_cn_graph(logs_dir, graph) + + return graph + +def ccn_producers(graph): + """ Returns the nodes that are content providers """ + return [nid for nid in graph.nodes() \ + if graph.node[nid].get("ccn_producer")] + +def ccn_consumers(graph): + """ Returns the nodes that are content consumers """ + return [nid for nid in graph.nodes() \ + if graph.node[nid].get("ccn_consumer")] + +def process_content_history(graph): + """ Compute CCN message counts and aggregates content historical + information in the content_names dictionary + + """ + + ## Assume single source + source = ccn_consumers(graph)[0] + + interest_expiry_count = 0 + interest_dupnonce_count = 0 + interest_count = 0 + content_count = 0 + content_names = dict() + + # Collect information about exchanged messages by content name and + # link delay info. + for nid in graph.nodes(): + # Load the data collected from the node's ccnd log + fname = graph.node[nid]["history"] + history = load_content_history(fname) + + for content_name in history.keys(): + hist = history[content_name] + + for (timestamp, message_type, nid1, nid2, nonce, size, line) in hist: + if message_type in ["content_from", "content_to"]: + # The first Interest sent will not have a version or chunk number. + # The first Content sent back in reply, will end in /=00 or /%00. + # Make sure to map the first Content to the first Interest. + if content_name.endswith("/=00"): + content_name = "/".join(content_name.split("/")[0:-2]) + + # Add content name to dictionary + if content_name not in content_names: + content_names[content_name] = dict() + content_names[content_name]["interest"] = dict() + content_names[content_name]["content"] = list() + + # Classify interests by replica + if message_type in ["interest_from"] and \ + nonce not in content_names[content_name]["interest"]: + content_names[content_name]["interest"][nonce] = list() + + # Add consumer history + if nid == source: + if message_type in ["interest_to", "content_from"]: + # content name history as seen by the source + if "consumer_history" not in content_names[content_name]: + content_names[content_name]["consumer_history"] = list() + + content_names[content_name]["consumer_history"].append( + (timestamp, message_type)) + + # Add messages per content name and cumulate totals by message type + if message_type == "interest_dupnonce": + interest_dupnonce_count += 1 + elif message_type == "interest_expiry": + interest_expiry_count += 1 + elif message_type == "interest_from": + interest_count += 1 + # Append to interest history of the content name + content_names[content_name]["interest"][nonce].append( + (timestamp, nid2, nid1)) + elif message_type == "content_from": + content_count += 1 + # Append to content history of the content name + content_names[content_name]["content"].append((timestamp, nid2, nid1)) + else: + continue + del hist + del history + + # Compute the time elapsed between the time an interest is sent + # in the consumer node and when the content is received back + for content_name in content_names.keys(): + # order content and interest messages by timestamp + content_names[content_name]["content"] = sorted( + content_names[content_name]["content"]) + + for nonce, timestamps in content_names[content_name][ + "interest"].iteritems(): + content_names[content_name]["interest"][nonce] = sorted( + timestamps) + + history = sorted(content_names[content_name]["consumer_history"]) + content_names[content_name]["consumer_history"] = history + + # compute the rtt time of the message + rtt = None + waiting_content = False + interest_timestamp = None + content_timestamp = None + + for (timestamp, message_type) in history: + if not waiting_content and message_type == "interest_to": + waiting_content = True + interest_timestamp = timestamp + continue + + if waiting_content and message_type == "content_from": + content_timestamp = timestamp + break + + # If we can't determine who sent the interest, discard it + rtt = -1 + if interest_timestamp and content_timestamp: + rtt = compute_delay_ms(content_timestamp, interest_timestamp) + + content_names[content_name]["rtt"] = rtt + content_names[content_name]["lapse"] = (interest_timestamp, content_timestamp) + + return (content_names, + interest_expiry_count, + interest_dupnonce_count, + interest_count, + content_count) + +def process_content_history_logs(logs_dir, graph): + """ Parse CCN logs and aggregate content history information in graph. + Returns annotated graph and message countn and content names history. + + """ + ## Process logs and analyse data + try: + graph = annotate_cn_graph(logs_dir, graph, + parse_ping_logs = True) + except: + print "Skipping: Error parsing ccnd logs", logs_dir + raise + + source = consumers(graph)[0] + target = producers(graph)[0] + + # Process the data from the ccnd logs, but do not re compute + # the link delay. + try: + (graph, + content_names, + interest_expiry_count, + interest_dupnonce_count, + interest_count, + content_count) = process_content_history(graph) + except: + print "Skipping: Error processing ccn data", logs_dir + raise + + return (graph, + content_names, + interest_expiry_count, + interest_dupnonce_count, + interest_count, + content_count) diff --git a/src/nepi/data/processing/ping/__init__.py b/src/nepi/data/processing/ping/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/nepi/data/processing/ping/parser.py b/src/nepi/data/processing/ping/parser.py new file mode 100644 index 00000000..33962075 --- /dev/null +++ b/src/nepi/data/processing/ping/parser.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python + +############################################################################### +# +# CCNX benchmark +# Copyright (C) 2014 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# +# Author: Alina Quereilhac +# +############################################################################### + +# +# This library contains functions to parse log files generated using ping. +# + +import re + +# RE to match line starting "traceroute to" +_rre = re.compile("\d+ bytes from ((?P[^\s]+) )?\(?(?P[^\s]+)\)??: icmp_.eq=\d+ ttl=\d+ time=(?P