#! @PYTHON@
#
# Copyright (c) 2013 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# The approximate_size code was copied from
# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
# which is licensed under "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
# used under a Creative Commons Attribution-Share-Alike license:
# http://creativecommons.org/licenses/by-sa/3.0/
#
#

26 """Top like behavior for ovs-dpctl dump-flows output.
27
28 This program summarizes ovs-dpctl flow content by aggregating the number
29 of packets, total bytes and occurrence of the following fields:
30
31   - Datapath in_port
32
33   - Ethernet type
34
35   - Source and destination MAC addresses
36
37   - IP protocol
38
39   - Source and destination IPv4 addresses
40
41   - Source and destination IPv6 addresses
42
43   - UDP and TCP destination port
44
45   - Tunnel source and destination addresses
46
47
48 Output shows four values:
49   - FIELDS: the flow fields for example in_port(1).
50
51   - PACKETS: the total number of packets containing the flow field.
52
53   - BYTES: the total number of bytes containing the flow field. If units are
54   not present then values are in bytes.
55
56   - AVERAGE: the average packets size (BYTES/PACKET).
57
58   - COUNT: the number of lines in the dump-flow output contain the flow field.
59
60 Top Behavior
61
62 While in top mode, the default behavior, the following single character
63 commands are supported:
64
65   a - toggles top in accumulate and live mode. Accumulate mode is described
66   below.
67
68   s - toggles which column is used to sort content in decreasing order. A
69   DESC title is placed over the column.
70
71   _ - a space indicating to collect dump-flow content again
72
73   h - halt output. Any character will restart sampling
74
75   f - cycle through flow fields
76
77   q - q for quit.
78
79 Accumulate Mode
80
81 There are two supported modes: live and accumulate. The default is live.
82 The parameter --accumulate  or the 'a' character in top mode enables the
83 latter. In live mode, recent dump-flow content is presented.
84 Where as accumulate mode keeps track of the prior historical
85 information until the flow is reset not when the flow is purged. Reset
86 flows are determined when the packet count for a flow has decreased from
87 its previous sample. There is one caveat, eventually the system will
88 run out of memory if, after the accumulate-decay period any flows that
89 have not been refreshed are purged. The goal here is to free memory
90 of flows that are not active. Statistics are not decremented. Their purpose
91 is to reflect the overall history of the flow fields.
92
93
94 Debugging Errors
95
96 Parsing errors are counted and displayed in the status line at the beginning
97 of the output. Use the --verbose option with --script to see what output
98  was not parsed, like this:
99 $ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
100
101 Error messages will identify content that failed to parse.
102
103
104 Access Remote Hosts
105
106 The --host must follow the format user@hostname. This script simply calls
107 'ssh user@Hostname' without checking for login credentials therefore public
108 keys should be installed on the system identified by hostname, such as:
109
110 $ ssh-copy-id user@hostname
111
112 Consult ssh-copy-id man pages for more details.
113
114
115 Expected usage
116
117 $ ovs-dpctl-top
118
119 or to run as a script:
120 $ ovs-dpctl dump-flows > dump-flows.log
121 $ ovs-dpctl-top --script --flow-file dump-flows.log
122
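or to keep statistics across samples, purging flows idle longer than
60 seconds:
$ ovs-dpctl-top --accumulate --accumulate-decay 60
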
123 """
124
125 # pylint: disable-msg=C0103
126 # pylint: disable-msg=C0302
127 # pylint: disable-msg=R0902
128 # pylint: disable-msg=R0903
129 # pylint: disable-msg=R0904
130 # pylint: disable-msg=R0912
131 # pylint: disable-msg=R0913
132 # pylint: disable-msg=R0914
133
import sys
import os
try:
    ##
    # argparse is not installed on older Python distributions.
    # ovs ships with a version in the directory mentioned below.
    import argparse
except ImportError:
    sys.path.append(os.path.join("@pkgdatadir@", "python"))
    import argparse
import logging
import re
import unittest
import copy
import curses
import operator
import subprocess
import fcntl
import struct
import termios
import datetime
import threading
import time
import socket


##
# The following two definitions provide the necessary netaddr functionality.
# The Python netaddr module is not part of the core installation. Packaging
# netaddr would be involved and seems inappropriate given that only two
# methods were used.
def ipv4_to_network(ip_str):
    """ Calculate the network given an ipv4/mask value.
    If a mask is not present simply return ip_str.
    """
    pack_length = '!HH'
    try:
        (ip, mask) = ip_str.split("/")
    except ValueError:
        # just an ip address, no mask.
        return ip_str

    ip_p = socket.inet_pton(socket.AF_INET, ip)
    ip_t = struct.unpack(pack_length, ip_p)
    mask_t = struct.unpack(pack_length, socket.inet_pton(socket.AF_INET, mask))
    network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]

    return socket.inet_ntop(socket.AF_INET,
                            struct.pack('!HH', network_n[0], network_n[1]))

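##
# Example (illustrative): the mask is ANDed in to give the network.
#   ipv4_to_network("192.168.1.7/255.255.255.0") => "192.168.1.0"
#   ipv4_to_network("192.168.1.7") => "192.168.1.7"
##
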
def ipv6_to_network(ip_str):
    """ Calculate the network given an ipv6/mask value.
    If a mask is not present simply return ip_str.
    """
    pack_length = '!HHHHHHHH'
    try:
        (ip, mask) = ip_str.split("/")
    except ValueError:
        # just an ip address, no mask.
        return ip_str

    ip_p = socket.inet_pton(socket.AF_INET6, ip)
    ip_t = struct.unpack(pack_length, ip_p)
    mask_t = struct.unpack(pack_length,
                           socket.inet_pton(socket.AF_INET6, mask))
    network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]

    return socket.inet_ntop(socket.AF_INET6,
                            struct.pack(pack_length,
                                        network_n[0], network_n[1],
                                        network_n[2], network_n[3],
                                        network_n[4], network_n[5],
                                        network_n[6], network_n[7]))

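##
# Example (illustrative):
#   ipv6_to_network("fe80::1/ffff:ffff:ffff:ffff::") => "fe80::"
##
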
##
# columns displayed
##
class Columns:
    """ Holds column specific content.
    Titles need to be less than 8 characters.
    """
    VALUE_WIDTH = 9
    FIELDS = "fields"
    PACKETS = "packets"
    COUNT = "count"
    BYTES = "bytes"
    AVERAGE = "average"

    def __init__(self):
        pass

    @staticmethod
    def assoc_list(obj):
        """ Return an association list. """
        return [(Columns.FIELDS, repr(obj)),
                (Columns.PACKETS, obj.packets),
                (Columns.BYTES, obj.bytes),
                (Columns.COUNT, obj.count),
                (Columns.AVERAGE, obj.average),
                ]


def element_eth_get(field_type, element, stats_dict):
    """ Extract eth frame src and dst from a dump-flow element."""
    fmt = "%s(src=%s,dst=%s)"

    element = fmt % (field_type, element["src"], element["dst"])
    return SumData(field_type, element, stats_dict["packets"],
                   stats_dict["bytes"], element)


def element_ipv4_get(field_type, element, stats_dict):
    """ Extract src and dst from a dump-flow element."""
    fmt = "%s(src=%s,dst=%s)"
    element_show = fmt % (field_type, element["src"], element["dst"])

    element_key = fmt % (field_type, ipv4_to_network(element["src"]),
                         ipv4_to_network(element["dst"]))

    return SumData(field_type, element_show, stats_dict["packets"],
                   stats_dict["bytes"], element_key)


def element_tunnel_get(field_type, element, stats_dict):
    """ Extract src and dst from a tunnel."""
    return element_ipv4_get(field_type, element, stats_dict)


def element_ipv6_get(field_type, element, stats_dict):
    """ Extract src and dst from a dump-flow element."""

    fmt = "%s(src=%s,dst=%s)"
    element_show = fmt % (field_type, element["src"], element["dst"])

    element_key = fmt % (field_type, ipv6_to_network(element["src"]),
                         ipv6_to_network(element["dst"]))

    return SumData(field_type, element_show, stats_dict["packets"],
                   stats_dict["bytes"], element_key)


def element_dst_port_get(field_type, element, stats_dict):
    """ Extract the dst port from a dump-flow element."""
    element_key = "%s(dst=%s)" % (field_type, element["dst"])
    return SumData(field_type, element_key, stats_dict["packets"],
                   stats_dict["bytes"], element_key)


def element_passthrough_get(field_type, element, stats_dict):
    """ Pass a simple dump-flow element through unchanged as the key."""
    element_key = "%s(%s)" % (field_type, element)
    return SumData(field_type, element_key,
                   stats_dict["packets"], stats_dict["bytes"], element_key)


# pylint: disable-msg=R0903
class OutputFormat:
    """ Holds field_type and function to extract element value. """
    def __init__(self, field_type, generator):
        self.field_type = field_type
        self.generator = generator

OUTPUT_FORMAT = [
    OutputFormat("eth", element_eth_get),
    OutputFormat("ipv4", element_ipv4_get),
    OutputFormat("ipv6", element_ipv6_get),
    OutputFormat("tunnel", element_tunnel_get),
    OutputFormat("udp", element_dst_port_get),
    OutputFormat("tcp", element_dst_port_get),
    OutputFormat("eth_type", element_passthrough_get),
    OutputFormat("in_port", element_passthrough_get)
    ]


ELEMENT_KEY = {
    "udp": "udp.dst",
    "tcp": "tcp.dst"
    }


def top_input_get(args):
    """ Return subprocess stdout."""
    cmd = []
    if (args.host):
        cmd += ["ssh", args.host]
    cmd += ["ovs-dpctl", "dump-flows"]

    return subprocess.Popen(cmd, stderr=subprocess.STDOUT,
                            stdout=subprocess.PIPE).stdout

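##
# Example (illustrative): with --host alice@10.0.0.5 the command built
# above is equivalent to running:
#   ssh alice@10.0.0.5 ovs-dpctl dump-flows
##
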
def args_get():
    """ Read program parameters and handle any necessary validation of
    input. """

    parser = argparse.ArgumentParser(
                          formatter_class=argparse.RawDescriptionHelpFormatter,
                          description=__doc__)
    ##
    # None is a special value indicating to read flows from stdin.
    # This handles the case
    #   ovs-dpctl dump-flows | ovs-dpctl-top
    parser.add_argument("-v", "--version", version="@VERSION@",
                        action="version", help="show version")
    parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
                        action="append",
                        help="file containing flows from ovs-dpctl dump-flows")
    parser.add_argument("-V", "--verbose", dest="verbose",
                        default=logging.CRITICAL,
                        action="store_const", const=logging.DEBUG,
                        help="enable debug level verbosity")
    parser.add_argument("-s", "--script", dest="top", action="store_false",
                        help="Run from a script (no user interface)")
    parser.add_argument("--host", dest="host",
                        help="Specify a user@host for retrieving flows; see "
                             "Access Remote Hosts for more information")

    parser.add_argument("-a", "--accumulate", dest="accumulate",
                        action="store_true", default=False,
                        help="Accumulate dump-flow content")
    parser.add_argument("--accumulate-decay", dest="accumulateDecay",
                        default=5.0 * 60, type=float,
                        help="Decay old accumulated flows. "
                        "The default is 5 minutes. "
                        "A value of 0 disables decay.")
    parser.add_argument("-d", "--delay", dest="delay", type=int,
                        default=1000,
                        help="Delay in milliseconds to collect dump-flow "
                             "content (sample rate).")

    args = parser.parse_args()

    logging.basicConfig(level=args.verbose)

    return args

###
# Code to parse a single line in dump-flow
###
# key(values)
FIELDS_CMPND = re.compile(r"([\w]+)\((.+)\)")
# key:value
FIELDS_CMPND_ELEMENT = re.compile(r"([\w:]+)=([/\.\w:]+)")
FIELDS_ELEMENT = re.compile(r"([\w]+):([-\.\w]+)")


def flow_line_iter(line):
    """ Iterate over flow dump elements.
    Return a list of elements, split on commas that are not inside
    parentheses.
    """
    # Splits on ',' except when inside a '()'. The actions element is not
    # split properly but we don't need it.
    rc = []

    element = ""
    paren_count = 0

    for ch in line:
        if (ch == '('):
            paren_count += 1
        elif (ch == ')'):
            paren_count -= 1

        if (ch == ' '):
            # ignore white space.
            continue
        elif ((ch == ',') and (paren_count == 0)):
            rc.append(element)
            element = ""
        else:
            element += ch

    if (paren_count):
        raise ValueError(line)
    else:
        if (len(element) > 0):
            rc.append(element)
    return rc

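##
# Example (illustrative): commas inside parentheses do not split.
#   flow_line_iter("in_port(4),eth(src=a,dst=b)")
#   => ["in_port(4)", "eth(src=a,dst=b)"]
##
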
def flow_line_compound_parse(compound):
    """ Parse a compound element,
    for example
    src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
    which is in
    eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)
    """
    result = {}
    for element in flow_line_iter(compound):
        match = FIELDS_CMPND_ELEMENT.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = value

        match = FIELDS_CMPND.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = flow_line_compound_parse(value)
            continue

    if (len(result.keys()) == 0):
        return compound
    return result


def flow_line_split(line):
    """ Convert a flow dump line into a ([fields], [stats], actions) tuple,
    where fields and stats are lists.
    This function relies on the following ovs-dpctl dump-flows
    output characteristics:
    1. The dump flow line consists of a list of frame fields, a list of
       stats and actions.
    2. The list of frame fields, each stat and the actions field are
       delimited by ', '.
    3. All other non-stat fields are not delimited by ', '.

    """

    results = re.split(', ', line)

    (field, stats, action) = (results[0], results[1:-1], results[-1])

    fields = flow_line_iter(field)
    return (fields, stats, action)

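##
# Example (illustrative), using the format described above:
#   flow_line_split("in_port(4), packets:1, bytes:60, actions:2")
#   => (["in_port(4)"], ["packets:1", "bytes:60"], "actions:2")
##
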
def elements_to_dict(elements):
    """ Convert a line into a hierarchy of dictionaries. """
    result = {}
    for element in elements:
        match = FIELDS_CMPND.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = flow_line_compound_parse(value)
            continue

        match = FIELDS_ELEMENT.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = value
        else:
            raise ValueError("can't parse >%s<" % element)
    return result

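##
# Example (illustrative): compound fields become nested dictionaries.
#   elements_to_dict(["in_port(4)", "eth(src=a,dst=b)", "packets:1"])
#   => {"in_port": "4", "eth": {"src": "a", "dst": "b"}, "packets": "1"}
##
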
# pylint: disable-msg=R0903
class SumData(object):
    """ Interface that all data going into SumDb must implement.
    Holds the flow field and its corresponding count, total packets,
    total bytes and calculates average.

    __repr__ is used as key into SumData singleton.
    __str__ is used as human readable output.
    """

    def __init__(self, field_type, field, packets, flow_bytes, key):
        # Count is the number of lines in the dump-flow log.
        self.field_type = field_type
        self.field = field
        self.count = 1
        self.packets = int(packets)
        self.bytes = int(flow_bytes)
        self.key = key

    def decrement(self, decr_packets, decr_bytes, decr_count):
        """ Decrement content to calculate delta from previous flow sample."""
        self.packets -= decr_packets
        self.bytes -= decr_bytes
        self.count -= decr_count

    def __iadd__(self, other):
        """ Add two objects. """

        if (self.key != other.key):
            raise ValueError("adding two unrelated types")

        self.count += other.count
        self.packets += other.packets
        self.bytes += other.bytes
        return self

    def __isub__(self, other):
        """ Subtract two objects. """

        if (self.key != other.key):
            raise ValueError("subtracting two unrelated types")

        self.count -= other.count
        self.packets -= other.packets
        self.bytes -= other.bytes
        return self

    def __getattr__(self, name):
        """ Handle average. """
        if (name == "average"):
            if (self.packets == 0):
                return float(0.0)
            else:
                return float(self.bytes) / float(self.packets)
        raise AttributeError(name)

    def __str__(self):
        """ Used for debugging. """
        return "%s %s %s %s" % (self.field, self.count,
                                self.packets, self.bytes)

    def __repr__(self):
        """ Used as key in the FlowDB table. """
        return self.key

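##
# Example (illustrative): two samples of the same field accumulate.
#   a = SumData("in_port", "in_port(1)", 1, 60, "in_port(1)")
#   b = SumData("in_port", "in_port(1)", 2, 120, "in_port(1)")
#   a += b   # a.count == 2, a.packets == 3, a.bytes == 180
##
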
def flow_aggregate(fields_dict, stats_dict):
    """ Search for content in a line.
    Passed the flow portion of the dump-flows output plus the current stats
    consisting of packets, bytes, etc.
    """
    result = []

    for output_format in OUTPUT_FORMAT:
        field = fields_dict.get(output_format.field_type, None)
        if (field):
            obj = output_format.generator(output_format.field_type,
                                          field, stats_dict)
            result.append(obj)

    return result


def flows_read(ihdl, flow_db):
    """ Read flow content from ihdl and insert into flow_db. """

    while True:
        line = ihdl.readline()
        if (len(line) == 0):
            # end of input
            break

        try:
            flow_db.flow_line_add(line)
        except ValueError, arg:
            logging.error(arg)

    return flow_db


def get_terminal_size():
    """
    Return the height and width of the terminal.
    """
    result = None
    for fd_io in [0, 1, 2]:
        try:
            result = struct.unpack('hh',
                                   fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
                                               '1234'))
            break
        except IOError:
            result = None

    if (result is None or result == (0, 0)):
        # Maybe we can't get the width. In that case assume (25, 80)
        result = (25, 80)

    return result

##
# Content derived from:
# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
##
SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
            1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']}


def approximate_size(size, a_kilobyte_is_1024_bytes=True):
    """Convert a file size to human-readable form.

    Keyword arguments:
    size -- file size in bytes
    a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
                                    if False, use multiples of 1000

    Returns: string

    """
    size = float(size)
    if size < 0:
        raise ValueError('number must be non-negative')

    if (a_kilobyte_is_1024_bytes):
        multiple = 1024
    else:
        multiple = 1000
    for suffix in SUFFIXES[multiple]:
        size /= multiple
        if size < multiple:
            return "%.1f %s" % (size, suffix)

    raise ValueError('number too large')


##
# End copied content
##
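##
# Example (illustrative):
#   approximate_size(1024 * 1024) => "1.0 MiB"
#   approximate_size(1000000, a_kilobyte_is_1024_bytes=False) => "1.0 MB"
##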
class ColMeta:
    """ Concepts about columns. """
    def __init__(self, sortable, width):
        self.sortable = sortable
        self.width = width


class RowMeta:
    """ How to render rows. """
    def __init__(self, label, fmt):
        self.label = label
        self.fmt = fmt


def fmt_packet(obj, width):
    """ Provide a string for packets that is appropriate for output."""
    return str(obj.packets).rjust(width)


def fmt_count(obj, width):
    """ Provide a string for count that is appropriate for output."""
    return str(obj.count).rjust(width)


def fmt_avg(obj, width):
    """ Provide a string for average that is appropriate for output."""
    return str(int(obj.average)).rjust(width)


def fmt_field(obj, width):
    """ Truncate really long flows and insert ellipses to help make it
    clear.
    """

    ellipses = " ... "
    value = obj.field
    if (len(obj.field) > width):
        value = value[:(width - len(ellipses))] + ellipses
    return value.ljust(width)


def fmt_bytes(obj, width):
    """ Provide a string for bytes that is appropriate for output."""
    if (len(str(obj.bytes)) <= width):
        value = str(obj.bytes)
    else:
        value = approximate_size(obj.bytes)
    return value.rjust(width)


def title_center(value, width):
    """ Center a column title."""
    return value.upper().center(width)


def title_rjust(value, width):
    """ Right justify a column title. """
    return value.upper().rjust(width)


def column_picker(order, obj):
    """ Return the column as specified by order. """
    if (order == 1):
        return obj.count
    elif (order == 2):
        return obj.packets
    elif (order == 3):
        return obj.bytes
    elif (order == 4):
        return obj.average
    else:
        raise ValueError("order outside of range %s" % order)


class Render:
    """ Renders flow data. """
    def __init__(self, console_width):
        """ Calculate column widths taking into account changes in format."""

        self._start_time = datetime.datetime.now()

        self._cols = [ColMeta(False, 0),
                      ColMeta(True, Columns.VALUE_WIDTH),
                      ColMeta(True, Columns.VALUE_WIDTH),
                      ColMeta(True, Columns.VALUE_WIDTH),
                      ColMeta(True, Columns.VALUE_WIDTH)]
        self._console_width = console_width
        self.console_width_set(console_width)

        # Order in this array dictates the order of the columns.
        # The 0 width for the first entry is a place holder. This is
        # dynamically calculated. The first column is special. We need a
        # way to indicate which fields are presented.
        self._descs = [RowMeta("", title_rjust),
                       RowMeta("", title_rjust),
                       RowMeta("", title_rjust),
                       RowMeta("", title_rjust),
                       RowMeta("", title_rjust)]
        self._column_sort_select = 0
        self.column_select_event()

        self._titles = [
            RowMeta(Columns.FIELDS, title_center),
            RowMeta(Columns.COUNT, title_rjust),
            RowMeta(Columns.PACKETS, title_rjust),
            RowMeta(Columns.BYTES, title_rjust),
            RowMeta(Columns.AVERAGE, title_rjust)
        ]

        self._datas = [
            RowMeta(None, fmt_field),
            RowMeta(None, fmt_count),
            RowMeta(None, fmt_packet),
            RowMeta(None, fmt_bytes),
            RowMeta(None, fmt_avg)
            ]

        ##
        # _field_types holds which fields are displayed in the field
        # column, with the keyword all implying all fields.
        ##
        self._field_types = ["all"] + [ii.field_type for ii in OUTPUT_FORMAT]

        ##
        # The default is to show all field types.
        ##
        self._field_type_select = -1
        self.field_type_toggle()

    def _field_type_select_get(self):
        """ Return which field type to display. """
        return self._field_types[self._field_type_select]

    def field_type_toggle(self):
        """ Toggle which field types to show. """
        self._field_type_select += 1
        if (self._field_type_select >= len(self._field_types)):
            self._field_type_select = 0
        value = Columns.FIELDS + " (%s)" % self._field_type_select_get()
        self._titles[0].label = value

    def column_select_event(self):
        """ Handles column select toggle. """

        self._descs[self._column_sort_select].label = ""
        for _ in range(len(self._cols)):
            self._column_sort_select += 1
            if (self._column_sort_select >= len(self._cols)):
                self._column_sort_select = 0

            # Now look for the next sortable column
            if (self._cols[self._column_sort_select].sortable):
                break
        self._descs[self._column_sort_select].label = "DESC"

    def console_width_set(self, console_width):
        """ Adjust the output given the new console_width. """
        self._console_width = console_width

        spaces = len(self._cols) - 1
        ##
        # Calculating column width can be tedious but important. The
        # flow field value can be long. The goal here is to dedicate
        # fixed column space for packets, bytes, average and counts. Give the
        # remaining space to the flow column. When numbers get large,
        # transition to the output generated by approximate_size, which
        # limits output to ###.# XiB, in other words 9 characters.
        ##
        # At this point, we know the maximum length values. We may
        # truncate the flow column to get everything to fit.
        self._cols[0].width = 0
        values_max_length = sum([ii.width for ii in self._cols]) + spaces
        flow_max_length = console_width - values_max_length
        self._cols[0].width = flow_max_length

    def format(self, flow_db):
        """ Show flows based on the --script parameter."""

        rc = []
        ##
        # Top output consists of
        # Title
        # Column title (2 rows)
        # data
        # statistics and status

        ##
        # Title
        ##
        rc.append("Flow Summary".center(self._console_width))

        stats = " Total: %(flow_total)s  errors: %(flow_errors)s " % \
                  flow_db.flow_stats_get()
        accumulate = flow_db.accumulate_get()
        if (accumulate):
            stats += "Accumulate: on "
        else:
            stats += "Accumulate: off "

        duration = datetime.datetime.now() - self._start_time
        stats += "Duration: %s " % str(duration)
        rc.append(stats.ljust(self._console_width))

        ##
        # 2 rows for columns.
        ##
        # Indicate which column is in descending order.
        rc.append(" ".join([ii.fmt(ii.label, col.width)
                            for (ii, col) in zip(self._descs, self._cols)]))

        rc.append(" ".join([ii.fmt(ii.label, col.width)
                            for (ii, col) in zip(self._titles, self._cols)]))

        ##
        # Data.
        ##
        for dd in flow_db.field_values_in_order(self._field_type_select_get(),
                                                self._column_sort_select):
            rc.append(" ".join([ii.fmt(dd, col.width)
                                for (ii, col) in zip(self._datas,
                                                     self._cols)]))

        return rc


def curses_screen_begin():
    """ Begin curses screen control. """
    stdscr = curses.initscr()
    curses.cbreak()
    curses.noecho()
    stdscr.keypad(1)
    return stdscr


def curses_screen_end(stdscr):
    """ End curses screen control. """
    curses.nocbreak()
    stdscr.keypad(0)
    curses.echo()
    curses.endwin()


class FlowDB:
    """ Implements live vs accumulate mode.

    Flows are stored as key value pairs. The key consists of the content
    prior to stat fields. The value portion consists of stats in a dictionary
    form.

    todo: add filtering here in the future.
    """
    def __init__(self, accumulate):
        self._accumulate = accumulate
        self._error_count = 0
        # Values are (stats, last update time.)
        # The last update time is used for aging.
        self._flow_lock = threading.Lock()
        # This dictionary holds individual flows.
        self._flows = {}
        # This dictionary holds aggregates of flow fields.
        self._fields = {}

    def accumulate_get(self):
        """ Return the current accumulate state. """
        return self._accumulate

    def accumulate_toggle(self):
        """ Toggle accumulate flow behavior. """
        self._accumulate = not self._accumulate

    def begin(self):
        """ Indicate the beginning of processing flow content.
        If accumulate is false clear the current set of flows. """

        if (not self._accumulate):
            self._flow_lock.acquire()
            try:
                self._flows.clear()
            finally:
                self._flow_lock.release()
            self._fields.clear()

    def flow_line_add(self, line):
        """ Split a line from ovs-dpctl dump-flows into key and stats.
        The order of the content in the flow should be:
        - flow content
        - stats for the flow
        - actions

        This method also assumes that the dump flow output does not
        change the order of fields of the same flow.
        """

        line = line.rstrip("\n")
        (fields, stats, _) = flow_line_split(line)

        try:
            fields_dict = elements_to_dict(fields)

            if (len(fields_dict) == 0):
                raise ValueError("flow fields are missing %s" % line)

            stats_dict = elements_to_dict(stats)
            if (len(stats_dict) == 0):
                raise ValueError("statistics are missing %s." % line)

            ##
            # In accumulate mode, the flow database can reach 10,000's of
            # persistent flows. The interaction of the script with this many
            # flows is too slow. Instead, deltas are sent to the flow_db
            # database, allowing incremental changes to be done in O(m) time
            # where m is the current flow list, instead of iterating over
            # all flows in O(n) time where n is the entire history of flows.
            key = ",".join(fields)

            self._flow_lock.acquire()
            try:
                (stats_old_dict, _) = self._flows.get(key, (None, None))
            finally:
                self._flow_lock.release()

            self.flow_event(fields_dict, stats_old_dict, stats_dict)

        except ValueError, arg:
            logging.error(arg)
            self._error_count += 1
            raise

        self._flow_lock.acquire()
        try:
            self._flows[key] = (stats_dict, datetime.datetime.now())
        finally:
            self._flow_lock.release()

    def decay(self, decayTimeInSeconds):
        """ Decay content. """
        now = datetime.datetime.now()
        for (key, value) in self._flows.items():
            (stats_dict, updateTime) = value
            delta = now - updateTime

            if (delta.seconds > decayTimeInSeconds):
                self._flow_lock.acquire()
                try:
                    del self._flows[key]

                    fields_dict = elements_to_dict(flow_line_iter(key))
                    matches = flow_aggregate(fields_dict, stats_dict)
                    for match in matches:
                        self.field_dec(match)

                finally:
                    self._flow_lock.release()

    def flow_stats_get(self):
        """ Return statistics in the form of a dictionary. """
        rc = None
        self._flow_lock.acquire()
        try:
            rc = {"flow_total": len(self._flows),
                  "flow_errors": self._error_count}
        finally:
            self._flow_lock.release()
        return rc

    def field_types_get(self):
        """ Return the set of types stored in the singleton. """
        types = set((ii.field_type for ii in self._fields.values()))
        return types

    def field_add(self, data):
        """ Collect dump-flow data to sum the number of times an item
        appears. """
        current = self._fields.get(repr(data), None)
        if (current is None):
            current = copy.copy(data)
        else:
            current += data
        self._fields[repr(current)] = current

    def field_dec(self, data):
        """ Decrement a previously collected flow field aggregate. """
        current = self._fields.get(repr(data), None)
        if (current is None):
            raise ValueError("decrementing field missing %s" % repr(data))

        current -= data
        self._fields[repr(current)] = current
        if (current.count == 0):
            del self._fields[repr(current)]

    def field_values_in_order(self, field_type_select, column_order):
        """ Return a list of items in order, maximum first. """
        values = self._fields.values()
        if (field_type_select != "all"):
            # If a field type other than "all" then reduce the list.
            values = [ii for ii in values
                      if (ii.field_type == field_type_select)]
        values = [(column_picker(column_order, ii), ii) for ii in values]
        values.sort(key=operator.itemgetter(0))
        values.reverse()
        values = [ii[1] for ii in values]
        return values

    def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
        """ Receives new flow information. """

        # In order to avoid processing every flow at every sample
        # period, changes in a flow's packet count are used to determine
        # the delta in the flow statistics. This delta is used in the call
        # to self.decrement prior to self.field_add.

        if (stats_old_dict is None):
            # This is a new flow
            matches = flow_aggregate(fields_dict, stats_new_dict)
            for match in matches:
                self.field_add(match)
        else:
            old_packets = int(stats_old_dict.get("packets", 0))
            new_packets = int(stats_new_dict.get("packets", 0))
            if (old_packets == new_packets):
                # ignore. same data.
                pass
            else:
                old_bytes = stats_old_dict.get("bytes", 0)
                # old_packets != new_packets
                # if old_packets > new_packets then we end up decrementing
                # packets and bytes.
                matches = flow_aggregate(fields_dict, stats_new_dict)
                for match in matches:
                    match.decrement(int(old_packets), int(old_bytes), 1)
                    self.field_add(match)


class DecayThread(threading.Thread):
    """ Periodically call the flow database to see if any flows are old. """
    def __init__(self, flow_db, interval):
        """ Start decay thread. """
        threading.Thread.__init__(self)

        self._interval = max(1, interval)
        self._min_interval = min(1, interval / 10)
        self._flow_db = flow_db
        self._event = threading.Event()
        self._running = True

        self.daemon = True

    def run(self):
        """ Worker thread which handles decaying accumulated flows. """

        while (self._running):
            self._event.wait(self._min_interval)
            if (self._running):
                self._flow_db.decay(self._interval)

    def stop(self):
        """ Stop the thread. """
        self._running = False
        self._event.set()
        ##
        # Give the worker thread time to terminate, but not too long.
        # This thread is a daemon, so the application will terminate if
        # we time out during the join. This is just a cleaner way to
        # release resources.
        self.join(2.0)


def flow_top_command(stdscr, render, flow_db):
    """ Handle input while in top mode. """
    ch = stdscr.getch()
    ##
    # Any character will restart sampling.
    if (ch == ord('h')):
        # halt output.
        ch = stdscr.getch()
        while (ch == -1):
            ch = stdscr.getch()

    if (ch == ord('s')):
        # toggle which column sorts data in descending order.
        render.column_select_event()
    elif (ch == ord('a')):
        flow_db.accumulate_toggle()
    elif (ch == ord('f')):
        render.field_type_toggle()
    elif (ch == ord(' ')):
        # resample
        pass

    return ch


def decay_timer_start(flow_db, accumulateDecay):
    """ If accumulateDecay is greater than zero then start the timer. """
    if (accumulateDecay > 0):
        decay_timer = DecayThread(flow_db, accumulateDecay)
        decay_timer.start()
        return decay_timer
    else:
        return None


def flows_top(args):
    """ Handle top-like behavior when --script is not specified. """

    flow_db = FlowDB(args.accumulate)
    render = Render(0)

    decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
    lines = []

    try:
        stdscr = curses_screen_begin()
        try:
            ch = 'X'
            #stdscr.nodelay(1)
            stdscr.timeout(args.delay)

            while (ch != ord('q')):
                flow_db.begin()

                try:
                    ihdl = top_input_get(args)
                    try:
                        flows_read(ihdl, flow_db)
                    finally:
                        ihdl.close()
                except OSError, arg:
                    logging.critical(arg)
                    break

                (console_height, console_width) = stdscr.getmaxyx()
                render.console_width_set(console_width)

                output_height = console_height - 1
                line_count = range(output_height)
                line_output = render.format(flow_db)
                lines = zip(line_count, line_output[:output_height])

                stdscr.erase()
                for (count, line) in lines:
                    stdscr.addstr(count, 0, line[:console_width])
                stdscr.refresh()

                ch = flow_top_command(stdscr, render, flow_db)

        finally:
            curses_screen_end(stdscr)
    except KeyboardInterrupt:
        pass
    if (decay_timer):
        decay_timer.stop()

    # repeat output
    for (count, line) in lines:
        print line


def flows_script(args):
    """ Handle the --script option. """

    flow_db = FlowDB(args.accumulate)
    flow_db.begin()

    if (args.flowFiles is None):
        logging.info("reading flows from stdin")
        ihdl = os.fdopen(sys.stdin.fileno(), 'r', 0)
        try:
            flow_db = flows_read(ihdl, flow_db)
        finally:
            ihdl.close()
    else:
        for flowFile in args.flowFiles:
            logging.info("reading flows from %s", flowFile)
            ihdl = open(flowFile, "r")
            try:
                flow_db = flows_read(ihdl, flow_db)
            finally:
                ihdl.close()

    (_, console_width) = get_terminal_size()
    render = Render(console_width)

    for line in render.format(flow_db):
        print line


def main():
    """ Return 0 on success or 1 on failure.

    Algorithm
    There are four stages to processing ovs-dpctl dump-flows content:
    1. Retrieve the current input
    2. Store it in the FlowDB and maintain history
    3. Iterate over the FlowDB, aggregating stats for each flow field
    4. Present the data.

    Retrieving the current input is currently trivial: ovs-dpctl dump-flows
    is called. Future versions will have more elaborate means for collecting
    dump-flows content. The FlowDB returns all data in the form of a
    hierarchical dictionary. Input will vary.

    In the case of accumulate mode, flows are not purged from the FlowDB
    manager. Instead, at the very least, the latest statistics are kept. In
    the case of live output, the FlowDB is purged prior to sampling data.

    Aggregating results requires identifying the flow fields to aggregate
    out of the flow and summing the stats.

    """
    args = args_get()

    try:
        if (args.top):
            flows_top(args)
        else:
            flows_script(args)
    except KeyboardInterrupt:
        return 1
    return 0

if __name__ == '__main__':
    sys.exit(main())
elif __name__ == 'ovs-dpctl-top':
    # pylint: disable-msg=R0915
1267
1268     ##
1269     # Test case beyond this point.
1270     # pylint: disable-msg=R0904
1271     class TestsuiteFlowParse(unittest.TestCase):
1272         """
1273         parse flow into hierarchy of dictionaries.
1274         """
1275         def test_flow_parse(self):
1276             """ test_flow_parse. """
1277             line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1278                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1279                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1280                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1281                    "udp(src=61252,dst=5355), packets:1, bytes:92, "\
1282                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1283                    "38,41,44,47,50,53,56,59,62,65"
1284
1285             (fields, stats, _) = flow_line_split(line)
1286             flow_dict = elements_to_dict(fields + stats)
1287             self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8")
1288             self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03")
1289             self.assertEqual(flow_dict["ipv6"]["src"],
1290                              "fe80::55bf:fe42:bc96:2812")
1291             self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3")
1292             self.assertEqual(flow_dict["packets"], "1")
1293             self.assertEqual(flow_dict["bytes"], "92")
1294
1295             line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1296                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1297                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1298                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1299                    "udp(src=61252,dst=5355), packets:1, bytes:92, "\
1300                    "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1301                    "38,41,44,47,50,53,56,59,62,65"
1302
1303             (fields, stats, _) = flow_line_split(line)
1304             flow_dict = elements_to_dict(fields + stats)
1305             self.assertEqual(flow_dict["used"], "-0.703s")
1306             self.assertEqual(flow_dict["packets"], "1")
1307             self.assertEqual(flow_dict["bytes"], "92")
1308
1309         def test_flow_sum(self):
1310             """ test_flow_sum. """
1311             line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1312                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1313                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1314                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1315                    "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1316                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1317                    "38,41,44,47,50,53,56,59,62,65"
1318
1319             (fields, stats, _) = flow_line_split(line)
1320             stats_dict = elements_to_dict(stats)
1321             fields_dict = elements_to_dict(fields)
1322             ##
1323             # Test simple case of one line.
1324             flow_db = FlowDB(False)
1325             matches = flow_aggregate(fields_dict, stats_dict)
1326             for match in matches:
1327                 flow_db.field_add(match)
1328
1329             flow_types = flow_db.field_types_get()
1330             expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"]
1331             self.assert_(len(flow_types) == len(expected_flow_types))
1332             for flow_type in flow_types:
1333                 self.assertTrue(flow_type in expected_flow_types)
1334
1335             for flow_type in flow_types:
1336                 sum_value = flow_db.field_values_in_order("all", 1)
1337                 self.assert_(len(sum_value) == 5)
1338                 self.assert_(sum_value[0].packets == 2)
1339                 self.assert_(sum_value[0].count == 1)
1340                 self.assert_(sum_value[0].bytes == 92)
1341
1342             ##
1343             # Add line again just to see counts go up.
1344             matches = flow_aggregate(fields_dict, stats_dict)
1345             for match in matches:
1346                 flow_db.field_add(match)
1347
1348             flow_types = flow_db.field_types_get()
1349             self.assert_(len(flow_types) == len(expected_flow_types))
1350             for flow_type in flow_types:
1351                 self.assertTrue(flow_type in expected_flow_types)
1352
1353             for flow_type in flow_types:
1354                 sum_value = flow_db.field_values_in_order("all", 1)
1355                 self.assert_(len(sum_value) == 5)
1356                 self.assert_(sum_value[0].packets == 4)
1357                 self.assert_(sum_value[0].count == 2)
1358                 self.assert_(sum_value[0].bytes == 2 * 92)
1359
1360         def test_assoc_list(self):
1361             """ test_assoc_list. """
1362             line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1363                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1364                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1365                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1366                    "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1367                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1368                    "38,41,44,47,50,53,56,59,62,65"
1369
1370             valid_flows = [
1371                 'eth_type(0x86dd)',
1372                 'udp(dst=5355)',
1373                 'in_port(4)',
1374                 'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
1375                 'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
1376                 ]
1377
1378             (fields, stats, _) = flow_line_split(line)
1379             stats_dict = elements_to_dict(stats)
1380             fields_dict = elements_to_dict(fields)
1381
1382             ##
1383             # Test simple case of one line.
1384             flow_db = FlowDB(False)
1385             matches = flow_aggregate(fields_dict, stats_dict)
1386             for match in matches:
1387                 flow_db.field_add(match)
1388
1389             for sum_value in flow_db.field_values_in_order("all", 1):
1390                 assoc_list = Columns.assoc_list(sum_value)
1391                 for item in assoc_list:
1392                     if (item[0] == "fields"):
1393                         self.assertTrue(item[1] in valid_flows)
1394                     elif (item[0] == "packets"):
1395                         self.assertTrue(item[1] == 2)
1396                     elif (item[0] == "count"):
1397                         self.assertTrue(item[1] == 1)
1398                     elif (item[0] == "average"):
1399                         self.assertTrue(item[1] == 46.0)
1400                     elif (item[0] == "bytes"):
1401                         self.assertTrue(item[1] == 92)
1402                     else:
1403                         raise ValueError("unknown %s", item[0])
1404
1405         def test_human_format(self):
1406             """ test_assoc_list. """
1407
1408             self.assertEqual(approximate_size(0.0), "0.0 KiB")
1409             self.assertEqual(approximate_size(1024), "1.0 KiB")
1410             self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB")
1411             self.assertEqual(approximate_size((1024 * 1024) + 100000),
1412                              "1.1 MiB")
1413             value = (1024 * 1024 * 1024) + 100000000
1414             self.assertEqual(approximate_size(value), "1.1 GiB")
1415
1416         def test_flow_line_split(self):
1417             """ Splitting a flow line is not trivial.
1418             There is no clear delimiter. Comma is used liberally."""
1419             expected_fields = ["in_port(4)",
1420                             "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
1421                             "eth_type(0x86dd)",
1422                            "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
1423                            "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
1424                            "udp(src=61252,dst=5355)"]
1425             expected_stats = ["packets:2", "bytes:92", "used:0.703s"]
1426             expected_actions = "actions:3,8,11,14,17,20,23,26,29,32,35," \
1427                                "38,41,44,47,50,53,56,59,62,65"
1428
1429             line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1430                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1431                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1432                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1433                    "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1434                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1435                    "38,41,44,47,50,53,56,59,62,65"
1436
1437             (fields, stats, actions) = flow_line_split(line)
1438
1439             self.assertEqual(fields, expected_fields)
1440             self.assertEqual(stats, expected_stats)
1441             self.assertEqual(actions, expected_actions)
1442
1443         def test_accumulate_decay(self):
1444             """ test_accumulate_decay: test accumulated decay. """
1445             lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
1446                      "dst=ff:ff:ff:ff:ff:ff),"
1447                      "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1448                      "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1449                      "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1450                      "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
1451                      "packets:1, bytes:120, used:0.004s, actions:1"]
1452
1453             flow_db = FlowDB(True)
1454             flow_db.begin()
1455             flow_db.flow_line_add(lines[0])
1456
1457             # Make sure we decay
1458             time.sleep(4)
1459             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1460             flow_db.decay(1)
1461             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
1462
1463             flow_db.flow_line_add(lines[0])
1464             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1465             flow_db.decay(30)
1466             # Should not be deleted.
1467             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1468
1469             flow_db.flow_line_add(lines[0])
1470             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1471             timer = decay_timer_start(flow_db, 2)
1472             time.sleep(10)
1473             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
1474             timer.stop()
1475
        def test_accumulate(self):
            """ test_accumulate: test that FlowDB supports accumulate. """

            lines = [
                "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
                "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
                "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
                "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
                "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
                "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
                ]
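            # The samples above simulate successive dump-flows passes.
            # Growing packet counts for the same flow accumulate, while
            # a drop in the packet count (lines[5] versus lines[2])
            # marks the flow as reset.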

            # Turn on accumulate.
            flow_db = FlowDB(True)
            flow_db.begin()

            flow_db.flow_line_add(lines[0])

            # Test that one flow exists.
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 1)
            self.assertEqual(in_ports[0].bytes, 120)
            self.assertEqual(in_ports[0].count, 1)

            # Simulate another sample.
            # Test that two different flows exist.
            flow_db.begin()
            flow_db.flow_line_add(lines[1])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 1)
            self.assertEqual(in_ports[0].bytes, 120)
            self.assertEqual(in_ports[0].count, 1)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # Test that the first flow's packet count increments.
            flow_db.begin()
            flow_db.flow_line_add(lines[2])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 240)
            self.assertEqual(in_ports[0].count, 1)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # Add a third flow that shares in_port(1) with the first flow.
            flow_db.begin()
            flow_db.flow_line_add(lines[3])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 3)
            self.assertEqual(in_ports[0].bytes, 360)
            self.assertEqual(in_ports[0].count, 2)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # The third flow's statistics change.
            flow_db.begin()
            flow_db.flow_line_add(lines[4])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 4)
            self.assertEqual(in_ports[0].bytes, 480)
            self.assertEqual(in_ports[0].count, 2)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # The first flow resets: its packet count decreased.
            flow_db.begin()
            flow_db.flow_line_add(lines[5])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 3)
            self.assertEqual(in_ports[0].bytes, 360)
            self.assertEqual(in_ports[0].count, 2)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

        def test_parse_character_errors(self):
            """ test_parse_character_errors: test parsing errors.
            The flow parser is purposely loose. It is not designed to
            validate input, merely to pull out what it can, but there
            are situations where a parse error can be detected.
            """

            lines = ["complete garbage",
                     "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
                     "dst=33:33:00:00:00:66),"
                     "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
                     "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
                     "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
                     "packets:2,bytes:5026,actions:1"]
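            # Each sample above is malformed and is expected to raise
            # ValueError when added to the flow database.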

            flow_db = FlowDB(False)
            flow_db.begin()
            for line in lines:
                try:
                    flow_db.flow_line_add(line)
                except ValueError:
                    # We want an exception; that is how we know the
                    # parser has correctly flagged a simple parsing
                    # error. The goal is not to validate flow output,
                    # just to catch simple issues.
                    continue
                self.fail("expected ValueError for %s" % line)

        def test_tunnel_parsing(self):
            """ test_tunnel_parsing: test parsing flows with a tunnel. """
            lines = [
                "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
                "tos=0x0,ttl=64,flags(key)),in_port(1),"
                "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
                "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
                "actions:userspace(pid=4294962691,slow_path(cfm))"
                ]
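            # The tunnel() attributes contain nested parentheses; the
            # flow should still parse and aggregate under in_port(1).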
            flow_db = FlowDB(False)
            flow_db.begin()
            flow_db.flow_line_add(lines[0])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 6)
            self.assertEqual(in_ports[0].bytes, 534)
            self.assertEqual(in_ports[0].count, 1)

        def test_flow_multiple_paren(self):
            """ test_flow_multiple_paren: test fields with nested parens. """
            line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
            valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
                     "in_port(2)"]
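            # flow_line_iter should split on commas only at the top
            # parenthesis level, keeping nested groups such as
            # flags(key) intact.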
            rc = flow_line_iter(line)
            self.assertEqual(valid, rc)

        def test_to_network(self):
            """ test_to_network: test ipv4_to_network and ipv6_to_network. """
            ipv4s = [
                ("192.168.0.1", "192.168.0.1"),
                ("192.168.0.1/255.255.255.255", "192.168.0.1"),
                ("192.168.0.1/255.255.255.0", "192.168.0.0"),
                ("192.168.0.1/255.255.0.0", "192.168.0.0"),
                ("192.168.0.1/255.0.0.0", "192.0.0.0"),
                ("192.168.0.1/0.0.0.0", "0.0.0.0"),
                ("10.24.106.230/255.255.255.255", "10.24.106.230"),
                ("10.24.106.230/255.255.255.0", "10.24.106.0"),
                ("10.24.106.0/255.255.255.0", "10.24.106.0"),
                ("10.24.106.0/255.255.252.0", "10.24.104.0")
                ]

            ipv6s = [
                ("1::192:168:0:1", "1::192:168:0:1"),
                ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
                ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
                ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
                ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
                ("1::192:168:0:1/1::0:0:0:0", "1::"),
                ("1::192:168:0:1/::", "::")
                ]

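            # Each pair is (address/mask input, expected network). The
            # mask is applied bitwise to the address to yield the
            # network; an address without a mask is returned unchanged.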
            for (ipv4_test, ipv4_check) in ipv4s:
                self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check)

            for (ipv6_test, ipv6_check) in ipv6s:
                self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check)