# On-disk location of the pickled node manager database.
# NOTE(review): loaded with cPickle below -- the file must only ever be
# written by this daemon, since unpickling untrusted data is unsafe.
DB_FILE = '/root/pl_node_mgr_db.pickle'
def __init__(self):
    """Start with an empty name -> record account index."""
    # populated by rebuild_account_index() from account-typed records
    self.account_index = {}
def deliver_records(self, recs):
    """Merge a batch of records into the database.

    Each rec is a dict carrying at least 'record_key' and 'timestamp'.
    A rec is merged only if it is at least as new as both the database
    timestamp and any stored copy of the same record; merged records are
    flagged dirty so the account machinery reprocesses them.
    """
    ts = self.get_timestamp()
    # the listing had lost this loop ('rec' was unbound and 'recs' unused);
    # each delivered record is considered independently
    for rec in recs:
        old_rec = self.setdefault(rec['record_key'], {})
        # ignore stale data: merge only records newer than both the db
        # timestamp and the previously stored version
        if rec['timestamp'] >= max(ts, old_rec.get('timestamp', 0)):
            old_rec.update(rec, dirty=True)
    self.compute_effective_rspecs()
    if self.get_timestamp() > ts:
        # the database timestamp advanced: purge old state and force every
        # surviving record to be reprocessed
        # NOTE(review): delete_old_records/delete_old_accounts are not
        # defined in this listing -- the visible methods are
        # delete_stale_records/delete_expired_records/destroy_old_accounts;
        # confirm the intended targets against the full file
        self.delete_old_records()
        self.delete_old_accounts()
        for rec in self.itervalues(): rec['dirty'] = True
    self.create_new_accounts()
def compute_effective_rspecs(self):
    """Apply loans to field 'rspec' to get field 'eff_rspec'.

    Only records whose account_type is 'sliver.VServer' participate.
    A loan is a (recipient_name, resource, amount) triple; a sliver can
    never lend more of a resource than its own rspec grants it.  Slivers
    whose effective rspec changed are marked dirty.
    """
    # values() instead of py2-only itervalues(): identical iteration in
    # py2 and keeps the method runnable under py3
    slivers = dict([(rec['name'], rec) for rec in self.values()
                    if rec.get('account_type') == 'sliver.VServer'])

    # Pass 1: copy 'rspec' to 'eff_rspec', saving the old value
    for sliver in slivers.values():
        sliver['old_eff_rspec'] = sliver.get('eff_rspec')
        sliver['eff_rspec'] = sliver['rspec'].copy()

    # Pass 2: apply loans, never exceeding the lender's own allocation
    for sliver in slivers.values():
        remaining_loanable_amount = sliver['rspec'].copy()
        for other_name, resource, amount in sliver.get('loans', []):
            # skip loans to unknown slivers and over-committed loans
            if other_name in slivers and \
               0 < amount <= remaining_loanable_amount[resource]:
                sliver['eff_rspec'][resource] -= amount
                remaining_loanable_amount[resource] -= amount
                slivers[other_name]['eff_rspec'][resource] += amount

    # Pass 3: mark changed rspecs dirty
    for sliver in slivers.values():
        if sliver['eff_rspec'] != sliver['old_eff_rspec']:
            # was 'needs_update', but everything else in this file keys off
            # 'dirty' (deliver_records, create_new_accounts, update_bwcap),
            # and this pass's own comment says "mark ... dirty"
            sliver['dirty'] = True
        del sliver['old_eff_rspec']
def rebuild_account_index(self):
    """Rebuild the name -> record index of all account-typed records."""
    index = self.account_index
    index.clear()
    for record in self.itervalues():
        # only records carrying an account_type belong in the index
        if 'account_type' not in record:
            continue
        index[record['name']] = record
def delete_stale_records(self, ts):
    """Delete every record whose timestamp is strictly older than ts."""
    # iterate a snapshot: the original deleted from self while iterating
    # self.items(), which only worked because py2 items() returns a list;
    # list(...) makes the snapshot explicit (and is safe under py3)
    for key, rec in list(self.items()):
        if rec['timestamp'] < ts:
            del self[key]
def delete_expired_records(self):
    """Delete every record whose 'expires' time has passed.

    Records without an 'expires' field never expire.
    """
    # hoisted: one consistent cutoff for the whole sweep instead of
    # re-reading the clock per record
    now = time.time()
    # iterate a snapshot so deleting entries is safe (see delete_stale_records)
    for key, rec in list(self.items()):
        # float('inf') replaces py2-only sys.maxint: same "never expires"
        # behavior for records with no 'expires' field
        if rec.get('expires', float('inf')) < now:
            del self[key]
def destroy_old_accounts(self):
    """Ensure-destroy every system account no longer in the account index."""
    indexed = self.account_index
    for acct_name in accounts.all():
        if acct_name in indexed:
            continue
        # account exists on the node but has no backing record any more
        accounts.get(acct_name).ensure_destroyed()
def create_new_accounts(self):
    """Invoke the appropriate create() function for every dirty account."""
    for acct_rec in self.account_index.itervalues():
        # only dirty records need (re)creation ...
        if not acct_rec['dirty']:
            continue
        # ... and only ones PLC has actually instantiated
        if not acct_rec['plc_instantiated']:
            continue
        accounts.get(acct_rec['name']).ensure_created(acct_rec)
def update_bwcap(self):
    """Push the 'bwcap' record to the bwcap module when dirty, then clean it."""
    rec = self.get('bwcap')
    # nothing to do when there is no bwcap record at all
    if not rec:
        return
    if rec['dirty']:
        bwcap.update(rec)
        rec['dirty'] = False
# database object and associated lock
_db_lock = threading.RLock()
# NOTE(review): the shared database instance (_db, used below) is created on
# lines missing from this listing.
# these are used in tandem to request a database dump from the dumper daemon:
# a writer sets _dump_requested and notifies _db_cond while holding _db_lock;
# the dumper thread waits on _db_cond until the flag is set
_db_cond = threading.Condition(_db_lock)
_dump_requested = False
# decorator that acquires and releases the database lock before and after the decorated operation
def synchronized(function):
    """Return a wrapper that runs function while holding _db_lock."""
    def sync_fun(*args, **kw_args):
        # the listing had lost this acquire -- without it the finally
        # clause would release a lock that was never taken
        _db_lock.acquire()
        try: return function(*args, **kw_args)
        finally: _db_lock.release()
    # preserve the wrapped function's metadata by hand (py2-era functools.wraps
    # equivalent, matching the file's existing style)
    sync_fun.__doc__ = function.__doc__
    sync_fun.__name__ = function.__name__
    # the listing had lost this return -- a decorator that returns None
    # would replace every decorated function with None
    return sync_fun
# apply the given records to the database and request a dump
def deliver_records(recs):
    """Merge recs into the shared database and wake the dumper daemon.

    Runs while holding _db_lock so the flag update and the notify are
    atomic with respect to the dump loop.
    """
    global _dump_requested
    _db_lock.acquire()
    try:
        _db.deliver_records(recs)
        _dump_requested = True
        # without this notify the dumper would block in _db_cond.wait()
        # forever and the requested dump would never be written
        _db_cond.notify()
    finally:
        _db_lock.release()
# NOTE(review): this listing is missing the enclosing 'def' header of the
# dumper-daemon entry point, the inner thread function's header, the
# '_db_lock.acquire()' / 'try:' / file-open lines, and the lock
# release/reacquire around the on-disk write -- the lines below are the
# visible remainder, documented in place.
"""The database dumper daemon. When it starts up, it populates the database with the last dumped database. It proceeds to handle dump requests forever."""
global _dump_requested
# seed the in-memory database from the last on-disk dump
# (cPickle.load of DB_FILE -- trusted, self-written file only)
_db.update(cPickle.load(f))
# best-effort load: a missing or corrupt dump file is logged and ignored
# (bare except is deliberate here -- startup must not die on a bad dump)
except: logger.log_exc()
while True: # handle dump requests forever
# sleep until a writer sets the flag and notifies _db_cond
while not _dump_requested: _db_cond.wait()
# snapshot the db so the pickle can be written outside the lock
db_copy = tools.deepcopy(_db)
_dump_requested = False
# write the snapshot with the highest pickle protocol (-1);
# failures are logged, not fatal
try: tools.write_file(DB_FILE, lambda f: cPickle.dump(db_copy, f, -1))
except: logger.log_exc()
# run the dump loop on a daemon thread so it cannot block process exit
tools.as_daemon_thread(run)