Lots of changes. In no particular order:
[distributedratelimiting.git] / drl / multipleinterval.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 #include <arpa/inet.h>
4 #include <assert.h>
5 #include <inttypes.h>
6 #include <netinet/in.h>
7 #include <pthread.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <sys/time.h>
12 #include <sys/types.h>
13 #include <time.h>
14
15 #include "common_accounting.h"
16 #include "multipleinterval.h"
17 #include "logging.h"
18
19 multiple_flow_table multiple_table_create(uint32_t (*hash_function)(const key_flow *key), uint32_t interval_count, common_accounting_t *common) {
20     int i;
21     multiple_flow_table table = malloc(sizeof(struct mul_flow_table));
22
23     if (table == NULL) {
24         return NULL;
25     }
26
27     memset(table, 0, sizeof(struct mul_flow_table));
28     table->common = common;
29     table->hash_function = hash_function;
30     table->interval_count = interval_count;
31
32     gettimeofday(&table->common->last_update, NULL);
33
34     table->intervals = malloc(interval_count * sizeof(interval));
35
36     if (table->intervals == NULL) {
37         free(table);
38         return NULL;
39     }
40
41     memset(table->intervals, 0, interval_count * sizeof(interval));
42     table->intervals[0].valid = 1;
43     table->intervals[0].last_update = table->common->last_update;
44
45     for (i = 0; i < interval_count; ++i) {
46         table->intervals[i].next = &table->intervals[(i + 1) % interval_count];
47     }
48
49     table->current_interval = &table->intervals[0];
50
51     return table;
52 }
53
54 void multiple_table_destroy(multiple_flow_table table) {
55     multiple_flow *current, *next;
56
57     if ((current = table->flows_head)) {
58         while (current->next) {
59             next = current->next;
60             free(current->intervals);
61             free(current);
62             current = next;
63         }
64         free(current->intervals);
65         free(current);
66     }
67
68     free(table->intervals);
69     free(table);
70 }
71
72 /* Looks for the flow in the table.  If the flow isn't there, it allocates a
73  * place for it. */
74 multiple_flow *multiple_table_lookup(multiple_flow_table table, const key_flow *key) {
75     uint32_t hash;
76     multiple_flow *flow;
77     struct in_addr src, dst;
78     char sip[22], dip[22];
79     int i;
80
81     if (table == NULL) {
82         return NULL;
83     }
84
85     hash = table->hash_function(key);
86
87     /* Find the flow, if it's there. */
88     for (flow = table->flows[hash]; flow; flow = flow->nexth) {
89         if (flow->source_ip == key->source_ip &&
90                 flow->dest_ip == key->dest_ip &&
91                 flow->source_port == key->source_port &&
92                 flow->dest_port == key->dest_port &&
93                 flow->protocol == key->protocol) {
94             break;
95         }
96     }
97
98     if (flow == NULL) {
99         flow = malloc(sizeof(multiple_flow));
100         if (flow == NULL) {
101             printlog(LOG_CRITICAL, "multipleinterval.c: Malloc returned NULL.\n");
102             return NULL;
103         }
104         memset(flow, 0, sizeof(multiple_flow));
105
106         flow->intervals = malloc(table->interval_count * sizeof(interval));
107         if (flow->intervals == NULL) {
108             free(flow);
109             printlog(LOG_CRITICAL, "multipleinterval.c: Malloc returned NULL.\n");
110             return NULL;
111         }
112         memset(flow->intervals, 0, table->interval_count * sizeof(interval));
113
114         flow->protocol = key->protocol;
115         flow->source_ip = key->source_ip;
116         flow->dest_ip = key->dest_ip;
117         flow->source_port = key->source_port;
118         flow->dest_port = key->dest_port;
119
120         flow->intervals[0].last_packet = key->packet_time;
121         flow->intervals[0].last_update = table->common->last_update;
122         flow->intervals[0].valid = 1;
123
124         for (i = 0; i < table->interval_count; ++i) {
125             flow->intervals[i].next = &flow->intervals[(i + 1) % table->interval_count];
126         }
127
128         flow->current_interval = &flow->intervals[0];
129
130         /* Add the flow to the hash list. */
131         flow->nexth = table->flows[hash];
132         table->flows[hash] = flow;
133
134         /* Add the flow to the linked list. */
135         if (table->flows_tail) {
136             flow->prev = table->flows_tail;
137             table->flows_tail->next = flow;
138             table->flows_tail = flow;
139         } else {
140             table->flows_head = table->flows_tail = flow;
141             /* next and prev are already null due to memset above. */
142         }
143
144         src.s_addr = ntohl(flow->source_ip);
145         dst.s_addr = ntohl(flow->dest_ip);
146         strcpy(sip, inet_ntoa(src));
147         strcpy(dip, inet_ntoa(dst));
148         printlog(LOG_DEBUG, "ALLOC:%s:%hu -> %s:%hu\n", sip,
149                 flow->source_port, dip, flow->dest_port);
150     }
151
152     return flow;
153 }
154
155 int multiple_table_sample(multiple_flow_table table, const key_flow *key) {
156     multiple_flow *flow;
157
158     assert(table != NULL);
159     assert(table->common != NULL);
160
161     /* Update aggregate. */
162     //table->common->bytes_since += key->packet_size;
163     table->current_interval->bytes_since += key->packet_size;
164     table->current_interval->valid = 1;
165
166     /* Update flow. */
167     flow = multiple_table_lookup(table, key);
168     if (flow == NULL) {
169         return 0;
170     }
171
172     /* Update flow's last packet info so that we know when to delete. */
173     flow->last_packet = key->packet_time;
174
175     /* Update interval information. */
176     flow->current_interval->bytes_since += key->packet_size;
177     flow->current_interval->last_packet = key->packet_time;
178     flow->current_interval->valid = 1;
179
180     return 1;
181 }
182
183 void multiple_table_remove(multiple_flow_table table, multiple_flow *flow) {
184     key_flow key;
185     uint32_t hash;
186
187     assert(flow);
188
189     /* Remove the flow from the hash list. */
190     key.source_ip = flow->source_ip;
191     key.dest_ip = flow->dest_ip;
192     key.source_port = flow->source_port;
193     key.dest_port = flow->dest_port;
194     key.protocol = flow->protocol;
195
196     hash = table->hash_function(&key);
197
198     assert(table->flows[hash]);
199
200     if (table->flows[hash] == flow) {
201         /* It's the head of the hash list. */
202         table->flows[hash] = flow->nexth;
203     } else {
204         multiple_flow *current, *prev;
205         
206         prev = table->flows[hash];
207
208         for (current = table->flows[hash]->nexth; current; current = current->nexth) {
209             if (current == flow) {
210                 prev->nexth = flow->nexth;
211                 break;
212             } else {
213                 prev = current;
214             }
215         }
216
217         if (current == NULL) {
218             printlog(LOG_CRITICAL, "Flow %p disappeared?\n", flow);
219         }
220         assert(current != NULL);
221     }
222
223     /* Remove the flow from the linked list. */
224     if (flow->prev == NULL && flow->next == NULL) {
225         /* It's the head, tail, and only element of the list. */
226         assert(table->flows_head == flow);
227         assert(table->flows_tail == flow);
228
229         table->flows_head = NULL;
230         table->flows_tail = NULL;
231     } else if (flow->prev == NULL) {
232         /* It's the head of the list. */
233         assert(table->flows_head == flow);
234
235         table->flows_head = flow->next;
236
237         if (table->flows_head != NULL) {
238             table->flows_head->prev = NULL;
239         }
240     } else if (flow->next == NULL) {
241         /* It's the tail of the list. */
242         assert(table->flows_tail == flow);
243
244         table->flows_tail = flow->prev;
245
246         table->flows_tail->next = NULL;
247     } else {
248         /* Not the head or tail of the list. */
249         assert(table->flows_head != flow);
250
251         flow->prev->next = flow->next;
252
253         if (flow->next != NULL) {
254             flow->next->prev = flow->prev;
255         }
256     }
257
258     /* Free the interval info. */
259     memset(flow->intervals, 0, table->interval_count * sizeof(interval));
260     free(flow->intervals);
261
262     /* Free the flow. */
263     memset(flow, 0, sizeof(multiple_flow));
264     free(flow);
265 }
266
267 int multiple_table_cleanup(multiple_flow_table table) {
268     multiple_flow *current = table->flows_head;
269     multiple_flow *remove;
270     time_t now = time(NULL);
271
272     while (current != NULL) {
273         if (current->last_packet + MUL_FLOW_IDLE_TIME <= now) {
274             /* Flow hasn't received a packet in the time limit - kill it. */
275             remove = current;
276             current = current->next;
277
278             multiple_table_remove(table, remove);
279         } else {
280             current = current->next;
281         }
282     }
283
284     return 0;
285 }
286
287 static interval *get_oldest_interval(interval *newest) {
288     interval *candidate = newest;
289     interval *oldest = NULL;
290
291     while (oldest == NULL) {
292         candidate = candidate->next;
293
294         if (candidate == newest) {
295             oldest = newest;
296         } else if (candidate->valid) {
297             oldest = candidate;
298         }
299     }
300
301     return oldest;
302 }
303
304 static uint32_t get_bytes_over_interval(interval *newest, interval *oldest) {
305     uint32_t result = newest->bytes_since;
306     interval *current = oldest;
307
308     while (current != newest) {
309         result += current->bytes_since;
310         current = current->next;
311     }
312
313     return result;
314 }
315
316 void multiple_table_update_flows(multiple_flow_table table, struct timeval now, double ewma_weight) {
317     uint32_t maxflowrate = 0;
318     double time_delta;
319     double unweighted_rate;
320     multiple_flow *current;
321     struct in_addr src, dst;
322     char sip[22], dip[22];
323     key_flow largest_flow_info;
324
325     /* Table interval variables. */
326     interval *table_newest = NULL;
327     interval *table_oldest = NULL;
328     uint32_t table_bytes_over_intervals = 0;
329
330     /* Reset statistics. */
331     table->common->num_flows = 0;
332     table->common->num_flows_5k = 0;
333     table->common->num_flows_10k = 0;
334     table->common->num_flows_20k = 0;
335     table->common->num_flows_50k = 0;
336     table->common->avg_rate = 0;
337     /* End statistics. */
338
339     table_newest = table->current_interval;
340     table_oldest = get_oldest_interval(table_newest);
341
342     table_bytes_over_intervals = get_bytes_over_interval(table_newest, table_oldest);
343
344     time_delta = timeval_subtract(now, table_oldest->last_update);
345
346     if (time_delta <= 0) {
347         unweighted_rate = 0;
348     } else {
349         unweighted_rate = table_bytes_over_intervals / time_delta;
350     }
351
352     table->common->last_inst_rate = table->common->inst_rate;
353     table->common->inst_rate = unweighted_rate;
354     printf("Unweighted rate is: %.3f, computed from %d bytes in %f seconds\n", unweighted_rate, table_bytes_over_intervals, time_delta);
355
356     table->common->last_rate = table->common->rate;
357
358     /* If the rate is zero, then we don't know anything yet.  Don't apply EWMA
359      * in that case. */
360     if (table->common->rate == 0) {
361         table->common->rate = unweighted_rate;
362     } else {
363         //FIXME: Continue to use ewma here?
364         table->common->rate = table->common->rate * ewma_weight + unweighted_rate * (1 - ewma_weight);
365     }
366
367     table->common->last_update = now;
368     table->current_interval = table->current_interval->next;
369     table->current_interval->last_update = now;
370     table->current_interval->bytes_since = 0;
371     table->current_interval->valid = 1;
372
373     /* Update per-flow information. */
374     for (current = table->flows_head; current; current = current->next) {
375         interval *newest = current->current_interval;
376         interval *oldest = get_oldest_interval(newest);
377         uint32_t bytes_over_intervals = 0;
378
379         /* This flow is invalid - don't consider it further. */
380         if (newest->valid == 0) {
381             printlog(LOG_WARN, "Found invalid flow in table.\n");
382             continue;
383         }
384
385         time_delta = timeval_subtract(now, oldest->last_update);
386         bytes_over_intervals = get_bytes_over_interval(newest, oldest);
387
388         if (time_delta <= 0) {
389             unweighted_rate = 0;
390         } else {
391             unweighted_rate = bytes_over_intervals / time_delta;
392         }
393
394         current->last_rate = current->rate;
395
396         if (current->rate == 0) {
397             current->rate = unweighted_rate;
398         } else {
399             //FIXME: Continue to use ewma here?
400             current->rate = current->rate * ewma_weight + unweighted_rate * (1 - ewma_weight);
401         }
402
403         /* Update the accounting info for intervals. */
404         current->current_interval = current->current_interval->next;
405         current->current_interval->last_update = now;
406         current->current_interval->bytes_since = 0;
407         current->current_interval->valid = 1;
408
409         if (current->rate > maxflowrate) {
410             maxflowrate = current->rate;
411             largest_flow_info.source_ip = current->source_ip;
412             largest_flow_info.dest_ip = current->dest_ip;
413             largest_flow_info.source_port = current->source_port;
414             largest_flow_info.dest_port = current->dest_port;
415             largest_flow_info.protocol = current->protocol;
416         }
417
418         if (current->rate > 51200) {
419             table->common->num_flows_50k += 1;
420             table->common->num_flows_20k += 1;
421             table->common->num_flows_10k += 1;
422             table->common->num_flows_5k += 1;
423             table->common->num_flows += 1;
424         } else if (current->rate > 20480) {
425             table->common->num_flows_20k += 1;
426             table->common->num_flows_10k += 1;
427             table->common->num_flows_5k += 1;
428             table->common->num_flows += 1;
429         } else if (current->rate > 10240) {
430             table->common->num_flows_10k += 1;
431             table->common->num_flows_5k += 1;
432             table->common->num_flows += 1;
433         } else if (current->rate > 5120) {
434             table->common->num_flows_5k += 1;
435             table->common->num_flows += 1;
436         } else {
437             table->common->num_flows += 1;
438         }
439
440         src.s_addr = ntohl(current->source_ip);
441         dst.s_addr = ntohl(current->dest_ip);
442         strcpy(sip, inet_ntoa(src));
443         strcpy(dip, inet_ntoa(dst));
444         printlog(LOG_DEBUG, "FLOW: (%p)  %s:%d -> %s:%d at %d\n", current,
445                 sip, current->source_port,
446                 dip, current->dest_port,
447                 current->rate);
448     }
449
450     if (table->common->num_flows > 0) {
451         table->common->avg_rate = table->common->rate / table->common->num_flows;
452     }
453
454     printlog(LOG_DEBUG, "FLOW:--\n--\n");
455
456     table->common->max_flow_rate = maxflowrate;
457     table->common->max_flow_rate_flow_hash = table->hash_function(&largest_flow_info);
458 }