2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
41 Error codes that the rexitcode function can return if unable to
42 check the exit code of a spawned process
51 Supported flavors of Linux OS
61 class LinuxNode(ResourceManager):
63 .. class:: Class Args :
65 :param ec: The Experiment controller
66 :type ec: ExperimentController
67 :param guid: guid of the RM
72 There are different ways in which commands can be executed using the
73 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
78 * 'execute' (blocking mode) :
80 HOW IT WORKS: 'execute' forks a process and runs the
81 command, synchronously, attached to the terminal, in
83 The execute method will block until the command returns
84 the result on 'out', 'err' (so until it finishes executing).
86 USAGE: short-lived commands that must be executed attached
87 to a terminal and in foreground, for which it IS necessary
88 to block until the command has finished (e.g. if you want
89 to run 'ls' or 'cat').
91 * 'execute' (NON blocking mode - blocking = False) :
93 HOW IT WORKS: Same as before, except that execute method
94 will return immediately (even if the command is still running).
96 USAGE: long-lived commands that must be executed attached
97 to a terminal and in foreground, but for which it is not
98 necessary to block until the command has finished. (e.g.
99 start an application using X11 forwarding)
103 HOW IT WORKS: Connects to the host (using SSH if remote)
104 and launches the command in background, detached from any
105 terminal (daemonized), and returns. The command continues to
106 run remotely, but since it is detached from the terminal,
107 its pipes (stdin, stdout, stderr) can't be redirected to the
108 console (as normal non detached processes would), and so they
109 are explicitly redirected to files. The pidfile is created as
110 part of the process of launching the command. The pidfile
111 holds the pid and ppid of the process forked in background,
112 so later on it is possible to check whether the command is still
115 USAGE: long-lived commands that can run detached in background,
116 for which it is NOT necessary to block (wait) until the command
117 has finished. (e.g. start an application that is not using X11
118 forwarding. It can run detached and remotely in background)
122 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123 the command has finished execution. It also checks whether
124 errors occurred during runtime by reading the exitcode file,
125 which contains the exit code of the command that was run
126 (checking stderr only is not always reliable since many
127 commands throw debugging info to stderr and the only way to
128 automatically know whether an error really happened is to
129 check the process exit code).
131 Another difference with respect to 'run', is that instead
132 of directly executing the command as a bash command line,
133 it uploads the command to a bash script and runs the script.
134 This allows the bash script to be used to debug errors, since
135 it remains at the remote host and can be run manually to
138 USAGE: medium-lived commands that can run detached in
139 background, for which it IS necessary to block (wait) until
140 the command has finished. (e.g. Package installation,
141 source compilation, file download, etc)
145 _help = "Controls Linux host machines ( either localhost or a host " \
146 "that can be accessed using a SSH key)"
147 _backend_type = "linux"
def _register_attributes(cls):
    """Declare and register the configurable attributes of a Linux node.

    Each attribute is created with either ``Flags.ExecReadOnly`` or
    ``Flags.Credential`` and registered on ``cls`` (via
    ``cls._register_attribute``) in a fixed order.
    """
    hostname = Attribute("hostname", "Hostname of the machine",
            flags = Flags.ExecReadOnly)

    username = Attribute("username", "Local account username",
            flags = Flags.Credential)

    port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)

    home = Attribute("home",
            "Experiment home directory to store all experiment related files",
            flags = Flags.ExecReadOnly)

    identity = Attribute("identity", "SSH identity file",
            flags = Flags.Credential)

    server_key = Attribute("serverKey", "Server public key",
            flags = Flags.ExecReadOnly)

    # Adjacent string literals are concatenated by the parser; only the
    # first piece carries the separating space so the help text does not
    # end up with a doubled blank between "directories" and "from".
    clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
            "from node home folder before starting experiment",
            flags = Flags.ExecReadOnly)

    clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
            "from a previous same experiment, before the new experiment starts",
            flags = Flags.ExecReadOnly)

    clean_processes = Attribute("cleanProcesses",
            "Kill all running processes before starting experiment",
            flags = Flags.ExecReadOnly)

    tear_down = Attribute("tearDown", "Bash script to be executed before "
            "releasing the resource",
            flags = Flags.ExecReadOnly)

    # Registration order is preserved from the explicit sequence below.
    for attr in (hostname, username, port, home, identity, server_key,
            clean_home, clean_experiment, clean_processes, tear_down):
        cls._register_attribute(attr)
196 def __init__(self, ec, guid):
197 super(LinuxNode, self).__init__(ec, guid)
199 # home directory at Linux host
202 # lock to prevent concurrent applications on the same node,
203 # to execute commands at the same time. There are potential
204 # concurrency issues when using SSH to a same host from
205 # multiple threads. There are also possible operational
206 # issues, e.g. an application querying the existence
207 # of a file or folder prior to its creation, and another
208 # application creating the same file or folder in between.
209 self._node_lock = threading.Lock()
def log_message(self, msg):
    """Prefix ``msg`` with this node's guid and hostname for logging.

    Returns a string shaped like ``" guid <guid> - host <hostname> - <msg> "``;
    note the leading and trailing spaces.
    """
    hostname = self.get("hostname")
    template = " guid %d - host %s - %s "
    return template % (self.guid, hostname, msg)
217 home = self.get("home") or ""
218 if not home.startswith("/"):
219 home = os.path.join(self._home_dir, home)
224 return os.path.join(self.home_dir, "nepi-usr")
228 return os.path.join(self.usr_dir, "lib")
232 return os.path.join(self.usr_dir, "bin")
236 return os.path.join(self.usr_dir, "src")
240 return os.path.join(self.usr_dir, "share")
244 return os.path.join(self.home_dir, "nepi-exp")
248 return os.path.join(self.exp_dir, self.ec.exp_id)
252 return os.path.join(self.exp_home, "node-%d" % self.guid)
256 return os.path.join(self.node_home, self.ec.run_id)
263 if (not self.get("hostname") or not self.get("username")):
264 msg = "Can't resolve OS, insufficient data "
266 raise RuntimeError, msg
270 if out.find("Fedora release 8") == 0:
271 self._os = OSType.FEDORA_8
272 elif out.find("Fedora release 12") == 0:
273 self._os = OSType.FEDORA_12
274 elif out.find("Fedora release 14") == 0:
275 self._os = OSType.FEDORA_14
276 elif out.find("Debian") == 0:
277 self._os = OSType.DEBIAN
278 elif out.find("Ubuntu") ==0:
279 self._os = OSType.UBUNTU
281 msg = "Unsupported OS"
283 raise RuntimeError, "%s - %s " %( msg, out )
288 # The underlying SSH layer will sometimes return an empty
289 # output (even if the command was executed without errors).
290 # To work around this, repeat the operation N times or
291 # until the result is not empty string
296 (out, err), proc = self.execute("cat /etc/issue",
301 if out.strip() != "":
304 trace = traceback.format_exc()
305 msg = "Error detecting OS: %s " % trace
306 self.error(msg, out, err)
309 time.sleep(min(30.0, retrydelay))
315 return self.os in [OSType.DEBIAN, OSType.UBUNTU]
319 return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
324 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
327 # check if host is alive
328 if not self.is_alive():
331 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
333 raise RuntimeError, msg
337 if self.get("cleanProcesses"):
338 self.clean_processes()
340 if self.get("cleanHome"):
343 if self.get("cleanExperiment"):
344 self.clean_experiment()
346 # Create shared directory structure
347 self.mkdir(self.lib_dir)
348 self.mkdir(self.bin_dir)
349 self.mkdir(self.src_dir)
350 self.mkdir(self.share_dir)
352 # Create experiment node home directory
353 self.mkdir(self.node_home)
355 super(LinuxNode, self).provision()
358 if self.state == ResourceState.NEW:
366 # Node needs to wait until all associated interfaces are
367 # ready before it can finalize deployment
368 from nepi.resources.linux.interface import LinuxInterface
369 ifaces = self.get_connected(LinuxInterface.rtype())
371 if iface.state < ResourceState.READY:
372 self.ec.schedule(reschedule_delay, self.deploy)
375 super(LinuxNode, self).deploy()
378 # Node needs to wait until all associated RMs are released
380 rms = self.get_connected()
382 if rm.state < ResourceState.STOPPED:
383 self.ec.schedule(reschedule_delay, self.release)
386 tear_down = self.get("tearDown")
388 self.execute(tear_down)
390 self.clean_processes()
392 super(LinuxNode, self).release()
394 def valid_connection(self, guid):
398 def clean_processes(self, killer = False):
399 self.info("Cleaning up processes")
403 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
404 "sudo -S killall python tcpdump || /bin/true ; " +
405 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
406 "sudo -S killall -u root || /bin/true ; " +
407 "sudo -S killall -u root || /bin/true ; ")
410 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
411 "sudo -S killall tcpdump || /bin/true ; " +
412 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
413 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
416 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
418 def clean_home(self):
419 """ Cleans all NEPI related folders in the Linux host
421 self.info("Cleaning up home")
423 cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
426 return self.execute(cmd, with_lock = True)
428 def clean_experiment(self):
429 """ Cleans all experiment related files in the Linux host.
430 It preserves NEPI files and folders that have a multi experiment
433 self.info("Cleaning up experiment files")
435 cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
439 return self.execute(cmd, with_lock = True)
441 def execute(self, command,
449 err_on_timeout = True,
450 connect_timeout = 30,
451 strict_host_checking = False,
456 """ Notice that this invocation will block until the
457 execution finishes. If this is not the desired behavior,
458 use 'run' instead."""
461 (out, err), proc = execfuncs.lexec(command,
468 with self._node_lock:
469 (out, err), proc = sshfuncs.rexec(
471 host = self.get("hostname"),
472 user = self.get("username"),
473 port = self.get("port"),
477 identity = self.get("identity"),
478 server_key = self.get("serverKey"),
481 forward_x11 = forward_x11,
484 err_on_timeout = err_on_timeout,
485 connect_timeout = connect_timeout,
486 persistent = persistent,
488 strict_host_checking = strict_host_checking
491 (out, err), proc = sshfuncs.rexec(
493 host = self.get("hostname"),
494 user = self.get("username"),
495 port = self.get("port"),
499 identity = self.get("identity"),
500 server_key = self.get("serverKey"),
503 forward_x11 = forward_x11,
506 err_on_timeout = err_on_timeout,
507 connect_timeout = connect_timeout,
508 persistent = persistent,
510 strict_host_checking = strict_host_checking
513 return (out, err), proc
515 def run(self, command, home,
524 self.debug("Running command '%s'" % command)
527 (out, err), proc = execfuncs.lspawn(command, pidfile,
532 create_home = create_home,
536 with self._node_lock:
537 (out, err), proc = sshfuncs.rspawn(
541 create_home = create_home,
542 stdin = stdin if stdin is not None else '/dev/null',
543 stdout = stdout if stdout else '/dev/null',
544 stderr = stderr if stderr else '/dev/null',
546 host = self.get("hostname"),
547 user = self.get("username"),
548 port = self.get("port"),
550 identity = self.get("identity"),
551 server_key = self.get("serverKey"),
555 return (out, err), proc
557 def getpid(self, home, pidfile = "pidfile"):
559 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
561 with self._node_lock:
562 pidtuple = sshfuncs.rgetpid(
563 os.path.join(home, pidfile),
564 host = self.get("hostname"),
565 user = self.get("username"),
566 port = self.get("port"),
568 identity = self.get("identity"),
569 server_key = self.get("serverKey")
574 def status(self, pid, ppid):
576 status = execfuncs.lstatus(pid, ppid)
578 with self._node_lock:
579 status = sshfuncs.rstatus(
581 host = self.get("hostname"),
582 user = self.get("username"),
583 port = self.get("port"),
585 identity = self.get("identity"),
586 server_key = self.get("serverKey")
591 def kill(self, pid, ppid, sudo = False):
594 status = self.status(pid, ppid)
596 if status == sshfuncs.ProcStatus.RUNNING:
598 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
600 with self._node_lock:
601 (out, err), proc = sshfuncs.rkill(
603 host = self.get("hostname"),
604 user = self.get("username"),
605 port = self.get("port"),
608 identity = self.get("identity"),
609 server_key = self.get("serverKey")
612 return (out, err), proc
614 def copy(self, src, dst):
616 (out, err), proc = execfuncs.lcopy(source, dest,
618 strict_host_checking = False)
620 with self._node_lock:
621 (out, err), proc = sshfuncs.rcopy(
623 port = self.get("port"),
624 identity = self.get("identity"),
625 server_key = self.get("serverKey"),
627 strict_host_checking = False)
629 return (out, err), proc
632 def upload(self, src, dst, text = False, overwrite = True):
633 """ Copy content to destination
635 src content to copy. Can be a local file, directory or a list of files
637 dst destination path on the remote host (remote is always self.host)
639 text src is text input, it must be stored into a temp file before uploading
641 # If source is a string input
643 if text and not os.path.isfile(src):
644 # src is text input that should be uploaded as file
645 # create a temporary file with the content to upload
646 f = tempfile.NamedTemporaryFile(delete=False)
651 # If dst files should not be overwritten, check that the files do not
653 if overwrite == False:
654 src = self.filter_existing_files(src, dst)
656 return ("", ""), None
658 if not self.localhost:
659 # Build destination as <user>@<server>:<path>
660 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
662 result = self.copy(src, dst)
def download(self, src, dst):
    """Copy ``src`` from this node to the local path ``dst``.

    When the node is not localhost, the *source* path is qualified as
    ``<user>@<hostname>:<src>`` before delegating to ``self.copy``; for
    localhost it is passed through untouched. Returns whatever
    ``self.copy`` returns.
    """
    if self.localhost:
        remote_src = src
    else:
        # Build source as <user>@<server>:<path>
        remote_src = "%s@%s:%s" % (self.get("username"),
                self.get("hostname"), src)
    return self.copy(remote_src, dst)
676 def install_packages_command(self, packages):
679 command = rpmfuncs.install_packages_command(self.os, packages)
681 command = debfuncs.install_packages_command(self.os, packages)
683 msg = "Error installing packages ( OS not known ) "
684 self.error(msg, self.os)
685 raise RuntimeError, msg
689 def install_packages(self, packages, home, run_home = None):
690 """ Install packages in the Linux host.
692 'home' is the directory to upload the package installation script.
693 'run_home' is the directory from where to execute the script.
695 command = self.install_packages_command(packages)
697 run_home = run_home or home
699 (out, err), proc = self.run_and_wait(command, run_home,
700 shfile = os.path.join(home, "instpkg.sh"),
701 pidfile = "instpkg_pidfile",
702 ecodefile = "instpkg_exitcode",
703 stdout = "instpkg_stdout",
704 stderr = "instpkg_stderr",
706 raise_on_error = True)
708 return (out, err), proc
710 def remove_packages(self, packages, home, run_home = None):
711 """ Uninstall packages from the Linux host.
713 'home' is the directory to upload the package un-installation script.
714 'run_home' is the directory from where to execute the script.
717 command = rpmfuncs.remove_packages_command(self.os, packages)
719 command = debfuncs.remove_packages_command(self.os, packages)
721 msg = "Error removing packages ( OS not known ) "
723 raise RuntimeError, msg
725 run_home = run_home or home
727 (out, err), proc = self.run_and_wait(command, run_home,
728 shfile = os.path.join(home, "rmpkg.sh"),
729 pidfile = "rmpkg_pidfile",
730 ecodefile = "rmpkg_exitcode",
731 stdout = "rmpkg_stdout",
732 stderr = "rmpkg_stderr",
734 raise_on_error = True)
736 return (out, err), proc
738 def mkdir(self, path, clean = False):
742 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """Recursively and forcibly remove ``path`` on the node.

    Issues ``rm -rf <path>`` through ``self.execute`` with the node lock
    requested (``with_lock = True``) and returns its ``((out, err), proc)``
    result. ``path`` is interpolated into the command line unquoted.
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
747 def run_and_wait(self, command, home,
752 ecodefile = "exitcode",
758 raise_on_error = False):
760 Uploads the 'command' to a bash script in the host.
761 Then runs the script detached in background in the host, and
762 busy-waits until the script finishes executing.
765 if not shfile.startswith("/"):
766 shfile = os.path.join(home, shfile)
768 self.upload_command(command,
770 ecodefile = ecodefile,
772 overwrite = overwrite)
774 command = "bash %s" % shfile
775 # run command in background in remote host
776 (out, err), proc = self.run(command, home,
784 # check no errors occurred
786 msg = " Failed to run command '%s' " % command
787 self.error(msg, out, err)
789 raise RuntimeError, msg
791 # Wait for pid file to be generated
792 pid, ppid = self.wait_pid(
795 raise_on_error = raise_on_error)
797 # wait until command finishes to execute
798 self.wait_run(pid, ppid)
800 (eout, err), proc = self.check_errors(home,
801 ecodefile = ecodefile,
804 # Out is what was written in the stderr file
806 msg = " Failed to run command '%s' " % command
807 self.error(msg, eout, err)
810 raise RuntimeError, msg
812 (out, oerr), proc = self.check_output(home, stdout)
814 return (out, err), proc
816 def exitcode(self, home, ecodefile = "exitcode"):
818 Get the exit code of an application.
819 Returns an integer value with the exit code
821 (out, err), proc = self.check_output(home, ecodefile)
823 # Succeeded to open file, return exit code in the file
826 return int(out.strip())
828 # Error in the content of the file!
829 return ExitCode.CORRUPTFILE
831 # No such file or directory
832 if proc.returncode == 1:
833 return ExitCode.FILENOTFOUND
835 # Other error from 'cat'
836 return ExitCode.ERROR
838 def upload_command(self, command,
840 ecodefile = "exitcode",
843 """ Saves the command as a bash script file in the remote host, and
844 forces the exit code of the command execution to be saved to the ecodefile
847 if not (command.strip().endswith(";") or command.strip().endswith("&")):
850 # The exit code of the command will be stored in ecodefile
851 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
853 'ecodefile': ecodefile,
857 environ = self.format_environment(env)
859 # Add environ to command
860 command = environ + command
862 return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """Render ``env`` as a series of ``export`` statements.

    ``env`` is a whitespace-separated string of assignments (presumably
    NAME=VALUE pairs). Runs of whitespace are collapsed and every token
    becomes `` export <token>``, so a value containing whitespace is split
    into separate tokens. Each statement is terminated with ";" when
    ``inline`` is true (to prefix a one-line command, e.g.
    ``export PYTHONPATH=src/..; export LALA=..;python script.py``) or with
    a newline otherwise (for a bash script). An empty/None ``env`` yields "".
    """
    if not env:
        return ""

    separator = ";" if inline else "\n"
    tokens = re.sub(r'\s+', ' ', env.strip()).split(" ")
    return "".join(" export %s%s" % (token, separator) for token in tokens)
878 def check_errors(self, home,
879 ecodefile = "exitcode",
881 """ Checks whether errors occurred while running a command.
882 It first checks the exit code for the command, and only if the
883 exit code is an error one it returns the error output.
889 # get exit code saved in the 'exitcode' file
890 ecode = self.exitcode(home, ecodefile)
892 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
893 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
894 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
895 # The process returned an error code or didn't exist.
896 # Check standard error.
897 (err, eerr), proc = self.check_output(home, stderr)
899 # If the stderr file was not found, assume nothing bad happened,
900 # and just ignore the error.
901 # (cat returns 1 for error "No such file or directory")
902 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
905 return ("", err), proc
907 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
908 """ Waits until the pid file for the command is generated,
909 and returns the pid and ppid of the process """
914 pidtuple = self.getpid(home = home, pidfile = pidfile)
923 msg = " Failed to get pid for pidfile %s/%s " % (
928 raise RuntimeError, msg
932 def wait_run(self, pid, ppid, trial = 0):
933 """ wait for a remote process to finish execution """
937 status = self.status(pid, ppid)
939 if status is ProcStatus.FINISHED:
941 elif status is not ProcStatus.RUNNING:
944 # If it takes more than 20 seconds to start, then
946 assume something went wrong
949 # The app is running, just wait...
def check_output(self, home, filename):
    """Retrieve the contents of ``filename`` located under ``home``.

    Runs ``cat`` on the joined path through ``self.execute`` (with
    ``retry = 1`` and ``with_lock = True``) and returns the resulting
    ``((out, err), proc)`` tuple.
    """
    target = os.path.join(home, filename)
    (stdout, stderr), process = self.execute("cat %s" % target,
            retry = 1, with_lock = True)
    return (stdout, stderr), process
959 """ Checks if host is responsive
965 # The underlying SSH layer will sometimes return an empty
966 # output (even if the command was executed without errors).
967 # To work around this, repeat the operation N times or
968 # until the result is not empty string
972 (out, err), proc = self.execute("echo 'ALIVE'",
977 if out.find("ALIVE") > -1:
980 trace = traceback.format_exc()
981 msg = "Unresponsive host. Error reaching host: %s " % trace
982 self.error(msg, out, err)
985 time.sleep(min(30.0, retrydelay))
988 if out.find("ALIVE") > -1:
991 msg = "Unresponsive host. Wrong answer. "
992 self.error(msg, out, err)
996 """ Retrieves host home directory
998 # The underlying SSH layer will sometimes return an empty
999 # output (even if the command was executed without errors).
1000 # To work around this, repeat the operation N times or
1001 # until the result is not empty string
1003 for i in xrange(10):
1005 (out, err), proc = self.execute("echo ${HOME}",
1010 if out.strip() != "":
1011 self._home_dir = out.strip()
1014 trace = traceback.format_exc()
1015 msg = "Impossible to retrieve HOME directory" % trace
1016 self.error(msg, out, err)
1019 time.sleep(min(30.0, retrydelay))
1022 if not self._home_dir:
1023 msg = "Impossible to retrieve HOME directory"
1024 self.error(msg, out, err)
1025 raise RuntimeError, msg
1027 def filter_existing_files(self, src, dst):
1028 """ Removes files that already exist in the Linux host from src list
1030 # construct a dictionary with { dst: src }
1031 dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ),
1032 src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1035 for d in dests.keys():
1036 command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1038 command = ";".join(command)
1040 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1042 for d in dests.keys():
1043 if out.find(d) > -1:
1049 return " ".join(dests.values())