* BWmon is now event driven and handles reboots. Also got rid of ALL legacy code.
[nodemanager.git] / bwmon.py
1 #!/usr/bin/python
2 #
3 # Average bandwidth monitoring script. Run periodically via cron(8) to
4 # enforce a soft limit on daily bandwidth usage for each slice. If a
5 # slice is found to have exceeded its daily bandwidth usage when the
6 # script is run, its instantaneous rate will be capped at the desired
7 # average rate. Thus, in the worst case, a slice will only be able to
8 # send a little more than twice its average daily limit.
9 #
10 # Two separate limits are enforced, one for destinations exempt from
11 # the node bandwidth cap, and the other for all other destinations.
12 #
13 # Mark Huang <mlhuang@cs.princeton.edu>
14 # Andy Bavier <acb@cs.princeton.edu>
15 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
16 # Copyright (C) 2004-2006 The Trustees of Princeton University
17 #
18 # $Id: bwmon.py,v 1.1.2.9 2007/04/26 19:09:05 faiyaza Exp $
19 #
20
21 import os
22 import sys
23 import time
24 import pickle
25 import socket
26 import logger
27 import copy
28 import threading
29 import tools
30
31 import bwlimit
32 import database
33
34 from sets import Set
35
36 try:
37     sys.path.append("/etc/planetlab")
38     from plc_config import *
39 except:
40     logger.log("bwmon:  Warning: Configuration file /etc/planetlab/plc_config.py not found")
41     PLC_NAME = "PlanetLab"
42     PLC_SLICE_PREFIX = "pl"
43     PLC_MAIL_SUPPORT_ADDRESS = "support@planet-lab.org"
44     PLC_MAIL_SLICE_ADDRESS = "SLICE@slices.planet-lab.org"
45
46 # Constants
47 seconds_per_day = 24 * 60 * 60
48 bits_per_byte = 8
49
50 # Defaults
51 debug = False
52 verbose = False
53 datafile = "/var/lib/misc/bwmon.dat"
54 #nm = None
55
56 # Burst to line rate (or node cap).  Set by NM. in KBit/s
57 default_MaxRate = int(bwlimit.get_bwcap() / 1000)
58 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
59 # Min rate 8 bits/s 
60 default_MinRate = 0
61 default_Mini2Rate = 0
62 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G 
63 # 5.4 Gbyte per day max allowed transfered per recording period
64 default_MaxKByte = 5662310
65 default_ThreshKByte = int(.8 * default_MaxKByte) 
66 # 16.4 Gbyte per day max allowed transfered per recording period to I2
67 default_Maxi2KByte = 17196646
68 default_Threshi2KByte = int(.8 * default_Maxi2KByte) 
69 # Default share quanta
70 default_Share = 1
71
72 # Average over 1 day
73 period = 1 * seconds_per_day
74
75 # Message template
76 template = \
77 """
78 The slice %(slice)s has transmitted more than %(bytes)s from
79 %(hostname)s to %(class)s destinations
80 since %(since)s.
81
82 Its maximum %(class)s burst rate will be capped at %(new_maxrate)s/s
83 until %(until)s.
84
85 Please reduce the average %(class)s transmission rate
86 of the slice to %(limit)s per %(period)s.
87
88 """.lstrip()
89
90 footer = \
91 """
92 %(date)s %(hostname)s bwcap %(slice)s
93 """.lstrip()
94
95 def format_bytes(bytes, si = True):
96     """
97     Formats bytes into a string
98     """
99     if si:
100         kilo = 1000.
101     else:
102         # Officially, a kibibyte
103         kilo = 1024.
104
105     if bytes >= (kilo * kilo * kilo):
106         return "%.1f GB" % (bytes / (kilo * kilo * kilo))
107     elif bytes >= 1000000:
108         return "%.1f MB" % (bytes / (kilo * kilo))
109     elif bytes >= 1000:
110         return "%.1f KB" % (bytes / kilo)
111     else:
112         return "%.0f bytes" % bytes
113
114 def format_period(seconds):
115     """
116     Formats a period in seconds into a string
117     """
118
119     if seconds == (24 * 60 * 60):
120         return "day"
121     elif seconds == (60 * 60):
122         return "hour"
123     elif seconds > (24 * 60 * 60):
124         return "%.1f days" % (seconds / 24. / 60. / 60.)
125     elif seconds > (60 * 60):
126         return "%.1f hours" % (seconds / 60. / 60.)
127     elif seconds > (60):
128         return "%.1f minutes" % (seconds / 60.)
129     else:
130         return "%.0f seconds" % seconds
131
132 def slicemail(slice, subject, body):
133     sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")
134
135     # PLC has a separate list for pl_mom messages
136     if PLC_MAIL_SUPPORT_ADDRESS == "support@planet-lab.org":
137         to = ["pl-mom@planet-lab.org"]
138     else:
139         to = [PLC_MAIL_SUPPORT_ADDRESS]
140
141     if slice is not None and slice != "root":
142         to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice))
143
144     header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS),
145               'to': ", ".join(to),
146               'version': sys.version.split(" ")[0],
147               'subject': subject}
148
149     # Write headers
150     sendmail.write(
151 """
152 Content-type: text/plain
153 From: %(from)s
154 Reply-To: %(from)s
155 To: %(to)s
156 X-Mailer: Python/%(version)s
157 Subject: %(subject)s
158
159 """.lstrip() % header)
160
161     # Write body
162     sendmail.write(body)
163     # Done
164     sendmail.close()
165
166
167 class Slice:
168     """
169     Stores the last recorded bandwidth parameters of a slice.
170
171     xid - slice context/VServer ID
172     name - slice name
173     time - beginning of recording period in UNIX seconds
174     bytes - low bandwidth bytes transmitted at the beginning of the recording period
175     i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
176     ByteMax - total volume of data allowed
177     ByteThresh - After thresh, cap node to (maxbyte - bytes)/(time left in period)
178     ExemptByteMax - Same as above, but for i2.
179     ExemptByteThresh - i2 ByteThresh
180     maxrate - max_rate slice attribute. 
181     maxexemptrate - max_exempt_rate slice attribute.
182     self.emailed = did we email during this recording period
183
184     """
185
186     def __init__(self, xid, name, rspec):
187         self.xid = xid
188         self.name = name
189         self.time = 0
190         self.bytes = 0
191         self.i2bytes = 0
192         self.MaxRate = default_MaxRate
193         self.MinRate = default_MinRate 
194         self.Maxi2Rate = default_Maxi2Rate
195         self.Mini2Rate = default_Mini2Rate
196         self.MaxKByte = default_MaxKByte
197         self.ThreshKByte = default_ThreshKByte
198         self.Maxi2KByte = default_Maxi2KByte
199         self.Threshi2KByte = default_Threshi2KByte
200         self.Share = default_Share
201         self.Sharei2 = default_Share
202         self.emailed = False
203
204         self.updateSliceAttributes(rspec)
205         bwlimit.set(xid = self.xid, 
206                 minrate = self.MinRate * 1000, 
207                 maxrate = self.MaxRate * 1000, 
208                 maxexemptrate = self.Maxi2Rate * 1000,
209                 minexemptrate = self.Mini2Rate * 1000,
210                 share = self.Share)
211
212     def __repr__(self):
213         return self.name
214
215     def updateSliceAttributes(self, rspec):
216         # Get attributes
217
218         # Sanity check plus policy decision for MinRate:
219         # Minrate cant be greater than 25% of MaxRate or NodeCap.
220         MinRate = int(rspec.get("net_min_rate", default_MinRate))
221         if MinRate > int(.25 * default_MaxRate):
222             MinRate = int(.25 * default_MaxRate)
223         if MinRate != self.MinRate:
224             self.MinRate = MinRate
225             logger.log("bwmon:  Updating %s: Min Rate = %s" %(self.name, self.MinRate))
226
227         MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000))
228         if MaxRate != self.MaxRate:
229             self.MaxRate = MaxRate
230             logger.log("bwmon:  Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
231
232         Mini2Rate = int(rspec.get('net_i2_min_rate', default_Mini2Rate))
233         if Mini2Rate != self.Mini2Rate:
234             self.Mini2Rate = Mini2Rate 
235             logger.log("bwmon:  Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
236
237         Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000))
238         if Maxi2Rate != self.Maxi2Rate:
239             self.Maxi2Rate = Maxi2Rate
240             logger.log("bwmon:  Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
241                           
242         MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte))
243         if MaxKByte != self.MaxKByte:
244             self.MaxKByte = MaxKByte
245             logger.log("bwmon:  Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte))
246                           
247         Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte))
248         if Maxi2KByte != self.Maxi2KByte:
249             self.Maxi2KByte = Maxi2KByte
250             logger.log("bwmon:  Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte))
251                           
252         ThreshKByte = int(rspec.get('net_thresh_kbyte', default_ThreshKByte))
253         if ThreshKByte != self.ThreshKByte:
254             self.ThreshKByte = ThreshKByte
255             logger.log("bwmon:  Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte))
256                           
257         Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', default_Threshi2KByte))
258         if Threshi2KByte != self.Threshi2KByte:    
259             self.Threshi2KByte = Threshi2KByte
260             logger.log("bwmon:  Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte))
261  
262         Share = int(rspec.get('net_share', default_Share))
263         if Share != self.Share:
264             self.Share = Share
265             logger.log("bwmon:  Updating %s: Net Share = %s" %(self.name, self.Share))
266
267         Sharei2 = int(rspec.get('net_i2_share', default_Share))
268         if Sharei2 != self.Sharei2:
269             self.Sharei2 = Sharei2 
270             logger.log("bwmon:  Updating %s: Net i2 Share = %s" %(self.name, self.i2Share))
271
272
273     def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
274         """
275         Begin a new recording period. Remove caps by restoring limits
276         to their default values.
277         """
278         
279         # Query Node Manager for max rate overrides
280         self.updateSliceAttributes(rspec)    
281
282         # Reset baseline time
283         self.time = time.time()
284
285         # Reset baseline byte coutns
286         self.bytes = usedbytes
287         self.i2bytes = usedi2bytes
288
289         # Reset email 
290         self.emailed = False
291         maxrate = self.MaxRate * 1000 
292         maxi2rate = self.Maxi2Rate * 1000 
293         # Reset rates.
294         if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
295             logger.log("bwmon:  %s reset to %s/%s" % \
296                   (self.name,
297                    bwlimit.format_tc_rate(maxrate),
298                    bwlimit.format_tc_rate(maxi2rate)))
299             bwlimit.set(xid = self.xid, 
300                 minrate = self.MinRate * 1000, 
301                 maxrate = self.MaxRate * 1000, 
302                 maxexemptrate = self.Maxi2Rate * 1000,
303                 minexemptrate = self.Mini2Rate * 1000,
304                 share = self.Share)
305
306     def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
307         """
308         Update byte counts and check if byte limits have been
309         exceeded. 
310         """
311     
312         # Query Node Manager for max rate overrides
313         self.updateSliceAttributes(rspec)    
314      
315         # Prepare message parameters from the template
316         message = ""
317         params = {'slice': self.name, 'hostname': socket.gethostname(),
318                   'since': time.asctime(time.gmtime(self.time)) + " GMT",
319                   'until': time.asctime(time.gmtime(self.time + period)) + " GMT",
320                   'date': time.asctime(time.gmtime()) + " GMT",
321                   'period': format_period(period)} 
322
323         if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
324             if verbose:
325                 logger.log("bwmon: %s over thresh %s" \
326                   % (self.name, format_bytes(self.ThreshKByte * 1024)))
327             sum = self.bytes + (self.ThreshKByte * 1024)
328             maxbyte = self.MaxKByte * 1024
329             bytesused = usedbytes - self.bytes
330             timeused = int(time.time() - self.time)
331             new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
332             if new_maxrate < (self.MinRate * 1000):
333                 new_maxrate = self.MinRate * 1000
334         else:
335             new_maxrate = self.MaxRate * 1000 
336
337         # Format template parameters for low bandwidth message
338         params['class'] = "low bandwidth"
339         params['bytes'] = format_bytes(usedbytes - self.bytes)
340         params['limit'] = format_bytes(self.MaxKByte * 1024)
341         params['thresh'] = format_bytes(self.ThreshKByte * 1024)
342         params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate)
343
344         if verbose:
345             logger.log("bwmon:  %(slice)s %(class)s " \
346                   "%(bytes)s of %(limit)s max %(thresh)s thresh (%(new_maxrate)s/s maxrate)" % \
347                   params)
348
349         # Cap low bandwidth burst rate
350         if new_maxrate != runningmaxrate:
351             message += template % params
352             logger.log("bwmon:   ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
353     
354         if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
355             maxi2byte = self.Maxi2KByte * 1024
356             i2bytesused = usedi2bytes - self.i2bytes
357             timeused = int(time.time() - self.time)
358             new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused))
359             if new_maxi2rate < (self.Mini2Rate * 1000):
360                 new_maxi2rate = self.Mini2Rate * 1000
361         else:
362             new_maxi2rate = self.Maxi2Rate * 1000
363
364         # Format template parameters for high bandwidth message
365         params['class'] = "high bandwidth"
366         params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
367         params['limit'] = format_bytes(self.Maxi2KByte * 1024)
368         params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxi2rate)
369
370         if verbose:
371             logger.log("bwmon:  %(slice)s %(class)s " \
372                   "%(bytes)s of %(limit)s (%(new_maxrate)s/s maxrate)" % params)
373
374         # Cap high bandwidth burst rate
375         if new_maxi2rate != runningmaxi2rate:
376             message += template % params
377             logger.log("bwmon:  %(slice)s %(class)s capped at %(new_maxexemptrate)s/s" % params)
378
379         # Apply parameters
380         if new_maxrate != runningmaxrate or new_maxi2rate != runningmaxi2rate:
381             bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxi2rate)
382
383         # Notify slice
384         if message and self.emailed == False:
385             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
386             if debug:
387                 logger.log("bwmon:  "+ subject)
388                 logger.log("bwmon:  "+ message + (footer % params))
389             else:
390                 self.emailed = True
391                 slicemail(self.name, subject, message + (footer % params))
392
393 def gethtbs(root_xid, default_xid):
394     """
395     Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
396         Turn off HTBs without names.
397     """
398     livehtbs = {}
399     for params in bwlimit.get():
400         (xid, share,
401          minrate, maxrate,
402          minexemptrate, maxexemptrate,
403          usedbytes, usedi2bytes) = params
404         
405         name = bwlimit.get_slice(xid)
406
407         
408         
409         if (name is None) \
410                 and (xid != root_xid) \
411                 and (xid != default_xid):
412             # Orphaned (not associated with a slice) class
413             name = "%d?" % xid
414             logger.log("bwmon:  Found orphaned HTB %s. Removing." %name)
415             bwlimit.off(xid)
416
417         livehtbs[xid] = {'share': share,
418             'minrate': minrate,
419             'maxrate': maxrate,
420             'maxexemptrate': maxexemptrate,
421             'minexemptrate': minexemptrate,
422             'usedbytes': usedbytes,
423             'name': name, 
424             'usedi2bytes': usedi2bytes}
425
426     return livehtbs
427
428 def sync(nmdbcopy):
429     """
430     Syncs tc, db, and bwmon.dat.  Then, starts new slices, kills old ones, and updates byte accounts for each running slice.  Sends emails and caps those that went over their limit.
431     """
432     # Defaults
433     global datafile, \
434         period, \
435         default_MaxRate, \
436         default_Maxi2Rate, \
437         default_MinRate, \
438         default_MaxKByte,\
439         default_ThreshKByte,\
440         default_Maxi2KByte,\
441         default_Threshi2KByte,\
442         default_Share,\
443         verbose
444
445     # All slices
446     names = []
447     # Incase the limits have changed. 
448     default_MaxRate = int(bwlimit.get_bwcap() / 1000)
449     default_Maxi2Rate = int(bwlimit.bwmax / 1000)
450
451     # Incase default isn't set yet.
452     if default_MaxRate == -1:
453         default_MaxRate = 1000000
454
455     try:
456         f = open(datafile, "r+")
457         logger.log("bwmon:  Loading %s" % datafile)
458         (version, slices) = pickle.load(f)
459         f.close()
460         # Check version of data file
461         if version != "$Id: bwmon.py,v 1.1.2.9 2007/04/26 19:09:05 faiyaza Exp $":
462             logger.log("bwmon:  Not using old version '%s' data file %s" % (version, datafile))
463             raise Exception
464     except Exception:
465         version = "$Id: bwmon.py,v 1.1.2.9 2007/04/26 19:09:05 faiyaza Exp $"
466         slices = {}
467
468     # Get/set special slice IDs
469     root_xid = bwlimit.get_xid("root")
470     default_xid = bwlimit.get_xid("default")
471
472     # Since root is required for sanity, its not in the API/plc database, so pass {} 
473     # to use defaults.
474     if root_xid not in slices.keys():
475         slices[root_xid] = Slice(root_xid, "root", {})
476         slices[root_xid].reset(0, 0, 0, 0, {})
477     
478     # Used by bwlimit.  pass {} since there is no rspec (like above).
479     if default_xid not in slices.keys():
480         slices[default_xid] = Slice(default_xid, "default", {})
481         slices[default_xid].reset(0, 0, 0, 0, {})
482
483     live = {}
484     # Get running slivers that should be on this node (from plc). {xid: name}
485     # db keys on name, bwmon keys on xid.  db doesnt have xid either.
486     for plcSliver in nmdbcopy.keys():
487         live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
488
489     logger.log("bwmon:  Found %s instantiated slices" % live.keys().__len__())
490     logger.log("bwmon:  Found %s slices in dat file" % slices.values().__len__())
491
492     # Get actual running values from tc.
493     # Update slice totals and bandwidth. {xid: {values}}
494     livehtbs = gethtbs(root_xid, default_xid)
495     logger.log("bwmon:  Found %s running HTBs" % livehtbs.keys().__len__())
496
497     # Get new slices.
498     # live.xids - runing(slices).xids = new.xids
499     #newslicesxids = Set(live.keys()) - Set(slices.keys()) 
500     newslicesxids = Set(live.keys()) - Set(livehtbs.keys())
501     logger.log("bwmon:  Found %s new slices" % newslicesxids.__len__())
502
503     # Incase we rebooted and need to bring up the htbs that are in the db but 
504     # not known to tc.
505     #nohtbxids = Set(slices.keys()) - Set(livehtbs.keys())
506     #logger.log("bwmon:  Found %s slices that should have htbs but dont." % nohtbxids.__len__())
507     #newslicesxids.update(nohtbxids)
508         
509     # Setup new slices
510     for newslice in newslicesxids:
511         # Delegated slices dont have xids (which are uids) since they haven't been
512         # instantiated yet.
513         if newslice != None and live[newslice].has_key('_rspec') == True:
514             logger.log("bwmon: New Slice %s" % live[newslice]['name'])
515             # _rspec is the computed rspec:  NM retrieved data from PLC, computed loans
516             # and made a dict of computed values.
517             slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
518             slices[newslice].reset(0, 0, 0, 0, live[newslice]['_rspec'])
519         else:
520             logger.log("bwmon  Slice %s doesn't have xid.  Must be delegated.  Skipping." % live[newslice]['name'])
521
522     # Delete dead slices.
523     # First delete dead slices that exist in the pickle file, but
524     # aren't instantiated by PLC.
525     dead = Set(slices.keys()) - Set(live.keys())
526     logger.log("bwmon:  Found %s dead slices" % (dead.__len__() - 2))
527     for xid in dead:
528         if xid == root_xid or xid == default_xid:
529             continue
530         logger.log("bwmon:  removing dead slice  %s " % xid)
531         if slices.has_key(xid): del slices[xid]
532         if livehtbs.has_key(xid): bwlimit.off(xid)
533
534     # Get actual running values from tc since we've added and removed buckets.
535     # Update slice totals and bandwidth. {xid: {values}}
536     livehtbs = gethtbs(root_xid, default_xid)
537     logger.log("bwmon:  now %s running HTBs" % livehtbs.keys().__len__())
538
539     for (xid, slice) in slices.iteritems():
540         # Monitor only the specified slices
541         if xid == root_xid or xid == default_xid: continue
542         if names and name not in names:
543             continue
544  
545         if (time.time() >= (slice.time + period)) or \
546         (livehtbs[xid]['usedbytes'] < slice.bytes) or \
547         (livehtbs[xid]['usedi2bytes'] < slice.i2bytes):
548             # Reset to defaults every 24 hours or if it appears
549             # that the byte counters have overflowed (or, more
550             # likely, the node was restarted or the HTB buckets
551             # were re-initialized).
552             slice.reset(livehtbs[xid]['maxrate'], \
553                 livehtbs[xid]['maxexemptrate'], \
554                 livehtbs[xid]['usedbytes'], \
555                 livehtbs[xid]['usedi2bytes'], \
556                 live[xid]['_rspec'])
557         else:
558             if debug:  logger.log("bwmon: Updating slice %s" % slice.name)
559             # Update byte counts
560             slice.update(livehtbs[xid]['maxrate'], \
561                 livehtbs[xid]['maxexemptrate'], \
562                 livehtbs[xid]['usedbytes'], \
563                 livehtbs[xid]['usedi2bytes'], \
564                 live[xid]['_rspec'])
565     
566     logger.log("bwmon:  Saving %s slices in %s" % (slices.keys().__len__(),datafile))
567     f = open(datafile, "w")
568     pickle.dump((version, slices), f)
569     f.close()
570
571 lock = threading.Event()
572 def run():
573     """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
574     if debug:  logger.log("bwmon:  Thread started")
575     while True:
576         lock.wait()
577         if debug: logger.log("bwmon:  Event received.  Running.")
578         database.db_lock.acquire()
579         nmdbcopy = copy.deepcopy(database.db)
580         database.db_lock.release()
581         try:  sync(nmdbcopy)
582         except: logger.log_exc()
583         lock.clear()
584
585 def start(*args):
586     tools.as_daemon_thread(run)
587
588 def GetSlivers(*args):
589     pass