2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
Error codes that the LinuxNode.exitcode method returns instead of a real
exit code when it is unable to check the exit code of a spawned process
(unreadable/corrupt exit code file, missing file, or other 'cat' failure)
Supported flavors of Linux OS (identified from the beginning of the host's
/etc/issue text)
class LinuxNode(ResourceManager):
.. class:: Class Args :
:param ec: The Experiment controller
:type ec: ExperimentController
:param guid: guid of the RM
There are different ways in which commands can be executed using the
LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
* 'execute' (blocking mode) :
HOW IT WORKS: 'execute' forks a process and runs the
command, synchronously, attached to the terminal, in
The execute method will block until the command returns
the result on 'out', 'err' (so until it finishes executing).
USAGE: short-lived commands that must be executed attached
to a terminal and in foreground, for which it IS necessary
to block until the command has finished (e.g. if you want
to run 'ls' or 'cat').
* 'execute' (NON blocking mode - blocking = False) :
HOW IT WORKS: Same as before, except that the execute method
will return immediately (even if the command is still running).
USAGE: long-lived commands that must be executed attached
to a terminal and in foreground, but for which it is not
necessary to block until the command has finished. (e.g.
start an application using X11 forwarding)
HOW IT WORKS: Connects to the host (using SSH if remote)
and launches the command in background, detached from any
terminal (daemonized), and returns. The command continues to
run remotely, but since it is detached from the terminal,
its pipes (stdin, stdout, stderr) can't be redirected to the
console (as normal non detached processes would), and so they
are explicitly redirected to files. The pidfile is created as
part of the process of launching the command. The pidfile
holds the pid and ppid of the process forked in background,
so later on it is possible to check whether the command is still
USAGE: long-lived commands that can run detached in background,
for which it is NOT necessary to block (wait) until the command
has finished. (e.g. start an application that is not using X11
forwarding. It can run detached and remotely in background)
HOW IT WORKS: Similar to 'run' except that it 'blocks' until
the command has finished execution. It also checks whether
errors occurred during runtime by reading the exitcode file,
which contains the exit code of the command that was run
(checking stderr only is not always reliable since many
commands throw debugging info to stderr and the only way to
automatically know whether an error really happened is to
check the process exit code).
Another difference with respect to 'run' is that instead
of directly executing the command as a bash command line,
it uploads the command to a bash script and runs the script.
This allows the bash script to be used to debug errors, since
it remains at the remote host and can be run manually to
USAGE: medium-lived commands that can run detached in
background, for which it IS necessary to block (wait) until
the command has finished. (e.g. Package installation,
source compilation, file download, etc)
def _register_attributes(cls):
    """ Registers the attributes that configure a LinuxNode: connection
    settings (hostname, username, port, identity, serverKey), the
    experiment home directory, the cleanup options checked during
    provisioning (cleanHome, cleanExperiment, cleanProcesses) and the
    tearDown script executed when the node is released.
    """
    hostname = Attribute("hostname", "Hostname of the machine",
            flags = Flags.ExecReadOnly)

    username = Attribute("username", "Local account username",
            flags = Flags.Credential)

    port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)

    home = Attribute("home",
            "Experiment home directory to store all experiment related files",
            flags = Flags.ExecReadOnly)

    identity = Attribute("identity", "SSH identity file",
            flags = Flags.Credential)

    server_key = Attribute("serverKey", "Server public key",
            flags = Flags.ExecReadOnly)

    # Adjacent literals are concatenated implicitly, so only one of the
    # two pieces may carry the separating space.
    clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
            "from node home folder before starting experiment",
            flags = Flags.ExecReadOnly)

    clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
            "from a previous same experiment, before the new experiment starts",
            flags = Flags.ExecReadOnly)

    clean_processes = Attribute("cleanProcesses",
            "Kill all running processes before starting experiment",
            flags = Flags.ExecReadOnly)

    tear_down = Attribute("tearDown", "Bash script to be executed before "
            "releasing the resource",
            flags = Flags.ExecReadOnly)

    # Register in the same order as they are declared above
    for attr in (hostname, username, port, home, identity, server_key,
            clean_home, clean_experiment, clean_processes, tear_down):
        cls._register_attribute(attr)
def __init__(self, ec, guid):
""" Creates a LinuxNode RM.

:param ec: The Experiment controller
:param guid: guid of the RM
"""
super(LinuxNode, self).__init__(ec, guid)
# home directory at Linux host (read by home_dir, filled in by find_home
# from the host's ${HOME})
# lock to avoid concurrency issues on methods used by applications
# (presumably acquired by execute/run when called with with_lock = True;
# confirm)
self._lock = threading.Lock()
def log_message(self, msg):
    """ Decorates msg with this node's guid and hostname so that log
    lines can be traced back to the node that produced them. """
    hostname = self.get("hostname")
    template = " guid %d - host %s - %s "
    return template % (self.guid, hostname, msg)
# Resolve the "home" attribute: an absolute path is used as-is; a relative
# (or empty) value is taken relative to the user's HOME directory on the
# host (self._home_dir, as discovered by find_home).
home = self.get("home") or ""
if not home.startswith("/"):
home = os.path.join(self._home_dir, home)
# <home_dir>/nepi-usr
return os.path.join(self.home_dir, "nepi-usr")
# <usr_dir>/lib
return os.path.join(self.usr_dir, "lib")
# <usr_dir>/bin
return os.path.join(self.usr_dir, "bin")
# <usr_dir>/src
return os.path.join(self.usr_dir, "src")
# <usr_dir>/share
return os.path.join(self.usr_dir, "share")
# <home_dir>/nepi-exp
return os.path.join(self.home_dir, "nepi-exp")
# <exp_dir>/<experiment id>
return os.path.join(self.exp_dir, self.ec.exp_id)
# <exp_home>/node-<guid>
return os.path.join(self.exp_home, "node-%d" % self.guid)
# <node_home>/<run id>
return os.path.join(self.node_home, self.ec.run_id)
254 if (not self.get("hostname") or not self.get("username")):
255 msg = "Can't resolve OS, insufficient data "
257 raise RuntimeError, msg
261 if out.find("Fedora release 8") == 0:
262 self._os = OSType.FEDORA_8
263 elif out.find("Fedora release 12") == 0:
264 self._os = OSType.FEDORA_12
265 elif out.find("Fedora release 14") == 0:
266 self._os = OSType.FEDORA_14
267 elif out.find("Debian") == 0:
268 self._os = OSType.DEBIAN
269 elif out.find("Ubuntu") ==0:
270 self._os = OSType.UBUNTU
272 msg = "Unsupported OS"
274 raise RuntimeError, "%s - %s " %( msg, out )
# The underlying SSH layer will sometimes return an empty
# output (even if the command was executed without errors).
# To work around this, repeat the operation N times or
# until the result is not empty string
(out, err), proc = self.execute("cat /etc/issue",
if out.strip() != "":
# NOTE(review): if execute raises on the first attempt, `err` is unbound
# in the handler below unless it is initialized before the loop - confirm.
trace = traceback.format_exc()
msg = "Error detecting OS: %s " % trace
self.error(msg, out, err)
# Wait before the next attempt; the delay is capped at 30 seconds
time.sleep(min(30.0, retrydelay))
# Debian-family hosts (Debian, Ubuntu)
return self.os in [OSType.DEBIAN, OSType.UBUNTU]
# Fedora hosts (handled with rpmfuncs when installing packages)
return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
315 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
# check if host is alive
if not self.is_alive():
msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
raise RuntimeError, msg
# Optional cleanup before the experiment, controlled by the
# cleanProcesses, cleanHome and cleanExperiment attributes
if self.get("cleanProcesses"):
self.clean_processes()
if self.get("cleanHome"):
if self.get("cleanExperiment"):
self.clean_experiment()
# Create shared directory structure
self.mkdir(self.lib_dir)
self.mkdir(self.bin_dir)
self.mkdir(self.src_dir)
self.mkdir(self.share_dir)
# Create experiment node home directory
self.mkdir(self.node_home)
super(LinuxNode, self).provision()
if self.state == ResourceState.NEW:
self._state = ResourceState.FAILED
# Node needs to wait until all associated interfaces are
# ready before it can finalize deployment
# (imported here rather than at module level - presumably to avoid a
# circular import; confirm)
from nepi.resources.linux.interface import LinuxInterface
ifaces = self.get_connected(LinuxInterface.rtype())
if iface.state < ResourceState.READY:
# An interface is not ready yet: retry deploy after reschedule_delay
self.ec.schedule(reschedule_delay, self.deploy)
super(LinuxNode, self).deploy()
# Node needs to wait until all associated RMs are released
rms = self.get_connected()
if rm.state < ResourceState.STOPPED:
# A connected RM is still active: retry release after reschedule_delay
self.ec.schedule(reschedule_delay, self.release)
# User-provided script from the tearDown attribute, executed on the host
tear_down = self.get("tearDown")
self.execute(tear_down)
self.clean_processes()
super(LinuxNode, self).release()
def valid_connection(self, guid):
""" Validates a connection between this node and the RM identified by
guid (presumably returns a boolean - confirm). """
def clean_processes(self, killer = False):
# Kills leftover processes on the host (invoked from provision when
# cleanProcesses is set, and from release).
# Two command sets are built below; presumably `killer` selects the first,
# aggressive one (all python and tcpdump processes, every process not
# attached to this terminal except those matching $PPID, and all
# root-owned processes). The other one only kills tcpdump and the processes
# of the configured "username". Each kill is issued twice and its failure
# is ignored ("|| /bin/true").
self.info("Cleaning up processes")
cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
"sudo -S killall python tcpdump || /bin/true ; " +
"sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
"sudo -S killall -u root || /bin/true ; " +
"sudo -S killall -u root || /bin/true ; ")
cmd = ("sudo -S killall tcpdump || /bin/true ; " +
"sudo -S killall tcpdump || /bin/true ; " +
"sudo -S killall -u %s || /bin/true ; " % self.get("username") +
"sudo -S killall -u %s || /bin/true ; " % self.get("username"))
(out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
def clean_home(self):
""" Cleans all NEPI related folders in the Linux host
self.info("Cleaning up home")
# Removes only the top-level nepi-usr and nepi-exp entries of the directory
# the command cd's into. '\(' and '\)' are not Python escapes, so the
# backslashes reach the shell and protect find's grouping parentheses.
cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
return self.execute(cmd, with_lock = True)
def clean_experiment(self):
""" Cleans all experiment related files in the Linux host.
It preserves NEPI files and folders that have a multi experiment
self.info("Cleaning up experiment files")
# Removes the top-level entries matching the quoted -name pattern inside
# the directory the command cd's into (format arguments presumably the
# experiment directory and the experiment id - confirm)
cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
return self.execute(cmd, with_lock = True)
def execute(self, command,
err_on_timeout = True,
connect_timeout = 30,
strict_host_checking = False,
""" Executes command on the host, locally through execfuncs.lexec or
over SSH through sshfuncs.rexec, and returns ((out, err), proc).

In blocking mode this invocation will block until the
execution finishes. If this is not the desired behavior,
use 'run' instead."""
(out, err), proc = execfuncs.lexec(command,
# NOTE(review): the two rexec calls below pass the same visible arguments;
# presumably they differ in whether self._lock is held (with_lock) - confirm.
(out, err), proc = sshfuncs.rexec(
host = self.get("hostname"),
user = self.get("username"),
port = self.get("port"),
identity = self.get("identity"),
server_key = self.get("serverKey"),
forward_x11 = forward_x11,
err_on_timeout = err_on_timeout,
connect_timeout = connect_timeout,
persistent = persistent,
strict_host_checking = strict_host_checking
(out, err), proc = sshfuncs.rexec(
host = self.get("hostname"),
user = self.get("username"),
port = self.get("port"),
identity = self.get("identity"),
server_key = self.get("serverKey"),
forward_x11 = forward_x11,
err_on_timeout = err_on_timeout,
connect_timeout = connect_timeout,
persistent = persistent,
strict_host_checking = strict_host_checking
return (out, err), proc
def run(self, command, home,
# Launches command in background on the host (execfuncs.lspawn locally,
# sshfuncs.rspawn remotely) without waiting for it to finish; a pidfile
# is written so that the process can be tracked later.
self.debug("Running command '%s'" % command)
(out, err), proc = execfuncs.lspawn(command, pidfile,
create_home = create_home,
(out, err), proc = sshfuncs.rspawn(
create_home = create_home,
# Unset redirections default to /dev/null. Note the asymmetry: stdin
# falls back only when None, stdout/stderr whenever falsy (e.g. "").
stdin = stdin if stdin is not None else '/dev/null',
stdout = stdout if stdout else '/dev/null',
stderr = stderr if stderr else '/dev/null',
host = self.get("hostname"),
user = self.get("username"),
port = self.get("port"),
identity = self.get("identity"),
server_key = self.get("serverKey"),
return (out, err), proc
def getpid(self, home, pidfile = "pidfile"):
# Reads the (pid, ppid) pair stored in <home>/<pidfile> by a command
# launched with run: locally via execfuncs, or over SSH via sshfuncs.
pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
pidtuple = sshfuncs.rgetpid(
os.path.join(home, pidfile),
host = self.get("hostname"),
user = self.get("username"),
port = self.get("port"),
identity = self.get("identity"),
server_key = self.get("serverKey")
def status(self, pid, ppid):
# Queries the state of process (pid, ppid), locally or over SSH; callers
# compare the result with ProcStatus values (RUNNING, FINISHED).
status = execfuncs.lstatus(pid, ppid)
status = sshfuncs.rstatus(
host = self.get("hostname"),
user = self.get("username"),
port = self.get("port"),
identity = self.get("identity"),
server_key = self.get("serverKey")
def kill(self, pid, ppid, sudo = False):
# Kills process (pid, ppid) only if it is reported as RUNNING.
# NOTE(review): when it is not running, out/err/proc must have been
# initialized beforehand for the final return to work - confirm.
status = self.status(pid, ppid)
if status == sshfuncs.ProcStatus.RUNNING:
(out, err), proc = execfuncs.lkill(pid, ppid, sudo)
(out, err), proc = sshfuncs.rkill(
host = self.get("hostname"),
user = self.get("username"),
port = self.get("port"),
identity = self.get("identity"),
server_key = self.get("serverKey")
return (out, err), proc
605 def copy(self, src, dst):
607 (out, err), proc = execfuncs.lcopy(source, dest,
609 strict_host_checking = False)
612 (out, err), proc = sshfuncs.rcopy(
614 port = self.get("port"),
615 identity = self.get("identity"),
616 server_key = self.get("serverKey"),
618 strict_host_checking = False)
620 return (out, err), proc
def upload(self, src, dst, text = False, overwrite = True):
""" Copy content to destination
src content to copy. Can be a local file, directory or a space-separated
list of files
dst destination path on the node's host (the host named by the
"hostname" attribute)
text src is text input, it must be stored into a temp file before uploading
# If source is a string input
if text and not os.path.isfile(src):
# src is text input that should be uploaded as file
# create a temporary file with the content to upload
# NOTE(review): delete=False keeps the file after it is closed; confirm it
# is removed once the upload is done.
f = tempfile.NamedTemporaryFile(delete=False)
# If dst files should not be overwritten, check that the files do not
if overwrite == False:
src = self.filter_existing_files(src, dst)
# Nothing to upload (presumably when every file already exists)
return ("", ""), None
if not self.localhost:
# Build destination as <user>@<server>:<path>
dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
result = self.copy(src, dst)
def download(self, src, dst):
    """ Copies src from the node's host to dst.

    For a remote node, src is qualified as <user>@<hostname>:<src>; dst is
    passed to copy unchanged. Returns whatever copy returns.
    """
    if self.localhost:
        return self.copy(src, dst)

    # Build source as <user>@<server>:<path>
    user, host = self.get("username"), self.get("hostname")
    remote_src = "%s@%s:%s" % (user, host, src)
    return self.copy(remote_src, dst)
def install_packages_command(self, packages):
# Builds the shell command that installs packages with the host's package
# manager: rpmfuncs for Fedora hosts, debfuncs for Debian/Ubuntu hosts.
# An unknown OS is logged and raises RuntimeError.
command = rpmfuncs.install_packages_command(self.os, packages)
command = debfuncs.install_packages_command(self.os, packages)
msg = "Error installing packages ( OS not known ) "
self.error(msg, self.os)
raise RuntimeError, msg
def install_packages(self, packages, home, run_home = None):
""" Install packages in the Linux host.
'home' is the directory to upload the package installation script.
'run_home' is the directory from where to execute the script.
command = self.install_packages_command(packages)
# By default the script is executed from the directory it is uploaded to
run_home = run_home or home
# Upload the command as <home>/instpkg.sh, run it and wait for it to end;
# with raise_on_error = True, run_and_wait raises RuntimeError on failure.
(out, err), proc = self.run_and_wait(command, run_home,
shfile = os.path.join(home, "instpkg.sh"),
pidfile = "instpkg_pidfile",
ecodefile = "instpkg_exitcode",
stdout = "instpkg_stdout",
stderr = "instpkg_stderr",
raise_on_error = True)
return (out, err), proc
def remove_packages(self, packages, home, run_home = None):
""" Uninstall packages from the Linux host.
'home' is the directory to upload the package un-installation script.
'run_home' is the directory from where to execute the script.
# Removal command for the host's package manager (rpmfuncs for Fedora,
# debfuncs for Debian/Ubuntu); an unknown OS raises RuntimeError.
command = rpmfuncs.remove_packages_command(self.os, packages)
command = debfuncs.remove_packages_command(self.os, packages)
msg = "Error removing packages ( OS not known ) "
raise RuntimeError, msg
# By default the script is executed from the directory it is uploaded to
run_home = run_home or home
# Upload the command as <home>/rmpkg.sh, run it and wait for it to end;
# with raise_on_error = True, run_and_wait raises RuntimeError on failure.
(out, err), proc = self.run_and_wait(command, run_home,
shfile = os.path.join(home, "rmpkg.sh"),
pidfile = "rmpkg_pidfile",
ecodefile = "rmpkg_exitcode",
stdout = "rmpkg_stdout",
stderr = "rmpkg_stderr",
raise_on_error = True)
return (out, err), proc
def mkdir(self, path, clean = False):
# Creates path (and missing parents) on the host with "mkdir -p".
# NOTE(review): no use of `clean` is visible before the mkdir below -
# confirm whether it is honoured.
return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """ Recursively removes path on the host ("rm -rf"), passing
    with_lock = True to execute. Returns execute's result.

    NOTE: path is interpolated unquoted into the shell command.
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
def run_and_wait(self, command, home,
ecodefile = "exitcode",
raise_on_error = False):
Uploads the 'command' to a bash script in the host.
Then runs the script detached in background in the host, and
busy-waits until the script finishes executing.
# A relative script path is placed inside home
if not shfile.startswith("/"):
shfile = os.path.join(home, shfile)
self.upload_command(command,
ecodefile = ecodefile,
overwrite = overwrite)
command = "bash %s" % shfile
# run command in background in remote host
(out, err), proc = self.run(command, home,
# check no errors occurred
msg = " Failed to run command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
# Wait for pid file to be generated
pid, ppid = self.wait_pid(
raise_on_error = raise_on_error)
# wait until command finishes to execute
self.wait_run(pid, ppid)
(eout, err), proc = self.check_errors(home,
ecodefile = ecodefile,
# err holds the error text from check_errors (the stderr file content
# when the exit code signals a failure)
msg = " Failed to run command '%s' " % command
self.error(msg, eout, err)
raise RuntimeError, msg
# Return the content of the stdout file together with the error text
(out, oerr), proc = self.check_output(home, stdout)
return (out, err), proc
def exitcode(self, home, ecodefile = "exitcode"):
Get the exit code of an application.
Returns an integer value with the exit code, or, when it cannot be
read, ExitCode.CORRUPTFILE (content is not an integer),
ExitCode.FILENOTFOUND ('cat' exited with 1) or ExitCode.ERROR (any
other 'cat' failure)
(out, err), proc = self.check_output(home, ecodefile)
# Succeeded to open file, return exit code in the file
return int(out.strip())
# Error in the content of the file!
return ExitCode.CORRUPTFILE
# No such file or directory
if proc.returncode == 1:
return ExitCode.FILENOTFOUND
# Other error from 'cat'
return ExitCode.ERROR
def upload_command(self, command,
ecodefile = "exitcode",
""" Saves the command as a bash script file in the remote host, and
forces to save the exit code of the command execution to the ecodefile
# A bash group "{ list; }" needs its list terminated by ';' or '&', so
# presumably a ';' is appended here when missing - confirm.
if not (command.strip().endswith(";") or command.strip().endswith("&")):
# The exit code of the command will be stored in ecodefile
command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
'ecodefile': ecodefile,
# One "export NAME=VALUE" line per environment assignment
environ = self.format_environment(env)
# Add environ to command
command = environ + command
return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """ Formats the environment variables for a command to be executed
    either inline, joined with ';' (e.g. " export A=1; export B=2;"),
    or as lines of a bash script, joined with newlines.

    env is a whitespace-separated string of NAME=VALUE assignments.
    Returns "" when env is None, empty or whitespace-only. Otherwise it
    returns one " export NAME=VALUE" entry per assignment, each followed
    by the separator.
    """
    if not env:
        return ""

    # Collapse runs of whitespace so that splitting on " " yields no empty items
    env = re.sub(r'\s+', ' ', env.strip())
    if not env:
        # Whitespace-only input would otherwise yield a bare " export" entry
        return ""

    sep = ";" if inline else "\n"
    return sep.join(" export %s" % e for e in env.split(" ")) + sep
def check_errors(self, home,
ecodefile = "exitcode",
""" Checks whether errors occurred while running a command.
It first checks the exit code for the command, and only if the
exit code is an error one it returns the error output.
Returns ("", err), proc, where err describes the error (empty when
no error was detected).
# get exit code saved in ecodefile (by default the 'exitcode' file)
ecode = self.exitcode(home, ecodefile)
if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
# The process returned an error code or didn't exist.
# Check standard error.
(err, eerr), proc = self.check_output(home, stderr)
# If the stderr file was not found, assume nothing bad happened,
# and just ignore the error.
# (cat returns 1 for error "No such file or directory")
if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
return ("", err), proc
def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
""" Waits until the pid file for the command is generated,
and returns the pid and ppid of the process """
# Reads <home>/<pidfile>; presumably polled until it yields a value
# (retry policy not visible here - confirm)
pidtuple = self.getpid(home = home, pidfile = pidfile)
# No pid could be read; raising presumably depends on raise_on_error
msg = " Failed to get pid for pidfile %s/%s " % (
raise RuntimeError, msg
923 def wait_run(self, pid, ppid, trial = 0):
924 """ wait for a remote process to finish execution """
928 status = self.status(pid, ppid)
930 if status is ProcStatus.FINISHED:
932 elif status is not ProcStatus.RUNNING:
935 # If it takes more than 20 seconds to start, then
936 # asume something went wrong
940 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of the file <home>/<filename> from the host
    by running 'cat' through execute (retry = 1, with_lock = True).

    Returns ((out, err), proc) exactly as execute reports them.
    """
    target = os.path.join(home, filename)
    (content, errors), process = self.execute("cat %s" % target,
            retry = 1, with_lock = True)
    return (content, errors), process
""" Checks if host is responsive
# The underlying SSH layer will sometimes return an empty
# output (even if the command was executed without errors).
# To work around this, repeat the operation N times or
# until the result is not empty string
(out, err), proc = self.execute("echo 'ALIVE'",
# The host is responsive when the echoed marker comes back
if out.find("ALIVE") > -1:
# NOTE(review): `err` may be unbound here if execute raised before the
# first assignment, unless it is initialized earlier - confirm.
trace = traceback.format_exc()
msg = "Unresponsive host. Error reaching host: %s " % trace
self.error(msg, out, err)
# Wait before the next attempt; the delay is capped at 30 seconds
time.sleep(min(30.0, retrydelay))
if out.find("ALIVE") > -1:
msg = "Unresponsive host. Wrong answer. "
self.error(msg, out, err)
987 """ Retrieves host home directory
989 # The underlying SSH layer will sometimes return an empty
990 # output (even if the command was executed without errors).
991 # To work arround this, repeat the operation N times or
992 # until the result is not empty string
996 (out, err), proc = self.execute("echo ${HOME}",
1001 if out.strip() != "":
1002 self._home_dir = out.strip()
1005 trace = traceback.format_exc()
1006 msg = "Impossible to retrieve HOME directory" % trace
1007 self.error(msg, out, err)
1010 time.sleep(min(30.0, retrydelay))
1013 if not self._home_dir:
1014 msg = "Impossible to retrieve HOME directory"
1015 self.error(msg, out, err)
1016 raise RuntimeError, msg
def filter_existing_files(self, src, dst):
""" Removes files that already exist in the Linux host from src list
# construct a dictionary with { dst: src }
# (several space-separated sources map to dst/<basename>; a single source
# maps to dst itself)
dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ),
src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
# One shell test per destination: echo the path if it is a regular file
for d in dests.keys():
command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
command = ";".join(command)
(out, err), proc = self.execute(command, retry = 1, with_lock = True)
# NOTE(review): out.find(d) is a substring test, so a destination that is a
# prefix of another echoed path (e.g. /a/b vs /a/bc) also matches. If
# entries are deleted inside this loop, that relies on Python 2's keys()
# returning a list copy - confirm.
for d in dests.keys():
if out.find(d) > -1:
# Remaining sources, space-separated, in the same format as src
return " ".join(dests.values())