2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import (ResourceManager, clsinit_copy,
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!!
40 Error codes that the rexitcode function can return if unable to
41 check the exit code of a spawned process
50 Supported flavors of Linux OS
55 FEDORA_8 = 1 << 3 | FEDORA
56 FEDORA_12 = 1 << 4 | FEDORA
57 FEDORA_14 = 1 << 5 | FEDORA
60 class LinuxNode(ResourceManager):
62 .. class:: Class Args :
64 :param ec: The Experiment controller
65 :type ec: ExperimentController
66 :param guid: guid of the RM
71 There are different ways in which commands can be executed using the
72 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
77 * 'execute' (blocking mode) :
79 HOW IT WORKS: 'execute' forks a process and runs the
80 command, synchronously, attached to the terminal, in
82 The execute method will block until the command returns
83 the result on 'out', 'err' (so until it finishes executing).
85 USAGE: short-lived commands that must be executed attached
86 to a terminal and in foreground, for which it IS necessary
87 to block until the command has finished (e.g. if you want
88 to run 'ls' or 'cat').
90 * 'execute' (NON blocking mode - blocking = False) :
92 HOW IT WORKS: Same as before, except that execute method
93 will return immediately (even if command still running).
95 USAGE: long-lived commands that must be executed attached
96 to a terminal and in foreground, but for which it is not
97 necessary to block until the command has finished. (e.g.
98 start an application using X11 forwarding)
102 HOW IT WORKS: Connects to the host ( using SSH if remote)
103 and launches the command in background, detached from any
104 terminal (daemonized), and returns. The command continues to
105 run remotely, but since it is detached from the terminal,
106 its pipes (stdin, stdout, stderr) can't be redirected to the
107 console (as normal non detached processes would), and so they
108 are explicitly redirected to files. The pidfile is created as
109 part of the process of launching the command. The pidfile
110 holds the pid and ppid of the process forked in background,
111 so later on it is possible to check whether the command is still
114 USAGE: long-lived commands that can run detached in background,
115 for which it is NOT necessary to block (wait) until the command
116 has finished. (e.g. start an application that is not using X11
117 forwarding. It can run detached and remotely in background)
121 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
122 the command has finished execution. It also checks whether
123 errors occurred during runtime by reading the exitcode file,
124 which contains the exit code of the command that was run
125 (checking stderr only is not always reliable since many
126 commands throw debugging info to stderr and the only way to
127 automatically know whether an error really happened is to
128 check the process exit code).
130 Another difference with respect to 'run', is that instead
131 of directly executing the command as a bash command line,
132 it uploads the command to a bash script and runs the script.
133 This allows to use the bash script to debug errors, since
134 it remains at the remote host and can be run manually to
137 USAGE: medium-lived commands that can run detached in
138 background, for which it IS necessary to block (wait) until
139 the command has finished. (e.g. Package installation,
140 source compilation, file download, etc)
143 _rtype = "linux::Node"
144 _help = "Controls Linux host machines ( either localhost or a host " \
145 "that can be accessed using a SSH key)"
149 def _register_attributes(cls):
150 cls._register_attribute(Attribute(
151 "hostname", "Hostname of the machine",
152 flags = Flags.Design))
154 cls._register_attribute(Attribute(
155 "username", "Local account username",
156 flags = Flags.Credential))
158 cls._register_attribute(Attribute(
160 flags = Flags.Design))
162 cls._register_attribute(Attribute(
164 "Experiment home directory to store all experiment related files",
165 flags = Flags.Design))
167 cls._register_attribute(Attribute(
168 "identity", "SSH identity file",
169 flags = Flags.Credential))
171 cls._register_attribute(Attribute(
172 "serverKey", "Server public key",
173 flags = Flags.Design))
175 cls._register_attribute(Attribute(
177 "Remove all nepi files and directories "
178 " from node home folder before starting experiment",
181 flags = Flags.Design))
183 cls._register_attribute(Attribute(
184 "cleanExperiment", "Remove all files and directories "
185 " from a previous same experiment, before the new experiment starts",
188 flags = Flags.Design))
190 cls._register_attribute(Attribute(
192 "Kill all running processes before starting experiment",
195 flags = Flags.Design))
197 cls._register_attribute(Attribute(
198 "cleanProcessesAfter",
199 """Kill all running processes after starting experiment
200 This might be dangerous when using user root""",
203 flags = Flags.Design))
205 cls._register_attribute(Attribute(
207 "Bash script to be executed before releasing the resource",
208 flags = Flags.Design))
210 cls._register_attribute(Attribute(
212 "Gateway account username",
213 flags = Flags.Design))
215 cls._register_attribute(Attribute(
217 "Hostname of the gateway machine",
218 flags = Flags.Design))
220 cls._register_attribute(Attribute(
222 "Linux host public IP address. "
223 "Must not be modified by the user unless hostname is 'localhost'",
224 flags = Flags.Design))
226 def __init__(self, ec, guid):
227 super(LinuxNode, self).__init__(ec, guid)
229 # home directory at Linux host
232 # lock to prevent concurrent applications on the same node,
233 # to execute commands at the same time. There are potential
234 # concurrency issues when using SSH to a same host from
235 # multiple threads. There are also possible operational
236 # issues, e.g. an application querying the existence
237 # of a file or folder prior to its creation, and another
238 # application creating the same file or folder in between.
239 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Builds the text used in log records for this node, by prefixing
    'msg' with the guid of the RM and the value of its 'hostname'
    attribute.
    """
    guid = self.guid
    hostname = self.get("hostname")
    template = " guid {} - host {} - {} "
    return template.format(guid, hostname, msg)
247 home = self.get("home") or ""
248 if not home.startswith("/"):
249 home = os.path.join(self._home_dir, home)
254 return os.path.join(self.home_dir, ".nepi")
258 return os.path.join(self.nepi_home, "nepi-usr")
262 return os.path.join(self.usr_dir, "lib")
266 return os.path.join(self.usr_dir, "bin")
270 return os.path.join(self.usr_dir, "src")
274 return os.path.join(self.usr_dir, "share")
278 return os.path.join(self.nepi_home, "nepi-exp")
282 return os.path.join(self.exp_dir, self.ec.exp_id)
286 return os.path.join(self.exp_home, "node-{}".format(self.guid))
290 return os.path.join(self.node_home, self.ec.run_id)
297 if not self.localhost and not self.get("username"):
298 msg = "Can't resolve OS, insufficient data "
300 raise RuntimeError, msg
304 if out.find("Debian") == 0:
305 self._os = OSType.DEBIAN
306 elif out.find("Ubuntu") ==0:
307 self._os = OSType.UBUNTU
308 elif out.find("Fedora release") == 0:
309 self._os = OSType.FEDORA
310 if out.find("Fedora release 8") == 0:
311 self._os = OSType.FEDORA_8
312 elif out.find("Fedora release 12") == 0:
313 self._os = OSType.FEDORA_12
314 elif out.find("Fedora release 14") == 0:
315 self._os = OSType.FEDORA_14
317 msg = "Unsupported OS"
319 raise RuntimeError("{} - {} ".format(msg, out))
324 # The underlying SSH layer will sometimes return an empty
325 # output (even if the command was executed without errors).
326 # To work around this, repeat the operation N times or
327 # until the result is not empty string
330 (out, err), proc = self.execute("cat /etc/issue",
334 trace = traceback.format_exc()
335 msg = "Error detecting OS: {} ".format(trace)
336 self.error(msg, out, err)
342 return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
346 return (self.os & OSType.FEDORA)
350 return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
352 def do_provision(self):
353 # check if host is alive
354 if not self.is_alive():
355 trace = traceback.format_exc()
356 msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace)
358 raise RuntimeError, msg
362 if self.get("cleanProcesses"):
363 self.clean_processes()
365 if self.get("cleanHome"):
368 if self.get("cleanExperiment"):
369 self.clean_experiment()
371 # Create shared directory structure and node home directory
372 paths = [self.lib_dir,
380 # Get Public IP address if possible
381 if not self.get("ip"):
383 ip = sshfuncs.gethostbyname(self.get("hostname"))
386 if self.get("gateway") is None:
387 msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
390 super(LinuxNode, self).do_provision()
393 if self.state == ResourceState.NEW:
394 self.info("Deploying node")
398 # Node needs to wait until all associated interfaces are
399 # ready before it can finalize deployment
400 from nepi.resources.linux.interface import LinuxInterface
401 ifaces = self.get_connected(LinuxInterface.get_rtype())
403 if iface.state < ResourceState.READY:
404 self.ec.schedule(self.reschedule_delay, self.deploy)
407 super(LinuxNode, self).do_deploy()
409 def do_release(self):
410 rms = self.get_connected()
412 # Node needs to wait until all associated RMs are released
413 # before it can be released
414 if rm.state != ResourceState.RELEASED:
415 self.ec.schedule(self.reschedule_delay, self.release)
418 tear_down = self.get("tearDown")
420 self.execute(tear_down)
422 if self.get("cleanProcessesAfter"):
423 self.clean_processes()
425 super(LinuxNode, self).do_release()
427 def valid_connection(self, guid):
431 def clean_processes(self):
432 self.info("Cleaning up processes")
437 if self.get("username") != 'root':
438 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
439 "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
440 "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
442 if self.state >= ResourceState.READY:
443 ########################
444 #Collect all process (must change for a more intelligent way)
447 avoid_pids = "ps axjf | awk '{print $1,$2}'"
448 (out, err), proc = self.execute(avoid_pids)
450 for line in out.strip().split("\n"):
451 parts = line.strip().split(" ")
452 ppid.append(parts[0])
453 pids.append(parts[1])
455 #Collect all process below ssh -D
458 sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
459 (out, err), proc = self.execute(sshs)
461 for line in out.strip().split("\n"):
462 parts = line.strip().split(" ")
463 if parts[1].startswith('root@pts'):
464 ssh_pids.append(parts[0])
465 elif parts[1] == "-D":
466 tree_owner = parts[0]
470 #Search for the child process of the pid's collected at the first block.
471 for process in ssh_pids:
472 temp = self.search_for_child(process, pids, ppid)
473 avoid_kill = list(set(temp))
475 if len(avoid_kill) > 0:
476 avoid_kill.append(tree_owner)
477 ########################
480 pids = pickle.load(open("/tmp/save.proc", "rb"))
482 ps_aux = "ps aux | awk '{print $2,$11}'"
483 (out, err), proc = self.execute(ps_aux)
485 for line in out.strip().split("\n"):
486 parts = line.strip().split(" ")
487 pids_temp[parts[0]] = parts[1]
488 # creates the difference between the machine pids freezed (pickle) and the actual
489 # adding the avoided pids filtered above (avoid_kill) to allow users keep process
490 # alive when using besides ssh connections
491 kill_pids = set(pids_temp.items()) - set(pids.items())
492 kill_pids = ' '.join(dict(kill_pids).keys())
494 # removing pids from beside connections and its process
495 kill_pids = kill_pids.split(' ')
496 kill_pids = list(set(kill_pids) - set(avoid_kill))
497 kill_pids = ' '.join(kill_pids)
499 cmd = ("killall tcpdump || /bin/true ; " +
500 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
501 "kill {} || /bin/true ; ".format(kill_pids))
503 cmd = ("killall tcpdump || /bin/true ; " +
504 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
506 cmd = ("killall tcpdump || /bin/true ; " +
507 "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
509 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
511 def search_for_child(self, pid, pids, ppid, family=[]):
512 """ Recursive function to search for child. List A contains the pids and list B the parents (ppid)
515 for key, value in enumerate(ppid):
518 self.search_for_child(child, pids, ppid)
def clean_home(self):
    """ Cleans all NEPI related folders in the Linux host.

    Changes to the node home directory and removes the '.nepi' entry
    found directly under it (search depth is limited to 1).

    :returns: whatever 'execute' returns for the clean up command
    """
    self.info("Cleaning up home")

    # Raw string: the backslash in '\.nepi' has to reach the shell
    # untouched, and '\.' is not a valid escape sequence in a regular
    # string literal.
    cmd = r"cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\
        .format(self.home_dir)

    # The node lock is requested for the duration of the command
    return self.execute(cmd, with_lock = True)
def clean_experiment(self):
    """ Removes the files left in the Linux host by a previous run of
    this same experiment.

    Only the entry named after the experiment id, located directly
    under the experiments directory, is deleted. NEPI files and folders
    with a multi experiment scope are left in place.
    """
    self.info("Cleaning up experiment files")

    template = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "
    cmd = template.format(self.exp_dir, self.ec.exp_id)

    # The node lock is requested for the duration of the command
    outcome = self.execute(cmd, with_lock = True)
    return outcome
543 def execute(self, command,
549 connect_timeout = 30,
550 strict_host_checking = False,
555 """ Notice that this invocation will block until the
556 execution finishes. If this is not the desired behavior,
557 use 'run' instead."""
560 (out, err), proc = execfuncs.lexec(
562 user = self.get("username"), # still problem with localhost
567 # If the execute command is blocking, we don't want to keep
568 # the node lock. This lock is used to avoid race conditions
569 # when creating the ControlMaster sockets. A more elegant
570 # solution is needed.
571 with self._node_lock:
572 (out, err), proc = sshfuncs.rexec(
574 host = self.get("hostname"),
575 user = self.get("username"),
576 port = self.get("port"),
577 gwuser = self.get("gatewayUser"),
578 gw = self.get("gateway"),
581 identity = self.get("identity"),
582 server_key = self.get("serverKey"),
585 forward_x11 = forward_x11,
587 connect_timeout = connect_timeout,
588 persistent = persistent,
590 strict_host_checking = strict_host_checking
593 (out, err), proc = sshfuncs.rexec(
595 host = self.get("hostname"),
596 user = self.get("username"),
597 port = self.get("port"),
598 gwuser = self.get("gatewayUser"),
599 gw = self.get("gateway"),
602 identity = self.get("identity"),
603 server_key = self.get("serverKey"),
606 forward_x11 = forward_x11,
608 connect_timeout = connect_timeout,
609 persistent = persistent,
611 strict_host_checking = strict_host_checking
614 return (out, err), proc
616 def run(self, command, home,
624 strict_host_checking = False):
626 self.debug("Running command '{}'".format(command))
629 (out, err), proc = execfuncs.lspawn(
632 create_home = create_home,
633 stdin = stdin or '/dev/null',
634 stdout = stdout or '/dev/null',
635 stderr = stderr or '/dev/null',
638 with self._node_lock:
639 (out, err), proc = sshfuncs.rspawn(
643 create_home = create_home,
644 stdin = stdin or '/dev/null',
645 stdout = stdout or '/dev/null',
646 stderr = stderr or '/dev/null',
648 host = self.get("hostname"),
649 user = self.get("username"),
650 port = self.get("port"),
651 gwuser = self.get("gatewayUser"),
652 gw = self.get("gateway"),
654 identity = self.get("identity"),
655 server_key = self.get("serverKey"),
657 strict_host_checking = strict_host_checking
660 return (out, err), proc
662 def getpid(self, home, pidfile = "pidfile"):
664 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
666 with self._node_lock:
667 pidtuple = sshfuncs.rgetpid(
668 os.path.join(home, pidfile),
669 host = self.get("hostname"),
670 user = self.get("username"),
671 port = self.get("port"),
672 gwuser = self.get("gatewayUser"),
673 gw = self.get("gateway"),
675 identity = self.get("identity"),
676 server_key = self.get("serverKey"),
677 strict_host_checking = False
682 def status(self, pid, ppid):
684 status = execfuncs.lstatus(pid, ppid)
686 with self._node_lock:
687 status = sshfuncs.rstatus(
689 host = self.get("hostname"),
690 user = self.get("username"),
691 port = self.get("port"),
692 gwuser = self.get("gatewayUser"),
693 gw = self.get("gateway"),
695 identity = self.get("identity"),
696 server_key = self.get("serverKey"),
697 strict_host_checking = False
702 def kill(self, pid, ppid, sudo = False):
705 status = self.status(pid, ppid)
707 if status == sshfuncs.ProcStatus.RUNNING:
709 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
711 with self._node_lock:
712 (out, err), proc = sshfuncs.rkill(
714 host = self.get("hostname"),
715 user = self.get("username"),
716 port = self.get("port"),
717 gwuser = self.get("gatewayUser"),
718 gw = self.get("gateway"),
721 identity = self.get("identity"),
722 server_key = self.get("serverKey"),
723 strict_host_checking = False
726 return (out, err), proc
728 def copy(self, src, dst):
730 (out, err), proc = execfuncs.lcopy(
734 with self._node_lock:
735 (out, err), proc = sshfuncs.rcopy(
737 port = self.get("port"),
738 gwuser = self.get("gatewayUser"),
739 gw = self.get("gateway"),
740 identity = self.get("identity"),
741 server_key = self.get("serverKey"),
743 strict_host_checking = False)
745 return (out, err), proc
747 def upload(self, src, dst, text = False, overwrite = True,
748 raise_on_error = True):
749 """ Copy content to destination
751 src string with the content to copy. Can be:
753 - a string with the path to a local file
754 - a string with a semicolon-separated list of local files
755 - a string with a local directory
757 dst string with destination path on the remote host (remote is
760 text src is text input, it must be stored into a temp file before
763 # If source is a string input
765 if text and not os.path.isfile(src):
766 # src is text input that should be uploaded as file
767 # create a temporary file with the content to upload
768 f = tempfile.NamedTemporaryFile(delete=False)
773 # If dst files should not be overwritten, check that the files do not
775 if isinstance(src, str):
776 src = map(str.strip, src.split(";"))
778 if overwrite == False:
779 src = self.filter_existing_files(src, dst)
781 return ("", ""), None
783 if not self.localhost:
784 # Build destination as <user>@<server>:<path>
785 dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)
787 ((out, err), proc) = self.copy(src, dst)
794 msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
795 self.error(msg, out, err)
797 msg = "{} out: {} err: {}".format(msg, out, err)
799 raise RuntimeError, msg
801 return ((out, err), proc)
803 def download(self, src, dst, raise_on_error = True):
804 if not self.localhost:
805 # Build destination as <user>@<server>:<path>
806 src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)
808 ((out, err), proc) = self.copy(src, dst)
811 msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst)
812 self.error(msg, out, err)
815 raise RuntimeError, msg
817 return ((out, err), proc)
819 def install_packages_command(self, packages):
822 command = rpmfuncs.install_packages_command(self.os, packages)
824 command = debfuncs.install_packages_command(self.os, packages)
826 msg = "Error installing packages ( OS not known ) "
827 self.error(msg, self.os)
828 raise RuntimeError, msg
832 def install_packages(self, packages, home,
834 raise_on_error = True):
835 """ Install packages in the Linux host.
837 'home' is the directory to upload the package installation script.
838 'run_home' is the directory from where to execute the script.
840 command = self.install_packages_command(packages)
842 run_home = run_home or home
844 (out, err), proc = self.run_and_wait(command, run_home,
845 shfile = os.path.join(home, "instpkg.sh"),
846 pidfile = "instpkg_pidfile",
847 ecodefile = "instpkg_exitcode",
848 stdout = "instpkg_stdout",
849 stderr = "instpkg_stderr",
851 raise_on_error = raise_on_error)
853 return (out, err), proc
855 def remove_packages(self, packages, home, run_home = None,
856 raise_on_error = True):
857 """ Uninstall packages from the Linux host.
859 'home' is the directory to upload the package un-installation script.
860 'run_home' is the directory from where to execute the script.
863 command = rpmfuncs.remove_packages_command(self.os, packages)
865 command = debfuncs.remove_packages_command(self.os, packages)
867 msg = "Error removing packages ( OS not known ) "
869 raise RuntimeError, msg
871 run_home = run_home or home
873 (out, err), proc = self.run_and_wait(command, run_home,
874 shfile = os.path.join(home, "rmpkg.sh"),
875 pidfile = "rmpkg_pidfile",
876 ecodefile = "rmpkg_exitcode",
877 stdout = "rmpkg_stdout",
878 stderr = "rmpkg_stderr",
880 raise_on_error = raise_on_error)
882 return (out, err), proc
884 def mkdir(self, paths, clean = False):
885 """ Paths is either a single remote directory path to create,
886 or a list of directories to create.
891 if isinstance(paths, str):
894 cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])
896 return self.execute(cmd, with_lock = True)
898 def rmdir(self, paths):
899 """ Paths is either a single remote directory path to delete,
900 or a list of directories to delete.
903 if isinstance(paths, str):
906 cmd = " ; ".join(map(lambda path: "rm -rf {}".format(path), paths))
908 return self.execute(cmd, with_lock = True)
910 def run_and_wait(self, command, home,
916 ecodefile="exitcode",
922 raise_on_error=True):
924 Uploads the 'command' to a bash script in the host.
925 Then runs the script detached in background in the host, and
926 busy-waits until the script finishes executing.
929 if not shfile.startswith("/"):
930 shfile = os.path.join(home, shfile)
932 self.upload_command(command,
934 ecodefile = ecodefile,
936 overwrite = overwrite)
938 command = "bash {}".format(shfile)
939 # run command in background in remote host
940 (out, err), proc = self.run(command, home,
948 # check no errors occurred
950 msg = " Failed to run command '{}' ".format(command)
951 self.error(msg, out, err)
953 raise RuntimeError, msg
955 # Wait for pid file to be generated
956 pid, ppid = self.wait_pid(
959 raise_on_error = raise_on_error)
962 # wait until command finishes to execute
963 self.wait_run(pid, ppid)
965 (eout, err), proc = self.check_errors(home,
966 ecodefile = ecodefile,
969 # Out is what was written in the stderr file
971 msg = " Failed to run command '{}' ".format(command)
972 self.error(msg, eout, err)
975 raise RuntimeError, msg
977 (out, oerr), proc = self.check_output(home, stdout)
979 return (out, err), proc
981 def exitcode(self, home, ecodefile = "exitcode"):
983 Get the exit code of an application.
984 Returns an integer value with the exit code
986 (out, err), proc = self.check_output(home, ecodefile)
988 # Succeeded to open file, return exit code in the file
991 return int(out.strip())
993 # Error in the content of the file!
994 return ExitCode.CORRUPTFILE
996 # No such file or directory
997 if proc.returncode == 1:
998 return ExitCode.FILENOTFOUND
1000 # Other error from 'cat'
1001 return ExitCode.ERROR
1003 def upload_command(self, command,
1005 ecodefile="exitcode",
1008 """ Saves the command as a bash script file in the remote host, and
1009 forces to save the exit code of the command execution to the ecodefile
1012 if not (command.strip().endswith(";") or command.strip().endswith("&")):
1015 # The exit code of the command will be stored in ecodefile
1016 command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
1017 .format(command=command, ecodefile=ecodefile)
1019 # Export environment
1020 environ = self.format_environment(env)
1022 # Add environ to command
1023 command = environ + command
1025 return self.upload(command, shfile, text=True, overwrite=overwrite)
def format_environment(self, env, inline=False):
    r""" Formats the environment variables for a command to be executed
    either as an inline command
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
    as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)

    :param env: white space separated list of NAME=VALUE assignments.
        The string is split on white space, so a value that itself
        contains white space ends up split across several exports.
    :type env: str
    :param inline: if True every export is terminated with ';',
        otherwise with a new line
    :type inline: bool
    :returns: one ' export <assignment>' entry per assignment, each
        followed by the separator, or "" when there is nothing to export
    :rtype: str
    """
    if not env:
        return ""

    # Collapse every run of white space into a single blank
    env = re.sub(r'\s+', ' ', env.strip())

    # A white space only string leaves nothing to export. Without this
    # guard the result would be a bare ' export ' statement.
    if not env:
        return ""

    sep = ";" if inline else "\n"
    return "".join([" export {}{}".format(e, sep) for e in env.split(" ")])
1041 def check_errors(self, home,
1042 ecodefile = "exitcode",
1044 """ Checks whether errors occurred while running a command.
1045 It first checks the exit code for the command, and only if the
1046 exit code is an error one it returns the error output.
1052 # get exit code saved in the 'exitcode' file
1053 ecode = self.exitcode(home, ecodefile)
1055 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
1056 err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
1057 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
1058 # The process returned an error code or didn't exist.
1059 # Check standard error.
1060 (err, eerr), proc = self.check_output(home, stderr)
1062 # If the stderr file was not found, assume nothing bad happened,
1063 # and just ignore the error.
1064 # (cat returns 1 for error "No such file or directory")
1065 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
1068 return ("", err), proc
1070 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
1071 """ Waits until the pid file for the command is generated,
1072 and returns the pid and ppid of the process """
1077 pidtuple = self.getpid(home = home, pidfile = pidfile)
1080 pid, ppid = pidtuple
1086 msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
1090 raise RuntimeError, msg
1094 def wait_run(self, pid, ppid, trial = 0):
1095 """ wait for a remote process to finish execution """
1099 status = self.status(pid, ppid)
1101 if status is ProcStatus.FINISHED:
1103 elif status is not ProcStatus.RUNNING:
1106 # If it takes more than 20 seconds to start, then
1107 # assume something went wrong
1111 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of the file 'filename' located in the
    'home' directory, by running 'cat' on it through 'execute'.

    Returns the ((out, err), proc) result of that command.
    """
    path = os.path.join(home, filename)
    command = "cat {}".format(path)

    result = self.execute(command, retry = 1, with_lock = True)

    # Unpack to make sure the result has the expected shape
    (out, err), proc = result
    return (out, err), proc
1121 """ Checks if host is responsive
1127 msg = "Unresponsive host. Wrong answer. "
1129 # The underlying SSH layer will sometimes return an empty
1130 # output (even if the command was executed without errors).
1131 # To work around this, repeat the operation N times or
1132 # until the result is not empty string
1134 (out, err), proc = self.execute("echo 'ALIVE'",
1138 if out.find("ALIVE") > -1:
1141 trace = traceback.format_exc()
1142 msg = "Unresponsive host. Error reaching host: {} ".format(trace)
1144 self.error(msg, out, err)
1147 def find_home(self):
1149 Retrieves host home directory
1151 # The underlying SSH layer will sometimes return an empty
1152 # output (even if the command was executed without errors).
1153 # To work around this, repeat the operation N times or
1154 # until the result is not empty string
1155 msg = "Impossible to retrieve HOME directory"
1157 (out, err), proc = self.execute("echo ${HOME}",
1161 if out.strip() != "":
1162 self._home_dir = out.strip()
1164 trace = traceback.format_exc()
1165 msg = "Impossible to retrieve HOME directory {}".format(trace)
1167 if not self._home_dir:
1169 raise RuntimeError, msg
1171 def filter_existing_files(self, src, dst):
1172 """ Removes files that already exist in the Linux host from src list
1174 # construct a dictionary with { dst: src }
1175 dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \
1176 if len(src) > 1 else {dst: src[0]}
1179 for d in dests.keys():
1180 command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
1182 command = ";".join(command)
1184 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1186 for d in dests.keys():
1187 if out.find(d) > -1:
1193 return dests.values()