Importing all of DRL, including ulogd and all of its files.
diff --git a/drl/estimate.c b/drl/estimate.c
new file mode 100644 (file)
index 0000000..f1dd142
--- /dev/null
@@ -0,0 +1,486 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+/*
+ * Thread to periodically calculate the estimated local limits
+ * Barath Raghavan 2006/2007
+ * Ken Yocum 2007
+ * Kevin Webb 2007/2008
+ */
+
+/** The size of the buffer we use to hold tc commands. */
+#define CMD_BUFFER_SIZE 200
+
+/* DRL specifics */
+#include "raterouter.h" 
+#include "util.h"
+#include "ratetypes.h" /* needs util and pthread.h */
+#include "logging.h"
+
+static int underlimit_flowcount_count = 0;
+static int underlimit_normal_count = 0;
+
+/**
+ * Called for each identity during each estimate interval.  Uses flow table
+ * information to estimate the current aggregate rate and the rates of the
+ * individual flows in the table.
+ */
+static void estimate(identity_t *ident) {
+    struct timeval now;
+
+    gettimeofday(&now, NULL);
+
+    pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
+
+    ident->table_update_function(ident->table, now, ident->ewma_weight);
+
+    pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */
+}
+
+/**
+ * Determines the FPS weight allocation when the identity is under its current
+ * local rate limit.
+ */
+static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, double peer_weights) {
+    uint32_t target = local_rate;
+    double ideal_weight;
+    double total_weight = peer_weights + ident->last_localweight;
+
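+    /* While in the flow-start state, aim for four times the observed rate so
+     * that new flows have headroom to ramp up; leave the state once the local
+     * rate clears FLOW_START_THRESHOLD. */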
+    if (ident->flowstart) {
+        target = local_rate*4;
+        if (local_rate >= FLOW_START_THRESHOLD) {
+            ident->flowstart = false;
+        }
+    }
+    else {
+        /* June 16, 2008 (KCW)
+         * ident->flowstart was originally set once at startup and never set
+         * again.  However, if a limiter saw flows and the flow count later
+         * dropped to zero, it had trouble increasing the limit again, so we
+         * re-enter the flow-start state when the local rate drops below the
+         * threshold. */
+        if (local_rate < FLOW_START_THRESHOLD) {
+            ident->flowstart = true;
+        }
+    }
+
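+    /* The ideal weight is the fraction of the global limit we want to use
+     * (target / limit), scaled by the total weight in the system, so that
+     * converting the weight back into a rate in allocate_fps() yields
+     * approximately the target rate. */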
+    if (target >= ident->limit) {
+        ideal_weight = total_weight;
+    } else if (target <= 0) {
+        ideal_weight = 0; // no flows here
+    } else {
+        ideal_weight = ((double)target / (double)ident->limit) * total_weight;
+    }
+
+#if 0
+    else if (peer_weights <= 0) {
+#if 0
+        // doesn't matter what we pick as our weight, so pick 1 / N.
+        ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1);
+#endif
+        ideal_weight = ((double)target / (double)ident->limit) * total_weight;
+    } else {
+#if 0
+        double divisor = (double) ident->limit - (double) target;
+        ideal_weight = ((double) target * peer_weights) / divisor;
+#else
+        ideal_weight = ((double)target / (double)ident->limit) * total_weight;
+#endif
+    }
+#endif
+
+    return ideal_weight;
+}
+
+/**
+ * Determines the FPS weight allocation when the identity is over its current
+ * local rate limit.
+ */
+static double allocate_fps_over_limit(identity_t *ident) {
+    double ideal_weight;
+
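+    /* Above the local limit, estimate the weight as locallimit / max_flow_rate,
+     * roughly the number of flows that would fit within the limit if each ran
+     * at the fastest flow's rate (an approximate bottlenecked-flow count). */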
+    if (ident->common.max_flow_rate > 0) {
+        ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate;
+
+        printlog(LOG_DEBUG, "%.3f  %d  %d  FlowCount, TotalRate, MaxRate\n",
+                ideal_weight, ident->common.rate, ident->common.max_flow_rate);
+    } else {
+        ideal_weight = 1;
+    }
+
+    return ideal_weight;
+}
+
+/**
+ * Determines the amount of FPS weight to allocate to the identity during each
+ * estimate interval.  Note that total_weight includes local weight.
+ */
+static uint32_t allocate_fps(identity_t *ident, double total_weight) {
+    common_accounting_t *ftable = &ident->common; /* Common flow table info */
+    uint32_t local_rate = ftable->rate;
+    uint32_t ideallocal = 0;
+    double peer_weights; /* sum of weights of all other limiters  */
+    double idealweight = 0;
+    double last_portion = 0;
+    double this_portion = 0;
+
+    static int dampen = 0;
+    int dampen_increase = 0;
+
+    double ideal_under = 0;
+    double ideal_over = 0;
+
+    int regime = 0;
+
+    /* two cases:
+       1. the aggregate is < limit
+       2. the aggregate is >= limit
+       */
+    peer_weights = total_weight - ident->last_localweight;
+    if (peer_weights < 0) {
+        peer_weights = 0;
+    }
+
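+    /* dampen == 1 means the previous interval applied a damped large increase.
+     * If the instantaneous rate has since jumped by more than a small fraction
+     * of the limit, note it so this interval is treated as over-limit instead
+     * of allowing another increase. */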
+    if (dampen == 1) {
+        int64_t rate_delta =
+            (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate;
+        double threshold =
+            (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
+
+        if (rate_delta > threshold) {
+            dampen_increase = 1;
+            printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n",
+                     (double) rate_delta, threshold);
+        }
+    }
+
+    if (local_rate <= 0) {
+        idealweight = 0;
+    } else if (dampen_increase == 0 &&
+               (ident->locallimit <= 0 || local_rate < ident->locallimit ||
+                ident->flowstart)) {
+        /* We're under the limit - all flows are bottlenecked. */
+        idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
+        ideal_over = allocate_fps_over_limit(ident);
+        ideal_under = idealweight;
+
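+        /* Take the smaller of the two estimates: if the flow-count based
+         * (over-limit) estimate is lower, use it instead, since the plain
+         * under-limit estimate would likely overshoot. */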
+        if (ideal_over < idealweight) {
+            idealweight = ideal_over;
+            regime = 3;
+            dampen = 2;
+            underlimit_flowcount_count += 1;
+        } else {
+            regime = 1;
+            dampen = 0;
+            underlimit_normal_count += 1;
+        }
+
+        /* Apply EWMA */
+        ident->localweight = (ident->localweight * ident->ewma_weight +
+                              idealweight * (1 - ident->ewma_weight));
+        
+    } else {
+        idealweight = allocate_fps_over_limit(ident);
+        
+        /* Apply EWMA */
+        ident->localweight = (ident->localweight * ident->ewma_weight +
+                              idealweight * (1 - ident->ewma_weight));
+
+        /* This is the portion of the total weight in the system that was caused
+         * by this limiter in the last interval. */
+        last_portion = ident->last_localweight / total_weight;
+
+        /* This is the fraction of the total weight in the system that our
+         * proposed value for idealweight would use. */
+        this_portion = ident->localweight / (peer_weights + ident->localweight);
+
+        /* Dampen the large increase the first time... */
+        if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) {
+            ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
+            dampen = 1;
+        } else {
+            dampen = 2;
+        }
+
+        ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights);
+        ideal_over = idealweight;
+
+        regime = 2;
+    }
+
+    /* Convert weight into a rate - add in our new local weight */
+    total_weight = ident->localweight + peer_weights;
+
+    /* Compute the local allocation: if there is any weight in the system,
+       take the share of the global limit proportional to our weight;
+       otherwise fall back to an even L/n split across the limiters. */
+    if (total_weight > 0) {
+    //if (peer_weights > 0) {
+        ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight);
+    } else {
+        ideallocal = ident->limit / (ident->comm.remote_node_count + 1);
+    }
+
+    printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight);
+
+    printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f  Under / Over / Actual / Rate\n",
+            ideal_under / (ideal_under + peer_weights),
+            ideal_over / (ideal_over + peer_weights),
+            ident->localweight / (ident->localweight + peer_weights),
+            (double) local_rate / (double) ident->limit);
+
+    printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
+
+    printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
+            local_rate, idealweight, ident->localweight, total_weight);
+
+    //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
+    //       dampen, dampen_increase, peer_weights, regime);
+
+    //printf("normal_count: %d, flowcount_count: %d\n", underlimit_normal_count, underlimit_flowcount_count);
+
+    if (regime == 3) {
+        printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
+                 ideal_over, ideal_under);
+    }
+
+    printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
+
+    return(ideallocal);
+}
+
+/**
+ * Determines the local drop probability for a GRD identity every estimate
+ * interval.
+ */
+static double allocate_grd(identity_t *ident, double aggdemand) {
+    double dropprob;
+    double global_limit = (double) (ident->limit);
+
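+    /* GRD drops just enough traffic to bring the aggregate demand back down to
+     * the global limit: p = (demand - limit) / demand when over the limit,
+     * zero otherwise. */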
+    if (aggdemand > global_limit) {
+        dropprob = (aggdemand-global_limit)/aggdemand;
+    } else {
+        dropprob = 0.0;
+    }
+    
+    //printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
+    //        ident->common.rate, aggdemand, dropprob);
+
+    return dropprob;
+}
+
+/** 
+ * Given current estimates of the local rate (weight) and remote rates
+ * (weights), use GRD or FPS to calculate a new local limit.
+ */
+static void allocate(limiter_t *limiter, identity_t *ident) {
+    /* Represents aggregate rate for GRD and aggregate weight for FPS. */
+    double comm_val = 0;
+
+    /* Read comm_val from comm layer. */
+    read_comm(&ident->comm, &comm_val);
+    printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
+
+    /* Experimental printing. */
+    printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
+             (double) ident->common.rate / (double) 128, ident->id);
+    ident->avg_bytes += ident->common.rate;
+    
+    if (limiter->policynum == POLICY_FPS) {
+        ident->locallimit = allocate_fps(ident, comm_val);
+        ident->last_localweight = ident->localweight;
+        
+        /* Update other limiters with our weight by writing to comm layer. */
+        write_local_value(&ident->comm, ident->localweight);
+    } else {
+        ident->locallimit = 0; /* Unused with GRD. */
+        ident->last_localdropprob = ident->localdropprob;
+        ident->localdropprob = allocate_grd(ident, comm_val);
+        
+        /* Update other limiters with our rate by writing to comm layer. */
+        write_local_value(&ident->comm, ident->common.rate);
+    }
+
+    /* Update identity state. */
+    ident->common.last_rate = ident->common.rate;
+}
+
+/**
+ * This is called once per estimate interval to enforce the rate that allocate
+ * has decided upon.  It makes calls to tc using system().
+ */
+static void enforce(limiter_t *limiter, identity_t *ident) {
+    char cmd[CMD_BUFFER_SIZE];
+    int ret = 0;
+
+    switch (limiter->policynum) {
+        case POLICY_FPS:
+
+            /* TC treats limits of 0 (8bit) as unlimited, which causes the
+             * entire rate limiting system to become unpredictable.  In
+             * reality, we also don't want any limiter to be able to set its
+             * limit so low that it chokes all of the flows to the point that
+             * they can't increase.  Thus, when we're setting a low limit, we
+             * make sure that it isn't too low by using the
+             * FLOW_START_THRESHOLD. */
+
+            if (ident->locallimit < FLOW_START_THRESHOLD) {
+                ident->locallimit = FLOW_START_THRESHOLD * 2;
+            }
+
+            /* Do not allow the node to set a limit higher than its
+             * administratively assigned upper limit (bwcap). */
+            if (limiter->nodelimit != 0 && ident->locallimit > limiter->nodelimit) {
+                ident->locallimit = limiter->nodelimit;
+            }
+
+            printf("FPS: Setting local limit to %d\n", ident->locallimit);
+            printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
+
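+            /* Rewrite the identity's HTB class.  The base rate stays at a
+             * token 8 bits/sec and the ceiling carries the real limit; note
+             * that tc's "bps" suffix means bytes per second, and quantum 1600
+             * is roughly one MTU-sized packet per borrowing round. */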
+            snprintf(cmd, CMD_BUFFER_SIZE,
+                     "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
+                     ident->htb_parent, ident->htb_node, ident->locallimit);
+
+            ret = system(cmd);
+
+            if (ret) {
+                /* FIXME: call failed.  What to do? */
+            }
+            break;
+
+        case POLICY_GRD:
+/* FIXME: Figure out where to enforce GRD. */
+#if 0
+            for (i = 0; i < ident->num_slices; i++){
+
+                sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
+                        ident->xids[i],ident->xids[i], (100*ident->localdropprob));
+
+                ret = system(cmd);
+
+                if (ret==-1)
+                    print_system_error(ret);
+            }
+#endif
+            break;
+
+        default: 
+            printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n", limiter->policynum);
+            break;
+    }
+
+    return;
+}
+
+/**
+ * This function is periodically called to clean the stable instance's flow
+ * accounting tables for each identity.
+ */
+static void clean(drl_instance_t *instance) {
+    identity_t *ident = NULL;
+
+    map_reset_iterate(instance->ident_map);
+    while ((ident = map_next(instance->ident_map)) != NULL) {
+        pthread_mutex_lock(&ident->table_mutex);
+
+        ident->table_cleanup_function(ident->table);
+
+        pthread_mutex_unlock(&ident->table_mutex);
+    }
+
+    /* Periodically flush the log file. */
+    flushlog();
+}
+
+static void print_averages(drl_instance_t *instance, int print_interval) {
+    identity_t *ident = NULL;
+
+    map_reset_iterate(instance->ident_map);
+    while ((ident = map_next(instance->ident_map)) != NULL) {
+        ident->avg_bytes /= (double) print_interval;
+        //printf("avg_bytes = %f, print_interval = %d\n", ident->avg_bytes, print_interval);
+        printlog(LOG_DEBUG, "%.3f \t Avg rate. ID:%d\n",
+                 ident->avg_bytes / 128, ident->id);
+        //printf("%.3f \t Avg rate. ID:%d\n",
+        //         ident->avg_bytes / 128, ident->id);
+        ident->avg_bytes = 0;
+    }
+}
+
+/** Thread function to handle local rate estimation.
+ *
+ * None of our simple hashmap functions are thread safe, so we lock the limiter
+ * with an rwlock to prevent another thread from attempting to modify the set
+ * of identities.
+ *
+ * Each identity also has a private lock for its table.  This gets locked by
+ * table-modifying functions such as estimate and clean.
+ */
+void handle_estimation(void *arg) {
+    limiter_t *limiter = (limiter_t *) arg;
+    identity_t *ident = NULL;
+    int clean_timer, clean_wait_intervals;
+    useconds_t sleep_time = limiter->estintms * 1000;
+    uint32_t cal_slot = 0;
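+    /* limiter->estintms is the estimate interval in milliseconds, so this is
+     * roughly the number of estimate intervals in one second. */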
+    int print_interval = 1000 / (limiter->estintms);
+
+    sigset_t signal_mask;
+
+    sigemptyset(&signal_mask);
+    sigaddset(&signal_mask, SIGHUP);
+    pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
+
+    /* Determine the number of intervals we should wait before hitting the
+     * specified clean interval. (Converts seconds -> intervals). */
+    clean_wait_intervals = IDENT_CLEAN_INTERVAL * (1000.0 / limiter->estintms);
+    clean_timer = clean_wait_intervals;
+
+    while (true) {
+        /* Sleep according to the delay of the estimate interval. */
+        usleep(sleep_time);
+
+        /* Grab the limiter lock for reading.  This prevents identities from
+         * disappearing beneath our feet. */
+        pthread_rwlock_rdlock(&limiter->limiter_lock);
+
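+        /* The calendar is a ring of per-tick buckets; SCHEDMASK wraps the
+         * monotonically increasing slot counter onto the ring. */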
+        cal_slot = limiter->stable_instance.cal_slot & SCHEDMASK;
+
+        /* Service all the identities that are scheduled to run during this
+         * tick. */
+        while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) {
+            ident = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
+            TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, ident, calendar);
+
+            /* Update the ident's flow accounting table with the latest info. */
+            estimate(ident);
+
+            /* Determine its share of the rate allocation. */
+            allocate(limiter, ident);
+
+            /* Make tc calls to enforce the rate we decided upon. */
+            enforce(limiter, ident);
+
+            /* Tell the comm library to propagate this identity's result for
+             * this interval.*/
+            send_update(&ident->comm, ident->id);
+
+            /* Add ident back to the queue at a future time slot. */
+            TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
+                              ((cal_slot + ident->intervals) & SCHEDMASK),
+                              ident, calendar);
+        }
+
+        print_interval--;
+        if (loglevel() == LOG_DEBUG && print_interval <= 0) {
+            print_interval = 1000 / (limiter->estintms);
+            print_averages(&limiter->stable_instance, print_interval);
+        }
+
+        /* Check if enough intervals have passed for cleaning. */
+        if (clean_timer <= 0) {
+            clean(&limiter->stable_instance);
+            clean_timer = clean_wait_intervals;
+        } else {
+            clean_timer--;
+        }
+
+        limiter->stable_instance.cal_slot += 1;
+
+        pthread_rwlock_unlock(&limiter->limiter_lock); 
+    }
+}