VLOG_DEFINE_THIS_MODULE(connmgr);
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
-/* An OpenFlow connection. */
+/* An OpenFlow connection.
+ *
+ *
+ * Thread-safety
+ * =============
+ *
+ * 'ofproto_mutex' must be held whenever an ofconn is created or destroyed or,
+ * more or less equivalently, whenever an ofconn is added to or removed from a
+ * connmgr. 'ofproto_mutex' doesn't protect the data inside the ofconn. */
struct ofconn {
/* Configuration that persists from one connection to the next. */
};
static struct ofconn *ofconn_create(struct connmgr *, struct rconn *,
- enum ofconn_type, bool enable_async_msgs);
-static void ofconn_destroy(struct ofconn *);
-static void ofconn_flush(struct ofconn *);
+ enum ofconn_type, bool enable_async_msgs)
+ OVS_REQUIRES(ofproto_mutex);
+static void ofconn_destroy(struct ofconn *) OVS_REQUIRES(ofproto_mutex);
+static void ofconn_flush(struct ofconn *) OVS_REQUIRES(ofproto_mutex);
static void ofconn_reconfigure(struct ofconn *,
const struct ofproto_controller *);
return;
}
+ ovs_mutex_lock(&ofproto_mutex);
LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) {
ofconn_destroy(ofconn);
}
+ ovs_mutex_unlock(&ofproto_mutex);
+
hmap_destroy(&mgr->controllers);
HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &mgr->services) {
connmgr_run(struct connmgr *mgr,
bool (*handle_openflow)(struct ofconn *,
const struct ofpbuf *ofp_msg))
+ OVS_EXCLUDED(ofproto_mutex)
{
struct ofconn *ofconn, *next_ofconn;
struct ofservice *ofservice;
rconn_connect_unreliably(rconn, vconn, name);
free(name);
+ ovs_mutex_lock(&ofproto_mutex);
ofconn = ofconn_create(mgr, rconn, OFCONN_SERVICE,
ofservice->enable_async_msgs);
+ ovs_mutex_unlock(&ofproto_mutex);
+
ofconn_set_rate_limit(ofconn, ofservice->rate_limit,
ofservice->burst_limit);
} else if (retval != EAGAIN) {
- VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
+ VLOG_WARN_RL(&rl, "accept failed (%s)", ovs_strerror(retval));
}
}
if (!retval) {
add_snooper(mgr, vconn);
} else if (retval != EAGAIN) {
- VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
+ VLOG_WARN_RL(&rl, "accept failed (%s)", ovs_strerror(retval));
}
}
}
/* OpenFlow configuration. */
static void add_controller(struct connmgr *, const char *target, uint8_t dscp,
- uint32_t allowed_versions);
+ uint32_t allowed_versions)
+ OVS_REQUIRES(ofproto_mutex);
static struct ofconn *find_controller_by_target(struct connmgr *,
const char *target);
static void update_fail_open(struct connmgr *);
connmgr_set_controllers(struct connmgr *mgr,
const struct ofproto_controller *controllers,
size_t n_controllers, uint32_t allowed_versions)
+ OVS_EXCLUDED(ofproto_mutex)
{
bool had_controllers = connmgr_has_controllers(mgr);
struct shash new_controllers;
struct ofservice *ofservice, *next_ofservice;
size_t i;
+ /* Required to add and remove ofconns. This could probably be narrowed to
+ * cover a smaller amount of code, if that yielded some benefit. */
+ ovs_mutex_lock(&ofproto_mutex);
+
/* Create newly configured controllers and services.
* Create a name to ofproto_controller mapping in 'new_controllers'. */
shash_init(&new_controllers);
if (had_controllers != connmgr_has_controllers(mgr)) {
ofproto_flush_flows(mgr->ofproto);
}
+ ovs_mutex_unlock(&ofproto_mutex);
}
/* Drops the connections between 'mgr' and all of its primary and secondary
static void
add_controller(struct connmgr *mgr, const char *target, uint8_t dscp,
uint32_t allowed_versions)
+ OVS_REQUIRES(ofproto_mutex)
{
char *name = ofconn_make_name(mgr, target);
struct ofconn *ofconn;
continue;
}
- if (stream_parse_target_with_default_ports(target,
- OFP_TCP_PORT,
- OFP_SSL_PORT,
- sin)) {
+ if (stream_parse_target_with_default_port(target,
+ OFP_OLD_PORT,
+ sin)) {
n_addrs++;
}
}
if (!error) {
pvconns[n_pvconns++] = pvconn;
} else {
- VLOG_ERR("failed to listen on %s: %s", name, strerror(error));
+ VLOG_ERR("failed to listen on %s: %s", name, ovs_strerror(error));
if (!retval) {
retval = error;
}
memcpy(ofconn->slave_async_config, slave_masks, size);
}
+void
+ofconn_get_async_config(struct ofconn *ofconn,
+ uint32_t *master_masks, uint32_t *slave_masks)
+{
+ size_t size = sizeof ofconn->master_async_config;
+ memcpy(master_masks, ofconn->master_async_config, size);
+ memcpy(slave_masks, ofconn->slave_async_config, size);
+}
+
/* Sends 'msg' on 'ofconn', accounting it as a reply. (If there is a
* sufficient number of OpenFlow replies in-flight on a single ofconn, then the
* connmgr will stop accepting new OpenFlow requests on that ofconn until the
* connection to the next. */
static void
ofconn_flush(struct ofconn *ofconn)
+ OVS_REQUIRES(ofproto_mutex)
{
struct ofmonitor *monitor, *next_monitor;
int i;
static void
ofconn_destroy(struct ofconn *ofconn)
+ OVS_REQUIRES(ofproto_mutex)
{
ofconn_flush(ofconn);
static bool
ofconn_may_recv(const struct ofconn *ofconn)
{
- int count = ofconn->reply_counter->n_packets;
+ int count = rconn_packet_counter_n_packets(ofconn->reply_counter);
return (!ofconn->blocked || ofconn->retry) && count < OFCONN_REPLY_MAX;
}
}
}
+ ovs_mutex_lock(&ofproto_mutex);
if (!rconn_is_alive(ofconn->rconn)) {
ofconn_destroy(ofconn);
} else if (!rconn_is_connected(ofconn->rconn)) {
ofconn_flush(ofconn);
}
+ ovs_mutex_unlock(&ofproto_mutex);
}
static void
/* In-band implementation. */
bool
-connmgr_must_output_local(struct connmgr *mgr, const struct flow *flow,
- odp_port_t local_odp_port,
- const struct nlattr *odp_actions,
- size_t actions_len)
+connmgr_has_in_band(struct connmgr *mgr)
{
- return !mgr->in_band || in_band_rule_check(flow, local_odp_port,
- odp_actions, actions_len);
+ return mgr->in_band != NULL;
}
\f
/* Fail-open and in-band implementation. */
* In-band control has more sophisticated code that manages flows itself. */
void
connmgr_flushed(struct connmgr *mgr)
+ OVS_EXCLUDED(ofproto_mutex)
{
if (mgr->fail_open) {
fail_open_flushed(mgr->fail_open);
enum nx_flow_update_event event,
enum ofp_flow_removed_reason reason,
const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid)
+ OVS_REQUIRES(ofproto_mutex)
{
enum nx_flow_monitor_flags update;
struct ofconn *ofconn;
fu.event = event;
fu.reason = event == NXFME_DELETED ? reason : 0;
- fu.idle_timeout = rule->idle_timeout;
- fu.hard_timeout = rule->hard_timeout;
fu.table_id = rule->table_id;
fu.cookie = rule->flow_cookie;
minimatch_expand(&rule->cr.match, &match);
fu.match = &match;
fu.priority = rule->cr.priority;
+
+ ovs_mutex_lock(&rule->mutex);
+ fu.idle_timeout = rule->idle_timeout;
+ fu.hard_timeout = rule->hard_timeout;
+ ovs_mutex_unlock(&rule->mutex);
+
if (flags & NXFMF_ACTIONS) {
- fu.ofpacts = rule->ofpacts;
- fu.ofpacts_len = rule->ofpacts_len;
+ fu.ofpacts = rule->actions->ofpacts;
+ fu.ofpacts_len = rule->actions->ofpacts_len;
} else {
fu.ofpacts = NULL;
fu.ofpacts_len = 0;
struct ofpbuf *msg, *next;
LIST_FOR_EACH_SAFE (msg, next, list_node, &ofconn->updates) {
+ unsigned int n_bytes;
+
list_remove(&msg->list_node);
ofconn_send(ofconn, msg, ofconn->monitor_counter);
- if (!ofconn->monitor_paused
- && ofconn->monitor_counter->n_bytes > 128 * 1024) {
+ n_bytes = rconn_packet_counter_n_bytes(ofconn->monitor_counter);
+ if (!ofconn->monitor_paused && n_bytes > 128 * 1024) {
struct ofpbuf *pause;
COVERAGE_INC(ofmonitor_pause);
static void
ofmonitor_resume(struct ofconn *ofconn)
{
+ struct rule_collection rules;
struct ofpbuf *resumed;
struct ofmonitor *m;
- struct list rules;
struct list msgs;
- list_init(&rules);
+ rule_collection_init(&rules);
HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) {
ofmonitor_collect_resume_rules(m, ofconn->monitor_paused, &rules);
}
struct ofconn *ofconn;
LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
- if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+ if (ofconn->monitor_paused
+ && !rconn_packet_counter_n_packets(ofconn->monitor_counter)) {
COVERAGE_INC(ofmonitor_resume);
ofmonitor_resume(ofconn);
}
struct ofconn *ofconn;
LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
- if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+ if (ofconn->monitor_paused
+ && !rconn_packet_counter_n_packets(ofconn->monitor_counter)) {
poll_immediate_wake();
}
}