2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy,
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!!
40 Error codes that the rexitcode function can return if unable to
41 check the exit code of a spawned process
50 Supported flavors of Linux OS
55 FEDORA_8 = 1 << 3 | FEDORA
56 FEDORA_12 = 1 << 4 | FEDORA
57 FEDORA_14 = 1 << 5 | FEDORA
60 class LinuxNode(ResourceManager):
62 .. class:: Class Args :
64 :param ec: The Experiment controller
65 :type ec: ExperimentController
66 :param guid: guid of the RM
71 There are different ways in which commands can be executed using the
72 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
77 * 'execute' (blocking mode) :
79 HOW IT WORKS: 'execute', forks a process and run the
80 command, synchronously, attached to the terminal, in
82 The execute method will block until the command returns
83 the result on 'out', 'err' (so until it finishes executing).
85 USAGE: short-lived commands that must be executed attached
86 to a terminal and in foreground, for which it IS necessary
87 to block until the command has finished (e.g. if you want
88 to run 'ls' or 'cat').
90 * 'execute' (NON blocking mode - blocking = False) :
92 HOW IT WORKS: Same as before, except that execute method
93 will return immediately (even if command still running).
95 USAGE: long-lived commands that must be executed attached
96 to a terminal and in foreground, but for which it is not
97 necessary to block until the command has finished. (e.g.
98 start an application using X11 forwarding)
102 HOW IT WORKS: Connects to the host ( using SSH if remote)
103 and launches the command in background, detached from any
104 terminal (daemonized), and returns. The command continues to
105 run remotely, but since it is detached from the terminal,
106 its pipes (stdin, stdout, stderr) can't be redirected to the
107 console (as normal non detached processes would), and so they
108 are explicitly redirected to files. The pidfile is created as
109 part of the process of launching the command. The pidfile
110 holds the pid and ppid of the process forked in background,
111 so later on it is possible to check whether the command is still
114 USAGE: long-lived commands that can run detached in background,
115 for which it is NOT necessary to block (wait) until the command
116 has finished. (e.g. start an application that is not using X11
117 forwarding. It can run detached and remotely in background)
121 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122 the command has finished execution. It also checks whether
123 errors occurred during runtime by reading the exitcode file,
124 which contains the exit code of the command that was run
125 (checking stderr only is not always reliable since many
126 commands throw debugging info to stderr and the only way to
127 automatically know whether an error really happened is to
128 check the process exit code).
130 Another difference with respect to 'run', is that instead
131 of directly executing the command as a bash command line,
132 it uploads the command to a bash script and runs the script.
133 This allows to use the bash script to debug errors, since
134 it remains at the remote host and can be run manually to
137 USAGE: medium-lived commands that can run detached in
138 background, for which it IS necessary to block (wait) until
139 the command has finished. (e.g. Package installation,
140 source compilation, file download, etc)
143 _rtype = "linux::Node"
144 _help = "Controls Linux host machines ( either localhost or a host " \
145 "that can be accessed using a SSH key)"
149 def _register_attributes(cls):
150 cls._register_attribute(Attribute(
151 "hostname", "Hostname of the machine",
152 flags = Flags.Design))
154 cls._register_attribute(Attribute(
155 "username", "Local account username",
156 flags = Flags.Credential))
158 cls._register_attribute(Attribute(
160 flags = Flags.Design))
162 cls._register_attribute(Attribute(
164 "Experiment home directory to store all experiment related files",
165 flags = Flags.Design))
167 cls._register_attribute(Attribute(
168 "identity", "SSH identity file",
169 flags = Flags.Credential))
171 cls._register_attribute(Attribute(
172 "serverKey", "Server public key",
173 flags = Flags.Design))
175 cls._register_attribute(Attribute(
177 "Remove all nepi files and directories "
178 " from node home folder before starting experiment",
181 flags = Flags.Design))
183 cls._register_attribute(Attribute(
184 "cleanExperiment", "Remove all files and directories "
185 " from a previous same experiment, before the new experiment starts",
188 flags = Flags.Design))
190 cls._register_attribute(Attribute(
192 "Kill all running processes before starting experiment",
195 flags = Flags.Design))
197 cls._register_attribute(Attribute(
198 "cleanProcessesAfter",
199 """Kill all running processes after starting experiment
200 This might be dangerous when using user root""",
203 flags = Flags.Design))
205 cls._register_attribute(Attribute(
207 "Bash script to be executed before releasing the resource",
208 flags = Flags.Design))
210 cls._register_attribute(Attribute(
212 "Gateway account username",
213 flags = Flags.Design))
215 cls._register_attribute(Attribute(
217 "Hostname of the gateway machine",
218 flags = Flags.Design))
220 cls._register_attribute(Attribute(
222 "Linux host public IP address. "
223 "Must not be modified by the user unless hostname is 'localhost'",
224 flags = Flags.Design))
226 def __init__(self, ec, guid):
227 super(LinuxNode, self).__init__(ec, guid)
229 # home directory at Linux host
232 # lock to prevent concurrent applications on the same node,
233 # to execute commands at the same time. There are potential
234 # concurrency issues when using SSH to a same host from
235 # multiple threads. There are also possible operational
236 # issues, e.g. an application querying the existence
237 # of a file or folder prior to its creation, and another
238 # application creating the same file or folder in between.
239 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Decorates a log message with the identity of this node

    :param msg: text of the message to decorate
    :return: string of the form " guid <guid> - host <hostname> - <msg> ",
        where <guid> is self.guid and <hostname> is the value of the
        "hostname" attribute
    """
    return " guid {} - host {} - {} ".format(
        self.guid, self.get("hostname"), msg)
247 home = self.get("home") or ""
248 if not home.startswith("/"):
249 home = os.path.join(self._home_dir, home)
254 return os.path.join(self.home_dir, ".nepi")
258 return os.path.join(self.nepi_home, "nepi-usr")
262 return os.path.join(self.usr_dir, "lib")
266 return os.path.join(self.usr_dir, "bin")
270 return os.path.join(self.usr_dir, "src")
274 return os.path.join(self.usr_dir, "share")
278 return os.path.join(self.nepi_home, "nepi-exp")
282 return os.path.join(self.exp_dir, self.ec.exp_id)
286 return os.path.join(self.exp_home, "node-{}".format(self.guid))
290 return os.path.join(self.node_home, self.ec.run_id)
297 if not self.localhost and not self.get("username"):
298 msg = "Can't resolve OS, insufficient data "
300 raise RuntimeError(msg)
304 if out.find("Debian") == 0:
305 self._os = OSType.DEBIAN
306 elif out.find("Ubuntu") ==0:
307 self._os = OSType.UBUNTU
308 elif out.find("Fedora release") == 0:
309 self._os = OSType.FEDORA
310 if out.find("Fedora release 8") == 0:
311 self._os = OSType.FEDORA_8
312 elif out.find("Fedora release 12") == 0:
313 self._os = OSType.FEDORA_12
314 elif out.find("Fedora release 14") == 0:
315 self._os = OSType.FEDORA_14
317 msg = "Unsupported OS"
319 raise RuntimeError("{} - {} ".format(msg, out))
324 # The underlying SSH layer will sometimes return an empty
325 # output (even if the command was executed without errors).
326 # To work around this, repeat the operation N times or
327 # until the result is not empty string
330 (out, err), proc = self.execute("cat /etc/issue",
334 trace = traceback.format_exc()
335 msg = "Error detecting OS: {} ".format(trace)
336 self.error(msg, out, err)
342 return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
346 return (self.os & OSType.FEDORA)
350 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
352 def do_provision(self):
353 # check if host is alive
354 if not self.is_alive():
355 trace = traceback.format_exc()
356 msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
358 raise RuntimeError(msg)
362 if self.get("cleanProcesses"):
363 self.clean_processes()
365 if self.get("cleanHome"):
368 if self.get("cleanExperiment"):
369 self.clean_experiment()
371 # Create shared directory structure and node home directory
372 paths = [self.lib_dir,
380 # Get Public IP address if possible
381 if not self.get("ip"):
383 ip = sshfuncs.gethostbyname(self.get("hostname"))
386 if self.get("gateway") is None:
387 msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
390 super(LinuxNode, self).do_provision()
393 if self.state == ResourceState.NEW:
394 self.info("Deploying node")
398 # Node needs to wait until all associated interfaces are
399 # ready before it can finalize deployment
400 from nepi.resources.linux.interface import LinuxInterface
401 ifaces = self.get_connected(LinuxInterface.get_rtype())
403 if iface.state < ResourceState.READY:
404 self.ec.schedule(self.reschedule_delay, self.deploy)
407 super(LinuxNode, self).do_deploy()
409 def do_release(self):
410 rms = self.get_connected()
412 # Node needs to wait until all associated RMs are released
413 # before it can be released
414 if rm.state != ResourceState.RELEASED:
415 self.ec.schedule(self.reschedule_delay, self.release)
418 tear_down = self.get("tearDown")
420 self.execute(tear_down)
422 if self.get("cleanProcessesAfter"):
423 self.clean_processes()
425 super(LinuxNode, self).do_release()
427 def valid_connection(self, guid):
431 def clean_processes(self):
432 self.info("Cleaning up processes")
437 if self.get("username") != 'root':
438 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
439 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
440 "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
442 if self.state >= ResourceState.READY:
443 ########################
444 #Collect all process (must change for a more intelligent way)
447 avoid_pids = "ps axjf | awk '{print $1,$2}'"
448 (out, err), proc = self.execute(avoid_pids)
450 for line in out.strip().split("\n"):
451 parts = line.strip().split(" ")
452 ppid.append(parts[0])
453 pids.append(parts[1])
455 #Collect all process below ssh -D
458 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
459 (out, err), proc = self.execute(sshs)
461 for line in out.strip().split("\n"):
462 parts = line.strip().split(" ")
463 if parts[1].startswith('root@pts'):
464 ssh_pids.append(parts[0])
465 elif parts[1] == "-D":
466 tree_owner = parts[0]
470 #Search for the child process of the pid's collected at the first block.
471 for process in ssh_pids:
472 temp = self.search_for_child(process, pids, ppid)
473 avoid_kill = list(set(temp))
475 if len(avoid_kill) > 0:
476 avoid_kill.append(tree_owner)
477 ########################
480 pids = pickle.load(open("/tmp/save.proc", "rb"))
482 ps_aux = "ps aux | awk '{print $2,$11}'"
483 (out, err), proc = self.execute(ps_aux)
485 for line in out.strip().split("\n"):
486 parts = line.strip().split(" ")
487 pids_temp[parts[0]] = parts[1]
488 # creates the difference between the machine pids freezed (pickle) and the actual
489 # adding the avoided pids filtered above (avoid_kill) to allow users keep process
490 # alive when using besides ssh connections
491 kill_pids = set(pids_temp.items()) - set(pids.items())
492 kill_pids = ' '.join(list(dict(kill_pids).keys()))
494 # removing pids from beside connections and its process
495 kill_pids = kill_pids.split(' ')
496 kill_pids = list(set(kill_pids) - set(avoid_kill))
497 kill_pids = ' '.join(kill_pids)
499 cmd = ("killall tcpdump || /bin/true ; " +
500 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
501 "kill {} || /bin/true ; ".format(kill_pids))
503 cmd = ("killall tcpdump || /bin/true ; " +
504 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
506 cmd = ("killall tcpdump || /bin/true ; " +
507 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
509 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
511 def search_for_child(self, pid, pids, ppid, family=[]):
512 """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
515 for key, value in enumerate(ppid):
518 self.search_for_child(child, pids, ppid)
def clean_home(self):
    """ Cleans all NEPI related folders in the Linux host

    Removes the '.nepi' entry found directly under self.home_dir.

    :return: whatever 'execute' returns for the clean up command
    """
    self.info("Cleaning up home")

    # The backslash is written as '\\' so the literal no longer relies on
    # the invalid escape sequence '\.'; the command sent to the host is
    # unchanged ("-name \.nepi").
    # NOTE(review): the two commands are joined with ';', so 'find' still
    # runs (in the shell's current directory) if the 'cd' fails.
    cmd = "cd {} ; find . -maxdepth 1 -name \\.nepi -execdir rm -rf {{}} + "\
            .format(self.home_dir)

    return self.execute(cmd, with_lock = True)
def clean_experiment(self):
    """ Cleans all experiment related files in the Linux host.

    Removes the entry named after the current experiment id
    (self.ec.exp_id) found directly under self.exp_dir. Nothing else
    in that directory is touched by this command.

    :return: whatever 'execute' returns for the clean up command
    """
    self.info("Cleaning up experiment files")

    # NOTE(review): the two commands are joined with ';', so 'find' still
    # runs (in the shell's current directory) if the 'cd' fails.
    cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\
            .format(self.exp_dir, self.ec.exp_id)

    return self.execute(cmd, with_lock = True)
543 def execute(self, command,
549 connect_timeout = 30,
550 strict_host_checking = False,
555 """ Notice that this invocation will block until the
556 execution finishes. If this is not the desired behavior,
557 use 'run' instead."""
560 (out, err), proc = execfuncs.lexec(
562 user = self.get("username"), # still problem with localhost
567 # If the execute command is blocking, we don't want to keep
568 # the node lock. This lock is used to avoid race conditions
569 # when creating the ControlMaster sockets. A more elegant
570 # solution is needed.
571 with self._node_lock:
572 (out, err), proc = sshfuncs.rexec(
574 host = self.get("hostname"),
575 user = self.get("username"),
576 port = self.get("port"),
577 gwuser = self.get("gatewayUser"),
578 gw = self.get("gateway"),
581 identity = self.get("identity"),
582 server_key = self.get("serverKey"),
585 forward_x11 = forward_x11,
587 connect_timeout = connect_timeout,
588 persistent = persistent,
590 strict_host_checking = strict_host_checking
593 (out, err), proc = sshfuncs.rexec(
595 host = self.get("hostname"),
596 user = self.get("username"),
597 port = self.get("port"),
598 gwuser = self.get("gatewayUser"),
599 gw = self.get("gateway"),
602 identity = self.get("identity"),
603 server_key = self.get("serverKey"),
606 forward_x11 = forward_x11,
608 connect_timeout = connect_timeout,
609 persistent = persistent,
611 strict_host_checking = strict_host_checking
614 return (out, err), proc
616 def run(self, command, home,
624 strict_host_checking = False):
626 self.debug("Running command '{}'".format(command))
629 (out, err), proc = execfuncs.lspawn(
632 create_home = create_home,
633 stdin = stdin or '/dev/null',
634 stdout = stdout or '/dev/null',
635 stderr = stderr or '/dev/null',
638 with self._node_lock:
639 (out, err), proc = sshfuncs.rspawn(
643 create_home = create_home,
644 stdin = stdin or '/dev/null',
645 stdout = stdout or '/dev/null',
646 stderr = stderr or '/dev/null',
648 host = self.get("hostname"),
649 user = self.get("username"),
650 port = self.get("port"),
651 gwuser = self.get("gatewayUser"),
652 gw = self.get("gateway"),
654 identity = self.get("identity"),
655 server_key = self.get("serverKey"),
657 strict_host_checking = strict_host_checking
660 return (out, err), proc
662 def getpid(self, home, pidfile = "pidfile"):
664 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
666 with self._node_lock:
667 pidtuple = sshfuncs.rgetpid(
668 os.path.join(home, pidfile),
669 host = self.get("hostname"),
670 user = self.get("username"),
671 port = self.get("port"),
672 gwuser = self.get("gatewayUser"),
673 gw = self.get("gateway"),
675 identity = self.get("identity"),
676 server_key = self.get("serverKey"),
677 strict_host_checking = False
682 def status(self, pid, ppid):
684 status = execfuncs.lstatus(pid, ppid)
686 with self._node_lock:
687 status = sshfuncs.rstatus(
689 host = self.get("hostname"),
690 user = self.get("username"),
691 port = self.get("port"),
692 gwuser = self.get("gatewayUser"),
693 gw = self.get("gateway"),
695 identity = self.get("identity"),
696 server_key = self.get("serverKey"),
697 strict_host_checking = False
702 def kill(self, pid, ppid, sudo = False):
705 status = self.status(pid, ppid)
707 if status == sshfuncs.ProcStatus.RUNNING:
709 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
711 with self._node_lock:
712 (out, err), proc = sshfuncs.rkill(
714 host = self.get("hostname"),
715 user = self.get("username"),
716 port = self.get("port"),
717 gwuser = self.get("gatewayUser"),
718 gw = self.get("gateway"),
721 identity = self.get("identity"),
722 server_key = self.get("serverKey"),
723 strict_host_checking = False
726 return (out, err), proc
728 def copy(self, src, dst):
730 (out, err), proc = execfuncs.lcopy(
734 with self._node_lock:
735 (out, err), proc = sshfuncs.rcopy(
737 port = self.get("port"),
738 gwuser = self.get("gatewayUser"),
739 gw = self.get("gateway"),
740 identity = self.get("identity"),
741 server_key = self.get("serverKey"),
743 strict_host_checking = False)
745 return (out, err), proc
747 def upload(self, src, dst, text = False, overwrite = True,
748 raise_on_error = True):
749 """ Copy content to destination
751 src string with the content to copy. Can be:
753 - a string with the path to a local file
754 - a string with a semi-colon separated list of local files
755 - a string with a local directory
757 dst string with destination path on the remote host (remote is
760 text src is text input, it must be stored into a temp file before
763 # If source is a string input
765 if text and not os.path.isfile(src):
766 # src is text input that should be uploaded as file
767 # create a temporal file with the content to upload
768 # in python3 we need to open in binary mode if str is bytes
769 mode = 'w' if isinstance(src, str) else 'wb'
770 f = tempfile.NamedTemporaryFile(mode=mode, delete=False)
775 # If dst files should not be overwritten, check that the files do not
777 if isinstance(src, str):
778 src = list(map(str.strip, src.split(";")))
780 if overwrite == False:
781 src = self.filter_existing_files(src, dst)
783 return ("", ""), None
785 if not self.localhost:
786 # Build destination as <user>@<server>:<path>
787 dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
789 ((out, err), proc) = self.copy(src, dst)
796 msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
797 self.error(msg, out, err)
799 msg = "{} out: {} err: {}".format(msg, out, err)
801 raise RuntimeError(msg)
803 return ((out, err), proc)
805 def download(self, src, dst, raise_on_error = True):
806 if not self.localhost:
807 # Build destination as <user>@<server>:<path>
808 src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
810 ((out, err), proc) = self.copy(src, dst)
813 msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst)
814 self.error(msg, out, err)
817 raise RuntimeError(msg)
819 return ((out, err), proc)
821 def install_packages_command(self, packages):
824 command = rpmfuncs.install_packages_command(self.os, packages)
826 command = debfuncs.install_packages_command(self.os, packages)
828 msg = "Error installing packages ( OS not known ) "
829 self.error(msg, self.os)
830 raise RuntimeError(msg)
834 def install_packages(self, packages, home,
836 raise_on_error = True):
837 """ Install packages in the Linux host.
839 'home' is the directory to upload the package installation script.
840 'run_home' is the directory from where to execute the script.
842 command = self.install_packages_command(packages)
844 run_home = run_home or home
846 (out, err), proc = self.run_and_wait(command, run_home,
847 shfile = os.path.join(home, "instpkg.sh"),
848 pidfile = "instpkg_pidfile",
849 ecodefile = "instpkg_exitcode",
850 stdout = "instpkg_stdout",
851 stderr = "instpkg_stderr",
853 raise_on_error = raise_on_error)
855 return (out, err), proc
857 def remove_packages(self, packages, home, run_home = None,
858 raise_on_error = True):
859 """ Uninstall packages from the Linux host.
861 'home' is the directory to upload the package un-installation script.
862 'run_home' is the directory from where to execute the script.
865 command = rpmfuncs.remove_packages_command(self.os, packages)
867 command = debfuncs.remove_packages_command(self.os, packages)
869 msg = "Error removing packages ( OS not known ) "
871 raise RuntimeError(msg)
873 run_home = run_home or home
875 (out, err), proc = self.run_and_wait(command, run_home,
876 shfile = os.path.join(home, "rmpkg.sh"),
877 pidfile = "rmpkg_pidfile",
878 ecodefile = "rmpkg_exitcode",
879 stdout = "rmpkg_stdout",
880 stderr = "rmpkg_stderr",
882 raise_on_error = raise_on_error)
884 return (out, err), proc
886 def mkdir(self, paths, clean = False):
887 """ Paths is either a single remote directory path to create,
888 or a list of directories to create.
893 if isinstance(paths, str):
896 cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
898 return self.execute(cmd, with_lock = True)
900 def rmdir(self, paths):
901 """ Paths is either a single remote directory path to delete,
902 or a list of directories to delete.
905 if isinstance(paths, str):
908 cmd = " ; ".join(["rm -rf {}".format(path) for path in paths])
910 return self.execute(cmd, with_lock = True)
912 def run_and_wait(self, command, home,
918 ecodefile="exitcode",
924 raise_on_error=True):
926 Uploads the 'command' to a bash script in the host.
927 Then runs the script detached in background in the host, and
928 busy-waits until the script finishes executing.
931 if not shfile.startswith("/"):
932 shfile = os.path.join(home, shfile)
934 self.upload_command(command,
936 ecodefile = ecodefile,
938 overwrite = overwrite)
940 command = "bash {}".format(shfile)
941 # run command in background in remote host
942 (out, err), proc = self.run(command, home,
950 # check no errors occurred
952 msg = " Failed to run command '{}' ".format(command)
953 self.error(msg, out, err)
955 raise RuntimeError(msg)
957 # Wait for pid file to be generated
958 pid, ppid = self.wait_pid(
961 raise_on_error = raise_on_error)
964 # wait until command finishes to execute
965 self.wait_run(pid, ppid)
967 (eout, err), proc = self.check_errors(home,
968 ecodefile = ecodefile,
971 # Out is what was written in the stderr file
973 msg = " Failed to run command '{}' ".format(command)
974 self.error(msg, eout, err)
977 raise RuntimeError(msg)
979 (out, oerr), proc = self.check_output(home, stdout)
981 return (out, err), proc
983 def exitcode(self, home, ecodefile = "exitcode"):
985 Get the exit code of an application.
986 Returns an integer value with the exit code
988 (out, err), proc = self.check_output(home, ecodefile)
990 # Succeeded to open file, return exit code in the file
993 return int(out.strip())
995 # Error in the content of the file!
996 return ExitCode.CORRUPTFILE
998 # No such file or directory
999 if proc.returncode == 1:
1000 return ExitCode.FILENOTFOUND
1002 # Other error from 'cat'
1003 return ExitCode.ERROR
1005 def upload_command(self, command,
1007 ecodefile="exitcode",
1010 """ Saves the command as a bash script file in the remote host, and
1011 forces to save the exit code of the command execution to the ecodefile
1014 if not (command.strip().endswith(";") or command.strip().endswith("&")):
1017 # The exit code of the command will be stored in ecodefile
1018 command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1019 .format(command=command, ecodefile=ecodefile)
1021 # Export environment
1022 environ = self.format_environment(env)
1024 # Add environ to command
1025 command = environ + command
1027 return self.upload(command, shfile, text=True, overwrite=overwrite)
def format_environment(self, env, inline=False):
    """ Formats the environment variables for a command to be executed
    either as an inline command
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
    as a bash script (i.e. export PYTHONPATH=src/.. \\n export LALA=.. \\n)

    :param env: whitespace separated list of VAR=value assignments.
        May be None or empty.
    :param inline: if True the exports are terminated with ';',
        otherwise each one is terminated with a new line
    :return: string with one " export <assignment>" entry per
        assignment, or "" when there is nothing to export
    """
    if not env:
        return ""

    # Splitting on any run of white space also discards leading and
    # trailing blanks. A blank-only 'env' yields no assignments and must
    # not produce a bare 'export' statement.
    assignments = env.split()
    if not assignments:
        return ""

    sep = ";" if inline else "\n"
    return sep.join([" export {}".format(a) for a in assignments]) + sep
1043 def check_errors(self, home,
1044 ecodefile = "exitcode",
1046 """ Checks whether errors occurred while running a command.
1047 It first checks the exit code for the command, and only if the
1048 exit code is an error one it returns the error output.
1054 # get exit code saved in the 'exitcode' file
1055 ecode = self.exitcode(home, ecodefile)
1057 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1058 err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1059 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1060 # The process returned an error code or didn't exist.
1061 # Check standard error.
1062 (err, eerr), proc = self.check_output(home, stderr)
1064 # If the stderr file was not found, assume nothing bad happened,
1065 # and just ignore the error.
1066 # (cat returns 1 for error "No such file or directory")
1067 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1070 return ("", err), proc
1072 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1073 """ Waits until the pid file for the command is generated,
1074 and returns the pid and ppid of the process """
1079 pidtuple = self.getpid(home = home, pidfile = pidfile)
1082 pid, ppid = pidtuple
1088 msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1092 raise RuntimeError(msg)
1096 def wait_run(self, pid, ppid, trial = 0):
1097 """ wait for a remote process to finish execution """
1101 status = self.status(pid, ppid)
1103 if status is ProcStatus.FINISHED:
1105 elif status is not ProcStatus.RUNNING:
1108 # If it takes more than 20 seconds to start, then
1109 # assume something went wrong
1113 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of a file in the host

    :param home: directory that contains the file
    :param filename: name of the file, relative to 'home'
    :return: ((out, err), proc) exactly as returned by 'execute' for
        the command "cat <home>/<filename>"
    """
    path = os.path.join(home, filename)

    # NOTE(review): the path is interpolated unquoted into the command
    # line; presumably it is interpreted by a shell, so paths with blanks
    # or shell metacharacters would not work -- confirm against 'execute'.
    (out, err), proc = self.execute("cat {}".format(path),
            retry = 1, with_lock = True)

    return (out, err), proc
1123 """ Checks if host is responsive
1129 msg = "Unresponsive host. Wrong answer. "
1131 # The underlying SSH layer will sometimes return an empty
1132 # output (even if the command was executed without errors).
1133 # To work around this, repeat the operation N times or
1134 # until the result is not empty string
1136 (out, err), proc = self.execute("echo 'ALIVE'",
1140 if out.find("ALIVE") > -1:
1143 trace = traceback.format_exc()
1144 msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1146 self.error(msg, out, err)
1149 def find_home(self):
1151 Retrieves host home directory
1153 # The underlying SSH layer will sometimes return an empty
1154 # output (even if the command was executed without errors).
1155 # To work around this, repeat the operation N times or
1156 # until the result is not empty string
1157 msg = "Impossible to retrieve HOME directory"
1159 (out, err), proc = self.execute("echo ${HOME}",
1163 if out.strip() != "":
1164 self._home_dir = out.strip()
1166 trace = traceback.format_exc()
1167 msg = "Impossible to retrieve HOME directory {}".format(trace)
1169 if not self._home_dir:
1171 raise RuntimeError(msg)
1173 def filter_existing_files(self, src, dst):
1174 """ Removes files that already exist in the Linux host from src list
1176 # construct a dictionary with { dst: src }
1177 dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1178 if len(src) > 1 else {dst: src[0]}
1181 for d in list(dests.keys()):
1182 command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1184 command = ";".join(command)
1186 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1188 for d in list(dests.keys()):
1189 if out.find(d) > -1:
1195 return list(dests.values())