+/* Composes a flow-monitor update for 'rule' and appends it to 'msgs',
+ * starting a new update message if 'msgs' is empty.  'flags' selects the
+ * event type (initial/add vs. modify) and whether actions are included.
+ *
+ * Reports nothing for a rule whose pending "add" operation has no victim:
+ * the final flow is reported when that operation completes instead. */
+static void
+ofproto_compose_flow_refresh_update(const struct rule *rule,
+                                    enum nx_flow_monitor_flags flags,
+                                    struct list *msgs)
+{
+    struct ofoperation *op = rule->pending;
+    struct ofputil_flow_update fu;
+    struct match match;
+
+    if (op && op->type == OFOPERATION_ADD && !op->victim) {
+        /* We'll report the final flow when the operation completes.  Reporting
+         * it now would cause a duplicate report later. */
+        return;
+    }
+
+    fu.event = (flags & (NXFMF_INITIAL | NXFMF_ADD)
+                ? NXFME_ADDED : NXFME_MODIFIED);
+    fu.reason = 0;     /* NOTE(review): presumably only used for deletions. */
+    fu.idle_timeout = rule->idle_timeout;
+    fu.hard_timeout = rule->hard_timeout;
+    fu.table_id = rule->table_id;
+    fu.cookie = rule->flow_cookie;
+    minimatch_expand(&rule->cr.match, &match);
+    fu.match = &match;
+    fu.priority = rule->cr.priority;
+    if (!(flags & NXFMF_ACTIONS)) {
+        /* The monitor did not subscribe to actions. */
+        fu.ofpacts = NULL;
+        fu.ofpacts_len = 0;
+    } else if (!op) {
+        /* No operation in flight: report the rule's current actions. */
+        fu.ofpacts = rule->ofpacts;
+        fu.ofpacts_len = rule->ofpacts_len;
+    } else {
+        /* An operation is in progress.  Use the previous version of the flow's
+         * actions, so that when the operation commits we report the change. */
+        switch (op->type) {
+        case OFOPERATION_ADD:
+            /* We already verified that there was a victim. */
+            fu.ofpacts = op->victim->ofpacts;
+            fu.ofpacts_len = op->victim->ofpacts_len;
+            break;
+
+        case OFOPERATION_MODIFY:
+            if (op->ofpacts) {
+                /* 'op' holds the pre-modification actions. */
+                fu.ofpacts = op->ofpacts;
+                fu.ofpacts_len = op->ofpacts_len;
+            } else {
+                /* The modification did not change the actions. */
+                fu.ofpacts = rule->ofpacts;
+                fu.ofpacts_len = rule->ofpacts_len;
+            }
+            break;
+
+        case OFOPERATION_DELETE:
+            fu.ofpacts = rule->ofpacts;
+            fu.ofpacts_len = rule->ofpacts_len;
+            break;
+
+        default:
+            NOT_REACHED();
+        }
+    }
+
+    if (list_is_empty(msgs)) {
+        ofputil_start_flow_update(msgs);
+    }
+    ofputil_append_flow_update(&fu, msgs);
+}
+
+/* Composes a flow refresh update for each rule on 'rules' and appends the
+ * resulting messages to 'msgs'.  Clears each rule's accumulated monitor
+ * flags as it goes, so every pending update is reported exactly once. */
+void
+ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs)
+{
+    struct rule *r;
+
+    LIST_FOR_EACH (r, ofproto_node, rules) {
+        enum nx_flow_monitor_flags pending = r->monitor_flags;
+
+        r->monitor_flags = 0;
+        ofproto_compose_flow_refresh_update(r, pending, msgs);
+    }
+}
+
+/* Considers 'rule' for inclusion in a refresh for monitor 'm'.  If the rule
+ * is visible, passes 'm''s out_port filter, and has changed since 'seqno'
+ * (or unconditionally when 'seqno' is 0, meaning an initial dump), ORs the
+ * appropriate update flags into rule->monitor_flags and, if the rule was not
+ * already queued, appends it to 'rules'. */
+static void
+ofproto_collect_ofmonitor_refresh_rule(const struct ofmonitor *m,
+                                       struct rule *rule, uint64_t seqno,
+                                       struct list *rules)
+{
+    enum nx_flow_monitor_flags update;
+
+    if (ofproto_rule_is_hidden(rule)) {
+        return;
+    }
+
+    /* Apply the out_port filter against the pending operation's view of the
+     * rule when one is in flight, otherwise against the rule itself. */
+    if (!(rule->pending
+          ? ofoperation_has_out_port(rule->pending, m->out_port)
+          : ofproto_rule_has_out_port(rule, m->out_port))) {
+        return;
+    }
+
+    if (seqno) {
+        /* Resume or change dump: report only changes strictly newer than
+         * 'seqno', and only kinds of change that 'm' subscribed to. */
+        if (rule->add_seqno > seqno) {
+            update = NXFMF_ADD | NXFMF_MODIFY;
+        } else if (rule->modify_seqno > seqno) {
+            update = NXFMF_MODIFY;
+        } else {
+            return;
+        }
+
+        if (!(m->flags & update)) {
+            return;
+        }
+    } else {
+        /* seqno == 0: initial dump, include the rule regardless of when it
+         * last changed. */
+        update = NXFMF_INITIAL;
+    }
+
+    /* Queue the rule at most once, even when several monitors select it;
+     * a nonzero monitor_flags means it is already on 'rules'. */
+    if (!rule->monitor_flags) {
+        list_push_back(rules, &rule->ofproto_node);
+    }
+    rule->monitor_flags |= update | (m->flags & NXFMF_ACTIONS);
+}
+
+/* Collects into 'rules' every rule relevant to monitor 'm' that changed
+ * after 'seqno' (or all relevant rules when 'seqno' is 0).  Scans both the
+ * classifier tables selected by 'm' and the set of rules with pending
+ * deletions, since deleting rules may no longer be found via the
+ * classifier. */
+static void
+ofproto_collect_ofmonitor_refresh_rules(const struct ofmonitor *m,
+                                        uint64_t seqno,
+                                        struct list *rules)
+{
+    const struct ofproto *ofproto = ofconn_get_ofproto(m->ofconn);
+    const struct ofoperation *op;
+    const struct oftable *table;
+    struct cls_rule target;
+
+    /* Build a classifier target from the monitor's match; destroyed below. */
+    cls_rule_init_from_minimatch(&target, &m->match, 0);
+    FOR_EACH_MATCHING_TABLE (table, m->table_id, ofproto) {
+        struct cls_cursor cursor;
+        struct rule *rule;
+
+        cls_cursor_init(&cursor, &table->cls, &target);
+        CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+            assert(!rule->pending); /* XXX */
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+
+    /* Check rules with pending delete operations separately. */
+    HMAP_FOR_EACH (op, hmap_node, &ofproto->deletions) {
+        struct rule *rule = op->rule;
+
+        /* table_id 0xff selects every non-hidden table; otherwise only the
+         * requested table qualifies. */
+        if (((m->table_id == 0xff
+              ? !(ofproto->tables[rule->table_id].flags & OFTABLE_HIDDEN)
+              : m->table_id == rule->table_id))
+            && cls_rule_is_loose_match(&rule->cr, &target.match)) {
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+    cls_rule_destroy(&target);
+}
+
+/* Collects into 'rules' the rules that monitor 'm''s initial dump should
+ * report.  A no-op unless 'm' asked for an initial dump (NXFMF_INITIAL). */
+static void
+ofproto_collect_ofmonitor_initial_rules(struct ofmonitor *m,
+                                        struct list *rules)
+{
+    if (!(m->flags & NXFMF_INITIAL)) {
+        return;
+    }
+    ofproto_collect_ofmonitor_refresh_rules(m, 0, rules);
+}
+
+/* Collects into 'rules' the rules that a resumed monitor 'm' must
+ * re-report: those that changed after sequence number 'seqno'. */
+void
+ofmonitor_collect_resume_rules(struct ofmonitor *m,
+                               uint64_t seqno, struct list *rules)
+{
+    ofproto_collect_ofmonitor_refresh_rules(m, seqno, rules);
+}
+
+/* Handles a flow monitor request from 'ofconn': decodes and creates a
+ * monitor for each sub-request in 'oh', then composes and sends the initial
+ * dump for those monitors that asked for one.  On any decode or creation
+ * error, every monitor created so far is destroyed and the error code is
+ * returned. */
+static enum ofperr
+handle_flow_monitor_request(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
+    struct ofmonitor **monitors;
+    size_t n_monitors, allocated_monitors;
+    struct list replies;
+    enum ofperr error;
+    struct list rules;
+    struct ofpbuf b;
+    size_t i;
+
+    error = 0;
+    ofpbuf_use_const(&b, oh, ntohs(oh->length));
+    monitors = NULL;
+    n_monitors = allocated_monitors = 0;
+    for (;;) {
+        struct ofputil_flow_monitor_request request;
+        struct ofmonitor *m;
+        int retval;
+
+        retval = ofputil_decode_flow_monitor_request(&request, &b);
+        if (retval == EOF) {
+            /* All sub-requests in the message have been decoded. */
+            break;
+        } else if (retval) {
+            error = retval;
+            goto error;
+        }
+
+        if (request.table_id != 0xff
+            && request.table_id >= ofproto->n_tables) {
+            error = OFPERR_OFPBRC_BAD_TABLE_ID;
+            goto error;
+        }
+
+        error = ofmonitor_create(&request, ofconn, &m);
+        if (error) {
+            goto error;
+        }
+
+        /* Track created monitors so they can be destroyed on error. */
+        if (n_monitors >= allocated_monitors) {
+            monitors = x2nrealloc(monitors, &allocated_monitors,
+                                  sizeof *monitors);
+        }
+        monitors[n_monitors++] = m;
+    }
+
+    /* Collect the rules for all monitors' initial dumps, then compose and
+     * send the replies. */
+    list_init(&rules);
+    for (i = 0; i < n_monitors; i++) {
+        ofproto_collect_ofmonitor_initial_rules(monitors[i], &rules);
+    }
+
+    ofpmp_init(&replies, oh);
+    ofmonitor_compose_refresh_updates(&rules, &replies);
+    ofconn_send_replies(ofconn, &replies);
+
+    /* Only the temporary array is freed here; the monitors themselves are
+     * presumably owned by 'ofconn' after ofmonitor_create() — confirm. */
+    free(monitors);
+
+    return 0;
+
+error:
+    for (i = 0; i < n_monitors; i++) {
+        ofmonitor_destroy(monitors[i]);
+    }
+    free(monitors);
+    return error;
+}
+
+/* Handles a flow monitor cancellation request from 'ofconn': destroys the
+ * monitor whose id is carried in 'oh', or returns NXBRC_FM_BAD_ID if the
+ * connection has no monitor with that id. */
+static enum ofperr
+handle_flow_monitor_cancel(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+    struct ofmonitor *monitor;
+
+    monitor = ofmonitor_lookup(ofconn, ofputil_decode_flow_monitor_cancel(oh));
+    if (!monitor) {
+        return OFPERR_NXBRC_FM_BAD_ID;
+    }
+
+    ofmonitor_destroy(monitor);
+    return 0;
+}
+