+"""The database houses information on slivers. This information
+reaches the sliver manager in two different ways: one, through the
+GetSlivers() call made periodically; two, by users delivering tickets.
+The sync() method of the Database class turns this data into reality.
+
+The database format is a dictionary that maps account names to records
+(which are themselves dictionaries). Inside each record, data supplied
+or computed locally is stored under keys that begin with an underscore,
+and data from PLC is stored under keys that don't.
+
+In order to maintain service when the node reboots during a network
+partition, the database is constantly being dumped to disk.
+"""
+
import cPickle
import threading
import time
-try: from bwlimit import bwmin, bwmax
-except ImportError: bwmin, bwmax = 8, 1000000000
import accounts
import logger
import tools
+import bwmon
+# We enforce minimum allocations to keep the clueless from hosing their slivers.
+# Disallow disk loans because there's currently no way to punish slivers over quota.
+MINIMUM_ALLOCATION = {'cpu_pct': 0, 'cpu_share': 1, 'net_min_rate': 0, 'net_max_rate': 8, 'net_i2_min_rate': 0, 'net_i2_max_rate': 8, 'net_share': 1}
+# Only resources with an enforced minimum may be loaned out (see the loan
+# check in _compute_effective_rspecs, which subtracts MINIMUM_ALLOCATION).
+LOANABLE_RESOURCES = MINIMUM_ALLOCATION.keys()
-DB_FILE = '/root/node_mgr_db.pickle'
-
-LOANABLE_RESOURCES = ['cpu_min', 'cpu_share', 'net_min', 'net_max', 'net2_min', 'net2_max', 'net_share', 'disk_max']
-
-DEFAULT_ALLOCATIONS = {'enabled': 1, 'cpu_min': 0, 'cpu_share': 32, 'net_min': bwmin, 'net_max': bwmax, 'net2_min': bwmin, 'net2_max': bwmax, 'net_share': 1, 'disk_max': 5000000}
+# On-disk location of the pickled database (renamed node_mgr -> sliver_mgr).
+DB_FILE = '/root/sliver_mgr_db.pickle'
# database object and associated lock
+# NOTE(review): db, db_lock and db_cond themselves appear to be defined
+# outside this hunk -- confirm against the full file.
dump_requested = False
# decorator that acquires and releases the database lock before and after the decorated operation
+# XXX - replace with "with" statements once we switch to 2.5
def synchronized(fn):
def sync_fn(*args, **kw_args):
db_lock.acquire()
eff_rspec = rec['_rspec']
resid_rspec = rec['rspec'].copy()
for target, resname, amt in rec.get('_loans', []):
- if target in slivers and amt < resid_rspec[resname]:
+ if target in slivers and amt <= resid_rspec[resname] - MINIMUM_ALLOCATION[resname]:
eff_rspec[resname] -= amt
resid_rspec[resname] -= amt
slivers[target]['_rspec'][resname] += amt
    def deliver_record(self, rec):
        """A record is simply a dictionary with 'name' and 'timestamp' keys. We keep some persistent private data in the records under keys that start with '_'; thus record updates should not displace such keys."""
+        # Records older than the last comprehensive (GetSlivers) update are
+        # stale: ignore them outright.
+        if rec['timestamp'] < self._min_timestamp: return
        name = rec['name']
        old_rec = self.get(name)
-        if old_rec != None and rec['timestamp'] > old_rec['timestamp']:
+        if old_rec == None: self[name] = rec
+        elif rec['timestamp'] > old_rec['timestamp']:
+            # Newer data for an existing record: discard the old PLC-supplied
+            # keys but preserve locally-computed '_'-prefixed state.
            for key in old_rec.keys():
                if not key.startswith('_'): del old_rec[key]
            old_rec.update(rec)
-        elif rec['timestamp'] >= self._min_timestamp: self[name] = rec
    def set_min_timestamp(self, ts):
+        """The ._min_timestamp member is the timestamp on the last comprehensive update. We use it to determine if a record is stale. This method should be called whenever new GetSlivers() data comes in."""
        self._min_timestamp = ts
+        # Purge records that predate the comprehensive update; in Python 2
+        # .items() returns a list, so deleting while looping is safe here.
        for name, rec in self.items():
            if rec['timestamp'] < ts: del self[name]
    def sync(self):
+        """Synchronize reality with the database contents. This method does a lot of things, and it's currently called after every single batch of database changes (a GetSlivers(), a loan, a record). It may be necessary in the future to do something smarter."""
+
        # delete expired records
        now = time.time()
        for name, rec in self.items():
        self._compute_effective_rspecs()
        # create and destroy accounts as needed
+        logger.verbose("database:sync : fetching accounts")
        existing_acct_names = accounts.all()
        for name in existing_acct_names:
+            logger.verbose("database:sync : loop on %s"%name)
            if name not in self: accounts.get(name).ensure_destroyed()
        for name, rec in self.iteritems():
            if rec['instantiation'] == 'plc-instantiated': accounts.get(name).ensure_created(rec)
+            # nm-controller slivers also get accounts, same as plc-instantiated.
+            if rec['instantiation'] == 'nm-controller': accounts.get(name).ensure_created(rec)
-        # request a database dump
+        # Wake up bwmon to update limits.
+        bwmon.lock.set()
+        # Request an asynchronous database dump. db_cond.notify() requires the
+        # database lock to be held -- presumably guaranteed because sync() is
+        # reached under @synchronized; confirm against the full file.
        global dump_requested
        dump_requested = True
        db_cond.notify()
-@synchronized
-def GetSlivers_callback(data):
- for d in data:
- for sliver in d['slivers']:
- rec = sliver.copy()
- attr_dict = {}
- for attr in rec.pop('attributes'): attr_dict[attr['name']] = attr_dict[attr['value']]
- keys = rec.pop('keys')
- rec['keys'] = '\n'.join([key_struct['key'] for key_struct in keys])
- rspec = {}
- rec['rspec'] = rspec
- for resname, default_amt in DEFAULT_ALLOCATIONS.iteritems():
- try: amt = int(attr_dict[resname])
- except (KeyError, ValueError): amt = default_amt
- rspec[resname] = amt
- db.set_min_timestamp(d['timestamp'])
- db.sync()
-
-
def start():
"""The database dumper daemon. When it starts up, it populates the database with the last dumped database. It proceeds to handle dump requests forever."""
def run():
while True:
db_lock.acquire()
while not dump_requested: db_cond.wait()
- db_copy = tools.deepcopy(db)
+ db_pickle = cPickle.dumps(db, cPickle.HIGHEST_PROTOCOL)
dump_requested = False
db_lock.release()
- try: tools.write_file(DB_FILE, lambda f: cPickle.dump(db_copy, f, -1))
+ try: tools.write_file(DB_FILE, lambda f: f.write(db_pickle))
except: logger.log_exc()
global db
try: