2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
41 Error codes that the rexitcode function can return if unable to
42 check the exit code of a spawned process
51 Supported flavors of Linux OS
56 FEDORA_8 = 1 << 3 | FEDORA
57 FEDORA_12 = 1 << 4 | FEDORA
58 FEDORA_14 = 1 << 5 | FEDORA
61 class LinuxNode(ResourceManager):
63 .. class:: Class Args :
65 :param ec: The Experiment controller
66 :type ec: ExperimentController
67 :param guid: guid of the RM
72 There are different ways in which commands can be executed using the
73 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
78 * 'execute' (blocking mode) :
80 HOW IT WORKS: 'execute' forks a process and runs the
81 command, synchronously, attached to the terminal, in
83 The execute method will block until the command returns
84 the result on 'out', 'err' (so until it finishes executing).
86 USAGE: short-lived commands that must be executed attached
87 to a terminal and in foreground, for which it IS necessary
88 to block until the command has finished (e.g. if you want
89 to run 'ls' or 'cat').
91 * 'execute' (NON blocking mode - blocking = False) :
93 HOW IT WORKS: Same as before, except that execute method
94 will return immediately (even if command still running).
96 USAGE: long-lived commands that must be executed attached
97 to a terminal and in foreground, but for which it is not
98 necessary to block until the command has finished. (e.g.
99 start an application using X11 forwarding)
103 HOW IT WORKS: Connects to the host ( using SSH if remote)
104 and launches the command in background, detached from any
105 terminal (daemonized), and returns. The command continues to
106 run remotely, but since it is detached from the terminal,
107 its pipes (stdin, stdout, stderr) can't be redirected to the
108 console (as normal non detached processes would), and so they
109 are explicitly redirected to files. The pidfile is created as
110 part of the process of launching the command. The pidfile
111 holds the pid and ppid of the process forked in background,
112 so later on it is possible to check whether the command is still
115 USAGE: long-lived commands that can run detached in background,
116 for which it is NOT necessary to block (wait) until the command
117 has finished. (e.g. start an application that is not using X11
118 forwarding. It can run detached and remotely in background)
122 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123 the command has finished execution. It also checks whether
124 errors occurred during runtime by reading the exitcode file,
125 which contains the exit code of the command that was run
126 (checking stderr only is not always reliable since many
127 commands throw debugging info to stderr and the only way to
128 automatically know whether an error really happened is to
129 check the process exit code).
131 Another difference with respect to 'run', is that instead
132 of directly executing the command as a bash command line,
133 it uploads the command to a bash script and runs the script.
134 This allows to use the bash script to debug errors, since
135 it remains at the remote host and can be run manually to
138 USAGE: medium-lived commands that can run detached in
139 background, for which it IS necessary to block (wait) until
140 the command has finished. (e.g. Package installation,
141 source compilation, file download, etc)
144 _rtype = "linux::Node"
145 _help = "Controls Linux host machines ( either localhost or a host " \
146 "that can be accessed using a SSH key)"
150 def _register_attributes(cls):
151 hostname = Attribute("hostname", "Hostname of the machine",
152 flags = Flags.Design)
154 username = Attribute("username", "Local account username",
155 flags = Flags.Credential)
157 port = Attribute("port", "SSH port", flags = Flags.Design)
159 home = Attribute("home",
160 "Experiment home directory to store all experiment related files",
161 flags = Flags.Design)
163 identity = Attribute("identity", "SSH identity file",
164 flags = Flags.Credential)
166 server_key = Attribute("serverKey", "Server public key",
167 flags = Flags.Design)
169 clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170 " from node home folder before starting experiment",
173 flags = Flags.Design)
175 clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
176 " from a previous same experiment, before the new experiment starts",
179 flags = Flags.Design)
181 clean_processes = Attribute("cleanProcesses",
182 "Kill all running processes before starting experiment",
185 flags = Flags.Design)
187 tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188 "releasing the resource",
189 flags = Flags.Design)
191 gateway_user = Attribute("gatewayUser", "Gateway account username",
192 flags = Flags.Design)
194 gateway = Attribute("gateway", "Hostname of the gateway machine",
195 flags = Flags.Design)
197 ip = Attribute("ip", "Linux host public IP address. "
198 "Must not be modified by the user unless hostname is 'localhost'",
199 flags = Flags.Design)
201 cls._register_attribute(hostname)
202 cls._register_attribute(username)
203 cls._register_attribute(port)
204 cls._register_attribute(home)
205 cls._register_attribute(identity)
206 cls._register_attribute(server_key)
207 cls._register_attribute(clean_home)
208 cls._register_attribute(clean_experiment)
209 cls._register_attribute(clean_processes)
210 cls._register_attribute(tear_down)
211 cls._register_attribute(gateway_user)
212 cls._register_attribute(gateway)
213 cls._register_attribute(ip)
215 def __init__(self, ec, guid):
216 super(LinuxNode, self).__init__(ec, guid)
218 # home directory at Linux host
221 # lock to prevent concurrent applications on the same node,
222 # to execute commands at the same time. There are potential
223 # concurrency issues when using SSH to a same host from
224 # multiple threads. There are also possible operational
225 # issues, e.g. an application querying the existence
226 # of a file or folder prior to its creation, and another
227 # application creating the same file or folder in between.
228 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Returns 'msg' prefixed with the guid of this RM and the
    value of its 'hostname' attribute """
    guid = self.guid
    hostname = self.get("hostname")
    return " guid %d - host %s - %s " % (guid, hostname, msg)
236 home = self.get("home") or ""
237 if not home.startswith("/"):
238 home = os.path.join(self._home_dir, home)
243 return os.path.join(self.home_dir, ".nepi")
247 return os.path.join(self.nepi_home, "nepi-usr")
251 return os.path.join(self.usr_dir, "lib")
255 return os.path.join(self.usr_dir, "bin")
259 return os.path.join(self.usr_dir, "src")
263 return os.path.join(self.usr_dir, "share")
267 return os.path.join(self.nepi_home, "nepi-exp")
271 return os.path.join(self.exp_dir, self.ec.exp_id)
275 return os.path.join(self.exp_home, "node-%d" % self.guid)
279 return os.path.join(self.node_home, self.ec.run_id)
286 if not self.localhost and not self.get("username"):
287 msg = "Can't resolve OS, insufficient data "
289 raise RuntimeError, msg
293 if out.find("Debian") == 0:
294 self._os = OSType.DEBIAN
295 elif out.find("Ubuntu") ==0:
296 self._os = OSType.UBUNTU
297 elif out.find("Fedora release") == 0:
298 self._os = OSType.FEDORA
299 if out.find("Fedora release 8") == 0:
300 self._os = OSType.FEDORA_8
301 elif out.find("Fedora release 12") == 0:
302 self._os = OSType.FEDORA_12
303 elif out.find("Fedora release 14") == 0:
304 self._os = OSType.FEDORA_14
306 msg = "Unsupported OS"
308 raise RuntimeError, "%s - %s " %( msg, out )
313 # The underlying SSH layer will sometimes return an empty
314 # output (even if the command was executed without errors).
315 # To work around this, repeat the operation N times or
316 # until the result is not empty string
319 (out, err), proc = self.execute("cat /etc/issue",
323 trace = traceback.format_exc()
324 msg = "Error detecting OS: %s " % trace
325 self.error(msg, out, err)
331 return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
335 return (self.os & OSType.FEDORA)
339 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
341 def do_provision(self):
342 # check if host is alive
343 if not self.is_alive():
344 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
346 raise RuntimeError, msg
350 if self.get("cleanProcesses"):
351 self.clean_processes()
353 if self.get("cleanHome"):
356 if self.get("cleanExperiment"):
357 self.clean_experiment()
359 # Create shared directory structure and node home directory
360 paths = [self.lib_dir,
368 # Get Public IP address if possible
369 if not self.get("ip"):
371 ip = sshfuncs.gethostbyname(self.get("hostname"))
374 if self.get("gateway") is None:
375 msg = "Local DNS can not resolve hostname %s" % self.get("hostname")
378 super(LinuxNode, self).do_provision()
381 if self.state == ResourceState.NEW:
382 self.info("Deploying node")
386 # Node needs to wait until all associated interfaces are
387 # ready before it can finalize deployment
388 from nepi.resources.linux.interface import LinuxInterface
389 ifaces = self.get_connected(LinuxInterface.get_rtype())
391 if iface.state < ResourceState.READY:
392 self.ec.schedule(self.reschedule_delay, self.deploy)
395 super(LinuxNode, self).do_deploy()
397 def do_release(self):
398 rms = self.get_connected()
400 # Node needs to wait until all associated RMs are released
401 # before it can be released
402 if rm.state != ResourceState.RELEASED:
403 self.ec.schedule(self.reschedule_delay, self.release)
406 tear_down = self.get("tearDown")
408 self.execute(tear_down)
410 self.clean_processes()
412 super(LinuxNode, self).do_release()
414 def valid_connection(self, guid):
418 def clean_processes(self):
419 self.info("Cleaning up processes")
424 if self.get("username") != 'root':
425 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
426 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
427 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
429 if self.state >= ResourceState.READY:
431 pids = pickle.load(open("/tmp/save.proc", "rb"))
433 ps_aux = "ps aux |awk '{print $2,$11}'"
434 (out, err), proc = self.execute(ps_aux)
436 for line in out.strip().split("\n"):
437 parts = line.strip().split(" ")
438 pids_temp[parts[0]] = parts[1]
439 kill_pids = set(pids_temp.items()) - set(pids.items())
440 kill_pids = ' '.join(dict(kill_pids).keys())
442 cmd = ("killall tcpdump || /bin/true ; " +
443 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
444 "kill %s || /bin/true ; " % kill_pids)
446 cmd = ("killall tcpdump || /bin/true ; " +
447 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
449 cmd = ("killall tcpdump || /bin/true ; " +
450 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
452 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
454 def clean_home(self):
455 """ Cleans all NEPI related folders in the Linux host
457 self.info("Cleaning up home")
459 cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
462 return self.execute(cmd, with_lock = True)
464 def clean_experiment(self):
465 """ Cleans all experiment related files in the Linux host.
466 It preserves NEPI files and folders that have a multi experiment
469 self.info("Cleaning up experiment files")
471 cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
475 return self.execute(cmd, with_lock = True)
477 def execute(self, command,
483 connect_timeout = 30,
484 strict_host_checking = False,
489 """ Notice that this invocation will block until the
490 execution finishes. If this is not the desired behavior,
491 use 'run' instead."""
494 (out, err), proc = execfuncs.lexec(command,
495 user = self.get("username"), # still problem with localhost
500 # If the execute command is blocking, we don't want to keep
501 # the node lock. This lock is used to avoid race conditions
502 # when creating the ControlMaster sockets. A more elegant
503 # solution is needed.
504 with self._node_lock:
505 (out, err), proc = sshfuncs.rexec(
507 host = self.get("hostname"),
508 user = self.get("username"),
509 port = self.get("port"),
510 gwuser = self.get("gatewayUser"),
511 gw = self.get("gateway"),
514 identity = self.get("identity"),
515 server_key = self.get("serverKey"),
518 forward_x11 = forward_x11,
520 connect_timeout = connect_timeout,
521 persistent = persistent,
523 strict_host_checking = strict_host_checking
526 (out, err), proc = sshfuncs.rexec(
528 host = self.get("hostname"),
529 user = self.get("username"),
530 port = self.get("port"),
531 gwuser = self.get("gatewayUser"),
532 gw = self.get("gateway"),
535 identity = self.get("identity"),
536 server_key = self.get("serverKey"),
539 forward_x11 = forward_x11,
541 connect_timeout = connect_timeout,
542 persistent = persistent,
544 strict_host_checking = strict_host_checking
547 return (out, err), proc
549 def run(self, command, home,
557 strict_host_checking = False):
559 self.debug("Running command '%s'" % command)
562 (out, err), proc = execfuncs.lspawn(command, pidfile,
564 create_home = create_home,
565 stdin = stdin or '/dev/null',
566 stdout = stdout or '/dev/null',
567 stderr = stderr or '/dev/null',
570 with self._node_lock:
571 (out, err), proc = sshfuncs.rspawn(
575 create_home = create_home,
576 stdin = stdin or '/dev/null',
577 stdout = stdout or '/dev/null',
578 stderr = stderr or '/dev/null',
580 host = self.get("hostname"),
581 user = self.get("username"),
582 port = self.get("port"),
583 gwuser = self.get("gatewayUser"),
584 gw = self.get("gateway"),
586 identity = self.get("identity"),
587 server_key = self.get("serverKey"),
589 strict_host_checking = strict_host_checking
592 return (out, err), proc
594 def getpid(self, home, pidfile = "pidfile"):
596 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
598 with self._node_lock:
599 pidtuple = sshfuncs.rgetpid(
600 os.path.join(home, pidfile),
601 host = self.get("hostname"),
602 user = self.get("username"),
603 port = self.get("port"),
604 gwuser = self.get("gatewayUser"),
605 gw = self.get("gateway"),
607 identity = self.get("identity"),
608 server_key = self.get("serverKey"),
609 strict_host_checking = False
614 def status(self, pid, ppid):
616 status = execfuncs.lstatus(pid, ppid)
618 with self._node_lock:
619 status = sshfuncs.rstatus(
621 host = self.get("hostname"),
622 user = self.get("username"),
623 port = self.get("port"),
624 gwuser = self.get("gatewayUser"),
625 gw = self.get("gateway"),
627 identity = self.get("identity"),
628 server_key = self.get("serverKey"),
629 strict_host_checking = False
634 def kill(self, pid, ppid, sudo = False):
637 status = self.status(pid, ppid)
639 if status == sshfuncs.ProcStatus.RUNNING:
641 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
643 with self._node_lock:
644 (out, err), proc = sshfuncs.rkill(
646 host = self.get("hostname"),
647 user = self.get("username"),
648 port = self.get("port"),
649 gwuser = self.get("gatewayUser"),
650 gw = self.get("gateway"),
653 identity = self.get("identity"),
654 server_key = self.get("serverKey"),
655 strict_host_checking = False
658 return (out, err), proc
660 def copy(self, src, dst):
662 (out, err), proc = execfuncs.lcopy(src, dst,
665 with self._node_lock:
666 (out, err), proc = sshfuncs.rcopy(
668 port = self.get("port"),
669 gwuser = self.get("gatewayUser"),
670 gw = self.get("gateway"),
671 identity = self.get("identity"),
672 server_key = self.get("serverKey"),
674 strict_host_checking = False)
676 return (out, err), proc
678 def upload(self, src, dst, text = False, overwrite = True,
679 raise_on_error = True):
680 """ Copy content to destination
682 src string with the content to copy. Can be:
684 - a string with the path to a local file
685 - a string with a semi-colon separated list of local files
686 - a string with a local directory
688 dst string with destination path on the remote host (remote is
691 text src is text input, it must be stored into a temp file before
694 # If source is a string input
696 if text and not os.path.isfile(src):
697 # src is text input that should be uploaded as file
698 # create a temporal file with the content to upload
699 f = tempfile.NamedTemporaryFile(delete=False)
704 # If dst files should not be overwritten, check that the files do not
706 if isinstance(src, str):
707 src = map(str.strip, src.split(";"))
709 if overwrite == False:
710 src = self.filter_existing_files(src, dst)
712 return ("", ""), None
714 if not self.localhost:
715 # Build destination as <user>@<server>:<path>
716 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
718 ((out, err), proc) = self.copy(src, dst)
725 msg = " Failed to upload files - src: %s dst: %s" % (";".join(src), dst)
726 self.error(msg, out, err)
728 msg = "%s out: %s err: %s" % (msg, out, err)
730 raise RuntimeError, msg
732 return ((out, err), proc)
734 def download(self, src, dst, raise_on_error = True):
735 if not self.localhost:
736 # Build destination as <user>@<server>:<path>
737 src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
739 ((out, err), proc) = self.copy(src, dst)
742 msg = " Failed to download files - src: %s dst: %s" % (";".join(src), dst)
743 self.error(msg, out, err)
746 raise RuntimeError, msg
748 return ((out, err), proc)
750 def install_packages_command(self, packages):
753 command = rpmfuncs.install_packages_command(self.os, packages)
755 command = debfuncs.install_packages_command(self.os, packages)
757 msg = "Error installing packages ( OS not known ) "
758 self.error(msg, self.os)
759 raise RuntimeError, msg
763 def install_packages(self, packages, home, run_home = None,
764 raise_on_error = True):
765 """ Install packages in the Linux host.
767 'home' is the directory to upload the package installation script.
768 'run_home' is the directory from where to execute the script.
770 command = self.install_packages_command(packages)
772 run_home = run_home or home
774 (out, err), proc = self.run_and_wait(command, run_home,
775 shfile = os.path.join(home, "instpkg.sh"),
776 pidfile = "instpkg_pidfile",
777 ecodefile = "instpkg_exitcode",
778 stdout = "instpkg_stdout",
779 stderr = "instpkg_stderr",
781 raise_on_error = raise_on_error)
783 return (out, err), proc
785 def remove_packages(self, packages, home, run_home = None,
786 raise_on_error = True):
787 """ Uninstall packages from the Linux host.
789 'home' is the directory to upload the package un-installation script.
790 'run_home' is the directory from where to execute the script.
793 command = rpmfuncs.remove_packages_command(self.os, packages)
795 command = debfuncs.remove_packages_command(self.os, packages)
797 msg = "Error removing packages ( OS not known ) "
799 raise RuntimeError, msg
801 run_home = run_home or home
803 (out, err), proc = self.run_and_wait(command, run_home,
804 shfile = os.path.join(home, "rmpkg.sh"),
805 pidfile = "rmpkg_pidfile",
806 ecodefile = "rmpkg_exitcode",
807 stdout = "rmpkg_stdout",
808 stderr = "rmpkg_stderr",
810 raise_on_error = raise_on_error)
812 return (out, err), proc
814 def mkdir(self, paths, clean = False):
815 """ Paths is either a single remote directory path to create,
816 or a list of directories to create.
821 if isinstance(paths, str):
824 cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
826 return self.execute(cmd, with_lock = True)
828 def rmdir(self, paths):
829 """ Paths is either a single remote directory path to delete,
830 or a list of directories to delete.
833 if isinstance(paths, str):
836 cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
838 return self.execute(cmd, with_lock = True)
840 def run_and_wait(self, command, home,
846 ecodefile="exitcode",
852 raise_on_error=True):
854 Uploads the 'command' to a bash script in the host.
855 Then runs the script detached in background in the host, and
856 busy-waits until the script finishes executing.
859 if not shfile.startswith("/"):
860 shfile = os.path.join(home, shfile)
862 self.upload_command(command,
864 ecodefile = ecodefile,
866 overwrite = overwrite)
868 command = "bash %s" % shfile
869 # run command in background in remote host
870 (out, err), proc = self.run(command, home,
878 # check no errors occurred
880 msg = " Failed to run command '%s' " % command
881 self.error(msg, out, err)
883 raise RuntimeError, msg
885 # Wait for pid file to be generated
886 pid, ppid = self.wait_pid(
889 raise_on_error = raise_on_error)
892 # wait until command finishes to execute
893 self.wait_run(pid, ppid)
895 (eout, err), proc = self.check_errors(home,
896 ecodefile = ecodefile,
899 # Out is what was written in the stderr file
901 msg = " Failed to run command '%s' " % command
902 self.error(msg, eout, err)
905 raise RuntimeError, msg
907 (out, oerr), proc = self.check_output(home, stdout)
909 return (out, err), proc
911 def exitcode(self, home, ecodefile = "exitcode"):
913 Get the exit code of an application.
914 Returns an integer value with the exit code
916 (out, err), proc = self.check_output(home, ecodefile)
918 # Succeeded to open file, return exit code in the file
921 return int(out.strip())
923 # Error in the content of the file!
924 return ExitCode.CORRUPTFILE
926 # No such file or directory
927 if proc.returncode == 1:
928 return ExitCode.FILENOTFOUND
930 # Other error from 'cat'
931 return ExitCode.ERROR
933 def upload_command(self, command,
935 ecodefile="exitcode",
938 """ Saves the command as a bash script file in the remote host, and
939 forces to save the exit code of the command execution to the ecodefile
942 if not (command.strip().endswith(";") or command.strip().endswith("&")):
945 # The exit code of the command will be stored in ecodefile
946 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
948 'ecodefile': ecodefile,
952 environ = self.format_environment(env)
954 # Add environ to command
955 command = environ + command
957 return self.upload(command, shfile, text=True, overwrite=overwrite)
def format_environment(self, env, inline=False):
    """ Builds the 'export' statements for the variable definitions
    given in 'env' (a string of white space separated definitions).

    If inline is True the statements are separated by ';', to be
    prepended to an inline command
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py).
    Otherwise they are separated by new lines, to be written at the
    beginning of a bash script.

    Returns an empty string when 'env' is empty.
    """
    if not env:
        return ""

    # Collapse every run of white space into a single blank, so that
    # splitting on " " yields exactly one entry per definition
    normalized = re.sub(r'\s+', ' ', env.strip())

    if inline:
        sep = ";"
    else:
        sep = "\n"

    exports = [" export %s" % definition
            for definition in normalized.split(" ")]

    # Every statement, including the last one, is followed by the separator
    return sep.join(exports) + sep
973 def check_errors(self, home,
974 ecodefile = "exitcode",
976 """ Checks whether errors occurred while running a command.
977 It first checks the exit code for the command, and only if the
978 exit code is an error one it returns the error output.
984 # get exit code saved in the 'exitcode' file
985 ecode = self.exitcode(home, ecodefile)
987 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
988 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
989 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
990 # The process returned an error code or didn't exist.
991 # Check standard error.
992 (err, eerr), proc = self.check_output(home, stderr)
994 # If the stderr file was not found, assume nothing bad happened,
995 # and just ignore the error.
996 # (cat returns 1 for error "No such file or directory")
997 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1000 return ("", err), proc
1002 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1003 """ Waits until the pid file for the command is generated,
1004 and returns the pid and ppid of the process """
1009 pidtuple = self.getpid(home = home, pidfile = pidfile)
1012 pid, ppid = pidtuple
1018 msg = " Failed to get pid for pidfile %s/%s " % (
1023 raise RuntimeError, msg
1027 def wait_run(self, pid, ppid, trial = 0):
1028 """ wait for a remote process to finish execution """
1032 status = self.status(pid, ppid)
1034 if status is ProcStatus.FINISHED:
1036 elif status is not ProcStatus.RUNNING:
1039 # If it takes more than 20 seconds to start, then
1040 # assume something went wrong
1044 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of the file 'filename' located in the
    directory 'home', by running 'cat' on it through 'execute'.

    Returns the ((out, err), proc) result of the 'execute' invocation.
    """
    path = os.path.join(home, filename)
    command = "cat %s" % path

    (content, errors), proc = self.execute(command, retry = 1,
            with_lock = True)

    return (content, errors), proc
1054 """ Checks if host is responsive
1060 msg = "Unresponsive host. Wrong answer. "
1062 # The underlying SSH layer will sometimes return an empty
1063 # output (even if the command was executed without errors).
1064 # To work around this, repeat the operation N times or
1065 # until the result is not empty string
1067 (out, err), proc = self.execute("echo 'ALIVE'",
1071 if out.find("ALIVE") > -1:
1074 trace = traceback.format_exc()
1075 msg = "Unresponsive host. Error reaching host: %s " % trace
1077 self.error(msg, out, err)
1080 def find_home(self):
1081 """ Retrieves host home directory
1083 # The underlying SSH layer will sometimes return an empty
1084 # output (even if the command was executed without errors).
1085 # To work around this, repeat the operation N times or
1086 # until the result is not empty string
1087 msg = "Impossible to retrieve HOME directory"
1089 (out, err), proc = self.execute("echo ${HOME}",
1093 if out.strip() != "":
1094 self._home_dir = out.strip()
1096 trace = traceback.format_exc()
1097 msg = "Impossible to retrieve HOME directory %s" % trace
1099 if not self._home_dir:
1101 raise RuntimeError, msg
1103 def filter_existing_files(self, src, dst):
1104 """ Removes files that already exist in the Linux host from src list
1106 # construct a dictionary with { dst: src }
1107 dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1108 if len(src) > 1 else dict({dst: src[0]})
1111 for d in dests.keys():
1112 command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1114 command = ";".join(command)
1116 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1118 for d in dests.keys():
1119 if out.find(d) > -1:
1125 return dests.values()