Bug fixing the netgraph persistence
[nepi.git] / src / nepi / data / processing / ccn / parser.py
1 #!/usr/bin/env python
2
3 ###############################################################################
4 #
5 #    CCNX benchmark
6 #    Copyright (C) 2014 INRIA
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU General Public License as published by
10 #    the Free Software Foundation, either version 3 of the License, or
11 #    (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU General Public License for more details.
17 #
18 #    You should have received a copy of the GNU General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 #
22 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
23 #
24 ###############################################################################
25
26 #
27 # This library contains functions to parse (CCNx) ccnd logs.
28 #
29 # Results from experiments must be stored in a directory
30 # named with the experiment run id.
31 # ccnd logs are stored in .log files in a subdirectory per node.
32 # The following diagram exemplifies the experiment result directory
33 # structure (nidi is the unique identifier assigned to node i):
34 #
35 #    run_id
36 #               \   nid1
37 #                        \ nid2.log
38 #               \   nid2
39 #                        \ nid1.log
40 #               \   nid3
41 #                        \ nid3.log
42 #
43
44 import collections
45 import functools
46 import networkx
47 import os
48 import pickle
49 import tempfile
50
51 from nepi.util.timefuncs import compute_delay_ms
52 from nepi.util.statfuncs import compute_mean
53 import nepi.data.processing.ping.parser as ping_parser
54
55 def is_control(content_name):
56     return content_name.startswith("ccnx:/%C1") or \
57             content_name.startswith("ccnx:/ccnx") or \
58             content_name.startswith("ccnx:/...")
59
60
61 def parse_file(filename):
62     """ Parses message information from ccnd log files
63
64         filename: path to ccndlog file
65
66     """
67
68     faces = dict()
69     sep = " "
70
71     f = open(filename, "r")
72
73     data = []
74
75     for line in f:
76         cols =  line.strip().split(sep)
77
78         # CCN_PEEK
79         # MESSAGE interest_from
80         # 1374181938.808523 ccnd[9245]: debug.4352 interest_from 6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7)
81         #
82         # MESSAGE interest_to
83         # 1374181938.812750 ccnd[9245]: debug.3502 interest_to 5 ccnx:/test/bunny.ts (39 bytes,i=2844,sim=0CDCC1D7)
84         #
85         # MESSAGE CONTENT FROM
86         # 1374181938.868682 ccnd[9245]: debug.4643 content_from 5 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
87         #
88         # MESSAGE CONTENT_TO
89         # 1374181938.868772 ccnd[9245]: debug.1619 content_to 6 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
90         #
91         # 1375596708.222304 ccnd[9758]: debug.3692 interest_expiry ccnx:/test/bunny.ts/%FD%05%1E%86%B1GS/%00%0A%F7 (44 bytes,c=0:1,i=2819,sim=49FA8048)
92
93         # External face creation
94         # 1374181452.965961 ccnd[9245]: accepted datagram client id=5 (flags=0x40012) 204.85.191.10 port 9695
95
96         if line.find("accepted datagram client") > -1:
97             face_id = (cols[5]).replace("id=",'')
98             ip = cols[7] 
99             port = cols[9]
100             faces[face_id] = (ip, port)
101             continue
102
103         # 1374181452.985296 ccnd[9245]: releasing face id 4 (slot 4)
104         if line.find("releasing face id") > -1:
105             face_id = cols[5]
106             if face_id in faces:
107                 del faces[face_id]
108             continue
109
110         if len(cols) < 6:
111             continue
112
113         timestamp = cols[0]
114         message_type = cols[3]
115
116         if message_type not in ["interest_from", "interest_to", "content_from", 
117                 "content_to", "interest_dupnonce", "interest_expiry"]:
118             continue
119
120         face_id = cols[4] 
121         content_name = cols[5]
122
123         # Interest Nonce ? -> 412A74-0844-0008-50AA-F6EAD4
124         nonce = ""
125         if message_type in ["interest_from", "interest_to", "interest_dupnonce"]:
126             last = cols[-1]
127             if len(last.split("-")) == 5:
128                 nonce = last
129
130         try:
131             size = int((cols[6]).replace('(',''))
132         except:
133             print "interest_expiry without face id!", line
134             continue
135
136         # If no external IP address was identified for this face
137         # asume it is a local face
138         peer = "localhost"
139
140         if face_id in faces:
141             peer, port = faces[face_id]
142
143         data.append((content_name, timestamp, message_type, peer, face_id, 
144             size, nonce, line))
145
146     f.close()
147
148     return data
149
150 def dump_content_history(content_history):
151     f = tempfile.NamedTemporaryFile(delete=False)
152     pickle.dump(content_history, f)
153     f.close()
154     return f.name
155
156 def load_content_history(fname):
157     f = open(fname, "r")
158     content_history = pickle.load(f)
159     f.close()
160
161     os.remove(fname)
162     return content_history
163
164 def annotate_cn_node(graph, nid, ips2nid, data, content_history):
165     for (content_name, timestamp, message_type, peer, face_id, 
166             size, nonce, line) in data:
167
168         # Ignore control messages for the time being
169         if is_control(content_name):
170             continue
171
172         if message_type == "interest_from" and \
173                 peer == "localhost":
174             graph.node[nid]["ccn_consumer"] = True
175         elif message_type == "content_from" and \
176                 peer == "localhost":
177             graph.node[nid]["ccn_producer"] = True
178
179         # Ignore local messages for the time being. 
180         # They could later be used to calculate the processing times
181         # of messages.
182         if peer == "localhost":
183             continue
184
185         # remove digest
186         if message_type in ["content_from", "content_to"]:
187             content_name = "/".join(content_name.split("/")[:-1])
188            
189         if content_name not in content_history:
190             content_history[content_name] = list()
191       
192         peernid = ips2nid[peer]
193         graph.add_edge(nid, peernid)
194
195         content_history[content_name].append((timestamp, message_type, nid, 
196             peernid, nonce, size, line))
197
198 def annotate_cn_graph(logs_dir, graph, parse_ping_logs = False):
199     """ Adds CCN content history for each node in the topology graph.
200
201     """
202     
203     # Make a copy of the graph to ensure integrity
204     graph = graph.copy()
205
206     ips2nid = dict()
207
208     for nid in graph.nodes():
209         ips = graph.node[nid]["ips"]
210         for ip in ips:
211             ips2nid[ip] = nid
212
213     found_files = False
214
215     # Now walk through the ccnd logs...
216     for dirpath, dnames, fnames in os.walk(logs_dir):
217         # continue if we are not at the leaf level (if there are subdirectories)
218         if dnames: 
219             continue
220         
221         # Each dirpath correspond to a different node
222         nid = os.path.basename(dirpath)
223
224         # Cast to numeric nid if necessary
225         if int(nid) in graph.nodes():
226             nid = int(nid)
227     
228         content_history = dict()
229
230         for fname in fnames:
231             if fname.endswith(".log"):
232                 found_files = True
233                 filename = os.path.join(dirpath, fname)
234                 data = parse_file(filename)
235                 annotate_cn_node(graph, nid, ips2nid, data, content_history)
236
237         # Avoid storing everything in memory, instead dump to a file
238         # and reference the file
239         fname = dump_content_history(content_history)
240         graph.node[nid]["history"] = fname
241
242     if not found_files:
243         msg = "No CCND output files were found to parse at %s " % logs_dir
244         raise RuntimeError, msg
245
246     if parse_ping_logs:
247         ping_parser.annotate_cn_graph(logs_dir, graph)
248
249     return graph
250
251 def ccn_producers(graph):
252     """ Returns the nodes that are content providers """
253     return [nid for nid in graph.nodes() \
254             if graph.node[nid].get("ccn_producer")]
255
256 def ccn_consumers(graph):
257     """ Returns the nodes that are content consumers """
258     return [nid for nid in graph.nodes() \
259             if graph.node[nid].get("ccn_consumer")]
260
261 def process_content_history(graph):
262     """ Compute CCN message counts and aggregates content historical 
263     information in the content_names dictionary 
264     
265     """
266
267     ## Assume single source
268     source = ccn_consumers(graph)[0]
269
270     interest_expiry_count = 0
271     interest_dupnonce_count = 0
272     interest_count = 0
273     content_count = 0
274     content_names = dict()
275
276     # Collect information about exchanged messages by content name and
277     # link delay info.
278     for nid in graph.nodes():
279         # Load the data collected from the node's ccnd log
280         fname = graph.node[nid]["history"]
281         history = load_content_history(fname)
282
283         for content_name in history.keys():
284             hist = history[content_name]
285
286             for (timestamp, message_type, nid1, nid2, nonce, size, line) in hist:
287                 if message_type in ["content_from", "content_to"]:
288                     # The first Interest sent will not have a version or chunk number.
289                     # The first Content sent back in reply, will end in /=00 or /%00.
290                     # Make sure to map the first Content to the first Interest.
291                     if content_name.endswith("/=00"):
292                         content_name = "/".join(content_name.split("/")[0:-2])
293
294                 # Add content name to dictionary
295                 if content_name not in content_names:
296                     content_names[content_name] = dict()
297                     content_names[content_name]["interest"] = dict()
298                     content_names[content_name]["content"] = list()
299
300                 # Classify interests by replica
301                 if message_type in ["interest_from"] and \
302                         nonce not in content_names[content_name]["interest"]:
303                     content_names[content_name]["interest"][nonce] = list()
304      
305                 # Add consumer history
306                 if nid == source:
307                     if message_type in ["interest_to", "content_from"]:
308                         # content name history as seen by the source
309                         if "consumer_history" not in content_names[content_name]:
310                             content_names[content_name]["consumer_history"] = list()
311
312                         content_names[content_name]["consumer_history"].append(
313                                 (timestamp, message_type)) 
314
315                 # Add messages per content name and cumulate totals by message type
316                 if message_type == "interest_dupnonce":
317                     interest_dupnonce_count += 1
318                 elif message_type == "interest_expiry":
319                     interest_expiry_count += 1
320                 elif message_type == "interest_from":
321                     interest_count += 1
322                     # Append to interest history of the content name
323                     content_names[content_name]["interest"][nonce].append(
324                             (timestamp, nid2, nid1))
325                 elif message_type == "content_from":
326                     content_count += 1
327                     # Append to content history of the content name
328                     content_names[content_name]["content"].append((timestamp, nid2, nid1))
329                 else:
330                     continue
331             del hist
332         del history
333
334     # Compute the time elapsed between the time an interest is sent
335     # in the consumer node and when the content is received back
336     for content_name in content_names.keys():
337         # order content and interest messages by timestamp
338         content_names[content_name]["content"] = sorted(
339               content_names[content_name]["content"])
340         
341         for nonce, timestamps in content_names[content_name][
342                     "interest"].iteritems():
343               content_names[content_name]["interest"][nonce] = sorted(
344                         timestamps)
345       
346         history = sorted(content_names[content_name]["consumer_history"])
347         content_names[content_name]["consumer_history"] = history
348
349         # compute the rtt time of the message
350         rtt = None
351         waiting_content = False 
352         interest_timestamp = None
353         content_timestamp = None
354         
355         for (timestamp, message_type) in history:
356             if not waiting_content and message_type == "interest_to":
357                 waiting_content = True
358                 interest_timestamp = timestamp
359                 continue
360
361             if waiting_content and message_type == "content_from":
362                 content_timestamp = timestamp
363                 break
364     
365         # If we can't determine who sent the interest, discard it
366         rtt = -1
367         if interest_timestamp and content_timestamp:
368             rtt = compute_delay_ms(content_timestamp, interest_timestamp)
369
370         content_names[content_name]["rtt"] = rtt
371         content_names[content_name]["lapse"] = (interest_timestamp, content_timestamp)
372
373     return (graph,
374         content_names,
375         interest_expiry_count,
376         interest_dupnonce_count,
377         interest_count,
378         content_count)
379
380 def process_content_history_logs(logs_dir, graph, parse_ping_logs = False):
381     """ Parse CCN logs and aggregate content history information in graph.
382     Returns annotated graph and message countn and content names history.
383
384     """
385     ## Process logs and analyse data
386     try:
387         graph = annotate_cn_graph(logs_dir, graph, 
388                 parse_ping_logs = parse_ping_logs)
389     except:
390         print "Skipping: Error parsing ccnd logs", logs_dir
391         raise
392
393     source = ccn_consumers(graph)[0]
394     target = ccn_producers(graph)[0]
395
396     # Process the data from the ccnd logs, but do not re compute
397     # the link delay. 
398     try:
399         (graph,
400         content_names,
401         interest_expiry_count,
402         interest_dupnonce_count,
403         interest_count,
404         content_count) = process_content_history(graph)
405     except:
406         print "Skipping: Error processing ccn data", logs_dir
407         raise
408
409     return (graph,
410             content_names,
411             interest_expiry_count,
412             interest_dupnonce_count,
413             interest_count,
414             content_count)