X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=doc%2Fsphinx%2F_build%2Fhtml%2F_modules%2Fnepi%2Fdata%2Fprocessing%2Fccn%2Fparser.html;fp=doc%2Fsphinx%2F_build%2Fhtml%2F_modules%2Fnepi%2Fdata%2Fprocessing%2Fccn%2Fparser.html;h=bb3d703f6d8b921b0f50733fccc9b858e84857f8;hb=9b6a92c1bb6c85d1b6d194624e49ea01ce3893e7;hp=0000000000000000000000000000000000000000;hpb=ed00cc0dfbe89c748c07b7da96dfff338cdb301a;p=nepi.git

diff --git a/doc/sphinx/_build/html/_modules/nepi/data/processing/ccn/parser.html b/doc/sphinx/_build/html/_modules/nepi/data/processing/ccn/parser.html
new file mode 100644
index 00000000..bb3d703f
--- /dev/null
+++ b/doc/sphinx/_build/html/_modules/nepi/data/processing/ccn/parser.html
@@ -0,0 +1,505 @@
Source code for nepi.data.processing.ccn.parser

+#!/usr/bin/env python
+
+###############################################################################
+#
+#    CCNX benchmark
+#    Copyright (C) 2014 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License version 2 as
+#    published by the Free Software Foundation;
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#
+###############################################################################
+
+#
+# This library contains functions to parse (CCNx) ccnd logs.
+#
+# Results from experiments must be stored in a directory
+# named with the experiment run id.
+# ccnd logs are stored in .log files in a subdirectory per node.
+# The following diagram exemplifies the experiment result directory
+# structure (nid_i is the unique identifier assigned to node i):
+#
+#    run_id
+#               \   nid1
+#                        \ nid2.log
+#               \   nid2
+#                        \ nid1.log
+#               \   nid3
+#                        \ nid3.log
+#
+
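+# Example usage sketch (paths, node ids and IP addresses below are
+# hypothetical; the graph must be annotated beforehand with an "ips"
+# attribute per node, as read by annotate_cn_graph):
+#
+#    import networkx
+#    import nepi.data.processing.ccn.parser as ccn_parser
+#
+#    graph = networkx.Graph()
+#    graph.add_node("nid1", ips = ["204.85.191.10"])
+#    graph.add_node("nid2", ips = ["204.85.191.11"])
+#
+#    (graph,
+#    content_names,
+#    interest_expiry_count,
+#    interest_dupnonce_count,
+#    interest_count,
+#    content_count) = ccn_parser.process_content_history_logs(
+#        "results/run_0001", graph)
+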
+import collections
+import functools
+import networkx
+import os
+import pickle
+import tempfile
+
+from nepi.util.timefuncs import compute_delay_ms
+from nepi.util.statfuncs import compute_mean
+import nepi.data.processing.ping.parser as ping_parser
+
+
+def is_control(content_name):
+    return content_name.startswith("ccnx:/%C1") or \
+           content_name.startswith("ccnx:/ccnx") or \
+           content_name.startswith("ccnx:/...")
+
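+# For example (CCNx control and metadata names begin with the %C1
+# command marker or the reserved ccnx:/ccnx prefix; the names below
+# are illustrative):
+#
+#    is_control("ccnx:/%C1.M.S.localhost/%C1.M.SRV/ccnd")   # -> True
+#    is_control("ccnx:/test/bunny.ts")                      # -> False
+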
+def parse_file(filename):
+    """ Parses message information from a ccnd log file.
+
+    filename: path to the ccnd log file
+
+    """
+
+    faces = dict()
+    sep = " "
+
+    f = open(filename, "r")
+
+    data = []
+
+    for line in f:
+        cols = line.strip().split(sep)
+
+        # CCN_PEEK
+        # MESSAGE interest_from
+        # 1374181938.808523 ccnd[9245]: debug.4352 interest_from 6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7)
+        #
+        # MESSAGE interest_to
+        # 1374181938.812750 ccnd[9245]: debug.3502 interest_to 5 ccnx:/test/bunny.ts (39 bytes,i=2844,sim=0CDCC1D7)
+        #
+        # MESSAGE content_from
+        # 1374181938.868682 ccnd[9245]: debug.4643 content_from 5 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
+        #
+        # MESSAGE content_to
+        # 1374181938.868772 ccnd[9245]: debug.1619 content_to 6 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
+        #
+        # MESSAGE interest_expiry
+        # 1375596708.222304 ccnd[9758]: debug.3692 interest_expiry ccnx:/test/bunny.ts/%FD%05%1E%86%B1GS/%00%0A%F7 (44 bytes,c=0:1,i=2819,sim=49FA8048)
+
+        # External face creation
+        # 1374181452.965961 ccnd[9245]: accepted datagram client id=5 (flags=0x40012) 204.85.191.10 port 9695
+        if line.find("accepted datagram client") > -1:
+            face_id = cols[5].replace("id=", '')
+            ip = cols[7]
+            port = cols[9]
+            faces[face_id] = (ip, port)
+            continue
+
+        # Face release
+        # 1374181452.985296 ccnd[9245]: releasing face id 4 (slot 4)
+        if line.find("releasing face id") > -1:
+            face_id = cols[5]
+            if face_id in faces:
+                del faces[face_id]
+            continue
+
+        if len(cols) < 6:
+            continue
+
+        timestamp = cols[0]
+        message_type = cols[3]
+
+        if message_type not in ["interest_from", "interest_to", "content_from",
+                "content_to", "interest_dupnonce", "interest_expiry"]:
+            continue
+
+        face_id = cols[4]
+        content_name = cols[5]
+
+        # Interest Nonce ? -> 412A74-0844-0008-50AA-F6EAD4
+        nonce = ""
+        if message_type in ["interest_from", "interest_to", "interest_dupnonce"]:
+            last = cols[-1]
+            if len(last.split("-")) == 5:
+                nonce = last
+
+        try:
+            size = int(cols[6].replace('(', ''))
+        except (ValueError, IndexError):
+            # interest_expiry lines carry no face id, so the columns
+            # shift and the size cannot be parsed; skip them
+            print "interest_expiry without face id!", line
+            continue
+
+        # If no external IP address was identified for this face,
+        # assume it is a local face
+        peer = "localhost"
+
+        if face_id in faces:
+            peer, port = faces[face_id]
+
+        data.append((content_name, timestamp, message_type, peer, face_id,
+            size, nonce, line))
+
+    f.close()
+
+    return data
+
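+# Each record returned by parse_file() is a tuple of the form
+# (content_name, timestamp, message_type, peer, face_id, size, nonce, line).
+# For instance, the interest_from sample line above would yield, assuming
+# face 6 was earlier accepted from 204.85.191.10 port 9695:
+#
+#    ("ccnx:/test/bunny.ts", "1374181938.808523", "interest_from",
+#        "204.85.191.10", "6", 23, "", <raw log line>)
+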
+def dump_content_history(content_history):
+    """ Pickles the content history into a temporary file and returns
+    its name """
+    f = tempfile.NamedTemporaryFile(delete=False)
+    pickle.dump(content_history, f)
+    f.close()
+    return f.name
+
+def load_content_history(fname):
+    """ Unpickles the content history stored in fname. The file is
+    removed after it is read """
+    f = open(fname, "r")
+    content_history = pickle.load(f)
+    f.close()
+
+    os.remove(fname)
+    return content_history
+
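+# Example round trip (note that load_content_history deletes the dump
+# file after reading it back):
+#
+#    fname = dump_content_history({"ccnx:/test/bunny.ts": []})
+#    history = load_content_history(fname)
+#    # history == {"ccnx:/test/bunny.ts": []}
+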
+def annotate_cn_node(graph, nid, ips2nid, data, content_history):
+    for (content_name, timestamp, message_type, peer, face_id,
+            size, nonce, line) in data:
+
+        # Ignore control messages for the time being
+        if is_control(content_name):
+            continue
+
+        if message_type == "interest_from" and \
+                peer == "localhost":
+            graph.node[nid]["ccn_consumer"] = True
+        elif message_type == "content_from" and \
+                peer == "localhost":
+            graph.node[nid]["ccn_producer"] = True
+
+        # Ignore local messages for the time being.
+        # They could later be used to calculate the processing times
+        # of messages.
+        if peer == "localhost":
+            continue
+
+        # remove digest
+        if message_type in ["content_from", "content_to"]:
+            content_name = "/".join(content_name.split("/")[:-1])
+
+        if content_name not in content_history:
+            content_history[content_name] = list()
+
+        peernid = ips2nid[peer]
+        graph.add_edge(nid, peernid)
+
+        content_history[content_name].append((timestamp, message_type, nid,
+            peernid, nonce, size, line))
+
+def annotate_cn_graph(logs_dir, graph, parse_ping_logs = False):
+    """ Adds CCN content history for each node in the topology graph.
+
+    """
+
+    # Make a copy of the graph to ensure integrity
+    graph = graph.copy()
+
+    ips2nid = dict()
+
+    for nid in graph.nodes():
+        ips = graph.node[nid]["ips"]
+        for ip in ips:
+            ips2nid[ip] = nid
+
+    found_files = False
+
+    # Now walk through the ccnd logs...
+    for dirpath, dnames, fnames in os.walk(logs_dir):
+        # continue if we are not at the leaf level (if there are subdirectories)
+        if dnames:
+            continue
+
+        # Each dirpath corresponds to a different node
+        nid = os.path.basename(dirpath)
+
+        # Cast to numeric nid if necessary
+        try:
+            if int(nid) in graph.nodes():
+                nid = int(nid)
+        except ValueError:
+            pass
+
+        content_history = dict()
+
+        for fname in fnames:
+            if fname.endswith(".log"):
+                found_files = True
+                filename = os.path.join(dirpath, fname)
+                data = parse_file(filename)
+                annotate_cn_node(graph, nid, ips2nid, data, content_history)
+
+        # Avoid storing everything in memory, instead dump to a file
+        # and reference the file
+        fname = dump_content_history(content_history)
+        graph.node[nid]["history"] = fname
+
+    if not found_files:
+        msg = "No CCND output files were found to parse at %s " % logs_dir
+        raise RuntimeError(msg)
+
+    if parse_ping_logs:
+        ping_parser.annotate_cn_graph(logs_dir, graph)
+
+    return graph
+
+def ccn_producers(graph):
+    """ Returns the nodes that are content providers """
+    return [nid for nid in graph.nodes() \
+            if graph.node[nid].get("ccn_producer")]
+
+def ccn_consumers(graph):
+    """ Returns the nodes that are content consumers """
+    return [nid for nid in graph.nodes() \
+            if graph.node[nid].get("ccn_consumer")]
+
+def process_content_history(graph):
+    """ Computes CCN message counts and aggregates historical content
+    information in the content_names dictionary
+
+    """
+
+    ## Assume a single consumer (the source of the interests)
+    source = ccn_consumers(graph)[0]
+
+    interest_expiry_count = 0
+    interest_dupnonce_count = 0
+    interest_count = 0
+    content_count = 0
+    content_names = dict()
+
+    # Collect information about exchanged messages by content name and
+    # link delay info.
+    for nid in graph.nodes():
+        # Load the data collected from the node's ccnd log
+        fname = graph.node[nid]["history"]
+        history = load_content_history(fname)
+
+        for content_name in history.keys():
+            hist = history[content_name]
+
+            for (timestamp, message_type, nid1, nid2, nonce, size, line) in hist:
+                if message_type in ["content_from", "content_to"]:
+                    # The first Interest sent will not have a version or
+                    # chunk number. The first Content sent back in reply
+                    # will end in /=00 or /%00. Make sure to map the first
+                    # Content to the first Interest.
+                    if content_name.endswith("/=00") or \
+                            content_name.endswith("/%00"):
+                        content_name = "/".join(content_name.split("/")[0:-2])
+
+                # Add content name to dictionary
+                if content_name not in content_names:
+                    content_names[content_name] = dict()
+                    content_names[content_name]["interest"] = dict()
+                    content_names[content_name]["content"] = list()
+
+                # Classify interests by nonce
+                if message_type in ["interest_from"] and \
+                        nonce not in content_names[content_name]["interest"]:
+                    content_names[content_name]["interest"][nonce] = list()
+
+                # Add consumer history
+                if nid == source:
+                    if message_type in ["interest_to", "content_from"]:
+                        # content name history as seen by the source
+                        if "consumer_history" not in content_names[content_name]:
+                            content_names[content_name]["consumer_history"] = list()
+
+                        content_names[content_name]["consumer_history"].append(
+                            (timestamp, message_type))
+
+                # Add messages per content name and accumulate totals by
+                # message type
+                if message_type == "interest_dupnonce":
+                    interest_dupnonce_count += 1
+                elif message_type == "interest_expiry":
+                    interest_expiry_count += 1
+                elif message_type == "interest_from":
+                    interest_count += 1
+                    # Append to interest history of the content name
+                    content_names[content_name]["interest"][nonce].append(
+                        (timestamp, nid2, nid1))
+                elif message_type == "content_from":
+                    content_count += 1
+                    # Append to content history of the content name
+                    content_names[content_name]["content"].append(
+                        (timestamp, nid2, nid1))
+                else:
+                    continue
+            del hist
+        del history
+
+    # Compute the time elapsed between the time an interest is sent
+    # by the consumer node and the time the content is received back
+    for content_name in content_names.keys():
+        # Order content and interest messages by timestamp
+        content_names[content_name]["content"] = sorted(
+            content_names[content_name]["content"])
+
+        for nonce, timestamps in content_names[content_name][
+                "interest"].iteritems():
+            content_names[content_name]["interest"][nonce] = sorted(
+                timestamps)
+
+        # Content names never seen by the consumer have no history
+        history = sorted(content_names[content_name].get(
+            "consumer_history", []))
+        content_names[content_name]["consumer_history"] = history
+
+        # Compute the RTT of the first Interest/Content exchange
+        waiting_content = False
+        interest_timestamp = None
+        content_timestamp = None
+
+        for (timestamp, message_type) in history:
+            if not waiting_content and message_type == "interest_to":
+                waiting_content = True
+                interest_timestamp = timestamp
+                continue
+
+            if waiting_content and message_type == "content_from":
+                content_timestamp = timestamp
+                break
+
+        # If the Interest/Content pair could not be matched, mark the
+        # RTT as invalid
+        rtt = -1
+        if interest_timestamp and content_timestamp:
+            rtt = compute_delay_ms(content_timestamp, interest_timestamp)
+
+        content_names[content_name]["rtt"] = rtt
+        content_names[content_name]["lapse"] = (interest_timestamp, content_timestamp)
+
+    return (graph,
+        content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count)
+
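+# After processing, each content_names entry aggregates the history of
+# one content name. A sketch with hypothetical values:
+#
+#    content_names["ccnx:/test/bunny.ts"] = {
+#        "interest": {"412A74-0844-0008-50AA-F6EAD4":
+#            [("1374181938.808523", "nid2", "nid1")]},
+#        "content": [("1374181938.868682", "nid1", "nid2")],
+#        "consumer_history": [("1374181938.812750", "interest_to"),
+#            ("1374181938.868772", "content_from")],
+#        "rtt": 56.022,    # in ms; -1 if the pair could not be matched
+#        "lapse": ("1374181938.812750", "1374181938.868772"),
+#    }
+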
+def process_content_history_logs(logs_dir, graph, parse_ping_logs = False):
+    """ Parses CCN logs and aggregates content history information in graph.
+    Returns the annotated graph, the message counts and the content name
+    history.
+
+    """
+    ## Process logs and analyse data
+    try:
+        graph = annotate_cn_graph(logs_dir, graph,
+            parse_ping_logs = parse_ping_logs)
+    except:
+        print "Skipping: Error parsing ccnd logs", logs_dir
+        raise
+
+    source = ccn_consumers(graph)[0]
+    target = ccn_producers(graph)[0]
+
+    # Process the data from the ccnd logs, but do not recompute
+    # the link delay.
+    try:
+        (graph,
+        content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count) = process_content_history(graph)
+    except:
+        print "Skipping: Error processing ccn data", logs_dir
+        raise
+
+    return (graph,
+        content_names,
+        interest_expiry_count,
+        interest_dupnonce_count,
+        interest_count,
+        content_count)
+