Fixing issues in LinuxNode whith high concurrency
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 28 Jul 2013 06:09:51 +0000 (23:09 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 28 Jul 2013 06:09:51 +0000 (23:09 -0700)
src/nepi/resources/linux/application.py
src/nepi/resources/linux/node.py

index 380470c..3519138 100644 (file)
@@ -446,7 +446,7 @@ class LinuxApplication(ResourceManager):
         depends = self.get("depends")
         if depends:
             self.info("Installing dependencies %s" % depends)
-            self.node.install_packages(depends, self.app_home, self.run_home)
+            return self.node.install_packages_command(depends)
 
     def build(self):
         build = self.get("build")
index 82ccaca..9188022 100644 (file)
@@ -256,12 +256,7 @@ class LinuxNode(ResourceManager):
             self.error(msg)
             raise RuntimeError, msg
 
-        (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
-
-        if err and proc.poll():
-            msg = "Error detecting OS "
-            self.error(msg, out, err)
-            raise RuntimeError, "%s - %s - %s" %( msg, out, err )
+        out = self.get_os()
 
         if out.find("Fedora release 8") == 0:
             self._os = OSType.FEDORA_8
@@ -280,6 +275,32 @@ class LinuxNode(ResourceManager):
 
         return self._os
 
+    def get_os(self):
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
+        out = ""
+        retrydelay = 1.0
+        for i in xrange(10):
+            try:
+                (out, err), proc = self.execute("cat /etc/issue", 
+                        retry = 5,
+                        with_lock = True,
+                        blocking = True)
+
+                if out.strip() != "":
+                    return out
+            except:
+                trace = traceback.format_exc()
+                msg = "Error detecting OS: %s " % trace
+                self.error(msg, out, err)
+                return False
+
+            time.sleep(min(30.0, retrydelay))
+            retrydelay *= 1.5
+
+
     @property
     def use_deb(self):
         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
@@ -643,12 +664,7 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages, home, run_home = None):
-        """ Install packages in the Linux host.
-
-        'home' is the directory to upload the package installation script.
-        'run_home' is the directory from where to execute the script.
-        """
+    def install_packages_command(self, packages):
         command = ""
         if self.use_rpm:
             command = rpmfuncs.install_packages_command(self.os, packages)
@@ -659,6 +675,16 @@ class LinuxNode(ResourceManager):
             self.error(msg, self.os)
             raise RuntimeError, msg
 
+        return command
+
+    def install_packages(self, packages, home, run_home = None):
+        """ Install packages in the Linux host.
+
+        'home' is the directory to upload the package installation script.
+        'run_home' is the directory from where to execute the script.
+        """
+        command = self.install_packages_command(packages)
+
         run_home = run_home or home
 
         (out, err), proc = self.run_and_wait(command, run_home, 
@@ -896,7 +922,7 @@ class LinuxNode(ResourceManager):
 
     def wait_run(self, pid, ppid, trial = 0):
         """ wait for a remote process to finish execution """
-        start_delay = 1.0
+        delay = 1.0
 
         while True:
             status = self.status(pid, ppid)
@@ -927,36 +953,68 @@ class LinuxNode(ResourceManager):
             return True
 
         out = err = ""
-        try:
-            (out, err), proc = self.execute("echo 'ALIVE'",
-                    retry = 5, 
-                    with_lock = True)
-        except:
-            trace = traceback.format_exc()
-            msg = "Unresponsive host  %s " % err
-            self.error(msg, out, trace)
-            return False
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
+        retrydelay = 1.0
+        for i in xrange(10):
+            try:
+                (out, err), proc = self.execute("echo 'ALIVE'",
+                        retry = 5,
+                        blocking = True,
+                        with_lock = True)
+        
+                if out.find("ALIVE") > -1:
+                    return True
+            except:
+                trace = traceback.format_exc()
+                msg = "Unresponsive host. Error reaching host: %s " % trace
+                self.error(msg, out, err)
+                return False
 
-        if out.strip() == "ALIVE":
+            time.sleep(min(30.0, retrydelay))
+            retrydelay *= 1.5
+
+        if out.find("ALIVE") > -1:
             return True
         else:
-            msg = "Unresponsive host "
+            msg = "Unresponsive host. Wrong answer. "
             self.error(msg, out, err)
             return False
 
     def find_home(self):
         """ Retrieves host home directory
         """
-        (out, err), proc = self.execute("echo ${HOME}", retry = 5, 
-                    with_lock = True)
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
+        retrydelay = 1.0
+        for i in xrange(10):
+            try:
+                (out, err), proc = self.execute("echo ${HOME}",
+                        retry = 5,
+                        blocking = True,
+                        with_lock = True)
+        
+                if out.strip() != "":
+                    self._home_dir =  out.strip()
+                    break
+            except:
+                trace = traceback.format_exc()
+                msg = "Impossible to retrieve HOME directory" % trace
+                self.error(msg, out, err)
+                return False
 
-        if proc.poll():
-            msg = "Imposible to retrieve HOME directory"
+            time.sleep(min(30.0, retrydelay))
+            retrydelay *= 1.5
+
+        if not self._home_dir:
+            msg = "Impossible to retrieve HOME directory"
             self.error(msg, out, err)
             raise RuntimeError, msg
 
-        self._home_dir =  out.strip()
-
     def filter_existing_files(self, src, dst):
         """ Removes files that already exist in the Linux host from src list
         """