patch by Thomas Dreibholz - ovs-vsctl and not ovs-ovsctl
[nodemanager.git] / database.py
1 #
2 """The database houses information on slivers.  This information
3 reaches the sliver manager in two different ways: one, through the
4 GetSlivers() call made periodically; two, by users delivering tickets.
5 The sync() method of the Database class turns this data into reality.
6
7 The database format is a dictionary that maps account names to records
8 (also known as dictionaries).  Inside each record, data supplied or
9 computed locally is stored under keys that begin with an underscore,
10 and data from PLC is stored under keys that don't.
11
12 In order to maintain service when the node reboots during a network
13 partition, the database is constantly being dumped to disk.
14 """
15
16 import sys
17
18 import cPickle
19 import threading
20 import time
21
22 import account
23 import logger
24 import tools
25 import bwmon
26
27 # hopefully temporary
28 # is there a good reason to have this done here and not in a plugin ?
29 try:    from coresched_lxc import CoreSched
30 except: from coresched_vs  import CoreSched
31
32 # We enforce minimum allocations to keep the clueless from hosing their slivers.
33 # Disallow disk loans because there's currently no way to punish slivers over quota.
34 MINIMUM_ALLOCATION = {'cpu_pct': 0,
35                       'cpu_share': 1,
36                       'net_min_rate': 0,
37                       'net_max_rate': 8,
38                       'net_i2_min_rate': 0,
39                       'net_i2_max_rate': 8,
40                       'net_share': 1,
41                       }
42 LOANABLE_RESOURCES = MINIMUM_ALLOCATION.keys()
43
44 DB_FILE = '/var/lib/nodemanager/database.pickle'
45
46
47 # database object and associated lock
48 db_lock = threading.RLock()
49 db = None
50
51 # these are used in tandem to request a database dump from the dumper daemon
52 db_cond = threading.Condition(db_lock)
53 dump_requested = False
54
55 # decorator that acquires and releases the database lock before and after the decorated operation
56 # XXX - replace with "with" statements once we switch to 2.5
57 def synchronized(fn):
58     def sync_fn(*args, **kw_args):
59         db_lock.acquire()
60         try: return fn(*args, **kw_args)
61         finally: db_lock.release()
62     sync_fn.__doc__ = fn.__doc__
63     sync_fn.__name__ = fn.__name__
64     return sync_fn
65
66
67 class Database(dict):
68     def __init__(self):
69         self._min_timestamp = 0
70
71     def _compute_effective_rspecs(self):
72         """Calculate the effects of loans and store the result in field _rspec.
73 At the moment, we allow slivers to loan only those resources that they have received directly from PLC.
74 In order to do the accounting, we store three different rspecs:
75  * field 'rspec', which is the resources given by PLC;
76  * field '_rspec', which is the actual amount of resources the sliver has after all loans;
77  * and variable resid_rspec, which is the amount of resources the sliver
78    has after giving out loans but not receiving any."""
79         slivers = {}
80         for name, rec in self.iteritems():
81             if 'rspec' in rec:
82                 rec['_rspec'] = rec['rspec'].copy()
83                 slivers[name] = rec
84         for rec in slivers.itervalues():
85             eff_rspec = rec['_rspec']
86             resid_rspec = rec['rspec'].copy()
87             for target, resource_name, amount in rec.get('_loans', []):
88                 if target in slivers and amount <= resid_rspec[resource_name] - MINIMUM_ALLOCATION[resource_name]:
89                     eff_rspec[resource_name] -= amount
90                     resid_rspec[resource_name] -= amount
91                     slivers[target]['_rspec'][resource_name] += amount
92
93     def deliver_record(self, rec):
94         """A record is simply a dictionary with 'name' and 'timestamp'
95 keys. We keep some persistent private data in the records under keys
96 that start with '_'; thus record updates should not displace such
97 keys."""
98         if rec['timestamp'] < self._min_timestamp: return
99         name = rec['name']
100         old_rec = self.get(name)
101         if old_rec == None: self[name] = rec
102         elif rec['timestamp'] > old_rec['timestamp']:
103             for key in old_rec.keys():
104                 if not key.startswith('_'): del old_rec[key]
105             old_rec.update(rec)
106
107     def set_min_timestamp(self, ts):
108         """The ._min_timestamp member is the timestamp on the last comprehensive update.
109 We use it to determine if a record is stale.
110 This method should be called whenever new GetSlivers() data comes in."""
111         self._min_timestamp = ts
112         for name, rec in self.items():
113             if rec['timestamp'] < ts: del self[name]
114
115     def sync(self):
116         """Synchronize reality with the database contents.  This
117 method does a lot of things, and it's currently called after every
118 single batch of database changes (a GetSlivers(), a loan, a record).
119 It may be necessary in the future to do something smarter."""
120
121         # delete expired records
122         now = time.time()
123         for name, rec in self.items():
124             if rec.get('expires', now) < now: del self[name]
125
126         self._compute_effective_rspecs()
127
128         try:
129             coresched = CoreSched()
130             coresched.adjustCores(self)
131         except:
132             logger.log_exc("database: exception while doing core sched")
133
134         # create and destroy accounts as needed
135         logger.verbose("database: sync : fetching accounts")
136         existing_acct_names = account.all()
137         for name in existing_acct_names:
138             if name not in self:
139                 logger.verbose("database: sync : ensure_destroy'ing %s"%name)
140                 account.get(name).ensure_destroyed()
141         for name, rec in self.iteritems():
142             # protect this; if anything fails for a given sliver
143             # we still need the other ones to be handled
144             try:
145                 sliver = account.get(name)
146                 logger.verbose("database: sync : looping on %s (shell account class from pwd %s)" %(name, sliver._get_class()))
147                 # Make sure we refresh accounts that are running
148                 if rec['instantiation'] == 'plc-instantiated':
149                     logger.verbose ("database: sync : ensure_create'ing 'instantiation' sliver %s"%name)
150                     sliver.ensure_created(rec)
151                 elif rec['instantiation'] == 'nm-controller':
152                     logger.verbose ("database: sync : ensure_create'ing 'nm-controller' sliver %s"%name)
153                     sliver.ensure_created(rec)
154                 # Back door to ensure PLC overrides Ticket in delegation.
155                 elif rec['instantiation'] == 'delegated' and sliver._get_class() != None:
156                     # if the ticket has been delivered and the nm-controller started the slice
157                     # update rspecs and keep them up to date.
158                     if sliver.is_running():
159                         logger.verbose ("database: sync : ensure_create'ing 'delegated' sliver %s"%name)
160                         sliver.ensure_created(rec)
161             except SystemExit as e:
162                 sys.exit(e)
163             except:
164                 logger.log_exc("database: sync failed to handle sliver", name=name)
165
166         # Wake up bwmom to update limits.
167         bwmon.lock.set()
168         global dump_requested
169         dump_requested = True
170         db_cond.notify()
171
172
173 def start():
174     """The database dumper daemon.
175 When it starts up, it populates the database with the last dumped database.
176 It proceeds to handle dump requests forever."""
177     def run():
178         global dump_requested
179         while True:
180             db_lock.acquire()
181             while not dump_requested: db_cond.wait()
182             db_pickle = cPickle.dumps(db, cPickle.HIGHEST_PROTOCOL)
183             dump_requested = False
184             db_lock.release()
185             try:
186                 tools.write_file(DB_FILE, lambda f: f.write(db_pickle))
187                 logger.log_database(db)
188             except:
189                 logger.log_exc("database.start: failed to pickle/dump")
190     global db
191     try:
192         f = open(DB_FILE)
193         try: db = cPickle.load(f)
194         finally: f.close()
195     except IOError:
196         logger.log ("database: Could not load %s -- starting from a fresh database"%DB_FILE)
197         db = Database()
198     except:
199         logger.log_exc("database: failed in start")
200         db = Database()
201     logger.log('database.start')
202     tools.as_daemon_thread(run)