1 /* See the DRL-LICENSE file for this file's software license. */
6 #include <netinet/in.h>
12 #include <sys/types.h>
15 #include "common_accounting.h"
16 #include "multipleinterval.h"
/*
 * multiple_table_create - allocate and initialise a multiple-interval flow
 * table.
 *
 * hash_function:  saved in the table; other routines in this file call it to
 *                 pick the bucket in table->flows for a given key_flow.
 * interval_count: number of interval records allocated for the table-wide
 *                 ring.
 * common:         accounting state the table points at (not copied); its
 *                 last_update field is overwritten here with the current time.
 *
 * NOTE(review): several lines of this function (the declaration of i, the
 * body of the NULL check, closing braces and the return statement) are not
 * present in this listing, so failure paths and the return value are not
 * described here.
 * NOTE(review): intervals[0] is written unconditionally below, so
 * interval_count is presumably required to be at least 1 - confirm at callers.
 */
19 multiple_flow_table multiple_table_create(uint32_t (*hash_function)(const key_flow *key), uint32_t interval_count, common_accounting_t *common) {
21 multiple_flow_table table = malloc(sizeof(struct mul_flow_table));
/* Start from a zero-filled table before the individual fields are set. */
27 memset(table, 0, sizeof(struct mul_flow_table));
28 table->common = common;
29 table->hash_function = hash_function;
30 table->interval_count = interval_count;
/* Stamp the shared accounting state with the creation time. */
32 gettimeofday(&table->common->last_update, NULL);
/* One interval record per slot of the table-wide ring. */
34 table->intervals = malloc(interval_count * sizeof(interval));
36 if (table->intervals == NULL) {
41 memset(table->intervals, 0, interval_count * sizeof(interval));
/* Slot 0 is the first live interval; it starts at the creation time. */
42 table->intervals[0].valid = 1;
43 table->intervals[0].last_update = table->common->last_update;
/* Chain the slots into a ring: the last slot's next wraps back to slot 0. */
45 for (i = 0; i < interval_count; ++i) {
46 table->intervals[i].next = &table->intervals[(i + 1) % interval_count];
/* Accounting begins in slot 0. */
49 table->current_interval = &table->intervals[0];
/*
 * multiple_table_destroy - release memory owned by a flow table.
 *
 * Walks the flow list from table->flows_head freeing each flow's interval
 * array, then frees the table's own interval array.
 *
 * NOTE(review): the statements that advance the walk and that free the flow
 * structures and the table itself are not present in this listing - confirm
 * against the full file.
 */
54 void multiple_table_destroy(multiple_flow_table table) {
55 multiple_flow *current, *next;
/* Only walk the list when the table holds at least one flow. */
57 if ((current = table->flows_head)) {
/* Flows that still have a successor are handled inside this loop. */
58 while (current->next) {
60 free(current->intervals);
/* Presumably covers the final flow, which has no successor - TODO confirm. */
64 free(current->intervals);
/* The table-wide interval array allocated in multiple_table_create. */
68 free(table->intervals);
/*
 * multiple_table_lookup - find the flow matching key, creating it if absent.
 *
 * table: flow table to search; its hash_function selects the bucket.
 * key:   5-tuple (source/dest IP, source/dest port, protocol) plus the
 *        packet_time used to stamp a newly created flow.
 *
 * A new flow is zero-filled, given its own ring of table->interval_count
 * interval records, pushed onto the front of its hash bucket and appended to
 * the tail of the table's doubly linked flow list.
 *
 * NOTE(review): the return statements, the bodies of the malloc failure
 * checks and some declarations are not present in this listing.
 */
72 /* Looks for the flow in the table. If the flow isn't there, it allocates a
74 multiple_flow *multiple_table_lookup(multiple_flow_table table, const key_flow *key) {
77 struct in_addr src, dst;
78 char sip[22], dip[22];
85 hash = table->hash_function(key);
87 /* Find the flow, if it's there. */
88 for (flow = table->flows[hash]; flow; flow = flow->nexth) {
/* A flow matches only when all five tuple fields are equal. */
89 if (flow->source_ip == key->source_ip &&
90 flow->dest_ip == key->dest_ip &&
91 flow->source_port == key->source_port &&
92 flow->dest_port == key->dest_port &&
93 flow->protocol == key->protocol) {
/* Not found in the bucket: build a new flow record. */
99 flow = malloc(sizeof(multiple_flow));
101 printlog(LOG_CRITICAL, "multipleinterval.c: Malloc returned NULL.\n");
104 memset(flow, 0, sizeof(multiple_flow));
/* Each flow carries its own ring, sized like the table-wide one. */
106 flow->intervals = malloc(table->interval_count * sizeof(interval));
107 if (flow->intervals == NULL) {
109 printlog(LOG_CRITICAL, "multipleinterval.c: Malloc returned NULL.\n");
112 memset(flow->intervals, 0, table->interval_count * sizeof(interval));
/* Record the identifying 5-tuple so later lookups can match on it. */
114 flow->protocol = key->protocol;
115 flow->source_ip = key->source_ip;
116 flow->dest_ip = key->dest_ip;
117 flow->source_port = key->source_port;
118 flow->dest_port = key->dest_port;
/* Slot 0 starts live, stamped with this packet and the table's last update. */
120 flow->intervals[0].last_packet = key->packet_time;
121 flow->intervals[0].last_update = table->common->last_update;
122 flow->intervals[0].valid = 1;
/* Chain the slots into a ring: the last slot's next wraps back to slot 0. */
124 for (i = 0; i < table->interval_count; ++i) {
125 flow->intervals[i].next = &flow->intervals[(i + 1) % table->interval_count];
128 flow->current_interval = &flow->intervals[0];
130 /* Add the flow to the hash list. */
131 flow->nexth = table->flows[hash];
132 table->flows[hash] = flow;
134 /* Add the flow to the linked list. */
135 if (table->flows_tail) {
136 flow->prev = table->flows_tail;
137 table->flows_tail->next = flow;
138 table->flows_tail = flow;
140 table->flows_head = table->flows_tail = flow;
141 /* next and prev are already null due to memset above. */
/*
 * Debug trace of the allocation.
 * NOTE(review): s_addr is conventionally network byte order; using ntohl
 * here suggests the stored addresses are host order (htonl would be the
 * usual spelling of that conversion) - confirm.
 */
144 src.s_addr = ntohl(flow->source_ip);
145 dst.s_addr = ntohl(flow->dest_ip);
/* inet_ntoa returns a static buffer, so copy each result before the next call. */
146 strcpy(sip, inet_ntoa(src));
147 strcpy(dip, inet_ntoa(dst));
148 printlog(LOG_DEBUG, "ALLOC:%s:%hu -> %s:%hu\n", sip,
149 flow->source_port, dip, flow->dest_port);
/*
 * multiple_table_sample - account one packet against the table and its flow.
 *
 * table: flow table; must be non-NULL with a non-NULL common pointer
 *        (enforced by assert).
 * key:   packet description; packet_size is added to the byte counters and
 *        packet_time is recorded as the most recent packet time.
 *
 * NOTE(review): the return statement and any handling of a failed lookup are
 * not present in this listing, so the meaning of the int result is not
 * described here.
 */
155 int multiple_table_sample(multiple_flow_table table, const key_flow *key) {
158 assert(table != NULL);
159 assert(table->common != NULL);
161 /* Update aggregate. */
162 //table->common->bytes_since += key->packet_size;
163 table->current_interval->bytes_since += key->packet_size;
164 table->current_interval->valid = 1;
/* Find the per-flow record for this packet; lookup creates it when absent. */
167 flow = multiple_table_lookup(table, key);
172 /* Update flow's last packet info so that we know when to delete. */
173 flow->last_packet = key->packet_time;
175 /* Update interval information. */
176 flow->current_interval->bytes_since += key->packet_size;
177 flow->current_interval->last_packet = key->packet_time;
178 flow->current_interval->valid = 1;
/*
 * multiple_table_remove - unlink a flow from the table and release its
 * interval array.
 *
 * table: table that currently contains the flow.
 * flow:  flow to remove; it must be reachable from its hash bucket (asserted).
 *
 * The flow is removed from its hash chain first, then from the doubly linked
 * flow list. Its interval array and the flow structure are zeroed before
 * release.
 *
 * NOTE(review): the closing lines of the function are not present in this
 * listing; whether the flow structure itself is freed here is not visible -
 * confirm against the full file.
 */
183 void multiple_table_remove(multiple_flow_table table, multiple_flow *flow) {
189 /* Remove the flow from the hash list. */
/* Rebuild a key from the flow's 5-tuple so the bucket can be recomputed. */
190 key.source_ip = flow->source_ip;
191 key.dest_ip = flow->dest_ip;
192 key.source_port = flow->source_port;
193 key.dest_port = flow->dest_port;
194 key.protocol = flow->protocol;
196 hash = table->hash_function(&key);
/* The bucket cannot be empty if the flow really belongs to this table. */
198 assert(table->flows[hash]);
200 if (table->flows[hash] == flow) {
201 /* It's the head of the hash list. */
202 table->flows[hash] = flow->nexth;
204 multiple_flow *current, *prev;
/* Otherwise scan the chain, starting at the second entry, for the flow. */
206 prev = table->flows[hash];
208 for (current = table->flows[hash]->nexth; current; current = current->nexth) {
209 if (current == flow) {
210 prev->nexth = flow->nexth;
/* Reaching the end of the chain means the flow was never in this bucket. */
217 if (current == NULL) {
218 printlog(LOG_CRITICAL, "Flow %p disappeared?\n", flow);
220 assert(current != NULL);
223 /* Remove the flow from the linked list. */
224 if (flow->prev == NULL && flow->next == NULL) {
225 /* It's the head, tail, and only element of the list. */
226 assert(table->flows_head == flow);
227 assert(table->flows_tail == flow);
229 table->flows_head = NULL;
230 table->flows_tail = NULL;
231 } else if (flow->prev == NULL) {
232 /* It's the head of the list. */
233 assert(table->flows_head == flow);
235 table->flows_head = flow->next;
237 if (table->flows_head != NULL) {
238 table->flows_head->prev = NULL;
240 } else if (flow->next == NULL) {
241 /* It's the tail of the list. */
242 assert(table->flows_tail == flow);
244 table->flows_tail = flow->prev;
246 table->flows_tail->next = NULL;
248 /* Not the head or tail of the list. */
249 assert(table->flows_head != flow);
251 flow->prev->next = flow->next;
253 if (flow->next != NULL) {
254 flow->next->prev = flow->prev;
258 /* Free the interval info. */
/* Contents are zeroed before release so stale data is not left behind. */
259 memset(flow->intervals, 0, table->interval_count * sizeof(interval));
260 free(flow->intervals);
263 memset(flow, 0, sizeof(multiple_flow));
/*
 * multiple_table_cleanup - remove flows that have been idle too long.
 *
 * Walks the flow list from table->flows_head and removes every flow whose
 * last_packet plus MUL_FLOW_IDLE_TIME is not later than the current time()
 * value.
 *
 * NOTE(review): the assignment to `remove` and the return statement are not
 * present in this listing; `remove` is presumably set to the idle flow
 * before the walk advances - confirm.
 */
267 int multiple_table_cleanup(multiple_flow_table table) {
268 multiple_flow *current = table->flows_head;
269 multiple_flow *remove;
270 time_t now = time(NULL);
272 while (current != NULL) {
273 if (current->last_packet + MUL_FLOW_IDLE_TIME <= now) {
274 /* Flow hasn't received a packet in the time limit - kill it. */
/* Step past the flow first: multiple_table_remove zeroes the record. */
276 current = current->next;
278 multiple_table_remove(table, remove);
/* Flow is still active; move on to the next one. */
280 current = current->next;
/*
 * get_oldest_interval - search an interval ring for the oldest usable entry.
 *
 * newest: the ring's current interval. The search begins at newest->next and
 *         follows the next pointers around the ring.
 *
 * The walk examines each interval's valid flag and also detects when it has
 * wrapped all the way back to newest.
 *
 * NOTE(review): the assignments to `oldest` and the return statement are not
 * present in this listing. Presumably the first valid interval found is
 * returned, with newest itself as the fallback once the walk wraps - confirm.
 */
287 static interval *get_oldest_interval(interval *newest) {
288 interval *candidate = newest;
289 interval *oldest = NULL;
/* Loop until one of the branches below selects an interval. */
291 while (oldest == NULL) {
292 candidate = candidate->next;
/* Wrapped around: no other interval in the ring qualified. */
294 if (candidate == newest) {
296 } else if (candidate->valid) {
/*
 * get_bytes_over_interval - total the byte counters from oldest to newest.
 *
 * newest: last interval to include; its bytes_since seeds the total.
 * oldest: first interval to include; the walk follows next pointers from
 *         here until it reaches newest.
 *
 * The total is accumulated in a uint32_t, so it wraps modulo 2^32 if the
 * counters sum past that.
 *
 * NOTE(review): the return statement is not present in this listing;
 * presumably `result` is returned - confirm.
 */
304 static uint32_t get_bytes_over_interval(interval *newest, interval *oldest) {
305 uint32_t result = newest->bytes_since;
306 interval *current = oldest;
/* newest was counted above, so stop as soon as the walk reaches it. */
308 while (current != newest) {
309 result += current->bytes_since;
310 current = current->next;
/*
 * multiple_table_update_flows - recompute the aggregate and per-flow rates
 * and advance every interval ring by one slot.
 *
 * table:       flow table to update; results are written to table->common
 *              and to each flow.
 * now:         timestamp for this update; stored as the new last_update.
 * ewma_weight: weight given to the previous rate in the EWMA; the fresh
 *              sample receives (1 - ewma_weight).
 *
 * For the table and then for each flow, the bytes counted between the oldest
 * usable interval and the current one are divided by the time elapsed since
 * that oldest interval's last_update. The ring then moves to its next slot,
 * which is reset and marked valid. Flow-count statistics, the average rate
 * and the largest flow are recorded in table->common.
 *
 * NOTE(review): some lines (the declaration of time_delta, the bodies of the
 * time_delta <= 0 checks, else branches, closing braces) are not present in
 * this listing.
 * NOTE(review): largest_flow_info is only assigned when some flow's rate
 * exceeds maxflowrate, yet it is hashed unconditionally at the end. Unless a
 * line not shown initialises it, it is read unassigned when no flow has a
 * non-zero rate - confirm.
 */
316 void multiple_table_update_flows(multiple_flow_table table, struct timeval now, double ewma_weight) {
317 uint32_t maxflowrate = 0;
319 double unweighted_rate;
320 multiple_flow *current;
321 struct in_addr src, dst;
322 char sip[22], dip[22];
323 key_flow largest_flow_info;
325 /* Table interval variables. */
326 interval *table_newest = NULL;
327 interval *table_oldest = NULL;
328 uint32_t table_bytes_over_intervals = 0;
330 /* Reset statistics. */
331 table->common->num_flows = 0;
332 table->common->num_flows_5k = 0;
333 table->common->num_flows_10k = 0;
334 table->common->num_flows_20k = 0;
335 table->common->num_flows_50k = 0;
336 table->common->avg_rate = 0;
337 /* End statistics. */
/* Aggregate rate: bytes across the table-wide ring over the elapsed time. */
339 table_newest = table->current_interval;
340 table_oldest = get_oldest_interval(table_newest);
342 table_bytes_over_intervals = get_bytes_over_interval(table_newest, table_oldest);
344 time_delta = timeval_subtract(now, table_oldest->last_update);
/* Guards the division below against a zero or negative elapsed time. */
346 if (time_delta <= 0) {
349 unweighted_rate = table_bytes_over_intervals / time_delta;
/* Keep the previous instantaneous rate before overwriting it. */
352 table->common->last_inst_rate = table->common->inst_rate;
353 table->common->inst_rate = unweighted_rate;
/* NOTE(review): table_bytes_over_intervals is uint32_t but is printed with %d. */
354 printf("Unweighted rate is: %.3f, computed from %d bytes in %f seconds\n", unweighted_rate, table_bytes_over_intervals, time_delta);
356 table->common->last_rate = table->common->rate;
358 /* If the rate is zero, then we don't know anything yet. Don't apply EWMA
360 if (table->common->rate == 0) {
361 table->common->rate = unweighted_rate;
363 //FIXME: Continue to use ewma here?
364 table->common->rate = table->common->rate * ewma_weight + unweighted_rate * (1 - ewma_weight);
// Advance the table-wide ring to its next slot and reset that slot.
367 table->common->last_update = now;
368 table->current_interval = table->current_interval->next;
369 table->current_interval->last_update = now;
370 table->current_interval->bytes_since = 0;
371 table->current_interval->valid = 1;
373 /* Update per-flow information. */
374 for (current = table->flows_head; current; current = current->next) {
375 interval *newest = current->current_interval;
/* Note: the oldest interval is looked up before the validity check below. */
376 interval *oldest = get_oldest_interval(newest);
377 uint32_t bytes_over_intervals = 0;
379 /* This flow is invalid - don't consider it further. */
380 if (newest->valid == 0) {
381 printlog(LOG_WARN, "Found invalid flow in table.\n");
/* Same rate computation as for the table, applied to this flow's ring. */
385 time_delta = timeval_subtract(now, oldest->last_update);
386 bytes_over_intervals = get_bytes_over_interval(newest, oldest);
/* Guards the division below against a zero or negative elapsed time. */
388 if (time_delta <= 0) {
391 unweighted_rate = bytes_over_intervals / time_delta;
394 current->last_rate = current->rate;
/* A zero rate is replaced by the raw sample; otherwise the EWMA is applied. */
396 if (current->rate == 0) {
397 current->rate = unweighted_rate;
399 //FIXME: Continue to use ewma here?
400 current->rate = current->rate * ewma_weight + unweighted_rate * (1 - ewma_weight);
403 /* Update the accounting info for intervals. */
404 current->current_interval = current->current_interval->next;
405 current->current_interval->last_update = now;
406 current->current_interval->bytes_since = 0;
407 current->current_interval->valid = 1;
/* Track the fastest flow seen so far and remember its 5-tuple. */
409 if (current->rate > maxflowrate) {
410 maxflowrate = current->rate;
411 largest_flow_info.source_ip = current->source_ip;
412 largest_flow_info.dest_ip = current->dest_ip;
413 largest_flow_info.source_port = current->source_port;
414 largest_flow_info.dest_port = current->dest_port;
415 largest_flow_info.protocol = current->protocol;
/*
 * Cumulative buckets: a flow above a threshold is also counted in every
 * lower bucket and in num_flows. The thresholds are 50, 20, 10 and 5 times
 * 1024.
 */
418 if (current->rate > 51200) {
419 table->common->num_flows_50k += 1;
420 table->common->num_flows_20k += 1;
421 table->common->num_flows_10k += 1;
422 table->common->num_flows_5k += 1;
423 table->common->num_flows += 1;
424 } else if (current->rate > 20480) {
425 table->common->num_flows_20k += 1;
426 table->common->num_flows_10k += 1;
427 table->common->num_flows_5k += 1;
428 table->common->num_flows += 1;
429 } else if (current->rate > 10240) {
430 table->common->num_flows_10k += 1;
431 table->common->num_flows_5k += 1;
432 table->common->num_flows += 1;
433 } else if (current->rate > 5120) {
434 table->common->num_flows_5k += 1;
435 table->common->num_flows += 1;
/* Presumably the final else branch: slower flows count only in num_flows. */
437 table->common->num_flows += 1;
/* Debug trace; inet_ntoa reuses a static buffer, so each result is copied. */
440 src.s_addr = ntohl(current->source_ip);
441 dst.s_addr = ntohl(current->dest_ip);
442 strcpy(sip, inet_ntoa(src));
443 strcpy(dip, inet_ntoa(dst));
444 printlog(LOG_DEBUG, "FLOW: (%p) %s:%d -> %s:%d at %d\n", current,
445 sip, current->source_port,
446 dip, current->dest_port,
/* The average is only computed when at least one flow was counted. */
450 if (table->common->num_flows > 0) {
451 table->common->avg_rate = table->common->rate / table->common->num_flows;
454 printlog(LOG_DEBUG, "FLOW:--\n--\n");
/* Publish the largest flow's rate and the hash of its 5-tuple. */
456 table->common->max_flow_rate = maxflowrate;
457 table->common->max_flow_rate_flow_hash = table->hash_function(&largest_flow_info);