- Change .py files to use 4-space indents and no hard tab characters.
[nodemanager.git] / database.py
1 # $Id$
2 # $URL$
3
4 """The database houses information on slivers.  This information
5 reaches the sliver manager in two different ways: one, through the
6 GetSlivers() call made periodically; two, by users delivering tickets.
7 The sync() method of the Database class turns this data into reality.
8
9 The database format is a dictionary that maps account names to records
10 (also known as dictionaries).  Inside each record, data supplied or
11 computed locally is stored under keys that begin with an underscore,
12 and data from PLC is stored under keys that don't.
13
14 In order to maintain service when the node reboots during a network
15 partition, the database is constantly being dumped to disk.
16 """
17
18 import cPickle
19 import threading
20 import time
21
22 import accounts
23 import logger
24 import tools
25 import bwmon
26
27 # We enforce minimum allocations to keep the clueless from hosing their slivers.
28 # Disallow disk loans because there's currently no way to punish slivers over quota.
29 MINIMUM_ALLOCATION = {'cpu_pct': 0,
30                       'cpu_share': 1,
31                       'net_min_rate': 0,
32                       'net_max_rate': 8,
33                       'net_i2_min_rate': 0,
34                       'net_i2_max_rate': 8,
35                       'net_share': 1,
36                       }
37 LOANABLE_RESOURCES = MINIMUM_ALLOCATION.keys()
38
39 DB_FILE = '/var/lib/nodemanager/database.pickle'
40
41
42 # database object and associated lock
43 db_lock = threading.RLock()
44 db = None
45
46 # these are used in tandem to request a database dump from the dumper daemon
47 db_cond = threading.Condition(db_lock)
48 dump_requested = False
49
50 # decorator that acquires and releases the database lock before and after the decorated operation
51 # XXX - replace with "with" statements once we switch to 2.5
52 def synchronized(fn):
53     def sync_fn(*args, **kw_args):
54         db_lock.acquire()
55         try: return fn(*args, **kw_args)
56         finally: db_lock.release()
57     sync_fn.__doc__ = fn.__doc__
58     sync_fn.__name__ = fn.__name__
59     return sync_fn
60
61
62 class Database(dict):
63     def __init__(self):
64         self._min_timestamp = 0
65
66     def _compute_effective_rspecs(self):
67         """Calculate the effects of loans and store the result in field _rspec.
68 At the moment, we allow slivers to loan only those resources that they have received directly from PLC.
69 In order to do the accounting, we store three different rspecs:
70  * field 'rspec', which is the resources given by PLC;
71  * field '_rspec', which is the actual amount of resources the sliver has after all loans;
72  * and variable resid_rspec, which is the amount of resources the sliver
73    has after giving out loans but not receiving any."""
74         slivers = {}
75         for name, rec in self.iteritems():
76             if 'rspec' in rec:
77                 rec['_rspec'] = rec['rspec'].copy()
78                 slivers[name] = rec
79         for rec in slivers.itervalues():
80             eff_rspec = rec['_rspec']
81             resid_rspec = rec['rspec'].copy()
82             for target, resource_name, amount in rec.get('_loans', []):
83                 if target in slivers and amount <= resid_rspec[resource_name] - MINIMUM_ALLOCATION[resource_name]:
84                     eff_rspec[resource_name] -= amount
85                     resid_rspec[resource_name] -= amount
86                     slivers[target]['_rspec'][resource_name] += amount
87
88     def deliver_record(self, rec):
89         """A record is simply a dictionary with 'name' and 'timestamp'
90 keys. We keep some persistent private data in the records under keys
91 that start with '_'; thus record updates should not displace such
92 keys."""
93         if rec['timestamp'] < self._min_timestamp: return
94         name = rec['name']
95         old_rec = self.get(name)
96         if old_rec == None: self[name] = rec
97         elif rec['timestamp'] > old_rec['timestamp']:
98             for key in old_rec.keys():
99                 if not key.startswith('_'): del old_rec[key]
100             old_rec.update(rec)
101
102     def set_min_timestamp(self, ts):
103         """The ._min_timestamp member is the timestamp on the last comprehensive update.
104 We use it to determine if a record is stale.
105 This method should be called whenever new GetSlivers() data comes in."""
106         self._min_timestamp = ts
107         for name, rec in self.items():
108             if rec['timestamp'] < ts: del self[name]
109
110     def sync(self):
111         """Synchronize reality with the database contents.  This
112 method does a lot of things, and it's currently called after every
113 single batch of database changes (a GetSlivers(), a loan, a record).
114 It may be necessary in the future to do something smarter."""
115
116         # delete expired records
117         now = time.time()
118         for name, rec in self.items():
119             if rec.get('expires', now) < now: del self[name]
120
121         self._compute_effective_rspecs()
122
123         # create and destroy accounts as needed
124         logger.verbose("database: sync : fetching accounts")
125         existing_acct_names = accounts.all()
126         for name in existing_acct_names:
127             if name not in self:
128                 logger.verbose("database: sync : ensure_destroy'ing %s"%name)
129                 accounts.get(name).ensure_destroyed()
130         for name, rec in self.iteritems():
131             # protect this; if anything fails for a given sliver
132             # we still need the other ones to be handled
133             try:
134                 sliver = accounts.get(name)
135                 logger.verbose("database: sync : looping on %s (shell account class from pwd %s)" %(name,sliver._get_class()))
136                 # Make sure we refresh accounts that are running
137                 if rec['instantiation'] == 'plc-instantiated':
138                     logger.verbose ("database: sync : ensure_create'ing 'instantiation' sliver %s"%name)
139                     sliver.ensure_created(rec)
140                 elif rec['instantiation'] == 'nm-controller':
141                     logger.verbose ("database: sync : ensure_create'ing 'nm-controller' sliver %s"%name)
142                     sliver.ensure_created(rec)
143                 # Back door to ensure PLC overrides Ticket in delegation.
144                 elif rec['instantiation'] == 'delegated' and sliver._get_class() != None:
145                     # if the ticket has been delivered and the nm-controller started the slice
146                     # update rspecs and keep them up to date.
147                     if sliver.is_running():
148                         logger.verbose ("database: sync : ensure_create'ing 'delegated' sliver %s"%name)
149                         sliver.ensure_created(rec)
150             except:
151                 logger.log_exc("database: sync failed to handle sliver",name=name)
152
153         # Wake up bwmom to update limits.
154         bwmon.lock.set()
155         global dump_requested
156         dump_requested = True
157         db_cond.notify()
158
159
160 def start():
161     """The database dumper daemon.
162 When it starts up, it populates the database with the last dumped database.
163 It proceeds to handle dump requests forever."""
164     def run():
165         global dump_requested
166         while True:
167             db_lock.acquire()
168             while not dump_requested: db_cond.wait()
169             db_pickle = cPickle.dumps(db, cPickle.HIGHEST_PROTOCOL)
170             dump_requested = False
171             db_lock.release()
172             try:
173                 tools.write_file(DB_FILE, lambda f: f.write(db_pickle))
174                 logger.log_database(db)
175             except:
176                 logger.log_exc("database.start: failed to pickle/dump")
177     global db
178     try:
179         f = open(DB_FILE)
180         try: db = cPickle.load(f)
181         finally: f.close()
182     except IOError:
183         logger.log ("database: Could not load %s -- starting from a fresh database"%DB_FILE)
184         db = Database()
185     except:
186         logger.log_exc("database: failed in start")
187         db = Database()
188     logger.log('database.start')
189     tools.as_daemon_thread(run)