#!/usr/bin/python
#
# Average bandwidth monitoring script. Run periodically via cron(8) to
# enforce a soft limit on daily bandwidth usage for each slice. If a
# slice is found to have exceeded its daily bandwidth usage when the
# script is run, its instantaneous rate will be capped at the desired
# average rate. Thus, in the worst case, a slice will only be able to
# send a little more than twice its average daily limit.
#
# Two separate limits are enforced, one for destinations exempt from
# the node bandwidth cap, and the other for all other destinations.
#
# Mark Huang
# Andy Bavier
# Faiyaz Ahmed
# Copyright (C) 2004-2006 The Trustees of Princeton University
#
# $Id: bwmon.py,v 1.6 2006/07/10 15:19:35 faiyaza Exp $
#
# Syntax note: output uses single-argument print(...) and exceptions are
# fetched with sys.exc_info(), so the source parses on old Python 2
# interpreters as well as Python 3. The script still depends on the
# Python 2 modules imported below.
#

import syslog
import os
import sys
import getopt
import time
import pickle
import socket
import xmlrpclib
import bwlimit

from sets import Set

# Utility functions. The names format_period, format_bytes, slicemail,
# writepid, removepid, Logger and NM are not defined in this file and are
# presumably provided by this import -- verify against pl_mom.
from pl_mom import *

# Constants
seconds_per_day = 24 * 60 * 60
bits_per_byte = 8

# Defaults
debug = False
verbose = 0
datafile = "/var/lib/misc/bwmon.dat"
nm = None

# Version tag stored alongside the pickled state. A data file carrying a
# different tag is ignored and the state is rebuilt from scratch.
data_version = "$Id: bwmon.py,v 1.6 2006/07/10 15:19:35 faiyaza Exp $"

default_maxrate = bwlimit.get_bwcap()
default_maxexemptrate = bwlimit.bwmax
# 500 Kbit or 5.4 GB per day
default_avgrate = 500000
# 1.5 Mbit or 16.4 GB per day
default_avgexemptrate = 1500000

# Average over 1 day
period = 1 * seconds_per_day

# Message template
template = \
"""
The slice %(slice)s has transmitted more than %(bytes)s from
%(hostname)s to %(class)s destinations
since %(since)s.

Its maximum %(class)s burst rate will be capped at %(avgrate)s
until %(until)s.

Please reduce the average %(class)s transmission rate
of the slice to %(avgrate)s, or %(limit)s per %(period)s.

""".lstrip()

footer = \
"""
%(date)s %(hostname)s bwcap %(slice)s
""".lstrip()


class Slice:
    """
    Stores the last recorded bandwidth parameters of a slice.

    xid - slice context/VServer ID
    name - slice name
    time - beginning of recording period in UNIX seconds
    bytes - low bandwidth bytes transmitted at the beginning of the recording period
    exemptbytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
    avgrate - average low bandwidth rate to enforce over the recording period
    avgexemptrate - average high bandwidth rate to enforce over the recording period (for I2 -F)
    last_avgrate - last recorded avgrate from NM
    last_maxrate - last recorded maxrate from NM
    last_avgexemptrate - last recorded avgexemptrate from NM
    last_maxexemptrate - last recorded maxexemptrate from NM

    Instances are pickled into the data file by main(), so attribute
    names are part of the on-disk format.
    """

    def __init__(self, xid, name, maxrate, maxexemptrate, bytes, exemptbytes):
        """
        Seed every rate with the module defaults, then start the first
        recording period via reset().

        maxrate/maxexemptrate are the limits currently installed for the
        slice; bytes/exemptbytes are its current byte counters.
        """
        self.xid = xid
        self.name = name
        # NOTE: _maxrate and maxexemptrate are stored but never read in
        # this file; they are kept so pickled state keeps its shape.
        self._maxrate = default_maxrate
        self.last_maxrate = default_maxrate
        self.avgrate = default_avgrate
        self.last_avgrate = default_avgrate
        self.avgexemptrate = default_avgexemptrate
        self.last_avgexemptrate = default_avgexemptrate
        self.maxexemptrate = default_maxexemptrate
        self.last_maxexemptrate = default_maxexemptrate
        self.reset(maxrate, maxexemptrate, bytes, exemptbytes)

    def __repr__(self):
        return self.name

    def reset(self, maxrate, maxexemptrate, bytes, exemptbytes):
        """
        Begin a new recording period. Remove caps by restoring the
        maximum rates to the values reported by the Node Manager (or,
        if it cannot be queried, to the last values recorded from it).

        maxrate/maxexemptrate are the limits currently installed;
        bwlimit.set() is only called when they differ from the restored
        values. bytes/exemptbytes become the new baseline counters.
        """

        # Reset baseline time
        self.time = time.time()

        # Reset baseline byte counts
        self.bytes = bytes
        self.exemptbytes = exemptbytes

        # If the NM query below raises an exception and new_max* does
        # not get set, fall back to the last recorded values.
        new_maxrate = self.last_maxrate
        new_maxexemptrate = self.last_maxexemptrate

        # Query Node Manager for max rate overrides
        try:
            vals = nm.query(self.name,
                            [('nm_net_max_rate', self.last_maxrate),
                             ('nm_net_max_exempt_rate', self.last_maxexemptrate),
                             ('nm_net_avg_rate', self.last_avgrate),
                             ('nm_net_avg_exempt_rate', self.last_avgexemptrate)])
            (new_maxrate,
             new_maxexemptrate,
             self.last_avgrate,
             self.last_avgexemptrate) = vals
            # If NM is alive, and there is a cap, update new
            self.last_maxrate = new_maxrate
            self.last_maxexemptrate = new_maxexemptrate
        except Exception:
            # Best effort: a dead or misbehaving NM must not stop the
            # monitor, so warn and continue with the fallback values.
            err = sys.exc_info()[1]
            print("Warning: Exception received while querying NM: %s" % (err,))

        if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate:
            print("%s reset to %s/%s" %
                  (self.name,
                   bwlimit.format_tc_rate(new_maxrate),
                   bwlimit.format_tc_rate(new_maxexemptrate)))
            bwlimit.set(xid = self.xid,
                        maxrate = new_maxrate,
                        maxexemptrate = new_maxexemptrate)

    def update(self, maxrate, maxexemptrate, bytes, exemptbytes):
        """
        Update byte counts and check if average rates have been
        exceeded. In the worst case (instantaneous usage of the entire
        average daily byte limit at the beginning of the recording
        period), the slice will be immediately capped and will get to
        send twice the average daily byte limit. In the common case,
        it will get to send slightly more than the average daily byte
        limit.

        maxrate/maxexemptrate are the limits currently installed;
        bytes/exemptbytes are the current byte counters. When a class is
        capped, the new limits are applied with bwlimit.set() and the
        slice is notified (printed instead of mailed in debug mode).
        """

        # Query Node Manager for max average rate overrides
        try:
            (self.avgrate, self.avgexemptrate) = nm.query(
                self.name,
                [('nm_net_avg_rate', self.last_avgrate),
                 ('nm_net_avg_exempt_rate', self.last_avgexemptrate)])
            # If NM is alive, and there is a cap, update new
            self.last_avgexemptrate = self.avgexemptrate
            self.last_avgrate = self.avgrate
        except Exception:
            # Best effort: keep the previously known average rates.
            err = sys.exc_info()[1]
            print("Warning: Exception received while querying NM: %s" % (err,))

        # Prepare message parameters from the template
        message = ""
        params = {'slice': self.name, 'hostname': socket.gethostname(),
                  'since': time.asctime(time.gmtime(self.time)) + " GMT",
                  'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
                  'date': time.asctime(time.gmtime()) + " GMT",
                  'period': format_period(period)}

        # Byte budget for the whole period at the average rate. Cap only
        # if the budget is spent and the current limit is above the
        # average (i.e. the slice is not already capped).
        bytelimit = self.avgrate * period / bits_per_byte
        if bytes >= (self.bytes + bytelimit) and \
           maxrate > self.avgrate:
            new_maxrate = self.avgrate
        else:
            new_maxrate = maxrate

        # Format template parameters for low bandwidth message
        params['class'] = "low bandwidth"
        params['bytes'] = format_bytes(bytes - self.bytes)
        params['maxrate'] = bwlimit.format_tc_rate(maxrate)
        params['limit'] = format_bytes(bytelimit)
        params['avgrate'] = bwlimit.format_tc_rate(self.avgrate)

        if verbose:
            print("%(slice)s %(class)s "
                  "%(bytes)s, %(limit)s (%(maxrate)s max/%(avgrate)s avg)" %
                  params)

        # Cap low bandwidth burst rate
        if new_maxrate != maxrate:
            message += template % params
            print("%(slice)s %(class)s capped at %(avgrate)s (%(bytes)s/%(limit)s)" % params)

        # Same check for the exempt (high bandwidth) class.
        exemptbytelimit = self.avgexemptrate * period / bits_per_byte
        if exemptbytes >= (self.exemptbytes + exemptbytelimit) and \
           maxexemptrate > self.avgexemptrate:
            new_maxexemptrate = self.avgexemptrate
        else:
            new_maxexemptrate = maxexemptrate

        # Format template parameters for high bandwidth message
        params['class'] = "high bandwidth"
        params['bytes'] = format_bytes(exemptbytes - self.exemptbytes)
        params['maxrate'] = bwlimit.format_tc_rate(maxexemptrate)
        params['limit'] = format_bytes(exemptbytelimit)
        params['avgrate'] = bwlimit.format_tc_rate(self.avgexemptrate)

        if verbose:
            print("%(slice)s %(class)s "
                  "%(bytes)s, %(limit)s (%(maxrate)s max /%(avgrate)s avg)" %
                  params)

        # Cap high bandwidth burst rate
        if new_maxexemptrate != maxexemptrate:
            message += template % params
            print("%(slice)s %(class)s capped at %(avgrate)s (%(bytes)s/%(limit)s)" % params)

        # Apply parameters
        if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate:
            bwlimit.set(xid = self.xid,
                        maxrate = new_maxrate,
                        maxexemptrate = new_maxexemptrate)

        # Notify slice
        if message:
            subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
            if debug:
                print(subject)
                print(message + (footer % params))
            else:
                slicemail(self.name, subject, message + (footer % params))


def usage():
    """Print the command line help, showing the current defaults."""
    print("""
Usage: %s [OPTIONS]...

Options:
        -d, --debug             Enable debugging (default: %s)
        -v, --verbose           Increase verbosity level (default: %d)
        -f, --file=FILE         Data file (default: %s)
        -s, --slice=SLICE       Constrain monitoring to these slices (default: all)
        -p, --period=SECONDS    Interval in seconds over which to enforce average byte limits (default: %s)
        -h, --help              This message
""".lstrip() % (sys.argv[0], debug, verbose, datafile, format_period(period)))


def _load_slices():
    """
    Return the {xid: Slice} state saved in datafile, or an empty dict
    when the file is missing, unreadable, or from another version.
    """
    try:
        f = open(datafile, "r+")
        try:
            if verbose:
                print("Loading %s" % datafile)
            # NOTE: pickle can execute arbitrary code while loading; the
            # data file must only be writable by trusted users.
            (version, slices) = pickle.load(f)
        finally:
            # Close the handle even when unpickling fails.
            f.close()
    except Exception:
        # Best effort: a missing or corrupt file (normal on the first
        # run) simply means starting with no recorded state.
        if verbose:
            print("Not using data file %s: %s" % (datafile, sys.exc_info()[1]))
        return {}

    # Check version of data file
    if version != data_version:
        print("Not using old version '%s' data file %s" % (version, datafile))
        return {}

    return slices


def _save_slices(slices):
    """Pickle (data_version, slices) into datafile."""
    if verbose:
        print("Saving %s" % datafile)
    f = open(datafile, "w")
    try:
        pickle.dump((data_version, slices), f)
    finally:
        f.close()


def main():
    """
    Parse options, then reset or update every slice reported by
    bwlimit.get() and save the resulting state to the data file.
    """
    # Defaults
    global debug, verbose, datafile, period, nm

    # All slices
    names = []

    try:
        longopts = ["debug", "verbose", "file=", "slice=", "period=", "help"]
        (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:p:h", longopts)
    except getopt.GetoptError:
        err = sys.exc_info()[1]
        print("Error: " + err.msg)
        usage()
        sys.exit(1)

    for (opt, optval) in opts:
        if opt == "-d" or opt == "--debug":
            debug = True
        elif opt == "-v" or opt == "--verbose":
            verbose += 1
            bwlimit.verbose = verbose - 1
        elif opt == "-f" or opt == "--file":
            datafile = optval
        elif opt == "-s" or opt == "--slice":
            names.append(optval)
        elif opt == "-p" or opt == "--period":
            period = int(optval)
        else:
            # -h/--help
            usage()
            sys.exit(0)

    # Check if we are already running
    writepid("bwmon")

    # From here on the PID file must be removed even if something below
    # raises, so that a failed run does not leave a stale entry behind.
    try:
        if not debug:
            # Redirect stdout and stderr to syslog
            syslog.openlog("bwmon")
            sys.stdout = sys.stderr = Logger()

        slices = _load_slices()

        # Get special slice IDs
        root_xid = bwlimit.get_xid("root")
        default_xid = bwlimit.get_xid("default")

        # Open connection to Node Manager
        nm = NM()

        live = []
        for params in bwlimit.get():
            (xid, share,
             minrate, maxrate,
             minexemptrate, maxexemptrate,
             bytes, exemptbytes) = params
            # Recorded before any filtering below, so state of slices
            # skipped by --slice is not discarded as dead.
            live.append(xid)

            # Ignore root and default buckets
            if xid == root_xid or xid == default_xid:
                continue

            name = bwlimit.get_slice(xid)
            if name is None:
                # Orphaned (not associated with a slice) class
                name = "%d?" % xid

            # Monitor only the specified slices
            if names and name not in names:
                continue

            # slices is populated from the pickle file
            # xid is populated from bwlimit (read from /etc/passwd)
            if xid in slices:
                slice = slices[xid]
                if time.time() >= (slice.time + period) or \
                   bytes < slice.bytes or exemptbytes < slice.exemptbytes:
                    # Reset to defaults every 24 hours or if it appears
                    # that the byte counters have overflowed (or, more
                    # likely, the node was restarted or the HTB buckets
                    # were re-initialized).
                    slice.reset(maxrate, maxexemptrate, bytes, exemptbytes)
                else:
                    # Update byte counts
                    slice.update(maxrate, maxexemptrate, bytes, exemptbytes)
            else:
                # New slice, initialize state
                slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate,
                                            bytes, exemptbytes)

        # Delete dead slices
        dead = Set(slices.keys()) - Set(live)
        for xid in dead:
            del slices[xid]

        _save_slices(slices)
    finally:
        removepid("bwmon")


if __name__ == '__main__':
    main()