More documentation.
authorDavid E. Eisenstat <deisenst@cs.princeton.edu>
Thu, 26 Oct 2006 19:32:20 +0000 (19:32 +0000)
committerDavid E. Eisenstat <deisenst@cs.princeton.edu>
Thu, 26 Oct 2006 19:32:20 +0000 (19:32 +0000)
accounts.py
api.py
database.py
delegate.py
logger.py
nm.py
sliver.py
tools.py

index b22a4ba..e476977 100644 (file)
@@ -7,16 +7,30 @@ import logger
 import tools
 
 
+# shell path -> account class association
+shell_acct_class = {}
+# account type -> account class association
+type_acct_class = {}
+
+def register_class(acct_class):
+    """Call once for each account class.  This method adds the class to the dictionaries used to ook up account classes by shell and type."""
+    shell_acct_class[acct_class.SHELL] = acct_class
+    type_acct_class[acct_class.TYPE] = acct_class
+
+
+# private account name -> worker object association and associated lock
 _name_worker_lock = threading.Lock()
 _name_worker = {}
 
 def all():
+    """Returns a list of all NM accounts on the system.  Format is (type, username)."""
     pw_ents = pwd.getpwall()
     for pw_ent in pw_ents:
-        if pw_ent[6] in acct_class_by_shell:
-            yield acct_class_by_shell[pw_ent[6]].TYPE, pw_ent[0]
+        if pw_ent[6] in shell_acct_class:
+            yield shell_acct_class[pw_ent[6]].TYPE, pw_ent[0]
 
 def get(name):
+    """Return the worker object for a particular username.  If no such object exists, create it first."""
     _name_worker_lock.acquire()
     try:
         if name not in _name_worker: _name_worker[name] = Worker(name)
@@ -35,33 +49,29 @@ def install_ssh_keys(rec):
     tools.fork_as(rec['name'], do_installation)
 
 
-TYPES = []
-acct_class_by_shell = {}
-acct_class_by_type = {}
-
-def register_account_type(acct_class):
-    TYPES.append(acct_class.TYPE)
-    acct_class_by_shell[acct_class.SHELL] = acct_class
-    acct_class_by_type[acct_class.TYPE] = acct_class
-
-
 class Worker:
     # these semaphores are acquired before creating/destroying an account
     _create_sem = threading.Semaphore(1)
     _destroy_sem = threading.Semaphore(1)
 
     def __init__(self, name):
+        # username
         self.name = name
+        # the account object currently associated with this worker
         self._acct = None
+        # task list
+        # outsiders request operations by putting (fn, args...) tuples on _q
+        # the worker thread (created below) will perform these operations in order
         self._q = Queue.Queue()
         tools.as_daemon_thread(self._run)
 
     def ensure_created(self, rec):
+        """Caused the account specified by <rec> to exist if it doesn't already."""
         self._q.put((self._ensure_created, tools.deepcopy(rec)))
 
     def _ensure_created(self, rec):
         curr_class = self._get_class()
-        next_class = acct_class_by_type[rec['account_type']]
+        next_class = type_acct_class[rec['account_type']]
         if next_class != curr_class:
             self._destroy(curr_class)
             self._create_sem.acquire()
@@ -94,7 +104,7 @@ class Worker:
     def _get_class(self):
         try: shell = pwd.getpwnam(self.name)[6]
         except KeyError: return None
-        return acct_class_by_shell[shell]
+        return shell_acct_class[shell]
 
     def _make_acct_obj(self):
         curr_class = self._get_class()
diff --git a/api.py b/api.py
index 73f38e7..8762f70 100644 (file)
--- a/api.py
+++ b/api.py
@@ -112,6 +112,7 @@ class APIRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
         if len(args) != nargs_dict[method_name]:
             raise xmlrpclib.Fault(101, 'Invalid argument count: got %d, expecting %d.' % (len(args), expected_nargs))
         else:
+            # Figure out who's calling.
             # XXX - these ought to be imported directly from some .h file
             SO_PEERCRED = 17
             sizeof_struct_ucred = 12
@@ -143,6 +144,7 @@ class APIServer_INET(SocketServer.ThreadingMixIn,
 class APIServer_UNIX(APIServer_INET): address_family = socket.AF_UNIX
 
 def start():
+    """Start two XMLRPC interfaces: one bound to localhost, the other bound to a Unix domain socket."""
     serv1 = APIServer_INET(('127.0.0.1', API_SERVER_PORT),
                            requestHandler=APIRequestHandler, logRequests=0)
     tools.as_daemon_thread(serv1.serve_forever)
index bc1155e..e1d56e7 100644 (file)
@@ -10,21 +10,6 @@ import logger
 import tools
 
 
-_db_lock = threading.RLock()
-_db_cond = threading.Condition(_db_lock)
-_dump_requested = False
-
-
-def synchronized(function):
-    def sync_fun(*args, **kw_args):
-        _db_lock.acquire()
-        try: return function(*args, **kw_args)
-        finally: _db_lock.release()
-    sync_fun.__doc__ = function.__doc__
-    sync_fun.__name__ = function.__name__
-    return sync_fun
-
-
 class Database(dict):
     def deliver_records(self, recs):
         ts = self.get_timestamp()
@@ -99,8 +84,26 @@ class Database(dict):
             bwcap_rec['dirty'] = False
 
 
+# database object and associated lock
+_db_lock = threading.RLock()
 _db = Database()
+# these are used in tandem to request a database dump from the dumper daemon
+_db_cond = threading.Condition(_db_lock)
+_dump_requested = False
+
+
+# decorator that acquires and releases the database lock before and after the decorated operation
+def synchronized(function):
+    def sync_fun(*args, **kw_args):
+        _db_lock.acquire()
+        try: return function(*args, **kw_args)
+        finally: _db_lock.release()
+    sync_fun.__doc__ = function.__doc__
+    sync_fun.__name__ = function.__name__
+    return sync_fun
+
 
+# apply the given records to the database and request a dump
 @synchronized
 def deliver_records(recs):
     global _dump_requested
@@ -112,6 +115,7 @@ def deliver_records(recs):
 def get_sliver(name): return _db.get('sliver_'+name)
 
 def start():
+    """The database dumper daemon.  When it starts up, it populates the database with the last dumped database.  It proceeds to handle dump requests forever."""
     def run():
         global _dump_requested
         _db_lock.acquire()
index 6dd85e8..c3c4c1b 100644 (file)
@@ -1,10 +1,12 @@
+"""Delegate accounts are used to provide secure access to the XMLRPC API.  They are normal Unix accounts with a shell that tunnels XMLRPC requests to the API server."""
+
 import accounts
 import logger
 import tools
 
 
 class Delegate:
-    SHELL = '/bin/forward_api_calls'
+    SHELL = '/bin/forward_api_calls'  # tunneling shell
     TYPE = 'delegate'
 
     def __init__(self, name): self.name = name
index 3411df5..6999f31 100644 (file)
--- a/logger.py
+++ b/logger.py
@@ -1,4 +1,5 @@
-import fcntl
+"""A very simple logger that tries to be concurrency-safe."""
+
 import os
 import subprocess
 import time
@@ -9,10 +10,7 @@ from config import LOG_FILE
 
 def log(msg):
     """Write <msg> to the log file."""
-    # the next three lines ought to be an atomic operation but aren't
     fd = os.open(LOG_FILE, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0600)
-    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
-    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
     if not msg.endswith('\n'): msg += '\n'
     os.write(fd, '%s: %s' % (time.asctime(time.gmtime()), msg))
     os.close(fd)
diff --git a/nm.py b/nm.py
index fa527da..14a13c5 100644 (file)
--- a/nm.py
+++ b/nm.py
@@ -27,8 +27,8 @@ def run():
     try:
         if options.daemon: tools.daemon()
 
-        accounts.register_account_type(sliver.Sliver)
-        accounts.register_account_type(delegate.Delegate)
+        accounts.register_class(sliver.Sliver)
+        accounts.register_class(delegate.Delegate)
 
         other_pid = tools.pid_file()
         if other_pid != None:
index 0a299da..7546bc1 100644 (file)
--- a/sliver.py
+++ b/sliver.py
@@ -51,6 +51,8 @@ class Sliver(vserver.VServer):
             logger.log('%s: starting' % self.name)
             child_pid = os.fork()
             if child_pid == 0:
+                # VServer.start calls fork() internally, so we don't need all of fork_as()
+                tools.close_nonstandard_fds()
                 vserver.VServer.start(self, True)
                 os._exit(0)
             else: os.waitpid(child_pid, 0)
index bc391a9..ce029dd 100644 (file)
--- a/tools.py
+++ b/tools.py
@@ -10,11 +10,20 @@ import logger
 
 
 def as_daemon_thread(run):
+    """Call function <run> with no arguments in its own thread."""
     thr = threading.Thread(target=run)
     thr.setDaemon(True)
     thr.start()
 
 
+def close_nonstandard_fds():
+    """Close all open file descriptors other than 0, 1, and 2."""
+    _SC_OPEN_MAX = 4
+    for fd in range(3, os.sysconf(_SC_OPEN_MAX)):
+        try: os.close(fd)
+        except OSError: pass  # most likely an fd that isn't open
+
+
 # after http://www.erlenstar.demon.co.uk/unix/faq_2.html
 def daemon():
     """Daemonize the current process."""
@@ -38,11 +47,7 @@ def fork_as(su, function, *args):
     if child_pid == 0:
         try:
             os.chdir('/')
-            # close all nonstandard file descriptors
-            _SC_OPEN_MAX = 4
-            for fd in range(3, os.sysconf(_SC_OPEN_MAX)):
-                try: os.close(fd)
-                except OSError: pass  # most likely an fd that isn't open
+            close_nonstandard_fds()
             pw_ent = pwd.getpwnam(su)
             os.setegid(pw_ent[3])
             os.seteuid(pw_ent[2])