X-Git-Url: http://git.onelab.eu/?p=nodemanager.git;a=blobdiff_plain;f=database.py;h=a7099bc7cd0cb1717837a63cb5efdd1830229008;hp=782922a88f4d673b0b6bf2f5f4e4e8e7be888b6d;hb=HEAD;hpb=114082fb689c7b39d66c1041df0256120321fca6 diff --git a/database.py b/database.py index 782922a..a7099bc 100644 --- a/database.py +++ b/database.py @@ -1,19 +1,47 @@ -import cPickle +# +"""The database houses information on slivers. This information +reaches the sliver manager in two different ways: one, through the +GetSlivers() call made periodically; two, by users delivering tickets. +The sync() method of the Database class turns this data into reality. + +The database format is a dictionary that maps account names to records +(also known as dictionaries). Inside each record, data supplied or +computed locally is stored under keys that begin with an underscore, +and data from PLC is stored under keys that don't. + +In order to maintain service when the node reboots during a network +partition, the database is constantly being dumped to disk. +""" + +import sys + +import pickle import threading import time -try: from bwlimit import bwmin, bwmax -except ImportError: bwmin, bwmax = 8, 1000000000 -import accounts +import account import logger import tools +import bwmon +# hopefully temporary +# is there a good reason to have this done here and not in a plugin ? +try: from coresched_lxc import CoreSched +except: from coresched_vs import CoreSched -DB_FILE = '/root/node_mgr_db.pickle' +# We enforce minimum allocations to keep the clueless from hosing their slivers. +# Disallow disk loans because there's currently no way to punish slivers over quota. +MINIMUM_ALLOCATION = {'cpu_pct': 0, + 'cpu_share': 1, + 'net_min_rate': 0, + 'net_max_rate': 8, + 'net_i2_min_rate': 0, + 'net_i2_max_rate': 8, + 'net_share': 1, + } +LOANABLE_RESOURCES = list(MINIMUM_ALLOCATION.keys()) -LOANABLE_RESOURCES = ['cpu_min', 'cpu_share', 'net_min', 'net_max', 'net2_min', 'net2_max', 'net_share', 'disk_max'] - -DEFAULT_ALLOCATIONS = {'enabled': 1, 'cpu_min': 0, 'cpu_share': 32, 'net_min': bwmin, 'net_max': bwmax, 'net2_min': bwmin, 'net2_max': bwmax, 'net_share': 1, 'disk_max': 5000000} +DB_FILE = '/var/lib/nodemanager/database.pickle' # database object and associated lock @@ -25,6 +53,7 @@ db_cond = threading.Condition(db_lock) dump_requested = False # decorator that acquires and releases the database lock before and after the decorated operation +# XXX - replace with "with" statements once we switch to 2.5 def synchronized(fn): def sync_fn(*args, **kw_args): db_lock.acquire() @@ -40,105 +69,137 @@ class Database(dict): self._min_timestamp = 0 def _compute_effective_rspecs(self): - """Calculate the effects of loans and store the result in field _rspec. At the moment, we allow slivers to loan only those resources that they have received directly from PLC. In order to do the accounting, we store three different rspecs: field 'rspec', which is the resources given by PLC; field '_rspec', which is the actual amount of resources the sliver has after all loans; and variable resid_rspec, which is the amount of resources the sliver has after giving out loans but not receiving any.""" + """Calculate the effects of loans and store the result in field _rspec. +At the moment, we allow slivers to loan only those resources that they have received directly from PLC. +In order to do the accounting, we store three different rspecs: + * field 'rspec', which is the resources given by PLC; + * field '_rspec', which is the actual amount of resources the sliver has after all loans; + * and variable resid_rspec, which is the amount of resources the sliver + has after giving out loans but not receiving any.""" slivers = {} - for name, rec in self.iteritems(): + for name, rec in self.items(): if 'rspec' in rec: rec['_rspec'] = rec['rspec'].copy() slivers[name] = rec - for rec in slivers.itervalues(): + for rec in slivers.values(): eff_rspec = rec['_rspec'] resid_rspec = rec['rspec'].copy() - for target, resname, amt in rec.get('_loans', []): - if target in slivers and amt < resid_rspec[resname]: - eff_rspec[resname] -= amt - resid_rspec[resname] -= amt - slivers[target]['_rspec'][resname] += amt + for target, resource_name, amount in rec.get('_loans', []): + if target in slivers and amount <= resid_rspec[resource_name] - MINIMUM_ALLOCATION[resource_name]: + eff_rspec[resource_name] -= amount + resid_rspec[resource_name] -= amount + slivers[target]['_rspec'][resource_name] += amount def deliver_record(self, rec): - """A record is simply a dictionary with 'name' and 'timestamp' keys. We keep some persistent private data in the records under keys that start with '_'; thus record updates should not displace such keys.""" + """A record is simply a dictionary with 'name' and 'timestamp' +keys. We keep some persistent private data in the records under keys +that start with '_'; thus record updates should not displace such +keys.""" + if rec['timestamp'] < self._min_timestamp: return name = rec['name'] old_rec = self.get(name) - if old_rec != None and rec['timestamp'] > old_rec['timestamp']: - for key in old_rec.keys(): - if not key.startswith('_'): del old_rec[key] + if old_rec == None: + self[name] = rec + elif rec['timestamp'] > old_rec['timestamp']: + for key in list(old_rec.keys()): + if not key.startswith('_'): + del old_rec[key] old_rec.update(rec) - elif rec['timestamp'] >= self._min_timestamp: self[name] = rec def set_min_timestamp(self, ts): + """The ._min_timestamp member is the timestamp on the last comprehensive update. +We use it to determine if a record is stale. +This method should be called whenever new GetSlivers() data comes in.""" self._min_timestamp = ts - for name, rec in self.items(): + for name, rec in list(self.items()): if rec['timestamp'] < ts: del self[name] def sync(self): + """Synchronize reality with the database contents. This +method does a lot of things, and it's currently called after every +single batch of database changes (a GetSlivers(), a loan, a record). +It may be necessary in the future to do something smarter.""" + # delete expired records now = time.time() - for name, rec in self.items(): + for name, rec in list(self.items()): if rec.get('expires', now) < now: del self[name] self._compute_effective_rspecs() + try: + coresched = CoreSched() + coresched.adjustCores(self) + except: + logger.log_exc("database: exception while doing core sched") + # create and destroy accounts as needed - existing_acct_names = accounts.all() + logger.verbose("database: sync : fetching accounts") + existing_acct_names = account.all() for name in existing_acct_names: - if name not in self: accounts.get(name).ensure_destroyed() - for name, rec in self.iteritems(): - if rec['instantiation'] == 'plc-instantiated': accounts.get(name).ensure_created(rec) - - # request a database dump + if name not in self: + logger.verbose("database: sync : ensure_destroy'ing %s"%name) + account.get(name).ensure_destroyed() + for name, rec in self.items(): + # protect this; if anything fails for a given sliver + # we still need the other ones to be handled + try: + sliver = account.get(name) + logger.verbose("database: sync : looping on %s (shell account class from pwd %s)" %(name, sliver._get_class())) + # Make sure we refresh accounts that are running + if rec['instantiation'] == 'plc-instantiated': + logger.verbose ("database: sync : ensure_create'ing 'instantiation' sliver %s"%name) + sliver.ensure_created(rec) + elif rec['instantiation'] == 'nm-controller': + logger.verbose ("database: sync : ensure_create'ing 'nm-controller' sliver %s"%name) + sliver.ensure_created(rec) + # Back door to ensure PLC overrides Ticket in delegation. + elif rec['instantiation'] == 'delegated' and sliver._get_class() != None: + # if the ticket has been delivered and the nm-controller started the slice + # update rspecs and keep them up to date. + if sliver.is_running(): + logger.verbose ("database: sync : ensure_create'ing 'delegated' sliver %s"%name) + sliver.ensure_created(rec) + except SystemExit as e: + sys.exit(e) + except: + logger.log_exc("database: sync failed to handle sliver", name=name) + + # Wake up bwmom to update limits. + bwmon.lock.set() global dump_requested dump_requested = True db_cond.notify() def start(): - """The database dumper daemon. When it starts up, it populates the database with the last dumped database. It proceeds to handle dump requests forever.""" + """The database dumper daemon. +When it starts up, it populates the database with the last dumped database. +It proceeds to handle dump requests forever.""" def run(): global dump_requested while True: db_lock.acquire() while not dump_requested: db_cond.wait() - db_copy = tools.deepcopy(db) + db_pickle = pickle.dumps(db, pickle.HIGHEST_PROTOCOL) dump_requested = False db_lock.release() - try: tools.write_file(DB_FILE, lambda f: cPickle.dump(db_copy, f, -1)) - except: logger.log_exc() + try: + tools.write_file( + DB_FILE, lambda f: f.write(db_pickle), binary=True) + logger.log_database(db) + except: + logger.log_exc("database.start: failed to pickle/dump") global db try: f = open(DB_FILE) - try: db = cPickle.load(f) + try: db = pickle.load(f) finally: f.close() + except IOError: + logger.log ("database: Could not load %s -- starting from a fresh database"%DB_FILE) + db = Database() except: - logger.log_exc() + logger.log_exc("database: failed in start") db = Database() + logger.log('database.start') tools.as_daemon_thread(run) - -@synchronized -def GetSlivers_callback(data): - for d in data: - for sliver in d['slivers']: - rec = sliver.copy() - rec.setdefault('timestamp', d['timestamp']) - rec.setdefault('type', 'sliver.VServer') - - # convert attributes field to a proper dict - attr_dict = {} - for attr in rec.pop('attributes'): attr_dict[attr['name']] = attr['value'] - - # squash keys - keys = rec.pop('keys') - rec.setdefault('keys', '\n'.join([key_struct['key'] for key_struct in keys]) - - rec.setdefault('initscript', attr_dict.get('initscript')) - rec.setdefault('delegations', []) # XXX - delegation not yet supported - - # extract the implied rspec - rspec = {} - rec['rspec'] = rspec - for resname, default_amt in DEFAULT_ALLOCATIONS.iteritems(): - try: amt = int(attr_dict[resname]) - except (KeyError, ValueError): amt = default_amt - rspec[resname] = amt - db.deliver_record(rec) - db.set_min_timestamp(d['timestamp']) - db.sync()