X-Git-Url: http://git.onelab.eu/?p=distributedratelimiting.git;a=blobdiff_plain;f=drl%2Fulogd_DRL.c;h=f7b027256901326322647c007087b9982b726cb8;hp=79cb4ef43f746a3f63e865692f136c57786d92ff;hb=89df43dd6b8cb8df82cfbf395b923014d2826b5a;hpb=d12ab8f1cd4ff135d692f7841360af70f0beb57b diff --git a/drl/ulogd_DRL.c b/drl/ulogd_DRL.c index 79cb4ef..f7b0272 100644 --- a/drl/ulogd_DRL.c +++ b/drl/ulogd_DRL.c @@ -124,16 +124,40 @@ * Add the config options for DRL. */ -static config_entry_t partition = { +static config_entry_t create_htb = { .next = NULL, + .key = "create_htb", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 1 }, +}; + +static config_entry_t enforce_on = { + .next = &create_htb, + .key = "enforce_on", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u = { .value = 1 }, +}; + +static config_entry_t partition = { + .next = &enforce_on, .key = "partition_set", .type = CONFIG_TYPE_INT, .options = CONFIG_OPT_NONE, .u = { .value = 0xfffffff }, }; -static config_entry_t netem_slice = { +static config_entry_t sfq_slice = { .next = &partition, + .key = "sfq_slice", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_NONE, + .u = { .string = "NONE" }, +}; + +static config_entry_t netem_slice = { + .next = &sfq_slice, .key = "netem_slice", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_NONE, @@ -263,9 +287,9 @@ extern FILE *logfile; extern uint8_t system_loglevel; extern uint8_t do_enforcement; -/* From peer_comm.c - used to simulate partition. */ -extern int do_partition; -extern int partition_set; +/* Used to simulate partitions. */ +int do_partition = 0; +int partition_set = 0xfffffff; /* functions */ @@ -681,9 +705,33 @@ static identity_t *new_identity(ident_config *config) { ident->comm.remote_nodes = comm_nodes; + if (!create_htb.u.value) { + ident->htb_node = config->htb_node; + ident->htb_parent = config->htb_parent; + } + return ident; } +static int validate_htb_exists(int node, int parent) { + FILE *pipe = popen("/sbin/tc class show dev eth0", "r"); + char line[200]; + + while (fgets(line, 200, pipe) != NULL) { + int n, p; + char ignore[200]; + + sscanf(line, "class htb 1:%x parent 1:%x prio %s", &n, &p, ignore); + if (n == node && p == parent) { + pclose(pipe); + return 0; + } + } + + pclose(pipe); + return 1; +} + /* Determines the validity of the parameters of one ident_config. * * 0 valid @@ -720,6 +768,22 @@ static int validate_config(ident_config *config) { return 1; } + if (!create_htb.u.value) { + if (config->htb_node < 0 || config->htb_parent < 0) { + printlog(LOG_CRITICAL, "When create_htb is disabled in ulogd.conf, an identity must specify the htb_node and htb_parent propertities in its configuration.\n"); + return 1; + } + + if (validate_htb_exists(config->htb_node, config->htb_parent)) { + printlog(LOG_CRITICAL, "Identity specified htb node %x with parent %x. No such node/parent combo seems to exist!\n", config->htb_node, config->htb_parent); + return 1; + } + } else { + if (config->htb_node > -1 || config->htb_parent > -1) { + printlog(LOG_WARN, "htb_node or htb_parent are configured but ignored because we're configured to create our own htb hierarchy.\n"); + } + } + /* Note: Parsing stage requires that each ident has at least one peer. */ return 0; } @@ -1064,6 +1128,14 @@ static int assign_htb_hierarchy(drl_instance_t *instance) { int i, j; int next_node = 0x100; + /* If we're not going to create our own htb hierarchy (for instance, + * if we're going to let PL's node manager do it for us), then we don't + * want this function to do anything. */ + if (!create_htb.u.value) { + printlog(LOG_DEBUG, "Skipping assign_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n"); + return 0; + } + /* Chain machine nodes under 1:10. */ for (i = 0; i < instance->machine_count; ++i) { if (instance->machines[i]->parent == NULL) { @@ -1133,11 +1205,36 @@ static inline int add_htb_netem(const char *iface, const uint32_t parent_major, return execute_cmd(cmd); } +static inline int add_htb_sfq(const char *iface, const uint32_t parent_major, + const uint32_t parent_minor, const uint32_t handle, + const int perturb) { + char cmd[300]; + + sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major, + parent_minor, handle); + printlog(LOG_WARN, "HTB_cmd: %s\n", cmd); + if (execute_cmd(cmd)) + printlog(LOG_WARN, "HTB_cmd: Previous deletion did not succeed.\n"); + + sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x sfq perturb %d", + iface, parent_major, parent_minor, handle, perturb); + printlog(LOG_WARN, "HTB_cmd: %s\n", cmd); + return execute_cmd(cmd); +} + static int create_htb_hierarchy(drl_instance_t *instance) { char cmd[300]; int i, j, k; uint64_t gigabit = 1024 * 1024 * 1024; + /* If we're not going to create our own htb hierarchy (for instance, + * if we're going to let PL's node manager do it for us), then we don't + * want this function to do anything. */ + if (!create_htb.u.value) { + printlog(LOG_DEBUG, "Skipping create_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n"); + return 0; + } + /* Nuke the hierarchy. */ sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb"); execute_cmd(cmd); @@ -1265,11 +1362,35 @@ static int create_htb_hierarchy(drl_instance_t *instance) { } } + /* Turn on SFQ for experimentation. */ + if (strcmp(sfq_slice.u.string, "NONE")) { + if (!strcmp(sfq_slice.u.string, "ALL")) { + if (add_htb_sfq("eth0", 1, 0x1000, 0x1000, 30)) + return 1; + if (add_htb_sfq("eth0", 1, 0x1fff, 0x1fff, 30)) + return 1; + + for (k = 0; k < instance->leaf_count; ++k) { + if (add_htb_sfq("eth0", 1, (0x1000 | instance->leaves[k].xid), + (0x1000 | instance->leaves[k].xid), 30)) { + return 1; + } + } + } else { + uint32_t slice_xid; + + sscanf(sfq_slice.u.string, "%x", &slice_xid); + + if (add_htb_sfq("eth0", 1, slice_xid, slice_xid, 30)) + return 1; + } + } + return 0; } static int setup_tc_grd(drl_instance_t *instance) { - int i; + int i, j; char cmd[300]; for (i = 0; i < instance->leaf_count; ++i) { @@ -1282,15 +1403,11 @@ static int setup_tc_grd(drl_instance_t *instance) { } /* Add the netem qdisc. */ -#ifdef DELAY40MS - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms", - instance->leaves[i].xid, instance->leaves[i].xid); -#else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms", instance->leaves[i].xid, instance->leaves[i].xid); -#endif if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } } @@ -1303,13 +1420,10 @@ static int setup_tc_grd(drl_instance_t *instance) { } /* Add the netem qdisc. */ -#ifdef DELAY40MS - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms"); -#else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms"); -#endif if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } @@ -1320,16 +1434,65 @@ static int setup_tc_grd(drl_instance_t *instance) { } /* Add the netem qdisc. */ -#ifdef DELAY40MS - sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms"); -#else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms"); -#endif if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } + /* Artifical delay or loss for experimentation. */ + if (netem_delay.u.value || netem_loss.u.value) { + if (!strcmp(netem_slice.u.string, "ALL")) { + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1000 handle 1000 netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value); + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1fff handle 1fff netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value); + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + + for (j = 0; j < instance->leaf_count; ++j) { + leaf_t *current = &instance->leaves[j]; + + current->delay = netem_delay.u.value; + + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", current->xid, current->xid, netem_loss.u.value, netem_delay.u.value); + + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + } + } else { + uint32_t slice_xid; + leaf_t *leaf = NULL; + + sscanf(netem_slice.u.string, "%x", &slice_xid); + + leaf = (leaf_t *) map_search(instance->leaf_map, &slice_xid, sizeof(slice_xid)); + + if (leaf == NULL) { + /* Leaf not found - invalid selection. */ + printf("Your experimental setup is incorrect...\n"); + return 1; + } + + leaf->delay = netem_delay.u.value; + + sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value); + + if (execute_cmd(cmd)) { + printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); + return 1; + } + } + } + return 0; } @@ -1363,10 +1526,10 @@ static int init_drl(void) { pthread_rwlock_init(&limiter.limiter_lock,NULL); /* determine our local IP by iterating through interfaces */ - if ((limiter.ip = get_local_ip())==0) { - printlog(LOG_CRITICAL, - "ulogd_DRL unable to aquire local IP address, not registering.\n"); - return (false); + limiter.ip = get_local_ip(); + if (limiter.ip == NULL) { + printlog(LOG_CRITICAL, "ulogd_DRL unable to aquire local IP address, not registering.\n"); + return false; } limiter.localaddr = inet_addr(limiter.ip); limiter.port = htons(LIMITER_LISTEN_PORT); @@ -1596,32 +1759,6 @@ static void reconfig() { pthread_rwlock_unlock(&limiter.limiter_lock); } -static ulog_output_t drl_op = { - .name = "drl", - .output = &_output_drl, - .signal = NULL, /* This appears to be broken. Using my own handler. */ - .init = NULL, - .fini = NULL, -}; - -/* Tests the amount of time it takes to call reconfig(). */ -static void time_reconfig(int iterations) { - struct timeval start, end; - int i; - - gettimeofday(&start, NULL); - for (i = 0; i < iterations; ++i) { - reconfig(); - } - gettimeofday(&end, NULL); - - printf("%d reconfigs() took %d seconds and %d microseconds.\n", - iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec); - exit(0); - - // Seems to take about 85ms / iteration -} - static int stop_enforcement(drl_instance_t *instance) { char cmd[300]; int i; @@ -1658,27 +1795,34 @@ static void *signal_thread_func(void *args) { sigaddset(&sigs, SIGHUP); sigaddset(&sigs, SIGUSR1); sigaddset(&sigs, SIGUSR2); + sigaddset(&sigs, SIGRTMAX); pthread_sigmask(SIG_BLOCK, &sigs, NULL); while (1) { sigemptyset(&sigs); - sigaddset(&sigs, SIGHUP); + //sigaddset(&sigs, SIGHUP); sigaddset(&sigs, SIGUSR1); sigaddset(&sigs, SIGUSR2); + sigaddset(&sigs, SIGRTMAX); err = sigwait(&sigs, &sig); if (err) { printlog(LOG_CRITICAL, "sigwait() returned an error.\n"); flushlog(); + continue; + } + + if (sig == SIGRTMAX) { + printf("Caught SIGRTMAX - toggling fake partitions.\n"); + do_partition = !do_partition; + continue; } switch (sig) { case SIGHUP: - printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n"); - reconfig(); - //time_reconfig(1000); /* instrumentation */ - flushlog(); + printlog(LOG_CRITICAL, "Caught SIGHUP in signal_thread_func?!?\n"); + printf("Caught SIGHUP in signal_thread_func?!?\n"); break; case SIGUSR1: pthread_rwlock_wrlock(&limiter.limiter_lock); @@ -1693,26 +1837,26 @@ static void *signal_thread_func(void *args) { pthread_rwlock_unlock(&limiter.limiter_lock); break; case SIGUSR2: - do_partition = !do_partition; + printlog(LOG_WARN, "Caught SIGUSR2 - re-reading XML file.\n"); + printf("Caught SIGUSR2 - re-reading XML file.\n"); + reconfig(); + flushlog(); break; default: /* Intentionally blank. */ break; } } - } -/* register output plugin with ulogd */ -static void _drl_reg_op(void) -{ - ulog_output_t *op = &drl_op; +static int drl_plugin_init() { sigset_t signal_mask; sigemptyset(&signal_mask); - sigaddset(&signal_mask, SIGHUP); + //sigaddset(&signal_mask, SIGHUP); sigaddset(&signal_mask, SIGUSR1); sigaddset(&signal_mask, SIGUSR2); + sigaddset(&signal_mask, SIGRTMAX); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) { @@ -1729,8 +1873,6 @@ static void _drl_reg_op(void) exit(EXIT_FAILURE); } - register_output(op); - /* start up the thread that will periodically estimate the * local rate and set the local limits * see estimate.c @@ -1740,6 +1882,62 @@ static void _drl_reg_op(void) fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string); exit(EXIT_FAILURE); } + + if (enforce_on.u.value) { + pthread_rwlock_wrlock(&limiter.limiter_lock); + do_enforcement = 1; + printlog(LOG_CRITICAL, "--Switching enforcement on.--\n"); + pthread_rwlock_unlock(&limiter.limiter_lock); + } + + return 0; +} + +static void drl_signal(int sig) { + if (sig == SIGHUP) { + printf("Caught SIGHUP - reopening DRL log file.\n"); + + fclose(logfile); + logfile = fopen(drl_logfile.u.string, "a"); + printlog(LOG_CRITICAL, "Reopened logfile.\n"); + } else { + printlog(LOG_WARN, "Caught unexpected signal %d in drl_signal.\n", sig); + } +} + +static ulog_output_t drl_op = { + .name = "drl", + .output = &_output_drl, + .signal = &drl_signal, + .init = &drl_plugin_init, + .fini = NULL, +}; + +#if 0 +/* Tests the amount of time it takes to call reconfig(). */ +static void time_reconfig(int iterations) { + struct timeval start, end; + int i; + + gettimeofday(&start, NULL); + for (i = 0; i < iterations; ++i) { + reconfig(); + } + gettimeofday(&end, NULL); + + printf("%d reconfigs() took %d seconds and %d microseconds.\n", + iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec); + exit(0); + + // Seems to take about 85ms / iteration +} +#endif + +/* register output plugin with ulogd */ +static void _drl_reg_op(void) +{ + ulog_output_t *op = &drl_op; + register_output(op); } void _init(void)