+/* Expires the NetFlow record that 'nf' keeps for 'flow', if any, emitting the
+ * corresponding NetFlow message(s) via netflow_expire__().  A no-op when 'nf'
+ * has no record for 'flow'.  Takes the module mutex internally, so the caller
+ * must not hold it (enforced by OVS_EXCLUDED). */
+void
+netflow_expire(struct netflow *nf, struct flow *flow) OVS_EXCLUDED(mutex)
+{
+ struct netflow_flow *nf_flow;
+
+ ovs_mutex_lock(&mutex);
+ nf_flow = netflow_flow_lookup(nf, flow);
+ if (nf_flow) {
+ netflow_expire__(nf, nf_flow);
+ }
+ ovs_mutex_unlock(&mutex);
+}
+
+/* Removes and frees the NetFlow record that 'nf' keeps for 'flow', if any.
+ * The record's packet and byte counters must already be zero -- i.e. the
+ * caller is expected to have expired the flow first (presumably via
+ * netflow_expire(); the asserts below enforce this).  Takes the module mutex
+ * internally, so the caller must not hold it. */
+void
+netflow_flow_clear(struct netflow *nf, struct flow *flow) OVS_EXCLUDED(mutex)
+{
+ struct netflow_flow *nf_flow;
+
+ ovs_mutex_lock(&mutex);
+ nf_flow = netflow_flow_lookup(nf, flow);
+ if (nf_flow) {
+ /* Stats must have been flushed by a prior expiration; clearing a flow
+ * with unreported counters would silently drop accounting data. */
+ ovs_assert(!nf_flow->packet_count);
+ ovs_assert(!nf_flow->byte_count);
+ hmap_remove(&nf->flows, &nf_flow->hmap_node);
+ free(nf_flow);
+ }
+ ovs_mutex_unlock(&mutex);
+}
+
+/* Sends out any queued NetFlow messages, then performs a round of NetFlow
+ * active timeouts if the active-timeout interval has elapsed. */
+static void
+netflow_run__(struct netflow *nf) OVS_REQUIRES(mutex)
+{
+ long long int now = time_msec();
+ struct netflow_flow *nf_flow, *next;
+
+ /* Flush any NetFlow records queued in 'nf->packet' to the collectors. */
+ if (ofpbuf_size(&nf->packet)) {
+ collectors_send(nf->collectors, ofpbuf_data(&nf->packet), ofpbuf_size(&nf->packet));
+ ofpbuf_set_size(&nf->packet, 0);
+ }
+
+ /* Nothing more to do unless active timeouts are enabled and due. */
+ if (!nf->active_timeout || now < nf->next_timeout) {
+ return;
+ }
+
+ /* Rescan for flows to expire at most once per second. */
+ nf->next_timeout = now + 1000;
+
+ /* _SAFE variant: the loop body may remove and free the current node. */
+ HMAP_FOR_EACH_SAFE (nf_flow, next, hmap_node, &nf->flows) {
+ if (now > nf_flow->last_expired + nf->active_timeout) {
+ /* 'idle' means no traffic was recorded since the last expiry. */
+ bool idle = nf_flow->used < nf_flow->last_expired;
+ netflow_expire__(nf, nf_flow);
+
+ if (idle) {
+ /* If the netflow_flow hasn't been used in a while, it's
+ * possible the upper layer lost track of it. */
+ hmap_remove(&nf->flows, &nf_flow->hmap_node);
+ free(nf_flow);
+ }
+ }
+ }
+}
+