2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy,
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!!
42 Error codes that the rexitcode function can return if unable to
43 check the exit code of a spawned process
52 Supported flavors of Linux OS
57 FEDORA_8 = 1 << 3 | FEDORA
58 FEDORA_12 = 1 << 4 | FEDORA
59 FEDORA_14 = 1 << 5 | FEDORA
62 class LinuxNode(ResourceManager):
64 .. class:: Class Args :
66 :param ec: The Experiment controller
67 :type ec: ExperimentController
68 :param guid: guid of the RM
73 There are different ways in which commands can be executed using the
74 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
79 * 'execute' (blocking mode) :
81 HOW IT WORKS: 'execute', forks a process and run the
82 command, synchronously, attached to the terminal, in
84 The execute method will block until the command returns
85 the result on 'out', 'err' (so until it finishes executing).
87 USAGE: short-lived commands that must be executed attached
88 to a terminal and in foreground, for which it IS necessary
89 to block until the command has finished (e.g. if you want
90 to run 'ls' or 'cat').
92 * 'execute' (NON blocking mode - blocking = False) :
94 HOW IT WORKS: Same as before, except that execute method
95 will return immediately (even if command still running).
97 USAGE: long-lived commands that must be executed attached
98 to a terminal and in foreground, but for which it is not
99 necessary to block until the command has finished. (e.g.
100 start an application using X11 forwarding)
104 HOW IT WORKS: Connects to the host ( using SSH if remote)
105 and launches the command in background, detached from any
106 terminal (daemonized), and returns. The command continues to
107 run remotely, but since it is detached from the terminal,
108 its pipes (stdin, stdout, stderr) can't be redirected to the
109 console (as normal non detached processes would), and so they
110 are explicitly redirected to files. The pidfile is created as
111 part of the process of launching the command. The pidfile
112 holds the pid and ppid of the process forked in background,
113 so later on it is possible to check whether the command is still
116 USAGE: long-lived commands that can run detached in background,
117 for which it is NOT necessary to block (wait) until the command
118 has finished. (e.g. start an application that is not using X11
119 forwarding. It can run detached and remotely in background)
123 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124 the command has finished execution. It also checks whether
125 errors occurred during runtime by reading the exitcode file,
126 which contains the exit code of the command that was run
127 (checking stderr only is not always reliable since many
128 commands throw debugging info to stderr and the only way to
129 automatically know whether an error really happened is to
130 check the process exit code).
132 Another difference with respect to 'run', is that instead
133 of directly executing the command as a bash command line,
134 it uploads the command to a bash script and runs the script.
135 This allows to use the bash script to debug errors, since
136 it remains at the remote host and can be run manually to
139 USAGE: medium-lived commands that can run detached in
140 background, for which it IS necessary to block (wait) until
141 the command has finished. (e.g. Package installation,
142 source compilation, file download, etc)
145 _rtype = "linux::Node"
146 _help = "Controls Linux host machines ( either localhost or a host " \
147 "that can be accessed using a SSH key)"
151 def _register_attributes(cls):
152 cls._register_attribute(Attribute(
153 "hostname", "Hostname of the machine",
154 flags = Flags.Design))
156 cls._register_attribute(Attribute(
157 "username", "Local account username",
158 flags = Flags.Credential))
160 cls._register_attribute(Attribute(
162 flags = Flags.Design))
164 cls._register_attribute(Attribute(
166 "Experiment home directory to store all experiment related files",
167 flags = Flags.Design))
169 cls._register_attribute(Attribute(
170 "identity", "SSH identity file",
171 flags = Flags.Credential))
173 cls._register_attribute(Attribute(
174 "serverKey", "Server public key",
175 flags = Flags.Design))
177 cls._register_attribute(Attribute(
179 "Remove all nepi files and directories "
180 " from node home folder before starting experiment",
183 flags = Flags.Design))
185 cls._register_attribute(Attribute(
186 "cleanExperiment", "Remove all files and directories "
187 " from a previous same experiment, before the new experiment starts",
190 flags = Flags.Design))
192 cls._register_attribute(Attribute(
194 "Kill all running processes before starting experiment",
197 flags = Flags.Design))
199 cls._register_attribute(Attribute(
200 "cleanProcessesAfter",
201 """Kill all running processes after starting experiment
202 This might be dangerous when using user root""",
205 flags = Flags.Design))
207 cls._register_attribute(Attribute(
209 "Bash script to be executed before releasing the resource",
210 flags = Flags.Design))
212 cls._register_attribute(Attribute(
214 "Gateway account username",
215 flags = Flags.Design))
217 cls._register_attribute(Attribute(
219 "Hostname of the gateway machine",
220 flags = Flags.Design))
222 cls._register_attribute(Attribute(
224 "Linux host public IP address. "
225 "Must not be modified by the user unless hostname is 'localhost'",
226 flags = Flags.Design))
228 def __init__(self, ec, guid):
229 super(LinuxNode, self).__init__(ec, guid)
231 # home directory at Linux host
234 # lock to prevent concurrent applications on the same node,
235 # to execute commands at the same time. There are potential
236 # concurrency issues when using SSH to a same host from
237 # multiple threads. There are also possible operational
238 # issues, e.g. an application querying the existence
239 # of a file or folder prior to its creation, and another
240 # application creating the same file or folder in between.
241 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Decorates a log message with this node's identity.

    :param msg: the message text to decorate
    :type msg: str
    :returns: a string of the form " guid <guid> - host <hostname> - <msg> "
        (note the leading and trailing blanks)
    """
    template = " guid {} - host {} - {} "
    node_guid = self.guid
    node_host = self.get("hostname")
    return template.format(node_guid, node_host, msg)
249 home = self.get("home") or ""
250 if not home.startswith("/"):
251 home = os.path.join(self._home_dir, home)
256 return os.path.join(self.home_dir, ".nepi")
260 return os.path.join(self.nepi_home, "nepi-usr")
264 return os.path.join(self.usr_dir, "lib")
268 return os.path.join(self.usr_dir, "bin")
272 return os.path.join(self.usr_dir, "src")
276 return os.path.join(self.usr_dir, "share")
280 return os.path.join(self.nepi_home, "nepi-exp")
284 return os.path.join(self.exp_dir, self.ec.exp_id)
288 return os.path.join(self.exp_home, "node-{}".format(self.guid))
292 return os.path.join(self.node_home, self.ec.run_id)
299 if not self.localhost and not self.get("username"):
300 msg = "Can't resolve OS, insufficient data "
302 raise RuntimeError(msg)
306 if out.find("Debian") == 0:
307 self._os = OSType.DEBIAN
308 elif out.find("Ubuntu") ==0:
309 self._os = OSType.UBUNTU
310 elif out.find("Fedora release") == 0:
311 self._os = OSType.FEDORA
312 if out.find("Fedora release 8") == 0:
313 self._os = OSType.FEDORA_8
314 elif out.find("Fedora release 12") == 0:
315 self._os = OSType.FEDORA_12
316 elif out.find("Fedora release 14") == 0:
317 self._os = OSType.FEDORA_14
319 msg = "Unsupported OS"
321 raise RuntimeError("{} - {} ".format(msg, out))
326 # The underlying SSH layer will sometimes return an empty
327 # output (even if the command was executed without errors).
328 # To work around this, repeat the operation N times or
329 # until the result is not empty string
332 (out, err), proc = self.execute("cat /etc/issue",
336 trace = traceback.format_exc()
337 msg = "Error detecting OS: {} ".format(trace)
338 self.error(msg, out, err)
344 return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
348 return (self.os & OSType.FEDORA)
352 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
354 def do_provision(self):
355 # check if host is alive
356 if not self.is_alive():
357 trace = traceback.format_exc()
358 msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
360 raise RuntimeError(msg)
364 if self.get("cleanProcesses"):
365 self.clean_processes()
367 if self.get("cleanHome"):
370 if self.get("cleanExperiment"):
371 self.clean_experiment()
373 # Create shared directory structure and node home directory
374 paths = [self.lib_dir,
382 # Get Public IP address if possible
383 if not self.get("ip"):
385 ip = sshfuncs.gethostbyname(self.get("hostname"))
388 if self.get("gateway") is None:
389 msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
392 super(LinuxNode, self).do_provision()
395 if self.state == ResourceState.NEW:
396 self.info("Deploying node")
400 # Node needs to wait until all associated interfaces are
401 # ready before it can finalize deployment
402 from nepi.resources.linux.interface import LinuxInterface
403 ifaces = self.get_connected(LinuxInterface.get_rtype())
405 if iface.state < ResourceState.READY:
406 self.ec.schedule(self.reschedule_delay, self.deploy)
409 super(LinuxNode, self).do_deploy()
411 def do_release(self):
412 rms = self.get_connected()
414 # Node needs to wait until all associated RMs are released
415 # before it can be released
416 if rm.state != ResourceState.RELEASED:
417 self.ec.schedule(self.reschedule_delay, self.release)
420 tear_down = self.get("tearDown")
422 self.execute(tear_down)
424 if self.get("cleanProcessesAfter"):
425 self.clean_processes()
427 super(LinuxNode, self).do_release()
429 def valid_connection(self, guid):
433 def clean_processes(self):
434 self.info("Cleaning up processes")
439 if self.get("username") != 'root':
440 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
441 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
442 "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
444 if self.state >= ResourceState.READY:
445 ########################
446 #Collect all process (must change for a more intelligent way)
449 avoid_pids = "ps axjf | awk '{print $1,$2}'"
450 (out, err), proc = self.execute(avoid_pids)
452 for line in out.strip().split("\n"):
453 parts = line.strip().split(" ")
454 ppid.append(parts[0])
455 pids.append(parts[1])
457 #Collect all process below ssh -D
460 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
461 (out, err), proc = self.execute(sshs)
463 for line in out.strip().split("\n"):
464 parts = line.strip().split(" ")
465 if parts[1].startswith('root@pts'):
466 ssh_pids.append(parts[0])
467 elif parts[1] == "-D":
468 tree_owner = parts[0]
472 #Search for the child process of the pid's collected at the first block.
473 for process in ssh_pids:
474 temp = self.search_for_child(process, pids, ppid)
475 avoid_kill = list(set(temp))
477 if len(avoid_kill) > 0:
478 avoid_kill.append(tree_owner)
479 ########################
482 with open("/tmp/save.proc", "rb") as pickle_file:
483 pids = pickle.load(pickle_file)
485 ps_aux = "ps aux | awk '{print $2,$11}'"
486 (out, err), proc = self.execute(ps_aux)
488 for line in out.strip().split("\n"):
489 parts = line.strip().split(" ")
490 pids_temp[parts[0]] = parts[1]
491 # computes the difference between the machine pids frozen (pickled) and the current ones,
492 # excluding the pids filtered above (avoid_kill) so that users can keep processes
493 # alive when using other ssh connections
494 kill_pids = set(pids_temp.items()) - set(pids.items())
495 # py2/py3 : keep it simple
496 kill_pids = ' '.join(kill_pids)
498 # removing pids from beside connections and its process
499 kill_pids = kill_pids.split(' ')
500 kill_pids = list(set(kill_pids) - set(avoid_kill))
501 kill_pids = ' '.join(kill_pids)
503 cmd = ("killall tcpdump || /bin/true ; " +
504 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
505 "kill {} || /bin/true ; ".format(kill_pids))
507 cmd = ("killall tcpdump || /bin/true ; " +
508 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
510 cmd = ("killall tcpdump || /bin/true ; " +
511 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
513 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
515 def search_for_child(self, pid, pids, ppid, family=[]):
516 """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
519 for key, value in enumerate(ppid):
522 self.search_for_child(child, pids, ppid)
525 def clean_home(self):
526 """ Cleans all NEPI related folders in the Linux host
528 self.info("Cleaning up home")
530 cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\
531 .format(self.home_dir)
533 return self.execute(cmd, with_lock = True)
535 def clean_experiment(self):
536 """ Cleans all experiment related files in the Linux host.
537 It preserves NEPI files and folders that have a multi experiment
540 self.info("Cleaning up experiment files")
542 cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\
543 .format(self.exp_dir, self.ec.exp_id)
545 return self.execute(cmd, with_lock = True)
547 def execute(self, command,
553 connect_timeout = 30,
554 strict_host_checking = False,
559 """ Notice that this invocation will block until the
560 execution finishes. If this is not the desired behavior,
561 use 'run' instead."""
564 (out, err), proc = execfuncs.lexec(
566 user = self.get("username"), # still problem with localhost
571 # If the execute command is blocking, we don't want to keep
572 # the node lock. This lock is used to avoid race conditions
573 # when creating the ControlMaster sockets. A more elegant
574 # solution is needed.
575 with self._node_lock:
576 (out, err), proc = sshfuncs.rexec(
578 host = self.get("hostname"),
579 user = self.get("username"),
580 port = self.get("port"),
581 gwuser = self.get("gatewayUser"),
582 gw = self.get("gateway"),
585 identity = self.get("identity"),
586 server_key = self.get("serverKey"),
589 forward_x11 = forward_x11,
591 connect_timeout = connect_timeout,
592 persistent = persistent,
594 strict_host_checking = strict_host_checking
597 (out, err), proc = sshfuncs.rexec(
599 host = self.get("hostname"),
600 user = self.get("username"),
601 port = self.get("port"),
602 gwuser = self.get("gatewayUser"),
603 gw = self.get("gateway"),
606 identity = self.get("identity"),
607 server_key = self.get("serverKey"),
610 forward_x11 = forward_x11,
612 connect_timeout = connect_timeout,
613 persistent = persistent,
615 strict_host_checking = strict_host_checking
618 return (out, err), proc
620 def run(self, command, home,
628 strict_host_checking = False):
630 self.debug("Running command '{}'".format(command))
633 (out, err), proc = execfuncs.lspawn(
636 create_home = create_home,
637 stdin = stdin or '/dev/null',
638 stdout = stdout or '/dev/null',
639 stderr = stderr or '/dev/null',
642 with self._node_lock:
643 (out, err), proc = sshfuncs.rspawn(
647 create_home = create_home,
648 stdin = stdin or '/dev/null',
649 stdout = stdout or '/dev/null',
650 stderr = stderr or '/dev/null',
652 host = self.get("hostname"),
653 user = self.get("username"),
654 port = self.get("port"),
655 gwuser = self.get("gatewayUser"),
656 gw = self.get("gateway"),
658 identity = self.get("identity"),
659 server_key = self.get("serverKey"),
661 strict_host_checking = strict_host_checking
664 return (out, err), proc
666 def getpid(self, home, pidfile = "pidfile"):
668 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
670 with self._node_lock:
671 pidtuple = sshfuncs.rgetpid(
672 os.path.join(home, pidfile),
673 host = self.get("hostname"),
674 user = self.get("username"),
675 port = self.get("port"),
676 gwuser = self.get("gatewayUser"),
677 gw = self.get("gateway"),
679 identity = self.get("identity"),
680 server_key = self.get("serverKey"),
681 strict_host_checking = False
686 def status(self, pid, ppid):
688 status = execfuncs.lstatus(pid, ppid)
690 with self._node_lock:
691 status = sshfuncs.rstatus(
693 host = self.get("hostname"),
694 user = self.get("username"),
695 port = self.get("port"),
696 gwuser = self.get("gatewayUser"),
697 gw = self.get("gateway"),
699 identity = self.get("identity"),
700 server_key = self.get("serverKey"),
701 strict_host_checking = False
706 def kill(self, pid, ppid, sudo = False):
709 status = self.status(pid, ppid)
711 if status == sshfuncs.ProcStatus.RUNNING:
713 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
715 with self._node_lock:
716 (out, err), proc = sshfuncs.rkill(
718 host = self.get("hostname"),
719 user = self.get("username"),
720 port = self.get("port"),
721 gwuser = self.get("gatewayUser"),
722 gw = self.get("gateway"),
725 identity = self.get("identity"),
726 server_key = self.get("serverKey"),
727 strict_host_checking = False
730 return (out, err), proc
732 def copy(self, src, dst):
734 (out, err), proc = execfuncs.lcopy(
738 with self._node_lock:
739 (out, err), proc = sshfuncs.rcopy(
741 port = self.get("port"),
742 gwuser = self.get("gatewayUser"),
743 gw = self.get("gateway"),
744 identity = self.get("identity"),
745 server_key = self.get("serverKey"),
747 strict_host_checking = False)
749 return (out, err), proc
751 def upload(self, src, dst, text = False, overwrite = True,
752 raise_on_error = True):
753 """ Copy content to destination
755 src string with the content to copy. Can be:
757 - a string with the path to a local file
758 - a string with a semi-colon separeted list of local files
759 - a string with a local directory
761 dst string with destination path on the remote host (remote is
764 text src is text input, it must be stored into a temp file before
767 # If source is a string input
769 if text and not os.path.isfile(src):
770 # src is text input that should be uploaded as file
771 # create a temporary file with the content to upload
772 # in python3 we need to open in binary mode if str is bytes
773 mode = 'w' if isinstance(src, str) else 'wb'
774 f = tempfile.NamedTemporaryFile(mode=mode, delete=False)
779 # If dst files should not be overwritten, check that the files do not
781 if isinstance(src, str):
782 src = [s.strip() for s in src.split(";")]
784 if overwrite == False:
785 src = self.filter_existing_files(src, dst)
787 return ("", ""), None
789 if not self.localhost:
790 # Build destination as <user>@<server>:<path>
791 dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
793 ((out, err), proc) = self.copy(src, dst)
800 msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
801 self.error(msg, out, err)
803 msg = "{} out: {} err: {}".format(msg, out, err)
805 raise RuntimeError(msg)
807 return ((out, err), proc)
809 def download(self, src, dst, raise_on_error = True):
810 if not self.localhost:
811 # Build destination as <user>@<server>:<path>
812 src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
814 ((out, err), proc) = self.copy(src, dst)
817 msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst)
818 self.error(msg, out, err)
821 raise RuntimeError(msg)
823 return ((out, err), proc)
825 def install_packages_command(self, packages):
828 command = rpmfuncs.install_packages_command(self.os, packages)
830 command = debfuncs.install_packages_command(self.os, packages)
832 msg = "Error installing packages ( OS not known ) "
833 self.error(msg, self.os)
834 raise RuntimeError(msg)
838 def install_packages(self, packages, home,
840 raise_on_error = True):
841 """ Install packages in the Linux host.
843 'home' is the directory to upload the package installation script.
844 'run_home' is the directory from where to execute the script.
846 command = self.install_packages_command(packages)
848 run_home = run_home or home
850 (out, err), proc = self.run_and_wait(command, run_home,
851 shfile = os.path.join(home, "instpkg.sh"),
852 pidfile = "instpkg_pidfile",
853 ecodefile = "instpkg_exitcode",
854 stdout = "instpkg_stdout",
855 stderr = "instpkg_stderr",
857 raise_on_error = raise_on_error)
859 return (out, err), proc
861 def remove_packages(self, packages, home, run_home = None,
862 raise_on_error = True):
863 """ Uninstall packages from the Linux host.
865 'home' is the directory to upload the package un-installation script.
866 'run_home' is the directory from where to execute the script.
869 command = rpmfuncs.remove_packages_command(self.os, packages)
871 command = debfuncs.remove_packages_command(self.os, packages)
873 msg = "Error removing packages ( OS not known ) "
875 raise RuntimeError(msg)
877 run_home = run_home or home
879 (out, err), proc = self.run_and_wait(command, run_home,
880 shfile = os.path.join(home, "rmpkg.sh"),
881 pidfile = "rmpkg_pidfile",
882 ecodefile = "rmpkg_exitcode",
883 stdout = "rmpkg_stdout",
884 stderr = "rmpkg_stderr",
886 raise_on_error = raise_on_error)
888 return (out, err), proc
890 def mkdir(self, paths, clean = False):
891 """ Paths is either a single remote directory path to create,
892 or a list of directories to create.
897 if isinstance(paths, str):
900 cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
902 return self.execute(cmd, with_lock = True)
904 def rmdir(self, paths):
905 """ Paths is either a single remote directory path to delete,
906 or a list of directories to delete.
909 if isinstance(paths, str):
912 cmd = " ; ".join(["rm -rf {}".format(path) for path in paths])
914 return self.execute(cmd, with_lock = True)
916 def run_and_wait(self, command, home,
922 ecodefile="exitcode",
928 raise_on_error=True):
930 Uploads the 'command' to a bash script in the host.
931 Then runs the script detached in background in the host, and
932 busy-waites until the script finishes executing.
935 if not shfile.startswith("/"):
936 shfile = os.path.join(home, shfile)
938 self.upload_command(command,
940 ecodefile = ecodefile,
942 overwrite = overwrite)
944 command = "bash {}".format(shfile)
945 # run command in background in remote host
946 (out, err), proc = self.run(command, home,
954 # check no errors occurred
956 msg = " Failed to run command '{}' ".format(command)
957 self.error(msg, out, err)
959 raise RuntimeError(msg)
961 # Wait for pid file to be generated
962 pid, ppid = self.wait_pid(
965 raise_on_error = raise_on_error)
968 # wait until command finishes to execute
969 self.wait_run(pid, ppid)
971 (eout, err), proc = self.check_errors(home,
972 ecodefile = ecodefile,
975 # Out is what was written in the stderr file
977 msg = " Failed to run command '{}' ".format(command)
978 self.error(msg, eout, err)
981 raise RuntimeError(msg)
983 (out, oerr), proc = self.check_output(home, stdout)
985 return (out, err), proc
987 def exitcode(self, home, ecodefile = "exitcode"):
989 Get the exit code of an application.
990 Returns an integer value with the exit code
992 (out, err), proc = self.check_output(home, ecodefile)
994 # Succeeded to open file, return exit code in the file
997 return int(out.strip())
999 # Error in the content of the file!
1000 return ExitCode.CORRUPTFILE
1002 # No such file or directory
1003 if proc.returncode == 1:
1004 return ExitCode.FILENOTFOUND
1006 # Other error from 'cat'
1007 return ExitCode.ERROR
1009 def upload_command(self, command,
1011 ecodefile="exitcode",
1014 """ Saves the command as a bash script file in the remote host, and
1015 forces to save the exit code of the command execution to the ecodefile
1018 if not (command.strip().endswith(";") or command.strip().endswith("&")):
1021 # The exit code of the command will be stored in ecodefile
1022 command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1023 .format(command=command, ecodefile=ecodefile)
1025 # Export environment
1026 environ = self.format_environment(env)
1028 # Add environ to command
1029 command = environ + command
1031 return self.upload(command, shfile, text=True, overwrite=overwrite)
1033 def format_environment(self, env, inline=False):
1034 """ Formats the environment variables for a command to be executed
1035 either as an inline command
1036 (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
1037 as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
1039 if not env: return ""
1041 # Remove extra white spaces
1042 env = re.sub(r'\s+', ' ', env.strip())
1044 sep = ";" if inline else "\n"
1045 return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep
1047 def check_errors(self, home,
1048 ecodefile = "exitcode",
1050 """ Checks whether errors occurred while running a command.
1051 It first checks the exit code for the command, and only if the
1052 exit code is an error one it returns the error output.
1058 # get exit code saved in the 'exitcode' file
1059 ecode = self.exitcode(home, ecodefile)
1061 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1062 err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1063 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1064 # The process returned an error code or didn't exist.
1065 # Check standard error.
1066 (err, eerr), proc = self.check_output(home, stderr)
1068 # If the stderr file was not found, assume nothing bad happened,
1069 # and just ignore the error.
1070 # (cat returns 1 for error "No such file or directory")
1071 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1074 return ("", err), proc
1076 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1077 """ Waits until the pid file for the command is generated,
1078 and returns the pid and ppid of the process """
1083 pidtuple = self.getpid(home = home, pidfile = pidfile)
1086 pid, ppid = pidtuple
1092 msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1096 raise RuntimeError(msg)
1100 def wait_run(self, pid, ppid, trial = 0):
1101 """ wait for a remote process to finish execution """
1105 status = self.status(pid, ppid)
1107 if status is ProcStatus.FINISHED:
1109 elif status is not ProcStatus.RUNNING:
1112 # If it takes more than 20 seconds to start, then
1113 # assume something went wrong
1117 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of a file on the host by running ``cat``.

    The command goes through ``self.execute`` with a single attempt
    (``retry=1``) while holding the node lock (``with_lock=True``).
    This method does not inspect the result itself; callers look at
    the returned output and process object.

    :param home: directory that contains the file
    :param filename: name of the file; if it is an absolute path,
        ``home`` is ignored (``os.path.join`` semantics)
    :returns: ``((out, err), proc)`` as produced by ``execute``
    """
    target = os.path.join(home, filename)
    (stdout, stderr), process = self.execute(
        "cat {}".format(target), retry=1, with_lock=True)
    return (stdout, stderr), process
1127 """ Checks if host is responsive
1133 msg = "Unresponsive host. Wrong answer. "
1135 # The underlying SSH layer will sometimes return an empty
1136 # output (even if the command was executed without errors).
1137 # To work around this, repeat the operation N times or
1138 # until the result is not empty string
1140 (out, err), proc = self.execute("echo 'ALIVE'",
1144 if out.find("ALIVE") > -1:
1147 trace = traceback.format_exc()
1148 msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1150 self.error(msg, out, err)
1153 def find_home(self):
1155 Retrieves host home directory
1157 # The underlying SSH layer will sometimes return an empty
1158 # output (even if the command was executed without errors).
1159 # To work around this, repeat the operation N times or
1160 # until the result is not empty string
1161 msg = "Impossible to retrieve HOME directory"
1163 (out, err), proc = self.execute("echo ${HOME}",
1167 if out.strip() != "":
1168 self._home_dir = out.strip()
1170 trace = traceback.format_exc()
1171 msg = "Impossible to retrieve HOME directory {}".format(trace)
1173 if not self._home_dir:
1175 raise RuntimeError(msg)
1177 def filter_existing_files(self, src, dst):
1178 """ Removes files that already exist in the Linux host from src list
1180 # construct a dictionary with { dst: src }
1181 dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1182 if len(src) > 1 else {dst: src[0]}
1186 command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1188 command = ";".join(command)
1190 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1192 # avoid RuntimeError that would result from
1193 # changing loop subject during iteration
1194 keys = list(dests.keys())
1196 if out.find(d) > -1:
1202 retcod = dests.values()
1203 if PY3: retcod = list(retcod)