3 # Copyright (c) 2013 Nicira, Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at:
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 # The approximate_size code was copied from
19 # http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
20 # which is licensed under # "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
21 # used under a Creative Commons Attribution-Share-Alike license:
22 # http://creativecommons.org/licenses/by-sa/3.0/
26 """Top like behavior for ovs-dpctl dump-flows output.
28 This program summarizes ovs-dpctl flow content by aggregating the number
29 of packets, total bytes and occurrence of the following fields:
35 - Source and destination MAC addresses
39 - Source and destination IPv4 addresses
41 - Source and destination IPv6 addresses
43 - UDP and TCP destination port
45 - Tunnel source and destination addresses
48 Output shows four values:
49 - FIELDS: the flow fields for example in_port(1).
51 - PACKETS: the total number of packets containing the flow field.
53 - BYTES: the total number of bytes containing the flow field. If units are
54 not present then values are in bytes.
56 - AVERAGE: the average packet size (BYTES/PACKET).
58 - COUNT: the number of lines in the dump-flow output that contain the flow field.
62 While in top mode, the default behavior, the following single character
63 commands are supported:
65 a - toggles top in accumulate and live mode. Accumulate mode is described
68 s - toggles which column is used to sort content in decreasing order. A
69 DESC title is placed over the column.
71 _ - a space indicating to collect dump-flow content again
73 h - halt output. Any character will restart sampling
75 f - cycle through flow fields
81 There are two supported modes: live and accumulate. The default is live.
82 The parameter --accumulate or the 'a' character in top mode enables the
83 latter. In live mode, recent dump-flow content is presented.
84 Whereas accumulate mode keeps track of the prior historical
85 information until the flow is reset not when the flow is purged. Reset
86 flows are determined when the packet count for a flow has decreased from
87 its previous sample. There is one caveat, eventually the system will
88 run out of memory if, after the accumulate-decay period any flows that
89 have not been refreshed are purged. The goal here is to free memory
90 of flows that are not active. Statistics are not decremented. Their purpose
91 is to reflect the overall history of the flow fields.
96 Parsing errors are counted and displayed in the status line at the beginning
97 of the output. Use the --verbose option with --script to see what output
98 was not parsed, like this:
99 $ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
101 Error messages will identify content that failed to parse.
106 The --host must follow the format user@hostname. This script simply calls
107 'ssh user@hostname' without checking for login credentials therefore public
108 keys should be installed on the system identified by hostname, such as:
110 $ ssh-copy-id user@hostname
112 Consult ssh-copy-id man pages for more details.
119 or to run as a script:
120 $ ovs-dpctl dump-flows > dump-flows.log
121 $ ovs-dpctl-top --script --flow-file dump-flows.log
125 # pylint: disable-msg=C0103
126 # pylint: disable-msg=C0302
127 # pylint: disable-msg=R0902
128 # pylint: disable-msg=R0903
129 # pylint: disable-msg=R0904
130 # pylint: disable-msg=R0912
131 # pylint: disable-msg=R0913
132 # pylint: disable-msg=R0914
138 # Arg parse is not installed on older Python distributions.
139 # ovs ships with a version in the directory mentioned below.
142 sys.path.append(os.path.join("@pkgdatadir@", "python"))
161 # The following two definitions provide the necessary netaddr functionality.
162 # Python netaddr module is not part of the core installation. Packaging
163 # netaddr was involved and seems inappropriate given that only two
164 # methods where used.
def ipv4_to_network(ip_str):
    """ Return the network address for an "ipv4/mask" string.

    ip_str -- "a.b.c.d" or "a.b.c.d/m.m.m.m". If no mask is present the
    address is returned unchanged.
    """
    # IPv4 is unpacked as two unsigned 16-bit words so the mask can be
    # applied word by word.
    pack_length = '!HH'
    try:
        (ip, mask) = ip_str.split("/")
    except ValueError:
        # Just an ip address, no mask.
        return ip_str

    ip_p = socket.inet_pton(socket.AF_INET, ip)
    ip_t = struct.unpack(pack_length, ip_p)
    mask_t = struct.unpack(pack_length, socket.inet_pton(socket.AF_INET, mask))
    # Bitwise AND each word of the address with the mask.
    network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]

    return socket.inet_ntop(socket.AF_INET,
                            struct.pack(pack_length, network_n[0],
                                        network_n[1]))
def ipv6_to_network(ip_str):
    """ Return the network address for an "ipv6/mask" string.

    ip_str -- "addr" or "addr/mask" in IPv6 notation. If no mask is
    present the address is returned unchanged.
    """
    # IPv6 is unpacked as eight unsigned 16-bit words so the mask can be
    # applied word by word.
    pack_length = '!HHHHHHHH'
    try:
        (ip, mask) = ip_str.split("/")
    except ValueError:
        # Just an ip address, no mask.
        return ip_str

    ip_p = socket.inet_pton(socket.AF_INET6, ip)
    ip_t = struct.unpack(pack_length, ip_p)
    mask_t = struct.unpack(pack_length,
                           socket.inet_pton(socket.AF_INET6, mask))
    # Bitwise AND each word of the address with the mask.
    network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]

    return socket.inet_ntop(socket.AF_INET6,
                            struct.pack(pack_length,
                                        network_n[0], network_n[1],
                                        network_n[2], network_n[3],
                                        network_n[4], network_n[5],
                                        network_n[6], network_n[7]))
214 """ Holds column specific content.
215 Titles needs to be less than 8 characters.
229 """ Return a associated list. """
230 return [(Columns.FIELDS, repr(obj)),
231 (Columns.PACKETS, obj.packets),
232 (Columns.BYTES, obj.bytes),
233 (Columns.COUNT, obj.count),
234 (Columns.AVERAGE, obj.average),
def element_eth_get(field_type, element, stats_dict):
    """ Build a SumData from the src/dst of an eth dump-flow element."""
    rendered = "%s(src=%s,dst=%s)" % (field_type, element["src"],
                                      element["dst"])
    # The rendered string serves as both display value and aggregation key.
    return SumData(field_type, rendered, stats_dict["packets"],
                   stats_dict["bytes"], rendered)
def element_ipv4_get(field_type, element, stats_dict):
    """ Build a SumData from the src/dst of an ipv4 dump-flow element."""
    fmt = "%s(src=%s,dst=%s)"
    # Show the addresses exactly as dumped ...
    shown = fmt % (field_type, element["src"], element["dst"])
    # ... but key on the network so masked flows aggregate together.
    keyed = fmt % (field_type, ipv4_to_network(element["src"]),
                   ipv4_to_network(element["dst"]))
    return SumData(field_type, shown, stats_dict["packets"],
                   stats_dict["bytes"], keyed)
def element_tunnel_get(field_type, element, stats_dict):
    """ Tunnel endpoints are ipv4 addresses; delegate to the ipv4 handler."""
    return element_ipv4_get(field_type, element, stats_dict)
def element_ipv6_get(field_type, element, stats_dict):
    """ Build a SumData from the src/dst of an ipv6 dump-flow element."""
    fmt = "%s(src=%s,dst=%s)"
    # Show the addresses exactly as dumped ...
    shown = fmt % (field_type, element["src"], element["dst"])
    # ... but key on the network so masked flows aggregate together.
    keyed = fmt % (field_type, ipv6_to_network(element["src"]),
                   ipv6_to_network(element["dst"]))
    return SumData(field_type, shown, stats_dict["packets"],
                   stats_dict["bytes"], keyed)
def element_dst_port_get(field_type, element, stats_dict):
    """ Build a SumData keyed on the destination port (udp/tcp)."""
    keyed = "%s(dst=%s)" % (field_type, element["dst"])
    return SumData(field_type, keyed, stats_dict["packets"],
                   stats_dict["bytes"], keyed)
def element_passthrough_get(field_type, element, stats_dict):
    """ Build a SumData from an element used verbatim (no src/dst parts)."""
    keyed = "%s(%s)" % (field_type, element)
    return SumData(field_type, keyed, stats_dict["packets"],
                   stats_dict["bytes"], keyed)
# pylint: disable-msg=R0903
class OutputFormat(object):
    """ Pairs a flow field type with the function extracting its value. """
    def __init__(self, field_type, generator):
        # field_type -- flow field name, e.g. "eth" or "in_port".
        # generator -- callable(field_type, element, stats_dict) -> SumData.
        self.field_type = field_type
        self.generator = generator
# Table of supported flow field types and their extraction functions.
OUTPUT_FORMAT = [
    OutputFormat("eth", element_eth_get),
    OutputFormat("ipv4", element_ipv4_get),
    OutputFormat("ipv6", element_ipv6_get),
    OutputFormat("tunnel", element_tunnel_get),
    OutputFormat("udp", element_dst_port_get),
    OutputFormat("tcp", element_dst_port_get),
    OutputFormat("eth_type", element_passthrough_get),
    OutputFormat("in_port", element_passthrough_get)
]
def top_input_get(args):
    """ Return a pipe reading 'ovs-dpctl dump-flows' output.

    When args.host is set the command is run remotely via ssh
    (list-form argv; no shell involved).
    """
    cmd = []
    if (args.host):
        cmd += ["ssh", args.host]
    cmd += ["ovs-dpctl", "dump-flows"]

    return subprocess.Popen(cmd, stderr=subprocess.STDOUT,
                            stdout=subprocess.PIPE).stdout
328 """ read program parameters handle any necessary validation of input. """
330 parser = argparse.ArgumentParser(
331 formatter_class=argparse.RawDescriptionHelpFormatter,
334 # None is a special value indicating to read flows from stdin.
335 # This handles the case
336 # ovs-dpctl dump-flows | ovs-dpctl-flows.py
337 parser.add_argument("-v", "--version", version="@VERSION@",
338 action="version", help="show version")
339 parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
341 help="file containing flows from ovs-dpctl dump-flow")
342 parser.add_argument("-V", "--verbose", dest="verbose",
343 default=logging.CRITICAL,
344 action="store_const", const=logging.DEBUG,
345 help="enable debug level verbosity")
346 parser.add_argument("-s", "--script", dest="top", action="store_false",
347 help="Run from a script (no user interface)")
348 parser.add_argument("--host", dest="host",
349 help="Specify a user@host for retrieving flows see"
350 "Accessing Remote Hosts for more information")
352 parser.add_argument("-a", "--accumulate", dest="accumulate",
353 action="store_true", default=False,
354 help="Accumulate dump-flow content")
355 parser.add_argument("--accumulate-decay", dest="accumulateDecay",
356 default=5.0 * 60, type=float,
357 help="Decay old accumulated flows. "
358 "The default is 5 minutes. "
359 "A value of 0 disables decay.")
360 parser.add_argument("-d", "--delay", dest="delay", type=int,
362 help="Delay in milliseconds to collect dump-flow "
363 "content (sample rate).")
365 args = parser.parse_args()
367 logging.basicConfig(level=args.verbose)
# Code to parse a single line in dump-flow.

# Raw strings avoid invalid escape-sequence warnings for \w, \. etc.
# compound element, e.g. eth(src=...,dst=...)
FIELDS_CMPND = re.compile(r"([\w]+)\((.+)\)")
# key=value inside a compound element
FIELDS_CMPND_ELEMENT = re.compile(r"([\w:]+)=([/\.\w:]+)")
# simple key:value element, e.g. packets:1
FIELDS_ELEMENT = re.compile(r"([\w]+):([-\.\w]+)")
def flow_line_iter(line):
    """ Return the comma separated elements of a flow dump line.

    Commas inside parentheses (compound elements such as eth(...)) do
    not split; white space is dropped. Raises ValueError when the
    parentheses are unbalanced. Note the actions element is not split
    properly, but it is not needed.
    """
    rc = []
    element = ""
    paren_count = 0

    for ch in line:
        if (ch == '('):
            paren_count += 1
        elif (ch == ')'):
            paren_count -= 1

        if (ch == ' '):
            # Ignore white space.
            continue
        elif ((ch == ',') and (paren_count == 0)):
            rc.append(element)
            element = ""
        else:
            element += ch

    if (paren_count):
        raise ValueError(line)

    if (len(element) > 0):
        rc.append(element)
    return rc
def flow_line_compound_parse(compound):
    """ Parse a compound element body into a dictionary.

    For example "src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03", the body
    of eth(...), becomes {"src": ..., "dst": ...}. Nested compound
    values are parsed recursively. When nothing matched, the original
    string is returned so the caller can use it verbatim.
    """
    result = {}
    for element in flow_line_iter(compound):
        match = FIELDS_CMPND_ELEMENT.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = value

        # A nested compound element overrides the simple key=value form.
        match = FIELDS_CMPND.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = flow_line_compound_parse(value)
            continue

    if (len(result) == 0):
        return compound
    return result
def flow_line_split(line):
    """ Convert a flow dump line into a ([fields], [stats], actions) tuple.

    Relies on the following ovs-dpctl dump-flow output characteristics:
    1. The dump flow line consists of a list of frame fields, a list of
       stats and the actions.
    2. The list of frame fields, each stat and the action field are
       delimited by ', '.
    3. All other non-stat fields are not delimited by ', '.
    """
    results = re.split(', ', line)

    (field, stats, action) = (results[0], results[1:-1], results[-1])

    fields = flow_line_iter(field)
    return (fields, stats, action)
def elements_to_dict(elements):
    """ Convert flow elements into a hierarchy of dictionaries.

    Compound elements (eth(...)) are parsed recursively; simple
    key:value elements become string entries. Raises ValueError for an
    element that matches neither form.
    """
    result = {}
    for element in elements:
        match = FIELDS_CMPND.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = flow_line_compound_parse(value)
            continue

        match = FIELDS_ELEMENT.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = value
        else:
            raise ValueError("can't parse >%s<" % element)
    return result
# pylint: disable-msg=R0903
class SumData(object):
    """ Interface that all data going into SumDb must implement.
    Holds the flow field and its corresponding count, total packets,
    total bytes and calculates average.

    __repr__ is used as key into the SumData singleton.
    __str__ is used as human readable output.
    """

    def __init__(self, field_type, field, packets, flow_bytes, key):
        # field_type -- e.g. "eth"; field -- display string for the field.
        # Count is the number of lines in the dump-flow log.
        self.field_type = field_type
        self.field = field
        self.count = 1
        self.packets = int(packets)
        self.bytes = int(flow_bytes)
        self.key = key

    def decrement(self, decr_packets, decr_bytes, decr_count):
        """ Decrement content to calculate delta from previous flow sample."""
        self.packets -= decr_packets
        self.bytes -= decr_bytes
        self.count -= decr_count

    def __iadd__(self, other):
        """ Add two objects sharing the same key. """
        if (self.key != other.key):
            raise ValueError("adding two unrelated types")
        self.count += other.count
        self.packets += other.packets
        self.bytes += other.bytes
        return self

    def __isub__(self, other):
        """ Subtract two objects sharing the same key. """
        if (self.key != other.key):
            # Fixed message: this is subtraction, not addition.
            raise ValueError("subtracting two unrelated types")
        self.count -= other.count
        self.packets -= other.packets
        self.bytes -= other.bytes
        return self

    def __getattr__(self, name):
        """ Compute 'average' (bytes per packet) lazily. """
        if (name == "average"):
            if (self.packets == 0):
                return 0.0
            return float(self.bytes) / float(self.packets)
        raise AttributeError(name)

    def __str__(self):
        """ Used for debugging. """
        return "%s %s %s %s" % (self.field, self.count,
                                self.packets, self.bytes)

    def __repr__(self):
        """ Used as key in the FlowDB table. """
        return self.key
def flow_aggregate(fields_dict, stats_dict):
    """ Return a SumData for every known field type present in a line.

    fields_dict -- flow portion of a dump-flows line as a dictionary.
    stats_dict -- current stats (packets, bytes, ...) for that line.
    """
    result = []
    for output_format in OUTPUT_FORMAT:
        field = fields_dict.get(output_format.field_type, None)
        if (field):
            obj = output_format.generator(output_format.field_type,
                                          field, stats_dict)
            result.append(obj)
    return result
def flows_read(ihdl, flow_db):
    """ Read flow content from ihdl and insert into flow_db.

    Lines that fail to parse are logged and skipped; reading continues
    until EOF. Returns flow_db.
    """
    while (True):
        line = ihdl.readline()
        if (len(line) == 0):
            # End of input.
            break

        try:
            flow_db.flow_line_add(line)
        # 'as' form is valid on both Python 2.6+ and Python 3,
        # unlike the old 'except ValueError, arg' syntax.
        except ValueError as arg:
            logging.error(arg)

    return flow_db
def get_terminal_size():
    """ Return (height, width) of the controlling terminal.

    Tries stdin, stdout and stderr in turn and stops at the first
    descriptor that answers; falls back to (25, 80) when none reports a
    usable size.
    """
    result = None
    for fd_io in [0, 1, 2]:
        try:
            result = struct.unpack('hh',
                                   fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
                                               '4x4'))
            # Stop at the first descriptor that works instead of letting
            # a later redirected descriptor overwrite the answer.
            break
        except IOError:
            result = None

    if (result is None or result == (0, 0)):
        # Maybe we can't get the width. In that case assume (25, 80).
        result = (25, 80)

    return result
# Content derived from:
# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
            1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']}


def approximate_size(size, a_kilobyte_is_1024_bytes=True):
    """Convert a file size to human-readable form.

    Keyword arguments:
    size -- file size in bytes
    a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024;
                                if False, use multiples of 1000

    Returns: string such as '1.0 KiB'.
    Raises: ValueError for a negative or out-of-range size.
    """
    size = float(size)
    if (size < 0):
        raise ValueError('number must be non-negative')

    multiple = 1024 if a_kilobyte_is_1024_bytes else 1000
    for suffix in SUFFIXES[multiple]:
        size /= multiple
        if (size < multiple):
            return "%.1f %s" % (size, suffix)

    raise ValueError('number too large')
class ColMeta(object):
    """ Concepts about columns: whether sortable and the column width. """
    def __init__(self, sortable, width):
        # sortable -- True when the 's' command may sort on this column.
        # width -- character width reserved for the column.
        self.sortable = sortable
        self.width = width
class RowMeta(object):
    """ How to render rows: a label plus the formatting function. """
    def __init__(self, label, fmt):
        # label -- column title text; fmt -- callable(value, width) -> str.
        self.label = label
        self.fmt = fmt
def fmt_packet(obj, width):
    """ Render the packet count right justified in the given width."""
    text = str(obj.packets)
    return text.rjust(width)
def fmt_count(obj, width):
    """ Render the line count right justified in the given width."""
    text = str(obj.count)
    return text.rjust(width)
def fmt_avg(obj, width):
    """ Render the average (truncated to int) right justified in width."""
    whole = int(obj.average)
    return str(whole).rjust(width)
def fmt_field(obj, width):
    """ Truncate a really long flow field and insert ellipses to make
    the truncation visible; result is left justified in the width.
    """
    ellipses = " ... "
    value = obj.field
    if (len(obj.field) > width):
        value = value[:(width - len(ellipses))] + ellipses
    return value.ljust(width)
def fmt_bytes(obj, width):
    """ Render the byte count; switch to approximate_size units when the
    exact value does not fit in the column width.
    """
    if (len(str(obj.bytes)) <= width):
        value = str(obj.bytes)
    else:
        value = approximate_size(obj.bytes)
    return value.rjust(width)
def title_center(value, width):
    """ Upper-case a column title and center it in the given width."""
    upper = value.upper()
    return upper.center(width)
def title_rjust(value, width):
    """ Upper-case a column title and right justify it in the width."""
    upper = value.upper()
    return upper.rjust(width)
def column_picker(order, obj):
    """ Return the statistic selected by column order.

    Orders 1-4 map to count, packets, bytes and average respectively
    (matching the column layout in Render); any other value raises
    ValueError.
    """
    if (order == 1):
        return obj.count
    elif (order == 2):
        return obj.packets
    elif (order == 3):
        return obj.bytes
    elif (order == 4):
        return obj.average
    else:
        raise ValueError("order outside of range %s" % order)
716 """ Renders flow data. """
717 def __init__(self, console_width):
718 """ Calculate column widths taking into account changes in format."""
720 self._start_time = datetime.datetime.now()
722 self._cols = [ColMeta(False, 0),
723 ColMeta(True, Columns.VALUE_WIDTH),
724 ColMeta(True, Columns.VALUE_WIDTH),
725 ColMeta(True, Columns.VALUE_WIDTH),
726 ColMeta(True, Columns.VALUE_WIDTH)]
727 self._console_width = console_width
728 self.console_width_set(console_width)
730 # Order in this array dictate the order of the columns.
731 # The 0 width for the first entry is a place holder. This is
732 # dynamically calculated. The first column is special. We need a
733 # way to indicate which field are presented.
734 self._descs = [RowMeta("", title_rjust),
735 RowMeta("", title_rjust),
736 RowMeta("", title_rjust),
737 RowMeta("", title_rjust),
738 RowMeta("", title_rjust)]
739 self._column_sort_select = 0
740 self.column_select_event()
743 RowMeta(Columns.FIELDS, title_center),
744 RowMeta(Columns.COUNT, title_rjust),
745 RowMeta(Columns.PACKETS, title_rjust),
746 RowMeta(Columns.BYTES, title_rjust),
747 RowMeta(Columns.AVERAGE, title_rjust)
751 RowMeta(None, fmt_field),
752 RowMeta(None, fmt_count),
753 RowMeta(None, fmt_packet),
754 RowMeta(None, fmt_bytes),
755 RowMeta(None, fmt_avg)
759 # _field_types hold which fields are displayed in the field
760 # column, with the keyword all implying all fields.
762 self._field_types = ["all"] + [ii.field_type for ii in OUTPUT_FORMAT]
765 # The default is to show all field types.
767 self._field_type_select = -1
768 self.field_type_toggle()
770 def _field_type_select_get(self):
771 """ Return which field type to display. """
772 return self._field_types[self._field_type_select]
774 def field_type_toggle(self):
775 """ toggle which field types to show. """
776 self._field_type_select += 1
777 if (self._field_type_select >= len(self._field_types)):
778 self._field_type_select = 0
779 value = Columns.FIELDS + " (%s)" % self._field_type_select_get()
780 self._titles[0].label = value
782 def column_select_event(self):
783 """ Handles column select toggle. """
785 self._descs[self._column_sort_select].label = ""
786 for _ in range(len(self._cols)):
787 self._column_sort_select += 1
788 if (self._column_sort_select >= len(self._cols)):
789 self._column_sort_select = 0
791 # Now look for the next sortable column
792 if (self._cols[self._column_sort_select].sortable):
794 self._descs[self._column_sort_select].label = "DESC"
796 def console_width_set(self, console_width):
797 """ Adjust the output given the new console_width. """
798 self._console_width = console_width
800 spaces = len(self._cols) - 1
802 # Calculating column width can be tedious but important. The
803 # flow field value can be long. The goal here is to dedicate
804 # fixed column space for packets, bytes, average and counts. Give the
805 # remaining space to the flow column. When numbers get large
806 # transition output to output generated by approximate_size which
807 # limits output to ###.# XiB in other words 9 characters.
809 # At this point, we know the maximum length values. We may
810 # truncate the flow column to get everything to fit.
811 self._cols[0].width = 0
812 values_max_length = sum([ii.width for ii in self._cols]) + spaces
813 flow_max_length = console_width - values_max_length
814 self._cols[0].width = flow_max_length
816 def format(self, flow_db):
817 """ shows flows based on --script parameter."""
821 # Top output consists of
823 # Column title (2 rows)
825 # statistics and status
830 rc.append("Flow Summary".center(self._console_width))
832 stats = " Total: %(flow_total)s errors: %(flow_errors)s " % \
833 flow_db.flow_stats_get()
834 accumulate = flow_db.accumulate_get()
836 stats += "Accumulate: on "
838 stats += "Accumulate: off "
840 duration = datetime.datetime.now() - self._start_time
841 stats += "Duration: %s " % str(duration)
842 rc.append(stats.ljust(self._console_width))
845 # 2 rows for columns.
847 # Indicate which column is in descending order.
848 rc.append(" ".join([ii.fmt(ii.label, col.width)
849 for (ii, col) in zip(self._descs, self._cols)]))
851 rc.append(" ".join([ii.fmt(ii.label, col.width)
852 for (ii, col) in zip(self._titles, self._cols)]))
857 for dd in flow_db.field_values_in_order(self._field_type_select_get(),
858 self._column_sort_select):
859 rc.append(" ".join([ii.fmt(dd, col.width)
860 for (ii, col) in zip(self._datas,
def curses_screen_begin():
    """ Begin curses screen control: cbreak, no echo, keypad enabled.

    Returns the curses standard screen object.
    """
    stdscr = curses.initscr()
    curses.cbreak()
    curses.noecho()
    stdscr.keypad(1)
    return stdscr
def curses_screen_end(stdscr):
    """ End curses screen control and restore normal terminal modes."""
    curses.nocbreak()
    stdscr.keypad(0)
    curses.echo()
    curses.endwin()
884 """ Implements live vs accumulate mode.
886 Flows are stored as key value pairs. The key consists of the content
887 prior to stat fields. The value portion consists of stats in a dictionary
890 @ \todo future add filtering here.
892 def __init__(self, accumulate):
893 self._accumulate = accumulate
894 self._error_count = 0
895 # Values are (stats, last update time.)
896 # The last update time is used for aging.
897 self._flow_lock = threading.Lock()
898 # This dictionary holds individual flows.
900 # This dictionary holds aggregate of flow fields.
903 def accumulate_get(self):
904 """ Return the current accumulate state. """
905 return self._accumulate
907 def accumulate_toggle(self):
908 """ toggle accumulate flow behavior. """
909 self._accumulate = not self._accumulate
912 """ Indicate the beginning of processing flow content.
913 if accumulate is false clear current set of flows. """
915 if (not self._accumulate):
916 self._flow_lock.acquire()
920 self._flow_lock.release()
923 def flow_line_add(self, line):
924 """ Split a line from a ovs-dpctl dump-flow into key and stats.
925 The order of the content in the flow should be:
930 This method also assumes that the dump flow output does not
931 change order of fields of the same flow.
934 line = line.rstrip("\n")
935 (fields, stats, _) = flow_line_split(line)
938 fields_dict = elements_to_dict(fields)
940 if (len(fields_dict) == 0):
941 raise ValueError("flow fields are missing %s", line)
943 stats_dict = elements_to_dict(stats)
944 if (len(stats_dict) == 0):
945 raise ValueError("statistics are missing %s.", line)
948 # In accumulate mode, the Flow database can reach 10,000's of
949 # persistent flows. The interaction of the script with this many
950 # flows is too slow. Instead, delta are sent to the flow_db
951 # database allow incremental changes to be done in O(m) time
952 # where m is the current flow list, instead of iterating over
953 # all flows in O(n) time where n is the entire history of flows.
954 key = ",".join(fields)
956 self._flow_lock.acquire()
958 (stats_old_dict, _) = self._flows.get(key, (None, None))
960 self._flow_lock.release()
962 self.flow_event(fields_dict, stats_old_dict, stats_dict)
964 except ValueError, arg:
966 self._error_count += 1
969 self._flow_lock.acquire()
971 self._flows[key] = (stats_dict, datetime.datetime.now())
973 self._flow_lock.release()
975 def decay(self, decayTimeInSeconds):
976 """ Decay content. """
977 now = datetime.datetime.now()
978 for (key, value) in self._flows.items():
979 (stats_dict, updateTime) = value
980 delta = now - updateTime
982 if (delta.seconds > decayTimeInSeconds):
983 self._flow_lock.acquire()
987 fields_dict = elements_to_dict(flow_line_iter(key))
988 matches = flow_aggregate(fields_dict, stats_dict)
989 for match in matches:
990 self.field_dec(match)
993 self._flow_lock.release()
995 def flow_stats_get(self):
996 """ Return statistics in a form of a dictionary. """
998 self._flow_lock.acquire()
1000 rc = {"flow_total": len(self._flows),
1001 "flow_errors": self._error_count}
1003 self._flow_lock.release()
1006 def field_types_get(self):
1007 """ Return the set of types stored in the singleton. """
1008 types = set((ii.field_type for ii in self._fields.values()))
1011 def field_add(self, data):
1012 """ Collect dump-flow data to sum number of times item appears. """
1013 current = self._fields.get(repr(data), None)
1014 if (current is None):
1015 current = copy.copy(data)
1018 self._fields[repr(current)] = current
1020 def field_dec(self, data):
1021 """ Collect dump-flow data to sum number of times item appears. """
1022 current = self._fields.get(repr(data), None)
1023 if (current is None):
1024 raise ValueError("decrementing field missing %s" % repr(data))
1027 self._fields[repr(current)] = current
1028 if (current.count == 0):
1029 del self._fields[repr(current)]
1031 def field_values_in_order(self, field_type_select, column_order):
1032 """ Return a list of items in order maximum first. """
1033 values = self._fields.values()
1034 if (field_type_select != "all"):
1035 # If a field type other than "all" then reduce the list.
1036 values = [ii for ii in values
1037 if (ii.field_type == field_type_select)]
1038 values = [(column_picker(column_order, ii), ii) for ii in values]
1039 values.sort(key=operator.itemgetter(0))
1041 values = [ii[1] for ii in values]
1044 def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
1045 """ Receives new flow information. """
1047 # In order to avoid processing every flow at every sample
1048 # period, changes in flow packet count is used to determine the
1049 # delta in the flow statistics. This delta is used in the call
1050 # to self.decrement prior to self.field_add
1052 if (stats_old_dict is None):
1053 # This is a new flow
1054 matches = flow_aggregate(fields_dict, stats_new_dict)
1055 for match in matches:
1056 self.field_add(match)
1058 old_packets = int(stats_old_dict.get("packets", 0))
1059 new_packets = int(stats_new_dict.get("packets", 0))
1060 if (old_packets == new_packets):
1061 # ignore. same data.
1064 old_bytes = stats_old_dict.get("bytes", 0)
1065 # old_packets != new_packets
1066 # if old_packets > new_packets then we end up decrementing
1067 # packets and bytes.
1068 matches = flow_aggregate(fields_dict, stats_new_dict)
1069 for match in matches:
1070 match.decrement(int(old_packets), int(old_bytes), 1)
1071 self.field_add(match)
1074 class DecayThread(threading.Thread):
1075 """ Periodically call flow database to see if any flows are old. """
1076 def __init__(self, flow_db, interval):
1077 """ Start decay thread. """
1078 threading.Thread.__init__(self)
1080 self._interval = max(1, interval)
1081 self._min_interval = min(1, interval / 10)
1082 self._flow_db = flow_db
1083 self._event = threading.Event()
1084 self._running = True
1089 """ Worker thread which handles decaying accumulated flows. """
1091 while(self._running):
1092 self._event.wait(self._min_interval)
1094 self._flow_db.decay(self._interval)
1097 """ Stop thread. """
1098 self._running = False
1101 # Give the calling thread time to terminate but not too long.
1102 # this thread is a daemon so the application will terminate if
1103 # we timeout during the join. This is just a cleaner way to
1104 # release resources.
1108 def flow_top_command(stdscr, render, flow_db):
1109 """ Handle input while in top mode. """
1112 # Any character will restart sampling.
1113 if (ch == ord('h')):
1119 if (ch == ord('s')):
1120 # toggle which column sorts data in descending order.
1121 render.column_select_event()
1122 elif (ch == ord('a')):
1123 flow_db.accumulate_toggle()
1124 elif (ch == ord('f')):
1125 render.field_type_toggle()
1126 elif (ch == ord(' ')):
def decay_timer_start(flow_db, accumulateDecay):
    """ Start a DecayThread when accumulateDecay is greater than zero.

    Returns the running thread, or None when decay is disabled
    (accumulateDecay == 0).
    """
    if (accumulateDecay > 0):
        decay_timer = DecayThread(flow_db, accumulateDecay)
        decay_timer.start()
        return decay_timer
    return None
1143 def flows_top(args):
1144 """ handles top like behavior when --script is not specified. """
1146 flow_db = FlowDB(args.accumulate)
1149 decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
1153 stdscr = curses_screen_begin()
1157 stdscr.timeout(args.delay)
1159 while (ch != ord('q')):
1163 ihdl = top_input_get(args)
1165 flows_read(ihdl, flow_db)
1168 except OSError, arg:
1169 logging.critical(arg)
1172 (console_height, console_width) = stdscr.getmaxyx()
1173 render.console_width_set(console_width)
1175 output_height = console_height - 1
1176 line_count = range(output_height)
1177 line_output = render.format(flow_db)
1178 lines = zip(line_count, line_output[:output_height])
1181 for (count, line) in lines:
1182 stdscr.addstr(count, 0, line[:console_width])
1185 ch = flow_top_command(stdscr, render, flow_db)
1188 curses_screen_end(stdscr)
1189 except KeyboardInterrupt:
1195 for (count, line) in lines:
1199 def flows_script(args):
1200 """ handles --script option. """
1202 flow_db = FlowDB(args.accumulate)
1205 if (args.flowFiles is None):
1206 logging.info("reading flows from stdin")
1207 ihdl = os.fdopen(sys.stdin.fileno(), 'r', 0)
1209 flow_db = flows_read(ihdl, flow_db)
1213 for flowFile in args.flowFiles:
1214 logging.info("reading flows from %s", flowFile)
1215 ihdl = open(flowFile, "r")
1217 flow_db = flows_read(ihdl, flow_db)
1221 (_, console_width) = get_terminal_size()
1222 render = Render(console_width)
1224 for line in render.format(flow_db):
1229 """ Return 0 on success or 1 on failure.
1232 There are four stages to the process ovs-dpctl dump-flow content.
1233 1. Retrieve current input
1234 2. store in FlowDB and maintain history
1235 3. Iterate over FlowDB and aggregating stats for each flow field
1238 Retrieving current input is currently trivial, the ovs-dpctl dump-flow
1239 is called. Future version will have more elaborate means for collecting
1240 dump-flow content. FlowDB returns all data as in the form of a hierarchical
1241 dictionary. Input will vary.
1243 In the case of accumulate mode, flows are not purged from the FlowDB
1244 manager. Instead at the very least, merely the latest statistics are
1245 kept. In the case, of live output the FlowDB is purged prior to sampling
1248 Aggregating results requires identify flow fields to aggregate out
1249 of the flow and summing stats.
1259 except KeyboardInterrupt:
1263 if __name__ == '__main__':
1265 elif __name__ == 'ovs-dpctl-top':
1266 # pylint: disable-msg=R0915
1269 # Test case beyond this point.
1270 # pylint: disable-msg=R0904
1271 class TestsuiteFlowParse(unittest.TestCase):
1273 parse flow into hierarchy of dictionaries.
1275 def test_flow_parse(self):
1276 """ test_flow_parse. """
1277 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1278 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1279 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1280 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1281 "udp(src=61252,dst=5355), packets:1, bytes:92, "\
1282 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1283 "38,41,44,47,50,53,56,59,62,65"
1285 (fields, stats, _) = flow_line_split(line)
1286 flow_dict = elements_to_dict(fields + stats)
1287 self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8")
1288 self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03")
1289 self.assertEqual(flow_dict["ipv6"]["src"],
1290 "fe80::55bf:fe42:bc96:2812")
1291 self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3")
1292 self.assertEqual(flow_dict["packets"], "1")
1293 self.assertEqual(flow_dict["bytes"], "92")
1295 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1296 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1297 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1298 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1299 "udp(src=61252,dst=5355), packets:1, bytes:92, "\
1300 "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1301 "38,41,44,47,50,53,56,59,62,65"
1303 (fields, stats, _) = flow_line_split(line)
1304 flow_dict = elements_to_dict(fields + stats)
1305 self.assertEqual(flow_dict["used"], "-0.703s")
1306 self.assertEqual(flow_dict["packets"], "1")
1307 self.assertEqual(flow_dict["bytes"], "92")
1309 def test_flow_sum(self):
1310 """ test_flow_sum. """
1311 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1312 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1313 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1314 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1315 "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1316 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1317 "38,41,44,47,50,53,56,59,62,65"
1319 (fields, stats, _) = flow_line_split(line)
1320 stats_dict = elements_to_dict(stats)
1321 fields_dict = elements_to_dict(fields)
1323 # Test simple case of one line.
1324 flow_db = FlowDB(False)
1325 matches = flow_aggregate(fields_dict, stats_dict)
1326 for match in matches:
1327 flow_db.field_add(match)
1329 flow_types = flow_db.field_types_get()
1330 expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"]
1331 self.assert_(len(flow_types) == len(expected_flow_types))
1332 for flow_type in flow_types:
1333 self.assertTrue(flow_type in expected_flow_types)
1335 for flow_type in flow_types:
1336 sum_value = flow_db.field_values_in_order("all", 1)
1337 self.assert_(len(sum_value) == 5)
1338 self.assert_(sum_value[0].packets == 2)
1339 self.assert_(sum_value[0].count == 1)
1340 self.assert_(sum_value[0].bytes == 92)
1343 # Add line again just to see counts go up.
1344 matches = flow_aggregate(fields_dict, stats_dict)
1345 for match in matches:
1346 flow_db.field_add(match)
1348 flow_types = flow_db.field_types_get()
1349 self.assert_(len(flow_types) == len(expected_flow_types))
1350 for flow_type in flow_types:
1351 self.assertTrue(flow_type in expected_flow_types)
1353 for flow_type in flow_types:
1354 sum_value = flow_db.field_values_in_order("all", 1)
1355 self.assert_(len(sum_value) == 5)
1356 self.assert_(sum_value[0].packets == 4)
1357 self.assert_(sum_value[0].count == 2)
1358 self.assert_(sum_value[0].bytes == 2 * 92)
1360 def test_assoc_list(self):
1361 """ test_assoc_list. """
1362 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1363 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1364 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1365 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1366 "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1367 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1368 "38,41,44,47,50,53,56,59,62,65"
1374 'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
1375 'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
1378 (fields, stats, _) = flow_line_split(line)
1379 stats_dict = elements_to_dict(stats)
1380 fields_dict = elements_to_dict(fields)
1383 # Test simple case of one line.
1384 flow_db = FlowDB(False)
1385 matches = flow_aggregate(fields_dict, stats_dict)
1386 for match in matches:
1387 flow_db.field_add(match)
1389 for sum_value in flow_db.field_values_in_order("all", 1):
1390 assoc_list = Columns.assoc_list(sum_value)
1391 for item in assoc_list:
1392 if (item[0] == "fields"):
1393 self.assertTrue(item[1] in valid_flows)
1394 elif (item[0] == "packets"):
1395 self.assertTrue(item[1] == 2)
1396 elif (item[0] == "count"):
1397 self.assertTrue(item[1] == 1)
1398 elif (item[0] == "average"):
1399 self.assertTrue(item[1] == 46.0)
1400 elif (item[0] == "bytes"):
1401 self.assertTrue(item[1] == 92)
1403 raise ValueError("unknown %s", item[0])
1405 def test_human_format(self):
1406 """ test_assoc_list. """
1408 self.assertEqual(approximate_size(0.0), "0.0 KiB")
1409 self.assertEqual(approximate_size(1024), "1.0 KiB")
1410 self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB")
1411 self.assertEqual(approximate_size((1024 * 1024) + 100000),
1413 value = (1024 * 1024 * 1024) + 100000000
1414 self.assertEqual(approximate_size(value), "1.1 GiB")
1416 def test_flow_line_split(self):
1417 """ Splitting a flow line is not trivial.
1418 There is no clear delimiter. Comma is used liberally."""
1419 expected_fields = ["in_port(4)",
1420 "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
1422 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
1423 "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
1424 "udp(src=61252,dst=5355)"]
1425 expected_stats = ["packets:2", "bytes:92", "used:0.703s"]
1426 expected_actions = "actions:3,8,11,14,17,20,23,26,29,32,35," \
1427 "38,41,44,47,50,53,56,59,62,65"
1429 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1430 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1431 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1432 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1433 "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1434 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1435 "38,41,44,47,50,53,56,59,62,65"
1437 (fields, stats, actions) = flow_line_split(line)
1439 self.assertEqual(fields, expected_fields)
1440 self.assertEqual(stats, expected_stats)
1441 self.assertEqual(actions, expected_actions)
1443 def test_accumulate_decay(self):
1444 """ test_accumulate_decay: test accumulated decay. """
1445 lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
1446 "dst=ff:ff:ff:ff:ff:ff),"
1447 "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1448 "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1449 "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1450 "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
1451 "packets:1, bytes:120, used:0.004s, actions:1"]
1453 flow_db = FlowDB(True)
1455 flow_db.flow_line_add(lines[0])
1457 # Make sure we decay
1459 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1461 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
1463 flow_db.flow_line_add(lines[0])
1464 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1466 # Should not be deleted.
1467 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1469 flow_db.flow_line_add(lines[0])
1470 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1471 timer = decay_timer_start(flow_db, 2)
1473 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
1476 def test_accumulate(self):
1477 """ test_accumulate test that FlowDB supports accumulate. """
1479 lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
1480 "dst=ff:ff:ff:ff:ff:ff),"
1481 "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1482 "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1483 "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1484 "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
1485 "packets:1, bytes:120, used:0.004s, actions:1",
1487 "eth(src=68:ef:bd:25:ef:c0,dst=33:33:00:00:00:66),"
1488 "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
1489 "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
1490 "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029), "
1491 "packets:2, bytes:5026, used:0.348s, actions:1",
1492 "in_port(1),eth(src=ee:ee:ee:ee:ee:ee,"
1493 "dst=ff:ff:ff:ff:ff:ff),"
1494 "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1495 "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1496 "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1497 "tha=00:00:00:00:00:00/00:00:00:00:00:00), packets:2, "
1498 "bytes:240, used:0.004s, actions:1"]
1501 "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
1502 "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
1503 "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
1504 "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
1505 "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
1506 "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
1509 # Turn on accumulate.
1510 flow_db = FlowDB(True)
1513 flow_db.flow_line_add(lines[0])
1515 # Test one flow exist.
1516 sum_values = flow_db.field_values_in_order("all", 1)
1517 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1518 self.assertEqual(len(in_ports), 1)
1519 self.assertEqual(in_ports[0].packets, 1)
1520 self.assertEqual(in_ports[0].bytes, 120)
1521 self.assertEqual(in_ports[0].count, 1)
1523 # simulate another sample
1524 # Test two different flows exist.
1526 flow_db.flow_line_add(lines[1])
1527 sum_values = flow_db.field_values_in_order("all", 1)
1528 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1529 self.assertEqual(len(in_ports), 1)
1530 self.assertEqual(in_ports[0].packets, 1)
1531 self.assertEqual(in_ports[0].bytes, 120)
1532 self.assertEqual(in_ports[0].count, 1)
1534 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1535 self.assertEqual(len(in_ports), 1)
1536 self.assertEqual(in_ports[0].packets, 2)
1537 self.assertEqual(in_ports[0].bytes, 126)
1538 self.assertEqual(in_ports[0].count, 1)
1540 # Test first flow increments packets.
1542 flow_db.flow_line_add(lines[2])
1543 sum_values = flow_db.field_values_in_order("all", 1)
1544 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1545 self.assertEqual(len(in_ports), 1)
1546 self.assertEqual(in_ports[0].packets, 2)
1547 self.assertEqual(in_ports[0].bytes, 240)
1548 self.assertEqual(in_ports[0].count, 1)
1550 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1551 self.assertEqual(len(in_ports), 1)
1552 self.assertEqual(in_ports[0].packets, 2)
1553 self.assertEqual(in_ports[0].bytes, 126)
1554 self.assertEqual(in_ports[0].count, 1)
1556 # Test third flow but with the same in_port(1) as the first flow.
1558 flow_db.flow_line_add(lines[3])
1559 sum_values = flow_db.field_values_in_order("all", 1)
1560 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1561 self.assertEqual(len(in_ports), 1)
1562 self.assertEqual(in_ports[0].packets, 3)
1563 self.assertEqual(in_ports[0].bytes, 360)
1564 self.assertEqual(in_ports[0].count, 2)
1566 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1567 self.assertEqual(len(in_ports), 1)
1568 self.assertEqual(in_ports[0].packets, 2)
1569 self.assertEqual(in_ports[0].bytes, 126)
1570 self.assertEqual(in_ports[0].count, 1)
1572 # Third flow has changes.
1574 flow_db.flow_line_add(lines[4])
1575 sum_values = flow_db.field_values_in_order("all", 1)
1576 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1577 self.assertEqual(len(in_ports), 1)
1578 self.assertEqual(in_ports[0].packets, 4)
1579 self.assertEqual(in_ports[0].bytes, 480)
1580 self.assertEqual(in_ports[0].count, 2)
1582 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1583 self.assertEqual(len(in_ports), 1)
1584 self.assertEqual(in_ports[0].packets, 2)
1585 self.assertEqual(in_ports[0].bytes, 126)
1586 self.assertEqual(in_ports[0].count, 1)
1590 flow_db.flow_line_add(lines[5])
1591 sum_values = flow_db.field_values_in_order("all", 1)
1592 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1593 self.assertEqual(len(in_ports), 1)
1594 self.assertEqual(in_ports[0].packets, 3)
1595 self.assertEqual(in_ports[0].bytes, 360)
1596 self.assertEqual(in_ports[0].count, 2)
1598 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1599 self.assertEqual(len(in_ports), 1)
1600 self.assertEqual(in_ports[0].packets, 2)
1601 self.assertEqual(in_ports[0].bytes, 126)
1602 self.assertEqual(in_ports[0].count, 1)
1604 def test_parse_character_errors(self):
1605 """ test_parsing errors.
1606 The flow parses is purposely loose. Its not designed to validate
1607 input. Merely pull out what it can but there are situations
1608 that a parse error can be detected.
1611 lines = ["complete garbage",
1612 "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
1613 "dst=33:33:00:00:00:66),"
1614 "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
1615 "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
1616 "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
1617 "packets:2,bytes:5026,actions:1"]
1619 flow_db = FlowDB(False)
1623 flow_db.flow_line_add(line)
1625 # We want an exception. That is how we know we have
1626 # correctly found a simple parsing error. We are not
1627 # looking to validate flow output just catch simple issues.
1629 self.assertTrue(False)
1631 def test_tunnel_parsing(self):
1632 """ test_tunnel_parsing test parse flows with tunnel. """
1634 "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
1635 "tos=0x0,ttl=64,flags(key)),in_port(1),"
1636 "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
1637 "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
1638 "actions:userspace(pid=4294962691,slow_path(cfm))"
1640 flow_db = FlowDB(False)
1642 flow_db.flow_line_add(lines[0])
1643 sum_values = flow_db.field_values_in_order("all", 1)
1644 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1645 self.assertEqual(len(in_ports), 1)
1646 self.assertEqual(in_ports[0].packets, 6)
1647 self.assertEqual(in_ports[0].bytes, 534)
1648 self.assertEqual(in_ports[0].count, 1)
1650 def test_flow_multiple_paren(self):
1651 """ test_flow_multiple_paren. """
1652 line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
1653 valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
1655 rc = flow_line_iter(line)
1656 self.assertEqual(valid, rc)
1658 def test_to_network(self):
1659 """ test_to_network test ipv4_to_network and ipv6_to_network. """
1661 ("192.168.0.1", "192.168.0.1"),
1662 ("192.168.0.1/255.255.255.255", "192.168.0.1"),
1663 ("192.168.0.1/255.255.255.0", "192.168.0.0"),
1664 ("192.168.0.1/255.255.0.0", "192.168.0.0"),
1665 ("192.168.0.1/255.0.0.0", "192.0.0.0"),
1666 ("192.168.0.1/0.0.0.0", "0.0.0.0"),
1667 ("10.24.106.230/255.255.255.255", "10.24.106.230"),
1668 ("10.24.106.230/255.255.255.0", "10.24.106.0"),
1669 ("10.24.106.0/255.255.255.0", "10.24.106.0"),
1670 ("10.24.106.0/255.255.252.0", "10.24.104.0")
1674 ("1::192:168:0:1", "1::192:168:0:1"),
1675 ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
1676 ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
1677 ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
1678 ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
1679 ("1::192:168:0:1/1::0:0:0:0", "1::"),
1680 ("1::192:168:0:1/::", "::")
1683 for (ipv4_test, ipv4_check) in ipv4s:
1684 self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check)
1686 for (ipv6_test, ipv6_check) in ipv6s:
1687 self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check)