1 /* See the DRL-LICENSE file for this file's software license. */
5 #include <netinet/in.h>
10 #include <sys/types.h>
13 #include "common_accounting.h"
14 #include "samplehold.h"
/*
 * match - decide whether table slot `flow` holds the flow identified by `key`.
 *
 * The visible checks reject a slot whose state is not FLOW_USED, then compare
 * the 5-tuple field by field: source IP, destination IP, source port,
 * destination port and protocol. No packet_size comparison is visible.
 *
 * Returns: the return statements are elided from this excerpt; the caller
 * sampled_table_lookup() treats a nonzero result as "this slot is the flow".
 */
17 static int match(const key_flow *key, const sampled_flow *flow) {
/* Only live slots can match, so probing callers step over FREE/DELETED slots. */
18 if (flow->state != FLOW_USED)
21 if (key->source_ip != flow->source_ip)
24 if (key->dest_ip != flow->dest_ip)
27 if (key->source_port != flow->source_port)
30 if (key->dest_port != flow->dest_port)
33 if (key->protocol != flow->protocol)
/*
 * get_key - rebuild a lookup key from a stored flow.
 *
 * Copies the five identifying fields (source/destination IP, source/destination
 * port, protocol) from `flow` into `key`. key->packet_size is not written in
 * the visible lines, so the result is meant for hashing/matching only; the
 * visible caller (sampled_table_cleanup) uses it solely to recompute the
 * flow's hash.
 * NOTE(review): if hash_function reads packet_size, the caller must initialize
 * it first. `flow` is only read, so it could be `const sampled_flow *`.
 */
39 static void get_key(key_flow *key, sampled_flow *flow) {
40 key->source_ip = flow->source_ip;
41 key->dest_ip = flow->dest_ip;
42 key->source_port = flow->source_port;
43 key->dest_port = flow->dest_port;
44 key->protocol = flow->protocol;
/*
 * move_flow - relocate a slot's contents from `src` to `dest`, then clear `src`.
 *
 * The whole sampled_flow struct is copied and `src` is zero-filled.
 * memmove() tolerates overlap, although the visible caller always passes two
 * distinct slots.
 * NOTE(review): the table's probing sees the cleared slot as FLOW_FREE only
 * if FLOW_FREE == 0 — confirm in samplehold.h.
 */
49 static void move_flow(sampled_flow *dest, sampled_flow *src) {
50 memmove(dest, src, sizeof(sampled_flow));
51 memset(src, 0, sizeof(sampled_flow));
/*
 * sampled_table_size - size accessor for the table.
 * NOTE(review): the body is elided from this excerpt; presumably it returns
 * table->capacity (the number of slots that probing uses) — confirm.
 */
54 uint32_t sampled_table_size(const sampled_flow_table table) {
61 * max_bytes is the maximum number of bytes that can pass through DURING THE
62 * MEASUREMENT INTERVAL. So, if your link carries 100 Mbit/s and your
63 * measurement interval is 1/10 of a second, your max_bytes is 10 Mbit
64 * (1,250,000 bytes), because that's all you can transfer in 1/10 of a second.
66 * flow_percentage is the percentage of max_bytes that is considered an
69 * oversampling factor is a knob that tunes how accurate our results are at
70 * the cost of additional state/memory.
/*
 * sampled_table_create - allocate and initialize a sample-and-hold flow table.
 *
 * hash_function is reduced modulo capacity to pick a flow's home slot.
 * `common` is caller-supplied aggregate accounting: the table stores the
 * pointer (it does not allocate it) and stamps common->last_update with the
 * current time.
 *
 * Derived fields:
 *   capacity    = (uint32_t)(100 / flow_percentage * oversampling_factor)
 *   sample_prob = capacity / max_bytes * RANDOM_GRANULARITY. This is a
 *                 per-byte sampling probability in units of
 *                 1/RANDOM_GRANULARITY: sampled_table_sample() multiplies it
 *                 by the packet size and compares it with
 *                 rand() % RANDOM_GRANULARITY.
 *   threshold   = flow_percentage / 100 * max_bytes. This is the byte count
 *                 a flow must exceed to survive sampled_table_cleanup().
 *
 * The backing array gets capacity * SAMPLEHOLD_BONUS_FACTOR zero-filled slots,
 * and `largest` starts out NULL. The body of the backing-failure branch and
 * the final return are elided from this excerpt.
 *
 * NOTE(review): the lines between the base_size and capacity computations are
 * not visible. Unless they validate the arguments:
 *   - flow_percentage == 0 makes base_size infinite, and converting that to
 *     uint32_t is undefined behavior;
 *   - any computed capacity of 0 (e.g. oversampling_factor == 0) turns every
 *     `% table->capacity` in this file into a division by zero.
 */
72 sampled_flow_table sampled_table_create(uint32_t (*hash_function)(const key_flow *key), const uint32_t max_bytes, const uint32_t flow_percentage, const uint32_t oversampling_factor, common_accounting_t *common) {
73 sampled_flow_table table = malloc(sizeof(struct sampled_flow_table));
/* NOTE(review): no NULL check on `table` is visible before it is dereferenced
 * below — confirm one exists in the elided lines. */
74 double base_size = (double) 100 / (double) flow_percentage;
80 table->capacity = (uint32_t) (base_size * oversampling_factor);
82 table->hash_function = hash_function;
83 table->sample_prob = (double) (((double) table->capacity / (double) max_bytes) * (double) RANDOM_GRANULARITY);
84 table->threshold = (double) ((double) flow_percentage / 100) * max_bytes;
87 /* Allocate the backing and give it a little bit extra to deal with variance. */
/* NOTE(review): every probe and scan loop in this file stops or wraps at
 * table->capacity, so the extra SAMPLEHOLD_BONUS_FACTOR slots appear never to
 * be used. The size product is also computed without an overflow check. */
88 table->largest = NULL;
89 table->backing = malloc(sizeof(sampled_flow) * table->capacity * SAMPLEHOLD_BONUS_FACTOR);
/* NOTE(review): confirm that this failure path also frees `table`. */
91 if (table->backing == NULL) {
96 memset(table->backing, 0, sizeof(sampled_flow) * table->capacity * SAMPLEHOLD_BONUS_FACTOR);
100 table->common = common;
101 gettimeofday(&table->common->last_update, NULL);
/*
 * sampled_table_destroy - release memory owned by the table.
 *
 * NOTE(review): only free(table->backing) is visible in this excerpt. Confirm
 * that the struct allocated in sampled_table_create() is freed as well.
 * `common` is supplied by the caller of sampled_table_create() and is not
 * allocated by the table, so presumably the caller releases it.
 */
106 void sampled_table_destroy(sampled_flow_table table) {
107 free(table->backing);
/*
 * sampled_table_lookup - find the slot holding the flow identified by `key`.
 *
 * This is open addressing. The probe starts at hash_function(key) % capacity,
 * wraps when the location reaches capacity, and stops after returning to the
 * starting slot.
 *
 * A FLOW_FREE slot ends the search, because the flow cannot lie beyond the
 * first free slot on its probe chain. FLOW_DELETED slots and non-matching
 * FLOW_USED slots are stepped over, since match() only accepts FLOW_USED
 * slots.
 *
 * Returns a pointer into table->backing on a hit. The miss returns are elided
 * from this excerpt; sampled_table_sample() tests the result against NULL.
 */
111 sampled_flow *sampled_table_lookup(sampled_flow_table table, const key_flow *key) {
112 uint32_t hash = table->hash_function(key) % table->capacity;
113 uint32_t location = hash;
116 if (table->backing[location].state == FLOW_FREE) {
117 /* It ain't here... */
121 if (match(key, &table->backing[location])) {
123 return &table->backing[location];
127 if (location == table->capacity) {
130 } while (location != hash);
/*
 * sampled_table_sample - account for one packet described by `key`.
 *
 * Each call adds key->packet_size to common->bytes_since, whether or not the
 * flow is tracked.
 *
 * A flow already in the table has packet_size added to its byte count.
 *
 * An untracked flow is admitted when rand() % RANDOM_GRANULARITY falls below
 * sample_prob * packet_size. Admission is therefore certain once that product
 * reaches RANDOM_GRANULARITY, and larger packets are more likely to start
 * tracking. After admission every later byte of the flow is counted.
 *
 * Admission takes the first FLOW_FREE or FLOW_DELETED slot on the key's probe
 * chain. Reusing a DELETED slot is safe because the lookup above has already
 * shown the flow is absent. The new slot starts with bytes = packet_size,
 * last_bytes = 0 and last_update = now.
 *
 * Return values are elided from this excerpt. A full table is reported with a
 * LOG_WARN message.
 *
 * NOTE(review): rand() % RANDOM_GRANULARITY is slightly biased unless
 * RANDOM_GRANULARITY divides RAND_MAX + 1. rand() also shares global state
 * with the rest of the process.
 */
135 int sampled_table_sample(sampled_flow_table table, const key_flow *key) {
136 sampled_flow *lookup = sampled_table_lookup(table, key);
140 /* First we update the common accounting information so that we have accurate
141 * aggregate information. */
142 table->common->bytes_since += key->packet_size;
144 /* Below here we're dealing with individual flows. */
146 /* It's already in the table, update it. */
147 if (lookup != NULL) {
148 lookup->bytes += key->packet_size;
152 /* It's not in the table, probabilistically sample it. */
153 packet_prob = table->sample_prob * (double) key->packet_size;
154 random_number = rand() % RANDOM_GRANULARITY;
156 if (random_number < packet_prob) {
157 /* It's being sampled - add it to the table. */
158 uint32_t hash = table->hash_function(key) % table->capacity;
159 uint32_t location = hash;
162 if (table->backing[location].state == FLOW_FREE ||
163 table->backing[location].state == FLOW_DELETED) {
164 lookup = &table->backing[location];
169 if (location == table->capacity) {
172 } while (location != hash);
174 if (lookup == NULL) {
175 /* Table is full!?! */
176 printlog(LOG_WARN, "samplehold.c: Table full!\n");
/* NOTE(review): `rate` is not reset for a newly admitted flow. A reused
 * FLOW_DELETED slot would carry its old rate into the EWMA in
 * sampled_table_update_flows() — confirm that DELETED slots are zeroed. */
182 lookup->bytes = key->packet_size;
183 lookup->source_ip = key->source_ip;
184 lookup->dest_ip = key->dest_ip;
185 lookup->source_port = key->source_port;
186 lookup->dest_port = key->dest_port;
187 lookup->protocol = key->protocol;
188 lookup->state = FLOW_USED;
189 lookup->last_bytes = 0;
192 gettimeofday(&lookup->last_update, NULL);
/*
 * sampled_table_cleanup - end-of-interval maintenance for the flow table.
 *
 * Pass 1 keeps only FLOW_USED slots whose byte count exceeds
 * table->threshold. It zero-fills every other slot in [0, capacity), which
 * (assuming FLOW_FREE == 0) also turns FLOW_DELETED entries into free ones.
 *
 * Pass 2 recomputes each surviving flow's home slot:
 *   - a flow already at home has bytes, last_bytes and rate reset to 0;
 *   - otherwise the flow is moved to the first FLOW_FREE slot found by
 *     probing from its home slot, and the same counters are reset there.
 *
 * Finally `largest` is cleared. The return statement is elided from this
 * excerpt.
 *
 * NOTE(review): slot i is still FLOW_USED while the pass-2 probe runs. If
 * every slot from the home slot up to i is occupied, the probe walks past i
 * and may move the flow further from home. move_flow() then frees slot i,
 * leaving a FLOW_FREE gap where sampled_table_lookup() stops. Unless an
 * elided line ends the probe when it reaches i, confirm this cannot hide
 * surviving flows.
 * Also, a flow for which no FREE slot is found keeps its old counters, since
 * the resets only happen in the two visible branches.
 */
201 int sampled_table_cleanup(sampled_flow_table table) {
203 * 1) Remove "small" flows from the table.
204 * 2) Compact the table so that the remaining flows are closer to their
206 * 3) Reset the state of deleted flows to free.
209 /* How it might work...
210 * 1) Scan through the backing array.
211 * 2) If the flow is small, memset it to 0.
212 * If it's large, add it to a linked list.
213 * 3) For all items in the linked list, hash them and put them in the
217 /* For now though, we're going to do it the inefficient way and loop
218 * through the backing twice.
223 /* Clear small items. */
224 for (i = 0; i < table->capacity; ++i) {
225 if (table->backing[i].state == FLOW_USED && table->backing[i].bytes > table->threshold) {
226 /* It gets to stick around. */
229 memset(&table->backing[i], 0, sizeof(sampled_flow));
233 /* Compact the table and put things closer to their hash locations. */
234 for (i = 0; i < table->capacity; ++i) {
235 if (table->backing[i].state == FLOW_USED) {
239 get_key(&key, &table->backing[i]);
240 hash = table->hash_function(&key) % table->capacity;
243 /* Already in the best place */
244 table->backing[i].bytes = 0;
245 table->backing[i].last_bytes = 0;
246 table->backing[i].rate = 0;
/* Away from home: probe from the home slot for a free slot (see the NOTE in
 * the header about probing past slot i). */
248 uint32_t location = hash;
251 if (table->backing[location].state == FLOW_FREE) {
252 move_flow(&table->backing[location], &table->backing[i]);
253 table->backing[location].bytes = 0;
254 table->backing[location].last_bytes = 0;
255 table->backing[location].rate = 0;
260 if (location == table->capacity) {
263 } while (location != hash);
268 table->largest = NULL;
/*
 * sampled_table_update_flows - recompute aggregate and per-flow rates at `now`.
 *
 * ewma_weight is the weight kept by the previous smoothed rate:
 *     rate = rate * ewma_weight + instantaneous * (1 - ewma_weight)
 * A stored rate of 0 is treated as "no history" and is replaced outright by
 * the instantaneous value. A rate that smooths down to exactly 0 therefore
 * also restarts without smoothing.
 *
 * Aggregate step:
 *   - The instantaneous rate is common->bytes_since divided by the time since
 *     common->last_update, in whatever units timeval_subtract() returns. The
 *     body of the non-positive-interval branch is elided.
 *   - The previous values are saved into last_inst_rate / last_rate before
 *     being overwritten.
 *   - bytes_since is then cleared and last_update advanced to `now`.
 *
 * Per-flow step (FLOW_USED slots only):
 *   - The instantaneous rate is the bytes accumulated since last_bytes,
 *     divided by the time since the flow's last_update.
 *   - The EWMA is applied, the snapshot is advanced, and the largest rate is
 *     tracked.
 *   - Cumulative counters are bumped. A flow above 51200 counts toward
 *     num_flows_50k, _20k, _10k and _5k; a flow above 20480 counts toward
 *     _20k, _10k and _5k; and so on down to 5120.
 *   - Each flow is logged at LOG_DEBUG.
 *
 * Final step:
 *   - avg_rate is the aggregate smoothed rate divided by the number of tracked
 *     flows. It therefore spreads all traffic, sampled or not, over the
 *     tracked flows.
 *   - max_flow_rate is the largest per-flow rate seen.
 *
 * NOTE(review): common->rate and the per-flow rate are assigned double
 * expressions but printed with %u and %d. Confirm the declared types match
 * those specifiers; a mismatch is undefined behavior in printf-style
 * formatting.
 * NOTE(review): num_flows is reset twice (a harmless duplicate).
 */
273 void sampled_table_update_flows(sampled_flow_table table, struct timeval now, double ewma_weight) {
275 uint32_t largest_rate = 0;
276 uint32_t rate_delta = 0;
277 double time_delta = 0;
278 double unweighted_rate = 0;
279 struct in_addr src, dst;
280 char sip[22], dip[22];
282 /* Reset statistics. */
283 table->common->num_flows = 0;
284 table->common->num_flows_5k = 0;
285 table->common->num_flows_10k = 0;
286 table->common->num_flows_20k = 0;
287 table->common->num_flows_50k = 0;
288 table->common->avg_rate = 0;
289 /* End statistics. */
291 /* Update common aggregate information. */
292 time_delta = timeval_subtract(now, table->common->last_update);
294 if (time_delta <= 0) {
297 unweighted_rate = table->common->bytes_since / time_delta;
300 table->common->last_inst_rate = table->common->inst_rate;
301 table->common->inst_rate = unweighted_rate;
303 table->common->last_rate = table->common->rate;
305 /* If the rate is zero, then we don't know anything yet. Don't apply EWMA
307 if (table->common->rate == 0) {
308 table->common->rate = unweighted_rate;
310 table->common->rate = table->common->rate * ewma_weight +
311 unweighted_rate * (1 - ewma_weight);
314 printlog(LOG_DEBUG, "table->common->rate is now %u\n", table->common->rate);
316 table->common->bytes_since = 0;
317 table->common->last_update = now;
318 table->common->num_flows = 0;
320 /* Update per-flow information. */
/* NOTE(review): `i` is read here before the loop below assigns it. Its
 * declaration is elided; if it is not initialized to 0, this is undefined
 * behavior. Even when i == 0, slot 0 is chosen regardless of its state. If no
 * FLOW_USED slot has a rate strictly greater than slot 0's, `largest` is left
 * pointing at a slot that may not hold a live flow. */
321 table->largest = &table->backing[i];
322 largest_rate = table->backing[i].rate;
324 for (i = 0; i < table->capacity; ++i) {
325 if (table->backing[i].state == FLOW_USED) {
326 rate_delta = table->backing[i].bytes - table->backing[i].last_bytes;
327 time_delta = timeval_subtract(now, table->backing[i].last_update);
329 /* Calculate the unweighted rate. Be careful not to divide by
330 * something silly. */
331 if (time_delta <= 0) {
334 unweighted_rate = rate_delta / time_delta;
337 if (table->backing[i].rate == 0) {
338 table->backing[i].rate = unweighted_rate;
340 table->backing[i].rate = (table->backing[i].rate * ewma_weight +
341 unweighted_rate * (1 - ewma_weight));
344 table->backing[i].last_bytes = table->backing[i].bytes;
345 table->backing[i].last_update = now;
347 if (table->backing[i].rate > largest_rate) {
348 largest_rate = table->backing[i].rate;
349 table->largest = &table->backing[i];
/* Cumulative buckets: 51200/20480/10240/5120 are 50/20/10/5 * 1024,
 * presumably bytes per second to match the num_flows_*k names — confirm. */
352 if (table->backing[i].rate > 51200) {
353 table->common->num_flows_50k += 1;
354 table->common->num_flows_20k += 1;
355 table->common->num_flows_10k += 1;
356 table->common->num_flows_5k += 1;
357 } else if (table->backing[i].rate > 20480) {
358 table->common->num_flows_20k += 1;
359 table->common->num_flows_10k += 1;
360 table->common->num_flows_5k += 1;
361 } else if (table->backing[i].rate > 10240) {
362 table->common->num_flows_10k += 1;
363 table->common->num_flows_5k += 1;
364 } else if (table->backing[i].rate > 5120) {
365 table->common->num_flows_5k += 1;
368 table->common->num_flows += 1;
370 /* Print debugging info. */
/* NOTE(review): in_addr.s_addr is in network byte order. If source_ip and
 * dest_ip are stored in host order, the intended conversion is htonl(). It
 * performs the same byte swap as ntohl() on common platforms, so output is
 * unaffected. */
371 src.s_addr = ntohl(table->backing[i].source_ip);
372 dst.s_addr = ntohl(table->backing[i].dest_ip);
/* inet_ntoa() returns a static buffer that the next call overwrites, so each
 * result is copied out before the second call. A dotted quad needs at most 16
 * bytes, which fits in the 22-byte arrays. */
373 strcpy(sip, inet_ntoa(src));
374 strcpy(dip, inet_ntoa(dst));
375 printlog(LOG_DEBUG, "FLOW: (%p) %s:%d -> %s:%d at %d\n", &table->backing[i],
376 sip, table->backing[i].source_port,
377 dip, table->backing[i].dest_port,
378 table->backing[i].rate);
/* Aggregate rate (all traffic) spread over the tracked flows. */
382 if (table->common->num_flows > 0) {
383 table->common->avg_rate = table->common->rate / table->common->num_flows;
386 table->common->max_flow_rate = largest_rate;
/*
 * sampled_table_largest - return the slot with the largest per-flow rate as
 * computed by the most recent sampled_table_update_flows() call.
 *
 * The pointer is NULL after sampled_table_create() and after
 * sampled_table_cleanup(), until the next update. It points into
 * table->backing, whose slots cleanup may clear or move.
 *
 * NOTE(review): sampled_table_update_flows() seeds this pointer with a slot
 * before checking that slot's state, so it can refer to a slot that is not
 * FLOW_USED. Callers may want to check ->state.
 */
389 sampled_flow *sampled_table_largest(sampled_flow_table table) {
390 return table->largest;