Added a handler for SIGUSR1, which toggles off/on the enforcement calls to tc.
[distributedratelimiting.git] / drl / estimate.c
index f1dd142..29704be 100644 (file)
@@ -7,6 +7,8 @@
  * Kevin Webb 2007/2008
  */
 
+#include <assert.h>
+
 /** The size of the buffer we use to hold tc commands. */
 #define CMD_BUFFER_SIZE 200
 
 #include "ratetypes.h" /* needs util and pthread.h */
 #include "logging.h"
 
-static int underlimit_flowcount_count = 0;
-static int underlimit_normal_count = 0;
+#define PRINT_COUNTER_RESET (7)
+
+extern uint8_t system_loglevel;
+static int printcounter = PRINT_COUNTER_RESET - 1;
+
+uint8_t do_enforcement = 0;
 
 /**
  * Called for each identity each estimate interval.  Uses flow table information
@@ -153,7 +159,8 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) {
 
     if (local_rate <= 0) {
         idealweight = 0;
-    } else if (dampen_increase == 0 && (ident->locallimit <= 0 || local_rate < ident->locallimit || ident->flowstart)) {
+    } else if (dampen_increase == 0 &&
+               (ident->locallimit <= 0 || local_rate < (ident->locallimit * CLOSE_ENOUGH) || ident->flowstart)) {
         /* We're under the limit - all flows are bottlenecked. */
         idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
         ideal_over = allocate_fps_over_limit(ident);
@@ -163,11 +170,9 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) {
             idealweight = ideal_over;
             regime = 3;
             dampen = 2;
-            underlimit_flowcount_count += 1;
         } else {
             regime = 1;
             dampen = 0;
-            underlimit_normal_count += 1;
         }
 
         /* Apply EWMA */
@@ -204,7 +209,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) {
     }
 
     /* Convert weight into a rate - add in our new local weight */
-    total_weight = ident->localweight + peer_weights;
+    ident->total_weight = total_weight = ident->localweight + peer_weights;
 
     /* compute local allocation:
        if there is traffic elsewhere, use the weights
@@ -226,14 +231,23 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) {
 
     printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
 
-    printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
+    if (system_loglevel == LOG_DEBUG) {
+        printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
             local_rate, idealweight, ident->localweight, total_weight);
+    }
+
+    if (printcounter <= 0) {
+        printlog(LOG_WARN, "%d %.1f %.1f %.1f %d %d %d %d %d %d ", local_rate, idealweight,
+            ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k, ftable->num_flows_10k,
+            ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate);
+        printcounter = PRINT_COUNTER_RESET;
+    } else {
+        printcounter -= 1;
+    }
 
     //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
     //       dampen, dampen_increase, peer_weights, regime);
 
-    //printf("normal_count: %d, flowcount_count: %d\n", underlimit_normal_count, underlimit_flowcount_count);
-
     if (regime == 3) {
         printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
                  ideal_over, ideal_under);
@@ -258,8 +272,10 @@ static double allocate_grd(identity_t *ident, double aggdemand) {
         dropprob = 0.0;
     }
     
-    //printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
-    //        ident->common.rate, aggdemand, dropprob);
+    if (system_loglevel == LOG_DEBUG) {
+        printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
+           ident->common.rate, aggdemand, dropprob);
+    }
 
     return dropprob;
 }
@@ -273,7 +289,13 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
     double comm_val = 0;
 
     /* Read comm_val from comm layer. */
-    read_comm(&ident->comm, &comm_val);
+    if (limiter->policy == POLICY_FPS) {
+        read_comm(&ident->comm, &comm_val,
+                ident->total_weight / (double) (ident->comm.remote_node_count + 1));
+    } else {
+        read_comm(&ident->comm, &comm_val,
+                (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
+    }
     printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
 
     /* Experimental printing. */
@@ -281,7 +303,7 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
              (double) ident->common.rate / (double) 128, ident->id);
     ident->avg_bytes += ident->common.rate;
     
-    if (limiter->policynum == POLICY_FPS) {
+    if (limiter->policy == POLICY_FPS) {
         ident->locallimit = allocate_fps(ident, comm_val);
         ident->last_localweight = ident->localweight;
         
@@ -289,8 +311,8 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
         write_local_value(&ident->comm, ident->localweight);
     } else {
         ident->locallimit = 0; /* Unused with GRD. */
-        ident->last_localdropprob = ident->localdropprob;
-        ident->localdropprob = allocate_grd(ident, comm_val);
+        ident->last_drop_prob = ident->drop_prob;
+        ident->drop_prob = allocate_grd(ident, comm_val);
         
         /* Update other limiters with our rate by writing to comm layer. */
         write_local_value(&ident->comm, ident->common.rate);
@@ -300,6 +322,26 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
     ident->common.last_rate = ident->common.rate;
 }
 
+/**
+ * Traces all of the parent pointers of a leaf all the way to the root in
+ * order to find the maximum drop probability in the chain.
+ */
+static double find_leaf_drop_prob(leaf_t *leaf) {
+    identity_t *current = leaf->parent;
+    double result = 0;
+
+    assert(current);
+
+    while (current != NULL) {
+        if (current->drop_prob > result) {
+            result = current->drop_prob;
+        }
+        current = current->parent;
+    }
+
+    return result;
+}
+
 /**
  * This is called once per estimate interval to enforce the rate that allocate
  * has decided upon.  It makes calls to tc using system().
@@ -307,8 +349,9 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
 static void enforce(limiter_t *limiter, identity_t *ident) {
     char cmd[CMD_BUFFER_SIZE];
     int ret = 0;
+    int i = 0;
 
-    switch (limiter->policynum) {
+    switch (limiter->policy) {
         case POLICY_FPS:
 
             /* TC treats limits of 0 (8bit) as unlimited, which causes the
@@ -320,7 +363,7 @@ static void enforce(limiter_t *limiter, identity_t *ident) {
              * FLOW_START_THRESHOLD. */
 
             if (ident->locallimit < FLOW_START_THRESHOLD) {
-                ident->locallimit = FLOW_START_THRESHOLD * 2;
+                ident->locallimit = FLOW_START_THRESHOLD;
             }
 
             /* Do not allow the node to set a limit higher than its
@@ -329,38 +372,83 @@ static void enforce(limiter_t *limiter, identity_t *ident) {
                 ident->locallimit = limiter->nodelimit;
             }
 
-            printf("FPS: Setting local limit to %d\n", ident->locallimit);
+            if (system_loglevel == LOG_DEBUG) {
+                printf("FPS: Setting local limit to %d\n", ident->locallimit);
+            }
             printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
 
+            if (printcounter == PRINT_COUNTER_RESET) {
+                printlog(LOG_WARN, "%d\n", ident->locallimit);
+            }
+
             snprintf(cmd, CMD_BUFFER_SIZE,
                      "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
                      ident->htb_parent, ident->htb_node, ident->locallimit);
 
-            ret = system(cmd);
+            if (do_enforcement) {
+                ret = system(cmd);
 
-            if (ret) {
-                /* FIXME: call failed.  What to do? */
+                if (ret) {
+                    /* FIXME: call failed.  What to do? */
+                    printlog(LOG_CRITICAL, "***TC call failed?***\n");
+                }
             }
             break;
 
         case POLICY_GRD:
-/* FIXME: Figure out where to enforce GRD. */
-#if 0
-            for (i = 0; i < ident->num_slices; i++){
-
-                sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
-                        ident->xids[i],ident->xids[i], (100*ident->localdropprob));
-
-                ret = system(cmd);
-
-                if (ret==-1)
-                    print_system_error(ret);
-            }
+            for (i = 0; i < ident->leaf_count; ++i) {
+                if (ident->drop_prob >= ident->leaves[i]->drop_prob) {
+                    /* The new drop probability for this identity is greater
+                     * than or equal to the leaf's current drop probability.
+                     * We can safely use the larger value at this leaf
+                     * immediately. */
+                    ident->leaves[i]->drop_prob = ident->drop_prob;
+                } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) {
+                    /* The old drop probability for this identity is less than
+                     * the leaf's current drop probability.  This means that
+                     * this identity couldn't have been the limiting ident,
+                     * so nothing needs to be done because the old limiting
+                     * ident is still the limiting factor. */
+
+                    /* Intentionally blank. */
+                } else {
+                    /* If neither of the above are true, then...
+                     * 1) The new drop probability for the identity is less
+                     * than what it previously was, and
+                     * 2) This ident may have had the maximum drop probability
+                     * of all idents limiting this leaf, and therefore we need
+                     * to follow the leaf's parents up to the root to find the
+                     * new leaf drop probability safely. */
+                    ident->leaves[i]->drop_prob =
+                            find_leaf_drop_prob(ident->leaves[i]);
+                }
+
+                /* Make the call to tc. */
+#ifdef DELAY40MS
+                snprintf(cmd, CMD_BUFFER_SIZE,
+                         "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
+                         ident->leaves[i]->xid, ident->leaves[i]->xid,
+                         (100 * ident->leaves[i]->drop_prob));
+#else
+                snprintf(cmd, CMD_BUFFER_SIZE,
+                         "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms",
+                         ident->leaves[i]->xid, ident->leaves[i]->xid,
+                         (100 * ident->leaves[i]->drop_prob));
 #endif
+                if (do_enforcement) {
+                    ret = system(cmd);
+
+                    if (ret) {
+                        /* FIXME: call failed.  What to do? */
+                        printlog(LOG_CRITICAL, "***TC call failed?***\n");
+                    }
+                }
+            }
+
             break;
 
         default: 
-            printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policynum);
+            printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy);
             break;
     }
 
@@ -423,6 +511,7 @@ void handle_estimation(void *arg) {
 
     sigemptyset(&signal_mask);
     sigaddset(&signal_mask, SIGHUP);
+    sigaddset(&signal_mask, SIGUSR1);
     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
 
     /* Determine the number of intervals we should wait before hitting the