Long-outstanding commit. (Hopefully) Final version before running experiments for...
[distributedratelimiting.git] / drl / estimate.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * Thread to periodically calculate the estimated local limits
5  * Barath Raghavan 2006/2007
6  * Ken Yocum 2007
7  * Kevin Webb 2007/2008
8  */
9
10 #include <assert.h>
11
12 /** The size of the buffer we use to hold tc commands. */
13 #define CMD_BUFFER_SIZE 200
14
15 /* DRL specifics */
16 #include "raterouter.h" 
17 #include "util.h"
18 #include "ratetypes.h" /* needs util and pthread.h */
19 #include "calendar.h"
20 #include "logging.h"
21
/* Current log level, defined in another translation unit.  This file compares
 * it against LOG_DEBUG to decide whether to echo diagnostics to stdout. */
extern uint8_t system_loglevel;

/* When nonzero, enforce() runs the tc commands it builds via system().  When
 * zero, the commands are still formatted but never executed. */
uint8_t do_enforcement = 0;
25
26 /**
27  * Called for each identity each estimate interval.  Uses flow table information
28  * to estimate the current aggregate rate and the rate of the individual flows
29  * in the table.
30  */
31 static void estimate(identity_t *ident, const double estintms) {
32     struct timeval now;
33     double time_difference;
34
35     pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
36     
37     gettimeofday(&now, NULL);
38
39     time_difference = timeval_subtract(now, ident->common.last_update);
40
41     if (time_difference > .01 + (estintms / 1000 * ident->mainloop_intervals)) {
42         printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n",
43                  estintms * ident->mainloop_intervals, time_difference * 1000);
44     }
45
46     ident->table_update_function(ident->table, now, ident->ewma_weight);
47
48 #ifdef SHADOW_ACCTING
49
50     standard_table_update_flows((standard_flow_table) ident->shadow_table, now,
51                                 ident->ewma_weight);
52
53 #endif
54
55     pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */
56 }
57
58 /**
59  * Determines the FPS weight allocation when the identity is under its current
60  * local rate limit.
61  */
62 static double allocate_fps_under_limit(identity_t *ident, uint32_t target, double peer_weights) {
63     double ideal_weight;
64     double total_weight = peer_weights + ident->last_localweight;
65
66     if (target >= ident->limit) {
67         ideal_weight = total_weight;
68     } else if (target <= 0) {
69         ideal_weight = 0; // no flows here
70     } else {
71         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
72     }
73
74 #if 0
75     else if (peer_weights <= 0) {
76 #if 0
77         // doesn't matter what we pick as our weight, so pick 1 / N.
78         ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1);
79 #endif
80         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
81     } else {
82 #if 0
83         double divisor = (double) ident->limit - (double) target;
84         ideal_weight = ((double) target * peer_weights) / divisor;
85 #else
86         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
87 #endif
88     }
89 #endif
90
91     return ideal_weight;
92 }
93
94 /**
95  * Determines the FPS weight allocation when the identity is over its current
96  * local rate limit.
97  */
98 static double allocate_fps_over_limit(identity_t *ident) {
99     double ideal_weight;
100     double total_over_max;
101
102     if (ident->common.max_flow_rate > 0) {
103         ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate;
104         total_over_max = (double) ident->common.rate / (double) ident->common.max_flow_rate;
105
106         printlog(LOG_DEBUG, "ideal_over: %.3f, limit: %d, max_flow_rate: %d, total_rate: %d, total/max: %.3f\n",
107                  ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate, total_over_max);
108     } else {
109         ideal_weight = 1;
110     }
111
112     return ideal_weight;
113 }
114
115 /**
116  * When FPS checks to see which mode it should be operating in
117  * (over limit vs under limit), we don't want it to actually look to
118  * see if we're at the limit.  Instead, we want to see if we're getting
119  * close to the limit.  This defines how close is "close enough".
120  *
121  * For example, if the limit is 50000 and we're sending 49000, we probably
122  * want to be in the over limit mode, even if we aren't actually over the limit
123  * in order to switch to the more aggressive weight calculations.
124  */
125 static inline uint32_t close_enough(uint32_t limit) {
126     uint32_t difference = limit - (limit * CLOSE_ENOUGH);
127
128     if (difference < 10240) {
129         return (limit - 10240);
130     } else {
131         return (limit * CLOSE_ENOUGH);
132     }
133 }
134
135 static void print_statistics(identity_t *ident, const double ideal_weight,
136                              const double total_weight, const double localweight,
137                              const char *identifier, common_accounting_t *table,
138                              const uint32_t resulting_limit) {
139     struct timeval tv;
140     double time_now;
141
142     gettimeofday(&tv, NULL);
143     time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
144
145     printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d %d %s:%d ",
146              time_now, table->inst_rate, ideal_weight, localweight, total_weight,
147              table->num_flows, table->num_flows_5k, table->num_flows_10k,
148              table->num_flows_20k, table->num_flows_50k, table->avg_rate,
149              table->max_flow_rate, table->max_flow_rate_flow_hash, resulting_limit,
150              identifier, ident->id);
151
152     if (table->max_flow_rate > 0) {
153         printlog(LOG_WARN, "%.3f\n", (double) table->rate / (double) table->max_flow_rate);
154     } else {
155         printlog(LOG_WARN, "0\n");
156     }
157
158     /* Print to the screen in debug mode. */
159     if (system_loglevel == LOG_DEBUG) {
160         printf("Local Rate: %d, Ideal Weight: %.3f, Local Weight: %.3f, Total Weight: %.3f\n",
161                table->rate, ideal_weight, ident->localweight, total_weight);
162     }
163 }
164
165 static uint32_t allocate_fps(identity_t *ident, double total_weight,
166                              common_accounting_t *table, const char *identifier) {
167
168     uint32_t resulting_limit = 0;
169     double ideal_weight = 0.0;
170     double peer_weights = total_weight - ident->last_localweight;
171
172     /* Keep track of these for measurements & comparisons only. */
173     double ideal_under = 0.0;
174     double ideal_over = 0.0;
175
176     /* Weight sanity. */
177     if (peer_weights < 0.0) {
178         peer_weights = 0.0;
179     }
180
181     if (ident->dampen_state == DAMPEN_TEST) {
182         int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
183         double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
184
185         if (rate_delta > threshold) {
186             ident->dampen_state = DAMPEN_PASSED;
187         } else {
188             ident->dampen_state = DAMPEN_FAILED;
189         }
190     }
191
192     /* Rate/weight sanity. */
193     if (table->rate <= 0) {
194         ideal_weight = 0.0;
195     }
196
197     /* Under the limit OR we failed our dampening test OR our current
198      * outgoing traffic rate is under the low "flowstart" watermark. */
199     else if (ident->dampen_state == DAMPEN_FAILED ||
200              table->rate < close_enough(ident->locallimit)) {
201 #if 0
202              || ident->flowstart) {
203         uint32_t target_rate = table->rate;
204
205         if (ident->flowstart) {
206             target_rate *= 4;
207
208             if (table->rate >= FLOW_START_THRESHOLD) {
209                 ident->flowstart = false;
210             }
211         } else {
212             /* June 16, 2008 (KCW)
213              * ident->flowstart gets set initially to one, but it is never set again.  However,
214              * if a limiter gets flows and then the number of flows drops to zero, it has trouble
215              * increasing the limit again. */
216             if (table->rate < FLOW_START_THRESHOLD) {
217                 ident->flowstart = true;
218             }
219         }
220         Old flowstart code.
221 #endif
222         //printf("rate is %d, close enough is %d, difference is %d\n", table->rate, close_enough(ident->locallimit), close_enough(ident->locallimit) - table->rate);
223
224         /* Boost low-limits so that they have room to grow. */
225         if (table->rate < FLOW_START_THRESHOLD) {
226             ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights);
227         } else {
228             ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
229         }
230
231         ideal_over = allocate_fps_over_limit(ident);
232
233         if (ideal_over < ideal_under) {
234             /* Degenerate case in which the agressive weight calculation was
235              * actually less than the under-the-limit case.  Use it instead
236              * and skip the dampening check in the next interval. */
237             ideal_weight = ideal_over;
238             ident->dampen_state = DAMPEN_SKIP;
239         } else {
240             ident->dampen_state = DAMPEN_NONE;
241         }
242
243         /* Apply EWMA. */
244         ident->localweight = (ident->localweight * ident->ewma_weight +
245                               ideal_weight * (1 - ident->ewma_weight));
246     }
247
248     /* At or over the limit.  Use the aggressive weight calculation. */
249     else {
250         double portion_last_interval = 0.0;
251         double portion_this_interval = 0.0;
252
253         ideal_weight = ideal_over = allocate_fps_over_limit(ident);
254         ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
255
256         /* Apply EWMA. */
257         ident->localweight = (ident->localweight * ident->ewma_weight +
258                               ideal_weight * (1 - ident->ewma_weight));
259
260         /* Now check whether the result of the aggressive weight calculation
261          * increases our portion of the weight "too much", in which case we
262          * dampen it. */
263
264         /* Our portion of weight in the whole system during the last interval.*/
265         portion_last_interval = ident->last_localweight / total_weight;
266
267         /* Our proposed portion of weight for the current interval. */
268         portion_this_interval = ident->localweight / (peer_weights + ident->localweight);
269
270         if (ident->dampen_state == DAMPEN_NONE &&
271             (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) {
272             ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
273             ident->dampen_state = DAMPEN_TEST;
274         } else {
275             ident->dampen_state = DAMPEN_SKIP;
276         }
277     }
278
279     /* Add the weight calculated in this interval to the total. */
280     ident->total_weight = total_weight = ident->localweight + peer_weights;
281
282     /* Convert weight value into a rate limit.  If there is no measureable
283      * weight, do a L/n allocation. */
284     if (total_weight > 0) {
285         resulting_limit = (uint32_t) (ident->localweight * ident->limit / total_weight);
286     } else {
287         resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
288     }
289
290     print_statistics(ident, ideal_weight, total_weight, ident->localweight,
291                      identifier, table, resulting_limit);
292
293     return resulting_limit;
294 }
295
296 #ifdef SHADOW_ACCTING
297
298 /* Runs through the allocate functionality without making any state changes to
299  * the identity.  Useful for comparisons, especially for comparing standard
300  * and sample&hold accounting schemes. */
301 static void allocate_fps_pretend(identity_t *ident, double total_weight,
302                                  common_accounting_t *table, const char *identifier) {
303
304     uint32_t resulting_limit = 0;
305     double ideal_weight = 0.0;
306     double peer_weights = total_weight - ident->last_localweight_copy;
307     double ideal_under = 0.0;
308     double ideal_over = 0.0;
309
310     if (peer_weights < 0.0) {
311         peer_weights = 0.0;
312     }
313
314     if (ident->dampen_state_copy == DAMPEN_TEST) {
315         int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
316         double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
317
318         if (rate_delta > threshold) {
319             ident->dampen_state_copy = DAMPEN_PASSED;
320         } else {
321             ident->dampen_state_copy = DAMPEN_FAILED;
322         }
323     }
324
325     /* Rate/weight sanity. */
326     if (table->rate <= 0) {
327         ideal_weight = 0.0;
328     }
329
330     /* Under the limit OR we failed our dampening test OR our current
331      * outgoing traffic rate is under the low "flowstart" watermark. */
332     else if (ident->dampen_state_copy == DAMPEN_FAILED ||
333              table->rate < close_enough(ident->locallimit)) {
334
335         /* Boost low-limits so that they have room to grow. */
336         if (table->rate < FLOW_START_THRESHOLD) {
337             ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights);
338         } else {
339             ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
340         }
341
342         ideal_over = allocate_fps_over_limit(ident);
343
344         if (ideal_over < ideal_under) {
345             /* Degenerate case in which the agressive weight calculation was
346              * actually less than the under-the-limit case.  Use it instead
347              * and skip the dampening check in the next interval. */
348             ideal_weight = ideal_over;
349             ident->dampen_state_copy = DAMPEN_SKIP;
350         } else {
351             ident->dampen_state_copy = DAMPEN_NONE;
352         }
353
354         /* Apply EWMA. */
355         ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight +
356                               ideal_weight * (1 - ident->ewma_weight));
357     }
358
359     /* At or over the limit.  Use the aggressive weight calculation. */
360     else {
361         double portion_last_interval = 0.0;
362         double portion_this_interval = 0.0;
363
364         ideal_weight = ideal_over = allocate_fps_over_limit(ident);
365         ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
366
367         /* Apply EWMA. */
368         ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight +
369                               ideal_weight * (1 - ident->ewma_weight));
370
371         /* Now check whether the result of the aggressive weight calculation
372          * increases our portion of the weight "too much", in which case we
373          * dampen it. */
374
375         /* Our portion of weight in the whole system during the last interval.*/
376         portion_last_interval = ident->last_localweight / total_weight;
377
378         /* Our proposed portion of weight for the current interval. */
379         portion_this_interval = ident->localweight_copy / (peer_weights + ident->localweight_copy);
380
381         if (ident->dampen_state_copy == DAMPEN_NONE &&
382             (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) {
383             ident->localweight_copy = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
384             ident->dampen_state_copy = DAMPEN_TEST;
385         } else {
386             ident->dampen_state_copy = DAMPEN_SKIP;
387         }
388     }
389
390     /* Add the weight calculated in this interval to the total. */
391     total_weight = ident->localweight_copy + peer_weights;
392
393     /* Convert weight value into a rate limit.  If there is no measureable
394      * weight, do a L/n allocation. */
395     if (total_weight > 0) {
396         resulting_limit = (uint32_t) (ident->localweight_copy * ident->limit / total_weight);
397     } else {
398         resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
399     }
400
401     print_statistics(ident, ideal_weight, total_weight, ident->localweight_copy,
402                      identifier, table, resulting_limit);
403 }
404
405 #endif
406
407 /**
408  * Determines the amount of FPS weight to allocate to the identity during each
409  * estimate interval.  Note that total_weight includes local weight.
410  */
411 static uint32_t allocate_fps_old(identity_t *ident, double total_weight) {
412     common_accounting_t *ftable = &ident->common; /* Common flow table info */
413     uint32_t local_rate = ftable->rate;
414     uint32_t ideallocal = 0;
415     double peer_weights; /* sum of weights of all other limiters */
416     double idealweight = 0;
417     double last_portion = 0;
418     double this_portion = 0;
419
420     static int dampen = 0;
421     int dampen_increase = 0;
422
423     double ideal_under = 0;
424     double ideal_over = 0;
425
426     int regime = 0;
427
428     /* two cases:
429        1. the aggregate is < limit
430        2. the aggregate is >= limit
431        */
432     peer_weights = total_weight - ident->last_localweight;
433     if (peer_weights < 0) {
434         peer_weights = 0;
435     }
436
437     if (dampen == 1) {
438         int64_t rate_delta =
439             (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate;
440         double threshold =
441             (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
442
443         if (rate_delta > threshold) {
444             dampen_increase = 1;
445             printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n",
446                      rate_delta, threshold);
447         }
448     }
449
450     if (local_rate <= 0) {
451         idealweight = 0;
452     } else if (dampen_increase == 0 &&
453                (ident->locallimit <= 0 || local_rate < close_enough(ident->locallimit) || ident->flowstart)) {
454         /* We're under the limit - all flows are bottlenecked. */
455         idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
456         ideal_over = allocate_fps_over_limit(ident);
457         ideal_under = idealweight;
458
459         if (ideal_over < idealweight) {
460             idealweight = ideal_over;
461             regime = 3;
462             dampen = 2;
463         } else {
464             regime = 1;
465             dampen = 0;
466         }
467
468         /* Apply EWMA */
469         ident->localweight = (ident->localweight * ident->ewma_weight +
470                               idealweight * (1 - ident->ewma_weight));
471         
472     } else {
473         idealweight = allocate_fps_over_limit(ident);
474         
475         /* Apply EWMA */
476         ident->localweight = (ident->localweight * ident->ewma_weight +
477                               idealweight * (1 - ident->ewma_weight));
478
479         /* This is the portion of the total weight in the system that was caused
480          * by this limiter in the last interval. */
481         last_portion = ident->last_localweight / total_weight;
482
483         /* This is the fraction of the total weight in the system that our
484          * proposed value for idealweight would use. */
485         this_portion = ident->localweight / (peer_weights + ident->localweight);
486
487         /* Dampen the large increase the first time... */
488         if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) {
489             ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
490             dampen = 1;
491         } else {
492             dampen = 2;
493         }
494
495         ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights);
496         ideal_over = idealweight;
497
498         regime = 2;
499     }
500
501     /* Convert weight into a rate - add in our new local weight */
502     ident->total_weight = total_weight = ident->localweight + peer_weights;
503
504     /* compute local allocation:
505        if there is traffic elsewhere, use the weights
506        otherwise do a L/n allocation */
507     if (total_weight > 0) {
508     //if (peer_weights > 0) {
509         ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight);
510     } else {
511         ideallocal = ident->limit / (ident->comm.remote_node_count + 1);
512     }
513
514     printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight);
515
516     printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f  Under / Over / Actual / Rate\n",
517             ideal_under / (ideal_under + peer_weights),
518             ideal_over / (ideal_over + peer_weights),
519             ident->localweight / (ident->localweight + peer_weights),
520             (double) local_rate / (double) ident->limit);
521
522     printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
523
524     if (system_loglevel == LOG_DEBUG) {
525         printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
526             local_rate, idealweight, ident->localweight, total_weight);
527     }
528
529 #if 0
530     if (printcounter <= 0) {
531         struct timeval tv;
532         double time_now;
533
534         gettimeofday(&tv, NULL);
535         time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
536
537         printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d ", time_now, ftable->inst_rate,
538             idealweight, ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k,
539             ftable->num_flows_10k, ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate,
540             ftable->max_flow_rate, ftable->max_flow_rate_flow_hash);
541
542         printcounter = PRINT_COUNTER_RESET;
543     } else {
544         printcounter -= 1;
545     }
546
547     //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
548     //       dampen, dampen_increase, peer_weights, regime);
549
550     if (regime == 3) {
551         printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
552                  ideal_over, ideal_under);
553     }
554     See print_statistics()
555 #endif
556
557     printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
558
559     return(ideallocal);
560 }
561
562 /**
563  * Determines the local drop probability for a GRD identity every estimate
564  * interval.
565  */
566 static double allocate_grd(identity_t *ident, double aggdemand) {
567     double dropprob;
568     double global_limit = ident->limit;
569     double min_dropprob = ident->drop_prob * GRD_BIG_DROP;
570
571     struct timeval tv;
572     double time_now;
573     common_accounting_t *table = &ident->common;
574
575     gettimeofday(&tv, NULL);
576     time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
577
578     if (aggdemand > global_limit) {
579         dropprob = (aggdemand-global_limit)/aggdemand;
580     } else {
581         dropprob = 0.0;
582     }
583
584     if (dropprob > 0.01 && dropprob < min_dropprob) {
585         dropprob = min_dropprob;
586     }
587
588     if (system_loglevel == LOG_DEBUG) {
589         printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
590            ident->common.rate, aggdemand, dropprob);
591     }
592
593     if (table->max_flow_rate > 0) {
594         printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d %.3f\n",
595              time_now, table->inst_rate, aggdemand,
596              table->num_flows, table->num_flows_5k, table->num_flows_10k,
597              table->num_flows_20k, table->num_flows_50k, table->avg_rate,
598              table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob,
599              ident->id, (double) table->rate / (double) table->max_flow_rate);
600     } else {
601         printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d 0\n",
602              time_now, table->inst_rate, aggdemand,
603              table->num_flows, table->num_flows_5k, table->num_flows_10k,
604              table->num_flows_20k, table->num_flows_50k, table->avg_rate,
605              table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob,
606              ident->id);
607     }
608
609     return dropprob;
610 }
611
612 /** 
613  * Given current estimates of local rate (weight) and remote rates (weights)
614  * use GRD or FPS to calculate a new local limit. 
615  */
616 static void allocate(limiter_t *limiter, identity_t *ident) {
617     /* Represents aggregate rate for GRD and aggregate weight for FPS. */
618     double comm_val = 0;
619
620     /* Read comm_val from comm layer. */
621     if (limiter->policy == POLICY_FPS) {
622         read_comm(&ident->comm, &comm_val,
623                 ident->total_weight / (double) (ident->comm.remote_node_count + 1));
624     } else {
625         read_comm(&ident->comm, &comm_val,
626                 (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
627     }
628     printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
629
630     /* Experimental printing. */
631     printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
632              (double) ident->common.rate / (double) 128, ident->id);
633     ident->avg_bytes += ident->common.rate;
634     
635     if (limiter->policy == POLICY_FPS) {
636 #ifdef SHADOW_ACCTING
637
638         allocate_fps_pretend(ident, comm_val, &ident->shadow_common, "SHADOW-ID");
639
640         ident->last_localweight_copy = ident->localweight_copy;
641 #endif
642
643         ident->locallimit = allocate_fps(ident, comm_val, &ident->common, "ID");
644         ident->last_localweight = ident->localweight;
645
646         /* Update other limiters with our weight by writing to comm layer. */
647         write_local_value(&ident->comm, ident->localweight);
648     } else {
649         ident->last_drop_prob = ident->drop_prob;
650         ident->drop_prob = allocate_grd(ident, comm_val);
651         
652         /* Update other limiters with our rate by writing to comm layer. */
653         write_local_value(&ident->comm, ident->common.rate);
654     }
655
656     /* Update identity state. */
657     ident->common.last_rate = ident->common.rate;
658 }
659
660 /**
661  * Traces all of the parent pointers of a leaf all the way to the root in
662  * order to find the maximum drop probability in the chain.
663  */
664 static double find_leaf_drop_prob(leaf_t *leaf) {
665     identity_t *current = leaf->parent;
666     double result = 0;
667
668     assert(current);
669
670     while (current != NULL) {
671         if (current->drop_prob > result) {
672             result = current->drop_prob;
673         }
674         current = current->parent;
675     }
676
677     return result;
678 }
679
680 /**
681  * This is called once per estimate interval to enforce the rate that allocate
682  * has decided upon.  It makes calls to tc using system().
683  */
684 static void enforce(limiter_t *limiter, identity_t *ident) {
685     char cmd[CMD_BUFFER_SIZE];
686     int ret = 0;
687     int i = 0;
688
689     switch (limiter->policy) {
690         case POLICY_FPS:
691
692             /* TC treats limits of 0 (8bit) as unlimited, which causes the
693              * entire rate limiting system to become unpredictable.  In
694              * reality, we also don't want any limiter to be able to set its
695              * limit so low that it chokes all of the flows to the point that
696              * they can't increase.  Thus, when we're setting a low limit, we
697              * make sure that it isn't too low by using the
698              * FLOW_START_THRESHOLD. */
699
700             if (ident->locallimit < FLOW_START_THRESHOLD) {
701                 ident->locallimit = FLOW_START_THRESHOLD;
702             }
703
704             /* Do not allow the node to set a limit higher than its
705              * administratively assigned upper limit (bwcap). */
706             if (limiter->nodelimit != 0 && ident->locallimit > limiter->nodelimit) {
707                 ident->locallimit = limiter->nodelimit;
708             }
709
710             if (system_loglevel == LOG_DEBUG) {
711                 printf("FPS: Setting local limit to %d\n", ident->locallimit);
712             }
713             printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
714
715 #if 0
716             if (printcounter == PRINT_COUNTER_RESET) {
717                 if (ident->common.max_flow_rate > 0) {
718                     printlog(LOG_WARN, "%d ID:%d %.3f\n", ident->locallimit, ident->id,
719                              (double) ident->common.rate / (double) ident->common.max_flow_rate);
720                 } else {
721                     printlog(LOG_WARN, "%d ID:%d 0\n", ident->locallimit, ident->id);
722                 }
723             }
724             This is now done in print_statistics()
725 #endif
726
727             snprintf(cmd, CMD_BUFFER_SIZE,
728                      "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
729                      ident->htb_parent, ident->htb_node, ident->locallimit);
730
731             if (do_enforcement) {
732                 ret = system(cmd);
733
734                 if (ret) {
735                     /* FIXME: call failed.  What to do? */
736                     printlog(LOG_CRITICAL, "***TC call failed? Call was: %s***\n", cmd);
737                 }
738             }
739             break;
740
741         case POLICY_GRD:
742             for (i = 0; i < ident->leaf_count; ++i) {
743                 if (ident->drop_prob >= ident->leaves[i]->drop_prob) {
744                     /* The new drop probability for this identity is greater
745                      * than or equal to the leaf's current drop probability.
746                      * We can safely use the larger value at this leaf
747                      * immediately. */
748                     ident->leaves[i]->drop_prob = ident->drop_prob;
749                 } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) {
750                     /* The old drop probability for this identity is less than
751                      * the leaf's current drop probability.  This means that
752                      * this identity couldn't have been the limiting ident,
753                      * so nothing needs to be done because the old limiting
754                      * ident is still the limiting factor. */
755
756                     /* Intentionally blank. */
757                 } else {
758                     /* If neither of the above are true, then...
759                      * 1) The new drop probability for the identity is less
760                      * than what it previously was, and
761                      * 2) This ident may have had the maximum drop probability
762                      * of all idents limiting this leaf, and therefore we need
763                      * to follow the leaf's parents up to the root to find the
764                      * new leaf drop probability safely. */
765                     ident->leaves[i]->drop_prob =
766                             find_leaf_drop_prob(ident->leaves[i]);
767                 }
768
769                 /* Make the call to tc. */
770                 snprintf(cmd, CMD_BUFFER_SIZE,
771                          "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay %dms",
772                          ident->leaves[i]->xid, ident->leaves[i]->xid,
773                          (100 * ident->leaves[i]->drop_prob), ident->leaves[i]->delay);
774
775                 if (do_enforcement) {
776                     ret = system(cmd);
777
778                     if (ret) {
779                         /* FIXME: call failed.  What to do? */
780                         printlog(LOG_CRITICAL, "***TC call failed?***\n");
781                     }
782                 }
783             }
784
785             break;
786
787         default: 
788             printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy);
789             break;
790     }
791
792     return;
793 }
794
795 /**
796  * This function is periodically called to clean the stable instance's flow
797  * accounting tables for each identity.
798  */
799 static void clean(drl_instance_t *instance) {
800     identity_t *ident = NULL;
801
802     map_reset_iterate(instance->ident_map);
803     while ((ident = map_next(instance->ident_map)) != NULL) {
804         pthread_mutex_lock(&ident->table_mutex);
805
806         ident->table_cleanup_function(ident->table);
807
808 #ifdef SHADOW_ACCTING
809
810         standard_table_cleanup((standard_flow_table) ident->shadow_table);
811
812 #endif
813
814         pthread_mutex_unlock(&ident->table_mutex);
815     }
816
817     /* Periodically flush the log file. */
818     flushlog();
819 }
820
821 static void print_averages(drl_instance_t *instance, int print_interval) {
822     identity_t *ident = NULL;
823
824     map_reset_iterate(instance->ident_map);
825     while ((ident = map_next(instance->ident_map)) != NULL) {
826         ident->avg_bytes /= (double) print_interval;
827         //printf("avg_bytes = %f, print_interval = %d\n", ident->avg_bytes, print_interval);
828         printlog(LOG_DEBUG, "%.3f \t Avg rate. ID:%d\n",
829                  ident->avg_bytes / 128, ident->id);
830         //printf("%.3f \t Avg rate. ID:%d\n",
831         //         ident->avg_bytes / 128, ident->id);
832         ident->avg_bytes = 0;
833     }
834 }
835
836 /** Thread function to handle local rate estimation.
837  *
838  * None of our simple hashmap functions are thread safe, so we lock the limiter
839  * with an rwlock to prevent another thread from attempting to modify the set
840  * of identities.
841  *
842  * Each identity also has a private lock for its table.  This gets locked by
843  * table-modifying functions such as estimate and clean. It's also locked in
844  * ulogd_DRL.c when the table is being updated with new packets.
845  */
846 void handle_estimation(void *arg) {
847     limiter_t *limiter = (limiter_t *) arg;
848     int clean_timer, clean_wait_intervals;
849     useconds_t sleep_time = limiter->estintms * 1000;
850     uint32_t cal_slot = 0;
851     int print_interval = 1000 / (limiter->estintms);
852
853     sigset_t signal_mask;
854
855     sigemptyset(&signal_mask);
856     sigaddset(&signal_mask, SIGHUP);
857     sigaddset(&signal_mask, SIGUSR1);
858     sigaddset(&signal_mask, SIGUSR2);
859     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
860
861     /* Determine the number of intervals we should wait before hitting the
862      * specified clean interval. (Converts seconds -> intervals). */
863     clean_wait_intervals = IDENT_CLEAN_INTERVAL * (1000.0 / limiter->estintms);
864     clean_timer = clean_wait_intervals;
865
866     while (true) {
867         printlog(LOG_DEBUG, "--Beginning new tick.--\n");
868
869         /* Sleep according to the delay of the estimate interval. */
870         usleep(sleep_time);
871
872         /* Grab the limiter lock for reading.  This prevents identities from
873          * disappearing beneath our feet. */
874         pthread_rwlock_rdlock(&limiter->limiter_lock);
875
876         cal_slot = limiter->stable_instance.cal_slot & SCHEDMASK;
877
878         /* Service all the identities that are scheduled to run during this
879          * tick. */
880         while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) {
881             identity_action *iaction = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
882             TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, iaction, calendar);
883
884             /* Only execute the action if it is valid. */
885             if (iaction->valid == 0) {
886                 free(iaction);
887                 continue;
888             }
889
890             switch (iaction->action) {
891                 case ACTION_MAINLOOP:
892
893                     printlog(LOG_DEBUG, "Main loop: identity %d\n", iaction->ident->id);
894
895                     /* Update the ident's flow accouting table with the latest info. */
896                     estimate(iaction->ident, limiter->estintms);
897
898                     /* Determine its share of the rate allocation. */
899                     allocate(limiter, iaction->ident);
900
901                     /* Make tc calls to enforce the rate we decided upon. */
902                     enforce(limiter, iaction->ident);
903
904                     /* Add ident back to the queue at a future time slot. */
905                     TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
906                             ((cal_slot + iaction->ident->mainloop_intervals) & SCHEDMASK),
907                             iaction, calendar);
908                     break;
909
910                 case ACTION_COMMUNICATE:
911
912                     printlog(LOG_DEBUG, "Communicating: identity %d\n", iaction->ident->id);
913
914                     /* Tell the comm library to propagate this identity's result for
915                      * this interval.*/
916                     send_update(&iaction->ident->comm, iaction->ident->id);
917
918                     /* Add ident back to the queue at a future time slot. */
919                     TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
920                             ((cal_slot + iaction->ident->communication_intervals) & SCHEDMASK),
921                             iaction, calendar);
922                 break;
923
924                 default:
925                     printlog(LOG_CRITICAL, "Unknown identity action!?!\n");
926                     exit(EXIT_FAILURE);
927             }
928         }
929
930         print_interval--;
931         if (loglevel() == LOG_DEBUG && print_interval <= 0) {
932             print_interval = 1000 / (limiter->estintms);
933             print_averages(&limiter->stable_instance, print_interval);
934         }
935
936         /* Check if enough intervals have passed for cleaning. */
937         if (clean_timer <= 0) {
938             clean(&limiter->stable_instance);
939             clean_timer = clean_wait_intervals;
940         } else {
941             clean_timer--;
942         }
943
944         limiter->stable_instance.cal_slot += 1;
945
946         pthread_rwlock_unlock(&limiter->limiter_lock); 
947     }
948 }