Added a preprocessor option to limit exempt traffic and turned in on for now.
[distributedratelimiting.git] / drl / ulogd_DRL.c
index ef2a17b..14de4fe 100644 (file)
@@ -229,6 +229,7 @@ uint32_t local_ip = 0;
 limiter_t limiter;
 extern FILE *logfile;
 extern uint8_t system_loglevel;
+extern uint8_t do_enforcement;
 
 /* functions */
 
@@ -520,7 +521,7 @@ static identity_t *new_identity(ident_config *config) {
     memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t));
 
     ident->id = config->id;
-    ident->limit = (uint32_t) (((double) config->limit * 1000000.0) / 8.0);
+    ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0);
     ident->fixed_ewma_weight = config->fixed_ewma_weight;
     ident->intervals = config->intervals;
     ident->ewma_weight = pow(ident->fixed_ewma_weight, 
@@ -896,18 +897,20 @@ static void print_instance(drl_instance_t *instance) {
     leaf_t *leaf = NULL;
     identity_t *ident = NULL;
 
-    map_reset_iterate(instance->leaf_map);
-    while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
-        printf("%x:", leaf->xid);
-        ident = leaf->parent;
-        while (ident) {
-            printf("%d:",ident->id);
-            ident = ident->parent;
+    if (system_loglevel == LOG_DEBUG) {
+        map_reset_iterate(instance->leaf_map);
+        while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
+            printf("%x:", leaf->xid);
+            ident = leaf->parent;
+            while (ident) {
+                printf("%d:",ident->id);
+                ident = ident->parent;
+            }
+            printf("Leaf's parent pointer is %p\n", leaf->parent);
         }
-        printf("Leaf's parent pointer is %p\n", leaf->parent);
-    }
 
-    printf("instance->last_machine is %p\n", instance->last_machine);
+        printf("instance->last_machine is %p\n", instance->last_machine);
+    }
 }
 
 static int assign_htb_hierarchy(drl_instance_t *instance) {
@@ -922,7 +925,7 @@ static int assign_htb_hierarchy(drl_instance_t *instance) {
         } else {
             /* Pointerific! */
             instance->machines[i]->htb_parent =
-                        instance->machines[i]->parent->htb_node;
+                instance->machines[i]->parent->htb_node;
         }
 
         instance->machines[i]->htb_node = next_node;
@@ -987,15 +990,6 @@ static int create_htb_hierarchy(drl_instance_t *instance) {
     }
     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
 
-    /* Add back 1:20. */
-    sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
-
-    if (execute_cmd(cmd)) {
-        return 1;
-    }
-    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
-
-
     /* Add machines. */
     for (i = 0; i < instance->machine_count; ++i) {
         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit",
@@ -1009,6 +1003,25 @@ static int create_htb_hierarchy(drl_instance_t *instance) {
         printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
     }
 
+#define LIMITEXEMPT
+
+    /* Add back 1:20. */
+#ifdef LIMITEXEMPT
+    if (instance->last_machine == NULL) {
+        sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
+    } else {
+        sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:20 htb rate 8bit ceil 1000mbit",
+            instance->last_machine->htb_node);
+    }
+#else
+    sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
+#endif
+
+    if (execute_cmd(cmd)) {
+        return 1;
+    }
+    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
+
     /* Add sets. */
     for (j = (instance->set_count - 1); j >= 0; --j) {
         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit",
@@ -1120,8 +1133,7 @@ static int setup_tc_grd(drl_instance_t *instance) {
                 instance->leaves[i].xid, instance->leaves[i].xid);
 
         if (execute_cmd(cmd)) {
-            //FIXME: remove this print and do a log.
-            printf("GRD: pfifo qdisc wasn't there!\n");
+            printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
         }
 
         /* Add the netem qdisc. */
@@ -1142,8 +1154,7 @@ static int setup_tc_grd(drl_instance_t *instance) {
     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
 
     if (execute_cmd(cmd)) {
-        //FIXME: remove this print and do a log.
-        printf("GRD: pfifo qdisc wasn't there!\n");
+        printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
     }
 
     /* Add the netem qdisc. */
@@ -1160,8 +1171,7 @@ static int setup_tc_grd(drl_instance_t *instance) {
     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
 
     if (execute_cmd(cmd)) {
-        //FIXME: remove this print and do a log.
-        printf("GRD: pfifo qdisc wasn't there!\n");
+        printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
     }
 
     /* Add the netem qdisc. */
@@ -1269,6 +1279,8 @@ static int init_drl(void) {
 
     if (parse_drl_config(drl_configfile.u.string, &configs)) {
         /* Parse error occured. Return non-zero to notify init_drl(). */
+        printlog(LOG_CRITICAL, "Failed to parse the DRL configuration file (%s).\n",
+            drl_configfile.u.string);
         return false;
     }
 
@@ -1276,11 +1288,14 @@ static int init_drl(void) {
     if (validate_configs(configs, &limiter.stable_instance)) {
         /* Clean up everything. */
         free_failed_config(configs, &limiter.stable_instance);
+        printlog(LOG_CRITICAL, "Invalid DRL configuration file (%s).\n",
+            drl_configfile.u.string);
         return false;
     }
 
     if (init_identities(configs, &limiter.stable_instance)) {
         free_failed_config(configs, &limiter.stable_instance);
+        printlog(LOG_CRITICAL, "Failed to initialize identities.\n");
         return false;
     }
 
@@ -1295,11 +1310,13 @@ static int init_drl(void) {
         case POLICY_FPS:
             if (assign_htb_hierarchy(&limiter.stable_instance)) {
                 free_instance(&limiter.stable_instance);
+                printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy.\n");
                 return false;
             }
 
             if (create_htb_hierarchy(&limiter.stable_instance)) {
                 free_instance(&limiter.stable_instance);
+                printlog(LOG_CRITICAL, "Failed to create HTB hierarchy.\n");
                 return false;
             }
         break;
@@ -1307,6 +1324,7 @@ static int init_drl(void) {
         case POLICY_GRD:
             if (setup_tc_grd(&limiter.stable_instance)) {
                 free_instance(&limiter.stable_instance);
+                printlog(LOG_CRITICAL, "Failed to initialize tc calls for GRD.\n");
                 return false;
             }
         break;
@@ -1459,6 +1477,33 @@ static void time_reconfig(int iterations) {
     // Seems to take about 85ms / iteration
 }
 
+static int stop_enforcement(drl_instance_t *instance) {
+    char cmd[300];
+    int i;
+
+    for (i = 0; i < instance->machine_count; ++i) {
+        sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
+                instance->machines[i]->htb_parent,
+                instance->machines[i]->htb_node);
+
+        if (execute_cmd(cmd)) {
+            return 1;
+        }
+    }
+
+    for (i = 0; i < instance->set_count; ++i) {
+        sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
+                instance->sets[i]->htb_parent,
+                instance->sets[i]->htb_node);
+
+        if (execute_cmd(cmd)) {
+            return 1;
+        }
+    }
+
+    return 0;
+}
+
 static void *signal_thread_func(void *args) {
     int sig;
     int err;
@@ -1466,11 +1511,13 @@ static void *signal_thread_func(void *args) {
 
     sigemptyset(&sigs);
     sigaddset(&sigs, SIGHUP);
+    sigaddset(&sigs, SIGUSR1);
     pthread_sigmask(SIG_BLOCK, &sigs, NULL);
 
     while (1) {
         sigemptyset(&sigs);
         sigaddset(&sigs, SIGHUP);
+        sigaddset(&sigs, SIGUSR1);
 
         err = sigwait(&sigs, &sig);
 
@@ -1483,11 +1530,23 @@ static void *signal_thread_func(void *args) {
             case SIGHUP:
                 printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n");
                 reconfig();
-                //time_reconfig(1000); //instrumentation
+                //time_reconfig(1000); /* instrumentation */
                 flushlog();
                 break;
+            case SIGUSR1:
+                pthread_rwlock_wrlock(&limiter.limiter_lock);
+                if (do_enforcement) {
+                    do_enforcement = 0;
+                    stop_enforcement(&limiter.stable_instance);
+                    printlog(LOG_CRITICAL, "--Switching enforcement off.--\n");
+                } else {
+                    do_enforcement = 1;
+                    printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
+                }
+                pthread_rwlock_unlock(&limiter.limiter_lock);
+                break;
             default:
-                /* Should be impossible... */
+                /* Intentionally blank. */
                 break;
         }
     }
@@ -1502,6 +1561,7 @@ static void _drl_reg_op(void)
 
     sigemptyset(&signal_mask);
     sigaddset(&signal_mask, SIGHUP);
+    sigaddset(&signal_mask, SIGUSR1);
     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
 
     if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {
@@ -1525,8 +1585,7 @@ static void _drl_reg_op(void)
      * see estimate.c
      */
     if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) {
-        ulogd_log(ULOGD_ERROR, "couldn't start estimate thread for 0x%x %s\n",limiter.localaddr,
-                limiter.ip);
+        printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n");
         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
         exit(EXIT_FAILURE);
     }