2 """The database houses information on slivers. This information
3 reaches the sliver manager in two different ways: one, through the
4 GetSlivers() call made periodically; two, by users delivering tickets.
5 The sync() method of the Database class turns this data into reality.
7 The database format is a dictionary that maps account names to records
8 (also known as dictionaries). Inside each record, data supplied or
9 computed locally is stored under keys that begin with an underscore,
10 and data from PLC is stored under keys that don't.
12 In order to maintain service when the node reboots during a network
13 partition, the database is constantly being dumped to disk.
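
# A typical record, sketched with hypothetical values; only the keys actually
# referenced in this module are shown.  Keys without a leading underscore come
# from PLC, keys with one are maintained or computed locally:
#
#     db['pl_slice'] = {
#         'name': 'pl_slice',                     # from PLC
#         'timestamp': 1234500000,                # from PLC
#         'expires': 1234586400,                  # from PLC
#         'instantiation': 'plc-instantiated',    # from PLC
#         'rspec': {'cpu_pct': 10},               # from PLC
#         '_rspec': {'cpu_pct': 10},              # computed locally from 'rspec' and loans
#         '_loans': [],                           # maintained locally
#     }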
# is there a good reason to have this done here and not in a plugin ?
try: from coresched_lxc import CoreSched
except: from coresched_vs import CoreSched
# We enforce minimum allocations to keep the clueless from hosing their slivers.
# Disallow disk loans because there's currently no way to punish slivers over quota.
MINIMUM_ALLOCATION = {'cpu_pct': 0,
                      }
LOANABLE_RESOURCES = MINIMUM_ALLOCATION.keys()
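# A loan, as stored under a record's '_loans' key, is a
# (target_account_name, resource_name, amount) triple -- for instance
# ('pl_slice2', 'cpu_pct', 5), with hypothetical names; LOANABLE_RESOURCES
# lists the resource names that may appear in such a loan.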

DB_FILE = '/var/lib/nodemanager/database.pickle'
# database object and associated lock
db_lock = threading.RLock()
db = None

# these are used in tandem to request a database dump from the dumper daemon
db_cond = threading.Condition(db_lock)
dump_requested = False
# decorator that acquires and releases the database lock before and after the decorated operation
# XXX - replace with "with" statements once we switch to 2.5
def synchronized(fn):
    def sync_fn(*args, **kw_args):
        db_lock.acquire()
        try: return fn(*args, **kw_args)
        finally: db_lock.release()
    sync_fn.__doc__ = fn.__doc__
    sync_fn.__name__ = fn.__name__
    return sync_fn
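
# For reference, a minimal sketch of the "with"-based equivalent mentioned in the
# XXX above (lock objects are context managers as of Python 2.5); untested here:
#
#     def synchronized(fn):
#         def sync_fn(*args, **kw_args):
#             with db_lock:
#                 return fn(*args, **kw_args)
#         sync_fn.__doc__ = fn.__doc__
#         sync_fn.__name__ = fn.__name__
#         return sync_fn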

class Database(dict):
    def __init__(self):
        self._min_timestamp = 0
    def _compute_effective_rspecs(self):
        """Calculate the effects of loans and store the result in field _rspec.
        At the moment, we allow slivers to loan only those resources that they have received directly from PLC.
        In order to do the accounting, we store three different rspecs:
        * field 'rspec', which is the resources given by PLC;
        * field '_rspec', which is the actual amount of resources the sliver has after all loans;
        * and variable resid_rspec, which is the amount of resources the sliver
          has after giving out loans but not receiving any."""
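        # Worked example (hypothetical numbers): suppose sliver A has
        # rspec {'cpu_pct': 10} and '_loans' [('B', 'cpu_pct', 4)], with
        # MINIMUM_ALLOCATION['cpu_pct'] == 0.  Then A ends up with _rspec
        # {'cpu_pct': 6} and B's _rspec gains 4.  A loan of 11 would be
        # ignored, since it would take A below its minimum allocation.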
        slivers = {}
        for name, rec in self.iteritems():
            if 'rspec' in rec:
                rec['_rspec'] = rec['rspec'].copy()
                slivers[name] = rec
        for rec in slivers.itervalues():
            eff_rspec = rec['_rspec']
            resid_rspec = rec['rspec'].copy()
            for target, resource_name, amount in rec.get('_loans', []):
                if target in slivers and amount <= resid_rspec[resource_name] - MINIMUM_ALLOCATION[resource_name]:
                    eff_rspec[resource_name] -= amount
                    resid_rspec[resource_name] -= amount
                    slivers[target]['_rspec'][resource_name] += amount
    def deliver_record(self, rec):
        """A record is simply a dictionary with 'name' and 'timestamp'
        keys. We keep some persistent private data in the records under keys
        that start with '_'; thus record updates should not displace such
        data."""
        if rec['timestamp'] < self._min_timestamp: return
        name = rec['name']
        old_rec = self.get(name)
        if old_rec is None: self[name] = rec
        elif rec['timestamp'] > old_rec['timestamp']:
            # keep the locally-maintained '_' keys, replace everything that came from PLC
            for key in old_rec.keys():
                if not key.startswith('_'): del old_rec[key]
            old_rec.update(rec)
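
    # For example (hypothetical values):
    #     db.deliver_record({'name': 'pl_slice', 'timestamp': 2, 'instantiation': 'plc-instantiated'})
    # refreshes every PLC-supplied key of an older 'pl_slice' record, while
    # locally-maintained keys such as '_loans' survive the update untouched.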
    def set_min_timestamp(self, ts):
        """The ._min_timestamp member is the timestamp on the last comprehensive update.
        We use it to determine if a record is stale.
        This method should be called whenever new GetSlivers() data comes in."""
        self._min_timestamp = ts
        for name, rec in self.items():
            if rec['timestamp'] < ts: del self[name]
116 """Synchronize reality with the database contents. This
117 method does a lot of things, and it's currently called after every
118 single batch of database changes (a GetSlivers(), a loan, a record).
119 It may be necessary in the future to do something smarter."""
121 # delete expired records
123 for name, rec in self.items():
124 if rec.get('expires', now) < now: del self[name]
126 self._compute_effective_rspecs()
129 coresched = CoreSched()
130 coresched.adjustCores(self)
132 logger.log_exc("database: exception while doing core sched")
        # create and destroy accounts as needed
        logger.verbose("database: sync : fetching accounts")
        existing_acct_names = account.all()
        for name in existing_acct_names:
            if name not in self:
                logger.verbose("database: sync : ensure_destroy'ing %s"%name)
                account.get(name).ensure_destroyed()
        for name, rec in self.iteritems():
            # protect this; if anything fails for a given sliver
            # we still need the other ones to be handled
            try:
                sliver = account.get(name)
                logger.verbose("database: sync : looping on %s (shell account class from pwd %s)" %(name, sliver._get_class()))
                # Make sure we refresh accounts that are running
                if rec['instantiation'] == 'plc-instantiated':
                    logger.verbose("database: sync : ensure_create'ing 'plc-instantiated' sliver %s"%name)
                    sliver.ensure_created(rec)
                elif rec['instantiation'] == 'nm-controller':
                    logger.verbose("database: sync : ensure_create'ing 'nm-controller' sliver %s"%name)
                    sliver.ensure_created(rec)
                # Back door to ensure PLC overrides Ticket in delegation.
                elif rec['instantiation'] == 'delegated' and sliver._get_class() is not None:
                    # if the ticket has been delivered and the nm-controller started the slice
                    # update rspecs and keep them up to date.
                    if sliver.is_running():
                        logger.verbose("database: sync : ensure_create'ing 'delegated' sliver %s"%name)
                        sliver.ensure_created(rec)
            except SystemExit as e:
                # do not let the bare except below swallow a SystemExit
                raise
            except:
                logger.log_exc("database: sync failed to handle sliver", name=name)
        # Wake up bwmon to update limits.
        bwmon.lock.set()
        global dump_requested
        dump_requested = True
        db_cond.notify()
174 """The database dumper daemon.
175 When it starts up, it populates the database with the last dumped database.
176 It proceeds to handle dump requests forever."""
178 global dump_requested
181 while not dump_requested: db_cond.wait()
182 db_pickle = cPickle.dumps(db, cPickle.HIGHEST_PROTOCOL)
183 dump_requested = False
186 tools.write_file(DB_FILE, lambda f: f.write(db_pickle))
187 logger.log_database(db)
189 logger.log_exc("database.start: failed to pickle/dump")
    global db
    try:
        f = open(DB_FILE)
        try: db = cPickle.load(f)
        finally: f.close()
    except IOError:
        logger.log("database: Could not load %s -- starting from a fresh database"%DB_FILE)
        db = Database()
    except:
        logger.log_exc("database: failed in start")
        db = Database()
    logger.log('database.start')
    tools.as_daemon_thread(run)
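
# A rough sketch of how a caller (e.g. the sliver manager) is expected to drive
# this module; the shape of the incoming GetSlivers() data is hypothetical, but
# the calls below are the ones defined above:
#
#     start()                                   # load the last dump, spawn the dumper thread
#     db_lock.acquire()
#     try:
#         db.set_min_timestamp(data['timestamp'])
#         for sliver in data['slivers']:
#             db.deliver_record(sliver)
#         db.sync()                              # apply loans, create/destroy accounts, request a dump
#     finally:
#         db_lock.release()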