3 from neco.util.sshfuncs import rexec, rcopy, rspawn, rcheckpid, rstatus, rkill,\
18 def find_bin(name, extra_path = None):
20 if "PATH" in os.environ:
21 search += os.environ["PATH"].split(":")
22 for pref in ("/", "/usr/", "/usr/local/"):
23 for d in ("bin", "sbin"):
24 search.append(pref + d)
30 os.stat(d + "/" + name)
33 if e.errno != os.errno.ENOENT:
37 def find_bin_or_die(name, extra_path = None):
40 raise RuntimeError(("Cannot find `%s' command, impossible to " +
44 def gen_ssh_keypair(filename):
45 ssh_keygen = find_bin_or_die("ssh-keygen")
46 args = [ssh_keygen, '-q', '-N', '', '-f', filename]
47 assert subprocess.Popen(args).wait() == 0
48 return filename, "%s.pub" % filename
50 def add_key_to_agent(filename):
51 ssh_add = find_bin_or_die("ssh-add")
52 args = [ssh_add, filename]
53 null = file("/dev/null", "w")
54 assert subprocess.Popen(args, stderr = null).wait() == 0
59 s.bind(("127.0.0.1", 0))
60 port = s.getsockname()[1]
63 _SSH_CONF = """ListenAddress 127.0.0.1:%d
66 UsePrivilegeSeparation no
67 PubkeyAuthentication yes
68 PasswordAuthentication no
71 AllowAgentForwarding yes
74 PermitUserEnvironment yes
77 def gen_sshd_config(filename, port, server_key, auth_keys):
78 conf = open(filename, "w")
79 text = _SSH_CONF % (port, server_key, auth_keys)
84 def gen_auth_keys(pubkey, output, environ):
85 #opts = ['from="127.0.0.1/32"'] # fails in stupid yans setup
87 for k, v in environ.items():
88 opts.append('environment="%s=%s"' % (k, v))
90 lines = file(pubkey).readlines()
91 pubkey = lines[0].split()[0:2]
92 out = file(output, "w")
93 out.write("%s %s %s\n" % (",".join(opts), pubkey[0], pubkey[1]))
97 def start_ssh_agent():
98 ssh_agent = find_bin_or_die("ssh-agent")
99 proc = subprocess.Popen([ssh_agent], stdout = subprocess.PIPE)
100 (out, foo) = proc.communicate()
101 assert proc.returncode == 0
103 for l in out.split("\n"):
104 match = re.search("^(\w+)=([^ ;]+);.*", l)
107 k, v = match.groups()
112 def stop_ssh_agent(data):
113 # No need to gather the pid, ssh-agent knows how to kill itself; after we
114 # had set up the environment
115 ssh_agent = find_bin_or_die("ssh-agent")
116 null = file("/dev/null", "w")
117 proc = subprocess.Popen([ssh_agent, "-k"], stdout = null)
119 assert proc.wait() == 0
123 class test_environment(object):
125 sshd = find_bin_or_die("sshd")
127 self.dir = tempfile.mkdtemp()
128 self.server_keypair = gen_ssh_keypair(
129 os.path.join(self.dir, "server_key"))
130 self.client_keypair = gen_ssh_keypair(
131 os.path.join(self.dir, "client_key"))
132 self.authorized_keys = gen_auth_keys(self.client_keypair[1],
133 os.path.join(self.dir, "authorized_keys"), environ)
134 self.port = get_free_port()
135 self.sshd_conf = gen_sshd_config(
136 os.path.join(self.dir, "sshd_config"),
137 self.port, self.server_keypair[0], self.authorized_keys)
139 self.sshd = subprocess.Popen([sshd, '-q', '-D', '-f', self.sshd_conf])
140 self.ssh_agent_vars = start_ssh_agent()
141 add_key_to_agent(self.client_keypair[0])
145 os.kill(self.sshd.pid, signal.SIGTERM)
147 if self.ssh_agent_vars:
148 stop_ssh_agent(self.ssh_agent_vars)
149 shutil.rmtree(self.dir)
151 class SSHfuncsTestCase(unittest.TestCase):
152 def test_rexec(self):
153 env = test_environment()
154 user = getpass.getuser()
159 plocal = subprocess.Popen(command, stdout=subprocess.PIPE,
160 stdin=subprocess.PIPE)
161 outlocal, errlocal = plocal.communicate()
163 (outremote, errrmote), premote = rexec(command, host, user,
164 port = env.port, agent = True)
166 self.assertEquals(outlocal, outremote)
168 def test_rcopy(self):
169 env = test_environment()
170 user = getpass.getuser()
173 # create some temp files and directories to copy
174 dirpath = tempfile.mkdtemp()
175 f = tempfile.NamedTemporaryFile(dir=dirpath, delete=False)
178 f1 = tempfile.NamedTemporaryFile(delete=False)
182 source = [dirpath, f1.name]
183 destdir = tempfile.mkdtemp()
184 dest = "%s@%s:%s" % (user, host, destdir)
185 rcopy(source, dest, port = env.port, agent = True)
188 def recls(files, dirname, names):
190 os.path.walk(destdir, recls, files)
192 origfiles = map(lambda s: os.path.basename(s), [dirpath, f.name, f1.name])
194 self.assertEquals(sorted(origfiles), sorted(files))
196 def test_rproc_manage(self):
197 env = test_environment()
198 user = getpass.getuser()
200 command = "ping localhost"
202 f = tempfile.NamedTemporaryFile(delete=False)
205 (out,err), proc = rspawn(
215 (pid, ppid) = rcheckpid(pidfile,
221 status = rstatus(pid, ppid,
227 self.assertEquals(status, RUNNING)
235 status = rstatus(pid, ppid,
241 self.assertEquals(status, FINISHED)
244 if __name__ == '__main__':