X-Git-Url: http://git.onelab.eu/?p=nodemanager.git;a=blobdiff_plain;f=database.py;h=a7099bc7cd0cb1717837a63cb5efdd1830229008;hp=4555d485c491afe8c1d68d01c1900981ff95ce9c;hb=HEAD;hpb=dfbec103d5234340d11f454c70c82891e5ac9344

diff --git a/database.py b/database.py
index 4555d48..a7099bc 100644
--- a/database.py
+++ b/database.py
@@ -1,133 +1,205 @@
-import cPickle
+#
+"""The database houses information on slivers.  This information
+reaches the sliver manager in two different ways: one, through the
+GetSlivers() call made periodically; two, by users delivering tickets.
+The sync() method of the Database class turns this data into reality.
+
+The database format is a dictionary that maps account names to records
+(also known as dictionaries).  Inside each record, data supplied or
+computed locally is stored under keys that begin with an underscore,
+and data from PLC is stored under keys that don't.
+
+In order to maintain service when the node reboots during a network
+partition, the database is constantly being dumped to disk.
+"""
+
 import sys
+
+import pickle
 import threading
 import time
-import accounts
-import bwcap
+import account
 import logger
 import tools
+import bwmon
+
+# hopefully temporary
+# is there a good reason to have this done here and not in a plugin ?
+try: from coresched_lxc import CoreSched
+except: from coresched_vs import CoreSched

-DB_FILE = '/root/pl_node_mgr_db.pickle'
-
+# We enforce minimum allocations to keep the clueless from hosing their slivers.
+# Disallow disk loans because there's currently no way to punish slivers over quota.
+MINIMUM_ALLOCATION = {'cpu_pct': 0,
+                      'cpu_share': 1,
+                      'net_min_rate': 0,
+                      'net_max_rate': 8,
+                      'net_i2_min_rate': 0,
+                      'net_i2_max_rate': 8,
+                      'net_share': 1,
+                      }
+LOANABLE_RESOURCES = list(MINIMUM_ALLOCATION.keys())

-class Database(dict):
-    def __init__(self): self.account_index = {}
-
-    def deliver_records(self, recs):
-        ts = self.get_timestamp()
-        for rec in recs:
-            old_rec = self.setdefault(rec['record_key'], {})
-            if rec['timestamp'] >= max(ts, old_rec.get('timestamp', 0)):
-                old_rec.update(rec, dirty=True)
-        self.compute_effective_rspecs()
-        if self.get_timestamp() > ts:
-            self.delete_old_records()
-            self.delete_old_accounts()
-            for rec in self.itervalues(): rec['dirty'] = True
-        self.create_new_accounts()
-        self.update_bwcap()
-
-    def compute_effective_rspecs(self):
-        """Apply loans to field 'rspec' to get field 'eff_rspec'."""
-        slivers = dict([(rec['name'], rec) for rec in self.itervalues() \
-                        if rec.get('account_type') == 'sliver.VServer'])
-
-        # Pass 1: copy 'rspec' to 'eff_rspec', saving the old value
-        for sliver in slivers.itervalues():
-            sliver['old_eff_rspec'] = sliver.get('eff_rspec')
-            sliver['eff_rspec'] = sliver['rspec'].copy()
-
-        # Pass 2: apply loans
-        for sliver in slivers.itervalues():
-            remaining_loanable_amount = sliver['rspec'].copy()
-            for other_name, resource, amount in sliver.get('loans', []):
-                if other_name in slivers and \
-                   0 < amount <= remaining_loanable_amount[resource]:
-                    sliver['eff_rspec'][resource] -= amount
-                    remaining_loanable_amount[resource] -= amount
-                    slivers[other_name]['eff_rspec'][resource] += amount
-
-        # Pass 3: mark changed rspecs dirty
-        for sliver in slivers.itervalues():
-            if sliver['eff_rspec'] != sliver['old_eff_rspec']:
-                sliver['needs_update'] = True
-            del sliver['old_eff_rspec']
-
-    def rebuild_account_index(self):
-        self.account_index.clear()
-        for rec in self.itervalues():
-            if 'account_type' in rec: self.account_index[rec['name']] = rec
-
-    def delete_stale_records(self, ts):
-        for key, rec in self.items():
-            if rec['timestamp'] < ts: del self[key]
-
-    def delete_expired_records(self):
-        for key, rec in self.items():
-            if rec.get('expires', sys.maxint) < time.time(): del self[key]
-
-    def destroy_old_accounts(self):
-        for name in accounts.all():
-            if name not in self.account_index: accounts.get(name).ensure_destroyed()
-
-    def create_new_accounts(self):
-        """Invoke the appropriate create() function for every dirty account."""
-        for rec in self.account_index.itervalues():
-            if rec['dirty'] and rec['plc_instantiated']: accounts.get(rec['name']).ensure_created(rec)
-            rec['dirty'] = False
-
-    def update_bwcap(self):
-        bwcap_rec = self.get('bwcap')
-        if bwcap_rec and bwcap_rec['dirty']:
-            bwcap.update(bwcap_rec)
-            bwcap_rec['dirty'] = False
+DB_FILE = '/var/lib/nodemanager/database.pickle'

 # database object and associated lock
-_db_lock = threading.RLock()
-_db = Database()
-# these are used in tandem to request a database dump from the dumper daemon
-_db_cond = threading.Condition(_db_lock)
-_dump_requested = False
+db_lock = threading.RLock()
+db = None
+# these are used in tandem to request a database dump from the dumper daemon
+db_cond = threading.Condition(db_lock)
+dump_requested = False

 # decorator that acquires and releases the database lock before and after the decorated operation
-def synchronized(function):
-    def sync_fun(*args, **kw_args):
-        _db_lock.acquire()
-        try: return function(*args, **kw_args)
-        finally: _db_lock.release()
-    sync_fun.__doc__ = function.__doc__
-    sync_fun.__name__ = function.__name__
-    return sync_fun
-
-
-# apply the given records to the database and request a dump
-@synchronized
-def deliver_records(recs):
-    global _dump_requested
-    _db.deliver_records(recs)
-    _dump_requested = True
-    _db_cond.notify()
+# XXX - replace with "with" statements once we switch to 2.5
+def synchronized(fn):
+    def sync_fn(*args, **kw_args):
+        db_lock.acquire()
+        try: return fn(*args, **kw_args)
+        finally: db_lock.release()
+    sync_fn.__doc__ = fn.__doc__
+    sync_fn.__name__ = fn.__name__
+    return sync_fn
+
+
+class Database(dict):
+    def __init__(self):
+        self._min_timestamp = 0
+
+    def _compute_effective_rspecs(self):
+        """Calculate the effects of loans and store the result in field _rspec.
+At the moment, we allow slivers to loan only those resources that they have received directly from PLC.
+In order to do the accounting, we store three different rspecs:
+ * field 'rspec', which is the resources given by PLC;
+ * field '_rspec', which is the actual amount of resources the sliver has after all loans;
+ * and variable resid_rspec, which is the amount of resources the sliver
+   has after giving out loans but not receiving any."""
+        slivers = {}
+        for name, rec in self.items():
+            if 'rspec' in rec:
+                rec['_rspec'] = rec['rspec'].copy()
+                slivers[name] = rec
+        for rec in slivers.values():
+            eff_rspec = rec['_rspec']
+            resid_rspec = rec['rspec'].copy()
+            for target, resource_name, amount in rec.get('_loans', []):
+                if target in slivers and amount <= resid_rspec[resource_name] - MINIMUM_ALLOCATION[resource_name]:
+                    eff_rspec[resource_name] -= amount
+                    resid_rspec[resource_name] -= amount
+                    slivers[target]['_rspec'][resource_name] += amount
+
+    def deliver_record(self, rec):
+        """A record is simply a dictionary with 'name' and 'timestamp'
+keys. We keep some persistent private data in the records under keys
+that start with '_'; thus record updates should not displace such
+keys."""
+        if rec['timestamp'] < self._min_timestamp: return
+        name = rec['name']
+        old_rec = self.get(name)
+        if old_rec == None:
+            self[name] = rec
+        elif rec['timestamp'] > old_rec['timestamp']:
+            for key in list(old_rec.keys()):
+                if not key.startswith('_'):
+                    del old_rec[key]
+            old_rec.update(rec)
+
+    def set_min_timestamp(self, ts):
+        """The ._min_timestamp member is the timestamp on the last comprehensive update.
+We use it to determine if a record is stale.
+This method should be called whenever new GetSlivers() data comes in."""
+        self._min_timestamp = ts
+        for name, rec in list(self.items()):
+            if rec['timestamp'] < ts: del self[name]
+
+    def sync(self):
+        """Synchronize reality with the database contents.  This
+method does a lot of things, and it's currently called after every
+single batch of database changes (a GetSlivers(), a loan, a record).
+It may be necessary in the future to do something smarter."""
+
+        # delete expired records
+        now = time.time()
+        for name, rec in list(self.items()):
+            if rec.get('expires', now) < now: del self[name]
+
+        self._compute_effective_rspecs()
+
+        try:
+            coresched = CoreSched()
+            coresched.adjustCores(self)
+        except:
+            logger.log_exc("database: exception while doing core sched")
+
+        # create and destroy accounts as needed
+        logger.verbose("database: sync : fetching accounts")
+        existing_acct_names = account.all()
+        for name in existing_acct_names:
+            if name not in self:
+                logger.verbose("database: sync : ensure_destroy'ing %s"%name)
+                account.get(name).ensure_destroyed()
+        for name, rec in self.items():
+            # protect this; if anything fails for a given sliver
+            # we still need the other ones to be handled
+            try:
+                sliver = account.get(name)
+                logger.verbose("database: sync : looping on %s (shell account class from pwd %s)" %(name, sliver._get_class()))
+                # Make sure we refresh accounts that are running
+                if rec['instantiation'] == 'plc-instantiated':
+                    logger.verbose ("database: sync : ensure_create'ing 'instantiation' sliver %s"%name)
+                    sliver.ensure_created(rec)
+                elif rec['instantiation'] == 'nm-controller':
+                    logger.verbose ("database: sync : ensure_create'ing 'nm-controller' sliver %s"%name)
+                    sliver.ensure_created(rec)
+                # Back door to ensure PLC overrides Ticket in delegation.
+                elif rec['instantiation'] == 'delegated' and sliver._get_class() != None:
+                    # if the ticket has been delivered and the nm-controller started the slice
+                    # update rspecs and keep them up to date.
+                    if sliver.is_running():
+                        logger.verbose ("database: sync : ensure_create'ing 'delegated' sliver %s"%name)
+                        sliver.ensure_created(rec)
+            except SystemExit as e:
+                sys.exit(e)
+            except:
+                logger.log_exc("database: sync failed to handle sliver", name=name)
+
+        # Wake up bwmon to update limits.
+        bwmon.lock.set()
+        global dump_requested
+        dump_requested = True
+        db_cond.notify()
+

 def start():
-    """The database dumper daemon. When it starts up, it populates the database with the last dumped database. It proceeds to handle dump requests forever."""
+    """The database dumper daemon.
+When it starts up, it populates the database with the last dumped database.
+It proceeds to handle dump requests forever."""
     def run():
-        global _dump_requested
-        _db_lock.acquire()
-        try: # load the db
-            f = open(DB_FILE)
-            _db.update(cPickle.load(f))
-            f.close()
-        except: logger.log_exc()
-        while True: # handle dump requests forever
-            while not _dump_requested: _db_cond.wait()
-            db_copy = tools.deepcopy(_db)
-            _dump_requested = False
-            _db_lock.release()
-            try: tools.write_file(DB_FILE, lambda f: cPickle.dump(db_copy, f, -1))
-            except: logger.log_exc()
-            _db_lock.acquire()
+        global dump_requested
+        while True:
+            db_lock.acquire()
+            while not dump_requested: db_cond.wait()
+            db_pickle = pickle.dumps(db, pickle.HIGHEST_PROTOCOL)
+            dump_requested = False
+            db_lock.release()
+            try:
+                tools.write_file(
+                    DB_FILE, lambda f: f.write(db_pickle), binary=True)
+                logger.log_database(db)
+            except:
+                logger.log_exc("database.start: failed to pickle/dump")
+    global db
+    try:
+        f = open(DB_FILE, "rb")  # the pickled dump is binary data
+        try: db = pickle.load(f)
+        finally: f.close()
+    except IOError:
+        logger.log ("database: Could not load %s -- starting from a fresh database"%DB_FILE)
+        db = Database()
+    except:
+        logger.log_exc("database: failed in start")
+        db = Database()
+    logger.log('database.start')
     tools.as_daemon_thread(run)
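
The loan accounting introduced above is compact, so here is a minimal, standalone sketch of the same bookkeeping outside the nodemanager tree. The sliver names, record contents and the apply_loans() helper are made up for illustration; only the key names ('rspec', '_rspec', '_loans') and the MINIMUM_ALLOCATION floor mirror Database._compute_effective_rspecs() above.

MINIMUM_ALLOCATION = {'cpu_pct': 0, 'cpu_share': 1,
                      'net_min_rate': 0, 'net_max_rate': 8,
                      'net_i2_min_rate': 0, 'net_i2_max_rate': 8,
                      'net_share': 1}

def apply_loans(db):
    # Pass 1: every sliver starts out with exactly what PLC granted it.
    slivers = {name: rec for name, rec in db.items() if 'rspec' in rec}
    for rec in slivers.values():
        rec['_rspec'] = rec['rspec'].copy()
    # Pass 2: move loaned amounts, refusing any loan that would push the
    # lender below its minimum allocation or that names an unknown sliver.
    for rec in slivers.values():
        resid = rec['rspec'].copy()   # what this sliver may still lend out
        for target, resource, amount in rec.get('_loans', []):
            if target in slivers and amount <= resid[resource] - MINIMUM_ALLOCATION[resource]:
                rec['_rspec'][resource] -= amount
                resid[resource] -= amount
                slivers[target]['_rspec'][resource] += amount

db = {'alice': {'rspec': {'cpu_share': 4, 'net_share': 1},
                '_loans': [('bob', 'cpu_share', 2)]},
      'bob':   {'rspec': {'cpu_share': 1, 'net_share': 1}}}
apply_loans(db)
print(db['alice']['_rspec'])   # {'cpu_share': 2, 'net_share': 1} -- lender keeps 4 - 2
print(db['bob']['_rspec'])     # {'cpu_share': 3, 'net_share': 1} -- borrower gets 1 + 2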
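
Similarly, a small sketch of the update rule in deliver_record(): keys coming from PLC (no leading underscore) are replaced wholesale when a newer record arrives, while locally maintained keys (leading underscore) survive the refresh. The field values below are invented, and merge_record() is a hypothetical stand-in for the branch that handles an already-known, older record.

def merge_record(old_rec, new_rec):
    # Ignore updates that are not strictly newer than what we already have.
    if new_rec['timestamp'] <= old_rec['timestamp']:
        return old_rec
    # Drop the old PLC-supplied keys, keep the '_' keys, then take the new data.
    for key in list(old_rec.keys()):
        if not key.startswith('_'):
            del old_rec[key]
    old_rec.update(new_rec)
    return old_rec

old = {'name': 'pl_slice', 'timestamp': 100,
       'instantiation': 'plc-instantiated',
       '_loans': [('other_slice', 'cpu_share', 1)]}
new = {'name': 'pl_slice', 'timestamp': 200,
       'instantiation': 'plc-instantiated'}
merged = merge_record(old, new)
print(merged['_loans'])        # local state preserved across the refresh
print(merged['timestamp'])     # 200 -- PLC-supplied data replaced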
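
Finally, the 'XXX - replace with "with" statements' comment marks that synchronized() only exists because the code predates the with statement. A sketch of the equivalent, assuming the same module-level db_lock; do_database_work() is a hypothetical caller, not a function from this module.

import threading

db_lock = threading.RLock()

def do_database_work():
    # Equivalent to decorating the function with @synchronized:
    # the lock is acquired on entry and released on every exit path.
    with db_lock:
        pass  # read or modify the shared database here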