From 72accb16ffd0f4349f721f8ca21738179ac3d597 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Sat, 11 May 2013 22:00:48 +0200 Subject: [PATCH] Bug fixes in Linux Application and sshfuncs --- src/neco/execution/ec.py | 3 +- src/neco/resources/linux/node.py | 4 +-- src/neco/util/sshfuncs.py | 60 ++++++++++++++++++++++++-------- 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index 65ef175f..1d253347 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -14,8 +14,7 @@ from neco.execution.scheduler import HeapScheduler, Task, TaskStatus from neco.execution.trace import TraceAttr # TODO: use multiprocessing instead of threading -# TODO: Improve speed. Too slow... !! -# TODO: When something fails during deployment NECO leaves scp and ssh processes running behind!! +# TODO: When a failure occurs during deployment scp and ssh processes are left running behind!! class ECState(object): RUNNING = 1 diff --git a/src/neco/resources/linux/node.py b/src/neco/resources/linux/node.py index 38c38651..237399e2 100644 --- a/src/neco/resources/linux/node.py +++ b/src/neco/resources/linux/node.py @@ -413,12 +413,12 @@ class LinuxNode(ResourceManager): out = err = "" try: - (out, err), proc = self.execute("echo 'ALIVE'", retry = 5, + (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, with_lock = True) except: import traceback trace = traceback.format_exc() - msg = "Unresponsive host " + msg = "Unresponsive host %s " % err self.error(msg, out, trace) return False diff --git a/src/neco/util/sshfuncs.py b/src/neco/util/sshfuncs.py index 13b7b1ed..a8a9b866 100644 --- a/src/neco/util/sshfuncs.py +++ b/src/neco/util/sshfuncs.py @@ -9,11 +9,10 @@ import select import signal import socket import subprocess +import threading import time import tempfile -# TODO: Add retries to rcopy!! rcopy is not being retried! 
- logger = logging.getLogger("sshfuncs") def log(msg, level, out = None, err = None): @@ -55,14 +54,21 @@ class NOT_STARTED: """ hostbyname_cache = dict() +hostbyname_cache_lock = threading.Lock() def gethostbyname(host): global hostbyname_cache + global hostbyname_cache_lock hostbyname = hostbyname_cache.get(host) if not hostbyname: - hostbyname = socket.gethostbyname(host) - hostbyname_cache[host] = hostbyname + with hostbyname_cache_lock: + hostbyname = socket.gethostbyname(host) + hostbyname_cache[host] = hostbyname + + msg = " Added hostbyname %s - %s " % (host, hostbyname) + log(msg, logging.DEBUG) + return hostbyname OPENSSH_HAS_PERSIST = None @@ -297,6 +303,7 @@ def rcopy(source, dest, recursive = False, identity = None, server_key = None, + retry = 3, strict_host_checking = True): """ Copies from/to remote sites. @@ -316,9 +323,6 @@ def rcopy(source, dest, in which case it is advised that the destination be a folder. """ - msg = " rcopy - scp %s %s " % (source, dest) - log(msg, logging.DEBUG) - if isinstance(source, file) and source.tell() == 0: source = source.name elif hasattr(source, 'read'): @@ -525,15 +529,41 @@ def rcopy(source, dest, args.append(dest) - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - proc._known_hosts = tmp_known_hosts + for x in xrange(retry): + # connects to the remote host and starts a remote connection + proc = subprocess.Popen(args, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE) + + # attach tempfile object to the process, to make sure the file stays + # alive until the process is finished with it + proc._known_hosts = tmp_known_hosts - (out, err) = proc.communicate() - eintr_retry(proc.wait)() + try: + (out, err) = proc.communicate() + eintr_retry(proc.wait)() + msg = " rcopy - host %s - command %s " % (host, " ".join(args)) + log(msg, logging.DEBUG, out, err) + + 
if proc.poll(): + t = x*2 + msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( + t, x, host, " ".join(args)) + log(msg, logging.DEBUG) + + time.sleep(t) + continue + + break + except RuntimeError, e: + msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) + log(msg, logging.DEBUG, out, err) + + if retry <= 0: + raise + retry -= 1 + return ((out, err), proc) def rspawn(command, pidfile, -- 2.43.0