Made automatic proxy reconnection more robust
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import os.path
8 import resource
9 import select
10 import socket
11 import sys
12 import subprocess
13 import threading
14 import time
15 import traceback
16 import signal
17 import re
18 import tempfile
19 import defer
20 import functools
21 import collections
22
# File name of the UNIX control socket the server binds and clients connect to
CTRL_SOCK = "ctrl.sock"
# File name of the log that receives the daemon's stdout and stderr
STD_ERR = "stderr.log"
# Descriptor limit used by the daemon when RLIMIT_NOFILE is unbounded
MAX_FD = 1024

# Control message that makes the server leave its main loop
STOP_MSG = "STOP"

# Logging verbosity levels
ERROR_LEVEL = 0
DEBUG_LEVEL = 1

# Command tracing is on when NEPI_TRACE is "true", "1" or "on" (any case)
TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")

# Interpreters without os.devnull get the POSIX path
DEV_NULL = getattr(os, "devnull", "/dev/null")
37
# Strings made only of these characters need no quoting at all.
# NOTE: the pattern is anchored with \Z and not with $, because $ also
# matches right before a trailing newline, which would let strings such as
# "abc\n" through unescaped.
SHELL_SAFE = re.compile(r'^[-a-zA-Z0-9_=+:.,/]*\Z')

def shell_escape(s):
    """
    Escapes strings so that they are safe to use as command-line arguments

    Strings that only contain characters from SHELL_SAFE (including the
    empty string) are returned untouched. Anything else is wrapped in single
    quotes; single quotes and characters that are neither printable ASCII
    nor CR/LF/TAB are emitted as $'\\xNN' fragments spliced between the
    quoted parts.
    """
    if SHELL_SAFE.match(s):
        # safe string - no escaping needed
        return s

    def escp(c):
        printable = 32 <= ord(c) < 127 or c in ('\r', '\n', '\t')
        if printable and c != "'":
            return c
        # close the quote, emit the character in $'..' form, reopen the quote
        return "'$'\\x%02x''" % (ord(c),)

    return "'%s'" % (''.join([escp(c) for c in s]),)
54
def eintr_retry(func):
    """
    Wraps func so that calls interrupted by a signal (EINTR) are repeated.

    The wrapper makes up to four guarded attempts: select.error,
    socket.error and OSError carrying errno EINTR trigger another attempt,
    anything else propagates immediately. If all four guarded attempts were
    interrupted, a final unguarded call is made and its result or exception
    is passed through as is.

    The wrapper consumes a "_retry" keyword argument (never forwarded to
    func): when true, the guarded attempts are skipped and func is called
    exactly once.
    """
    @functools.wraps(func)
    def rv(*p, **kw):
        guarded_attempts = 4
        if kw.pop("_retry", False):
            guarded_attempts = 0

        for _attempt in range(guarded_attempts):
            try:
                return func(*p, **kw)
            except (select.error, socket.error):
                # these carry the errno as their first argument
                if sys.exc_info()[1].args[0] != errno.EINTR:
                    raise
            except OSError:
                if sys.exc_info()[1].errno != errno.EINTR:
                    raise

        # last chance: whatever happens now is the caller's business
        return func(*p, **kw)
    return rv
76
class Server(object):
    """
    Daemonized request/reply server listening on a UNIX control socket.

    Messages and replies travel as one base64-encoded line each. Subclasses
    customize behavior by overriding post_daemonize(), reply_action() and
    stop_action().
    """

    def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
        # directory that becomes the daemon's working directory; the control
        # socket and the log file are created relative to it
        self._root_dir = root_dir
        # set when a STOP message has been received
        self._stop = False
        # listening control socket, created by daemonize()
        self._ctrl_sock = None
        self._log_level = log_level
        # bytes received after the last complete message line
        self._rdbuf = ""

    def run(self):
        """
        Daemonizes and, in the daemon process only, serves requests until
        a STOP message arrives.

        The calling process returns normally once the daemon reported that
        it is ready. The daemon process never returns from this method.
        """
        try:
            if self.daemonize():
                self.post_daemonize()
                self.loop()
                self.cleanup()
                # ref: "os._exit(0)"
                # can not return normally after fork because no exec was done.
                # This means that if we don't do a os._exit(0) here the code that 
                # follows the call to "Server.run()" in the "caller code" will be 
                # executed... but by now it has already been executed after the 
                # first process (the one that did the first fork) returned.
                os._exit(0)
        except:
            # Deliberately catches everything: a forked process must not
            # fall back into the caller's code.
            self.log_error()
            self.cleanup()
            os._exit(0)

    def daemonize(self):
        """
        Double-forks into a daemon that owns the control socket.

        Returns 0 in the original (calling) process and 1 in the daemon.
        Raises RuntimeError in the calling process if the intermediate
        child exits with a non-zero status.
        """
        # pipes for process synchronization
        (r, w) = os.pipe()
        
        # build root folder
        root = os.path.normpath(self._root_dir)
        if not os.path.exists(root):
            # mode 0755 (rwxr-xr-x), spelled so that every interpreter parses it
            os.makedirs(root, int("0755", 8))

        pid1 = os.fork()
        if pid1 > 0:
            os.close(w)
            # block until the daemon signals readiness (or the pipe is closed)
            while True:
                try:
                    os.read(r, 1)
                except OSError: # pragma: no cover
                    if sys.exc_info()[1].errno == errno.EINTR:
                        continue
                    raise
                break
            os.close(r)
            # os.waitpid avoids leaving a <defunc> (zombie) process
            st = os.waitpid(pid1, 0)[1]
            if st:
                raise RuntimeError("Daemonization failed")
            # return 0 to inform the caller method that this is not the 
            # daemonized process
            return 0
        os.close(r)

        # Decouple from parent environment.
        os.chdir(self._root_dir)
        os.umask(0)
        os.setsid()

        # fork 2
        pid2 = os.fork()
        if pid2 > 0:
            # see ref: "os._exit(0)"
            os._exit(0)

        # close all open file descriptors.
        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if (max_fd == resource.RLIM_INFINITY):
            max_fd = MAX_FD
        for fd in range(3, max_fd):
            # the write end of the pipe is still needed to notify the parent
            if fd != w:
                try:
                    os.close(fd)
                except OSError:
                    pass

        # Redirect standard file descriptors.
        stdin = open(DEV_NULL, "r")
        stderr = stdout = open(STD_ERR, "a", 0)
        os.dup2(stdin.fileno(), sys.stdin.fileno())
        # NOTE: sys.stdout.write will still be buffered, even if the file
        # was opened with 0 buffer
        os.dup2(stdout.fileno(), sys.stdout.fileno())
        os.dup2(stderr.fileno(), sys.stderr.fileno())

        # create control socket
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._ctrl_sock.bind(CTRL_SOCK)
        self._ctrl_sock.listen(0)

        # let the parent process know that the daemonization is finished
        os.write(w, "\n")
        os.close(w)
        return 1

    def post_daemonize(self):
        """ Hook invoked in the daemon right after daemonize(); no-op here """
        pass

    def loop(self):
        """
        Accepts one connection at a time and answers its messages until
        a STOP message is received.

        A receive timeout or any socket error on either direction drops the
        current connection and goes back to waiting for a new one.
        """
        while not self._stop:
            conn, addr = self._ctrl_sock.accept()
            conn.settimeout(5)
            while not self._stop:
                try:
                    msg = self.recv_msg(conn)
                except socket.error:
                    # socket.timeout is a socket.error too; a reset peer is
                    # handled like a failed send: wait for a reconnection
                    # instead of taking the whole daemon down
                    self.log_error()
                    break
                    
                if msg == STOP_MSG:
                    self._stop = True
                    reply = self.stop_action()
                else:
                    reply = self.reply_action(msg)
                
                try:
                    self.send_reply(conn, reply)
                except socket.error:
                    self.log_error()
                    self.log_error("NOTICE: Awaiting for reconnection")
                    break
            try:
                conn.close()
            except Exception:
                # Doesn't matter
                self.log_error()

    def recv_msg(self, conn):
        """
        Reads one newline-terminated message from conn and returns it
        base64-decoded and right-stripped.

        Bytes received past the newline are kept in self._rdbuf for the
        next call. EOF before a newline yields whatever was received.
        """
        data = [self._rdbuf]
        chunk = data[0]
        while '\n' not in chunk:
            try:
                chunk = conn.recv(1024)
            except (OSError, socket.error):
                if sys.exc_info()[1].args[0] != errno.EINTR:
                    raise
                # interrupted by a signal: just try again
                continue
            if chunk:
                data.append(chunk)
            else:
                # empty chunk = EOF
                break
        data = ''.join(data).split('\n',1)
        while len(data) < 2:
            data.append('')
        data, self._rdbuf = data
        
        decoded = base64.b64decode(data)
        return decoded.rstrip()

    def send_reply(self, conn, reply):
        """ Sends reply over conn as a single base64-encoded line """
        encoded = base64.b64encode(reply)
        # sendall, not send: send may transmit only part of a large reply
        conn.sendall("%s\n" % encoded)
       
    def cleanup(self):
        """ Closes the control socket and removes its file, logging failures """
        try:
            self._ctrl_sock.close()
            os.remove(CTRL_SOCK)
        except Exception:
            self.log_error()

    def stop_action(self):
        """ Returns the reply sent for a STOP message """
        return "Stopping server"

    def reply_action(self, msg):
        """ Returns the reply for a regular message; meant to be overridden """
        return "Reply to: %s" % msg

    def log_error(self, text = None, context = ''):
        """
        Writes a timestamped error entry to stderr and returns its text.

        When text is None the traceback of the exception being handled is
        logged. A non-empty context is shown in parentheses after "ERROR".
        """
        if text is None:
            text = traceback.format_exc()
        date = time.strftime("%Y-%m-%d %H:%M:%S")
        if context:
            context = " (%s)" % (context,)
        sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
        return text

    def log_debug(self, text):
        """ Writes a timestamped debug entry to stderr at DEBUG_LEVEL """
        if self._log_level == DEBUG_LEVEL:
            date = time.strftime("%Y-%m-%d %H:%M:%S")
            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
261
class Forwarder(object):
    """
    Bridges stdin/stdout to the server's UNIX control socket.

    Every line read from stdin is sent to the server verbatim, and the
    matching reply line is written to stdout.
    """

    def __init__(self, root_dir = "."):
        self._ctrl_sock = None
        # directory where the server created its control socket
        self._root_dir = root_dir
        # set once a STOP message has been forwarded
        self._stop = False
        # bytes received after the last complete reply line
        self._rdbuf = ""

    def forward(self):
        """
        Connects to the server, announces readiness on stderr and relays
        messages until a STOP message is forwarded or stdin reaches EOF.
        """
        self.connect()
        sys.stderr.write("READY.\n")
        while not self._stop:
            data = self.read_data()
            if not data:
                # EOF on stdin: nobody is left to talk to, and looping on
                # would spin forever relaying empty messages
                break
            self.send_to_server(data)
            data = self.recv_from_server()
            self.write_data(data)
        self.disconnect()

    def read_data(self):
        """ Returns the next line from stdin ('' on EOF) """
        return sys.stdin.readline()

    def write_data(self, data):
        """ Writes data to stdout and flushes it right away """
        sys.stdout.write(data)
        # sys.stdout.write is buffered, this is why we need to do a flush()
        sys.stdout.flush()

    def send_to_server(self, data):
        """
        Sends an already-encoded message line to the server.

        On a broken pipe the connection is re-established once and the
        message is sent again; any other error propagates. Forwarding a
        STOP message flags the forwarder to stop afterwards.
        """
        try:
            # sendall, not send: send may transmit only part of the message
            self._ctrl_sock.sendall(data)
        except (IOError, socket.error):
            if sys.exc_info()[1].args[0] == errno.EPIPE:
                self.connect()
                self._ctrl_sock.sendall(data)
            else:
                # bare raise keeps the original traceback
                raise
        encoded = data.rstrip() 
        msg = base64.b64decode(encoded)
        if msg == STOP_MSG:
            self._stop = True

    def recv_from_server(self):
        """
        Returns the next reply line from the server, newline included.

        Bytes received past the newline are kept in self._rdbuf for the
        next call. EOF before a newline yields whatever was received.
        """
        data = [self._rdbuf]
        chunk = data[0]
        while '\n' not in chunk:
            try:
                chunk = self._ctrl_sock.recv(1024)
            except (OSError, socket.error):
                if sys.exc_info()[1].args[0] != errno.EINTR:
                    raise
                # interrupted by a signal: just try again
                continue
            if chunk:
                data.append(chunk)
            else:
                # empty chunk = EOF
                break
        data = ''.join(data).split('\n',1)
        while len(data) < 2:
            data.append('')
        data, self._rdbuf = data
        
        return data+'\n'
 
    def connect(self):
        """ (Re)connects to the control socket inside the root directory """
        self.disconnect()
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
        self._ctrl_sock.connect(sock_addr)

    def disconnect(self):
        """ Closes the control socket, if there is one; never raises """
        if self._ctrl_sock is None:
            return
        try:
            self._ctrl_sock.close()
        except (OSError, socket.error):
            # best effort: the connection is being dropped anyway
            pass
334
class Client(object):
    """
    Talks to a Server through a Forwarder subprocess.

    The forwarder runs locally when no host is given, and through ssh
    otherwise. Messages are written to the forwarder's stdin and replies
    are read from its stdout, one base64-encoded line each.
    """

    def __init__(self, root_dir = ".", host = None, port = None, user = None, 
            agent = None, environment_setup = ""):
        self.root_dir = root_dir
        self.addr = (host, port)
        self.user = user
        self.agent = agent
        self.environment_setup = environment_setup
        self._stopped = False
        # one list per pending deferred reply, in request order; a list
        # gets its reply appended once it has been read
        self._deferreds = collections.deque()
        self.connect()
    
    def __del__(self):
        # connect() may have failed before a process was ever created
        process = getattr(self, "_process", None)
        if process is None:
            return
        try:
            if process.poll() is None:
                os.kill(process.pid, signal.SIGTERM)
            process.wait()
        except OSError:
            # the process went away on its own in the meantime
            pass
        
    def connect(self):
        """
        Spawns the forwarder and waits until it announces readiness.

        Raises RuntimeError if the remote forwarder exited with an error
        status, and AssertionError if the first line on the forwarder's
        stderr is not the readiness marker.
        """
        root_dir = self.root_dir
        (host, port) = self.addr
        user = self.user
        agent = self.agent
        
        python_code = "from nepi.util import server;c=server.Forwarder(%r);\
                c.forward()" % (root_dir,)
        if host is not None:
            self._process = popen_ssh_subprocess(python_code, host, port, 
                    user, agent,
                    environment_setup = self.environment_setup)
            # popen_ssh_subprocess already waits for readiness
            if self._process.poll():
                err = self._process.stderr.read()
                raise RuntimeError("Client could not be reached: %s" % (err,))
        else:
            self._process = subprocess.Popen(
                    ["python", "-c", python_code],
                    stdin = subprocess.PIPE, 
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE
                )
                
        # Wait for the forwarder to be ready, otherwise nobody
        # will be able to connect to it
        helo = self._process.stderr.readline()
        if helo != 'READY.\n':
            raise AssertionError("Expected 'Ready.', got %r: %s" % (helo,
                    helo + self._process.stderr.read()))
        
    def send_msg(self, msg):
        """
        Sends msg to the server, base64-encoded.

        If the forwarder is dead, it is respawned once and the message is
        sent again; a second failure propagates.
        """
        encoded = base64.b64encode(msg)
        data = "%s\n" % encoded
        
        try:
            self._process.stdin.write(data)
        except (IOError, ValueError):
            # dead process, poll it to un-zombify
            self._process.poll()
            
            # try again after reconnect
            # If it fails again, though, give up
            self.connect()
            self._process.stdin.write(data)

    def send_stop(self):
        """ Asks the server to stop and remembers that it was asked to """
        self.send_msg(STOP_MSG)
        self._stopped = True

    def defer_reply(self, transform=None):
        """
        Registers a pending reply and returns a defer.Defer that reads it
        (applying transform, if given) when evaluated.
        """
        defer_entry = []
        self._deferreds.append(defer_entry)
        return defer.Defer(
            functools.partial(self.read_reply, defer_entry, transform)
        )
        
    def _read_reply(self):
        """
        Reads and decodes the next reply line from the forwarder.

        Raises RuntimeError if the forwarder produced an empty line or EOF.
        """
        data = self._process.stdout.readline()
        encoded = data.rstrip() 
        if not encoded:
            # empty == eof == dead process, poll it to un-zombify
            self._process.poll()
            
            raise RuntimeError("Forwarder died while awaiting reply: %s" % (
                self._process.stderr.read(),))
        return base64.b64decode(encoded)
    
    def read_reply(self, which=None, transform=None):
        """
        Returns a reply, passed through transform when one is given.

        With which set to a deferred entry, returns that entry's reply,
        reading (and storing) the replies of all entries queued before it.
        With which=None, first reads the replies of every queued entry and
        then returns the next reply synchronously.
        """
        # Test to see if someone did it already
        if which is not None and len(which):
            # Ok, they did it...
            # ...just return the deferred value
            if transform:
                return transform(which[0])
            else:
                return which[0]
        
        # Process all deferreds until the one we're looking for
        # or until the queue is empty
        while self._deferreds:
            try:
                deferred = self._deferreds.popleft()
            except IndexError:
                # emptied
                break
            
            deferred.append(self._read_reply())
            if deferred is which:
                # We reached the one we were looking for
                if transform:
                    return transform(deferred[0])
                else:
                    return deferred[0]
        
        if which is None:
            # They've requested a synchronous read
            if transform:
                return transform(self._read_reply())
            else:
                return self._read_reply()
453
454 def _make_server_key_args(server_key, host, port, args):
455     """ 
456     Returns a reference to the created temporary file, and adds the
457     corresponding arguments to the given argument list.
458     
459     Make sure to hold onto it until the process is done with the file
460     """
461     if port is not None:
462         host = '%s:%s' % (host,port)
463     # Create a temporary server key file
464     tmp_known_hosts = tempfile.NamedTemporaryFile()
465     
466     # Add the intended host key
467     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
468     
469     # If we're not in strict mode, add user-configured keys
470     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
471         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
472         if os.access(user_hosts_path, os.R_OK):
473             f = open(user_hosts_path, "r")
474             tmp_known_hosts.write(f.read())
475             f.close()
476         
477     tmp_known_hosts.flush()
478     
479     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
480     
481     return tmp_known_hosts
482
def popen_ssh_command(command, host, port, user, agent, 
            stdin="", 
            ident_key = None,
            server_key = None,
            tty = False):
    """
    Executes a remote command, returns ((stdout,stderr),process)
    
    The call blocks until the command is done. Optional ssh features:
    agent forwarding (agent), non-default port (port), identity file
    (ident_key), pinned host key (server_key) and tty allocation (tty).
    The given stdin string is fed to the remote command.
    """
    if TRACE:
        print("ssh %s %s" % (host, command))
    
    tmp_known_hosts = None
    args = ['ssh',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes']
    if user:
        # without a user, ssh picks its own default login name
        args.extend(('-l', user))
    args.append(host)
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    if tty:
        args.append('-t')
    if server_key:
        # Create a temporary server key file
        tmp_known_hosts = _make_server_key_args(
            server_key, host, port, args)
    args.append(command)

    # connects to the remote host and starts a remote connection
    proc = subprocess.Popen(args, 
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE, 
            stderr = subprocess.PIPE)
    
    # attach tempfile object to the process, to make sure the file stays
    # alive until the process is finished with it
    proc._known_hosts = tmp_known_hosts
    
    out, err = proc.communicate(stdin)
    if TRACE:
        print(" ->  %s %s" % (out, err))

    return ((out, err), proc)
528  
def _scp_remote_endpoint(source, dest):
    """ Returns (user, host, path) for whichever endpoint is remote """
    if isinstance(dest, basestring) and ':' in dest:
        remspec, path = dest.split(':',1)
    elif isinstance(source, basestring) and ':' in source:
        remspec, path = source.split(':',1)
    else:
        raise ValueError("Both endpoints cannot be local")
    if '@' in remspec:
        user, host = remspec.rsplit('@',1)
    else:
        # no explicit user: let ssh/scp pick the default login name
        user, host = None, remspec
    return user, host, path

def _scp_pump_upload(proc, source):
    """ Feeds a file-like source into proc.stdin, returns proc's stderr """
    err = []
    err_watch = [proc.stderr]
    buf = None
    while True:
        if not buf:
            buf = source.read(4096)
        if not buf:
            #EOF
            break
        
        rdrdy, wrdy, broken = select.select(
            err_watch,
            [proc.stdin],
            err_watch + [proc.stdin])
        
        if proc.stderr in rdrdy:
            # use os.read for fully unbuffered behavior
            chunk = os.read(proc.stderr.fileno(), 4096)
            if chunk:
                err.append(chunk)
            else:
                # stderr hit EOF: stop watching it, it would stay
                # "readable" forever and turn this loop into a busy wait
                err_watch = []
        
        if proc.stdin in wrdy:
            proc.stdin.write(buf)
            buf = None
        
        if broken:
            break
    proc.stdin.close()
    err.append(proc.stderr.read())
    return ''.join(err)

def _scp_pump_download(proc, dest):
    """ Drains proc.stdout into a file-like dest, returns proc's stderr """
    err = []
    watch = [proc.stderr, proc.stdout]
    while True:
        rdrdy, wrdy, broken = select.select(watch, [], watch)
        
        if proc.stderr in rdrdy:
            # use os.read for fully unbuffered behavior
            chunk = os.read(proc.stderr.fileno(), 4096)
            if chunk:
                err.append(chunk)
            else:
                # stderr hit EOF: keep draining stdout only
                watch = [proc.stdout]
        
        if proc.stdout in rdrdy:
            # use os.read for fully unbuffered behavior
            buf = os.read(proc.stdout.fileno(), 4096)
            if not buf:
                #EOF
                break
            dest.write(buf)
        
        if broken:
            break
    err.append(proc.stderr.read())
    return ''.join(err)

def popen_scp(source, dest, 
            port = None, 
            agent = None, 
            recursive = False,
            ident_key = None,
            server_key = None):
    """
    Copies from/to remote sites.
    
    Source and destination should have the user and host encoded
    as per scp specs.
    
    If source is a file object, a special mode will be used to
    create the remote file with the same contents.
    
    If dest is a file object, the remote file (source) will be
    read and written into dest.
    
    In these modes, recursive cannot be True.
    
    Source can be a list of files to copy to a single destination,
    in which case it is advised that the destination be a folder.
    
    Returns ((stdout,stderr),process); stdout is None in the file object
    modes. Raises ValueError if neither endpoint is remote.
    """
    
    if TRACE:
        print("scp %s %s" % (source, dest))
    
    source_is_file = isinstance(source, file)
    dest_is_file = isinstance(dest, file)
    source_is_stream = source_is_file or hasattr(source, 'read')
    dest_is_stream = dest_is_file or hasattr(dest, 'write')
    
    if source_is_stream or dest_is_stream:
        if recursive:
            raise AssertionError("recursive cannot be used with file objects")
        
        # Parse source/destination as <user>@<server>:<path>
        user, host, path = _scp_remote_endpoint(source, dest)
        tmp_known_hosts = None
        
        # file objects are streamed through a remote "cat" over plain ssh
        args = ['ssh']
        if user:
            args.extend(('-l', user))
        args.extend(('-C',
                # Don't bother with localhost. Makes test easier
                '-o', 'NoHostAuthenticationForLocalhost=yes',
                host))
        if port:
            # this is ssh, not scp: the port option is a lowercase -p
            args.append('-p%d' % port)
        if ident_key:
            args.extend(('-i', ident_key))
        if server_key:
            # Create a temporary server key file
            tmp_known_hosts = _make_server_key_args(
                server_key, host, port, args)
        
        if source_is_stream:
            args.append('cat > %s' % (shell_escape(path),))
            devnull = open(DEV_NULL, 'w')
            try:
                if source_is_file:
                    # real file: the child reads it directly
                    proc = subprocess.Popen(args, 
                            stdout = devnull,
                            stderr = subprocess.PIPE,
                            stdin = source)
                    proc._known_hosts = tmp_known_hosts
                    err = proc.stderr.read()
                else:
                    # file-like (but not file) source
                    proc = subprocess.Popen(args, 
                            stdout = devnull,
                            stderr = subprocess.PIPE,
                            stdin = subprocess.PIPE)
                    proc._known_hosts = tmp_known_hosts
                    err = _scp_pump_upload(proc, source)
            finally:
                # the child holds its own copy of the descriptor
                devnull.close()
        else:
            args.append('cat %s' % (shell_escape(path),))
            devnull = open(DEV_NULL, 'r')
            try:
                if dest_is_file:
                    # real file: the child writes into it directly, so
                    # anything still buffered on our side must go first
                    dest.flush()
                    proc = subprocess.Popen(args, 
                            stdout = dest,
                            stderr = subprocess.PIPE,
                            stdin = devnull)
                    proc._known_hosts = tmp_known_hosts
                    err = proc.stderr.read()
                else:
                    # file-like (but not file) dest
                    proc = subprocess.Popen(args, 
                            stdout = subprocess.PIPE,
                            stderr = subprocess.PIPE,
                            stdin = devnull)
                    proc._known_hosts = tmp_known_hosts
                    err = _scp_pump_download(proc, dest)
            finally:
                devnull.close()
        
        eintr_retry(proc.wait)()
        return ((None,err), proc)
    else:
        # Parse destination as <user>@<server>:<path>
        user, host, path = _scp_remote_endpoint(source, dest)
        
        # plain scp
        tmp_known_hosts = None
        args = ['scp', '-q', '-p', '-C',
                # Don't bother with localhost. Makes test easier
                '-o', 'NoHostAuthenticationForLocalhost=yes' ]
        if port:
            args.append('-P%d' % port)
        if recursive:
            args.append('-r')
        if ident_key:
            args.extend(('-i', ident_key))
        if server_key:
            # Create a temporary server key file
            tmp_known_hosts = _make_server_key_args(
                server_key, host, port, args)
        if isinstance(source,list):
            args.extend(source)
        else:
            args.append(source)
        args.append(dest)

        # connects to the remote host and starts a remote connection
        proc = subprocess.Popen(args, 
                stdout = subprocess.PIPE,
                stdin = subprocess.PIPE, 
                stderr = subprocess.PIPE)
        # keep the temporary known hosts file alive as long as the process
        proc._known_hosts = tmp_known_hosts
        
        comm = proc.communicate()
        eintr_retry(proc.wait)()
        return (comm, proc)
724  
def popen_ssh_subprocess(python_code, host, port, user, agent, 
        python_path = None,
        ident_key = None,
        server_key = None,
        tty = False,
        environment_setup = "",
        waitcommand = False):
    """
    Runs python_code in a python interpreter on a remote host, through ssh.
    
    The remote side runs a small bootstrap script that reads the
    base64-encoded code from its stdin, executes it, and writes an "OK"
    line to its stdout: before executing the code by default, or after the
    code has finished when waitcommand is true.
    
    Returns the ssh process, with stdin, stdout and stderr as pipes, once
    the "OK" line has been received. Raises RuntimeError if anything else
    shows up in its place.
    """
    cmd = ""
    if python_path:
        # str.replace returns a new string: keep it, or embedded quotes
        # would break out of the single-quoted shell word below
        python_path = python_path.replace("'", r"'\''")
        # NOTE(review): the variable is assigned but not exported here, so
        # the interpreter presumably only sees it when PYTHONPATH is already
        # part of the remote environment - confirm
        cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
        cmd += " ; "
    if environment_setup:
        cmd += environment_setup
        cmd += " ; "
    # Uncomment for debug (to run everything under strace)
    # We had to verify if strace works (cannot nest them)
    #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
    #cmd += "$CMD "
    #cmd += "strace -f -tt -s 200 -o strace$$.out "
    cmd += "python -c '"
    cmd += "import base64, os\n"
    cmd += "cmd = \"\"\n"
    cmd += "while True:\n"
    cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
    cmd += " if cmd[-1] == \"\\n\": break\n"
    cmd += "cmd = base64.b64decode(cmd)\n"
    # Uncomment for debug
    #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
    if not waitcommand:
        cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
    cmd += "exec(cmd)\n"
    if waitcommand:
        cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
    cmd += "'"
    
    tmp_known_hosts = None
    args = ['ssh',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes']
    if user:
        # without a user, ssh picks its own default login name
        args.extend(('-l', user))
    args.append(host)
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    if tty:
        args.append('-t')
    if server_key:
        # Create a temporary server key file
        tmp_known_hosts = _make_server_key_args(
            server_key, host, port, args)
    args.append(cmd)

    # connects to the remote host and starts the remote bootstrap script
    proc = subprocess.Popen(args, 
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE, 
            stderr = subprocess.PIPE)
    # keep the temporary known hosts file alive as long as the process
    proc._known_hosts = tmp_known_hosts
    
    # send the command to execute
    os.write(proc.stdin.fileno(),
            base64.b64encode(python_code) + "\n")
    
    # wait for the sync message; a pipe may deliver it in pieces
    msg = ""
    while len(msg) < 3:
        chunk = os.read(proc.stdout.fileno(), 3 - len(msg))
        if not chunk:
            # EOF: the remote side went away
            break
        msg += chunk
    if msg != "OK\n":
        raise RuntimeError("Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
            msg, proc.stdout.read(), proc.stderr.read()))
    return proc
795