e42d7a6f83d229630e1e75e90253f7da753471b8
[distributedratelimiting.git] / drl / estimate.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * Thread to periodically calculate the estimated local limits
5  * Barath Raghavan 2006/2007
6  * Ken Yocum 2007
7  * Kevin Webb 2007/2008
8  */
9
10 #include <assert.h>
11
12 /** The size of the buffer we use to hold tc commands. */
13 #define CMD_BUFFER_SIZE 200
14
15 /* DRL specifics */
16 #include "raterouter.h" 
17 #include "util.h"
18 #include "ratetypes.h" /* needs util and pthread.h */
19 #include "logging.h"
20
21 #define PRINT_COUNTER_RESET (0)
22
23 extern uint8_t system_loglevel;
24 static int printcounter = PRINT_COUNTER_RESET - 1;
25
26 uint8_t do_enforcement = 0;
27
28 /**
29  * Called for each identity each estimate interval.  Uses flow table information
30  * to estimate the current aggregate rate and the rate of the individual flows
31  * in the table.
32  */
33 static void estimate(identity_t *ident) {
34     struct timeval now;
35
36     gettimeofday(&now, NULL);
37
38     pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
39
40     ident->table_update_function(ident->table, now, ident->ewma_weight);
41
42     pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */
43 }
44
45 /**
46  * Determines the FPS weight allocation when the identity is under its current
47  * local rate limit.
48  */
49 static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, double peer_weights) {
50     uint32_t target = local_rate;
51     double ideal_weight;
52     double total_weight = peer_weights + ident->last_localweight;
53
54     if (ident->flowstart) {
55         target = local_rate*4;
56         if (local_rate >= FLOW_START_THRESHOLD) {
57             ident->flowstart = false;
58         }
59     }
60     else {
61         /* June 16, 2008 (KCW)
62          * ident->flowstart gets set initially to one, but it is never set again.  However,
63          * if a limiter gets flows and then the number of flows drops to zero, it has trouble
64          * increasing the limit again. */
65         if (local_rate < FLOW_START_THRESHOLD) {
66             ident->flowstart = true;
67         }
68     }
69
70     if (target >= ident->limit) {
71         ideal_weight = total_weight;
72     } else if (target <= 0) {
73         ideal_weight = 0; // no flows here
74     } else {
75         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
76     }
77
78 #if 0
79     else if (peer_weights <= 0) {
80 #if 0
81         // doesn't matter what we pick as our weight, so pick 1 / N.
82         ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1);
83 #endif
84         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
85     } else {
86 #if 0
87         double divisor = (double) ident->limit - (double) target;
88         ideal_weight = ((double) target * peer_weights) / divisor;
89 #else
90         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
91 #endif
92     }
93 #endif
94
95     return ideal_weight;
96 }
97
98 /**
99  * Determines the FPS weight allocation when the identity is over its current
100  * local rate limit.
101  */
102 static double allocate_fps_over_limit(identity_t *ident) {
103     double ideal_weight;
104
105     if (ident->common.max_flow_rate > 0) {
106         ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate;
107
108         printlog(LOG_DEBUG, "%.3f  %d  %d  %d  FlowCount, Limit, MaxRate, TotalRate\n",
109                 ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate);
110     } else {
111         ideal_weight = 1;
112     }
113
114     return ideal_weight;
115 }
116
117 /**
118  * Determines the amount of FPS weight to allocate to the identity during each
119  * estimate interval.  Note that total_weight includes local weight.
120  */
121 static uint32_t allocate_fps(identity_t *ident, double total_weight) {
122     common_accounting_t *ftable = &ident->common; /* Common flow table info */
123     uint32_t local_rate = ftable->rate;
124     uint32_t ideallocal = 0;
125     double peer_weights; /* sum of weights of all other limiters */
126     double idealweight = 0;
127     double last_portion = 0;
128     double this_portion = 0;
129
130     static int dampen = 0;
131     int dampen_increase = 0;
132
133     double ideal_under = 0;
134     double ideal_over = 0;
135
136     int regime = 0;
137
138     /* two cases:
139        1. the aggregate is < limit
140        2. the aggregate is >= limit
141        */
142     peer_weights = total_weight - ident->last_localweight;
143     if (peer_weights < 0) {
144         peer_weights = 0;
145     }
146
147     if (dampen == 1) {
148         int64_t rate_delta =
149             (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate;
150         double threshold =
151             (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
152
153         if (rate_delta > threshold) {
154             dampen_increase = 1;
155             printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n",
156                      rate_delta, threshold);
157         }
158     }
159
160     if (local_rate <= 0) {
161         idealweight = 0;
162     } else if (dampen_increase == 0 &&
163                (ident->locallimit <= 0 || local_rate < (ident->locallimit * CLOSE_ENOUGH) || ident->flowstart)) {
164         /* We're under the limit - all flows are bottlenecked. */
165         idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
166         ideal_over = allocate_fps_over_limit(ident);
167         ideal_under = idealweight;
168
169         if (ideal_over < idealweight) {
170             idealweight = ideal_over;
171             regime = 3;
172             dampen = 2;
173         } else {
174             regime = 1;
175             dampen = 0;
176         }
177
178         /* Apply EWMA */
179         ident->localweight = (ident->localweight * ident->ewma_weight +
180                               idealweight * (1 - ident->ewma_weight));
181         
182     } else {
183         idealweight = allocate_fps_over_limit(ident);
184         
185         /* Apply EWMA */
186         ident->localweight = (ident->localweight * ident->ewma_weight +
187                               idealweight * (1 - ident->ewma_weight));
188
189         /* This is the portion of the total weight in the system that was caused
190          * by this limiter in the last interval. */
191         last_portion = ident->last_localweight / total_weight;
192
193         /* This is the fraction of the total weight in the system that our
194          * proposed value for idealweight would use. */
195         this_portion = ident->localweight / (peer_weights + ident->localweight);
196
197         /* Dampen the large increase the first time... */
198         if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) {
199             ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
200             dampen = 1;
201         } else {
202             dampen = 2;
203         }
204
205         ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights);
206         ideal_over = idealweight;
207
208         regime = 2;
209     }
210
211     /* Convert weight into a rate - add in our new local weight */
212     ident->total_weight = total_weight = ident->localweight + peer_weights;
213
214     /* compute local allocation:
215        if there is traffic elsewhere, use the weights
216        otherwise do a L/n allocation */
217     if (total_weight > 0) {
218     //if (peer_weights > 0) {
219         ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight);
220     } else {
221         ideallocal = ident->limit / (ident->comm.remote_node_count + 1);
222     }
223
224     printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight);
225
226     printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f  Under / Over / Actual / Rate\n",
227             ideal_under / (ideal_under + peer_weights),
228             ideal_over / (ideal_over + peer_weights),
229             ident->localweight / (ident->localweight + peer_weights),
230             (double) local_rate / (double) ident->limit);
231
232     printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
233
234     if (system_loglevel == LOG_DEBUG) {
235         printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
236             local_rate, idealweight, ident->localweight, total_weight);
237     }
238
239     if (printcounter <= 0) {
240         struct timeval tv;
241         double time_now;
242
243         gettimeofday(&tv, NULL);
244         time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
245
246         printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d ", time_now, ftable->inst_rate,
247             idealweight, ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k,
248             ftable->num_flows_10k, ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate,
249             ftable->max_flow_rate, ftable->max_flow_rate_flow_hash);
250
251         printcounter = PRINT_COUNTER_RESET;
252     } else {
253         printcounter -= 1;
254     }
255
256     //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
257     //       dampen, dampen_increase, peer_weights, regime);
258
259     if (regime == 3) {
260         printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
261                  ideal_over, ideal_under);
262     }
263
264     printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
265
266     return(ideallocal);
267 }
268
269 /**
270  * Determines the local drop probability for a GRD identity every estimate
271  * interval.
272  */
273 static double allocate_grd(identity_t *ident, double aggdemand) {
274     double dropprob;
275     double global_limit = (double) (ident->limit);
276
277     if (aggdemand > global_limit) {
278         dropprob = (aggdemand-global_limit)/aggdemand;
279     } else {
280         dropprob = 0.0;
281     }
282     
283     if (system_loglevel == LOG_DEBUG) {
284         printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
285            ident->common.rate, aggdemand, dropprob);
286     }
287
288     return dropprob;
289 }
290
/** 
 * Given current estimates of local rate (weight) and remote rates (weights)
 * use GRD or FPS to calculate a new local limit. 
 *
 * For FPS, the comm layer yields the aggregate weight and allocate_fps()
 * converts it to a local rate limit; for GRD it yields the aggregate rate
 * and allocate_grd() converts it to a drop probability.  Either way, our
 * own contribution for this interval is written back to the comm layer.
 */
static void allocate(limiter_t *limiter, identity_t *ident) {
    /* Represents aggregate rate for GRD and aggregate weight for FPS. */
    double comm_val = 0;

    /* Read comm_val from comm layer. */
    /* NOTE(review): the third read_comm argument appears to be a fallback
     * per-node (value / n) share used when peer data is unavailable --
     * confirm against the comm library. */
    if (limiter->policy == POLICY_FPS) {
        read_comm(&ident->comm, &comm_val,
                ident->total_weight / (double) (ident->comm.remote_node_count + 1));
    } else {
        read_comm(&ident->comm, &comm_val,
                (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
    }
    printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);

    /* Experimental printing. */
    printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
             (double) ident->common.rate / (double) 128, ident->id);
    /* Accumulate bytes for the periodic average printed by print_averages(). */
    ident->avg_bytes += ident->common.rate;
    
    if (limiter->policy == POLICY_FPS) {
        ident->locallimit = allocate_fps(ident, comm_val);
        ident->last_localweight = ident->localweight;
        
        /* Update other limiters with our weight by writing to comm layer. */
        write_local_value(&ident->comm, ident->localweight);
    } else {
        ident->locallimit = 0; /* Unused with GRD. */
        ident->last_drop_prob = ident->drop_prob;
        ident->drop_prob = allocate_grd(ident, comm_val);
        
        /* Update other limiters with our rate by writing to comm layer. */
        write_local_value(&ident->comm, ident->common.rate);
    }

    /* Update identity state. */
    ident->common.last_rate = ident->common.rate;
}
332
333 /**
334  * Traces all of the parent pointers of a leaf all the way to the root in
335  * order to find the maximum drop probability in the chain.
336  */
337 static double find_leaf_drop_prob(leaf_t *leaf) {
338     identity_t *current = leaf->parent;
339     double result = 0;
340
341     assert(current);
342
343     while (current != NULL) {
344         if (current->drop_prob > result) {
345             result = current->drop_prob;
346         }
347         current = current->parent;
348     }
349
350     return result;
351 }
352
/**
 * This is called once per estimate interval to enforce the rate that allocate
 * has decided upon.  It makes calls to tc using system().
 *
 * FPS: clamps the identity's locallimit into [FLOW_START_THRESHOLD,
 * nodelimit] and issues one `tc class change` for the identity's HTB class.
 * GRD: updates each leaf's drop probability and issues one `tc qdisc change`
 * (netem loss) per leaf.  Enforcement is skipped when do_enforcement is 0.
 */
static void enforce(limiter_t *limiter, identity_t *ident) {
    char cmd[CMD_BUFFER_SIZE];
    int ret = 0;
    int i = 0;

    switch (limiter->policy) {
        case POLICY_FPS:

            /* TC treats limits of 0 (8bit) as unlimited, which causes the
             * entire rate limiting system to become unpredictable.  In
             * reality, we also don't want any limiter to be able to set its
             * limit so low that it chokes all of the flows to the point that
             * they can't increase.  Thus, when we're setting a low limit, we
             * make sure that it isn't too low by using the
             * FLOW_START_THRESHOLD. */

            if (ident->locallimit < FLOW_START_THRESHOLD) {
                ident->locallimit = FLOW_START_THRESHOLD;
            }

            /* Do not allow the node to set a limit higher than its
             * administratively assigned upper limit (bwcap). */
            if (limiter->nodelimit != 0 && ident->locallimit > limiter->nodelimit) {
                ident->locallimit = limiter->nodelimit;
            }

            if (system_loglevel == LOG_DEBUG) {
                printf("FPS: Setting local limit to %d\n", ident->locallimit);
            }
            printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);

            /* Mirror the limit into the WARN-level log on the same intervals
             * as allocate_fps()'s summary line (printcounter was just reset). */
            if (printcounter == PRINT_COUNTER_RESET) {
                printlog(LOG_WARN, "%d\n", ident->locallimit);
            }

            /* "rate 8bit" keeps the class's guaranteed rate at the minimum;
             * the actual limit is enforced through the ceil parameter. */
            snprintf(cmd, CMD_BUFFER_SIZE,
                     "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
                     ident->htb_parent, ident->htb_node, ident->locallimit);

            if (do_enforcement) {
                ret = system(cmd);

                if (ret) {
                    /* FIXME: call failed.  What to do? */
                    printlog(LOG_CRITICAL, "***TC call failed?***\n");
                }
            }
            break;

        case POLICY_GRD:
            for (i = 0; i < ident->leaf_count; ++i) {
                if (ident->drop_prob >= ident->leaves[i]->drop_prob) {
                    /* The new drop probability for this identity is greater
                     * than or equal to the leaf's current drop probability.
                     * We can safely use the larger value at this leaf
                     * immediately. */
                    ident->leaves[i]->drop_prob = ident->drop_prob;
                } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) {
                    /* The old drop probability for this identity is less than
                     * the leaf's current drop probability.  This means that
                     * this identity couldn't have been the limiting ident,
                     * so nothing needs to be done because the old limiting
                     * ident is still the limiting factor. */

                    /* Intentionally blank. */
                } else {
                    /* If neither of the above are true, then...
                     * 1) The new drop probability for the identity is less
                     * than what it previously was, and
                     * 2) This ident may have had the maximum drop probability
                     * of all idents limiting this leaf, and therefore we need
                     * to follow the leaf's parents up to the root to find the
                     * new leaf drop probability safely. */
                    ident->leaves[i]->drop_prob =
                            find_leaf_drop_prob(ident->leaves[i]);
                }

                /* Make the call to tc. */
#ifdef DELAY40MS
                snprintf(cmd, CMD_BUFFER_SIZE,
                         "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
                         ident->leaves[i]->xid, ident->leaves[i]->xid,
                         (100 * ident->leaves[i]->drop_prob));
#else
                snprintf(cmd, CMD_BUFFER_SIZE,
                         "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms",
                         ident->leaves[i]->xid, ident->leaves[i]->xid,
                         (100 * ident->leaves[i]->drop_prob));
#endif
                if (do_enforcement) {
                    ret = system(cmd);

                    if (ret) {
                        /* FIXME: call failed.  What to do? */
                        printlog(LOG_CRITICAL, "***TC call failed?***\n");
                    }
                }
            }

            break;

        default: 
            printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy);
            break;
    }

    return;
}
465
466 /**
467  * This function is periodically called to clean the stable instance's flow
468  * accounting tables for each identity.
469  */
470 static void clean(drl_instance_t *instance) {
471     identity_t *ident = NULL;
472
473     map_reset_iterate(instance->ident_map);
474     while ((ident = map_next(instance->ident_map)) != NULL) {
475         pthread_mutex_lock(&ident->table_mutex);
476
477         ident->table_cleanup_function(ident->table);
478
479         pthread_mutex_unlock(&ident->table_mutex);
480     }
481
482     /* Periodically flush the log file. */
483     flushlog();
484 }
485
486 static void print_averages(drl_instance_t *instance, int print_interval) {
487     identity_t *ident = NULL;
488
489     map_reset_iterate(instance->ident_map);
490     while ((ident = map_next(instance->ident_map)) != NULL) {
491         ident->avg_bytes /= (double) print_interval;
492         //printf("avg_bytes = %f, print_interval = %d\n", ident->avg_bytes, print_interval);
493         printlog(LOG_DEBUG, "%.3f \t Avg rate. ID:%d\n",
494                  ident->avg_bytes / 128, ident->id);
495         //printf("%.3f \t Avg rate. ID:%d\n",
496         //         ident->avg_bytes / 128, ident->id);
497         ident->avg_bytes = 0;
498     }
499 }
500
/** Thread function to handle local rate estimation.
 *
 * None of our simple hashmap functions are thread safe, so we lock the limiter
 * with an rwlock to prevent another thread from attempting to modify the set
 * of identities.
 *
 * Each identity also has a private lock for its table.  This gets locked by
 * table-modifying functions such as estimate and clean.
 */
void handle_estimation(void *arg) {
    limiter_t *limiter = (limiter_t *) arg;
    identity_t *ident = NULL;
    int clean_timer, clean_wait_intervals;
    useconds_t sleep_time = limiter->estintms * 1000;
    uint32_t cal_slot = 0;
    int print_interval = 1000 / (limiter->estintms);

    sigset_t signal_mask;

    /* Block SIGHUP/SIGUSR1 in this thread so they are delivered to the
     * thread that actually handles them. */
    sigemptyset(&signal_mask);
    sigaddset(&signal_mask, SIGHUP);
    sigaddset(&signal_mask, SIGUSR1);
    pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);

    /* Determine the number of intervals we should wait before hitting the
     * specified clean interval. (Converts seconds -> intervals). */
    clean_wait_intervals = IDENT_CLEAN_INTERVAL * (1000.0 / limiter->estintms);
    clean_timer = clean_wait_intervals;

    while (true) {
        /* Sleep according to the delay of the estimate interval. */
        usleep(sleep_time);

        /* Grab the limiter lock for reading.  This prevents identities from
         * disappearing beneath our feet. */
        pthread_rwlock_rdlock(&limiter->limiter_lock);

        /* The calendar queue is a power-of-two ring; mask selects the slot. */
        cal_slot = limiter->stable_instance.cal_slot & SCHEDMASK;

        /* Service all the identities that are scheduled to run during this
         * tick. */
        while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) {
            ident = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
            TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, ident, calendar);

            /* Update the ident's flow accouting table with the latest info. */
            estimate(ident);

            /* Determine its share of the rate allocation. */
            allocate(limiter, ident);

            /* Make tc calls to enforce the rate we decided upon. */
            enforce(limiter, ident);

            /* Tell the comm library to propagate this identity's result for
             * this interval.*/
            send_update(&ident->comm, ident->id);

            /* Add ident back to the queue at a future time slot. */
            TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
                              ((cal_slot + ident->intervals) & SCHEDMASK),
                              ident, calendar);
        }

        /* NOTE(review): print_interval is only reset inside the LOG_DEBUG
         * branch, so at other log levels it decrements without bound --
         * harmless for the condition below, but confirm this is intended. */
        print_interval--;
        if (loglevel() == LOG_DEBUG && print_interval <= 0) {
            print_interval = 1000 / (limiter->estintms);
            print_averages(&limiter->stable_instance, print_interval);
        }

        /* Check if enough intervals have passed for cleaning. */
        if (clean_timer <= 0) {
            clean(&limiter->stable_instance);
            clean_timer = clean_wait_intervals;
        } else {
            clean_timer--;
        }

        limiter->stable_instance.cal_slot += 1;

        pthread_rwlock_unlock(&limiter->limiter_lock); 
    }
}