8ebab9d8b9398312978ff602b81cfe230f204ae1
[distributedratelimiting.git] / drl / samplehold.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 #include <inttypes.h>
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <sys/time.h>
8 #include <time.h>
9
10 #include "common_accounting.h"
11 #include "samplehold.h"
12 #include "logging.h"
13
14 static int match(const key_flow *key, const sampled_flow *flow) {
15     if (flow->state != FLOW_USED)
16         return 0;
17
18     if (key->source_ip != flow->source_ip)
19         return 0;
20
21     if (key->dest_ip != flow->dest_ip)
22         return 0;
23
24     if (key->source_port != flow->source_port)
25         return 0;
26
27     if (key->dest_port != flow->dest_port)
28         return 0;
29
30     if (key->protocol != flow->protocol)
31         return 0;
32
33     return 1;
34 }
35
36 static void get_key(key_flow *key, sampled_flow *flow) {
37     key->source_ip = flow->source_ip;
38     key->dest_ip = flow->dest_ip;
39     key->source_port = flow->source_port;
40     key->dest_port = flow->dest_port;
41     key->protocol = flow->protocol;
42
43     key->packet_size = 0;
44 }
45
46 static void move_flow(sampled_flow *dest, sampled_flow *src) {
47     memmove(dest, src, sizeof(sampled_flow));
48     memset(src, 0, sizeof(sampled_flow));
49 }
50
51 uint32_t sampled_table_size(const sampled_flow_table table) {
52     return table->size;
53 }
54
55 /*
56  * Notes to myself...
57  *
58  * max_bytes is the maximum number of bytes that can pass though DURING THE
59  * MEASUREMENT INTERVAL.  So, if you can have 100 Mbit/s and your measurement
60  * interval is 1/10 of a second, your max_bytes is 10Mbit because that's all
61  * you can transfer in 1/10 of a second.
62  *
63  * flow_percentage is the percentage of max_bytes that is considered an
64  * interesting flow.
65  *
66  * oversampling factor is a knob that tunes how accurate our results are at
67  * the cost of additional state/memory.
68  */
69 sampled_flow_table sampled_table_create(uint32_t (*hash_function)(const key_flow *key), const uint32_t max_bytes, const uint32_t flow_percentage, const uint32_t oversampling_factor, common_accounting_t *common) {
70     sampled_flow_table table = malloc(sizeof(struct sampled_flow_table));
71     double base_size = (double) 100 / (double) flow_percentage;
72
73     if (table == NULL) {
74         return NULL;
75     }
76
77     table->capacity = (uint32_t) ((base_size * oversampling_factor) * 1.03);
78     table->size = 0;
79     table->hash_function = hash_function;
80     table->sample_prob = (double) (((double) table->capacity / (double) max_bytes) * (double) RANDOM_GRANULARITY);
81     table->threshold = (double) ((double) flow_percentage / 100) * max_bytes;
82
83     table->largest = NULL;
84     table->backing = malloc(sizeof(sampled_flow) * table->capacity);
85
86     if (table->backing == NULL) {
87         free(table);
88         return NULL;
89     }
90
91     memset(table->backing, 0, sizeof(sampled_flow) * table->capacity);
92
93     srand(time(NULL));
94
95     table->common = common;
96     gettimeofday(&table->common->last_update, NULL);
97
98     return table;
99 }
100
101 void sampled_table_destroy(sampled_flow_table table) {
102     free(table->backing);
103     free(table);
104 }
105
106 sampled_flow *sampled_table_lookup(sampled_flow_table table, const key_flow *key) {
107     uint32_t hash = table->hash_function(key) % table->capacity;
108     uint32_t location = hash;
109
110     do {
111         if (table->backing[location].state == FLOW_FREE) {
112             /* It ain't here... */
113             return NULL;
114         }
115
116         if (match(key, &table->backing[location])) {
117             /* Got it! */
118             return &table->backing[location];
119         }
120
121         location++;
122         if (location == table->capacity) {
123             location = 0;
124         }
125     } while (location != hash);
126
127     return NULL;
128 }
129
130 int sampled_table_sample(sampled_flow_table table, const key_flow *key) {
131     sampled_flow *lookup = sampled_table_lookup(table, key);
132     int random_number;
133     double packet_prob;
134
135     /* First we update the common accouting information so that we have accurate
136      * aggregate information. */
137     table->common->bytes_since += key->packet_size;
138
139     /* Below here we're dealing with individual flows. */
140
141     /* It's already in the table, update it. */
142     if (lookup != NULL) {
143         lookup->bytes += key->packet_size;
144         return 1;
145     }
146
147     /* It's not in the table, probabilistically sample it. */
148     packet_prob = table->sample_prob * (double) key->packet_size;
149     random_number = rand() % RANDOM_GRANULARITY;
150
151     if (random_number < packet_prob) {
152         /* It's being sampled - add it to the table. */
153         uint32_t hash = table->hash_function(key) % table->capacity;
154         uint32_t location = hash;
155
156         do {
157             if (table->backing[location].state == FLOW_FREE ||
158                 table->backing[location].state == FLOW_DELETED) {
159                 lookup = &table->backing[location];
160                 break;
161             }
162
163             location++;
164             if (location == table->capacity) {
165                 location = 0;
166             }
167         } while (location != hash);
168
169         if (lookup == NULL) {
170             /* Table is full!?! */
171             printlog(LOG_WARN, "samplehold.c: Table full!\n");
172             return 0;
173         }
174
175         table->size += 1;
176
177         lookup->bytes = key->packet_size;
178         lookup->source_ip = key->source_ip;
179         lookup->dest_ip = key->dest_ip;
180         lookup->source_port = key->source_port;
181         lookup->dest_port = key->dest_port;
182         lookup->protocol = key->protocol;
183         lookup->state = FLOW_USED;
184         lookup->last_bytes = 0;
185         lookup->rate = 0;
186
187         gettimeofday(&lookup->last_update, NULL);
188
189         return 1;
190     }
191
192     /* Not sampled. */
193     return 0;
194 }
195
196 int sampled_table_cleanup(sampled_flow_table table) {
197     /* This should...
198      * 1) Remove "small" flows from the table.
199      * 2) Compact the table so that the remaining flows are closer to their
200      * hash locations.
201      * 3) Reset the state of deleted flows to free.
202      */
203
204     /* How it might work...
205      * 1) Scan through the backing array.
206      * 2) If the flow is small, memset it to 0.
207      *    It it's large, add it to a linked list.
208      * 3) For all items in the linked list, hash them and put them in the
209      * correct location.
210      */
211
212     /* For now though, we're going to do it the inefficient way and loop
213      * through the backing twice.
214      */
215
216     int i;
217
218     /* Clear small items. */
219     for (i = 0; i < table->capacity; ++i) {
220         if (table->backing[i].state == FLOW_USED && table->backing[i].bytes > table->threshold) {
221             /* It gets to stick around. */
222         } else {
223             /* It dies... */
224             memset(&table->backing[i], 0, sizeof(sampled_flow));
225         }
226     }
227
228     /* Compact the table and put things closer to their hash locations. */
229     for (i = 0; i < table->capacity; ++i) {
230         if (table->backing[i].state == FLOW_USED) {
231             uint32_t hash;
232             key_flow key;
233             
234             get_key(&key, &table->backing[i]);
235             hash = table->hash_function(&key) % table->capacity;
236
237             if (i == hash) {
238                 /* Already in the best place */
239                 table->backing[i].bytes = 0;
240                 table->backing[i].last_bytes = 0;
241                 table->backing[i].rate = 0;
242             } else {
243                 uint32_t location = hash;
244
245                 do {
246                     if (table->backing[location].state == FLOW_FREE) {
247                         move_flow(&table->backing[location], &table->backing[i]);
248                         table->backing[location].bytes = 0;
249                         table->backing[location].last_bytes = 0;
250                         table->backing[location].rate = 0;
251                         break;
252                     }
253
254                     location++;
255                     if (location == table->capacity) {
256                         location = 0;
257                     }
258                 } while (location != hash);
259             }
260         }
261     }
262
263     table->largest = NULL;
264
265     return 0;
266 }
267
268 void sampled_table_update_flows(sampled_flow_table table, struct timeval now, double ewma_weight) {
269     int i = 0;
270     uint32_t largest_rate = 0;
271     uint32_t rate_delta = 0;
272     double time_delta = 0;
273     double unweighted_rate = 0;
274
275     /* Update common aggregate information. */
276     time_delta = timeval_subtract(now, table->common->last_update);
277
278     if (time_delta <= 0) {
279         unweighted_rate = 0;
280     } else {
281         unweighted_rate = table->common->bytes_since / time_delta;
282     }
283
284     table->common->last_inst_rate = table->common->inst_rate;
285     table->common->inst_rate = unweighted_rate;
286
287     table->common->last_rate = table->common->rate;
288
289     /* If the rate is zero, then we don't know anything yet.  Don't apply EWMA
290      * in that case. */
291     if (table->common->rate == 0) {
292         table->common->rate = unweighted_rate;
293     } else {
294         table->common->rate = table->common->rate * ewma_weight +
295                               unweighted_rate * (1 - ewma_weight);
296     }
297
298     table->common->bytes_since = 0;
299     table->common->last_update = now;
300
301     /* Update per-flow information. */
302     table->largest = &table->backing[i];
303     largest_rate = table->backing[i].rate;
304
305     for (i = 0; i < table->capacity; ++i) {
306         if (table->backing[i].state == FLOW_USED) {
307             rate_delta = table->backing[i].bytes - table->backing[i].last_bytes;
308             time_delta = timeval_subtract(now, table->backing[i].last_update);
309
310             /* Calculate the unweighted rate.  Be careful not to divide by
311              * something silly. */
312             if (time_delta <= 0) {
313                 unweighted_rate = 0;
314             } else {
315                 unweighted_rate = rate_delta / time_delta; 
316             }
317
318             if (table->backing[i].rate == 0) {
319                 table->backing[i].rate = unweighted_rate;
320             } else {
321                 table->backing[i].rate = (table->backing[i].rate * ewma_weight +
322                                           unweighted_rate * (1 - ewma_weight));
323             }
324
325             table->backing[i].last_bytes = table->backing[i].bytes;
326             table->backing[i].last_update = now;
327
328             if (table->backing[i].rate > largest_rate) {
329                 largest_rate = table->backing[i].rate;
330                 table->largest = &table->backing[i];
331             }
332         }
333     }
334
335     table->common->max_flow_rate = largest_rate;
336 }
337
338 sampled_flow *sampled_table_largest(sampled_flow_table table) {
339     return table->largest;
340 }