# Bandwidth bounds come from the bwlimit module when available; otherwise
# fall back to hard-coded limits (presumably bits/s: 8 .. 1e9 — TODO confirm).
5 try: from bwlimit import bwmin, bwmax
6 except ImportError: bwmin, bwmax = 8, 1000000000
# On-disk pickle snapshot written by the dumper daemon and reloaded at start-up.
12 DB_FILE = '/root/node_mgr_db.pickle'
# Resource names a sliver is allowed to loan to other slivers.
14 LOANABLE_RESOURCES = ['cpu_min', 'cpu_share', 'net_min', 'net_max', 'net2_min', 'net2_max', 'net_share', 'disk_max']
# Fallback per-sliver allocations, used when a sliver's attributes do not
# carry a (parseable) value for a resource — see GetSlivers_callback.
16 DEFAULT_ALLOCATIONS = {'enabled': 1, 'cpu_min': 0, 'cpu_share': 32, 'net_min': bwmin, 'net_max': bwmax, 'net2_min': bwmin, 'net2_max': bwmax, 'net_share': 1, 'disk_max': 5000000}
19 # database object and associated lock
# Re-entrant lock guarding all access to the database object.
20 db_lock = threading.RLock()
23 # these are used in tandem to request a database dump from the dumper daemon
# db_cond shares db_lock, so waiters atomically release the database lock;
# dump_requested is the predicate the dumper daemon waits on.
24 db_cond = threading.Condition(db_lock)
25 dump_requested = False
27 # decorator that acquires and releases the database lock before and after the decorated operation
# NOTE(review): the enclosing decorator definition (and, judging by the gap
# in the numbering, the matching db_lock.acquire() call) is missing from
# this excerpt; only the wrapper body survives.  `fn` is the decorated
# callable bound by the missing outer def.
29 def sync_fn(*args, **kw_args):
# The lock is presumably acquired on the missing line just above; the
# finally clause guarantees it is released even if fn raises.
31 try: return fn(*args, **kw_args)
32 finally: db_lock.release()
# Preserve the wrapped function's metadata (pre-functools.wraps idiom).
33 sync_fn.__doc__ = fn.__doc__
34 sync_fn.__name__ = fn.__name__
# NOTE(review): stray line from the (missing) __init__ — 0 means "accept
# records of any age" until set_min_timestamp() raises the floor.
40 self._min_timestamp = 0
42 def _compute_effective_rspecs(self):
43 """Calculate the effects of loans and store the result in field _rspec. At the moment, we allow slivers to loan only those resources that they have received directly from PLC. In order to do the accounting, we store three different rspecs: field 'rspec', which is the resources given by PLC; field '_rspec', which is the actual amount of resources the sliver has after all loans; and variable resid_rspec, which is the amount of resources the sliver has after giving out loans but not receiving any."""
# NOTE(review): the lines that build the `slivers` dict (presumably mapping
# sliver name -> record for records that carry an 'rspec') are missing from
# this excerpt; the loops below depend on it.
45 for name, rec in self.iteritems():
# Start each sliver from exactly what PLC granted; loans adjust the copy.
47 rec['_rspec'] = rec['rspec'].copy()
49 for rec in slivers.itervalues():
50 eff_rspec = rec['_rspec']
51 resid_rspec = rec['rspec'].copy()
# Each loan is a (recipient sliver name, resource name, amount) triple.
52 for target, resname, amt in rec.get('_loans', []):
# Honour a loan only if the recipient exists and the donor still has
# strictly more than `amt` of the resource left after earlier loans — a
# sliver can never loan itself down to zero.
53 if target in slivers and amt < resid_rspec[resname]:
54 eff_rspec[resname] -= amt
55 resid_rspec[resname] -= amt
56 slivers[target]['_rspec'][resname] += amt
58 def deliver_record(self, rec):
59 """A record is simply a dictionary with 'name' and 'timestamp' keys. We keep some persistent private data in the records under keys that start with '_'; thus record updates should not displace such keys."""
# NOTE(review): the line binding `name` (presumably name = rec['name']) is
# missing from this excerpt.
61 old_rec = self.get(name)
# Newer record for an existing name: strip the old record's non-private
# keys (keys() is a list under Python 2, so deleting during the loop is
# safe) so that only '_'-prefixed persistent data survives.
62 if old_rec != None and rec['timestamp'] > old_rec['timestamp']:
63 for key in old_rec.keys():
64 if not key.startswith('_'): del old_rec[key]
# NOTE(review): the line that merges rec into old_rec (old_rec.update(rec)?)
# appears to be missing here (numbering gap at 65).
# Otherwise accept the record if it clears the acceptance floor set by
# set_min_timestamp().  NOTE(review): when old_rec exists but rec is NOT
# newer, this branch still replaces the stored record (dropping its '_'
# keys) — confirm that is intended.
66 elif rec['timestamp'] >= self._min_timestamp: self[name] = rec
def set_min_timestamp(self, ts):
    """Raise the record-acceptance floor to *ts* and purge records older than it.

    After this call, deliver_record() will refuse any new record whose
    'timestamp' is below *ts*, and every record already stored with an
    older timestamp is deleted.
    """
    self._min_timestamp = ts
    # Snapshot the items before deleting: mutating a dict while iterating a
    # live view raises RuntimeError on Python 3 (on Python 2, items() was
    # already a list, so behavior is unchanged).
    for name, rec in list(self.items()):
        if rec['timestamp'] < ts:
            del self[name]
# NOTE(review): interior of the (missing) sync() method — its `def` line and
# the binding of `now` (presumably now = time.time()) are not in this excerpt.
74 # delete expired records
# Records without an 'expires' key default to `now` and are therefore kept
# (now < now is false).
76 for name, rec in self.items():
77 if rec.get('expires', now) < now: del self[name]
# Recompute every sliver's effective rspec after loans.
79 self._compute_effective_rspecs()
81 # create and destroy accounts as needed
82 existing_acct_names = accounts.all()
# Tear down any on-node account that no longer has a database record.
83 for name in existing_acct_names:
84 if name not in self: accounts.get(name).ensure_destroyed()
# Only PLC-instantiated slivers get their accounts created automatically.
85 for name, rec in self.iteritems():
86 if rec['instantiation'] == 'plc-instantiated': accounts.get(name).ensure_created(rec)
88 # request a database dump
# NOTE(review): the lines that set dump_requested and notify db_cond are
# missing from this excerpt.
95 def GetSlivers_callback(data):
97 for sliver in d['slivers']:
100 for attr in rec.pop('attributes'): attr_dict[attr['name']] = attr_dict[attr['value']]
101 keys = rec.pop('keys')
102 rec['keys'] = '\n'.join([key_struct['key'] for key_struct in keys])
105 for resname, default_amt in DEFAULT_ALLOCATIONS.iteritems():
106 try: amt = int(attr_dict[resname])
107 except (KeyError, ValueError): amt = default_amt
109 db.set_min_timestamp(d['timestamp'])
114 """The database dumper daemon. When it starts up, it populates the database with the last dumped database. It proceeds to handle dump requests forever."""
# NOTE(review): the enclosing `def run():`, its `while True` loop and the
# db_lock acquire/release scaffolding are missing from this excerpt.
116 global dump_requested
# Standard condition-variable wait loop: re-check the predicate after every
# wakeup to tolerate spurious or stale notifications.
119 while not dump_requested: db_cond.wait()
# Deep-copy while still holding the lock so the pickle below (presumably
# performed outside the lock — the release line is missing) sees a stable
# snapshot.
120 db_copy = tools.deepcopy(db)
121 dump_requested = False
# Best-effort dump: -1 selects the highest pickle protocol.  HACK: bare
# except — any failure is logged and swallowed so the daemon keeps running.
123 try: tools.write_file(DB_FILE, lambda f: cPickle.dump(db_copy, f, -1))
124 except: logger.log_exc()
# NOTE(review): fragment of the start-up routine — reload the last dumped
# snapshot, then launch the dumper daemon; the surrounding def, file-open
# and except lines are missing from this excerpt.
# SECURITY: cPickle.load executes arbitrary code from a tampered file —
# acceptable here only because DB_FILE lives under /root.
128 try: db = cPickle.load(f)
133 tools.as_daemon_thread(run)