#!/usr/bin/python # # Average bandwidth monitoring script. Run periodically via cron(8) to # enforce a soft limit on daily bandwidth usage for each slice. If a # slice is found to have exceeded its daily bandwidth usage when the # script is run, its instantaneous rate will be capped at the desired # average rate. Thus, in the worst case, a slice will only be able to # send a little more than twice its average daily limit. # # Two separate limits are enforced, one for destinations exempt from # the node bandwidth cap, and the other for all other destinations. # # Mark Huang # Andy Bavier # Copyright (C) 2004-2006 The Trustees of Princeton University # # $Id$ # import syslog import os import sys import getopt import time import pickle import socket import xmlrpclib import bwlimit from sets import Set # /etc/planetlab/plc_config.py is a Python fragment maintained by # PlanetLabConf that contains PLC configuration variables. try: sys.path.append("/etc/planetlab") from plc_config import * except: print "Warning: Configuration file /etc/planetlab/plc_config.py not found" PLC_NAME = "PlanetLab" PLC_MAIL_SUPPORT_ADDRESS = "support@planet-lab.org" PLC_MAIL_SLICE_ADDRESS = "SLICE@slices.planet-lab.org" # Constants seconds_per_day = 24 * 60 * 60 bits_per_byte = 8 # Defaults debug = False verbose = 0 datafile = "/var/lib/misc/BandwidthMonitor.dat" nm = None default_maxrate = bwlimit.get_bwcap() default_maxexemptrate = bwlimit.bwmax # 500 Kbit or 5.4 GB per day default_avgrate = 500000 # 1.5 Mbit or 16.4 GB per day default_avgexemptrate = 1500000 # Average over 1 day period = 1 * seconds_per_day # Message template template = \ """ The slice %(slice)s has transmitted more than %(bytes)s from %(hostname)s to %(class)s destinations since %(since)s. Its maximum %(class)s burst rate will be capped at %(avgrate)s until %(until)s. Please reduce the average %(class)s transmission rate of the slice to %(avgrate)s, or %(limit)s per %(period)s. """.lstrip() footer = \ """ %(date)s %(hostname)s bwcap %(slice)s """.lstrip() def format_bytes(bytes): """ Formats bytes into a string """ if bytes >= 1000000000: return "%.1f GB" % (bytes / 1000000000.) elif bytes >= 1000000: return "%.1f MB" % (bytes / 1000000.) elif bytes >= 1000: return "%.1f KB" % (bytes / 1000.) else: return "%.0f bytes" % bytes def format_period(seconds): """ Formats a period in seconds into a string. """ if seconds == (24 * 60 * 60): return "day" elif seconds == (60 * 60): return "hour" elif seconds > (24 * 60 * 60): return "%.1f days" % (seconds / 24. / 60. / 60.) elif seconds > (60 * 60): return "%.1f hours" % (seconds / 60. / 60.) elif seconds > (60): return "%.1f minutes" % (seconds / 60.) else: return "%.0f seconds" % seconds class Slice: """ Stores the last recorded bandwidth parameters of a slice. xid - slice context/VServer ID name - slice name time - beginning of recording period in UNIX seconds bytes - low bandwidth bytes transmitted at the beginning of the recording period exemptbytes - high bandwidth bytes transmitted at the beginning of the recording period avgrate - average low bandwidth rate to enforce over the recording period avgexemptrate - average high bandwidth rate to enforce over the recording period """ def __init__(self, xid, name, maxrate, maxexemptrate, bytes, exemptbytes): self.xid = xid self.name = name self.reset(maxrate, maxexemptrate, bytes, exemptbytes) def __repr__(self): return self.name def query(self, attributes): """ Get values of various slice attributes from the Node Manager """ values = [None for attribute in attributes] if nm is not None: try: # Read rspec (the NM hash code for the slice) rcap = open("/var/run/pl_nm/%s.vm_rcap" % self.name, "r") rspec = rcap.readline().strip() rcap.close() for i, attribute in enumerate(attributes): # NM interface allows you to pass in a tuple # (attribute, default) instead of just an # attribute name. default is returned if the # attribute is not set. (rc, (value,)) = nm.nm_inspect(rspec, attribute) if type(attribute) == tuple: default = attribute[1] else: default = 0 if rc == 0 and value != default: values[i] = value except Exception, err: print "Warning: Exception received while querying Node Manager:", err return values def reset(self, maxrate, maxexemptrate, bytes, exemptbytes): """ Begin a new recording period. Remove caps by restoring limits to their default values. """ # Reset baseline time self.time = time.time() # Reset baseline byte coutns self.bytes = bytes self.exemptbytes = exemptbytes # Query Node Manager for max rate overrides (new_maxrate, new_maxexemptrate) = self.query(['nm_net_max_rate', 'nm_net_max_exempt_rate']) if new_maxrate is not None: new_maxrate *= 1000 else: new_maxrate = default_maxrate if new_maxexemptrate is not None: new_maxexemptrate *= 1000 else: new_maxexemptrate = default_maxexemptrate if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate: print "%s reset to %s/%s" % \ (self.name, bwlimit.format_tc_rate(new_maxrate), bwlimit.format_tc_rate(new_maxexemptrate)) bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate) def update(self, maxrate, maxexemptrate, bytes, exemptbytes): """ Update byte counts and check if average rates have been exceeded. In the worst case (instantaneous usage of the entire average daily byte limit at the beginning of the recording period), the slice will be immediately capped and will get to send twice the average daily byte limit. In the common case, it will get to send slightly more than the average daily byte limit. """ # Query Node Manager for max average rate overrides (self.avgrate, self.avgexemptrate) = self.query(['nm_net_max_rate', 'nm_net_max_exempt_rate']) if self.avgrate is None: self.avgrate = default_avgrate if self.avgexemptrate is None: self.avgexemptrate = default_avgexemptrate # Prepare message parameters from the template message = "" params = {'slice': self.name, 'hostname': socket.gethostname(), 'since': time.asctime(time.gmtime(self.time)) + " GMT", 'until': time.asctime(time.gmtime(self.time + period)) + " GMT", 'date': time.asctime(time.gmtime()) + " GMT", 'period': format_period(period)} bytelimit = self.avgrate * period / bits_per_byte if bytes >= (self.bytes + bytelimit) and \ maxrate > self.avgrate: new_maxrate = self.avgrate else: new_maxrate = maxrate # Format template parameters for low bandwidth message params['class'] = "low bandwidth" params['bytes'] = format_bytes(bytes - self.bytes) params['maxrate'] = bwlimit.format_tc_rate(maxrate) params['limit'] = format_bytes(bytelimit) params['avgrate'] = bwlimit.format_tc_rate(self.avgrate) if verbose: print "%(slice)s %(class)s " \ "%(bytes)s/%(limit)s (%(maxrate)s/%(avgrate)s)" % \ params # Cap low bandwidth burst rate if new_maxrate != maxrate: message += template % params print "%(slice)s %(class)s capped at %(avgrate)s (%(bytes)s/%(limit)s)" % params exemptbytelimit = self.avgexemptrate * period / bits_per_byte if exemptbytes >= (self.exemptbytes + exemptbytelimit) and \ maxexemptrate > self.avgexemptrate: new_maxexemptrate = self.avgexemptrate else: new_maxexemptrate = maxexemptrate # Format template parameters for high bandwidth message params['class'] = "high bandwidth" params['bytes'] = format_bytes(exemptbytes - self.exemptbytes) params['maxrate'] = bwlimit.format_tc_rate(maxexemptrate) params['limit'] = format_bytes(exemptbytelimit) params['avgrate'] = bwlimit.format_tc_rate(self.avgexemptrate) if verbose: print "%(slice)s %(class)s " \ "%(bytes)s/%(limit)s (%(maxrate)s/%(avgrate)s)" % \ params # Cap high bandwidth burst rate if new_maxexemptrate != maxexemptrate: message += template % params print "%(slice)s %(class)s capped at %(avgrate)s (%(bytes)s/%(limit)s)" % params # Apply parameters if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate: bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate) # Notify slice if message: params['from'] = "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS) params['to'] = PLC_MAIL_SLICE_ADDRESS.replace("SLICE", self.name) # PLC has a separate list for pl_mom messages if PLC_MAIL_SUPPORT_ADDRESS == "support@planet-lab.org": params['cc'] = "pl-mom@planet-lab.org" else: params['cc'] = PLC_MAIL_SUPPORT_ADDRESS params['version'] = sys.version.split(" ")[0] if debug: sendmail = sys.stdout else: sendmail = os.popen("/usr/sbin/sendmail -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w") # Write headers sendmail.write( """ Content-type: text/plain From: %(from)s Reply-To: %(from)s To: %(to)s Cc: %(cc)s X-Mailer: Python/%(version)s Subject: pl_mom capped bandwidth of slice %(slice)s on %(hostname)s """.lstrip() % params) # Write body sendmail.write(message) # Write footer sendmail.write(footer % params) if sendmail != sys.stdout: sendmail.close() class Logger: """ Simple file-like class for redirecting stdout and stderr to /var/log/messages """ def write(self, text): text = text.strip() if text: syslog.syslog(text) def usage(): print """ Usage: %s [OPTIONS]... Options: -d, --debug Enable debugging (default: %s) -v, --verbose Increase verbosity level (default: %d) -f, --file=FILE Data file (default: %s) -s, --slice=SLICE Constrain monitoring to these slices (default: all) -p, --period=SECONDS Interval in seconds over which to enforce average byte limits (default: %s) -h, --help This message """.lstrip() % (sys.argv[0], debug, verbose, datafile, format_period(period)) def main(): # Defaults global debug, verbose, datafile, period, nm # All slices names = [] try: longopts = ["debug", "verbose", "file=", "slice=", "period=", "help"] (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:p:h", longopts) except getopt.GetoptError, err: print "Error: " + err.msg usage() sys.exit(1) for (opt, optval) in opts: if opt == "-d" or opt == "--debug": debug = True elif opt == "-v" or opt == "--verbose": verbose += 1 bwlimit.verbose = verbose - 1 elif opt == "-f" or opt == "--file": datafile = optval elif opt == "-s" or opt == "--slice": names.append(optval) elif opt == "-p" or opt == "--period": period = int(optval) else: usage() sys.exit(0) # Redirect stdout and stderr to syslog if not debug: syslog.openlog("pl_mom") sys.stdout = sys.stderr = Logger() try: if debug: print "Loading %s" % datafile f = open(datafile, "r+") (version, slices) = pickle.load(f) f.close() # Check version of data file if version != "$Id$": print "Not using old version '%s' data file %s" % (version, datafile) raise Exception except Exception: version = "$Id$" slices = {} # Get special slice IDs root_xid = bwlimit.get_xid("root") default_xid = bwlimit.get_xid("default") # Open connection to Node Manager socket.setdefaulttimeout(10) try: nm = xmlrpclib.ServerProxy("http://localhost:812/") except Exception, err: print "Warning: Exception received while opening connection to Node Manager:", err nm = None live = [] for params in bwlimit.get(): (xid, share, minrate, maxrate, minexemptrate, maxexemptrate, bytes, exemptbytes) = params live.append(xid) # Ignore root and default buckets if xid == root_xid or xid == default_xid: continue name = bwlimit.get_slice(xid) if name is None: # Orphaned (not associated with a slice) class name = "%d?" % xid # Monitor only the specified slices if names and name not in names: continue if slices.has_key(xid): slice = slices[xid] if time.time() >= (slice.time + period) or \ bytes < slice.bytes or exemptbytes < slice.exemptbytes: # Reset to defaults every 24 hours or if it appears # that the byte counters have overflowed (or, more # likely, the node was restarted or the HTB buckets # were re-initialized). slice.reset(maxrate, maxexemptrate, bytes, exemptbytes) else: # Update byte counts slice.update(maxrate, maxexemptrate, bytes, exemptbytes) else: # New slice, initialize state slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate, bytes, exemptbytes) # Delete dead slices dead = Set(slices.keys()) - Set(live) for xid in dead: del slices[xid] # Close connection to Node Manager nm.close() if debug: print "Saving %s" % datafile f = open(datafile, "w") pickle.dump((version, slices), f) f.close() if __name__ == '__main__': main()