cfm: Expose remote CFM opstate in the database.
[sliver-openvswitch.git] / ofproto / ofproto.c
index a710b3d..5c9ab9d 100644 (file)
@@ -34,6 +34,7 @@
 #include "nx-match.h"
 #include "ofp-actions.h"
 #include "ofp-errors.h"
+#include "ofp-msgs.h"
 #include "ofp-print.h"
 #include "ofp-util.h"
 #include "ofpbuf.h"
@@ -87,6 +88,7 @@ struct ofopgroup {
     struct ofproto *ofproto;    /* Owning ofproto. */
     struct list ofproto_node;   /* In ofproto's "pending" list. */
     struct list ops;            /* List of "struct ofoperation"s. */
+    int n_running;              /* Number of ops still pending. */
 
     /* Data needed to send OpenFlow reply on failure or to send a buffered
      * packet on success.
@@ -101,7 +103,6 @@ struct ofopgroup {
     struct ofconn *ofconn;      /* ofconn for reply (but see note above). */
     struct ofp_header *request; /* Original request (truncated at 64 bytes). */
     uint32_t buffer_id;         /* Buffer id from original request. */
-    int error;                  /* 0 if no error yet, otherwise error code. */
 };
 
 static struct ofopgroup *ofopgroup_create_unattached(struct ofproto *);
@@ -109,7 +110,7 @@ static struct ofopgroup *ofopgroup_create(struct ofproto *, struct ofconn *,
                                           const struct ofp_header *,
                                           uint32_t buffer_id);
 static void ofopgroup_submit(struct ofopgroup *);
-static void ofopgroup_destroy(struct ofopgroup *);
+static void ofopgroup_complete(struct ofopgroup *);
 
 /* A single flow table operation. */
 struct ofoperation {
@@ -118,15 +119,25 @@ struct ofoperation {
     struct hmap_node hmap_node; /* In ofproto's "deletions" hmap. */
     struct rule *rule;          /* Rule being operated upon. */
     enum ofoperation_type type; /* Type of operation. */
-    struct rule *victim;        /* OFOPERATION_ADD: Replaced rule. */
-    struct ofpact *ofpacts;     /* OFOPERATION_MODIFY: Replaced actions. */
-    size_t ofpacts_len;         /* OFOPERATION_MODIFY: Bytes of ofpacts. */
+
+    /* OFOPERATION_ADD. */
+    struct rule *victim;        /* Rule being replaced, if any.. */
+
+    /* OFOPERATION_MODIFY: The old actions, if the actions are changing. */
+    struct ofpact *ofpacts;
+    size_t ofpacts_len;
+
+    /* OFOPERATION_DELETE. */
+    enum ofp_flow_removed_reason reason; /* Reason flow was removed. */
+
     ovs_be64 flow_cookie;       /* Rule's old flow cookie. */
+    enum ofperr error;          /* 0 if no error. */
 };
 
 static struct ofoperation *ofoperation_create(struct ofopgroup *,
                                               struct rule *,
-                                              enum ofoperation_type);
+                                              enum ofoperation_type,
+                                              enum ofp_flow_removed_reason);
 static void ofoperation_destroy(struct ofoperation *);
 
 /* oftable. */
@@ -182,7 +193,6 @@ static void reinit_ports(struct ofproto *);
 static void ofproto_rule_destroy__(struct rule *);
 static void ofproto_rule_send_removed(struct rule *, uint8_t reason);
 static bool rule_is_modifiable(const struct rule *);
-static bool rule_is_hidden(const struct rule *);
 
 /* OpenFlow. */
 static enum ofperr add_flow(struct ofproto *, struct ofconn *,
@@ -946,7 +956,8 @@ ofproto_flush__(struct ofproto *ofproto)
         cls_cursor_init(&cursor, &table->cls, NULL);
         CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
             if (!rule->pending) {
-                ofoperation_create(group, rule, OFOPERATION_DELETE);
+                ofoperation_create(group, rule, OFOPERATION_DELETE,
+                                   OFPRR_DELETE);
                 oftable_remove_rule(rule);
                 ofproto->ofproto_class->rule_destruct(rule);
             }
@@ -1439,7 +1450,7 @@ ofproto_delete_flow(struct ofproto *ofproto, const struct cls_rule *target)
     } else {
         /* Initiate deletion -> success. */
         struct ofopgroup *group = ofopgroup_create_unattached(ofproto);
-        ofoperation_create(group, rule, OFOPERATION_DELETE);
+        ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
         oftable_remove_rule(rule);
         ofproto->ofproto_class->rule_destruct(rule);
         ofopgroup_submit(group);
@@ -1888,13 +1899,36 @@ ofproto_rule_destroy(struct rule *rule)
 
 /* Returns true if 'rule' has an OpenFlow OFPAT_OUTPUT or OFPAT_ENQUEUE action
  * that outputs to 'port' (output to OFPP_FLOOD and OFPP_ALL doesn't count). */
-static bool
-rule_has_out_port(const struct rule *rule, uint16_t port)
+bool
+ofproto_rule_has_out_port(const struct rule *rule, uint16_t port)
 {
     return (port == OFPP_NONE
             || ofpacts_output_to_port(rule->ofpacts, rule->ofpacts_len, port));
 }
 
+/* Returns true if a rule related to 'op' has an OpenFlow OFPAT_OUTPUT or
+ * OFPAT_ENQUEUE action that outputs to 'out_port'. */
+bool
+ofoperation_has_out_port(const struct ofoperation *op, uint16_t out_port)
+{
+    if (ofproto_rule_has_out_port(op->rule, out_port)) {
+        return true;
+    }
+
+    switch (op->type) {
+    case OFOPERATION_ADD:
+        return op->victim && ofproto_rule_has_out_port(op->victim, out_port);
+
+    case OFOPERATION_DELETE:
+        return false;
+
+    case OFOPERATION_MODIFY:
+        return ofpacts_output_to_port(op->ofpacts, op->ofpacts_len, out_port);
+    }
+
+    NOT_REACHED();
+}
+
 /* Executes the actions indicated by 'rule' on 'packet' and credits 'rule''s
  * statistics appropriately.  'packet' must have at least sizeof(struct
  * ofp_packet_in) bytes of headroom.
@@ -1919,8 +1953,8 @@ rule_execute(struct rule *rule, uint16_t in_port, struct ofpbuf *packet)
  * Rules with priority higher than UINT16_MAX are set up by ofproto itself
  * (e.g. by in-band control) and are intentionally hidden from the
  * controller. */
-static bool
-rule_is_hidden(const struct rule *rule)
+bool
+ofproto_rule_is_hidden(const struct rule *rule)
 {
     return rule->cr.priority > UINT16_MAX;
 }
@@ -1985,7 +2019,8 @@ handle_get_config_request(struct ofconn *ofconn, const struct ofp_header *oh)
     struct ofpbuf *buf;
 
     /* Send reply. */
-    osc = make_openflow_xid(sizeof *osc, OFPT_GET_CONFIG_REPLY, oh->xid, &buf);
+    buf = ofpraw_alloc_reply(OFPRAW_OFPT_GET_CONFIG_REPLY, oh, 0);
+    osc = ofpbuf_put_uninit(buf, sizeof *osc);
     flags = ofproto->frag_handling;
     if (ofconn_get_invalid_ttl_to_controller(ofconn)) {
         flags |= OFPC_INVALID_TTL_TO_CONTROLLER;
@@ -1998,8 +2033,9 @@ handle_get_config_request(struct ofconn *ofconn, const struct ofp_header *oh)
 }
 
 static enum ofperr
-handle_set_config(struct ofconn *ofconn, const struct ofp_switch_config *osc)
+handle_set_config(struct ofconn *ofconn, const struct ofp_header *oh)
 {
+    const struct ofp_switch_config *osc = ofpmsg_body(oh);
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     uint16_t flags = ntohs(osc->flags);
 
@@ -2044,7 +2080,7 @@ reject_slave_controller(struct ofconn *ofconn)
 }
 
 static enum ofperr
-handle_packet_out(struct ofconn *ofconn, const struct ofp_packet_out *opo)
+handle_packet_out(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
     struct ofputil_packet_out po;
@@ -2063,7 +2099,7 @@ handle_packet_out(struct ofconn *ofconn, const struct ofp_packet_out *opo)
 
     /* Decode message. */
     ofpbuf_use_stub(&ofpacts, ofpacts_stub, sizeof ofpacts_stub);
-    error = ofputil_decode_packet_out(&po, opo, &ofpacts);
+    error = ofputil_decode_packet_out(&po, oh, &ofpacts);
     if (error) {
         goto exit_free_ofpacts;
     }
@@ -2149,13 +2185,14 @@ handle_port_mod(struct ofconn *ofconn, const struct ofp_header *oh)
 
 static enum ofperr
 handle_desc_stats_request(struct ofconn *ofconn,
-                          const struct ofp_stats_msg *request)
+                          const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
     struct ofp_desc_stats *ods;
     struct ofpbuf *msg;
 
-    ods = ofputil_make_stats_reply(sizeof *ods, request, &msg);
+    msg = ofpraw_alloc_stats_reply(request, 0);
+    ods = ofpbuf_put_zeros(msg, sizeof *ods);
     ovs_strlcpy(ods->mfr_desc, p->mfr_desc, sizeof ods->mfr_desc);
     ovs_strlcpy(ods->hw_desc, p->hw_desc, sizeof ods->hw_desc);
     ovs_strlcpy(ods->sw_desc, p->sw_desc, sizeof ods->sw_desc);
@@ -2168,15 +2205,14 @@ handle_desc_stats_request(struct ofconn *ofconn,
 
 static enum ofperr
 handle_table_stats_request(struct ofconn *ofconn,
-                           const struct ofp_stats_msg *request)
+                           const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
-    struct ofp_table_stats *ots;
+    struct ofp10_table_stats *ots;
     struct ofpbuf *msg;
     size_t i;
 
-    ofputil_make_stats_reply(sizeof(struct ofp_stats_msg), request, &msg);
-
+    msg = ofpraw_alloc_stats_reply(request, sizeof *ots * p->n_tables);
     ots = ofpbuf_put_zeros(msg, sizeof *ots * p->n_tables);
     for (i = 0; i < p->n_tables; i++) {
         ots[i].table_id = i;
@@ -2208,14 +2244,14 @@ static void
 append_port_stat(struct ofport *port, struct list *replies)
 {
     struct netdev_stats stats;
-    struct ofp_port_stats *ops;
+    struct ofp10_port_stats *ops;
 
     /* Intentionally ignore return value, since errors will set
      * 'stats' to all-1s, which is correct for OpenFlow, and
      * netdev_get_stats() will log errors. */
     ofproto_port_get_stats(port, &stats);
 
-    ops = ofputil_append_stats_reply(sizeof *ops, replies);
+    ops = ofpmp_append(replies, sizeof *ops);
     ops->port_no = htons(port->pp.port_no);
     memset(ops->pad, 0, sizeof ops->pad);
     put_32aligned_be64(&ops->rx_packets, htonll(stats.rx_packets));
@@ -2234,13 +2270,14 @@ append_port_stat(struct ofport *port, struct list *replies)
 
 static enum ofperr
 handle_port_stats_request(struct ofconn *ofconn,
-                          const struct ofp_port_stats_request *psr)
+                          const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
+    const struct ofp10_port_stats_request *psr = ofpmsg_body(request);
     struct ofport *port;
     struct list replies;
 
-    ofputil_start_stats_reply(&psr->osm, &replies);
+    ofpmp_init(&replies, request);
     if (psr->port_no != htons(OFPP_NONE)) {
         port = ofproto_get_port(p, ntohs(psr->port_no));
         if (port) {
@@ -2258,17 +2295,18 @@ handle_port_stats_request(struct ofconn *ofconn,
 
 static enum ofperr
 handle_port_desc_stats_request(struct ofconn *ofconn,
-                               const struct ofp_stats_msg *osm)
+                               const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
+    enum ofp_version version;
     struct ofport *port;
     struct list replies;
 
-    ofputil_start_stats_reply(osm, &replies);
+    ofpmp_init(&replies, request);
 
+    version = ofputil_protocol_to_ofp_version(ofconn_get_protocol(ofconn));
     HMAP_FOR_EACH (port, hmap_node, &p->ports) {
-        ofputil_append_port_desc_stats_reply(ofconn_get_protocol(ofconn),
-                                             &port->pp, &replies);
+        ofputil_append_port_desc_stats_reply(version, &port->pp, &replies);
     }
 
     ofconn_send_replies(ofconn, &replies);
@@ -2387,7 +2425,8 @@ collect_rules_loose(struct ofproto *ofproto, uint8_t table_id,
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
-            if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+            if (!ofproto_rule_is_hidden(rule)
+                && ofproto_rule_has_out_port(rule, out_port)
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -2431,7 +2470,8 @@ collect_rules_strict(struct ofproto *ofproto, uint8_t table_id,
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
-            if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+            if (!ofproto_rule_is_hidden(rule)
+                && ofproto_rule_has_out_port(rule, out_port)
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -2452,7 +2492,7 @@ age_secs(long long int age_ms)
 
 static enum ofperr
 handle_flow_stats_request(struct ofconn *ofconn,
-                          const struct ofp_stats_msg *osm)
+                          const struct ofp_header *request)
 {
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     struct ofputil_flow_stats_request fsr;
@@ -2461,7 +2501,7 @@ handle_flow_stats_request(struct ofconn *ofconn,
     struct rule *rule;
     enum ofperr error;
 
-    error = ofputil_decode_flow_stats_request(&fsr, &osm->header);
+    error = ofputil_decode_flow_stats_request(&fsr, request);
     if (error) {
         return error;
     }
@@ -2473,7 +2513,7 @@ handle_flow_stats_request(struct ofconn *ofconn,
         return error;
     }
 
-    ofputil_start_stats_reply(osm, &replies);
+    ofpmp_init(&replies, request);
     LIST_FOR_EACH (rule, ofproto_node, &rules) {
         long long int now = time_msec();
         struct ofputil_flow_stats fs;
@@ -2554,7 +2594,7 @@ ofproto_get_netflow_ids(const struct ofproto *ofproto,
 /* Checks the fault status of CFM for 'ofp_port' within 'ofproto'.  Returns a
  * bitmask of 'cfm_fault_reason's to indicate a CFM fault (generally
  * indicating a connectivity problem).  Returns zero if CFM is not faulted,
- * and -1 if CFM is not enabled on 'port'. */
+ * and -1 if CFM is not enabled on 'ofp_port'. */
 int
 ofproto_port_get_cfm_fault(const struct ofproto *ofproto, uint16_t ofp_port)
 {
@@ -2564,6 +2604,19 @@ ofproto_port_get_cfm_fault(const struct ofproto *ofproto, uint16_t ofp_port)
             : -1);
 }
 
+/* Checks the operational status reported by the remote CFM endpoint of
+ * 'ofp_port'  Returns 1 if operationally up, 0 if operationally down, and -1
+ * if CFM is not enabled on 'ofp_port' or does not support operational status.
+ */
+int
+ofproto_port_get_cfm_opup(const struct ofproto *ofproto, uint16_t ofp_port)
+{
+    struct ofport *ofport = ofproto_get_port(ofproto, ofp_port);
+    return (ofport && ofproto->ofproto_class->get_cfm_opup
+            ? ofproto->ofproto_class->get_cfm_opup(ofport)
+            : -1);
+}
+
 /* Gets the MPIDs of the remote maintenance points broadcasting to 'ofp_port'
  * within 'ofproto'.  Populates 'rmps' with an array of MPIDs owned by
  * 'ofproto', and 'n_rmps' with the number of MPIDs in 'rmps'.  Returns a
@@ -2598,7 +2651,7 @@ ofproto_port_get_cfm_health(const struct ofproto *ofproto, uint16_t ofp_port)
 
 static enum ofperr
 handle_aggregate_stats_request(struct ofconn *ofconn,
-                               const struct ofp_stats_msg *osm)
+                               const struct ofp_header *oh)
 {
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     struct ofputil_flow_stats_request request;
@@ -2609,7 +2662,7 @@ handle_aggregate_stats_request(struct ofconn *ofconn,
     struct rule *rule;
     enum ofperr error;
 
-    error = ofputil_decode_flow_stats_request(&request, &osm->header);
+    error = ofputil_decode_flow_stats_request(&request, oh);
     if (error) {
         return error;
     }
@@ -2651,7 +2704,7 @@ handle_aggregate_stats_request(struct ofconn *ofconn,
         stats.byte_count = UINT64_MAX;
     }
 
-    reply = ofputil_encode_aggregate_stats_reply(&stats, osm);
+    reply = ofputil_encode_aggregate_stats_reply(&stats, oh);
     ofconn_send_reply(ofconn, reply);
 
     return 0;
@@ -2666,9 +2719,9 @@ static void
 put_queue_stats(struct queue_stats_cbdata *cbdata, uint32_t queue_id,
                 const struct netdev_queue_stats *stats)
 {
-    struct ofp_queue_stats *reply;
+    struct ofp10_queue_stats *reply;
 
-    reply = ofputil_append_stats_reply(sizeof *reply, &cbdata->replies);
+    reply = ofpmp_append(&cbdata->replies, sizeof *reply);
     reply->port_no = htons(cbdata->ofport->pp.port_no);
     memset(reply->pad, 0, sizeof reply->pad);
     reply->queue_id = htonl(queue_id);
@@ -2709,9 +2762,10 @@ handle_queue_stats_for_port(struct ofport *port, uint32_t queue_id,
 
 static enum ofperr
 handle_queue_stats_request(struct ofconn *ofconn,
-                           const struct ofp_queue_stats_request *qsr)
+                           const struct ofp_header *rq)
 {
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
+    const struct ofp10_queue_stats_request *qsr = ofpmsg_body(rq);
     struct queue_stats_cbdata cbdata;
     unsigned int port_no;
     struct ofport *port;
@@ -2720,7 +2774,7 @@ handle_queue_stats_request(struct ofconn *ofconn,
 
     COVERAGE_INC(ofproto_queue_req);
 
-    ofputil_start_stats_reply(&qsr->osm, &cbdata.replies);
+    ofpmp_init(&cbdata.replies, rq);
 
     port_no = ntohs(qsr->port_no);
     queue_id = ntohl(qsr->queue_id);
@@ -2849,6 +2903,9 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
     rule->ofpacts_len = fm->ofpacts_len;
     rule->evictable = true;
     rule->eviction_group = NULL;
+    rule->monitor_flags = 0;
+    rule->add_seqno = 0;
+    rule->modify_seqno = 0;
 
     /* Insert new rule. */
     victim = oftable_replace_rule(rule);
@@ -2880,11 +2937,12 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
         }
 
         group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id);
-        op = ofoperation_create(group, rule, OFOPERATION_ADD);
+        op = ofoperation_create(group, rule, OFOPERATION_ADD, 0);
         op->victim = victim;
 
         error = ofproto->ofproto_class->rule_construct(rule);
         if (error) {
+            op->group->n_running--;
             ofoperation_destroy(rule->pending);
         } else if (evict) {
             delete_flow__(evict, group);
@@ -2922,6 +2980,10 @@ modify_flows__(struct ofproto *ofproto, struct ofconn *ofconn,
     group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id);
     error = OFPERR_OFPBRC_EPERM;
     LIST_FOR_EACH (rule, ofproto_node, rules) {
+        struct ofoperation *op;
+        bool actions_changed;
+        ovs_be64 new_cookie;
+
         if (rule_is_modifiable(rule)) {
             /* At least one rule is modifiable, don't report EPERM error. */
             error = 0;
@@ -2929,21 +2991,26 @@ modify_flows__(struct ofproto *ofproto, struct ofconn *ofconn,
             continue;
         }
 
-        if (!ofpacts_equal(fm->ofpacts, fm->ofpacts_len,
-                           rule->ofpacts, rule->ofpacts_len)) {
-            struct ofoperation *op;
+        actions_changed = !ofpacts_equal(fm->ofpacts, fm->ofpacts_len,
+                                         rule->ofpacts, rule->ofpacts_len);
+        new_cookie = (fm->new_cookie != htonll(UINT64_MAX)
+                      ? fm->new_cookie
+                      : rule->flow_cookie);
+        if (!actions_changed && new_cookie == rule->flow_cookie) {
+            /* No change at all. */
+            continue;
+        }
 
-            op = ofoperation_create(group, rule, OFOPERATION_MODIFY);
+        op = ofoperation_create(group, rule, OFOPERATION_MODIFY, 0);
+        rule->flow_cookie = new_cookie;
+        if (actions_changed) {
             op->ofpacts = rule->ofpacts;
             op->ofpacts_len = rule->ofpacts_len;
             rule->ofpacts = xmemdup(fm->ofpacts, fm->ofpacts_len);
             rule->ofpacts_len = fm->ofpacts_len;
             rule->ofproto->ofproto_class->rule_modify_actions(rule);
         } else {
-            rule->modified = time_msec();
-        }
-        if (fm->new_cookie != htonll(UINT64_MAX)) {
-            rule->flow_cookie = fm->new_cookie;
+            ofoperation_complete(op, 0);
         }
     }
     ofopgroup_submit(group);
@@ -3013,7 +3080,7 @@ delete_flow__(struct rule *rule, struct ofopgroup *group)
 
     ofproto_rule_send_removed(rule, OFPRR_DELETE);
 
-    ofoperation_create(group, rule, OFOPERATION_DELETE);
+    ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
 }
@@ -3078,7 +3145,7 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason)
 {
     struct ofputil_flow_removed fr;
 
-    if (rule_is_hidden(rule) || !rule->send_flow_removed) {
+    if (ofproto_rule_is_hidden(rule) || !rule->send_flow_removed) {
         return;
     }
 
@@ -3088,6 +3155,7 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason)
     calc_flow_duration__(rule->created, time_msec(),
                          &fr.duration_sec, &fr.duration_nsec);
     fr.idle_timeout = rule->idle_timeout;
+    fr.hard_timeout = rule->hard_timeout;
     rule->ofproto->ofproto_class->rule_get_stats(rule, &fr.packet_count,
                                                  &fr.byte_count);
 
@@ -3112,6 +3180,9 @@ ofproto_rule_update_used(struct rule *rule, long long int used)
  * OFPRR_HARD_TIMEOUT or OFPRR_IDLE_TIMEOUT), and then removes 'rule' from its
  * ofproto.
  *
+ * 'rule' must not have a pending operation (that is, 'rule->pending' must be
+ * NULL).
+ *
  * ofproto implementation ->run() functions should use this function to expire
  * OpenFlow flows. */
 void
@@ -3125,7 +3196,7 @@ ofproto_rule_expire(struct rule *rule, uint8_t reason)
     ofproto_rule_send_removed(rule, reason);
 
     group = ofopgroup_create_unattached(ofproto);
-    ofoperation_create(group, rule, OFOPERATION_DELETE);
+    ofoperation_create(group, rule, OFOPERATION_DELETE, reason);
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
     ofopgroup_submit(group);
@@ -3153,9 +3224,9 @@ handle_flow_mod(struct ofconn *ofconn, const struct ofp_header *oh)
         goto exit_free_ofpacts;
     }
 
+    if (fm.flags & OFPFF10_EMERG) {
     /* We do not support the OpenFlow 1.0 emergency flow cache, which is not
      * required in OpenFlow 1.0.1 and removed from OpenFlow 1.1. */
-    if (fm.flags & OFPFF_EMERG) {
         /* We do not support the emergency flow cache.  It will hopefully get
          * dropped from OpenFlow in the near future.  There is no good error
          * code, so just state that the flow table is full. */
@@ -3238,7 +3309,7 @@ handle_flow_mod__(struct ofproto *ofproto, struct ofconn *ofconn,
 static enum ofperr
 handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    struct nx_role_request *nrr = (struct nx_role_request *) oh;
+    const struct nx_role_request *nrr = ofpmsg_body(oh);
     struct nx_role_request *reply;
     struct ofpbuf *buf;
     uint32_t role;
@@ -3256,7 +3327,8 @@ handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
 
     ofconn_set_role(ofconn, role);
 
-    reply = make_nxmsg_xid(sizeof *reply, NXT_ROLE_REPLY, oh->xid, &buf);
+    buf = ofpraw_alloc_reply(OFPRAW_NXT_ROLE_REPLY, oh, 0);
+    reply = ofpbuf_put_zeros(buf, sizeof *reply);
     reply->role = htonl(role);
     ofconn_send_reply(ofconn, buf);
 
@@ -3267,8 +3339,7 @@ static enum ofperr
 handle_nxt_flow_mod_table_id(struct ofconn *ofconn,
                              const struct ofp_header *oh)
 {
-    const struct nx_flow_mod_table_id *msg
-        = (const struct nx_flow_mod_table_id *) oh;
+    const struct nx_flow_mod_table_id *msg = ofpmsg_body(oh);
     enum ofputil_protocol cur, next;
 
     cur = ofconn_get_protocol(ofconn);
@@ -3281,8 +3352,7 @@ handle_nxt_flow_mod_table_id(struct ofconn *ofconn,
 static enum ofperr
 handle_nxt_set_flow_format(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    const struct nx_set_flow_format *msg
-        = (const struct nx_set_flow_format *) oh;
+    const struct nx_set_flow_format *msg = ofpmsg_body(oh);
     enum ofputil_protocol cur, next;
     enum ofputil_protocol next_base;
 
@@ -3306,10 +3376,9 @@ static enum ofperr
 handle_nxt_set_packet_in_format(struct ofconn *ofconn,
                                 const struct ofp_header *oh)
 {
-    const struct nx_set_packet_in_format *msg;
+    const struct nx_set_packet_in_format *msg = ofpmsg_body(oh);
     uint32_t format;
 
-    msg = (const struct nx_set_packet_in_format *) oh;
     format = ntohl(msg->format);
     if (format != NXPIF_OPENFLOW10 && format != NXPIF_NXM) {
         return OFPERR_OFPBRC_EPERM;
@@ -3328,7 +3397,7 @@ handle_nxt_set_packet_in_format(struct ofconn *ofconn,
 static enum ofperr
 handle_nxt_set_async_config(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    const struct nx_async_config *msg = (const struct nx_async_config *) oh;
+    const struct nx_async_config *msg = ofpmsg_body(oh);
     uint32_t master[OAM_N_TYPES];
     uint32_t slave[OAM_N_TYPES];
 
@@ -3353,9 +3422,8 @@ static enum ofperr
 handle_nxt_set_controller_id(struct ofconn *ofconn,
                              const struct ofp_header *oh)
 {
-    const struct nx_controller_id *nci;
+    const struct nx_controller_id *nci = ofpmsg_body(oh);
 
-    nci = (const struct nx_controller_id *) oh;
     if (!is_all_zeros(nci->zero, sizeof nci->zero)) {
         return OFPERR_NXBRC_MUST_BE_ZERO;
     }
@@ -3373,131 +3441,375 @@ handle_barrier_request(struct ofconn *ofconn, const struct ofp_header *oh)
         return OFPROTO_POSTPONE;
     }
 
-    make_openflow_xid(sizeof *oh, OFPT10_BARRIER_REPLY, oh->xid, &buf);
+    buf = ofpraw_alloc_reply((oh->version == OFP10_VERSION
+                              ? OFPRAW_OFPT10_BARRIER_REPLY
+                              : OFPRAW_OFPT11_BARRIER_REPLY), oh, 0);
     ofconn_send_reply(ofconn, buf);
     return 0;
 }
 
+static void
+ofproto_compose_flow_refresh_update(const struct rule *rule,
+                                    enum nx_flow_monitor_flags flags,
+                                    struct list *msgs)
+{
+    struct ofoperation *op = rule->pending;
+    struct ofputil_flow_update fu;
+
+    if (op && op->type == OFOPERATION_ADD && !op->victim) {
+        /* We'll report the final flow when the operation completes.  Reporting
+         * it now would cause a duplicate report later. */
+        return;
+    }
+
+    fu.event = (flags & (NXFMF_INITIAL | NXFMF_ADD)
+                ? NXFME_ADDED : NXFME_MODIFIED);
+    fu.reason = 0;
+    fu.idle_timeout = rule->idle_timeout;
+    fu.hard_timeout = rule->hard_timeout;
+    fu.table_id = rule->table_id;
+    fu.cookie = rule->flow_cookie;
+    fu.match = CONST_CAST(struct cls_rule *, &rule->cr);
+    if (!(flags & NXFMF_ACTIONS)) {
+        fu.ofpacts = NULL;
+        fu.ofpacts_len = 0;
+    } else if (!op) {
+        fu.ofpacts = rule->ofpacts;
+        fu.ofpacts_len = rule->ofpacts_len;
+    } else {
+        /* An operation is in progress.  Use the previous version of the flow's
+         * actions, so that when the operation commits we report the change. */
+        switch (op->type) {
+        case OFOPERATION_ADD:
+            /* We already verified that there was a victim. */
+            fu.ofpacts = op->victim->ofpacts;
+            fu.ofpacts_len = op->victim->ofpacts_len;
+            break;
+
+        case OFOPERATION_MODIFY:
+            if (op->ofpacts) {
+                fu.ofpacts = op->ofpacts;
+                fu.ofpacts_len = op->ofpacts_len;
+            } else {
+                fu.ofpacts = rule->ofpacts;
+                fu.ofpacts_len = rule->ofpacts_len;
+            }
+            break;
+
+        case OFOPERATION_DELETE:
+            fu.ofpacts = rule->ofpacts;
+            fu.ofpacts_len = rule->ofpacts_len;
+            break;
+
+        default:
+            NOT_REACHED();
+        }
+    }
+
+    if (list_is_empty(msgs)) {
+        ofputil_start_flow_update(msgs);
+    }
+    ofputil_append_flow_update(&fu, msgs);
+}
+
+void
+ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs)
+{
+    struct rule *rule;
+
+    LIST_FOR_EACH (rule, ofproto_node, rules) {
+        enum nx_flow_monitor_flags flags = rule->monitor_flags;
+        rule->monitor_flags = 0;
+
+        ofproto_compose_flow_refresh_update(rule, flags, msgs);
+    }
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rule(const struct ofmonitor *m,
+                                       struct rule *rule, uint64_t seqno,
+                                       struct list *rules)
+{
+    enum nx_flow_monitor_flags update;
+
+    if (ofproto_rule_is_hidden(rule)) {
+        return;
+    }
+
+    if (!(rule->pending
+          ? ofoperation_has_out_port(rule->pending, m->out_port)
+          : ofproto_rule_has_out_port(rule, m->out_port))) {
+        return;
+    }
+
+    if (seqno) {
+        if (rule->add_seqno > seqno) {
+            update = NXFMF_ADD | NXFMF_MODIFY;
+        } else if (rule->modify_seqno > seqno) {
+            update = NXFMF_MODIFY;
+        } else {
+            return;
+        }
+
+        if (!(m->flags & update)) {
+            return;
+        }
+    } else {
+        update = NXFMF_INITIAL;
+    }
+
+    if (!rule->monitor_flags) {
+        list_push_back(rules, &rule->ofproto_node);
+    }
+    rule->monitor_flags |= update | (m->flags & NXFMF_ACTIONS);
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rules(const struct ofmonitor *m,
+                                        uint64_t seqno,
+                                        struct list *rules)
+{
+    const struct ofproto *ofproto = ofconn_get_ofproto(m->ofconn);
+    const struct ofoperation *op;
+    const struct oftable *table;
+
+    FOR_EACH_MATCHING_TABLE (table, m->table_id, ofproto) {
+        struct cls_cursor cursor;
+        struct rule *rule;
+
+        cls_cursor_init(&cursor, &table->cls, &m->match);
+        CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+            assert(!rule->pending); /* XXX */
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+
+    HMAP_FOR_EACH (op, hmap_node, &ofproto->deletions) {
+        struct rule *rule = op->rule;
+
+        if (((m->table_id == 0xff
+              ? !(ofproto->tables[rule->table_id].flags & OFTABLE_HIDDEN)
+              : m->table_id == rule->table_id))
+            && cls_rule_is_loose_match(&rule->cr, &m->match)) {
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+}
+
+static void
+ofproto_collect_ofmonitor_initial_rules(struct ofmonitor *m,
+                                        struct list *rules)
+{
+    if (m->flags & NXFMF_INITIAL) {
+        ofproto_collect_ofmonitor_refresh_rules(m, 0, rules);
+    }
+}
+
+void
+ofmonitor_collect_resume_rules(struct ofmonitor *m,
+                               uint64_t seqno, struct list *rules)
+{
+    ofproto_collect_ofmonitor_refresh_rules(m, seqno, rules);
+}
+
+static enum ofperr
+handle_flow_monitor_request(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
+    struct ofmonitor **monitors;
+    size_t n_monitors, allocated_monitors;
+    struct list replies;
+    enum ofperr error;
+    struct list rules;
+    struct ofpbuf b;
+    size_t i;
+
+    error = 0;
+    ofpbuf_use_const(&b, oh, ntohs(oh->length));
+    monitors = NULL;
+    n_monitors = allocated_monitors = 0;
+    for (;;) {
+        struct ofputil_flow_monitor_request request;
+        struct ofmonitor *m;
+        int retval;
+
+        retval = ofputil_decode_flow_monitor_request(&request, &b);
+        if (retval == EOF) {
+            break;
+        } else if (retval) {
+            error = retval;
+            goto error;
+        }
+
+        if (request.table_id != 0xff
+            && request.table_id >= ofproto->n_tables) {
+            error = OFPERR_OFPBRC_BAD_TABLE_ID;
+            goto error;
+        }
+
+        error = ofmonitor_create(&request, ofconn, &m);
+        if (error) {
+            goto error;
+        }
+
+        if (n_monitors >= allocated_monitors) {
+            monitors = x2nrealloc(monitors, &allocated_monitors,
+                                  sizeof *monitors);
+        }
+        monitors[n_monitors++] = m;
+    }
+
+    list_init(&rules);
+    for (i = 0; i < n_monitors; i++) {
+        ofproto_collect_ofmonitor_initial_rules(monitors[i], &rules);
+    }
+
+    ofpmp_init(&replies, oh);
+    ofmonitor_compose_refresh_updates(&rules, &replies);
+    ofconn_send_replies(ofconn, &replies);
+
+    free(monitors);
+
+    return 0;
+
+error:
+    for (i = 0; i < n_monitors; i++) {
+        ofmonitor_destroy(monitors[i]);
+    }
+    free(monitors);
+    return error;
+}
+
+static enum ofperr
+handle_flow_monitor_cancel(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+    struct ofmonitor *m;
+    uint32_t id;
+
+    id = ofputil_decode_flow_monitor_cancel(oh);
+    m = ofmonitor_lookup(ofconn, id);
+    if (!m) {
+        return OFPERR_NXBRC_FM_BAD_ID;
+    }
+
+    ofmonitor_destroy(m);
+    return 0;
+}
+
 static enum ofperr
 handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
 {
     const struct ofp_header *oh = msg->data;
-    const struct ofputil_msg_type *type;
+    enum ofptype type;
     enum ofperr error;
 
-    error = ofputil_decode_msg_type(oh, &type);
+    error = ofptype_decode(&type, oh);
     if (error) {
         return error;
     }
 
-    switch (ofputil_msg_type_code(type)) {
+    switch (type) {
         /* OpenFlow requests. */
-    case OFPUTIL_OFPT_ECHO_REQUEST:
+    case OFPTYPE_ECHO_REQUEST:
         return handle_echo_request(ofconn, oh);
 
-    case OFPUTIL_OFPT_FEATURES_REQUEST:
+    case OFPTYPE_FEATURES_REQUEST:
         return handle_features_request(ofconn, oh);
 
-    case OFPUTIL_OFPT_GET_CONFIG_REQUEST:
+    case OFPTYPE_GET_CONFIG_REQUEST:
         return handle_get_config_request(ofconn, oh);
 
-    case OFPUTIL_OFPT_SET_CONFIG:
-        return handle_set_config(ofconn, msg->data);
+    case OFPTYPE_SET_CONFIG:
+        return handle_set_config(ofconn, oh);
 
-    case OFPUTIL_OFPT_PACKET_OUT:
-        return handle_packet_out(ofconn, msg->data);
+    case OFPTYPE_PACKET_OUT:
+        return handle_packet_out(ofconn, oh);
 
-    case OFPUTIL_OFPT_PORT_MOD:
+    case OFPTYPE_PORT_MOD:
         return handle_port_mod(ofconn, oh);
 
-    case OFPUTIL_OFPT_FLOW_MOD:
+    case OFPTYPE_FLOW_MOD:
         return handle_flow_mod(ofconn, oh);
 
-    case OFPUTIL_OFPT_BARRIER_REQUEST:
+    case OFPTYPE_BARRIER_REQUEST:
         return handle_barrier_request(ofconn, oh);
 
         /* OpenFlow replies. */
-    case OFPUTIL_OFPT_ECHO_REPLY:
+    case OFPTYPE_ECHO_REPLY:
         return 0;
 
         /* Nicira extension requests. */
-    case OFPUTIL_NXT_ROLE_REQUEST:
+    case OFPTYPE_ROLE_REQUEST:
         return handle_role_request(ofconn, oh);
 
-    case OFPUTIL_NXT_FLOW_MOD_TABLE_ID:
+    case OFPTYPE_FLOW_MOD_TABLE_ID:
         return handle_nxt_flow_mod_table_id(ofconn, oh);
 
-    case OFPUTIL_NXT_SET_FLOW_FORMAT:
+    case OFPTYPE_SET_FLOW_FORMAT:
         return handle_nxt_set_flow_format(ofconn, oh);
 
-    case OFPUTIL_NXT_SET_PACKET_IN_FORMAT:
+    case OFPTYPE_SET_PACKET_IN_FORMAT:
         return handle_nxt_set_packet_in_format(ofconn, oh);
 
-    case OFPUTIL_NXT_SET_CONTROLLER_ID:
+    case OFPTYPE_SET_CONTROLLER_ID:
         return handle_nxt_set_controller_id(ofconn, oh);
 
-    case OFPUTIL_NXT_FLOW_MOD:
-        return handle_flow_mod(ofconn, oh);
-
-    case OFPUTIL_NXT_FLOW_AGE:
+    case OFPTYPE_FLOW_AGE:
         /* Nothing to do. */
         return 0;
 
-    case OFPUTIL_NXT_SET_ASYNC_CONFIG:
+    case OFPTYPE_FLOW_MONITOR_CANCEL:
+        return handle_flow_monitor_cancel(ofconn, oh);
+
+    case OFPTYPE_SET_ASYNC_CONFIG:
         return handle_nxt_set_async_config(ofconn, oh);
 
         /* Statistics requests. */
-    case OFPUTIL_OFPST_DESC_REQUEST:
-        return handle_desc_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_FLOW_REQUEST:
-    case OFPUTIL_NXST_FLOW_REQUEST:
-        return handle_flow_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_AGGREGATE_REQUEST:
-    case OFPUTIL_NXST_AGGREGATE_REQUEST:
-        return handle_aggregate_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_TABLE_REQUEST:
-        return handle_table_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_PORT_REQUEST:
-        return handle_port_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_QUEUE_REQUEST:
-        return handle_queue_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_PORT_DESC_REQUEST:
-        return handle_port_desc_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_MSG_INVALID:
-    case OFPUTIL_OFPT_HELLO:
-    case OFPUTIL_OFPT_ERROR:
-    case OFPUTIL_OFPT_FEATURES_REPLY:
-    case OFPUTIL_OFPT_GET_CONFIG_REPLY:
-    case OFPUTIL_OFPT_PACKET_IN:
-    case OFPUTIL_OFPT_FLOW_REMOVED:
-    case OFPUTIL_OFPT_PORT_STATUS:
-    case OFPUTIL_OFPT_BARRIER_REPLY:
-    case OFPUTIL_OFPT_QUEUE_GET_CONFIG_REQUEST:
-    case OFPUTIL_OFPT_QUEUE_GET_CONFIG_REPLY:
-    case OFPUTIL_OFPST_DESC_REPLY:
-    case OFPUTIL_OFPST_FLOW_REPLY:
-    case OFPUTIL_OFPST_QUEUE_REPLY:
-    case OFPUTIL_OFPST_PORT_REPLY:
-    case OFPUTIL_OFPST_TABLE_REPLY:
-    case OFPUTIL_OFPST_AGGREGATE_REPLY:
-    case OFPUTIL_OFPST_PORT_DESC_REPLY:
-    case OFPUTIL_NXT_ROLE_REPLY:
-    case OFPUTIL_NXT_FLOW_REMOVED:
-    case OFPUTIL_NXT_PACKET_IN:
-    case OFPUTIL_NXST_FLOW_REPLY:
-    case OFPUTIL_NXST_AGGREGATE_REPLY:
+    case OFPTYPE_DESC_STATS_REQUEST:
+        return handle_desc_stats_request(ofconn, oh);
+
+    case OFPTYPE_FLOW_STATS_REQUEST:
+        return handle_flow_stats_request(ofconn, oh);
+
+    case OFPTYPE_AGGREGATE_STATS_REQUEST:
+        return handle_aggregate_stats_request(ofconn, oh);
+
+    case OFPTYPE_TABLE_STATS_REQUEST:
+        return handle_table_stats_request(ofconn, oh);
+
+    case OFPTYPE_PORT_STATS_REQUEST:
+        return handle_port_stats_request(ofconn, oh);
+
+    case OFPTYPE_QUEUE_STATS_REQUEST:
+        return handle_queue_stats_request(ofconn, oh);
+
+    case OFPTYPE_PORT_DESC_STATS_REQUEST:
+        return handle_port_desc_stats_request(ofconn, oh);
+
+    case OFPTYPE_FLOW_MONITOR_STATS_REQUEST:
+        return handle_flow_monitor_request(ofconn, oh);
+
+    case OFPTYPE_HELLO:
+    case OFPTYPE_ERROR:
+    case OFPTYPE_FEATURES_REPLY:
+    case OFPTYPE_GET_CONFIG_REPLY:
+    case OFPTYPE_PACKET_IN:
+    case OFPTYPE_FLOW_REMOVED:
+    case OFPTYPE_PORT_STATUS:
+    case OFPTYPE_BARRIER_REPLY:
+    case OFPTYPE_DESC_STATS_REPLY:
+    case OFPTYPE_FLOW_STATS_REPLY:
+    case OFPTYPE_QUEUE_STATS_REPLY:
+    case OFPTYPE_PORT_STATS_REPLY:
+    case OFPTYPE_TABLE_STATS_REPLY:
+    case OFPTYPE_AGGREGATE_STATS_REPLY:
+    case OFPTYPE_PORT_DESC_STATS_REPLY:
+    case OFPTYPE_ROLE_REPLY:
+    case OFPTYPE_FLOW_MONITOR_PAUSED:
+    case OFPTYPE_FLOW_MONITOR_RESUMED:
+    case OFPTYPE_FLOW_MONITOR_STATS_REPLY:
     default:
-        return (oh->type == OFPT10_STATS_REQUEST ||
-                oh->type == OFPT10_STATS_REPLY
-                ? OFPERR_OFPBRC_BAD_STAT
-                : OFPERR_OFPBRC_BAD_TYPE);
+        return OFPERR_OFPBRC_BAD_TYPE;
     }
 }
 
@@ -3569,8 +3881,8 @@ ofopgroup_create(struct ofproto *ofproto, struct ofconn *ofconn,
 static void
 ofopgroup_submit(struct ofopgroup *group)
 {
-    if (list_is_empty(&group->ops)) {
-        ofopgroup_destroy(group);
+    if (!group->n_running) {
+        ofopgroup_complete(group);
     } else {
         list_push_back(&group->ofproto->pending, &group->ofproto_node);
         group->ofproto->n_pending++;
@@ -3578,20 +3890,134 @@ ofopgroup_submit(struct ofopgroup *group)
 }
 
 static void
-ofopgroup_destroy(struct ofopgroup *group)
+ofopgroup_complete(struct ofopgroup *group)
 {
-    assert(list_is_empty(&group->ops));
+    struct ofproto *ofproto = group->ofproto;
+
+    struct ofconn *abbrev_ofconn;
+    ovs_be32 abbrev_xid;
+
+    struct ofoperation *op, *next_op;
+    int error;
+
+    assert(!group->n_running);
+
+    error = 0;
+    LIST_FOR_EACH (op, group_node, &group->ops) {
+        if (op->error) {
+            error = op->error;
+            break;
+        }
+    }
+
+    if (!error && group->ofconn && group->buffer_id != UINT32_MAX) {
+        LIST_FOR_EACH (op, group_node, &group->ops) {
+            if (op->type != OFOPERATION_DELETE) {
+                struct ofpbuf *packet;
+                uint16_t in_port;
+
+                error = ofconn_pktbuf_retrieve(group->ofconn, group->buffer_id,
+                                               &packet, &in_port);
+                if (packet) {
+                    assert(!error);
+                    error = rule_execute(op->rule, in_port, packet);
+                }
+                break;
+            }
+        }
+    }
+
+    if (!error && !list_is_empty(&group->ofconn_node)) {
+        abbrev_ofconn = group->ofconn;
+        abbrev_xid = group->request->xid;
+    } else {
+        abbrev_ofconn = NULL;
+        abbrev_xid = htonl(0);
+    }
+    LIST_FOR_EACH_SAFE (op, next_op, group_node, &group->ops) {
+        struct rule *rule = op->rule;
+
+        if (!op->error && !ofproto_rule_is_hidden(rule)) {
+            /* Check that we can just cast from ofoperation_type to
+             * nx_flow_update_event. */
+            BUILD_ASSERT_DECL((enum nx_flow_update_event) OFOPERATION_ADD
+                              == NXFME_ADDED);
+            BUILD_ASSERT_DECL((enum nx_flow_update_event) OFOPERATION_DELETE
+                              == NXFME_DELETED);
+            BUILD_ASSERT_DECL((enum nx_flow_update_event) OFOPERATION_MODIFY
+                              == NXFME_MODIFIED);
+
+            ofmonitor_report(ofproto->connmgr, rule,
+                             (enum nx_flow_update_event) op->type,
+                             op->reason, abbrev_ofconn, abbrev_xid);
+        }
+
+        rule->pending = NULL;
+
+        switch (op->type) {
+        case OFOPERATION_ADD:
+            if (!op->error) {
+                ofproto_rule_destroy__(op->victim);
+                if ((rule->cr.wc.vlan_tci_mask & htons(VLAN_VID_MASK))
+                    == htons(VLAN_VID_MASK)) {
+                    if (ofproto->vlan_bitmap) {
+                        uint16_t vid = vlan_tci_to_vid(rule->cr.flow.vlan_tci);
+
+                        if (!bitmap_is_set(ofproto->vlan_bitmap, vid)) {
+                            bitmap_set1(ofproto->vlan_bitmap, vid);
+                            ofproto->vlans_changed = true;
+                        }
+                    } else {
+                        ofproto->vlans_changed = true;
+                    }
+                }
+            } else {
+                oftable_substitute_rule(rule, op->victim);
+                ofproto_rule_destroy__(rule);
+            }
+            break;
+
+        case OFOPERATION_DELETE:
+            assert(!op->error);
+            ofproto_rule_destroy__(rule);
+            op->rule = NULL;
+            break;
+
+        case OFOPERATION_MODIFY:
+            if (!op->error) {
+                rule->modified = time_msec();
+            } else {
+                rule->flow_cookie = op->flow_cookie;
+                if (op->ofpacts) {
+                    free(rule->ofpacts);
+                    rule->ofpacts = op->ofpacts;
+                    rule->ofpacts_len = op->ofpacts_len;
+                    op->ofpacts = NULL;
+                    op->ofpacts_len = 0;
+                }
+            }
+            break;
+
+        default:
+            NOT_REACHED();
+        }
+
+        ofoperation_destroy(op);
+    }
+
+    ofmonitor_flush(ofproto->connmgr);
+
     if (!list_is_empty(&group->ofproto_node)) {
-        assert(group->ofproto->n_pending > 0);
-        group->ofproto->n_pending--;
+        assert(ofproto->n_pending > 0);
+        ofproto->n_pending--;
         list_remove(&group->ofproto_node);
     }
     if (!list_is_empty(&group->ofconn_node)) {
         list_remove(&group->ofconn_node);
-        if (group->error) {
-            ofconn_send_error(group->ofconn, group->request, group->error);
+        if (error) {
+            ofconn_send_error(group->ofconn, group->request, error);
         }
-        connmgr_retry(group->ofproto->connmgr);
+        connmgr_retry(ofproto->connmgr);
     }
     free(group->request);
     free(group);
@@ -3600,11 +4026,15 @@ ofopgroup_destroy(struct ofopgroup *group)
 /* Initiates a new operation on 'rule', of the specified 'type', within
  * 'group'.  Prior to calling, 'rule' must not have any pending operation.
  *
+ * For a 'type' of OFOPERATION_DELETE, 'reason' should specify the reason that
+ * the flow is being deleted.  For other 'type's, 'reason' is ignored (use 0).
+ *
  * Returns the newly created ofoperation (which is also available as
  * rule->pending). */
 static struct ofoperation *
 ofoperation_create(struct ofopgroup *group, struct rule *rule,
-                   enum ofoperation_type type)
+                   enum ofoperation_type type,
+                   enum ofp_flow_removed_reason reason)
 {
     struct ofproto *ofproto = group->ofproto;
     struct ofoperation *op;
@@ -3616,8 +4046,11 @@ ofoperation_create(struct ofopgroup *group, struct rule *rule,
     list_push_back(&group->ops, &op->group_node);
     op->rule = rule;
     op->type = type;
+    op->reason = reason;
     op->flow_cookie = rule->flow_cookie;
 
+    group->n_running++;
+
     if (type == OFOPERATION_DELETE) {
         hmap_insert(&ofproto->deletions, &op->hmap_node,
                     cls_rule_hash(&rule->cr, rule->table_id));
@@ -3640,10 +4073,6 @@ ofoperation_destroy(struct ofoperation *op)
     list_remove(&op->group_node);
     free(op->ofpacts);
     free(op);
-
-    if (list_is_empty(&group->ops) && !list_is_empty(&group->ofproto_node)) {
-        ofopgroup_destroy(group);
-    }
 }
 
 /* Indicates that 'op' completed with status 'error', which is either 0 to
@@ -3679,76 +4108,15 @@ void
 ofoperation_complete(struct ofoperation *op, enum ofperr error)
 {
     struct ofopgroup *group = op->group;
-    struct rule *rule = op->rule;
-    struct ofproto *ofproto = rule->ofproto;
 
-    assert(rule->pending == op);
-
-    if (!error
-        && !group->error
-        && op->type != OFOPERATION_DELETE
-        && group->ofconn
-        && group->buffer_id != UINT32_MAX
-        && list_is_singleton(&op->group_node)) {
-        struct ofpbuf *packet;
-        uint16_t in_port;
-
-        error = ofconn_pktbuf_retrieve(group->ofconn, group->buffer_id,
-                                       &packet, &in_port);
-        if (packet) {
-            assert(!error);
-            error = rule_execute(rule, in_port, packet);
-        }
-    }
-    if (!group->error) {
-        group->error = error;
-    }
-
-    switch (op->type) {
-    case OFOPERATION_ADD:
-        if (!error) {
-            ofproto_rule_destroy__(op->victim);
-            if ((rule->cr.wc.vlan_tci_mask & htons(VLAN_VID_MASK))
-                == htons(VLAN_VID_MASK)) {
-                if (ofproto->vlan_bitmap) {
-                    uint16_t vid = vlan_tci_to_vid(rule->cr.flow.vlan_tci);
-
-                    if (!bitmap_is_set(ofproto->vlan_bitmap, vid)) {
-                        bitmap_set1(ofproto->vlan_bitmap, vid);
-                        ofproto->vlans_changed = true;
-                    }
-                } else {
-                    ofproto->vlans_changed = true;
-                }
-            }
-        } else {
-            oftable_substitute_rule(rule, op->victim);
-            ofproto_rule_destroy__(rule);
-            op->rule = NULL;
-        }
-        break;
-
-    case OFOPERATION_DELETE:
-        assert(!error);
-        ofproto_rule_destroy__(rule);
-        op->rule = NULL;
-        break;
-
-    case OFOPERATION_MODIFY:
-        if (!error) {
-            rule->modified = time_msec();
-        } else {
-            free(rule->ofpacts);
-            rule->ofpacts = op->ofpacts;
-            rule->ofpacts_len = op->ofpacts_len;
-            op->ofpacts = NULL;
-        }
-        break;
+    assert(op->rule->pending == op);
+    assert(group->n_running > 0);
+    assert(!error || op->type != OFOPERATION_DELETE);
 
-    default:
-        NOT_REACHED();
+    op->error = error;
+    if (!--group->n_running && !list_is_empty(&group->ofproto_node)) {
+        ofopgroup_complete(group);
     }
-    ofoperation_destroy(op);
 }
 
 struct rule *
@@ -3850,7 +4218,8 @@ ofproto_evict(struct ofproto *ofproto)
                 break;
             }
 
-            ofoperation_create(group, rule, OFOPERATION_DELETE);
+            ofoperation_create(group, rule,
+                               OFOPERATION_DELETE, OFPRR_EVICTION);
             oftable_remove_rule(rule);
             ofproto->ofproto_class->rule_destruct(rule);
         }