From d5f8b952ecb43f3c1308026645ef7cac73de8a05 Mon Sep 17 00:00:00 2001 From: "David E. Eisenstat" Date: Thu, 26 Oct 2006 19:32:20 +0000 Subject: [PATCH] More documentation. --- accounts.py | 38 ++++++++++++++++++++++++-------------- api.py | 2 ++ database.py | 34 +++++++++++++++++++--------------- delegate.py | 4 +++- logger.py | 6 ++---- nm.py | 4 ++-- sliver.py | 2 ++ tools.py | 15 ++++++++++----- 8 files changed, 64 insertions(+), 41 deletions(-) diff --git a/accounts.py b/accounts.py index b22a4ba..e476977 100644 --- a/accounts.py +++ b/accounts.py @@ -7,16 +7,30 @@ import logger import tools +# shell path -> account class association +shell_acct_class = {} +# account type -> account class association +type_acct_class = {} + +def register_class(acct_class): + """Call once for each account class. This method adds the class to the dictionaries used to ook up account classes by shell and type.""" + shell_acct_class[acct_class.SHELL] = acct_class + type_acct_class[acct_class.TYPE] = acct_class + + +# private account name -> worker object association and associated lock _name_worker_lock = threading.Lock() _name_worker = {} def all(): + """Returns a list of all NM accounts on the system. Format is (type, username).""" pw_ents = pwd.getpwall() for pw_ent in pw_ents: - if pw_ent[6] in acct_class_by_shell: - yield acct_class_by_shell[pw_ent[6]].TYPE, pw_ent[0] + if pw_ent[6] in shell_acct_class: + yield shell_acct_class[pw_ent[6]].TYPE, pw_ent[0] def get(name): + """Return the worker object for a particular username. If no such object exists, create it first.""" _name_worker_lock.acquire() try: if name not in _name_worker: _name_worker[name] = Worker(name) @@ -35,33 +49,29 @@ def install_ssh_keys(rec): tools.fork_as(rec['name'], do_installation) -TYPES = [] -acct_class_by_shell = {} -acct_class_by_type = {} - -def register_account_type(acct_class): - TYPES.append(acct_class.TYPE) - acct_class_by_shell[acct_class.SHELL] = acct_class - acct_class_by_type[acct_class.TYPE] = acct_class - - class Worker: # these semaphores are acquired before creating/destroying an account _create_sem = threading.Semaphore(1) _destroy_sem = threading.Semaphore(1) def __init__(self, name): + # username self.name = name + # the account object currently associated with this worker self._acct = None + # task list + # outsiders request operations by putting (fn, args...) tuples on _q + # the worker thread (created below) will perform these operations in order self._q = Queue.Queue() tools.as_daemon_thread(self._run) def ensure_created(self, rec): + """Caused the account specified by to exist if it doesn't already.""" self._q.put((self._ensure_created, tools.deepcopy(rec))) def _ensure_created(self, rec): curr_class = self._get_class() - next_class = acct_class_by_type[rec['account_type']] + next_class = type_acct_class[rec['account_type']] if next_class != curr_class: self._destroy(curr_class) self._create_sem.acquire() @@ -94,7 +104,7 @@ class Worker: def _get_class(self): try: shell = pwd.getpwnam(self.name)[6] except KeyError: return None - return acct_class_by_shell[shell] + return shell_acct_class[shell] def _make_acct_obj(self): curr_class = self._get_class() diff --git a/api.py b/api.py index 73f38e7..8762f70 100644 --- a/api.py +++ b/api.py @@ -112,6 +112,7 @@ class APIRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): if len(args) != nargs_dict[method_name]: raise xmlrpclib.Fault(101, 'Invalid argument count: got %d, expecting %d.' % (len(args), expected_nargs)) else: + # Figure out who's calling. # XXX - these ought to be imported directly from some .h file SO_PEERCRED = 17 sizeof_struct_ucred = 12 @@ -143,6 +144,7 @@ class APIServer_INET(SocketServer.ThreadingMixIn, class APIServer_UNIX(APIServer_INET): address_family = socket.AF_UNIX def start(): + """Start two XMLRPC interfaces: one bound to localhost, the other bound to a Unix domain socket.""" serv1 = APIServer_INET(('127.0.0.1', API_SERVER_PORT), requestHandler=APIRequestHandler, logRequests=0) tools.as_daemon_thread(serv1.serve_forever) diff --git a/database.py b/database.py index bc1155e..e1d56e7 100644 --- a/database.py +++ b/database.py @@ -10,21 +10,6 @@ import logger import tools -_db_lock = threading.RLock() -_db_cond = threading.Condition(_db_lock) -_dump_requested = False - - -def synchronized(function): - def sync_fun(*args, **kw_args): - _db_lock.acquire() - try: return function(*args, **kw_args) - finally: _db_lock.release() - sync_fun.__doc__ = function.__doc__ - sync_fun.__name__ = function.__name__ - return sync_fun - - class Database(dict): def deliver_records(self, recs): ts = self.get_timestamp() @@ -99,8 +84,26 @@ class Database(dict): bwcap_rec['dirty'] = False +# database object and associated lock +_db_lock = threading.RLock() _db = Database() +# these are used in tandem to request a database dump from the dumper daemon +_db_cond = threading.Condition(_db_lock) +_dump_requested = False + + +# decorator that acquires and releases the database lock before and after the decorated operation +def synchronized(function): + def sync_fun(*args, **kw_args): + _db_lock.acquire() + try: return function(*args, **kw_args) + finally: _db_lock.release() + sync_fun.__doc__ = function.__doc__ + sync_fun.__name__ = function.__name__ + return sync_fun + +# apply the given records to the database and request a dump @synchronized def deliver_records(recs): global _dump_requested @@ -112,6 +115,7 @@ def deliver_records(recs): def get_sliver(name): return _db.get('sliver_'+name) def start(): + """The database dumper daemon. When it starts up, it populates the database with the last dumped database. It proceeds to handle dump requests forever.""" def run(): global _dump_requested _db_lock.acquire() diff --git a/delegate.py b/delegate.py index 6dd85e8..c3c4c1b 100644 --- a/delegate.py +++ b/delegate.py @@ -1,10 +1,12 @@ +"""Delegate accounts are used to provide secure access to the XMLRPC API. They are normal Unix accounts with a shell that tunnels XMLRPC requests to the API server.""" + import accounts import logger import tools class Delegate: - SHELL = '/bin/forward_api_calls' + SHELL = '/bin/forward_api_calls' # tunneling shell TYPE = 'delegate' def __init__(self, name): self.name = name diff --git a/logger.py b/logger.py index 3411df5..6999f31 100644 --- a/logger.py +++ b/logger.py @@ -1,4 +1,5 @@ -import fcntl +"""A very simple logger that tries to be concurrency-safe.""" + import os import subprocess import time @@ -9,10 +10,7 @@ from config import LOG_FILE def log(msg): """Write to the log file.""" - # the next three lines ought to be an atomic operation but aren't fd = os.open(LOG_FILE, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0600) - flags = fcntl.fcntl(fd, fcntl.F_GETFD) - fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) if not msg.endswith('\n'): msg += '\n' os.write(fd, '%s: %s' % (time.asctime(time.gmtime()), msg)) os.close(fd) diff --git a/nm.py b/nm.py index fa527da..14a13c5 100644 --- a/nm.py +++ b/nm.py @@ -27,8 +27,8 @@ def run(): try: if options.daemon: tools.daemon() - accounts.register_account_type(sliver.Sliver) - accounts.register_account_type(delegate.Delegate) + accounts.register_class(sliver.Sliver) + accounts.register_class(delegate.Delegate) other_pid = tools.pid_file() if other_pid != None: diff --git a/sliver.py b/sliver.py index 0a299da..7546bc1 100644 --- a/sliver.py +++ b/sliver.py @@ -51,6 +51,8 @@ class Sliver(vserver.VServer): logger.log('%s: starting' % self.name) child_pid = os.fork() if child_pid == 0: + # VServer.start calls fork() internally, so we don't need all of fork_as() + tools.close_nonstandard_fds() vserver.VServer.start(self, True) os._exit(0) else: os.waitpid(child_pid, 0) diff --git a/tools.py b/tools.py index bc391a9..ce029dd 100644 --- a/tools.py +++ b/tools.py @@ -10,11 +10,20 @@ import logger def as_daemon_thread(run): + """Call function with no arguments in its own thread.""" thr = threading.Thread(target=run) thr.setDaemon(True) thr.start() +def close_nonstandard_fds(): + """Close all open file descriptors other than 0, 1, and 2.""" + _SC_OPEN_MAX = 4 + for fd in range(3, os.sysconf(_SC_OPEN_MAX)): + try: os.close(fd) + except OSError: pass # most likely an fd that isn't open + + # after http://www.erlenstar.demon.co.uk/unix/faq_2.html def daemon(): """Daemonize the current process.""" @@ -38,11 +47,7 @@ def fork_as(su, function, *args): if child_pid == 0: try: os.chdir('/') - # close all nonstandard file descriptors - _SC_OPEN_MAX = 4 - for fd in range(3, os.sysconf(_SC_OPEN_MAX)): - try: os.close(fd) - except OSError: pass # most likely an fd that isn't open + close_nonstandard_fds() pw_ent = pwd.getpwnam(su) os.setegid(pw_ent[3]) os.seteuid(pw_ent[2]) -- 2.43.0