2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy,
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!!
40 Error codes that the rexitcode function can return if unable to
41 check the exit code of a spawned process
50 Supported flavors of Linux OS
# Shift binds tighter than '|', so each value is (a release-specific bit) | FEDORA;
# masking any of them with "& FEDORA" therefore gives back FEDORA itself.
55 FEDORA_8 = 1 << 3 | FEDORA
56 FEDORA_12 = 1 << 4 | FEDORA
57 FEDORA_14 = 1 << 5 | FEDORA
60 class LinuxNode(ResourceManager):
62 .. class:: Class Args :
64 :param ec: The Experiment controller
65 :type ec: ExperimentController
66 :param guid: guid of the RM
71 There are different ways in which commands can be executed using the
72 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
77 * 'execute' (blocking mode) :
79 HOW IT WORKS: 'execute' forks a process and runs the
80 command, synchronously, attached to the terminal, in
82 The execute method will block until the command returns
83 the result on 'out', 'err' (so until it finishes executing).
85 USAGE: short-lived commands that must be executed attached
86 to a terminal and in foreground, for which it IS necessary
87 to block until the command has finished (e.g. if you want
88 to run 'ls' or 'cat').
90 * 'execute' (NON blocking mode - blocking = False) :
92 HOW IT WORKS: Same as before, except that execute method
93 will return immediately (even if command still running).
95 USAGE: long-lived commands that must be executed attached
96 to a terminal and in foreground, but for which it is not
97 necessary to block until the command has finished. (e.g.
98 start an application using X11 forwarding)
102 HOW IT WORKS: Connects to the host (using SSH if remote)
103 and launches the command in background, detached from any
104 terminal (daemonized), and returns. The command continues to
105 run remotely, but since it is detached from the terminal,
106 its pipes (stdin, stdout, stderr) can't be redirected to the
107 console (as normal non detached processes would), and so they
108 are explicitly redirected to files. The pidfile is created as
109 part of the process of launching the command. The pidfile
110 holds the pid and ppid of the process forked in background,
111 so later on it is possible to check whether the command is still
114 USAGE: long-lived commands that can run detached in background,
115 for which it is NOT necessary to block (wait) until the command
116 has finished. (e.g. start an application that is not using X11
117 forwarding. It can run detached and remotely in background)
121 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122 the command has finished execution. It also checks whether
123 errors occurred during runtime by reading the exitcode file,
124 which contains the exit code of the command that was run
125 (checking stderr only is not always reliable since many
126 commands throw debugging info to stderr and the only way to
127 automatically know whether an error really happened is to
128 check the process exit code).
130 Another difference with respect to 'run', is that instead
131 of directly executing the command as a bash command line,
132 it uploads the command to a bash script and runs the script.
133 This allows to use the bash script to debug errors, since
134 it remains at the remote host and can be run manually to
137 USAGE: medium-lived commands that can run detached in
138 background, for which it IS necessary to block (wait) until
139 the command has finished. (e.g. Package installation,
140 source compilation, file download, etc)
143 _rtype = "linux::Node"
144 _help = "Controls Linux host machines ( either localhost or a host " \
145 "that can be accessed using a SSH key)"
# Registers this resource type's attributes on the class. Every registration
# visible here pairs a help text with either Flags.Design or Flags.Credential.
149 def _register_attributes(cls):
150 cls._register_attribute(Attribute(
151 "hostname", "Hostname of the machine",
152 flags = Flags.Design))
# Credential-flagged attributes in this method: "username" and "identity".
154 cls._register_attribute(Attribute(
155 "username", "Local account username",
156 flags = Flags.Credential))
158 cls._register_attribute(Attribute(
160 flags = Flags.Design))
162 cls._register_attribute(Attribute(
164 "Experiment home directory to store all experiment related files",
165 flags = Flags.Design))
167 cls._register_attribute(Attribute(
168 "identity", "SSH identity file",
169 flags = Flags.Credential))
171 cls._register_attribute(Attribute(
172 "serverKey", "Server public key",
173 flags = Flags.Design))
# The clean* attributes below are the ones do_provision / do_release consult
# before running the corresponding clean_* methods.
175 cls._register_attribute(Attribute(
177 "Remove all nepi files and directories "
178 " from node home folder before starting experiment",
181 flags = Flags.Design))
183 cls._register_attribute(Attribute(
184 "cleanExperiment", "Remove all files and directories "
185 " from a previous same experiment, before the new experiment starts",
188 flags = Flags.Design))
190 cls._register_attribute(Attribute(
192 "Kill all running processes before starting experiment",
195 flags = Flags.Design))
197 cls._register_attribute(Attribute(
198 "cleanProcessesAfter",
199 """Kill all running processes after starting experiment
200 This might be dangerous when using user root""",
203 flags = Flags.Design))
205 cls._register_attribute(Attribute(
207 "Bash script to be executed before releasing the resource",
208 flags = Flags.Design))
210 cls._register_attribute(Attribute(
212 "Gateway account username",
213 flags = Flags.Design))
215 cls._register_attribute(Attribute(
217 "Hostname of the gateway machine",
218 flags = Flags.Design))
220 cls._register_attribute(Attribute(
222 "Linux host public IP address. "
223 "Must not be modified by the user unless hostname is 'localhost'",
224 flags = Flags.Design))
# Forwards ec and guid to the parent constructor, then sets up per-node state,
# including the lock that serialises command execution on this node.
226 def __init__(self, ec, guid):
227 super(LinuxNode, self).__init__(ec, guid)
229 # home directory at Linux host
232 # lock to prevent concurrent applications on the same node,
233 # to execute commands at the same time. There are potential
234 # concurrency issues when using SSH to a same host from
235 # multiple threads. There are also possible operational
236 # issues, e.g. an application querying the existence
237 # of a file or folder prior to its creation, and another
238 # application creating the same file or folder in between.
239 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Prefix a log message with this resource's guid and hostname.

    Returns the string " guid <guid> - host <hostname> - <msg> ".
    """
    guid = self.guid
    hostname = self.get("hostname")
    template = " guid {} - host {} - {} "
    return template.format(guid, hostname, msg)
# Value of the "home" attribute, falling back to the empty string when unset.
247 home = self.get("home") or ""
# A value that does not start with "/" is resolved against self._home_dir.
248 if not home.startswith("/"):
249 home = os.path.join(self._home_dir, home)
# Each return below appends one fixed component to a parent directory attribute.
# NOTE(review): presumably each one is the body of the property its parent
# references suggest (nepi_home, usr_dir, lib_dir, ...), giving the layout
# <home_dir>/.nepi/nepi-usr/{lib,bin,src,share} and
# <home_dir>/.nepi/nepi-exp/<exp_id>/node-<guid>/<run_id> -- confirm against the
# property headers.
254 return os.path.join(self.home_dir, ".nepi")
258 return os.path.join(self.nepi_home, "nepi-usr")
262 return os.path.join(self.usr_dir, "lib")
266 return os.path.join(self.usr_dir, "bin")
270 return os.path.join(self.usr_dir, "src")
274 return os.path.join(self.usr_dir, "share")
278 return os.path.join(self.nepi_home, "nepi-exp")
282 return os.path.join(self.exp_dir, self.ec.exp_id)
286 return os.path.join(self.exp_home, "node-{}".format(self.guid))
290 return os.path.join(self.node_home, self.ec.run_id)
# A non-local host with no "username" configured is reported as insufficient data.
297 if not self.localhost and not self.get("username"):
298 msg = "Can't resolve OS, insufficient data "
# NOTE(review): "raise X, msg" is Python 2-only statement syntax; the call form
# used by the other raise in this block is also valid on Python 3.
300 raise RuntimeError, msg
# find(...) == 0 is a prefix test: the OS flavor is chosen from how `out` begins.
304 if out.find("Debian") == 0:
305 self._os = OSType.DEBIAN
306 elif out.find("Ubuntu") ==0:
307 self._os = OSType.UBUNTU
308 elif out.find("Fedora release") == 0:
# Generic Fedora first, then narrowed to a specific release when one matches.
309 self._os = OSType.FEDORA
310 if out.find("Fedora release 8") == 0:
311 self._os = OSType.FEDORA_8
312 elif out.find("Fedora release 12") == 0:
313 self._os = OSType.FEDORA_12
314 elif out.find("Fedora release 14") == 0:
315 self._os = OSType.FEDORA_14
# The unrecognised output is included in the exception text.
317 msg = "Unsupported OS"
319 raise RuntimeError("{} - {} ".format(msg, out))
324 # The underlying SSH layer will sometimes return an empty
325 # output (even if the command was executed without errors).
326 # To work arround this, repeat the operation N times or
327 # until the result is not empty string
# The OS banner is read from /etc/issue on the host via self.execute.
330 (out, err), proc = self.execute("cat /etc/issue",
# Failure path: the formatted traceback is folded into the logged error message.
334 trace = traceback.format_exc()
335 msg = "Error detecting OS: {} ".format(trace)
336 self.error(msg, out, err)
# Bit test: non-zero when the detected OS has the DEBIAN or UBUNTU bit set.
342 return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
# Bit test: non-zero when the detected OS has the FEDORA bit set.
346 return (self.os & OSType.FEDORA)
# True when the "hostname" attribute is one of the listed loopback names/addresses.
350 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
# Provisioning step: checks that the host answers, runs the clean-up actions
# selected by the cleanProcesses / cleanHome / cleanExperiment attributes, builds
# the directory list to create, looks up the public IP when "ip" is unset, and
# finally delegates to the parent class.
352 def do_provision(self):
353 # check if host is alive
354 if not self.is_alive():
# NOTE(review): no except clause is visible around this call; format_exc() with
# no exception being handled only yields a "None" placeholder -- confirm intent.
355 trace = traceback.format_exc()
356 msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
# NOTE(review): "raise X, msg" is Python 2-only syntax.
358 raise RuntimeError, msg
362 if self.get("cleanProcesses"):
363 self.clean_processes()
365 if self.get("cleanHome"):
368 if self.get("cleanExperiment"):
369 self.clean_experiment()
371 # Create shared directory structure and node home directory
372 paths = [self.lib_dir,
380 # Get Public IP address if possible
381 if not self.get("ip"):
383 ip = sshfuncs.gethostbyname(self.get("hostname"))
# The resolution-failure message is only prepared when no gateway is configured.
386 if self.get("gateway") is None:
387 msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
390 super(LinuxNode, self).do_provision()
# A node still in state NEW announces the deployment.
393 if self.state == ResourceState.NEW:
394 self.info("Deploying node")
398 # Node needs to wait until all associated interfaces are
399 # ready before it can finalize deployment
# Imported inside the method rather than at module level.
# NOTE(review): presumably to avoid a circular import -- confirm.
400 from nepi.resources.linux.interface import LinuxInterface
401 ifaces = self.get_connected(LinuxInterface.get_rtype())
# An interface that has not reached READY causes deploy to be scheduled again
# after self.reschedule_delay.
403 if iface.state < ResourceState.READY:
404 self.ec.schedule(self.reschedule_delay, self.deploy)
407 super(LinuxNode, self).do_deploy()
# Release step: waits for connected RMs, runs the optional "tearDown" script,
# optionally kills leftover processes, then delegates to the parent class.
409 def do_release(self):
410 rms = self.get_connected()
412 # Node needs to wait until all associated RMs are released
413 # before it can be released
# A connected RM that is not yet RELEASED causes release to be scheduled again
# after self.reschedule_delay.
414 if rm.state != ResourceState.RELEASED:
415 self.ec.schedule(self.reschedule_delay, self.release)
# The "tearDown" attribute value is executed as a command on the node.
418 tear_down = self.get("tearDown")
420 self.execute(tear_down)
422 if self.get("cleanProcessesAfter"):
423 self.clean_processes()
425 super(LinuxNode, self).do_release()
# NOTE(review): presumably decides whether the RM identified by `guid` may be
# connected to this node -- confirm against the method body.
427 def valid_connection(self, guid):
# Builds one shell command that kills tcpdump and NEPI-related processes (plus
# further processes, depending on the user and node state) and executes it on
# the node. Every sub-command ends in "|| /bin/true" so a failing kill does not
# fail the whole command line.
431 def clean_processes(self):
432 self.info("Cleaning up processes")
# Users other than root: go through sudo, force-kill (-9) whatever matches
# '.nepi' in the ps output, and killall every process owned by that username.
# The '[.]nepi' pattern keeps grep from matching its own command line.
437 if self.get("username") != 'root':
438 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
439 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
440 "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
442 if self.state >= ResourceState.READY:
# NOTE(review): the file object is never closed, and unpickling a file under
# /tmp is unsafe if another user can write it -- confirm who produces
# /tmp/save.proc and whether it is read locally or should come from the node.
444 pids = pickle.load(open("/tmp/save.proc", "rb"))
# "<pid> <command>" for every process: columns 2 and 11 of the ps aux output.
446 ps_aux = "ps aux |awk '{print $2,$11}'"
447 (out, err), proc = self.execute(ps_aux)
449 for line in out.strip().split("\n"):
450 parts = line.strip().split(" ")
451 pids_temp[parts[0]] = parts[1]
# (pid, command) pairs that exist now but not in the saved snapshot are the ones
# to kill; only their pids go into the command line.
452 kill_pids = set(pids_temp.items()) - set(pids.items())
453 kill_pids = ' '.join(dict(kill_pids).keys())
455 cmd = ("killall tcpdump || /bin/true ; " +
456 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
457 "kill {} || /bin/true ; ".format(kill_pids))
# The two assignments below build the same command without the extra pid list.
459 cmd = ("killall tcpdump || /bin/true ; " +
460 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
462 cmd = ("killall tcpdump || /bin/true ; " +
463 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
465 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
def clean_home(self):
    """ Remove the NEPI working directory (.nepi) from the node home folder.

    Changes into home_dir on the host and deletes the entry named '.nepi'
    found directly inside it.

    Returns whatever self.execute returns for the clean-up command.
    """
    self.info("Cleaning up home")

    # The backslash is doubled so the shell still receives '\.nepi' without
    # relying on Python tolerating the invalid escape sequence '\.' in a
    # non-raw string literal. The resulting command text is unchanged.
    cmd = "cd {} ; find . -maxdepth 1 -name \\.nepi -execdir rm -rf {{}} + "\
        .format(self.home_dir)

    return self.execute(cmd, with_lock = True)
def clean_experiment(self):
    """ Remove files left on the host by a previous run of this experiment.

    Only the entry named after the experiment id, directly under exp_dir,
    is deleted; other NEPI files and folders are left in place.

    Returns whatever self.execute returns for the clean-up command.
    """
    self.info("Cleaning up experiment files")

    template = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "
    command = template.format(self.exp_dir, self.ec.exp_id)

    return self.execute(command, with_lock = True)
# Runs `command` for this node and returns ((out, err), proc). Two back ends are
# visible: execfuncs.lexec, and sshfuncs.rexec fed with the node's SSH-related
# attributes (hostname, username, port, gateway, identity, serverKey).
# NOTE(review): presumably lexec is the localhost path -- confirm.
489 def execute(self, command,
495 connect_timeout = 30,
496 strict_host_checking = False,
501 """ Notice that this invocation will block until the
502 execution finishes. If this is not the desired behavior,
503 use 'run' instead."""
506 (out, err), proc = execfuncs.lexec(
508 user = self.get("username"), # still problem with localhost
513 # If the execute command is blocking, we don't want to keep
514 # the node lock. This lock is used to avoid race conditions
515 # when creating the ControlMaster sockets. A more elegant
516 # solution is needed.
# First rexec call: performed while holding self._node_lock.
517 with self._node_lock:
518 (out, err), proc = sshfuncs.rexec(
520 host = self.get("hostname"),
521 user = self.get("username"),
522 port = self.get("port"),
523 gwuser = self.get("gatewayUser"),
524 gw = self.get("gateway"),
527 identity = self.get("identity"),
528 server_key = self.get("serverKey"),
531 forward_x11 = forward_x11,
533 connect_timeout = connect_timeout,
534 persistent = persistent,
536 strict_host_checking = strict_host_checking
# Second rexec call: same visible arguments, issued without taking the lock.
539 (out, err), proc = sshfuncs.rexec(
541 host = self.get("hostname"),
542 user = self.get("username"),
543 port = self.get("port"),
544 gwuser = self.get("gatewayUser"),
545 gw = self.get("gateway"),
548 identity = self.get("identity"),
549 server_key = self.get("serverKey"),
552 forward_x11 = forward_x11,
554 connect_timeout = connect_timeout,
555 persistent = persistent,
557 strict_host_checking = strict_host_checking
560 return (out, err), proc
# Spawns `command` with `home` as its working location and returns
# ((out, err), proc). Two back ends are visible: execfuncs.lspawn, and
# sshfuncs.rspawn called under the node lock with the node's SSH attributes.
562 def run(self, command, home,
570 strict_host_checking = False):
572 self.debug("Running command '{}'".format(command))
575 (out, err), proc = execfuncs.lspawn(
578 create_home = create_home,
# Any stream left unset (falsy) is pointed at /dev/null.
579 stdin = stdin or '/dev/null',
580 stdout = stdout or '/dev/null',
581 stderr = stderr or '/dev/null',
584 with self._node_lock:
585 (out, err), proc = sshfuncs.rspawn(
589 create_home = create_home,
590 stdin = stdin or '/dev/null',
591 stdout = stdout or '/dev/null',
592 stderr = stderr or '/dev/null',
594 host = self.get("hostname"),
595 user = self.get("username"),
596 port = self.get("port"),
597 gwuser = self.get("gatewayUser"),
598 gw = self.get("gateway"),
600 identity = self.get("identity"),
601 server_key = self.get("serverKey"),
603 strict_host_checking = strict_host_checking
606 return (out, err), proc
# Reads the pid information stored in <home>/<pidfile>, either with
# execfuncs.lgetpid or, under the node lock, with sshfuncs.rgetpid.
608 def getpid(self, home, pidfile = "pidfile"):
610 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
612 with self._node_lock:
613 pidtuple = sshfuncs.rgetpid(
614 os.path.join(home, pidfile),
615 host = self.get("hostname"),
616 user = self.get("username"),
617 port = self.get("port"),
618 gwuser = self.get("gatewayUser"),
619 gw = self.get("gateway"),
621 identity = self.get("identity"),
622 server_key = self.get("serverKey"),
# Strict host checking is hard-coded off for this call.
623 strict_host_checking = False
# Queries the state of the process identified by pid/ppid, either with
# execfuncs.lstatus or, under the node lock, with sshfuncs.rstatus.
628 def status(self, pid, ppid):
630 status = execfuncs.lstatus(pid, ppid)
632 with self._node_lock:
633 status = sshfuncs.rstatus(
635 host = self.get("hostname"),
636 user = self.get("username"),
637 port = self.get("port"),
638 gwuser = self.get("gatewayUser"),
639 gw = self.get("gateway"),
641 identity = self.get("identity"),
642 server_key = self.get("serverKey"),
# Strict host checking is hard-coded off for this call.
643 strict_host_checking = False
# Kills the process identified by pid/ppid and returns ((out, err), proc).
648 def kill(self, pid, ppid, sudo = False):
# The current state is looked up first; the kill calls below sit behind the
# RUNNING check.
651 status = self.status(pid, ppid)
653 if status == sshfuncs.ProcStatus.RUNNING:
655 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
657 with self._node_lock:
658 (out, err), proc = sshfuncs.rkill(
660 host = self.get("hostname"),
661 user = self.get("username"),
662 port = self.get("port"),
663 gwuser = self.get("gatewayUser"),
664 gw = self.get("gateway"),
667 identity = self.get("identity"),
668 server_key = self.get("serverKey"),
# Strict host checking is hard-coded off for this call.
669 strict_host_checking = False
672 return (out, err), proc
# Copies src to dst and returns ((out, err), proc), using execfuncs.lcopy or,
# under the node lock, sshfuncs.rcopy with the node's SSH attributes.
674 def copy(self, src, dst):
676 (out, err), proc = execfuncs.lcopy(
680 with self._node_lock:
681 (out, err), proc = sshfuncs.rcopy(
683 port = self.get("port"),
684 gwuser = self.get("gatewayUser"),
685 gw = self.get("gateway"),
686 identity = self.get("identity"),
687 server_key = self.get("serverKey"),
# Strict host checking is hard-coded off for this call.
689 strict_host_checking = False)
691 return (out, err), proc
# Uploads local content (files, a directory, or literal text) to `dst` on the
# node via self.copy and returns ((out, err), proc).
693 def upload(self, src, dst, text = False, overwrite = True,
694 raise_on_error = True):
695 """ Copy content to destination
697 src string with the content to copy. Can be:
699 - a string with the path to a local file
700 - a string with a semi-colon separeted list of local files
701 - a string with a local directory
703 dst string with destination path on the remote host (remote is
706 text src is text input, it must be stored into a temp file before
709 # If source is a string input
711 if text and not os.path.isfile(src):
712 # src is text input that should be uploaded as file
713 # create a temporal file with the content to upload
# delete=False keeps the temporary file on disk after it is closed.
# NOTE(review): confirm that it is removed once the upload has finished.
714 f = tempfile.NamedTemporaryFile(delete=False)
719 # If dst files should not be overwritten, check that the files do not
# A string source is split on ";" into individual, whitespace-stripped entries.
# NOTE(review): map() returns a list on Python 2 but a one-shot iterator on
# Python 3 -- relevant if src is consumed more than once afterwards.
721 if isinstance(src, str):
722 src = map(str.strip, src.split(";"))
724 if overwrite == False:
725 src = self.filter_existing_files(src, dst)
727 return ("", ""), None
729 if not self.localhost:
730 # Build destination as <user>@<server>:<path>
731 dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
733 ((out, err), proc) = self.copy(src, dst)
740 msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
741 self.error(msg, out, err)
743 msg = "{} out: {} err: {}".format(msg, out, err)
# NOTE(review): "raise X, msg" is Python 2-only syntax.
745 raise RuntimeError, msg
747 return ((out, err), proc)
# Copies `src` from the node to the local path `dst` via self.copy and returns
# ((out, err), proc).
749 def download(self, src, dst, raise_on_error = True):
750 if not self.localhost:
751 # Build destination as <user>@<server>:<path>
# Here it is the source, not the destination, that gets the user@host: prefix.
752 src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
754 ((out, err), proc) = self.copy(src, dst)
# NOTE(review): after the rewrite above src is a single str, so ";".join(src)
# puts a ";" between every character of the path in this message -- confirm
# whether callers ever pass a list here.
757 msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst)
758 self.error(msg, out, err)
# NOTE(review): "raise X, msg" is Python 2-only syntax.
761 raise RuntimeError, msg
763 return ((out, err), proc)
# Builds the package-installation command for `packages`, delegating to
# rpmfuncs or debfuncs with the detected OS; an unknown OS is logged and raised.
# NOTE(review): presumably the rpm/deb choice follows use_rpm / use_deb -- confirm.
765 def install_packages_command(self, packages):
768 command = rpmfuncs.install_packages_command(self.os, packages)
770 command = debfuncs.install_packages_command(self.os, packages)
772 msg = "Error installing packages ( OS not known ) "
773 self.error(msg, self.os)
# NOTE(review): "raise X, msg" is Python 2-only syntax.
774 raise RuntimeError, msg
# Installs `packages` by handing the command from install_packages_command to
# run_and_wait, and returns its ((out, err), proc) result.
778 def install_packages(self, packages, home,
780 raise_on_error = True):
781 """ Install packages in the Linux host.
783 'home' is the directory to upload the package installation script.
784 'run_home' is the directory from where to execute the script.
786 command = self.install_packages_command(packages)
# The script runs from `home` unless a separate run_home was given.
788 run_home = run_home or home
# Dedicated instpkg_* file names are passed for the script's pid, exit code,
# stdout and stderr files.
790 (out, err), proc = self.run_and_wait(command, run_home,
791 shfile = os.path.join(home, "instpkg.sh"),
792 pidfile = "instpkg_pidfile",
793 ecodefile = "instpkg_exitcode",
794 stdout = "instpkg_stdout",
795 stderr = "instpkg_stderr",
797 raise_on_error = raise_on_error)
799 return (out, err), proc
# Uninstalls `packages`: builds the removal command with rpmfuncs or debfuncs
# for the detected OS, runs it through run_and_wait, and returns its
# ((out, err), proc) result.
801 def remove_packages(self, packages, home, run_home = None,
802 raise_on_error = True):
803 """ Uninstall packages from the Linux host.
805 'home' is the directory to upload the package un-installation script.
806 'run_home' is the directory from where to execute the script.
809 command = rpmfuncs.remove_packages_command(self.os, packages)
811 command = debfuncs.remove_packages_command(self.os, packages)
813 msg = "Error removing packages ( OS not known ) "
# NOTE(review): "raise X, msg" is Python 2-only syntax.
815 raise RuntimeError, msg
# The script runs from `home` unless a separate run_home was given.
817 run_home = run_home or home
# Dedicated rmpkg_* file names are passed for the script's pid, exit code,
# stdout and stderr files.
819 (out, err), proc = self.run_and_wait(command, run_home,
820 shfile = os.path.join(home, "rmpkg.sh"),
821 pidfile = "rmpkg_pidfile",
822 ecodefile = "rmpkg_exitcode",
823 stdout = "rmpkg_stdout",
824 stderr = "rmpkg_stderr",
826 raise_on_error = raise_on_error)
828 return (out, err), proc
# Creates one or more directories on the host with "mkdir -p" and returns the
# result of self.execute.
830 def mkdir(self, paths, clean = False):
831 """ Paths is either a single remote directory path to create,
832 or a list of directories to create.
837 if isinstance(paths, str):
# One "mkdir -p" per path, chained with " ; " so every directory is attempted.
# NOTE(review): paths are interpolated unquoted into the shell command.
840 cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
842 return self.execute(cmd, with_lock = True)
def rmdir(self, paths):
    """ Delete one remote directory, or every directory in a list.

    A single path given as a string is handled as a one-element list.
    One 'rm -rf' is issued per path, chained with ' ; '.

    Returns whatever self.execute returns for the combined command.
    """
    targets = [paths] if isinstance(paths, str) else paths
    removals = ["rm -rf {}".format(target) for target in targets]
    return self.execute(" ; ".join(removals), with_lock = True)
# Visible sequence: upload the command as a script (upload_command), start it
# with "bash <shfile>" through self.run, wait for its pid file (wait_pid), wait
# for it to finish (wait_run), inspect the exit code and stderr (check_errors),
# then read its stdout file (check_output).
# The returned tuple combines `out` from the stdout file with `err` from
# check_errors.
856 def run_and_wait(self, command, home,
862 ecodefile="exitcode",
868 raise_on_error=True):
870 Uploads the 'command' to a bash script in the host.
871 Then runs the script detached in background in the host, and
872 busy-waites until the script finishes executing.
# A relative shfile is placed under `home`.
875 if not shfile.startswith("/"):
876 shfile = os.path.join(home, shfile)
878 self.upload_command(command,
880 ecodefile = ecodefile,
882 overwrite = overwrite)
# From here on `command` is the invocation of the uploaded script, which is also
# what the failure messages below report.
884 command = "bash {}".format(shfile)
885 # run command in background in remote host
886 (out, err), proc = self.run(command, home,
894 # check no errors occurred
896 msg = " Failed to run command '{}' ".format(command)
897 self.error(msg, out, err)
# NOTE(review): "raise X, msg" is Python 2-only syntax (three uses in this method).
899 raise RuntimeError, msg
901 # Wait for pid file to be generated
902 pid, ppid = self.wait_pid(
905 raise_on_error = raise_on_error)
908 # wait until command finishes to execute
909 self.wait_run(pid, ppid)
911 (eout, err), proc = self.check_errors(home,
912 ecodefile = ecodefile,
915 # Out is what was written in the stderr file
917 msg = " Failed to run command '{}' ".format(command)
918 self.error(msg, eout, err)
921 raise RuntimeError, msg
923 (out, oerr), proc = self.check_output(home, stdout)
925 return (out, err), proc
# Reads <home>/<ecodefile> through check_output and turns it into an exit code:
# the integer stored in the file, or one of the ExitCode values CORRUPTFILE,
# FILENOTFOUND or ERROR when the file cannot be read or parsed.
927 def exitcode(self, home, ecodefile = "exitcode"):
929 Get the exit code of an application.
930 Returns an integer value with the exit code
932 (out, err), proc = self.check_output(home, ecodefile)
934 # Succeeded to open file, return exit code in the file
937 return int(out.strip())
939 # Error in the content of the file!
940 return ExitCode.CORRUPTFILE
942 # No such file or directory
943 if proc.returncode == 1:
944 return ExitCode.FILENOTFOUND
946 # Other error from 'cat'
947 return ExitCode.ERROR
# Wraps `command` so that its exit status is written to `ecodefile`, prepends
# the formatted environment, and uploads the result as text to `shfile`.
949 def upload_command(self, command,
951 ecodefile="exitcode",
954 """ Saves the command as a bash script file in the remote host, and
955 forces to save the exit code of the command execution to the ecodefile
# Tests whether the command already ends with ";" or "&".
# NOTE(review): presumably a terminator is appended when it does not -- confirm.
958 if not (command.strip().endswith(";") or command.strip().endswith("&")):
961 # The exit code of the command will be stored in ecodefile
# Doubled braces are literal: the script text becomes "{ <command> } ; echo $? > <ecodefile> ;".
962 command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
963 .format(command=command, ecodefile=ecodefile)
966 environ = self.format_environment(env)
968 # Add environ to command
969 command = environ + command
971 return self.upload(command, shfile, text=True, overwrite=overwrite)
def format_environment(self, env, inline=False):
    """ Format environment variables as bash 'export' statements.

    env is a whitespace separated string of NAME=value assignments.
    Because assignments are split on whitespace, a value that itself
    contains whitespace is not supported.

    With inline=True the statements are joined with ';' so they can
    prefix a command line
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py);
    otherwise each statement goes on its own line, as in a bash script
    (i.e. export PYTHONPATH=src/.. \\n export LALA=.. \\n).

    Returns the formatted string, terminated by the separator, or ""
    when env is None, empty, or contains only whitespace.
    """
    if not env:
        return ""

    # split() with no argument collapses runs of whitespace and ignores
    # leading/trailing blanks, so a whitespace-only env yields no
    # assignments instead of a bare ' export ' statement.
    assignments = env.split()
    if not assignments:
        return ""

    sep = ";" if inline else "\n"
    return sep.join([" export {}".format(item) for item in assignments]) + sep
# Decides whether a finished command failed, based on its saved exit code, and
# returns ("", err), proc where err carries the error text.
987 def check_errors(self, home,
988 ecodefile = "exitcode",
990 """ Checks whether errors occurred while running a command.
991 It first checks the exit code for the command, and only if the
992 exit code is an error one it returns the error output.
998 # get exit code saved in the 'exitcode' file
999 ecode = self.exitcode(home, ecodefile)
# An unreadable or unparsable exit-code file is reported as an error by itself.
1001 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1002 err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1003 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1004 # The process returned an error code or didn't exist.
1005 # Check standard error.
# The content of the stderr file becomes the reported error text.
1006 (err, eerr), proc = self.check_output(home, stderr)
1008 # If the stderr file was not found, assume nothing bad happened,
1009 # and just ignore the error.
1010 # (cat returns 1 for error "No such file or directory")
1011 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1014 return ("", err), proc
# Obtains the (pid, ppid) pair recorded in <home>/<pidfile> through self.getpid.
# A failure message naming the pid file is prepared and raised as RuntimeError.
# NOTE(review): presumably the raise is gated on raise_on_error -- confirm.
1016 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1017 """ Waits until the pid file for the command is generated,
1018 and returns the pid and ppid of the process """
1023 pidtuple = self.getpid(home = home, pidfile = pidfile)
1026 pid, ppid = pidtuple
1032 msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
# NOTE(review): "raise X, msg" is Python 2-only syntax.
1036 raise RuntimeError, msg
# Checks the process state through self.status and branches on FINISHED,
# "not RUNNING", and the remaining (running) case.
1040 def wait_run(self, pid, ppid, trial = 0):
1041 """ wait for a remote process to finish execution """
1045 status = self.status(pid, ppid)
1047 if status is ProcStatus.FINISHED:
1049 elif status is not ProcStatus.RUNNING:
1052 # If it takes more than 20 seconds to start, then
1053 # asume something went wrong
1057 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieve the content of the file <home>/<filename> on the host.

    The file is read with 'cat' through self.execute.
    Returns ((out, err), proc) as produced by self.execute.
    """
    target = os.path.join(home, filename)
    result = self.execute("cat {}".format(target), retry = 1, with_lock = True)
    (out, err), proc = result
    return (out, err), proc
1067 """ Checks if host is responsive
1073 msg = "Unresponsive host. Wrong answer. "
1075 # The underlying SSH layer will sometimes return an empty
1076 # output (even if the command was executed without errors).
1077 # To work arround this, repeat the operation N times or
1078 # until the result is not empty string
1080 (out, err), proc = self.execute("echo 'ALIVE'",
1084 if out.find("ALIVE") > -1:
1087 trace = traceback.format_exc()
1088 msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1090 self.error(msg, out, err)
# Asks the host for ${HOME} and stores the stripped answer in self._home_dir;
# raises RuntimeError when no home directory could be determined.
1093 def find_home(self):
1095 Retrieves host home directory
1097 # The underlying SSH layer will sometimes return an empty
1098 # output (even if the command was executed without errors).
1099 # To work arround this, repeat the operation N times or
1100 # until the result is not empty string
# Default failure message, replaced below by one that carries the traceback.
1101 msg = "Impossible to retrieve HOME directory"
1103 (out, err), proc = self.execute("echo ${HOME}",
# Only a non-blank answer is accepted as the home directory.
1107 if out.strip() != "":
1108 self._home_dir = out.strip()
1110 trace = traceback.format_exc()
1111 msg = "Impossible to retrieve HOME directory {}".format(trace)
1113 if not self._home_dir:
# NOTE(review): "raise X, msg" is Python 2-only syntax.
1115 raise RuntimeError, msg
# Checks on the host which destination files already exist, so they can be
# dropped from the upload list; returns the values of the destination map.
1117 def filter_existing_files(self, src, dst):
1118 """ Removes files that already exist in the Linux host from src list
1120 # construct a dictionary with { dst: src }
# With several sources, dst is treated as a directory and each source's
# basename is appended; with a single source, dst itself is the target path.
1121 dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1122 if len(src) > 1 else {dst: src[0]}
# One shell test per destination: the path is echoed back only if it is an
# existing regular file.
1125 for d in dests.keys():
1126 command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1128 command = ";".join(command)
1130 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
# NOTE(review): this is a substring search over the whole output, so a path
# that is a prefix of another echoed path (e.g. /a/f vs /a/f2) also matches.
1132 for d in dests.keys():
1133 if out.find(d) > -1:
# NOTE(review): on Python 3 this returns a dict view, not a list.
1139 return dests.values()