24adbbf8d81d5b7743983a57be94782d8d26557f
[distributedratelimiting.git] / drl / estimate.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * Thread to periodically calculate the estimated local limits
5  * Barath Raghavan 2006/2007
6  * Ken Yocum 2007
7  * Kevin Webb 2007/2008
8  */
9
10 #include <assert.h>
11
12 /** The size of the buffer we use to hold tc commands. */
13 #define CMD_BUFFER_SIZE 200
14
15 /* DRL specifics */
16 #include "raterouter.h" 
17 #include "util.h"
18 #include "ratetypes.h" /* needs util and pthread.h */
19 #include "calendar.h"
20 #include "logging.h"
21
22 extern uint8_t system_loglevel;
23
24 uint8_t do_enforcement = 0;
25
26 /**
27  * Called for each identity each estimate interval.  Uses flow table information
28  * to estimate the current aggregate rate and the rate of the individual flows
29  * in the table.
30  */
31 static void estimate(identity_t *ident, const double estintms) {
32     struct timeval now;
33     double time_difference;
34
35     pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
36     
37     gettimeofday(&now, NULL);
38
39     time_difference = timeval_subtract(now, ident->common.last_update);
40
41     if (time_difference > .01 + (estintms / 1000 * ident->mainloop_intervals)) {
42         printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n",
43                  estintms * ident->mainloop_intervals, time_difference * 1000);
44     }
45
46     ident->table_update_function(ident->table, now, ident->ewma_weight);
47
48 #ifdef SHADOW_ACCTING
49
50     standard_table_update_flows((standard_flow_table) ident->shadow_table, now,
51                                 ident->ewma_weight);
52
53 #endif
54
55     pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */
56 }
57
58 /**
59  * Determines the FPS weight allocation when the identity is under its current
60  * local rate limit.
61  */
62 static double allocate_fps_under_limit(identity_t *ident, uint32_t target, double peer_weights) {
63     double ideal_weight;
64     double total_weight = peer_weights + ident->last_localweight;
65
66     if (target >= ident->effective_limit) {
67         ideal_weight = total_weight;
68     } else if (target <= 0) {
69         ideal_weight = 0; // no flows here
70     } else {
71         ideal_weight = ((double)target / (double)ident->effective_limit) * total_weight;
72     }
73
74     return ideal_weight;
75 }
76
77 /**
78  * Determines the FPS weight allocation when the identity is over its current
79  * local rate limit.
80  */
81 static double allocate_fps_over_limit(identity_t *ident) {
82     double ideal_weight;
83     double total_over_max;
84
85     if (ident->common.max_flow_rate > 0) {
86         ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate;
87         total_over_max = (double) ident->common.rate / (double) ident->common.max_flow_rate;
88
89         printlog(LOG_DEBUG, "ideal_over: %.3f, limit: %d, max_flow_rate: %d, total_rate: %d, total/max: %.3f\n",
90                  ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate, total_over_max);
91     } else {
92         ideal_weight = 1;
93     }
94
95     return ideal_weight;
96 }
97
98 /**
99  * When FPS checks to see which mode it should be operating in
100  * (over limit vs under limit), we don't want it to actually look to
101  * see if we're at the limit.  Instead, we want to see if we're getting
102  * close to the limit.  This defines how close is "close enough".
103  *
104  * For example, if the limit is 50000 and we're sending 49000, we probably
105  * want to be in the over limit mode, even if we aren't actually over the limit
106  * in order to switch to the more aggressive weight calculations.
107  */
108 static inline uint32_t close_enough(uint32_t limit) {
109     uint32_t difference = limit - (limit * CLOSE_ENOUGH);
110
111     if (difference < 10240) {
112         return (limit - 10240);
113     } else {
114         return (limit * CLOSE_ENOUGH);
115     }
116 }
117
118 static void print_statistics(identity_t *ident, const double ideal_weight,
119                              const double total_weight, const double localweight,
120                              const char *identifier, common_accounting_t *table,
121                              const uint32_t resulting_limit) {
122     struct timeval tv;
123     double time_now;
124
125     gettimeofday(&tv, NULL);
126     time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
127
128     printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d %d %s:%d ",
129              time_now, table->inst_rate, ideal_weight, localweight, total_weight,
130              table->num_flows, table->num_flows_5k, table->num_flows_10k,
131              table->num_flows_20k, table->num_flows_50k, table->avg_rate,
132              table->max_flow_rate, table->max_flow_rate_flow_hash, resulting_limit,
133              identifier, ident->id);
134
135     if (table->max_flow_rate > 0) {
136         printlog(LOG_WARN, "%.3f\n", (double) table->rate / (double) table->max_flow_rate);
137     } else {
138         printlog(LOG_WARN, "0\n");
139     }
140
141     /* Print to the screen in debug mode. */
142     if (system_loglevel == LOG_DEBUG) {
143         printf("Local Rate: %d, Ideal Weight: %.3f, Local Weight: %.3f, Total Weight: %.3f\n",
144                table->rate, ideal_weight, ident->localweight, total_weight);
145     }
146 }
147
148 static uint32_t allocate_fps(identity_t *ident, double total_weight,
149                              common_accounting_t *table, const char *identifier) {
150
151     uint32_t resulting_limit = 0;
152     double ideal_weight = 0.0;
153     double peer_weights = total_weight - ident->last_localweight;
154
155     /* Keep track of these for measurements & comparisons only. */
156     double ideal_under = 0.0;
157     double ideal_over = 0.0;
158
159     /* Weight sanity. */
160     if (peer_weights < 0.0) {
161         peer_weights = 0.0;
162     }
163
164     if (ident->dampen_state == DAMPEN_TEST) {
165         int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
166         double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
167
168         if (rate_delta > threshold) {
169             ident->dampen_state = DAMPEN_PASSED;
170         } else {
171             ident->dampen_state = DAMPEN_FAILED;
172         }
173     }
174
175     /* Rate/weight sanity. */
176     if (table->rate <= 0) {
177         ideal_weight = 0.0;
178     }
179
180     /* Under the limit OR we failed our dampening test OR our current
181      * outgoing traffic rate is under the low "flowstart" watermark. */
182     else if (ident->dampen_state == DAMPEN_FAILED ||
183              table->rate < close_enough(ident->locallimit)) {
184 #if 0
185              || ident->flowstart) {
186         uint32_t target_rate = table->rate;
187
188         if (ident->flowstart) {
189             target_rate *= 4;
190
191             if (table->rate >= FLOW_START_THRESHOLD) {
192                 ident->flowstart = false;
193             }
194         } else {
195             /* June 16, 2008 (KCW)
196              * ident->flowstart gets set initially to one, but it is never set again.  However,
197              * if a limiter gets flows and then the number of flows drops to zero, it has trouble
198              * increasing the limit again. */
199             if (table->rate < FLOW_START_THRESHOLD) {
200                 ident->flowstart = true;
201             }
202         }
203         Old flowstart code.
204 #endif
205         //printf("rate is %d, close enough is %d, difference is %d\n", table->rate, close_enough(ident->locallimit), close_enough(ident->locallimit) - table->rate);
206
207         /* Boost low-limits so that they have room to grow. */
208         if (table->rate < FLOW_START_THRESHOLD) {
209             ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights);
210         } else {
211             ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
212         }
213
214         ideal_over = allocate_fps_over_limit(ident);
215
216         if (ideal_over < ideal_under) {
217             /* Degenerate case in which the agressive weight calculation was
218              * actually less than the under-the-limit case.  Use it instead
219              * and skip the dampening check in the next interval. */
220             ideal_weight = ideal_over;
221             ident->dampen_state = DAMPEN_SKIP;
222         } else {
223             ident->dampen_state = DAMPEN_NONE;
224         }
225
226         /* Apply EWMA. */
227         ident->localweight = (ident->localweight * ident->ewma_weight +
228                               ideal_weight * (1 - ident->ewma_weight));
229     }
230
231     /* At or over the limit.  Use the aggressive weight calculation. */
232     else {
233         double portion_last_interval = 0.0;
234         double portion_this_interval = 0.0;
235
236         ideal_weight = ideal_over = allocate_fps_over_limit(ident);
237         ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
238
239         /* Apply EWMA. */
240         ident->localweight = (ident->localweight * ident->ewma_weight +
241                               ideal_weight * (1 - ident->ewma_weight));
242
243         /* Now check whether the result of the aggressive weight calculation
244          * increases our portion of the weight "too much", in which case we
245          * dampen it. */
246
247         /* Our portion of weight in the whole system during the last interval.*/
248         portion_last_interval = ident->last_localweight / total_weight;
249
250         /* Our proposed portion of weight for the current interval. */
251         portion_this_interval = ident->localweight / (peer_weights + ident->localweight);
252
253         if (ident->dampen_state == DAMPEN_NONE &&
254             (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) {
255             ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
256             ident->dampen_state = DAMPEN_TEST;
257         } else {
258             ident->dampen_state = DAMPEN_SKIP;
259         }
260     }
261
262     /* Add the weight calculated in this interval to the total. */
263     ident->total_weight = total_weight = ident->localweight + peer_weights;
264
265     /* Convert weight value into a rate limit.  If there is no measureable
266      * weight, do a L/n allocation. */
267     if (total_weight > 0) {
268         resulting_limit = (uint32_t) (ident->localweight * ident->effective_limit / total_weight);
269     } else {
270         resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
271     }
272
273     print_statistics(ident, ideal_weight, total_weight, ident->localweight,
274                      identifier, table, resulting_limit);
275
276     return resulting_limit;
277 }
278
279 #ifdef SHADOW_ACCTING
280
281 /* Runs through the allocate functionality without making any state changes to
282  * the identity.  Useful for comparisons, especially for comparing standard
283  * and sample&hold accounting schemes. */
284 static void allocate_fps_pretend(identity_t *ident, double total_weight,
285                                  common_accounting_t *table, const char *identifier) {
286
287     uint32_t resulting_limit = 0;
288     double ideal_weight = 0.0;
289     double peer_weights = total_weight - ident->last_localweight_copy;
290     double ideal_under = 0.0;
291     double ideal_over = 0.0;
292
293     if (peer_weights < 0.0) {
294         peer_weights = 0.0;
295     }
296
297     if (ident->dampen_state_copy == DAMPEN_TEST) {
298         int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
299         double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
300
301         if (rate_delta > threshold) {
302             ident->dampen_state_copy = DAMPEN_PASSED;
303         } else {
304             ident->dampen_state_copy = DAMPEN_FAILED;
305         }
306     }
307
308     /* Rate/weight sanity. */
309     if (table->rate <= 0) {
310         ideal_weight = 0.0;
311     }
312
313     /* Under the limit OR we failed our dampening test OR our current
314      * outgoing traffic rate is under the low "flowstart" watermark. */
315     else if (ident->dampen_state_copy == DAMPEN_FAILED ||
316              table->rate < close_enough(ident->locallimit)) {
317
318         /* Boost low-limits so that they have room to grow. */
319         if (table->rate < FLOW_START_THRESHOLD) {
320             ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights);
321         } else {
322             ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
323         }
324
325         ideal_over = allocate_fps_over_limit(ident);
326
327         if (ideal_over < ideal_under) {
328             /* Degenerate case in which the agressive weight calculation was
329              * actually less than the under-the-limit case.  Use it instead
330              * and skip the dampening check in the next interval. */
331             ideal_weight = ideal_over;
332             ident->dampen_state_copy = DAMPEN_SKIP;
333         } else {
334             ident->dampen_state_copy = DAMPEN_NONE;
335         }
336
337         /* Apply EWMA. */
338         ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight +
339                               ideal_weight * (1 - ident->ewma_weight));
340     }
341
342     /* At or over the limit.  Use the aggressive weight calculation. */
343     else {
344         double portion_last_interval = 0.0;
345         double portion_this_interval = 0.0;
346
347         ideal_weight = ideal_over = allocate_fps_over_limit(ident);
348         ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
349
350         /* Apply EWMA. */
351         ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight +
352                               ideal_weight * (1 - ident->ewma_weight));
353
354         /* Now check whether the result of the aggressive weight calculation
355          * increases our portion of the weight "too much", in which case we
356          * dampen it. */
357
358         /* Our portion of weight in the whole system during the last interval.*/
359         portion_last_interval = ident->last_localweight / total_weight;
360
361         /* Our proposed portion of weight for the current interval. */
362         portion_this_interval = ident->localweight_copy / (peer_weights + ident->localweight_copy);
363
364         if (ident->dampen_state_copy == DAMPEN_NONE &&
365             (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) {
366             ident->localweight_copy = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
367             ident->dampen_state_copy = DAMPEN_TEST;
368         } else {
369             ident->dampen_state_copy = DAMPEN_SKIP;
370         }
371     }
372
373     /* Add the weight calculated in this interval to the total. */
374     total_weight = ident->localweight_copy + peer_weights;
375
376     /* Convert weight value into a rate limit.  If there is no measureable
377      * weight, do a L/n allocation. */
378     if (total_weight > 0) {
379         resulting_limit = (uint32_t) (ident->localweight_copy * ident->effective_limit / total_weight);
380     } else {
381         resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
382     }
383
384     print_statistics(ident, ideal_weight, total_weight, ident->localweight_copy,
385                      identifier, table, resulting_limit);
386 }
387
388 #endif
389
390 /**
391  * Determines the local drop probability for a GRD identity every estimate
392  * interval.
393  */
394 static double allocate_grd(identity_t *ident, double aggdemand) {
395     double dropprob;
396     double min_dropprob = ident->drop_prob * GRD_BIG_DROP;
397
398     struct timeval tv;
399     double time_now;
400     common_accounting_t *table = &ident->common;
401
402     gettimeofday(&tv, NULL);
403     time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
404
405     if (aggdemand > ident->effective_limit) {
406         dropprob = (aggdemand - ident->effective_limit) / aggdemand;
407     } else {
408         dropprob = 0.0;
409     }
410
411     if (dropprob > 0.01 && dropprob < min_dropprob) {
412         dropprob = min_dropprob;
413     }
414
415     if (system_loglevel == LOG_DEBUG) {
416         printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
417            ident->common.rate, aggdemand, dropprob);
418     }
419
420     if (table->max_flow_rate > 0) {
421         printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d %.3f\n",
422              time_now, table->inst_rate, aggdemand,
423              table->num_flows, table->num_flows_5k, table->num_flows_10k,
424              table->num_flows_20k, table->num_flows_50k, table->avg_rate,
425              table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob,
426              ident->id, (double) table->rate / (double) table->max_flow_rate);
427     } else {
428         printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d 0\n",
429              time_now, table->inst_rate, aggdemand,
430              table->num_flows, table->num_flows_5k, table->num_flows_10k,
431              table->num_flows_20k, table->num_flows_50k, table->avg_rate,
432              table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob,
433              ident->id);
434     }
435
436     return dropprob;
437 }
438
439 /** 
440  * Given current estimates of local rate (weight) and remote rates (weights)
441  * use GRD or FPS to calculate a new local limit. 
442  */
443 static void allocate(limiter_t *limiter, identity_t *ident) {
444     /* Represents aggregate rate for GRD and aggregate weight for FPS. */
445     double aggregate = 0;
446
447     /* Read aggregate from comm layer. */
448     read_comm(&aggregate, &ident->effective_limit, &ident->comm, ident->limit);
449     printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", aggregate);
450     
451     /* Experimental printing. */
452     printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
453              (double) ident->common.rate / (double) 128, ident->id);
454     ident->avg_bytes += ident->common.rate;
455     
456     if (limiter->policy == POLICY_FPS) {
457 #ifdef SHADOW_ACCTING
458
459         allocate_fps_pretend(ident, aggregate, &ident->shadow_common, "SHADOW-ID");
460
461         ident->last_localweight_copy = ident->localweight_copy;
462 #endif
463
464         ident->locallimit = allocate_fps(ident, aggregate, &ident->common, "ID");
465         ident->last_localweight = ident->localweight;
466
467         /* Update other limiters with our weight by writing to comm layer. */
468         write_local_value(&ident->comm, ident->localweight);
469     } else {
470         ident->last_drop_prob = ident->drop_prob;
471         ident->drop_prob = allocate_grd(ident, aggregate);
472         
473         /* Update other limiters with our rate by writing to comm layer. */
474         write_local_value(&ident->comm, ident->common.rate);
475     }
476
477     /* Update identity state. */
478     ident->common.last_rate = ident->common.rate;
479 }
480
481 /**
482  * Traces all of the parent pointers of a leaf all the way to the root in
483  * order to find the maximum drop probability in the chain.
484  */
485 static double find_leaf_drop_prob(leaf_t *leaf) {
486     identity_t *current = leaf->parent;
487     double result = 0;
488
489     assert(current);
490
491     while (current != NULL) {
492         if (current->drop_prob > result) {
493             result = current->drop_prob;
494         }
495         current = current->parent;
496     }
497
498     return result;
499 }
500
501 /**
502  * This is called once per estimate interval to enforce the rate that allocate
503  * has decided upon.  It makes calls to tc using system().
504  */
505 static void enforce(limiter_t *limiter, identity_t *ident) {
506     char cmd[CMD_BUFFER_SIZE];
507     int ret = 0;
508     int i = 0;
509
510     switch (limiter->policy) {
511         case POLICY_FPS:
512
513             /* TC treats limits of 0 (8bit) as unlimited, which causes the
514              * entire rate limiting system to become unpredictable.  In
515              * reality, we also don't want any limiter to be able to set its
516              * limit so low that it chokes all of the flows to the point that
517              * they can't increase.  Thus, when we're setting a low limit, we
518              * make sure that it isn't too low by using the
519              * FLOW_START_THRESHOLD. */
520
521             if (ident->locallimit < FLOW_START_THRESHOLD) {
522                 ident->locallimit = FLOW_START_THRESHOLD;
523             }
524
525             /* Do not allow the node to set a limit higher than its
526              * administratively assigned upper limit (bwcap). */
527             if (limiter->nodelimit != 0 && ident->locallimit > limiter->nodelimit) {
528                 ident->locallimit = limiter->nodelimit;
529             }
530
531             if (system_loglevel == LOG_DEBUG) {
532                 printf("FPS: Setting local limit to %d\n", ident->locallimit);
533             }
534             printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
535
536 #if 0
537             if (printcounter == PRINT_COUNTER_RESET) {
538                 if (ident->common.max_flow_rate > 0) {
539                     printlog(LOG_WARN, "%d ID:%d %.3f\n", ident->locallimit, ident->id,
540                              (double) ident->common.rate / (double) ident->common.max_flow_rate);
541                 } else {
542                     printlog(LOG_WARN, "%d ID:%d 0\n", ident->locallimit, ident->id);
543                 }
544             }
545             This is now done in print_statistics()
546 #endif
547
548             snprintf(cmd, CMD_BUFFER_SIZE,
549                      "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
550                      ident->htb_parent, ident->htb_node, ident->locallimit);
551
552             if (do_enforcement) {
553                 ret = system(cmd);
554
555                 if (ret) {
556                     /* FIXME: call failed.  What to do? */
557                     printlog(LOG_CRITICAL, "***TC call failed? Call was: %s***\n", cmd);
558                 }
559             }
560             break;
561
562         case POLICY_GRD:
563             for (i = 0; i < ident->leaf_count; ++i) {
564                 if (ident->drop_prob >= ident->leaves[i]->drop_prob) {
565                     /* The new drop probability for this identity is greater
566                      * than or equal to the leaf's current drop probability.
567                      * We can safely use the larger value at this leaf
568                      * immediately. */
569                     ident->leaves[i]->drop_prob = ident->drop_prob;
570                 } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) {
571                     /* The old drop probability for this identity is less than
572                      * the leaf's current drop probability.  This means that
573                      * this identity couldn't have been the limiting ident,
574                      * so nothing needs to be done because the old limiting
575                      * ident is still the limiting factor. */
576
577                     /* Intentionally blank. */
578                 } else {
579                     /* If neither of the above are true, then...
580                      * 1) The new drop probability for the identity is less
581                      * than what it previously was, and
582                      * 2) This ident may have had the maximum drop probability
583                      * of all idents limiting this leaf, and therefore we need
584                      * to follow the leaf's parents up to the root to find the
585                      * new leaf drop probability safely. */
586                     ident->leaves[i]->drop_prob =
587                             find_leaf_drop_prob(ident->leaves[i]);
588                 }
589
590                 /* Make the call to tc. */
591                 snprintf(cmd, CMD_BUFFER_SIZE,
592                          "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay %dms",
593                          ident->leaves[i]->xid, ident->leaves[i]->xid,
594                          (100 * ident->leaves[i]->drop_prob), ident->leaves[i]->delay);
595
596                 if (do_enforcement) {
597                     ret = system(cmd);
598
599                     if (ret) {
600                         /* FIXME: call failed.  What to do? */
601                         printlog(LOG_CRITICAL, "***TC call failed?***\n");
602                     }
603                 }
604             }
605
606             break;
607
608         default: 
609             printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy);
610             break;
611     }
612
613     return;
614 }
615
616 /**
617  * This function is periodically called to clean the stable instance's flow
618  * accounting tables for each identity.
619  */
620 static void clean(drl_instance_t *instance) {
621     identity_t *ident = NULL;
622
623     map_reset_iterate(instance->ident_map);
624     while ((ident = map_next(instance->ident_map)) != NULL) {
625         pthread_mutex_lock(&ident->table_mutex);
626
627         ident->table_cleanup_function(ident->table);
628
629 #ifdef SHADOW_ACCTING
630
631         standard_table_cleanup((standard_flow_table) ident->shadow_table);
632
633 #endif
634
635         pthread_mutex_unlock(&ident->table_mutex);
636     }
637
638     /* Periodically flush the log file. */
639     flushlog();
640 }
641
642 static void print_averages(drl_instance_t *instance, int print_interval) {
643     identity_t *ident = NULL;
644
645     map_reset_iterate(instance->ident_map);
646     while ((ident = map_next(instance->ident_map)) != NULL) {
647         ident->avg_bytes /= (double) print_interval;
648         //printf("avg_bytes = %f, print_interval = %d\n", ident->avg_bytes, print_interval);
649         printlog(LOG_DEBUG, "%.3f \t Avg rate. ID:%d\n",
650                  ident->avg_bytes / 128, ident->id);
651         //printf("%.3f \t Avg rate. ID:%d\n",
652         //         ident->avg_bytes / 128, ident->id);
653         ident->avg_bytes = 0;
654     }
655 }
656
657 /** Thread function to handle local rate estimation.
658  *
659  * None of our simple hashmap functions are thread safe, so we lock the limiter
660  * with an rwlock to prevent another thread from attempting to modify the set
661  * of identities.
662  *
663  * Each identity also has a private lock for its table.  This gets locked by
664  * table-modifying functions such as estimate and clean. It's also locked in
665  * ulogd_DRL.c when the table is being updated with new packets.
666  */
667 void handle_estimation(void *arg) {
668     limiter_t *limiter = (limiter_t *) arg;
669     int clean_timer, clean_wait_intervals;
670     useconds_t sleep_time = limiter->estintms * 1000;
671     uint32_t cal_slot = 0;
672     int print_interval = 1000 / (limiter->estintms);
673
674     sigset_t signal_mask;
675
676     sigemptyset(&signal_mask);
677     sigaddset(&signal_mask, SIGHUP);
678     sigaddset(&signal_mask, SIGUSR1);
679     sigaddset(&signal_mask, SIGUSR2);
680     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
681
682     /* Determine the number of intervals we should wait before hitting the
683      * specified clean interval. (Converts seconds -> intervals). */
684     clean_wait_intervals = IDENT_CLEAN_INTERVAL * (1000.0 / limiter->estintms);
685     clean_timer = clean_wait_intervals;
686
687     while (true) {
688         printlog(LOG_DEBUG, "--Beginning new tick.--\n");
689
690         /* Sleep according to the delay of the estimate interval. */
691         usleep(sleep_time);
692
693         /* Grab the limiter lock for reading.  This prevents identities from
694          * disappearing beneath our feet. */
695         pthread_rwlock_rdlock(&limiter->limiter_lock);
696
697         cal_slot = limiter->stable_instance.cal_slot & SCHEDMASK;
698
699         /* Service all the identities that are scheduled to run during this
700          * tick. */
701         while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) {
702             identity_action *iaction = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
703             TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, iaction, calendar);
704
705             /* Only execute the action if it is valid. */
706             if (iaction->valid == 0) {
707                 free(iaction);
708                 continue;
709             }
710
711             switch (iaction->action) {
712                 case ACTION_MAINLOOP:
713
714                     printlog(LOG_DEBUG, "Main loop: identity %d\n", iaction->ident->id);
715
716                     /* Update the ident's flow accouting table with the latest info. */
717                     estimate(iaction->ident, limiter->estintms);
718
719                     /* Determine its share of the rate allocation. */
720                     allocate(limiter, iaction->ident);
721
722                     /* Make tc calls to enforce the rate we decided upon. */
723                     enforce(limiter, iaction->ident);
724
725                     /* Add ident back to the queue at a future time slot. */
726                     TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
727                             ((cal_slot + iaction->ident->mainloop_intervals) & SCHEDMASK),
728                             iaction, calendar);
729                     break;
730
731                 case ACTION_COMMUNICATE:
732
733                     printlog(LOG_DEBUG, "Communicating: identity %d\n", iaction->ident->id);
734
735                     /* Tell the comm library to propagate this identity's result for
736                      * this interval.*/
737                     send_update(&iaction->ident->comm, iaction->ident->id);
738
739                     /* Add ident back to the queue at a future time slot. */
740                     TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
741                             ((cal_slot + iaction->ident->communication_intervals) & SCHEDMASK),
742                             iaction, calendar);
743                 break;
744
745                 default:
746                     printlog(LOG_CRITICAL, "Unknown identity action!?!\n");
747                     exit(EXIT_FAILURE);
748             }
749         }
750
751         print_interval--;
752         if (loglevel() == LOG_DEBUG && print_interval <= 0) {
753             print_interval = 1000 / (limiter->estintms);
754             print_averages(&limiter->stable_instance, print_interval);
755         }
756
757         /* Check if enough intervals have passed for cleaning. */
758         if (clean_timer <= 0) {
759             clean(&limiter->stable_instance);
760             clean_timer = clean_wait_intervals;
761         } else {
762             clean_timer--;
763         }
764
765         limiter->stable_instance.cal_slot += 1;
766
767         pthread_rwlock_unlock(&limiter->limiter_lock); 
768     }
769 }