Merge with head
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import os.path
8 import resource
9 import select
10 import socket
11 import sys
12 import subprocess
13 import threading
14 import time
15 import traceback
16 import signal
17 import re
18 import tempfile
19 import defer
20 import functools
21 import collections
22
# Names of the files the daemon creates inside its root directory:
# the UNIX control socket and the file that receives stdout/stderr.
CTRL_SOCK = "ctrl.sock"
STD_ERR = "stderr.log"

# Number of descriptors to sweep while daemonizing when the RLIMIT_NOFILE
# hard limit is reported as unlimited.
MAX_FD = 1024

# Control message that asks the server to shut down.
STOP_MSG = "STOP"

# Values accepted as Server(log_level = ...).
ERROR_LEVEL, DEBUG_LEVEL = 0, 1

# NEPI_TRACE=true|1|on (case-insensitive) makes the ssh/scp helpers print
# the commands they run and their output.
TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")

# Path of the null device, falling back to the POSIX name when the os
# module does not provide it.
DEV_NULL = getattr(os, "devnull", "/dev/null")
37
# Strings made only of these characters can be passed to a shell verbatim.
# The pattern is anchored with "\Z" rather than "$": "$" also matches just
# before a trailing newline, which would let "foo\n" through unquoted.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*\Z')

def shell_escape(s):
    """
    Escapes strings so that they are safe to use as command-line arguments

    Returns s unchanged when it only contains characters matched by
    SHELL_SAFE. Otherwise returns s wrapped in single quotes, with every
    single quote and every character outside printable ASCII (other than
    CR, LF and TAB) spliced in as a $'\\xNN' sequence.

    The empty string is returned as '' (two quotes), so that it still counts
    as one argument instead of vanishing from the command line.
    """
    if not s:
        return "''"

    if SHELL_SAFE.match(s):
        # safe string - no escaping needed
        return s

    # unsafe string - escape
    def escp(c):
        printable = 32 <= ord(c) < 127 or c in ('\r','\n','\t')
        if printable and c != "'":
            return c
        # close the quoted run, emit the character, reopen the quoted run
        return "'$'\\x%02x''" % (ord(c),)

    return "'%s'" % (''.join(map(escp, s)),)
54
55 def eintr_retry(func):
56     import functools
57     @functools.wraps(func)
58     def rv(*p, **kw):
59         retry = kw.pop("_retry", False)
60         for i in xrange(0 if retry else 4):
61             try:
62                 return func(*p, **kw)
63             except (select.error, socket.error), args:
64                 if args[0] == errno.EINTR:
65                     continue
66                 else:
67                     raise 
68             except OSError, e:
69                 if e.errno == errno.EINTR:
70                     continue
71                 else:
72                     raise
73         else:
74             return func(*p, **kw)
75     return rv
76
77 class Server(object):
78     def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
79         self._root_dir = root_dir
80         self._stop = False
81         self._ctrl_sock = None
82         self._log_level = log_level
83         self._rdbuf = ""
84         self._environment_setup = environment_setup
85
86     def run(self):
87         try:
88             if self.daemonize():
89                 self.post_daemonize()
90                 self.loop()
91                 self.cleanup()
92                 # ref: "os._exit(0)"
93                 # can not return normally after fork beacuse no exec was done.
94                 # This means that if we don't do a os._exit(0) here the code that 
95                 # follows the call to "Server.run()" in the "caller code" will be 
96                 # executed... but by now it has already been executed after the 
97                 # first process (the one that did the first fork) returned.
98                 os._exit(0)
99         except:
100             self.log_error()
101             self.cleanup()
102             os._exit(0)
103
104     def daemonize(self):
105         # pipes for process synchronization
106         (r, w) = os.pipe()
107         
108         # build root folder
109         root = os.path.normpath(self._root_dir)
110         if not os.path.exists(root):
111             os.makedirs(root, 0755)
112
113         pid1 = os.fork()
114         if pid1 > 0:
115             os.close(w)
116             while True:
117                 try:
118                     os.read(r, 1)
119                 except OSError, e: # pragma: no cover
120                     if e.errno == errno.EINTR:
121                         continue
122                     else:
123                         raise
124                 break
125             os.close(r)
126             # os.waitpid avoids leaving a <defunc> (zombie) process
127             st = os.waitpid(pid1, 0)[1]
128             if st:
129                 raise RuntimeError("Daemonization failed")
130             # return 0 to inform the caller method that this is not the 
131             # daemonized process
132             return 0
133         os.close(r)
134
135         # Decouple from parent environment.
136         os.chdir(self._root_dir)
137         os.umask(0)
138         os.setsid()
139
140         # fork 2
141         pid2 = os.fork()
142         if pid2 > 0:
143             # see ref: "os._exit(0)"
144             os._exit(0)
145
146         # close all open file descriptors.
147         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
148         if (max_fd == resource.RLIM_INFINITY):
149             max_fd = MAX_FD
150         for fd in range(3, max_fd):
151             if fd != w:
152                 try:
153                     os.close(fd)
154                 except OSError:
155                     pass
156
157         # Redirect standard file descriptors.
158         stdin = open(DEV_NULL, "r")
159         stderr = stdout = open(STD_ERR, "a", 0)
160         os.dup2(stdin.fileno(), sys.stdin.fileno())
161         # NOTE: sys.stdout.write will still be buffered, even if the file
162         # was opened with 0 buffer
163         os.dup2(stdout.fileno(), sys.stdout.fileno())
164         os.dup2(stderr.fileno(), sys.stderr.fileno())
165         
166         # setup environment
167         if self._environment_setup:
168             # parse environment variables and pass to child process
169             # do it by executing shell commands, in case there's some heavy setup involved
170             envproc = subprocess.Popen(
171                 [ "bash", "-c", 
172                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
173                         ( self._environment_setup, ) ],
174                 stdin = subprocess.PIPE, 
175                 stdout = subprocess.PIPE,
176                 stderr = subprocess.PIPE
177             )
178             out,err = envproc.communicate()
179
180             # parse new environment
181             if out:
182                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
183             
184                 # apply to current environment
185                 for name, value in environment.iteritems():
186                     os.environ[name] = value
187                 
188                 # apply pythonpath
189                 if 'PYTHONPATH' in environment:
190                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
191
192         # create control socket
193         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
194         self._ctrl_sock.bind(CTRL_SOCK)
195         self._ctrl_sock.listen(0)
196
197         # let the parent process know that the daemonization is finished
198         os.write(w, "\n")
199         os.close(w)
200         return 1
201
202     def post_daemonize(self):
203         pass
204
205     def loop(self):
206         while not self._stop:
207             conn, addr = self._ctrl_sock.accept()
208             conn.settimeout(5)
209             while not self._stop:
210                 try:
211                     msg = self.recv_msg(conn)
212                 except socket.timeout, e:
213                     self.log_error()
214                     break
215                     
216                 if msg == STOP_MSG:
217                     self._stop = True
218                     reply = self.stop_action()
219                 else:
220                     reply = self.reply_action(msg)
221                 
222                 try:
223                     self.send_reply(conn, reply)
224                 except socket.error:
225                     self.log_error()
226                     self.log_error("NOTICE: Awaiting for reconnection")
227                     break
228             try:
229                 conn.close()
230             except:
231                 # Doesn't matter
232                 self.log_error()
233
234     def recv_msg(self, conn):
235         data = [self._rdbuf]
236         chunk = data[0]
237         while '\n' not in chunk:
238             try:
239                 chunk = conn.recv(1024)
240             except (OSError, socket.error), e:
241                 if e[0] != errno.EINTR:
242                     raise
243                 else:
244                     continue
245             if chunk:
246                 data.append(chunk)
247             else:
248                 # empty chunk = EOF
249                 break
250         data = ''.join(data).split('\n',1)
251         while len(data) < 2:
252             data.append('')
253         data, self._rdbuf = data
254         
255         decoded = base64.b64decode(data)
256         return decoded.rstrip()
257
258     def send_reply(self, conn, reply):
259         encoded = base64.b64encode(reply)
260         conn.send("%s\n" % encoded)
261        
262     def cleanup(self):
263         try:
264             self._ctrl_sock.close()
265             os.remove(CTRL_SOCK)
266         except:
267             self.log_error()
268
269     def stop_action(self):
270         return "Stopping server"
271
272     def reply_action(self, msg):
273         return "Reply to: %s" % msg
274
275     def log_error(self, text = None, context = ''):
276         if text == None:
277             text = traceback.format_exc()
278         date = time.strftime("%Y-%m-%d %H:%M:%S")
279         if context:
280             context = " (%s)" % (context,)
281         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
282         return text
283
284     def log_debug(self, text):
285         if self._log_level == DEBUG_LEVEL:
286             date = time.strftime("%Y-%m-%d %H:%M:%S")
287             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
288
289 class Forwarder(object):
290     def __init__(self, root_dir = "."):
291         self._ctrl_sock = None
292         self._root_dir = root_dir
293         self._stop = False
294         self._rdbuf = ""
295
296     def forward(self):
297         self.connect()
298         print >>sys.stderr, "READY."
299         while not self._stop:
300             data = self.read_data()
301             self.send_to_server(data)
302             data = self.recv_from_server()
303             self.write_data(data)
304         self.disconnect()
305
306     def read_data(self):
307         return sys.stdin.readline()
308
309     def write_data(self, data):
310         sys.stdout.write(data)
311         # sys.stdout.write is buffered, this is why we need to do a flush()
312         sys.stdout.flush()
313
314     def send_to_server(self, data):
315         try:
316             self._ctrl_sock.send(data)
317         except (IOError, socket.error), e:
318             if e[0] == errno.EPIPE:
319                 self.connect()
320                 self._ctrl_sock.send(data)
321             else:
322                 raise e
323         encoded = data.rstrip() 
324         msg = base64.b64decode(encoded)
325         if msg == STOP_MSG:
326             self._stop = True
327
328     def recv_from_server(self):
329         data = [self._rdbuf]
330         chunk = data[0]
331         while '\n' not in chunk:
332             try:
333                 chunk = self._ctrl_sock.recv(1024)
334             except (OSError, socket.error), e:
335                 if e[0] != errno.EINTR:
336                     raise
337                 continue
338             if chunk:
339                 data.append(chunk)
340             else:
341                 # empty chunk = EOF
342                 break
343         data = ''.join(data).split('\n',1)
344         while len(data) < 2:
345             data.append('')
346         data, self._rdbuf = data
347         
348         return data+'\n'
349  
350     def connect(self):
351         self.disconnect()
352         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
353         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
354         self._ctrl_sock.connect(sock_addr)
355
356     def disconnect(self):
357         try:
358             self._ctrl_sock.close()
359         except:
360             pass
361
class Client(object):
    """
    Talks to a Server through a Forwarder subprocess.

    The forwarder is spawned locally, or on a remote host through ssh when
    host is given. Messages and replies travel base64-encoded, one per
    line, over the forwarder's stdin and stdout.
    """
    def __init__(self, root_dir = ".", host = None, port = None, user = None, 
            agent = None, environment_setup = ""):
        """
        root_dir: root directory of the server to talk to.
        host, port, user, agent: ssh connection parameters; when host is
            None the forwarder runs on the local machine.
        environment_setup: shell snippet passed on to popen_ssh_subprocess.
        """
        self.root_dir = root_dir
        self.addr = (host, port)
        self.user = user
        self.agent = agent
        self.environment_setup = environment_setup
        self._stopped = False
        self._deferreds = collections.deque()
        self.connect()
    
    def __del__(self):
        # connect() may have failed before any process was spawned
        process = getattr(self, "_process", None)
        if process is None:
            return
        if process.poll() is None:
            os.kill(process.pid, signal.SIGTERM)
        process.wait()
        
    def connect(self):
        """
        Spawns the forwarder and waits for it to report readiness.

        Raises RuntimeError if the remote process exited with an error and
        AssertionError if the forwarder did not answer 'READY.'.
        """
        root_dir = self.root_dir
        (host, port) = self.addr
        user = self.user
        agent = self.agent
        
        python_code = "from nepi.util import server;c=server.Forwarder(%r);\
                c.forward()" % (root_dir,)
        if host is not None:
            self._process = popen_ssh_subprocess(python_code, host, port, 
                    user, agent,
                    environment_setup = self.environment_setup)
            # popen_ssh_subprocess already waits for readiness
            if self._process.poll():
                err = self._process.stderr.read()
                raise RuntimeError("Client could not be reached: %s" % (err,))
        else:
            self._process = subprocess.Popen(
                    ["python", "-c", python_code],
                    stdin = subprocess.PIPE, 
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE
                )
                
        # Wait for the forwarder to be ready, otherwise nobody
        # will be able to connect to it
        helo = self._process.stderr.readline()
        if helo != 'READY.\n':
            raise AssertionError("Expected 'Ready.', got %r: %s" % (helo,
                    helo + self._process.stderr.read()))
        
    def send_msg(self, msg):
        """
        Sends msg to the server.

        If the forwarder is found dead, it is respawned and the message is
        sent again; a second failure propagates to the caller.
        """
        encoded = base64.b64encode(msg)
        data = "%s\n" % encoded
        
        try:
            self._process.stdin.write(data)
        except (IOError, ValueError):
            # dead process, poll it to un-zombify
            self._process.poll()
            
            # try again after reconnect
            # If it fails again, though, give up
            self.connect()
            self._process.stdin.write(data)

    def send_stop(self):
        """ Sends STOP_MSG and marks this client as stopped """
        self.send_msg(STOP_MSG)
        self._stopped = True

    def defer_reply(self, transform=None):
        """
        Returns a defer.Defer that reads the reply to the last message
        sent when it is evaluated, applying transform to it if given.

        Replies are matched to deferreds in the order they were requested.
        """
        defer_entry = []
        self._deferreds.append(defer_entry)
        return defer.Defer(
            functools.partial(self.read_reply, defer_entry, transform)
        )
        
    def _read_reply(self):
        """
        Reads and decodes one reply line from the forwarder.

        Raises RuntimeError if the forwarder closed its output.
        """
        data = self._process.stdout.readline()
        encoded = data.rstrip() 
        if not encoded:
            # empty == eof == dead process, poll it to un-zombify
            self._process.poll()
            
            raise RuntimeError("Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),))
        return base64.b64decode(encoded)
    
    def read_reply(self, which=None, transform=None):
        """
        Returns a reply, passed through transform if given.

        which: the entry of a deferred reply (as created by defer_reply),
            or None for a synchronous read. Pending deferred replies are
            read first, in order, so that every reply reaches its owner.
            Returns None if which is given but is no longer pending and
            holds no value.
        """
        # Test to see if someone did it already
        if which is not None and len(which):
            # Ok, they did it...
            # ...just return the deferred value
            if transform:
                return transform(which[0])
            else:
                return which[0]
        
        # Process all deferreds until the one we're looking for
        # or until the queue is empty
        while self._deferreds:
            try:
                deferred = self._deferreds.popleft()
            except IndexError:
                # emptied
                break
            
            deferred.append(self._read_reply())
            if deferred is which:
                # We reached the one we were looking for
                if transform:
                    return transform(deferred[0])
                else:
                    return deferred[0]
        
        if which is None:
            # They've requested a synchronous read
            if transform:
                return transform(self._read_reply())
            else:
                return self._read_reply()
480
481 def _make_server_key_args(server_key, host, port, args):
482     """ 
483     Returns a reference to the created temporary file, and adds the
484     corresponding arguments to the given argument list.
485     
486     Make sure to hold onto it until the process is done with the file
487     """
488     if port is not None:
489         host = '%s:%s' % (host,port)
490     # Create a temporary server key file
491     tmp_known_hosts = tempfile.NamedTemporaryFile()
492     
493     # Add the intended host key
494     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
495     
496     # If we're not in strict mode, add user-configured keys
497     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
498         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
499         if os.access(user_hosts_path, os.R_OK):
500             f = open(user_hosts_path, "r")
501             tmp_known_hosts.write(f.read())
502             f.close()
503         
504     tmp_known_hosts.flush()
505     
506     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
507     
508     return tmp_known_hosts
509
510 def popen_ssh_command(command, host, port, user, agent, 
511             stdin="", 
512             ident_key = None,
513             server_key = None,
514             tty = False):
515         """
516         Executes a remote commands, returns ((stdout,stderr),process)
517         """
518         if TRACE:
519             print "ssh", host, command
520         
521         tmp_known_hosts = None
522         args = ['ssh',
523                 # Don't bother with localhost. Makes test easier
524                 '-o', 'NoHostAuthenticationForLocalhost=yes',
525                 '-l', user, host]
526         if agent:
527             args.append('-A')
528         if port:
529             args.append('-p%d' % port)
530         if ident_key:
531             args.extend(('-i', ident_key))
532         if tty:
533             args.append('-t')
534         if server_key:
535             # Create a temporary server key file
536             tmp_known_hosts = _make_server_key_args(
537                 server_key, host, port, args)
538         args.append(command)
539
540         # connects to the remote host and starts a remote connection
541         proc = subprocess.Popen(args, 
542                 stdout = subprocess.PIPE,
543                 stdin = subprocess.PIPE, 
544                 stderr = subprocess.PIPE)
545         
546         # attach tempfile object to the process, to make sure the file stays
547         # alive until the process is finished with it
548         proc._known_hosts = tmp_known_hosts
549         
550         out, err = proc.communicate(stdin)
551         if TRACE:
552             print " -> ", out, err
553
554         return ((out, err), proc)
555  
556 def popen_scp(source, dest, 
557             port = None, 
558             agent = None, 
559             recursive = False,
560             ident_key = None,
561             server_key = None):
562         """
563         Copies from/to remote sites.
564         
565         Source and destination should have the user and host encoded
566         as per scp specs.
567         
568         If source is a file object, a special mode will be used to
569         create the remote file with the same contents.
570         
571         If dest is a file object, the remote file (source) will be
572         read and written into dest.
573         
574         In these modes, recursive cannot be True.
575         
576         Source can be a list of files to copy to a single destination,
577         in which case it is advised that the destination be a folder.
578         """
579         
580         if TRACE:
581             print "scp", source, dest
582         
583         if isinstance(source, file) and source.tell() == 0:
584             source = source.name
585         elif hasattr(source, 'read'):
586             tmp = tempfile.NamedTemporaryFile()
587             while True:
588                 buf = source.read(65536)
589                 if buf:
590                     tmp.write(buf)
591                 else:
592                     break
593             tmp.seek(0)
594             source = tmp.name
595         
596         if isinstance(source, file) or isinstance(dest, file) \
597                 or hasattr(source, 'read')  or hasattr(dest, 'write'):
598             assert not recursive
599             
600             # Parse source/destination as <user>@<server>:<path>
601             if isinstance(dest, basestring) and ':' in dest:
602                 remspec, path = dest.split(':',1)
603             elif isinstance(source, basestring) and ':' in source:
604                 remspec, path = source.split(':',1)
605             else:
606                 raise ValueError, "Both endpoints cannot be local"
607             user,host = remspec.rsplit('@',1)
608             tmp_known_hosts = None
609             
610             args = ['ssh', '-l', user, '-C',
611                     # Don't bother with localhost. Makes test easier
612                     '-o', 'NoHostAuthenticationForLocalhost=yes',
613                     host ]
614             if port:
615                 args.append('-P%d' % port)
616             if ident_key:
617                 args.extend(('-i', ident_key))
618             if server_key:
619                 # Create a temporary server key file
620                 tmp_known_hosts = _make_server_key_args(
621                     server_key, host, port, args)
622             
623             if isinstance(source, file) or hasattr(source, 'read'):
624                 args.append('cat > %s' % (shell_escape(path),))
625             elif isinstance(dest, file) or hasattr(dest, 'write'):
626                 args.append('cat %s' % (shell_escape(path),))
627             else:
628                 raise AssertionError, "Unreachable code reached! :-Q"
629             
630             # connects to the remote host and starts a remote connection
631             if isinstance(source, file):
632                 proc = subprocess.Popen(args, 
633                         stdout = open('/dev/null','w'),
634                         stderr = subprocess.PIPE,
635                         stdin = source)
636                 err = proc.stderr.read()
637                 proc._known_hosts = tmp_known_hosts
638                 eintr_retry(proc.wait)()
639                 return ((None,err), proc)
640             elif isinstance(dest, file):
641                 proc = subprocess.Popen(args, 
642                         stdout = open('/dev/null','w'),
643                         stderr = subprocess.PIPE,
644                         stdin = source)
645                 err = proc.stderr.read()
646                 proc._known_hosts = tmp_known_hosts
647                 eintr_retry(proc.wait)()
648                 return ((None,err), proc)
649             elif hasattr(source, 'read'):
650                 # file-like (but not file) source
651                 proc = subprocess.Popen(args, 
652                         stdout = open('/dev/null','w'),
653                         stderr = subprocess.PIPE,
654                         stdin = subprocess.PIPE)
655                 
656                 buf = None
657                 err = []
658                 while True:
659                     if not buf:
660                         buf = source.read(4096)
661                     if not buf:
662                         #EOF
663                         break
664                     
665                     rdrdy, wrdy, broken = select.select(
666                         [proc.stderr],
667                         [proc.stdin],
668                         [proc.stderr,proc.stdin])
669                     
670                     if proc.stderr in rdrdy:
671                         # use os.read for fully unbuffered behavior
672                         err.append(os.read(proc.stderr.fileno(), 4096))
673                     
674                     if proc.stdin in wrdy:
675                         proc.stdin.write(buf)
676                         buf = None
677                     
678                     if broken:
679                         break
680                 proc.stdin.close()
681                 err.append(proc.stderr.read())
682                     
683                 proc._known_hosts = tmp_known_hosts
684                 eintr_retry(proc.wait)()
685                 return ((None,''.join(err)), proc)
686             elif hasattr(dest, 'write'):
687                 # file-like (but not file) dest
688                 proc = subprocess.Popen(args, 
689                         stdout = subprocess.PIPE,
690                         stderr = subprocess.PIPE,
691                         stdin = open('/dev/null','w'))
692                 
693                 buf = None
694                 err = []
695                 while True:
696                     rdrdy, wrdy, broken = select.select(
697                         [proc.stderr, proc.stdout],
698                         [],
699                         [proc.stderr, proc.stdout])
700                     
701                     if proc.stderr in rdrdy:
702                         # use os.read for fully unbuffered behavior
703                         err.append(os.read(proc.stderr.fileno(), 4096))
704                     
705                     if proc.stdout in rdrdy:
706                         # use os.read for fully unbuffered behavior
707                         buf = os.read(proc.stdout.fileno(), 4096)
708                         dest.write(buf)
709                         
710                         if not buf:
711                             #EOF
712                             break
713                     
714                     if broken:
715                         break
716                 err.append(proc.stderr.read())
717                     
718                 proc._known_hosts = tmp_known_hosts
719                 eintr_retry(proc.wait)()
720                 return ((None,''.join(err)), proc)
721             else:
722                 raise AssertionError, "Unreachable code reached! :-Q"
723         else:
724             # Parse destination as <user>@<server>:<path>
725             if isinstance(dest, basestring) and ':' in dest:
726                 remspec, path = dest.split(':',1)
727             elif isinstance(source, basestring) and ':' in source:
728                 remspec, path = source.split(':',1)
729             else:
730                 raise ValueError, "Both endpoints cannot be local"
731             user,host = remspec.rsplit('@',1)
732             
733             # plain scp
734             tmp_known_hosts = None
735             args = ['scp', '-q', '-p', '-C',
736                     # Don't bother with localhost. Makes test easier
737                     '-o', 'NoHostAuthenticationForLocalhost=yes' ]
738             if port:
739                 args.append('-P%d' % port)
740             if recursive:
741                 args.append('-r')
742             if ident_key:
743                 args.extend(('-i', ident_key))
744             if server_key:
745                 # Create a temporary server key file
746                 tmp_known_hosts = _make_server_key_args(
747                     server_key, host, port, args)
748             if isinstance(source,list):
749                 args.extend(source)
750             else:
751                 args.append(source)
752             args.append(dest)
753
754             # connects to the remote host and starts a remote connection
755             proc = subprocess.Popen(args, 
756                     stdout = subprocess.PIPE,
757                     stdin = subprocess.PIPE, 
758                     stderr = subprocess.PIPE)
759             proc._known_hosts = tmp_known_hosts
760             
761             comm = proc.communicate()
762             eintr_retry(proc.wait)()
763             return (comm, proc)
764  
def popen_ssh_subprocess(python_code, host, port, user, agent, 
        python_path = None,
        ident_key = None,
        server_key = None,
        tty = False,
        environment_setup = "",
        waitcommand = False):
    """
    Runs python_code in a python interpreter started on host through ssh.
    
    The code is sent base64-encoded over the process' stdin, and the
    remote side answers "OK" once it received it (or, if waitcommand is
    true, once it finished executing it).
    
    python_path: appended to the remote PYTHONPATH.
    environment_setup: shell snippet run before starting python.
    port, user, agent, ident_key, server_key, tty: ssh options, as in
        popen_ssh_command.
    
    Returns the ssh process, with its stdin, stdout and stderr piped.
    Raises RuntimeError if the remote interpreter does not answer "OK".
    """
    cmd = ""
    if python_path:
        # str.replace returns a new string: keep the escaped version
        python_path = python_path.replace("'", r"'\''")
        # export, so that python sees the variable even if it was not
        # part of the remote environment already
        cmd = """export PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
        cmd += " ; "
    if environment_setup:
        cmd += environment_setup
        cmd += " ; "
    # Uncomment for debug (to run everything under strace)
    # We had to verify if strace works (cannot nest them)
    #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
    #cmd += "$CMD "
    #cmd += "strace -f -tt -s 200 -o strace$$.out "
    cmd += "python -c '"
    cmd += "import base64, os\n"
    cmd += "cmd = \"\"\n"
    cmd += "while True:\n"
    cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
    cmd += " if cmd[-1] == \"\\n\": break\n"
    cmd += "cmd = base64.b64decode(cmd)\n"
    # Uncomment for debug
    #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
    if not waitcommand:
        cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
    cmd += "exec(cmd)\n"
    if waitcommand:
        cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
    cmd += "'"
    
    tmp_known_hosts = None
    args = ['ssh',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes']
    if user:
        # a None user cannot be part of the argument list
        args.extend(('-l', user))
    args.append(host)
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    if tty:
        args.append('-t')
    if server_key:
        # Create a temporary server key file
        tmp_known_hosts = _make_server_key_args(
            server_key, host, port, args)
    args.append(cmd)

    # connects to the remote host and starts a remote rpyc connection
    proc = subprocess.Popen(args, 
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE, 
            stderr = subprocess.PIPE)
    # keep the temporary known_hosts file alive as long as the process
    proc._known_hosts = tmp_known_hosts
    
    # send the command to execute
    os.write(proc.stdin.fileno(),
            base64.b64encode(python_code) + "\n")
    
    # wait for the 3-byte sync message; a read may return less than asked
    msg = ""
    while len(msg) < 3:
        chunk = os.read(proc.stdout.fileno(), 3 - len(msg))
        if not chunk:
            # EOF
            break
        msg += chunk
    if msg != "OK\n":
        raise RuntimeError("Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
            msg, proc.stdout.read(), proc.stderr.read()))
    return proc
835