fixed shebangs in non executable .py files
[nepi.git] / test / lib / test_util.py
1 import nepi.util.environ
2 import ctypes
3 import imp
4 import sys
5
6 # Unittest from Python 2.6 doesn't have these decorators
7 def _bannerwrap(f, text):
8     name = f.__name__
9     def banner(*args, **kwargs):
10         sys.stderr.write("*** WARNING: Skipping test %s: `%s'\n" %
11                 (name, text))
12         return None
13     return banner
14 def skip(text):
15     return lambda f: _bannerwrap(f, text)
16 def skipUnless(cond, text):
17     return (lambda f: _bannerwrap(f, text)) if not cond else lambda f: f
18 def skipIf(cond, text):
19     return (lambda f: _bannerwrap(f, text)) if cond else lambda f: f
20
21 def ns3_bindings_path():
22     if "NEPI_NS3BINDINGS" in os.environ:
23         return os.environ["NEPI_NS3BINDINGS"]
24     return None
25
26 def ns3_library_path():
27     if "NEPI_NS3LIBRARY" in os.environ:
28         return os.environ["NEPI_NS3LIBRARY"]
29     return None
30
31 def ns3_usable():
32     try:
33         from nepi.testbeds.ns3 import execute
34         execute.load_ns3_module()
35     except:
36         import traceback
37         import sys
38         traceback.print_exc(file = sys.stderr)
39         return False
40     return True
41
42 def pl_auth():
43     user = os.environ.get('PL_USER')
44     pwd = os.environ.get('PL_PASS')
45      
46     if user and pwd:
47         return (user,pwd)
48     else:
49         return None
50
51 def find_bin(name, extra_path = None):
52     search = []
53     if "PATH" in os.environ:
54         search += os.environ["PATH"].split(":")
55     for pref in ("/", "/usr/", "/usr/local/"):
56         for d in ("bin", "sbin"):
57             search.append(pref + d)
58     if extra_path:
59         search += extra_path
60
61     for d in search:
62             try:
63                 os.stat(d + "/" + name)
64                 return d + "/" + name
65             except OSError, e:
66                 if e.errno != os.errno.ENOENT:
67                     raise
68     return None
69
70 def find_bin_or_die(name, extra_path = None):
71     r = find_bin(name)
72     if not r:
73         raise RuntimeError(("Cannot find `%s' command, impossible to " +
74                 "continue.") % name)
75     return r
76
77 # SSH stuff
78
79 import os, os.path, re, signal, shutil, socket, subprocess, tempfile
80 def gen_ssh_keypair(filename):
81     ssh_keygen = nepi.util.environ.find_bin_or_die("ssh-keygen")
82     args = [ssh_keygen, '-q', '-N', '', '-f', filename]
83     assert subprocess.Popen(args).wait() == 0
84     return filename, "%s.pub" % filename
85
86 def add_key_to_agent(filename):
87     ssh_add = nepi.util.environ.find_bin_or_die("ssh-add")
88     args = [ssh_add, filename]
89     null = file("/dev/null", "w")
90     assert subprocess.Popen(args, stderr = null).wait() == 0
91     null.close()
92
93 def get_free_port():
94     s = socket.socket()
95     s.bind(("127.0.0.1", 0))
96     port = s.getsockname()[1]
97     return port
98
99 _SSH_CONF = """ListenAddress 127.0.0.1:%d
100 Protocol 2
101 HostKey %s
102 UsePrivilegeSeparation no
103 PubkeyAuthentication yes
104 PasswordAuthentication no
105 AuthorizedKeysFile %s
106 UsePAM no
107 AllowAgentForwarding yes
108 PermitRootLogin yes
109 StrictModes no
110 PermitUserEnvironment yes
111 """
112
113 def gen_sshd_config(filename, port, server_key, auth_keys):
114     conf = open(filename, "w")
115     text = _SSH_CONF % (port, server_key, auth_keys)
116     conf.write(text)
117     conf.close()
118     return filename
119
120 def gen_auth_keys(pubkey, output, environ):
121     #opts = ['from="127.0.0.1/32"'] # fails in stupid yans setup
122     opts = []
123     for k, v in environ.items():
124         opts.append('environment="%s=%s"' % (k, v))
125
126     lines = file(pubkey).readlines()
127     pubkey = lines[0].split()[0:2]
128     out = file(output, "w")
129     out.write("%s %s %s\n" % (",".join(opts), pubkey[0], pubkey[1]))
130     out.close()
131     return output
132
133 def start_ssh_agent():
134     ssh_agent = nepi.util.environ.find_bin_or_die("ssh-agent")
135     proc = subprocess.Popen([ssh_agent], stdout = subprocess.PIPE)
136     (out, foo) = proc.communicate()
137     assert proc.returncode == 0
138     d = {}
139     for l in out.split("\n"):
140         match = re.search("^(\w+)=([^ ;]+);.*", l)
141         if not match:
142             continue
143         k, v = match.groups()
144         os.environ[k] = v
145         d[k] = v
146     return d
147
148 def stop_ssh_agent(data):
149     # No need to gather the pid, ssh-agent knows how to kill itself; after we
150     # had set up the environment
151     ssh_agent = nepi.util.environ.find_bin_or_die("ssh-agent")
152     null = file("/dev/null", "w")
153     proc = subprocess.Popen([ssh_agent, "-k"], stdout = null)
154     null.close()
155     assert proc.wait() == 0
156     for k in data:
157         del os.environ[k]
158
159 class test_environment(object):
160     def __init__(self):
161         sshd = find_bin_or_die("sshd")
162         environ = {}
163         if 'PYTHONPATH' in os.environ:
164             environ['PYTHONPATH'] = ":".join(map(os.path.realpath, 
165                 os.environ['PYTHONPATH'].split(":")))
166         if 'NEPI_NS3BINDINGS' in os.environ:
167             environ['NEPI_NS3BINDINGS'] = \
168                     os.path.realpath(os.environ['NEPI_NS3BINDINGS'])
169         if 'NEPI_NS3LIBRARY' in os.environ:
170             environ['NEPI_NS3LIBRARY'] = \
171                     os.path.realpath(os.environ['NEPI_NS3LIBRARY'])
172
173         self.dir = tempfile.mkdtemp()
174         self.server_keypair = gen_ssh_keypair(
175                 os.path.join(self.dir, "server_key"))
176         self.client_keypair = gen_ssh_keypair(
177                 os.path.join(self.dir, "client_key"))
178         self.authorized_keys = gen_auth_keys(self.client_keypair[1],
179                 os.path.join(self.dir, "authorized_keys"), environ)
180         self.port = get_free_port()
181         self.sshd_conf = gen_sshd_config(
182                 os.path.join(self.dir, "sshd_config"),
183                 self.port, self.server_keypair[0], self.authorized_keys)
184
185         self.sshd = subprocess.Popen([sshd, '-q', '-D', '-f', self.sshd_conf])
186         self.ssh_agent_vars = start_ssh_agent()
187         add_key_to_agent(self.client_keypair[0])
188
189     def __del__(self):
190         if self.sshd:
191             os.kill(self.sshd.pid, signal.SIGTERM)
192             self.sshd.wait()
193         if self.ssh_agent_vars:
194             stop_ssh_agent(self.ssh_agent_vars)
195         shutil.rmtree(self.dir)
196