Improvements to linux node model
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 18 Dec 2012 16:17:08 +0000 (17:17 +0100)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 18 Dec 2012 16:17:08 +0000 (17:17 +0100)
src/neco/resources/linux/node.py
src/neco/util/sshfuncs.py

index 7eddd02..0ec3c16 100644 (file)
@@ -5,6 +5,7 @@ from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \
 import cStringIO
 import logging
 import os.path
+import subprocess
 
 class LinuxNode(Resource):
     def __init__(self, box, ec):
@@ -63,6 +64,10 @@ class LinuxNode(Resource):
 
         return self._pm
 
+    @property
+    def is_localhost(self):
+        return ( self.host or self.ip ) in ['localhost', '127.0.0.7', '::1']
+
     def install(self, packages):
         if not isinstance(packages, list):
             packages = [packages]
@@ -83,53 +88,32 @@ class LinuxNode(Resource):
         if not os.path.isfile(src):
             src = cStringIO.StringIO(src)
 
-        # Build destination as <user>@<server>:<path>
-        dst = "%s@%s:%s" % (self.user, self.host or self.ip, dst)
-
-        (out, err), proc = eintr_retry(rcopy)(
-            src, dst, 
-            port = self.port,
-            identity_file = self.identity_file)
-
-        if proc.wait():
-            msg = "Error uploading to %s got:\n%s%s" %\
-                    (self.host or self.ip, out, err)
-            self._logger.error(msg)
-            raise RuntimeError(msg)
+        if not self.is_localhost:
+            # Build destination as <user>@<server>:<path>
+            dst = "%s@%s:%s" % (self.user, self.host or self.ip, dst)
+        return self.copy(src, dst)
 
     def download(self, src, dst):
-        # Build destination as <user>@<server>:<path>
-        src = "%s@%s:%s" % (self.user, self.host or self.ip, src)
-
-        (out, err), proc = eintr_retry(rcopy)(
-            src, dst, 
-            port = self.port,
-            identity_file = self.identity_file)
-
-        if proc.wait():
-            msg = "Error uploading to %s got:\n%s%s" %\
-                    (self.host or self.ip, out, err)
-            self._logger.error(msg)
-            raise RuntimeError(msg)
-
+        if not self.is_localhost:
+            # Build destination as <user>@<server>:<path>
+            src = "%s@%s:%s" % (self.user, self.host or self.ip, src)
+        return self.copy(src, dst)
+        
     def is_alive(self, verbose = False):
-        (out, err), proc = eintr_retry(rexec)(
-                "echo 'ALIVE'",
-                self.host or self.ip, 
-                self.user,
-                port = self.port, 
-                agent = self.forward_agent,
-                identity_file = self.identity_file,
-                x11 = self.enable_x11,
+        if self.is_localhost:
+            return True
+
+        try:
+            out = self.execute("echo 'ALIVE'",
                 timeout = 60,
                 err_on_timeout = False,
                 persistent = False)
-        
-        if proc.wait():
+        except:
             if verbose:
                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
             return False
-        elif out.strip().startswith('ALIVE'):
+
+        if out.strip().startswith('ALIVE'):
             return True
         else:
             if verbose:
@@ -140,22 +124,43 @@ class LinuxNode(Resource):
         if clean:
             self.rmdir(path)
 
-        self.execute(
+        return self.execute(
             "mkdir -p %s" % path,
             timeout = 120,
             retry = 3
             )
 
     def rmdir(self, path):
-        self.execute(
+        return self.execute(
             "rm -rf %s" % path,
             timeout = 120,
             retry = 3
             )
 
+    def copy(self, src, dst):
+        if self.is_localhost:
+            command = ["cp", "-R", src, dst]
+            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
+                    stderr=subprocess.PIPE)
+            out, err = p.communicate()
+        else:
+            (out, err), proc = eintr_retry(rcopy)(
+                src, dst, 
+                port = self.port,
+                agent = self.agent,
+                identity_file = self.identity_file)
+
+            if proc.wait():
+                msg = "Error uploading to %s got:\n%s%s" %\
+                        (self.host or self.ip, out, err)
+                self._logger.error(msg)
+                raise RuntimeError(msg)
+
+        return (out, err)
+
     def execute(self, command,
             sudo = False,
-            stdin = ""
+            stdin = None
             tty = False,
             env = None,
             timeout = None,
@@ -166,31 +171,46 @@ class LinuxNode(Resource):
         """ Notice that this invocation will block until the
         execution finishes. If this is not the desired behavior,
         use 'run' instead."""
-        (out, err), proc = eintr_retry(rexec)(
-                command, 
-                self.host or self.ip, 
-                self.user,
-                port = self.port, 
-                agent = self.forward_agent,
-                sudo = sudo,
-                stdin = stdin, 
-                identity_file = self.identity_file,
-                tty = tty,
-                x11 = self.enable_x11,
-                env = env,
-                timeout = timeout,
-                retry = retry,
-                err_on_timeout = err_on_timeout,
-                connect_timeout = connect_timeout,
-                persistent = persistent)
-
-        if proc.wait():
-            msg = "Failed to execute command %s at node %s: %s %s" % \
-                    (command, self.host or self.ip, out, err,)
-            self._logger.warn(msg)
-            raise RuntimeError(msg)
 
-        return out
+        if self.is_localhost:
+            if env:
+                export = ''
+                for envkey, envval in env.iteritems():
+                    export += '%s=%s ' % (envkey, envval)
+                command = export + command
+
+            if sudo:
+                command = "sudo " + command
+
+            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
+                    stderr=subprocess.PIPE)
+            out, err = p.communicate()
+        else:
+            (out, err), proc = eintr_retry(rexec)(
+                    command, 
+                    self.host or self.ip, 
+                    self.user,
+                    port = self.port, 
+                    agent = self.forward_agent,
+                    sudo = sudo,
+                    stdin = stdin, 
+                    identity_file = self.identity_file,
+                    tty = tty,
+                    x11 = self.enable_x11,
+                    env = env,
+                    timeout = timeout,
+                    retry = retry,
+                    err_on_timeout = err_on_timeout,
+                    connect_timeout = connect_timeout,
+                    persistent = persistent)
+
+            if proc.wait():
+                msg = "Failed to execute command %s at node %s: %s %s" % \
+                        (command, self.host or self.ip, out, err,)
+                self._logger.warn(msg)
+                raise RuntimeError(msg)
+
+        return (out, err)
 
     def run(self, command, home, 
             stdin = None, 
@@ -198,26 +218,58 @@ class LinuxNode(Resource):
             stderr = 'stderr', 
             sudo = False):
         self._logger.info("Running %s", command)
-
-        # Start process in a "daemonized" way, using nohup and heavy
-        # stdin/out redirection to avoid connection issues
-        (out,err), proc = rspawn(
-            command,
-            pidfile = './pid',
-            home = home,
-            stdin = stdin if stdin is not None else '/dev/null',
-            stdout = stdout if stdout else '/dev/null',
-            stderr = stderr if stderr else '/dev/null',
-            sudo = sudo,
-            host = self.host,
-            user = self.user,
-            port = self.port,
-            agent = self.forward_agent,
-            identity_file = self.identity_file
-            )
         
-        if proc.wait():
-            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+        pidfile = './pid',
+
+        if self.is_localhost:
+            if stderr == stdout:
+                stderr = '&1'
+            else:
+                stderr = ' ' + stderr
+            
+            daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+                'command' : command,
+                'pidfile' : pidfile,
+                
+                'stdout' : stdout,
+                'stderr' : stderr,
+                'stdin' : stdin,
+            }
+            
+            cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
+                    'command' : daemon_command,
+                    
+                    'sudo' : 'sudo -S' if sudo else '',
+                    
+                    'pidfile' : pidfile,
+                    'gohome' : 'cd %s ; ' % home if home else '',
+                    'create' : 'mkdir -p %s ; ' % home if create_home else '',
+                }
+            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
+                    stderr=subprocess.PIPE)
+            out, err = p.communicate()
+        else:
+            # Start process in a "daemonized" way, using nohup and heavy
+            # stdin/out redirection to avoid connection issues
+            (out,err), proc = rspawn(
+                command,
+                pidfile = pidfile,
+                home = home,
+                stdin = stdin if stdin is not None else '/dev/null',
+                stdout = stdout if stdout else '/dev/null',
+                stderr = stderr if stderr else '/dev/null',
+                sudo = sudo,
+                host = self.host,
+                user = self.user,
+                port = self.port,
+                agent = self.forward_agent,
+                identity_file = self.identity_file
+                )
+            
+            if proc.wait():
+                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+
+        return (out, err)
     
     def checkpid(self, path):            
         # Get PID/PPID
index 5690680..aad6039 100644 (file)
@@ -119,7 +119,7 @@ def rexec(command, host, user,
         port = None, 
         agent = True,
         sudo = False,
-        stdin = "",
+        stdin = None,
         identity_file = None,
         env = None,
         tty = False,