Bug fixes in Linux Application and sshfuncs
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 11 May 2013 20:00:48 +0000 (22:00 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 11 May 2013 20:00:48 +0000 (22:00 +0200)
src/neco/execution/ec.py
src/neco/resources/linux/node.py
src/neco/util/sshfuncs.py

index 65ef175..1d25334 100644 (file)
@@ -14,8 +14,7 @@ from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
 from neco.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
-# TODO: Improve speed. Too slow... !!
-# TODO: When something fails during deployment NECO leaves scp and ssh processes running behind!!
+# TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
 
 class ECState(object):
     RUNNING = 1
index 38c3865..237399e 100644 (file)
@@ -413,12 +413,12 @@ class LinuxNode(ResourceManager):
 
         out = err = ""
         try:
-            (out, err), proc = self.execute("echo 'ALIVE'", retry = 5, 
+            (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
                     with_lock = True)
         except:
             import traceback
             trace = traceback.format_exc()
-            msg = "Unresponsive host "
+            msg = "Unresponsive host  %s " % err
             self.error(msg, out, trace)
             return False
 
index 13b7b1e..a8a9b86 100644 (file)
@@ -9,11 +9,10 @@ import select
 import signal
 import socket
 import subprocess
+import threading
 import time
 import tempfile
 
-# TODO: Add retries to rcopy!! rcopy is not being retried!
-
 logger = logging.getLogger("sshfuncs")
 
 def log(msg, level, out = None, err = None):
@@ -55,14 +54,21 @@ class NOT_STARTED:
     """
 
 hostbyname_cache = dict()
+hostbyname_cache_lock = threading.Lock()
 
 def gethostbyname(host):
     global hostbyname_cache
+    global hostbyname_cache_lock
     
     hostbyname = hostbyname_cache.get(host)
     if not hostbyname:
-        hostbyname = socket.gethostbyname(host)
-        hostbyname_cache[host] = hostbyname
+        with hostbyname_cache_lock:
+            hostbyname = socket.gethostbyname(host)
+            hostbyname_cache[host] = hostbyname
+
+            msg = " Added hostbyname %s - %s " % (host, hostbyname)
+            log(msg, logging.DEBUG)
+
     return hostbyname
 
 OPENSSH_HAS_PERSIST = None
@@ -297,6 +303,7 @@ def rcopy(source, dest,
         recursive = False,
         identity = None,
         server_key = None,
+        retry = 3,
         strict_host_checking = True):
     """
     Copies from/to remote sites.
@@ -316,9 +323,6 @@ def rcopy(source, dest,
     in which case it is advised that the destination be a folder.
     """
     
-    msg = " rcopy - scp %s %s " % (source, dest)
-    log(msg, logging.DEBUG)
-    
     if isinstance(source, file) and source.tell() == 0:
         source = source.name
     elif hasattr(source, 'read'):
@@ -525,15 +529,41 @@ def rcopy(source, dest,
 
         args.append(dest)
 
-        # connects to the remote host and starts a remote connection
-        proc = subprocess.Popen(args, 
-                stdout = subprocess.PIPE,
-                stdin = subprocess.PIPE, 
-                stderr = subprocess.PIPE)
-        proc._known_hosts = tmp_known_hosts
+        for x in xrange(retry):
+            # connects to the remote host and starts a remote connection
+            proc = subprocess.Popen(args,
+                    stdout = subprocess.PIPE,
+                    stdin = subprocess.PIPE, 
+                    stderr = subprocess.PIPE)
+            
+            # attach tempfile object to the process, to make sure the file stays
+            # alive until the process is finished with it
+            proc._known_hosts = tmp_known_hosts
         
-        (out, err) = proc.communicate()
-        eintr_retry(proc.wait)()
+            try:
+                (out, err) = proc.communicate()
+                eintr_retry(proc.wait)()
+                msg = " rcopy - host %s - command %s " % (host, " ".join(args))
+                log(msg, logging.DEBUG, out, err)
+
+                if proc.poll():
+                    t = x*2
+                    msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
+                            t, x, host, " ".join(args))
+                    log(msg, logging.DEBUG)
+
+                    time.sleep(t)
+                    continue
+
+                break
+            except RuntimeError, e:
+                msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
+                log(msg, logging.DEBUG, out, err)
+
+                if retry <= 0:
+                    raise
+                retry -= 1
+            
         return ((out, err), proc)
 
 def rspawn(command, pidfile,