Rewrite bandwidth monitoring to use bwlimit.py module and to manage exempt limits...
[mom.git] / BandwidthMonitor.py
1 #!/usr/bin/python
2 #
3 # Average bandwidth monitoring script. Run periodically via cron(8) to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have exceeded its daily bandwidth usage when the
6 # script is run, its instantaneous rate will be capped at the desired
7 # average rate. Thus, in the worst case, a slice will only be able to
8 # send a little more than twice its average daily limit.
9 #
10 # Two separate limits are enforced, one for destinations exempt from
11 # the node bandwidth cap, and the other for all other destinations.
12 #
13 # Mark Huang <mlhuang@cs.princeton.edu>
14 # Andy Bavier <acb@cs.princeton.edu>
15 # Copyright (C) 2004-2006 The Trustees of Princeton University
16 #
17 # $Id$
18 #
19
20 import syslog
21 import os
22 import sys
23 import getopt
24 import time
25 import pickle
26
27 import socket
28 import xmlrpclib
29 import bwlimit
30
31 from sets import Set
32
33 # /etc/planetlab/plc_config.py is a Python fragment maintained by
34 # PlanetLabConf that contains PLC configuration variables.
35 try:
36     sys.path.append("/etc/planetlab")
37     from plc_config import *
38 except:
39     print "Warning: Configuration file /etc/planetlab/plc_config.py not found"
40     PLC_NAME = "PlanetLab"
41     PLC_MAIL_SUPPORT_ADDRESS = "support@planet-lab.org"
42     PLC_MAIL_SLICE_ADDRESS = "SLICE@slices.planet-lab.org"
43
44 # Constants
45 seconds_per_day = 24 * 60 * 60
46 bits_per_byte = 8
47
48 # Defaults
49 debug = False
50 verbose = 0
51 datafile = "/var/lib/misc/BandwidthMonitor.dat"
52 nm = None
53
54 default_maxrate = bwlimit.get_bwcap()
55
56 default_maxexemptrate = bwlimit.bwmax
57
58 # 500 Kbit or 5.4 GB per day
59 default_avgrate = 500000
60
61 # 1.5 Mbit or 16.4 GB per day
62 default_avgexemptrate = 1500000
63
64 # Average over 1 day
65 period = 1 * seconds_per_day
66
67 # Message template
68 template = \
69 """
70 The slice %(slice)s has transmitted more than %(bytes)s from
71 %(hostname)s to %(class)s destinations
72 since %(since)s.
73 Its maximum %(class)s burst rate will be capped at %(avgrate)s
74 until %(until)s.
75 Please reduce the average %(class)s transmission rate
76 of the slice to %(avgrate)s, or %(limit)s per %(period)s.
77
78 """.lstrip()
79
80 footer = \
81 """
82 %(date)s %(hostname)s bwcap %(slice)s
83 """.lstrip()
84
85 def format_bytes(bytes):
86     """
87     Formats bytes into a string
88     """
89
90     if bytes >= 1000000000:
91         return "%.1f GB" % (bytes / 1000000000.)
92     elif bytes >= 1000000:
93         return "%.1f MB" % (bytes / 1000000.)
94     elif bytes >= 1000:
95         return "%.1f KB" % (bytes / 1000.)
96     else:
97         return "%.0f bytes" % bytes
98
99 def format_period(seconds):
100     """
101     Formats a period in seconds into a string.
102     """
103
104     if seconds == (24 * 60 * 60):
105         return "day"
106     elif seconds == (60 * 60):
107         return "hour"
108     elif seconds > (24 * 60 * 60):
109         return "%.1f days" % (seconds / 24. / 60. / 60.)
110     elif seconds > (60 * 60):
111         return "%.1f hours" % (seconds / 60. / 60.)
112     elif seconds > (60):
113         return "%.1f minutes" % (seconds / 60.)
114     else:
115         return "%.0f seconds" % seconds
116
117 class Slice:
118     """
119     Stores the last recorded bandwidth parameters of a slice.
120
121     xid - slice context/VServer ID
122     name - slice name
123     time - beginning of recording period in UNIX seconds
124     bytes - low bandwidth bytes transmitted at the beginning of the recording period
125     exemptbytes - high bandwidth bytes transmitted at the beginning of the recording period
126     avgrate - average low bandwidth rate to enforce over the recording period
127     avgexemptrate - average high bandwidth rate to enforce over the recording period 
128     """
129
130     def __init__(self, xid, name, maxrate, maxexemptrate, bytes, exemptbytes):
131         self.xid = xid
132         self.name = name
133         self.reset(maxrate, maxexemptrate, bytes, exemptbytes)
134
135     def __repr__(self):
136         return self.name
137
138     def query(self, attributes):
139         """
140         Get values of various slice attributes from the Node Manager
141         """
142         values = [None for attribute in attributes]
143
144         if nm is not None:
145             try:
146                 # Read rspec (the NM hash code for the slice)
147                 rcap = open("/var/run/pl_nm/%s.vm_rcap" % self.name, "r")
148                 rspec = rcap.readline().strip()
149                 rcap.close()
150
151                 for i, attribute in enumerate(attributes):
152                     # NM interface allows you to pass in a tuple
153                     # (attribute, default) instead of just an
154                     # attribute name. default is returned if the
155                     # attribute is not set.
156                     (rc, (value,)) = nm.nm_inspect(rspec, attribute)
157                     if type(attribute) == tuple:
158                         default = attribute[1]
159                     else:
160                         default = 0
161                     if rc == 0 and value != default:
162                         values[i] = value
163             except Exception, err:
164                 print "Warning: Exception received while querying Node Manager:", err
165
166         return values
167
168     def reset(self, maxrate, maxexemptrate, bytes, exemptbytes):
169         """
170         Begin a new recording period. Remove caps by restoring limits
171         to their default values.
172         """
173
174         # Reset baseline time
175         self.time = time.time()
176
177         # Reset baseline byte coutns
178         self.bytes = bytes
179         self.exemptbytes = exemptbytes
180
181         # Query Node Manager for max rate overrides
182         (new_maxrate, new_maxexemptrate) = self.query(['nm_net_max_rate', 'nm_net_max_exempt_rate'])
183         if new_maxrate is not None:
184             new_maxrate *= 1000
185         else:
186             new_maxrate = default_maxrate
187         if new_maxexemptrate is not None:
188             new_maxexemptrate *= 1000
189         else:
190             new_maxexemptrate = default_maxexemptrate
191
192         if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate:
193             print "%s reset to %s/%s" % \
194                   (self.name,
195                    bwlimit.format_tc_rate(new_maxrate),
196                    bwlimit.format_tc_rate(new_maxexemptrate))
197             bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate)
198
199     def update(self, maxrate, maxexemptrate, bytes, exemptbytes):
200         """
201         Update byte counts and check if average rates have been
202         exceeded. In the worst case (instantaneous usage of the entire
203         average daily byte limit at the beginning of the recording
204         period), the slice will be immediately capped and will get to
205         send twice the average daily byte limit. In the common case,
206         it will get to send slightly more than the average daily byte
207         limit.
208         """
209
210         # Query Node Manager for max average rate overrides
211         (self.avgrate, self.avgexemptrate) = self.query(['nm_net_max_rate', 'nm_net_max_exempt_rate'])
212         if self.avgrate is None:
213             self.avgrate = default_avgrate
214         if self.avgexemptrate is None:
215             self.avgexemptrate = default_avgexemptrate
216
217         # Prepare message parameters from the template
218         message = ""
219         params = {'slice': self.name, 'hostname': socket.gethostname(),
220                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
221                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
222                   'date': time.asctime(time.gmtime()) + " GMT",
223                   'period': format_period(period)} 
224
225         bytelimit = self.avgrate * period / bits_per_byte
226         if bytes >= (self.bytes + bytelimit) and \
227            maxrate > self.avgrate:
228             new_maxrate = self.avgrate
229         else:
230             new_maxrate = maxrate
231
232         # Format template parameters for low bandwidth message
233         params['class'] = "low bandwidth"
234         params['bytes'] = format_bytes(bytes - self.bytes)
235         params['maxrate'] = bwlimit.format_tc_rate(maxrate)
236         params['limit'] = format_bytes(bytelimit)
237         params['avgrate'] = bwlimit.format_tc_rate(self.avgrate)
238
239         if verbose:
240             print "%(slice)s %(class)s " \
241                   "%(bytes)s/%(limit)s (%(maxrate)s/%(avgrate)s)" % \
242                   params
243
244         # Cap low bandwidth burst rate
245         if new_maxrate != maxrate:
246             message += template % params
247             print "%(slice)s %(class)s capped at %(avgrate)s (%(bytes)s/%(limit)s)" % params
248
249         exemptbytelimit = self.avgexemptrate * period / bits_per_byte
250         if exemptbytes >= (self.exemptbytes + exemptbytelimit) and \
251            maxexemptrate > self.avgexemptrate:
252             new_maxexemptrate = self.avgexemptrate
253         else:
254             new_maxexemptrate = maxexemptrate
255
256         # Format template parameters for high bandwidth message
257         params['class'] = "high bandwidth"
258         params['bytes'] = format_bytes(exemptbytes - self.exemptbytes)
259         params['maxrate'] = bwlimit.format_tc_rate(maxexemptrate)
260         params['limit'] = format_bytes(exemptbytelimit)
261         params['avgrate'] = bwlimit.format_tc_rate(self.avgexemptrate)
262
263         if verbose:
264             print "%(slice)s %(class)s " \
265                   "%(bytes)s/%(limit)s (%(maxrate)s/%(avgrate)s)" % \
266                   params
267
268         # Cap high bandwidth burst rate
269         if new_maxexemptrate != maxexemptrate:
270             message += template % params
271             print "%(slice)s %(class)s capped at %(avgrate)s (%(bytes)s/%(limit)s)" % params
272
273         # Apply parameters
274         if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate:
275             bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate)
276
277         # Notify slice
278         if message:
279             params['from'] = "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS)
280             params['to'] = PLC_MAIL_SLICE_ADDRESS.replace("SLICE", self.name)
281             # PLC has a separate list for pl_mom messages
282             if PLC_MAIL_SUPPORT_ADDRESS == "support@planet-lab.org":
283                 params['cc'] = "pl-mom@planet-lab.org"
284             else:
285                 params['cc'] = PLC_MAIL_SUPPORT_ADDRESS
286             params['version'] = sys.version.split(" ")[0]
287
288             if debug:
289                 sendmail = sys.stdout
290             else:
291                 sendmail = os.popen("/usr/sbin/sendmail -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")
292
293             # Write headers
294             sendmail.write(
295 """
296 Content-type: text/plain
297 From: %(from)s
298 Reply-To: %(from)s
299 To: %(to)s
300 Cc: %(cc)s
301 X-Mailer: Python/%(version)s
302 Subject: pl_mom capped bandwidth of slice %(slice)s on %(hostname)s
303
304 """.lstrip() % params)
305
306             # Write body
307             sendmail.write(message)
308
309             # Write footer
310             sendmail.write(footer % params)
311
312             if sendmail != sys.stdout:
313                 sendmail.close()
314
315 class Logger:
316     """
317     Simple file-like class for redirecting stdout and stderr to /var/log/messages
318     """
319     def write(self, text):
320         text = text.strip()
321         if text:
322             syslog.syslog(text)
323
324 def usage():
325     print """
326 Usage: %s [OPTIONS]...
327
328 Options:
329         -d, --debug             Enable debugging (default: %s)
330         -v, --verbose           Increase verbosity level (default: %d)
331         -f, --file=FILE         Data file (default: %s)
332         -s, --slice=SLICE       Constrain monitoring to these slices (default: all)
333         -p, --period=SECONDS    Interval in seconds over which to enforce average byte limits (default: %s)
334         -h, --help              This message
335 """.lstrip() % (sys.argv[0], debug, verbose, datafile, format_period(period))
336
337 def main():
338     # Defaults
339     global debug, verbose, datafile, period, nm
340     # All slices
341     names = []
342
343     try:
344         longopts = ["debug", "verbose", "file=", "slice=", "period=", "help"]
345         (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:p:h", longopts)
346     except getopt.GetoptError, err:
347         print "Error: " + err.msg
348         usage()
349         sys.exit(1)
350
351     for (opt, optval) in opts:
352         if opt == "-d" or opt == "--debug":
353             debug = True
354         elif opt == "-v" or opt == "--verbose":
355             verbose += 1
356             bwlimit.verbose = verbose - 1
357         elif opt == "-f" or opt == "--file":
358             datafile = optval
359         elif opt == "-s" or opt == "--slice":
360             names.append(optval)
361         elif opt == "-p" or opt == "--period":
362             period = int(optval)
363         else:
364             usage()
365             sys.exit(0)
366
367     # Redirect stdout and stderr to syslog
368     if not debug:
369         syslog.openlog("pl_mom")
370         sys.stdout = sys.stderr = Logger()
371
372     try:
373         if debug:
374             print "Loading %s" % datafile
375         f = open(datafile, "r+")
376         (version, slices) = pickle.load(f)
377         f.close()
378         # Check version of data file
379         if version != "$Id$":
380             print "Not using old version '%s' data file %s" % (version, datafile)
381             raise Exception
382     except Exception:
383         version = "$Id$"
384         slices = {}
385
386     # Get special slice IDs
387     root_xid = bwlimit.get_xid("root")
388     default_xid = bwlimit.get_xid("default")
389
390     # Open connection to Node Manager
391     socket.setdefaulttimeout(10)
392     try:
393         nm = xmlrpclib.ServerProxy("http://localhost:812/")
394     except Exception, err:
395         print "Warning: Exception received while opening connection to Node Manager:", err
396         nm = None
397
398     live = []
399     for params in bwlimit.get():
400         (xid, share,
401          minrate, maxrate,
402          minexemptrate, maxexemptrate,
403          bytes, exemptbytes) = params
404         live.append(xid)
405
406         # Ignore root and default buckets
407         if xid == root_xid or xid == default_xid:
408             continue
409
410         name = bwlimit.get_slice(xid)
411         if name is None:
412             # Orphaned (not associated with a slice) class
413             name = "%d?" % xid
414
415         # Monitor only the specified slices
416         if names and name not in names:
417             continue
418
419         if slices.has_key(xid):
420             slice = slices[xid]
421             if time.time() >= (slice.time + period) or \
422                bytes < slice.bytes or exemptbytes < slice.exemptbytes:
423                 # Reset to defaults every 24 hours or if it appears
424                 # that the byte counters have overflowed (or, more
425                 # likely, the node was restarted or the HTB buckets
426                 # were re-initialized).
427                 slice.reset(maxrate, maxexemptrate, bytes, exemptbytes)
428             else:
429                 # Update byte counts
430                 slice.update(maxrate, maxexemptrate, bytes, exemptbytes)
431         else:
432             # New slice, initialize state
433             slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate, bytes, exemptbytes)
434
435     # Delete dead slices
436     dead = Set(slices.keys()) - Set(live)
437     for xid in dead:
438         del slices[xid]
439
440     # Close connection to Node Manager
441     nm.close()
442
443     if debug:
444         print "Saving %s" % datafile
445     f = open(datafile, "w")
446     pickle.dump((version, slices), f)
447     f.close()
448
449 if __name__ == '__main__':
450     main()