Implement QoS framework.
[sliver-openvswitch.git] / ofproto / ofproto.c
index 636e5ec..5232cfa 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2009, 2010 Nicira Networks.
+ * Copyright (c) 2010 Jean Tourrilhes - HP-Labs.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,6 +19,7 @@
 #include "ofproto.h"
 #include <errno.h>
 #include <inttypes.h>
+#include <sys/socket.h>
 #include <net/if.h>
 #include <netinet/in.h>
 #include <stdbool.h>
@@ -34,6 +36,7 @@
 #include "netflow.h"
 #include "odp-util.h"
 #include "ofp-print.h"
+#include "ofp-util.h"
 #include "ofproto-sflow.h"
 #include "ofpbuf.h"
 #include "openflow/nicira-ext.h"
@@ -56,6 +59,9 @@
 #include "vconn.h"
 #include "xtoxll.h"
 
+#include <linux/types.h>        /* XXX */
+#include <linux/pkt_sched.h>    /* XXX */
+
 #define THIS_MODULE VLM_ofproto
 #include "vlog.h"
 
@@ -204,6 +210,7 @@ struct ofconn {
     struct hmap_node hmap_node;  /* In struct ofproto's "controllers" map. */
     struct discovery *discovery; /* Controller discovery object, if enabled. */
     struct status_category *ss;  /* Switch status category. */
+    enum ofproto_band band;      /* In-band or out-of-band? */
 };
 
 /* We use OFPR_NO_MATCH and OFPR_ACTION as indexes into struct ofconn's
@@ -221,6 +228,9 @@ static struct ofconn *ofconn_create(struct ofproto *, struct rconn *,
 static void ofconn_destroy(struct ofconn *);
 static void ofconn_run(struct ofconn *, struct ofproto *);
 static void ofconn_wait(struct ofconn *);
+static bool ofconn_receives_async_msgs(const struct ofconn *);
+static char *ofconn_make_name(const struct ofproto *, const char *target);
+
 static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
                      struct rconn_packet_counter *counter);
 
@@ -247,11 +257,16 @@ struct ofproto {
 
     /* Configuration. */
     struct switch_status *switch_status;
-    struct in_band *in_band;
     struct fail_open *fail_open;
     struct netflow *netflow;
     struct ofproto_sflow *sflow;
 
+    /* In-band control. */
+    struct in_band *in_band;
+    long long int next_in_band_update;
+    struct sockaddr_in *extra_in_band_remotes;
+    size_t n_extra_remotes;
+
     /* Flow table. */
     struct classifier cls;
     bool need_revalidate;
@@ -449,7 +464,9 @@ add_controller(struct ofproto *ofproto, const struct ofproto_controller *c)
     if (discovery) {
         ofconn->discovery = discovery;
     } else {
-        rconn_connect(ofconn->rconn, c->target);
+        char *name = ofconn_make_name(ofproto, c->target);
+        rconn_connect(ofconn->rconn, c->target, name);
+        free(name);
     }
     hmap_insert(&ofproto->controllers, &ofconn->hmap_node,
                 hash_string(c->target, 0));
@@ -466,6 +483,9 @@ update_controller(struct ofconn *ofconn, const struct ofproto_controller *c)
     int probe_interval;
     int i;
 
+    ofconn->band = (is_in_band_controller(c)
+                    ? OFPROTO_IN_BAND : OFPROTO_OUT_OF_BAND);
+
     rconn_set_max_backoff(ofconn->rconn, c->max_backoff);
 
     probe_interval = c->probe_interval ? MAX(c->probe_interval, 5) : 0;
@@ -497,7 +517,7 @@ update_controller(struct ofconn *ofconn, const struct ofproto_controller *c)
 static const char *
 ofconn_get_target(const struct ofconn *ofconn)
 {
-    return ofconn->discovery ? "discover" : rconn_get_name(ofconn->rconn);
+    return ofconn->discovery ? "discover" : rconn_get_target(ofconn->rconn);
 }
 
 static struct ofconn *
@@ -514,17 +534,74 @@ find_controller_by_target(struct ofproto *ofproto, const char *target)
     return NULL;
 }
 
+static void
+update_in_band_remotes(struct ofproto *ofproto)
+{
+    const struct ofconn *ofconn;
+    struct sockaddr_in *addrs;
+    size_t max_addrs, n_addrs;
+    bool discovery;
+    size_t i;
+
+    /* Allocate enough memory for as many remotes as we could possibly have. */
+    max_addrs = ofproto->n_extra_remotes + hmap_count(&ofproto->controllers);
+    addrs = xmalloc(max_addrs * sizeof *addrs);
+    n_addrs = 0;
+
+    /* Add all the remotes. */
+    discovery = false;
+    HMAP_FOR_EACH (ofconn, struct ofconn, hmap_node, &ofproto->controllers) {
+        struct sockaddr_in *sin = &addrs[n_addrs];
+
+        if (ofconn->band == OFPROTO_OUT_OF_BAND) {
+            continue;
+        }
+
+        sin->sin_addr.s_addr = rconn_get_remote_ip(ofconn->rconn);
+        if (sin->sin_addr.s_addr) {
+            sin->sin_port = rconn_get_remote_port(ofconn->rconn);
+            n_addrs++;
+        }
+        if (ofconn->discovery) {
+            discovery = true;
+        }
+    }
+    for (i = 0; i < ofproto->n_extra_remotes; i++) {
+        addrs[n_addrs++] = ofproto->extra_in_band_remotes[i];
+    }
+
+    /* Create or update or destroy in-band.
+     *
+     * Ordinarily we only enable in-band if there's at least one remote
+     * address, but discovery needs the in-band rules for DHCP to be installed
+     * even before we know any remote addresses. */
+    if (n_addrs || discovery) {
+        if (!ofproto->in_band) {
+            in_band_create(ofproto, ofproto->dpif, ofproto->switch_status,
+                           &ofproto->in_band);
+        }
+        if (ofproto->in_band) {
+            in_band_set_remotes(ofproto->in_band, addrs, n_addrs);
+        }
+        ofproto->next_in_band_update = time_msec() + 1000;
+    } else {
+        in_band_destroy(ofproto->in_band);
+        ofproto->in_band = NULL;
+    }
+
+    /* Clean up. */
+    free(addrs);
+}
+
 void
 ofproto_set_controllers(struct ofproto *p,
                         const struct ofproto_controller *controllers,
                         size_t n_controllers)
 {
     struct shash new_controllers;
-    struct rconn **in_band_rconns;
     enum ofproto_fail_mode fail_mode;
     struct ofconn *ofconn, *next;
     bool ss_exists;
-    size_t n_in_band;
     size_t i;
 
     shash_init(&new_controllers);
@@ -537,8 +614,6 @@ ofproto_set_controllers(struct ofproto *p,
         }
     }
 
-    in_band_rconns = xmalloc(n_controllers * sizeof *in_band_rconns);
-    n_in_band = 0;
     fail_mode = OFPROTO_FAIL_STANDALONE;
     ss_exists = false;
     HMAP_FOR_EACH_SAFE (ofconn, next, struct ofconn, hmap_node,
@@ -550,14 +625,9 @@ ofproto_set_controllers(struct ofproto *p,
             ofconn_destroy(ofconn);
         } else {
             update_controller(ofconn, c);
-
             if (ofconn->ss) {
                 ss_exists = true;
             }
-            if (is_in_band_controller(c)) {
-                in_band_rconns[n_in_band++] = ofconn->rconn;
-            }
-
             if (c->fail == OFPROTO_FAIL_SECURE) {
                 fail_mode = OFPROTO_FAIL_SECURE;
             }
@@ -565,18 +635,7 @@ ofproto_set_controllers(struct ofproto *p,
     }
     shash_destroy(&new_controllers);
 
-    if (n_in_band) {
-        if (!p->in_band) {
-            in_band_create(p, p->dpif, p->switch_status, &p->in_band);
-        }
-        if (p->in_band) {
-            in_band_set_remotes(p->in_band, in_band_rconns, n_in_band);
-        }
-    } else {
-        in_band_destroy(p->in_band);
-        p->in_band = NULL;
-    }
-    free(in_band_rconns);
+    update_in_band_remotes(p);
 
     if (!hmap_is_empty(&p->controllers)
         && fail_mode == OFPROTO_FAIL_STANDALONE) {
@@ -608,6 +667,47 @@ ofproto_set_controllers(struct ofproto *p,
     }
 }
 
+static bool
+any_extras_changed(const struct ofproto *ofproto,
+                   const struct sockaddr_in *extras, size_t n)
+{
+    size_t i;
+
+    if (n != ofproto->n_extra_remotes) {
+        return true;
+    }
+
+    for (i = 0; i < n; i++) {
+        const struct sockaddr_in *old = &ofproto->extra_in_band_remotes[i];
+        const struct sockaddr_in *new = &extras[i];
+
+        if (old->sin_addr.s_addr != new->sin_addr.s_addr ||
+            old->sin_port != new->sin_port) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+/* Sets the 'n' TCP port addresses in 'extras' as ones to which 'ofproto''s
+ * in-band control should guarantee access, in the same way that in-band
+ * control guarantees access to OpenFlow controllers. */
+void
+ofproto_set_extra_in_band_remotes(struct ofproto *ofproto,
+                                  const struct sockaddr_in *extras, size_t n)
+{
+    if (!any_extras_changed(ofproto, extras, n)) {
+        return;
+    }
+
+    free(ofproto->extra_in_band_remotes);
+    ofproto->n_extra_remotes = n;
+    ofproto->extra_in_band_remotes = xmemdup(extras, n * sizeof *extras);
+
+    update_in_band_remotes(ofproto);
+}
+
 void
 ofproto_set_desc(struct ofproto *p,
                  const char *mfr_desc, const char *hw_desc,
@@ -811,6 +911,7 @@ ofproto_destroy(struct ofproto *p)
 
     in_band_destroy(p->in_band);
     p->in_band = NULL;
+    free(p->extra_in_band_remotes);
 
     ofproto_flush_flows(p);
     classifier_destroy(&p->cls);
@@ -876,25 +977,47 @@ process_port_change(struct ofproto *ofproto, int error, char *devname)
     }
 }
 
+/* Returns a "preference level" for snooping 'ofconn'.  A higher return value
+ * means that 'ofconn' is more interesting for monitoring than a lower return
+ * value. */
+static int
+snoop_preference(const struct ofconn *ofconn)
+{
+    switch (ofconn->role) {
+    case NX_ROLE_MASTER:
+        return 3;
+    case NX_ROLE_OTHER:
+        return 2;
+    case NX_ROLE_SLAVE:
+        return 1;
+    default:
+        /* Shouldn't happen. */
+        return 0;
+    }
+}
+
 /* One of ofproto's "snoop" pvconns has accepted a new connection on 'vconn'.
  * Connects this vconn to a controller. */
 static void
 add_snooper(struct ofproto *ofproto, struct vconn *vconn)
 {
-    struct ofconn *ofconn;
+    struct ofconn *ofconn, *best;
 
-    /* Arbitrarily pick the first controller in the list for monitoring.  We
-     * could do something smarter or more flexible later, if it ever proves
-     * useful. */
+    /* Pick a controller for monitoring. */
+    best = NULL;
     LIST_FOR_EACH (ofconn, struct ofconn, node, &ofproto->all_conns) {
-        if (ofconn->type == OFCONN_CONTROLLER) {
-            rconn_add_monitor(ofconn->rconn, vconn);
-            return;
+        if (ofconn->type == OFCONN_CONTROLLER
+            && (!best || snoop_preference(ofconn) > snoop_preference(best))) {
+            best = ofconn;
         }
+    }
 
+    if (best) {
+        rconn_add_monitor(best->rconn, vconn);
+    } else {
+        VLOG_INFO_RL(&rl, "no controller connection to snoop");
+        vconn_close(vconn);
     }
-    VLOG_INFO_RL(&rl, "no controller connection to monitor");
-    vconn_close(vconn);
 }
 
 int
@@ -939,6 +1062,9 @@ ofproto_run1(struct ofproto *p)
     }
 
     if (p->in_band) {
+        if (time_msec() >= p->next_in_band_update) {
+            update_in_band_remotes(p);
+        }
         in_band_run(p->in_band);
     }
 
@@ -959,8 +1085,15 @@ ofproto_run1(struct ofproto *p)
 
         retval = pvconn_accept(p->listeners[i], OFP_VERSION, &vconn);
         if (!retval) {
-            ofconn_create(p, rconn_new_from_vconn("passive", vconn),
-                          OFCONN_TRANSIENT);
+            struct rconn *rconn;
+            char *name;
+
+            rconn = rconn_create(60, 0);
+            name = ofconn_make_name(p, vconn_get_name(vconn));
+            rconn_connect_unreliably(rconn, vconn, name);
+            free(name);
+
+            ofconn_create(p, rconn, OFCONN_TRANSIENT);
         } else if (retval != EAGAIN) {
             VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
         }
@@ -1043,6 +1176,7 @@ ofproto_wait(struct ofproto *p)
         ofconn_wait(ofconn);
     }
     if (p->in_band) {
+        poll_timer_wait_until(p->next_in_band_update);
         in_band_wait(p->in_band);
     }
     if (p->fail_open) {
@@ -1059,7 +1193,7 @@ ofproto_wait(struct ofproto *p)
         VLOG_DBG_RL(&rl, "need revalidate in ofproto_wait_cb()");
         poll_immediate_wake();
     } else if (p->next_expiration != LLONG_MAX) {
-        poll_timer_wait(p->next_expiration - time_msec());
+        poll_timer_wait_until(p->next_expiration);
     }
     for (i = 0; i < p->n_listeners; i++) {
         pvconn_wait(p->listeners[i]);
@@ -1238,7 +1372,6 @@ make_ofport(const struct odp_port *odp_port)
     memset(&netdev_options, 0, sizeof netdev_options);
     netdev_options.name = odp_port->devname;
     netdev_options.ethertype = NETDEV_ETH_TYPE_NONE;
-    netdev_options.may_open = true;
 
     error = netdev_open(&netdev_options, &netdev);
     if (error) {
@@ -1313,7 +1446,7 @@ send_port_status(struct ofproto *p, const struct ofport *ofport,
         struct ofp_port_status *ops;
         struct ofpbuf *b;
 
-        if (ofconn->role == NX_ROLE_SLAVE) {
+        if (!ofconn_receives_async_msgs(ofconn)) {
             continue;
         }
 
@@ -1348,7 +1481,7 @@ ofport_remove(struct ofproto *p, struct ofport *ofport)
     uint16_t odp_port = ofp_port_to_odp_port(ofport->opp.port_no);
 
     netdev_monitor_remove(p->netdev_monitor, ofport->netdev);
-    port_array_set(&p->ports, odp_port, NULL);
+    port_array_delete(&p->ports, odp_port);
     shash_delete(&p->port_by_name,
                  shash_find(&p->port_by_name, (char *) ofport->opp.name));
     if (p->sflow) {
@@ -1510,7 +1643,9 @@ ofconn_run(struct ofconn *ofconn, struct ofproto *p)
         }
         if (discovery_run(ofconn->discovery, &controller_name)) {
             if (controller_name) {
-                rconn_connect(ofconn->rconn, controller_name);
+                char *ofconn_name = ofconn_make_name(p, controller_name);
+                rconn_connect(ofconn->rconn, controller_name, ofconn_name);
+                free(ofconn_name);
             } else {
                 rconn_disconnect(ofconn->rconn);
             }
@@ -1562,6 +1697,34 @@ ofconn_wait(struct ofconn *ofconn)
         COVERAGE_INC(ofproto_ofconn_stuck);
     }
 }
+
+/* Returns true if 'ofconn' should receive asynchronous messages. */
+static bool
+ofconn_receives_async_msgs(const struct ofconn *ofconn)
+{
+    if (ofconn->type == OFCONN_CONTROLLER) {
+        /* Ordinary controllers always get asynchronous messages unless they
+         * have configured themselves as "slaves".  */
+        return ofconn->role != NX_ROLE_SLAVE;
+    } else {
+        /* Transient connections don't get asynchronous messages unless they
+         * have explicitly asked for them by setting a nonzero miss send
+         * length. */
+        return ofconn->miss_send_len > 0;
+    }
+}
+
+/* Returns a human-readable name for an OpenFlow connection between 'ofproto'
+ * and 'target', suitable for use in log messages for identifying the
+ * connection.
+ *
+ * The name is dynamically allocated.  The caller should free it (with free())
+ * when it is no longer needed. */
+static char *
+ofconn_make_name(const struct ofproto *ofproto, const char *target)
+{
+    return xasprintf("%s<->%s", dpif_base_name(ofproto->dpif), target);
+}
 \f
 /* Caller is responsible for initializing the 'cr' member of the returned
  * rule. */
@@ -1638,7 +1801,7 @@ rule_has_out_port(const struct rule *rule, uint16_t out_port)
     }
     for (oa = actions_first(&i, rule->actions, rule->n_actions); oa;
          oa = actions_next(&i)) {
-        if (oa->type == htons(OFPAT_OUTPUT) && oa->output.port == out_port) {
+        if (action_outputs_to_port(oa, out_port)) {
             return true;
         }
     }
@@ -1910,15 +2073,11 @@ is_controller_rule(struct rule *rule)
      * NetFlow expiration messages since it is just part of the control
      * logic for the network and not real traffic. */
 
-    if (rule && rule->super) {
-        struct rule *super = rule->super;
-
-        return super->n_actions == 1 &&
-               super->actions[0].type == htons(OFPAT_OUTPUT) &&
-               super->actions[0].output.port == htons(OFPP_CONTROLLER);
-    }
-
-    return false;
+    return (rule
+            && rule->super
+            && rule->super->n_actions == 1
+            && action_outputs_to_port(&rule->super->actions[0],
+                                      htons(OFPP_CONTROLLER)));
 }
 
 static void
@@ -2035,7 +2194,8 @@ handle_features_request(struct ofproto *p, struct ofconn *ofconn,
                          (1u << OFPAT_SET_NW_DST) |
                          (1u << OFPAT_SET_NW_TOS) |
                          (1u << OFPAT_SET_TP_SRC) |
-                         (1u << OFPAT_SET_TP_DST));
+                         (1u << OFPAT_SET_TP_DST) |
+                         (1u << OFPAT_ENQUEUE));
 
     PORT_ARRAY_FOR_EACH (port, &p->ports, port_no) {
         hton_ofp_phy_port(ofpbuf_put(buf, &port->opp, sizeof port->opp));
@@ -2116,7 +2276,7 @@ add_controller_action(struct odp_actions *actions,
                       const struct ofp_action_output *oao)
 {
     union odp_action *a = odp_actions_add(actions, ODPAT_CONTROLLER);
-    a->controller.arg = oao->max_len ? ntohs(oao->max_len) : UINT32_MAX;
+    a->controller.arg = ntohs(oao->max_len);
 }
 
 struct action_xlate_ctx {
@@ -2263,6 +2423,48 @@ xlate_output_action(struct action_xlate_ctx *ctx,
     }
 }
 
+/* If the final ODP action in 'ctx' is "pop priority", drop it, as an
+ * optimization, because we're going to add another action that sets the
+ * priority immediately after, or because there are no actions following the
+ * pop.  */
+static void
+remove_pop_action(struct action_xlate_ctx *ctx)
+{
+    size_t n = ctx->out->n_actions;
+    if (n > 0 && ctx->out->actions[n - 1].type == ODPAT_POP_PRIORITY) {
+        ctx->out->n_actions--;
+    }
+}
+
+static void
+xlate_enqueue_action(struct action_xlate_ctx *ctx,
+                     const struct ofp_action_enqueue *oae)
+{
+    uint16_t ofp_port, odp_port;
+
+    /* Figure out ODP output port. */
+    ofp_port = ntohs(oae->port);
+    if (ofp_port != OFPP_IN_PORT) {
+        odp_port = ofp_port_to_odp_port(ofp_port);
+    } else {
+        odp_port = ctx->flow.in_port;
+    }
+
+    /* Add ODP actions. */
+    remove_pop_action(ctx);
+    odp_actions_add(ctx->out, ODPAT_SET_PRIORITY)->priority.priority
+        = TC_H_MAKE(1, ntohl(oae->queue_id)); /* XXX */
+    add_output_action(ctx, odp_port);
+    odp_actions_add(ctx->out, ODPAT_POP_PRIORITY);
+
+    /* Update NetFlow output port. */
+    if (ctx->nf_output_iface == NF_OUT_DROP) {
+        ctx->nf_output_iface = odp_port;
+    } else if (ctx->nf_output_iface != NF_OUT_FLOOD) {
+        ctx->nf_output_iface = NF_OUT_MULTI;
+    }
+}
+
 static void
 xlate_nicira_action(struct action_xlate_ctx *ctx,
                     const struct nx_action_header *nah)
@@ -2286,7 +2488,7 @@ xlate_nicira_action(struct action_xlate_ctx *ctx,
         break;
 
     /* If you add a new action here that modifies flow data, don't forget to
-     * update the flow key in ctx->flow in the same key. */
+     * update the flow key in ctx->flow at the same time. */
 
     default:
         VLOG_DBG_RL(&rl, "unknown Nicira action type %"PRIu16, subtype);
@@ -2331,7 +2533,7 @@ do_xlate_actions(const union ofp_action *in, size_t n_in,
 
         case OFPAT_STRIP_VLAN:
             odp_actions_add(ctx->out, ODPAT_STRIP_VLAN);
-            ctx->flow.dl_vlan = OFP_VLAN_NONE;
+            ctx->flow.dl_vlan = htons(OFP_VLAN_NONE);
             ctx->flow.dl_vlan_pcp = 0;
             break;
 
@@ -2380,6 +2582,10 @@ do_xlate_actions(const union ofp_action *in, size_t n_in,
             xlate_nicira_action(ctx, (const struct nx_action_header *) ia);
             break;
 
+        case OFPAT_ENQUEUE:
+            xlate_enqueue_action(ctx, (const struct ofp_action_enqueue *) ia);
+            break;
+
         default:
             VLOG_DBG_RL(&rl, "unknown action type %"PRIu16, type);
             break;
@@ -2407,6 +2613,7 @@ xlate_actions(const union ofp_action *in, size_t n_in,
     ctx.may_set_up_flow = true;
     ctx.nf_output_iface = NF_OUT_DROP;
     do_xlate_actions(in, n_in, &ctx);
+    remove_pop_action(&ctx);
 
     /* Check with in-band control to see if we're allowed to set up this
      * flow. */
@@ -2672,7 +2879,7 @@ handle_table_stats_request(struct ofproto *p, struct ofconn *ofconn,
 
 static void
 append_port_stat(struct ofport *port, uint16_t port_no, struct ofconn *ofconn, 
-                 struct ofpbuf *msg)
+                 struct ofpbuf **msgp)
 {
     struct netdev_stats stats;
     struct ofp_port_stats *ops;
@@ -2682,7 +2889,7 @@ append_port_stat(struct ofport *port, uint16_t port_no, struct ofconn *ofconn,
      * netdev_get_stats() will log errors. */
     netdev_get_stats(port->netdev, &stats);
 
-    ops = append_stats_reply(sizeof *ops, ofconn, &msg);
+    ops = append_stats_reply(sizeof *ops, ofconn, msgp);
     ops->port_no = htons(odp_port_to_ofp_port(port_no));
     memset(ops->pad, 0, sizeof ops->pad);
     ops->rx_packets = htonll(stats.rx_packets);
@@ -2720,11 +2927,11 @@ handle_port_stats_request(struct ofproto *p, struct ofconn *ofconn,
         port = port_array_get(&p->ports, 
                 ofp_port_to_odp_port(ntohs(psr->port_no)));
         if (port) {
-            append_port_stat(port, ntohs(psr->port_no), ofconn, msg);
+            append_port_stat(port, ntohs(psr->port_no), ofconn, &msg);
         }
     } else {
         PORT_ARRAY_FOR_EACH (port, &p->ports, port_no) {
-            append_port_stat(port, port_no, ofconn, msg);
+            append_port_stat(port, port_no, ofconn, &msg);
         }
     }
 
@@ -2984,6 +3191,95 @@ handle_aggregate_stats_request(struct ofproto *p, struct ofconn *ofconn,
     return 0;
 }
 
+struct queue_stats_cbdata {
+    struct ofconn *ofconn;
+    struct ofpbuf *msg;
+    uint16_t port_no;
+};
+
+static void
+put_queue_stats(struct queue_stats_cbdata *cbdata, uint16_t queue_id,
+                const struct netdev_queue_stats *stats)
+{
+    struct ofp_queue_stats *reply;
+
+    reply = append_stats_reply(sizeof *reply, cbdata->ofconn, &cbdata->msg);
+    reply->port_no = htons(cbdata->port_no);
+    memset(reply->pad, 0, sizeof reply->pad);
+    reply->queue_id = htonl(queue_id);
+    reply->tx_bytes = htonll(stats->tx_bytes);
+    reply->tx_packets = htonll(stats->tx_packets);
+    reply->tx_errors = htonll(stats->tx_errors);
+}
+
+static void
+handle_queue_stats_dump_cb(unsigned int queue_id,
+                           struct netdev_queue_stats *stats,
+                           void *cbdata_)
+{
+    struct queue_stats_cbdata *cbdata = cbdata_;
+
+    put_queue_stats(cbdata, queue_id, stats);
+}
+
+static void
+handle_queue_stats_for_port(struct ofport *port, uint16_t port_no,
+                            uint16_t queue_id,
+                            struct queue_stats_cbdata *cbdata)
+{
+    cbdata->port_no = port_no;
+    if (queue_id == OFPQ_ALL) {
+        netdev_dump_queue_stats(port->netdev,
+                                handle_queue_stats_dump_cb, cbdata);
+    } else {
+        struct netdev_queue_stats stats;
+
+        netdev_get_queue_stats(port->netdev, queue_id, &stats);
+        put_queue_stats(cbdata, queue_id, &stats);
+    }
+}
+
+static int
+handle_queue_stats_request(struct ofproto *ofproto, struct ofconn *ofconn,
+                           const struct ofp_stats_request *osr,
+                           size_t arg_size)
+{
+    struct ofp_queue_stats_request *qsr;
+    struct queue_stats_cbdata cbdata;
+    struct ofport *port;
+    unsigned int port_no;
+    uint32_t queue_id;
+
+    if (arg_size != sizeof *qsr) {
+        return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
+    }
+    qsr = (struct ofp_queue_stats_request *) osr->body;
+
+    COVERAGE_INC(ofproto_queue_req);
+
+    cbdata.ofconn = ofconn;
+    cbdata.msg = start_stats_reply(osr, 128);
+
+    port_no = ntohs(qsr->port_no);
+    queue_id = ntohl(qsr->queue_id);
+    if (port_no == OFPP_ALL) {
+        PORT_ARRAY_FOR_EACH (port, &ofproto->ports, port_no) {
+            handle_queue_stats_for_port(port, port_no, queue_id, &cbdata);
+        }
+    } else if (port_no < ofproto->max_ports) {
+        port = port_array_get(&ofproto->ports, port_no);
+        if (port) {
+            handle_queue_stats_for_port(port, port_no, queue_id, &cbdata);
+        }
+    } else {
+        ofpbuf_delete(cbdata.msg);
+        return ofp_mkerr(OFPET_QUEUE_OP_FAILED, OFPQOFC_BAD_PORT);
+    }
+    queue_tx(cbdata.msg, ofconn, ofconn->reply_counter);
+
+    return 0;
+}
+
 static int
 handle_stats_request(struct ofproto *p, struct ofconn *ofconn,
                      struct ofp_header *oh)
@@ -3015,6 +3311,9 @@ handle_stats_request(struct ofproto *p, struct ofconn *ofconn,
     case OFPST_PORT:
         return handle_port_stats_request(p, ofconn, osr, arg_size);
 
+    case OFPST_QUEUE:
+        return handle_queue_stats_request(p, ofconn, osr, arg_size);
+
     case OFPST_VENDOR:
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_VENDOR);
 
@@ -3411,7 +3710,7 @@ handle_role_request(struct ofproto *ofproto,
     uint32_t role;
 
     if (ntohs(msg->header.length) != sizeof *nrr) {
-        VLOG_WARN_RL(&rl, "received role request of length %zu (expected %zu)",
+        VLOG_WARN_RL(&rl, "received role request of length %u (expected %zu)",
                      ntohs(msg->header.length), sizeof *nrr);
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
     }
@@ -3461,7 +3760,7 @@ handle_vendor(struct ofproto *p, struct ofconn *ofconn, void *msg)
     struct nicira_header *nh;
 
     if (ntohs(ovh->header.length) < sizeof(struct ofp_vendor_header)) {
-        VLOG_WARN_RL(&rl, "received vendor message of length %zu "
+        VLOG_WARN_RL(&rl, "received vendor message of length %u "
                           "(expected at least %zu)",
                    ntohs(ovh->header.length), sizeof(struct ofp_vendor_header));
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
@@ -3470,7 +3769,7 @@ handle_vendor(struct ofproto *p, struct ofconn *ofconn, void *msg)
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_VENDOR);
     }
     if (ntohs(ovh->header.length) < sizeof(struct nicira_header)) {
-        VLOG_WARN_RL(&rl, "received Nicira vendor message of length %zu "
+        VLOG_WARN_RL(&rl, "received Nicira vendor message of length %u "
                           "(expected at least %zu)",
                      ntohs(ovh->header.length), sizeof(struct nicira_header));
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
@@ -3775,7 +4074,7 @@ send_flow_removed(struct ofproto *p, struct rule *rule,
     prev = NULL;
     LIST_FOR_EACH (ofconn, struct ofconn, node, &p->all_conns) {
         if (rule->send_flow_removed && rconn_is_connected(ofconn->rconn)
-            && ofconn->role != NX_ROLE_SLAVE) {
+            && ofconn_receives_async_msgs(ofconn)) {
             if (prev) {
                 queue_tx(ofpbuf_clone(buf), prev, prev->reply_counter);
             } else {
@@ -3905,65 +4204,147 @@ update_used(struct ofproto *p)
     free(flows);
 }
 
+/* pinsched callback for sending 'packet' on 'ofconn'. */
 static void
 do_send_packet_in(struct ofpbuf *packet, void *ofconn_)
 {
     struct ofconn *ofconn = ofconn_;
+
+    rconn_send_with_limit(ofconn->rconn, packet,
+                          ofconn->packet_in_counter, 100);
+}
+
+/* Takes 'packet', which has been converted with do_convert_to_packet_in(), and
+ * finalizes its content for sending on 'ofconn', and passes it to 'ofconn''s
+ * packet scheduler for sending.
+ *
+ * 'max_len' specifies the maximum number of bytes of the packet to send on
+ * 'ofconn' (INT_MAX specifies no limit).
+ *
+ * If 'clone' is true, the caller retains ownership of 'packet'.  Otherwise,
+ * ownership is transferred to this function. */
+static void
+schedule_packet_in(struct ofconn *ofconn, struct ofpbuf *packet, int max_len,
+                   bool clone)
+{
     struct ofproto *ofproto = ofconn->ofproto;
-    struct odp_msg *msg = packet->data;
-    struct ofpbuf payload;
-    struct ofpbuf *opi;
+    struct ofp_packet_in *opi = packet->data;
+    uint16_t in_port = ofp_port_to_odp_port(ntohs(opi->in_port));
+    int send_len, trim_size;
     uint32_t buffer_id;
-    int send_len;
 
-    /* Extract packet payload from 'msg'. */
-    payload.data = msg + 1;
-    payload.size = msg->length - sizeof *msg;
+    /* Get buffer. */
+    if (opi->reason == OFPR_ACTION) {
+        buffer_id = UINT32_MAX;
+    } else if (ofproto->fail_open && fail_open_is_active(ofproto->fail_open)) {
+        buffer_id = pktbuf_get_null();
+    } else if (!ofconn->pktbuf) {
+        buffer_id = UINT32_MAX;
+    } else {
+        struct ofpbuf payload;
+        payload.data = opi->data;
+        payload.size = packet->size - offsetof(struct ofp_packet_in, data);
+        buffer_id = pktbuf_save(ofconn->pktbuf, &payload, in_port);
+    }
 
-    /* Construct packet-in message. */
-    send_len = INT_MAX;
+    /* Figure out how much of the packet to send. */
+    send_len = ntohs(opi->total_len);
+    if (buffer_id != UINT32_MAX) {
+        send_len = MIN(send_len, ofconn->miss_send_len);
+    }
+    send_len = MIN(send_len, max_len);
+
+    /* Adjust packet length and clone if necessary. */
+    trim_size = offsetof(struct ofp_packet_in, data) + send_len;
+    if (clone) {
+        packet = ofpbuf_clone_data(packet->data, trim_size);
+        opi = packet->data;
+    } else {
+        packet->size = trim_size;
+    }
+
+    /* Update packet headers. */
+    opi->buffer_id = htonl(buffer_id);
+    update_openflow_length(packet);
+
+    /* Hand over to packet scheduler.  It might immediately call into
+     * do_send_packet_in() or it might buffer it for a while (until a later
+     * call to pinsched_run()). */
+    pinsched_send(ofconn->schedulers[opi->reason], in_port,
+                  packet, do_send_packet_in, ofconn);
+}
+
+/* Replace struct odp_msg header in 'packet' by equivalent struct
+ * ofp_packet_in.  The odp_msg must have sufficient headroom to do so (e.g. as
+ * returned by dpif_recv()).
+ *
+ * The conversion is not complete: the caller still needs to trim any unneeded
+ * payload off the end of the buffer, set the length in the OpenFlow header,
+ * and set buffer_id.  Those require us to know the controller settings and so
+ * must be done on a per-controller basis.
+ *
+ * Returns the maximum number of bytes of the packet that should be sent to
+ * the controller (INT_MAX if no limit). */
+static int
+do_convert_to_packet_in(struct ofpbuf *packet)
+{
+    struct odp_msg *msg = packet->data;
+    struct ofp_packet_in *opi;
+    uint8_t reason;
+    uint16_t total_len;
+    uint16_t in_port;
+    int max_len;
+
+    /* Extract relevant header fields */
     if (msg->type == _ODPL_ACTION_NR) {
-        buffer_id = UINT32_MAX;
+        reason = OFPR_ACTION;
+        max_len = msg->arg;
     } else {
-        if (ofproto->fail_open && fail_open_is_active(ofproto->fail_open)) {
-            buffer_id = pktbuf_get_null();
-        } else {
-            buffer_id = pktbuf_save(ofconn->pktbuf, &payload, msg->port);
-        }
-        if (buffer_id != UINT32_MAX) {
-            send_len = ofconn->miss_send_len;
-        }
+        reason = OFPR_NO_MATCH;
+        max_len = INT_MAX;
     }
-    opi = make_packet_in(buffer_id, odp_port_to_ofp_port(msg->port),
-                         msg->type, &payload, send_len);
+    total_len = msg->length - sizeof *msg;
+    in_port = odp_port_to_ofp_port(msg->port);
 
-    /* Send. */
-    rconn_send_with_limit(ofconn->rconn, opi, ofconn->packet_in_counter, 100);
+    /* Repurpose packet buffer by overwriting header. */
+    ofpbuf_pull(packet, sizeof(struct odp_msg));
+    opi = ofpbuf_push_zeros(packet, offsetof(struct ofp_packet_in, data));
+    opi->header.version = OFP_VERSION;
+    opi->header.type = OFPT_PACKET_IN;
+    opi->total_len = htons(total_len);
+    opi->in_port = htons(in_port);
+    opi->reason = reason;
 
-    ofpbuf_delete(packet);
+    return max_len;
 }
 
+/* Given 'packet' containing an odp_msg of type _ODPL_ACTION_NR or
+ * _ODPL_MISS_NR, sends an OFPT_PACKET_IN message to each OpenFlow controller
+ * as necessary according to their individual configurations.
+ *
+ * 'packet' must have sufficient headroom to convert it into a struct
+ * ofp_packet_in (e.g. as returned by dpif_recv()).
+ *
+ * Takes ownership of 'packet'. */
 static void
 send_packet_in(struct ofproto *ofproto, struct ofpbuf *packet)
 {
-    struct odp_msg *msg = packet->data;
     struct ofconn *ofconn, *prev;
+    int max_len;
 
-    assert(msg->type == _ODPL_MISS_NR || msg->type == _ODPL_ACTION_NR);
+    max_len = do_convert_to_packet_in(packet);
 
     prev = NULL;
     LIST_FOR_EACH (ofconn, struct ofconn, node, &ofproto->all_conns) {
-        if (ofconn->role != NX_ROLE_SLAVE) {
+        if (ofconn_receives_async_msgs(ofconn)) {
             if (prev) {
-                pinsched_send(prev->schedulers[msg->type], msg->port,
-                              ofpbuf_clone(packet), do_send_packet_in, prev);
+                schedule_packet_in(prev, packet, max_len, true);
             }
             prev = ofconn;
         }
     }
     if (prev) {
-        pinsched_send(prev->schedulers[msg->type], msg->port,
-                      packet, do_send_packet_in, prev);
+        schedule_packet_in(prev, packet, max_len, false);
     } else {
         ofpbuf_delete(packet);
     }
@@ -4013,7 +4394,8 @@ default_normal_ofhook_cb(const flow_t *flow, const struct ofpbuf *packet,
     /* Learn source MAC (but don't try to learn from revalidation). */
     if (packet != NULL) {
         tag_type rev_tag = mac_learning_learn(ofproto->ml, flow->dl_src,
-                                              0, flow->in_port);
+                                              0, flow->in_port,
+                                              GRAT_ARP_LOCK_NONE);
         if (rev_tag) {
             /* The log messages here could actually be useful in debugging,
              * so keep the rate limit relatively high. */
@@ -4025,7 +4407,8 @@ default_normal_ofhook_cb(const flow_t *flow, const struct ofpbuf *packet,
     }
 
     /* Determine output port. */
-    out_port = mac_learning_lookup_tag(ofproto->ml, flow->dl_dst, 0, tags);
+    out_port = mac_learning_lookup_tag(ofproto->ml, flow->dl_dst, 0, tags,
+                                       NULL);
     if (out_port < 0) {
         add_output_group_action(actions, DP_GROUP_FLOOD, nf_output_iface);
     } else if (out_port != flow->in_port) {