2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
41 Error codes that the rexitcode function can return if unable to
42 check the exit code of a spawned process
51 Supported flavors of Linux OS
61 class LinuxNode(ResourceManager):
63 .. class:: Class Args :
65 :param ec: The Experiment controller
66 :type ec: ExperimentController
67 :param guid: guid of the RM
72 There are different ways in which commands can be executed using the
73 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
78 * 'execute' (blocking mode) :
80 HOW IT WORKS: 'execute' forks a process and runs the
81 command, synchronously, attached to the terminal, in
83 The execute method will block until the command returns
84 the result on 'out', 'err' (so until it finishes executing).
86 USAGE: short-lived commands that must be executed attached
87 to a terminal and in foreground, for which it IS necessary
88 to block until the command has finished (e.g. if you want
89 to run 'ls' or 'cat').
91 * 'execute' (NON blocking mode - blocking = False) :
93 HOW IT WORKS: Same as before, except that execute method
94 will return immediately (even if command still running).
96 USAGE: long-lived commands that must be executed attached
97 to a terminal and in foreground, but for which it is not
98 necessary to block until the command has finished. (e.g.
99 start an application using X11 forwarding)
103 HOW IT WORKS: Connects to the host ( using SSH if remote)
104 and launches the command in background, detached from any
105 terminal (daemonized), and returns. The command continues to
106 run remotely, but since it is detached from the terminal,
107 its pipes (stdin, stdout, stderr) can't be redirected to the
108 console (as normal non detached processes would), and so they
109 are explicitly redirected to files. The pidfile is created as
110 part of the process of launching the command. The pidfile
111 holds the pid and ppid of the process forked in background,
112 so later on it is possible to check whether the command is still
115 USAGE: long-lived commands that can run detached in background,
116 for which it is NOT necessary to block (wait) until the command
117 has finished. (e.g. start an application that is not using X11
118 forwarding. It can run detached and remotely in background)
122 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123 the command has finished execution. It also checks whether
124 errors occurred during runtime by reading the exitcode file,
125 which contains the exit code of the command that was run
126 (checking stderr only is not always reliable since many
127 commands throw debugging info to stderr and the only way to
128 automatically know whether an error really happened is to
129 check the process exit code).
131 Another difference with respect to 'run', is that instead
132 of directly executing the command as a bash command line,
133 it uploads the command to a bash script and runs the script.
134 This allows to use the bash script to debug errors, since
135 it remains at the remote host and can be run manually to
138 USAGE: medium-lived commands that can run detached in
139 background, for which it IS necessary to block (wait) until
140 the command has finished. (e.g. Package installation,
141 source compilation, file download, etc)
145 _help = "Controls Linux host machines ( either localhost or a host " \
146 "that can be accessed using a SSH key)"
147 _backend_type = "linux"
150 def _register_attributes(cls):
151 hostname = Attribute("hostname", "Hostname of the machine",
152 flags = Flags.ExecReadOnly)
154 username = Attribute("username", "Local account username",
155 flags = Flags.Credential)
157 port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
159 home = Attribute("home",
160 "Experiment home directory to store all experiment related files",
161 flags = Flags.ExecReadOnly)
163 identity = Attribute("identity", "SSH identity file",
164 flags = Flags.Credential)
166 server_key = Attribute("serverKey", "Server public key",
167 flags = Flags.ExecReadOnly)
169 clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170 " from node home folder before starting experiment",
173 flags = Flags.ExecReadOnly)
175 clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
176 " from a previous same experiment, before the new experiment starts",
179 flags = Flags.ExecReadOnly)
181 clean_processes = Attribute("cleanProcesses",
182 "Kill all running processes before starting experiment",
185 flags = Flags.ExecReadOnly)
187 tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188 "releasing the resource",
189 flags = Flags.ExecReadOnly)
191 cls._register_attribute(hostname)
192 cls._register_attribute(username)
193 cls._register_attribute(port)
194 cls._register_attribute(home)
195 cls._register_attribute(identity)
196 cls._register_attribute(server_key)
197 cls._register_attribute(clean_home)
198 cls._register_attribute(clean_experiment)
199 cls._register_attribute(clean_processes)
200 cls._register_attribute(tear_down)
202 def __init__(self, ec, guid):
203 super(LinuxNode, self).__init__(ec, guid)
205 # home directory at Linux host
208 # lock to prevent concurrent applications on the same node,
209 # to execute commands at the same time. There are potential
210 # concurrency issues when using SSH to a same host from
211 # multiple threads. There are also possible operational
212 # issues, e.g. an application querying the existence
213 # of a file or folder prior to its creation, and another
214 # application creating the same file or folder in between.
215 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Decorates a log message with the identity of this node.

    :param msg: text to be logged
    :return: 'msg' prefixed with the guid of the RM and the value of
        the 'hostname' attribute
    """
    fields = (self.guid, self.get("hostname"), msg)
    return " guid %d - host %s - %s " % fields
223 home = self.get("home") or ""
224 if not home.startswith("/"):
225 home = os.path.join(self._home_dir, home)
230 return os.path.join(self.home_dir, "nepi-usr")
234 return os.path.join(self.usr_dir, "lib")
238 return os.path.join(self.usr_dir, "bin")
242 return os.path.join(self.usr_dir, "src")
246 return os.path.join(self.usr_dir, "share")
250 return os.path.join(self.home_dir, "nepi-exp")
254 return os.path.join(self.exp_dir, self.ec.exp_id)
258 return os.path.join(self.exp_home, "node-%d" % self.guid)
262 return os.path.join(self.node_home, self.ec.run_id)
269 if (not self.get("hostname") or not self.get("username")):
270 msg = "Can't resolve OS, insufficient data "
272 raise RuntimeError, msg
276 if out.find("Fedora release 8") == 0:
277 self._os = OSType.FEDORA_8
278 elif out.find("Fedora release 12") == 0:
279 self._os = OSType.FEDORA_12
280 elif out.find("Fedora release 14") == 0:
281 self._os = OSType.FEDORA_14
282 elif out.find("Fedora release") == 0:
283 self._os = OSType.FEDORA
284 elif out.find("Debian") == 0:
285 self._os = OSType.DEBIAN
286 elif out.find("Ubuntu") ==0:
287 self._os = OSType.UBUNTU
289 msg = "Unsupported OS"
291 raise RuntimeError, "%s - %s " %( msg, out )
296 # The underlying SSH layer will sometimes return an empty
297 # output (even if the command was executed without errors).
298 # To work around this, repeat the operation N times or
299 # until the result is not empty string
302 (out, err), proc = self.execute("cat /etc/issue",
306 trace = traceback.format_exc()
307 msg = "Error detecting OS: %s " % trace
308 self.error(msg, out, err)
314 return self.os in [OSType.DEBIAN, OSType.UBUNTU]
318 return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
323 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
325 def do_provision(self):
326 # check if host is alive
327 if not self.is_alive():
328 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
330 raise RuntimeError, msg
334 if self.get("cleanProcesses"):
335 self.clean_processes()
337 if self.get("cleanHome"):
340 if self.get("cleanExperiment"):
341 self.clean_experiment()
343 # Create shared directory structure
344 self.mkdir(self.lib_dir)
345 self.mkdir(self.bin_dir)
346 self.mkdir(self.src_dir)
347 self.mkdir(self.share_dir)
349 # Create experiment node home directory
350 self.mkdir(self.node_home)
352 super(LinuxNode, self).do_provision()
355 if self.state == ResourceState.NEW:
356 self.info("Deploying node")
360 # Node needs to wait until all associated interfaces are
361 # ready before it can finalize deployment
362 from nepi.resources.linux.interface import LinuxInterface
363 ifaces = self.get_connected(LinuxInterface.get_rtype())
365 if iface.state < ResourceState.READY:
366 self.ec.schedule(reschedule_delay, self.deploy)
369 super(LinuxNode, self).do_deploy()
371 def do_release(self):
372 rms = self.get_connected()
374 # Node needs to wait until all associated RMs are released
375 # before it can be released
376 if rm.state != ResourceState.RELEASED:
377 self.ec.schedule(reschedule_delay, self.release)
380 tear_down = self.get("tearDown")
382 self.execute(tear_down)
384 self.clean_processes()
386 super(LinuxNode, self).do_release()
388 def valid_connection(self, guid):
392 def clean_processes(self, killer = False):
393 self.info("Cleaning up processes")
397 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
398 "sudo -S killall python tcpdump || /bin/true ; " +
399 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
400 "sudo -S killall -u root || /bin/true ; " +
401 "sudo -S killall -u root || /bin/true ; ")
404 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
405 "sudo -S killall tcpdump || /bin/true ; " +
406 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
407 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
410 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
412 def clean_home(self):
413 """ Cleans all NEPI related folders in the Linux host
415 self.info("Cleaning up home")
417 cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
420 return self.execute(cmd, with_lock = True)
422 def clean_experiment(self):
423 """ Cleans all experiment related files in the Linux host.
424 It preserves NEPI files and folders that have a multi experiment
427 self.info("Cleaning up experiment files")
429 cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
433 return self.execute(cmd, with_lock = True)
435 def execute(self, command,
443 err_on_timeout = True,
444 connect_timeout = 30,
445 strict_host_checking = False,
450 """ Notice that this invocation will block until the
451 execution finishes. If this is not the desired behavior,
452 use 'run' instead."""
455 (out, err), proc = execfuncs.lexec(command,
462 with self._node_lock:
463 (out, err), proc = sshfuncs.rexec(
465 host = self.get("hostname"),
466 user = self.get("username"),
467 port = self.get("port"),
471 identity = self.get("identity"),
472 server_key = self.get("serverKey"),
475 forward_x11 = forward_x11,
478 err_on_timeout = err_on_timeout,
479 connect_timeout = connect_timeout,
480 persistent = persistent,
482 strict_host_checking = strict_host_checking
485 (out, err), proc = sshfuncs.rexec(
487 host = self.get("hostname"),
488 user = self.get("username"),
489 port = self.get("port"),
493 identity = self.get("identity"),
494 server_key = self.get("serverKey"),
497 forward_x11 = forward_x11,
500 err_on_timeout = err_on_timeout,
501 connect_timeout = connect_timeout,
502 persistent = persistent,
504 strict_host_checking = strict_host_checking
507 return (out, err), proc
509 def run(self, command, home,
518 self.debug("Running command '%s'" % command)
521 (out, err), proc = execfuncs.lspawn(command, pidfile,
526 create_home = create_home,
530 with self._node_lock:
531 (out, err), proc = sshfuncs.rspawn(
535 create_home = create_home,
536 stdin = stdin if stdin is not None else '/dev/null',
537 stdout = stdout if stdout else '/dev/null',
538 stderr = stderr if stderr else '/dev/null',
540 host = self.get("hostname"),
541 user = self.get("username"),
542 port = self.get("port"),
544 identity = self.get("identity"),
545 server_key = self.get("serverKey"),
549 return (out, err), proc
551 def getpid(self, home, pidfile = "pidfile"):
553 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
555 with self._node_lock:
556 pidtuple = sshfuncs.rgetpid(
557 os.path.join(home, pidfile),
558 host = self.get("hostname"),
559 user = self.get("username"),
560 port = self.get("port"),
562 identity = self.get("identity"),
563 server_key = self.get("serverKey")
568 def status(self, pid, ppid):
570 status = execfuncs.lstatus(pid, ppid)
572 with self._node_lock:
573 status = sshfuncs.rstatus(
575 host = self.get("hostname"),
576 user = self.get("username"),
577 port = self.get("port"),
579 identity = self.get("identity"),
580 server_key = self.get("serverKey")
585 def kill(self, pid, ppid, sudo = False):
588 status = self.status(pid, ppid)
590 if status == sshfuncs.ProcStatus.RUNNING:
592 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
594 with self._node_lock:
595 (out, err), proc = sshfuncs.rkill(
597 host = self.get("hostname"),
598 user = self.get("username"),
599 port = self.get("port"),
602 identity = self.get("identity"),
603 server_key = self.get("serverKey")
606 return (out, err), proc
def copy(self, src, dst):
    """ Copies 'src' to 'dst'.

    On localhost the copy is delegated to execfuncs.lcopy. Otherwise it
    goes through sshfuncs.rcopy while holding the node lock, using the
    'port', 'identity' and 'serverKey' attributes of the node.

    :param src: source path; callers such as 'download' pass remote
        paths already formatted as <user>@<server>:<path>
    :param dst: destination path; 'upload' applies the same formatting
        to remote destinations
    :return: ((out, err), proc) as produced by the copy helper
    """
    # NOTE(review): both helpers are assumed to be called with
    # recursive = True -- confirm against execfuncs.lcopy / sshfuncs.rcopy.
    if self.localhost:
        # Forward the method's own arguments; the names 'source' and
        # 'dest' are not defined in this scope.
        (out, err), proc = execfuncs.lcopy(src, dst,
                recursive = True,
                strict_host_checking = False)
    else:
        # Serialize SSH access to the host with the other operations
        # of this node.
        with self._node_lock:
            (out, err), proc = sshfuncs.rcopy(
                src, dst,
                port = self.get("port"),
                identity = self.get("identity"),
                server_key = self.get("serverKey"),
                recursive = True,
                strict_host_checking = False)

    return (out, err), proc
625 def upload(self, src, dst, text = False, overwrite = True):
626 """ Copy content to destination
628 src content to copy. Can be a local file, directory or a list of files
630 dst destination path on the remote host (remote is always self.host)
632 text src is text input, it must be stored into a temp file before uploading
634 # If source is a string input
636 if text and not os.path.isfile(src):
637 # src is text input that should be uploaded as file
638 # create a temporal file with the content to upload
639 f = tempfile.NamedTemporaryFile(delete=False)
644 # If dst files should not be overwritten, check that the files do not
646 if overwrite == False:
647 src = self.filter_existing_files(src, dst)
649 return ("", ""), None
651 if not self.localhost:
652 # Build destination as <user>@<server>:<path>
653 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
655 result = self.copy(src, dst)
def download(self, src, dst):
    """ Copies 'src' from the host into 'dst'.

    When the node is not localhost, the source is rewritten as
    <user>@<server>:<path> using the 'username' and 'hostname'
    attributes before being handed to 'copy'.

    :return: whatever 'copy' returns
    """
    if self.localhost:
        return self.copy(src, dst)

    # Build source as <user>@<server>:<path>
    remote_src = "%s@%s:%s" % (self.get("username"),
            self.get("hostname"), src)
    return self.copy(remote_src, dst)
669 def install_packages_command(self, packages):
672 command = rpmfuncs.install_packages_command(self.os, packages)
674 command = debfuncs.install_packages_command(self.os, packages)
676 msg = "Error installing packages ( OS not known ) "
677 self.error(msg, self.os)
678 raise RuntimeError, msg
682 def install_packages(self, packages, home, run_home = None):
683 """ Install packages in the Linux host.
685 'home' is the directory to upload the package installation script.
686 'run_home' is the directory from where to execute the script.
688 command = self.install_packages_command(packages)
690 run_home = run_home or home
692 (out, err), proc = self.run_and_wait(command, run_home,
693 shfile = os.path.join(home, "instpkg.sh"),
694 pidfile = "instpkg_pidfile",
695 ecodefile = "instpkg_exitcode",
696 stdout = "instpkg_stdout",
697 stderr = "instpkg_stderr",
699 raise_on_error = True)
701 return (out, err), proc
703 def remove_packages(self, packages, home, run_home = None):
704 """ Uninstall packages from the Linux host.
706 'home' is the directory to upload the package un-installation script.
707 'run_home' is the directory from where to execute the script.
710 command = rpmfuncs.remove_packages_command(self.os, packages)
712 command = debfuncs.remove_packages_command(self.os, packages)
714 msg = "Error removing packages ( OS not known ) "
716 raise RuntimeError, msg
718 run_home = run_home or home
720 (out, err), proc = self.run_and_wait(command, run_home,
721 shfile = os.path.join(home, "rmpkg.sh"),
722 pidfile = "rmpkg_pidfile",
723 ecodefile = "rmpkg_exitcode",
724 stdout = "rmpkg_stdout",
725 stderr = "rmpkg_stderr",
727 raise_on_error = True)
729 return (out, err), proc
731 def mkdir(self, path, clean = False):
735 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """ Removes 'path' and everything below it on the host.

    The 'rm -rf' command is issued through 'execute' with
    with_lock = True, and the result of 'execute' is returned as is.
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
740 def run_and_wait(self, command, home,
745 ecodefile = "exitcode",
751 raise_on_error = False):
753 Uploads the 'command' to a bash script in the host.
754 Then runs the script detached in background in the host, and
755 busy-waits until the script finishes executing.
758 if not shfile.startswith("/"):
759 shfile = os.path.join(home, shfile)
761 self.upload_command(command,
763 ecodefile = ecodefile,
765 overwrite = overwrite)
767 command = "bash %s" % shfile
768 # run command in background in remote host
769 (out, err), proc = self.run(command, home,
777 # check no errors occurred
779 msg = " Failed to run command '%s' " % command
780 self.error(msg, out, err)
782 raise RuntimeError, msg
784 # Wait for pid file to be generated
785 pid, ppid = self.wait_pid(
788 raise_on_error = raise_on_error)
790 # wait until command finishes to execute
791 self.wait_run(pid, ppid)
793 (eout, err), proc = self.check_errors(home,
794 ecodefile = ecodefile,
797 # Out is what was written in the stderr file
799 msg = " Failed to run command '%s' " % command
800 self.error(msg, eout, err)
803 raise RuntimeError, msg
805 (out, oerr), proc = self.check_output(home, stdout)
807 return (out, err), proc
809 def exitcode(self, home, ecodefile = "exitcode"):
811 Get the exit code of an application.
812 Returns an integer value with the exit code
814 (out, err), proc = self.check_output(home, ecodefile)
816 # Succeeded to open file, return exit code in the file
819 return int(out.strip())
821 # Error in the content of the file!
822 return ExitCode.CORRUPTFILE
824 # No such file or directory
825 if proc.returncode == 1:
826 return ExitCode.FILENOTFOUND
828 # Other error from 'cat'
829 return ExitCode.ERROR
831 def upload_command(self, command,
833 ecodefile = "exitcode",
836 """ Saves the command as a bash script file in the remote host, and
837 forces to save the exit code of the command execution to the ecodefile
840 if not (command.strip().endswith(";") or command.strip().endswith("&")):
843 # The exit code of the command will be stored in ecodefile
844 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
846 'ecodefile': ecodefile,
850 environ = self.format_environment(env)
852 # Add environ to command
853 command = environ + command
855 return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """ Formats the environment variables for a command to be executed.

    'env' is a string of white space separated assignments
    (e.g. "PYTHONPATH=src/ FOO=bar"). Every assignment is turned into
    an ' export <assignment>' statement.

    :param env: string with the assignments, or an empty value
    :param inline: if True the statements are terminated with ';' so
        they can be prepended to a command line; otherwise each one
        is terminated with a new line, as in a bash script
    :return: the export statements, or "" when 'env' is empty or
        contains only white space

    Assignments are split on white space, so a value that itself
    contains white space is broken into several statements.
    """
    if not env:
        return ""

    # Splitting on any run of white space also drops leading and
    # trailing blanks, so a blank-only 'env' produces no assignments
    # instead of a bare 'export' statement.
    assignments = env.split()
    if not assignments:
        return ""

    sep = ";" if inline else "\n"
    return sep.join([" export %s" % item for item in assignments]) + sep
871 def check_errors(self, home,
872 ecodefile = "exitcode",
874 """ Checks whether errors occurred while running a command.
875 It first checks the exit code for the command, and only if the
876 exit code is an error one it returns the error output.
882 # get exit code saved in the 'exitcode' file
883 ecode = self.exitcode(home, ecodefile)
885 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
886 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
887 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
888 # The process returned an error code or didn't exist.
889 # Check standard error.
890 (err, eerr), proc = self.check_output(home, stderr)
892 # If the stderr file was not found, assume nothing bad happened,
893 # and just ignore the error.
894 # (cat returns 1 for error "No such file or directory")
895 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
898 return ("", err), proc
900 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
901 """ Waits until the pid file for the command is generated,
902 and returns the pid and ppid of the process """
907 pidtuple = self.getpid(home = home, pidfile = pidfile)
916 msg = " Failed to get pid for pidfile %s/%s " % (
921 raise RuntimeError, msg
925 def wait_run(self, pid, ppid, trial = 0):
926 """ wait for a remote process to finish execution """
930 status = self.status(pid, ppid)
932 if status is ProcStatus.FINISHED:
934 elif status is not ProcStatus.RUNNING:
937 # If it takes more than 20 seconds to start, then
938 # asume something went wrong
942 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of a file in the host.

    :param home: directory that holds the file
    :param filename: name of the file, relative to 'home'
    :return: ((out, err), proc) from running 'cat' on the file
        through 'execute' (retry = 1, with_lock = True)
    """
    path = os.path.join(home, filename)
    result, proc = self.execute("cat %s" % path, retry = 1,
            with_lock = True)
    out, err = result
    return (out, err), proc
952 """ Checks if host is responsive
958 msg = "Unresponsive host. Wrong answer. "
960 # The underlying SSH layer will sometimes return an empty
961 # output (even if the command was executed without errors).
962 # To work around this, repeat the operation N times or
963 # until the result is not empty string
965 (out, err), proc = self.execute("echo 'ALIVE'",
969 if out.find("ALIVE") > -1:
972 trace = traceback.format_exc()
973 msg = "Unresponsive host. Error reaching host: %s " % trace
975 self.error(msg, out, err)
979 """ Retrieves host home directory
981 # The underlying SSH layer will sometimes return an empty
982 # output (even if the command was executed without errors).
983 # To work around this, repeat the operation N times or
984 # until the result is not empty string
985 msg = "Impossible to retrieve HOME directory"
987 (out, err), proc = self.execute("echo ${HOME}",
991 if out.strip() != "":
992 self._home_dir = out.strip()
994 trace = traceback.format_exc()
995 msg = "Impossible to retrieve HOME directory" % trace
997 if not self._home_dir:
998 self.error(msg, out, err)
999 raise RuntimeError, msg
1001 def filter_existing_files(self, src, dst):
1002 """ Removes files that already exist in the Linux host from src list
1004 # construct a dictionary with { dst: src }
1005 dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ),
1006 src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1009 for d in dests.keys():
1010 command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1012 command = ";".join(command)
1014 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1016 for d in dests.keys():
1017 if out.find(d) > -1:
1023 return " ".join(dests.values())