#!/usr/bin/env python

###############################################################################
#
#    CCNX benchmark
#    Copyright (C) 2014 INRIA
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License version 2 as
#    published by the Free Software Foundation;
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#    Author: Alina Quereilhac
#
###############################################################################
#
# This library contains functions to parse CCNx ccnd logs.
#
# Results from experiments must be stored in a directory named with the
# experiment run id.
# ccnd logs are stored in .log files in a subdirectory per node.
# The following diagram exemplifies the experiment result directory
# structure (nid-i is the unique identifier assigned to node i):
#
#    run_id
#        \ nid1
#            \ nid2.log
#        \ nid2
#            \ nid1.log
#        \ nid3
#            \ nid3.log
#

from __future__ import print_function

import collections
import functools
import networkx
import os
import pickle
import tempfile

from nepi.util.timefuncs import compute_delay_ms
from nepi.util.statfuncs import compute_mean
import nepi.data.processing.ping.parser as ping_parser

def is_control(content_name):
    return content_name.startswith("ccnx:/%C1") or \
            content_name.startswith("ccnx:/ccnx") or \
            content_name.startswith("ccnx:/...")

def parse_file(filename):
    """ Parses message information from a ccnd log file.

        filename: path to the ccnd log file

    """
    faces = dict()
    sep = " "
    data = []

    with open(filename, "r") as f:
        for line in f:
            cols = line.strip().split(sep)

            # CCN_PEEK
            # MESSAGE interest_from
            # 1374181938.808523 ccnd[9245]: debug.4352 interest_from 6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7)
            #
            # MESSAGE interest_to
            # 1374181938.812750 ccnd[9245]: debug.3502 interest_to 5 ccnx:/test/bunny.ts (39 bytes,i=2844,sim=0CDCC1D7)
            #
            # MESSAGE content_from
            # 1374181938.868682 ccnd[9245]: debug.4643 content_from 5 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
            #
            # MESSAGE content_to
            # 1374181938.868772 ccnd[9245]: debug.1619 content_to 6 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
            #
            # MESSAGE interest_expiry
            # 1375596708.222304 ccnd[9758]: debug.3692 interest_expiry ccnx:/test/bunny.ts/%FD%05%1E%86%B1GS/%00%0A%F7 (44 bytes,c=0:1,i=2819,sim=49FA8048)

            # External face creation
            # 1374181452.965961 ccnd[9245]: accepted datagram client id=5 (flags=0x40012) 204.85.191.10 port 9695
            if line.find("accepted datagram client") > -1:
                face_id = (cols[5]).replace("id=", '')
                ip = cols[7]
                port = cols[9]
                faces[face_id] = (ip, port)
                continue

            # Face removal
            # 1374181452.985296 ccnd[9245]: releasing face id 4 (slot 4)
            if line.find("releasing face id") > -1:
                face_id = cols[5]
                if face_id in faces:
                    del faces[face_id]
                continue

            if len(cols) < 6:
                continue

            timestamp = cols[0]
            message_type = cols[3]

            if message_type not in ["interest_from", "interest_to",
                    "content_from", "content_to", "interest_dupnonce",
                    "interest_expiry"]:
                continue

            face_id = cols[4]
            content_name = cols[5]

            # Interest nonce? e.g. 412A74-0844-0008-50AA-F6EAD4
            nonce = ""
            if message_type in ["interest_from", "interest_to",
                    "interest_dupnonce"]:
                last = cols[-1]
                if len(last.split("-")) == 5:
                    nonce = last

            try:
                size = int((cols[6]).replace('(', ''))
            except ValueError:
                # interest_expiry lines carry no face id, so the columns
                # shift and the size cast fails.
                print("interest_expiry without face id!", line)
                continue

            # If no external IP address was identified for this face,
            # assume it is a local face.
            peer = "localhost"

            if face_id in faces:
                peer, port = faces[face_id]

            data.append((content_name, timestamp, message_type,
                peer, face_id, size, nonce, line))

    return data
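
# Illustrative sketch, not part of the original module: shows the tuple
# layout parse_file() yields for one of the sample "interest_from" lines
# documented above. The helper name _example_parse_file is hypothetical.
def _example_parse_file():
    sample = ("1374181938.808523 ccnd[9245]: debug.4352 interest_from "
            "6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7)\n")

    f = tempfile.NamedTemporaryFile(mode = "w", suffix = ".log",
            delete = False)
    f.write(sample)
    f.close()

    data = parse_file(f.name)
    os.remove(f.name)

    # Each entry is:
    # (content_name, timestamp, message_type, peer, face_id, size, nonce, line)
    # The face was never announced with "accepted datagram client", so the
    # peer defaults to "localhost", and this interest carries no nonce.
    assert data[0][:7] == ("ccnx:/test/bunny.ts", "1374181938.808523",
            "interest_from", "localhost", "6", 23, "")

    return data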

def dump_content_history(content_history):
    # Pickle the history into a temporary file and hand back its name,
    # so callers can keep a reference instead of the data itself.
    f = tempfile.NamedTemporaryFile(delete = False)
    pickle.dump(content_history, f)
    f.close()
    return f.name

def load_content_history(fname):
    # Pickle files are binary; open in "rb" mode to match what
    # dump_content_history() wrote.
    with open(fname, "rb") as f:
        content_history = pickle.load(f)

    os.remove(fname)
    return content_history
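
# Illustrative sketch, not part of the original module: the pair above is a
# plain pickle-to-tempfile round trip, used so per-node histories do not
# stay in memory. The helper name _example_history_roundtrip and the sample
# entry are hypothetical.
def _example_history_roundtrip():
    # A content history maps a content name to the list of
    # (timestamp, message_type, nid, peernid, nonce, size, line) tuples
    # gathered by annotate_cn_node() below.
    content_history = {
        "ccnx:/test/bunny.ts": [
            ("1374181938.808523", "interest_from", 1, 2, "", 23, ""),
        ],
    }

    fname = dump_content_history(content_history)
    # load_content_history() removes the temporary file after reading it
    assert load_content_history(fname) == content_history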

def annotate_cn_node(graph, nid, ips2nid, data, content_history):
    for (content_name, timestamp, message_type, peer, face_id,
            size, nonce, line) in data:

        # Ignore control messages for the time being
        if is_control(content_name):
            continue

        if message_type == "interest_from" and \
                peer == "localhost":
            graph.node[nid]["ccn_consumer"] = True
        elif message_type == "content_from" and \
                peer == "localhost":
            graph.node[nid]["ccn_producer"] = True

        # Ignore local messages for the time being.
        # They could later be used to calculate the processing times
        # of messages.
        if peer == "localhost":
            continue

        # Remove the digest (the last name component of content_from /
        # content_to messages) so Interests and Contents share the same key
        if message_type in ["content_from", "content_to"]:
            content_name = "/".join(content_name.split("/")[:-1])

        if content_name not in content_history:
            content_history[content_name] = list()

        peernid = ips2nid[peer]
        graph.add_edge(nid, peernid)

        content_history[content_name].append((timestamp, message_type,
            nid, peernid, nonce, size, line))

def annotate_cn_graph(logs_dir, graph, parse_ping_logs = False):
    """ Adds CCN content history for each node in the topology graph.

    """
    # Make a copy of the graph to ensure integrity
    graph = graph.copy()

    ips2nid = dict()

    for nid in graph.nodes():
        ips = graph.node[nid]["ips"]
        for ip in ips:
            ips2nid[ip] = nid

    found_files = False

    # Now walk through the ccnd logs...
    for dirpath, dnames, fnames in os.walk(logs_dir):
        # Skip directories that are not leaves (i.e. that still contain
        # subdirectories); the per-node logs sit at the leaf level
        if dnames:
            continue

        # Each dirpath corresponds to a different node
        nid = os.path.basename(dirpath)

        # Cast to a numeric nid if necessary; guard the cast so
        # non-numeric directory names do not raise ValueError
        if nid.isdigit() and int(nid) in graph.nodes():
            nid = int(nid)

        content_history = dict()

        for fname in fnames:
            if fname.endswith(".log"):
                found_files = True
                filename = os.path.join(dirpath, fname)

                data = parse_file(filename)
                annotate_cn_node(graph, nid, ips2nid, data,
                        content_history)

        # Avoid storing everything in memory: dump the history to a file
        # and keep only a reference to the file in the graph
        fname = dump_content_history(content_history)
        graph.node[nid]["history"] = fname

    if not found_files:
        msg = "No CCND output files were found to parse at %s " % logs_dir
        raise RuntimeError(msg)

    if parse_ping_logs:
        ping_parser.annotate_cn_graph(logs_dir, graph)

    return graph

def ccn_producers(graph):
    """ Returns the nodes that are content providers

    """
    return [nid for nid in graph.nodes() \
            if graph.node[nid].get("ccn_producer")]

def ccn_consumers(graph):
    """ Returns the nodes that are content consumers

    """
    return [nid for nid in graph.nodes() \
            if graph.node[nid].get("ccn_consumer")]
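
# Illustrative sketch, not part of the original module: builds the run_id
# directory layout described in the module header for a two-node topology
# and annotates the graph. The node ids, IP addresses and helper name are
# hypothetical; like the rest of the module, this relies on the
# networkx 1.x graph.node attribute API.
def _example_annotate_cn_graph():
    run_dir = tempfile.mkdtemp()
    node_dir = os.path.join(run_dir, "1")
    os.makedirs(node_dir)

    # One ccnd log, named after the peer node, inside node 1's directory
    with open(os.path.join(node_dir, "2.log"), "w") as f:
        f.write("1374181938.808523 ccnd[9245]: debug.4352 interest_from "
                "6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7)\n")

    graph = networkx.Graph()
    graph.add_node(1, ips = ["10.0.0.1"])
    graph.add_node(2, ips = ["10.0.0.2"])

    # Node 1 issued a local interest, so it is flagged as a consumer
    graph = annotate_cn_graph(run_dir, graph)
    assert ccn_consumers(graph) == [1]

    # Note: the temporary run_dir and the pickled history file referenced
    # by graph.node[1]["history"] are left behind for inspection.
    return graph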

def process_content_history(graph):
    """ Computes CCN message counts and aggregates content historical
        information in the content_names dictionary

    """
    ## Assume a single consumer (the node that sources the Interests)
    source = ccn_consumers(graph)[0]
    interest_expiry_count = 0
    interest_dupnonce_count = 0
    interest_count = 0
    content_count = 0
    content_names = dict()

    # Collect information about exchanged messages by content name and
    # link delay info.
    for nid in graph.nodes():
        # Load the data collected from the node's ccnd log
        fname = graph.node[nid]["history"]
        history = load_content_history(fname)

        for content_name in history.keys():
            hist = history[content_name]

            for (timestamp, message_type, nid1, nid2,
                    nonce, size, line) in hist:
                if message_type in ["content_from", "content_to"]:
                    # The first Interest sent will not have a version or
                    # chunk number. The first Content sent back in reply
                    # will end in /=00 or /%00. Make sure to map the first
                    # Content to the first Interest.
                    if content_name.endswith("/=00"):
                        content_name = "/".join(
                                content_name.split("/")[0:-2])

                # Add content name to dictionary
                if content_name not in content_names:
                    content_names[content_name] = dict()
                    content_names[content_name]["interest"] = dict()
                    content_names[content_name]["content"] = list()

                # Classify interests by replica
                if message_type in ["interest_from"] and \
                        nonce not in content_names[content_name]["interest"]:
                    content_names[content_name]["interest"][nonce] = list()

                # Add consumer history
                if nid == source:
                    if message_type in ["interest_to", "content_from"]:
                        # content name history as seen by the source
                        if "consumer_history" not in \
                                content_names[content_name]:
                            content_names[content_name][
                                    "consumer_history"] = list()

                        content_names[content_name]["consumer_history"].append(
                            (timestamp, message_type))

                # Add messages per content name and accumulate totals
                # by message type
                if message_type == "interest_dupnonce":
                    interest_dupnonce_count += 1
                elif message_type == "interest_expiry":
                    interest_expiry_count += 1
                elif message_type == "interest_from":
                    interest_count += 1
                    # Append to the interest history of the content name
                    content_names[content_name]["interest"][nonce].append(
                        (timestamp, nid2, nid1))
                elif message_type == "content_from":
                    content_count += 1
                    # Append to the content history of the content name
                    content_names[content_name]["content"].append(
                        (timestamp, nid2, nid1))
                else:
                    continue
            del hist
        del history

    # Compute the time elapsed between the moment an interest is sent
    # at the consumer node and the moment the content is received back
    for content_name in content_names.keys():
        # Order content and interest messages by timestamp
        content_names[content_name]["content"] = sorted(
            content_names[content_name]["content"])

        for nonce, timestamps in content_names[content_name][
                "interest"].items():
            content_names[content_name]["interest"][nonce] = sorted(
                timestamps)

        # Content names seen only at intermediate nodes have no
        # consumer history; default to an empty one
        history = sorted(content_names[content_name].get(
                "consumer_history", []))
        content_names[content_name]["consumer_history"] = history

        # Compute the rtt of the message: pair the first interest_to
        # with the first content_from that follows it
        waiting_content = False
        interest_timestamp = None
        content_timestamp = None

        for (timestamp, message_type) in history:
            if not waiting_content and message_type == "interest_to":
                waiting_content = True
                interest_timestamp = timestamp
                continue

            if waiting_content and message_type == "content_from":
                content_timestamp = timestamp
                break

        # If the interest/content pair could not be matched, mark the
        # rtt as invalid
        rtt = -1
        if interest_timestamp and content_timestamp:
            rtt = compute_delay_ms(content_timestamp, interest_timestamp)

        content_names[content_name]["rtt"] = rtt
        content_names[content_name]["lapse"] = (interest_timestamp,
                content_timestamp)

    return (graph,
        content_names,
        interest_expiry_count,
        interest_dupnonce_count,
        interest_count,
        content_count)
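
# Illustrative sketch, not part of the original module: replays the
# interest_to/content_from pairing process_content_history() uses to
# derive one rtt sample, calling compute_delay_ms() exactly as the loop
# above does. The timestamps and the helper name are hypothetical.
def _example_rtt_from_history():
    # Consumer history for one content name, ordered by timestamp
    history = [
        ("1374181938.808523", "interest_to"),
        ("1374181938.868682", "content_from"),
    ]

    waiting_content = False
    interest_timestamp = None
    content_timestamp = None

    for (timestamp, message_type) in history:
        if not waiting_content and message_type == "interest_to":
            waiting_content = True
            interest_timestamp = timestamp
            continue

        if waiting_content and message_type == "content_from":
            content_timestamp = timestamp
            break

    # Roughly 60 ms for the sample timestamps above
    return compute_delay_ms(content_timestamp, interest_timestamp)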

def process_content_history_logs(logs_dir, graph, parse_ping_logs = False):
    """ Parses CCN logs and aggregates content history information
        in the graph.
        Returns the annotated graph, the message counts and the content
        name history.

    """
    ## Process logs and analyse data
    try:
        graph = annotate_cn_graph(logs_dir, graph,
                parse_ping_logs = parse_ping_logs)
    except:
        print("Skipping: Error parsing ccnd logs", logs_dir)
        raise

    # Sanity check: there must be at least one consumer and one producer
    source = ccn_consumers(graph)[0]
    target = ccn_producers(graph)[0]

    # Process the data from the ccnd logs, but do not re-compute
    # the link delay.
    try:
        (graph,
            content_names,
            interest_expiry_count,
            interest_dupnonce_count,
            interest_count,
            content_count) = process_content_history(graph)
    except:
        print("Skipping: Error processing ccn data", logs_dir)
        raise

    return (graph,
        content_names,
        interest_expiry_count,
        interest_dupnonce_count,
        interest_count,
        content_count)
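
# Illustrative usage sketch, not part of the original module: drives the
# entry point above for one experiment run. The helper name and node
# attributes are hypothetical; logs_dir must follow the run_id directory
# layout described in the module header, and every graph node must carry
# the "ips" attribute annotate_cn_graph() expects.
def _example_process_run(logs_dir):
    graph = networkx.Graph()
    graph.add_node(1, ips = ["10.0.0.1"])
    graph.add_node(2, ips = ["10.0.0.2"])

    (graph,
        content_names,
        interest_expiry_count,
        interest_dupnonce_count,
        interest_count,
        content_count) = process_content_history_logs(logs_dir, graph)

    for content_name in sorted(content_names.keys()):
        print(content_name, content_names[content_name]["rtt"])

    print("interests:", interest_count, "contents:", content_count,
            "expired:", interest_expiry_count,
            "dupnonce:", interest_dupnonce_count)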