From: Alina Quereilhac Date: Wed, 12 Feb 2014 23:22:47 +0000 (+0100) Subject: Fix #29 LinuxApplication passing a list of files as 'sources' not working X-Git-Tag: nepi-3.1.0~124 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;ds=sidebyside;h=35af1f1f8393dc9663b2a8be8a4c2b1d78f03bc1;p=nepi.git Fix #29 LinuxApplication passing a list of files as 'sources' not working --- diff --git a/examples/linux/ccn/ccncat_2_nodes.py b/examples/linux/ccn/ccncat_2_nodes.py index 2e0af0e2..34d3350a 100755 --- a/examples/linux/ccn/ccncat_2_nodes.py +++ b/examples/linux/ccn/ccncat_2_nodes.py @@ -43,6 +43,7 @@ from nepi.resources.linux.node import OSType from optparse import OptionParser, SUPPRESS_HELP +import getpass import os import time @@ -151,7 +152,8 @@ def get_options(): # the host can be accessed through SSH without prompting # for a password. The host must allow X forwarding using SSH. linux_host = 'roseval.pl.sophia.inria.fr' - + linux_user = getpass.getuser() + usage = "usage: %prog -p -s -l -u -m -e -i " parser = OptionParser(usage=usage) @@ -164,8 +166,8 @@ def get_options(): help="Hostname of second Linux host (non PlanetLab)", default = linux_host, type="str") parser.add_option("-u", "--linux-user", dest="linux_user", - help="User for extra Linux host (non PlanetLab)", default = linux_host, - type="str") + help="User for extra Linux host (non PlanetLab)", + default = linux_user, type="str") parser.add_option("-m", "--movie", dest="movie", help="Stream movie", type="str") parser.add_option("-e", "--exp-id", dest="exp_id", diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 0057ad14..d6391a32 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -101,7 +101,7 @@ class LinuxApplication(ResourceManager): "Space-separated list of packages required to run the application", flags = Flags.ExecReadOnly) sources = Attribute("sources", - "Space-separated list of regular files to be uploaded to ${SRC} " + "Colon-separated list of regular files to be uploaded to ${SRC} " "directory prior to building. Archives won't be expanded automatically. " "Sources are globally available for all experiments unless " "cleanHome is set to True (This will delete all sources). ", @@ -375,7 +375,7 @@ class LinuxApplication(ResourceManager): if sources: self.info("Uploading sources ") - sources = sources.split(' ') + sources = map(str.strip, sources.split(";")) # Separate sources that should be downloaded from # the web, from sources that should be uploaded from diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index f75368dc..cc94daa8 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -657,11 +657,17 @@ class LinuxNode(ResourceManager): def upload(self, src, dst, text = False, overwrite = True): """ Copy content to destination - src content to copy. Can be a local file, directory or a list of files + src string with the content to copy. Can be: + - plain text + - a string with the path to a local file + - a string with a colon-separeted list of local files + - a string with a local directory - dst destination path on the remote host (remote is always self.host) + dst string with destination path on the remote host (remote is + always self.host) - text src is text input, it must be stored into a temp file before uploading + text src is text input, it must be stored into a temp file before + uploading """ # If source is a string input f = None @@ -674,11 +680,14 @@ class LinuxNode(ResourceManager): src = f.name # If dst files should not be overwritten, check that the files do not - # exits already + # exits already + if isinstance(src, str): + src = map(str.strip, src.split(";")) + if overwrite == False: src = self.filter_existing_files(src, dst) if not src: - return ("", ""), None + return ("", ""), None if not self.localhost: # Build destination as @: @@ -1034,8 +1043,9 @@ class LinuxNode(ResourceManager): """ Removes files that already exist in the Linux host from src list """ # construct a dictionary with { dst: src } - dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ), - src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src}) + dests = dict(map( + lambda s: (os.path.join(dst, os.path.basename(s)), s ), s)) \ + if len(src) > 1 else dict({dst: src[0]}) command = [] for d in dests.keys(): @@ -1050,7 +1060,7 @@ class LinuxNode(ResourceManager): del dests[d] if not dests: - return "" + return [] - return " ".join(dests.values()) + return dests.values() diff --git a/src/nepi/util/execfuncs.py b/src/nepi/util/execfuncs.py index eaadf509..b25ae980 100644 --- a/src/nepi/util/execfuncs.py +++ b/src/nepi/util/execfuncs.py @@ -60,10 +60,17 @@ def lcopy(source, dest, recursive = False): command = ["cp"] if recursive: command.append("-R") - - command.append(src) - command.append(dst) - + + if isinstance(dest, str): + dest = dest.split(";") + + if isinstance(src, str): + src = src.split(";") + + args.extend(src) + + args.extend(dest) + p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/src/nepi/util/sshfuncs.py b/src/nepi/util/sshfuncs.py index 80df6906..c00b494d 100644 --- a/src/nepi/util/sshfuncs.py +++ b/src/nepi/util/sshfuncs.py @@ -355,282 +355,111 @@ def rcopy(source, dest, Source and destination should have the user and host encoded as per scp specs. - If source is a file object, a special mode will be used to - create the remote file with the same contents. - - If dest is a file object, the remote file (source) will be - read and written into dest. - - In these modes, recursive cannot be True. - - Source can be a list of files to copy to a single destination, - in which case it is advised that the destination be a folder. + Source can be a list of files to copy to a single destination, + (in which case it is advised that the destination be a folder), + a single string or a string list of colon-separeted files. """ + + # Parse destination as @: + if isinstance(dest, str) and ':' in dest: + remspec, path = dest.split(':',1) + elif isinstance(source, str) and ':' in source: + remspec, path = source.split(':',1) + else: + raise ValueError, "Both endpoints cannot be local" + user,host = remspec.rsplit('@',1) - if isinstance(source, file) and source.tell() == 0: - source = source.name - elif hasattr(source, 'read'): - tmp = tempfile.NamedTemporaryFile() - while True: - buf = source.read(65536) - if buf: - tmp.write(buf) - else: - break - tmp.seek(0) - source = tmp.name - - if isinstance(source, file) or isinstance(dest, file) \ - or hasattr(source, 'read') or hasattr(dest, 'write'): - assert not recursive - - # Parse source/destination as @: - if isinstance(dest, basestring) and ':' in dest: - remspec, path = dest.split(':',1) - elif isinstance(source, basestring) and ':' in source: - remspec, path = source.split(':',1) + # plain scp + tmp_known_hosts = None + + args = ['scp', '-q', '-p', '-C', + # Speed up transfer using blowfish cypher specification which is + # faster than the default one (3des) + '-c', 'blowfish', + # Don't bother with localhost. Makes test easier + '-o', 'NoHostAuthenticationForLocalhost=yes', + '-o', 'ConnectTimeout=60', + '-o', 'ConnectionAttempts=3', + '-o', 'ServerAliveInterval=30', + '-o', 'TCPKeepAlive=yes' ] + + if port: + args.append('-P%d' % port) + + if gw: + if gwuser: + proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw) else: - raise ValueError, "Both endpoints cannot be local" - user,host = remspec.rsplit('@',1) - - tmp_known_hosts = None - if not gw: - hostip = gethostbyname(host) - else: hostip = None - - args = ['ssh', '-l', user, '-C', - # Don't bother with localhost. Makes test easier - '-o', 'NoHostAuthenticationForLocalhost=yes', - '-o', 'ConnectTimeout=60', - '-o', 'ConnectionAttempts=3', - '-o', 'ServerAliveInterval=30', - '-o', 'TCPKeepAlive=yes', - hostip or host ] - - if openssh_has_persist(): - args.extend([ - '-o', 'ControlMaster=auto', - '-o', 'ControlPath=%s' % (make_control_path(agent, False),), - '-o', 'ControlPersist=60' ]) - - if gw: - if gwuser: - proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw) - else: - proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw - args.extend(['-o', proxycommand]) + proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw + args.extend(['-o', proxycommand]) - if port: - args.append('-P%d' % port) + if recursive: + args.append('-r') - if identity: - args.extend(('-i', identity)) + if identity: + args.extend(('-i', identity)) - if server_key: - # Create a temporary server key file - tmp_known_hosts = make_server_key_args(server_key, host, port) - args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) - - if isinstance(source, file) or hasattr(source, 'read'): - args.append('cat > %s' % (shell_escape(path),)) - elif isinstance(dest, file) or hasattr(dest, 'write'): - args.append('cat %s' % (shell_escape(path),)) - else: - raise AssertionError, "Unreachable code reached! :-Q" - - # connects to the remote host and starts a remote connection - if isinstance(source, file): - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = source) - err = proc.stderr.read() - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,err), proc) - elif isinstance(dest, file): - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = source) - err = proc.stderr.read() - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,err), proc) - elif hasattr(source, 'read'): - # file-like (but not file) source - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = subprocess.PIPE) - - buf = None - err = [] - while True: - if not buf: - buf = source.read(4096) - if not buf: - #EOF - break - - rdrdy, wrdy, broken = select.select( - [proc.stderr], - [proc.stdin], - [proc.stderr,proc.stdin]) - - if proc.stderr in rdrdy: - # use os.read for fully unbuffered behavior - err.append(os.read(proc.stderr.fileno(), 4096)) - - if proc.stdin in wrdy: - proc.stdin.write(buf) - buf = None - - if broken: - break - proc.stdin.close() - err.append(proc.stderr.read()) - - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,''.join(err)), proc) - elif hasattr(dest, 'write'): - # file-like (but not file) dest - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stderr = subprocess.PIPE, - stdin = open('/dev/null','w')) - - buf = None - err = [] - while True: - rdrdy, wrdy, broken = select.select( - [proc.stderr, proc.stdout], - [], - [proc.stderr, proc.stdout]) - - if proc.stderr in rdrdy: - # use os.read for fully unbuffered behavior - err.append(os.read(proc.stderr.fileno(), 4096)) - - if proc.stdout in rdrdy: - # use os.read for fully unbuffered behavior - buf = os.read(proc.stdout.fileno(), 4096) - dest.write(buf) - - if not buf: - #EOF - break - - if broken: - break - err.append(proc.stderr.read()) - - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,''.join(err)), proc) - else: - raise AssertionError, "Unreachable code reached! :-Q" - else: - # Parse destination as @: - if isinstance(dest, basestring) and ':' in dest: - remspec, path = dest.split(':',1) - elif isinstance(source, basestring) and ':' in source: - remspec, path = source.split(':',1) - else: - raise ValueError, "Both endpoints cannot be local" - user,host = remspec.rsplit('@',1) - - # plain scp - tmp_known_hosts = None - - args = ['scp', '-q', '-p', '-C', - # Speed up transfer using blowfish cypher specification which is - # faster than the default one (3des) - '-c', 'blowfish', - # Don't bother with localhost. Makes test easier - '-o', 'NoHostAuthenticationForLocalhost=yes', - '-o', 'ConnectTimeout=60', - '-o', 'ConnectionAttempts=3', - '-o', 'ServerAliveInterval=30', - '-o', 'TCPKeepAlive=yes' ] - - if port: - args.append('-P%d' % port) + if server_key: + # Create a temporary server key file + tmp_known_hosts = make_server_key_args(server_key, host, port) + args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) - if gw: - if gwuser: - proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw) - else: - proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw - args.extend(['-o', proxycommand]) + if not strict_host_checking: + # Do not check for Host key. Unsafe. + args.extend(['-o', 'StrictHostKeyChecking=no']) - if recursive: - args.append('-r') + if openssh_has_persist(): + args.extend([ + '-o', 'ControlMaster=auto', + '-o', 'ControlPath=%s' % (make_control_path(agent, False),) + ]) - if identity: - args.extend(('-i', identity)) + if isinstance(dest, str): + dest = map(str.strip, dest.split(";")) - if server_key: - # Create a temporary server key file - tmp_known_hosts = make_server_key_args(server_key, host, port) - args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) + if isinstance(source, str): + source = map(str.strip, source.split(";")) - if not strict_host_checking: - # Do not check for Host key. Unsafe. - args.extend(['-o', 'StrictHostKeyChecking=no']) + args.extend(source) - if ' ' in source: - source = source.split(' ') + args.extend(dest) - if isinstance(source,list): - args.extend(source) - else: - if openssh_has_persist(): - args.extend([ - '-o', 'ControlMaster=auto', - '-o', 'ControlPath=%s' % (make_control_path(agent, False),) - ]) - args.append(source) - - args.append(dest) - - for x in xrange(retry): - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - - # attach tempfile object to the process, to make sure the file stays - # alive until the process is finished with it - proc._known_hosts = tmp_known_hosts + for x in xrange(retry): + # connects to the remote host and starts a remote connection + proc = subprocess.Popen(args, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE) - try: - (out, err) = proc.communicate() - eintr_retry(proc.wait)() - msg = " rcopy - host %s - command %s " % (host, " ".join(args)) - log(msg, logging.DEBUG, out, err) + # attach tempfile object to the process, to make sure the file stays + # alive until the process is finished with it + proc._known_hosts = tmp_known_hosts + + try: + (out, err) = proc.communicate() + eintr_retry(proc.wait)() + msg = " rcopy - host %s - command %s " % (host, " ".join(args)) + log(msg, logging.DEBUG, out, err) - if proc.poll(): - t = x*2 - msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( - t, x, host, " ".join(args)) - log(msg, logging.DEBUG) + if proc.poll(): + t = x*2 + msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( + t, x, host, " ".join(args)) + log(msg, logging.DEBUG) - time.sleep(t) - continue + time.sleep(t) + continue - break - except RuntimeError, e: - msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) - log(msg, logging.DEBUG, out, err) + break + except RuntimeError, e: + msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) + log(msg, logging.DEBUG, out, err) - if retry <= 0: - raise - retry -= 1 - - return ((out, err), proc) + if retry <= 0: + raise + retry -= 1 + + return ((out, err), proc) def rspawn(command, pidfile, stdout = '/dev/null', diff --git a/test/resources/linux/application.py b/test/resources/linux/application.py index edbc2ba7..ad76973b 100755 --- a/test/resources/linux/application.py +++ b/test/resources/linux/application.py @@ -32,10 +32,10 @@ import unittest class LinuxApplicationTestCase(unittest.TestCase): def setUp(self): self.fedora_host = "nepi2.pl.sophia.inria.fr" - self.fedora_user = "inria_test" + self.fedora_user = "inria_nepi" self.ubuntu_host = "roseval.pl.sophia.inria.fr" - self.ubuntu_user = "alina" + self.ubuntu_user = "inria_nepi" self.target = "nepi5.pl.sophia.inria.fr" @@ -238,12 +238,15 @@ main (void) ec.set(node, "cleanHome", True) ec.set(node, "cleanProcesses", True) - sources = "http://yans.pl.sophia.inria.fr/code/nef/archive/tip.tar.gz " \ - " http://yans.pl.sophia.inria.fr/code/nef/raw-file/8ace577d4079/src/nef/images/menu/connect.png" + sources = "http://yans.pl.sophia.inria.fr/code/nef/archive/tip.tar.gz;" \ + "http://yans.pl.sophia.inria.fr/code/nef/raw-file/8ace577d4079/src/nef/images/menu/connect.png" app = ec.register_resource("LinuxApplication") ec.set(app, "sources", sources) + command = "ls ${SRC}" + ec.set(app, "command", command) + ec.register_connection(app, node) ec.deploy() @@ -260,6 +263,10 @@ main (void) self.assertTrue(out.find("tip.tar.gz") > -1) self.assertTrue(out.find("connect.png") > -1) + stdout = ec.trace(app, "stdout") + self.assertTrue(stdout.find("tip.tar.gz") > -1) + self.assertTrue(stdout.find("connect.png") > -1) + ec.shutdown() @skipIfNotAlive diff --git a/test/resources/linux/node.py b/test/resources/linux/node.py index cebf0d81..69222112 100755 --- a/test/resources/linux/node.py +++ b/test/resources/linux/node.py @@ -24,6 +24,7 @@ from nepi.util.sshfuncs import ProcStatus from test_utils import skipIfNotAlive, skipInteractive, create_node +import shutil import os import time import tempfile @@ -32,10 +33,10 @@ import unittest class LinuxNodeTestCase(unittest.TestCase): def setUp(self): self.fedora_host = "nepi2.pl.sophia.inria.fr" - self.fedora_user = "inria_test" + self.fedora_user = "inria_nepi" self.ubuntu_host = "roseval.pl.sophia.inria.fr" - self.ubuntu_user = "alina" + self.ubuntu_user = "inria_nepi" self.target = "nepi5.pl.sophia.inria.fr" @@ -213,7 +214,6 @@ class LinuxNodeTestCase(unittest.TestCase): self.assertEquals(out.strip(), "") - @skipIfNotAlive def t_xterm(self, host, user): node, ec = create_node(host, user) @@ -286,6 +286,55 @@ main (void) self.assertEquals(out, "Hello, world!\n") + @skipIfNotAlive + def t_copy_files(self, host, user): + node, ec = create_node(host, user) + + node.find_home() + app_home = os.path.join(node.exp_home, "my-app") + node.mkdir(app_home, clean = True) + + # create some temp files and directories to copy + dirpath = tempfile.mkdtemp() + f = tempfile.NamedTemporaryFile(dir=dirpath, delete=False) + f.close() + + f1 = tempfile.NamedTemporaryFile(delete=False) + f1.close() + f1.name + + source = "%s;%s" % (dirpath, f1.name) + destdir = "test" + node.mkdir(destdir, clean = True) + dest = "%s@%s:test" % (user, host) + node.copy(source, dest) + + command = "ls %s" % destdir + + (out, err), proc = node.execute(command) + + os.remove(f1.name) + shutil.rmtree(dirpath) + + self.assertTrue(out.find(os.path.basename(dirpath)) > -1) + self.assertTrue(out.find(os.path.basename(f1.name)) > -1) + + f2 = tempfile.NamedTemporaryFile(delete=False) + f2.close() + f2.name + + node.mkdir(destdir, clean = True) + dest = "%s@%s:test" % (user, host) + node.copy(f2.name, dest) + + command = "ls %s" % destdir + + (out, err), proc = node.execute(command) + + os.remove(f2.name) + + self.assertTrue(out.find(os.path.basename(f2.name)) > -1) + def test_execute_fedora(self): self.t_execute(self.fedora_host, self.fedora_user) @@ -339,6 +388,11 @@ main (void) """ Interactive test. Should not run automatically """ self.t_xterm(self.ubuntu_host, self.ubuntu_user) + def test_copy_files_fedora(self): + self.t_copy_files(self.fedora_host, self.fedora_user) + + def test_copy_files_ubuntu(self): + self.t_copy_files(self.ubuntu_host, self.ubuntu_user) if __name__ == '__main__': unittest.main() diff --git a/test/util/sshfuncs.py b/test/util/sshfuncs.py index 1a55b6b1..75e54644 100755 --- a/test/util/sshfuncs.py +++ b/test/util/sshfuncs.py @@ -184,7 +184,7 @@ class SSHfuncsTestCase(unittest.TestCase): self.assertEquals(outlocal, outremote) - def test_rcopy(self): + def test_rcopy_list(self): env = test_environment() user = getpass.getuser() host = "localhost" @@ -212,6 +212,37 @@ class SSHfuncsTestCase(unittest.TestCase): self.assertEquals(sorted(origfiles), sorted(files)) + os.remove(f1.name) + shutil.rmtree(dirpath) + + def test_rcopy_slist(self): + env = test_environment() + user = getpass.getuser() + host = "localhost" + + # create some temp files and directories to copy + dirpath = tempfile.mkdtemp() + f = tempfile.NamedTemporaryFile(dir=dirpath, delete=False) + f.close() + + f1 = tempfile.NamedTemporaryFile(delete=False) + f1.close() + f1.name + + source = "%s;%s" % (dirpath, f1.name) + destdir = tempfile.mkdtemp() + dest = "%s@%s:%s" % (user, host, destdir) + rcopy(source, dest, port = env.port, agent = True, recursive = True) + + files = [] + def recls(files, dirname, names): + files.extend(names) + os.path.walk(destdir, recls, files) + + origfiles = map(lambda s: os.path.basename(s), [dirpath, f.name, f1.name]) + + self.assertEquals(sorted(origfiles), sorted(files)) + def test_rproc_manage(self): env = test_environment() user = getpass.getuser()