ofproto-dpif: Don't output to in_port even if in_port is OFPP_LOCAL.
[sliver-openvswitch.git] / ofproto / ofproto-dpif.c
index 4de5a4e..f1d42a2 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2010, 2011 Nicira Networks.
+ * Copyright (c) 2009, 2010, 2011, 2012 Nicira Networks.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -273,7 +273,7 @@ static bool execute_controller_action(struct ofproto_dpif *,
                                       const struct flow *,
                                       const struct nlattr *odp_actions,
                                       size_t actions_len,
-                                      struct ofpbuf *packet);
+                                      struct ofpbuf *packet, bool clone);
 static void facet_execute(struct ofproto_dpif *, struct facet *,
                           struct ofpbuf *packet);
 
@@ -415,9 +415,7 @@ static bool is_admissible(struct ofproto_dpif *, const struct flow *,
 
 /* Upcalls. */
 #define FLOW_MISS_MAX_BATCH 50
-static void handle_upcall(struct ofproto_dpif *, struct dpif_upcall *);
-static void handle_miss_upcalls(struct ofproto_dpif *,
-                                struct dpif_upcall *, size_t n);
+static int handle_upcalls(struct ofproto_dpif *, unsigned int max_batch);
 
 /* Flow expiration. */
 static int expire(struct ofproto_dpif *);
@@ -583,44 +581,50 @@ destruct(struct ofproto *ofproto_)
     dpif_close(ofproto->dpif);
 }
 
+static int
+run_fast(struct ofproto *ofproto_)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+    unsigned int work;
+
+    /* Handle one or more batches of upcalls, until there's nothing left to do
+     * or until we do a fixed total amount of work.
+     *
+     * We do work in batches because it can be much cheaper to set up a number
+     * of flows and fire off their patches all at once.  We do multiple batches
+     * because in some cases handling a packet can cause another packet to be
+     * queued almost immediately as part of the return flow.  Both
+     * optimizations can make major improvements on some benchmarks and
+     * presumably for real traffic as well. */
+    work = 0;
+    while (work < FLOW_MISS_MAX_BATCH) {
+        int retval = handle_upcalls(ofproto, FLOW_MISS_MAX_BATCH - work);
+        if (retval <= 0) {
+            return -retval;
+        }
+        work += retval;
+    }
+    return 0;
+}
+
 static int
 run(struct ofproto *ofproto_)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
-    struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
     struct ofport_dpif *ofport;
     struct ofbundle *bundle;
-    size_t n_misses;
-    int i;
+    int error;
 
     if (!clogged) {
         complete_operations(ofproto);
     }
     dpif_run(ofproto->dpif);
 
-    n_misses = 0;
-    for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
-        struct dpif_upcall *upcall = &misses[n_misses];
-        int error;
-
-        error = dpif_recv(ofproto->dpif, upcall);
-        if (error) {
-            if (error == ENODEV && n_misses == 0) {
-                return error;
-            }
-            break;
-        }
-
-        if (upcall->type == DPIF_UC_MISS) {
-            /* Handle it later. */
-            n_misses++;
-        } else {
-            handle_upcall(ofproto, upcall);
-        }
+    error = run_fast(ofproto_);
+    if (error) {
+        return error;
     }
 
-    handle_miss_upcalls(ofproto, misses, n_misses);
-
     if (timer_expired(&ofproto->next_expiration)) {
         int delay = expire(ofproto);
         timer_set_duration(&ofproto->next_expiration, delay);
@@ -1040,7 +1044,7 @@ update_stp_port_state(struct ofport_dpif *ofport)
         ofport->stp_state = state;
         ofport->stp_state_entered = time_msec();
 
-        if (fwd_change) {
+        if (fwd_change && ofport->bundle) {
             bundle_update(ofport->bundle);
         }
 
@@ -1070,6 +1074,7 @@ set_stp_port(struct ofport *ofport_,
         if (sp) {
             ofport->stp_port = NULL;
             stp_port_disable(sp);
+            update_stp_port_state(ofport);
         }
         return 0;
     } else if (sp && stp_port_no(sp) != s->port_num
@@ -1109,6 +1114,7 @@ get_stp_port_status(struct ofport *ofport_,
     s->state = stp_port_get_state(sp);
     s->sec_in_state = (time_msec() - ofport->stp_state_entered) / 1000;
     s->role = stp_port_get_role(sp);
+    stp_port_get_counts(sp, &s->tx_count, &s->rx_count, &s->error_count);
 
     return 0;
 }
@@ -1165,8 +1171,8 @@ stp_process_packet(const struct ofport_dpif *ofport,
     }
 
     /* Trim off padding on payload. */
-    if (payload.size > htons(eth->eth_type) + ETH_HEADER_LEN) {
-        payload.size = htons(eth->eth_type) + ETH_HEADER_LEN;
+    if (payload.size > ntohs(eth->eth_type) + ETH_HEADER_LEN) {
+        payload.size = ntohs(eth->eth_type) + ETH_HEADER_LEN;
     }
 
     if (ofpbuf_try_pull(&payload, ETH_HEADER_LEN + LLC_HEADER_LEN)) {
@@ -2195,7 +2201,7 @@ handle_flow_miss(struct ofproto_dpif *ofproto, struct flow_miss *miss,
         }
         if (!execute_controller_action(ofproto, &facet->flow,
                                        facet->actions, facet->actions_len,
-                                       packet)) {
+                                       packet, true)) {
             struct flow_miss_op *op = &ops[(*n_ops)++];
             struct dpif_execute *execute = &op->dpif_op.execute;
 
@@ -2339,23 +2345,46 @@ handle_userspace_upcall(struct ofproto_dpif *ofproto,
     }
 }
 
-static void
-handle_upcall(struct ofproto_dpif *ofproto, struct dpif_upcall *upcall)
+static int
+handle_upcalls(struct ofproto_dpif *ofproto, unsigned int max_batch)
 {
-    switch (upcall->type) {
-    case DPIF_UC_ACTION:
-        handle_userspace_upcall(ofproto, upcall);
-        break;
+    struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
+    int n_misses;
+    int i;
 
-    case DPIF_UC_MISS:
-        /* The caller handles these. */
-        NOT_REACHED();
+    assert (max_batch <= FLOW_MISS_MAX_BATCH);
 
-    case DPIF_N_UC_TYPES:
-    default:
-        VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32, upcall->type);
-        break;
+    n_misses = 0;
+    for (i = 0; i < max_batch; i++) {
+        struct dpif_upcall *upcall = &misses[n_misses];
+        int error;
+
+        error = dpif_recv(ofproto->dpif, upcall);
+        if (error) {
+            break;
+        }
+
+        switch (upcall->type) {
+        case DPIF_UC_ACTION:
+            handle_userspace_upcall(ofproto, upcall);
+            break;
+
+        case DPIF_UC_MISS:
+            /* Handle it later. */
+            n_misses++;
+            break;
+
+        case DPIF_N_UC_TYPES:
+        default:
+            VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32,
+                         upcall->type);
+            break;
+        }
     }
+
+    handle_miss_upcalls(ofproto, misses, n_misses);
+
+    return i;
 }
 \f
 /* Flow expiration. */
@@ -2670,11 +2699,17 @@ facet_free(struct facet *facet)
     free(facet);
 }
 
+/* If the 'actions_len' bytes of actions in 'odp_actions' are just a single
+ * OVS_ACTION_ATTR_USERSPACE action, executes it internally and returns true.
+ * Otherwise, returns false without doing anything.
+ *
+ * If 'clone' is true, the caller always retains ownership of 'packet'.
+ * Otherwise, ownership is transferred to this function if it returns true. */
 static bool
 execute_controller_action(struct ofproto_dpif *ofproto,
                           const struct flow *flow,
                           const struct nlattr *odp_actions, size_t actions_len,
-                          struct ofpbuf *packet)
+                          struct ofpbuf *packet, bool clone)
 {
     if (actions_len
         && odp_actions->nla_type == OVS_ACTION_ATTR_USERSPACE
@@ -2690,7 +2725,7 @@ execute_controller_action(struct ofproto_dpif *ofproto,
 
         nla = nl_attr_find_nested(odp_actions, OVS_USERSPACE_ATTR_USERDATA);
         send_packet_in_action(ofproto, packet, nl_attr_get_u64(nla), flow,
-                              false);
+                              clone);
         return true;
     } else {
         return false;
@@ -2711,7 +2746,7 @@ execute_odp_actions(struct ofproto_dpif *ofproto, const struct flow *flow,
     int error;
 
     if (execute_controller_action(ofproto, flow, odp_actions, actions_len,
-                                  packet)) {
+                                  packet, false)) {
         return true;
     }
 
@@ -3910,11 +3945,9 @@ xlate_output_action__(struct action_xlate_ctx *ctx,
         commit_odp_actions(ctx);
         compose_controller_action(ctx, max_len);
         break;
-    case OFPP_LOCAL:
-        add_output_action(ctx, OFPP_LOCAL);
-        break;
     case OFPP_NONE:
         break;
+    case OFPP_LOCAL:
     default:
         if (port != ctx->flow.in_port) {
             add_output_action(ctx, port);
@@ -4289,13 +4322,24 @@ xlate_actions(struct action_xlate_ctx *ctx,
 
     ctx->odp_actions = ofpbuf_new(512);
     ofpbuf_reserve(ctx->odp_actions, NL_A_U32_SIZE);
+    ctx->tags = 0;
+    ctx->may_set_up_flow = true;
+    ctx->has_learn = false;
+    ctx->has_normal = false;
+    ctx->nf_output_iface = NF_OUT_DROP;
+    ctx->recurse = 0;
+    ctx->priority = 0;
+    ctx->base_priority = 0;
+    ctx->base_flow = ctx->flow;
+    ctx->base_flow.tun_id = 0;
+    ctx->table_id = 0;
 
     if (ctx->flow.tos_frag & FLOW_FRAG_ANY) {
         switch (ctx->ofproto->up.frag_handling) {
         case OFPC_FRAG_NORMAL:
             /* We must pretend that transport ports are unavailable. */
-            ctx->flow.tp_src = htons(0);
-            ctx->flow.tp_dst = htons(0);
+            ctx->flow.tp_src = ctx->base_flow.tp_src = htons(0);
+            ctx->flow.tp_dst = ctx->base_flow.tp_dst = htons(0);
             break;
 
         case OFPC_FRAG_DROP:
@@ -4310,18 +4354,6 @@ xlate_actions(struct action_xlate_ctx *ctx,
         }
     }
 
-    ctx->tags = 0;
-    ctx->may_set_up_flow = true;
-    ctx->has_learn = false;
-    ctx->has_normal = false;
-    ctx->nf_output_iface = NF_OUT_DROP;
-    ctx->recurse = 0;
-    ctx->priority = 0;
-    ctx->base_priority = 0;
-    ctx->base_flow = ctx->flow;
-    ctx->base_flow.tun_id = 0;
-    ctx->table_id = 0;
-
     if (process_special(ctx->ofproto, &ctx->flow, ctx->packet)) {
         ctx->may_set_up_flow = false;
         return ctx->odp_actions;
@@ -4639,12 +4671,27 @@ compose_mirror_dsts(struct action_xlate_ctx *ctx,
     }
 }
 
+static void
+compose_dst_output_action(struct action_xlate_ctx *ctx, const struct dst *dst)
+{
+    ovs_be16 tci;
+
+    tci = htons(dst->vid);
+    if (tci) {
+        tci |= ctx->flow.vlan_tci & htons(VLAN_PCP_MASK);
+        tci |= htons(VLAN_CFI);
+    }
+    commit_vlan_action(ctx, tci);
+
+    compose_output_action(ctx, dst->port->odp_port);
+}
+
 static void
 compose_actions(struct action_xlate_ctx *ctx, uint16_t vlan,
                 const struct ofbundle *in_bundle,
                 const struct ofbundle *out_bundle)
 {
-    uint16_t initial_vid, cur_vid;
+    uint16_t initial_vid;
     const struct dst *dst;
     struct dst_set set;
 
@@ -4660,31 +4707,16 @@ compose_actions(struct action_xlate_ctx *ctx, uint16_t vlan,
     commit_odp_actions(ctx);
     initial_vid = vlan_tci_to_vid(ctx->flow.vlan_tci);
     for (dst = set.dsts; dst < &set.dsts[set.n]; dst++) {
-        if (dst->vid != initial_vid) {
-            continue;
+        if (dst->vid == initial_vid) {
+            compose_dst_output_action(ctx, dst);
         }
-        compose_output_action(ctx, dst->port->odp_port);
     }
 
     /* Then output the rest. */
-    cur_vid = initial_vid;
     for (dst = set.dsts; dst < &set.dsts[set.n]; dst++) {
-        if (dst->vid == initial_vid) {
-            continue;
-        }
-        if (dst->vid != cur_vid) {
-            ovs_be16 tci;
-
-            tci = htons(dst->vid);
-            tci |= ctx->flow.vlan_tci & htons(VLAN_PCP_MASK);
-            if (tci) {
-                tci |= htons(VLAN_CFI);
-            }
-            commit_vlan_action(ctx, tci);
-
-            cur_vid = dst->vid;
+        if (dst->vid != initial_vid) {
+            compose_dst_output_action(ctx, dst);
         }
-        compose_output_action(ctx, dst->port->odp_port);
     }
 
     dst_set_free(&set);
@@ -5373,6 +5405,7 @@ const struct ofproto_class ofproto_dpif_class = {
     destruct,
     dealloc,
     run,
+    run_fast,
     wait,
     flush,
     get_features,