1 /* See the DRL-LICENSE file for this file's software license. */
4 * ulogd output target for DRL: GRD and FPS
6 * Ken Yocum <kyocum@cs.ucsd.edu>
8 * Original shell of this code from ulogd_NETFLOW
9 * Like that code, we keep track of per-slice data rates
10 * out of this slice. Thus we are rate limiting particular slices
11 * across multiple boxes, ensuring that their outbound rate does not
12 * exceed some fixed limit.
16 * Enforcer: linux drop percentage.
18 * This file reads packets from the netlink socket. It updates all
19 * the hashmaps which track how much data has arrived per flow.
20 * It starts two threads for this limiter.
21 * One thread handles periodic estimation.
22 * The other thread handles communication with other limiters.
27 * ulogd_DRL: attaches to the netlink socket and accepts packets; replaces ratelimit.cc
28 * util.c: generic hashing functions, flow comparisons, sundry items.
29 * gossip.c: Recv gossip, send gossip.
30 * peer_comm.c: Thread to listen for updates from other limiters.
31 * estimate.c: Thread to calculate the local limits.
34 * Ken Yocum <kyocum@cs.ucsd.edu>
37 * Some code appropriated from ulogd_NETFLOW:
39 * Mark Huang <mlhuang@cs.princeton.edu>
40 * Copyright (C) 2004-2005 The Trustees of Princeton University
42 * Based on admindump.pl by Mic Bowman and Paul Brett
43 * Copyright (c) 2002 Intel Corporation
47 /* Enable GNU glibc extensions */
53 /* va_start() and friends */
59 /* strstr() and friends */
#include <string.h>
62 /* dirname() and basename() */
65 /* fork() and wait() */
66 #include <sys/types.h>
70 /* errno and assert() */
77 /* time() and friends */
82 #include <sys/socket.h>
83 #include <netinet/in.h>
84 #include <arpa/inet.h>
86 /* ICMP definitions */
87 #include <netinet/ip.h>
88 #include <netinet/ip_icmp.h>
93 /* pthread_create() */
99 /* Signal definitions - so that we can catch SIGHUP and update config. */
102 #include <ulogd/ulogd.h>
103 #include <ulogd/conffile.h>
105 /* Perhaps useful for files within vservers? */
106 #if !defined(STANDALONE) && HAVE_LIBPROPER
107 #include <proper/prop.h>
111 * Jenkins hash support
112 * lives in raterouter.h
116 #include "raterouter.h"
118 #include "calendar.h"
119 #include "ratetypes.h" /* needs util and pthread.h */
123 * /etc/ulogd.conf configuration options
124 * Add the config options for DRL.
127 static config_entry_t drl_configfile = {
129 .key = "drl_configfile",
130 .type = CONFIG_TYPE_STRING,
131 .options = CONFIG_OPT_MANDATORY,
132 .u = { .string = "drl.xml" },
135 /** The administrative bandwidth limit (mbps) for the local node. The node
136 * will not set a limit higher than this, even when distributed capacity is
137 * available. Set to 0 for no limit. */
138 static config_entry_t nodelimit = {
139 .next = &drl_configfile,
141 .type = CONFIG_TYPE_INT,
142 .options = CONFIG_OPT_MANDATORY,
146 /** Determines the verbosity of logging. */
147 static config_entry_t drl_loglevel = {
149 .key = "drl_loglevel",
150 .type = CONFIG_TYPE_INT,
151 .options = CONFIG_OPT_MANDATORY,
152 .u = { .value = LOG_WARN },
155 /** The path of the logfile. */
156 static config_entry_t drl_logfile = {
157 .next = &drl_loglevel,
158 .key = "drl_logfile",
159 .type = CONFIG_TYPE_STRING,
160 .options = CONFIG_OPT_MANDATORY,
161 .u = { .string = "drl_logfile.log" },
164 /** The choice of DRL protocol. */
165 static config_entry_t policy = {
166 .next = &drl_logfile,
168 .type = CONFIG_TYPE_STRING,
169 .options = CONFIG_OPT_MANDATORY,
170 .u = { .string = "GRD" },
173 /** The estimate interval, in milliseconds. */
174 static config_entry_t estintms = {
177 .type = CONFIG_TYPE_INT,
178 .options = CONFIG_OPT_MANDATORY,
179 .u = { .value = 100 },
182 #define config_entries (&estintms)
185 * Debug functionality
/*
 * Expand a 32-bit IPv4 address object into its four byte values, separated
 * by commas, for use with a "%d.%d.%d.%d"-style format string.
 *
 * NIPQUAD emits the bytes in memory order, which is dotted-quad order when
 * the address is stored in network byte order. IPQUAD emits them in reverse
 * memory order.
 *
 * addr must be an lvalue, because its address is taken. The argument is
 * parenthesized in the expansion. The bytes are read through a const
 * pointer, so const-qualified addresses can be printed without casting
 * away const. Each byte is promoted to int when passed to printf.
 */
#define NIPQUAD(addr) \
	((const unsigned char *)&(addr))[0], \
	((const unsigned char *)&(addr))[1], \
	((const unsigned char *)&(addr))[2], \
	((const unsigned char *)&(addr))[3]

#define IPQUAD(addr) \
	((const unsigned char *)&(addr))[3], \
	((const unsigned char *)&(addr))[2], \
	((const unsigned char *)&(addr))[1], \
	((const unsigned char *)&(addr))[0]
206 /* Salt for the hash functions */
210 * Hash slice name lookups on context ID.
213 /* Special context IDs */
/* Special context ID, presumably marking packets whose xid is unknown.
 * Parenthesized so the negative literal stays a single operand wherever
 * the macro is expanded. */
#define UNKNOWN_XID (-1)
218 CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
/* Thread handles for the estimator, signal and communication threads.
 * NOTE(review): init_drl() starts limiter.udp_recv_thread rather than any
 * of these; confirm they are still used. */
pthread_t estimate_thread, signal_thread, comm_thread;

/* Local IPv4 address, zero until assigned.
 * NOTE(review): init_drl() records the local address in limiter.ip and
 * limiter.localaddr instead; confirm this global is still needed. */
uint32_t local_ip = 0;

/* Defined in another translation unit. init_drl() opens logfile and sets
 * system_loglevel from the drl_logfile / drl_loglevel config entries. */
extern FILE *logfile;
extern uint8_t system_loglevel;
extern uint8_t do_enforcement;
236 static inline uint32_t
237 hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port)
239 unsigned char mybytes[FLOWKEYSIZE];
240 mybytes[0] = protocol;
241 *(uint32_t*)(&(mybytes[1])) = src_ip;
242 *(uint32_t*)(&(mybytes[5])) = dst_ip;
243 *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port;
244 return jhash(mybytes,FLOWKEYSIZE,salt) & (FLOW_HASH_SIZE - 1);
247 uint32_t sampled_hasher(const key_flow *key) {
248 return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port);
251 uint32_t standard_hasher(const key_flow *key) {
252 return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port);
260 /* Interesting keys */
284 #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0]))
285 static struct intr_id intr_ids[] = {
286 [OOB_TIME_SEC] = { "oob.time.sec", 0 },
287 [OOB_MARK] = { "oob.mark", 0 },
288 [IP_SADDR] = { "ip.saddr", 0 },
289 [IP_DADDR] = { "ip.daddr", 0 },
290 [IP_TOTLEN] = { "ip.totlen", 0 },
291 [IP_PROTOCOL] = { "ip.protocol", 0 },
292 [TCP_SPORT] = { "tcp.sport", 0 },
293 [TCP_DPORT] { "tcp.dport", 0 },
294 [TCP_ACK] = { "tcp.ack", 0 },
295 [TCP_FIN] = { "tcp.fin", 0 },
296 [TCP_SYN] = { "tcp.syn", 0 },
297 [TCP_RST] = { "tcp.rst", 0 },
298 [UDP_SPORT] = { "udp.sport", 0 },
299 [UDP_DPORT] = { "udp.dport", 0 },
300 [ICMP_TYPE] = { "icmp.type", 0 },
301 [ICMP_CODE] = { "icmp.code", 0 },
302 [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
303 [GRE_VERSION] = { "gre.version", 0 },
304 [GRE_KEY] = { "gre.key", 0 },
305 [PPTP_CALLID] = { "pptp.callid", 0 },
308 #define GET_VALUE(x) intr_ids[x].res->value
310 #define DATE(t) ((t) / (24*60*60) * (24*60*60))
312 static int _output_drl(ulog_iret_t *res)
315 uint32_t src_ip, dst_ip;
316 uint16_t src_port, dst_port;
323 protocol = GET_VALUE(IP_PROTOCOL).ui8;
324 src_ip = GET_VALUE(IP_SADDR).ui32;
325 dst_ip = GET_VALUE(IP_DADDR).ui32;
326 xid = GET_VALUE(OOB_MARK).ui32;
331 src_port = GET_VALUE(TCP_SPORT).ui16;
332 dst_port = GET_VALUE(TCP_DPORT).ui16;
336 /* netflow had an issue with many udp flows and set
337 * src_port=0 to handle it. We don't.
339 src_port = GET_VALUE(UDP_SPORT).ui16;
342 * traceroutes create a large number of flows in the db
343 * this is a quick hack to catch the most common form
344 * of traceroute (basically we're mapping any UDP packet
345 * in the 33435-33524 range to the "trace" port, 33524 is
346 * 3 packets * nhops (30).
348 dst_port = GET_VALUE(UDP_DPORT).ui16;
349 if (dst_port >= 33435 && dst_port <= 33524)
354 src_port = GET_VALUE(ICMP_TYPE).ui8;
355 dst_port = GET_VALUE(ICMP_CODE).ui8;
358 * We special case some of the ICMP traffic that the kernel
359 * always generates. Since this is attributed to root, it
360 * creates significant "noise" in the output. We want to be
361 * able to quickly see that root is generating traffic.
363 if (xid == ROOT_XID) {
364 if (src_port == ICMP_ECHOREPLY)
365 xid = ICMP_ECHOREPLY_XID;
366 else if (src_port == ICMP_UNREACH)
367 xid = ICMP_UNREACH_XID;
372 if (GET_VALUE(GRE_FLAG_KEY).b) {
373 if (GET_VALUE(GRE_VERSION).ui8 == 1) {
374 /* Get PPTP call ID */
375 src_port = GET_VALUE(PPTP_CALLID).ui16;
377 /* XXX Truncate GRE keys to 16 bits */
378 src_port = (uint16_t) GET_VALUE(GRE_KEY).ui32;
381 /* No key available */
388 /* This is the default key for packets from unsupported protocols */
394 key.protocol = protocol;
395 key.source_ip = src_ip;
396 key.dest_ip = dst_ip;
397 key.source_port = src_port;
398 key.dest_port = dst_port;
399 key.packet_size = GET_VALUE(IP_TOTLEN).ui16;
400 key.packet_time = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
402 pthread_rwlock_rdlock(&limiter.limiter_lock); /* CLUNK! */
404 leaf = (leaf_t *) map_search(limiter.stable_instance.leaf_map, &xid, sizeof(xid));
406 /* Even if the packet doesn't match any specific xid, it should still
407 * count in the machine-type tables. This catches root (xid == 0) and
408 * unclassified (xid = fff) packets, which don't have map entries. */
410 ident = limiter.stable_instance.last_machine;
412 ident = leaf->parent;
416 pthread_mutex_lock(&ident->table_mutex);
418 /* Update the identity's table. */
419 ident->table_sample_function(ident->table, &key);
421 pthread_mutex_unlock(&ident->table_mutex);
423 ident = ident->parent;
426 pthread_rwlock_unlock(&limiter.limiter_lock); /* CLINK! */
431 /* Get the key IDs for all the keys we are interested in. */
432 static int get_ids(void)
435 struct intr_id *cur_id;
437 for (i = 0; i < INTR_IDS; i++) {
438 cur_id = &intr_ids[i];
439 cur_id->res = keyh_getres(keyh_getid(cur_id->name));
441 ulogd_log(ULOGD_ERROR,
442 "Cannot resolve keyhash id for %s\n",
450 static void free_identity(identity_t *ident) {
452 free_comm(&ident->comm);
455 ident->table_destroy_function(ident->table);
458 pthread_mutex_destroy(&ident->table_mutex);
464 static void free_identity_map(map_handle map) {
465 identity_t *tofree = NULL;
467 map_reset_iterate(map);
468 while ((tofree = (identity_t *) map_next(map))) {
469 free_identity(tofree);
475 static void free_instance(drl_instance_t *instance) {
476 if (instance->leaves)
477 free(instance->leaves);
478 if (instance->leaf_map)
479 free_map(instance->leaf_map, 0);
480 if (instance->ident_map)
481 free_identity_map(instance->ident_map);
482 if (instance->machines)
483 free(instance->machines);
485 free(instance->sets);
490 memset(instance, 0, sizeof(drl_instance_t));
493 static void free_failed_config(parsed_configs configs, drl_instance_t *instance) {
495 if (configs.machines)
496 free_ident_list(configs.machines);
498 free_ident_list(configs.sets);
502 free_instance(instance);
505 static identity_t *new_identity(ident_config *config) {
506 identity_t *ident = malloc(sizeof(identity_t));
507 remote_node_t *comm_nodes = malloc(sizeof(remote_node_t)*config->peer_count);
508 ident_peer *peer = config->peers;
515 if (comm_nodes == NULL) {
520 memset(ident, 0, sizeof(identity_t));
521 memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t));
523 ident->id = config->id;
524 ident->limit = (uint32_t) (((double) config->limit * 1000000.0) / 8.0);
525 ident->fixed_ewma_weight = config->fixed_ewma_weight;
526 ident->intervals = config->intervals;
527 ident->ewma_weight = pow(ident->fixed_ewma_weight,
528 (limiter.estintms/1000.0) * config->intervals);
529 ident->parent = NULL;
531 pthread_mutex_init(&ident->table_mutex, NULL);
532 switch (config->accounting) {
535 standard_table_create(standard_hasher, &ident->common);
537 /* Ugly function pointer casting. Makes things sufficiently
538 * generic, though. */
539 ident->table_sample_function =
540 (int (*)(void *, const key_flow *)) standard_table_sample;
541 ident->table_cleanup_function =
542 (int (*)(void *)) standard_table_cleanup;
543 ident->table_update_function =
544 (void (*)(void *, struct timeval, double)) standard_table_update_flows;
545 ident->table_destroy_function =
546 (void (*)(void *)) standard_table_destroy;
550 ident->table = sampled_table_create(sampled_hasher,
551 ident->limit * IDENT_CLEAN_INTERVAL,
552 1, 20, &ident->common);
554 ident->table_sample_function =
555 (int (*)(void *, const key_flow *)) sampled_table_sample;
556 ident->table_cleanup_function =
557 (int (*)(void *)) sampled_table_cleanup;
558 ident->table_update_function =
559 (void (*)(void *, struct timeval, double)) sampled_table_update_flows;
560 ident->table_destroy_function =
561 (void (*)(void *)) sampled_table_destroy;
565 ident->table = simple_table_create(&ident->common);
567 ident->table_sample_function =
568 (int (*)(void *, const key_flow *)) simple_table_sample;
569 ident->table_cleanup_function =
570 (int (*)(void *)) simple_table_cleanup;
571 ident->table_update_function =
572 (void (*)(void *, struct timeval, double)) simple_table_update_flows;
573 ident->table_destroy_function =
574 (void (*)(void *)) simple_table_destroy;
578 /* Make sure the table was allocated. */
579 if (ident->table == NULL) {
585 comm_nodes[peer_slot].addr = peer->ip;
586 comm_nodes[peer_slot].port = htons(LIMITER_LISTEN_PORT);
591 if (new_comm(&ident->comm, config, comm_nodes)) {
592 printlog(LOG_CRITICAL, "Failed to create communication structure.\n");
596 ident->comm.remote_nodes = comm_nodes;
601 /* Determines the validity of the parameters of one ident_config.
606 static int validate_config(ident_config *config) {
607 /* Limit must be a positive integer. */
608 if (config->limit < 1) {
612 /* Commfabric must be a valid choice (COMM_MESH or COMM_GOSSIP). */
613 if (config->commfabric != COMM_MESH &&
614 config->commfabric != COMM_GOSSIP) {
618 /* If commfabric is COMM_GOSSIP, this must be a positive integer. */
619 if (config->commfabric == COMM_GOSSIP && config->branch < 1) {
623 /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD,
625 if (config->accounting != ACT_STANDARD &&
626 config->accounting != ACT_SAMPLEHOLD &&
627 config->accounting != ACT_SIMPLE) {
631 /* Ewma weight must be greater than or equal to zero. */
632 if (config->fixed_ewma_weight < 0) {
636 /* Note: Parsing stage requires that each ident has at least one peer. */
643 static int validate_configs(parsed_configs configs, drl_instance_t *instance) {
645 ident_config *mlist = configs.machines;
646 ident_config *slist = configs.sets;
647 ident_config *tmp = NULL;
651 /* ID must be non-zero and unique. */
652 /* This is ugly and hackish, but this function will be called rarely.
653 * I'm tired of trying to be clever. */
655 printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", mlist->id, mlist->id);
660 if (mlist->id == tmp->id) {
661 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
668 if (mlist->id == tmp->id) {
669 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
675 if (validate_config(mlist)) {
676 printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", mlist->id, mlist->id);
683 instance->sets = malloc(configs.set_count * sizeof(identity_t *));
684 if (instance->sets == NULL) {
688 memset(instance->sets, 0, configs.set_count * sizeof(identity_t *));
689 instance->set_count = configs.set_count;
691 /* For sets, make sure that the hierarchy is valid. */
693 ident_member *members = slist->members;
695 /* ID must be non-zero and unique. */
697 printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", slist->id, slist->id);
702 if (slist->id == tmp->id) {
703 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", slist->id, slist->id);
709 if (validate_config(slist)) {
710 printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", slist->id, slist->id);
714 /* Allocate an identity_t for this set-type identity. */
715 instance->sets[i] = new_identity(slist);
717 if (instance->sets[i] == NULL) {
721 /* Loop through children and look up each in leaf or ident map
722 * depending on the type of child. Set the child's parent pointer
723 * to the identity we just created above, unless it is already set,
724 * in which case we have an error. */
726 identity_t *child_ident = NULL;
727 leaf_t *child_leaf = NULL;
729 switch (members->type) {
731 child_leaf = map_search(instance->leaf_map, &members->value,
732 sizeof(members->value));
733 if (child_leaf == NULL) {
736 if (child_leaf->parent != NULL) {
737 /* Error - This leaf already has a parent. */
740 child_leaf->parent = instance->sets[i];
743 child_ident = map_search(instance->ident_map, &members->value,
744 sizeof(members->value));
745 if (child_ident == NULL) {
748 if (child_ident->parent != NULL) {
749 /* Error - This identity already has a parent. */
752 child_ident->parent = instance->sets[i];
755 /* Error - shouldn't be possible. */
758 members = members->next;
761 map_insert(instance->ident_map, &instance->sets[i]->id,
762 sizeof(instance->sets[i]->id), instance->sets[i]);
770 static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) {
772 identity_t *current_ident;
773 leaf_t *current_leaf;
774 leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *));
775 if (leaves == NULL) {
779 map_reset_iterate(instance->leaf_map);
780 while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) {
781 current_ident = current_leaf->parent;
782 while (current_ident != NULL && current_ident != instance->last_machine) {
783 if (current_ident == ident) {
784 /* Found the ident we were looking for - add the leaf. */
785 leaves[count] = current_leaf;
789 current_ident = current_ident->parent;
793 ident->leaves = leaves;
794 ident->leaf_count = count;
799 static int init_identities(parsed_configs configs, drl_instance_t *instance) {
801 ident_config *config = configs.machines;
804 instance->cal = malloc(sizeof(struct ident_calendar) * SCHEDLEN);
806 if (instance->cal == NULL) {
810 for (i = 0; i < SCHEDLEN; ++i) {
811 TAILQ_INIT(instance->cal + i);
813 instance->cal_slot = 0;
815 instance->machines = malloc(configs.machine_count * sizeof(drl_instance_t *));
817 if (instance->machines == NULL) {
821 memset(instance->machines, 0, configs.machine_count * sizeof(drl_instance_t *));
822 instance->machine_count = configs.machine_count;
824 /* Allocate and add the machine identities. */
825 for (i = 0; i < configs.machine_count; ++i) {
826 instance->machines[i] = new_identity(config);
828 if (instance->machines[i] == NULL) {
832 /* The first has no parent - it is the root. All others have the
833 * previous ident as their parent. */
835 instance->machines[i]->parent = NULL;
837 instance->machines[i]->parent = instance->machines[i - 1];
840 instance->last_machine = instance->machines[i];
842 /* Add the ident to the guid->ident map. */
843 map_insert(instance->ident_map, &instance->machines[i]->id,
844 sizeof(instance->machines[i]->id), instance->machines[i]);
846 config = config->next;
848 TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
849 instance->machines[i], calendar);
851 /* Setup the array of pointers to leaves. This is easy for machines
852 * because a machine node applies to every leaf. */
853 instance->machines[i]->leaves =
854 malloc(instance->leaf_count * sizeof(leaf_t *));
855 if (instance->machines[i]->leaves == NULL) {
858 instance->machines[i]->leaf_count = instance->leaf_count;
859 for (j = 0; j < instance->leaf_count; ++j) {
860 instance->machines[i]->leaves[j] = &instance->leaves[j];
864 /* Connect the set subtree to the machines. Any set or leaf without a
865 * parent will take the last machine as its parent. */
868 map_reset_iterate(instance->leaf_map);
869 while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
870 if (leaf->parent == NULL) {
871 leaf->parent = instance->last_machine;
876 for (i = 0; i < instance->set_count; ++i) {
877 if (instance->sets[i]->parent == NULL) {
878 instance->sets[i]->parent = instance->last_machine;
881 TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
882 instance->sets[i], calendar);
884 /* Setup the array of pointers to leaves. This is harder for sets,
885 * but this doesn't need to be super-efficient because it happens
886 * rarely and it isn't on the critical path for reconfig(). */
887 if (fill_set_leaf_pointer(instance, instance->sets[i])) {
896 static void print_instance(drl_instance_t *instance) {
898 identity_t *ident = NULL;
900 if (system_loglevel == LOG_DEBUG) {
901 map_reset_iterate(instance->leaf_map);
902 while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
903 printf("%x:", leaf->xid);
904 ident = leaf->parent;
906 printf("%d:",ident->id);
907 ident = ident->parent;
909 printf("Leaf's parent pointer is %p\n", leaf->parent);
912 printf("instance->last_machine is %p\n", instance->last_machine);
916 static int assign_htb_hierarchy(drl_instance_t *instance) {
918 int next_node = 0x11;
920 /* Chain machine nodes under 1:10. */
921 for (i = 0; i < instance->machine_count; ++i) {
922 if (instance->machines[i]->parent == NULL) {
924 instance->machines[i]->htb_parent = 0x10;
927 instance->machines[i]->htb_parent =
928 instance->machines[i]->parent->htb_node;
931 instance->machines[i]->htb_node = next_node;
937 /* Add set nodes under machine nodes. Iterate backwards to ensure parent is
939 for (j = (instance->set_count - 1); j >= 0; --j) {
940 if (instance->sets[j]->parent == NULL) {
941 instance->sets[j]->htb_parent = 0x10;
943 instance->sets[j]->htb_parent = instance->sets[j]->parent->htb_node;
945 instance->sets[j]->htb_node = next_node;
953 /* Added this so that I could comment one line and kill off all of the
954 * command execution. */
955 static int execute_cmd(const char *cmd) {
/*
 * create_htb_hierarchy - rebuild the HTB qdisc/class tree on eth0 with tc.
 *
 * Order of operations:
 *   - Delete the root qdisc.
 *   - Re-add "1: htb default 1fff".
 *   - Add these classes:
 *       1:1     1000mbit parent of everything
 *       1:10    node-limit class; ceil from limiter.nodelimit, or 1000mbit if 0
 *       1:20    1000mbit sibling subtree
 *       one class per machine identity and per set identity (htb_parent ->
 *         htb_node, as numbered by assign_htb_hierarchy())
 *       1:1<xid> and 1:2<xid> for every leaf
 *       1:1000/1:2000 and 1:1fff/1:2fff default classes
 *   - Swap the qdisc on some classes: netem for delay testing, sfq on 1:1000.
 *
 * Each command is formatted into cmd with sprintf() and passed to
 * execute_cmd(). A non-zero result from a command tested with
 * "if (execute_cmd(cmd))" is presumably treated as failure (confirm).
 *
 * NOTE(review): cmd's declaration is not shown alongside this code. If it is
 * a fixed-size array, snprintf(cmd, sizeof(cmd), ...) would bound these
 * writes.
 * NOTE(review): the device name eth0 is hard-coded in every command.
 */
959 static int create_htb_hierarchy(drl_instance_t *instance) {
963 /* Nuke the hierarchy. */
/* NOTE(review): this command and the next two use plain "tc" (PATH lookup)
 * while the rest use "/sbin/tc". */
964 sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb");
966 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
968 /* Re-initialize the basics. */
969 sprintf(cmd, "tc qdisc add dev eth0 root handle 1: htb default 1fff");
970 if (execute_cmd(cmd)) {
973 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
974 sprintf(cmd, "tc class add dev eth0 parent 1: classid 1:1 htb rate 1000mbit ceil 1000mbit");
975 if (execute_cmd(cmd)) {
978 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
980 /* Add back 1:10. (Nodelimit : Megabits/sec -> bits/second)*/
/* NOTE(review): units mismatch. init_drl() stores limiter.nodelimit in
 * bytes/second (nodelimit.u.value * 1000000.0 / 8.0). Here it is multiplied
 * by 1024 * 1024 and emitted as "bit", i.e. treated as Mbit/s, so the ceil
 * ends up far above the configured limit. The product can also overflow a
 * 32-bit unsigned long. Confirm the intended units. */
981 if (limiter.nodelimit) {
982 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil %lubit",
983 (unsigned long) limiter.nodelimit * 1024 * 1024);
985 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil 1000mbit");
988 if (execute_cmd(cmd)) {
991 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
994 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
996 if (execute_cmd(cmd)) {
999 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
/* One class per machine identity, in array order.
 * NOTE(review): new_identity() sets ident->limit to
 * config->limit * 1000000.0 / 8.0 (bytes/second), yet it is scaled by
 * 1024 * 1024 here as if it were Mbit/s. This is the same units and overflow
 * concern as the 1:10 class above; it applies to the set loop below as well. */
1003 for (i = 0; i < instance->machine_count; ++i) {
1004 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit",
1005 instance->machines[i]->htb_parent,
1006 instance->machines[i]->htb_node,
1007 (unsigned long) instance->machines[i]->limit * 1024 * 1024);
1009 if (execute_cmd(cmd)) {
1012 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
/* One class per set identity. Sets are walked in reverse array order, the
 * same order assign_htb_hierarchy() numbers them, presumably so a parent
 * class exists before its children (confirm). */
1016 for (j = (instance->set_count - 1); j >= 0; --j) {
1017 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit",
1018 instance->sets[j]->htb_parent,
1019 instance->sets[j]->htb_node,
1020 (unsigned long) instance->sets[j]->limit * 1024 * 1024);
1022 if (execute_cmd(cmd)) {
1025 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1028 /* Add leaves. FIXME: Set static sliver limit as ceil here! */
/* Leaf classids are built by prefixing '1' (or '2' for the 1:20 subtree) to
 * the leaf's xid in hex. The ceil is currently a fixed 100 * 1024 * 1024 bit.
 * NOTE(review): tc minor ids are at most 4 hex digits, so this assumes
 * xid <= 0xfff (confirm). */
1029 for (k = 0; k < instance->leaf_count; ++k) {
1030 if (instance->leaves[k].parent == NULL) {
1031 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1%x htb rate 8bit ceil %lubit",
1032 instance->leaves[k].xid,
1033 (unsigned long) 100 * 1024 * 1024);
1035 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1%x htb rate 8bit ceil %lubit",
1036 instance->leaves[k].parent->htb_node,
1037 instance->leaves[k].xid,
1038 (unsigned long) 100 * 1024 * 1024);
1041 if (execute_cmd(cmd)) {
1044 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1046 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2%x htb rate 8bit ceil 1000mbit",
1047 instance->leaves[k].xid);
1049 if (execute_cmd(cmd)) {
1052 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1055 /* Add 1:1000 and 1:2000 */
/* 1:1000 hangs off the last machine's class when one exists, else off 1:10. */
1056 if (instance->last_machine == NULL) {
1057 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1000 htb rate 8bit ceil 1000mbit");
1059 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1000 htb rate 8bit ceil 1000mbit",
1060 instance->last_machine->htb_node);
1063 if (execute_cmd(cmd)) {
1066 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1067 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2000 htb rate 8bit ceil 1000mbit");
1069 if (execute_cmd(cmd)) {
1072 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1074 /* Add 1:1fff and 1:2fff */
/* 1:1fff is the root qdisc's default class ("default 1fff" above). */
1075 if (instance->last_machine == NULL) {
1076 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1fff htb rate 8bit ceil 1000mbit");
1078 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1fff htb rate 8bit ceil 1000mbit",
1079 instance->last_machine->htb_node);
1082 if (execute_cmd(cmd)) {
1085 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1086 sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2fff htb rate 8bit ceil 1000mbit");
1088 if (execute_cmd(cmd)) {
1091 printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1094 /* Only for artificial delay testing. */
/* NOTE(review): 1:11f9 and 1:11fa are the leaf classes for the hard-coded
 * xids 0x1f9 and 0x1fa. Confirm these netem delay commands are disabled in
 * production builds. */
1095 sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1098 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
1100 sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11f9 handle 11f9 pfifo");
1103 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11f9 handle 11f9 netem loss 0 delay 40ms");
1105 sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11fa handle 11fa pfifo");
1108 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms");
1110 /* End delay testing */
/* Replace the pfifo qdisc under 1:1000 with sfq (perturb 20). */
1116 sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1119 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 sfq perturb 20");
1126 static int setup_tc_grd(drl_instance_t *instance) {
1130 for (i = 0; i < instance->leaf_count; ++i) {
1131 /* Delete the old pfifo qdisc that might have been there before. */
1132 sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo",
1133 instance->leaves[i].xid, instance->leaves[i].xid);
1135 if (execute_cmd(cmd)) {
1136 printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1139 /* Add the netem qdisc. */
1141 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms",
1142 instance->leaves[i].xid, instance->leaves[i].xid);
1144 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
1145 instance->leaves[i].xid, instance->leaves[i].xid);
1148 if (execute_cmd(cmd)) {
1153 /* Do the same for 1000 and 1fff. */
1154 sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1156 if (execute_cmd(cmd)) {
1157 printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1160 /* Add the netem qdisc. */
1162 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
1164 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
1167 if (execute_cmd(cmd)) {
1171 sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
1173 if (execute_cmd(cmd)) {
1174 printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1177 /* Add the netem qdisc. */
1179 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms");
1181 sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
1184 if (execute_cmd(cmd)) {
1193 * Initialize this limiter with options
1194 * Open UDP socket for peer communication
1196 static int init_drl(void) {
1197 parsed_configs configs;
1198 struct sockaddr_in server_address;
1200 memset(&limiter, 0, sizeof(limiter_t));
1202 /* Setup logging. */
1203 system_loglevel = (uint8_t) drl_loglevel.u.value;
1204 logfile = fopen(drl_logfile.u.string, "w");
1206 if (logfile == NULL) {
1207 printf("Couldn't open logfile - ");
1212 printlog(LOG_CRITICAL, "ulogd_DRL initializing . . .\n");
1214 limiter.nodelimit = (uint32_t) (((double) nodelimit.u.value * 1000000.0) / 8.0);
1216 init_hashing(); /* for all hash maps */
1218 pthread_rwlock_init(&limiter.limiter_lock,NULL);
1220 /* determine our local IP by iterating through interfaces */
1221 if ((limiter.ip = get_local_ip())==0) {
1222 printlog(LOG_CRITICAL,
1223 "ulogd_DRL unable to aquire local IP address, not registering.\n");
1226 limiter.localaddr = inet_addr(limiter.ip);
1227 limiter.port = htons(LIMITER_LISTEN_PORT);
1228 limiter.udp_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
1229 if (limiter.udp_socket < 0) {
1230 printlog(LOG_CRITICAL, "Failed to create UDP socket().\n");
1234 memset(&server_address, 0, sizeof(server_address));
1235 server_address.sin_family = AF_INET;
1236 server_address.sin_addr.s_addr = limiter.localaddr;
1237 server_address.sin_port = limiter.port;
1239 if (bind(limiter.udp_socket, (struct sockaddr *) &server_address, sizeof(server_address)) < 0) {
1240 printlog(LOG_CRITICAL, "Failed to bind UDP socket.\n");
1244 printlog(LOG_WARN, " POLICY: %s\n",policy.u.string);
1245 if (strcasecmp(policy.u.string,"GRD") == 0) {
1246 limiter.policy = POLICY_GRD;
1247 } else if (strcasecmp(policy.u.string,"FPS") == 0) {
1248 limiter.policy = POLICY_FPS;
1250 printlog(LOG_CRITICAL,
1251 "Unknown DRL policy %s, aborting.\n",policy.u.string);
1255 limiter.estintms = estintms.u.value;
1256 if (limiter.estintms > 1000) {
1257 printlog(LOG_CRITICAL,
1258 "DRL: sorry estimate intervals must be less than 1 second.");
1259 printlog(LOG_CRITICAL,
1260 " Simple source mods will allow larger intervals. Using 1 second.\n");
1261 limiter.estintms = 1000;
1263 printlog(LOG_WARN, " Est interval: %dms\n",limiter.estintms);
1265 /* Acquire the big limiter lock for writing. Prevents pretty much
1266 * anything else from happening while the hierarchy is being changed. */
1267 pthread_rwlock_wrlock(&limiter.limiter_lock);
1269 limiter.stable_instance.ident_map = allocate_map();
1270 if (limiter.stable_instance.ident_map == NULL) {
1271 printlog(LOG_CRITICAL, "Failed to allocate memory for identity map.\n");
1275 if (get_eligible_leaves(&limiter.stable_instance)) {
1276 printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n");
1280 if (parse_drl_config(drl_configfile.u.string, &configs)) {
1281 /* Parse error occurred. Return non-zero from init_drl() to report the failure. */
1282 printlog(LOG_CRITICAL, "Failed to parse the DRL configuration file (%s).\n",
1283 drl_configfile.u.string);
1287 /* Validate identity hierarchy! */
1288 if (validate_configs(configs, &limiter.stable_instance)) {
1289 /* Clean up everything. */
1290 free_failed_config(configs, &limiter.stable_instance);
1291 printlog(LOG_CRITICAL, "Invalid DRL configuration file (%s).\n",
1292 drl_configfile.u.string);
1296 if (init_identities(configs, &limiter.stable_instance)) {
1297 free_failed_config(configs, &limiter.stable_instance);
1298 printlog(LOG_CRITICAL, "Failed to initialize identities.\n");
1302 /* At this point, we should be done with configs. */
1303 free_ident_list(configs.machines);
1304 free_ident_list(configs.sets);
1306 /* Debugging - FIXME: remove this? */
1307 print_instance(&limiter.stable_instance);
1309 switch (limiter.policy) {
1311 if (assign_htb_hierarchy(&limiter.stable_instance)) {
1312 free_instance(&limiter.stable_instance);
1313 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy.\n");
1317 if (create_htb_hierarchy(&limiter.stable_instance)) {
1318 free_instance(&limiter.stable_instance);
1319 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy.\n");
1325 if (setup_tc_grd(&limiter.stable_instance)) {
1326 free_instance(&limiter.stable_instance);
1327 printlog(LOG_CRITICAL, "Failed to initialize tc calls for GRD.\n");
1336 pthread_rwlock_unlock(&limiter.limiter_lock);
1338 if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) {
1339 printlog(LOG_CRITICAL, "Unable to start UDP receive thread.\n");
1343 printlog(LOG_WARN, "ulogd_DRL init finished.\n");
/*
 * reconfig - rebuild the DRL instance from the configuration file.
 *
 * Builds limiter.new_instance (ident_map, eligible leaves, parsed and
 * validated identities). If every step succeeds, it installs the new instance
 * under the limiter write lock (HTB or GRD tc setup, depending on
 * limiter.policy). It then frees the old limiter.stable_instance and copies
 * the new instance over it.
 *
 * Locking: limiter.limiter_lock is NOT held while the new configuration is
 * read, parsed and validated. It is write-locked only right before the
 * switch-over. Failure paths before that point must therefore not unlock
 * it, because unlocking an rwlock the thread does not hold is undefined
 * behavior.
 *
 * NOTE(review): this listing omits some source lines (e.g. the returns
 * after each error message). They are intentionally not reconstructed here.
 */
1348 static void reconfig() {
1349 parsed_configs configs;
1351 printlog(LOG_DEBUG, "--Starting reconfig()--\n");
/* Zero both structures. The error paths below pass them to
 * free_failed_config() even when they are only partially built. */
1354 memset(&configs, 0, sizeof(parsed_configs));
1355 memset(&limiter.new_instance, 0, sizeof(drl_instance_t));
1357 limiter.new_instance.ident_map = allocate_map();
1358 if (limiter.new_instance.ident_map == NULL) {
1359 printlog(LOG_CRITICAL, "Failed to allocate ident_map during reconfig().\n");
1363 if (get_eligible_leaves(&limiter.new_instance)) {
1364 free_failed_config(configs, &limiter.new_instance);
1365 printlog(LOG_CRITICAL, "Failed to read leaves during reconfig().\n");
1369 if (parse_drl_config(drl_configfile.u.string, &configs)) {
1370 free_failed_config(configs, &limiter.new_instance);
1371 printlog(LOG_CRITICAL, "Failed to parse config during reconfig().\n");
1375 if (validate_configs(configs, &limiter.new_instance)) {
1376 free_failed_config(configs, &limiter.new_instance);
1377 printlog(LOG_CRITICAL, "Validation failed during reconfig().\n");
/* limiter_lock has not been acquired yet (it is write-locked further
 * down), so it must not be unlocked here, just as on the failure paths
 * above. */
1382 if (init_identities(configs, &limiter.new_instance)) {
1383 free_failed_config(configs, &limiter.new_instance);
1384 printlog(LOG_CRITICAL, "Initialization failed during reconfig().\n");
/* As above: no unlock, because the lock is not held at this point. */
1389 free_ident_list(configs.machines);
1390 free_ident_list(configs.sets);
1392 /* Debugging - FIXME: remove this? */
1393 print_instance(&limiter.new_instance);
/* Only the switch-over itself needs exclusive access to the limiter. */
1396 pthread_rwlock_wrlock(&limiter.limiter_lock);
1398 switch (limiter.policy) {
1400 if (assign_htb_hierarchy(&limiter.new_instance)) {
1401 free_instance(&limiter.new_instance);
1402 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
1403 pthread_rwlock_unlock(&limiter.limiter_lock);
1407 if (create_htb_hierarchy(&limiter.new_instance)) {
1408 free_instance(&limiter.new_instance);
1409 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
1411 /* Re-create old instance. */
1412 if (create_htb_hierarchy(&limiter.stable_instance)) {
1413 /* Error reinstating the old one - big problem. */
1414 printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
1415 printlog(LOG_CRITICAL, "Giving up...\n");
1420 pthread_rwlock_unlock(&limiter.limiter_lock);
1426 if (setup_tc_grd(&limiter.new_instance)) {
1427 free_instance(&limiter.new_instance);
1428 printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n");
1430 /* Try to re-create old instance. */
1431 if (setup_tc_grd(&limiter.stable_instance)) {
1432 printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n");
1433 printlog(LOG_CRITICAL, "Giving up...\n");
/* NOTE(review): unlike the HTB branch, this listing shows no unlock of
 * limiter_lock on this failure path. Confirm the lock is released
 * before returning. */
1441 /* Should be impossible. */
1442 printf("Pigs are flying?\n");
1446 /* Switch over new to stable instance. */
1447 free_instance(&limiter.stable_instance);
/* Shallow struct copy: any pointers in new_instance are now shared with
 * stable_instance. new_instance is zeroed again at the start of the next
 * reconfig() call. */
1448 memcpy(&limiter.stable_instance, &limiter.new_instance, sizeof(drl_instance_t));
1450 /* Success! - Unlock */
1451 pthread_rwlock_unlock(&limiter.limiter_lock);
/*
 * Output-plugin descriptor for this module. _drl_reg_op() passes it to
 * ulogd's register_output(). .output names the callback (_output_drl)
 * that ulogd invokes for this plugin.
 */
1454 static ulog_output_t drl_op = {
1456 .output = &_output_drl,
/* ulogd's signal hook is not used. SIGHUP/SIGUSR1 are blocked and consumed
 * by the dedicated signal thread (signal_thread_func, via sigwait()). */
1457 .signal = NULL, /* This appears to be broken. Using my own handler. */
1462 /* Tests the amount of time it takes to call reconfig(). */
/*
 * Instrumentation helper (its call is commented out in the signal thread).
 * It measures the wall-clock time of `iterations` reconfig() calls with
 * gettimeofday() and prints the result as seconds plus microseconds.
 *
 * The difference is normalized, borrowing one second when end.tv_usec <
 * start.tv_usec, so the microsecond part is never negative. Both parts are
 * printed with %ld after an explicit cast, because time_t and suseconds_t
 * are not int on LP64 systems and passing them for %d is undefined behavior.
 */
1463 static void time_reconfig(int iterations) {
1464 struct timeval start, end;
1467 gettimeofday(&start, NULL);
1468 for (i = 0; i < iterations; ++i) {
1471 gettimeofday(&end, NULL);
long elapsed_sec = (long) (end.tv_sec - start.tv_sec);
long elapsed_usec = (long) (end.tv_usec - start.tv_usec);
if (elapsed_usec < 0) {
    /* Borrow a second so the microsecond field stays in [0, 999999]. */
    elapsed_sec -= 1;
    elapsed_usec += 1000000L;
}
1473 printf("%d reconfigs() took %ld seconds and %ld microseconds.\n",
1474 iterations, elapsed_sec, elapsed_usec);
1477 // Seems to take about 85ms / iteration
/*
 * stop_enforcement - relax the HTB classes of every identity in `instance`.
 *
 * For each machine identity, then each set identity, it issues
 *   /sbin/tc class change dev eth0 parent 1:<htb_parent> classid 1:<htb_node>
 *       htb rate 8bit ceil 100mbit
 * through execute_cmd(). signal_thread_func() calls it on the path that
 * logs "--Switching enforcement off.--".
 *
 * Returns an int status. This listing does not show what is returned when
 * execute_cmd() fails (presumably non-zero on the first failing command;
 * TODO confirm).
 *
 * NOTE(review): the device (eth0) and the 100mbit ceiling are hard-coded,
 * and sprintf() is unbounded. If cmd is a fixed-size array, prefer
 * snprintf(cmd, sizeof(cmd), ...).
 */
1480 static int stop_enforcement(drl_instance_t *instance) {
1484 for (i = 0; i < instance->machine_count; ++i) {
1485 sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1486 instance->machines[i]->htb_parent,
1487 instance->machines[i]->htb_node);
1489 if (execute_cmd(cmd)) {
/* Same tc command for every set identity. */
1494 for (i = 0; i < instance->set_count; ++i) {
1495 sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1496 instance->sets[i]->htb_parent,
1497 instance->sets[i]->htb_node);
1499 if (execute_cmd(cmd)) {
/*
 * signal_thread_func - dedicated thread that receives SIGHUP and SIGUSR1
 * synchronously with sigwait() rather than through asynchronous signal
 * handlers (ulogd's .signal hook in drl_op is left NULL).
 *
 * - SIGHUP: logs "Caught SIGHUP - re-reading XML file." and reloads the
 *   configuration. The reconfig() call itself is not visible in this
 *   listing; TODO confirm.
 * - The other branch, presumably SIGUSR1: runs under the limiter write lock.
 *   If do_enforcement is set, it calls stop_enforcement() on the stable
 *   instance and logs switching enforcement off. Otherwise it logs
 *   switching enforcement on. The do_enforcement updates are not visible
 *   here.
 *
 * `args` is not referenced by the visible lines.
 */
1507 static void *signal_thread_func(void *args) {
/* Block both signals in this thread as well, so they stay pending for
 * sigwait() instead of being delivered asynchronously. _drl_reg_op() also
 * blocks them before creating this thread. */
1513 sigaddset(&sigs, SIGHUP);
1514 sigaddset(&sigs, SIGUSR1);
1515 pthread_sigmask(SIG_BLOCK, &sigs, NULL);
/* Set of signals to wait for; apparently rebuilt before every wait. */
1519 sigaddset(&sigs, SIGHUP);
1520 sigaddset(&sigs, SIGUSR1);
/* sigwait() returns an error number directly; it does not set errno. */
1522 err = sigwait(&sigs, &sig);
1525 printlog(LOG_CRITICAL, "sigwait() returned an error.\n");
1531 printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n");
/* Benchmark hook: uncomment to time 1000 reconfig() calls. */
1533 //time_reconfig(1000); /* instrumentation */
/* The enforcement switch is performed under the limiter write lock. */
1537 pthread_rwlock_wrlock(&limiter.limiter_lock);
1538 if (do_enforcement) {
/* NOTE(review): stop_enforcement()'s return value is ignored, so a
 * failed tc command goes unreported here. */
1540 stop_enforcement(&limiter.stable_instance);
1541 printlog(LOG_CRITICAL, "--Switching enforcement off.--\n");
1544 printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1546 pthread_rwlock_unlock(&limiter.limiter_lock);
1549 /* Intentionally blank. */
1556 /* register output plugin with ulogd */
1557 static void _drl_reg_op(void)
1559 ulog_output_t *op = &drl_op;
1560 sigset_t signal_mask;
1562 sigemptyset(&signal_mask);
1563 sigaddset(&signal_mask, SIGHUP);
1564 sigaddset(&signal_mask, SIGUSR1);
1565 pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
1567 if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {
1568 printlog(LOG_CRITICAL, "Failed to create signal handling thread.\n");
1569 fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1575 printlog(LOG_CRITICAL, "Init failed. :(\n");
1576 fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1581 register_output(op);
1583 /* start up the thread that will periodically estimate the
1584 * local rate and set the local limits
1587 if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) {
1588 printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n");
1589 fprintf(stderr, "An error has occured starting ulogd_DRL. Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1596 /* have the opts parsed */
1597 config_parse_file("DRL", config_entries);
1600 ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
1604 /* Seed the hash function */
1605 salt = getpid() ^ time(NULL);