Initial implementation of sFlow.
authorBen Pfaff <blp@nicira.com>
Mon, 4 Jan 2010 21:08:37 +0000 (13:08 -0800)
committerBen Pfaff <blp@nicira.com>
Mon, 4 Jan 2010 21:08:37 +0000 (13:08 -0800)
Tested very slightly with "ping" and "sflowtool -t | tcpdump -r -".

23 files changed:
README
datapath/actions.c
datapath/datapath.c
datapath/datapath.h
include/openvswitch/datapath-protocol.h
lib/dpif-linux.c
lib/dpif-netdev.c
lib/dpif-provider.h
lib/dpif.c
lib/dpif.h
lib/vlog-modules.def
ofproto/automake.mk
ofproto/collectors.c
ofproto/collectors.h
ofproto/ofproto-sflow.c [new file with mode: 0644]
ofproto/ofproto-sflow.h [new file with mode: 0644]
ofproto/ofproto.c
ofproto/ofproto.h
utilities/automake.mk
vswitchd/automake.mk
vswitchd/bridge.c
vswitchd/ovs-vswitchd.8.in
vswitchd/ovs-vswitchd.conf.5.in

diff --git a/README b/README
index 7871c76..dd85be5 100644 (file)
--- a/README
+++ b/README
@@ -6,8 +6,8 @@ What is Open vSwitch?
 Open vSwitch is a multilayer software switch licensed under the open
 source Apache 2 license.  Our goal is to implement a production
 quality switch platform that supports standard management interfaces
-(e.g. NetFlow, RSPAN, ERSPAN, IOS-like CLI), and opens the forwarding
-functions to programmatic extension and control.
+(e.g. NetFlow, sFlow, RSPAN, ERSPAN, IOS-like CLI), and opens the
+forwarding functions to programmatic extension and control.
 
 Open vSwitch is well suited to function as a virtual switch in VM
 environments.  In addition to exposing standard control and visibility
@@ -20,7 +20,8 @@ The bulk of the code is written in platform-independent C and is
 easily ported to other environments.  The current release of Open
 vSwitch supports the following features:
 
-    * Visibility into inter-VM communication via NetFlow, SPAN, and RSPAN
+    * Visibility into inter-VM communication via NetFlow, sFlow, SPAN,
+      and RSPAN
     * Standard 802.1Q VLAN model with trunking
     * Per VM policing
     * NIC bonding with source-MAC load balancing
index cadab05..7c618cc 100644 (file)
@@ -366,6 +366,33 @@ output_control(struct datapath *dp, struct sk_buff *skb, u32 arg, gfp_t gfp)
        return dp_output_control(dp, skb, _ODPL_ACTION_NR, arg);
 }
 
+/* Send a copy of this packet up to the sFlow agent, along with extra
+ * information about what happened to it. */
+static void sflow_sample(struct datapath *dp, struct sk_buff *skb,
+                        const union odp_action *a, int n_actions, gfp_t gfp)
+{
+       struct odp_sflow_sample_header *hdr;
+       unsigned int actlen = n_actions * sizeof(union odp_action);
+       unsigned int hdrlen = sizeof(struct odp_sflow_sample_header);
+       struct sk_buff *nskb;
+       int i;
+
+       nskb = skb_copy_expand(skb, actlen + hdrlen, 0, gfp);
+       if (!nskb)
+               return;
+
+       memcpy(__skb_push(nskb, actlen), a, actlen);
+       hdr = (struct odp_sflow_sample_header*)__skb_push(nskb, hdrlen);
+       hdr->n_actions = n_actions;
+       hdr->sample_pool = 0;
+       for_each_possible_cpu (i) {
+               const struct dp_stats_percpu *stats;
+               stats = per_cpu_ptr(dp->stats_percpu, i);
+               hdr->sample_pool += stats->sflow_pool;
+       }
+       dp_output_control(dp, nskb, _ODPL_SFLOW_NR, 0);
+}
+
 /* Execute a list of actions against 'skb'. */
 int execute_actions(struct datapath *dp, struct sk_buff *skb,
                    struct odp_flow_key *key,
@@ -378,6 +405,19 @@ int execute_actions(struct datapath *dp, struct sk_buff *skb,
         * is slightly obscure just to avoid that. */
        int prev_port = -1;
        int err;
+
+       if (dp->sflow_probability) {
+               /* Increment sample pool. */
+               int cpu = get_cpu();
+               per_cpu_ptr(dp->stats_percpu, cpu)->sflow_pool++;
+               put_cpu();
+
+               /* Sample packet. */
+               if (dp->sflow_probability == UINT_MAX ||
+                   net_random() < dp->sflow_probability)
+                       sflow_sample(dp, skb, a, n_actions, gfp);
+       }
+
        for (; n_actions > 0; a++, n_actions--) {
                WARN_ON_ONCE(skb_shared(skb));
                if (prev_port != -1) {
index 2a8fb50..f6a02f7 100644 (file)
@@ -715,8 +715,7 @@ dp_output_control(struct datapath *dp, struct sk_buff *skb, int queue_no,
        int err;
 
        WARN_ON_ONCE(skb_shared(skb));
-       BUG_ON(queue_no != _ODPL_MISS_NR && queue_no != _ODPL_ACTION_NR);
-
+       BUG_ON(queue_no != _ODPL_MISS_NR && queue_no != _ODPL_ACTION_NR && queue_no != _ODPL_SFLOW_NR);
        queue = &dp->queues[queue_no];
        err = -ENOBUFS;
        if (skb_queue_len(queue) >= DP_MAX_QUEUE_LEN)
@@ -1393,6 +1392,7 @@ static long openvswitch_ioctl(struct file *f, unsigned int cmd,
        int dp_idx = iminor(f->f_dentry->d_inode);
        struct datapath *dp;
        int drop_frags, listeners, port_no;
+       unsigned int sflow_probability;
        int err;
 
        /* Handle commands with special locking requirements up front. */
@@ -1456,6 +1456,16 @@ static long openvswitch_ioctl(struct file *f, unsigned int cmd,
                set_listen_mask(f, listeners);
                break;
 
+       case ODP_GET_SFLOW_PROBABILITY:
+               err = put_user(dp->sflow_probability, (unsigned int __user *)argp);
+               break;
+
+       case ODP_SET_SFLOW_PROBABILITY:
+               err = get_user(sflow_probability, (unsigned int __user *)argp);
+               if (!err)
+                       dp->sflow_probability = sflow_probability;
+               break;
+
        case ODP_PORT_QUERY:
                err = query_port(dp, (struct odp_port __user *)argp);
                break;
index 9b4c438..929b9d8 100644 (file)
@@ -79,7 +79,7 @@ struct dp_bucket {
        struct sw_flow *flows[];
 };
 
-#define DP_N_QUEUES 2
+#define DP_N_QUEUES 3
 #define DP_MAX_QUEUE_LEN 100
 
 struct dp_stats_percpu {
@@ -87,6 +87,7 @@ struct dp_stats_percpu {
        u64 n_hit;
        u64 n_missed;
        u64 n_lost;
+       u64 sflow_pool;         /* Packets that could have been sampled. */
 };
 
 struct dp_port_group {
@@ -95,10 +96,29 @@ struct dp_port_group {
        u16 ports[];
 };
 
+/**
+ * struct datapath - datapath for flow-based packet switching
+ * @mutex: Mutual exclusion for ioctls.
+ * @dp_idx: Datapath number (index into the dps[] array in datapath.c).
+ * @ifobj: &struct kobject representing the datapath.
+ * @drop_frags: Drop all IP fragments if nonzero.
+ * @queues: %DP_N_QUEUES sets of queued packets for userspace to handle.
+ * @waitqueue: Waitqueue, for waiting for new packets in @queues.
+ * @n_flows: Number of flows currently in flow table.
+ * @table: Current flow table (RCU protected).
+ * @groups: Port groups, used by ODPAT_OUTPUT_GROUP action (RCU protected).
+ * @n_ports: Number of ports currently in @ports.
+ * @ports: Map from port number to &struct net_bridge_port.  %ODPP_LOCAL port
+ * always exists, other ports may be %NULL.
+ * @port_list: List of all ports in @ports in arbitrary order.
+ * @stats_percpu: Per-CPU datapath statistics.
+ * @sflow_probability: Probability of sampling a packet to the %ODPL_SFLOW
+ * queue, where 0 means never sample, UINT_MAX means always sample, and
+ * other values are intermediate probabilities.
+ */
 struct datapath {
        struct mutex mutex;
        int dp_idx;
-
        struct kobject ifobj;
 
        int drop_frags;
@@ -117,10 +137,13 @@ struct datapath {
        /* Switch ports. */
        unsigned int n_ports;
        struct net_bridge_port *ports[DP_MAX_PORTS];
-       struct list_head port_list; /* All ports, including local_port. */
+       struct list_head port_list;
 
        /* Stats. */
        struct dp_stats_percpu *stats_percpu;
+
+       /* sFlow Sampling */
+       unsigned int sflow_probability;
 };
 
 struct net_bridge_port {
index ab7eb9e..2ae0c82 100644 (file)
@@ -77,6 +77,9 @@
 
 #define ODP_EXECUTE             _IOR('O', 18, struct odp_execute)
 
+#define ODP_SET_SFLOW_PROBABILITY _IOR('O', 20, int)
+#define ODP_GET_SFLOW_PROBABILITY _IOW('O', 21, int)
+
 struct odp_stats {
     /* Flows. */
     __u32 n_flows;              /* Number of flows in flow table. */
@@ -98,6 +101,7 @@ struct odp_stats {
     /* Queues. */
     __u16 max_miss_queue;       /* Max length of ODPL_MISS queue. */
     __u16 max_action_queue;     /* Max length of ODPL_ACTION queue. */
+    __u16 max_sflow_queue;      /* Max length of ODPL_SFLOW queue. */
 };
 
 /* Logical ports. */
@@ -109,7 +113,9 @@ struct odp_stats {
 #define ODPL_MISS       (1 << _ODPL_MISS_NR)
 #define _ODPL_ACTION_NR 1       /* Packet output to ODPP_CONTROLLER. */
 #define ODPL_ACTION     (1 << _ODPL_ACTION_NR)
-#define ODPL_ALL        (ODPL_MISS | ODPL_ACTION)
+#define _ODPL_SFLOW_NR  2       /* sFlow samples. */
+#define ODPL_SFLOW      (1 << _ODPL_SFLOW_NR)
+#define ODPL_ALL        (ODPL_MISS | ODPL_ACTION | ODPL_SFLOW)
 
 /* Format of messages read from datapath fd. */
 struct odp_msg {
@@ -118,7 +124,23 @@ struct odp_msg {
     __u16 port;                 /* Port on which frame was received. */
     __u16 reserved;
     __u32 arg;                  /* Argument value specified in action. */
-    /* Followed by packet data. */
+
+    /*
+     * Followed by:
+     *
+     * ODPL_MISS, ODPL_ACTION: packet data.
+     *
+     * ODPL_SFLOW: "struct odp_sflow_sample_header", followed by
+     *   an array of "union odp_action"s, followed by packet data.
+     */
+};
+
+/* Header added to sFlow sampled packet. */
+struct odp_sflow_sample_header {
+    __u64 sample_pool;          /* Number of potentially sampled packets. */
+    __u32 n_actions;            /* Number of following "union odp_action"s. */
+    __u32 reserved;             /* Pad up to 64-bit boundary. */
+    /* Followed by n_action "union odp_action"s. */
 };
 
 #define ODP_PORT_INTERNAL (1 << 0) /* This port is simulated. */
index 2bf329f..b7ba306 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009 Nicira Networks.
+ * Copyright (c) 2008, 2009, 2010 Nicira Networks.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -196,6 +196,7 @@ dpif_linux_delete(struct dpif *dpif_)
 static int
 dpif_linux_get_stats(const struct dpif *dpif_, struct odp_stats *stats)
 {
+    memset(stats, 0, sizeof *stats);
     return do_ioctl(dpif_, ODP_DP_STATS, stats);
 }
 
@@ -395,6 +396,19 @@ dpif_linux_recv_set_mask(struct dpif *dpif_, int listen_mask)
     return do_ioctl(dpif_, ODP_SET_LISTEN_MASK, &listen_mask);
 }
 
+static int
+dpif_linux_get_sflow_probability(const struct dpif *dpif_,
+                                 uint32_t *probability)
+{
+    return do_ioctl(dpif_, ODP_GET_SFLOW_PROBABILITY, probability);
+}
+
+static int
+dpif_linux_set_sflow_probability(struct dpif *dpif_, uint32_t probability)
+{
+    return do_ioctl(dpif_, ODP_SET_SFLOW_PROBABILITY, &probability);
+}
+
 static int
 dpif_linux_recv(struct dpif *dpif_, struct ofpbuf **bufp)
 {
@@ -475,6 +489,8 @@ const struct dpif_class dpif_linux_class = {
     dpif_linux_execute,
     dpif_linux_recv_get_mask,
     dpif_linux_recv_set_mask,
+    dpif_linux_get_sflow_probability,
+    dpif_linux_set_sflow_probability,
     dpif_linux_recv,
     dpif_linux_recv_wait,
 };
index c4b5a99..720e8cb 100644 (file)
@@ -1333,6 +1333,8 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_execute,
     dpif_netdev_recv_get_mask,
     dpif_netdev_recv_set_mask,
+    NULL,                       /* get_sflow_probability */
+    NULL,                       /* set_sflow_probability */
     dpif_netdev_recv,
     dpif_netdev_recv_wait,
 };
index 020e017..eade868 100644 (file)
@@ -278,6 +278,23 @@ struct dpif_class {
      * corresponding type when it calls the recv member function. */
     int (*recv_set_mask)(struct dpif *dpif, int listen_mask);
 
+    /* Retrieves 'dpif''s sFlow sampling probability into '*probability'.
+     * Return value is 0 or a positive errno value.  EOPNOTSUPP indicates that
+     * the datapath does not support sFlow, as does a null pointer.
+     *
+     * A probability of 0 means sample no packets, UINT32_MAX means sample
+     * every packet, and other values are intermediate probabilities. */
+    int (*get_sflow_probability)(const struct dpif *dpif,
+                                 uint32_t *probability);
+
+    /* Sets 'dpif''s sFlow sampling probability to 'probability'.  Return value
+     * is 0 or a positive errno value.  EOPNOTSUPP indicates that the datapath
+     * does not support sFlow, as does a null pointer.
+     *
+     * A probability of 0 means sample no packets, UINT32_MAX means sample
+     * every packet, and other values are intermediate probabilities. */
+    int (*set_sflow_probability)(struct dpif *dpif, uint32_t probability);
+
     /* Attempts to receive a message from 'dpif'.  If successful, stores the
      * message into '*packetp'.  The message, if one is received, must begin
      * with 'struct odp_msg' as a header.  Only messages of the types selected
index 793eaa1..8bca124 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009 Nicira Networks.
+ * Copyright (c) 2008, 2009, 2010 Nicira Networks.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -298,6 +298,7 @@ dpif_set_drop_frags(struct dpif *dpif, bool drop_frags)
     return error;
 }
 
+
 /* Attempts to add 'devname' as a port on 'dpif', given the combination of
  * ODP_PORT_* flags in 'flags'.  If successful, returns 0 and sets '*port_nop'
  * to the new port's port number (if 'port_nop' is non-null).  On failure,
@@ -844,6 +845,41 @@ dpif_recv_set_mask(struct dpif *dpif, int listen_mask)
     return error;
 }
 
+/* Retrieve the sFlow sampling probability.  A probability of 0 means sample no
+ * packets, UINT32_MAX means sample every packet, and other values are
+ * intermediate probabilities.
+ *
+ * Returns 0 if successful, otherwise a positive errno value.  EOPNOTSUPP
+ * indicates that 'dpif' does not support sFlow sampling. */
+int
+dpif_get_sflow_probability(const struct dpif *dpif, uint32_t *probability)
+{
+    int error = (dpif->class->get_sflow_probability
+                 ? dpif->class->get_sflow_probability(dpif, probability)
+                 : EOPNOTSUPP);
+    if (error) {
+        *probability = 0;
+    }
+    log_operation(dpif, "get_sflow_probability", error);
+    return error;
+}
+
+/* Set the sFlow sampling probability.  A probability of 0 means sample no
+ * packets, UINT32_MAX means sample every packet, and other values are
+ * intermediate probabilities.
+ *
+ * Returns 0 if successful, otherwise a positive errno value.  EOPNOTSUPP
+ * indicates that 'dpif' does not support sFlow sampling. */
+int
+dpif_set_sflow_probability(struct dpif *dpif, uint32_t probability)
+{
+    int error = (dpif->class->set_sflow_probability
+                 ? dpif->class->set_sflow_probability(dpif, probability)
+                 : EOPNOTSUPP);
+    log_operation(dpif, "set_sflow_probability", error);
+    return error;
+}
+
 /* Attempts to receive a message from 'dpif'.  If successful, stores the
  * message into '*packetp'.  The message, if one is received, will begin with
  * 'struct odp_msg' as a header.  Only messages of the types selected with
@@ -867,6 +903,7 @@ dpif_recv(struct dpif *dpif, struct ofpbuf **packetp)
                         "%zu on port %"PRIu16": %s", dpif_name(dpif),
                         (msg->type == _ODPL_MISS_NR ? "miss"
                          : msg->type == _ODPL_ACTION_NR ? "action"
+                         : msg->type == _ODPL_SFLOW_NR ? "sFlow"
                          : "<unknown>"),
                         payload_len, msg->port, s);
             free(s);
@@ -893,7 +930,7 @@ dpif_recv_purge(struct dpif *dpif)
         return error;
     }
 
-    for (i = 0; i < stats.max_miss_queue + stats.max_action_queue; i++) {
+    for (i = 0; i < stats.max_miss_queue + stats.max_action_queue + stats.max_sflow_queue; i++) {
         struct ofpbuf *buf;
         error = dpif_recv(dpif, &buf);
         if (error) {
index 1d109c2..bf3c648 100644 (file)
@@ -84,6 +84,8 @@ int dpif_execute(struct dpif *, uint16_t in_port,
 
 int dpif_recv_get_mask(const struct dpif *, int *listen_mask);
 int dpif_recv_set_mask(struct dpif *, int listen_mask);
+int dpif_get_sflow_probability(const struct dpif *, uint32_t *probability);
+int dpif_set_sflow_probability(struct dpif *, uint32_t probability);
 int dpif_recv(struct dpif *, struct ofpbuf **);
 int dpif_recv_purge(struct dpif *);
 void dpif_recv_wait(struct dpif *);
index 0d44e73..b791525 100644 (file)
@@ -61,6 +61,7 @@ VLOG_MODULE(proc_net_compat)
 VLOG_MODULE(process)
 VLOG_MODULE(rconn)
 VLOG_MODULE(rtnetlink)
+VLOG_MODULE(sflow)
 VLOG_MODULE(stp)
 VLOG_MODULE(stats)
 VLOG_MODULE(status)
index 87a0fa6..3c18977 100644 (file)
@@ -21,6 +21,8 @@ ofproto_libofproto_a_SOURCES = \
        ofproto/netflow.h \
        ofproto/ofproto.c \
        ofproto/ofproto.h \
+       ofproto/ofproto-sflow.c \
+       ofproto/ofproto-sflow.h \
        ofproto/pktbuf.c \
        ofproto/pktbuf.h \
        ofproto/pinsched.c \
index f7cb1db..4589f32 100644 (file)
@@ -121,3 +121,9 @@ collectors_send(const struct collectors *c, const void *payload, size_t n)
         }
     }
 }
+
+int
+collectors_count(const struct collectors *c)
+{
+    return c->n_fds;
+}
index a4abb63..ac70f37 100644 (file)
@@ -28,4 +28,6 @@ void collectors_destroy(struct collectors *);
 
 void collectors_send(const struct collectors *, const void *, size_t);
 
+int collectors_count(const struct collectors *);
+
 #endif /* collectors.h */
diff --git a/ofproto/ofproto-sflow.c b/ofproto/ofproto-sflow.c
new file mode 100644 (file)
index 0000000..608abe9
--- /dev/null
@@ -0,0 +1,567 @@
+/*
+ * Copyright (c) 2009, 2010 InMon Corp.
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "ofproto-sflow.h"
+#include <inttypes.h>
+#include <stdlib.h>
+#include "collectors.h"
+#include "dpif.h"
+#include "compiler.h"
+#include "netdev.h"
+#include "ofpbuf.h"
+#include "ofproto.h"
+#include "poll-loop.h"
+#include "port-array.h"
+#include "sflow_api.h"
+#include "socket-util.h"
+#include "timeval.h"
+
+#define THIS_MODULE VLM_sflow
+#include "vlog.h"
+
+struct ofproto_sflow_port {
+    struct netdev *netdev;      /* Underlying network device, for stats. */
+    SFLDataSource_instance dsi; /* sFlow library's notion of port number. */
+};
+
+struct ofproto_sflow {
+    struct ofproto *ofproto;
+    struct collectors *collectors;
+    SFLAgent *sflow_agent;
+    struct ofproto_sflow_options *options;
+    struct dpif *dpif;
+    time_t next_tick;
+    size_t n_flood, n_all;
+    struct port_array ports;    /* Indexed by ODP port number. */
+};
+
+#define RECEIVER_INDEX 1
+
+static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+static bool
+ofproto_sflow_options_equal(const struct ofproto_sflow_options *a,
+                            const struct ofproto_sflow_options *b)
+{
+    return (svec_equal(&a->targets, &b->targets)
+            && a->sampling_rate == b->sampling_rate
+            && a->polling_interval == b->polling_interval
+            && a->header_len == b->header_len
+            && a->sub_id == b->sub_id
+            && !strcmp(a->agent_device, b->agent_device)
+            && !strcmp(a->control_ip, b->control_ip));
+}
+
+static struct ofproto_sflow_options *
+ofproto_sflow_options_clone(const struct ofproto_sflow_options *old)
+{
+    struct ofproto_sflow_options *new = xmemdup(old, sizeof *old);
+    new->agent_device = old->agent_device ? xstrdup(old->agent_device) : NULL;
+    new->control_ip = old->control_ip ? xstrdup(old->control_ip) : NULL;
+    return new;
+}
+
+static void
+ofproto_sflow_options_destroy(struct ofproto_sflow_options *options)
+{
+    if (options) {
+        free(options->agent_device);
+        free(options->control_ip);
+        free(options);
+    }
+}
+
+/* sFlow library callback to allocate memory. */
+static void *
+sflow_agent_alloc_cb(void *magic UNUSED, SFLAgent *agent UNUSED, size_t bytes)
+{
+    return calloc(1, bytes);
+}
+
+/* sFlow library callback to free memory. */
+static int
+sflow_agent_free_cb(void *magic UNUSED, SFLAgent *agent UNUSED, void *obj)
+{
+    free(obj);
+    return 0;
+}
+
+/* sFlow library callback to report error. */
+static void
+sflow_agent_error_cb(void *magic UNUSED, SFLAgent *agent UNUSED, char *msg)
+{
+    VLOG_WARN("sFlow agent error: %s", msg);
+}
+
+/* sFlow library callback to send datagram. */
+static void
+sflow_agent_send_packet_cb(void *os_, SFLAgent *agent UNUSED,
+                           SFLReceiver *receiver UNUSED, u_char *pkt,
+                           uint32_t pktLen)
+{
+    struct ofproto_sflow *os = os_;
+    collectors_send(os->collectors, pkt, pktLen);
+}
+
+static void
+sflow_agent_get_counters(void *os_, SFLPoller *poller,
+                         SFL_COUNTERS_SAMPLE_TYPE *cs)
+{
+    struct ofproto_sflow *os = os_;
+    SFLCounters_sample_element elem;
+    struct ofproto_sflow_port *osp;
+    SFLIf_counters *counters;
+    struct netdev_stats stats;
+    enum netdev_flags flags;
+    uint32_t current;
+
+    osp = port_array_get(&os->ports, poller->bridgePort);
+    if (!osp) {
+        return;
+    }
+
+    elem.tag = SFLCOUNTERS_GENERIC;
+    counters = &elem.counterBlock.generic;
+    counters->ifIndex = SFL_DS_INDEX(poller->dsi);
+    counters->ifType = 6;
+    if (!netdev_get_features(osp->netdev, &current, NULL, NULL, NULL)) {
+        counters->ifSpeed = netdev_features_to_bps(current);
+        counters->ifDirection = (netdev_features_is_full_duplex(current)
+                                 ? 1 : 2);
+    } else {
+        counters->ifSpeed = 100000000;
+        counters->ifDirection = 1;
+    }
+    if (!netdev_get_flags(osp->netdev, &flags) && flags & NETDEV_UP) {
+        bool carrier;
+
+        counters->ifStatus = 1; /* ifAdminStatus up. */
+        if (!netdev_get_carrier(osp->netdev, &carrier) && carrier) {
+            counters->ifStatus |= 2; /* ifOperStatus us. */
+        }
+    } else {
+        counters->ifStatus = 0;  /* Down. */
+    }
+
+    /* XXX
+       1. Is the multicast counter filled in?
+       2. Does the multicast counter include broadcasts?
+       3. Does the rx_packets counter include multicasts/broadcasts?
+    */
+    netdev_get_stats(osp->netdev, &stats);
+    counters->ifInOctets = stats.rx_bytes;
+    counters->ifInUcastPkts = stats.rx_packets;
+    counters->ifInMulticastPkts = stats.multicast;
+    counters->ifInBroadcastPkts = -1;
+    counters->ifInDiscards = stats.rx_dropped;
+    counters->ifInErrors = stats.rx_errors;
+    counters->ifInUnknownProtos = -1;
+    counters->ifOutOctets = stats.tx_bytes;
+    counters->ifOutUcastPkts = stats.tx_packets;
+    counters->ifOutMulticastPkts = -1;
+    counters->ifOutBroadcastPkts = -1;
+    counters->ifOutDiscards = stats.tx_dropped;
+    counters->ifOutErrors = stats.tx_errors;
+    counters->ifPromiscuousMode = 0;
+
+    SFLADD_ELEMENT(cs, &elem);
+    sfl_poller_writeCountersSample(poller, cs);
+}
+
+/* Obtains an address to use for the local sFlow agent and stores it into
+ * '*agent_addr'.  Returns true if successful, false on failure.
+ *
+ * The sFlow agent address should be a local IP address that is persistent and
+ * reachable over the network, if possible.  The IP address associated with
+ * 'agent_device' is used if it has one, and otherwise 'control_ip', the IP
+ * address used to talk to the controller. */
+static bool
+sflow_choose_agent_address(const char *agent_device, const char *control_ip,
+                           SFLAddress *agent_addr)
+{
+    struct in_addr in4;
+
+    memset(agent_addr, 0, sizeof *agent_addr);
+    agent_addr->type = SFLADDRESSTYPE_IP_V4;
+
+    if (agent_device) {
+        struct netdev *netdev;
+
+        if (!netdev_open(agent_device, NETDEV_ETH_TYPE_NONE, &netdev)) {
+            int error = netdev_get_in4(netdev, &in4, NULL);
+            netdev_close(netdev);
+            if (!error) {
+                goto success;
+            }
+        }
+    }
+
+    if (control_ip && !lookup_ip(control_ip, &in4)) {
+        goto success;
+    }
+
+    VLOG_ERR("could not determine IP address for sFlow agent");
+    return false;
+
+success:
+    agent_addr->address.ip_v4.addr = in4.s_addr;
+    return true;
+}
+
+void
+ofproto_sflow_clear(struct ofproto_sflow *os)
+{
+    struct ofproto_sflow_port *osp;
+    unsigned int odp_port;
+
+    if (os->sflow_agent) {
+        sfl_agent_release(os->sflow_agent);
+        os->sflow_agent = NULL;
+    }
+    collectors_destroy(os->collectors);
+    os->collectors = NULL;
+    ofproto_sflow_options_destroy(os->options);
+    os->options = NULL;
+
+    PORT_ARRAY_FOR_EACH (osp, &os->ports, odp_port) {
+        ofproto_sflow_del_port(os, odp_port);
+    }
+    port_array_clear(&os->ports);
+
+    /* Turn off sampling to save CPU cycles. */
+    dpif_set_sflow_probability(os->dpif, 0);
+}
+
+bool
+ofproto_sflow_is_enabled(const struct ofproto_sflow *os)
+{
+    return os->collectors != NULL;
+}
+
+struct ofproto_sflow *
+ofproto_sflow_create(struct dpif *dpif)
+{
+    struct ofproto_sflow *os;
+
+    os = xcalloc(1, sizeof *os);
+    os->dpif = dpif;
+    os->next_tick = time_now() + 1;
+    port_array_init(&os->ports);
+    return os;
+}
+
+void
+ofproto_sflow_destroy(struct ofproto_sflow *os)
+{
+    if (os) {
+        ofproto_sflow_clear(os);
+        port_array_destroy(&os->ports);
+        free(os);
+    }
+}
+
+static void
+ofproto_sflow_add_poller(struct ofproto_sflow *os,
+                         struct ofproto_sflow_port *osp, uint16_t odp_port)
+{
+    SFLPoller *poller = sfl_agent_addPoller(os->sflow_agent, &osp->dsi, os,
+                                            sflow_agent_get_counters);
+    sfl_poller_set_sFlowCpInterval(poller, os->options->polling_interval);
+    sfl_poller_set_sFlowCpReceiver(poller, RECEIVER_INDEX);
+    sfl_poller_set_bridgePort(poller, odp_port);
+}
+
+void
+ofproto_sflow_add_port(struct ofproto_sflow *os, uint16_t odp_port,
+                       const char *netdev_name)
+{
+    struct ofproto_sflow_port *osp;
+    struct netdev *netdev;
+    uint32_t ifindex;
+    int error;
+
+    ofproto_sflow_del_port(os, odp_port);
+
+    /* Open network device. */
+    error = netdev_open(netdev_name, NETDEV_ETH_TYPE_NONE, &netdev);
+    if (error) {
+        VLOG_WARN_RL(&rl, "failed to open network device \"%s\": %s",
+                     netdev_name, strerror(error));
+        return;
+    }
+
+    /* Add to table of ports. */
+    osp = xmalloc(sizeof *osp);
+    osp->netdev = netdev;
+    ifindex = netdev_get_ifindex(netdev);
+    if (ifindex <= 0) {
+        ifindex = (os->sflow_agent->subId << 16) + odp_port;
+    }
+    SFL_DS_SET(osp->dsi, 0, ifindex, 0);
+    port_array_set(&os->ports, odp_port, osp);
+
+    /* Add poller. */
+    if (os->sflow_agent) {
+        ofproto_sflow_add_poller(os, osp, odp_port);
+    }
+}
+
+void
+ofproto_sflow_del_port(struct ofproto_sflow *os, uint16_t odp_port)
+{
+    struct ofproto_sflow_port *osp = port_array_get(&os->ports, odp_port);
+    if (osp) {
+        if (os->sflow_agent) {
+            sfl_agent_removePoller(os->sflow_agent, &osp->dsi);
+        }
+        netdev_close(osp->netdev);
+        free(osp);
+        port_array_set(&os->ports, odp_port, NULL);
+    }
+}
+
+void
+ofproto_sflow_set_options(struct ofproto_sflow *os,
+                          const struct ofproto_sflow_options *options)
+{
+    struct ofproto_sflow_port *osp;
+    SFLDataSource_instance dsi;
+    bool options_changed;
+    SFLSampler *sampler;
+    SFLReceiver *receiver;
+    unsigned int odp_port;
+    SFLAddress agentIP;
+    time_t now;
+    int error;
+
+    options_changed = (!os->options
+                       || !ofproto_sflow_options_equal(options, os->options));
+
+    /* Configure collectors if options have changed or if we're shortchanged in
+     * collectors (which indicates that opening one or more of the configured
+     * collectors failed, so that we should retry). */
+    if (options_changed
+        || collectors_count(os->collectors) < options->targets.n) {
+        collectors_destroy(os->collectors);
+        error = collectors_create(&options->targets,
+                                  SFL_DEFAULT_COLLECTOR_PORT, &os->collectors);
+        if (os->collectors == NULL) {
+            VLOG_WARN_RL(&rl, "no configured collectors, sFlow disabled");
+            ofproto_sflow_clear(os);
+            return;
+        }
+    }
+
+    /* Avoid reconfiguring if options didn't change. */
+    if (!options_changed) {
+        return;
+    }
+    ofproto_sflow_options_destroy(os->options);
+    os->options = ofproto_sflow_options_clone(options);
+
+    /* Choose agent IP address. */
+    if (!sflow_choose_agent_address(options->agent_device,
+                                    options->control_ip, &agentIP)) {
+        ofproto_sflow_clear(os);
+        return;
+    }
+
+    /* Create agent. */
+    VLOG_INFO("creating sFlow agent %d", options->sub_id);
+    if (os->sflow_agent) {
+        sfl_agent_release(os->sflow_agent);
+    }
+    os->sflow_agent = xcalloc(1, sizeof *os->sflow_agent);
+    now = time_now();
+    sfl_agent_init(os->sflow_agent,
+                   &agentIP,
+                   options->sub_id,
+                   now,         /* Boot time. */
+                   now,         /* Current time. */
+                   os,          /* Pointer supplied to callbacks. */
+                   sflow_agent_alloc_cb,
+                   sflow_agent_free_cb,
+                   sflow_agent_error_cb,
+                   sflow_agent_send_packet_cb);
+
+    receiver = sfl_agent_addReceiver(os->sflow_agent);
+    sfl_receiver_set_sFlowRcvrOwner(receiver, "OpenVSwitch sFlow");
+    sfl_receiver_set_sFlowRcvrTimeout(receiver, 0xffffffff);
+
+    /* Add a single sampler to represent the whole switch (special <ifIndex>:0
+     * datasource).  The alternative is to model a physical switch more closely
+     * and instantiate a separate sampler object for each interface, but then
+     * unicasts would have to be offered to two samplers, and
+     * broadcasts/multicasts would have to be offered to all of them.  Doing it
+     * this way with a single <ifindex>:0 sampler is much more efficient for a
+     * virtual switch, and is allowed by the sFlow standard.
+     */
+    SFL_DS_SET(dsi, 0, 0, 0);
+    sampler = sfl_agent_addSampler(os->sflow_agent, &dsi);
+    sfl_sampler_set_sFlowFsReceiver(sampler, RECEIVER_INDEX);
+    sfl_sampler_set_sFlowFsPacketSamplingRate(sampler, options->sampling_rate);
+    sfl_sampler_set_sFlowFsMaximumHeaderSize(sampler, options->header_len);
+
+    /* Set the sampling_rate down in the datapath. */
+    dpif_set_sflow_probability(os->dpif,
+                               MAX(1, UINT32_MAX / options->sampling_rate));
+
+    /* Add the currently known ports. */
+    PORT_ARRAY_FOR_EACH (osp, &os->ports, odp_port) {
+        ofproto_sflow_add_poller(os, osp, odp_port);
+    }
+}
+
+void
+ofproto_sflow_received(struct ofproto_sflow *os, struct odp_msg *msg)
+{
+    SFL_FLOW_SAMPLE_TYPE fs;
+    SFLFlow_sample_element hdrElem;
+    SFLSampled_header *header;
+    SFLFlow_sample_element switchElem;
+    SFLSampler *sampler = os->sflow_agent->samplers;
+    const struct odp_sflow_sample_header *hdr;
+    const union odp_action *actions;
+    struct ofpbuf payload;
+    size_t n_actions, n_outputs;
+    size_t min_size;
+    flow_t flow;
+    size_t i;
+
+    /* Get odp_sflow_sample_header. */
+    min_size = sizeof *msg + sizeof *hdr;
+    if (min_size > msg->length) {
+        VLOG_WARN_RL(&rl, "sFlow packet too small (%"PRIu32" < %zu)",
+                     msg->length, min_size);
+        return;
+    }
+    hdr = (const struct odp_sflow_sample_header *) (msg + 1);
+
+    /* Get actions. */
+    n_actions = hdr->n_actions;
+    if (n_actions > 65536 / sizeof *actions) {
+        VLOG_WARN_RL(&rl, "too many actions in sFlow packet (%"PRIu32" > %zu)",
+                     65536 / sizeof *actions, n_actions);
+        return;
+    }
+    min_size += n_actions * sizeof *actions;
+    if (min_size > msg->length) {
+        VLOG_WARN_RL(&rl, "sFlow packet with %zu actions too small "
+                     "(%"PRIu32" < %zu)",
+                     n_actions, msg->length, min_size);
+        return;
+    }
+    actions = (const union odp_action *) (hdr + 1);
+
+    /* Get packet payload and extract flow. */
+    payload.data = (union odp_action *) (actions + n_actions);
+    payload.size = msg->length - min_size;
+    flow_extract(&payload, msg->port, &flow);
+
+    /* Build a flow sample */
+    memset(&fs, 0, sizeof fs);
+    fs.input = msg->port == ODPP_LOCAL ? 0x3fffffff : msg->port;
+    fs.output = 0;              /* Filled in correctly below. */
+    fs.sample_pool = hdr->sample_pool;
+
+    /* Sampled header. */
+    memset(&hdrElem, 0, sizeof hdrElem);
+    hdrElem.tag = SFLFLOW_HEADER;
+    header = &hdrElem.flowType.header;
+    header->header_protocol = SFLHEADER_ETHERNET_ISO8023;
+    header->frame_length = payload.size;
+    header->stripped = 4; /* Ethernet FCS stripped off. */
+    header->header_length = MIN(payload.size,
+                                sampler->sFlowFsMaximumHeaderSize);
+    header->header_bytes = payload.data;
+
+    /* Add extended switch element. */
+    memset(&switchElem, 0, sizeof(switchElem));
+    switchElem.tag = SFLFLOW_EX_SWITCH;
+    switchElem.flowType.sw.src_vlan = flow.dl_vlan;
+    switchElem.flowType.sw.src_priority = -1; /* XXX */
+    switchElem.flowType.sw.dst_vlan = -1;     /* Filled in correctly below. */
+    switchElem.flowType.sw.dst_priority = switchElem.flowType.sw.src_priority;
+
+    /* Figure out the output ports. */
+    n_outputs = 0;
+    for (i = 0; i < n_actions; i++) {
+        const union odp_action *a = &actions[i];
+
+        switch (a->type) {
+        case ODPAT_OUTPUT:
+            fs.output = a->output.port;
+            n_outputs++;
+            break;
+
+        case ODPAT_OUTPUT_GROUP:
+            n_outputs += (a->output_group.group == DP_GROUP_FLOOD ? os->n_flood
+                          : a->output_group.group == DP_GROUP_ALL ? os->n_all
+                          : 0);
+            break;
+
+        case ODPAT_SET_VLAN_VID:
+            switchElem.flowType.sw.dst_vlan = a->vlan_vid.vlan_vid;
+            break;
+
+        case ODPAT_SET_VLAN_PCP:
+            switchElem.flowType.sw.dst_priority = a->vlan_pcp.vlan_pcp;
+            break;
+
+        default:
+            break;
+        }
+    }
+    if (n_outputs > 1 || !fs.output) {
+        /* Setting the high bit means "multiple output ports". */
+        fs.output = 0x80000000 | n_outputs;
+    }
+
+    /* Submit the flow sample to be encoded into the next datagram. */
+    SFLADD_ELEMENT(&fs, &hdrElem);
+    SFLADD_ELEMENT(&fs, &switchElem);
+    sfl_sampler_writeFlowSample(sampler, &fs);
+}
+
+void
+ofproto_sflow_set_group_sizes(struct ofproto_sflow *os,
+                              size_t n_flood, size_t n_all)
+{
+    os->n_flood = n_flood;
+    os->n_all = n_all;
+}
+
+void
+ofproto_sflow_run(struct ofproto_sflow *os)
+{
+    if (ofproto_sflow_is_enabled(os)) {
+        time_t now = time_now();
+        if (now >= os->next_tick) {
+            sfl_agent_tick(os->sflow_agent, now);
+            os->next_tick = now + 1;
+        }
+    }
+}
+
+void
+ofproto_sflow_wait(struct ofproto_sflow *os)
+{
+    if (ofproto_sflow_is_enabled(os)) {
+        poll_timer_wait(os->next_tick * 1000 - time_msec());
+    }
+}
diff --git a/ofproto/ofproto-sflow.h b/ofproto/ofproto-sflow.h
new file mode 100644 (file)
index 0000000..ec86d11
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2009 InMon Corp.
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OFPROTO_SFLOW_H
+#define OFPROTO_SFLOW_H 1
+
+#include <stdint.h>
+#include "svec.h"
+
+struct dpif;
+struct odp_msg;
+struct ofproto_sflow_options;
+
+struct ofproto_sflow *ofproto_sflow_create(struct dpif *);
+void ofproto_sflow_destroy(struct ofproto_sflow *);
+void ofproto_sflow_set_options(struct ofproto_sflow *,
+                               const struct ofproto_sflow_options *);
+void ofproto_sflow_clear(struct ofproto_sflow *);
+bool ofproto_sflow_is_enabled(const struct ofproto_sflow *);
+
+void ofproto_sflow_add_port(struct ofproto_sflow *, uint16_t odp_port,
+                            const char *netdev_name);
+void ofproto_sflow_del_port(struct ofproto_sflow *, uint16_t odp_port);
+void ofproto_sflow_set_group_sizes(struct ofproto_sflow *,
+                                   size_t n_flood, size_t n_all);
+
+void ofproto_sflow_run(struct ofproto_sflow *);
+void ofproto_sflow_wait(struct ofproto_sflow *);
+
+void ofproto_sflow_received(struct ofproto_sflow *, struct odp_msg *);
+
+#endif /* ofproto/ofproto-sflow.h */
index 4995bbe..43054fa 100644 (file)
@@ -35,6 +35,7 @@
 #include "netflow.h"
 #include "odp-util.h"
 #include "ofp-print.h"
+#include "ofproto-sflow.h"
 #include "ofpbuf.h"
 #include "openflow/nicira-ext.h"
 #include "openflow/openflow.h"
 #define THIS_MODULE VLM_ofproto
 #include "vlog.h"
 
-enum {
-    DP_GROUP_FLOOD = 0,
-    DP_GROUP_ALL = 1
-};
+#include "sflow_api.h"
 
 enum {
     TABLEID_HASH = 0,
@@ -209,6 +207,7 @@ struct ofproto {
     struct pinsched *miss_sched, *action_sched;
     struct executer *executer;
     struct netflow *netflow;
+    struct ofproto_sflow *sflow;
 
     /* Flow table. */
     struct classifier cls;
@@ -253,7 +252,8 @@ static void handle_odp_msg(struct ofproto *, struct ofpbuf *);
 static void handle_openflow(struct ofconn *, struct ofproto *,
                             struct ofpbuf *);
 
-static void refresh_port_group(struct ofproto *, unsigned int group);
+static void refresh_port_groups(struct ofproto *);
+
 static void update_port(struct ofproto *, const char *devname);
 static int init_ports(struct ofproto *);
 static void reinit_ports(struct ofproto *);
@@ -282,7 +282,7 @@ ofproto_create(const char *datapath, const struct ofhooks *ofhooks, void *aux,
         dpif_close(dpif);
         return error;
     }
-    error = dpif_recv_set_mask(dpif, ODPL_MISS | ODPL_ACTION);
+    error = dpif_recv_set_mask(dpif, ODPL_MISS | ODPL_ACTION | ODPL_SFLOW);
     if (error) {
         VLOG_ERR("failed to listen on datapath %s: %s",
                  datapath, strerror(error));
@@ -316,6 +316,7 @@ ofproto_create(const char *datapath, const struct ofhooks *ofhooks, void *aux,
     p->miss_sched = p->action_sched = NULL;
     p->executer = NULL;
     p->netflow = NULL;
+    p->sflow = NULL;
 
     /* Initialize flow table. */
     classifier_init(&p->cls);
@@ -548,6 +549,30 @@ ofproto_set_netflow(struct ofproto *ofproto,
     }
 }
 
+void
+ofproto_set_sflow(struct ofproto *ofproto,
+                  const struct ofproto_sflow_options *oso)
+{
+    struct ofproto_sflow *os = ofproto->sflow;
+    if (oso) {
+        if (!os) {
+            struct ofport *ofport;
+            unsigned int odp_port;
+
+            os = ofproto->sflow = ofproto_sflow_create(ofproto->dpif);
+            refresh_port_groups(ofproto);
+            PORT_ARRAY_FOR_EACH (ofport, &ofproto->ports, odp_port) {
+                ofproto_sflow_add_port(os, odp_port,
+                                       netdev_get_name(ofport->netdev));
+            }
+        }
+        ofproto_sflow_set_options(os, oso);
+    } else {
+        ofproto_sflow_destroy(os);
+        ofproto->sflow = NULL;
+    }
+}
+
 void
 ofproto_set_failure(struct ofproto *ofproto, bool fail_open)
 {
@@ -718,6 +743,7 @@ ofproto_destroy(struct ofproto *p)
     pinsched_destroy(p->action_sched);
     executer_destroy(p->executer);
     netflow_destroy(p->netflow);
+    ofproto_sflow_destroy(p->sflow);
 
     switch_status_unregister(p->ss_cat);
 
@@ -870,6 +896,9 @@ ofproto_run1(struct ofproto *p)
     if (p->netflow) {
         netflow_run(p->netflow);
     }
+    if (p->sflow) {
+        ofproto_sflow_run(p->sflow);
+    }
 
     return 0;
 }
@@ -926,6 +955,9 @@ ofproto_wait(struct ofproto *p)
     if (p->executer) {
         executer_wait(p->executer);
     }
+    if (p->sflow) {
+        ofproto_sflow_wait(p->sflow);
+    }
     if (!tag_set_is_empty(&p->revalidate_set)) {
         poll_immediate_wake();
     }
@@ -1066,7 +1098,7 @@ reinit_ports(struct ofproto *p)
     svec_destroy(&devnames);
 }
 
-static void
+static size_t
 refresh_port_group(struct ofproto *p, unsigned int group)
 {
     uint16_t *ports;
@@ -1085,13 +1117,18 @@ refresh_port_group(struct ofproto *p, unsigned int group)
     }
     dpif_port_group_set(p->dpif, group, ports, n_ports);
     free(ports);
+
+    return n_ports;
 }
 
 static void
 refresh_port_groups(struct ofproto *p)
 {
-    refresh_port_group(p, DP_GROUP_FLOOD);
-    refresh_port_group(p, DP_GROUP_ALL);
+    size_t n_flood = refresh_port_group(p, DP_GROUP_FLOOD);
+    size_t n_all = refresh_port_group(p, DP_GROUP_ALL);
+    if (p->sflow) {
+        ofproto_sflow_set_group_sizes(p->sflow, n_flood, n_all);
+    }
 }
 
 static struct ofport *
@@ -1190,19 +1227,29 @@ send_port_status(struct ofproto *p, const struct ofport *ofport,
 static void
 ofport_install(struct ofproto *p, struct ofport *ofport)
 {
+    uint16_t odp_port = ofp_port_to_odp_port(ofport->opp.port_no);
+    const char *netdev_name = (const char *) ofport->opp.name;
+
     netdev_monitor_add(p->netdev_monitor, ofport->netdev);
-    port_array_set(&p->ports, ofp_port_to_odp_port(ofport->opp.port_no),
-                   ofport);
-    shash_add(&p->port_by_name, (char *) ofport->opp.name, ofport);
+    port_array_set(&p->ports, odp_port, ofport);
+    shash_add(&p->port_by_name, netdev_name, ofport);
+    if (p->sflow) {
+        ofproto_sflow_add_port(p->sflow, odp_port, netdev_name);
+    }
 }
 
 static void
 ofport_remove(struct ofproto *p, struct ofport *ofport)
 {
+    uint16_t odp_port = ofp_port_to_odp_port(ofport->opp.port_no);
+
     netdev_monitor_remove(p->netdev_monitor, ofport->netdev);
-    port_array_set(&p->ports, ofp_port_to_odp_port(ofport->opp.port_no), NULL);
+    port_array_set(&p->ports, odp_port, NULL);
     shash_delete(&p->port_by_name,
                  shash_find(&p->port_by_name, (char *) ofport->opp.name));
+    if (p->sflow) {
+        ofproto_sflow_del_port(p->sflow, odp_port);
+    }
 }
 
 static void
@@ -2291,7 +2338,7 @@ update_port_config(struct ofproto *p, struct ofport *port,
 #undef REVALIDATE_BITS
     if (mask & OFPPC_NO_FLOOD) {
         port->opp.config ^= OFPPC_NO_FLOOD;
-        refresh_port_group(p, DP_GROUP_FLOOD);
+        refresh_port_groups(p);
     }
     if (mask & OFPPC_NO_PACKET_IN) {
         port->opp.config ^= OFPPC_NO_PACKET_IN;
@@ -3108,7 +3155,7 @@ handle_openflow(struct ofconn *ofconn, struct ofproto *p,
 }
 \f
 static void
-handle_odp_msg(struct ofproto *p, struct ofpbuf *packet)
+handle_odp_miss_msg(struct ofproto *p, struct ofpbuf *packet)
 {
     struct odp_msg *msg = packet->data;
     uint16_t in_port = odp_port_to_ofp_port(msg->port);
@@ -3116,14 +3163,6 @@ handle_odp_msg(struct ofproto *p, struct ofpbuf *packet)
     struct ofpbuf payload;
     flow_t flow;
 
-    /* Handle controller actions. */
-    if (msg->type == _ODPL_ACTION_NR) {
-        COVERAGE_INC(ofproto_ctlr_action);
-        pinsched_send(p->action_sched, in_port, packet,
-                      send_packet_in_action, p);
-        return;
-    }
-
     payload.data = msg + 1;
     payload.size = msg->length - sizeof *msg;
     flow_extract(&payload, msg->port, &flow);
@@ -3193,6 +3232,36 @@ handle_odp_msg(struct ofproto *p, struct ofpbuf *packet)
         ofpbuf_delete(packet);
     }
 }
+
+static void
+handle_odp_msg(struct ofproto *p, struct ofpbuf *packet)
+{
+    struct odp_msg *msg = packet->data;
+
+    switch (msg->type) {
+    case _ODPL_ACTION_NR:
+        COVERAGE_INC(ofproto_ctlr_action);
+        pinsched_send(p->action_sched, odp_port_to_ofp_port(msg->port), packet,
+                      send_packet_in_action, p);
+        break;
+
+    case _ODPL_SFLOW_NR:
+        if (p->sflow) {
+            ofproto_sflow_received(p->sflow, msg);
+        }
+        ofpbuf_delete(packet);
+        break;
+
+    case _ODPL_MISS_NR:
+        handle_odp_miss_msg(p, packet);
+        break;
+
+    default:
+        VLOG_WARN_RL(&rl, "received ODP message of unexpected type %"PRIu32,
+                     msg->type);
+        break;
+    }
+}
 \f
 static void
 revalidate_cb(struct cls_rule *sub_, void *cbdata_)
index 50dd5d5..6377e51 100644 (file)
@@ -29,6 +29,11 @@ struct ofhooks;
 struct ofproto;
 struct svec;
 
+enum {
+    DP_GROUP_FLOOD = 0,
+    DP_GROUP_ALL = 1
+};
+
 struct ofexpired {
     flow_t flow;
     uint64_t packet_count;      /* Packets from subrules. */
@@ -36,6 +41,16 @@ struct ofexpired {
     long long int used;         /* Last-used time (0 if never used). */
 };
 
+struct ofproto_sflow_options {
+    struct svec targets;
+    uint32_t sampling_rate;
+    uint32_t polling_interval;
+    uint32_t header_len;
+    uint32_t sub_id;
+    char *agent_device;
+    char *control_ip;
+};
+
 int ofproto_create(const char *datapath, const struct ofhooks *, void *aux,
                    struct ofproto **ofprotop);
 void ofproto_destroy(struct ofproto *);
@@ -62,6 +77,7 @@ int ofproto_set_listeners(struct ofproto *, const struct svec *listeners);
 int ofproto_set_snoops(struct ofproto *, const struct svec *snoops);
 int ofproto_set_netflow(struct ofproto *,
                         const struct netflow_options *nf_options);
+void ofproto_set_sflow(struct ofproto *, const struct ofproto_sflow_options *);
 void ofproto_set_failure(struct ofproto *, bool fail_open);
 void ofproto_set_rate_limit(struct ofproto *, int rate_limit, int burst_limit);
 int ofproto_set_stp(struct ofproto *, bool enable_stp);
index 9ac12c9..1a9d492 100644 (file)
@@ -80,6 +80,7 @@ utilities_ovs_ofctl_LDADD = lib/libopenvswitch.a $(FAULT_LIBS) $(SSL_LIBS)
 utilities_ovs_openflowd_SOURCES = utilities/ovs-openflowd.c
 utilities_ovs_openflowd_LDADD = \
        ofproto/libofproto.a \
+       lib/libsflow.a \
        lib/libopenvswitch.a \
        $(FAULT_LIBS) \
        $(SSL_LIBS)
index 8e27fc2..d810c83 100644 (file)
@@ -21,6 +21,7 @@ vswitchd_ovs_vswitchd_SOURCES = \
        vswitchd/xenserver.h
 vswitchd_ovs_vswitchd_LDADD = \
        ofproto/libofproto.a \
+       lib/libsflow.a \
        lib/libopenvswitch.a \
        $(FAULT_LIBS) \
        $(SSL_LIBS)
index dbcf312..3b7ec51 100644 (file)
@@ -61,6 +61,7 @@
 #include "vconn-ssl.h"
 #include "xenserver.h"
 #include "xtoxll.h"
+#include "sflow_api.h"
 
 #define THIS_MODULE VLM_bridge
 #include "vlog.h"
@@ -210,6 +211,7 @@ static uint64_t bridge_pick_datapath_id(struct bridge *,
                                         const uint8_t bridge_ea[ETH_ADDR_LEN],
                                         struct iface *hw_addr_iface);
 static struct iface *bridge_get_local_iface(struct bridge *);
+static const char *bridge_get_controller(const struct bridge *br);
 static uint64_t dpid_from_hash(const void *, size_t nbytes);
 
 static void bridge_unixctl_fdb_show(struct unixctl_conn *, const char *args);
@@ -527,6 +529,7 @@ bridge_reconfigure(void)
     struct svec old_br, new_br;
     struct bridge *br, *next;
     size_t i;
+    int sflow_bridge_number;
 
     COVERAGE_INC(bridge_reconfigure);
 
@@ -646,6 +649,7 @@ bridge_reconfigure(void)
         svec_destroy(&want_ifaces);
         svec_destroy(&add_ifaces);
     }
+    sflow_bridge_number = 0;
     LIST_FOR_EACH (br, struct bridge, node, &all_bridges) {
         uint8_t ea[8];
         uint64_t dpid;
@@ -716,6 +720,42 @@ bridge_reconfigure(void)
         }
         svec_destroy(&nf_options.collectors);
 
+        if (cfg_has("sflow.%s.host", br->name)) {
+            struct ofproto_sflow_options oso;
+
+            svec_init(&oso.targets);
+            cfg_get_all_keys(&oso.targets, "sflow.%s.host", br->name);
+
+            oso.sampling_rate = SFL_DEFAULT_SAMPLING_RATE;
+            if (cfg_has("sflow.%s.sampling", br->name)) {
+                oso.sampling_rate = cfg_get_int(0, "sflow.%s.sampling",
+                                                br->name);
+            }
+
+            oso.polling_interval = SFL_DEFAULT_POLLING_INTERVAL;
+            if (cfg_has("sflow.%s.polling", br->name)) {
+                oso.polling_interval = cfg_get_int(0, "sflow.%s.polling",
+                                                   br->name);
+            }
+
+            oso.header_len = SFL_DEFAULT_HEADER_SIZE;
+            if (cfg_has("sflow.%s.header", br->name)) {
+                oso.header_len = cfg_get_int(0, "sflow.%s.header", br->name);
+            }
+
+            oso.sub_id = sflow_bridge_number++;
+            oso.agent_device = (char *) cfg_get_string(0, "sflow.%s.agent",
+                                                       br->name);
+            oso.control_ip = (char *) cfg_get_string(0,
+                                                     "bridge.%s.controller.ip",
+                                                     br->name);
+            ofproto_set_sflow(br->ofproto, &oso);
+
+            svec_destroy(&oso.targets);
+        } else {
+            ofproto_set_sflow(br->ofproto, NULL);
+        }
+
         /* Update the controller and related settings.  It would be more
          * straightforward to call this from bridge_reconfigure_one(), but we
          * can't do it there for two reasons.  First, and most importantly, at
index 431c948..76df35f 100644 (file)
@@ -48,6 +48,9 @@ Port mirroring, with optional VLAN tagging.
 NetFlow v5 flow logging.
 .
 .IP \(bu
+sFlow monitoring.
+.
+.IP \(bu
 Connectivity to an external OpenFlow controller, such as NOX.
 .
 .PP
index c416678..5449e57 100644 (file)
@@ -459,6 +459,29 @@ netflow.mybr.host=nflow.example.com:9995
 
 .fi
 .RE
+.SS "sFlow Monitoring"
+sFlow is a protocol for monitoring switches.  A bridge may be
+configured to send sFlow records to sFlow collectors by defining the
+key \fBsflow.\fIbridge\fB.host\fR for each collector in the form
+\fIip\fR[\fB:\fIport\fR].  Records from \fIbridge\fR will be sent to
+each \fIip\fR on UDP \fIport\fR.  The \fIip\fR must be specified
+numerically, not as a DNS name.  If \Iport\fR is omitted, port 6343 is
+used.
+.PP
+By default, 1 out of every 400 packets is sent to the configured sFlow
+collector.  To override this, set \fBsflow.\fIbridge\fB.sampling\fR to
+the number of switched packets out of which 1, on average, will be
+sent to the sFlow collector, e.g. a value of 1 sends every packet to
+the collector, a value of 2 sends 50% of the packets to the collector,
+and so on.
+.PP
+\fBovs\-vswitchd\fR also occasionally sends switch port statistics to
+sFlow collectors, by default every 30 seconds.  To override this, set
+\fBsflow.\fIbridge\fB.polling\fR to a duration in seconds.
+.PP
+By default, \fBovs\-vswitchd\fR sends the first 128 bytes of sampled
+packets to sFlow collectors.  To override this, set
+\fBsflow.\fIbridge\fB.header\fR to a size in bytes.
 .SS "Remote Management"
 A \fBovs\-vswitchd\fR instance may be remotely managed by a controller that
 supports the OpenFlow Management Protocol, such as NOX.  This
@@ -514,7 +537,7 @@ switch will perform all configured bridging and switching locally.
 .TP
 \fBdiscover\fR
 Use controller discovery to find the local OpenFlow controller.
-Refer to \fB\ovs\-openflowd\fR(8) for information on how to configure a DHCP
+Refer to \fBovs\-openflowd\fR(8) for information on how to configure a DHCP
 server to support controller discovery.  The following additional
 options control the discovery process:
 .