X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fnode.py;h=5080a6c4e63453ce3160a692b41b376b9b61faa8;hb=87f44a7c2853afb7021276dd3700858cff950703;hp=0d2597fdb6d22e6bb693d250485f2e5d0013dbc0;hpb=54d2a201dca3af3dabf18601d4909bf506960627;p=nepi.git diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 0d2597fd..5080a6c4 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -31,13 +31,11 @@ import re import tempfile import time import threading +import traceback -# TODO: Verify files and dirs exists already -# TODO: Blacklist nodes! # TODO: Unify delays!! # TODO: Validate outcome of uploads!! - class ExitCode: """ Error codes that the rexitcode function can return if unable to @@ -165,8 +163,12 @@ class LinuxNode(ResourceManager): server_key = Attribute("serverKey", "Server public key", flags = Flags.ExecReadOnly) - clean_home = Attribute("cleanHome", "Remove all files and directories " + \ - " from home folder before starting experiment", + clean_home = Attribute("cleanHome", "Remove all nepi files and directories " + " from node home folder before starting experiment", + flags = Flags.ExecReadOnly) + + clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " + " from a previous same experiment, before the new experiment starts", flags = Flags.ExecReadOnly) clean_processes = Attribute("cleanProcesses", @@ -184,12 +186,15 @@ class LinuxNode(ResourceManager): cls._register_attribute(identity) cls._register_attribute(server_key) cls._register_attribute(clean_home) + cls._register_attribute(clean_experiment) cls._register_attribute(clean_processes) cls._register_attribute(tear_down) def __init__(self, ec, guid): super(LinuxNode, self).__init__(ec, guid) self._os = None + # home directory at Linux host + self._home_dir = "" # lock to avoid concurrency issues on methods used by applications self._lock = threading.Lock() @@ -199,17 +204,47 @@ class LinuxNode(ResourceManager): self.get("hostname"), msg) @property - def home(self): - return self.get("home") or "" + def home_dir(self): + home = self.get("home") or "" + if not home.startswith("/"): + home = os.path.join(self._home_dir, home) + return home + + @property + def usr_dir(self): + return os.path.join(self.home_dir, "nepi-usr") + + @property + def lib_dir(self): + return os.path.join(self.usr_dir, "lib") + + @property + def bin_dir(self): + return os.path.join(self.usr_dir, "bin") + + @property + def src_dir(self): + return os.path.join(self.usr_dir, "src") + + @property + def share_dir(self): + return os.path.join(self.usr_dir, "share") + + @property + def exp_dir(self): + return os.path.join(self.home_dir, "nepi-exp") @property def exp_home(self): - return os.path.join(self.home, self.ec.exp_id) + return os.path.join(self.exp_dir, self.ec.exp_id) @property def node_home(self): - node_home = "node-%d" % self.guid - return os.path.join(self.exp_home, node_home) + return os.path.join(self.exp_home, "node-%d" % self.guid) + + @property + def run_home(self): + return os.path.join(self.node_home, self.ec.run_id) @property def os(self): @@ -259,18 +294,32 @@ class LinuxNode(ResourceManager): return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] def provision(self): + # check if host is alive if not self.is_alive(): - self._state = ResourceState.FAILED + self.fail() + msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") self.error(msg) raise RuntimeError, msg + self.find_home() + if self.get("cleanProcesses"): self.clean_processes() if self.get("cleanHome"): self.clean_home() - + + if self.get("cleanExperiment"): + self.clean_experiment() + + # Create shared directory structure + self.mkdir(self.lib_dir) + self.mkdir(self.bin_dir) + self.mkdir(self.src_dir) + self.mkdir(self.share_dir) + + # Create experiment node home directory self.mkdir(self.node_home) super(LinuxNode, self).provision() @@ -300,6 +349,8 @@ class LinuxNode(ResourceManager): if tear_down: self.execute(tear_down) + self.clean_processes() + super(LinuxNode, self).release() def valid_connection(self, guid): @@ -327,21 +378,220 @@ class LinuxNode(ResourceManager): (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) def clean_home(self): + """ Cleans all NEPI related folders in the Linux host + """ self.info("Cleaning up home") - cmd = ( - # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" + - "find . -maxdepth 1 -name 'nepi-*' " + - " -execdir rm -rf {} + " - ) + cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % ( + self.home_dir ) + + return self.execute(cmd, with_lock = True) + + def clean_experiment(self): + """ Cleans all experiment related files in the Linux host. + It preserves NEPI files and folders that have a multi experiment + scope. + """ + self.info("Cleaning up experiment files") + + cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % ( + self.exp_dir, + self.ec.exp_id ) - if self.home: - cmd = "cd %s ; " % self.home + cmd + return self.execute(cmd, with_lock = True) + + def execute(self, command, + sudo = False, + stdin = None, + env = None, + tty = False, + forward_x11 = False, + timeout = None, + retry = 3, + err_on_timeout = True, + connect_timeout = 30, + strict_host_checking = False, + persistent = True, + blocking = True, + with_lock = False + ): + """ Notice that this invocation will block until the + execution finishes. If this is not the desired behavior, + use 'run' instead.""" + if self.localhost: + (out, err), proc = execfuncs.lexec(command, + user = user, + sudo = sudo, + stdin = stdin, + env = env) + else: + if with_lock: + with self._lock: + (out, err), proc = sshfuncs.rexec( + command, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + stdin = stdin, + identity = self.get("identity"), + server_key = self.get("serverKey"), + env = env, + tty = tty, + forward_x11 = forward_x11, + timeout = timeout, + retry = retry, + err_on_timeout = err_on_timeout, + connect_timeout = connect_timeout, + persistent = persistent, + blocking = blocking, + strict_host_checking = strict_host_checking + ) + else: + (out, err), proc = sshfuncs.rexec( + command, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + stdin = stdin, + identity = self.get("identity"), + server_key = self.get("serverKey"), + env = env, + tty = tty, + forward_x11 = forward_x11, + timeout = timeout, + retry = retry, + err_on_timeout = err_on_timeout, + connect_timeout = connect_timeout, + persistent = persistent, + blocking = blocking, + strict_host_checking = strict_host_checking + ) + + return (out, err), proc + + def run(self, command, home, + create_home = False, + pidfile = 'pidfile', + stdin = None, + stdout = 'stdout', + stderr = 'stderr', + sudo = False, + tty = False): + + self.debug("Running command '%s'" % command) + + if self.localhost: + (out, err), proc = execfuncs.lspawn(command, pidfile, + stdout = stdout, + stderr = stderr, + stdin = stdin, + home = home, + create_home = create_home, + sudo = sudo, + user = user) + else: + with self._lock: + (out, err), proc = sshfuncs.rspawn( + command, + pidfile = pidfile, + home = home, + create_home = create_home, + stdin = stdin if stdin is not None else '/dev/null', + stdout = stdout if stdout else '/dev/null', + stderr = stderr if stderr else '/dev/null', + sudo = sudo, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey"), + tty = tty + ) + + return (out, err), proc + + def getpid(self, home, pidfile = "pidfile"): + if self.localhost: + pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) + else: + with self._lock: + pidtuple = sshfuncs.rgetpid( + os.path.join(home, pidfile), + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) + + return pidtuple + + def status(self, pid, ppid): + if self.localhost: + status = execfuncs.lstatus(pid, ppid) + else: + with self._lock: + status = sshfuncs.rstatus( + pid, ppid, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) + + return status + + def kill(self, pid, ppid, sudo = False): out = err = "" - (out, err), proc = self.execute(cmd, with_lock = True) + proc = None + status = self.status(pid, ppid) + + if status == sshfuncs.ProcStatus.RUNNING: + if self.localhost: + (out, err), proc = execfuncs.lkill(pid, ppid, sudo) + else: + with self._lock: + (out, err), proc = sshfuncs.rkill( + pid, ppid, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) + + return (out, err), proc - def upload(self, src, dst, text = False): + def copy(self, src, dst): + if self.localhost: + (out, err), proc = execfuncs.lcopy(source, dest, + recursive = True, + strict_host_checking = False) + else: + with self._lock: + (out, err), proc = sshfuncs.rcopy( + src, dst, + port = self.get("port"), + identity = self.get("identity"), + server_key = self.get("serverKey"), + recursive = True, + strict_host_checking = False) + + return (out, err), proc + + + def upload(self, src, dst, text = False, overwrite = True): """ Copy content to destination src content to copy. Can be a local file, directory or a list of files @@ -360,9 +610,17 @@ class LinuxNode(ResourceManager): f.close() src = f.name + # If dst files should not be overwritten, check that the files do not + # exits already + if overwrite == False: + src = self.filter_existing_files(src, dst) + if not src: + return ("", ""), None + if not self.localhost: # Build destination as @: dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst) + result = self.copy(src, dst) # clean up temp file @@ -377,7 +635,12 @@ class LinuxNode(ResourceManager): src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) return self.copy(src, dst) - def install_packages(self, packages, home): + def install_packages(self, packages, home, run_home = None): + """ Install packages in the Linux host. + + 'home' is the directory to upload the package installation script. + 'run_home' is the directory from where to execute the script. + """ command = "" if self.use_rpm: command = rpmfuncs.install_packages_command(self.os, packages) @@ -388,19 +651,25 @@ class LinuxNode(ResourceManager): self.error(msg, self.os) raise RuntimeError, msg - out = err = "" - (out, err), proc = self.run_and_wait(command, home, - shfile = "instpkg.sh", + run_home = run_home or home + + (out, err), proc = self.run_and_wait(command, run_home, + shfile = os.path.join(home, "instpkg.sh"), pidfile = "instpkg_pidfile", ecodefile = "instpkg_exitcode", stdout = "instpkg_stdout", stderr = "instpkg_stderr", + overwrite = False, raise_on_error = True) return (out, err), proc - def remove_packages(self, packages, home): - command = "" + def remove_packages(self, packages, home, run_home = None): + """ Uninstall packages from the Linux host. + + 'home' is the directory to upload the package un-installation script. + 'run_home' is the directory from where to execute the script. + """ if self.use_rpm: command = rpmfuncs.remove_packages_command(self.os, packages) elif self.use_deb: @@ -410,13 +679,15 @@ class LinuxNode(ResourceManager): self.error(msg) raise RuntimeError, msg - out = err = "" - (out, err), proc = self.run_and_wait(command, home, - shfile = "rmpkg.sh", + run_home = run_home or home + + (out, err), proc = self.run_and_wait(command, run_home, + shfile = os.path.join(home, "rmpkg.sh"), pidfile = "rmpkg_pidfile", ecodefile = "rmpkg_exitcode", stdout = "rmpkg_stdout", stderr = "rmpkg_stderr", + overwrite = False, raise_on_error = True) return (out, err), proc @@ -433,6 +704,7 @@ class LinuxNode(ResourceManager): def run_and_wait(self, command, home, shfile = "cmd.sh", env = None, + overwrite = True, pidfile = "pidfile", ecodefile = "exitcode", stdin = None, @@ -446,12 +718,17 @@ class LinuxNode(ResourceManager): Then runs the script detached in background in the host, and busy-waites until the script finishes executing. """ - self.upload_command(command, home, + + if not shfile.startswith("/"): + shfile = os.path.join(home, shfile) + + self.upload_command(command, shfile = shfile, ecodefile = ecodefile, - env = env) + env = env, + overwrite = overwrite) - command = "bash ./%s" % shfile + command = "bash %s" % shfile # run command in background in remote host (out, err), proc = self.run(command, home, pidfile = pidfile, @@ -514,9 +791,10 @@ class LinuxNode(ResourceManager): # Other error from 'cat' return ExitCode.ERROR - def upload_command(self, command, home, + def upload_command(self, command, shfile = "cmd.sh", ecodefile = "exitcode", + overwrite = True, env = None): """ Saves the command as a bash script file in the remote host, and forces to save the exit code of the command execution to the ecodefile @@ -537,8 +815,7 @@ class LinuxNode(ResourceManager): # Add environ to command command = environ + command - dst = os.path.join(home, shfile) - return self.upload(command, dst, text = True) + return self.upload(command, shfile, text = True, overwrite = overwrite) def format_environment(self, env, inline = False): """Format environmental variables for command to be executed either @@ -639,215 +916,63 @@ class LinuxNode(ResourceManager): return (out, err), proc def is_alive(self): + """ Checks if host is responsive + """ if self.localhost: return True out = err = "" try: - # TODO: FIX NOT ALIVE!!!! - (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, + (out, err), proc = self.execute("echo 'ALIVE'", + retry = 5, with_lock = True) except: - import traceback trace = traceback.format_exc() msg = "Unresponsive host %s " % err self.error(msg, out, trace) return False - if out.strip().startswith('ALIVE'): + if out.strip() == "ALIVE": return True else: msg = "Unresponsive host " self.error(msg, out, err) return False - def copy(self, src, dst): - if self.localhost: - (out, err), proc = execfuncs.lcopy(source, dest, - recursive = True, - strict_host_checking = False) - else: - with self._lock: - (out, err), proc = sshfuncs.rcopy( - src, dst, - port = self.get("port"), - identity = self.get("identity"), - server_key = self.get("serverKey"), - recursive = True, - strict_host_checking = False) - - return (out, err), proc - - def execute(self, command, - sudo = False, - stdin = None, - env = None, - tty = False, - forward_x11 = False, - timeout = None, - retry = 3, - err_on_timeout = True, - connect_timeout = 30, - strict_host_checking = False, - persistent = True, - blocking = True, - with_lock = False - ): - """ Notice that this invocation will block until the - execution finishes. If this is not the desired behavior, - use 'run' instead.""" + def find_home(self): + """ Retrieves host home directory + """ + (out, err), proc = self.execute("echo ${HOME}", retry = 5, + with_lock = True) - if self.localhost: - (out, err), proc = execfuncs.lexec(command, - user = user, - sudo = sudo, - stdin = stdin, - env = env) - else: - if with_lock: - with self._lock: - (out, err), proc = sshfuncs.rexec( - command, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - stdin = stdin, - identity = self.get("identity"), - server_key = self.get("serverKey"), - env = env, - tty = tty, - forward_x11 = forward_x11, - timeout = timeout, - retry = retry, - err_on_timeout = err_on_timeout, - connect_timeout = connect_timeout, - persistent = persistent, - blocking = blocking, - strict_host_checking = strict_host_checking - ) - else: - (out, err), proc = sshfuncs.rexec( - command, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - stdin = stdin, - identity = self.get("identity"), - server_key = self.get("serverKey"), - env = env, - tty = tty, - forward_x11 = forward_x11, - timeout = timeout, - retry = retry, - err_on_timeout = err_on_timeout, - connect_timeout = connect_timeout, - persistent = persistent, - blocking = blocking, - strict_host_checking = strict_host_checking - ) + if proc.poll(): + msg = "Imposible to retrieve HOME directory" + self.error(msg, out, err) + raise RuntimeError, msg - return (out, err), proc + self._home_dir = out.strip() - def run(self, command, home, - create_home = False, - pidfile = 'pidfile', - stdin = None, - stdout = 'stdout', - stderr = 'stderr', - sudo = False, - tty = False): - - self.debug("Running command '%s'" % command) - - if self.localhost: - (out, err), proc = execfuncs.lspawn(command, pidfile, - stdout = stdout, - stderr = stderr, - stdin = stdin, - home = home, - create_home = create_home, - sudo = sudo, - user = user) - else: - with self._lock: - (out, err), proc = sshfuncs.rspawn( - command, - pidfile = pidfile, - home = home, - create_home = create_home, - stdin = stdin if stdin is not None else '/dev/null', - stdout = stdout if stdout else '/dev/null', - stderr = stderr if stderr else '/dev/null', - sudo = sudo, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey"), - tty = tty - ) + def filter_existing_files(self, src, dst): + """ Removes files that already exist in the Linux host from src list + """ + # construct a dictionary with { dst: src } + dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ), + src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src}) - return (out, err), proc + command = [] + for d in dests.keys(): + command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} ) - def getpid(self, home, pidfile = "pidfile"): - if self.localhost: - pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) - else: - with self._lock: - pidtuple = sshfuncs.rgetpid( - os.path.join(home, pidfile), - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) - - return pidtuple + command = ";".join(command) - def status(self, pid, ppid): - if self.localhost: - status = execfuncs.lstatus(pid, ppid) - else: - with self._lock: - status = sshfuncs.rstatus( - pid, ppid, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) - - return status + (out, err), proc = self.execute(command, retry = 1, with_lock = True) - def kill(self, pid, ppid, sudo = False): - out = err = "" - proc = None - status = self.status(pid, ppid) + for d in dests.keys(): + if out.find(d) > -1: + del dests[d] - if status == sshfuncs.ProcStatus.RUNNING: - if self.localhost: - (out, err), proc = execfuncs.lkill(pid, ppid, sudo) - else: - with self._lock: - (out, err), proc = sshfuncs.rkill( - pid, ppid, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) + if not dests: + return "" - return (out, err), proc + return " ".join(dests.values())