Add support for 'hard_timeout' in OF1.2 flow_removed message.
[sliver-openvswitch.git] / ofproto / ofproto.c
index f538869..8c10f19 100644 (file)
@@ -32,7 +32,9 @@
 #include "meta-flow.h"
 #include "netdev.h"
 #include "nx-match.h"
+#include "ofp-actions.h"
 #include "ofp-errors.h"
+#include "ofp-msgs.h"
 #include "ofp-print.h"
 #include "ofp-util.h"
 #include "ofpbuf.h"
@@ -86,6 +88,7 @@ struct ofopgroup {
     struct ofproto *ofproto;    /* Owning ofproto. */
     struct list ofproto_node;   /* In ofproto's "pending" list. */
     struct list ops;            /* List of "struct ofoperation"s. */
+    int n_running;              /* Number of ops still pending. */
 
     /* Data needed to send OpenFlow reply on failure or to send a buffered
      * packet on success.
@@ -100,7 +103,6 @@ struct ofopgroup {
     struct ofconn *ofconn;      /* ofconn for reply (but see note above). */
     struct ofp_header *request; /* Original request (truncated at 64 bytes). */
     uint32_t buffer_id;         /* Buffer id from original request. */
-    int error;                  /* 0 if no error yet, otherwise error code. */
 };
 
 static struct ofopgroup *ofopgroup_create_unattached(struct ofproto *);
@@ -108,7 +110,7 @@ static struct ofopgroup *ofopgroup_create(struct ofproto *, struct ofconn *,
                                           const struct ofp_header *,
                                           uint32_t buffer_id);
 static void ofopgroup_submit(struct ofopgroup *);
-static void ofopgroup_destroy(struct ofopgroup *);
+static void ofopgroup_complete(struct ofopgroup *);
 
 /* A single flow table operation. */
 struct ofoperation {
@@ -117,14 +119,25 @@ struct ofoperation {
     struct hmap_node hmap_node; /* In ofproto's "deletions" hmap. */
     struct rule *rule;          /* Rule being operated upon. */
     enum ofoperation_type type; /* Type of operation. */
-    struct rule *victim;        /* OFOPERATION_ADDING: Replaced rule. */
-    union ofp_action *actions;  /* OFOPERATION_MODIFYING: Replaced actions. */
-    int n_actions;              /* OFOPERATION_MODIFYING: # of old actions. */
+
+    /* OFOPERATION_ADD. */
+    struct rule *victim;        /* Rule being replaced, if any.. */
+
+    /* OFOPERATION_MODIFY: The old actions, if the actions are changing. */
+    struct ofpact *ofpacts;
+    size_t ofpacts_len;
+
+    /* OFOPERATION_DELETE. */
+    enum ofp_flow_removed_reason reason; /* Reason flow was removed. */
+
     ovs_be64 flow_cookie;       /* Rule's old flow cookie. */
+    enum ofperr error;          /* 0 if no error. */
 };
 
-static void ofoperation_create(struct ofopgroup *, struct rule *,
-                               enum ofoperation_type);
+static struct ofoperation *ofoperation_create(struct ofopgroup *,
+                                              struct rule *,
+                                              enum ofoperation_type,
+                                              enum ofp_flow_removed_reason);
 static void ofoperation_destroy(struct ofoperation *);
 
 /* oftable. */
@@ -180,7 +193,6 @@ static void reinit_ports(struct ofproto *);
 static void ofproto_rule_destroy__(struct rule *);
 static void ofproto_rule_send_removed(struct rule *, uint8_t reason);
 static bool rule_is_modifiable(const struct rule *);
-static bool rule_is_hidden(const struct rule *);
 
 /* OpenFlow. */
 static enum ofperr add_flow(struct ofproto *, struct ofconn *,
@@ -385,6 +397,10 @@ ofproto_create(const char *datapath_name, const char *datapath_type,
     list_init(&ofproto->pending);
     ofproto->n_pending = 0;
     hmap_init(&ofproto->deletions);
+    ofproto->n_add = ofproto->n_delete = ofproto->n_modify = 0;
+    ofproto->first_op = ofproto->last_op = LLONG_MIN;
+    ofproto->next_op_report = LLONG_MAX;
+    ofproto->op_backoff = LLONG_MIN;
     ofproto->vlan_bitmap = NULL;
     ofproto->vlans_changed = false;
     ofproto->min_mtu = INT_MAX;
@@ -400,7 +416,6 @@ ofproto_create(const char *datapath_name, const char *datapath_type,
     assert(ofproto->n_tables);
 
     ofproto->datapath_id = pick_datapath_id(ofproto);
-    VLOG_INFO("using datapath ID %016"PRIx64, ofproto->datapath_id);
     init_ports(ofproto);
 
     *ofprotop = ofproto;
@@ -422,14 +437,18 @@ ofproto_init_tables(struct ofproto *ofproto, int n_tables)
     }
 }
 
+uint64_t
+ofproto_get_datapath_id(const struct ofproto *ofproto)
+{
+    return ofproto->datapath_id;
+}
+
 void
 ofproto_set_datapath_id(struct ofproto *p, uint64_t datapath_id)
 {
     uint64_t old_dpid = p->datapath_id;
     p->datapath_id = datapath_id ? datapath_id : pick_datapath_id(p);
     if (p->datapath_id != old_dpid) {
-        VLOG_INFO("datapath ID changed to %016"PRIx64, p->datapath_id);
-
         /* Force all active connections to reconnect, since there is no way to
          * notify a controller that the datapath ID has changed. */
         ofproto_reconnect_controllers(p);
@@ -524,41 +543,40 @@ ofproto_set_desc(struct ofproto *p,
 
     if (mfr_desc) {
         if (strlen(mfr_desc) >= sizeof ods->mfr_desc) {
-            VLOG_WARN("truncating mfr_desc, must be less than %zu characters",
-                    sizeof ods->mfr_desc);
+            VLOG_WARN("%s: truncating mfr_desc, must be less than %zu bytes",
+                      p->name, sizeof ods->mfr_desc);
         }
         free(p->mfr_desc);
         p->mfr_desc = xstrdup(mfr_desc);
     }
     if (hw_desc) {
         if (strlen(hw_desc) >= sizeof ods->hw_desc) {
-            VLOG_WARN("truncating hw_desc, must be less than %zu characters",
-                    sizeof ods->hw_desc);
+            VLOG_WARN("%s: truncating hw_desc, must be less than %zu bytes",
+                      p->name, sizeof ods->hw_desc);
         }
         free(p->hw_desc);
         p->hw_desc = xstrdup(hw_desc);
     }
     if (sw_desc) {
         if (strlen(sw_desc) >= sizeof ods->sw_desc) {
-            VLOG_WARN("truncating sw_desc, must be less than %zu characters",
-                    sizeof ods->sw_desc);
+            VLOG_WARN("%s: truncating sw_desc, must be less than %zu bytes",
+                      p->name, sizeof ods->sw_desc);
         }
         free(p->sw_desc);
         p->sw_desc = xstrdup(sw_desc);
     }
     if (serial_desc) {
         if (strlen(serial_desc) >= sizeof ods->serial_num) {
-            VLOG_WARN("truncating serial_desc, must be less than %zu "
-                    "characters",
-                    sizeof ods->serial_num);
+            VLOG_WARN("%s: truncating serial_desc, must be less than %zu "
+                      "bytes", p->name, sizeof ods->serial_num);
         }
         free(p->serial_desc);
         p->serial_desc = xstrdup(serial_desc);
     }
     if (dp_desc) {
         if (strlen(dp_desc) >= sizeof ods->dp_desc) {
-            VLOG_WARN("truncating dp_desc, must be less than %zu characters",
-                    sizeof ods->dp_desc);
+            VLOG_WARN("%s: truncating dp_desc, must be less than %zu bytes",
+                      p->name, sizeof ods->dp_desc);
         }
         free(p->dp_desc);
         p->dp_desc = xstrdup(dp_desc);
@@ -938,7 +956,8 @@ ofproto_flush__(struct ofproto *ofproto)
         cls_cursor_init(&cursor, &table->cls, NULL);
         CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
             if (!rule->pending) {
-                ofoperation_create(group, rule, OFOPERATION_DELETE);
+                ofoperation_create(group, rule, OFOPERATION_DELETE,
+                                   OFPRR_DELETE);
                 oftable_remove_rule(rule);
                 ofproto->ofproto_class->rule_destruct(rule);
             }
@@ -1090,6 +1109,43 @@ ofproto_run(struct ofproto *p)
         NOT_REACHED();
     }
 
+    if (time_msec() >= p->next_op_report) {
+        long long int ago = (time_msec() - p->first_op) / 1000;
+        long long int interval = (p->last_op - p->first_op) / 1000;
+        struct ds s;
+
+        ds_init(&s);
+        ds_put_format(&s, "%d flow_mods ",
+                      p->n_add + p->n_delete + p->n_modify);
+        if (interval == ago) {
+            ds_put_format(&s, "in the last %lld s", ago);
+        } else if (interval) {
+            ds_put_format(&s, "in the %lld s starting %lld s ago",
+                          interval, ago);
+        } else {
+            ds_put_format(&s, "%lld s ago", ago);
+        }
+
+        ds_put_cstr(&s, " (");
+        if (p->n_add) {
+            ds_put_format(&s, "%d adds, ", p->n_add);
+        }
+        if (p->n_delete) {
+            ds_put_format(&s, "%d deletes, ", p->n_delete);
+        }
+        if (p->n_modify) {
+            ds_put_format(&s, "%d modifications, ", p->n_modify);
+        }
+        s.length -= 2;
+        ds_put_char(&s, ')');
+
+        VLOG_INFO("%s: %s", p->name, ds_cstr(&s));
+        ds_destroy(&s);
+
+        p->n_add = p->n_delete = p->n_modify = 0;
+        p->next_op_report = LLONG_MAX;
+    }
+
     return error;
 }
 
@@ -1337,27 +1393,28 @@ ofproto_port_del(struct ofproto *ofproto, uint16_t ofp_port)
  * (0...65535, inclusive) then the flow will be visible to OpenFlow
  * controllers; otherwise, it will be hidden.
  *
- * The caller retains ownership of 'cls_rule' and 'actions'.
+ * The caller retains ownership of 'cls_rule' and 'ofpacts'.
  *
  * This is a helper function for in-band control and fail-open. */
 void
 ofproto_add_flow(struct ofproto *ofproto, const struct cls_rule *cls_rule,
-                 const union ofp_action *actions, size_t n_actions)
+                 const struct ofpact *ofpacts, size_t ofpacts_len)
 {
     const struct rule *rule;
 
     rule = rule_from_cls_rule(classifier_find_rule_exactly(
                                     &ofproto->tables[0].cls, cls_rule));
-    if (!rule || !ofputil_actions_equal(rule->actions, rule->n_actions,
-                                        actions, n_actions)) {
+    if (!rule || !ofpacts_equal(rule->ofpacts, rule->ofpacts_len,
+                                ofpacts, ofpacts_len)) {
         struct ofputil_flow_mod fm;
 
         memset(&fm, 0, sizeof fm);
         fm.cr = *cls_rule;
         fm.buffer_id = UINT32_MAX;
-        fm.actions = (union ofp_action *) actions;
-        fm.n_actions = n_actions;
+        fm.ofpacts = xmemdup(ofpacts, ofpacts_len);
+        fm.ofpacts_len = ofpacts_len;
         add_flow(ofproto, NULL, &fm, NULL);
+        free(fm.ofpacts);
     }
 }
 
@@ -1393,7 +1450,7 @@ ofproto_delete_flow(struct ofproto *ofproto, const struct cls_rule *target)
     } else {
         /* Initiate deletion -> success. */
         struct ofopgroup *group = ofopgroup_create_unattached(ofproto);
-        ofoperation_create(group, rule, OFOPERATION_DELETE);
+        ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
         oftable_remove_rule(rule);
         ofproto->ofproto_class->rule_destruct(rule);
         ofopgroup_submit(group);
@@ -1437,10 +1494,12 @@ reinit_ports(struct ofproto *p)
     sset_destroy(&devnames);
 }
 
-/* Opens and returns a netdev for 'ofproto_port', or a null pointer if the
- * netdev cannot be opened.  On success, also fills in 'opp'.  */
+/* Opens and returns a netdev for 'ofproto_port' in 'ofproto', or a null
+ * pointer if the netdev cannot be opened.  On success, also fills in
+ * 'opp'.  */
 static struct netdev *
-ofport_open(const struct ofproto_port *ofproto_port,
+ofport_open(const struct ofproto *ofproto,
+            const struct ofproto_port *ofproto_port,
             struct ofputil_phy_port *pp)
 {
     enum netdev_flags flags;
@@ -1449,8 +1508,9 @@ ofport_open(const struct ofproto_port *ofproto_port,
 
     error = netdev_open(ofproto_port->name, ofproto_port->type, &netdev);
     if (error) {
-        VLOG_WARN_RL(&rl, "ignoring port %s (%"PRIu16") because netdev %s "
+        VLOG_WARN_RL(&rl, "%s: ignoring port %s (%"PRIu16") because netdev %s "
                      "cannot be opened (%s)",
+                     ofproto->name,
                      ofproto_port->name, ofproto_port->ofp_port,
                      ofproto_port->name, strerror(error));
         return NULL;
@@ -1671,7 +1731,7 @@ update_port(struct ofproto *ofproto, const char *name)
 
     /* Fetch 'name''s location and properties from the datapath. */
     netdev = (!ofproto_port_query_by_name(ofproto, name, &ofproto_port)
-              ? ofport_open(&ofproto_port, &pp)
+              ? ofport_open(ofproto, &ofproto_port, &pp)
               : NULL);
     if (netdev) {
         port = ofproto_get_port(ofproto, ofproto_port.ofp_port);
@@ -1722,16 +1782,16 @@ init_ports(struct ofproto *p)
     OFPROTO_PORT_FOR_EACH (&ofproto_port, &dump, p) {
         uint16_t ofp_port = ofproto_port.ofp_port;
         if (ofproto_get_port(p, ofp_port)) {
-            VLOG_WARN_RL(&rl, "ignoring duplicate port %"PRIu16" in datapath",
-                         ofp_port);
+            VLOG_WARN_RL(&rl, "%s: ignoring duplicate port %"PRIu16" "
+                         "in datapath", p->name, ofp_port);
         } else if (shash_find(&p->port_by_name, ofproto_port.name)) {
-            VLOG_WARN_RL(&rl, "ignoring duplicate device %s in datapath",
-                         ofproto_port.name);
+            VLOG_WARN_RL(&rl, "%s: ignoring duplicate device %s in datapath",
+                         p->name, ofproto_port.name);
         } else {
             struct ofputil_phy_port pp;
             struct netdev *netdev;
 
-            netdev = ofport_open(&ofproto_port, &pp);
+            netdev = ofport_open(p, &ofproto_port, &pp);
             if (netdev) {
                 ofport_install(p, netdev, &pp);
             }
@@ -1816,7 +1876,7 @@ static void
 ofproto_rule_destroy__(struct rule *rule)
 {
     if (rule) {
-        free(rule->actions);
+        free(rule->ofpacts);
         rule->ofproto->ofproto_class->rule_dealloc(rule);
     }
 }
@@ -1838,23 +1898,35 @@ ofproto_rule_destroy(struct rule *rule)
 }
 
 /* Returns true if 'rule' has an OpenFlow OFPAT_OUTPUT or OFPAT_ENQUEUE action
- * that outputs to 'out_port' (output to OFPP_FLOOD and OFPP_ALL doesn't
- * count). */
-static bool
-rule_has_out_port(const struct rule *rule, uint16_t out_port)
+ * that outputs to 'port' (output to OFPP_FLOOD and OFPP_ALL doesn't count). */
+bool
+ofproto_rule_has_out_port(const struct rule *rule, uint16_t port)
 {
-    const union ofp_action *oa;
-    size_t left;
+    return (port == OFPP_NONE
+            || ofpacts_output_to_port(rule->ofpacts, rule->ofpacts_len, port));
+}
 
-    if (out_port == OFPP_NONE) {
+/* Returns true if a rule related to 'op' has an OpenFlow OFPAT_OUTPUT or
+ * OFPAT_ENQUEUE action that outputs to 'out_port'. */
+bool
+ofoperation_has_out_port(const struct ofoperation *op, uint16_t out_port)
+{
+    if (ofproto_rule_has_out_port(op->rule, out_port)) {
         return true;
     }
-    OFPUTIL_ACTION_FOR_EACH_UNSAFE (oa, left, rule->actions, rule->n_actions) {
-        if (action_outputs_to_port(oa, htons(out_port))) {
-            return true;
-        }
+
+    switch (op->type) {
+    case OFOPERATION_ADD:
+        return op->victim && ofproto_rule_has_out_port(op->victim, out_port);
+
+    case OFOPERATION_DELETE:
+        return false;
+
+    case OFOPERATION_MODIFY:
+        return ofpacts_output_to_port(op->ofpacts, op->ofpacts_len, out_port);
     }
-    return false;
+
+    NOT_REACHED();
 }
 
 /* Executes the actions indicated by 'rule' on 'packet' and credits 'rule''s
@@ -1881,8 +1953,8 @@ rule_execute(struct rule *rule, uint16_t in_port, struct ofpbuf *packet)
  * Rules with priority higher than UINT16_MAX are set up by ofproto itself
  * (e.g. by in-band control) and are intentionally hidden from the
  * controller. */
-static bool
-rule_is_hidden(const struct rule *rule)
+bool
+ofproto_rule_is_hidden(const struct rule *rule)
 {
     return rule->cr.priority > UINT16_MAX;
 }
@@ -1947,7 +2019,8 @@ handle_get_config_request(struct ofconn *ofconn, const struct ofp_header *oh)
     struct ofpbuf *buf;
 
     /* Send reply. */
-    osc = make_openflow_xid(sizeof *osc, OFPT_GET_CONFIG_REPLY, oh->xid, &buf);
+    buf = ofpraw_alloc_reply(OFPRAW_OFPT_GET_CONFIG_REPLY, oh, 0);
+    osc = ofpbuf_put_uninit(buf, sizeof *osc);
     flags = ofproto->frag_handling;
     if (ofconn_get_invalid_ttl_to_controller(ofconn)) {
         flags |= OFPC_INVALID_TTL_TO_CONTROLLER;
@@ -1960,8 +2033,9 @@ handle_get_config_request(struct ofconn *ofconn, const struct ofp_header *oh)
 }
 
 static enum ofperr
-handle_set_config(struct ofconn *ofconn, const struct ofp_switch_config *osc)
+handle_set_config(struct ofconn *ofconn, const struct ofp_header *oh)
 {
+    const struct ofp_switch_config *osc = ofpmsg_body(oh);
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     uint16_t flags = ntohs(osc->flags);
 
@@ -2006,11 +2080,13 @@ reject_slave_controller(struct ofconn *ofconn)
 }
 
 static enum ofperr
-handle_packet_out(struct ofconn *ofconn, const struct ofp_packet_out *opo)
+handle_packet_out(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
     struct ofputil_packet_out po;
     struct ofpbuf *payload;
+    uint64_t ofpacts_stub[1024 / 8];
+    struct ofpbuf ofpacts;
     struct flow flow;
     enum ofperr error;
 
@@ -2018,20 +2094,21 @@ handle_packet_out(struct ofconn *ofconn, const struct ofp_packet_out *opo)
 
     error = reject_slave_controller(ofconn);
     if (error) {
-        return error;
+        goto exit;
     }
 
     /* Decode message. */
-    error = ofputil_decode_packet_out(&po, opo);
+    ofpbuf_use_stub(&ofpacts, ofpacts_stub, sizeof ofpacts_stub);
+    error = ofputil_decode_packet_out(&po, oh, &ofpacts);
     if (error) {
-        return error;
+        goto exit_free_ofpacts;
     }
 
     /* Get payload. */
     if (po.buffer_id != UINT32_MAX) {
         error = ofconn_pktbuf_retrieve(ofconn, po.buffer_id, &payload, NULL);
         if (error || !payload) {
-            return error;
+            goto exit_free_ofpacts;
         }
     } else {
         payload = xmalloc(sizeof *payload);
@@ -2041,9 +2118,12 @@ handle_packet_out(struct ofconn *ofconn, const struct ofp_packet_out *opo)
     /* Send out packet. */
     flow_extract(payload, 0, 0, po.in_port, &flow);
     error = p->ofproto_class->packet_out(p, payload, &flow,
-                                         po.actions, po.n_actions);
+                                         po.ofpacts, po.ofpacts_len);
     ofpbuf_delete(payload);
 
+exit_free_ofpacts:
+    ofpbuf_uninit(&ofpacts);
+exit:
     return error;
 }
 
@@ -2105,13 +2185,14 @@ handle_port_mod(struct ofconn *ofconn, const struct ofp_header *oh)
 
 static enum ofperr
 handle_desc_stats_request(struct ofconn *ofconn,
-                          const struct ofp_stats_msg *request)
+                          const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
     struct ofp_desc_stats *ods;
     struct ofpbuf *msg;
 
-    ods = ofputil_make_stats_reply(sizeof *ods, request, &msg);
+    msg = ofpraw_alloc_stats_reply(request, 0);
+    ods = ofpbuf_put_zeros(msg, sizeof *ods);
     ovs_strlcpy(ods->mfr_desc, p->mfr_desc, sizeof ods->mfr_desc);
     ovs_strlcpy(ods->hw_desc, p->hw_desc, sizeof ods->hw_desc);
     ovs_strlcpy(ods->sw_desc, p->sw_desc, sizeof ods->sw_desc);
@@ -2124,15 +2205,14 @@ handle_desc_stats_request(struct ofconn *ofconn,
 
 static enum ofperr
 handle_table_stats_request(struct ofconn *ofconn,
-                           const struct ofp_stats_msg *request)
+                           const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
-    struct ofp_table_stats *ots;
+    struct ofp10_table_stats *ots;
     struct ofpbuf *msg;
     size_t i;
 
-    ofputil_make_stats_reply(sizeof(struct ofp_stats_msg), request, &msg);
-
+    msg = ofpraw_alloc_stats_reply(request, sizeof *ots * p->n_tables);
     ots = ofpbuf_put_zeros(msg, sizeof *ots * p->n_tables);
     for (i = 0; i < p->n_tables; i++) {
         ots[i].table_id = i;
@@ -2164,14 +2244,14 @@ static void
 append_port_stat(struct ofport *port, struct list *replies)
 {
     struct netdev_stats stats;
-    struct ofp_port_stats *ops;
+    struct ofp10_port_stats *ops;
 
     /* Intentionally ignore return value, since errors will set
      * 'stats' to all-1s, which is correct for OpenFlow, and
      * netdev_get_stats() will log errors. */
     ofproto_port_get_stats(port, &stats);
 
-    ops = ofputil_append_stats_reply(sizeof *ops, replies);
+    ops = ofpmp_append(replies, sizeof *ops);
     ops->port_no = htons(port->pp.port_no);
     memset(ops->pad, 0, sizeof ops->pad);
     put_32aligned_be64(&ops->rx_packets, htonll(stats.rx_packets));
@@ -2190,13 +2270,14 @@ append_port_stat(struct ofport *port, struct list *replies)
 
 static enum ofperr
 handle_port_stats_request(struct ofconn *ofconn,
-                          const struct ofp_port_stats_request *psr)
+                          const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
+    const struct ofp10_port_stats_request *psr = ofpmsg_body(request);
     struct ofport *port;
     struct list replies;
 
-    ofputil_start_stats_reply(&psr->osm, &replies);
+    ofpmp_init(&replies, request);
     if (psr->port_no != htons(OFPP_NONE)) {
         port = ofproto_get_port(p, ntohs(psr->port_no));
         if (port) {
@@ -2214,17 +2295,18 @@ handle_port_stats_request(struct ofconn *ofconn,
 
 static enum ofperr
 handle_port_desc_stats_request(struct ofconn *ofconn,
-                               const struct ofp_stats_msg *osm)
+                               const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
+    enum ofp_version version;
     struct ofport *port;
     struct list replies;
 
-    ofputil_start_stats_reply(osm, &replies);
+    ofpmp_init(&replies, request);
 
+    version = ofputil_protocol_to_ofp_version(ofconn_get_protocol(ofconn));
     HMAP_FOR_EACH (port, hmap_node, &p->ports) {
-        ofputil_append_port_desc_stats_reply(ofconn_get_protocol(ofconn),
-                                             &port->pp, &replies);
+        ofputil_append_port_desc_stats_reply(version, &port->pp, &replies);
     }
 
     ofconn_send_replies(ofconn, &replies);
@@ -2252,7 +2334,7 @@ check_table_id(const struct ofproto *ofproto, uint8_t table_id)
 }
 
 static struct oftable *
-next_visible_table(struct ofproto *ofproto, uint8_t table_id)
+next_visible_table(const struct ofproto *ofproto, uint8_t table_id)
 {
     struct oftable *table;
 
@@ -2268,7 +2350,7 @@ next_visible_table(struct ofproto *ofproto, uint8_t table_id)
 }
 
 static struct oftable *
-first_matching_table(struct ofproto *ofproto, uint8_t table_id)
+first_matching_table(const struct ofproto *ofproto, uint8_t table_id)
 {
     if (table_id == 0xff) {
         return next_visible_table(ofproto, 0);
@@ -2280,8 +2362,8 @@ first_matching_table(struct ofproto *ofproto, uint8_t table_id)
 }
 
 static struct oftable *
-next_matching_table(struct ofproto *ofproto,
-                    struct oftable *table, uint8_t table_id)
+next_matching_table(const struct ofproto *ofproto,
+                    const struct oftable *table, uint8_t table_id)
 {
     return (table_id == 0xff
             ? next_visible_table(ofproto, (table - ofproto->tables) + 1)
@@ -2343,7 +2425,8 @@ collect_rules_loose(struct ofproto *ofproto, uint8_t table_id,
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
-            if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+            if (!ofproto_rule_is_hidden(rule)
+                && ofproto_rule_has_out_port(rule, out_port)
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -2387,7 +2470,8 @@ collect_rules_strict(struct ofproto *ofproto, uint8_t table_id,
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
-            if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+            if (!ofproto_rule_is_hidden(rule)
+                && ofproto_rule_has_out_port(rule, out_port)
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -2408,7 +2492,7 @@ age_secs(long long int age_ms)
 
 static enum ofperr
 handle_flow_stats_request(struct ofconn *ofconn,
-                          const struct ofp_stats_msg *osm)
+                          const struct ofp_header *request)
 {
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     struct ofputil_flow_stats_request fsr;
@@ -2417,7 +2501,7 @@ handle_flow_stats_request(struct ofconn *ofconn,
     struct rule *rule;
     enum ofperr error;
 
-    error = ofputil_decode_flow_stats_request(&fsr, &osm->header);
+    error = ofputil_decode_flow_stats_request(&fsr, request);
     if (error) {
         return error;
     }
@@ -2429,7 +2513,7 @@ handle_flow_stats_request(struct ofconn *ofconn,
         return error;
     }
 
-    ofputil_start_stats_reply(osm, &replies);
+    ofpmp_init(&replies, request);
     LIST_FOR_EACH (rule, ofproto_node, &rules) {
         long long int now = time_msec();
         struct ofputil_flow_stats fs;
@@ -2445,8 +2529,8 @@ handle_flow_stats_request(struct ofconn *ofconn,
         fs.hard_age = age_secs(now - rule->modified);
         ofproto->ofproto_class->rule_get_stats(rule, &fs.packet_count,
                                                &fs.byte_count);
-        fs.actions = rule->actions;
-        fs.n_actions = rule->n_actions;
+        fs.ofpacts = rule->ofpacts;
+        fs.ofpacts_len = rule->ofpacts_len;
         ofputil_append_flow_stats_reply(&fs, &replies);
     }
     ofconn_send_replies(ofconn, &replies);
@@ -2472,8 +2556,8 @@ flow_stats_ds(struct rule *rule, struct ds *results)
     ds_put_format(results, "n_bytes=%"PRIu64", ", byte_count);
     cls_rule_format(&rule->cr, results);
     ds_put_char(results, ',');
-    if (rule->n_actions > 0) {
-        ofp_print_actions(results, rule->actions, rule->n_actions);
+    if (rule->ofpacts_len > 0) {
+        ofpacts_format(rule->ofpacts, rule->ofpacts_len, results);
     } else {
         ds_put_cstr(results, "drop");
     }
@@ -2554,7 +2638,7 @@ ofproto_port_get_cfm_health(const struct ofproto *ofproto, uint16_t ofp_port)
 
 static enum ofperr
 handle_aggregate_stats_request(struct ofconn *ofconn,
-                               const struct ofp_stats_msg *osm)
+                               const struct ofp_header *oh)
 {
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     struct ofputil_flow_stats_request request;
@@ -2565,7 +2649,7 @@ handle_aggregate_stats_request(struct ofconn *ofconn,
     struct rule *rule;
     enum ofperr error;
 
-    error = ofputil_decode_flow_stats_request(&request, &osm->header);
+    error = ofputil_decode_flow_stats_request(&request, oh);
     if (error) {
         return error;
     }
@@ -2607,7 +2691,7 @@ handle_aggregate_stats_request(struct ofconn *ofconn,
         stats.byte_count = UINT64_MAX;
     }
 
-    reply = ofputil_encode_aggregate_stats_reply(&stats, osm);
+    reply = ofputil_encode_aggregate_stats_reply(&stats, oh);
     ofconn_send_reply(ofconn, reply);
 
     return 0;
@@ -2622,9 +2706,9 @@ static void
 put_queue_stats(struct queue_stats_cbdata *cbdata, uint32_t queue_id,
                 const struct netdev_queue_stats *stats)
 {
-    struct ofp_queue_stats *reply;
+    struct ofp10_queue_stats *reply;
 
-    reply = ofputil_append_stats_reply(sizeof *reply, &cbdata->replies);
+    reply = ofpmp_append(&cbdata->replies, sizeof *reply);
     reply->port_no = htons(cbdata->ofport->pp.port_no);
     memset(reply->pad, 0, sizeof reply->pad);
     reply->queue_id = htonl(queue_id);
@@ -2643,7 +2727,7 @@ handle_queue_stats_dump_cb(uint32_t queue_id,
     put_queue_stats(cbdata, queue_id, stats);
 }
 
-static void
+static enum ofperr
 handle_queue_stats_for_port(struct ofport *port, uint32_t queue_id,
                             struct queue_stats_cbdata *cbdata)
 {
@@ -2656,42 +2740,51 @@ handle_queue_stats_for_port(struct ofport *port, uint32_t queue_id,
 
         if (!netdev_get_queue_stats(port->netdev, queue_id, &stats)) {
             put_queue_stats(cbdata, queue_id, &stats);
+        } else {
+            return OFPERR_OFPQOFC_BAD_QUEUE;
         }
     }
+    return 0;
 }
 
 static enum ofperr
 handle_queue_stats_request(struct ofconn *ofconn,
-                           const struct ofp_queue_stats_request *qsr)
+                           const struct ofp_header *rq)
 {
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
+    const struct ofp10_queue_stats_request *qsr = ofpmsg_body(rq);
     struct queue_stats_cbdata cbdata;
-    struct ofport *port;
     unsigned int port_no;
+    struct ofport *port;
     uint32_t queue_id;
+    enum ofperr error;
 
     COVERAGE_INC(ofproto_queue_req);
 
-    ofputil_start_stats_reply(&qsr->osm, &cbdata.replies);
+    ofpmp_init(&cbdata.replies, rq);
 
     port_no = ntohs(qsr->port_no);
     queue_id = ntohl(qsr->queue_id);
     if (port_no == OFPP_ALL) {
+        error = OFPERR_OFPQOFC_BAD_QUEUE;
         HMAP_FOR_EACH (port, hmap_node, &ofproto->ports) {
-            handle_queue_stats_for_port(port, queue_id, &cbdata);
+            if (!handle_queue_stats_for_port(port, queue_id, &cbdata)) {
+                error = 0;
+            }
         }
-    } else if (port_no < OFPP_MAX) {
+    } else {
         port = ofproto_get_port(ofproto, port_no);
-        if (port) {
-            handle_queue_stats_for_port(port, queue_id, &cbdata);
-        }
+        error = (port
+                 ? handle_queue_stats_for_port(port, queue_id, &cbdata)
+                 : OFPERR_OFPQOFC_BAD_PORT);
+    }
+    if (!error) {
+        ofconn_send_replies(ofconn, &cbdata.replies);
     } else {
         ofpbuf_list_delete(&cbdata.replies);
-        return OFPERR_OFPQOFC_BAD_PORT;
     }
-    ofconn_send_replies(ofconn, &cbdata.replies);
 
-    return 0;
+    return error;
 }
 
 static bool
@@ -2722,6 +2815,9 @@ is_flow_deletion_pending(const struct ofproto *ofproto,
  * error code on failure, or OFPROTO_POSTPONE if the operation cannot be
  * initiated now but may be retried later.
  *
+ * Upon successful return, takes ownership of 'fm->ofpacts'.  On failure,
+ * ownership remains with the caller.
+ *
  * 'ofconn' is used to retrieve the packet buffer specified in ofm->buffer_id,
  * if any. */
 static enum ofperr
@@ -2790,10 +2886,13 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
     rule->hard_timeout = fm->hard_timeout;
     rule->table_id = table - ofproto->tables;
     rule->send_flow_removed = (fm->flags & OFPFF_SEND_FLOW_REM) != 0;
-    rule->actions = ofputil_actions_clone(fm->actions, fm->n_actions);
-    rule->n_actions = fm->n_actions;
+    rule->ofpacts = xmemdup(fm->ofpacts, fm->ofpacts_len);
+    rule->ofpacts_len = fm->ofpacts_len;
     rule->evictable = true;
     rule->eviction_group = NULL;
+    rule->monitor_flags = 0;
+    rule->add_seqno = 0;
+    rule->modify_seqno = 0;
 
     /* Insert new rule. */
     victim = oftable_replace_rule(rule);
@@ -2802,6 +2901,7 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
     } else if (victim && victim->pending) {
         error = OFPROTO_POSTPONE;
     } else {
+        struct ofoperation *op;
         struct rule *evict;
 
         if (classifier_count(&table->cls) > table->max_flows) {
@@ -2824,11 +2924,12 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
         }
 
         group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id);
-        ofoperation_create(group, rule, OFOPERATION_ADD);
-        rule->pending->victim = victim;
+        op = ofoperation_create(group, rule, OFOPERATION_ADD, 0);
+        op->victim = victim;
 
         error = ofproto->ofproto_class->rule_construct(rule);
         if (error) {
+            op->group->n_running--;
             ofoperation_destroy(rule->pending);
         } else if (evict) {
             delete_flow__(evict, group);
@@ -2866,6 +2967,10 @@ modify_flows__(struct ofproto *ofproto, struct ofconn *ofconn,
     group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id);
     error = OFPERR_OFPBRC_EPERM;
     LIST_FOR_EACH (rule, ofproto_node, rules) {
+        struct ofoperation *op;
+        bool actions_changed;
+        ovs_be64 new_cookie;
+
         if (rule_is_modifiable(rule)) {
             /* At least one rule is modifiable, don't report EPERM error. */
             error = 0;
@@ -2873,19 +2978,26 @@ modify_flows__(struct ofproto *ofproto, struct ofconn *ofconn,
             continue;
         }
 
-        if (!ofputil_actions_equal(fm->actions, fm->n_actions,
-                                   rule->actions, rule->n_actions)) {
-            ofoperation_create(group, rule, OFOPERATION_MODIFY);
-            rule->pending->actions = rule->actions;
-            rule->pending->n_actions = rule->n_actions;
-            rule->actions = ofputil_actions_clone(fm->actions, fm->n_actions);
-            rule->n_actions = fm->n_actions;
-            ofproto->ofproto_class->rule_modify_actions(rule);
-        } else {
-            rule->modified = time_msec();
+        actions_changed = !ofpacts_equal(fm->ofpacts, fm->ofpacts_len,
+                                         rule->ofpacts, rule->ofpacts_len);
+        new_cookie = (fm->new_cookie != htonll(UINT64_MAX)
+                      ? fm->new_cookie
+                      : rule->flow_cookie);
+        if (!actions_changed && new_cookie == rule->flow_cookie) {
+            /* No change at all. */
+            continue;
         }
-        if (fm->new_cookie != htonll(UINT64_MAX)) {
-            rule->flow_cookie = fm->new_cookie;
+
+        op = ofoperation_create(group, rule, OFOPERATION_MODIFY, 0);
+        rule->flow_cookie = new_cookie;
+        if (actions_changed) {
+            op->ofpacts = rule->ofpacts;
+            op->ofpacts_len = rule->ofpacts_len;
+            rule->ofpacts = xmemdup(fm->ofpacts, fm->ofpacts_len);
+            rule->ofpacts_len = fm->ofpacts_len;
+            rule->ofproto->ofproto_class->rule_modify_actions(rule);
+        } else {
+            ofoperation_complete(op, 0);
         }
     }
     ofopgroup_submit(group);
@@ -2955,7 +3067,7 @@ delete_flow__(struct rule *rule, struct ofopgroup *group)
 
     ofproto_rule_send_removed(rule, OFPRR_DELETE);
 
-    ofoperation_create(group, rule, OFOPERATION_DELETE);
+    ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
 }
@@ -3020,7 +3132,7 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason)
 {
     struct ofputil_flow_removed fr;
 
-    if (rule_is_hidden(rule) || !rule->send_flow_removed) {
+    if (ofproto_rule_is_hidden(rule) || !rule->send_flow_removed) {
         return;
     }
 
@@ -3030,6 +3142,7 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason)
     calc_flow_duration__(rule->created, time_msec(),
                          &fr.duration_sec, &fr.duration_nsec);
     fr.idle_timeout = rule->idle_timeout;
+    fr.hard_timeout = rule->hard_timeout;
     rule->ofproto->ofproto_class->rule_get_stats(rule, &fr.packet_count,
                                                  &fr.byte_count);
 
@@ -3054,6 +3167,9 @@ ofproto_rule_update_used(struct rule *rule, long long int used)
  * OFPRR_HARD_TIMEOUT or OFPRR_IDLE_TIMEOUT), and then removes 'rule' from its
  * ofproto.
  *
+ * 'rule' must not have a pending operation (that is, 'rule->pending' must be
+ * NULL).
+ *
  * ofproto implementation ->run() functions should use this function to expire
  * OpenFlow flows. */
 void
@@ -3067,7 +3183,7 @@ ofproto_rule_expire(struct rule *rule, uint8_t reason)
     ofproto_rule_send_removed(rule, reason);
 
     group = ofopgroup_create_unattached(ofproto);
-    ofoperation_create(group, rule, OFOPERATION_DELETE);
+    ofoperation_create(group, rule, OFOPERATION_DELETE, reason);
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
     ofopgroup_submit(group);
@@ -3076,28 +3192,69 @@ ofproto_rule_expire(struct rule *rule, uint8_t reason)
 static enum ofperr
 handle_flow_mod(struct ofconn *ofconn, const struct ofp_header *oh)
 {
+    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     struct ofputil_flow_mod fm;
+    uint64_t ofpacts_stub[1024 / 8];
+    struct ofpbuf ofpacts;
     enum ofperr error;
+    long long int now;
 
     error = reject_slave_controller(ofconn);
     if (error) {
-        return error;
+        goto exit;
     }
 
-    error = ofputil_decode_flow_mod(&fm, oh, ofconn_get_protocol(ofconn));
+    ofpbuf_use_stub(&ofpacts, ofpacts_stub, sizeof ofpacts_stub);
+    error = ofputil_decode_flow_mod(&fm, oh, ofconn_get_protocol(ofconn),
+                                    &ofpacts);
     if (error) {
-        return error;
+        goto exit_free_ofpacts;
     }
 
+    if (fm.flags & OFPFF10_EMERG) {
     /* We do not support the OpenFlow 1.0 emergency flow cache, which is not
      * required in OpenFlow 1.0.1 and removed from OpenFlow 1.1. */
-    if (fm.flags & OFPFF_EMERG) {
-        /* There isn't a good fit for an error code, so just state that the
-         * flow table is full. */
-        return OFPERR_OFPFMFC_ALL_TABLES_FULL;
+        /* We do not support the emergency flow cache.  It will hopefully get
+         * dropped from OpenFlow in the near future.  There is no good error
+         * code, so just state that the flow table is full. */
+        error = OFPERR_OFPFMFC_ALL_TABLES_FULL;
+    } else {
+        error = handle_flow_mod__(ofconn_get_ofproto(ofconn), ofconn, &fm, oh);
+    }
+    if (error) {
+        goto exit_free_ofpacts;
     }
 
-    return handle_flow_mod__(ofconn_get_ofproto(ofconn), ofconn, &fm, oh);
+    /* Record the operation for logging a summary report. */
+    switch (fm.command) {
+    case OFPFC_ADD:
+        ofproto->n_add++;
+        break;
+
+    case OFPFC_MODIFY:
+    case OFPFC_MODIFY_STRICT:
+        ofproto->n_modify++;
+        break;
+
+    case OFPFC_DELETE:
+    case OFPFC_DELETE_STRICT:
+        ofproto->n_delete++;
+        break;
+    }
+
+    now = time_msec();
+    if (ofproto->next_op_report == LLONG_MAX) {
+        ofproto->first_op = now;
+        ofproto->next_op_report = MAX(now + 10 * 1000,
+                                      ofproto->op_backoff);
+        ofproto->op_backoff = ofproto->next_op_report + 60 * 1000;
+    }
+    ofproto->last_op = now;
+
+exit_free_ofpacts:
+    ofpbuf_uninit(&ofpacts);
+exit:
+    return error;
 }
 
 static enum ofperr
@@ -3128,8 +3285,9 @@ handle_flow_mod__(struct ofproto *ofproto, struct ofconn *ofconn,
 
     default:
         if (fm->command > 0xff) {
-            VLOG_WARN_RL(&rl, "flow_mod has explicit table_id but "
-                         "flow_mod_table_id extension is not enabled");
+            VLOG_WARN_RL(&rl, "%s: flow_mod has explicit table_id but "
+                         "flow_mod_table_id extension is not enabled",
+                         ofproto->name);
         }
         return OFPERR_OFPFMFC_BAD_COMMAND;
     }
@@ -3138,7 +3296,7 @@ handle_flow_mod__(struct ofproto *ofproto, struct ofconn *ofconn,
 static enum ofperr
 handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    struct nx_role_request *nrr = (struct nx_role_request *) oh;
+    const struct nx_role_request *nrr = ofpmsg_body(oh);
     struct nx_role_request *reply;
     struct ofpbuf *buf;
     uint32_t role;
@@ -3156,7 +3314,8 @@ handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
 
     ofconn_set_role(ofconn, role);
 
-    reply = make_nxmsg_xid(sizeof *reply, NXT_ROLE_REPLY, oh->xid, &buf);
+    buf = ofpraw_alloc_reply(OFPRAW_NXT_ROLE_REPLY, oh, 0);
+    reply = ofpbuf_put_zeros(buf, sizeof *reply);
     reply->role = htonl(role);
     ofconn_send_reply(ofconn, buf);
 
@@ -3167,8 +3326,7 @@ static enum ofperr
 handle_nxt_flow_mod_table_id(struct ofconn *ofconn,
                              const struct ofp_header *oh)
 {
-    const struct nx_flow_mod_table_id *msg
-        = (const struct nx_flow_mod_table_id *) oh;
+    const struct nx_flow_mod_table_id *msg = ofpmsg_body(oh);
     enum ofputil_protocol cur, next;
 
     cur = ofconn_get_protocol(ofconn);
@@ -3181,8 +3339,7 @@ handle_nxt_flow_mod_table_id(struct ofconn *ofconn,
 static enum ofperr
 handle_nxt_set_flow_format(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    const struct nx_set_flow_format *msg
-        = (const struct nx_set_flow_format *) oh;
+    const struct nx_set_flow_format *msg = ofpmsg_body(oh);
     enum ofputil_protocol cur, next;
     enum ofputil_protocol next_base;
 
@@ -3206,10 +3363,9 @@ static enum ofperr
 handle_nxt_set_packet_in_format(struct ofconn *ofconn,
                                 const struct ofp_header *oh)
 {
-    const struct nx_set_packet_in_format *msg;
+    const struct nx_set_packet_in_format *msg = ofpmsg_body(oh);
     uint32_t format;
 
-    msg = (const struct nx_set_packet_in_format *) oh;
     format = ntohl(msg->format);
     if (format != NXPIF_OPENFLOW10 && format != NXPIF_NXM) {
         return OFPERR_OFPBRC_EPERM;
@@ -3228,7 +3384,7 @@ handle_nxt_set_packet_in_format(struct ofconn *ofconn,
 static enum ofperr
 handle_nxt_set_async_config(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    const struct nx_async_config *msg = (const struct nx_async_config *) oh;
+    const struct nx_async_config *msg = ofpmsg_body(oh);
     uint32_t master[OAM_N_TYPES];
     uint32_t slave[OAM_N_TYPES];
 
@@ -3241,6 +3397,10 @@ handle_nxt_set_async_config(struct ofconn *ofconn, const struct ofp_header *oh)
     slave[OAM_FLOW_REMOVED] = ntohl(msg->flow_removed_mask[1]);
 
     ofconn_set_async_config(ofconn, master, slave);
+    if (ofconn_get_type(ofconn) == OFCONN_SERVICE &&
+        !ofconn_get_miss_send_len(ofconn)) {
+        ofconn_set_miss_send_len(ofconn, OFP_DEFAULT_MISS_SEND_LEN);
+    }
 
     return 0;
 }
@@ -3249,9 +3409,8 @@ static enum ofperr
 handle_nxt_set_controller_id(struct ofconn *ofconn,
                              const struct ofp_header *oh)
 {
-    const struct nx_controller_id *nci;
+    const struct nx_controller_id *nci = ofpmsg_body(oh);
 
-    nci = (const struct nx_controller_id *) oh;
     if (!is_all_zeros(nci->zero, sizeof nci->zero)) {
         return OFPERR_NXBRC_MUST_BE_ZERO;
     }
@@ -3269,131 +3428,375 @@ handle_barrier_request(struct ofconn *ofconn, const struct ofp_header *oh)
         return OFPROTO_POSTPONE;
     }
 
-    make_openflow_xid(sizeof *oh, OFPT10_BARRIER_REPLY, oh->xid, &buf);
+    buf = ofpraw_alloc_reply((oh->version == OFP10_VERSION
+                              ? OFPRAW_OFPT10_BARRIER_REPLY
+                              : OFPRAW_OFPT11_BARRIER_REPLY), oh, 0);
     ofconn_send_reply(ofconn, buf);
     return 0;
 }
 
+static void
+ofproto_compose_flow_refresh_update(const struct rule *rule,
+                                    enum nx_flow_monitor_flags flags,
+                                    struct list *msgs)
+{
+    struct ofoperation *op = rule->pending;
+    struct ofputil_flow_update fu;
+
+    if (op && op->type == OFOPERATION_ADD && !op->victim) {
+        /* We'll report the final flow when the operation completes.  Reporting
+         * it now would cause a duplicate report later. */
+        return;
+    }
+
+    fu.event = (flags & (NXFMF_INITIAL | NXFMF_ADD)
+                ? NXFME_ADDED : NXFME_MODIFIED);
+    fu.reason = 0;
+    fu.idle_timeout = rule->idle_timeout;
+    fu.hard_timeout = rule->hard_timeout;
+    fu.table_id = rule->table_id;
+    fu.cookie = rule->flow_cookie;
+    fu.match = CONST_CAST(struct cls_rule *, &rule->cr);
+    if (!(flags & NXFMF_ACTIONS)) {
+        fu.ofpacts = NULL;
+        fu.ofpacts_len = 0;
+    } else if (!op) {
+        fu.ofpacts = rule->ofpacts;
+        fu.ofpacts_len = rule->ofpacts_len;
+    } else {
+        /* An operation is in progress.  Use the previous version of the flow's
+         * actions, so that when the operation commits we report the change. */
+        switch (op->type) {
+        case OFOPERATION_ADD:
+            /* We already verified that there was a victim. */
+            fu.ofpacts = op->victim->ofpacts;
+            fu.ofpacts_len = op->victim->ofpacts_len;
+            break;
+
+        case OFOPERATION_MODIFY:
+            if (op->ofpacts) {
+                fu.ofpacts = op->ofpacts;
+                fu.ofpacts_len = op->ofpacts_len;
+            } else {
+                fu.ofpacts = rule->ofpacts;
+                fu.ofpacts_len = rule->ofpacts_len;
+            }
+            break;
+
+        case OFOPERATION_DELETE:
+            fu.ofpacts = rule->ofpacts;
+            fu.ofpacts_len = rule->ofpacts_len;
+            break;
+
+        default:
+            NOT_REACHED();
+        }
+    }
+
+    if (list_is_empty(msgs)) {
+        ofputil_start_flow_update(msgs);
+    }
+    ofputil_append_flow_update(&fu, msgs);
+}
+
+void
+ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs)
+{
+    struct rule *rule;
+
+    LIST_FOR_EACH (rule, ofproto_node, rules) {
+        enum nx_flow_monitor_flags flags = rule->monitor_flags;
+        rule->monitor_flags = 0;
+
+        ofproto_compose_flow_refresh_update(rule, flags, msgs);
+    }
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rule(const struct ofmonitor *m,
+                                       struct rule *rule, uint64_t seqno,
+                                       struct list *rules)
+{
+    enum nx_flow_monitor_flags update;
+
+    if (ofproto_rule_is_hidden(rule)) {
+        return;
+    }
+
+    if (!(rule->pending
+          ? ofoperation_has_out_port(rule->pending, m->out_port)
+          : ofproto_rule_has_out_port(rule, m->out_port))) {
+        return;
+    }
+
+    if (seqno) {
+        if (rule->add_seqno > seqno) {
+            update = NXFMF_ADD | NXFMF_MODIFY;
+        } else if (rule->modify_seqno > seqno) {
+            update = NXFMF_MODIFY;
+        } else {
+            return;
+        }
+
+        if (!(m->flags & update)) {
+            return;
+        }
+    } else {
+        update = NXFMF_INITIAL;
+    }
+
+    if (!rule->monitor_flags) {
+        list_push_back(rules, &rule->ofproto_node);
+    }
+    rule->monitor_flags |= update | (m->flags & NXFMF_ACTIONS);
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rules(const struct ofmonitor *m,
+                                        uint64_t seqno,
+                                        struct list *rules)
+{
+    const struct ofproto *ofproto = ofconn_get_ofproto(m->ofconn);
+    const struct ofoperation *op;
+    const struct oftable *table;
+
+    FOR_EACH_MATCHING_TABLE (table, m->table_id, ofproto) {
+        struct cls_cursor cursor;
+        struct rule *rule;
+
+        cls_cursor_init(&cursor, &table->cls, &m->match);
+        CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+            assert(!rule->pending); /* XXX */
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+
+    HMAP_FOR_EACH (op, hmap_node, &ofproto->deletions) {
+        struct rule *rule = op->rule;
+
+        if (((m->table_id == 0xff
+              ? !(ofproto->tables[rule->table_id].flags & OFTABLE_HIDDEN)
+              : m->table_id == rule->table_id))
+            && cls_rule_is_loose_match(&rule->cr, &m->match)) {
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+}
+
+static void
+ofproto_collect_ofmonitor_initial_rules(struct ofmonitor *m,
+                                        struct list *rules)
+{
+    if (m->flags & NXFMF_INITIAL) {
+        ofproto_collect_ofmonitor_refresh_rules(m, 0, rules);
+    }
+}
+
+void
+ofmonitor_collect_resume_rules(struct ofmonitor *m,
+                               uint64_t seqno, struct list *rules)
+{
+    ofproto_collect_ofmonitor_refresh_rules(m, seqno, rules);
+}
+
+static enum ofperr
+handle_flow_monitor_request(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
+    struct ofmonitor **monitors;
+    size_t n_monitors, allocated_monitors;
+    struct list replies;
+    enum ofperr error;
+    struct list rules;
+    struct ofpbuf b;
+    size_t i;
+
+    error = 0;
+    ofpbuf_use_const(&b, oh, ntohs(oh->length));
+    monitors = NULL;
+    n_monitors = allocated_monitors = 0;
+    for (;;) {
+        struct ofputil_flow_monitor_request request;
+        struct ofmonitor *m;
+        int retval;
+
+        retval = ofputil_decode_flow_monitor_request(&request, &b);
+        if (retval == EOF) {
+            break;
+        } else if (retval) {
+            error = retval;
+            goto error;
+        }
+
+        if (request.table_id != 0xff
+            && request.table_id >= ofproto->n_tables) {
+            error = OFPERR_OFPBRC_BAD_TABLE_ID;
+            goto error;
+        }
+
+        error = ofmonitor_create(&request, ofconn, &m);
+        if (error) {
+            goto error;
+        }
+
+        if (n_monitors >= allocated_monitors) {
+            monitors = x2nrealloc(monitors, &allocated_monitors,
+                                  sizeof *monitors);
+        }
+        monitors[n_monitors++] = m;
+    }
+
+    list_init(&rules);
+    for (i = 0; i < n_monitors; i++) {
+        ofproto_collect_ofmonitor_initial_rules(monitors[i], &rules);
+    }
+
+    ofpmp_init(&replies, oh);
+    ofmonitor_compose_refresh_updates(&rules, &replies);
+    ofconn_send_replies(ofconn, &replies);
+
+    free(monitors);
+
+    return 0;
+
+error:
+    for (i = 0; i < n_monitors; i++) {
+        ofmonitor_destroy(monitors[i]);
+    }
+    free(monitors);
+    return error;
+}
+
+static enum ofperr
+handle_flow_monitor_cancel(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+    struct ofmonitor *m;
+    uint32_t id;
+
+    id = ofputil_decode_flow_monitor_cancel(oh);
+    m = ofmonitor_lookup(ofconn, id);
+    if (!m) {
+        return OFPERR_NXBRC_FM_BAD_ID;
+    }
+
+    ofmonitor_destroy(m);
+    return 0;
+}
+
 static enum ofperr
 handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
 {
     const struct ofp_header *oh = msg->data;
-    const struct ofputil_msg_type *type;
+    enum ofptype type;
     enum ofperr error;
 
-    error = ofputil_decode_msg_type(oh, &type);
+    error = ofptype_decode(&type, oh);
     if (error) {
         return error;
     }
 
-    switch (ofputil_msg_type_code(type)) {
+    switch (type) {
         /* OpenFlow requests. */
-    case OFPUTIL_OFPT_ECHO_REQUEST:
+    case OFPTYPE_ECHO_REQUEST:
         return handle_echo_request(ofconn, oh);
 
-    case OFPUTIL_OFPT_FEATURES_REQUEST:
+    case OFPTYPE_FEATURES_REQUEST:
         return handle_features_request(ofconn, oh);
 
-    case OFPUTIL_OFPT_GET_CONFIG_REQUEST:
+    case OFPTYPE_GET_CONFIG_REQUEST:
         return handle_get_config_request(ofconn, oh);
 
-    case OFPUTIL_OFPT_SET_CONFIG:
-        return handle_set_config(ofconn, msg->data);
+    case OFPTYPE_SET_CONFIG:
+        return handle_set_config(ofconn, oh);
 
-    case OFPUTIL_OFPT_PACKET_OUT:
-        return handle_packet_out(ofconn, msg->data);
+    case OFPTYPE_PACKET_OUT:
+        return handle_packet_out(ofconn, oh);
 
-    case OFPUTIL_OFPT_PORT_MOD:
+    case OFPTYPE_PORT_MOD:
         return handle_port_mod(ofconn, oh);
 
-    case OFPUTIL_OFPT_FLOW_MOD:
+    case OFPTYPE_FLOW_MOD:
         return handle_flow_mod(ofconn, oh);
 
-    case OFPUTIL_OFPT_BARRIER_REQUEST:
+    case OFPTYPE_BARRIER_REQUEST:
         return handle_barrier_request(ofconn, oh);
 
         /* OpenFlow replies. */
-    case OFPUTIL_OFPT_ECHO_REPLY:
+    case OFPTYPE_ECHO_REPLY:
         return 0;
 
         /* Nicira extension requests. */
-    case OFPUTIL_NXT_ROLE_REQUEST:
+    case OFPTYPE_ROLE_REQUEST:
         return handle_role_request(ofconn, oh);
 
-    case OFPUTIL_NXT_FLOW_MOD_TABLE_ID:
+    case OFPTYPE_FLOW_MOD_TABLE_ID:
         return handle_nxt_flow_mod_table_id(ofconn, oh);
 
-    case OFPUTIL_NXT_SET_FLOW_FORMAT:
+    case OFPTYPE_SET_FLOW_FORMAT:
         return handle_nxt_set_flow_format(ofconn, oh);
 
-    case OFPUTIL_NXT_SET_PACKET_IN_FORMAT:
+    case OFPTYPE_SET_PACKET_IN_FORMAT:
         return handle_nxt_set_packet_in_format(ofconn, oh);
 
-    case OFPUTIL_NXT_SET_CONTROLLER_ID:
+    case OFPTYPE_SET_CONTROLLER_ID:
         return handle_nxt_set_controller_id(ofconn, oh);
 
-    case OFPUTIL_NXT_FLOW_MOD:
-        return handle_flow_mod(ofconn, oh);
-
-    case OFPUTIL_NXT_FLOW_AGE:
+    case OFPTYPE_FLOW_AGE:
         /* Nothing to do. */
         return 0;
 
-    case OFPUTIL_NXT_SET_ASYNC_CONFIG:
+    case OFPTYPE_FLOW_MONITOR_CANCEL:
+        return handle_flow_monitor_cancel(ofconn, oh);
+
+    case OFPTYPE_SET_ASYNC_CONFIG:
         return handle_nxt_set_async_config(ofconn, oh);
 
         /* Statistics requests. */
-    case OFPUTIL_OFPST_DESC_REQUEST:
-        return handle_desc_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_FLOW_REQUEST:
-    case OFPUTIL_NXST_FLOW_REQUEST:
-        return handle_flow_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_AGGREGATE_REQUEST:
-    case OFPUTIL_NXST_AGGREGATE_REQUEST:
-        return handle_aggregate_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_TABLE_REQUEST:
-        return handle_table_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_PORT_REQUEST:
-        return handle_port_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_QUEUE_REQUEST:
-        return handle_queue_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_PORT_DESC_REQUEST:
-        return handle_port_desc_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_MSG_INVALID:
-    case OFPUTIL_OFPT_HELLO:
-    case OFPUTIL_OFPT_ERROR:
-    case OFPUTIL_OFPT_FEATURES_REPLY:
-    case OFPUTIL_OFPT_GET_CONFIG_REPLY:
-    case OFPUTIL_OFPT_PACKET_IN:
-    case OFPUTIL_OFPT_FLOW_REMOVED:
-    case OFPUTIL_OFPT_PORT_STATUS:
-    case OFPUTIL_OFPT_BARRIER_REPLY:
-    case OFPUTIL_OFPT_QUEUE_GET_CONFIG_REQUEST:
-    case OFPUTIL_OFPT_QUEUE_GET_CONFIG_REPLY:
-    case OFPUTIL_OFPST_DESC_REPLY:
-    case OFPUTIL_OFPST_FLOW_REPLY:
-    case OFPUTIL_OFPST_QUEUE_REPLY:
-    case OFPUTIL_OFPST_PORT_REPLY:
-    case OFPUTIL_OFPST_TABLE_REPLY:
-    case OFPUTIL_OFPST_AGGREGATE_REPLY:
-    case OFPUTIL_OFPST_PORT_DESC_REPLY:
-    case OFPUTIL_NXT_ROLE_REPLY:
-    case OFPUTIL_NXT_FLOW_REMOVED:
-    case OFPUTIL_NXT_PACKET_IN:
-    case OFPUTIL_NXST_FLOW_REPLY:
-    case OFPUTIL_NXST_AGGREGATE_REPLY:
+    case OFPTYPE_DESC_STATS_REQUEST:
+        return handle_desc_stats_request(ofconn, oh);
+
+    case OFPTYPE_FLOW_STATS_REQUEST:
+        return handle_flow_stats_request(ofconn, oh);
+
+    case OFPTYPE_AGGREGATE_STATS_REQUEST:
+        return handle_aggregate_stats_request(ofconn, oh);
+
+    case OFPTYPE_TABLE_STATS_REQUEST:
+        return handle_table_stats_request(ofconn, oh);
+
+    case OFPTYPE_PORT_STATS_REQUEST:
+        return handle_port_stats_request(ofconn, oh);
+
+    case OFPTYPE_QUEUE_STATS_REQUEST:
+        return handle_queue_stats_request(ofconn, oh);
+
+    case OFPTYPE_PORT_DESC_STATS_REQUEST:
+        return handle_port_desc_stats_request(ofconn, oh);
+
+    case OFPTYPE_FLOW_MONITOR_STATS_REQUEST:
+        return handle_flow_monitor_request(ofconn, oh);
+
+    case OFPTYPE_HELLO:
+    case OFPTYPE_ERROR:
+    case OFPTYPE_FEATURES_REPLY:
+    case OFPTYPE_GET_CONFIG_REPLY:
+    case OFPTYPE_PACKET_IN:
+    case OFPTYPE_FLOW_REMOVED:
+    case OFPTYPE_PORT_STATUS:
+    case OFPTYPE_BARRIER_REPLY:
+    case OFPTYPE_DESC_STATS_REPLY:
+    case OFPTYPE_FLOW_STATS_REPLY:
+    case OFPTYPE_QUEUE_STATS_REPLY:
+    case OFPTYPE_PORT_STATS_REPLY:
+    case OFPTYPE_TABLE_STATS_REPLY:
+    case OFPTYPE_AGGREGATE_STATS_REPLY:
+    case OFPTYPE_PORT_DESC_STATS_REPLY:
+    case OFPTYPE_ROLE_REPLY:
+    case OFPTYPE_FLOW_MONITOR_PAUSED:
+    case OFPTYPE_FLOW_MONITOR_RESUMED:
+    case OFPTYPE_FLOW_MONITOR_STATS_REPLY:
     default:
-        return (oh->type == OFPT10_STATS_REQUEST ||
-                oh->type == OFPT10_STATS_REPLY
-                ? OFPERR_OFPBRC_BAD_STAT
-                : OFPERR_OFPBRC_BAD_TYPE);
+        return OFPERR_OFPBRC_BAD_TYPE;
     }
 }
 
@@ -3465,8 +3868,8 @@ ofopgroup_create(struct ofproto *ofproto, struct ofconn *ofconn,
 static void
 ofopgroup_submit(struct ofopgroup *group)
 {
-    if (list_is_empty(&group->ops)) {
-        ofopgroup_destroy(group);
+    if (!group->n_running) {
+        ofopgroup_complete(group);
     } else {
         list_push_back(&group->ofproto->pending, &group->ofproto_node);
         group->ofproto->n_pending++;
@@ -3474,31 +3877,153 @@ ofopgroup_submit(struct ofopgroup *group)
 }
 
 static void
-ofopgroup_destroy(struct ofopgroup *group)
+ofopgroup_complete(struct ofopgroup *group)
 {
-    assert(list_is_empty(&group->ops));
+    struct ofproto *ofproto = group->ofproto;
+
+    struct ofconn *abbrev_ofconn;
+    ovs_be32 abbrev_xid;
+
+    struct ofoperation *op, *next_op;
+    int error;
+
+    assert(!group->n_running);
+
+    error = 0;
+    LIST_FOR_EACH (op, group_node, &group->ops) {
+        if (op->error) {
+            error = op->error;
+            break;
+        }
+    }
+
+    if (!error && group->ofconn && group->buffer_id != UINT32_MAX) {
+        LIST_FOR_EACH (op, group_node, &group->ops) {
+            if (op->type != OFOPERATION_DELETE) {
+                struct ofpbuf *packet;
+                uint16_t in_port;
+
+                error = ofconn_pktbuf_retrieve(group->ofconn, group->buffer_id,
+                                               &packet, &in_port);
+                if (packet) {
+                    assert(!error);
+                    error = rule_execute(op->rule, in_port, packet);
+                }
+                break;
+            }
+        }
+    }
+
+    if (!error && !list_is_empty(&group->ofconn_node)) {
+        abbrev_ofconn = group->ofconn;
+        abbrev_xid = group->request->xid;
+    } else {
+        abbrev_ofconn = NULL;
+        abbrev_xid = htonl(0);
+    }
+    LIST_FOR_EACH_SAFE (op, next_op, group_node, &group->ops) {
+        struct rule *rule = op->rule;
+
+        if (!op->error && !ofproto_rule_is_hidden(rule)) {
+            /* Check that we can just cast from ofoperation_type to
+             * nx_flow_update_event. */
+            BUILD_ASSERT_DECL((enum nx_flow_update_event) OFOPERATION_ADD
+                              == NXFME_ADDED);
+            BUILD_ASSERT_DECL((enum nx_flow_update_event) OFOPERATION_DELETE
+                              == NXFME_DELETED);
+            BUILD_ASSERT_DECL((enum nx_flow_update_event) OFOPERATION_MODIFY
+                              == NXFME_MODIFIED);
+
+            ofmonitor_report(ofproto->connmgr, rule,
+                             (enum nx_flow_update_event) op->type,
+                             op->reason, abbrev_ofconn, abbrev_xid);
+        }
+
+        rule->pending = NULL;
+
+        switch (op->type) {
+        case OFOPERATION_ADD:
+            if (!op->error) {
+                ofproto_rule_destroy__(op->victim);
+                if ((rule->cr.wc.vlan_tci_mask & htons(VLAN_VID_MASK))
+                    == htons(VLAN_VID_MASK)) {
+                    if (ofproto->vlan_bitmap) {
+                        uint16_t vid = vlan_tci_to_vid(rule->cr.flow.vlan_tci);
+
+                        if (!bitmap_is_set(ofproto->vlan_bitmap, vid)) {
+                            bitmap_set1(ofproto->vlan_bitmap, vid);
+                            ofproto->vlans_changed = true;
+                        }
+                    } else {
+                        ofproto->vlans_changed = true;
+                    }
+                }
+            } else {
+                oftable_substitute_rule(rule, op->victim);
+                ofproto_rule_destroy__(rule);
+            }
+            break;
+
+        case OFOPERATION_DELETE:
+            assert(!op->error);
+            ofproto_rule_destroy__(rule);
+            op->rule = NULL;
+            break;
+
+        case OFOPERATION_MODIFY:
+            if (!op->error) {
+                rule->modified = time_msec();
+            } else {
+                rule->flow_cookie = op->flow_cookie;
+                if (op->ofpacts) {
+                    free(rule->ofpacts);
+                    rule->ofpacts = op->ofpacts;
+                    rule->ofpacts_len = op->ofpacts_len;
+                    op->ofpacts = NULL;
+                    op->ofpacts_len = 0;
+                }
+            }
+            break;
+
+        default:
+            NOT_REACHED();
+        }
+
+        ofoperation_destroy(op);
+    }
+
+    ofmonitor_flush(ofproto->connmgr);
+
     if (!list_is_empty(&group->ofproto_node)) {
-        assert(group->ofproto->n_pending > 0);
-        group->ofproto->n_pending--;
+        assert(ofproto->n_pending > 0);
+        ofproto->n_pending--;
         list_remove(&group->ofproto_node);
     }
     if (!list_is_empty(&group->ofconn_node)) {
         list_remove(&group->ofconn_node);
-        if (group->error) {
-            ofconn_send_error(group->ofconn, group->request, group->error);
+        if (error) {
+            ofconn_send_error(group->ofconn, group->request, error);
         }
-        connmgr_retry(group->ofproto->connmgr);
+        connmgr_retry(ofproto->connmgr);
     }
     free(group->request);
     free(group);
 }
 
 /* Initiates a new operation on 'rule', of the specified 'type', within
- * 'group'.  Prior to calling, 'rule' must not have any pending operation. */
-static void
+ * 'group'.  Prior to calling, 'rule' must not have any pending operation.
+ *
+ * For a 'type' of OFOPERATION_DELETE, 'reason' should specify the reason that
+ * the flow is being deleted.  For other 'type's, 'reason' is ignored (use 0).
+ *
+ * Returns the newly created ofoperation (which is also available as
+ * rule->pending). */
+static struct ofoperation *
 ofoperation_create(struct ofopgroup *group, struct rule *rule,
-                   enum ofoperation_type type)
+                   enum ofoperation_type type,
+                   enum ofp_flow_removed_reason reason)
 {
+    struct ofproto *ofproto = group->ofproto;
     struct ofoperation *op;
 
     assert(!rule->pending);
@@ -3508,12 +4033,17 @@ ofoperation_create(struct ofopgroup *group, struct rule *rule,
     list_push_back(&group->ops, &op->group_node);
     op->rule = rule;
     op->type = type;
+    op->reason = reason;
     op->flow_cookie = rule->flow_cookie;
 
+    group->n_running++;
+
     if (type == OFOPERATION_DELETE) {
-        hmap_insert(&op->group->ofproto->deletions, &op->hmap_node,
+        hmap_insert(&ofproto->deletions, &op->hmap_node,
                     cls_rule_hash(&rule->cr, rule->table_id));
     }
+
+    return op;
 }
 
 static void
@@ -3528,12 +4058,8 @@ ofoperation_destroy(struct ofoperation *op)
         hmap_remove(&group->ofproto->deletions, &op->hmap_node);
     }
     list_remove(&op->group_node);
-    free(op->actions);
+    free(op->ofpacts);
     free(op);
-
-    if (list_is_empty(&group->ops) && !list_is_empty(&group->ofproto_node)) {
-        ofopgroup_destroy(group);
-    }
 }
 
 /* Indicates that 'op' completed with status 'error', which is either 0 to
@@ -3569,76 +4095,15 @@ void
 ofoperation_complete(struct ofoperation *op, enum ofperr error)
 {
     struct ofopgroup *group = op->group;
-    struct rule *rule = op->rule;
-    struct ofproto *ofproto = rule->ofproto;
 
-    assert(rule->pending == op);
-
-    if (!error
-        && !group->error
-        && op->type != OFOPERATION_DELETE
-        && group->ofconn
-        && group->buffer_id != UINT32_MAX
-        && list_is_singleton(&op->group_node)) {
-        struct ofpbuf *packet;
-        uint16_t in_port;
-
-        error = ofconn_pktbuf_retrieve(group->ofconn, group->buffer_id,
-                                       &packet, &in_port);
-        if (packet) {
-            assert(!error);
-            error = rule_execute(rule, in_port, packet);
-        }
-    }
-    if (!group->error) {
-        group->error = error;
-    }
-
-    switch (op->type) {
-    case OFOPERATION_ADD:
-        if (!error) {
-            ofproto_rule_destroy__(op->victim);
-            if ((rule->cr.wc.vlan_tci_mask & htons(VLAN_VID_MASK))
-                == htons(VLAN_VID_MASK)) {
-                if (ofproto->vlan_bitmap) {
-                    uint16_t vid = vlan_tci_to_vid(rule->cr.flow.vlan_tci);
+    assert(op->rule->pending == op);
+    assert(group->n_running > 0);
+    assert(!error || op->type != OFOPERATION_DELETE);
 
-                    if (!bitmap_is_set(ofproto->vlan_bitmap, vid)) {
-                        bitmap_set1(ofproto->vlan_bitmap, vid);
-                        ofproto->vlans_changed = true;
-                    }
-                } else {
-                    ofproto->vlans_changed = true;
-                }
-            }
-        } else {
-            oftable_substitute_rule(rule, op->victim);
-            ofproto_rule_destroy__(rule);
-            op->rule = NULL;
-        }
-        break;
-
-    case OFOPERATION_DELETE:
-        assert(!error);
-        ofproto_rule_destroy__(rule);
-        op->rule = NULL;
-        break;
-
-    case OFOPERATION_MODIFY:
-        if (!error) {
-            rule->modified = time_msec();
-        } else {
-            free(rule->actions);
-            rule->actions = op->actions;
-            rule->n_actions = op->n_actions;
-            op->actions = NULL;
-        }
-        break;
-
-    default:
-        NOT_REACHED();
+    op->error = error;
+    if (!--group->n_running && !list_is_empty(&group->ofproto_node)) {
+        ofopgroup_complete(group);
     }
-    ofoperation_destroy(op);
 }
 
 struct rule *
@@ -3662,8 +4127,9 @@ pick_datapath_id(const struct ofproto *ofproto)
         if (!error) {
             return eth_addr_to_uint64(ea);
         }
-        VLOG_WARN("could not get MAC address for %s (%s)",
-                  netdev_get_name(port->netdev), strerror(error));
+        VLOG_WARN("%s: could not get MAC address for %s (%s)",
+                  ofproto->name, netdev_get_name(port->netdev),
+                  strerror(error));
     }
     return ofproto->fallback_dpid;
 }
@@ -3739,7 +4205,8 @@ ofproto_evict(struct ofproto *ofproto)
                 break;
             }
 
-            ofoperation_create(group, rule, OFOPERATION_DELETE);
+            ofoperation_create(group, rule,
+                               OFOPERATION_DELETE, OFPRR_EVICTION);
             oftable_remove_rule(rule);
             ofproto->ofproto_class->rule_destruct(rule);
         }