From c957791ea6cdbd032f28cb62d4a39013dbfb7a4a Mon Sep 17 00:00:00 2001
From: Alina Quereilhac
Date: Sat, 27 Jul 2013 23:09:51 -0700
Subject: [PATCH] Fixing issues in LinuxNode with high concurrency

The underlying SSH layer sometimes returns an empty output even when the
command was executed without errors. Retry the OS detection (new get_os),
is_alive and find_home up to 10 times, sleeping between attempts with a
growing delay capped at 30 seconds, until a non-empty answer is obtained.

Split install_packages_command out of install_packages so that
LinuxApplication can obtain the installation command without running it.

get_os raises RuntimeError when the OS can not be detected, because the
'os' property calls out.find() on its result. find_home initializes
out/err before the retry loop and formats the traceback into its error
message.
---
 src/nepi/resources/linux/application.py |   2 +-
 src/nepi/resources/linux/node.py        | 122 ++++++++++++++++++------
 2 files changed, 93 insertions(+), 31 deletions(-)

diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py
index 380470c9..3519138b 100644
--- a/src/nepi/resources/linux/application.py
+++ b/src/nepi/resources/linux/application.py
@@ -446,7 +446,7 @@ class LinuxApplication(ResourceManager):
         depends = self.get("depends")
         if depends:
             self.info("Installing dependencies %s" % depends)
-            self.node.install_packages(depends, self.app_home, self.run_home)
+            return self.node.install_packages_command(depends)
 
     def build(self):
         build = self.get("build")
diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py
index 82ccaca1..9188022f 100644
--- a/src/nepi/resources/linux/node.py
+++ b/src/nepi/resources/linux/node.py
@@ -256,12 +256,7 @@ class LinuxNode(ResourceManager):
             self.error(msg)
             raise RuntimeError, msg
 
-        (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
-
-        if err and proc.poll():
-            msg = "Error detecting OS "
-            self.error(msg, out, err)
-            raise RuntimeError, "%s - %s - %s" %( msg, out, err )
+        out = self.get_os()
 
         if out.find("Fedora release 8") == 0:
             self._os = OSType.FEDORA_8
@@ -280,6 +275,35 @@ class LinuxNode(ResourceManager):
 
         return self._os
 
+    def get_os(self):
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work around this, repeat the operation N times or
+        # until the result is not empty string
+        out = err = ""
+        retrydelay = 1.0
+        for i in xrange(10):
+            try:
+                (out, err), proc = self.execute("cat /etc/issue",
+                        retry = 5,
+                        with_lock = True,
+                        blocking = True)
+
+                if out.strip() != "":
+                    return out
+            except:
+                trace = traceback.format_exc()
+                msg = "Error detecting OS: %s " % trace
+                self.error(msg, out, err)
+                raise RuntimeError, msg
+
+            time.sleep(min(30.0, retrydelay))
+            retrydelay *= 1.5
+
+        msg = "Error detecting OS: empty output "
+        self.error(msg, out, err)
+        raise RuntimeError, msg
+
     @property
     def use_deb(self):
         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
@@ -643,12 +667,7 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages, home, run_home = None):
-        """ Install packages in the Linux host.
-
-        'home' is the directory to upload the package installation script.
-        'run_home' is the directory from where to execute the script.
-        """
+    def install_packages_command(self, packages):
         command = ""
         if self.use_rpm:
             command = rpmfuncs.install_packages_command(self.os, packages)
@@ -659,6 +678,16 @@ class LinuxNode(ResourceManager):
             self.error(msg, self.os)
             raise RuntimeError, msg
 
+        return command
+
+    def install_packages(self, packages, home, run_home = None):
+        """ Install packages in the Linux host.
+
+        'home' is the directory to upload the package installation script.
+        'run_home' is the directory from where to execute the script.
+        """
+        command = self.install_packages_command(packages)
+
         run_home = run_home or home
 
         (out, err), proc = self.run_and_wait(command, run_home,
@@ -896,7 +925,7 @@ class LinuxNode(ResourceManager):
     def wait_run(self, pid, ppid, trial = 0):
         """ wait for a remote process to finish execution
         """
-        start_delay = 1.0
+        delay = 1.0
 
         while True:
             status = self.status(pid, ppid)
@@ -927,36 +956,69 @@ class LinuxNode(ResourceManager):
             return True
 
         out = err = ""
-        try:
-            (out, err), proc = self.execute("echo 'ALIVE'",
-                    retry = 5,
-                    with_lock = True)
-        except:
-            trace = traceback.format_exc()
-            msg = "Unresponsive host %s " % err
-            self.error(msg, out, trace)
-            return False
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work around this, repeat the operation N times or
+        # until the result is not empty string
+        retrydelay = 1.0
+        for i in xrange(10):
+            try:
+                (out, err), proc = self.execute("echo 'ALIVE'",
+                        retry = 5,
+                        blocking = True,
+                        with_lock = True)
+
+                if out.find("ALIVE") > -1:
+                    return True
+            except:
+                trace = traceback.format_exc()
+                msg = "Unresponsive host. Error reaching host: %s " % trace
+                self.error(msg, out, err)
+                return False
 
-        if out.strip() == "ALIVE":
+            time.sleep(min(30.0, retrydelay))
+            retrydelay *= 1.5
+
+        if out.find("ALIVE") > -1:
             return True
         else:
-            msg = "Unresponsive host "
+            msg = "Unresponsive host. Wrong answer. "
             self.error(msg, out, err)
             return False
 
     def find_home(self):
         """ Retrieves host home directory
         """
-        (out, err), proc = self.execute("echo ${HOME}", retry = 5,
-                with_lock = True)
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work around this, repeat the operation N times or
+        # until the result is not empty string
+        out = err = ""
+        retrydelay = 1.0
+        for i in xrange(10):
+            try:
+                (out, err), proc = self.execute("echo ${HOME}",
+                        retry = 5,
+                        blocking = True,
+                        with_lock = True)
+
+                if out.strip() != "":
+                    self._home_dir = out.strip()
+                    break
+            except:
+                trace = traceback.format_exc()
+                msg = "Impossible to retrieve HOME directory: %s" % trace
+                self.error(msg, out, err)
+                return False
 
-        if proc.poll():
-            msg = "Imposible to retrieve HOME directory"
+            time.sleep(min(30.0, retrydelay))
+            retrydelay *= 1.5
+
+        if not self._home_dir:
+            msg = "Impossible to retrieve HOME directory"
             self.error(msg, out, err)
             raise RuntimeError, msg
 
-        self._home_dir = out.strip()
-
     def filter_existing_files(self, src, dst):
         """ Removes files that already exist in the Linux host from src list
         """
-- 
2.43.0