2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!!
42 Error codes that the rexitcode function can return if unable to
43 check the exit code of a spawned process
52 Supported flavors of Linux OS
62 class LinuxNode(ResourceManager):
64 .. class:: Class Args :
66 :param ec: The Experiment controller
67 :type ec: ExperimentController
68 :param guid: guid of the RM
73 There are different ways in which commands can be executed using the
74 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
79 * 'execute' (blocking mode) :
81 HOW IT WORKS: 'execute' forks a process and runs the
82 command, synchronously, attached to the terminal, in
84 The execute method will block until the command returns
85 the result on 'out', 'err' (so until it finishes executing).
87 USAGE: short-lived commands that must be executed attached
88 to a terminal and in foreground, for which it IS necessary
89 to block until the command has finished (e.g. if you want
90 to run 'ls' or 'cat').
92 * 'execute' (NON blocking mode - blocking = False) :
94 HOW IT WORKS: Same as before, except that execute method
95 will return immediately (even if command still running).
97 USAGE: long-lived commands that must be executed attached
98 to a terminal and in foreground, but for which it is not
99 necessary to block until the command has finished. (e.g.
100 start an application using X11 forwarding)
104 HOW IT WORKS: Connects to the host ( using SSH if remote)
105 and launches the command in background, detached from any
106 terminal (daemonized), and returns. The command continues to
107 run remotely, but since it is detached from the terminal,
108 its pipes (stdin, stdout, stderr) can't be redirected to the
109 console (as normal non detached processes would), and so they
110 are explicitly redirected to files. The pidfile is created as
111 part of the process of launching the command. The pidfile
112 holds the pid and ppid of the process forked in background,
113 so later on it is possible to check whether the command is still
116 USAGE: long-lived commands that can run detached in background,
117 for which it is NOT necessary to block (wait) until the command
118 has finished. (e.g. start an application that is not using X11
119 forwarding. It can run detached and remotely in background)
123 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124 the command has finished execution. It also checks whether
125 errors occurred during runtime by reading the exitcode file,
126 which contains the exit code of the command that was run
127 (checking stderr only is not always reliable since many
128 commands throw debugging info to stderr and the only way to
129 automatically know whether an error really happened is to
130 check the process exit code).
132 Another difference with respect to 'run', is that instead
133 of directly executing the command as a bash command line,
134 it uploads the command to a bash script and runs the script.
135 This allows the bash script to be used to debug errors, since
136 it remains at the remote host and can be run manually to
139 USAGE: medium-lived commands that can run detached in
140 background, for which it IS necessary to block (wait) until
141 the command has finished. (e.g. Package installation,
142 source compilation, file download, etc)
# Short description of what this resource manager controls.
146 _help = "Controls Linux host machines ( either localhost or a host " \
147 "that can be accessed using a SSH key)"
# Backend type identifier for this resource manager.
148 _backend_type = "linux"
# Declare the attributes that can be set on a LinuxNode and register each
# of them on the class with cls._register_attribute().
151 def _register_attributes(cls):
# Connection parameters: target host, login account and SSH port.
152 hostname = Attribute("hostname", "Hostname of the machine",
153 flags = Flags.Design)
155 username = Attribute("username", "Local account username",
156 flags = Flags.Credential)
158 port = Attribute("port", "SSH port", flags = Flags.Design)
# Base directory for NEPI files on the host; a value that does not start
# with "/" is resolved relative to the user's home directory.
160 home = Attribute("home",
161 "Experiment home directory to store all experiment related files",
162 flags = Flags.Design)
# SSH authentication settings: private key file and expected server key.
164 identity = Attribute("identity", "SSH identity file",
165 flags = Flags.Credential)
167 server_key = Attribute("serverKey", "Server public key",
168 flags = Flags.Design)
# Cleanup switches read by do_provision() before the directory layout is
# created.
170 clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
171 " from node home folder before starting experiment",
174 flags = Flags.Design)
176 clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
177 " from a previous same experiment, before the new experiment starts",
180 flags = Flags.Design)
182 clean_processes = Attribute("cleanProcesses",
183 "Kill all running processes before starting experiment",
186 flags = Flags.Design)
# Command executed by do_release() before the node is released.
188 tear_down = Attribute("tearDown", "Bash script to be executed before " + \
189 "releasing the resource",
190 flags = Flags.Design)
# Optional gateway account/host, forwarded to the sshfuncs helpers as
# gwuser/gw.
192 gateway_user = Attribute("gatewayUser", "Gateway account username",
193 flags = Flags.Design)
195 gateway = Attribute("gateway", "Hostname of the gateway machine",
196 flags = Flags.Design)
# Public IP of the host; when unset, do_provision() tries to resolve it
# through DNS.
198 ip = Attribute("ip", "Linux host public IP address. "
199 "Must not be modified by the user unless hostname is 'localhost'",
200 flags = Flags.Design)
# Register every attribute declared above on the class.
202 cls._register_attribute(hostname)
203 cls._register_attribute(username)
204 cls._register_attribute(port)
205 cls._register_attribute(home)
206 cls._register_attribute(identity)
207 cls._register_attribute(server_key)
208 cls._register_attribute(clean_home)
209 cls._register_attribute(clean_experiment)
210 cls._register_attribute(clean_processes)
211 cls._register_attribute(tear_down)
212 cls._register_attribute(gateway_user)
213 cls._register_attribute(gateway)
214 cls._register_attribute(ip)
# Initialize the resource manager and create the per-node lock that the
# remote helpers (execute, run, getpid, status, kill, copy) hold while
# talking to the host.
216 def __init__(self, ec, guid):
217 super(LinuxNode, self).__init__(ec, guid)
219 # home directory at Linux host (filled in by find_home())
222 # lock to prevent concurrent applications on the same node,
223 # to execute commands at the same time. There are potential
224 # concurrency issues when using SSH to a same host from
225 # multiple threads. There are also possible operational
226 # issues, e.g. an application querying the existence
227 # of a file or folder prior to its creation, and another
228 # application creating the same file or folder in between.
229 self._node_lock = threading.Lock()
def log_message(self, msg):
    """Decorate ``msg`` with this node's guid and hostname.

    :param msg: the message text to decorate.
    :returns: the string " guid <guid> - host <hostname> - <msg> ",
        including the leading and trailing spaces.
    """
    fields = {
        "guid": self.guid,
        "host": self.get("hostname"),
        "msg": msg,
    }
    return " guid %(guid)d - host %(host)s - %(msg)s " % fields
237 home = self.get("home") or ""
238 if not home.startswith("/"):
239 home = os.path.join(self._home_dir, home)
244 return os.path.join(self.home_dir, ".nepi")
248 return os.path.join(self.nepi_home, "nepi-usr")
252 return os.path.join(self.usr_dir, "lib")
256 return os.path.join(self.usr_dir, "bin")
260 return os.path.join(self.usr_dir, "src")
264 return os.path.join(self.usr_dir, "share")
268 return os.path.join(self.nepi_home, "nepi-exp")
272 return os.path.join(self.exp_dir, self.ec.exp_id)
276 return os.path.join(self.exp_home, "node-%d" % self.guid)
280 return os.path.join(self.node_home, self.ec.run_id)
287 if not self.localhost and not self.get("username"):
288 msg = "Can't resolve OS, insufficient data "
290 raise RuntimeError, msg
294 if out.find("Fedora release 8") == 0:
295 self._os = OSType.FEDORA_8
296 elif out.find("Fedora release 12") == 0:
297 self._os = OSType.FEDORA_12
298 elif out.find("Fedora release 14") == 0:
299 self._os = OSType.FEDORA_14
300 elif out.find("Fedora release") == 0:
301 self._os = OSType.FEDORA
302 elif out.find("Debian") == 0:
303 self._os = OSType.DEBIAN
304 elif out.find("Ubuntu") ==0:
305 self._os = OSType.UBUNTU
307 msg = "Unsupported OS"
309 raise RuntimeError, "%s - %s " %( msg, out )
314 # The underlying SSH layer will sometimes return an empty
315 # output (even if the command was executed without errors).
316 # To work arround this, repeat the operation N times or
317 # until the result is not empty string
320 (out, err), proc = self.execute("cat /etc/issue",
324 trace = traceback.format_exc()
325 msg = "Error detecting OS: %s " % trace
326 self.error(msg, out, err)
332 return self.os in [OSType.DEBIAN, OSType.UBUNTU]
336 return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
341 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
# Prepare the host for the experiment:
#   1. check that it answers is_alive(),
#   2. optionally clean processes, home and experiment files,
#   3. create the NEPI directory layout,
#   4. resolve the public IP,
# then delegate to the base class do_provision().
# Raises RuntimeError when the host is unresponsive.
343 def do_provision(self):
344 # check if host is alive
345 if not self.is_alive():
346 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
348 raise RuntimeError, msg
# Optional cleanup steps, each toggled by its own attribute.
352 if self.get("cleanProcesses"):
353 self.clean_processes()
355 if self.get("cleanHome"):
358 if self.get("cleanExperiment"):
359 self.clean_experiment()
361 # Create shared directory structure and node home directory
362 paths = [self.lib_dir,
370 # Get Public IP address if possible (DNS lookup of the local hostname or of the configured 'hostname')
371 if not self.get("ip"):
375 ip = socket.gethostbyname(socket.gethostname())
378 ip = socket.gethostbyname(self.get("hostname"))
380 msg = "DNS can not resolve hostname %s" % self.get("hostname")
385 super(LinuxNode, self).do_provision()
388 if self.state == ResourceState.NEW:
389 self.info("Deploying node")
393 # Node needs to wait until all associated interfaces are
394 # ready before it can finalize deployment
395 from nepi.resources.linux.interface import LinuxInterface
396 ifaces = self.get_connected(LinuxInterface.get_rtype())
398 if iface.state < ResourceState.READY:
399 self.ec.schedule(reschedule_delay, self.deploy)
402 super(LinuxNode, self).do_deploy()
# Release the node. Every connected RM must be RELEASED first; otherwise
# release is rescheduled after reschedule_delay. Then:
#   - the 'tearDown' command is run through execute(),
#   - clean_processes() is called,
#   - the base class do_release() runs.
404 def do_release(self):
405 rms = self.get_connected()
407 # Node needs to wait until all associated RMs are released
408 # before it can be released
409 if rm.state != ResourceState.RELEASED:
410 self.ec.schedule(reschedule_delay, self.release)
413 tear_down = self.get("tearDown")
415 self.execute(tear_down)
417 self.clean_processes()
419 super(LinuxNode, self).do_release()
# NOTE(review): hook for validating a connection to the RM identified by
# `guid`; confirm which RM types are actually accepted.
421 def valid_connection(self, guid):
# Kill processes left on the host. The kill command is built as follows:
#   - user other than root: sudo-kill tcpdump and every process owned by
#     'username';
#   - otherwise (presumably root): kill tcpdump and processes matching
#     "nepi". When the node state is at least READY, also kill every pid
#     whose (pid, command) pair from `ps aux` is absent from the snapshot
#     unpickled from /tmp/save.proc.
# The command is then run with execute(retry = 1, with_lock = True).
# Every kill is followed by "|| /bin/true", so failures do not abort it.
# NOTE(review): the file opened for pickle.load() is never closed, the
# snapshot path is hard-coded, and unpickling is only safe if that file is
# trusted.
425 def clean_processes(self):
426 self.info("Cleaning up processes")
431 if self.get("username") != 'root':
432 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
433 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
435 if self.state >= ResourceState.READY:
437 pids = pickle.load(open("/tmp/save.proc", "rb"))
439 ps_aux = "ps aux |awk '{print $2,$11}'"
440 (out, err), proc = self.execute(ps_aux)
442 for line in out.strip().split("\n"):
443 parts = line.strip().split(" ")
444 pids_temp[parts[0]] = parts[1]
445 kill_pids = set(pids_temp.items()) - set(pids.items())
446 kill_pids = ' '.join(dict(kill_pids).keys())
448 cmd = ("killall tcpdump || /bin/true ; " +
449 "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
450 "kill %s || /bin/true ; " % kill_pids)
452 cmd = ("killall tcpdump || /bin/true ; " +
453 "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
455 cmd = ("killall tcpdump || /bin/true ; " +
456 "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
458 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
# Delete the ".nepi" entry found directly inside the directory substituted
# into `cd %s` (presumably the node home directory). The find/rm command is
# run through execute(with_lock = True), and its result is returned.
# NOTE(review): "\." in the non-raw command string relies on Python keeping
# unknown escapes verbatim; a raw string would make that explicit.
460 def clean_home(self):
461 """ Cleans all NEPI related folders in the Linux host
463 self.info("Cleaning up home")
465 cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
468 return self.execute(cmd, with_lock = True)
# Delete the entries matching the name substituted into `-name '%s'` from
# the directory substituted into `cd %s` (presumably the experiment id and
# the experiment directory). The command is run through
# execute(with_lock = True), and its result is returned.
470 def clean_experiment(self):
471 """ Cleans all experiment related files in the Linux host.
472 It preserves NEPI files and folders that have a multi experiment
475 self.info("Cleaning up experiment files")
477 cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
481 return self.execute(cmd, with_lock = True)
# Run `command` on the node and return ((out, err), proc). There are two
# paths:
#   - local execution through execfuncs.lexec() (presumably for localhost);
#   - sshfuncs.rexec() using the node's SSH settings (hostname, username,
#     port, gatewayUser, gateway, identity, serverKey).
# One rexec() call runs while holding self._node_lock and the other does
# not. Callers choose with the with_lock argument (TODO confirm the exact
# condition).
483 def execute(self, command,
489 connect_timeout = 30,
490 strict_host_checking = False,
495 """ Notice that this invocation will block until the
496 execution finishes. If this is not the desired behavior,
497 use 'run' instead."""
500 (out, err), proc = execfuncs.lexec(command,
501 user = self.get("username"), # still problem with localhost
506 # If the execute command is blocking, we don't want to keep
507 # the node lock. This lock is used to avoid race conditions
508 # when creating the ControlMaster sockets. A more elegant
509 # solution is needed.
510 with self._node_lock:
511 (out, err), proc = sshfuncs.rexec(
513 host = self.get("hostname"),
514 user = self.get("username"),
515 port = self.get("port"),
516 gwuser = self.get("gatewayUser"),
517 gw = self.get("gateway"),
520 identity = self.get("identity"),
521 server_key = self.get("serverKey"),
524 forward_x11 = forward_x11,
526 connect_timeout = connect_timeout,
527 persistent = persistent,
529 strict_host_checking = strict_host_checking
532 (out, err), proc = sshfuncs.rexec(
534 host = self.get("hostname"),
535 user = self.get("username"),
536 port = self.get("port"),
537 gwuser = self.get("gatewayUser"),
538 gw = self.get("gateway"),
541 identity = self.get("identity"),
542 server_key = self.get("serverKey"),
545 forward_x11 = forward_x11,
547 connect_timeout = connect_timeout,
548 persistent = persistent,
550 strict_host_checking = strict_host_checking
553 return (out, err), proc
# Launch `command` detached in the background (the class docstring's 'run'
# mode) and return ((out, err), proc).
#   - Local launches use execfuncs.lspawn().
#   - Remote launches use sshfuncs.rspawn() while holding self._node_lock.
# Unset stdin/stdout/stderr default to '/dev/null'. The pidfile given to
# the spawn helpers stores the pid/ppid that getpid() later reads.
555 def run(self, command, home,
563 strict_host_checking = False):
565 self.debug("Running command '%s'" % command)
568 (out, err), proc = execfuncs.lspawn(command, pidfile,
570 create_home = create_home,
571 stdin = stdin or '/dev/null',
572 stdout = stdout or '/dev/null',
573 stderr = stderr or '/dev/null',
576 with self._node_lock:
577 (out, err), proc = sshfuncs.rspawn(
581 create_home = create_home,
582 stdin = stdin or '/dev/null',
583 stdout = stdout or '/dev/null',
584 stderr = stderr or '/dev/null',
586 host = self.get("hostname"),
587 user = self.get("username"),
588 port = self.get("port"),
589 gwuser = self.get("gatewayUser"),
590 gw = self.get("gateway"),
592 identity = self.get("identity"),
593 server_key = self.get("serverKey"),
595 strict_host_checking = strict_host_checking
598 return (out, err), proc
# Read the (pid, ppid) tuple stored in home/pidfile. Local reads use
# execfuncs.lgetpid(); remote reads use sshfuncs.rgetpid() while holding
# self._node_lock. wait_pid() polls this until a tuple is available.
600 def getpid(self, home, pidfile = "pidfile"):
602 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
604 with self._node_lock:
605 pidtuple = sshfuncs.rgetpid(
606 os.path.join(home, pidfile),
607 host = self.get("hostname"),
608 user = self.get("username"),
609 port = self.get("port"),
610 gwuser = self.get("gatewayUser"),
611 gw = self.get("gateway"),
613 identity = self.get("identity"),
614 server_key = self.get("serverKey"),
615 strict_host_checking = False
# Query the state of the process identified by (pid, ppid). Local queries
# use execfuncs.lstatus(); remote queries use sshfuncs.rstatus() while
# holding self._node_lock. Callers compare the result with ProcStatus values
# (RUNNING, FINISHED).
620 def status(self, pid, ppid):
622 status = execfuncs.lstatus(pid, ppid)
624 with self._node_lock:
625 status = sshfuncs.rstatus(
627 host = self.get("hostname"),
628 user = self.get("username"),
629 port = self.get("port"),
630 gwuser = self.get("gatewayUser"),
631 gw = self.get("gateway"),
633 identity = self.get("identity"),
634 server_key = self.get("serverKey"),
635 strict_host_checking = False
# Kill the process (pid, ppid) only if status() reports it as RUNNING.
#   - Local kills use execfuncs.lkill(), which receives the `sudo` flag.
#   - Remote kills use sshfuncs.rkill() while holding self._node_lock.
# Returns ((out, err), proc).
640 def kill(self, pid, ppid, sudo = False):
643 status = self.status(pid, ppid)
645 if status == sshfuncs.ProcStatus.RUNNING:
647 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
649 with self._node_lock:
650 (out, err), proc = sshfuncs.rkill(
652 host = self.get("hostname"),
653 user = self.get("username"),
654 port = self.get("port"),
655 gwuser = self.get("gatewayUser"),
656 gw = self.get("gateway"),
659 identity = self.get("identity"),
660 server_key = self.get("serverKey"),
661 strict_host_checking = False
664 return (out, err), proc
# Copy `src` to `dst`. Local copies use execfuncs.lcopy(); remote copies use
# sshfuncs.rcopy() with the node's port/gateway/identity settings while
# holding self._node_lock. Returns ((out, err), proc).
666 def copy(self, src, dst):
668 (out, err), proc = execfuncs.lcopy(src, dst,
671 with self._node_lock:
672 (out, err), proc = sshfuncs.rcopy(
674 port = self.get("port"),
675 gwuser = self.get("gatewayUser"),
676 gw = self.get("gateway"),
677 identity = self.get("identity"),
678 server_key = self.get("serverKey"),
680 strict_host_checking = False)
682 return (out, err), proc
# Upload local files (or a text snippet, when text=True) to `dst` on the
# node via copy().
#   - A string `src` is split on ';' into a list of paths.
#   - With overwrite=False, sources whose destination already exists are
#     dropped (filter_existing_files), and (("", ""), None) can be returned.
#   - For remote nodes, `dst` is rewritten as <user>@<host>:<path>.
#   - On failure the error is logged, and RuntimeError is raised
#     (presumably only when raise_on_error is set).
684 def upload(self, src, dst, text = False, overwrite = True,
685 raise_on_error = True):
686 """ Copy content to destination
688 src string with the content to copy. Can be:
690 - a string with the path to a local file
691 - a string with a semi-colon separated list of local files
692 - a string with a local directory
694 dst string with destination path on the remote host (remote is
697 text src is text input, it must be stored into a temp file before
700 # If source is a string input
702 if text and not os.path.isfile(src):
703 # src is text input that should be uploaded as file
704 # create a temporary file with the content to upload
705 f = tempfile.NamedTemporaryFile(delete=False)
710 # If dst files should not be overwritten, check that the files do not
712 if isinstance(src, str):
713 src = map(str.strip, src.split(";"))
715 if overwrite == False:
716 src = self.filter_existing_files(src, dst)
718 return ("", ""), None
720 if not self.localhost:
721 # Build destination as <user>@<server>:<path>
722 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
724 ((out, err), proc) = self.copy(src, dst)
731 msg = " Failed to upload files - src: %s dst: %s" % (";".join(src), dst)
732 self.error(msg, out, err)
734 msg = "%s out: %s err: %s" % (msg, out, err)
736 raise RuntimeError, msg
738 return ((out, err), proc)
# Copy `src` from the node to the local path `dst` via copy(). For remote
# nodes, `src` is rewritten as <user>@<host>:<path>. On failure the error is
# logged, and RuntimeError is raised (presumably gated by raise_on_error).
# Returns ((out, err), proc).
# NOTE(review): when `src` is a plain string (always the case for remote
# nodes, where it is rebuilt below), ";".join(src) in the error message
# inserts ';' between every character; `src` itself should be used there.
740 def download(self, src, dst, raise_on_error = True):
741 if not self.localhost:
742 # Build source as <user>@<server>:<path>
743 src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
745 ((out, err), proc) = self.copy(src, dst)
748 msg = " Failed to download files - src: %s dst: %s" % (";".join(src), dst)
749 self.error(msg, out, err)
752 raise RuntimeError, msg
754 return ((out, err), proc)
# Build the shell command that installs `packages` on this node's OS, using
# rpmfuncs or debfuncs (presumably chosen via use_rpm/use_deb). Logs and
# raises RuntimeError when the OS is not known.
756 def install_packages_command(self, packages):
759 command = rpmfuncs.install_packages_command(self.os, packages)
761 command = debfuncs.install_packages_command(self.os, packages)
763 msg = "Error installing packages ( OS not known ) "
764 self.error(msg, self.os)
765 raise RuntimeError, msg
# Install `packages` on the node. The command from install_packages_command()
# is uploaded as home/instpkg.sh and executed with run_and_wait() from
# run_home (defaults to home). The pid, exit-code, stdout and stderr files
# use instpkg_* names. Returns ((out, err), proc) from run_and_wait().
769 def install_packages(self, packages, home, run_home = None,
770 raise_on_error = True):
771 """ Install packages in the Linux host.
773 'home' is the directory to upload the package installation script.
774 'run_home' is the directory from where to execute the script.
776 command = self.install_packages_command(packages)
778 run_home = run_home or home
780 (out, err), proc = self.run_and_wait(command, run_home,
781 shfile = os.path.join(home, "instpkg.sh"),
782 pidfile = "instpkg_pidfile",
783 ecodefile = "instpkg_exitcode",
784 stdout = "instpkg_stdout",
785 stderr = "instpkg_stderr",
787 raise_on_error = raise_on_error)
789 return (out, err), proc
# Uninstall `packages` from the node. The remove command (rpmfuncs or
# debfuncs; RuntimeError if the OS is not known) is uploaded as
# home/rmpkg.sh and executed with run_and_wait() from run_home (defaults to
# home). The helper files use rmpkg_* names. Returns ((out, err), proc).
791 def remove_packages(self, packages, home, run_home = None,
792 raise_on_error = True):
793 """ Uninstall packages from the Linux host.
795 'home' is the directory to upload the package un-installation script.
796 'run_home' is the directory from where to execute the script.
799 command = rpmfuncs.remove_packages_command(self.os, packages)
801 command = debfuncs.remove_packages_command(self.os, packages)
803 msg = "Error removing packages ( OS not known ) "
805 raise RuntimeError, msg
807 run_home = run_home or home
809 (out, err), proc = self.run_and_wait(command, run_home,
810 shfile = os.path.join(home, "rmpkg.sh"),
811 pidfile = "rmpkg_pidfile",
812 ecodefile = "rmpkg_exitcode",
813 stdout = "rmpkg_stdout",
814 stderr = "rmpkg_stderr",
816 raise_on_error = raise_on_error)
818 return (out, err), proc
# Create one directory or a list of directories on the node. The
# "mkdir -p <path>" commands are joined with " ; " and run through
# execute(with_lock = True), whose result is returned.
# NOTE(review): paths are interpolated into the shell command unquoted, so
# paths containing spaces or shell metacharacters are not handled.
820 def mkdir(self, paths, clean = False):
821 """ Paths is either a single remote directory path to create,
822 or a list of directories to create.
827 if isinstance(paths, str):
830 cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
832 return self.execute(cmd, with_lock = True)
# Recursively delete one directory or a list of directories on the node.
# The "rm -rf <path>" commands are joined with " ; " and run through
# execute(with_lock = True), whose result is returned.
# NOTE(review): paths are interpolated into the shell command unquoted.
834 def rmdir(self, paths):
835 """ Paths is either a single remote directory path to delete,
836 or a list of directories to delete.
839 if isinstance(paths, str):
842 cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
844 return self.execute(cmd, with_lock = True)
# Run `command` as an uploaded bash script and wait for it to finish:
#   1. upload_command() writes it to `shfile` (made absolute under `home`);
#   2. run() launches `bash <shfile>` detached;
#   3. wait_pid() waits for the pidfile, then wait_run() waits for the
#      process to finish;
#   4. check_errors() inspects the exit-code/stderr files and
#      check_output() reads the stdout file.
# Returns ((stdout file content, error text from check_errors), proc);
# proc is the one returned by check_output(). Failures are logged, and
# RuntimeError is raised (presumably gated by raise_on_error).
846 def run_and_wait(self, command, home,
851 ecodefile = "exitcode",
857 raise_on_error = True):
859 Uploads the 'command' to a bash script in the host.
860 Then runs the script detached in background in the host, and
861 busy-waits until the script finishes executing.
864 if not shfile.startswith("/"):
865 shfile = os.path.join(home, shfile)
867 self.upload_command(command,
869 ecodefile = ecodefile,
871 overwrite = overwrite)
873 command = "bash %s" % shfile
874 # run command in background in remote host
875 (out, err), proc = self.run(command, home,
883 # check no errors occurred
885 msg = " Failed to run command '%s' " % command
886 self.error(msg, out, err)
888 raise RuntimeError, msg
890 # Wait for pid file to be generated
891 pid, ppid = self.wait_pid(
894 raise_on_error = raise_on_error)
896 # wait until command finishes to execute
897 self.wait_run(pid, ppid)
899 (eout, err), proc = self.check_errors(home,
900 ecodefile = ecodefile,
903 # err holds what was written in the stderr file (check_errors returns "" as out)
905 msg = " Failed to run command '%s' " % command
906 self.error(msg, eout, err)
909 raise RuntimeError, msg
911 (out, oerr), proc = self.check_output(home, stdout)
913 return (out, err), proc
# Read home/ecodefile via check_output() and return the integer stored in
# it. If the exit code cannot be read, an ExitCode value is returned
# instead:
#   - CORRUPTFILE when the content cannot be parsed (presumably when int()
#     fails);
#   - FILENOTFOUND when `cat` exits with status 1;
#   - ERROR for any other `cat` failure.
915 def exitcode(self, home, ecodefile = "exitcode"):
917 Get the exit code of an application.
918 Returns an integer value with the exit code, or an ExitCode value when it cannot be read
920 (out, err), proc = self.check_output(home, ecodefile)
922 # Succeeded to open file, return exit code in the file
925 return int(out.strip())
927 # Error in the content of the file!
928 return ExitCode.CORRUPTFILE
930 # No such file or directory
931 if proc.returncode == 1:
932 return ExitCode.FILENOTFOUND
934 # Other error from 'cat'
935 return ExitCode.ERROR
# Save `command` as the script `shfile` on the node with upload(text = True)
# and return upload()'s result.
#   - The command is wrapped as "{ <command> } ; echo $? > <ecodefile> ;",
#     so the script records the command's exit status in ecodefile.
#   - Bash requires the command list inside { ... } to end with ';' or '&'
#     (or a newline), hence the endswith check.
#   - The "export" lines from format_environment(env) are prepended.
937 def upload_command(self, command,
939 ecodefile = "exitcode",
942 """ Saves the command as a bash script file in the remote host, and
943 forces to save the exit code of the command execution to the ecodefile
946 if not (command.strip().endswith(";") or command.strip().endswith("&")):
949 # The exit code of the command will be stored in ecodefile
950 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
952 'ecodefile': ecodefile,
956 environ = self.format_environment(env)
958 # Add environ to command
959 command = environ + command
961 return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """ Formats the environment variables for a command to be executed
    either as an inline command
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
    as a bash script (one "export NAME=value" statement per line).

    :param env: whitespace-separated NAME=value assignments, e.g.
        "PYTHONPATH=src LANG=C". None, "" or a whitespace-only string
        produce no exports.
    :param inline: if True, terminate each export with ";" so the result
        can prefix a one-line command; otherwise terminate each with a
        newline, as in a bash script.
    :returns: the concatenated " export NAME=value" statements, each
        followed by the separator, or "" when there is nothing to export.
    """
    # Collapse every run of whitespace (spaces, tabs, newlines) into one
    # space. The emptiness check must come AFTER this normalization:
    # checking first would let a whitespace-only string through and emit a
    # bare " export " statement.
    env = re.sub(r'\s+', ' ', env.strip()) if env else ""
    if not env:
        return ""

    sep = ";" if inline else "\n"
    return sep.join(" export %s" % var for var in env.split(" ")) + sep
# Return ("", err), proc, where err describes what went wrong:
#   - a message, when exitcode() reports a corrupt or unreadable exit-code
#     file;
#   - the content of the stderr file (via check_output()), when the exit
#     code is positive or the exit-code file is missing. A missing stderr
#     file is then ignored.
# NOTE(review): `ecode > 0` assumes the ExitCode sentinel values are not
# positive integers — confirm against the ExitCode definition.
977 def check_errors(self, home,
978 ecodefile = "exitcode",
980 """ Checks whether errors occurred while running a command.
981 It first checks the exit code for the command, and only if the
982 exit code is an error one it returns the error output.
988 # get exit code saved in the 'exitcode' file
989 ecode = self.exitcode(home, ecodefile)
991 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
992 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
993 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
994 # The process returned an error code or didn't exist.
995 # Check standard error.
996 (err, eerr), proc = self.check_output(home, stderr)
998 # If the stderr file was not found, assume nothing bad happened,
999 # and just ignore the error.
1000 # (cat returns 1 for error "No such file or directory")
1001 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1004 return ("", err), proc
# Poll getpid() until home/pidfile yields a (pid, ppid) tuple, then return
# it. If no tuple is obtained, a "Failed to get pid" message is built, and
# RuntimeError is raised (presumably gated by raise_on_error).
1006 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1007 """ Waits until the pid file for the command is generated,
1008 and returns the pid and ppid of the process """
1013 pidtuple = self.getpid(home = home, pidfile = pidfile)
1016 pid, ppid = pidtuple
1022 msg = " Failed to get pid for pidfile %s/%s " % (
1027 raise RuntimeError, msg
# Block, polling status(pid, ppid), until the process reports FINISHED.
# A status that is neither FINISHED nor RUNNING is tolerated only for a
# limited time (see the ~20 second comment), presumably tracked through
# `trial`.
# NOTE(review): ProcStatus values are compared by identity (`is`/`is not`);
# `==`/`!=` would not depend on the values being interned singletons.
1031 def wait_run(self, pid, ppid, trial = 0):
1032 """ wait for a remote process to finish execution """
1036 status = self.status(pid, ppid)
1038 if status is ProcStatus.FINISHED:
1040 elif status is not ProcStatus.RUNNING:
1043 # If it takes more than 20 seconds to start, then
1044 # assume something went wrong
1048 # The app is running, just wait...
def check_output(self, home, filename):
    """Return the contents of ``filename`` inside ``home`` on the node.

    The path is built with os.path.join (so an absolute ``filename`` wins)
    and printed with ``cat`` through execute(), using one retry and
    ``with_lock = True``.

    :returns: ``((out, err), proc)`` as reported by execute(); ``out`` holds
        the file content and ``err`` whatever ``cat`` wrote to stderr.
    """
    target = os.path.join(home, filename)
    result = self.execute("cat %s" % target, retry = 1, with_lock = True)
    (content, errors), process = result
    return (content, errors), process
1058 """ Checks if host is responsive
1064 msg = "Unresponsive host. Wrong answer. "
1066 # The underlying SSH layer will sometimes return an empty
1067 # output (even if the command was executed without errors).
1068 # To work arround this, repeat the operation N times or
1069 # until the result is not empty string
1071 (out, err), proc = self.execute("echo 'ALIVE'",
1075 if out.find("ALIVE") > -1:
1078 trace = traceback.format_exc()
1079 msg = "Unresponsive host. Error reaching host: %s " % trace
1081 self.error(msg, out, err)
# Set self._home_dir from the output of `echo ${HOME}` on the host. The
# call is retried because the SSH layer can return empty output. Raises
# RuntimeError when no home directory was obtained; the message includes
# the last traceback when an exception occurred.
1084 def find_home(self):
1085 """ Retrieves host home directory
1087 # The underlying SSH layer will sometimes return an empty
1088 # output (even if the command was executed without errors).
1089 # To work around this, repeat the operation N times or
1090 # until the result is not empty string
1091 msg = "Impossible to retrieve HOME directory"
1093 (out, err), proc = self.execute("echo ${HOME}",
1097 if out.strip() != "":
1098 self._home_dir = out.strip()
1100 trace = traceback.format_exc()
1101 msg = "Impossible to retrieve HOME directory %s" % trace
1103 if not self._home_dir:
1105 raise RuntimeError, msg
# Return the sources whose destination file does not exist on the node yet.
#   - Destinations are dst/<basename(src)> when several sources are given,
#     or `dst` itself for a single source.
#   - Existence is tested for all destinations in one shell command of
#     "[ -f <dst> ] && echo '<dst>'" tests, run through execute().
# NOTE(review): `out.find(d)` is a substring match, so a destination that is
# a prefix of another existing path is also treated as existing. An empty
# `src` raises IndexError on src[0].
1107 def filter_existing_files(self, src, dst):
1108 """ Removes files that already exist in the Linux host from src list
1110 # construct a dictionary with { dst: src }
1111 dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1112 if len(src) > 1 else dict({dst: src[0]})
1115 for d in dests.keys():
1116 command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1118 command = ";".join(command)
1120 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1122 for d in dests.keys():
1123 if out.find(d) > -1:
1129 return dests.values()