3 import nepi.util.environ
8 # Unittest from Python 2.6 doesn't have these decorators
9 def _bannerwrap(f, text):
11 def banner(*args, **kwargs):
12 sys.stderr.write("*** WARNING: Skipping test %s: `%s'\n" %
17 return lambda f: _bannerwrap(f, text)
def skipUnless(cond, text):
    """Build a decorator that keeps a test only when `cond` is true.

    cond -- any value; only its truthiness is used.
    text -- reason string handed to _bannerwrap for the skipped case.

    When `cond` is true the decorator returns the function it is given,
    untouched.  Otherwise the decorator returns _bannerwrap(f, text).
    """
    if cond:
        # Condition satisfied: identity decorator, the test runs as written.
        return lambda f: f
    return lambda f: _bannerwrap(f, text)
def skipIf(cond, text):
    """Build a decorator that drops a test when `cond` is true.

    cond -- any value; only its truthiness is used.
    text -- reason string handed to _bannerwrap for the skipped case.

    When `cond` is false the decorator returns the function it is given,
    untouched.  Otherwise the decorator returns _bannerwrap(f, text).
    """
    if not cond:
        # Nothing to skip: identity decorator, the test runs as written.
        return lambda f: f
    return lambda f: _bannerwrap(f, text)
def ns3_bindings_path():
    """Return the value of the NEPI_NS3BINDINGS environment variable.

    Returns None when the variable is not set.
    """
    # dict.get yields None for a missing key, matching the fall-through
    # result of an explicit membership test.
    return os.environ.get("NEPI_NS3BINDINGS")
def ns3_library_path():
    """Return the value of the NEPI_NS3LIBRARY environment variable.

    Returns None when the variable is not set.
    """
    # dict.get yields None for a missing key, matching the fall-through
    # result of an explicit membership test.
    return os.environ.get("NEPI_NS3LIBRARY")
34 if ns3_library_path():
36 ctypes.CDLL(ns3_library_path(), ctypes.RTLD_GLOBAL)
40 if ns3_bindings_path():
41 sys.path.insert(0, ns3_bindings_path())
44 found = imp.find_module('ns3')
45 module = imp.load_module('ns3', *found)
49 if ns3_bindings_path():
55 user = os.environ.get('PL_USER')
56 pwd = os.environ.get('PL_PASS')
63 def find_bin(name, extra_path = None):
65 if "PATH" in os.environ:
66 search += os.environ["PATH"].split(":")
67 for pref in ("/", "/usr/", "/usr/local/"):
68 for d in ("bin", "sbin"):
69 search.append(pref + d)
75 os.stat(d + "/" + name)
78 if e.errno != os.errno.ENOENT:
82 def find_bin_or_die(name, extra_path = None):
85 raise RuntimeError(("Cannot find `%s' command, impossible to " +
91 import os, os.path, re, signal, shutil, socket, subprocess, tempfile
def gen_ssh_keypair(filename):
    """Generate an SSH key pair by running ssh-keygen.

    filename -- path passed to ssh-keygen as the `-f` output file.

    Returns the tuple (filename, filename + ".pub").

    Raises AssertionError when ssh-keygen exits with a non-zero status.
    Locating the binary is delegated to nepi.util.environ.find_bin_or_die,
    which is expected to raise if ssh-keygen cannot be found.
    """
    ssh_keygen = nepi.util.environ.find_bin_or_die("ssh-keygen")
    # -q: quiet; -N '': empty passphrase; -f: output key file
    args = [ssh_keygen, '-q', '-N', '', '-f', filename]
    # Run the command outside of an `assert` statement: with `python -O`
    # asserts are stripped, which would silently skip the key generation.
    status = subprocess.Popen(args).wait()
    if status != 0:
        # AssertionError is kept so callers observe the same failure type.
        raise AssertionError(
            "%s exited with status %d" % (ssh_keygen, status))
    return filename, "%s.pub" % filename
def add_key_to_agent(filename):
    """Run `ssh-add filename`, discarding the command's stderr.

    filename -- path of the key file handed to ssh-add.

    Raises AssertionError when ssh-add exits with a non-zero status.
    Locating the binary is delegated to nepi.util.environ.find_bin_or_die,
    which is expected to raise if ssh-add cannot be found.
    """
    ssh_add = nepi.util.environ.find_bin_or_die("ssh-add")
    args = [ssh_add, filename]
    # open() instead of the Python 2-only file() builtin; the with-block
    # guarantees the /dev/null handle is closed even if Popen raises.
    with open("/dev/null", "w") as null:
        # Run the command outside of an `assert` statement: with
        # `python -O` asserts are stripped and the key would not be added.
        status = subprocess.Popen(args, stderr=null).wait()
    if status != 0:
        # AssertionError is kept so callers observe the same failure type.
        raise AssertionError(
            "%s exited with status %d" % (ssh_add, status))
107 s.bind(("127.0.0.1", 0))
108 port = s.getsockname()[1]
111 _SSH_CONF = """ListenAddress 127.0.0.1:%d
114 UsePrivilegeSeparation no
115 PubkeyAuthentication yes
116 PasswordAuthentication no
117 AuthorizedKeysFile %s
119 AllowAgentForwarding yes
122 PermitUserEnvironment yes
125 def gen_sshd_config(filename, port, server_key, auth_keys):
126 conf = open(filename, "w")
127 text = _SSH_CONF % (port, server_key, auth_keys)
132 def gen_auth_keys(pubkey, output, environ):
133 #opts = ['from="127.0.0.1/32"'] # fails in stupid yans setup
135 for k, v in environ.items():
136 opts.append('environment="%s=%s"' % (k, v))
138 lines = file(pubkey).readlines()
139 pubkey = lines[0].split()[0:2]
140 out = file(output, "w")
141 out.write("%s %s %s\n" % (",".join(opts), pubkey[0], pubkey[1]))
145 def start_ssh_agent():
146 ssh_agent = nepi.util.environ.find_bin_or_die("ssh-agent")
147 proc = subprocess.Popen([ssh_agent], stdout = subprocess.PIPE)
148 (out, foo) = proc.communicate()
149 assert proc.returncode == 0
151 for l in out.split("\n"):
152 match = re.search("^(\w+)=([^ ;]+);.*", l)
155 k, v = match.groups()
160 def stop_ssh_agent(data):
161 # No need to gather the pid, ssh-agent knows how to kill itself; after we
162 # had set up the environment
163 ssh_agent = nepi.util.environ.find_bin_or_die("ssh-agent")
164 null = file("/dev/null", "w")
165 proc = subprocess.Popen([ssh_agent, "-k"], stdout = null)
167 assert proc.wait() == 0
171 class test_environment(object):
173 sshd = find_bin_or_die("sshd")
175 if 'PYTHONPATH' in os.environ:
176 environ['PYTHONPATH'] = ":".join(map(os.path.realpath,
177 os.environ['PYTHONPATH'].split(":")))
178 if 'NEPI_NS3BINDINGS' in os.environ:
179 environ['NEPI_NS3BINDINGS'] = \
180 os.path.realpath(os.environ['NEPI_NS3BINDINGS'])
181 if 'NEPI_NS3LIBRARY' in os.environ:
182 environ['NEPI_NS3LIBRARY'] = \
183 os.path.realpath(os.environ['NEPI_NS3LIBRARY'])
185 self.dir = tempfile.mkdtemp()
186 self.server_keypair = gen_ssh_keypair(
187 os.path.join(self.dir, "server_key"))
188 self.client_keypair = gen_ssh_keypair(
189 os.path.join(self.dir, "client_key"))
190 self.authorized_keys = gen_auth_keys(self.client_keypair[1],
191 os.path.join(self.dir, "authorized_keys"), environ)
192 self.port = get_free_port()
193 self.sshd_conf = gen_sshd_config(
194 os.path.join(self.dir, "sshd_config"),
195 self.port, self.server_keypair[0], self.authorized_keys)
197 self.sshd = subprocess.Popen([sshd, '-q', '-D', '-f', self.sshd_conf])
198 self.ssh_agent_vars = start_ssh_agent()
199 add_key_to_agent(self.client_keypair[0])
203 os.kill(self.sshd.pid, signal.SIGTERM)
205 if self.ssh_agent_vars:
206 stop_ssh_agent(self.ssh_agent_vars)
207 shutil.rmtree(self.dir)