1 /* See the DRL-LICENSE file for this file's software license. */
4 * ulogd output target for DRL: GRD and FPS
6 * Ken Yocum <kyocum@cs.ucsd.edu>
8 * Original shell of this code from ulogd_NETFLOW
9 * Like that code, we keep track of per-slice data rates
10 * out of this slice. Thus we are rate limiting particular slices
11 * across multiple boxes, ensuring that their outbound rate does not
12 * exceed some fixed limit.
16 * Enforcer: linux drop percentage.
18 * This file reads packets from the netlink socket. It updates all
19 * the hashmaps which track how much data has arrived per flow.
20 * It starts two threads for this limiter.
21 * One thread handles periodic estimation.
22 * The other thread handles communication with other limiters.
27 * ulogd_DRL: attach to netlink socket, accept packets. replaces ratelimit.cc
28 * util.c: generic hashing functions, flow comparisons, sundry items.
29 * gossip.c: Recv gossip, send gossip.
30 * peer_comm.c: Thread to listen for updates from other limiters.
31 * estimate.c: Thread to calculate the local limits.
34 * Ken Yocum <kyocum@cs.ucsd.edu>
37 * Some code appropriated from ulogd_NETFLOW:
39 * Mark Huang <mlhuang@cs.princeton.edu>
40 * Copyright (C) 2004-2005 The Trustees of Princeton University
42 * Based on admindump.pl by Mic Bowman and Paul Brett
43 * Copyright (c) 2002 Intel Corporation
47 /* Enable GNU glibc extensions */
53 /* va_start() and friends */
59 /* strstr() and friends */
62 /* dirname() and basename() */
65 /* fork() and wait() */
66 #include <sys/types.h>
70 /* errno and assert() */
77 /* time() and friends */
82 #include <sys/socket.h>
83 #include <netinet/in.h>
84 #include <arpa/inet.h>
86 /* ICMP definitions */
87 #include <netinet/ip.h>
88 #include <netinet/ip_icmp.h>
93 /* pthread_create() */
99 /* Signal definitions - so that we can catch SIGHUP and update config. */
102 #include <ulogd/ulogd.h>
103 #include <ulogd/conffile.h>
105 /* Perhaps useful for files within vservers? */
106 #if !defined(STANDALONE) && HAVE_LIBPROPER
107 #include <proper/prop.h>
111 * Jenkins hash support
112 * lives in raterouter.h
116 #include "raterouter.h"
118 #include "ratetypes.h" /* needs util and pthread.h */
119 #include "calendar.h"
123 * /etc/ulogd.conf configuration options
124 * Add the config options for DRL.
127 static config_entry_t leaves = {
130 .type = CONFIG_TYPE_STRING,
131 .options = CONFIG_OPT_NONE,
132 .u = { .string = "PLANETLAB" },
135 static config_entry_t bind_addr = {
138 .type = CONFIG_TYPE_STRING,
139 .options = CONFIG_OPT_NONE,
140 .u = { .string = "AUTO" },
143 static config_entry_t create_htb = {
146 .type = CONFIG_TYPE_INT,
147 .options = CONFIG_OPT_NONE,
151 static config_entry_t enforce_on = {
154 .type = CONFIG_TYPE_INT,
155 .options = CONFIG_OPT_NONE,
159 static config_entry_t partition = {
161 .key = "partition_set",
162 .type = CONFIG_TYPE_INT,
163 .options = CONFIG_OPT_NONE,
164 .u = { .value = 0xfffffff },
167 static config_entry_t sfq_slice = {
170 .type = CONFIG_TYPE_STRING,
171 .options = CONFIG_OPT_NONE,
172 .u = { .string = "NONE" },
175 static config_entry_t netem_slice = {
177 .key = "netem_slice",
178 .type = CONFIG_TYPE_STRING,
179 .options = CONFIG_OPT_NONE,
180 .u = { .string = "ALL" },
183 static config_entry_t netem_loss = {
184 .next = &netem_slice,
186 .type = CONFIG_TYPE_INT,
187 .options = CONFIG_OPT_NONE,
191 static config_entry_t netem_delay = {
193 .key = "netem_delay",
194 .type = CONFIG_TYPE_INT,
195 .options = CONFIG_OPT_NONE,
199 static config_entry_t drl_configfile = {
200 .next = &netem_delay,
201 .key = "drl_configfile",
202 .type = CONFIG_TYPE_STRING,
203 .options = CONFIG_OPT_MANDATORY,
204 .u = { .string = "drl.xml" },
207 /** The administrative bandwidth limit (mbps) for the local node. The node
208 * will not set a limit higher than this, even when distributed capacity is
209 * available. Set to 0 for no limit. */
210 static config_entry_t nodelimit = {
211 .next = &drl_configfile,
213 .type = CONFIG_TYPE_INT,
214 .options = CONFIG_OPT_MANDATORY,
218 /** Determines the verbosity of logging. */
219 static config_entry_t drl_loglevel = {
221 .key = "drl_loglevel",
222 .type = CONFIG_TYPE_INT,
223 .options = CONFIG_OPT_MANDATORY,
224 .u = { .value = LOG_WARN },
227 /** The path of the logfile. */
228 static config_entry_t drl_logfile = {
229 .next = &drl_loglevel,
230 .key = "drl_logfile",
231 .type = CONFIG_TYPE_STRING,
232 .options = CONFIG_OPT_MANDATORY,
233 .u = { .string = "drl_logfile.log" },
236 /** The choice of DRL protocol. */
237 static config_entry_t policy = {
238 .next = &drl_logfile,
240 .type = CONFIG_TYPE_STRING,
241 .options = CONFIG_OPT_MANDATORY,
242 .u = { .string = "FPS" },
245 /** The estimate interval, in milliseconds. */
246 static config_entry_t estintms = {
249 .type = CONFIG_TYPE_INT,
250 .options = CONFIG_OPT_MANDATORY,
251 .u = { .value = 500 },
254 #define config_entries (&estintms)
257 * Debug functionality
264 #define NIPQUAD(addr) \
265 ((unsigned char *)&addr)[0], \
266 ((unsigned char *)&addr)[1], \
267 ((unsigned char *)&addr)[2], \
268 ((unsigned char *)&addr)[3]
270 #define IPQUAD(addr) \
271 ((unsigned char *)&addr)[3], \
272 ((unsigned char *)&addr)[2], \
273 ((unsigned char *)&addr)[1], \
274 ((unsigned char *)&addr)[0]
278 /* Salt for the hash functions */
282 * Hash slice name lookups on context ID.
285 /* Special context IDs */
286 #define UNKNOWN_XID -1
290 CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
297 pthread_t estimate_thread;
298 pthread_t signal_thread;
299 pthread_t comm_thread;
300 uint32_t local_ip = 0;
302 extern FILE *logfile;
303 extern uint8_t system_loglevel;
304 extern uint8_t do_enforcement;
306 /* Used to simulate partitions. */
307 int do_partition = 0;
308 int partition_set = 0xfffffff;
312 static inline uint32_t
313 hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max)
315 unsigned char mybytes[FLOWKEYSIZE];
316 mybytes[0] = protocol;
317 *(uint32_t*)(&(mybytes[1])) = src_ip;
318 *(uint32_t*)(&(mybytes[5])) = dst_ip;
319 *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port;
320 return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1);
323 uint32_t sampled_hasher(const key_flow *key) {
324 /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. */
325 return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX);
328 uint32_t standard_hasher(const key_flow *key) {
329 return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE);
332 uint32_t multiple_hasher(const key_flow *key) {
333 return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE);
341 /* Interesting keys */
365 #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0]))
366 static struct intr_id intr_ids[] = {
367 [OOB_TIME_SEC] = { "oob.time.sec", 0 },
368 [OOB_MARK] = { "oob.mark", 0 },
369 [IP_SADDR] = { "ip.saddr", 0 },
370 [IP_DADDR] = { "ip.daddr", 0 },
371 [IP_TOTLEN] = { "ip.totlen", 0 },
372 [IP_PROTOCOL] = { "ip.protocol", 0 },
373 [TCP_SPORT] = { "tcp.sport", 0 },
374 [TCP_DPORT] { "tcp.dport", 0 },
375 [TCP_ACK] = { "tcp.ack", 0 },
376 [TCP_FIN] = { "tcp.fin", 0 },
377 [TCP_SYN] = { "tcp.syn", 0 },
378 [TCP_RST] = { "tcp.rst", 0 },
379 [UDP_SPORT] = { "udp.sport", 0 },
380 [UDP_DPORT] = { "udp.dport", 0 },
381 [ICMP_TYPE] = { "icmp.type", 0 },
382 [ICMP_CODE] = { "icmp.code", 0 },
383 [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
384 [GRE_VERSION] = { "gre.version", 0 },
385 [GRE_KEY] = { "gre.key", 0 },
386 [PPTP_CALLID] = { "pptp.callid", 0 },
389 #define GET_VALUE(x) intr_ids[x].res->value
391 #define DATE(t) ((t) / (24*60*60) * (24*60*60))
393 static int _output_drl(ulog_iret_t *res)
396 uint32_t src_ip, dst_ip;
397 uint16_t src_port, dst_port;
404 protocol = GET_VALUE(IP_PROTOCOL).ui8;
405 src_ip = GET_VALUE(IP_SADDR).ui32;
406 dst_ip = GET_VALUE(IP_DADDR).ui32;
407 xid = GET_VALUE(OOB_MARK).ui32;
412 src_port = GET_VALUE(TCP_SPORT).ui16;
413 dst_port = GET_VALUE(TCP_DPORT).ui16;
417 /* netflow had an issue with many udp flows and set
418 * src_port=0 to handle it. We don't.
420 src_port = GET_VALUE(UDP_SPORT).ui16;
423 * traceroutes create a large number of flows in the db
424 * this is a quick hack to catch the most common form
425 * of traceroute (basically we're mapping any UDP packet
426 * in the 33435-33524 range to the "trace" port, 33524 is
427 * 3 packets * nhops (30).
429 dst_port = GET_VALUE(UDP_DPORT).ui16;
430 if (dst_port >= 33435 && dst_port <= 33524)
435 src_port = GET_VALUE(ICMP_TYPE).ui8;
436 dst_port = GET_VALUE(ICMP_CODE).ui8;
439 * We special case some of the ICMP traffic that the kernel
440 * always generates. Since this is attributed to root, it
441 * creates significant "noise" in the output. We want to be
442 * able to quickly see that root is generating traffic.
444 if (xid == ROOT_XID) {
445 if (src_port == ICMP_ECHOREPLY)
446 xid = ICMP_ECHOREPLY_XID;
447 else if (src_port == ICMP_UNREACH)
448 xid = ICMP_UNREACH_XID;
453 if (GET_VALUE(GRE_FLAG_KEY).b) {
454 if (GET_VALUE(GRE_VERSION).ui8 == 1) {
455 /* Get PPTP call ID */
456 src_port = GET_VALUE(PPTP_CALLID).ui16;
458 /* XXX Truncate GRE keys to 16 bits */
459 src_port = (uint16_t) GET_VALUE(GRE_KEY).ui32;
462 /* No key available */
469 /* This is the default key for packets from unsupported protocols */
475 key.protocol = protocol;
476 key.source_ip = src_ip;
477 key.dest_ip = dst_ip;
478 key.source_port = src_port;
479 key.dest_port = dst_port;
480 key.packet_size = GET_VALUE(IP_TOTLEN).ui16;
481 key.packet_time = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
483 pthread_rwlock_rdlock(&limiter.limiter_lock); /* CLUNK! */
485 leaf = (leaf_t *) map_search(limiter.stable_instance.leaf_map, &xid, sizeof(xid));
487 /* Even if the packet doesn't match any specific xid, it should still
488 * count in the machine-type tables. This catches root (xid == 0) and
489 * unclassified (xid = fff) packets, which don't have map entries. */
491 ident = limiter.stable_instance.last_machine;
493 ident = leaf->parent;
497 pthread_mutex_lock(&ident->table_mutex);
499 /* Update the identity's table. */
500 ident->table_sample_function(ident->table, &key);
502 #ifdef SHADOW_ACCTING
504 /* Update the shadow perfect copy of the accounting table. */
505 standard_table_sample((standard_flow_table) ident->shadow_table, &key);
509 pthread_mutex_unlock(&ident->table_mutex);
511 ident = ident->parent;
514 pthread_rwlock_unlock(&limiter.limiter_lock); /* CLINK! */
519 /* get all key id's for the keys we are intrested in */
520 static int get_ids(void)
523 struct intr_id *cur_id;
525 for (i = 0; i < INTR_IDS; i++) {
526 cur_id = &intr_ids[i];
527 cur_id->res = keyh_getres(keyh_getid(cur_id->name));
529 ulogd_log(ULOGD_ERROR,
530 "Cannot resolve keyhash id for %s\n",
538 static void free_identity(identity_t *ident) {
540 free_comm(&ident->comm);
543 ident->table_destroy_function(ident->table);
546 if (ident->loop_action) {
547 ident->loop_action->valid = 0;
550 if (ident->comm_action) {
551 ident->comm_action->valid = 0;
554 pthread_mutex_destroy(&ident->table_mutex);
560 static void free_identity_map(map_handle map) {
561 identity_t *tofree = NULL;
563 map_reset_iterate(map);
564 while ((tofree = (identity_t *) map_next(map))) {
565 free_identity(tofree);
571 static void free_instance(drl_instance_t *instance) {
572 if (instance->leaves)
573 free(instance->leaves);
574 if (instance->leaf_map)
575 free_map(instance->leaf_map, 0);
576 if (instance->ident_map)
577 free_identity_map(instance->ident_map);
578 if (instance->machines)
579 free(instance->machines);
581 free(instance->sets);
583 /* FIXME: Drain the calendar first and free all the entries. */
588 memset(instance, 0, sizeof(drl_instance_t));
591 static void free_failed_config(parsed_configs configs, drl_instance_t *instance) {
593 if (configs.machines)
594 free_ident_list(configs.machines);
596 free_ident_list(configs.sets);
600 free_instance(instance);
603 static identity_t *new_identity(ident_config *config) {
604 identity_t *ident = malloc(sizeof(identity_t));
605 remote_node_t *comm_nodes = malloc(sizeof(remote_node_t)*config->peer_count);
606 ident_peer *peer = config->peers;
613 if (comm_nodes == NULL) {
618 memset(ident, 0, sizeof(identity_t));
619 memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t));
621 ident->id = config->id;
622 ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0);
623 ident->fixed_ewma_weight = config->fixed_ewma_weight;
624 ident->communication_intervals = config->communication_intervals;
625 ident->mainloop_intervals = config->mainloop_intervals;
626 ident->ewma_weight = pow(ident->fixed_ewma_weight,
627 (limiter.estintms/1000.0) * config->mainloop_intervals);
628 ident->parent = NULL;
629 ident->independent = config->independent;
631 pthread_mutex_init(&ident->table_mutex, NULL);
632 switch (config->accounting) {
635 standard_table_create(standard_hasher, &ident->common);
637 /* Ugly function pointer casting. Makes things sufficiently
638 * generic, though. */
639 ident->table_sample_function =
640 (int (*)(void *, const key_flow *)) standard_table_sample;
641 ident->table_cleanup_function =
642 (int (*)(void *)) standard_table_cleanup;
643 ident->table_update_function =
644 (void (*)(void *, struct timeval, double)) standard_table_update_flows;
645 ident->table_destroy_function =
646 (void (*)(void *)) standard_table_destroy;
651 multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common);
653 ident->table_sample_function =
654 (int (*)(void *, const key_flow *)) multiple_table_sample;
655 ident->table_cleanup_function =
656 (int (*)(void *)) multiple_table_cleanup;
657 ident->table_update_function =
658 (void (*)(void *, struct timeval, double)) multiple_table_update_flows;
659 ident->table_destroy_function =
660 (void (*)(void *)) multiple_table_destroy;
664 ident->table = sampled_table_create(sampled_hasher,
665 ident->limit * IDENT_CLEAN_INTERVAL,
666 SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common);
668 ident->table_sample_function =
669 (int (*)(void *, const key_flow *)) sampled_table_sample;
670 ident->table_cleanup_function =
671 (int (*)(void *)) sampled_table_cleanup;
672 ident->table_update_function =
673 (void (*)(void *, struct timeval, double)) sampled_table_update_flows;
674 ident->table_destroy_function =
675 (void (*)(void *)) sampled_table_destroy;
679 ident->table = simple_table_create(&ident->common);
681 ident->table_sample_function =
682 (int (*)(void *, const key_flow *)) simple_table_sample;
683 ident->table_cleanup_function =
684 (int (*)(void *)) simple_table_cleanup;
685 ident->table_update_function =
686 (void (*)(void *, struct timeval, double)) simple_table_update_flows;
687 ident->table_destroy_function =
688 (void (*)(void *)) simple_table_destroy;
692 #ifdef SHADOW_ACCTING
694 ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common);
696 if (ident->shadow_table == NULL) {
697 ident->table_destroy_function(ident->table);
704 /* Make sure the table was allocated. */
705 if (ident->table == NULL) {
711 comm_nodes[peer_slot].addr = peer->ip;
712 comm_nodes[peer_slot].port = htons(LIMITER_LISTEN_PORT);
717 if (new_comm(&ident->comm, config, comm_nodes)) {
718 printlog(LOG_CRITICAL, "Failed to create communication structure.\n");
722 ident->comm.remote_nodes = comm_nodes;
724 if (!create_htb.u.value) {
725 ident->htb_node = config->htb_node;
726 ident->htb_parent = config->htb_parent;
732 static int validate_htb_exists(int node, int parent) {
733 FILE *pipe = popen("/sbin/tc class show dev eth0", "r");
737 while (fgets(line, 200, pipe) != NULL) {
741 sscanf(line, "class htb 1:%x parent 1:%x prio %s", &n, &p, ignore);
742 if (n == node && p == parent) {
748 while (fgets(line, 200, pipe) != NULL) {
752 sscanf(line, "class htb 1:%x root prio %d %s", &n, &p, ignore);
753 if (n == node && strstr(line, "root") != NULL) {
765 /* Determines the validity of the parameters of one ident_config.
770 static int validate_config(ident_config *config) {
771 /* Limit must be a positive integer. */
772 if (config->limit < 1) {
776 /* Commfabric must be a valid choice (COMM_MESH or COMM_GOSSIP). */
777 if (config->commfabric != COMM_MESH &&
778 config->commfabric != COMM_GOSSIP) {
782 /* If commfabric is COMM_GOSSIP, this must be a positive integer. */
783 if (config->commfabric == COMM_GOSSIP && config->branch < 1) {
787 /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD,
788 * ACT_SIMPLE, ACT_MULTIPLE). */
789 if (config->accounting != ACT_STANDARD &&
790 config->accounting != ACT_SAMPLEHOLD &&
791 config->accounting != ACT_SIMPLE &&
792 config->accounting != ACT_MULTIPLE) {
796 /* Ewma weight must be greater than or equal to zero. */
797 if (config->fixed_ewma_weight < 0) {
801 if (!create_htb.u.value) {
802 if (config->htb_node < 0 || config->htb_parent < 0) {
803 printlog(LOG_CRITICAL, "When create_htb is disabled in ulogd.conf, an identity must specify the htb_node and htb_parent propertities in its configuration.\n");
807 if (validate_htb_exists(config->htb_node, config->htb_parent)) {
808 printlog(LOG_CRITICAL, "Identity specified htb node %x with parent %x. No such node/parent combo seems to exist!\n", config->htb_node, config->htb_parent);
812 if (config->htb_node > -1 || config->htb_parent > -1) {
813 printlog(LOG_WARN, "htb_node or htb_parent are configured but ignored because we're configured to create our own htb hierarchy.\n");
817 /* Note: Parsing stage requires that each ident has at least one peer. */
824 static int validate_configs(parsed_configs configs, drl_instance_t *instance) {
826 ident_config *mlist = configs.machines;
827 ident_config *slist = configs.sets;
828 ident_config *tmp = NULL;
832 /* ID must be non-zero and unique. */
833 /* This is ugly and hackish, but this function will be called rarely.
834 * I'm tired of trying to be clever. */
836 printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", mlist->id, mlist->id);
841 if (mlist->id == tmp->id) {
842 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
849 if (mlist->id == tmp->id) {
850 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
856 if (validate_config(mlist)) {
857 printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", mlist->id, mlist->id);
861 if (mlist->independent) {
862 printlog(LOG_CRITICAL, "Makes no sense to have independent machine node - setting independent to false.\n");
863 mlist->independent = 0;
869 instance->sets = malloc(configs.set_count * sizeof(identity_t *));
870 if (instance->sets == NULL) {
871 printlog(LOG_CRITICAL, "Not enough memory to allocate set identity collection.\n");
875 memset(instance->sets, 0, configs.set_count * sizeof(identity_t *));
876 instance->set_count = configs.set_count;
878 /* For sets, make sure that the hierarchy is valid. */
880 ident_member *members = slist->members;
882 /* ID must be non-zero and unique. */
884 printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", slist->id, slist->id);
889 if (slist->id == tmp->id) {
890 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", slist->id, slist->id);
896 if (validate_config(slist)) {
897 printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", slist->id, slist->id);
901 /* Allocate an identity_t for this set-type identity. */
902 instance->sets[i] = new_identity(slist);
904 if (instance->sets[i] == NULL) {
905 printlog(LOG_CRITICAL, "Not enough memory to allocate set identity.\n");
909 /* Loop through children and look up each in leaf or ident map
910 * depending on the type of child. Set the child's parent pointer
911 * to the identity we just created above, unless it is already set,
912 * in which case we have an error. */
914 identity_t *child_ident = NULL;
915 leaf_t *child_leaf = NULL;
917 switch (members->type) {
919 child_leaf = map_search(instance->leaf_map, &members->value,
920 sizeof(members->value));
921 if (child_leaf == NULL) {
922 printlog(LOG_CRITICAL, "xid: child leaf not found.\n");
925 if (child_leaf->parent != NULL) {
926 /* Error - This leaf already has a parent. */
927 printlog(LOG_CRITICAL, "xid: child already has a parent.\n");
930 child_leaf->parent = instance->sets[i];
933 child_ident = map_search(instance->ident_map, &members->value,
934 sizeof(members->value));
935 if (child_ident == NULL) {
936 printlog(LOG_CRITICAL, "guid: child identity not found.\n");
939 if (child_ident->parent != NULL) {
940 /* Error - This identity already has a parent. */
941 printlog(LOG_CRITICAL, "guid: child identity already has a parent.\n");
944 child_ident->parent = instance->sets[i];
947 /* Error - shouldn't be possible. */
950 members = members->next;
953 map_insert(instance->ident_map, &instance->sets[i]->id,
954 sizeof(instance->sets[i]->id), instance->sets[i]);
962 static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) {
964 identity_t *current_ident;
965 leaf_t *current_leaf;
966 leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *));
967 if (leaves == NULL) {
971 map_reset_iterate(instance->leaf_map);
972 while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) {
973 current_ident = current_leaf->parent;
974 while (current_ident != NULL && current_ident != instance->last_machine) {
975 if (current_ident == ident) {
976 /* Found the ident we were looking for - add the leaf. */
977 leaves[count] = current_leaf;
981 current_ident = current_ident->parent;
985 ident->leaves = leaves;
986 ident->leaf_count = count;
991 static int init_identities(parsed_configs configs, drl_instance_t *instance) {
993 ident_config *config = configs.machines;
996 instance->cal = malloc(sizeof(struct ident_calendar) * SCHEDLEN);
998 if (instance->cal == NULL) {
1002 for (i = 0; i < SCHEDLEN; ++i) {
1003 TAILQ_INIT(instance->cal + i);
1005 instance->cal_slot = 0;
1007 instance->machines = malloc(configs.machine_count * sizeof(drl_instance_t *));
1009 if (instance->machines == NULL) {
1013 memset(instance->machines, 0, configs.machine_count * sizeof(drl_instance_t *));
1014 instance->machine_count = configs.machine_count;
1016 /* Allocate and add the machine identities. */
1017 for (i = 0; i < configs.machine_count; ++i) {
1018 identity_action *loop_action;
1019 identity_action *comm_action;
1020 instance->machines[i] = new_identity(config);
1022 if (instance->machines[i] == NULL) {
1026 loop_action = malloc(sizeof(identity_action));
1027 comm_action = malloc(sizeof(identity_action));
1029 if (loop_action == NULL || comm_action == NULL) {
1033 /* The first has no parent - it is the root. All others have the
1034 * previous ident as their parent. */
1036 instance->machines[i]->parent = NULL;
1038 instance->machines[i]->parent = instance->machines[i - 1];
1041 instance->last_machine = instance->machines[i];
1043 /* Add the ident to the guid->ident map. */
1044 map_insert(instance->ident_map, &instance->machines[i]->id,
1045 sizeof(instance->machines[i]->id), instance->machines[i]);
1047 config = config->next;
1049 memset(loop_action, 0, sizeof(identity_action));
1050 memset(comm_action, 0, sizeof(identity_action));
1051 loop_action->ident = instance->machines[i];
1052 loop_action->action = ACTION_MAINLOOP;
1053 loop_action->valid = 1;
1054 comm_action->ident = instance->machines[i];
1055 comm_action->action = ACTION_COMMUNICATE;
1056 comm_action->valid = 1;
1058 instance->machines[i]->loop_action = loop_action;
1059 instance->machines[i]->comm_action = comm_action;
1061 TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1062 loop_action, calendar);
1064 TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1065 comm_action, calendar);
1067 /* Setup the array of pointers to leaves. This is easy for machines
1068 * because a machine node applies to every leaf. */
1069 instance->machines[i]->leaves =
1070 malloc(instance->leaf_count * sizeof(leaf_t *));
1071 if (instance->machines[i]->leaves == NULL) {
1074 instance->machines[i]->leaf_count = instance->leaf_count;
1075 for (j = 0; j < instance->leaf_count; ++j) {
1076 instance->machines[i]->leaves[j] = &instance->leaves[j];
1080 /* Connect the set subtree to the machines. Any set or leaf without a
1081 * parent will take the last machine as its parent. */
1084 map_reset_iterate(instance->leaf_map);
1085 while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
1086 if (leaf->parent == NULL) {
1087 leaf->parent = instance->last_machine;
1092 for (i = 0; i < instance->set_count; ++i) {
1093 identity_action *loop_action;
1094 identity_action *comm_action;
1096 if (instance->sets[i]->parent == NULL && instance->sets[i]->independent == 0) {
1097 instance->sets[i]->parent = instance->last_machine;
1100 loop_action = malloc(sizeof(identity_action));
1101 comm_action = malloc(sizeof(identity_action));
1103 if (loop_action == NULL || comm_action == NULL) {
1107 memset(loop_action, 0, sizeof(identity_action));
1108 memset(comm_action, 0, sizeof(identity_action));
1109 loop_action->ident = instance->sets[i];
1110 loop_action->action = ACTION_MAINLOOP;
1111 loop_action->valid = 1;
1112 comm_action->ident = instance->sets[i];
1113 comm_action->action = ACTION_COMMUNICATE;
1114 comm_action->valid = 1;
1116 instance->sets[i]->loop_action = loop_action;
1117 instance->sets[i]->comm_action = comm_action;
1119 TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1120 loop_action, calendar);
1122 TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1123 comm_action, calendar);
1125 /* Setup the array of pointers to leaves. This is harder for sets,
1126 * but this doesn't need to be super-efficient because it happens
1127 * rarely and it isn't on the critical path for reconfig(). */
1128 if (fill_set_leaf_pointer(instance, instance->sets[i])) {
1137 static void print_instance(drl_instance_t *instance) {
1138 leaf_t *leaf = NULL;
1139 identity_t *ident = NULL;
1141 if (system_loglevel == LOG_DEBUG) {
1142 map_reset_iterate(instance->leaf_map);
1143 while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
1144 printf("%x:", leaf->xid);
1145 ident = leaf->parent;
1147 printf("%d:",ident->id);
1148 ident = ident->parent;
1150 printf("Leaf's parent pointer is %p\n", leaf->parent);
1153 printf("instance->last_machine is %p\n", instance->last_machine);
1157 static int assign_htb_hierarchy(drl_instance_t *instance) {
1159 int next_node = 0x100;
1161 /* If we're not going to create our own htb hierarchy (for instance,
1162 * if we're going to let PL's node manager do it for us), then we don't
1163 * want this function to do anything. */
1164 if (!create_htb.u.value) {
1165 printlog(LOG_DEBUG, "Skipping assign_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n");
1169 /* Chain machine nodes under 1:10. */
1170 for (i = 0; i < instance->machine_count; ++i) {
1171 if (instance->machines[i]->parent == NULL) {
1173 instance->machines[i]->htb_parent = 0x10;
1176 instance->machines[i]->htb_parent =
1177 instance->machines[i]->parent->htb_node;
1180 instance->machines[i]->htb_node = next_node;
1186 /* Add set nodes under machine nodes. Iterate backwards to ensure parent is
1188 for (j = (instance->set_count - 1); j >= 0; --j) {
1189 if (instance->sets[j]->parent == NULL) {
1190 /* Independent node - goes under 0x10 away from machine nodes. */
1191 instance->sets[j]->htb_parent = 0x10;
1193 instance->sets[j]->htb_parent = instance->sets[j]->parent->htb_node;
1195 instance->sets[j]->htb_node = next_node;
1203 /* Added this so that I could comment one line and kill off all of the
1204 * command execution. */
1205 static inline int execute_cmd(const char *cmd) {
1209 static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor,
1210 const uint32_t classid_major, const uint32_t classid_minor,
1211 const uint64_t rate, const uint64_t ceil) {
1214 sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit",
1215 iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil);
1216 printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd);
1218 return execute_cmd(cmd);
1221 static inline int add_htb_netem(const char *iface, const uint32_t parent_major,
1222 const uint32_t parent_minor, const uint32_t handle,
1223 const int loss, const int delay) {
1226 sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
1227 parent_minor, handle);
1228 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1229 if (execute_cmd(cmd))
1230 printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n");
1232 sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms",
1233 iface, parent_major, parent_minor, handle, loss, delay);
1234 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1235 return execute_cmd(cmd);
1238 static inline int add_htb_sfq(const char *iface, const uint32_t parent_major,
1239 const uint32_t parent_minor, const uint32_t handle,
1240 const int perturb) {
1243 sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
1244 parent_minor, handle);
1245 printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
1246 if (execute_cmd(cmd))
1247 printlog(LOG_WARN, "HTB_cmd: Previous deletion did not succeed.\n");
1249 sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x sfq perturb %d",
1250 iface, parent_major, parent_minor, handle, perturb);
1251 printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
1252 return execute_cmd(cmd);
1255 static int create_htb_hierarchy(drl_instance_t *instance) {
1258 uint64_t gigabit = 1024 * 1024 * 1024;
1260 /* If we're not going to create our own htb hierarchy (for instance,
1261 * if we're going to let PL's node manager do it for us), then we don't
1262 * want this function to do anything. */
1263 if (!create_htb.u.value) {
1264 printlog(LOG_DEBUG, "Skipping create_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n");
1268 /* Nuke the hierarchy. */
1269 sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb");
1271 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1273 /* Re-initialize the basics. */
1274 sprintf(cmd, "tc qdisc add dev eth0 root handle 1: htb default 1fff");
1275 if (execute_cmd(cmd)) {
1278 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1280 if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit))
1283 /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/
1284 if (limiter.nodelimit) {
1285 if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024))
1288 if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit))
1293 for (i = 0; i < instance->machine_count; ++i) {
1294 if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1,
1295 instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) {
1302 /* Add back 1:20. */
1304 if (instance->last_machine == NULL) {
1305 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1307 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:20 htb rate 8bit ceil 1000mbit",
1308 instance->last_machine->htb_node);
1311 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1314 if (execute_cmd(cmd)) {
1317 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1320 for (j = (instance->set_count - 1); j >= 0; --j) {
1321 if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1,
1322 instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) {
1327 /* Add leaves. FIXME: Set static sliver limit as ceil here! */
1328 for (k = 0; k < instance->leaf_count; ++k) {
1329 if (instance->leaves[k].parent == NULL) {
1330 if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1333 if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1337 /* Add exempt node for the leaf under 1:20 as 1:2<xid> */
1338 if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit))
1342 /* Add 1:1000 and 1:2000 */
1343 if (instance->last_machine == NULL) {
1344 if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit))
1347 if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit))
1351 if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit))
1354 /* Add 1:1fff and 1:2fff */
1355 if (instance->last_machine == NULL) {
1356 if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit))
1359 if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit))
1363 if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit))
1366 /* Artifical delay or loss for experimentation. */
1367 if (netem_delay.u.value || netem_loss.u.value) {
1368 if (!strcmp(netem_slice.u.string, "ALL")) {
1369 /* By default, netem applies to all leaves. */
1370 if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value))
1372 if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value))
1375 for (k = 0; k < instance->leaf_count; ++k) {
1376 if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid),
1377 (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) {
1381 //FIXME: add exempt delay/loss here on 0x2000 ... ?
1384 /* netem_slice is not the default ALL value. Only apply netem
1385 * to the slice that is set in netem_slice.u.string. */
1388 sscanf(netem_slice.u.string, "%x", &slice_xid);
1390 if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value))
1395 /* Turn on SFQ for experimentation. */
1396 if (strcmp(sfq_slice.u.string, "NONE")) {
1397 if (!strcmp(sfq_slice.u.string, "ALL")) {
1398 if (add_htb_sfq("eth0", 1, 0x1000, 0x1000, 30))
1400 if (add_htb_sfq("eth0", 1, 0x1fff, 0x1fff, 30))
1403 for (k = 0; k < instance->leaf_count; ++k) {
1404 if (add_htb_sfq("eth0", 1, (0x1000 | instance->leaves[k].xid),
1405 (0x1000 | instance->leaves[k].xid), 30)) {
1412 sscanf(sfq_slice.u.string, "%x", &slice_xid);
1414 if (add_htb_sfq("eth0", 1, slice_xid, slice_xid, 30))
// setup_tc_grd(instance): prepares the netem qdiscs used by the GRD policy.
// For each leaf of `instance`, and then for classes 1:1000 and 1:1fff, it:
//   1. runs `tc qdisc del ... pfifo` to drop a pfifo qdisc that may already be
//      attached (failure is expected when none exists and is only logged at
//      LOG_DEBUG);
//   2. runs `tc qdisc replace ... netem loss 0 delay 0ms`; a failure is logged
//      at LOG_CRITICAL.
// If netem_delay or netem_loss is non-zero, the netem qdiscs are then changed
// to the configured loss/delay: on 1:1000, 1:1fff and every leaf when
// netem_slice is "ALL", otherwise only on the leaf whose xid is parsed as hex
// from netem_slice. Each affected leaf also gets leaf->delay = netem_delay.
// Callers (init_drl, reconfig) treat a non-zero return as failure; the return
// statements themselves are not part of this listing.
// NOTE(review): leaf classes are named with "1:1%x" and the raw xid, i.e. a
// literal "1" followed by the xid in hex. create_htb_hierarchy creates leaf
// classes as (0x1000 | xid); the two spellings agree only for xids with
// exactly three hex digits (0x100-0xfff). For xid 0x5 this yields "1:15"
// while (0x1000 | xid) is 1:1005. Confirm the xid range, or format
// (0x1000 | xid) with "1:%x".
// NOTE(review): `cmd`, `i`, `j` and `slice_xid` are declared in lines not
// shown here; confirm `cmd` is large enough for the longest sprintf below.
1422 static int setup_tc_grd(drl_instance_t *instance) {
1426 for (i = 0; i < instance->leaf_count; ++i) {
1427 /* Delete the old pfifo qdisc that might have been there before. */
1428 sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo",
1429 instance->leaves[i].xid, instance->leaves[i].xid);
1431 if (execute_cmd(cmd)) {
// Not an error: there may simply be no pfifo qdisc on this class yet.
1432 printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1435 /* Add the netem qdisc. */
1436 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
1437 instance->leaves[i].xid, instance->leaves[i].xid);
1439 if (execute_cmd(cmd)) {
1440 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1445 /* Do the same for 1000 and 1fff. */
1446 sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1448 if (execute_cmd(cmd)) {
1449 printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1452 /* Add the netem qdisc. */
1453 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
1455 if (execute_cmd(cmd)) {
1456 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1460 sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
1462 if (execute_cmd(cmd)) {
1463 printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1466 /* Add the netem qdisc. */
1467 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
1469 if (execute_cmd(cmd)) {
1470 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1474 /* Artificial delay or loss for experimentation. */
1475 if (netem_delay.u.value || netem_loss.u.value) {
1476 if (!strcmp(netem_slice.u.string, "ALL")) {
// "ALL": apply the configured loss/delay to 1:1000, 1:1fff and every leaf.
1477 sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1000 handle 1000 netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
1478 if (execute_cmd(cmd)) {
1479 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1483 sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1fff handle 1fff netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
1484 if (execute_cmd(cmd)) {
1485 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1489 for (j = 0; j < instance->leaf_count; ++j) {
1490 leaf_t *current = &instance->leaves[j];
1492 current->delay = netem_delay.u.value;
1494 sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", current->xid, current->xid, netem_loss.u.value, netem_delay.u.value);
1496 if (execute_cmd(cmd)) {
1497 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
// Single slice: netem_slice holds the target leaf's xid in hex.
1503 leaf_t *leaf = NULL;
// NOTE(review): sscanf's return value is not checked; a non-hex netem_slice
// leaves slice_xid with whatever value it already had.
1505 sscanf(netem_slice.u.string, "%x", &slice_xid);
1507 leaf = (leaf_t *) map_search(instance->leaf_map, &slice_xid, sizeof(slice_xid));
// The NULL check on `leaf` is presumably on a line not shown here.
1510 /* Leaf not found - invalid selection. */
1511 printf("Your experimental setup is incorrect...\n");
1515 leaf->delay = netem_delay.u.value;
1517 sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value);
1519 if (execute_cmd(cmd)) {
1520 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1531 * Initialize this limiter with options
1532 * Open UDP socket for peer communication
// init_drl(): one-time setup of the global `limiter`. In order, it:
//   - zeroes `limiter`, sets the log level and opens drl_logfile with "w"
//     (truncating any previous log);
//   - derives limiter.nodelimit from the nodelimit option, then initializes
//     hashing and the limiter rwlock;
//   - chooses the local IP (bind_addr verbatim, or get_local_ip() when
//     bind_addr starts with "AUTO") and binds a UDP socket to it on
//     LIMITER_LISTEN_PORT;
//   - selects the policy ("GRD" or "FPS", case-insensitive) and caps the
//     estimate interval at 1000 ms;
//   - with the write lock held, builds limiter.stable_instance from the
//     leaves option and the DRL config file, then installs the matching tc
//     setup (HTB hierarchy, or netem qdiscs via setup_tc_grd);
//   - starts the UDP receive thread (limiter_receive_thread).
// Failure branches log at LOG_CRITICAL. Their return statements are not part
// of this listing (presumably non-zero on failure, 0 on success; TODO
// confirm).
1534 static int init_drl(void) {
1535 parsed_configs configs;
1536 struct sockaddr_in server_address;
1538 memset(&limiter, 0, sizeof(limiter_t));
1540 /* Setup logging. */
1541 system_loglevel = (uint8_t) drl_loglevel.u.value;
// "w" truncates an existing log at startup; drl_signal later reopens it with "a".
1542 logfile = fopen(drl_logfile.u.string, "w");
1544 if (logfile == NULL) {
1545 printf("Couldn't open logfile - ");
1550 printlog(LOG_CRITICAL, "ulogd_DRL initializing . . .\n");
// nodelimit option times 1e6 divided by 8: megabits/s to bytes/s if the option
// is expressed in Mbit/s (the option's unit is not shown here).
// NOTE(review): create_htb_hierarchy multiplies limiter.nodelimit by 1024 and
// labels it kilobits/sec; these two unit assumptions disagree -- confirm.
1552 limiter.nodelimit = (uint32_t) (((double) nodelimit.u.value * 1000000.0) / 8.0);
1554 init_hashing(); /* for all hash maps */
1556 pthread_rwlock_init(&limiter.limiter_lock,NULL);
1558 /* determine our local IP by iterating through interfaces */
// strncmp() != 0 means bind_addr does not start with "AUTO": use it as given.
1559 if (strncmp(bind_addr.u.string, "AUTO", 4)) {
1560 limiter.ip = bind_addr.u.string;
1562 limiter.ip = get_local_ip();
1563 if (limiter.ip == NULL) {
1564 printlog(LOG_CRITICAL, "ulogd_DRL unable to aquire local IP address, not registering.\n");
// NOTE(review): inet_addr() returns INADDR_NONE for an unparsable address;
// that result is not checked before bind().
1568 limiter.localaddr = inet_addr(limiter.ip);
1569 limiter.port = htons(LIMITER_LISTEN_PORT);
1570 limiter.udp_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
1571 if (limiter.udp_socket < 0) {
1573 printlog(LOG_CRITICAL, "Failed to create UDP socket().\n");
1577 memset(&server_address, 0, sizeof(server_address));
1578 server_address.sin_family = AF_INET;
1579 server_address.sin_addr.s_addr = limiter.localaddr;
1580 server_address.sin_port = limiter.port;
1582 if (bind(limiter.udp_socket, (struct sockaddr *) &server_address, sizeof(server_address)) < 0) {
1584 printlog(LOG_CRITICAL, "Failed to bind UDP socket.\n");
1588 printlog(LOG_WARN, " POLICY: %s\n",policy.u.string);
1589 if (strcasecmp(policy.u.string,"GRD") == 0) {
1590 limiter.policy = POLICY_GRD;
1591 } else if (strcasecmp(policy.u.string,"FPS") == 0) {
1592 limiter.policy = POLICY_FPS;
1594 printlog(LOG_CRITICAL,
1595 "Unknown DRL policy %s, aborting.\n",policy.u.string);
1599 limiter.estintms = estintms.u.value;
// Intervals above 1000 ms are clamped to 1000 ms (exactly 1000 is accepted).
// The first message below has no trailing newline.
1600 if (limiter.estintms > 1000) {
1601 printlog(LOG_CRITICAL,
1602 "DRL: sorry estimate intervals must be less than 1 second.");
1603 printlog(LOG_CRITICAL,
1604 " Simple source mods will allow larger intervals. Using 1 second.\n");
1605 limiter.estintms = 1000;
1607 printlog(LOG_WARN, " Est interval: %dms\n",limiter.estintms);
1609 /* Acquire the big limiter lock for writing. Prevents pretty much
1610 * anything else from happening while the hierarchy is being changed. */
1611 pthread_rwlock_wrlock(&limiter.limiter_lock);
// NOTE(review): the failure branches below do not visibly release this lock
// before giving up; confirm they unlock in the full source.
1613 limiter.stable_instance.ident_map = allocate_map();
1614 if (limiter.stable_instance.ident_map == NULL) {
1615 printlog(LOG_CRITICAL, "Failed to allocate memory for identity map.\n");
1619 /* If the leaves option is "PLANETLAB", assume we're on planetlab and read them out
1620 * of /proc/virtual. Otherwise, read the specified line. */
1621 if (!strncmp(leaves.u.string, "PLANETLAB", 9)) {
1622 if (get_eligible_leaves(&limiter.stable_instance)) {
1623 printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n");
1627 if (parse_leaves(&limiter.stable_instance, leaves.u.string)) {
1628 printlog(LOG_CRITICAL, "Failed to parse leaf string.\n");
1633 if (parse_drl_config(drl_configfile.u.string, &configs)) {
1634 /* Parse error occurred. Return non-zero to notify the caller of init_drl(). */
1635 printlog(LOG_CRITICAL, "Failed to parse the DRL configuration file (%s).\n",
1636 drl_configfile.u.string);
1640 /* Validate identity hierarchy! */
1641 if (validate_configs(configs, &limiter.stable_instance)) {
1642 /* Clean up everything. */
1643 free_failed_config(configs, &limiter.stable_instance);
1644 printlog(LOG_CRITICAL, "Invalid DRL configuration file (%s).\n",
1645 drl_configfile.u.string);
1649 if (init_identities(configs, &limiter.stable_instance)) {
1650 free_failed_config(configs, &limiter.stable_instance);
1651 printlog(LOG_CRITICAL, "Failed to initialize identities.\n");
1655 /* At this point, we should be done with configs. */
1656 free_ident_list(configs.machines);
1657 free_ident_list(configs.sets);
1659 print_instance(&limiter.stable_instance);
// The case labels are not part of this listing; presumably the HTB calls
// serve POLICY_FPS and setup_tc_grd serves POLICY_GRD.
1661 switch (limiter.policy) {
1663 if (assign_htb_hierarchy(&limiter.stable_instance)) {
1664 free_instance(&limiter.stable_instance);
1665 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy.\n");
1669 if (create_htb_hierarchy(&limiter.stable_instance)) {
1670 free_instance(&limiter.stable_instance);
1671 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy.\n");
1677 if (setup_tc_grd(&limiter.stable_instance)) {
1678 free_instance(&limiter.stable_instance);
1679 printlog(LOG_CRITICAL, "Failed to initialize tc calls for GRD.\n");
1688 partition_set = partition.u.value;
// The write lock is released before the receive thread is started.
1690 pthread_rwlock_unlock(&limiter.limiter_lock);
1692 if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) {
1693 printlog(LOG_CRITICAL, "Unable to start UDP receive thread.\n");
1697 printlog(LOG_WARN, "ulogd_DRL init finished.\n");
// reconfig(): rebuilds the DRL configuration at runtime. It is presumably
// invoked from signal_thread_func on SIGUSR2; that call is not shown.
// A fresh limiter.new_instance is built without taking the limiter lock:
// ident map, leaves, DRL config file, validation, identities. Then, with the
// write lock held, the tc setup for limiter.policy is installed from it.
// If that fails, the tc setup is rebuilt from limiter.stable_instance, and
// "Giving up..." is logged if even that fails.
// On success the old stable_instance is freed and new_instance is copied
// over it with memcpy. This is a shallow copy: both structs then reference
// the same allocations until new_instance is zeroed again at the start of
// the next reconfig. Errors are only logged; the function returns void.
// NOTE(review): the validation and init_identities failure branches call
// pthread_rwlock_unlock(), but the write lock is only acquired later, just
// before the policy switch. Unlocking an rwlock the caller does not hold is
// undefined behavior under POSIX. Unless a lock is taken in lines not shown
// here, those two unlock calls should be removed.
1702 static void reconfig() {
1703 parsed_configs configs;
1705 printlog(LOG_DEBUG, "--Starting reconfig()--\n");
// Zeroed so free_failed_config() is safe on the early failure paths below.
1708 memset(&configs, 0, sizeof(parsed_configs));
1709 memset(&limiter.new_instance, 0, sizeof(drl_instance_t));
1711 limiter.new_instance.ident_map = allocate_map();
1712 if (limiter.new_instance.ident_map == NULL) {
1713 printlog(LOG_CRITICAL, "Failed to allocate ident_map during reconfig().\n");
1717 if (!strncmp(leaves.u.string, "PLANETLAB", 9)) {
1718 if (get_eligible_leaves(&limiter.new_instance)) {
1719 free_failed_config(configs, &limiter.new_instance);
1720 printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n");
1724 if (parse_leaves(&limiter.new_instance, leaves.u.string)) {
1725 free_failed_config(configs, &limiter.new_instance);
1726 printlog(LOG_CRITICAL, "Failed to parse leaf string.\n");
1731 if (parse_drl_config(drl_configfile.u.string, &configs)) {
1732 free_failed_config(configs, &limiter.new_instance);
1733 printlog(LOG_CRITICAL, "Failed to parse config during reconfig().\n");
1737 if (validate_configs(configs, &limiter.new_instance)) {
1738 free_failed_config(configs, &limiter.new_instance);
1739 printlog(LOG_CRITICAL, "Validation failed during reconfig().\n");
// NOTE(review): the lock is not held yet at this point (see function comment).
1740 pthread_rwlock_unlock(&limiter.limiter_lock);
1744 if (init_identities(configs, &limiter.new_instance)) {
1745 free_failed_config(configs, &limiter.new_instance);
1746 printlog(LOG_CRITICAL, "Initialization failed during reconfig().\n");
// NOTE(review): the lock is not held yet at this point either.
1747 pthread_rwlock_unlock(&limiter.limiter_lock);
1751 free_ident_list(configs.machines);
1752 free_ident_list(configs.sets);
1754 print_instance(&limiter.new_instance);
// From here on the tc state is modified, so other users of limiter are excluded.
1757 pthread_rwlock_wrlock(&limiter.limiter_lock);
1759 switch (limiter.policy) {
1761 if (assign_htb_hierarchy(&limiter.new_instance)) {
1762 free_instance(&limiter.new_instance);
1763 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
1764 pthread_rwlock_unlock(&limiter.limiter_lock);
1768 if (create_htb_hierarchy(&limiter.new_instance)) {
1769 free_instance(&limiter.new_instance);
1770 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
1772 /* Re-create old instance. */
1773 if (create_htb_hierarchy(&limiter.stable_instance)) {
1774 /* Error reinstating the old one - big problem. */
1775 printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
1776 printlog(LOG_CRITICAL, "Giving up...\n");
1781 pthread_rwlock_unlock(&limiter.limiter_lock);
1787 if (setup_tc_grd(&limiter.new_instance)) {
1788 free_instance(&limiter.new_instance);
1789 printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n");
1791 /* Try to re-create old instance. */
1792 if (setup_tc_grd(&limiter.stable_instance)) {
1793 printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n");
1794 printlog(LOG_CRITICAL, "Giving up...\n");
1802 /* Should be impossible. */
1803 printf("Pigs are flying?\n");
1807 /* Switch over new to stable instance. */
1808 free_instance(&limiter.stable_instance);
1809 memcpy(&limiter.stable_instance, &limiter.new_instance, sizeof(drl_instance_t));
1811 /* Success! - Unlock */
1812 pthread_rwlock_unlock(&limiter.limiter_lock);
// stop_enforcement(instance): resets every machine-level and set-level HTB
// class of `instance` with `tc class change ... htb rate 8bit ceil 100mbit`.
// Leaf classes are not touched. signal_thread_func calls it with the limiter
// write lock held when switching enforcement off, and ignores the result.
// NOTE(review): the 100mbit ceiling differs from the 1000mbit ceiling used in
// create_htb_hierarchy's tc commands; confirm this is intended.
// NOTE(review): `i` and `cmd` are declared, and the error handling after
// execute_cmd() happens, in lines not shown here.
1815 static int stop_enforcement(drl_instance_t *instance) {
1819 for (i = 0; i < instance->machine_count; ++i) {
1820 sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1821 instance->machines[i]->htb_parent,
1822 instance->machines[i]->htb_node);
1824 if (execute_cmd(cmd)) {
1829 for (i = 0; i < instance->set_count; ++i) {
1830 sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1831 instance->sets[i]->htb_parent,
1832 instance->sets[i]->htb_node);
1834 if (execute_cmd(cmd)) {
// signal_thread_func(args): body of the dedicated signal thread created by
// drl_plugin_init; `args` is unused in the lines shown. It blocks SIGHUP,
// SIGUSR1, SIGUSR2 and SIGRTMAX in this thread. It then calls sigwait() on a
// set of SIGUSR1, SIGUSR2 and SIGRTMAX. The SIGHUP entry is commented out,
// so SIGHUP is not expected to be returned by sigwait here.
// Visible handling:
//   - SIGRTMAX: toggles do_partition ("fake partitions");
//   - under the limiter write lock: toggles enforcement, calling
//     stop_enforcement() on the stable instance when turning it off
//     (presumably the SIGUSR1 branch; the case labels are not shown);
//   - SIGUSR2: logs that the XML config is being re-read. The call that does
//     the re-read, presumably reconfig(), is not part of this listing.
// The SIGHUP branch only logs that it was reached unexpectedly.
// NOTE(review): some lines are not shown: those that re-empty `sigs`,
// declare `sig`/`err`, and form the loop. If `sigs` is not emptied before the
// second group of sigaddset calls, SIGHUP would still be in the wait set.
// NOTE(review): do_partition is flipped without holding a lock, while the
// enforcement toggle happens under the write lock. Confirm that readers of
// do_partition tolerate the unsynchronized update.
1842 static void *signal_thread_func(void *args) {
1848 sigaddset(&sigs, SIGHUP);
1849 sigaddset(&sigs, SIGUSR1);
1850 sigaddset(&sigs, SIGUSR2);
1851 sigaddset(&sigs, SIGRTMAX);
1852 pthread_sigmask(SIG_BLOCK, &sigs, NULL);
// Wait set: SIGHUP deliberately excluded.
1856 //sigaddset(&sigs, SIGHUP);
1857 sigaddset(&sigs, SIGUSR1);
1858 sigaddset(&sigs, SIGUSR2);
1859 sigaddset(&sigs, SIGRTMAX);
1861 err = sigwait(&sigs, &sig);
1864 printlog(LOG_CRITICAL, "sigwait() returned an error.\n");
1869 if (sig == SIGRTMAX) {
1870 printf("Caught SIGRTMAX - toggling fake partitions.\n");
1871 do_partition = !do_partition;
1877 printlog(LOG_CRITICAL, "Caught SIGHUP in signal_thread_func?!?\n");
1878 printf("Caught SIGHUP in signal_thread_func?!?\n");
1881 pthread_rwlock_wrlock(&limiter.limiter_lock);
1882 if (do_enforcement) {
1884 stop_enforcement(&limiter.stable_instance);
1885 printlog(LOG_CRITICAL, "--Switching enforcement off.--\n");
1888 printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1890 pthread_rwlock_unlock(&limiter.limiter_lock);
1893 printlog(LOG_WARN, "Caught SIGUSR2 - re-reading XML file.\n");
1894 printf("Caught SIGUSR2 - re-reading XML file.\n");
1899 /* Intentionally blank. */
// drl_plugin_init(): ulogd `init` callback for the DRL plugin (see drl_op).
// It first blocks SIGUSR1, SIGUSR2 and SIGRTMAX in the calling thread. Every
// thread created afterwards inherits that mask (POSIX pthread_create
// semantics), which leaves those signals for signal_thread_func to collect
// with sigwait(). SIGHUP is left unblocked here (its sigaddset is commented
// out), presumably so ulogd can still route it to drl_signal.
// It then, in order:
//   - starts the signal thread;
//   - runs the DRL initialization, presumably init_drl() (that call is not
//     part of this listing; its failure is reported as "Init failed. :(");
//   - starts the estimation thread (handle_estimation with &limiter);
//   - when enforce_on is set, turns enforcement on under the limiter write
//     lock.
// Each failure is logged, and a hint pointing at drl_logfile is printed to
// stderr.
// NOTE(review): the signal thread is created before limiter and its rwlock
// are initialized. A SIGUSR1/SIGUSR2 arriving in that window reaches code
// that takes limiter.limiter_lock before pthread_rwlock_init() has run.
// NOTE(review): handle_estimation is cast to void *(*)(void *). Calling it
// through that type is undefined behavior unless its real signature matches.
1905 static int drl_plugin_init() {
1906 sigset_t signal_mask;
1908 sigemptyset(&signal_mask);
1909 //sigaddset(&signal_mask, SIGHUP);
1910 sigaddset(&signal_mask, SIGUSR1);
1911 sigaddset(&signal_mask, SIGUSR2);
1912 sigaddset(&signal_mask, SIGRTMAX);
1913 pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
// Must run after the mask is set so the new thread inherits it.
1915 if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {
1916 printlog(LOG_CRITICAL, "Failed to create signal handling thread.\n");
1917 fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1923 printlog(LOG_CRITICAL, "Init failed. :(\n");
1924 fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1929 /* start up the thread that will periodically estimate the
1930 * local rate and set the local limits
1933 if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) {
1934 printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n");
1935 fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1939 if (enforce_on.u.value) {
1940 pthread_rwlock_wrlock(&limiter.limiter_lock);
1942 printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1943 pthread_rwlock_unlock(&limiter.limiter_lock);
// drl_signal(sig): ulogd `signal` callback (registered in drl_op). On SIGHUP
// it reopens drl_logfile in append mode and logs "Reopened logfile.". Other
// signals are logged at LOG_WARN as unexpected.
// NOTE(review): closing the previous FILE handle is not part of this
// listing; confirm it happens before the fopen().
// NOTE(review): the fopen() result is not checked, so logfile may be NULL
// when printlog() runs right after it.
// NOTE(review): printf/fopen are not async-signal-safe. That is fine only if
// ulogd invokes .signal callbacks outside the actual signal handler; verify.
1949 static void drl_signal(int sig) {
1950 if (sig == SIGHUP) {
1951 printf("Caught SIGHUP - reopening DRL log file.\n");
1954 logfile = fopen(drl_logfile.u.string, "a");
1955 printlog(LOG_CRITICAL, "Reopened logfile.\n");
1957 printlog(LOG_WARN, "Caught unexpected signal %d in drl_signal.\n", sig);
// drl_op: the DRL output-plugin descriptor that _drl_reg_op hands to ulogd.
// It wires the output callback (_output_drl), the signal callback
// (drl_signal) and the init callback (drl_plugin_init).
1961 static ulog_output_t drl_op = {
1963 .output = &_output_drl,
1964 .signal = &drl_signal,
1965 .init = &drl_plugin_init,
1970 /* Tests the amount of time it takes to call reconfig(). */
// time_reconfig(iterations): benchmarking helper. It loops `iterations`
// times (the loop body, presumably reconfig(), is not shown in this listing)
// and prints the elapsed wall-clock time measured with gettimeofday().
// No caller is visible in this listing.
// NOTE(review): "%d" is used for the tv_sec and tv_usec differences
// (time_t / suseconds_t). On LP64 targets that is a format/type mismatch,
// which is undefined behavior. The microsecond difference can also be
// negative when the interval crosses a second boundary. Consider printing a
// single total in microseconds as a long long.
1971 static void time_reconfig(int iterations) {
1972 struct timeval start, end;
1975 gettimeofday(&start, NULL);
1976 for (i = 0; i < iterations; ++i) {
1979 gettimeofday(&end, NULL);
1981 printf("%d reconfigs() took %d seconds and %d microseconds.\n",
1982 iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec);
1985 // Seems to take about 85ms / iteration
1989 /* register output plugin with ulogd */
// _drl_reg_op(): hands the drl_op descriptor to ulogd via register_output().
// Its caller is not part of this listing.
1990 static void _drl_reg_op(void)
1992 ulog_output_t *op = &drl_op;
1993 register_output(op);
1998 /* have the opts parsed */
1999 config_parse_file("DRL", config_entries);
2002 ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
2006 /* Seed the hash function */
2007 salt = getpid() ^ time(NULL);