Merge branch 'devel' of ssh://git.planet-lab.org/git/nodemanager into devel
[nodemanager.git] / accounts.py
index b330033..2f3bb32 100644 (file)
@@ -1,9 +1,13 @@
+### 
+
 """Functionality common to all account classes.
 
 """Functionality common to all account classes.
 
-Each subclass of Account must provide five methods: create() and
-destroy(), which are static; configure(), start(), and stop(), which
-are not.  configure(), which takes a record as its only argument, does
-things like set up ssh keys.  In addition, an Account subclass must
+Each subclass of Account must provide five methods:
+  (*) create() and destroy(), which are static;
+  (*) configure(), start(), and stop(), which are not.
+
+configure(), which takes a record as its only argument, does
+things like set up ssh keys. In addition, an Account subclass must
 provide static member variables SHELL, which contains the unique shell
 that it uses; and TYPE, a string that is used by the account creation
 code.  For no particular reason, TYPE is divided hierarchically by
 provide static member variables SHELL, which contains the unique shell
 that it uses; and TYPE, a string that is used by the account creation
 code.  For no particular reason, TYPE is divided hierarchically by
@@ -20,9 +24,8 @@ numbers of accounts, this may cause the NM process to run out of
 maximum stack size.
 """
 
 maximum stack size.
 """
 
-import Queue
 import os
 import os
-import pwd
+import pwd, grp
 import threading
 
 import logger
 import threading
 
 import logger
@@ -34,8 +37,14 @@ shell_acct_class = {}
 # account type -> account class association
 type_acct_class = {}
 
 # account type -> account class association
 type_acct_class = {}
 
+# these semaphores are acquired before creating/destroying an account
+create_sem = threading.Semaphore(1)
+destroy_sem = threading.Semaphore(1)
+
 def register_class(acct_class):
 def register_class(acct_class):
-    """Call once for each account class.  This method adds the class to the dictionaries used to look up account classes by shell and type."""
+    """Call once for each account class. This method adds the class
+to the dictionaries used to look up account classes by shell and
+type."""
     shell_acct_class[acct_class.SHELL] = acct_class
     type_acct_class[acct_class.TYPE] = acct_class
 
     shell_acct_class[acct_class.SHELL] = acct_class
     type_acct_class[acct_class.TYPE] = acct_class
 
@@ -44,9 +53,12 @@ def register_class(acct_class):
 name_worker_lock = threading.Lock()
 name_worker = {}
 
 name_worker_lock = threading.Lock()
 name_worker = {}
 
+def allpwents():
+    return [pw_ent for pw_ent in pwd.getpwall() if pw_ent[6] in shell_acct_class]
+
 def all():
     """Return the names of all accounts on the system with recognized shells."""
 def all():
     """Return the names of all accounts on the system with recognized shells."""
-    return [pw_ent[0] for pw_ent in pwd.getpwall() if pw_ent[6] in shell_acct_class]
+    return [pw_ent[0] for pw_ent in allpwents()]
 
 def get(name):
     """Return the worker object for a particular username.  If no such object exists, create it first."""
 
 def get(name):
     """Return the worker object for a particular username.  If no such object exists, create it first."""
@@ -59,86 +71,122 @@ def get(name):
 
 class Account:
     def __init__(self, rec):
 
 class Account:
     def __init__(self, rec):
+        logger.verbose('accounts: Initing account %s'%rec['name'])
         self.name = rec['name']
         self.keys = ''
         self.configure(rec)
 
     @staticmethod
         self.name = rec['name']
         self.keys = ''
         self.configure(rec)
 
     @staticmethod
-    def create(name): abstract
+    def create(name, vref = None): abstract
+
     @staticmethod
     def destroy(name): abstract
 
     def configure(self, rec):
         """Write <rec['keys']> to my authorized_keys file."""
     @staticmethod
     def destroy(name): abstract
 
     def configure(self, rec):
         """Write <rec['keys']> to my authorized_keys file."""
+        logger.verbose('accounts: configuring %s'%self.name)
         new_keys = rec['keys']
         if new_keys != self.keys:
         new_keys = rec['keys']
         if new_keys != self.keys:
+            # get the unix account info
+            gid = grp.getgrnam("slices")[2]
+            pw_info = pwd.getpwnam(self.name)
+            uid = pw_info[2]
+            pw_dir = pw_info[5]
+
+            # write out authorized_keys file and conditionally create
+            # the .ssh subdir if need be.
+            dot_ssh = os.path.join(pw_dir,'.ssh')
+            if not os.path.isdir(dot_ssh):
+                if not os.path.isdir(pw_dir):
+                    logger.verbose('accounts: WARNING: homedir %s does not exist for %s!'%(pw_dir,self.name))
+                    os.mkdir(pw_dir)
+                    os.chown(pw_dir, uid, gid)
+                os.mkdir(dot_ssh)
+
+            auth_keys = os.path.join(dot_ssh,'authorized_keys')
+            tools.write_file(auth_keys, lambda f: f.write(new_keys))
+
+            # set access permissions and ownership properly
+            os.chmod(dot_ssh, 0700)
+            os.chown(dot_ssh, uid, gid)
+            os.chmod(auth_keys, 0600)
+            os.chown(auth_keys, uid, gid)
+
+            # set self.keys to new_keys only when all of the above ops succeed
             self.keys = new_keys
             self.keys = new_keys
-            dot_ssh = '/home/%s/.ssh' % self.name
-            def do_installation():
-                if not os.access(dot_ssh, os.F_OK): os.mkdir(dot_ssh)
-                tools.write_file(dot_ssh + '/authorized_keys', lambda f: f.write(new_keys))
-            logger.log('%s: installing ssh keys' % self.name)
-            tools.fork_as(self.name, do_installation)
+
+            logger.log('accounts: %s: installed ssh keys' % self.name)
 
     def start(self, delay=0): pass
     def stop(self): pass
 
     def start(self, delay=0): pass
     def stop(self): pass
-
+    def is_running(self): pass
 
 class Worker:
 
 class Worker:
-    # these semaphores are acquired before creating/destroying an account
-    _create_sem = threading.Semaphore(1)
-    _destroy_sem = threading.Semaphore(1)
 
     def __init__(self, name):
         self.name = name  # username
         self._acct = None  # the account object currently associated with this worker
 
     def __init__(self, name):
         self.name = name  # username
         self._acct = None  # the account object currently associated with this worker
-        # task list
-        # outsiders request operations by putting (fn, args...) tuples on _q
-        # the worker thread (created below) will perform these operations in order
-        self._q = Queue.Queue()
-        tools.as_daemon_thread(self._run)
 
     def ensure_created(self, rec):
 
     def ensure_created(self, rec):
-        """Cause the account specified by <rec> to exist if it doesn't already."""
-        self._q.put((self._ensure_created, rec.copy()))
-
-    def _ensure_created(self, rec):
+        """Check account type is still valid.  If not, recreate sliver.
+If still valid, check if running and configure/start if not."""
+        logger.log_data_in_file(rec,"/var/lib/nodemanager/%s.rec.txt"%rec['name'],
+                                'raw rec captured in ensure_created',logger.LOG_VERBOSE)
         curr_class = self._get_class()
         next_class = type_acct_class[rec['type']]
         if next_class != curr_class:
             self._destroy(curr_class)
         curr_class = self._get_class()
         next_class = type_acct_class[rec['type']]
         if next_class != curr_class:
             self._destroy(curr_class)
-            self._create_sem.acquire()
-            try: next_class.create(self.name, rec['vref'])
-            finally: self._create_sem.release()
+            create_sem.acquire()
+            try: next_class.create(self.name, rec)
+            finally: create_sem.release()
         if not isinstance(self._acct, next_class): self._acct = next_class(rec)
         if not isinstance(self._acct, next_class): self._acct = next_class(rec)
-        else: self._acct.configure(rec)
-        if next_class != curr_class: self._acct.start()
+        logger.verbose("accounts.ensure_created: %s, running=%r"%(self.name,self.is_running()))
+
+        # reservation_alive is set on reservable nodes, and its value is a boolean
+        if 'reservation_alive' in rec:
+            # reservable nodes
+            if rec['reservation_alive']:
+                # this sliver has the lease, it is safe to start it
+                if not self.is_running(): self.start(rec)
+                else: self.configure(rec)
+            else:
+                # not having the lease, do not start it
+                self.configure(rec)
+        # usual nodes - preserve old code
+        # xxx it's not clear what to do when a sliver changes type/class
+        # in a reservable node
+        else:
+            if not self.is_running() or next_class != curr_class:
+                self.start(rec)
+            else: self.configure(rec)
+
+    def ensure_destroyed(self): self._destroy(self._get_class())
+
+    def start(self, rec, d = 0):
+        self._acct.configure(rec)
+        self._acct.start(delay=d)
 
 
-    def ensure_destroyed(self): self._q.put((self._ensure_destroyed,))
-    def _ensure_destroyed(self): self._destroy(self._get_class())
+    def configure(self, rec):
+        self._acct.configure(rec)
 
 
-    def start(self, delay=0): self._q.put((self._start, delay))
-    def _start(self, d): self._acct.start(delay=d)
+    def stop(self): self._acct.stop()
 
 
-    def stop(self): self._q.put((self._stop,))
-    def _stop(self): self._acct.stop()
+    def is_running(self):
+        if (self._acct != None) and self._acct.is_running():
+            status = True
+        else:
+            status = False
+            logger.verbose("accounts: Worker(%s): is not running" % self.name)
+        return status
 
     def _destroy(self, curr_class):
         self._acct = None
         if curr_class:
 
     def _destroy(self, curr_class):
         self._acct = None
         if curr_class:
-            self._destroy_sem.acquire()
+            destroy_sem.acquire()
             try: curr_class.destroy(self.name)
             try: curr_class.destroy(self.name)
-            finally: self._destroy_sem.release()
+            finally: destroy_sem.release()
 
     def _get_class(self):
         try: shell = pwd.getpwnam(self.name)[6]
         except KeyError: return None
         return shell_acct_class[shell]
 
     def _get_class(self):
         try: shell = pwd.getpwnam(self.name)[6]
         except KeyError: return None
         return shell_acct_class[shell]
-
-    def _run(self):
-        """Repeatedly pull commands off the queue and execute.  If memory usage becomes an issue, it might be wise to terminate after a while."""
-        while True:
-            try:
-                cmd = self._q.get()
-                cmd[0](*cmd[1:])
-            except: logger.log_exc()