can't blindly decode() a pickle, need to store as bytes
[nodemanager.git] / database.py
1 #
2 """The database houses information on slivers.  This information
3 reaches the sliver manager in two different ways: one, through the
4 GetSlivers() call made periodically; two, by users delivering tickets.
5 The sync() method of the Database class turns this data into reality.
6
7 The database format is a dictionary that maps account names to records
8 (also known as dictionaries).  Inside each record, data supplied or
9 computed locally is stored under keys that begin with an underscore,
10 and data from PLC is stored under keys that don't.
11
12 In order to maintain service when the node reboots during a network
13 partition, the database is constantly being dumped to disk.
14 """
15
16 import sys
17
18 import pickle
19 import threading
20 import time
21
22 import account
23 import logger
24 import tools
25 import bwmon
26
27 # hopefully temporary
28 # is there a good reason to have this done here and not in a plugin ?
29 try:    from coresched_lxc import CoreSched
30 except: from coresched_vs  import CoreSched
31
32 # We enforce minimum allocations to keep the clueless from hosing their slivers.
33 # Disallow disk loans because there's currently no way to punish slivers over quota.
34 MINIMUM_ALLOCATION = {'cpu_pct': 0,
35                       'cpu_share': 1,
36                       'net_min_rate': 0,
37                       'net_max_rate': 8,
38                       'net_i2_min_rate': 0,
39                       'net_i2_max_rate': 8,
40                       'net_share': 1,
41                       }
42 LOANABLE_RESOURCES = list(MINIMUM_ALLOCATION.keys())
43
44 DB_FILE = '/var/lib/nodemanager/database.pickle'
45
46
47 # database object and associated lock
48 db_lock = threading.RLock()
49 db = None
50
51 # these are used in tandem to request a database dump from the dumper daemon
52 db_cond = threading.Condition(db_lock)
53 dump_requested = False
54
55 # decorator that acquires and releases the database lock before and after the decorated operation
56 # XXX - replace with "with" statements once we switch to 2.5
57 def synchronized(fn):
58     def sync_fn(*args, **kw_args):
59         db_lock.acquire()
60         try: return fn(*args, **kw_args)
61         finally: db_lock.release()
62     sync_fn.__doc__ = fn.__doc__
63     sync_fn.__name__ = fn.__name__
64     return sync_fn
65
66
67 class Database(dict):
68     def __init__(self):
69         self._min_timestamp = 0
70
71     def _compute_effective_rspecs(self):
72         """Calculate the effects of loans and store the result in field _rspec.
73 At the moment, we allow slivers to loan only those resources that they have received directly from PLC.
74 In order to do the accounting, we store three different rspecs:
75  * field 'rspec', which is the resources given by PLC;
76  * field '_rspec', which is the actual amount of resources the sliver has after all loans;
77  * and variable resid_rspec, which is the amount of resources the sliver
78    has after giving out loans but not receiving any."""
79         slivers = {}
80         for name, rec in self.items():
81             if 'rspec' in rec:
82                 rec['_rspec'] = rec['rspec'].copy()
83                 slivers[name] = rec
84         for rec in slivers.values():
85             eff_rspec = rec['_rspec']
86             resid_rspec = rec['rspec'].copy()
87             for target, resource_name, amount in rec.get('_loans', []):
88                 if target in slivers and amount <= resid_rspec[resource_name] - MINIMUM_ALLOCATION[resource_name]:
89                     eff_rspec[resource_name] -= amount
90                     resid_rspec[resource_name] -= amount
91                     slivers[target]['_rspec'][resource_name] += amount
92
93     def deliver_record(self, rec):
94         """A record is simply a dictionary with 'name' and 'timestamp'
95 keys. We keep some persistent private data in the records under keys
96 that start with '_'; thus record updates should not displace such
97 keys."""
98         if rec['timestamp'] < self._min_timestamp: return
99         name = rec['name']
100         old_rec = self.get(name)
101         if old_rec == None:
102             self[name] = rec
103         elif rec['timestamp'] > old_rec['timestamp']:
104             for key in list(old_rec.keys()):
105                 if not key.startswith('_'):
106                     del old_rec[key]
107             old_rec.update(rec)
108
109     def set_min_timestamp(self, ts):
110         """The ._min_timestamp member is the timestamp on the last comprehensive update.
111 We use it to determine if a record is stale.
112 This method should be called whenever new GetSlivers() data comes in."""
113         self._min_timestamp = ts
114         for name, rec in list(self.items()):
115             if rec['timestamp'] < ts: del self[name]
116
117     def sync(self):
118         """Synchronize reality with the database contents.  This
119 method does a lot of things, and it's currently called after every
120 single batch of database changes (a GetSlivers(), a loan, a record).
121 It may be necessary in the future to do something smarter."""
122
123         # delete expired records
124         now = time.time()
125         for name, rec in list(self.items()):
126             if rec.get('expires', now) < now: del self[name]
127
128         self._compute_effective_rspecs()
129
130         try:
131             coresched = CoreSched()
132             coresched.adjustCores(self)
133         except:
134             logger.log_exc("database: exception while doing core sched")
135
136         # create and destroy accounts as needed
137         logger.verbose("database: sync : fetching accounts")
138         existing_acct_names = account.all()
139         for name in existing_acct_names:
140             if name not in self:
141                 logger.verbose("database: sync : ensure_destroy'ing %s"%name)
142                 account.get(name).ensure_destroyed()
143         for name, rec in self.items():
144             # protect this; if anything fails for a given sliver
145             # we still need the other ones to be handled
146             try:
147                 sliver = account.get(name)
148                 logger.verbose("database: sync : looping on %s (shell account class from pwd %s)" %(name, sliver._get_class()))
149                 # Make sure we refresh accounts that are running
150                 if rec['instantiation'] == 'plc-instantiated':
151                     logger.verbose ("database: sync : ensure_create'ing 'instantiation' sliver %s"%name)
152                     sliver.ensure_created(rec)
153                 elif rec['instantiation'] == 'nm-controller':
154                     logger.verbose ("database: sync : ensure_create'ing 'nm-controller' sliver %s"%name)
155                     sliver.ensure_created(rec)
156                 # Back door to ensure PLC overrides Ticket in delegation.
157                 elif rec['instantiation'] == 'delegated' and sliver._get_class() != None:
158                     # if the ticket has been delivered and the nm-controller started the slice
159                     # update rspecs and keep them up to date.
160                     if sliver.is_running():
161                         logger.verbose ("database: sync : ensure_create'ing 'delegated' sliver %s"%name)
162                         sliver.ensure_created(rec)
163             except SystemExit as e:
164                 sys.exit(e)
165             except:
166                 logger.log_exc("database: sync failed to handle sliver", name=name)
167
168         # Wake up bwmom to update limits.
169         bwmon.lock.set()
170         global dump_requested
171         dump_requested = True
172         db_cond.notify()
173
174
175 def start():
176     """The database dumper daemon.
177 When it starts up, it populates the database with the last dumped database.
178 It proceeds to handle dump requests forever."""
179     def run():
180         global dump_requested
181         while True:
182             db_lock.acquire()
183             while not dump_requested: db_cond.wait()
184             db_pickle = pickle.dumps(db, pickle.HIGHEST_PROTOCOL)
185             dump_requested = False
186             db_lock.release()
187             try:
188                 tools.write_file(
189                     DB_FILE, lambda f: f.write(db_pickle), binary=True)
190                 logger.log_database(db)
191             except:
192                 logger.log_exc("database.start: failed to pickle/dump")
193     global db
194     try:
195         f = open(DB_FILE)
196         try: db = pickle.load(f)
197         finally: f.close()
198     except IOError:
199         logger.log ("database: Could not load %s -- starting from a fresh database"%DB_FILE)
200         db = Database()
201     except:
202         logger.log_exc("database: failed in start")
203         db = Database()
204     logger.log('database.start')
205     tools.as_daemon_thread(run)