2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
41 Error codes that the rexitcode function can return if unable to
42 check the exit code of a spawned process
51 Supported flavors of Linux OS
61 class LinuxNode(ResourceManager):
63 .. class:: Class Args :
65 :param ec: The Experiment controller
66 :type ec: ExperimentController
67 :param guid: guid of the RM
72 There are different ways in which commands can be executed using the
73 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
78 * 'execute' (blocking mode) :
80 HOW IT WORKS: 'execute', forks a process and run the
81 command, synchronously, attached to the terminal, in
83 The execute method will block until the command returns
84 the result on 'out', 'err' (so until it finishes executing).
86 USAGE: short-lived commands that must be executed attached
87 to a terminal and in foreground, for which it IS necessary
88 to block until the command has finished (e.g. if you want
89 to run 'ls' or 'cat').
91 * 'execute' (NON blocking mode - blocking = False) :
93 HOW IT WORKS: Same as before, except that execute method
94 will return immediately (even if command still running).
96 USAGE: long-lived commands that must be executed attached
97 to a terminal and in foreground, but for which it is not
98 necessary to block until the command has finished. (e.g.
99 start an application using X11 forwarding)
103 HOW IT WORKS: Connects to the host ( using SSH if remote)
104 and launches the command in background, detached from any
105 terminal (daemonized), and returns. The command continues to
106 run remotely, but since it is detached from the terminal,
107 its pipes (stdin, stdout, stderr) can't be redirected to the
108 console (as normal non detached processes would), and so they
109 are explicitly redirected to files. The pidfile is created as
110 part of the process of launching the command. The pidfile
111 holds the pid and ppid of the process forked in background,
112 so later on it is possible to check whether the command is still
115 USAGE: long-lived commands that can run detached in background,
116 for which it is NOT necessary to block (wait) until the command
117 has finished. (e.g. start an application that is not using X11
118 forwarding. It can run detached and remotely in background)
122 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123 the command has finished execution. It also checks whether
124 errors occurred during runtime by reading the exitcode file,
125 which contains the exit code of the command that was run
126 (checking stderr only is not always reliable since many
127 commands throw debugging info to stderr and the only way to
128 automatically know whether an error really happened is to
129 check the process exit code).
131 Another difference with respect to 'run', is that instead
132 of directly executing the command as a bash command line,
133 it uploads the command to a bash script and runs the script.
134 This allows to use the bash script to debug errors, since
135 it remains at the remote host and can be run manually to
138 USAGE: medium-lived commands that can run detached in
139 background, for which it IS necessary to block (wait) until
140 the command has finished. (e.g. Package installation,
141 source compilation, file download, etc)
145 _help = "Controls Linux host machines ( either localhost or a host " \
146 "that can be accessed using a SSH key)"
147 _backend_type = "linux"
def _register_attributes(cls):
    """ Declares the attributes that can be configured on a LinuxNode.

    They are registered in this order: hostname, username, port, home,
    identity, serverKey, cleanHome, cleanExperiment, cleanProcesses and
    tearDown. username and identity carry Flags.Credential; the other
    eight attributes carry Flags.ExecReadOnly.
    """
    declared = [
        Attribute("hostname", "Hostname of the machine",
            flags = Flags.ExecReadOnly),
        Attribute("username", "Local account username",
            flags = Flags.Credential),
        Attribute("port", "SSH port", flags = Flags.ExecReadOnly),
        Attribute("home",
            "Experiment home directory to store all experiment related files",
            flags = Flags.ExecReadOnly),
        Attribute("identity", "SSH identity file",
            flags = Flags.Credential),
        Attribute("serverKey", "Server public key",
            flags = Flags.ExecReadOnly),
        Attribute("cleanHome", "Remove all nepi files and directories "
            " from node home folder before starting experiment",
            flags = Flags.ExecReadOnly),
        Attribute("cleanExperiment", "Remove all files and directories "
            " from a previous same experiment, before the new experiment starts",
            flags = Flags.ExecReadOnly),
        Attribute("cleanProcesses",
            "Kill all running processes before starting experiment",
            flags = Flags.ExecReadOnly),
        Attribute("tearDown", "Bash script to be executed before " +
            "releasing the resource",
            flags = Flags.ExecReadOnly),
    ]

    # Every attribute is built first, then registered in declaration order.
    for attribute in declared:
        cls._register_attribute(attribute)
196 def __init__(self, ec, guid):
197 super(LinuxNode, self).__init__(ec, guid)
199 # home directory at Linux host
202 # lock to prevent concurrent applications on the same node,
203 # to execute commands at the same time. There are potential
204 # concurrency issues when using SSH to a same host from
205 # multiple threads. There are also possible operational
206 # issues, e.g. an application querying the existence
207 # of a file or folder prior to its creation, and another
208 # application creating the same file or folder in between.
209 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Wraps msg with the identity of this node.

    Returns the string " guid <guid> - host <hostname> - <msg> ", where
    hostname is the current value of the "hostname" attribute.
    """
    guid = self.guid
    hostname = self.get("hostname")
    return " guid %d - host %s - %s " % (guid, hostname, msg)
217 home = self.get("home") or ""
218 if not home.startswith("/"):
219 home = os.path.join(self._home_dir, home)
224 return os.path.join(self.home_dir, "nepi-usr")
228 return os.path.join(self.usr_dir, "lib")
232 return os.path.join(self.usr_dir, "bin")
236 return os.path.join(self.usr_dir, "src")
240 return os.path.join(self.usr_dir, "share")
244 return os.path.join(self.home_dir, "nepi-exp")
248 return os.path.join(self.exp_dir, self.ec.exp_id)
252 return os.path.join(self.exp_home, "node-%d" % self.guid)
256 return os.path.join(self.node_home, self.ec.run_id)
263 if (not self.get("hostname") or not self.get("username")):
264 msg = "Can't resolve OS, insufficient data "
266 raise RuntimeError, msg
270 if out.find("Fedora release 8") == 0:
271 self._os = OSType.FEDORA_8
272 elif out.find("Fedora release 12") == 0:
273 self._os = OSType.FEDORA_12
274 elif out.find("Fedora release 14") == 0:
275 self._os = OSType.FEDORA_14
276 elif out.find("Fedora release") == 0:
277 self._os = OSType.FEDORA
278 elif out.find("Debian") == 0:
279 self._os = OSType.DEBIAN
280 elif out.find("Ubuntu") ==0:
281 self._os = OSType.UBUNTU
283 msg = "Unsupported OS"
285 raise RuntimeError, "%s - %s " %( msg, out )
290 # The underlying SSH layer will sometimes return an empty
291 # output (even if the command was executed without errors).
292 # To work arround this, repeat the operation N times or
293 # until the result is not empty string
298 (out, err), proc = self.execute("cat /etc/issue",
303 if out.strip() != "":
306 trace = traceback.format_exc()
307 msg = "Error detecting OS: %s " % trace
308 self.error(msg, out, err)
311 time.sleep(min(30.0, retrydelay))
317 return self.os in [OSType.DEBIAN, OSType.UBUNTU]
321 return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
326 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
329 # check if host is alive
330 if not self.is_alive():
333 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
335 raise RuntimeError, msg
339 if self.get("cleanProcesses"):
340 self.clean_processes()
342 if self.get("cleanHome"):
345 if self.get("cleanExperiment"):
346 self.clean_experiment()
348 # Create shared directory structure
349 self.mkdir(self.lib_dir)
350 self.mkdir(self.bin_dir)
351 self.mkdir(self.src_dir)
352 self.mkdir(self.share_dir)
354 # Create experiment node home directory
355 self.mkdir(self.node_home)
357 super(LinuxNode, self).provision()
360 if self.state == ResourceState.NEW:
368 # Node needs to wait until all associated interfaces are
369 # ready before it can finalize deployment
370 from nepi.resources.linux.interface import LinuxInterface
371 ifaces = self.get_connected(LinuxInterface.rtype())
373 if iface.state < ResourceState.READY:
374 self.ec.schedule(reschedule_delay, self.deploy)
377 super(LinuxNode, self).deploy()
380 # Node needs to wait until all associated RMs are released
382 rms = self.get_connected()
384 if rm.state < ResourceState.STOPPED:
385 self.ec.schedule(reschedule_delay, self.release)
388 tear_down = self.get("tearDown")
390 self.execute(tear_down)
392 self.clean_processes()
394 super(LinuxNode, self).release()
396 def valid_connection(self, guid):
400 def clean_processes(self, killer = False):
401 self.info("Cleaning up processes")
405 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
406 "sudo -S killall python tcpdump || /bin/true ; " +
407 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
408 "sudo -S killall -u root || /bin/true ; " +
409 "sudo -S killall -u root || /bin/true ; ")
412 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
413 "sudo -S killall tcpdump || /bin/true ; " +
414 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
415 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
418 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
420 def clean_home(self):
421 """ Cleans all NEPI related folders in the Linux host
423 self.info("Cleaning up home")
425 cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
428 return self.execute(cmd, with_lock = True)
430 def clean_experiment(self):
431 """ Cleans all experiment related files in the Linux host.
432 It preserves NEPI files and folders that have a multi experiment
435 self.info("Cleaning up experiment files")
437 cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
441 return self.execute(cmd, with_lock = True)
443 def execute(self, command,
451 err_on_timeout = True,
452 connect_timeout = 30,
453 strict_host_checking = False,
458 """ Notice that this invocation will block until the
459 execution finishes. If this is not the desired behavior,
460 use 'run' instead."""
463 (out, err), proc = execfuncs.lexec(command,
470 with self._node_lock:
471 (out, err), proc = sshfuncs.rexec(
473 host = self.get("hostname"),
474 user = self.get("username"),
475 port = self.get("port"),
479 identity = self.get("identity"),
480 server_key = self.get("serverKey"),
483 forward_x11 = forward_x11,
486 err_on_timeout = err_on_timeout,
487 connect_timeout = connect_timeout,
488 persistent = persistent,
490 strict_host_checking = strict_host_checking
493 (out, err), proc = sshfuncs.rexec(
495 host = self.get("hostname"),
496 user = self.get("username"),
497 port = self.get("port"),
501 identity = self.get("identity"),
502 server_key = self.get("serverKey"),
505 forward_x11 = forward_x11,
508 err_on_timeout = err_on_timeout,
509 connect_timeout = connect_timeout,
510 persistent = persistent,
512 strict_host_checking = strict_host_checking
515 return (out, err), proc
517 def run(self, command, home,
526 self.debug("Running command '%s'" % command)
529 (out, err), proc = execfuncs.lspawn(command, pidfile,
534 create_home = create_home,
538 with self._node_lock:
539 (out, err), proc = sshfuncs.rspawn(
543 create_home = create_home,
544 stdin = stdin if stdin is not None else '/dev/null',
545 stdout = stdout if stdout else '/dev/null',
546 stderr = stderr if stderr else '/dev/null',
548 host = self.get("hostname"),
549 user = self.get("username"),
550 port = self.get("port"),
552 identity = self.get("identity"),
553 server_key = self.get("serverKey"),
557 return (out, err), proc
559 def getpid(self, home, pidfile = "pidfile"):
561 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
563 with self._node_lock:
564 pidtuple = sshfuncs.rgetpid(
565 os.path.join(home, pidfile),
566 host = self.get("hostname"),
567 user = self.get("username"),
568 port = self.get("port"),
570 identity = self.get("identity"),
571 server_key = self.get("serverKey")
576 def status(self, pid, ppid):
578 status = execfuncs.lstatus(pid, ppid)
580 with self._node_lock:
581 status = sshfuncs.rstatus(
583 host = self.get("hostname"),
584 user = self.get("username"),
585 port = self.get("port"),
587 identity = self.get("identity"),
588 server_key = self.get("serverKey")
593 def kill(self, pid, ppid, sudo = False):
596 status = self.status(pid, ppid)
598 if status == sshfuncs.ProcStatus.RUNNING:
600 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
602 with self._node_lock:
603 (out, err), proc = sshfuncs.rkill(
605 host = self.get("hostname"),
606 user = self.get("username"),
607 port = self.get("port"),
610 identity = self.get("identity"),
611 server_key = self.get("serverKey")
614 return (out, err), proc
616 def copy(self, src, dst):
618 (out, err), proc = execfuncs.lcopy(source, dest,
620 strict_host_checking = False)
622 with self._node_lock:
623 (out, err), proc = sshfuncs.rcopy(
625 port = self.get("port"),
626 identity = self.get("identity"),
627 server_key = self.get("serverKey"),
629 strict_host_checking = False)
631 return (out, err), proc
634 def upload(self, src, dst, text = False, overwrite = True):
635 """ Copy content to destination
637 src content to copy. Can be a local file, directory or a list of files
639 dst destination path on the remote host (remote is always self.host)
641 text src is text input, it must be stored into a temp file before uploading
643 # If source is a string input
645 if text and not os.path.isfile(src):
646 # src is text input that should be uploaded as file
647 # create a temporal file with the content to upload
648 f = tempfile.NamedTemporaryFile(delete=False)
653 # If dst files should not be overwritten, check that the files do not
655 if overwrite == False:
656 src = self.filter_existing_files(src, dst)
658 return ("", ""), None
660 if not self.localhost:
661 # Build destination as <user>@<server>:<path>
662 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
664 result = self.copy(src, dst)
def download(self, src, dst):
    """ Copies src from this node to the local path dst.

    When the node is not localhost, src is qualified as
    <username>@<hostname>:<src> before being handed to copy(); on
    localhost it is passed through untouched. Returns the
    ((out, err), proc) result of copy().
    """
    if self.localhost:
        origin = src
    else:
        # Build the remote *source* (not the destination) as
        # <user>@<server>:<path>
        origin = "%s@%s:%s" % (self.get("username"),
                self.get("hostname"), src)
    return self.copy(origin, dst)
678 def install_packages_command(self, packages):
681 command = rpmfuncs.install_packages_command(self.os, packages)
683 command = debfuncs.install_packages_command(self.os, packages)
685 msg = "Error installing packages ( OS not known ) "
686 self.error(msg, self.os)
687 raise RuntimeError, msg
691 def install_packages(self, packages, home, run_home = None):
692 """ Install packages in the Linux host.
694 'home' is the directory to upload the package installation script.
695 'run_home' is the directory from where to execute the script.
697 command = self.install_packages_command(packages)
699 run_home = run_home or home
701 (out, err), proc = self.run_and_wait(command, run_home,
702 shfile = os.path.join(home, "instpkg.sh"),
703 pidfile = "instpkg_pidfile",
704 ecodefile = "instpkg_exitcode",
705 stdout = "instpkg_stdout",
706 stderr = "instpkg_stderr",
708 raise_on_error = True)
710 return (out, err), proc
712 def remove_packages(self, packages, home, run_home = None):
713 """ Uninstall packages from the Linux host.
715 'home' is the directory to upload the package un-installation script.
716 'run_home' is the directory from where to execute the script.
719 command = rpmfuncs.remove_packages_command(self.os, packages)
721 command = debfuncs.remove_packages_command(self.os, packages)
723 msg = "Error removing packages ( OS not known ) "
725 raise RuntimeError, msg
727 run_home = run_home or home
729 (out, err), proc = self.run_and_wait(command, run_home,
730 shfile = os.path.join(home, "rmpkg.sh"),
731 pidfile = "rmpkg_pidfile",
732 ecodefile = "rmpkg_exitcode",
733 stdout = "rmpkg_stdout",
734 stderr = "rmpkg_stderr",
736 raise_on_error = True)
738 return (out, err), proc
740 def mkdir(self, path, clean = False):
744 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """ Removes path on the node with "rm -rf" (recursive, forced).

    path is interpolated into the shell command as is (unquoted). The
    command goes through execute() with with_lock = True, and the
    ((out, err), proc) result of execute() is returned unchanged.
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
749 def run_and_wait(self, command, home,
754 ecodefile = "exitcode",
760 raise_on_error = False):
762 Uploads the 'command' to a bash script in the host.
763 Then runs the script detached in background in the host, and
764 busy-waites until the script finishes executing.
767 if not shfile.startswith("/"):
768 shfile = os.path.join(home, shfile)
770 self.upload_command(command,
772 ecodefile = ecodefile,
774 overwrite = overwrite)
776 command = "bash %s" % shfile
777 # run command in background in remote host
778 (out, err), proc = self.run(command, home,
786 # check no errors occurred
788 msg = " Failed to run command '%s' " % command
789 self.error(msg, out, err)
791 raise RuntimeError, msg
793 # Wait for pid file to be generated
794 pid, ppid = self.wait_pid(
797 raise_on_error = raise_on_error)
799 # wait until command finishes to execute
800 self.wait_run(pid, ppid)
802 (eout, err), proc = self.check_errors(home,
803 ecodefile = ecodefile,
806 # Out is what was written in the stderr file
808 msg = " Failed to run command '%s' " % command
809 self.error(msg, eout, err)
812 raise RuntimeError, msg
814 (out, oerr), proc = self.check_output(home, stdout)
816 return (out, err), proc
818 def exitcode(self, home, ecodefile = "exitcode"):
820 Get the exit code of an application.
821 Returns an integer value with the exit code
823 (out, err), proc = self.check_output(home, ecodefile)
825 # Succeeded to open file, return exit code in the file
828 return int(out.strip())
830 # Error in the content of the file!
831 return ExitCode.CORRUPTFILE
833 # No such file or directory
834 if proc.returncode == 1:
835 return ExitCode.FILENOTFOUND
837 # Other error from 'cat'
838 return ExitCode.ERROR
840 def upload_command(self, command,
842 ecodefile = "exitcode",
845 """ Saves the command as a bash script file in the remote host, and
846 forces to save the exit code of the command execution to the ecodefile
849 if not (command.strip().endswith(";") or command.strip().endswith("&")):
852 # The exit code of the command will be stored in ecodefile
853 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
855 'ecodefile': ecodefile,
859 environ = self.format_environment(env)
861 # Add environ to command
862 command = environ + command
864 return self.upload(command, shfile, text = True, overwrite = overwrite)
866 def format_environment(self, env, inline = False):
867 """ Formats the environment variables for a command to be executed
868 either as an inline command
869 (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
870 as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
872 if not env: return ""
874 # Remove extra white spaces
875 env = re.sub(r'\s+', ' ', env.strip())
877 sep = ";" if inline else "\n"
878 return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep
880 def check_errors(self, home,
881 ecodefile = "exitcode",
883 """ Checks whether errors occurred while running a command.
884 It first checks the exit code for the command, and only if the
885 exit code is an error one it returns the error output.
891 # get exit code saved in the 'exitcode' file
892 ecode = self.exitcode(home, ecodefile)
894 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
895 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
896 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
897 # The process returned an error code or didn't exist.
898 # Check standard error.
899 (err, eerr), proc = self.check_output(home, stderr)
901 # If the stderr file was not found, assume nothing bad happened,
902 # and just ignore the error.
903 # (cat returns 1 for error "No such file or directory")
904 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
907 return ("", err), proc
909 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
910 """ Waits until the pid file for the command is generated,
911 and returns the pid and ppid of the process """
916 pidtuple = self.getpid(home = home, pidfile = pidfile)
925 msg = " Failed to get pid for pidfile %s/%s " % (
930 raise RuntimeError, msg
934 def wait_run(self, pid, ppid, trial = 0):
935 """ wait for a remote process to finish execution """
939 status = self.status(pid, ppid)
941 if status is ProcStatus.FINISHED:
943 elif status is not ProcStatus.RUNNING:
946 # If it takes more than 20 seconds to start, then
947 # asume something went wrong
951 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of the file home/filename on the node.

    The file is read with "cat" through execute() (retry = 1,
    with_lock = True). Returns ((out, err), proc), where out holds what
    cat printed (the file content) and proc is the process object
    returned by execute().
    """
    target = os.path.join(home, filename)
    streams, proc = self.execute("cat %s" % target,
            retry = 1, with_lock = True)
    out, err = streams
    return (out, err), proc
961 """ Checks if host is responsive
967 # The underlying SSH layer will sometimes return an empty
968 # output (even if the command was executed without errors).
969 # To work arround this, repeat the operation N times or
970 # until the result is not empty string
974 (out, err), proc = self.execute("echo 'ALIVE'",
979 if out.find("ALIVE") > -1:
982 trace = traceback.format_exc()
983 msg = "Unresponsive host. Error reaching host: %s " % trace
984 self.error(msg, out, err)
987 time.sleep(min(30.0, retrydelay))
990 if out.find("ALIVE") > -1:
993 msg = "Unresponsive host. Wrong answer. "
994 self.error(msg, out, err)
998 """ Retrieves host home directory
1000 # The underlying SSH layer will sometimes return an empty
1001 # output (even if the command was executed without errors).
1002 # To work arround this, repeat the operation N times or
1003 # until the result is not empty string
1007 (out, err), proc = self.execute("echo ${HOME}",
1012 if out.strip() != "":
1013 self._home_dir = out.strip()
1016 trace = traceback.format_exc()
1017 msg = "Impossible to retrieve HOME directory" % trace
1018 self.error(msg, out, err)
1021 time.sleep(min(30.0, retrydelay))
1024 if not self._home_dir:
1025 msg = "Impossible to retrieve HOME directory"
1026 self.error(msg, out, err)
1027 raise RuntimeError, msg
1029 def filter_existing_files(self, src, dst):
1030 """ Removes files that already exist in the Linux host from src list
1032 # construct a dictionary with { dst: src }
1033 dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ),
1034 src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1037 for d in dests.keys():
1038 command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1040 command = ";".join(command)
1042 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1044 for d in dests.keys():
1045 if out.find(d) > -1:
1051 return " ".join(dests.values())