From 3c1e167777c2bc313562a5bdc67aa78525b10c38 Mon Sep 17 00:00:00 2001 From: Mark Huang Date: Tue, 25 Apr 2006 14:40:28 +0000 Subject: [PATCH] Rewrite bandwidth monitoring to use bwlimit.py module and to manage exempt limits as well --- BandwidthMonitor.py | 450 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 450 insertions(+) create mode 100755 BandwidthMonitor.py diff --git a/BandwidthMonitor.py b/BandwidthMonitor.py new file mode 100755 index 0000000..c7a4148 --- /dev/null +++ b/BandwidthMonitor.py @@ -0,0 +1,450 @@ +#!/usr/bin/python +# +# Average bandwidth monitoring script. Run periodically via cron(8) to +# enforce a soft limit on daily bandwidth usage for each slice. If a +# slice is found to have exceeded its daily bandwidth usage when the +# script is run, its instantaneous rate will be capped at the desired +# average rate. Thus, in the worst case, a slice will only be able to +# send a little more than twice its average daily limit. +# +# Two separate limits are enforced, one for destinations exempt from +# the node bandwidth cap, and the other for all other destinations. +# +# Mark Huang +# Andy Bavier +# Copyright (C) 2004-2006 The Trustees of Princeton University +# +# $Id$ +# + +import syslog +import os +import sys +import getopt +import time +import pickle + +import socket +import xmlrpclib +import bwlimit + +from sets import Set + +# /etc/planetlab/plc_config.py is a Python fragment maintained by +# PlanetLabConf that contains PLC configuration variables. +try: + sys.path.append("/etc/planetlab") + from plc_config import * +except: + print "Warning: Configuration file /etc/planetlab/plc_config.py not found" + PLC_NAME = "PlanetLab" + PLC_MAIL_SUPPORT_ADDRESS = "support@planet-lab.org" + PLC_MAIL_SLICE_ADDRESS = "SLICE@slices.planet-lab.org" + +# Constants +seconds_per_day = 24 * 60 * 60 +bits_per_byte = 8 + +# Defaults +debug = False +verbose = 0 +datafile = "/var/lib/misc/BandwidthMonitor.dat" +nm = None + +default_maxrate = bwlimit.get_bwcap() + +default_maxexemptrate = bwlimit.bwmax + +# 500 Kbit or 5.4 GB per day +default_avgrate = 500000 + +# 1.5 Mbit or 16.4 GB per day +default_avgexemptrate = 1500000 + +# Average over 1 day +period = 1 * seconds_per_day + +# Message template +template = \ +""" +The slice %(slice)s has transmitted more than %(bytes)s from +%(hostname)s to %(class)s destinations +since %(since)s. +Its maximum %(class)s burst rate will be capped at %(avgrate)s +until %(until)s. +Please reduce the average %(class)s transmission rate +of the slice to %(avgrate)s, or %(limit)s per %(period)s. + +""".lstrip() + +footer = \ +""" +%(date)s %(hostname)s bwcap %(slice)s +""".lstrip() + +def format_bytes(bytes): + """ + Formats bytes into a string + """ + + if bytes >= 1000000000: + return "%.1f GB" % (bytes / 1000000000.) + elif bytes >= 1000000: + return "%.1f MB" % (bytes / 1000000.) + elif bytes >= 1000: + return "%.1f KB" % (bytes / 1000.) + else: + return "%.0f bytes" % bytes + +def format_period(seconds): + """ + Formats a period in seconds into a string. + """ + + if seconds == (24 * 60 * 60): + return "day" + elif seconds == (60 * 60): + return "hour" + elif seconds > (24 * 60 * 60): + return "%.1f days" % (seconds / 24. / 60. / 60.) + elif seconds > (60 * 60): + return "%.1f hours" % (seconds / 60. / 60.) + elif seconds > (60): + return "%.1f minutes" % (seconds / 60.) + else: + return "%.0f seconds" % seconds + +class Slice: + """ + Stores the last recorded bandwidth parameters of a slice. + + xid - slice context/VServer ID + name - slice name + time - beginning of recording period in UNIX seconds + bytes - low bandwidth bytes transmitted at the beginning of the recording period + exemptbytes - high bandwidth bytes transmitted at the beginning of the recording period + avgrate - average low bandwidth rate to enforce over the recording period + avgexemptrate - average high bandwidth rate to enforce over the recording period + """ + + def __init__(self, xid, name, maxrate, maxexemptrate, bytes, exemptbytes): + self.xid = xid + self.name = name + self.reset(maxrate, maxexemptrate, bytes, exemptbytes) + + def __repr__(self): + return self.name + + def query(self, attributes): + """ + Get values of various slice attributes from the Node Manager + """ + values = [None for attribute in attributes] + + if nm is not None: + try: + # Read rspec (the NM hash code for the slice) + rcap = open("/var/run/pl_nm/%s.vm_rcap" % self.name, "r") + rspec = rcap.readline().strip() + rcap.close() + + for i, attribute in enumerate(attributes): + # NM interface allows you to pass in a tuple + # (attribute, default) instead of just an + # attribute name. default is returned if the + # attribute is not set. + (rc, (value,)) = nm.nm_inspect(rspec, attribute) + if type(attribute) == tuple: + default = attribute[1] + else: + default = 0 + if rc == 0 and value != default: + values[i] = value + except Exception, err: + print "Warning: Exception received while querying Node Manager:", err + + return values + + def reset(self, maxrate, maxexemptrate, bytes, exemptbytes): + """ + Begin a new recording period. Remove caps by restoring limits + to their default values. + """ + + # Reset baseline time + self.time = time.time() + + # Reset baseline byte coutns + self.bytes = bytes + self.exemptbytes = exemptbytes + + # Query Node Manager for max rate overrides + (new_maxrate, new_maxexemptrate) = self.query(['nm_net_max_rate', 'nm_net_max_exempt_rate']) + if new_maxrate is not None: + new_maxrate *= 1000 + else: + new_maxrate = default_maxrate + if new_maxexemptrate is not None: + new_maxexemptrate *= 1000 + else: + new_maxexemptrate = default_maxexemptrate + + if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate: + print "%s reset to %s/%s" % \ + (self.name, + bwlimit.format_tc_rate(new_maxrate), + bwlimit.format_tc_rate(new_maxexemptrate)) + bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate) + + def update(self, maxrate, maxexemptrate, bytes, exemptbytes): + """ + Update byte counts and check if average rates have been + exceeded. In the worst case (instantaneous usage of the entire + average daily byte limit at the beginning of the recording + period), the slice will be immediately capped and will get to + send twice the average daily byte limit. In the common case, + it will get to send slightly more than the average daily byte + limit. + """ + + # Query Node Manager for max average rate overrides + (self.avgrate, self.avgexemptrate) = self.query(['nm_net_max_rate', 'nm_net_max_exempt_rate']) + if self.avgrate is None: + self.avgrate = default_avgrate + if self.avgexemptrate is None: + self.avgexemptrate = default_avgexemptrate + + # Prepare message parameters from the template + message = "" + params = {'slice': self.name, 'hostname': socket.gethostname(), + 'since': time.asctime(time.gmtime(self.time)) + " GMT", + 'until': time.asctime(time.gmtime(self.time + period)) + " GMT", + 'date': time.asctime(time.gmtime()) + " GMT", + 'period': format_period(period)} + + bytelimit = self.avgrate * period / bits_per_byte + if bytes >= (self.bytes + bytelimit) and \ + maxrate > self.avgrate: + new_maxrate = self.avgrate + else: + new_maxrate = maxrate + + # Format template parameters for low bandwidth message + params['class'] = "low bandwidth" + params['bytes'] = format_bytes(bytes - self.bytes) + params['maxrate'] = bwlimit.format_tc_rate(maxrate) + params['limit'] = format_bytes(bytelimit) + params['avgrate'] = bwlimit.format_tc_rate(self.avgrate) + + if verbose: + print "%(slice)s %(class)s " \ + "%(bytes)s/%(limit)s (%(maxrate)s/%(avgrate)s)" % \ + params + + # Cap low bandwidth burst rate + if new_maxrate != maxrate: + message += template % params + print "%(slice)s %(class)s capped at %(avgrate)s (%(bytes)s/%(limit)s)" % params + + exemptbytelimit = self.avgexemptrate * period / bits_per_byte + if exemptbytes >= (self.exemptbytes + exemptbytelimit) and \ + maxexemptrate > self.avgexemptrate: + new_maxexemptrate = self.avgexemptrate + else: + new_maxexemptrate = maxexemptrate + + # Format template parameters for high bandwidth message + params['class'] = "high bandwidth" + params['bytes'] = format_bytes(exemptbytes - self.exemptbytes) + params['maxrate'] = bwlimit.format_tc_rate(maxexemptrate) + params['limit'] = format_bytes(exemptbytelimit) + params['avgrate'] = bwlimit.format_tc_rate(self.avgexemptrate) + + if verbose: + print "%(slice)s %(class)s " \ + "%(bytes)s/%(limit)s (%(maxrate)s/%(avgrate)s)" % \ + params + + # Cap high bandwidth burst rate + if new_maxexemptrate != maxexemptrate: + message += template % params + print "%(slice)s %(class)s capped at %(avgrate)s (%(bytes)s/%(limit)s)" % params + + # Apply parameters + if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate: + bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate) + + # Notify slice + if message: + params['from'] = "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS) + params['to'] = PLC_MAIL_SLICE_ADDRESS.replace("SLICE", self.name) + # PLC has a separate list for pl_mom messages + if PLC_MAIL_SUPPORT_ADDRESS == "support@planet-lab.org": + params['cc'] = "pl-mom@planet-lab.org" + else: + params['cc'] = PLC_MAIL_SUPPORT_ADDRESS + params['version'] = sys.version.split(" ")[0] + + if debug: + sendmail = sys.stdout + else: + sendmail = os.popen("/usr/sbin/sendmail -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w") + + # Write headers + sendmail.write( +""" +Content-type: text/plain +From: %(from)s +Reply-To: %(from)s +To: %(to)s +Cc: %(cc)s +X-Mailer: Python/%(version)s +Subject: pl_mom capped bandwidth of slice %(slice)s on %(hostname)s + +""".lstrip() % params) + + # Write body + sendmail.write(message) + + # Write footer + sendmail.write(footer % params) + + if sendmail != sys.stdout: + sendmail.close() + +class Logger: + """ + Simple file-like class for redirecting stdout and stderr to /var/log/messages + """ + def write(self, text): + text = text.strip() + if text: + syslog.syslog(text) + +def usage(): + print """ +Usage: %s [OPTIONS]... + +Options: + -d, --debug Enable debugging (default: %s) + -v, --verbose Increase verbosity level (default: %d) + -f, --file=FILE Data file (default: %s) + -s, --slice=SLICE Constrain monitoring to these slices (default: all) + -p, --period=SECONDS Interval in seconds over which to enforce average byte limits (default: %s) + -h, --help This message +""".lstrip() % (sys.argv[0], debug, verbose, datafile, format_period(period)) + +def main(): + # Defaults + global debug, verbose, datafile, period, nm + # All slices + names = [] + + try: + longopts = ["debug", "verbose", "file=", "slice=", "period=", "help"] + (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:p:h", longopts) + except getopt.GetoptError, err: + print "Error: " + err.msg + usage() + sys.exit(1) + + for (opt, optval) in opts: + if opt == "-d" or opt == "--debug": + debug = True + elif opt == "-v" or opt == "--verbose": + verbose += 1 + bwlimit.verbose = verbose - 1 + elif opt == "-f" or opt == "--file": + datafile = optval + elif opt == "-s" or opt == "--slice": + names.append(optval) + elif opt == "-p" or opt == "--period": + period = int(optval) + else: + usage() + sys.exit(0) + + # Redirect stdout and stderr to syslog + if not debug: + syslog.openlog("pl_mom") + sys.stdout = sys.stderr = Logger() + + try: + if debug: + print "Loading %s" % datafile + f = open(datafile, "r+") + (version, slices) = pickle.load(f) + f.close() + # Check version of data file + if version != "$Id$": + print "Not using old version '%s' data file %s" % (version, datafile) + raise Exception + except Exception: + version = "$Id$" + slices = {} + + # Get special slice IDs + root_xid = bwlimit.get_xid("root") + default_xid = bwlimit.get_xid("default") + + # Open connection to Node Manager + socket.setdefaulttimeout(10) + try: + nm = xmlrpclib.ServerProxy("http://localhost:812/") + except Exception, err: + print "Warning: Exception received while opening connection to Node Manager:", err + nm = None + + live = [] + for params in bwlimit.get(): + (xid, share, + minrate, maxrate, + minexemptrate, maxexemptrate, + bytes, exemptbytes) = params + live.append(xid) + + # Ignore root and default buckets + if xid == root_xid or xid == default_xid: + continue + + name = bwlimit.get_slice(xid) + if name is None: + # Orphaned (not associated with a slice) class + name = "%d?" % xid + + # Monitor only the specified slices + if names and name not in names: + continue + + if slices.has_key(xid): + slice = slices[xid] + if time.time() >= (slice.time + period) or \ + bytes < slice.bytes or exemptbytes < slice.exemptbytes: + # Reset to defaults every 24 hours or if it appears + # that the byte counters have overflowed (or, more + # likely, the node was restarted or the HTB buckets + # were re-initialized). + slice.reset(maxrate, maxexemptrate, bytes, exemptbytes) + else: + # Update byte counts + slice.update(maxrate, maxexemptrate, bytes, exemptbytes) + else: + # New slice, initialize state + slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate, bytes, exemptbytes) + + # Delete dead slices + dead = Set(slices.keys()) - Set(live) + for xid in dead: + del slices[xid] + + # Close connection to Node Manager + nm.close() + + if debug: + print "Saving %s" % datafile + f = open(datafile, "w") + pickle.dump((version, slices), f) + f.close() + +if __name__ == '__main__': + main() -- 2.43.0