Take the new doc out of the branch and into trunk
[nodemanager.git] / database.py
index bc1155e..46a4e47 100644
+"""The database houses information on slivers.  This information
+reaches the sliver manager in two different ways: one, through the
+GetSlivers() call made periodically; two, by users delivering tickets.
+The sync() method of the Database class turns this data into reality.
+
+The database format is a dictionary that maps account names to records
+(also known as dictionaries).  Inside each record, data supplied or
+computed locally is stored under keys that begin with an underscore,
+and data from PLC is stored under keys that don't.
+
+In order to maintain service when the node reboots during a network
+partition, the database is constantly being dumped to disk.
+"""
+
 import cPickle
-import sys
 import threading
 import time
 
-from config import DB_FILE
 import accounts
-import bwcap
 import logger
 import tools
+import bwmon
+
+# We enforce minimum allocations to keep the clueless from hosing their slivers.
+# Disallow disk loans because there's currently no way to punish slivers over quota.
+MINIMUM_ALLOCATION = {'cpu_min': 0,
+                      'cpu_share': 32,
+                      'net_min_rate': 0,
+                      'net_max_rate': 8,
+                      'net_i2_min_rate': 0,
+                      'net_i2_max_rate': 8,
+                      'net_share': 1}
+LOANABLE_RESOURCES = MINIMUM_ALLOCATION.keys()
+
+DB_FILE = '/root/sliver_mgr_db.pickle'
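As a minimal sketch of what enforcing these minimums means, a hypothetical helper (not part of this module) could clamp an rspec so that no resource falls below its floor:

    # Hypothetical helper, for illustration: raise any below-minimum
    # resource in an rspec up to its enforced floor.
    def clamp_to_minimum(rspec):
        clamped = rspec.copy()
        for resname, minimum in MINIMUM_ALLOCATION.iteritems():
            if clamped.get(resname, minimum) < minimum:
                clamped[resname] = minimum
        return clamped

    # clamp_to_minimum({'cpu_share': 0, 'net_share': 5})
    #   -> {'cpu_share': 32, 'net_share': 5}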
 
 
-_db_lock = threading.RLock()
-_db_cond = threading.Condition(_db_lock)
-_dump_requested = False
+# database object and associated lock
+db_lock = threading.RLock()
+db = None
 
+# these are used in tandem to request a database dump from the dumper daemon
+db_cond = threading.Condition(db_lock)
+dump_requested = False
 
-def synchronized(function):
-    def sync_fun(*args, **kw_args):
-        _db_lock.acquire()
-        try: return function(*args, **kw_args)
-        finally: _db_lock.release()
-    sync_fun.__doc__ = function.__doc__
-    sync_fun.__name__ = function.__name__
-    return sync_fun
+# decorator that acquires and releases the database lock before and after the decorated operation
+# XXX - replace with "with" statements once we switch to 2.5
+def synchronized(fn):
+    def sync_fn(*args, **kw_args):
+        db_lock.acquire()
+        try: return fn(*args, **kw_args)
+        finally: db_lock.release()
+    sync_fn.__doc__ = fn.__doc__
+    sync_fn.__name__ = fn.__name__
+    return sync_fn
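Usage is the standard decorator pattern. For instance, a module-level accessor (illustrative only; the removed get_sliver() did something similar) would hold db_lock for the duration of the lookup:

    @synchronized
    def get_sliver(name):
        # runs entirely under db_lock
        return db.get(name)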
 
 
 class Database(dict):
-    def deliver_records(self, recs):
-        ts = self.get_timestamp()
-        for rec in recs:
-            old_rec = self.setdefault(rec['record_key'], {})
-            if rec['timestamp'] >= max(ts, old_rec.get('timestamp', 0)):
-                old_rec.update(rec, dirty=True)
-        self.compute_effective_rspecs()
-        if self.get_timestamp() > ts:
-            self.delete_old_records()
-            self.delete_old_accounts()
-            for rec in self.itervalues(): rec['dirty'] = True
-        self.create_new_accounts()
-        self.update_bwcap()
-
-    def get_timestamp(self):
-        return self.get('timestamp', {'timestamp': 0})['timestamp']
-
-
-    def compute_effective_rspecs(self):
-        """Apply loans to field 'rspec' to get field 'eff_rspec'."""
-        slivers = dict([(rec['name'], rec) for rec in self.itervalues() \
-                        if rec.get('account_type') == 'sliver'])
-
-        # Pass 1: copy 'rspec' to 'eff_rspec', saving the old value
-        for sliver in slivers.itervalues():
-            sliver['old_eff_rspec'] = sliver.get('eff_rspec')
-            sliver['eff_rspec'] = sliver['rspec'].copy()
-
-        # Pass 2: apply loans
-        for sliver in slivers.itervalues():
-            remaining_loanable_amount = sliver['rspec'].copy()
-            for other_name, resource, amount in sliver.get('loans', []):
-                if other_name in slivers and \
-                       0 < amount <= remaining_loanable_amount[resource]:
-                    sliver['eff_rspec'][resource] -= amount
-                    remaining_loanable_amount[resource] -= amount
-                    slivers[other_name]['eff_rspec'][resource] += amount
-
-        # Pass 3: mark changed rspecs dirty
-        for sliver in slivers.itervalues():
-            if sliver['eff_rspec'] != sliver['old_eff_rspec']:
-                sliver['needs_update'] = True
-            del sliver['old_eff_rspec']
-
-
-    def delete_old_records(self):
-        ts = self.get_timestamp()
+    def __init__(self):
+        self._min_timestamp = 0
+
+    def _compute_effective_rspecs(self):
+        """Calculate the effects of loans and store the result in field _rspec.  At the moment, we allow slivers to loan only those resources that they have received directly from PLC.  In order to do the accounting, we store three different rspecs: field 'rspec', which is the resources given by PLC; field '_rspec', which is the actual amount of resources the sliver has after all loans; and variable resid_rspec, which is the amount of resources the sliver has after giving out loans but not receiving any."""
+        slivers = {}
+        for name, rec in self.iteritems():
+            if 'rspec' in rec:
+                rec['_rspec'] = rec['rspec'].copy()
+                slivers[name] = rec
+        for rec in slivers.itervalues():
+            eff_rspec = rec['_rspec']
+            resid_rspec = rec['rspec'].copy()
+            for target, resname, amt in rec.get('_loans', []):
+                if target in slivers and resname in LOANABLE_RESOURCES and \
+                       0 < amt <= resid_rspec[resname] - MINIMUM_ALLOCATION[resname]:
+                    eff_rspec[resname] -= amt
+                    resid_rspec[resname] -= amt
+                    slivers[target]['_rspec'][resname] += amt
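A worked example may help; suppose (with invented names and numbers) one sliver loans 16 units of cpu_share to another:

    # Hypothetical example: 'lender' has cpu_share 64 and loans 16 of it
    # to 'borrower'.  The loan passes the check above because
    # 64 - MINIMUM_ALLOCATION['cpu_share'] (32) leaves room for 16.
    db_example = Database()
    db_example.update({
        'lender':   {'name': 'lender', 'timestamp': 0,
                     'rspec': {'cpu_share': 64},
                     '_loans': [('borrower', 'cpu_share', 16)]},
        'borrower': {'name': 'borrower', 'timestamp': 0,
                     'rspec': {'cpu_share': 32}},
    })
    db_example._compute_effective_rspecs()
    # lender:   rspec 64 -> _rspec 48 (gave 16 away)
    # borrower: rspec 32 -> _rspec 48 (received 16)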
+
+    def deliver_record(self, rec):
+        """A record is simply a dictionary with 'name' and 'timestamp' keys.  We keep some persistent private data in the records under keys that start with '_'; thus record updates should not displace such keys."""
+        if rec['timestamp'] < self._min_timestamp: return
+        name = rec['name']
+        old_rec = self.get(name)
+        if old_rec is None: self[name] = rec
+        elif rec['timestamp'] > old_rec['timestamp']:
+            for key in old_rec.keys():
+                if not key.startswith('_'): del old_rec[key]
+            old_rec.update(rec)
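For example (names and values invented), delivering a newer record replaces the PLC fields but leaves the private data intact:

    db_example = Database()
    db_example['alice'] = {'name': 'alice', 'timestamp': 10,
                           'rspec': {'cpu_share': 32},
                           '_loans': [('bob', 'net_share', 1)]}
    db_example.deliver_record({'name': 'alice', 'timestamp': 20,
                               'rspec': {'cpu_share': 64}})
    # db_example['alice'] now carries timestamp 20 and the new rspec,
    # but '_loans' survives the update untouched.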
+
+    def set_min_timestamp(self, ts):
+        """The ._min_timestamp member is the timestamp on the last comprehensive update.  We use it to determine if a record is stale.  This method should be called whenever new GetSlivers() data comes in."""
+        self._min_timestamp = ts
+        for name, rec in self.items():
+            if rec['timestamp'] < ts: del self[name]
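Continuing the invented example above, a comprehensive update at time 30 would both delete 'alice' (timestamp 20) and cause any later delivery of older records to be ignored:

    db_example.set_min_timestamp(15)   # 'alice' (timestamp 20) survives
    db_example.set_min_timestamp(30)   # 'alice' is deleted; records with
                                       # timestamp < 30 are refused from now on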
+
+    def sync(self):
+        """Synchronize reality with the database contents.  This method does a lot of things, and it's currently called after every single batch of database changes (a GetSlivers(), a loan, a record).  It may be necessary in the future to do something smarter."""
+
+        # delete expired records
         now = time.time()
-        for key in self.keys():
-            rec = self[key]
-            if rec['timestamp'] < ts or rec.get('expiry', sys.maxint) < now:
-                del self[key]
-
-    def delete_old_accounts(self):
-        for acct_type, name in accounts.all():
-            if ('%s_%s' % (acct_type, name)) not in self:
-                accounts.get(name).ensure_destroyed()
-
-    def create_new_accounts(self):
-        """Invoke the appropriate create() function for every dirty account."""
-        for rec in self.itervalues():
-            if 'account_type' not in rec: continue
-            if rec['dirty'] and rec['plc_instantiated']:
-                accounts.get(rec['name']).ensure_created(rec)
-            rec['dirty'] = False
-
-    def update_bwcap(self):
-        bwcap_rec = self.get('bwcap')
-        if bwcap_rec and bwcap_rec['dirty']:
-            bwcap.update(bwcap_rec)
-            bwcap_rec['dirty'] = False
-
-
-_db = Database()
-
-@synchronized
-def deliver_records(recs):
-    global _dump_requested
-    _db.deliver_records(recs)
-    _dump_requested = True
-    _db_cond.notify()
-
-@synchronized
-def get_sliver(name): return _db.get('sliver_'+name)
+        for name, rec in self.items():
+            if rec.get('expires', now) < now: del self[name]
+
+        self._compute_effective_rspecs()
+
+        # create and destroy accounts as needed
+        existing_acct_names = accounts.all()
+        for name in existing_acct_names:
+            if name not in self: accounts.get(name).ensure_destroyed()
+        for name, rec in self.iteritems():
+            if rec['instantiation'] in ('plc-instantiated', 'nm-controller'):
+                accounts.get(name).ensure_created(rec)
+
+        # wake up bwmon to update limits
+        bwmon.lock.set()
+        global dump_requested
+        dump_requested = True
+        db_cond.notify()
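Callers must hold db_lock across the whole batch of changes and the sync (sync() itself calls db_cond.notify(), which requires the lock). A sketch of how a GetSlivers() handler might drive this, assuming the data arrives as a dict with a 'timestamp' and a list of records (the function name and layout are assumptions, not part of this file):

    @synchronized
    def deliver_batch(data):
        # hypothetical handler: apply one batch of records, then sync
        db.set_min_timestamp(data['timestamp'])
        for rec in data['records']:
            db.deliver_record(rec)
        db.sync()   # also schedules a dump via dump_requested/db_cond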
+
 
 def start():
+    """The database dumper daemon.  When it starts up, it populates the database with the last dumped database.  It proceeds to handle dump requests forever."""
     def run():
-        global _dump_requested
-        _db_lock.acquire()
-        try:  # load the db
-            f = open(DB_FILE)
-            _db.update(cPickle.load(f))
-            f.close()
-        except: logger.log_exc()
-        while True:  # handle dump requests forever
-            while not _dump_requested:
-                _db_cond.wait()
-            db_copy = tools.deepcopy(_db)
-            _dump_requested = False
-            _db_lock.release()
-            try: tools.write_file(DB_FILE,
-                                  lambda f: cPickle.dump(db_copy, f, -1))
+        global dump_requested
+        while True:
+            db_lock.acquire()
+            while not dump_requested: db_cond.wait()
+            db_pickle = cPickle.dumps(db, cPickle.HIGHEST_PROTOCOL)
+            dump_requested = False
+            db_lock.release()
+            try: tools.write_file(DB_FILE, lambda f: f.write(db_pickle))
             except: logger.log_exc()
-            _db_lock.acquire()
+    global db
+    try:
+        f = open(DB_FILE, 'rb')
+        try: db = cPickle.load(f)
+        finally: f.close()
+    except:
+        logger.log_exc()
+        db = Database()
     tools.as_daemon_thread(run)
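At boot, then, a single start() call restores the last dumped state and leaves the dumper thread waiting; a caller that wants to reconcile accounts immediately afterwards must take the lock first, roughly:

    import database

    database.start()              # load DB_FILE (or start empty), spawn dumper
    database.db_lock.acquire()
    try:
        database.db.sync()        # recreate accounts from the restored records
    finally:
        database.db_lock.release()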