Added a check to see if the sending rate is within 1% of the limit rather than
[distributedratelimiting.git] / drl / ulogd_DRL.c
index 5d7ca7e..ef5cf20 100644 (file)
@@ -766,8 +766,37 @@ static int validate_configs(parsed_configs configs, drl_instance_t *instance) {
     return 0;
 }
 
+static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) {
+    int count = 0;
+    identity_t *current_ident;
+    leaf_t *current_leaf;
+    leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *));
+    if (leaves == NULL) {
+        return 1;
+    }
+
+    map_reset_iterate(instance->leaf_map);
+    while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) {
+        current_ident = current_leaf->parent;
+        while (current_ident != NULL && current_ident != instance->last_machine) {
+            if (current_ident == ident) {
+                /* Found the ident we were looking for - add the leaf. */
+                leaves[count] = current_leaf;
+                count += 1;
+                break;
+            }
+            current_ident = current_ident->parent;
+        }
+    }
+
+    ident->leaves = leaves;
+    ident->leaf_count = count;
+
+    return 0;
+}
+
 static int init_identities(parsed_configs configs, drl_instance_t *instance) {
-    int i;
+    int i, j;
     ident_config *config = configs.machines;
     leaf_t *leaf = NULL;
 
@@ -817,6 +846,18 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) {
 
         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
                           instance->machines[i], calendar);
+
+        /* Setup the array of pointers to leaves.  This is easy for machines
+         * because a machine node applies to every leaf. */
+        instance->machines[i]->leaves =
+            malloc(instance->leaf_count * sizeof(leaf_t *));
+        if (instance->machines[i]->leaves == NULL) {
+            return ENOMEM;
+        }
+        instance->machines[i]->leaf_count = instance->leaf_count;
+        for (j = 0; j < instance->leaf_count; ++j) {
+            instance->machines[i]->leaves[j] = &instance->leaves[j];
+        }
     }
 
     /* Connect the set subtree to the machines. Any set or leaf without a
@@ -838,6 +879,13 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) {
 
         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
                           instance->sets[i], calendar);
+
+        /* Setup the array of pointers to leaves.  This is harder for sets,
+         * but this doesn't need to be super-efficient because it happens
+         * rarely and it isn't on the critical path for reconfig(). */
+        if (fill_set_leaf_pointer(instance, instance->sets[i])) {
+            return ENOMEM;
+        }
     }
 
     /* Success. */
@@ -848,18 +896,20 @@ static void print_instance(drl_instance_t *instance) {
     leaf_t *leaf = NULL;
     identity_t *ident = NULL;
 
-    map_reset_iterate(instance->leaf_map);
-    while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
-        printf("%x:", leaf->xid);
-        ident = leaf->parent;
-        while (ident) {
-            printf("%d:",ident->id);
-            ident = ident->parent;
+    if (system_loglevel == LOG_DEBUG) {
+        map_reset_iterate(instance->leaf_map);
+        while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
+            printf("%x:", leaf->xid);
+            ident = leaf->parent;
+            while (ident) {
+                printf("%d:",ident->id);
+                ident = ident->parent;
+            }
+            printf("Leaf's parent pointer is %p\n", leaf->parent);
         }
-        printf("Leaf's parent pointer is %p\n", leaf->parent);
-    }
 
-    printf("instance->last_machine is %p\n", instance->last_machine);
+        printf("instance->last_machine is %p\n", instance->last_machine);
+    }
 }
 
 static int assign_htb_hierarchy(drl_instance_t *instance) {
@@ -874,7 +924,7 @@ static int assign_htb_hierarchy(drl_instance_t *instance) {
         } else {
             /* Pointerific! */
             instance->machines[i]->htb_parent =
-                        instance->machines[i]->parent->htb_node;
+                instance->machines[i]->parent->htb_node;
         }
 
         instance->machines[i]->htb_node = next_node;
@@ -1039,6 +1089,101 @@ static int create_htb_hierarchy(drl_instance_t *instance) {
     }
     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
 
+#ifdef DELAY40MS
+    /* Only for artificial delay testing. */
+    sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
+    execute_cmd(cmd);
+
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
+    execute_cmd(cmd);
+    sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11f9 handle 11f9 pfifo");
+    execute_cmd(cmd);
+
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11f9 handle 11f9 netem loss 0 delay 40ms");
+    execute_cmd(cmd);
+    sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11fa handle 11fa pfifo");
+    execute_cmd(cmd);
+
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms");
+    execute_cmd(cmd);
+    /* End delay testing */
+#endif
+
+//#define SFQTEST
+
+#ifdef SFQTEST
+    sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
+    execute_cmd(cmd);
+
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 sfq perturb 20");
+    execute_cmd(cmd);
+#endif
+
+    return 0;
+}
+
+static int setup_tc_grd(drl_instance_t *instance) {
+    int i;
+    char cmd[300];
+
+    for (i = 0; i < instance->leaf_count; ++i) {
+        /* Delete the old pfifo qdisc that might have been there before. */
+        sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo",
+                instance->leaves[i].xid, instance->leaves[i].xid);
+
+        if (execute_cmd(cmd)) {
+            printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
+        }
+
+        /* Add the netem qdisc. */
+#ifdef DELAY40MS
+        sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms",
+                instance->leaves[i].xid, instance->leaves[i].xid);
+#else
+        sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
+                instance->leaves[i].xid, instance->leaves[i].xid);
+#endif
+
+        if (execute_cmd(cmd)) {
+            return 1;
+        }
+    }
+
+    /* Do the same for 1000 and 1fff. */
+    sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
+
+    if (execute_cmd(cmd)) {
+        printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
+    }
+
+    /* Add the netem qdisc. */
+#ifdef DELAY40MS
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
+#else
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
+#endif
+
+    if (execute_cmd(cmd)) {
+        return 1;
+    }
+
+    sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
+
+    if (execute_cmd(cmd)) {
+        printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
+    }
+
+    /* Add the netem qdisc. */
+#ifdef DELAY40MS
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms");
+#else
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
+#endif
+
+    if (execute_cmd(cmd)) {
+        return 1;
+    }
+
     return 0;
 }
 
@@ -1097,9 +1242,9 @@ static int init_drl(void) {
 
     printlog(LOG_WARN, "     POLICY: %s\n",policy.u.string);
     if (strcasecmp(policy.u.string,"GRD") == 0) {
-        limiter.policynum = POLICY_GRD;
+        limiter.policy = POLICY_GRD;
     } else if (strcasecmp(policy.u.string,"FPS") == 0) {
-        limiter.policynum = POLICY_FPS;
+        limiter.policy = POLICY_FPS;
     } else {
         printlog(LOG_CRITICAL,
                  "Unknown DRL policy %s, aborting.\n",policy.u.string);
@@ -1155,13 +1300,27 @@ static int init_drl(void) {
     /* Debugging - FIXME: remove this? */
     print_instance(&limiter.stable_instance);
 
-    if (assign_htb_hierarchy(&limiter.stable_instance)) {
-        free_instance(&limiter.stable_instance);
-        return false;
-    }
+    switch (limiter.policy) {
+        case POLICY_FPS:
+            if (assign_htb_hierarchy(&limiter.stable_instance)) {
+                free_instance(&limiter.stable_instance);
+                return false;
+            }
+
+            if (create_htb_hierarchy(&limiter.stable_instance)) {
+                free_instance(&limiter.stable_instance);
+                return false;
+            }
+        break;
 
-    if (create_htb_hierarchy(&limiter.stable_instance)) {
-        free_instance(&limiter.stable_instance);
+        case POLICY_GRD:
+            if (setup_tc_grd(&limiter.stable_instance)) {
+                free_instance(&limiter.stable_instance);
+                return false;
+            }
+        break;
+
+        default:
         return false;
     }
 
@@ -1204,9 +1363,6 @@ static void reconfig() {
         return;
     }
 
-    /* Lock */
-    pthread_rwlock_wrlock(&limiter.limiter_lock);
-
     if (validate_configs(configs, &limiter.new_instance)) {
         free_failed_config(configs, &limiter.new_instance);
         printlog(LOG_CRITICAL, "Validation failed during reconfig().\n");
@@ -1226,28 +1382,56 @@ static void reconfig() {
 
     /* Debugging - FIXME: remove this? */
     print_instance(&limiter.new_instance);
+    
+    /* Lock */
+    pthread_rwlock_wrlock(&limiter.limiter_lock);
 
-    if (assign_htb_hierarchy(&limiter.new_instance)) {
-        free_instance(&limiter.new_instance);
-        printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
-        pthread_rwlock_unlock(&limiter.limiter_lock);
-        return;
-    }
+    switch (limiter.policy) {
+        case POLICY_FPS:
+            if (assign_htb_hierarchy(&limiter.new_instance)) {
+                free_instance(&limiter.new_instance);
+                printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
+                pthread_rwlock_unlock(&limiter.limiter_lock);
+                return;
+            }
 
-    if (create_htb_hierarchy(&limiter.new_instance)) {
-        free_instance(&limiter.new_instance);
-        printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
+            if (create_htb_hierarchy(&limiter.new_instance)) {
+                free_instance(&limiter.new_instance);
+                printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
+
+                /* Re-create old instance. */
+                if (create_htb_hierarchy(&limiter.stable_instance)) {
+                    /* Error reinstating the old one - big problem. */
+                    printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
+                    printlog(LOG_CRITICAL, "Giving up...\n");
+                    flushlog();
+                    exit(EXIT_FAILURE);
+                }
 
-        /* Re-create old instance. */
-        if (create_htb_hierarchy(&limiter.stable_instance)) {
-            /* Error reinstating the old one - big problem. */
-            printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
-            flushlog();
-            exit(EXIT_FAILURE);
-        }
+                pthread_rwlock_unlock(&limiter.limiter_lock);
+                return;
+            }
+        break;
+
+        case POLICY_GRD:
+            if (setup_tc_grd(&limiter.new_instance)) {
+                free_instance(&limiter.new_instance);
+                printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n");
+
+                /* Try to re-create old instance. */
+                if (setup_tc_grd(&limiter.stable_instance)) {
+                    printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n");
+                    printlog(LOG_CRITICAL, "Giving up...\n");
+                    flushlog();
+                    exit(EXIT_FAILURE);
+                }
+            }
+        break;
 
-        pthread_rwlock_unlock(&limiter.limiter_lock);
-        return;
+        default:
+            /* Should be impossible. */
+            printf("Pigs are flying?\n");
+            exit(EXIT_FAILURE);
     }
 
     /* Switch over new to stable instance. */
@@ -1350,8 +1534,7 @@ static void _drl_reg_op(void)
      * see estimate.c
      */
     if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) {
-        ulogd_log(ULOGD_ERROR, "couldn't start estimate thread for 0x%x %s\n",limiter.localaddr,
-                limiter.ip);
+        printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n");
         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
         exit(EXIT_FAILURE);
     }