X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=ofproto%2Fnetflow.c;h=e9382afe9be707b2feccd6d0d2d84992e29265be;hb=cfc50ae514f805dcd9c14589f21158185424daf6;hp=4e894932489d9dc9246354af90f9382d3bd0b35a;hpb=8bfaca5b9a63a4900c3c99fc6a97e2b821870f66;p=sliver-openvswitch.git diff --git a/ofproto/netflow.c b/ofproto/netflow.c index 4e8949324..e9382afe9 100644 --- a/ofproto/netflow.c +++ b/ofproto/netflow.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008, 2009, 2010, 2011 Nicira, Inc. + * Copyright (c) 2008, 2009, 2010, 2011, 2013, 2014 Nicira, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,6 +52,8 @@ struct netflow { long long int reconfig_time; /* When we reconfigured the timeouts. */ struct hmap flows; /* Contains 'netflow_flows'. */ + + struct ovs_refcount ref_cnt; }; struct netflow_flow { @@ -76,10 +78,16 @@ struct netflow_flow { long long int used; /* Last-used time (0 if never used). */ }; +static struct ovs_mutex mutex = OVS_MUTEX_INITIALIZER; +static atomic_uint netflow_count = ATOMIC_VAR_INIT(0); + static struct netflow_flow *netflow_flow_lookup(const struct netflow *, - const struct flow *); + const struct flow *) + OVS_REQUIRES(mutex); static uint32_t netflow_flow_hash(const struct flow *); -static void netflow_expire__(struct netflow *, struct netflow_flow *); +static void netflow_expire__(struct netflow *, struct netflow_flow *) + OVS_REQUIRES(mutex); +static void netflow_run__(struct netflow *) OVS_REQUIRES(mutex); void netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc) @@ -90,19 +98,19 @@ netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc) memset(&wc->masks.nw_proto, 0xff, sizeof wc->masks.nw_proto); memset(&wc->masks.nw_src, 0xff, sizeof wc->masks.nw_src); memset(&wc->masks.nw_dst, 0xff, sizeof wc->masks.nw_dst); - memset(&wc->masks.tp_src, 0xff, sizeof wc->masks.tp_src); - memset(&wc->masks.tp_dst, 0xff, sizeof wc->masks.tp_dst); + flow_unwildcard_tp_ports(flow, wc); wc->masks.nw_tos |= IP_DSCP_MASK; } static void gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, uint32_t packet_count, uint32_t byte_count) + OVS_REQUIRES(mutex) { struct netflow_v5_header *nf_hdr; struct netflow_v5_record *nf_rec; - if (!nf->packet.size) { + if (!ofpbuf_size(&nf->packet)) { struct timespec now; time_wall_timespec(&now); @@ -119,7 +127,7 @@ gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, nf_hdr->sampling_interval = htons(0); } - nf_hdr = nf->packet.data; + nf_hdr = ofpbuf_data(&nf->packet); nf_hdr->count = htons(ntohs(nf_hdr->count) + 1); nf_rec = ofpbuf_put_zeros(&nf->packet, sizeof *nf_rec); @@ -157,14 +165,15 @@ gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, /* NetFlow messages are limited to 30 records. */ if (ntohs(nf_hdr->count) >= 30) { - netflow_run(nf); + netflow_run__(nf); } } void -netflow_flow_update(struct netflow *nf, struct flow *flow, +netflow_flow_update(struct netflow *nf, const struct flow *flow, ofp_port_t output_iface, const struct dpif_flow_stats *stats) + OVS_EXCLUDED(mutex) { struct netflow_flow *nf_flow; long long int used; @@ -174,6 +183,7 @@ netflow_flow_update(struct netflow *nf, struct flow *flow, return; } + ovs_mutex_lock(&mutex); nf_flow = netflow_flow_lookup(nf, flow); if (!nf_flow) { nf_flow = xzalloc(sizeof *nf_flow); @@ -209,10 +219,13 @@ netflow_flow_update(struct netflow *nf, struct flow *flow, nf_flow->last_expired = time_msec(); } } + + ovs_mutex_unlock(&mutex); } static void netflow_expire__(struct netflow *nf, struct netflow_flow *nf_flow) + OVS_REQUIRES(mutex) { uint64_t pkts, bytes; @@ -257,46 +270,51 @@ netflow_expire__(struct netflow *nf, struct netflow_flow *nf_flow) } /* Update flow tracking data. */ - nf_flow->created = 0; nf_flow->packet_count = 0; nf_flow->byte_count = 0; nf_flow->tcp_flags = 0; } void -netflow_expire(struct netflow *nf, struct flow *flow) +netflow_expire(struct netflow *nf, struct flow *flow) OVS_EXCLUDED(mutex) { - struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow); + struct netflow_flow *nf_flow; + ovs_mutex_lock(&mutex); + nf_flow = netflow_flow_lookup(nf, flow); if (nf_flow) { netflow_expire__(nf, nf_flow); } + ovs_mutex_unlock(&mutex); } void -netflow_flow_clear(struct netflow *nf, struct flow *flow) +netflow_flow_clear(struct netflow *nf, struct flow *flow) OVS_EXCLUDED(mutex) { - struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow); + struct netflow_flow *nf_flow; + ovs_mutex_lock(&mutex); + nf_flow = netflow_flow_lookup(nf, flow); if (nf_flow) { ovs_assert(!nf_flow->packet_count); ovs_assert(!nf_flow->byte_count); hmap_remove(&nf->flows, &nf_flow->hmap_node); free(nf_flow); } + ovs_mutex_unlock(&mutex); } /* Returns true if it's time to send out a round of NetFlow active timeouts, * false otherwise. */ -void -netflow_run(struct netflow *nf) +static void +netflow_run__(struct netflow *nf) OVS_REQUIRES(mutex) { long long int now = time_msec(); struct netflow_flow *nf_flow, *next; - if (nf->packet.size) { - collectors_send(nf->collectors, nf->packet.data, nf->packet.size); - nf->packet.size = 0; + if (ofpbuf_size(&nf->packet)) { + collectors_send(nf->collectors, ofpbuf_data(&nf->packet), ofpbuf_size(&nf->packet)); + ofpbuf_set_size(&nf->packet, 0); } if (!nf->active_timeout || now < nf->next_timeout) { @@ -321,23 +339,35 @@ netflow_run(struct netflow *nf) } void -netflow_wait(struct netflow *nf) +netflow_run(struct netflow *nf) +{ + ovs_mutex_lock(&mutex); + netflow_run__(nf); + ovs_mutex_unlock(&mutex); +} + +void +netflow_wait(struct netflow *nf) OVS_EXCLUDED(mutex) { + ovs_mutex_lock(&mutex); if (nf->active_timeout) { poll_timer_wait_until(nf->next_timeout); } - if (nf->packet.size) { + if (ofpbuf_size(&nf->packet)) { poll_immediate_wake(); } + ovs_mutex_unlock(&mutex); } int netflow_set_options(struct netflow *nf, const struct netflow_options *nf_options) + OVS_EXCLUDED(mutex) { int error = 0; long long int old_timeout; + ovs_mutex_lock(&mutex); nf->engine_type = nf_options->engine_type; nf->engine_id = nf_options->engine_id; nf->add_id_to_iface = nf_options->add_id_to_iface; @@ -356,6 +386,7 @@ netflow_set_options(struct netflow *nf, nf->reconfig_time = time_msec(); nf->next_timeout = time_msec(); } + ovs_mutex_unlock(&mutex); return error; } @@ -364,6 +395,8 @@ struct netflow * netflow_create(void) { struct netflow *nf = xzalloc(sizeof *nf); + int junk; + nf->engine_type = 0; nf->engine_id = 0; nf->boot_time = time_msec(); @@ -371,24 +404,50 @@ netflow_create(void) nf->add_id_to_iface = false; nf->netflow_cnt = 0; hmap_init(&nf->flows); + ovs_refcount_init(&nf->ref_cnt); ofpbuf_init(&nf->packet, 1500); + atomic_add(&netflow_count, 1, &junk); return nf; } -void -netflow_destroy(struct netflow *nf) +struct netflow * +netflow_ref(const struct netflow *nf_) { + struct netflow *nf = CONST_CAST(struct netflow *, nf_); if (nf) { - ofpbuf_uninit(&nf->packet); + ovs_refcount_ref(&nf->ref_cnt); + } + return nf; +} + +void +netflow_unref(struct netflow *nf) +{ + if (nf && ovs_refcount_unref(&nf->ref_cnt) == 1) { + int orig; + + atomic_sub(&netflow_count, 1, &orig); collectors_destroy(nf->collectors); + ofpbuf_uninit(&nf->packet); free(nf); } } + +/* Returns true if there exist any netflow objects, false otherwise. */ +bool +netflow_exists(void) +{ + int n; + + atomic_read(&netflow_count, &n); + return n > 0; +} /* Helpers. */ static struct netflow_flow * netflow_flow_lookup(const struct netflow *nf, const struct flow *flow) + OVS_REQUIRES(mutex) { struct netflow_flow *nf_flow;