dpif-linux: Give each port its own userspace-kernel channel.
[sliver-openvswitch.git] / lib / dpif-linux.c
index c75b8cc..63d7afb 100644 (file)
 VLOG_DEFINE_THIS_MODULE(dpif_linux);
 enum { MAX_PORTS = USHRT_MAX };
 
-enum { N_CHANNELS = 17 };
-BUILD_ASSERT_DECL(IS_POW2(N_CHANNELS - 1));
-BUILD_ASSERT_DECL(N_CHANNELS > 1);
-BUILD_ASSERT_DECL(N_CHANNELS <= 32); /* We use a 32-bit word as a mask. */
-
 /* This ethtool flag was introduced in Linux 2.6.24, so it might be
  * missing if we have old headers. */
 #define ETH_FLAG_LRO      (1 << 15)    /* LRO is enabled */
@@ -131,68 +126,26 @@ static int dpif_linux_flow_transact(struct dpif_linux_flow *request,
 static void dpif_linux_flow_get_stats(const struct dpif_linux_flow *,
                                       struct dpif_flow_stats *);
 
-/* Packet drop monitoring.
- *
- * When kernel-to-user Netlink buffers overflow, the kernel notifies us that
- * one or more packets were dropped, but it doesn't tell us anything about
- * those packets.  However, the administrator really wants to know.  So we do
- * the next best thing, and keep track of the top sources of packets received
- * on each kernel-to-user channel, since the top sources are those that will
- * cause the buffers to overflow.
- *
- * We use a variation on the "Space-Saving" algorithm in Metwally et al.,
- * "Efficient Computation of Frequent and Top-k Elements in Data Streams", ACM
- * Transactions on Database Systems 31:3 (2006).  This algorithm yields
- * perfectly accurate results when the data stream's unique values (in this
- * case, port numbers) fit into our data structure, and degrades gracefully
- * even for challenging distributions (e.g. Zipf).
- *
- * Our implementation is very simple, without any of the special flourishes
- * described in the paper.  It avoids the need to use a hash for lookup by
- * keeping the constant factor (N_SKETCHES) very small.  The error calculations
- * in the paper make it sound like the results should still be satisfactory.
- *
- * "space-saving" and "Metwally" seem like awkward names for data structures,
- * so we call this a "sketch" even though technically that's a different sort
- * of summary structure.
- */
-
-/* One of N_SKETCHES counting elements per channel in the Metwally
- * "space-saving" algorithm. */
-enum { N_SKETCHES = 8 };        /* Number of elements per channel. */
-struct dpif_sketch {
-    uint32_t port_no;           /* Port number. */
-    unsigned int hits;          /* Number of hits. */
-    unsigned int error;         /* Upper bound on error in 'hits'. */
-};
-
-/* One of N_CHANNELS channels per dpif between the kernel and userspace. */
+/* One of the dpif channels between the kernel and userspace. */
 struct dpif_channel {
     struct nl_sock *sock;       /* Netlink socket. */
-    struct dpif_sketch sketches[N_SKETCHES]; /* From max to min 'hits'. */
     long long int last_poll;    /* Last time this channel was polled. */
 };
 
-static void update_sketch(struct dpif_channel *, uint32_t port_no);
-static void scale_sketches(struct dpif *);
 static void report_loss(struct dpif *, struct dpif_channel *);
 
-/* Interval, in milliseconds, at which to scale down the sketch values by a
- * factor of 2.  The Metwally algorithm doesn't do this, which makes sense in
- * the context it assumes, but in our situation we ought to weight recent data
- * more heavily than old data, so in my opinion this is reasonable. */
-#define SCALE_INTERVAL (60 * 1000)
-
 /* Datapath interface for the openvswitch Linux kernel module. */
 struct dpif_linux {
     struct dpif dpif;
     int dp_ifindex;
 
     /* Upcall messages. */
-    struct dpif_channel channels[N_CHANNELS];
-    uint32_t ready_mask;        /* 1-bit for each sock with unread messages. */
+    int uc_array_size;          /* Size of 'channels' and 'epoll_events'. */
+    struct dpif_channel *channels;
+    struct epoll_event *epoll_events;
     int epoll_fd;               /* epoll fd that includes channel socks. */
-    long long int next_scale;   /* Next time to scale down the sketches. */
+    int n_events;               /* Num events returned by epoll_wait(). */
+    int event_offset;           /* Offset into 'epoll_events'. */
 
     /* Change notification. */
     struct sset changed_ports;  /* Ports that have changed. */
@@ -300,8 +253,6 @@ open_dpif(const struct dpif_linux_dp *dp, struct dpif **dpifp)
     dpif_init(&dpif->dpif, &dpif_linux_class, dp->name,
               dp->dp_ifindex, dp->dp_ifindex);
 
-    dpif->next_scale = LLONG_MAX;
-
     dpif->dp_ifindex = dp->dp_ifindex;
     sset_init(&dpif->changed_ports);
     *dpifp = &dpif->dpif;
@@ -310,17 +261,108 @@ open_dpif(const struct dpif_linux_dp *dp, struct dpif **dpifp)
 static void
 destroy_channels(struct dpif_linux *dpif)
 {
-    struct dpif_channel *ch;
+    int i;
 
-    if (dpif->epoll_fd >= 0) {
-        close(dpif->epoll_fd);
-        dpif->epoll_fd = -1;
+    if (dpif->epoll_fd < 0) {
+        return;
     }
-    for (ch = dpif->channels; ch < &dpif->channels[N_CHANNELS]; ch++) {
+
+    for (i = 0; i < dpif->uc_array_size; i++ ) {
+        struct dpif_linux_vport vport_request;
+        struct dpif_channel *ch = &dpif->channels[i];
+        uint32_t upcall_pid = 0;
+
+        if (!ch->sock) {
+            continue;
+        }
+
+        /* Turn off upcalls. */
+        dpif_linux_vport_init(&vport_request);
+        vport_request.cmd = OVS_VPORT_CMD_SET;
+        vport_request.dp_ifindex = dpif->dp_ifindex;
+        vport_request.port_no = i;
+        vport_request.upcall_pid = &upcall_pid;
+        dpif_linux_vport_transact(&vport_request, NULL, NULL);
+
         nl_sock_destroy(ch->sock);
-        ch->sock = NULL;
     }
-    dpif->next_scale = LLONG_MAX;
+
+    free(dpif->channels);
+    dpif->channels = NULL;
+    dpif->uc_array_size = 0;
+
+    free(dpif->epoll_events);
+    dpif->epoll_events = NULL;
+    dpif->n_events = dpif->event_offset = 0;
+
+    close(dpif->epoll_fd);
+    dpif->epoll_fd = -1;
+}
+
+static int
+add_channel(struct dpif_linux *dpif, uint32_t port_no, struct nl_sock *sock)
+{
+    struct epoll_event event;
+
+    if (dpif->epoll_fd < 0) {
+        return 0;
+    }
+
+    /* We assume that the datapath densely chooses port numbers, which
+     * can therefore be used as an index into an array of channels. */
+    if (port_no >= dpif->uc_array_size) {
+        int new_size = port_no + 1;
+        int i;
+
+        if (new_size > 65535) {
+            VLOG_WARN_RL(&error_rl, "%s: datapath port %"PRIu32" too big",
+                         dpif_name(&dpif->dpif), port_no);
+            return EFBIG;
+        }
+
+        dpif->channels = xrealloc(dpif->channels,
+                                  new_size * sizeof *dpif->channels);
+        for (i = dpif->uc_array_size; i < new_size; i++) {
+            dpif->channels[i].sock = NULL;
+        }
+
+        dpif->epoll_events = xrealloc(dpif->epoll_events,
+                                      new_size * sizeof *dpif->epoll_events);
+        dpif->uc_array_size = new_size;
+    }
+
+    memset(&event, 0, sizeof event);
+    event.events = EPOLLIN;
+    event.data.u32 = port_no;
+    if (epoll_ctl(dpif->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
+                  &event) < 0) {
+        return errno;
+    }
+
+    dpif->channels[port_no].sock = sock;
+    dpif->channels[port_no].last_poll = LLONG_MIN;
+
+    return 0;
+}
+
+static void
+del_channel(struct dpif_linux *dpif, uint32_t port_no)
+{
+    struct dpif_channel *ch;
+
+    if (dpif->epoll_fd < 0 || port_no >= dpif->uc_array_size) {
+        return;
+    }
+
+    ch = &dpif->channels[port_no];
+    if (!ch->sock) {
+        return;
+    }
+
+    epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock), NULL);
+
+    nl_sock_destroy(ch->sock);
+    ch->sock = NULL;
 }
 
 static void
@@ -347,15 +389,8 @@ dpif_linux_destroy(struct dpif *dpif_)
 }
 
 static void
-dpif_linux_run(struct dpif *dpif_)
+dpif_linux_run(struct dpif *dpif_ OVS_UNUSED)
 {
-    struct dpif_linux *dpif = dpif_linux_cast(dpif_);
-
-    if (time_msec() >= dpif->next_scale) {
-        dpif->next_scale = time_msec() + SCALE_INTERVAL;
-        scale_sketches(dpif_);
-    }
-
     if (nln) {
         nln_run(nln);
     }
@@ -396,10 +431,18 @@ dpif_linux_port_add(struct dpif *dpif_, struct netdev *netdev,
     const char *type = netdev_get_type(netdev);
     struct dpif_linux_vport request, reply;
     const struct ofpbuf *options;
+    struct nl_sock *sock = NULL;
     uint32_t upcall_pid;
     struct ofpbuf *buf;
     int error;
 
+    if (dpif->epoll_fd >= 0) {
+        error = nl_sock_create(NETLINK_GENERIC, &sock);
+        if (error) {
+            return error;
+        }
+    }
+
     dpif_linux_vport_init(&request);
     request.cmd = OVS_VPORT_CMD_NEW;
     request.dp_ifindex = dpif->dp_ifindex;
@@ -408,6 +451,7 @@ dpif_linux_port_add(struct dpif *dpif_, struct netdev *netdev,
         VLOG_WARN_RL(&error_rl, "%s: cannot create port `%s' because it has "
                      "unsupported type `%s'",
                      dpif_name(dpif_), name, type);
+        nl_sock_destroy(sock);
         return EINVAL;
     }
     request.name = name;
@@ -423,23 +467,42 @@ dpif_linux_port_add(struct dpif *dpif_, struct netdev *netdev,
     }
 
     request.port_no = *port_nop;
-    upcall_pid = dpif_linux_port_get_pid(dpif_, request.port_no);
+    upcall_pid = sock ? nl_sock_pid(sock) : 0;
     request.upcall_pid = &upcall_pid;
 
     error = dpif_linux_vport_transact(&request, &reply, &buf);
-
     if (!error) {
         *port_nop = reply.port_no;
         VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32,
                  dpif_name(dpif_), reply.port_no, upcall_pid);
     } else if (error == EBUSY && *port_nop != UINT32_MAX) {
         VLOG_INFO("%s: requested port %"PRIu32" is in use",
-                 dpif_name(dpif_), *port_nop);
+                  dpif_name(dpif_), *port_nop);
+        nl_sock_destroy(sock);
+        ofpbuf_delete(buf);
+        return error;
     }
-
     ofpbuf_delete(buf);
 
-    return error;
+    if (sock) {
+        error = add_channel(dpif, *port_nop, sock);
+        if (error) {
+            VLOG_INFO("%s: could not add channel for port %s",
+                      dpif_name(dpif_), name);
+
+            /* Delete the port. */
+            dpif_linux_vport_init(&request);
+            request.cmd = OVS_VPORT_CMD_DEL;
+            request.dp_ifindex = dpif->dp_ifindex;
+            request.port_no = *port_nop;
+            dpif_linux_vport_transact(&request, NULL, NULL);
+
+            nl_sock_destroy(sock);
+            return error;
+        }
+    }
+
+    return 0;
 }
 
 static int
@@ -455,6 +518,8 @@ dpif_linux_port_del(struct dpif *dpif_, uint32_t port_no)
     vport.port_no = port_no;
     error = dpif_linux_vport_transact(&vport, NULL, NULL);
 
+    del_channel(dpif, port_no);
+
     return error;
 }
 
@@ -517,11 +582,9 @@ dpif_linux_port_get_pid(const struct dpif *dpif_, uint32_t port_no)
     if (dpif->epoll_fd < 0) {
         return 0;
     } else {
-        int idx;
-
-        idx = (port_no != UINT32_MAX
-               ? 1 + (port_no & (N_CHANNELS - 2))
-               : 0);
+        /* The UINT32_MAX "reserved" port number uses the "ovs-system"'s
+         * channel, since it is not heavily loaded. */
+        int idx = (port_no >= dpif->uc_array_size) ? 0 : port_no;
         return nl_sock_pid(dpif->channels[idx].sock);
     }
 }
@@ -1019,35 +1082,6 @@ dpif_linux_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
     }
 }
 
-static void
-set_upcall_pids(struct dpif *dpif_)
-{
-    struct dpif_linux *dpif = dpif_linux_cast(dpif_);
-    struct dpif_port_dump port_dump;
-    struct dpif_port port;
-    int error;
-
-    DPIF_PORT_FOR_EACH (&port, &port_dump, &dpif->dpif) {
-        uint32_t upcall_pid = dpif_linux_port_get_pid(dpif_, port.port_no);
-        struct dpif_linux_vport vport_request;
-
-        dpif_linux_vport_init(&vport_request);
-        vport_request.cmd = OVS_VPORT_CMD_SET;
-        vport_request.dp_ifindex = dpif->dp_ifindex;
-        vport_request.port_no = port.port_no;
-        vport_request.upcall_pid = &upcall_pid;
-        error = dpif_linux_vport_transact(&vport_request, NULL, NULL);
-        if (!error) {
-            VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32,
-                     dpif_name(&dpif->dpif), vport_request.port_no,
-                     upcall_pid);
-        } else {
-            VLOG_WARN_RL(&error_rl, "%s: failed to set upcall pid on port: %s",
-                         dpif_name(&dpif->dpif), strerror(error));
-        }
-    }
-}
-
 static int
 dpif_linux_recv_set(struct dpif *dpif_, bool enable)
 {
@@ -1060,44 +1094,61 @@ dpif_linux_recv_set(struct dpif *dpif_, bool enable)
     if (!enable) {
         destroy_channels(dpif);
     } else {
-        struct dpif_channel *ch;
-        int error;
+        struct dpif_port_dump port_dump;
+        struct dpif_port port;
 
-        dpif->epoll_fd = epoll_create(N_CHANNELS);
+        dpif->epoll_fd = epoll_create(10);
         if (dpif->epoll_fd < 0) {
             return errno;
         }
 
-        for (ch = dpif->channels; ch < &dpif->channels[N_CHANNELS]; ch++) {
-            int indx = ch - dpif->channels;
-            struct epoll_event event;
+        DPIF_PORT_FOR_EACH (&port, &port_dump, &dpif->dpif) {
+            struct dpif_linux_vport vport_request;
+            struct nl_sock *sock;
+            uint32_t upcall_pid;
+            int error;
 
-            error = nl_sock_create(NETLINK_GENERIC, &ch->sock);
+            error = nl_sock_create(NETLINK_GENERIC, &sock);
             if (error) {
-                destroy_channels(dpif);
                 return error;
             }
 
-            memset(&event, 0, sizeof event);
-            event.events = EPOLLIN;
-            event.data.u32 = indx;
-            if (epoll_ctl(dpif->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(ch->sock),
-                          &event) < 0) {
-                error = errno;
-                destroy_channels(dpif);
-                return error;
+            upcall_pid = nl_sock_pid(sock);
+
+            dpif_linux_vport_init(&vport_request);
+            vport_request.cmd = OVS_VPORT_CMD_SET;
+            vport_request.dp_ifindex = dpif->dp_ifindex;
+            vport_request.port_no = port.port_no;
+            vport_request.upcall_pid = &upcall_pid;
+            error = dpif_linux_vport_transact(&vport_request, NULL, NULL);
+            if (!error) {
+                VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32,
+                         dpif_name(&dpif->dpif), vport_request.port_no,
+                         upcall_pid);
+            } else {
+                VLOG_WARN_RL(&error_rl,
+                             "%s: failed to set upcall pid on port: %s",
+                             dpif_name(&dpif->dpif), strerror(error));
+                nl_sock_destroy(sock);
+
+                if (error == ENODEV || error == ENOENT) {
+                    /* This device isn't there, but keep trying the others. */
+                    continue;
+                } else {
+                    return error;
+                }
             }
 
-            memset(ch->sketches, 0, sizeof ch->sketches);
-            ch->last_poll = LLONG_MIN;
+            error = add_channel(dpif, port.port_no, sock);
+            if (error) {
+                VLOG_INFO("%s: could not add channel for port %s",
+                          dpif_name(dpif_), port.name);
+                nl_sock_destroy(sock);
+                return error;
+            }
         }
-
-        dpif->ready_mask = 0;
-        dpif->next_scale = time_msec() + SCALE_INTERVAL;
     }
 
-    set_upcall_pids(dpif_);
-
     return 0;
 }
 
@@ -1181,29 +1232,28 @@ dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall,
        return EAGAIN;
     }
 
-    if (!dpif->ready_mask) {
-        struct epoll_event events[N_CHANNELS];
+    if (dpif->event_offset >= dpif->n_events) {
         int retval;
-        int i;
+
+        dpif->event_offset = dpif->n_events = 0;
 
         do {
-            retval = epoll_wait(dpif->epoll_fd, events, N_CHANNELS, 0);
+            retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events,
+                                dpif->uc_array_size, 0);
         } while (retval < 0 && errno == EINTR);
         if (retval < 0) {
             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
             VLOG_WARN_RL(&rl, "epoll_wait failed (%s)", strerror(errno));
-        }
-
-        for (i = 0; i < retval; i++) {
-            dpif->ready_mask |= 1u << events[i].data.u32;
+        } else if (retval > 0) {
+            dpif->n_events = retval;
         }
     }
 
-    while (dpif->ready_mask) {
-        int indx = ffs(dpif->ready_mask) - 1;
-        struct dpif_channel *ch = &dpif->channels[indx];
+    while (dpif->event_offset < dpif->n_events) {
+        int idx = dpif->epoll_events[dpif->event_offset].data.u32;
+        struct dpif_channel *ch = &dpif->channels[idx];
 
-        dpif->ready_mask &= ~(1u << indx);
+        dpif->event_offset++;
 
         for (;;) {
             int dp_ifindex;
@@ -1233,16 +1283,8 @@ dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall,
 
             error = parse_odp_packet(buf, upcall, &dp_ifindex);
             if (!error && dp_ifindex == dpif->dp_ifindex) {
-                const struct nlattr *in_port;
-
-                in_port = nl_attr_find__(upcall->key, upcall->key_len,
-                                         OVS_KEY_ATTR_IN_PORT);
-                if (in_port) {
-                    update_sketch(ch, nl_attr_get_u32(in_port));
-                }
                 return 0;
-            }
-            if (error) {
+            } else if (error) {
                 return error;
             }
         }
@@ -1273,8 +1315,10 @@ dpif_linux_recv_purge(struct dpif *dpif_)
        return;
     }
 
-    for (ch = dpif->channels; ch < &dpif->channels[N_CHANNELS]; ch++) {
-        nl_sock_drain(ch->sock);
+    for (ch = dpif->channels; ch < &dpif->channels[dpif->uc_array_size]; ch++) {
+        if (ch->sock) {
+            nl_sock_drain(ch->sock);
+        }
     }
 }
 
@@ -1875,55 +1919,6 @@ dpif_linux_flow_get_stats(const struct dpif_linux_flow *flow,
     stats->tcp_flags = flow->tcp_flags ? *flow->tcp_flags : 0;
 }
 \f
-/* Metwally "space-saving" algorithm implementation. */
-
-/* Updates 'ch' to record that a packet was received on 'port_no'. */
-static void
-update_sketch(struct dpif_channel *ch, uint32_t port_no)
-{
-    struct dpif_sketch *sk;
-
-    /* Find an existing counting element for 'port_no' or, if none, replace the
-     * counting element with the fewest hits by 'port_no'. */
-    for (sk = ch->sketches; ; sk++) {
-        if (port_no == sk->port_no) {
-            break;
-        } else if (sk == &ch->sketches[N_SKETCHES - 1]) {
-            sk->port_no = port_no;
-            sk->error = sk->hits;
-            break;
-        }
-    }
-
-    /* Increment the hit count, then re-sort the counting elements (usually
-     * nothing needs to be done). */
-    sk->hits++;
-    while (sk > ch->sketches && sk[-1].hits > sk->hits) {
-        struct dpif_sketch tmp = sk[-1];
-        sk[-1] = *sk;
-        *sk = tmp;
-        sk--;
-    }
-}
-
-/* Divide the counts of all the the counting elements in 'dpif' by 2.  See the
- * comment on SCALE_INTERVAL. */
-static void
-scale_sketches(struct dpif *dpif_)
-{
-    struct dpif_linux *dpif = dpif_linux_cast(dpif_);
-    struct dpif_channel *ch;
-
-    for (ch = dpif->channels; ch < &dpif->channels[N_CHANNELS]; ch++) {
-        struct dpif_sketch *sk;
-
-        for (sk = ch->sketches; sk < &ch->sketches[N_SKETCHES]; sk++) {
-            sk->hits /= 2;
-            sk->error /= 2;
-        }
-    }
-}
-
 /* Logs information about a packet that was recently lost in 'ch' (in
  * 'dpif_'). */
 static void
@@ -1931,7 +1926,6 @@ report_loss(struct dpif *dpif_, struct dpif_channel *ch)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
-    struct dpif_sketch *sk;
     struct ds s;
 
     if (VLOG_DROP_ERR(&rl)) {
@@ -1943,25 +1937,6 @@ report_loss(struct dpif *dpif_, struct dpif_channel *ch)
         ds_put_format(&s, " (last polled %lld ms ago)",
                       time_msec() - ch->last_poll);
     }
-    ds_put_cstr(&s, ", most frequent sources are");
-    for (sk = ch->sketches; sk < &ch->sketches[N_SKETCHES]; sk++) {
-        if (sk->hits) {
-            struct dpif_port port;
-
-            ds_put_format(&s, " %"PRIu32, sk->port_no);
-            if (!dpif_port_query_by_number(dpif_, sk->port_no, &port)) {
-                ds_put_format(&s, "(%s)", port.name);
-                dpif_port_destroy(&port);
-            }
-            if (sk->error) {
-                ds_put_format(&s, ": %u to %u,",
-                              sk->hits - sk->error, sk->hits);
-            } else {
-                ds_put_format(&s, ": %u,", sk->hits);
-            }
-        }
-    }
-    ds_chomp(&s, ',');
 
     VLOG_WARN("%s: lost packet on channel %td%s",
               dpif_name(dpif_), ch - dpif->channels, ds_cstr(&s));