ofproto-dpif: Don't output to in_port even if in_port is OFPP_LOCAL.
[sliver-openvswitch.git] / ofproto / ofproto-dpif.c
index c41f4db..f1d42a2 100644
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2010, 2011 Nicira Networks.
+ * Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -273,7 +273,7 @@ static bool execute_controller_action(struct ofproto_dpif *,
                                       const struct flow *,
                                       const struct nlattr *odp_actions,
                                       size_t actions_len,
-                                      struct ofpbuf *packet);
+                                      struct ofpbuf *packet, bool clone);
 static void facet_execute(struct ofproto_dpif *, struct facet *,
                           struct ofpbuf *packet);
 
@@ -415,9 +415,7 @@ static bool is_admissible(struct ofproto_dpif *, const struct flow *,
 
 /* Upcalls. */
 #define FLOW_MISS_MAX_BATCH 50
-static void handle_upcall(struct ofproto_dpif *, struct dpif_upcall *);
-static void handle_miss_upcalls(struct ofproto_dpif *,
-                                struct dpif_upcall *, size_t n);
+static int handle_upcalls(struct ofproto_dpif *, unsigned int max_batch);
 
 /* Flow expiration. */
 static int expire(struct ofproto_dpif *);
@@ -583,44 +581,50 @@ destruct(struct ofproto *ofproto_)
     dpif_close(ofproto->dpif);
 }
 
+static int
+run_fast(struct ofproto *ofproto_)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+    unsigned int work;
+
+    /* Handle one or more batches of upcalls, until there's nothing left to do
+     * or until we do a fixed total amount of work.
+     *
+     * We do work in batches because it can be much cheaper to set up a number
+ * of flows and fire off their packets all at once.  We do multiple batches
+     * because in some cases handling a packet can cause another packet to be
+     * queued almost immediately as part of the return flow.  Both
+     * optimizations can make major improvements on some benchmarks and
+     * presumably for real traffic as well. */
+    work = 0;
+    while (work < FLOW_MISS_MAX_BATCH) {
+        int retval = handle_upcalls(ofproto, FLOW_MISS_MAX_BATCH - work);
+        if (retval <= 0) {
+            return -retval;
+        }
+        work += retval;
+    }
+    return 0;
+}
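
This diff only adds the ofproto-dpif implementation of the fast path and registers it in the class table at the bottom; the generic ofproto layer presumably gains a thin wrapper that forwards to the new hook. A minimal sketch of such a wrapper, assuming a name like ofproto_run_fast() (the name and null-hook check are assumptions, not shown in this diff):

    /* Sketch only: a plausible generic-layer entry point for the new hook.
     * Returns 0 when there is nothing left to do, or a positive errno. */
    int
    ofproto_run_fast(struct ofproto *p)
    {
        return p->ofproto_class->run_fast ? p->ofproto_class->run_fast(p) : 0;
    }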
+
 static int
 run(struct ofproto *ofproto_)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
-    struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
     struct ofport_dpif *ofport;
     struct ofbundle *bundle;
-    size_t n_misses;
-    int i;
+    int error;
 
     if (!clogged) {
         complete_operations(ofproto);
     }
     dpif_run(ofproto->dpif);
 
-    n_misses = 0;
-    for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
-        struct dpif_upcall *upcall = &misses[n_misses];
-        int error;
-
-        error = dpif_recv(ofproto->dpif, upcall);
-        if (error) {
-            if (error == ENODEV && n_misses == 0) {
-                return error;
-            }
-            break;
-        }
-
-        if (upcall->type == DPIF_UC_MISS) {
-            /* Handle it later. */
-            n_misses++;
-        } else {
-            handle_upcall(ofproto, upcall);
-        }
+    error = run_fast(ofproto_);
+    if (error) {
+        return error;
     }
 
-    handle_miss_upcalls(ofproto, misses, n_misses);
-
     if (timer_expired(&ofproto->next_expiration)) {
         int delay = expire(ofproto);
         timer_set_duration(&ofproto->next_expiration, delay);
@@ -2197,7 +2201,7 @@ handle_flow_miss(struct ofproto_dpif *ofproto, struct flow_miss *miss,
         }
         if (!execute_controller_action(ofproto, &facet->flow,
                                        facet->actions, facet->actions_len,
-                                       packet)) {
+                                       packet, true)) {
             struct flow_miss_op *op = &ops[(*n_ops)++];
             struct dpif_execute *execute = &op->dpif_op.execute;
 
@@ -2341,23 +2345,46 @@ handle_userspace_upcall(struct ofproto_dpif *ofproto,
     }
 }
 
-static void
-handle_upcall(struct ofproto_dpif *ofproto, struct dpif_upcall *upcall)
+static int
+handle_upcalls(struct ofproto_dpif *ofproto, unsigned int max_batch)
 {
-    switch (upcall->type) {
-    case DPIF_UC_ACTION:
-        handle_userspace_upcall(ofproto, upcall);
-        break;
+    struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
+    int n_misses;
+    int i;
 
-    case DPIF_UC_MISS:
-        /* The caller handles these. */
-        NOT_REACHED();
+    assert(max_batch <= FLOW_MISS_MAX_BATCH);
 
-    case DPIF_N_UC_TYPES:
-    default:
-        VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32, upcall->type);
-        break;
+    n_misses = 0;
+    for (i = 0; i < max_batch; i++) {
+        struct dpif_upcall *upcall = &misses[n_misses];
+        int error;
+
+        error = dpif_recv(ofproto->dpif, upcall);
+        if (error) {
+            break;
+        }
+
+        switch (upcall->type) {
+        case DPIF_UC_ACTION:
+            handle_userspace_upcall(ofproto, upcall);
+            break;
+
+        case DPIF_UC_MISS:
+            /* Handle it later. */
+            n_misses++;
+            break;
+
+        case DPIF_N_UC_TYPES:
+        default:
+            VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32,
+                         upcall->type);
+            break;
+        }
     }
+
+    handle_miss_upcalls(ofproto, misses, n_misses);
+
+    return i;
 }
 \f
 /* Flow expiration. */
@@ -2672,11 +2699,17 @@ facet_free(struct facet *facet)
     free(facet);
 }
 
+/* If the 'actions_len' bytes of actions in 'odp_actions' are just a single
+ * OVS_ACTION_ATTR_USERSPACE action, executes it internally and returns true.
+ * Otherwise, returns false without doing anything.
+ *
+ * If 'clone' is true, the caller always retains ownership of 'packet'.
+ * Otherwise, ownership is transferred to this function if it returns true. */
 static bool
 execute_controller_action(struct ofproto_dpif *ofproto,
                           const struct flow *flow,
                           const struct nlattr *odp_actions, size_t actions_len,
-                          struct ofpbuf *packet)
+                          struct ofpbuf *packet, bool clone)
 {
     if (actions_len
         && odp_actions->nla_type == OVS_ACTION_ATTR_USERSPACE
@@ -2692,7 +2725,7 @@ execute_controller_action(struct ofproto_dpif *ofproto,
 
         nla = nl_attr_find_nested(odp_actions, OVS_USERSPACE_ATTR_USERDATA);
         send_packet_in_action(ofproto, packet, nl_attr_get_u64(nla), flow,
-                              false);
+                              clone);
         return true;
     } else {
         return false;
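
The new 'clone' flag is purely an ownership convention, as described by the comment added above. A minimal usage sketch of the two calling styles (the surrounding variables are illustrative, not from this diff):

    /* Caller still needs 'packet' afterwards (e.g. to build a flow-miss op),
     * so it passes clone == true and keeps ownership either way. */
    if (execute_controller_action(ofproto, &flow, odp_actions, actions_len,
                                  packet, true)) {
        /* 'packet' is still ours to reuse or free. */
    }

    /* Caller is finished with 'packet' once the action is handled, so it
     * passes clone == false and must not touch 'packet' after a true return. */
    if (execute_controller_action(ofproto, &flow, odp_actions, actions_len,
                                  packet, false)) {
        /* Ownership transferred to the packet-in path; don't free here. */
    }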
@@ -2713,7 +2746,7 @@ execute_odp_actions(struct ofproto_dpif *ofproto, const struct flow *flow,
     int error;
 
     if (execute_controller_action(ofproto, flow, odp_actions, actions_len,
-                                  packet)) {
+                                  packet, false)) {
         return true;
     }
 
@@ -3912,11 +3945,9 @@ xlate_output_action__(struct action_xlate_ctx *ctx,
         commit_odp_actions(ctx);
         compose_controller_action(ctx, max_len);
         break;
-    case OFPP_LOCAL:
-        add_output_action(ctx, OFPP_LOCAL);
-        break;
     case OFPP_NONE:
         break;
+    case OFPP_LOCAL:
     default:
         if (port != ctx->flow.in_port) {
             add_output_action(ctx, port);
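
With OFPP_LOCAL folded into the default arm, the existing in_port check now also prevents reflecting a packet back out the local port, which is what the commit title describes. A simplified sketch of the resulting output decision (not the full switch):

    switch (port) {
    case OFPP_NONE:
        break;
    case OFPP_LOCAL:
    default:
        /* Never send a packet back out the port it arrived on, even when
         * that port happens to be OFPP_LOCAL. */
        if (port != ctx->flow.in_port) {
            add_output_action(ctx, port);
        }
        break;
    }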
@@ -5374,6 +5405,7 @@ const struct ofproto_class ofproto_dpif_class = {
     destruct,
     dealloc,
     run,
+    run_fast,
     wait,
     flush,
     get_features,
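
The new entry slots in between 'run' and 'wait' in the ofproto_dpif_class initializer. The provider-side declaration in struct ofproto_class is not part of this diff; it presumably looks like the following (assumed signature, with 0 meaning "nothing left to do"):

    int (*run_fast)(struct ofproto *ofproto);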