2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy,
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!!
40 Error codes that the rexitcode function can return if unable to
41 check the exit code of a spawned process
50 Supported flavors of Linux OS
55 FEDORA_8 = 1 << 3 | FEDORA
56 FEDORA_12 = 1 << 4 | FEDORA
57 FEDORA_14 = 1 << 5 | FEDORA
60 class LinuxNode(ResourceManager):
62 .. class:: Class Args :
64 :param ec: The Experiment controller
65 :type ec: ExperimentController
66 :param guid: guid of the RM
71 There are different ways in which commands can be executed using the
72 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
77 * 'execute' (blocking mode) :
79 HOW IT WORKS: 'execute' forks a process and runs the
80 command, synchronously, attached to the terminal, in
82 The execute method will block until the command returns
83 the result on 'out', 'err' (so until it finishes executing).
85 USAGE: short-lived commands that must be executed attached
86 to a terminal and in foreground, for which it IS necessary
87 to block until the command has finished (e.g. if you want
88 to run 'ls' or 'cat').
90 * 'execute' (NON blocking mode - blocking = False) :
92 HOW IT WORKS: Same as before, except that execute method
93 will return immediately (even if command still running).
95 USAGE: long-lived commands that must be executed attached
96 to a terminal and in foreground, but for which it is not
97 necessary to block until the command has finished. (e.g.
98 start an application using X11 forwarding)
102 HOW IT WORKS: Connects to the host ( using SSH if remote)
103 and launches the command in background, detached from any
104 terminal (daemonized), and returns. The command continues to
105 run remotely, but since it is detached from the terminal,
106 its pipes (stdin, stdout, stderr) can't be redirected to the
107 console (as normal non detached processes would), and so they
108 are explicitly redirected to files. The pidfile is created as
109 part of the process of launching the command. The pidfile
110 holds the pid and ppid of the process forked in background,
111 so later on it is possible to check whether the command is still
114 USAGE: long-lived commands that can run detached in background,
115 for which it is NOT necessary to block (wait) until the command
116 has finished. (e.g. start an application that is not using X11
117 forwarding. It can run detached and remotely in background)
121 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122 the command has finished execution. It also checks whether
123 errors occurred during runtime by reading the exitcode file,
124 which contains the exit code of the command that was run
125 (checking stderr only is not always reliable since many
126 commands throw debugging info to stderr and the only way to
127 automatically know whether an error really happened is to
128 check the process exit code).
130 Another difference with respect to 'run', is that instead
131 of directly executing the command as a bash command line,
132 it uploads the command to a bash script and runs the script.
133 This allows to use the bash script to debug errors, since
134 it remains at the remote host and can be run manually to
137 USAGE: medium-lived commands that can run detached in
138 background, for which it IS necessary to block (wait) until
139 the command has finished. (e.g. Package installation,
140 source compilation, file download, etc)
143 _rtype = "linux::Node"
144 _help = "Controls Linux host machines ( either localhost or a host " \
145 "that can be accessed using a SSH key)"
149 def _register_attributes(cls):
150 cls._register_attribute(Attribute(
151 "hostname", "Hostname of the machine",
152 flags = Flags.Design))
154 cls._register_attribute(Attribute(
155 "username", "Local account username",
156 flags = Flags.Credential))
158 cls._register_attribute(Attribute(
160 flags = Flags.Design))
162 cls._register_attribute(Attribute(
164 "Experiment home directory to store all experiment related files",
165 flags = Flags.Design))
167 cls._register_attribute(Attribute(
168 "identity", "SSH identity file",
169 flags = Flags.Credential))
171 cls._register_attribute(Attribute(
172 "serverKey", "Server public key",
173 flags = Flags.Design))
175 cls._register_attribute(Attribute(
177 "Remove all nepi files and directories "
178 " from node home folder before starting experiment",
181 flags = Flags.Design))
183 cls._register_attribute(Attribute(
184 "cleanExperiment", "Remove all files and directories "
185 " from a previous same experiment, before the new experiment starts",
188 flags = Flags.Design))
190 cls._register_attribute(Attribute(
192 "Kill all running processes before starting experiment",
195 flags = Flags.Design))
197 cls._register_attribute(Attribute(
198 "cleanProcessesAfter",
199 """Kill all running processes after starting experiment
200 This might be dangerous when using user root""",
203 flags = Flags.Design))
205 cls._register_attribute(Attribute(
207 "Bash script to be executed before releasing the resource",
208 flags = Flags.Design))
210 cls._register_attribute(Attribute(
212 "Gateway account username",
213 flags = Flags.Design))
215 cls._register_attribute(Attribute(
217 "Hostname of the gateway machine",
218 flags = Flags.Design))
220 cls._register_attribute(Attribute(
222 "Linux host public IP address. "
223 "Must not be modified by the user unless hostname is 'localhost'",
224 flags = Flags.Design))
226 def __init__(self, ec, guid):
227 super(LinuxNode, self).__init__(ec, guid)
229 # home directory at Linux host
232 # lock to prevent concurrent applications on the same node,
233 # to execute commands at the same time. There are potential
234 # concurrency issues when using SSH to a same host from
235 # multiple threads. There are also possible operational
236 # issues, e.g. an application querying the existence
237 # of a file or folder prior to its creation, and another
238 # application creating the same file or folder in between.
239 self._node_lock = threading.Lock()
def log_message(self, msg):
    """Prefix a log message with this node's guid and hostname.

    :param msg: text to place after the guid/host prefix
    :return: the formatted string, padded with one space on each side
    """
    return " guid {} - host {} - {} ".format(
        self.guid, self.get("hostname"), msg)
247 home = self.get("home") or ""
248 if not home.startswith("/"):
249 home = os.path.join(self._home_dir, home)
254 return os.path.join(self.home_dir, ".nepi")
258 return os.path.join(self.nepi_home, "nepi-usr")
262 return os.path.join(self.usr_dir, "lib")
266 return os.path.join(self.usr_dir, "bin")
270 return os.path.join(self.usr_dir, "src")
274 return os.path.join(self.usr_dir, "share")
278 return os.path.join(self.nepi_home, "nepi-exp")
282 return os.path.join(self.exp_dir, self.ec.exp_id)
286 return os.path.join(self.exp_home, "node-{}".format(self.guid))
290 return os.path.join(self.node_home, self.ec.run_id)
297 if not self.localhost and not self.get("username"):
298 msg = "Can't resolve OS, insufficient data "
300 raise RuntimeError(msg)
304 if out.find("Debian") == 0:
305 self._os = OSType.DEBIAN
306 elif out.find("Ubuntu") ==0:
307 self._os = OSType.UBUNTU
308 elif out.find("Fedora release") == 0:
309 self._os = OSType.FEDORA
310 if out.find("Fedora release 8") == 0:
311 self._os = OSType.FEDORA_8
312 elif out.find("Fedora release 12") == 0:
313 self._os = OSType.FEDORA_12
314 elif out.find("Fedora release 14") == 0:
315 self._os = OSType.FEDORA_14
317 msg = "Unsupported OS"
319 raise RuntimeError("{} - {} ".format(msg, out))
324 # The underlying SSH layer will sometimes return an empty
325 # output (even if the command was executed without errors).
326 # To work around this, repeat the operation N times or
327 # until the result is not empty string
330 (out, err), proc = self.execute("cat /etc/issue",
334 trace = traceback.format_exc()
335 msg = "Error detecting OS: {} ".format(trace)
336 self.error(msg, out, err)
342 return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
346 return (self.os & OSType.FEDORA)
350 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
352 def do_provision(self):
353 # check if host is alive
354 if not self.is_alive():
355 trace = traceback.format_exc()
356 msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
358 raise RuntimeError(msg)
362 if self.get("cleanProcesses"):
363 self.clean_processes()
365 if self.get("cleanHome"):
368 if self.get("cleanExperiment"):
369 self.clean_experiment()
371 # Create shared directory structure and node home directory
372 paths = [self.lib_dir,
380 # Get Public IP address if possible
381 if not self.get("ip"):
383 ip = sshfuncs.gethostbyname(self.get("hostname"))
386 if self.get("gateway") is None:
387 msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
390 super(LinuxNode, self).do_provision()
393 if self.state == ResourceState.NEW:
394 self.info("Deploying node")
398 # Node needs to wait until all associated interfaces are
399 # ready before it can finalize deployment
400 from nepi.resources.linux.interface import LinuxInterface
401 ifaces = self.get_connected(LinuxInterface.get_rtype())
403 if iface.state < ResourceState.READY:
404 self.ec.schedule(self.reschedule_delay, self.deploy)
407 super(LinuxNode, self).do_deploy()
409 def do_release(self):
410 rms = self.get_connected()
412 # Node needs to wait until all associated RMs are released
413 # before it can be released
414 if rm.state != ResourceState.RELEASED:
415 self.ec.schedule(self.reschedule_delay, self.release)
418 tear_down = self.get("tearDown")
420 self.execute(tear_down)
422 if self.get("cleanProcessesAfter"):
423 self.clean_processes()
425 super(LinuxNode, self).do_release()
427 def valid_connection(self, guid):
431 def clean_processes(self):
432 self.info("Cleaning up processes")
437 if self.get("username") != 'root':
438 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
439 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
440 "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
442 if self.state >= ResourceState.READY:
443 ########################
444 #Collect all process (must change for a more intelligent way)
447 avoid_pids = "ps axjf | awk '{print $1,$2}'"
448 (out, err), proc = self.execute(avoid_pids)
450 for line in out.strip().split("\n"):
451 parts = line.strip().split(" ")
452 ppid.append(parts[0])
453 pids.append(parts[1])
455 #Collect all process below ssh -D
458 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
459 (out, err), proc = self.execute(sshs)
461 for line in out.strip().split("\n"):
462 parts = line.strip().split(" ")
463 if parts[1].startswith('root@pts'):
464 ssh_pids.append(parts[0])
465 elif parts[1] == "-D":
466 tree_owner = parts[0]
470 #Search for the child process of the pid's collected at the first block.
471 for process in ssh_pids:
472 temp = self.search_for_child(process, pids, ppid)
473 avoid_kill = list(set(temp))
475 if len(avoid_kill) > 0:
476 avoid_kill.append(tree_owner)
477 ########################
480 with open("/tmp/save.proc", "rb") as pickle_file:
481 pids = pickle.load(pickle_file)
483 ps_aux = "ps aux | awk '{print $2,$11}'"
484 (out, err), proc = self.execute(ps_aux)
486 for line in out.strip().split("\n"):
487 parts = line.strip().split(" ")
488 pids_temp[parts[0]] = parts[1]
489 # creates the difference between the machine pids frozen (pickle) and the actual
490 # adding the avoided pids filtered above (avoid_kill) to allow users keep process
491 # alive when using besides ssh connections
492 kill_pids = set(pids_temp.items()) - set(pids.items())
493 kill_pids = ' '.join(dict(kill_pids).keys())
495 # removing pids from beside connections and its process
496 kill_pids = kill_pids.split(' ')
497 kill_pids = list(set(kill_pids) - set(avoid_kill))
498 kill_pids = ' '.join(kill_pids)
500 cmd = ("killall tcpdump || /bin/true ; " +
501 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
502 "kill {} || /bin/true ; ".format(kill_pids))
504 cmd = ("killall tcpdump || /bin/true ; " +
505 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
507 cmd = ("killall tcpdump || /bin/true ; " +
508 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
510 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
512 def search_for_child(self, pid, pids, ppid, family=[]):
513 """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
516 for key, value in enumerate(ppid):
519 self.search_for_child(child, pids, ppid)
def clean_home(self):
    """Remove the NEPI folder (``.nepi``) from the node home directory.

    :return: whatever ``self.execute`` returns for the removal command
    """
    self.info("Cleaning up home")

    # The backslash is doubled so Python keeps it literally; a bare "\."
    # is an invalid escape sequence. The command sent to the host is
    # unchanged.
    cmd = "cd {} ; find . -maxdepth 1 -name \\.nepi -execdir rm -rf {{}} + ".format(
        self.home_dir)

    return self.execute(cmd, with_lock = True)
def clean_experiment(self):
    """Remove this experiment's files from the Linux host.

    Only the entry named after ``self.ec.exp_id`` directly under
    ``self.exp_dir`` is deleted; other entries in that directory are
    left in place.

    :return: whatever ``self.execute`` returns for the removal command
    """
    self.info("Cleaning up experiment files")

    cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + ".format(
        self.exp_dir, self.ec.exp_id)

    return self.execute(cmd, with_lock = True)
544 def execute(self, command,
550 connect_timeout = 30,
551 strict_host_checking = False,
556 """ Notice that this invocation will block until the
557 execution finishes. If this is not the desired behavior,
558 use 'run' instead."""
561 (out, err), proc = execfuncs.lexec(
563 user = self.get("username"), # still problem with localhost
568 # If the execute command is blocking, we don't want to keep
569 # the node lock. This lock is used to avoid race conditions
570 # when creating the ControlMaster sockets. A more elegant
571 # solution is needed.
572 with self._node_lock:
573 (out, err), proc = sshfuncs.rexec(
575 host = self.get("hostname"),
576 user = self.get("username"),
577 port = self.get("port"),
578 gwuser = self.get("gatewayUser"),
579 gw = self.get("gateway"),
582 identity = self.get("identity"),
583 server_key = self.get("serverKey"),
586 forward_x11 = forward_x11,
588 connect_timeout = connect_timeout,
589 persistent = persistent,
591 strict_host_checking = strict_host_checking
594 (out, err), proc = sshfuncs.rexec(
596 host = self.get("hostname"),
597 user = self.get("username"),
598 port = self.get("port"),
599 gwuser = self.get("gatewayUser"),
600 gw = self.get("gateway"),
603 identity = self.get("identity"),
604 server_key = self.get("serverKey"),
607 forward_x11 = forward_x11,
609 connect_timeout = connect_timeout,
610 persistent = persistent,
612 strict_host_checking = strict_host_checking
615 return (out, err), proc
617 def run(self, command, home,
625 strict_host_checking = False):
627 self.debug("Running command '{}'".format(command))
630 (out, err), proc = execfuncs.lspawn(
633 create_home = create_home,
634 stdin = stdin or '/dev/null',
635 stdout = stdout or '/dev/null',
636 stderr = stderr or '/dev/null',
639 with self._node_lock:
640 (out, err), proc = sshfuncs.rspawn(
644 create_home = create_home,
645 stdin = stdin or '/dev/null',
646 stdout = stdout or '/dev/null',
647 stderr = stderr or '/dev/null',
649 host = self.get("hostname"),
650 user = self.get("username"),
651 port = self.get("port"),
652 gwuser = self.get("gatewayUser"),
653 gw = self.get("gateway"),
655 identity = self.get("identity"),
656 server_key = self.get("serverKey"),
658 strict_host_checking = strict_host_checking
661 return (out, err), proc
663 def getpid(self, home, pidfile = "pidfile"):
665 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
667 with self._node_lock:
668 pidtuple = sshfuncs.rgetpid(
669 os.path.join(home, pidfile),
670 host = self.get("hostname"),
671 user = self.get("username"),
672 port = self.get("port"),
673 gwuser = self.get("gatewayUser"),
674 gw = self.get("gateway"),
676 identity = self.get("identity"),
677 server_key = self.get("serverKey"),
678 strict_host_checking = False
683 def status(self, pid, ppid):
685 status = execfuncs.lstatus(pid, ppid)
687 with self._node_lock:
688 status = sshfuncs.rstatus(
690 host = self.get("hostname"),
691 user = self.get("username"),
692 port = self.get("port"),
693 gwuser = self.get("gatewayUser"),
694 gw = self.get("gateway"),
696 identity = self.get("identity"),
697 server_key = self.get("serverKey"),
698 strict_host_checking = False
703 def kill(self, pid, ppid, sudo = False):
706 status = self.status(pid, ppid)
708 if status == sshfuncs.ProcStatus.RUNNING:
710 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
712 with self._node_lock:
713 (out, err), proc = sshfuncs.rkill(
715 host = self.get("hostname"),
716 user = self.get("username"),
717 port = self.get("port"),
718 gwuser = self.get("gatewayUser"),
719 gw = self.get("gateway"),
722 identity = self.get("identity"),
723 server_key = self.get("serverKey"),
724 strict_host_checking = False
727 return (out, err), proc
729 def copy(self, src, dst):
731 (out, err), proc = execfuncs.lcopy(
735 with self._node_lock:
736 (out, err), proc = sshfuncs.rcopy(
738 port = self.get("port"),
739 gwuser = self.get("gatewayUser"),
740 gw = self.get("gateway"),
741 identity = self.get("identity"),
742 server_key = self.get("serverKey"),
744 strict_host_checking = False)
746 return (out, err), proc
748 def upload(self, src, dst, text = False, overwrite = True,
749 raise_on_error = True):
750 """ Copy content to destination
752 src string with the content to copy. Can be:
754 - a string with the path to a local file
755 - a string with a semicolon-separated list of local files
756 - a string with a local directory
758 dst string with destination path on the remote host (remote is
761 text src is text input, it must be stored into a temp file before
764 # If source is a string input
766 if text and not os.path.isfile(src):
767 # src is text input that should be uploaded as file
768 # create a temporal file with the content to upload
769 f = tempfile.NamedTemporaryFile(delete=False)
774 # If dst files should not be overwritten, check that the files do not
776 if isinstance(src, str):
777 src = map(str.strip, src.split(";"))
779 if overwrite == False:
780 src = self.filter_existing_files(src, dst)
782 return ("", ""), None
784 if not self.localhost:
785 # Build destination as <user>@<server>:<path>
786 dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
788 ((out, err), proc) = self.copy(src, dst)
795 msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
796 self.error(msg, out, err)
798 msg = "{} out: {} err: {}".format(msg, out, err)
800 raise RuntimeError(msg)
802 return ((out, err), proc)
804 def download(self, src, dst, raise_on_error = True):
805 if not self.localhost:
806 # Build destination as <user>@<server>:<path>
807 src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
809 ((out, err), proc) = self.copy(src, dst)
812 msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst)
813 self.error(msg, out, err)
816 raise RuntimeError(msg)
818 return ((out, err), proc)
820 def install_packages_command(self, packages):
823 command = rpmfuncs.install_packages_command(self.os, packages)
825 command = debfuncs.install_packages_command(self.os, packages)
827 msg = "Error installing packages ( OS not known ) "
828 self.error(msg, self.os)
829 raise RuntimeError(msg)
833 def install_packages(self, packages, home,
835 raise_on_error = True):
836 """ Install packages in the Linux host.
838 'home' is the directory to upload the package installation script.
839 'run_home' is the directory from where to execute the script.
841 command = self.install_packages_command(packages)
843 run_home = run_home or home
845 (out, err), proc = self.run_and_wait(command, run_home,
846 shfile = os.path.join(home, "instpkg.sh"),
847 pidfile = "instpkg_pidfile",
848 ecodefile = "instpkg_exitcode",
849 stdout = "instpkg_stdout",
850 stderr = "instpkg_stderr",
852 raise_on_error = raise_on_error)
854 return (out, err), proc
856 def remove_packages(self, packages, home, run_home = None,
857 raise_on_error = True):
858 """ Uninstall packages from the Linux host.
860 'home' is the directory to upload the package un-installation script.
861 'run_home' is the directory from where to execute the script.
864 command = rpmfuncs.remove_packages_command(self.os, packages)
866 command = debfuncs.remove_packages_command(self.os, packages)
868 msg = "Error removing packages ( OS not known ) "
870 raise RuntimeError(msg)
872 run_home = run_home or home
874 (out, err), proc = self.run_and_wait(command, run_home,
875 shfile = os.path.join(home, "rmpkg.sh"),
876 pidfile = "rmpkg_pidfile",
877 ecodefile = "rmpkg_exitcode",
878 stdout = "rmpkg_stdout",
879 stderr = "rmpkg_stderr",
881 raise_on_error = raise_on_error)
883 return (out, err), proc
885 def mkdir(self, paths, clean = False):
886 """ Paths is either a single remote directory path to create,
887 or a list of directories to create.
892 if isinstance(paths, str):
895 cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
897 return self.execute(cmd, with_lock = True)
899 def rmdir(self, paths):
900 """ Paths is either a single remote directory path to delete,
901 or a list of directories to delete.
904 if isinstance(paths, str):
907 cmd = " ; ".join(map(lambda path: "rm -rf {}".format(path), paths))
909 return self.execute(cmd, with_lock = True)
911 def run_and_wait(self, command, home,
917 ecodefile="exitcode",
923 raise_on_error=True):
925 Uploads the 'command' to a bash script in the host.
926 Then runs the script detached in background in the host, and
927 busy-waits until the script finishes executing.
930 if not shfile.startswith("/"):
931 shfile = os.path.join(home, shfile)
933 self.upload_command(command,
935 ecodefile = ecodefile,
937 overwrite = overwrite)
939 command = "bash {}".format(shfile)
940 # run command in background in remote host
941 (out, err), proc = self.run(command, home,
949 # check no errors occurred
951 msg = " Failed to run command '{}' ".format(command)
952 self.error(msg, out, err)
954 raise RuntimeError(msg)
956 # Wait for pid file to be generated
957 pid, ppid = self.wait_pid(
960 raise_on_error = raise_on_error)
963 # wait until command finishes to execute
964 self.wait_run(pid, ppid)
966 (eout, err), proc = self.check_errors(home,
967 ecodefile = ecodefile,
970 # Out is what was written in the stderr file
972 msg = " Failed to run command '{}' ".format(command)
973 self.error(msg, eout, err)
976 raise RuntimeError(msg)
978 (out, oerr), proc = self.check_output(home, stdout)
980 return (out, err), proc
982 def exitcode(self, home, ecodefile = "exitcode"):
984 Get the exit code of an application.
985 Returns an integer value with the exit code
987 (out, err), proc = self.check_output(home, ecodefile)
989 # Succeeded to open file, return exit code in the file
992 return int(out.strip())
994 # Error in the content of the file!
995 return ExitCode.CORRUPTFILE
997 # No such file or directory
998 if proc.returncode == 1:
999 return ExitCode.FILENOTFOUND
1001 # Other error from 'cat'
1002 return ExitCode.ERROR
1004 def upload_command(self, command,
1006 ecodefile="exitcode",
1009 """ Saves the command as a bash script file in the remote host, and
1010 forces to save the exit code of the command execution to the ecodefile
1013 if not (command.strip().endswith(";") or command.strip().endswith("&")):
1016 # The exit code of the command will be stored in ecodefile
1017 command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1018 .format(command=command, ecodefile=ecodefile)
1020 # Export environment
1021 environ = self.format_environment(env)
1023 # Add environ to command
1024 command = environ + command
1026 return self.upload(command, shfile, text=True, overwrite=overwrite)
def format_environment(self, env, inline=False):
    """Format environment variable assignments as ``export`` statements.

    :param env: whitespace-separated ``NAME=value`` assignments; may be
        empty or ``None``. Values cannot contain whitespace, since the
        string is split on it.
    :param inline: when true, each statement ends with ``;`` so the
        result can prefix a one-line command; otherwise each statement
        ends with a newline, as in a bash script
    :return: the ``export`` statements, or ``""`` when there is nothing
        to export
    """
    # split() with no argument collapses runs of whitespace and drops
    # leading/trailing blanks, so blank-only input yields no entries
    # instead of a bare " export " statement.
    assignments = env.split() if env else []
    if not assignments:
        return ""

    sep = ";" if inline else "\n"
    return sep.join([" export {}".format(a) for a in assignments]) + sep
1042 def check_errors(self, home,
1043 ecodefile = "exitcode",
1045 """ Checks whether errors occurred while running a command.
1046 It first checks the exit code for the command, and only if the
1047 exit code is an error one it returns the error output.
1053 # get exit code saved in the 'exitcode' file
1054 ecode = self.exitcode(home, ecodefile)
1056 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1057 err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1058 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1059 # The process returned an error code or didn't exist.
1060 # Check standard error.
1061 (err, eerr), proc = self.check_output(home, stderr)
1063 # If the stderr file was not found, assume nothing bad happened,
1064 # and just ignore the error.
1065 # (cat returns 1 for error "No such file or directory")
1066 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1069 return ("", err), proc
1071 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1072 """ Waits until the pid file for the command is generated,
1073 and returns the pid and ppid of the process """
1078 pidtuple = self.getpid(home = home, pidfile = pidfile)
1081 pid, ppid = pidtuple
1087 msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1091 raise RuntimeError(msg)
1095 def wait_run(self, pid, ppid, trial = 0):
1096 """ wait for a remote process to finish execution """
1100 status = self.status(pid, ppid)
1102 if status is ProcStatus.FINISHED:
1104 elif status is not ProcStatus.RUNNING:
1107 # If it takes more than 20 seconds to start, then
1108 # assume something went wrong
1112 # The app is running, just wait...
def check_output(self, home, filename):
    """Retrieve the content of a file on the host.

    Runs ``cat`` on ``home/filename`` through ``self.execute``.

    :param home: directory that contains the file
    :param filename: name of the file, joined to ``home``
    :return: ``((out, err), proc)`` exactly as returned by ``self.execute``
    """
    path = os.path.join(home, filename)
    (out, err), proc = self.execute("cat {}".format(path),
                                    retry = 1, with_lock = True)
    return (out, err), proc
1122 """ Checks if host is responsive
1128 msg = "Unresponsive host. Wrong answer. "
1130 # The underlying SSH layer will sometimes return an empty
1131 # output (even if the command was executed without errors).
1132 # To work around this, repeat the operation N times or
1133 # until the result is not empty string
1135 (out, err), proc = self.execute("echo 'ALIVE'",
1139 if out.find("ALIVE") > -1:
1142 trace = traceback.format_exc()
1143 msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1145 self.error(msg, out, err)
1148 def find_home(self):
1150 Retrieves host home directory
1152 # The underlying SSH layer will sometimes return an empty
1153 # output (even if the command was executed without errors).
1154 # To work around this, repeat the operation N times or
1155 # until the result is not empty string
1156 msg = "Impossible to retrieve HOME directory"
1158 (out, err), proc = self.execute("echo ${HOME}",
1162 if out.strip() != "":
1163 self._home_dir = out.strip()
1165 trace = traceback.format_exc()
1166 msg = "Impossible to retrieve HOME directory {}".format(trace)
1168 if not self._home_dir:
1170 raise RuntimeError(msg)
1172 def filter_existing_files(self, src, dst):
1173 """ Removes files that already exist in the Linux host from src list
1175 # construct a dictionary with { dst: src }
1176 dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1177 if len(src) > 1 else {dst: src[0]}
1180 for d in dests.keys():
1181 command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1183 command = ";".join(command)
1185 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1187 for d in dests.keys():
1188 if out.find(d) > -1:
1194 return dests.values()