...
[nodemanager.git] / database.py
1 import cPickle
2 import threading
3 import time
4
5 try: from bwlimit import bwmin, bwmax
6 except ImportError: bwmin, bwmax = 8, 1000000000
7 import accounts
8 import logger
9 import tools
10
11
12 DB_FILE = '/root/node_mgr_db.pickle'
13
14 LOANABLE_RESOURCES = ['cpu_min', 'cpu_share', 'net_min', 'net_max', 'net2_min', 'net2_max', 'net_share', 'disk_max']
15
16 DEFAULT_ALLOCATIONS = {'enabled': 1, 'cpu_min': 0, 'cpu_share': 32, 'net_min': bwmin, 'net_max': bwmax, 'net2_min': bwmin, 'net2_max': bwmax, 'net_share': 1, 'disk_max': 5000000}
17
18
19 # database object and associated lock
20 db_lock = threading.RLock()
21 db = None
22
23 # these are used in tandem to request a database dump from the dumper daemon
24 db_cond = threading.Condition(db_lock)
25 dump_requested = False
26
27 # decorator that acquires and releases the database lock before and after the decorated operation
28 def synchronized(fn):
29     def sync_fn(*args, **kw_args):
30         db_lock.acquire()
31         try: return fn(*args, **kw_args)
32         finally: db_lock.release()
33     sync_fn.__doc__ = fn.__doc__
34     sync_fn.__name__ = fn.__name__
35     return sync_fn
36
37
38 class Database(dict):
39     def __init__(self):
40         self._min_timestamp = 0
41
42     def _compute_effective_rspecs(self):
43         """Calculate the effects of loans and store the result in field _rspec.  At the moment, we allow slivers to loan only those resources that they have received directly from PLC.  In order to do the accounting, we store three different rspecs: field 'rspec', which is the resources given by PLC; field '_rspec', which is the actual amount of resources the sliver has after all loans; and variable resid_rspec, which is the amount of resources the sliver has after giving out loans but not receiving any."""
44         slivers = {}
45         for name, rec in self.iteritems():
46             if 'rspec' in rec:
47                 rec['_rspec'] = rec['rspec'].copy()
48                 slivers[name] = rec
49         for rec in slivers.itervalues():
50             eff_rspec = rec['_rspec']
51             resid_rspec = rec['rspec'].copy()
52             for target, resname, amt in rec.get('_loans', []):
53                 if target in slivers and amt < resid_rspec[resname]:
54                     eff_rspec[resname] -= amt
55                     resid_rspec[resname] -= amt
56                     slivers[target]['_rspec'][resname] += amt
57
58     def deliver_record(self, rec):
59         """A record is simply a dictionary with 'name' and 'timestamp' keys.  We keep some persistent private data in the records under keys that start with '_'; thus record updates should not displace such keys."""
60         name = rec['name']
61         old_rec = self.get(name)
62         if old_rec != None and rec['timestamp'] > old_rec['timestamp']:
63             for key in old_rec.keys():
64                 if not key.startswith('_'): del old_rec[key]
65             old_rec.update(rec)
66         elif rec['timestamp'] >= self._min_timestamp: self[name] = rec
67
68     def set_min_timestamp(self, ts):
69         self._min_timestamp = ts
70         for name, rec in self.items():
71             if rec['timestamp'] < ts: del self[name]
72
73     def sync(self):
74         # delete expired records
75         now = time.time()
76         for name, rec in self.items():
77             if rec.get('expires', now) < now: del self[name]
78
79         self._compute_effective_rspecs()
80
81         # create and destroy accounts as needed
82         existing_acct_names = accounts.all()
83         for name in existing_acct_names:
84             if name not in self: accounts.get(name).ensure_destroyed()
85         for name, rec in self.iteritems():
86             if rec['instantiation'] == 'plc-instantiated': accounts.get(name).ensure_created(rec)
87
88         # request a database dump
89         global dump_requested
90         dump_requested = True
91         db_cond.notify()
92
93
94 @synchronized
95 def GetSlivers_callback(data):
96     for d in data:
97         for sliver in d['slivers']:
98             rec = sliver.copy()
99             attr_dict = {}
100             for attr in rec.pop('attributes'): attr_dict[attr['name']] = attr_dict[attr['value']]
101             keys = rec.pop('keys')
102             rec['keys'] = '\n'.join([key_struct['key'] for key_struct in keys])
103             rspec = {}
104             rec['rspec'] = rspec
105             for resname, default_amt in DEFAULT_ALLOCATIONS.iteritems():
106                 try: amt = int(attr_dict[resname])
107                 except (KeyError, ValueError): amt = default_amt
108                 rspec[resname] = amt
109         db.set_min_timestamp(d['timestamp'])
110     db.sync()
111
112
113 def start():
114     """The database dumper daemon.  When it starts up, it populates the database with the last dumped database.  It proceeds to handle dump requests forever."""
115     def run():
116         global dump_requested
117         while True:
118             db_lock.acquire()
119             while not dump_requested: db_cond.wait()
120             db_copy = tools.deepcopy(db)
121             dump_requested = False
122             db_lock.release()
123             try: tools.write_file(DB_FILE, lambda f: cPickle.dump(db_copy, f, -1))
124             except: logger.log_exc()
125     global db
126     try:
127         f = open(DB_FILE)
128         try: db = cPickle.load(f)
129         finally: f.close()
130     except:
131         logger.log_exc()
132         db = Database()
133     tools.as_daemon_thread(run)