2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
41 Error codes that the rexitcode function can return if unable to
42 check the exit code of a spawned process
51 Supported flavors of Linux OS
61 class LinuxNode(ResourceManager):
63 .. class:: Class Args :
65 :param ec: The Experiment controller
66 :type ec: ExperimentController
67 :param guid: guid of the RM
72 There are different ways in which commands can be executed using the
73 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
78 * 'execute' (blocking mode) :
80 HOW IT WORKS: 'execute', forks a process and runs the
81 command, synchronously, attached to the terminal, in
83 The execute method will block until the command returns
84 the result on 'out', 'err' (so until it finishes executing).
86 USAGE: short-lived commands that must be executed attached
87 to a terminal and in foreground, for which it IS necessary
88 to block until the command has finished (e.g. if you want
89 to run 'ls' or 'cat').
91 * 'execute' (NON blocking mode - blocking = False) :
93 HOW IT WORKS: Same as before, except that execute method
94 will return immediately (even if command still running).
96 USAGE: long-lived commands that must be executed attached
97 to a terminal and in foreground, but for which it is not
98 necessary to block until the command has finished. (e.g.
99 start an application using X11 forwarding)
103 HOW IT WORKS: Connects to the host ( using SSH if remote)
104 and launches the command in background, detached from any
105 terminal (daemonized), and returns. The command continues to
106 run remotely, but since it is detached from the terminal,
107 its pipes (stdin, stdout, stderr) can't be redirected to the
108 console (as normal non detached processes would), and so they
109 are explicitly redirected to files. The pidfile is created as
110 part of the process of launching the command. The pidfile
111 holds the pid and ppid of the process forked in background,
112 so later on it is possible to check whether the command is still
115 USAGE: long-lived commands that can run detached in background,
116 for which it is NOT necessary to block (wait) until the command
117 has finished. (e.g. start an application that is not using X11
118 forwarding. It can run detached and remotely in background)
122 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
123 the command has finished execution. It also checks whether
124 errors occurred during runtime by reading the exitcode file,
125 which contains the exit code of the command that was run
126 (checking stderr only is not always reliable since many
127 commands throw debugging info to stderr and the only way to
128 automatically know whether an error really happened is to
129 check the process exit code).
131 Another difference with respect to 'run', is that instead
132 of directly executing the command as a bash command line,
133 it uploads the command to a bash script and runs the script.
134 This allows using the bash script to debug errors, since
135 it remains at the remote host and can be run manually to
138 USAGE: medium-lived commands that can run detached in
139 background, for which it IS necessary to block (wait) until
140 the command has finished. (e.g. Package installation,
141 source compilation, file download, etc)
145 _help = "Controls Linux host machines ( either localhost or a host " \
146 "that can be accessed using a SSH key)"
147 _backend_type = "linux"
150 def _register_attributes(cls):
151 hostname = Attribute("hostname", "Hostname of the machine",
152 flags = Flags.ExecReadOnly)
154 username = Attribute("username", "Local account username",
155 flags = Flags.Credential)
157 port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
159 home = Attribute("home",
160 "Experiment home directory to store all experiment related files",
161 flags = Flags.ExecReadOnly)
163 identity = Attribute("identity", "SSH identity file",
164 flags = Flags.Credential)
166 server_key = Attribute("serverKey", "Server public key",
167 flags = Flags.ExecReadOnly)
169 clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
170 " from node home folder before starting experiment",
173 flags = Flags.ExecReadOnly)
175 clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
176 " from a previous same experiment, before the new experiment starts",
179 flags = Flags.ExecReadOnly)
181 clean_processes = Attribute("cleanProcesses",
182 "Kill all running processes before starting experiment",
185 flags = Flags.ExecReadOnly)
187 tear_down = Attribute("tearDown", "Bash script to be executed before " + \
188 "releasing the resource",
189 flags = Flags.ExecReadOnly)
191 gateway_user = Attribute("gatewayUser", "Gateway account username",
192 flags = Flags.ExecReadOnly)
194 gateway = Attribute("gateway", "Hostname of the gateway machine",
195 flags = Flags.ExecReadOnly)
197 cls._register_attribute(hostname)
198 cls._register_attribute(username)
199 cls._register_attribute(port)
200 cls._register_attribute(home)
201 cls._register_attribute(identity)
202 cls._register_attribute(server_key)
203 cls._register_attribute(clean_home)
204 cls._register_attribute(clean_experiment)
205 cls._register_attribute(clean_processes)
206 cls._register_attribute(tear_down)
207 cls._register_attribute(gateway_user)
208 cls._register_attribute(gateway)
210 def __init__(self, ec, guid):
211 super(LinuxNode, self).__init__(ec, guid)
213 # home directory at Linux host
216 # list of pids before running the app if the user is root
219 # lock to prevent concurrent applications on the same node,
220 # to execute commands at the same time. There are potential
221 # concurrency issues when using SSH to a same host from
222 # multiple threads. There are also possible operational
223 # issues, e.g. an application querying the existence
224 # of a file or folder prior to its creation, and another
225 # application creating the same file or folder in between.
226 self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Prefixes a log message with the guid and hostname of this node.

    :param msg: text of the message to be logged
    :type msg: str
    :return: the text " guid <guid> - host <hostname> - <msg> "
    :rtype: str
    """
    return " guid %d - host %s - %s " % (self.guid,
            self.get("hostname"), msg)
234 home = self.get("home") or ""
235 if not home.startswith("/"):
236 home = os.path.join(self._home_dir, home)
241 return os.path.join(self.home_dir, "nepi-usr")
245 return os.path.join(self.usr_dir, "lib")
249 return os.path.join(self.usr_dir, "bin")
253 return os.path.join(self.usr_dir, "src")
257 return os.path.join(self.usr_dir, "share")
261 return os.path.join(self.home_dir, "nepi-exp")
265 return os.path.join(self.exp_dir, self.ec.exp_id)
269 return os.path.join(self.exp_home, "node-%d" % self.guid)
273 return os.path.join(self.node_home, self.ec.run_id)
280 if (not self.get("hostname") or not self.get("username")):
281 msg = "Can't resolve OS, insufficient data "
283 raise RuntimeError, msg
287 if out.find("Fedora release 8") == 0:
288 self._os = OSType.FEDORA_8
289 elif out.find("Fedora release 12") == 0:
290 self._os = OSType.FEDORA_12
291 elif out.find("Fedora release 14") == 0:
292 self._os = OSType.FEDORA_14
293 elif out.find("Fedora release") == 0:
294 self._os = OSType.FEDORA
295 elif out.find("Debian") == 0:
296 self._os = OSType.DEBIAN
297 elif out.find("Ubuntu") ==0:
298 self._os = OSType.UBUNTU
300 msg = "Unsupported OS"
302 raise RuntimeError, "%s - %s " %( msg, out )
307 # The underlying SSH layer will sometimes return an empty
308 # output (even if the command was executed without errors).
309 # To work around this, repeat the operation N times or
310 # until the result is not empty string
313 (out, err), proc = self.execute("cat /etc/issue",
317 trace = traceback.format_exc()
318 msg = "Error detecting OS: %s " % trace
319 self.error(msg, out, err)
325 return self.os in [OSType.DEBIAN, OSType.UBUNTU]
329 return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
@property
def localhost(self):
    """ Tells whether the 'hostname' attribute designates the local machine.

    :return: True if the hostname is 'localhost', '127.0.0.1' or '::1'
    :rtype: bool
    """
    # '127.0.0.1' is the conventional IPv4 loopback address. The literal
    # used before ('127.0.0.7') made a node configured with '127.0.0.1'
    # be treated as a remote host.
    return self.get("hostname") in ('localhost', '127.0.0.1', '::1')
336 def do_provision(self):
337 # check if host is alive
338 if not self.is_alive():
339 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
341 raise RuntimeError, msg
345 if self.get("cleanProcesses"):
346 self.clean_processes()
348 if self.get("cleanHome"):
351 if self.get("cleanExperiment"):
352 self.clean_experiment()
354 # Create shared directory structure
355 self.mkdir(self.lib_dir)
356 self.mkdir(self.bin_dir)
357 self.mkdir(self.src_dir)
358 self.mkdir(self.share_dir)
360 # Create experiment node home directory
361 self.mkdir(self.node_home)
363 super(LinuxNode, self).do_provision()
366 if self.state == ResourceState.NEW:
367 self.info("Deploying node")
371 # Node needs to wait until all associated interfaces are
372 # ready before it can finalize deployment
373 from nepi.resources.linux.interface import LinuxInterface
374 ifaces = self.get_connected(LinuxInterface.get_rtype())
376 if iface.state < ResourceState.READY:
377 self.ec.schedule(reschedule_delay, self.deploy)
380 super(LinuxNode, self).do_deploy()
382 def do_release(self):
383 rms = self.get_connected()
385 # Node needs to wait until all associated RMs are released
386 # before it can be released
387 if rm.state != ResourceState.RELEASED:
388 self.ec.schedule(reschedule_delay, self.release)
391 tear_down = self.get("tearDown")
393 self.execute(tear_down)
395 self.clean_processes()
397 super(LinuxNode, self).do_release()
399 def valid_connection(self, guid):
403 def clean_processes(self):
404 self.info("Cleaning up processes")
406 if self.get("username") != 'root':
407 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
408 "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
409 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
412 if self.state >= ResourceState.READY:
413 ps_aux = "ps aux |awk '{print $2}' |sort -u"
414 (out, err), proc = self.execute(ps_aux)
415 pids_temp = out.split()
416 kill_pids = list(set(pids_temp) - set(self._pids))
417 kill_pids = ' '.join(kill_pids)
419 cmd = ("killall tcpdump || /bin/true ; " +
420 "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
421 "kill %s || /bin/true ; " % kill_pids)
423 cmd = ("killall tcpdump || /bin/true ; " +
424 "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
427 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
429 def clean_home(self):
430 """ Cleans all NEPI related folders in the Linux host
432 self.info("Cleaning up home")
434 cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
437 return self.execute(cmd, with_lock = True)
439 def clean_experiment(self):
440 """ Cleans all experiment related files in the Linux host.
441 It preserves NEPI files and folders that have a multi experiment
444 self.info("Cleaning up experiment files")
446 cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
450 return self.execute(cmd, with_lock = True)
452 def execute(self, command,
460 err_on_timeout = True,
461 connect_timeout = 30,
462 strict_host_checking = False,
467 """ Notice that this invocation will block until the
468 execution finishes. If this is not the desired behavior,
469 use 'run' instead."""
472 (out, err), proc = execfuncs.lexec(command,
473 user = self.get("username"), # still problem with localhost
479 with self._node_lock:
480 (out, err), proc = sshfuncs.rexec(
482 host = self.get("hostname"),
483 user = self.get("username"),
484 port = self.get("port"),
485 gwuser = self.get("gatewayUser"),
486 gw = self.get("gateway"),
490 identity = self.get("identity"),
491 server_key = self.get("serverKey"),
494 forward_x11 = forward_x11,
497 err_on_timeout = err_on_timeout,
498 connect_timeout = connect_timeout,
499 persistent = persistent,
501 strict_host_checking = strict_host_checking
504 (out, err), proc = sshfuncs.rexec(
506 host = self.get("hostname"),
507 user = self.get("username"),
508 port = self.get("port"),
509 gwuser = self.get("gatewayUser"),
510 gw = self.get("gateway"),
514 identity = self.get("identity"),
515 server_key = self.get("serverKey"),
518 forward_x11 = forward_x11,
521 err_on_timeout = err_on_timeout,
522 connect_timeout = connect_timeout,
523 persistent = persistent,
525 strict_host_checking = strict_host_checking
528 return (out, err), proc
530 def run(self, command, home,
539 self.debug("Running command '%s'" % command)
542 (out, err), proc = execfuncs.lspawn(command, pidfile,
547 create_home = create_home,
551 with self._node_lock:
552 (out, err), proc = sshfuncs.rspawn(
556 create_home = create_home,
557 stdin = stdin if stdin is not None else '/dev/null',
558 stdout = stdout if stdout else '/dev/null',
559 stderr = stderr if stderr else '/dev/null',
561 host = self.get("hostname"),
562 user = self.get("username"),
563 port = self.get("port"),
564 gwuser = self.get("gatewayUser"),
565 gw = self.get("gateway"),
567 identity = self.get("identity"),
568 server_key = self.get("serverKey"),
572 return (out, err), proc
574 def getpid(self, home, pidfile = "pidfile"):
576 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
578 with self._node_lock:
579 pidtuple = sshfuncs.rgetpid(
580 os.path.join(home, pidfile),
581 host = self.get("hostname"),
582 user = self.get("username"),
583 port = self.get("port"),
584 gwuser = self.get("gatewayUser"),
585 gw = self.get("gateway"),
587 identity = self.get("identity"),
588 server_key = self.get("serverKey")
593 def status(self, pid, ppid):
595 status = execfuncs.lstatus(pid, ppid)
597 with self._node_lock:
598 status = sshfuncs.rstatus(
600 host = self.get("hostname"),
601 user = self.get("username"),
602 port = self.get("port"),
603 gwuser = self.get("gatewayUser"),
604 gw = self.get("gateway"),
606 identity = self.get("identity"),
607 server_key = self.get("serverKey")
612 def kill(self, pid, ppid, sudo = False):
615 status = self.status(pid, ppid)
617 if status == sshfuncs.ProcStatus.RUNNING:
619 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
621 with self._node_lock:
622 (out, err), proc = sshfuncs.rkill(
624 host = self.get("hostname"),
625 user = self.get("username"),
626 port = self.get("port"),
627 gwuser = self.get("gatewayUser"),
628 gw = self.get("gateway"),
631 identity = self.get("identity"),
632 server_key = self.get("serverKey")
635 return (out, err), proc
637 def copy(self, src, dst):
639 (out, err), proc = execfuncs.lcopy(source, dest,
641 strict_host_checking = False)
643 with self._node_lock:
644 (out, err), proc = sshfuncs.rcopy(
646 port = self.get("port"),
647 gwuser = self.get("gatewayUser"),
648 gw = self.get("gateway"),
649 identity = self.get("identity"),
650 server_key = self.get("serverKey"),
652 strict_host_checking = False)
654 return (out, err), proc
656 def upload(self, src, dst, text = False, overwrite = True):
657 """ Copy content to destination
659 src content to copy. Can be a local file, directory or a list of files
661 dst destination path on the remote host (remote is always self.host)
663 text src is text input, it must be stored into a temp file before uploading
665 # If source is a string input
667 if text and not os.path.isfile(src):
668 # src is text input that should be uploaded as file
669 # create a temporary file with the content to upload
670 f = tempfile.NamedTemporaryFile(delete=False)
675 # If dst files should not be overwritten, check that the files do not
677 if overwrite == False:
678 src = self.filter_existing_files(src, dst)
680 return ("", ""), None
682 if not self.localhost:
683 # Build destination as <user>@<server>:<path>
684 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
686 result = self.copy(src, dst)
def download(self, src, dst):
    """ Copies 'src' from the Linux host to 'dst'.

    :param src: path of the file or directory on the Linux host
    :type src: str
    :param dst: path to copy the content to
    :type dst: str
    :return: whatever the 'copy' method returns
    """
    if not self.localhost:
        # Build source as <user>@<server>:<path>
        src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)

    return self.copy(src, dst)
700 def install_packages_command(self, packages):
703 command = rpmfuncs.install_packages_command(self.os, packages)
705 command = debfuncs.install_packages_command(self.os, packages)
707 msg = "Error installing packages ( OS not known ) "
708 self.error(msg, self.os)
709 raise RuntimeError, msg
713 def install_packages(self, packages, home, run_home = None):
714 """ Install packages in the Linux host.
716 'home' is the directory to upload the package installation script.
717 'run_home' is the directory from where to execute the script.
719 command = self.install_packages_command(packages)
721 run_home = run_home or home
723 (out, err), proc = self.run_and_wait(command, run_home,
724 shfile = os.path.join(home, "instpkg.sh"),
725 pidfile = "instpkg_pidfile",
726 ecodefile = "instpkg_exitcode",
727 stdout = "instpkg_stdout",
728 stderr = "instpkg_stderr",
730 raise_on_error = True)
732 return (out, err), proc
734 def remove_packages(self, packages, home, run_home = None):
735 """ Uninstall packages from the Linux host.
737 'home' is the directory to upload the package un-installation script.
738 'run_home' is the directory from where to execute the script.
741 command = rpmfuncs.remove_packages_command(self.os, packages)
743 command = debfuncs.remove_packages_command(self.os, packages)
745 msg = "Error removing packages ( OS not known ) "
747 raise RuntimeError, msg
749 run_home = run_home or home
751 (out, err), proc = self.run_and_wait(command, run_home,
752 shfile = os.path.join(home, "rmpkg.sh"),
753 pidfile = "rmpkg_pidfile",
754 ecodefile = "rmpkg_exitcode",
755 stdout = "rmpkg_stdout",
756 stderr = "rmpkg_stderr",
758 raise_on_error = True)
760 return (out, err), proc
762 def mkdir(self, path, clean = False):
766 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """ Removes 'path' from the host by executing 'rm -rf' on it.

    :param path: file or directory to remove
    :type path: str
    :return: whatever the 'execute' method returns
    """
    return self.execute("rm -rf %s" % path, with_lock = True)
771 def run_and_wait(self, command, home,
776 ecodefile = "exitcode",
782 raise_on_error = False):
784 Uploads the 'command' to a bash script in the host.
785 Then runs the script detached in background in the host, and
786 busy-waits until the script finishes executing.
789 if not shfile.startswith("/"):
790 shfile = os.path.join(home, shfile)
792 self.upload_command(command,
794 ecodefile = ecodefile,
796 overwrite = overwrite)
798 command = "bash %s" % shfile
799 # run command in background in remote host
800 (out, err), proc = self.run(command, home,
808 # check no errors occurred
810 msg = " Failed to run command '%s' " % command
811 self.error(msg, out, err)
813 raise RuntimeError, msg
815 # Wait for pid file to be generated
816 pid, ppid = self.wait_pid(
819 raise_on_error = raise_on_error)
821 # wait until command finishes to execute
822 self.wait_run(pid, ppid)
824 (eout, err), proc = self.check_errors(home,
825 ecodefile = ecodefile,
828 # Out is what was written in the stderr file
830 msg = " Failed to run command '%s' " % command
831 self.error(msg, eout, err)
834 raise RuntimeError, msg
836 (out, oerr), proc = self.check_output(home, stdout)
838 return (out, err), proc
840 def exitcode(self, home, ecodefile = "exitcode"):
842 Get the exit code of an application.
843 Returns an integer value with the exit code
845 (out, err), proc = self.check_output(home, ecodefile)
847 # Succeeded to open file, return exit code in the file
850 return int(out.strip())
852 # Error in the content of the file!
853 return ExitCode.CORRUPTFILE
855 # No such file or directory
856 if proc.returncode == 1:
857 return ExitCode.FILENOTFOUND
859 # Other error from 'cat'
860 return ExitCode.ERROR
862 def upload_command(self, command,
864 ecodefile = "exitcode",
867 """ Saves the command as a bash script file in the remote host, and
868 forces to save the exit code of the command execution to the ecodefile
871 if not (command.strip().endswith(";") or command.strip().endswith("&")):
874 # The exit code of the command will be stored in ecodefile
875 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
877 'ecodefile': ecodefile,
881 environ = self.format_environment(env)
883 # Add environ to command
884 command = environ + command
886 return self.upload(command, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """ Formats the environment variables for a command to be executed
    either as an inline command
    (i.e. export PYTHONPATH=src/..; export PATH=..; python script.py) or
    as a bash script (i.e. export PYTHONPATH=src/.. \\n export PATH=.. \\n)

    :param env: whitespace separated list of VAR=value assignments
    :type env: str
    :param inline: if True the export statements are separated by ';',
        otherwise by a new line
    :type inline: bool
    :return: one ' export VAR=value' statement per assignment, each one
        followed by the separator, or "" if there is nothing to export
    :rtype: str
    """
    if not env: return ""

    # Remove extra white spaces
    env = re.sub(r'\s+', ' ', env.strip())

    # A string made only of white spaces carries no assignment; returning
    # early avoids emitting a bare 'export' statement
    if not env: return ""

    sep = ";" if inline else "\n"
    return sep.join([" export %s" % assignment
        for assignment in env.split(" ")]) + sep
902 def check_errors(self, home,
903 ecodefile = "exitcode",
905 """ Checks whether errors occurred while running a command.
906 It first checks the exit code for the command, and only if the
907 exit code is an error one it returns the error output.
913 # get exit code saved in the 'exitcode' file
914 ecode = self.exitcode(home, ecodefile)
916 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
917 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
918 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
919 # The process returned an error code or didn't exist.
920 # Check standard error.
921 (err, eerr), proc = self.check_output(home, stderr)
923 # If the stderr file was not found, assume nothing bad happened,
924 # and just ignore the error.
925 # (cat returns 1 for error "No such file or directory")
926 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
929 return ("", err), proc
931 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
932 """ Waits until the pid file for the command is generated,
933 and returns the pid and ppid of the process """
938 pidtuple = self.getpid(home = home, pidfile = pidfile)
947 msg = " Failed to get pid for pidfile %s/%s " % (
952 raise RuntimeError, msg
956 def wait_run(self, pid, ppid, trial = 0):
957 """ wait for a remote process to finish execution """
961 status = self.status(pid, ppid)
963 if status is ProcStatus.FINISHED:
965 elif status is not ProcStatus.RUNNING:
968 # If it takes more than 20 seconds to start, then
969 # assume something went wrong
973 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves content of file

    The file is read by executing 'cat' on it through the 'execute'
    method, with retry = 1 and with_lock = True.

    :param home: directory that contains the file
    :type home: str
    :param filename: name of the file, relative to 'home'
    :type filename: str
    :return: the ((out, err), proc) tuple obtained from 'execute'
    """
    path = os.path.join(home, filename)
    (out, err), proc = self.execute("cat %s" % path,
            retry = 1, with_lock = True)
    return (out, err), proc
983 """ Checks if host is responsive
989 msg = "Unresponsive host. Wrong answer. "
991 # The underlying SSH layer will sometimes return an empty
992 # output (even if the command was executed without errors).
993 # To work around this, repeat the operation N times or
994 # until the result is not empty string
996 (out, err), proc = self.execute("echo 'ALIVE'",
1000 if out.find("ALIVE") > -1:
1003 trace = traceback.format_exc()
1004 msg = "Unresponsive host. Error reaching host: %s " % trace
1006 self.error(msg, out, err)
1009 def find_home(self):
1010 """ Retrieves host home directory
1012 # The underlying SSH layer will sometimes return an empty
1013 # output (even if the command was executed without errors).
1014 # To work around this, repeat the operation N times or
1015 # until the result is not empty string
1016 msg = "Impossible to retrieve HOME directory"
1018 (out, err), proc = self.execute("echo ${HOME}",
1022 if out.strip() != "":
1023 self._home_dir = out.strip()
1025 trace = traceback.format_exc()
1026 msg = "Impossible to retrieve HOME directory %s" % trace
1028 if not self._home_dir:
1030 raise RuntimeError, msg
1032 def filter_existing_files(self, src, dst):
1033 """ Removes files that already exist in the Linux host from src list
1035 # construct a dictionary with { dst: src }
1036 dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ),
1037 src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
1040 for d in dests.keys():
1041 command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
1043 command = ";".join(command)
1045 (out, err), proc = self.execute(command, retry = 1, with_lock = True)
1047 for d in dests.keys():
1048 if out.find(d) > -1:
1054 return " ".join(dests.values())