Cleaned up a lot of printing.
[distributedratelimiting.git] / drl / standard.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 #include <arpa/inet.h>
4 #include <assert.h>
5 #include <inttypes.h>
6 #include <netinet/in.h>
7 #include <pthread.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <sys/time.h>
12 #include <sys/types.h>
13 #include <time.h>
14
15 #include "common_accounting.h"
16 #include "standard.h"
17 #include "logging.h"
18
19 standard_flow_table standard_table_create(uint32_t (*hash_function)(const key_flow *key), common_accounting_t *common) {
20     standard_flow_table table = malloc(sizeof(struct std_flow_table));
21
22     if (table == NULL) {
23         return NULL;
24     }
25
26     memset(table, 0, sizeof(struct std_flow_table));
27     table->common = common;
28     table->hash_function = hash_function;
29
30     gettimeofday(&table->common->last_update, NULL);
31
32     return table;
33 }
34
35 void standard_table_destroy(standard_flow_table table) {
36     standard_flow *current, *next;
37
38     if ((current = table->flows_head)) {
39         while (current->next) {
40             next = current->next;
41             free(current);
42             current = next;
43         }
44         free(current);
45     }
46
47     free(table);
48 }
49
50 /* Looks for the flow in the table.  If the flow isn't there, it allocates a
51  * place for it. */
52 standard_flow *standard_table_lookup(standard_flow_table table, const key_flow *key) {
53     uint32_t hash;
54     standard_flow *flow;
55     struct in_addr src, dst;
56     char sip[22], dip[22];
57
58     if (table == NULL) {
59         return NULL;
60     }
61
62     hash = table->hash_function(key);
63
64     /* Find the flow, if it's there. */
65     for (flow = table->flows[hash]; flow; flow = flow->nexth) {
66         if (flow->source_ip == key->source_ip &&
67                 flow->dest_ip == key->dest_ip &&
68                 flow->source_port == key->source_port &&
69                 flow->dest_port == key->dest_port &&
70                 flow->protocol == key->protocol) {
71             break;
72         }
73     }
74
75     if (flow == NULL) {
76         flow = malloc(sizeof(standard_flow));
77         if (flow == NULL) {
78             printlog(LOG_CRITICAL, "standard.c: Malloc returned NULL.\n");
79             return NULL;
80         }
81
82         memset(flow, 0, sizeof(standard_flow));
83         flow->protocol = key->protocol;
84         flow->source_ip = key->source_ip;
85         flow->dest_ip = key->dest_ip;
86         flow->source_port = key->source_port;
87         flow->dest_port = key->dest_port;
88         flow->last_packet = key->packet_time;
89         gettimeofday(&flow->last_update, NULL);
90
91         /* Add the flow to the hash list. */
92         flow->nexth = table->flows[hash];
93         table->flows[hash] = flow;
94
95         /* Add the flow to the linked list. */
96         if (table->flows_tail) {
97             flow->prev = table->flows_tail;
98             table->flows_tail->next = flow;
99             table->flows_tail = flow;
100         } else {
101             table->flows_head = table->flows_tail = flow;
102             /* next and prev are already null due to memset above. */
103         }
104
105         src.s_addr = ntohl(flow->source_ip);
106         dst.s_addr = ntohl(flow->dest_ip);
107         strcpy(sip, inet_ntoa(src));
108         strcpy(dip, inet_ntoa(dst));
109         printlog(LOG_DEBUG, "ALLOC:%s:%hd -> %s:%hd\n", sip,
110                 flow->source_port, dip, flow->dest_port);
111     }
112
113     return flow;
114 }
115
116 int standard_table_sample(standard_flow_table table, const key_flow *key) {
117     standard_flow *flow;
118
119     assert(table != NULL);
120     assert(table->common != NULL);
121
122     /* Update aggregate. */
123     table->common->bytes_since += key->packet_size;
124
125     /* Update flow. */
126     flow = standard_table_lookup(table, key);
127     if (flow == NULL) {
128         return 0;
129     }
130
131     flow->bytes_since += key->packet_size;
132     flow->last_packet = key->packet_time;
133
134     return 1;
135 }
136
137 void standard_table_remove(standard_flow_table table, standard_flow *flow) {
138     key_flow key;
139     uint32_t hash;
140     standard_flow *current, *prev;
141
142     assert(flow);
143
144     /* Remove the flow from the hash list. */
145     key.source_ip = flow->source_ip;
146     key.dest_ip = flow->dest_ip;
147     key.source_port = flow->source_port;
148     key.dest_port = flow->dest_port;
149     key.protocol = flow->protocol;
150
151     hash = table->hash_function(&key);
152
153     assert(table->flows[hash]);
154
155     if (table->flows[hash] == flow) {
156         /* It's the head of the hash list. */
157         table->flows[hash] = flow->nexth;
158     } else {
159         prev = table->flows[hash];
160         current = table->flows[hash]->nexth;
161
162         while (current != NULL) {
163             if (current == flow) {
164                 prev->nexth = flow->nexth;
165                 break;
166             } else {
167                 prev = current;
168                 current = current->next;
169             }
170         }
171
172         assert(current != NULL);
173     }
174
175     /* Remove the flow from the linked list. */
176     if (flow->prev == NULL && flow->next == NULL) {
177         /* It's the head, tail, and only element of the list. */
178         assert(table->flows_head == flow);
179         assert(table->flows_tail == flow);
180
181         table->flows_head = NULL;
182         table->flows_tail = NULL;
183     } else if (flow->prev == NULL) {
184         /* It's the head of the list. */
185         assert(table->flows_head == flow);
186
187         table->flows_head = flow->next;
188
189         if (table->flows_head != NULL) {
190             table->flows_head->prev = NULL;
191         }
192     } else if (flow->next == NULL) {
193         /* It's the tail of the list. */
194         assert(table->flows_tail == flow);
195
196         table->flows_tail = flow->prev;
197
198         table->flows_tail->next = NULL;
199     } else {
200         /* Not the head or tail of the list. */
201         assert(table->flows_head != flow);
202
203         flow->prev->next = flow->next;
204
205         if (flow->next != NULL) {
206             flow->next->prev = flow->prev;
207         }
208     }
209
210     memset(flow, 0, sizeof(standard_flow));
211
212     /* Free the flow. */
213     free(flow);
214 }
215
216 int standard_table_cleanup(standard_flow_table table) {
217     standard_flow *current = table->flows_head;
218     standard_flow *remove;
219     time_t now = time(NULL);
220
221     while (current != NULL) {
222         if (current->last_packet + FLOW_IDLE_TIME <= now) {
223             /* Flow hasn't received a packet in the time limit - kill it. */
224             remove = current;
225             current = current->next;
226
227             standard_table_remove(table, remove);
228         } else {
229             current = current->next;
230         }
231     }
232
233     return 0;
234 }
235
236 void standard_table_update_flows(standard_flow_table table, struct timeval now, double ewma_weight) {
237     uint32_t maxflowrate = 0;
238     double time_delta;
239     double unweighted_rate;
240     standard_flow *current;
241     struct in_addr src, dst;
242     char sip[22], dip[22];
243
244     time_delta = timeval_subtract(now, table->common->last_update);
245
246     if (time_delta <= 0) {
247         unweighted_rate = 0;
248     } else {
249         unweighted_rate = table->common->bytes_since / time_delta;
250     }
251
252     table->common->last_inst_rate = table->common->inst_rate;
253     table->common->inst_rate = unweighted_rate;
254
255     table->common->last_rate = table->common->rate;
256
257     /* If the rate is zero, then we don't know anything yet.  Don't apply EWMA
258      * in that case. */
259     if (table->common->rate == 0) {
260         table->common->rate = unweighted_rate;
261     } else {
262         table->common->rate = table->common->rate * ewma_weight +
263                               unweighted_rate * (1 - ewma_weight);
264     }
265
266     table->common->bytes_since = 0;
267     table->common->last_update = now;
268
269     /* Update per-flow information. */
270     for (current = table->flows_head; current; current = current->next) {
271         time_delta = timeval_subtract(now, current->last_update);
272
273         if (time_delta <= 0) {
274             unweighted_rate = 0;
275         } else {
276             unweighted_rate = current->bytes_since / time_delta;
277         }
278
279         current->last_rate = current->rate;
280
281         if (current->rate == 0) {
282             current->rate = unweighted_rate;
283         } else {
284             current->rate = current->rate * ewma_weight +
285                             unweighted_rate * (1 - ewma_weight);
286         }
287
288         current->bytes_since = 0;
289         current->last_update = now;
290
291         if (current->rate > maxflowrate) {
292             maxflowrate = current->rate;
293         }
294
295         src.s_addr = ntohl(current->source_ip);
296         dst.s_addr = ntohl(current->dest_ip);
297         strcpy(sip, inet_ntoa(src));
298         strcpy(dip, inet_ntoa(dst));
299         printlog(LOG_DEBUG, "FLOW: (%p)  %s:%d -> %s:%d at %d\n", current,
300                 sip, current->source_port,
301                 dip, current->dest_port,
302                 current->rate);
303     }
304
305     printlog(LOG_DEBUG, "FLOW:--\n--\n");
306
307     table->common->max_flow_rate = maxflowrate;
308 }