X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fnode.py;h=a96267f1e5192432420eab79ddb9a53c39d61e0a;hb=25a46fdfb9753474f35fa2dc677182f8500c52a9;hp=85e22863cf0d02fca95c0b5f7e9d1de9e74c5d03;hpb=98a98f7058ec7180303ea88e8ccc212ddf740ac6;p=nepi.git diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 85e22863..a96267f1 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -1,10 +1,29 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + from nepi.execution.attribute import Attribute, Flags from nepi.execution.resource import ResourceManager, clsinit, ResourceState from nepi.resources.linux import rpmfuncs, debfuncs -from nepi.util import sshfuncs, execfuncs +from nepi.util import sshfuncs, execfuncs +from nepi.util.sshfuncs import ProcStatus import collections -import logging import os import random import re @@ -19,9 +38,110 @@ import threading reschedule_delay = "0.5s" +class ExitCode: + """ + Error codes that the rexitcode function can return if unable to + check the exit code of a spawned process + """ + FILENOTFOUND = -1 + CORRUPTFILE = -2 + ERROR = -3 + OK = 0 + +class OSType: + """ + Supported flavors of Linux OS + """ + FEDORA_12 = "f12" + FEDORA_14 = "f14" + FEDORA = "fedora" + UBUNTU = "ubuntu" + DEBIAN = "debian" @clsinit class LinuxNode(ResourceManager): + """ + .. class:: Class Args : + + :param ec: The Experiment controller + :type ec: ExperimentController + :param guid: guid of the RM + :type guid: int + + .. note:: + + There are different ways in which commands can be executed using the + LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run', + 'run_and_wait'). + + Brief explanation: + + * 'execute' (blocking mode) : + + HOW IT WORKS: 'execute', forks a process and run the + command, synchronously, attached to the terminal, in + foreground. + The execute method will block until the command returns + the result on 'out', 'err' (so until it finishes executing). + + USAGE: short-lived commands that must be executed attached + to a terminal and in foreground, for which it IS necessary + to block until the command has finished (e.g. if you want + to run 'ls' or 'cat'). + + * 'execute' (NON blocking mode - blocking = False) : + + HOW IT WORKS: Same as before, except that execute method + will return immediately (even if command still running). + + USAGE: long-lived commands that must be executed attached + to a terminal and in foreground, but for which it is not + necessary to block until the command has finished. (e.g. + start an application using X11 forwarding) + + * 'run' : + + HOW IT WORKS: Connects to the host ( using SSH if remote) + and launches the command in background, detached from any + terminal (daemonized), and returns. The command continues to + run remotely, but since it is detached from the terminal, + its pipes (stdin, stdout, stderr) can't be redirected to the + console (as normal non detached processes would), and so they + are explicitly redirected to files. The pidfile is created as + part of the process of launching the command. The pidfile + holds the pid and ppid of the process forked in background, + so later on it is possible to check whether the command is still + running. + + USAGE: long-lived commands that can run detached in background, + for which it is NOT necessary to block (wait) until the command + has finished. (e.g. start an application that is not using X11 + forwarding. It can run detached and remotely in background) + + * 'run_and_wait' : + + HOW IT WORKS: Similar to 'run' except that it 'blocks' until + the command has finished execution. It also checks whether + errors occurred during runtime by reading the exitcode file, + which contains the exit code of the command that was run + (checking stderr only is not always reliable since many + commands throw debugging info to stderr and the only way to + automatically know whether an error really happened is to + check the process exit code). + + Another difference with respect to 'run', is that instead + of directly executing the command as a bash command line, + it uploads the command to a bash script and runs the script. + This allows to use the bash script to debug errors, since + it remains at the remote host and can be run manually to + reproduce the error. + + USAGE: medium-lived commands that can run detached in + background, for which it IS necessary to block (wait) until + the command has finished. (e.g. Package installation, + source compilation, file download, etc) + + """ _rtype = "LinuxNode" @classmethod @@ -72,8 +192,6 @@ class LinuxNode(ResourceManager): # lock to avoid concurrency issues on methods used by applications self._lock = threading.Lock() - - self._logger = logging.getLogger("LinuxNode") def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, @@ -110,13 +228,13 @@ class LinuxNode(ResourceManager): raise RuntimeError, "%s - %s - %s" %( msg, out, err ) if out.find("Fedora release 12") == 0: - self._os = "f12" + self._os = OSType.FEDORA_12 elif out.find("Fedora release 14") == 0: - self._os = "f14" + self._os = OSType.FEDORA_14 elif out.find("Debian") == 0: - self._os = "debian" + self._os = OSType.DEBIAN elif out.find("Ubuntu") ==0: - self._os = "ubuntu" + self._os = OSType.UBUNTU else: msg = "Unsupported OS" self.error(msg, out) @@ -128,7 +246,7 @@ class LinuxNode(ResourceManager): def localhost(self): return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] - def provision(self, filters = None): + def provision(self): if not self.is_alive(): self._state = ResourceState.FAILED msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") @@ -148,8 +266,8 @@ class LinuxNode(ResourceManager): def deploy(self): if self.state == ResourceState.NEW: try: - self.discover() - self.provision() + self.discover() + self.provision() except: self._state = ResourceState.FAILED raise @@ -233,7 +351,6 @@ class LinuxNode(ResourceManager): if not self.localhost: # Build destination as @: dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst) - result = self.copy(src, dst) # clean up temp file @@ -248,46 +365,46 @@ class LinuxNode(ResourceManager): src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) return self.copy(src, dst) - def install_packages(self, packages, home = None): - home = home or self.node_home - - cmd = "" - if self.os in ["f12", "f14"]: - cmd = rpmfuncs.install_packages_command(self.os, packages) - elif self.os in ["debian", "ubuntu"]: - cmd = debfuncs.install_packages_command(self.os, packages) + def install_packages(self, packages, home): + command = "" + if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]: + command = rpmfuncs.install_packages_command(self.os, packages) + elif self.os in [OSType.DEBIAN, OSType.UBUNTU]: + command = debfuncs.install_packages_command(self.os, packages) else: msg = "Error installing packages ( OS not known ) " self.error(msg, self.os) raise RuntimeError, msg out = err = "" - (out, err), proc = self.run_and_wait(cmd, home, - pidfile = "instpkg_pid", - stdout = "instpkg_out", - stderr = "instpkg_err", + (out, err), proc = self.run_and_wait(command, home, + shfile = "instpkg.sh", + pidfile = "instpkg_pidfile", + ecodefile = "instpkg_exitcode", + stdout = "instpkg_stdout", + stderr = "instpkg_stderr", raise_on_error = True) return (out, err), proc - def remove_packages(self, packages, home = None): - home = home or self.node_home - - cmd = "" - if self.os in ["f12", "f14"]: - cmd = rpmfuncs.remove_packages_command(self.os, packages) - elif self.os in ["debian", "ubuntu"]: - cmd = debfuncs.remove_packages_command(self.os, packages) + def remove_packages(self, packages, home): + command = "" + if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]: + command = rpmfuncs.remove_packages_command(self.os, packages) + elif self.os in [OSType.DEBIAN, OSType.UBUNTU]: + command = debfuncs.remove_packages_command(self.os, packages) else: msg = "Error removing packages ( OS not known ) " self.error(msg) raise RuntimeError, msg out = err = "" - (out, err), proc = self.run_and_wait(cmd, home, - pidfile = "rmpkg_pid", - stdout = "rmpkg_out", - stderr = "rmpkg_err", + (out, err), proc = self.run_and_wait(command, home, + shfile = "rmpkg.sh", + pidfile = "rmpkg_pidfile", + ecodefile = "rmpkg_exitcode", + stdout = "rmpkg_stdout", + stderr = "rmpkg_stderr", raise_on_error = True) return (out, err), proc @@ -300,22 +417,29 @@ class LinuxNode(ResourceManager): def rmdir(self, path): return self.execute("rm -rf %s" % path, with_lock = True) - - def run_and_wait(self, command, - home = ".", - pidfile = "pid", + + def run_and_wait(self, command, home, + shfile = "cmd.sh", + env = None, + pidfile = "pidfile", + ecodefile = "exitcode", stdin = None, - stdout = 'stdout', - stderr = 'stderr', + stdout = "stdout", + stderr = "stderr", sudo = False, tty = False, raise_on_error = False): - """ runs a command in background on the remote host, but waits - until the command finishes execution. - This is more robust than doing a simple synchronized 'execute', - since in the remote host the command can continue to run detached - even if network disconnections occur """ + Uploads the 'command' to a bash script in the host. + Then runs the script detached in background in the host, and + busy-waites until the script finishes executing. + """ + self.upload_command(command, home, + shfile = shfile, + ecodefile = ecodefile, + env = env) + + command = "bash ./%s" % shfile # run command in background in remote host (out, err), proc = self.run(command, home, pidfile = pidfile, @@ -326,7 +450,7 @@ class LinuxNode(ResourceManager): tty = tty) # check no errors occurred - if proc.poll() and err: + if proc.poll(): msg = " Failed to run command '%s' " % command self.error(msg, out, err) if raise_on_error: @@ -340,11 +464,14 @@ class LinuxNode(ResourceManager): # wait until command finishes to execute self.wait_run(pid, ppid) - - # check if execution errors occurred - (out, err), proc = self.check_output(home, stderr) - - if err or out: + + (out, err), proc = self.check_errors(home, + ecodefile = ecodefile, + stdout = stdout, + stderr= stderr) + + # Out is what was written in the stderr file + if err: msg = " Failed to run command '%s' " % command self.error(msg, out, err) @@ -352,21 +479,117 @@ class LinuxNode(ResourceManager): raise RuntimeError, msg return (out, err), proc + + def exitcode(self, home, ecodefile = "exitcode"): + """ + Get the exit code of an application. + Returns an integer value with the exit code + """ + (out, err), proc = self.check_output(home, ecodefile) + + # Succeeded to open file, return exit code in the file + if proc.wait() == 0: + try: + return int(out.strip()) + except: + # Error in the content of the file! + return ExitCode.CORRUPTFILE + + # No such file or directory + if proc.returncode == 1: + return ExitCode.FILENOTFOUND + + # Other error from 'cat' + return ExitCode.ERROR + + def upload_command(self, command, home, + shfile = "cmd.sh", + ecodefile = "exitcode", + env = None): + """ Saves the command as a bash script file in the remote host, and + forces to save the exit code of the command execution to the ecodefile + """ + + if not (command.strip().endswith(";") or command.strip().endswith("&")): + command += ";" + + # The exit code of the command will be stored in ecodefile + command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % { + 'command': command, + 'ecodefile': ecodefile, + } + + # Export environment + environ = self.format_environment(env) + + # Add environ to command + command = environ + command + + dst = os.path.join(home, shfile) + return self.upload(command, dst, text = True) + + def format_environment(self, env, inline = False): + """Format environmental variables for command to be executed either + as an inline command + (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or + as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n) + """ + if not env: return "" + + # Remove extra white spaces + env = re.sub(r'\s+', ' ', env.strip()) + + sep = ";" if inline else "\n" + return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep + + def check_errors(self, home, + ecodefile = "exitcode", + stdout = "stdout", + stderr = "stderr"): + """ + Checks whether errors occurred while running a command. + It first checks the exit code for the command, and only if the + exit code is an error one it returns the error output. + + """ + proc = None + err = "" + # retrive standard output from the file + (out, oerr), oproc = self.check_output(home, stdout) + + # get exit code saved in the 'exitcode' file + ecode = self.exitcode(home, ecodefile) + + if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]: + err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile) + elif ecode > 0 or ecode == ExitCode.FILENOTFOUND: + # The process returned an error code or didn't exist. + # Check standard error. + (err, eerr), proc = self.check_output(home, stderr) + + # If the stderr file was not found, assume nothing bad happened, + # and just ignore the error. + # (cat returns 1 for error "No such file or directory") + if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: + err = "" + + return (out, err), proc - def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False): + def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False): """ Waits until the pid file for the command is generated, and returns the pid and ppid of the process """ pid = ppid = None delay = 1.0 - for i in xrange(5): - pidtuple = self.checkpid(home = home, pidfile = pidfile) + + for i in xrange(4): + pidtuple = self.getpid(home = home, pidfile = pidfile) if pidtuple: pid, ppid = pidtuple break else: time.sleep(delay) - delay = min(30,delay*1.2) + delay = delay * 1.5 else: msg = " Failed to get pid for pidfile %s/%s " % ( home, pidfile ) @@ -379,30 +602,26 @@ class LinuxNode(ResourceManager): def wait_run(self, pid, ppid, trial = 0): """ wait for a remote process to finish execution """ - delay = 1.0 - first = True - bustspin = 0 + start_delay = 1.0 while True: status = self.status(pid, ppid) - if status is sshfuncs.FINISHED: + if status is ProcStatus.FINISHED: break - elif status is not sshfuncs.RUNNING: - bustspin += 1 - time.sleep(delay*(5.5+random.random())) - if bustspin > 12: + elif status is not ProcStatus.RUNNING: + delay = delay * 1.5 + time.sleep(delay) + # If it takes more than 20 seconds to start, then + # asume something went wrong + if delay > 20: break else: - if first: - first = False - - time.sleep(delay*(0.5+random.random())) - delay = min(30,delay*1.2) - bustspin = 0 + # The app is running, just wait... + time.sleep(0.5) def check_output(self, home, filename): - """ checks file content """ + """ Retrives content of file """ (out, err), proc = self.execute("cat %s" % os.path.join(home, filename), retry = 1, with_lock = True) return (out, err), proc @@ -432,7 +651,7 @@ class LinuxNode(ResourceManager): def copy(self, src, dst): if self.localhost: - (out, err), proc = execfuncs.lcopy(source, dest, + (out, err), proc = execfuncs.lcopy(source, dest, recursive = True, strict_host_checking = False) else: @@ -459,6 +678,7 @@ class LinuxNode(ResourceManager): connect_timeout = 30, strict_host_checking = False, persistent = True, + blocking = True, with_lock = False ): """ Notice that this invocation will block until the @@ -492,6 +712,7 @@ class LinuxNode(ResourceManager): err_on_timeout = err_on_timeout, connect_timeout = connect_timeout, persistent = persistent, + blocking = blocking, strict_host_checking = strict_host_checking ) else: @@ -512,21 +733,22 @@ class LinuxNode(ResourceManager): retry = retry, err_on_timeout = err_on_timeout, connect_timeout = connect_timeout, - persistent = persistent + persistent = persistent, + blocking = blocking, + strict_host_checking = strict_host_checking ) return (out, err), proc - def run(self, command, - home = None, + def run(self, command, home, create_home = False, - pidfile = "pid", + pidfile = 'pidfile', stdin = None, stdout = 'stdout', stderr = 'stderr', sudo = False, tty = False): - + self.debug("Running command '%s'" % command) if self.localhost: @@ -539,10 +761,8 @@ class LinuxNode(ResourceManager): sudo = sudo, user = user) else: - # Start process in a "daemonized" way, using nohup and heavy - # stdin/out redirection to avoid connection issues with self._lock: - (out,err), proc = sshfuncs.rspawn( + (out, err), proc = sshfuncs.rspawn( command, pidfile = pidfile, home = home, @@ -562,12 +782,12 @@ class LinuxNode(ResourceManager): return (out, err), proc - def checkpid(self, home = ".", pidfile = "pid"): + def getpid(self, home, pidfile = "pidfile"): if self.localhost: - pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile)) + pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) else: with self._lock: - pidtuple = sshfuncs.rcheckpid( + pidtuple = sshfuncs.rgetpid( os.path.join(home, pidfile), host = self.get("hostname"), user = self.get("username"), @@ -578,7 +798,7 @@ class LinuxNode(ResourceManager): ) return pidtuple - + def status(self, pid, ppid): if self.localhost: status = execfuncs.lstatus(pid, ppid) @@ -601,7 +821,7 @@ class LinuxNode(ResourceManager): proc = None status = self.status(pid, ppid) - if status == sshfuncs.RUNNING: + if status == sshfuncs.ProcStatus.RUNNING: if self.localhost: (out, err), proc = execfuncs.lkill(pid, ppid, sudo) else: @@ -616,18 +836,6 @@ class LinuxNode(ResourceManager): identity = self.get("identity"), server_key = self.get("serverKey") ) - return (out, err), proc - def check_bad_host(self, out, err): - badre = re.compile(r'(?:' - r'|Error: disk I/O error' - r')', - re.I) - return badre.search(out) or badre.search(err) - - def blacklist(self): - # TODO!!!! - self.warn(" Blacklisting malfunctioning node ") - #import util - #util.appendBlacklist(self.hostname) + return (out, err), proc