b31e893c43ec17371994308c3e8796f529ec4474
[mom.git] / bwmon.py
1 #!/usr/bin/python
2 #
3 # Average bandwidth monitoring script. Run periodically via cron(8) to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have exceeded its daily bandwidth usage when the
6 # script is run, its instantaneous rate will be capped at the desired
7 # average rate. Thus, in the worst case, a slice will only be able to
8 # send a little more than twice its average daily limit.
9 #
10 # Two separate limits are enforced, one for destinations exempt from
11 # the node bandwidth cap, and the other for all other destinations.
12 #
13 # Mark Huang <mlhuang@cs.princeton.edu>
14 # Andy Bavier <acb@cs.princeton.edu>
15 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
16 # Copyright (C) 2004-2006 The Trustees of Princeton University
17 #
18 # $Id: bwmon.py,v 1.5.2.2 2006/08/21 21:27:35 mlhuang Exp $
19 #
20
21 import syslog
22 import os
23 import sys
24 import getopt
25 import time
26 import pickle
27
28 import socket
29 import xmlrpclib
30 import bwlimit
31
32 from sets import Set
33
34 # Utility functions
35 from pl_mom import *
36
37 # Constants
38 seconds_per_day = 24 * 60 * 60
39 bits_per_byte = 8
40
41 # Defaults
42 debug = False
43 verbose = 0
44 datafile = "/var/lib/misc/bwmon.dat"
45 nm = None
46
47 # Burst to line rate (or node cap).  Set by NM.
48 default_maxrate = bwlimit.get_bwcap()
49 default_maxexemptrate = bwlimit.bwmax
50
51 # What we cap to when slices break the rules.
52 # 500 Kbit or 5.4 GB per day
53 #default_avgrate = 500000
54 # 1.5 Mbit or 16.4 GB per day
55 #default_avgexemptrate = 1500000
56
57 # 4 Gbyte per day. 4 * 1024K * 1024M * 1024G 
58 default_ByteThresh = 4294967296
59 # 5.4 Gbyte per day max allowed transfered per recording period
60 default_ByteMax = 5798205850 
61 # 14 Gbyte per day
62 default_ExemptByteThresh = 15032385536
63 # 16.4 Gbyte per day max allowed transfered per recording period to I2
64 default_ExemptByteMax = 17609365914 
65
66
67 # Average over 1 day
68 period = 1 * seconds_per_day
69
70 # Message template
71 template = \
72 """
73 The slice %(slice)s has transmitted more than %(bytes)s from
74 %(hostname)s to %(class)s destinations
75 since %(since)s.
76
77 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s
78 until %(until)s.
79
80 Please reduce the average %(class)s transmission rate
81 of the slice %(limit)s per %(period)s.
82
83 """.lstrip()
84
85 footer = \
86 """
87 %(date)s %(hostname)s bwcap %(slice)s
88 """.lstrip()
89
90 class Slice:
91         """
92         Stores the last recorded bandwidth parameters of a slice.
93
94         xid - slice context/VServer ID
95         name - slice name
96         time - beginning of recording period in UNIX seconds
97         bytes - low bandwidth bytes transmitted at the beginning of the recording period
98         exemptbytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
99         ByteMax - total volume of data allowed
100         ByteThresh - After thresh, cap node to (maxbyte - bytes)/(time left in period)
101         ExemptByteMax - Same as above, but for i2.
102         ExemptByteThresh - i2 ByteThresh
103         #last_maxexemptrate - last recorded maxexemptrate from NM.  Slice attribute.
104         #last_maxrate - last recorded maxrate from NM.  Slice attribute.
105         #last_ByteMax - Last recorded from NM. total volume of data allowed.
106         #last_ByteThresh - Last recorded from NM.  After thresh, cap node to (maxbyte - bytes)/(period - t)
107         #last_ExemptByteMax - Last recorded from NM.  Same as above, but for i2.
108         #last_ExemptByteThresh - Last recorded from NM.  i2 ByteThresh
109
110     """
111
112         def __init__(self, xid, name, maxrate, maxexemptrate, bytes, exemptbytes):
113                 self.xid = xid
114                 self.name = name
115                 self.time = 0
116                 self.bytes = 0
117                 self.exemptbytes = 0
118                 self.ByteMax = default_ByteMax
119                 self.ByteThresh = default_ByteThresh
120                 self.ExemptByteMax = default_ExemptByteMax
121                 self.ExemptByteThresh = default_ExemptByteThresh
122                 self.maxrate = default_maxrate
123                 self.maxexemptrate = default_maxexemptrate
124                 #self.last_maxrate = default_maxrate
125                 #self.last_maxexemptrate = default_maxexemptrate        
126                 #self.last_ByteMax = default_ByteMax
127                 #self.last_ByteThresh = default_ByteThresh
128                 #self.last_ExemptByteMax = default_ExemptByteMax
129                 #self.last_ExemptByteThresh = default_ExemptByteThresh
130
131                 # Get real values where applicable
132                 self.reset(maxrate, maxexemptrate, bytes, exemptbytes)
133
134         def __repr__(self):
135                 return self.name
136
137         def updateSliceAttributes(self):
138                 # Query Node Manager for max rate overrides
139                 try:
140                         vals = nm.query(self.name, 
141                                 [('nm_net_max_rate', self.maxrate),
142                                 ('nm_net_max_exempt_rate', self.maxexemptrate)])
143                         (self.maxrate, self.maxexemptrate) = vals
144
145                 except Exception, err:
146                         print "Warning: Exception received while querying NM:", err
147
148         def reset(self, maxrate, maxexemptrate, bytes, exemptbytes):
149                 """
150                 Begin a new recording period. Remove caps by restoring limits
151                 to their default values.
152                 """
153                 
154                 # Query Node Manager for max rate overrides
155                 self.updateSliceAttributes()    
156
157                 # Reset baseline time
158                 self.time = time.time()
159
160                 # Reset baseline byte coutns
161                 self.bytes = bytes
162                 self.exemptbytes = exemptbytes
163
164                 if (self.maxrate != maxrate) or (self.maxexemptrate != maxexemptrate):
165                         print "%s reset to %s/%s" % \
166                                   (self.name,
167                                    bwlimit.format_tc_rate(self.maxrate),
168                                    bwlimit.format_tc_rate(self.maxexemptrate))
169                         bwlimit.set(xid = self.xid, maxrate = self.maxrate, maxexemptrate = self.maxexemptrate)
170
171         def update(self, maxrate, maxexemptrate, bytes, exemptbytes):
172                 """
173                 Update byte counts and check if average rates have been
174                 exceeded. In the worst case (instantaneous usage of the entire
175                 average daily byte limit at the beginning of the recording
176                 period), the slice will be immediately capped and will get to
177                 send twice the average daily byte limit. In the common case,
178                 it will get to send slightly more than the average daily byte
179                 limit.
180                 """
181         
182                 # Query Node Manager for max rate overrides
183                 self.updateSliceAttributes()    
184  
185                 # Prepare message parameters from the template
186                 message = ""
187                 params = {'slice': self.name, 'hostname': socket.gethostname(),
188                                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
189                                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
190                                   'date': time.asctime(time.gmtime()) + " GMT",
191                                   'period': format_period(period)} 
192
193                 if bytes >= (self.bytes + self.ByteThresh):
194                         new_maxrate = (self.ByteMax - self.ByteThresh)/(period - (time.time() - self.time))
195                 else:
196                         new_maxrate = maxrate
197
198                 # Format template parameters for low bandwidth message
199                 params['class'] = "low bandwidth"
200                 params['bytes'] = format_bytes(bytes - self.bytes)
201                 params['maxrate'] = bwlimit.format_tc_rate(maxrate)
202                 params['limit'] = format_bytes(self.ByteMax)
203                 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
204
205                 if verbose:
206                         print "%(slice)s %(class)s " \
207                                   "%(bytes)s, %(limit)s (%(new_maxrate)s avg)" % \
208                                   params
209
210                 # Cap low bandwidth burst rate
211                 if new_maxrate != maxrate:
212                         message += template % params
213                         print "%(slice)s %(class)s capped at %(new_maxrate)s " % params
214         
215                 if exemptbytes >= (self.exemptbytes + self.ExemptByteThresh):
216                         new_maxexemptrate = \
217                         (self.ExemptByteMax - self.ExemptByteThresh)/(period - (time.time() - self.time))
218                 else:
219                         new_maxexemptrate = maxexemptrate
220
221                 # Format template parameters for high bandwidth message
222                 params['class'] = "high bandwidth"
223                 params['bytes'] = format_bytes(exemptbytes - self.exemptbytes)
224                 params['maxrate'] = bwlimit.format_tc_rate(maxexemptrate)
225                 params['limit'] = format_bytes(self.ExemptByteMax)
226                 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
227
228                 if verbose:
229                         print "%(slice)s %(class)s " \
230                                   "%(bytes)s, %(limit)s (%(new_maxrate)s avg)" % params
231
232                 # Cap high bandwidth burst rate
233                 if new_maxexemptrate != maxexemptrate:
234                         message += template % params
235                         print "%(slice)s %(class)s capped at %(new_maxexemptrate)s" % params
236
237                 # Apply parameters
238                 if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate:
239                         bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate)
240
241                 # Notify slice
242                 if message:
243                         subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
244                         if debug:
245                                 print subject
246                                 print message + (footer % params)
247                         else:
248                                 slicemail(self.name, subject, message + (footer % params))
249
250 def usage():
251         print """
252 Usage: %s [OPTIONS]...
253
254 Options:
255                 -d, --debug                     Enable debugging (default: %s)
256                 -v, --verbose               Increase verbosity level (default: %d)
257                 -f, --file=FILE             Data file (default: %s)
258                 -s, --slice=SLICE           Constrain monitoring to these slices (default: all)
259                 -p, --period=SECONDS    Interval in seconds over which to enforce average byte limits (default: %s)
260                 -h, --help                          This message
261 """.lstrip() % (sys.argv[0], debug, verbose, datafile, format_period(period))
262
263 def main():
264         # Defaults
265         global debug, verbose, datafile, period, nm
266         # All slices
267         names = []
268
269         try:
270                 longopts = ["debug", "verbose", "file=", "slice=", "period=", "help"]
271                 (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:p:h", longopts)
272         except getopt.GetoptError, err:
273                 print "Error: " + err.msg
274                 usage()
275                 sys.exit(1)
276
277         for (opt, optval) in opts:
278                 if opt == "-d" or opt == "--debug":
279                         debug = True
280                 elif opt == "-v" or opt == "--verbose":
281                         verbose += 1
282                         bwlimit.verbose = verbose - 1
283                 elif opt == "-f" or opt == "--file":
284                         datafile = optval
285                 elif opt == "-s" or opt == "--slice":
286                         names.append(optval)
287                 elif opt == "-p" or opt == "--period":
288                         period = int(optval)
289                 else:
290                         usage()
291                         sys.exit(0)
292
293         # Check if we are already running
294         writepid("bwmon")
295
296         if not debug:
297                 # Redirect stdout and stderr to syslog
298                 syslog.openlog("bwmon")
299                 sys.stdout = sys.stderr = Logger()
300
301         try:
302                 f = open(datafile, "r+")
303                 if verbose:
304                         print "Loading %s" % datafile
305                 (version, slices) = pickle.load(f)
306                 f.close()
307                 # Check version of data file
308                 if version != "$Id: bwmon.py,v 1.5.2.2 2006/08/21 21:27:35 mlhuang Exp $":
309                         print "Not using old version '%s' data file %s" % (version, datafile)
310                         raise Exception
311         except Exception:
312                 version = "$Id: bwmon.py,v 1.5.2.2 2006/08/21 21:27:35 mlhuang Exp $"
313                 slices = {}
314
315         # Get special slice IDs
316         root_xid = bwlimit.get_xid("root")
317         default_xid = bwlimit.get_xid("default")
318
319         #Open connection to Node Manager. Global.
320         nm = NM()
321
322         live = []
323         # Get actuall running values from tc.
324         for params in bwlimit.get():
325                 (xid, share,
326                  minrate, maxrate,
327                  minexemptrate, maxexemptrate,
328                  bytes, exemptbytes) = params
329                 live.append(xid)
330                 
331                 # Delete Me
332                 print("name %s , minrate %s, maxrate %s, minexemptrate %s, maxexemptrate %s, bytes %s, exemptbytes %s" % (bwlimit.get_slice(xid), minrate, maxrate, minexemptrate, maxexemptrate, bytes, exemptbytes))
333
334                 # Ignore root and default buckets
335                 if xid == root_xid or xid == default_xid:
336                         continue
337
338                 name = bwlimit.get_slice(xid)
339                 if name is None:
340                         # Orphaned (not associated with a slice) class
341                         name = "%d?" % xid
342
343                 # Monitor only the specified slices
344                 if names and name not in names:
345                         continue
346                 #slices is populated from the pickle file
347                 #xid is populated from bwlimit (read from /etc/passwd) 
348                 if slices.has_key(xid):
349                         slice = slices[xid]
350                         if time.time() >= (slice.time + period) or \
351                            bytes < slice.bytes or exemptbytes < slice.exemptbytes:
352                                 # Reset to defaults every 24 hours or if it appears
353                                 # that the byte counters have overflowed (or, more
354                                 # likely, the node was restarted or the HTB buckets
355                                 # were re-initialized).
356                                 slice.reset(maxrate, maxexemptrate, bytes, exemptbytes)
357                         else:
358                                 # Update byte counts
359                                 slice.update(maxrate, maxexemptrate, bytes, exemptbytes)
360                 else:
361                         # New slice, initialize state
362                         slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate, bytes, exemptbytes)
363
364         # Delete dead slices
365         dead = Set(slices.keys()) - Set(live)
366         for xid in dead:
367                 del slices[xid]
368
369         if verbose:
370                 print "Saving %s" % datafile
371         f = open(datafile, "w")
372         pickle.dump((version, slices), f)
373         f.close()
374
375         removepid("bwmon")
376
377 if __name__ == '__main__':
378         main()