def skipIf(cond, text):
return (lambda f: _bannerwrap(f, text)) if cond else lambda f: f
+def find_bin(name, extra_path = None):
+ search = []
+ if "PATH" in os.environ:
+ search += os.environ["PATH"].split(":")
+ for pref in ("/", "/usr/", "/usr/local/"):
+ for d in ("bin", "sbin"):
+ search.append(pref + d)
+ if extra_path:
+ search += extra_path
+
+ for d in search:
+ try:
+ os.stat(d + "/" + name)
+ return d + "/" + name
+ except OSError, e:
+ if e.errno != os.errno.ENOENT:
+ raise
+ return None
+
+def find_bin_or_die(name, extra_path = None):
+ r = find_bin(name)
+ if not r:
+ raise RuntimeError(("Cannot find `%s' command, impossible to " +
+ "continue.") % name)
+ return r
+
# SSH stuff
import os, os.path, re, signal, shutil, socket, subprocess, tempfile
for k in data:
del os.environ[k]
+class test_environment(object):
+ def __init__(self):
+ sshd = find_bin_or_die("sshd")
+ environ = {}
+ if 'PYTHONPATH' in os.environ:
+ environ['PYTHONPATH'] = ":".join(map(os.path.realpath,
+ os.environ['PYTHONPATH'].split(":")))
+
+ self.dir = tempfile.mkdtemp()
+ self.server_keypair = gen_ssh_keypair(
+ os.path.join(self.dir, "server_key"))
+ self.client_keypair = gen_ssh_keypair(
+ os.path.join(self.dir, "client_key"))
+ self.authorized_keys = gen_auth_keys(self.client_keypair[1],
+ os.path.join(self.dir, "authorized_keys"), environ)
+ self.port = get_free_port()
+ self.sshd_conf = gen_sshd_config(
+ os.path.join(self.dir, "sshd_config"),
+ self.port, self.server_keypair[0], self.authorized_keys)
+
+ self.sshd = subprocess.Popen([sshd, '-q', '-D', '-f', self.sshd_conf])
+ self.ssh_agent_vars = start_ssh_agent()
+ add_key_to_agent(self.client_keypair[0])
+
+ def __del__(self):
+ if self.sshd:
+ os.kill(self.sshd.pid, signal.SIGTERM)
+ self.sshd.wait()
+ if self.ssh_agent_vars:
+ stop_ssh_agent(self.ssh_agent_vars)
+ shutil.rmtree(self.dir)