2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!!
42 Error codes that the rexitcode function can return if unable to
43 check the exit code of a spawned process
52 Supported flavors of Linux OS
62 class LinuxNode(ResourceManager):
64 .. class:: Class Args :
66 :param ec: The Experiment controller
67 :type ec: ExperimentController
68 :param guid: guid of the RM
73 There are different ways in which commands can be executed using the
74 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
79 * 'execute' (blocking mode) :
81 HOW IT WORKS: 'execute' forks a process and runs the
82 command, synchronously, attached to the terminal, in
84 The execute method will block until the command returns
85 the result on 'out', 'err' (so until it finishes executing).
87 USAGE: short-lived commands that must be executed attached
88 to a terminal and in foreground, for which it IS necessary
89 to block until the command has finished (e.g. if you want
90 to run 'ls' or 'cat').
92 * 'execute' (NON blocking mode - blocking = False) :
94 HOW IT WORKS: Same as before, except that execute method
95 will return immediately (even if command still running).
97 USAGE: long-lived commands that must be executed attached
98 to a terminal and in foreground, but for which it is not
99 necessary to block until the command has finished. (e.g.
100 start an application using X11 forwarding)
104 HOW IT WORKS: Connects to the host ( using SSH if remote)
105 and launches the command in background, detached from any
106 terminal (daemonized), and returns. The command continues to
107 run remotely, but since it is detached from the terminal,
108 its pipes (stdin, stdout, stderr) can't be redirected to the
109 console (as normal non detached processes would), and so they
110 are explicitly redirected to files. The pidfile is created as
111 part of the process of launching the command. The pidfile
112 holds the pid and ppid of the process forked in background,
113 so later on it is possible to check whether the command is still
116 USAGE: long-lived commands that can run detached in background,
117 for which it is NOT necessary to block (wait) until the command
118 has finished. (e.g. start an application that is not using X11
119 forwarding. It can run detached and remotely in background)
123 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124 the command has finished execution. It also checks whether
125 errors occurred during runtime by reading the exitcode file,
126 which contains the exit code of the command that was run
127 (checking stderr only is not always reliable since many
128 commands throw debugging info to stderr and the only way to
129 automatically know whether an error really happened is to
130 check the process exit code).
132 Another difference with respect to 'run', is that instead
133 of directly executing the command as a bash command line,
134 it uploads the command to a bash script and runs the script.
135 This allows to use the bash script to debug errors, since
136 it remains at the remote host and can be run manually to
139 USAGE: medium-lived commands that can run detached in
140 background, for which it IS necessary to block (wait) until
141 the command has finished. (e.g. Package installation,
142 source compilation, file download, etc)
146 _help = "Controls Linux host machines ( either localhost or a host " \
147 "that can be accessed using a SSH key)"
148 _backend_type = "linux"
151 def _register_attributes(cls):
152 hostname = Attribute("hostname", "Hostname of the machine",
153 flags = Flags.Design)
155 username = Attribute("username", "Local account username",
156 flags = Flags.Credential)
158 port = Attribute("port", "SSH port", flags = Flags.Design)
160 home = Attribute("home",
161 "Experiment home directory to store all experiment related files",
162 flags = Flags.Design)
164 identity = Attribute("identity", "SSH identity file",
165 flags = Flags.Credential)
167 server_key = Attribute("serverKey", "Server public key",
168 flags = Flags.Design)
170 clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
171 " from node home folder before starting experiment",
174 flags = Flags.Design)
176 clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
177 " from a previous same experiment, before the new experiment starts",
180 flags = Flags.Design)
182 clean_processes = Attribute("cleanProcesses",
183 "Kill all running processes before starting experiment",
186 flags = Flags.Design)
188 tear_down = Attribute("tearDown", "Bash script to be executed before " + \
189 "releasing the resource",
190 flags = Flags.Design)
192 gateway_user = Attribute("gatewayUser", "Gateway account username",
193 flags = Flags.Design)
195 gateway = Attribute("gateway", "Hostname of the gateway machine",
196 flags = Flags.Design)
198 ip = Attribute("ip", "Linux host public IP address",
199 flags = Flags.NoWrite)
201 cls._register_attribute(hostname)
202 cls._register_attribute(username)
203 cls._register_attribute(port)
204 cls._register_attribute(home)
205 cls._register_attribute(identity)
206 cls._register_attribute(server_key)
207 cls._register_attribute(clean_home)
208 cls._register_attribute(clean_experiment)
209 cls._register_attribute(clean_processes)
210 cls._register_attribute(tear_down)
211 cls._register_attribute(gateway_user)
212 cls._register_attribute(gateway)
213 cls._register_attribute(ip)
215 def __init__(self, ec, guid):
216 super(LinuxNode, self).__init__(ec, guid)
218 # home directory at Linux host
221 # lock to prevent concurrent applications on the same node,
222 # to execute commands at the same time. There are potential
223 # concurrency issues when using SSH to a same host from
224 # multiple threads. There are also possible operational
225 # issues, e.g. an application querying the existence
226 # of a file or folder prior to its creation, and another
227 # application creating the same file or folder in between.
228 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Decorates a log message with the identity of this node.

    :param msg: text of the message to log
    :type msg: str

    :return: the message prefixed with the guid of this RM and the
        value of its 'hostname' attribute
    :rtype: str
    """
    hostname = self.get("hostname")
    return " guid %d - host %s - %s " % (self.guid, hostname, msg)
236 home = self.get("home") or ""
237 if not home.startswith("/"):
238 home = os.path.join(self._home_dir, home)
243 return os.path.join(self.home_dir, ".nepi")
247 return os.path.join(self.nepi_home, "nepi-usr")
251 return os.path.join(self.usr_dir, "lib")
255 return os.path.join(self.usr_dir, "bin")
259 return os.path.join(self.usr_dir, "src")
263 return os.path.join(self.usr_dir, "share")
267 return os.path.join(self.nepi_home, "nepi-exp")
271 return os.path.join(self.exp_dir, self.ec.exp_id)
275 return os.path.join(self.exp_home, "node-%d" % self.guid)
279 return os.path.join(self.node_home, self.ec.run_id)
286 if not self.localhost and not self.get("username"):
287 msg = "Can't resolve OS, insufficient data "
289 raise RuntimeError, msg
293 if out.find("Fedora release 8") == 0:
294 self._os = OSType.FEDORA_8
295 elif out.find("Fedora release 12") == 0:
296 self._os = OSType.FEDORA_12
297 elif out.find("Fedora release 14") == 0:
298 self._os = OSType.FEDORA_14
299 elif out.find("Fedora release") == 0:
300 self._os = OSType.FEDORA
301 elif out.find("Debian") == 0:
302 self._os = OSType.DEBIAN
303 elif out.find("Ubuntu") ==0:
304 self._os = OSType.UBUNTU
306 msg = "Unsupported OS"
308 raise RuntimeError, "%s - %s " %( msg, out )
313 # The underlying SSH layer will sometimes return an empty
314 # output (even if the command was executed without errors).
315 # To work around this, repeat the operation N times or
316 # until the result is not empty string
319 (out, err), proc = self.execute("cat /etc/issue",
323 trace = traceback.format_exc()
324 msg = "Error detecting OS: %s " % trace
325 self.error(msg, out, err)
331 return self.os in [OSType.DEBIAN, OSType.UBUNTU]
335 return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
340 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
342 def do_provision(self):
343 # check if host is alive
344 if not self.is_alive():
345 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
347 raise RuntimeError, msg
351 if self.get("cleanProcesses"):
352 self.clean_processes()
354 if self.get("cleanHome"):
357 if self.get("cleanExperiment"):
358 self.clean_experiment()
360 # Create shared directory structure and node home directory
361 paths = [self.lib_dir,
369 # Get Public IP address
371 ip = socket.gethostbyname(socket.gethostname())
373 ip = socket.gethostbyname(self.get("hostname"))
377 super(LinuxNode, self).do_provision()
380 if self.state == ResourceState.NEW:
381 self.info("Deploying node")
385 # Node needs to wait until all associated interfaces are
386 # ready before it can finalize deployment
387 from nepi.resources.linux.interface import LinuxInterface
388 ifaces = self.get_connected(LinuxInterface.get_rtype())
390 if iface.state < ResourceState.READY:
391 self.ec.schedule(reschedule_delay, self.deploy)
394 super(LinuxNode, self).do_deploy()
396 def do_release(self):
397 rms = self.get_connected()
399 # Node needs to wait until all associated RMs are released
400 # before it can be released
401 if rm.state != ResourceState.RELEASED:
402 self.ec.schedule(reschedule_delay, self.release)
405 tear_down = self.get("tearDown")
407 self.execute(tear_down)
409 self.clean_processes()
411 super(LinuxNode, self).do_release()
413 def valid_connection(self, guid):
417 def clean_processes(self):
418 self.info("Cleaning up processes")
423 if self.get("username") != 'root':
424 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
425 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
427 if self.state >= ResourceState.READY:
429 pids = pickle.load(open("/tmp/save.proc", "rb"))
431 ps_aux = "ps aux |awk '{print $2,$11}'"
432 (out, err), proc = self.execute(ps_aux)
434 for line in out.strip().split("\n"):
435 parts = line.strip().split(" ")
436 pids_temp[parts[0]] = parts[1]
437 kill_pids = set(pids_temp.items()) - set(pids.items())
438 kill_pids = ' '.join(dict(kill_pids).keys())
440 cmd = ("killall tcpdump || /bin/true ; " +
441 "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
442 "kill %s || /bin/true ; " % kill_pids)
444 cmd = ("killall tcpdump || /bin/true ; " +
445 "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
447 cmd = ("killall tcpdump || /bin/true ; " +
448 "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
450 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
452 def clean_home(self):
453 """ Cleans all NEPI related folders in the Linux host
455 self.info("Cleaning up home")
457 cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
460 return self.execute(cmd, with_lock = True)
462 def clean_experiment(self):
463 """ Cleans all experiment related files in the Linux host.
464 It preserves NEPI files and folders that have a multi experiment
467 self.info("Cleaning up experiment files")
469 cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
473 return self.execute(cmd, with_lock = True)
475 def execute(self, command,
481 connect_timeout = 30,
482 strict_host_checking = False,
487 """ Notice that this invocation will block until the
488 execution finishes. If this is not the desired behavior,
489 use 'run' instead."""
492 (out, err), proc = execfuncs.lexec(command,
493 user = self.get("username"), # still problem with localhost
498 # If the execute command is blocking, we don't want to keep
499 # the node lock. This lock is used to avoid race conditions
500 # when creating the ControlMaster sockets. A more elegant
501 # solution is needed.
502 with self._node_lock:
503 (out, err), proc = sshfuncs.rexec(
505 host = self.get("hostname"),
506 user = self.get("username"),
507 port = self.get("port"),
508 gwuser = self.get("gatewayUser"),
509 gw = self.get("gateway"),
512 identity = self.get("identity"),
513 server_key = self.get("serverKey"),
516 forward_x11 = forward_x11,
518 connect_timeout = connect_timeout,
519 persistent = persistent,
521 strict_host_checking = strict_host_checking
524 (out, err), proc = sshfuncs.rexec(
526 host = self.get("hostname"),
527 user = self.get("username"),
528 port = self.get("port"),
529 gwuser = self.get("gatewayUser"),
530 gw = self.get("gateway"),
533 identity = self.get("identity"),
534 server_key = self.get("serverKey"),
537 forward_x11 = forward_x11,
539 connect_timeout = connect_timeout,
540 persistent = persistent,
542 strict_host_checking = strict_host_checking
545 return (out, err), proc
547 def run(self, command, home,
556 self.debug("Running command '%s'" % command)
559 (out, err), proc = execfuncs.lspawn(command, pidfile,
561 create_home = create_home,
562 stdin = stdin or '/dev/null',
563 stdout = stdout or '/dev/null',
564 stderr = stderr or '/dev/null',
567 with self._node_lock:
568 (out, err), proc = sshfuncs.rspawn(
572 create_home = create_home,
573 stdin = stdin or '/dev/null',
574 stdout = stdout or '/dev/null',
575 stderr = stderr or '/dev/null',
577 host = self.get("hostname"),
578 user = self.get("username"),
579 port = self.get("port"),
580 gwuser = self.get("gatewayUser"),
581 gw = self.get("gateway"),
583 identity = self.get("identity"),
584 server_key = self.get("serverKey"),
588 return (out, err), proc
590 def getpid(self, home, pidfile = "pidfile"):
592 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
594 with self._node_lock:
595 pidtuple = sshfuncs.rgetpid(
596 os.path.join(home, pidfile),
597 host = self.get("hostname"),
598 user = self.get("username"),
599 port = self.get("port"),
600 gwuser = self.get("gatewayUser"),
601 gw = self.get("gateway"),
603 identity = self.get("identity"),
604 server_key = self.get("serverKey")
609 def status(self, pid, ppid):
611 status = execfuncs.lstatus(pid, ppid)
613 with self._node_lock:
614 status = sshfuncs.rstatus(
616 host = self.get("hostname"),
617 user = self.get("username"),
618 port = self.get("port"),
619 gwuser = self.get("gatewayUser"),
620 gw = self.get("gateway"),
622 identity = self.get("identity"),
623 server_key = self.get("serverKey")
628 def kill(self, pid, ppid, sudo = False):
631 status = self.status(pid, ppid)
633 if status == sshfuncs.ProcStatus.RUNNING:
635 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
637 with self._node_lock:
638 (out, err), proc = sshfuncs.rkill(
640 host = self.get("hostname"),
641 user = self.get("username"),
642 port = self.get("port"),
643 gwuser = self.get("gatewayUser"),
644 gw = self.get("gateway"),
647 identity = self.get("identity"),
648 server_key = self.get("serverKey")
651 return (out, err), proc
653 def copy(self, src, dst):
655 (out, err), proc = execfuncs.lcopy(src, dst,
658 with self._node_lock:
659 (out, err), proc = sshfuncs.rcopy(
661 port = self.get("port"),
662 gwuser = self.get("gatewayUser"),
663 gw = self.get("gateway"),
664 identity = self.get("identity"),
665 server_key = self.get("serverKey"),
667 strict_host_checking = False)
669 return (out, err), proc
671 def upload(self, src, dst, text = False, overwrite = True,
672 raise_on_error = True):
673 """ Copy content to destination
675 src string with the content to copy. Can be:
677 - a string with the path to a local file
678 - a string with a semicolon-separated list of local files
679 - a string with a local directory
681 dst string with destination path on the remote host (remote is
684 text src is text input, it must be stored into a temp file before
687 # If source is a string input
689 if text and not os.path.isfile(src):
690 # src is text input that should be uploaded as file
691 # create a temporal file with the content to upload
692 f = tempfile.NamedTemporaryFile(delete=False)
697 # If dst files should not be overwritten, check that the files do not
699 if isinstance(src, str):
700 src = map(str.strip, src.split(";"))
702 if overwrite == False:
703 src = self.filter_existing_files(src, dst)
705 return ("", ""), None
707 if not self.localhost:
708 # Build destination as <user>@<server>:<path>
709 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
711 ((out, err), proc) = self.copy(src, dst)
718 msg = " Failed to upload files - src: %s dst: %s" % (";".join(src), dst)
719 self.error(msg, out, err)
721 msg = "%s out: %s err: %s" % (msg, out, err)
723 raise RuntimeError, msg
725 return ((out, err), proc)
727 def download(self, src, dst, raise_on_error = True):
728 if not self.localhost:
729 # Build destination as <user>@<server>:<path>
730 src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
732 ((out, err), proc) = self.copy(src, dst)
735 msg = " Failed to download files - src: %s dst: %s" % (";".join(src), dst)
736 self.error(msg, out, err)
739 raise RuntimeError, msg
741 return ((out, err), proc)
743 def install_packages_command(self, packages):
746 command = rpmfuncs.install_packages_command(self.os, packages)
748 command = debfuncs.install_packages_command(self.os, packages)
750 msg = "Error installing packages ( OS not known ) "
751 self.error(msg, self.os)
752 raise RuntimeError, msg
756 def install_packages(self, packages, home, run_home = None,
757 raise_on_error = True):
758 """ Install packages in the Linux host.
760 'home' is the directory to upload the package installation script.
761 'run_home' is the directory from where to execute the script.
763 command = self.install_packages_command(packages)
765 run_home = run_home or home
767 (out, err), proc = self.run_and_wait(command, run_home,
768 shfile = os.path.join(home, "instpkg.sh"),
769 pidfile = "instpkg_pidfile",
770 ecodefile = "instpkg_exitcode",
771 stdout = "instpkg_stdout",
772 stderr = "instpkg_stderr",
774 raise_on_error = raise_on_error)
776 return (out, err), proc
778 def remove_packages(self, packages, home, run_home = None,
779 raise_on_error = True):
780 """ Uninstall packages from the Linux host.
782 'home' is the directory to upload the package un-installation script.
783 'run_home' is the directory from where to execute the script.
786 command = rpmfuncs.remove_packages_command(self.os, packages)
788 command = debfuncs.remove_packages_command(self.os, packages)
790 msg = "Error removing packages ( OS not known ) "
792 raise RuntimeError, msg
794 run_home = run_home or home
796 (out, err), proc = self.run_and_wait(command, run_home,
797 shfile = os.path.join(home, "rmpkg.sh"),
798 pidfile = "rmpkg_pidfile",
799 ecodefile = "rmpkg_exitcode",
800 stdout = "rmpkg_stdout",
801 stderr = "rmpkg_stderr",
803 raise_on_error = raise_on_error)
805 return (out, err), proc
807 def mkdir(self, paths, clean = False):
808 """ Paths is either a single remote directory path to create,
809 or a list of directories to create.
814 if isinstance(paths, str):
817 cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
819 return self.execute(cmd, with_lock = True)
821 def rmdir(self, paths):
822 """ Paths is either a single remote directory path to delete,
823 or a list of directories to delete.
826 if isinstance(paths, str):
829 cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
831 return self.execute(cmd, with_lock = True)
833 def run_and_wait(self, command, home,
838 ecodefile = "exitcode",
844 raise_on_error = True):
846 Uploads the 'command' to a bash script in the host.
847 Then runs the script detached in background in the host, and
848 busy-waits until the script finishes executing.
851 if not shfile.startswith("/"):
852 shfile = os.path.join(home, shfile)
854 self.upload_command(command,
856 ecodefile = ecodefile,
858 overwrite = overwrite)
860 command = "bash %s" % shfile
861 # run command in background in remote host
862 (out, err), proc = self.run(command, home,
870 # check no errors occurred
872 msg = " Failed to run command '%s' " % command
873 self.error(msg, out, err)
875 raise RuntimeError, msg
877 # Wait for pid file to be generated
878 pid, ppid = self.wait_pid(
881 raise_on_error = raise_on_error)
883 # wait until command finishes to execute
884 self.wait_run(pid, ppid)
886 (eout, err), proc = self.check_errors(home,
887 ecodefile = ecodefile,
890 # Out is what was written in the stderr file
892 msg = " Failed to run command '%s' " % command
893 self.error(msg, eout, err)
896 raise RuntimeError, msg
898 (out, oerr), proc = self.check_output(home, stdout)
900 return (out, err), proc
902 def exitcode(self, home, ecodefile = "exitcode"):
904 Get the exit code of an application.
905 Returns an integer value with the exit code
907 (out, err), proc = self.check_output(home, ecodefile)
909 # Succeeded to open file, return exit code in the file
912 return int(out.strip())
914 # Error in the content of the file!
915 return ExitCode.CORRUPTFILE
917 # No such file or directory
918 if proc.returncode == 1:
919 return ExitCode.FILENOTFOUND
921 # Other error from 'cat'
922 return ExitCode.ERROR
924 def upload_command(self, command,
926 ecodefile = "exitcode",
929 """ Saves the command as a bash script file in the remote host, and
930 forces to save the exit code of the command execution to the ecodefile
933 if not (command.strip().endswith(";") or command.strip().endswith("&")):
936 # The exit code of the command will be stored in ecodefile
937 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
939 'ecodefile': ecodefile,
943 environ = self.format_environment(env)
945 # Add environ to command
946 command = environ + command
948 return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """ Formats environment variable assignments as 'export' statements,
    to be used either as part of an inline command
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
    as lines of a bash script (one 'export' statement per line).

    :param env: assignments separated by white spaces (e.g. "A=1 B=2").
        None, an empty string or a white spaces only string are all
        accepted and mean "no environment".
    :type env: str

    :param inline: if True the statements are separated with ';',
        otherwise with a new line character
    :type inline: bool

    :return: every assignment prefixed with " export " and followed by
        the separator, or "" if there is nothing to export
    :rtype: str
    """
    if not env:
        return ""

    # split() without arguments discards leading and trailing white
    # spaces and treats any run of white spaces as a single separator
    assignments = env.split()

    # A white spaces only string would otherwise produce a bare 'export'
    # statement, which makes bash print all the exported variables
    if not assignments:
        return ""

    sep = ";" if inline else "\n"
    statements = [" export %s" % assignment for assignment in assignments]
    return sep.join(statements) + sep
964 def check_errors(self, home,
965 ecodefile = "exitcode",
967 """ Checks whether errors occurred while running a command.
968 It first checks the exit code for the command, and only if the
969 exit code is an error one it returns the error output.
975 # get exit code saved in the 'exitcode' file
976 ecode = self.exitcode(home, ecodefile)
978 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
979 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
980 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
981 # The process returned an error code or didn't exist.
982 # Check standard error.
983 (err, eerr), proc = self.check_output(home, stderr)
985 # If the stderr file was not found, assume nothing bad happened,
986 # and just ignore the error.
987 # (cat returns 1 for error "No such file or directory")
988 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
991 return ("", err), proc
993 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
994 """ Waits until the pid file for the command is generated,
995 and returns the pid and ppid of the process """
1000 pidtuple = self.getpid(home = home, pidfile = pidfile)
1003 pid, ppid = pidtuple
1009 msg = " Failed to get pid for pidfile %s/%s " % (
1014 raise RuntimeError, msg
1018 def wait_run(self, pid, ppid, trial = 0):
1019 """ wait for a remote process to finish execution """
1023 status = self.status(pid, ppid)
1025 if status is ProcStatus.FINISHED:
1027 elif status is not ProcStatus.RUNNING:
1030 # If it takes more than 20 seconds to start, then
1031 # assume something went wrong
1035 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of a file in the host by running 'cat'

    :param home: directory that contains the file
    :type home: str

    :param filename: name of the file, relative to 'home'
    :type filename: str

    :return: whatever 'execute' returns for the 'cat' command, unpacked
        and returned as ((out, err), proc)
    """
    # NOTE(review): the path is interpolated unquoted into the command,
    # so paths containing white spaces or shell metacharacters will not
    # be read correctly.
    path = os.path.join(home, filename)
    (out, err), proc = self.execute("cat %s" % path,
            retry = 1, with_lock = True)
    return (out, err), proc
1045 """ Checks if host is responsive
1051 msg = "Unresponsive host. Wrong answer. "
1053 # The underlying SSH layer will sometimes return an empty
1054 # output (even if the command was executed without errors).
1055 # To work around this, repeat the operation N times or
1056 # until the result is not empty string
1058 (out, err), proc = self.execute("echo 'ALIVE'",
1062 if out.find("ALIVE") > -1:
1065 trace = traceback.format_exc()
1066 msg = "Unresponsive host. Error reaching host: %s " % trace
1068 self.error(msg, out, err)
1071 def find_home(self):
1072 """ Retrieves host home directory
1074 # The underlying SSH layer will sometimes return an empty
1075 # output (even if the command was executed without errors).
1076 # To work around this, repeat the operation N times or
1077 # until the result is not empty string
1078 msg = "Impossible to retrieve HOME directory"
1080 (out, err), proc = self.execute("echo ${HOME}",
1084 if out.strip() != "":
1085 self._home_dir = out.strip()
1087 trace = traceback.format_exc()
1088 msg = "Impossible to retrieve HOME directory %s" % trace
1090 if not self._home_dir:
1092 raise RuntimeError, msg
1094 def filter_existing_files(self, src, dst):
1095 """ Removes files that already exist in the Linux host from src list
1097 # construct a dictionary with { dst: src }
1098 dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
1099 if len(src) > 1 else dict({dst: src[0]})
1102 for d in dests.keys():
1103 command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1105 command = ";".join(command)
1107 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1109 for d in dests.keys():
1110 if out.find(d) > -1:
1116 return dests.values()