+\f
+/* Flow monitors (NXST_FLOW_MONITOR). */
+
+/* A counter incremented when something significant happens to an OpenFlow
+ * rule.
+ *
+ *     - When a rule is added, its 'add_seqno' and 'modify_seqno' are set to
+ *       the current value (which is then incremented).
+ *
+ *     - When a rule is modified, its 'modify_seqno' is set to the current
+ *       value (which is then incremented).
+ *
+ * Thus, by comparing an old value of monitor_seqno against a rule's
+ * 'add_seqno', one can tell whether the rule was added before or after the old
+ * value was read, and similarly for 'modify_seqno'.
+ *
+ * 32 bits should normally be sufficient (and would be nice, to save space in
+ * each rule) but then we'd have to have some special cases for wraparound.
+ *
+ * We initialize monitor_seqno to 1 to allow 0 to be used as an invalid
+ * value. */
+static uint64_t monitor_seqno = 1;
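+
+/* For example (an illustrative sketch, not part of this code): a caller that
+ * snapshots 'seqno = monitor_seqno' under 'ofproto_mutex' can later classify
+ * a rule like so:
+ *
+ *     if (rule->add_seqno > seqno) {
+ *         ...the rule was added after the snapshot...
+ *     } else if (rule->modify_seqno > seqno) {
+ *         ...the rule existed at the snapshot but was modified since...
+ *     }
+ */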
+
+COVERAGE_DEFINE(ofmonitor_pause);
+COVERAGE_DEFINE(ofmonitor_resume);
+
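+/* Creates an ofmonitor for 'request' on 'ofconn', storing it in '*monitorp'.
+ * Returns 0 if successful, or OFPERR_NXBRC_FM_DUPLICATE_ID if 'ofconn'
+ * already has a monitor with the same ID. */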
+enum ofperr
+ofmonitor_create(const struct ofputil_flow_monitor_request *request,
+                 struct ofconn *ofconn, struct ofmonitor **monitorp)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    struct ofmonitor *m;
+
+    *monitorp = NULL;
+
+    m = ofmonitor_lookup(ofconn, request->id);
+    if (m) {
+        return OFPERR_NXBRC_FM_DUPLICATE_ID;
+    }
+
+    m = xmalloc(sizeof *m);
+    m->ofconn = ofconn;
+    hmap_insert(&ofconn->monitors, &m->ofconn_node, hash_int(request->id, 0));
+    m->id = request->id;
+    m->flags = request->flags;
+    m->out_port = request->out_port;
+    m->table_id = request->table_id;
+    minimatch_init(&m->match, &request->match);
+
+    *monitorp = m;
+    return 0;
+}
+
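+/* Returns the ofmonitor in 'ofconn' whose ID is 'id', or NULL if there is
+ * none. */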
+struct ofmonitor *
+ofmonitor_lookup(struct ofconn *ofconn, uint32_t id)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    struct ofmonitor *m;
+
+    HMAP_FOR_EACH_IN_BUCKET (m, ofconn_node, hash_int(id, 0),
+                             &ofconn->monitors) {
+        if (m->id == id) {
+            return m;
+        }
+    }
+    return NULL;
+}
+
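+/* Destroys 'm', removing it from its ofconn's hash map.  Does nothing if 'm'
+ * is NULL. */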
+void
+ofmonitor_destroy(struct ofmonitor *m)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    if (m) {
+        minimatch_destroy(&m->match);
+        hmap_remove(&m->ofconn->monitors, &m->ofconn_node);
+        free(m);
+    }
+}
+
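+/* Queues a flow update for 'rule' on every monitor in 'mgr' whose criteria
+ * the rule satisfies, first advancing the rule's sequence numbers according
+ * to 'event'.  'reason' is used only for NXFME_DELETED events.  If the change
+ * was provoked by a request on 'abbrev_ofconn', that connection instead
+ * receives a single abbreviated update that carries 'abbrev_xid' (unless its
+ * monitors are paused). */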
+void
+ofmonitor_report(struct connmgr *mgr, struct rule *rule,
+                 enum nx_flow_update_event event,
+                 enum ofp_flow_removed_reason reason,
+                 const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    enum nx_flow_monitor_flags update;
+    struct ofconn *ofconn;
+
+    switch (event) {
+    case NXFME_ADDED:
+        update = NXFMF_ADD;
+        rule->add_seqno = rule->modify_seqno = monitor_seqno++;
+        break;
+
+    case NXFME_DELETED:
+        update = NXFMF_DELETE;
+        break;
+
+    case NXFME_MODIFIED:
+        update = NXFMF_MODIFY;
+        rule->modify_seqno = monitor_seqno++;
+        break;
+
+    case NXFME_ABBREV:
+    default:
+        OVS_NOT_REACHED();
+    }
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        enum nx_flow_monitor_flags flags = 0;
+        struct ofmonitor *m;
+
+        if (ofconn->monitor_paused) {
+            /* Only send NXFME_DELETED notifications for flows that were added
+             * before we paused. */
+            if (event != NXFME_DELETED
+                || rule->add_seqno > ofconn->monitor_paused) {
+                continue;
+            }
+        }
+
+        HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) {
+            if (m->flags & update
+                && (m->table_id == 0xff || m->table_id == rule->table_id)
+                && ofoperation_has_out_port(rule->pending, m->out_port)
+                && cls_rule_is_loose_match(&rule->cr, &m->match)) {
+                flags |= m->flags;
+            }
+        }
+
+        if (flags) {
+            if (list_is_empty(&ofconn->updates)) {
+                ofputil_start_flow_update(&ofconn->updates);
+                ofconn->sent_abbrev_update = false;
+            }
+
+            if (ofconn != abbrev_ofconn || ofconn->monitor_paused) {
+                struct ofputil_flow_update fu;
+                struct match match;
+
+                fu.event = event;
+                fu.reason = event == NXFME_DELETED ? reason : 0;
+                fu.table_id = rule->table_id;
+                fu.cookie = rule->flow_cookie;
+                minimatch_expand(&rule->cr.match, &match);
+                fu.match = &match;
+                fu.priority = rule->cr.priority;
+
+                ovs_mutex_lock(&rule->mutex);
+                fu.idle_timeout = rule->idle_timeout;
+                fu.hard_timeout = rule->hard_timeout;
+                ovs_mutex_unlock(&rule->mutex);
+
+                if (flags & NXFMF_ACTIONS) {
+                    struct rule_actions *actions = rule_get_actions(rule);
+                    fu.ofpacts = actions->ofpacts;
+                    fu.ofpacts_len = actions->ofpacts_len;
+                } else {
+                    fu.ofpacts = NULL;
+                    fu.ofpacts_len = 0;
+                }
+                ofputil_append_flow_update(&fu, &ofconn->updates);
+            } else if (!ofconn->sent_abbrev_update) {
+                struct ofputil_flow_update fu;
+
+                fu.event = NXFME_ABBREV;
+                fu.xid = abbrev_xid;
+                ofputil_append_flow_update(&fu, &ofconn->updates);
+
+                ofconn->sent_abbrev_update = true;
+            }
+        }
+    }
+}
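+
+/* Usage sketch (illustrative only; 'req' and its members are assumed names,
+ * not part of this change): code that modifies a rule might report the event
+ * and later flush the queued updates in a batch:
+ *
+ *     ofmonitor_report(mgr, rule, NXFME_MODIFIED, 0,
+ *                      req->ofconn, req->request->xid);
+ *     ...
+ *     ofmonitor_flush(mgr);
+ */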
+
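+/* Sends the queued flow update messages on every connection in 'mgr'.  If
+ * the messages outstanding on a connection's monitor counter then exceed
+ * 128 kB and the connection is not already paused, this pauses it and sends
+ * NXT_FLOW_MONITOR_PAUSED. */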
+void
+ofmonitor_flush(struct connmgr *mgr)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        struct ofpbuf *msg, *next;
+
+        LIST_FOR_EACH_SAFE (msg, next, list_node, &ofconn->updates) {
+            unsigned int n_bytes;
+
+            list_remove(&msg->list_node);
+            ofconn_send(ofconn, msg, ofconn->monitor_counter);
+            n_bytes = rconn_packet_counter_n_bytes(ofconn->monitor_counter);
+            if (!ofconn->monitor_paused && n_bytes > 128 * 1024) {
+                struct ofpbuf *pause;
+
+                COVERAGE_INC(ofmonitor_pause);
+                ofconn->monitor_paused = monitor_seqno++;
+                pause = ofpraw_alloc_xid(OFPRAW_NXT_FLOW_MONITOR_PAUSED,
+                                         OFP10_VERSION, htonl(0), 0);
+                ofconn_send(ofconn, pause, ofconn->monitor_counter);
+            }
+        }
+    }
+}
+
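+/* Sends 'ofconn' updates for the rules that changed while its monitors were
+ * paused, followed by NXT_FLOW_MONITOR_RESUMED, then marks the connection as
+ * no longer paused. */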
+static void
+ofmonitor_resume(struct ofconn *ofconn)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    struct rule_collection rules;
+    struct ofpbuf *resumed;
+    struct ofmonitor *m;
+    struct list msgs;
+
+    rule_collection_init(&rules);
+    HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) {
+        ofmonitor_collect_resume_rules(m, ofconn->monitor_paused, &rules);
+    }
+
+    list_init(&msgs);
+    ofmonitor_compose_refresh_updates(&rules, &msgs);
+
+    resumed = ofpraw_alloc_xid(OFPRAW_NXT_FLOW_MONITOR_RESUMED, OFP10_VERSION,
+                               htonl(0), 0);
+    list_push_back(&msgs, &resumed->list_node);
+    ofconn_send_replies(ofconn, &msgs);
+
+    ofconn->monitor_paused = 0;
+}
+
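+/* Returns true if 'ofconn' is paused and all of the messages charged to its
+ * monitor counter have been sent, so that it is safe to resume it. */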
+static bool
+ofmonitor_may_resume(const struct ofconn *ofconn)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    return (ofconn->monitor_paused != 0
+            && !rconn_packet_counter_n_packets(ofconn->monitor_counter));
+}
+
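+/* Resumes any connections in 'mgr' that are paused and ready to be
+ * resumed. */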
+static void
+ofmonitor_run(struct connmgr *mgr)
+{
+    struct ofconn *ofconn;
+
+    ovs_mutex_lock(&ofproto_mutex);
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        if (ofmonitor_may_resume(ofconn)) {
+            COVERAGE_INC(ofmonitor_resume);
+            ofmonitor_resume(ofconn);
+        }
+    }
+    ovs_mutex_unlock(&ofproto_mutex);
+}
+
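+/* Wakes up the poll loop immediately if any connection in 'mgr' is ready to
+ * be resumed. */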
+static void
+ofmonitor_wait(struct connmgr *mgr)
+{
+    struct ofconn *ofconn;
+
+    ovs_mutex_lock(&ofproto_mutex);
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        if (ofmonitor_may_resume(ofconn)) {
+            poll_immediate_wake();
+        }
+    }
+    ovs_mutex_unlock(&ofproto_mutex);
+}