#!/usr/bin/python # # Average bandwidth monitoring script. Run periodically via cron(8) to # enforce a soft limit on daily bandwidth usage for each slice. If a # slice is found to have exceeded its daily bandwidth usage when the # script is run, its instantaneous rate will be capped at the desired # average rate. Thus, in the worst case, a slice will only be able to # send a little more than twice its average daily limit. # # Two separate limits are enforced, one for destinations exempt from # the node bandwidth cap, and the other for all other destinations. # # Mark Huang # Andy Bavier # Faiyaz Ahmed # Copyright (C) 2004-2006 The Trustees of Princeton University # # $Id: bwmon.py,v 1.5.2.2 2006/08/21 21:27:35 mlhuang Exp $ # import syslog import os import sys import getopt import time import pickle import socket import xmlrpclib import bwlimit from sets import Set # Utility functions from pl_mom import * # Constants seconds_per_day = 24 * 60 * 60 bits_per_byte = 8 # Defaults debug = False verbose = 0 datafile = "/var/lib/misc/bwmon.dat" nm = None # Burst to line rate (or node cap). Set by NM. default_maxrate = bwlimit.get_bwcap() default_maxexemptrate = bwlimit.bwmax # What we cap to when slices break the rules. # 500 Kbit or 5.4 GB per day #default_avgrate = 500000 # 1.5 Mbit or 16.4 GB per day #default_avgexemptrate = 1500000 # 4 Gbyte per day. 4 * 1024K * 1024M * 1024G default_ByteThresh = 4294967296 # 5.4 Gbyte per day max allowed transfered per recording period default_ByteMax = 5798205850 # 14 Gbyte per day default_ExemptByteThresh = 15032385536 # 16.4 Gbyte per day max allowed transfered per recording period to I2 default_ExemptByteMax = 17609365914 # Average over 1 day period = 1 * seconds_per_day # Message template template = \ """ The slice %(slice)s has transmitted more than %(bytes)s from %(hostname)s to %(class)s destinations since %(since)s. Its maximum %(class)s burst rate will be capped at %(new_maxrate)s until %(until)s. Please reduce the average %(class)s transmission rate of the slice %(limit)s per %(period)s. """.lstrip() footer = \ """ %(date)s %(hostname)s bwcap %(slice)s """.lstrip() class Slice: """ Stores the last recorded bandwidth parameters of a slice. xid - slice context/VServer ID name - slice name time - beginning of recording period in UNIX seconds bytes - low bandwidth bytes transmitted at the beginning of the recording period exemptbytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F) ByteMax - total volume of data allowed ByteThresh - After thresh, cap node to (maxbyte - bytes)/(time left in period) ExemptByteMax - Same as above, but for i2. ExemptByteThresh - i2 ByteThresh #last_maxexemptrate - last recorded maxexemptrate from NM. Slice attribute. #last_maxrate - last recorded maxrate from NM. Slice attribute. #last_ByteMax - Last recorded from NM. total volume of data allowed. #last_ByteThresh - Last recorded from NM. After thresh, cap node to (maxbyte - bytes)/(period - t) #last_ExemptByteMax - Last recorded from NM. Same as above, but for i2. #last_ExemptByteThresh - Last recorded from NM. i2 ByteThresh """ def __init__(self, xid, name, maxrate, maxexemptrate, bytes, exemptbytes): self.xid = xid self.name = name self.time = 0 self.bytes = 0 self.exemptbytes = 0 self.ByteMax = default_ByteMax self.ByteThresh = default_ByteThresh self.ExemptByteMax = default_ExemptByteMax self.ExemptByteThresh = default_ExemptByteThresh self.maxrate = default_maxrate self.maxexemptrate = default_maxexemptrate #self.last_maxrate = default_maxrate #self.last_maxexemptrate = default_maxexemptrate #self.last_ByteMax = default_ByteMax #self.last_ByteThresh = default_ByteThresh #self.last_ExemptByteMax = default_ExemptByteMax #self.last_ExemptByteThresh = default_ExemptByteThresh # Get real values where applicable self.reset(maxrate, maxexemptrate, bytes, exemptbytes) def __repr__(self): return self.name def updateSliceAttributes(self): # Query Node Manager for max rate overrides try: vals = nm.query(self.name, [('nm_net_max_rate', self.maxrate), ('nm_net_max_exempt_rate', self.maxexemptrate)]) (self.maxrate, self.maxexemptrate) = vals except Exception, err: print "Warning: Exception received while querying NM:", err def reset(self, maxrate, maxexemptrate, bytes, exemptbytes): """ Begin a new recording period. Remove caps by restoring limits to their default values. """ # Query Node Manager for max rate overrides self.updateSliceAttributes() # Reset baseline time self.time = time.time() # Reset baseline byte coutns self.bytes = bytes self.exemptbytes = exemptbytes if (self.maxrate != maxrate) or (self.maxexemptrate != maxexemptrate): print "%s reset to %s/%s" % \ (self.name, bwlimit.format_tc_rate(self.maxrate), bwlimit.format_tc_rate(self.maxexemptrate)) bwlimit.set(xid = self.xid, maxrate = self.maxrate, maxexemptrate = self.maxexemptrate) def update(self, maxrate, maxexemptrate, bytes, exemptbytes): """ Update byte counts and check if average rates have been exceeded. In the worst case (instantaneous usage of the entire average daily byte limit at the beginning of the recording period), the slice will be immediately capped and will get to send twice the average daily byte limit. In the common case, it will get to send slightly more than the average daily byte limit. """ # Query Node Manager for max rate overrides self.updateSliceAttributes() # Prepare message parameters from the template message = "" params = {'slice': self.name, 'hostname': socket.gethostname(), 'since': time.asctime(time.gmtime(self.time)) + " GMT", 'until': time.asctime(time.gmtime(self.time + period)) + " GMT", 'date': time.asctime(time.gmtime()) + " GMT", 'period': format_period(period)} if bytes >= (self.bytes + self.ByteThresh): new_maxrate = (self.ByteMax - self.ByteThresh)/(period - (time.time() - self.time)) else: new_maxrate = maxrate # Format template parameters for low bandwidth message params['class'] = "low bandwidth" params['bytes'] = format_bytes(bytes - self.bytes) params['maxrate'] = bwlimit.format_tc_rate(maxrate) params['limit'] = format_bytes(self.ByteMax) params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate) if verbose: print "%(slice)s %(class)s " \ "%(bytes)s, %(limit)s (%(new_maxrate)s avg)" % \ params # Cap low bandwidth burst rate if new_maxrate != maxrate: message += template % params print "%(slice)s %(class)s capped at %(new_maxrate)s " % params if exemptbytes >= (self.exemptbytes + self.ExemptByteThresh): new_maxexemptrate = \ (self.ExemptByteMax - self.ExemptByteThresh)/(period - (time.time() - self.time)) else: new_maxexemptrate = maxexemptrate # Format template parameters for high bandwidth message params['class'] = "high bandwidth" params['bytes'] = format_bytes(exemptbytes - self.exemptbytes) params['maxrate'] = bwlimit.format_tc_rate(maxexemptrate) params['limit'] = format_bytes(self.ExemptByteMax) params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate) if verbose: print "%(slice)s %(class)s " \ "%(bytes)s, %(limit)s (%(new_maxrate)s avg)" % params # Cap high bandwidth burst rate if new_maxexemptrate != maxexemptrate: message += template % params print "%(slice)s %(class)s capped at %(new_maxexemptrate)s" % params # Apply parameters if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate: bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate) # Notify slice if message: subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params if debug: print subject print message + (footer % params) else: slicemail(self.name, subject, message + (footer % params)) def usage(): print """ Usage: %s [OPTIONS]... Options: -d, --debug Enable debugging (default: %s) -v, --verbose Increase verbosity level (default: %d) -f, --file=FILE Data file (default: %s) -s, --slice=SLICE Constrain monitoring to these slices (default: all) -p, --period=SECONDS Interval in seconds over which to enforce average byte limits (default: %s) -h, --help This message """.lstrip() % (sys.argv[0], debug, verbose, datafile, format_period(period)) def main(): # Defaults global debug, verbose, datafile, period, nm # All slices names = [] try: longopts = ["debug", "verbose", "file=", "slice=", "period=", "help"] (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:p:h", longopts) except getopt.GetoptError, err: print "Error: " + err.msg usage() sys.exit(1) for (opt, optval) in opts: if opt == "-d" or opt == "--debug": debug = True elif opt == "-v" or opt == "--verbose": verbose += 1 bwlimit.verbose = verbose - 1 elif opt == "-f" or opt == "--file": datafile = optval elif opt == "-s" or opt == "--slice": names.append(optval) elif opt == "-p" or opt == "--period": period = int(optval) else: usage() sys.exit(0) # Check if we are already running writepid("bwmon") if not debug: # Redirect stdout and stderr to syslog syslog.openlog("bwmon") sys.stdout = sys.stderr = Logger() try: f = open(datafile, "r+") if verbose: print "Loading %s" % datafile (version, slices) = pickle.load(f) f.close() # Check version of data file if version != "$Id: bwmon.py,v 1.5.2.2 2006/08/21 21:27:35 mlhuang Exp $": print "Not using old version '%s' data file %s" % (version, datafile) raise Exception except Exception: version = "$Id: bwmon.py,v 1.5.2.2 2006/08/21 21:27:35 mlhuang Exp $" slices = {} # Get special slice IDs root_xid = bwlimit.get_xid("root") default_xid = bwlimit.get_xid("default") #Open connection to Node Manager. Global. nm = NM() live = [] # Get actuall running values from tc. for params in bwlimit.get(): (xid, share, minrate, maxrate, minexemptrate, maxexemptrate, bytes, exemptbytes) = params live.append(xid) # Delete Me print("name %s , minrate %s, maxrate %s, minexemptrate %s, maxexemptrate %s, bytes %s, exemptbytes %s" % (bwlimit.get_slice(xid), minrate, maxrate, minexemptrate, maxexemptrate, bytes, exemptbytes)) # Ignore root and default buckets if xid == root_xid or xid == default_xid: continue name = bwlimit.get_slice(xid) if name is None: # Orphaned (not associated with a slice) class name = "%d?" % xid # Monitor only the specified slices if names and name not in names: continue #slices is populated from the pickle file #xid is populated from bwlimit (read from /etc/passwd) if slices.has_key(xid): slice = slices[xid] if time.time() >= (slice.time + period) or \ bytes < slice.bytes or exemptbytes < slice.exemptbytes: # Reset to defaults every 24 hours or if it appears # that the byte counters have overflowed (or, more # likely, the node was restarted or the HTB buckets # were re-initialized). slice.reset(maxrate, maxexemptrate, bytes, exemptbytes) else: # Update byte counts slice.update(maxrate, maxexemptrate, bytes, exemptbytes) else: # New slice, initialize state slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate, bytes, exemptbytes) # Delete dead slices dead = Set(slices.keys()) - Set(live) for xid in dead: del slices[xid] if verbose: print "Saving %s" % datafile f = open(datafile, "w") pickle.dump((version, slices), f) f.close() removepid("bwmon") if __name__ == '__main__': main()