3 # Average bandwidth monitoring script. Run periodically via NM db.sync to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have transmitted 80% of its daily byte limit usage,
# its instantaneous rate will be capped at the bytes remaining in the limit
7 # over the time remaining in the recording period.
9 # Two separate limits are enforced, one for destinations exempt from
10 # the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
12 # Mark Huang <mlhuang@cs.princeton.edu>
13 # Andy Bavier <acb@cs.princeton.edu>
14 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
15 # Copyright (C) 2004-2008 The Trustees of Princeton University
36 # Set DEBUG to True if you don't want to send emails
38 # Set ENABLE to False to setup buckets, but not limit.
41 datafile = "/var/lib/misc/bwmon.dat"
44 sys.path.append("/etc/planetlab")
45 from plc_config import *
48 logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
49 logger.log("bwmon: Running in DEBUG mode. Logging to file and not emailing.", 1)
52 seconds_per_day = 24 * 60 * 60
55 # Burst to line rate (or node cap). Set by NM. in KBit/s
56 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
57 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
58 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G
59 # 5.4 Gbyte per day max allowed transfered per recording period
60 default_MaxKByte = 5662310
61 # 16.4 Gbyte per day max allowed transfered per recording period to I2
62 default_Maxi2KByte = 17196646
63 # Default share quanta
67 period = 1 * seconds_per_day
72 The slice %(slice)s has transmitted more than %(bytes)s from
73 %(hostname)s to %(class)s destinations
76 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
79 Please reduce the average %(class)s transmission rate
80 of the slice to %(limit)s per %(period)s.
86 %(date)s %(hostname)s bwcap %(slice)s
89 def format_bytes(bytes, si = True):
91 Formats bytes into a string
96 # Officially, a kibibyte
99 if bytes >= (kilo * kilo * kilo):
100 return "%.1f GB" % (bytes / (kilo * kilo * kilo))
101 elif bytes >= 1000000:
102 return "%.1f MB" % (bytes / (kilo * kilo))
104 return "%.1f KB" % (bytes / kilo)
106 return "%.0f bytes" % bytes
108 def format_period(seconds):
110 Formats a period in seconds into a string
113 if seconds == (24 * 60 * 60):
115 elif seconds == (60 * 60):
117 elif seconds > (24 * 60 * 60):
118 return "%.1f days" % (seconds / 24. / 60. / 60.)
119 elif seconds > (60 * 60):
120 return "%.1f hours" % (seconds / 60. / 60.)
122 return "%.1f minutes" % (seconds / 60.)
124 return "%.0f seconds" % seconds
126 def slicemail(slice, subject, body):
128 Front end to sendmail. Sends email to slice alias with given subject and body.
131 sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")
133 # Parsed from MyPLC config
134 to = [PLC_MAIL_MOM_LIST_ADDRESS]
136 if slice is not None and slice != "root":
137 to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
139 header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
141 'version': sys.version.split(" ")[0],
147 Content-type: text/plain
151 X-Mailer: Python/%(version)s
154 """.lstrip() % header)
164 Stores the last recorded bandwidth parameters of a slice.
166 xid - slice context/VServer ID
168 time - beginning of recording period in UNIX seconds
169 bytes - low bandwidth bytes transmitted at the beginning of the recording period
170 i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
171 MaxKByte - total volume of data allowed
172 ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
173 Maxi2KByte - same as MaxKByte, but for i2
Threshi2KByte - same as ThreshKByte, but for i2
175 MaxRate - max_rate slice attribute.
176 Maxi2Rate - max_exempt_rate slice attribute.
177 Share - Used by Sirius to loan min rates
178 Sharei2 - Used by Sirius to loan min rates for i2
179 self.emailed - did slice recv email during this recording period
183 def __init__(self, xid, name, rspec):
189 self.MaxRate = default_MaxRate
190 self.MinRate = bwlimit.bwmin / 1000
191 self.Maxi2Rate = default_Maxi2Rate
192 self.Mini2Rate = bwlimit.bwmin / 1000
193 self.MaxKByte = default_MaxKByte
194 self.ThreshKByte = int(.8 * self.MaxKByte)
195 self.Maxi2KByte = default_Maxi2KByte
196 self.Threshi2KByte = int(.8 * self.Maxi2KByte)
197 self.Share = default_Share
198 self.Sharei2 = default_Share
202 self.updateSliceAttributes(rspec)
203 bwlimit.set(xid = self.xid,
204 minrate = self.MinRate * 1000,
205 maxrate = self.MaxRate * 1000,
206 maxexemptrate = self.Maxi2Rate * 1000,
207 minexemptrate = self.Mini2Rate * 1000,
213 def updateSliceAttributes(self, rspec):
215 Use respects from GetSlivers to PLC to populate slice object. Also
216 do some sanity checking.
219 # Sanity check plus policy decision for MinRate:
220 # Minrate cant be greater than 25% of MaxRate or NodeCap.
221 MinRate = int(rspec.get("net_min_rate", bwlimit.bwmin / 1000))
222 if MinRate > int(.25 * default_MaxRate):
223 MinRate = int(.25 * default_MaxRate)
224 if MinRate != self.MinRate:
225 self.MinRate = MinRate
226 logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))
228 MaxRate = int(rspec.get('net_max_rate', default_MaxRate))
229 if MaxRate != self.MaxRate:
230 self.MaxRate = MaxRate
231 logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
233 Mini2Rate = int(rspec.get('net_i2_min_rate', bwlimit.bwmin / 1000))
234 if Mini2Rate != self.Mini2Rate:
235 self.Mini2Rate = Mini2Rate
236 logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
238 Maxi2Rate = int(rspec.get('net_i2_max_rate', default_Maxi2Rate))
239 if Maxi2Rate != self.Maxi2Rate:
240 self.Maxi2Rate = Maxi2Rate
241 logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
243 MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
244 if MaxKByte != self.MaxKByte:
245 self.MaxKByte = MaxKByte
246 logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))
248 Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
249 if Maxi2KByte != self.Maxi2KByte:
250 self.Maxi2KByte = Maxi2KByte
251 logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))
253 ThreshKByte = int(rspec.get('net_thresh_kbyte', (MaxKByte * .8)))
254 if ThreshKByte != self.ThreshKByte:
255 self.ThreshKByte = ThreshKByte
256 logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))
258 Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', (Maxi2KByte * .8)))
259 if Threshi2KByte != self.Threshi2KByte:
260 self.Threshi2KByte = Threshi2KByte
261 logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))
263 Share = int(rspec.get('net_share', default_Share))
264 if Share != self.Share:
266 logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share))
268 Sharei2 = int(rspec.get('net_i2_share', default_Share))
269 if Sharei2 != self.Sharei2:
270 self.Sharei2 = Sharei2
271 logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.i2Share))
274 def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
276 Begin a new recording period. Remove caps by restoring limits
277 to their default values.
279 # Query Node Manager for max rate overrides
280 self.updateSliceAttributes(rspec)
282 # Reset baseline time
283 self.time = time.time()
285 # Reset baseline byte coutns
286 self.bytes = usedbytes
287 self.i2bytes = usedi2bytes
294 maxrate = self.MaxRate * 1000
295 maxi2rate = self.Maxi2Rate * 1000
296 if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
297 logger.log("bwmon: %s reset to %s/%s" % \
299 bwlimit.format_tc_rate(maxrate),
300 bwlimit.format_tc_rate(maxi2rate)), 1)
301 bwlimit.set(xid = self.xid,
302 minrate = self.MinRate * 1000,
303 maxrate = self.MaxRate * 1000,
304 maxexemptrate = self.Maxi2Rate * 1000,
305 minexemptrate = self.Mini2Rate * 1000,
308 def notify(self, new_maxrate, new_maxexemptrate, usedbytes, usedi2bytes):
310 Notify the slice it's being capped.
312 # Prepare message parameters from the template
314 params = {'slice': self.name, 'hostname': socket.gethostname(),
315 'since': time.asctime(time.gmtime(self.time)) + " GMT",
316 'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
317 'date': time.asctime(time.gmtime()) + " GMT",
318 'period': format_period(period)}
320 if new_maxrate != (self.MaxRate * 1000):
321 # Format template parameters for low bandwidth message
322 params['class'] = "low bandwidth"
323 params['bytes'] = format_bytes(usedbytes - self.bytes)
324 params['limit'] = format_bytes(self.MaxKByte * 1024)
325 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
327 # Cap low bandwidth burst rate
328 message += template % params
329 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
331 if new_maxexemptrate != (self.Maxi2Rate * 1000):
332 # Format template parameters for high bandwidth message
333 params['class'] = "high bandwidth"
334 params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
335 params['limit'] = format_bytes(self.Maxi2KByte * 1024)
336 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
338 message += template % params
339 logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
342 if self.emailed == False:
343 subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
345 logger.log("bwmon: "+ subject)
346 logger.log("bwmon: "+ message + (footer % params))
349 logger.log("bwmon: Emailing %s" % self.name)
350 slicemail(self.name, subject, message + (footer % params))
353 def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, runningshare, rspec):
355 Update byte counts and check if byte thresholds have been
356 exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period.
357 Recalculate every time module runs.
360 # copy self.Min* and self.*share values for comparison later.
361 runningMinRate = self.MinRate
362 runningMini2Rate = self.Mini2Rate
363 runningshare = self.Share
364 runningsharei2 = self.Sharei2
366 # Query Node Manager for max rate overrides
367 self.updateSliceAttributes(rspec)
370 if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
371 sum = self.bytes + (self.ThreshKByte * 1024)
372 maxbyte = self.MaxKByte * 1024
373 bytesused = usedbytes - self.bytes
374 timeused = int(time.time() - self.time)
375 # Calcuate new rate. in bit/s
376 new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
377 # Never go under MinRate
378 if new_maxrate < (self.MinRate * 1000):
379 new_maxrate = self.MinRate * 1000
380 # State information. I'm capped.
384 new_maxrate = self.MaxRate * 1000
387 if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
388 maxi2byte = self.Maxi2KByte * 1024
389 i2bytesused = usedi2bytes - self.i2bytes
390 timeused = int(time.time() - self.time)
392 new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
393 # Never go under MinRate
394 if new_maxi2rate < (self.Mini2Rate * 1000):
395 new_maxi2rate = self.Mini2Rate * 1000
396 # State information. I'm capped.
400 new_maxi2rate = self.Maxi2Rate * 1000
403 # Check running values against newly calculated values so as not to run tc
405 if (runningmaxrate != new_maxrate) or \
406 (runningMinRate != self.MinRate) or \
407 (runningmaxi2rate != new_maxi2rate) or \
408 (runningMini2Rate != self.Mini2Rate) or \
409 (runningshare != self.share) or \
410 (runningi2share != self.i2share):
412 bwlimit.set(xid = self.xid,
413 minrate = self.MinRate * 1000,
414 maxrate = new_maxrate,
415 minexemptrate = self.Mini2Rate * 1000,
416 maxexemptrate = new_maxi2rate,
420 if self.capped == True:
421 self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
424 def gethtbs(root_xid, default_xid):
426 Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
427 Turn off HTBs without names.
430 for params in bwlimit.get():
433 minexemptrate, maxexemptrate,
434 usedbytes, usedi2bytes) = params
436 name = bwlimit.get_slice(xid)
439 and (xid != root_xid) \
440 and (xid != default_xid):
441 # Orphaned (not associated with a slice) class
443 logger.log("bwmon: Found orphaned HTB %s. Removing." %name, 1)
446 livehtbs[xid] = {'share': share,
449 'maxexemptrate': maxexemptrate,
450 'minexemptrate': minexemptrate,
451 'usedbytes': usedbytes,
453 'usedi2bytes': usedi2bytes}
459 Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit.
472 # Incase the limits have changed.
473 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
474 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
476 # Incase default isn't set yet.
477 if default_MaxRate == -1:
478 default_MaxRate = 1000000
481 f = open(datafile, "r+")
482 logger.log("bwmon: Loading %s" % datafile, 2)
483 (version, slices, deaddb) = pickle.load(f)
485 # Check version of data file
486 if version != "$Id$":
487 logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile))
494 # Get/set special slice IDs
495 root_xid = bwlimit.get_xid("root")
496 default_xid = bwlimit.get_xid("default")
498 # Since root is required for sanity, its not in the API/plc database, so pass {}
500 if root_xid not in slices.keys():
501 slices[root_xid] = Slice(root_xid, "root", {})
502 slices[root_xid].reset(0, 0, 0, 0, {})
504 # Used by bwlimit. pass {} since there is no rspec (like above).
505 if default_xid not in slices.keys():
506 slices[default_xid] = Slice(default_xid, "default", {})
507 slices[default_xid].reset(0, 0, 0, 0, {})
510 # Get running slivers that should be on this node (from plc). {xid: name}
511 # db keys on name, bwmon keys on xid. db doesnt have xid either.
512 for plcSliver in nmdbcopy.keys():
513 live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
515 logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__(), 2)
516 logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__(), 2)
518 # Get actual running values from tc.
519 # Update slice totals and bandwidth. {xid: {values}}
520 kernelhtbs = gethtbs(root_xid, default_xid)
521 logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__(), 2)
523 # The dat file has HTBs for slices, but the HTBs aren't running
524 nohtbslices = Set(slices.keys()) - Set(kernelhtbs.keys())
525 logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__(), 2)
527 for nohtbslice in nohtbslices:
528 if live.has_key(nohtbslice):
529 slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
531 logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice)
532 del slices[nohtbslice]
534 # The dat file doesnt have HTB for the slice but kern has HTB
535 slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
536 logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__(), 2)
537 for slicenodat in slicesnodat:
538 # But slice is running
539 if live.has_key(slicenodat):
540 # init the slice. which means start accounting over since kernel
541 # htb was already there.
542 slices[slicenodat] = Slice(slicenodat,
543 live[slicenodat]['name'],
544 live[slicenodat]['_rspec'])
547 # Slices in GetSlivers but not running HTBs
548 newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
549 logger.log("bwmon: Found %s new slices" % newslicesxids.__len__(), 2)
552 for newslice in newslicesxids:
553 # Delegated slices dont have xids (which are uids) since they haven't been
555 if newslice != None and live[newslice].has_key('_rspec') == True:
556 # Check to see if we recently deleted this slice.
557 if live[newslice]['name'] not in deaddb.keys():
558 logger.log( "bwmon: New Slice %s" % live[newslice]['name'] )
559 # _rspec is the computed rspec: NM retrieved data from PLC, computed loans
560 # and made a dict of computed values.
561 slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
562 slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] )
563 # Double check time for dead slice in deaddb is within 24hr recording period.
564 elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
565 deadslice = deaddb[live[newslice]['name']]
566 logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
567 slices[newslice] = deadslice['slice']
568 slices[newslice].xid = newslice
570 slices[newslice].reset(deadslice['slice'].MaxRate,
571 deadslice['slice'].Maxi2Rate,
572 deadslice['htb']['usedbytes'],
573 deadslice['htb']['usedi2bytes'],
574 live[newslice]['_rspec'])
576 slices[newslice].update(deadslice['slice'].MaxRate,
577 deadslice['slice'].Maxi2Rate,
578 deadslice['htb']['usedbytes'],
579 deadslice['htb']['usedi2bytes'],
580 deadslice['htb']['share'],
581 live[newslice]['_rspec'])
582 # Since the slice has been reinitialed, remove from dead database.
583 del deaddb[deadslice['slice'].name]
585 logger.log("bwmon: Slice %s doesn't have xid. Skipping." % live[newslice]['name'])
587 # Move dead slices that exist in the pickle file, but
588 # aren't instantiated by PLC into the dead dict until
589 # recording period is over. This is to avoid the case where a slice is dynamically created
590 # and destroyed then recreated to get around byte limits.
591 deadxids = Set(slices.keys()) - Set(live.keys())
592 logger.log("bwmon: Found %s dead slices" % (deadxids.__len__() - 2), 2)
593 for deadxid in deadxids:
594 if deadxid == root_xid or deadxid == default_xid:
596 logger.log("bwmon: removing dead slice %s " % deadxid)
597 if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid):
598 # add slice (by name) to deaddb
599 logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name)
600 deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]}
602 if kernelhtbs.has_key(deadxid):
603 logger.log("bwmon: Removing HTB for %s." % deadxid, 2)
607 for deadslice in deaddb.keys():
608 if (time.time() >= (deaddb[deadslice]['slice'].time + period)):
609 logger.log("bwmon: Removing dead slice %s from dat." \
610 % deaddb[deadslice]['slice'].name)
611 del deaddb[deadslice]
613 # Get actual running values from tc since we've added and removed buckets.
614 # Update slice totals and bandwidth. {xid: {values}}
615 kernelhtbs = gethtbs(root_xid, default_xid)
616 logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__(), 2)
618 # Update all byte limites on all slices
619 for (xid, slice) in slices.iteritems():
620 # Monitor only the specified slices
621 if xid == root_xid or xid == default_xid: continue
622 if names and name not in names:
625 if (time.time() >= (slice.time + period)) or \
626 (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
627 (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
628 # Reset to defaults every 24 hours or if it appears
629 # that the byte counters have overflowed (or, more
630 # likely, the node was restarted or the HTB buckets
631 # were re-initialized).
632 slice.reset(kernelhtbs[xid]['maxrate'], \
633 kernelhtbs[xid]['maxexemptrate'], \
634 kernelhtbs[xid]['usedbytes'], \
635 kernelhtbs[xid]['usedi2bytes'], \
638 logger.log("bwmon: Updating slice %s" % slice.name, 2)
640 slice.update(kernelhtbs[xid]['maxrate'], \
641 kernelhtbs[xid]['maxexemptrate'], \
642 kernelhtbs[xid]['usedbytes'], \
643 kernelhtbs[xid]['usedi2bytes'], \
644 kernelhtbs[xid]['share'],
647 logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile), 2)
648 f = open(datafile, "w")
649 pickle.dump((version, slices, deaddb), f)
652 lock = threading.Event()
654 """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
655 logger.log("bwmon: Thread started", 2)
658 logger.log("bwmon: Event received. Running.", 2)
659 database.db_lock.acquire()
660 nmdbcopy = copy.deepcopy(database.db)
661 database.db_lock.release()
663 except: logger.log_exc()
667 tools.as_daemon_thread(run)
669 def GetSlivers(*args):