import tools
+# shell path -> account class association
+shell_acct_class = {}
+# account type -> account class association
+type_acct_class = {}
+
+def register_class(acct_class):
+ """Call once for each account class. This method adds the class to the dictionaries used to ook up account classes by shell and type."""
+ shell_acct_class[acct_class.SHELL] = acct_class
+ type_acct_class[acct_class.TYPE] = acct_class
+
+
+# private account name -> worker object association and associated lock
_name_worker_lock = threading.Lock()
_name_worker = {}
def all():
+ """Returns a list of all NM accounts on the system. Format is (type, username)."""
pw_ents = pwd.getpwall()
for pw_ent in pw_ents:
- if pw_ent[6] in acct_class_by_shell:
- yield acct_class_by_shell[pw_ent[6]].TYPE, pw_ent[0]
+ if pw_ent[6] in shell_acct_class:
+ yield shell_acct_class[pw_ent[6]].TYPE, pw_ent[0]
def get(name):
+ """Return the worker object for a particular username. If no such object exists, create it first."""
_name_worker_lock.acquire()
try:
if name not in _name_worker: _name_worker[name] = Worker(name)
tools.fork_as(rec['name'], do_installation)
-TYPES = []
-acct_class_by_shell = {}
-acct_class_by_type = {}
-
-def register_account_type(acct_class):
- TYPES.append(acct_class.TYPE)
- acct_class_by_shell[acct_class.SHELL] = acct_class
- acct_class_by_type[acct_class.TYPE] = acct_class
-
-
class Worker:
# these semaphores are acquired before creating/destroying an account
_create_sem = threading.Semaphore(1)
_destroy_sem = threading.Semaphore(1)
def __init__(self, name):
+ # username
self.name = name
+ # the account object currently associated with this worker
self._acct = None
+ # task list
+ # outsiders request operations by putting (fn, args...) tuples on _q
+ # the worker thread (created below) will perform these operations in order
self._q = Queue.Queue()
tools.as_daemon_thread(self._run)
def ensure_created(self, rec):
+ """Caused the account specified by <rec> to exist if it doesn't already."""
self._q.put((self._ensure_created, tools.deepcopy(rec)))
def _ensure_created(self, rec):
curr_class = self._get_class()
- next_class = acct_class_by_type[rec['account_type']]
+ next_class = type_acct_class[rec['account_type']]
if next_class != curr_class:
self._destroy(curr_class)
self._create_sem.acquire()
def _get_class(self):
try: shell = pwd.getpwnam(self.name)[6]
except KeyError: return None
- return acct_class_by_shell[shell]
+ return shell_acct_class[shell]
def _make_acct_obj(self):
curr_class = self._get_class()
if len(args) != nargs_dict[method_name]:
raise xmlrpclib.Fault(101, 'Invalid argument count: got %d, expecting %d.' % (len(args), expected_nargs))
else:
+ # Figure out who's calling.
# XXX - these ought to be imported directly from some .h file
SO_PEERCRED = 17
sizeof_struct_ucred = 12
class APIServer_UNIX(APIServer_INET): address_family = socket.AF_UNIX
def start():
+ """Start two XMLRPC interfaces: one bound to localhost, the other bound to a Unix domain socket."""
serv1 = APIServer_INET(('127.0.0.1', API_SERVER_PORT),
requestHandler=APIRequestHandler, logRequests=0)
tools.as_daemon_thread(serv1.serve_forever)
import tools
-_db_lock = threading.RLock()
-_db_cond = threading.Condition(_db_lock)
-_dump_requested = False
-
-
-def synchronized(function):
- def sync_fun(*args, **kw_args):
- _db_lock.acquire()
- try: return function(*args, **kw_args)
- finally: _db_lock.release()
- sync_fun.__doc__ = function.__doc__
- sync_fun.__name__ = function.__name__
- return sync_fun
-
-
class Database(dict):
def deliver_records(self, recs):
ts = self.get_timestamp()
bwcap_rec['dirty'] = False
+# database object and associated lock
+_db_lock = threading.RLock()
_db = Database()
+# these are used in tandem to request a database dump from the dumper daemon
+_db_cond = threading.Condition(_db_lock)
+_dump_requested = False
+
+
+# decorator that acquires and releases the database lock before and after the decorated operation
+def synchronized(function):
+ def sync_fun(*args, **kw_args):
+ _db_lock.acquire()
+ try: return function(*args, **kw_args)
+ finally: _db_lock.release()
+ sync_fun.__doc__ = function.__doc__
+ sync_fun.__name__ = function.__name__
+ return sync_fun
+
+# apply the given records to the database and request a dump
@synchronized
def deliver_records(recs):
global _dump_requested
def get_sliver(name): return _db.get('sliver_'+name)
def start():
+ """The database dumper daemon. When it starts up, it populates the database with the last dumped database. It proceeds to handle dump requests forever."""
def run():
global _dump_requested
_db_lock.acquire()
+"""Delegate accounts are used to provide secure access to the XMLRPC API. They are normal Unix accounts with a shell that tunnels XMLRPC requests to the API server."""
+
import accounts
import logger
import tools
class Delegate:
- SHELL = '/bin/forward_api_calls'
+ SHELL = '/bin/forward_api_calls' # tunneling shell
TYPE = 'delegate'
def __init__(self, name): self.name = name
-import fcntl
+"""A very simple logger that tries to be concurrency-safe."""
+
import os
import subprocess
import time
def log(msg):
"""Write <msg> to the log file."""
- # the next three lines ought to be an atomic operation but aren't
fd = os.open(LOG_FILE, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0600)
- flags = fcntl.fcntl(fd, fcntl.F_GETFD)
- fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
if not msg.endswith('\n'): msg += '\n'
os.write(fd, '%s: %s' % (time.asctime(time.gmtime()), msg))
os.close(fd)
try:
if options.daemon: tools.daemon()
- accounts.register_account_type(sliver.Sliver)
- accounts.register_account_type(delegate.Delegate)
+ accounts.register_class(sliver.Sliver)
+ accounts.register_class(delegate.Delegate)
other_pid = tools.pid_file()
if other_pid != None:
logger.log('%s: starting' % self.name)
child_pid = os.fork()
if child_pid == 0:
+ # VServer.start calls fork() internally, so we don't need all of fork_as()
+ tools.close_nonstandard_fds()
vserver.VServer.start(self, True)
os._exit(0)
else: os.waitpid(child_pid, 0)
def as_daemon_thread(run):
+ """Call function <run> with no arguments in its own thread."""
thr = threading.Thread(target=run)
thr.setDaemon(True)
thr.start()
+def close_nonstandard_fds():
+ """Close all open file descriptors other than 0, 1, and 2."""
+ _SC_OPEN_MAX = 4
+ for fd in range(3, os.sysconf(_SC_OPEN_MAX)):
+ try: os.close(fd)
+ except OSError: pass # most likely an fd that isn't open
+
+
# after http://www.erlenstar.demon.co.uk/unix/faq_2.html
def daemon():
"""Daemonize the current process."""
if child_pid == 0:
try:
os.chdir('/')
- # close all nonstandard file descriptors
- _SC_OPEN_MAX = 4
- for fd in range(3, os.sysconf(_SC_OPEN_MAX)):
- try: os.close(fd)
- except OSError: pass # most likely an fd that isn't open
+ close_nonstandard_fds()
pw_ent = pwd.getpwnam(su)
os.setegid(pw_ent[3])
os.seteuid(pw_ent[2])