svn kwds
[nodemanager.git] / database.py
1 #
2 """The database houses information on slivers.  This information
3 reaches the sliver manager in two different ways: one, through the
4 GetSlivers() call made periodically; two, by users delivering tickets.
5 The sync() method of the Database class turns this data into reality.
6
7 The database format is a dictionary that maps account names to records
8 (also known as dictionaries).  Inside each record, data supplied or
9 computed locally is stored under keys that begin with an underscore,
10 and data from PLC is stored under keys that don't.
11
12 In order to maintain service when the node reboots during a network
13 partition, the database is constantly being dumped to disk.
14 """
15
16 import cPickle
17 import threading
18 import time
19
20 import accounts
21 import logger
22 import tools
23 import bwmon
24
25 # We enforce minimum allocations to keep the clueless from hosing their slivers.
26 # Disallow disk loans because there's currently no way to punish slivers over quota.
27 MINIMUM_ALLOCATION = {'cpu_pct': 0,
28                       'cpu_share': 1,
29                       'net_min_rate': 0,
30                       'net_max_rate': 8,
31                       'net_i2_min_rate': 0,
32                       'net_i2_max_rate': 8,
33                       'net_share': 1,
34                       }
35 LOANABLE_RESOURCES = MINIMUM_ALLOCATION.keys()
36
37 DB_FILE = '/var/lib/nodemanager/database.pickle'
38
39
40 # database object and associated lock
41 db_lock = threading.RLock()
42 db = None
43
44 # these are used in tandem to request a database dump from the dumper daemon
45 db_cond = threading.Condition(db_lock)
46 dump_requested = False
47
48 # decorator that acquires and releases the database lock before and after the decorated operation
49 # XXX - replace with "with" statements once we switch to 2.5
50 def synchronized(fn):
51     def sync_fn(*args, **kw_args):
52         db_lock.acquire()
53         try: return fn(*args, **kw_args)
54         finally: db_lock.release()
55     sync_fn.__doc__ = fn.__doc__
56     sync_fn.__name__ = fn.__name__
57     return sync_fn
58
59
60 class Database(dict):
61     def __init__(self):
62         self._min_timestamp = 0
63
64     def _compute_effective_rspecs(self):
65         """Calculate the effects of loans and store the result in field _rspec.
66 At the moment, we allow slivers to loan only those resources that they have received directly from PLC.
67 In order to do the accounting, we store three different rspecs:
68  * field 'rspec', which is the resources given by PLC;
69  * field '_rspec', which is the actual amount of resources the sliver has after all loans;
70  * and variable resid_rspec, which is the amount of resources the sliver
71    has after giving out loans but not receiving any."""
72         slivers = {}
73         for name, rec in self.iteritems():
74             if 'rspec' in rec:
75                 rec['_rspec'] = rec['rspec'].copy()
76                 slivers[name] = rec
77         for rec in slivers.itervalues():
78             eff_rspec = rec['_rspec']
79             resid_rspec = rec['rspec'].copy()
80             for target, resource_name, amount in rec.get('_loans', []):
81                 if target in slivers and amount <= resid_rspec[resource_name] - MINIMUM_ALLOCATION[resource_name]:
82                     eff_rspec[resource_name] -= amount
83                     resid_rspec[resource_name] -= amount
84                     slivers[target]['_rspec'][resource_name] += amount
85
86     def deliver_record(self, rec):
87         """A record is simply a dictionary with 'name' and 'timestamp'
88 keys. We keep some persistent private data in the records under keys
89 that start with '_'; thus record updates should not displace such
90 keys."""
91         if rec['timestamp'] < self._min_timestamp: return
92         name = rec['name']
93         old_rec = self.get(name)
94         if old_rec == None: self[name] = rec
95         elif rec['timestamp'] > old_rec['timestamp']:
96             for key in old_rec.keys():
97                 if not key.startswith('_'): del old_rec[key]
98             old_rec.update(rec)
99
100     def set_min_timestamp(self, ts):
101         """The ._min_timestamp member is the timestamp on the last comprehensive update.
102 We use it to determine if a record is stale.
103 This method should be called whenever new GetSlivers() data comes in."""
104         self._min_timestamp = ts
105         for name, rec in self.items():
106             if rec['timestamp'] < ts: del self[name]
107
108     def sync(self):
109         """Synchronize reality with the database contents.  This
110 method does a lot of things, and it's currently called after every
111 single batch of database changes (a GetSlivers(), a loan, a record).
112 It may be necessary in the future to do something smarter."""
113
114         # delete expired records
115         now = time.time()
116         for name, rec in self.items():
117             if rec.get('expires', now) < now: del self[name]
118
119         self._compute_effective_rspecs()
120
121         # create and destroy accounts as needed
122         logger.verbose("database: sync : fetching accounts")
123         existing_acct_names = accounts.all()
124         for name in existing_acct_names:
125             if name not in self:
126                 logger.verbose("database: sync : ensure_destroy'ing %s"%name)
127                 accounts.get(name).ensure_destroyed()
128         for name, rec in self.iteritems():
129             # protect this; if anything fails for a given sliver
130             # we still need the other ones to be handled
131             try:
132                 sliver = accounts.get(name)
133                 logger.verbose("database: sync : looping on %s (shell account class from pwd %s)" %(name,sliver._get_class()))
134                 # Make sure we refresh accounts that are running
135                 if rec['instantiation'] == 'plc-instantiated':
136                     logger.verbose ("database: sync : ensure_create'ing 'instantiation' sliver %s"%name)
137                     sliver.ensure_created(rec)
138                 elif rec['instantiation'] == 'nm-controller':
139                     logger.verbose ("database: sync : ensure_create'ing 'nm-controller' sliver %s"%name)
140                     sliver.ensure_created(rec)
141                 # Back door to ensure PLC overrides Ticket in delegation.
142                 elif rec['instantiation'] == 'delegated' and sliver._get_class() != None:
143                     # if the ticket has been delivered and the nm-controller started the slice
144                     # update rspecs and keep them up to date.
145                     if sliver.is_running():
146                         logger.verbose ("database: sync : ensure_create'ing 'delegated' sliver %s"%name)
147                         sliver.ensure_created(rec)
148             except:
149                 logger.log_exc("database: sync failed to handle sliver",name=name)
150
151         # Wake up bwmom to update limits.
152         bwmon.lock.set()
153         global dump_requested
154         dump_requested = True
155         db_cond.notify()
156
157
158 def start():
159     """The database dumper daemon.
160 When it starts up, it populates the database with the last dumped database.
161 It proceeds to handle dump requests forever."""
162     def run():
163         global dump_requested
164         while True:
165             db_lock.acquire()
166             while not dump_requested: db_cond.wait()
167             db_pickle = cPickle.dumps(db, cPickle.HIGHEST_PROTOCOL)
168             dump_requested = False
169             db_lock.release()
170             try:
171                 tools.write_file(DB_FILE, lambda f: f.write(db_pickle))
172                 logger.log_database(db)
173             except:
174                 logger.log_exc("database.start: failed to pickle/dump")
175     global db
176     try:
177         f = open(DB_FILE)
178         try: db = cPickle.load(f)
179         finally: f.close()
180     except IOError:
181         logger.log ("database: Could not load %s -- starting from a fresh database"%DB_FILE)
182         db = Database()
183     except:
184         logger.log_exc("database: failed in start")
185         db = Database()
186     logger.log('database.start')
187     tools.as_daemon_thread(run)