+"""Functionality common to all account classes.
+
+Each account class must provide five methods: create(), destroy(),
+configure(), start(), and stop(). In addition, it must provide static
+member variables SHELL, which contains the unique shell that it uses;
+and TYPE, which contains a description of the type that it uses. TYPE
+is divided hierarchically by periods; at the moment the only
+convention is that all sliver accounts have type that begins with
+sliver.
+
+Because Python does dynamic method lookup, we do not bother with a
+boilerplate abstract superclass.
+
+There are any number of race conditions that may result from the fact
+that account names are not unique over time. Moreover, it's a bad
+idea to perform lengthy operations while holding the database lock.
+In order to deal with both of these problems, we use a worker thread
+for each account name that ever exists. On 32-bit systems with large
+numbers of accounts, this may cause the NM process to run out of
+*virtual* memory! This problem may be remedied by decreasing the
+maximum stack size.
+"""
+
import Queue
import os
import pwd
type_acct_class = {}
def register_class(acct_class):
- """Call once for each account class. This method adds the class to the dictionaries used to ook up account classes by shell and type."""
+ """Call once for each account class. This method adds the class to the dictionaries used to look up account classes by shell and type."""
shell_acct_class[acct_class.SHELL] = acct_class
type_acct_class[acct_class.TYPE] = acct_class
_name_worker = {}
def all():
- """Returns a list of all NM accounts on the system. Format is (type, username)."""
- pw_ents = pwd.getpwall()
- for pw_ent in pw_ents:
- if pw_ent[6] in shell_acct_class:
- yield shell_acct_class[pw_ent[6]].TYPE, pw_ent[0]
+ """Return the names of all accounts on the system with recognized shells."""
+ return [pw_ent[0] for pw_ent in pwd.getpwall() if pw_ent[6] in shell_acct_class]
def get(name):
"""Return the worker object for a particular username. If no such object exists, create it first."""
finally: _name_worker_lock.release()
-def install_ssh_keys(rec):
- """Write <rec['ssh_keys']> to <rec['name']>'s authorized_keys file."""
- dot_ssh = '/home/%s/.ssh' % rec['name']
+def install_keys(rec):
+ """Write <rec['keys']> to <rec['name']>'s authorized_keys file."""
+ name = rec['name']
+ dot_ssh = '/home/%s/.ssh' % name
def do_installation():
if not os.access(dot_ssh, os.F_OK): os.mkdir(dot_ssh)
- tools.write_file(dot_ssh + '/authorized_keys',
- lambda thefile: thefile.write(rec['ssh_keys']))
- logger.log('%s: installing ssh keys' % rec['name'])
- tools.fork_as(rec['name'], do_installation)
+ tools.write_file(dot_ssh + '/authorized_keys', lambda thefile: thefile.write(rec['keys']))
+ logger.log('%s: installing ssh keys' % name)
+ tools.fork_as(name, do_installation)
class Worker:
_destroy_sem = threading.Semaphore(1)
def __init__(self, name):
- # username
- self.name = name
- # the account object currently associated with this worker
- self._acct = None
+ self.name = name # username
+ self._acct = None # the account object currently associated with this worker
# task list
# outsiders request operations by putting (fn, args...) tuples on _q
# the worker thread (created below) will perform these operations in order
tools.as_daemon_thread(self._run)
def ensure_created(self, rec):
- """Caused the account specified by <rec> to exist if it doesn't already."""
- self._q.put((self._ensure_created, tools.deepcopy(rec)))
+ """Cause the account specified by <rec> to exist if it doesn't already."""
+ self._q.put((self._ensure_created, rec.copy()))
def _ensure_created(self, rec):
curr_class = self._get_class()
def _make_acct_obj(self):
curr_class = self._get_class()
- if not isinstance(self._acct, curr_class):
- self._acct = curr_class(self.name)
+ if not isinstance(self._acct, curr_class): self._acct = curr_class(self.name)
def _run(self):
while True:
import threading
import xmlrpclib
-from config import *
import accounts
import database
import logger
import tools
+API_SERVER_PORT = 812
+
+
api_method_dict = {}
nargs_dict = {}
+++ /dev/null
-"""Global parameters and configuration."""
-
-try:
- from bwlimit import bwmin, bwmax
-
- DEFAULT_RSPEC = {'nm_cpu_share': 32, 'nm_cpu_guaranteed_share': 0,
- 'nm_disk_quota': 5000000,
- 'nm_enabled': 1,
- 'nm_net_min_rate': bwmin, 'nm_net_max_rate': bwmax,
- 'nm_net_exempt_min_rate': bwmin,
- 'nm_net_exempt_max_rate': bwmax,
- 'nm_net_share': 1}
-except ImportError: pass
-
-API_SERVER_PORT = 812
-
-DB_FILE = '/root/pl_node_mgr_db.pickle'
-
-KEY_FILE = '/home/deisenst/nm/key.pem'
-
-LOANABLE_RESOURCES = set(['nm_cpu_share', 'nm_cpu_guaranteed_share',
- 'nm_net_max_rate', 'nm_net_exempt_max_rate',
- 'nm_net_share'])
-
-LOG_FILE = '/var/log/pl_node_mgr.log'
-
-PID_FILE = '/var/run/pl_node_mgr.pid'
-
-SA_HOSTNAME = 'plc-a.demo.vmware'
-
-START_DELAY_SECS = 10
-
-TICKET_SERVER_PORT = 1813
import threading
import time
-from config import DB_FILE
import accounts
import bwcap
import logger
import tools
+DB_FILE = '/root/pl_node_mgr_db.pickle'
+
+
class Database(dict):
+ def __init__(self): self.account_index = {}
+
def deliver_records(self, recs):
ts = self.get_timestamp()
for rec in recs:
self.create_new_accounts()
self.update_bwcap()
- def get_timestamp(self):
- return self.get('timestamp', {'timestamp': 0})['timestamp']
-
-
def compute_effective_rspecs(self):
"""Apply loans to field 'rspec' to get field 'eff_rspec'."""
slivers = dict([(rec['name'], rec) for rec in self.itervalues() \
- if rec.get('account_type') == 'sliver'])
+ if rec.get('account_type') == 'sliver.VServer'])
# Pass 1: copy 'rspec' to 'eff_rspec', saving the old value
for sliver in slivers.itervalues():
sliver['needs_update'] = True
del sliver['old_eff_rspec']
+ def rebuild_account_index(self):
+ self.account_index.clear()
+ for rec in self.itervalues():
+ if 'account_type' in rec: self.account_index[rec['name']] = rec
- def delete_old_records(self):
- ts = self.get_timestamp()
- now = time.time()
- for key in self.keys():
- rec = self[key]
- if rec['timestamp'] < ts or rec.get('expiry', sys.maxint) < now:
- del self[key]
+ def delete_stale_records(self, ts):
+ for key, rec in self.items():
+ if rec['timestamp'] < ts: del self[key]
+
+    def delete_expired_records(self):
+        """Discard every record whose 'expires' field is in the past."""
+        now = time.time()  # snapshot once so all records are judged against the same instant
+        for key, rec in self.items():
+            if rec.get('expires', sys.maxint) < now: del self[key]
- def delete_old_accounts(self):
- for acct_type, name in accounts.all():
- if ('%s_%s' % (acct_type, name)) not in self:
- accounts.get(name).ensure_destroyed()
+ def destroy_old_accounts(self):
+ for name in accounts.all():
+ if name not in self.account_index: accounts.get(name).ensure_destroyed()
def create_new_accounts(self):
"""Invoke the appropriate create() function for every dirty account."""
- for rec in self.itervalues():
- if 'account_type' not in rec: continue
- if rec['dirty'] and rec['plc_instantiated']:
- accounts.get(rec['name']).ensure_created(rec)
+ for rec in self.account_index.itervalues():
+ if rec['dirty'] and rec['plc_instantiated']: accounts.get(rec['name']).ensure_created(rec)
rec['dirty'] = False
def update_bwcap(self):
_dump_requested = True
_db_cond.notify()
-@synchronized
-def get_sliver(name): return _db.get('sliver_'+name)
-
def start():
"""The database dumper daemon. When it starts up, it populates the database with the last dumped database. It proceeds to handle dump requests forever."""
def run():
f.close()
except: logger.log_exc()
while True: # handle dump requests forever
- while not _dump_requested:
- _db_cond.wait()
+ while not _dump_requested: _db_cond.wait()
db_copy = tools.deepcopy(_db)
_dump_requested = False
_db_lock.release()
- try: tools.write_file(DB_FILE,
- lambda f: cPickle.dump(db_copy, f, -1))
+ try: tools.write_file(DB_FILE, lambda f: cPickle.dump(db_copy, f, -1))
except: logger.log_exc()
_db_lock.acquire()
tools.as_daemon_thread(run)
@staticmethod
def create(name):
add_shell(Delegate.SHELL)
- logger.log_call('/usr/sbin/useradd',
- '-p', '*', '-s', Delegate.SHELL, name)
+ logger.log_call('/usr/sbin/useradd', '-p', '*', '-s', Delegate.SHELL, name)
@staticmethod
def destroy(name): logger.log_call('/usr/sbin/userdel', '-r', name)
- def configure(self, rec): accounts.install_ssh_keys(rec)
+ def configure(self, rec): accounts.install_keys(rec)
def start(self): pass
def stop(self): pass
import time
import traceback
-from config import LOG_FILE
+
+LOG_FILE = '/var/log/pl_node_mgr.log'
def log(msg):
import delegate
import logger
import plc
-import sliver
+import sliver_vs
import tools
try:
if options.daemon: tools.daemon()
- accounts.register_class(sliver.Sliver)
+ accounts.register_class(sliver_vs.Sliver_VS)
accounts.register_class(delegate.Delegate)
other_pid = tools.pid_file()
+++ /dev/null
-"""Parse slices.xml. This file will become obsolete when the new API comes online."""
-
-import base64
-import sys
-sys.path.append('/usr/local/planetlab/bin')
-import SslFetch
-import time
-import xml.parsers.expat
-
-from config import *
-import database
-import logger
-
-
-_worker = SslFetch.Worker(SA_HOSTNAME, cacert_file='/usr/boot/cacert.pem')
-
-def fetch(filename):
- logger.log('fetching %s' % filename)
- (rc, data) = _worker.fetch(filename)
- if rc == 0:
- logger.log('fetch succeeded')
- return data
- else:
- # XXX - should get a better error message from SslFetch/libcurl
- curl_doc = 'http://curl.haxx.se/libcurl/c/libcurl-errors.html'
- raise 'fetch failed, rc=%d (see %s)' % (rc, curl_doc)
-
-
-delegate_key = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEAzNQIrVC9ZV9iDgu5/WXxcH/SyGdLG45CWXoWWh37UNA4dCVVlxtQ96xF7poolnxnM1irKUiXx85FsjA37z6m7IWl1h9uMYEJEvYkkxApsCmwm8C02m/BsOWK4Zjh4sv7QTeDgDnqhwnBw/U4jnkt8yKfVTBTNUY01dESzOgBfBc= root@yankee.cs.princeton.edu'
-
-def fetch_and_update():
- sx = slices_xml(fetch('/xml/slices-0.5.xml'))
- # sx = slices_xml(open('/root/slices-0.5.xml').read())
- recs = [{'record_key': 'timestamp', 'type': 'timestamp', 'timestamp': time.time()}]
- recs.append({'record_key': 'delegate_del_snoop', 'timestamp': time.time(), 'account_type': 'delegate', 'name': 'del_snoop', 'ssh_keys': delegate_key, 'plc_instantiated': True})
- recs.append({'record_key': 'bwcap', 'timestamp': time.time(), 'cap': 5000000000, 'exempt_ips': ['127.0.0.1']})
- for id, name in sx.id_name.iteritems():
- rec = {}
- rec['record_key'] = 'sliver_' + name
- rec['account_type'] = 'sliver'
- rec['name'] = name
- rec['expiry'] = sx.id_expiry[id]
- rec['timestamp'] = sx.id_ts.get(id) or time.time()
- rec['delegations'] = [('del_snoop', 'GetRSpec')]
- rec['id'] = id
- rec['rspec'] = sx.get_rspec(id)
- ssh_keys = []
- for uid in sx.id_uids[id]: ssh_keys.extend(sx.uid_keys[uid])
- rec['ssh_keys'] = '\n'.join(ssh_keys)
- rec['plc_instantiated'] = True
- rec['initscript'] = base64.b64encode('#!/bin/sh\n/bin/echo hello >/world.txt')
- recs.append(rec)
- database.deliver_records(recs)
-
-
-node_id = None
-
-def get_node_id():
- global node_id
- if node_id == None:
- filename = '/etc/planetlab/node_id'
- logger.log('reading node id from %s' % filename)
- id_file = open(filename)
- node_id = int(id_file.readline())
- id_file.close()
- return node_id
-
-
-class slices_xml:
- def __init__(self, data):
- self.node_id = get_node_id()
- self.id_name = {}
- self.id_expiry = {}
- self.id_uids = {}
- self.uid_keys = {}
- self.id_rspec = {}
- self.id_ts = {}
- parser = xml.parsers.expat.ParserCreate()
- parser.StartElementHandler = self._start_element
- parser.CharacterDataHandler = self._char_data
- isfinal = True
- parser.Parse(data, isfinal)
-
- def get_rspec(self, id):
- rspec = DEFAULT_RSPEC.copy()
- rspec.update(self.id_rspec[id])
- return rspec
-
- def _start_element(self, name, attrs):
- self.last_tag = name
- if name == u'slice':
- self.id = int(attrs[u'id'])
- self.name = str(attrs[u'name'])
- self.expiry = int(attrs[u'expiry'])
- elif name == u'timestamp':
- self.id_ts[self.id] = int(attrs[u'value'])
- elif name == u'node':
- # remember slices with slivers on us
- nid = int(attrs[u'id'])
- if nid == self.node_id:
- self.id_name[self.id] = self.name
- self.id_expiry[self.id] = self.expiry
- self.id_uids[self.id] = []
- self.id_rspec[self.id] = {}
- elif name == u'user':
- # remember users with slices with slivers on us
- if self.id in self.id_name:
- uid = int(attrs[u'person_id'])
- self.id_uids[self.id].append(uid)
- self.uid_keys[uid] = []
- elif name == u'resource':
- self.rname = str(attrs[u'name'])
- elif name == u'key':
- # remember keys of users with slices with slivers on us
- uid = int(attrs[u'person_id'])
- if uid in self.uid_keys:
- self.uid_keys[uid].append(str(attrs[u'value']))
-
- def _char_data(self, data):
- if self.last_tag == u'value' and self.id in self.id_name:
- try: self.id_rspec[self.id][self.rname] = int(data)
- except ValueError: pass
- self.last_tag = u''
+++ /dev/null
-import base64
-import errno
-import os
-import vserver
-
-from config import DEFAULT_RSPEC
-import accounts
-import logger
-import tools
-
-
-class Sliver(vserver.VServer):
- """This class wraps vserver.VServer to make its interface closer to what we need for the Node Manager."""
-
- SHELL = '/bin/vsh'
- TYPE = 'sliver'
-
- def __init__(self, name):
- vserver.VServer.__init__(self, name, vm_running=True)
- self.disk_limit_has_been_set = False
- self.rspec = DEFAULT_RSPEC.copy()
- self.ssh_keys = None
- self.initscript = ''
-
- @staticmethod
- def create(name): logger.log_call('/usr/sbin/vuseradd', name)
-
- @staticmethod
- def destroy(name): logger.log_call('/usr/sbin/vuserdel', name)
-
- def configure(self, rec):
- self.rspec.update(rec['eff_rspec'])
- self.set_resources()
- if rec['ssh_keys'] != self.ssh_keys:
- accounts.install_ssh_keys(rec)
- self.ssh_keys = rec['ssh_keys']
- if rec['initscript'] != self.initscript:
- logger.log('%s: installing initscript' % self.name)
- def install_initscript():
- flags = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
- fd = os.open('/etc/rc.vinit', flags, 0755)
- os.write(fd, base64.b64decode(rec['initscript']))
- os.close(fd)
- try: self.chroot_call(install_initscript)
- except OSError, e:
- if e.errno != errno.EEXIST: logger.log_exc()
- self.initscript = rec['initscript']
-
- def start(self):
- if self.rspec['nm_enabled']:
- logger.log('%s: starting' % self.name)
- child_pid = os.fork()
- if child_pid == 0:
- # VServer.start calls fork() internally, so we don't need all of fork_as()
- tools.close_nonstandard_fds()
- vserver.VServer.start(self, True)
- os._exit(0)
- else: os.waitpid(child_pid, 0)
- else: logger.log('%s: not starting, is not enabled' % self.name)
-
- def stop(self):
- logger.log('%s: stopping' % self.name)
- vserver.VServer.stop(self)
- # make sure we always make the syscalls when setting resource limits
- self.vm_running = True
-
- def set_resources(self):
- """Set the resource limits of sliver <self.name>."""
- # disk limits
- disk_max_KiB = self.rspec['nm_disk_quota']
- logger.log('%s: setting max disk usage to %d KiB' %
- (self.name, disk_max_KiB))
- try: # don't let slivers over quota escape other limits
- if not self.disk_limit_has_been_set:
- self.vm_running = False
- logger.log('%s: computing disk usage' % self.name)
- self.init_disk_info()
- # even if set_disklimit() triggers an exception,
- # the kernel probably knows the disk usage
- self.disk_limit_has_been_set = True
- vserver.VServer.set_disklimit(self, disk_max_KiB)
- self.vm_running = True
- except OSError: logger.log_exc()
-
- # bw limits
- bw_fields = ['nm_net_min_rate', 'nm_net_max_rate',
- 'nm_net_exempt_min_rate', 'nm_net_exempt_max_rate',
- 'nm_net_share']
- args = tuple(map(self.rspec.__getitem__, bw_fields))
- logger.log('%s: setting bw share to %d' % (self.name, args[-1]))
- logger.log('%s: setting bw limits to %s bps' % (self.name, args[:-1]))
- self.set_bwlimit(*args)
-
- # cpu limits / remote login
- cpu_guaranteed_shares = self.rspec['nm_cpu_guaranteed_share']
- cpu_shares = self.rspec['nm_cpu_share']
- if self.rspec['nm_enabled']:
- if cpu_guaranteed_shares > 0:
- logger.log('%s: setting cpu share to %d%% guaranteed' %
- (self.name, cpu_guaranteed_shares/10.0))
- self.set_sched_config(cpu_guaranteed_shares,
- vserver.SCHED_CPU_GUARANTEED)
- else:
- logger.log('%s: setting cpu share to %d' %
- (self.name, cpu_shares))
- self.set_sched_config(cpu_shares, 0)
- else:
- # tell vsh to disable remote login by setting CPULIMIT to 0
- logger.log('%s: disabling remote login' % self.name)
- self.set_sched_config(0, 0)
- self.stop()
--- /dev/null
+"""VServer slivers.
+
+There are a couple of tricky things going on here. First, the kernel
+needs disk usage information in order to enforce the quota. However,
+determining disk usage redundantly strains the disks. Thus, the
+Sliver_VS.disk_usage_initialized flag is used to determine whether
+this initialization has been made.
+
+Second, it's not currently possible to set the scheduler parameters
+for a sliver unless that sliver has a running process. /bin/vsh helps
+us out by reading the configuration file so that it can set the
+appropriate limits after entering the sliver context. Making the
+syscall that actually sets the parameters gives a harmless error if no
+process is running. Thus we keep vm_running on when setting scheduler
+parameters so that set_sched_params() always makes the syscall, and we
+don't have to guess if there is a running process or not.
+"""
+
+import errno
+import os
+import vserver
+
+from bwlimit import bwmin, bwmax
+import accounts
+import logger
+import tools
+
+
+DEFAULTS = {'disk_max': 5000000,
+ 'net_min': bwmin,
+ 'net_max': bwmax,
+ 'net2_min': bwmin,
+ 'net2_max': bwmax,
+ 'net_share': 1,
+ 'enabled': 1,
+ 'cpu_min': 0,
+ 'cpu_share': 32,
+ 'keys': '',
+ 'initscript': ''}
+
+class Sliver_VS(vserver.VServer):
+ """This class wraps vserver.VServer to make its interface closer to what we need for the Node Manager."""
+
+ SHELL = '/bin/vsh'
+ TYPE = 'sliver.VServer'
+
+ def __init__(self, name):
+ vserver.VServer.__init__(self, name)
+ self.current_keys = ''
+ self.current_initscript = ''
+ self.disk_usage_initialized = False
+ self.rec = DEFAULTS.copy()
+
+
+ @staticmethod
+ def create(name): logger.log_call('/usr/sbin/vuseradd', name)
+
+ @staticmethod
+ def destroy(name): logger.log_call('/usr/sbin/vuserdel', name)
+
+
+    def configure(self, rec):
+        """Merge <rec> over DEFAULTS into self.rec, then apply resource limits,
+        ssh keys, and the initscript (each only when it has actually changed)."""
+        self.rec = DEFAULTS.copy()
+        self.rec.update(rec)
+
+        self.set_resources()
+
+        new_keys = self.rec['keys']
+        if new_keys != self.current_keys:
+            # pass the merged record so install_keys() is guaranteed to find 'keys'
+            accounts.install_keys(self.rec)
+            self.current_keys = new_keys
+
+        new_initscript = self.rec['initscript']
+        if new_initscript != self.current_initscript:
+            logger.log('%s: installing initscript' % self.name)
+            def install_initscript():
+                flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
+                fd = os.open('/etc/rc.vinit', flags, 0755)
+                os.write(fd, new_initscript)
+                os.close(fd)
+            # best effort: log failures but keep configuring;
+            # 'except Exception' (not bare) so SystemExit/KeyboardInterrupt propagate
+            try: self.chroot_call(install_initscript)
+            except Exception: logger.log_exc()
+            self.current_initscript = new_initscript
+
+
+ def start(self):
+ if self.rec['enabled']:
+ logger.log('%s: starting' % self.name)
+ child_pid = os.fork()
+ if child_pid == 0:
+ # VServer.start calls fork() internally, so just close the nonstandard fds and fork once to avoid creating zombies
+ tools.close_nonstandard_fds()
+ vserver.VServer.start(self, True)
+ os._exit(0)
+ else: os.waitpid(child_pid, 0)
+ else: logger.log('%s: not starting, is not enabled' % self.name)
+
+ def stop(self):
+ logger.log('%s: stopping' % self.name)
+ vserver.VServer.stop(self)
+
+
+    def set_resources(self):
+        """Apply the disk, bandwidth, and CPU limits in self.rec to this sliver."""
+        disk_max = int(self.rec['disk_max'])
+        logger.log('%s: setting max disk usage to %d KiB' % (self.name, disk_max))
+        try:  # if the sliver is over quota, set_disklimit() will throw an exception
+            if not self.disk_usage_initialized:
+                self.vm_running = False
+                logger.log('%s: computing disk usage' % self.name)
+                self.init_disk_info()
+                self.disk_usage_initialized = True
+            # BUGFIX: was 'disk_max_KiB', an undefined name -- the local is 'disk_max'
+            vserver.VServer.set_disklimit(self, disk_max)
+        except OSError: logger.log_exc()
+
+        net_limits = (int(self.rec['net_min']),
+                      int(self.rec['net_max']),
+                      int(self.rec['net2_min']),
+                      int(self.rec['net2_max']),
+                      int(self.rec['net_share']))
+        logger.log('%s: setting net limits to %s bps' % (self.name, net_limits[:-1]))
+        logger.log('%s: setting net share to %d' % (self.name, net_limits[-1]))
+        self.set_bwlimit(*net_limits)
+
+        cpu_min = int(self.rec['cpu_min'])
+        cpu_share = int(self.rec['cpu_share'])
+        if bool(self.rec['enabled']):
+            if cpu_min > 0:
+                logger.log('%s: setting cpu share to %d%% guaranteed' % (self.name, cpu_min/10.0))
+                self.set_sched_config(cpu_min, vserver.SCHED_CPU_GUARANTEED)
+            else:
+                logger.log('%s: setting cpu share to %d' % (self.name, cpu_share))
+                self.set_sched_config(cpu_share, 0)
+        else:
+            # tell vsh to disable remote login by setting CPULIMIT to 0
+            logger.log('%s: disabling remote login' % self.name)
+            self.set_sched_config(0, 0)
+            self.stop()
import tempfile
import threading
-from config import PID_FILE
import logger
+PID_FILE = '/var/run/pl_node_mgr.pid'
+
+
def as_daemon_thread(run):
"""Call function <run> with no arguments in its own thread."""
thr = threading.Thread(target=run)
def pid_file():
"""We use a pid file to ensure that only one copy of NM is running at a given time. If successful, this function will write a pid file containing the pid of the current process. The return value is the pid of the other running process, or None otherwise."""
other_pid = None
- # check for a pid file
- if os.access(PID_FILE, os.F_OK):
- # pid file exists, read it
- handle = open(PID_FILE)
+ if os.access(PID_FILE, os.F_OK): # check for a pid file
+ handle = open(PID_FILE) # pid file exists, read it
other_pid = int(handle.read())
handle.close()
# check for a process with that pid by sending signal 0