When create_htb=0 is specified in ulogd.conf, DRL now ensures that each identity's
[distributedratelimiting.git] / drl / ulogd_DRL.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * ulogd output target for DRL: GRD and FPS
5  *
6  * Ken Yocum <kyocum@cs.ucsd.edu>
7  *
8  * Original shell of this code from ulogd_NETFLOW
9  * Like that code, we keep track of per-slice data rates 
10  * out of this slice.  Thus we are rate limiting particular slices
11  * across multiple boxes, ensuring that their outbound rate does not
12  * exceed some fixed limit.  
13  *
14  * Fabric:  static mesh
15  *
16  * Enforcer: linux drop percentage.
17  *    
18  * This file reads packets from the netlink socket.  It updates all
19  * the hashmaps which track how much data has arrived per flow.
20  * It starts two threads for this limiter.
21  * One thread handles periodic estimation.
22  * The other thread handles communication with other limiters. 
23  *
24  * 
25  * Code map
26  *
27  * ulogd_DRL: attach to netlink socket, accept packets.  replaces ratelimit.cc
28  * util.c: generic hashing functions, flow comparisons, sundry items. 
29  * gossip.c: Recv gossip, send gossip.
30  * peer_comm.c: Thread to listen for updates from other limiters. 
31  * estimate.c: Thread to calculate the local limits. 
32  * 
33  * 
34  * Ken Yocum <kyocum@cs.ucsd.edu>
35  * 2007 
36  * 
37  * Some code appropriated from ulogd_NETFLOW:
38  *
39  * Mark Huang <mlhuang@cs.princeton.edu>
40  * Copyright (C) 2004-2005 The Trustees of Princeton University
41  *
42  * Based on admindump.pl by Mic Bowman and Paul Brett
43  * Copyright (c) 2002 Intel Corporation
44  *
45  */
46
47 /* Enable GNU glibc extensions */
48 #define _GNU_SOURCE
49
50 #include <stdio.h>
51 #include <stdlib.h>
52
53 /* va_start() and friends */
54 #include <stdarg.h>
55
56 /* ispunct() */
57 #include <ctype.h>
58
59 /* strstr() and friends */
60 #include <string.h>
61
62 /* dirname() and basename() */
63 #include <libgen.h>
64
65 /* fork() and wait() */
66 #include <sys/types.h>
67 #include <unistd.h>
68 #include <sys/wait.h>
69
70 /* errno and assert() */
71 #include <errno.h>
72 #include <assert.h>
73
74 /* getopt_long() */
75 #include <getopt.h>
76
77 /* time() and friends */
78 #include <time.h>
79 #include <sys/time.h>
80
81 /* inet_aton() */
82 #include <sys/socket.h>
83 #include <netinet/in.h>
84 #include <arpa/inet.h>
85
86 /* ICMP definitions */
87 #include <netinet/ip.h>
88 #include <netinet/ip_icmp.h>
89
90 /* stat() */
91 #include <sys/stat.h>
92
93 /* pthread_create() */
94 #include <pthread.h>
95
96 /* flock() */
97 #include <sys/file.h>
98
99 /* Signal definitions - so that we can catch SIGHUP and update config. */
100 #include <signal.h>
101
102 #include <ulogd/ulogd.h>
103 #include <ulogd/conffile.h>
104
105 /* Perhaps useful for files within vservers? */
106 #if !defined(STANDALONE) && HAVE_LIBPROPER
107 #include <proper/prop.h>
108 #endif
109
110 /*
111  * Jenkins hash support
112  * lives in raterouter.h
113  */
114
115 /* DRL specifics */
116 #include "raterouter.h"
117 #include "util.h"
118 #include "ratetypes.h" /* needs util and pthread.h */
119 #include "calendar.h"
120 #include "logging.h"
121
122 /*
123  * /etc/ulogd.conf configuration options
124  * Add the config options for DRL. 
125  */
126
127 static config_entry_t create_htb = {
128     .next = NULL,
129     .key = "create_htb",
130     .type = CONFIG_TYPE_INT,
131     .options = CONFIG_OPT_NONE,
132     .u = { .value = 1 },
133 };
134
135 static config_entry_t enforce_on = {
136     .next = &create_htb,
137     .key = "enforce_on",
138     .type = CONFIG_TYPE_INT,
139     .options = CONFIG_OPT_NONE,
140     .u = { .value = 1 },
141 };
142
143 static config_entry_t partition = {
144     .next = &enforce_on,
145     .key = "partition_set",
146     .type = CONFIG_TYPE_INT,
147     .options = CONFIG_OPT_NONE,
148     .u = { .value = 0xfffffff },
149 };
150
151 static config_entry_t sfq_slice = {
152     .next = &partition,
153     .key = "sfq_slice",
154     .type = CONFIG_TYPE_STRING,
155     .options = CONFIG_OPT_NONE,
156     .u = { .string = "NONE" },
157 };
158
159 static config_entry_t netem_slice = {
160     .next = &sfq_slice,
161     .key = "netem_slice",
162     .type = CONFIG_TYPE_STRING,
163     .options = CONFIG_OPT_NONE,
164     .u = { .string = "ALL" },
165 };
166
167 static config_entry_t netem_loss = {
168     .next = &netem_slice,
169     .key = "netem_loss",
170     .type = CONFIG_TYPE_INT,
171     .options = CONFIG_OPT_NONE,
172     .u = { .value = 0 },
173 };
174
175 static config_entry_t netem_delay = {
176     .next = &netem_loss,
177     .key = "netem_delay",
178     .type = CONFIG_TYPE_INT,
179     .options = CONFIG_OPT_NONE,
180     .u = { .value = 0 },
181 };
182
183 static config_entry_t drl_configfile = {
184     .next = &netem_delay,
185     .key = "drl_configfile",
186     .type = CONFIG_TYPE_STRING,
187     .options = CONFIG_OPT_MANDATORY,
188     .u = { .string = "drl.xml" },
189 };
190
191 /** The administrative bandwidth limit (mbps) for the local node.  The node
192  * will not set a limit higher than this, even when distributed capacity is
193  * available.  Set to 0 for no limit. */
194 static config_entry_t nodelimit = {
195     .next = &drl_configfile,
196     .key = "nodelimit",
197     .type = CONFIG_TYPE_INT,
198     .options = CONFIG_OPT_MANDATORY,
199     .u = { .value = 0 },
200 };
201
202 /** Determines the verbosity of logging. */
203 static config_entry_t drl_loglevel = {
204     .next = &nodelimit,
205     .key = "drl_loglevel",
206     .type = CONFIG_TYPE_INT,
207     .options = CONFIG_OPT_MANDATORY,
208     .u = { .value = LOG_WARN },
209 };
210
211 /** The path of the logfile. */
212 static config_entry_t drl_logfile = {
213     .next = &drl_loglevel,
214     .key = "drl_logfile",
215     .type = CONFIG_TYPE_STRING,
216     .options = CONFIG_OPT_MANDATORY,
217     .u = { .string = "drl_logfile.log" },
218 };
219
220 /** The choice of DRL protocol. */
221 static config_entry_t policy = {
222     .next = &drl_logfile,
223     .key = "policy",
224     .type = CONFIG_TYPE_STRING,
225     .options = CONFIG_OPT_MANDATORY,
226     .u = { .string = "GRD" },
227 };
228
229 /** The estimate interval, in milliseconds. */
230 static config_entry_t estintms = {
231     .next = &policy,
232     .key = "estintms",
233     .type = CONFIG_TYPE_INT,
234     .options = CONFIG_OPT_MANDATORY,
235     .u = { .value = 100 },
236 };
237
238 #define config_entries (&estintms)
239
240 /*
241  * Debug functionality
242  */
243
244 #ifdef DMALLOC
245 #include <dmalloc.h>
246 #endif
247
248 #define NIPQUAD(addr) \
249     ((unsigned char *)&addr)[0], \
250     ((unsigned char *)&addr)[1], \
251     ((unsigned char *)&addr)[2], \
252     ((unsigned char *)&addr)[3]
253
254 #define IPQUAD(addr) \
255     ((unsigned char *)&addr)[3], \
256     ((unsigned char *)&addr)[2], \
257     ((unsigned char *)&addr)[1], \
258     ((unsigned char *)&addr)[0]
259
260
261
262 /* Salt for the hash functions */
263 static int salt;
264
265 /*
266  * Hash slice name lookups on context ID.
267  */
268
269 /* Special context IDs */
270 #define UNKNOWN_XID -1
271 #define ROOT_XID 0
272
273 enum {
274     CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
275     ICMP_ECHOREPLY_XID,
276     ICMP_UNREACH_XID,
277 };
278
279
280 /* globals */
281 pthread_t estimate_thread;
282 pthread_t signal_thread;
283 pthread_t comm_thread;
284 uint32_t local_ip = 0;
285 limiter_t limiter;
286 extern FILE *logfile;
287 extern uint8_t system_loglevel;
288 extern uint8_t do_enforcement;
289
290 /* Used to simulate partitions. */
291 int do_partition = 0;
292 int partition_set = 0xfffffff;
293
294 /* functions */
295
296 static inline uint32_t
297 hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max)
298 {
299     unsigned char mybytes[FLOWKEYSIZE];
300     mybytes[0] = protocol;
301     *(uint32_t*)(&(mybytes[1])) = src_ip;
302     *(uint32_t*)(&(mybytes[5])) = dst_ip;
303     *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port;
304     return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1);
305 }
306
307 uint32_t sampled_hasher(const key_flow *key) {
308     /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. */
309     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX);
310 }
311
312 uint32_t standard_hasher(const key_flow *key) {
313     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE);
314 }
315
316 uint32_t multiple_hasher(const key_flow *key) {
317     return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE);
318 }
319
320 struct intr_id {
321     char* name;
322     ulog_iret_t *res;
323 };
324
325 /* Interesting keys */
326 enum {
327     OOB_TIME_SEC = 0,
328     OOB_MARK,
329     IP_SADDR,
330     IP_DADDR,
331     IP_TOTLEN,
332     IP_PROTOCOL,
333     TCP_SPORT,
334     TCP_DPORT,
335     TCP_ACK,
336     TCP_FIN,
337     TCP_SYN,
338     TCP_RST,
339     UDP_SPORT,
340     UDP_DPORT,
341     ICMP_TYPE,
342     ICMP_CODE,
343     GRE_FLAG_KEY,
344     GRE_VERSION,
345     GRE_KEY,
346     PPTP_CALLID,
347 };
348
349 #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0]))
350 static struct intr_id intr_ids[] = {
351     [OOB_TIME_SEC] = { "oob.time.sec", 0 },
352     [OOB_MARK] = { "oob.mark", 0 },
353     [IP_SADDR] = { "ip.saddr", 0 },
354     [IP_DADDR] = { "ip.daddr", 0 },
355     [IP_TOTLEN] = { "ip.totlen", 0 },
356     [IP_PROTOCOL] = { "ip.protocol", 0 },
357     [TCP_SPORT] = { "tcp.sport", 0 },
358     [TCP_DPORT] { "tcp.dport", 0 },
359     [TCP_ACK] = { "tcp.ack", 0 },
360     [TCP_FIN] = { "tcp.fin", 0 },
361     [TCP_SYN] = { "tcp.syn", 0 },
362     [TCP_RST] = { "tcp.rst", 0 },
363     [UDP_SPORT] = { "udp.sport", 0 },
364     [UDP_DPORT] = { "udp.dport", 0 },
365     [ICMP_TYPE] = { "icmp.type", 0 },
366     [ICMP_CODE] = { "icmp.code", 0 },
367     [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
368     [GRE_VERSION] = { "gre.version", 0 },
369     [GRE_KEY] = { "gre.key", 0 },
370     [PPTP_CALLID] = { "pptp.callid", 0 },
371 };
372
373 #define GET_VALUE(x) intr_ids[x].res->value
374
375 #define DATE(t) ((t) / (24*60*60) * (24*60*60))
376
377 static int _output_drl(ulog_iret_t *res)
378 {
379     int xid;
380     uint32_t src_ip, dst_ip;
381     uint16_t src_port, dst_port;
382     uint8_t protocol;
383
384     key_flow key;
385     identity_t *ident;
386     leaf_t *leaf;
387
388     protocol = GET_VALUE(IP_PROTOCOL).ui8;
389     src_ip = GET_VALUE(IP_SADDR).ui32;
390     dst_ip = GET_VALUE(IP_DADDR).ui32;
391     xid = GET_VALUE(OOB_MARK).ui32;
392
393     switch (protocol) {
394             
395         case IPPROTO_TCP:
396             src_port = GET_VALUE(TCP_SPORT).ui16;
397             dst_port = GET_VALUE(TCP_DPORT).ui16;
398             break;
399
400         case IPPROTO_UDP:
401             /* netflow had an issue with many udp flows and set
402              * src_port=0 to handle it.  We don't. 
403              */
404             src_port = GET_VALUE(UDP_SPORT).ui16;
405
406             /*
407              * traceroutes create a large number of flows in the db
408              * this is a quick hack to catch the most common form
409              * of traceroute (basically we're mapping any UDP packet
410              * in the 33435-33524 range to the "trace" port, 33524 is
411              * 3 packets * nhops (30).
412              */
413             dst_port = GET_VALUE(UDP_DPORT).ui16;
414             if (dst_port >= 33435 && dst_port <= 33524)
415                 dst_port = 33435;
416             break;
417
418         case IPPROTO_ICMP:
419             src_port = GET_VALUE(ICMP_TYPE).ui8;
420             dst_port = GET_VALUE(ICMP_CODE).ui8;
421
422             /*
423              * We special case some of the ICMP traffic that the kernel
424              * always generates. Since this is attributed to root, it 
425              * creates significant "noise" in the output. We want to be
426              * able to quickly see that root is generating traffic.
427              */
428             if (xid == ROOT_XID) {
429                 if (src_port == ICMP_ECHOREPLY)
430                     xid = ICMP_ECHOREPLY_XID;
431                 else if (src_port == ICMP_UNREACH)
432                     xid = ICMP_UNREACH_XID;
433             }
434             break;
435
436         case IPPROTO_GRE:
437             if (GET_VALUE(GRE_FLAG_KEY).b) {
438                 if (GET_VALUE(GRE_VERSION).ui8 == 1) {
439                     /* Get PPTP call ID */
440                     src_port = GET_VALUE(PPTP_CALLID).ui16;
441                 } else {
442                     /* XXX Truncate GRE keys to 16 bits */
443                     src_port = (uint16_t) GET_VALUE(GRE_KEY).ui32;
444                 }
445             } else {
446                 /* No key available */
447                 src_port = 0;
448             }
449             dst_port = 0;
450             break;
451
452         default:
453             /* This is the default key for packets from unsupported protocols */
454             src_port = 0;
455             dst_port = 0;
456             break;
457     }
458
459     key.protocol = protocol;
460     key.source_ip = src_ip;
461     key.dest_ip = dst_ip;
462     key.source_port = src_port;
463     key.dest_port = dst_port;
464     key.packet_size = GET_VALUE(IP_TOTLEN).ui16;
465     key.packet_time = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
466
467     pthread_rwlock_rdlock(&limiter.limiter_lock); /* CLUNK! */
468
469     leaf = (leaf_t *) map_search(limiter.stable_instance.leaf_map, &xid, sizeof(xid));
470
471     /* Even if the packet doesn't match any specific xid, it should still
472      * count in the machine-type tables.  This catches root (xid == 0) and
473      * unclassified (xid = fff) packets, which don't have map entries. */
474     if (leaf == NULL) {
475         ident = limiter.stable_instance.last_machine;
476     } else {
477         ident = leaf->parent;
478     }
479
480     while (ident) {
481         pthread_mutex_lock(&ident->table_mutex);
482
483         /* Update the identity's table. */
484         ident->table_sample_function(ident->table, &key);
485
486 #ifdef SHADOW_ACCTING
487
488         /* Update the shadow perfect copy of the accounting table. */
489         standard_table_sample((standard_flow_table) ident->shadow_table, &key);
490
491 #endif
492
493         pthread_mutex_unlock(&ident->table_mutex);
494
495         ident = ident->parent;
496     }
497
498     pthread_rwlock_unlock(&limiter.limiter_lock); /* CLINK! */
499
500     return 0;
501 }
502
503 /* get all key id's for the keys we are intrested in */
504 static int get_ids(void)
505 {
506     int i;
507     struct intr_id *cur_id;
508
509     for (i = 0; i < INTR_IDS; i++) {
510         cur_id = &intr_ids[i];
511         cur_id->res = keyh_getres(keyh_getid(cur_id->name));
512         if (!cur_id->res) {
513             ulogd_log(ULOGD_ERROR, 
514                     "Cannot resolve keyhash id for %s\n", 
515                     cur_id->name);
516             return 1;
517         }
518     }
519     return 0;
520 }
521
522 static void free_identity(identity_t *ident) {
523     if (ident) {
524         free_comm(&ident->comm);
525
526         if (ident->table) {
527             ident->table_destroy_function(ident->table);
528         }
529
530         if (ident->loop_action) {
531             ident->loop_action->valid = 0;
532         }
533
534         if (ident->comm_action) {
535             ident->comm_action->valid = 0;
536         }
537
538         pthread_mutex_destroy(&ident->table_mutex);
539
540         free(ident);
541     }
542 }
543
544 static void free_identity_map(map_handle map) {
545     identity_t *tofree = NULL;
546
547     map_reset_iterate(map);
548     while ((tofree = (identity_t *) map_next(map))) {
549         free_identity(tofree);
550     }
551
552     free_map(map, 0);
553 }
554
555 static void free_instance(drl_instance_t *instance) {
556     if (instance->leaves)
557         free(instance->leaves);
558     if (instance->leaf_map)
559         free_map(instance->leaf_map, 0);
560     if (instance->ident_map)
561         free_identity_map(instance->ident_map);
562     if (instance->machines)
563         free(instance->machines);
564     if (instance->sets)
565         free(instance->sets);
566
567     /* FIXME: Drain the calendar first and free all the entries. */
568     if (instance->cal) {
569         free(instance->cal);
570     }
571
572     memset(instance, 0, sizeof(drl_instance_t));
573 }
574
575 static void free_failed_config(parsed_configs configs, drl_instance_t *instance) {
576     /* Free configs. */
577     if (configs.machines)
578         free_ident_list(configs.machines);
579     if (configs.sets)
580         free_ident_list(configs.sets);
581
582     /* Free instance. */
583     if (instance)
584         free_instance(instance);
585 }
586
587 static identity_t *new_identity(ident_config *config) {
588     identity_t *ident = malloc(sizeof(identity_t));
589     remote_node_t *comm_nodes = malloc(sizeof(remote_node_t)*config->peer_count);
590     ident_peer *peer = config->peers;
591     int peer_slot = 0;
592
593     if (ident == NULL) {
594         return NULL;
595     }
596
597     if (comm_nodes == NULL) {
598         free(ident);
599         return NULL;
600     }
601
602     memset(ident, 0, sizeof(identity_t));
603     memset(comm_nodes, 0, config->peer_count * sizeof(remote_node_t));
604
605     ident->id = config->id;
606     ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0);
607     ident->fixed_ewma_weight = config->fixed_ewma_weight;
608     ident->communication_intervals = config->communication_intervals;
609     ident->mainloop_intervals = config->mainloop_intervals;
610     ident->ewma_weight = pow(ident->fixed_ewma_weight, 
611                              (limiter.estintms/1000.0) * config->mainloop_intervals);
612     ident->parent = NULL;
613     ident->independent = config->independent;
614
615     pthread_mutex_init(&ident->table_mutex, NULL);
616     switch (config->accounting) {
617         case ACT_STANDARD:
618             ident->table =
619                 standard_table_create(standard_hasher, &ident->common);
620
621             /* Ugly function pointer casting.  Makes things sufficiently
622              * generic, though. */
623             ident->table_sample_function =
624                 (int (*)(void *, const key_flow *)) standard_table_sample;
625             ident->table_cleanup_function =
626                 (int (*)(void *)) standard_table_cleanup;
627             ident->table_update_function =
628                 (void (*)(void *, struct timeval, double)) standard_table_update_flows;
629             ident->table_destroy_function =
630                 (void (*)(void *)) standard_table_destroy;
631             break;
632
633         case ACT_MULTIPLE:
634             ident->table =
635                 multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common);
636
637             ident->table_sample_function =
638                 (int (*)(void *, const key_flow *)) multiple_table_sample;
639             ident->table_cleanup_function =
640                 (int (*)(void *)) multiple_table_cleanup;
641             ident->table_update_function =
642                 (void (*)(void *, struct timeval, double)) multiple_table_update_flows;
643             ident->table_destroy_function =
644                 (void (*)(void *)) multiple_table_destroy;
645             break;
646
647         case ACT_SAMPLEHOLD:
648             ident->table = sampled_table_create(sampled_hasher,
649                     ident->limit * IDENT_CLEAN_INTERVAL,
650                     SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common);
651
652             ident->table_sample_function =
653                 (int (*)(void *, const key_flow *)) sampled_table_sample;
654             ident->table_cleanup_function =
655                 (int (*)(void *)) sampled_table_cleanup;
656             ident->table_update_function =
657                 (void (*)(void *, struct timeval, double)) sampled_table_update_flows;
658             ident->table_destroy_function =
659                 (void (*)(void *)) sampled_table_destroy;
660             break;
661
662         case ACT_SIMPLE:
663             ident->table = simple_table_create(&ident->common);
664
665             ident->table_sample_function =
666                 (int (*)(void *, const key_flow *)) simple_table_sample;
667             ident->table_cleanup_function =
668                 (int (*)(void *)) simple_table_cleanup;
669             ident->table_update_function =
670                 (void (*)(void *, struct timeval, double)) simple_table_update_flows;
671             ident->table_destroy_function =
672                 (void (*)(void *)) simple_table_destroy;
673             break;
674     }
675
676 #ifdef SHADOW_ACCTING
677
678     ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common);
679
680     if (ident->shadow_table == NULL) {
681         ident->table_destroy_function(ident->table);
682         free(ident);
683         return NULL;
684     }
685
686 #endif
687
688     /* Make sure the table was allocated. */
689     if (ident->table == NULL) {
690         free(ident);
691         return NULL;
692     }
693
694     while (peer) {
695         comm_nodes[peer_slot].addr = peer->ip;
696         comm_nodes[peer_slot].port = htons(LIMITER_LISTEN_PORT);
697         peer = peer->next;
698         peer_slot += 1;
699     }
700
701     if (new_comm(&ident->comm, config, comm_nodes)) {
702         printlog(LOG_CRITICAL, "Failed to create communication structure.\n");
703         return NULL;
704     }
705
706     ident->comm.remote_nodes = comm_nodes;
707
708     if (!create_htb.u.value) {
709         ident->htb_node = config->htb_node;
710         ident->htb_parent = config->htb_parent;
711     }
712
713     return ident;
714 }
715
716 static int validate_htb_exists(int node, int parent) {
717     FILE *pipe = popen("/sbin/tc class show dev eth0", "r");
718     char line[200];
719
720     while (fgets(line, 200, pipe) != NULL) {
721         int n, p;
722         char ignore[200];
723
724         sscanf(line, "class htb 1:%x parent 1:%x prio %s", &n, &p, ignore);
725         if (n == node && p == parent) {
726             pclose(pipe);
727             return 0;
728         }
729     }
730
731     pclose(pipe);
732     return 1;
733 }
734
735 /* Determines the validity of the parameters of one ident_config.
736  *
737  * 0 valid
738  * 1 invalid
739  */
740 static int validate_config(ident_config *config) {
741     /* Limit must be a positive integer. */
742     if (config->limit < 1) {
743         return 1;
744     }
745
746     /* Commfabric must be a valid choice (COMM_MESH or COMM_GOSSIP). */
747     if (config->commfabric != COMM_MESH &&
748             config->commfabric != COMM_GOSSIP) {
749         return 1;
750     }
751
752     /* If commfabric is COMM_GOSSIP, this must be a positive integer. */
753     if (config->commfabric == COMM_GOSSIP && config->branch < 1) {
754         return 1;
755     }
756
757     /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD,
758      * ACT_SIMPLE, ACT_MULTIPLE). */
759     if (config->accounting != ACT_STANDARD &&
760             config->accounting != ACT_SAMPLEHOLD &&
761             config->accounting != ACT_SIMPLE &&
762             config->accounting != ACT_MULTIPLE) {
763         return 1;
764     }
765
766     /* Ewma weight must be greater than or equal to zero. */
767     if (config->fixed_ewma_weight < 0) {
768         return 1;
769     }
770
771     if (!create_htb.u.value) {
772         if (config->htb_node < 0 || config->htb_parent < 0) {
773             printlog(LOG_CRITICAL, "When create_htb is disabled in ulogd.conf, an identity must specify the htb_node and htb_parent propertities in its configuration.\n");
774             return 1;
775         }
776
777         if (validate_htb_exists(config->htb_node, config->htb_parent)) {
778             printlog(LOG_CRITICAL, "Identity specified htb node %x with parent %x.  No such node/parent combo seems to exist!\n", config->htb_node, config->htb_parent);
779             return 1;
780         }
781     } else {
782         if (config->htb_node > -1 || config->htb_parent > -1) {
783             printlog(LOG_WARN, "htb_node or htb_parent are configured but ignored because we're configured to create our own htb hierarchy.\n");
784         }
785     }
786
787     /* Note: Parsing stage requires that each ident has at least one peer. */
788     return 0;
789 }
790
791 /* 0 success
792  * non-zero failure
793  */
794 static int validate_configs(parsed_configs configs, drl_instance_t *instance) {
795
796     ident_config *mlist = configs.machines;
797     ident_config *slist = configs.sets;
798     ident_config *tmp = NULL;
799     int i = 0;
800
801     while (mlist) {
802         /* ID must be non-zero and unique. */
803         /* This is ugly and hackish, but this function will be called rarely.
804          * I'm tired of trying to be clever. */
805         if (mlist->id < 0) {
806             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", mlist->id, mlist->id);
807             return EINVAL;
808         }
809         tmp = mlist->next;
810         while (tmp) {
811             if (mlist->id == tmp->id) {
812                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
813                 return EINVAL;
814             }
815             tmp = tmp->next;
816         }
817         tmp = configs.sets;
818         while (tmp) {
819             if (mlist->id == tmp->id) {
820                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", mlist->id, mlist->id);
821                 return EINVAL;
822             }
823             tmp = tmp->next;
824         }
825
826         if (validate_config(mlist)) {
827             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", mlist->id, mlist->id);
828             return EINVAL;
829         }
830
831         if (mlist->independent) {
832             printlog(LOG_CRITICAL, "Makes no sense to have independent machine node - setting independent to false.\n");
833             mlist->independent = 0;
834         }
835
836         mlist = mlist->next;
837     }
838
839     instance->sets = malloc(configs.set_count * sizeof(identity_t *));
840     if (instance->sets == NULL) {
841         printlog(LOG_CRITICAL, "Not enough memory to allocate set identity collection.\n");
842         return ENOMEM;
843     }
844
845     memset(instance->sets, 0, configs.set_count * sizeof(identity_t *));
846     instance->set_count = configs.set_count;
847
848     /* For sets, make sure that the hierarchy is valid. */
849     while (slist) {
850         ident_member *members = slist->members;
851
852         /* ID must be non-zero and unique. */
853         if (slist->id < 0) {
854             printlog(LOG_CRITICAL, "Negative ident id: %d (%x) ?\n", slist->id, slist->id);
855             return EINVAL;
856         }
857         tmp = slist->next;
858         while (tmp) {
859             if (slist->id == tmp->id) {
860                 printlog(LOG_CRITICAL, "Duplicate ident id: %d (%x)\n", slist->id, slist->id);
861                 return EINVAL;
862             }
863             tmp = tmp->next;
864         }
865
866         if (validate_config(slist)) {
867             printlog(LOG_CRITICAL, "Invalid ident parameters for id: %d (%x)\n", slist->id, slist->id);
868             return EINVAL;
869         }
870
871         /* Allocate an identity_t for this set-type identity. */
872         instance->sets[i] = new_identity(slist);
873
874         if (instance->sets[i] == NULL) {
875             printlog(LOG_CRITICAL, "Not enough memory to allocate set identity.\n");
876             return ENOMEM;
877         }
878
879         /* Loop through children and look up each in leaf or ident map
880          * depending on the type of child.  Set the child's parent pointer
881          * to the identity we just created above, unless it is already set,
882          * in which case we have an error. */
883         while (members) {
884             identity_t *child_ident = NULL;
885             leaf_t *child_leaf = NULL;
886
887             switch (members->type) {
888                 case MEMBER_XID:
889                     child_leaf = map_search(instance->leaf_map, &members->value,
890                                             sizeof(members->value));
891                     if (child_leaf == NULL) {
892                         printlog(LOG_CRITICAL, "xid: child leaf not found.\n");
893                         return EINVAL;
894                     }
895                     if (child_leaf->parent != NULL) {
896                         /* Error - This leaf already has a parent. */
897                         printlog(LOG_CRITICAL, "xid: child already has a parent.\n");
898                         return EINVAL;
899                     }
900                     child_leaf->parent = instance->sets[i];
901                     break;
902                 case MEMBER_GUID:
903                     child_ident = map_search(instance->ident_map, &members->value,
904                                              sizeof(members->value));
905                     if (child_ident == NULL) {
906                         printlog(LOG_CRITICAL, "guid: child identity not found.\n");
907                         return EINVAL;
908                     }
909                     if (child_ident->parent != NULL) {
910                         /* Error - This identity already has a parent. */
911                         printlog(LOG_CRITICAL, "guid: child identity already has a parent.\n");
912                         return EINVAL;
913                     }
914                     child_ident->parent = instance->sets[i];
915                     break;
916                 default:
917                     /* Error - shouldn't be possible. */
918                     break;
919             }
920             members = members->next;
921         }
922
923         map_insert(instance->ident_map, &instance->sets[i]->id,
924                    sizeof(instance->sets[i]->id), instance->sets[i]);
925
926         slist = slist->next;
927         i++;
928     }
929     return 0;
930 }
931
932 static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) {
933     int count = 0;
934     identity_t *current_ident;
935     leaf_t *current_leaf;
936     leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *));
937     if (leaves == NULL) {
938         return 1;
939     }
940
941     map_reset_iterate(instance->leaf_map);
942     while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) {
943         current_ident = current_leaf->parent;
944         while (current_ident != NULL && current_ident != instance->last_machine) {
945             if (current_ident == ident) {
946                 /* Found the ident we were looking for - add the leaf. */
947                 leaves[count] = current_leaf;
948                 count += 1;
949                 break;
950             }
951             current_ident = current_ident->parent;
952         }
953     }
954
955     ident->leaves = leaves;
956     ident->leaf_count = count;
957
958     return 0;
959 }
960
961 static int init_identities(parsed_configs configs, drl_instance_t *instance) {
962     int i, j;
963     ident_config *config = configs.machines;
964     leaf_t *leaf = NULL;
965
966     instance->cal = malloc(sizeof(struct ident_calendar) * SCHEDLEN);
967
968     if (instance->cal == NULL) {
969         return ENOMEM;
970     }
971
972     for (i = 0; i < SCHEDLEN; ++i) {
973         TAILQ_INIT(instance->cal + i);
974     }
975     instance->cal_slot = 0;
976
977     instance->machines = malloc(configs.machine_count * sizeof(drl_instance_t *));
978
979     if (instance->machines == NULL) {
980         return ENOMEM;
981     }
982
983     memset(instance->machines, 0, configs.machine_count * sizeof(drl_instance_t *));
984     instance->machine_count = configs.machine_count;
985
986     /* Allocate and add the machine identities. */
987     for (i = 0; i < configs.machine_count; ++i) {
988         identity_action *loop_action;
989         identity_action *comm_action;
990         instance->machines[i] = new_identity(config);
991
992         if (instance->machines[i] == NULL) {
993             return ENOMEM;
994         }
995
996         loop_action = malloc(sizeof(identity_action));
997         comm_action = malloc(sizeof(identity_action));
998
999         if (loop_action == NULL || comm_action == NULL) {
1000             return ENOMEM;
1001         }
1002
1003         /* The first has no parent - it is the root.  All others have the
1004          * previous ident as their parent. */
1005         if (i == 0) {
1006             instance->machines[i]->parent = NULL;
1007         } else {
1008             instance->machines[i]->parent = instance->machines[i - 1];
1009         }
1010
1011         instance->last_machine = instance->machines[i];
1012
1013         /* Add the ident to the guid->ident map. */
1014         map_insert(instance->ident_map, &instance->machines[i]->id,
1015                    sizeof(instance->machines[i]->id), instance->machines[i]);
1016
1017         config = config->next;
1018
1019         memset(loop_action, 0, sizeof(identity_action));
1020         memset(comm_action, 0, sizeof(identity_action));
1021         loop_action->ident = instance->machines[i];
1022         loop_action->action = ACTION_MAINLOOP;
1023         loop_action->valid = 1;
1024         comm_action->ident = instance->machines[i];
1025         comm_action->action = ACTION_COMMUNICATE;
1026         comm_action->valid = 1;
1027
1028         instance->machines[i]->loop_action = loop_action;
1029         instance->machines[i]->comm_action = comm_action;
1030
1031         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1032                           loop_action, calendar);
1033
1034         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1035                           comm_action, calendar);
1036
1037         /* Setup the array of pointers to leaves.  This is easy for machines
1038          * because a machine node applies to every leaf. */
1039         instance->machines[i]->leaves =
1040             malloc(instance->leaf_count * sizeof(leaf_t *));
1041         if (instance->machines[i]->leaves == NULL) {
1042             return ENOMEM;
1043         }
1044         instance->machines[i]->leaf_count = instance->leaf_count;
1045         for (j = 0; j < instance->leaf_count; ++j) {
1046             instance->machines[i]->leaves[j] = &instance->leaves[j];
1047         }
1048     }
1049
1050     /* Connect the set subtree to the machines. Any set or leaf without a
1051      * parent will take the last machine as its parent. */
1052
1053     /* Leaves... */
1054     map_reset_iterate(instance->leaf_map);
1055     while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
1056         if (leaf->parent == NULL) {
1057             leaf->parent = instance->last_machine;
1058         }
1059     }
1060
1061     /* Sets... */
1062     for (i = 0; i < instance->set_count; ++i) {
1063         identity_action *loop_action;
1064         identity_action *comm_action;
1065
1066         if (instance->sets[i]->parent == NULL && instance->sets[i]->independent == 0) {
1067             instance->sets[i]->parent = instance->last_machine;
1068         }
1069
1070         loop_action = malloc(sizeof(identity_action));
1071         comm_action = malloc(sizeof(identity_action));
1072
1073         if (loop_action == NULL || comm_action == NULL) {
1074             return ENOMEM;
1075         }
1076
1077         memset(loop_action, 0, sizeof(identity_action));
1078         memset(comm_action, 0, sizeof(identity_action));
1079         loop_action->ident = instance->sets[i];
1080         loop_action->action = ACTION_MAINLOOP;
1081         loop_action->valid = 1;
1082         comm_action->ident = instance->sets[i];
1083         comm_action->action = ACTION_COMMUNICATE;
1084         comm_action->valid = 1;
1085
1086         instance->sets[i]->loop_action = loop_action;
1087         instance->sets[i]->comm_action = comm_action;
1088
1089         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1090                           loop_action, calendar);
1091
1092         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
1093                           comm_action, calendar);
1094
1095         /* Setup the array of pointers to leaves.  This is harder for sets,
1096          * but this doesn't need to be super-efficient because it happens
1097          * rarely and it isn't on the critical path for reconfig(). */
1098         if (fill_set_leaf_pointer(instance, instance->sets[i])) {
1099             return ENOMEM;
1100         }
1101     }
1102
1103     /* Success. */
1104     return 0;
1105 }
1106
1107 static void print_instance(drl_instance_t *instance) {
1108     leaf_t *leaf = NULL;
1109     identity_t *ident = NULL;
1110
1111     if (system_loglevel == LOG_DEBUG) {
1112         map_reset_iterate(instance->leaf_map);
1113         while ((leaf = (leaf_t *) map_next(instance->leaf_map))) {
1114             printf("%x:", leaf->xid);
1115             ident = leaf->parent;
1116             while (ident) {
1117                 printf("%d:",ident->id);
1118                 ident = ident->parent;
1119             }
1120             printf("Leaf's parent pointer is %p\n", leaf->parent);
1121         }
1122
1123         printf("instance->last_machine is %p\n", instance->last_machine);
1124     }
1125 }
1126
1127 static int assign_htb_hierarchy(drl_instance_t *instance) {
1128     int i, j;
1129     int next_node = 0x100;
1130
1131     /* If we're not going to create our own htb hierarchy (for instance,
1132      * if we're going to let PL's node manager do it for us), then we don't
1133      * want this function to do anything. */
1134     if (!create_htb.u.value) {
1135         printlog(LOG_DEBUG, "Skipping assign_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n");
1136         return 0;
1137     }
1138
1139     /* Chain machine nodes under 1:10. */
1140     for (i = 0; i < instance->machine_count; ++i) {
1141         if (instance->machines[i]->parent == NULL) {
1142             /* Top node. */
1143             instance->machines[i]->htb_parent = 0x10;
1144         } else {
1145             /* Pointerific! */
1146             instance->machines[i]->htb_parent =
1147                 instance->machines[i]->parent->htb_node;
1148         }
1149
1150         instance->machines[i]->htb_node = next_node;
1151         next_node += 1;
1152     }
1153
1154     next_node += 0x10;
1155
1156     /* Add set nodes under machine nodes. Iterate backwards to ensure parent is
1157      * already there. */
1158     for (j = (instance->set_count - 1); j >= 0; --j) {
1159         if (instance->sets[j]->parent == NULL) {
1160             /* Independent node - goes under 0x10 away from machine nodes. */
1161             instance->sets[j]->htb_parent = 0x10;
1162         } else {
1163             instance->sets[j]->htb_parent = instance->sets[j]->parent->htb_node;
1164         }
1165         instance->sets[j]->htb_node = next_node;
1166
1167         next_node += 1;
1168     }
1169
1170     return 0;
1171 }
1172
1173 /* Added this so that I could comment one line and kill off all of the
1174  * command execution. */
1175 static inline int execute_cmd(const char *cmd) {
1176     return system(cmd);
1177 }
1178
1179 static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor,
1180                                const uint32_t classid_major, const uint32_t classid_minor,
1181                                const uint64_t rate, const uint64_t ceil) {
1182     char cmd[300];
1183
1184     sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit",
1185             iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil);
1186     printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd);
1187
1188     return execute_cmd(cmd);
1189 }
1190
1191 static inline int add_htb_netem(const char *iface, const uint32_t parent_major,
1192                                 const uint32_t parent_minor, const uint32_t handle,
1193                                 const int loss, const int delay) {
1194     char cmd[300];
1195
1196     sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
1197             parent_minor, handle);
1198     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1199     if (execute_cmd(cmd))
1200         printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n");
1201
1202     sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms",
1203             iface, parent_major, parent_minor, handle, loss, delay);
1204     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1205     return execute_cmd(cmd);
1206 }
1207
1208 static inline int add_htb_sfq(const char *iface, const uint32_t parent_major,
1209                                 const uint32_t parent_minor, const uint32_t handle,
1210                                 const int perturb) {
1211     char cmd[300];
1212
1213     sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
1214             parent_minor, handle);
1215     printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
1216     if (execute_cmd(cmd))
1217         printlog(LOG_WARN, "HTB_cmd: Previous deletion did not succeed.\n");
1218
1219     sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x sfq perturb %d",
1220             iface, parent_major, parent_minor, handle, perturb);
1221     printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
1222     return execute_cmd(cmd);
1223 }
1224
1225 static int create_htb_hierarchy(drl_instance_t *instance) {
1226     char cmd[300];
1227     int i, j, k;
1228     uint64_t gigabit = 1024 * 1024 * 1024;
1229
1230     /* If we're not going to create our own htb hierarchy (for instance,
1231      * if we're going to let PL's node manager do it for us), then we don't
1232      * want this function to do anything. */
1233     if (!create_htb.u.value) {
1234         printlog(LOG_DEBUG, "Skipping create_htb_hierarchy becase ulogd.conf's create_htb set to 0.\n");
1235         return 0;
1236     }
1237
1238     /* Nuke the hierarchy. */
1239     sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb");
1240     execute_cmd(cmd);
1241     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1242
1243     /* Re-initialize the basics. */
1244     sprintf(cmd, "tc qdisc add dev eth0 root handle 1: htb default 1fff");
1245     if (execute_cmd(cmd)) {
1246         return 1;
1247     }
1248     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1249
1250     if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit))
1251         return 1;
1252
1253     /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/
1254     if (limiter.nodelimit) {
1255         if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024))
1256             return 1;
1257     } else {
1258         if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit))
1259             return 1;
1260     }
1261
1262     /* Add machines. */
1263     for (i = 0; i < instance->machine_count; ++i) {
1264         if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1,
1265                          instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) {
1266             return 1;
1267         }
1268     }
1269
1270 #define LIMITEXEMPT
1271
1272     /* Add back 1:20. */
1273 #ifdef LIMITEXEMPT
1274     if (instance->last_machine == NULL) {
1275         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1276     } else {
1277         sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:20 htb rate 8bit ceil 1000mbit",
1278             instance->last_machine->htb_node);
1279     }
1280 #else
1281     sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:20 htb rate 8bit ceil 1000mbit");
1282 #endif
1283
1284     if (execute_cmd(cmd)) {
1285         return 1;
1286     }
1287     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
1288
1289     /* Add sets. */
1290     for (j = (instance->set_count - 1); j >= 0; --j) {
1291         if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1,
1292                          instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) {
1293             return 1;
1294         }
1295     }
1296
1297     /* Add leaves. FIXME: Set static sliver limit as ceil here! */
1298     for (k = 0; k < instance->leaf_count; ++k) {
1299         if (instance->leaves[k].parent == NULL) {
1300             if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1301                 return 1;
1302         } else {
1303             if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
1304                 return 1;
1305         }
1306
1307         /* Add exempt node for the leaf under 1:20 as 1:2<xid> */
1308         if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit))
1309             return 1;
1310     }
1311
1312     /* Add 1:1000 and 1:2000 */
1313     if (instance->last_machine == NULL) {
1314         if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit))
1315             return 1;
1316     } else {
1317         if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit))
1318             return 1;
1319     }
1320
1321     if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit))
1322         return 1;
1323
1324     /* Add 1:1fff and 1:2fff */
1325     if (instance->last_machine == NULL) {
1326         if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit))
1327             return 1;
1328     } else {
1329         if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit))
1330             return 1;
1331     }
1332
1333     if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit))
1334         return 1;
1335
1336     /* Artifical delay or loss for experimentation. */
1337     if (netem_delay.u.value || netem_loss.u.value) {
1338         if (!strcmp(netem_slice.u.string, "ALL")) {
1339             /* By default, netem applies to all leaves. */
1340             if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value))
1341                 return 1;
1342             if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value))
1343                 return 1;
1344
1345             for (k = 0; k < instance->leaf_count; ++k) {
1346                 if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid),
1347                             (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) {
1348                     return 1;
1349                 }
1350
1351                 //FIXME: add exempt delay/loss here on 0x2000 ... ?
1352             }
1353         } else {
1354             /* netem_slice is not the default ALL value.  Only apply netem
1355              * to the slice that is set in netem_slice.u.string. */
1356             uint32_t slice_xid;
1357
1358             sscanf(netem_slice.u.string, "%x", &slice_xid);
1359
1360             if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value))
1361                 return 1;
1362         }
1363     }
1364
1365     /* Turn on SFQ for experimentation. */
1366     if (strcmp(sfq_slice.u.string, "NONE")) {
1367         if (!strcmp(sfq_slice.u.string, "ALL")) {
1368             if (add_htb_sfq("eth0", 1, 0x1000, 0x1000, 30))
1369                 return 1;
1370             if (add_htb_sfq("eth0", 1, 0x1fff, 0x1fff, 30))
1371                 return 1;
1372
1373             for (k = 0; k < instance->leaf_count; ++k) {
1374                 if (add_htb_sfq("eth0", 1, (0x1000 | instance->leaves[k].xid),
1375                             (0x1000 | instance->leaves[k].xid), 30)) {
1376                     return 1;
1377                 }
1378             }
1379         } else {
1380             uint32_t slice_xid;
1381
1382             sscanf(sfq_slice.u.string, "%x", &slice_xid);
1383
1384             if (add_htb_sfq("eth0", 1, slice_xid, slice_xid, 30))
1385                 return 1;
1386         }
1387     }
1388
1389     return 0;
1390 }
1391
1392 static int setup_tc_grd(drl_instance_t *instance) {
1393     int i, j;
1394     char cmd[300];
1395
1396     for (i = 0; i < instance->leaf_count; ++i) {
1397         /* Delete the old pfifo qdisc that might have been there before. */
1398         sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo",
1399                 instance->leaves[i].xid, instance->leaves[i].xid);
1400
1401         if (execute_cmd(cmd)) {
1402             printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1403         }
1404
1405         /* Add the netem qdisc. */
1406         sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
1407                 instance->leaves[i].xid, instance->leaves[i].xid);
1408
1409         if (execute_cmd(cmd)) {
1410             printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1411             return 1;
1412         }
1413     }
1414
1415     /* Do the same for 1000 and 1fff. */
1416     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
1417
1418     if (execute_cmd(cmd)) {
1419         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1420     }
1421
1422     /* Add the netem qdisc. */
1423     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
1424
1425     if (execute_cmd(cmd)) {
1426         printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1427         return 1;
1428     }
1429
1430     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
1431
1432     if (execute_cmd(cmd)) {
1433         printlog(LOG_DEBUG, "GRD: pfifo qdisc wasn't there!\n");
1434     }
1435
1436     /* Add the netem qdisc. */
1437     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
1438
1439     if (execute_cmd(cmd)) {
1440         printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1441         return 1;
1442     }
1443
1444     /* Artifical delay or loss for experimentation. */
1445     if (netem_delay.u.value || netem_loss.u.value) {
1446         if (!strcmp(netem_slice.u.string, "ALL")) {
1447             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1000 handle 1000 netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
1448             if (execute_cmd(cmd)) {
1449                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1450                 return 1;
1451             }
1452
1453             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1fff handle 1fff netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
1454             if (execute_cmd(cmd)) {
1455                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1456                 return 1;
1457             }
1458
1459             for (j = 0; j < instance->leaf_count; ++j) {
1460                 leaf_t *current = &instance->leaves[j];
1461
1462                 current->delay = netem_delay.u.value;
1463
1464                 sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", current->xid, current->xid, netem_loss.u.value, netem_delay.u.value);
1465
1466                 if (execute_cmd(cmd)) {
1467                     printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1468                     return 1;
1469                 }
1470             }
1471         } else {
1472             uint32_t slice_xid;
1473             leaf_t *leaf = NULL;
1474
1475             sscanf(netem_slice.u.string, "%x", &slice_xid);
1476
1477             leaf = (leaf_t *) map_search(instance->leaf_map, &slice_xid, sizeof(slice_xid));
1478
1479             if (leaf == NULL) {
1480                 /* Leaf not found - invalid selection. */
1481                 printf("Your experimental setup is incorrect...\n");
1482                 return 1;
1483             }
1484
1485             leaf->delay = netem_delay.u.value;
1486
1487             sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value);
1488
1489             if (execute_cmd(cmd)) {
1490                 printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
1491                 return 1;
1492             }
1493         }
1494     }
1495
1496     return 0;
1497 }
1498
1499 /* init_drl 
1500  * 
1501  * Initialize this limiter with options 
1502  * Open UDP socket for peer communication
1503  */
1504 static int init_drl(void) {
1505     parsed_configs configs;
1506     struct sockaddr_in server_address;
1507     
1508     memset(&limiter, 0, sizeof(limiter_t));
1509
1510     /* Setup logging. */
1511     system_loglevel = (uint8_t) drl_loglevel.u.value;
1512     logfile = fopen(drl_logfile.u.string, "w");
1513
1514     if (logfile == NULL) {
1515         printf("Couldn't open logfile - ");
1516         perror("fopen()");
1517         exit(EXIT_FAILURE);
1518     }
1519
1520     printlog(LOG_CRITICAL, "ulogd_DRL initializing . . .\n");
1521
1522     limiter.nodelimit = (uint32_t) (((double) nodelimit.u.value * 1000000.0) / 8.0);
1523
1524     init_hashing();  /* for all hash maps */
1525
1526     pthread_rwlock_init(&limiter.limiter_lock,NULL);
1527
1528     /* determine our local IP by iterating through interfaces */
1529     if ((limiter.ip = get_local_ip())==0) {
1530         printlog(LOG_CRITICAL,
1531                  "ulogd_DRL unable to aquire local IP address, not registering.\n");
1532         return (false);
1533     }
1534     limiter.localaddr = inet_addr(limiter.ip);
1535     limiter.port = htons(LIMITER_LISTEN_PORT);
1536     limiter.udp_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
1537     if (limiter.udp_socket < 0) {
1538         printlog(LOG_CRITICAL, "Failed to create UDP socket().\n");
1539         return false;
1540     }
1541
1542     memset(&server_address, 0, sizeof(server_address));
1543     server_address.sin_family = AF_INET;
1544     server_address.sin_addr.s_addr = limiter.localaddr;
1545     server_address.sin_port = limiter.port;
1546
1547     if (bind(limiter.udp_socket, (struct sockaddr *) &server_address, sizeof(server_address)) < 0) {
1548         printlog(LOG_CRITICAL, "Failed to bind UDP socket.\n");
1549         return false;
1550     }
1551
1552     printlog(LOG_WARN, "     POLICY: %s\n",policy.u.string);
1553     if (strcasecmp(policy.u.string,"GRD") == 0) {
1554         limiter.policy = POLICY_GRD;
1555     } else if (strcasecmp(policy.u.string,"FPS") == 0) {
1556         limiter.policy = POLICY_FPS;
1557     } else {
1558         printlog(LOG_CRITICAL,
1559                  "Unknown DRL policy %s, aborting.\n",policy.u.string);
1560         return (false);
1561     }
1562
1563     limiter.estintms = estintms.u.value;
1564     if (limiter.estintms > 1000) {
1565         printlog(LOG_CRITICAL,
1566                  "DRL: sorry estimate intervals must be less than 1 second.");
1567         printlog(LOG_CRITICAL,
1568                  "  Simple source mods will allow larger intervals.  Using 1 second.\n");
1569         limiter.estintms = 1000;
1570     }
1571     printlog(LOG_WARN, "     Est interval: %dms\n",limiter.estintms);
1572     
1573     /* Acquire the big limiter lock for writing.  Prevents pretty much
1574      * anything else from happening while the hierarchy is being changed. */
1575     pthread_rwlock_wrlock(&limiter.limiter_lock);
1576
1577     limiter.stable_instance.ident_map = allocate_map();
1578     if (limiter.stable_instance.ident_map == NULL) {
1579         printlog(LOG_CRITICAL, "Failed to allocate memory for identity map.\n");
1580         return false;
1581     }
1582
1583     if (get_eligible_leaves(&limiter.stable_instance)) {
1584         printlog(LOG_CRITICAL, "Failed to read eligigle leaves.\n");
1585         return false;
1586     }
1587
1588     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1589         /* Parse error occured. Return non-zero to notify init_drl(). */
1590         printlog(LOG_CRITICAL, "Failed to parse the DRL configuration file (%s).\n",
1591             drl_configfile.u.string);
1592         return false;
1593     }
1594
1595     /* Validate identity hierarchy! */
1596     if (validate_configs(configs, &limiter.stable_instance)) {
1597         /* Clean up everything. */
1598         free_failed_config(configs, &limiter.stable_instance);
1599         printlog(LOG_CRITICAL, "Invalid DRL configuration file (%s).\n",
1600             drl_configfile.u.string);
1601         return false;
1602     }
1603
1604     if (init_identities(configs, &limiter.stable_instance)) {
1605         free_failed_config(configs, &limiter.stable_instance);
1606         printlog(LOG_CRITICAL, "Failed to initialize identities.\n");
1607         return false;
1608     }
1609
1610     /* At this point, we should be done with configs. */
1611     free_ident_list(configs.machines);
1612     free_ident_list(configs.sets);
1613
1614     print_instance(&limiter.stable_instance);
1615
1616     switch (limiter.policy) {
1617         case POLICY_FPS:
1618             if (assign_htb_hierarchy(&limiter.stable_instance)) {
1619                 free_instance(&limiter.stable_instance);
1620                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy.\n");
1621                 return false;
1622             }
1623
1624             if (create_htb_hierarchy(&limiter.stable_instance)) {
1625                 free_instance(&limiter.stable_instance);
1626                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy.\n");
1627                 return false;
1628             }
1629         break;
1630
1631         case POLICY_GRD:
1632             if (setup_tc_grd(&limiter.stable_instance)) {
1633                 free_instance(&limiter.stable_instance);
1634                 printlog(LOG_CRITICAL, "Failed to initialize tc calls for GRD.\n");
1635                 return false;
1636             }
1637         break;
1638
1639         default:
1640         return false;
1641     }
1642
1643     partition_set = partition.u.value;
1644
1645     pthread_rwlock_unlock(&limiter.limiter_lock);
1646
1647     if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) {
1648         printlog(LOG_CRITICAL, "Unable to start UDP receive thread.\n");
1649         return false;
1650     }
1651
1652     printlog(LOG_WARN, "ulogd_DRL init finished.\n");
1653
1654     return true;
1655 }
1656
1657 static void reconfig() {
1658     parsed_configs configs;
1659
1660     printlog(LOG_DEBUG, "--Starting reconfig()--\n");
1661     flushlog();
1662
1663     memset(&configs, 0, sizeof(parsed_configs));
1664     memset(&limiter.new_instance, 0, sizeof(drl_instance_t));
1665
1666     limiter.new_instance.ident_map = allocate_map();
1667     if (limiter.new_instance.ident_map == NULL) {
1668         printlog(LOG_CRITICAL, "Failed to allocate ident_map during reconfig().\n");
1669         return;
1670     }
1671
1672     if (get_eligible_leaves(&limiter.new_instance)) {
1673         free_failed_config(configs, &limiter.new_instance);
1674         printlog(LOG_CRITICAL, "Failed to read leaves during reconfig().\n");
1675         return;
1676     }
1677
1678     if (parse_drl_config(drl_configfile.u.string, &configs)) {
1679         free_failed_config(configs, &limiter.new_instance);
1680         printlog(LOG_CRITICAL, "Failed to parse config during reconfig().\n");
1681         return;
1682     }
1683
1684     if (validate_configs(configs, &limiter.new_instance)) {
1685         free_failed_config(configs, &limiter.new_instance);
1686         printlog(LOG_CRITICAL, "Validation failed during reconfig().\n");
1687         pthread_rwlock_unlock(&limiter.limiter_lock);
1688         return;
1689     }
1690
1691     if (init_identities(configs, &limiter.new_instance)) {
1692         free_failed_config(configs, &limiter.new_instance);
1693         printlog(LOG_CRITICAL, "Initialization failed during reconfig().\n");
1694         pthread_rwlock_unlock(&limiter.limiter_lock);
1695         return;
1696     }
1697
1698     free_ident_list(configs.machines);
1699     free_ident_list(configs.sets);
1700
1701     print_instance(&limiter.new_instance);
1702     
1703     /* Lock */
1704     pthread_rwlock_wrlock(&limiter.limiter_lock);
1705
1706     switch (limiter.policy) {
1707         case POLICY_FPS:
1708             if (assign_htb_hierarchy(&limiter.new_instance)) {
1709                 free_instance(&limiter.new_instance);
1710                 printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
1711                 pthread_rwlock_unlock(&limiter.limiter_lock);
1712                 return;
1713             }
1714
1715             if (create_htb_hierarchy(&limiter.new_instance)) {
1716                 free_instance(&limiter.new_instance);
1717                 printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
1718
1719                 /* Re-create old instance. */
1720                 if (create_htb_hierarchy(&limiter.stable_instance)) {
1721                     /* Error reinstating the old one - big problem. */
1722                     printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
1723                     printlog(LOG_CRITICAL, "Giving up...\n");
1724                     flushlog();
1725                     exit(EXIT_FAILURE);
1726                 }
1727
1728                 pthread_rwlock_unlock(&limiter.limiter_lock);
1729                 return;
1730             }
1731         break;
1732
1733         case POLICY_GRD:
1734             if (setup_tc_grd(&limiter.new_instance)) {
1735                 free_instance(&limiter.new_instance);
1736                 printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n");
1737
1738                 /* Try to re-create old instance. */
1739                 if (setup_tc_grd(&limiter.stable_instance)) {
1740                     printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n");
1741                     printlog(LOG_CRITICAL, "Giving up...\n");
1742                     flushlog();
1743                     exit(EXIT_FAILURE);
1744                 }
1745             }
1746         break;
1747
1748         default:
1749             /* Should be impossible. */
1750             printf("Pigs are flying?\n");
1751             exit(EXIT_FAILURE);
1752     }
1753
1754     /* Switch over new to stable instance. */
1755     free_instance(&limiter.stable_instance);
1756     memcpy(&limiter.stable_instance, &limiter.new_instance, sizeof(drl_instance_t));
1757
1758     /* Success! - Unlock */
1759     pthread_rwlock_unlock(&limiter.limiter_lock);
1760 }
1761
1762 static int stop_enforcement(drl_instance_t *instance) {
1763     char cmd[300];
1764     int i;
1765
1766     for (i = 0; i < instance->machine_count; ++i) {
1767         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1768                 instance->machines[i]->htb_parent,
1769                 instance->machines[i]->htb_node);
1770
1771         if (execute_cmd(cmd)) {
1772             return 1;
1773         }
1774     }
1775
1776     for (i = 0; i < instance->set_count; ++i) {
1777         sprintf(cmd, "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil 100mbit",
1778                 instance->sets[i]->htb_parent,
1779                 instance->sets[i]->htb_node);
1780
1781         if (execute_cmd(cmd)) {
1782             return 1;
1783         }
1784     }
1785
1786     return 0;
1787 }
1788
1789 static void *signal_thread_func(void *args) {
1790     int sig;
1791     int err;
1792     sigset_t sigs;
1793
1794     sigemptyset(&sigs);
1795     sigaddset(&sigs, SIGHUP);
1796     sigaddset(&sigs, SIGUSR1);
1797     sigaddset(&sigs, SIGUSR2);
1798     pthread_sigmask(SIG_BLOCK, &sigs, NULL);
1799
1800     while (1) {
1801         sigemptyset(&sigs);
1802         sigaddset(&sigs, SIGHUP);
1803         sigaddset(&sigs, SIGUSR1);
1804         sigaddset(&sigs, SIGUSR2);
1805
1806         err = sigwait(&sigs, &sig);
1807
1808         if (err) {
1809             printlog(LOG_CRITICAL, "sigwait() returned an error.\n");
1810             flushlog();
1811         }
1812
1813         switch (sig) {
1814             case SIGHUP:
1815                 printlog(LOG_WARN, "Caught SIGHUP - re-reading XML file.\n");
1816                 reconfig();
1817                 //time_reconfig(1000); /* instrumentation */
1818                 flushlog();
1819                 break;
1820             case SIGUSR1:
1821                 pthread_rwlock_wrlock(&limiter.limiter_lock);
1822                 if (do_enforcement) {
1823                     do_enforcement = 0;
1824                     stop_enforcement(&limiter.stable_instance);
1825                     printlog(LOG_CRITICAL, "--Switching enforcement off.--\n");
1826                 } else {
1827                     do_enforcement = 1;
1828                     printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1829                 }
1830                 pthread_rwlock_unlock(&limiter.limiter_lock);
1831                 break;
1832             case SIGUSR2:
1833                 do_partition = !do_partition;
1834                 break;
1835             default:
1836                 /* Intentionally blank. */
1837                 break;
1838         }
1839     }
1840 }
1841
1842 static int drl_plugin_init() {
1843     sigset_t signal_mask;
1844
1845     sigemptyset(&signal_mask);
1846     sigaddset(&signal_mask, SIGHUP);
1847     sigaddset(&signal_mask, SIGUSR1);
1848     sigaddset(&signal_mask, SIGUSR2);
1849     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
1850
1851     if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {
1852         printlog(LOG_CRITICAL, "Failed to create signal handling thread.\n");
1853         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1854         flushlog();
1855         exit(EXIT_FAILURE);
1856     }
1857
1858     if (!init_drl()) {
1859         printlog(LOG_CRITICAL, "Init failed. :(\n");
1860         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1861         flushlog();
1862         exit(EXIT_FAILURE);
1863     }
1864
1865     /* start up the thread that will periodically estimate the
1866      * local rate and set the local limits
1867      * see estimate.c
1868      */
1869     if (pthread_create(&estimate_thread, NULL, (void*(*)(void*)) &handle_estimation, &limiter)!=0) {
1870         printlog(LOG_CRITICAL, "Couldn't start estimate thread.\n");
1871         fprintf(stderr, "An error has occured starting ulogd_DRL.  Refer to your logfile (%s) for additional information.\n", drl_logfile.u.string);
1872         exit(EXIT_FAILURE);
1873     }
1874
1875     if (enforce_on.u.value) {
1876         pthread_rwlock_wrlock(&limiter.limiter_lock);
1877         do_enforcement = 1;
1878         printlog(LOG_CRITICAL, "--Switching enforcement on.--\n");
1879         pthread_rwlock_unlock(&limiter.limiter_lock);
1880     }
1881
1882     return 0;
1883 }
1884
1885 static ulog_output_t drl_op = {
1886     .name = "drl",
1887     .output = &_output_drl,
1888     .signal = NULL, /* This appears to be broken. Using my own handler. */
1889     .init = &drl_plugin_init,
1890     .fini = NULL,
1891 };
1892
1893 #if 0
1894 /* Tests the amount of time it takes to call reconfig(). */
1895 static void time_reconfig(int iterations) {
1896     struct timeval start, end;
1897     int i;
1898
1899     gettimeofday(&start, NULL);
1900     for (i = 0; i < iterations; ++i) {
1901         reconfig();
1902     }
1903     gettimeofday(&end, NULL);
1904
1905     printf("%d reconfigs() took %d seconds and %d microseconds.\n",
1906            iterations, end.tv_sec - start.tv_sec, end.tv_usec - start.tv_usec);
1907     exit(0);
1908
1909     // Seems to take about 85ms / iteration
1910 }
1911 #endif
1912
1913 /* register output plugin with ulogd */
1914 static void _drl_reg_op(void)
1915 {
1916     ulog_output_t *op = &drl_op;
1917     register_output(op);
1918 }
1919
1920 void _init(void)
1921 {
1922     /* have the opts parsed */
1923     config_parse_file("DRL", config_entries);
1924
1925     if (get_ids()) {
1926         ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
1927         exit(2);
1928     }
1929
1930     /* Seed the hash function */
1931     salt = getpid() ^ time(NULL);
1932
1933     _drl_reg_op();
1934 }