-import cPickle
+#
+"""The database houses information on slivers. This information
+reaches the sliver manager in two different ways: one, through the
+GetSlivers() call made periodically; two, by users delivering tickets.
+The sync() method of the Database class turns this data into reality.
+
+The database format is a dictionary that maps account names to records
+(also known as dictionaries). Inside each record, data supplied or
+computed locally is stored under keys that begin with an underscore,
+and data from PLC is stored under keys that don't.
+
+In order to maintain service when the node reboots during a network
+partition, the database is constantly being dumped to disk.
+"""
+
import sys
+
+import pickle
import threading
import time
-import accounts
-import bwcap
+import account
import logger
import tools
+import bwmon
+# hopefully temporary
+# is there a good reason to have this done here and not in a plugin ?
+try: from coresched_lxc import CoreSched
+except: from coresched_vs import CoreSched
-DB_FILE = '/root/pl_node_mgr_db.pickle'
-
+# We enforce minimum allocations to keep the clueless from hosing their slivers.
+# Disallow disk loans because there's currently no way to punish slivers over quota.
+MINIMUM_ALLOCATION = {'cpu_pct': 0,
+ 'cpu_share': 1,
+ 'net_min_rate': 0,
+ 'net_max_rate': 8,
+ 'net_i2_min_rate': 0,
+ 'net_i2_max_rate': 8,
+ 'net_share': 1,
+ }
+LOANABLE_RESOURCES = list(MINIMUM_ALLOCATION.keys())
-class Database(dict):
- def __init__(self): self.account_index = {}
-
- def deliver_records(self, recs):
- ts = self.get_timestamp()
- for rec in recs:
- old_rec = self.setdefault(rec['record_key'], {})
- if rec['timestamp'] >= max(ts, old_rec.get('timestamp', 0)):
- old_rec.update(rec, dirty=True)
- self.compute_effective_rspecs()
- if self.get_timestamp() > ts:
- self.delete_old_records()
- self.delete_old_accounts()
- for rec in self.itervalues(): rec['dirty'] = True
- self.create_new_accounts()
- self.update_bwcap()
-
- def compute_effective_rspecs(self):
- """Apply loans to field 'rspec' to get field 'eff_rspec'."""
- slivers = dict([(rec['name'], rec) for rec in self.itervalues() \
- if rec.get('account_type') == 'sliver.VServer'])
-
- # Pass 1: copy 'rspec' to 'eff_rspec', saving the old value
- for sliver in slivers.itervalues():
- sliver['old_eff_rspec'] = sliver.get('eff_rspec')
- sliver['eff_rspec'] = sliver['rspec'].copy()
-
- # Pass 2: apply loans
- for sliver in slivers.itervalues():
- remaining_loanable_amount = sliver['rspec'].copy()
- for other_name, resource, amount in sliver.get('loans', []):
- if other_name in slivers and \
- 0 < amount <= remaining_loanable_amount[resource]:
- sliver['eff_rspec'][resource] -= amount
- remaining_loanable_amount[resource] -= amount
- slivers[other_name]['eff_rspec'][resource] += amount
-
- # Pass 3: mark changed rspecs dirty
- for sliver in slivers.itervalues():
- if sliver['eff_rspec'] != sliver['old_eff_rspec']:
- sliver['needs_update'] = True
- del sliver['old_eff_rspec']
-
- def rebuild_account_index(self):
- self.account_index.clear()
- for rec in self.itervalues():
- if 'account_type' in rec: self.account_index[rec['name']] = rec
-
- def delete_stale_records(self, ts):
- for key, rec in self.items():
- if rec['timestamp'] < ts: del self[key]
-
- def delete_expired_records(self):
- for key, rec in self.items():
- if rec.get('expires', sys.maxint) < time.time(): del self[key]
-
- def destroy_old_accounts(self):
- for name in accounts.all():
- if name not in self.account_index: accounts.get(name).ensure_destroyed()
-
- def create_new_accounts(self):
- """Invoke the appropriate create() function for every dirty account."""
- for rec in self.account_index.itervalues():
- if rec['dirty'] and rec['plc_instantiated']: accounts.get(rec['name']).ensure_created(rec)
- rec['dirty'] = False
-
- def update_bwcap(self):
- bwcap_rec = self.get('bwcap')
- if bwcap_rec and bwcap_rec['dirty']:
- bwcap.update(bwcap_rec)
- bwcap_rec['dirty'] = False
+DB_FILE = '/var/lib/nodemanager/database.pickle'
# database object and associated lock
-_db_lock = threading.RLock()
-_db = Database()
-# these are used in tandem to request a database dump from the dumper daemon
-_db_cond = threading.Condition(_db_lock)
-_dump_requested = False
+db_lock = threading.RLock()
+db = None
+# these are used in tandem to request a database dump from the dumper daemon
+db_cond = threading.Condition(db_lock)
+dump_requested = False
# decorator that acquires and releases the database lock before and after the decorated operation
-def synchronized(function):
- def sync_fun(*args, **kw_args):
- _db_lock.acquire()
- try: return function(*args, **kw_args)
- finally: _db_lock.release()
- sync_fun.__doc__ = function.__doc__
- sync_fun.__name__ = function.__name__
- return sync_fun
-
-
-# apply the given records to the database and request a dump
-@synchronized
-def deliver_records(recs):
- global _dump_requested
- _db.deliver_records(recs)
- _dump_requested = True
- _db_cond.notify()
+# XXX - replace with "with" statements once we switch to 2.5
+def synchronized(fn):
+ def sync_fn(*args, **kw_args):
+ db_lock.acquire()
+ try: return fn(*args, **kw_args)
+ finally: db_lock.release()
+ sync_fn.__doc__ = fn.__doc__
+ sync_fn.__name__ = fn.__name__
+ return sync_fn
+
+
+class Database(dict):
+ def __init__(self):
+ self._min_timestamp = 0
+
+ def _compute_effective_rspecs(self):
+ """Calculate the effects of loans and store the result in field _rspec.
+At the moment, we allow slivers to loan only those resources that they have received directly from PLC.
+In order to do the accounting, we store three different rspecs:
+ * field 'rspec', which is the resources given by PLC;
+ * field '_rspec', which is the actual amount of resources the sliver has after all loans;
+ * and variable resid_rspec, which is the amount of resources the sliver
+ has after giving out loans but not receiving any."""
+ slivers = {}
+ for name, rec in self.items():
+ if 'rspec' in rec:
+ rec['_rspec'] = rec['rspec'].copy()
+ slivers[name] = rec
+ for rec in slivers.values():
+ eff_rspec = rec['_rspec']
+ resid_rspec = rec['rspec'].copy()
+ for target, resource_name, amount in rec.get('_loans', []):
+ if target in slivers and amount <= resid_rspec[resource_name] - MINIMUM_ALLOCATION[resource_name]:
+ eff_rspec[resource_name] -= amount
+ resid_rspec[resource_name] -= amount
+ slivers[target]['_rspec'][resource_name] += amount
+
+ def deliver_record(self, rec):
+ """A record is simply a dictionary with 'name' and 'timestamp'
+keys. We keep some persistent private data in the records under keys
+that start with '_'; thus record updates should not displace such
+keys."""
+ if rec['timestamp'] < self._min_timestamp: return
+ name = rec['name']
+ old_rec = self.get(name)
+ if old_rec == None: self[name] = rec
+ elif rec['timestamp'] > old_rec['timestamp']:
+ for key in list(old_rec.keys()):
+ if not key.startswith('_'): del old_rec[key]
+ old_rec.update(rec)
+
+ def set_min_timestamp(self, ts):
+ """The ._min_timestamp member is the timestamp on the last comprehensive update.
+We use it to determine if a record is stale.
+This method should be called whenever new GetSlivers() data comes in."""
+ self._min_timestamp = ts
+ for name, rec in list(self.items()):
+ if rec['timestamp'] < ts: del self[name]
+
+ def sync(self):
+ """Synchronize reality with the database contents. This
+method does a lot of things, and it's currently called after every
+single batch of database changes (a GetSlivers(), a loan, a record).
+It may be necessary in the future to do something smarter."""
+
+ # delete expired records
+ now = time.time()
+ for name, rec in list(self.items()):
+ if rec.get('expires', now) < now: del self[name]
+
+ self._compute_effective_rspecs()
+
+ try:
+ coresched = CoreSched()
+ coresched.adjustCores(self)
+ except:
+ logger.log_exc("database: exception while doing core sched")
+
+ # create and destroy accounts as needed
+ logger.verbose("database: sync : fetching accounts")
+ existing_acct_names = account.all()
+ for name in existing_acct_names:
+ if name not in self:
+ logger.verbose("database: sync : ensure_destroy'ing %s"%name)
+ account.get(name).ensure_destroyed()
+ for name, rec in self.items():
+ # protect this; if anything fails for a given sliver
+ # we still need the other ones to be handled
+ try:
+ sliver = account.get(name)
+ logger.verbose("database: sync : looping on %s (shell account class from pwd %s)" %(name, sliver._get_class()))
+ # Make sure we refresh accounts that are running
+ if rec['instantiation'] == 'plc-instantiated':
+ logger.verbose ("database: sync : ensure_create'ing 'instantiation' sliver %s"%name)
+ sliver.ensure_created(rec)
+ elif rec['instantiation'] == 'nm-controller':
+ logger.verbose ("database: sync : ensure_create'ing 'nm-controller' sliver %s"%name)
+ sliver.ensure_created(rec)
+ # Back door to ensure PLC overrides Ticket in delegation.
+ elif rec['instantiation'] == 'delegated' and sliver._get_class() != None:
+ # if the ticket has been delivered and the nm-controller started the slice
+ # update rspecs and keep them up to date.
+ if sliver.is_running():
+ logger.verbose ("database: sync : ensure_create'ing 'delegated' sliver %s"%name)
+ sliver.ensure_created(rec)
+ except SystemExit as e:
+ sys.exit(e)
+ except:
+ logger.log_exc("database: sync failed to handle sliver", name=name)
+
+        # Wake up bwmon to update limits.
+ bwmon.lock.set()
+ global dump_requested
+ dump_requested = True
+ db_cond.notify()
+
def start():
- """The database dumper daemon. When it starts up, it populates the database with the last dumped database. It proceeds to handle dump requests forever."""
+ """The database dumper daemon.
+When it starts up, it populates the database with the last dumped database.
+It proceeds to handle dump requests forever."""
def run():
- global _dump_requested
- _db_lock.acquire()
- try: # load the db
- f = open(DB_FILE)
- _db.update(cPickle.load(f))
- f.close()
- except: logger.log_exc()
- while True: # handle dump requests forever
- while not _dump_requested: _db_cond.wait()
- db_copy = tools.deepcopy(_db)
- _dump_requested = False
- _db_lock.release()
- try: tools.write_file(DB_FILE, lambda f: cPickle.dump(db_copy, f, -1))
- except: logger.log_exc()
- _db_lock.acquire()
+ global dump_requested
+ while True:
+ db_lock.acquire()
+ while not dump_requested: db_cond.wait()
+ db_pickle = pickle.dumps(db, pickle.HIGHEST_PROTOCOL)
+ dump_requested = False
+ db_lock.release()
+ try:
+ tools.write_file(DB_FILE, lambda f: f.write(db_pickle))
+ logger.log_database(db)
+ except:
+ logger.log_exc("database.start: failed to pickle/dump")
+ global db
+ try:
+ f = open(DB_FILE)
+ try: db = pickle.load(f)
+ finally: f.close()
+ except IOError:
+ logger.log ("database: Could not load %s -- starting from a fresh database"%DB_FILE)
+ db = Database()
+ except:
+ logger.log_exc("database: failed in start")
+ db = Database()
+ logger.log('database.start')
tools.as_daemon_thread(run)