2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
41 Error codes that the rexitcode function can return if unable to
42 check the exit code of a spawned process
51 Supported flavors of Linux OS
61 class LinuxNode(ResourceManager):
63 .. class:: Class Args :
65 :param ec: The Experiment controller
66 :type ec: ExperimentController
67 :param guid: guid of the RM
72 There are different ways in which commands can be executed using the
73 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
78 * 'execute' (blocking mode) :
80 HOW IT WORKS: 'execute' forks a process and runs the
81 command, synchronously, attached to the terminal, in
83 The execute method will block until the command returns
84 the result on 'out', 'err' (so until it finishes executing).
86 USAGE: short-lived commands that must be executed attached
87 to a terminal and in foreground, for which it IS necessary
88 to block until the command has finished (e.g. if you want
89 to run 'ls' or 'cat').
91 * 'execute' (NON blocking mode - blocking = False) :
93 HOW IT WORKS: Same as before, except that execute method
94 will return immediately (even if command still running).
96 USAGE: long-lived commands that must be executed attached
97 to a terminal and in foreground, but for which it is not
98 necessary to block until the command has finished. (e.g.
99 start an application using X11 forwarding)
103 HOW IT WORKS: Connects to the host ( using SSH if remote)
104 and launches the command in background, detached from any
105 terminal (daemonized), and returns. The command continues to
106 run remotely, but since it is detached from the terminal,
107 its pipes (stdin, stdout, stderr) can't be redirected to the
108 console (as normal non detached processes would), and so they
109 are explicitly redirected to files. The pidfile is created as
110 part of the process of launching the command. The pidfile
111 holds the pid and ppid of the process forked in background,
112 so later on it is possible to check whether the command is still
115 USAGE: long-lived commands that can run detached in background,
116 for which it is NOT necessary to block (wait) until the command
117 has finished. (e.g. start an application that is not using X11
118 forwarding. It can run detached and remotely in background)
122 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123 the command has finished execution. It also checks whether
124 errors occurred during runtime by reading the exitcode file,
125 which contains the exit code of the command that was run
126 (checking stderr only is not always reliable since many
127 commands throw debugging info to stderr and the only way to
128 automatically know whether an error really happened is to
129 check the process exit code).
131 Another difference with respect to 'run', is that instead
132 of directly executing the command as a bash command line,
133 it uploads the command to a bash script and runs the script.
134 This allows to use the bash script to debug errors, since
135 it remains at the remote host and can be run manually to
138 USAGE: medium-lived commands that can run detached in
139 background, for which it IS necessary to block (wait) until
140 the command has finished. (e.g. Package installation,
141 source compilation, file download, etc)
def _register_attributes(cls):
    """ Declares the attributes supported by this resource and registers
    them on the class, one call to cls._register_attribute per attribute,
    in the order in which they are listed below.
    """
    read_only = Flags.ExecReadOnly
    credential = Flags.Credential

    # One (name, help text, flags) entry per attribute, in registration order
    specs = (
        ("hostname", "Hostname of the machine", read_only),
        ("username", "Local account username", credential),
        ("port", "SSH port", read_only),
        ("home",
            "Experiment home directory to store all experiment related files",
            read_only),
        ("identity", "SSH identity file", credential),
        ("serverKey", "Server public key", read_only),
        ("cleanHome",
            "Remove all nepi files and directories "
            " from node home folder before starting experiment",
            read_only),
        ("cleanExperiment",
            "Remove all files and directories "
            " from a previous same experiment, before the new experiment starts",
            read_only),
        ("cleanProcesses",
            "Kill all running processes before starting experiment",
            read_only),
        ("tearDown",
            "Bash script to be executed before " + "releasing the resource",
            read_only),
    )

    # Build every Attribute first and only then register them, so the
    # construction / registration sequence stays the same as before
    attributes = [Attribute(name, help_text, flags = flags)
            for name, help_text, flags in specs]

    for attribute in attributes:
        cls._register_attribute(attribute)
193 def __init__(self, ec, guid):
194 super(LinuxNode, self).__init__(ec, guid)
196 # home directory at Linux host
199 # lock to prevent concurrent applications on the same node,
200 # to execute commands at the same time. There are potential
201 # concurrency issues when using SSH to a same host from
202 # multiple threads. There are also possible operational
203 # issues, e.g. an application querying the existence
204 # of a file or folder prior to its creation, and another
205 # application creating the same file or folder in between.
206 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Returns 'msg' prefixed with the guid of this resource and the value
    of its 'hostname' attribute, padded with one blank on each side.
    """
    fields = (self.guid, self.get("hostname"), msg)
    return " guid %d - host %s - %s " % fields
214 home = self.get("home") or ""
215 if not home.startswith("/"):
216 home = os.path.join(self._home_dir, home)
221 return os.path.join(self.home_dir, "nepi-usr")
225 return os.path.join(self.usr_dir, "lib")
229 return os.path.join(self.usr_dir, "bin")
233 return os.path.join(self.usr_dir, "src")
237 return os.path.join(self.usr_dir, "share")
241 return os.path.join(self.home_dir, "nepi-exp")
245 return os.path.join(self.exp_dir, self.ec.exp_id)
249 return os.path.join(self.exp_home, "node-%d" % self.guid)
253 return os.path.join(self.node_home, self.ec.run_id)
260 if (not self.get("hostname") or not self.get("username")):
261 msg = "Can't resolve OS, insufficient data "
263 raise RuntimeError, msg
267 if out.find("Fedora release 8") == 0:
268 self._os = OSType.FEDORA_8
269 elif out.find("Fedora release 12") == 0:
270 self._os = OSType.FEDORA_12
271 elif out.find("Fedora release 14") == 0:
272 self._os = OSType.FEDORA_14
273 elif out.find("Debian") == 0:
274 self._os = OSType.DEBIAN
275 elif out.find("Ubuntu") ==0:
276 self._os = OSType.UBUNTU
278 msg = "Unsupported OS"
280 raise RuntimeError, "%s - %s " %( msg, out )
285 # The underlying SSH layer will sometimes return an empty
286 # output (even if the command was executed without errors).
287 # To work around this, repeat the operation N times or
288 # until the result is not empty string
293 (out, err), proc = self.execute("cat /etc/issue",
298 if out.strip() != "":
301 trace = traceback.format_exc()
302 msg = "Error detecting OS: %s " % trace
303 self.error(msg, out, err)
306 time.sleep(min(30.0, retrydelay))
312 return self.os in [OSType.DEBIAN, OSType.UBUNTU]
316 return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
321 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
324 # check if host is alive
325 if not self.is_alive():
328 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
330 raise RuntimeError, msg
334 if self.get("cleanProcesses"):
335 self.clean_processes()
337 if self.get("cleanHome"):
340 if self.get("cleanExperiment"):
341 self.clean_experiment()
343 # Create shared directory structure
344 self.mkdir(self.lib_dir)
345 self.mkdir(self.bin_dir)
346 self.mkdir(self.src_dir)
347 self.mkdir(self.share_dir)
349 # Create experiment node home directory
350 self.mkdir(self.node_home)
352 super(LinuxNode, self).provision()
355 if self.state == ResourceState.NEW:
363 # Node needs to wait until all associated interfaces are
364 # ready before it can finalize deployment
365 from nepi.resources.linux.interface import LinuxInterface
366 ifaces = self.get_connected(LinuxInterface.rtype())
368 if iface.state < ResourceState.READY:
369 self.ec.schedule(reschedule_delay, self.deploy)
372 super(LinuxNode, self).deploy()
375 # Node needs to wait until all associated RMs are released
377 rms = self.get_connected()
379 if rm.state < ResourceState.STOPPED:
380 self.ec.schedule(reschedule_delay, self.release)
383 tear_down = self.get("tearDown")
385 self.execute(tear_down)
387 self.clean_processes()
389 super(LinuxNode, self).release()
391 def valid_connection(self, guid):
395 def clean_processes(self, killer = False):
396 self.info("Cleaning up processes")
400 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
401 "sudo -S killall python tcpdump || /bin/true ; " +
402 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
403 "sudo -S killall -u root || /bin/true ; " +
404 "sudo -S killall -u root || /bin/true ; ")
407 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
408 "sudo -S killall tcpdump || /bin/true ; " +
409 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
410 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
413 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
415 def clean_home(self):
416 """ Cleans all NEPI related folders in the Linux host
418 self.info("Cleaning up home")
420 cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
423 return self.execute(cmd, with_lock = True)
425 def clean_experiment(self):
426 """ Cleans all experiment related files in the Linux host.
427 It preserves NEPI files and folders that have a multi experiment
430 self.info("Cleaning up experiment files")
432 cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
436 return self.execute(cmd, with_lock = True)
438 def execute(self, command,
446 err_on_timeout = True,
447 connect_timeout = 30,
448 strict_host_checking = False,
453 """ Notice that this invocation will block until the
454 execution finishes. If this is not the desired behavior,
455 use 'run' instead."""
458 (out, err), proc = execfuncs.lexec(command,
465 with self._node_lock:
466 (out, err), proc = sshfuncs.rexec(
468 host = self.get("hostname"),
469 user = self.get("username"),
470 port = self.get("port"),
474 identity = self.get("identity"),
475 server_key = self.get("serverKey"),
478 forward_x11 = forward_x11,
481 err_on_timeout = err_on_timeout,
482 connect_timeout = connect_timeout,
483 persistent = persistent,
485 strict_host_checking = strict_host_checking
488 (out, err), proc = sshfuncs.rexec(
490 host = self.get("hostname"),
491 user = self.get("username"),
492 port = self.get("port"),
496 identity = self.get("identity"),
497 server_key = self.get("serverKey"),
500 forward_x11 = forward_x11,
503 err_on_timeout = err_on_timeout,
504 connect_timeout = connect_timeout,
505 persistent = persistent,
507 strict_host_checking = strict_host_checking
510 return (out, err), proc
512 def run(self, command, home,
521 self.debug("Running command '%s'" % command)
524 (out, err), proc = execfuncs.lspawn(command, pidfile,
529 create_home = create_home,
533 with self._node_lock:
534 (out, err), proc = sshfuncs.rspawn(
538 create_home = create_home,
539 stdin = stdin if stdin is not None else '/dev/null',
540 stdout = stdout if stdout else '/dev/null',
541 stderr = stderr if stderr else '/dev/null',
543 host = self.get("hostname"),
544 user = self.get("username"),
545 port = self.get("port"),
547 identity = self.get("identity"),
548 server_key = self.get("serverKey"),
552 return (out, err), proc
554 def getpid(self, home, pidfile = "pidfile"):
556 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
558 with self._node_lock:
559 pidtuple = sshfuncs.rgetpid(
560 os.path.join(home, pidfile),
561 host = self.get("hostname"),
562 user = self.get("username"),
563 port = self.get("port"),
565 identity = self.get("identity"),
566 server_key = self.get("serverKey")
571 def status(self, pid, ppid):
573 status = execfuncs.lstatus(pid, ppid)
575 with self._node_lock:
576 status = sshfuncs.rstatus(
578 host = self.get("hostname"),
579 user = self.get("username"),
580 port = self.get("port"),
582 identity = self.get("identity"),
583 server_key = self.get("serverKey")
588 def kill(self, pid, ppid, sudo = False):
591 status = self.status(pid, ppid)
593 if status == sshfuncs.ProcStatus.RUNNING:
595 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
597 with self._node_lock:
598 (out, err), proc = sshfuncs.rkill(
600 host = self.get("hostname"),
601 user = self.get("username"),
602 port = self.get("port"),
605 identity = self.get("identity"),
606 server_key = self.get("serverKey")
609 return (out, err), proc
611 def copy(self, src, dst):
613 (out, err), proc = execfuncs.lcopy(source, dest,
615 strict_host_checking = False)
617 with self._node_lock:
618 (out, err), proc = sshfuncs.rcopy(
620 port = self.get("port"),
621 identity = self.get("identity"),
622 server_key = self.get("serverKey"),
624 strict_host_checking = False)
626 return (out, err), proc
629 def upload(self, src, dst, text = False, overwrite = True):
630 """ Copy content to destination
632 src content to copy. Can be a local file, directory or a list of files
634 dst destination path on the remote host (remote is always self.host)
636 text src is text input, it must be stored into a temp file before uploading
638 # If source is a string input
640 if text and not os.path.isfile(src):
641 # src is text input that should be uploaded as file
642 # create a temporal file with the content to upload
643 f = tempfile.NamedTemporaryFile(delete=False)
648 # If dst files should not be overwritten, check that the files do not
650 if overwrite == False:
651 src = self.filter_existing_files(src, dst)
653 return ("", ""), None
655 if not self.localhost:
656 # Build destination as <user>@<server>:<path>
657 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
659 result = self.copy(src, dst)
def download(self, src, dst):
    """ Copies 'src' to 'dst' by delegating to self.copy.

    When the node is not the local host, 'src' is first rewritten as
    <user>@<server>:<path> using the 'username' and 'hostname' attributes.
    Returns whatever self.copy returns.
    """
    if self.localhost:
        # Nothing to rewrite, both paths are local
        return self.copy(src, dst)

    # Build source as <user>@<server>:<path>
    remote_src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
    return self.copy(remote_src, dst)
673 def install_packages_command(self, packages):
676 command = rpmfuncs.install_packages_command(self.os, packages)
678 command = debfuncs.install_packages_command(self.os, packages)
680 msg = "Error installing packages ( OS not known ) "
681 self.error(msg, self.os)
682 raise RuntimeError, msg
686 def install_packages(self, packages, home, run_home = None):
687 """ Install packages in the Linux host.
689 'home' is the directory to upload the package installation script.
690 'run_home' is the directory from where to execute the script.
692 command = self.install_packages_command(packages)
694 run_home = run_home or home
696 (out, err), proc = self.run_and_wait(command, run_home,
697 shfile = os.path.join(home, "instpkg.sh"),
698 pidfile = "instpkg_pidfile",
699 ecodefile = "instpkg_exitcode",
700 stdout = "instpkg_stdout",
701 stderr = "instpkg_stderr",
703 raise_on_error = True)
705 return (out, err), proc
707 def remove_packages(self, packages, home, run_home = None):
708 """ Uninstall packages from the Linux host.
710 'home' is the directory to upload the package un-installation script.
711 'run_home' is the directory from where to execute the script.
714 command = rpmfuncs.remove_packages_command(self.os, packages)
716 command = debfuncs.remove_packages_command(self.os, packages)
718 msg = "Error removing packages ( OS not known ) "
720 raise RuntimeError, msg
722 run_home = run_home or home
724 (out, err), proc = self.run_and_wait(command, run_home,
725 shfile = os.path.join(home, "rmpkg.sh"),
726 pidfile = "rmpkg_pidfile",
727 ecodefile = "rmpkg_exitcode",
728 stdout = "rmpkg_stdout",
729 stderr = "rmpkg_stderr",
731 raise_on_error = True)
733 return (out, err), proc
735 def mkdir(self, path, clean = False):
739 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """ Runs 'rm -rf <path>' through self.execute, holding the node lock,
    and returns the result of that call.
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
744 def run_and_wait(self, command, home,
749 ecodefile = "exitcode",
755 raise_on_error = False):
757 Uploads the 'command' to a bash script in the host.
758 Then runs the script detached in background in the host, and
759 busy-waits until the script finishes executing.
762 if not shfile.startswith("/"):
763 shfile = os.path.join(home, shfile)
765 self.upload_command(command,
767 ecodefile = ecodefile,
769 overwrite = overwrite)
771 command = "bash %s" % shfile
772 # run command in background in remote host
773 (out, err), proc = self.run(command, home,
781 # check no errors occurred
783 msg = " Failed to run command '%s' " % command
784 self.error(msg, out, err)
786 raise RuntimeError, msg
788 # Wait for pid file to be generated
789 pid, ppid = self.wait_pid(
792 raise_on_error = raise_on_error)
794 # wait until command finishes to execute
795 self.wait_run(pid, ppid)
797 (eout, err), proc = self.check_errors(home,
798 ecodefile = ecodefile,
801 # Out is what was written in the stderr file
803 msg = " Failed to run command '%s' " % command
804 self.error(msg, eout, err)
807 raise RuntimeError, msg
809 (out, oerr), proc = self.check_output(home, stdout)
811 return (out, err), proc
813 def exitcode(self, home, ecodefile = "exitcode"):
815 Get the exit code of an application.
816 Returns an integer value with the exit code
818 (out, err), proc = self.check_output(home, ecodefile)
820 # Succeeded to open file, return exit code in the file
823 return int(out.strip())
825 # Error in the content of the file!
826 return ExitCode.CORRUPTFILE
828 # No such file or directory
829 if proc.returncode == 1:
830 return ExitCode.FILENOTFOUND
832 # Other error from 'cat'
833 return ExitCode.ERROR
835 def upload_command(self, command,
837 ecodefile = "exitcode",
840 """ Saves the command as a bash script file in the remote host, and
841 forces to save the exit code of the command execution to the ecodefile
844 if not (command.strip().endswith(";") or command.strip().endswith("&")):
847 # The exit code of the command will be stored in ecodefile
848 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
850 'ecodefile': ecodefile,
854 environ = self.format_environment(env)
856 # Add environ to command
857 command = environ + command
859 return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """ Formats the environment variables for a command to be executed
    either as an inline command
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
    as a bash script (one 'export NAME=VALUE' statement per line).

    :param env: String with whitespace separated NAME=VALUE definitions.
        The string is split on whitespace, so values that contain
        blanks are not supported.
    :param inline: If True the statements are terminated with ';',
        otherwise with a new line character.
    :return: The concatenated ' export NAME=VALUE' statements, each one
        followed by the separator, or an empty string when 'env' is
        None, empty or made only of whitespace.
    """
    # split() without arguments collapses runs of whitespace and ignores
    # leading and trailing blanks, so a blank 'env' yields no definitions.
    # Without this a blank string produced a bare ' export ' statement,
    # which makes bash print every exported variable.
    definitions = env.split() if env else []
    if not definitions:
        return ""

    sep = ";" if inline else "\n"
    statements = [" export %s" % definition for definition in definitions]
    return sep.join(statements) + sep
875 def check_errors(self, home,
876 ecodefile = "exitcode",
878 """ Checks whether errors occurred while running a command.
879 It first checks the exit code for the command, and only if the
880 exit code is an error one it returns the error output.
886 # get exit code saved in the 'exitcode' file
887 ecode = self.exitcode(home, ecodefile)
889 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
890 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
891 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
892 # The process returned an error code or didn't exist.
893 # Check standard error.
894 (err, eerr), proc = self.check_output(home, stderr)
896 # If the stderr file was not found, assume nothing bad happened,
897 # and just ignore the error.
898 # (cat returns 1 for error "No such file or directory")
899 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
902 return ("", err), proc
904 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
905 """ Waits until the pid file for the command is generated,
906 and returns the pid and ppid of the process """
911 pidtuple = self.getpid(home = home, pidfile = pidfile)
920 msg = " Failed to get pid for pidfile %s/%s " % (
925 raise RuntimeError, msg
929 def wait_run(self, pid, ppid, trial = 0):
930 """ wait for a remote process to finish execution """
934 status = self.status(pid, ppid)
936 if status is ProcStatus.FINISHED:
938 elif status is not ProcStatus.RUNNING:
941 # If it takes more than 20 seconds to start, then
942 # assume something went wrong
946 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of the file 'filename' under 'home'.

    Runs 'cat' on the joined path through self.execute (one retry,
    holding the node lock) and returns the ((out, err), proc) result.
    """
    path = os.path.join(home, filename)
    streams, proc = self.execute("cat %s" % path, retry = 1, with_lock = True)
    out, err = streams
    return (out, err), proc
956 """ Checks if host is responsive
962 # The underlying SSH layer will sometimes return an empty
963 # output (even if the command was executed without errors).
964 # To work around this, repeat the operation N times or
965 # until the result is not empty string
969 (out, err), proc = self.execute("echo 'ALIVE'",
974 if out.find("ALIVE") > -1:
977 trace = traceback.format_exc()
978 msg = "Unresponsive host. Error reaching host: %s " % trace
979 self.error(msg, out, err)
982 time.sleep(min(30.0, retrydelay))
985 if out.find("ALIVE") > -1:
988 msg = "Unresponsive host. Wrong answer. "
989 self.error(msg, out, err)
993 """ Retrieves host home directory
995 # The underlying SSH layer will sometimes return an empty
996 # output (even if the command was executed without errors).
997 # To work around this, repeat the operation N times or
998 # until the result is not empty string
1000 for i in xrange(10):
1002 (out, err), proc = self.execute("echo ${HOME}",
1007 if out.strip() != "":
1008 self._home_dir = out.strip()
1011 trace = traceback.format_exc()
1012 msg = "Impossible to retrieve HOME directory" % trace
1013 self.error(msg, out, err)
1016 time.sleep(min(30.0, retrydelay))
1019 if not self._home_dir:
1020 msg = "Impossible to retrieve HOME directory"
1021 self.error(msg, out, err)
1022 raise RuntimeError, msg
1024 def filter_existing_files(self, src, dst):
1025 """ Removes files that already exist in the Linux host from src list
1027 # construct a dictionary with { dst: src }
1028 dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ),
1029 src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1032 for d in dests.keys():
1033 command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1035 command = ";".join(command)
1037 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1039 for d in dests.keys():
1040 if out.find(d) > -1:
1046 return " ".join(dests.values())