e55639bfd96b6bc521d809c240f50f88d87639e1
[distributedratelimiting.git] / drl / estimate.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * Thread to periodically calculate the estimated local limits
5  * Barath Raghavan 2006/2007
6  * Ken Yocum 2007
7  * Kevin Webb 2007/2008
8  */
9
10 #include <assert.h>
11
12 /** The size of the buffer we use to hold tc commands. */
13 #define CMD_BUFFER_SIZE 200
14
15 /* DRL specifics */
16 #include "raterouter.h" 
17 #include "util.h"
18 #include "ratetypes.h" /* needs util and pthread.h */
19 #include "logging.h"
20
21 static int underlimit_flowcount_count = 0;
22 static int underlimit_normal_count = 0;
23 extern uint8_t system_loglevel;
24
25 /**
26  * Called for each identity each estimate interval.  Uses flow table information
27  * to estimate the current aggregate rate and the rate of the individual flows
28  * in the table.
29  */
30 static void estimate(identity_t *ident) {
31     struct timeval now;
32
33     gettimeofday(&now, NULL);
34
35     pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
36
37     ident->table_update_function(ident->table, now, ident->ewma_weight);
38
39     pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */
40 }
41
42 /**
43  * Determines the FPS weight allocation when the identity is under its current
44  * local rate limit.
45  */
46 static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, double peer_weights) {
47     uint32_t target = local_rate;
48     double ideal_weight;
49     double total_weight = peer_weights + ident->last_localweight;
50
51     if (ident->flowstart) {
52         target = local_rate*4;
53         if (local_rate >= FLOW_START_THRESHOLD) {
54             ident->flowstart = false;
55         }
56     }
57     else {
58         /* June 16, 2008 (KCW)
59          * ident->flowstart gets set initially to one, but it is never set again.  However,
60          * if a limiter gets flows and then the number of flows drops to zero, it has trouble
61          * increasing the limit again. */
62         if (local_rate < FLOW_START_THRESHOLD) {
63             ident->flowstart = true;
64         }
65     }
66
67     if (target >= ident->limit) {
68         ideal_weight = total_weight;
69     } else if (target <= 0) {
70         ideal_weight = 0; // no flows here
71     } else {
72         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
73     }
74
75 #if 0
76     else if (peer_weights <= 0) {
77 #if 0
78         // doesn't matter what we pick as our weight, so pick 1 / N.
79         ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1);
80 #endif
81         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
82     } else {
83 #if 0
84         double divisor = (double) ident->limit - (double) target;
85         ideal_weight = ((double) target * peer_weights) / divisor;
86 #else
87         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
88 #endif
89     }
90 #endif
91
92     return ideal_weight;
93 }
94
95 /**
96  * Determines the FPS weight allocation when the identity is over its current
97  * local rate limit.
98  */
99 static double allocate_fps_over_limit(identity_t *ident) {
100     double ideal_weight;
101
102     if (ident->common.max_flow_rate > 0) {
103         ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate;
104
105         printlog(LOG_DEBUG, "%.3f  %d  %d  FlowCount, TotalRate, MaxRate\n",
106                 ideal_weight, ident->common.rate, ident->common.max_flow_rate);
107     } else {
108         ideal_weight = 1;
109     }
110
111     return ideal_weight;
112 }
113
114 /**
115  * Determines the amount of FPS weight to allocate to the identity during each
116  * estimate interval.  Note that total_weight includes local weight.
117  */
118 static uint32_t allocate_fps(identity_t *ident, double total_weight) {
119     common_accounting_t *ftable = &ident->common; /* Common flow table info */
120     uint32_t local_rate = ftable->rate;
121     uint32_t ideallocal = 0;
122     double peer_weights; /* sum of weights of all other limiters  */
123     double idealweight = 0;
124     double last_portion = 0;
125     double this_portion = 0;
126
127     static int dampen = 0;
128     int dampen_increase = 0;
129
130     double ideal_under = 0;
131     double ideal_over = 0;
132
133     int regime = 0;
134
135     /* two cases:
136        1. the aggregate is < limit
137        2. the aggregate is >= limit
138        */
139     peer_weights = total_weight - ident->last_localweight;
140     if (peer_weights < 0) {
141         peer_weights = 0;
142     }
143
144     if (dampen == 1) {
145         int64_t rate_delta =
146             (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate;
147         double threshold =
148             (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
149
150         if (rate_delta > threshold) {
151             dampen_increase = 1;
152             printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n",
153                      rate_delta, threshold);
154         }
155     }
156
157     if (local_rate <= 0) {
158         idealweight = 0;
159     } else if (dampen_increase == 0 && (ident->locallimit <= 0 || local_rate < ident->locallimit || ident->flowstart)) {
160         /* We're under the limit - all flows are bottlenecked. */
161         idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
162         ideal_over = allocate_fps_over_limit(ident);
163         ideal_under = idealweight;
164
165         if (ideal_over < idealweight) {
166             idealweight = ideal_over;
167             regime = 3;
168             dampen = 2;
169             underlimit_flowcount_count += 1;
170         } else {
171             regime = 1;
172             dampen = 0;
173             underlimit_normal_count += 1;
174         }
175
176         /* Apply EWMA */
177         ident->localweight = (ident->localweight * ident->ewma_weight +
178                               idealweight * (1 - ident->ewma_weight));
179         
180     } else {
181         idealweight = allocate_fps_over_limit(ident);
182         
183         /* Apply EWMA */
184         ident->localweight = (ident->localweight * ident->ewma_weight +
185                               idealweight * (1 - ident->ewma_weight));
186
187         /* This is the portion of the total weight in the system that was caused
188          * by this limiter in the last interval. */
189         last_portion = ident->last_localweight / total_weight;
190
191         /* This is the fraction of the total weight in the system that our
192          * proposed value for idealweight would use. */
193         this_portion = ident->localweight / (peer_weights + ident->localweight);
194
195         /* Dampen the large increase the first time... */
196         if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) {
197             ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
198             dampen = 1;
199         } else {
200             dampen = 2;
201         }
202
203         ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights);
204         ideal_over = idealweight;
205
206         regime = 2;
207     }
208
209     /* Convert weight into a rate - add in our new local weight */
210     ident->total_weight = total_weight = ident->localweight + peer_weights;
211
212     /* compute local allocation:
213        if there is traffic elsewhere, use the weights
214        otherwise do a L/n allocation */
215     if (total_weight > 0) {
216     //if (peer_weights > 0) {
217         ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight);
218     } else {
219         ideallocal = ident->limit / (ident->comm.remote_node_count + 1);
220     }
221
222     printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight);
223
224     printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f  Under / Over / Actual / Rate\n",
225             ideal_under / (ideal_under + peer_weights),
226             ideal_over / (ideal_over + peer_weights),
227             ident->localweight / (ident->localweight + peer_weights),
228             (double) local_rate / (double) ident->limit);
229
230     printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
231
232     if (system_loglevel == LOG_DEBUG) {
233         printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
234             local_rate, idealweight, ident->localweight, total_weight);
235     }
236
237     //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
238     //       dampen, dampen_increase, peer_weights, regime);
239
240     //printf("normal_count: %d, flowcount_count: %d\n", underlimit_normal_count, underlimit_flowcount_count);
241
242     if (regime == 3) {
243         printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
244                  ideal_over, ideal_under);
245     }
246
247     printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
248
249     return(ideallocal);
250 }
251
252 /**
253  * Determines the local drop probability for a GRD identity every estimate
254  * interval.
255  */
256 static double allocate_grd(identity_t *ident, double aggdemand) {
257     double dropprob;
258     double global_limit = (double) (ident->limit);
259
260     if (aggdemand > global_limit) {
261         dropprob = (aggdemand-global_limit)/aggdemand;
262     } else {
263         dropprob = 0.0;
264     }
265     
266     if (system_loglevel == LOG_DEBUG) {
267         printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
268            ident->common.rate, aggdemand, dropprob);
269     }
270
271     return dropprob;
272 }
273
274 /** 
275  * Given current estimates of local rate (weight) and remote rates (weights)
276  * use GRD or FPS to calculate a new local limit. 
277  */
278 static void allocate(limiter_t *limiter, identity_t *ident) {
279     /* Represents aggregate rate for GRD and aggregate weight for FPS. */
280     double comm_val = 0;
281
282     /* Read comm_val from comm layer. */
283     if (limiter->policy == POLICY_FPS) {
284         read_comm(&ident->comm, &comm_val,
285                 ident->total_weight / (double) (ident->comm.remote_node_count + 1));
286     } else {
287         read_comm(&ident->comm, &comm_val,
288                 (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
289     }
290     printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
291
292     /* Experimental printing. */
293     printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
294              (double) ident->common.rate / (double) 128, ident->id);
295     ident->avg_bytes += ident->common.rate;
296     
297     if (limiter->policy == POLICY_FPS) {
298         ident->locallimit = allocate_fps(ident, comm_val);
299         ident->last_localweight = ident->localweight;
300         
301         /* Update other limiters with our weight by writing to comm layer. */
302         write_local_value(&ident->comm, ident->localweight);
303     } else {
304         ident->locallimit = 0; /* Unused with GRD. */
305         ident->last_drop_prob = ident->drop_prob;
306         ident->drop_prob = allocate_grd(ident, comm_val);
307         
308         /* Update other limiters with our rate by writing to comm layer. */
309         write_local_value(&ident->comm, ident->common.rate);
310     }
311
312     /* Update identity state. */
313     ident->common.last_rate = ident->common.rate;
314 }
315
316 /**
317  * Traces all of the parent pointers of a leaf all the way to the root in
318  * order to find the maximum drop probability in the chain.
319  */
320 static double find_leaf_drop_prob(leaf_t *leaf) {
321     identity_t *current = leaf->parent;
322     double result = 0;
323
324     assert(current);
325
326     while (current != NULL) {
327         if (current->drop_prob > result) {
328             result = current->drop_prob;
329         }
330         current = current->parent;
331     }
332
333     return result;
334 }
335
336 /**
337  * This is called once per estimate interval to enforce the rate that allocate
338  * has decided upon.  It makes calls to tc using system().
339  */
340 static void enforce(limiter_t *limiter, identity_t *ident) {
341     char cmd[CMD_BUFFER_SIZE];
342     int ret = 0;
343     int i = 0;
344
345     switch (limiter->policy) {
346         case POLICY_FPS:
347
348             /* TC treats limits of 0 (8bit) as unlimited, which causes the
349              * entire rate limiting system to become unpredictable.  In
350              * reality, we also don't want any limiter to be able to set its
351              * limit so low that it chokes all of the flows to the point that
352              * they can't increase.  Thus, when we're setting a low limit, we
353              * make sure that it isn't too low by using the
354              * FLOW_START_THRESHOLD. */
355
356             if (ident->locallimit < FLOW_START_THRESHOLD) {
357                 ident->locallimit = FLOW_START_THRESHOLD * 2;
358             }
359
360             /* Do not allow the node to set a limit higher than its
361              * administratively assigned upper limit (bwcap). */
362             if (limiter->nodelimit != 0 && ident->locallimit > limiter->nodelimit) {
363                 ident->locallimit = limiter->nodelimit;
364             }
365
366             if (system_loglevel == LOG_DEBUG) {
367                 printf("FPS: Setting local limit to %d\n", ident->locallimit);
368             }
369             printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
370
371             snprintf(cmd, CMD_BUFFER_SIZE,
372                      "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
373                      ident->htb_parent, ident->htb_node, ident->locallimit);
374
375             ret = system(cmd);
376
377             if (ret) {
378                 /* FIXME: call failed.  What to do? */
379             }
380             break;
381
382         case POLICY_GRD:
383             for (i = 0; i < ident->leaf_count; ++i) {
384                 if (ident->drop_prob >= ident->leaves[i]->drop_prob) {
385                     /* The new drop probability for this identity is greater
386                      * than or equal to the leaf's current drop probability.
387                      * We can safely use the larger value at this leaf
388                      * immediately. */
389                     ident->leaves[i]->drop_prob = ident->drop_prob;
390                 } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) {
391                     /* The old drop probability for this identity is less than
392                      * the leaf's current drop probability.  This means that
393                      * this identity couldn't have been the limiting ident,
394                      * so nothing needs to be done because the old limiting
395                      * ident is still the limiting factor. */
396
397                     /* Intentionally blank. */
398                 } else {
399                     /* If neither of the above are true, then...
400                      * 1) The new drop probability for the identity is less
401                      * than what it previously was, and
402                      * 2) This ident may have had the maximum drop probability
403                      * of all idents limiting this leaf, and therefore we need
404                      * to follow the leaf's parents up to the root to find the
405                      * new leaf drop probability safely. */
406                     ident->leaves[i]->drop_prob =
407                             find_leaf_drop_prob(ident->leaves[i]);
408                 }
409
410                 /* Make the call to tc. */
411 #ifdef DELAY40MS
412                 snprintf(cmd, CMD_BUFFER_SIZE,
413                          "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
414                          ident->leaves[i]->xid, ident->leaves[i]->xid,
415                          (100 * ident->leaves[i]->drop_prob));
416 #else
417                 snprintf(cmd, CMD_BUFFER_SIZE,
418                          "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms",
419                          ident->leaves[i]->xid, ident->leaves[i]->xid,
420                          (100 * ident->leaves[i]->drop_prob));
421 #endif
422                 ret = system(cmd);
423
424                 if (ret) {
425                     /* FIXME: call failed.  What to do? */
426                 }
427             }
428
429             break;
430
431         default: 
432             printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy);
433             break;
434     }
435
436     return;
437 }
438
439 /**
440  * This function is periodically called to clean the stable instance's flow
441  * accounting tables for each identity.
442  */
443 static void clean(drl_instance_t *instance) {
444     identity_t *ident = NULL;
445
446     map_reset_iterate(instance->ident_map);
447     while ((ident = map_next(instance->ident_map)) != NULL) {
448         pthread_mutex_lock(&ident->table_mutex);
449
450         ident->table_cleanup_function(ident->table);
451
452         pthread_mutex_unlock(&ident->table_mutex);
453     }
454
455     /* Periodically flush the log file. */
456     flushlog();
457 }
458
459 static void print_averages(drl_instance_t *instance, int print_interval) {
460     identity_t *ident = NULL;
461
462     map_reset_iterate(instance->ident_map);
463     while ((ident = map_next(instance->ident_map)) != NULL) {
464         ident->avg_bytes /= (double) print_interval;
465         //printf("avg_bytes = %f, print_interval = %d\n", ident->avg_bytes, print_interval);
466         printlog(LOG_DEBUG, "%.3f \t Avg rate. ID:%d\n",
467                  ident->avg_bytes / 128, ident->id);
468         //printf("%.3f \t Avg rate. ID:%d\n",
469         //         ident->avg_bytes / 128, ident->id);
470         ident->avg_bytes = 0;
471     }
472 }
473
474 /** Thread function to handle local rate estimation.
475  *
476  * None of our simple hashmap functions are thread safe, so we lock the limiter
477  * with an rwlock to prevent another thread from attempting to modify the set
478  * of identities.
479  *
480  * Each identity also has a private lock for its table.  This gets locked by
481  * table-modifying functions such as estimate and clean.
482  */
483 void handle_estimation(void *arg) {
484     limiter_t *limiter = (limiter_t *) arg;
485     identity_t *ident = NULL;
486     int clean_timer, clean_wait_intervals;
487     useconds_t sleep_time = limiter->estintms * 1000;
488     uint32_t cal_slot = 0;
489     int print_interval = 1000 / (limiter->estintms);
490
491     sigset_t signal_mask;
492
493     sigemptyset(&signal_mask);
494     sigaddset(&signal_mask, SIGHUP);
495     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
496
497     /* Determine the number of intervals we should wait before hitting the
498      * specified clean interval. (Converts seconds -> intervals). */
499     clean_wait_intervals = IDENT_CLEAN_INTERVAL * (1000.0 / limiter->estintms);
500     clean_timer = clean_wait_intervals;
501
502     while (true) {
503         /* Sleep according to the delay of the estimate interval. */
504         usleep(sleep_time);
505
506         /* Grab the limiter lock for reading.  This prevents identities from
507          * disappearing beneath our feet. */
508         pthread_rwlock_rdlock(&limiter->limiter_lock);
509
510         cal_slot = limiter->stable_instance.cal_slot & SCHEDMASK;
511
512         /* Service all the identities that are scheduled to run during this
513          * tick. */
514         while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) {
515             ident = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
516             TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, ident, calendar);
517
518             /* Update the ident's flow accouting table with the latest info. */
519             estimate(ident);
520
521             /* Determine its share of the rate allocation. */
522             allocate(limiter, ident);
523
524             /* Make tc calls to enforce the rate we decided upon. */
525             enforce(limiter, ident);
526
527             /* Tell the comm library to propagate this identity's result for
528              * this interval.*/
529             send_update(&ident->comm, ident->id);
530
531             /* Add ident back to the queue at a future time slot. */
532             TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
533                               ((cal_slot + ident->intervals) & SCHEDMASK),
534                               ident, calendar);
535         }
536
537         print_interval--;
538         if (loglevel() == LOG_DEBUG && print_interval <= 0) {
539             print_interval = 1000 / (limiter->estintms);
540             print_averages(&limiter->stable_instance, print_interval);
541         }
542
543         /* Check if enough intervals have passed for cleaning. */
544         if (clean_timer <= 0) {
545             clean(&limiter->stable_instance);
546             clean_timer = clean_wait_intervals;
547         } else {
548             clean_timer--;
549         }
550
551         limiter->stable_instance.cal_slot += 1;
552
553         pthread_rwlock_unlock(&limiter->limiter_lock); 
554     }
555 }