nodemanager.git / database.py — commit message: "More documentation."
1 import cPickle
2 import sys
3 import threading
4 import time
5
6 from config import DB_FILE
7 import accounts
8 import bwcap
9 import logger
10 import tools
11
12
class Database(dict):
    """In-memory store of node state records, keyed by 'record_key'.

    Each value is a record dict as delivered from PLC; a special
    'timestamp' entry records when the last complete snapshot arrived.
    All methods assume the caller already holds the module-level
    database lock (_db_lock).
    """

    def deliver_records(self, recs):
        """Merge the list of record dicts *recs* into the database.

        A delivered record is accepted only if it is at least as new as
        both the stored record and the database-wide timestamp; accepted
        records are marked dirty.  If the batch advanced the database
        timestamp (a full snapshot arrived), stale records and their
        accounts are purged and every record is re-marked dirty so all
        accounts get refreshed.  Finally, dirty accounts are created and
        the bandwidth cap is pushed out.
        """
        ts = self.get_timestamp()
        for rec in recs:
            # reuse the existing stored dict so locally-computed fields
            # (e.g. eff_rspec) survive partial updates
            old_rec = self.setdefault(rec['record_key'], {})
            if rec['timestamp'] >= max(ts, old_rec.get('timestamp', 0)):
                # update(..., dirty=True) also sets old_rec['dirty'] = True
                old_rec.update(rec, dirty=True)
        self.compute_effective_rspecs()
        if self.get_timestamp() > ts:
            # a full snapshot arrived: drop anything it didn't mention
            self.delete_old_records()
            self.delete_old_accounts()
            for rec in self.itervalues(): rec['dirty'] = True
        self.create_new_accounts()
        self.update_bwcap()

    def get_timestamp(self):
        # time of the last complete update; 0 if none has arrived yet
        return self.get('timestamp', {'timestamp': 0})['timestamp']


    def compute_effective_rspecs(self):
        """Apply loans to field 'rspec' to get field 'eff_rspec'."""
        # index sliver records by name so loans can look up their target
        slivers = dict([(rec['name'], rec) for rec in self.itervalues() \
                        if rec.get('account_type') == 'sliver'])

        # Pass 1: copy 'rspec' to 'eff_rspec', saving the old value
        for sliver in slivers.itervalues():
            sliver['old_eff_rspec'] = sliver.get('eff_rspec')
            sliver['eff_rspec'] = sliver['rspec'].copy()

        # Pass 2: apply loans
        for sliver in slivers.itervalues():
            # a sliver may lend at most what its own rspec grants it
            remaining_loanable_amount = sliver['rspec'].copy()
            for other_name, resource, amount in sliver.get('loans', []):
                # skip loans to unknown slivers and non-positive or
                # over-budget amounts
                if other_name in slivers and \
                       0 < amount <= remaining_loanable_amount[resource]:
                    sliver['eff_rspec'][resource] -= amount
                    remaining_loanable_amount[resource] -= amount
                    slivers[other_name]['eff_rspec'][resource] += amount

        # Pass 3: mark changed rspecs dirty
        for sliver in slivers.itervalues():
            if sliver['eff_rspec'] != sliver['old_eff_rspec']:
                sliver['needs_update'] = True
            del sliver['old_eff_rspec']


    def delete_old_records(self):
        """Remove records older than the db timestamp or past expiry."""
        ts = self.get_timestamp()
        now = time.time()
        # self.keys() snapshots the key list (Python 2), so deleting
        # entries inside the loop is safe
        for key in self.keys():
            rec = self[key]
            if rec['timestamp'] < ts or rec.get('expiry', sys.maxint) < now:
                del self[key]

    def delete_old_accounts(self):
        """Destroy every on-node account that no longer has a record."""
        for acct_type, name in accounts.all():
            if ('%s_%s' % (acct_type, name)) not in self:
                accounts.get(name).ensure_destroyed()

    def create_new_accounts(self):
        """Invoke the appropriate create() function for every dirty account."""
        for rec in self.itervalues():
            if 'account_type' not in rec: continue
            # only materialize accounts that PLC says should exist
            if rec['dirty'] and rec['plc_instantiated']:
                accounts.get(rec['name']).ensure_created(rec)
            rec['dirty'] = False

    def update_bwcap(self):
        """Push the bandwidth cap record to bwcap if it changed."""
        bwcap_rec = self.get('bwcap')
        if bwcap_rec and bwcap_rec['dirty']:
            bwcap.update(bwcap_rec)
            bwcap_rec['dirty'] = False
85
86
# database object and associated lock; hold _db_lock for any access to _db
_db_lock = threading.RLock()
_db = Database()
# these are used in tandem to request a database dump from the dumper daemon:
# deliver_records() sets the flag and notifies; start()'s thread waits on it
_db_cond = threading.Condition(_db_lock)
_dump_requested = False
93
94
95 # decorator that acquires and releases the database lock before and after the decorated operation
def synchronized(function):
    """Return a wrapper that runs *function* while holding _db_lock."""
    def wrapper(*args, **kw_args):
        _db_lock.acquire()
        try:
            return function(*args, **kw_args)
        finally:
            _db_lock.release()
    # carry over the wrapped function's metadata by hand
    wrapper.__name__ = function.__name__
    wrapper.__doc__ = function.__doc__
    return wrapper
104
105
106 # apply the given records to the database and request a dump
@synchronized
def deliver_records(recs):
    """Merge *recs* into the shared database and request a disk dump."""
    global _dump_requested
    _db.deliver_records(recs)
    # flag must be set before notify so the dumper thread's predicate
    # (in start()) sees the request when it wakes
    _dump_requested = True
    _db_cond.notify()
113
@synchronized
def get_sliver(name):
    """Return the database record for sliver *name*, or None if absent."""
    key = 'sliver_' + name
    return _db.get(key)
116
def start():
    """The database dumper daemon.  When it starts up, it populates the database with the last dumped database.  It proceeds to handle dump requests forever."""
    def run():
        global _dump_requested
        _db_lock.acquire()
        try:  # load the db
            # NOTE(review): opened in text mode, but cPickle.dump below
            # uses protocol -1 (binary); 'rb' would be safer on
            # non-POSIX platforms -- confirm
            f = open(DB_FILE)
            _db.update(cPickle.load(f))
            f.close()
        # best-effort: a missing or corrupt dump file just means we
        # start with an empty database
        except: logger.log_exc()
        while True:  # handle dump requests forever
            while not _dump_requested:
                _db_cond.wait()
            # snapshot under the lock, then release it so writers are
            # not blocked for the duration of the (slow) disk write
            db_copy = tools.deepcopy(_db)
            _dump_requested = False
            _db_lock.release()
            try: tools.write_file(DB_FILE,
                                  lambda f: cPickle.dump(db_copy, f, -1))
            except: logger.log_exc()
            _db_lock.acquire()
    tools.as_daemon_thread(run)