Resilience against exceptions thrown from bwmon.py.
[nodemanager.git] / database.py
1 """The database houses information on slivers.  This information
2 reaches the sliver manager in two different ways: one, through the
3 GetSlivers() call made periodically; two, by users delivering tickets.
4 The sync() method of the Database class turns this data into reality.
5
6 The database format is a dictionary that maps account names to records
7 (also known as dictionaries).  Inside each record, data supplied or
8 computed locally is stored under keys that begin with an underscore,
9 and data from PLC is stored under keys that don't.
10
11 In order to maintain service when the node reboots during a network
12 partition, the database is constantly being dumped to disk.
13 """
14
15 import cPickle
16 import threading
17 import time
18
19 import accounts
20 import logger
21 import tools
22 import bwmon
23
24 # We enforce minimum allocations to keep the clueless from hosing their slivers.
25 # Disallow disk loans because there's currently no way to punish slivers over quota.
26 MINIMUM_ALLOCATION = {'cpu_min': 0, 'cpu_share': 32, 'net_min_rate': 0, 'net_max_rate': 8, 'net_i2_min_rate': 0, 'net_i2_max_rate': 8, 'net_share': 1}
27 LOANABLE_RESOURCES = MINIMUM_ALLOCATION.keys()
28
29 DB_FILE = '/root/sliver_mgr_db.pickle'
30
31
32 # database object and associated lock
33 db_lock = threading.RLock()
34 db = None
35
36 # these are used in tandem to request a database dump from the dumper daemon
37 db_cond = threading.Condition(db_lock)
38 dump_requested = False
39
40 # decorator that acquires and releases the database lock before and after the decorated operation
41 # XXX - replace with "with" statements once we switch to 2.5
42 def synchronized(fn):
43     def sync_fn(*args, **kw_args):
44         db_lock.acquire()
45         try: return fn(*args, **kw_args)
46         finally: db_lock.release()
47     sync_fn.__doc__ = fn.__doc__
48     sync_fn.__name__ = fn.__name__
49     return sync_fn
50
51
52 class Database(dict):
53     def __init__(self):
54         self._min_timestamp = 0
55
56     def _compute_effective_rspecs(self):
57         """Calculate the effects of loans and store the result in field _rspec.  At the moment, we allow slivers to loan only those resources that they have received directly from PLC.  In order to do the accounting, we store three different rspecs: field 'rspec', which is the resources given by PLC; field '_rspec', which is the actual amount of resources the sliver has after all loans; and variable resid_rspec, which is the amount of resources the sliver has after giving out loans but not receiving any."""
58         slivers = {}
59         for name, rec in self.iteritems():
60             if 'rspec' in rec:
61                 rec['_rspec'] = rec['rspec'].copy()
62                 slivers[name] = rec
63         for rec in slivers.itervalues():
64             eff_rspec = rec['_rspec']
65             resid_rspec = rec['rspec'].copy()
66             for target, resname, amt in rec.get('_loans', []):
67                 if target in slivers and amt <= resid_rspec[resname] - MINIMUM_ALLOCATION[resname]:
68                     eff_rspec[resname] -= amt
69                     resid_rspec[resname] -= amt
70                     slivers[target]['_rspec'][resname] += amt
71
72     def deliver_record(self, rec):
73         """A record is simply a dictionary with 'name' and 'timestamp' keys.  We keep some persistent private data in the records under keys that start with '_'; thus record updates should not displace such keys."""
74         if rec['timestamp'] < self._min_timestamp: return
75         name = rec['name']
76         old_rec = self.get(name)
77         if old_rec == None: self[name] = rec
78         elif rec['timestamp'] > old_rec['timestamp']:
79             for key in old_rec.keys():
80                 if not key.startswith('_'): del old_rec[key]
81             old_rec.update(rec)
82
83     def set_min_timestamp(self, ts):
84         """The ._min_timestamp member is the timestamp on the last comprehensive update.  We use it to determine if a record is stale.  This method should be called whenever new GetSlivers() data comes in."""
85         self._min_timestamp = ts
86         for name, rec in self.items():
87             if rec['timestamp'] < ts: del self[name]
88
89     def sync(self):
90         """Synchronize reality with the database contents.  This method does a lot of things, and it's currently called after every single batch of database changes (a GetSlivers(), a loan, a record).  It may be necessary in the future to do something smarter."""
91
92         # delete expired records
93         now = time.time()
94         for name, rec in self.items():
95             if rec.get('expires', now) < now: del self[name]
96
97         self._compute_effective_rspecs()
98
99         # create and destroy accounts as needed
100         existing_acct_names = accounts.all()
101         for name in existing_acct_names:
102             if name not in self: accounts.get(name).ensure_destroyed()
103         for name, rec in self.iteritems():
104             if rec['instantiation'] == 'plc-instantiated': accounts.get(name).ensure_created(rec)
105
106         try: bwmon.GetSlivers(self)
107         except: logger.log_exc()
108
109         # request a database dump
110         global dump_requested
111         dump_requested = True
112         db_cond.notify()
113
114
115 def start():
116     """The database dumper daemon.  When it starts up, it populates the database with the last dumped database.  It proceeds to handle dump requests forever."""
117     def run():
118         global dump_requested
119         while True:
120             db_lock.acquire()
121             while not dump_requested: db_cond.wait()
122             db_pickle = cPickle.dumps(db, cPickle.HIGHEST_PROTOCOL)
123             dump_requested = False
124             db_lock.release()
125             try: tools.write_file(DB_FILE, lambda f: f.write(db_pickle))
126             except: logger.log_exc()
127     global db
128     try:
129         f = open(DB_FILE)
130         try: db = cPickle.load(f)
131         finally: f.close()
132     except:
133         logger.log_exc()
134         db = Database()
135     tools.as_daemon_thread(run)