Added ReCreate. Also added try catch to api eval of rpc method.
[nodemanager.git] / database.py
index 782922a..46a4e47 100644 (file)
@@ -1,19 +1,32 @@
+"""The database houses information on slivers.  This information
+reaches the sliver manager in two different ways: one, through the
+GetSlivers() call made periodically; two, by users delivering tickets.
+The sync() method of the Database class turns this data into reality.
+
+The database format is a dictionary that maps account names to records
+(also known as dictionaries).  Inside each record, data supplied or
+computed locally is stored under keys that begin with an underscore,
+and data from PLC is stored under keys that don't.
+
+In order to maintain service when the node reboots during a network
+partition, the database is constantly being dumped to disk.
+"""
+
 import cPickle
 import threading
 import time
 
-try: from bwlimit import bwmin, bwmax
-except ImportError: bwmin, bwmax = 8, 1000000000
 import accounts
 import logger
 import tools
+import bwmon
 
+# We enforce minimum allocations to keep the clueless from hosing their slivers.
+# Disallow disk loans because there's currently no way to punish slivers over quota.
+MINIMUM_ALLOCATION = {'cpu_min': 0, 'cpu_share': 32, 'net_min_rate': 0, 'net_max_rate': 8, 'net_i2_min_rate': 0, 'net_i2_max_rate': 8, 'net_share': 1}
+LOANABLE_RESOURCES = MINIMUM_ALLOCATION.keys()
 
-DB_FILE = '/root/node_mgr_db.pickle'
-
-LOANABLE_RESOURCES = ['cpu_min', 'cpu_share', 'net_min', 'net_max', 'net2_min', 'net2_max', 'net_share', 'disk_max']
-
-DEFAULT_ALLOCATIONS = {'enabled': 1, 'cpu_min': 0, 'cpu_share': 32, 'net_min': bwmin, 'net_max': bwmax, 'net2_min': bwmin, 'net2_max': bwmax, 'net_share': 1, 'disk_max': 5000000}
+DB_FILE = '/root/sliver_mgr_db.pickle'
 
 
 # database object and associated lock
@@ -25,6 +38,7 @@ db_cond = threading.Condition(db_lock)
 dump_requested = False
 
 # decorator that acquires and releases the database lock before and after the decorated operation
+# XXX - replace with "with" statements once we switch to 2.5
 def synchronized(fn):
     def sync_fn(*args, **kw_args):
         db_lock.acquire()
@@ -50,27 +64,31 @@ class Database(dict):
             eff_rspec = rec['_rspec']
             resid_rspec = rec['rspec'].copy()
             for target, resname, amt in rec.get('_loans', []):
-                if target in slivers and amt < resid_rspec[resname]:
+                if target in slivers and amt <= resid_rspec[resname] - MINIMUM_ALLOCATION[resname]:
                     eff_rspec[resname] -= amt
                     resid_rspec[resname] -= amt
                     slivers[target]['_rspec'][resname] += amt
 
     def deliver_record(self, rec):
         """A record is simply a dictionary with 'name' and 'timestamp' keys.  We keep some persistent private data in the records under keys that start with '_'; thus record updates should not displace such keys."""
+        if rec['timestamp'] < self._min_timestamp: return
         name = rec['name']
         old_rec = self.get(name)
-        if old_rec != None and rec['timestamp'] > old_rec['timestamp']:
+        if old_rec == None: self[name] = rec
+        elif rec['timestamp'] > old_rec['timestamp']:
             for key in old_rec.keys():
                 if not key.startswith('_'): del old_rec[key]
             old_rec.update(rec)
-        elif rec['timestamp'] >= self._min_timestamp: self[name] = rec
 
     def set_min_timestamp(self, ts):
+        """The ._min_timestamp member is the timestamp on the last comprehensive update.  We use it to determine if a record is stale.  This method should be called whenever new GetSlivers() data comes in."""
         self._min_timestamp = ts
         for name, rec in self.items():
             if rec['timestamp'] < ts: del self[name]
 
     def sync(self):
+        """Synchronize reality with the database contents.  This method does a lot of things, and it's currently called after every single batch of database changes (a GetSlivers(), a loan, a record).  It may be necessary in the future to do something smarter."""
+
         # delete expired records
         now = time.time()
         for name, rec in self.items():
@@ -84,8 +102,10 @@ class Database(dict):
             if name not in self: accounts.get(name).ensure_destroyed()
         for name, rec in self.iteritems():
             if rec['instantiation'] == 'plc-instantiated': accounts.get(name).ensure_created(rec)
+            if rec['instantiation'] == 'nm-controller': accounts.get(name).ensure_created(rec)
 
-        # request a database dump
+               # Wake up bwmom to update limits.
+        bwmon.lock.set()
         global dump_requested
         dump_requested = True
         db_cond.notify()
@@ -98,10 +118,10 @@ def start():
         while True:
             db_lock.acquire()
             while not dump_requested: db_cond.wait()
-            db_copy = tools.deepcopy(db)
+            db_pickle = cPickle.dumps(db, cPickle.HIGHEST_PROTOCOL)
             dump_requested = False
             db_lock.release()
-            try: tools.write_file(DB_FILE, lambda f: cPickle.dump(db_copy, f, -1))
+            try: tools.write_file(DB_FILE, lambda f: f.write(db_pickle))
             except: logger.log_exc()
     global db
     try:
@@ -112,33 +132,3 @@ def start():
         logger.log_exc()
         db = Database()
     tools.as_daemon_thread(run)
-
-@synchronized
-def GetSlivers_callback(data):
-    for d in data:
-        for sliver in d['slivers']:
-            rec = sliver.copy()
-            rec.setdefault('timestamp', d['timestamp'])
-            rec.setdefault('type', 'sliver.VServer')
-
-            # convert attributes field to a proper dict
-            attr_dict = {}
-            for attr in rec.pop('attributes'): attr_dict[attr['name']] = attr['value']
-
-            # squash keys
-            keys = rec.pop('keys')
-            rec.setdefault('keys', '\n'.join([key_struct['key'] for key_struct in keys])
-
-            rec.setdefault('initscript', attr_dict.get('initscript'))
-            rec.setdefault('delegations', [])  # XXX - delegation not yet supported
-
-            # extract the implied rspec
-            rspec = {}
-            rec['rspec'] = rspec
-            for resname, default_amt in DEFAULT_ALLOCATIONS.iteritems():
-                try: amt = int(attr_dict[resname])
-                except (KeyError, ValueError): amt = default_amt
-                rspec[resname] = amt
-            db.deliver_record(rec)
-        db.set_min_timestamp(d['timestamp'])
-    db.sync()