2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy,
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!!
42 Error codes that the rexitcode function can return if unable to
43 check the exit code of a spawned process
52 Supported flavors of Linux OS
57 FEDORA_8 = 1 << 3 | FEDORA
58 FEDORA_12 = 1 << 4 | FEDORA
59 FEDORA_14 = 1 << 5 | FEDORA
62 class LinuxNode(ResourceManager):
64 .. class:: Class Args :
66 :param ec: The Experiment controller
67 :type ec: ExperimentController
68 :param guid: guid of the RM
73 There are different ways in which commands can be executed using the
74 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
79 * 'execute' (blocking mode) :
81 HOW IT WORKS: 'execute', forks a process and run the
82 command, synchronously, attached to the terminal, in
84 The execute method will block until the command returns
85 the result on 'out', 'err' (so until it finishes executing).
87 USAGE: short-lived commands that must be executed attached
88 to a terminal and in foreground, for which it IS necessary
89 to block until the command has finished (e.g. if you want
90 to run 'ls' or 'cat').
92 * 'execute' (NON blocking mode - blocking = False) :
94 HOW IT WORKS: Same as before, except that execute method
95 will return immediately (even if command still running).
97 USAGE: long-lived commands that must be executed attached
98 to a terminal and in foreground, but for which it is not
99 necessary to block until the command has finished. (e.g.
100 start an application using X11 forwarding)
104 HOW IT WORKS: Connects to the host ( using SSH if remote)
105 and launches the command in background, detached from any
106 terminal (daemonized), and returns. The command continues to
107 run remotely, but since it is detached from the terminal,
108 its pipes (stdin, stdout, stderr) can't be redirected to the
109 console (as normal non detached processes would), and so they
110 are explicitly redirected to files. The pidfile is created as
111 part of the process of launching the command. The pidfile
112 holds the pid and ppid of the process forked in background,
113 so later on it is possible to check whether the command is still
116 USAGE: long-lived commands that can run detached in background,
117 for which it is NOT necessary to block (wait) until the command
118 has finished. (e.g. start an application that is not using X11
119 forwarding. It can run detached and remotely in background)
123 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124 the command has finished execution. It also checks whether
125 errors occurred during runtime by reading the exitcode file,
126 which contains the exit code of the command that was run
127 (checking stderr only is not always reliable since many
128 commands throw debugging info to stderr and the only way to
129 automatically know whether an error really happened is to
130 check the process exit code).
132 Another difference with respect to 'run', is that instead
133 of directly executing the command as a bash command line,
134 it uploads the command to a bash script and runs the script.
135 This allows to use the bash script to debug errors, since
136 it remains at the remote host and can be run manually to
139 USAGE: medium-lived commands that can run detached in
140 background, for which it IS necessary to block (wait) until
141 the command has finished. (e.g. Package installation,
142 source compilation, file download, etc)
145 _rtype = "linux::Node"
146 _help = "Controls Linux host machines ( either localhost or a host " \
147 "that can be accessed using a SSH key)"
151 def _register_attributes(cls):
152 cls._register_attribute(
153 Attribute("hostname",
154 "Hostname of the machine",
155 flags = Flags.Design))
156 cls._register_attribute(
157 Attribute("username",
158 "Local account username",
159 flags = Flags.Credential))
160 cls._register_attribute(
163 flags = Flags.Design))
164 cls._register_attribute(
166 "Experiment home directory to store all experiment related files",
167 flags = Flags.Design))
168 cls._register_attribute(
169 Attribute("identity",
171 flags = Flags.Credential))
172 cls._register_attribute(
173 Attribute("serverKey",
175 flags = Flags.Design))
176 cls._register_attribute(
177 Attribute("cleanHome",
178 "Remove all nepi files and directories "
179 " from node home folder before starting experiment",
182 flags = Flags.Design))
183 cls._register_attribute(
184 Attribute("cleanExperiment",
185 "Remove all files and directories "
186 " from a previous same experiment, before the new experiment starts",
189 flags = Flags.Design))
190 cls._register_attribute(
191 Attribute("cleanProcesses",
192 "Kill all running processes before starting experiment",
195 flags = Flags.Design))
196 cls._register_attribute(
197 Attribute("cleanProcessesAfter",
198 "Kill all running processes after starting experiment"
199 "NOTE: This might be dangerous when using user root",
202 flags = Flags.Design))
203 cls._register_attribute(
204 Attribute("tearDown",
205 "Bash script to be executed before releasing the resource",
206 flags = Flags.Design))
207 cls._register_attribute(
208 Attribute("gatewayUser",
209 "Gateway account username",
210 flags = Flags.Design))
211 cls._register_attribute(
213 "Hostname of the gateway machine",
214 flags = Flags.Design))
215 cls._register_attribute(
217 "Linux host public IP address. "
218 "Must not be modified by the user unless hostname is 'localhost'",
219 flags = Flags.Design))
221 def __init__(self, ec, guid):
222 super(LinuxNode, self).__init__(ec, guid)
224 # home directory at Linux host
227 # lock to prevent concurrent applications on the same node,
228 # to execute commands at the same time. There are potential
229 # concurrency issues when using SSH to a same host from
230 # multiple threads. There are also possible operational
231 # issues, e.g. an application querying the existence
232 # of a file or folder prior to its creation, and another
233 # application creating the same file or folder in between.
234 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Returns 'msg' decorated with the guid of this RM and the value
    of its 'hostname' attribute, so log lines can be traced back to
    the node that produced them.
    """
    guid = self.guid
    hostname = self.get("hostname")
    template = " guid {} - host {} - {} "
    return template.format(guid, hostname, msg)
242 home = self.get("home") or ""
243 if not home.startswith("/"):
244 home = os.path.join(self._home_dir, home)
249 return os.path.join(self.home_dir, ".nepi")
253 return os.path.join(self.nepi_home, "nepi-usr")
257 return os.path.join(self.usr_dir, "lib")
261 return os.path.join(self.usr_dir, "bin")
265 return os.path.join(self.usr_dir, "src")
269 return os.path.join(self.usr_dir, "share")
273 return os.path.join(self.nepi_home, "nepi-exp")
277 return os.path.join(self.exp_dir, self.ec.exp_id)
281 return os.path.join(self.exp_home, "node-{}".format(self.guid))
285 return os.path.join(self.node_home, self.ec.run_id)
292 if not self.localhost and not self.get("username"):
293 msg = "Can't resolve OS, insufficient data "
295 raise RuntimeError(msg)
299 if out.find("Debian") == 0:
300 self._os = OSType.DEBIAN
301 elif out.find("Ubuntu") ==0:
302 self._os = OSType.UBUNTU
303 elif out.find("Fedora release") == 0:
304 self._os = OSType.FEDORA
305 if out.find("Fedora release 8") == 0:
306 self._os = OSType.FEDORA_8
307 elif out.find("Fedora release 12") == 0:
308 self._os = OSType.FEDORA_12
309 elif out.find("Fedora release 14") == 0:
310 self._os = OSType.FEDORA_14
312 msg = "Unsupported OS"
314 raise RuntimeError("{} - {} ".format(msg, out))
319 # The underlying SSH layer will sometimes return an empty
320 # output (even if the command was executed without errors).
321 # To work around this, repeat the operation N times or
322 # until the result is not empty string
325 (out, err), proc = self.execute("cat /etc/issue",
329 trace = traceback.format_exc()
330 msg = "Error detecting OS: {} ".format(trace)
331 self.error(msg, out, err)
337 return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
341 return (self.os & OSType.FEDORA)
345 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
347 def do_provision(self):
348 # check if host is alive
349 if not self.is_alive():
350 trace = traceback.format_exc()
351 msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
353 raise RuntimeError(msg)
357 if self.get("cleanProcesses"):
358 self.clean_processes()
360 if self.get("cleanHome"):
363 if self.get("cleanExperiment"):
364 self.clean_experiment()
366 # Create shared directory structure and node home directory
367 paths = [self.lib_dir,
375 # Get Public IP address if possible
376 if not self.get("ip"):
378 ip = sshfuncs.gethostbyname(self.get("hostname"))
381 if self.get("gateway") is None:
382 msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
385 super(LinuxNode, self).do_provision()
388 if self.state == ResourceState.NEW:
389 self.info("Deploying node")
393 # Node needs to wait until all associated interfaces are
394 # ready before it can finalize deployment
395 from nepi.resources.linux.interface import LinuxInterface
396 ifaces = self.get_connected(LinuxInterface.get_rtype())
398 if iface.state < ResourceState.READY:
399 self.ec.schedule(self.reschedule_delay, self.deploy)
402 super(LinuxNode, self).do_deploy()
404 def do_release(self):
405 rms = self.get_connected()
407 # Node needs to wait until all associated RMs are released
408 # before it can be released
409 if rm.state != ResourceState.RELEASED:
410 self.ec.schedule(self.reschedule_delay, self.release)
413 tear_down = self.get("tearDown")
415 self.execute(tear_down)
417 if self.get("cleanProcessesAfter"):
418 self.clean_processes()
420 super(LinuxNode, self).do_release()
422 def valid_connection(self, guid):
426 def clean_processes(self):
427 self.info("Cleaning up processes")
432 if self.get("username") != 'root':
433 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
434 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
435 "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
437 if self.state >= ResourceState.READY:
438 ########################
439 #Collect all process (must change for a more intelligent way)
442 avoid_pids = "ps axjf | awk '{print $1,$2}'"
443 (out, err), proc = self.execute(avoid_pids)
445 for line in out.strip().split("\n"):
446 parts = line.strip().split(" ")
447 ppid.append(parts[0])
448 pids.append(parts[1])
450 #Collect all process below ssh -D
453 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
454 (out, err), proc = self.execute(sshs)
456 for line in out.strip().split("\n"):
457 parts = line.strip().split(" ")
458 if parts[1].startswith('root@pts'):
459 ssh_pids.append(parts[0])
460 elif parts[1] == "-D":
461 tree_owner = parts[0]
465 #Search for the child process of the pid's collected at the first block.
466 for process in ssh_pids:
467 temp = self.search_for_child(process, pids, ppid)
468 avoid_kill = list(set(temp))
470 if len(avoid_kill) > 0:
471 avoid_kill.append(tree_owner)
472 ########################
475 with open("/tmp/save.proc", "rb") as pickle_file:
476 pids = pickle.load(pickle_file)
478 ps_aux = "ps aux | awk '{print $2,$11}'"
479 (out, err), proc = self.execute(ps_aux)
481 for line in out.strip().split("\n"):
482 parts = line.strip().split(" ")
483 pids_temp[parts[0]] = parts[1]
484 # creates the difference between the machine pids freezed (pickle) and the actual
485 # adding the avoided pids filtered above (avoid_kill) to allow users keep process
486 # alive when using besides ssh connections
487 kill_pids = set(pids_temp.items()) - set(pids.items())
488 # py2/py3 : keep it simple
489 kill_pids = ' '.join(kill_pids)
491 # removing pids from beside connections and its process
492 kill_pids = kill_pids.split(' ')
493 kill_pids = list(set(kill_pids) - set(avoid_kill))
494 kill_pids = ' '.join(kill_pids)
496 cmd = ("killall tcpdump || /bin/true ; " +
497 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
498 "kill {} || /bin/true ; ".format(kill_pids))
500 cmd = ("killall tcpdump || /bin/true ; " +
501 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
503 cmd = ("killall tcpdump || /bin/true ; " +
504 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
506 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
508 def search_for_child(self, pid, pids, ppid, family=[]):
509 """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
512 for key, value in enumerate(ppid):
515 self.search_for_child(child, pids, ppid)
def clean_home(self):
    """ Cleans all NEPI related folders in the Linux host

    Logs an info message, then executes (holding the node lock) a
    shell command that changes into self.home_dir and removes the
    '.nepi' entry found directly under it.

    Returns whatever self.execute returns for that command.
    """
    self.info("Cleaning up home")

    # The backslash has to reach the shell literally. It is written as
    # an explicit '\\' because '\.' is not a valid Python escape
    # sequence and triggers a warning on recent interpreters.
    cmd = "cd {} ; find . -maxdepth 1 -name \\.nepi -execdir rm -rf {{}} + "\
        .format(self.home_dir)

    return self.execute(cmd, with_lock = True)
def clean_experiment(self):
    """ Removes what a previous run of this same experiment left in
    the Linux host.

    Only the entry named after the experiment id (self.ec.exp_id),
    located directly under self.exp_dir, is deleted; nothing else
    under that directory is touched by this command.

    Returns whatever self.execute returns for the clean-up command.
    """
    self.info("Cleaning up experiment files")

    template = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "
    command = template.format(self.exp_dir, self.ec.exp_id)

    return self.execute(command, with_lock = True)
540 def execute(self, command,
546 connect_timeout = 30,
547 strict_host_checking = False,
552 """ Notice that this invocation will block until the
553 execution finishes. If this is not the desired behavior,
554 use 'run' instead."""
557 (out, err), proc = execfuncs.lexec(
559 user = self.get("username"), # still problem with localhost
564 # If the execute command is blocking, we don't want to keep
565 # the node lock. This lock is used to avoid race conditions
566 # when creating the ControlMaster sockets. A more elegant
567 # solution is needed.
568 with self._node_lock:
569 (out, err), proc = sshfuncs.rexec(
571 host = self.get("hostname"),
572 user = self.get("username"),
573 port = self.get("port"),
574 gwuser = self.get("gatewayUser"),
575 gw = self.get("gateway"),
578 identity = self.get("identity"),
579 server_key = self.get("serverKey"),
582 forward_x11 = forward_x11,
584 connect_timeout = connect_timeout,
585 persistent = persistent,
587 strict_host_checking = strict_host_checking
590 (out, err), proc = sshfuncs.rexec(
592 host = self.get("hostname"),
593 user = self.get("username"),
594 port = self.get("port"),
595 gwuser = self.get("gatewayUser"),
596 gw = self.get("gateway"),
599 identity = self.get("identity"),
600 server_key = self.get("serverKey"),
603 forward_x11 = forward_x11,
605 connect_timeout = connect_timeout,
606 persistent = persistent,
608 strict_host_checking = strict_host_checking
611 return (out, err), proc
613 def run(self, command, home,
621 strict_host_checking = False):
623 self.debug("Running command '{}'".format(command))
626 (out, err), proc = execfuncs.lspawn(
629 create_home = create_home,
630 stdin = stdin or '/dev/null',
631 stdout = stdout or '/dev/null',
632 stderr = stderr or '/dev/null',
635 with self._node_lock:
636 (out, err), proc = sshfuncs.rspawn(
640 create_home = create_home,
641 stdin = stdin or '/dev/null',
642 stdout = stdout or '/dev/null',
643 stderr = stderr or '/dev/null',
645 host = self.get("hostname"),
646 user = self.get("username"),
647 port = self.get("port"),
648 gwuser = self.get("gatewayUser"),
649 gw = self.get("gateway"),
651 identity = self.get("identity"),
652 server_key = self.get("serverKey"),
654 strict_host_checking = strict_host_checking
657 return (out, err), proc
659 def getpid(self, home, pidfile = "pidfile"):
661 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
663 with self._node_lock:
664 pidtuple = sshfuncs.rgetpid(
665 os.path.join(home, pidfile),
666 host = self.get("hostname"),
667 user = self.get("username"),
668 port = self.get("port"),
669 gwuser = self.get("gatewayUser"),
670 gw = self.get("gateway"),
672 identity = self.get("identity"),
673 server_key = self.get("serverKey"),
674 strict_host_checking = False
679 def status(self, pid, ppid):
681 status = execfuncs.lstatus(pid, ppid)
683 with self._node_lock:
684 status = sshfuncs.rstatus(
686 host = self.get("hostname"),
687 user = self.get("username"),
688 port = self.get("port"),
689 gwuser = self.get("gatewayUser"),
690 gw = self.get("gateway"),
692 identity = self.get("identity"),
693 server_key = self.get("serverKey"),
694 strict_host_checking = False
699 def kill(self, pid, ppid, sudo = False):
702 status = self.status(pid, ppid)
704 if status == sshfuncs.ProcStatus.RUNNING:
706 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
708 with self._node_lock:
709 (out, err), proc = sshfuncs.rkill(
711 host = self.get("hostname"),
712 user = self.get("username"),
713 port = self.get("port"),
714 gwuser = self.get("gatewayUser"),
715 gw = self.get("gateway"),
718 identity = self.get("identity"),
719 server_key = self.get("serverKey"),
720 strict_host_checking = False
723 return (out, err), proc
725 def copy(self, src, dst):
727 (out, err), proc = execfuncs.lcopy(
731 with self._node_lock:
732 (out, err), proc = sshfuncs.rcopy(
734 port = self.get("port"),
735 gwuser = self.get("gatewayUser"),
736 gw = self.get("gateway"),
737 identity = self.get("identity"),
738 server_key = self.get("serverKey"),
740 strict_host_checking = False)
742 return (out, err), proc
744 def upload(self, src, dst, text = False, overwrite = True,
745 raise_on_error = True):
746 """ Copy content to destination
748 src string with the content to copy. Can be:
750 - a string with the path to a local file
751 - a string with a semi-colon separated list of local files
752 - a string with a local directory
754 dst string with destination path on the remote host (remote is
757 text src is text input, it must be stored into a temp file before
760 # If source is a string input
762 if text and not os.path.isfile(src):
763 # src is text input that should be uploaded as file
764 # create a temporal file with the content to upload
765 # in python3 we need to open in binary mode if str is bytes
766 mode = 'w' if isinstance(src, str) else 'wb'
767 f = tempfile.NamedTemporaryFile(mode=mode, delete=False)
772 # If dst files should not be overwritten, check that the files do not
774 if isinstance(src, str):
775 src = [s.strip() for s in src.split(";")]
777 if overwrite == False:
778 src = self.filter_existing_files(src, dst)
780 return ("", ""), None
782 if not self.localhost:
783 # Build destination as <user>@<server>:<path>
784 dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
786 ((out, err), proc) = self.copy(src, dst)
793 msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
794 self.error(msg, out, err)
796 msg = "{} out: {} err: {}".format(msg, out, err)
798 raise RuntimeError(msg)
800 return ((out, err), proc)
802 def download(self, src, dst, raise_on_error = True):
803 if not self.localhost:
804 # Build source as <user>@<server>:<path>
805 src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
807 ((out, err), proc) = self.copy(src, dst)
810 msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst)
811 self.error(msg, out, err)
814 raise RuntimeError(msg)
816 return ((out, err), proc)
818 def install_packages_command(self, packages):
821 command = rpmfuncs.install_packages_command(self.os, packages)
823 command = debfuncs.install_packages_command(self.os, packages)
825 msg = "Error installing packages ( OS not known ) "
826 self.error(msg, self.os)
827 raise RuntimeError(msg)
831 def install_packages(self, packages, home,
833 raise_on_error = True):
834 """ Install packages in the Linux host.
836 'home' is the directory to upload the package installation script.
837 'run_home' is the directory from where to execute the script.
839 command = self.install_packages_command(packages)
841 run_home = run_home or home
843 (out, err), proc = self.run_and_wait(command, run_home,
844 shfile = os.path.join(home, "instpkg.sh"),
845 pidfile = "instpkg_pidfile",
846 ecodefile = "instpkg_exitcode",
847 stdout = "instpkg_stdout",
848 stderr = "instpkg_stderr",
850 raise_on_error = raise_on_error)
852 return (out, err), proc
854 def remove_packages(self, packages, home, run_home = None,
855 raise_on_error = True):
856 """ Uninstall packages from the Linux host.
858 'home' is the directory to upload the package un-installation script.
859 'run_home' is the directory from where to execute the script.
862 command = rpmfuncs.remove_packages_command(self.os, packages)
864 command = debfuncs.remove_packages_command(self.os, packages)
866 msg = "Error removing packages ( OS not known ) "
868 raise RuntimeError(msg)
870 run_home = run_home or home
872 (out, err), proc = self.run_and_wait(command, run_home,
873 shfile = os.path.join(home, "rmpkg.sh"),
874 pidfile = "rmpkg_pidfile",
875 ecodefile = "rmpkg_exitcode",
876 stdout = "rmpkg_stdout",
877 stderr = "rmpkg_stderr",
879 raise_on_error = raise_on_error)
881 return (out, err), proc
def mkdir(self, paths, clean = False):
    """ Creates one or more directories in the host ('mkdir -p').

    'paths' is either a single directory path (a string) or a list
    of directory paths.

    'clean' when True, the same paths are first passed to self.rmdir
    so the directories are re-created from scratch.

    Returns whatever self.execute returns for the generated command.
    """
    # NOTE(review): the 'clean' handling was reconstructed from the
    # signature and the sibling 'rmdir' method - confirm against the
    # full file.
    if clean:
        self.rmdir(paths)

    targets = [paths] if isinstance(paths, str) else paths

    commands = []
    for target in targets:
        commands.append("mkdir -p {}".format(target))

    return self.execute(" ; ".join(commands), with_lock = True)
def rmdir(self, paths):
    """ Deletes one or more directories from the host ('rm -rf').

    'paths' is either a single directory path (a string) or a list
    of directory paths.

    Returns whatever self.execute returns for the generated command.
    """
    targets = [paths] if isinstance(paths, str) else paths

    commands = []
    for target in targets:
        commands.append("rm -rf {}".format(target))

    return self.execute(" ; ".join(commands), with_lock = True)
909 def run_and_wait(self, command, home,
915 ecodefile="exitcode",
921 raise_on_error=True):
923 Uploads the 'command' to a bash script in the host.
924 Then runs the script detached in background in the host, and
925 busy-waits until the script finishes executing.
928 if not shfile.startswith("/"):
929 shfile = os.path.join(home, shfile)
931 self.upload_command(command,
933 ecodefile = ecodefile,
935 overwrite = overwrite)
937 command = "bash {}".format(shfile)
938 # run command in background in remote host
939 (out, err), proc = self.run(command, home,
947 # check no errors occurred
949 msg = " Failed to run command '{}' ".format(command)
950 self.error(msg, out, err)
952 raise RuntimeError(msg)
954 # Wait for pid file to be generated
955 pid, ppid = self.wait_pid(
958 raise_on_error = raise_on_error)
961 # wait until command finishes to execute
962 self.wait_run(pid, ppid)
964 (eout, err), proc = self.check_errors(home,
965 ecodefile = ecodefile,
968 # Out is what was written in the stderr file
970 msg = " Failed to run command '{}' ".format(command)
971 self.error(msg, eout, err)
974 raise RuntimeError(msg)
976 (out, oerr), proc = self.check_output(home, stdout)
978 return (out, err), proc
980 def exitcode(self, home, ecodefile = "exitcode"):
982 Get the exit code of an application.
983 Returns an integer value with the exit code
985 (out, err), proc = self.check_output(home, ecodefile)
987 # Succeeded to open file, return exit code in the file
990 return int(out.strip())
992 # Error in the content of the file!
993 return ExitCode.CORRUPTFILE
995 # No such file or directory
996 if proc.returncode == 1:
997 return ExitCode.FILENOTFOUND
999 # Other error from 'cat'
1000 return ExitCode.ERROR
1002 def upload_command(self, command,
1004 ecodefile="exitcode",
1007 """ Saves the command as a bash script file in the remote host, and
1008 forces to save the exit code of the command execution to the ecodefile
1011 if not (command.strip().endswith(";") or command.strip().endswith("&")):
1014 # The exit code of the command will be stored in ecodefile
1015 command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1016 .format(command=command, ecodefile=ecodefile)
1018 # Export environment
1019 environ = self.format_environment(env)
1021 # Add environ to command
1022 command = environ + command
1024 return self.upload(command, shfile, text=True, overwrite=overwrite)
def format_environment(self, env, inline=False):
    """ Formats the environment variables for a command to be executed
    either as an inline command
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
    as a bash script (i.e. export PYTHONPATH=src/.. \\n export LALA=.. \\n)

    'env' string with whitespace separated 'NAME=value' assignments
        (or None / empty string when there is nothing to export)

    'inline' when True the export statements are separated by ';',
        otherwise by a new line

    Returns a string with one ' export <assignment>' statement per
    assignment, each followed by the separator, or "" when 'env'
    holds no assignment at all.
    """
    if not env:
        return ""

    # split() without arguments collapses any run of white spaces and
    # ignores leading/trailing ones
    assignments = env.split()

    # A string made only of white spaces carries nothing to export;
    # without this guard it would produce a dangling ' export ' line
    if not assignments:
        return ""

    sep = ";" if inline else "\n"
    return sep.join(" export {}".format(a) for a in assignments) + sep
1040 def check_errors(self, home,
1041 ecodefile = "exitcode",
1043 """ Checks whether errors occurred while running a command.
1044 It first checks the exit code for the command, and only if the
1045 exit code is an error one it returns the error output.
1051 # get exit code saved in the 'exitcode' file
1052 ecode = self.exitcode(home, ecodefile)
1054 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1055 err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1056 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1057 # The process returned an error code or didn't exist.
1058 # Check standard error.
1059 (err, eerr), proc = self.check_output(home, stderr)
1061 # If the stderr file was not found, assume nothing bad happened,
1062 # and just ignore the error.
1063 # (cat returns 1 for error "No such file or directory")
1064 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1067 return ("", err), proc
1069 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1070 """ Waits until the pid file for the command is generated,
1071 and returns the pid and ppid of the process """
1076 pidtuple = self.getpid(home = home, pidfile = pidfile)
1079 pid, ppid = pidtuple
1085 msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1089 raise RuntimeError(msg)
1093 def wait_run(self, pid, ppid, trial = 0):
1094 """ wait for a remote process to finish execution """
1098 status = self.status(pid, ppid)
1100 if status is ProcStatus.FINISHED:
1102 elif status is not ProcStatus.RUNNING:
1105 # If it takes more than 20 seconds to start, then
1106 # assume something went wrong
1110 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of the file 'filename' located in the
    directory 'home', by running 'cat' on it through self.execute
    (one retry, holding the node lock).

    Returns ((out, err), proc) as obtained from self.execute.
    """
    path = os.path.join(home, filename)
    command = "cat {}".format(path)

    (out, err), proc = self.execute(command, retry = 1, with_lock = True)

    return (out, err), proc
1120 """ Checks if host is responsive
1126 msg = "Unresponsive host. Wrong answer. "
1128 # The underlying SSH layer will sometimes return an empty
1129 # output (even if the command was executed without errors).
1130 # To work around this, repeat the operation N times or
1131 # until the result is not empty string
1133 (out, err), proc = self.execute("echo 'ALIVE'",
1137 if out.find("ALIVE") > -1:
1140 trace = traceback.format_exc()
1141 msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1143 self.error(msg, out, err)
1146 def find_home(self):
1148 Retrieves host home directory
1150 # The underlying SSH layer will sometimes return an empty
1151 # output (even if the command was executed without errors).
1152 # To work around this, repeat the operation N times or
1153 # until the result is not empty string
1154 msg = "Impossible to retrieve HOME directory"
1156 (out, err), proc = self.execute("echo ${HOME}",
1160 if out.strip() != "":
1161 self._home_dir = out.strip()
1163 trace = traceback.format_exc()
1164 msg = "Impossible to retrieve HOME directory {}".format(trace)
1166 if not self._home_dir:
1168 raise RuntimeError(msg)
1170 def filter_existing_files(self, src, dst):
1171 """ Removes files that already exist in the Linux host from src list
1173 # construct a dictionary with { dst: src }
1174 dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1175 if len(src) > 1 else {dst: src[0]}
1179 command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1181 command = ";".join(command)
1183 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1185 # avoid RuntimeError that would result from
1186 # changing loop subject during iteration
1187 keys = list(dests.keys())
1189 if out.find(d) > -1:
1195 retcod = dests.values()
1196 if PY3: retcod = list(retcod)