X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=ofproto%2Fconnmgr.c;h=8bb96f028191ae5f2dde66e90f498e7d53d14fe5;hb=e441a806a0387487080ca16176ffdee7a75b3d1e;hp=21727898c7975883d47977bce1d37c9b17757110;hpb=6f00e29b8b3f6996eceffc57b47bf707589d8335;p=sliver-openvswitch.git diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c index 21727898c..8bb96f028 100644 --- a/ofproto/connmgr.c +++ b/ofproto/connmgr.c @@ -44,7 +44,16 @@ VLOG_DEFINE_THIS_MODULE(connmgr); static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); -/* An OpenFlow connection. */ +/* An OpenFlow connection. + * + * + * Thread-safety + * ============= + * + * 'ofproto_mutex' must be held whenever an ofconn is created or destroyed or, + * more or less equivalently, whenever an ofconn is added to or removed from a + * connmgr. 'ofproto_mutex' doesn't protect the data inside the ofconn, except + * as specifically noted below. */ struct ofconn { /* Configuration that persists from one connection to the next. */ @@ -90,18 +99,43 @@ struct ofconn { uint32_t master_async_config[OAM_N_TYPES]; /* master, other */ uint32_t slave_async_config[OAM_N_TYPES]; /* slave */ - /* Flow monitors. */ - struct hmap monitors; /* Contains "struct ofmonitor"s. */ - struct list updates; /* List of "struct ofpbuf"s. */ - bool sent_abbrev_update; /* Does 'updates' contain NXFME_ABBREV? */ - struct rconn_packet_counter *monitor_counter; - uint64_t monitor_paused; +/* Flow monitors (e.g. NXST_FLOW_MONITOR). */ + + /* Configuration. Contains "struct ofmonitor"s. */ + struct hmap monitors OVS_GUARDED_BY(ofproto_mutex); + + /* Flow control. + * + * When too many flow monitor notifications back up in the transmit buffer, + * we pause the transmission of further notifications. These members track + * the flow control state. + * + * When notifications are flowing, 'monitor_paused' is 0. When + * notifications are paused, 'monitor_paused' is the value of + * 'monitor_seqno' at the point we paused. + * + * 'monitor_counter' counts the OpenFlow messages and bytes currently in + * flight. This value growing too large triggers pausing. */ + uint64_t monitor_paused OVS_GUARDED_BY(ofproto_mutex); + struct rconn_packet_counter *monitor_counter OVS_GUARDED_BY(ofproto_mutex); + + /* State of monitors for a single ongoing flow_mod. + * + * 'updates' is a list of "struct ofpbuf"s that contain + * NXST_FLOW_MONITOR_REPLY messages representing the changes made by the + * current flow_mod. + * + * When 'updates' is nonempty, 'sent_abbrev_update' is true if 'updates' + * contains an update event of type NXFME_ABBREV and false otherwise.. */ + struct list updates OVS_GUARDED_BY(ofproto_mutex); + bool sent_abbrev_update OVS_GUARDED_BY(ofproto_mutex); }; static struct ofconn *ofconn_create(struct connmgr *, struct rconn *, - enum ofconn_type, bool enable_async_msgs); -static void ofconn_destroy(struct ofconn *); -static void ofconn_flush(struct ofconn *); + enum ofconn_type, bool enable_async_msgs) + OVS_REQUIRES(ofproto_mutex); +static void ofconn_destroy(struct ofconn *) OVS_REQUIRES(ofproto_mutex); +static void ofconn_flush(struct ofconn *) OVS_REQUIRES(ofproto_mutex); static void ofconn_reconfigure(struct ofconn *, const struct ofproto_controller *); @@ -226,9 +260,12 @@ connmgr_destroy(struct connmgr *mgr) return; } + ovs_mutex_lock(&ofproto_mutex); LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) { ofconn_destroy(ofconn); } + ovs_mutex_unlock(&ofproto_mutex); + hmap_destroy(&mgr->controllers); HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &mgr->services) { @@ -271,6 +308,7 @@ void connmgr_run(struct connmgr *mgr, bool (*handle_openflow)(struct ofconn *, const struct ofpbuf *ofp_msg)) + OVS_EXCLUDED(ofproto_mutex) { struct ofconn *ofconn, *next_ofconn; struct ofservice *ofservice; @@ -310,8 +348,11 @@ connmgr_run(struct connmgr *mgr, rconn_connect_unreliably(rconn, vconn, name); free(name); + ovs_mutex_lock(&ofproto_mutex); ofconn = ofconn_create(mgr, rconn, OFCONN_SERVICE, ofservice->enable_async_msgs); + ovs_mutex_unlock(&ofproto_mutex); + ofconn_set_rate_limit(ofconn, ofservice->rate_limit, ofservice->burst_limit); } else if (retval != EAGAIN) { @@ -409,7 +450,8 @@ connmgr_retry(struct connmgr *mgr) /* OpenFlow configuration. */ static void add_controller(struct connmgr *, const char *target, uint8_t dscp, - uint32_t allowed_versions); + uint32_t allowed_versions) + OVS_REQUIRES(ofproto_mutex); static struct ofconn *find_controller_by_target(struct connmgr *, const char *target); static void update_fail_open(struct connmgr *); @@ -501,6 +543,7 @@ void connmgr_set_controllers(struct connmgr *mgr, const struct ofproto_controller *controllers, size_t n_controllers, uint32_t allowed_versions) + OVS_EXCLUDED(ofproto_mutex) { bool had_controllers = connmgr_has_controllers(mgr); struct shash new_controllers; @@ -508,6 +551,10 @@ connmgr_set_controllers(struct connmgr *mgr, struct ofservice *ofservice, *next_ofservice; size_t i; + /* Required to add and remove ofconns. This could probably be narrowed to + * cover a smaller amount of code, if that yielded some benefit. */ + ovs_mutex_lock(&ofproto_mutex); + /* Create newly configured controllers and services. * Create a name to ofproto_controller mapping in 'new_controllers'. */ shash_init(&new_controllers); @@ -595,6 +642,7 @@ connmgr_set_controllers(struct connmgr *mgr, if (had_controllers != connmgr_has_controllers(mgr)) { ofproto_flush_flows(mgr->ofproto); } + ovs_mutex_unlock(&ofproto_mutex); } /* Drops the connections between 'mgr' and all of its primary and secondary @@ -642,6 +690,7 @@ connmgr_has_snoops(const struct connmgr *mgr) static void add_controller(struct connmgr *mgr, const char *target, uint8_t dscp, uint32_t allowed_versions) + OVS_REQUIRES(ofproto_mutex) { char *name = ofconn_make_name(mgr, target); struct ofconn *ofconn; @@ -691,10 +740,9 @@ update_in_band_remotes(struct connmgr *mgr) continue; } - if (stream_parse_target_with_default_ports(target, - OFP_TCP_PORT, - OFP_SSL_PORT, - sin)) { + if (stream_parse_target_with_default_port(target, + OFP_OLD_PORT, + sin)) { n_addrs++; } } @@ -1110,6 +1158,7 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type, * connection to the next. */ static void ofconn_flush(struct ofconn *ofconn) + OVS_REQUIRES(ofproto_mutex) { struct ofmonitor *monitor, *next_monitor; int i; @@ -1192,6 +1241,7 @@ ofconn_flush(struct ofconn *ofconn) static void ofconn_destroy(struct ofconn *ofconn) + OVS_REQUIRES(ofproto_mutex) { ofconn_flush(ofconn); @@ -1238,7 +1288,7 @@ ofconn_reconfigure(struct ofconn *ofconn, const struct ofproto_controller *c) static bool ofconn_may_recv(const struct ofconn *ofconn) { - int count = ofconn->reply_counter->n_packets; + int count = rconn_packet_counter_n_packets(ofconn->reply_counter); return (!ofconn->blocked || ofconn->retry) && count < OFCONN_REPLY_MAX; } @@ -1281,11 +1331,13 @@ ofconn_run(struct ofconn *ofconn, } } + ovs_mutex_lock(&ofproto_mutex); if (!rconn_is_alive(ofconn->rconn)) { ofconn_destroy(ofconn); } else if (!rconn_is_connected(ofconn->rconn)) { ofconn_flush(ofconn); } + ovs_mutex_unlock(&ofproto_mutex); } static void @@ -1464,11 +1516,20 @@ static void schedule_packet_in(struct ofconn *ofconn, struct ofputil_packet_in pin) { struct connmgr *mgr = ofconn->connmgr; + uint16_t controller_max_len; pin.total_len = pin.packet_len; - /* Get OpenFlow buffer_id. */ if (pin.reason == OFPR_ACTION) { + controller_max_len = pin.send_len; /* max_len */ + } else { + controller_max_len = ofconn->miss_send_len; + } + + /* Get OpenFlow buffer_id. + * For OpenFlow 1.2+, OFPCML_NO_BUFFER (== UINT16_MAX) specifies + * unbuffered. This behaviour doesn't violate prior versions, too. */ + if (controller_max_len == UINT16_MAX) { pin.buffer_id = UINT32_MAX; } else if (mgr->fail_open && fail_open_is_active(mgr->fail_open)) { pin.buffer_id = pktbuf_get_null(); @@ -1479,15 +1540,13 @@ schedule_packet_in(struct ofconn *ofconn, struct ofputil_packet_in pin) pin.fmd.in_port); } - /* Figure out how much of the packet to send. */ - if (pin.reason == OFPR_NO_MATCH) { + /* Figure out how much of the packet to send. + * If not buffered, send the entire packet. Otherwise, depending on + * the reason of packet-in, send what requested by the controller. */ + if (pin.buffer_id == UINT32_MAX) { pin.send_len = pin.packet_len; } else { - /* Caller should have initialized 'send_len' to 'max_len' specified in - * output action. */ - } - if (pin.buffer_id != UINT32_MAX) { - pin.send_len = MIN(pin.send_len, ofconn->miss_send_len); + pin.send_len = MIN(pin.packet_len, controller_max_len); } /* Make OFPT_PACKET_IN and hand over to packet scheduler. It might @@ -1667,6 +1726,7 @@ connmgr_has_in_band(struct connmgr *mgr) * In-band control has more sophisticated code that manages flows itself. */ void connmgr_flushed(struct connmgr *mgr) + OVS_EXCLUDED(ofproto_mutex) { if (mgr->fail_open) { fail_open_flushed(mgr->fail_open); @@ -1781,6 +1841,7 @@ COVERAGE_DEFINE(ofmonitor_resume); enum ofperr ofmonitor_create(const struct ofputil_flow_monitor_request *request, struct ofconn *ofconn, struct ofmonitor **monitorp) + OVS_REQUIRES(ofproto_mutex) { struct ofmonitor *m; @@ -1806,6 +1867,7 @@ ofmonitor_create(const struct ofputil_flow_monitor_request *request, struct ofmonitor * ofmonitor_lookup(struct ofconn *ofconn, uint32_t id) + OVS_REQUIRES(ofproto_mutex) { struct ofmonitor *m; @@ -1820,6 +1882,7 @@ ofmonitor_lookup(struct ofconn *ofconn, uint32_t id) void ofmonitor_destroy(struct ofmonitor *m) + OVS_REQUIRES(ofproto_mutex) { if (m) { minimatch_destroy(&m->match); @@ -1833,6 +1896,7 @@ ofmonitor_report(struct connmgr *mgr, struct rule *rule, enum nx_flow_update_event event, enum ofp_flow_removed_reason reason, const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid) + OVS_REQUIRES(ofproto_mutex) { enum nx_flow_monitor_flags update; struct ofconn *ofconn; @@ -1897,10 +1961,10 @@ ofmonitor_report(struct connmgr *mgr, struct rule *rule, fu.match = &match; fu.priority = rule->cr.priority; - ovs_mutex_lock(&rule->timeout_mutex); + ovs_mutex_lock(&rule->mutex); fu.idle_timeout = rule->idle_timeout; fu.hard_timeout = rule->hard_timeout; - ovs_mutex_unlock(&rule->timeout_mutex); + ovs_mutex_unlock(&rule->mutex); if (flags & NXFMF_ACTIONS) { fu.ofpacts = rule->actions->ofpacts; @@ -1925,6 +1989,7 @@ ofmonitor_report(struct connmgr *mgr, struct rule *rule, void ofmonitor_flush(struct connmgr *mgr) + OVS_REQUIRES(ofproto_mutex) { struct ofconn *ofconn; @@ -1932,10 +1997,12 @@ ofmonitor_flush(struct connmgr *mgr) struct ofpbuf *msg, *next; LIST_FOR_EACH_SAFE (msg, next, list_node, &ofconn->updates) { + unsigned int n_bytes; + list_remove(&msg->list_node); ofconn_send(ofconn, msg, ofconn->monitor_counter); - if (!ofconn->monitor_paused - && ofconn->monitor_counter->n_bytes > 128 * 1024) { + n_bytes = rconn_packet_counter_n_bytes(ofconn->monitor_counter); + if (!ofconn->monitor_paused && n_bytes > 128 * 1024) { struct ofpbuf *pause; COVERAGE_INC(ofmonitor_pause); @@ -1950,6 +2017,7 @@ ofmonitor_flush(struct connmgr *mgr) static void ofmonitor_resume(struct ofconn *ofconn) + OVS_REQUIRES(ofproto_mutex) { struct rule_collection rules; struct ofpbuf *resumed; @@ -1972,17 +2040,27 @@ ofmonitor_resume(struct ofconn *ofconn) ofconn->monitor_paused = 0; } +static bool +ofmonitor_may_resume(const struct ofconn *ofconn) + OVS_REQUIRES(ofproto_mutex) +{ + return (ofconn->monitor_paused != 0 + && !rconn_packet_counter_n_packets(ofconn->monitor_counter)); +} + static void ofmonitor_run(struct connmgr *mgr) { struct ofconn *ofconn; + ovs_mutex_lock(&ofproto_mutex); LIST_FOR_EACH (ofconn, node, &mgr->all_conns) { - if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) { + if (ofmonitor_may_resume(ofconn)) { COVERAGE_INC(ofmonitor_resume); ofmonitor_resume(ofconn); } } + ovs_mutex_unlock(&ofproto_mutex); } static void @@ -1990,9 +2068,11 @@ ofmonitor_wait(struct connmgr *mgr) { struct ofconn *ofconn; + ovs_mutex_lock(&ofproto_mutex); LIST_FOR_EACH (ofconn, node, &mgr->all_conns) { - if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) { + if (ofmonitor_may_resume(ofconn)) { poll_immediate_wake(); } } + ovs_mutex_unlock(&ofproto_mutex); }