* Queries NM for: "nm_net_max_byte",
[mom.git] / bwmon.py
1 #!/usr/bin/python
2 #
3 # Average bandwidth monitoring script. Run periodically via cron(8) to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have exceeded its daily bandwidth usage when the
6 # script is run, its instantaneous rate will be capped at the desired
7 # average rate. Thus, in the worst case, a slice will only be able to
8 # send a little more than twice its average daily limit.
9 #
10 # Two separate limits are enforced, one for destinations exempt from
11 # the node bandwidth cap, and the other for all other destinations.
12 #
13 # Mark Huang <mlhuang@cs.princeton.edu>
14 # Andy Bavier <acb@cs.princeton.edu>
15 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
16 # Copyright (C) 2004-2006 The Trustees of Princeton University
17 #
18 # $Id: bwmon.py,v 1.5.2.2 2006/08/21 21:27:35 mlhuang Exp $
19 #
20
21 import syslog
22 import os
23 import sys
24 import getopt
25 import time
26 import pickle
27
28 import socket
29 import xmlrpclib
30 import bwlimit
31
32 from sets import Set
33
34 # Utility functions
35 from pl_mom import *
36
37 # Constants
38 seconds_per_day = 24 * 60 * 60
39 bits_per_byte = 8
40
41 # Defaults
42 debug = False
43 verbose = 0
44 datafile = "/var/lib/misc/bwmon.dat"
45 nm = None
46
47 # Burst to line rate (or node cap).  Set by NM.
48 default_maxrate = bwlimit.get_bwcap()
49 default_maxexemptrate = bwlimit.bwmax
50
51 # What we cap to when slices break the rules.
52 # 500 Kbit or 5.4 GB per day
53 #default_avgrate = 500000
54 # 1.5 Mbit or 16.4 GB per day
55 #default_avgexemptrate = 1500000
56
57 # 5.4 Gbyte per day. 5.4 * 1024M * 1024G kbytes
58 # 5.4 Gbyte per day max allowed transfered per recording period
59 default_ByteMax = 5662310
60 default_ByteThresh = int(.8 * default_ByteMax) 
61 # 16.4 Gbyte per day max allowed transfered per recording period to I2
62 default_ExemptByteMax = 17196646 
63 default_ExemptByteThresh = int(.8 * default_ExemptByteMax) 
64
65
66 # Average over 1 day
67 period = 1 * seconds_per_day
68
69 # Message template
70 template = \
71 """
72 The slice %(slice)s has transmitted more than %(bytes)s from
73 %(hostname)s to %(class)s destinations
74 since %(since)s.
75
76 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s
77 until %(until)s.
78
79 Please reduce the average %(class)s transmission rate
80 of the slice %(limit)s per %(period)s.
81
82 """.lstrip()
83
84 footer = \
85 """
86 %(date)s %(hostname)s bwcap %(slice)s
87 """.lstrip()
88
89 class Slice:
90         """
91         Stores the last recorded bandwidth parameters of a slice.
92
93         xid - slice context/VServer ID
94         name - slice name
95         time - beginning of recording period in UNIX seconds
96         bytes - low bandwidth bytes transmitted at the beginning of the recording period
97         exemptbytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
98         ByteMax - total volume of data allowed
99         ByteThresh - After thresh, cap node to (maxbyte - bytes)/(time left in period)
100         ExemptByteMax - Same as above, but for i2.
101         ExemptByteThresh - i2 ByteThresh
102         maxrate - max_rate slice attribute. 
103         maxexemptrate - max_exempt_rate slice attribute.
104
105     """
106
107         def __init__(self, xid, name, maxrate, maxexemptrate, bytes, exemptbytes):
108                 self.xid = xid
109                 self.name = name
110                 self.time = 0
111                 self.bytes = 0
112                 self.exemptbytes = 0
113                 self.ByteMax = default_ByteMax
114                 self.ByteThresh = default_ByteThresh
115                 self.ExemptByteMax = default_ExemptByteMax
116                 self.ExemptByteThresh = default_ExemptByteThresh
117                 self.maxrate = default_maxrate
118                 self.maxexemptrate = default_maxexemptrate
119
120                 # Get real values where applicable
121                 self.reset(maxrate, maxexemptrate, bytes, exemptbytes)
122
123         def __repr__(self):
124                 return self.name
125
126         def updateSliceAttributes(self):
127                 # Query Node Manager for max rate overrides
128                 try:
129                         vals = nm.query(self.name, 
130                                 [('nm_net_max_rate', self.maxrate),
131                                 ('nm_net_max_exempt_rate', self.maxexemptrate),
132                                 ("nm_net_max_byte", self.ByteMax),
133                                 ("nm_net_max_exempt_byte", self.ExemptByteMax),
134                                 ("nm_net_max_thresh_byte", int( .8 * self.ByteMax)),
135                                 ("nm_net_max_thresh_exempt_byte", int(.8 * self.ExemptByteMax)),
136                                 ("nm_net_avg_rate", 0),
137                                 ("nm_net_avg_exempt_rate", 0)])
138
139                         (self.maxrate,
140                         self.maxexemptrate,
141                         self.ByteMax,
142                         self.ExemptByteMax,
143                         self.ByteThresh,
144                         self.ExemptByteThresh,
145                         avgrate,
146                         avgexemptrate) = vals
147
148                         if (avgrate != 0) or (avgexemptrate != 0):
149                                 self.ByteMax = avgrate * period
150                                 self.ByteThresh = int(self.ByteMax * .8)
151                                 self.ExemptByteMax = avgexemptrate * period
152                                 self.ExemptByteThresh = int(self.ExemptByteMax * .8)
153
154                 except Exception, err:
155                         print "Warning: Exception received while querying NM:", err
156
157         def reset(self, maxrate, maxexemptrate, bytes, exemptbytes):
158                 """
159                 Begin a new recording period. Remove caps by restoring limits
160                 to their default values.
161                 """
162                 
163                 # Query Node Manager for max rate overrides
164                 self.updateSliceAttributes()    
165
166                 # Reset baseline time
167                 self.time = time.time()
168
169                 # Reset baseline byte coutns
170                 self.bytes = bytes
171                 self.exemptbytes = exemptbytes
172
173                 if (self.maxrate != maxrate) or (self.maxexemptrate != maxexemptrate):
174                         print "%s reset to %s/%s" % \
175                                   (self.name,
176                                    bwlimit.format_tc_rate(self.maxrate),
177                                    bwlimit.format_tc_rate(self.maxexemptrate))
178                         bwlimit.set(xid = self.xid, maxrate = self.maxrate, maxexemptrate = self.maxexemptrate)
179
180         def update(self, maxrate, maxexemptrate, bytes, exemptbytes):
181                 """
182                 Update byte counts and check if byte limits have been
183                 exceeded. 
184                 """
185         
186                 # Query Node Manager for max rate overrides
187                 self.updateSliceAttributes()    
188  
189                 # Prepare message parameters from the template
190                 message = ""
191                 params = {'slice': self.name, 'hostname': socket.gethostname(),
192                                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
193                                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
194                                   'date': time.asctime(time.gmtime()) + " GMT",
195                                   'period': format_period(period)} 
196
197                 if bytes >= (self.bytes + (self.ByteThresh * 1024)):
198                         new_maxrate = \
199                         int(((self.ByteMax * 1024) - self.bytes + bytes)/(period - time.time() - self.time))
200                 else:
201                         new_maxrate = maxrate
202
203                 # Format template parameters for low bandwidth message
204                 params['class'] = "low bandwidth"
205                 params['bytes'] = format_bytes(bytes - self.bytes)
206                 params['maxrate'] = bwlimit.format_tc_rate(maxrate)
207                 params['limit'] = format_bytes(int(self.ByteMax * 1024))
208                 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
209
210                 if verbose:
211                         print "%(slice)s %(class)s " \
212                                   "%(bytes)s of %(limit)s (%(new_maxrate)s maxrate)" % \
213                                   params
214
215                 # Cap low bandwidth burst rate
216                 if new_maxrate != maxrate:
217                         message += template % params
218                         print "%(slice)s %(class)s capped at %(new_maxrate)s " % params
219         
220                 if exemptbytes >= (self.exemptbytes + (self.ExemptByteThresh * 1024)):
221                         new_maxexemptrate = \
222                         int(((self.ExemptByteMax * 1024) - (self.bytes + bytes))/(period - (time.time() - self.time)))
223                 else:
224                         new_maxexemptrate = maxexemptrate
225
226                 # Format template parameters for high bandwidth message
227                 params['class'] = "high bandwidth"
228                 params['bytes'] = format_bytes(exemptbytes - self.exemptbytes)
229                 params['maxrate'] = bwlimit.format_tc_rate(maxexemptrate)
230                 params['limit'] = format_bytes(self.ExemptByteMax * 1024)
231                 params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate)
232
233                 if verbose:
234                         print "%(slice)s %(class)s " \
235                                   "%(bytes)s of %(limit)s (%(new_maxrate)s maxrate)" % params
236
237                 # Cap high bandwidth burst rate
238                 if new_maxexemptrate != maxexemptrate:
239                         message += template % params
240                         print "%(slice)s %(class)s capped at %(new_maxexemptrate)s" % params
241
242                 # Apply parameters
243                 if new_maxrate != maxrate or new_maxexemptrate != maxexemptrate:
244                         bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxexemptrate)
245
246                 # Notify slice
247                 if message:
248                         subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
249                         if debug:
250                                 print subject
251                                 print message + (footer % params)
252                         else:
253                                 slicemail(self.name, subject, message + (footer % params))
254
255
256
257 def usage():
258         print """
259 Usage: %s [OPTIONS]...
260
261 Options:
262                 -d, --debug                     Enable debugging (default: %s)
263                 -v, --verbose               Increase verbosity level (default: %d)
264                 -f, --file=FILE             Data file (default: %s)
265                 -s, --slice=SLICE           Constrain monitoring to these slices (default: all)
266                 -p, --period=SECONDS    Interval in seconds over which to enforce average byte limits (default: %s)
267                 -h, --help                          This message
268 """.lstrip() % (sys.argv[0], debug, verbose, datafile, format_period(period))
269
270 def main():
271         # Defaults
272         global debug, verbose, datafile, period, nm
273         # All slices
274         names = []
275
276         try:
277                 longopts = ["debug", "verbose", "file=", "slice=", "period=", "help"]
278                 (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:p:h", longopts)
279         except getopt.GetoptError, err:
280                 print "Error: " + err.msg
281                 usage()
282                 sys.exit(1)
283
284         for (opt, optval) in opts:
285                 if opt == "-d" or opt == "--debug":
286                         debug = True
287                 elif opt == "-v" or opt == "--verbose":
288                         verbose += 1
289                         bwlimit.verbose = verbose - 1
290                 elif opt == "-f" or opt == "--file":
291                         datafile = optval
292                 elif opt == "-s" or opt == "--slice":
293                         names.append(optval)
294                 elif opt == "-p" or opt == "--period":
295                         period = int(optval)
296                 else:
297                         usage()
298                         sys.exit(0)
299
300         # Check if we are already running
301         writepid("bwmon")
302
303         if not debug:
304                 # Redirect stdout and stderr to syslog
305                 syslog.openlog("bwmon")
306                 sys.stdout = sys.stderr = Logger()
307
308         try:
309                 f = open(datafile, "r+")
310                 if verbose:
311                         print "Loading %s" % datafile
312                 (version, slices) = pickle.load(f)
313                 f.close()
314                 # Check version of data file
315                 if version != "$Id: bwmon.py,v 1.5.2.2 2006/08/21 21:27:35 mlhuang Exp $":
316                         print "Not using old version '%s' data file %s" % (version, datafile)
317                         raise Exception
318         except Exception:
319                 version = "$Id: bwmon.py,v 1.5.2.2 2006/08/21 21:27:35 mlhuang Exp $"
320                 slices = {}
321
322         # Get special slice IDs
323         root_xid = bwlimit.get_xid("root")
324         default_xid = bwlimit.get_xid("default")
325
326         #Open connection to Node Manager. Global.
327         nm = NM()
328
329         live = []
330         # Get actuall running values from tc.
331         for params in bwlimit.get():
332                 (xid, share,
333                  minrate, maxrate,
334                  minexemptrate, maxexemptrate,
335                  bytes, exemptbytes) = params
336                 live.append(xid)
337                 
338                 if verbose:
339                         print("\nRunning stats for %s"\
340                             "\n minrate %s, maxrate %s, minexemptrate %s,"\
341                                 " maxexemptrate %s, bytes %s, exemptbytes %s" % \
342                                 (bwlimit.get_slice(xid), 
343                                 bwlimit.format_tc_rate(minrate),
344                                 bwlimit.format_tc_rate(maxrate), 
345                                 bwlimit.format_tc_rate(minexemptrate),
346                                 bwlimit.format_tc_rate(maxexemptrate), 
347                                 bytes, 
348                                 exemptbytes))
349
350                 # Ignore root and default buckets
351                 if xid == root_xid or xid == default_xid:
352                         continue
353
354                 name = bwlimit.get_slice(xid)
355                 if name is None:
356                         # Orphaned (not associated with a slice) class
357                         name = "%d?" % xid
358
359                 # Monitor only the specified slices
360                 if names and name not in names:
361                         continue
362                 #slices is populated from the pickle file
363                 #xid is populated from bwlimit (read from /etc/passwd) 
364                 if slices.has_key(xid):
365                         slice = slices[xid]
366                         if time.time() >= (slice.time + period) or \
367                            bytes < slice.bytes or exemptbytes < slice.exemptbytes:
368                                 # Reset to defaults every 24 hours or if it appears
369                                 # that the byte counters have overflowed (or, more
370                                 # likely, the node was restarted or the HTB buckets
371                                 # were re-initialized).
372                                 slice.reset(maxrate, maxexemptrate, bytes, exemptbytes)
373                         else:
374                                 # Update byte counts
375                                 slice.update(maxrate, maxexemptrate, bytes, exemptbytes)
376                 else:
377                         # New slice, initialize state
378                         slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate, bytes, exemptbytes)
379
380         # Delete dead slices
381         dead = Set(slices.keys()) - Set(live)
382         for xid in dead:
383                 del slices[xid]
384
385         if verbose:
386                 print "Saving %s" % datafile
387         f = open(datafile, "w")
388         pickle.dump((version, slices), f)
389         f.close()
390
391         removepid("bwmon")
392
393 if __name__ == '__main__':
394         main()