0af380da57d254c8682b892290ea90d952a2c622
[mom.git] / bwmon.py
1 #!/usr/bin/python
2 #
3 # Average bandwidth monitoring script. Run periodically via cron(8) to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have exceeded its daily bandwidth usage when the
6 # script is run, its instantaneous rate will be capped at the desired
7 # average rate. Thus, in the worst case, a slice will only be able to
8 # send a little more than twice its average daily limit.
9 #
10 # Two separate limits are enforced, one for destinations exempt from
11 # the node bandwidth cap, and the other for all other destinations.
12 #
13 # Mark Huang <mlhuang@cs.princeton.edu>
14 # Andy Bavier <acb@cs.princeton.edu>
15 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
16 # Copyright (C) 2004-2006 The Trustees of Princeton University
17 #
18 # $Id: bwmon.py,v 1.19 2007/01/08 21:58:13 faiyaza Exp $
19 #
20
21 import syslog
22 import os
23 import sys
24 import getopt
25 import time
26 import pickle
27
28 import socket
29 import xmlrpclib
30 import bwlimit
31
32 from sets import Set
33
34 # Utility functions
35 from pl_mom import *
36
37 # Constants
38 seconds_per_day = 24 * 60 * 60
39 bits_per_byte = 8
40
41 # Defaults
42 debug = False
43 verbose = 0
44 datafile = "/var/lib/misc/bwmon.dat"
45 nm = None
46
47 # Burst to line rate (or node cap).  Set by NM.
48 default_maxrate = bwlimit.get_bwcap()
49 default_maxexemptrate = bwlimit.bwmax
50
51 # What we cap to when slices break the rules.
52 # 500 Kbit or 5.4 GB per day
53 #default_avgrate = 500000
54 # 1.5 Mbit or 16.4 GB per day
55 #default_avgexemptrate = 1500000
56
57 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G 
58 # 5.4 Gbyte per day max allowed transfered per recording period
59 default_ByteMax = 5798205850
60 default_ByteThresh = int(.8 * default_ByteMax) 
61 # 16.4 Gbyte per day max allowed transfered per recording period to I2
62 default_ExemptByteMax = 17609365914 
63 default_ExemptByteThresh = int(.8 * default_ExemptByteMax) 
64
65 default_MinRate = 8
66
67 # Average over 1 day
68 period = 1 * seconds_per_day
69
70 # Message template
71 template = \
72 """
73 The slice %(slice)s has transmitted more than %(bytes)s from
74 %(hostname)s to %(class)s destinations
75 since %(since)s.
76
77 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
78 until %(until)s.
79
80 Please reduce the average %(class)s transmission rate
81 of the slice to %(limit)s per %(period)s.
82
83 """.lstrip()
84
85 footer = \
86 """
87 %(date)s %(hostname)s bwcap %(slice)s
88 """.lstrip()
89
90 class Slice:
91     """
92     Stores the last recorded bandwidth parameters of a slice.
93
94     xid - slice context/VServer ID
95     name - slice name
96     time - beginning of recording period in UNIX seconds
97     bytes - low bandwidth bytes transmitted at the beginning of the recording period
98     exemptbytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
99     ByteMax - total volume of data allowed
100     ByteThresh - After thresh, cap node to (maxbyte - bytes)/(time left in period)
101     ExemptByteMax - Same as above, but for i2.
102     ExemptByteThresh - i2 ByteThresh
103     maxrate - max_rate slice attribute. 
104     maxexemptrate - max_exempt_rate slice attribute.
105     self.emailed = did we email during this recording period
106
107     """
108
109     def __init__(self, xid, name, maxrate, maxexemptrate, bytes, exemptbytes):
110         self.xid = xid
111         self.name = name
112         self.time = 0
113         self.bytes = 0
114         self.exemptbytes = 0
115         self.ByteMax = default_ByteMax
116         self.ByteThresh = default_ByteThresh
117         self.ExemptByteMax = default_ExemptByteMax
118         self.ExemptByteThresh = default_ExemptByteThresh
119         self.maxrate = default_maxrate
120         self.maxexemptrate = default_maxexemptrate
121         self.emailed = False
122
123         # Get real values where applicable
124         self.reset(maxrate, maxexemptrate, bytes, exemptbytes)
125
126     def __repr__(self):
127         return self.name
128
129     def updateSliceAttributes(self):
130         # Query Node Manager for max rate overrides
131         try:
132             vals = nm.query(self.name, 
133                 [('nm_net_max_rate', self.maxrate),
134                 ('nm_net_max_exempt_rate', self.maxexemptrate),
135                 ("nm_net_max_byte", int(self.ByteMax / 1024)),
136                 ("nm_net_max_exempt_byte", int(self.ExemptByteMax / 1024)),
137                 ("nm_net_max_thresh_byte", int( .8 * self.ByteMax / 1024)),
138                 ("nm_net_max_thresh_exempt_byte", int(.8 * self.ExemptByteMax / 1024)),
139                 ("nm_net_avg_rate", 0),
140                 ("nm_net_avg_exempt_rate", 0)])
141
142             (self.maxrate,
143                 self.maxexemptrate,
144                 ByteMax,
145                 ExemptByteMax,
146                 ByteThresh,
147                 ExemptByteThresh,
148                 avgrate,
149                 avgexemptrate) = vals
150             
151             # The shitty bit.  Gotta bias the limits so as not to overflow xmlrpc
152             self.ByteMax = ByteMax * 1024 
153             self.ByteThresh = ByteThresh * 1024 
154             self.ExemptByteMax = ExemptByteMax * 1024 
155             self.ExemptByteThresh = ExemptByteThresh * 1024 
156
157             # The hack here is that when i pass 0 to the xmlrpc request to NM, 
158             # for rate limits and it comes back non zero, then screw the byte limits.
159             # Mult by the period and recompute the byte limits.  The thought is
160             # If/when PLC switches to byte limits, the avgrates wont be used as
161             # slice attributes and will return as 0
162             if (avgrate != 0):
163                 self.ByteMax = int(avgrate * period / 8) 
164                 self.ByteThresh = int(self.ByteMax * .8)
165
166             if (avgexemptrate != 0):
167                 self.ExemptByteMax = int(avgexemptrate * period / 8)
168                 self.ExemptByteThresh = int(self.ExemptByteMax * .8)
169
170         except Exception, err:
171             print "Warning: Exception received while querying NM:", err
172
173     def reset(self, maxrate, maxexemptrate, bytes, exemptbytes):
174         """
175         Begin a new recording period. Remove caps by restoring limits
176         to their default values.
177         """
178         
179         # Query Node Manager for max rate overrides
180         self.updateSliceAttributes()    
181
182         # Reset baseline time
183         self.time = time.time()
184
185         # Reset baseline byte coutns
186         self.bytes = bytes
187         self.exemptbytes = exemptbytes
188
189         # Reset email 
190         self.emailed = False
191
192         if (self.maxrate != maxrate) or (self.maxexemptrate != maxexemptrate):
193             print "%s reset to %s/%s" % \
194                   (self.name,
195                    bwlimit.format_tc_rate(self.maxrate),
196                    bwlimit.format_tc_rate(self.maxexemptrate))
197             bwlimit.set(xid = self.xid, maxrate = self.maxrate, maxexemptrate = self.maxexemptrate)
198
199     def update(self, maxrate, maxexemptrate, bytes, exemptbytes):
200         """
201         Update byte counts and check if byte limits have been
202         exceeded. 
203         """
204     
205         # Query Node Manager for max rate overrides
206         self.updateSliceAttributes()    
207      
208         # Prepare message parameters from the template
209         message = ""
210         params = {'slice': self.name, 'hostname': socket.gethostname(),
211                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
212                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
213                   'date': time.asctime(time.gmtime()) + " GMT",
214                   'period': format_period(period)} 
215
216         if bytes >= (self.bytes + self.ByteThresh):
217             new_maxrate = \
218             int(((self.ByteMax - (bytes - self.bytes)) * 8)/(period - int(time.time() - self.time)))
219             if new_maxrate < default_MinRate:
220                 new_maxrate = default_MinRate
221         else:
222             new_maxrate = maxrate
223
224         # Format template parameters for low bandwidth message
225         params['class'] = "low bandwidth"
226         params['bytes'] = format_bytes(bytes - self.bytes)
227         params['maxrate'] = bwlimit.format_tc_rate(maxrate)
228         params['limit'] = format_bytes(self.ByteMax)
229         params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
230
231         if verbose:
232             print "%(slice)s %(class)s " \
233                   "%(bytes)s of %(limit)s (%(new_maxrate)s/s maxrate)" % \
234                   params
235
236         # Cap low bandwidth burst rate
237         if new_maxrate != maxrate:
238             message += template % params
239             print "%(slice)s %(class)s capped at %(new_maxrate)s/s " % params
240     
241         if exemptbytes >= (self.exemptbytes + self.ExemptByteThresh):
242             new_maxexemptrate = \
243             int(((self.ExemptByteMax - (self.bytes - bytes)) * 8)/(period - int(time.time() - self.time)))
244             if new_maxexemptrate < default_MinRate:
245                 new_maxexemptrate = default_MinRate
246         else:
247             new_maxexemptrate = maxexemptrate
248
249         # Format template parameters for high bandwidth message
250         params['class'] = "high bandwidth"
251         params['bytes'] = format_bytes(exemptbytes - self.exemptbytes)
252         params['maxrate'] = bwlimit.format_tc_rate(maxexemptrate)
253         params['limit'] = format_bytes(self.ExemptByteMax)
254         params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
255
256         if verbose:
257             print "%(slice)s %(class)s " \
258                   "%(bytes)s of %(limit)s (%(new_maxrate)s/s maxrate)" % params
259
260         # Cap high bandwidth burst rate
261         if new_maxexemptrate != maxexemptrate:
262             message += template % params
263             print "%(slice)s %(class)s capped at %(new_maxexemptrate)s/s" % params
264
265         # Apply parameters
266         if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate:
267             bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate)
268
269         # Notify slice
270         if message and self.emailed == False:
271             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
272             if debug:
273                 print subject
274                 print message + (footer % params)
275             else:
276                 self.emailed = True
277                 slicemail(self.name, subject, message + (footer % params))
278
279
280
281 def usage():
282     print """
283 Usage: %s [OPTIONS]...
284
285 Options:
286         -d, --debug            Enable debugging (default: %s)
287         -v, --verbose            Increase verbosity level (default: %d)
288         -f, --file=FILE            Data file (default: %s)
289         -s, --slice=SLICE        Constrain monitoring to these slices (default: all)
290         -p, --period=SECONDS    Interval in seconds over which to enforce average byte limits (default: %s)
291         -h, --help                This message
292 """.lstrip() % (sys.argv[0], debug, verbose, datafile, format_period(period))
293
294 def main():
295     # Defaults
296     global debug, verbose, datafile, period, nm
297     # All slices
298     names = []
299
300     try:
301         longopts = ["debug", "verbose", "file=", "slice=", "period=", "help"]
302         (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:p:h", longopts)
303     except getopt.GetoptError, err:
304         print "Error: " + err.msg
305         usage()
306         sys.exit(1)
307
308     for (opt, optval) in opts:
309         if opt == "-d" or opt == "--debug":
310             debug = True
311         elif opt == "-v" or opt == "--verbose":
312             verbose += 1
313             bwlimit.verbose = verbose - 1
314         elif opt == "-f" or opt == "--file":
315             datafile = optval
316         elif opt == "-s" or opt == "--slice":
317             names.append(optval)
318         elif opt == "-p" or opt == "--period":
319             period = int(optval)
320         else:
321             usage()
322             sys.exit(0)
323
324     # Check if we are already running
325     writepid("bwmon")
326
327     if not debug:
328         # Redirect stdout and stderr to syslog
329         syslog.openlog("bwmon")
330         sys.stdout = sys.stderr = Logger()
331
332     try:
333         f = open(datafile, "r+")
334         if verbose:
335             print "Loading %s" % datafile
336         (version, slices) = pickle.load(f)
337         f.close()
338         # Check version of data file
339         if version != "$Id: bwmon.py,v 1.19 2007/01/08 21:58:13 faiyaza Exp $":
340             print "Not using old version '%s' data file %s" % (version, datafile)
341             raise Exception
342     except Exception:
343         version = "$Id: bwmon.py,v 1.19 2007/01/08 21:58:13 faiyaza Exp $"
344         slices = {}
345
346     # Get special slice IDs
347     root_xid = bwlimit.get_xid("root")
348     default_xid = bwlimit.get_xid("default")
349
350     #Open connection to Node Manager. Global.
351     nm = NM()
352
353     live = []
354     # Get actuall running values from tc.
355     for params in bwlimit.get():
356         (xid, share,
357          minrate, maxrate,
358          minexemptrate, maxexemptrate,
359          bytes, exemptbytes) = params
360         live.append(xid)
361
362         # Ignore root and default buckets
363         if xid == root_xid or xid == default_xid:
364             continue
365
366         name = bwlimit.get_slice(xid)
367         if name is None:
368             # Orphaned (not associated with a slice) class
369             name = "%d?" % xid
370
371         # Monitor only the specified slices
372         if names and name not in names:
373             continue
374         #slices is populated from the pickle file
375         #xid is populated from bwlimit (read from /etc/passwd) 
376         if slices.has_key(xid):
377             slice = slices[xid]
378             if time.time() >= (slice.time + period) or \
379                bytes < slice.bytes or exemptbytes < slice.exemptbytes:
380                 # Reset to defaults every 24 hours or if it appears
381                 # that the byte counters have overflowed (or, more
382                 # likely, the node was restarted or the HTB buckets
383                 # were re-initialized).
384                 slice.reset(maxrate, maxexemptrate, bytes, exemptbytes)
385             else:
386                 # Update byte counts
387                 slice.update(maxrate, maxexemptrate, bytes, exemptbytes)
388         else:
389             # New slice, initialize state
390             slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate, bytes, exemptbytes)
391
392     # Delete dead slices
393     dead = Set(slices.keys()) - Set(live)
394     for xid in dead:
395         del slices[xid]
396
397     if verbose:
398         print "Saving %s" % datafile
399     f = open(datafile, "w")
400     pickle.dump((version, slices), f)
401     f.close()
402
403     removepid("bwmon")
404
405 if __name__ == '__main__':
406     main()