dpif: Make caller of dpif_recv() provide buffer space.
authorBen Pfaff <blp@nicira.com>
Fri, 6 Apr 2012 23:23:28 +0000 (16:23 -0700)
committerBen Pfaff <blp@nicira.com>
Thu, 19 Apr 2012 03:28:51 +0000 (20:28 -0700)
This improves performance under heavy flow setup loads.

Signed-off-by: Ben Pfaff <blp@nicira.com>
lib/dpif-linux.c
lib/dpif-netdev.c
lib/dpif-provider.h
lib/dpif.c
lib/dpif.h
ofproto/ofproto-dpif.c

index a2908cf..932b36f 100644 (file)
@@ -1089,7 +1089,8 @@ parse_odp_packet(struct ofpbuf *buf, struct dpif_upcall *upcall,
 }
 
 static int
-dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall)
+dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall,
+                struct ofpbuf *buf)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     int read_tries = 0;
@@ -1123,7 +1124,6 @@ dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall)
         dpif->ready_mask &= ~(1u << indx);
 
         for (;;) {
-            struct ofpbuf *buf;
             int dp_ifindex;
             int error;
 
@@ -1131,10 +1131,8 @@ dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall)
                 return EAGAIN;
             }
 
-            buf = ofpbuf_new(2048);
             error = nl_sock_recv(upcall_sock, buf, false);
             if (error) {
-                ofpbuf_delete(buf);
                 if (error == EAGAIN) {
                     break;
                 }
@@ -1145,8 +1143,6 @@ dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall)
             if (!error && dp_ifindex == dpif->dp_ifindex) {
                 return 0;
             }
-
-            ofpbuf_delete(buf);
             if (error) {
                 return error;
             }
index a907722..a33fe23 100644 (file)
@@ -935,7 +935,8 @@ find_nonempty_queue(struct dpif *dpif)
 }
 
 static int
-dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall)
+dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
+                 struct ofpbuf *buf)
 {
     struct dp_netdev_queue *q = find_nonempty_queue(dpif);
     if (q) {
@@ -943,6 +944,9 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall)
         *upcall = *u;
         free(u);
 
+        ofpbuf_uninit(buf);
+        *buf = *u->packet;
+
         return 0;
     } else {
         return EAGAIN;
index 75c65bb..6338f50 100644 (file)
@@ -307,21 +307,19 @@ struct dpif_class {
                              uint32_t *priority);
 
     /* Polls for an upcall from 'dpif'.  If successful, stores the upcall into
-     * '*upcall'.  Should only be called if 'recv_set' has been used to enable
-     * receiving packets from 'dpif'.
+     * '*upcall', using 'buf' for storage.  Should only be called if 'recv_set'
+     * has been used to enable receiving packets from 'dpif'.
      *
-     * The caller takes ownership of the data that 'upcall' points to.
-     * 'upcall->key' and 'upcall->actions' (if nonnull) point into data owned
-     * by 'upcall->packet', so their memory cannot be freed separately.  (This
-     * is hardly a great way to do things but it works out OK for the dpif
-     * providers that exist so far.)
-     *
-     * For greatest efficiency, 'upcall->packet' should have at least
-     * offsetof(struct ofp_packet_in, data) bytes of headroom.
+     * The implementation should point 'upcall->packet' and 'upcall->key' into
+     * data in the caller-provided 'buf'.  If necessary to make room, the
+     * implementation may expand the data in 'buf'.  (This is hardly a great
+     * way to do things but it works out OK for the dpif providers that exist
+     * so far.)
      *
      * This function must not block.  If no upcall is pending when it is
      * called, it should return EAGAIN without blocking. */
-    int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall);
+    int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall,
+                struct ofpbuf *buf);
 
     /* Arranges for the poll loop to wake up when 'dpif' has a message queued
      * to be received with the recv member function. */
index e000e13..a1bd59b 100644 (file)
@@ -1069,21 +1069,20 @@ dpif_recv_set(struct dpif *dpif, bool enable)
 }
 
 /* Polls for an upcall from 'dpif'.  If successful, stores the upcall into
- * '*upcall'.  Should only be called if dpif_recv_set() has been used to enable
- * receiving packets on 'dpif'.
+ * '*upcall', using 'buf' for storage.  Should only be called if
+ * dpif_recv_set() has been used to enable receiving packets on 'dpif'.
  *
- * The caller takes ownership of the data that 'upcall' points to.
- * 'upcall->key' and 'upcall->actions' (if nonnull) point into data owned by
- * 'upcall->packet', so their memory cannot be freed separately.  (This is
+ * 'upcall->packet' and 'upcall->key' point into data in the caller-provided
+ * 'buf', so their memory cannot be freed separately from 'buf'.  (This is
  * hardly a great way to do things but it works out OK for the dpif providers
  * and clients that exist so far.)
  *
  * Returns 0 if successful, otherwise a positive errno value.  Returns EAGAIN
  * if no upcall is immediately available. */
 int
-dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall)
+dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf *buf)
 {
-    int error = dpif->dpif_class->recv(dpif, upcall);
+    int error = dpif->dpif_class->recv(dpif, upcall, buf);
     if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
         struct ds flow;
         char *packet;
index 768934b..bdd4fee 100644 (file)
@@ -251,7 +251,7 @@ struct dpif_upcall {
 };
 
 int dpif_recv_set(struct dpif *, bool enable);
-int dpif_recv(struct dpif *, struct dpif_upcall *);
+int dpif_recv(struct dpif *, struct dpif_upcall *, struct ofpbuf *);
 void dpif_recv_purge(struct dpif *);
 void dpif_recv_wait(struct dpif *);
 \f
index 978b5ed..6263431 100644 (file)
@@ -2783,7 +2783,6 @@ handle_miss_upcalls(struct ofproto_dpif *ofproto, struct dpif_upcall *upcalls,
                                                 &flow, &initial_tci,
                                                 upcall->packet);
         if (fitness == ODP_FIT_ERROR) {
-            ofpbuf_delete(upcall->packet);
             continue;
         }
         flow_extract(upcall->packet, flow.skb_priority, flow.tun_id,
@@ -2793,7 +2792,6 @@ handle_miss_upcalls(struct ofproto_dpif *ofproto, struct dpif_upcall *upcalls,
         if (process_special(ofproto, &flow, upcall->packet)) {
             ofproto_update_local_port_stats(&ofproto->up,
                                             0, upcall->packet->size);
-            ofpbuf_delete(upcall->packet);
             ofproto->n_matches++;
             continue;
         }
@@ -2842,7 +2840,6 @@ handle_miss_upcalls(struct ofproto_dpif *ofproto, struct dpif_upcall *upcalls,
         }
     }
     HMAP_FOR_EACH_SAFE (miss, next_miss, hmap_node, &todo) {
-        ofpbuf_list_delete(&miss->packets);
         hmap_remove(&todo, &miss->hmap_node);
         free(miss);
     }
@@ -2864,7 +2861,6 @@ handle_userspace_upcall(struct ofproto_dpif *ofproto,
                                             upcall->key_len, &flow,
                                             &initial_tci, upcall->packet);
     if (fitness == ODP_FIT_ERROR) {
-        ofpbuf_delete(upcall->packet);
         return;
     }
 
@@ -2876,31 +2872,39 @@ handle_userspace_upcall(struct ofproto_dpif *ofproto,
     } else {
         VLOG_WARN_RL(&rl, "invalid user cookie : 0x%"PRIx64, upcall->userdata);
     }
-    ofpbuf_delete(upcall->packet);
 }
 
 static int
 handle_upcalls(struct ofproto_dpif *ofproto, unsigned int max_batch)
 {
     struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
+    struct ofpbuf miss_bufs[FLOW_MISS_MAX_BATCH];
+    uint64_t miss_buf_stubs[FLOW_MISS_MAX_BATCH][4096 / 8];
+    int n_processed;
     int n_misses;
     int i;
 
-    assert (max_batch <= FLOW_MISS_MAX_BATCH);
+    assert(max_batch <= FLOW_MISS_MAX_BATCH);
 
+    n_processed = 0;
     n_misses = 0;
-    for (i = 0; i < max_batch; i++) {
+    for (n_processed = 0; n_processed < max_batch; n_processed++) {
         struct dpif_upcall *upcall = &misses[n_misses];
+        struct ofpbuf *buf = &miss_bufs[n_misses];
         int error;
 
-        error = dpif_recv(ofproto->dpif, upcall);
+        ofpbuf_use_stub(buf, miss_buf_stubs[n_misses],
+                        sizeof miss_buf_stubs[n_misses]);
+        error = dpif_recv(ofproto->dpif, upcall, buf);
         if (error) {
+            ofpbuf_uninit(buf);
             break;
         }
 
         switch (upcall->type) {
         case DPIF_UC_ACTION:
             handle_userspace_upcall(ofproto, upcall);
+            ofpbuf_uninit(buf);
             break;
 
         case DPIF_UC_MISS:
@@ -2917,8 +2921,11 @@ handle_upcalls(struct ofproto_dpif *ofproto, unsigned int max_batch)
     }
 
     handle_miss_upcalls(ofproto, misses, n_misses);
+    for (i = 0; i < n_misses; i++) {
+        ofpbuf_uninit(&miss_bufs[i]);
+    }
 
-    return i;
+    return n_processed;
 }
 \f
 /* Flow expiration. */