Importing all of DRL, including ulogd and all of its files.
diff --git a/drl/samplehold.c b/drl/samplehold.c
new file mode 100644
index 0000000..4b79f3e
--- /dev/null
+++ b/drl/samplehold.c
@@ -0,0 +1,339 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include "common_accounting.h"
+#include "samplehold.h"
+
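+/* Return 1 if key's 5-tuple matches the in-use flow entry *flow, 0 otherwise. */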
+static int match(const key_flow *key, const sampled_flow *flow) {
+    if (flow->state != FLOW_USED)
+        return 0;
+
+    if (key->source_ip != flow->source_ip)
+        return 0;
+
+    if (key->dest_ip != flow->dest_ip)
+        return 0;
+
+    if (key->source_port != flow->source_port)
+        return 0;
+
+    if (key->dest_port != flow->dest_port)
+        return 0;
+
+    if (key->protocol != flow->protocol)
+        return 0;
+
+    return 1;
+}
+
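+/* Fill *key with the 5-tuple of *flow; packet_size is not part of the key. */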
+static void get_key(key_flow *key, sampled_flow *flow) {
+    key->source_ip = flow->source_ip;
+    key->dest_ip = flow->dest_ip;
+    key->source_port = flow->source_port;
+    key->dest_port = flow->dest_port;
+    key->protocol = flow->protocol;
+
+    key->packet_size = 0;
+}
+
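+/* Copy a flow entry into *dest and zero out the slot it came from. */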
+static void move_flow(sampled_flow *dest, sampled_flow *src) {
+    memmove(dest, src, sizeof(sampled_flow));
+    memset(src, 0, sizeof(sampled_flow));
+}
+
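+/* Number of flows currently stored in the table. */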
+uint32_t sampled_table_size(const sampled_flow_table table) {
+    return table->size;
+}
+
+/*
+ * Notes to myself...
+ *
+ * max_bytes is the maximum number of bytes that can pass through DURING THE
+ * MEASUREMENT INTERVAL.  So, if you have 100 Mbit/s and your measurement
+ * interval is 1/10 of a second, your max_bytes is 10 Mbit worth of traffic
+ * (1.25 MB), because that's all you can transfer in 1/10 of a second.
+ *
+ * flow_percentage is the percentage of max_bytes that is considered an
+ * interesting flow.
+ *
+ * oversampling factor is a knob that tunes how accurate our results are at
+ * the cost of additional state/memory.
+ */
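+/*
+ * Illustrative numbers (not taken from any caller): with flow_percentage = 1
+ * and oversampling_factor = 5, base_size is 100 / 1 = 100 and the capacity
+ * computed below is (100 * 5) * 1.03 = 515 slots, independent of max_bytes.
+ */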
+sampled_flow_table sampled_table_create(uint32_t (*hash_function)(const key_flow *key), const uint32_t max_bytes, const uint32_t flow_percentage, const uint32_t oversampling_factor, common_accounting_t *common) {
+    sampled_flow_table table = malloc(sizeof(struct sampled_flow_table));
+    double base_size = (double) 100 / (double) flow_percentage;
+
+    if (table == NULL) {
+        return NULL;
+    }
+
+    table->capacity = (uint32_t) ((base_size * oversampling_factor) * 1.03);
+    table->size = 0;
+    table->hash_function = hash_function;
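+    /* sample_prob is pre-scaled by RANDOM_GRANULARITY so that, in
+     * sampled_table_sample(), a packet of s bytes is admitted with probability
+     * of roughly capacity * s / max_bytes.  threshold is flow_percentage
+     * percent of max_bytes, the cutoff for "interesting" flows. */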
+    table->sample_prob = (double) (((double) table->capacity / (double) max_bytes) * (double) RANDOM_GRANULARITY);
+    table->threshold = (double) ((double) flow_percentage / 100) * max_bytes;
+
+    table->largest = NULL;
+    table->backing = malloc(sizeof(sampled_flow) * table->capacity);
+
+    if (table->backing == NULL) {
+        free(table);
+        return NULL;
+    }
+
+    memset(table->backing, 0, sizeof(sampled_flow) * table->capacity);
+
+    srand(time(NULL));
+
+    table->common = common;
+    gettimeofday(&table->common->last_update, NULL);
+
+    return table;
+}
+
+void sampled_table_destroy(sampled_flow_table table) {
+    free(table->backing);
+    free(table);
+}
+
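+/* Linear-probing lookup: scan forward from the hash slot, wrapping around at
+ * most once.  Hitting a FLOW_FREE slot means the flow is not in the table. */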
+sampled_flow *sampled_table_lookup(sampled_flow_table table, const key_flow *key) {
+    uint32_t hash = table->hash_function(key) % table->capacity;
+    uint32_t location = hash;
+
+    do {
+        if (table->backing[location].state == FLOW_FREE) {
+            /* It ain't here... */
+            return NULL;
+        }
+
+        if (match(key, &table->backing[location])) {
+            /* Got it! */
+            return &table->backing[location];
+        }
+
+        location++;
+        if (location == table->capacity) {
+            location = 0;
+        }
+    } while (location != hash);
+
+    return NULL;
+}
+
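+/* Sample-and-hold: the aggregate byte count is always updated; flows already
+ * in the table are charged for every packet ("hold"), while unknown flows are
+ * admitted with probability proportional to packet size ("sample").  Returns 1
+ * if the packet was charged to a flow entry, 0 otherwise. */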
+int sampled_table_sample(sampled_flow_table table, const key_flow *key) {
+    sampled_flow *lookup = sampled_table_lookup(table, key);
+    int random_number;
+    double packet_prob;
+
+    /* First we update the common accounting information so that we have accurate
+     * aggregate information. */
+    table->common->bytes_since += key->packet_size;
+
+    /* Below here we're dealing with individual flows. */
+
+    /* It's already in the table, update it. */
+    if (lookup != NULL) {
+        lookup->bytes += key->packet_size;
+        return 1;
+    }
+
+    /* It's not in the table, probabilistically sample it. */
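+    /* Both sides of the comparison below are on the RANDOM_GRANULARITY scale;
+     * see the sample_prob computation in sampled_table_create(). */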
+    packet_prob = table->sample_prob * (double) key->packet_size;
+    random_number = rand() % RANDOM_GRANULARITY;
+
+    if (random_number < packet_prob) {
+        /* It's being sampled - add it to the table. */
+        uint32_t hash = table->hash_function(key) % table->capacity;
+        uint32_t location = hash;
+
+        do {
+            if (table->backing[location].state == FLOW_FREE ||
+                table->backing[location].state == FLOW_DELETED) {
+                lookup = &table->backing[location];
+                break;
+            }
+
+            location++;
+            if (location == table->capacity) {
+                location = 0;
+            }
+        } while (location != hash);
+
+        if (lookup == NULL) {
+            /* Table is full!?! */
+            printf("Full table!\n");
+            return 0;
+        }
+
+        table->size += 1;
+
+        lookup->bytes = key->packet_size;
+        lookup->source_ip = key->source_ip;
+        lookup->dest_ip = key->dest_ip;
+        lookup->source_port = key->source_port;
+        lookup->dest_port = key->dest_port;
+        lookup->protocol = key->protocol;
+        lookup->state = FLOW_USED;
+        lookup->last_bytes = 0;
+        lookup->rate = 0;
+
+        gettimeofday(&lookup->last_update, NULL);
+
+        return 1;
+    }
+
+    /* Not sampled. */
+    return 0;
+}
+
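+/* End-of-interval cleanup: drop flows whose byte count is at or below the
+ * threshold, re-pack the survivors toward their hash slots, and reset their
+ * byte and rate counters. */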
+int sampled_table_cleanup(sampled_flow_table table) {
+    /* This should...
+     * 1) Remove "small" flows from the table.
+     * 2) Compact the table so that the remaining flows are closer to their
+     * hash locations.
+     * 3) Reset the state of deleted flows to free.
+     */
+
+    /* How it might work...
+     * 1) Scan through the backing array.
+     * 2) If the flow is small, memset it to 0.
+     *    If it's large, add it to a linked list.
+     * 3) For all items in the linked list, hash them and put them in the
+     * correct location.
+     */
+
+    /* For now though, we're going to do it the inefficient way and loop
+     * through the backing twice.
+     */
+
+    int i;
+
+    /* Clear small items. */
+    for (i = 0; i < table->capacity; ++i) {
+        if (table->backing[i].state == FLOW_USED && table->backing[i].bytes > table->threshold) {
+            /* It gets to stick around. */
+        } else {
+            /* It dies...  Keep the occupancy count honest when we evict a
+             * used flow. */
+            if (table->backing[i].state == FLOW_USED) {
+                table->size -= 1;
+            }
+            memset(&table->backing[i], 0, sizeof(sampled_flow));
+        }
+    }
+
+    /* Compact the table and put things closer to their hash locations. */
+    for (i = 0; i < table->capacity; ++i) {
+        if (table->backing[i].state == FLOW_USED) {
+            uint32_t hash;
+            key_flow key;
+            
+            get_key(&key, &table->backing[i]);
+            hash = table->hash_function(&key) % table->capacity;
+
+            if (i == hash) {
+                /* Already in the best place */
+                table->backing[i].bytes = 0;
+                table->backing[i].last_bytes = 0;
+                table->backing[i].rate = 0;
+            } else {
+                uint32_t location = hash;
+
+                do {
+                    if (table->backing[location].state == FLOW_FREE) {
+                        move_flow(&table->backing[location], &table->backing[i]);
+                        table->backing[location].bytes = 0;
+                        table->backing[location].last_bytes = 0;
+                        table->backing[location].rate = 0;
+                        break;
+                    }
+
+                    location++;
+                    if (location == table->capacity) {
+                        location = 0;
+                    }
+                } while (location != hash);
+            }
+        }
+    }
+
+    table->largest = NULL;
+
+    return 0;
+}
+
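+/* Recompute the aggregate rate and each flow's rate as an EWMA of the bytes
+ * seen since the previous update, and remember the flow with the highest rate
+ * in table->largest / common->max_flow_rate. */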
+void sampled_table_update_flows(sampled_flow_table table, struct timeval now, double ewma_weight) {
+    int i = 0;
+    uint32_t largest_rate = 0;
+    uint32_t rate_delta = 0;
+    double time_delta = 0;
+    double unweighted_rate = 0;
+
+    /* Update common aggregate information. */
+    time_delta = timeval_subtract(now, table->common->last_update);
+
+    if (time_delta <= 0) {
+        unweighted_rate = 0;
+    } else {
+        unweighted_rate = table->common->bytes_since / time_delta;
+    }
+
+    table->common->last_inst_rate = table->common->inst_rate;
+    table->common->inst_rate = unweighted_rate;
+
+    table->common->last_rate = table->common->rate;
+
+    /* If the rate is zero, then we don't know anything yet.  Don't apply EWMA
+     * in that case. */
+    if (table->common->rate == 0) {
+        table->common->rate = unweighted_rate;
+    } else {
+        table->common->rate = table->common->rate * ewma_weight +
+                              unweighted_rate * (1 - ewma_weight);
+    }
+
+    table->common->bytes_since = 0;
+    table->common->last_update = now;
+
+    /* Update per-flow information. */
+    table->largest = &table->backing[0];
+    largest_rate = table->backing[0].rate;
+
+    for (i = 0; i < table->capacity; ++i) {
+        if (table->backing[i].state == FLOW_USED) {
+            rate_delta = table->backing[i].bytes - table->backing[i].last_bytes;
+            time_delta = timeval_subtract(now, table->backing[i].last_update);
+
+            /* Calculate the unweighted rate.  Be careful not to divide by
+             * something silly. */
+            if (time_delta <= 0) {
+                unweighted_rate = 0;
+            } else {
+                unweighted_rate = rate_delta / time_delta; 
+            }
+
+            if (table->backing[i].rate == 0) {
+                table->backing[i].rate = unweighted_rate;
+            } else {
+                table->backing[i].rate = (table->backing[i].rate * ewma_weight +
+                                          unweighted_rate * (1 - ewma_weight));
+            }
+
+            table->backing[i].last_bytes = table->backing[i].bytes;
+            table->backing[i].last_update = now;
+
+            if (table->backing[i].rate > largest_rate) {
+                largest_rate = table->backing[i].rate;
+                table->largest = &table->backing[i];
+            }
+        }
+    }
+
+    table->common->max_flow_rate = largest_rate;
+}
+
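+/* The flow with the highest rate found by the last sampled_table_update_flows()
+ * call, or NULL if no update has run since the table was created or last
+ * cleaned up. */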
+sampled_flow *sampled_table_largest(sampled_flow_table table) {
+    return table->largest;
+}