Importing all of DRL, including ulogd and all of its files.
[distributedratelimiting.git] / drl / estimate.c
1 /* See the DRL-LICENSE file for this file's software license. */
2
3 /*
4  * Thread to periodically calculate the estimated local limits
5  * Barath Raghavan 2006/2007
6  * Ken Yocum 2007
7  * Kevin Webb 2007/2008
8  */
9
10 /** The size of the buffer we use to hold tc commands. */
11 #define CMD_BUFFER_SIZE 200
12
13 /* DRL specifics */
14 #include "raterouter.h" 
15 #include "util.h"
16 #include "ratetypes.h" /* needs util and pthread.h */
17 #include "logging.h"
18
19 static int underlimit_flowcount_count = 0;
20 static int underlimit_normal_count = 0;
21
22 /**
23  * Called for each identity each estimate interval.  Uses flow table information
24  * to estimate the current aggregate rate and the rate of the individual flows
25  * in the table.
26  */
27 static void estimate(identity_t *ident) {
28     struct timeval now;
29
30     gettimeofday(&now, NULL);
31
32     pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
33
34     ident->table_update_function(ident->table, now, ident->ewma_weight);
35
36     pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */
37 }
38
39 /**
40  * Determines the FPS weight allocation when the identity is under its current
41  * local rate limit.
42  */
43 static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, double peer_weights) {
44     uint32_t target = local_rate;
45     double ideal_weight;
46     double total_weight = peer_weights + ident->last_localweight;
47
48     if (ident->flowstart) {
49         target = local_rate*4;
50         if (local_rate >= FLOW_START_THRESHOLD) {
51             ident->flowstart = false;
52         }
53     }
54     else {
55         /* June 16, 2008 (KCW)
56          * ident->flowstart gets set initially to one, but it is never set again.  However,
57          * if a limiter gets flows and then the number of flows drops to zero, it has trouble
58          * increasing the limit again. */
59         if (local_rate < FLOW_START_THRESHOLD) {
60             ident->flowstart = true;
61         }
62     }
63
64     if (target >= ident->limit) {
65         ideal_weight = total_weight;
66     } else if (target <= 0) {
67         ideal_weight = 0; // no flows here
68     } else {
69         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
70     }
71
72 #if 0
73     else if (peer_weights <= 0) {
74 #if 0
75         // doesn't matter what we pick as our weight, so pick 1 / N.
76         ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1);
77 #endif
78         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
79     } else {
80 #if 0
81         double divisor = (double) ident->limit - (double) target;
82         ideal_weight = ((double) target * peer_weights) / divisor;
83 #else
84         ideal_weight = ((double)target / (double)ident->limit) * total_weight;
85 #endif
86     }
87 #endif
88
89     return ideal_weight;
90 }
91
92 /**
93  * Determines the FPS weight allocation when the identity is over its current
94  * local rate limit.
95  */
96 static double allocate_fps_over_limit(identity_t *ident) {
97     double ideal_weight;
98
99     if (ident->common.max_flow_rate > 0) {
100         ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate;
101
102         printlog(LOG_DEBUG, "%.3f  %d  %d  FlowCount, TotalRate, MaxRate\n",
103                 ideal_weight, ident->common.rate, ident->common.max_flow_rate);
104     } else {
105         ideal_weight = 1;
106     }
107
108     return ideal_weight;
109 }
110
111 /**
112  * Determines the amount of FPS weight to allocate to the identity during each
113  * estimate interval.  Note that total_weight includes local weight.
114  */
115 static uint32_t allocate_fps(identity_t *ident, double total_weight) {
116     common_accounting_t *ftable = &ident->common; /* Common flow table info */
117     uint32_t local_rate = ftable->rate;
118     uint32_t ideallocal = 0;
119     double peer_weights; /* sum of weights of all other limiters  */
120     double idealweight = 0;
121     double last_portion = 0;
122     double this_portion = 0;
123
124     static int dampen = 0;
125     int dampen_increase = 0;
126
127     double ideal_under = 0;
128     double ideal_over = 0;
129
130     int regime = 0;
131
132     /* two cases:
133        1. the aggregate is < limit
134        2. the aggregate is >= limit
135        */
136     peer_weights = total_weight - ident->last_localweight;
137     if (peer_weights < 0) {
138         peer_weights = 0;
139     }
140
141     if (dampen == 1) {
142         int64_t rate_delta =
143             (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate;
144         double threshold =
145             (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
146
147         if (rate_delta > threshold) {
148             dampen_increase = 1;
149             printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n",
150                      rate_delta, threshold);
151         }
152     }
153
154     if (local_rate <= 0) {
155         idealweight = 0;
156     } else if (dampen_increase == 0 && (ident->locallimit <= 0 || local_rate < ident->locallimit || ident->flowstart)) {
157         /* We're under the limit - all flows are bottlenecked. */
158         idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
159         ideal_over = allocate_fps_over_limit(ident);
160         ideal_under = idealweight;
161
162         if (ideal_over < idealweight) {
163             idealweight = ideal_over;
164             regime = 3;
165             dampen = 2;
166             underlimit_flowcount_count += 1;
167         } else {
168             regime = 1;
169             dampen = 0;
170             underlimit_normal_count += 1;
171         }
172
173         /* Apply EWMA */
174         ident->localweight = (ident->localweight * ident->ewma_weight +
175                               idealweight * (1 - ident->ewma_weight));
176         
177     } else {
178         idealweight = allocate_fps_over_limit(ident);
179         
180         /* Apply EWMA */
181         ident->localweight = (ident->localweight * ident->ewma_weight +
182                               idealweight * (1 - ident->ewma_weight));
183
184         /* This is the portion of the total weight in the system that was caused
185          * by this limiter in the last interval. */
186         last_portion = ident->last_localweight / total_weight;
187
188         /* This is the fraction of the total weight in the system that our
189          * proposed value for idealweight would use. */
190         this_portion = ident->localweight / (peer_weights + ident->localweight);
191
192         /* Dampen the large increase the first time... */
193         if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) {
194             ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
195             dampen = 1;
196         } else {
197             dampen = 2;
198         }
199
200         ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights);
201         ideal_over = idealweight;
202
203         regime = 2;
204     }
205
206     /* Convert weight into a rate - add in our new local weight */
207     total_weight = ident->localweight + peer_weights;
208
209     /* compute local allocation:
210        if there is traffic elsewhere, use the weights
211        otherwise do a L/n allocation */
212     if (total_weight > 0) {
213     //if (peer_weights > 0) {
214         ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight);
215     } else {
216         ideallocal = ident->limit / (ident->comm.remote_node_count + 1);
217     }
218
219     printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight);
220
221     printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f  Under / Over / Actual / Rate\n",
222             ideal_under / (ideal_under + peer_weights),
223             ideal_over / (ideal_over + peer_weights),
224             ident->localweight / (ident->localweight + peer_weights),
225             (double) local_rate / (double) ident->limit);
226
227     printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
228
229     printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
230             local_rate, idealweight, ident->localweight, total_weight);
231
232     //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
233     //       dampen, dampen_increase, peer_weights, regime);
234
235     //printf("normal_count: %d, flowcount_count: %d\n", underlimit_normal_count, underlimit_flowcount_count);
236
237     if (regime == 3) {
238         printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
239                  ideal_over, ideal_under);
240     }
241
242     printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
243
244     return(ideallocal);
245 }
246
247 /**
248  * Determines the local drop probability for a GRD identity every estimate
249  * interval.
250  */
251 static double allocate_grd(identity_t *ident, double aggdemand) {
252     double dropprob;
253     double global_limit = (double) (ident->limit);
254
255     if (aggdemand > global_limit) {
256         dropprob = (aggdemand-global_limit)/aggdemand;
257     } else {
258         dropprob = 0.0;
259     }
260     
261     //printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
262     //        ident->common.rate, aggdemand, dropprob);
263
264     return dropprob;
265 }
266
267 /** 
268  * Given current estimates of local rate (weight) and remote rates (weights)
269  * use GRD or FPS to calculate a new local limit. 
270  */
271 static void allocate(limiter_t *limiter, identity_t *ident) {
272     /* Represents aggregate rate for GRD and aggregate weight for FPS. */
273     double comm_val = 0;
274
275     /* Read comm_val from comm layer. */
276     read_comm(&ident->comm, &comm_val);
277     printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
278
279     /* Experimental printing. */
280     printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
281              (double) ident->common.rate / (double) 128, ident->id);
282     ident->avg_bytes += ident->common.rate;
283     
284     if (limiter->policynum == POLICY_FPS) {
285         ident->locallimit = allocate_fps(ident, comm_val);
286         ident->last_localweight = ident->localweight;
287         
288         /* Update other limiters with our weight by writing to comm layer. */
289         write_local_value(&ident->comm, ident->localweight);
290     } else {
291         ident->locallimit = 0; /* Unused with GRD. */
292         ident->last_localdropprob = ident->localdropprob;
293         ident->localdropprob = allocate_grd(ident, comm_val);
294         
295         /* Update other limiters with our rate by writing to comm layer. */
296         write_local_value(&ident->comm, ident->common.rate);
297     }
298
299     /* Update identity state. */
300     ident->common.last_rate = ident->common.rate;
301 }
302
303 /**
304  * This is called once per estimate interval to enforce the rate that allocate
305  * has decided upon.  It makes calls to tc using system().
306  */
307 static void enforce(limiter_t *limiter, identity_t *ident) {
308     char cmd[CMD_BUFFER_SIZE];
309     int ret = 0;
310
311     switch (limiter->policynum) {
312         case POLICY_FPS:
313
314             /* TC treats limits of 0 (8bit) as unlimited, which causes the
315              * entire rate limiting system to become unpredictable.  In
316              * reality, we also don't want any limiter to be able to set its
317              * limit so low that it chokes all of the flows to the point that
318              * they can't increase.  Thus, when we're setting a low limit, we
319              * make sure that it isn't too low by using the
320              * FLOW_START_THRESHOLD. */
321
322             if (ident->locallimit < FLOW_START_THRESHOLD) {
323                 ident->locallimit = FLOW_START_THRESHOLD * 2;
324             }
325
326             /* Do not allow the node to set a limit higher than its
327              * administratively assigned upper limit (bwcap). */
328             if (limiter->nodelimit != 0 && ident->locallimit > limiter->nodelimit) {
329                 ident->locallimit = limiter->nodelimit;
330             }
331
332             printf("FPS: Setting local limit to %d\n", ident->locallimit);
333             printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
334
335             snprintf(cmd, CMD_BUFFER_SIZE,
336                      "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
337                      ident->htb_parent, ident->htb_node, ident->locallimit);
338
339             ret = system(cmd);
340
341             if (ret) {
342                 /* FIXME: call failed.  What to do? */
343             }
344             break;
345
346         case POLICY_GRD:
347 /* FIXME: Figure out where to enforce GRD. */
348 #if 0
349             for (i = 0; i < ident->num_slices; i++){
350
351                 sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
352                         ident->xids[i],ident->xids[i], (100*ident->localdropprob));
353
354                 ret = system(cmd);
355
356                 if (ret==-1)
357                     print_system_error(ret);
358             }
359 #endif
360             break;
361
362         default: 
363             printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policynum);
364             break;
365     }
366
367     return;
368 }
369
370 /**
371  * This function is periodically called to clean the stable instance's flow
372  * accounting tables for each identity.
373  */
374 static void clean(drl_instance_t *instance) {
375     identity_t *ident = NULL;
376
377     map_reset_iterate(instance->ident_map);
378     while ((ident = map_next(instance->ident_map)) != NULL) {
379         pthread_mutex_lock(&ident->table_mutex);
380
381         ident->table_cleanup_function(ident->table);
382
383         pthread_mutex_unlock(&ident->table_mutex);
384     }
385
386     /* Periodically flush the log file. */
387     flushlog();
388 }
389
390 static void print_averages(drl_instance_t *instance, int print_interval) {
391     identity_t *ident = NULL;
392
393     map_reset_iterate(instance->ident_map);
394     while ((ident = map_next(instance->ident_map)) != NULL) {
395         ident->avg_bytes /= (double) print_interval;
396         //printf("avg_bytes = %f, print_interval = %d\n", ident->avg_bytes, print_interval);
397         printlog(LOG_DEBUG, "%.3f \t Avg rate. ID:%d\n",
398                  ident->avg_bytes / 128, ident->id);
399         //printf("%.3f \t Avg rate. ID:%d\n",
400         //         ident->avg_bytes / 128, ident->id);
401         ident->avg_bytes = 0;
402     }
403 }
404
405 /** Thread function to handle local rate estimation.
406  *
407  * None of our simple hashmap functions are thread safe, so we lock the limiter
408  * with an rwlock to prevent another thread from attempting to modify the set
409  * of identities.
410  *
411  * Each identity also has a private lock for its table.  This gets locked by
412  * table-modifying functions such as estimate and clean.
413  */
414 void handle_estimation(void *arg) {
415     limiter_t *limiter = (limiter_t *) arg;
416     identity_t *ident = NULL;
417     int clean_timer, clean_wait_intervals;
418     useconds_t sleep_time = limiter->estintms * 1000;
419     uint32_t cal_slot = 0;
420     int print_interval = 1000 / (limiter->estintms);
421
422     sigset_t signal_mask;
423
424     sigemptyset(&signal_mask);
425     sigaddset(&signal_mask, SIGHUP);
426     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
427
428     /* Determine the number of intervals we should wait before hitting the
429      * specified clean interval. (Converts seconds -> intervals). */
430     clean_wait_intervals = IDENT_CLEAN_INTERVAL * (1000.0 / limiter->estintms);
431     clean_timer = clean_wait_intervals;
432
433     while (true) {
434         /* Sleep according to the delay of the estimate interval. */
435         usleep(sleep_time);
436
437         /* Grab the limiter lock for reading.  This prevents identities from
438          * disappearing beneath our feet. */
439         pthread_rwlock_rdlock(&limiter->limiter_lock);
440
441         cal_slot = limiter->stable_instance.cal_slot & SCHEDMASK;
442
443         /* Service all the identities that are scheduled to run during this
444          * tick. */
445         while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) {
446             ident = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
447             TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, ident, calendar);
448
449             /* Update the ident's flow accouting table with the latest info. */
450             estimate(ident);
451
452             /* Determine its share of the rate allocation. */
453             allocate(limiter, ident);
454
455             /* Make tc calls to enforce the rate we decided upon. */
456             enforce(limiter, ident);
457
458             /* Tell the comm library to propagate this identity's result for
459              * this interval.*/
460             send_update(&ident->comm, ident->id);
461
462             /* Add ident back to the queue at a future time slot. */
463             TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
464                               ((cal_slot + ident->intervals) & SCHEDMASK),
465                               ident, calendar);
466         }
467
468         print_interval--;
469         if (loglevel() == LOG_DEBUG && print_interval <= 0) {
470             print_interval = 1000 / (limiter->estintms);
471             print_averages(&limiter->stable_instance, print_interval);
472         }
473
474         /* Check if enough intervals have passed for cleaning. */
475         if (clean_timer <= 0) {
476             clean(&limiter->stable_instance);
477             clean_timer = clean_wait_intervals;
478         } else {
479             clean_timer--;
480         }
481
482         limiter->stable_instance.cal_slot += 1;
483
484         pthread_rwlock_unlock(&limiter->limiter_lock); 
485     }
486 }