Needs to be tested before certifying good.
[nodemanager.git] / bwmon.py
1 #!/usr/bin/python
2 #
3 # Average bandwidth monitoring script. Run periodically via cron(8) to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have exceeded its daily bandwidth usage when the
6 # script is run, its instantaneous rate will be capped at the desired
7 # average rate. Thus, in the worst case, a slice will only be able to
8 # send a little more than twice its average daily limit.
9 #
10 # Two separate limits are enforced, one for destinations exempt from
11 # the node bandwidth cap, and the other for all other destinations.
12 #
13 # Mark Huang <mlhuang@cs.princeton.edu>
14 # Andy Bavier <acb@cs.princeton.edu>
15 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
16 # Copyright (C) 2004-2006 The Trustees of Princeton University
17 #
18 # $Id: bwmon.py,v 1.20 2007/01/10 16:51:04 faiyaza Exp $
19 #
20
21 import os
22 import sys
23 import time
24 import pickle
25 import database
26
27 #import socket
28 #import xmlrpclib
29 import bwlimit
30
31 from sets import Set
32
33 # Utility functions
34 #from pl_mom import *
35
36 # Constants
37 seconds_per_day = 24 * 60 * 60
38 bits_per_byte = 8
39
40 # Defaults
41 debug = False
42 verbose = 0
43 datafile = "/var/lib/misc/bwmon.dat"
44 #nm = None
45
46 # Burst to line rate (or node cap).  Set by NM.
47 default_MaxRate = bwlimit.get_bwcap()
48 default_Maxi2Rate = bwlimit.bwmax
49 # Min rate 8 bits/s 
50 default_MinRate = 0
51 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G 
52 # 5.4 Gbyte per day max allowed transfered per recording period
53 default_MaxKByte = 5662310
54 default_ThreshKByte = int(.8 * default_MaxKByte) 
55 # 16.4 Gbyte per day max allowed transfered per recording period to I2
56 default_Maxi2KByte = 17196646
57 default_Threshi2KByte = int(.8 * default_Maxi2KByte) 
58 # Default share quanta
59 default_Share = 1
60
61 # Average over 1 day
62 period = 1 * seconds_per_day
63
64 # Message template
65 template = \
66 """
67 The slice %(slice)s has transmitted more than %(bytes)s from
68 %(hostname)s to %(class)s destinations
69 since %(since)s.
70
71 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
72 until %(until)s.
73
74 Please reduce the average %(class)s transmission rate
75 of the slice to %(limit)s per %(period)s.
76
77 """.lstrip()
78
79 footer = \
80 """
81 %(date)s %(hostname)s bwcap %(slice)s
82 """.lstrip()
83
84 class Slice:
85     """
86     Stores the last recorded bandwidth parameters of a slice.
87
88     xid - slice context/VServer ID
89     name - slice name
90     time - beginning of recording period in UNIX seconds
91     bytes - low bandwidth bytes transmitted at the beginning of the recording period
92     i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
93     ByteMax - total volume of data allowed
94     ByteThresh - After thresh, cap node to (maxbyte - bytes)/(time left in period)
95     ExemptByteMax - Same as above, but for i2.
96     ExemptByteThresh - i2 ByteThresh
97     maxrate - max_rate slice attribute. 
98     maxexemptrate - max_exempt_rate slice attribute.
99     self.emailed = did we email during this recording period
100
101     """
102
103     def __init__(self, xid, name, maxrate, maxi2rate, bytes, i2bytes, data):
104         self.xid = xid
105         self.name = name
106         self.time = 0
107         self.bytes = 0
108         self.i2bytes = 0
109         self.MaxRate = default_MaxRate
110         self.MinRate = default_MinRate
111         self.Maxi2Rate = default_Maxi2Rate
112         self.MaxKByte = default_MaxKByte
113         self.ThreshKByte = default_ThreshKByte
114         self.Maxi2KByte = default_Maxi2KByte
115         self.Threshi2KByte = default_Threshi2KByte
116         self.Share = default_Share
117         self.emailed = False
118
119         # Get real values where applicable
120         self.reset(maxrate, maxi2rate, bytes, i2bytes, data)
121
122     def __repr__(self):
123         return self.name
124
125     @database.synchronized
126     def updateSliceAttributes(self, data):
127         for sliver in data['slivers']:
128             if sliver['name'] == self.name:    
129                 for attribute in sliver['attributes']:
130                     if attribute['name'] == 'net_min_rate':        
131                         self.MinRate = attribute['value']
132                     elif attribute['name'] == 'net_max_rate':        
133                         self.MaxRate = attribute['value']
134                     elif attribute['name'] == 'net_i2_min_rate':
135                         self.Mini2Rate = attribute['value']
136                     elif attribute['name'] == 'net_i2_max_rate':        
137                         self.Maxi2Rate = attribute['value']
138                     elif attribute['name'] == 'net_max_kbyte':        
139                         self.MaxKbyte = attribute['value']
140                     elif attribute['name'] == 'net_i2_max_kbyte':    
141                         self.Maxi2KByte = attribute['value']
142                     elif attribute['name'] == 'net_thresh_kbyte':    
143                         self.ThreshKByte = attribute['value']
144                     elif attribute['name'] == 'net_i2_thresh_kbyte':    
145                         self.Threshi2KByte = attribute['value']
146                     elif attribute['name'] == 'net_share':    
147                         self.Share = attribute['value']
148                     elif attribute['name'] == 'net_i2_share':    
149                         self.Sharei2 = attribute['value']
150
151     def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, data):
152         """
153         Begin a new recording period. Remove caps by restoring limits
154         to their default values.
155         """
156         
157         # Query Node Manager for max rate overrides
158         self.updateSliceAttributes(data)    
159
160         # Reset baseline time
161         self.time = time.time()
162
163         # Reset baseline byte coutns
164         self.bytes = usedbytes
165         self.i2bytes = usedi2bytes
166
167         # Reset email 
168         self.emailed = False
169
170         # Reset rates.
171         if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
172             print "%s reset to %s/%s" % \
173                   (self.name,
174                    bwlimit.format_tc_rate(self.MaxRate),
175                    bwlimit.format_tc_rate(self.Maxi2Rate))
176             bwlimit.set(xid = self.xid, 
177                 minrate = self.MinRate, 
178                 maxrate = self.MaxRate, 
179                 maxexemptrate = self.Maxi2Rate,
180                 minexemptrate = self.Mini2Rate,
181                 share = self.Share)
182
183     def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, data):
184         """
185         Update byte counts and check if byte limits have been
186         exceeded. 
187         """
188     
189         # Query Node Manager for max rate overrides
190         self.updateSliceAttributes(data)    
191      
192         # Prepare message parameters from the template
193         message = ""
194         params = {'slice': self.name, 'hostname': socket.gethostname(),
195                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
196                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
197                   'date': time.asctime(time.gmtime()) + " GMT",
198                   'period': format_period(period)} 
199
200         if usedi2bytes >= (self.usedbytes + self.ByteThresh):
201             maxbyte = self.MaxKByte * 1024
202             bytesused = bytes - self.bytes
203             timeused = int(time.time() - self.time)
204             new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
205             if new_maxrate < self.MinRate:
206                 new_maxrate = self.MinRate
207         else:
208             new_maxrate = self.MaxRate 
209
210         # Format template parameters for low bandwidth message
211         params['class'] = "low bandwidth"
212         params['bytes'] = format_bytes(usedbytes - self.bytes)
213         params['maxrate'] = bwlimit.format_tc_rate(runningmaxrate)
214         params['limit'] = format_bytes(self.MaxKByte)
215         params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
216
217         if verbose:
218             print "%(slice)s %(class)s " \
219                   "%(bytes)s of %(limit)s (%(new_maxrate)s/s maxrate)" % \
220                   params
221
222         # Cap low bandwidth burst rate
223         if new_maxrate != runningmaxrate:
224             message += template % params
225             print "%(slice)s %(class)s capped at %(new_maxrate)s/s " % params
226     
227         if usedi2bytes >= (self.i2bytes + self.Threshi2KBytes):
228             maxi2byte = self.Maxi2KByte * 1024
229             i2bytesused = i2bytes - self.i2bytes
230             timeused = int(time.time() - self.time)
231             new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
232             if new_maxi2rate < self.Mini2Rate:
233                 new_maxi2rate = self.Mini2Rate
234         else:
235             new_maxi2rate = self.Maxi2Rate 
236
237         # Format template parameters for high bandwidth message
238         params['class'] = "high bandwidth"
239         params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
240         params['maxrate'] = bwlimit.format_tc_rate(runningmaxi2rate)
241         params['limit'] = format_bytes(self.Maxi2KByte)
242         params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxi2rate)
243
244         if verbose:
245             print "%(slice)s %(class)s " \
246                   "%(bytes)s of %(limit)s (%(new_maxrate)s/s maxrate)" % params
247
248         # Cap high bandwidth burst rate
249         if new_maxi2rate != runningmaxi2rate:
250             message += template % params
251             print "%(slice)s %(class)s capped at %(new_maxexemptrate)s/s" % params
252
253         # Apply parameters
254         if new_maxrate != runningmaxrate or new_maxi2rate != runningmaxi2rate:
255             bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxi2rate)
256
257         # Notify slice
258         if message and self.emailed == False:
259             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
260             if debug:
261                 print subject
262                 print message + (footer % params)
263             else:
264                 self.emailed = True
265                 slicemail(self.name, subject, message + (footer % params))
266
267 def GetSlivers(data):
268     # Defaults
269     global datafile, \
270         period, \
271             default_MaxRate, \
272             default_Maxi2Rate, \
273             default_MinRate, \
274             default_MaxKByte,\
275             default_ThreshKByte,\
276         default_Maxi2KByte,\
277         default_Threshi2KByte,\
278         default_Share
279
280     # All slices
281     names = []
282
283     try:
284         f = open(datafile, "r+")
285         if verbose:
286             print "Loading %s" % datafile
287         (version, slices) = pickle.load(f)
288         f.close()
289         # Check version of data file
290         if version != "$Id: bwmon.py,v 1.20 2007/01/10 16:51:04 faiyaza Exp $":
291             print "Not using old version '%s' data file %s" % (version, datafile)
292             raise Exception
293     except Exception:
294         version = "$Id: bwmon.py,v 1.20 2007/01/10 16:51:04 faiyaza Exp $"
295         slices = {}
296
297     # Get special slice IDs
298     root_xid = bwlimit.get_xid("root")
299     default_xid = bwlimit.get_xid("default")
300
301         # {name: xid}
302     live = {}
303         for sliver in data['slivers']:
304                 live[sliver['name']] = bwlimit.get_xid(sliver['name'])
305
306     # Get actuall running values from tc.
307     for params in bwlimit.get():
308         (xid, share,
309          minrate, maxrate,
310          minexemptrate, maxexemptrate,
311          bytes, i2bytes) = params
312
313         # Ignore root and default buckets
314         if xid == root_xid or xid == default_xid:
315             continue
316
317         name = bwlimit.get_slice(xid)
318         if name is None:
319             # Orphaned (not associated with a slice) class
320             name = "%d?" % xid
321             bwlimit.off(xid)
322
323         # Monitor only the specified slices
324         if names and name not in names:
325             continue
326         #slices is populated from the pickle file
327         #xid is populated from bwlimit (read from /etc/passwd) 
328         if slices.has_key(xid):
329             slice = slices[xid]
330             if time.time() >= (slice.time + period) or \
331                bytes < slice.bytes or i2bytes < slice.i2bytes:
332                 # Reset to defaults every 24 hours or if it appears
333                 # that the byte counters have overflowed (or, more
334                 # likely, the node was restarted or the HTB buckets
335                 # were re-initialized).
336                 slice.reset(maxrate, maxexemptrate, bytes, i2bytes, data)
337             else:
338                 # Update byte counts
339                 slice.update(maxrate, maxexemptrate, bytes, i2bytes, data)
340         else:
341             # New slice, initialize state
342             slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate, bytes, i2bytes, data)
343
344     # Delete dead slices
345     dead = Set(slices.keys()) - Set(live.values())
346     for xid in dead:
347         del slices[xid]
348         bwlimit.off(xid)
349
350     print "Saving %s" % datafile
351     f = open(datafile, "w")
352     pickle.dump((version, slices), f)
353     f.close()
354
355
356 #def GetSlivers(data):
357 #    for sliver in data['slivers']:
358 #        if sliver.has_key('attributes'):
359 #           print sliver
360 #            for attribute in sliver['attributes']:
361 #                if attribute['name'] == "KByteThresh": print attribute['value']
362
363 def start(options, config):
364     pass
365
366 if __name__ == '__main__':
367     main()