Call start after initscript change. set_disklimit() workaround.
[nodemanager.git] / accounts.py
1 """Functionality common to all account classes.
2
3 Each subclass of Account must provide five methods: create() and
4 destroy(), which are static; configure(), start(), and stop(), which
5 are not.  configure(), which takes a record as its only argument, does
6 things like set up ssh keys.  In addition, an Account subclass must
7 provide static member variables SHELL, which contains the unique shell
8 that it uses; and TYPE, a string that is used by the account creation
9 code.  For no particular reason, TYPE is divided hierarchically by
10 periods; at the moment the only convention is that all sliver accounts
11 have type that begins with sliver.
12
13 There are any number of race conditions that may result from the fact
14 that account names are not unique over time.  Moreover, it's a bad
15 idea to perform lengthy operations while holding the database lock.
16 In order to deal with both of these problems, we use a worker thread
17 for each account name that ever exists.  On 32-bit systems with large
18 numbers of accounts, this may cause the NM process to run out of
19 *virtual* memory!  This problem may be remedied by decreasing the
20 maximum stack size.
21 """
22
23 import Queue
24 import os
25 import pwd
26 import threading
27
28 import logger
29 import tools
30
31
32 # When this variable is true, start after any ensure_created
33 startingup = False
34 # Cumulative delay for starts when startingup is true
35 cumstartdelay = 0
36
37 # shell path -> account class association
38 shell_acct_class = {}
39 # account type -> account class association
40 type_acct_class = {}
41
42 def register_class(acct_class):
43     """Call once for each account class.  This method adds the class to the dictionaries used to look up account classes by shell and type."""
44     shell_acct_class[acct_class.SHELL] = acct_class
45     type_acct_class[acct_class.TYPE] = acct_class
46
47
48 # private account name -> worker object association and associated lock
49 name_worker_lock = threading.Lock()
50 name_worker = {}
51
52 def allpwents():
53     return [pw_ent for pw_ent in pwd.getpwall() if pw_ent[6] in shell_acct_class]
54
55 def all():
56     """Return the names of all accounts on the system with recognized shells."""
57     return [pw_ent[0] for pw_ent in allpwents()]
58
59 def get(name):
60     """Return the worker object for a particular username.  If no such object exists, create it first."""
61     name_worker_lock.acquire()
62     try:
63         if name not in name_worker: name_worker[name] = Worker(name)
64         return name_worker[name]
65     finally: name_worker_lock.release()
66
67
68 class Account:
69     def __init__(self, rec):
70         self.name = rec['name']
71         self.keys = ''
72         self.initscriptchanged = False
73         self.configure(rec)
74
75     @staticmethod
76     def create(name, vref = None): abstract
77     @staticmethod
78     def destroy(name): abstract
79
80     def configure(self, rec):
81         """Write <rec['keys']> to my authorized_keys file."""
82         new_keys = rec['keys']
83         if new_keys != self.keys:
84             self.keys = new_keys
85             dot_ssh = '/home/%s/.ssh' % self.name
86             def do_installation():
87                 if not os.access(dot_ssh, os.F_OK): os.mkdir(dot_ssh)
88                 os.chmod(dot_ssh, 0700)
89                 tools.write_file(dot_ssh + '/authorized_keys', lambda f: f.write(new_keys))
90             logger.log('%s: installing ssh keys' % self.name)
91             tools.fork_as(self.name, do_installation)
92
93     def start(self, delay=0): pass
94     def stop(self): pass
95
96
97 class Worker:
98     # these semaphores are acquired before creating/destroying an account
99     _create_sem = threading.Semaphore(1)
100     _destroy_sem = threading.Semaphore(1)
101
102     def __init__(self, name):
103         self.name = name  # username
104         self._acct = None  # the account object currently associated with this worker
105         # task list
106         # outsiders request operations by putting (fn, args...) tuples on _q
107         # the worker thread (created below) will perform these operations in order
108         self._q = Queue.Queue()
109         tools.as_daemon_thread(self._run)
110
111     def ensure_created(self, rec):
112         """Cause the account specified by <rec> to exist if it doesn't already."""
113         self._q.put((self._ensure_created, rec.copy()))
114
115     def _ensure_created(self, rec):
116         curr_class = self._get_class()
117         next_class = type_acct_class[rec['type']]
118         if next_class != curr_class:
119             self._destroy(curr_class)
120             self._create_sem.acquire()
121             try: next_class.create(self.name, rec['vref'])
122             finally: self._create_sem.release()
123         if not isinstance(self._acct, next_class): self._acct = next_class(rec)
124         else: self._acct.configure(rec)
125         if startingup:
126             self._acct.start(delay=cumstartdelay)
127             cumstartdelay += 2
128         elif next_class != curr_class or self._acct.initscriptchanged:
129             self._acct.start()
130
131     def ensure_destroyed(self): self._q.put((self._ensure_destroyed,))
132     def _ensure_destroyed(self): self._destroy(self._get_class())
133
134     def start(self, delay=0): self._q.put((self._start, delay))
135     def _start(self, d): self._acct.start(delay=d)
136
137     def stop(self): self._q.put((self._stop,))
138     def _stop(self): self._acct.stop()
139
140     def _destroy(self, curr_class):
141         self._acct = None
142         if curr_class:
143             self._destroy_sem.acquire()
144             try: curr_class.destroy(self.name)
145             finally: self._destroy_sem.release()
146
147     def _get_class(self):
148         try: shell = pwd.getpwnam(self.name)[6]
149         except KeyError: return None
150         return shell_acct_class[shell]
151
152     def _run(self):
153         """Repeatedly pull commands off the queue and execute.  If memory usage becomes an issue, it might be wise to terminate after a while."""
154         while True:
155             try:
156                 cmd = self._q.get()
157                 cmd[0](*cmd[1:])
158             except: logger.log_exc()