From: Ethan Jackson Date: Wed, 30 Oct 2013 19:59:15 +0000 (-0700) Subject: netflow: Make thread safe. X-Git-Tag: sliver-openvswitch-2.1.90-1~10^2~201 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=8e407f2744c0fad4fe4785c7be5849eb6ad1f903;p=sliver-openvswitch.git netflow: Make thread safe. In future patches upcall handler threads will need to update netflow. Signed-off-by: Ethan Jackson Acked-by: Ben Pfaff --- diff --git a/ofproto/netflow.c b/ofproto/netflow.c index 4e8949324..c91ecf068 100644 --- a/ofproto/netflow.c +++ b/ofproto/netflow.c @@ -52,6 +52,8 @@ struct netflow { long long int reconfig_time; /* When we reconfigured the timeouts. */ struct hmap flows; /* Contains 'netflow_flows'. */ + + atomic_int ref_cnt; }; struct netflow_flow { @@ -76,10 +78,15 @@ struct netflow_flow { long long int used; /* Last-used time (0 if never used). */ }; +static struct ovs_mutex mutex = OVS_MUTEX_INITIALIZER; + static struct netflow_flow *netflow_flow_lookup(const struct netflow *, - const struct flow *); + const struct flow *) + OVS_REQUIRES(mutex); static uint32_t netflow_flow_hash(const struct flow *); -static void netflow_expire__(struct netflow *, struct netflow_flow *); +static void netflow_expire__(struct netflow *, struct netflow_flow *) + OVS_REQUIRES(mutex); +static void netflow_run__(struct netflow *) OVS_REQUIRES(mutex); void netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc) @@ -98,6 +105,7 @@ netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc) static void gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, uint32_t packet_count, uint32_t byte_count) + OVS_REQUIRES(mutex) { struct netflow_v5_header *nf_hdr; struct netflow_v5_record *nf_rec; @@ -157,7 +165,7 @@ gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, /* NetFlow messages are limited to 30 records. */ if (ntohs(nf_hdr->count) >= 30) { - netflow_run(nf); + netflow_run__(nf); } } @@ -165,6 +173,7 @@ void netflow_flow_update(struct netflow *nf, struct flow *flow, ofp_port_t output_iface, const struct dpif_flow_stats *stats) + OVS_EXCLUDED(mutex) { struct netflow_flow *nf_flow; long long int used; @@ -174,6 +183,7 @@ netflow_flow_update(struct netflow *nf, struct flow *flow, return; } + ovs_mutex_lock(&mutex); nf_flow = netflow_flow_lookup(nf, flow); if (!nf_flow) { nf_flow = xzalloc(sizeof *nf_flow); @@ -209,10 +219,13 @@ netflow_flow_update(struct netflow *nf, struct flow *flow, nf_flow->last_expired = time_msec(); } } + + ovs_mutex_unlock(&mutex); } static void netflow_expire__(struct netflow *nf, struct netflow_flow *nf_flow) + OVS_REQUIRES(mutex) { uint64_t pkts, bytes; @@ -264,32 +277,38 @@ netflow_expire__(struct netflow *nf, struct netflow_flow *nf_flow) } void -netflow_expire(struct netflow *nf, struct flow *flow) +netflow_expire(struct netflow *nf, struct flow *flow) OVS_EXCLUDED(mutex) { - struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow); + struct netflow_flow *nf_flow; + ovs_mutex_lock(&mutex); + nf_flow = netflow_flow_lookup(nf, flow); if (nf_flow) { netflow_expire__(nf, nf_flow); } + ovs_mutex_unlock(&mutex); } void -netflow_flow_clear(struct netflow *nf, struct flow *flow) +netflow_flow_clear(struct netflow *nf, struct flow *flow) OVS_EXCLUDED(mutex) { - struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow); + struct netflow_flow *nf_flow; + ovs_mutex_lock(&mutex); + nf_flow = netflow_flow_lookup(nf, flow); if (nf_flow) { ovs_assert(!nf_flow->packet_count); ovs_assert(!nf_flow->byte_count); hmap_remove(&nf->flows, &nf_flow->hmap_node); free(nf_flow); } + ovs_mutex_unlock(&mutex); } /* Returns true if it's time to send out a round of NetFlow active timeouts, * false otherwise. */ -void -netflow_run(struct netflow *nf) +static void +netflow_run__(struct netflow *nf) OVS_REQUIRES(mutex) { long long int now = time_msec(); struct netflow_flow *nf_flow, *next; @@ -321,23 +340,35 @@ netflow_run(struct netflow *nf) } void -netflow_wait(struct netflow *nf) +netflow_run(struct netflow *nf) { + ovs_mutex_lock(&mutex); + netflow_run__(nf); + ovs_mutex_unlock(&mutex); +} + +void +netflow_wait(struct netflow *nf) OVS_EXCLUDED(mutex) +{ + ovs_mutex_lock(&mutex); if (nf->active_timeout) { poll_timer_wait_until(nf->next_timeout); } if (nf->packet.size) { poll_immediate_wake(); } + ovs_mutex_unlock(&mutex); } int netflow_set_options(struct netflow *nf, const struct netflow_options *nf_options) + OVS_EXCLUDED(mutex) { int error = 0; long long int old_timeout; + ovs_mutex_lock(&mutex); nf->engine_type = nf_options->engine_type; nf->engine_id = nf_options->engine_id; nf->add_id_to_iface = nf_options->add_id_to_iface; @@ -356,6 +387,7 @@ netflow_set_options(struct netflow *nf, nf->reconfig_time = time_msec(); nf->next_timeout = time_msec(); } + ovs_mutex_unlock(&mutex); return error; } @@ -371,14 +403,35 @@ netflow_create(void) nf->add_id_to_iface = false; nf->netflow_cnt = 0; hmap_init(&nf->flows); + atomic_init(&nf->ref_cnt, 1); ofpbuf_init(&nf->packet, 1500); return nf; } -void -netflow_destroy(struct netflow *nf) +struct netflow * +netflow_ref(const struct netflow *nf_) { + struct netflow *nf = CONST_CAST(struct netflow *, nf_); if (nf) { + int orig; + atomic_add(&nf->ref_cnt, 1, &orig); + ovs_assert(orig > 0); + } + return nf; +} + +void +netflow_unref(struct netflow *nf) +{ + int orig; + + if (!nf) { + return; + } + + atomic_sub(&nf->ref_cnt, 1, &orig); + ovs_assert(orig > 0); + if (orig == 1) { ofpbuf_uninit(&nf->packet); collectors_destroy(nf->collectors); free(nf); @@ -389,6 +442,7 @@ netflow_destroy(struct netflow *nf) static struct netflow_flow * netflow_flow_lookup(const struct netflow *nf, const struct flow *flow) + OVS_REQUIRES(mutex) { struct netflow_flow *nf_flow; diff --git a/ofproto/netflow.h b/ofproto/netflow.h index 6493841cf..f37cfa7a4 100644 --- a/ofproto/netflow.h +++ b/ofproto/netflow.h @@ -41,7 +41,9 @@ struct netflow_options { #define NF_OUT_DROP OFP_PORT_C(UINT16_MAX - 2) struct netflow *netflow_create(void); -void netflow_destroy(struct netflow *); +struct netflow *netflow_ref(const struct netflow *); +void netflow_unref(struct netflow *); + int netflow_set_options(struct netflow *, const struct netflow_options *); void netflow_expire(struct netflow *, struct flow *); diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c index dfe809fd3..693bb0ac8 100644 --- a/ofproto/ofproto-dpif.c +++ b/ofproto/ofproto-dpif.c @@ -1421,7 +1421,7 @@ destruct(struct ofproto *ofproto_) mbridge_unref(ofproto->mbridge); - netflow_destroy(ofproto->netflow); + netflow_unref(ofproto->netflow); dpif_sflow_unref(ofproto->sflow); hmap_destroy(&ofproto->bundles); mac_learning_unref(ofproto->ml); @@ -5082,7 +5082,7 @@ set_netflow(struct ofproto *ofproto_, return netflow_set_options(ofproto->netflow, netflow_options); } else if (ofproto->netflow) { ofproto->backer->need_revalidate = REV_RECONFIGURE; - netflow_destroy(ofproto->netflow); + netflow_unref(ofproto->netflow); ofproto->netflow = NULL; }