X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fnode.py;h=0bef4ada69263b17e733e6f3d09d10c85318eb1c;hb=7fed767f9f18ee81807950771145969cbb27b8a7;hp=60fad69ce2ff94c6f41e63dce6b227a8c2fd83d2;hpb=abea23bb8272d556bb719a8a71d1c8649a4a9b9c;p=nepi.git diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 60fad69c..0bef4ada 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay + ResourceState from nepi.resources.linux import rpmfuncs, debfuncs from nepi.util import sshfuncs, execfuncs from nepi.util.sshfuncs import ProcStatus @@ -50,12 +50,12 @@ class OSType: """ Supported flavors of Linux OS """ - FEDORA_8 = "f8" - FEDORA_12 = "f12" - FEDORA_14 = "f14" - FEDORA = "fedora" - UBUNTU = "ubuntu" - DEBIAN = "debian" + DEBIAN = 1 + UBUNTU = 1 << 1 + FEDORA = 1 << 2 + FEDORA_8 = 1 << 3 | FEDORA + FEDORA_12 = 1 << 4 | FEDORA + FEDORA_14 = 1 << 5 | FEDORA @clsinit_copy class LinuxNode(ResourceManager): @@ -141,58 +141,62 @@ class LinuxNode(ResourceManager): source compilation, file download, etc) """ - _rtype = "LinuxNode" + _rtype = "linux::Node" _help = "Controls Linux host machines ( either localhost or a host " \ "that can be accessed using a SSH key)" - _backend_type = "linux" + _platform = "linux" @classmethod def _register_attributes(cls): hostname = Attribute("hostname", "Hostname of the machine", - flags = Flags.ExecReadOnly) + flags = Flags.Design) username = Attribute("username", "Local account username", flags = Flags.Credential) - port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly) + port = Attribute("port", "SSH port", flags = Flags.Design) home = Attribute("home", "Experiment home directory to store all experiment related files", - flags = Flags.ExecReadOnly) + flags = Flags.Design) identity = Attribute("identity", "SSH identity file", flags = Flags.Credential) server_key = Attribute("serverKey", "Server public key", - flags = Flags.ExecReadOnly) + flags = Flags.Design) clean_home = Attribute("cleanHome", "Remove all nepi files and directories " " from node home folder before starting experiment", type = Types.Bool, default = False, - flags = Flags.ExecReadOnly) + flags = Flags.Design) clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " " from a previous same experiment, before the new experiment starts", type = Types.Bool, default = False, - flags = Flags.ExecReadOnly) + flags = Flags.Design) clean_processes = Attribute("cleanProcesses", "Kill all running processes before starting experiment", type = Types.Bool, default = False, - flags = Flags.ExecReadOnly) + flags = Flags.Design) tear_down = Attribute("tearDown", "Bash script to be executed before " + \ "releasing the resource", - flags = Flags.ExecReadOnly) + flags = Flags.Design) gateway_user = Attribute("gatewayUser", "Gateway account username", - flags = Flags.ExecReadOnly) + flags = Flags.Design) gateway = Attribute("gateway", "Hostname of the gateway machine", - flags = Flags.ExecReadOnly) + flags = Flags.Design) + + ip = Attribute("ip", "Linux host public IP address. " + "Must not be modified by the user unless hostname is 'localhost'", + flags = Flags.Design) cls._register_attribute(hostname) cls._register_attribute(username) @@ -206,6 +210,7 @@ class LinuxNode(ResourceManager): cls._register_attribute(tear_down) cls._register_attribute(gateway_user) cls._register_attribute(gateway) + cls._register_attribute(ip) def __init__(self, ec, guid): super(LinuxNode, self).__init__(ec, guid) @@ -213,9 +218,6 @@ class LinuxNode(ResourceManager): # home directory at Linux host self._home_dir = "" - # list of pids before running the app if the user is root - self._pids = [] - # lock to prevent concurrent applications on the same node, # to execute commands at the same time. There are potential # concurrency issues when using SSH to a same host from @@ -236,9 +238,13 @@ class LinuxNode(ResourceManager): home = os.path.join(self._home_dir, home) return home + @property + def nepi_home(self): + return os.path.join(self.home_dir, ".nepi") + @property def usr_dir(self): - return os.path.join(self.home_dir, "nepi-usr") + return os.path.join(self.nepi_home, "nepi-usr") @property def lib_dir(self): @@ -258,7 +264,7 @@ class LinuxNode(ResourceManager): @property def exp_dir(self): - return os.path.join(self.home_dir, "nepi-exp") + return os.path.join(self.nepi_home, "nepi-exp") @property def exp_home(self): @@ -277,25 +283,25 @@ class LinuxNode(ResourceManager): if self._os: return self._os - if (not self.get("hostname") or not self.get("username")): + if not self.localhost and not self.get("username"): msg = "Can't resolve OS, insufficient data " self.error(msg) raise RuntimeError, msg out = self.get_os() - if out.find("Fedora release 8") == 0: - self._os = OSType.FEDORA_8 - elif out.find("Fedora release 12") == 0: - self._os = OSType.FEDORA_12 - elif out.find("Fedora release 14") == 0: - self._os = OSType.FEDORA_14 - elif out.find("Fedora release") == 0: - self._os = OSType.FEDORA - elif out.find("Debian") == 0: + if out.find("Debian") == 0: self._os = OSType.DEBIAN elif out.find("Ubuntu") ==0: self._os = OSType.UBUNTU + elif out.find("Fedora release") == 0: + self._os = OSType.FEDORA + if out.find("Fedora release 8") == 0: + self._os = OSType.FEDORA_8 + elif out.find("Fedora release 12") == 0: + self._os = OSType.FEDORA_12 + elif out.find("Fedora release 14") == 0: + self._os = OSType.FEDORA_14 else: msg = "Unsupported OS" self.error(msg, out) @@ -322,16 +328,15 @@ class LinuxNode(ResourceManager): @property def use_deb(self): - return self.os in [OSType.DEBIAN, OSType.UBUNTU] + return (self.os & (OSType.DEBIAN|OSType.UBUNTU)) @property def use_rpm(self): - return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8, - OSType.FEDORA] + return (self.os & OSType.FEDORA) @property def localhost(self): - return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] + return self.get("hostname") in ['localhost', '127.0.0.1', '::1'] def do_provision(self): # check if host is alive @@ -351,14 +356,24 @@ class LinuxNode(ResourceManager): if self.get("cleanExperiment"): self.clean_experiment() - # Create shared directory structure - self.mkdir(self.lib_dir) - self.mkdir(self.bin_dir) - self.mkdir(self.src_dir) - self.mkdir(self.share_dir) + # Create shared directory structure and node home directory + paths = [self.lib_dir, + self.bin_dir, + self.src_dir, + self.share_dir, + self.node_home] - # Create experiment node home directory - self.mkdir(self.node_home) + self.mkdir(paths) + + # Get Public IP address if possible + if not self.get("ip"): + try: + ip = sshfuncs.gethostbyname(self.get("hostname")) + except: + msg = "DNS can not resolve hostname %s" % self.get("hostname") + self.debug(msg) + + self.set("ip", ip) super(LinuxNode, self).do_provision() @@ -374,7 +389,7 @@ class LinuxNode(ResourceManager): ifaces = self.get_connected(LinuxInterface.get_rtype()) for iface in ifaces: if iface.state < ResourceState.READY: - self.ec.schedule(reschedule_delay, self.deploy) + self.ec.schedule(self.reschedule_delay, self.deploy) return super(LinuxNode, self).do_deploy() @@ -385,7 +400,7 @@ class LinuxNode(ResourceManager): # Node needs to wait until all associated RMs are released # before it can be released if rm.state != ResourceState.RELEASED: - self.ec.schedule(reschedule_delay, self.release) + self.ec.schedule(self.reschedule_delay, self.release) return tear_down = self.get("tearDown") @@ -402,28 +417,38 @@ class LinuxNode(ResourceManager): def clean_processes(self): self.info("Cleaning up processes") - + + if self.localhost: + return + if self.get("username") != 'root': cmd = ("sudo -S killall tcpdump || /bin/true ; " + - "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " + + "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " + "sudo -S killall -u %s || /bin/true ; " % self.get("username")) else: - pids_temp = [] if self.state >= ResourceState.READY: - ps_aux = "ps aux |awk '{print $2}' |sort -u" + import pickle + pids = pickle.load(open("/tmp/save.proc", "rb")) + pids_temp = dict() + ps_aux = "ps aux |awk '{print $2,$11}'" (out, err), proc = self.execute(ps_aux) - pids_temp = out.split() - kill_pids = list(set(pids_temp) - set(self._pids)) - kill_pids = ' '.join(kill_pids) - - cmd = ("killall tcpdump || /bin/true ; " + - "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " + - "kill %s || /bin/true ; " % kill_pids) + if len(out) != 0: + for line in out.strip().split("\n"): + parts = line.strip().split(" ") + pids_temp[parts[0]] = parts[1] + kill_pids = set(pids_temp.items()) - set(pids.items()) + kill_pids = ' '.join(dict(kill_pids).keys()) + + cmd = ("killall tcpdump || /bin/true ; " + + "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " + + "kill %s || /bin/true ; " % kill_pids) + else: + cmd = ("killall tcpdump || /bin/true ; " + + "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ") else: cmd = ("killall tcpdump || /bin/true ; " + - "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ") + "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ") - out = err = "" (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) def clean_home(self): @@ -431,7 +456,7 @@ class LinuxNode(ResourceManager): """ self.info("Cleaning up home") - cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % ( + cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % ( self.home_dir ) return self.execute(cmd, with_lock = True) @@ -451,13 +476,10 @@ class LinuxNode(ResourceManager): def execute(self, command, sudo = False, - stdin = None, env = None, tty = False, forward_x11 = False, - timeout = None, retry = 3, - err_on_timeout = True, connect_timeout = 30, strict_host_checking = False, persistent = True, @@ -472,10 +494,13 @@ class LinuxNode(ResourceManager): (out, err), proc = execfuncs.lexec(command, user = self.get("username"), # still problem with localhost sudo = sudo, - stdin = stdin, env = env) else: if with_lock: + # If the execute command is blocking, we don't want to keep + # the node lock. This lock is used to avoid race conditions + # when creating the ControlMaster sockets. A more elegant + # solution is needed. with self._node_lock: (out, err), proc = sshfuncs.rexec( command, @@ -486,15 +511,12 @@ class LinuxNode(ResourceManager): gw = self.get("gateway"), agent = True, sudo = sudo, - stdin = stdin, identity = self.get("identity"), server_key = self.get("serverKey"), env = env, tty = tty, forward_x11 = forward_x11, - timeout = timeout, retry = retry, - err_on_timeout = err_on_timeout, connect_timeout = connect_timeout, persistent = persistent, blocking = blocking, @@ -510,15 +532,12 @@ class LinuxNode(ResourceManager): gw = self.get("gateway"), agent = True, sudo = sudo, - stdin = stdin, identity = self.get("identity"), server_key = self.get("serverKey"), env = env, tty = tty, forward_x11 = forward_x11, - timeout = timeout, retry = retry, - err_on_timeout = err_on_timeout, connect_timeout = connect_timeout, persistent = persistent, blocking = blocking, @@ -534,19 +553,19 @@ class LinuxNode(ResourceManager): stdout = 'stdout', stderr = 'stderr', sudo = False, - tty = False): + tty = False, + strict_host_checking = False): self.debug("Running command '%s'" % command) if self.localhost: - (out, err), proc = execfuncs.lspawn(command, pidfile, - stdout = stdout, - stderr = stderr, - stdin = stdin, + (out, err), proc = execfuncs.lspawn(command, pidfile, home = home, create_home = create_home, - sudo = sudo, - user = user) + stdin = stdin or '/dev/null', + stdout = stdout or '/dev/null', + stderr = stderr or '/dev/null', + sudo = sudo) else: with self._node_lock: (out, err), proc = sshfuncs.rspawn( @@ -554,9 +573,9 @@ class LinuxNode(ResourceManager): pidfile = pidfile, home = home, create_home = create_home, - stdin = stdin if stdin is not None else '/dev/null', - stdout = stdout if stdout else '/dev/null', - stderr = stderr if stderr else '/dev/null', + stdin = stdin or '/dev/null', + stdout = stdout or '/dev/null', + stderr = stderr or '/dev/null', sudo = sudo, host = self.get("hostname"), user = self.get("username"), @@ -566,7 +585,8 @@ class LinuxNode(ResourceManager): agent = True, identity = self.get("identity"), server_key = self.get("serverKey"), - tty = tty + tty = tty, + strict_host_checking = strict_host_checking ) return (out, err), proc @@ -585,7 +605,8 @@ class LinuxNode(ResourceManager): gw = self.get("gateway"), agent = True, identity = self.get("identity"), - server_key = self.get("serverKey") + server_key = self.get("serverKey"), + strict_host_checking = False ) return pidtuple @@ -604,7 +625,8 @@ class LinuxNode(ResourceManager): gw = self.get("gateway"), agent = True, identity = self.get("identity"), - server_key = self.get("serverKey") + server_key = self.get("serverKey"), + strict_host_checking = False ) return status @@ -629,16 +651,16 @@ class LinuxNode(ResourceManager): agent = True, sudo = sudo, identity = self.get("identity"), - server_key = self.get("serverKey") + server_key = self.get("serverKey"), + strict_host_checking = False ) return (out, err), proc def copy(self, src, dst): if self.localhost: - (out, err), proc = execfuncs.lcopy(source, dest, - recursive = True, - strict_host_checking = False) + (out, err), proc = execfuncs.lcopy(src, dst, + recursive = True) else: with self._node_lock: (out, err), proc = sshfuncs.rcopy( @@ -653,14 +675,21 @@ class LinuxNode(ResourceManager): return (out, err), proc - def upload(self, src, dst, text = False, overwrite = True): + def upload(self, src, dst, text = False, overwrite = True, + raise_on_error = True): """ Copy content to destination - src content to copy. Can be a local file, directory or a list of files + src string with the content to copy. Can be: + - plain text + - a string with the path to a local file + - a string with a semi-colon separeted list of local files + - a string with a local directory - dst destination path on the remote host (remote is always self.host) + dst string with destination path on the remote host (remote is + always self.host) - text src is text input, it must be stored into a temp file before uploading + text src is text input, it must be stored into a temp file before + uploading """ # If source is a string input f = None @@ -673,29 +702,50 @@ class LinuxNode(ResourceManager): src = f.name # If dst files should not be overwritten, check that the files do not - # exits already + # exits already + if isinstance(src, str): + src = map(str.strip, src.split(";")) + if overwrite == False: src = self.filter_existing_files(src, dst) if not src: - return ("", ""), None + return ("", ""), None if not self.localhost: # Build destination as @: dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst) - result = self.copy(src, dst) + ((out, err), proc) = self.copy(src, dst) # clean up temp file if f: os.remove(f.name) - return result + if err: + msg = " Failed to upload files - src: %s dst: %s" % (";".join(src), dst) + self.error(msg, out, err) + + msg = "%s out: %s err: %s" % (msg, out, err) + if raise_on_error: + raise RuntimeError, msg - def download(self, src, dst): + return ((out, err), proc) + + def download(self, src, dst, raise_on_error = True): if not self.localhost: # Build destination as @: src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) - return self.copy(src, dst) + + ((out, err), proc) = self.copy(src, dst) + + if err: + msg = " Failed to download files - src: %s dst: %s" % (";".join(src), dst) + self.error(msg, out, err) + + if raise_on_error: + raise RuntimeError, msg + + return ((out, err), proc) def install_packages_command(self, packages): command = "" @@ -710,7 +760,8 @@ class LinuxNode(ResourceManager): return command - def install_packages(self, packages, home, run_home = None): + def install_packages(self, packages, home, run_home = None, + raise_on_error = True): """ Install packages in the Linux host. 'home' is the directory to upload the package installation script. @@ -727,11 +778,12 @@ class LinuxNode(ResourceManager): stdout = "instpkg_stdout", stderr = "instpkg_stderr", overwrite = False, - raise_on_error = True) + raise_on_error = raise_on_error) return (out, err), proc - def remove_packages(self, packages, home, run_home = None): + def remove_packages(self, packages, home, run_home = None, + raise_on_error = True): """ Uninstall packages from the Linux host. 'home' is the directory to upload the package un-installation script. @@ -755,31 +807,49 @@ class LinuxNode(ResourceManager): stdout = "rmpkg_stdout", stderr = "rmpkg_stderr", overwrite = False, - raise_on_error = True) + raise_on_error = raise_on_error) return (out, err), proc - def mkdir(self, path, clean = False): + def mkdir(self, paths, clean = False): + """ Paths is either a single remote directory path to create, + or a list of directories to create. + """ if clean: - self.rmdir(path) + self.rmdir(paths) + + if isinstance(paths, str): + paths = [paths] + + cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths)) + + return self.execute(cmd, with_lock = True) - return self.execute("mkdir -p %s" % path, with_lock = True) + def rmdir(self, paths): + """ Paths is either a single remote directory path to delete, + or a list of directories to delete. + """ + + if isinstance(paths, str): + paths = [paths] - def rmdir(self, path): - return self.execute("rm -rf %s" % path, with_lock = True) + cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths)) + + return self.execute(cmd, with_lock = True) def run_and_wait(self, command, home, - shfile = "cmd.sh", - env = None, - overwrite = True, - pidfile = "pidfile", - ecodefile = "exitcode", - stdin = None, - stdout = "stdout", - stderr = "stderr", - sudo = False, - tty = False, - raise_on_error = False): + shfile="cmd.sh", + env=None, + overwrite=True, + wait_run=True, + pidfile="pidfile", + ecodefile="exitcode", + stdin=None, + stdout="stdout", + stderr="stderr", + sudo=False, + tty=False, + raise_on_error=True): """ Uploads the 'command' to a bash script in the host. Then runs the script detached in background in the host, and @@ -818,25 +888,26 @@ class LinuxNode(ResourceManager): pidfile = pidfile, raise_on_error = raise_on_error) - # wait until command finishes to execute - self.wait_run(pid, ppid) - - (eout, err), proc = self.check_errors(home, - ecodefile = ecodefile, - stderr = stderr) + if wait_run: + # wait until command finishes to execute + self.wait_run(pid, ppid) + + (eout, err), proc = self.check_errors(home, + ecodefile = ecodefile, + stderr = stderr) - # Out is what was written in the stderr file - if err: - msg = " Failed to run command '%s' " % command - self.error(msg, eout, err) + # Out is what was written in the stderr file + if err: + msg = " Failed to run command '%s' " % command + self.error(msg, eout, err) - if raise_on_error: - raise RuntimeError, msg + if raise_on_error: + raise RuntimeError, msg (out, oerr), proc = self.check_output(home, stdout) return (out, err), proc - + def exitcode(self, home, ecodefile = "exitcode"): """ Get the exit code of an application. @@ -860,10 +931,10 @@ class LinuxNode(ResourceManager): return ExitCode.ERROR def upload_command(self, command, - shfile = "cmd.sh", - ecodefile = "exitcode", - overwrite = True, - env = None): + shfile="cmd.sh", + ecodefile="exitcode", + overwrite=True, + env=None): """ Saves the command as a bash script file in the remote host, and forces to save the exit code of the command execution to the ecodefile """ @@ -883,9 +954,9 @@ class LinuxNode(ResourceManager): # Add environ to command command = environ + command - return self.upload(command, shfile, text = True, overwrite = overwrite) + return self.upload(command, shfile, text=True, overwrite=overwrite) - def format_environment(self, env, inline = False): + def format_environment(self, env, inline=False): """ Formats the environment variables for a command to be executed either as an inline command (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or @@ -1033,8 +1104,8 @@ class LinuxNode(ResourceManager): """ Removes files that already exist in the Linux host from src list """ # construct a dictionary with { dst: src } - dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ), - src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src}) + dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \ + if len(src) > 1 else dict({dst: src[0]}) command = [] for d in dests.keys(): @@ -1049,7 +1120,7 @@ class LinuxNode(ResourceManager): del dests[d] if not dests: - return "" + return [] - return " ".join(dests.values()) + return dests.values()