# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation;
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
try:
os.stat(d + "/" + name)
return d + "/" + name
- except OSError, e:
+ except OSError as e:
if e.errno != os.errno.ENOENT:
raise
return None
def add_key_to_agent(filename):
ssh_add = find_bin_or_die("ssh-add")
args = [ssh_add, filename]
- null = file("/dev/null", "w")
- assert subprocess.Popen(args, stderr = null).wait() == 0
- null.close()
+ with open("/dev/null", "w") as null:
+ assert subprocess.Popen(args, stderr = null).wait() == 0
def get_free_port():
s = socket.socket()
"""
def gen_sshd_config(filename, port, server_key, auth_keys):
- conf = open(filename, "w")
- text = _SSH_CONF % (port, server_key, auth_keys)
- conf.write(text)
- conf.close()
+ with open(filename, "w") as conf:
+ text = _SSH_CONF % (port, server_key, auth_keys)
+ conf.write(text)
return filename
def gen_auth_keys(pubkey, output, environ):
for k, v in environ.items():
opts.append('environment="%s=%s"' % (k, v))
- lines = file(pubkey).readlines()
+ with open(pubkey) as f:
+ lines = f.readlines()
pubkey = lines[0].split()[0:2]
- out = file(output, "w")
- out.write("%s %s %s\n" % (",".join(opts), pubkey[0], pubkey[1]))
- out.close()
+ with open(output, "w") as out:
+ out.write("%s %s %s\n" % (",".join(opts), pubkey[0], pubkey[1]))
return output
def start_ssh_agent():
# No need to gather the pid, ssh-agent knows how to kill itself; after we
# had set up the environment
ssh_agent = find_bin_or_die("ssh-agent")
- null = file("/dev/null", "w")
- proc = subprocess.Popen([ssh_agent, "-k"], stdout = null)
- null.close()
+ with open("/dev/null", "w") as null:
+ proc = subprocess.Popen([ssh_agent, "-k"], stdout = null)
assert proc.wait() == 0
for k in data:
del os.environ[k]
os.remove(f1.name)
shutil.rmtree(dirpath)
- def test_rcopy_slist(self):
+ def test_rcopy_list(self):
env = test_environment()
user = getpass.getuser()
host = "localhost"
f1.close()
f1.name
- source = "%s;%s" % (dirpath, f1.name)
+ # Copy a list of files
+ source = [dirpath, f1.name]
destdir = tempfile.mkdtemp()
dest = "%s@%s:%s" % (user, host, destdir)
- rcopy(source, dest, port = env.port, agent = True, recursive = True)
+ ((out, err), proc) = rcopy(source, dest, port = env.port, agent = True, recursive = True)
files = []
def recls(files, dirname, names):
files.extend(names)
os.path.walk(destdir, recls, files)
-
+
origfiles = map(lambda s: os.path.basename(s), [dirpath, f.name, f1.name])
self.assertEquals(sorted(origfiles), sorted(files))