Turned enforcement on by default at startup. Added a ulogd.conf config option to...
[distributedratelimiting.git] / drl / ulogd_DRL.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * ulogd output target for DRL: GRD and FPS
5  *
6  * Ken Yocum <kyocum@cs.ucsd.edu>
7  *
8  * Original shell of this code from ulogd_NETFLOW
9  * Like that code, we keep track of per-slice data rates 
10  * out of this slice.  Thus we are rate limiting particular slices
11  * across multiple boxes, ensuring that their outbound rate does not
12  * exceed some fixed limit.  
13  *
14  * Fabric:  static mesh
15  *
16  * Enforcer: linux drop percentage.
17  *    
18  * This file reads packets from the netlink socket.  It updates all
19  * the hashmaps which track how much data has arrived per flow.
20  * It starts two threads for this limiter.
21  * One thread handles periodic estimation.
22  * The other thread handles communication with other limiters. 
23  *
24  * 
25  * Code map
26  *
27  * ulogd_DRL: attach to netlink socket, accept packets.  replaces ratelimit.cc
28  * util.c: generic hashing functions, flow comparisons, sundry items. 
29  * gossip.c: Recv gossip, send gossip.
30  * peer_comm.c: Thread to listen for updates from other limiters. 
31  * estimate.c: Thread to calculate the local limits. 
32  * 
33  * 
34  * Ken Yocum <kyocum@cs.ucsd.edu>
35  * 2007 
36  * 
37  * Some code appropriated from ulogd_NETFLOW:
38  *
39  * Mark Huang <mlhuang@cs.princeton.edu>
40  * Copyright (C) 2004-2005 The Trustees of Princeton University
41  *
42  * Based on admindump.pl by Mic Bowman and Paul Brett
43  * Copyright (c) 2002 Intel Corporation
44  *
45  */
46
47 /* Enable GNU glibc extensions */
48 #define _GNU_SOURCE
49
50 #include <stdio.h>
51 #include <stdlib.h>
52
53 /* va_start() and friends */
54 #include <stdarg.h>
55
56 /* ispunct() */
57 #include <ctype.h>
58
59 /* strstr() and friends */
60 #include <string.h>
61
62 /* dirname() and basename() */
63 #include <libgen.h>
64
65 /* fork() and wait() */
66 #include <sys/types.h>
67 #include <unistd.h>
68 #include <sys/wait.h>
69
70 /* errno and assert() */
71 #include <errno.h>
72 #include <assert.h>
73
74 /* getopt_long() */
75 #include <getopt.h>
76
77 /* time() and friends */
78 #include <time.h>
79 #include <sys/time.h>
80
81 /* inet_aton() */
82 #include <sys/socket.h>
83 #include <netinet/in.h>
84 #include <arpa/inet.h>
85
86 /* ICMP definitions */
87 #include <netinet/ip.h>
88 #include <netinet/ip_icmp.h>
89
90 /* stat() */
91 #include <sys/stat.h>
92
93 /* pthread_create() */
94 #include <pthread.h>
95
96 /* flock() */
97 #include <sys/file.h>
98
99 /* Signal definitions - so that we can catch SIGHUP and update config. */
100 #include <signal.h>
101
102 #include <ulogd/ulogd.h>
103 #include <ulogd/conffile.h>
104
105 /* Perhaps useful for files within vservers? */
106 #if !defined(STANDALONE) && HAVE_LIBPROPER
107 #include <proper/prop.h>
108 #endif
109
110 /*
111  * Jenkins hash support
112  * lives in raterouter.h
113  */
114
115 /* DRL specifics */
116 #include "raterouter.h"
117 #include "util.h"
118 #include "ratetypes.h" /* needs util and pthread.h */
119 #include "calendar.h"
120 #include "logging.h"
121
122 /*
123  * /etc/ulogd.conf configuration options
124  * Add the config options for DRL. 
125  */
126
127 static config_entry_t enforce_on = {
128     .next = NULL,
129     .key = "enforce_on",
130     .type = CONFIG_TYPE_INT,
131     .options = CONFIG_OPT_NONE,
132     .u = { .value = 1 },
133 };
134
135 static config_entry_t partition = {
136     .next = &enforce_on,
137     .key = "partition_set",
138     .type = CONFIG_TYPE_INT,
139     .options = CONFIG_OPT_NONE,
140     .u = { .value = 0xfffffff },
141 };
142
143 static config_entry_t netem_slice = {
144     .next = &partition,
145     .key = "netem_slice",
146     .type = CONFIG_TYPE_STRING,
147     .options = CONFIG_OPT_NONE,
148     .u = { .string = "ALL" },
149 };
150
151 static config_entry_t netem_loss = {
152     .next = &netem_slice,
153     .key = "netem_loss",
154     .type = CONFIG_TYPE_INT,
155     .options = CONFIG_OPT_NONE,
156     .u = { .value = 0 },
157 };
158
159 static config_entry_t netem_delay = {
160     .next = &netem_loss,
161     .key = "netem_delay",
162     .type = CONFIG_TYPE_INT,
163     .options = CONFIG_OPT_NONE,
164     .u = { .value = 0 },
165 };
166
167 static config_entry_t drl_configfile = {
168     .next = &netem_delay,
169     .key = "drl_configfile",
170     .type = CONFIG_TYPE_STRING,
171     .options = CONFIG_OPT_MANDATORY,
172     .u = { .string = "drl.xml" },
173 };
174
175 /** The administrative bandwidth limit (mbps) for the local node.  The node
176  * will not set a limit higher than this, even when distributed capacity is
177  * available.  Set to 0 for no limit. */
178 static config_entry_t nodelimit = {
179     .next = &drl_configfile,
180     .key = "nodelimit",
181     .type = CONFIG_TYPE_INT,
182     .options = CONFIG_OPT_MANDATORY,
183     .u = { .value = 0 },
184 };
185
186 /** Determines the verbosity of logging. */
187 static config_entry_t drl_loglevel = {
188     .next = &nodelimit,
189     .key = "drl_loglevel",
190     .type = CONFIG_TYPE_INT,
191     .options = CONFIG_OPT_MANDATORY,
192     .u = { .value = LOG_WARN },
193 };
194
195 /** The path of the logfile. */
196 static config_entry_t drl_logfile = {
197     .next = &drl_loglevel,
198     .key = "drl_logfile",
199     .type = CONFIG_TYPE_STRING,
200     .options = CONFIG_OPT_MANDATORY,
201     .u = { .string = "drl_logfile.log" },
202 };
203
204 /** The choice of DRL protocol. */
205 static config_entry_t policy = {
206     .next = &drl_logfile,
207     .key = "policy",
208     .type = CONFIG_TYPE_STRING,
209     .options = CONFIG_OPT_MANDATORY,
210     .u = { .string = "GRD" },
211 };
212
213 /** The estimate interval, in milliseconds. */
214 static config_entry_t estintms = {
215     .next = &policy,
216     .key = "estintms",
217     .type = CONFIG_TYPE_INT,
218     .options = CONFIG_OPT_MANDATORY,
219     .u = { .value = 100 },
220 };
221
222 #define config_entries (&estintms)
223
224 /*
225  * Debug functionality
226  */
227
228 #ifdef DMALLOC
229 #include <dmalloc.h>
230 #endif
231
232 #define NIPQUAD(addr) \
233     ((unsigned char *)&addr)[0], \
234     ((unsigned char *)&addr)[1], \
235     ((unsigned char *)&addr)[2], \
236     ((unsigned char *)&addr)[3]
237
238 #define IPQUAD(addr) \
239     ((unsigned char *)&addr)[3], \
240     ((unsigned char *)&addr)[2], \
241     ((unsigned char *)&addr)[1], \
242     ((unsigned char *)&addr)[0]
243
244
245
246 /* Salt for the hash functions */
247 static int salt;
248
249 /*
250  * Hash slice name lookups on context ID.
251  */
252
253 /* Special context IDs */
254 #define UNKNOWN_XID -1
255 #define ROOT_XID 0
256
257 enum {
258     CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
259     ICMP_ECHOREPLY_XID,
260     ICMP_UNREACH_XID,
261 };
262
263
264 /* globals */
265 pthread_t estimate_thread;
266 pthread_t signal_thread;
267 pthread_t comm_thread;
268 uint32_t local_ip = 0;
269 limiter_t limiter;
270 extern FILE *logfile;
271 extern uint8_t system_loglevel;
272 extern uint8_t do_enforcement;
273
274 /* From peer_comm.c - used to simulate partition. */
275 extern int do_partition;
276 extern int partition_set;
277
278 /* functions */
279
280 static inline uint32_t
281 hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max)
282 {
283     unsigned char mybytes[FLOWKEYSIZE];
284     mybytes[0] = protocol;
285     *(uint32_t*)(&(mybytes[1])) = src_ip;
286     *(uint32_t*)(&(mybytes[5])) = dst_ip;
287     *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port;
288     return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1);
289 }
290
291 uint32_t sampled_hasher(const key_flow *key) {
292     /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. */
293     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX);
294 }
295
296 uint32_t standard_hasher(const key_flow *key) {
297     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE);
298 }
299
300 uint32_t multiple_hasher(const key_flow *key) {
301     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE);
302 }
303
304 struct intr_id {
305     char* name;
306     ulog_iret_t *res;
307 };
308
309 /* Interesting keys */
310 enum {
311     OOB_TIME_SEC = 0,
312     OOB_MARK,
313     IP_SADDR,
314     IP_DADDR,
315     IP_TOTLEN,
316     IP_PROTOCOL,
317     TCP_SPORT,
318     TCP_DPORT,
319     TCP_ACK,
320     TCP_FIN,
321     TCP_SYN,
322     TCP_RST,
323     UDP_SPORT,
324     UDP_DPORT,
325     ICMP_TYPE,
326     ICMP_CODE,
327     GRE_FLAG_KEY,
328     GRE_VERSION,
329     GRE_KEY,
330     PPTP_CALLID,
331 };
332
333 #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0]))
334 static struct intr_id intr_ids[] = {
335     [OOB_TIME_SEC] = { "oob.time.sec", 0 },
336     [OOB_MARK] = { "oob.mark", 0 },
337     [IP_SADDR] = { "ip.saddr", 0 },
338     [IP_DADDR] = { "ip.daddr", 0 },
339     [IP_TOTLEN] = { "ip.totlen", 0 },
340     [IP_PROTOCOL] = { "ip.protocol", 0 },
341     [TCP_SPORT] = { "tcp.sport", 0 },
342     [TCP_DPORT] { "tcp.dport", 0 },
343     [TCP_ACK] = { "tcp.ack", 0 },
344     [TCP_FIN] = { "tcp.fin", 0 },
345     [TCP_SYN] = { "tcp.syn", 0 },
346     [TCP_RST] = { "tcp.rst", 0 },
347     [UDP_SPORT] = { "udp.sport", 0 },
348     [UDP_DPORT] = { "udp.dport", 0 },
349     [ICMP_TYPE] = { "icmp.type", 0 },
350     [ICMP_CODE] = { "icmp.code", 0 },
351     [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
352     [GRE_VERSION] = { "gre.version", 0 },
353     [GRE_KEY] = { "gre.key", 0 },
354     [PPTP_CALLID] = { "pptp.callid", 0 },
355 };
356
357 #define GET_VALUE(x) intr_ids[x].res->value
358
359 #define DATE(t) ((t) / (24*60*60) * (24*60*60))
360
361 static int _output_drl(ulog_iret_t *res)
362 {
363     int xid;
364     uint32_t src_ip, dst_ip;
365     uint16_t src_port, dst_port;
366     uint8_t protocol;
367
368     key_flow key;
369     identity_t *ident;
370     leaf_t *leaf;
371
372     protocol = GET_VALUE(IP_PROTOCOL).ui8;
373     src_ip = GET_VALUE(IP_SADDR).ui32;
374     dst_ip = GET_VALUE(IP_DADDR).ui32;
375     xid = GET_VALUE(OOB_MARK).ui32;
376
377     switch (protocol) {
378             
379         case IPPROTO_TCP:
380             src_port = GET_VALUE(TCP_SPORT).ui16;
381             dst_port = GET_VALUE(TCP_DPORT).ui16;
382             break;
383
384         case IPPROTO_UDP:
385             /* netflow had an issue with many udp flows and set
386              * src_port=0 to handle it.  We don't. 
387              */
388             src_port = GET_VALUE(UDP_SPORT).ui16;
389
390             /*
391              * traceroutes create a large number of flows in the db
392              * this is a quick hack to catch the most common form
393              * of traceroute (basically we're mapping any UDP packet
394              * in the 33435-33524 range to the "trace" port, 33524 is
395              * 3 packets * nhops (30).
396              */
397             dst_port = GET_VALUE(UDP_DPORT).ui16;
398             if (dst_port >= 33435 && dst_port <= 33524)
399                 dst_port = 33435;
400             break;
401
402         case IPPROTO_ICMP:
403             src_port = GET_VALUE(ICMP_TYPE).ui8;
404             dst_port = GET_VALUE(ICMP_CODE).ui8;
405
406             /*
407              * We special case some of the ICMP traffic that the kernel
408              * always generates. Since this is attributed to root, it 
409              * creates significant "noise" in the output. We want to be
410              * able to quickly see that root is generating traffic.
411              */
412             if (xid == ROOT_XID) {
413                 if (src_port == ICMP_ECHOREPLY)
414                     xid = ICMP_ECHOREPLY_XID;
415                 else if (src_port == ICMP_UNREACH)
416                     xid = ICMP_UNREACH_XID;
417             }
418             break;
419
420         case IPPROTO_GRE:
421             if (GET_VALUE(GRE_FLAG_KEY).b) {
422                 if (GET_VALUE(GRE_VERSION).ui8 == 1) {
423                     /* Get PPTP call ID */
424                     src_port = GET_VALUE(PPTP_CALLID).ui16;
425                 } else {
426                     /* XXX Truncate GRE keys to 16 bits */
427                     src_port = (uint16_t) GET_VALUE(GRE_KEY).ui32;
428                 }
429             } else {
430                 /* No key available */
431                 src_port = 0;
432             }
433             dst_port = 0;
434             break;
435
436         default:
437             /* This is the default key for packets from unsupported protocols */
438             src_port = 0;
439             dst_port = 0;
440             break;
441     }
442
443     key.protocol = protocol;
444     key.source_ip = src_ip;
445     key.dest_ip = dst_ip;
446     key.source_port = src_port;
447     key.dest_port = dst_port;
448     key.packet_size = GET_VALUE(IP_TOTLEN).ui16;
449     key.packet_time = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
450
451     pthread_rwlock_rdlock(&limiter.limiter_lock); /* CLUNK! */
452
453     leaf = (leaf_t *) map_search(limiter.stable_instance.leaf_map, &xid, sizeof(xid));
454
455     /* Even if the packet doesn't match any specific xid, it should still
456      * count in the machine-type tables.  This catches root (xid == 0) and
457      * unclassified (xid = fff) packets, which don't have map entries. */
458     if (leaf == NULL) {
459         ident = limiter.stable_instance.last_machine;
460     } else {
461         ident = leaf->parent;
462     }
463
464     while (ident) {
465         pthread_mutex_lock(&ident->table_mutex);
466
467         /* Update the identity's table. */
468         ident->table_sample_function(ident->table, &key);
469
470 #ifdef SHADOW_ACCTING
471
472         /* Update the shadow perfect copy of the accounting table. */
473         standard_table_sample((standard_flow_table) ident->shadow_table, &key);
474
475 #endif
476
477         pthread_mutex_unlock(&ident->table_mutex);
478
479         ident = ident->parent;
480     }
481
482     pthread_rwlock_unlock(&limiter.limiter_lock); /* CLINK! */
483
484     return 0;
485 }
486
487 /* get all key id's for the keys we are intrested in */
488 static int get_ids(void)
489 {
490     int i;
491     struct intr_id *cur_id;
492
493     for (i = 0; i < INTR_IDS; i++) {
494         cur_id = &intr_ids[i];
495         cur_id->res = keyh_getres(keyh_getid(cur_id->name));
496         if (!cur_id->res) {
497             ulogd_log(ULOGD_ERROR, 
498                     "Cannot resolve keyhash id for %s\n", 
499                     cur_id->name);
500             return 1;
501         }
502     }
503     return 0;
504 }
505
506 static void free_identity(identity_t *ident) {
507     if (ident) {
508         free_comm(&ident->comm);
509
510         if (ident->table) {
511             ident->table_destroy_function(ident->table);
512         }
513
514         if (ident->loop_action) {
515             ident->loop_action->valid = 0;
516         }
517
518         if (ident->comm_action) {
519             ident->comm_action->valid = 0;
520         }
521
522         pthread_mutex_destroy(&ident->table_mutex);
523
524         free(ident);
525     }
526 }
527
528 static void free_identity_map(map_handle map) {
529     identity_t *tofree = NULL;
530
531     map_reset_iterate(map);
532     while ((tofree = (identity_t *) map_next(map))) {
533         free_identity(tofree);
534     }
535
536     free_map(map, 0);
537 }
538
539 static void free_instance(drl_instance_t *instance) {
540     if (instance->leaves)
541         free(instance->leaves);
542     if (instance->leaf_map)
543         free_map(instance->leaf_map, 0);
544     if (instance->ident_map)
545         free_identity_map(instance->ident_map);
546     if (instance->machines)
547         free(instance->machines);
548     if (instance->sets)
549         free(instance->sets);
550
551     /* FIXME: Drain the calendar first and free all the entries. */
552     if (instance->cal) {
553         free(instance->cal);
554     }
555
556     memset(instance, 0, sizeof(drl_instance_t));
557 }
558
559 static void free_failed_config(parsed_configs configs, drl_instance_t *instance) {
560     /* Free configs. */
561     if (configs.machines)
562         free_ident_list(configs.machines);
563     if (configs.sets)
564         free_ident_list(configs.sets);
565
566     /* Free instance. */
567     if (instance)
568         free_instance(instance);
569 }
570
571 static identity_t *new_identity(ident_config *config) {
572     identity_t *ident = malloc(sizeof(identity_t));
573     remote_node_t *comm_nodes = malloc(sizeof(remote_node_t)*config->peer_count);
574     ident_peer *peer = config->peers;
575     int peer_slot = 0;
576
577     if (ident == NULL) {
578         return NULL;
579     }
580
581     if (comm_nodes == NULL) {
582         free(ident);
583         return NULL;
584     }
585
586     memset(ident, 0, sizeof(identity_t));
587     memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t));
588
589     ident->id = config->id;
590     ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0);
591     ident->fixed_ewma_weight = config->fixed_ewma_weight;
592     ident->communication_intervals = config->communication_intervals;
593     ident->mainloop_intervals = config->mainloop_intervals;
594     ident->ewma_weight = pow(ident->fixed_ewma_weight, 
595                              (limiter.estintms/1000.0) * config->mainloop_intervals);
596     ident->parent = NULL;
597     ident->independent = config->independent;
598
599     pthread_mutex_init(&ident->table_mutex, NULL);
600     switch (config->accounting) {
601         case ACT_STANDARD:
602             ident->table =
603                 standard_table_create(standard_hasher, &ident->common);
604
605             /* Ugly function pointer casting.  Makes things sufficiently
606              * generic, though. */
607             ident->table_sample_function =
608                 (int (*)(void *, const key_flow *)) standard_table_sample;
609             ident->table_cleanup_function =
610                 (int (*)(void *)) standard_table_cleanup;
611             ident->table_update_function =
612                 (void (*)(void *, struct timeval, double)) standard_table_update_flows;
613             ident->table_destroy_function =
614                 (void (*)(void *)) standard_table_destroy;
615             break;
616
617         case ACT_MULTIPLE:
618             ident->table =
619                 multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common);
620
621             ident->table_sample_function =
622                 (int (*)(void *, const key_flow *)) multiple_table_sample;
623             ident->table_cleanup_function =
624                 (int (*)(void *)) multiple_table_cleanup;
625             ident->table_update_function =
626                 (void (*)(void *, struct timeval, double)) multiple_table_update_flows;
627             ident->table_destroy_function =
628                 (void (*)(void *)) multiple_table_destroy;
629             break;
630
631         case ACT_SAMPLEHOLD:
632             ident->table = sampled_table_create(sampled_hasher,
633                     ident->limit * IDENT_CLEAN_INTERVAL,
634                     SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common);
635
636             ident->table_sample_function =
637                 (int (*)(void *, const key_flow *)) sampled_table_sample;
638             ident->table_cleanup_function =
639                 (int (*)(void *)) sampled_table_cleanup;
640             ident->table_update_function =
641                 (void (*)(void *, struct timeval, double)) sampled_table_update_flows;
642             ident->table_destroy_function =
643                 (void (*)(void *)) sampled_table_destroy;
644             break;
645
646         case ACT_SIMPLE:
647             ident->table = simple_table_create(&ident->common);
648
649             ident->table_sample_function =
650                 (int (*)(void *, const key_flow *)) simple_table_sample;
651             ident->table_cleanup_function =
652                 (int (*)(void *)) simple_table_cleanup;
653             ident->table_update_function =
654                 (void (*)(void *, struct timeval, double)) simple_table_update_flows;
655             ident->table_destroy_function =
656                 (void (*)(void *)) simple_table_destroy;
657             break;
658     }
659
660 #ifdef SHADOW_ACCTING
661
662     ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common);
663
664     if (ident->shadow_table == NULL) {
665         ident->table_destroy_function(ident->table);
666         free(ident);
667         return NULL;
668     }
669
670 #endif
671
672     /* Make sure the table was allocated. */
673     if (ident->table == NULL) {
674         free(ident);
675         return NULL;
676     }
677
678     while (peer) {
679         comm_nodes[peer_slot].addr = peer->ip;
680         comm_nodes[peer_slot].port = htons(LIMITER_LISTEN_PORT);
681         peer = peer->next;
682         peer_slot += 1;
683     }
684
685     if (new_comm(&ident->comm, config, comm_nodes)) {
686         printlog(LOG_CRITICAL, "Failed to create communication structure.\n");
687         return NULL;
688     }
689
690     ident->comm.remote_nodes = comm_nodes;
691
692     return ident;
693 }
694
695 /* Determines the validity of the parameters of one ident_config.
696  *
697  * 0 valid
698  * 1 invalid
699  */
700 static int validate_config(ident_config *config) {
701     /* Limit must be a positive integer. */
702     if (config->limit < 1) {
703         return 1;
704     }
705
706     /* Commfabric must be a valid choice (COMM_MESH or COMM_GOSSIP). */
707     if (config->commfabric != COMM_MESH &&
708             config->commfabric != COMM_GOSSIP) {
709         return 1;
710     }
711
712     /* If commfabric is COMM_GOSSIP, this must be a positive integer. */
713     if (config->commfabric == COMM_GOSSIP && config->branch < 1) {
714         return 1;
715     }
716
717     /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD,
718      * ACT_SIMPLE, ACT_MULTIPLE). */
719     if (config->accounting != ACT_STANDARD &&
720             config->accounting != ACT_SAMPLEHOLD &&
721             config->accounting != ACT_SIMPLE &&
722             config->accounting != ACT_MULTIPLE) {
723         return 1;
724     }
725
726     /* Ewma weight must be greater than or equal to zero. */
727     if (config->fixed_ewma_weight < 0) {
728         return 1;
729     }
730
731     /* Note: Parsing stage requires that each ident has at least one peer. */
732     return 0;
733 }
734
735 /* 0 success
736  * non-zero failure
737  */
738 static int validate_configs(parsed_configs configs, drl_instance_t *instance) {
739
740     ident_config *mlist = configs.machines;
741     ident_config *slist = configs.sets;
742     ident_config *tmp = NULL;
743     int i = 0;
744
745     while (mlist) {
746         /* ID must be non-zero and unique. */
747         /* This is ugly and hackish, but this function will be called rarely.
748          * I'm tired of trying to be clever. */
749         if (mlist->id < 0) {
750             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", mlist->id, mlist->id);
751             return EINVAL;
752         }
753         tmp = mlist->next;
754         while (tmp) {
755             if (mlist->id == tmp->id) {
756                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
757                 return EINVAL;
758             }
759             tmp = tmp->next;
760         }
761         tmp = configs.sets;
762         while (tmp) {
763             if (mlist->id == tmp->id) {
764                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
765                 return EINVAL;
766             }
767             tmp = tmp->next;
768         }
769
770         if (validate_config(mlist)) {
771             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", mlist->id, mlist->id);
772             return EINVAL;
773         }
774
775         if (mlist->independent) {
776             printlog(LOG_CRITICAL, "Makes no sense to have independent machine node - setting independent to false.\n");
777             mlist->independent = 0;
778         }
779
780         mlist = mlist->next;
781     }
782
783     instance->sets = malloc(configs.set_count * sizeof(identity_t *));
784     if (instance->sets == NULL) {
785         printlog(LOG_CRITICAL, "Not enough memory to allocate set identity collection.\n");
786         return ENOMEM;
787     }
788
789     memset(instance->sets, 0, configs.set_count * sizeof(identity_t *));
790     instance->set_count = configs.set_count;
791
792     /* For sets, make sure that the hierarchy is valid. */
793     while (slist) {
794         ident_member *members = slist->members;
795
796         /* ID must be non-zero and unique. */
797         if (slist->id < 0) {
798             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", slist->id, slist->id);
799             return EINVAL;
800         }
801         tmp = slist->next;
802         while (tmp) {
803             if (slist->id == tmp->id) {
804                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", slist->id, slist->id);
805                 return EINVAL;
806             }
807             tmp = tmp->next;
808         }
809
810         if (validate_config(slist)) {
811             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", slist->id, slist->id);
812             return EINVAL;
813         }
814
815         /* Allocate an identity_t for this set-type identity. */
816         instance->sets[i] = new_identity(slist);
817
818         if (instance->sets[i] == NULL) {
819             printlog(LOG_CRITICAL, "Not enough memory to allocate set identity.\n");
820             return ENOMEM;
821         }
822
823         /* Loop through children and look up each in leaf or ident map
824          * depending on the type of child.  Set the child's parent pointer
825          * to the identity we just created above, unless it is already set,
826          * in which case we have an error. */
827         while (members) {
828             identity_t *child_ident = NULL;
829             leaf_t *child_leaf = NULL;
830
831             switch (members->type) {
832                 case MEMBER_XID:
833                     child_leaf = map_search(instance->leaf_map, &members->value,
834                                             sizeof(members->value));
835                     if (child_leaf == NULL) {
836                         printlog(LOG_CRITICAL, "xid: child leaf not found.\n");
837                         return EINVAL;
838                     }
839                     if (child_leaf->parent != NULL) {
840                         /* Error - This leaf already has a parent. */
841                         printlog(LOG_CRITICAL, "xid: child already has a parent.\n");
842                         return EINVAL;
843                     }
844                     child_leaf->parent = instance->sets[i];
845                     break;
846                 case MEMBER_GUID:
847                     child_ident = map_search(instance->ident_map, &members->value,
848                                              sizeof(members->value));
849                     if (child_ident == NULL) {
850                         printlog(LOG_CRITICAL, "guid: child identity not found.\n");
851                         return EINVAL;
852                     }
853                     if (child_ident->parent != NULL) {
854                         /* Error - This identity already has a parent. */
855                         printlog(LOG_CRITICAL, "guid: child identity already has a parent.\n");
856                         return EINVAL;
857                     }
858                     child_ident->parent = instance->sets[i];
859                     break;
860                 default:
861                     /* Error - shouldn't be possible. */
862                     break;
863             }
864             members = members->next;
865         }
866
867         map_insert(instance->ident_map, &instance->sets[i]->id,
868                    sizeof(instance->sets[i]->id), instance->sets[i]);
869
870         slist = slist->next;
871         i++;
872     }
873     return 0;
874 }
875
876 static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) {
877     int count = 0;
878     identity_t *current_ident;
879     leaf_t *current_leaf;
880     leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *));
881     if (leaves == NULL) {
882         return 1;
883     }
884
885     map_reset_iterate(instance->leaf_map);
886     while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) {
887         current_ident = current_leaf->parent;
888         while (current_ident != NULL && current_ident != instance->last_machine) {
889             if (current_ident == ident) {
890                 /* Found the ident we were looking for - add the leaf. */
891                 leaves[count] = current_leaf;
892                 count += 1;
893                 break;
894             }
895             current_ident = current_ident->parent;
896         }
897     }
898
899     ident->leaves = leaves;
900     ident->leaf_count = count;
901
902     return 0;
903 }
904
905 static int init_identities(parsed_configs configs, drl_instance_t *instance) {
906     int i, j;
907     ident_config *config = configs.machines;
908     leaf_t *leaf = NULL;
909
910     instance->cal = malloc(sizeof(struct ident_calendar) * SCHEDLEN);
911
912     if (instance->cal == NULL) {
913         return ENOMEM;
914     }
915
916     for (i = 0; i < SCHEDLEN; ++i) {
917         TAILQ_INIT(instance->cal + i);
918     }
919     instance->cal_slot = 0;
920
921     instance->machines = malloc(configs.machine_count * sizeof(drl_instance_t *));
922
923     if (instance->machines == NULL) {
924         return ENOMEM;
925     }
926
927     memset(instance->machines, 0, configs.machine_count * sizeof(drl_instance_t *));
928     instance->machine_count = configs.machine_count;
929
930     /* Allocate and add the machine identities. */
931     for (i = 0; i < configs.machine_count; ++i) {
932         identity_action *loop_action;
933         identity_action *comm_action;
934         instance->machines[i] = new_identity(config);
935
936         if (instance->machines[i] == NULL) {
937             return ENOMEM;
938         }
939
940         loop_action = malloc(sizeof(identity_action));
941         comm_action = malloc(sizeof(identity_action));
942
943         if (loop_action == NULL || comm_action == NULL) {
944             return ENOMEM;
945         }
946
947         /* The first has no parent - it is the root.  All others have the
948          * previous ident as their parent. */
949         if (i == 0) {
950             instance->machines[i]->parent = NULL;
951         } else {
952             instance->machines[i]->parent = instance->machines[i - 1];
953         }
954
955         instance->last_machine = instance->machines[i];
956
957         /* Add the ident to the guid->ident map. */
958         map_insert(instance->ident_map, &instance->machines[i]->id,
959                    sizeof(instance->machines[i]->id), instance->machines[i]);
960
961         config = config->next;
962
963         memset(loop_action, 0, sizeof(identity_action));
964         memset(comm_action, 0, sizeof(identity_action));
965         loop_action->ident = instance->machines[i];
966         loop_action->action = ACTION_MAINLOOP;
967         loop_action->valid = 1;
968         comm_action->ident = instance->machines[i];
969         comm_action->action = ACTION_COMMUNICATE;
970         comm_action->valid = 1;
971
972         instance->machines[i]->loop_action = loop_action;
973         instance->machines[i]->comm_action = comm_action;
974
975         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
976                           loop_action, calendar);
977
978         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
979                           comm_action, calendar);
980
981         /* Setup the array of pointers to leaves.  This is easy for machines
982          * because a machine node applies to every leaf. */
983         instance->machines[i]->leaves =
984             malloc(instance->leaf_count * sizeof(leaf_t *));
985         if (instance->machines[i]->leaves == NULL) {
986             return ENOMEM;
987         }
988         instance->machines[i]->leaf_count = instance->leaf_count;
989         for (j = 0; j < instance->leaf_count; ++j) {
990             instance->machines[i]->leaves[j] = &instance->leaves[j];
991         }
992     }
993
994     /* Connect the set subtree to the machines. Any set or leaf without a
995      * parent will take the last machine as its parent. */
996
997     /* Leaves... */
998     map_reset_iterate(instance->leaf_map);
999     while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
1000         if (leaf->parent == NULL) {
1001             leaf->parent = instance->last_machine;
1002         }
1003     }
1004
1005     /* Sets... */
1006     for (i = 0; i < instance->set_count; ++i) {
1007         identity_action *loop_action;
1008         identity_action *comm_action;
1009
1010         if (instance->sets[i]->parent == NULL && instance->sets[i]->independent == 0) {
1011             instance->sets[i]->parent = instance->last_machine;
1012         }
1013
1014         loop_action = malloc(sizeof(identity_action));
1015         comm_action = malloc(sizeof(identity_action));
1016
1017         if (loop_action == NULL || comm_action == NULL) {
1018             return ENOMEM;
1019         }
1020
1021         memset(loop_action, 0, sizeof(identity_action));
1022         memset(comm_action, 0, sizeof(identity_action));
1023         loop_action->ident = instance->sets[i];
1024         loop_action->action = ACTION_MAINLOOP;
1025         loop_action->valid = 1;
1026         comm_action->ident = instance->sets[i];
1027         comm_action->action = ACTION_COMMUNICATE;
1028         comm_action->valid = 1;
1029
1030         instance->sets[i]->loop_action = loop_action;
1031         instance->sets[i]->comm_action = comm_action;
1032
1033         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1034                           loop_action, calendar);
1035
1036         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1037                           comm_action, calendar);
1038
1039         /* Setup the array of pointers to leaves.  This is harder for sets,
1040          * but this doesn't need to be super-efficient because it happens
1041          * rarely and it isn't on the critical path for reconfig(). */
1042         if (fill_set_leaf_pointer(instance, instance->sets[i])) {
1043             return ENOMEM;
1044         }
1045     }
1046
1047     /* Success. */
1048     return 0;
1049 }
1050
1051 static void print_instance(drl_instance_t *instance) {
1052     leaf_t *leaf = NULL;
1053     identity_t *ident = NULL;
1054
1055     if (system_loglevel == LOG_DEBUG) {
1056         map_reset_iterate(instance->leaf_map);
1057         while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
1058             printf("%x:", leaf->xid);
1059             ident = leaf->parent;
1060             while (ident) {
1061                 printf("%d:",ident->id);
1062                 ident = ident->parent;
1063             }
1064             printf("Leaf's parent pointer is %p\n", leaf->parent);
1065         }
1066
1067         printf("instance->last_machine is %p\n", instance->last_machine);
1068     }
1069 }
1070
1071 static int assign_htb_hierarchy(drl_instance_t *instance) {
1072     int i, j;
1073     int next_node = 0x100;
1074
1075     /* Chain machine nodes under 1:10. */
1076     for (i = 0; i < instance->machine_count; ++i) {
1077         if (instance->machines[i]->parent == NULL) {
1078             /* Top node. */
1079             instance->machines[i]->htb_parent = 0x10;
1080         } else {
1081             /* Pointerific! */
1082             instance->machines[i]->htb_parent =
1083                 instance->machines[i]->parent->htb_node;
1084         }
1085
1086         instance->machines[i]->htb_node = next_node;
1087         next_node += 1;
1088     }
1089
1090     next_node += 0x10;
1091
1092     /* Add set nodes under machine nodes. Iterate backwards to ensure parent is
1093      * already there. */
1094     for (j = (instance->set_count - 1); j >= 0; --j) {
1095         if (instance->sets[j]->parent == NULL) {
1096             /* Independent node - goes under 0x10 away from machine nodes. */
1097             instance->sets[j]->htb_parent = 0x10;
1098         } else {
1099             instance->sets[j]->htb_parent = instance->sets[j]->parent->htb_node;
1100         }
1101         instance->sets[j]->htb_node = next_node;
1102
1103         next_node += 1;
1104     }
1105
1106     return 0;
1107 }
1108
1109 /* Added this so that I could comment one line and kill off all of the
1110  * command execution. */
1111 static inline int execute_cmd(const char *cmd) {
1112     return system(cmd);
1113 }
1114
1115 static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor,
1116                                const uint32_t classid_major, const uint32_t classid_minor,
1117                                const uint64_t rate, const uint64_t ceil) {
1118     char cmd[300];
1119
1120     sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit",
1121             iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil);
1122     printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd);
1123
1124     return execute_cmd(cmd);
1125 }
1126
1127 static inline int add_htb_netem(const char *iface, const uint32_t parent_major,
1128                                 const uint32_t parent_minor, const uint32_t handle,
1129                                 const int loss, const int delay) {
1130     char cmd[300];
1131
1132     sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
1133             parent_minor, handle);
1134     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1135     if (execute_cmd(cmd))
1136         printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n");
1137
1138     sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms",
1139             iface, parent_major, parent_minor, handle, loss, delay);
1140     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1141     return execute_cmd(cmd);
1142 }
1143
1144 static int create_htb_hierarchy(drl_instance_t *instance) {
1145     char cmd[300];
1146     int i, j, k;
1147     uint64_t gigabit = 1024 * 1024 * 1024;
1148
1149     /* Nuke the hierarchy. */
1150     sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb");
1151     execute_cmd(cmd);
1152     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1153
1154     /* Re-initialize the basics. */
1155     sprintf(cmd, "tc qdisc add dev eth0 root handle 1: htb default 1fff");
1156     if (execute_cmd(cmd)) {
1157         return 1;
1158     }
1159     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1160
1161     if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit))
1162         return 1;
1163
1164     /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/
1165     if (limiter.nodelimit) {
1166         if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024))
1167             return 1;
1168     } else {
1169         if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit))
1170             return 1;
1171     }
1172
1173     /* Add machines. */
1174     for (i = 0; i < instance->machine_count; ++i) {
1175         if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1,
1176                          instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) {
1177             return 1;
1178         }
1179     }
1180
1181 #define LIMITEXEMPT
1182
1183     /* Add back 1:20. */
1184 #ifdef LIMITEXEMPT
1185     if (instance->last_machine == NULL) {
1186         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1187     } else {
1188         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:20 htb rate 8bit ceil 1000mbit",
1189             instance->last_machine->htb_node);
1190     }
1191 #else
1192     sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1193 #endif
1194
1195     if (execute_cmd(cmd)) {
1196         return 1;
1197     }
1198     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1199
1200     /* Add sets. */
1201     for (j = (instance->set_count - 1); j >= 0; --j) {
1202         if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1,
1203                          instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) {
1204             return 1;
1205         }
1206     }
1207
1208     /* Add leaves. FIXME: Set static sliver limit as ceil here! */
1209     for (k = 0; k < instance->leaf_count; ++k) {
1210         if (instance->leaves[k].parent == NULL) {
1211             if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1212                 return 1;
1213         } else {
1214             if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1215                 return 1;
1216         }
1217
1218         /* Add exempt node for the leaf under 1:20 as 1:2<xid> */
1219         if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit))
1220             return 1;
1221     }
1222
1223     /* Add 1:1000 and 1:2000 */
1224     if (instance->last_machine == NULL) {
1225         if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit))
1226             return 1;
1227     } else {
1228         if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit))
1229             return 1;
1230     }
1231
1232     if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit))
1233         return 1;
1234
1235     /* Add 1:1fff and 1:2fff */
1236     if (instance->last_machine == NULL) {
1237         if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit))
1238             return 1;
1239     } else {
1240         if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit))
1241             return 1;
1242     }
1243
1244     if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit))
1245         return 1;
1246
1247     /* Artifical delay or loss for experimentation. */
1248     if (netem_delay.u.value || netem_loss.u.value) {
1249         if (!strcmp(netem_slice.u.string, "ALL")) {
1250             /* By default, netem applies to all leaves. */
1251             if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value))
1252                 return 1;
1253             if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value))
1254                 return 1;
1255
1256             for (k = 0; k < instance->leaf_count; ++k) {
1257                 if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid),
1258                             (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) {
1259                     return 1;
1260                 }
1261
1262                 //FIXME: add exempt delay/loss here on 0x2000 ... ?
1263             }
1264         } else {
1265             /* netem_slice is not the default ALL value.  Only apply netem
1266              * to the slice that is set in netem_slice.u.string. */
1267             uint32_t slice_xid;
1268
1269             sscanf(netem_slice.u.string, "%x", &slice_xid);
1270
1271             if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value))
1272                 return 1;
1273         }
1274     }
1275
1276     return 0;
1277 }
1278
1279 static int setup_tc_grd(drl_instance_t *instance) {
1280     int i, j;
1281     char cmd[300];
1282
1283     for (i = 0; i < instance->leaf_count; ++i) {
1284         /* Delete the old pfifo qdisc that might have been there before. */
1285         sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo",
1286                 instance->leaves[i].xid, instance->leaves[i].xid);
1287
1288         if (execute_cmd(cmd)) {
1289             printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1290         }
1291
1292         /* Add the netem qdisc. */
1293         sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
1294                 instance->leaves[i].xid, instance->leaves[i].xid);
1295
1296         if (execute_cmd(cmd)) {
1297             printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1298             return 1;
1299         }
1300     }
1301
1302     /* Do the same for 1000 and 1fff. */
1303     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1304
1305     if (execute_cmd(cmd)) {
1306         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1307     }
1308
1309     /* Add the netem qdisc. */
1310     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
1311
1312     if (execute_cmd(cmd)) {
1313         printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1314         return 1;
1315     }
1316
1317     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
1318
1319     if (execute_cmd(cmd)) {
1320         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1321     }
1322
1323     /* Add the netem qdisc. */
1324     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
1325
1326     if (execute_cmd(cmd)) {
1327         printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1328         return 1;
1329     }
1330
1331     /* Artifical delay or loss for experimentation. */
1332     if (netem_delay.u.value || netem_loss.u.value) {
1333         if (!strcmp(netem_slice.u.string, "ALL")) {
1334             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1000 handle 1000 netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
1335             if (execute_cmd(cmd)) {
1336                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1337                 return 1;
1338             }
1339
1340             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1fff handle 1fff netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
1341             if (execute_cmd(cmd)) {
1342                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1343                 return 1;
1344             }
1345
1346             for (j = 0; j < instance->leaf_count; ++j) {
1347                 leaf_t *current = &instance->leaves[j];
1348
1349                 current->delay = netem_delay.u.value;
1350
1351                 sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", current->xid, current->xid, netem_loss.u.value, netem_delay.u.value);
1352
1353                 if (execute_cmd(cmd)) {
1354                     printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1355                     return 1;
1356                 }
1357             }
1358         } else {
1359             uint32_t slice_xid;
1360             leaf_t *leaf = NULL;
1361
1362             sscanf(netem_slice.u.string, "%x", &slice_xid);
1363
1364             leaf = (leaf_t *) map_search(instance->leaf_map, &slice_xid, sizeof(slice_xid));
1365
1366             if (leaf == NULL) {
1367                 /* Leaf not found - invalid selection. */
1368                 printf("Your experimental setup is incorrect...\n");
1369                 return 1;
1370             }
1371
1372             leaf->delay = netem_delay.u.value;
1373
1374             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value);
1375
1376             if (execute_cmd(cmd)) {
1377                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1378                 return 1;
1379             }
1380         }
1381     }
1382
1383     return 0;
1384 }
1385
1386 /* init_drl 
1387  * 
1388  * Initialize this limiter with options 
1389  * Open UDP socket for peer communication
1390  */
1391 static int init_drl(void) {
1392     parsed_configs configs;
1393     struct sockaddr_in server_address;
1394     
1395     memset(&limiter, 0, sizeof(limiter_t));
1396
1397     /* Setup logging. */
1398     system_loglevel = (uint8_t) drl_loglevel.u.value;
1399     logfile = fopen(drl_logfile.u.string, "w");
1400
1401     if (logfile == NULL) {
1402         printf("Couldn't open logfile - ");
1403         perror("fopen()");
1404         exit(EXIT_FAILURE);
1405     }
1406
1407     printlog(LOG_CRITICAL, "ulogd_DRL initializing . . .\n");
1408
1409     limiter.nodelimit = (uint32_t) (((double) nodelimit.u.value * 1000000.0) / 8.0);
1410
1411     init_hashing();  /* for all hash maps */
1412
1413     pthread_rwlock_init(&limiter.limiter_lock,NULL);
1414
1415     /* determine our local IP by iterating through interfaces */
1416     if ((limiter.ip = get_local_ip())==0) {
1417         printlog(LOG_CRITICAL,
1418                  "ulogd_DRL unable to aquire local IP address, not registering.\n");
1419         return (false);
1420     }
1421     limiter.localaddr = inet_addr(limiter.ip);
1422     limiter.port = htons(LIMITER_LISTEN_PORT);
1423     limiter.udp_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
1424     if (limiter.udp_socket < 0) {
1425         printlog(LOG_CRITICAL, "Failed to create UDP socket().\n");
1426         return false;
1427     }
1428
1429     memset(&server_address, 0, sizeof(server_address));
1430     server_address.sin_family = AF_INET;
1431     server_address.sin_addr.s_addr = limiter.localaddr;
1432     server_address.sin_port = limiter.port;
1433
1434     if (bind(limiter.udp_socket, (struct sockaddr *) &server_address, sizeof(server_address)) < 0) {
1435         printlog(LOG_CRITICAL, "Failed to bind UDP socket.\n");
1436         return false;
1437     }
1438
1439     printlog(LOG_WARN, "     POLICY: %s\n",policy.u.string);
1440     if (strcasecmp(policy.u.string,"GRD") == 0) {
1441         limiter.policy = POLICY_GRD;
1442     } else if (strcasecmp(policy.u.string,"FPS") == 0) {
1443         limiter.policy = POLICY_FPS;
1444     } else {
1445         printlog(LOG_CRITICAL,
1446                  "Unknown DRL policy %s, aborting.\n",policy.u.string);
1447         return (false);
1448     }
1449
1450     limiter.estintms = estintms.u.value;
1451     if (limiter.estintms > 1000) {
1452         printlog(LOG_CRITICAL,
1453                  "DRL: sorry estimate intervals must be less than 1 second.");
1454         printlog(LOG_CRITICAL,
1455                  "  Simple source mods will allow larger intervals.  Using 1 second.\n");
1456         limiter.estintms = 1000;
1457     }
1458     printlog(LOG_WARN, "     Est interval: %dms\n",limiter.estintms);
1459     
1460     /* Acquire the big limiter lock for writing.  Prevents pretty much
1461      * anything else from happening while the hierarchy is being changed. */
1462     pthread_rwlock_wrlock(&limiter.limiter_lock);
1463
1464     limiter.stable_instance.ident_map = allocate_map();
1465     if (limiter.stable_instance.ident_map == NULL) {
1466         printlog(LOG_CRITICAL, "Failed to allocate memory for identity map.\n");
1467         return false;
1468     }
1469
1470     if (get_eligible_leaves(&limiter.stable_instance)) {
1471         printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n");
1472         return false;
1473     }
1474
1475     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1476         /* Parse error occured. Return non-zero to notify init_drl(). */
1477         printlog(LOG_CRITICAL, "Failed to parse the DRL configuration file (%s).\n",
1478             drl_configfile.u.string);
1479         return false;
1480     }
1481
1482     /* Validate identity hierarchy! */
1483     if (validate_configs(configs, &limiter.stable_instance)) {
1484         /* Clean up everything. */
1485         free_failed_config(configs, &limiter.stable_instance);
1486         printlog(LOG_CRITICAL, "Invalid DRL configuration file (%s).\n",
1487             drl_configfile.u.string);
1488         return false;
1489     }
1490
1491     if (init_identities(configs, &limiter.stable_instance)) {
1492         free_failed_config(configs, &limiter.stable_instance);
1493         printlog(LOG_CRITICAL, "Failed to initialize identities.\n");
1494         return false;
1495     }
1496
1497     /* At this point, we should be done with configs. */
1498     free_ident_list(configs.machines);
1499     free_ident_list(configs.sets);
1500
1501     print_instance(&limiter.stable_instance);
1502
1503     switch (limiter.policy) {
1504         case POLICY_FPS:
1505             if (assign_htb_hierarchy(&limiter.stable_instance)) {
1506                 free_instance(&limiter.stable_instance);
1507                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy.\n");
1508                 return false;
1509             }
1510
1511             if (create_htb_hierarchy(&limiter.stable_instance)) {
1512                 free_instance(&limiter.stable_instance);
1513                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy.\n");
1514                 return false;
1515             }
1516         break;
1517
1518         case POLICY_GRD:
1519             if (setup_tc_grd(&limiter.stable_instance)) {
1520                 free_instance(&limiter.stable_instance);
1521                 printlog(LOG_CRITICAL, "Failed to initialize tc calls for GRD.\n");
1522                 return false;
1523             }
1524         break;
1525
1526         default:
1527         return false;
1528     }
1529
1530     partition_set = partition.u.value;
1531
1532     pthread_rwlock_unlock(&limiter.limiter_lock);
1533
1534     if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) {
1535         printlog(LOG_CRITICAL, "Unable to start UDP receive thread.\n");
1536         return false;
1537     }
1538
1539     printlog(LOG_WARN, "ulogd_DRL init finished.\n");
1540
1541     return true;
1542 }
1543
1544 static void reconfig() {
1545     parsed_configs configs;
1546
1547     printlog(LOG_DEBUG, "--Starting reconfig()--\n");
1548     flushlog();
1549
1550     memset(&configs, 0, sizeof(parsed_configs));
1551     memset(&limiter.new_instance, 0, sizeof(drl_instance_t));
1552
1553     limiter.new_instance.ident_map = allocate_map();
1554     if (limiter.new_instance.ident_map == NULL) {
1555         printlog(LOG_CRITICAL, "Failed to allocate ident_map during reconfig().\n");
1556         return;
1557     }
1558
1559     if (get_eligible_leaves(&limiter.new_instance)) {
1560         free_failed_config(configs, &limiter.new_instance);
1561         printlog(LOG_CRITICAL, "Failed to read leaves during reconfig().\n");
1562         return;
1563     }
1564
1565     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1566         free_failed_config(configs, &limiter.new_instance);
1567         printlog(LOG_CRITICAL, "Failed to parse config during reconfig().\n");
1568         return;
1569     }
1570
1571     if (validate_configs(configs, &limiter.new_instance)) {
1572         free_failed_config(configs, &limiter.new_instance);
1573         printlog(LOG_CRITICAL, "Validation failed during reconfig().\n");
1574         pthread_rwlock_unlock(&limiter.limiter_lock);
1575         return;
1576     }
1577
1578     if (init_identities(configs, &limiter.new_instance)) {
1579         free_failed_config(configs, &limiter.new_instance);
1580         printlog(LOG_CRITICAL, "Initialization failed during reconfig().\n");
1581         pthread_rwlock_unlock(&limiter.limiter_lock);
1582         return;
1583     }
1584
1585     free_ident_list(configs.machines);
1586     free_ident_list(configs.sets);
1587
1588     print_instance(&limiter.new_instance);
1589     
1590     /* Lock */
1591     pthread_rwlock_wrlock(&limiter.limiter_lock);
1592
1593     switch (limiter.policy) {
1594         case POLICY_FPS:
1595             if (assign_htb_hierarchy(&limiter.new_instance)) {
1596                 free_instance(&limiter.new_instance);
1597                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
1598                 pthread_rwlock_unlock(&limiter.limiter_lock);
1599                 return;
1600             }
1601
1602             if (create_htb_hierarchy(&limiter.new_instance)) {
1603                 free_instance(&limiter.new_instance);
1604                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
1605
1606                 /* Re-create old instance. */
1607                 if (create_htb_hierarchy(&limiter.stable_instance)) {
1608                     /* Error reinstating the old one - big problem. */
1609                     printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
1610                     printlog(LOG_CRITICAL, "Giving up...\n");
1611                     flushlog();
1612                     exit(EXIT_FAILURE);
1613                 }
1614
1615                 pthread_rwlock_unlock(&limiter.limiter_lock);
1616                 return;
1617             }
1618         break;
1619
1620         case POLICY_GRD:
1621             if (setup_tc_grd(&limiter.new_instance)) {
1622                 free_instance(&limiter.new_instance);
1623                 printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n");
1624
1625                 /* Try to re-create old instance. */
1626                 if (setup_tc_grd(&limiter.stable_instance)) {
1627                     printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n");
1628                     printlog(LOG_CRITICAL, "Giving up...\n");
1629                     flushlog();
1630                     exit(EXIT_FAILURE);
1631                 }
1632             }
1633         break;
1634
1635         default:
1636             /* Should be impossible. */
1637             printf("Pigs are flying?\n");
1638             exit(EXIT_FAILURE);
1639     }
1640
1641     /* Switch over new to stable instance. */
1642     free_instance(&limiter.stable_instance);
1643     memcpy(&limiter.stable_instance, &limiter.new_instance, sizeof(drl_instance_t));
1644
1645     /* Success! - Unlock */
1646     pthread_rwlock_unlock(&limiter.limiter_lock);
1647 }
1648
1649 static ulog_output_t drl_op = {
1650     .name = "drl",
1651     .output = &_output_drl,
1652     .signal = NULL, /* This appears to be broken. Using my own handler. */
1653     .init = NULL,
1654     .fini = NULL,
1655 };
1656
1657 /* Tests the amount of time it takes to call reconfig(). */
1658 static void time_reconfig(int iterations) {
1659     struct timeval start, end;
1660     int i;
1661
1662     gettimeofday(&start, NULL);
1663     for (i = 0; i < iterations; ++i) {
1664         reconfig();
1665     }
1666     gettimeofday(&end, NULL);
1667
1668     printf("%d reconfigs() took %d seconds and %d microseconds.\n",
1669            iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec);
1670     exit(0);
1671
1672     // Seems to take about 85ms / iteration
1673 }
1674
1675 static int stop_enforcement(drl_instance_t *instance) {
1676     char cmd[300];
1677     int i;
1678
1679     for (i = 0; i < instance->machine_count; ++i) {
1680         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1681                 instance->machines[i]->htb_parent,
1682                 instance->machines[i]->htb_node);
1683
1684         if (execute_cmd(cmd)) {
1685             return 1;
1686         }
1687     }
1688
1689     for (i = 0; i < instance->set_count; ++i) {
1690         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1691                 instance->sets[i]->htb_parent,
1692                 instance->sets[i]->htb_node);
1693
1694         if (execute_cmd(cmd)) {
1695             return 1;
1696         }
1697     }
1698
1699     return 0;
1700 }
1701
1702 static void *signal_thread_func(void *args) {
1703     int sig;
1704     int err;
1705     sigset_t sigs;
1706
1707     sigemptyset(&sigs);
1708     sigaddset(&sigs, SIGHUP);
1709     sigaddset(&sigs, SIGUSR1);
1710     sigaddset(&sigs, SIGUSR2);
1711     pthread_sigmask(SIG_BLOCK, &sigs, NULL);
1712
1713     while (1) {
1714         sigemptyset(&sigs);
1715         sigaddset(&sigs, SIGHUP);
1716         sigaddset(&sigs, SIGUSR1);
1717         sigaddset(&sigs, SIGUSR2);
1718
1719         err = sigwait(&sigs, &sig);
1720
1721         if (err) {
1722             printlog(LOG_CRITICAL, "sigwait() returned an error.\n");
1723             flushlog();
1724         }
1725
1726         switch (sig) {
1727             case SIGHUP:
1728                 printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n");
1729                 reconfig();
1730                 //time_reconfig(1000); /* instrumentation */
1731                 flushlog();
1732                 break;
1733             case SIGUSR1:
1734                 pthread_rwlock_wrlock(&limiter.limiter_lock);
1735                 if (do_enforcement) {
1736                     do_enforcement = 0;
1737                     stop_enforcement(&limiter.stable_instance);
1738                     printlog(LOG_CRITICAL, "--Switching enforcement off.--\n");
1739                 } else {
1740                     do_enforcement = 1;
1741                     printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1742                 }
1743                 pthread_rwlock_unlock(&limiter.limiter_lock);
1744                 break;
1745             case SIGUSR2:
1746                 do_partition = !do_partition;
1747                 break;
1748             default:
1749                 /* Intentionally blank. */
1750                 break;
1751         }
1752     }
1753
1754 }
1755
1756 /* register output plugin with ulogd */
1757 static void _drl_reg_op(void)
1758 {
1759     ulog_output_t *op = &drl_op;
1760     sigset_t signal_mask;
1761
1762     sigemptyset(&signal_mask);
1763     sigaddset(&signal_mask, SIGHUP);
1764     sigaddset(&signal_mask, SIGUSR1);
1765     sigaddset(&signal_mask, SIGUSR2);
1766     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
1767
1768     if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {
1769         printlog(LOG_CRITICAL, "Failed to create signal handling thread.\n");
1770         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1771         flushlog();
1772         exit(EXIT_FAILURE);
1773     }
1774
1775     if (!init_drl()) {
1776         printlog(LOG_CRITICAL, "Init failed. :(\n");
1777         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1778         flushlog();
1779         exit(EXIT_FAILURE);
1780     }
1781
1782     register_output(op);
1783
1784     /* start up the thread that will periodically estimate the
1785      * local rate and set the local limits
1786      * see estimate.c
1787      */
1788     if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) {
1789         printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n");
1790         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1791         exit(EXIT_FAILURE);
1792     }
1793
1794     if (enforce_on.u.value) {
1795         pthread_rwlock_wrlock(&limiter.limiter_lock);
1796         do_enforcement = 1;
1797         printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1798         pthread_rwlock_unlock(&limiter.limiter_lock);
1799     }
1800 }
1801
1802 void _init(void)
1803 {
1804     /* have the opts parsed */
1805     config_parse_file("DRL", config_entries);
1806
1807     if (get_ids()) {
1808         ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
1809         exit(2);
1810     }
1811
1812     /* Seed the hash function */
1813     salt = getpid() ^ time(NULL);
1814
1815     _drl_reg_op();
1816 }