2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy,
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!!
40 Error codes that the rexitcode function can return if unable to
41 check the exit code of a spawned process
50 Supported flavors of Linux OS
55 FEDORA_8 = 1 << 3 | FEDORA
56 FEDORA_12 = 1 << 4 | FEDORA
57 FEDORA_14 = 1 << 5 | FEDORA
60 class LinuxNode(ResourceManager):
62 .. class:: Class Args :
64 :param ec: The Experiment controller
65 :type ec: ExperimentController
66 :param guid: guid of the RM
71 There are different ways in which commands can be executed using the
72 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
77 * 'execute' (blocking mode) :
79 HOW IT WORKS: 'execute', forks a process and run the
80 command, synchronously, attached to the terminal, in
82 The execute method will block until the command returns
83 the result on 'out', 'err' (so until it finishes executing).
85 USAGE: short-lived commands that must be executed attached
86 to a terminal and in foreground, for which it IS necessary
87 to block until the command has finished (e.g. if you want
88 to run 'ls' or 'cat').
90 * 'execute' (NON blocking mode - blocking = False) :
92 HOW IT WORKS: Same as before, except that execute method
93 will return immediately (even if command still running).
95 USAGE: long-lived commands that must be executed attached
96 to a terminal and in foreground, but for which it is not
97 necessary to block until the command has finished. (e.g.
98 start an application using X11 forwarding)
102 HOW IT WORKS: Connects to the host ( using SSH if remote)
103 and launches the command in background, detached from any
104 terminal (daemonized), and returns. The command continues to
105 run remotely, but since it is detached from the terminal,
106 its pipes (stdin, stdout, stderr) can't be redirected to the
107 console (as normal non detached processes would), and so they
108 are explicitly redirected to files. The pidfile is created as
109 part of the process of launching the command. The pidfile
110 holds the pid and ppid of the process forked in background,
111 so later on it is possible to check whether the command is still
114 USAGE: long-lived commands that can run detached in background,
115 for which it is NOT necessary to block (wait) until the command
116 has finished. (e.g. start an application that is not using X11
117 forwarding. It can run detached and remotely in background)
121 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122 the command has finished execution. It also checks whether
123 errors occurred during runtime by reading the exitcode file,
124 which contains the exit code of the command that was run
125 (checking stderr only is not always reliable since many
126 commands throw debugging info to stderr and the only way to
127 automatically know whether an error really happened is to
128 check the process exit code).
130 Another difference with respect to 'run', is that instead
131 of directly executing the command as a bash command line,
132 it uploads the command to a bash script and runs the script.
133 This allows to use the bash script to debug errors, since
134 it remains at the remote host and can be run manually to
137 USAGE: medium-lived commands that can run detached in
138 background, for which it IS necessary to block (wait) until
139 the command has finished. (e.g. Package installation,
140 source compilation, file download, etc)
143 _rtype = "linux::Node"
144 _help = "Controls Linux host machines ( either localhost or a host " \
145 "that can be accessed using a SSH key)"
149 def _register_attributes(cls):
150 cls._register_attribute(Attribute(
151 "hostname", "Hostname of the machine",
152 flags = Flags.Design))
154 cls._register_attribute(Attribute(
155 "username", "Local account username",
156 flags = Flags.Credential))
158 cls._register_attribute(Attribute(
160 flags = Flags.Design))
162 cls._register_attribute(Attribute(
164 "Experiment home directory to store all experiment related files",
165 flags = Flags.Design))
167 cls._register_attribute(Attribute(
168 "identity", "SSH identity file",
169 flags = Flags.Credential))
171 cls._register_attribute(Attribute(
172 "serverKey", "Server public key",
173 flags = Flags.Design))
175 cls._register_attribute(Attribute(
177 "Remove all nepi files and directories "
178 " from node home folder before starting experiment",
181 flags = Flags.Design))
183 cls._register_attribute(Attribute(
184 "cleanExperiment", "Remove all files and directories "
185 " from a previous same experiment, before the new experiment starts",
188 flags = Flags.Design))
190 cls._register_attribute(Attribute(
192 "Kill all running processes before starting experiment",
195 flags = Flags.Design))
197 cls._register_attribute(Attribute(
198 "cleanProcessesAfter",
199 """Kill all running processes after starting experiment
200 This might be dangerous when using user root""",
203 flags = Flags.Design))
205 cls._register_attribute(Attribute(
207 "Bash script to be executed before releasing the resource",
208 flags = Flags.Design))
210 cls._register_attribute(Attribute(
212 "Gateway account username",
213 flags = Flags.Design))
215 cls._register_attribute(Attribute(
217 "Hostname of the gateway machine",
218 flags = Flags.Design))
220 cls._register_attribute(Attribute(
222 "Linux host public IP address. "
223 "Must not be modified by the user unless hostname is 'localhost'",
224 flags = Flags.Design))
226 def __init__(self, ec, guid):
227 super(LinuxNode, self).__init__(ec, guid)
229 # home directory at Linux host
232 # lock to prevent concurrent applications on the same node,
233 # to execute commands at the same time. There are potential
234 # concurrency issues when using SSH to a same host from
235 # multiple threads. There are also possible operational
236 # issues, e.g. an application querying the existence
237 # of a file or folder prior to its creation, and another
238 # application creating the same file or folder in between.
239 self._node_lock = threading.Lock()
def log_message(self, msg):
    """Decorate a log entry with this resource's guid and hostname.

    :param msg: text of the log entry
    :returns: ``msg`` prefixed with the guid and the value of the
        "hostname" attribute
    """
    template = " guid {} - host {} - {} "
    guid = self.guid
    hostname = self.get("hostname")
    return template.format(guid, hostname, msg)
247 home = self.get("home") or ""
248 if not home.startswith("/"):
249 home = os.path.join(self._home_dir, home)
254 return os.path.join(self.home_dir, ".nepi")
258 return os.path.join(self.nepi_home, "nepi-usr")
262 return os.path.join(self.usr_dir, "lib")
266 return os.path.join(self.usr_dir, "bin")
270 return os.path.join(self.usr_dir, "src")
274 return os.path.join(self.usr_dir, "share")
278 return os.path.join(self.nepi_home, "nepi-exp")
282 return os.path.join(self.exp_dir, self.ec.exp_id)
286 return os.path.join(self.exp_home, "node-{}".format(self.guid))
290 return os.path.join(self.node_home, self.ec.run_id)
297 if not self.localhost and not self.get("username"):
298 msg = "Can't resolve OS, insufficient data "
300 raise RuntimeError(msg)
304 if out.find("Debian") == 0:
305 self._os = OSType.DEBIAN
306 elif out.find("Ubuntu") ==0:
307 self._os = OSType.UBUNTU
308 elif out.find("Fedora release") == 0:
309 self._os = OSType.FEDORA
310 if out.find("Fedora release 8") == 0:
311 self._os = OSType.FEDORA_8
312 elif out.find("Fedora release 12") == 0:
313 self._os = OSType.FEDORA_12
314 elif out.find("Fedora release 14") == 0:
315 self._os = OSType.FEDORA_14
317 msg = "Unsupported OS"
319 raise RuntimeError("{} - {} ".format(msg, out))
324 # The underlying SSH layer will sometimes return an empty
325 # output (even if the command was executed without errors).
326 # To work arround this, repeat the operation N times or
327 # until the result is not empty string
330 (out, err), proc = self.execute("cat /etc/issue",
334 trace = traceback.format_exc()
335 msg = "Error detecting OS: {} ".format(trace)
336 self.error(msg, out, err)
342 return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
346 return (self.os & OSType.FEDORA)
350 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
352 def do_provision(self):
353 # check if host is alive
354 if not self.is_alive():
355 trace = traceback.format_exc()
356 msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
358 raise RuntimeError(msg)
362 if self.get("cleanProcesses"):
363 self.clean_processes()
365 if self.get("cleanHome"):
368 if self.get("cleanExperiment"):
369 self.clean_experiment()
371 # Create shared directory structure and node home directory
372 paths = [self.lib_dir,
380 # Get Public IP address if possible
381 if not self.get("ip"):
383 ip = sshfuncs.gethostbyname(self.get("hostname"))
386 if self.get("gateway") is None:
387 msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
390 super(LinuxNode, self).do_provision()
393 if self.state == ResourceState.NEW:
394 self.info("Deploying node")
398 # Node needs to wait until all associated interfaces are
399 # ready before it can finalize deployment
400 from nepi.resources.linux.interface import LinuxInterface
401 ifaces = self.get_connected(LinuxInterface.get_rtype())
403 if iface.state < ResourceState.READY:
404 self.ec.schedule(self.reschedule_delay, self.deploy)
407 super(LinuxNode, self).do_deploy()
409 def do_release(self):
410 rms = self.get_connected()
412 # Node needs to wait until all associated RMs are released
413 # before it can be released
414 if rm.state != ResourceState.RELEASED:
415 self.ec.schedule(self.reschedule_delay, self.release)
418 tear_down = self.get("tearDown")
420 self.execute(tear_down)
422 if self.get("cleanProcessesAfter"):
423 self.clean_processes()
425 super(LinuxNode, self).do_release()
427 def valid_connection(self, guid):
431 def clean_processes(self):
432 self.info("Cleaning up processes")
437 if self.get("username") != 'root':
438 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
439 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
440 "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
442 if self.state >= ResourceState.READY:
443 ########################
444 #Collect all process (must change for a more intelligent way)
447 avoid_pids = "ps axjf | awk '{print $1,$2}'"
448 (out, err), proc = self.execute(avoid_pids)
450 for line in out.strip().split("\n"):
451 parts = line.strip().split(" ")
452 ppid.append(parts[0])
453 pids.append(parts[1])
455 #Collect all process below ssh -D
458 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
459 (out, err), proc = self.execute(sshs)
461 for line in out.strip().split("\n"):
462 parts = line.strip().split(" ")
463 if parts[1].startswith('root@pts'):
464 ssh_pids.append(parts[0])
465 elif parts[1] == "-D":
466 tree_owner = parts[0]
470 #Search for the child process of the pid's collected at the first block.
471 for process in ssh_pids:
472 temp = self.search_for_child(process, pids, ppid)
473 avoid_kill = list(set(temp))
475 if len(avoid_kill) > 0:
476 avoid_kill.append(tree_owner)
477 ########################
480 with open("/tmp/save.proc", "rb") as pickle_file:
481 pids = pickle.load(pickle_file)
483 ps_aux = "ps aux | awk '{print $2,$11}'"
484 (out, err), proc = self.execute(ps_aux)
486 for line in out.strip().split("\n"):
487 parts = line.strip().split(" ")
488 pids_temp[parts[0]] = parts[1]
489 # creates the difference between the machine pids freezed (pickle) and the actual
490 # adding the avoided pids filtered above (avoid_kill) to allow users keep process
491 # alive when using besides ssh connections
492 kill_pids = set(pids_temp.items()) - set(pids.items())
493 # py2/py3 : keep it simple
494 kill_pids = ' '.join(kill_pids)
496 # removing pids from beside connections and its process
497 kill_pids = kill_pids.split(' ')
498 kill_pids = list(set(kill_pids) - set(avoid_kill))
499 kill_pids = ' '.join(kill_pids)
501 cmd = ("killall tcpdump || /bin/true ; " +
502 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
503 "kill {} || /bin/true ; ".format(kill_pids))
505 cmd = ("killall tcpdump || /bin/true ; " +
506 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
508 cmd = ("killall tcpdump || /bin/true ; " +
509 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
511 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
513 def search_for_child(self, pid, pids, ppid, family=[]):
514 """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
517 for key, value in enumerate(ppid):
520 self.search_for_child(child, pids, ppid)
def clean_home(self):
    """Remove the ``.nepi`` folder from the node home directory.

    Logs the action, then runs a ``find`` from ``self.home_dir`` that
    deletes the top-level entry named ``.nepi``. The command is executed
    with ``with_lock=True``.

    :returns: the result of ``self.execute`` for the removal command
    """
    self.info("Cleaning up home")

    # Raw string: the backslash before ".nepi" has to reach the shell
    # verbatim, and "\." is not a valid escape sequence in a plain literal.
    # "{{}}" is the str.format escape for find's own "{}" placeholder.
    template = r"cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "
    cmd = template.format(self.home_dir)

    return self.execute(cmd, with_lock=True)
def clean_experiment(self):
    """Delete the files a previous run of this experiment left on the host.

    Only the entry named after ``self.ec.exp_id`` directly under
    ``self.exp_dir`` is removed; nothing else in that directory is
    touched by the command.

    :returns: the result of ``self.execute`` for the removal command
    """
    self.info("Cleaning up experiment files")

    # "{{}}" is the str.format escape for find's own "{}" placeholder
    template = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "
    removal = template.format(self.exp_dir, self.ec.exp_id)

    return self.execute(removal, with_lock=True)
545 def execute(self, command,
551 connect_timeout = 30,
552 strict_host_checking = False,
557 """ Notice that this invocation will block until the
558 execution finishes. If this is not the desired behavior,
559 use 'run' instead."""
562 (out, err), proc = execfuncs.lexec(
564 user = self.get("username"), # still problem with localhost
569 # If the execute command is blocking, we don't want to keep
570 # the node lock. This lock is used to avoid race conditions
571 # when creating the ControlMaster sockets. A more elegant
572 # solution is needed.
573 with self._node_lock:
574 (out, err), proc = sshfuncs.rexec(
576 host = self.get("hostname"),
577 user = self.get("username"),
578 port = self.get("port"),
579 gwuser = self.get("gatewayUser"),
580 gw = self.get("gateway"),
583 identity = self.get("identity"),
584 server_key = self.get("serverKey"),
587 forward_x11 = forward_x11,
589 connect_timeout = connect_timeout,
590 persistent = persistent,
592 strict_host_checking = strict_host_checking
595 (out, err), proc = sshfuncs.rexec(
597 host = self.get("hostname"),
598 user = self.get("username"),
599 port = self.get("port"),
600 gwuser = self.get("gatewayUser"),
601 gw = self.get("gateway"),
604 identity = self.get("identity"),
605 server_key = self.get("serverKey"),
608 forward_x11 = forward_x11,
610 connect_timeout = connect_timeout,
611 persistent = persistent,
613 strict_host_checking = strict_host_checking
616 return (out, err), proc
618 def run(self, command, home,
626 strict_host_checking = False):
628 self.debug("Running command '{}'".format(command))
631 (out, err), proc = execfuncs.lspawn(
634 create_home = create_home,
635 stdin = stdin or '/dev/null',
636 stdout = stdout or '/dev/null',
637 stderr = stderr or '/dev/null',
640 with self._node_lock:
641 (out, err), proc = sshfuncs.rspawn(
645 create_home = create_home,
646 stdin = stdin or '/dev/null',
647 stdout = stdout or '/dev/null',
648 stderr = stderr or '/dev/null',
650 host = self.get("hostname"),
651 user = self.get("username"),
652 port = self.get("port"),
653 gwuser = self.get("gatewayUser"),
654 gw = self.get("gateway"),
656 identity = self.get("identity"),
657 server_key = self.get("serverKey"),
659 strict_host_checking = strict_host_checking
662 return (out, err), proc
664 def getpid(self, home, pidfile = "pidfile"):
666 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
668 with self._node_lock:
669 pidtuple = sshfuncs.rgetpid(
670 os.path.join(home, pidfile),
671 host = self.get("hostname"),
672 user = self.get("username"),
673 port = self.get("port"),
674 gwuser = self.get("gatewayUser"),
675 gw = self.get("gateway"),
677 identity = self.get("identity"),
678 server_key = self.get("serverKey"),
679 strict_host_checking = False
684 def status(self, pid, ppid):
686 status = execfuncs.lstatus(pid, ppid)
688 with self._node_lock:
689 status = sshfuncs.rstatus(
691 host = self.get("hostname"),
692 user = self.get("username"),
693 port = self.get("port"),
694 gwuser = self.get("gatewayUser"),
695 gw = self.get("gateway"),
697 identity = self.get("identity"),
698 server_key = self.get("serverKey"),
699 strict_host_checking = False
704 def kill(self, pid, ppid, sudo = False):
707 status = self.status(pid, ppid)
709 if status == sshfuncs.ProcStatus.RUNNING:
711 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
713 with self._node_lock:
714 (out, err), proc = sshfuncs.rkill(
716 host = self.get("hostname"),
717 user = self.get("username"),
718 port = self.get("port"),
719 gwuser = self.get("gatewayUser"),
720 gw = self.get("gateway"),
723 identity = self.get("identity"),
724 server_key = self.get("serverKey"),
725 strict_host_checking = False
728 return (out, err), proc
730 def copy(self, src, dst):
732 (out, err), proc = execfuncs.lcopy(
736 with self._node_lock:
737 (out, err), proc = sshfuncs.rcopy(
739 port = self.get("port"),
740 gwuser = self.get("gatewayUser"),
741 gw = self.get("gateway"),
742 identity = self.get("identity"),
743 server_key = self.get("serverKey"),
745 strict_host_checking = False)
747 return (out, err), proc
749 def upload(self, src, dst, text = False, overwrite = True,
750 raise_on_error = True):
751 """ Copy content to destination
753 src string with the content to copy. Can be:
755 - a string with the path to a local file
756 - a string with a semi-colon separeted list of local files
757 - a string with a local directory
759 dst string with destination path on the remote host (remote is
762 text src is text input, it must be stored into a temp file before
765 # If source is a string input
767 if text and not os.path.isfile(src):
768 # src is text input that should be uploaded as file
769 # create a temporal file with the content to upload
770 # in python3 we need to open in binary mode if str is bytes
771 mode = 'w' if isinstance(src, str) else 'wb'
772 f = tempfile.NamedTemporaryFile(mode=mode, delete=False)
777 # If dst files should not be overwritten, check that the files do not
779 if isinstance(src, str):
780 src = [s.strip() for s in src.split(";")]
782 if overwrite == False:
783 src = self.filter_existing_files(src, dst)
785 return ("", ""), None
787 if not self.localhost:
788 # Build destination as <user>@<server>:<path>
789 dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
791 ((out, err), proc) = self.copy(src, dst)
798 msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
799 self.error(msg, out, err)
801 msg = "{} out: {} err: {}".format(msg, out, err)
803 raise RuntimeError(msg)
805 return ((out, err), proc)
def download(self, src, dst, raise_on_error=True):
    """Copy ``src`` from the Linux host to ``dst``.

    :param src: path on the Linux host; prefixed with ``<user>@<server>:``
        unless the node is localhost
    :param dst: destination path, passed to ``self.copy`` unchanged
    :param raise_on_error: when true, a failed copy raises instead of
        only being logged
    :returns: ``((out, err), proc)`` as returned by ``self.copy``
    :raises RuntimeError: if the copy fails and ``raise_on_error`` is true
    """
    if not self.localhost:
        # Build source as <user>@<server>:<path>
        src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)

    (out, err), proc = self.copy(src, dst)

    # A non-zero exit status from the copy process means the transfer failed.
    # NOTE(review): guard restored as proc.poll(), the way other methods in
    # this file inspect proc -- confirm against the pristine source.
    if proc.poll():
        # src is a single string here, so it is formatted as-is; joining it
        # with ";" would put a separator between every character.
        msg = " Failed to download files - src: {} dst: {}".format(src, dst)
        self.error(msg, out, err)

        if raise_on_error:
            raise RuntimeError(msg)

    return (out, err), proc
823 def install_packages_command(self, packages):
826 command = rpmfuncs.install_packages_command(self.os, packages)
828 command = debfuncs.install_packages_command(self.os, packages)
830 msg = "Error installing packages ( OS not known ) "
831 self.error(msg, self.os)
832 raise RuntimeError(msg)
836 def install_packages(self, packages, home,
838 raise_on_error = True):
839 """ Install packages in the Linux host.
841 'home' is the directory to upload the package installation script.
842 'run_home' is the directory from where to execute the script.
844 command = self.install_packages_command(packages)
846 run_home = run_home or home
848 (out, err), proc = self.run_and_wait(command, run_home,
849 shfile = os.path.join(home, "instpkg.sh"),
850 pidfile = "instpkg_pidfile",
851 ecodefile = "instpkg_exitcode",
852 stdout = "instpkg_stdout",
853 stderr = "instpkg_stderr",
855 raise_on_error = raise_on_error)
857 return (out, err), proc
859 def remove_packages(self, packages, home, run_home = None,
860 raise_on_error = True):
861 """ Uninstall packages from the Linux host.
863 'home' is the directory to upload the package un-installation script.
864 'run_home' is the directory from where to execute the script.
867 command = rpmfuncs.remove_packages_command(self.os, packages)
869 command = debfuncs.remove_packages_command(self.os, packages)
871 msg = "Error removing packages ( OS not known ) "
873 raise RuntimeError(msg)
875 run_home = run_home or home
877 (out, err), proc = self.run_and_wait(command, run_home,
878 shfile = os.path.join(home, "rmpkg.sh"),
879 pidfile = "rmpkg_pidfile",
880 ecodefile = "rmpkg_exitcode",
881 stdout = "rmpkg_stdout",
882 stderr = "rmpkg_stderr",
884 raise_on_error = raise_on_error)
886 return (out, err), proc
def mkdir(self, paths, clean=False):
    """Create one or more directories on the host with ``mkdir -p``.

    :param paths: a single directory path, or a list of directory paths
    :param clean: when true, the same paths are first passed to
        ``self.rmdir`` so the directories are recreated empty
    :returns: the result of ``self.execute`` for the ``mkdir`` command
    """
    if clean:
        self.rmdir(paths)

    # A bare string is one path, not a sequence of one-character paths
    if isinstance(paths, str):
        paths = [paths]

    cmd = " ; ".join("mkdir -p {}".format(path) for path in paths)

    return self.execute(cmd, with_lock=True)
def rmdir(self, paths):
    """Delete one or more directories on the host with ``rm -rf``.

    :param paths: a single directory path, or a list of directory paths
    :returns: the result of ``self.execute`` for the ``rm`` command
    """
    # A bare string is one path; iterating it character by character would
    # hand single-character paths (such as "/") to "rm -rf".
    if isinstance(paths, str):
        paths = [paths]

    cmd = " ; ".join("rm -rf {}".format(path) for path in paths)

    return self.execute(cmd, with_lock=True)
914 def run_and_wait(self, command, home,
920 ecodefile="exitcode",
926 raise_on_error=True):
928 Uploads the 'command' to a bash script in the host.
929 Then runs the script detached in background in the host, and
930 busy-waites until the script finishes executing.
933 if not shfile.startswith("/"):
934 shfile = os.path.join(home, shfile)
936 self.upload_command(command,
938 ecodefile = ecodefile,
940 overwrite = overwrite)
942 command = "bash {}".format(shfile)
943 # run command in background in remote host
944 (out, err), proc = self.run(command, home,
952 # check no errors occurred
954 msg = " Failed to run command '{}' ".format(command)
955 self.error(msg, out, err)
957 raise RuntimeError(msg)
959 # Wait for pid file to be generated
960 pid, ppid = self.wait_pid(
963 raise_on_error = raise_on_error)
966 # wait until command finishes to execute
967 self.wait_run(pid, ppid)
969 (eout, err), proc = self.check_errors(home,
970 ecodefile = ecodefile,
973 # Out is what was written in the stderr file
975 msg = " Failed to run command '{}' ".format(command)
976 self.error(msg, eout, err)
979 raise RuntimeError(msg)
981 (out, oerr), proc = self.check_output(home, stdout)
983 return (out, err), proc
985 def exitcode(self, home, ecodefile = "exitcode"):
987 Get the exit code of an application.
988 Returns an integer value with the exit code
990 (out, err), proc = self.check_output(home, ecodefile)
992 # Succeeded to open file, return exit code in the file
995 return int(out.strip())
997 # Error in the content of the file!
998 return ExitCode.CORRUPTFILE
1000 # No such file or directory
1001 if proc.returncode == 1:
1002 return ExitCode.FILENOTFOUND
1004 # Other error from 'cat'
1005 return ExitCode.ERROR
1007 def upload_command(self, command,
1009 ecodefile="exitcode",
1012 """ Saves the command as a bash script file in the remote host, and
1013 forces to save the exit code of the command execution to the ecodefile
1016 if not (command.strip().endswith(";") or command.strip().endswith("&")):
1019 # The exit code of the command will be stored in ecodefile
1020 command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1021 .format(command=command, ecodefile=ecodefile)
1023 # Export environment
1024 environ = self.format_environment(env)
1026 # Add environ to command
1027 command = environ + command
1029 return self.upload(command, shfile, text=True, overwrite=overwrite)
def format_environment(self, env, inline=False):
    """Render environment assignments as ``export`` statements.

    The result is meant to be prepended to a command, either inline
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
    as the head of a bash script (i.e. export PYTHONPATH=src/.. \\n
    export LALA=.. \\n).

    :param env: whitespace separated ``NAME=value`` assignments; a false
        or blank value means no environment
    :param inline: separate statements with ";" instead of a newline
    :returns: one `` export <assignment>`` per assignment, each followed
        by the separator, or "" when there is nothing to export
    """
    # split() without arguments collapses runs of whitespace and yields
    # nothing for a blank string, so a bare "export" (which would make bash
    # print every exported variable) is never emitted.
    assignments = env.split() if env else []
    if not assignments:
        return ""

    sep = ";" if inline else "\n"
    return sep.join(" export {}".format(item) for item in assignments) + sep
1045 def check_errors(self, home,
1046 ecodefile = "exitcode",
1048 """ Checks whether errors occurred while running a command.
1049 It first checks the exit code for the command, and only if the
1050 exit code is an error one it returns the error output.
1056 # get exit code saved in the 'exitcode' file
1057 ecode = self.exitcode(home, ecodefile)
1059 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1060 err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1061 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1062 # The process returned an error code or didn't exist.
1063 # Check standard error.
1064 (err, eerr), proc = self.check_output(home, stderr)
1066 # If the stderr file was not found, assume nothing bad happened,
1067 # and just ignore the error.
1068 # (cat returns 1 for error "No such file or directory")
1069 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1072 return ("", err), proc
1074 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1075 """ Waits until the pid file for the command is generated,
1076 and returns the pid and ppid of the process """
1081 pidtuple = self.getpid(home = home, pidfile = pidfile)
1084 pid, ppid = pidtuple
1090 msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1094 raise RuntimeError(msg)
1098 def wait_run(self, pid, ppid, trial = 0):
1099 """ wait for a remote process to finish execution """
1103 status = self.status(pid, ppid)
1105 if status is ProcStatus.FINISHED:
1107 elif status is not ProcStatus.RUNNING:
1110 # If it takes more than 20 seconds to start, then
1111 # asume something went wrong
1115 # The app is running, just wait...
def check_output(self, home, filename):
    """Retrieve the content of a file by running ``cat`` on it.

    :param home: directory that contains the file
    :param filename: name of the file, joined onto ``home``
    :returns: ``((out, err), proc)`` as returned by ``self.execute``
    """
    target = os.path.join(home, filename)
    result = self.execute("cat {}".format(target), retry=1, with_lock=True)
    (out, err), proc = result
    return (out, err), proc
1125 """ Checks if host is responsive
1131 msg = "Unresponsive host. Wrong answer. "
1133 # The underlying SSH layer will sometimes return an empty
1134 # output (even if the command was executed without errors).
1135 # To work arround this, repeat the operation N times or
1136 # until the result is not empty string
1138 (out, err), proc = self.execute("echo 'ALIVE'",
1142 if out.find("ALIVE") > -1:
1145 trace = traceback.format_exc()
1146 msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1148 self.error(msg, out, err)
1151 def find_home(self):
1153 Retrieves host home directory
1155 # The underlying SSH layer will sometimes return an empty
1156 # output (even if the command was executed without errors).
1157 # To work arround this, repeat the operation N times or
1158 # until the result is not empty string
1159 msg = "Impossible to retrieve HOME directory"
1161 (out, err), proc = self.execute("echo ${HOME}",
1165 if out.strip() != "":
1166 self._home_dir = out.strip()
1168 trace = traceback.format_exc()
1169 msg = "Impossible to retrieve HOME directory {}".format(trace)
1171 if not self._home_dir:
1173 raise RuntimeError(msg)
1175 def filter_existing_files(self, src, dst):
1176 """ Removes files that already exist in the Linux host from src list
1178 # construct a dictionary with { dst: src }
1179 dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1180 if len(src) > 1 else {dst: src[0]}
1184 command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1186 command = ";".join(command)
1188 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1191 if out.find(d) > -1:
1197 return dests.values()