Stores last values reported by NM. Uses cached vals if NM stops responding instead of
[mom.git] / bwmon.py
1 #!/usr/bin/python
2 #
3 # Average bandwidth monitoring script. Run periodically via cron(8) to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have exceeded its daily bandwidth usage when the
6 # script is run, its instantaneous rate will be capped at the desired
7 # average rate. Thus, in the worst case, a slice will only be able to
8 # send a little more than twice its average daily limit.
9 #
10 # Two separate limits are enforced, one for destinations exempt from
11 # the node bandwidth cap, and the other for all other destinations.
12 #
13 # Mark Huang <mlhuang@cs.princeton.edu>
14 # Andy Bavier <acb@cs.princeton.edu>
15 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
16 # Copyright (C) 2004-2006 The Trustees of Princeton University
17 #
18 # $Id: bwmon.py,v 1.4 2006/06/02 04:00:00 mlhuang Exp $
19 #
20
21 import syslog
22 import os
23 import sys
24 import getopt
25 import time
26 import pickle
27
28 import socket
29 import xmlrpclib
30 import bwlimit
31
32 from sets import Set
33
34 # Utility functions
35 from pl_mom import *
36
37 # Constants
38 seconds_per_day = 24 * 60 * 60
39 bits_per_byte = 8
40
41 # Defaults
42 debug = False
43 verbose = 0
44 datafile = "/var/lib/misc/bwmon.dat"
45 nm = None
46
47 default_maxrate = bwlimit.get_bwcap()
48
49 default_maxexemptrate = bwlimit.bwmax
50
51 # 500 Kbit or 5.4 GB per day
52 default_avgrate = 500000
53
54 # 1.5 Mbit or 16.4 GB per day
55 default_avgexemptrate = 1500000
56
57 # Average over 1 day
58 period = 1 * seconds_per_day
59
60 # Message template
61 template = \
62 """
63 The slice %(slice)s has transmitted more than %(bytes)s from
64 %(hostname)s to %(class)s destinations
65 since %(since)s.
66
67 Its maximum %(class)s burst rate will be capped at %(avgrate)s
68 until %(until)s.
69
70 Please reduce the average %(class)s transmission rate
71 of the slice to %(avgrate)s, or %(limit)s per %(period)s.
72
73 """.lstrip()
74
75 footer = \
76 """
77 %(date)s %(hostname)s bwcap %(slice)s
78 """.lstrip()
79
80 class Slice:
81     """
82     Stores the last recorded bandwidth parameters of a slice.
83
84     xid - slice context/VServer ID
85     name - slice name
86     time - beginning of recording period in UNIX seconds
87     bytes - low bandwidth bytes transmitted at the beginning of the recording period
88     exemptbytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
89     avgrate - average low bandwidth rate to enforce over the recording period
90     avgexemptrate - average high bandwidth rate to enforce over the recording period (for I2 -F)
91     last_avgrate - last recorded avgrate from NM
92     last_maxrate - last recorded maxrate from NM
93     last_avgexemptrate - last recorded avgexemptrate from NM
94     last_maxexemptrate - last recorded maxexemptrate from NM
95     """
96
97     def __init__(self, xid, name, maxrate, maxexemptrate, bytes, exemptbytes):
98         self.xid = xid
99         self.name = name
100         self.last_maxrate = default_maxrate
101         self.last_avgrate = default_avgrate
102         self.last_avgexemptrate = default_avgexemptrate 
103         self.last_maxexemptrate = default_maxexemptrate 
104         self.reset(maxrate, maxexemptrate, bytes, exemptbytes)
105
106     def __repr__(self):
107         return self.name
108
109     def reset(self, maxrate, maxexemptrate, bytes, exemptbytes):
110         """
111         Begin a new recording period. Remove caps by restoring limits
112         to their default values.
113         """
114
115         # Reset baseline time
116         self.time = time.time()
117
118         # Reset baseline byte coutns
119         self.bytes = bytes
120         self.exemptbytes = exemptbytes
121
122         # Query Node Manager for max rate overrides
123         try:
124                 vals = nm.query(self.name, [('nm_net_max_rate', self.last_maxrate),
125                                 ('nm_net_max_exempt_rate', self.last_maxexemptrate),
126                                 ('nm_net_avg_rate', self.last_avgrate),
127                                 ('nm_net_avg_exempt_rate', self.last_avgexemptrate)])
128                 (new_maxrate, new_maxexemptrate, 
129                         self.last_avgrate, self.last_avgexemptrate) = vals 
130                 #If NM is alive, and there is a cap, update new
131                 self.last_maxrate = new_maxrate
132                 self.last_maxexemptrate = new_maxexemptrate
133
134         except Exception, err:
135                 print "Warning: Exception received while querying NM:", err
136
137         if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate:
138             print "%s reset to %s/%s" % \
139                   (self.name,
140                    bwlimit.format_tc_rate(new_maxrate),
141                    bwlimit.format_tc_rate(new_maxexemptrate))
142             bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate)
143
144     def update(self, maxrate, maxexemptrate, bytes, exemptbytes):
145         """
146         Update byte counts and check if average rates have been
147         exceeded. In the worst case (instantaneous usage of the entire
148         average daily byte limit at the beginning of the recording
149         period), the slice will be immediately capped and will get to
150         send twice the average daily byte limit. In the common case,
151         it will get to send slightly more than the average daily byte
152         limit.
153         """
154
155         # Query Node Manager for max average rate overrides
156         try:
157                 (self.avgrate, self.avgexemptrate) = nm.query(self.name, 
158                         [('nm_net_avg_rate', self.last_avgrate), 
159                         ('nm_net_avg_exempt_rate', self.last_avgexemptrate)])
160                 #If NM is alive, and there is a cap, update new
161                 self.last_avgexemptrate = self.avgexemptrate
162                 self.last_avgrate = self.avgrate
163         except Exception, err:
164                 print "Warning: Exception received while querying NM:", err
165  
166         # Prepare message parameters from the template
167         message = ""
168         params = {'slice': self.name, 'hostname': socket.gethostname(),
169                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
170                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
171                   'date': time.asctime(time.gmtime()) + " GMT",
172                   'period': format_period(period)} 
173
174         bytelimit = self.avgrate * period / bits_per_byte
175         if bytes >= (self.bytes + bytelimit) and \
176            maxrate > self.avgrate:
177             new_maxrate = self.avgrate
178         else:
179             new_maxrate = maxrate
180
181         # Format template parameters for low bandwidth message
182         params['class'] = "low bandwidth"
183         params['bytes'] = format_bytes(bytes - self.bytes)
184         params['maxrate'] = bwlimit.format_tc_rate(maxrate)
185         params['limit'] = format_bytes(bytelimit)
186         params['avgrate'] = bwlimit.format_tc_rate(self.avgrate)
187
188         if verbose:
189             print "%(slice)s %(class)s " \
190                   "%(bytes)s, %(limit)s (%(maxrate)s max/%(avgrate)s avg)" % \
191                   params
192
193         # Cap low bandwidth burst rate
194         if new_maxrate != maxrate:
195             message += template % params
196             print "%(slice)s %(class)s capped at %(avgrate)s (%(bytes)s/%(limit)s)" % params
197
198         exemptbytelimit = self.avgexemptrate * period / bits_per_byte
199         if exemptbytes >= (self.exemptbytes + exemptbytelimit) and \
200            maxexemptrate > self.avgexemptrate:
201             new_maxexemptrate = self.avgexemptrate
202         else:
203             new_maxexemptrate = maxexemptrate
204
205         # Format template parameters for high bandwidth message
206         params['class'] = "high bandwidth"
207         params['bytes'] = format_bytes(exemptbytes - self.exemptbytes)
208         params['maxrate'] = bwlimit.format_tc_rate(maxexemptrate)
209         params['limit'] = format_bytes(exemptbytelimit)
210         params['avgrate'] = bwlimit.format_tc_rate(self.avgexemptrate)
211
212         if verbose:
213             print "%(slice)s %(class)s " \
214                   "%(bytes)s, %(limit)s (%(maxrate)s max /%(avgrate)s avg)" % \
215                   params
216
217         # Cap high bandwidth burst rate
218         if new_maxexemptrate != maxexemptrate:
219             message += template % params
220             print "%(slice)s %(class)s capped at %(avgrate)s (%(bytes)s/%(limit)s)" % params
221
222         # Apply parameters
223         if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate:
224             bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate)
225
226         # Notify slice
227         if message:
228             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
229             if debug:
230                 print subject
231                 print message + (footer % params)
232             else:
233                 slicemail(self.name, subject, message + (footer % params))
234
235 def usage():
236     print """
237 Usage: %s [OPTIONS]...
238
239 Options:
240         -d, --debug             Enable debugging (default: %s)
241         -v, --verbose           Increase verbosity level (default: %d)
242         -f, --file=FILE         Data file (default: %s)
243         -s, --slice=SLICE       Constrain monitoring to these slices (default: all)
244         -p, --period=SECONDS    Interval in seconds over which to enforce average byte limits (default: %s)
245         -h, --help              This message
246 """.lstrip() % (sys.argv[0], debug, verbose, datafile, format_period(period))
247
248 def main():
249     # Defaults
250     global debug, verbose, datafile, period, nm
251     # All slices
252     names = []
253
254     try:
255         longopts = ["debug", "verbose", "file=", "slice=", "period=", "help"]
256         (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:p:h", longopts)
257     except getopt.GetoptError, err:
258         print "Error: " + err.msg
259         usage()
260         sys.exit(1)
261
262     for (opt, optval) in opts:
263         if opt == "-d" or opt == "--debug":
264             debug = True
265         elif opt == "-v" or opt == "--verbose":
266             verbose += 1
267             bwlimit.verbose = verbose - 1
268         elif opt == "-f" or opt == "--file":
269             datafile = optval
270         elif opt == "-s" or opt == "--slice":
271             names.append(optval)
272         elif opt == "-p" or opt == "--period":
273             period = int(optval)
274         else:
275             usage()
276             sys.exit(0)
277
278     # Check if we are already running
279     writepid("bwmon")
280
281     if not debug:
282         # Redirect stdout and stderr to syslog
283         syslog.openlog("bwmon")
284         sys.stdout = sys.stderr = Logger()
285
286     try:
287         f = open(datafile, "r+")
288         if verbose:
289             print "Loading %s" % datafile
290         (version, slices) = pickle.load(f)
291         f.close()
292         # Check version of data file
293         if version != "$Id: bwmon.py,v 1.4 2006/06/02 04:00:00 mlhuang Exp $":
294             print "Not using old version '%s' data file %s" % (version, datafile)
295             raise Exception
296     except Exception:
297         version = "$Id: bwmon.py,v 1.4 2006/06/02 04:00:00 mlhuang Exp $"
298         slices = {}
299
300     # Get special slice IDs
301     root_xid = bwlimit.get_xid("root")
302     default_xid = bwlimit.get_xid("default")
303
304     #Open connection to Node Manager
305     nm = NM()
306
307     live = []
308     for params in bwlimit.get():
309         (xid, share,
310          minrate, maxrate,
311          minexemptrate, maxexemptrate,
312          bytes, exemptbytes) = params
313         live.append(xid)
314
315         # Ignore root and default buckets
316         if xid == root_xid or xid == default_xid:
317             continue
318
319         name = bwlimit.get_slice(xid)
320         if name is None:
321             # Orphaned (not associated with a slice) class
322             name = "%d?" % xid
323
324         # Monitor only the specified slices
325         if names and name not in names:
326             continue
327
328         #slices is populated from the pickle file
329         #xid is populated from bwlimit (read from /etc/passwd) 
330         if slices.has_key(xid):
331             slice = slices[xid]
332             if time.time() >= (slice.time + period) or \
333                bytes < slice.bytes or exemptbytes < slice.exemptbytes:
334                 # Reset to defaults every 24 hours or if it appears
335                 # that the byte counters have overflowed (or, more
336                 # likely, the node was restarted or the HTB buckets
337                 # were re-initialized).
338                 slice.reset(maxrate, maxexemptrate, bytes, exemptbytes)
339             else:
340                 # Update byte counts
341                 slice.update(maxrate, maxexemptrate, bytes, exemptbytes)
342         else:
343             # New slice, initialize state
344             slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate, bytes, exemptbytes)
345
346     # Delete dead slices
347     dead = Set(slices.keys()) - Set(live)
348     for xid in dead:
349         del slices[xid]
350
351     if verbose:
352         print "Saving %s" % datafile
353     f = open(datafile, "w")
354     pickle.dump((version, slices), f)
355     f.close()
356
357     removepid("bwmon")
358
359 if __name__ == '__main__':
360     main()