3 from neco.util.sshfuncs import *
def find_bin(name, extra_path = None):
    """Return the path of the first file called *name* on the search path.

    Directories are tried in this order: the entries of ``$PATH`` (when the
    variable is set), the ``bin`` and ``sbin`` directories under ``/``,
    ``/usr/`` and ``/usr/local/``, and finally the entries of *extra_path*.

    name       -- file name of the command to locate, e.g. ``"ssh-keygen"``.
    extra_path -- optional list of additional directories to search; a bare
                  string is treated as a single directory.

    Returns the path of the first existing candidate, or ``None`` when no
    directory contains *name*.  Any ``OSError`` other than "no such file" /
    "not a directory" is propagated to the caller.
    """
    # Function-scope import: the original went through `os.errno`, which was
    # never a documented attribute and no longer exists in Python 3.7+.
    import errno

    search = []
    if "PATH" in os.environ:
        search += os.environ["PATH"].split(os.pathsep)
    for pref in ("/", "/usr/", "/usr/local/"):
        for d in ("bin", "sbin"):
            search.append(pref + d)
    if extra_path:
        if isinstance(extra_path, str):
            # Extending a list with a bare string would add single characters.
            extra_path = [extra_path]
        search.extend(extra_path)

    for d in search:
        if not d:
            # An empty PATH entry would otherwise probe "/<name>".
            continue
        candidate = os.path.join(d, name)
        try:
            os.stat(candidate)
        except OSError as e:
            # A missing file, or a search entry that is not a directory, only
            # means "not here"; anything else is a real error.
            if e.errno not in (errno.ENOENT, errno.ENOTDIR):
                raise
        else:
            return candidate
    return None
# Strict lookup of an external command: raises RuntimeError naming the command
# when it cannot be found.  Takes the same (name, extra_path) arguments as
# find_bin.
# NOTE(review): only the signature and the first half of the error message are
# visible in this excerpt.  The lookup itself and the success-path return are
# presumably on the omitted lines -- callers in this file use the result
# directly as the executable path for subprocess.Popen.
36 def find_bin_or_die(name, extra_path = None):
39 raise RuntimeError(("Cannot find `%s' command, impossible to " +
def gen_ssh_keypair(filename):
    """Generate a passphrase-less SSH key pair with ``ssh-keygen``.

    filename -- path of the private key to create; ssh-keygen writes the
                matching public key next to it as ``<filename>.pub``.

    Returns the tuple ``(private_key_path, public_key_path)``.
    Raises AssertionError when ssh-keygen exits with a non-zero status, and
    whatever find_bin_or_die raises when the command cannot be located.
    """
    ssh_keygen = find_bin_or_die("ssh-keygen")
    # -q: quiet, -N '': empty passphrase, -f: output file.
    args = [ssh_keygen, '-q', '-N', '', '-f', filename]
    # The command must run outside of an `assert` statement: asserts are
    # stripped under `python -O`, which used to skip key generation entirely.
    status = subprocess.Popen(args).wait()
    if status != 0:
        # Same exception type the original assert produced.
        raise AssertionError(
            "%s exited with status %d" % (ssh_keygen, status))
    return filename, "%s.pub" % filename
def add_key_to_agent(filename):
    """Load the private key *filename* into the ssh-agent using ``ssh-add``.

    ssh-add is launched with the current process environment, so the agent
    variables must already be present in ``os.environ``.  Its stderr is
    discarded.

    Raises AssertionError when ssh-add exits with a non-zero status, and
    whatever find_bin_or_die raises when the command cannot be located.
    """
    ssh_add = find_bin_or_die("ssh-add")
    args = [ssh_add, filename]
    # open()/os.devnull replace the Python 2-only file("/dev/null") builtin;
    # the context manager closes the handle even if Popen raises.
    with open(os.devnull, "w") as null:
        # Run outside of `assert` so the call survives `python -O`.
        status = subprocess.Popen(args, stderr = null).wait()
    if status != 0:
        # Same exception type the original assert produced.
        raise AssertionError(
            "%s exited with status %d" % (ssh_add, status))
58 s.bind(("127.0.0.1", 0))
59 port = s.getsockname()[1]
# %-format template for the configuration file of the throw-away sshd.
# gen_sshd_config fills it with the tuple (port, server_key, auth_keys); the
# first placeholder is the listening port on 127.0.0.1.
# NOTE(review): the excerpt omits several template lines, including the ones
# that consume server_key and auth_keys and the closing triple quote.
# NOTE(review): recent OpenSSH releases deprecate UsePrivilegeSeparation --
# confirm the sshd version under test still accepts it.
62 _SSH_CONF = """ListenAddress 127.0.0.1:%d
65 UsePrivilegeSeparation no
66 PubkeyAuthentication yes
67 PasswordAuthentication no
70 AllowAgentForwarding yes
73 PermitUserEnvironment yes
def gen_sshd_config(filename, port, server_key, auth_keys):
    """Render the ``_SSH_CONF`` template into *filename*.

    filename   -- path of the sshd configuration file to create (overwritten).
    port       -- TCP port for the ``ListenAddress 127.0.0.1:<port>`` line.
    server_key -- substituted into the template's second placeholder; the
                  caller in this file passes the server private-key path.
    auth_keys  -- substituted into the template's third placeholder; the
                  caller in this file passes the authorized_keys path.

    Returns *filename*, which the caller hands to ``sshd -f``.
    """
    # Format before opening: a formatting error then neither leaks an open
    # handle nor leaves a truncated, empty configuration file behind.
    text = _SSH_CONF % (port, server_key, auth_keys)
    with open(filename, "w") as conf:
        conf.write(text)
    return filename
def gen_auth_keys(pubkey, output, environ):
    """Write an ``authorized_keys`` file that admits a single public key.

    pubkey  -- path of a public key file.  Only the first two
               whitespace-separated fields of its first line (key type and
               key data) are used; a trailing comment is dropped.
    output  -- path of the authorized_keys file to create (overwritten).
    environ -- mapping of variables; each entry becomes an
               ``environment="NAME=value"`` option in front of the key.
               ``None`` is treated as an empty mapping.

    Returns *output*.
    Raises ValueError when the first line of *pubkey* does not contain both a
    key type and key data; *output* is not touched in that case.
    """
    # A from="127.0.0.1/32" restriction was tried here and disabled because it
    # failed in one of the original author's setups.
    opts = ['environment="%s=%s"' % (k, v)
            for k, v in (environ or {}).items()]

    # open() replaces the Python 2-only file() builtin; the context managers
    # close both handles deterministically.
    with open(pubkey) as src:
        fields = src.readline().split()[0:2]
    if len(fields) != 2:
        raise ValueError("%s does not start with '<type> <key>'" % pubkey)

    # Only emit the options field when there are options, so the line never
    # starts with a stray space.
    if opts:
        fields.insert(0, ",".join(opts))
    with open(output, "w") as out:
        out.write(" ".join(fields) + "\n")
    return output
def start_ssh_agent():
    """Start an ``ssh-agent`` and export its connection variables.

    ssh-agent prints shell assignments of the form ``NAME=value; ...`` on
    stdout.  Every such assignment is copied into ``os.environ`` so that the
    ssh-add and ``ssh-agent -k`` calls made elsewhere in this file, which run
    without an explicit environment, reach this agent.

    Returns a dict with the exported variables.
    Raises AssertionError when ssh-agent exits with a non-zero status, and
    whatever find_bin_or_die raises when the command cannot be located.
    """
    ssh_agent = find_bin_or_die("ssh-agent")
    # universal_newlines=True makes communicate() return text on both
    # Python 2 and Python 3, so the output can be matched against a str regex.
    proc = subprocess.Popen([ssh_agent], stdout = subprocess.PIPE,
            universal_newlines = True)
    out, _ = proc.communicate()
    if proc.returncode != 0:
        # Explicit check: a bare assert would vanish under `python -O`.
        raise AssertionError(
            "%s exited with status %d" % (ssh_agent, proc.returncode))

    # NOTE(review): the tail of the original loop was not visible in the
    # reviewed excerpt.  Exporting into os.environ and returning the mapping
    # is what the visible callers rely on (the result is truth-tested and
    # passed to stop_ssh_agent) -- confirm against the full file.
    agent_vars = {}
    for line in out.splitlines():
        # Raw string: "\w" in a plain literal is an invalid escape sequence.
        match = re.search(r"^(\w+)=([^ ;]+);.*", line)
        if not match:
            continue
        k, v = match.groups()
        os.environ[k] = v
        agent_vars[k] = v
    return agent_vars
def stop_ssh_agent(data):
    """Terminate the running ssh-agent with ``ssh-agent -k``.

    data -- the value obtained from start_ssh_agent.
            NOTE(review): the code visible in the reviewed excerpt never
            reads it; confirm against the full file whether it should also
            be used to clean up ``os.environ``.

    Raises AssertionError when ``ssh-agent -k`` exits with a non-zero status,
    and whatever find_bin_or_die raises when the command cannot be located.
    """
    # No need to gather the pid: ssh-agent knows how to kill itself once the
    # environment has been set up.
    ssh_agent = find_bin_or_die("ssh-agent")
    # open()/os.devnull replace the Python 2-only file("/dev/null") builtin;
    # the context manager closes the handle even if Popen raises.
    with open(os.devnull, "w") as null:
        # Wait outside of `assert`: under `python -O` the original never
        # reaped the child process.
        status = subprocess.Popen([ssh_agent, "-k"], stdout = null).wait()
    if status != 0:
        # Same exception type the original assert produced.
        raise AssertionError(
            "%s -k exited with status %d" % (ssh_agent, status))
# Throw-away SSH fixture.  It creates a temporary directory holding a server
# key pair, a client key pair, an authorized_keys file and an sshd_config,
# launches a private sshd with that configuration, then starts an ssh-agent
# and loads the client key into it.
# NOTE(review): this excerpt is not contiguous -- the method headers and the
# definition of the local name `environ` used below are on lines not shown.
122 class test_environment(object):
124 sshd = find_bin_or_die("sshd")
# Everything generated below lives under this temporary directory.
126 self.dir = tempfile.mkdtemp()
# Each key pair is a (private_path, public_path) tuple.
127 self.server_keypair = gen_ssh_keypair(
128 os.path.join(self.dir, "server_key"))
129 self.client_keypair = gen_ssh_keypair(
130 os.path.join(self.dir, "client_key"))
# Index 1 is the client's public key: it is the only key authorized.
131 self.authorized_keys = gen_auth_keys(self.client_keypair[1],
132 os.path.join(self.dir, "authorized_keys"), environ)
# NOTE(review): the port is chosen before sshd is started, so another process
# could in principle claim it in between.
133 self.port = get_free_port()
# Index 0 is the server's private key.
134 self.sshd_conf = gen_sshd_config(
135 os.path.join(self.dir, "sshd_config"),
136 self.port, self.server_keypair[0], self.authorized_keys)
# -q: quiet, -D: do not detach (the Popen handle stays attached to the server
# process), -f: use the generated configuration file.
138 self.sshd = subprocess.Popen([sshd, '-q', '-D', '-f', self.sshd_conf])
139 self.ssh_agent_vars = start_ssh_agent()
# Index 0 is the client's private key.
140 add_key_to_agent(self.client_keypair[0])
# Teardown: terminate sshd, stop the agent if one was started, and delete the
# temporary directory with everything generated above.
# NOTE(review): the header of the enclosing method is not visible here.
144 os.kill(self.sshd.pid, signal.SIGTERM)
146 if self.ssh_agent_vars:
147 stop_ssh_agent(self.ssh_agent_vars)
148 shutil.rmtree(self.dir)
# Tests for the ssh helper functions (rexec, rcopy, rspawn, rcheck_pid,
# rstatus), each run against a fresh test_environment fixture.
# NOTE(review): this excerpt is not contiguous; `host`, `pidfile` and most
# call arguments are defined on lines not shown.
# NOTE(review): assertEquals is a deprecated alias of assertEqual and no
# longer exists in Python 3.12+.
150 class SSHfuncsTestCase(unittest.TestCase):
# rexec: the same command is run locally and through rexec (agent
# authentication, on the fixture's port); the two captured stdouts must match.
151 def test_rexec(self):
152 env = test_environment()
153 user = getpass.getuser()
158 plocal = subprocess.Popen(command, stdout=subprocess.PIPE,
159 stdin=subprocess.PIPE)
160 outlocal, errlocal = plocal.communicate()
162 (outremote, errrmote), premote = rexec(command, host, user,
163 port = env.port, agent = True)
165 self.assertEquals(outlocal, outremote)
# rcopy: a temporary directory containing one file, plus one loose temporary
# file, are copied to user@host:destdir; the names found under destdir must
# match the basenames of the originals.
167 def test_rcopy(self):
168 env = test_environment()
169 user = getpass.getuser()
172 # create some temp files and directories to copy
173 dirpath = tempfile.mkdtemp()
174 f = tempfile.NamedTemporaryFile(dir=dirpath, delete=False)
177 f1 = tempfile.NamedTemporaryFile(delete=False)
181 source = [dirpath, f1.name]
182 destdir = tempfile.mkdtemp()
183 dest = "%s@%s:%s" % (user, host, destdir)
184 rcopy(source, dest, port = env.port, agent = True)
# recls is the os.path.walk visitor; `files` is passed to it as the walk
# argument (its body is not visible here).
# NOTE(review): os.path.walk exists only in Python 2; os.walk is the portable
# replacement.
187 def recls(files, dirname, names):
189 os.path.walk(destdir, recls, files)
191 origfiles = map(lambda s: os.path.basename(s), [dirpath, f.name, f1.name])
193 self.assertEquals(sorted(origfiles), sorted(files))
# Process management: "ping localhost" is started through rspawn, its
# pid/ppid are read back with rcheck_pid, and rstatus must report RUNNING
# first and FINISHED at the end.
# NOTE(review): whatever ends the process between the two rstatus checks is
# on lines not shown.
195 def test_rproc_manage(self):
196 env = test_environment()
197 user = getpass.getuser()
199 command = "ping localhost"
201 f = tempfile.NamedTemporaryFile(delete=False)
204 (out,err), proc = rspawn(
214 (pid, ppid) = rcheck_pid(pidfile,
220 status = rstatus(pid, ppid,
226 self.assertEquals(status, RUNNING)
234 status = rstatus(pid, ppid,
240 self.assertEquals(status, FINISHED)
243 if __name__ == '__main__':