# Faiyaz Ahmed <faiyaza@cs.princeton.edu>
# Copyright (C) 2004-2006 The Trustees of Princeton University
#
-# $Id: bwmon.py,v 1.1.2.7 2007/04/24 23:11:20 faiyaza Exp $
+# $Id: bwmon.py,v 1.21 2007/06/16 14:30:17 faiyaza Exp $
#
import os
import sys
import time
import pickle
-
import socket
-import bwlimit
import logger
+import copy
+import threading
+import tools
+
+import bwlimit
+import database
from sets import Set
# Defaults
debug = False
-verbose = True
+verbose = False
datafile = "/var/lib/misc/bwmon.dat"
#nm = None
self.emailed = True
slicemail(self.name, subject, message + (footer % params))
-def GetSlivers(db):
+def gethtbs(root_xid, default_xid):
+ """
+ Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
+ Turn off HTBs without names.
+ """
+ livehtbs = {}
+ for params in bwlimit.get():
+ (xid, share,
+ minrate, maxrate,
+ minexemptrate, maxexemptrate,
+ usedbytes, usedi2bytes) = params
+
+ name = bwlimit.get_slice(xid)
+
+ if (name is None) \
+ and (xid != root_xid) \
+ and (xid != default_xid):
+ # Orphaned (not associated with a slice) class
+ name = "%d?" % xid
+ logger.log("bwmon: Found orphaned HTB %s. Removing." %name)
+ bwlimit.off(xid)
+
+ livehtbs[xid] = {'share': share,
+ 'minrate': minrate,
+ 'maxrate': maxrate,
+ 'maxexemptrate': maxexemptrate,
+ 'minexemptrate': minexemptrate,
+ 'usedbytes': usedbytes,
+ 'name': name,
+ 'usedi2bytes': usedi2bytes}
+
+ return livehtbs
+
+def sync(nmdbcopy):
+ """
+ Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit.
+ """
# Defaults
global datafile, \
period, \
# All slices
names = []
-
# Incase the limits have changed.
default_MaxRate = int(bwlimit.get_bwcap() / 1000)
default_Maxi2Rate = int(bwlimit.bwmax / 1000)
(version, slices) = pickle.load(f)
f.close()
# Check version of data file
- if version != "$Id: bwmon.py,v 1.1.2.7 2007/04/24 23:11:20 faiyaza Exp $":
+ if version != "$Id: bwmon.py,v 1.21 2007/06/16 14:30:17 faiyaza Exp $":
logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile))
raise Exception
except Exception:
- version = "$Id: bwmon.py,v 1.1.2.7 2007/04/24 23:11:20 faiyaza Exp $"
+ version = "$Id: bwmon.py,v 1.21 2007/06/16 14:30:17 faiyaza Exp $"
slices = {}
# Get/set special slice IDs
live = {}
# Get running slivers that should be on this node (from plc). {xid: name}
- for sliver in db.keys():
- live[bwlimit.get_xid(sliver)] = sliver
+    # db keys on name, bwmon keys on xid. db doesn't have xid either.
+ for plcSliver in nmdbcopy.keys():
+ live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
- # Setup new slices.
- # live.xids - runing(slices).xids = new.xids
- newslicesxids = []
- for plcxid in live.keys():
- if plcxid not in slices.keys():
- newslicesxids.append(plcxid)
+ logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__())
+ logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__())
+
+ # Get actual running values from tc.
+ # Update slice totals and bandwidth. {xid: {values}}
+ livehtbs = gethtbs(root_xid, default_xid)
+ logger.log("bwmon: Found %s running HTBs" % livehtbs.keys().__len__())
- #newslicesxids = Set(live.keys()) - Set(slices.keys())
- for newslicexid in newslicesxids:
+ # Get new slices.
+    # live.xids - running(slices).xids = new.xids
+ #newslicesxids = Set(live.keys()) - Set(slices.keys())
+ newslicesxids = Set(live.keys()) - Set(livehtbs.keys())
+ logger.log("bwmon: Found %s new slices" % newslicesxids.__len__())
+
+    # In case we rebooted and need to keep track of already running htbs
+ norecxids = Set(livehtbs.keys()) - Set(slices.keys())
+ logger.log("bwmon: Found %s slices that have htbs but not in dat." % norecxids.__len__())
+ # Reset tc counts.
+ for norecxid in norecxids:
+ slices[norecxid] = Slice(norecxid, live[norecxid]['name'], live[norecxid]['_rspec'])
+ slices[norecxid].reset(livehtbs[norecxid]['maxrate'],
+ livehtbs[norecxid]['maxexemptrate'],
+ livehtbs[norecxid]['usedbytes'],
+ livehtbs[norecxid]['usedi2bytes'],
+ live[norecxid]['_rspec'])
+
+ # Setup new slices
+ for newslice in newslicesxids:
# Delegated slices dont have xids (which are uids) since they haven't been
# instantiated yet.
- if newslicexid != None and db[live[newslicexid]].has_key('_rspec') == True:
- logger.log("bwmon: New Slice %s" % live[newslicexid])
+ if newslice != None and live[newslice].has_key('_rspec') == True:
+ logger.log("bwmon: New Slice %s" % live[newslice]['name'])
# _rspec is the computed rspec: NM retrieved data from PLC, computed loans
# and made a dict of computed values.
- rspec = db[live[newslicexid]]['_rspec']
- slices[newslicexid] = Slice(newslicexid, live[newslicexid], rspec)
- slices[newslicexid].reset(0, 0, 0, 0, rspec)
+ slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
+ slices[newslice].reset(0, 0, 0, 0, live[newslice]['_rspec'])
else:
- logger.log("bwmon Slice %s doesn't have xid. Must be delegated. Skipping." % live[newslicexid])
+ logger.log("bwmon Slice %s doesn't have xid. Must be delegated. Skipping." % live[newslice]['name'])
- # ...mlhuang's abortion....
- # Get actual running values from tc.
- # Update slice totals and bandwidth.
- for params in bwlimit.get():
- (xid, share,
- minrate, maxrate,
- minexemptrate, maxexemptrate,
- usedbytes, usedi2bytes) = params
-
- # Ignore root and default buckets
+ # Delete dead slices.
+ # First delete dead slices that exist in the pickle file, but
+ # aren't instantiated by PLC.
+ dead = Set(slices.keys()) - Set(live.keys())
+ logger.log("bwmon: Found %s dead slices" % (dead.__len__() - 2))
+ for xid in dead:
if xid == root_xid or xid == default_xid:
continue
+ logger.log("bwmon: removing dead slice %s " % xid)
+ if slices.has_key(xid): del slices[xid]
+ if livehtbs.has_key(xid): bwlimit.off(xid)
- name = bwlimit.get_slice(xid)
- if name is None:
- # Orphaned (not associated with a slice) class
- name = "%d?" % xid
- bwlimit.off(xid)
+ # Get actual running values from tc since we've added and removed buckets.
+ # Update slice totals and bandwidth. {xid: {values}}
+ livehtbs = gethtbs(root_xid, default_xid)
+ logger.log("bwmon: now %s running HTBs" % livehtbs.keys().__len__())
+ for (xid, slice) in slices.iteritems():
# Monitor only the specified slices
+ if xid == root_xid or xid == default_xid: continue
if names and name not in names:
continue
- #slices is populated from the pickle file
- #xid is populated from bwlimit (read from /etc/passwd)
- if slices.has_key(xid):
- slice = slices[xid]
- # Old slices werent being instanciated correctly because
- # the HTBs were still pleasent, but the slice in bwmon would
- # have the byte counts set to 0. The next time update was run
- # the real byte count would be sent to update, causing the bw cap.
- if time.time() >= (slice.time + period) or \
- usedbytes < slice.bytes or \
- usedi2bytes < slice.i2bytes or \
- xid in newslicesxids:
- # Reset to defaults every 24 hours or if it appears
- # that the byte counters have overflowed (or, more
- # likely, the node was restarted or the HTB buckets
- # were re-initialized).
- slice.reset(maxrate, \
- maxexemptrate, \
- usedbytes, \
- usedi2bytes, \
- db[slice.name]['_rspec'])
- else:
- # Update byte counts
- slice.update(maxrate, \
- maxexemptrate, \
- usedbytes, \
- usedi2bytes, \
- db[slice.name]['_rspec'])
+ if (time.time() >= (slice.time + period)) or \
+ (livehtbs[xid]['usedbytes'] < slice.bytes) or \
+ (livehtbs[xid]['usedi2bytes'] < slice.i2bytes):
+ # Reset to defaults every 24 hours or if it appears
+ # that the byte counters have overflowed (or, more
+ # likely, the node was restarted or the HTB buckets
+ # were re-initialized).
+ slice.reset(livehtbs[xid]['maxrate'], \
+ livehtbs[xid]['maxexemptrate'], \
+ livehtbs[xid]['usedbytes'], \
+ livehtbs[xid]['usedi2bytes'], \
+ live[xid]['_rspec'])
else:
- # Just in case. Probably (hopefully) this will never happen.
- # New slice, initialize state
- logger.log("bwmon: New Slice %s" % name)
- slice = slices[xid] = Slice(xid, name, db[slice.name]['_rspec'])
- slice.reset(maxrate, \
- maxexemptrate, \
- usedbytes, \
- usedi2bytes, \
- db[slice.name]['_rspec'])
-
- # Delete dead slices
- dead = Set(slices.keys()) - Set(live.keys())
- for xid in dead:
- if xid == root_xid or xid == default_xid:
- continue
- del slices[xid]
- bwlimit.off(xid)
-
- logger.log("bwmon: Saving %s" % datafile)
+ if debug: logger.log("bwmon: Updating slice %s" % slice.name)
+ # Update byte counts
+ slice.update(livehtbs[xid]['maxrate'], \
+ livehtbs[xid]['maxexemptrate'], \
+ livehtbs[xid]['usedbytes'], \
+ livehtbs[xid]['usedi2bytes'], \
+ live[xid]['_rspec'])
+
+ logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile))
f = open(datafile, "w")
pickle.dump((version, slices), f)
f.close()
-
-#def GetSlivers(data):
-# for sliver in data['slivers']:
-# if sliver.has_key('attributes'):
-# print sliver
-# for attribute in sliver['attributes']:
-# if attribute['name'] == "KByteThresh": print attribute['value']
-
-#def start(options, config):
-# pass
+lock = threading.Event()
+def run():
+    """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.sync(), then go back to waiting."""
+ if debug: logger.log("bwmon: Thread started")
+ while True:
+ lock.wait()
+ if debug: logger.log("bwmon: Event received. Running.")
+ database.db_lock.acquire()
+ nmdbcopy = copy.deepcopy(database.db)
+ database.db_lock.release()
+ try: sync(nmdbcopy)
+ except: logger.log_exc()
+ lock.clear()
+
+def start(*args):
+ tools.as_daemon_thread(run)
+
+def GetSlivers(*args):
+ pass