1 /* See the DRL-LICENSE file for this file's software license. */
10 #include "common_accounting.h"
11 #include "samplehold.h"
/*
 * match: decide whether table slot `flow` holds the flow identified by `key`.
 *
 * The visible conditions test whether the slot is not FLOW_USED, and whether
 * any five-tuple field (source IP, destination IP, source port, destination
 * port, protocol) differs between `key` and `flow`. sampled_table_lookup()
 * treats a non-zero result as "found".
 *
 * NOTE(review): the statements guarded by each `if` and the function's final
 * return are missing from this extract; presumably `return 0;` on the first
 * failing check and a non-zero return when every field agrees -- confirm
 * against the full file.
 */
14 static int match(const key_flow *key, const sampled_flow *flow) {
15 if (flow->state != FLOW_USED)
18 if (key->source_ip != flow->source_ip)
21 if (key->dest_ip != flow->dest_ip)
24 if (key->source_port != flow->source_port)
27 if (key->dest_port != flow->dest_port)
30 if (key->protocol != flow->protocol)
/*
 * get_key: copy the five-tuple identity of `flow` (source/destination IP,
 * source/destination port, protocol) into `key`.
 *
 * Only those five fields are written; other key_flow members (e.g.
 * packet_size, which sampled_table_sample() reads) are left untouched.
 * sampled_table_cleanup() uses the result to recompute a flow's hash.
 * `flow` is only read here, so it could be `const sampled_flow *` like
 * match()'s parameter.
 */
36 static void get_key(key_flow *key, sampled_flow *flow) {
37 key->source_ip = flow->source_ip;
38 key->dest_ip = flow->dest_ip;
39 key->source_port = flow->source_port;
40 key->dest_port = flow->dest_port;
41 key->protocol = flow->protocol;
/*
 * move_flow: relocate one table slot to another.
 *
 * Copies the whole sampled_flow from `src` to `dest`, then zeroes `src`.
 * sampled_table_cleanup() uses this to move a flow into a FLOW_FREE slot.
 * NOTE(review): zeroing leaves `src` as a free slot only if FLOW_FREE is the
 * zero value of the state field -- confirm in samplehold.h. In that caller
 * the two slots are always distinct (the destination is FLOW_FREE, the
 * source FLOW_USED), so memcpy would also be valid.
 */
46 static void move_flow(sampled_flow *dest, sampled_flow *src) {
47 memmove(dest, src, sizeof(sampled_flow));
48 memset(src, 0, sizeof(sampled_flow));
/*
 * sampled_table_size: report the size of `table`.
 *
 * NOTE(review): the body is missing from this extract; presumably it returns
 * table->capacity (the slot count computed in sampled_table_create()) --
 * confirm.
 */
51 uint32_t sampled_table_size(const sampled_flow_table table) {
58 * max_bytes is the maximum number of bytes that can pass through DURING THE
59 * MEASUREMENT INTERVAL. So, if you can send 100 Mbit/s and your measurement
60 * interval is 1/10 of a second, your max_bytes is 10 Mbit (1,250,000 bytes)
61 * because that's all you can transfer in 1/10 of a second.
63 * flow_percentage is the percentage of max_bytes that is considered an
66 * oversampling_factor is a knob that tunes how accurate our results are at
67 * the cost of additional state/memory (it scales the table's capacity).
/*
 * sampled_table_create: allocate and initialise a sample-and-hold table.
 *
 * Parameters:
 *   hash_function       - hashes a key_flow; callers in this file reduce the
 *                         result modulo table->capacity.
 *   max_bytes           - bytes that can pass during one measurement interval.
 *   flow_percentage     - a flow must carry more than this percentage of
 *                         max_bytes to survive sampled_table_cleanup().
 *   oversampling_factor - multiplies the table's capacity.
 *   common              - shared aggregate accounting state; stored in the
 *                         table, and its last_update is set to the current
 *                         time.
 *
 * Derived fields:
 *   capacity    = (100 / flow_percentage) * oversampling_factor * 1.03,
 *                 i.e. the number of flows that could each be
 *                 flow_percentage of max_bytes, oversampled, plus 3%.
 *   sample_prob = capacity / max_bytes * RANDOM_GRANULARITY, a per-byte
 *                 sampling probability pre-scaled so sampled_table_sample()
 *                 can compare (sample_prob * packet_size) against
 *                 rand() % RANDOM_GRANULARITY.
 *   threshold   = flow_percentage / 100 * max_bytes (floating-point division,
 *                 because flow_percentage is cast to double first).
 *
 * The backing array is zero-filled, so every slot starts with state value 0.
 *
 * NOTE(review): several lines of this function are missing from this extract
 * (e.g. between the base_size computation and the capacity assignment, and
 * the body of the backing NULL check), so the following may already be
 * handled there -- confirm:
 *   - the result of the first malloc() is not visibly checked before
 *     table->capacity is written;
 *   - flow_percentage == 0 or max_bytes == 0 divides by zero, and converting
 *     an out-of-range double to uint32_t (capacity) is undefined behaviour;
 *   - calloc(table->capacity, sizeof(sampled_flow)) would combine the
 *     allocation, a size-overflow check and the zero-fill.
 */
69 sampled_flow_table sampled_table_create(uint32_t (*hash_function)(const key_flow *key), const uint32_t max_bytes, const uint32_t flow_percentage, const uint32_t oversampling_factor, common_accounting_t *common) {
70 sampled_flow_table table = malloc(sizeof(struct sampled_flow_table));
71 double base_size = (double) 100 / (double) flow_percentage;
77 table->capacity = (uint32_t) ((base_size * oversampling_factor) * 1.03);
79 table->hash_function = hash_function;
80 table->sample_prob = (double) (((double) table->capacity / (double) max_bytes) * (double) RANDOM_GRANULARITY);
81 table->threshold = (double) ((double) flow_percentage / 100) * max_bytes;
83 table->largest = NULL;
84 table->backing = malloc(sizeof(sampled_flow) * table->capacity);
86 if (table->backing == NULL) {
91 memset(table->backing, 0, sizeof(sampled_flow) * table->capacity);
95 table->common = common;
96 gettimeofday(&table->common->last_update, NULL);
/*
 * sampled_table_destroy: release the memory owned by `table`.
 *
 * The visible code frees the backing array. NOTE(review): freeing the table
 * struct itself (allocated with malloc() in sampled_table_create()) is not
 * visible in this extract -- confirm it happens; otherwise the struct leaks.
 */
101 void sampled_table_destroy(sampled_flow_table table) {
102 free(table->backing);
/*
 * sampled_table_lookup: find the slot holding `key`.
 *
 * Starts at hash_function(key) % capacity and walks forward through the
 * backing array. It wraps to the start on reaching capacity and stops once it
 * is back at the starting slot. A FLOW_FREE slot ends the search early ("It
 * ain't here"), because sampled_table_sample() inserts into the first
 * FREE/DELETED slot on the same probe path. Slots in any other non-matching
 * state are skipped; this includes FLOW_DELETED, which match() never accepts.
 *
 * Returns a pointer to the matching slot. sampled_table_sample() treats a
 * NULL result as "not in the table". The not-found return statements are
 * missing from this extract.
 */
106 sampled_flow *sampled_table_lookup(sampled_flow_table table, const key_flow *key) {
107 uint32_t hash = table->hash_function(key) % table->capacity;
108 uint32_t location = hash;
111 if (table->backing[location].state == FLOW_FREE) {
112 /* It ain't here... */
116 if (match(key, &table->backing[location])) {
118 return &table->backing[location];
122 if (location == table->capacity) {
125 } while (location != hash);
/*
 * sampled_table_sample: account one packet described by `key`.
 *
 * Every packet's size is added to the shared common->bytes_since counter.
 * If the packet's flow is already in the table, its byte count grows by
 * key->packet_size. Otherwise the flow is admitted with probability of about
 * sample_prob * packet_size / RANDOM_GRANULARITY (capped at 1), so larger
 * packets are more likely to start a new entry. An admitted flow takes the
 * first FLOW_FREE or FLOW_DELETED slot on its probe path. If the probe wraps
 * all the way round without finding one, a "Table full!" warning is logged.
 * For a new entry, the visible code fills in the five-tuple, sets
 * bytes = packet_size, last_bytes = 0 and state = FLOW_USED, and stamps
 * last_update with gettimeofday().
 *
 * NOTE(review): local declarations and return statements are missing from
 * this extract, so the return-value contract cannot be stated here.
 * NOTE(review): rand() draws from a single process-wide generator and is not
 * required to be free of data races. Also, `rand() % RANDOM_GRANULARITY` is
 * slightly biased unless RAND_MAX + 1 is a multiple of RANDOM_GRANULARITY.
 */
130 int sampled_table_sample(sampled_flow_table table, const key_flow *key) {
131 sampled_flow *lookup = sampled_table_lookup(table, key);
135 /* First we update the common accounting information so that we have accurate
136 * aggregate information. */
137 table->common->bytes_since += key->packet_size;
139 /* Below here we're dealing with individual flows. */
141 /* It's already in the table, update it. */
142 if (lookup != NULL) {
143 lookup->bytes += key->packet_size;
147 /* It's not in the table, probabilistically sample it. */
148 packet_prob = table->sample_prob * (double) key->packet_size;
149 random_number = rand() % RANDOM_GRANULARITY;
151 if (random_number < packet_prob) {
152 /* It's being sampled - add it to the table. */
153 uint32_t hash = table->hash_function(key) % table->capacity;
154 uint32_t location = hash;
157 if (table->backing[location].state == FLOW_FREE ||
158 table->backing[location].state == FLOW_DELETED) {
159 lookup = &table->backing[location];
164 if (location == table->capacity) {
167 } while (location != hash);
169 if (lookup == NULL) {
170 /* Table is full!?! */
171 printlog(LOG_WARN, "samplehold.c: Table full!\n");
177 lookup->bytes = key->packet_size;
178 lookup->source_ip = key->source_ip;
179 lookup->dest_ip = key->dest_ip;
180 lookup->source_port = key->source_port;
181 lookup->dest_port = key->dest_port;
182 lookup->protocol = key->protocol;
183 lookup->state = FLOW_USED;
184 lookup->last_bytes = 0;
187 gettimeofday(&lookup->last_update, NULL);
/*
 * sampled_table_cleanup: end-of-interval housekeeping for `table`.
 *
 * Pass 1 zeroes every slot except FLOW_USED flows whose byte count is above
 * table->threshold, so small flows and FLOW_DELETED slots are both cleared.
 * NOTE(review): this turns them into FLOW_FREE only if FLOW_FREE is the zero
 * state value -- confirm in samplehold.h.
 *
 * Pass 2 recomputes each surviving flow's hash. A flow that the (partly
 * missing) condition judges "already in the best place" stays put. Otherwise
 * it is moved (move_flow) into the first FLOW_FREE slot on the probe path from
 * its hash. In both cases its bytes, last_bytes and rate are reset to 0 for
 * the next interval. Finally table->largest is cleared, since the slot it
 * pointed at may have been zeroed or moved.
 *
 * NOTE(review): as far as the visible lines show, the pass-2 probe does not
 * stop when it reaches slot i (the flow's own slot). If every slot from the
 * hash up to i is occupied, the probe runs past i and may move the flow to a
 * later free slot. That leaves slot i empty in the middle of the flow's probe
 * chain; sampled_table_lookup() stops at the empty slot and would no longer
 * find the flow. Several lines of the probe loop are missing from this
 * extract, so confirm whether they guard `location == i`.
 */
196 int sampled_table_cleanup(sampled_flow_table table) {
198 * 1) Remove "small" flows from the table.
199 * 2) Compact the table so that the remaining flows are closer to their
201 * 3) Reset the state of deleted flows to free.
204 /* How it might work...
205 * 1) Scan through the backing array.
206 * 2) If the flow is small, memset it to 0.
207 * If it's large, add it to a linked list.
208 * 3) For all items in the linked list, hash them and put them in the
212 /* For now though, we're going to do it the inefficient way and loop
213 * through the backing twice.
218 /* Clear small items. */
219 for (i = 0; i < table->capacity; ++i) {
220 if (table->backing[i].state == FLOW_USED && table->backing[i].bytes > table->threshold) {
221 /* It gets to stick around. */
224 memset(&table->backing[i], 0, sizeof(sampled_flow));
228 /* Compact the table and put things closer to their hash locations. */
229 for (i = 0; i < table->capacity; ++i) {
230 if (table->backing[i].state == FLOW_USED) {
234 get_key(&key, &table->backing[i]);
235 hash = table->hash_function(&key) % table->capacity;
238 /* Already in the best place */
239 table->backing[i].bytes = 0;
240 table->backing[i].last_bytes = 0;
241 table->backing[i].rate = 0;
243 uint32_t location = hash;
246 if (table->backing[location].state == FLOW_FREE) {
247 move_flow(&table->backing[location], &table->backing[i]);
248 table->backing[location].bytes = 0;
249 table->backing[location].last_bytes = 0;
250 table->backing[location].rate = 0;
255 if (location == table->capacity) {
258 } while (location != hash);
263 table->largest = NULL;
/*
 * sampled_table_update_flows: recompute aggregate and per-flow rates at `now`.
 *
 * Aggregate: the bytes counted in common->bytes_since since
 * common->last_update become an instantaneous rate (bytes_since /
 * time_delta), stored in inst_rate. The previous inst_rate and rate are saved
 * in last_inst_rate and last_rate. The rate is then smoothed as an EWMA:
 *     rate = rate * ewma_weight + unweighted_rate * (1 - ewma_weight)
 * so ewma_weight is the weight given to history. A rate of exactly 0 is
 * treated as "no history yet" and is replaced by the unweighted value.
 * Finally bytes_since is reset and last_update is set to `now`.
 *
 * Per flow: every FLOW_USED slot gets the same treatment, using
 * bytes - last_bytes over the time since its own last_update, and then
 * last_bytes and last_update are advanced. The loop tracks the highest rate
 * it sees, storing the slot in table->largest and the rate in
 * common->max_flow_rate.
 *
 * NOTE(review): time_delta comes from timeval_subtract(), defined elsewhere;
 * presumably seconds as a double -- confirm. The bodies of the
 * `time_delta <= 0` guards are missing from this extract.
 * NOTE(review): rate_delta is uint32_t, so if `bytes` were ever smaller than
 * `last_bytes` the subtraction would wrap to a huge value.
 */
268 void sampled_table_update_flows(sampled_flow_table table, struct timeval now, double ewma_weight) {
270 uint32_t largest_rate = 0;
271 uint32_t rate_delta = 0;
272 double time_delta = 0;
273 double unweighted_rate = 0;
275 /* Update common aggregate information. */
276 time_delta = timeval_subtract(now, table->common->last_update);
278 if (time_delta <= 0) {
281 unweighted_rate = table->common->bytes_since / time_delta;
284 table->common->last_inst_rate = table->common->inst_rate;
285 table->common->inst_rate = unweighted_rate;
287 table->common->last_rate = table->common->rate;
289 /* If the rate is zero, then we don't know anything yet. Don't apply EWMA
291 if (table->common->rate == 0) {
292 table->common->rate = unweighted_rate;
294 table->common->rate = table->common->rate * ewma_weight +
295 unweighted_rate * (1 - ewma_weight);
298 table->common->bytes_since = 0;
299 table->common->last_update = now;
301 /* Update per-flow information. */
/* NOTE(review): `i` is declared on a line missing from this extract. If it
 * starts at 0, the two lines below seed the maximum search with slot 0
 * whatever its state. If slot 0 is unused (zeroed slots have rate 0) and no
 * flow's rate exceeds that, table->largest ends up pointing at an unused slot
 * instead of NULL. If slot 0 is in use, largest_rate is seeded with its stale,
 * pre-update rate; that can hide faster flows and leave the stale value in
 * common->max_flow_rate. Seeding with table->largest = NULL and
 * largest_rate = 0 would avoid both problems. If `i` is uninitialised here,
 * this is undefined behaviour. */
302 table->largest = &table->backing[i];
303 largest_rate = table->backing[i].rate;
305 for (i = 0; i < table->capacity; ++i) {
306 if (table->backing[i].state == FLOW_USED) {
307 rate_delta = table->backing[i].bytes - table->backing[i].last_bytes;
308 time_delta = timeval_subtract(now, table->backing[i].last_update);
310 /* Calculate the unweighted rate. Be careful not to divide by
311 * something silly. */
312 if (time_delta <= 0) {
315 unweighted_rate = rate_delta / time_delta;
318 if (table->backing[i].rate == 0) {
319 table->backing[i].rate = unweighted_rate;
321 table->backing[i].rate = (table->backing[i].rate * ewma_weight +
322 unweighted_rate * (1 - ewma_weight));
325 table->backing[i].last_bytes = table->backing[i].bytes;
326 table->backing[i].last_update = now;
328 if (table->backing[i].rate > largest_rate) {
329 largest_rate = table->backing[i].rate;
330 table->largest = &table->backing[i];
335 table->common->max_flow_rate = largest_rate;
/*
 * sampled_table_largest: return the flow currently recorded in
 * table->largest. sampled_table_update_flows() sets it to the highest-rate
 * slot it found.
 *
 * table->largest is NULL after sampled_table_create() and after
 * sampled_table_cleanup(), so callers must handle a NULL result.
 */
338 sampled_flow *sampled_table_largest(sampled_flow_table table) {
339 return table->largest;