import sys
import time

from config import DB_FILE
def deliver_records(self, recs):
    """Merge a batch of incoming records into the database.

    Each record in <recs> is folded into the stored record that shares
    its 'record_key' (an empty one is created if needed), but only when
    it is at least as new as both the current database epoch and the
    stored record's own timestamp.  Merged records are marked dirty.
    If the delivery advanced the epoch, stale records and accounts are
    purged and every record is re-marked dirty so accounts get refreshed.
    """
    ts = self.get_timestamp()
    # BUG FIX: the loop binding <rec> was missing here, leaving <rec>
    # unbound in the body below.
    for rec in recs:
        old_rec = self.setdefault(rec['record_key'], {})
        if rec['timestamp'] >= max(ts, old_rec.get('timestamp', 0)):
            # update() with a keyword also sets old_rec['dirty'] = True
            old_rec.update(rec, dirty=True)
    self.compute_effective_rspecs()
    if self.get_timestamp() > ts:
        # the epoch advanced: flush everything from the old epoch
        self.delete_old_records()
        self.delete_old_accounts()
        for rec in self.values():
            rec['dirty'] = True
    self.create_new_accounts()
def get_timestamp(self):
    """Return the database epoch stamp, or 0 if none has been recorded."""
    if 'timestamp' in self:
        return self['timestamp']['timestamp']
    return 0
def compute_effective_rspecs(self):
    """Apply loans to field 'rspec' to get field 'eff_rspec'.

    Pass 1 snapshots each sliver's previous 'eff_rspec' and resets it to
    a copy of 'rspec'.  Pass 2 transfers loaned amounts between slivers,
    never letting a sliver lend more of a resource than its own 'rspec'
    grants.  Pass 3 flags slivers whose effective rspec changed.
    """
    # index sliver records by name (generator instead of dict([...]) —
    # same result, no throwaway list; and values() works on Py2 and Py3,
    # where itervalues() no longer exists)
    slivers = dict((rec['name'], rec) for rec in self.values()
                   if rec.get('account_type') == 'sliver')

    # Pass 1: copy 'rspec' to 'eff_rspec', saving the old value
    for sliver in slivers.values():
        sliver['old_eff_rspec'] = sliver.get('eff_rspec')
        sliver['eff_rspec'] = sliver['rspec'].copy()

    # Pass 2: apply loans; bogus loans (unknown recipient, non-positive
    # or over-budget amount) are silently skipped
    for sliver in slivers.values():
        remaining_loanable_amount = sliver['rspec'].copy()
        for other_name, resource, amount in sliver.get('loans', []):
            if other_name in slivers and \
               0 < amount <= remaining_loanable_amount[resource]:
                sliver['eff_rspec'][resource] -= amount
                remaining_loanable_amount[resource] -= amount
                slivers[other_name]['eff_rspec'][resource] += amount

    # Pass 3: mark changed rspecs dirty; the temporary 'old_eff_rspec'
    # must be removed unconditionally or unchanged records would keep it
    for sliver in slivers.values():
        if sliver['eff_rspec'] != sliver['old_eff_rspec']:
            sliver['needs_update'] = True
        del sliver['old_eff_rspec']
def delete_old_records(self):
    """Remove records stamped with an earlier epoch or past their expiry.

    Records without an 'expiry' field never expire (sys.maxint sentinel;
    note this module is Python 2 — sys.maxint was removed in Python 3).
    """
    ts = self.get_timestamp()
    # BUG FIX: <now> and <rec> were unbound and the matching records were
    # never actually deleted (missing lines restored).
    now = time.time()
    # snapshot the keys because we delete entries while iterating
    for key in list(self.keys()):
        rec = self[key]
        if rec['timestamp'] < ts or rec.get('expiry', sys.maxint) < now:
            del self[key]
def delete_old_accounts(self):
    """Destroy every known account that no longer has a database record."""
    for acct_type, acct_name in accounts.all():
        record_key = '%s_%s' % (acct_type, acct_name)
        if record_key not in self:
            accounts.get(acct_name).ensure_destroyed()
def create_new_accounts(self):
    """Invoke the appropriate create() function for every dirty account."""
    for rec in self.values():
        # skip records that do not describe an account at all
        if 'account_type' not in rec:
            continue
        if rec['dirty'] and rec['plc_instantiated']:
            accounts.get(rec['name']).ensure_created(rec)
def update_bwcap(self):
    """Push the bandwidth-cap record out via the bwcap module if it is dirty."""
    rec = self.get('bwcap')
    # nothing to do when there is no (truthy) record or it is clean
    if not rec or not rec['dirty']:
        return
    bwcap.update(rec)
    rec['dirty'] = False
# database object and associated lock
_db_lock = threading.RLock()

# these are used in tandem to request a database dump from the dumper daemon:
# a writer sets _dump_requested under the lock (see deliver_records below);
# presumably the dumper sleeps on _db_cond until then — the wait() call is
# not visible in this chunk, TODO confirm
_db_cond = threading.Condition(_db_lock)
_dump_requested = False
# decorator that acquires and releases the database lock before and after the decorated operation
def synchronized(function):
    """Return a wrapper that runs <function> while holding _db_lock."""
    def sync_fun(*args, **kw_args):
        # BUG FIX: the lock was released in the finally clause but never
        # acquired first, which would raise on every call.
        _db_lock.acquire()
        try:
            return function(*args, **kw_args)
        finally:
            _db_lock.release()
    sync_fun.__doc__ = function.__doc__
    sync_fun.__name__ = function.__name__
    # BUG FIX: the wrapper was never returned, so decorated functions
    # silently became None.
    return sync_fun
# apply the given records to the database and request a dump
def deliver_records(recs):
    """Merge <recs> into the shared database and wake the dumper daemon."""
    global _dump_requested
    # BUG FIX: the shared database was mutated without holding _db_lock,
    # and _db_cond was never notified, so the dumper could sleep forever.
    # notify() also requires holding the condition's lock.
    _db_lock.acquire()
    try:
        _db.deliver_records(recs)
        _dump_requested = True
        _db_cond.notify()
    finally:
        _db_lock.release()
def get_sliver(name):
    """Look up the database record for sliver <name> (None if absent)."""
    return _db.get('sliver_' + name)
118 """The database dumper daemon. When it starts up, it populates the database with the last dumped database. It proceeds to handle dump requests forever."""
120 global _dump_requested
124 _db.update(cPickle.load(f))
126 except: logger.log_exc()
127 while True: # handle dump requests forever
128 while not _dump_requested:
130 db_copy = tools.deepcopy(_db)
131 _dump_requested = False
133 try: tools.write_file(DB_FILE,
134 lambda f: cPickle.dump(db_copy, f, -1))
135 except: logger.log_exc()
137 tools.as_daemon_thread(run)