2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay, failtrap
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
41 Error codes that the rexitcode function can return if unable to
42 check the exit code of a spawned process
51 Supported flavors of Linux OS
61 class LinuxNode(ResourceManager):
63 .. class:: Class Args :
65 :param ec: The Experiment controller
66 :type ec: ExperimentController
67 :param guid: guid of the RM
72 There are different ways in which commands can be executed using the
73 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
78 * 'execute' (blocking mode) :
80 HOW IT WORKS: 'execute', forks a process and run the
81 command, synchronously, attached to the terminal, in
83 The execute method will block until the command returns
84 the result on 'out', 'err' (so until it finishes executing).
86 USAGE: short-lived commands that must be executed attached
87 to a terminal and in foreground, for which it IS necessary
88 to block until the command has finished (e.g. if you want
89 to run 'ls' or 'cat').
91 * 'execute' (NON blocking mode - blocking = False) :
93 HOW IT WORKS: Same as before, except that execute method
94 will return immediately (even if command still running).
96 USAGE: long-lived commands that must be executed attached
97 to a terminal and in foreground, but for which it is not
98 necessary to block until the command has finished. (e.g.
99 start an application using X11 forwarding)
103 HOW IT WORKS: Connects to the host ( using SSH if remote)
104 and launches the command in background, detached from any
105 terminal (daemonized), and returns. The command continues to
106 run remotely, but since it is detached from the terminal,
107 its pipes (stdin, stdout, stderr) can't be redirected to the
108 console (as normal non detached processes would), and so they
109 are explicitly redirected to files. The pidfile is created as
110 part of the process of launching the command. The pidfile
111 holds the pid and ppid of the process forked in background,
112 so later on it is possible to check whether the command is still
115 USAGE: long-lived commands that can run detached in background,
116 for which it is NOT necessary to block (wait) until the command
117 has finished. (e.g. start an application that is not using X11
118 forwarding. It can run detached and remotely in background)
122 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123 the command has finished execution. It also checks whether
124 errors occurred during runtime by reading the exitcode file,
125 which contains the exit code of the command that was run
126 (checking stderr only is not always reliable since many
127 commands throw debugging info to stderr and the only way to
128 automatically know whether an error really happened is to
129 check the process exit code).
131 Another difference with respect to 'run', is that instead
132 of directly executing the command as a bash command line,
133 it uploads the command to a bash script and runs the script.
134 This allows to use the bash script to debug errors, since
135 it remains at the remote host and can be run manually to
138 USAGE: medium-lived commands that can run detached in
139 background, for which it IS necessary to block (wait) until
140 the command has finished. (e.g. Package installation,
141 source compilation, file download, etc)
145 _help = "Controls Linux host machines ( either localhost or a host " \
146 "that can be accessed using a SSH key)"
147 _backend_type = "linux"
150 def _register_attributes(cls):
151 hostname = Attribute("hostname", "Hostname of the machine",
152 flags = Flags.ExecReadOnly)
154 username = Attribute("username", "Local account username",
155 flags = Flags.Credential)
157 port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
159 home = Attribute("home",
160 "Experiment home directory to store all experiment related files",
161 flags = Flags.ExecReadOnly)
163 identity = Attribute("identity", "SSH identity file",
164 flags = Flags.Credential)
166 server_key = Attribute("serverKey", "Server public key",
167 flags = Flags.ExecReadOnly)
169 clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170 " from node home folder before starting experiment",
173 flags = Flags.ExecReadOnly)
175 clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
176 " from a previous same experiment, before the new experiment starts",
179 flags = Flags.ExecReadOnly)
181 clean_processes = Attribute("cleanProcesses",
182 "Kill all running processes before starting experiment",
185 flags = Flags.ExecReadOnly)
187 tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188 "releasing the resource",
189 flags = Flags.ExecReadOnly)
191 cls._register_attribute(hostname)
192 cls._register_attribute(username)
193 cls._register_attribute(port)
194 cls._register_attribute(home)
195 cls._register_attribute(identity)
196 cls._register_attribute(server_key)
197 cls._register_attribute(clean_home)
198 cls._register_attribute(clean_experiment)
199 cls._register_attribute(clean_processes)
200 cls._register_attribute(tear_down)
202 def __init__(self, ec, guid):
203 super(LinuxNode, self).__init__(ec, guid)
205 # home directory at Linux host
208 # lock to prevent concurrent applications on the same node,
209 # to execute commands at the same time. There are potential
210 # concurrency issues when using SSH to a same host from
211 # multiple threads. There are also possible operational
212 # issues, e.g. an application querying the existence
213 # of a file or folder prior to its creation, and another
214 # application creating the same file or folder in between.
215 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Builds the text used for log entries of this resource.

    The returned string prefixes 'msg' with the guid of the RM and the
    value of its 'hostname' attribute.
    """
    fields = (self.guid, self.get("hostname"), msg)
    return " guid %d - host %s - %s " % fields
223 home = self.get("home") or ""
224 if not home.startswith("/"):
225 home = os.path.join(self._home_dir, home)
230 return os.path.join(self.home_dir, "nepi-usr")
234 return os.path.join(self.usr_dir, "lib")
238 return os.path.join(self.usr_dir, "bin")
242 return os.path.join(self.usr_dir, "src")
246 return os.path.join(self.usr_dir, "share")
250 return os.path.join(self.home_dir, "nepi-exp")
254 return os.path.join(self.exp_dir, self.ec.exp_id)
258 return os.path.join(self.exp_home, "node-%d" % self.guid)
262 return os.path.join(self.node_home, self.ec.run_id)
269 if (not self.get("hostname") or not self.get("username")):
270 msg = "Can't resolve OS, insufficient data "
272 raise RuntimeError, msg
276 if out.find("Fedora release 8") == 0:
277 self._os = OSType.FEDORA_8
278 elif out.find("Fedora release 12") == 0:
279 self._os = OSType.FEDORA_12
280 elif out.find("Fedora release 14") == 0:
281 self._os = OSType.FEDORA_14
282 elif out.find("Fedora release") == 0:
283 self._os = OSType.FEDORA
284 elif out.find("Debian") == 0:
285 self._os = OSType.DEBIAN
286 elif out.find("Ubuntu") ==0:
287 self._os = OSType.UBUNTU
289 msg = "Unsupported OS"
291 raise RuntimeError, "%s - %s " %( msg, out )
296 # The underlying SSH layer will sometimes return an empty
297 # output (even if the command was executed without errors).
298 # To work around this, repeat the operation N times or
299 # until the result is not empty string
304 (out, err), proc = self.execute("cat /etc/issue",
309 if out.strip() != "":
312 trace = traceback.format_exc()
313 msg = "Error detecting OS: %s " % trace
314 self.error(msg, out, err)
317 time.sleep(min(30.0, retrydelay))
322 return self.os in [OSType.DEBIAN, OSType.UBUNTU]
326 return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
331 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
335 # check if host is alive
336 if not self.is_alive():
337 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
339 raise RuntimeError, msg
343 if self.get("cleanProcesses"):
344 self.clean_processes()
346 if self.get("cleanHome"):
349 if self.get("cleanExperiment"):
350 self.clean_experiment()
352 # Create shared directory structure
353 self.mkdir(self.lib_dir)
354 self.mkdir(self.bin_dir)
355 self.mkdir(self.src_dir)
356 self.mkdir(self.share_dir)
358 # Create experiment node home directory
359 self.mkdir(self.node_home)
361 super(LinuxNode, self).provision()
365 if self.state == ResourceState.NEW:
366 self.info("Deploying node")
370 # Node needs to wait until all associated interfaces are
371 # ready before it can finalize deployment
372 from nepi.resources.linux.interface import LinuxInterface
373 ifaces = self.get_connected(LinuxInterface.rtype())
375 if iface.state < ResourceState.READY:
376 self.ec.schedule(reschedule_delay, self.deploy)
379 super(LinuxNode, self).deploy()
383 rms = self.get_connected()
385 # Node needs to wait until all associated RMs are released
386 # before it can be released
387 if rm.state < ResourceState.STOPPED:
388 self.ec.schedule(reschedule_delay, self.release)
391 tear_down = self.get("tearDown")
393 self.execute(tear_down)
395 self.clean_processes()
398 err = traceback.format_exc()
401 super(LinuxNode, self).release()
403 def valid_connection(self, guid):
407 def clean_processes(self, killer = False):
408 self.info("Cleaning up processes")
412 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
413 "sudo -S killall python tcpdump || /bin/true ; " +
414 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
415 "sudo -S killall -u root || /bin/true ; " +
416 "sudo -S killall -u root || /bin/true ; ")
419 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
420 "sudo -S killall tcpdump || /bin/true ; " +
421 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
422 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
425 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
427 def clean_home(self):
428 """ Cleans all NEPI related folders in the Linux host
430 self.info("Cleaning up home")
432 cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
435 return self.execute(cmd, with_lock = True)
437 def clean_experiment(self):
438 """ Cleans all experiment related files in the Linux host.
439 It preserves NEPI files and folders that have a multi experiment
442 self.info("Cleaning up experiment files")
444 cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
448 return self.execute(cmd, with_lock = True)
450 def execute(self, command,
458 err_on_timeout = True,
459 connect_timeout = 30,
460 strict_host_checking = False,
465 """ Notice that this invocation will block until the
466 execution finishes. If this is not the desired behavior,
467 use 'run' instead."""
470 (out, err), proc = execfuncs.lexec(command,
477 with self._node_lock:
478 (out, err), proc = sshfuncs.rexec(
480 host = self.get("hostname"),
481 user = self.get("username"),
482 port = self.get("port"),
486 identity = self.get("identity"),
487 server_key = self.get("serverKey"),
490 forward_x11 = forward_x11,
493 err_on_timeout = err_on_timeout,
494 connect_timeout = connect_timeout,
495 persistent = persistent,
497 strict_host_checking = strict_host_checking
500 (out, err), proc = sshfuncs.rexec(
502 host = self.get("hostname"),
503 user = self.get("username"),
504 port = self.get("port"),
508 identity = self.get("identity"),
509 server_key = self.get("serverKey"),
512 forward_x11 = forward_x11,
515 err_on_timeout = err_on_timeout,
516 connect_timeout = connect_timeout,
517 persistent = persistent,
519 strict_host_checking = strict_host_checking
522 return (out, err), proc
524 def run(self, command, home,
533 self.debug("Running command '%s'" % command)
536 (out, err), proc = execfuncs.lspawn(command, pidfile,
541 create_home = create_home,
545 with self._node_lock:
546 (out, err), proc = sshfuncs.rspawn(
550 create_home = create_home,
551 stdin = stdin if stdin is not None else '/dev/null',
552 stdout = stdout if stdout else '/dev/null',
553 stderr = stderr if stderr else '/dev/null',
555 host = self.get("hostname"),
556 user = self.get("username"),
557 port = self.get("port"),
559 identity = self.get("identity"),
560 server_key = self.get("serverKey"),
564 return (out, err), proc
566 def getpid(self, home, pidfile = "pidfile"):
568 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
570 with self._node_lock:
571 pidtuple = sshfuncs.rgetpid(
572 os.path.join(home, pidfile),
573 host = self.get("hostname"),
574 user = self.get("username"),
575 port = self.get("port"),
577 identity = self.get("identity"),
578 server_key = self.get("serverKey")
583 def status(self, pid, ppid):
585 status = execfuncs.lstatus(pid, ppid)
587 with self._node_lock:
588 status = sshfuncs.rstatus(
590 host = self.get("hostname"),
591 user = self.get("username"),
592 port = self.get("port"),
594 identity = self.get("identity"),
595 server_key = self.get("serverKey")
600 def kill(self, pid, ppid, sudo = False):
603 status = self.status(pid, ppid)
605 if status == sshfuncs.ProcStatus.RUNNING:
607 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
609 with self._node_lock:
610 (out, err), proc = sshfuncs.rkill(
612 host = self.get("hostname"),
613 user = self.get("username"),
614 port = self.get("port"),
617 identity = self.get("identity"),
618 server_key = self.get("serverKey")
621 return (out, err), proc
623 def copy(self, src, dst):
625 (out, err), proc = execfuncs.lcopy(source, dest,
627 strict_host_checking = False)
629 with self._node_lock:
630 (out, err), proc = sshfuncs.rcopy(
632 port = self.get("port"),
633 identity = self.get("identity"),
634 server_key = self.get("serverKey"),
636 strict_host_checking = False)
638 return (out, err), proc
640 def upload(self, src, dst, text = False, overwrite = True):
641 """ Copy content to destination
643 src content to copy. Can be a local file, directory or a list of files
645 dst destination path on the remote host (remote is always self.host)
647 text src is text input, it must be stored into a temp file before uploading
649 # If source is a string input
651 if text and not os.path.isfile(src):
652 # src is text input that should be uploaded as file
653 # create a temporal file with the content to upload
654 f = tempfile.NamedTemporaryFile(delete=False)
659 # If dst files should not be overwritten, check that the files do not
661 if overwrite == False:
662 src = self.filter_existing_files(src, dst)
664 return ("", ""), None
666 if not self.localhost:
667 # Build destination as <user>@<server>:<path>
668 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
670 result = self.copy(src, dst)
def download(self, src, dst):
    """ Copies 'src' from the host to the local path 'dst'.

    When the node is the local machine 'src' is used untouched,
    otherwise it is rewritten as <user>@<server>:<path> before being
    handed over to 'copy'. Returns whatever 'copy' returns.
    """
    if self.localhost:
        return self.copy(src, dst)

    # Build the remote source as <user>@<server>:<path>
    remote_src = "%s@%s:%s" % (self.get("username"),
            self.get("hostname"), src)
    return self.copy(remote_src, dst)
684 def install_packages_command(self, packages):
687 command = rpmfuncs.install_packages_command(self.os, packages)
689 command = debfuncs.install_packages_command(self.os, packages)
691 msg = "Error installing packages ( OS not known ) "
692 self.error(msg, self.os)
693 raise RuntimeError, msg
697 def install_packages(self, packages, home, run_home = None):
698 """ Install packages in the Linux host.
700 'home' is the directory to upload the package installation script.
701 'run_home' is the directory from where to execute the script.
703 command = self.install_packages_command(packages)
705 run_home = run_home or home
707 (out, err), proc = self.run_and_wait(command, run_home,
708 shfile = os.path.join(home, "instpkg.sh"),
709 pidfile = "instpkg_pidfile",
710 ecodefile = "instpkg_exitcode",
711 stdout = "instpkg_stdout",
712 stderr = "instpkg_stderr",
714 raise_on_error = True)
716 return (out, err), proc
718 def remove_packages(self, packages, home, run_home = None):
719 """ Uninstall packages from the Linux host.
721 'home' is the directory to upload the package un-installation script.
722 'run_home' is the directory from where to execute the script.
725 command = rpmfuncs.remove_packages_command(self.os, packages)
727 command = debfuncs.remove_packages_command(self.os, packages)
729 msg = "Error removing packages ( OS not known ) "
731 raise RuntimeError, msg
733 run_home = run_home or home
735 (out, err), proc = self.run_and_wait(command, run_home,
736 shfile = os.path.join(home, "rmpkg.sh"),
737 pidfile = "rmpkg_pidfile",
738 ecodefile = "rmpkg_exitcode",
739 stdout = "rmpkg_stdout",
740 stderr = "rmpkg_stderr",
742 raise_on_error = True)
744 return (out, err), proc
746 def mkdir(self, path, clean = False):
750 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """ Removes 'path' and everything below it from the host.

    Issues 'rm -rf <path>' through 'execute' with with_lock = True
    and returns the result of 'execute'.
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
755 def run_and_wait(self, command, home,
760 ecodefile = "exitcode",
766 raise_on_error = False):
768 Uploads the 'command' to a bash script in the host.
769 Then runs the script detached in background in the host, and
770 busy-waites until the script finishes executing.
773 if not shfile.startswith("/"):
774 shfile = os.path.join(home, shfile)
776 self.upload_command(command,
778 ecodefile = ecodefile,
780 overwrite = overwrite)
782 command = "bash %s" % shfile
783 # run command in background in remote host
784 (out, err), proc = self.run(command, home,
792 # check no errors occurred
794 msg = " Failed to run command '%s' " % command
795 self.error(msg, out, err)
797 raise RuntimeError, msg
799 # Wait for pid file to be generated
800 pid, ppid = self.wait_pid(
803 raise_on_error = raise_on_error)
805 # wait until command finishes to execute
806 self.wait_run(pid, ppid)
808 (eout, err), proc = self.check_errors(home,
809 ecodefile = ecodefile,
812 # Out is what was written in the stderr file
814 msg = " Failed to run command '%s' " % command
815 self.error(msg, eout, err)
818 raise RuntimeError, msg
820 (out, oerr), proc = self.check_output(home, stdout)
822 return (out, err), proc
824 def exitcode(self, home, ecodefile = "exitcode"):
826 Get the exit code of an application.
827 Returns an integer value with the exit code
829 (out, err), proc = self.check_output(home, ecodefile)
831 # Succeeded to open file, return exit code in the file
834 return int(out.strip())
836 # Error in the content of the file!
837 return ExitCode.CORRUPTFILE
839 # No such file or directory
840 if proc.returncode == 1:
841 return ExitCode.FILENOTFOUND
843 # Other error from 'cat'
844 return ExitCode.ERROR
846 def upload_command(self, command,
848 ecodefile = "exitcode",
851 """ Saves the command as a bash script file in the remote host, and
852 forces to save the exit code of the command execution to the ecodefile
855 if not (command.strip().endswith(";") or command.strip().endswith("&")):
858 # The exit code of the command will be stored in ecodefile
859 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
861 'ecodefile': ecodefile,
865 environ = self.format_environment(env)
867 # Add environ to command
868 command = environ + command
870 return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """ Formats environment variable assignments as 'export' statements,
    either to be prepended to an inline command
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
    to be written as lines of a bash script.

    :param env: Whitespace separated assignments (e.g. "A=1 B=2").
        None, empty and blank strings produce no output. Every
        whitespace separated token becomes its own export statement,
        so values containing whitespace are not supported.
    :param inline: If True the statements are separated by ';',
        otherwise by a new line.
    :returns: The export statements, each one followed by the
        separator, or "" if there is nothing to export.
    """
    # split() without arguments splits on any run of white spaces and
    # never yields empty tokens, so a blank 'env' can't generate a
    # dangling " export " statement
    assignments = env.split() if env else []
    if not assignments:
        return ""

    sep = ";" if inline else "\n"
    return sep.join([" export %s" % item for item in assignments]) + sep
886 def check_errors(self, home,
887 ecodefile = "exitcode",
889 """ Checks whether errors occurred while running a command.
890 It first checks the exit code for the command, and only if the
891 exit code is an error one it returns the error output.
897 # get exit code saved in the 'exitcode' file
898 ecode = self.exitcode(home, ecodefile)
900 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
901 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
902 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
903 # The process returned an error code or didn't exist.
904 # Check standard error.
905 (err, eerr), proc = self.check_output(home, stderr)
907 # If the stderr file was not found, assume nothing bad happened,
908 # and just ignore the error.
909 # (cat returns 1 for error "No such file or directory")
910 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
913 return ("", err), proc
915 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
916 """ Waits until the pid file for the command is generated,
917 and returns the pid and ppid of the process """
922 pidtuple = self.getpid(home = home, pidfile = pidfile)
931 msg = " Failed to get pid for pidfile %s/%s " % (
936 raise RuntimeError, msg
940 def wait_run(self, pid, ppid, trial = 0):
941 """ wait for a remote process to finish execution """
945 status = self.status(pid, ppid)
947 if status is ProcStatus.FINISHED:
949 elif status is not ProcStatus.RUNNING:
952 # If it takes more than 20 seconds to start, then
953 # assume something went wrong
957 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of a file in the host.

    Runs 'cat' on the path obtained by joining 'home' and 'filename'
    (through 'execute', with retry = 1 and with_lock = True) and
    returns the ((out, err), proc) result of that invocation.
    """
    path = os.path.join(home, filename)
    result = self.execute("cat %s" % path, retry = 1, with_lock = True)
    (out, err), proc = result
    return (out, err), proc
967 """ Checks if host is responsive
973 # The underlying SSH layer will sometimes return an empty
974 # output (even if the command was executed without errors).
975 # To work around this, repeat the operation N times or
976 # until the result is not empty string
980 (out, err), proc = self.execute("echo 'ALIVE'",
985 if out.find("ALIVE") > -1:
988 trace = traceback.format_exc()
989 msg = "Unresponsive host. Error reaching host: %s " % trace
990 self.error(msg, out, err)
993 time.sleep(min(30.0, retrydelay))
996 if out.find("ALIVE") > -1:
999 msg = "Unresponsive host. Wrong answer. "
1000 self.error(msg, out, err)
1003 def find_home(self):
1004 """ Retrieves host home directory
1006 # The underlying SSH layer will sometimes return an empty
1007 # output (even if the command was executed without errors).
1008 # To work around this, repeat the operation N times or
1009 # until the result is not empty string
1013 (out, err), proc = self.execute("echo ${HOME}",
1018 if out.strip() != "":
1019 self._home_dir = out.strip()
1022 trace = traceback.format_exc()
1023 msg = "Impossible to retrieve HOME directory" % trace
1024 self.error(msg, out, err)
1027 time.sleep(min(30.0, retrydelay))
1030 if not self._home_dir:
1031 msg = "Impossible to retrieve HOME directory"
1032 self.error(msg, out, err)
1033 raise RuntimeError, msg
1035 def filter_existing_files(self, src, dst):
1036 """ Removes files that already exist in the Linux host from src list
1038 # construct a dictionary with { dst: src }
1039 dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ),
1040 src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1043 for d in dests.keys():
1044 command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1046 command = ";".join(command)
1048 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1050 for d in dests.keys():
1051 if out.find(d) > -1:
1057 return " ".join(dests.values())