X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fnode.py;h=b50d7fad89c06e8c99945db12c41fb575834532e;hb=4b0e922489532434f0968647886021542b77cece;hp=82ccaca13aaa573a54c9389c6aefb2912b85934e;hpb=8b192f1c0cc55e5bc847ce40436ab55ddce0585c;p=nepi.git diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 82ccaca1..b50d7fad 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -142,6 +142,9 @@ class LinuxNode(ResourceManager): """ _rtype = "LinuxNode" + _help = "Controls Linux host machines ( either localhost or a host " \ + "that can be accessed using a SSH key)" + _backend_type = "linux" @classmethod def _register_attributes(cls): @@ -196,8 +199,14 @@ class LinuxNode(ResourceManager): # home directory at Linux host self._home_dir = "" - # lock to avoid concurrency issues on methods used by applications - self._lock = threading.Lock() + # lock to prevent concurrent applications on the same node, + # to execute commands at the same time. There are potential + # concurrency issues when using SSH to a same host from + # multiple threads. There are also possible operational + # issues, e.g. an application querying the existence + # of a file or folder prior to its creation, and another + # application creating the same file or folder in between. + self._node_lock = threading.Lock() def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, @@ -256,12 +265,7 @@ class LinuxNode(ResourceManager): self.error(msg) raise RuntimeError, msg - (out, err), proc = self.execute("cat /etc/issue", with_lock = True) - - if err and proc.poll(): - msg = "Error detecting OS " - self.error(msg, out, err) - raise RuntimeError, "%s - %s - %s" %( msg, out, err ) + out = self.get_os() if out.find("Fedora release 8") == 0: self._os = OSType.FEDORA_8 @@ -280,6 +284,32 @@ class LinuxNode(ResourceManager): return self._os + def get_os(self): + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string + out = "" + retrydelay = 1.0 + for i in xrange(10): + try: + (out, err), proc = self.execute("cat /etc/issue", + retry = 5, + with_lock = True, + blocking = True) + + if out.strip() != "": + return out + except: + trace = traceback.format_exc() + msg = "Error detecting OS: %s " % trace + self.error(msg, out, err) + return False + + time.sleep(min(30.0, retrydelay)) + retrydelay *= 1.5 + + @property def use_deb(self): return self.os in [OSType.DEBIAN, OSType.UBUNTU] @@ -330,7 +360,7 @@ class LinuxNode(ResourceManager): self.discover() self.provision() except: - self._state = ResourceState.FAILED + self.fail() raise # Node needs to wait until all associated interfaces are @@ -435,7 +465,7 @@ class LinuxNode(ResourceManager): env = env) else: if with_lock: - with self._lock: + with self._node_lock: (out, err), proc = sshfuncs.rexec( command, host = self.get("hostname"), @@ -503,7 +533,7 @@ class LinuxNode(ResourceManager): sudo = sudo, user = user) else: - with self._lock: + with self._node_lock: (out, err), proc = sshfuncs.rspawn( command, pidfile = pidfile, @@ -528,7 +558,7 @@ class LinuxNode(ResourceManager): if self.localhost: pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) else: - with self._lock: + with self._node_lock: pidtuple = sshfuncs.rgetpid( os.path.join(home, pidfile), host = self.get("hostname"), @@ -545,7 +575,7 @@ class LinuxNode(ResourceManager): if self.localhost: status = execfuncs.lstatus(pid, ppid) else: - with self._lock: + with self._node_lock: status = sshfuncs.rstatus( pid, ppid, host = self.get("hostname"), @@ -567,7 +597,7 @@ class LinuxNode(ResourceManager): if self.localhost: (out, err), proc = execfuncs.lkill(pid, ppid, sudo) else: - with self._lock: + with self._node_lock: (out, err), proc = sshfuncs.rkill( pid, ppid, host = self.get("hostname"), @@ -587,7 +617,7 @@ class LinuxNode(ResourceManager): recursive = True, strict_host_checking = False) else: - with self._lock: + with self._node_lock: (out, err), proc = sshfuncs.rcopy( src, dst, port = self.get("port"), @@ -643,12 +673,7 @@ class LinuxNode(ResourceManager): src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) return self.copy(src, dst) - def install_packages(self, packages, home, run_home = None): - """ Install packages in the Linux host. - - 'home' is the directory to upload the package installation script. - 'run_home' is the directory from where to execute the script. - """ + def install_packages_command(self, packages): command = "" if self.use_rpm: command = rpmfuncs.install_packages_command(self.os, packages) @@ -659,6 +684,16 @@ class LinuxNode(ResourceManager): self.error(msg, self.os) raise RuntimeError, msg + return command + + def install_packages(self, packages, home, run_home = None): + """ Install packages in the Linux host. + + 'home' is the directory to upload the package installation script. + 'run_home' is the directory from where to execute the script. + """ + command = self.install_packages_command(packages) + run_home = run_home or home (out, err), proc = self.run_and_wait(command, run_home, @@ -896,7 +931,7 @@ class LinuxNode(ResourceManager): def wait_run(self, pid, ppid, trial = 0): """ wait for a remote process to finish execution """ - start_delay = 1.0 + delay = 1.0 while True: status = self.status(pid, ppid) @@ -927,36 +962,68 @@ class LinuxNode(ResourceManager): return True out = err = "" - try: - (out, err), proc = self.execute("echo 'ALIVE'", - retry = 5, - with_lock = True) - except: - trace = traceback.format_exc() - msg = "Unresponsive host %s " % err - self.error(msg, out, trace) - return False + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string + retrydelay = 1.0 + for i in xrange(10): + try: + (out, err), proc = self.execute("echo 'ALIVE'", + retry = 5, + blocking = True, + with_lock = True) + + if out.find("ALIVE") > -1: + return True + except: + trace = traceback.format_exc() + msg = "Unresponsive host. Error reaching host: %s " % trace + self.error(msg, out, err) + return False - if out.strip() == "ALIVE": + time.sleep(min(30.0, retrydelay)) + retrydelay *= 1.5 + + if out.find("ALIVE") > -1: return True else: - msg = "Unresponsive host " + msg = "Unresponsive host. Wrong answer. " self.error(msg, out, err) return False def find_home(self): """ Retrieves host home directory """ - (out, err), proc = self.execute("echo ${HOME}", retry = 5, - with_lock = True) + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string + retrydelay = 1.0 + for i in xrange(10): + try: + (out, err), proc = self.execute("echo ${HOME}", + retry = 5, + blocking = True, + with_lock = True) + + if out.strip() != "": + self._home_dir = out.strip() + break + except: + trace = traceback.format_exc() + msg = "Impossible to retrieve HOME directory" % trace + self.error(msg, out, err) + return False - if proc.poll(): - msg = "Imposible to retrieve HOME directory" + time.sleep(min(30.0, retrydelay)) + retrydelay *= 1.5 + + if not self._home_dir: + msg = "Impossible to retrieve HOME directory" self.error(msg, out, err) raise RuntimeError, msg - self._home_dir = out.strip() - def filter_existing_files(self, src, dst): """ Removes files that already exist in the Linux host from src list """