2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
41 Error codes that the rexitcode function can return if unable to
42 check the exit code of a spawned process
51 Supported flavors of Linux OS
61 class LinuxNode(ResourceManager):
63 .. class:: Class Args :
65 :param ec: The Experiment controller
66 :type ec: ExperimentController
67 :param guid: guid of the RM
72 There are different ways in which commands can be executed using the
73 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
78 * 'execute' (blocking mode) :
80 HOW IT WORKS: 'execute' forks a process and runs the
81 command, synchronously, attached to the terminal, in
83 The execute method will block until the command returns
84 the result on 'out', 'err' (so until it finishes executing).
86 USAGE: short-lived commands that must be executed attached
87 to a terminal and in foreground, for which it IS necessary
88 to block until the command has finished (e.g. if you want
89 to run 'ls' or 'cat').
91 * 'execute' (NON blocking mode - blocking = False) :
93 HOW IT WORKS: Same as before, except that execute method
94 will return immediately (even if command still running).
96 USAGE: long-lived commands that must be executed attached
97 to a terminal and in foreground, but for which it is not
98 necessary to block until the command has finished. (e.g.
99 start an application using X11 forwarding)
103 HOW IT WORKS: Connects to the host (using SSH if remote)
104 and launches the command in background, detached from any
105 terminal (daemonized), and returns. The command continues to
106 run remotely, but since it is detached from the terminal,
107 its pipes (stdin, stdout, stderr) can't be redirected to the
108 console (as normal non detached processes would), and so they
109 are explicitly redirected to files. The pidfile is created as
110 part of the process of launching the command. The pidfile
111 holds the pid and ppid of the process forked in background,
112 so later on it is possible to check whether the command is still
115 USAGE: long-lived commands that can run detached in background,
116 for which it is NOT necessary to block (wait) until the command
117 has finished. (e.g. start an application that is not using X11
118 forwarding. It can run detached and remotely in background)
122 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123 the command has finished execution. It also checks whether
124 errors occurred during runtime by reading the exitcode file,
125 which contains the exit code of the command that was run
126 (checking stderr only is not always reliable since many
127 commands throw debugging info to stderr and the only way to
128 automatically know whether an error really happened is to
129 check the process exit code).
131 Another difference with respect to 'run', is that instead
132 of directly executing the command as a bash command line,
133 it uploads the command to a bash script and runs the script.
134 This allows to use the bash script to debug errors, since
135 it remains at the remote host and can be run manually to
138 USAGE: medium-lived commands that can run detached in
139 background, for which it IS necessary to block (wait) until
140 the command has finished. (e.g. Package installation,
141 source compilation, file download, etc)
def _register_attributes(cls):
    """ Builds the attributes of the Linux node and registers each of
    them on 'cls' through 'cls._register_attribute', in declaration order.
    """
    read_only = Flags.ExecReadOnly
    credential = Flags.Credential

    attributes = (
        Attribute("hostname", "Hostname of the machine",
            flags = read_only),
        Attribute("username", "Local account username",
            flags = credential),
        Attribute("port", "SSH port", flags = read_only),
        Attribute("home",
            "Experiment home directory to store all experiment related files",
            flags = read_only),
        Attribute("identity", "SSH identity file",
            flags = credential),
        Attribute("serverKey", "Server public key",
            flags = read_only),
        Attribute("cleanHome", "Remove all nepi files and directories "
            " from node home folder before starting experiment",
            flags = read_only),
        Attribute("cleanExperiment", "Remove all files and directories "
            " from a previous same experiment, before the new experiment starts",
            flags = read_only),
        Attribute("cleanProcesses",
            "Kill all running processes before starting experiment",
            flags = read_only),
        Attribute("tearDown", "Bash script to be executed before " + \
            "releasing the resource",
            flags = read_only),
    )

    # All attributes are created first and only then registered,
    # one by one, in the same order they were declared
    for attribute in attributes:
        cls._register_attribute(attribute)
193 def __init__(self, ec, guid):
194 super(LinuxNode, self).__init__(ec, guid)
196 # home directory at Linux host
199 # lock to avoid concurrency issues on methods used by applications
200 self._lock = threading.Lock()
def log_message(self, msg):
    """ Returns 'msg' framed with the guid and the hostname of this node

    :param msg: Text to be framed
    :returns: String of the form ' guid <guid> - host <hostname> - <msg> '
    """
    hostname = self.get("hostname")
    return " guid %d - host %s - %s " % (self.guid, hostname, msg)
208 home = self.get("home") or ""
209 if not home.startswith("/"):
210 home = os.path.join(self._home_dir, home)
215 return os.path.join(self.home_dir, "nepi-usr")
219 return os.path.join(self.usr_dir, "lib")
223 return os.path.join(self.usr_dir, "bin")
227 return os.path.join(self.usr_dir, "src")
231 return os.path.join(self.usr_dir, "share")
235 return os.path.join(self.home_dir, "nepi-exp")
239 return os.path.join(self.exp_dir, self.ec.exp_id)
243 return os.path.join(self.exp_home, "node-%d" % self.guid)
247 return os.path.join(self.node_home, self.ec.run_id)
254 if (not self.get("hostname") or not self.get("username")):
255 msg = "Can't resolve OS, insufficient data "
257 raise RuntimeError, msg
259 (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
261 if err and proc.poll():
262 msg = "Error detecting OS "
263 self.error(msg, out, err)
264 raise RuntimeError, "%s - %s - %s" %( msg, out, err )
266 if out.find("Fedora release 8") == 0:
267 self._os = OSType.FEDORA_8
268 elif out.find("Fedora release 12") == 0:
269 self._os = OSType.FEDORA_12
270 elif out.find("Fedora release 14") == 0:
271 self._os = OSType.FEDORA_14
272 elif out.find("Debian") == 0:
273 self._os = OSType.DEBIAN
274 elif out.find("Ubuntu") ==0:
275 self._os = OSType.UBUNTU
277 msg = "Unsupported OS"
279 raise RuntimeError, "%s - %s " %( msg, out )
285 return self.os in [OSType.DEBIAN, OSType.UBUNTU]
289 return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
294 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
297 # check if host is alive
298 if not self.is_alive():
301 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
303 raise RuntimeError, msg
307 if self.get("cleanProcesses"):
308 self.clean_processes()
310 if self.get("cleanHome"):
313 if self.get("cleanExperiment"):
314 self.clean_experiment()
316 # Create shared directory structure
317 self.mkdir(self.lib_dir)
318 self.mkdir(self.bin_dir)
319 self.mkdir(self.src_dir)
320 self.mkdir(self.share_dir)
322 # Create experiment node home directory
323 self.mkdir(self.node_home)
325 super(LinuxNode, self).provision()
328 if self.state == ResourceState.NEW:
333 self._state = ResourceState.FAILED
336 # Node needs to wait until all associated interfaces are
337 # ready before it can finalize deployment
338 from nepi.resources.linux.interface import LinuxInterface
339 ifaces = self.get_connected(LinuxInterface.rtype())
341 if iface.state < ResourceState.READY:
342 self.ec.schedule(reschedule_delay, self.deploy)
345 super(LinuxNode, self).deploy()
348 # Node needs to wait until all associated RMs are released
350 rms = self.get_connected()
352 if rm.state < ResourceState.STOPPED:
353 self.ec.schedule(reschedule_delay, self.release)
356 tear_down = self.get("tearDown")
358 self.execute(tear_down)
360 self.clean_processes()
362 super(LinuxNode, self).release()
364 def valid_connection(self, guid):
368 def clean_processes(self, killer = False):
369 self.info("Cleaning up processes")
373 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
374 "sudo -S killall python tcpdump || /bin/true ; " +
375 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
376 "sudo -S killall -u root || /bin/true ; " +
377 "sudo -S killall -u root || /bin/true ; ")
380 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
381 "sudo -S killall tcpdump || /bin/true ; " +
382 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
383 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
386 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
388 def clean_home(self):
389 """ Cleans all NEPI related folders in the Linux host
391 self.info("Cleaning up home")
393 cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
396 return self.execute(cmd, with_lock = True)
398 def clean_experiment(self):
399 """ Cleans all experiment related files in the Linux host.
400 It preserves NEPI files and folders that have a multi experiment
403 self.info("Cleaning up experiment files")
405 cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
409 return self.execute(cmd, with_lock = True)
411 def execute(self, command,
419 err_on_timeout = True,
420 connect_timeout = 30,
421 strict_host_checking = False,
426 """ Notice that this invocation will block until the
427 execution finishes. If this is not the desired behavior,
428 use 'run' instead."""
431 (out, err), proc = execfuncs.lexec(command,
439 (out, err), proc = sshfuncs.rexec(
441 host = self.get("hostname"),
442 user = self.get("username"),
443 port = self.get("port"),
447 identity = self.get("identity"),
448 server_key = self.get("serverKey"),
451 forward_x11 = forward_x11,
454 err_on_timeout = err_on_timeout,
455 connect_timeout = connect_timeout,
456 persistent = persistent,
458 strict_host_checking = strict_host_checking
461 (out, err), proc = sshfuncs.rexec(
463 host = self.get("hostname"),
464 user = self.get("username"),
465 port = self.get("port"),
469 identity = self.get("identity"),
470 server_key = self.get("serverKey"),
473 forward_x11 = forward_x11,
476 err_on_timeout = err_on_timeout,
477 connect_timeout = connect_timeout,
478 persistent = persistent,
480 strict_host_checking = strict_host_checking
483 return (out, err), proc
485 def run(self, command, home,
494 self.debug("Running command '%s'" % command)
497 (out, err), proc = execfuncs.lspawn(command, pidfile,
502 create_home = create_home,
507 (out, err), proc = sshfuncs.rspawn(
511 create_home = create_home,
512 stdin = stdin if stdin is not None else '/dev/null',
513 stdout = stdout if stdout else '/dev/null',
514 stderr = stderr if stderr else '/dev/null',
516 host = self.get("hostname"),
517 user = self.get("username"),
518 port = self.get("port"),
520 identity = self.get("identity"),
521 server_key = self.get("serverKey"),
525 return (out, err), proc
527 def getpid(self, home, pidfile = "pidfile"):
529 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
532 pidtuple = sshfuncs.rgetpid(
533 os.path.join(home, pidfile),
534 host = self.get("hostname"),
535 user = self.get("username"),
536 port = self.get("port"),
538 identity = self.get("identity"),
539 server_key = self.get("serverKey")
544 def status(self, pid, ppid):
546 status = execfuncs.lstatus(pid, ppid)
549 status = sshfuncs.rstatus(
551 host = self.get("hostname"),
552 user = self.get("username"),
553 port = self.get("port"),
555 identity = self.get("identity"),
556 server_key = self.get("serverKey")
561 def kill(self, pid, ppid, sudo = False):
564 status = self.status(pid, ppid)
566 if status == sshfuncs.ProcStatus.RUNNING:
568 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
571 (out, err), proc = sshfuncs.rkill(
573 host = self.get("hostname"),
574 user = self.get("username"),
575 port = self.get("port"),
578 identity = self.get("identity"),
579 server_key = self.get("serverKey")
582 return (out, err), proc
584 def copy(self, src, dst):
586 (out, err), proc = execfuncs.lcopy(source, dest,
588 strict_host_checking = False)
591 (out, err), proc = sshfuncs.rcopy(
593 port = self.get("port"),
594 identity = self.get("identity"),
595 server_key = self.get("serverKey"),
597 strict_host_checking = False)
599 return (out, err), proc
602 def upload(self, src, dst, text = False, overwrite = True):
603 """ Copy content to destination
605 src content to copy. Can be a local file, directory or a list of files
607 dst destination path on the remote host (remote is always self.host)
609 text src is text input, it must be stored into a temp file before uploading
611 # If source is a string input
613 if text and not os.path.isfile(src):
614 # src is text input that should be uploaded as file
615 # create a temporal file with the content to upload
616 f = tempfile.NamedTemporaryFile(delete=False)
621 # If dst files should not be overwritten, check that the files do not
623 if overwrite == False:
624 src = self.filter_existing_files(src, dst)
626 return ("", ""), None
628 if not self.localhost:
629 # Build destination as <user>@<server>:<path>
630 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
632 result = self.copy(src, dst)
def download(self, src, dst):
    """ Copies 'src' from the node to 'dst' by delegating to 'copy'

    When the node is not the local host, 'src' is first qualified as
    <user>@<server>:<path> using the 'username' and 'hostname' attributes.

    :param src: Path to copy from
    :param dst: Path to copy to
    :returns: Whatever 'copy' returns
    """
    if self.localhost:
        return self.copy(src, dst)

    # Build source as <user>@<server>:<path>
    remote_src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
    return self.copy(remote_src, dst)
646 def install_packages(self, packages, home, run_home = None):
647 """ Install packages in the Linux host.
649 'home' is the directory to upload the package installation script.
650 'run_home' is the directory from where to execute the script.
654 command = rpmfuncs.install_packages_command(self.os, packages)
656 command = debfuncs.install_packages_command(self.os, packages)
658 msg = "Error installing packages ( OS not known ) "
659 self.error(msg, self.os)
660 raise RuntimeError, msg
662 run_home = run_home or home
664 (out, err), proc = self.run_and_wait(command, run_home,
665 shfile = os.path.join(home, "instpkg.sh"),
666 pidfile = "instpkg_pidfile",
667 ecodefile = "instpkg_exitcode",
668 stdout = "instpkg_stdout",
669 stderr = "instpkg_stderr",
671 raise_on_error = True)
673 return (out, err), proc
675 def remove_packages(self, packages, home, run_home = None):
676 """ Uninstall packages from the Linux host.
678 'home' is the directory to upload the package un-installation script.
679 'run_home' is the directory from where to execute the script.
682 command = rpmfuncs.remove_packages_command(self.os, packages)
684 command = debfuncs.remove_packages_command(self.os, packages)
686 msg = "Error removing packages ( OS not known ) "
688 raise RuntimeError, msg
690 run_home = run_home or home
692 (out, err), proc = self.run_and_wait(command, run_home,
693 shfile = os.path.join(home, "rmpkg.sh"),
694 pidfile = "rmpkg_pidfile",
695 ecodefile = "rmpkg_exitcode",
696 stdout = "rmpkg_stdout",
697 stderr = "rmpkg_stderr",
699 raise_on_error = True)
701 return (out, err), proc
703 def mkdir(self, path, clean = False):
707 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """ Removes 'path' from the host by running 'rm -rf' through 'execute'

    :param path: Path to remove. It is inserted verbatim (unquoted) in
        the command line
    :returns: Whatever 'execute' returns
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
712 def run_and_wait(self, command, home,
717 ecodefile = "exitcode",
723 raise_on_error = False):
725 Uploads the 'command' to a bash script in the host.
726 Then runs the script detached in background in the host, and
727 busy-waits until the script finishes executing.
730 if not shfile.startswith("/"):
731 shfile = os.path.join(home, shfile)
733 self.upload_command(command,
735 ecodefile = ecodefile,
737 overwrite = overwrite)
739 command = "bash %s" % shfile
740 # run command in background in remote host
741 (out, err), proc = self.run(command, home,
749 # check no errors occurred
751 msg = " Failed to run command '%s' " % command
752 self.error(msg, out, err)
754 raise RuntimeError, msg
756 # Wait for pid file to be generated
757 pid, ppid = self.wait_pid(
760 raise_on_error = raise_on_error)
762 # wait until command finishes to execute
763 self.wait_run(pid, ppid)
765 (eout, err), proc = self.check_errors(home,
766 ecodefile = ecodefile,
769 # Out is what was written in the stderr file
771 msg = " Failed to run command '%s' " % command
772 self.error(msg, eout, err)
775 raise RuntimeError, msg
777 (out, oerr), proc = self.check_output(home, stdout)
779 return (out, err), proc
781 def exitcode(self, home, ecodefile = "exitcode"):
783 Get the exit code of an application.
784 Returns an integer value with the exit code
786 (out, err), proc = self.check_output(home, ecodefile)
788 # Succeeded to open file, return exit code in the file
791 return int(out.strip())
793 # Error in the content of the file!
794 return ExitCode.CORRUPTFILE
796 # No such file or directory
797 if proc.returncode == 1:
798 return ExitCode.FILENOTFOUND
800 # Other error from 'cat'
801 return ExitCode.ERROR
803 def upload_command(self, command,
805 ecodefile = "exitcode",
808 """ Saves the command as a bash script file in the remote host, and
809 forces to save the exit code of the command execution to the ecodefile
812 if not (command.strip().endswith(";") or command.strip().endswith("&")):
815 # The exit code of the command will be stored in ecodefile
816 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
818 'ecodefile': ecodefile,
822 environ = self.format_environment(env)
824 # Add environ to command
825 command = environ + command
827 return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """ Formats the environment variables for a command to be executed
    either as an inline command
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
    as a bash script (i.e. one 'export PYTHONPATH=src/..' statement per line)

    :param env: White space separated list of NAME=value assignments.
        Can be None or empty
    :param inline: If True every statement is terminated by ';',
        otherwise by a new line
    :returns: One ' export NAME=value' statement per assignment, each
        followed by the separator, or "" if there is nothing to export
    """
    if not env:
        return ""

    # split() without arguments collapses any run of white spaces and
    # yields no tokens for a blank string, so an 'env' made only of
    # white spaces no longer produces a dangling ' export ' statement
    assignments = env.split()
    if not assignments:
        return ""

    sep = ";" if inline else "\n"
    return sep.join([" export %s" % assignment for assignment in assignments]) + sep
843 def check_errors(self, home,
844 ecodefile = "exitcode",
846 """ Checks whether errors occurred while running a command.
847 It first checks the exit code for the command, and only if the
848 exit code is an error one it returns the error output.
854 # get exit code saved in the 'exitcode' file
855 ecode = self.exitcode(home, ecodefile)
857 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
858 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
859 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
860 # The process returned an error code or didn't exist.
861 # Check standard error.
862 (err, eerr), proc = self.check_output(home, stderr)
864 # If the stderr file was not found, assume nothing bad happened,
865 # and just ignore the error.
866 # (cat returns 1 for error "No such file or directory")
867 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
870 return ("", err), proc
872 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
873 """ Waits until the pid file for the command is generated,
874 and returns the pid and ppid of the process """
879 pidtuple = self.getpid(home = home, pidfile = pidfile)
888 msg = " Failed to get pid for pidfile %s/%s " % (
893 raise RuntimeError, msg
897 def wait_run(self, pid, ppid, trial = 0):
898 """ wait for a remote process to finish execution """
902 status = self.status(pid, ppid)
904 if status is ProcStatus.FINISHED:
906 elif status is not ProcStatus.RUNNING:
909 # If it takes more than 20 seconds to start, then
910 # assume something went wrong
914 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of a file by running 'cat' through 'execute'

    :param home: Directory where the file is located
    :param filename: Name of the file, relative to 'home'
    :returns: ((out, err), proc) as returned by 'execute'
    """
    path = os.path.join(home, filename)
    streams, proc = self.execute("cat %s" % path, retry = 1, with_lock = True)
    out, err = streams
    return (out, err), proc
924 """ Checks if host is responsive
931 (out, err), proc = self.execute("echo 'ALIVE'",
935 trace = traceback.format_exc()
936 msg = "Unresponsive host %s " % err
937 self.error(msg, out, trace)
940 if out.strip() == "ALIVE":
943 msg = "Unresponsive host "
944 self.error(msg, out, err)
948 """ Retrieves host home directory
950 (out, err), proc = self.execute("echo ${HOME}", retry = 5,
954 msg = "Imposible to retrieve HOME directory"
955 self.error(msg, out, err)
956 raise RuntimeError, msg
958 self._home_dir = out.strip()
960 def filter_existing_files(self, src, dst):
961 """ Removes files that already exist in the Linux host from src list
963 # construct a dictionary with { dst: src }
964 dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ),
965 src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
968 for d in dests.keys():
969 command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
971 command = ";".join(command)
973 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
975 for d in dests.keys():
982 return " ".join(dests.values())