Merge branch 'master' of ssh://git.onelab.eu/git/nodemanager
[nodemanager.git] / accounts.py
index 47e7af5..74afcff 100644 (file)
@@ -1,9 +1,13 @@
+### 
+
 """Functionality common to all account classes.
 
 """Functionality common to all account classes.
 
-Each subclass of Account must provide five methods: create() and
-destroy(), which are static; configure(), start(), and stop(), which
-are not.  configure(), which takes a record as its only argument, does
-things like set up ssh keys.  In addition, an Account subclass must
+Each subclass of Account must provide five methods:
+  (*) create() and destroy(), which are static;
+  (*) configure(), start(), and stop(), which are not.
+
+configure(), which takes a record as its only argument, does
+things like set up ssh keys. In addition, an Account subclass must
 provide static member variables SHELL, which contains the unique shell
 that it uses; and TYPE, a string that is used by the account creation
 code.  For no particular reason, TYPE is divided hierarchically by
 provide static member variables SHELL, which contains the unique shell
 that it uses; and TYPE, a string that is used by the account creation
 code.  For no particular reason, TYPE is divided hierarchically by
@@ -20,28 +24,27 @@ numbers of accounts, this may cause the NM process to run out of
 maximum stack size.
 """
 
 maximum stack size.
 """
 
-import Queue
 import os
 import os
-import pwd
+import pwd, grp
 import threading
 
 import logger
 import tools
 
 
 import threading
 
 import logger
 import tools
 
 
-# When this variable is true, start after any ensure_created
-Startingup = False
-# Cumulative delay for starts when Startingup is true
-csd_lock = threading.Lock()
-cumstartdelay = 0
-
 # shell path -> account class association
 shell_acct_class = {}
 # account type -> account class association
 type_acct_class = {}
 
 # shell path -> account class association
 shell_acct_class = {}
 # account type -> account class association
 type_acct_class = {}
 
+# these semaphores are acquired before creating/destroying an account
+create_sem = threading.Semaphore(1)
+destroy_sem = threading.Semaphore(1)
+
 def register_class(acct_class):
 def register_class(acct_class):
-    """Call once for each account class.  This method adds the class to the dictionaries used to look up account classes by shell and type."""
+    """Call once for each account class. This method adds the class
+to the dictionaries used to look up account classes by shell and
+type."""
     shell_acct_class[acct_class.SHELL] = acct_class
     type_acct_class[acct_class.TYPE] = acct_class
 
     shell_acct_class[acct_class.SHELL] = acct_class
     type_acct_class[acct_class.TYPE] = acct_class
 
@@ -68,97 +71,122 @@ def get(name):
 
 class Account:
     def __init__(self, rec):
 
 class Account:
     def __init__(self, rec):
+        logger.verbose('accounts: Initing account %s'%rec['name'])
         self.name = rec['name']
         self.keys = ''
         self.name = rec['name']
         self.keys = ''
-        self.initscriptchanged = False
         self.configure(rec)
 
     @staticmethod
     def create(name, vref = None): abstract
         self.configure(rec)
 
     @staticmethod
     def create(name, vref = None): abstract
+
     @staticmethod
     def destroy(name): abstract
 
     def configure(self, rec):
         """Write <rec['keys']> to my authorized_keys file."""
     @staticmethod
     def destroy(name): abstract
 
     def configure(self, rec):
         """Write <rec['keys']> to my authorized_keys file."""
+        logger.verbose('accounts: configuring %s'%self.name)
         new_keys = rec['keys']
         if new_keys != self.keys:
         new_keys = rec['keys']
         if new_keys != self.keys:
+            # get the unix account info
+            gid = grp.getgrnam("slices")[2]
+            pw_info = pwd.getpwnam(self.name)
+            uid = pw_info[2]
+            pw_dir = pw_info[5]
+
+            # write out authorized_keys file and conditionally create
+            # the .ssh subdir if need be.
+            dot_ssh = os.path.join(pw_dir,'.ssh')
+            if not os.path.isdir(dot_ssh):
+                if not os.path.isdir(pw_dir):
+                    logger.verbose('accounts: WARNING: homedir %s does not exist for %s!'%(pw_dir,self.name))
+                    os.mkdir(pw_dir)
+                    os.chown(pw_dir, uid, gid)
+                os.mkdir(dot_ssh)
+
+            auth_keys = os.path.join(dot_ssh,'authorized_keys')
+            tools.write_file(auth_keys, lambda f: f.write(new_keys))
+
+            # set access permissions and ownership properly
+            os.chmod(dot_ssh, 0700)
+            os.chown(dot_ssh, uid, gid)
+            os.chmod(auth_keys, 0600)
+            os.chown(auth_keys, uid, gid)
+
+            # set self.keys to new_keys only when all of the above ops succeed
             self.keys = new_keys
             self.keys = new_keys
-            dot_ssh = '/home/%s/.ssh' % self.name
-            def do_installation():
-                if not os.access(dot_ssh, os.F_OK): os.mkdir(dot_ssh)
-                os.chmod(dot_ssh, 0700)
-                tools.write_file(dot_ssh + '/authorized_keys', lambda f: f.write(new_keys))
-            logger.log('%s: installing ssh keys' % self.name)
-            tools.fork_as(self.name, do_installation)
+
+            logger.log('accounts: %s: installed ssh keys' % self.name)
 
     def start(self, delay=0): pass
     def stop(self): pass
 
     def start(self, delay=0): pass
     def stop(self): pass
-
+    def is_running(self): pass
 
 class Worker:
 
 class Worker:
-    # these semaphores are acquired before creating/destroying an account
-    _create_sem = threading.Semaphore(1)
-    _destroy_sem = threading.Semaphore(1)
 
     def __init__(self, name):
         self.name = name  # username
         self._acct = None  # the account object currently associated with this worker
 
     def __init__(self, name):
         self.name = name  # username
         self._acct = None  # the account object currently associated with this worker
-        # task list
-        # outsiders request operations by putting (fn, args...) tuples on _q
-        # the worker thread (created below) will perform these operations in order
-        self._q = Queue.Queue()
-        tools.as_daemon_thread(self._run)
 
     def ensure_created(self, rec):
 
     def ensure_created(self, rec):
-        """Cause the account specified by <rec> to exist if it doesn't already."""
-        self._q.put((self._ensure_created, rec.copy(), Startingup))
-
-    def _ensure_created(self, rec, startingup):
+        """Check account type is still valid.  If not, recreate sliver.
+If still valid, check if running and configure/start if not."""
+        logger.log_data_in_file(rec,"/var/lib/nodemanager/%s.rec.txt"%rec['name'],
+                                'raw rec captured in ensure_created',logger.LOG_VERBOSE)
         curr_class = self._get_class()
         next_class = type_acct_class[rec['type']]
         if next_class != curr_class:
             self._destroy(curr_class)
         curr_class = self._get_class()
         next_class = type_acct_class[rec['type']]
         if next_class != curr_class:
             self._destroy(curr_class)
-            self._create_sem.acquire()
+            create_sem.acquire()
             try: next_class.create(self.name, rec['vref'])
             try: next_class.create(self.name, rec['vref'])
-            finally: self._create_sem.release()
+            finally: create_sem.release()
         if not isinstance(self._acct, next_class): self._acct = next_class(rec)
         if not isinstance(self._acct, next_class): self._acct = next_class(rec)
-        else: self._acct.configure(rec)
-        if startingup:
-            csd_lock.acquire()
-            global cumstartdelay
-            delay = cumstartdelay
-            cumstartdelay += 2
-            csd_lock.release()
-            self._acct.start(delay=delay)
-        elif next_class != curr_class or self._acct.initscriptchanged:
-            self._acct.start()
-
-    def ensure_destroyed(self): self._q.put((self._ensure_destroyed,))
-    def _ensure_destroyed(self): self._destroy(self._get_class())
-
-    def start(self, delay=0): self._q.put((self._start, delay))
-    def _start(self, d): self._acct.start(delay=d)
-
-    def stop(self): self._q.put((self._stop,))
-    def _stop(self): self._acct.stop()
+        logger.verbose("accounts.ensure_created: %s, running=%r"%(self.name,self.is_running()))
+
+        # reservation_alive is set on reervable nodes, and its value is a boolean
+        if 'reservation_alive' in rec:
+            # reservable nodes
+            if rec['reservation_alive']:
+                # this sliver has the lease, it is safe to start it
+                if not self.is_running(): self.start(rec)
+                else: self.configure(rec)
+            else:
+                # not having the lease, do not start it
+                self.configure(rec)
+        # usual nodes - preserve old code
+        # xxx it's not clear what to do when a sliver changes type/class
+        # in a reservable node
+        else:
+            if not self.is_running() or next_class != curr_class:
+                self.start(rec)
+            else: self.configure(rec)
+
+    def ensure_destroyed(self): self._destroy(self._get_class())
+
+    def start(self, rec, d = 0):
+        self._acct.configure(rec)
+        self._acct.start(delay=d)
+
+    def configure(self, rec):
+        self._acct.configure(rec)
+
+    def stop(self): self._acct.stop()
+
+    def is_running(self):
+        if (self._acct != None) and self._acct.is_running():
+            status = True
+        else:
+            status = False
+            logger.verbose("accounts: Worker(%s): is not running" % self.name)
+        return status
 
     def _destroy(self, curr_class):
         self._acct = None
         if curr_class:
 
     def _destroy(self, curr_class):
         self._acct = None
         if curr_class:
-            self._destroy_sem.acquire()
+            destroy_sem.acquire()
             try: curr_class.destroy(self.name)
             try: curr_class.destroy(self.name)
-            finally: self._destroy_sem.release()
+            finally: destroy_sem.release()
 
     def _get_class(self):
         try: shell = pwd.getpwnam(self.name)[6]
         except KeyError: return None
         return shell_acct_class[shell]
 
     def _get_class(self):
         try: shell = pwd.getpwnam(self.name)[6]
         except KeyError: return None
         return shell_acct_class[shell]
-
-    def _run(self):
-        """Repeatedly pull commands off the queue and execute.  If memory usage becomes an issue, it might be wise to terminate after a while."""
-        while True:
-            try:
-                cmd = self._q.get()
-                cmd[0](*cmd[1:])
-            except:
-                logger.log_exc(self.name)