3 # Copyright (c) 2013 Nicira, Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at:
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 # The approximate_size code was copied from
19 # http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
20 # which is licensed under "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
21 # used under a Creative Commons Attribution-Share-Alike license:
22 # http://creativecommons.org/licenses/by-sa/3.0/
26 """Top like behavior for ovs-dpctl dump-flows output.
28 This program summarizes ovs-dpctl flow content by aggregating the number
29 of packets, total bytes, and occurrences of the following fields:
35 - Source and destination MAC addresses
39 - Source and destination IPv4 addresses
41 - Source and destination IPv6 addresses
43 - UDP and TCP destination port
45 - Tunnel source and destination addresses
48 Output shows five values:
49 - FIELDS: the flow fields, for example in_port(1).
51 - PACKETS: the total number of packets containing the flow field.
53 - BYTES: the total number of bytes containing the flow field. If units are
54 not present then values are in bytes.
56 - AVERAGE: the average packet size (BYTES/PACKETS).
58 - COUNT: the number of lines in the dump-flow output that contain the flow field.
62 While in top mode, the default behavior, the following single-character
63 commands are supported:
65 a - toggles top between accumulate and live mode. Accumulate mode is described below.
68 s - toggles which column is used to sort content in decreasing order. A
69 DESC title is placed over the column.
71 <space> - a space indicating to collect dump-flow content again
73 h - halt output. Any character will restart sampling
75 f - cycle through flow fields. The initial field is in_port
81 There are two supported modes: live and accumulate. The default is live.
82 The parameter --accumulate or the 'a' character in top mode enables the
83 latter. In live mode, only the most recent dump-flow content is presented.
84 Accumulate mode, by contrast, keeps track of the prior historical
85 information until a flow is reset, not when the flow is purged. A flow is
86 considered reset when its packet count has decreased from the previous
87 sample (for example, a drop from packets:10 to packets:3). To keep the
88 system from eventually running out of memory, flows that have not been
89 refreshed within the accumulate-decay period are purged. The goal here is
90 to free memory of flows that are no longer active. Statistics are not
91 decremented; their purpose is to reflect the overall history of the flow fields.
96 Parsing errors are counted and displayed in the status line at the beginning
97 of the output. Use the --verbose option with --script to see what output
98 was not parsed, like this:
99 $ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
101 Error messages will identify content that failed to parse.
106 The --host must follow the format user@hostname. This script simply calls
107 'ssh user@hostname' without checking for login credentials; therefore, public
108 keys should be installed on the system identified by hostname, for example:
110 $ ssh-copy-id user@hostname
112 Consult the ssh-copy-id man page for more details.
119 or to run as a script:
120 $ ovs-dpctl dump-flows > dump-flows.log
121 $ ovs-dpctl-top --script --flow-file dump-flows.log
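For example, to read flows from a remote host or to accumulate statistics
over time (the host name and decay value below are illustrative):
$ ovs-dpctl-top --host user@hostname
$ ovs-dpctl-top --accumulate --accumulate-decay 600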
125 # pylint: disable-msg=C0103
126 # pylint: disable-msg=C0302
127 # pylint: disable-msg=R0902
128 # pylint: disable-msg=R0903
129 # pylint: disable-msg=R0904
130 # pylint: disable-msg=R0912
131 # pylint: disable-msg=R0913
132 # pylint: disable-msg=R0914
138 # argparse is not installed on older Python distributions.
139 # OVS ships with a version in the directory mentioned below.
142 sys.path.append(os.path.join("@pkgdatadir@", "python"))
161 # The following two definitions provide the necessary netaddr functionality.
162 # The Python netaddr module is not part of the core installation. Packaging
163 # netaddr would be involved and seems inappropriate given that only two
164 # methods are used.
165 def ipv4_to_network(ip_str):
166 """ Calculate the network given a ipv4/mask value.
167 If a mask is not present simply return ip_str.
171 (ip, mask) = ip_str.split("/")
173 # just an ip address no mask.
176 ip_p = socket.inet_pton(socket.AF_INET, ip)
177 ip_t = struct.unpack(pack_length, ip_p)
178 mask_t = struct.unpack(pack_length, socket.inet_pton(socket.AF_INET, mask))
179 network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]
181 return socket.inet_ntop(socket.AF_INET,
182 struct.pack('!HH', network_n[0], network_n[1]))
185 def ipv6_to_network(ip_str):
186 """ Calculate the network given a ipv6/mask value.
187 If a mask is not present simply return ip_str.
189 pack_length = '!HHHHHHHH'
191 (ip, mask) = ip_str.split("/")
193 # just an ip address no mask.
196 ip_p = socket.inet_pton(socket.AF_INET6, ip)
197 ip_t = struct.unpack(pack_length, ip_p)
198 mask_t = struct.unpack(pack_length,
199 socket.inet_pton(socket.AF_INET6, mask))
200 network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]
202 return socket.inet_ntop(socket.AF_INET6,
203 struct.pack(pack_length,
204 network_n[0], network_n[1],
205 network_n[2], network_n[3],
206 network_n[4], network_n[5],
207 network_n[6], network_n[7]))
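# Illustrative results for the two helpers above (values taken from the
# test cases at the end of this file; a sketch of expected behavior, not an
# exhaustive specification):
#   ipv4_to_network("192.168.0.1")               -> "192.168.0.1"
#   ipv4_to_network("192.168.0.1/255.255.255.0") -> "192.168.0.0"
#   ipv6_to_network("1::192:168:0:1/1::ffff:ffff:0:0") -> "1::192:168:0:0"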
214 """ Holds column specific content.
215 Titles needs to be less than 8 characters.
229 """ Return a associated list. """
230 return [(Columns.FIELDS, repr(obj)),
231 (Columns.PACKETS, obj.packets),
232 (Columns.BYTES, obj.bytes),
233 (Columns.COUNT, obj.count),
234 (Columns.AVERAGE, obj.average),
238 def element_eth_get(field_type, element, stats_dict):
239 """ Extract eth frame src and dst from a dump-flow element."""
240 fmt = "%s(src=%s,dst=%s)"
242 element = fmt % (field_type, element["src"], element["dst"])
243 return SumData(field_type, element, stats_dict["packets"],
244 stats_dict["bytes"], element)
247 def element_ipv4_get(field_type, element, stats_dict):
248 """ Extract src and dst from a dump-flow element."""
249 fmt = "%s(src=%s,dst=%s)"
250 element_show = fmt % (field_type, element["src"], element["dst"])
252 element_key = fmt % (field_type, ipv4_to_network(element["src"]),
253 ipv4_to_network(element["dst"]))
255 return SumData(field_type, element_show, stats_dict["packets"],
256 stats_dict["bytes"], element_key)
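# Illustrative example (the addresses are hypothetical): given an element of
# {"src": "10.24.106.230/255.255.255.0", "dst": "10.24.104.2/255.255.255.0"},
# element_show keeps the masked addresses verbatim while element_key becomes
# "ipv4(src=10.24.106.0,dst=10.24.104.0)", so flows that differ only within
# the masked bits aggregate under a single key.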
259 def element_tunnel_get(field_type, element, stats_dict):
260 """ Extract src and dst from a tunnel."""
261 return element_ipv4_get(field_type, element, stats_dict)
264 def element_ipv6_get(field_type, element, stats_dict):
265 """ Extract src and dst from a dump-flow element."""
267 fmt = "%s(src=%s,dst=%s)"
268 element_show = fmt % (field_type, element["src"], element["dst"])
270 element_key = fmt % (field_type, ipv6_to_network(element["src"]),
271 ipv6_to_network(element["dst"]))
273 return SumData(field_type, element_show, stats_dict["packets"],
274 stats_dict["bytes"], element_key)
277 def element_dst_port_get(field_type, element, stats_dict):
278 """ Extract src and dst from a dump-flow element."""
279 element_key = "%s(dst=%s)" % (field_type, element["dst"])
280 return SumData(field_type, element_key, stats_dict["packets"],
281 stats_dict["bytes"], element_key)
284 def element_passthrough_get(field_type, element, stats_dict):
285 """ Extract src and dst from a dump-flow element."""
286 element_key = "%s(%s)" % (field_type, element)
287 return SumData(field_type, element_key,
288 stats_dict["packets"], stats_dict["bytes"], element_key)
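# Illustrative examples (values borrowed from the test flows below):
# element_dst_port_get("udp", {"src": "61252", "dst": "5355"}, stats) keys
# on "udp(dst=5355)", while element_passthrough_get("in_port", "4", stats)
# keys on "in_port(4)".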
291 # pylint: disable-msg=R0903
293 """ Holds field_type and function to extract element value. """
294 def __init__(self, field_type, generator):
295 self.field_type = field_type
296 self.generator = generator
299 # The order below is important. The initial flow field depends on whether
300 # --script or top mode is used. In top mode (the expected behavior), in_port
301 # flow fields are shown first. A future feature will allow users to
302 # filter output by selecting a row. Filtering by in_port is a natural
303 # starting point.
305 # In script mode, all fields are shown. The expectation is that users could
306 # filter output by piping through grep.
308 # In top mode, the default flow field is in_port. In --script mode,
309 # the default flow field is all.
311 # All is added to the end of the OUTPUT_FORMAT list.
314 OutputFormat("in_port", element_passthrough_get),
315 OutputFormat("eth", element_eth_get),
316 OutputFormat("eth_type", element_passthrough_get),
317 OutputFormat("ipv4", element_ipv4_get),
318 OutputFormat("ipv6", element_ipv6_get),
319 OutputFormat("udp", element_dst_port_get),
320 OutputFormat("tcp", element_dst_port_get),
321 OutputFormat("tunnel", element_tunnel_get),
332 def top_input_get(args):
333 """ Return subprocess stdout."""
336 cmd += ["ssh", args.host]
337 cmd += ["ovs-dpctl", "dump-flows"]
339 return subprocess.Popen(cmd, stderr=subprocess.STDOUT,
340 stdout=subprocess.PIPE).stdout
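# For example (assuming cmd starts out empty and the ssh prefix is only
# added when --host is supplied), "--host user@hostname" results in running
# "ssh user@hostname ovs-dpctl dump-flows", while a local run executes
# "ovs-dpctl dump-flows" directly.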
344 """ read program parameters handle any necessary validation of input. """
346 parser = argparse.ArgumentParser(
347 formatter_class=argparse.RawDescriptionHelpFormatter,
350 # None is a special value indicating to read flows from stdin.
351 # This handles the case
352 # ovs-dpctl dump-flows | ovs-dpctl-flows.py
353 parser.add_argument("-v", "--version", version="@VERSION@",
354 action="version", help="show version")
355 parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
357 help="file containing flows from ovs-dpctl dump-flow")
358 parser.add_argument("-V", "--verbose", dest="verbose",
359 default=logging.CRITICAL,
360 action="store_const", const=logging.DEBUG,
361 help="enable debug level verbosity")
362 parser.add_argument("-s", "--script", dest="top", action="store_false",
363 help="Run from a script (no user interface)")
364 parser.add_argument("--host", dest="host",
365 help="Specify a user@host for retrieving flows see"
366 "Accessing Remote Hosts for more information")
368 parser.add_argument("-a", "--accumulate", dest="accumulate",
369 action="store_true", default=False,
370 help="Accumulate dump-flow content")
371 parser.add_argument("--accumulate-decay", dest="accumulateDecay",
372 default=5.0 * 60, type=float,
373 help="Decay old accumulated flows. "
374 "The default is 5 minutes. "
375 "A value of 0 disables decay.")
376 parser.add_argument("-d", "--delay", dest="delay", type=int,
378 help="Delay in milliseconds to collect dump-flow "
379 "content (sample rate).")
381 args = parser.parse_args()
383 logging.basicConfig(level=args.verbose)
388 # Code to parse a single line in dump-flow
391 FIELDS_CMPND = re.compile("([\w]+)\((.+)\)")
393 FIELDS_CMPND_ELEMENT = re.compile("([\w:]+)=([/\.\w:]+)")
394 FIELDS_ELEMENT = re.compile("([\w]+):([-\.\w]+)")
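# Illustrative matches (examples only, using values from the tests below):
#   FIELDS_CMPND on "eth_type(0x86dd)" captures ("eth_type", "0x86dd")
#   FIELDS_CMPND_ELEMENT on "src=00:50:56:b4:4e:f8" captures
#       ("src", "00:50:56:b4:4e:f8")
#   FIELDS_ELEMENT on "packets:2" captures ("packets", "2")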
397 def flow_line_iter(line):
398 """ iterate over flow dump elements.
399 return tuples of (true, element) or (false, remaining element)
401 # splits by , except for when in a (). Actions element was not
402 # split properly but we don't need it.
415 # ignore white space.
417 elif ((ch == ',') and (paren_count == 0)):
424 raise ValueError(line)
426 if (len(element) > 0):
431 def flow_line_compound_parse(compound):
432 """ Parse compound element
434 src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
436 eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)
439 for element in flow_line_iter(compound):
440 match = FIELDS_CMPND_ELEMENT.search(element)
443 value = match.group(2)
446 match = FIELDS_CMPND.search(element)
449 value = match.group(2)
450 result[key] = flow_line_compound_parse(value)
453 if (len(result.keys()) == 0):
458 def flow_line_split(line):
459 """ Convert a flow dump line into ([fields], [stats], actions) tuple.
460 Where fields and stats are lists.
461 This function relies on a the following ovs-dpctl dump-flow
462 output characteristics:
463 1. The dumpe flow line consists of a list of frame fields, list of stats
465 2. list of frame fields, each stat and action field are delimited by ', '.
466 3. That all other non stat field are not delimited by ', '.
470 results = re.split(', ', line)
472 (field, stats, action) = (results[0], results[1:-1], results[-1])
474 fields = flow_line_iter(field)
475 return (fields, stats, action)
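# For example (see test_flow_line_split below), the fields list holds entries
# such as "in_port(4)" and "udp(src=61252,dst=5355)", the stats list holds
# "packets:2", "bytes:92" and "used:0.703s", and the actions string is
# returned unsplit.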
478 def elements_to_dict(elements):
479 """ Convert line to a hierarchy of dictionaries. """
481 for element in elements:
482 match = FIELDS_CMPND.search(element)
485 value = match.group(2)
486 result[key] = flow_line_compound_parse(value)
489 match = FIELDS_ELEMENT.search(element)
492 value = match.group(2)
495 raise ValueError("can't parse >%s<" % element)
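# For example (see test_flow_parse below), the element
# "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)" becomes
#   result["eth"] == {"src": "00:50:56:b4:4e:f8", "dst": "33:33:00:01:00:03"}
# while a simple stat element such as "packets:1" becomes
#   result["packets"] == "1".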
499 # pylint: disable-msg=R0903
500 class SumData(object):
501 """ Interface that all data going into SumDb must implement.
502 Holds the flow field and its corresponding count, total packets,
503 total bytes and calculates average.
505 __repr__ is used as key into SumData singleton.
506 __str__ is used as human readable output.
509 def __init__(self, field_type, field, packets, flow_bytes, key):
510 # Count is the number of lines in the dump-flow log.
511 self.field_type = field_type
514 self.packets = int(packets)
515 self.bytes = int(flow_bytes)
518 def decrement(self, decr_packets, decr_bytes, decr_count):
519 """ Decrement content to calculate delta from previous flow sample."""
520 self.packets -= decr_packets
521 self.bytes -= decr_bytes
522 self.count -= decr_count
524 def __iadd__(self, other):
525 """ Add two objects. """
527 if (self.key != other.key):
528 raise ValueError("adding two unrelated types")
530 self.count += other.count
531 self.packets += other.packets
532 self.bytes += other.bytes
535 def __isub__(self, other):
536 """ Decrement two objects. """
538 if (self.key != other.key):
539 raise ValueError("subtracting two unrelated types")
541 self.count -= other.count
542 self.packets -= other.packets
543 self.bytes -= other.bytes
546 def __getattr__(self, name):
547 """ Handle average. """
548 if (name == "average"):
549 if (self.packets == 0):
552 return float(self.bytes) / float(self.packets)
553 raise AttributeError(name)
556 """ Used for debugging. """
557 return "%s %s %s %s" % (self.field, self.count,
558 self.packets, self.bytes)
561 """ Used as key in the FlowDB table. """
565 def flow_aggregate(fields_dict, stats_dict):
566 """ Search for content in a line.
567 Passed the flow port of the dump-flows plus the current stats consisting
568 of packets, bytes, etc
572 for output_format in OUTPUT_FORMAT:
573 field = fields_dict.get(output_format.field_type, None)
575 obj = output_format.generator(output_format.field_type,
582 def flows_read(ihdl, flow_db):
583 """ read flow content from ihdl and insert into flow_db. """
587 line = ihdl.readline()
593 flow_db.flow_line_add(line)
594 except ValueError, arg:
600 def get_terminal_size():
602 Return the terminal (height, width) in characters.
604 for fd_io in [0, 1, 2]:
606 result = struct.unpack('hh',
607 fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
613 if (result is None or result == (0, 0)):
614 # Maybe we can't get the width. In that case assume (25, 80)
620 # Content derived from:
621 # http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
623 SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
624 1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']}
627 def approximate_size(size, a_kilobyte_is_1024_bytes=True):
628 """Convert a file size to human-readable form.
631 size -- file size in bytes
632 a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
633 if False, use multiples of 1000
640 raise ValueError('number must be non-negative')
642 if (a_kilobyte_is_1024_bytes):
646 for suffix in SUFFIXES[multiple]:
649 return "%.1f %s" % (size, suffix)
651 raise ValueError('number too large')
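# Illustrative values (see test_human_format below):
#   approximate_size(1024)                             -> "1.0 KiB"
#   approximate_size(1024 * 1024)                      -> "1.0 MiB"
#   approximate_size((1024 * 1024 * 1024) + 100000000) -> "1.1 GiB"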
658 """ Concepts about columns. """
659 def __init__(self, sortable, width):
660 self.sortable = sortable
665 """ How to render rows. """
666 def __init__(self, label, fmt):
671 def fmt_packet(obj, width):
672 """ Provide a string for packets that is appropriate for output."""
673 return str(obj.packets).rjust(width)
676 def fmt_count(obj, width):
677 """ Provide a string for average that is appropriate for output."""
678 return str(obj.count).rjust(width)
681 def fmt_avg(obj, width):
682 """ Provide a string for average that is appropriate for output."""
683 return str(int(obj.average)).rjust(width)
686 def fmt_field(obj, width):
687 """ truncate really long flow and insert ellipses to help make it
693 if (len(obj.field) > width):
694 value = value[:(width - len(ellipses))] + ellipses
695 return value.ljust(width)
698 def fmt_bytes(obj, width):
699 """ Provide a string for average that is appropriate for output."""
700 if (len(str(obj.bytes)) <= width):
701 value = str(obj.bytes)
703 value = approximate_size(obj.bytes)
704 return value.rjust(width)
707 def title_center(value, width):
708 """ Center a column title."""
709 return value.upper().center(width)
712 def title_rjust(value, width):
713 """ Right justify a column title. """
714 return value.upper().rjust(width)
717 def column_picker(order, obj):
718 """ return the column as specified by order. """
728 raise ValueError("order outside of range %s" % order)
732 """ Renders flow data.
734 The two FIELD_SELECT variables should be set to the actual field minus
735 1. During construction, an internal method increments and initializes this value.
738 FLOW_FIELDS = [_field.field_type for _field in OUTPUT_FORMAT] + ["all"]
740 FIELD_SELECT_SCRIPT = 7
741 FIELD_SELECT_TOP = -1
743 def __init__(self, console_width, field_select):
744 """ Calculate column widths taking into account changes in format."""
746 self._start_time = datetime.datetime.now()
748 self._cols = [ColMeta(False, 0),
749 ColMeta(True, Columns.VALUE_WIDTH),
750 ColMeta(True, Columns.VALUE_WIDTH),
751 ColMeta(True, Columns.VALUE_WIDTH),
752 ColMeta(True, Columns.VALUE_WIDTH)]
753 self._console_width = console_width
754 self.console_width_set(console_width)
756 # Order in this array dictate the order of the columns.
757 # The 0 width for the first entry is a placeholder. This is
758 # dynamically calculated. The first column is special. We need a
759 # way to indicate which fields are presented.
760 self._descs = [RowMeta("", title_rjust),
761 RowMeta("", title_rjust),
762 RowMeta("", title_rjust),
763 RowMeta("", title_rjust),
764 RowMeta("", title_rjust)]
765 self._column_sort_select = 0
766 self.column_select_event()
769 RowMeta(Columns.FIELDS, title_center),
770 RowMeta(Columns.COUNT, title_rjust),
771 RowMeta(Columns.PACKETS, title_rjust),
772 RowMeta(Columns.BYTES, title_rjust),
773 RowMeta(Columns.AVERAGE, title_rjust)
777 RowMeta(None, fmt_field),
778 RowMeta(None, fmt_count),
779 RowMeta(None, fmt_packet),
780 RowMeta(None, fmt_bytes),
781 RowMeta(None, fmt_avg)
785 # _field_types holds which fields are displayed in the field
786 # column, with the keyword all implying all fields.
788 self._field_types = Render.FLOW_FIELDS
791 # The default is to show all field types.
793 self._field_type_select = field_select
794 self.field_type_toggle()
796 def _field_type_select_get(self):
797 """ Return which field type to display. """
798 return self._field_types[self._field_type_select]
800 def field_type_toggle(self):
801 """ toggle which field types to show. """
802 self._field_type_select += 1
803 if (self._field_type_select >= len(self._field_types)):
804 self._field_type_select = 0
805 value = Columns.FIELDS + " (%s)" % self._field_type_select_get()
806 self._titles[0].label = value
808 def column_select_event(self):
809 """ Handles column select toggle. """
811 self._descs[self._column_sort_select].label = ""
812 for _ in range(len(self._cols)):
813 self._column_sort_select += 1
814 if (self._column_sort_select >= len(self._cols)):
815 self._column_sort_select = 0
817 # Now look for the next sortable column
818 if (self._cols[self._column_sort_select].sortable):
820 self._descs[self._column_sort_select].label = "DESC"
822 def console_width_set(self, console_width):
823 """ Adjust the output given the new console_width. """
824 self._console_width = console_width
826 spaces = len(self._cols) - 1
828 # Calculating column width can be tedious but important. The
829 # flow field value can be long. The goal here is to dedicate
830 # fixed column space for packets, bytes, average and counts. Give the
831 # remaining space to the flow column. When numbers get large, the
832 # output transitions to the form generated by approximate_size, which
833 # limits output to "###.# XiB", in other words 9 characters.
835 # At this point, we know the maximum length values. We may
836 # truncate the flow column to get everything to fit.
837 self._cols[0].width = 0
838 values_max_length = sum([ii.width for ii in self._cols]) + spaces
839 flow_max_length = console_width - values_max_length
840 self._cols[0].width = flow_max_length
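# Worked example (assuming Columns.VALUE_WIDTH is 9, per the note above):
# with a console width of 80, the four value columns consume 4 * 9 = 36
# characters plus 4 separating spaces, leaving 80 - 40 = 40 characters for
# the flow field column.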
842 def format(self, flow_db):
843 """ shows flows based on --script parameter."""
847 # Top output consists of
849 # Column title (2 rows)
851 # statistics and status
856 rc.append("Flow Summary".center(self._console_width))
858 stats = " Total: %(flow_total)s errors: %(flow_errors)s " % \
859 flow_db.flow_stats_get()
860 accumulate = flow_db.accumulate_get()
862 stats += "Accumulate: on "
864 stats += "Accumulate: off "
866 duration = datetime.datetime.now() - self._start_time
867 stats += "Duration: %s " % str(duration)
868 rc.append(stats.ljust(self._console_width))
871 # 2 rows for columns.
873 # Indicate which column is in descending order.
874 rc.append(" ".join([ii.fmt(ii.label, col.width)
875 for (ii, col) in zip(self._descs, self._cols)]))
877 rc.append(" ".join([ii.fmt(ii.label, col.width)
878 for (ii, col) in zip(self._titles, self._cols)]))
883 for dd in flow_db.field_values_in_order(self._field_type_select_get(),
884 self._column_sort_select):
885 rc.append(" ".join([ii.fmt(dd, col.width)
886 for (ii, col) in zip(self._datas,
892 def curses_screen_begin():
893 """ begin curses screen control. """
894 stdscr = curses.initscr()
901 def curses_screen_end(stdscr):
902 """ end curses screen control. """
910 """ Implements live vs accumulate mode.
912 Flows are stored as key value pairs. The key consists of the content
913 prior to stat fields. The value portion consists of stats in a dictionary
916 @ \todo future add filtering here.
918 def __init__(self, accumulate):
919 self._accumulate = accumulate
920 self._error_count = 0
921 # Values are (stats, last update time.)
922 # The last update time is used for aging.
923 self._flow_lock = threading.Lock()
924 # This dictionary holds individual flows.
926 # This dictionary holds aggregate of flow fields.
929 def accumulate_get(self):
930 """ Return the current accumulate state. """
931 return self._accumulate
933 def accumulate_toggle(self):
934 """ toggle accumulate flow behavior. """
935 self._accumulate = not self._accumulate
938 """ Indicate the beginning of processing flow content.
939 if accumulate is false clear current set of flows. """
941 if (not self._accumulate):
942 self._flow_lock.acquire()
946 self._flow_lock.release()
949 def flow_line_add(self, line):
950 """ Split a line from a ovs-dpctl dump-flow into key and stats.
951 The order of the content in the flow should be:
956 This method also assumes that the dump flow output does not
957 change order of fields of the same flow.
960 line = line.rstrip("\n")
961 (fields, stats, _) = flow_line_split(line)
964 fields_dict = elements_to_dict(fields)
966 if (len(fields_dict) == 0):
967 raise ValueError("flow fields are missing %s" % line)
969 stats_dict = elements_to_dict(stats)
970 if (len(stats_dict) == 0):
971 raise ValueError("statistics are missing %s." % line)
974 # In accumulate mode, the Flow database can reach 10,000's of
975 # persistent flows. The interaction of the script with this many
976 # flows is too slow. Instead, deltas are sent to the flow_db
977 # database, allowing incremental changes to be made in O(m) time,
978 # where m is the size of the current flow list, instead of iterating over
979 # all flows in O(n) time, where n is the size of the entire history of flows.
980 key = ",".join(fields)
982 self._flow_lock.acquire()
984 (stats_old_dict, _) = self._flows.get(key, (None, None))
986 self._flow_lock.release()
988 self.flow_event(fields_dict, stats_old_dict, stats_dict)
990 except ValueError, arg:
992 self._error_count += 1
995 self._flow_lock.acquire()
997 self._flows[key] = (stats_dict, datetime.datetime.now())
999 self._flow_lock.release()
1001 def decay(self, decayTimeInSeconds):
1002 """ Decay content. """
1003 now = datetime.datetime.now()
1004 for (key, value) in self._flows.items():
1005 (stats_dict, updateTime) = value
1006 delta = now - updateTime
1008 if (delta.seconds > decayTimeInSeconds):
1009 self._flow_lock.acquire()
1011 del self._flows[key]
1013 fields_dict = elements_to_dict(flow_line_iter(key))
1014 matches = flow_aggregate(fields_dict, stats_dict)
1015 for match in matches:
1016 self.field_dec(match)
1019 self._flow_lock.release()
1021 def flow_stats_get(self):
1022 """ Return statistics in a form of a dictionary. """
1024 self._flow_lock.acquire()
1026 rc = {"flow_total": len(self._flows),
1027 "flow_errors": self._error_count}
1029 self._flow_lock.release()
1032 def field_types_get(self):
1033 """ Return the set of types stored in the singleton. """
1034 types = set((ii.field_type for ii in self._fields.values()))
1037 def field_add(self, data):
1038 """ Collect dump-flow data to sum number of times item appears. """
1039 current = self._fields.get(repr(data), None)
1040 if (current is None):
1041 current = copy.copy(data)
1044 self._fields[repr(current)] = current
1046 def field_dec(self, data):
1047 """ Collect dump-flow data to sum number of times item appears. """
1048 current = self._fields.get(repr(data), None)
1049 if (current is None):
1050 raise ValueError("decrementing field missing %s" % repr(data))
1053 self._fields[repr(current)] = current
1054 if (current.count == 0):
1055 del self._fields[repr(current)]
1057 def field_values_in_order(self, field_type_select, column_order):
1058 """ Return a list of items in order maximum first. """
1059 values = self._fields.values()
1060 if (field_type_select != "all"):
1061 # If a field type other than "all" then reduce the list.
1062 values = [ii for ii in values
1063 if (ii.field_type == field_type_select)]
1064 values = [(column_picker(column_order, ii), ii) for ii in values]
1065 values.sort(key=operator.itemgetter(0))
1067 values = [ii[1] for ii in values]
1070 def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
1071 """ Receives new flow information. """
1073 # In order to avoid processing every flow at every sample
1074 # period, changes in the flow packet count are used to determine the
1075 # delta in the flow statistics. This delta is used in the call
1076 # to self.decrement prior to self.field_add.
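# Worked example (the numbers are illustrative): if the previous sample
# of a flow reported packets:1, bytes:120 and the new sample reports
# packets:2, bytes:240, each aggregated SumData is built from the new
# totals and then decremented by the old totals (and a count of 1), so
# field_add contributes only the delta of 1 packet and 120 bytes without
# double counting the flow line.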
1078 if (stats_old_dict is None):
1079 # This is a new flow
1080 matches = flow_aggregate(fields_dict, stats_new_dict)
1081 for match in matches:
1082 self.field_add(match)
1084 old_packets = int(stats_old_dict.get("packets", 0))
1085 new_packets = int(stats_new_dict.get("packets", 0))
1086 if (old_packets == new_packets):
1087 # ignore. same data.
1090 old_bytes = stats_old_dict.get("bytes", 0)
1091 # old_packets != new_packets
1092 # if old_packets > new_packets then we end up decrementing
1093 # packets and bytes.
1094 matches = flow_aggregate(fields_dict, stats_new_dict)
1095 for match in matches:
1096 match.decrement(int(old_packets), int(old_bytes), 1)
1097 self.field_add(match)
1100 class DecayThread(threading.Thread):
1101 """ Periodically call flow database to see if any flows are old. """
1102 def __init__(self, flow_db, interval):
1103 """ Start decay thread. """
1104 threading.Thread.__init__(self)
1106 self._interval = max(1, interval)
1107 self._min_interval = min(1, interval / 10)
1108 self._flow_db = flow_db
1109 self._event = threading.Event()
1110 self._running = True
1115 """ Worker thread which handles decaying accumulated flows. """
1117 while(self._running):
1118 self._event.wait(self._min_interval)
1120 self._flow_db.decay(self._interval)
1123 """ Stop thread. """
1124 self._running = False
1127 # Give the calling thread time to terminate but not too long.
1128 # this thread is a daemon so the application will terminate if
1129 # we timeout during the join. This is just a cleaner way to
1130 # release resources.
1134 def flow_top_command(stdscr, render, flow_db):
1135 """ Handle input while in top mode. """
1138 # Any character will restart sampling.
1139 if (ch == ord('h')):
1145 if (ch == ord('s')):
1146 # toggle which column sorts data in descending order.
1147 render.column_select_event()
1148 elif (ch == ord('a')):
1149 flow_db.accumulate_toggle()
1150 elif (ch == ord('f')):
1151 render.field_type_toggle()
1152 elif (ch == ord(' ')):
1159 def decay_timer_start(flow_db, accumulateDecay):
1160 """ If accumulateDecay greater than zero then start timer. """
1161 if (accumulateDecay > 0):
1162 decay_timer = DecayThread(flow_db, accumulateDecay)
1169 def flows_top(args):
1170 """ handles top like behavior when --script is not specified. """
1172 flow_db = FlowDB(args.accumulate)
1173 render = Render(0, Render.FIELD_SELECT_TOP)
1175 decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
1179 stdscr = curses_screen_begin()
1183 stdscr.timeout(args.delay)
1185 while (ch != ord('q')):
1189 ihdl = top_input_get(args)
1191 flows_read(ihdl, flow_db)
1194 except OSError, arg:
1195 logging.critical(arg)
1198 (console_height, console_width) = stdscr.getmaxyx()
1199 render.console_width_set(console_width)
1201 output_height = console_height - 1
1202 line_count = range(output_height)
1203 line_output = render.format(flow_db)
1204 lines = zip(line_count, line_output[:output_height])
1207 for (count, line) in lines:
1208 stdscr.addstr(count, 0, line[:console_width])
1211 ch = flow_top_command(stdscr, render, flow_db)
1214 curses_screen_end(stdscr)
1215 except KeyboardInterrupt:
1221 for (count, line) in lines:
1225 def flows_script(args):
1226 """ handles --script option. """
1228 flow_db = FlowDB(args.accumulate)
1231 if (args.flowFiles is None):
1232 logging.info("reading flows from stdin")
1233 ihdl = os.fdopen(sys.stdin.fileno(), 'r', 0)
1235 flow_db = flows_read(ihdl, flow_db)
1239 for flowFile in args.flowFiles:
1240 logging.info("reading flows from %s", flowFile)
1241 ihdl = open(flowFile, "r")
1243 flow_db = flows_read(ihdl, flow_db)
1247 (_, console_width) = get_terminal_size()
1248 render = Render(console_width, Render.FIELD_SELECT_SCRIPT)
1250 for line in render.format(flow_db):
1255 """ Return 0 on success or 1 on failure.
1258 There are four stages to the process ovs-dpctl dump-flow content.
1259 1. Retrieve current input
1260 2. store in FlowDB and maintain history
1261 3. Iterate over FlowDB and aggregating stats for each flow field
1264 Retrieving current input is currently trivial, the ovs-dpctl dump-flow
1265 is called. Future version will have more elaborate means for collecting
1266 dump-flow content. FlowDB returns all data as in the form of a hierarchical
1267 dictionary. Input will vary.
1269 In the case of accumulate mode, flows are not purged from the FlowDB
1270 manager. Instead at the very least, merely the latest statistics are
1271 kept. In the case, of live output the FlowDB is purged prior to sampling
1274 Aggregating results requires identify flow fields to aggregate out
1275 of the flow and summing stats.
1285 except KeyboardInterrupt:
1289 if __name__ == '__main__':
1291 elif __name__ == 'ovs-dpctl-top':
1292 # pylint: disable-msg=R0915
1295 # Test case beyond this point.
1296 # pylint: disable-msg=R0904
1297 class TestsuiteFlowParse(unittest.TestCase):
1299 parse flow into hierarchy of dictionaries.
1301 def test_flow_parse(self):
1302 """ test_flow_parse. """
1303 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1304 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1305 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1306 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1307 "udp(src=61252,dst=5355), packets:1, bytes:92, "\
1308 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1309 "38,41,44,47,50,53,56,59,62,65"
1311 (fields, stats, _) = flow_line_split(line)
1312 flow_dict = elements_to_dict(fields + stats)
1313 self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8")
1314 self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03")
1315 self.assertEqual(flow_dict["ipv6"]["src"],
1316 "fe80::55bf:fe42:bc96:2812")
1317 self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3")
1318 self.assertEqual(flow_dict["packets"], "1")
1319 self.assertEqual(flow_dict["bytes"], "92")
1321 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1322 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1323 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1324 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1325 "udp(src=61252,dst=5355), packets:1, bytes:92, "\
1326 "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1327 "38,41,44,47,50,53,56,59,62,65"
1329 (fields, stats, _) = flow_line_split(line)
1330 flow_dict = elements_to_dict(fields + stats)
1331 self.assertEqual(flow_dict["used"], "-0.703s")
1332 self.assertEqual(flow_dict["packets"], "1")
1333 self.assertEqual(flow_dict["bytes"], "92")
1335 def test_flow_sum(self):
1336 """ test_flow_sum. """
1337 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1338 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1339 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1340 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1341 "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1342 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1343 "38,41,44,47,50,53,56,59,62,65"
1345 (fields, stats, _) = flow_line_split(line)
1346 stats_dict = elements_to_dict(stats)
1347 fields_dict = elements_to_dict(fields)
1349 # Test simple case of one line.
1350 flow_db = FlowDB(False)
1351 matches = flow_aggregate(fields_dict, stats_dict)
1352 for match in matches:
1353 flow_db.field_add(match)
1355 flow_types = flow_db.field_types_get()
1356 expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"]
1357 self.assert_(len(flow_types) == len(expected_flow_types))
1358 for flow_type in flow_types:
1359 self.assertTrue(flow_type in expected_flow_types)
1361 for flow_type in flow_types:
1362 sum_value = flow_db.field_values_in_order("all", 1)
1363 self.assert_(len(sum_value) == 5)
1364 self.assert_(sum_value[0].packets == 2)
1365 self.assert_(sum_value[0].count == 1)
1366 self.assert_(sum_value[0].bytes == 92)
1369 # Add line again just to see counts go up.
1370 matches = flow_aggregate(fields_dict, stats_dict)
1371 for match in matches:
1372 flow_db.field_add(match)
1374 flow_types = flow_db.field_types_get()
1375 self.assert_(len(flow_types) == len(expected_flow_types))
1376 for flow_type in flow_types:
1377 self.assertTrue(flow_type in expected_flow_types)
1379 for flow_type in flow_types:
1380 sum_value = flow_db.field_values_in_order("all", 1)
1381 self.assert_(len(sum_value) == 5)
1382 self.assert_(sum_value[0].packets == 4)
1383 self.assert_(sum_value[0].count == 2)
1384 self.assert_(sum_value[0].bytes == 2 * 92)
1386 def test_assoc_list(self):
1387 """ test_assoc_list. """
1388 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1389 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1390 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1391 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1392 "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1393 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1394 "38,41,44,47,50,53,56,59,62,65"
1400 'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
1401 'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
1404 (fields, stats, _) = flow_line_split(line)
1405 stats_dict = elements_to_dict(stats)
1406 fields_dict = elements_to_dict(fields)
1409 # Test simple case of one line.
1410 flow_db = FlowDB(False)
1411 matches = flow_aggregate(fields_dict, stats_dict)
1412 for match in matches:
1413 flow_db.field_add(match)
1415 for sum_value in flow_db.field_values_in_order("all", 1):
1416 assoc_list = Columns.assoc_list(sum_value)
1417 for item in assoc_list:
1418 if (item[0] == "fields"):
1419 self.assertTrue(item[1] in valid_flows)
1420 elif (item[0] == "packets"):
1421 self.assertTrue(item[1] == 2)
1422 elif (item[0] == "count"):
1423 self.assertTrue(item[1] == 1)
1424 elif (item[0] == "average"):
1425 self.assertTrue(item[1] == 46.0)
1426 elif (item[0] == "bytes"):
1427 self.assertTrue(item[1] == 92)
1429 raise ValueError("unknown %s" % item[0])
1431 def test_human_format(self):
1432 """ test_assoc_list. """
1434 self.assertEqual(approximate_size(0.0), "0.0 KiB")
1435 self.assertEqual(approximate_size(1024), "1.0 KiB")
1436 self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB")
1437 self.assertEqual(approximate_size((1024 * 1024) + 100000),
1439 value = (1024 * 1024 * 1024) + 100000000
1440 self.assertEqual(approximate_size(value), "1.1 GiB")
1442 def test_flow_line_split(self):
1443 """ Splitting a flow line is not trivial.
1444 There is no clear delimiter. Comma is used liberally."""
1445 expected_fields = ["in_port(4)",
1446 "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
1448 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
1449 "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
1450 "udp(src=61252,dst=5355)"]
1451 expected_stats = ["packets:2", "bytes:92", "used:0.703s"]
1452 expected_actions = "actions:3,8,11,14,17,20,23,26,29,32,35," \
1453 "38,41,44,47,50,53,56,59,62,65"
1455 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1456 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1457 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1458 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1459 "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1460 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1461 "38,41,44,47,50,53,56,59,62,65"
1463 (fields, stats, actions) = flow_line_split(line)
1465 self.assertEqual(fields, expected_fields)
1466 self.assertEqual(stats, expected_stats)
1467 self.assertEqual(actions, expected_actions)
1469 def test_accumulate_decay(self):
1470 """ test_accumulate_decay: test accumulated decay. """
1471 lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
1472 "dst=ff:ff:ff:ff:ff:ff),"
1473 "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1474 "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1475 "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1476 "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
1477 "packets:1, bytes:120, used:0.004s, actions:1"]
1479 flow_db = FlowDB(True)
1481 flow_db.flow_line_add(lines[0])
1483 # Make sure we decay
1485 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1487 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
1489 flow_db.flow_line_add(lines[0])
1490 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1492 # Should not be deleted.
1493 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1495 flow_db.flow_line_add(lines[0])
1496 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1497 timer = decay_timer_start(flow_db, 2)
1499 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
1502 def test_accumulate(self):
1503 """ test_accumulate test that FlowDB supports accumulate. """
1505 lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
1506 "dst=ff:ff:ff:ff:ff:ff),"
1507 "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1508 "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1509 "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1510 "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
1511 "packets:1, bytes:120, used:0.004s, actions:1",
1513 "eth(src=68:ef:bd:25:ef:c0,dst=33:33:00:00:00:66),"
1514 "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
1515 "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
1516 "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029), "
1517 "packets:2, bytes:5026, used:0.348s, actions:1",
1518 "in_port(1),eth(src=ee:ee:ee:ee:ee:ee,"
1519 "dst=ff:ff:ff:ff:ff:ff),"
1520 "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1521 "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1522 "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1523 "tha=00:00:00:00:00:00/00:00:00:00:00:00), packets:2, "
1524 "bytes:240, used:0.004s, actions:1"]
1527 "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
1528 "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
1529 "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
1530 "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
1531 "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
1532 "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
1535 # Turn on accumulate.
1536 flow_db = FlowDB(True)
1539 flow_db.flow_line_add(lines[0])
1541 # Test one flow exist.
1542 sum_values = flow_db.field_values_in_order("all", 1)
1543 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1544 self.assertEqual(len(in_ports), 1)
1545 self.assertEqual(in_ports[0].packets, 1)
1546 self.assertEqual(in_ports[0].bytes, 120)
1547 self.assertEqual(in_ports[0].count, 1)
1549 # simulate another sample
1550 # Test two different flows exist.
1552 flow_db.flow_line_add(lines[1])
1553 sum_values = flow_db.field_values_in_order("all", 1)
1554 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1555 self.assertEqual(len(in_ports), 1)
1556 self.assertEqual(in_ports[0].packets, 1)
1557 self.assertEqual(in_ports[0].bytes, 120)
1558 self.assertEqual(in_ports[0].count, 1)
1560 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1561 self.assertEqual(len(in_ports), 1)
1562 self.assertEqual(in_ports[0].packets, 2)
1563 self.assertEqual(in_ports[0].bytes, 126)
1564 self.assertEqual(in_ports[0].count, 1)
1566 # Test first flow increments packets.
1568 flow_db.flow_line_add(lines[2])
1569 sum_values = flow_db.field_values_in_order("all", 1)
1570 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1571 self.assertEqual(len(in_ports), 1)
1572 self.assertEqual(in_ports[0].packets, 2)
1573 self.assertEqual(in_ports[0].bytes, 240)
1574 self.assertEqual(in_ports[0].count, 1)
1576 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1577 self.assertEqual(len(in_ports), 1)
1578 self.assertEqual(in_ports[0].packets, 2)
1579 self.assertEqual(in_ports[0].bytes, 126)
1580 self.assertEqual(in_ports[0].count, 1)
1582 # Test third flow but with the same in_port(1) as the first flow.
1584 flow_db.flow_line_add(lines[3])
1585 sum_values = flow_db.field_values_in_order("all", 1)
1586 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1587 self.assertEqual(len(in_ports), 1)
1588 self.assertEqual(in_ports[0].packets, 3)
1589 self.assertEqual(in_ports[0].bytes, 360)
1590 self.assertEqual(in_ports[0].count, 2)
1592 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1593 self.assertEqual(len(in_ports), 1)
1594 self.assertEqual(in_ports[0].packets, 2)
1595 self.assertEqual(in_ports[0].bytes, 126)
1596 self.assertEqual(in_ports[0].count, 1)
1598 # Third flow has changes.
1600 flow_db.flow_line_add(lines[4])
1601 sum_values = flow_db.field_values_in_order("all", 1)
1602 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1603 self.assertEqual(len(in_ports), 1)
1604 self.assertEqual(in_ports[0].packets, 4)
1605 self.assertEqual(in_ports[0].bytes, 480)
1606 self.assertEqual(in_ports[0].count, 2)
1608 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1609 self.assertEqual(len(in_ports), 1)
1610 self.assertEqual(in_ports[0].packets, 2)
1611 self.assertEqual(in_ports[0].bytes, 126)
1612 self.assertEqual(in_ports[0].count, 1)
1616 flow_db.flow_line_add(lines[5])
1617 sum_values = flow_db.field_values_in_order("all", 1)
1618 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1619 self.assertEqual(len(in_ports), 1)
1620 self.assertEqual(in_ports[0].packets, 3)
1621 self.assertEqual(in_ports[0].bytes, 360)
1622 self.assertEqual(in_ports[0].count, 2)
1624 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1625 self.assertEqual(len(in_ports), 1)
1626 self.assertEqual(in_ports[0].packets, 2)
1627 self.assertEqual(in_ports[0].bytes, 126)
1628 self.assertEqual(in_ports[0].count, 1)
1630 def test_parse_character_errors(self):
1631 """ test_parsing errors.
1632 The flow parses is purposely loose. Its not designed to validate
1633 input. Merely pull out what it can but there are situations
1634 that a parse error can be detected.
1637 lines = ["complete garbage",
1638 "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
1639 "dst=33:33:00:00:00:66),"
1640 "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
1641 "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
1642 "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
1643 "packets:2,bytes:5026,actions:1"]
1645 flow_db = FlowDB(False)
1649 flow_db.flow_line_add(line)
1651 # We want an exception. That is how we know we have
1652 # correctly found a simple parsing error. We are not
1653 # looking to validate flow output, just to catch simple issues.
1655 self.assertTrue(False)
1657 def test_tunnel_parsing(self):
1658 """ test_tunnel_parsing test parse flows with tunnel. """
1660 "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
1661 "tos=0x0,ttl=64,flags(key)),in_port(1),"
1662 "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
1663 "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
1664 "actions:userspace(pid=4294962691,slow_path(cfm))"
1666 flow_db = FlowDB(False)
1668 flow_db.flow_line_add(lines[0])
1669 sum_values = flow_db.field_values_in_order("all", 1)
1670 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1671 self.assertEqual(len(in_ports), 1)
1672 self.assertEqual(in_ports[0].packets, 6)
1673 self.assertEqual(in_ports[0].bytes, 534)
1674 self.assertEqual(in_ports[0].count, 1)
1676 def test_flow_multiple_paren(self):
1677 """ test_flow_multiple_paren. """
1678 line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
1679 valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
1681 rc = flow_line_iter(line)
1682 self.assertEqual(valid, rc)
1684 def test_to_network(self):
1685 """ test_to_network test ipv4_to_network and ipv6_to_network. """
1687 ("192.168.0.1", "192.168.0.1"),
1688 ("192.168.0.1/255.255.255.255", "192.168.0.1"),
1689 ("192.168.0.1/255.255.255.0", "192.168.0.0"),
1690 ("192.168.0.1/255.255.0.0", "192.168.0.0"),
1691 ("192.168.0.1/255.0.0.0", "192.0.0.0"),
1692 ("192.168.0.1/0.0.0.0", "0.0.0.0"),
1693 ("10.24.106.230/255.255.255.255", "10.24.106.230"),
1694 ("10.24.106.230/255.255.255.0", "10.24.106.0"),
1695 ("10.24.106.0/255.255.255.0", "10.24.106.0"),
1696 ("10.24.106.0/255.255.252.0", "10.24.104.0")
1700 ("1::192:168:0:1", "1::192:168:0:1"),
1701 ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
1702 ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
1703 ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
1704 ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
1705 ("1::192:168:0:1/1::0:0:0:0", "1::"),
1706 ("1::192:168:0:1/::", "::")
1709 for (ipv4_test, ipv4_check) in ipv4s:
1710 self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check)
1712 for (ipv6_test, ipv6_check) in ipv6s:
1713 self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check)
1716 """ test_ui: test expected ui behavior. """
1717 #pylint: disable=W0212
1718 top_render = Render(80, Render.FIELD_SELECT_TOP)
1719 script_render = Render(80, Render.FIELD_SELECT_SCRIPT)
1720 self.assertEqual(top_render._field_type_select_get(), "in_port")
1721 self.assertEqual(script_render._field_type_select_get(), "all")
1722 #pylint: enable=W0212