SSH daemonization test fix, along with environment setup fixes.
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import os.path
8 import resource
9 import select
10 import socket
11 import sys
12 import subprocess
13 import threading
14 import time
15 import traceback
16 import signal
17 import re
18 import tempfile
19 import defer
20 import functools
21 import collections
22
# Names of the files the server creates; they are opened relative to the
# current directory, which the daemon changes to its root directory
CTRL_SOCK = "ctrl.sock"
STD_ERR = "stderr.log"

# Upper bound of descriptors to close when the system reports no limit
MAX_FD = 1024

# Control message that requests a server shutdown
STOP_MSG = "STOP"

# Logging verbosity levels
ERROR_LEVEL = 0
DEBUG_LEVEL = 1

# Command tracing is enabled through the NEPI_TRACE environment variable
TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")

# Path of the null device, with a fallback for interpreters lacking os.devnull
DEV_NULL = getattr(os, "devnull", "/dev/null")
37
# Matches non-empty strings made only of characters that need no quoting.
# \Z is used instead of $ because $ also matches right before a trailing
# newline, which would let "name\n" through unescaped.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]+\Z')

def shell_escape(s):
    """
    Escapes strings so that they are safe to use as command-line arguments

    Strings made only of safe characters are returned unchanged. Anything
    else (including the empty string, which would otherwise vanish from the
    command line) is wrapped in single quotes; single quotes and
    non-printable characters are emitted as $'\\xNN' sequences spliced
    between the quoted parts.
    """
    if SHELL_SAFE.match(s):
        # safe string - no escaping needed
        return s

    # unsafe string - escape
    def escp(c):
        printable = 32 <= ord(c) < 127 or c in ('\r','\n','\t')
        if printable and c != "'":
            return c
        # close the quote, emit the character in $'..' form, reopen the quote
        return "'$'\\x%02x''" % (ord(c),)

    return "'%s'" % (''.join(map(escp, s)),)
54
def eintr_retry(func):
    """
    Wraps func so that calls interrupted by a signal (EINTR) are retried.

    The wrapper accepts an extra keyword argument, _retry. When it is false
    (the default), up to 4 interrupted attempts are retried before a final
    attempt whose exception, if any, propagates. When it is true, only that
    single final attempt is made.

    Errors other than EINTR always propagate immediately.
    """
    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        attempts = 0 if retry else 4
        for _ in range(attempts):
            try:
                return func(*p, **kw)
            except (select.error, socket.error, EnvironmentError) as e:
                # select.error carries the code only in args[0];
                # the EnvironmentError family also exposes it as errno
                code = getattr(e, "errno", None)
                if code is None and e.args:
                    code = e.args[0]
                if code == errno.EINTR:
                    continue
                raise
        # last attempt: whatever happens here is the caller's problem
        return func(*p, **kw)
    return rv
76
class Server(object):
    """
    Daemonized request/reply server listening on a UNIX control socket.

    Messages travel as base64-encoded, newline-terminated lines. Each
    received message is answered with the result of reply_action(), except
    for STOP_MSG, which is answered with stop_action() and ends the loop.

    Subclasses are expected to override post_daemonize(), reply_action()
    and stop_action() as needed.
    """
    def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = None):
        self._root_dir = root_dir
        self._stop = False
        self._ctrl_sock = None
        self._log_level = log_level
        # bytes received past the end of the last message
        self._rdbuf = ""
        # shell snippet run before capturing the daemon's environment
        self._environment_setup = environment_setup

    def run(self):
        """
        Daemonizes and, in the daemon process only, serves requests until
        stopped. The daemon process never returns from this method.
        """
        try:
            if self.daemonize():
                self.post_daemonize()
                self.loop()
                self.cleanup()
                # ref: "os._exit(0)"
                # can not return normally after fork because no exec was done.
                # This means that if we don't do a os._exit(0) here the code that 
                # follows the call to "Server.run()" in the "caller code" will be 
                # executed... but by now it has already been executed after the 
                # first process (the one that did the first fork) returned.
                os._exit(0)
        except:
            # Deliberately catches everything: a forked process must never
            # fall back into the caller's code.
            # NOTE(review): this also runs in the original process when
            # daemonize() raises there, terminating the caller - confirm
            # that this is intended.
            self.log_error()
            self.cleanup()
            os._exit(0)

    def daemonize(self):
        """
        Double-forks into a daemon rooted at the server's root directory.

        Returns 0 in the original process (once the daemon reported that it
        is ready) and 1 in the daemon process. Raises RuntimeError in the
        original process if the intermediate child exits with an error.
        """
        # pipes for process synchronization
        (r, w) = os.pipe()
        
        # build root folder
        root = os.path.normpath(self._root_dir)
        if not os.path.exists(root):
            os.makedirs(root, 0o755)

        pid1 = os.fork()
        if pid1 > 0:
            os.close(w)
            # block until the daemon signals readiness (or the pipe closes)
            while True:
                try:
                    os.read(r, 1)
                except OSError as e: # pragma: no cover
                    if e.errno == errno.EINTR:
                        continue
                    else:
                        raise
                break
            os.close(r)
            # os.waitpid avoids leaving a <defunc> (zombie) process
            st = os.waitpid(pid1, 0)[1]
            if st:
                raise RuntimeError("Daemonization failed")
            # return 0 to inform the caller method that this is not the 
            # daemonized process
            return 0
        os.close(r)

        # Decouple from parent environment.
        os.chdir(self._root_dir)
        os.umask(0)
        os.setsid()

        # fork 2
        pid2 = os.fork()
        if pid2 > 0:
            # see ref: "os._exit(0)"
            os._exit(0)

        # close all open file descriptors, except the standard ones and
        # the synchronization pipe
        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if (max_fd == resource.RLIM_INFINITY):
            max_fd = MAX_FD
        os.closerange(3, max(w, 3))
        os.closerange(max(w + 1, 3), max_fd)

        # Redirect standard file descriptors.
        stdin = open(DEV_NULL, "r")
        stderr = stdout = open(STD_ERR, "a", 0)
        os.dup2(stdin.fileno(), sys.stdin.fileno())
        # NOTE: sys.stdout.write will still be buffered, even if the file
        # was opened with 0 buffer
        os.dup2(stdout.fileno(), sys.stdout.fileno())
        os.dup2(stderr.fileno(), sys.stderr.fileno())
        
        # setup environment
        if self._environment_setup:
            # parse environment variables and pass to child process
            # do it by executing shell commands, in case there's some heavy setup involved
            envproc = subprocess.Popen(
                [ "bash", "-c", 
                    "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
                        ( self._environment_setup, ) ],
                stdin = subprocess.PIPE, 
                stdout = subprocess.PIPE,
                stderr = subprocess.PIPE
            )
            out,err = envproc.communicate()

            # parse new environment
            environment = self._parse_environment(out)
            if not environment:
                self.log_error("Environment setup produced no variables: %s" % (err,))
            
            # apply to current environment
            for name, value in environment.items():
                os.environ[name] = value
            
            # apply pythonpath
            if 'PYTHONPATH' in environment:
                sys.path = environment['PYTHONPATH'].split(':') + sys.path

        # create control socket
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._ctrl_sock.bind(CTRL_SOCK)
        self._ctrl_sock.listen(0)

        # let the parent process know that the daemonization is finished
        os.write(w, "\n")
        os.close(w)
        return 1

    @staticmethod
    def _parse_environment(out):
        """
        Decodes the environment dump printed by the setup subprocess:
        entries separated by \\x01, each one being name \\x02 value.
        Returns a dict; malformed entries are ignored.
        """
        # drop the newline appended by print, it is not part of the last value
        if out.endswith("\n"):
            out = out[:-1]
        environment = {}
        for entry in out.split("\x01"):
            name, sep, value = entry.partition("\x02")
            if sep and name:
                environment[name] = value
        return environment

    def post_daemonize(self):
        """ Hook invoked in the daemon process right after daemonization """
        pass

    def loop(self):
        """
        Accepts one connection at a time and answers its messages until
        STOP_MSG is received. A receive timeout or a failed send drops the
        connection and waits for a new one.
        """
        while not self._stop:
            conn, addr = self._ctrl_sock.accept()
            conn.settimeout(5)
            while not self._stop:
                try:
                    msg = self.recv_msg(conn)
                except socket.timeout:
                    self.log_error()
                    break
                    
                if msg == STOP_MSG:
                    self._stop = True
                    reply = self.stop_action()
                else:
                    reply = self.reply_action(msg)
                
                try:
                    self.send_reply(conn, reply)
                except socket.error:
                    self.log_error()
                    self.log_error("NOTICE: Awaiting for reconnection")
                    break
            try:
                conn.close()
            except:
                # Doesn't matter
                self.log_error()

    def recv_msg(self, conn):
        """
        Reads one newline-terminated message from conn and returns it
        base64-decoded and right-stripped. Data received after the newline
        is kept for the next call. At end of stream, whatever was received
        so far (possibly nothing) is decoded and returned.
        """
        data = [self._rdbuf]
        chunk = data[0]
        while '\n' not in chunk:
            try:
                chunk = conn.recv(1024)
            except (OSError, socket.error) as e:
                if e.args and e.args[0] == errno.EINTR:
                    # interrupted by a signal, just try again
                    continue
                raise
            if chunk:
                data.append(chunk)
            else:
                # empty chunk = EOF
                break
        data = ''.join(data).split('\n',1)
        while len(data) < 2:
            data.append('')
        data, self._rdbuf = data
        
        decoded = base64.b64decode(data)
        return decoded.rstrip()

    def send_reply(self, conn, reply):
        """ Sends reply through conn as one base64-encoded line """
        encoded = base64.b64encode(reply)
        # sendall: a plain send() may transmit only part of the line
        conn.sendall("%s\n" % encoded)
       
    def cleanup(self):
        """ Closes the control socket and removes its file, logging failures """
        try:
            self._ctrl_sock.close()
            os.remove(CTRL_SOCK)
        except:
            self.log_error()

    def stop_action(self):
        """ Returns the reply sent in response to STOP_MSG """
        return "Stopping server"

    def reply_action(self, msg):
        """ Returns the reply for msg; meant to be overridden """
        return "Reply to: %s" % msg

    def log_error(self, text = None, context = ''):
        """
        Writes an error entry to stderr and returns the logged text.
        When text is not given, the current exception's traceback is used.
        """
        if text is None:
            text = traceback.format_exc()
        date = time.strftime("%Y-%m-%d %H:%M:%S")
        if context:
            context = " (%s)" % (context,)
        sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
        return text

    def log_debug(self, text):
        """ Writes a debug entry to stderr if the log level is DEBUG_LEVEL """
        if self._log_level == DEBUG_LEVEL:
            date = time.strftime("%Y-%m-%d %H:%M:%S")
            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
287
class Forwarder(object):
    """
    Relays lines between standard input/output and a server's control
    socket located in root_dir.

    Each line read from stdin is sent to the server verbatim, and the
    server's one-line reply is written to stdout. Forwarding ends after
    relaying STOP_MSG or when stdin reaches end of file.
    """
    def __init__(self, root_dir = "."):
        self._ctrl_sock = None
        self._root_dir = root_dir
        self._stop = False
        # bytes received past the end of the last reply
        self._rdbuf = ""

    def forward(self):
        """ Connects to the server and relays messages until stopped """
        self.connect()
        # tell whoever launched us that messages can now be sent
        sys.stderr.write("READY.\n")
        sys.stderr.flush()
        while not self._stop:
            data = self.read_data()
            if not data:
                # EOF on stdin: nobody is left to talk to, and waiting for
                # a reply to a message never sent would block forever
                break
            self.send_to_server(data)
            data = self.recv_from_server()
            self.write_data(data)
        self.disconnect()

    def read_data(self):
        """ Returns the next line from stdin ('' at end of file) """
        return sys.stdin.readline()

    def write_data(self, data):
        """ Writes data to stdout and flushes it right away """
        sys.stdout.write(data)
        # sys.stdout.write is buffered, this is why we need to do a flush()
        sys.stdout.flush()

    def send_to_server(self, data):
        """
        Sends data (an encoded line) to the server, reconnecting once if
        the connection was broken. Flags the forwarder to stop when the
        line decodes to STOP_MSG.
        """
        try:
            # sendall: a plain send() may transmit only part of the line
            self._ctrl_sock.sendall(data)
        except (IOError, socket.error) as e:
            if e.args and e.args[0] == errno.EPIPE:
                self.connect()
                self._ctrl_sock.sendall(data)
            else:
                raise
        encoded = data.rstrip() 
        msg = base64.b64decode(encoded)
        if msg == STOP_MSG:
            self._stop = True

    def recv_from_server(self):
        """
        Reads one line from the server and returns it, still encoded and
        newline-terminated. Data received after the newline is kept for
        the next call. At end of stream, what was received so far is
        returned with a newline appended.
        """
        data = [self._rdbuf]
        chunk = data[0]
        while '\n' not in chunk:
            try:
                chunk = self._ctrl_sock.recv(1024)
            except (OSError, socket.error) as e:
                if e.args and e.args[0] == errno.EINTR:
                    # interrupted by a signal, just try again
                    continue
                raise
            if chunk:
                data.append(chunk)
            else:
                # empty chunk = EOF
                break
        data = ''.join(data).split('\n',1)
        while len(data) < 2:
            data.append('')
        data, self._rdbuf = data
        
        return data+'\n'
 
    def connect(self):
        """ (Re)opens the connection to the control socket in root_dir """
        self.disconnect()
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
        self._ctrl_sock.connect(sock_addr)

    def disconnect(self):
        """ Closes the connection, if there is one. Never raises I/O errors """
        if self._ctrl_sock is None:
            return
        try:
            self._ctrl_sock.close()
        except EnvironmentError:
            # best effort: the socket is being discarded anyway
            pass
360
class Client(object):
    """
    Talks to a Server through a Forwarder subprocess, which is launched
    locally or, when a host is given, remotely over ssh.

    Messages and replies are base64-encoded lines exchanged through the
    subprocess' stdin and stdout. Replies may be read synchronously with
    read_reply() or lazily through defer_reply().
    """
    def __init__(self, root_dir = ".", host = None, port = None, user = None, 
            agent = None, environment_setup = ""):
        self.root_dir = root_dir
        self.addr = (host, port)
        self.user = user
        self.agent = agent
        self.environment_setup = environment_setup
        self._stopped = False
        # pending reply slots, in the order the replies will arrive
        self._deferreds = collections.deque()
        self.connect()
    
    def __del__(self):
        # connect() may have failed before the process was ever created
        process = getattr(self, "_process", None)
        if process is None:
            return
        if process.poll() is None:
            os.kill(process.pid, signal.SIGTERM)
        process.wait()
        
    def connect(self):
        """
        Launches the forwarder subprocess and waits until it reports to be
        ready. Raises RuntimeError if the remote process exited with an
        error, and AssertionError if the ready message is not received.
        """
        root_dir = self.root_dir
        (host, port) = self.addr
        user = self.user
        agent = self.agent
        
        python_code = "from nepi.util import server;c=server.Forwarder(%r);\
                c.forward()" % (root_dir,)
        if host is not None:
            self._process = popen_ssh_subprocess(python_code, host, port, 
                    user, agent,
                    environment_setup = self.environment_setup)
            # popen_ssh_subprocess already waits for readiness
            if self._process.poll():
                err = self._process.stderr.read()
                raise RuntimeError("Client could not be reached: %s" % \
                        err)
        else:
            self._process = subprocess.Popen(
                    ["python", "-c", python_code],
                    stdin = subprocess.PIPE, 
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE
                )
                
        # Wait for the forwarder to be ready, otherwise nobody
        # will be able to connect to it
        helo = self._process.stderr.readline()
        if helo != 'READY.\n':
            raise AssertionError("Expected 'Ready.', got %r: %s" % (helo,
                    helo + self._process.stderr.read()))
        
    def send_msg(self, msg):
        """
        Sends msg to the server. If the forwarder died, it is relaunched
        once and the send is retried; a second failure propagates.
        """
        encoded = base64.b64encode(msg)
        data = "%s\n" % encoded
        
        try:
            self._process.stdin.write(data)
        except (IOError, ValueError):
            # dead process, poll it to un-zombify
            self._process.poll()
            
            # try again after reconnect
            # If it fails again, though, give up
            self.connect()
            self._process.stdin.write(data)

    def send_stop(self):
        """ Asks the server to stop """
        self.send_msg(STOP_MSG)
        self._stopped = True

    def defer_reply(self, transform=None):
        """
        Reserves the next reply and returns a deferred object that reads
        it (applying transform, if given) only when it is needed.
        """
        defer_entry = []
        self._deferreds.append(defer_entry)
        return defer.Defer(
            functools.partial(self.read_reply, defer_entry, transform)
        )
        
    def _read_reply(self):
        """
        Reads and decodes the next reply line. Raises RuntimeError if the
        forwarder closed its output.
        """
        data = self._process.stdout.readline()
        encoded = data.rstrip() 
        if not encoded:
            # empty == eof == dead process, poll it to un-zombify
            self._process.poll()
            
            raise RuntimeError("Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),))
        return base64.b64decode(encoded)
    
    def read_reply(self, which=None, transform=None):
        """
        Returns a reply, passed through transform if given.

        With which (a slot created by defer_reply), returns that slot's
        reply, first reading the replies of all the slots queued before
        it. Without it, all queued slots are filled and then one more
        reply is read and returned.
        """
        # Test to see if someone did it already
        if which is not None and len(which):
            # Ok, they did it...
            # ...just return the deferred value
            if transform:
                return transform(which[0])
            else:
                return which[0]
        
        # Process all deferreds until the one we're looking for
        # or until the queue is empty
        while self._deferreds:
            try:
                deferred = self._deferreds.popleft()
            except IndexError:
                # emptied
                break
            
            deferred.append(self._read_reply())
            if deferred is which:
                # We reached the one we were looking for
                if transform:
                    return transform(deferred[0])
                else:
                    return deferred[0]
        
        if which is None:
            # They've requested a synchronous read
            if transform:
                return transform(self._read_reply())
            else:
                return self._read_reply()
479
480 def _make_server_key_args(server_key, host, port, args):
481     """ 
482     Returns a reference to the created temporary file, and adds the
483     corresponding arguments to the given argument list.
484     
485     Make sure to hold onto it until the process is done with the file
486     """
487     if port is not None:
488         host = '%s:%s' % (host,port)
489     # Create a temporary server key file
490     tmp_known_hosts = tempfile.NamedTemporaryFile()
491     
492     # Add the intended host key
493     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
494     
495     # If we're not in strict mode, add user-configured keys
496     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
497         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
498         if os.access(user_hosts_path, os.R_OK):
499             f = open(user_hosts_path, "r")
500             tmp_known_hosts.write(f.read())
501             f.close()
502         
503     tmp_known_hosts.flush()
504     
505     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
506     
507     return tmp_known_hosts
508
def popen_ssh_command(command, host, port, user, agent, 
            stdin="", 
            ident_key = None,
            server_key = None,
            tty = False):
    """
    Executes a remote command, returns ((stdout,stderr),process)
    
    The command runs on host through ssh, logging in as user (ssh's own
    default when user is empty). stdin is fed to the command's standard
    input, and the call returns only after the command finished.
    
    agent enables agent forwarding, ident_key selects an identity file,
    server_key pins the expected host key and tty forces tty allocation.
    """
    if TRACE:
        sys.stdout.write("ssh %s %s\n" % (host, command))
    
    tmp_known_hosts = None
    args = ['ssh',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes']
    if user:
        # without a user, let ssh pick its default instead of passing None
        args.extend(('-l', user))
    args.append(host)
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    if tty:
        args.append('-t')
    if server_key:
        # Create a temporary server key file
        tmp_known_hosts = _make_server_key_args(
            server_key, host, port, args)
    args.append(command)

    # connects to the remote host and starts a remote connection
    proc = subprocess.Popen(args, 
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE, 
            stderr = subprocess.PIPE)
    
    # attach tempfile object to the process, to make sure the file stays
    # alive until the process is finished with it
    proc._known_hosts = tmp_known_hosts
    
    out, err = proc.communicate(stdin)
    if TRACE:
        sys.stdout.write(" ->  %s %s\n" % (out, err))

    return ((out, err), proc)
554  
def popen_scp(source, dest, 
            port = None, 
            agent = None, 
            recursive = False,
            ident_key = None,
            server_key = None):
    """
    Copies from/to remote sites.
    
    Source and destination should have the user and host encoded
    as per scp specs.
    
    If source is a file object, a special mode will be used to
    create the remote file with the same contents.
    
    If dest is a file object, the remote file (source) will be
    read and written into dest.
    
    In these modes, recursive cannot be True.
    
    Source can be a list of files to copy to a single destination,
    in which case it is advised that the destination be a folder.
    
    Returns ((stdout,stderr),process) once the copy finished. In the
    file object modes stdout is always None.
    
    Raises ValueError when neither endpoint is remote.
    """
    if TRACE:
        sys.stdout.write("scp %s %s\n" % (source, dest))
    
    # Real files can be handed to the subprocess directly; other file-like
    # objects have to be pumped through pipes
    source_is_file = isinstance(source, file)
    dest_is_file = isinstance(dest, file)
    source_is_stream = source_is_file or hasattr(source, 'read')
    dest_is_stream = dest_is_file or hasattr(dest, 'write')
    stream_mode = source_is_stream or dest_is_stream
    
    if stream_mode:
        assert not recursive
    
    # Parse source/destination as <user>@<server>:<path>
    if isinstance(dest, basestring) and ':' in dest:
        remspec, path = dest.split(':',1)
    elif isinstance(source, basestring) and ':' in source:
        remspec, path = source.split(':',1)
    else:
        raise ValueError("Both endpoints cannot be local")
    user,host = remspec.rsplit('@',1)
    tmp_known_hosts = None
    
    if not stream_mode:
        # plain scp
        args = ['scp', '-q', '-p', '-C',
                # Don't bother with localhost. Makes test easier
                '-o', 'NoHostAuthenticationForLocalhost=yes' ]
        if port:
            args.append('-P%d' % port)
        if recursive:
            args.append('-r')
        if ident_key:
            args.extend(('-i', ident_key))
        if server_key:
            # Create a temporary server key file
            tmp_known_hosts = _make_server_key_args(
                server_key, host, port, args)
        if isinstance(source,list):
            args.extend(source)
        else:
            args.append(source)
        args.append(dest)

        # connects to the remote host and starts a remote connection
        proc = subprocess.Popen(args, 
                stdout = subprocess.PIPE,
                stdin = subprocess.PIPE, 
                stderr = subprocess.PIPE)
        # keep the known hosts file alive as long as the process object
        proc._known_hosts = tmp_known_hosts
        
        comm = proc.communicate()
        eintr_retry(proc.wait)()
        return (comm, proc)
    
    # File object modes: run "cat" remotely through ssh
    args = ['ssh', '-l', user, '-C',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes',
            host ]
    if port:
        # ssh takes the port as -p (-P is scp's spelling)
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    if server_key:
        # Create a temporary server key file
        tmp_known_hosts = _make_server_key_args(
            server_key, host, port, args)
    
    if source_is_stream:
        # local file object -> remote file
        args.append('cat > %s' % (shell_escape(path),))
        
        devnull = open('/dev/null','w')
        try:
            if source_is_file:
                stdin = source
            else:
                stdin = subprocess.PIPE
            proc = subprocess.Popen(args, 
                    stdout = devnull,
                    stderr = subprocess.PIPE,
                    stdin = stdin)
        finally:
            # the child holds its own copy of the descriptor
            devnull.close()
        proc._known_hosts = tmp_known_hosts
        
        if source_is_file:
            # ssh reads straight from the file, just collect its complaints
            err = proc.stderr.read()
        else:
            # file-like (but not file) source
            buf = None
            errs = []
            while True:
                if not buf:
                    buf = source.read(4096)
                if not buf:
                    #EOF
                    break
                
                rdrdy, wrdy, broken = select.select(
                    [proc.stderr],
                    [proc.stdin],
                    [proc.stderr,proc.stdin])
                
                if proc.stderr in rdrdy:
                    # use os.read for fully unbuffered behavior
                    errs.append(os.read(proc.stderr.fileno(), 4096))
                
                if proc.stdin in wrdy:
                    proc.stdin.write(buf)
                    buf = None
                
                if broken:
                    break
            proc.stdin.close()
            errs.append(proc.stderr.read())
            err = ''.join(errs)
    else:
        # remote file -> local file object
        args.append('cat %s' % (shell_escape(path),))
        
        if dest_is_file:
            # the child writes through the descriptor, so nothing may be
            # left pending in the file object's own buffer
            dest.flush()
        
        devnull = open('/dev/null','r')
        try:
            if dest_is_file:
                stdout = dest
            else:
                stdout = subprocess.PIPE
            proc = subprocess.Popen(args, 
                    stdout = stdout,
                    stderr = subprocess.PIPE,
                    stdin = devnull)
        finally:
            # the child holds its own copy of the descriptor
            devnull.close()
        proc._known_hosts = tmp_known_hosts
        
        if dest_is_file:
            # ssh writes straight into the file, just collect its complaints
            err = proc.stderr.read()
        else:
            # file-like (but not file) dest
            errs = []
            while True:
                rdrdy, wrdy, broken = select.select(
                    [proc.stderr, proc.stdout],
                    [],
                    [proc.stderr, proc.stdout])
                
                if proc.stderr in rdrdy:
                    # use os.read for fully unbuffered behavior
                    errs.append(os.read(proc.stderr.fileno(), 4096))
                
                if proc.stdout in rdrdy:
                    # use os.read for fully unbuffered behavior
                    buf = os.read(proc.stdout.fileno(), 4096)
                    dest.write(buf)
                    
                    if not buf:
                        #EOF
                        break
                
                if broken:
                    break
            errs.append(proc.stderr.read())
            err = ''.join(errs)
    
    eintr_retry(proc.wait)()
    return ((None,err), proc)
750  
def popen_ssh_subprocess(python_code, host, port, user, agent, 
        python_path = None,
        ident_key = None,
        server_key = None,
        tty = False,
        environment_setup = "",
        waitcommand = False):
    """
    Runs python_code in a python interpreter launched on host through ssh.
    
    The code is sent base64-encoded through the remote interpreter's stdin.
    The remote side answers "OK": before running the code by default, or
    after it finished when waitcommand is true. This call returns the
    process object (with its pipes open) once that answer arrived.
    
    python_path is appended to the remote PYTHONPATH and environment_setup
    is a shell snippet run before the interpreter. The remaining options
    are as for popen_ssh_command.
    
    Raises RuntimeError if the answer is not received.
    """
    cmd = ""
    if python_path:
        # strings are immutable: the escaped value must be kept
        python_path = python_path.replace("'", r"'\''")
        cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
        cmd += " ; "
    if environment_setup:
        cmd += environment_setup
        cmd += " ; "
    # Uncomment for debug (to run everything under strace)
    # We had to verify if strace works (cannot nest them)
    #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
    #cmd += "$CMD "
    #cmd += "strace -f -tt -s 200 -o strace$$.out "
    cmd += "python -c '"
    cmd += "import base64, os\n"
    cmd += "cmd = \"\"\n"
    cmd += "while True:\n"
    cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
    cmd += " if cmd[-1] == \"\\n\": break\n"
    cmd += "cmd = base64.b64decode(cmd)\n"
    # Uncomment for debug
    #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
    if not waitcommand:
        cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
    cmd += "exec(cmd)\n"
    if waitcommand:
        cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
    cmd += "'"
    
    tmp_known_hosts = None
    args = ['ssh',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes']
    if user:
        # without a user, let ssh pick its default instead of passing None
        args.extend(('-l', user))
    args.append(host)
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    if tty:
        args.append('-t')
    if server_key:
        # Create a temporary server key file
        tmp_known_hosts = _make_server_key_args(
            server_key, host, port, args)
    args.append(cmd)

    # connects to the remote host and starts the remote interpreter
    proc = subprocess.Popen(args, 
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE, 
            stderr = subprocess.PIPE)
    # keep the known hosts file alive as long as the process object
    proc._known_hosts = tmp_known_hosts
    
    # send the command to execute
    os.write(proc.stdin.fileno(),
            base64.b64encode(python_code) + "\n")
    
    # wait for the sync message; a read may return less than requested,
    # so keep reading until it is complete or the stream ends
    out_fd = proc.stdout.fileno()
    msg = ""
    while len(msg) < 3:
        chunk = os.read(out_fd, 3 - len(msg))
        if not chunk:
            break
        msg += chunk
    if msg != "OK\n":
        raise RuntimeError("Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
            msg, proc.stdout.read(), proc.stderr.read()))
    return proc
821