2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay, failtrap
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
41 Error codes that the rexitcode function can return if unable to
42 check the exit code of a spawned process
51 Supported flavors of Linux OS
61 class LinuxNode(ResourceManager):
63 .. class:: Class Args :
65 :param ec: The Experiment controller
66 :type ec: ExperimentController
67 :param guid: guid of the RM
72 There are different ways in which commands can be executed using the
73 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
78 * 'execute' (blocking mode) :
HOW IT WORKS: 'execute' forks a process and runs the
81 command, synchronously, attached to the terminal, in
83 The execute method will block until the command returns
84 the result on 'out', 'err' (so until it finishes executing).
86 USAGE: short-lived commands that must be executed attached
87 to a terminal and in foreground, for which it IS necessary
88 to block until the command has finished (e.g. if you want
89 to run 'ls' or 'cat').
91 * 'execute' (NON blocking mode - blocking = False) :
93 HOW IT WORKS: Same as before, except that execute method
94 will return immediately (even if command still running).
96 USAGE: long-lived commands that must be executed attached
97 to a terminal and in foreground, but for which it is not
98 necessary to block until the command has finished. (e.g.
99 start an application using X11 forwarding)
HOW IT WORKS: Connects to the host (using SSH if remote)
104 and launches the command in background, detached from any
105 terminal (daemonized), and returns. The command continues to
106 run remotely, but since it is detached from the terminal,
107 its pipes (stdin, stdout, stderr) can't be redirected to the
108 console (as normal non detached processes would), and so they
109 are explicitly redirected to files. The pidfile is created as
110 part of the process of launching the command. The pidfile
111 holds the pid and ppid of the process forked in background,
112 so later on it is possible to check whether the command is still
115 USAGE: long-lived commands that can run detached in background,
116 for which it is NOT necessary to block (wait) until the command
117 has finished. (e.g. start an application that is not using X11
118 forwarding. It can run detached and remotely in background)
122 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123 the command has finished execution. It also checks whether
124 errors occurred during runtime by reading the exitcode file,
125 which contains the exit code of the command that was run
126 (checking stderr only is not always reliable since many
127 commands throw debugging info to stderr and the only way to
128 automatically know whether an error really happened is to
129 check the process exit code).
131 Another difference with respect to 'run', is that instead
132 of directly executing the command as a bash command line,
133 it uploads the command to a bash script and runs the script.
This allows the bash script to be used to debug errors, since
135 it remains at the remote host and can be run manually to
138 USAGE: medium-lived commands that can run detached in
139 background, for which it IS necessary to block (wait) until
140 the command has finished. (e.g. Package installation,
141 source compilation, file download, etc)
145 _help = "Controls Linux host machines ( either localhost or a host " \
146 "that can be accessed using a SSH key)"
147 _backend_type = "linux"
def _register_attributes(cls):
    """ Registers the attributes that configure a LinuxNode.

    'username' and 'identity' are Credential attributes; the other
    attributes registered here are ExecReadOnly.
    """
    hostname = Attribute("hostname", "Hostname of the machine",
            flags = Flags.ExecReadOnly)

    username = Attribute("username", "Local account username",
            flags = Flags.Credential)

    port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)

    # A relative value is joined to the host's HOME directory (see the
    # home_dir resolution code)
    home = Attribute("home",
            "Experiment home directory to store all experiment related files",
            flags = Flags.ExecReadOnly)

    identity = Attribute("identity", "SSH identity file",
            flags = Flags.Credential)

    server_key = Attribute("serverKey", "Server public key",
            flags = Flags.ExecReadOnly)

    # Checked in provision() before the directory structure is created
    clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
            " from node home folder before starting experiment",
            flags = Flags.ExecReadOnly)

    # Checked in provision(); triggers clean_experiment()
    clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
            " from a previous same experiment, before the new experiment starts",
            flags = Flags.ExecReadOnly)

    # Checked in provision(); triggers clean_processes()
    clean_processes = Attribute("cleanProcesses",
            "Kill all running processes before starting experiment",
            flags = Flags.ExecReadOnly)

    # Read in release() and passed to execute()
    tear_down = Attribute("tearDown", "Bash script to be executed before " + \
            "releasing the resource",
            flags = Flags.ExecReadOnly)

    cls._register_attribute(hostname)
    cls._register_attribute(username)
    cls._register_attribute(port)
    cls._register_attribute(home)
    cls._register_attribute(identity)
    cls._register_attribute(server_key)
    cls._register_attribute(clean_home)
    cls._register_attribute(clean_experiment)
    cls._register_attribute(clean_processes)
    cls._register_attribute(tear_down)
def __init__(self, ec, guid):
    """ Creates a LinuxNode resource manager.

    :param ec: The Experiment controller
    :type ec: ExperimentController
    :param guid: guid of the RM
    """
    super(LinuxNode, self).__init__(ec, guid)

    # home directory at Linux host

    # lock to prevent concurrent applications on the same node,
    # to execute commands at the same time. There are potential
    # concurrency issues when using SSH to a same host from
    # multiple threads. There are also possible operational
    # issues, e.g. an application querying the existence
    # of a file or folder prior to its creation, and another
    # application creating the same file or folder in between.
    # It is acquired (with self._node_lock:) around the remote calls in
    # execute(), run(), getpid(), status(), kill() and copy().
    self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Decorates a log message with this node's guid and hostname. """
    guid = self.guid
    hostname = self.get("hostname")
    return " guid %d - host %s - %s " % (guid, hostname, msg)
# Resolve the "home" attribute: an absolute path is used as is, while a
# relative (or unset) value is joined to the host HOME directory that
# find_home() stores in self._home_dir.
home = self.get("home") or ""
if not home.startswith("/"):
    home = os.path.join(self._home_dir, home)
# <home_dir>/nepi-usr (a directory clean_home() removes)
return os.path.join(self.home_dir, "nepi-usr")
# <usr_dir>/lib
return os.path.join(self.usr_dir, "lib")
# <usr_dir>/bin
return os.path.join(self.usr_dir, "bin")
# <usr_dir>/src
return os.path.join(self.usr_dir, "src")
# <usr_dir>/share
return os.path.join(self.usr_dir, "share")
# <home_dir>/nepi-exp (a directory clean_home() removes)
return os.path.join(self.home_dir, "nepi-exp")
# <exp_dir>/<experiment id>
return os.path.join(self.exp_dir, self.ec.exp_id)
# <exp_home>/node-<guid>
return os.path.join(self.exp_home, "node-%d" % self.guid)
# <node_home>/<run id>
return os.path.join(self.node_home, self.ec.run_id)
# The OS can only be probed when connection data is available
if (not self.get("hostname") or not self.get("username")):
    msg = "Can't resolve OS, insufficient data "
    raise RuntimeError, msg

# Map the distribution banner held in `out` (presumably the /etc/issue
# text fetched on the host -- confirm) to an OSType value. find(...) == 0
# means the banner must START with the given text.
if out.find("Fedora release 8") == 0:
    self._os = OSType.FEDORA_8
elif out.find("Fedora release 12") == 0:
    self._os = OSType.FEDORA_12
elif out.find("Fedora release 14") == 0:
    self._os = OSType.FEDORA_14
elif out.find("Debian") == 0:
    self._os = OSType.DEBIAN
elif out.find("Ubuntu") ==0:
    self._os = OSType.UBUNTU
# Any other banner is rejected as unsupported
msg = "Unsupported OS"
raise RuntimeError, "%s - %s " %( msg, out )
# The underlying SSH layer will sometimes return an empty
# output (even if the command was executed without errors).
# To work around this, repeat the operation N times or
# until the result is not empty string
        # /etc/issue holds the distribution banner used to detect the OS
        (out, err), proc = self.execute("cat /etc/issue",
        if out.strip() != "":
        trace = traceback.format_exc()
        msg = "Error detecting OS: %s " % trace
        self.error(msg, out, err)
    # Wait before the next attempt, never more than 30 seconds
    time.sleep(min(30.0, retrydelay))
# True when the detected OS is Debian-based (Debian or Ubuntu)
return self.os in [OSType.DEBIAN, OSType.UBUNTU]
# True when the detected OS is one of the RPM-based (Fedora) types listed
return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
329 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
# check if host is alive
if not self.is_alive():
    msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
    raise RuntimeError, msg

# Optional clean-ups, enabled through the cleanProcesses, cleanHome
# and cleanExperiment attributes
if self.get("cleanProcesses"):
    self.clean_processes()

if self.get("cleanHome"):

if self.get("cleanExperiment"):
    self.clean_experiment()

# Create shared directory structure
# (mkdir() runs 'mkdir -p', so already existing directories are kept)
self.mkdir(self.lib_dir)
self.mkdir(self.bin_dir)
self.mkdir(self.src_dir)
self.mkdir(self.share_dir)

# Create experiment node home directory
self.mkdir(self.node_home)

super(LinuxNode, self).provision()
if self.state == ResourceState.NEW:
    self.info("Deploying node")

# Node needs to wait until all associated interfaces are
# ready before it can finalize deployment
# (function-level import; presumably avoids a circular module
# dependency -- confirm)
from nepi.resources.linux.interface import LinuxInterface
ifaces = self.get_connected(LinuxInterface.rtype())
    if iface.state < ResourceState.READY:
        # Interface not READY yet: try deploy() again after reschedule_delay
        self.ec.schedule(reschedule_delay, self.deploy)

super(LinuxNode, self).deploy()
rms = self.get_connected()
    # Node needs to wait until all associated RMs are released
    # before it can be released
    if rm.state < ResourceState.STOPPED:
        # Try release() again after reschedule_delay
        self.ec.schedule(reschedule_delay, self.release)

# Optional user-provided clean-up script ("tearDown" attribute)
tear_down = self.get("tearDown")
    self.execute(tear_down)

self.clean_processes()
err = traceback.format_exc()

super(LinuxNode, self).release()
def valid_connection(self, guid):
    # NOTE(review): presumably decides whether the RM identified by
    # ``guid`` may be connected to this node -- confirm against the body.
def clean_processes(self, killer = False):
    """ Kills processes on the host. Every kill is followed by
    '|| /bin/true', so failures are ignored (best effort).

    Two command sets are built below. The aggressive one kills 'python'
    and 'tcpdump', every process not attached to the current terminal
    (except the shell's parent, $PPID) and all processes of root. The
    milder one kills 'tcpdump' and all processes of the configured
    username. Which one runs is presumably decided by ``killer``
    -- confirm.
    """
    self.info("Cleaning up processes")

        # Each kill is issued twice, presumably to also catch processes
        # started while the first round was running
        cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
            "sudo -S killall python tcpdump || /bin/true ; " +
            "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
            "sudo -S killall -u root || /bin/true ; " +
            "sudo -S killall -u root || /bin/true ; ")
        cmd = ("sudo -S killall tcpdump || /bin/true ; " +
            "sudo -S killall tcpdump || /bin/true ; " +
            "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
            "sudo -S killall -u %s || /bin/true ; " % self.get("username"))

    (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
def clean_home(self):
    """ Cleans all NEPI related folders in the Linux host: the
    'nepi-usr' and 'nepi-exp' entries directly under the directory
    passed to 'cd' (presumably the node home directory -- confirm).
    Returns the result of execute().
    """
    self.info("Cleaning up home")

    # -maxdepth 1 restricts the match to top-level entries;
    # '-execdir rm -rf {} +' removes all matches with a single rm call
    cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (

    return self.execute(cmd, with_lock = True)
def clean_experiment(self):
    """ Cleans all experiment related files in the Linux host.
    It preserves NEPI files and folders that are shared by multiple
    experiments. Returns the result of execute().
    """
    self.info("Cleaning up experiment files")

    # Removes the top-level entry whose name is interpolated into the
    # -name pattern (only depth 1 is searched)
    cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (

    return self.execute(cmd, with_lock = True)
def execute(self, command,
        err_on_timeout = True,
        connect_timeout = 30,
        strict_host_checking = False,
    """ Executes ``command`` on the host and returns ``((out, err), proc)``.

    The command runs through execfuncs.lexec (presumably for a local
    host) or through sshfuncs.rexec, either while holding the node lock
    or not (presumably selected by ``with_lock`` -- confirm).

    Notice that this invocation will block until the
    execution finishes. If this is not the desired behavior,
    use 'run' instead. (The class docstring also describes a
    non-blocking mode, ``blocking = False``.)"""
        (out, err), proc = execfuncs.lexec(command,
            # Serialize SSH access to this host (see the lock in __init__)
            with self._node_lock:
                (out, err), proc = sshfuncs.rexec(
                    host = self.get("hostname"),
                    user = self.get("username"),
                    port = self.get("port"),
                    identity = self.get("identity"),
                    server_key = self.get("serverKey"),
                    forward_x11 = forward_x11,
                    err_on_timeout = err_on_timeout,
                    connect_timeout = connect_timeout,
                    persistent = persistent,
                    strict_host_checking = strict_host_checking
            (out, err), proc = sshfuncs.rexec(
                host = self.get("hostname"),
                user = self.get("username"),
                port = self.get("port"),
                identity = self.get("identity"),
                server_key = self.get("serverKey"),
                forward_x11 = forward_x11,
                err_on_timeout = err_on_timeout,
                connect_timeout = connect_timeout,
                persistent = persistent,
                strict_host_checking = strict_host_checking

    return (out, err), proc
def run(self, command, home,
    self.debug("Running command '%s'" % command)

        # Spawn the command detached through execfuncs.lspawn
        # (presumably the local-host case)
        (out, err), proc = execfuncs.lspawn(command, pidfile,
            create_home = create_home,
        # Spawn the command detached over SSH while holding the node lock.
        # stdin defaults to /dev/null when None; stdout/stderr default to
        # /dev/null when empty or None.
        with self._node_lock:
            (out, err), proc = sshfuncs.rspawn(
                create_home = create_home,
                stdin = stdin if stdin is not None else '/dev/null',
                stdout = stdout if stdout else '/dev/null',
                stderr = stderr if stderr else '/dev/null',
                host = self.get("hostname"),
                user = self.get("username"),
                port = self.get("port"),
                identity = self.get("identity"),
                server_key = self.get("serverKey"),

    return (out, err), proc
def getpid(self, home, pidfile = "pidfile"):
    """ Reads the pid file ``home/pidfile`` into ``pidtuple``. The class
    docstring says this file holds the pid and ppid of the process that
    was started in the background.

    The file is read with execfuncs.lgetpid or, under the node lock,
    with sshfuncs.rgetpid (presumably local vs. remote host).
    """
        pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
        with self._node_lock:
            pidtuple = sshfuncs.rgetpid(
                os.path.join(home, pidfile),
                host = self.get("hostname"),
                user = self.get("username"),
                port = self.get("port"),
                identity = self.get("identity"),
                server_key = self.get("serverKey")
def status(self, pid, ppid):
    """ Queries the status of the process identified by ``pid``/``ppid``
    with execfuncs.lstatus or, under the node lock, sshfuncs.rstatus.
    kill() and wait_run() compare the result against
    sshfuncs.ProcStatus values (RUNNING, FINISHED).
    """
        status = execfuncs.lstatus(pid, ppid)
        with self._node_lock:
            status = sshfuncs.rstatus(
                host = self.get("hostname"),
                user = self.get("username"),
                port = self.get("port"),
                identity = self.get("identity"),
                server_key = self.get("serverKey")
def kill(self, pid, ppid, sudo = False):
    """ Kills the process ``pid``/``ppid``, but only when status()
    reports it as RUNNING. The kill is done with execfuncs.lkill or,
    under the node lock, sshfuncs.rkill. Returns ``((out, err), proc)``.
    """
    status = self.status(pid, ppid)

    if status == sshfuncs.ProcStatus.RUNNING:
            (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
            with self._node_lock:
                (out, err), proc = sshfuncs.rkill(
                    host = self.get("hostname"),
                    user = self.get("username"),
                    port = self.get("port"),
                    identity = self.get("identity"),
                    server_key = self.get("serverKey")

    return (out, err), proc
621 def copy(self, src, dst):
623 (out, err), proc = execfuncs.lcopy(source, dest,
625 strict_host_checking = False)
627 with self._node_lock:
628 (out, err), proc = sshfuncs.rcopy(
630 port = self.get("port"),
631 identity = self.get("identity"),
632 server_key = self.get("serverKey"),
634 strict_host_checking = False)
636 return (out, err), proc
def upload(self, src, dst, text = False, overwrite = True):
    """ Copy content to destination

    src content to copy. Can be a local file, directory or a list of files

    dst destination path on the remote host (the host of this node)

    text src is text input, it must be stored into a temp file before uploading

    overwrite when False, sources whose destination already exists are
    removed by filter_existing_files(); if nothing is left to copy,
    ``(("", ""), None)`` is returned (presumably -- confirm)
    """
    # If source is a string input
    if text and not os.path.isfile(src):
        # src is text input that should be uploaded as file
        # create a temporary file with the content to upload
        f = tempfile.NamedTemporaryFile(delete=False)

    # If dst files should not be overwritten, check that the files do not
    # already exist on the host
    if overwrite == False:
        src = self.filter_existing_files(src, dst)
            return ("", ""), None

    if not self.localhost:
        # Build destination as <user>@<server>:<path>
        dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)

    result = self.copy(src, dst)
def download(self, src, dst):
    """ Copies ``src`` from the host to the local path ``dst``.

    For a remote host the source is qualified as ``<user>@<server>:<path>``
    before delegating to copy(); for localhost it is used unchanged.
    Returns whatever copy() returns, i.e. ``((out, err), proc)``.
    """
    if self.localhost:
        return self.copy(src, dst)

    # Build source as <user>@<server>:<path>
    qualified_src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
    return self.copy(qualified_src, dst)
def install_packages_command(self, packages):
    """ Builds the shell command that installs ``packages`` on the host,
    using rpmfuncs or debfuncs (presumably selected through use_rpm /
    use_deb). Logs an error and raises RuntimeError when the OS is not
    recognized.
    """
        command = rpmfuncs.install_packages_command(self.os, packages)
        command = debfuncs.install_packages_command(self.os, packages)
        msg = "Error installing packages ( OS not known ) "
        self.error(msg, self.os)
        raise RuntimeError, msg
def install_packages(self, packages, home, run_home = None):
    """ Install packages in the Linux host.

    'home' is the directory to upload the package installation script.
    'run_home' is the directory from where to execute the script
    (defaults to 'home').

    The command built by install_packages_command() is saved as
    home/instpkg.sh and run with run_and_wait(), which uses the
    instpkg_* pid/exit-code/stdout/stderr files and is asked to raise
    on failure (raise_on_error = True). Returns ``((out, err), proc)``.
    """
    command = self.install_packages_command(packages)

    run_home = run_home or home

    (out, err), proc = self.run_and_wait(command, run_home,
        shfile = os.path.join(home, "instpkg.sh"),
        pidfile = "instpkg_pidfile",
        ecodefile = "instpkg_exitcode",
        stdout = "instpkg_stdout",
        stderr = "instpkg_stderr",
        raise_on_error = True)

    return (out, err), proc
def remove_packages(self, packages, home, run_home = None):
    """ Uninstall packages from the Linux host.

    'home' is the directory to upload the package un-installation script.
    'run_home' is the directory from where to execute the script
    (defaults to 'home').

    The removal command comes from rpmfuncs or debfuncs (presumably
    selected through use_rpm / use_deb); an unknown OS raises
    RuntimeError. The command is saved as home/rmpkg.sh and run with
    run_and_wait(), which is asked to raise on failure.
    Returns ``((out, err), proc)``.
    """
        command = rpmfuncs.remove_packages_command(self.os, packages)
        command = debfuncs.remove_packages_command(self.os, packages)
        msg = "Error removing packages ( OS not known ) "
        raise RuntimeError, msg

    run_home = run_home or home

    (out, err), proc = self.run_and_wait(command, run_home,
        shfile = os.path.join(home, "rmpkg.sh"),
        pidfile = "rmpkg_pidfile",
        ecodefile = "rmpkg_exitcode",
        stdout = "rmpkg_stdout",
        stderr = "rmpkg_stderr",
        raise_on_error = True)

    return (out, err), proc
def mkdir(self, path, clean = False):
    """ Creates ``path`` on the host with 'mkdir -p', so missing parents
    are created and an existing directory is not an error. Returns the
    ``((out, err), proc)`` result of execute(). ``clean`` presumably
    removes the directory first -- confirm.

    NOTE(review): ``path`` is interpolated unquoted into a shell command.
    """
    return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """ Recursively removes ``path`` on the host with 'rm -rf'.

    The command is run through execute() with ``with_lock = True`` and
    its ``((out, err), proc)`` result is returned. Note that ``path`` is
    interpolated unquoted into the shell command.
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
def run_and_wait(self, command, home,
        ecodefile = "exitcode",
        raise_on_error = False):
    """
    Uploads the 'command' to a bash script in the host.
    Then runs the script detached in background in the host, and
    busy-waits until the script finishes executing.

    Afterwards check_errors() inspects the exit code file (and the stderr
    file). Failures are logged and, with ``raise_on_error``, presumably
    raise RuntimeError. Returns ``((out, err), proc)`` where ``out`` is
    the content of the stdout file and ``err`` the error text reported by
    check_errors().
    """
    if not shfile.startswith("/"):
        shfile = os.path.join(home, shfile)

    self.upload_command(command,
            ecodefile = ecodefile,
            overwrite = overwrite)

    command = "bash %s" % shfile
    # run command in background in remote host
    (out, err), proc = self.run(command, home,

    # check no errors occurred
        msg = " Failed to run command '%s' " % command
        self.error(msg, out, err)
        raise RuntimeError, msg

    # Wait for pid file to be generated
    pid, ppid = self.wait_pid(
            raise_on_error = raise_on_error)

    # wait until command finishes to execute
    self.wait_run(pid, ppid)

    (eout, err), proc = self.check_errors(home,
        ecodefile = ecodefile,

    # check_errors() returns "" as out; err holds what was written to the
    # stderr file (or an exit-code retrieval error message)
        msg = " Failed to run command '%s' " % command
        self.error(msg, eout, err)
        raise RuntimeError, msg

    (out, oerr), proc = self.check_output(home, stdout)

    return (out, err), proc
def exitcode(self, home, ecodefile = "exitcode"):
    """
    Get the exit code of an application.
    Returns an integer value with the exit code

    The value is read from home/ecodefile with check_output(). When it
    cannot be obtained, an ExitCode sentinel is returned instead:
    CORRUPTFILE (presumably when the content is not an integer),
    FILENOTFOUND ('cat' exited with status 1) or ERROR (any other
    'cat' failure).
    """
    (out, err), proc = self.check_output(home, ecodefile)

    # Succeeded to open file, return exit code in the file
        return int(out.strip())
        # Error in the content of the file!
        return ExitCode.CORRUPTFILE

    # No such file or directory
    if proc.returncode == 1:
        return ExitCode.FILENOTFOUND

    # Other error from 'cat'
    return ExitCode.ERROR
def upload_command(self, command,
        ecodefile = "exitcode",
    """ Saves the command as a bash script file in the remote host, and
    forces to save the exit code of the command execution to the ecodefile.

    The command is wrapped as ``{ command } ; echo $? > ecodefile ;`` and
    the ``env`` assignments are prepended as export lines (see
    format_environment()). Returns the result of upload().
    """
    # bash requires the last command inside '{ ...; }' to be terminated
    # by ';' (or '&'); presumably a ';' is appended when missing
    if not (command.strip().endswith(";") or command.strip().endswith("&")):

    # The exit code of the command will be stored in ecodefile
    command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
            'ecodefile': ecodefile,

    environ = self.format_environment(env)

    # Add environ to command
    command = environ + command

    return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """ Turns a whitespace-separated string of NAME=value assignments
    into 'export' statements.

    With ``inline`` the statements are separated (and terminated) by ';'
    so they can prefix a one-line command, e.g.
    'export PYTHONPATH=src/..; export LALA=..;python script.py'.
    Otherwise they are separated (and terminated) by newlines, as in a
    bash script. An empty or None ``env`` yields an empty string.

    Assignments are split on whitespace, so values containing spaces are
    not supported.
    """
    if not env:
        return ""

    separator = ";" if inline else "\n"
    # Collapse runs of whitespace so each split item is one assignment
    assignments = re.sub(r'\s+', ' ', env.strip()).split(" ")
    exports = [" export %s" % assignment for assignment in assignments]
    return separator.join(exports) + separator
def check_errors(self, home,
        ecodefile = "exitcode",
    """ Checks whether errors occurred while running a command.
    It first checks the exit code for the command, and only if the
    exit code is an error one it returns the error output.

    Returns ``("", err), proc``. ``err`` is a message when the exit code
    could not be retrieved (CORRUPTFILE/ERROR). It is the content of the
    stderr file when the exit code is positive or the exit code file was
    not found, and presumably it is cleared when the stderr file does not
    exist either.
    """
    # get exit code saved in the 'exitcode' file
    ecode = self.exitcode(home, ecodefile)

    if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
        err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
    elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
        # The process returned an error code or didn't exist.
        # Check standard error.
        (err, eerr), proc = self.check_output(home, stderr)

        # If the stderr file was not found, assume nothing bad happened,
        # and just ignore the error.
        # (cat returns 1 for error "No such file or directory")
        if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:

    return ("", err), proc
def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
    """ Waits until the pid file for the command is generated,
    and returns the pid and ppid of the process.

    The pid file is polled with getpid(). If no pid can be obtained,
    an error is logged and, with ``raise_on_error``, presumably a
    RuntimeError is raised.
    """
        pidtuple = self.getpid(home = home, pidfile = pidfile)
        msg = " Failed to get pid for pidfile %s/%s " % (
        raise RuntimeError, msg
def wait_run(self, pid, ppid, trial = 0):
    """ Waits for a remote process to finish execution, polling
    status() until it reports ProcStatus.FINISHED.

    NOTE(review): the status is compared with 'is', which relies on the
    ProcStatus values being singleton objects; '==' would be safer.
    """
        status = self.status(pid, ppid)

        if status is ProcStatus.FINISHED:
        elif status is not ProcStatus.RUNNING:
            # If it takes more than 20 seconds to start, then
            # assume something went wrong
            # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of file ``filename`` in directory ``home``
    on the host.

    The file is read with 'cat' through execute() (one retry, with the
    node lock); the ``((out, err), proc)`` result is returned, with the
    file content in ``out``.
    """
    target = os.path.join(home, filename)
    (content, error), proc = self.execute("cat %s" % target,
            retry = 1, with_lock = True)
    return (content, error), proc
""" Checks if host is responsive: runs echo 'ALIVE' on it (retrying,
see below) and looks for 'ALIVE' in the output. Problems are reported
through self.error().
"""
# The underlying SSH layer will sometimes return an empty
# output (even if the command was executed without errors).
# To work around this, repeat the operation N times or
# until the result is not empty string
        (out, err), proc = self.execute("echo 'ALIVE'",

        if out.find("ALIVE") > -1:
        trace = traceback.format_exc()
        msg = "Unresponsive host. Error reaching host: %s " % trace
        self.error(msg, out, err)

    # Wait before the next attempt, never more than 30 seconds
    time.sleep(min(30.0, retrydelay))

if out.find("ALIVE") > -1:
    msg = "Unresponsive host. Wrong answer. "
    self.error(msg, out, err)
1001 def find_home(self):
1002 """ Retrieves host home directory
1004 # The underlying SSH layer will sometimes return an empty
1005 # output (even if the command was executed without errors).
1006 # To work arround this, repeat the operation N times or
1007 # until the result is not empty string
1009 for i in xrange(10):
1011 (out, err), proc = self.execute("echo ${HOME}",
1016 if out.strip() != "":
1017 self._home_dir = out.strip()
1020 trace = traceback.format_exc()
1021 msg = "Impossible to retrieve HOME directory" % trace
1022 self.error(msg, out, err)
1025 time.sleep(min(30.0, retrydelay))
1028 if not self._home_dir:
1029 msg = "Impossible to retrieve HOME directory"
1030 self.error(msg, out, err)
1031 raise RuntimeError, msg
def filter_existing_files(self, src, dst):
    """ Removes files that already exist in the Linux host from src list.

    ``src`` is a single path or a space-separated list of paths. For a
    list, ``dst`` is taken as the destination directory
    (dst/<basename>); for a single path, ``dst`` is the destination
    itself. One shell command tests each destination with '[ -f ... ]'
    and echoes the existing ones. Their sources are (presumably) dropped,
    and the remaining sources are returned space-separated.
    """
    # construct a dictionary with { dst: src }
    dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ),
        src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})

    for d in dests.keys():
        command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )

    command = ";".join(command)

    (out, err), proc = self.execute(command, retry = 1, with_lock = True)

    # NOTE(review): substring match -- a destination that is a prefix of
    # another echoed path (e.g. /x/f vs /x/f2) is also treated as existing.
    for d in dests.keys():
        if out.find(d) > -1:

    return " ".join(dests.values())