Made the print interval faster. Made the limit be specified in Kb/s rather than...
[distributedratelimiting.git] / drl / ulogd_DRL.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * ulogd output target for DRL: GRD and FPS
5  *
6  * Ken Yocum <kyocum@cs.ucsd.edu>
7  *
8  * Original shell of this code from ulogd_NETFLOW
9  * Like that code, we keep track of per-slice data rates 
10  * out of this slice.  Thus we are rate limiting particular slices
11  * across multiple boxes, ensuring that their outbound rate does not
12  * exceed some fixed limit.  
13  *
14  * Fabric:  static mesh
15  *
16  * Enforcer: linux drop percentage.
17  *    
18  * This file reads packets from the netlink socket.  It updates all
19  * the hashmaps which track how much data has arrived per flow.
20  * It starts two threads for this limiter.
21  * One thread handles periodic estimation.
22  * The other thread handles communication with other limiters. 
23  *
24  * 
25  * Code map
26  *
27  * ulogd_DRL: attach to netlink socket, accept packets.  replaces ratelimit.cc
28  * util.c: generic hashing functions, flow comparisons, sundry items. 
29  * gossip.c: Recv gossip, send gossip.
30  * peer_comm.c: Thread to listen for updates from other limiters. 
31  * estimate.c: Thread to calculate the local limits. 
32  * 
33  * 
34  * Ken Yocum <kyocum@cs.ucsd.edu>
35  * 2007 
36  * 
37  * Some code appropriated from ulogd_NETFLOW:
38  *
39  * Mark Huang <mlhuang@cs.princeton.edu>
40  * Copyright (C) 2004-2005 The Trustees of Princeton University
41  *
42  * Based on admindump.pl by Mic Bowman and Paul Brett
43  * Copyright (c) 2002 Intel Corporation
44  *
45  */
46
47 /* Enable GNU glibc extensions */
48 #define _GNU_SOURCE
49
50 #include <stdio.h>
51 #include <stdlib.h>
52
53 /* va_start() and friends */
54 #include <stdarg.h>
55
56 /* ispunct() */
57 #include <ctype.h>
58
59 /* strstr() and friends */
60 #include <string.h>
61
62 /* dirname() and basename() */
63 #include <libgen.h>
64
65 /* fork() and wait() */
66 #include <sys/types.h>
67 #include <unistd.h>
68 #include <sys/wait.h>
69
70 /* errno and assert() */
71 #include <errno.h>
72 #include <assert.h>
73
74 /* getopt_long() */
75 #include <getopt.h>
76
77 /* time() and friends */
78 #include <time.h>
79 #include <sys/time.h>
80
81 /* inet_aton() */
82 #include <sys/socket.h>
83 #include <netinet/in.h>
84 #include <arpa/inet.h>
85
86 /* ICMP definitions */
87 #include <netinet/ip.h>
88 #include <netinet/ip_icmp.h>
89
90 /* stat() */
91 #include <sys/stat.h>
92
93 /* pthread_create() */
94 #include <pthread.h>
95
96 /* flock() */
97 #include <sys/file.h>
98
99 /* Signal definitions - so that we can catch SIGHUP and update config. */
100 #include <signal.h>
101
102 #include <ulogd/ulogd.h>
103 #include <ulogd/conffile.h>
104
105 /* Perhaps useful for files within vservers? */
106 #if !defined(STANDALONE) && HAVE_LIBPROPER
107 #include <proper/prop.h>
108 #endif
109
110 /*
111  * Jenkins hash support
112  * lives in raterouter.h
113  */
114
115 /* DRL specifics */
116 #include "raterouter.h"
117 #include "util.h"
118 #include "calendar.h"
119 #include "ratetypes.h" /* needs util and pthread.h */
120 #include "logging.h"
121
122 /*
123  * /etc/ulogd.conf configuration options
124  * Add the config options for DRL. 
125  */
126
127 static config_entry_t drl_configfile = {
128     .next = NULL,
129     .key = "drl_configfile",
130     .type = CONFIG_TYPE_STRING,
131     .options = CONFIG_OPT_MANDATORY,
132     .u = { .string = "drl.xml" },
133 };
134
135 /** The administrative bandwidth limit (mbps) for the local node.  The node
136  * will not set a limit higher than this, even when distributed capacity is
137  * available.  Set to 0 for no limit. */
138 static config_entry_t nodelimit = {
139     .next = &drl_configfile,
140     .key = "nodelimit",
141     .type = CONFIG_TYPE_INT,
142     .options = CONFIG_OPT_MANDATORY,
143     .u = { .value = 0 },
144 };
145
146 /** Determines the verbosity of logging. */
147 static config_entry_t drl_loglevel = {
148     .next = &nodelimit,
149     .key = "drl_loglevel",
150     .type = CONFIG_TYPE_INT,
151     .options = CONFIG_OPT_MANDATORY,
152     .u = { .value = LOG_WARN },
153 };
154
155 /** The path of the logfile. */
156 static config_entry_t drl_logfile = {
157     .next = &drl_loglevel,
158     .key = "drl_logfile",
159     .type = CONFIG_TYPE_STRING,
160     .options = CONFIG_OPT_MANDATORY,
161     .u = { .string = "drl_logfile.log" },
162 };
163
164 /** The choice of DRL protocol. */
165 static config_entry_t policy = {
166     .next = &drl_logfile,
167     .key = "policy",
168     .type = CONFIG_TYPE_STRING,
169     .options = CONFIG_OPT_MANDATORY,
170     .u = { .string = "GRD" },
171 };
172
173 /** The estimate interval, in milliseconds. */
174 static config_entry_t estintms = {
175     .next = &policy,
176     .key = "estintms",
177     .type = CONFIG_TYPE_INT,
178     .options = CONFIG_OPT_MANDATORY,
179     .u = { .value = 100 },
180 };
181
182 #define config_entries (&estintms)
183
184 /*
185  * Debug functionality
186  */
187
188 #ifdef DMALLOC
189 #include <dmalloc.h>
190 #endif
191
192 #define NIPQUAD(addr) \
193     ((unsigned char *)&addr)[0], \
194     ((unsigned char *)&addr)[1], \
195     ((unsigned char *)&addr)[2], \
196     ((unsigned char *)&addr)[3]
197
198 #define IPQUAD(addr) \
199     ((unsigned char *)&addr)[3], \
200     ((unsigned char *)&addr)[2], \
201     ((unsigned char *)&addr)[1], \
202     ((unsigned char *)&addr)[0]
203
204
205
206 /* Salt for the hash functions */
207 static int salt;
208
209 /*
210  * Hash slice name lookups on context ID.
211  */
212
213 /* Special context IDs */
214 #define UNKNOWN_XID -1
215 #define ROOT_XID 0
216
217 enum {
218     CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
219     ICMP_ECHOREPLY_XID,
220     ICMP_UNREACH_XID,
221 };
222
223
224 /* globals */
225 pthread_t estimate_thread;
226 pthread_t signal_thread;
227 pthread_t comm_thread;
228 uint32_t local_ip = 0;
229 limiter_t limiter;
230 extern FILE *logfile;
231 extern uint8_t system_loglevel;
232 extern uint8_t do_enforcement;
233
234 /* functions */
235
236 static inline uint32_t
237 hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port)
238 {
239     unsigned char mybytes[FLOWKEYSIZE];
240     mybytes[0] = protocol;
241     *(uint32_t*)(&(mybytes[1])) = src_ip;
242     *(uint32_t*)(&(mybytes[5])) = dst_ip;
243     *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port;
244     return jhash(mybytes,FLOWKEYSIZE,salt) & (FLOW_HASH_SIZE - 1);
245 }
246
247 uint32_t sampled_hasher(const key_flow *key) {
248     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port);
249 }
250
251 uint32_t standard_hasher(const key_flow *key) {
252     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port);
253 }
254
255 struct intr_id {
256     char* name;
257     ulog_iret_t *res;
258 };
259
260 /* Interesting keys */
261 enum {
262     OOB_TIME_SEC = 0,
263     OOB_MARK,
264     IP_SADDR,
265     IP_DADDR,
266     IP_TOTLEN,
267     IP_PROTOCOL,
268     TCP_SPORT,
269     TCP_DPORT,
270     TCP_ACK,
271     TCP_FIN,
272     TCP_SYN,
273     TCP_RST,
274     UDP_SPORT,
275     UDP_DPORT,
276     ICMP_TYPE,
277     ICMP_CODE,
278     GRE_FLAG_KEY,
279     GRE_VERSION,
280     GRE_KEY,
281     PPTP_CALLID,
282 };
283
284 #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0]))
285 static struct intr_id intr_ids[] = {
286     [OOB_TIME_SEC] = { "oob.time.sec", 0 },
287     [OOB_MARK] = { "oob.mark", 0 },
288     [IP_SADDR] = { "ip.saddr", 0 },
289     [IP_DADDR] = { "ip.daddr", 0 },
290     [IP_TOTLEN] = { "ip.totlen", 0 },
291     [IP_PROTOCOL] = { "ip.protocol", 0 },
292     [TCP_SPORT] = { "tcp.sport", 0 },
293     [TCP_DPORT] { "tcp.dport", 0 },
294     [TCP_ACK] = { "tcp.ack", 0 },
295     [TCP_FIN] = { "tcp.fin", 0 },
296     [TCP_SYN] = { "tcp.syn", 0 },
297     [TCP_RST] = { "tcp.rst", 0 },
298     [UDP_SPORT] = { "udp.sport", 0 },
299     [UDP_DPORT] = { "udp.dport", 0 },
300     [ICMP_TYPE] = { "icmp.type", 0 },
301     [ICMP_CODE] = { "icmp.code", 0 },
302     [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
303     [GRE_VERSION] = { "gre.version", 0 },
304     [GRE_KEY] = { "gre.key", 0 },
305     [PPTP_CALLID] = { "pptp.callid", 0 },
306 };
307
308 #define GET_VALUE(x) intr_ids[x].res->value
309
310 #define DATE(t) ((t) / (24*60*60) * (24*60*60))
311
312 static int _output_drl(ulog_iret_t *res)
313 {
314     int xid;
315     uint32_t src_ip, dst_ip;
316     uint16_t src_port, dst_port;
317     uint8_t protocol;
318
319     key_flow key;
320     identity_t *ident;
321     leaf_t *leaf;
322
323     protocol = GET_VALUE(IP_PROTOCOL).ui8;
324     src_ip = GET_VALUE(IP_SADDR).ui32;
325     dst_ip = GET_VALUE(IP_DADDR).ui32;
326     xid = GET_VALUE(OOB_MARK).ui32;
327
328     switch (protocol) {
329             
330         case IPPROTO_TCP:
331             src_port = GET_VALUE(TCP_SPORT).ui16;
332             dst_port = GET_VALUE(TCP_DPORT).ui16;
333             break;
334
335         case IPPROTO_UDP:
336             /* netflow had an issue with many udp flows and set
337              * src_port=0 to handle it.  We don't. 
338              */
339             src_port = GET_VALUE(UDP_SPORT).ui16;
340
341             /*
342              * traceroutes create a large number of flows in the db
343              * this is a quick hack to catch the most common form
344              * of traceroute (basically we're mapping any UDP packet
345              * in the 33435-33524 range to the "trace" port, 33524 is
346              * 3 packets * nhops (30).
347              */
348             dst_port = GET_VALUE(UDP_DPORT).ui16;
349             if (dst_port >= 33435 && dst_port <= 33524)
350                 dst_port = 33435;
351             break;
352
353         case IPPROTO_ICMP:
354             src_port = GET_VALUE(ICMP_TYPE).ui8;
355             dst_port = GET_VALUE(ICMP_CODE).ui8;
356
357             /*
358              * We special case some of the ICMP traffic that the kernel
359              * always generates. Since this is attributed to root, it 
360              * creates significant "noise" in the output. We want to be
361              * able to quickly see that root is generating traffic.
362              */
363             if (xid == ROOT_XID) {
364                 if (src_port == ICMP_ECHOREPLY)
365                     xid = ICMP_ECHOREPLY_XID;
366                 else if (src_port == ICMP_UNREACH)
367                     xid = ICMP_UNREACH_XID;
368             }
369             break;
370
371         case IPPROTO_GRE:
372             if (GET_VALUE(GRE_FLAG_KEY).b) {
373                 if (GET_VALUE(GRE_VERSION).ui8 == 1) {
374                     /* Get PPTP call ID */
375                     src_port = GET_VALUE(PPTP_CALLID).ui16;
376                 } else {
377                     /* XXX Truncate GRE keys to 16 bits */
378                     src_port = (uint16_t) GET_VALUE(GRE_KEY).ui32;
379                 }
380             } else {
381                 /* No key available */
382                 src_port = 0;
383             }
384             dst_port = 0;
385             break;
386
387         default:
388             /* This is the default key for packets from unsupported protocols */
389             src_port = 0;
390             dst_port = 0;
391             break;
392     }
393
394     key.protocol = protocol;
395     key.source_ip = src_ip;
396     key.dest_ip = dst_ip;
397     key.source_port = src_port;
398     key.dest_port = dst_port;
399     key.packet_size = GET_VALUE(IP_TOTLEN).ui16;
400     key.packet_time = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
401
402     pthread_rwlock_rdlock(&limiter.limiter_lock); /* CLUNK! */
403
404     leaf = (leaf_t *) map_search(limiter.stable_instance.leaf_map, &xid, sizeof(xid));
405
406     /* Even if the packet doesn't match any specific xid, it should still
407      * count in the machine-type tables.  This catches root (xid == 0) and
408      * unclassified (xid = fff) packets, which don't have map entries. */
409     if (leaf == NULL) {
410         ident = limiter.stable_instance.last_machine;
411     } else {
412         ident = leaf->parent;
413     }
414
415     while (ident) {
416         pthread_mutex_lock(&ident->table_mutex);
417
418         /* Update the identity's table. */
419         ident->table_sample_function(ident->table, &key);
420
421         pthread_mutex_unlock(&ident->table_mutex);
422
423         ident = ident->parent;
424     }
425
426     pthread_rwlock_unlock(&limiter.limiter_lock); /* CLINK! */
427
428     return 0;
429 }
430
431 /* get all key id's for the keys we are intrested in */
432 static int get_ids(void)
433 {
434     int i;
435     struct intr_id *cur_id;
436
437     for (i = 0; i < INTR_IDS; i++) {
438         cur_id = &intr_ids[i];
439         cur_id->res = keyh_getres(keyh_getid(cur_id->name));
440         if (!cur_id->res) {
441             ulogd_log(ULOGD_ERROR, 
442                     "Cannot resolve keyhash id for %s\n", 
443                     cur_id->name);
444             return 1;
445         }
446     }
447     return 0;
448 }
449
450 static void free_identity(identity_t *ident) {
451     if (ident) {
452         free_comm(&ident->comm);
453
454         if (ident->table) {
455             ident->table_destroy_function(ident->table);
456         }
457
458         pthread_mutex_destroy(&ident->table_mutex);
459
460         free(ident);
461     }
462 }
463
464 static void free_identity_map(map_handle map) {
465     identity_t *tofree = NULL;
466
467     map_reset_iterate(map);
468     while ((tofree = (identity_t *) map_next(map))) {
469         free_identity(tofree);
470     }
471
472     free_map(map, 0);
473 }
474
475 static void free_instance(drl_instance_t *instance) {
476     if (instance->leaves)
477         free(instance->leaves);
478     if (instance->leaf_map)
479         free_map(instance->leaf_map, 0);
480     if (instance->ident_map)
481         free_identity_map(instance->ident_map);
482     if (instance->machines)
483         free(instance->machines);
484     if (instance->sets)
485         free(instance->sets);
486     if (instance->cal) {
487         free(instance->cal);
488     }
489
490     memset(instance, 0, sizeof(drl_instance_t));
491 }
492
493 static void free_failed_config(parsed_configs configs, drl_instance_t *instance) {
494     /* Free configs. */
495     if (configs.machines)
496         free_ident_list(configs.machines);
497     if (configs.sets)
498         free_ident_list(configs.sets);
499
500     /* Free instance. */
501     if (instance)
502         free_instance(instance);
503 }
504
505 static identity_t *new_identity(ident_config *config) {
506     identity_t *ident = malloc(sizeof(identity_t));
507     remote_node_t *comm_nodes = malloc(sizeof(remote_node_t)*config->peer_count);
508     ident_peer *peer = config->peers;
509     int peer_slot = 0;
510
511     if (ident == NULL) {
512         return NULL;
513     }
514
515     if (comm_nodes == NULL) {
516         free(ident);
517         return NULL;
518     }
519
520     memset(ident, 0, sizeof(identity_t));
521     memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t));
522
523     ident->id = config->id;
524     ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0);
525     ident->fixed_ewma_weight = config->fixed_ewma_weight;
526     ident->intervals = config->intervals;
527     ident->ewma_weight = pow(ident->fixed_ewma_weight, 
528                              (limiter.estintms/1000.0) * config->intervals);
529     ident->parent = NULL;
530
531     pthread_mutex_init(&ident->table_mutex, NULL);
532     switch (config->accounting) {
533         case ACT_STANDARD:
534             ident->table =
535                 standard_table_create(standard_hasher, &ident->common);
536
537             /* Ugly function pointer casting.  Makes things sufficiently
538              * generic, though. */
539             ident->table_sample_function =
540                 (int (*)(void *, const key_flow *)) standard_table_sample;
541             ident->table_cleanup_function =
542                 (int (*)(void *)) standard_table_cleanup;
543             ident->table_update_function =
544                 (void (*)(void *, struct timeval, double)) standard_table_update_flows;
545             ident->table_destroy_function =
546                 (void (*)(void *)) standard_table_destroy;
547             break;
548
549         case ACT_SAMPLEHOLD:
550             ident->table = sampled_table_create(sampled_hasher,
551                     ident->limit * IDENT_CLEAN_INTERVAL,
552                     1, 20, &ident->common);
553
554             ident->table_sample_function =
555                 (int (*)(void *, const key_flow *)) sampled_table_sample;
556             ident->table_cleanup_function =
557                 (int (*)(void *)) sampled_table_cleanup;
558             ident->table_update_function =
559                 (void (*)(void *, struct timeval, double)) sampled_table_update_flows;
560             ident->table_destroy_function =
561                 (void (*)(void *)) sampled_table_destroy;
562             break;
563
564         case ACT_SIMPLE:
565             ident->table = simple_table_create(&ident->common);
566
567             ident->table_sample_function =
568                 (int (*)(void *, const key_flow *)) simple_table_sample;
569             ident->table_cleanup_function =
570                 (int (*)(void *)) simple_table_cleanup;
571             ident->table_update_function =
572                 (void (*)(void *, struct timeval, double)) simple_table_update_flows;
573             ident->table_destroy_function =
574                 (void (*)(void *)) simple_table_destroy;
575             break;
576     }
577
578     /* Make sure the table was allocated. */
579     if (ident->table == NULL) {
580         free(ident);
581         return NULL;
582     }
583
584     while (peer) {
585         comm_nodes[peer_slot].addr = peer->ip;
586         comm_nodes[peer_slot].port = htons(LIMITER_LISTEN_PORT);
587         peer = peer->next;
588         peer_slot += 1;
589     }
590
591     if (new_comm(&ident->comm, config, comm_nodes)) {
592         printlog(LOG_CRITICAL, "Failed to create communication structure.\n");
593         return NULL;
594     }
595
596     ident->comm.remote_nodes = comm_nodes;
597
598     return ident;
599 }
600
601 /* Determines the validity of the parameters of one ident_config.
602  *
603  * 0 valid
604  * 1 invalid
605  */
606 static int validate_config(ident_config *config) {
607     /* Limit must be a positive integer. */
608     if (config->limit < 1) {
609         return 1;
610     }
611
612     /* Commfabric must be a valid choice (COMM_MESH or COMM_GOSSIP). */
613     if (config->commfabric != COMM_MESH &&
614             config->commfabric != COMM_GOSSIP) {
615         return 1;
616     }
617
618     /* If commfabric is COMM_GOSSIP, this must be a positive integer. */
619     if (config->commfabric == COMM_GOSSIP && config->branch < 1) {
620         return 1;
621     }
622
623     /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD,
624      * ACT_SIMPLE). */
625     if (config->accounting != ACT_STANDARD &&
626             config->accounting != ACT_SAMPLEHOLD &&
627             config->accounting != ACT_SIMPLE) {
628         return 1;
629     }
630
631     /* Ewma weight must be greater than or equal to zero. */
632     if (config->fixed_ewma_weight < 0) {
633         return 1;
634     }
635
636     /* Note: Parsing stage requires that each ident has at least one peer. */
637     return 0;
638 }
639
640 /* 0 success
641  * non-zero failure
642  */
643 static int validate_configs(parsed_configs configs, drl_instance_t *instance) {
644
645     ident_config *mlist = configs.machines;
646     ident_config *slist = configs.sets;
647     ident_config *tmp = NULL;
648     int i = 0;
649
650     while (mlist) {
651         /* ID must be non-zero and unique. */
652         /* This is ugly and hackish, but this function will be called rarely.
653          * I'm tired of trying to be clever. */
654         if (mlist->id < 0) {
655             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", mlist->id, mlist->id);
656             return EINVAL;
657         }
658         tmp = mlist->next;
659         while (tmp) {
660             if (mlist->id == tmp->id) {
661                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
662                 return EINVAL;
663             }
664             tmp = tmp->next;
665         }
666         tmp = configs.sets;
667         while (tmp) {
668             if (mlist->id == tmp->id) {
669                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
670                 return EINVAL;
671             }
672             tmp = tmp->next;
673         }
674
675         if (validate_config(mlist)) {
676             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", mlist->id, mlist->id);
677             return EINVAL;
678         }
679
680         mlist = mlist->next;
681     }
682
683     instance->sets = malloc(configs.set_count * sizeof(identity_t *));
684     if (instance->sets == NULL) {
685         return ENOMEM;
686     }
687
688     memset(instance->sets, 0, configs.set_count * sizeof(identity_t *));
689     instance->set_count = configs.set_count;
690
691     /* For sets, make sure that the hierarchy is valid. */
692     while (slist) {
693         ident_member *members = slist->members;
694
695         /* ID must be non-zero and unique. */
696         if (slist->id < 0) {
697             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", slist->id, slist->id);
698             return EINVAL;
699         }
700         tmp = slist->next;
701         while (tmp) {
702             if (slist->id == tmp->id) {
703                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", slist->id, slist->id);
704                 return EINVAL;
705             }
706             tmp = tmp->next;
707         }
708
709         if (validate_config(slist)) {
710             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", slist->id, slist->id);
711             return EINVAL;
712         }
713
714         /* Allocate an identity_t for this set-type identity. */
715         instance->sets[i] = new_identity(slist);
716
717         if (instance->sets[i] == NULL) {
718             return ENOMEM;
719         }
720
721         /* Loop through children and look up each in leaf or ident map
722          * depending on the type of child.  Set the child's parent pointer
723          * to the identity we just created above, unless it is already set,
724          * in which case we have an error. */
725         while (members) {
726             identity_t *child_ident = NULL;
727             leaf_t *child_leaf = NULL;
728
729             switch (members->type) {
730                 case MEMBER_XID:
731                     child_leaf = map_search(instance->leaf_map, &members->value,
732                                             sizeof(members->value));
733                     if (child_leaf == NULL) {
734                         return EINVAL;
735                     }
736                     if (child_leaf->parent != NULL) {
737                         /* Error - This leaf already has a parent. */
738                         return EINVAL;
739                     }
740                     child_leaf->parent = instance->sets[i];
741                     break;
742                 case MEMBER_GUID:
743                     child_ident = map_search(instance->ident_map, &members->value,
744                                              sizeof(members->value));
745                     if (child_ident == NULL) {
746                         return EINVAL;
747                     }
748                     if (child_ident->parent != NULL) {
749                         /* Error - This identity already has a parent. */
750                         return EINVAL;
751                     }
752                     child_ident->parent = instance->sets[i];
753                     break;
754                 default:
755                     /* Error - shouldn't be possible. */
756                     break;
757             }
758             members = members->next;
759         }
760
761         map_insert(instance->ident_map, &instance->sets[i]->id,
762                    sizeof(instance->sets[i]->id), instance->sets[i]);
763
764         slist = slist->next;
765         i++;
766     }
767     return 0;
768 }
769
770 static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) {
771     int count = 0;
772     identity_t *current_ident;
773     leaf_t *current_leaf;
774     leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *));
775     if (leaves == NULL) {
776         return 1;
777     }
778
779     map_reset_iterate(instance->leaf_map);
780     while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) {
781         current_ident = current_leaf->parent;
782         while (current_ident != NULL && current_ident != instance->last_machine) {
783             if (current_ident == ident) {
784                 /* Found the ident we were looking for - add the leaf. */
785                 leaves[count] = current_leaf;
786                 count += 1;
787                 break;
788             }
789             current_ident = current_ident->parent;
790         }
791     }
792
793     ident->leaves = leaves;
794     ident->leaf_count = count;
795
796     return 0;
797 }
798
799 static int init_identities(parsed_configs configs, drl_instance_t *instance) {
800     int i, j;
801     ident_config *config = configs.machines;
802     leaf_t *leaf = NULL;
803
804     instance->cal = malloc(sizeof(struct ident_calendar) * SCHEDLEN);
805
806     if (instance->cal == NULL) {
807         return ENOMEM;
808     }
809
810     for (i = 0; i < SCHEDLEN; ++i) {
811         TAILQ_INIT(instance->cal + i);
812     }
813     instance->cal_slot = 0;
814
815     instance->machines = malloc(configs.machine_count * sizeof(drl_instance_t *));
816
817     if (instance->machines == NULL) {
818         return ENOMEM;
819     }
820
821     memset(instance->machines, 0, configs.machine_count * sizeof(drl_instance_t *));
822     instance->machine_count = configs.machine_count;
823
824     /* Allocate and add the machine identities. */
825     for (i = 0; i < configs.machine_count; ++i) {
826         instance->machines[i] = new_identity(config);
827
828         if (instance->machines[i] == NULL) {
829             return ENOMEM;
830         }
831
832         /* The first has no parent - it is the root.  All others have the
833          * previous ident as their parent. */
834         if (i == 0) {
835             instance->machines[i]->parent = NULL;
836         } else {
837             instance->machines[i]->parent = instance->machines[i - 1];
838         }
839
840         instance->last_machine = instance->machines[i];
841
842         /* Add the ident to the guid->ident map. */
843         map_insert(instance->ident_map, &instance->machines[i]->id,
844                    sizeof(instance->machines[i]->id), instance->machines[i]);
845
846         config = config->next;
847
848         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
849                           instance->machines[i], calendar);
850
851         /* Setup the array of pointers to leaves.  This is easy for machines
852          * because a machine node applies to every leaf. */
853         instance->machines[i]->leaves =
854             malloc(instance->leaf_count * sizeof(leaf_t *));
855         if (instance->machines[i]->leaves == NULL) {
856             return ENOMEM;
857         }
858         instance->machines[i]->leaf_count = instance->leaf_count;
859         for (j = 0; j < instance->leaf_count; ++j) {
860             instance->machines[i]->leaves[j] = &instance->leaves[j];
861         }
862     }
863
864     /* Connect the set subtree to the machines. Any set or leaf without a
865      * parent will take the last machine as its parent. */
866
867     /* Leaves... */
868     map_reset_iterate(instance->leaf_map);
869     while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
870         if (leaf->parent == NULL) {
871             leaf->parent = instance->last_machine;
872         }
873     }
874
875     /* Sets... */
876     for (i = 0; i < instance->set_count; ++i) {
877         if (instance->sets[i]->parent == NULL) {
878             instance->sets[i]->parent = instance->last_machine;
879         }
880
881         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
882                           instance->sets[i], calendar);
883
884         /* Setup the array of pointers to leaves.  This is harder for sets,
885          * but this doesn't need to be super-efficient because it happens
886          * rarely and it isn't on the critical path for reconfig(). */
887         if (fill_set_leaf_pointer(instance, instance->sets[i])) {
888             return ENOMEM;
889         }
890     }
891
892     /* Success. */
893     return 0;
894 }
895
896 static void print_instance(drl_instance_t *instance) {
897     leaf_t *leaf = NULL;
898     identity_t *ident = NULL;
899
900     if (system_loglevel == LOG_DEBUG) {
901         map_reset_iterate(instance->leaf_map);
902         while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
903             printf("%x:", leaf->xid);
904             ident = leaf->parent;
905             while (ident) {
906                 printf("%d:",ident->id);
907                 ident = ident->parent;
908             }
909             printf("Leaf's parent pointer is %p\n", leaf->parent);
910         }
911
912         printf("instance->last_machine is %p\n", instance->last_machine);
913     }
914 }
915
916 static int assign_htb_hierarchy(drl_instance_t *instance) {
917     int i, j;
918     int next_node = 0x11;
919
920     /* Chain machine nodes under 1:10. */
921     for (i = 0; i < instance->machine_count; ++i) {
922         if (instance->machines[i]->parent == NULL) {
923             /* Top node. */
924             instance->machines[i]->htb_parent = 0x10;
925         } else {
926             /* Pointerific! */
927             instance->machines[i]->htb_parent =
928                 instance->machines[i]->parent->htb_node;
929         }
930
931         instance->machines[i]->htb_node = next_node;
932         next_node += 1;
933     }
934
935     next_node += 0x10;
936
937     /* Add set nodes under machine nodes. Iterate backwards to ensure parent is
938      * already there. */
939     for (j = (instance->set_count - 1); j >= 0; --j) {
940         if (instance->sets[j]->parent == NULL) {
941             instance->sets[j]->htb_parent = 0x10;
942         } else {
943             instance->sets[j]->htb_parent = instance->sets[j]->parent->htb_node;
944         }
945         instance->sets[j]->htb_node = next_node;
946
947         next_node += 1;
948     }
949
950     return 0;
951 }
952
953 /* Added this so that I could comment one line and kill off all of the
954  * command execution. */
955 static int execute_cmd(const char *cmd) {
956     return system(cmd);
957 }
958
959 static int create_htb_hierarchy(drl_instance_t *instance) {
960     char cmd[300];
961     int i, j, k;
962
963     /* Nuke the hierarchy. */
964     sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb");
965     execute_cmd(cmd);
966     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
967
968     /* Re-initialize the basics. */
969     sprintf(cmd, "tc qdisc add dev eth0 root handle 1: htb default 1fff");
970     if (execute_cmd(cmd)) {
971         return 1;
972     }
973     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
974     sprintf(cmd, "tc class add dev eth0 parent 1: classid 1:1 htb rate 1000mbit ceil 1000mbit");
975     if (execute_cmd(cmd)) {
976         return 1;
977     }
978     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
979
980     /* Add back 1:10. (Nodelimit : Megabits/sec -> bits/second)*/
981     if (limiter.nodelimit) {
982         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil %lubit",
983                 (unsigned long) limiter.nodelimit * 1024 * 1024);
984     } else {
985         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil 1000mbit");
986     }
987
988     if (execute_cmd(cmd)) {
989         return 1;
990     }
991     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
992
993     /* Add back 1:20. */
994     sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
995
996     if (execute_cmd(cmd)) {
997         return 1;
998     }
999     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1000
1001
1002     /* Add machines. */
1003     for (i = 0; i < instance->machine_count; ++i) {
1004         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit",
1005                 instance->machines[i]->htb_parent,
1006                 instance->machines[i]->htb_node,
1007                 (unsigned long) instance->machines[i]->limit * 1024 * 1024);
1008
1009         if (execute_cmd(cmd)) {
1010             return 1;
1011         }
1012         printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1013     }
1014
1015     /* Add sets. */
1016     for (j = (instance->set_count - 1); j >= 0; --j) {
1017         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit",
1018                 instance->sets[j]->htb_parent,
1019                 instance->sets[j]->htb_node,
1020                 (unsigned long) instance->sets[j]->limit * 1024 * 1024);
1021
1022         if (execute_cmd(cmd)) {
1023             return 1;
1024         }
1025         printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1026     }
1027
1028     /* Add leaves. FIXME: Set static sliver limit as ceil here! */
1029     for (k = 0; k < instance->leaf_count; ++k) {
1030         if (instance->leaves[k].parent == NULL) {
1031             sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1%x htb rate 8bit ceil %lubit",
1032                 instance->leaves[k].xid,
1033                 (unsigned long) 100 * 1024 * 1024);
1034         } else {
1035             sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1%x htb rate 8bit ceil %lubit",
1036                     instance->leaves[k].parent->htb_node,
1037                     instance->leaves[k].xid,
1038                     (unsigned long) 100 * 1024 * 1024);
1039         }
1040
1041         if (execute_cmd(cmd)) {
1042             return 1;
1043         }
1044         printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1045         
1046         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2%x htb rate 8bit ceil 1000mbit",
1047                 instance->leaves[k].xid);
1048
1049         if (execute_cmd(cmd)) {
1050             return 1;
1051         }
1052         printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1053     }
1054
1055     /* Add 1:1000 and 1:2000 */
1056     if (instance->last_machine == NULL) {
1057         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1000 htb rate 8bit ceil 1000mbit");
1058     } else {
1059         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1000 htb rate 8bit ceil 1000mbit",
1060                 instance->last_machine->htb_node);
1061     }
1062
1063     if (execute_cmd(cmd)) {
1064         return 1;
1065     }
1066     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1067     sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2000 htb rate 8bit ceil 1000mbit");
1068
1069     if (execute_cmd(cmd)) {
1070         return 1;
1071     }
1072     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1073
1074     /* Add 1:1fff and 1:2fff */
1075     if (instance->last_machine == NULL) {
1076         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1fff htb rate 8bit ceil 1000mbit");
1077     } else {
1078         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1fff htb rate 8bit ceil 1000mbit",
1079                 instance->last_machine->htb_node);
1080     }
1081
1082     if (execute_cmd(cmd)) {
1083         return 1;
1084     }
1085     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1086     sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2fff htb rate 8bit ceil 1000mbit");
1087
1088     if (execute_cmd(cmd)) {
1089         return 1;
1090     }
1091     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1092
1093 #ifdef DELAY40MS
1094     /* Only for artificial delay testing. */
1095     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1096     execute_cmd(cmd);
1097
1098     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
1099     execute_cmd(cmd);
1100     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11f9 handle 11f9 pfifo");
1101     execute_cmd(cmd);
1102
1103     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11f9 handle 11f9 netem loss 0 delay 40ms");
1104     execute_cmd(cmd);
1105     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11fa handle 11fa pfifo");
1106     execute_cmd(cmd);
1107
1108     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms");
1109     execute_cmd(cmd);
1110     /* End delay testing */
1111 #endif
1112
1113 //#define SFQTEST
1114
1115 #ifdef SFQTEST
1116     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1117     execute_cmd(cmd);
1118
1119     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 sfq perturb 20");
1120     execute_cmd(cmd);
1121 #endif
1122
1123     return 0;
1124 }
1125
1126 static int setup_tc_grd(drl_instance_t *instance) {
1127     int i;
1128     char cmd[300];
1129
1130     for (i = 0; i < instance->leaf_count; ++i) {
1131         /* Delete the old pfifo qdisc that might have been there before. */
1132         sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo",
1133                 instance->leaves[i].xid, instance->leaves[i].xid);
1134
1135         if (execute_cmd(cmd)) {
1136             printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1137         }
1138
1139         /* Add the netem qdisc. */
1140 #ifdef DELAY40MS
1141         sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms",
1142                 instance->leaves[i].xid, instance->leaves[i].xid);
1143 #else
1144         sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
1145                 instance->leaves[i].xid, instance->leaves[i].xid);
1146 #endif
1147
1148         if (execute_cmd(cmd)) {
1149             return 1;
1150         }
1151     }
1152
1153     /* Do the same for 1000 and 1fff. */
1154     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1155
1156     if (execute_cmd(cmd)) {
1157         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1158     }
1159
1160     /* Add the netem qdisc. */
1161 #ifdef DELAY40MS
1162     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
1163 #else
1164     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
1165 #endif
1166
1167     if (execute_cmd(cmd)) {
1168         return 1;
1169     }
1170
1171     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
1172
1173     if (execute_cmd(cmd)) {
1174         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1175     }
1176
1177     /* Add the netem qdisc. */
1178 #ifdef DELAY40MS
1179     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms");
1180 #else
1181     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
1182 #endif
1183
1184     if (execute_cmd(cmd)) {
1185         return 1;
1186     }
1187
1188     return 0;
1189 }
1190
1191 /* init_drl 
1192  * 
1193  * Initialize this limiter with options 
1194  * Open UDP socket for peer communication
1195  */
1196 static int init_drl(void) {
1197     parsed_configs configs;
1198     struct sockaddr_in server_address;
1199     
1200     memset(&limiter, 0, sizeof(limiter_t));
1201
1202     /* Setup logging. */
1203     system_loglevel = (uint8_t) drl_loglevel.u.value;
1204     logfile = fopen(drl_logfile.u.string, "w");
1205
1206     if (logfile == NULL) {
1207         printf("Couldn't open logfile - ");
1208         perror("fopen()");
1209         exit(EXIT_FAILURE);
1210     }
1211
1212     printlog(LOG_CRITICAL, "ulogd_DRL initializing . . .\n");
1213
1214     limiter.nodelimit = (uint32_t) (((double) nodelimit.u.value * 1000000.0) / 8.0);
1215
1216     init_hashing();  /* for all hash maps */
1217
1218     pthread_rwlock_init(&limiter.limiter_lock,NULL);
1219
1220     /* determine our local IP by iterating through interfaces */
1221     if ((limiter.ip = get_local_ip())==0) {
1222         printlog(LOG_CRITICAL,
1223                  "ulogd_DRL unable to aquire local IP address, not registering.\n");
1224         return (false);
1225     }
1226     limiter.localaddr = inet_addr(limiter.ip);
1227     limiter.port = htons(LIMITER_LISTEN_PORT);
1228     limiter.udp_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
1229     if (limiter.udp_socket < 0) {
1230         printlog(LOG_CRITICAL, "Failed to create UDP socket().\n");
1231         return false;
1232     }
1233
1234     memset(&server_address, 0, sizeof(server_address));
1235     server_address.sin_family = AF_INET;
1236     server_address.sin_addr.s_addr = limiter.localaddr;
1237     server_address.sin_port = limiter.port;
1238
1239     if (bind(limiter.udp_socket, (struct sockaddr *) &server_address, sizeof(server_address)) < 0) {
1240         printlog(LOG_CRITICAL, "Failed to bind UDP socket.\n");
1241         return false;
1242     }
1243
1244     printlog(LOG_WARN, "     POLICY: %s\n",policy.u.string);
1245     if (strcasecmp(policy.u.string,"GRD") == 0) {
1246         limiter.policy = POLICY_GRD;
1247     } else if (strcasecmp(policy.u.string,"FPS") == 0) {
1248         limiter.policy = POLICY_FPS;
1249     } else {
1250         printlog(LOG_CRITICAL,
1251                  "Unknown DRL policy %s, aborting.\n",policy.u.string);
1252         return (false);
1253     }
1254
1255     limiter.estintms = estintms.u.value;
1256     if (limiter.estintms > 1000) {
1257         printlog(LOG_CRITICAL,
1258                  "DRL: sorry estimate intervals must be less than 1 second.");
1259         printlog(LOG_CRITICAL,
1260                  "  Simple source mods will allow larger intervals.  Using 1 second.\n");
1261         limiter.estintms = 1000;
1262     }
1263     printlog(LOG_WARN, "     Est interval: %dms\n",limiter.estintms);
1264     
1265     /* Acquire the big limiter lock for writing.  Prevents pretty much
1266      * anything else from happening while the hierarchy is being changed. */
1267     pthread_rwlock_wrlock(&limiter.limiter_lock);
1268
1269     limiter.stable_instance.ident_map = allocate_map();
1270     if (limiter.stable_instance.ident_map == NULL) {
1271         printlog(LOG_CRITICAL, "Failed to allocate memory for identity map.\n");
1272         return false;
1273     }
1274
1275     if (get_eligible_leaves(&limiter.stable_instance)) {
1276         printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n");
1277         return false;
1278     }
1279
1280     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1281         /* Parse error occured. Return non-zero to notify init_drl(). */
1282         printlog(LOG_CRITICAL, "Failed to parse the DRL configuration file (%s).\n",
1283             drl_configfile.u.string);
1284         return false;
1285     }
1286
1287     /* Validate identity hierarchy! */
1288     if (validate_configs(configs, &limiter.stable_instance)) {
1289         /* Clean up everything. */
1290         free_failed_config(configs, &limiter.stable_instance);
1291         printlog(LOG_CRITICAL, "Invalid DRL configuration file (%s).\n",
1292             drl_configfile.u.string);
1293         return false;
1294     }
1295
1296     if (init_identities(configs, &limiter.stable_instance)) {
1297         free_failed_config(configs, &limiter.stable_instance);
1298         printlog(LOG_CRITICAL, "Failed to initialize identities.\n");
1299         return false;
1300     }
1301
1302     /* At this point, we should be done with configs. */
1303     free_ident_list(configs.machines);
1304     free_ident_list(configs.sets);
1305
1306     /* Debugging - FIXME: remove this? */
1307     print_instance(&limiter.stable_instance);
1308
1309     switch (limiter.policy) {
1310         case POLICY_FPS:
1311             if (assign_htb_hierarchy(&limiter.stable_instance)) {
1312                 free_instance(&limiter.stable_instance);
1313                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy.\n");
1314                 return false;
1315             }
1316
1317             if (create_htb_hierarchy(&limiter.stable_instance)) {
1318                 free_instance(&limiter.stable_instance);
1319                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy.\n");
1320                 return false;
1321             }
1322         break;
1323
1324         case POLICY_GRD:
1325             if (setup_tc_grd(&limiter.stable_instance)) {
1326                 free_instance(&limiter.stable_instance);
1327                 printlog(LOG_CRITICAL, "Failed to initialize tc calls for GRD.\n");
1328                 return false;
1329             }
1330         break;
1331
1332         default:
1333         return false;
1334     }
1335
1336     pthread_rwlock_unlock(&limiter.limiter_lock);
1337
1338     if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) {
1339         printlog(LOG_CRITICAL, "Unable to start UDP receive thread.\n");
1340         return false;
1341     }
1342
1343     printlog(LOG_WARN, "ulogd_DRL init finished.\n");
1344
1345     return true;
1346 }
1347
1348 static void reconfig() {
1349     parsed_configs configs;
1350
1351     printlog(LOG_DEBUG, "--Starting reconfig()--\n");
1352     flushlog();
1353
1354     memset(&configs, 0, sizeof(parsed_configs));
1355     memset(&limiter.new_instance, 0, sizeof(drl_instance_t));
1356
1357     limiter.new_instance.ident_map = allocate_map();
1358     if (limiter.new_instance.ident_map == NULL) {
1359         printlog(LOG_CRITICAL, "Failed to allocate ident_map during reconfig().\n");
1360         return;
1361     }
1362
1363     if (get_eligible_leaves(&limiter.new_instance)) {
1364         free_failed_config(configs, &limiter.new_instance);
1365         printlog(LOG_CRITICAL, "Failed to read leaves during reconfig().\n");
1366         return;
1367     }
1368
1369     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1370         free_failed_config(configs, &limiter.new_instance);
1371         printlog(LOG_CRITICAL, "Failed to parse config during reconfig().\n");
1372         return;
1373     }
1374
1375     if (validate_configs(configs, &limiter.new_instance)) {
1376         free_failed_config(configs, &limiter.new_instance);
1377         printlog(LOG_CRITICAL, "Validation failed during reconfig().\n");
1378         pthread_rwlock_unlock(&limiter.limiter_lock);
1379         return;
1380     }
1381
1382     if (init_identities(configs, &limiter.new_instance)) {
1383         free_failed_config(configs, &limiter.new_instance);
1384         printlog(LOG_CRITICAL, "Initialization failed during reconfig().\n");
1385         pthread_rwlock_unlock(&limiter.limiter_lock);
1386         return;
1387     }
1388
1389     free_ident_list(configs.machines);
1390     free_ident_list(configs.sets);
1391
1392     /* Debugging - FIXME: remove this? */
1393     print_instance(&limiter.new_instance);
1394     
1395     /* Lock */
1396     pthread_rwlock_wrlock(&limiter.limiter_lock);
1397
1398     switch (limiter.policy) {
1399         case POLICY_FPS:
1400             if (assign_htb_hierarchy(&limiter.new_instance)) {
1401                 free_instance(&limiter.new_instance);
1402                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
1403                 pthread_rwlock_unlock(&limiter.limiter_lock);
1404                 return;
1405             }
1406
1407             if (create_htb_hierarchy(&limiter.new_instance)) {
1408                 free_instance(&limiter.new_instance);
1409                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
1410
1411                 /* Re-create old instance. */
1412                 if (create_htb_hierarchy(&limiter.stable_instance)) {
1413                     /* Error reinstating the old one - big problem. */
1414                     printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
1415                     printlog(LOG_CRITICAL, "Giving up...\n");
1416                     flushlog();
1417                     exit(EXIT_FAILURE);
1418                 }
1419
1420                 pthread_rwlock_unlock(&limiter.limiter_lock);
1421                 return;
1422             }
1423         break;
1424
1425         case POLICY_GRD:
1426             if (setup_tc_grd(&limiter.new_instance)) {
1427                 free_instance(&limiter.new_instance);
1428                 printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n");
1429
1430                 /* Try to re-create old instance. */
1431                 if (setup_tc_grd(&limiter.stable_instance)) {
1432                     printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n");
1433                     printlog(LOG_CRITICAL, "Giving up...\n");
1434                     flushlog();
1435                     exit(EXIT_FAILURE);
1436                 }
1437             }
1438         break;
1439
1440         default:
1441             /* Should be impossible. */
1442             printf("Pigs are flying?\n");
1443             exit(EXIT_FAILURE);
1444     }
1445
1446     /* Switch over new to stable instance. */
1447     free_instance(&limiter.stable_instance);
1448     memcpy(&limiter.stable_instance, &limiter.new_instance, sizeof(drl_instance_t));
1449
1450     /* Success! - Unlock */
1451     pthread_rwlock_unlock(&limiter.limiter_lock);
1452 }
1453
1454 static ulog_output_t drl_op = {
1455     .name = "drl",
1456     .output = &_output_drl,
1457     .signal = NULL, /* This appears to be broken. Using my own handler. */
1458     .init = NULL,
1459     .fini = NULL,
1460 };
1461
1462 /* Tests the amount of time it takes to call reconfig(). */
1463 static void time_reconfig(int iterations) {
1464     struct timeval start, end;
1465     int i;
1466
1467     gettimeofday(&start, NULL);
1468     for (i = 0; i < iterations; ++i) {
1469         reconfig();
1470     }
1471     gettimeofday(&end, NULL);
1472
1473     printf("%d reconfigs() took %d seconds and %d microseconds.\n",
1474            iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec);
1475     exit(0);
1476
1477     // Seems to take about 85ms / iteration
1478 }
1479
1480 static int stop_enforcement(drl_instance_t *instance) {
1481     char cmd[300];
1482     int i;
1483
1484     for (i = 0; i < instance->machine_count; ++i) {
1485         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1486                 instance->machines[i]->htb_parent,
1487                 instance->machines[i]->htb_node);
1488
1489         if (execute_cmd(cmd)) {
1490             return 1;
1491         }
1492     }
1493
1494     for (i = 0; i < instance->set_count; ++i) {
1495         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1496                 instance->sets[i]->htb_parent,
1497                 instance->sets[i]->htb_node);
1498
1499         if (execute_cmd(cmd)) {
1500             return 1;
1501         }
1502     }
1503
1504     return 0;
1505 }
1506
1507 static void *signal_thread_func(void *args) {
1508     int sig;
1509     int err;
1510     sigset_t sigs;
1511
1512     sigemptyset(&sigs);
1513     sigaddset(&sigs, SIGHUP);
1514     sigaddset(&sigs, SIGUSR1);
1515     pthread_sigmask(SIG_BLOCK, &sigs, NULL);
1516
1517     while (1) {
1518         sigemptyset(&sigs);
1519         sigaddset(&sigs, SIGHUP);
1520         sigaddset(&sigs, SIGUSR1);
1521
1522         err = sigwait(&sigs, &sig);
1523
1524         if (err) {
1525             printlog(LOG_CRITICAL, "sigwait() returned an error.\n");
1526             flushlog();
1527         }
1528
1529         switch (sig) {
1530             case SIGHUP:
1531                 printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n");
1532                 reconfig();
1533                 //time_reconfig(1000); /* instrumentation */
1534                 flushlog();
1535                 break;
1536             case SIGUSR1:
1537                 pthread_rwlock_wrlock(&limiter.limiter_lock);
1538                 if (do_enforcement) {
1539                     do_enforcement = 0;
1540                     stop_enforcement(&limiter.stable_instance);
1541                     printlog(LOG_CRITICAL, "--Switching enforcement off.--\n");
1542                 } else {
1543                     do_enforcement = 1;
1544                     printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1545                 }
1546                 pthread_rwlock_unlock(&limiter.limiter_lock);
1547                 break;
1548             default:
1549                 /* Intentionally blank. */
1550                 break;
1551         }
1552     }
1553
1554 }
1555
1556 /* register output plugin with ulogd */
1557 static void _drl_reg_op(void)
1558 {
1559     ulog_output_t *op = &drl_op;
1560     sigset_t signal_mask;
1561
1562     sigemptyset(&signal_mask);
1563     sigaddset(&signal_mask, SIGHUP);
1564     sigaddset(&signal_mask, SIGUSR1);
1565     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
1566
1567     if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {
1568         printlog(LOG_CRITICAL, "Failed to create signal handling thread.\n");
1569         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1570         flushlog();
1571         exit(EXIT_FAILURE);
1572     }
1573
1574     if (!init_drl()) {
1575         printlog(LOG_CRITICAL, "Init failed. :(\n");
1576         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1577         flushlog();
1578         exit(EXIT_FAILURE);
1579     }
1580
1581     register_output(op);
1582
1583     /* start up the thread that will periodically estimate the
1584      * local rate and set the local limits
1585      * see estimate.c
1586      */
1587     if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) {
1588         printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n");
1589         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1590         exit(EXIT_FAILURE);
1591     }
1592 }
1593
1594 void _init(void)
1595 {
1596     /* have the opts parsed */
1597     config_parse_file("DRL", config_entries);
1598
1599     if (get_ids()) {
1600         ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
1601         exit(2);
1602     }
1603
1604     /* Seed the hash function */
1605     salt = getpid() ^ time(NULL);
1606
1607     _drl_reg_op();
1608 }