"""Functionality common to all account classes.
-Each subclass of Account must provide five methods: create() and
-destroy(), which are static; configure(), start(), and stop(), which
-are not. configure(), which takes a record as its only argument, does
+Each subclass of Account must provide 6 methods: create() and
+destroy(), which are static; configure(), is_running(), start(), and stop(),
+which are not. configure() takes a record as its only argument, and does
things like set up ssh keys. In addition, an Account subclass must
provide static member variables SHELL, which contains the unique shell
that it uses; and TYPE, a string that is used by the account creation
periods; at the moment the only convention is that all sliver accounts
have type that begins with sliver.
-There are any number of race conditions that may result from the fact
-that account names are not unique over time. Moreover, it's a bad
-idea to perform lengthy operations while holding the database lock.
-In order to deal with both of these problems, we use a worker thread
-for each account name that ever exists. On 32-bit systems with large
-numbers of accounts, this may cause the NM process to run out of
-*virtual* memory! This problem may be remedied by decreasing the
-maximum stack size.
"""
import Queue
import os
import pwd
+from grp import getgrnam
import threading
import logger
# When this variable is true, start after any ensure_created
Startingup = False
-# Cumulative delay for starts when Startingup is true
-csd_lock = threading.Lock()
-cumstartdelay = 0
-
# shell path -> account class association
shell_acct_class = {}
# account type -> account class association
type_acct_class = {}
+# these semaphores are acquired before creating/destroying an account
+create_sem = threading.Semaphore(1)
+destroy_sem = threading.Semaphore(1)
+
def register_class(acct_class):
"""Call once for each account class. This method adds the class to the dictionaries used to look up account classes by shell and type."""
shell_acct_class[acct_class.SHELL] = acct_class
def configure(self, rec):
"""Write <rec['keys']> to my authorized_keys file."""
- logger.verbose('in accounts:configure for %s'%self.name)
+ logger.verbose('%s: in accounts:configure'%self.name)
new_keys = rec['keys']
if new_keys != self.keys:
self.keys = new_keys
dot_ssh = '/home/%s/.ssh' % self.name
- def do_installation():
- if not os.access(dot_ssh, os.F_OK): os.mkdir(dot_ssh)
- os.chmod(dot_ssh, 0700)
- tools.write_file(dot_ssh + '/authorized_keys', lambda f: f.write(new_keys))
- logger.verbose('%s: installing ssh keys' % self.name)
- tools.fork_as(self.name, do_installation)
+ if not os.access(dot_ssh, os.F_OK): os.mkdir(dot_ssh)
+ os.chmod(dot_ssh, 0700)
+ tools.write_file(dot_ssh + '/authorized_keys', lambda f: f.write(new_keys))
+ logger.log('%s: installing ssh keys' % self.name)
+ user = pwd.getpwnam(self.name)[2]
+ group = getgrnam("slices")[2]
+ os.chown(dot_ssh, user, group)
+ os.chown(dot_ssh + '/authorized_keys', user, group)
def start(self, delay=0): pass
def stop(self): pass
def is_running(self): pass
class Worker:
- # these semaphores are acquired before creating/destroying an account
- _create_sem = threading.Semaphore(1)
- _destroy_sem = threading.Semaphore(1)
-
def __init__(self, name):
self.name = name # username
self._acct = None # the account object currently associated with this worker
- # task list
- # outsiders request operations by putting (fn, args...) tuples on _q
- # the worker thread (created below) will perform these operations in order
- self._q = Queue.Queue()
- tools.as_daemon_thread(self._run)
-
- def ensure_created(self, rec):
- """Cause the account specified by <rec> to exist if it doesn't already."""
- if rec.has_key('name'):
- logger.verbose('Worker.ensure_created with name=%s'%rec['name'])
- self._q.put((self._ensure_created, rec.copy(), Startingup))
- logger.verbose('Worker queue has %d item(s)'%self._q.qsize())
-
- def _ensure_created(self, rec, startingup):
+
+ def ensure_created(self, rec, startingup = Startingup):
+ """Check account type is still valid. If not, recreate sliver. If still valid,
+ check if running and configure/start if not."""
curr_class = self._get_class()
next_class = type_acct_class[rec['type']]
if next_class != curr_class:
self._destroy(curr_class)
- self._create_sem.acquire()
+ create_sem.acquire()
try: next_class.create(self.name, rec['vref'])
- finally: self._create_sem.release()
+ finally: create_sem.release()
if not isinstance(self._acct, next_class): self._acct = next_class(rec)
+ if startingup or \
+ not self.is_running() or \
+ next_class != curr_class or \
+ self._acct.initscriptchanged:
+ self.start(rec)
else: self._acct.configure(rec)
- if startingup or not self.is_running():
- csd_lock.acquire()
- global cumstartdelay
- delay = cumstartdelay
- cumstartdelay += 2
- csd_lock.release()
- self._acct.start(delay=delay)
- elif next_class != curr_class or self._acct.initscriptchanged:
- self._acct.start()
- def ensure_destroyed(self): self._q.put((self._ensure_destroyed,))
- def _ensure_destroyed(self): self._destroy(self._get_class())
+ def ensure_destroyed(self): self._destroy(self._get_class())
- def start(self, delay=0): self._q.put((self._start, delay))
- def _start(self, d): self._acct.start(delay=d)
+ def start(self, rec, d = 0):
+ self._acct.configure(rec)
+ self._acct.start(delay=d)
- def stop(self): self._q.put((self._stop,))
- def _stop(self): self._acct.stop()
+ def stop(self): self._acct.stop()
def is_running(self):
- status = self._acct.is_running()
- if not status: logger.verbose("Worker(%s): is not running" % self.name)
+ if (self._acct != None) and self._acct.is_running():
+ status = True
+ else:
+ status = False
+ logger.verbose("Worker(%s): is not running" % self.name)
return status
def _destroy(self, curr_class):
self._acct = None
if curr_class:
- self._destroy_sem.acquire()
+ destroy_sem.acquire()
try: curr_class.destroy(self.name)
- finally: self._destroy_sem.release()
+ finally: destroy_sem.release()
def _get_class(self):
try: shell = pwd.getpwnam(self.name)[6]
except KeyError: return None
return shell_acct_class[shell]
-
- def _run(self):
- """Repeatedly pull commands off the queue and execute. If memory usage becomes an issue, it might be wise to terminate after a while."""
- while True:
- try:
- logger.verbose('Worker:_run : getting - size is %d'%self._q.qsize())
- cmd = self._q.get()
- cmd[0](*cmd[1:])
- except:
- logger.log_exc(self.name)