/* See the DRL-LICENSE file for this file's software license. */ /* * ulogd output target for DRL: GRD and FPS * * Ken Yocum * * Original shell of this code from ulogd_NETFLOW * Like that code, we keep track of per-slice data rates * out of this slice. Thus we are rate limiting particular slices * across multiple boxes, ensuring that their outbound rate does not * exceed some fixed limit. * * Fabric: static mesh * * Enforcer: linux drop percentage. * * This file reads packets from the netlink socket. It updates all * the hashmaps which track how much data has arrived per flow. * It starts two threads for this limiter. * One thread handles periodic estimation. * The other thread handles communication with other limiters. * * * Code map * * ulogd_DRL: attach to netlink socket, accept packets. replaces ratelimit.cc * util.c: generic hashing functions, flow comparisons, sundry items. * gossip.c: Recv gossip, send gossip. * peer_comm.c: Thread to listen for updates from other limiters. * estimate.c: Thread to calculate the local limits. * * * Ken Yocum * 2007 * * Some code appropriated from ulogd_NETFLOW: * * Mark Huang * Copyright (C) 2004-2005 The Trustees of Princeton University * * Based on admindump.pl by Mic Bowman and Paul Brett * Copyright (c) 2002 Intel Corporation * */ /* Enable GNU glibc extensions */ #define _GNU_SOURCE #include #include /* va_start() and friends */ #include /* ispunct() */ #include /* strstr() and friends */ #include /* dirname() and basename() */ #include /* fork() and wait() */ #include #include #include /* errno and assert() */ #include #include /* getopt_long() */ #include /* time() and friends */ #include #include /* inet_aton() */ #include #include #include /* ICMP definitions */ #include #include /* stat() */ #include /* pthread_create() */ #include /* flock() */ #include /* Signal definitions - so that we can catch SIGHUP and update config. */ #include #include #include /* Perhaps useful for files within vservers? */ #if !defined(STANDALONE) && HAVE_LIBPROPER #include #endif /* * Jenkins hash support * lives in raterouter.h */ /* DRL specifics */ #include "raterouter.h" #include "util.h" #include "calendar.h" #include "ratetypes.h" /* needs util and pthread.h */ #include "logging.h" /* * /etc/ulogd.conf configuration options * Add the config options for DRL. */ static config_entry_t drl_configfile = { .next = NULL, .key = "drl_configfile", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_MANDATORY, .u = { .string = "drl.xml" }, }; /** The administrative bandwidth limit (mbps) for the local node. The node * will not set a limit higher than this, even when distributed capacity is * available. Set to 0 for no limit. */ static config_entry_t nodelimit = { .next = &drl_configfile, .key = "nodelimit", .type = CONFIG_TYPE_INT, .options = CONFIG_OPT_MANDATORY, .u = { .value = 0 }, }; /** Determines the verbosity of logging. */ static config_entry_t drl_loglevel = { .next = &nodelimit, .key = "drl_loglevel", .type = CONFIG_TYPE_INT, .options = CONFIG_OPT_MANDATORY, .u = { .value = LOG_WARN }, }; /** The path of the logfile. */ static config_entry_t drl_logfile = { .next = &drl_loglevel, .key = "drl_logfile", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_MANDATORY, .u = { .string = "drl_logfile.log" }, }; /** The choice of DRL protocol. */ static config_entry_t policy = { .next = &drl_logfile, .key = "policy", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_MANDATORY, .u = { .string = "GRD" }, }; /** The estimate interval, in milliseconds. */ static config_entry_t estintms = { .next = &policy, .key = "estintms", .type = CONFIG_TYPE_INT, .options = CONFIG_OPT_MANDATORY, .u = { .value = 100 }, }; #define config_entries (&estintms) /* * Debug functionality */ #ifdef DMALLOC #include #endif #define NIPQUAD(addr) \ ((unsigned char *)&addr)[0], \ ((unsigned char *)&addr)[1], \ ((unsigned char *)&addr)[2], \ ((unsigned char *)&addr)[3] #define IPQUAD(addr) \ ((unsigned char *)&addr)[3], \ ((unsigned char *)&addr)[2], \ ((unsigned char *)&addr)[1], \ ((unsigned char *)&addr)[0] /* Salt for the hash functions */ static int salt; /* * Hash slice name lookups on context ID. */ /* Special context IDs */ #define UNKNOWN_XID -1 #define ROOT_XID 0 enum { CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */ ICMP_ECHOREPLY_XID, ICMP_UNREACH_XID, }; /* globals */ pthread_t estimate_thread; pthread_t signal_thread; pthread_t comm_thread; uint32_t local_ip = 0; limiter_t limiter; extern FILE *logfile; extern uint8_t system_loglevel; /* functions */ static inline uint32_t hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port) { unsigned char mybytes[FLOWKEYSIZE]; mybytes[0] = protocol; *(uint32_t*)(&(mybytes[1])) = src_ip; *(uint32_t*)(&(mybytes[5])) = dst_ip; *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port; return jhash(mybytes,FLOWKEYSIZE,salt) & (FLOW_HASH_SIZE - 1); } uint32_t sampled_hasher(const key_flow *key) { return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port); } uint32_t standard_hasher(const key_flow *key) { return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port); } struct intr_id { char* name; ulog_iret_t *res; }; /* Interesting keys */ enum { OOB_TIME_SEC = 0, OOB_MARK, IP_SADDR, IP_DADDR, IP_TOTLEN, IP_PROTOCOL, TCP_SPORT, TCP_DPORT, TCP_ACK, TCP_FIN, TCP_SYN, TCP_RST, UDP_SPORT, UDP_DPORT, ICMP_TYPE, ICMP_CODE, GRE_FLAG_KEY, GRE_VERSION, GRE_KEY, PPTP_CALLID, }; #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0])) static struct intr_id intr_ids[] = { [OOB_TIME_SEC] = { "oob.time.sec", 0 }, [OOB_MARK] = { "oob.mark", 0 }, [IP_SADDR] = { "ip.saddr", 0 }, [IP_DADDR] = { "ip.daddr", 0 }, [IP_TOTLEN] = { "ip.totlen", 0 }, [IP_PROTOCOL] = { "ip.protocol", 0 }, [TCP_SPORT] = { "tcp.sport", 0 }, [TCP_DPORT] { "tcp.dport", 0 }, [TCP_ACK] = { "tcp.ack", 0 }, [TCP_FIN] = { "tcp.fin", 0 }, [TCP_SYN] = { "tcp.syn", 0 }, [TCP_RST] = { "tcp.rst", 0 }, [UDP_SPORT] = { "udp.sport", 0 }, [UDP_DPORT] = { "udp.dport", 0 }, [ICMP_TYPE] = { "icmp.type", 0 }, [ICMP_CODE] = { "icmp.code", 0 }, [GRE_FLAG_KEY] = { "gre.flag.key", 0 }, [GRE_VERSION] = { "gre.version", 0 }, [GRE_KEY] = { "gre.key", 0 }, [PPTP_CALLID] = { "pptp.callid", 0 }, }; #define GET_VALUE(x) intr_ids[x].res->value #define DATE(t) ((t) / (24*60*60) * (24*60*60)) static int _output_drl(ulog_iret_t *res) { int xid; uint32_t src_ip, dst_ip; uint16_t src_port, dst_port; uint8_t protocol; key_flow key; identity_t *ident; leaf_t *leaf; protocol = GET_VALUE(IP_PROTOCOL).ui8; src_ip = GET_VALUE(IP_SADDR).ui32; dst_ip = GET_VALUE(IP_DADDR).ui32; xid = GET_VALUE(OOB_MARK).ui32; switch (protocol) { case IPPROTO_TCP: src_port = GET_VALUE(TCP_SPORT).ui16; dst_port = GET_VALUE(TCP_DPORT).ui16; break; case IPPROTO_UDP: /* netflow had an issue with many udp flows and set * src_port=0 to handle it. We don't. */ src_port = GET_VALUE(UDP_SPORT).ui16; /* * traceroutes create a large number of flows in the db * this is a quick hack to catch the most common form * of traceroute (basically we're mapping any UDP packet * in the 33435-33524 range to the "trace" port, 33524 is * 3 packets * nhops (30). */ dst_port = GET_VALUE(UDP_DPORT).ui16; if (dst_port >= 33435 && dst_port <= 33524) dst_port = 33435; break; case IPPROTO_ICMP: src_port = GET_VALUE(ICMP_TYPE).ui8; dst_port = GET_VALUE(ICMP_CODE).ui8; /* * We special case some of the ICMP traffic that the kernel * always generates. Since this is attributed to root, it * creates significant "noise" in the output. We want to be * able to quickly see that root is generating traffic. */ if (xid == ROOT_XID) { if (src_port == ICMP_ECHOREPLY) xid = ICMP_ECHOREPLY_XID; else if (src_port == ICMP_UNREACH) xid = ICMP_UNREACH_XID; } break; case IPPROTO_GRE: if (GET_VALUE(GRE_FLAG_KEY).b) { if (GET_VALUE(GRE_VERSION).ui8 == 1) { /* Get PPTP call ID */ src_port = GET_VALUE(PPTP_CALLID).ui16; } else { /* XXX Truncate GRE keys to 16 bits */ src_port = (uint16_t) GET_VALUE(GRE_KEY).ui32; } } else { /* No key available */ src_port = 0; } dst_port = 0; break; default: /* This is the default key for packets from unsupported protocols */ src_port = 0; dst_port = 0; break; } key.protocol = protocol; key.source_ip = src_ip; key.dest_ip = dst_ip; key.source_port = src_port; key.dest_port = dst_port; key.packet_size = GET_VALUE(IP_TOTLEN).ui16; key.packet_time = (time_t) GET_VALUE(OOB_TIME_SEC).ui32; pthread_rwlock_rdlock(&limiter.limiter_lock); /* CLUNK! */ leaf = (leaf_t *) map_search(limiter.stable_instance.leaf_map, &xid, sizeof(xid)); /* Even if the packet doesn't match any specific xid, it should still * count in the machine-type tables. This catches root (xid == 0) and * unclassified (xid = fff) packets, which don't have map entries. */ if (leaf == NULL) { ident = limiter.stable_instance.last_machine; } else { ident = leaf->parent; } while (ident) { pthread_mutex_lock(&ident->table_mutex); /* Update the identity's table. */ ident->table_sample_function(ident->table, &key); pthread_mutex_unlock(&ident->table_mutex); ident = ident->parent; } pthread_rwlock_unlock(&limiter.limiter_lock); /* CLINK! */ return 0; } /* get all key id's for the keys we are intrested in */ static int get_ids(void) { int i; struct intr_id *cur_id; for (i = 0; i < INTR_IDS; i++) { cur_id = &intr_ids[i]; cur_id->res = keyh_getres(keyh_getid(cur_id->name)); if (!cur_id->res) { ulogd_log(ULOGD_ERROR, "Cannot resolve keyhash id for %s\n", cur_id->name); return 1; } } return 0; } static void free_identity(identity_t *ident) { if (ident) { free_comm(&ident->comm); if (ident->table) { ident->table_destroy_function(ident->table); } pthread_mutex_destroy(&ident->table_mutex); free(ident); } } static void free_identity_map(map_handle map) { identity_t *tofree = NULL; map_reset_iterate(map); while ((tofree = (identity_t *) map_next(map))) { free_identity(tofree); } free_map(map, 0); } static void free_instance(drl_instance_t *instance) { if (instance->leaves) free(instance->leaves); if (instance->leaf_map) free_map(instance->leaf_map, 0); if (instance->ident_map) free_identity_map(instance->ident_map); if (instance->machines) free(instance->machines); if (instance->sets) free(instance->sets); if (instance->cal) { free(instance->cal); } memset(instance, 0, sizeof(drl_instance_t)); } static void free_failed_config(parsed_configs configs, drl_instance_t *instance) { /* Free configs. */ if (configs.machines) free_ident_list(configs.machines); if (configs.sets) free_ident_list(configs.sets); /* Free instance. */ if (instance) free_instance(instance); } static identity_t *new_identity(ident_config *config) { identity_t *ident = malloc(sizeof(identity_t)); remote_node_t *comm_nodes = malloc(sizeof(remote_node_t)*config->peer_count); ident_peer *peer = config->peers; int peer_slot = 0; if (ident == NULL) { return NULL; } if (comm_nodes == NULL) { free(ident); return NULL; } memset(ident, 0, sizeof(identity_t)); memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t)); ident->id = config->id; ident->limit = (uint32_t) (((double) config->limit * 1000000.0) / 8.0); ident->fixed_ewma_weight = config->fixed_ewma_weight; ident->intervals = config->intervals; ident->ewma_weight = pow(ident->fixed_ewma_weight, (limiter.estintms/1000.0) * config->intervals); ident->parent = NULL; pthread_mutex_init(&ident->table_mutex, NULL); switch (config->accounting) { case ACT_STANDARD: ident->table = standard_table_create(standard_hasher, &ident->common); /* Ugly function pointer casting. Makes things sufficiently * generic, though. */ ident->table_sample_function = (int (*)(void *, const key_flow *)) standard_table_sample; ident->table_cleanup_function = (int (*)(void *)) standard_table_cleanup; ident->table_update_function = (void (*)(void *, struct timeval, double)) standard_table_update_flows; ident->table_destroy_function = (void (*)(void *)) standard_table_destroy; break; case ACT_SAMPLEHOLD: ident->table = sampled_table_create(sampled_hasher, ident->limit * IDENT_CLEAN_INTERVAL, 1, 20, &ident->common); ident->table_sample_function = (int (*)(void *, const key_flow *)) sampled_table_sample; ident->table_cleanup_function = (int (*)(void *)) sampled_table_cleanup; ident->table_update_function = (void (*)(void *, struct timeval, double)) sampled_table_update_flows; ident->table_destroy_function = (void (*)(void *)) sampled_table_destroy; break; case ACT_SIMPLE: ident->table = simple_table_create(&ident->common); ident->table_sample_function = (int (*)(void *, const key_flow *)) simple_table_sample; ident->table_cleanup_function = (int (*)(void *)) simple_table_cleanup; ident->table_update_function = (void (*)(void *, struct timeval, double)) simple_table_update_flows; ident->table_destroy_function = (void (*)(void *)) simple_table_destroy; break; } /* Make sure the table was allocated. */ if (ident->table == NULL) { free(ident); return NULL; } while (peer) { comm_nodes[peer_slot].addr = peer->ip; comm_nodes[peer_slot].port = htons(LIMITER_LISTEN_PORT); peer = peer->next; peer_slot += 1; } if (new_comm(&ident->comm, config, comm_nodes)) { printlog(LOG_CRITICAL, "Failed to create communication structure.\n"); return NULL; } ident->comm.remote_nodes = comm_nodes; return ident; } /* Determines the validity of the parameters of one ident_config. * * 0 valid * 1 invalid */ static int validate_config(ident_config *config) { /* Limit must be a positive integer. */ if (config->limit < 1) { return 1; } /* Commfabric must be a valid choice (COMM_MESH or COMM_GOSSIP). */ if (config->commfabric != COMM_MESH && config->commfabric != COMM_GOSSIP) { return 1; } /* If commfabric is COMM_GOSSIP, this must be a positive integer. */ if (config->commfabric == COMM_GOSSIP && config->branch < 1) { return 1; } /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD, * ACT_SIMPLE). */ if (config->accounting != ACT_STANDARD && config->accounting != ACT_SAMPLEHOLD && config->accounting != ACT_SIMPLE) { return 1; } /* Ewma weight must be greater than or equal to zero. */ if (config->fixed_ewma_weight < 0) { return 1; } /* Note: Parsing stage requires that each ident has at least one peer. */ return 0; } /* 0 success * non-zero failure */ static int validate_configs(parsed_configs configs, drl_instance_t *instance) { ident_config *mlist = configs.machines; ident_config *slist = configs.sets; ident_config *tmp = NULL; int i = 0; while (mlist) { /* ID must be non-zero and unique. */ /* This is ugly and hackish, but this function will be called rarely. * I'm tired of trying to be clever. */ if (mlist->id < 0) { printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", mlist->id, mlist->id); return EINVAL; } tmp = mlist->next; while (tmp) { if (mlist->id == tmp->id) { printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id); return EINVAL; } tmp = tmp->next; } tmp = configs.sets; while (tmp) { if (mlist->id == tmp->id) { printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id); return EINVAL; } tmp = tmp->next; } if (validate_config(mlist)) { printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", mlist->id, mlist->id); return EINVAL; } mlist = mlist->next; } instance->sets = malloc(configs.set_count * sizeof(identity_t *)); if (instance->sets == NULL) { return ENOMEM; } memset(instance->sets, 0, configs.set_count * sizeof(identity_t *)); instance->set_count = configs.set_count; /* For sets, make sure that the hierarchy is valid. */ while (slist) { ident_member *members = slist->members; /* ID must be non-zero and unique. */ if (slist->id < 0) { printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", slist->id, slist->id); return EINVAL; } tmp = slist->next; while (tmp) { if (slist->id == tmp->id) { printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", slist->id, slist->id); return EINVAL; } tmp = tmp->next; } if (validate_config(slist)) { printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", slist->id, slist->id); return EINVAL; } /* Allocate an identity_t for this set-type identity. */ instance->sets[i] = new_identity(slist); if (instance->sets[i] == NULL) { return ENOMEM; } /* Loop through children and look up each in leaf or ident map * depending on the type of child. Set the child's parent pointer * to the identity we just created above, unless it is already set, * in which case we have an error. */ while (members) { identity_t *child_ident = NULL; leaf_t *child_leaf = NULL; switch (members->type) { case MEMBER_XID: child_leaf = map_search(instance->leaf_map, &members->value, sizeof(members->value)); if (child_leaf == NULL) { return EINVAL; } if (child_leaf->parent != NULL) { /* Error - This leaf already has a parent. */ return EINVAL; } child_leaf->parent = instance->sets[i]; break; case MEMBER_GUID: child_ident = map_search(instance->ident_map, &members->value, sizeof(members->value)); if (child_ident == NULL) { return EINVAL; } if (child_ident->parent != NULL) { /* Error - This identity already has a parent. */ return EINVAL; } child_ident->parent = instance->sets[i]; break; default: /* Error - shouldn't be possible. */ break; } members = members->next; } map_insert(instance->ident_map, &instance->sets[i]->id, sizeof(instance->sets[i]->id), instance->sets[i]); slist = slist->next; i++; } return 0; } static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) { int count = 0; identity_t *current_ident; leaf_t *current_leaf; leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *)); if (leaves == NULL) { return 1; } map_reset_iterate(instance->leaf_map); while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) { current_ident = current_leaf->parent; while (current_ident != NULL && current_ident != instance->last_machine) { if (current_ident == ident) { /* Found the ident we were looking for - add the leaf. */ leaves[count] = current_leaf; count += 1; break; } current_ident = current_ident->parent; } } ident->leaves = leaves; ident->leaf_count = count; return 0; } static int init_identities(parsed_configs configs, drl_instance_t *instance) { int i, j; ident_config *config = configs.machines; leaf_t *leaf = NULL; instance->cal = malloc(sizeof(struct ident_calendar) * SCHEDLEN); if (instance->cal == NULL) { return ENOMEM; } for (i = 0; i < SCHEDLEN; ++i) { TAILQ_INIT(instance->cal + i); } instance->cal_slot = 0; instance->machines = malloc(configs.machine_count * sizeof(drl_instance_t *)); if (instance->machines == NULL) { return ENOMEM; } memset(instance->machines, 0, configs.machine_count * sizeof(drl_instance_t *)); instance->machine_count = configs.machine_count; /* Allocate and add the machine identities. */ for (i = 0; i < configs.machine_count; ++i) { instance->machines[i] = new_identity(config); if (instance->machines[i] == NULL) { return ENOMEM; } /* The first has no parent - it is the root. All others have the * previous ident as their parent. */ if (i == 0) { instance->machines[i]->parent = NULL; } else { instance->machines[i]->parent = instance->machines[i - 1]; } instance->last_machine = instance->machines[i]; /* Add the ident to the guid->ident map. */ map_insert(instance->ident_map, &instance->machines[i]->id, sizeof(instance->machines[i]->id), instance->machines[i]); config = config->next; TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), instance->machines[i], calendar); /* Setup the array of pointers to leaves. This is easy for machines * because a machine node applies to every leaf. */ instance->machines[i]->leaves = malloc(instance->leaf_count * sizeof(leaf_t *)); if (instance->machines[i]->leaves == NULL) { return ENOMEM; } instance->machines[i]->leaf_count = instance->leaf_count; for (j = 0; j < instance->leaf_count; ++j) { instance->machines[i]->leaves[j] = &instance->leaves[j]; } } /* Connect the set subtree to the machines. Any set or leaf without a * parent will take the last machine as its parent. */ /* Leaves... */ map_reset_iterate(instance->leaf_map); while ((leaf = (leaf_t *) map_next(instance->leaf_map))) { if (leaf->parent == NULL) { leaf->parent = instance->last_machine; } } /* Sets... */ for (i = 0; i < instance->set_count; ++i) { if (instance->sets[i]->parent == NULL) { instance->sets[i]->parent = instance->last_machine; } TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK), instance->sets[i], calendar); /* Setup the array of pointers to leaves. This is harder for sets, * but this doesn't need to be super-efficient because it happens * rarely and it isn't on the critical path for reconfig(). */ if (fill_set_leaf_pointer(instance, instance->sets[i])) { return ENOMEM; } } /* Success. */ return 0; } static void print_instance(drl_instance_t *instance) { leaf_t *leaf = NULL; identity_t *ident = NULL; map_reset_iterate(instance->leaf_map); while ((leaf = (leaf_t *) map_next(instance->leaf_map))) { printf("%x:", leaf->xid); ident = leaf->parent; while (ident) { printf("%d:",ident->id); ident = ident->parent; } printf("Leaf's parent pointer is %p\n", leaf->parent); } printf("instance->last_machine is %p\n", instance->last_machine); } static int assign_htb_hierarchy(drl_instance_t *instance) { int i, j; int next_node = 0x11; /* Chain machine nodes under 1:10. */ for (i = 0; i < instance->machine_count; ++i) { if (instance->machines[i]->parent == NULL) { /* Top node. */ instance->machines[i]->htb_parent = 0x10; } else { /* Pointerific! */ instance->machines[i]->htb_parent = instance->machines[i]->parent->htb_node; } instance->machines[i]->htb_node = next_node; next_node += 1; } next_node += 0x10; /* Add set nodes under machine nodes. Iterate backwards to ensure parent is * already there. */ for (j = (instance->set_count - 1); j >= 0; --j) { if (instance->sets[j]->parent == NULL) { instance->sets[j]->htb_parent = 0x10; } else { instance->sets[j]->htb_parent = instance->sets[j]->parent->htb_node; } instance->sets[j]->htb_node = next_node; next_node += 1; } return 0; } /* Added this so that I could comment one line and kill off all of the * command execution. */ static int execute_cmd(const char *cmd) { return system(cmd); } static int create_htb_hierarchy(drl_instance_t *instance) { char cmd[300]; int i, j, k; /* Nuke the hierarchy. */ sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb"); execute_cmd(cmd); printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Re-initialize the basics. */ sprintf(cmd, "tc qdisc add dev eth0 root handle 1: htb default 1fff"); if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); sprintf(cmd, "tc class add dev eth0 parent 1: classid 1:1 htb rate 1000mbit ceil 1000mbit"); if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Add back 1:10. (Nodelimit : Megabits/sec -> bits/second)*/ if (limiter.nodelimit) { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil %lubit", (unsigned long) limiter.nodelimit * 1024 * 1024); } else { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil 1000mbit"); } if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Add back 1:20. */ sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit"); if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Add machines. */ for (i = 0; i < instance->machine_count; ++i) { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit", instance->machines[i]->htb_parent, instance->machines[i]->htb_node, (unsigned long) instance->machines[i]->limit * 1024 * 1024); if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } /* Add sets. */ for (j = (instance->set_count - 1); j >= 0; --j) { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit", instance->sets[j]->htb_parent, instance->sets[j]->htb_node, (unsigned long) instance->sets[j]->limit * 1024 * 1024); if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } /* Add leaves. FIXME: Set static sliver limit as ceil here! */ for (k = 0; k < instance->leaf_count; ++k) { if (instance->leaves[k].parent == NULL) { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1%x htb rate 8bit ceil %lubit", instance->leaves[k].xid, (unsigned long) 100 * 1024 * 1024); } else { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1%x htb rate 8bit ceil %lubit", instance->leaves[k].parent->htb_node, instance->leaves[k].xid, (unsigned long) 100 * 1024 * 1024); } if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2%x htb rate 8bit ceil 1000mbit", instance->leaves[k].xid); if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); } /* Add 1:1000 and 1:2000 */ if (instance->last_machine == NULL) { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1000 htb rate 8bit ceil 1000mbit"); } else { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1000 htb rate 8bit ceil 1000mbit", instance->last_machine->htb_node); } if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2000 htb rate 8bit ceil 1000mbit"); if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); /* Add 1:1fff and 1:2fff */ if (instance->last_machine == NULL) { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1fff htb rate 8bit ceil 1000mbit"); } else { sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1fff htb rate 8bit ceil 1000mbit", instance->last_machine->htb_node); } if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2fff htb rate 8bit ceil 1000mbit"); if (execute_cmd(cmd)) { return 1; } printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd); #ifdef DELAY40MS /* Only for artificial delay testing. */ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo"); execute_cmd(cmd); sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms"); execute_cmd(cmd); sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11f9 handle 11f9 pfifo"); execute_cmd(cmd); sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11f9 handle 11f9 netem loss 0 delay 40ms"); execute_cmd(cmd); sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11fa handle 11fa pfifo"); execute_cmd(cmd); sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms"); execute_cmd(cmd); /* End delay testing */ #endif return 0; } static int setup_tc_grd(drl_instance_t *instance) { int i; char cmd[300]; for (i = 0; i < instance->leaf_count; ++i) { /* Delete the old pfifo qdisc that might have been there before. */ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo", instance->leaves[i].xid, instance->leaves[i].xid); if (execute_cmd(cmd)) { //FIXME: remove this print and do a log. printf("GRD: pfifo qdisc wasn't there!\n"); } /* Add the netem qdisc. */ #ifdef DELAY40MS sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms", instance->leaves[i].xid, instance->leaves[i].xid); #else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms", instance->leaves[i].xid, instance->leaves[i].xid); #endif if (execute_cmd(cmd)) { return 1; } } /* Do the same for 1000 and 1fff. */ sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo"); if (execute_cmd(cmd)) { //FIXME: remove this print and do a log. printf("GRD: pfifo qdisc wasn't there!\n"); } /* Add the netem qdisc. */ #ifdef DELAY40MS sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms"); #else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms"); #endif if (execute_cmd(cmd)) { return 1; } sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo"); if (execute_cmd(cmd)) { //FIXME: remove this print and do a log. printf("GRD: pfifo qdisc wasn't there!\n"); } /* Add the netem qdisc. */ #ifdef DELAY40MS sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms"); #else sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms"); #endif if (execute_cmd(cmd)) { return 1; } return 0; } /* init_drl * * Initialize this limiter with options * Open UDP socket for peer communication */ static int init_drl(void) { parsed_configs configs; struct sockaddr_in server_address; memset(&limiter, 0, sizeof(limiter_t)); /* Setup logging. */ system_loglevel = (uint8_t) drl_loglevel.u.value; logfile = fopen(drl_logfile.u.string, "w"); if (logfile == NULL) { printf("Couldn't open logfile - "); perror("fopen()"); exit(EXIT_FAILURE); } printlog(LOG_CRITICAL, "ulogd_DRL initializing . . .\n"); limiter.nodelimit = (uint32_t) (((double) nodelimit.u.value * 1000000.0) / 8.0); init_hashing(); /* for all hash maps */ pthread_rwlock_init(&limiter.limiter_lock,NULL); /* determine our local IP by iterating through interfaces */ if ((limiter.ip = get_local_ip())==0) { printlog(LOG_CRITICAL, "ulogd_DRL unable to aquire local IP address, not registering.\n"); return (false); } limiter.localaddr = inet_addr(limiter.ip); limiter.port = htons(LIMITER_LISTEN_PORT); limiter.udp_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP); if (limiter.udp_socket < 0) { printlog(LOG_CRITICAL, "Failed to create UDP socket().\n"); return false; } memset(&server_address, 0, sizeof(server_address)); server_address.sin_family = AF_INET; server_address.sin_addr.s_addr = limiter.localaddr; server_address.sin_port = limiter.port; if (bind(limiter.udp_socket, (struct sockaddr *) &server_address, sizeof(server_address)) < 0) { printlog(LOG_CRITICAL, "Failed to bind UDP socket.\n"); return false; } printlog(LOG_WARN, " POLICY: %s\n",policy.u.string); if (strcasecmp(policy.u.string,"GRD") == 0) { limiter.policy = POLICY_GRD; } else if (strcasecmp(policy.u.string,"FPS") == 0) { limiter.policy = POLICY_FPS; } else { printlog(LOG_CRITICAL, "Unknown DRL policy %s, aborting.\n",policy.u.string); return (false); } limiter.estintms = estintms.u.value; if (limiter.estintms > 1000) { printlog(LOG_CRITICAL, "DRL: sorry estimate intervals must be less than 1 second."); printlog(LOG_CRITICAL, " Simple source mods will allow larger intervals. Using 1 second.\n"); limiter.estintms = 1000; } printlog(LOG_WARN, " Est interval: %dms\n",limiter.estintms); /* Acquire the big limiter lock for writing. Prevents pretty much * anything else from happening while the hierarchy is being changed. */ pthread_rwlock_wrlock(&limiter.limiter_lock); limiter.stable_instance.ident_map = allocate_map(); if (limiter.stable_instance.ident_map == NULL) { printlog(LOG_CRITICAL, "Failed to allocate memory for identity map.\n"); return false; } if (get_eligible_leaves(&limiter.stable_instance)) { printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n"); return false; } if (parse_drl_config(drl_configfile.u.string, &configs)) { /* Parse error occured. Return non-zero to notify init_drl(). */ return false; } /* Validate identity hierarchy! */ if (validate_configs(configs, &limiter.stable_instance)) { /* Clean up everything. */ free_failed_config(configs, &limiter.stable_instance); return false; } if (init_identities(configs, &limiter.stable_instance)) { free_failed_config(configs, &limiter.stable_instance); return false; } /* At this point, we should be done with configs. */ free_ident_list(configs.machines); free_ident_list(configs.sets); /* Debugging - FIXME: remove this? */ print_instance(&limiter.stable_instance); switch (limiter.policy) { case POLICY_FPS: if (assign_htb_hierarchy(&limiter.stable_instance)) { free_instance(&limiter.stable_instance); return false; } if (create_htb_hierarchy(&limiter.stable_instance)) { free_instance(&limiter.stable_instance); return false; } break; case POLICY_GRD: if (setup_tc_grd(&limiter.stable_instance)) { free_instance(&limiter.stable_instance); return false; } break; default: return false; } pthread_rwlock_unlock(&limiter.limiter_lock); if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) { printlog(LOG_CRITICAL, "Unable to start UDP receive thread.\n"); return false; } printlog(LOG_WARN, "ulogd_DRL init finished.\n"); return true; } static void reconfig() { parsed_configs configs; printlog(LOG_DEBUG, "--Starting reconfig()--\n"); flushlog(); memset(&configs, 0, sizeof(parsed_configs)); memset(&limiter.new_instance, 0, sizeof(drl_instance_t)); limiter.new_instance.ident_map = allocate_map(); if (limiter.new_instance.ident_map == NULL) { printlog(LOG_CRITICAL, "Failed to allocate ident_map during reconfig().\n"); return; } if (get_eligible_leaves(&limiter.new_instance)) { free_failed_config(configs, &limiter.new_instance); printlog(LOG_CRITICAL, "Failed to read leaves during reconfig().\n"); return; } if (parse_drl_config(drl_configfile.u.string, &configs)) { free_failed_config(configs, &limiter.new_instance); printlog(LOG_CRITICAL, "Failed to parse config during reconfig().\n"); return; } if (validate_configs(configs, &limiter.new_instance)) { free_failed_config(configs, &limiter.new_instance); printlog(LOG_CRITICAL, "Validation failed during reconfig().\n"); pthread_rwlock_unlock(&limiter.limiter_lock); return; } if (init_identities(configs, &limiter.new_instance)) { free_failed_config(configs, &limiter.new_instance); printlog(LOG_CRITICAL, "Initialization failed during reconfig().\n"); pthread_rwlock_unlock(&limiter.limiter_lock); return; } free_ident_list(configs.machines); free_ident_list(configs.sets); /* Debugging - FIXME: remove this? */ print_instance(&limiter.new_instance); /* Lock */ pthread_rwlock_wrlock(&limiter.limiter_lock); switch (limiter.policy) { case POLICY_FPS: if (assign_htb_hierarchy(&limiter.new_instance)) { free_instance(&limiter.new_instance); printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n"); pthread_rwlock_unlock(&limiter.limiter_lock); return; } if (create_htb_hierarchy(&limiter.new_instance)) { free_instance(&limiter.new_instance); printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n"); /* Re-create old instance. */ if (create_htb_hierarchy(&limiter.stable_instance)) { /* Error reinstating the old one - big problem. */ printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n"); printlog(LOG_CRITICAL, "Giving up...\n"); flushlog(); exit(EXIT_FAILURE); } pthread_rwlock_unlock(&limiter.limiter_lock); return; } break; case POLICY_GRD: if (setup_tc_grd(&limiter.new_instance)) { free_instance(&limiter.new_instance); printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n"); /* Try to re-create old instance. */ if (setup_tc_grd(&limiter.stable_instance)) { printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n"); printlog(LOG_CRITICAL, "Giving up...\n"); flushlog(); exit(EXIT_FAILURE); } } break; default: /* Should be impossible. */ printf("Pigs are flying?\n"); exit(EXIT_FAILURE); } /* Switch over new to stable instance. */ free_instance(&limiter.stable_instance); memcpy(&limiter.stable_instance, &limiter.new_instance, sizeof(drl_instance_t)); /* Success! - Unlock */ pthread_rwlock_unlock(&limiter.limiter_lock); } static ulog_output_t drl_op = { .name = "drl", .output = &_output_drl, .signal = NULL, /* This appears to be broken. Using my own handler. */ .init = NULL, .fini = NULL, }; /* Tests the amount of time it takes to call reconfig(). */ static void time_reconfig(int iterations) { struct timeval start, end; int i; gettimeofday(&start, NULL); for (i = 0; i < iterations; ++i) { reconfig(); } gettimeofday(&end, NULL); printf("%d reconfigs() took %d seconds and %d microseconds.\n", iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec); exit(0); // Seems to take about 85ms / iteration } static void *signal_thread_func(void *args) { int sig; int err; sigset_t sigs; sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); pthread_sigmask(SIG_BLOCK, &sigs, NULL); while (1) { sigemptyset(&sigs); sigaddset(&sigs, SIGHUP); err = sigwait(&sigs, &sig); if (err) { printlog(LOG_CRITICAL, "sigwait() returned an error.\n"); flushlog(); } switch (sig) { case SIGHUP: printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n"); reconfig(); //time_reconfig(1000); //instrumentation flushlog(); break; default: /* Should be impossible... */ break; } } } /* register output plugin with ulogd */ static void _drl_reg_op(void) { ulog_output_t *op = &drl_op; sigset_t signal_mask; sigemptyset(&signal_mask); sigaddset(&signal_mask, SIGHUP); pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) { printlog(LOG_CRITICAL, "Failed to create signal handling thread.\n"); fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string); flushlog(); exit(EXIT_FAILURE); } if (!init_drl()) { printlog(LOG_CRITICAL, "Init failed. :(\n"); fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string); flushlog(); exit(EXIT_FAILURE); } register_output(op); /* start up the thread that will periodically estimate the * local rate and set the local limits * see estimate.c */ if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) { ulogd_log(ULOGD_ERROR, "couldn't start estimate thread for 0x%x %s\n",limiter.localaddr, limiter.ip); fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string); exit(EXIT_FAILURE); } } void _init(void) { /* have the opts parsed */ config_parse_file("DRL", config_entries); if (get_ids()) { ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n"); exit(2); } /* Seed the hash function */ salt = getpid() ^ time(NULL); _drl_reg_op(); }