Change ownership of authorized_keys after writing
[nodemanager.git] / accounts.py
1 """Functionality common to all account classes.
2
3 Each subclass of Account must provide five methods: create() and
4 destroy(), which are static; configure(), start(), and stop(), which
5 are not.  configure(), which takes a record as its only argument, does
6 things like set up ssh keys.  In addition, an Account subclass must
7 provide static member variables SHELL, which contains the unique shell
8 that it uses; and TYPE, a string that is used by the account creation
9 code.  For no particular reason, TYPE is divided hierarchically by
10 periods; at the moment the only convention is that all sliver accounts
11 have type that begins with sliver.
12
13 There are any number of race conditions that may result from the fact
14 that account names are not unique over time.  Moreover, it's a bad
15 idea to perform lengthy operations while holding the database lock.
16 In order to deal with both of these problems, we use a worker thread
17 for each account name that ever exists.  On 32-bit systems with large
18 numbers of accounts, this may cause the NM process to run out of
19 *virtual* memory!  This problem may be remedied by decreasing the
20 maximum stack size.
21 """
22
23 import Queue
24 import os
25 import pwd
26 import threading
27
28 import logger
29 import tools
30
31
32 # When this variable is true, start after any ensure_created
33 Startingup = False
34 # Cumulative delay for starts when Startingup is true
35 csd_lock = threading.Lock()
36 cumstartdelay = 0
37
38 # shell path -> account class association
39 shell_acct_class = {}
40 # account type -> account class association
41 type_acct_class = {}
42
43 def register_class(acct_class):
44     """Call once for each account class.  This method adds the class to the dictionaries used to look up account classes by shell and type."""
45     shell_acct_class[acct_class.SHELL] = acct_class
46     type_acct_class[acct_class.TYPE] = acct_class
47
48
49 # private account name -> worker object association and associated lock
50 name_worker_lock = threading.Lock()
51 name_worker = {}
52
53 def allpwents():
54     return [pw_ent for pw_ent in pwd.getpwall() if pw_ent[6] in shell_acct_class]
55
56 def all():
57     """Return the names of all accounts on the system with recognized shells."""
58     return [pw_ent[0] for pw_ent in allpwents()]
59
60 def get(name):
61     """Return the worker object for a particular username.  If no such object exists, create it first."""
62     name_worker_lock.acquire()
63     try:
64         if name not in name_worker: name_worker[name] = Worker(name)
65         return name_worker[name]
66     finally: name_worker_lock.release()
67
68
69 class Account:
70     def __init__(self, rec):
71         logger.verbose('Initing account %s'%rec['name'])
72         self.name = rec['name']
73         self.keys = ''
74         self.initscriptchanged = False
75         self.configure(rec)
76
77     @staticmethod
78     def create(name, vref = None): abstract
79     @staticmethod
80     def destroy(name): abstract
81
82     def configure(self, rec):
83         """Write <rec['keys']> to my authorized_keys file."""
84         logger.verbose('in accounts:configure for %s'%self.name)
85         new_keys = rec['keys']
86         if new_keys != self.keys:
87             self.keys = new_keys
88             dot_ssh = '/home/%s/.ssh' % self.name
89             if not os.access(dot_ssh, os.F_OK): os.mkdir(dot_ssh)
90             os.chmod(dot_ssh, 0700)
91             tools.write_file(dot_ssh + '/authorized_keys', lambda f: f.write(new_keys))
92             logger.verbose('%s: installing ssh keys' % self.name)
93             os.chown(dot_ssh + '/authorized_keys', pwd.getpwnam(self.name)[2], 504)
94
95     def start(self, delay=0): pass
96     def stop(self): pass
97     def is_running(self): pass
98
99 class Worker:
100     # these semaphores are acquired before creating/destroying an account
101     _create_sem = threading.Semaphore(1)
102     _destroy_sem = threading.Semaphore(1)
103
104     def __init__(self, name):
105         self.name = name  # username
106         self._acct = None  # the account object currently associated with this worker
107         # task list
108         # outsiders request operations by putting (fn, args...) tuples on _q
109         # the worker thread (created below) will perform these operations in order
110         self._q = Queue.Queue()
111         tools.as_daemon_thread(self._run)
112
113     def ensure_created(self, rec):
114         """Cause the account specified by <rec> to exist if it doesn't already."""
115         if rec.has_key('name'):
116             logger.verbose('Worker.ensure_created with name=%s'%rec['name'])
117         self._q.put((self._ensure_created, rec.copy(), Startingup))
118         logger.verbose('Worker queue has %d item(s)'%self._q.qsize())
119
120     def _ensure_created(self, rec, startingup):
121         curr_class = self._get_class()
122         next_class = type_acct_class[rec['type']]
123         if next_class != curr_class:
124             self._destroy(curr_class)
125             self._create_sem.acquire()
126             try: next_class.create(self.name, rec['vref'])
127             finally: self._create_sem.release()
128         if not isinstance(self._acct, next_class): self._acct = next_class(rec)
129         else: self._acct.configure(rec)
130         if startingup or not self.is_running():
131             csd_lock.acquire()
132             global cumstartdelay
133             delay = cumstartdelay
134             cumstartdelay += 2
135             csd_lock.release()
136             self._acct.start(delay=delay)
137         elif next_class != curr_class or self._acct.initscriptchanged:
138             self._acct.start()
139
140     def ensure_destroyed(self): self._q.put((self._ensure_destroyed,))
141     def _ensure_destroyed(self): self._destroy(self._get_class())
142
143     def start(self, delay=0): self._q.put((self._start, delay))
144     def _start(self, d): self._acct.start(delay=d)
145
146     def stop(self): self._q.put((self._stop,))
147     def _stop(self): self._acct.stop()
148
149     def is_running(self): 
150         if self._acct.is_running():
151             status = True
152         else:
153             status = False
154             logger.verbose("Worker(%s): is not running" % self.name)
155         return status
156
157     def _destroy(self, curr_class):
158         self._acct = None
159         if curr_class:
160             self._destroy_sem.acquire()
161             try: curr_class.destroy(self.name)
162             finally: self._destroy_sem.release()
163
164     def _get_class(self):
165         try: shell = pwd.getpwnam(self.name)[6]
166         except KeyError: return None
167         return shell_acct_class[shell]
168
169     def _run(self):
170         """Repeatedly pull commands off the queue and execute.  If memory usage becomes an issue, it might be wise to terminate after a while."""
171         while True:
172             try:
173                 logger.verbose('Worker:_run : getting - size is %d'%self._q.qsize())
174                 cmd = self._q.get()
175                 cmd[0](*cmd[1:])
176             except:
177                 logger.log_exc(self.name)