applied the except and raise fixers to the master branch to close the gap with py3
[nepi.git] / src / nepi / data / processing / ccn / parser.py
1 #!/usr/bin/env python
2
3 ###############################################################################
4 #
5 #    CCNX benchmark
6 #    Copyright (C) 2014 INRIA
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU General Public License version 2 as
10 #    published by the Free Software Foundation;
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU General Public License for more details.
16 #
17 #    You should have received a copy of the GNU General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 #
21 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
22 #
23 ###############################################################################
24
25 #
26 # This library contains functions to parse (CCNx) ccnd logs.
27 #
28 # Results from experiments must be stored in a directory
29 # named with the experiment run id.
30 # ccnd logs are stored in .log files in a subdirectory per node.
31 # The following diagram exemplifies the experiment result directory
32 # structure (nidi is the unique identifier assigned to node i):
33 #
34 #    run_id
35 #               \   nid1
36 #                        \ nid2.log
37 #               \   nid2
38 #                        \ nid1.log
39 #               \   nid3
40 #                        \ nid3.log
41 #
42
43 from __future__ import print_function
44
45 import collections
46 import functools
47 import networkx
48 import os
49 import pickle
50 import tempfile
51
52 from nepi.util.timefuncs import compute_delay_ms
53 from nepi.util.statfuncs import compute_mean
54 import nepi.data.processing.ping.parser as ping_parser
55
56 def is_control(content_name):
57     return content_name.startswith("ccnx:/%C1") or \
58             content_name.startswith("ccnx:/ccnx") or \
59             content_name.startswith("ccnx:/...")
60
61
62 def parse_file(filename):
63     """ Parses message information from ccnd log files
64
65         filename: path to ccndlog file
66
67     """
68
69     faces = dict()
70     sep = " "
71
72     with open(filename, "r") as f:
73
74         data = []
75
76         for line in f:
77             cols =  line.strip().split(sep)
78
79             # CCN_PEEK
80             # MESSAGE interest_from
81             # 1374181938.808523 ccnd[9245]: debug.4352 interest_from 6 ccnx:/test/bunny.ts (23 bytes,sim=0CDCC1D7)
82             #
83             # MESSAGE interest_to
84             # 1374181938.812750 ccnd[9245]: debug.3502 interest_to 5 ccnx:/test/bunny.ts (39 bytes,i=2844,sim=0CDCC1D7)
85             #
86             # MESSAGE CONTENT FROM
87             # 1374181938.868682 ccnd[9245]: debug.4643 content_from 5 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
88             #
89             # MESSAGE CONTENT_TO
90             # 1374181938.868772 ccnd[9245]: debug.1619 content_to 6 ccnx:/test/bunny.ts/%FD%05%1E%85%8FVw/%00/%9E%3D%01%D9%3Cn%95%2BvZ%8
91             #
92             # 1375596708.222304 ccnd[9758]: debug.3692 interest_expiry ccnx:/test/bunny.ts/%FD%05%1E%86%B1GS/%00%0A%F7 (44 bytes,c=0:1,i=2819,sim=49FA8048)
93
94             # External face creation
95             # 1374181452.965961 ccnd[9245]: accepted datagram client id=5 (flags=0x40012) 204.85.191.10 port 9695
96
97             if line.find("accepted datagram client") > -1:
98                 face_id = (cols[5]).replace("id=",'')
99                 ip = cols[7] 
100                 port = cols[9]
101                 faces[face_id] = (ip, port)
102                 continue
103
104             # 1374181452.985296 ccnd[9245]: releasing face id 4 (slot 4)
105             if line.find("releasing face id") > -1:
106                 face_id = cols[5]
107                 if face_id in faces:
108                     del faces[face_id]
109                 continue
110
111             if len(cols) < 6:
112                 continue
113
114             timestamp = cols[0]
115             message_type = cols[3]
116
117             if message_type not in ["interest_from", "interest_to", "content_from", 
118                     "content_to", "interest_dupnonce", "interest_expiry"]:
119                 continue
120
121             face_id = cols[4] 
122             content_name = cols[5]
123
124             # Interest Nonce ? -> 412A74-0844-0008-50AA-F6EAD4
125             nonce = ""
126             if message_type in ["interest_from", "interest_to", "interest_dupnonce"]:
127                 last = cols[-1]
128                 if len(last.split("-")) == 5:
129                     nonce = last
130
131             try:
132                 size = int((cols[6]).replace('(',''))
133             except:
134                 print("interest_expiry without face id!", line)
135                 continue
136
137             # If no external IP address was identified for this face
138             # asume it is a local face
139             peer = "localhost"
140     
141             if face_id in faces:
142                 peer, port = faces[face_id]
143     
144             data.append((content_name, timestamp, message_type, peer, face_id, 
145                 size, nonce, line))
146
147     return data
148
149 def dump_content_history(content_history):
150     f = tempfile.NamedTemporaryFile(delete=False)
151     pickle.dump(content_history, f)
152     f.close()
153     return f.name
154
155 def load_content_history(fname):
156     with open(fname, "r") as f:
157         content_history = pickle.load(f)
158
159     os.remove(fname)
160     return content_history
161
162 def annotate_cn_node(graph, nid, ips2nid, data, content_history):
163     for (content_name, timestamp, message_type, peer, face_id, 
164             size, nonce, line) in data:
165
166         # Ignore control messages for the time being
167         if is_control(content_name):
168             continue
169
170         if message_type == "interest_from" and \
171                 peer == "localhost":
172             graph.node[nid]["ccn_consumer"] = True
173         elif message_type == "content_from" and \
174                 peer == "localhost":
175             graph.node[nid]["ccn_producer"] = True
176
177         # Ignore local messages for the time being. 
178         # They could later be used to calculate the processing times
179         # of messages.
180         if peer == "localhost":
181             continue
182
183         # remove digest
184         if message_type in ["content_from", "content_to"]:
185             content_name = "/".join(content_name.split("/")[:-1])
186            
187         if content_name not in content_history:
188             content_history[content_name] = list()
189       
190         peernid = ips2nid[peer]
191         graph.add_edge(nid, peernid)
192
193         content_history[content_name].append((timestamp, message_type, nid, 
194             peernid, nonce, size, line))
195
196 def annotate_cn_graph(logs_dir, graph, parse_ping_logs = False):
197     """ Adds CCN content history for each node in the topology graph.
198
199     """
200     
201     # Make a copy of the graph to ensure integrity
202     graph = graph.copy()
203
204     ips2nid = dict()
205
206     for nid in graph.nodes():
207         ips = graph.node[nid]["ips"]
208         for ip in ips:
209             ips2nid[ip] = nid
210
211     found_files = False
212
213     # Now walk through the ccnd logs...
214     for dirpath, dnames, fnames in os.walk(logs_dir):
215         # continue if we are not at the leaf level (if there are subdirectories)
216         if dnames: 
217             continue
218         
219         # Each dirpath correspond to a different node
220         nid = os.path.basename(dirpath)
221
222         # Cast to numeric nid if necessary
223         if int(nid) in graph.nodes():
224             nid = int(nid)
225     
226         content_history = dict()
227
228         for fname in fnames:
229             if fname.endswith(".log"):
230                 found_files = True
231                 filename = os.path.join(dirpath, fname)
232                 data = parse_file(filename)
233                 annotate_cn_node(graph, nid, ips2nid, data, content_history)
234
235         # Avoid storing everything in memory, instead dump to a file
236         # and reference the file
237         fname = dump_content_history(content_history)
238         graph.node[nid]["history"] = fname
239
240     if not found_files:
241         msg = "No CCND output files were found to parse at %s " % logs_dir
242         raise RuntimeError(msg)
243
244     if parse_ping_logs:
245         ping_parser.annotate_cn_graph(logs_dir, graph)
246
247     return graph
248
249 def ccn_producers(graph):
250     """ Returns the nodes that are content providers """
251     return [nid for nid in graph.nodes() \
252             if graph.node[nid].get("ccn_producer")]
253
254 def ccn_consumers(graph):
255     """ Returns the nodes that are content consumers """
256     return [nid for nid in graph.nodes() \
257             if graph.node[nid].get("ccn_consumer")]
258
259 def process_content_history(graph):
260     """ Compute CCN message counts and aggregates content historical 
261     information in the content_names dictionary 
262     
263     """
264
265     ## Assume single source
266     source = ccn_consumers(graph)[0]
267
268     interest_expiry_count = 0
269     interest_dupnonce_count = 0
270     interest_count = 0
271     content_count = 0
272     content_names = dict()
273
274     # Collect information about exchanged messages by content name and
275     # link delay info.
276     for nid in graph.nodes():
277         # Load the data collected from the node's ccnd log
278         fname = graph.node[nid]["history"]
279         history = load_content_history(fname)
280
281         for content_name in history.keys():
282             hist = history[content_name]
283
284             for (timestamp, message_type, nid1, nid2, nonce, size, line) in hist:
285                 if message_type in ["content_from", "content_to"]:
286                     # The first Interest sent will not have a version or chunk number.
287                     # The first Content sent back in reply, will end in /=00 or /%00.
288                     # Make sure to map the first Content to the first Interest.
289                     if content_name.endswith("/=00"):
290                         content_name = "/".join(content_name.split("/")[0:-2])
291
292                 # Add content name to dictionary
293                 if content_name not in content_names:
294                     content_names[content_name] = dict()
295                     content_names[content_name]["interest"] = dict()
296                     content_names[content_name]["content"] = list()
297
298                 # Classify interests by replica
299                 if message_type in ["interest_from"] and \
300                         nonce not in content_names[content_name]["interest"]:
301                     content_names[content_name]["interest"][nonce] = list()
302      
303                 # Add consumer history
304                 if nid == source:
305                     if message_type in ["interest_to", "content_from"]:
306                         # content name history as seen by the source
307                         if "consumer_history" not in content_names[content_name]:
308                             content_names[content_name]["consumer_history"] = list()
309
310                         content_names[content_name]["consumer_history"].append(
311                                 (timestamp, message_type)) 
312
313                 # Add messages per content name and cumulate totals by message type
314                 if message_type == "interest_dupnonce":
315                     interest_dupnonce_count += 1
316                 elif message_type == "interest_expiry":
317                     interest_expiry_count += 1
318                 elif message_type == "interest_from":
319                     interest_count += 1
320                     # Append to interest history of the content name
321                     content_names[content_name]["interest"][nonce].append(
322                             (timestamp, nid2, nid1))
323                 elif message_type == "content_from":
324                     content_count += 1
325                     # Append to content history of the content name
326                     content_names[content_name]["content"].append((timestamp, nid2, nid1))
327                 else:
328                     continue
329             del hist
330         del history
331
332     # Compute the time elapsed between the time an interest is sent
333     # in the consumer node and when the content is received back
334     for content_name in content_names.keys():
335         # order content and interest messages by timestamp
336         content_names[content_name]["content"] = sorted(
337               content_names[content_name]["content"])
338         
339         for nonce, timestamps in content_names[content_name][
340                     "interest"].iteritems():
341               content_names[content_name]["interest"][nonce] = sorted(
342                         timestamps)
343       
344         history = sorted(content_names[content_name]["consumer_history"])
345         content_names[content_name]["consumer_history"] = history
346
347         # compute the rtt time of the message
348         rtt = None
349         waiting_content = False 
350         interest_timestamp = None
351         content_timestamp = None
352         
353         for (timestamp, message_type) in history:
354             if not waiting_content and message_type == "interest_to":
355                 waiting_content = True
356                 interest_timestamp = timestamp
357                 continue
358
359             if waiting_content and message_type == "content_from":
360                 content_timestamp = timestamp
361                 break
362     
363         # If we can't determine who sent the interest, discard it
364         rtt = -1
365         if interest_timestamp and content_timestamp:
366             rtt = compute_delay_ms(content_timestamp, interest_timestamp)
367
368         content_names[content_name]["rtt"] = rtt
369         content_names[content_name]["lapse"] = (interest_timestamp, content_timestamp)
370
371     return (graph,
372         content_names,
373         interest_expiry_count,
374         interest_dupnonce_count,
375         interest_count,
376         content_count)
377
378 def process_content_history_logs(logs_dir, graph, parse_ping_logs = False):
379     """ Parse CCN logs and aggregate content history information in graph.
380     Returns annotated graph and message countn and content names history.
381
382     """
383     ## Process logs and analyse data
384     try:
385         graph = annotate_cn_graph(logs_dir, graph, 
386                 parse_ping_logs = parse_ping_logs)
387     except:
388         print("Skipping: Error parsing ccnd logs", logs_dir)
389         raise
390
391     source = ccn_consumers(graph)[0]
392     target = ccn_producers(graph)[0]
393
394     # Process the data from the ccnd logs, but do not re compute
395     # the link delay. 
396     try:
397         (graph,
398         content_names,
399         interest_expiry_count,
400         interest_dupnonce_count,
401         interest_count,
402         content_count) = process_content_history(graph)
403     except:
404         print("Skipping: Error processing ccn data", logs_dir)
405         raise
406
407     return (graph,
408             content_names,
409             interest_expiry_count,
410             interest_dupnonce_count,
411             interest_count,
412             content_count)