Tagging module NodeManager - NodeManager-1.7-24
[nodemanager.git] / accounts.py
1 """Functionality common to all account classes.
2
3 Each subclass of Account must provide five methods: create() and
4 destroy(), which are static; configure(), start(), and stop(), which
5 are not.  configure(), which takes a record as its only argument, does
6 things like set up ssh keys.  In addition, an Account subclass must
7 provide static member variables SHELL, which contains the unique shell
8 that it uses; and TYPE, a string that is used by the account creation
9 code.  For no particular reason, TYPE is divided hierarchically by
10 periods; at the moment the only convention is that all sliver accounts
11 have type that begins with sliver.
12
13 There are any number of race conditions that may result from the fact
14 that account names are not unique over time.  Moreover, it's a bad
15 idea to perform lengthy operations while holding the database lock.
16 In order to deal with both of these problems, we use a worker thread
17 for each account name that ever exists.  On 32-bit systems with large
18 numbers of accounts, this may cause the NM process to run out of
19 *virtual* memory!  This problem may be remedied by decreasing the
20 maximum stack size.
21 """
22
23 import Queue
24 import os
25 import pwd
26 from grp import getgrnam
27 import threading
28
29 import logger
30 import tools
31
32
33 # When this variable is true, start after any ensure_created
34 Startingup = False
35 # Cumulative delay for starts when Startingup is true
36 csd_lock = threading.Lock()
37 cumstartdelay = 0
38
39 # shell path -> account class association
40 shell_acct_class = {}
41 # account type -> account class association
42 type_acct_class = {}
43
44 def register_class(acct_class):
45     """Call once for each account class.  This method adds the class to the dictionaries used to look up account classes by shell and type."""
46     shell_acct_class[acct_class.SHELL] = acct_class
47     type_acct_class[acct_class.TYPE] = acct_class
48
49
50 # private account name -> worker object association and associated lock
51 name_worker_lock = threading.Lock()
52 name_worker = {}
53
54 def allpwents():
55     return [pw_ent for pw_ent in pwd.getpwall() if pw_ent[6] in shell_acct_class]
56
57 def all():
58     """Return the names of all accounts on the system with recognized shells."""
59     return [pw_ent[0] for pw_ent in allpwents()]
60
61 def get(name):
62     """Return the worker object for a particular username.  If no such object exists, create it first."""
63     name_worker_lock.acquire()
64     try:
65         if name not in name_worker: name_worker[name] = Worker(name)
66         return name_worker[name]
67     finally: name_worker_lock.release()
68
69
70 class Account:
71     def __init__(self, rec):
72         logger.verbose('Initing account %s'%rec['name'])
73         self.name = rec['name']
74         self.keys = ''
75         self.initscriptchanged = False
76         self.configure(rec)
77
78     @staticmethod
79     def create(name, vref = None): abstract
80     @staticmethod
81     def destroy(name): abstract
82
83     def configure(self, rec):
84         """Write <rec['keys']> to my authorized_keys file."""
85         logger.verbose('in accounts:configure for %s'%self.name)
86         new_keys = rec['keys']
87         if new_keys != self.keys:
88             self.keys = new_keys
89             dot_ssh = '/home/%s/.ssh' % self.name
90             if not os.access(dot_ssh, os.F_OK): os.mkdir(dot_ssh)
91             os.chmod(dot_ssh, 0700)
92             tools.write_file(dot_ssh + '/authorized_keys', lambda f: f.write(new_keys))
93             logger.log('%s: installing ssh keys' % self.name)
94             user = pwd.getpwnam(self.name)[2]
95             group = getgrnam("slices")[2]
96             os.chown(dot_ssh, user, group)
97             os.chown(dot_ssh + '/authorized_keys', user, group)
98
99     def start(self, delay=0): pass
100     def stop(self): pass
101     def is_running(self): pass
102
103 class Worker:
104     # these semaphores are acquired before creating/destroying an account
105     _create_sem = threading.Semaphore(1)
106     _destroy_sem = threading.Semaphore(1)
107
108     def __init__(self, name):
109         self.name = name  # username
110         self._acct = None  # the account object currently associated with this worker
111         # task list
112         # outsiders request operations by putting (fn, args...) tuples on _q
113         # the worker thread (created below) will perform these operations in order
114         self._q = Queue.Queue()
115         tools.as_daemon_thread(self._run)
116
117     def ensure_created(self, rec):
118         """Cause the account specified by <rec> to exist if it doesn't already."""
119         if rec.has_key('name'):
120             logger.verbose('Worker.ensure_created with name=%s'%rec['name'])
121         self._q.put((self._ensure_created, rec.copy(), Startingup))
122         logger.verbose('Worker queue has %d item(s)'%self._q.qsize())
123
124     def _ensure_created(self, rec, startingup):
125         curr_class = self._get_class()
126         next_class = type_acct_class[rec['type']]
127         if next_class != curr_class:
128             self._destroy(curr_class)
129             self._create_sem.acquire()
130             try: next_class.create(self.name, rec['vref'])
131             finally: self._create_sem.release()
132         if not isinstance(self._acct, next_class): self._acct = next_class(rec)
133         else: self._acct.configure(rec)
134         if startingup or not self.is_running():
135             csd_lock.acquire()
136             global cumstartdelay
137             delay = cumstartdelay
138             cumstartdelay += 2
139             csd_lock.release()
140             self._acct.start(delay=delay)
141         elif next_class != curr_class or self._acct.initscriptchanged:
142             self._acct.start()
143
144     def ensure_destroyed(self): self._q.put((self._ensure_destroyed,))
145     def _ensure_destroyed(self): self._destroy(self._get_class())
146
147     def start(self, delay=0): self._q.put((self._start, delay))
148     def _start(self, d): self._acct.start(delay=d)
149
150     def stop(self): self._q.put((self._stop,))
151     def _stop(self): self._acct.stop()
152
153     def is_running(self): 
154         if self._acct.is_running():
155             status = True
156         else:
157             status = False
158             logger.verbose("Worker(%s): is not running" % self.name)
159         return status
160
161     def _destroy(self, curr_class):
162         self._acct = None
163         if curr_class:
164             self._destroy_sem.acquire()
165             try: curr_class.destroy(self.name)
166             finally: self._destroy_sem.release()
167
168     def _get_class(self):
169         try: shell = pwd.getpwnam(self.name)[6]
170         except KeyError: return None
171         return shell_acct_class[shell]
172
173     def _run(self):
174         """Repeatedly pull commands off the queue and execute.  If memory usage becomes an issue, it might be wise to terminate after a while."""
175         while True:
176             try:
177                 logger.verbose('Worker:_run : getting - size is %d'%self._q.qsize())
178                 cmd = self._q.get()
179                 cmd[0](*cmd[1:])
180             except:
181                 logger.log_exc(self.name)