2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy,
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
38 # TODO: Unify delays!!
39 # TODO: Validate outcome of uploads!!
43 Error codes that the exitcode method (LinuxNode.exitcode) can return, instead of a real exit status, if unable to
44 read the exit code of a spawned process (e.g. unreadable, missing or unparsable exit-code file)
53 Supported flavors of Linux OS, encoded as bit flags
# Release-specific Fedora flags also carry the generic FEDORA bit, so a test
# such as `os & OSType.FEDORA` is truthy for every Fedora release.
58 FEDORA_8 = 1 << 3 | FEDORA
59 FEDORA_12 = 1 << 4 | FEDORA
60 FEDORA_14 = 1 << 5 | FEDORA
63 class LinuxNode(ResourceManager):
65 .. class:: Class Args :
67 :param ec: The Experiment controller
68 :type ec: ExperimentController
69 :param guid: guid of the RM
74 There are different ways in which commands can be executed using the
75 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
80 * 'execute' (blocking mode) :
82 HOW IT WORKS: 'execute' forks a process and runs the
83 command, synchronously, attached to the terminal, in
85 The execute method will block until the command returns
86 the result on 'out', 'err' (so until it finishes executing).
88 USAGE: short-lived commands that must be executed attached
89 to a terminal and in foreground, for which it IS necessary
90 to block until the command has finished (e.g. if you want
91 to run 'ls' or 'cat').
93 * 'execute' (NON blocking mode - blocking = False) :
95 HOW IT WORKS: Same as before, except that execute method
96 will return immediately (even if the command is still running).
98 USAGE: long-lived commands that must be executed attached
99 to a terminal and in foreground, but for which it is not
100 necessary to block until the command has finished. (e.g.
101 start an application using X11 forwarding)
* 'run' :
105 HOW IT WORKS: Connects to the host (using SSH if remote)
106 and launches the command in background, detached from any
107 terminal (daemonized), and returns. The command continues to
108 run remotely, but since it is detached from the terminal,
109 its pipes (stdin, stdout, stderr) can't be redirected to the
110 console (as normal non detached processes would), and so they
111 are explicitly redirected to files. The pidfile is created as
112 part of the process of launching the command. The pidfile
113 holds the pid and ppid of the process forked in background,
114 so later on it is possible to check whether the command is still
117 USAGE: long-lived commands that can run detached in background,
118 for which it is NOT necessary to block (wait) until the command
119 has finished. (e.g. start an application that is not using X11
120 forwarding. It can run detached and remotely in background)
* 'run_and_wait' :
124 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
125 the command has finished execution. It also checks whether
126 errors occurred during runtime by reading the exitcode file,
127 which contains the exit code of the command that was run
128 (checking stderr only is not always reliable since many
129 commands throw debugging info to stderr and the only way to
130 automatically know whether an error really happened is to
131 check the process exit code).
133 Another difference with respect to 'run' is that instead
134 of directly executing the command as a bash command line,
135 it uploads the command to a bash script and runs the script.
136 This makes it possible to use the bash script to debug errors, since
137 it remains at the remote host and can be run manually to
140 USAGE: medium-lived commands that can run detached in
141 background, for which it IS necessary to block (wait) until
142 the command has finished. (e.g. Package installation,
143 source compilation, file download, etc)
146 _rtype = "linux::Node"
# Human-readable description of this RM type (the line above sets its type
# identifier).
147 _help = "Controls Linux host machines ( either localhost or a host " \
148 "that can be accessed using a SSH key)"
152 def _register_attributes(cls):
# Declare the attributes a linux::Node RM accepts. "username" and "identity"
# are registered with Flags.Credential; the other attributes use Flags.Design.
153 cls._register_attribute(
154 Attribute("hostname",
155 "Hostname of the machine",
156 flags = Flags.Design))
157 cls._register_attribute(
158 Attribute("username",
159 "Local account username",
160 flags = Flags.Credential))
161 cls._register_attribute(
164 flags = Flags.Design))
165 cls._register_attribute(
167 "Experiment home directory to store all experiment related files",
168 flags = Flags.Design))
169 cls._register_attribute(
170 Attribute("identity",
172 flags = Flags.Credential))
173 cls._register_attribute(
174 Attribute("serverKey",
176 flags = Flags.Design))
# The clean* flags below are consulted by do_provision (cleanProcesses,
# cleanHome, cleanExperiment) and do_release (cleanProcessesAfter).
177 cls._register_attribute(
178 Attribute("cleanHome",
179 "Remove all nepi files and directories "
180 " from node home folder before starting experiment",
183 flags = Flags.Design))
184 cls._register_attribute(
185 Attribute("cleanExperiment",
186 "Remove all files and directories "
187 " from a previous same experiment, before the new experiment starts",
190 flags = Flags.Design))
191 cls._register_attribute(
192 Attribute("cleanProcesses",
193 "Kill all running processes before starting experiment",
196 flags = Flags.Design))
# NOTE(review): the two description literals below are concatenated with no
# separating space ("...experimentNOTE: ..."). The flag is also honoured in
# do_release, i.e. when the node is released rather than "after starting
# experiment". Consider rewording the help text.
197 cls._register_attribute(
198 Attribute("cleanProcessesAfter",
199 "Kill all running processes after starting experiment"
200 "NOTE: This might be dangerous when using user root",
203 flags = Flags.Design))
204 cls._register_attribute(
205 Attribute("tearDown",
206 "Bash script to be executed before releasing the resource",
207 flags = Flags.Design))
208 cls._register_attribute(
209 Attribute("gatewayUser",
210 "Gateway account username",
211 flags = Flags.Design))
212 cls._register_attribute(
214 "Hostname of the gateway machine",
215 flags = Flags.Design))
216 cls._register_attribute(
218 "Linux host public IP address. "
219 "Must not be modified by the user unless hostname is 'localhost'",
220 flags = Flags.Design))
222 def __init__(self, ec, guid):
# Per-node state. The host home directory (self._home_dir) is filled in later
# by find_home. self._node_lock serializes the remote (sshfuncs) calls issued
# through this node.
223 super(LinuxNode, self).__init__(ec, guid)
225 # home directory at Linux host
228 # lock to prevent concurrent applications on the same node,
229 # to execute commands at the same time. There are potential
230 # concurrency issues when using SSH to a same host from
231 # multiple threads. There are also possible operational
232 # issues, e.g. an application querying the existence
233 # of a file or folder prior to its creation, and another
234 # application creating the same file or folder in between.
235 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Decorate 'msg' with this node's guid and hostname, so that log
    lines coming from different nodes can be told apart. """
    template = " guid {} - host {} - {} "
    return template.format(self.guid, self.get("hostname"), msg)
# Node home: the "home" attribute when it is an absolute path. Otherwise it is
# taken relative to the host home directory found by find_home
# (self._home_dir). An unset "home" gives self._home_dir with a trailing '/'
# (os.path.join(x, "")).
243 home = self.get("home") or ""
244 if not home.startswith("/"):
245 home = os.path.join(self._home_dir, home)
# NEPI directory layout on the host. Each line below is the return value of
# one path property; names are inferred from how the paths chain together:
#   <home_dir>/.nepi                              (nepi_home)
#   <nepi_home>/nepi-usr/{lib,bin,src,share}      (usr_dir and subfolders)
#   <nepi_home>/nepi-exp/<exp_id>/node-<guid>/<run_id>
250 return os.path.join(self.home_dir, ".nepi")
254 return os.path.join(self.nepi_home, "nepi-usr")
258 return os.path.join(self.usr_dir, "lib")
262 return os.path.join(self.usr_dir, "bin")
266 return os.path.join(self.usr_dir, "src")
270 return os.path.join(self.usr_dir, "share")
274 return os.path.join(self.nepi_home, "nepi-exp")
# Per-experiment, per-node and per-run folders.
278 return os.path.join(self.exp_dir, self.ec.exp_id)
282 return os.path.join(self.exp_home, "node-{}".format(self.guid))
286 return os.path.join(self.node_home, self.ec.run_id)
# Resolving the OS runs a command on the host, so a username is required
# unless the node is localhost.
293 if not self.localhost and not self.get("username"):
294 msg = "Can't resolve OS, insufficient data "
296 raise RuntimeError(msg)
# Classify the host by the prefix of 'out' (find(...) == 0 is a startswith
# test). NOTE(review): 'out' presumably holds the `cat /etc/issue` output; confirm.
300 if out.find("Debian") == 0:
301 self._os = OSType.DEBIAN
302 elif out.find("Ubuntu") == 0:
303 self._os = OSType.UBUNTU
304 elif out.find("Fedora release") == 0:
305 self._os = OSType.FEDORA
# Refine the generic FEDORA flag to a release-specific one (which still
# includes the FEDORA bit) for the releases known here.
306 if out.find("Fedora release 8") == 0:
307 self._os = OSType.FEDORA_8
308 elif out.find("Fedora release 12") == 0:
309 self._os = OSType.FEDORA_12
310 elif out.find("Fedora release 14") == 0:
311 self._os = OSType.FEDORA_14
# Any other distribution is rejected.
313 msg = "Unsupported OS"
315 raise RuntimeError("{} - {} ".format(msg, out))
320 # The underlying SSH layer will sometimes return an empty
321 # output (even if the command was executed without errors).
322 # To work around this, repeat the operation N times or
323 # until the result is not empty string
# /etc/issue typically starts with the distribution name, which is what the
# OS classification matches against.
326 (out, err), proc = self.execute("cat /etc/issue",
# On failure the formatted traceback is reported through self.error.
330 trace = traceback.format_exc()
331 msg = "Error detecting OS: {} ".format(trace)
332 self.error(msg, out, err)
# Truthy when the OS is Debian or Ubuntu. The value is the masked int, not a bool.
338 return (self.os & (OSType.DEBIAN | OSType.UBUNTU))
# Truthy for any Fedora release, since every FEDORA_* flag carries the FEDORA bit.
342 return (self.os & OSType.FEDORA)
# True when the hostname designates the local machine (name or loopback address).
346 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
348 def do_provision(self):
# Prepare the host before delegating to the parent class's do_provision:
# check it is reachable, optionally clean processes, the NEPI home and previous
# experiment files, create the NEPI directories and look up the public IP.
349 # check if host is alive
350 if not self.is_alive():
# NOTE(review): traceback.format_exc() is called outside any except block here.
# Unless do_provision itself runs inside an exception handler, it yields
# "NoneType: None" instead of a useful trace.
351 trace = traceback.format_exc()
352 msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
354 raise RuntimeError(msg)
358 if self.get("cleanProcesses"):
359 self.clean_processes()
361 if self.get("cleanHome"):
364 if self.get("cleanExperiment"):
365 self.clean_experiment()
367 # Create shared directory structure and node home directory
368 paths = [self.lib_dir,
376 # Get Public IP address if possible
377 if not self.get("ip"):
379 ip = sshfuncs.gethostbyname(self.get("hostname"))
# A failed lookup is presumably only reported when no gateway is configured;
# confirm the surrounding handling.
382 if self.get("gateway") is None:
383 msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
386 super(LinuxNode, self).do_provision()
389 if self.state == ResourceState.NEW:
390 self.info("Deploying node")
394 # Node needs to wait until all associated interfaces are
395 # ready before it can finalize deployment
# Local import, presumably to avoid a circular import with the interface
# module; confirm.
396 from nepi.resources.linux.interface import LinuxInterface
397 ifaces = self.get_connected(LinuxInterface.get_rtype())
399 if iface.state < ResourceState.READY:
# Not ready yet: re-schedule deploy after reschedule_delay instead of blocking.
400 self.ec.schedule(self.reschedule_delay, self.deploy)
403 super(LinuxNode, self).do_deploy()
405 def do_release(self):
# Release the node only once every connected RM is RELEASED (otherwise retry
# after reschedule_delay). Then run the tearDown script (presumably only when
# set) and, if cleanProcessesAfter is set, kill the processes on the host.
406 rms = self.get_connected()
408 # Node needs to wait until all associated RMs are released
409 # before it can be released
410 if rm.state != ResourceState.RELEASED:
411 self.ec.schedule(self.reschedule_delay, self.release)
414 tear_down = self.get("tearDown")
416 self.execute(tear_down)
418 if self.get("cleanProcessesAfter"):
419 self.clean_processes()
421 super(LinuxNode, self).do_release()
423 def valid_connection(self, guid):
# Hook presumably used to accept or reject a connection with the RM identified
# by 'guid'. NOTE(review): confirm which connections are accepted.
427 def clean_processes(self):
# Kill processes left on the host: tcpdump, every process whose `ps aux` line
# contains ".nepi" and, depending on the account and node state, further
# user processes (see the branches below).
428 self.info("Cleaning up processes")
# Non-root account: go through sudo and also kill every process of the user.
433 if self.get("username") != 'root':
# .format() binds tighter than '+', so it only formats the last literal. This
# keeps the awk '{print $2}' braces from being parsed as format fields.
434 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
435 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
436 "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
# Node already READY: spare the process trees of interactive ssh sessions and,
# presumably, the processes recorded in the /tmp/save.proc snapshot.
438 if self.state >= ResourceState.READY:
439 ########################
440 #Collect all process (must change for a more intelligent way)
# `ps axjf` prints PPID then PID; build the parallel ppid/pids lists.
443 avoid_pids = "ps axjf | awk '{print $1,$2}'"
444 (out, err), proc = self.execute(avoid_pids)
446 for line in out.strip().split("\n"):
447 parts = line.strip().split(" ")
448 ppid.append(parts[0])
449 pids.append(parts[1])
451 #Collect all process below ssh -D
# Field 12 of `ps aux` is the first word after the sshd command name: sessions
# on root@pts are collected in ssh_pids, the "-D" process becomes tree_owner.
454 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
455 (out, err), proc = self.execute(sshs)
457 for line in out.strip().split("\n"):
458 parts = line.strip().split(" ")
459 if parts[1].startswith('root@pts'):
460 ssh_pids.append(parts[0])
461 elif parts[1] == "-D":
462 tree_owner = parts[0]
466 #Search for the child process of the pid's collected at the first block.
# NOTE(review): if the avoid_kill assignment sits inside this loop, only the
# last ssh session's family survives. Passing one shared list as the 'family'
# argument of search_for_child would collect all of them.
467 for process in ssh_pids:
468 temp = self.search_for_child(process, pids, ppid)
469 avoid_kill = list(set(temp))
471 if len(avoid_kill) > 0:
472 avoid_kill.append(tree_owner)
473 ########################
# SECURITY NOTE(review): unpickling a file under /tmp (typically world-writable)
# lets whoever can create or replace it run arbitrary code in this process.
476 with open("/tmp/save.proc", "rb") as pickle_file:
477 pids = pickle.load(pickle_file)
# Current pid -> command map, from `ps aux` fields 2 and 11.
479 ps_aux = "ps aux | awk '{print $2,$11}'"
480 (out, err), proc = self.execute(ps_aux)
482 for line in out.strip().split("\n"):
483 parts = line.strip().split(" ")
484 pids_temp[parts[0]] = parts[1]
485 # compute the difference between the pids frozen in the pickle snapshot and the current ones,
486 # then drop the avoided pids filtered above (avoid_kill), so that users can keep processes
487 # alive inside their side ssh connections
488 kill_pids = set(pids_temp.items()) - set(pids.items())
489 # py2/py3 : keep it simple
# NOTE(review): kill_pids holds (pid, command) tuples at this point, so ' '.join
# raises TypeError whenever the set is non-empty. Joining only the pids,
# e.g. ' '.join(pid for pid, _ in kill_pids), looks like the intent.
490 kill_pids = ' '.join(kill_pids)
492 # remove the pids of the side ssh connections and their child processes
493 kill_pids = kill_pids.split(' ')
494 kill_pids = list(set(kill_pids) - set(avoid_kill))
495 kill_pids = ' '.join(kill_pids)
# As above, .format() only applies to the last literal of each command.
497 cmd = ("killall tcpdump || /bin/true ; " +
498 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
499 "kill {} || /bin/true ; ".format(kill_pids))
# NOTE(review): the two fallback commands below are identical and could share
# a single definition.
501 cmd = ("killall tcpdump || /bin/true ; " +
502 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
504 cmd = ("killall tcpdump || /bin/true ; " +
505 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
507 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
def search_for_child(self, pid, pids, ppid, family=None):
    """ Collect 'pid' together with all of its descendant processes.

    'pids' and 'ppid' are parallel lists (list A holds the pids and list B
    their parents): ppid[i] is the parent of pids[i].

    Every collected pid is appended, in depth-first pre-order, to 'family',
    which is also returned. Pass the same 'family' list on several calls to
    accumulate the descendants of several roots. When omitted, a new list
    is used for each call.

    The recursion now shares a single accumulator. Previously each
    recursive call started a fresh list and its result was discarded, so
    grandchildren were lost.
    """
    family = family if family is not None else []
    # Pids already collected are never walked again. This also stops
    # infinite recursion on a pid that lists itself as its own parent.
    seen = set(family)

    def collect(parent):
        # Record 'parent', then descend into each of its direct children.
        family.append(parent)
        seen.add(parent)
        for child, parent_of_child in zip(pids, ppid):
            if parent_of_child == parent and child not in seen:
                collect(child)

    collect(pid)
    return family
def clean_home(self):
    """ Cleans all NEPI related folders in the Linux host, i.e. removes the
    .nepi folder found directly under the host home directory.

    Returns the ((out, err), proc) tuple produced by self.execute.
    """
    self.info("Cleaning up home")

    # "\\." yields the same backslash-dot the shell receives as before
    # ("\." is an invalid Python escape: DeprecationWarning, and
    # SyntaxWarning since 3.12). The shell strips the backslash, so find
    # matches the literal name ".nepi".
    cmd = "cd {} ; find . -maxdepth 1 -name \\.nepi -execdir rm -rf {{}} + "\
            .format(self.home_dir)

    return self.execute(cmd, with_lock = True)
def clean_experiment(self):
    """ Delete this experiment's folder (named after self.ec.exp_id) from
    the experiments directory self.exp_dir on the Linux host. Other entries
    under self.exp_dir are left untouched.

    Returns the ((out, err), proc) tuple produced by self.execute.
    """
    self.info("Cleaning up experiment files")
    template = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "
    command = template.format(self.exp_dir, self.ec.exp_id)
    return self.execute(command, with_lock = True)
542 def execute(self, command,
548 connect_timeout = 30,
549 strict_host_checking = False,
554 """ Notice that this invocation will block until the
555 execution finishes. If this is not the desired behavior,
556 use 'run' instead."""
# Local hosts go through execfuncs.lexec. Remote hosts go through
# sshfuncs.rexec, either under self._node_lock or (second rexec call) without it.
# NOTE(review): the class docstring also describes a non-blocking mode
# (blocking = False), which the docstring above does not mention.
559 (out, err), proc = execfuncs.lexec(
561 user = self.get("username"), # still problem with localhost
566 # If the execute command is blocking, we don't want to keep
567 # the node lock. This lock is used to avoid race conditions
568 # when creating the ControlMaster sockets. A more elegant
569 # solution is needed.
570 with self._node_lock:
571 (out, err), proc = sshfuncs.rexec(
573 host = self.get("hostname"),
574 user = self.get("username"),
575 port = self.get("port"),
576 gwuser = self.get("gatewayUser"),
577 gw = self.get("gateway"),
580 identity = self.get("identity"),
581 server_key = self.get("serverKey"),
584 forward_x11 = forward_x11,
586 connect_timeout = connect_timeout,
587 persistent = persistent,
589 strict_host_checking = strict_host_checking
592 (out, err), proc = sshfuncs.rexec(
594 host = self.get("hostname"),
595 user = self.get("username"),
596 port = self.get("port"),
597 gwuser = self.get("gatewayUser"),
598 gw = self.get("gateway"),
601 identity = self.get("identity"),
602 server_key = self.get("serverKey"),
605 forward_x11 = forward_x11,
607 connect_timeout = connect_timeout,
608 persistent = persistent,
610 strict_host_checking = strict_host_checking
613 return (out, err), proc
615 def run(self, command, home,
623 strict_host_checking = False):
# Spawn 'command' detached in the background and return without waiting for it:
# via execfuncs.lspawn locally, or via sshfuncs.rspawn (under self._node_lock)
# remotely. Unset stdin/stdout/stderr are redirected to /dev/null.
625 self.debug("Running command '{}'".format(command))
628 (out, err), proc = execfuncs.lspawn(
631 create_home = create_home,
632 stdin = stdin or '/dev/null',
633 stdout = stdout or '/dev/null',
634 stderr = stderr or '/dev/null',
637 with self._node_lock:
638 (out, err), proc = sshfuncs.rspawn(
642 create_home = create_home,
643 stdin = stdin or '/dev/null',
644 stdout = stdout or '/dev/null',
645 stderr = stderr or '/dev/null',
647 host = self.get("hostname"),
648 user = self.get("username"),
649 port = self.get("port"),
650 gwuser = self.get("gatewayUser"),
651 gw = self.get("gateway"),
653 identity = self.get("identity"),
654 server_key = self.get("serverKey"),
656 strict_host_checking = strict_host_checking
659 return (out, err), proc
661 def getpid(self, home, pidfile = "pidfile"):
# Read the pid information stored in <home>/<pidfile> (wait_pid unpacks the
# result as (pid, ppid)): execfuncs.lgetpid locally, sshfuncs.rgetpid remotely
# under self._node_lock.
663 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
665 with self._node_lock:
666 pidtuple = sshfuncs.rgetpid(
667 os.path.join(home, pidfile),
668 host = self.get("hostname"),
669 user = self.get("username"),
670 port = self.get("port"),
671 gwuser = self.get("gatewayUser"),
672 gw = self.get("gateway"),
674 identity = self.get("identity"),
675 server_key = self.get("serverKey"),
676 strict_host_checking = False
681 def status(self, pid, ppid):
# Query the status of the process identified by (pid, ppid); callers in this
# class compare the result with ProcStatus.RUNNING / ProcStatus.FINISHED.
683 status = execfuncs.lstatus(pid, ppid)
685 with self._node_lock:
686 status = sshfuncs.rstatus(
688 host = self.get("hostname"),
689 user = self.get("username"),
690 port = self.get("port"),
691 gwuser = self.get("gatewayUser"),
692 gw = self.get("gateway"),
694 identity = self.get("identity"),
695 server_key = self.get("serverKey"),
696 strict_host_checking = False
701 def kill(self, pid, ppid, sudo = False):
# Kill the process (pid, ppid) only if status() reports it as RUNNING.
# 'sudo' is forwarded to execfuncs.lkill (and presumably to sshfuncs.rkill,
# which runs under self._node_lock).
704 status = self.status(pid, ppid)
706 if status == sshfuncs.ProcStatus.RUNNING:
708 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
710 with self._node_lock:
711 (out, err), proc = sshfuncs.rkill(
713 host = self.get("hostname"),
714 user = self.get("username"),
715 port = self.get("port"),
716 gwuser = self.get("gatewayUser"),
717 gw = self.get("gateway"),
720 identity = self.get("identity"),
721 server_key = self.get("serverKey"),
722 strict_host_checking = False
725 return (out, err), proc
727 def copy(self, src, dst):
# Copy src to dst: execfuncs.lcopy locally, sshfuncs.rcopy remotely (under
# self._node_lock). upload/download pass remote endpoints as user@host:path.
729 (out, err), proc = execfuncs.lcopy(
733 with self._node_lock:
734 (out, err), proc = sshfuncs.rcopy(
736 port = self.get("port"),
737 gwuser = self.get("gatewayUser"),
738 gw = self.get("gateway"),
739 identity = self.get("identity"),
740 server_key = self.get("serverKey"),
742 strict_host_checking = False)
744 return (out, err), proc
746 def upload(self, src, dst, text = False, overwrite = True,
747 raise_on_error = True, executable = False):
748 """ Copy content to destination
750 src string with the content to copy. Can be:
752 - a string with the path to a local file
753 - a string with a semi-colon separated list of local files
754 - a string with a local directory
756 dst string with destination path on the remote host (remote is
759 when src is text input, it gets stored into a temp file before
760 uploading; in this case, and if executable is True, said temp file
761 is made executable, and thus uploaded file will be too
763 # If source is a string input
765 if text and not os.path.isfile(src):
766 # src is text input that should be uploaded as file
767 # create a temporary file with the content to upload
768 # open it in text mode for str content and in binary mode for bytes content
769 mode = 'w' if isinstance(src, str) else 'wb'
770 f = tempfile.NamedTemporaryFile(mode = mode, delete = False)
774 # do something like chmod u+x
775 mode = os.stat(f.name).st_mode
777 os.chmod(f.name, mode)
781 # If dst files should not be overwritten, drop from src the files that already exist at dst
783 if isinstance(src, str):
784 src = [s.strip() for s in src.split(";")]
786 if overwrite == False:
787 src = self.filter_existing_files(src, dst)
# Presumably reached when every file already exists at dst: nothing to upload.
789 return ("", ""), None
791 if not self.localhost:
792 # Build destination as <user>@<server>:<path>
793 dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
795 ((out, err), proc) = self.copy(src, dst)
802 msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
803 self.error(msg, out, err)
805 msg = "{} out: {} err: {}".format(msg, out, err)
807 raise RuntimeError(msg)
809 return ((out, err), proc)
811 def download(self, src, dst, raise_on_error = True):
812 if not self.localhost:
813 # Build source as <user>@<server>:<path>
814 src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
816 ((out, err), proc) = self.copy(src, dst)
# NOTE(review): src is a single string here (built with format() above for
# remote hosts), so ";".join(src) puts ';' between every character of the
# path. The message probably wants src as is.
819 msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst)
820 self.error(msg, out, err)
823 raise RuntimeError(msg)
825 return ((out, err), proc)
827 def install_packages_command(self, packages):
# Build the shell command that installs 'packages' with the host's package
# tooling: rpmfuncs for RPM-based hosts, debfuncs for Debian-based ones
# (presumably selected via use_rpm / use_deb). An unknown OS is logged and
# raises RuntimeError.
830 command = rpmfuncs.install_packages_command(self.os, packages)
832 command = debfuncs.install_packages_command(self.os, packages)
834 msg = "Error installing packages ( OS not known ) "
835 self.error(msg, self.os)
836 raise RuntimeError(msg)
840 def install_packages(self, packages, home,
842 raise_on_error = True):
843 """ Install packages in the Linux host.
845 'home' is the directory to upload the package installation script.
846 'run_home' is the directory from where to execute the script.
848 command = self.install_packages_command(packages)
# The script is uploaded as <home>/instpkg.sh and run from run_home (defaulting
# to home). Its pid, exit-code, stdout and stderr files use the instpkg_ prefix.
850 run_home = run_home or home
852 (out, err), proc = self.run_and_wait(command, run_home,
853 shfile = os.path.join(home, "instpkg.sh"),
854 pidfile = "instpkg_pidfile",
855 ecodefile = "instpkg_exitcode",
856 stdout = "instpkg_stdout",
857 stderr = "instpkg_stderr",
859 raise_on_error = raise_on_error)
861 return (out, err), proc
863 def remove_packages(self, packages, home, run_home = None,
864 raise_on_error = True):
865 """ Uninstall packages from the Linux host.
867 'home' is the directory to upload the package un-installation script.
868 'run_home' is the directory from where to execute the script.
871 command = rpmfuncs.remove_packages_command(self.os, packages)
873 command = debfuncs.remove_packages_command(self.os, packages)
875 msg = "Error removing packages ( OS not known ) "
877 raise RuntimeError(msg)
# NOTE(review): the OS dispatch above duplicates install_packages_command; a
# shared helper would keep the two in sync.
# The script is uploaded as <home>/rmpkg.sh and run from run_home (defaulting
# to home). Its pid, exit-code, stdout and stderr files use the rmpkg_ prefix.
879 run_home = run_home or home
881 (out, err), proc = self.run_and_wait(command, run_home,
882 shfile = os.path.join(home, "rmpkg.sh"),
883 pidfile = "rmpkg_pidfile",
884 ecodefile = "rmpkg_exitcode",
885 stdout = "rmpkg_stdout",
886 stderr = "rmpkg_stderr",
888 raise_on_error = raise_on_error)
890 return (out, err), proc
892 def mkdir(self, paths, clean = False):
893 """ Paths is either a single remote directory path to create,
894 or a list of directories to create.
899 if isinstance(paths, str):
# One `mkdir -p` per path, chained with ';' so one failure does not stop the
# rest. NOTE(review): paths are interpolated unquoted, so paths containing
# spaces or shell metacharacters break (consider shlex.quote).
902 cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
904 return self.execute(cmd, with_lock = True)
906 def rmdir(self, paths):
907 """ Paths is either a single remote directory path to delete,
908 or a list of directories to delete.
911 if isinstance(paths, str):
# One `rm -rf` per path, chained with ';'. NOTE(review): paths are
# interpolated unquoted into a destructive shell command (consider
# shlex.quote).
914 cmd = " ; ".join(["rm -rf {}".format(path) for path in paths])
916 return self.execute(cmd, with_lock = True)
918 def run_and_wait(self, command, home,
924 ecodefile = "exitcode",
930 raise_on_error = True):
932 Uploads the 'command' to a bash script in the host.
933 Then runs the script detached in background in the host, and
934 busy-waits until the script finishes executing. Returns ((out, err), proc): 'out' is read from the stdout file, 'err' comes from check_errors.
# A relative shfile is placed under 'home'.
937 if not shfile.startswith("/"):
938 shfile = os.path.join(home, shfile)
940 self.upload_command(command,
942 ecodefile = ecodefile,
944 overwrite = overwrite)
946 command = "bash {}".format(shfile)
947 # run command in background in remote host
948 (out, err), proc = self.run(command, home,
956 # check no errors occurred
958 msg = " Failed to run command '{}' ".format(command)
959 self.error(msg, out, err)
961 raise RuntimeError(msg)
963 # Wait for pid file to be generated
964 pid, ppid = self.wait_pid(
967 raise_on_error = raise_on_error)
970 # wait until command finishes to execute
971 self.wait_run(pid, ppid)
973 (eout, err), proc = self.check_errors(home,
974 ecodefile = ecodefile,
977 # Out is what was written in the stderr file
979 msg = " Failed to run command '{}' ".format(command)
980 self.error(msg, eout, err)
983 raise RuntimeError(msg)
# The returned 'out' is the command's stdout file; 'err' is kept from check_errors.
985 (out, oerr), proc = self.check_output(home, stdout)
987 return (out, err), proc
989 def exitcode(self, home, ecodefile = "exitcode"):
991 Get the exit code of an application, as written to <home>/<ecodefile>.
992 Returns an integer value with the exit code, or an ExitCode error value if the file cannot be read or parsed
994 (out, err), proc = self.check_output(home, ecodefile)
996 # File read successfully, return the exit code stored in it
999 return int(out.strip())
1001 # Error in the content of the file!
1002 return ExitCode.CORRUPTFILE
1004 # No such file or directory
1005 if proc.returncode == 1:
1006 return ExitCode.FILENOTFOUND
1008 # Other error from 'cat'
1009 return ExitCode.ERROR
1011 def upload_command(self, command,
1013 ecodefile = "exitcode",
1016 """ Saves the command as a bash script file in the remote host, and
1017 always writes the exit code of the command execution to the ecodefile
1020 if not (command.strip().endswith(";") or command.strip().endswith("&")):
# A bash brace group needs a terminating ';' (or '&') before its closing '}',
# hence this check before the command is wrapped below.
1023 # The exit code of the command will be stored in ecodefile
1024 command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1025 .format(command = command, ecodefile = ecodefile)
1027 # Export environment
1028 environ = self.format_environment(env)
1030 # Add environ to command
1031 command = environ + command
1033 return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """ Turn a whitespace-separated list of NAME=VALUE assignments into
    shell 'export' statements.

    Every assignment becomes " export NAME=VALUE" followed by a separator:
    ';' when 'inline' is True (to prefix a one-line command, e.g.
    "export A=1; export B=2;python script.py"), or a newline otherwise
    (for the body of a bash script). Any run of whitespace in 'env' counts
    as a single separator. A falsy 'env' yields "".
    """
    if not env:
        return ""

    separator = ";" if inline else "\n"
    assignments = re.split(r'\s+', env.strip())
    # Every statement, including the last one, is followed by the separator.
    return "".join(" export {}{}".format(assignment, separator)
                   for assignment in assignments)
1049 def check_errors(self, home,
1050 ecodefile = "exitcode",
1052 """ Checks whether errors occurred while running a command.
1053 It first checks the exit code for the command, and only when the
1054 exit code signals an error does it return the error output (as ("", err), proc).
1060 # get exit code saved in the 'exitcode' file
# An unreadable or corrupt exit-code file is reported as an error message;
# a positive exit code or a missing exit-code file falls back to the stderr file.
1061 ecode = self.exitcode(home, ecodefile)
1063 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1064 err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1065 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1066 # The process returned an error code or didn't exist.
1067 # Check standard error.
1068 (err, eerr), proc = self.check_output(home, stderr)
1070 # If the stderr file was not found, assume nothing bad happened,
1071 # and just ignore the error.
1072 # (cat returns 1 for error "No such file or directory")
1073 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1076 return ("", err), proc
1078 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1079 """ Waits until the pid file for the command is generated,
1080 and returns the pid and ppid of the process """
# Poll getpid until <home>/<pidfile> can be read (the retry/delay logic
# presumably surrounds this call).
1085 pidtuple = self.getpid(home = home, pidfile = pidfile)
1088 pid, ppid = pidtuple
# Failure path when no pid could be read; raising presumably depends on
# raise_on_error.
1094 msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1098 raise RuntimeError(msg)
1102 def wait_run(self, pid, ppid, trial = 0):
1103 """ wait for a remote process to finish execution """
1107 status = self.status(pid, ppid)
# NOTE(review): ProcStatus values are compared by identity here but with '=='
# in kill(). Identity on int-like constants is fragile; prefer '=='.
1109 if status is ProcStatus.FINISHED:
1111 elif status is not ProcStatus.RUNNING:
1114 # If it takes more than 20 seconds to start, then
1115 # assume something went wrong
1119 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieve the content of <home>/<filename> on the host with 'cat'.

    Returns ((out, err), proc) as produced by self.execute; 'out' holds
    the file content when the read succeeded.
    """
    path = os.path.join(home, filename)
    result, proc = self.execute("cat {}".format(path),
            retry = 1, with_lock = True)
    out, err = result
    return (out, err), proc
1129 """ Checks if host is responsive, i.e. that it echoes back 'ALIVE'
1135 msg = "Unresponsive host. Wrong answer. "
1137 # The underlying SSH layer will sometimes return an empty
1138 # output (even if the command was executed without errors).
1139 # To work around this, repeat the operation N times or
1140 # until the result is not empty string
1142 (out, err), proc = self.execute("echo 'ALIVE'",
# Any output containing ALIVE counts as a responsive host.
1146 if out.find("ALIVE") > -1:
1149 trace = traceback.format_exc()
1150 msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1152 self.error(msg, out, err)
1155 def find_home(self):
1157 Retrieves the host home directory ($HOME), stores it in self._home_dir and raises RuntimeError if it cannot be determined
1159 # The underlying SSH layer will sometimes return an empty
1160 # output (even if the command was executed without errors).
1161 # To work around this, repeat the operation N times or
1162 # until the result is not empty string
1163 msg = "Impossible to retrieve HOME directory"
1165 (out, err), proc = self.execute("echo ${HOME}",
1169 if out.strip() != "":
1170 self._home_dir = out.strip()
1172 trace = traceback.format_exc()
1173 msg = "Impossible to retrieve HOME directory {}".format(trace)
1175 if not self._home_dir:
1177 raise RuntimeError(msg)
1179 def filter_existing_files(self, src, dst):
1180 """ Removes files that already exist in the Linux host from src list
1182 # construct a dictionary with { dst: src }
# A single source maps to dst itself; several sources map to dst/<basename>.
# NOTE(review): an empty src list raises IndexError on src[0].
1183 dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1184 if len(src) > 1 else {dst: src[0]}
# Shell test that echoes each destination already present as a regular file.
1188 command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst = d) )
1190 command = ";".join(command)
1192 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1194 # avoid RuntimeError that would result from
1195 # changing loop subject during iteration
1196 keys = list(dests.keys())
# NOTE(review): substring match, so "/a/b" is also found when only "/a/bc"
# was echoed; matching whole output lines would be stricter.
1198 if out.find(d) > -1:
# The remaining values are the sources still to upload.
1204 retcod = dests.values()
1205 if PY3: retcod = list(retcod)