2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy,
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!!
40 Error codes that the rexitcode function can return if unable to
41 check the exit code of a spawned process
50 Supported flavors of Linux OS
55 FEDORA_8 = 1 << 3 | FEDORA
56 FEDORA_12 = 1 << 4 | FEDORA
57 FEDORA_14 = 1 << 5 | FEDORA
60 class LinuxNode(ResourceManager):
62 .. class:: Class Args :
64 :param ec: The Experiment controller
65 :type ec: ExperimentController
66 :param guid: guid of the RM
71 There are different ways in which commands can be executed using the
72 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
77 * 'execute' (blocking mode) :
79 HOW IT WORKS: 'execute', forks a process and run the
80 command, synchronously, attached to the terminal, in
82 The execute method will block until the command returns
83 the result on 'out', 'err' (so until it finishes executing).
85 USAGE: short-lived commands that must be executed attached
86 to a terminal and in foreground, for which it IS necessary
87 to block until the command has finished (e.g. if you want
88 to run 'ls' or 'cat').
90 * 'execute' (NON blocking mode - blocking = False) :
92 HOW IT WORKS: Same as before, except that execute method
93 will return immediately (even if command still running).
95 USAGE: long-lived commands that must be executed attached
96 to a terminal and in foreground, but for which it is not
97 necessary to block until the command has finished. (e.g.
98 start an application using X11 forwarding)
102 HOW IT WORKS: Connects to the host ( using SSH if remote)
103 and launches the command in background, detached from any
104 terminal (daemonized), and returns. The command continues to
105 run remotely, but since it is detached from the terminal,
106 its pipes (stdin, stdout, stderr) can't be redirected to the
107 console (as normal non detached processes would), and so they
108 are explicitly redirected to files. The pidfile is created as
109 part of the process of launching the command. The pidfile
110 holds the pid and ppid of the process forked in background,
111 so later on it is possible to check whether the command is still
114 USAGE: long-lived commands that can run detached in background,
115 for which it is NOT necessary to block (wait) until the command
116 has finished. (e.g. start an application that is not using X11
117 forwarding. It can run detached and remotely in background)
121 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122 the command has finished execution. It also checks whether
123 errors occurred during runtime by reading the exitcode file,
124 which contains the exit code of the command that was run
125 (checking stderr only is not always reliable since many
126 commands throw debugging info to stderr and the only way to
127 automatically know whether an error really happened is to
128 check the process exit code).
130 Another difference with respect to 'run', is that instead
131 of directly executing the command as a bash command line,
132 it uploads the command to a bash script and runs the script.
133 This allows to use the bash script to debug errors, since
134 it remains at the remote host and can be run manually to
137 USAGE: medium-lived commands that can run detached in
138 background, for which it IS necessary to block (wait) until
139 the command has finished. (e.g. Package installation,
140 source compilation, file download, etc)
143 _rtype = "linux::Node"
144 _help = "Controls Linux host machines ( either localhost or a host " \
145 "that can be accessed using a SSH key)"
149 def _register_attributes(cls):
150 cls._register_attribute(Attribute(
151 "hostname", "Hostname of the machine",
152 flags = Flags.Design))
154 cls._register_attribute(Attribute(
155 "username", "Local account username",
156 flags = Flags.Credential))
158 cls._register_attribute(Attribute(
160 flags = Flags.Design))
162 cls._register_attribute(Attribute(
164 "Experiment home directory to store all experiment related files",
165 flags = Flags.Design))
167 cls._register_attribute(Attribute(
168 "identity", "SSH identity file",
169 flags = Flags.Credential))
171 cls._register_attribute(Attribute(
172 "serverKey", "Server public key",
173 flags = Flags.Design))
175 cls._register_attribute(Attribute(
177 "Remove all nepi files and directories "
178 " from node home folder before starting experiment",
181 flags = Flags.Design))
183 cls._register_attribute(Attribute(
184 "cleanExperiment", "Remove all files and directories "
185 " from a previous same experiment, before the new experiment starts",
188 flags = Flags.Design))
190 cls._register_attribute(Attribute(
192 "Kill all running processes before starting experiment",
195 flags = Flags.Design))
197 cls._register_attribute(Attribute(
198 "cleanProcessesAfter",
199 """Kill all running processes after starting experiment
200 This might be dangerous when using user root""",
203 flags = Flags.Design))
205 cls._register_attribute(Attribute(
207 "Bash script to be executed before releasing the resource",
208 flags = Flags.Design))
210 cls._register_attribute(Attribute(
212 "Gateway account username",
213 flags = Flags.Design))
215 cls._register_attribute(Attribute(
217 "Hostname of the gateway machine",
218 flags = Flags.Design))
220 cls._register_attribute(Attribute(
222 "Linux host public IP address. "
223 "Must not be modified by the user unless hostname is 'localhost'",
224 flags = Flags.Design))
226 def __init__(self, ec, guid):
227 super(LinuxNode, self).__init__(ec, guid)
229 # home directory at Linux host
232 # lock to prevent concurrent applications on the same node,
233 # to execute commands at the same time. There are potential
234 # concurrency issues when using SSH to a same host from
235 # multiple threads. There are also possible operational
236 # issues, e.g. an application querying the existence
237 # of a file or folder prior to its creation, and another
238 # application creating the same file or folder in between.
239 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Decorates 'msg' with the guid and the hostname of this node

    :param msg: text of the log entry
    :return: the text prefixed with the RM guid and the value of the
        'hostname' attribute
    """
    guid = self.guid
    hostname = self.get("hostname")
    template = " guid {} - host {} - {} "
    return template.format(guid, hostname, msg)
247 home = self.get("home") or ""
248 if not home.startswith("/"):
249 home = os.path.join(self._home_dir, home)
254 return os.path.join(self.home_dir, ".nepi")
258 return os.path.join(self.nepi_home, "nepi-usr")
262 return os.path.join(self.usr_dir, "lib")
266 return os.path.join(self.usr_dir, "bin")
270 return os.path.join(self.usr_dir, "src")
274 return os.path.join(self.usr_dir, "share")
278 return os.path.join(self.nepi_home, "nepi-exp")
282 return os.path.join(self.exp_dir, self.ec.exp_id)
286 return os.path.join(self.exp_home, "node-{}".format(self.guid))
290 return os.path.join(self.node_home, self.ec.run_id)
297 if not self.localhost and not self.get("username"):
298 msg = "Can't resolve OS, insufficient data "
300 raise RuntimeError, msg
304 if out.find("Debian") == 0:
305 self._os = OSType.DEBIAN
306 elif out.find("Ubuntu") ==0:
307 self._os = OSType.UBUNTU
308 elif out.find("Fedora release") == 0:
309 self._os = OSType.FEDORA
310 if out.find("Fedora release 8") == 0:
311 self._os = OSType.FEDORA_8
312 elif out.find("Fedora release 12") == 0:
313 self._os = OSType.FEDORA_12
314 elif out.find("Fedora release 14") == 0:
315 self._os = OSType.FEDORA_14
317 msg = "Unsupported OS"
319 raise RuntimeError("{} - {} ".format(msg, out))
324 # The underlying SSH layer will sometimes return an empty
325 # output (even if the command was executed without errors).
326 # To work around this, repeat the operation N times or
327 # until the result is not empty string
330 (out, err), proc = self.execute("cat /etc/issue",
334 trace = traceback.format_exc()
335 msg = "Error detecting OS: {} ".format(trace)
336 self.error(msg, out, err)
342 return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
346 return (self.os & OSType.FEDORA)
350 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
352 def do_provision(self):
353 # check if host is alive
354 if not self.is_alive():
355 msg = "Deploy failed. Unresponsive node {}".format(self.get("hostname"))
357 raise RuntimeError, msg
361 if self.get("cleanProcesses"):
362 self.clean_processes()
364 if self.get("cleanHome"):
367 if self.get("cleanExperiment"):
368 self.clean_experiment()
370 # Create shared directory structure and node home directory
371 paths = [self.lib_dir,
379 # Get Public IP address if possible
380 if not self.get("ip"):
382 ip = sshfuncs.gethostbyname(self.get("hostname"))
385 if self.get("gateway") is None:
386 msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
389 super(LinuxNode, self).do_provision()
392 if self.state == ResourceState.NEW:
393 self.info("Deploying node")
397 # Node needs to wait until all associated interfaces are
398 # ready before it can finalize deployment
399 from nepi.resources.linux.interface import LinuxInterface
400 ifaces = self.get_connected(LinuxInterface.get_rtype())
402 if iface.state < ResourceState.READY:
403 self.ec.schedule(self.reschedule_delay, self.deploy)
406 super(LinuxNode, self).do_deploy()
408 def do_release(self):
409 rms = self.get_connected()
411 # Node needs to wait until all associated RMs are released
412 # before it can be released
413 if rm.state != ResourceState.RELEASED:
414 self.ec.schedule(self.reschedule_delay, self.release)
417 tear_down = self.get("tearDown")
419 self.execute(tear_down)
421 if self.get("cleanProcessesAfter"):
422 self.clean_processes()
424 super(LinuxNode, self).do_release()
426 def valid_connection(self, guid):
430 def clean_processes(self):
431 self.info("Cleaning up processes")
436 if self.get("username") != 'root':
437 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
438 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
439 "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
441 if self.state >= ResourceState.READY:
443 pids = pickle.load(open("/tmp/save.proc", "rb"))
445 ps_aux = "ps aux |awk '{print $2,$11}'"
446 (out, err), proc = self.execute(ps_aux)
448 for line in out.strip().split("\n"):
449 parts = line.strip().split(" ")
450 pids_temp[parts[0]] = parts[1]
451 kill_pids = set(pids_temp.items()) - set(pids.items())
452 kill_pids = ' '.join(dict(kill_pids).keys())
454 cmd = ("killall tcpdump || /bin/true ; " +
455 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
456 "kill {} || /bin/true ; ".format(kill_pids))
458 cmd = ("killall tcpdump || /bin/true ; " +
459 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
461 cmd = ("killall tcpdump || /bin/true ; " +
462 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
464 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
def clean_home(self):
    """ Cleans all NEPI related folders in the Linux host

    Runs a 'find' from the node home directory that removes the '.nepi'
    entry located directly under it.

    :return: whatever 'execute' returns for the clean up command
    """
    self.info("Cleaning up home")

    # Raw string: the backslash has to reach 'find' untouched, and a
    # backslash followed by a dot is not a valid escape sequence in a
    # regular Python string literal.
    template = r"cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "
    cmd = template.format(self.home_dir)

    return self.execute(cmd, with_lock = True)
def clean_experiment(self):
    """ Removes the files of this experiment from the Linux host.

    Only the entry named after the experiment id, directly under the
    experiments directory, is deleted; everything else is left in place.

    :return: whatever 'execute' returns for the clean up command
    """
    self.info("Cleaning up experiment files")

    template = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "
    cleanup_cmd = template.format(self.exp_dir, self.ec.exp_id)

    return self.execute(cleanup_cmd, with_lock = True)
488 def execute(self, command,
494 connect_timeout = 30,
495 strict_host_checking = False,
500 """ Notice that this invocation will block until the
501 execution finishes. If this is not the desired behavior,
502 use 'run' instead."""
505 (out, err), proc = execfuncs.lexec(
507 user = self.get("username"), # still problem with localhost
512 # If the execute command is blocking, we don't want to keep
513 # the node lock. This lock is used to avoid race conditions
514 # when creating the ControlMaster sockets. A more elegant
515 # solution is needed.
516 with self._node_lock:
517 (out, err), proc = sshfuncs.rexec(
519 host = self.get("hostname"),
520 user = self.get("username"),
521 port = self.get("port"),
522 gwuser = self.get("gatewayUser"),
523 gw = self.get("gateway"),
526 identity = self.get("identity"),
527 server_key = self.get("serverKey"),
530 forward_x11 = forward_x11,
532 connect_timeout = connect_timeout,
533 persistent = persistent,
535 strict_host_checking = strict_host_checking
538 (out, err), proc = sshfuncs.rexec(
540 host = self.get("hostname"),
541 user = self.get("username"),
542 port = self.get("port"),
543 gwuser = self.get("gatewayUser"),
544 gw = self.get("gateway"),
547 identity = self.get("identity"),
548 server_key = self.get("serverKey"),
551 forward_x11 = forward_x11,
553 connect_timeout = connect_timeout,
554 persistent = persistent,
556 strict_host_checking = strict_host_checking
559 return (out, err), proc
561 def run(self, command, home,
569 strict_host_checking = False):
571 self.debug("Running command '{}'".format(command))
574 (out, err), proc = execfuncs.lspawn(
577 create_home = create_home,
578 stdin = stdin or '/dev/null',
579 stdout = stdout or '/dev/null',
580 stderr = stderr or '/dev/null',
583 with self._node_lock:
584 (out, err), proc = sshfuncs.rspawn(
588 create_home = create_home,
589 stdin = stdin or '/dev/null',
590 stdout = stdout or '/dev/null',
591 stderr = stderr or '/dev/null',
593 host = self.get("hostname"),
594 user = self.get("username"),
595 port = self.get("port"),
596 gwuser = self.get("gatewayUser"),
597 gw = self.get("gateway"),
599 identity = self.get("identity"),
600 server_key = self.get("serverKey"),
602 strict_host_checking = strict_host_checking
605 return (out, err), proc
607 def getpid(self, home, pidfile = "pidfile"):
609 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
611 with self._node_lock:
612 pidtuple = sshfuncs.rgetpid(
613 os.path.join(home, pidfile),
614 host = self.get("hostname"),
615 user = self.get("username"),
616 port = self.get("port"),
617 gwuser = self.get("gatewayUser"),
618 gw = self.get("gateway"),
620 identity = self.get("identity"),
621 server_key = self.get("serverKey"),
622 strict_host_checking = False
627 def status(self, pid, ppid):
629 status = execfuncs.lstatus(pid, ppid)
631 with self._node_lock:
632 status = sshfuncs.rstatus(
634 host = self.get("hostname"),
635 user = self.get("username"),
636 port = self.get("port"),
637 gwuser = self.get("gatewayUser"),
638 gw = self.get("gateway"),
640 identity = self.get("identity"),
641 server_key = self.get("serverKey"),
642 strict_host_checking = False
647 def kill(self, pid, ppid, sudo = False):
650 status = self.status(pid, ppid)
652 if status == sshfuncs.ProcStatus.RUNNING:
654 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
656 with self._node_lock:
657 (out, err), proc = sshfuncs.rkill(
659 host = self.get("hostname"),
660 user = self.get("username"),
661 port = self.get("port"),
662 gwuser = self.get("gatewayUser"),
663 gw = self.get("gateway"),
666 identity = self.get("identity"),
667 server_key = self.get("serverKey"),
668 strict_host_checking = False
671 return (out, err), proc
673 def copy(self, src, dst):
675 (out, err), proc = execfuncs.lcopy(
679 with self._node_lock:
680 (out, err), proc = sshfuncs.rcopy(
682 port = self.get("port"),
683 gwuser = self.get("gatewayUser"),
684 gw = self.get("gateway"),
685 identity = self.get("identity"),
686 server_key = self.get("serverKey"),
688 strict_host_checking = False)
690 return (out, err), proc
692 def upload(self, src, dst, text = False, overwrite = True,
693 raise_on_error = True):
694 """ Copy content to destination
696 src string with the content to copy. Can be:
698 - a string with the path to a local file
699 - a string with a semicolon-separated list of local files
700 - a string with a local directory
702 dst string with destination path on the remote host (remote is
705 text src is text input, it must be stored into a temp file before
708 # If source is a string input
710 if text and not os.path.isfile(src):
711 # src is text input that should be uploaded as file
712 # create a temporal file with the content to upload
713 f = tempfile.NamedTemporaryFile(delete=False)
718 # If dst files should not be overwritten, check that the files do not
720 if isinstance(src, str):
721 src = map(str.strip, src.split(";"))
723 if overwrite == False:
724 src = self.filter_existing_files(src, dst)
726 return ("", ""), None
728 if not self.localhost:
729 # Build destination as <user>@<server>:<path>
730 dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
732 ((out, err), proc) = self.copy(src, dst)
739 msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
740 self.error(msg, out, err)
742 msg = "{} out: {} err: {}".format(msg, out, err)
744 raise RuntimeError, msg
746 return ((out, err), proc)
748 def download(self, src, dst, raise_on_error = True):
749 if not self.localhost:
750 # Build source as <user>@<server>:<path>
751 src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
753 ((out, err), proc) = self.copy(src, dst)
756 msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst)
757 self.error(msg, out, err)
760 raise RuntimeError, msg
762 return ((out, err), proc)
764 def install_packages_command(self, packages):
767 command = rpmfuncs.install_packages_command(self.os, packages)
769 command = debfuncs.install_packages_command(self.os, packages)
771 msg = "Error installing packages ( OS not known ) "
772 self.error(msg, self.os)
773 raise RuntimeError, msg
777 def install_packages(self, packages, home,
779 raise_on_error = True):
780 """ Install packages in the Linux host.
782 'home' is the directory to upload the package installation script.
783 'run_home' is the directory from where to execute the script.
785 command = self.install_packages_command(packages)
787 run_home = run_home or home
789 (out, err), proc = self.run_and_wait(command, run_home,
790 shfile = os.path.join(home, "instpkg.sh"),
791 pidfile = "instpkg_pidfile",
792 ecodefile = "instpkg_exitcode",
793 stdout = "instpkg_stdout",
794 stderr = "instpkg_stderr",
796 raise_on_error = raise_on_error)
798 return (out, err), proc
800 def remove_packages(self, packages, home, run_home = None,
801 raise_on_error = True):
802 """ Uninstall packages from the Linux host.
804 'home' is the directory to upload the package un-installation script.
805 'run_home' is the directory from where to execute the script.
808 command = rpmfuncs.remove_packages_command(self.os, packages)
810 command = debfuncs.remove_packages_command(self.os, packages)
812 msg = "Error removing packages ( OS not known ) "
814 raise RuntimeError, msg
816 run_home = run_home or home
818 (out, err), proc = self.run_and_wait(command, run_home,
819 shfile = os.path.join(home, "rmpkg.sh"),
820 pidfile = "rmpkg_pidfile",
821 ecodefile = "rmpkg_exitcode",
822 stdout = "rmpkg_stdout",
823 stderr = "rmpkg_stderr",
825 raise_on_error = raise_on_error)
827 return (out, err), proc
829 def mkdir(self, paths, clean = False):
830 """ Paths is either a single remote directory path to create,
831 or a list of directories to create.
836 if isinstance(paths, str):
839 cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
841 return self.execute(cmd, with_lock = True)
843 def rmdir(self, paths):
844 """ Paths is either a single remote directory path to delete,
845 or a list of directories to delete.
848 if isinstance(paths, str):
851 cmd = " ; ".join(map(lambda path: "rm -rf {}".format(path), paths))
853 return self.execute(cmd, with_lock = True)
855 def run_and_wait(self, command, home,
861 ecodefile="exitcode",
867 raise_on_error=True):
869 Uploads the 'command' to a bash script in the host.
870 Then runs the script detached in background in the host, and
871 busy-waits until the script finishes executing.
874 if not shfile.startswith("/"):
875 shfile = os.path.join(home, shfile)
877 self.upload_command(command,
879 ecodefile = ecodefile,
881 overwrite = overwrite)
883 command = "bash {}".format(shfile)
884 # run command in background in remote host
885 (out, err), proc = self.run(command, home,
893 # check no errors occurred
895 msg = " Failed to run command '{}' ".format(command)
896 self.error(msg, out, err)
898 raise RuntimeError, msg
900 # Wait for pid file to be generated
901 pid, ppid = self.wait_pid(
904 raise_on_error = raise_on_error)
907 # wait until command finishes to execute
908 self.wait_run(pid, ppid)
910 (eout, err), proc = self.check_errors(home,
911 ecodefile = ecodefile,
914 # Out is what was written in the stderr file
916 msg = " Failed to run command '{}' ".format(command)
917 self.error(msg, eout, err)
920 raise RuntimeError, msg
922 (out, oerr), proc = self.check_output(home, stdout)
924 return (out, err), proc
926 def exitcode(self, home, ecodefile = "exitcode"):
928 Get the exit code of an application.
929 Returns an integer value with the exit code
931 (out, err), proc = self.check_output(home, ecodefile)
933 # Succeeded to open file, return exit code in the file
936 return int(out.strip())
938 # Error in the content of the file!
939 return ExitCode.CORRUPTFILE
941 # No such file or directory
942 if proc.returncode == 1:
943 return ExitCode.FILENOTFOUND
945 # Other error from 'cat'
946 return ExitCode.ERROR
948 def upload_command(self, command,
950 ecodefile="exitcode",
953 """ Saves the command as a bash script file in the remote host, and
954 forces to save the exit code of the command execution to the ecodefile
957 if not (command.strip().endswith(";") or command.strip().endswith("&")):
960 # The exit code of the command will be stored in ecodefile
961 command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
962 .format(command=command, ecodefile=ecodefile)
965 environ = self.format_environment(env)
967 # Add environ to command
968 command = environ + command
970 return self.upload(command, shfile, text=True, overwrite=overwrite)
def format_environment(self, env, inline=False):
    """ Formats the environment variables for a command to be executed
    either as an inline command
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
    as a bash script (one 'export NAME=value' statement per line)

    :param env: string with whitespace separated NAME=value assignments.
        None, empty and blank strings all mean "no environment".
    :param inline: when True the statements are terminated with ';',
        otherwise with a new line
    :return: the export statements, each one followed by the separator,
        or an empty string when there is nothing to export
    """
    # split() without a separator collapses any run of white spaces and
    # ignores leading and trailing blanks.
    assignments = env.split() if env else []

    if not assignments:
        # A blank 'env' must not produce a bare " export " statement:
        # bash would list every exported variable instead of setting one.
        return ""

    sep = ";" if inline else "\n"
    return sep.join(" export {}".format(item) for item in assignments) + sep
986 def check_errors(self, home,
987 ecodefile = "exitcode",
989 """ Checks whether errors occurred while running a command.
990 It first checks the exit code for the command, and only if the
991 exit code is an error one it returns the error output.
997 # get exit code saved in the 'exitcode' file
998 ecode = self.exitcode(home, ecodefile)
1000 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1001 err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1002 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1003 # The process returned an error code or didn't exist.
1004 # Check standard error.
1005 (err, eerr), proc = self.check_output(home, stderr)
1007 # If the stderr file was not found, assume nothing bad happened,
1008 # and just ignore the error.
1009 # (cat returns 1 for error "No such file or directory")
1010 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1013 return ("", err), proc
1015 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1016 """ Waits until the pid file for the command is generated,
1017 and returns the pid and ppid of the process """
1022 pidtuple = self.getpid(home = home, pidfile = pidfile)
1025 pid, ppid = pidtuple
1031 msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1035 raise RuntimeError, msg
1039 def wait_run(self, pid, ppid, trial = 0):
1040 """ wait for a remote process to finish execution """
1044 status = self.status(pid, ppid)
1046 if status is ProcStatus.FINISHED:
1048 elif status is not ProcStatus.RUNNING:
1051 # If it takes more than 20 seconds to start, then
1052 # assume something went wrong
1056 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of a file by running 'cat' on it

    :param home: directory that holds the file
    :param filename: name of the file, joined to 'home'
    :return: the ((out, err), proc) tuple produced by 'execute'
    """
    target = os.path.join(home, filename)
    command = "cat {}".format(target)

    (content, errors), proc = self.execute(command, retry = 1, with_lock = True)

    return (content, errors), proc
1066 """ Checks if host is responsive
1072 msg = "Unresponsive host. Wrong answer. "
1074 # The underlying SSH layer will sometimes return an empty
1075 # output (even if the command was executed without errors).
1076 # To work around this, repeat the operation N times or
1077 # until the result is not empty string
1079 (out, err), proc = self.execute("echo 'ALIVE'",
1083 if out.find("ALIVE") > -1:
1086 trace = traceback.format_exc()
1087 msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1089 self.error(msg, out, err)
1092 def find_home(self):
1094 Retrieves host home directory
1096 # The underlying SSH layer will sometimes return an empty
1097 # output (even if the command was executed without errors).
1098 # To work around this, repeat the operation N times or
1099 # until the result is not empty string
1100 msg = "Impossible to retrieve HOME directory"
1102 (out, err), proc = self.execute("echo ${HOME}",
1106 if out.strip() != "":
1107 self._home_dir = out.strip()
1109 trace = traceback.format_exc()
1110 msg = "Impossible to retrieve HOME directory {}".format(trace)
1112 if not self._home_dir:
1114 raise RuntimeError, msg
1116 def filter_existing_files(self, src, dst):
1117 """ Removes files that already exist in the Linux host from src list
1119 # construct a dictionary with { dst: src }
1120 dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1121 if len(src) > 1 else {dst: src[0]}
1124 for d in dests.keys():
1125 command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1127 command = ";".join(command)
1129 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1131 for d in dests.keys():
1132 if out.find(d) > -1:
1138 return dests.values()