/* See the DRL-LICENSE file for this file's software license. */ /* * ulogd output target for DRL: GRD and FPS * * Ken Yocum * * Original shell of this code from ulogd_NETFLOW * Like that code, we keep track of per-slice data rates * out of this slice. Thus we are rate limiting particular slices * across multiple boxes, ensuring that their outbound rate does not * exceed some fixed limit. * * Fabric: static mesh * * Enforcer: linux drop percentage. * * This file reads packets from the netlink socket. It updates all * the hashmaps which track how much data has arrived per flow. * It starts two threads for this limiter. * One thread handles periodic estimation. * The other thread handles communication with other limiters. * * * Code map * * ulogd_DRL: attach to netlink socket, accept packets. replaces ratelimit.cc * util.c: generic hashing functions, flow comparisons, sundry items. * gossip.c: Recv gossip, send gossip. * peer_comm.c: Thread to listen for updates from other limiters. * estimate.c: Thread to calculate the local limits. * * * Ken Yocum * 2007 * * Some code appropriated from ulogd_NETFLOW: * * Mark Huang * Copyright (C) 2004-2005 The Trustees of Princeton University * * Based on admindump.pl by Mic Bowman and Paul Brett * Copyright (c) 2002 Intel Corporation * */ /* Enable GNU glibc extensions */ #define _GNU_SOURCE #include #include /* va_start() and friends */ #include /* ispunct() */ #include /* strstr() and friends */ #include /* dirname() and basename() */ #include /* fork() and wait() */ #include #include #include /* errno and assert() */ #include #include /* getopt_long() */ #include /* time() and friends */ #include #include /* inet_aton() */ #include #include #include /* ICMP definitions */ #include #include /* stat() */ #include /* pthread_create() */ #include /* flock() */ #include /* Signal definitions - so that we can catch SIGHUP and update config. */ #include #include #include /* Perhaps useful for files within vservers? 
*/
#if !defined(STANDALONE) && HAVE_LIBPROPER
/* NOTE(review): this #include has no header operand in this copy of the file
 * (the name appears to have been lost). It is harmless only while the #if
 * above is false; restore the intended libproper header before building with
 * HAVE_LIBPROPER. */
#include
#endif

/*
 * Jenkins hash support
 * lives in raterouter.h
 */

/* DRL specifics */
#include "raterouter.h"
#include "util.h"
#include "ratetypes.h" /* needs util and pthread.h */
#include "calendar.h"
#include "logging.h"

/*
 * /etc/ulogd.conf configuration options
 * Add the config options for DRL.
 *
 * Each entry points at the previously defined one through .next, so the list
 * runs from estintms (named by config_entries below) back to leaves, whose
 * .next is NULL. The .u initializer is presumably the value used when the key
 * is absent from ulogd.conf — confirm against ulogd's config parser.
 */

/* "leaves" (string, default "PLANETLAB"). */
static config_entry_t leaves = {
    .next = NULL,
    .key = "leaves",
    .type = CONFIG_TYPE_STRING,
    .options = CONFIG_OPT_NONE,
    .u = { .string = "PLANETLAB" },
};

/* "bind_addr" (string, default "AUTO"). */
static config_entry_t bind_addr = {
    .next = &leaves,
    .key = "bind_addr",
    .type = CONFIG_TYPE_STRING,
    .options = CONFIG_OPT_NONE,
    .u = { .string = "AUTO" },
};

/* "create_htb" (int, default 1). When 0, assign_htb_hierarchy() and
 * create_htb_hierarchy() do nothing, and validate_config() requires every
 * identity to name an existing htb_node/htb_parent pair instead. */
static config_entry_t create_htb = {
    .next = &bind_addr,
    .key = "create_htb",
    .type = CONFIG_TYPE_INT,
    .options = CONFIG_OPT_NONE,
    .u = { .value = 1 },
};

/* "enforce_on" (int, default 1). NOTE(review): presumably controls
 * do_enforcement — confirm where it is read. */
static config_entry_t enforce_on = {
    .next = &create_htb,
    .key = "enforce_on",
    .type = CONFIG_TYPE_INT,
    .options = CONFIG_OPT_NONE,
    .u = { .value = 1 },
};

/* "partition_set" (int). Its default matches the partition_set global that is
 * "Used to simulate partitions"; presumably copied into it — confirm. */
static config_entry_t partition = {
    .next = &enforce_on,
    .key = "partition_set",
    .type = CONFIG_TYPE_INT,
    .options = CONFIG_OPT_NONE,
    .u = { .value = 0xfffffff },
};

/* "sfq_slice" (string): "NONE" disables SFQ, "ALL" attaches SFQ to classes
 * 1:1000, 1:1fff and every leaf class, anything else is read as a hex class
 * minor (see create_htb_hierarchy()). */
static config_entry_t sfq_slice = {
    .next = &partition,
    .key = "sfq_slice",
    .type = CONFIG_TYPE_STRING,
    .options = CONFIG_OPT_NONE,
    .u = { .string = "NONE" },
};

/* "netem_slice" (string): "ALL" applies netem to 1:1000, 1:1fff and every leaf
 * class; anything else is read as a hex class minor. Only consulted when
 * netem_loss or netem_delay is non-zero. */
static config_entry_t netem_slice = {
    .next = &sfq_slice,
    .key = "netem_slice",
    .type = CONFIG_TYPE_STRING,
    .options = CONFIG_OPT_NONE,
    .u = { .string = "ALL" },
};

/* "netem_loss": artificial loss, emitted to tc as "loss <value>%". */
static config_entry_t netem_loss = {
    .next = &netem_slice,
    .key = "netem_loss",
    .type = CONFIG_TYPE_INT,
    .options = CONFIG_OPT_NONE,
    .u = { .value = 0 },
};

/* "netem_delay": artificial delay, emitted to tc as "delay <value>ms". */
static config_entry_t netem_delay = {
    .next = &netem_loss,
    .key = "netem_delay",
    .type = CONFIG_TYPE_INT,
    .options = CONFIG_OPT_NONE,
    .u = { .value = 0 },
};

/* "drl_configfile" (mandatory string, default "drl.xml"). */
static config_entry_t drl_configfile = {
    .next = &netem_delay,
    .key = "drl_configfile",
    .type = CONFIG_TYPE_STRING,
    .options = CONFIG_OPT_MANDATORY,
    .u = { .string = "drl.xml" },
};

/** The administrative bandwidth limit (mbps) for the local node. The node
 * will not set a limit higher than this, even when distributed capacity is
 * available. Set to 0 for no limit.
 *
 * NOTE(review): create_htb_hierarchy() comments limiter.nodelimit as
 * "kilobits/sec" and multiplies it by 1024 for tc; confirm which unit this
 * key really uses and where it is converted. */
static config_entry_t nodelimit = {
    .next = &drl_configfile,
    .key = "nodelimit",
    .type = CONFIG_TYPE_INT,
    .options = CONFIG_OPT_MANDATORY,
    .u = { .value = 0 },
};

/** Determines the verbosity of logging. */
static config_entry_t drl_loglevel = {
    .next = &nodelimit,
    .key = "drl_loglevel",
    .type = CONFIG_TYPE_INT,
    .options = CONFIG_OPT_MANDATORY,
    .u = { .value = LOG_WARN },
};

/** The path of the logfile. */
static config_entry_t drl_logfile = {
    .next = &drl_loglevel,
    .key = "drl_logfile",
    .type = CONFIG_TYPE_STRING,
    .options = CONFIG_OPT_MANDATORY,
    .u = { .string = "drl_logfile.log" },
};

/** The choice of DRL protocol. */
static config_entry_t policy = {
    .next = &drl_logfile,
    .key = "policy",
    .type = CONFIG_TYPE_STRING,
    .options = CONFIG_OPT_MANDATORY,
    .u = { .string = "FPS" },
};

/** The estimate interval, in milliseconds. */
static config_entry_t estintms = {
    .next = &policy,
    .key = "estintms",
    .type = CONFIG_TYPE_INT,
    .options = CONFIG_OPT_MANDATORY,
    .u = { .value = 500 },
};

/* Head of the config_entry_t list defined above. */
#define config_entries (&estintms)

/*
 * Debug functionality
 */
#ifdef DMALLOC
/* NOTE(review): header operand missing here as well; harmless only while
 * DMALLOC is undefined. */
#include
#endif

/* Expand to the four bytes of addr, lowest address first, as four
 * comma-separated arguments. */
#define NIPQUAD(addr) \
    ((unsigned char *)&addr)[0], \
    ((unsigned char *)&addr)[1], \
    ((unsigned char *)&addr)[2], \
    ((unsigned char *)&addr)[3]

/* Same as NIPQUAD, but highest address first. */
#define IPQUAD(addr) \
    ((unsigned char *)&addr)[3], \
    ((unsigned char *)&addr)[2], \
    ((unsigned char *)&addr)[1], \
    ((unsigned char *)&addr)[0]

/* Salt for the hash functions (passed to jhash() by hash_flow()). */
static int salt;

/*
 * Hash slice name lookups on context ID.
*/ /* Special context IDs */ #define UNKNOWN_XID -1 #define ROOT_XID 0 enum { CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */ ICMP_ECHOREPLY_XID, ICMP_UNREACH_XID, }; /* globals */ pthread_t estimate_thread; pthread_t signal_thread; pthread_t comm_thread; uint32_t local_ip = 0; limiter_t limiter; extern FILE *logfile; extern uint8_t system_loglevel; extern uint8_t do_enforcement; /* Used to simulate partitions. */ int do_partition = 0; int partition_set = 0xfffffff; /* functions */ static inline uint32_t hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max) { unsigned char mybytes[FLOWKEYSIZE]; mybytes[0] = protocol; *(uint32_t*)(&(mybytes[1])) = src_ip; *(uint32_t*)(&(mybytes[5])) = dst_ip; *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port; return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1); } uint32_t sampled_hasher(const key_flow *key) { /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. 
*/ return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX); } uint32_t standard_hasher(const key_flow *key) { return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE); } uint32_t multiple_hasher(const key_flow *key) { return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE); } struct intr_id { char* name; ulog_iret_t *res; }; /* Interesting keys */ enum { OOB_TIME_SEC = 0, OOB_MARK, IP_SADDR, IP_DADDR, IP_TOTLEN, IP_PROTOCOL, TCP_SPORT, TCP_DPORT, TCP_ACK, TCP_FIN, TCP_SYN, TCP_RST, UDP_SPORT, UDP_DPORT, ICMP_TYPE, ICMP_CODE, GRE_FLAG_KEY, GRE_VERSION, GRE_KEY, PPTP_CALLID, }; #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0])) static struct intr_id intr_ids[] = { [OOB_TIME_SEC] = { "oob.time.sec", 0 }, [OOB_MARK] = { "oob.mark", 0 }, [IP_SADDR] = { "ip.saddr", 0 }, [IP_DADDR] = { "ip.daddr", 0 }, [IP_TOTLEN] = { "ip.totlen", 0 }, [IP_PROTOCOL] = { "ip.protocol", 0 }, [TCP_SPORT] = { "tcp.sport", 0 }, [TCP_DPORT] { "tcp.dport", 0 }, [TCP_ACK] = { "tcp.ack", 0 }, [TCP_FIN] = { "tcp.fin", 0 }, [TCP_SYN] = { "tcp.syn", 0 }, [TCP_RST] = { "tcp.rst", 0 }, [UDP_SPORT] = { "udp.sport", 0 }, [UDP_DPORT] = { "udp.dport", 0 }, [ICMP_TYPE] = { "icmp.type", 0 }, [ICMP_CODE] = { "icmp.code", 0 }, [GRE_FLAG_KEY] = { "gre.flag.key", 0 }, [GRE_VERSION] = { "gre.version", 0 }, [GRE_KEY] = { "gre.key", 0 }, [PPTP_CALLID] = { "pptp.callid", 0 }, }; #define GET_VALUE(x) intr_ids[x].res->value #define DATE(t) ((t) / (24*60*60) * (24*60*60)) static int _output_drl(ulog_iret_t *res) { int xid; uint32_t src_ip, dst_ip; uint16_t src_port, dst_port; uint8_t protocol; key_flow key; identity_t *ident; leaf_t *leaf; protocol = GET_VALUE(IP_PROTOCOL).ui8; src_ip = GET_VALUE(IP_SADDR).ui32; dst_ip = GET_VALUE(IP_DADDR).ui32; xid = GET_VALUE(OOB_MARK).ui32; switch (protocol) { case IPPROTO_TCP: src_port = 
GET_VALUE(TCP_SPORT).ui16; dst_port = GET_VALUE(TCP_DPORT).ui16; break; case IPPROTO_UDP: /* netflow had an issue with many udp flows and set * src_port=0 to handle it. We don't. */ src_port = GET_VALUE(UDP_SPORT).ui16; /* * traceroutes create a large number of flows in the db * this is a quick hack to catch the most common form * of traceroute (basically we're mapping any UDP packet * in the 33435-33524 range to the "trace" port, 33524 is * 3 packets * nhops (30). */ dst_port = GET_VALUE(UDP_DPORT).ui16; if (dst_port >= 33435 && dst_port <= 33524) dst_port = 33435; break; case IPPROTO_ICMP: src_port = GET_VALUE(ICMP_TYPE).ui8; dst_port = GET_VALUE(ICMP_CODE).ui8; /* * We special case some of the ICMP traffic that the kernel * always generates. Since this is attributed to root, it * creates significant "noise" in the output. We want to be * able to quickly see that root is generating traffic. */ if (xid == ROOT_XID) { if (src_port == ICMP_ECHOREPLY) xid = ICMP_ECHOREPLY_XID; else if (src_port == ICMP_UNREACH) xid = ICMP_UNREACH_XID; } break; case IPPROTO_GRE: if (GET_VALUE(GRE_FLAG_KEY).b) { if (GET_VALUE(GRE_VERSION).ui8 == 1) { /* Get PPTP call ID */ src_port = GET_VALUE(PPTP_CALLID).ui16; } else { /* XXX Truncate GRE keys to 16 bits */ src_port = (uint16_t) GET_VALUE(GRE_KEY).ui32; } } else { /* No key available */ src_port = 0; } dst_port = 0; break; default: /* This is the default key for packets from unsupported protocols */ src_port = 0; dst_port = 0; break; } key.protocol = protocol; key.source_ip = src_ip; key.dest_ip = dst_ip; key.source_port = src_port; key.dest_port = dst_port; key.packet_size = GET_VALUE(IP_TOTLEN).ui16; key.packet_time = (time_t) GET_VALUE(OOB_TIME_SEC).ui32; pthread_rwlock_rdlock(&limiter.limiter_lock); /* CLUNK! */ leaf = (leaf_t *) map_search(limiter.stable_instance.leaf_map, &xid, sizeof(xid)); /* Even if the packet doesn't match any specific xid, it should still * count in the machine-type tables. 
This catches root (xid == 0) and * unclassified (xid = fff) packets, which don't have map entries. */ if (leaf == NULL) { ident = limiter.stable_instance.last_machine; } else { ident = leaf->parent; } while (ident) { pthread_mutex_lock(&ident->table_mutex); /* Update the identity's table. */ ident->table_sample_function(ident->table, &key); #ifdef SHADOW_ACCTING /* Update the shadow perfect copy of the accounting table. */ standard_table_sample((standard_flow_table) ident->shadow_table, &key); #endif pthread_mutex_unlock(&ident->table_mutex); ident = ident->parent; } pthread_rwlock_unlock(&limiter.limiter_lock); /* CLINK! */ return 0; } /* get all key id's for the keys we are intrested in */ static int get_ids(void) { int i; struct intr_id *cur_id; for (i = 0; i < INTR_IDS; i++) { cur_id = &intr_ids[i]; cur_id->res = keyh_getres(keyh_getid(cur_id->name)); if (!cur_id->res) { ulogd_log(ULOGD_ERROR, "Cannot resolve keyhash id for %s\n", cur_id->name); return 1; } } return 0; } static void free_identity(identity_t *ident) { if (ident) { free_comm(&ident->comm); if (ident->table) { ident->table_destroy_function(ident->table); } if (ident->loop_action) { ident->loop_action->valid = 0; } if (ident->comm_action) { ident->comm_action->valid = 0; } pthread_mutex_destroy(&ident->table_mutex); free(ident); } } static void free_identity_map(map_handle map) { identity_t *tofree = NULL; map_reset_iterate(map); while ((tofree = (identity_t *) map_next(map))) { free_identity(tofree); } free_map(map, 0); } static void free_instance(drl_instance_t *instance) { if (instance->leaves) free(instance->leaves); if (instance->leaf_map) free_map(instance->leaf_map, 0); if (instance->ident_map) free_identity_map(instance->ident_map); if (instance->machines) free(instance->machines); if (instance->sets) free(instance->sets); /* FIXME: Drain the calendar first and free all the entries. 
*/ if (instance->cal) { free(instance->cal); } memset(instance, 0, sizeof(drl_instance_t)); } static void free_failed_config(parsed_configs configs, drl_instance_t *instance) { /* Free configs. */ if (configs.machines) free_ident_list(configs.machines); if (configs.sets) free_ident_list(configs.sets); /* Free instance. */ if (instance) free_instance(instance); } static identity_t *new_identity(ident_config *config) { identity_t *ident = malloc(sizeof(identity_t)); remote_node_t *comm_nodes = malloc(sizeof(remote_node_t)*config->peer_count); ident_peer *peer = config->peers; int peer_slot = 0; if (ident == NULL) { return NULL; } if (comm_nodes == NULL) { free(ident); return NULL; } memset(ident, 0, sizeof(identity_t)); memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t)); ident->id = config->id; ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0); ident->fixed_ewma_weight = config->fixed_ewma_weight; ident->communication_intervals = config->communication_intervals; ident->mainloop_intervals = config->mainloop_intervals; ident->ewma_weight = pow(ident->fixed_ewma_weight, (limiter.estintms/1000.0) * config->mainloop_intervals); ident->parent = NULL; ident->independent = config->independent; pthread_mutex_init(&ident->table_mutex, NULL); switch (config->accounting) { case ACT_STANDARD: ident->table = standard_table_create(standard_hasher, &ident->common); /* Ugly function pointer casting. Makes things sufficiently * generic, though. 
*/ ident->table_sample_function = (int (*)(void *, const key_flow *)) standard_table_sample; ident->table_cleanup_function = (int (*)(void *)) standard_table_cleanup; ident->table_update_function = (void (*)(void *, struct timeval, double)) standard_table_update_flows; ident->table_destroy_function = (void (*)(void *)) standard_table_destroy; break; case ACT_MULTIPLE: ident->table = multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common); ident->table_sample_function = (int (*)(void *, const key_flow *)) multiple_table_sample; ident->table_cleanup_function = (int (*)(void *)) multiple_table_cleanup; ident->table_update_function = (void (*)(void *, struct timeval, double)) multiple_table_update_flows; ident->table_destroy_function = (void (*)(void *)) multiple_table_destroy; break; case ACT_SAMPLEHOLD: ident->table = sampled_table_create(sampled_hasher, ident->limit * IDENT_CLEAN_INTERVAL, SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common); ident->table_sample_function = (int (*)(void *, const key_flow *)) sampled_table_sample; ident->table_cleanup_function = (int (*)(void *)) sampled_table_cleanup; ident->table_update_function = (void (*)(void *, struct timeval, double)) sampled_table_update_flows; ident->table_destroy_function = (void (*)(void *)) sampled_table_destroy; break; case ACT_SIMPLE: ident->table = simple_table_create(&ident->common); ident->table_sample_function = (int (*)(void *, const key_flow *)) simple_table_sample; ident->table_cleanup_function = (int (*)(void *)) simple_table_cleanup; ident->table_update_function = (void (*)(void *, struct timeval, double)) simple_table_update_flows; ident->table_destroy_function = (void (*)(void *)) simple_table_destroy; break; } #ifdef SHADOW_ACCTING ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common); if (ident->shadow_table == NULL) { ident->table_destroy_function(ident->table); free(ident); return NULL; } #endif /* Make sure the table was 
allocated. */ if (ident->table == NULL) { free(ident); return NULL; } while (peer) { comm_nodes[peer_slot].addr = peer->ip; comm_nodes[peer_slot].port = htons(LIMITER_LISTEN_PORT); peer = peer->next; peer_slot += 1; } if (new_comm(&ident->comm, config, comm_nodes)) { printlog(LOG_CRITICAL, "Failed to create communication structure.\n"); return NULL; } ident->comm.remote_nodes = comm_nodes; if (!create_htb.u.value) { ident->htb_node = config->htb_node; ident->htb_parent = config->htb_parent; } return ident; } static int validate_htb_exists(int node, int parent) { FILE *pipe = popen("/sbin/tc class show dev eth0", "r"); char line[200]; if (parent != 0) { while (fgets(line, 200, pipe) != NULL) { int n, p; char ignore[200]; sscanf(line, "class htb 1:%x parent 1:%x prio %s", &n, &p, ignore); if (n == node && p == parent) { pclose(pipe); return 0; } } } else { while (fgets(line, 200, pipe) != NULL) { int n, p; char ignore[200]; sscanf(line, "class htb 1:%x root prio %d %s", &n, &p, ignore); if (n == node && strstr(line, "root") != NULL) { pclose(pipe); return 0; } } } pclose(pipe); return 1; } /* Determines the validity of the parameters of one ident_config. * * 0 valid * 1 invalid */ static int validate_config(ident_config *config) { /* Limit must be a positive integer. */ if (config->limit < 1) { return 1; } /* Commfabric must be a valid choice (COMM_MESH or COMM_GOSSIP). */ if (config->commfabric != COMM_MESH && config->commfabric != COMM_GOSSIP) { return 1; } /* If commfabric is COMM_GOSSIP, this must be a positive integer. */ if (config->commfabric == COMM_GOSSIP && config->branch < 1) { return 1; } /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD, * ACT_SIMPLE, ACT_MULTIPLE). */ if (config->accounting != ACT_STANDARD && config->accounting != ACT_SAMPLEHOLD && config->accounting != ACT_SIMPLE && config->accounting != ACT_MULTIPLE) { return 1; } /* Ewma weight must be greater than or equal to zero. 
*/ if (config->fixed_ewma_weight < 0) { return 1; } if (!create_htb.u.value) { if (config->htb_node < 0 || config->htb_parent < 0) { printlog(LOG_CRITICAL, "When create_htb is disabled in ulogd.conf, an identity must specify the htb_node and htb_parent propertities in its configuration.\n"); return 1; } if (validate_htb_exists(config->htb_node, config->htb_parent)) { printlog(LOG_CRITICAL, "Identity specified htb node %x with parent %x. No such node/parent combo seems to exist!\n", config->htb_node, config->htb_parent); return 1; } } else { if (config->htb_node > -1 || config->htb_parent > -1) { printlog(LOG_WARN, "htb_node or htb_parent are configured but ignored because we're configured to create our own htb hierarchy.\n"); } } /* Note: Parsing stage requires that each ident has at least one peer. */ return 0; } /* 0 success * non-zero failure */ static int validate_configs(parsed_configs configs, drl_instance_t *instance) { ident_config *mlist = configs.machines; ident_config *slist = configs.sets; ident_config *tmp = NULL; int i = 0; while (mlist) { /* ID must be non-zero and unique. */ /* This is ugly and hackish, but this function will be called rarely. * I'm tired of trying to be clever. 
*/ if (mlist->id < 0) { printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", mlist->id, mlist->id); return EINVAL; } tmp = mlist->next; while (tmp) { if (mlist->id == tmp->id) { printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id); return EINVAL; } tmp = tmp->next; } tmp = configs.sets; while (tmp) { if (mlist->id == tmp->id) { printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id); return EINVAL; } tmp = tmp->next; } if (validate_config(mlist)) { printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", mlist->id, mlist->id); return EINVAL; } if (mlist->independent) { printlog(LOG_CRITICAL, "Makes no sense to have independent machine node - setting independent to false.\n"); mlist->independent = 0; } mlist = mlist->next; } instance->sets = malloc(configs.set_count * sizeof(identity_t *)); if (instance->sets == NULL) { printlog(LOG_CRITICAL, "Not enough memory to allocate set identity collection.\n"); return ENOMEM; } memset(instance->sets, 0, configs.set_count * sizeof(identity_t *)); instance->set_count = configs.set_count; /* For sets, make sure that the hierarchy is valid. */ while (slist) { ident_member *members = slist->members; /* ID must be non-zero and unique. */ if (slist->id < 0) { printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", slist->id, slist->id); return EINVAL; } tmp = slist->next; while (tmp) { if (slist->id == tmp->id) { printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", slist->id, slist->id); return EINVAL; } tmp = tmp->next; } if (validate_config(slist)) { printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", slist->id, slist->id); return EINVAL; } /* Allocate an identity_t for this set-type identity. 
*/ instance->sets[i] = new_identity(slist); if (instance->sets[i] == NULL) { printlog(LOG_CRITICAL, "Not enough memory to allocate set identity.\n"); return ENOMEM; } /* Loop through children and look up each in leaf or ident map * depending on the type of child. Set the child's parent pointer * to the identity we just created above, unless it is already set, * in which case we have an error. */ while (members) { identity_t *child_ident = NULL; leaf_t *child_leaf = NULL; switch (members->type) { case MEMBER_XID: child_leaf = map_search(instance->leaf_map, &members->value, sizeof(members->value)); if (child_leaf == NULL) { printlog(LOG_CRITICAL, "xid: child leaf not found.\n"); return EINVAL; } if (child_leaf->parent != NULL) { /* Error - This leaf already has a parent. */ printlog(LOG_CRITICAL, "xid: child already has a parent.\n"); return EINVAL; } child_leaf->parent = instance->sets[i]; break; case MEMBER_GUID: child_ident = map_search(instance->ident_map, &members->value, sizeof(members->value)); if (child_ident == NULL) { printlog(LOG_CRITICAL, "guid: child identity not found.\n"); return EINVAL; } if (child_ident->parent != NULL) { /* Error - This identity already has a parent. */ printlog(LOG_CRITICAL, "guid: child identity already has a parent.\n"); return EINVAL; } child_ident->parent = instance->sets[i]; break; default: /* Error - shouldn't be possible. 
*/ break; } members = members->next; } map_insert(instance->ident_map, &instance->sets[i]->id, sizeof(instance->sets[i]->id), instance->sets[i]); slist = slist->next; i++; } return 0; } static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) { int count = 0; identity_t *current_ident; leaf_t *current_leaf; leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *)); if (leaves == NULL) { return 1; } map_reset_iterate(instance->leaf_map); while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) { current_ident = current_leaf->parent; while (current_ident != NULL && current_ident != instance->last_machine) { if (current_ident == ident) { /* Found the ident we were looking for - add the leaf. */ leaves[count] = current_leaf; count += 1; break; } current_ident = current_ident->parent; } } ident->leaves = leaves; ident->leaf_count = count; return 0; } static int init_identities(parsed_configs configs, drl_instance_t *instance) { int i, j; ident_config *config = configs.machines; leaf_t *leaf = NULL; instance->cal = malloc(sizeof(struct ident_calendar) * SCHEDLEN); if (instance->cal == NULL) { return ENOMEM; } for (i = 0; i < SCHEDLEN; ++i) { TAILQ_INIT(instance->cal + i); } instance->cal_slot = 0; instance->machines = malloc(configs.machine_count * sizeof(drl_instance_t *)); if (instance->machines == NULL) { return ENOMEM; } memset(instance->machines, 0, configs.machine_count * sizeof(drl_instance_t *)); instance->machine_count = configs.machine_count; /* Allocate and add the machine identities. */ for (i = 0; i < configs.machine_count; ++i) { identity_action *loop_action; identity_action *comm_action; instance->machines[i] = new_identity(config); if (instance->machines[i] == NULL) { return ENOMEM; } loop_action = malloc(sizeof(identity_action)); comm_action = malloc(sizeof(identity_action)); if (loop_action == NULL || comm_action == NULL) { return ENOMEM; } /* The first has no parent - it is the root. 
All others have the * previous ident as their parent. */ if (i == 0) { instance->machines[i]->parent = NULL; } else { instance->machines[i]->parent = instance->machines[i - 1]; } instance->last_machine = instance->machines[i]; /* Add the ident to the guid->ident map. */ map_insert(instance->ident_map, &instance->machines[i]->id, sizeof(instance->machines[i]->id), instance->machines[i]); config = config->next; memset(loop_action, 0, sizeof(identity_action)); memset(comm_action, 0, sizeof(identity_action)); loop_action->ident = instance->machines[i]; loop_action->action = ACTION_MAINLOOP; loop_action->valid = 1; comm_action->ident = instance->machines[i]; comm_action->action = ACTION_COMMUNICATE; comm_action->valid = 1; instance->machines[i]->loop_action = loop_action; instance->machines[i]->comm_action = comm_action; TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), loop_action, calendar); TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), comm_action, calendar); /* Setup the array of pointers to leaves. This is easy for machines * because a machine node applies to every leaf. */ instance->machines[i]->leaves = malloc(instance->leaf_count * sizeof(leaf_t *)); if (instance->machines[i]->leaves == NULL) { return ENOMEM; } instance->machines[i]->leaf_count = instance->leaf_count; for (j = 0; j < instance->leaf_count; ++j) { instance->machines[i]->leaves[j] = &instance->leaves[j]; } } /* Connect the set subtree to the machines. Any set or leaf without a * parent will take the last machine as its parent. */ /* Leaves... */ map_reset_iterate(instance->leaf_map); while ((leaf = (leaf_t *) map_next(instance->leaf_map))) { if (leaf->parent == NULL) { leaf->parent = instance->last_machine; } } /* Sets... 
*/ for (i = 0; i < instance->set_count; ++i) { identity_action *loop_action; identity_action *comm_action; if (instance->sets[i]->parent == NULL && instance->sets[i]->independent == 0) { instance->sets[i]->parent = instance->last_machine; } loop_action = malloc(sizeof(identity_action)); comm_action = malloc(sizeof(identity_action)); if (loop_action == NULL || comm_action == NULL) { return ENOMEM; } memset(loop_action, 0, sizeof(identity_action)); memset(comm_action, 0, sizeof(identity_action)); loop_action->ident = instance->sets[i]; loop_action->action = ACTION_MAINLOOP; loop_action->valid = 1; comm_action->ident = instance->sets[i]; comm_action->action = ACTION_COMMUNICATE; comm_action->valid = 1; instance->sets[i]->loop_action = loop_action; instance->sets[i]->comm_action = comm_action; TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), loop_action, calendar); TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), comm_action, calendar); /* Setup the array of pointers to leaves. This is harder for sets, * but this doesn't need to be super-efficient because it happens * rarely and it isn't on the critical path for reconfig(). */ if (fill_set_leaf_pointer(instance, instance->sets[i])) { return ENOMEM; } } /* Success. 
*/ return 0; } static void print_instance(drl_instance_t *instance) { leaf_t *leaf = NULL; identity_t *ident = NULL; if (system_loglevel == LOG_DEBUG) { map_reset_iterate(instance->leaf_map); while ((leaf = (leaf_t *) map_next(instance->leaf_map))) { printf("%x:", leaf->xid); ident = leaf->parent; while (ident) { printf("%d:",ident->id); ident = ident->parent; } printf("Leaf's parent pointer is %p\n", leaf->parent); } printf("instance->last_machine is %p\n", instance->last_machine); } } static int assign_htb_hierarchy(drl_instance_t *instance) { int i, j; int next_node = 0x100; /* If we're not going to create our own htb hierarchy (for instance, * if we're going to let PL's node manager do it for us), then we don't * want this function to do anything. */ if (!create_htb.u.value) { printlog(LOG_DEBUG, "Skipping assign_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n"); return 0; } /* Chain machine nodes under 1:10. */ for (i = 0; i < instance->machine_count; ++i) { if (instance->machines[i]->parent == NULL) { /* Top node. */ instance->machines[i]->htb_parent = 0x10; } else { /* Pointerific! */ instance->machines[i]->htb_parent = instance->machines[i]->parent->htb_node; } instance->machines[i]->htb_node = next_node; next_node += 1; } next_node += 0x10; /* Add set nodes under machine nodes. Iterate backwards to ensure parent is * already there. */ for (j = (instance->set_count - 1); j >= 0; --j) { if (instance->sets[j]->parent == NULL) { /* Independent node - goes under 0x10 away from machine nodes. */ instance->sets[j]->htb_parent = 0x10; } else { instance->sets[j]->htb_parent = instance->sets[j]->parent->htb_node; } instance->sets[j]->htb_node = next_node; next_node += 1; } return 0; } /* Added this so that I could comment one line and kill off all of the * command execution. 
*/ static inline int execute_cmd(const char *cmd) { return system(cmd); } static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor, const uint32_t classid_major, const uint32_t classid_minor, const uint64_t rate, const uint64_t ceil) { char cmd[300]; sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit", iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil); printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd); return execute_cmd(cmd); } static inline int add_htb_netem(const char *iface, const uint32_t parent_major, const uint32_t parent_minor, const uint32_t handle, const int loss, const int delay) { char cmd[300]; sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major, parent_minor, handle); printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); if (execute_cmd(cmd)) printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n"); sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms", iface, parent_major, parent_minor, handle, loss, delay); printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); return execute_cmd(cmd); } static inline int add_htb_sfq(const char *iface, const uint32_t parent_major, const uint32_t parent_minor, const uint32_t handle, const int perturb) { char cmd[300]; sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major, parent_minor, handle); printlog(LOG_WARN, "HTB_cmd: %s\n", cmd); if (execute_cmd(cmd)) printlog(LOG_WARN, "HTB_cmd: Previous deletion did not succeed.\n"); sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x sfq perturb %d", iface, parent_major, parent_minor, handle, perturb); printlog(LOG_WARN, "HTB_cmd: %s\n", cmd); return execute_cmd(cmd); } static int create_htb_hierarchy(drl_instance_t *instance) { char cmd[300]; int i, j, k; uint64_t gigabit = 1024 * 1024 * 1024; /* If we're not going to create our own htb 
hierarchy (for instance, * if we're going to let PL's node manager do it for us), then we don't * want this function to do anything. */ if (!create_htb.u.value) { printlog(LOG_DEBUG, "Skipping create_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n"); return 0; } /* Nuke the hierarchy. */ sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb"); execute_cmd(cmd); printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Re-initialize the basics. */ sprintf(cmd, "tc qdisc add dev eth0 root handle 1: htb default 1fff"); if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit)) return 1; /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/ if (limiter.nodelimit) { if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024)) return 1; } else { if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit)) return 1; } /* Add machines. */ for (i = 0; i < instance->machine_count; ++i) { if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1, instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) { return 1; } } #define LIMITEXEMPT /* Add back 1:20. */ #ifdef LIMITEXEMPT if (instance->last_machine == NULL) { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit"); } else { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:20 htb rate 8bit ceil 1000mbit", instance->last_machine->htb_node); } #else sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit"); #endif if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Add sets. */ for (j = (instance->set_count - 1); j >= 0; --j) { if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1, instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) { return 1; } } /* Add leaves. FIXME: Set static sliver limit as ceil here! 
*/ for (k = 0; k < instance->leaf_count; ++k) { if (instance->leaves[k].parent == NULL) { if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit)) return 1; } else { if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit)) return 1; } /* Add exempt node for the leaf under 1:20 as 1:2 */ if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit)) return 1; } /* Add 1:1000 and 1:2000 */ if (instance->last_machine == NULL) { if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit)) return 1; } else { if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit)) return 1; } if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit)) return 1; /* Add 1:1fff and 1:2fff */ if (instance->last_machine == NULL) { if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit)) return 1; } else { if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit)) return 1; } if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit)) return 1; /* Artifical delay or loss for experimentation. */ if (netem_delay.u.value || netem_loss.u.value) { if (!strcmp(netem_slice.u.string, "ALL")) { /* By default, netem applies to all leaves. */ if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value)) return 1; if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value)) return 1; for (k = 0; k < instance->leaf_count; ++k) { if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid), (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) { return 1; } //FIXME: add exempt delay/loss here on 0x2000 ... ? } } else { /* netem_slice is not the default ALL value. Only apply netem * to the slice that is set in netem_slice.u.string. 
*/ uint32_t slice_xid; sscanf(netem_slice.u.string, "%x", &slice_xid); if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value)) return 1; } } /* Turn on SFQ for experimentation. */ if (strcmp(sfq_slice.u.string, "NONE")) { if (!strcmp(sfq_slice.u.string, "ALL")) { if (add_htb_sfq("eth0", 1, 0x1000, 0x1000, 30)) return 1; if (add_htb_sfq("eth0", 1, 0x1fff, 0x1fff, 30)) return 1; for (k = 0; k < instance->leaf_count; ++k) { if (add_htb_sfq("eth0", 1, (0x1000 | instance->leaves[k].xid), (0x1000 | instance->leaves[k].xid), 30)) { return 1; } } } else { uint32_t slice_xid; sscanf(sfq_slice.u.string, "%x", &slice_xid); if (add_htb_sfq("eth0", 1, slice_xid, slice_xid, 30)) return 1; } } return 0; } static int setup_tc_grd(drl_instance_t *instance) { int i, j; char cmd[300]; for (i = 0; i < instance->leaf_count; ++i) { /* Delete the old pfifo qdisc that might have been there before. */ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo", instance->leaves[i].xid, instance->leaves[i].xid); if (execute_cmd(cmd)) { printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n"); } /* Add the netem qdisc. */ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms", instance->leaves[i].xid, instance->leaves[i].xid); if (execute_cmd(cmd)) { printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } } /* Do the same for 1000 and 1fff. */ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo"); if (execute_cmd(cmd)) { printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n"); } /* Add the netem qdisc. 
*/ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms"); if (execute_cmd(cmd)) { printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo"); if (execute_cmd(cmd)) { printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n"); } /* Add the netem qdisc. */ sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms"); if (execute_cmd(cmd)) { printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } /* Artifical delay or loss for experimentation. */ if (netem_delay.u.value || netem_loss.u.value) { if (!strcmp(netem_slice.u.string, "ALL")) { sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1000 handle 1000 netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value); if (execute_cmd(cmd)) { printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1fff handle 1fff netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value); if (execute_cmd(cmd)) { printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } for (j = 0; j < instance->leaf_count; ++j) { leaf_t *current = &instance->leaves[j]; current->delay = netem_delay.u.value; sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", current->xid, current->xid, netem_loss.u.value, netem_delay.u.value); if (execute_cmd(cmd)) { printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } } } else { uint32_t slice_xid; leaf_t *leaf = NULL; sscanf(netem_slice.u.string, "%x", &slice_xid); leaf = (leaf_t *) map_search(instance->leaf_map, &slice_xid, sizeof(slice_xid)); if (leaf == NULL) { /* Leaf not found - invalid selection. 
*/ printf("Your experimental setup is incorrect...\n"); return 1; } leaf->delay = netem_delay.u.value; sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value); if (execute_cmd(cmd)) { printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd); return 1; } } } return 0; } /* init_drl * * Initialize this limiter with options * Open UDP socket for peer communication */ static int init_drl(void) { parsed_configs configs; struct sockaddr_in server_address; memset(&limiter, 0, sizeof(limiter_t)); /* Setup logging. */ system_loglevel = (uint8_t) drl_loglevel.u.value; logfile = fopen(drl_logfile.u.string, "w"); if (logfile == NULL) { printf("Couldn't open logfile - "); perror("fopen()"); exit(EXIT_FAILURE); } printlog(LOG_CRITICAL, "ulogd_DRL initializing . . .\n"); limiter.nodelimit = (uint32_t) (((double) nodelimit.u.value * 1000000.0) / 8.0); init_hashing(); /* for all hash maps */ pthread_rwlock_init(&limiter.limiter_lock,NULL); /* determine our local IP by iterating through interfaces */ if (strncmp(bind_addr.u.string, "AUTO", 4)) { limiter.ip = bind_addr.u.string; } else { limiter.ip = get_local_ip(); if (limiter.ip == NULL) { printlog(LOG_CRITICAL, "ulogd_DRL unable to aquire local IP address, not registering.\n"); return false; } } limiter.localaddr = inet_addr(limiter.ip); limiter.port = htons(LIMITER_LISTEN_PORT); limiter.udp_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP); if (limiter.udp_socket < 0) { perror("socket()"); printlog(LOG_CRITICAL, "Failed to create UDP socket().\n"); return false; } memset(&server_address, 0, sizeof(server_address)); server_address.sin_family = AF_INET; server_address.sin_addr.s_addr = limiter.localaddr; server_address.sin_port = limiter.port; if (bind(limiter.udp_socket, (struct sockaddr *) &server_address, sizeof(server_address)) < 0) { perror("bind()"); printlog(LOG_CRITICAL, "Failed to bind UDP socket.\n"); return false; } 
printlog(LOG_WARN, " POLICY: %s\n",policy.u.string); if (strcasecmp(policy.u.string,"GRD") == 0) { limiter.policy = POLICY_GRD; } else if (strcasecmp(policy.u.string,"FPS") == 0) { limiter.policy = POLICY_FPS; } else { printlog(LOG_CRITICAL, "Unknown DRL policy %s, aborting.\n",policy.u.string); return (false); } limiter.estintms = estintms.u.value; if (limiter.estintms > 1000) { printlog(LOG_CRITICAL, "DRL: sorry estimate intervals must be less than 1 second."); printlog(LOG_CRITICAL, " Simple source mods will allow larger intervals. Using 1 second.\n"); limiter.estintms = 1000; } printlog(LOG_WARN, " Est interval: %dms\n",limiter.estintms); /* Acquire the big limiter lock for writing. Prevents pretty much * anything else from happening while the hierarchy is being changed. */ pthread_rwlock_wrlock(&limiter.limiter_lock); limiter.stable_instance.ident_map = allocate_map(); if (limiter.stable_instance.ident_map == NULL) { printlog(LOG_CRITICAL, "Failed to allocate memory for identity map.\n"); return false; } /* If no leaves are specified, assume we're on planetlab and read them out * of /proc/virtual. Otherwise, read the specified line. */ if (!strncmp(leaves.u.string, "PLANETLAB", 9)) { if (get_eligible_leaves(&limiter.stable_instance)) { printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n"); return false; } } else { if (parse_leaves(&limiter.stable_instance, leaves.u.string)) { printlog(LOG_CRITICAL, "Failed to parse leaf string.\n"); return false; } } if (parse_drl_config(drl_configfile.u.string, &configs)) { /* Parse error occured. Return non-zero to notify init_drl(). */ printlog(LOG_CRITICAL, "Failed to parse the DRL configuration file (%s).\n", drl_configfile.u.string); return false; } /* Validate identity hierarchy! */ if (validate_configs(configs, &limiter.stable_instance)) { /* Clean up everything. 
*/ free_failed_config(configs, &limiter.stable_instance); printlog(LOG_CRITICAL, "Invalid DRL configuration file (%s).\n", drl_configfile.u.string); return false; } if (init_identities(configs, &limiter.stable_instance)) { free_failed_config(configs, &limiter.stable_instance); printlog(LOG_CRITICAL, "Failed to initialize identities.\n"); return false; } /* At this point, we should be done with configs. */ free_ident_list(configs.machines); free_ident_list(configs.sets); print_instance(&limiter.stable_instance); switch (limiter.policy) { case POLICY_FPS: if (assign_htb_hierarchy(&limiter.stable_instance)) { free_instance(&limiter.stable_instance); printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy.\n"); return false; } if (create_htb_hierarchy(&limiter.stable_instance)) { free_instance(&limiter.stable_instance); printlog(LOG_CRITICAL, "Failed to create HTB hierarchy.\n"); return false; } break; case POLICY_GRD: if (setup_tc_grd(&limiter.stable_instance)) { free_instance(&limiter.stable_instance); printlog(LOG_CRITICAL, "Failed to initialize tc calls for GRD.\n"); return false; } break; default: return false; } partition_set = partition.u.value; pthread_rwlock_unlock(&limiter.limiter_lock); if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) { printlog(LOG_CRITICAL, "Unable to start UDP receive thread.\n"); return false; } printlog(LOG_WARN, "ulogd_DRL init finished.\n"); return true; } static void reconfig() { parsed_configs configs; printlog(LOG_DEBUG, "--Starting reconfig()--\n"); flushlog(); memset(&configs, 0, sizeof(parsed_configs)); memset(&limiter.new_instance, 0, sizeof(drl_instance_t)); limiter.new_instance.ident_map = allocate_map(); if (limiter.new_instance.ident_map == NULL) { printlog(LOG_CRITICAL, "Failed to allocate ident_map during reconfig().\n"); return; } if (!strncmp(leaves.u.string, "PLANETLAB", 9)) { if (get_eligible_leaves(&limiter.new_instance)) { free_failed_config(configs, &limiter.new_instance); 
printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n"); return; } } else { if (parse_leaves(&limiter.new_instance, leaves.u.string)) { free_failed_config(configs, &limiter.new_instance); printlog(LOG_CRITICAL, "Failed to parse leaf string.\n"); return; } } if (parse_drl_config(drl_configfile.u.string, &configs)) { free_failed_config(configs, &limiter.new_instance); printlog(LOG_CRITICAL, "Failed to parse config during reconfig().\n"); return; } if (validate_configs(configs, &limiter.new_instance)) { free_failed_config(configs, &limiter.new_instance); printlog(LOG_CRITICAL, "Validation failed during reconfig().\n"); pthread_rwlock_unlock(&limiter.limiter_lock); return; } if (init_identities(configs, &limiter.new_instance)) { free_failed_config(configs, &limiter.new_instance); printlog(LOG_CRITICAL, "Initialization failed during reconfig().\n"); pthread_rwlock_unlock(&limiter.limiter_lock); return; } free_ident_list(configs.machines); free_ident_list(configs.sets); print_instance(&limiter.new_instance); /* Lock */ pthread_rwlock_wrlock(&limiter.limiter_lock); switch (limiter.policy) { case POLICY_FPS: if (assign_htb_hierarchy(&limiter.new_instance)) { free_instance(&limiter.new_instance); printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n"); pthread_rwlock_unlock(&limiter.limiter_lock); return; } if (create_htb_hierarchy(&limiter.new_instance)) { free_instance(&limiter.new_instance); printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n"); /* Re-create old instance. */ if (create_htb_hierarchy(&limiter.stable_instance)) { /* Error reinstating the old one - big problem. 
*/ printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n"); printlog(LOG_CRITICAL, "Giving up...\n"); flushlog(); exit(EXIT_FAILURE); } pthread_rwlock_unlock(&limiter.limiter_lock); return; } break; case POLICY_GRD: if (setup_tc_grd(&limiter.new_instance)) { free_instance(&limiter.new_instance); printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n"); /* Try to re-create old instance. */ if (setup_tc_grd(&limiter.stable_instance)) { printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n"); printlog(LOG_CRITICAL, "Giving up...\n"); flushlog(); exit(EXIT_FAILURE); } } break; default: /* Should be impossible. */ printf("Pigs are flying?\n"); exit(EXIT_FAILURE); } /* Switch over new to stable instance. */ free_instance(&limiter.stable_instance); memcpy(&limiter.stable_instance, &limiter.new_instance, sizeof(drl_instance_t)); /* Success! - Unlock */ pthread_rwlock_unlock(&limiter.limiter_lock); } static int stop_enforcement(drl_instance_t *instance) { char cmd[300]; int i; for (i = 0; i < instance->machine_count; ++i) { sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit", instance->machines[i]->htb_parent, instance->machines[i]->htb_node); if (execute_cmd(cmd)) { return 1; } } for (i = 0; i < instance->set_count; ++i) { sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit", instance->sets[i]->htb_parent, instance->sets[i]->htb_node); if (execute_cmd(cmd)) { return 1; } } return 0; } static void *signal_thread_func(void *args) { int sig; int err; sigset_t sigs; sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); sigaddset(&sigs, SIGUSR1); sigaddset(&sigs, SIGUSR2); sigaddset(&sigs, SIGRTMAX); pthread_sigmask(SIG_BLOCK, &sigs, NULL); while (1) { sigemptyset(&sigs); //sigaddset(&sigs, SIGHUP); sigaddset(&sigs, SIGUSR1); sigaddset(&sigs, SIGUSR2); sigaddset(&sigs, SIGRTMAX); err = sigwait(&sigs, &sig); if (err) { 
printlog(LOG_CRITICAL, "sigwait() returned an error.\n"); flushlog(); continue; } if (sig == SIGRTMAX) { printf("Caught SIGRTMAX - toggling fake partitions.\n"); do_partition = !do_partition; continue; } switch (sig) { case SIGHUP: printlog(LOG_CRITICAL, "Caught SIGHUP in signal_thread_func?!?\n"); printf("Caught SIGHUP in signal_thread_func?!?\n"); break; case SIGUSR1: pthread_rwlock_wrlock(&limiter.limiter_lock); if (do_enforcement) { do_enforcement = 0; stop_enforcement(&limiter.stable_instance); printlog(LOG_CRITICAL, "--Switching enforcement off.--\n"); } else { do_enforcement = 1; printlog(LOG_CRITICAL, "--Switching enforcement on.--\n"); } pthread_rwlock_unlock(&limiter.limiter_lock); break; case SIGUSR2: printlog(LOG_WARN, "Caught SIGUSR2 - re-reading XML file.\n"); printf("Caught SIGUSR2 - re-reading XML file.\n"); reconfig(); flushlog(); break; default: /* Intentionally blank. */ break; } } } static int drl_plugin_init() { sigset_t signal_mask; sigemptyset(&signal_mask); //sigaddset(&signal_mask, SIGHUP); sigaddset(&signal_mask, SIGUSR1); sigaddset(&signal_mask, SIGUSR2); sigaddset(&signal_mask, SIGRTMAX); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) { printlog(LOG_CRITICAL, "Failed to create signal handling thread.\n"); fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string); flushlog(); exit(EXIT_FAILURE); } if (!init_drl()) { printlog(LOG_CRITICAL, "Init failed. :(\n"); fprintf(stderr, "An error has occured starting ulogd_DRL. 
Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string); flushlog(); exit(EXIT_FAILURE); } /* start up the thread that will periodically estimate the * local rate and set the local limits * see estimate.c */ if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) { printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n"); fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string); exit(EXIT_FAILURE); } if (enforce_on.u.value) { pthread_rwlock_wrlock(&limiter.limiter_lock); do_enforcement = 1; printlog(LOG_CRITICAL, "--Switching enforcement on.--\n"); pthread_rwlock_unlock(&limiter.limiter_lock); } return 0; } static void drl_signal(int sig) { if (sig == SIGHUP) { printf("Caught SIGHUP - reopening DRL log file.\n"); fclose(logfile); logfile = fopen(drl_logfile.u.string, "a"); printlog(LOG_CRITICAL, "Reopened logfile.\n"); } else { printlog(LOG_WARN, "Caught unexpected signal %d in drl_signal.\n", sig); } } static ulog_output_t drl_op = { .name = "drl", .output = &_output_drl, .signal = &drl_signal, .init = &drl_plugin_init, .fini = NULL, }; #if 0 /* Tests the amount of time it takes to call reconfig(). 
*/ static void time_reconfig(int iterations) { struct timeval start, end; int i; gettimeofday(&start, NULL); for (i = 0; i < iterations; ++i) { reconfig(); } gettimeofday(&end, NULL); printf("%d reconfigs() took %d seconds and %d microseconds.\n", iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec); exit(0); // Seems to take about 85ms / iteration } #endif /* register output plugin with ulogd */ static void _drl_reg_op(void) { ulog_output_t *op = &drl_op; register_output(op); } void _init(void) { /* have the opts parsed */ config_parse_file("DRL", config_entries); if (get_ids()) { ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n"); exit(2); } /* Seed the hash function */ salt = getpid() ^ time(NULL); _drl_reg_op(); }