Updates to autotools for library detection
[distributedratelimiting.git] / drl / ulogd_DRL.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * ulogd output target for DRL: GRD and FPS
5  *
6  * Ken Yocum <kyocum@cs.ucsd.edu>
7  *
8  * Original shell of this code from ulogd_NETFLOW
9  * Like that code, we keep track of per-slice data rates 
10  * out of this slice.  Thus we are rate limiting particular slices
11  * across multiple boxes, ensuring that their outbound rate does not
12  * exceed some fixed limit.  
13  *
14  * Fabric:  static mesh
15  *
16  * Enforcer: linux drop percentage.
17  *    
18  * This file reads packets from the netlink socket.  It updates all
19  * the hashmaps which track how much data has arrived per flow.
20  * It starts two threads for this limiter.
21  * One thread handles periodic estimation.
22  * The other thread handles communication with other limiters. 
23  *
24  * 
25  * Code map
26  *
27  * ulogd_DRL: attach to netlink socket, accept packets.  replaces ratelimit.cc
28  * util.c: generic hashing functions, flow comparisons, sundry items. 
29  * gossip.c: Recv gossip, send gossip.
30  * peer_comm.c: Thread to listen for updates from other limiters. 
31  * estimate.c: Thread to calculate the local limits. 
32  * 
33  * 
34  * Ken Yocum <kyocum@cs.ucsd.edu>
35  * 2007 
36  * 
37  * Some code appropriated from ulogd_NETFLOW:
38  *
39  * Mark Huang <mlhuang@cs.princeton.edu>
40  * Copyright (C) 2004-2005 The Trustees of Princeton University
41  *
42  * Based on admindump.pl by Mic Bowman and Paul Brett
43  * Copyright (c) 2002 Intel Corporation
44  *
45  */
46
47 /* Enable GNU glibc extensions */
48 #define _GNU_SOURCE
49
50 #include <stdio.h>
51 #include <stdlib.h>
52
53 /* va_start() and friends */
54 #include <stdarg.h>
55
56 /* ispunct() */
57 #include <ctype.h>
58
59 /* strstr() and friends */
60 #include <string.h>
61
62 /* dirname() and basename() */
63 #include <libgen.h>
64
65 /* fork() and wait() */
66 #include <sys/types.h>
67 #include <unistd.h>
68 #include <sys/wait.h>
69
70 /* errno and assert() */
71 #include <errno.h>
72 #include <assert.h>
73
74 /* getopt_long() */
75 #include <getopt.h>
76
77 /* time() and friends */
78 #include <time.h>
79 #include <sys/time.h>
80
81 /* inet_aton() */
82 #include <sys/socket.h>
83 #include <netinet/in.h>
84 #include <arpa/inet.h>
85
86 /* ICMP definitions */
87 #include <netinet/ip.h>
88 #include <netinet/ip_icmp.h>
89
90 /* stat() */
91 #include <sys/stat.h>
92
93 /* pthread_create() */
94 #include <pthread.h>
95
96 /* flock() */
97 #include <sys/file.h>
98
99 /* Signal definitions - so that we can catch SIGHUP and update config. */
100 #include <signal.h>
101
102 #include <ulogd/ulogd.h>
103 #include <ulogd/conffile.h>
104
105 /* Perhaps useful for files within vservers? */
106 #if !defined(STANDALONE) && HAVE_LIBPROPER
107 #include <proper/prop.h>
108 #endif
109
110 /*
111  * Jenkins hash support
112  * lives in raterouter.h
113  */
114
115 /* DRL specifics */
116 #include "raterouter.h"
117 #include "util.h"
118 #include "ratetypes.h" /* needs util and pthread.h */
119 #include "calendar.h"
120 #include "logging.h"
121
122 /*
123  * /etc/ulogd.conf configuration options
124  * Add the config options for DRL. 
125  */
126
127 static config_entry_t enforce_on = {
128     .next = NULL,
129     .key = "enforce_on",
130     .type = CONFIG_TYPE_INT,
131     .options = CONFIG_OPT_NONE,
132     .u = { .value = 1 },
133 };
134
135 static config_entry_t partition = {
136     .next = &enforce_on,
137     .key = "partition_set",
138     .type = CONFIG_TYPE_INT,
139     .options = CONFIG_OPT_NONE,
140     .u = { .value = 0xfffffff },
141 };
142
143 static config_entry_t sfq_slice = {
144     .next = &partition,
145     .key = "sfq_slice",
146     .type = CONFIG_TYPE_STRING,
147     .options = CONFIG_OPT_NONE,
148     .u = { .string = "NONE" },
149 };
150
151 static config_entry_t netem_slice = {
152     .next = &sfq_slice,
153     .key = "netem_slice",
154     .type = CONFIG_TYPE_STRING,
155     .options = CONFIG_OPT_NONE,
156     .u = { .string = "ALL" },
157 };
158
159 static config_entry_t netem_loss = {
160     .next = &netem_slice,
161     .key = "netem_loss",
162     .type = CONFIG_TYPE_INT,
163     .options = CONFIG_OPT_NONE,
164     .u = { .value = 0 },
165 };
166
167 static config_entry_t netem_delay = {
168     .next = &netem_loss,
169     .key = "netem_delay",
170     .type = CONFIG_TYPE_INT,
171     .options = CONFIG_OPT_NONE,
172     .u = { .value = 0 },
173 };
174
175 static config_entry_t drl_configfile = {
176     .next = &netem_delay,
177     .key = "drl_configfile",
178     .type = CONFIG_TYPE_STRING,
179     .options = CONFIG_OPT_MANDATORY,
180     .u = { .string = "drl.xml" },
181 };
182
183 /** The administrative bandwidth limit (mbps) for the local node.  The node
184  * will not set a limit higher than this, even when distributed capacity is
185  * available.  Set to 0 for no limit. */
186 static config_entry_t nodelimit = {
187     .next = &drl_configfile,
188     .key = "nodelimit",
189     .type = CONFIG_TYPE_INT,
190     .options = CONFIG_OPT_MANDATORY,
191     .u = { .value = 0 },
192 };
193
194 /** Determines the verbosity of logging. */
195 static config_entry_t drl_loglevel = {
196     .next = &nodelimit,
197     .key = "drl_loglevel",
198     .type = CONFIG_TYPE_INT,
199     .options = CONFIG_OPT_MANDATORY,
200     .u = { .value = LOG_WARN },
201 };
202
203 /** The path of the logfile. */
204 static config_entry_t drl_logfile = {
205     .next = &drl_loglevel,
206     .key = "drl_logfile",
207     .type = CONFIG_TYPE_STRING,
208     .options = CONFIG_OPT_MANDATORY,
209     .u = { .string = "drl_logfile.log" },
210 };
211
212 /** The choice of DRL protocol. */
213 static config_entry_t policy = {
214     .next = &drl_logfile,
215     .key = "policy",
216     .type = CONFIG_TYPE_STRING,
217     .options = CONFIG_OPT_MANDATORY,
218     .u = { .string = "GRD" },
219 };
220
221 /** The estimate interval, in milliseconds. */
222 static config_entry_t estintms = {
223     .next = &policy,
224     .key = "estintms",
225     .type = CONFIG_TYPE_INT,
226     .options = CONFIG_OPT_MANDATORY,
227     .u = { .value = 100 },
228 };
229
230 #define config_entries (&estintms)
231
232 /*
233  * Debug functionality
234  */
235
236 #ifdef DMALLOC
237 #include <dmalloc.h>
238 #endif
239
240 #define NIPQUAD(addr) \
241     ((unsigned char *)&addr)[0], \
242     ((unsigned char *)&addr)[1], \
243     ((unsigned char *)&addr)[2], \
244     ((unsigned char *)&addr)[3]
245
246 #define IPQUAD(addr) \
247     ((unsigned char *)&addr)[3], \
248     ((unsigned char *)&addr)[2], \
249     ((unsigned char *)&addr)[1], \
250     ((unsigned char *)&addr)[0]
251
252
253
254 /* Salt for the hash functions */
255 static int salt;
256
257 /*
258  * Hash slice name lookups on context ID.
259  */
260
261 /* Special context IDs */
262 #define UNKNOWN_XID -1
263 #define ROOT_XID 0
264
265 enum {
266     CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
267     ICMP_ECHOREPLY_XID,
268     ICMP_UNREACH_XID,
269 };
270
271
272 /* globals */
273 pthread_t estimate_thread;
274 pthread_t signal_thread;
275 pthread_t comm_thread;
276 uint32_t local_ip = 0;
277 limiter_t limiter;
278 extern FILE *logfile;
279 extern uint8_t system_loglevel;
280 extern uint8_t do_enforcement;
281
282 /* Used to simulate partitions. */
283 int do_partition = 0;
284 int partition_set = 0xfffffff;
285
286 /* functions */
287
288 static inline uint32_t
289 hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max)
290 {
291     unsigned char mybytes[FLOWKEYSIZE];
292     mybytes[0] = protocol;
293     *(uint32_t*)(&(mybytes[1])) = src_ip;
294     *(uint32_t*)(&(mybytes[5])) = dst_ip;
295     *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port;
296     return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1);
297 }
298
299 uint32_t sampled_hasher(const key_flow *key) {
300     /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. */
301     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX);
302 }
303
304 uint32_t standard_hasher(const key_flow *key) {
305     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE);
306 }
307
308 uint32_t multiple_hasher(const key_flow *key) {
309     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE);
310 }
311
312 struct intr_id {
313     char* name;
314     ulog_iret_t *res;
315 };
316
317 /* Interesting keys */
318 enum {
319     OOB_TIME_SEC = 0,
320     OOB_MARK,
321     IP_SADDR,
322     IP_DADDR,
323     IP_TOTLEN,
324     IP_PROTOCOL,
325     TCP_SPORT,
326     TCP_DPORT,
327     TCP_ACK,
328     TCP_FIN,
329     TCP_SYN,
330     TCP_RST,
331     UDP_SPORT,
332     UDP_DPORT,
333     ICMP_TYPE,
334     ICMP_CODE,
335     GRE_FLAG_KEY,
336     GRE_VERSION,
337     GRE_KEY,
338     PPTP_CALLID,
339 };
340
341 #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0]))
342 static struct intr_id intr_ids[] = {
343     [OOB_TIME_SEC] = { "oob.time.sec", 0 },
344     [OOB_MARK] = { "oob.mark", 0 },
345     [IP_SADDR] = { "ip.saddr", 0 },
346     [IP_DADDR] = { "ip.daddr", 0 },
347     [IP_TOTLEN] = { "ip.totlen", 0 },
348     [IP_PROTOCOL] = { "ip.protocol", 0 },
349     [TCP_SPORT] = { "tcp.sport", 0 },
350     [TCP_DPORT] { "tcp.dport", 0 },
351     [TCP_ACK] = { "tcp.ack", 0 },
352     [TCP_FIN] = { "tcp.fin", 0 },
353     [TCP_SYN] = { "tcp.syn", 0 },
354     [TCP_RST] = { "tcp.rst", 0 },
355     [UDP_SPORT] = { "udp.sport", 0 },
356     [UDP_DPORT] = { "udp.dport", 0 },
357     [ICMP_TYPE] = { "icmp.type", 0 },
358     [ICMP_CODE] = { "icmp.code", 0 },
359     [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
360     [GRE_VERSION] = { "gre.version", 0 },
361     [GRE_KEY] = { "gre.key", 0 },
362     [PPTP_CALLID] = { "pptp.callid", 0 },
363 };
364
365 #define GET_VALUE(x) intr_ids[x].res->value
366
367 #define DATE(t) ((t) / (24*60*60) * (24*60*60))
368
369 static int _output_drl(ulog_iret_t *res)
370 {
371     int xid;
372     uint32_t src_ip, dst_ip;
373     uint16_t src_port, dst_port;
374     uint8_t protocol;
375
376     key_flow key;
377     identity_t *ident;
378     leaf_t *leaf;
379
380     protocol = GET_VALUE(IP_PROTOCOL).ui8;
381     src_ip = GET_VALUE(IP_SADDR).ui32;
382     dst_ip = GET_VALUE(IP_DADDR).ui32;
383     xid = GET_VALUE(OOB_MARK).ui32;
384
385     switch (protocol) {
386             
387         case IPPROTO_TCP:
388             src_port = GET_VALUE(TCP_SPORT).ui16;
389             dst_port = GET_VALUE(TCP_DPORT).ui16;
390             break;
391
392         case IPPROTO_UDP:
393             /* netflow had an issue with many udp flows and set
394              * src_port=0 to handle it.  We don't. 
395              */
396             src_port = GET_VALUE(UDP_SPORT).ui16;
397
398             /*
399              * traceroutes create a large number of flows in the db
400              * this is a quick hack to catch the most common form
401              * of traceroute (basically we're mapping any UDP packet
402              * in the 33435-33524 range to the "trace" port, 33524 is
403              * 3 packets * nhops (30).
404              */
405             dst_port = GET_VALUE(UDP_DPORT).ui16;
406             if (dst_port >= 33435 && dst_port <= 33524)
407                 dst_port = 33435;
408             break;
409
410         case IPPROTO_ICMP:
411             src_port = GET_VALUE(ICMP_TYPE).ui8;
412             dst_port = GET_VALUE(ICMP_CODE).ui8;
413
414             /*
415              * We special case some of the ICMP traffic that the kernel
416              * always generates. Since this is attributed to root, it 
417              * creates significant "noise" in the output. We want to be
418              * able to quickly see that root is generating traffic.
419              */
420             if (xid == ROOT_XID) {
421                 if (src_port == ICMP_ECHOREPLY)
422                     xid = ICMP_ECHOREPLY_XID;
423                 else if (src_port == ICMP_UNREACH)
424                     xid = ICMP_UNREACH_XID;
425             }
426             break;
427
428         case IPPROTO_GRE:
429             if (GET_VALUE(GRE_FLAG_KEY).b) {
430                 if (GET_VALUE(GRE_VERSION).ui8 == 1) {
431                     /* Get PPTP call ID */
432                     src_port = GET_VALUE(PPTP_CALLID).ui16;
433                 } else {
434                     /* XXX Truncate GRE keys to 16 bits */
435                     src_port = (uint16_t) GET_VALUE(GRE_KEY).ui32;
436                 }
437             } else {
438                 /* No key available */
439                 src_port = 0;
440             }
441             dst_port = 0;
442             break;
443
444         default:
445             /* This is the default key for packets from unsupported protocols */
446             src_port = 0;
447             dst_port = 0;
448             break;
449     }
450
451     key.protocol = protocol;
452     key.source_ip = src_ip;
453     key.dest_ip = dst_ip;
454     key.source_port = src_port;
455     key.dest_port = dst_port;
456     key.packet_size = GET_VALUE(IP_TOTLEN).ui16;
457     key.packet_time = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
458
459     pthread_rwlock_rdlock(&limiter.limiter_lock); /* CLUNK! */
460
461     leaf = (leaf_t *) map_search(limiter.stable_instance.leaf_map, &xid, sizeof(xid));
462
463     /* Even if the packet doesn't match any specific xid, it should still
464      * count in the machine-type tables.  This catches root (xid == 0) and
465      * unclassified (xid = fff) packets, which don't have map entries. */
466     if (leaf == NULL) {
467         ident = limiter.stable_instance.last_machine;
468     } else {
469         ident = leaf->parent;
470     }
471
472     while (ident) {
473         pthread_mutex_lock(&ident->table_mutex);
474
475         /* Update the identity's table. */
476         ident->table_sample_function(ident->table, &key);
477
478 #ifdef SHADOW_ACCTING
479
480         /* Update the shadow perfect copy of the accounting table. */
481         standard_table_sample((standard_flow_table) ident->shadow_table, &key);
482
483 #endif
484
485         pthread_mutex_unlock(&ident->table_mutex);
486
487         ident = ident->parent;
488     }
489
490     pthread_rwlock_unlock(&limiter.limiter_lock); /* CLINK! */
491
492     return 0;
493 }
494
495 /* get all key id's for the keys we are intrested in */
496 static int get_ids(void)
497 {
498     int i;
499     struct intr_id *cur_id;
500
501     for (i = 0; i < INTR_IDS; i++) {
502         cur_id = &intr_ids[i];
503         cur_id->res = keyh_getres(keyh_getid(cur_id->name));
504         if (!cur_id->res) {
505             ulogd_log(ULOGD_ERROR, 
506                     "Cannot resolve keyhash id for %s\n", 
507                     cur_id->name);
508             return 1;
509         }
510     }
511     return 0;
512 }
513
514 static void free_identity(identity_t *ident) {
515     if (ident) {
516         free_comm(&ident->comm);
517
518         if (ident->table) {
519             ident->table_destroy_function(ident->table);
520         }
521
522         if (ident->loop_action) {
523             ident->loop_action->valid = 0;
524         }
525
526         if (ident->comm_action) {
527             ident->comm_action->valid = 0;
528         }
529
530         pthread_mutex_destroy(&ident->table_mutex);
531
532         free(ident);
533     }
534 }
535
536 static void free_identity_map(map_handle map) {
537     identity_t *tofree = NULL;
538
539     map_reset_iterate(map);
540     while ((tofree = (identity_t *) map_next(map))) {
541         free_identity(tofree);
542     }
543
544     free_map(map, 0);
545 }
546
547 static void free_instance(drl_instance_t *instance) {
548     if (instance->leaves)
549         free(instance->leaves);
550     if (instance->leaf_map)
551         free_map(instance->leaf_map, 0);
552     if (instance->ident_map)
553         free_identity_map(instance->ident_map);
554     if (instance->machines)
555         free(instance->machines);
556     if (instance->sets)
557         free(instance->sets);
558
559     /* FIXME: Drain the calendar first and free all the entries. */
560     if (instance->cal) {
561         free(instance->cal);
562     }
563
564     memset(instance, 0, sizeof(drl_instance_t));
565 }
566
567 static void free_failed_config(parsed_configs configs, drl_instance_t *instance) {
568     /* Free configs. */
569     if (configs.machines)
570         free_ident_list(configs.machines);
571     if (configs.sets)
572         free_ident_list(configs.sets);
573
574     /* Free instance. */
575     if (instance)
576         free_instance(instance);
577 }
578
579 static identity_t *new_identity(ident_config *config) {
580     identity_t *ident = malloc(sizeof(identity_t));
581     remote_node_t *comm_nodes = malloc(sizeof(remote_node_t)*config->peer_count);
582     ident_peer *peer = config->peers;
583     int peer_slot = 0;
584
585     if (ident == NULL) {
586         return NULL;
587     }
588
589     if (comm_nodes == NULL) {
590         free(ident);
591         return NULL;
592     }
593
594     memset(ident, 0, sizeof(identity_t));
595     memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t));
596
597     ident->id = config->id;
598     ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0);
599     ident->fixed_ewma_weight = config->fixed_ewma_weight;
600     ident->communication_intervals = config->communication_intervals;
601     ident->mainloop_intervals = config->mainloop_intervals;
602     ident->ewma_weight = pow(ident->fixed_ewma_weight, 
603                              (limiter.estintms/1000.0) * config->mainloop_intervals);
604     ident->parent = NULL;
605     ident->independent = config->independent;
606
607     pthread_mutex_init(&ident->table_mutex, NULL);
608     switch (config->accounting) {
609         case ACT_STANDARD:
610             ident->table =
611                 standard_table_create(standard_hasher, &ident->common);
612
613             /* Ugly function pointer casting.  Makes things sufficiently
614              * generic, though. */
615             ident->table_sample_function =
616                 (int (*)(void *, const key_flow *)) standard_table_sample;
617             ident->table_cleanup_function =
618                 (int (*)(void *)) standard_table_cleanup;
619             ident->table_update_function =
620                 (void (*)(void *, struct timeval, double)) standard_table_update_flows;
621             ident->table_destroy_function =
622                 (void (*)(void *)) standard_table_destroy;
623             break;
624
625         case ACT_MULTIPLE:
626             ident->table =
627                 multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common);
628
629             ident->table_sample_function =
630                 (int (*)(void *, const key_flow *)) multiple_table_sample;
631             ident->table_cleanup_function =
632                 (int (*)(void *)) multiple_table_cleanup;
633             ident->table_update_function =
634                 (void (*)(void *, struct timeval, double)) multiple_table_update_flows;
635             ident->table_destroy_function =
636                 (void (*)(void *)) multiple_table_destroy;
637             break;
638
639         case ACT_SAMPLEHOLD:
640             ident->table = sampled_table_create(sampled_hasher,
641                     ident->limit * IDENT_CLEAN_INTERVAL,
642                     SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common);
643
644             ident->table_sample_function =
645                 (int (*)(void *, const key_flow *)) sampled_table_sample;
646             ident->table_cleanup_function =
647                 (int (*)(void *)) sampled_table_cleanup;
648             ident->table_update_function =
649                 (void (*)(void *, struct timeval, double)) sampled_table_update_flows;
650             ident->table_destroy_function =
651                 (void (*)(void *)) sampled_table_destroy;
652             break;
653
654         case ACT_SIMPLE:
655             ident->table = simple_table_create(&ident->common);
656
657             ident->table_sample_function =
658                 (int (*)(void *, const key_flow *)) simple_table_sample;
659             ident->table_cleanup_function =
660                 (int (*)(void *)) simple_table_cleanup;
661             ident->table_update_function =
662                 (void (*)(void *, struct timeval, double)) simple_table_update_flows;
663             ident->table_destroy_function =
664                 (void (*)(void *)) simple_table_destroy;
665             break;
666     }
667
668 #ifdef SHADOW_ACCTING
669
670     ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common);
671
672     if (ident->shadow_table == NULL) {
673         ident->table_destroy_function(ident->table);
674         free(ident);
675         return NULL;
676     }
677
678 #endif
679
680     /* Make sure the table was allocated. */
681     if (ident->table == NULL) {
682         free(ident);
683         return NULL;
684     }
685
686     while (peer) {
687         comm_nodes[peer_slot].addr = peer->ip;
688         comm_nodes[peer_slot].port = htons(LIMITER_LISTEN_PORT);
689         peer = peer->next;
690         peer_slot += 1;
691     }
692
693     if (new_comm(&ident->comm, config, comm_nodes)) {
694         printlog(LOG_CRITICAL, "Failed to create communication structure.\n");
695         return NULL;
696     }
697
698     ident->comm.remote_nodes = comm_nodes;
699
700     return ident;
701 }
702
703 /* Determines the validity of the parameters of one ident_config.
704  *
705  * 0 valid
706  * 1 invalid
707  */
708 static int validate_config(ident_config *config) {
709     /* Limit must be a positive integer. */
710     if (config->limit < 1) {
711         return 1;
712     }
713
714     /* Commfabric must be a valid choice (COMM_MESH or COMM_GOSSIP). */
715     if (config->commfabric != COMM_MESH &&
716             config->commfabric != COMM_GOSSIP) {
717         return 1;
718     }
719
720     /* If commfabric is COMM_GOSSIP, this must be a positive integer. */
721     if (config->commfabric == COMM_GOSSIP && config->branch < 1) {
722         return 1;
723     }
724
725     /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD,
726      * ACT_SIMPLE, ACT_MULTIPLE). */
727     if (config->accounting != ACT_STANDARD &&
728             config->accounting != ACT_SAMPLEHOLD &&
729             config->accounting != ACT_SIMPLE &&
730             config->accounting != ACT_MULTIPLE) {
731         return 1;
732     }
733
734     /* Ewma weight must be greater than or equal to zero. */
735     if (config->fixed_ewma_weight < 0) {
736         return 1;
737     }
738
739     /* Note: Parsing stage requires that each ident has at least one peer. */
740     return 0;
741 }
742
743 /* 0 success
744  * non-zero failure
745  */
746 static int validate_configs(parsed_configs configs, drl_instance_t *instance) {
747
748     ident_config *mlist = configs.machines;
749     ident_config *slist = configs.sets;
750     ident_config *tmp = NULL;
751     int i = 0;
752
753     while (mlist) {
754         /* ID must be non-zero and unique. */
755         /* This is ugly and hackish, but this function will be called rarely.
756          * I'm tired of trying to be clever. */
757         if (mlist->id < 0) {
758             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", mlist->id, mlist->id);
759             return EINVAL;
760         }
761         tmp = mlist->next;
762         while (tmp) {
763             if (mlist->id == tmp->id) {
764                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
765                 return EINVAL;
766             }
767             tmp = tmp->next;
768         }
769         tmp = configs.sets;
770         while (tmp) {
771             if (mlist->id == tmp->id) {
772                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
773                 return EINVAL;
774             }
775             tmp = tmp->next;
776         }
777
778         if (validate_config(mlist)) {
779             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", mlist->id, mlist->id);
780             return EINVAL;
781         }
782
783         if (mlist->independent) {
784             printlog(LOG_CRITICAL, "Makes no sense to have independent machine node - setting independent to false.\n");
785             mlist->independent = 0;
786         }
787
788         mlist = mlist->next;
789     }
790
791     instance->sets = malloc(configs.set_count * sizeof(identity_t *));
792     if (instance->sets == NULL) {
793         printlog(LOG_CRITICAL, "Not enough memory to allocate set identity collection.\n");
794         return ENOMEM;
795     }
796
797     memset(instance->sets, 0, configs.set_count * sizeof(identity_t *));
798     instance->set_count = configs.set_count;
799
800     /* For sets, make sure that the hierarchy is valid. */
801     while (slist) {
802         ident_member *members = slist->members;
803
804         /* ID must be non-zero and unique. */
805         if (slist->id < 0) {
806             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", slist->id, slist->id);
807             return EINVAL;
808         }
809         tmp = slist->next;
810         while (tmp) {
811             if (slist->id == tmp->id) {
812                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", slist->id, slist->id);
813                 return EINVAL;
814             }
815             tmp = tmp->next;
816         }
817
818         if (validate_config(slist)) {
819             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", slist->id, slist->id);
820             return EINVAL;
821         }
822
823         /* Allocate an identity_t for this set-type identity. */
824         instance->sets[i] = new_identity(slist);
825
826         if (instance->sets[i] == NULL) {
827             printlog(LOG_CRITICAL, "Not enough memory to allocate set identity.\n");
828             return ENOMEM;
829         }
830
831         /* Loop through children and look up each in leaf or ident map
832          * depending on the type of child.  Set the child's parent pointer
833          * to the identity we just created above, unless it is already set,
834          * in which case we have an error. */
835         while (members) {
836             identity_t *child_ident = NULL;
837             leaf_t *child_leaf = NULL;
838
839             switch (members->type) {
840                 case MEMBER_XID:
841                     child_leaf = map_search(instance->leaf_map, &members->value,
842                                             sizeof(members->value));
843                     if (child_leaf == NULL) {
844                         printlog(LOG_CRITICAL, "xid: child leaf not found.\n");
845                         return EINVAL;
846                     }
847                     if (child_leaf->parent != NULL) {
848                         /* Error - This leaf already has a parent. */
849                         printlog(LOG_CRITICAL, "xid: child already has a parent.\n");
850                         return EINVAL;
851                     }
852                     child_leaf->parent = instance->sets[i];
853                     break;
854                 case MEMBER_GUID:
855                     child_ident = map_search(instance->ident_map, &members->value,
856                                              sizeof(members->value));
857                     if (child_ident == NULL) {
858                         printlog(LOG_CRITICAL, "guid: child identity not found.\n");
859                         return EINVAL;
860                     }
861                     if (child_ident->parent != NULL) {
862                         /* Error - This identity already has a parent. */
863                         printlog(LOG_CRITICAL, "guid: child identity already has a parent.\n");
864                         return EINVAL;
865                     }
866                     child_ident->parent = instance->sets[i];
867                     break;
868                 default:
869                     /* Error - shouldn't be possible. */
870                     break;
871             }
872             members = members->next;
873         }
874
875         map_insert(instance->ident_map, &instance->sets[i]->id,
876                    sizeof(instance->sets[i]->id), instance->sets[i]);
877
878         slist = slist->next;
879         i++;
880     }
881     return 0;
882 }
883
884 static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) {
885     int count = 0;
886     identity_t *current_ident;
887     leaf_t *current_leaf;
888     leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *));
889     if (leaves == NULL) {
890         return 1;
891     }
892
893     map_reset_iterate(instance->leaf_map);
894     while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) {
895         current_ident = current_leaf->parent;
896         while (current_ident != NULL && current_ident != instance->last_machine) {
897             if (current_ident == ident) {
898                 /* Found the ident we were looking for - add the leaf. */
899                 leaves[count] = current_leaf;
900                 count += 1;
901                 break;
902             }
903             current_ident = current_ident->parent;
904         }
905     }
906
907     ident->leaves = leaves;
908     ident->leaf_count = count;
909
910     return 0;
911 }
912
913 static int init_identities(parsed_configs configs, drl_instance_t *instance) {
914     int i, j;
915     ident_config *config = configs.machines;
916     leaf_t *leaf = NULL;
917
918     instance->cal = malloc(sizeof(struct ident_calendar) * SCHEDLEN);
919
920     if (instance->cal == NULL) {
921         return ENOMEM;
922     }
923
924     for (i = 0; i < SCHEDLEN; ++i) {
925         TAILQ_INIT(instance->cal + i);
926     }
927     instance->cal_slot = 0;
928
929     instance->machines = malloc(configs.machine_count * sizeof(drl_instance_t *));
930
931     if (instance->machines == NULL) {
932         return ENOMEM;
933     }
934
935     memset(instance->machines, 0, configs.machine_count * sizeof(drl_instance_t *));
936     instance->machine_count = configs.machine_count;
937
938     /* Allocate and add the machine identities. */
939     for (i = 0; i < configs.machine_count; ++i) {
940         identity_action *loop_action;
941         identity_action *comm_action;
942         instance->machines[i] = new_identity(config);
943
944         if (instance->machines[i] == NULL) {
945             return ENOMEM;
946         }
947
948         loop_action = malloc(sizeof(identity_action));
949         comm_action = malloc(sizeof(identity_action));
950
951         if (loop_action == NULL || comm_action == NULL) {
952             return ENOMEM;
953         }
954
955         /* The first has no parent - it is the root.  All others have the
956          * previous ident as their parent. */
957         if (i == 0) {
958             instance->machines[i]->parent = NULL;
959         } else {
960             instance->machines[i]->parent = instance->machines[i - 1];
961         }
962
963         instance->last_machine = instance->machines[i];
964
965         /* Add the ident to the guid->ident map. */
966         map_insert(instance->ident_map, &instance->machines[i]->id,
967                    sizeof(instance->machines[i]->id), instance->machines[i]);
968
969         config = config->next;
970
971         memset(loop_action, 0, sizeof(identity_action));
972         memset(comm_action, 0, sizeof(identity_action));
973         loop_action->ident = instance->machines[i];
974         loop_action->action = ACTION_MAINLOOP;
975         loop_action->valid = 1;
976         comm_action->ident = instance->machines[i];
977         comm_action->action = ACTION_COMMUNICATE;
978         comm_action->valid = 1;
979
980         instance->machines[i]->loop_action = loop_action;
981         instance->machines[i]->comm_action = comm_action;
982
983         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
984                           loop_action, calendar);
985
986         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
987                           comm_action, calendar);
988
989         /* Setup the array of pointers to leaves.  This is easy for machines
990          * because a machine node applies to every leaf. */
991         instance->machines[i]->leaves =
992             malloc(instance->leaf_count * sizeof(leaf_t *));
993         if (instance->machines[i]->leaves == NULL) {
994             return ENOMEM;
995         }
996         instance->machines[i]->leaf_count = instance->leaf_count;
997         for (j = 0; j < instance->leaf_count; ++j) {
998             instance->machines[i]->leaves[j] = &instance->leaves[j];
999         }
1000     }
1001
1002     /* Connect the set subtree to the machines. Any set or leaf without a
1003      * parent will take the last machine as its parent. */
1004
1005     /* Leaves... */
1006     map_reset_iterate(instance->leaf_map);
1007     while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
1008         if (leaf->parent == NULL) {
1009             leaf->parent = instance->last_machine;
1010         }
1011     }
1012
1013     /* Sets... */
1014     for (i = 0; i < instance->set_count; ++i) {
1015         identity_action *loop_action;
1016         identity_action *comm_action;
1017
1018         if (instance->sets[i]->parent == NULL && instance->sets[i]->independent == 0) {
1019             instance->sets[i]->parent = instance->last_machine;
1020         }
1021
1022         loop_action = malloc(sizeof(identity_action));
1023         comm_action = malloc(sizeof(identity_action));
1024
1025         if (loop_action == NULL || comm_action == NULL) {
1026             return ENOMEM;
1027         }
1028
1029         memset(loop_action, 0, sizeof(identity_action));
1030         memset(comm_action, 0, sizeof(identity_action));
1031         loop_action->ident = instance->sets[i];
1032         loop_action->action = ACTION_MAINLOOP;
1033         loop_action->valid = 1;
1034         comm_action->ident = instance->sets[i];
1035         comm_action->action = ACTION_COMMUNICATE;
1036         comm_action->valid = 1;
1037
1038         instance->sets[i]->loop_action = loop_action;
1039         instance->sets[i]->comm_action = comm_action;
1040
1041         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1042                           loop_action, calendar);
1043
1044         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1045                           comm_action, calendar);
1046
1047         /* Setup the array of pointers to leaves.  This is harder for sets,
1048          * but this doesn't need to be super-efficient because it happens
1049          * rarely and it isn't on the critical path for reconfig(). */
1050         if (fill_set_leaf_pointer(instance, instance->sets[i])) {
1051             return ENOMEM;
1052         }
1053     }
1054
1055     /* Success. */
1056     return 0;
1057 }
1058
1059 static void print_instance(drl_instance_t *instance) {
1060     leaf_t *leaf = NULL;
1061     identity_t *ident = NULL;
1062
1063     if (system_loglevel == LOG_DEBUG) {
1064         map_reset_iterate(instance->leaf_map);
1065         while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
1066             printf("%x:", leaf->xid);
1067             ident = leaf->parent;
1068             while (ident) {
1069                 printf("%d:",ident->id);
1070                 ident = ident->parent;
1071             }
1072             printf("Leaf's parent pointer is %p\n", leaf->parent);
1073         }
1074
1075         printf("instance->last_machine is %p\n", instance->last_machine);
1076     }
1077 }
1078
1079 static int assign_htb_hierarchy(drl_instance_t *instance) {
1080     int i, j;
1081     int next_node = 0x100;
1082
1083     /* Chain machine nodes under 1:10. */
1084     for (i = 0; i < instance->machine_count; ++i) {
1085         if (instance->machines[i]->parent == NULL) {
1086             /* Top node. */
1087             instance->machines[i]->htb_parent = 0x10;
1088         } else {
1089             /* Pointerific! */
1090             instance->machines[i]->htb_parent =
1091                 instance->machines[i]->parent->htb_node;
1092         }
1093
1094         instance->machines[i]->htb_node = next_node;
1095         next_node += 1;
1096     }
1097
1098     next_node += 0x10;
1099
1100     /* Add set nodes under machine nodes. Iterate backwards to ensure parent is
1101      * already there. */
1102     for (j = (instance->set_count - 1); j >= 0; --j) {
1103         if (instance->sets[j]->parent == NULL) {
1104             /* Independent node - goes under 0x10 away from machine nodes. */
1105             instance->sets[j]->htb_parent = 0x10;
1106         } else {
1107             instance->sets[j]->htb_parent = instance->sets[j]->parent->htb_node;
1108         }
1109         instance->sets[j]->htb_node = next_node;
1110
1111         next_node += 1;
1112     }
1113
1114     return 0;
1115 }
1116
1117 /* Added this so that I could comment one line and kill off all of the
1118  * command execution. */
1119 static inline int execute_cmd(const char *cmd) {
1120     return system(cmd);
1121 }
1122
1123 static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor,
1124                                const uint32_t classid_major, const uint32_t classid_minor,
1125                                const uint64_t rate, const uint64_t ceil) {
1126     char cmd[300];
1127
1128     sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit",
1129             iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil);
1130     printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd);
1131
1132     return execute_cmd(cmd);
1133 }
1134
1135 static inline int add_htb_netem(const char *iface, const uint32_t parent_major,
1136                                 const uint32_t parent_minor, const uint32_t handle,
1137                                 const int loss, const int delay) {
1138     char cmd[300];
1139
1140     sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
1141             parent_minor, handle);
1142     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1143     if (execute_cmd(cmd))
1144         printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n");
1145
1146     sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms",
1147             iface, parent_major, parent_minor, handle, loss, delay);
1148     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1149     return execute_cmd(cmd);
1150 }
1151
1152 static inline int add_htb_sfq(const char *iface, const uint32_t parent_major,
1153                                 const uint32_t parent_minor, const uint32_t handle,
1154                                 const int perturb) {
1155     char cmd[300];
1156
1157     sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
1158             parent_minor, handle);
1159     printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
1160     if (execute_cmd(cmd))
1161         printlog(LOG_WARN, "HTB_cmd: Previous deletion did not succeed.\n");
1162
1163     sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x sfq perturb %d",
1164             iface, parent_major, parent_minor, handle, perturb);
1165     printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
1166     return execute_cmd(cmd);
1167 }
1168
1169 static int create_htb_hierarchy(drl_instance_t *instance) {
1170     char cmd[300];
1171     int i, j, k;
1172     uint64_t gigabit = 1024 * 1024 * 1024;
1173
1174     /* Nuke the hierarchy. */
1175     sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb");
1176     execute_cmd(cmd);
1177     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1178
1179     /* Re-initialize the basics. */
1180     sprintf(cmd, "tc qdisc add dev eth0 root handle 1: htb default 1fff");
1181     if (execute_cmd(cmd)) {
1182         return 1;
1183     }
1184     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1185
1186     if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit))
1187         return 1;
1188
1189     /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/
1190     if (limiter.nodelimit) {
1191         if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024))
1192             return 1;
1193     } else {
1194         if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit))
1195             return 1;
1196     }
1197
1198     /* Add machines. */
1199     for (i = 0; i < instance->machine_count; ++i) {
1200         if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1,
1201                          instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) {
1202             return 1;
1203         }
1204     }
1205
1206 #define LIMITEXEMPT
1207
1208     /* Add back 1:20. */
1209 #ifdef LIMITEXEMPT
1210     if (instance->last_machine == NULL) {
1211         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1212     } else {
1213         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:20 htb rate 8bit ceil 1000mbit",
1214             instance->last_machine->htb_node);
1215     }
1216 #else
1217     sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1218 #endif
1219
1220     if (execute_cmd(cmd)) {
1221         return 1;
1222     }
1223     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1224
1225     /* Add sets. */
1226     for (j = (instance->set_count - 1); j >= 0; --j) {
1227         if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1,
1228                          instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) {
1229             return 1;
1230         }
1231     }
1232
1233     /* Add leaves. FIXME: Set static sliver limit as ceil here! */
1234     for (k = 0; k < instance->leaf_count; ++k) {
1235         if (instance->leaves[k].parent == NULL) {
1236             if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1237                 return 1;
1238         } else {
1239             if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1240                 return 1;
1241         }
1242
1243         /* Add exempt node for the leaf under 1:20 as 1:2<xid> */
1244         if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit))
1245             return 1;
1246     }
1247
1248     /* Add 1:1000 and 1:2000 */
1249     if (instance->last_machine == NULL) {
1250         if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit))
1251             return 1;
1252     } else {
1253         if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit))
1254             return 1;
1255     }
1256
1257     if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit))
1258         return 1;
1259
1260     /* Add 1:1fff and 1:2fff */
1261     if (instance->last_machine == NULL) {
1262         if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit))
1263             return 1;
1264     } else {
1265         if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit))
1266             return 1;
1267     }
1268
1269     if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit))
1270         return 1;
1271
1272     /* Artifical delay or loss for experimentation. */
1273     if (netem_delay.u.value || netem_loss.u.value) {
1274         if (!strcmp(netem_slice.u.string, "ALL")) {
1275             /* By default, netem applies to all leaves. */
1276             if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value))
1277                 return 1;
1278             if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value))
1279                 return 1;
1280
1281             for (k = 0; k < instance->leaf_count; ++k) {
1282                 if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid),
1283                             (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) {
1284                     return 1;
1285                 }
1286
1287                 //FIXME: add exempt delay/loss here on 0x2000 ... ?
1288             }
1289         } else {
1290             /* netem_slice is not the default ALL value.  Only apply netem
1291              * to the slice that is set in netem_slice.u.string. */
1292             uint32_t slice_xid;
1293
1294             sscanf(netem_slice.u.string, "%x", &slice_xid);
1295
1296             if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value))
1297                 return 1;
1298         }
1299     }
1300
1301     /* Turn on SFQ for experimentation. */
1302     if (strcmp(sfq_slice.u.string, "NONE")) {
1303         if (!strcmp(sfq_slice.u.string, "ALL")) {
1304             if (add_htb_sfq("eth0", 1, 0x1000, 0x1000, 30))
1305                 return 1;
1306             if (add_htb_sfq("eth0", 1, 0x1fff, 0x1fff, 30))
1307                 return 1;
1308
1309             for (k = 0; k < instance->leaf_count; ++k) {
1310                 if (add_htb_sfq("eth0", 1, (0x1000 | instance->leaves[k].xid),
1311                             (0x1000 | instance->leaves[k].xid), 30)) {
1312                     return 1;
1313                 }
1314             }
1315         } else {
1316             uint32_t slice_xid;
1317
1318             sscanf(sfq_slice.u.string, "%x", &slice_xid);
1319
1320             if (add_htb_sfq("eth0", 1, slice_xid, slice_xid, 30))
1321                 return 1;
1322         }
1323     }
1324
1325     return 0;
1326 }
1327
1328 static int setup_tc_grd(drl_instance_t *instance) {
1329     int i, j;
1330     char cmd[300];
1331
1332     for (i = 0; i < instance->leaf_count; ++i) {
1333         /* Delete the old pfifo qdisc that might have been there before. */
1334         sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo",
1335                 instance->leaves[i].xid, instance->leaves[i].xid);
1336
1337         if (execute_cmd(cmd)) {
1338             printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1339         }
1340
1341         /* Add the netem qdisc. */
1342         sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
1343                 instance->leaves[i].xid, instance->leaves[i].xid);
1344
1345         if (execute_cmd(cmd)) {
1346             printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1347             return 1;
1348         }
1349     }
1350
1351     /* Do the same for 1000 and 1fff. */
1352     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1353
1354     if (execute_cmd(cmd)) {
1355         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1356     }
1357
1358     /* Add the netem qdisc. */
1359     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
1360
1361     if (execute_cmd(cmd)) {
1362         printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1363         return 1;
1364     }
1365
1366     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
1367
1368     if (execute_cmd(cmd)) {
1369         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1370     }
1371
1372     /* Add the netem qdisc. */
1373     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
1374
1375     if (execute_cmd(cmd)) {
1376         printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1377         return 1;
1378     }
1379
1380     /* Artifical delay or loss for experimentation. */
1381     if (netem_delay.u.value || netem_loss.u.value) {
1382         if (!strcmp(netem_slice.u.string, "ALL")) {
1383             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1000 handle 1000 netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
1384             if (execute_cmd(cmd)) {
1385                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1386                 return 1;
1387             }
1388
1389             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1fff handle 1fff netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
1390             if (execute_cmd(cmd)) {
1391                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1392                 return 1;
1393             }
1394
1395             for (j = 0; j < instance->leaf_count; ++j) {
1396                 leaf_t *current = &instance->leaves[j];
1397
1398                 current->delay = netem_delay.u.value;
1399
1400                 sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", current->xid, current->xid, netem_loss.u.value, netem_delay.u.value);
1401
1402                 if (execute_cmd(cmd)) {
1403                     printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1404                     return 1;
1405                 }
1406             }
1407         } else {
1408             uint32_t slice_xid;
1409             leaf_t *leaf = NULL;
1410
1411             sscanf(netem_slice.u.string, "%x", &slice_xid);
1412
1413             leaf = (leaf_t *) map_search(instance->leaf_map, &slice_xid, sizeof(slice_xid));
1414
1415             if (leaf == NULL) {
1416                 /* Leaf not found - invalid selection. */
1417                 printf("Your experimental setup is incorrect...\n");
1418                 return 1;
1419             }
1420
1421             leaf->delay = netem_delay.u.value;
1422
1423             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value);
1424
1425             if (execute_cmd(cmd)) {
1426                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1427                 return 1;
1428             }
1429         }
1430     }
1431
1432     return 0;
1433 }
1434
1435 /* init_drl 
1436  * 
1437  * Initialize this limiter with options 
1438  * Open UDP socket for peer communication
1439  */
1440 static int init_drl(void) {
1441     parsed_configs configs;
1442     struct sockaddr_in server_address;
1443     
1444     memset(&limiter, 0, sizeof(limiter_t));
1445
1446     /* Setup logging. */
1447     system_loglevel = (uint8_t) drl_loglevel.u.value;
1448     logfile = fopen(drl_logfile.u.string, "w");
1449
1450     if (logfile == NULL) {
1451         printf("Couldn't open logfile - ");
1452         perror("fopen()");
1453         exit(EXIT_FAILURE);
1454     }
1455
1456     printlog(LOG_CRITICAL, "ulogd_DRL initializing . . .\n");
1457
1458     limiter.nodelimit = (uint32_t) (((double) nodelimit.u.value * 1000000.0) / 8.0);
1459
1460     init_hashing();  /* for all hash maps */
1461
1462     pthread_rwlock_init(&limiter.limiter_lock,NULL);
1463
1464     /* determine our local IP by iterating through interfaces */
1465     if ((limiter.ip = get_local_ip())==0) {
1466         printlog(LOG_CRITICAL,
1467                  "ulogd_DRL unable to aquire local IP address, not registering.\n");
1468         return (false);
1469     }
1470     limiter.localaddr = inet_addr(limiter.ip);
1471     limiter.port = htons(LIMITER_LISTEN_PORT);
1472     limiter.udp_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
1473     if (limiter.udp_socket < 0) {
1474         printlog(LOG_CRITICAL, "Failed to create UDP socket().\n");
1475         return false;
1476     }
1477
1478     memset(&server_address, 0, sizeof(server_address));
1479     server_address.sin_family = AF_INET;
1480     server_address.sin_addr.s_addr = limiter.localaddr;
1481     server_address.sin_port = limiter.port;
1482
1483     if (bind(limiter.udp_socket, (struct sockaddr *) &server_address, sizeof(server_address)) < 0) {
1484         printlog(LOG_CRITICAL, "Failed to bind UDP socket.\n");
1485         return false;
1486     }
1487
1488     printlog(LOG_WARN, "     POLICY: %s\n",policy.u.string);
1489     if (strcasecmp(policy.u.string,"GRD") == 0) {
1490         limiter.policy = POLICY_GRD;
1491     } else if (strcasecmp(policy.u.string,"FPS") == 0) {
1492         limiter.policy = POLICY_FPS;
1493     } else {
1494         printlog(LOG_CRITICAL,
1495                  "Unknown DRL policy %s, aborting.\n",policy.u.string);
1496         return (false);
1497     }
1498
1499     limiter.estintms = estintms.u.value;
1500     if (limiter.estintms > 1000) {
1501         printlog(LOG_CRITICAL,
1502                  "DRL: sorry estimate intervals must be less than 1 second.");
1503         printlog(LOG_CRITICAL,
1504                  "  Simple source mods will allow larger intervals.  Using 1 second.\n");
1505         limiter.estintms = 1000;
1506     }
1507     printlog(LOG_WARN, "     Est interval: %dms\n",limiter.estintms);
1508     
1509     /* Acquire the big limiter lock for writing.  Prevents pretty much
1510      * anything else from happening while the hierarchy is being changed. */
1511     pthread_rwlock_wrlock(&limiter.limiter_lock);
1512
1513     limiter.stable_instance.ident_map = allocate_map();
1514     if (limiter.stable_instance.ident_map == NULL) {
1515         printlog(LOG_CRITICAL, "Failed to allocate memory for identity map.\n");
1516         return false;
1517     }
1518
1519     if (get_eligible_leaves(&limiter.stable_instance)) {
1520         printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n");
1521         return false;
1522     }
1523
1524     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1525         /* Parse error occured. Return non-zero to notify init_drl(). */
1526         printlog(LOG_CRITICAL, "Failed to parse the DRL configuration file (%s).\n",
1527             drl_configfile.u.string);
1528         return false;
1529     }
1530
1531     /* Validate identity hierarchy! */
1532     if (validate_configs(configs, &limiter.stable_instance)) {
1533         /* Clean up everything. */
1534         free_failed_config(configs, &limiter.stable_instance);
1535         printlog(LOG_CRITICAL, "Invalid DRL configuration file (%s).\n",
1536             drl_configfile.u.string);
1537         return false;
1538     }
1539
1540     if (init_identities(configs, &limiter.stable_instance)) {
1541         free_failed_config(configs, &limiter.stable_instance);
1542         printlog(LOG_CRITICAL, "Failed to initialize identities.\n");
1543         return false;
1544     }
1545
1546     /* At this point, we should be done with configs. */
1547     free_ident_list(configs.machines);
1548     free_ident_list(configs.sets);
1549
1550     print_instance(&limiter.stable_instance);
1551
1552     switch (limiter.policy) {
1553         case POLICY_FPS:
1554             if (assign_htb_hierarchy(&limiter.stable_instance)) {
1555                 free_instance(&limiter.stable_instance);
1556                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy.\n");
1557                 return false;
1558             }
1559
1560             if (create_htb_hierarchy(&limiter.stable_instance)) {
1561                 free_instance(&limiter.stable_instance);
1562                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy.\n");
1563                 return false;
1564             }
1565         break;
1566
1567         case POLICY_GRD:
1568             if (setup_tc_grd(&limiter.stable_instance)) {
1569                 free_instance(&limiter.stable_instance);
1570                 printlog(LOG_CRITICAL, "Failed to initialize tc calls for GRD.\n");
1571                 return false;
1572             }
1573         break;
1574
1575         default:
1576         return false;
1577     }
1578
1579     partition_set = partition.u.value;
1580
1581     pthread_rwlock_unlock(&limiter.limiter_lock);
1582
1583     if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) {
1584         printlog(LOG_CRITICAL, "Unable to start UDP receive thread.\n");
1585         return false;
1586     }
1587
1588     printlog(LOG_WARN, "ulogd_DRL init finished.\n");
1589
1590     return true;
1591 }
1592
1593 static void reconfig() {
1594     parsed_configs configs;
1595
1596     printlog(LOG_DEBUG, "--Starting reconfig()--\n");
1597     flushlog();
1598
1599     memset(&configs, 0, sizeof(parsed_configs));
1600     memset(&limiter.new_instance, 0, sizeof(drl_instance_t));
1601
1602     limiter.new_instance.ident_map = allocate_map();
1603     if (limiter.new_instance.ident_map == NULL) {
1604         printlog(LOG_CRITICAL, "Failed to allocate ident_map during reconfig().\n");
1605         return;
1606     }
1607
1608     if (get_eligible_leaves(&limiter.new_instance)) {
1609         free_failed_config(configs, &limiter.new_instance);
1610         printlog(LOG_CRITICAL, "Failed to read leaves during reconfig().\n");
1611         return;
1612     }
1613
1614     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1615         free_failed_config(configs, &limiter.new_instance);
1616         printlog(LOG_CRITICAL, "Failed to parse config during reconfig().\n");
1617         return;
1618     }
1619
1620     if (validate_configs(configs, &limiter.new_instance)) {
1621         free_failed_config(configs, &limiter.new_instance);
1622         printlog(LOG_CRITICAL, "Validation failed during reconfig().\n");
1623         pthread_rwlock_unlock(&limiter.limiter_lock);
1624         return;
1625     }
1626
1627     if (init_identities(configs, &limiter.new_instance)) {
1628         free_failed_config(configs, &limiter.new_instance);
1629         printlog(LOG_CRITICAL, "Initialization failed during reconfig().\n");
1630         pthread_rwlock_unlock(&limiter.limiter_lock);
1631         return;
1632     }
1633
1634     free_ident_list(configs.machines);
1635     free_ident_list(configs.sets);
1636
1637     print_instance(&limiter.new_instance);
1638     
1639     /* Lock */
1640     pthread_rwlock_wrlock(&limiter.limiter_lock);
1641
1642     switch (limiter.policy) {
1643         case POLICY_FPS:
1644             if (assign_htb_hierarchy(&limiter.new_instance)) {
1645                 free_instance(&limiter.new_instance);
1646                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
1647                 pthread_rwlock_unlock(&limiter.limiter_lock);
1648                 return;
1649             }
1650
1651             if (create_htb_hierarchy(&limiter.new_instance)) {
1652                 free_instance(&limiter.new_instance);
1653                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
1654
1655                 /* Re-create old instance. */
1656                 if (create_htb_hierarchy(&limiter.stable_instance)) {
1657                     /* Error reinstating the old one - big problem. */
1658                     printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
1659                     printlog(LOG_CRITICAL, "Giving up...\n");
1660                     flushlog();
1661                     exit(EXIT_FAILURE);
1662                 }
1663
1664                 pthread_rwlock_unlock(&limiter.limiter_lock);
1665                 return;
1666             }
1667         break;
1668
1669         case POLICY_GRD:
1670             if (setup_tc_grd(&limiter.new_instance)) {
1671                 free_instance(&limiter.new_instance);
1672                 printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n");
1673
1674                 /* Try to re-create old instance. */
1675                 if (setup_tc_grd(&limiter.stable_instance)) {
1676                     printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n");
1677                     printlog(LOG_CRITICAL, "Giving up...\n");
1678                     flushlog();
1679                     exit(EXIT_FAILURE);
1680                 }
1681             }
1682         break;
1683
1684         default:
1685             /* Should be impossible. */
1686             printf("Pigs are flying?\n");
1687             exit(EXIT_FAILURE);
1688     }
1689
1690     /* Switch over new to stable instance. */
1691     free_instance(&limiter.stable_instance);
1692     memcpy(&limiter.stable_instance, &limiter.new_instance, sizeof(drl_instance_t));
1693
1694     /* Success! - Unlock */
1695     pthread_rwlock_unlock(&limiter.limiter_lock);
1696 }
1697
1698 static ulog_output_t drl_op = {
1699     .name = "drl",
1700     .output = &_output_drl,
1701     .signal = NULL, /* This appears to be broken. Using my own handler. */
1702     .init = NULL,
1703     .fini = NULL,
1704 };
1705
1706 /* Tests the amount of time it takes to call reconfig(). */
1707 static void time_reconfig(int iterations) {
1708     struct timeval start, end;
1709     int i;
1710
1711     gettimeofday(&start, NULL);
1712     for (i = 0; i < iterations; ++i) {
1713         reconfig();
1714     }
1715     gettimeofday(&end, NULL);
1716
1717     printf("%d reconfigs() took %d seconds and %d microseconds.\n",
1718            iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec);
1719     exit(0);
1720
1721     // Seems to take about 85ms / iteration
1722 }
1723
1724 static int stop_enforcement(drl_instance_t *instance) {
1725     char cmd[300];
1726     int i;
1727
1728     for (i = 0; i < instance->machine_count; ++i) {
1729         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1730                 instance->machines[i]->htb_parent,
1731                 instance->machines[i]->htb_node);
1732
1733         if (execute_cmd(cmd)) {
1734             return 1;
1735         }
1736     }
1737
1738     for (i = 0; i < instance->set_count; ++i) {
1739         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1740                 instance->sets[i]->htb_parent,
1741                 instance->sets[i]->htb_node);
1742
1743         if (execute_cmd(cmd)) {
1744             return 1;
1745         }
1746     }
1747
1748     return 0;
1749 }
1750
1751 static void *signal_thread_func(void *args) {
1752     int sig;
1753     int err;
1754     sigset_t sigs;
1755
1756     sigemptyset(&sigs);
1757     sigaddset(&sigs, SIGHUP);
1758     sigaddset(&sigs, SIGUSR1);
1759     sigaddset(&sigs, SIGUSR2);
1760     pthread_sigmask(SIG_BLOCK, &sigs, NULL);
1761
1762     while (1) {
1763         sigemptyset(&sigs);
1764         sigaddset(&sigs, SIGHUP);
1765         sigaddset(&sigs, SIGUSR1);
1766         sigaddset(&sigs, SIGUSR2);
1767
1768         err = sigwait(&sigs, &sig);
1769
1770         if (err) {
1771             printlog(LOG_CRITICAL, "sigwait() returned an error.\n");
1772             flushlog();
1773         }
1774
1775         switch (sig) {
1776             case SIGHUP:
1777                 printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n");
1778                 reconfig();
1779                 //time_reconfig(1000); /* instrumentation */
1780                 flushlog();
1781                 break;
1782             case SIGUSR1:
1783                 pthread_rwlock_wrlock(&limiter.limiter_lock);
1784                 if (do_enforcement) {
1785                     do_enforcement = 0;
1786                     stop_enforcement(&limiter.stable_instance);
1787                     printlog(LOG_CRITICAL, "--Switching enforcement off.--\n");
1788                 } else {
1789                     do_enforcement = 1;
1790                     printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1791                 }
1792                 pthread_rwlock_unlock(&limiter.limiter_lock);
1793                 break;
1794             case SIGUSR2:
1795                 do_partition = !do_partition;
1796                 break;
1797             default:
1798                 /* Intentionally blank. */
1799                 break;
1800         }
1801     }
1802
1803 }
1804
1805 /* register output plugin with ulogd */
1806 static void _drl_reg_op(void)
1807 {
1808     ulog_output_t *op = &drl_op;
1809     sigset_t signal_mask;
1810
1811     sigemptyset(&signal_mask);
1812     sigaddset(&signal_mask, SIGHUP);
1813     sigaddset(&signal_mask, SIGUSR1);
1814     sigaddset(&signal_mask, SIGUSR2);
1815     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
1816
1817     if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {
1818         printlog(LOG_CRITICAL, "Failed to create signal handling thread.\n");
1819         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1820         flushlog();
1821         exit(EXIT_FAILURE);
1822     }
1823
1824     if (!init_drl()) {
1825         printlog(LOG_CRITICAL, "Init failed. :(\n");
1826         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1827         flushlog();
1828         exit(EXIT_FAILURE);
1829     }
1830
1831     register_output(op);
1832
1833     /* start up the thread that will periodically estimate the
1834      * local rate and set the local limits
1835      * see estimate.c
1836      */
1837     if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) {
1838         printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n");
1839         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1840         exit(EXIT_FAILURE);
1841     }
1842
1843     if (enforce_on.u.value) {
1844         pthread_rwlock_wrlock(&limiter.limiter_lock);
1845         do_enforcement = 1;
1846         printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1847         pthread_rwlock_unlock(&limiter.limiter_lock);
1848     }
1849 }
1850
1851 void _init(void)
1852 {
1853     /* have the opts parsed */
1854     config_parse_file("DRL", config_entries);
1855
1856     if (get_ids()) {
1857         ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
1858         exit(2);
1859     }
1860
1861     /* Seed the hash function */
1862     salt = getpid() ^ time(NULL);
1863
1864     _drl_reg_op();
1865 }