Merge branch 'ipv6'
[nodemanager.git] / database.py
1 #
2 """The database houses information on slivers.  This information
3 reaches the sliver manager in two different ways: one, through the
4 GetSlivers() call made periodically; two, by users delivering tickets.
5 The sync() method of the Database class turns this data into reality.
6
7 The database format is a dictionary that maps account names to records
8 (also known as dictionaries).  Inside each record, data supplied or
9 computed locally is stored under keys that begin with an underscore,
10 and data from PLC is stored under keys that don't.
11
12 In order to maintain service when the node reboots during a network
13 partition, the database is constantly being dumped to disk.
14 """
15
16 import cPickle
17 import threading
18 import time
19
20 import account
21 import logger
22 import tools
23 import bwmon
24
25 # hopefully temporary
26 # is there a good reason to have this done here and not in a plugin ?
27 try:    from coresched_lxc import CoreSched
28 except: from coresched_vs  import CoreSched
29
30 # We enforce minimum allocations to keep the clueless from hosing their slivers.
31 # Disallow disk loans because there's currently no way to punish slivers over quota.
32 MINIMUM_ALLOCATION = {'cpu_pct': 0,
33                       'cpu_share': 1,
34                       'net_min_rate': 0,
35                       'net_max_rate': 8,
36                       'net_i2_min_rate': 0,
37                       'net_i2_max_rate': 8,
38                       'net_share': 1,
39                       }
40 LOANABLE_RESOURCES = MINIMUM_ALLOCATION.keys()
41
42 DB_FILE = '/var/lib/nodemanager/database.pickle'
43
44
45 # database object and associated lock
46 db_lock = threading.RLock()
47 db = None
48
49 # these are used in tandem to request a database dump from the dumper daemon
50 db_cond = threading.Condition(db_lock)
51 dump_requested = False
52
53 # decorator that acquires and releases the database lock before and after the decorated operation
54 # XXX - replace with "with" statements once we switch to 2.5
55 def synchronized(fn):
56     def sync_fn(*args, **kw_args):
57         db_lock.acquire()
58         try: return fn(*args, **kw_args)
59         finally: db_lock.release()
60     sync_fn.__doc__ = fn.__doc__
61     sync_fn.__name__ = fn.__name__
62     return sync_fn
63
64
65 class Database(dict):
66     def __init__(self):
67         self._min_timestamp = 0
68
69     def _compute_effective_rspecs(self):
70         """Calculate the effects of loans and store the result in field _rspec.
71 At the moment, we allow slivers to loan only those resources that they have received directly from PLC.
72 In order to do the accounting, we store three different rspecs:
73  * field 'rspec', which is the resources given by PLC;
74  * field '_rspec', which is the actual amount of resources the sliver has after all loans;
75  * and variable resid_rspec, which is the amount of resources the sliver
76    has after giving out loans but not receiving any."""
77         slivers = {}
78         for name, rec in self.iteritems():
79             if 'rspec' in rec:
80                 rec['_rspec'] = rec['rspec'].copy()
81                 slivers[name] = rec
82         for rec in slivers.itervalues():
83             eff_rspec = rec['_rspec']
84             resid_rspec = rec['rspec'].copy()
85             for target, resource_name, amount in rec.get('_loans', []):
86                 if target in slivers and amount <= resid_rspec[resource_name] - MINIMUM_ALLOCATION[resource_name]:
87                     eff_rspec[resource_name] -= amount
88                     resid_rspec[resource_name] -= amount
89                     slivers[target]['_rspec'][resource_name] += amount
90
91     def deliver_record(self, rec):
92         """A record is simply a dictionary with 'name' and 'timestamp'
93 keys. We keep some persistent private data in the records under keys
94 that start with '_'; thus record updates should not displace such
95 keys."""
96         if rec['timestamp'] < self._min_timestamp: return
97         name = rec['name']
98         old_rec = self.get(name)
99         if old_rec == None: self[name] = rec
100         elif rec['timestamp'] > old_rec['timestamp']:
101             for key in old_rec.keys():
102                 if not key.startswith('_'): del old_rec[key]
103             old_rec.update(rec)
104
105     def set_min_timestamp(self, ts):
106         """The ._min_timestamp member is the timestamp on the last comprehensive update.
107 We use it to determine if a record is stale.
108 This method should be called whenever new GetSlivers() data comes in."""
109         self._min_timestamp = ts
110         for name, rec in self.items():
111             if rec['timestamp'] < ts: del self[name]
112
113     def sync(self):
114         """Synchronize reality with the database contents.  This
115 method does a lot of things, and it's currently called after every
116 single batch of database changes (a GetSlivers(), a loan, a record).
117 It may be necessary in the future to do something smarter."""
118
119         # delete expired records
120         now = time.time()
121         for name, rec in self.items():
122             if rec.get('expires', now) < now: del self[name]
123
124         self._compute_effective_rspecs()
125
126         try:
127             coresched = CoreSched()
128             coresched.adjustCores(self)
129         except:
130             logger.log_exc("database: exception while doing core sched")
131
132         # create and destroy accounts as needed
133         logger.verbose("database: sync : fetching accounts")
134         existing_acct_names = account.all()
135         for name in existing_acct_names:
136             if name not in self:
137                 logger.verbose("database: sync : ensure_destroy'ing %s"%name)
138                 account.get(name).ensure_destroyed()
139         for name, rec in self.iteritems():
140             # protect this; if anything fails for a given sliver
141             # we still need the other ones to be handled
142             try:
143                 sliver = account.get(name)
144                 logger.verbose("database: sync : looping on %s (shell account class from pwd %s)" %(name,sliver._get_class()))
145                 # Make sure we refresh accounts that are running
146                 if rec['instantiation'] == 'plc-instantiated':
147                     logger.verbose ("database: sync : ensure_create'ing 'instantiation' sliver %s"%name)
148                     sliver.ensure_created(rec)
149                 elif rec['instantiation'] == 'nm-controller':
150                     logger.verbose ("database: sync : ensure_create'ing 'nm-controller' sliver %s"%name)
151                     sliver.ensure_created(rec)
152                 # Back door to ensure PLC overrides Ticket in delegation.
153                 elif rec['instantiation'] == 'delegated' and sliver._get_class() != None:
154                     # if the ticket has been delivered and the nm-controller started the slice
155                     # update rspecs and keep them up to date.
156                     if sliver.is_running():
157                         logger.verbose ("database: sync : ensure_create'ing 'delegated' sliver %s"%name)
158                         sliver.ensure_created(rec)
159             except:
160                 logger.log_exc("database: sync failed to handle sliver",name=name)
161
162         # Wake up bwmom to update limits.
163         bwmon.lock.set()
164         global dump_requested
165         dump_requested = True
166         db_cond.notify()
167
168
169 def start():
170     """The database dumper daemon.
171 When it starts up, it populates the database with the last dumped database.
172 It proceeds to handle dump requests forever."""
173     def run():
174         global dump_requested
175         while True:
176             db_lock.acquire()
177             while not dump_requested: db_cond.wait()
178             db_pickle = cPickle.dumps(db, cPickle.HIGHEST_PROTOCOL)
179             dump_requested = False
180             db_lock.release()
181             try:
182                 tools.write_file(DB_FILE, lambda f: f.write(db_pickle))
183                 logger.log_database(db)
184             except:
185                 logger.log_exc("database.start: failed to pickle/dump")
186     global db
187     try:
188         f = open(DB_FILE)
189         try: db = cPickle.load(f)
190         finally: f.close()
191     except IOError:
192         logger.log ("database: Could not load %s -- starting from a fresh database"%DB_FILE)
193         db = Database()
194     except:
195         logger.log_exc("database: failed in start")
196         db = Database()
197     logger.log('database.start')
198     tools.as_daemon_thread(run)