rename accounts into account for consistency
[nodemanager.git] / database.py
1 #
2 """The database houses information on slivers.  This information
3 reaches the sliver manager in two different ways: one, through the
4 GetSlivers() call made periodically; two, by users delivering tickets.
5 The sync() method of the Database class turns this data into reality.
6
7 The database format is a dictionary that maps account names to records
8 (also known as dictionaries).  Inside each record, data supplied or
9 computed locally is stored under keys that begin with an underscore,
10 and data from PLC is stored under keys that don't.
11
12 In order to maintain service when the node reboots during a network
13 partition, the database is constantly being dumped to disk.
14 """
15
16 import cPickle
17 import threading
18 import time
19
20 import account
21 import coresched
22 import logger
23 import tools
24 import bwmon
25
26 # We enforce minimum allocations to keep the clueless from hosing their slivers.
27 # Disallow disk loans because there's currently no way to punish slivers over quota.
28 MINIMUM_ALLOCATION = {'cpu_pct': 0,
29                       'cpu_share': 1,
30                       'net_min_rate': 0,
31                       'net_max_rate': 8,
32                       'net_i2_min_rate': 0,
33                       'net_i2_max_rate': 8,
34                       'net_share': 1,
35                       }
36 LOANABLE_RESOURCES = MINIMUM_ALLOCATION.keys()
37
38 DB_FILE = '/var/lib/nodemanager/database.pickle'
39
40
41 # database object and associated lock
42 db_lock = threading.RLock()
43 db = None
44
45 # these are used in tandem to request a database dump from the dumper daemon
46 db_cond = threading.Condition(db_lock)
47 dump_requested = False
48
49 # decorator that acquires and releases the database lock before and after the decorated operation
50 # XXX - replace with "with" statements once we switch to 2.5
51 def synchronized(fn):
52     def sync_fn(*args, **kw_args):
53         db_lock.acquire()
54         try: return fn(*args, **kw_args)
55         finally: db_lock.release()
56     sync_fn.__doc__ = fn.__doc__
57     sync_fn.__name__ = fn.__name__
58     return sync_fn
59
60
61 class Database(dict):
62     def __init__(self):
63         self._min_timestamp = 0
64
65     def _compute_effective_rspecs(self):
66         """Calculate the effects of loans and store the result in field _rspec.
67 At the moment, we allow slivers to loan only those resources that they have received directly from PLC.
68 In order to do the accounting, we store three different rspecs:
69  * field 'rspec', which is the resources given by PLC;
70  * field '_rspec', which is the actual amount of resources the sliver has after all loans;
71  * and variable resid_rspec, which is the amount of resources the sliver
72    has after giving out loans but not receiving any."""
73         slivers = {}
74         for name, rec in self.iteritems():
75             if 'rspec' in rec:
76                 rec['_rspec'] = rec['rspec'].copy()
77                 slivers[name] = rec
78         for rec in slivers.itervalues():
79             eff_rspec = rec['_rspec']
80             resid_rspec = rec['rspec'].copy()
81             for target, resource_name, amount in rec.get('_loans', []):
82                 if target in slivers and amount <= resid_rspec[resource_name] - MINIMUM_ALLOCATION[resource_name]:
83                     eff_rspec[resource_name] -= amount
84                     resid_rspec[resource_name] -= amount
85                     slivers[target]['_rspec'][resource_name] += amount
86
87     def deliver_record(self, rec):
88         """A record is simply a dictionary with 'name' and 'timestamp'
89 keys. We keep some persistent private data in the records under keys
90 that start with '_'; thus record updates should not displace such
91 keys."""
92         if rec['timestamp'] < self._min_timestamp: return
93         name = rec['name']
94         old_rec = self.get(name)
95         if old_rec == None: self[name] = rec
96         elif rec['timestamp'] > old_rec['timestamp']:
97             for key in old_rec.keys():
98                 if not key.startswith('_'): del old_rec[key]
99             old_rec.update(rec)
100
101     def set_min_timestamp(self, ts):
102         """The ._min_timestamp member is the timestamp on the last comprehensive update.
103 We use it to determine if a record is stale.
104 This method should be called whenever new GetSlivers() data comes in."""
105         self._min_timestamp = ts
106         for name, rec in self.items():
107             if rec['timestamp'] < ts: del self[name]
108
109     def sync(self):
110         """Synchronize reality with the database contents.  This
111 method does a lot of things, and it's currently called after every
112 single batch of database changes (a GetSlivers(), a loan, a record).
113 It may be necessary in the future to do something smarter."""
114
115         # delete expired records
116         now = time.time()
117         for name, rec in self.items():
118             if rec.get('expires', now) < now: del self[name]
119
120         self._compute_effective_rspecs()
121
122         try:
123             x = coresched.CoreSched()
124             x.adjustCores(self)
125         except:
126             logger.log_exc("database: exception while doing core sched")
127
128         # create and destroy accounts as needed
129         logger.verbose("database: sync : fetching accounts")
130         existing_acct_names = account.all()
131         for name in existing_acct_names:
132             if name not in self:
133                 logger.verbose("database: sync : ensure_destroy'ing %s"%name)
134                 account.get(name).ensure_destroyed()
135         for name, rec in self.iteritems():
136             # protect this; if anything fails for a given sliver
137             # we still need the other ones to be handled
138             try:
139                 sliver = account.get(name)
140                 logger.verbose("database: sync : looping on %s (shell account class from pwd %s)" %(name,sliver._get_class()))
141                 # Make sure we refresh accounts that are running
142                 if rec['instantiation'] == 'plc-instantiated':
143                     logger.verbose ("database: sync : ensure_create'ing 'instantiation' sliver %s"%name)
144                     sliver.ensure_created(rec)
145                 elif rec['instantiation'] == 'nm-controller':
146                     logger.verbose ("database: sync : ensure_create'ing 'nm-controller' sliver %s"%name)
147                     sliver.ensure_created(rec)
148                 # Back door to ensure PLC overrides Ticket in delegation.
149                 elif rec['instantiation'] == 'delegated' and sliver._get_class() != None:
150                     # if the ticket has been delivered and the nm-controller started the slice
151                     # update rspecs and keep them up to date.
152                     if sliver.is_running():
153                         logger.verbose ("database: sync : ensure_create'ing 'delegated' sliver %s"%name)
154                         sliver.ensure_created(rec)
155             except:
156                 logger.log_exc("database: sync failed to handle sliver",name=name)
157
158         # Wake up bwmom to update limits.
159         bwmon.lock.set()
160         global dump_requested
161         dump_requested = True
162         db_cond.notify()
163
164
165 def start():
166     """The database dumper daemon.
167 When it starts up, it populates the database with the last dumped database.
168 It proceeds to handle dump requests forever."""
169     def run():
170         global dump_requested
171         while True:
172             db_lock.acquire()
173             while not dump_requested: db_cond.wait()
174             db_pickle = cPickle.dumps(db, cPickle.HIGHEST_PROTOCOL)
175             dump_requested = False
176             db_lock.release()
177             try:
178                 tools.write_file(DB_FILE, lambda f: f.write(db_pickle))
179                 logger.log_database(db)
180             except:
181                 logger.log_exc("database.start: failed to pickle/dump")
182     global db
183     try:
184         f = open(DB_FILE)
185         try: db = cPickle.load(f)
186         finally: f.close()
187     except IOError:
188         logger.log ("database: Could not load %s -- starting from a fresh database"%DB_FILE)
189         db = Database()
190     except:
191         logger.log_exc("database: failed in start")
192         db = Database()
193     logger.log('database.start')
194     tools.as_daemon_thread(run)