Add support for 'hard_timeout' in OF1.2 flow_removed message.
[sliver-openvswitch.git] / ofproto / ofproto.c
index 9340191..8c10f19 100644 (file)
@@ -34,6 +34,7 @@
 #include "nx-match.h"
 #include "ofp-actions.h"
 #include "ofp-errors.h"
+#include "ofp-msgs.h"
 #include "ofp-print.h"
 #include "ofp-util.h"
 #include "ofpbuf.h"
@@ -126,13 +127,17 @@ struct ofoperation {
     struct ofpact *ofpacts;
     size_t ofpacts_len;
 
+    /* OFOPERATION_DELETE. */
+    enum ofp_flow_removed_reason reason; /* Reason flow was removed. */
+
     ovs_be64 flow_cookie;       /* Rule's old flow cookie. */
     enum ofperr error;          /* 0 if no error. */
 };
 
 static struct ofoperation *ofoperation_create(struct ofopgroup *,
                                               struct rule *,
-                                              enum ofoperation_type);
+                                              enum ofoperation_type,
+                                              enum ofp_flow_removed_reason);
 static void ofoperation_destroy(struct ofoperation *);
 
 /* oftable. */
@@ -188,7 +193,6 @@ static void reinit_ports(struct ofproto *);
 static void ofproto_rule_destroy__(struct rule *);
 static void ofproto_rule_send_removed(struct rule *, uint8_t reason);
 static bool rule_is_modifiable(const struct rule *);
-static bool rule_is_hidden(const struct rule *);
 
 /* OpenFlow. */
 static enum ofperr add_flow(struct ofproto *, struct ofconn *,
@@ -952,7 +956,8 @@ ofproto_flush__(struct ofproto *ofproto)
         cls_cursor_init(&cursor, &table->cls, NULL);
         CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
             if (!rule->pending) {
-                ofoperation_create(group, rule, OFOPERATION_DELETE);
+                ofoperation_create(group, rule, OFOPERATION_DELETE,
+                                   OFPRR_DELETE);
                 oftable_remove_rule(rule);
                 ofproto->ofproto_class->rule_destruct(rule);
             }
@@ -1445,7 +1450,7 @@ ofproto_delete_flow(struct ofproto *ofproto, const struct cls_rule *target)
     } else {
         /* Initiate deletion -> success. */
         struct ofopgroup *group = ofopgroup_create_unattached(ofproto);
-        ofoperation_create(group, rule, OFOPERATION_DELETE);
+        ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
         oftable_remove_rule(rule);
         ofproto->ofproto_class->rule_destruct(rule);
         ofopgroup_submit(group);
@@ -1894,13 +1899,36 @@ ofproto_rule_destroy(struct rule *rule)
 
 /* Returns true if 'rule' has an OpenFlow OFPAT_OUTPUT or OFPAT_ENQUEUE action
  * that outputs to 'port' (output to OFPP_FLOOD and OFPP_ALL doesn't count). */
-static bool
-rule_has_out_port(const struct rule *rule, uint16_t port)
+bool
+ofproto_rule_has_out_port(const struct rule *rule, uint16_t port)
 {
     return (port == OFPP_NONE
             || ofpacts_output_to_port(rule->ofpacts, rule->ofpacts_len, port));
 }
 
+/* Returns true if a rule related to 'op' has an OpenFlow OFPAT_OUTPUT or
+ * OFPAT_ENQUEUE action that outputs to 'out_port'. */
+bool
+ofoperation_has_out_port(const struct ofoperation *op, uint16_t out_port)
+{
+    if (ofproto_rule_has_out_port(op->rule, out_port)) {
+        return true;
+    }
+
+    switch (op->type) {
+    case OFOPERATION_ADD:
+        return op->victim && ofproto_rule_has_out_port(op->victim, out_port);
+
+    case OFOPERATION_DELETE:
+        return false;
+
+    case OFOPERATION_MODIFY:
+        return ofpacts_output_to_port(op->ofpacts, op->ofpacts_len, out_port);
+    }
+
+    NOT_REACHED();
+}
+
 /* Executes the actions indicated by 'rule' on 'packet' and credits 'rule''s
  * statistics appropriately.  'packet' must have at least sizeof(struct
  * ofp_packet_in) bytes of headroom.
@@ -1925,8 +1953,8 @@ rule_execute(struct rule *rule, uint16_t in_port, struct ofpbuf *packet)
  * Rules with priority higher than UINT16_MAX are set up by ofproto itself
  * (e.g. by in-band control) and are intentionally hidden from the
  * controller. */
-static bool
-rule_is_hidden(const struct rule *rule)
+bool
+ofproto_rule_is_hidden(const struct rule *rule)
 {
     return rule->cr.priority > UINT16_MAX;
 }
@@ -1991,7 +2019,8 @@ handle_get_config_request(struct ofconn *ofconn, const struct ofp_header *oh)
     struct ofpbuf *buf;
 
     /* Send reply. */
-    osc = make_openflow_xid(sizeof *osc, OFPT_GET_CONFIG_REPLY, oh->xid, &buf);
+    buf = ofpraw_alloc_reply(OFPRAW_OFPT_GET_CONFIG_REPLY, oh, 0);
+    osc = ofpbuf_put_uninit(buf, sizeof *osc);
     flags = ofproto->frag_handling;
     if (ofconn_get_invalid_ttl_to_controller(ofconn)) {
         flags |= OFPC_INVALID_TTL_TO_CONTROLLER;
@@ -2004,8 +2033,9 @@ handle_get_config_request(struct ofconn *ofconn, const struct ofp_header *oh)
 }
 
 static enum ofperr
-handle_set_config(struct ofconn *ofconn, const struct ofp_switch_config *osc)
+handle_set_config(struct ofconn *ofconn, const struct ofp_header *oh)
 {
+    const struct ofp_switch_config *osc = ofpmsg_body(oh);
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     uint16_t flags = ntohs(osc->flags);
 
@@ -2050,7 +2080,7 @@ reject_slave_controller(struct ofconn *ofconn)
 }
 
 static enum ofperr
-handle_packet_out(struct ofconn *ofconn, const struct ofp_packet_out *opo)
+handle_packet_out(struct ofconn *ofconn, const struct ofp_header *oh)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
     struct ofputil_packet_out po;
@@ -2069,7 +2099,7 @@ handle_packet_out(struct ofconn *ofconn, const struct ofp_packet_out *opo)
 
     /* Decode message. */
     ofpbuf_use_stub(&ofpacts, ofpacts_stub, sizeof ofpacts_stub);
-    error = ofputil_decode_packet_out(&po, opo, &ofpacts);
+    error = ofputil_decode_packet_out(&po, oh, &ofpacts);
     if (error) {
         goto exit_free_ofpacts;
     }
@@ -2155,13 +2185,14 @@ handle_port_mod(struct ofconn *ofconn, const struct ofp_header *oh)
 
 static enum ofperr
 handle_desc_stats_request(struct ofconn *ofconn,
-                          const struct ofp_stats_msg *request)
+                          const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
     struct ofp_desc_stats *ods;
     struct ofpbuf *msg;
 
-    ods = ofputil_make_stats_reply(sizeof *ods, request, &msg);
+    msg = ofpraw_alloc_stats_reply(request, 0);
+    ods = ofpbuf_put_zeros(msg, sizeof *ods);
     ovs_strlcpy(ods->mfr_desc, p->mfr_desc, sizeof ods->mfr_desc);
     ovs_strlcpy(ods->hw_desc, p->hw_desc, sizeof ods->hw_desc);
     ovs_strlcpy(ods->sw_desc, p->sw_desc, sizeof ods->sw_desc);
@@ -2174,15 +2205,14 @@ handle_desc_stats_request(struct ofconn *ofconn,
 
 static enum ofperr
 handle_table_stats_request(struct ofconn *ofconn,
-                           const struct ofp_stats_msg *request)
+                           const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
-    struct ofp_table_stats *ots;
+    struct ofp10_table_stats *ots;
     struct ofpbuf *msg;
     size_t i;
 
-    ofputil_make_stats_reply(sizeof(struct ofp_stats_msg), request, &msg);
-
+    msg = ofpraw_alloc_stats_reply(request, sizeof *ots * p->n_tables);
     ots = ofpbuf_put_zeros(msg, sizeof *ots * p->n_tables);
     for (i = 0; i < p->n_tables; i++) {
         ots[i].table_id = i;
@@ -2214,14 +2244,14 @@ static void
 append_port_stat(struct ofport *port, struct list *replies)
 {
     struct netdev_stats stats;
-    struct ofp_port_stats *ops;
+    struct ofp10_port_stats *ops;
 
     /* Intentionally ignore return value, since errors will set
      * 'stats' to all-1s, which is correct for OpenFlow, and
      * netdev_get_stats() will log errors. */
     ofproto_port_get_stats(port, &stats);
 
-    ops = ofputil_append_stats_reply(sizeof *ops, replies);
+    ops = ofpmp_append(replies, sizeof *ops);
     ops->port_no = htons(port->pp.port_no);
     memset(ops->pad, 0, sizeof ops->pad);
     put_32aligned_be64(&ops->rx_packets, htonll(stats.rx_packets));
@@ -2240,13 +2270,14 @@ append_port_stat(struct ofport *port, struct list *replies)
 
 static enum ofperr
 handle_port_stats_request(struct ofconn *ofconn,
-                          const struct ofp_port_stats_request *psr)
+                          const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
+    const struct ofp10_port_stats_request *psr = ofpmsg_body(request);
     struct ofport *port;
     struct list replies;
 
-    ofputil_start_stats_reply(&psr->osm, &replies);
+    ofpmp_init(&replies, request);
     if (psr->port_no != htons(OFPP_NONE)) {
         port = ofproto_get_port(p, ntohs(psr->port_no));
         if (port) {
@@ -2264,17 +2295,18 @@ handle_port_stats_request(struct ofconn *ofconn,
 
 static enum ofperr
 handle_port_desc_stats_request(struct ofconn *ofconn,
-                               const struct ofp_stats_msg *osm)
+                               const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
+    enum ofp_version version;
     struct ofport *port;
     struct list replies;
 
-    ofputil_start_stats_reply(osm, &replies);
+    ofpmp_init(&replies, request);
 
+    version = ofputil_protocol_to_ofp_version(ofconn_get_protocol(ofconn));
     HMAP_FOR_EACH (port, hmap_node, &p->ports) {
-        ofputil_append_port_desc_stats_reply(ofconn_get_protocol(ofconn),
-                                             &port->pp, &replies);
+        ofputil_append_port_desc_stats_reply(version, &port->pp, &replies);
     }
 
     ofconn_send_replies(ofconn, &replies);
@@ -2393,7 +2425,8 @@ collect_rules_loose(struct ofproto *ofproto, uint8_t table_id,
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
-            if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+            if (!ofproto_rule_is_hidden(rule)
+                && ofproto_rule_has_out_port(rule, out_port)
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -2437,7 +2470,8 @@ collect_rules_strict(struct ofproto *ofproto, uint8_t table_id,
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
-            if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+            if (!ofproto_rule_is_hidden(rule)
+                && ofproto_rule_has_out_port(rule, out_port)
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -2458,7 +2492,7 @@ age_secs(long long int age_ms)
 
 static enum ofperr
 handle_flow_stats_request(struct ofconn *ofconn,
-                          const struct ofp_stats_msg *osm)
+                          const struct ofp_header *request)
 {
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     struct ofputil_flow_stats_request fsr;
@@ -2467,7 +2501,7 @@ handle_flow_stats_request(struct ofconn *ofconn,
     struct rule *rule;
     enum ofperr error;
 
-    error = ofputil_decode_flow_stats_request(&fsr, &osm->header);
+    error = ofputil_decode_flow_stats_request(&fsr, request);
     if (error) {
         return error;
     }
@@ -2479,7 +2513,7 @@ handle_flow_stats_request(struct ofconn *ofconn,
         return error;
     }
 
-    ofputil_start_stats_reply(osm, &replies);
+    ofpmp_init(&replies, request);
     LIST_FOR_EACH (rule, ofproto_node, &rules) {
         long long int now = time_msec();
         struct ofputil_flow_stats fs;
@@ -2604,7 +2638,7 @@ ofproto_port_get_cfm_health(const struct ofproto *ofproto, uint16_t ofp_port)
 
 static enum ofperr
 handle_aggregate_stats_request(struct ofconn *ofconn,
-                               const struct ofp_stats_msg *osm)
+                               const struct ofp_header *oh)
 {
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     struct ofputil_flow_stats_request request;
@@ -2615,7 +2649,7 @@ handle_aggregate_stats_request(struct ofconn *ofconn,
     struct rule *rule;
     enum ofperr error;
 
-    error = ofputil_decode_flow_stats_request(&request, &osm->header);
+    error = ofputil_decode_flow_stats_request(&request, oh);
     if (error) {
         return error;
     }
@@ -2657,7 +2691,7 @@ handle_aggregate_stats_request(struct ofconn *ofconn,
         stats.byte_count = UINT64_MAX;
     }
 
-    reply = ofputil_encode_aggregate_stats_reply(&stats, osm);
+    reply = ofputil_encode_aggregate_stats_reply(&stats, oh);
     ofconn_send_reply(ofconn, reply);
 
     return 0;
@@ -2672,9 +2706,9 @@ static void
 put_queue_stats(struct queue_stats_cbdata *cbdata, uint32_t queue_id,
                 const struct netdev_queue_stats *stats)
 {
-    struct ofp_queue_stats *reply;
+    struct ofp10_queue_stats *reply;
 
-    reply = ofputil_append_stats_reply(sizeof *reply, &cbdata->replies);
+    reply = ofpmp_append(&cbdata->replies, sizeof *reply);
     reply->port_no = htons(cbdata->ofport->pp.port_no);
     memset(reply->pad, 0, sizeof reply->pad);
     reply->queue_id = htonl(queue_id);
@@ -2715,9 +2749,10 @@ handle_queue_stats_for_port(struct ofport *port, uint32_t queue_id,
 
 static enum ofperr
 handle_queue_stats_request(struct ofconn *ofconn,
-                           const struct ofp_queue_stats_request *qsr)
+                           const struct ofp_header *rq)
 {
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
+    const struct ofp10_queue_stats_request *qsr = ofpmsg_body(rq);
     struct queue_stats_cbdata cbdata;
     unsigned int port_no;
     struct ofport *port;
@@ -2726,7 +2761,7 @@ handle_queue_stats_request(struct ofconn *ofconn,
 
     COVERAGE_INC(ofproto_queue_req);
 
-    ofputil_start_stats_reply(&qsr->osm, &cbdata.replies);
+    ofpmp_init(&cbdata.replies, rq);
 
     port_no = ntohs(qsr->port_no);
     queue_id = ntohl(qsr->queue_id);
@@ -2855,6 +2890,9 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
     rule->ofpacts_len = fm->ofpacts_len;
     rule->evictable = true;
     rule->eviction_group = NULL;
+    rule->monitor_flags = 0;
+    rule->add_seqno = 0;
+    rule->modify_seqno = 0;
 
     /* Insert new rule. */
     victim = oftable_replace_rule(rule);
@@ -2886,7 +2924,7 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
         }
 
         group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id);
-        op = ofoperation_create(group, rule, OFOPERATION_ADD);
+        op = ofoperation_create(group, rule, OFOPERATION_ADD, 0);
         op->victim = victim;
 
         error = ofproto->ofproto_class->rule_construct(rule);
@@ -2950,7 +2988,7 @@ modify_flows__(struct ofproto *ofproto, struct ofconn *ofconn,
             continue;
         }
 
-        op = ofoperation_create(group, rule, OFOPERATION_MODIFY);
+        op = ofoperation_create(group, rule, OFOPERATION_MODIFY, 0);
         rule->flow_cookie = new_cookie;
         if (actions_changed) {
             op->ofpacts = rule->ofpacts;
@@ -3029,7 +3067,7 @@ delete_flow__(struct rule *rule, struct ofopgroup *group)
 
     ofproto_rule_send_removed(rule, OFPRR_DELETE);
 
-    ofoperation_create(group, rule, OFOPERATION_DELETE);
+    ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
 }
@@ -3094,7 +3132,7 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason)
 {
     struct ofputil_flow_removed fr;
 
-    if (rule_is_hidden(rule) || !rule->send_flow_removed) {
+    if (ofproto_rule_is_hidden(rule) || !rule->send_flow_removed) {
         return;
     }
 
@@ -3104,6 +3142,7 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason)
     calc_flow_duration__(rule->created, time_msec(),
                          &fr.duration_sec, &fr.duration_nsec);
     fr.idle_timeout = rule->idle_timeout;
+    fr.hard_timeout = rule->hard_timeout;
     rule->ofproto->ofproto_class->rule_get_stats(rule, &fr.packet_count,
                                                  &fr.byte_count);
 
@@ -3144,7 +3183,7 @@ ofproto_rule_expire(struct rule *rule, uint8_t reason)
     ofproto_rule_send_removed(rule, reason);
 
     group = ofopgroup_create_unattached(ofproto);
-    ofoperation_create(group, rule, OFOPERATION_DELETE);
+    ofoperation_create(group, rule, OFOPERATION_DELETE, reason);
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
     ofopgroup_submit(group);
@@ -3172,9 +3211,9 @@ handle_flow_mod(struct ofconn *ofconn, const struct ofp_header *oh)
         goto exit_free_ofpacts;
     }
 
+    if (fm.flags & OFPFF10_EMERG) {
     /* We do not support the OpenFlow 1.0 emergency flow cache, which is not
      * required in OpenFlow 1.0.1 and removed from OpenFlow 1.1. */
-    if (fm.flags & OFPFF_EMERG) {
         /* We do not support the emergency flow cache.  It will hopefully get
          * dropped from OpenFlow in the near future.  There is no good error
          * code, so just state that the flow table is full. */
@@ -3257,7 +3296,7 @@ handle_flow_mod__(struct ofproto *ofproto, struct ofconn *ofconn,
 static enum ofperr
 handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    struct nx_role_request *nrr = (struct nx_role_request *) oh;
+    const struct nx_role_request *nrr = ofpmsg_body(oh);
     struct nx_role_request *reply;
     struct ofpbuf *buf;
     uint32_t role;
@@ -3275,7 +3314,8 @@ handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
 
     ofconn_set_role(ofconn, role);
 
-    reply = make_nxmsg_xid(sizeof *reply, NXT_ROLE_REPLY, oh->xid, &buf);
+    buf = ofpraw_alloc_reply(OFPRAW_NXT_ROLE_REPLY, oh, 0);
+    reply = ofpbuf_put_zeros(buf, sizeof *reply);
     reply->role = htonl(role);
     ofconn_send_reply(ofconn, buf);
 
@@ -3286,8 +3326,7 @@ static enum ofperr
 handle_nxt_flow_mod_table_id(struct ofconn *ofconn,
                              const struct ofp_header *oh)
 {
-    const struct nx_flow_mod_table_id *msg
-        = (const struct nx_flow_mod_table_id *) oh;
+    const struct nx_flow_mod_table_id *msg = ofpmsg_body(oh);
     enum ofputil_protocol cur, next;
 
     cur = ofconn_get_protocol(ofconn);
@@ -3300,8 +3339,7 @@ handle_nxt_flow_mod_table_id(struct ofconn *ofconn,
 static enum ofperr
 handle_nxt_set_flow_format(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    const struct nx_set_flow_format *msg
-        = (const struct nx_set_flow_format *) oh;
+    const struct nx_set_flow_format *msg = ofpmsg_body(oh);
     enum ofputil_protocol cur, next;
     enum ofputil_protocol next_base;
 
@@ -3325,10 +3363,9 @@ static enum ofperr
 handle_nxt_set_packet_in_format(struct ofconn *ofconn,
                                 const struct ofp_header *oh)
 {
-    const struct nx_set_packet_in_format *msg;
+    const struct nx_set_packet_in_format *msg = ofpmsg_body(oh);
     uint32_t format;
 
-    msg = (const struct nx_set_packet_in_format *) oh;
     format = ntohl(msg->format);
     if (format != NXPIF_OPENFLOW10 && format != NXPIF_NXM) {
         return OFPERR_OFPBRC_EPERM;
@@ -3347,7 +3384,7 @@ handle_nxt_set_packet_in_format(struct ofconn *ofconn,
 static enum ofperr
 handle_nxt_set_async_config(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    const struct nx_async_config *msg = (const struct nx_async_config *) oh;
+    const struct nx_async_config *msg = ofpmsg_body(oh);
     uint32_t master[OAM_N_TYPES];
     uint32_t slave[OAM_N_TYPES];
 
@@ -3372,9 +3409,8 @@ static enum ofperr
 handle_nxt_set_controller_id(struct ofconn *ofconn,
                              const struct ofp_header *oh)
 {
-    const struct nx_controller_id *nci;
+    const struct nx_controller_id *nci = ofpmsg_body(oh);
 
-    nci = (const struct nx_controller_id *) oh;
     if (!is_all_zeros(nci->zero, sizeof nci->zero)) {
         return OFPERR_NXBRC_MUST_BE_ZERO;
     }
@@ -3392,131 +3428,375 @@ handle_barrier_request(struct ofconn *ofconn, const struct ofp_header *oh)
         return OFPROTO_POSTPONE;
     }
 
-    make_openflow_xid(sizeof *oh, OFPT10_BARRIER_REPLY, oh->xid, &buf);
+    buf = ofpraw_alloc_reply((oh->version == OFP10_VERSION
+                              ? OFPRAW_OFPT10_BARRIER_REPLY
+                              : OFPRAW_OFPT11_BARRIER_REPLY), oh, 0);
     ofconn_send_reply(ofconn, buf);
     return 0;
 }
 
+static void
+ofproto_compose_flow_refresh_update(const struct rule *rule,
+                                    enum nx_flow_monitor_flags flags,
+                                    struct list *msgs)
+{
+    struct ofoperation *op = rule->pending;
+    struct ofputil_flow_update fu;
+
+    if (op && op->type == OFOPERATION_ADD && !op->victim) {
+        /* We'll report the final flow when the operation completes.  Reporting
+         * it now would cause a duplicate report later. */
+        return;
+    }
+
+    fu.event = (flags & (NXFMF_INITIAL | NXFMF_ADD)
+                ? NXFME_ADDED : NXFME_MODIFIED);
+    fu.reason = 0;
+    fu.idle_timeout = rule->idle_timeout;
+    fu.hard_timeout = rule->hard_timeout;
+    fu.table_id = rule->table_id;
+    fu.cookie = rule->flow_cookie;
+    fu.match = CONST_CAST(struct cls_rule *, &rule->cr);
+    if (!(flags & NXFMF_ACTIONS)) {
+        fu.ofpacts = NULL;
+        fu.ofpacts_len = 0;
+    } else if (!op) {
+        fu.ofpacts = rule->ofpacts;
+        fu.ofpacts_len = rule->ofpacts_len;
+    } else {
+        /* An operation is in progress.  Use the previous version of the flow's
+         * actions, so that when the operation commits we report the change. */
+        switch (op->type) {
+        case OFOPERATION_ADD:
+            /* We already verified that there was a victim. */
+            fu.ofpacts = op->victim->ofpacts;
+            fu.ofpacts_len = op->victim->ofpacts_len;
+            break;
+
+        case OFOPERATION_MODIFY:
+            if (op->ofpacts) {
+                fu.ofpacts = op->ofpacts;
+                fu.ofpacts_len = op->ofpacts_len;
+            } else {
+                fu.ofpacts = rule->ofpacts;
+                fu.ofpacts_len = rule->ofpacts_len;
+            }
+            break;
+
+        case OFOPERATION_DELETE:
+            fu.ofpacts = rule->ofpacts;
+            fu.ofpacts_len = rule->ofpacts_len;
+            break;
+
+        default:
+            NOT_REACHED();
+        }
+    }
+
+    if (list_is_empty(msgs)) {
+        ofputil_start_flow_update(msgs);
+    }
+    ofputil_append_flow_update(&fu, msgs);
+}
+
+void
+ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs)
+{
+    struct rule *rule;
+
+    LIST_FOR_EACH (rule, ofproto_node, rules) {
+        enum nx_flow_monitor_flags flags = rule->monitor_flags;
+        rule->monitor_flags = 0;
+
+        ofproto_compose_flow_refresh_update(rule, flags, msgs);
+    }
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rule(const struct ofmonitor *m,
+                                       struct rule *rule, uint64_t seqno,
+                                       struct list *rules)
+{
+    enum nx_flow_monitor_flags update;
+
+    if (ofproto_rule_is_hidden(rule)) {
+        return;
+    }
+
+    if (!(rule->pending
+          ? ofoperation_has_out_port(rule->pending, m->out_port)
+          : ofproto_rule_has_out_port(rule, m->out_port))) {
+        return;
+    }
+
+    if (seqno) {
+        if (rule->add_seqno > seqno) {
+            update = NXFMF_ADD | NXFMF_MODIFY;
+        } else if (rule->modify_seqno > seqno) {
+            update = NXFMF_MODIFY;
+        } else {
+            return;
+        }
+
+        if (!(m->flags & update)) {
+            return;
+        }
+    } else {
+        update = NXFMF_INITIAL;
+    }
+
+    if (!rule->monitor_flags) {
+        list_push_back(rules, &rule->ofproto_node);
+    }
+    rule->monitor_flags |= update | (m->flags & NXFMF_ACTIONS);
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rules(const struct ofmonitor *m,
+                                        uint64_t seqno,
+                                        struct list *rules)
+{
+    const struct ofproto *ofproto = ofconn_get_ofproto(m->ofconn);
+    const struct ofoperation *op;
+    const struct oftable *table;
+
+    FOR_EACH_MATCHING_TABLE (table, m->table_id, ofproto) {
+        struct cls_cursor cursor;
+        struct rule *rule;
+
+        cls_cursor_init(&cursor, &table->cls, &m->match);
+        CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+            assert(!rule->pending); /* XXX */
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+
+    HMAP_FOR_EACH (op, hmap_node, &ofproto->deletions) {
+        struct rule *rule = op->rule;
+
+        if (((m->table_id == 0xff
+              ? !(ofproto->tables[rule->table_id].flags & OFTABLE_HIDDEN)
+              : m->table_id == rule->table_id))
+            && cls_rule_is_loose_match(&rule->cr, &m->match)) {
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+}
+
+static void
+ofproto_collect_ofmonitor_initial_rules(struct ofmonitor *m,
+                                        struct list *rules)
+{
+    if (m->flags & NXFMF_INITIAL) {
+        ofproto_collect_ofmonitor_refresh_rules(m, 0, rules);
+    }
+}
+
+void
+ofmonitor_collect_resume_rules(struct ofmonitor *m,
+                               uint64_t seqno, struct list *rules)
+{
+    ofproto_collect_ofmonitor_refresh_rules(m, seqno, rules);
+}
+
+static enum ofperr
+handle_flow_monitor_request(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
+    struct ofmonitor **monitors;
+    size_t n_monitors, allocated_monitors;
+    struct list replies;
+    enum ofperr error;
+    struct list rules;
+    struct ofpbuf b;
+    size_t i;
+
+    error = 0;
+    ofpbuf_use_const(&b, oh, ntohs(oh->length));
+    monitors = NULL;
+    n_monitors = allocated_monitors = 0;
+    for (;;) {
+        struct ofputil_flow_monitor_request request;
+        struct ofmonitor *m;
+        int retval;
+
+        retval = ofputil_decode_flow_monitor_request(&request, &b);
+        if (retval == EOF) {
+            break;
+        } else if (retval) {
+            error = retval;
+            goto error;
+        }
+
+        if (request.table_id != 0xff
+            && request.table_id >= ofproto->n_tables) {
+            error = OFPERR_OFPBRC_BAD_TABLE_ID;
+            goto error;
+        }
+
+        error = ofmonitor_create(&request, ofconn, &m);
+        if (error) {
+            goto error;
+        }
+
+        if (n_monitors >= allocated_monitors) {
+            monitors = x2nrealloc(monitors, &allocated_monitors,
+                                  sizeof *monitors);
+        }
+        monitors[n_monitors++] = m;
+    }
+
+    list_init(&rules);
+    for (i = 0; i < n_monitors; i++) {
+        ofproto_collect_ofmonitor_initial_rules(monitors[i], &rules);
+    }
+
+    ofpmp_init(&replies, oh);
+    ofmonitor_compose_refresh_updates(&rules, &replies);
+    ofconn_send_replies(ofconn, &replies);
+
+    free(monitors);
+
+    return 0;
+
+error:
+    for (i = 0; i < n_monitors; i++) {
+        ofmonitor_destroy(monitors[i]);
+    }
+    free(monitors);
+    return error;
+}
+
+static enum ofperr
+handle_flow_monitor_cancel(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+    struct ofmonitor *m;
+    uint32_t id;
+
+    id = ofputil_decode_flow_monitor_cancel(oh);
+    m = ofmonitor_lookup(ofconn, id);
+    if (!m) {
+        return OFPERR_NXBRC_FM_BAD_ID;
+    }
+
+    ofmonitor_destroy(m);
+    return 0;
+}
+
 static enum ofperr
 handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
 {
     const struct ofp_header *oh = msg->data;
-    const struct ofputil_msg_type *type;
+    enum ofptype type;
     enum ofperr error;
 
-    error = ofputil_decode_msg_type(oh, &type);
+    error = ofptype_decode(&type, oh);
     if (error) {
         return error;
     }
 
-    switch (ofputil_msg_type_code(type)) {
+    switch (type) {
         /* OpenFlow requests. */
-    case OFPUTIL_OFPT_ECHO_REQUEST:
+    case OFPTYPE_ECHO_REQUEST:
         return handle_echo_request(ofconn, oh);
 
-    case OFPUTIL_OFPT_FEATURES_REQUEST:
+    case OFPTYPE_FEATURES_REQUEST:
         return handle_features_request(ofconn, oh);
 
-    case OFPUTIL_OFPT_GET_CONFIG_REQUEST:
+    case OFPTYPE_GET_CONFIG_REQUEST:
         return handle_get_config_request(ofconn, oh);
 
-    case OFPUTIL_OFPT_SET_CONFIG:
-        return handle_set_config(ofconn, msg->data);
+    case OFPTYPE_SET_CONFIG:
+        return handle_set_config(ofconn, oh);
 
-    case OFPUTIL_OFPT_PACKET_OUT:
-        return handle_packet_out(ofconn, msg->data);
+    case OFPTYPE_PACKET_OUT:
+        return handle_packet_out(ofconn, oh);
 
-    case OFPUTIL_OFPT_PORT_MOD:
+    case OFPTYPE_PORT_MOD:
         return handle_port_mod(ofconn, oh);
 
-    case OFPUTIL_OFPT_FLOW_MOD:
+    case OFPTYPE_FLOW_MOD:
         return handle_flow_mod(ofconn, oh);
 
-    case OFPUTIL_OFPT_BARRIER_REQUEST:
+    case OFPTYPE_BARRIER_REQUEST:
         return handle_barrier_request(ofconn, oh);
 
         /* OpenFlow replies. */
-    case OFPUTIL_OFPT_ECHO_REPLY:
+    case OFPTYPE_ECHO_REPLY:
         return 0;
 
         /* Nicira extension requests. */
-    case OFPUTIL_NXT_ROLE_REQUEST:
+    case OFPTYPE_ROLE_REQUEST:
         return handle_role_request(ofconn, oh);
 
-    case OFPUTIL_NXT_FLOW_MOD_TABLE_ID:
+    case OFPTYPE_FLOW_MOD_TABLE_ID:
         return handle_nxt_flow_mod_table_id(ofconn, oh);
 
-    case OFPUTIL_NXT_SET_FLOW_FORMAT:
+    case OFPTYPE_SET_FLOW_FORMAT:
         return handle_nxt_set_flow_format(ofconn, oh);
 
-    case OFPUTIL_NXT_SET_PACKET_IN_FORMAT:
+    case OFPTYPE_SET_PACKET_IN_FORMAT:
         return handle_nxt_set_packet_in_format(ofconn, oh);
 
-    case OFPUTIL_NXT_SET_CONTROLLER_ID:
+    case OFPTYPE_SET_CONTROLLER_ID:
         return handle_nxt_set_controller_id(ofconn, oh);
 
-    case OFPUTIL_NXT_FLOW_MOD:
-        return handle_flow_mod(ofconn, oh);
-
-    case OFPUTIL_NXT_FLOW_AGE:
+    case OFPTYPE_FLOW_AGE:
         /* Nothing to do. */
         return 0;
 
-    case OFPUTIL_NXT_SET_ASYNC_CONFIG:
+    case OFPTYPE_FLOW_MONITOR_CANCEL:
+        return handle_flow_monitor_cancel(ofconn, oh);
+
+    case OFPTYPE_SET_ASYNC_CONFIG:
         return handle_nxt_set_async_config(ofconn, oh);
 
         /* Statistics requests. */
-    case OFPUTIL_OFPST_DESC_REQUEST:
-        return handle_desc_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_FLOW_REQUEST:
-    case OFPUTIL_NXST_FLOW_REQUEST:
-        return handle_flow_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_AGGREGATE_REQUEST:
-    case OFPUTIL_NXST_AGGREGATE_REQUEST:
-        return handle_aggregate_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_TABLE_REQUEST:
-        return handle_table_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_PORT_REQUEST:
-        return handle_port_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_QUEUE_REQUEST:
-        return handle_queue_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_OFPST_PORT_DESC_REQUEST:
-        return handle_port_desc_stats_request(ofconn, msg->data);
-
-    case OFPUTIL_MSG_INVALID:
-    case OFPUTIL_OFPT_HELLO:
-    case OFPUTIL_OFPT_ERROR:
-    case OFPUTIL_OFPT_FEATURES_REPLY:
-    case OFPUTIL_OFPT_GET_CONFIG_REPLY:
-    case OFPUTIL_OFPT_PACKET_IN:
-    case OFPUTIL_OFPT_FLOW_REMOVED:
-    case OFPUTIL_OFPT_PORT_STATUS:
-    case OFPUTIL_OFPT_BARRIER_REPLY:
-    case OFPUTIL_OFPT_QUEUE_GET_CONFIG_REQUEST:
-    case OFPUTIL_OFPT_QUEUE_GET_CONFIG_REPLY:
-    case OFPUTIL_OFPST_DESC_REPLY:
-    case OFPUTIL_OFPST_FLOW_REPLY:
-    case OFPUTIL_OFPST_QUEUE_REPLY:
-    case OFPUTIL_OFPST_PORT_REPLY:
-    case OFPUTIL_OFPST_TABLE_REPLY:
-    case OFPUTIL_OFPST_AGGREGATE_REPLY:
-    case OFPUTIL_OFPST_PORT_DESC_REPLY:
-    case OFPUTIL_NXT_ROLE_REPLY:
-    case OFPUTIL_NXT_FLOW_REMOVED:
-    case OFPUTIL_NXT_PACKET_IN:
-    case OFPUTIL_NXST_FLOW_REPLY:
-    case OFPUTIL_NXST_AGGREGATE_REPLY:
+    case OFPTYPE_DESC_STATS_REQUEST:
+        return handle_desc_stats_request(ofconn, oh);
+
+    case OFPTYPE_FLOW_STATS_REQUEST:
+        return handle_flow_stats_request(ofconn, oh);
+
+    case OFPTYPE_AGGREGATE_STATS_REQUEST:
+        return handle_aggregate_stats_request(ofconn, oh);
+
+    case OFPTYPE_TABLE_STATS_REQUEST:
+        return handle_table_stats_request(ofconn, oh);
+
+    case OFPTYPE_PORT_STATS_REQUEST:
+        return handle_port_stats_request(ofconn, oh);
+
+    case OFPTYPE_QUEUE_STATS_REQUEST:
+        return handle_queue_stats_request(ofconn, oh);
+
+    case OFPTYPE_PORT_DESC_STATS_REQUEST:
+        return handle_port_desc_stats_request(ofconn, oh);
+
+    case OFPTYPE_FLOW_MONITOR_STATS_REQUEST:
+        return handle_flow_monitor_request(ofconn, oh);
+
+    case OFPTYPE_HELLO:
+    case OFPTYPE_ERROR:
+    case OFPTYPE_FEATURES_REPLY:
+    case OFPTYPE_GET_CONFIG_REPLY:
+    case OFPTYPE_PACKET_IN:
+    case OFPTYPE_FLOW_REMOVED:
+    case OFPTYPE_PORT_STATUS:
+    case OFPTYPE_BARRIER_REPLY:
+    case OFPTYPE_DESC_STATS_REPLY:
+    case OFPTYPE_FLOW_STATS_REPLY:
+    case OFPTYPE_QUEUE_STATS_REPLY:
+    case OFPTYPE_PORT_STATS_REPLY:
+    case OFPTYPE_TABLE_STATS_REPLY:
+    case OFPTYPE_AGGREGATE_STATS_REPLY:
+    case OFPTYPE_PORT_DESC_STATS_REPLY:
+    case OFPTYPE_ROLE_REPLY:
+    case OFPTYPE_FLOW_MONITOR_PAUSED:
+    case OFPTYPE_FLOW_MONITOR_RESUMED:
+    case OFPTYPE_FLOW_MONITOR_STATS_REPLY:
     default:
-        return (oh->type == OFPT10_STATS_REQUEST ||
-                oh->type == OFPT10_STATS_REPLY
-                ? OFPERR_OFPBRC_BAD_STAT
-                : OFPERR_OFPBRC_BAD_TYPE);
+        return OFPERR_OFPBRC_BAD_TYPE;
     }
 }
 
@@ -3600,6 +3880,10 @@ static void
 ofopgroup_complete(struct ofopgroup *group)
 {
     struct ofproto *ofproto = group->ofproto;
+
+    struct ofconn *abbrev_ofconn;
+    ovs_be32 abbrev_xid;
+
     struct ofoperation *op, *next_op;
     int error;
 
@@ -3630,8 +3914,31 @@ ofopgroup_complete(struct ofopgroup *group)
         }
     }
 
+    if (!error && !list_is_empty(&group->ofconn_node)) {
+        abbrev_ofconn = group->ofconn;
+        abbrev_xid = group->request->xid;
+    } else {
+        abbrev_ofconn = NULL;
+        abbrev_xid = htonl(0);
+    }
     LIST_FOR_EACH_SAFE (op, next_op, group_node, &group->ops) {
         struct rule *rule = op->rule;
+
+        if (!op->error && !ofproto_rule_is_hidden(rule)) {
+            /* Check that we can just cast from ofoperation_type to
+             * nx_flow_update_event. */
+            BUILD_ASSERT_DECL((enum nx_flow_update_event) OFOPERATION_ADD
+                              == NXFME_ADDED);
+            BUILD_ASSERT_DECL((enum nx_flow_update_event) OFOPERATION_DELETE
+                              == NXFME_DELETED);
+            BUILD_ASSERT_DECL((enum nx_flow_update_event) OFOPERATION_MODIFY
+                              == NXFME_MODIFIED);
+
+            ofmonitor_report(ofproto->connmgr, rule,
+                             (enum nx_flow_update_event) op->type,
+                             op->reason, abbrev_ofconn, abbrev_xid);
+        }
+
         rule->pending = NULL;
 
         switch (op->type) {
@@ -3685,6 +3992,8 @@ ofopgroup_complete(struct ofopgroup *group)
         ofoperation_destroy(op);
     }
 
+    ofmonitor_flush(ofproto->connmgr);
+
     if (!list_is_empty(&group->ofproto_node)) {
         assert(ofproto->n_pending > 0);
         ofproto->n_pending--;
@@ -3704,11 +4013,15 @@ ofopgroup_complete(struct ofopgroup *group)
 /* Initiates a new operation on 'rule', of the specified 'type', within
  * 'group'.  Prior to calling, 'rule' must not have any pending operation.
  *
+ * For a 'type' of OFOPERATION_DELETE, 'reason' should specify the reason that
+ * the flow is being deleted.  For other 'type's, 'reason' is ignored (use 0).
+ *
  * Returns the newly created ofoperation (which is also available as
  * rule->pending). */
 static struct ofoperation *
 ofoperation_create(struct ofopgroup *group, struct rule *rule,
-                   enum ofoperation_type type)
+                   enum ofoperation_type type,
+                   enum ofp_flow_removed_reason reason)
 {
     struct ofproto *ofproto = group->ofproto;
     struct ofoperation *op;
@@ -3720,6 +4033,7 @@ ofoperation_create(struct ofopgroup *group, struct rule *rule,
     list_push_back(&group->ops, &op->group_node);
     op->rule = rule;
     op->type = type;
+    op->reason = reason;
     op->flow_cookie = rule->flow_cookie;
 
     group->n_running++;
@@ -3891,7 +4205,8 @@ ofproto_evict(struct ofproto *ofproto)
                 break;
             }
 
-            ofoperation_create(group, rule, OFOPERATION_DELETE);
+            ofoperation_create(group, rule,
+                               OFOPERATION_DELETE, OFPRR_EVICTION);
             oftable_remove_rule(rule);
             ofproto->ofproto_class->rule_destruct(rule);
         }