170ab4375ea7e122ce901ec224882f9ebcd51332
[distributedratelimiting.git] / drl / ulogd_DRL.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * ulogd output target for DRL: GRD and FPS
5  *
6  * Ken Yocum <kyocum@cs.ucsd.edu>
7  *
8  * Original shell of this code from ulogd_NETFLOW
9  * Like that code, we keep track of per-slice data rates 
10  * out of this slice.  Thus we are rate limiting particular slices
11  * across multiple boxes, ensuring that their outbound rate does not
12  * exceed some fixed limit.  
13  *
14  * Fabric:  static mesh
15  *
16  * Enforcer: linux drop percentage.
17  *    
18  * This file reads packets from the netlink socket.  It updates all
19  * the hashmaps which track how much data has arrived per flow.
20  * It starts two threads for this limiter.
21  * One thread handles periodic estimation.
22  * The other thread handles communication with other limiters. 
23  *
24  * 
25  * Code map
26  *
27  * ulogd_DRL: attach to netlink socket, accept packets.  replaces ratelimit.cc
28  * util.c: generic hashing functions, flow comparisons, sundry items. 
29  * gossip.c: Recv gossip, send gossip.
30  * peer_comm.c: Thread to listen for updates from other limiters. 
31  * estimate.c: Thread to calculate the local limits. 
32  * 
33  * 
34  * Ken Yocum <kyocum@cs.ucsd.edu>
35  * 2007 
36  * 
37  * Some code appropriated from ulogd_NETFLOW:
38  *
39  * Mark Huang <mlhuang@cs.princeton.edu>
40  * Copyright (C) 2004-2005 The Trustees of Princeton University
41  *
42  * Based on admindump.pl by Mic Bowman and Paul Brett
43  * Copyright (c) 2002 Intel Corporation
44  *
45  */
46
47 /* Enable GNU glibc extensions */
48 #define _GNU_SOURCE
49
50 #include <stdio.h>
51 #include <stdlib.h>
52
53 /* va_start() and friends */
54 #include <stdarg.h>
55
56 /* ispunct() */
57 #include <ctype.h>
58
59 /* strstr() and friends */
60 #include <string.h>
61
62 /* dirname() and basename() */
63 #include <libgen.h>
64
65 /* fork() and wait() */
66 #include <sys/types.h>
67 #include <unistd.h>
68 #include <sys/wait.h>
69
70 /* errno and assert() */
71 #include <errno.h>
72 #include <assert.h>
73
74 /* getopt_long() */
75 #include <getopt.h>
76
77 /* time() and friends */
78 #include <time.h>
79 #include <sys/time.h>
80
81 /* inet_aton() */
82 #include <sys/socket.h>
83 #include <netinet/in.h>
84 #include <arpa/inet.h>
85
86 /* ICMP definitions */
87 #include <netinet/ip.h>
88 #include <netinet/ip_icmp.h>
89
90 /* stat() */
91 #include <sys/stat.h>
92
93 /* pthread_create() */
94 #include <pthread.h>
95
96 /* flock() */
97 #include <sys/file.h>
98
99 /* Signal definitions - so that we can catch SIGHUP and update config. */
100 #include <signal.h>
101
102 #include <ulogd/ulogd.h>
103 #include <ulogd/conffile.h>
104
105 /* Perhaps useful for files within vservers? */
106 #if !defined(STANDALONE) && HAVE_LIBPROPER
107 #include <proper/prop.h>
108 #endif
109
110 /*
111  * Jenkins hash support
112  * lives in raterouter.h
113  */
114
115 /* DRL specifics */
116 #include "raterouter.h"
117 #include "util.h"
118 #include "ratetypes.h" /* needs util and pthread.h */
119 #include "calendar.h"
120 #include "logging.h"
121
122 /*
123  * /etc/ulogd.conf configuration options
124  * Add the config options for DRL. 
125  */
126
127 static config_entry_t create_htb = {
128     .next = NULL,
129     .key = "create_htb",
130     .type = CONFIG_TYPE_INT,
131     .options = CONFIG_OPT_NONE,
132     .u = { .value = 1 },
133 };
134
135 static config_entry_t enforce_on = {
136     .next = &create_htb,
137     .key = "enforce_on",
138     .type = CONFIG_TYPE_INT,
139     .options = CONFIG_OPT_NONE,
140     .u = { .value = 1 },
141 };
142
143 static config_entry_t partition = {
144     .next = &enforce_on,
145     .key = "partition_set",
146     .type = CONFIG_TYPE_INT,
147     .options = CONFIG_OPT_NONE,
148     .u = { .value = 0xfffffff },
149 };
150
151 static config_entry_t sfq_slice = {
152     .next = &partition,
153     .key = "sfq_slice",
154     .type = CONFIG_TYPE_STRING,
155     .options = CONFIG_OPT_NONE,
156     .u = { .string = "NONE" },
157 };
158
159 static config_entry_t netem_slice = {
160     .next = &sfq_slice,
161     .key = "netem_slice",
162     .type = CONFIG_TYPE_STRING,
163     .options = CONFIG_OPT_NONE,
164     .u = { .string = "ALL" },
165 };
166
167 static config_entry_t netem_loss = {
168     .next = &netem_slice,
169     .key = "netem_loss",
170     .type = CONFIG_TYPE_INT,
171     .options = CONFIG_OPT_NONE,
172     .u = { .value = 0 },
173 };
174
175 static config_entry_t netem_delay = {
176     .next = &netem_loss,
177     .key = "netem_delay",
178     .type = CONFIG_TYPE_INT,
179     .options = CONFIG_OPT_NONE,
180     .u = { .value = 0 },
181 };
182
183 static config_entry_t drl_configfile = {
184     .next = &netem_delay,
185     .key = "drl_configfile",
186     .type = CONFIG_TYPE_STRING,
187     .options = CONFIG_OPT_MANDATORY,
188     .u = { .string = "drl.xml" },
189 };
190
191 /** The administrative bandwidth limit (mbps) for the local node.  The node
192  * will not set a limit higher than this, even when distributed capacity is
193  * available.  Set to 0 for no limit. */
194 static config_entry_t nodelimit = {
195     .next = &drl_configfile,
196     .key = "nodelimit",
197     .type = CONFIG_TYPE_INT,
198     .options = CONFIG_OPT_MANDATORY,
199     .u = { .value = 0 },
200 };
201
202 /** Determines the verbosity of logging. */
203 static config_entry_t drl_loglevel = {
204     .next = &nodelimit,
205     .key = "drl_loglevel",
206     .type = CONFIG_TYPE_INT,
207     .options = CONFIG_OPT_MANDATORY,
208     .u = { .value = LOG_WARN },
209 };
210
211 /** The path of the logfile. */
212 static config_entry_t drl_logfile = {
213     .next = &drl_loglevel,
214     .key = "drl_logfile",
215     .type = CONFIG_TYPE_STRING,
216     .options = CONFIG_OPT_MANDATORY,
217     .u = { .string = "drl_logfile.log" },
218 };
219
220 /** The choice of DRL protocol. */
221 static config_entry_t policy = {
222     .next = &drl_logfile,
223     .key = "policy",
224     .type = CONFIG_TYPE_STRING,
225     .options = CONFIG_OPT_MANDATORY,
226     .u = { .string = "GRD" },
227 };
228
229 /** The estimate interval, in milliseconds. */
230 static config_entry_t estintms = {
231     .next = &policy,
232     .key = "estintms",
233     .type = CONFIG_TYPE_INT,
234     .options = CONFIG_OPT_MANDATORY,
235     .u = { .value = 100 },
236 };
237
238 #define config_entries (&estintms)
239
240 /*
241  * Debug functionality
242  */
243
244 #ifdef DMALLOC
245 #include <dmalloc.h>
246 #endif
247
248 #define NIPQUAD(addr) \
249     ((unsigned char *)&addr)[0], \
250     ((unsigned char *)&addr)[1], \
251     ((unsigned char *)&addr)[2], \
252     ((unsigned char *)&addr)[3]
253
254 #define IPQUAD(addr) \
255     ((unsigned char *)&addr)[3], \
256     ((unsigned char *)&addr)[2], \
257     ((unsigned char *)&addr)[1], \
258     ((unsigned char *)&addr)[0]
259
260
261
262 /* Salt for the hash functions */
263 static int salt;
264
265 /*
266  * Hash slice name lookups on context ID.
267  */
268
269 /* Special context IDs */
270 #define UNKNOWN_XID -1
271 #define ROOT_XID 0
272
273 enum {
274     CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
275     ICMP_ECHOREPLY_XID,
276     ICMP_UNREACH_XID,
277 };
278
279
280 /* globals */
281 pthread_t estimate_thread;
282 pthread_t signal_thread;
283 pthread_t comm_thread;
284 uint32_t local_ip = 0;
285 limiter_t limiter;
286 extern FILE *logfile;
287 extern uint8_t system_loglevel;
288 extern uint8_t do_enforcement;
289
290 /* Used to simulate partitions. */
291 int do_partition = 0;
292 int partition_set = 0xfffffff;
293
294 /* functions */
295
296 static inline uint32_t
297 hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max)
298 {
299     unsigned char mybytes[FLOWKEYSIZE];
300     mybytes[0] = protocol;
301     *(uint32_t*)(&(mybytes[1])) = src_ip;
302     *(uint32_t*)(&(mybytes[5])) = dst_ip;
303     *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port;
304     return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1);
305 }
306
307 uint32_t sampled_hasher(const key_flow *key) {
308     /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. */
309     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX);
310 }
311
312 uint32_t standard_hasher(const key_flow *key) {
313     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE);
314 }
315
316 uint32_t multiple_hasher(const key_flow *key) {
317     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE);
318 }
319
320 struct intr_id {
321     char* name;
322     ulog_iret_t *res;
323 };
324
325 /* Interesting keys */
326 enum {
327     OOB_TIME_SEC = 0,
328     OOB_MARK,
329     IP_SADDR,
330     IP_DADDR,
331     IP_TOTLEN,
332     IP_PROTOCOL,
333     TCP_SPORT,
334     TCP_DPORT,
335     TCP_ACK,
336     TCP_FIN,
337     TCP_SYN,
338     TCP_RST,
339     UDP_SPORT,
340     UDP_DPORT,
341     ICMP_TYPE,
342     ICMP_CODE,
343     GRE_FLAG_KEY,
344     GRE_VERSION,
345     GRE_KEY,
346     PPTP_CALLID,
347 };
348
349 #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0]))
350 static struct intr_id intr_ids[] = {
351     [OOB_TIME_SEC] = { "oob.time.sec", 0 },
352     [OOB_MARK] = { "oob.mark", 0 },
353     [IP_SADDR] = { "ip.saddr", 0 },
354     [IP_DADDR] = { "ip.daddr", 0 },
355     [IP_TOTLEN] = { "ip.totlen", 0 },
356     [IP_PROTOCOL] = { "ip.protocol", 0 },
357     [TCP_SPORT] = { "tcp.sport", 0 },
358     [TCP_DPORT] { "tcp.dport", 0 },
359     [TCP_ACK] = { "tcp.ack", 0 },
360     [TCP_FIN] = { "tcp.fin", 0 },
361     [TCP_SYN] = { "tcp.syn", 0 },
362     [TCP_RST] = { "tcp.rst", 0 },
363     [UDP_SPORT] = { "udp.sport", 0 },
364     [UDP_DPORT] = { "udp.dport", 0 },
365     [ICMP_TYPE] = { "icmp.type", 0 },
366     [ICMP_CODE] = { "icmp.code", 0 },
367     [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
368     [GRE_VERSION] = { "gre.version", 0 },
369     [GRE_KEY] = { "gre.key", 0 },
370     [PPTP_CALLID] = { "pptp.callid", 0 },
371 };
372
373 #define GET_VALUE(x) intr_ids[x].res->value
374
375 #define DATE(t) ((t) / (24*60*60) * (24*60*60))
376
377 static int _output_drl(ulog_iret_t *res)
378 {
379     int xid;
380     uint32_t src_ip, dst_ip;
381     uint16_t src_port, dst_port;
382     uint8_t protocol;
383
384     key_flow key;
385     identity_t *ident;
386     leaf_t *leaf;
387
388     protocol = GET_VALUE(IP_PROTOCOL).ui8;
389     src_ip = GET_VALUE(IP_SADDR).ui32;
390     dst_ip = GET_VALUE(IP_DADDR).ui32;
391     xid = GET_VALUE(OOB_MARK).ui32;
392
393     switch (protocol) {
394             
395         case IPPROTO_TCP:
396             src_port = GET_VALUE(TCP_SPORT).ui16;
397             dst_port = GET_VALUE(TCP_DPORT).ui16;
398             break;
399
400         case IPPROTO_UDP:
401             /* netflow had an issue with many udp flows and set
402              * src_port=0 to handle it.  We don't. 
403              */
404             src_port = GET_VALUE(UDP_SPORT).ui16;
405
406             /*
407              * traceroutes create a large number of flows in the db
408              * this is a quick hack to catch the most common form
409              * of traceroute (basically we're mapping any UDP packet
410              * in the 33435-33524 range to the "trace" port, 33524 is
411              * 3 packets * nhops (30).
412              */
413             dst_port = GET_VALUE(UDP_DPORT).ui16;
414             if (dst_port >= 33435 && dst_port <= 33524)
415                 dst_port = 33435;
416             break;
417
418         case IPPROTO_ICMP:
419             src_port = GET_VALUE(ICMP_TYPE).ui8;
420             dst_port = GET_VALUE(ICMP_CODE).ui8;
421
422             /*
423              * We special case some of the ICMP traffic that the kernel
424              * always generates. Since this is attributed to root, it 
425              * creates significant "noise" in the output. We want to be
426              * able to quickly see that root is generating traffic.
427              */
428             if (xid == ROOT_XID) {
429                 if (src_port == ICMP_ECHOREPLY)
430                     xid = ICMP_ECHOREPLY_XID;
431                 else if (src_port == ICMP_UNREACH)
432                     xid = ICMP_UNREACH_XID;
433             }
434             break;
435
436         case IPPROTO_GRE:
437             if (GET_VALUE(GRE_FLAG_KEY).b) {
438                 if (GET_VALUE(GRE_VERSION).ui8 == 1) {
439                     /* Get PPTP call ID */
440                     src_port = GET_VALUE(PPTP_CALLID).ui16;
441                 } else {
442                     /* XXX Truncate GRE keys to 16 bits */
443                     src_port = (uint16_t) GET_VALUE(GRE_KEY).ui32;
444                 }
445             } else {
446                 /* No key available */
447                 src_port = 0;
448             }
449             dst_port = 0;
450             break;
451
452         default:
453             /* This is the default key for packets from unsupported protocols */
454             src_port = 0;
455             dst_port = 0;
456             break;
457     }
458
459     key.protocol = protocol;
460     key.source_ip = src_ip;
461     key.dest_ip = dst_ip;
462     key.source_port = src_port;
463     key.dest_port = dst_port;
464     key.packet_size = GET_VALUE(IP_TOTLEN).ui16;
465     key.packet_time = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
466
467     pthread_rwlock_rdlock(&limiter.limiter_lock); /* CLUNK! */
468
469     leaf = (leaf_t *) map_search(limiter.stable_instance.leaf_map, &xid, sizeof(xid));
470
471     /* Even if the packet doesn't match any specific xid, it should still
472      * count in the machine-type tables.  This catches root (xid == 0) and
473      * unclassified (xid = fff) packets, which don't have map entries. */
474     if (leaf == NULL) {
475         ident = limiter.stable_instance.last_machine;
476     } else {
477         ident = leaf->parent;
478     }
479
480     while (ident) {
481         pthread_mutex_lock(&ident->table_mutex);
482
483         /* Update the identity's table. */
484         ident->table_sample_function(ident->table, &key);
485
486 #ifdef SHADOW_ACCTING
487
488         /* Update the shadow perfect copy of the accounting table. */
489         standard_table_sample((standard_flow_table) ident->shadow_table, &key);
490
491 #endif
492
493         pthread_mutex_unlock(&ident->table_mutex);
494
495         ident = ident->parent;
496     }
497
498     pthread_rwlock_unlock(&limiter.limiter_lock); /* CLINK! */
499
500     return 0;
501 }
502
503 /* get all key id's for the keys we are intrested in */
504 static int get_ids(void)
505 {
506     int i;
507     struct intr_id *cur_id;
508
509     for (i = 0; i < INTR_IDS; i++) {
510         cur_id = &intr_ids[i];
511         cur_id->res = keyh_getres(keyh_getid(cur_id->name));
512         if (!cur_id->res) {
513             ulogd_log(ULOGD_ERROR, 
514                     "Cannot resolve keyhash id for %s\n", 
515                     cur_id->name);
516             return 1;
517         }
518     }
519     return 0;
520 }
521
522 static void free_identity(identity_t *ident) {
523     if (ident) {
524         free_comm(&ident->comm);
525
526         if (ident->table) {
527             ident->table_destroy_function(ident->table);
528         }
529
530         if (ident->loop_action) {
531             ident->loop_action->valid = 0;
532         }
533
534         if (ident->comm_action) {
535             ident->comm_action->valid = 0;
536         }
537
538         pthread_mutex_destroy(&ident->table_mutex);
539
540         free(ident);
541     }
542 }
543
544 static void free_identity_map(map_handle map) {
545     identity_t *tofree = NULL;
546
547     map_reset_iterate(map);
548     while ((tofree = (identity_t *) map_next(map))) {
549         free_identity(tofree);
550     }
551
552     free_map(map, 0);
553 }
554
555 static void free_instance(drl_instance_t *instance) {
556     if (instance->leaves)
557         free(instance->leaves);
558     if (instance->leaf_map)
559         free_map(instance->leaf_map, 0);
560     if (instance->ident_map)
561         free_identity_map(instance->ident_map);
562     if (instance->machines)
563         free(instance->machines);
564     if (instance->sets)
565         free(instance->sets);
566
567     /* FIXME: Drain the calendar first and free all the entries. */
568     if (instance->cal) {
569         free(instance->cal);
570     }
571
572     memset(instance, 0, sizeof(drl_instance_t));
573 }
574
575 static void free_failed_config(parsed_configs configs, drl_instance_t *instance) {
576     /* Free configs. */
577     if (configs.machines)
578         free_ident_list(configs.machines);
579     if (configs.sets)
580         free_ident_list(configs.sets);
581
582     /* Free instance. */
583     if (instance)
584         free_instance(instance);
585 }
586
587 static identity_t *new_identity(ident_config *config) {
588     identity_t *ident = malloc(sizeof(identity_t));
589     remote_node_t *comm_nodes = malloc(sizeof(remote_node_t)*config->peer_count);
590     ident_peer *peer = config->peers;
591     int peer_slot = 0;
592
593     if (ident == NULL) {
594         return NULL;
595     }
596
597     if (comm_nodes == NULL) {
598         free(ident);
599         return NULL;
600     }
601
602     memset(ident, 0, sizeof(identity_t));
603     memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t));
604
605     ident->id = config->id;
606     ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0);
607     ident->fixed_ewma_weight = config->fixed_ewma_weight;
608     ident->communication_intervals = config->communication_intervals;
609     ident->mainloop_intervals = config->mainloop_intervals;
610     ident->ewma_weight = pow(ident->fixed_ewma_weight, 
611                              (limiter.estintms/1000.0) * config->mainloop_intervals);
612     ident->parent = NULL;
613     ident->independent = config->independent;
614
615     pthread_mutex_init(&ident->table_mutex, NULL);
616     switch (config->accounting) {
617         case ACT_STANDARD:
618             ident->table =
619                 standard_table_create(standard_hasher, &ident->common);
620
621             /* Ugly function pointer casting.  Makes things sufficiently
622              * generic, though. */
623             ident->table_sample_function =
624                 (int (*)(void *, const key_flow *)) standard_table_sample;
625             ident->table_cleanup_function =
626                 (int (*)(void *)) standard_table_cleanup;
627             ident->table_update_function =
628                 (void (*)(void *, struct timeval, double)) standard_table_update_flows;
629             ident->table_destroy_function =
630                 (void (*)(void *)) standard_table_destroy;
631             break;
632
633         case ACT_MULTIPLE:
634             ident->table =
635                 multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common);
636
637             ident->table_sample_function =
638                 (int (*)(void *, const key_flow *)) multiple_table_sample;
639             ident->table_cleanup_function =
640                 (int (*)(void *)) multiple_table_cleanup;
641             ident->table_update_function =
642                 (void (*)(void *, struct timeval, double)) multiple_table_update_flows;
643             ident->table_destroy_function =
644                 (void (*)(void *)) multiple_table_destroy;
645             break;
646
647         case ACT_SAMPLEHOLD:
648             ident->table = sampled_table_create(sampled_hasher,
649                     ident->limit * IDENT_CLEAN_INTERVAL,
650                     SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common);
651
652             ident->table_sample_function =
653                 (int (*)(void *, const key_flow *)) sampled_table_sample;
654             ident->table_cleanup_function =
655                 (int (*)(void *)) sampled_table_cleanup;
656             ident->table_update_function =
657                 (void (*)(void *, struct timeval, double)) sampled_table_update_flows;
658             ident->table_destroy_function =
659                 (void (*)(void *)) sampled_table_destroy;
660             break;
661
662         case ACT_SIMPLE:
663             ident->table = simple_table_create(&ident->common);
664
665             ident->table_sample_function =
666                 (int (*)(void *, const key_flow *)) simple_table_sample;
667             ident->table_cleanup_function =
668                 (int (*)(void *)) simple_table_cleanup;
669             ident->table_update_function =
670                 (void (*)(void *, struct timeval, double)) simple_table_update_flows;
671             ident->table_destroy_function =
672                 (void (*)(void *)) simple_table_destroy;
673             break;
674     }
675
676 #ifdef SHADOW_ACCTING
677
678     ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common);
679
680     if (ident->shadow_table == NULL) {
681         ident->table_destroy_function(ident->table);
682         free(ident);
683         return NULL;
684     }
685
686 #endif
687
688     /* Make sure the table was allocated. */
689     if (ident->table == NULL) {
690         free(ident);
691         return NULL;
692     }
693
694     while (peer) {
695         comm_nodes[peer_slot].addr = peer->ip;
696         comm_nodes[peer_slot].port = htons(LIMITER_LISTEN_PORT);
697         peer = peer->next;
698         peer_slot += 1;
699     }
700
701     if (new_comm(&ident->comm, config, comm_nodes)) {
702         printlog(LOG_CRITICAL, "Failed to create communication structure.\n");
703         return NULL;
704     }
705
706     ident->comm.remote_nodes = comm_nodes;
707
708     if (!create_htb.u.value) {
709         ident->htb_node = config->htb_node;
710         ident->htb_parent = config->htb_parent;
711     }
712
713     return ident;
714 }
715
716 /* Determines the validity of the parameters of one ident_config.
717  *
718  * 0 valid
719  * 1 invalid
720  */
721 static int validate_config(ident_config *config) {
722     /* Limit must be a positive integer. */
723     if (config->limit < 1) {
724         return 1;
725     }
726
727     /* Commfabric must be a valid choice (COMM_MESH or COMM_GOSSIP). */
728     if (config->commfabric != COMM_MESH &&
729             config->commfabric != COMM_GOSSIP) {
730         return 1;
731     }
732
733     /* If commfabric is COMM_GOSSIP, this must be a positive integer. */
734     if (config->commfabric == COMM_GOSSIP && config->branch < 1) {
735         return 1;
736     }
737
738     /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD,
739      * ACT_SIMPLE, ACT_MULTIPLE). */
740     if (config->accounting != ACT_STANDARD &&
741             config->accounting != ACT_SAMPLEHOLD &&
742             config->accounting != ACT_SIMPLE &&
743             config->accounting != ACT_MULTIPLE) {
744         return 1;
745     }
746
747     /* Ewma weight must be greater than or equal to zero. */
748     if (config->fixed_ewma_weight < 0) {
749         return 1;
750     }
751
752     if (!create_htb.u.value) {
753         if (config->htb_node < 0 || config->htb_parent < 0) {
754             printlog(LOG_CRITICAL, "When create_htb is disabled in ulogd.conf, an identity must specify the htb_node and htb_parent propertities in its configuration.\n");
755             return 1;
756         }
757     } else {
758         if (config->htb_node > -1 || config->htb_parent > -1) {
759             printlog(LOG_WARN, "htb_node or htb_parent are configured but ignored because we're configured to create our own htb hierarchy.\n");
760         }
761     }
762
763     /* Note: Parsing stage requires that each ident has at least one peer. */
764     return 0;
765 }
766
767 /* 0 success
768  * non-zero failure
769  */
770 static int validate_configs(parsed_configs configs, drl_instance_t *instance) {
771
772     ident_config *mlist = configs.machines;
773     ident_config *slist = configs.sets;
774     ident_config *tmp = NULL;
775     int i = 0;
776
777     while (mlist) {
778         /* ID must be non-zero and unique. */
779         /* This is ugly and hackish, but this function will be called rarely.
780          * I'm tired of trying to be clever. */
781         if (mlist->id < 0) {
782             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", mlist->id, mlist->id);
783             return EINVAL;
784         }
785         tmp = mlist->next;
786         while (tmp) {
787             if (mlist->id == tmp->id) {
788                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
789                 return EINVAL;
790             }
791             tmp = tmp->next;
792         }
793         tmp = configs.sets;
794         while (tmp) {
795             if (mlist->id == tmp->id) {
796                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
797                 return EINVAL;
798             }
799             tmp = tmp->next;
800         }
801
802         if (validate_config(mlist)) {
803             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", mlist->id, mlist->id);
804             return EINVAL;
805         }
806
807         if (mlist->independent) {
808             printlog(LOG_CRITICAL, "Makes no sense to have independent machine node - setting independent to false.\n");
809             mlist->independent = 0;
810         }
811
812         mlist = mlist->next;
813     }
814
815     instance->sets = malloc(configs.set_count * sizeof(identity_t *));
816     if (instance->sets == NULL) {
817         printlog(LOG_CRITICAL, "Not enough memory to allocate set identity collection.\n");
818         return ENOMEM;
819     }
820
821     memset(instance->sets, 0, configs.set_count * sizeof(identity_t *));
822     instance->set_count = configs.set_count;
823
824     /* For sets, make sure that the hierarchy is valid. */
825     while (slist) {
826         ident_member *members = slist->members;
827
828         /* ID must be non-zero and unique. */
829         if (slist->id < 0) {
830             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", slist->id, slist->id);
831             return EINVAL;
832         }
833         tmp = slist->next;
834         while (tmp) {
835             if (slist->id == tmp->id) {
836                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", slist->id, slist->id);
837                 return EINVAL;
838             }
839             tmp = tmp->next;
840         }
841
842         if (validate_config(slist)) {
843             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", slist->id, slist->id);
844             return EINVAL;
845         }
846
847         /* Allocate an identity_t for this set-type identity. */
848         instance->sets[i] = new_identity(slist);
849
850         if (instance->sets[i] == NULL) {
851             printlog(LOG_CRITICAL, "Not enough memory to allocate set identity.\n");
852             return ENOMEM;
853         }
854
855         /* Loop through children and look up each in leaf or ident map
856          * depending on the type of child.  Set the child's parent pointer
857          * to the identity we just created above, unless it is already set,
858          * in which case we have an error. */
859         while (members) {
860             identity_t *child_ident = NULL;
861             leaf_t *child_leaf = NULL;
862
863             switch (members->type) {
864                 case MEMBER_XID:
865                     child_leaf = map_search(instance->leaf_map, &members->value,
866                                             sizeof(members->value));
867                     if (child_leaf == NULL) {
868                         printlog(LOG_CRITICAL, "xid: child leaf not found.\n");
869                         return EINVAL;
870                     }
871                     if (child_leaf->parent != NULL) {
872                         /* Error - This leaf already has a parent. */
873                         printlog(LOG_CRITICAL, "xid: child already has a parent.\n");
874                         return EINVAL;
875                     }
876                     child_leaf->parent = instance->sets[i];
877                     break;
878                 case MEMBER_GUID:
879                     child_ident = map_search(instance->ident_map, &members->value,
880                                              sizeof(members->value));
881                     if (child_ident == NULL) {
882                         printlog(LOG_CRITICAL, "guid: child identity not found.\n");
883                         return EINVAL;
884                     }
885                     if (child_ident->parent != NULL) {
886                         /* Error - This identity already has a parent. */
887                         printlog(LOG_CRITICAL, "guid: child identity already has a parent.\n");
888                         return EINVAL;
889                     }
890                     child_ident->parent = instance->sets[i];
891                     break;
892                 default:
893                     /* Error - shouldn't be possible. */
894                     break;
895             }
896             members = members->next;
897         }
898
899         map_insert(instance->ident_map, &instance->sets[i]->id,
900                    sizeof(instance->sets[i]->id), instance->sets[i]);
901
902         slist = slist->next;
903         i++;
904     }
905     return 0;
906 }
907
908 static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) {
909     int count = 0;
910     identity_t *current_ident;
911     leaf_t *current_leaf;
912     leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *));
913     if (leaves == NULL) {
914         return 1;
915     }
916
917     map_reset_iterate(instance->leaf_map);
918     while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) {
919         current_ident = current_leaf->parent;
920         while (current_ident != NULL && current_ident != instance->last_machine) {
921             if (current_ident == ident) {
922                 /* Found the ident we were looking for - add the leaf. */
923                 leaves[count] = current_leaf;
924                 count += 1;
925                 break;
926             }
927             current_ident = current_ident->parent;
928         }
929     }
930
931     ident->leaves = leaves;
932     ident->leaf_count = count;
933
934     return 0;
935 }
936
937 static int init_identities(parsed_configs configs, drl_instance_t *instance) {
938     int i, j;
939     ident_config *config = configs.machines;
940     leaf_t *leaf = NULL;
941
942     instance->cal = malloc(sizeof(struct ident_calendar) * SCHEDLEN);
943
944     if (instance->cal == NULL) {
945         return ENOMEM;
946     }
947
948     for (i = 0; i < SCHEDLEN; ++i) {
949         TAILQ_INIT(instance->cal + i);
950     }
951     instance->cal_slot = 0;
952
953     instance->machines = malloc(configs.machine_count * sizeof(drl_instance_t *));
954
955     if (instance->machines == NULL) {
956         return ENOMEM;
957     }
958
959     memset(instance->machines, 0, configs.machine_count * sizeof(drl_instance_t *));
960     instance->machine_count = configs.machine_count;
961
962     /* Allocate and add the machine identities. */
963     for (i = 0; i < configs.machine_count; ++i) {
964         identity_action *loop_action;
965         identity_action *comm_action;
966         instance->machines[i] = new_identity(config);
967
968         if (instance->machines[i] == NULL) {
969             return ENOMEM;
970         }
971
972         loop_action = malloc(sizeof(identity_action));
973         comm_action = malloc(sizeof(identity_action));
974
975         if (loop_action == NULL || comm_action == NULL) {
976             return ENOMEM;
977         }
978
979         /* The first has no parent - it is the root.  All others have the
980          * previous ident as their parent. */
981         if (i == 0) {
982             instance->machines[i]->parent = NULL;
983         } else {
984             instance->machines[i]->parent = instance->machines[i - 1];
985         }
986
987         instance->last_machine = instance->machines[i];
988
989         /* Add the ident to the guid->ident map. */
990         map_insert(instance->ident_map, &instance->machines[i]->id,
991                    sizeof(instance->machines[i]->id), instance->machines[i]);
992
993         config = config->next;
994
995         memset(loop_action, 0, sizeof(identity_action));
996         memset(comm_action, 0, sizeof(identity_action));
997         loop_action->ident = instance->machines[i];
998         loop_action->action = ACTION_MAINLOOP;
999         loop_action->valid = 1;
1000         comm_action->ident = instance->machines[i];
1001         comm_action->action = ACTION_COMMUNICATE;
1002         comm_action->valid = 1;
1003
1004         instance->machines[i]->loop_action = loop_action;
1005         instance->machines[i]->comm_action = comm_action;
1006
1007         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1008                           loop_action, calendar);
1009
1010         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1011                           comm_action, calendar);
1012
1013         /* Setup the array of pointers to leaves.  This is easy for machines
1014          * because a machine node applies to every leaf. */
1015         instance->machines[i]->leaves =
1016             malloc(instance->leaf_count * sizeof(leaf_t *));
1017         if (instance->machines[i]->leaves == NULL) {
1018             return ENOMEM;
1019         }
1020         instance->machines[i]->leaf_count = instance->leaf_count;
1021         for (j = 0; j < instance->leaf_count; ++j) {
1022             instance->machines[i]->leaves[j] = &instance->leaves[j];
1023         }
1024     }
1025
1026     /* Connect the set subtree to the machines. Any set or leaf without a
1027      * parent will take the last machine as its parent. */
1028
1029     /* Leaves... */
1030     map_reset_iterate(instance->leaf_map);
1031     while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
1032         if (leaf->parent == NULL) {
1033             leaf->parent = instance->last_machine;
1034         }
1035     }
1036
1037     /* Sets... */
1038     for (i = 0; i < instance->set_count; ++i) {
1039         identity_action *loop_action;
1040         identity_action *comm_action;
1041
1042         if (instance->sets[i]->parent == NULL && instance->sets[i]->independent == 0) {
1043             instance->sets[i]->parent = instance->last_machine;
1044         }
1045
1046         loop_action = malloc(sizeof(identity_action));
1047         comm_action = malloc(sizeof(identity_action));
1048
1049         if (loop_action == NULL || comm_action == NULL) {
1050             return ENOMEM;
1051         }
1052
1053         memset(loop_action, 0, sizeof(identity_action));
1054         memset(comm_action, 0, sizeof(identity_action));
1055         loop_action->ident = instance->sets[i];
1056         loop_action->action = ACTION_MAINLOOP;
1057         loop_action->valid = 1;
1058         comm_action->ident = instance->sets[i];
1059         comm_action->action = ACTION_COMMUNICATE;
1060         comm_action->valid = 1;
1061
1062         instance->sets[i]->loop_action = loop_action;
1063         instance->sets[i]->comm_action = comm_action;
1064
1065         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1066                           loop_action, calendar);
1067
1068         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1069                           comm_action, calendar);
1070
1071         /* Setup the array of pointers to leaves.  This is harder for sets,
1072          * but this doesn't need to be super-efficient because it happens
1073          * rarely and it isn't on the critical path for reconfig(). */
1074         if (fill_set_leaf_pointer(instance, instance->sets[i])) {
1075             return ENOMEM;
1076         }
1077     }
1078
1079     /* Success. */
1080     return 0;
1081 }
1082
1083 static void print_instance(drl_instance_t *instance) {
1084     leaf_t *leaf = NULL;
1085     identity_t *ident = NULL;
1086
1087     if (system_loglevel == LOG_DEBUG) {
1088         map_reset_iterate(instance->leaf_map);
1089         while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
1090             printf("%x:", leaf->xid);
1091             ident = leaf->parent;
1092             while (ident) {
1093                 printf("%d:",ident->id);
1094                 ident = ident->parent;
1095             }
1096             printf("Leaf's parent pointer is %p\n", leaf->parent);
1097         }
1098
1099         printf("instance->last_machine is %p\n", instance->last_machine);
1100     }
1101 }
1102
1103 static int assign_htb_hierarchy(drl_instance_t *instance) {
1104     int i, j;
1105     int next_node = 0x100;
1106
1107     /* If we're not going to create our own htb hierarchy (for instance,
1108      * if we're going to let PL's node manager do it for us), then we don't
1109      * want this function to do anything. */
1110     if (!create_htb.u.value) {
1111         printlog(LOG_DEBUG, "Skipping assign_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n");
1112         return 0;
1113     }
1114
1115     /* Chain machine nodes under 1:10. */
1116     for (i = 0; i < instance->machine_count; ++i) {
1117         if (instance->machines[i]->parent == NULL) {
1118             /* Top node. */
1119             instance->machines[i]->htb_parent = 0x10;
1120         } else {
1121             /* Pointerific! */
1122             instance->machines[i]->htb_parent =
1123                 instance->machines[i]->parent->htb_node;
1124         }
1125
1126         instance->machines[i]->htb_node = next_node;
1127         next_node += 1;
1128     }
1129
1130     next_node += 0x10;
1131
1132     /* Add set nodes under machine nodes. Iterate backwards to ensure parent is
1133      * already there. */
1134     for (j = (instance->set_count - 1); j >= 0; --j) {
1135         if (instance->sets[j]->parent == NULL) {
1136             /* Independent node - goes under 0x10 away from machine nodes. */
1137             instance->sets[j]->htb_parent = 0x10;
1138         } else {
1139             instance->sets[j]->htb_parent = instance->sets[j]->parent->htb_node;
1140         }
1141         instance->sets[j]->htb_node = next_node;
1142
1143         next_node += 1;
1144     }
1145
1146     return 0;
1147 }
1148
1149 /* Added this so that I could comment one line and kill off all of the
1150  * command execution. */
1151 static inline int execute_cmd(const char *cmd) {
1152     return system(cmd);
1153 }
1154
1155 static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor,
1156                                const uint32_t classid_major, const uint32_t classid_minor,
1157                                const uint64_t rate, const uint64_t ceil) {
1158     char cmd[300];
1159
1160     sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit",
1161             iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil);
1162     printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd);
1163
1164     return execute_cmd(cmd);
1165 }
1166
1167 static inline int add_htb_netem(const char *iface, const uint32_t parent_major,
1168                                 const uint32_t parent_minor, const uint32_t handle,
1169                                 const int loss, const int delay) {
1170     char cmd[300];
1171
1172     sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
1173             parent_minor, handle);
1174     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1175     if (execute_cmd(cmd))
1176         printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n");
1177
1178     sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms",
1179             iface, parent_major, parent_minor, handle, loss, delay);
1180     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1181     return execute_cmd(cmd);
1182 }
1183
1184 static inline int add_htb_sfq(const char *iface, const uint32_t parent_major,
1185                                 const uint32_t parent_minor, const uint32_t handle,
1186                                 const int perturb) {
1187     char cmd[300];
1188
1189     sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
1190             parent_minor, handle);
1191     printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
1192     if (execute_cmd(cmd))
1193         printlog(LOG_WARN, "HTB_cmd: Previous deletion did not succeed.\n");
1194
1195     sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x sfq perturb %d",
1196             iface, parent_major, parent_minor, handle, perturb);
1197     printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
1198     return execute_cmd(cmd);
1199 }
1200
1201 static int create_htb_hierarchy(drl_instance_t *instance) {
1202     char cmd[300];
1203     int i, j, k;
1204     uint64_t gigabit = 1024 * 1024 * 1024;
1205
1206     /* If we're not going to create our own htb hierarchy (for instance,
1207      * if we're going to let PL's node manager do it for us), then we don't
1208      * want this function to do anything. */
1209     if (!create_htb.u.value) {
1210         printlog(LOG_DEBUG, "Skipping create_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n");
1211         return 0;
1212     }
1213
1214     /* Nuke the hierarchy. */
1215     sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb");
1216     execute_cmd(cmd);
1217     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1218
1219     /* Re-initialize the basics. */
1220     sprintf(cmd, "tc qdisc add dev eth0 root handle 1: htb default 1fff");
1221     if (execute_cmd(cmd)) {
1222         return 1;
1223     }
1224     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1225
1226     if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit))
1227         return 1;
1228
1229     /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/
1230     if (limiter.nodelimit) {
1231         if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024))
1232             return 1;
1233     } else {
1234         if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit))
1235             return 1;
1236     }
1237
1238     /* Add machines. */
1239     for (i = 0; i < instance->machine_count; ++i) {
1240         if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1,
1241                          instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) {
1242             return 1;
1243         }
1244     }
1245
1246 #define LIMITEXEMPT
1247
1248     /* Add back 1:20. */
1249 #ifdef LIMITEXEMPT
1250     if (instance->last_machine == NULL) {
1251         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1252     } else {
1253         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:20 htb rate 8bit ceil 1000mbit",
1254             instance->last_machine->htb_node);
1255     }
1256 #else
1257     sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1258 #endif
1259
1260     if (execute_cmd(cmd)) {
1261         return 1;
1262     }
1263     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1264
1265     /* Add sets. */
1266     for (j = (instance->set_count - 1); j >= 0; --j) {
1267         if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1,
1268                          instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) {
1269             return 1;
1270         }
1271     }
1272
1273     /* Add leaves. FIXME: Set static sliver limit as ceil here! */
1274     for (k = 0; k < instance->leaf_count; ++k) {
1275         if (instance->leaves[k].parent == NULL) {
1276             if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1277                 return 1;
1278         } else {
1279             if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1280                 return 1;
1281         }
1282
1283         /* Add exempt node for the leaf under 1:20 as 1:2<xid> */
1284         if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit))
1285             return 1;
1286     }
1287
1288     /* Add 1:1000 and 1:2000 */
1289     if (instance->last_machine == NULL) {
1290         if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit))
1291             return 1;
1292     } else {
1293         if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit))
1294             return 1;
1295     }
1296
1297     if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit))
1298         return 1;
1299
1300     /* Add 1:1fff and 1:2fff */
1301     if (instance->last_machine == NULL) {
1302         if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit))
1303             return 1;
1304     } else {
1305         if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit))
1306             return 1;
1307     }
1308
1309     if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit))
1310         return 1;
1311
1312     /* Artifical delay or loss for experimentation. */
1313     if (netem_delay.u.value || netem_loss.u.value) {
1314         if (!strcmp(netem_slice.u.string, "ALL")) {
1315             /* By default, netem applies to all leaves. */
1316             if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value))
1317                 return 1;
1318             if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value))
1319                 return 1;
1320
1321             for (k = 0; k < instance->leaf_count; ++k) {
1322                 if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid),
1323                             (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) {
1324                     return 1;
1325                 }
1326
1327                 //FIXME: add exempt delay/loss here on 0x2000 ... ?
1328             }
1329         } else {
1330             /* netem_slice is not the default ALL value.  Only apply netem
1331              * to the slice that is set in netem_slice.u.string. */
1332             uint32_t slice_xid;
1333
1334             sscanf(netem_slice.u.string, "%x", &slice_xid);
1335
1336             if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value))
1337                 return 1;
1338         }
1339     }
1340
1341     /* Turn on SFQ for experimentation. */
1342     if (strcmp(sfq_slice.u.string, "NONE")) {
1343         if (!strcmp(sfq_slice.u.string, "ALL")) {
1344             if (add_htb_sfq("eth0", 1, 0x1000, 0x1000, 30))
1345                 return 1;
1346             if (add_htb_sfq("eth0", 1, 0x1fff, 0x1fff, 30))
1347                 return 1;
1348
1349             for (k = 0; k < instance->leaf_count; ++k) {
1350                 if (add_htb_sfq("eth0", 1, (0x1000 | instance->leaves[k].xid),
1351                             (0x1000 | instance->leaves[k].xid), 30)) {
1352                     return 1;
1353                 }
1354             }
1355         } else {
1356             uint32_t slice_xid;
1357
1358             sscanf(sfq_slice.u.string, "%x", &slice_xid);
1359
1360             if (add_htb_sfq("eth0", 1, slice_xid, slice_xid, 30))
1361                 return 1;
1362         }
1363     }
1364
1365     return 0;
1366 }
1367
1368 static int setup_tc_grd(drl_instance_t *instance) {
1369     int i, j;
1370     char cmd[300];
1371
1372     for (i = 0; i < instance->leaf_count; ++i) {
1373         /* Delete the old pfifo qdisc that might have been there before. */
1374         sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo",
1375                 instance->leaves[i].xid, instance->leaves[i].xid);
1376
1377         if (execute_cmd(cmd)) {
1378             printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1379         }
1380
1381         /* Add the netem qdisc. */
1382         sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
1383                 instance->leaves[i].xid, instance->leaves[i].xid);
1384
1385         if (execute_cmd(cmd)) {
1386             printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1387             return 1;
1388         }
1389     }
1390
1391     /* Do the same for 1000 and 1fff. */
1392     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1393
1394     if (execute_cmd(cmd)) {
1395         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1396     }
1397
1398     /* Add the netem qdisc. */
1399     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
1400
1401     if (execute_cmd(cmd)) {
1402         printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1403         return 1;
1404     }
1405
1406     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
1407
1408     if (execute_cmd(cmd)) {
1409         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1410     }
1411
1412     /* Add the netem qdisc. */
1413     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
1414
1415     if (execute_cmd(cmd)) {
1416         printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1417         return 1;
1418     }
1419
1420     /* Artifical delay or loss for experimentation. */
1421     if (netem_delay.u.value || netem_loss.u.value) {
1422         if (!strcmp(netem_slice.u.string, "ALL")) {
1423             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1000 handle 1000 netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
1424             if (execute_cmd(cmd)) {
1425                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1426                 return 1;
1427             }
1428
1429             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1fff handle 1fff netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
1430             if (execute_cmd(cmd)) {
1431                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1432                 return 1;
1433             }
1434
1435             for (j = 0; j < instance->leaf_count; ++j) {
1436                 leaf_t *current = &instance->leaves[j];
1437
1438                 current->delay = netem_delay.u.value;
1439
1440                 sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", current->xid, current->xid, netem_loss.u.value, netem_delay.u.value);
1441
1442                 if (execute_cmd(cmd)) {
1443                     printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1444                     return 1;
1445                 }
1446             }
1447         } else {
1448             uint32_t slice_xid;
1449             leaf_t *leaf = NULL;
1450
1451             sscanf(netem_slice.u.string, "%x", &slice_xid);
1452
1453             leaf = (leaf_t *) map_search(instance->leaf_map, &slice_xid, sizeof(slice_xid));
1454
1455             if (leaf == NULL) {
1456                 /* Leaf not found - invalid selection. */
1457                 printf("Your experimental setup is incorrect...\n");
1458                 return 1;
1459             }
1460
1461             leaf->delay = netem_delay.u.value;
1462
1463             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value);
1464
1465             if (execute_cmd(cmd)) {
1466                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1467                 return 1;
1468             }
1469         }
1470     }
1471
1472     return 0;
1473 }
1474
1475 /* init_drl 
1476  * 
1477  * Initialize this limiter with options 
1478  * Open UDP socket for peer communication
1479  */
1480 static int init_drl(void) {
1481     parsed_configs configs;
1482     struct sockaddr_in server_address;
1483     
1484     memset(&limiter, 0, sizeof(limiter_t));
1485
1486     /* Setup logging. */
1487     system_loglevel = (uint8_t) drl_loglevel.u.value;
1488     logfile = fopen(drl_logfile.u.string, "w");
1489
1490     if (logfile == NULL) {
1491         printf("Couldn't open logfile - ");
1492         perror("fopen()");
1493         exit(EXIT_FAILURE);
1494     }
1495
1496     printlog(LOG_CRITICAL, "ulogd_DRL initializing . . .\n");
1497
1498     limiter.nodelimit = (uint32_t) (((double) nodelimit.u.value * 1000000.0) / 8.0);
1499
1500     init_hashing();  /* for all hash maps */
1501
1502     pthread_rwlock_init(&limiter.limiter_lock,NULL);
1503
1504     /* determine our local IP by iterating through interfaces */
1505     if ((limiter.ip = get_local_ip())==0) {
1506         printlog(LOG_CRITICAL,
1507                  "ulogd_DRL unable to aquire local IP address, not registering.\n");
1508         return (false);
1509     }
1510     limiter.localaddr = inet_addr(limiter.ip);
1511     limiter.port = htons(LIMITER_LISTEN_PORT);
1512     limiter.udp_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
1513     if (limiter.udp_socket < 0) {
1514         printlog(LOG_CRITICAL, "Failed to create UDP socket().\n");
1515         return false;
1516     }
1517
1518     memset(&server_address, 0, sizeof(server_address));
1519     server_address.sin_family = AF_INET;
1520     server_address.sin_addr.s_addr = limiter.localaddr;
1521     server_address.sin_port = limiter.port;
1522
1523     if (bind(limiter.udp_socket, (struct sockaddr *) &server_address, sizeof(server_address)) < 0) {
1524         printlog(LOG_CRITICAL, "Failed to bind UDP socket.\n");
1525         return false;
1526     }
1527
1528     printlog(LOG_WARN, "     POLICY: %s\n",policy.u.string);
1529     if (strcasecmp(policy.u.string,"GRD") == 0) {
1530         limiter.policy = POLICY_GRD;
1531     } else if (strcasecmp(policy.u.string,"FPS") == 0) {
1532         limiter.policy = POLICY_FPS;
1533     } else {
1534         printlog(LOG_CRITICAL,
1535                  "Unknown DRL policy %s, aborting.\n",policy.u.string);
1536         return (false);
1537     }
1538
1539     limiter.estintms = estintms.u.value;
1540     if (limiter.estintms > 1000) {
1541         printlog(LOG_CRITICAL,
1542                  "DRL: sorry estimate intervals must be less than 1 second.");
1543         printlog(LOG_CRITICAL,
1544                  "  Simple source mods will allow larger intervals.  Using 1 second.\n");
1545         limiter.estintms = 1000;
1546     }
1547     printlog(LOG_WARN, "     Est interval: %dms\n",limiter.estintms);
1548     
1549     /* Acquire the big limiter lock for writing.  Prevents pretty much
1550      * anything else from happening while the hierarchy is being changed. */
1551     pthread_rwlock_wrlock(&limiter.limiter_lock);
1552
1553     limiter.stable_instance.ident_map = allocate_map();
1554     if (limiter.stable_instance.ident_map == NULL) {
1555         printlog(LOG_CRITICAL, "Failed to allocate memory for identity map.\n");
1556         return false;
1557     }
1558
1559     if (get_eligible_leaves(&limiter.stable_instance)) {
1560         printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n");
1561         return false;
1562     }
1563
1564     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1565         /* Parse error occured. Return non-zero to notify init_drl(). */
1566         printlog(LOG_CRITICAL, "Failed to parse the DRL configuration file (%s).\n",
1567             drl_configfile.u.string);
1568         return false;
1569     }
1570
1571     /* Validate identity hierarchy! */
1572     if (validate_configs(configs, &limiter.stable_instance)) {
1573         /* Clean up everything. */
1574         free_failed_config(configs, &limiter.stable_instance);
1575         printlog(LOG_CRITICAL, "Invalid DRL configuration file (%s).\n",
1576             drl_configfile.u.string);
1577         return false;
1578     }
1579
1580     if (init_identities(configs, &limiter.stable_instance)) {
1581         free_failed_config(configs, &limiter.stable_instance);
1582         printlog(LOG_CRITICAL, "Failed to initialize identities.\n");
1583         return false;
1584     }
1585
1586     /* At this point, we should be done with configs. */
1587     free_ident_list(configs.machines);
1588     free_ident_list(configs.sets);
1589
1590     print_instance(&limiter.stable_instance);
1591
1592     switch (limiter.policy) {
1593         case POLICY_FPS:
1594             if (assign_htb_hierarchy(&limiter.stable_instance)) {
1595                 free_instance(&limiter.stable_instance);
1596                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy.\n");
1597                 return false;
1598             }
1599
1600             if (create_htb_hierarchy(&limiter.stable_instance)) {
1601                 free_instance(&limiter.stable_instance);
1602                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy.\n");
1603                 return false;
1604             }
1605         break;
1606
1607         case POLICY_GRD:
1608             if (setup_tc_grd(&limiter.stable_instance)) {
1609                 free_instance(&limiter.stable_instance);
1610                 printlog(LOG_CRITICAL, "Failed to initialize tc calls for GRD.\n");
1611                 return false;
1612             }
1613         break;
1614
1615         default:
1616         return false;
1617     }
1618
1619     partition_set = partition.u.value;
1620
1621     pthread_rwlock_unlock(&limiter.limiter_lock);
1622
1623     if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) {
1624         printlog(LOG_CRITICAL, "Unable to start UDP receive thread.\n");
1625         return false;
1626     }
1627
1628     printlog(LOG_WARN, "ulogd_DRL init finished.\n");
1629
1630     return true;
1631 }
1632
1633 static void reconfig() {
1634     parsed_configs configs;
1635
1636     printlog(LOG_DEBUG, "--Starting reconfig()--\n");
1637     flushlog();
1638
1639     memset(&configs, 0, sizeof(parsed_configs));
1640     memset(&limiter.new_instance, 0, sizeof(drl_instance_t));
1641
1642     limiter.new_instance.ident_map = allocate_map();
1643     if (limiter.new_instance.ident_map == NULL) {
1644         printlog(LOG_CRITICAL, "Failed to allocate ident_map during reconfig().\n");
1645         return;
1646     }
1647
1648     if (get_eligible_leaves(&limiter.new_instance)) {
1649         free_failed_config(configs, &limiter.new_instance);
1650         printlog(LOG_CRITICAL, "Failed to read leaves during reconfig().\n");
1651         return;
1652     }
1653
1654     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1655         free_failed_config(configs, &limiter.new_instance);
1656         printlog(LOG_CRITICAL, "Failed to parse config during reconfig().\n");
1657         return;
1658     }
1659
1660     if (validate_configs(configs, &limiter.new_instance)) {
1661         free_failed_config(configs, &limiter.new_instance);
1662         printlog(LOG_CRITICAL, "Validation failed during reconfig().\n");
1663         pthread_rwlock_unlock(&limiter.limiter_lock);
1664         return;
1665     }
1666
1667     if (init_identities(configs, &limiter.new_instance)) {
1668         free_failed_config(configs, &limiter.new_instance);
1669         printlog(LOG_CRITICAL, "Initialization failed during reconfig().\n");
1670         pthread_rwlock_unlock(&limiter.limiter_lock);
1671         return;
1672     }
1673
1674     free_ident_list(configs.machines);
1675     free_ident_list(configs.sets);
1676
1677     print_instance(&limiter.new_instance);
1678     
1679     /* Lock */
1680     pthread_rwlock_wrlock(&limiter.limiter_lock);
1681
1682     switch (limiter.policy) {
1683         case POLICY_FPS:
1684             if (assign_htb_hierarchy(&limiter.new_instance)) {
1685                 free_instance(&limiter.new_instance);
1686                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
1687                 pthread_rwlock_unlock(&limiter.limiter_lock);
1688                 return;
1689             }
1690
1691             if (create_htb_hierarchy(&limiter.new_instance)) {
1692                 free_instance(&limiter.new_instance);
1693                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
1694
1695                 /* Re-create old instance. */
1696                 if (create_htb_hierarchy(&limiter.stable_instance)) {
1697                     /* Error reinstating the old one - big problem. */
1698                     printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
1699                     printlog(LOG_CRITICAL, "Giving up...\n");
1700                     flushlog();
1701                     exit(EXIT_FAILURE);
1702                 }
1703
1704                 pthread_rwlock_unlock(&limiter.limiter_lock);
1705                 return;
1706             }
1707         break;
1708
1709         case POLICY_GRD:
1710             if (setup_tc_grd(&limiter.new_instance)) {
1711                 free_instance(&limiter.new_instance);
1712                 printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n");
1713
1714                 /* Try to re-create old instance. */
1715                 if (setup_tc_grd(&limiter.stable_instance)) {
1716                     printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n");
1717                     printlog(LOG_CRITICAL, "Giving up...\n");
1718                     flushlog();
1719                     exit(EXIT_FAILURE);
1720                 }
1721             }
1722         break;
1723
1724         default:
1725             /* Should be impossible. */
1726             printf("Pigs are flying?\n");
1727             exit(EXIT_FAILURE);
1728     }
1729
1730     /* Switch over new to stable instance. */
1731     free_instance(&limiter.stable_instance);
1732     memcpy(&limiter.stable_instance, &limiter.new_instance, sizeof(drl_instance_t));
1733
1734     /* Success! - Unlock */
1735     pthread_rwlock_unlock(&limiter.limiter_lock);
1736 }
1737
1738 static int stop_enforcement(drl_instance_t *instance) {
1739     char cmd[300];
1740     int i;
1741
1742     for (i = 0; i < instance->machine_count; ++i) {
1743         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1744                 instance->machines[i]->htb_parent,
1745                 instance->machines[i]->htb_node);
1746
1747         if (execute_cmd(cmd)) {
1748             return 1;
1749         }
1750     }
1751
1752     for (i = 0; i < instance->set_count; ++i) {
1753         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1754                 instance->sets[i]->htb_parent,
1755                 instance->sets[i]->htb_node);
1756
1757         if (execute_cmd(cmd)) {
1758             return 1;
1759         }
1760     }
1761
1762     return 0;
1763 }
1764
1765 static void *signal_thread_func(void *args) {
1766     int sig;
1767     int err;
1768     sigset_t sigs;
1769
1770     sigemptyset(&sigs);
1771     sigaddset(&sigs, SIGHUP);
1772     sigaddset(&sigs, SIGUSR1);
1773     sigaddset(&sigs, SIGUSR2);
1774     pthread_sigmask(SIG_BLOCK, &sigs, NULL);
1775
1776     while (1) {
1777         sigemptyset(&sigs);
1778         sigaddset(&sigs, SIGHUP);
1779         sigaddset(&sigs, SIGUSR1);
1780         sigaddset(&sigs, SIGUSR2);
1781
1782         err = sigwait(&sigs, &sig);
1783
1784         if (err) {
1785             printlog(LOG_CRITICAL, "sigwait() returned an error.\n");
1786             flushlog();
1787         }
1788
1789         switch (sig) {
1790             case SIGHUP:
1791                 printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n");
1792                 reconfig();
1793                 //time_reconfig(1000); /* instrumentation */
1794                 flushlog();
1795                 break;
1796             case SIGUSR1:
1797                 pthread_rwlock_wrlock(&limiter.limiter_lock);
1798                 if (do_enforcement) {
1799                     do_enforcement = 0;
1800                     stop_enforcement(&limiter.stable_instance);
1801                     printlog(LOG_CRITICAL, "--Switching enforcement off.--\n");
1802                 } else {
1803                     do_enforcement = 1;
1804                     printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1805                 }
1806                 pthread_rwlock_unlock(&limiter.limiter_lock);
1807                 break;
1808             case SIGUSR2:
1809                 do_partition = !do_partition;
1810                 break;
1811             default:
1812                 /* Intentionally blank. */
1813                 break;
1814         }
1815     }
1816 }
1817
1818 static int drl_plugin_init() {
1819     sigset_t signal_mask;
1820
1821     sigemptyset(&signal_mask);
1822     sigaddset(&signal_mask, SIGHUP);
1823     sigaddset(&signal_mask, SIGUSR1);
1824     sigaddset(&signal_mask, SIGUSR2);
1825     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
1826
1827     if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {
1828         printlog(LOG_CRITICAL, "Failed to create signal handling thread.\n");
1829         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1830         flushlog();
1831         exit(EXIT_FAILURE);
1832     }
1833
1834     if (!init_drl()) {
1835         printlog(LOG_CRITICAL, "Init failed. :(\n");
1836         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1837         flushlog();
1838         exit(EXIT_FAILURE);
1839     }
1840
1841     /* start up the thread that will periodically estimate the
1842      * local rate and set the local limits
1843      * see estimate.c
1844      */
1845     if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) {
1846         printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n");
1847         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1848         exit(EXIT_FAILURE);
1849     }
1850
1851     if (enforce_on.u.value) {
1852         pthread_rwlock_wrlock(&limiter.limiter_lock);
1853         do_enforcement = 1;
1854         printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1855         pthread_rwlock_unlock(&limiter.limiter_lock);
1856     }
1857
1858     return 0;
1859 }
1860
1861 static ulog_output_t drl_op = {
1862     .name = "drl",
1863     .output = &_output_drl,
1864     .signal = NULL, /* This appears to be broken. Using my own handler. */
1865     .init = &drl_plugin_init,
1866     .fini = NULL,
1867 };
1868
1869 #if 0
1870 /* Tests the amount of time it takes to call reconfig(). */
1871 static void time_reconfig(int iterations) {
1872     struct timeval start, end;
1873     int i;
1874
1875     gettimeofday(&start, NULL);
1876     for (i = 0; i < iterations; ++i) {
1877         reconfig();
1878     }
1879     gettimeofday(&end, NULL);
1880
1881     printf("%d reconfigs() took %d seconds and %d microseconds.\n",
1882            iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec);
1883     exit(0);
1884
1885     // Seems to take about 85ms / iteration
1886 }
1887 #endif
1888
1889 /* register output plugin with ulogd */
1890 static void _drl_reg_op(void)
1891 {
1892     ulog_output_t *op = &drl_op;
1893     register_output(op);
1894 }
1895
1896 void _init(void)
1897 {
1898     /* have the opts parsed */
1899     config_parse_file("DRL", config_entries);
1900
1901     if (get_ids()) {
1902         ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
1903         exit(2);
1904     }
1905
1906     /* Seed the hash function */
1907     salt = getpid() ^ time(NULL);
1908
1909     _drl_reg_op();
1910 }