From: Mark Hamilton
Date: Fri, 13 Sep 2013 22:50:48 +0000 (-0700)
Subject: utilities: a top like tool for ovs-dpctl dump-flows.
X-Git-Tag: sliver-openvswitch-2.0.90-1~13^2~27
X-Git-Url: http://git.onelab.eu/?p=sliver-openvswitch.git;a=commitdiff_plain;h=14b4c575c28421d1181b509dbeae6e4849c7da69

utilities: a top like tool for ovs-dpctl dump-flows.

This Python script summarizes ovs-dpctl dump-flows content by aggregating the number of packets, total bytes and occurrences of the following fields:

- Datapath in_port
- Ethernet type
- Source and destination MAC addresses
- IP protocol
- Source and destination IPv4 addresses
- Source and destination IPv6 addresses
- UDP and TCP destination port
- Tunnel source and destination addresses

Testing included confirming that both mega-flows and non-mega-flows are properly parsed. Bit masks are applied in the case of mega-flows prior to aggregation. Tested the --script parameter, which runs in non-interactive mode. Tested syntax against Python 2.4.3, 2.6 and 2.7. Confirmed the script passes pep8 and pylint run as: pylint --disable=I0011 --include-id=y --reports=n

This tool has been added to these distributions:
- add ovs-dpctl-top to the Debian distribution.
- add ovs-dpctl-top to the RPM distribution.
- add ovs-dpctl-top to the XenServer RPM.

Signed-off-by: Mark Hamilton
Signed-off-by: Gurucharan Shetty
---

diff --git a/debian/control b/debian/control
index fe58b3172..46b563070 100644
--- a/debian/control
+++ b/debian/control
@@ -66,7 +66,7 @@ Description: Open vSwitch common components
 Package: openvswitch-switch
 Architecture: linux-any
 Suggests: openvswitch-datapath-module
-Depends: ${shlibs:Depends}, ${misc:Depends}, ${python:Depends}, openvswitch-common (= ${binary:Version}), module-init-tools, procps, uuid-runtime, netbase
+Depends: ${shlibs:Depends}, ${misc:Depends}, ${python:Depends}, openvswitch-common (= ${binary:Version}), module-init-tools, procps, uuid-runtime, netbase, python-argparse
 Description: Open vSwitch switch implementations
 Open vSwitch is a production quality, multilayer, software-based, Ethernet virtual switch.
It is designed to enable massive network diff --git a/debian/openvswitch-switch.install b/debian/openvswitch-switch.install index 4d7a15b17..c4f1426fe 100644 --- a/debian/openvswitch-switch.install +++ b/debian/openvswitch-switch.install @@ -1,4 +1,5 @@ usr/bin/ovs-dpctl +usr/bin/ovs-dpctl-top usr/bin/ovs-pcap usr/bin/ovs-tcpundump usr/bin/ovs-vlan-test diff --git a/debian/openvswitch-switch.manpages b/debian/openvswitch-switch.manpages index a0a331c3a..0afb675cf 100644 --- a/debian/openvswitch-switch.manpages +++ b/debian/openvswitch-switch.manpages @@ -1,5 +1,6 @@ _debian/ovsdb/ovsdb-server.1 _debian/utilities/ovs-dpctl.8 +_debian/utilities/ovs-dpctl-top.8 _debian/utilities/ovs-pcap.1 _debian/utilities/ovs-tcpundump.1 _debian/utilities/ovs-vlan-test.8 diff --git a/manpages.mk b/manpages.mk index 263f2ea2e..811d2f992 100644 --- a/manpages.mk +++ b/manpages.mk @@ -124,6 +124,10 @@ utilities/ovs-dpctl.8.in: lib/common.man: lib/vlog.man: +utilities/ovs-dpctl-top.8: \ + utilities/ovs-dpctl-top.8.in +utilities/ovs-dpctl-top.8.in: + utilities/ovs-l3ping.8: \ utilities/ovs-l3ping.8.in \ lib/common-syn.man \ diff --git a/rhel/openvswitch.spec.in b/rhel/openvswitch.spec.in index 0fd52004d..f77cd3a65 100644 --- a/rhel/openvswitch.spec.in +++ b/rhel/openvswitch.spec.in @@ -114,6 +114,7 @@ exit 0 /usr/bin/ovs-appctl /usr/bin/ovs-benchmark /usr/bin/ovs-dpctl +/usr/bin/ovs-dpctl-top /usr/bin/ovs-ofctl /usr/bin/ovs-parse-backtrace /usr/bin/ovs-pcap @@ -137,6 +138,7 @@ exit 0 /usr/share/man/man8/ovs-bugtool.8.gz /usr/share/man/man8/ovs-ctl.8.gz /usr/share/man/man8/ovs-dpctl.8.gz +/usr/share/man/man8/ovs-dpctl-top.8.gz /usr/share/man/man8/ovs-ofctl.8.gz /usr/share/man/man8/ovs-parse-backtrace.8.gz /usr/share/man/man8/ovs-pki.8.gz diff --git a/utilities/automake.mk b/utilities/automake.mk index 9f2bb6347..ff50a3438 100644 --- a/utilities/automake.mk +++ b/utilities/automake.mk @@ -7,6 +7,7 @@ bin_PROGRAMS += \ bin_SCRIPTS += utilities/ovs-pki if HAVE_PYTHON bin_SCRIPTS += \ + utilities/ovs-dpctl-top \ utilities/ovs-l3ping \ utilities/ovs-parse-backtrace \ utilities/ovs-pcap \ @@ -24,6 +25,7 @@ EXTRA_DIST += \ utilities/ovs-check-dead-ifs.in \ utilities/ovs-ctl.in \ utilities/ovs-dev.py \ + utilities/ovs-dpctl-top.in \ utilities/ovs-l3ping.in \ utilities/ovs-lib.in \ utilities/ovs-parse-backtrace.in \ @@ -39,6 +41,7 @@ MAN_ROOTS += \ utilities/ovs-controller.8.in \ utilities/ovs-ctl.8 \ utilities/ovs-dpctl.8.in \ + utilities/ovs-dpctl-top.8.in \ utilities/ovs-l3ping.8.in \ utilities/ovs-ofctl.8.in \ utilities/ovs-parse-backtrace.8 \ @@ -57,6 +60,8 @@ DISTCLEANFILES += \ utilities/ovs-check-dead-ifs \ utilities/ovs-controller.8 \ utilities/ovs-dpctl.8 \ + utilities/ovs-dpctl-top \ + utilities/ovs-dpctl-top.8 \ utilities/ovs-l3ping \ utilities/ovs-l3ping.8 \ utilities/ovs-lib \ @@ -80,6 +85,7 @@ man_MANS += \ utilities/ovs-benchmark.1 \ utilities/ovs-controller.8 \ utilities/ovs-dpctl.8 \ + utilities/ovs-dpctl-top.8 \ utilities/ovs-l3ping.8 \ utilities/ovs-ofctl.8 \ utilities/ovs-parse-backtrace.8 \ diff --git a/utilities/ovs-dpctl-top.8.in b/utilities/ovs-dpctl-top.8.in new file mode 100644 index 000000000..410e99959 --- /dev/null +++ b/utilities/ovs-dpctl-top.8.in @@ -0,0 +1,140 @@ +.de IQ +. br +. ns +. IP "\\$1" +.. +.TH ovs\-dpctl\-top "8" "@VERSION@" "Open vSwitch" "Open vSwitch Manual" +. +.SH NAME +\fBovs\-dpctl\-top\fR \- Top like behavior for ovs\-dpctl dump\-flows +. 
+.SH SYNOPSIS +\fBovs\-dpctl\-top\fR [\-h] [\-v] [\-f FLOWFILES] [\-V] [\-s] [\-\-host HOST] +[\-a | \-\-accumulate] [\-\-accumulate\-decay ACCUMULATEDECAY] [\-d DELAY] +. +.SH DESCRIPTION +.PP +This program summarizes \fBovs\-dpctl\fR flow content by aggregating the number +of packets, total bytes and occurrence of the following fields: +.IP +\- Datapath in_port +.IP +\- Ethernet type +.IP +\- Source and destination MAC addresses +.IP +\- IP protocol +.IP +\- Source and destination IPv4 addresses +.IP +\- Source and destination IPv6 addresses +.IP +\- UDP and TCP destination port +.IP +\- Tunnel source and destination addresses +. +.SS "Output shows four values:" +.IP +\- FIELDS: the flow fields for example in_port(1). +.IP +\- COUNT: the number of lines in the dump\-flow output contain the flow field. +.IP +\- PACKETS: the total number of packets containing the flow field. +.IP +\- BYTES: the total number of bytes containing the flow field. If units are +not present then values are in bytes. +.IP +\- AVERAGE: the average packets size (BYTES/PACKET). +.PP +.SS "Top Behavior" +.PP +While in top mode, the default behavior, the following single character commands +are supported: +.IP +a \- toggles top in accumulate and live mode. Accumulate mode is described +below. +.IP +s \- toggles which column is used to sort content in decreasing order. A +DESC title is placed over the column. +.IP +_ \- a space indicating to collect dump\-flow content again +.IP +h \- halt output. Any character will restart sampling +.IP +f \- cycle through flow fields +.IP +q \- q for quit. +.PP +.SS "Accumulate Mode" +.PP +There are two supported modes: live and accumulate. The default is live. +The parameter \fB\-\-accumulate\fR or the 'a' character in top mode enables the +latter. In live mode, recent dump\-flow content is presented. +Where as accumulate mode keeps track of the prior historical +information until the flow is reset not when the flow is purged. Reset +flows are determined when the packet count for a flow has decreased from +its previous sample. There is one caveat, eventually the system will +run out of memory if, after the accumulate\-decay period any flows that +have not been refreshed are purged. The goal here is to free memory +of flows that are not active. Statistics are not decremented. Their purpose +is to reflect the overall history of the flow fields. +.PP +.SS "Debugging Errors" +.PP +Parsing errors are counted and displayed in the status line at the beginning +of the output. Use the \fB\-\-verbose\fR option with \fB\-\-script to see +what output was not parsed, like this: +.PP +$ ovs\-dpctl dump\-flows | ovs\-dpctl\-top \fB\-\-script\fR \fB\-\-verbose\fR +.PP +Error messages will identify content that failed to parse. +.PP +.SS "Access Remote Hosts" +.PP +The \fB\-\-host\fR must follow the format user@hostname. This script simply +calls \&'ssh user@Hostname' without checking for login credentials therefore +public keys should be installed on the system identified by hostname, such as: +.PP +$ ssh\-copy\-id user@hostname +.PP +Consult ssh\-copy\-id man pages for more details. +.PP +.SS "Expected usage" +.PP +$ ovs\-dpctl\-top +.PP +or to run as a script: +.PP +$ ovs\-dpctl dump\-flows > dump\-flows.log +.PP +$ ovs\-dpctl\-top \fB\-\-script\fR \fB\-\-flow\-file\fR dump\-flows.log +.SS "OPTIONS" +.TP +\fB\-h\fR, \fB\-\-help\fR +show this help message and exit. +.TP +\fB\-v\fR, \fB\-\-version\fR +show program's version number and exit. 
+.TP +\fB\-f\fR FLOWFILES, \fB\-\-flow\-file\fR FLOWFILES +file containing flows from ovs\-dpctl dump\-flow. +.TP +\fB\-V\fR, \fB\-\-verbose\fR +enable debug level verbosity. +.TP +\fB\-s\fR, \fB\-\-script\fR +Run from a script (no user interface). +.TP +\fB\-\-host\fR HOST +Specify a user@host for retrieving flows see Accessing +Remote Hosts for more information. +.TP +\fB\-a\fR, \fB\-\-accumulate\fR +Accumulate dump\-flow content. +.TP +\fB\-\-accumulate\-decay\fR ACCUMULATEDECAY +Decay old accumulated flows. The default is 5 minutes. A value of 0 disables +decay. +.TP +\fB\-d\fR DELAY, \fB\-\-delay\fR DELAY +Delay in milliseconds to collect dump\-flow content (sample rate). diff --git a/utilities/ovs-dpctl-top.in b/utilities/ovs-dpctl-top.in new file mode 100755 index 000000000..f43fdeb7a --- /dev/null +++ b/utilities/ovs-dpctl-top.in @@ -0,0 +1,1687 @@ +#! @PYTHON@ +# +# Copyright (c) 2013 Nicira, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# The approximate_size code was copied from +# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin +# which is licensed under # "Dive Into Python 3," Copyright 2011 Mark Pilgrim, +# used under a Creative Commons Attribution-Share-Alike license: +# http://creativecommons.org/licenses/by-sa/3.0/ +# +# + +"""Top like behavior for ovs-dpctl dump-flows output. + +This program summarizes ovs-dpctl flow content by aggregating the number +of packets, total bytes and occurrence of the following fields: + + - Datapath in_port + + - Ethernet type + + - Source and destination MAC addresses + + - IP protocol + + - Source and destination IPv4 addresses + + - Source and destination IPv6 addresses + + - UDP and TCP destination port + + - Tunnel source and destination addresses + + +Output shows four values: + - FIELDS: the flow fields for example in_port(1). + + - PACKETS: the total number of packets containing the flow field. + + - BYTES: the total number of bytes containing the flow field. If units are + not present then values are in bytes. + + - AVERAGE: the average packets size (BYTES/PACKET). + + - COUNT: the number of lines in the dump-flow output contain the flow field. + +Top Behavior + +While in top mode, the default behavior, the following single character +commands are supported: + + a - toggles top in accumulate and live mode. Accumulate mode is described + below. + + s - toggles which column is used to sort content in decreasing order. A + DESC title is placed over the column. + + _ - a space indicating to collect dump-flow content again + + h - halt output. Any character will restart sampling + + f - cycle through flow fields + + q - q for quit. + +Accumulate Mode + +There are two supported modes: live and accumulate. The default is live. +The parameter --accumulate or the 'a' character in top mode enables the +latter. In live mode, recent dump-flow content is presented. +Where as accumulate mode keeps track of the prior historical +information until the flow is reset not when the flow is purged. 
Reset +flows are determined when the packet count for a flow has decreased from +its previous sample. There is one caveat, eventually the system will +run out of memory if, after the accumulate-decay period any flows that +have not been refreshed are purged. The goal here is to free memory +of flows that are not active. Statistics are not decremented. Their purpose +is to reflect the overall history of the flow fields. + + +Debugging Errors + +Parsing errors are counted and displayed in the status line at the beginning +of the output. Use the --verbose option with --script to see what output + was not parsed, like this: +$ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose + +Error messages will identify content that failed to parse. + + +Access Remote Hosts + +The --host must follow the format user@hostname. This script simply calls +'ssh user@Hostname' without checking for login credentials therefore public +keys should be installed on the system identified by hostname, such as: + +$ ssh-copy-id user@hostname + +Consult ssh-copy-id man pages for more details. + + +Expected usage + +$ ovs-dpctl-top + +or to run as a script: +$ ovs-dpctl dump-flows > dump-flows.log +$ ovs-dpctl-top --script --flow-file dump-flows.log + +""" + +# pylint: disable-msg=C0103 +# pylint: disable-msg=C0302 +# pylint: disable-msg=R0902 +# pylint: disable-msg=R0903 +# pylint: disable-msg=R0904 +# pylint: disable-msg=R0912 +# pylint: disable-msg=R0913 +# pylint: disable-msg=R0914 + +import sys +import os +try: + ## + # Arg parse is not installed on older Python distributions. + # ovs ships with a version in the directory mentioned below. + import argparse +except ImportError: + sys.path.append(os.path.join("@pkgdatadir@", "python")) + import argparse +import logging +import re +import unittest +import copy +import curses +import operator +import subprocess +import fcntl +import struct +import termios +import datetime +import threading +import time +import socket + + +## +# The following two definitions provide the necessary netaddr functionality. +# Python netaddr module is not part of the core installation. Packaging +# netaddr was involved and seems inappropriate given that only two +# methods where used. +def ipv4_to_network(ip_str): + """ Calculate the network given a ipv4/mask value. + If a mask is not present simply return ip_str. + """ + pack_length = '!HH' + try: + (ip, mask) = ip_str.split("/") + except ValueError: + # just an ip address no mask. + return ip_str + + ip_p = socket.inet_pton(socket.AF_INET, ip) + ip_t = struct.unpack(pack_length, ip_p) + mask_t = struct.unpack(pack_length, socket.inet_pton(socket.AF_INET, mask)) + network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)] + + return socket.inet_ntop(socket.AF_INET, + struct.pack('!HH', network_n[0], network_n[1])) + + +def ipv6_to_network(ip_str): + """ Calculate the network given a ipv6/mask value. + If a mask is not present simply return ip_str. + """ + pack_length = '!HHHHHHHH' + try: + (ip, mask) = ip_str.split("/") + except ValueError: + # just an ip address no mask. 
+ return ip_str + + ip_p = socket.inet_pton(socket.AF_INET6, ip) + ip_t = struct.unpack(pack_length, ip_p) + mask_t = struct.unpack(pack_length, + socket.inet_pton(socket.AF_INET6, mask)) + network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)] + + return socket.inet_ntop(socket.AF_INET6, + struct.pack(pack_length, + network_n[0], network_n[1], + network_n[2], network_n[3], + network_n[4], network_n[5], + network_n[6], network_n[7])) + + +## +# columns displayed +## +class Columns: + """ Holds column specific content. + Titles needs to be less than 8 characters. + """ + VALUE_WIDTH = 9 + FIELDS = "fields" + PACKETS = "packets" + COUNT = "count" + BYTES = "bytes" + AVERAGE = "average" + + def __init__(self): + pass + + @staticmethod + def assoc_list(obj): + """ Return a associated list. """ + return [(Columns.FIELDS, repr(obj)), + (Columns.PACKETS, obj.packets), + (Columns.BYTES, obj.bytes), + (Columns.COUNT, obj.count), + (Columns.AVERAGE, obj.average), + ] + + +def element_eth_get(field_type, element, stats_dict): + """ Extract eth frame src and dst from a dump-flow element.""" + fmt = "%s(src=%s,dst=%s)" + + element = fmt % (field_type, element["src"], element["dst"]) + return SumData(field_type, element, stats_dict["packets"], + stats_dict["bytes"], element) + + +def element_ipv4_get(field_type, element, stats_dict): + """ Extract src and dst from a dump-flow element.""" + fmt = "%s(src=%s,dst=%s)" + element_show = fmt % (field_type, element["src"], element["dst"]) + + element_key = fmt % (field_type, ipv4_to_network(element["src"]), + ipv4_to_network(element["dst"])) + + return SumData(field_type, element_show, stats_dict["packets"], + stats_dict["bytes"], element_key) + + +def element_tunnel_get(field_type, element, stats_dict): + """ Extract src and dst from a tunnel.""" + return element_ipv4_get(field_type, element, stats_dict) + + +def element_ipv6_get(field_type, element, stats_dict): + """ Extract src and dst from a dump-flow element.""" + + fmt = "%s(src=%s,dst=%s)" + element_show = fmt % (field_type, element["src"], element["dst"]) + + element_key = fmt % (field_type, ipv6_to_network(element["src"]), + ipv6_to_network(element["dst"])) + + return SumData(field_type, element_show, stats_dict["packets"], + stats_dict["bytes"], element_key) + + +def element_dst_port_get(field_type, element, stats_dict): + """ Extract src and dst from a dump-flow element.""" + element_key = "%s(dst=%s)" % (field_type, element["dst"]) + return SumData(field_type, element_key, stats_dict["packets"], + stats_dict["bytes"], element_key) + + +def element_passthrough_get(field_type, element, stats_dict): + """ Extract src and dst from a dump-flow element.""" + element_key = "%s(%s)" % (field_type, element) + return SumData(field_type, element_key, + stats_dict["packets"], stats_dict["bytes"], element_key) + + +# pylint: disable-msg=R0903 +class OutputFormat: + """ Holds field_type and function to extract element value. 
""" + def __init__(self, field_type, generator): + self.field_type = field_type + self.generator = generator + +OUTPUT_FORMAT = [ + OutputFormat("eth", element_eth_get), + OutputFormat("ipv4", element_ipv4_get), + OutputFormat("ipv6", element_ipv6_get), + OutputFormat("tunnel", element_tunnel_get), + OutputFormat("udp", element_dst_port_get), + OutputFormat("tcp", element_dst_port_get), + OutputFormat("eth_type", element_passthrough_get), + OutputFormat("in_port", element_passthrough_get) + ] + + +ELEMENT_KEY = { + "udp": "udp.dst", + "tcp": "tcp.dst" + } + + +def top_input_get(args): + """ Return subprocess stdout.""" + cmd = [] + if (args.host): + cmd += ["ssh", args.host] + cmd += ["ovs-dpctl", "dump-flows"] + + return subprocess.Popen(cmd, stderr=subprocess.STDOUT, + stdout=subprocess.PIPE).stdout + + +def args_get(): + """ read program parameters handle any necessary validation of input. """ + + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description=__doc__) + ## + # None is a special value indicating to read flows from stdin. + # This handles the case + # ovs-dpctl dump-flows | ovs-dpctl-flows.py + parser.add_argument("-v", "--version", version="@VERSION@", + action="version", help="show version") + parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None, + action="append", + help="file containing flows from ovs-dpctl dump-flow") + parser.add_argument("-V", "--verbose", dest="verbose", + default=logging.CRITICAL, + action="store_const", const=logging.DEBUG, + help="enable debug level verbosity") + parser.add_argument("-s", "--script", dest="top", action="store_false", + help="Run from a script (no user interface)") + parser.add_argument("--host", dest="host", + help="Specify a user@host for retrieving flows see" + "Accessing Remote Hosts for more information") + + parser.add_argument("-a", "--accumulate", dest="accumulate", + action="store_true", default=False, + help="Accumulate dump-flow content") + parser.add_argument("--accumulate-decay", dest="accumulateDecay", + default=5.0 * 60, type=float, + help="Decay old accumulated flows. " + "The default is 5 minutes. " + "A value of 0 disables decay.") + parser.add_argument("-d", "--delay", dest="delay", type=int, + default=1000, + help="Delay in milliseconds to collect dump-flow " + "content (sample rate).") + + args = parser.parse_args() + + logging.basicConfig(level=args.verbose) + + return args + +### +# Code to parse a single line in dump-flow +### +# key(values) +FIELDS_CMPND = re.compile("([\w]+)\((.+)\)") +# key:value +FIELDS_CMPND_ELEMENT = re.compile("([\w:]+)=([/\.\w:]+)") +FIELDS_ELEMENT = re.compile("([\w]+):([-\.\w]+)") + + +def flow_line_iter(line): + """ iterate over flow dump elements. + return tuples of (true, element) or (false, remaining element) + """ + # splits by , except for when in a (). Actions element was not + # split properly but we don't need it. + rc = [] + + element = "" + paren_count = 0 + + for ch in line: + if (ch == '('): + paren_count += 1 + elif (ch == ')'): + paren_count -= 1 + + if (ch == ' '): + # ignore white space. 
+ continue + elif ((ch == ',') and (paren_count == 0)): + rc.append(element) + element = "" + else: + element += ch + + if (paren_count): + raise ValueError(line) + else: + if (len(element) > 0): + rc.append(element) + return rc + + +def flow_line_compound_parse(compound): + """ Parse compound element + for example + src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03 + which is in + eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03) + """ + result = {} + for element in flow_line_iter(compound): + match = FIELDS_CMPND_ELEMENT.search(element) + if (match): + key = match.group(1) + value = match.group(2) + result[key] = value + + match = FIELDS_CMPND.search(element) + if (match): + key = match.group(1) + value = match.group(2) + result[key] = flow_line_compound_parse(value) + continue + + if (len(result.keys()) == 0): + return compound + return result + + +def flow_line_split(line): + """ Convert a flow dump line into ([fields], [stats], actions) tuple. + Where fields and stats are lists. + This function relies on a the following ovs-dpctl dump-flow + output characteristics: + 1. The dumpe flow line consists of a list of frame fields, list of stats + and action. + 2. list of frame fields, each stat and action field are delimited by ', '. + 3. That all other non stat field are not delimited by ', '. + + """ + + results = re.split(', ', line) + + (field, stats, action) = (results[0], results[1:-1], results[-1]) + + fields = flow_line_iter(field) + return (fields, stats, action) + + +def elements_to_dict(elements): + """ Convert line to a hierarchy of dictionaries. """ + result = {} + for element in elements: + match = FIELDS_CMPND.search(element) + if (match): + key = match.group(1) + value = match.group(2) + result[key] = flow_line_compound_parse(value) + continue + + match = FIELDS_ELEMENT.search(element) + if (match): + key = match.group(1) + value = match.group(2) + result[key] = value + else: + raise ValueError("can't parse >%s<" % element) + return result + + +# pylint: disable-msg=R0903 +class SumData(object): + """ Interface that all data going into SumDb must implement. + Holds the flow field and its corresponding count, total packets, + total bytes and calculates average. + + __repr__ is used as key into SumData singleton. + __str__ is used as human readable output. + """ + + def __init__(self, field_type, field, packets, flow_bytes, key): + # Count is the number of lines in the dump-flow log. + self.field_type = field_type + self.field = field + self.count = 1 + self.packets = int(packets) + self.bytes = int(flow_bytes) + self.key = key + + def decrement(self, decr_packets, decr_bytes, decr_count): + """ Decrement content to calculate delta from previous flow sample.""" + self.packets -= decr_packets + self.bytes -= decr_bytes + self.count -= decr_count + + def __iadd__(self, other): + """ Add two objects. """ + + if (self.key != other.key): + raise ValueError("adding two unrelated types") + + self.count += other.count + self.packets += other.packets + self.bytes += other.bytes + return self + + def __isub__(self, other): + """ Decrement two objects. """ + + if (self.key != other.key): + raise ValueError("adding two unrelated types") + + self.count -= other.count + self.packets -= other.packets + self.bytes -= other.bytes + return self + + def __getattr__(self, name): + """ Handle average. 
""" + if (name == "average"): + if (self.packets == 0): + return float(0.0) + else: + return float(self.bytes) / float(self.packets) + raise AttributeError(name) + + def __str__(self): + """ Used for debugging. """ + return "%s %s %s %s" % (self.field, self.count, + self.packets, self.bytes) + + def __repr__(self): + """ Used as key in the FlowDB table. """ + return self.key + + +def flow_aggregate(fields_dict, stats_dict): + """ Search for content in a line. + Passed the flow port of the dump-flows plus the current stats consisting + of packets, bytes, etc + """ + result = [] + + for output_format in OUTPUT_FORMAT: + field = fields_dict.get(output_format.field_type, None) + if (field): + obj = output_format.generator(output_format.field_type, + field, stats_dict) + result.append(obj) + + return result + + +def flows_read(ihdl, flow_db): + """ read flow content from ihdl and insert into flow_db. """ + + done = False + while (not done): + line = ihdl.readline() + if (len(line) == 0): + # end of input + break + + try: + flow_db.flow_line_add(line) + except ValueError, arg: + logging.error(arg) + + return flow_db + + +def get_terminal_size(): + """ + return column width and height of the terminal + """ + for fd_io in [0, 1, 2]: + try: + result = struct.unpack('hh', + fcntl.ioctl(fd_io, termios.TIOCGWINSZ, + '1234')) + except IOError: + result = None + continue + + if (result is None or result == (0, 0)): + # Maybe we can't get the width. In that case assume (25, 80) + result = (25, 80) + + return result + +## +# Content derived from: +# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin +## +SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'], + 1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']} + + +def approximate_size(size, a_kilobyte_is_1024_bytes=True): + """Convert a file size to human-readable form. + + Keyword arguments: + size -- file size in bytes + a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024 + if False, use multiples of 1000 + + Returns: string + + """ + size = float(size) + if size < 0: + raise ValueError('number must be non-negative') + + if (a_kilobyte_is_1024_bytes): + multiple = 1024 + else: + multiple = 1000 + for suffix in SUFFIXES[multiple]: + size /= multiple + if size < multiple: + return "%.1f %s" % (size, suffix) + + raise ValueError('number too large') + + +## +# End copied content +## +class ColMeta: + """ Concepts about columns. """ + def __init__(self, sortable, width): + self.sortable = sortable + self.width = width + + +class RowMeta: + """ How to render rows. """ + def __init__(self, label, fmt): + self.label = label + self.fmt = fmt + + +def fmt_packet(obj, width): + """ Provide a string for packets that is appropriate for output.""" + return str(obj.packets).rjust(width) + + +def fmt_count(obj, width): + """ Provide a string for average that is appropriate for output.""" + return str(obj.count).rjust(width) + + +def fmt_avg(obj, width): + """ Provide a string for average that is appropriate for output.""" + return str(int(obj.average)).rjust(width) + + +def fmt_field(obj, width): + """ truncate really long flow and insert ellipses to help make it + clear. + """ + + ellipses = " ... 
" + value = obj.field + if (len(obj.field) > width): + value = value[:(width - len(ellipses))] + ellipses + return value.ljust(width) + + +def fmt_bytes(obj, width): + """ Provide a string for average that is appropriate for output.""" + if (len(str(obj.bytes)) <= width): + value = str(obj.bytes) + else: + value = approximate_size(obj.bytes) + return value.rjust(width) + + +def title_center(value, width): + """ Center a column title.""" + return value.upper().center(width) + + +def title_rjust(value, width): + """ Right justify a column title. """ + return value.upper().rjust(width) + + +def column_picker(order, obj): + """ return the column as specified by order. """ + if (order == 1): + return obj.count + elif (order == 2): + return obj.packets + elif (order == 3): + return obj.bytes + elif (order == 4): + return obj.average + else: + raise ValueError("order outside of range %s" % order) + + +class Render: + """ Renders flow data. """ + def __init__(self, console_width): + """ Calculate column widths taking into account changes in format.""" + + self._start_time = datetime.datetime.now() + + self._cols = [ColMeta(False, 0), + ColMeta(True, Columns.VALUE_WIDTH), + ColMeta(True, Columns.VALUE_WIDTH), + ColMeta(True, Columns.VALUE_WIDTH), + ColMeta(True, Columns.VALUE_WIDTH)] + self._console_width = console_width + self.console_width_set(console_width) + + # Order in this array dictate the order of the columns. + # The 0 width for the first entry is a place holder. This is + # dynamically calculated. The first column is special. We need a + # way to indicate which field are presented. + self._descs = [RowMeta("", title_rjust), + RowMeta("", title_rjust), + RowMeta("", title_rjust), + RowMeta("", title_rjust), + RowMeta("", title_rjust)] + self._column_sort_select = 0 + self.column_select_event() + + self._titles = [ + RowMeta(Columns.FIELDS, title_center), + RowMeta(Columns.COUNT, title_rjust), + RowMeta(Columns.PACKETS, title_rjust), + RowMeta(Columns.BYTES, title_rjust), + RowMeta(Columns.AVERAGE, title_rjust) + ] + + self._datas = [ + RowMeta(None, fmt_field), + RowMeta(None, fmt_count), + RowMeta(None, fmt_packet), + RowMeta(None, fmt_bytes), + RowMeta(None, fmt_avg) + ] + + ## + # _field_types hold which fields are displayed in the field + # column, with the keyword all implying all fields. + ## + self._field_types = ["all"] + [ii.field_type for ii in OUTPUT_FORMAT] + + ## + # The default is to show all field types. + ## + self._field_type_select = -1 + self.field_type_toggle() + + def _field_type_select_get(self): + """ Return which field type to display. """ + return self._field_types[self._field_type_select] + + def field_type_toggle(self): + """ toggle which field types to show. """ + self._field_type_select += 1 + if (self._field_type_select >= len(self._field_types)): + self._field_type_select = 0 + value = Columns.FIELDS + " (%s)" % self._field_type_select_get() + self._titles[0].label = value + + def column_select_event(self): + """ Handles column select toggle. """ + + self._descs[self._column_sort_select].label = "" + for _ in range(len(self._cols)): + self._column_sort_select += 1 + if (self._column_sort_select >= len(self._cols)): + self._column_sort_select = 0 + + # Now look for the next sortable column + if (self._cols[self._column_sort_select].sortable): + break + self._descs[self._column_sort_select].label = "DESC" + + def console_width_set(self, console_width): + """ Adjust the output given the new console_width. 
""" + self._console_width = console_width + + spaces = len(self._cols) - 1 + ## + # Calculating column width can be tedious but important. The + # flow field value can be long. The goal here is to dedicate + # fixed column space for packets, bytes, average and counts. Give the + # remaining space to the flow column. When numbers get large + # transition output to output generated by approximate_size which + # limits output to ###.# XiB in other words 9 characters. + ## + # At this point, we know the maximum length values. We may + # truncate the flow column to get everything to fit. + self._cols[0].width = 0 + values_max_length = sum([ii.width for ii in self._cols]) + spaces + flow_max_length = console_width - values_max_length + self._cols[0].width = flow_max_length + + def format(self, flow_db): + """ shows flows based on --script parameter.""" + + rc = [] + ## + # Top output consists of + # Title + # Column title (2 rows) + # data + # statistics and status + + ## + # Title + ## + rc.append("Flow Summary".center(self._console_width)) + + stats = " Total: %(flow_total)s errors: %(flow_errors)s " % \ + flow_db.flow_stats_get() + accumulate = flow_db.accumulate_get() + if (accumulate): + stats += "Accumulate: on " + else: + stats += "Accumulate: off " + + duration = datetime.datetime.now() - self._start_time + stats += "Duration: %s " % str(duration) + rc.append(stats.ljust(self._console_width)) + + ## + # 2 rows for columns. + ## + # Indicate which column is in descending order. + rc.append(" ".join([ii.fmt(ii.label, col.width) + for (ii, col) in zip(self._descs, self._cols)])) + + rc.append(" ".join([ii.fmt(ii.label, col.width) + for (ii, col) in zip(self._titles, self._cols)])) + + ## + # Data. + ## + for dd in flow_db.field_values_in_order(self._field_type_select_get(), + self._column_sort_select): + rc.append(" ".join([ii.fmt(dd, col.width) + for (ii, col) in zip(self._datas, + self._cols)])) + + return rc + + +def curses_screen_begin(): + """ begin curses screen control. """ + stdscr = curses.initscr() + curses.cbreak() + curses.noecho() + stdscr.keypad(1) + return stdscr + + +def curses_screen_end(stdscr): + """ end curses screen control. """ + curses.nocbreak() + stdscr.keypad(0) + curses.echo() + curses.endwin() + + +class FlowDB: + """ Implements live vs accumulate mode. + + Flows are stored as key value pairs. The key consists of the content + prior to stat fields. The value portion consists of stats in a dictionary + form. + + @ \todo future add filtering here. + """ + def __init__(self, accumulate): + self._accumulate = accumulate + self._error_count = 0 + # Values are (stats, last update time.) + # The last update time is used for aging. + self._flow_lock = threading.Lock() + # This dictionary holds individual flows. + self._flows = {} + # This dictionary holds aggregate of flow fields. + self._fields = {} + + def accumulate_get(self): + """ Return the current accumulate state. """ + return self._accumulate + + def accumulate_toggle(self): + """ toggle accumulate flow behavior. """ + self._accumulate = not self._accumulate + + def begin(self): + """ Indicate the beginning of processing flow content. + if accumulate is false clear current set of flows. """ + + if (not self._accumulate): + self._flow_lock.acquire() + try: + self._flows.clear() + finally: + self._flow_lock.release() + self._fields.clear() + + def flow_line_add(self, line): + """ Split a line from a ovs-dpctl dump-flow into key and stats. 
+ The order of the content in the flow should be: + - flow content + - stats for the flow + - actions + + This method also assumes that the dump flow output does not + change order of fields of the same flow. + """ + + line = line.rstrip("\n") + (fields, stats, _) = flow_line_split(line) + + try: + fields_dict = elements_to_dict(fields) + + if (len(fields_dict) == 0): + raise ValueError("flow fields are missing %s", line) + + stats_dict = elements_to_dict(stats) + if (len(stats_dict) == 0): + raise ValueError("statistics are missing %s.", line) + + ## + # In accumulate mode, the Flow database can reach 10,000's of + # persistent flows. The interaction of the script with this many + # flows is too slow. Instead, delta are sent to the flow_db + # database allow incremental changes to be done in O(m) time + # where m is the current flow list, instead of iterating over + # all flows in O(n) time where n is the entire history of flows. + key = ",".join(fields) + + self._flow_lock.acquire() + try: + (stats_old_dict, _) = self._flows.get(key, (None, None)) + finally: + self._flow_lock.release() + + self.flow_event(fields_dict, stats_old_dict, stats_dict) + + except ValueError, arg: + logging.error(arg) + self._error_count += 1 + raise + + self._flow_lock.acquire() + try: + self._flows[key] = (stats_dict, datetime.datetime.now()) + finally: + self._flow_lock.release() + + def decay(self, decayTimeInSeconds): + """ Decay content. """ + now = datetime.datetime.now() + for (key, value) in self._flows.items(): + (stats_dict, updateTime) = value + delta = now - updateTime + + if (delta.seconds > decayTimeInSeconds): + self._flow_lock.acquire() + try: + del self._flows[key] + + fields_dict = elements_to_dict(flow_line_iter(key)) + matches = flow_aggregate(fields_dict, stats_dict) + for match in matches: + self.field_dec(match) + + finally: + self._flow_lock.release() + + def flow_stats_get(self): + """ Return statistics in a form of a dictionary. """ + rc = None + self._flow_lock.acquire() + try: + rc = {"flow_total": len(self._flows), + "flow_errors": self._error_count} + finally: + self._flow_lock.release() + return rc + + def field_types_get(self): + """ Return the set of types stored in the singleton. """ + types = set((ii.field_type for ii in self._fields.values())) + return types + + def field_add(self, data): + """ Collect dump-flow data to sum number of times item appears. """ + current = self._fields.get(repr(data), None) + if (current is None): + current = copy.copy(data) + else: + current += data + self._fields[repr(current)] = current + + def field_dec(self, data): + """ Collect dump-flow data to sum number of times item appears. """ + current = self._fields.get(repr(data), None) + if (current is None): + raise ValueError("decrementing field missing %s" % repr(data)) + + current -= data + self._fields[repr(current)] = current + if (current.count == 0): + del self._fields[repr(current)] + + def field_values_in_order(self, field_type_select, column_order): + """ Return a list of items in order maximum first. """ + values = self._fields.values() + if (field_type_select != "all"): + # If a field type other than "all" then reduce the list. + values = [ii for ii in values + if (ii.field_type == field_type_select)] + values = [(column_picker(column_order, ii), ii) for ii in values] + values.sort(key=operator.itemgetter(0)) + values.reverse() + values = [ii[1] for ii in values] + return values + + def flow_event(self, fields_dict, stats_old_dict, stats_new_dict): + """ Receives new flow information. 
""" + + # In order to avoid processing every flow at every sample + # period, changes in flow packet count is used to determine the + # delta in the flow statistics. This delta is used in the call + # to self.decrement prior to self.field_add + + if (stats_old_dict is None): + # This is a new flow + matches = flow_aggregate(fields_dict, stats_new_dict) + for match in matches: + self.field_add(match) + else: + old_packets = int(stats_old_dict.get("packets", 0)) + new_packets = int(stats_new_dict.get("packets", 0)) + if (old_packets == new_packets): + # ignore. same data. + pass + else: + old_bytes = stats_old_dict.get("bytes", 0) + # old_packets != new_packets + # if old_packets > new_packets then we end up decrementing + # packets and bytes. + matches = flow_aggregate(fields_dict, stats_new_dict) + for match in matches: + match.decrement(int(old_packets), int(old_bytes), 1) + self.field_add(match) + + +class DecayThread(threading.Thread): + """ Periodically call flow database to see if any flows are old. """ + def __init__(self, flow_db, interval): + """ Start decay thread. """ + threading.Thread.__init__(self) + + self._interval = max(1, interval) + self._min_interval = min(1, interval / 10) + self._flow_db = flow_db + self._event = threading.Event() + self._running = True + + self.daemon = True + + def run(self): + """ Worker thread which handles decaying accumulated flows. """ + + while(self._running): + self._event.wait(self._min_interval) + if (self._running): + self._flow_db.decay(self._interval) + + def stop(self): + """ Stop thread. """ + self._running = False + self._event.set() + ## + # Give the calling thread time to terminate but not too long. + # this thread is a daemon so the application will terminate if + # we timeout during the join. This is just a cleaner way to + # release resources. + self.join(2.0) + + +def flow_top_command(stdscr, render, flow_db): + """ Handle input while in top mode. """ + ch = stdscr.getch() + ## + # Any character will restart sampling. + if (ch == ord('h')): + # halt output. + ch = stdscr.getch() + while (ch == -1): + ch = stdscr.getch() + + if (ch == ord('s')): + # toggle which column sorts data in descending order. + render.column_select_event() + elif (ch == ord('a')): + flow_db.accumulate_toggle() + elif (ch == ord('f')): + render.field_type_toggle() + elif (ch == ord(' ')): + # resample + pass + + return ch + + +def decay_timer_start(flow_db, accumulateDecay): + """ If accumulateDecay greater than zero then start timer. """ + if (accumulateDecay > 0): + decay_timer = DecayThread(flow_db, accumulateDecay) + decay_timer.start() + return decay_timer + else: + return None + + +def flows_top(args): + """ handles top like behavior when --script is not specified. 
""" + + flow_db = FlowDB(args.accumulate) + render = Render(0) + + decay_timer = decay_timer_start(flow_db, args.accumulateDecay) + lines = [] + + try: + stdscr = curses_screen_begin() + try: + ch = 'X' + #stdscr.nodelay(1) + stdscr.timeout(args.delay) + + while (ch != ord('q')): + flow_db.begin() + + try: + ihdl = top_input_get(args) + try: + flows_read(ihdl, flow_db) + finally: + ihdl.close() + except OSError, arg: + logging.critical(arg) + break + + (console_height, console_width) = stdscr.getmaxyx() + render.console_width_set(console_width) + + output_height = console_height - 1 + line_count = range(output_height) + line_output = render.format(flow_db) + lines = zip(line_count, line_output[:output_height]) + + stdscr.erase() + for (count, line) in lines: + stdscr.addstr(count, 0, line[:console_width]) + stdscr.refresh() + + ch = flow_top_command(stdscr, render, flow_db) + + finally: + curses_screen_end(stdscr) + except KeyboardInterrupt: + pass + if (decay_timer): + decay_timer.stop() + + # repeat output + for (count, line) in lines: + print line + + +def flows_script(args): + """ handles --script option. """ + + flow_db = FlowDB(args.accumulate) + flow_db.begin() + + if (args.flowFiles is None): + logging.info("reading flows from stdin") + ihdl = os.fdopen(sys.stdin.fileno(), 'r', 0) + try: + flow_db = flows_read(ihdl, flow_db) + finally: + ihdl.close() + else: + for flowFile in args.flowFiles: + logging.info("reading flows from %s", flowFile) + ihdl = open(flowFile, "r") + try: + flow_db = flows_read(ihdl, flow_db) + finally: + ihdl.close() + + (_, console_width) = get_terminal_size() + render = Render(console_width) + + for line in render.format(flow_db): + print line + + +def main(): + """ Return 0 on success or 1 on failure. + + Algorithm + There are four stages to the process ovs-dpctl dump-flow content. + 1. Retrieve current input + 2. store in FlowDB and maintain history + 3. Iterate over FlowDB and aggregating stats for each flow field + 4. present data. + + Retrieving current input is currently trivial, the ovs-dpctl dump-flow + is called. Future version will have more elaborate means for collecting + dump-flow content. FlowDB returns all data as in the form of a hierarchical + dictionary. Input will vary. + + In the case of accumulate mode, flows are not purged from the FlowDB + manager. Instead at the very least, merely the latest statistics are + kept. In the case, of live output the FlowDB is purged prior to sampling + data. + + Aggregating results requires identify flow fields to aggregate out + of the flow and summing stats. + + """ + args = args_get() + + try: + if (args.top): + flows_top(args) + else: + flows_script(args) + except KeyboardInterrupt: + return 1 + return 0 + +if __name__ == '__main__': + sys.exit(main()) +elif __name__ == 'ovs-dpctl-top': + # pylint: disable-msg=R0915 + + ## + # Test case beyond this point. + # pylint: disable-msg=R0904 + class TestsuiteFlowParse(unittest.TestCase): + """ + parse flow into hierarchy of dictionaries. + """ + def test_flow_parse(self): + """ test_flow_parse. 
""" + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\ + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\ + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\ + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\ + "udp(src=61252,dst=5355), packets:1, bytes:92, "\ + "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\ + "38,41,44,47,50,53,56,59,62,65" + + (fields, stats, _) = flow_line_split(line) + flow_dict = elements_to_dict(fields + stats) + self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8") + self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03") + self.assertEqual(flow_dict["ipv6"]["src"], + "fe80::55bf:fe42:bc96:2812") + self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3") + self.assertEqual(flow_dict["packets"], "1") + self.assertEqual(flow_dict["bytes"], "92") + + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\ + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\ + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\ + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\ + "udp(src=61252,dst=5355), packets:1, bytes:92, "\ + "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\ + "38,41,44,47,50,53,56,59,62,65" + + (fields, stats, _) = flow_line_split(line) + flow_dict = elements_to_dict(fields + stats) + self.assertEqual(flow_dict["used"], "-0.703s") + self.assertEqual(flow_dict["packets"], "1") + self.assertEqual(flow_dict["bytes"], "92") + + def test_flow_sum(self): + """ test_flow_sum. """ + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\ + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\ + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\ + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\ + "udp(src=61252,dst=5355), packets:2, bytes:92, "\ + "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\ + "38,41,44,47,50,53,56,59,62,65" + + (fields, stats, _) = flow_line_split(line) + stats_dict = elements_to_dict(stats) + fields_dict = elements_to_dict(fields) + ## + # Test simple case of one line. + flow_db = FlowDB(False) + matches = flow_aggregate(fields_dict, stats_dict) + for match in matches: + flow_db.field_add(match) + + flow_types = flow_db.field_types_get() + expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"] + self.assert_(len(flow_types) == len(expected_flow_types)) + for flow_type in flow_types: + self.assertTrue(flow_type in expected_flow_types) + + for flow_type in flow_types: + sum_value = flow_db.field_values_in_order("all", 1) + self.assert_(len(sum_value) == 5) + self.assert_(sum_value[0].packets == 2) + self.assert_(sum_value[0].count == 1) + self.assert_(sum_value[0].bytes == 92) + + ## + # Add line again just to see counts go up. + matches = flow_aggregate(fields_dict, stats_dict) + for match in matches: + flow_db.field_add(match) + + flow_types = flow_db.field_types_get() + self.assert_(len(flow_types) == len(expected_flow_types)) + for flow_type in flow_types: + self.assertTrue(flow_type in expected_flow_types) + + for flow_type in flow_types: + sum_value = flow_db.field_values_in_order("all", 1) + self.assert_(len(sum_value) == 5) + self.assert_(sum_value[0].packets == 4) + self.assert_(sum_value[0].count == 2) + self.assert_(sum_value[0].bytes == 2 * 92) + + def test_assoc_list(self): + """ test_assoc_list. 
""" + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\ + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\ + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\ + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\ + "udp(src=61252,dst=5355), packets:2, bytes:92, "\ + "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\ + "38,41,44,47,50,53,56,59,62,65" + + valid_flows = [ + 'eth_type(0x86dd)', + 'udp(dst=5355)', + 'in_port(4)', + 'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)', + 'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)' + ] + + (fields, stats, _) = flow_line_split(line) + stats_dict = elements_to_dict(stats) + fields_dict = elements_to_dict(fields) + + ## + # Test simple case of one line. + flow_db = FlowDB(False) + matches = flow_aggregate(fields_dict, stats_dict) + for match in matches: + flow_db.field_add(match) + + for sum_value in flow_db.field_values_in_order("all", 1): + assoc_list = Columns.assoc_list(sum_value) + for item in assoc_list: + if (item[0] == "fields"): + self.assertTrue(item[1] in valid_flows) + elif (item[0] == "packets"): + self.assertTrue(item[1] == 2) + elif (item[0] == "count"): + self.assertTrue(item[1] == 1) + elif (item[0] == "average"): + self.assertTrue(item[1] == 46.0) + elif (item[0] == "bytes"): + self.assertTrue(item[1] == 92) + else: + raise ValueError("unknown %s", item[0]) + + def test_human_format(self): + """ test_assoc_list. """ + + self.assertEqual(approximate_size(0.0), "0.0 KiB") + self.assertEqual(approximate_size(1024), "1.0 KiB") + self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB") + self.assertEqual(approximate_size((1024 * 1024) + 100000), + "1.1 MiB") + value = (1024 * 1024 * 1024) + 100000000 + self.assertEqual(approximate_size(value), "1.1 GiB") + + def test_flow_line_split(self): + """ Splitting a flow line is not trivial. + There is no clear delimiter. Comma is used liberally.""" + expected_fields = ["in_port(4)", + "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)", + "eth_type(0x86dd)", + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3," + "label=0,proto=17,tclass=0,hlimit=1,frag=no)", + "udp(src=61252,dst=5355)"] + expected_stats = ["packets:2", "bytes:92", "used:0.703s"] + expected_actions = "actions:3,8,11,14,17,20,23,26,29,32,35," \ + "38,41,44,47,50,53,56,59,62,65" + + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\ + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\ + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\ + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\ + "udp(src=61252,dst=5355), packets:2, bytes:92, "\ + "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\ + "38,41,44,47,50,53,56,59,62,65" + + (fields, stats, actions) = flow_line_split(line) + + self.assertEqual(fields, expected_fields) + self.assertEqual(stats, expected_stats) + self.assertEqual(actions, expected_actions) + + def test_accumulate_decay(self): + """ test_accumulate_decay: test accumulated decay. 
""" + lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b," + "dst=ff:ff:ff:ff:ff:ff)," + "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255," + "tip=10.24.104.230/255.255.255.255,op=1/0xff," + "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00," + "tha=00:00:00:00:00:00/00:00:00:00:00:00), " + "packets:1, bytes:120, used:0.004s, actions:1"] + + flow_db = FlowDB(True) + flow_db.begin() + flow_db.flow_line_add(lines[0]) + + # Make sure we decay + time.sleep(4) + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1) + flow_db.decay(1) + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0) + + flow_db.flow_line_add(lines[0]) + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1) + flow_db.decay(30) + # Should not be deleted. + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1) + + flow_db.flow_line_add(lines[0]) + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1) + timer = decay_timer_start(flow_db, 2) + time.sleep(10) + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0) + timer.stop() + + def test_accumulate(self): + """ test_accumulate test that FlowDB supports accumulate. """ + + lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b," + "dst=ff:ff:ff:ff:ff:ff)," + "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255," + "tip=10.24.104.230/255.255.255.255,op=1/0xff," + "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00," + "tha=00:00:00:00:00:00/00:00:00:00:00:00), " + "packets:1, bytes:120, used:0.004s, actions:1", + "in_port(2)," + "eth(src=68:ef:bd:25:ef:c0,dst=33:33:00:00:00:66)," + "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::," + "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0," + "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029), " + "packets:2, bytes:5026, used:0.348s, actions:1", + "in_port(1),eth(src=ee:ee:ee:ee:ee:ee," + "dst=ff:ff:ff:ff:ff:ff)," + "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255," + "tip=10.24.104.230/255.255.255.255,op=1/0xff," + "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00," + "tha=00:00:00:00:00:00/00:00:00:00:00:00), packets:2, " + "bytes:240, used:0.004s, actions:1"] + + lines = [ + "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1", + "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1", + "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1", + "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1", + "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1", + "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1", + ] + + # Turn on accumulate. + flow_db = FlowDB(True) + flow_db.begin() + + flow_db.flow_line_add(lines[0]) + + # Test one flow exist. + sum_values = flow_db.field_values_in_order("all", 1) + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 1) + self.assertEqual(in_ports[0].bytes, 120) + self.assertEqual(in_ports[0].count, 1) + + # simulate another sample + # Test two different flows exist. 
+ flow_db.begin() + flow_db.flow_line_add(lines[1]) + sum_values = flow_db.field_values_in_order("all", 1) + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 1) + self.assertEqual(in_ports[0].bytes, 120) + self.assertEqual(in_ports[0].count, 1) + + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 2) + self.assertEqual(in_ports[0].bytes, 126) + self.assertEqual(in_ports[0].count, 1) + + # Test first flow increments packets. + flow_db.begin() + flow_db.flow_line_add(lines[2]) + sum_values = flow_db.field_values_in_order("all", 1) + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 2) + self.assertEqual(in_ports[0].bytes, 240) + self.assertEqual(in_ports[0].count, 1) + + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 2) + self.assertEqual(in_ports[0].bytes, 126) + self.assertEqual(in_ports[0].count, 1) + + # Test third flow but with the same in_port(1) as the first flow. + flow_db.begin() + flow_db.flow_line_add(lines[3]) + sum_values = flow_db.field_values_in_order("all", 1) + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 3) + self.assertEqual(in_ports[0].bytes, 360) + self.assertEqual(in_ports[0].count, 2) + + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 2) + self.assertEqual(in_ports[0].bytes, 126) + self.assertEqual(in_ports[0].count, 1) + + # Third flow has changes. + flow_db.begin() + flow_db.flow_line_add(lines[4]) + sum_values = flow_db.field_values_in_order("all", 1) + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 4) + self.assertEqual(in_ports[0].bytes, 480) + self.assertEqual(in_ports[0].count, 2) + + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 2) + self.assertEqual(in_ports[0].bytes, 126) + self.assertEqual(in_ports[0].count, 1) + + # First flow reset. + flow_db.begin() + flow_db.flow_line_add(lines[5]) + sum_values = flow_db.field_values_in_order("all", 1) + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 3) + self.assertEqual(in_ports[0].bytes, 360) + self.assertEqual(in_ports[0].count, 2) + + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 2) + self.assertEqual(in_ports[0].bytes, 126) + self.assertEqual(in_ports[0].count, 1) + + def test_parse_character_errors(self): + """ test_parsing errors. + The flow parses is purposely loose. Its not designed to validate + input. Merely pull out what it can but there are situations + that a parse error can be detected. 
+ """ + + lines = ["complete garbage", + "in_port(2),eth(src=68:ef:bd:25:ef:c0," + "dst=33:33:00:00:00:66)," + "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::," + "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0," + "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029)," + "packets:2,bytes:5026,actions:1"] + + flow_db = FlowDB(False) + flow_db.begin() + for line in lines: + try: + flow_db.flow_line_add(line) + except ValueError: + # We want an exception. That is how we know we have + # correctly found a simple parsing error. We are not + # looking to validate flow output just catch simple issues. + continue + self.assertTrue(False) + + def test_tunnel_parsing(self): + """ test_tunnel_parsing test parse flows with tunnel. """ + lines = [ + "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10," + "tos=0x0,ttl=64,flags(key)),in_port(1)," + "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30)," + "eth_type(0x8902), packets:6, bytes:534, used:0.128s, " + "actions:userspace(pid=4294962691,slow_path(cfm))" + ] + flow_db = FlowDB(False) + flow_db.begin() + flow_db.flow_line_add(lines[0]) + sum_values = flow_db.field_values_in_order("all", 1) + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")] + self.assertEqual(len(in_ports), 1) + self.assertEqual(in_ports[0].packets, 6) + self.assertEqual(in_ports[0].bytes, 534) + self.assertEqual(in_ports[0].count, 1) + + def test_flow_multiple_paren(self): + """ test_flow_multiple_paren. """ + line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)" + valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))", + "in_port(2)"] + rc = flow_line_iter(line) + self.assertEqual(valid, rc) + + def test_to_network(self): + """ test_to_network test ipv4_to_network and ipv6_to_network. """ + ipv4s = [ + ("192.168.0.1", "192.168.0.1"), + ("192.168.0.1/255.255.255.255", "192.168.0.1"), + ("192.168.0.1/255.255.255.0", "192.168.0.0"), + ("192.168.0.1/255.255.0.0", "192.168.0.0"), + ("192.168.0.1/255.0.0.0", "192.0.0.0"), + ("192.168.0.1/0.0.0.0", "0.0.0.0"), + ("10.24.106.230/255.255.255.255", "10.24.106.230"), + ("10.24.106.230/255.255.255.0", "10.24.106.0"), + ("10.24.106.0/255.255.255.0", "10.24.106.0"), + ("10.24.106.0/255.255.252.0", "10.24.104.0") + ] + + ipv6s = [ + ("1::192:168:0:1", "1::192:168:0:1"), + ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"), + ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"), + ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"), + ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"), + ("1::192:168:0:1/1::0:0:0:0", "1::"), + ("1::192:168:0:1/::", "::") + ] + + for (ipv4_test, ipv4_check) in ipv4s: + self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check) + + for (ipv6_test, ipv6_check) in ipv6s: + self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check) diff --git a/xenserver/openvswitch-xen.spec.in b/xenserver/openvswitch-xen.spec.in index 4d3b8fa6a..87efd8887 100644 --- a/xenserver/openvswitch-xen.spec.in +++ b/xenserver/openvswitch-xen.spec.in @@ -426,6 +426,7 @@ exit 0 /usr/sbin/ovsdb-server /usr/bin/ovs-appctl /usr/bin/ovs-dpctl +/usr/bin/ovs-dpctl-top /usr/bin/ovs-ofctl /usr/bin/ovs-parse-backtrace /usr/bin/ovs-pcap @@ -443,6 +444,7 @@ exit 0 /usr/share/man/man8/ovs-bugtool.8.gz /usr/share/man/man8/ovs-ctl.8.gz /usr/share/man/man8/ovs-dpctl.8.gz +/usr/share/man/man8/ovs-dpctl-top.8.gz /usr/share/man/man8/ovs-ofctl.8.gz /usr/share/man/man8/ovs-parse-backtrace.8.gz /usr/share/man/man1/ovs-pcap.1.gz
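A note on how the parser in the patch splits a dump-flows line: flow_line_split() first breaks the line on ", " to separate the frame fields from the statistics and the actions, and flow_line_iter() then splits the field portion only on commas that sit outside parentheses, so compound fields such as eth(src=...,dst=...) stay intact. The sketch below is a standalone illustration of that splitting rule, not the script's own code; the helper name split_outside_parens is made up, and the flow line is a shortened version of the one used in test_flow_parse.

import re

def split_outside_parens(text):
    """Split on commas that are not inside parentheses (mirrors flow_line_iter)."""
    parts, element, depth = [], "", 0
    for ch in text:
        if ch == '(':
            depth += 1
        elif ch == ')':
            depth -= 1
        if ch == ',' and depth == 0:
            parts.append(element)
            element = ""
        elif ch != ' ':
            element += ch
    if element:
        parts.append(element)
    return parts

line = ("in_port(4),eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03),"
        "eth_type(0x86dd),udp(src=61252,dst=5355), "
        "packets:1, bytes:92, used:0.703s, actions:3")

chunks = re.split(', ', line)
fields = split_outside_parens(chunks[0])  # four frame fields, eth() kept whole
stats = chunks[1:-1]                      # ['packets:1', 'bytes:92', 'used:0.703s']
actions = chunks[-1]                      # 'actions:3'
print(fields)

Run as-is, the sketch prints the four frame fields; elements_to_dict() in the script then turns each of them into nested dictionaries keyed by field name.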
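The mega-flow masking mentioned in the commit message is handled by ipv4_to_network() and ipv6_to_network(): when a field value carries an address/mask pair, the mask is ANDed into the address before the value is used as an aggregation key (element_ipv4_get() keeps the unmasked text separately for display). Below is a minimal sketch of the same arithmetic for IPv4 only; ipv4_apply_mask is an illustrative name, and it packs the address as one 32-bit word rather than the two 16-bit words the script uses. The sample values come from test_to_network.

import socket
import struct

def ipv4_apply_mask(ip_str):
    """Return the network part of 'addr/mask'; plain addresses pass through."""
    try:
        (addr, mask) = ip_str.split("/")
    except ValueError:
        return ip_str  # no mask present
    addr_n = struct.unpack("!I", socket.inet_pton(socket.AF_INET, addr))[0]
    mask_n = struct.unpack("!I", socket.inet_pton(socket.AF_INET, mask))[0]
    return socket.inet_ntop(socket.AF_INET, struct.pack("!I", addr_n & mask_n))

print(ipv4_apply_mask("10.24.106.230"))              # 10.24.106.230
print(ipv4_apply_mask("10.24.106.0/255.255.252.0"))  # 10.24.104.0

Because masks are applied before aggregation, two mega-flows that differ only within the masked-out bits collapse into a single FIELDS row in the output.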
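Accumulate mode, exercised by test_accumulate above, avoids rescanning every stored flow on each sample. flow_event() compares a flow's new packet count with the count from the previous sample and feeds only the delta into the aggregated per-field counters; a packet count that went down is treated as a reset and simply produces a negative delta. This is the O(m)-versus-O(n) point made in the comment inside flow_line_add(). The sketch below reduces each flow to a (flow key, in_port field, packet count) tuple, mirroring the trimmed lines used in test_accumulate; the function name accumulate is illustrative, not part of the script.

def accumulate(samples):
    """Aggregate packets per in_port, adding only the delta between samples."""
    last = {}    # full flow key -> packet count seen in the previous sample
    totals = {}  # in_port field -> accumulated packet total
    for (key, port, packets) in samples:
        delta = packets - last.get(key, 0)  # negative when a flow was reset
        last[key] = packets
        totals[port] = totals.get(port, 0) + delta
    return totals

samples = [
    ("in_port(1),eth_type(0x0806)", "in_port(1)", 1),
    ("in_port(2),eth_type(0x0806)", "in_port(2)", 2),
    ("in_port(1),eth_type(0x0806)", "in_port(1)", 2),
    ("in_port(1),eth_type(0x0800)", "in_port(1)", 1),
    ("in_port(1),eth_type(0x0800)", "in_port(1)", 2),
    ("in_port(1),eth_type(0x0806)", "in_port(1)", 1),  # reset: count dropped
]
print(accumulate(samples))  # in_port(1) -> 3, in_port(2) -> 2, as in the test

The decay path builds on the same per-flow keys: decay() drops any flow that has not been refreshed within the accumulate-decay interval and calls field_dec() for its aggregated fields.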