import cPickle import sys import threading import time from config import DB_FILE import accounts import bwcap import logger import tools class Database(dict): def deliver_records(self, recs): ts = self.get_timestamp() for rec in recs: old_rec = self.setdefault(rec['record_key'], {}) if rec['timestamp'] >= max(ts, old_rec.get('timestamp', 0)): old_rec.update(rec, dirty=True) self.compute_effective_rspecs() if self.get_timestamp() > ts: self.delete_old_records() self.delete_old_accounts() for rec in self.itervalues(): rec['dirty'] = True self.create_new_accounts() self.update_bwcap() def get_timestamp(self): return self.get('timestamp', {'timestamp': 0})['timestamp'] def compute_effective_rspecs(self): """Apply loans to field 'rspec' to get field 'eff_rspec'.""" slivers = dict([(rec['name'], rec) for rec in self.itervalues() \ if rec.get('account_type') == 'sliver']) # Pass 1: copy 'rspec' to 'eff_rspec', saving the old value for sliver in slivers.itervalues(): sliver['old_eff_rspec'] = sliver.get('eff_rspec') sliver['eff_rspec'] = sliver['rspec'].copy() # Pass 2: apply loans for sliver in slivers.itervalues(): remaining_loanable_amount = sliver['rspec'].copy() for other_name, resource, amount in sliver.get('loans', []): if other_name in slivers and \ 0 < amount <= remaining_loanable_amount[resource]: sliver['eff_rspec'][resource] -= amount remaining_loanable_amount[resource] -= amount slivers[other_name]['eff_rspec'][resource] += amount # Pass 3: mark changed rspecs dirty for sliver in slivers.itervalues(): if sliver['eff_rspec'] != sliver['old_eff_rspec']: sliver['needs_update'] = True del sliver['old_eff_rspec'] def delete_old_records(self): ts = self.get_timestamp() now = time.time() for key in self.keys(): rec = self[key] if rec['timestamp'] < ts or rec.get('expiry', sys.maxint) < now: del self[key] def delete_old_accounts(self): for acct_type, name in accounts.all(): if ('%s_%s' % (acct_type, name)) not in self: accounts.get(name).ensure_destroyed() def create_new_accounts(self): """Invoke the appropriate create() function for every dirty account.""" for rec in self.itervalues(): if 'account_type' not in rec: continue if rec['dirty'] and rec['plc_instantiated']: accounts.get(rec['name']).ensure_created(rec) rec['dirty'] = False def update_bwcap(self): bwcap_rec = self.get('bwcap') if bwcap_rec and bwcap_rec['dirty']: bwcap.update(bwcap_rec) bwcap_rec['dirty'] = False # database object and associated lock _db_lock = threading.RLock() _db = Database() # these are used in tandem to request a database dump from the dumper daemon _db_cond = threading.Condition(_db_lock) _dump_requested = False # decorator that acquires and releases the database lock before and after the decorated operation def synchronized(function): def sync_fun(*args, **kw_args): _db_lock.acquire() try: return function(*args, **kw_args) finally: _db_lock.release() sync_fun.__doc__ = function.__doc__ sync_fun.__name__ = function.__name__ return sync_fun # apply the given records to the database and request a dump @synchronized def deliver_records(recs): global _dump_requested _db.deliver_records(recs) _dump_requested = True _db_cond.notify() @synchronized def get_sliver(name): return _db.get('sliver_'+name) def start(): """The database dumper daemon. When it starts up, it populates the database with the last dumped database. It proceeds to handle dump requests forever.""" def run(): global _dump_requested _db_lock.acquire() try: # load the db f = open(DB_FILE) _db.update(cPickle.load(f)) f.close() except: logger.log_exc() while True: # handle dump requests forever while not _dump_requested: _db_cond.wait() db_copy = tools.deepcopy(_db) _dump_requested = False _db_lock.release() try: tools.write_file(DB_FILE, lambda f: cPickle.dump(db_copy, f, -1)) except: logger.log_exc() _db_lock.acquire() tools.as_daemon_thread(run)