First version. Most definitely a work in progress.
[nodemanager.git] / database.py
1 import cPickle
2 import sys
3 import threading
4 import time
5
6 from config import DB_FILE
7 import accounts
8 import bwcap
9 import logger
10 import tools
11
12
13 _db_lock = threading.RLock()
14 _db_cond = threading.Condition(_db_lock)
15 _dump_requested = False
16
17
18 def synchronized(function):
19     def sync_fun(*args, **kw_args):
20         _db_lock.acquire()
21         try: return function(*args, **kw_args)
22         finally: _db_lock.release()
23     sync_fun.__doc__ = function.__doc__
24     sync_fun.__name__ = function.__name__
25     return sync_fun
26
27
28 class Database(dict):
29     def deliver_records(self, recs):
30         ts = self.get_timestamp()
31         for rec in recs:
32             old_rec = self.setdefault(rec['record_key'], {})
33             if rec['timestamp'] >= max(ts, old_rec.get('timestamp', 0)):
34                 old_rec.update(rec, dirty=True)
35         self.compute_effective_rspecs()
36         if self.get_timestamp() > ts:
37             self.delete_old_records()
38             self.delete_old_accounts()
39             for rec in self.itervalues(): rec['dirty'] = True
40         self.create_new_accounts()
41         self.update_bwcap()
42
43     def get_timestamp(self):
44         return self.get('timestamp', {'timestamp': 0})['timestamp']
45
46
47     def compute_effective_rspecs(self):
48         """Apply loans to field 'rspec' to get field 'eff_rspec'."""
49         slivers = dict([(rec['name'], rec) for rec in self.itervalues() \
50                         if rec.get('account_type') == 'sliver'])
51
52         # Pass 1: copy 'rspec' to 'eff_rspec', saving the old value
53         for sliver in slivers.itervalues():
54             sliver['old_eff_rspec'] = sliver.get('eff_rspec')
55             sliver['eff_rspec'] = sliver['rspec'].copy()
56
57         # Pass 2: apply loans
58         for sliver in slivers.itervalues():
59             remaining_loanable_amount = sliver['rspec'].copy()
60             for other_name, resource, amount in sliver.get('loans', []):
61                 if other_name in slivers and \
62                        0 < amount <= remaining_loanable_amount[resource]:
63                     sliver['eff_rspec'][resource] -= amount
64                     remaining_loanable_amount[resource] -= amount
65                     slivers[other_name]['eff_rspec'][resource] += amount
66
67         # Pass 3: mark changed rspecs dirty
68         for sliver in slivers.itervalues():
69             if sliver['eff_rspec'] != sliver['old_eff_rspec']:
70                 sliver['needs_update'] = True
71             del sliver['old_eff_rspec']
72
73
74     def delete_old_records(self):
75         ts = self.get_timestamp()
76         now = time.time()
77         for key in self.keys():
78             rec = self[key]
79             if rec['timestamp'] < ts or rec.get('expiry', sys.maxint) < now:
80                 del self[key]
81
82     def delete_old_accounts(self):
83         for acct_type, name in accounts.all():
84             if ('%s_%s' % (acct_type, name)) not in self:
85                 accounts.get(name).ensure_destroyed()
86
87     def create_new_accounts(self):
88         """Invoke the appropriate create() function for every dirty account."""
89         for rec in self.itervalues():
90             if 'account_type' not in rec: continue
91             if rec['dirty'] and rec['plc_instantiated']:
92                 accounts.get(rec['name']).ensure_created(rec)
93             rec['dirty'] = False
94
95     def update_bwcap(self):
96         bwcap_rec = self.get('bwcap')
97         if bwcap_rec and bwcap_rec['dirty']:
98             bwcap.update(bwcap_rec)
99             bwcap_rec['dirty'] = False
100
101
102 _db = Database()
103
104 @synchronized
105 def deliver_records(recs):
106     global _dump_requested
107     _db.deliver_records(recs)
108     _dump_requested = True
109     _db_cond.notify()
110
111 @synchronized
112 def get_sliver(name): return _db.get('sliver_'+name)
113
114 def start():
115     def run():
116         global _dump_requested
117         _db_lock.acquire()
118         try:  # load the db
119             f = open(DB_FILE)
120             _db.update(cPickle.load(f))
121             f.close()
122         except: logger.log_exc()
123         while True:  # handle dump requests forever
124             while not _dump_requested:
125                 _db_cond.wait()
126             db_copy = tools.deepcopy(_db)
127             _dump_requested = False
128             _db_lock.release()
129             try: tools.write_file(DB_FILE,
130                                   lambda f: cPickle.dump(db_copy, f, -1))
131             except: logger.log_exc()
132             _db_lock.acquire()
133     tools.as_daemon_thread(run)