From a819e8fb87fac2ceb36a6e54e9d1a99627c0de8a Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Tue, 18 Dec 2012 17:17:08 +0100 Subject: [PATCH] Improvements to linux node model --- src/neco/resources/linux/node.py | 220 +++++++++++++++++++------------ src/neco/util/sshfuncs.py | 2 +- 2 files changed, 137 insertions(+), 85 deletions(-) diff --git a/src/neco/resources/linux/node.py b/src/neco/resources/linux/node.py index 7eddd022..0ec3c16b 100644 --- a/src/neco/resources/linux/node.py +++ b/src/neco/resources/linux/node.py @@ -5,6 +5,7 @@ from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \ import cStringIO import logging import os.path +import subprocess class LinuxNode(Resource): def __init__(self, box, ec): @@ -63,6 +64,10 @@ class LinuxNode(Resource): return self._pm + @property + def is_localhost(self): + return ( self.host or self.ip ) in ['localhost', '127.0.0.7', '::1'] + def install(self, packages): if not isinstance(packages, list): packages = [packages] @@ -83,53 +88,32 @@ class LinuxNode(Resource): if not os.path.isfile(src): src = cStringIO.StringIO(src) - # Build destination as @: - dst = "%s@%s:%s" % (self.user, self.host or self.ip, dst) - - (out, err), proc = eintr_retry(rcopy)( - src, dst, - port = self.port, - identity_file = self.identity_file) - - if proc.wait(): - msg = "Error uploading to %s got:\n%s%s" %\ - (self.host or self.ip, out, err) - self._logger.error(msg) - raise RuntimeError(msg) + if not self.is_localhost: + # Build destination as @: + dst = "%s@%s:%s" % (self.user, self.host or self.ip, dst) + return self.copy(src, dst) def download(self, src, dst): - # Build destination as @: - src = "%s@%s:%s" % (self.user, self.host or self.ip, src) - - (out, err), proc = eintr_retry(rcopy)( - src, dst, - port = self.port, - identity_file = self.identity_file) - - if proc.wait(): - msg = "Error uploading to %s got:\n%s%s" %\ - (self.host or self.ip, out, err) - self._logger.error(msg) - raise RuntimeError(msg) - + if not self.is_localhost: + # Build destination as @: + src = "%s@%s:%s" % (self.user, self.host or self.ip, src) + return self.copy(src, dst) + def is_alive(self, verbose = False): - (out, err), proc = eintr_retry(rexec)( - "echo 'ALIVE'", - self.host or self.ip, - self.user, - port = self.port, - agent = self.forward_agent, - identity_file = self.identity_file, - x11 = self.enable_x11, + if self.is_localhost: + return True + + try: + out = self.execute("echo 'ALIVE'", timeout = 60, err_on_timeout = False, persistent = False) - - if proc.wait(): + except: if verbose: self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err) return False - elif out.strip().startswith('ALIVE'): + + if out.strip().startswith('ALIVE'): return True else: if verbose: @@ -140,22 +124,43 @@ class LinuxNode(Resource): if clean: self.rmdir(path) - self.execute( + return self.execute( "mkdir -p %s" % path, timeout = 120, retry = 3 ) def rmdir(self, path): - self.execute( + return self.execute( "rm -rf %s" % path, timeout = 120, retry = 3 ) + def copy(self, src, dst): + if self.is_localhost: + command = ["cp", "-R", src, dst] + p = subprocess.Popen(command, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = p.communicate() + else: + (out, err), proc = eintr_retry(rcopy)( + src, dst, + port = self.port, + agent = self.agent, + identity_file = self.identity_file) + + if proc.wait(): + msg = "Error uploading to %s got:\n%s%s" %\ + (self.host or self.ip, out, err) + self._logger.error(msg) + raise RuntimeError(msg) + + return (out, err) + def execute(self, command, sudo = False, - stdin = "", + stdin = None, tty = False, env = None, timeout = None, @@ -166,31 +171,46 @@ class LinuxNode(Resource): """ Notice that this invocation will block until the execution finishes. If this is not the desired behavior, use 'run' instead.""" - (out, err), proc = eintr_retry(rexec)( - command, - self.host or self.ip, - self.user, - port = self.port, - agent = self.forward_agent, - sudo = sudo, - stdin = stdin, - identity_file = self.identity_file, - tty = tty, - x11 = self.enable_x11, - env = env, - timeout = timeout, - retry = retry, - err_on_timeout = err_on_timeout, - connect_timeout = connect_timeout, - persistent = persistent) - - if proc.wait(): - msg = "Failed to execute command %s at node %s: %s %s" % \ - (command, self.host or self.ip, out, err,) - self._logger.warn(msg) - raise RuntimeError(msg) - return out + if self.is_localhost: + if env: + export = '' + for envkey, envval in env.iteritems(): + export += '%s=%s ' % (envkey, envval) + command = export + command + + if sudo: + command = "sudo " + command + + p = subprocess.Popen(command, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = p.communicate() + else: + (out, err), proc = eintr_retry(rexec)( + command, + self.host or self.ip, + self.user, + port = self.port, + agent = self.forward_agent, + sudo = sudo, + stdin = stdin, + identity_file = self.identity_file, + tty = tty, + x11 = self.enable_x11, + env = env, + timeout = timeout, + retry = retry, + err_on_timeout = err_on_timeout, + connect_timeout = connect_timeout, + persistent = persistent) + + if proc.wait(): + msg = "Failed to execute command %s at node %s: %s %s" % \ + (command, self.host or self.ip, out, err,) + self._logger.warn(msg) + raise RuntimeError(msg) + + return (out, err) def run(self, command, home, stdin = None, @@ -198,26 +218,58 @@ class LinuxNode(Resource): stderr = 'stderr', sudo = False): self._logger.info("Running %s", command) - - # Start process in a "daemonized" way, using nohup and heavy - # stdin/out redirection to avoid connection issues - (out,err), proc = rspawn( - command, - pidfile = './pid', - home = home, - stdin = stdin if stdin is not None else '/dev/null', - stdout = stdout if stdout else '/dev/null', - stderr = stderr if stderr else '/dev/null', - sudo = sudo, - host = self.host, - user = self.user, - port = self.port, - agent = self.forward_agent, - identity_file = self.identity_file - ) - if proc.wait(): - raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) + pidfile = './pid', + + if self.is_localhost: + if stderr == stdout: + stderr = '&1' + else: + stderr = ' ' + stderr + + daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % { + 'command' : command, + 'pidfile' : pidfile, + + 'stdout' : stdout, + 'stderr' : stderr, + 'stdin' : stdin, + } + + cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % { + 'command' : daemon_command, + + 'sudo' : 'sudo -S' if sudo else '', + + 'pidfile' : pidfile, + 'gohome' : 'cd %s ; ' % home if home else '', + 'create' : 'mkdir -p %s ; ' % home if create_home else '', + } + p = subprocess.Popen(command, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = p.communicate() + else: + # Start process in a "daemonized" way, using nohup and heavy + # stdin/out redirection to avoid connection issues + (out,err), proc = rspawn( + command, + pidfile = pidfile, + home = home, + stdin = stdin if stdin is not None else '/dev/null', + stdout = stdout if stdout else '/dev/null', + stderr = stderr if stderr else '/dev/null', + sudo = sudo, + host = self.host, + user = self.user, + port = self.port, + agent = self.forward_agent, + identity_file = self.identity_file + ) + + if proc.wait(): + raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) + + return (out, err) def checkpid(self, path): # Get PID/PPID diff --git a/src/neco/util/sshfuncs.py b/src/neco/util/sshfuncs.py index 56906808..aad6039c 100644 --- a/src/neco/util/sshfuncs.py +++ b/src/neco/util/sshfuncs.py @@ -119,7 +119,7 @@ def rexec(command, host, user, port = None, agent = True, sudo = False, - stdin = "", + stdin = None, identity_file = None, env = None, tty = False, -- 2.43.0