2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!!
40 Error codes that the rexitcode function can return if unable to
41 check the exit code of a spawned process
50 Supported flavors of Linux OS
55 FEDORA_8 = 1 << 3 | FEDORA
56 FEDORA_12 = 1 << 4 | FEDORA
57 FEDORA_14 = 1 << 5 | FEDORA
60 class LinuxNode(ResourceManager):
62 .. class:: Class Args :
64 :param ec: The Experiment controller
65 :type ec: ExperimentController
66 :param guid: guid of the RM
71 There are different ways in which commands can be executed using the
72 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
77 * 'execute' (blocking mode) :
79 HOW IT WORKS: 'execute' forks a process and runs the
80 command, synchronously, attached to the terminal, in
82 The execute method will block until the command returns
83 the result on 'out', 'err' (so until it finishes executing).
85 USAGE: short-lived commands that must be executed attached
86 to a terminal and in foreground, for which it IS necessary
87 to block until the command has finished (e.g. if you want
88 to run 'ls' or 'cat').
90 * 'execute' (NON blocking mode - blocking = False) :
92 HOW IT WORKS: Same as before, except that execute method
93 will return immediately (even if command still running).
95 USAGE: long-lived commands that must be executed attached
96 to a terminal and in foreground, but for which it is not
97 necessary to block until the command has finished. (e.g.
98 start an application using X11 forwarding)
102 HOW IT WORKS: Connects to the host ( using SSH if remote)
103 and launches the command in background, detached from any
104 terminal (daemonized), and returns. The command continues to
105 run remotely, but since it is detached from the terminal,
106 its pipes (stdin, stdout, stderr) can't be redirected to the
107 console (as normal non detached processes would), and so they
108 are explicitly redirected to files. The pidfile is created as
109 part of the process of launching the command. The pidfile
110 holds the pid and ppid of the process forked in background,
111 so later on it is possible to check whether the command is still
114 USAGE: long-lived commands that can run detached in background,
115 for which it is NOT necessary to block (wait) until the command
116 has finished. (e.g. start an application that is not using X11
117 forwarding. It can run detached and remotely in background)
121 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122 the command has finished execution. It also checks whether
123 errors occurred during runtime by reading the exitcode file,
124 which contains the exit code of the command that was run
125 (checking stderr only is not always reliable since many
126 commands throw debugging info to stderr and the only way to
127 automatically know whether an error really happened is to
128 check the process exit code).
130 Another difference with respect to 'run', is that instead
131 of directly executing the command as a bash command line,
132 it uploads the command to a bash script and runs the script.
133 This allows to use the bash script to debug errors, since
134 it remains at the remote host and can be run manually to
137 USAGE: medium-lived commands that can run detached in
138 background, for which it IS necessary to block (wait) until
139 the command has finished. (e.g. Package installation,
140 source compilation, file download, etc)
143 _rtype = "linux::Node"
144 _help = "Controls Linux host machines ( either localhost or a host " \
145 "that can be accessed using a SSH key)"
149 def _register_attributes(cls):
150 hostname = Attribute("hostname", "Hostname of the machine",
151 flags = Flags.Design)
153 username = Attribute("username", "Local account username",
154 flags = Flags.Credential)
156 port = Attribute("port", "SSH port", flags = Flags.Design)
158 home = Attribute("home",
159 "Experiment home directory to store all experiment related files",
160 flags = Flags.Design)
162 identity = Attribute("identity", "SSH identity file",
163 flags = Flags.Credential)
165 server_key = Attribute("serverKey", "Server public key",
166 flags = Flags.Design)
168 clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
169 " from node home folder before starting experiment",
172 flags = Flags.Design)
174 clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
175 " from a previous same experiment, before the new experiment starts",
178 flags = Flags.Design)
180 clean_processes = Attribute("cleanProcesses",
181 "Kill all running processes before starting experiment",
184 flags = Flags.Design)
186 clean_processes_after = Attribute("cleanProcessesAfter",
187 """Kill all running processes after starting experiment
188 This might be dangerous when using user root""",
191 flags = Flags.Design)
193 tear_down = Attribute("tearDown", "Bash script to be executed before " + \
194 "releasing the resource",
195 flags = Flags.Design)
197 gateway_user = Attribute("gatewayUser", "Gateway account username",
198 flags = Flags.Design)
200 gateway = Attribute("gateway", "Hostname of the gateway machine",
201 flags = Flags.Design)
203 ip = Attribute("ip", "Linux host public IP address. "
204 "Must not be modified by the user unless hostname is 'localhost'",
205 flags = Flags.Design)
207 cls._register_attribute(hostname)
208 cls._register_attribute(username)
209 cls._register_attribute(port)
210 cls._register_attribute(home)
211 cls._register_attribute(identity)
212 cls._register_attribute(server_key)
213 cls._register_attribute(clean_home)
214 cls._register_attribute(clean_experiment)
215 cls._register_attribute(clean_processes)
216 cls._register_attribute(clean_processes_after)
217 cls._register_attribute(tear_down)
218 cls._register_attribute(gateway_user)
219 cls._register_attribute(gateway)
220 cls._register_attribute(ip)
222 def __init__(self, ec, guid):
223 super(LinuxNode, self).__init__(ec, guid)
225 # home directory at Linux host
228 # lock to prevent concurrent applications on the same node,
229 # to execute commands at the same time. There are potential
230 # concurrency issues when using SSH to a same host from
231 # multiple threads. There are also possible operational
232 # issues, e.g. an application querying the existence
233 # of a file or folder prior to its creation, and another
234 # application creating the same file or folder in between.
235 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Prefix a log message with this node's guid and hostname.

    :param msg: text of the message to decorate
    :returns: the string ' guid <guid> - host <hostname> - <msg> '
    """
    # Same evaluation order as the format arguments: guid, hostname, msg
    fields = (self.guid, self.get("hostname"), msg)
    return " guid %d - host %s - %s " % fields
243 home = self.get("home") or ""
244 if not home.startswith("/"):
245 home = os.path.join(self._home_dir, home)
250 return os.path.join(self.home_dir, ".nepi")
254 return os.path.join(self.nepi_home, "nepi-usr")
258 return os.path.join(self.usr_dir, "lib")
262 return os.path.join(self.usr_dir, "bin")
266 return os.path.join(self.usr_dir, "src")
270 return os.path.join(self.usr_dir, "share")
274 return os.path.join(self.nepi_home, "nepi-exp")
278 return os.path.join(self.exp_dir, self.ec.exp_id)
282 return os.path.join(self.exp_home, "node-%d" % self.guid)
286 return os.path.join(self.node_home, self.ec.run_id)
293 if not self.localhost and not self.get("username"):
294 msg = "Can't resolve OS, insufficient data "
296 raise RuntimeError, msg
300 if out.find("Debian") == 0:
301 self._os = OSType.DEBIAN
302 elif out.find("Ubuntu") ==0:
303 self._os = OSType.UBUNTU
304 elif out.find("Fedora release") == 0:
305 self._os = OSType.FEDORA
306 if out.find("Fedora release 8") == 0:
307 self._os = OSType.FEDORA_8
308 elif out.find("Fedora release 12") == 0:
309 self._os = OSType.FEDORA_12
310 elif out.find("Fedora release 14") == 0:
311 self._os = OSType.FEDORA_14
313 msg = "Unsupported OS"
315 raise RuntimeError, "%s - %s " %( msg, out )
320 # The underlying SSH layer will sometimes return an empty
321 # output (even if the command was executed without errors).
322 # To work around this, repeat the operation N times or
323 # until the result is not empty string
326 (out, err), proc = self.execute("cat /etc/issue",
330 trace = traceback.format_exc()
331 msg = "Error detecting OS: %s " % trace
332 self.error(msg, out, err)
338 return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
342 return (self.os & OSType.FEDORA)
346 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
348 def do_provision(self):
349 # check if host is alive
350 if not self.is_alive():
351 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
353 raise RuntimeError, msg
357 if self.get("cleanProcesses"):
358 self.clean_processes()
360 if self.get("cleanHome"):
363 if self.get("cleanExperiment"):
364 self.clean_experiment()
366 # Create shared directory structure and node home directory
367 paths = [self.lib_dir,
375 # Get Public IP address if possible
376 if not self.get("ip"):
378 ip = sshfuncs.gethostbyname(self.get("hostname"))
381 if self.get("gateway") is None:
382 msg = "Local DNS can not resolve hostname %s" % self.get("hostname")
385 super(LinuxNode, self).do_provision()
388 if self.state == ResourceState.NEW:
389 self.info("Deploying node")
393 # Node needs to wait until all associated interfaces are
394 # ready before it can finalize deployment
395 from nepi.resources.linux.interface import LinuxInterface
396 ifaces = self.get_connected(LinuxInterface.get_rtype())
398 if iface.state < ResourceState.READY:
399 self.ec.schedule(self.reschedule_delay, self.deploy)
402 super(LinuxNode, self).do_deploy()
404 def do_release(self):
405 rms = self.get_connected()
407 # Node needs to wait until all associated RMs are released
408 # before it can be released
409 if rm.state != ResourceState.RELEASED:
410 self.ec.schedule(self.reschedule_delay, self.release)
413 tear_down = self.get("tearDown")
415 self.execute(tear_down)
417 if self.get("cleanProcessesAfter"):
418 self.clean_processes()
420 super(LinuxNode, self).do_release()
422 def valid_connection(self, guid):
426 def clean_processes(self):
427 self.info("Cleaning up processes")
432 if self.get("username") != 'root':
433 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
434 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
435 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
437 if self.state >= ResourceState.READY:
439 pids = pickle.load(open("/tmp/save.proc", "rb"))
441 ps_aux = "ps aux |awk '{print $2,$11}'"
442 (out, err), proc = self.execute(ps_aux)
444 for line in out.strip().split("\n"):
445 parts = line.strip().split(" ")
446 pids_temp[parts[0]] = parts[1]
447 kill_pids = set(pids_temp.items()) - set(pids.items())
448 kill_pids = ' '.join(dict(kill_pids).keys())
450 cmd = ("killall tcpdump || /bin/true ; " +
451 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
452 "kill %s || /bin/true ; " % kill_pids)
454 cmd = ("killall tcpdump || /bin/true ; " +
455 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
457 cmd = ("killall tcpdump || /bin/true ; " +
458 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
460 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
462 def clean_home(self):
463 """ Cleans all NEPI related folders in the Linux host
465 self.info("Cleaning up home")
467 cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
470 return self.execute(cmd, with_lock = True)
472 def clean_experiment(self):
473 """ Cleans all experiment related files in the Linux host.
474 It preserves NEPI files and folders that have a multi experiment
477 self.info("Cleaning up experiment files")
479 cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
483 return self.execute(cmd, with_lock = True)
485 def execute(self, command,
491 connect_timeout = 30,
492 strict_host_checking = False,
497 """ Notice that this invocation will block until the
498 execution finishes. If this is not the desired behavior,
499 use 'run' instead."""
502 (out, err), proc = execfuncs.lexec(command,
503 user = self.get("username"), # still problem with localhost
508 # If the execute command is blocking, we don't want to keep
509 # the node lock. This lock is used to avoid race conditions
510 # when creating the ControlMaster sockets. A more elegant
511 # solution is needed.
512 with self._node_lock:
513 (out, err), proc = sshfuncs.rexec(
515 host = self.get("hostname"),
516 user = self.get("username"),
517 port = self.get("port"),
518 gwuser = self.get("gatewayUser"),
519 gw = self.get("gateway"),
522 identity = self.get("identity"),
523 server_key = self.get("serverKey"),
526 forward_x11 = forward_x11,
528 connect_timeout = connect_timeout,
529 persistent = persistent,
531 strict_host_checking = strict_host_checking
534 (out, err), proc = sshfuncs.rexec(
536 host = self.get("hostname"),
537 user = self.get("username"),
538 port = self.get("port"),
539 gwuser = self.get("gatewayUser"),
540 gw = self.get("gateway"),
543 identity = self.get("identity"),
544 server_key = self.get("serverKey"),
547 forward_x11 = forward_x11,
549 connect_timeout = connect_timeout,
550 persistent = persistent,
552 strict_host_checking = strict_host_checking
555 return (out, err), proc
557 def run(self, command, home,
565 strict_host_checking = False):
567 self.debug("Running command '%s'" % command)
570 (out, err), proc = execfuncs.lspawn(command, pidfile,
572 create_home = create_home,
573 stdin = stdin or '/dev/null',
574 stdout = stdout or '/dev/null',
575 stderr = stderr or '/dev/null',
578 with self._node_lock:
579 (out, err), proc = sshfuncs.rspawn(
583 create_home = create_home,
584 stdin = stdin or '/dev/null',
585 stdout = stdout or '/dev/null',
586 stderr = stderr or '/dev/null',
588 host = self.get("hostname"),
589 user = self.get("username"),
590 port = self.get("port"),
591 gwuser = self.get("gatewayUser"),
592 gw = self.get("gateway"),
594 identity = self.get("identity"),
595 server_key = self.get("serverKey"),
597 strict_host_checking = strict_host_checking
600 return (out, err), proc
602 def getpid(self, home, pidfile = "pidfile"):
604 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
606 with self._node_lock:
607 pidtuple = sshfuncs.rgetpid(
608 os.path.join(home, pidfile),
609 host = self.get("hostname"),
610 user = self.get("username"),
611 port = self.get("port"),
612 gwuser = self.get("gatewayUser"),
613 gw = self.get("gateway"),
615 identity = self.get("identity"),
616 server_key = self.get("serverKey"),
617 strict_host_checking = False
622 def status(self, pid, ppid):
624 status = execfuncs.lstatus(pid, ppid)
626 with self._node_lock:
627 status = sshfuncs.rstatus(
629 host = self.get("hostname"),
630 user = self.get("username"),
631 port = self.get("port"),
632 gwuser = self.get("gatewayUser"),
633 gw = self.get("gateway"),
635 identity = self.get("identity"),
636 server_key = self.get("serverKey"),
637 strict_host_checking = False
642 def kill(self, pid, ppid, sudo = False):
645 status = self.status(pid, ppid)
647 if status == sshfuncs.ProcStatus.RUNNING:
649 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
651 with self._node_lock:
652 (out, err), proc = sshfuncs.rkill(
654 host = self.get("hostname"),
655 user = self.get("username"),
656 port = self.get("port"),
657 gwuser = self.get("gatewayUser"),
658 gw = self.get("gateway"),
661 identity = self.get("identity"),
662 server_key = self.get("serverKey"),
663 strict_host_checking = False
666 return (out, err), proc
668 def copy(self, src, dst):
670 (out, err), proc = execfuncs.lcopy(src, dst,
673 with self._node_lock:
674 (out, err), proc = sshfuncs.rcopy(
676 port = self.get("port"),
677 gwuser = self.get("gatewayUser"),
678 gw = self.get("gateway"),
679 identity = self.get("identity"),
680 server_key = self.get("serverKey"),
682 strict_host_checking = False)
684 return (out, err), proc
686 def upload(self, src, dst, text = False, overwrite = True,
687 raise_on_error = True):
688 """ Copy content to destination
690 src string with the content to copy. Can be:
692 - a string with the path to a local file
693 - a string with a semicolon-separated list of local files
694 - a string with a local directory
696 dst string with destination path on the remote host (remote is
699 text src is text input, it must be stored into a temp file before
702 # If source is a string input
704 if text and not os.path.isfile(src):
705 # src is text input that should be uploaded as file
706 # create a temporary file with the content to upload
707 f = tempfile.NamedTemporaryFile(delete=False)
712 # If dst files should not be overwritten, check that the files do not
714 if isinstance(src, str):
715 src = map(str.strip, src.split(";"))
717 if overwrite == False:
718 src = self.filter_existing_files(src, dst)
720 return ("", ""), None
722 if not self.localhost:
723 # Build destination as <user>@<server>:<path>
724 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
726 ((out, err), proc) = self.copy(src, dst)
733 msg = " Failed to upload files - src: %s dst: %s" % (";".join(src), dst)
734 self.error(msg, out, err)
736 msg = "%s out: %s err: %s" % (msg, out, err)
738 raise RuntimeError, msg
740 return ((out, err), proc)
742 def download(self, src, dst, raise_on_error = True):
743 if not self.localhost:
744 # Build source as <user>@<server>:<path>
745 src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
747 ((out, err), proc) = self.copy(src, dst)
750 msg = " Failed to download files - src: %s dst: %s" % (";".join(src), dst)
751 self.error(msg, out, err)
754 raise RuntimeError, msg
756 return ((out, err), proc)
758 def install_packages_command(self, packages):
761 command = rpmfuncs.install_packages_command(self.os, packages)
763 command = debfuncs.install_packages_command(self.os, packages)
765 msg = "Error installing packages ( OS not known ) "
766 self.error(msg, self.os)
767 raise RuntimeError, msg
771 def install_packages(self, packages, home, run_home = None,
772 raise_on_error = True):
773 """ Install packages in the Linux host.
775 'home' is the directory to upload the package installation script.
776 'run_home' is the directory from where to execute the script.
778 command = self.install_packages_command(packages)
780 run_home = run_home or home
782 (out, err), proc = self.run_and_wait(command, run_home,
783 shfile = os.path.join(home, "instpkg.sh"),
784 pidfile = "instpkg_pidfile",
785 ecodefile = "instpkg_exitcode",
786 stdout = "instpkg_stdout",
787 stderr = "instpkg_stderr",
789 raise_on_error = raise_on_error)
791 return (out, err), proc
793 def remove_packages(self, packages, home, run_home = None,
794 raise_on_error = True):
795 """ Uninstall packages from the Linux host.
797 'home' is the directory to upload the package un-installation script.
798 'run_home' is the directory from where to execute the script.
801 command = rpmfuncs.remove_packages_command(self.os, packages)
803 command = debfuncs.remove_packages_command(self.os, packages)
805 msg = "Error removing packages ( OS not known ) "
807 raise RuntimeError, msg
809 run_home = run_home or home
811 (out, err), proc = self.run_and_wait(command, run_home,
812 shfile = os.path.join(home, "rmpkg.sh"),
813 pidfile = "rmpkg_pidfile",
814 ecodefile = "rmpkg_exitcode",
815 stdout = "rmpkg_stdout",
816 stderr = "rmpkg_stderr",
818 raise_on_error = raise_on_error)
820 return (out, err), proc
822 def mkdir(self, paths, clean = False):
823 """ Paths is either a single remote directory path to create,
824 or a list of directories to create.
829 if isinstance(paths, str):
832 cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
834 return self.execute(cmd, with_lock = True)
836 def rmdir(self, paths):
837 """ Paths is either a single remote directory path to delete,
838 or a list of directories to delete.
841 if isinstance(paths, str):
844 cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
846 return self.execute(cmd, with_lock = True)
848 def run_and_wait(self, command, home,
854 ecodefile="exitcode",
860 raise_on_error=True):
862 Uploads the 'command' to a bash script in the host.
863 Then runs the script detached in background in the host, and
864 busy-waits until the script finishes executing.
867 if not shfile.startswith("/"):
868 shfile = os.path.join(home, shfile)
870 self.upload_command(command,
872 ecodefile = ecodefile,
874 overwrite = overwrite)
876 command = "bash %s" % shfile
877 # run command in background in remote host
878 (out, err), proc = self.run(command, home,
886 # check no errors occurred
888 msg = " Failed to run command '%s' " % command
889 self.error(msg, out, err)
891 raise RuntimeError, msg
893 # Wait for pid file to be generated
894 pid, ppid = self.wait_pid(
897 raise_on_error = raise_on_error)
900 # wait until command finishes to execute
901 self.wait_run(pid, ppid)
903 (eout, err), proc = self.check_errors(home,
904 ecodefile = ecodefile,
907 # Out is what was written in the stderr file
909 msg = " Failed to run command '%s' " % command
910 self.error(msg, eout, err)
913 raise RuntimeError, msg
915 (out, oerr), proc = self.check_output(home, stdout)
917 return (out, err), proc
919 def exitcode(self, home, ecodefile = "exitcode"):
921 Get the exit code of an application.
922 Returns an integer value with the exit code
924 (out, err), proc = self.check_output(home, ecodefile)
926 # Succeeded to open file, return exit code in the file
929 return int(out.strip())
931 # Error in the content of the file!
932 return ExitCode.CORRUPTFILE
934 # No such file or directory
935 if proc.returncode == 1:
936 return ExitCode.FILENOTFOUND
938 # Other error from 'cat'
939 return ExitCode.ERROR
941 def upload_command(self, command,
943 ecodefile="exitcode",
946 """ Saves the command as a bash script file in the remote host, and
947 forces to save the exit code of the command execution to the ecodefile
950 if not (command.strip().endswith(";") or command.strip().endswith("&")):
953 # The exit code of the command will be stored in ecodefile
954 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
956 'ecodefile': ecodefile,
960 environ = self.format_environment(env)
962 # Add environ to command
963 command = environ + command
965 return self.upload(command, shfile, text=True, overwrite=overwrite)
def format_environment(self, env, inline=False):
    """ Formats the environment variables for a command to be executed.

    The result is either an inline prefix for a command line
    (e.g. ' export A=1; export B=2;') or the leading lines of a bash
    script (one ' export NAME=value' statement per line).

    :param env: whitespace separated list of NAME=value assignments.
        Assignments are split on whitespace, so a value that itself
        contains whitespace is broken into several export statements.
    :param inline: when True statements are terminated with ';',
        otherwise each statement is terminated with a newline
    :returns: the formatted export statements, or the empty string if
        'env' is empty, None or contains only whitespace
    """
    if not env:
        return ""

    # Remove extra white spaces, so assignments are separated by
    # exactly one blank
    env = re.sub(r'\s+', ' ', env.strip())

    # A whitespace-only 'env' collapses to the empty string. Without this
    # guard a bare ' export ' statement (with no assignment) would be
    # emitted into the command.
    if not env:
        return ""

    sep = ";" if inline else "\n"
    statements = [" export %s" % assignment for assignment in env.split(" ")]
    return sep.join(statements) + sep
981 def check_errors(self, home,
982 ecodefile = "exitcode",
984 """ Checks whether errors occurred while running a command.
985 It first checks the exit code for the command, and only if the
986 exit code is an error one it returns the error output.
992 # get exit code saved in the 'exitcode' file
993 ecode = self.exitcode(home, ecodefile)
995 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
996 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
997 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
998 # The process returned an error code or didn't exist.
999 # Check standard error.
1000 (err, eerr), proc = self.check_output(home, stderr)
1002 # If the stderr file was not found, assume nothing bad happened,
1003 # and just ignore the error.
1004 # (cat returns 1 for error "No such file or directory")
1005 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1008 return ("", err), proc
1010 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1011 """ Waits until the pid file for the command is generated,
1012 and returns the pid and ppid of the process """
1017 pidtuple = self.getpid(home = home, pidfile = pidfile)
1020 pid, ppid = pidtuple
1026 msg = " Failed to get pid for pidfile %s/%s " % (
1031 raise RuntimeError, msg
1035 def wait_run(self, pid, ppid, trial = 0):
1036 """ wait for a remote process to finish execution """
1040 status = self.status(pid, ppid)
1042 if status is ProcStatus.FINISHED:
1044 elif status is not ProcStatus.RUNNING:
1047 # If it takes more than 20 seconds to start, then
1048 # assume something went wrong
1052 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of a file by running 'cat' on it.

    :param home: directory that contains the file
    :param filename: name of the file, joined to 'home' to build the path
    :returns: ((out, err), proc) exactly as returned by 'execute'
    """
    path = os.path.join(home, filename)
    # Single attempt, holding the node lock while the command runs
    (out, err), proc = self.execute("cat %s" % path,
            retry = 1, with_lock = True)
    return (out, err), proc
1062 """ Checks if host is responsive
1068 msg = "Unresponsive host. Wrong answer. "
1070 # The underlying SSH layer will sometimes return an empty
1071 # output (even if the command was executed without errors).
1072 # To work around this, repeat the operation N times or
1073 # until the result is not empty string
1075 (out, err), proc = self.execute("echo 'ALIVE'",
1079 if out.find("ALIVE") > -1:
1082 trace = traceback.format_exc()
1083 msg = "Unresponsive host. Error reaching host: %s " % trace
1085 self.error(msg, out, err)
1088 def find_home(self):
1089 """ Retrieves host home directory
1091 # The underlying SSH layer will sometimes return an empty
1092 # output (even if the command was executed without errors).
1093 # To work around this, repeat the operation N times or
1094 # until the result is not empty string
1095 msg = "Impossible to retrieve HOME directory"
1097 (out, err), proc = self.execute("echo ${HOME}",
1101 if out.strip() != "":
1102 self._home_dir = out.strip()
1104 trace = traceback.format_exc()
1105 msg = "Impossible to retrieve HOME directory %s" % trace
1107 if not self._home_dir:
1109 raise RuntimeError, msg
1111 def filter_existing_files(self, src, dst):
1112 """ Removes files that already exist in the Linux host from src list
1114 # construct a dictionary with { dst: src }
1115 dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1116 if len(src) > 1 else dict({dst: src[0]})
1119 for d in dests.keys():
1120 command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1122 command = ";".join(command)
1124 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1126 for d in dests.keys():
1127 if out.find(d) > -1:
1133 return dests.values()