Lots of changes. In no particular order:
[distributedratelimiting.git] / drl / ulogd_DRL.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * ulogd output target for DRL: GRD and FPS
5  *
6  * Ken Yocum <kyocum@cs.ucsd.edu>
7  *
8  * Original shell of this code from ulogd_NETFLOW
9  * Like that code, we keep track of per-slice data rates 
10  * out of this slice.  Thus we are rate limiting particular slices
11  * across multiple boxes, ensuring that their outbound rate does not
12  * exceed some fixed limit.  
13  *
14  * Fabric:  static mesh
15  *
16  * Enforcer: linux drop percentage.
17  *    
18  * This file reads packets from the netlink socket.  It updates all
19  * the hashmaps which track how much data has arrived per flow.
20  * It starts two threads for this limiter.
21  * One thread handles periodic estimation.
22  * The other thread handles communication with other limiters. 
23  *
24  * 
25  * Code map
26  *
27  * ulogd_DRL: attach to netlink socket, accept packets.  replaces ratelimit.cc
28  * util.c: generic hashing functions, flow comparisons, sundry items. 
29  * gossip.c: Recv gossip, send gossip.
30  * peer_comm.c: Thread to listen for updates from other limiters. 
31  * estimate.c: Thread to calculate the local limits. 
32  * 
33  * 
34  * Ken Yocum <kyocum@cs.ucsd.edu>
35  * 2007 
36  * 
37  * Some code appropriated from ulogd_NETFLOW:
38  *
39  * Mark Huang <mlhuang@cs.princeton.edu>
40  * Copyright (C) 2004-2005 The Trustees of Princeton University
41  *
42  * Based on admindump.pl by Mic Bowman and Paul Brett
43  * Copyright (c) 2002 Intel Corporation
44  *
45  */
46
47 /* Enable GNU glibc extensions */
48 #define _GNU_SOURCE
49
50 #include <stdio.h>
51 #include <stdlib.h>
52
53 /* va_start() and friends */
54 #include <stdarg.h>
55
56 /* ispunct() */
57 #include <ctype.h>
58
59 /* strstr() and friends */
60 #include <string.h>
61
62 /* dirname() and basename() */
63 #include <libgen.h>
64
65 /* fork() and wait() */
66 #include <sys/types.h>
67 #include <unistd.h>
68 #include <sys/wait.h>
69
70 /* errno and assert() */
71 #include <errno.h>
72 #include <assert.h>
73
74 /* getopt_long() */
75 #include <getopt.h>
76
77 /* time() and friends */
78 #include <time.h>
79 #include <sys/time.h>
80
81 /* inet_aton() */
82 #include <sys/socket.h>
83 #include <netinet/in.h>
84 #include <arpa/inet.h>
85
86 /* ICMP definitions */
87 #include <netinet/ip.h>
88 #include <netinet/ip_icmp.h>
89
90 /* stat() */
91 #include <sys/stat.h>
92
93 /* pthread_create() */
94 #include <pthread.h>
95
96 /* flock() */
97 #include <sys/file.h>
98
99 /* Signal definitions - so that we can catch SIGHUP and update config. */
100 #include <signal.h>
101
102 #include <ulogd/ulogd.h>
103 #include <ulogd/conffile.h>
104
105 /* Perhaps useful for files within vservers? */
106 #if !defined(STANDALONE) && HAVE_LIBPROPER
107 #include <proper/prop.h>
108 #endif
109
110 /*
111  * Jenkins hash support
112  * lives in raterouter.h
113  */
114
115 /* DRL specifics */
116 #include "raterouter.h"
117 #include "util.h"
118 #include "ratetypes.h" /* needs util and pthread.h */
119 #include "calendar.h"
120 #include "logging.h"
121
122 /*
123  * /etc/ulogd.conf configuration options
124  * Add the config options for DRL. 
125  */
126
127 static config_entry_t partition = {
128     .next = NULL,
129     .key = "partition_set",
130     .type = CONFIG_TYPE_INT,
131     .options = CONFIG_OPT_NONE,
132     .u = { .value = 0xfffffff },
133 };
134
135 static config_entry_t netem_slice = {
136     .next = &partition,
137     .key = "netem_slice",
138     .type = CONFIG_TYPE_STRING,
139     .options = CONFIG_OPT_NONE,
140     .u = { .string = "ALL" },
141 };
142
143 static config_entry_t netem_loss = {
144     .next = &netem_slice,
145     .key = "netem_loss",
146     .type = CONFIG_TYPE_INT,
147     .options = CONFIG_OPT_NONE,
148     .u = { .value = 0 },
149 };
150
151 static config_entry_t netem_delay = {
152     .next = &netem_loss,
153     .key = "netem_delay",
154     .type = CONFIG_TYPE_INT,
155     .options = CONFIG_OPT_NONE,
156     .u = { .value = 0 },
157 };
158
159 static config_entry_t drl_configfile = {
160     .next = &netem_delay,
161     .key = "drl_configfile",
162     .type = CONFIG_TYPE_STRING,
163     .options = CONFIG_OPT_MANDATORY,
164     .u = { .string = "drl.xml" },
165 };
166
167 /** The administrative bandwidth limit (mbps) for the local node.  The node
168  * will not set a limit higher than this, even when distributed capacity is
169  * available.  Set to 0 for no limit. */
170 static config_entry_t nodelimit = {
171     .next = &drl_configfile,
172     .key = "nodelimit",
173     .type = CONFIG_TYPE_INT,
174     .options = CONFIG_OPT_MANDATORY,
175     .u = { .value = 0 },
176 };
177
178 /** Determines the verbosity of logging. */
179 static config_entry_t drl_loglevel = {
180     .next = &nodelimit,
181     .key = "drl_loglevel",
182     .type = CONFIG_TYPE_INT,
183     .options = CONFIG_OPT_MANDATORY,
184     .u = { .value = LOG_WARN },
185 };
186
187 /** The path of the logfile. */
188 static config_entry_t drl_logfile = {
189     .next = &drl_loglevel,
190     .key = "drl_logfile",
191     .type = CONFIG_TYPE_STRING,
192     .options = CONFIG_OPT_MANDATORY,
193     .u = { .string = "drl_logfile.log" },
194 };
195
196 /** The choice of DRL protocol. */
197 static config_entry_t policy = {
198     .next = &drl_logfile,
199     .key = "policy",
200     .type = CONFIG_TYPE_STRING,
201     .options = CONFIG_OPT_MANDATORY,
202     .u = { .string = "GRD" },
203 };
204
205 /** The estimate interval, in milliseconds. */
206 static config_entry_t estintms = {
207     .next = &policy,
208     .key = "estintms",
209     .type = CONFIG_TYPE_INT,
210     .options = CONFIG_OPT_MANDATORY,
211     .u = { .value = 100 },
212 };
213
214 #define config_entries (&estintms)
215
216 /*
217  * Debug functionality
218  */
219
220 #ifdef DMALLOC
221 #include <dmalloc.h>
222 #endif
223
224 #define NIPQUAD(addr) \
225     ((unsigned char *)&addr)[0], \
226     ((unsigned char *)&addr)[1], \
227     ((unsigned char *)&addr)[2], \
228     ((unsigned char *)&addr)[3]
229
230 #define IPQUAD(addr) \
231     ((unsigned char *)&addr)[3], \
232     ((unsigned char *)&addr)[2], \
233     ((unsigned char *)&addr)[1], \
234     ((unsigned char *)&addr)[0]
235
236
237
238 /* Salt for the hash functions */
239 static int salt;
240
241 /*
242  * Hash slice name lookups on context ID.
243  */
244
245 /* Special context IDs */
246 #define UNKNOWN_XID -1
247 #define ROOT_XID 0
248
249 enum {
250     CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
251     ICMP_ECHOREPLY_XID,
252     ICMP_UNREACH_XID,
253 };
254
255
256 /* globals */
257 pthread_t estimate_thread;
258 pthread_t signal_thread;
259 pthread_t comm_thread;
260 uint32_t local_ip = 0;
261 limiter_t limiter;
262 extern FILE *logfile;
263 extern uint8_t system_loglevel;
264 extern uint8_t do_enforcement;
265
266 /* From peer_comm.c - used to simulate partition. */
267 extern int do_partition;
268 extern int partition_set;
269
270 /* functions */
271
272 static inline uint32_t
273 hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max)
274 {
275     unsigned char mybytes[FLOWKEYSIZE];
276     mybytes[0] = protocol;
277     *(uint32_t*)(&(mybytes[1])) = src_ip;
278     *(uint32_t*)(&(mybytes[5])) = dst_ip;
279     *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port;
280     return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1);
281 }
282
283 uint32_t sampled_hasher(const key_flow *key) {
284     /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. */
285     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX);
286 }
287
288 uint32_t standard_hasher(const key_flow *key) {
289     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE);
290 }
291
292 uint32_t multiple_hasher(const key_flow *key) {
293     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE);
294 }
295
296 struct intr_id {
297     char* name;
298     ulog_iret_t *res;
299 };
300
301 /* Interesting keys */
302 enum {
303     OOB_TIME_SEC = 0,
304     OOB_MARK,
305     IP_SADDR,
306     IP_DADDR,
307     IP_TOTLEN,
308     IP_PROTOCOL,
309     TCP_SPORT,
310     TCP_DPORT,
311     TCP_ACK,
312     TCP_FIN,
313     TCP_SYN,
314     TCP_RST,
315     UDP_SPORT,
316     UDP_DPORT,
317     ICMP_TYPE,
318     ICMP_CODE,
319     GRE_FLAG_KEY,
320     GRE_VERSION,
321     GRE_KEY,
322     PPTP_CALLID,
323 };
324
325 #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0]))
326 static struct intr_id intr_ids[] = {
327     [OOB_TIME_SEC] = { "oob.time.sec", 0 },
328     [OOB_MARK] = { "oob.mark", 0 },
329     [IP_SADDR] = { "ip.saddr", 0 },
330     [IP_DADDR] = { "ip.daddr", 0 },
331     [IP_TOTLEN] = { "ip.totlen", 0 },
332     [IP_PROTOCOL] = { "ip.protocol", 0 },
333     [TCP_SPORT] = { "tcp.sport", 0 },
334     [TCP_DPORT] { "tcp.dport", 0 },
335     [TCP_ACK] = { "tcp.ack", 0 },
336     [TCP_FIN] = { "tcp.fin", 0 },
337     [TCP_SYN] = { "tcp.syn", 0 },
338     [TCP_RST] = { "tcp.rst", 0 },
339     [UDP_SPORT] = { "udp.sport", 0 },
340     [UDP_DPORT] = { "udp.dport", 0 },
341     [ICMP_TYPE] = { "icmp.type", 0 },
342     [ICMP_CODE] = { "icmp.code", 0 },
343     [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
344     [GRE_VERSION] = { "gre.version", 0 },
345     [GRE_KEY] = { "gre.key", 0 },
346     [PPTP_CALLID] = { "pptp.callid", 0 },
347 };
348
349 #define GET_VALUE(x) intr_ids[x].res->value
350
351 #define DATE(t) ((t) / (24*60*60) * (24*60*60))
352
353 static int _output_drl(ulog_iret_t *res)
354 {
355     int xid;
356     uint32_t src_ip, dst_ip;
357     uint16_t src_port, dst_port;
358     uint8_t protocol;
359
360     key_flow key;
361     identity_t *ident;
362     leaf_t *leaf;
363
364     protocol = GET_VALUE(IP_PROTOCOL).ui8;
365     src_ip = GET_VALUE(IP_SADDR).ui32;
366     dst_ip = GET_VALUE(IP_DADDR).ui32;
367     xid = GET_VALUE(OOB_MARK).ui32;
368
369     switch (protocol) {
370             
371         case IPPROTO_TCP:
372             src_port = GET_VALUE(TCP_SPORT).ui16;
373             dst_port = GET_VALUE(TCP_DPORT).ui16;
374             break;
375
376         case IPPROTO_UDP:
377             /* netflow had an issue with many udp flows and set
378              * src_port=0 to handle it.  We don't. 
379              */
380             src_port = GET_VALUE(UDP_SPORT).ui16;
381
382             /*
383              * traceroutes create a large number of flows in the db
384              * this is a quick hack to catch the most common form
385              * of traceroute (basically we're mapping any UDP packet
386              * in the 33435-33524 range to the "trace" port, 33524 is
387              * 3 packets * nhops (30).
388              */
389             dst_port = GET_VALUE(UDP_DPORT).ui16;
390             if (dst_port >= 33435 && dst_port <= 33524)
391                 dst_port = 33435;
392             break;
393
394         case IPPROTO_ICMP:
395             src_port = GET_VALUE(ICMP_TYPE).ui8;
396             dst_port = GET_VALUE(ICMP_CODE).ui8;
397
398             /*
399              * We special case some of the ICMP traffic that the kernel
400              * always generates. Since this is attributed to root, it 
401              * creates significant "noise" in the output. We want to be
402              * able to quickly see that root is generating traffic.
403              */
404             if (xid == ROOT_XID) {
405                 if (src_port == ICMP_ECHOREPLY)
406                     xid = ICMP_ECHOREPLY_XID;
407                 else if (src_port == ICMP_UNREACH)
408                     xid = ICMP_UNREACH_XID;
409             }
410             break;
411
412         case IPPROTO_GRE:
413             if (GET_VALUE(GRE_FLAG_KEY).b) {
414                 if (GET_VALUE(GRE_VERSION).ui8 == 1) {
415                     /* Get PPTP call ID */
416                     src_port = GET_VALUE(PPTP_CALLID).ui16;
417                 } else {
418                     /* XXX Truncate GRE keys to 16 bits */
419                     src_port = (uint16_t) GET_VALUE(GRE_KEY).ui32;
420                 }
421             } else {
422                 /* No key available */
423                 src_port = 0;
424             }
425             dst_port = 0;
426             break;
427
428         default:
429             /* This is the default key for packets from unsupported protocols */
430             src_port = 0;
431             dst_port = 0;
432             break;
433     }
434
435     key.protocol = protocol;
436     key.source_ip = src_ip;
437     key.dest_ip = dst_ip;
438     key.source_port = src_port;
439     key.dest_port = dst_port;
440     key.packet_size = GET_VALUE(IP_TOTLEN).ui16;
441     key.packet_time = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
442
443     pthread_rwlock_rdlock(&limiter.limiter_lock); /* CLUNK! */
444
445     leaf = (leaf_t *) map_search(limiter.stable_instance.leaf_map, &xid, sizeof(xid));
446
447     /* Even if the packet doesn't match any specific xid, it should still
448      * count in the machine-type tables.  This catches root (xid == 0) and
449      * unclassified (xid = fff) packets, which don't have map entries. */
450     if (leaf == NULL) {
451         ident = limiter.stable_instance.last_machine;
452     } else {
453         ident = leaf->parent;
454     }
455
456     while (ident) {
457         pthread_mutex_lock(&ident->table_mutex);
458
459         /* Update the identity's table. */
460         ident->table_sample_function(ident->table, &key);
461
462 #ifdef SHADOW_ACCTING
463
464         /* Update the shadow perfect copy of the accounting table. */
465         standard_table_sample((standard_flow_table) ident->shadow_table, &key);
466
467 #endif
468
469         pthread_mutex_unlock(&ident->table_mutex);
470
471         ident = ident->parent;
472     }
473
474     pthread_rwlock_unlock(&limiter.limiter_lock); /* CLINK! */
475
476     return 0;
477 }
478
479 /* get all key id's for the keys we are intrested in */
480 static int get_ids(void)
481 {
482     int i;
483     struct intr_id *cur_id;
484
485     for (i = 0; i < INTR_IDS; i++) {
486         cur_id = &intr_ids[i];
487         cur_id->res = keyh_getres(keyh_getid(cur_id->name));
488         if (!cur_id->res) {
489             ulogd_log(ULOGD_ERROR, 
490                     "Cannot resolve keyhash id for %s\n", 
491                     cur_id->name);
492             return 1;
493         }
494     }
495     return 0;
496 }
497
498 static void free_identity(identity_t *ident) {
499     if (ident) {
500         free_comm(&ident->comm);
501
502         if (ident->table) {
503             ident->table_destroy_function(ident->table);
504         }
505
506         if (ident->loop_action) {
507             ident->loop_action->valid = 0;
508         }
509
510         if (ident->comm_action) {
511             ident->comm_action->valid = 0;
512         }
513
514         pthread_mutex_destroy(&ident->table_mutex);
515
516         free(ident);
517     }
518 }
519
520 static void free_identity_map(map_handle map) {
521     identity_t *tofree = NULL;
522
523     map_reset_iterate(map);
524     while ((tofree = (identity_t *) map_next(map))) {
525         free_identity(tofree);
526     }
527
528     free_map(map, 0);
529 }
530
531 static void free_instance(drl_instance_t *instance) {
532     if (instance->leaves)
533         free(instance->leaves);
534     if (instance->leaf_map)
535         free_map(instance->leaf_map, 0);
536     if (instance->ident_map)
537         free_identity_map(instance->ident_map);
538     if (instance->machines)
539         free(instance->machines);
540     if (instance->sets)
541         free(instance->sets);
542
543     /* FIXME: Drain the calendar first and free all the entries. */
544     if (instance->cal) {
545         free(instance->cal);
546     }
547
548     memset(instance, 0, sizeof(drl_instance_t));
549 }
550
551 static void free_failed_config(parsed_configs configs, drl_instance_t *instance) {
552     /* Free configs. */
553     if (configs.machines)
554         free_ident_list(configs.machines);
555     if (configs.sets)
556         free_ident_list(configs.sets);
557
558     /* Free instance. */
559     if (instance)
560         free_instance(instance);
561 }
562
563 static identity_t *new_identity(ident_config *config) {
564     identity_t *ident = malloc(sizeof(identity_t));
565     remote_node_t *comm_nodes = malloc(sizeof(remote_node_t)*config->peer_count);
566     ident_peer *peer = config->peers;
567     int peer_slot = 0;
568
569     if (ident == NULL) {
570         return NULL;
571     }
572
573     if (comm_nodes == NULL) {
574         free(ident);
575         return NULL;
576     }
577
578     memset(ident, 0, sizeof(identity_t));
579     memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t));
580
581     ident->id = config->id;
582     ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0);
583     ident->fixed_ewma_weight = config->fixed_ewma_weight;
584     ident->communication_intervals = config->communication_intervals;
585     ident->mainloop_intervals = config->mainloop_intervals;
586     ident->ewma_weight = pow(ident->fixed_ewma_weight, 
587                              (limiter.estintms/1000.0) * config->mainloop_intervals);
588     ident->parent = NULL;
589
590     pthread_mutex_init(&ident->table_mutex, NULL);
591     switch (config->accounting) {
592         case ACT_STANDARD:
593             ident->table =
594                 standard_table_create(standard_hasher, &ident->common);
595
596             /* Ugly function pointer casting.  Makes things sufficiently
597              * generic, though. */
598             ident->table_sample_function =
599                 (int (*)(void *, const key_flow *)) standard_table_sample;
600             ident->table_cleanup_function =
601                 (int (*)(void *)) standard_table_cleanup;
602             ident->table_update_function =
603                 (void (*)(void *, struct timeval, double)) standard_table_update_flows;
604             ident->table_destroy_function =
605                 (void (*)(void *)) standard_table_destroy;
606             break;
607
608         case ACT_MULTIPLE:
609             ident->table =
610                 multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common);
611
612             ident->table_sample_function =
613                 (int (*)(void *, const key_flow *)) multiple_table_sample;
614             ident->table_cleanup_function =
615                 (int (*)(void *)) multiple_table_cleanup;
616             ident->table_update_function =
617                 (void (*)(void *, struct timeval, double)) multiple_table_update_flows;
618             ident->table_destroy_function =
619                 (void (*)(void *)) multiple_table_destroy;
620             break;
621
622         case ACT_SAMPLEHOLD:
623             ident->table = sampled_table_create(sampled_hasher,
624                     ident->limit * IDENT_CLEAN_INTERVAL,
625                     SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common);
626
627             ident->table_sample_function =
628                 (int (*)(void *, const key_flow *)) sampled_table_sample;
629             ident->table_cleanup_function =
630                 (int (*)(void *)) sampled_table_cleanup;
631             ident->table_update_function =
632                 (void (*)(void *, struct timeval, double)) sampled_table_update_flows;
633             ident->table_destroy_function =
634                 (void (*)(void *)) sampled_table_destroy;
635             break;
636
637         case ACT_SIMPLE:
638             ident->table = simple_table_create(&ident->common);
639
640             ident->table_sample_function =
641                 (int (*)(void *, const key_flow *)) simple_table_sample;
642             ident->table_cleanup_function =
643                 (int (*)(void *)) simple_table_cleanup;
644             ident->table_update_function =
645                 (void (*)(void *, struct timeval, double)) simple_table_update_flows;
646             ident->table_destroy_function =
647                 (void (*)(void *)) simple_table_destroy;
648             break;
649     }
650
651 #ifdef SHADOW_ACCTING
652
653     ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common);
654
655     if (ident->shadow_table == NULL) {
656         ident->table_destroy_function(ident->table);
657         free(ident);
658         return NULL;
659     }
660
661 #endif
662
663     /* Make sure the table was allocated. */
664     if (ident->table == NULL) {
665         free(ident);
666         return NULL;
667     }
668
669     while (peer) {
670         comm_nodes[peer_slot].addr = peer->ip;
671         comm_nodes[peer_slot].port = htons(LIMITER_LISTEN_PORT);
672         peer = peer->next;
673         peer_slot += 1;
674     }
675
676     if (new_comm(&ident->comm, config, comm_nodes)) {
677         printlog(LOG_CRITICAL, "Failed to create communication structure.\n");
678         return NULL;
679     }
680
681     ident->comm.remote_nodes = comm_nodes;
682
683     return ident;
684 }
685
686 /* Determines the validity of the parameters of one ident_config.
687  *
688  * 0 valid
689  * 1 invalid
690  */
691 static int validate_config(ident_config *config) {
692     /* Limit must be a positive integer. */
693     if (config->limit < 1) {
694         return 1;
695     }
696
697     /* Commfabric must be a valid choice (COMM_MESH or COMM_GOSSIP). */
698     if (config->commfabric != COMM_MESH &&
699             config->commfabric != COMM_GOSSIP) {
700         return 1;
701     }
702
703     /* If commfabric is COMM_GOSSIP, this must be a positive integer. */
704     if (config->commfabric == COMM_GOSSIP && config->branch < 1) {
705         return 1;
706     }
707
708     /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD,
709      * ACT_SIMPLE, ACT_MULTIPLE). */
710     if (config->accounting != ACT_STANDARD &&
711             config->accounting != ACT_SAMPLEHOLD &&
712             config->accounting != ACT_SIMPLE &&
713             config->accounting != ACT_MULTIPLE) {
714         return 1;
715     }
716
717     /* Ewma weight must be greater than or equal to zero. */
718     if (config->fixed_ewma_weight < 0) {
719         return 1;
720     }
721
722     /* Note: Parsing stage requires that each ident has at least one peer. */
723     return 0;
724 }
725
726 /* 0 success
727  * non-zero failure
728  */
729 static int validate_configs(parsed_configs configs, drl_instance_t *instance) {
730
731     ident_config *mlist = configs.machines;
732     ident_config *slist = configs.sets;
733     ident_config *tmp = NULL;
734     int i = 0;
735
736     while (mlist) {
737         /* ID must be non-zero and unique. */
738         /* This is ugly and hackish, but this function will be called rarely.
739          * I'm tired of trying to be clever. */
740         if (mlist->id < 0) {
741             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", mlist->id, mlist->id);
742             return EINVAL;
743         }
744         tmp = mlist->next;
745         while (tmp) {
746             if (mlist->id == tmp->id) {
747                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
748                 return EINVAL;
749             }
750             tmp = tmp->next;
751         }
752         tmp = configs.sets;
753         while (tmp) {
754             if (mlist->id == tmp->id) {
755                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
756                 return EINVAL;
757             }
758             tmp = tmp->next;
759         }
760
761         if (validate_config(mlist)) {
762             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", mlist->id, mlist->id);
763             return EINVAL;
764         }
765
766         mlist = mlist->next;
767     }
768
769     instance->sets = malloc(configs.set_count * sizeof(identity_t *));
770     if (instance->sets == NULL) {
771         return ENOMEM;
772     }
773
774     memset(instance->sets, 0, configs.set_count * sizeof(identity_t *));
775     instance->set_count = configs.set_count;
776
777     /* For sets, make sure that the hierarchy is valid. */
778     while (slist) {
779         ident_member *members = slist->members;
780
781         /* ID must be non-zero and unique. */
782         if (slist->id < 0) {
783             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", slist->id, slist->id);
784             return EINVAL;
785         }
786         tmp = slist->next;
787         while (tmp) {
788             if (slist->id == tmp->id) {
789                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", slist->id, slist->id);
790                 return EINVAL;
791             }
792             tmp = tmp->next;
793         }
794
795         if (validate_config(slist)) {
796             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", slist->id, slist->id);
797             return EINVAL;
798         }
799
800         /* Allocate an identity_t for this set-type identity. */
801         instance->sets[i] = new_identity(slist);
802
803         if (instance->sets[i] == NULL) {
804             return ENOMEM;
805         }
806
807         /* Loop through children and look up each in leaf or ident map
808          * depending on the type of child.  Set the child's parent pointer
809          * to the identity we just created above, unless it is already set,
810          * in which case we have an error. */
811         while (members) {
812             identity_t *child_ident = NULL;
813             leaf_t *child_leaf = NULL;
814
815             switch (members->type) {
816                 case MEMBER_XID:
817                     child_leaf = map_search(instance->leaf_map, &members->value,
818                                             sizeof(members->value));
819                     if (child_leaf == NULL) {
820                         return EINVAL;
821                     }
822                     if (child_leaf->parent != NULL) {
823                         /* Error - This leaf already has a parent. */
824                         return EINVAL;
825                     }
826                     child_leaf->parent = instance->sets[i];
827                     break;
828                 case MEMBER_GUID:
829                     child_ident = map_search(instance->ident_map, &members->value,
830                                              sizeof(members->value));
831                     if (child_ident == NULL) {
832                         return EINVAL;
833                     }
834                     if (child_ident->parent != NULL) {
835                         /* Error - This identity already has a parent. */
836                         return EINVAL;
837                     }
838                     child_ident->parent = instance->sets[i];
839                     break;
840                 default:
841                     /* Error - shouldn't be possible. */
842                     break;
843             }
844             members = members->next;
845         }
846
847         map_insert(instance->ident_map, &instance->sets[i]->id,
848                    sizeof(instance->sets[i]->id), instance->sets[i]);
849
850         slist = slist->next;
851         i++;
852     }
853     return 0;
854 }
855
856 static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) {
857     int count = 0;
858     identity_t *current_ident;
859     leaf_t *current_leaf;
860     leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *));
861     if (leaves == NULL) {
862         return 1;
863     }
864
865     map_reset_iterate(instance->leaf_map);
866     while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) {
867         current_ident = current_leaf->parent;
868         while (current_ident != NULL && current_ident != instance->last_machine) {
869             if (current_ident == ident) {
870                 /* Found the ident we were looking for - add the leaf. */
871                 leaves[count] = current_leaf;
872                 count += 1;
873                 break;
874             }
875             current_ident = current_ident->parent;
876         }
877     }
878
879     ident->leaves = leaves;
880     ident->leaf_count = count;
881
882     return 0;
883 }
884
885 static int init_identities(parsed_configs configs, drl_instance_t *instance) {
886     int i, j;
887     ident_config *config = configs.machines;
888     leaf_t *leaf = NULL;
889
890     instance->cal = malloc(sizeof(struct ident_calendar) * SCHEDLEN);
891
892     if (instance->cal == NULL) {
893         return ENOMEM;
894     }
895
896     for (i = 0; i < SCHEDLEN; ++i) {
897         TAILQ_INIT(instance->cal + i);
898     }
899     instance->cal_slot = 0;
900
901     instance->machines = malloc(configs.machine_count * sizeof(drl_instance_t *));
902
903     if (instance->machines == NULL) {
904         return ENOMEM;
905     }
906
907     memset(instance->machines, 0, configs.machine_count * sizeof(drl_instance_t *));
908     instance->machine_count = configs.machine_count;
909
910     /* Allocate and add the machine identities. */
911     for (i = 0; i < configs.machine_count; ++i) {
912         identity_action *loop_action;
913         identity_action *comm_action;
914         instance->machines[i] = new_identity(config);
915
916         if (instance->machines[i] == NULL) {
917             return ENOMEM;
918         }
919
920         loop_action = malloc(sizeof(identity_action));
921         comm_action = malloc(sizeof(identity_action));
922
923         if (loop_action == NULL || comm_action == NULL) {
924             return ENOMEM;
925         }
926
927         /* The first has no parent - it is the root.  All others have the
928          * previous ident as their parent. */
929         if (i == 0) {
930             instance->machines[i]->parent = NULL;
931         } else {
932             instance->machines[i]->parent = instance->machines[i - 1];
933         }
934
935         instance->last_machine = instance->machines[i];
936
937         /* Add the ident to the guid->ident map. */
938         map_insert(instance->ident_map, &instance->machines[i]->id,
939                    sizeof(instance->machines[i]->id), instance->machines[i]);
940
941         config = config->next;
942
943         memset(loop_action, 0, sizeof(identity_action));
944         memset(comm_action, 0, sizeof(identity_action));
945         loop_action->ident = instance->machines[i];
946         loop_action->action = ACTION_MAINLOOP;
947         loop_action->valid = 1;
948         comm_action->ident = instance->machines[i];
949         comm_action->action = ACTION_COMMUNICATE;
950         comm_action->valid = 1;
951
952         instance->machines[i]->loop_action = loop_action;
953         instance->machines[i]->comm_action = comm_action;
954
955         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
956                           loop_action, calendar);
957
958         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
959                           comm_action, calendar);
960
961         /* Setup the array of pointers to leaves.  This is easy for machines
962          * because a machine node applies to every leaf. */
963         instance->machines[i]->leaves =
964             malloc(instance->leaf_count * sizeof(leaf_t *));
965         if (instance->machines[i]->leaves == NULL) {
966             return ENOMEM;
967         }
968         instance->machines[i]->leaf_count = instance->leaf_count;
969         for (j = 0; j < instance->leaf_count; ++j) {
970             instance->machines[i]->leaves[j] = &instance->leaves[j];
971         }
972     }
973
974     /* Connect the set subtree to the machines. Any set or leaf without a
975      * parent will take the last machine as its parent. */
976
977     /* Leaves... */
978     map_reset_iterate(instance->leaf_map);
979     while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
980         if (leaf->parent == NULL) {
981             leaf->parent = instance->last_machine;
982         }
983     }
984
985     /* Sets... */
986     for (i = 0; i < instance->set_count; ++i) {
987         identity_action *loop_action;
988         identity_action *comm_action;
989
990         if (instance->sets[i]->parent == NULL) {
991             instance->sets[i]->parent = instance->last_machine;
992         }
993
994         loop_action = malloc(sizeof(identity_action));
995         comm_action = malloc(sizeof(identity_action));
996
997         if (loop_action == NULL || comm_action == NULL) {
998             return ENOMEM;
999         }
1000
1001         memset(loop_action, 0, sizeof(identity_action));
1002         memset(comm_action, 0, sizeof(identity_action));
1003         loop_action->ident = instance->sets[i];
1004         loop_action->action = ACTION_MAINLOOP;
1005         loop_action->valid = 1;
1006         comm_action->ident = instance->sets[i];
1007         comm_action->action = ACTION_COMMUNICATE;
1008         comm_action->valid = 1;
1009
1010         instance->sets[i]->loop_action = loop_action;
1011         instance->sets[i]->comm_action = comm_action;
1012
1013         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1014                           loop_action, calendar);
1015
1016         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1017                           comm_action, calendar);
1018
1019         /* Setup the array of pointers to leaves.  This is harder for sets,
1020          * but this doesn't need to be super-efficient because it happens
1021          * rarely and it isn't on the critical path for reconfig(). */
1022         if (fill_set_leaf_pointer(instance, instance->sets[i])) {
1023             return ENOMEM;
1024         }
1025     }
1026
1027     /* Success. */
1028     return 0;
1029 }
1030
1031 static void print_instance(drl_instance_t *instance) {
1032     leaf_t *leaf = NULL;
1033     identity_t *ident = NULL;
1034
1035     if (system_loglevel == LOG_DEBUG) {
1036         map_reset_iterate(instance->leaf_map);
1037         while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
1038             printf("%x:", leaf->xid);
1039             ident = leaf->parent;
1040             while (ident) {
1041                 printf("%d:",ident->id);
1042                 ident = ident->parent;
1043             }
1044             printf("Leaf's parent pointer is %p\n", leaf->parent);
1045         }
1046
1047         printf("instance->last_machine is %p\n", instance->last_machine);
1048     }
1049 }
1050
1051 static int assign_htb_hierarchy(drl_instance_t *instance) {
1052     int i, j;
1053     int next_node = 0x100;
1054
1055     /* Chain machine nodes under 1:10. */
1056     for (i = 0; i < instance->machine_count; ++i) {
1057         if (instance->machines[i]->parent == NULL) {
1058             /* Top node. */
1059             instance->machines[i]->htb_parent = 0x10;
1060         } else {
1061             /* Pointerific! */
1062             instance->machines[i]->htb_parent =
1063                 instance->machines[i]->parent->htb_node;
1064         }
1065
1066         instance->machines[i]->htb_node = next_node;
1067         next_node += 1;
1068     }
1069
1070     next_node += 0x10;
1071
1072     /* Add set nodes under machine nodes. Iterate backwards to ensure parent is
1073      * already there. */
1074     for (j = (instance->set_count - 1); j >= 0; --j) {
1075         if (instance->sets[j]->parent == NULL) {
1076             instance->sets[j]->htb_parent = 0x10;
1077         } else {
1078             instance->sets[j]->htb_parent = instance->sets[j]->parent->htb_node;
1079         }
1080         instance->sets[j]->htb_node = next_node;
1081
1082         next_node += 1;
1083     }
1084
1085     return 0;
1086 }
1087
1088 /* Added this so that I could comment one line and kill off all of the
1089  * command execution. */
1090 static inline int execute_cmd(const char *cmd) {
1091     return system(cmd);
1092 }
1093
1094 static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor,
1095                                const uint32_t classid_major, const uint32_t classid_minor,
1096                                const uint64_t rate, const uint64_t ceil) {
1097     char cmd[300];
1098
1099     sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit",
1100             iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil);
1101     printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd);
1102
1103     return execute_cmd(cmd);
1104 }
1105
1106 static inline int add_htb_netem(const char *iface, const uint32_t parent_major,
1107                                 const uint32_t parent_minor, const uint32_t handle,
1108                                 const int loss, const int delay) {
1109     char cmd[300];
1110
1111     sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
1112             parent_minor, handle);
1113     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1114     if (execute_cmd(cmd))
1115         printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n");
1116
1117     sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms",
1118             iface, parent_major, parent_minor, handle, loss, delay);
1119     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1120     return execute_cmd(cmd);
1121 }
1122
1123 static int create_htb_hierarchy(drl_instance_t *instance) {
1124     char cmd[300];
1125     int i, j, k;
1126     uint64_t gigabit = 1024 * 1024 * 1024;
1127
1128     /* Nuke the hierarchy. */
1129     sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb");
1130     execute_cmd(cmd);
1131     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1132
1133     /* Re-initialize the basics. */
1134     sprintf(cmd, "tc qdisc add dev eth0 root handle 1: htb default 1fff");
1135     if (execute_cmd(cmd)) {
1136         return 1;
1137     }
1138     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1139
1140     if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit))
1141         return 1;
1142
1143     /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/
1144     if (limiter.nodelimit) {
1145         if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024))
1146             return 1;
1147     } else {
1148         if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit))
1149             return 1;
1150     }
1151
1152     /* Add machines. */
1153     for (i = 0; i < instance->machine_count; ++i) {
1154         if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1,
1155                          instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) {
1156             return 1;
1157         }
1158     }
1159
1160 #define LIMITEXEMPT
1161
1162     /* Add back 1:20. */
1163 #ifdef LIMITEXEMPT
1164     if (instance->last_machine == NULL) {
1165         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1166     } else {
1167         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:20 htb rate 8bit ceil 1000mbit",
1168             instance->last_machine->htb_node);
1169     }
1170 #else
1171     sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1172 #endif
1173
1174     if (execute_cmd(cmd)) {
1175         return 1;
1176     }
1177     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1178
1179     /* Add sets. */
1180     for (j = (instance->set_count - 1); j >= 0; --j) {
1181         if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1,
1182                          instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) {
1183             return 1;
1184         }
1185     }
1186
1187     /* Add leaves. FIXME: Set static sliver limit as ceil here! */
1188     for (k = 0; k < instance->leaf_count; ++k) {
1189         if (instance->leaves[k].parent == NULL) {
1190             if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1191                 return 1;
1192         } else {
1193             if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1194                 return 1;
1195         }
1196
1197         /* Add exempt node for the leaf under 1:20 as 1:2<xid> */
1198         if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit))
1199             return 1;
1200     }
1201
1202     /* Add 1:1000 and 1:2000 */
1203     if (instance->last_machine == NULL) {
1204         if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit))
1205             return 1;
1206     } else {
1207         if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit))
1208             return 1;
1209     }
1210
1211     if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit))
1212         return 1;
1213
1214     /* Add 1:1fff and 1:2fff */
1215     if (instance->last_machine == NULL) {
1216         if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit))
1217             return 1;
1218     } else {
1219         if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit))
1220             return 1;
1221     }
1222
1223     if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit))
1224         return 1;
1225
1226     /* Artifical delay or loss for experimentation. */
1227     if (netem_delay.u.value || netem_loss.u.value) {
1228         if (!strcmp(netem_slice.u.string, "ALL")) {
1229             /* By default, netem applies to all leaves. */
1230             if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value))
1231                 return 1;
1232             if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value))
1233                 return 1;
1234
1235             for (k = 0; k < instance->leaf_count; ++k) {
1236                 if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid),
1237                             (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) {
1238                     return 1;
1239                 }
1240
1241                 //FIXME: add exempt delay/loss here on 0x2000 ... ?
1242             }
1243         } else {
1244             /* netem_slice is not the default ALL value.  Only apply netem
1245              * to the slice that is set in netem_slice.u.string. */
1246             uint32_t slice_xid;
1247
1248             sscanf(netem_slice.u.string, "%x", &slice_xid);
1249
1250             if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value))
1251                 return 1;
1252         }
1253     }
1254
1255 #if 0
1256 #ifdef DELAY40MS
1257     /* Only for artificial delay testing. */
1258     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1259     execute_cmd(cmd);
1260
1261     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
1262     execute_cmd(cmd);
1263     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11f9 handle 11f9 pfifo");
1264     execute_cmd(cmd);
1265
1266     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11f9 handle 11f9 netem loss 0 delay 40ms");
1267     execute_cmd(cmd);
1268     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11fa handle 11fa pfifo");
1269     execute_cmd(cmd);
1270
1271     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms");
1272     execute_cmd(cmd);
1273     /* End delay testing */
1274 #endif
1275 #endif
1276
1277     return 0;
1278 }
1279
1280 static int setup_tc_grd(drl_instance_t *instance) {
1281     int i;
1282     char cmd[300];
1283
1284     for (i = 0; i < instance->leaf_count; ++i) {
1285         /* Delete the old pfifo qdisc that might have been there before. */
1286         sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo",
1287                 instance->leaves[i].xid, instance->leaves[i].xid);
1288
1289         if (execute_cmd(cmd)) {
1290             printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1291         }
1292
1293         /* Add the netem qdisc. */
1294 #ifdef DELAY40MS
1295         sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms",
1296                 instance->leaves[i].xid, instance->leaves[i].xid);
1297 #else
1298         sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
1299                 instance->leaves[i].xid, instance->leaves[i].xid);
1300 #endif
1301
1302         if (execute_cmd(cmd)) {
1303             return 1;
1304         }
1305     }
1306
1307     /* Do the same for 1000 and 1fff. */
1308     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1309
1310     if (execute_cmd(cmd)) {
1311         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1312     }
1313
1314     /* Add the netem qdisc. */
1315 #ifdef DELAY40MS
1316     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
1317 #else
1318     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
1319 #endif
1320
1321     if (execute_cmd(cmd)) {
1322         return 1;
1323     }
1324
1325     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
1326
1327     if (execute_cmd(cmd)) {
1328         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1329     }
1330
1331     /* Add the netem qdisc. */
1332 #ifdef DELAY40MS
1333     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms");
1334 #else
1335     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
1336 #endif
1337
1338     if (execute_cmd(cmd)) {
1339         return 1;
1340     }
1341
1342     return 0;
1343 }
1344
1345 /* init_drl 
1346  * 
1347  * Initialize this limiter with options 
1348  * Open UDP socket for peer communication
1349  */
1350 static int init_drl(void) {
1351     parsed_configs configs;
1352     struct sockaddr_in server_address;
1353     
1354     memset(&limiter, 0, sizeof(limiter_t));
1355
1356     /* Setup logging. */
1357     system_loglevel = (uint8_t) drl_loglevel.u.value;
1358     logfile = fopen(drl_logfile.u.string, "w");
1359
1360     if (logfile == NULL) {
1361         printf("Couldn't open logfile - ");
1362         perror("fopen()");
1363         exit(EXIT_FAILURE);
1364     }
1365
1366     printlog(LOG_CRITICAL, "ulogd_DRL initializing . . .\n");
1367
1368     limiter.nodelimit = (uint32_t) (((double) nodelimit.u.value * 1000000.0) / 8.0);
1369
1370     init_hashing();  /* for all hash maps */
1371
1372     pthread_rwlock_init(&limiter.limiter_lock,NULL);
1373
1374     /* determine our local IP by iterating through interfaces */
1375     if ((limiter.ip = get_local_ip())==0) {
1376         printlog(LOG_CRITICAL,
1377                  "ulogd_DRL unable to aquire local IP address, not registering.\n");
1378         return (false);
1379     }
1380     limiter.localaddr = inet_addr(limiter.ip);
1381     limiter.port = htons(LIMITER_LISTEN_PORT);
1382     limiter.udp_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
1383     if (limiter.udp_socket < 0) {
1384         printlog(LOG_CRITICAL, "Failed to create UDP socket().\n");
1385         return false;
1386     }
1387
1388     memset(&server_address, 0, sizeof(server_address));
1389     server_address.sin_family = AF_INET;
1390     server_address.sin_addr.s_addr = limiter.localaddr;
1391     server_address.sin_port = limiter.port;
1392
1393     if (bind(limiter.udp_socket, (struct sockaddr *) &server_address, sizeof(server_address)) < 0) {
1394         printlog(LOG_CRITICAL, "Failed to bind UDP socket.\n");
1395         return false;
1396     }
1397
1398     printlog(LOG_WARN, "     POLICY: %s\n",policy.u.string);
1399     if (strcasecmp(policy.u.string,"GRD") == 0) {
1400         limiter.policy = POLICY_GRD;
1401     } else if (strcasecmp(policy.u.string,"FPS") == 0) {
1402         limiter.policy = POLICY_FPS;
1403     } else {
1404         printlog(LOG_CRITICAL,
1405                  "Unknown DRL policy %s, aborting.\n",policy.u.string);
1406         return (false);
1407     }
1408
1409     limiter.estintms = estintms.u.value;
1410     if (limiter.estintms > 1000) {
1411         printlog(LOG_CRITICAL,
1412                  "DRL: sorry estimate intervals must be less than 1 second.");
1413         printlog(LOG_CRITICAL,
1414                  "  Simple source mods will allow larger intervals.  Using 1 second.\n");
1415         limiter.estintms = 1000;
1416     }
1417     printlog(LOG_WARN, "     Est interval: %dms\n",limiter.estintms);
1418     
1419     /* Acquire the big limiter lock for writing.  Prevents pretty much
1420      * anything else from happening while the hierarchy is being changed. */
1421     pthread_rwlock_wrlock(&limiter.limiter_lock);
1422
1423     limiter.stable_instance.ident_map = allocate_map();
1424     if (limiter.stable_instance.ident_map == NULL) {
1425         printlog(LOG_CRITICAL, "Failed to allocate memory for identity map.\n");
1426         return false;
1427     }
1428
1429     if (get_eligible_leaves(&limiter.stable_instance)) {
1430         printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n");
1431         return false;
1432     }
1433
1434     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1435         /* Parse error occured. Return non-zero to notify init_drl(). */
1436         printlog(LOG_CRITICAL, "Failed to parse the DRL configuration file (%s).\n",
1437             drl_configfile.u.string);
1438         return false;
1439     }
1440
1441     /* Validate identity hierarchy! */
1442     if (validate_configs(configs, &limiter.stable_instance)) {
1443         /* Clean up everything. */
1444         free_failed_config(configs, &limiter.stable_instance);
1445         printlog(LOG_CRITICAL, "Invalid DRL configuration file (%s).\n",
1446             drl_configfile.u.string);
1447         return false;
1448     }
1449
1450     if (init_identities(configs, &limiter.stable_instance)) {
1451         free_failed_config(configs, &limiter.stable_instance);
1452         printlog(LOG_CRITICAL, "Failed to initialize identities.\n");
1453         return false;
1454     }
1455
1456     /* At this point, we should be done with configs. */
1457     free_ident_list(configs.machines);
1458     free_ident_list(configs.sets);
1459
1460     /* Debugging - FIXME: remove this? */
1461     print_instance(&limiter.stable_instance);
1462
1463     switch (limiter.policy) {
1464         case POLICY_FPS:
1465             if (assign_htb_hierarchy(&limiter.stable_instance)) {
1466                 free_instance(&limiter.stable_instance);
1467                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy.\n");
1468                 return false;
1469             }
1470
1471             if (create_htb_hierarchy(&limiter.stable_instance)) {
1472                 free_instance(&limiter.stable_instance);
1473                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy.\n");
1474                 return false;
1475             }
1476         break;
1477
1478         case POLICY_GRD:
1479             if (setup_tc_grd(&limiter.stable_instance)) {
1480                 free_instance(&limiter.stable_instance);
1481                 printlog(LOG_CRITICAL, "Failed to initialize tc calls for GRD.\n");
1482                 return false;
1483             }
1484         break;
1485
1486         default:
1487         return false;
1488     }
1489
1490     partition_set = partition.u.value;
1491
1492     pthread_rwlock_unlock(&limiter.limiter_lock);
1493
1494     if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) {
1495         printlog(LOG_CRITICAL, "Unable to start UDP receive thread.\n");
1496         return false;
1497     }
1498
1499     printlog(LOG_WARN, "ulogd_DRL init finished.\n");
1500
1501     return true;
1502 }
1503
1504 static void reconfig() {
1505     parsed_configs configs;
1506
1507     printlog(LOG_DEBUG, "--Starting reconfig()--\n");
1508     flushlog();
1509
1510     memset(&configs, 0, sizeof(parsed_configs));
1511     memset(&limiter.new_instance, 0, sizeof(drl_instance_t));
1512
1513     limiter.new_instance.ident_map = allocate_map();
1514     if (limiter.new_instance.ident_map == NULL) {
1515         printlog(LOG_CRITICAL, "Failed to allocate ident_map during reconfig().\n");
1516         return;
1517     }
1518
1519     if (get_eligible_leaves(&limiter.new_instance)) {
1520         free_failed_config(configs, &limiter.new_instance);
1521         printlog(LOG_CRITICAL, "Failed to read leaves during reconfig().\n");
1522         return;
1523     }
1524
1525     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1526         free_failed_config(configs, &limiter.new_instance);
1527         printlog(LOG_CRITICAL, "Failed to parse config during reconfig().\n");
1528         return;
1529     }
1530
1531     if (validate_configs(configs, &limiter.new_instance)) {
1532         free_failed_config(configs, &limiter.new_instance);
1533         printlog(LOG_CRITICAL, "Validation failed during reconfig().\n");
1534         pthread_rwlock_unlock(&limiter.limiter_lock);
1535         return;
1536     }
1537
1538     if (init_identities(configs, &limiter.new_instance)) {
1539         free_failed_config(configs, &limiter.new_instance);
1540         printlog(LOG_CRITICAL, "Initialization failed during reconfig().\n");
1541         pthread_rwlock_unlock(&limiter.limiter_lock);
1542         return;
1543     }
1544
1545     free_ident_list(configs.machines);
1546     free_ident_list(configs.sets);
1547
1548     /* Debugging - FIXME: remove this? */
1549     print_instance(&limiter.new_instance);
1550     
1551     /* Lock */
1552     pthread_rwlock_wrlock(&limiter.limiter_lock);
1553
1554     switch (limiter.policy) {
1555         case POLICY_FPS:
1556             if (assign_htb_hierarchy(&limiter.new_instance)) {
1557                 free_instance(&limiter.new_instance);
1558                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
1559                 pthread_rwlock_unlock(&limiter.limiter_lock);
1560                 return;
1561             }
1562
1563             if (create_htb_hierarchy(&limiter.new_instance)) {
1564                 free_instance(&limiter.new_instance);
1565                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
1566
1567                 /* Re-create old instance. */
1568                 if (create_htb_hierarchy(&limiter.stable_instance)) {
1569                     /* Error reinstating the old one - big problem. */
1570                     printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
1571                     printlog(LOG_CRITICAL, "Giving up...\n");
1572                     flushlog();
1573                     exit(EXIT_FAILURE);
1574                 }
1575
1576                 pthread_rwlock_unlock(&limiter.limiter_lock);
1577                 return;
1578             }
1579         break;
1580
1581         case POLICY_GRD:
1582             if (setup_tc_grd(&limiter.new_instance)) {
1583                 free_instance(&limiter.new_instance);
1584                 printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n");
1585
1586                 /* Try to re-create old instance. */
1587                 if (setup_tc_grd(&limiter.stable_instance)) {
1588                     printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n");
1589                     printlog(LOG_CRITICAL, "Giving up...\n");
1590                     flushlog();
1591                     exit(EXIT_FAILURE);
1592                 }
1593             }
1594         break;
1595
1596         default:
1597             /* Should be impossible. */
1598             printf("Pigs are flying?\n");
1599             exit(EXIT_FAILURE);
1600     }
1601
1602     /* Switch over new to stable instance. */
1603     free_instance(&limiter.stable_instance);
1604     memcpy(&limiter.stable_instance, &limiter.new_instance, sizeof(drl_instance_t));
1605
1606     /* Success! - Unlock */
1607     pthread_rwlock_unlock(&limiter.limiter_lock);
1608 }
1609
1610 static ulog_output_t drl_op = {
1611     .name = "drl",
1612     .output = &_output_drl,
1613     .signal = NULL, /* This appears to be broken. Using my own handler. */
1614     .init = NULL,
1615     .fini = NULL,
1616 };
1617
1618 /* Tests the amount of time it takes to call reconfig(). */
1619 static void time_reconfig(int iterations) {
1620     struct timeval start, end;
1621     int i;
1622
1623     gettimeofday(&start, NULL);
1624     for (i = 0; i < iterations; ++i) {
1625         reconfig();
1626     }
1627     gettimeofday(&end, NULL);
1628
1629     printf("%d reconfigs() took %d seconds and %d microseconds.\n",
1630            iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec);
1631     exit(0);
1632
1633     // Seems to take about 85ms / iteration
1634 }
1635
1636 static int stop_enforcement(drl_instance_t *instance) {
1637     char cmd[300];
1638     int i;
1639
1640     for (i = 0; i < instance->machine_count; ++i) {
1641         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1642                 instance->machines[i]->htb_parent,
1643                 instance->machines[i]->htb_node);
1644
1645         if (execute_cmd(cmd)) {
1646             return 1;
1647         }
1648     }
1649
1650     for (i = 0; i < instance->set_count; ++i) {
1651         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1652                 instance->sets[i]->htb_parent,
1653                 instance->sets[i]->htb_node);
1654
1655         if (execute_cmd(cmd)) {
1656             return 1;
1657         }
1658     }
1659
1660     return 0;
1661 }
1662
1663 static void *signal_thread_func(void *args) {
1664     int sig;
1665     int err;
1666     sigset_t sigs;
1667
1668     sigemptyset(&sigs);
1669     sigaddset(&sigs, SIGHUP);
1670     sigaddset(&sigs, SIGUSR1);
1671     sigaddset(&sigs, SIGUSR2);
1672     pthread_sigmask(SIG_BLOCK, &sigs, NULL);
1673
1674     while (1) {
1675         sigemptyset(&sigs);
1676         sigaddset(&sigs, SIGHUP);
1677         sigaddset(&sigs, SIGUSR1);
1678         sigaddset(&sigs, SIGUSR2);
1679
1680         err = sigwait(&sigs, &sig);
1681
1682         if (err) {
1683             printlog(LOG_CRITICAL, "sigwait() returned an error.\n");
1684             flushlog();
1685         }
1686
1687         switch (sig) {
1688             case SIGHUP:
1689                 printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n");
1690                 reconfig();
1691                 //time_reconfig(1000); /* instrumentation */
1692                 flushlog();
1693                 break;
1694             case SIGUSR1:
1695                 pthread_rwlock_wrlock(&limiter.limiter_lock);
1696                 if (do_enforcement) {
1697                     do_enforcement = 0;
1698                     stop_enforcement(&limiter.stable_instance);
1699                     printlog(LOG_CRITICAL, "--Switching enforcement off.--\n");
1700                 } else {
1701                     do_enforcement = 1;
1702                     printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1703                 }
1704                 pthread_rwlock_unlock(&limiter.limiter_lock);
1705                 break;
1706             case SIGUSR2:
1707                 do_partition = !do_partition;
1708                 break;
1709             default:
1710                 /* Intentionally blank. */
1711                 break;
1712         }
1713     }
1714
1715 }
1716
1717 /* register output plugin with ulogd */
1718 static void _drl_reg_op(void)
1719 {
1720     ulog_output_t *op = &drl_op;
1721     sigset_t signal_mask;
1722
1723     sigemptyset(&signal_mask);
1724     sigaddset(&signal_mask, SIGHUP);
1725     sigaddset(&signal_mask, SIGUSR1);
1726     sigaddset(&signal_mask, SIGUSR2);
1727     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
1728
1729     if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {
1730         printlog(LOG_CRITICAL, "Failed to create signal handling thread.\n");
1731         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1732         flushlog();
1733         exit(EXIT_FAILURE);
1734     }
1735
1736     if (!init_drl()) {
1737         printlog(LOG_CRITICAL, "Init failed. :(\n");
1738         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1739         flushlog();
1740         exit(EXIT_FAILURE);
1741     }
1742
1743     register_output(op);
1744
1745     /* start up the thread that will periodically estimate the
1746      * local rate and set the local limits
1747      * see estimate.c
1748      */
1749     if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) {
1750         printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n");
1751         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1752         exit(EXIT_FAILURE);
1753     }
1754 }
1755
1756 void _init(void)
1757 {
1758     /* have the opts parsed */
1759     config_parse_file("DRL", config_entries);
1760
1761     if (get_ids()) {
1762         ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
1763         exit(2);
1764     }
1765
1766     /* Seed the hash function */
1767     salt = getpid() ^ time(NULL);
1768
1769     _drl_reg_op();
1770 }