From 8bfaca5b9a63a4900c3c99fc6a97e2b821870f66 Mon Sep 17 00:00:00 2001 From: Ethan Jackson Date: Tue, 22 Oct 2013 23:07:55 -0700 Subject: [PATCH] ofproto: Modularize netflow. The netflow code has its tentacles all over the ofproto-dpif module. This is fine today, but in future facets, which correspond roughly to netflow_flows, will be retired. In preparation, this patch hides as much implementation detail as possible inside the netflow module. Signed-off-by: Ethan Jackson Acked-by: Ben Pfaff --- ofproto/netflow.c | 263 ++++++++++++++++++++++++---------- ofproto/netflow.h | 29 +--- ofproto/ofproto-dpif-upcall.c | 3 +- ofproto/ofproto-dpif.c | 74 ++-------- ofproto/ofproto.h | 8 -- 5 files changed, 205 insertions(+), 172 deletions(-) diff --git a/ofproto/netflow.c b/ofproto/netflow.c index a094bac1e..4e8949324 100644 --- a/ofproto/netflow.c +++ b/ofproto/netflow.c @@ -22,6 +22,7 @@ #include #include "byte-order.h" #include "collectors.h" +#include "dpif.h" #include "flow.h" #include "lib/netflow.h" #include "ofpbuf.h" @@ -49,8 +50,37 @@ struct netflow { long long int active_timeout; /* Timeout for flows that are still active. */ long long int next_timeout; /* Next scheduled active timeout. */ long long int reconfig_time; /* When we reconfigured the timeouts. */ + + struct hmap flows; /* Contains 'netflow_flows'. */ +}; + +struct netflow_flow { + struct hmap_node hmap_node; + + long long int last_expired; /* Time this flow last timed out. */ + long long int created; /* Time flow was created since time out. */ + + ofp_port_t output_iface; /* Output interface index. */ + uint16_t tcp_flags; /* Bitwise-OR of all TCP flags seen. */ + + ofp_port_t in_port; /* Input port. */ + ovs_be32 nw_src; /* IPv4 source address. */ + ovs_be32 nw_dst; /* IPv4 destination address. */ + uint8_t nw_tos; /* IP ToS (including DSCP and ECN). */ + uint8_t nw_proto; /* IP protocol. */ + ovs_be16 tp_src; /* TCP/UDP/SCTP source port. */ + ovs_be16 tp_dst; /* TCP/UDP/SCTP destination port. */ + + uint64_t packet_count; /* Packets from subrules. */ + uint64_t byte_count; /* Bytes from subrules. */ + long long int used; /* Last-used time (0 if never used). */ }; +static struct netflow_flow *netflow_flow_lookup(const struct netflow *, + const struct flow *); +static uint32_t netflow_flow_hash(const struct flow *); +static void netflow_expire__(struct netflow *, struct netflow_flow *); + void netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc) { @@ -67,7 +97,6 @@ netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc) static void gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, - struct ofexpired *expired, uint32_t packet_count, uint32_t byte_count) { struct netflow_v5_header *nf_hdr; @@ -94,38 +123,37 @@ gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, nf_hdr->count = htons(ntohs(nf_hdr->count) + 1); nf_rec = ofpbuf_put_zeros(&nf->packet, sizeof *nf_rec); - nf_rec->src_addr = expired->flow.nw_src; - nf_rec->dst_addr = expired->flow.nw_dst; + nf_rec->src_addr = nf_flow->nw_src; + nf_rec->dst_addr = nf_flow->nw_dst; nf_rec->nexthop = htonl(0); if (nf->add_id_to_iface) { uint16_t iface = (nf->engine_id & 0x7f) << 9; - nf_rec->input = htons(iface - | (ofp_to_u16(expired->flow.in_port.ofp_port) & 0x1ff)); + nf_rec->input = htons(iface | (ofp_to_u16(nf_flow->in_port) & 0x1ff)); nf_rec->output = htons(iface | (ofp_to_u16(nf_flow->output_iface) & 0x1ff)); } else { - nf_rec->input = htons(ofp_to_u16(expired->flow.in_port.ofp_port)); + nf_rec->input = htons(ofp_to_u16(nf_flow->in_port)); nf_rec->output = htons(ofp_to_u16(nf_flow->output_iface)); } nf_rec->packet_count = htonl(packet_count); nf_rec->byte_count = htonl(byte_count); nf_rec->init_time = htonl(nf_flow->created - nf->boot_time); - nf_rec->used_time = htonl(MAX(nf_flow->created, expired->used) + nf_rec->used_time = htonl(MAX(nf_flow->created, nf_flow->used) - nf->boot_time); - if (expired->flow.nw_proto == IPPROTO_ICMP) { + if (nf_flow->nw_proto == IPPROTO_ICMP) { /* In NetFlow, the ICMP type and code are concatenated and * placed in the 'dst_port' field. */ - uint8_t type = ntohs(expired->flow.tp_src); - uint8_t code = ntohs(expired->flow.tp_dst); + uint8_t type = ntohs(nf_flow->tp_src); + uint8_t code = ntohs(nf_flow->tp_dst); nf_rec->src_port = htons(0); nf_rec->dst_port = htons((type << 8) | code); } else { - nf_rec->src_port = expired->flow.tp_src; - nf_rec->dst_port = expired->flow.tp_dst; + nf_rec->src_port = nf_flow->tp_src; + nf_rec->dst_port = nf_flow->tp_dst; } - nf_rec->tcp_flags = (uint8_t)nf_flow->tcp_flags; - nf_rec->ip_proto = expired->flow.nw_proto; - nf_rec->ip_tos = expired->flow.nw_tos & IP_DSCP_MASK; + nf_rec->tcp_flags = (uint8_t) nf_flow->tcp_flags; + nf_rec->ip_proto = nf_flow->nw_proto; + nf_rec->ip_tos = nf_flow->nw_tos & IP_DSCP_MASK; /* NetFlow messages are limited to 30 records. */ if (ntohs(nf_hdr->count) >= 30) { @@ -134,35 +162,84 @@ gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, } void -netflow_expire(struct netflow *nf, struct netflow_flow *nf_flow, - struct ofexpired *expired) +netflow_flow_update(struct netflow *nf, struct flow *flow, + ofp_port_t output_iface, + const struct dpif_flow_stats *stats) { - uint64_t pkt_delta = expired->packet_count - nf_flow->packet_count_off; - uint64_t byte_delta = expired->byte_count - nf_flow->byte_count_off; + struct netflow_flow *nf_flow; + long long int used; + + /* NetFlow only reports on IP packets. */ + if (flow->dl_type != htons(ETH_TYPE_IP)) { + return; + } + + nf_flow = netflow_flow_lookup(nf, flow); + if (!nf_flow) { + nf_flow = xzalloc(sizeof *nf_flow); + nf_flow->in_port = flow->in_port.ofp_port; + nf_flow->nw_src = flow->nw_src; + nf_flow->nw_dst = flow->nw_dst; + nf_flow->nw_tos = flow->nw_tos; + nf_flow->nw_proto = flow->nw_proto; + nf_flow->tp_src = flow->tp_src; + nf_flow->tp_dst = flow->tp_dst; + nf_flow->created = stats->used; + nf_flow->output_iface = output_iface; + hmap_insert(&nf->flows, &nf_flow->hmap_node, netflow_flow_hash(flow)); + } + + if (nf_flow->output_iface != output_iface) { + netflow_expire__(nf, nf_flow); + nf_flow->created = stats->used; + nf_flow->output_iface = output_iface; + } + + nf_flow->packet_count += stats->n_packets; + nf_flow->byte_count += stats->n_bytes; + nf_flow->tcp_flags |= stats->tcp_flags; + + used = MAX(nf_flow->used, stats->used); + if (nf_flow->used != used) { + nf_flow->used = used; + if (!nf->active_timeout || !nf_flow->last_expired + || nf->reconfig_time > nf_flow->last_expired) { + /* Keep the time updated to prevent a flood of expiration in + * the future. */ + nf_flow->last_expired = time_msec(); + } + } +} + +static void +netflow_expire__(struct netflow *nf, struct netflow_flow *nf_flow) +{ + uint64_t pkts, bytes; + + pkts = nf_flow->packet_count; + bytes = nf_flow->byte_count; nf_flow->last_expired += nf->active_timeout; - /* NetFlow only reports on IP packets and we should only report flows - * that actually have traffic. */ - if (expired->flow.dl_type != htons(ETH_TYPE_IP) || pkt_delta == 0) { + if (pkts == 0) { return; } - if ((byte_delta >> 32) <= 175) { + if ((bytes >> 32) <= 175) { /* NetFlow v5 records are limited to 32-bit counters. If we've wrapped * a counter, send as multiple records so we don't lose track of any * traffic. We try to evenly distribute the packet and byte counters, * so that the bytes-per-packet lengths don't look wonky across the * records. */ - while (byte_delta) { - int n_recs = (byte_delta + UINT32_MAX - 1) / UINT32_MAX; - uint32_t pkt_count = pkt_delta / n_recs; - uint32_t byte_count = byte_delta / n_recs; + while (bytes) { + int n_recs = (bytes + UINT32_MAX - 1) / UINT32_MAX; + uint32_t pkt_count = pkts / n_recs; + uint32_t byte_count = bytes / n_recs; - gen_netflow_rec(nf, nf_flow, expired, pkt_count, byte_count); + gen_netflow_rec(nf, nf_flow, pkt_count, byte_count); - pkt_delta -= pkt_count; - byte_delta -= byte_count; + pkts -= pkt_count; + bytes -= byte_count; } } else { /* In 600 seconds, a 10GbE link can theoretically transmit 75 * 10**10 @@ -176,31 +253,70 @@ netflow_expire(struct netflow *nf, struct netflow_flow *nf_flow, */ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1); - VLOG_WARN_RL(&rl, "impossible byte counter %"PRIu64, byte_delta); + VLOG_WARN_RL(&rl, "impossible byte counter %"PRIu64, bytes); } /* Update flow tracking data. */ nf_flow->created = 0; - nf_flow->packet_count_off = expired->packet_count; - nf_flow->byte_count_off = expired->byte_count; + nf_flow->packet_count = 0; + nf_flow->byte_count = 0; nf_flow->tcp_flags = 0; } +void +netflow_expire(struct netflow *nf, struct flow *flow) +{ + struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow); + + if (nf_flow) { + netflow_expire__(nf, nf_flow); + } +} + +void +netflow_flow_clear(struct netflow *nf, struct flow *flow) +{ + struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow); + + if (nf_flow) { + ovs_assert(!nf_flow->packet_count); + ovs_assert(!nf_flow->byte_count); + hmap_remove(&nf->flows, &nf_flow->hmap_node); + free(nf_flow); + } +} + /* Returns true if it's time to send out a round of NetFlow active timeouts, * false otherwise. */ -bool +void netflow_run(struct netflow *nf) { + long long int now = time_msec(); + struct netflow_flow *nf_flow, *next; + if (nf->packet.size) { collectors_send(nf->collectors, nf->packet.data, nf->packet.size); nf->packet.size = 0; } - if (nf->active_timeout && time_msec() >= nf->next_timeout) { - nf->next_timeout = time_msec() + 1000; - return true; - } else { - return false; + if (!nf->active_timeout || now < nf->next_timeout) { + return; + } + + nf->next_timeout = now + 1000; + + HMAP_FOR_EACH_SAFE (nf_flow, next, hmap_node, &nf->flows) { + if (now > nf_flow->last_expired + nf->active_timeout) { + bool idle = nf_flow->used < nf_flow->last_expired; + netflow_expire__(nf, nf_flow); + + if (idle) { + /* If the netflow_flow hasn't been used in a while, it's + * possible the upper layer lost track of it. */ + hmap_remove(&nf->flows, &nf_flow->hmap_node); + free(nf_flow); + } + } } } @@ -254,6 +370,7 @@ netflow_create(void) nf->collectors = NULL; nf->add_id_to_iface = false; nf->netflow_cnt = 0; + hmap_init(&nf->flows); ofpbuf_init(&nf->packet, 1500); return nf; } @@ -267,52 +384,42 @@ netflow_destroy(struct netflow *nf) free(nf); } } + +/* Helpers. */ -/* Initializes a new 'nf_flow' given that the caller has already cleared it to - * all-zero-bits. */ -void -netflow_flow_init(struct netflow_flow *nf_flow OVS_UNUSED) +static struct netflow_flow * +netflow_flow_lookup(const struct netflow *nf, const struct flow *flow) { - /* Nothing to do. */ -} - -void -netflow_flow_clear(struct netflow_flow *nf_flow) -{ - ofp_port_t output_iface = nf_flow->output_iface; - - memset(nf_flow, 0, sizeof *nf_flow); - nf_flow->output_iface = output_iface; -} - -void -netflow_flow_update_time(struct netflow *nf, struct netflow_flow *nf_flow, - long long int used) -{ - if (!nf_flow->created) { - nf_flow->created = used; + struct netflow_flow *nf_flow; + + HMAP_FOR_EACH_WITH_HASH (nf_flow, hmap_node, netflow_flow_hash(flow), + &nf->flows) { + if (flow->in_port.ofp_port == nf_flow->in_port + && flow->nw_src == nf_flow->nw_src + && flow->nw_dst == nf_flow->nw_dst + && flow->nw_tos == nf_flow->nw_tos + && flow->nw_proto == nf_flow->nw_proto + && flow->tp_src == nf_flow->tp_src + && flow->tp_dst == nf_flow->tp_dst) { + return nf_flow; + } } - if (!nf || !nf->active_timeout || !nf_flow->last_expired || - nf->reconfig_time > nf_flow->last_expired) { - /* Keep the time updated to prevent a flood of expiration in - * the future. */ - nf_flow->last_expired = time_msec(); - } + return NULL; } -void -netflow_flow_update_flags(struct netflow_flow *nf_flow, uint16_t tcp_flags) +static uint32_t +netflow_flow_hash(const struct flow *flow) { - nf_flow->tcp_flags |= tcp_flags; -} + uint32_t hash = 0; -bool -netflow_active_timeout_expired(struct netflow *nf, struct netflow_flow *nf_flow) -{ - if (nf->active_timeout) { - return time_msec() > nf_flow->last_expired + nf->active_timeout; - } + hash = mhash_add(hash, (OVS_FORCE uint32_t) flow->in_port.ofp_port); + hash = mhash_add(hash, ntohl(flow->nw_src)); + hash = mhash_add(hash, ntohl(flow->nw_dst)); + hash = mhash_add(hash, flow->nw_tos); + hash = mhash_add(hash, flow->nw_proto); + hash = mhash_add(hash, ntohs(flow->tp_src)); + hash = mhash_add(hash, ntohs(flow->tp_dst)); - return false; + return mhash_finish(hash, 28); } diff --git a/ofproto/netflow.h b/ofproto/netflow.h index e1a2443b7..6493841cf 100644 --- a/ofproto/netflow.h +++ b/ofproto/netflow.h @@ -28,8 +28,6 @@ * accounted.) */ #define NF_ACTIVE_TIMEOUT_DEFAULT 600 -struct ofexpired; - struct netflow_options { struct sset collectors; uint8_t engine_type; @@ -42,33 +40,20 @@ struct netflow_options { #define NF_OUT_MULTI OFP_PORT_C(UINT16_MAX - 1) #define NF_OUT_DROP OFP_PORT_C(UINT16_MAX - 2) -struct netflow_flow { - long long int last_expired; /* Time this flow last timed out. */ - long long int created; /* Time flow was created since time out. */ - - uint64_t packet_count_off; /* Packet count at last time out. */ - uint64_t byte_count_off; /* Byte count at last time out. */ - - ofp_port_t output_iface; /* Output interface index. */ - uint16_t tcp_flags; /* Bitwise-OR of all TCP flags seen. */ -}; - struct netflow *netflow_create(void); void netflow_destroy(struct netflow *); int netflow_set_options(struct netflow *, const struct netflow_options *); -void netflow_expire(struct netflow *, struct netflow_flow *, - struct ofexpired *); +void netflow_expire(struct netflow *, struct flow *); -bool netflow_run(struct netflow *); +void netflow_run(struct netflow *); void netflow_wait(struct netflow *); void netflow_mask_wc(struct flow *, struct flow_wildcards *); -void netflow_flow_init(struct netflow_flow *); -void netflow_flow_clear(struct netflow_flow *); -void netflow_flow_update_time(struct netflow *, struct netflow_flow *, - long long int used); -void netflow_flow_update_flags(struct netflow_flow *, uint16_t tcp_flags); -bool netflow_active_timeout_expired(struct netflow *, struct netflow_flow *); +void netflow_flow_clear(struct netflow *netflow, struct flow *flow); + +void netflow_flow_update(struct netflow *nf, struct flow *flow, + ofp_port_t output_iface, + const struct dpif_flow_stats *); #endif /* netflow.h */ diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c index a7bf38d02..3f9ad58bc 100644 --- a/ofproto/ofproto-dpif-upcall.c +++ b/ofproto/ofproto-dpif-upcall.c @@ -255,8 +255,7 @@ udpif_revalidate(struct udpif *udpif) /* Since we remove each miss on revalidation, their statistics won't be * accounted to the appropriate 'facet's in the upper layer. In most * cases, this is alright because we've already pushed the stats to the - * relevant rules. However, NetFlow requires absolute packet counts on - * 'facet's which could now be incorrect. */ + * relevant rules. */ atomic_add(&udpif->reval_seq, 1, &junk); guarded_list_pop_all(&udpif->fmbs, &fmbs); diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c index 89b255d8f..dfe809fd3 100644 --- a/ofproto/ofproto-dpif.c +++ b/ofproto/ofproto-dpif.c @@ -287,7 +287,6 @@ struct facet { /* Accounting. */ uint64_t accounted_bytes; /* Bytes processed by facet_account(). */ - struct netflow_flow nf_flow; /* Per-flow NetFlow tracking data. */ uint16_t tcp_flags; /* TCP flags seen for this 'rule'. */ struct xlate_out xout; @@ -547,9 +546,6 @@ static void handle_upcalls(struct dpif_backer *); /* Flow expiration. */ static int expire(struct dpif_backer *); -/* NetFlow. */ -static void send_netflow_active_timeouts(struct ofproto_dpif *); - /* Global variables. */ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); @@ -1495,9 +1491,7 @@ run(struct ofproto *ofproto_) } if (ofproto->netflow) { - if (netflow_run(ofproto->netflow)) { - send_netflow_active_timeouts(ofproto); - } + netflow_run(ofproto->netflow); } if (ofproto->sflow) { dpif_sflow_run(ofproto->sflow); @@ -3854,8 +3848,6 @@ facet_create(const struct flow_miss *miss) facet->learn_rl = time_msec() + 500; list_init(&facet->subfacets); - netflow_flow_init(&facet->nf_flow); - netflow_flow_update_time(ofproto->netflow, &facet->nf_flow, facet->used); xlate_out_copy(&facet->xout, &miss->xout); @@ -3865,7 +3857,11 @@ facet_create(const struct flow_miss *miss) classifier_insert(&ofproto->facets, &facet->cr); ovs_rwlock_unlock(&ofproto->facets.rwlock); - facet->nf_flow.output_iface = facet->xout.nf_output_iface; + if (ofproto->netflow && !facet_is_controller_flow(facet)) { + netflow_flow_update(ofproto->netflow, &facet->flow, + facet->xout.nf_output_iface, &miss->stats); + } + return facet; } @@ -4085,19 +4081,13 @@ facet_flush_stats(struct facet *facet) } if (ofproto->netflow && !facet_is_controller_flow(facet)) { - struct ofexpired expired; - expired.flow = facet->flow; - expired.packet_count = facet->packet_count; - expired.byte_count = facet->byte_count; - expired.used = facet->used; - netflow_expire(ofproto->netflow, &facet->nf_flow, &expired); + netflow_expire(ofproto->netflow, &facet->flow); + netflow_flow_clear(ofproto->netflow, &facet->flow); } /* Reset counters to prevent double counting if 'facet' ever gets * reinstalled. */ facet_reset_counters(facet); - - netflow_flow_clear(&facet->nf_flow); facet->tcp_flags = 0; } @@ -4277,7 +4267,6 @@ facet_revalidate(struct facet *facet) facet->xout.has_fin_timeout = xout.has_fin_timeout; facet->xout.nf_output_iface = xout.nf_output_iface; facet->xout.mirrors = xout.mirrors; - facet->nf_flow.output_iface = facet->xout.nf_output_iface; ovs_mutex_lock(&new_rule->up.mutex); facet->used = MAX(facet->used, new_rule->up.created); @@ -4338,9 +4327,10 @@ facet_push_stats(struct facet *facet, bool may_learn) facet->prev_byte_count = facet->byte_count; facet->prev_used = facet->used; - netflow_flow_update_time(facet->ofproto->netflow, &facet->nf_flow, - facet->used); - netflow_flow_update_flags(&facet->nf_flow, facet->tcp_flags); + if (facet->ofproto->netflow && !facet_is_controller_flow(facet)) { + netflow_flow_update(facet->ofproto->netflow, &facet->flow, + facet->xout.nf_output_iface, &stats); + } mirror_update_stats(facet->ofproto->mbridge, facet->xout.mirrors, stats.n_packets, stats.n_bytes); flow_push_stats(facet->ofproto, &facet->flow, &stats, may_learn); @@ -5107,46 +5097,6 @@ get_netflow_ids(const struct ofproto *ofproto_, dpif_get_netflow_ids(ofproto->backer->dpif, engine_type, engine_id); } - -static void -send_active_timeout(struct ofproto_dpif *ofproto, struct facet *facet) -{ - if (!facet_is_controller_flow(facet) && - netflow_active_timeout_expired(ofproto->netflow, &facet->nf_flow)) { - struct subfacet *subfacet; - struct ofexpired expired; - - LIST_FOR_EACH (subfacet, list_node, &facet->subfacets) { - if (subfacet->path == SF_FAST_PATH) { - struct dpif_flow_stats stats; - - subfacet_install(subfacet, &facet->xout.odp_actions, - &stats); - subfacet_update_stats(subfacet, &stats); - } - } - - expired.flow = facet->flow; - expired.packet_count = facet->packet_count; - expired.byte_count = facet->byte_count; - expired.used = facet->used; - netflow_expire(ofproto->netflow, &facet->nf_flow, &expired); - } -} - -static void -send_netflow_active_timeouts(struct ofproto_dpif *ofproto) -{ - struct cls_cursor cursor; - struct facet *facet; - - ovs_rwlock_rdlock(&ofproto->facets.rwlock); - cls_cursor_init(&cursor, &ofproto->facets, NULL); - CLS_CURSOR_FOR_EACH (facet, cr, &cursor) { - send_active_timeout(ofproto, facet); - } - ovs_rwlock_unlock(&ofproto->facets.rwlock); -} static struct ofproto_dpif * ofproto_dpif_lookup(const char *name) diff --git a/ofproto/ofproto.h b/ofproto/ofproto.h index 94c2ab17d..d6ab1ae56 100644 --- a/ofproto/ofproto.h +++ b/ofproto/ofproto.h @@ -55,13 +55,6 @@ struct ofproto_controller_info { } pairs; }; -struct ofexpired { - struct flow flow; - uint64_t packet_count; /* Packets from subrules. */ - uint64_t byte_count; /* Bytes from subrules. */ - long long int used; /* Last-used time (0 if never used). */ -}; - struct ofproto_sflow_options { struct sset targets; uint32_t sampling_rate; @@ -72,7 +65,6 @@ struct ofproto_sflow_options { char *control_ip; }; - struct ofproto_ipfix_bridge_exporter_options { struct sset targets; uint32_t sampling_rate; -- 2.45.2