17a0fd89b5b69c646d1fbf193be1b5ae1c686e35
[distributedratelimiting.git] / drl / estimate.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * Thread to periodically calculate the estimated local limits
5  * Barath Raghavan 2006/2007
6  * Ken Yocum 2007
7  * Kevin Webb 2007/2008
8  */
9
10 #include <assert.h>
11
12 /** The size of the buffer we use to hold tc commands. */
13 #define CMD_BUFFER_SIZE 200
14
15 /* DRL specifics */
16 #include "raterouter.h" 
17 #include "util.h"
18 #include "ratetypes.h" /* needs util and pthread.h */
19 #include "logging.h"
20
21 extern uint8_t system_loglevel;
22 static int printcounter = 8;
23
24 /**
25  * Called for each identity each estimate interval.  Uses flow table information
26  * to estimate the current aggregate rate and the rate of the individual flows
27  * in the table.
28  */
29 static void estimate(identity_t *ident) {
30     struct timeval now;
31
32     gettimeofday(&now, NULL);
33
34     pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
35
36     ident->table_update_function(ident->table, now, ident->ewma_weight);
37
38     pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */
39 }
40
41 /**
42  * Determines the FPS weight allocation when the identity is under its current
43  * local rate limit.
44  */
45 static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, double peer_weights) {
46     uint32_t target = local_rate;
47     double ideal_weight;
48     double total_weight = peer_weights + ident->last_localweight;
49
50     if (ident->flowstart) {
51         target = local_rate*4;
52         if (local_rate >= FLOW_START_THRESHOLD) {
53             ident->flowstart = false;
54         }
55     }
56     else {
57         /* June 16, 2008 (KCW)
58          * ident->flowstart gets set initially to one, but it is never set again.  However,
59          * if a limiter gets flows and then the number of flows drops to zero, it has trouble
60          * increasing the limit again. */
61         if (local_rate < FLOW_START_THRESHOLD) {
62             ident->flowstart = true;
63         }
64     }
65
66     if (target >= ident->limit) {
67         ideal_weight = total_weight;
68     } else if (target <= 0) {
69         ideal_weight = 0; // no flows here
70     } else {
71         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
72     }
73
74 #if 0
75     else if (peer_weights <= 0) {
76 #if 0
77         // doesn't matter what we pick as our weight, so pick 1 / N.
78         ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1);
79 #endif
80         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
81     } else {
82 #if 0
83         double divisor = (double) ident->limit - (double) target;
84         ideal_weight = ((double) target * peer_weights) / divisor;
85 #else
86         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
87 #endif
88     }
89 #endif
90
91     return ideal_weight;
92 }
93
94 /**
95  * Determines the FPS weight allocation when the identity is over its current
96  * local rate limit.
97  */
98 static double allocate_fps_over_limit(identity_t *ident) {
99     double ideal_weight;
100
101     if (ident->common.max_flow_rate > 0) {
102         ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate;
103
104         printlog(LOG_DEBUG, "%.3f  %d  %d  FlowCount, TotalRate, MaxRate\n",
105                 ideal_weight, ident->common.rate, ident->common.max_flow_rate);
106     } else {
107         ideal_weight = 1;
108     }
109
110     return ideal_weight;
111 }
112
113 /**
114  * Determines the amount of FPS weight to allocate to the identity during each
115  * estimate interval.  Note that total_weight includes local weight.
116  */
117 static uint32_t allocate_fps(identity_t *ident, double total_weight) {
118     common_accounting_t *ftable = &ident->common; /* Common flow table info */
119     uint32_t local_rate = ftable->rate;
120     uint32_t ideallocal = 0;
121     double peer_weights; /* sum of weights of all other limiters  */
122     double idealweight = 0;
123     double last_portion = 0;
124     double this_portion = 0;
125
126     static int dampen = 0;
127     int dampen_increase = 0;
128
129     double ideal_under = 0;
130     double ideal_over = 0;
131
132     int regime = 0;
133
134     /* two cases:
135        1. the aggregate is < limit
136        2. the aggregate is >= limit
137        */
138     peer_weights = total_weight - ident->last_localweight;
139     if (peer_weights < 0) {
140         peer_weights = 0;
141     }
142
143     if (dampen == 1) {
144         int64_t rate_delta =
145             (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate;
146         double threshold =
147             (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
148
149         if (rate_delta > threshold) {
150             dampen_increase = 1;
151             printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n",
152                      rate_delta, threshold);
153         }
154     }
155
156     if (local_rate <= 0) {
157         idealweight = 0;
158     } else if (dampen_increase == 0 &&
159                (ident->locallimit <= 0 || local_rate < (ident->locallimit * CLOSE_ENOUGH) || ident->flowstart)) {
160         /* We're under the limit - all flows are bottlenecked. */
161         idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
162         ideal_over = allocate_fps_over_limit(ident);
163         ideal_under = idealweight;
164
165         if (ideal_over < idealweight) {
166             idealweight = ideal_over;
167             regime = 3;
168             dampen = 2;
169         } else {
170             regime = 1;
171             dampen = 0;
172         }
173
174         /* Apply EWMA */
175         ident->localweight = (ident->localweight * ident->ewma_weight +
176                               idealweight * (1 - ident->ewma_weight));
177         
178     } else {
179         idealweight = allocate_fps_over_limit(ident);
180         
181         /* Apply EWMA */
182         ident->localweight = (ident->localweight * ident->ewma_weight +
183                               idealweight * (1 - ident->ewma_weight));
184
185         /* This is the portion of the total weight in the system that was caused
186          * by this limiter in the last interval. */
187         last_portion = ident->last_localweight / total_weight;
188
189         /* This is the fraction of the total weight in the system that our
190          * proposed value for idealweight would use. */
191         this_portion = ident->localweight / (peer_weights + ident->localweight);
192
193         /* Dampen the large increase the first time... */
194         if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) {
195             ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
196             dampen = 1;
197         } else {
198             dampen = 2;
199         }
200
201         ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights);
202         ideal_over = idealweight;
203
204         regime = 2;
205     }
206
207     /* Convert weight into a rate - add in our new local weight */
208     ident->total_weight = total_weight = ident->localweight + peer_weights;
209
210     /* compute local allocation:
211        if there is traffic elsewhere, use the weights
212        otherwise do a L/n allocation */
213     if (total_weight > 0) {
214     //if (peer_weights > 0) {
215         ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight);
216     } else {
217         ideallocal = ident->limit / (ident->comm.remote_node_count + 1);
218     }
219
220     printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight);
221
222     printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f  Under / Over / Actual / Rate\n",
223             ideal_under / (ideal_under + peer_weights),
224             ideal_over / (ideal_over + peer_weights),
225             ident->localweight / (ident->localweight + peer_weights),
226             (double) local_rate / (double) ident->limit);
227
228     printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
229
230     if (system_loglevel == LOG_DEBUG) {
231         printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
232             local_rate, idealweight, ident->localweight, total_weight);
233     }
234
235     if (printcounter <= 0) {
236         printlog(LOG_WARN, "%d %.1f %.1f %.1f\n", local_rate, idealweight, ident->localweight, total_weight);
237         printcounter = 8;
238     } else {
239         printcounter -= 1;
240     }
241
242     //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
243     //       dampen, dampen_increase, peer_weights, regime);
244
245     if (regime == 3) {
246         printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
247                  ideal_over, ideal_under);
248     }
249
250     printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
251
252     return(ideallocal);
253 }
254
255 /**
256  * Determines the local drop probability for a GRD identity every estimate
257  * interval.
258  */
259 static double allocate_grd(identity_t *ident, double aggdemand) {
260     double dropprob;
261     double global_limit = (double) (ident->limit);
262
263     if (aggdemand > global_limit) {
264         dropprob = (aggdemand-global_limit)/aggdemand;
265     } else {
266         dropprob = 0.0;
267     }
268     
269     if (system_loglevel == LOG_DEBUG) {
270         printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
271            ident->common.rate, aggdemand, dropprob);
272     }
273
274     return dropprob;
275 }
276
277 /** 
278  * Given current estimates of local rate (weight) and remote rates (weights)
279  * use GRD or FPS to calculate a new local limit. 
280  */
281 static void allocate(limiter_t *limiter, identity_t *ident) {
282     /* Represents aggregate rate for GRD and aggregate weight for FPS. */
283     double comm_val = 0;
284
285     /* Read comm_val from comm layer. */
286     if (limiter->policy == POLICY_FPS) {
287         read_comm(&ident->comm, &comm_val,
288                 ident->total_weight / (double) (ident->comm.remote_node_count + 1));
289     } else {
290         read_comm(&ident->comm, &comm_val,
291                 (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
292     }
293     printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
294
295     /* Experimental printing. */
296     printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
297              (double) ident->common.rate / (double) 128, ident->id);
298     ident->avg_bytes += ident->common.rate;
299     
300     if (limiter->policy == POLICY_FPS) {
301         ident->locallimit = allocate_fps(ident, comm_val);
302         ident->last_localweight = ident->localweight;
303         
304         /* Update other limiters with our weight by writing to comm layer. */
305         write_local_value(&ident->comm, ident->localweight);
306     } else {
307         ident->locallimit = 0; /* Unused with GRD. */
308         ident->last_drop_prob = ident->drop_prob;
309         ident->drop_prob = allocate_grd(ident, comm_val);
310         
311         /* Update other limiters with our rate by writing to comm layer. */
312         write_local_value(&ident->comm, ident->common.rate);
313     }
314
315     /* Update identity state. */
316     ident->common.last_rate = ident->common.rate;
317 }
318
319 /**
320  * Traces all of the parent pointers of a leaf all the way to the root in
321  * order to find the maximum drop probability in the chain.
322  */
323 static double find_leaf_drop_prob(leaf_t *leaf) {
324     identity_t *current = leaf->parent;
325     double result = 0;
326
327     assert(current);
328
329     while (current != NULL) {
330         if (current->drop_prob > result) {
331             result = current->drop_prob;
332         }
333         current = current->parent;
334     }
335
336     return result;
337 }
338
339 /**
340  * This is called once per estimate interval to enforce the rate that allocate
341  * has decided upon.  It makes calls to tc using system().
342  */
343 static void enforce(limiter_t *limiter, identity_t *ident) {
344     char cmd[CMD_BUFFER_SIZE];
345     int ret = 0;
346     int i = 0;
347
348     switch (limiter->policy) {
349         case POLICY_FPS:
350
351             /* TC treats limits of 0 (8bit) as unlimited, which causes the
352              * entire rate limiting system to become unpredictable.  In
353              * reality, we also don't want any limiter to be able to set its
354              * limit so low that it chokes all of the flows to the point that
355              * they can't increase.  Thus, when we're setting a low limit, we
356              * make sure that it isn't too low by using the
357              * FLOW_START_THRESHOLD. */
358
359             if (ident->locallimit < FLOW_START_THRESHOLD) {
360                 ident->locallimit = FLOW_START_THRESHOLD;
361             }
362
363             /* Do not allow the node to set a limit higher than its
364              * administratively assigned upper limit (bwcap). */
365             if (limiter->nodelimit != 0 && ident->locallimit > limiter->nodelimit) {
366                 ident->locallimit = limiter->nodelimit;
367             }
368
369             if (system_loglevel == LOG_DEBUG) {
370                 printf("FPS: Setting local limit to %d\n", ident->locallimit);
371             }
372             printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
373             printlog(LOG_WARN, "%d\n", ident->locallimit);
374
375             snprintf(cmd, CMD_BUFFER_SIZE,
376                      "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
377                      ident->htb_parent, ident->htb_node, ident->locallimit);
378
379             ret = system(cmd);
380
381             if (ret) {
382                 /* FIXME: call failed.  What to do? */
383             }
384             break;
385
386         case POLICY_GRD:
387             for (i = 0; i < ident->leaf_count; ++i) {
388                 if (ident->drop_prob >= ident->leaves[i]->drop_prob) {
389                     /* The new drop probability for this identity is greater
390                      * than or equal to the leaf's current drop probability.
391                      * We can safely use the larger value at this leaf
392                      * immediately. */
393                     ident->leaves[i]->drop_prob = ident->drop_prob;
394                 } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) {
395                     /* The old drop probability for this identity is less than
396                      * the leaf's current drop probability.  This means that
397                      * this identity couldn't have been the limiting ident,
398                      * so nothing needs to be done because the old limiting
399                      * ident is still the limiting factor. */
400
401                     /* Intentionally blank. */
402                 } else {
403                     /* If neither of the above are true, then...
404                      * 1) The new drop probability for the identity is less
405                      * than what it previously was, and
406                      * 2) This ident may have had the maximum drop probability
407                      * of all idents limiting this leaf, and therefore we need
408                      * to follow the leaf's parents up to the root to find the
409                      * new leaf drop probability safely. */
410                     ident->leaves[i]->drop_prob =
411                             find_leaf_drop_prob(ident->leaves[i]);
412                 }
413
414                 /* Make the call to tc. */
415 #ifdef DELAY40MS
416                 snprintf(cmd, CMD_BUFFER_SIZE,
417                          "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
418                          ident->leaves[i]->xid, ident->leaves[i]->xid,
419                          (100 * ident->leaves[i]->drop_prob));
420 #else
421                 snprintf(cmd, CMD_BUFFER_SIZE,
422                          "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms",
423                          ident->leaves[i]->xid, ident->leaves[i]->xid,
424                          (100 * ident->leaves[i]->drop_prob));
425 #endif
426                 ret = system(cmd);
427
428                 if (ret) {
429                     /* FIXME: call failed.  What to do? */
430                 }
431             }
432
433             break;
434
435         default: 
436             printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy);
437             break;
438     }
439
440     return;
441 }
442
443 /**
444  * This function is periodically called to clean the stable instance's flow
445  * accounting tables for each identity.
446  */
447 static void clean(drl_instance_t *instance) {
448     identity_t *ident = NULL;
449
450     map_reset_iterate(instance->ident_map);
451     while ((ident = map_next(instance->ident_map)) != NULL) {
452         pthread_mutex_lock(&ident->table_mutex);
453
454         ident->table_cleanup_function(ident->table);
455
456         pthread_mutex_unlock(&ident->table_mutex);
457     }
458
459     /* Periodically flush the log file. */
460     flushlog();
461 }
462
463 static void print_averages(drl_instance_t *instance, int print_interval) {
464     identity_t *ident = NULL;
465
466     map_reset_iterate(instance->ident_map);
467     while ((ident = map_next(instance->ident_map)) != NULL) {
468         ident->avg_bytes /= (double) print_interval;
469         //printf("avg_bytes = %f, print_interval = %d\n", ident->avg_bytes, print_interval);
470         printlog(LOG_DEBUG, "%.3f \t Avg rate. ID:%d\n",
471                  ident->avg_bytes / 128, ident->id);
472         //printf("%.3f \t Avg rate. ID:%d\n",
473         //         ident->avg_bytes / 128, ident->id);
474         ident->avg_bytes = 0;
475     }
476 }
477
478 /** Thread function to handle local rate estimation.
479  *
480  * None of our simple hashmap functions are thread safe, so we lock the limiter
481  * with an rwlock to prevent another thread from attempting to modify the set
482  * of identities.
483  *
484  * Each identity also has a private lock for its table.  This gets locked by
485  * table-modifying functions such as estimate and clean.
486  */
487 void handle_estimation(void *arg) {
488     limiter_t *limiter = (limiter_t *) arg;
489     identity_t *ident = NULL;
490     int clean_timer, clean_wait_intervals;
491     useconds_t sleep_time = limiter->estintms * 1000;
492     uint32_t cal_slot = 0;
493     int print_interval = 1000 / (limiter->estintms);
494
495     sigset_t signal_mask;
496
497     sigemptyset(&signal_mask);
498     sigaddset(&signal_mask, SIGHUP);
499     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
500
501     /* Determine the number of intervals we should wait before hitting the
502      * specified clean interval. (Converts seconds -> intervals). */
503     clean_wait_intervals = IDENT_CLEAN_INTERVAL * (1000.0 / limiter->estintms);
504     clean_timer = clean_wait_intervals;
505
506     while (true) {
507         /* Sleep according to the delay of the estimate interval. */
508         usleep(sleep_time);
509
510         /* Grab the limiter lock for reading.  This prevents identities from
511          * disappearing beneath our feet. */
512         pthread_rwlock_rdlock(&limiter->limiter_lock);
513
514         cal_slot = limiter->stable_instance.cal_slot & SCHEDMASK;
515
516         /* Service all the identities that are scheduled to run during this
517          * tick. */
518         while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) {
519             ident = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
520             TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, ident, calendar);
521
522             /* Update the ident's flow accouting table with the latest info. */
523             estimate(ident);
524
525             /* Determine its share of the rate allocation. */
526             allocate(limiter, ident);
527
528             /* Make tc calls to enforce the rate we decided upon. */
529             enforce(limiter, ident);
530
531             /* Tell the comm library to propagate this identity's result for
532              * this interval.*/
533             send_update(&ident->comm, ident->id);
534
535             /* Add ident back to the queue at a future time slot. */
536             TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
537                               ((cal_slot + ident->intervals) & SCHEDMASK),
538                               ident, calendar);
539         }
540
541         print_interval--;
542         if (loglevel() == LOG_DEBUG && print_interval <= 0) {
543             print_interval = 1000 / (limiter->estintms);
544             print_averages(&limiter->stable_instance, print_interval);
545         }
546
547         /* Check if enough intervals have passed for cleaning. */
548         if (clean_timer <= 0) {
549             clean(&limiter->stable_instance);
550             clean_timer = clean_wait_intervals;
551         } else {
552             clean_timer--;
553         }
554
555         limiter->stable_instance.cal_slot += 1;
556
557         pthread_rwlock_unlock(&limiter->limiter_lock); 
558     }
559 }