* Capped rate can only go as low as default_MinRate which is 8bits/s
[mom.git] / bwmon.py
1 #!/usr/bin/python
2 #
3 # Average bandwidth monitoring script. Run periodically via cron(8) to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have exceeded its daily bandwidth usage when the
6 # script is run, its instantaneous rate will be capped at the desired
7 # average rate. Thus, in the worst case, a slice will only be able to
8 # send a little more than twice its average daily limit.
9 #
10 # Two separate limits are enforced, one for destinations exempt from
11 # the node bandwidth cap, and the other for all other destinations.
12 #
13 # Mark Huang <mlhuang@cs.princeton.edu>
14 # Andy Bavier <acb@cs.princeton.edu>
15 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
16 # Copyright (C) 2004-2006 The Trustees of Princeton University
17 #
18 # $Id: bwmon.py,v 1.15 2006/12/13 21:39:23 faiyaza Exp $
19 #
20
21 import syslog
22 import os
23 import sys
24 import getopt
25 import time
26 import pickle
27
28 import socket
29 import xmlrpclib
30 import bwlimit
31
32 from sets import Set
33
34 # Utility functions
35 from pl_mom import *
36
37 # Constants
38 seconds_per_day = 24 * 60 * 60
39 bits_per_byte = 8
40
41 # Defaults
42 debug = False
43 verbose = 0
44 datafile = "/var/lib/misc/bwmon.dat"
45 nm = None
46
47 # Burst to line rate (or node cap).  Set by NM.
48 default_maxrate = bwlimit.get_bwcap()
49 default_maxexemptrate = bwlimit.bwmax
50
51 # What we cap to when slices break the rules.
52 # 500 Kbit or 5.4 GB per day
53 #default_avgrate = 500000
54 # 1.5 Mbit or 16.4 GB per day
55 #default_avgexemptrate = 1500000
56
57 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G 
58 # 5.4 Gbyte per day max allowed transfered per recording period
59 default_ByteMax = 5798205850
60 default_ByteThresh = int(.8 * default_ByteMax) 
61 # 16.4 Gbyte per day max allowed transfered per recording period to I2
62 default_ExemptByteMax = 17609365914 
63 default_ExemptByteThresh = int(.8 * default_ExemptByteMax) 
64
65 default_MinRate = 8
66
67 # Average over 1 day
68 period = 1 * seconds_per_day
69
70 # Message template
71 template = \
72 """
73 The slice %(slice)s has transmitted more than %(bytes)s from
74 %(hostname)s to %(class)s destinations
75 since %(since)s.
76
77 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s
78 until %(until)s.
79
80 Please reduce the average %(class)s transmission rate
81 of the slice %(limit)s per %(period)s.
82
83 """.lstrip()
84
85 footer = \
86 """
87 %(date)s %(hostname)s bwcap %(slice)s
88 """.lstrip()
89
90 class Slice:
91     """
92     Stores the last recorded bandwidth parameters of a slice.
93
94     xid - slice context/VServer ID
95     name - slice name
96     time - beginning of recording period in UNIX seconds
97     bytes - low bandwidth bytes transmitted at the beginning of the recording period
98     exemptbytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
99     ByteMax - total volume of data allowed
100     ByteThresh - After thresh, cap node to (maxbyte - bytes)/(time left in period)
101     ExemptByteMax - Same as above, but for i2.
102     ExemptByteThresh - i2 ByteThresh
103     maxrate - max_rate slice attribute. 
104     maxexemptrate - max_exempt_rate slice attribute.
105     self.emailed = did we email during this recording period
106
107     """
108
109     def __init__(self, xid, name, maxrate, maxexemptrate, bytes, exemptbytes):
110         self.xid = xid
111         self.name = name
112         self.time = 0
113         self.bytes = 0
114         self.exemptbytes = 0
115         self.ByteMax = default_ByteMax
116         self.ByteThresh = default_ByteThresh
117         self.ExemptByteMax = default_ExemptByteMax
118         self.ExemptByteThresh = default_ExemptByteThresh
119         self.maxrate = default_maxrate
120         self.maxexemptrate = default_maxexemptrate
121         self.emailed = False
122
123         # Get real values where applicable
124         self.reset(maxrate, maxexemptrate, bytes, exemptbytes)
125
126     def __repr__(self):
127         return self.name
128
129     def updateSliceAttributes(self):
130         # Query Node Manager for max rate overrides
131         try:
132             vals = nm.query(self.name, 
133                 [('nm_net_max_rate', self.maxrate),
134                 ('nm_net_max_exempt_rate', self.maxexemptrate),
135                 ("nm_net_max_byte", int(self.ByteMax / 1024)),
136                 ("nm_net_max_exempt_byte", int(self.ExemptByteMax / 1024)),
137                 ("nm_net_max_thresh_byte", int( .8 * self.ByteMax / 1024)),
138                 ("nm_net_max_thresh_exempt_byte", int(.8 * self.ExemptByteMax / 1024)),
139                 ("nm_net_avg_rate", 0),
140                 ("nm_net_avg_exempt_rate", 0)])
141
142             (self.maxrate,
143                 self.maxexemptrate,
144                 ByteMax,
145                 ExemptByteMax,
146                 ByteThresh,
147                 ExemptByteThresh,
148                 avgrate,
149                 avgexemptrate) = vals
150             
151             # The shitty bit.  Gotta bias the limits so as not to overflow xmlrpc
152             self.ByteMax = ByteMax * 1024 
153             self.ByteThresh = ByteThresh * 1024 
154             self.ExemptByteMax = ExemptByteMax * 1024 
155             self.ExemptByteThresh = ExemptByteThresh * 1024 
156
157             # The hack here is that when i pass 0 to the xmlrpc request to NM, 
158             # for rate limits and it comes back non zero, then screw the byte limits.
159             # Mult by the period and recompute the byte limits.  The thought is
160             # If/when PLC switches to byte limits, the avgrates wont be used as
161             # slice attributes and will return as 0
162             if (avgrate != 0):
163                 self.ByteMax = avgrate * period 
164                 self.ByteThresh = int(self.ByteMax * .8)
165
166             if (avgexemptrate != 0):
167                 self.ExemptByteMax = avgexemptrate * period
168                 self.ExemptByteThresh = int(self.ExemptByteMax * .8)
169
170         except Exception, err:
171             print "Warning: Exception received while querying NM:", err
172
173     def reset(self, maxrate, maxexemptrate, bytes, exemptbytes):
174         """
175         Begin a new recording period. Remove caps by restoring limits
176         to their default values.
177         """
178         
179         # Query Node Manager for max rate overrides
180         self.updateSliceAttributes()    
181
182         # Reset baseline time
183         self.time = time.time()
184
185         # Reset baseline byte coutns
186         self.bytes = bytes
187         self.exemptbytes = exemptbytes
188
189         # Reset email 
190         self.emailed = False
191
192         if (self.maxrate != maxrate) or (self.maxexemptrate != maxexemptrate):
193             print "%s reset to %s/%s" % \
194                   (self.name,
195                    bwlimit.format_tc_rate(self.maxrate),
196                    bwlimit.format_tc_rate(self.maxexemptrate))
197             bwlimit.set(xid = self.xid, maxrate = self.maxrate, maxexemptrate = self.maxexemptrate)
198
199     def update(self, maxrate, maxexemptrate, bytes, exemptbytes):
200         """
201         Update byte counts and check if byte limits have been
202         exceeded. 
203         """
204     
205         # Query Node Manager for max rate overrides
206         self.updateSliceAttributes()    
207      
208         if verbose:
209             print("\n%s slice attributes "\
210                 "maxrate %s, maxexemptrate %s" % \
211                 (self.name, 
212                 bwlimit.format_tc_rate(maxrate), 
213                 bwlimit.format_tc_rate(maxexemptrate)))
214
215         # Prepare message parameters from the template
216         message = ""
217         params = {'slice': self.name, 'hostname': socket.gethostname(),
218                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
219                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
220                   'date': time.asctime(time.gmtime()) + " GMT",
221                   'period': format_period(period)} 
222
223         if bytes >= (self.bytes + self.ByteThresh):
224             new_maxrate = \
225             int((self.ByteMax - self.bytes + bytes)/(period - time.time() - self.time))
226             if new_maxrate < default_MinRate:
227                 new_maxrate = default_MinRate
228         else:
229             new_maxrate = maxrate
230
231         # Format template parameters for low bandwidth message
232         params['class'] = "low bandwidth"
233         params['bytes'] = format_bytes(bytes - self.bytes)
234         params['maxrate'] = bwlimit.format_tc_rate(maxrate)
235         params['limit'] = format_bytes(self.ByteMax)
236         params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
237
238         if verbose:
239             print "%(slice)s %(class)s " \
240                   "%(bytes)s of %(limit)s (%(new_maxrate)s maxrate)" % \
241                   params
242
243         # Cap low bandwidth burst rate
244         if new_maxrate != maxrate:
245             message += template % params
246             print "%(slice)s %(class)s capped at %(new_maxrate)s " % params
247     
248         if exemptbytes >= (self.exemptbytes + self.ExemptByteThresh):
249             new_maxexemptrate = \
250             int((self.ExemptByteMax - (self.bytes + bytes))/(period - (time.time() - self.time)))
251             if new_maxexemptrate < default_MinRate:
252                 new_maxexemptrate = default_MinRate
253         else:
254             new_maxexemptrate = maxexemptrate
255
256         # Format template parameters for high bandwidth message
257         params['class'] = "high bandwidth"
258         params['bytes'] = format_bytes(exemptbytes - self.exemptbytes)
259         params['maxrate'] = bwlimit.format_tc_rate(maxexemptrate)
260         params['limit'] = format_bytes(self.ExemptByteMax)
261         params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
262
263         if verbose:
264             print "%(slice)s %(class)s " \
265                   "%(bytes)s of %(limit)s (%(new_maxrate)s maxrate)" % params
266
267         # Cap high bandwidth burst rate
268         if new_maxexemptrate != maxexemptrate:
269             message += template % params
270             print "%(slice)s %(class)s capped at %(new_maxexemptrate)s" % params
271
272         # Apply parameters
273         if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate:
274             bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate)
275
276         # Notify slice
277         if message and self.emailed == False:
278             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
279             if debug:
280                 print subject
281                 print message + (footer % params)
282             else:
283                 self.emailed = True
284                 slicemail(self.name, subject, message + (footer % params))
285
286
287
288 def usage():
289     print """
290 Usage: %s [OPTIONS]...
291
292 Options:
293         -d, --debug            Enable debugging (default: %s)
294         -v, --verbose            Increase verbosity level (default: %d)
295         -f, --file=FILE            Data file (default: %s)
296         -s, --slice=SLICE        Constrain monitoring to these slices (default: all)
297         -p, --period=SECONDS    Interval in seconds over which to enforce average byte limits (default: %s)
298         -h, --help                This message
299 """.lstrip() % (sys.argv[0], debug, verbose, datafile, format_period(period))
300
301 def main():
302     # Defaults
303     global debug, verbose, datafile, period, nm
304     # All slices
305     names = []
306
307     try:
308         longopts = ["debug", "verbose", "file=", "slice=", "period=", "help"]
309         (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:p:h", longopts)
310     except getopt.GetoptError, err:
311         print "Error: " + err.msg
312         usage()
313         sys.exit(1)
314
315     for (opt, optval) in opts:
316         if opt == "-d" or opt == "--debug":
317             debug = True
318         elif opt == "-v" or opt == "--verbose":
319             verbose += 1
320             bwlimit.verbose = verbose - 1
321         elif opt == "-f" or opt == "--file":
322             datafile = optval
323         elif opt == "-s" or opt == "--slice":
324             names.append(optval)
325         elif opt == "-p" or opt == "--period":
326             period = int(optval)
327         else:
328             usage()
329             sys.exit(0)
330
331     # Check if we are already running
332     writepid("bwmon")
333
334     if not debug:
335         # Redirect stdout and stderr to syslog
336         syslog.openlog("bwmon")
337         sys.stdout = sys.stderr = Logger()
338
339     try:
340         f = open(datafile, "r+")
341         if verbose:
342             print "Loading %s" % datafile
343         (version, slices) = pickle.load(f)
344         f.close()
345         # Check version of data file
346         if version != "$Id: bwmon.py,v 1.15 2006/12/13 21:39:23 faiyaza Exp $":
347             print "Not using old version '%s' data file %s" % (version, datafile)
348             raise Exception
349     except Exception:
350         version = "$Id: bwmon.py,v 1.15 2006/12/13 21:39:23 faiyaza Exp $"
351         slices = {}
352
353     # Get special slice IDs
354     root_xid = bwlimit.get_xid("root")
355     default_xid = bwlimit.get_xid("default")
356
357     #Open connection to Node Manager. Global.
358     nm = NM()
359
360     live = []
361     # Get actuall running values from tc.
362     for params in bwlimit.get():
363         (xid, share,
364          minrate, maxrate,
365          minexemptrate, maxexemptrate,
366          bytes, exemptbytes) = params
367         live.append(xid)
368
369         # Ignore root and default buckets
370         if xid == root_xid or xid == default_xid:
371             continue
372
373         name = bwlimit.get_slice(xid)
374         if name is None:
375             # Orphaned (not associated with a slice) class
376             name = "%d?" % xid
377
378         # Monitor only the specified slices
379         if names and name not in names:
380             continue
381         #slices is populated from the pickle file
382         #xid is populated from bwlimit (read from /etc/passwd) 
383         if slices.has_key(xid):
384             slice = slices[xid]
385             if time.time() >= (slice.time + period) or \
386                bytes < slice.bytes or exemptbytes < slice.exemptbytes:
387                 # Reset to defaults every 24 hours or if it appears
388                 # that the byte counters have overflowed (or, more
389                 # likely, the node was restarted or the HTB buckets
390                 # were re-initialized).
391                 slice.reset(maxrate, maxexemptrate, bytes, exemptbytes)
392             else:
393                 # Update byte counts
394                 slice.update(maxrate, maxexemptrate, bytes, exemptbytes)
395         else:
396             # New slice, initialize state
397             slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate, bytes, exemptbytes)
398
399     # Delete dead slices
400     dead = Set(slices.keys()) - Set(live)
401     for xid in dead:
402         del slices[xid]
403
404     if verbose:
405         print "Saving %s" % datafile
406     f = open(datafile, "w")
407     pickle.dump((version, slices), f)
408     f.close()
409
410     removepid("bwmon")
411
412 if __name__ == '__main__':
413     main()