connmgr: Always send full packet in packet_in when not buffering.
[sliver-openvswitch.git] / ofproto / connmgr.c
index 8a84da5..8bb96f0 100644 (file)
 VLOG_DEFINE_THIS_MODULE(connmgr);
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
-/* An OpenFlow connection. */
+/* An OpenFlow connection.
+ *
+ *
+ * Thread-safety
+ * =============
+ *
+ * 'ofproto_mutex' must be held whenever an ofconn is created or destroyed or,
+ * more or less equivalently, whenever an ofconn is added to or removed from a
+ * connmgr.  'ofproto_mutex' doesn't protect the data inside the ofconn, except
+ * as specifically noted below. */
 struct ofconn {
 /* Configuration that persists from one connection to the next. */
 
@@ -60,7 +69,7 @@ struct ofconn {
 /* State that should be cleared from one connection to the next. */
 
     /* OpenFlow state. */
-    enum nx_role role;           /* Role. */
+    enum ofp12_controller_role role;           /* Role. */
     enum ofputil_protocol protocol; /* Current protocol variant. */
     enum nx_packet_in_format packet_in_format; /* OFPT_PACKET_IN format. */
 
@@ -90,25 +99,50 @@ struct ofconn {
     uint32_t master_async_config[OAM_N_TYPES]; /* master, other */
     uint32_t slave_async_config[OAM_N_TYPES];  /* slave */
 
-    /* Flow monitors. */
-    struct hmap monitors;       /* Contains "struct ofmonitor"s. */
-    struct list updates;        /* List of "struct ofpbuf"s. */
-    bool sent_abbrev_update;    /* Does 'updates' contain NXFME_ABBREV? */
-    struct rconn_packet_counter *monitor_counter;
-    uint64_t monitor_paused;
+/* Flow monitors (e.g. NXST_FLOW_MONITOR). */
+
+    /* Configuration.  Contains "struct ofmonitor"s. */
+    struct hmap monitors OVS_GUARDED_BY(ofproto_mutex);
+
+    /* Flow control.
+     *
+     * When too many flow monitor notifications back up in the transmit buffer,
+     * we pause the transmission of further notifications.  These members track
+     * the flow control state.
+     *
+     * When notifications are flowing, 'monitor_paused' is 0.  When
+     * notifications are paused, 'monitor_paused' is the value of
+     * 'monitor_seqno' at the point we paused.
+     *
+     * 'monitor_counter' counts the OpenFlow messages and bytes currently in
+     * flight.  This value growing too large triggers pausing. */
+    uint64_t monitor_paused OVS_GUARDED_BY(ofproto_mutex);
+    struct rconn_packet_counter *monitor_counter OVS_GUARDED_BY(ofproto_mutex);
+
+    /* State of monitors for a single ongoing flow_mod.
+     *
+     * 'updates' is a list of "struct ofpbuf"s that contain
+     * NXST_FLOW_MONITOR_REPLY messages representing the changes made by the
+     * current flow_mod.
+     *
+     * When 'updates' is nonempty, 'sent_abbrev_update' is true if 'updates'
+     * contains an update event of type NXFME_ABBREV and false otherwise.. */
+    struct list updates OVS_GUARDED_BY(ofproto_mutex);
+    bool sent_abbrev_update OVS_GUARDED_BY(ofproto_mutex);
 };
 
 static struct ofconn *ofconn_create(struct connmgr *, struct rconn *,
-                                    enum ofconn_type, bool enable_async_msgs);
-static void ofconn_destroy(struct ofconn *);
-static void ofconn_flush(struct ofconn *);
+                                    enum ofconn_type, bool enable_async_msgs)
+    OVS_REQUIRES(ofproto_mutex);
+static void ofconn_destroy(struct ofconn *) OVS_REQUIRES(ofproto_mutex);
+static void ofconn_flush(struct ofconn *) OVS_REQUIRES(ofproto_mutex);
 
 static void ofconn_reconfigure(struct ofconn *,
                                const struct ofproto_controller *);
 
 static void ofconn_run(struct ofconn *,
                        bool (*handle_openflow)(struct ofconn *,
-                                               struct ofpbuf *ofp_msg));
+                                               const struct ofpbuf *ofp_msg));
 static void ofconn_wait(struct ofconn *, bool handling_openflow);
 
 static const char *ofconn_get_target(const struct ofconn *);
@@ -226,9 +260,12 @@ connmgr_destroy(struct connmgr *mgr)
         return;
     }
 
+    ovs_mutex_lock(&ofproto_mutex);
     LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) {
         ofconn_destroy(ofconn);
     }
+    ovs_mutex_unlock(&ofproto_mutex);
+
     hmap_destroy(&mgr->controllers);
 
     HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &mgr->services) {
@@ -269,7 +306,9 @@ connmgr_destroy(struct connmgr *mgr)
  * fail-open processing) are suppressed too. */
 void
 connmgr_run(struct connmgr *mgr,
-            bool (*handle_openflow)(struct ofconn *, struct ofpbuf *ofp_msg))
+            bool (*handle_openflow)(struct ofconn *,
+                                    const struct ofpbuf *ofp_msg))
+    OVS_EXCLUDED(ofproto_mutex)
 {
     struct ofconn *ofconn, *next_ofconn;
     struct ofservice *ofservice;
@@ -309,12 +348,15 @@ connmgr_run(struct connmgr *mgr,
             rconn_connect_unreliably(rconn, vconn, name);
             free(name);
 
+            ovs_mutex_lock(&ofproto_mutex);
             ofconn = ofconn_create(mgr, rconn, OFCONN_SERVICE,
                                    ofservice->enable_async_msgs);
+            ovs_mutex_unlock(&ofproto_mutex);
+
             ofconn_set_rate_limit(ofconn, ofservice->rate_limit,
                                   ofservice->burst_limit);
         } else if (retval != EAGAIN) {
-            VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
+            VLOG_WARN_RL(&rl, "accept failed (%s)", ovs_strerror(retval));
         }
     }
 
@@ -326,7 +368,7 @@ connmgr_run(struct connmgr *mgr,
         if (!retval) {
             add_snooper(mgr, vconn);
         } else if (retval != EAGAIN) {
-            VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
+            VLOG_WARN_RL(&rl, "accept failed (%s)", ovs_strerror(retval));
         }
     }
 }
@@ -408,7 +450,8 @@ connmgr_retry(struct connmgr *mgr)
 /* OpenFlow configuration. */
 
 static void add_controller(struct connmgr *, const char *target, uint8_t dscp,
-                           uint32_t allowed_versions);
+                           uint32_t allowed_versions)
+    OVS_REQUIRES(ofproto_mutex);
 static struct ofconn *find_controller_by_target(struct connmgr *,
                                                 const char *target);
 static void update_fail_open(struct connmgr *);
@@ -500,6 +543,7 @@ void
 connmgr_set_controllers(struct connmgr *mgr,
                         const struct ofproto_controller *controllers,
                         size_t n_controllers, uint32_t allowed_versions)
+    OVS_EXCLUDED(ofproto_mutex)
 {
     bool had_controllers = connmgr_has_controllers(mgr);
     struct shash new_controllers;
@@ -507,6 +551,10 @@ connmgr_set_controllers(struct connmgr *mgr,
     struct ofservice *ofservice, *next_ofservice;
     size_t i;
 
+    /* Required to add and remove ofconns.  This could probably be narrowed to
+     * cover a smaller amount of code, if that yielded some benefit. */
+    ovs_mutex_lock(&ofproto_mutex);
+
     /* Create newly configured controllers and services.
      * Create a name to ofproto_controller mapping in 'new_controllers'. */
     shash_init(&new_controllers);
@@ -594,6 +642,7 @@ connmgr_set_controllers(struct connmgr *mgr,
     if (had_controllers != connmgr_has_controllers(mgr)) {
         ofproto_flush_flows(mgr->ofproto);
     }
+    ovs_mutex_unlock(&ofproto_mutex);
 }
 
 /* Drops the connections between 'mgr' and all of its primary and secondary
@@ -641,6 +690,7 @@ connmgr_has_snoops(const struct connmgr *mgr)
 static void
 add_controller(struct connmgr *mgr, const char *target, uint8_t dscp,
                uint32_t allowed_versions)
+    OVS_REQUIRES(ofproto_mutex)
 {
     char *name = ofconn_make_name(mgr, target);
     struct ofconn *ofconn;
@@ -690,10 +740,9 @@ update_in_band_remotes(struct connmgr *mgr)
             continue;
         }
 
-        if (stream_parse_target_with_default_ports(target,
-                                                   OFP_TCP_PORT,
-                                                   OFP_SSL_PORT,
-                                                   sin)) {
+        if (stream_parse_target_with_default_port(target,
+                                                  OFP_OLD_PORT,
+                                                  sin)) {
             n_addrs++;
         }
     }
@@ -757,7 +806,7 @@ set_pvconns(struct pvconn ***pvconnsp, size_t *n_pvconnsp,
         if (!error) {
             pvconns[n_pvconns++] = pvconn;
         } else {
-            VLOG_ERR("failed to listen on %s: %s", name, strerror(error));
+            VLOG_ERR("failed to listen on %s: %s", name, ovs_strerror(error));
             if (!retval) {
                 retval = error;
             }
@@ -777,12 +826,13 @@ static int
 snoop_preference(const struct ofconn *ofconn)
 {
     switch (ofconn->role) {
-    case NX_ROLE_MASTER:
+    case OFPCR12_ROLE_MASTER:
         return 3;
-    case NX_ROLE_OTHER:
+    case OFPCR12_ROLE_EQUAL:
         return 2;
-    case NX_ROLE_SLAVE:
+    case OFPCR12_ROLE_SLAVE:
         return 1;
+    case OFPCR12_ROLE_NOCHANGE:
     default:
         /* Shouldn't happen. */
         return 0;
@@ -822,6 +872,17 @@ ofconn_get_type(const struct ofconn *ofconn)
     return ofconn->type;
 }
 
+/* If a master election id is defined, stores it into '*idp' and returns
+ * true.  Otherwise, stores UINT64_MAX into '*idp' and returns false. */
+bool
+ofconn_get_master_election_id(const struct ofconn *ofconn, uint64_t *idp)
+{
+    *idp = (ofconn->connmgr->master_election_id_defined
+            ? ofconn->connmgr->master_election_id
+            : UINT64_MAX);
+    return ofconn->connmgr->master_election_id_defined;
+}
+
 /* Sets the master election id.
  *
  * Returns true if successful, false if the id is stale
@@ -844,24 +905,24 @@ ofconn_set_master_election_id(struct ofconn *ofconn, uint64_t id)
 
 /* Returns the role configured for 'ofconn'.
  *
- * The default role, if no other role has been set, is NX_ROLE_OTHER. */
-enum nx_role
+ * The default role, if no other role has been set, is OFPCR12_ROLE_EQUAL. */
+enum ofp12_controller_role
 ofconn_get_role(const struct ofconn *ofconn)
 {
     return ofconn->role;
 }
 
-/* Changes 'ofconn''s role to 'role'.  If 'role' is NX_ROLE_MASTER then any
- * existing master is demoted to a slave. */
+/* Changes 'ofconn''s role to 'role'.  If 'role' is OFPCR12_ROLE_MASTER then
+ * any existing master is demoted to a slave. */
 void
-ofconn_set_role(struct ofconn *ofconn, enum nx_role role)
+ofconn_set_role(struct ofconn *ofconn, enum ofp12_controller_role role)
 {
-    if (role == NX_ROLE_MASTER) {
+    if (role == OFPCR12_ROLE_MASTER) {
         struct ofconn *other;
 
         HMAP_FOR_EACH (other, hmap_node, &ofconn->connmgr->controllers) {
-            if (other->role == NX_ROLE_MASTER) {
-                other->role = NX_ROLE_SLAVE;
+            if (other->role == OFPCR12_ROLE_MASTER) {
+                other->role = OFPCR12_ROLE_SLAVE;
             }
         }
     }
@@ -972,6 +1033,15 @@ ofconn_set_async_config(struct ofconn *ofconn,
     memcpy(ofconn->slave_async_config, slave_masks, size);
 }
 
+void
+ofconn_get_async_config(struct ofconn *ofconn,
+                        uint32_t *master_masks, uint32_t *slave_masks)
+{
+    size_t size = sizeof ofconn->master_async_config;
+    memcpy(master_masks, ofconn->master_async_config, size);
+    memcpy(slave_masks, ofconn->slave_async_config, size);
+}
+
 /* Sends 'msg' on 'ofconn', accounting it as a reply.  (If there is a
  * sufficient number of OpenFlow replies in-flight on a single ofconn, then the
  * connmgr will stop accepting new OpenFlow requests on that ofconn until the
@@ -1026,7 +1096,7 @@ ofconn_send_error(const struct ofconn *ofconn,
 /* Same as pktbuf_retrieve(), using the pktbuf owned by 'ofconn'. */
 enum ofperr
 ofconn_pktbuf_retrieve(struct ofconn *ofconn, uint32_t id,
-                       struct ofpbuf **bufferp, uint16_t *in_port)
+                       struct ofpbuf **bufferp, ofp_port_t *in_port)
 {
     return pktbuf_retrieve(ofconn->pktbuf, id, bufferp, in_port);
 }
@@ -1088,11 +1158,12 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type,
  * connection to the next. */
 static void
 ofconn_flush(struct ofconn *ofconn)
+    OVS_REQUIRES(ofproto_mutex)
 {
     struct ofmonitor *monitor, *next_monitor;
     int i;
 
-    ofconn->role = NX_ROLE_OTHER;
+    ofconn->role = OFPCR12_ROLE_EQUAL;
     ofconn_set_protocol(ofconn, OFPUTIL_P_NONE);
     ofconn->packet_in_format = NXPIF_OPENFLOW10;
 
@@ -1170,6 +1241,7 @@ ofconn_flush(struct ofconn *ofconn)
 
 static void
 ofconn_destroy(struct ofconn *ofconn)
+    OVS_REQUIRES(ofproto_mutex)
 {
     ofconn_flush(ofconn);
 
@@ -1177,6 +1249,7 @@ ofconn_destroy(struct ofconn *ofconn)
         hmap_remove(&ofconn->connmgr->controllers, &ofconn->hmap_node);
     }
 
+    hmap_destroy(&ofconn->monitors);
     list_remove(&ofconn->node);
     rconn_destroy(ofconn->rconn);
     rconn_packet_counter_destroy(ofconn->packet_in_counter);
@@ -1215,13 +1288,14 @@ ofconn_reconfigure(struct ofconn *ofconn, const struct ofproto_controller *c)
 static bool
 ofconn_may_recv(const struct ofconn *ofconn)
 {
-    int count = ofconn->reply_counter->n_packets;
+    int count = rconn_packet_counter_n_packets(ofconn->reply_counter);
     return (!ofconn->blocked || ofconn->retry) && count < OFCONN_REPLY_MAX;
 }
 
 static void
 ofconn_run(struct ofconn *ofconn,
-           bool (*handle_openflow)(struct ofconn *, struct ofpbuf *ofp_msg))
+           bool (*handle_openflow)(struct ofconn *,
+                                   const struct ofpbuf *ofp_msg))
 {
     struct connmgr *mgr = ofconn->connmgr;
     size_t i;
@@ -1257,11 +1331,13 @@ ofconn_run(struct ofconn *ofconn,
         }
     }
 
+    ovs_mutex_lock(&ofproto_mutex);
     if (!rconn_is_alive(ofconn->rconn)) {
         ofconn_destroy(ofconn);
     } else if (!rconn_is_connected(ofconn->rconn)) {
         ofconn_flush(ofconn);
     }
+    ovs_mutex_unlock(&ofproto_mutex);
 }
 
 static void
@@ -1290,8 +1366,8 @@ ofconn_receives_async_msg(const struct ofconn *ofconn,
 {
     const uint32_t *async_config;
 
-    assert(reason < 32);
-    assert((unsigned int) type < OAM_N_TYPES);
+    ovs_assert(reason < 32);
+    ovs_assert((unsigned int) type < OAM_N_TYPES);
 
     if (ofconn_get_protocol(ofconn) == OFPUTIL_P_NONE
         || !rconn_is_connected(ofconn->rconn)) {
@@ -1307,7 +1383,7 @@ ofconn_receives_async_msg(const struct ofconn *ofconn,
         return false;
     }
 
-    async_config = (ofconn->role == NX_ROLE_SLAVE
+    async_config = (ofconn->role == OFPCR12_ROLE_SLAVE
                     ? ofconn->slave_async_config
                     : ofconn->master_async_config);
     if (!(async_config[type] & (1u << reason))) {
@@ -1440,11 +1516,20 @@ static void
 schedule_packet_in(struct ofconn *ofconn, struct ofputil_packet_in pin)
 {
     struct connmgr *mgr = ofconn->connmgr;
+    uint16_t controller_max_len;
 
     pin.total_len = pin.packet_len;
 
-    /* Get OpenFlow buffer_id. */
     if (pin.reason == OFPR_ACTION) {
+        controller_max_len = pin.send_len;  /* max_len */
+    } else {
+        controller_max_len = ofconn->miss_send_len;
+    }
+
+    /* Get OpenFlow buffer_id.
+     * For OpenFlow 1.2+, OFPCML_NO_BUFFER (== UINT16_MAX) specifies
+     * unbuffered.  This behaviour doesn't violate prior versions, too. */
+    if (controller_max_len == UINT16_MAX) {
         pin.buffer_id = UINT32_MAX;
     } else if (mgr->fail_open && fail_open_is_active(mgr->fail_open)) {
         pin.buffer_id = pktbuf_get_null();
@@ -1455,15 +1540,13 @@ schedule_packet_in(struct ofconn *ofconn, struct ofputil_packet_in pin)
                                     pin.fmd.in_port);
     }
 
-    /* Figure out how much of the packet to send. */
-    if (pin.reason == OFPR_NO_MATCH) {
+    /* Figure out how much of the packet to send.
+     * If not buffered, send the entire packet.  Otherwise, depending on
+     * the reason of packet-in, send what requested by the controller. */
+    if (pin.buffer_id == UINT32_MAX) {
         pin.send_len = pin.packet_len;
     } else {
-        /* Caller should have initialized 'send_len' to 'max_len' specified in
-         * output action. */
-    }
-    if (pin.buffer_id != UINT32_MAX) {
-        pin.send_len = MIN(pin.send_len, ofconn->miss_send_len);
+        pin.send_len = MIN(pin.packet_len, controller_max_len);
     }
 
     /* Make OFPT_PACKET_IN and hand over to packet scheduler.  It might
@@ -1630,18 +1713,9 @@ any_extras_changed(const struct connmgr *mgr,
 /* In-band implementation. */
 
 bool
-connmgr_msg_in_hook(struct connmgr *mgr, const struct flow *flow,
-                    const struct ofpbuf *packet)
+connmgr_has_in_band(struct connmgr *mgr)
 {
-    return mgr->in_band && in_band_msg_in_hook(mgr->in_band, flow, packet);
-}
-
-bool
-connmgr_may_set_up_flow(struct connmgr *mgr, const struct flow *flow,
-                        const struct nlattr *odp_actions,
-                        size_t actions_len)
-{
-    return !mgr->in_band || in_band_rule_check(flow, odp_actions, actions_len);
+    return mgr->in_band != NULL;
 }
 \f
 /* Fail-open and in-band implementation. */
@@ -1652,6 +1726,7 @@ connmgr_may_set_up_flow(struct connmgr *mgr, const struct flow *flow,
  * In-band control has more sophisticated code that manages flows itself. */
 void
 connmgr_flushed(struct connmgr *mgr)
+    OVS_EXCLUDED(ofproto_mutex)
 {
     if (mgr->fail_open) {
         fail_open_flushed(mgr->fail_open);
@@ -1766,6 +1841,7 @@ COVERAGE_DEFINE(ofmonitor_resume);
 enum ofperr
 ofmonitor_create(const struct ofputil_flow_monitor_request *request,
                  struct ofconn *ofconn, struct ofmonitor **monitorp)
+    OVS_REQUIRES(ofproto_mutex)
 {
     struct ofmonitor *m;
 
@@ -1791,6 +1867,7 @@ ofmonitor_create(const struct ofputil_flow_monitor_request *request,
 
 struct ofmonitor *
 ofmonitor_lookup(struct ofconn *ofconn, uint32_t id)
+    OVS_REQUIRES(ofproto_mutex)
 {
     struct ofmonitor *m;
 
@@ -1805,6 +1882,7 @@ ofmonitor_lookup(struct ofconn *ofconn, uint32_t id)
 
 void
 ofmonitor_destroy(struct ofmonitor *m)
+    OVS_REQUIRES(ofproto_mutex)
 {
     if (m) {
         minimatch_destroy(&m->match);
@@ -1818,6 +1896,7 @@ ofmonitor_report(struct connmgr *mgr, struct rule *rule,
                  enum nx_flow_update_event event,
                  enum ofp_flow_removed_reason reason,
                  const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid)
+    OVS_REQUIRES(ofproto_mutex)
 {
     enum nx_flow_monitor_flags update;
     struct ofconn *ofconn;
@@ -1876,16 +1955,20 @@ ofmonitor_report(struct connmgr *mgr, struct rule *rule,
 
                 fu.event = event;
                 fu.reason = event == NXFME_DELETED ? reason : 0;
-                fu.idle_timeout = rule->idle_timeout;
-                fu.hard_timeout = rule->hard_timeout;
                 fu.table_id = rule->table_id;
                 fu.cookie = rule->flow_cookie;
                 minimatch_expand(&rule->cr.match, &match);
                 fu.match = &match;
                 fu.priority = rule->cr.priority;
+
+                ovs_mutex_lock(&rule->mutex);
+                fu.idle_timeout = rule->idle_timeout;
+                fu.hard_timeout = rule->hard_timeout;
+                ovs_mutex_unlock(&rule->mutex);
+
                 if (flags & NXFMF_ACTIONS) {
-                    fu.ofpacts = rule->ofpacts;
-                    fu.ofpacts_len = rule->ofpacts_len;
+                    fu.ofpacts = rule->actions->ofpacts;
+                    fu.ofpacts_len = rule->actions->ofpacts_len;
                 } else {
                     fu.ofpacts = NULL;
                     fu.ofpacts_len = 0;
@@ -1906,6 +1989,7 @@ ofmonitor_report(struct connmgr *mgr, struct rule *rule,
 
 void
 ofmonitor_flush(struct connmgr *mgr)
+    OVS_REQUIRES(ofproto_mutex)
 {
     struct ofconn *ofconn;
 
@@ -1913,10 +1997,12 @@ ofmonitor_flush(struct connmgr *mgr)
         struct ofpbuf *msg, *next;
 
         LIST_FOR_EACH_SAFE (msg, next, list_node, &ofconn->updates) {
+            unsigned int n_bytes;
+
             list_remove(&msg->list_node);
             ofconn_send(ofconn, msg, ofconn->monitor_counter);
-            if (!ofconn->monitor_paused
-                && ofconn->monitor_counter->n_bytes > 128 * 1024) {
+            n_bytes = rconn_packet_counter_n_bytes(ofconn->monitor_counter);
+            if (!ofconn->monitor_paused && n_bytes > 128 * 1024) {
                 struct ofpbuf *pause;
 
                 COVERAGE_INC(ofmonitor_pause);
@@ -1931,13 +2017,14 @@ ofmonitor_flush(struct connmgr *mgr)
 
 static void
 ofmonitor_resume(struct ofconn *ofconn)
+    OVS_REQUIRES(ofproto_mutex)
 {
+    struct rule_collection rules;
     struct ofpbuf *resumed;
     struct ofmonitor *m;
-    struct list rules;
     struct list msgs;
 
-    list_init(&rules);
+    rule_collection_init(&rules);
     HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) {
         ofmonitor_collect_resume_rules(m, ofconn->monitor_paused, &rules);
     }
@@ -1953,17 +2040,27 @@ ofmonitor_resume(struct ofconn *ofconn)
     ofconn->monitor_paused = 0;
 }
 
+static bool
+ofmonitor_may_resume(const struct ofconn *ofconn)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    return (ofconn->monitor_paused != 0
+            && !rconn_packet_counter_n_packets(ofconn->monitor_counter));
+}
+
 static void
 ofmonitor_run(struct connmgr *mgr)
 {
     struct ofconn *ofconn;
 
+    ovs_mutex_lock(&ofproto_mutex);
     LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
-        if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+        if (ofmonitor_may_resume(ofconn)) {
             COVERAGE_INC(ofmonitor_resume);
             ofmonitor_resume(ofconn);
         }
     }
+    ovs_mutex_unlock(&ofproto_mutex);
 }
 
 static void
@@ -1971,9 +2068,11 @@ ofmonitor_wait(struct connmgr *mgr)
 {
     struct ofconn *ofconn;
 
+    ovs_mutex_lock(&ofproto_mutex);
     LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
-        if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+        if (ofmonitor_may_resume(ofconn)) {
             poll_immediate_wake();
         }
     }
+    ovs_mutex_unlock(&ofproto_mutex);
 }