# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation;
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
try:
os.stat(d + "/" + name)
return d + "/" + name
- except OSError, e:
+ except OSError as e:
if e.errno != os.errno.ENOENT:
raise
return None
def add_key_to_agent(filename):
ssh_add = find_bin_or_die("ssh-add")
args = [ssh_add, filename]
- null = file("/dev/null", "w")
- assert subprocess.Popen(args, stderr = null).wait() == 0
- null.close()
+ with open("/dev/null", "w") as null:
+ assert subprocess.Popen(args, stderr = null).wait() == 0
def get_free_port():
s = socket.socket()
"""
def gen_sshd_config(filename, port, server_key, auth_keys):
- conf = open(filename, "w")
- text = _SSH_CONF % (port, server_key, auth_keys)
- conf.write(text)
- conf.close()
+ with open(filename, "w") as conf:
+ text = _SSH_CONF % (port, server_key, auth_keys)
+ conf.write(text)
return filename
def gen_auth_keys(pubkey, output, environ):
for k, v in environ.items():
opts.append('environment="%s=%s"' % (k, v))
- lines = file(pubkey).readlines()
+ with open(pubkey) as f:
+ lines = f.readlines()
pubkey = lines[0].split()[0:2]
- out = file(output, "w")
- out.write("%s %s %s\n" % (",".join(opts), pubkey[0], pubkey[1]))
- out.close()
+ with open(output, "w") as out:
+ out.write("%s %s %s\n" % (",".join(opts), pubkey[0], pubkey[1]))
return output
def start_ssh_agent():
# No need to gather the pid, ssh-agent knows how to kill itself; after we
# had set up the environment
ssh_agent = find_bin_or_die("ssh-agent")
- null = file("/dev/null", "w")
- proc = subprocess.Popen([ssh_agent, "-k"], stdout = null)
- null.close()
+ with open("/dev/null", "w") as null:
+ proc = subprocess.Popen([ssh_agent, "-k"], stdout = null)
assert proc.wait() == 0
for k in data:
del os.environ[k]
(outremote, errrmote), premote = rexec(command, host, user,
port = env.port, agent = True)
- self.assertEquals(outlocal, outremote)
+ self.assertEqual(outlocal, outremote)
def test_rcopy_list(self):
env = test_environment()
files.extend(names)
os.path.walk(destdir, recls, files)
- origfiles = map(lambda s: os.path.basename(s), [dirpath, f.name, f1.name])
+ origfiles = [os.path.basename(s) for s in [dirpath, f.name, f1.name]]
- self.assertEquals(sorted(origfiles), sorted(files))
+ self.assertEqual(sorted(origfiles), sorted(files))
os.remove(f1.name)
shutil.rmtree(dirpath)
files.extend(names)
os.path.walk(destdir, recls, files)
- origfiles = map(lambda s: os.path.basename(s), [dirpath, f.name, f1.name])
+ origfiles = [os.path.basename(s) for s in [dirpath, f.name, f1.name]]
- self.assertEquals(sorted(origfiles), sorted(files))
+ self.assertEqual(sorted(origfiles), sorted(files))
def test_rproc_manage(self):
env = test_environment()
port = env.port,
agent = True)
- self.assertEquals(status, ProcStatus.RUNNING)
+ self.assertEqual(status, ProcStatus.RUNNING)
rkill(pid, ppid,
host = host,
port = env.port,
agent = True)
- self.assertEquals(status, ProcStatus.FINISHED)
+ self.assertEqual(status, ProcStatus.FINISHED)
if __name__ == '__main__':