[nodemanager.git] / database.py
import cPickle
import sys
import threading
import time

import accounts
import bwcap
import logger
import tools


DB_FILE = '/root/pl_node_mgr_db.pickle'


class Database(dict):
    """A dict of records keyed by 'record_key', plus account_index, which
    maps account names to account records."""
    def __init__(self): self.account_index = {}

    def deliver_records(self, recs):
        """Merge a list of record dicts into the database.  Each rec is
        assumed (from its usage here) to carry at least 'record_key' and
        'timestamp' fields."""
        ts = self.get_timestamp()
        for rec in recs:
            old_rec = self.setdefault(rec['record_key'], {})
            if rec['timestamp'] >= max(ts, old_rec.get('timestamp', 0)):
                old_rec.update(rec, dirty=True)
        self.compute_effective_rspecs()
        if self.get_timestamp() > ts:
            # a newer timestamp record means PLC sent a complete snapshot,
            # so purge anything older and reconfigure every account
            self.delete_stale_records(self.get_timestamp())
            self.delete_expired_records()
            self.rebuild_account_index()
            self.destroy_old_accounts()
            for rec in self.itervalues(): rec['dirty'] = True
        else: self.rebuild_account_index()
        self.create_new_accounts()
        self.update_bwcap()

    def get_timestamp(self):
        # the sentinel 'timestamp' record is assumed to carry the latest
        # PLC timestamp; before the first delivery it defaults to 0
        return self.get('timestamp', {'timestamp': 0})['timestamp']

    def compute_effective_rspecs(self):
        """Apply loans to field 'rspec' to get field 'eff_rspec'."""
        slivers = dict([(rec['name'], rec) for rec in self.itervalues()
                        if rec.get('account_type') == 'sliver.VServer'])

        # Pass 1: copy 'rspec' to 'eff_rspec', saving the old value
        for sliver in slivers.itervalues():
            sliver['old_eff_rspec'] = sliver.get('eff_rspec')
            sliver['eff_rspec'] = sliver['rspec'].copy()

        # Pass 2: apply loans, debiting each lender and crediting each recipient
        for sliver in slivers.itervalues():
            remaining_loanable_amount = sliver['rspec'].copy()
            for other_name, resource, amount in sliver.get('loans', []):
                if other_name in slivers and \
                       0 < amount <= remaining_loanable_amount[resource]:
                    sliver['eff_rspec'][resource] -= amount
                    remaining_loanable_amount[resource] -= amount
                    slivers[other_name]['eff_rspec'][resource] += amount

        # Pass 3: flag slivers whose effective rspec changed
        for sliver in slivers.itervalues():
            if sliver['eff_rspec'] != sliver['old_eff_rspec']:
                sliver['needs_update'] = True
            del sliver['old_eff_rspec']

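    # A worked example of the loan passes (illustrative values, not a real
    # rspec): if sliver 'a' has rspec {'cpu_share': 10} with loans
    # [('b', 'cpu_share', 3)] and sliver 'b' exists, then afterwards a's
    # eff_rspec['cpu_share'] == 7 and b's is credited by 3.  A loan is
    # ignored if the recipient is unknown or the amount exceeds what the
    # lender has left to loan.
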
    def rebuild_account_index(self):
        """Reindex account records (anything with an 'account_type') by name."""
        self.account_index.clear()
        for rec in self.itervalues():
            if 'account_type' in rec: self.account_index[rec['name']] = rec

    def delete_stale_records(self, ts):
        """Delete records last updated before timestamp <ts>."""
        for key, rec in self.items():
            if rec.get('timestamp', 0) < ts: del self[key]

    def delete_expired_records(self):
        """Delete records whose 'expires' time has passed."""
        for key, rec in self.items():
            if rec.get('expires', sys.maxint) < time.time(): del self[key]

    def destroy_old_accounts(self):
        """Destroy every system account that no longer has a database record."""
        for name in accounts.all():
            if name not in self.account_index: accounts.get(name).ensure_destroyed()

    def create_new_accounts(self):
        """Invoke the appropriate create() function for every dirty account."""
        for rec in self.account_index.itervalues():
            if rec['dirty'] and rec['plc_instantiated']: accounts.get(rec['name']).ensure_created(rec)
            rec['dirty'] = False

    def update_bwcap(self):
        """Push the bandwidth cap record to the bwcap module when it changes."""
        bwcap_rec = self.get('bwcap')
        if bwcap_rec and bwcap_rec['dirty']:
            bwcap.update(bwcap_rec)
            bwcap_rec['dirty'] = False

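# Record life cycle, as implemented above: deliver_records() marks a record
# dirty whenever it changes (and marks everything dirty after a complete
# snapshot); create_new_accounts() and update_bwcap() act on dirty records
# and clear the flag.  compute_effective_rspecs() separately sets
# 'needs_update' on slivers whose effective rspec changed.
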
# database object and associated lock
_db_lock = threading.RLock()
_db = Database()
# this condition variable and flag are used in tandem to request a database dump from the dumper daemon
_db_cond = threading.Condition(_db_lock)
_dump_requested = False


# decorator that acquires and releases the database lock before and after the decorated operation
def synchronized(function):
    def sync_fun(*args, **kw_args):
        _db_lock.acquire()
        try: return function(*args, **kw_args)
        finally: _db_lock.release()
    sync_fun.__doc__ = function.__doc__
    sync_fun.__name__ = function.__name__
    return sync_fun
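
# Example of wrapping another accessor (hypothetical; only deliver_records
# below actually exists in this module):
#
#   @synchronized
#   def get_record(key):
#       return tools.deepcopy(_db.get(key))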


# apply the given records to the database and request a dump
@synchronized
def deliver_records(recs):
    global _dump_requested
    _db.deliver_records(recs)
    _dump_requested = True
    _db_cond.notify()

def start():
    """The database dumper daemon.  When it starts up, it populates the
    database with the last dumped database.  It proceeds to handle dump
    requests forever."""
    def run():
        global _dump_requested
        _db_lock.acquire()
        try:  # load the db
            f = open(DB_FILE, 'rb')  # binary mode: the dump is a binary pickle
            _db.update(cPickle.load(f))
            f.close()
        except: logger.log_exc()
        while True:  # handle dump requests forever
            while not _dump_requested: _db_cond.wait()
            # copy the db and drop the lock before the slow pickle/write,
            # so record delivery is never blocked on disk I/O
            db_copy = tools.deepcopy(_db)
            _dump_requested = False
            _db_lock.release()
            try: tools.write_file(DB_FILE, lambda f: cPickle.dump(db_copy, f, -1))
            except: logger.log_exc()
            _db_lock.acquire()
    tools.as_daemon_thread(run)
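
# Typical usage (a sketch; the record contents are illustrative, not the real
# PLC schema):
#
#   import time, database
#   database.start()  # resurrect the last dump, then serve dump requests
#   now = time.time()
#   database.deliver_records([{'record_key': 'timestamp', 'timestamp': now}])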