Catch socket timeouts too and retry
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import os.path
8 import resource
9 import select
10 import socket
11 import sys
12 import subprocess
13 import threading
14 import time
15 import traceback
16 import signal
17 import re
18 import tempfile
19 import defer
20 import functools
21 import collections
22
23 CTRL_SOCK = "ctrl.sock"
24 STD_ERR = "stderr.log"
25 MAX_FD = 1024
26
27 STOP_MSG = "STOP"
28
29 ERROR_LEVEL = 0
30 DEBUG_LEVEL = 1
31 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
32
33 if hasattr(os, "devnull"):
34     DEV_NULL = os.devnull
35 else:
36     DEV_NULL = "/dev/null"
37
38 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
39
40 def shell_escape(s):
41     """ Escapes strings so that they are safe to use as command-line arguments """
42     if SHELL_SAFE.match(s):
43         # safe string - no escaping needed
44         return s
45     else:
46         # unsafe string - escape
47         def escp(c):
48             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
49                 return c
50             else:
51                 return "'$'\\x%02x''" % (ord(c),)
52         s = ''.join(map(escp,s))
53         return "'%s'" % (s,)
54
55 def eintr_retry(func):
56     import functools
57     @functools.wraps(func)
58     def rv(*p, **kw):
59         retry = kw.pop("_retry", False)
60         for i in xrange(0 if retry else 4):
61             try:
62                 return func(*p, **kw)
63             except select.error, args:
64                 if args[0] == errno.EINTR:
65                     continue
66                 else:
67                     raise 
68             except OSError, e:
69                 if e.errno == errno.EINTR:
70                     continue
71                 else:
72                     raise
73         else:
74             return func(*p, **kw)
75     return rv
76
77 class Server(object):
78     def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
79         self._root_dir = root_dir
80         self._stop = False
81         self._ctrl_sock = None
82         self._log_level = log_level
83         self._rdbuf = ""
84
85     def run(self):
86         try:
87             if self.daemonize():
88                 self.post_daemonize()
89                 self.loop()
90                 self.cleanup()
91                 # ref: "os._exit(0)"
92                 # can not return normally after fork beacuse no exec was done.
93                 # This means that if we don't do a os._exit(0) here the code that 
94                 # follows the call to "Server.run()" in the "caller code" will be 
95                 # executed... but by now it has already been executed after the 
96                 # first process (the one that did the first fork) returned.
97                 os._exit(0)
98         except:
99             self.log_error()
100             self.cleanup()
101             os._exit(0)
102
103     def daemonize(self):
104         # pipes for process synchronization
105         (r, w) = os.pipe()
106         
107         # build root folder
108         root = os.path.normpath(self._root_dir)
109         if not os.path.exists(root):
110             os.makedirs(root, 0755)
111
112         pid1 = os.fork()
113         if pid1 > 0:
114             os.close(w)
115             while True:
116                 try:
117                     os.read(r, 1)
118                 except OSError, e: # pragma: no cover
119                     if e.errno == errno.EINTR:
120                         continue
121                     else:
122                         raise
123                 break
124             os.close(r)
125             # os.waitpid avoids leaving a <defunc> (zombie) process
126             st = os.waitpid(pid1, 0)[1]
127             if st:
128                 raise RuntimeError("Daemonization failed")
129             # return 0 to inform the caller method that this is not the 
130             # daemonized process
131             return 0
132         os.close(r)
133
134         # Decouple from parent environment.
135         os.chdir(self._root_dir)
136         os.umask(0)
137         os.setsid()
138
139         # fork 2
140         pid2 = os.fork()
141         if pid2 > 0:
142             # see ref: "os._exit(0)"
143             os._exit(0)
144
145         # close all open file descriptors.
146         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
147         if (max_fd == resource.RLIM_INFINITY):
148             max_fd = MAX_FD
149         for fd in range(3, max_fd):
150             if fd != w:
151                 try:
152                     os.close(fd)
153                 except OSError:
154                     pass
155
156         # Redirect standard file descriptors.
157         stdin = open(DEV_NULL, "r")
158         stderr = stdout = open(STD_ERR, "a", 0)
159         os.dup2(stdin.fileno(), sys.stdin.fileno())
160         # NOTE: sys.stdout.write will still be buffered, even if the file
161         # was opened with 0 buffer
162         os.dup2(stdout.fileno(), sys.stdout.fileno())
163         os.dup2(stderr.fileno(), sys.stderr.fileno())
164
165         # create control socket
166         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
167         self._ctrl_sock.bind(CTRL_SOCK)
168         self._ctrl_sock.listen(0)
169
170         # let the parent process know that the daemonization is finished
171         os.write(w, "\n")
172         os.close(w)
173         return 1
174
175     def post_daemonize(self):
176         pass
177
178     def loop(self):
179         while not self._stop:
180             conn, addr = self._ctrl_sock.accept()
181             conn.settimeout(5)
182             while not self._stop:
183                 try:
184                     msg = self.recv_msg(conn)
185                 except socket.timeout, e:
186                     self.log_error()
187                     break
188                     
189                 if msg == STOP_MSG:
190                     self._stop = True
191                     reply = self.stop_action()
192                 else:
193                     reply = self.reply_action(msg)
194                 
195                 try:
196                     self.send_reply(conn, reply)
197                 except socket.error:
198                     self.log_error()
199                     self.log_error("NOTICE: Awaiting for reconnection")
200                     break
201             try:
202                 conn.close()
203             except:
204                 # Doesn't matter
205                 self.log_error()
206
207     def recv_msg(self, conn):
208         data = [self._rdbuf]
209         chunk = data[0]
210         while '\n' not in chunk:
211             try:
212                 chunk = conn.recv(1024)
213             except socket.timeout:
214                 continue
215             except OSError, e:
216                 if e.errno != errno.EINTR:
217                     raise
218                 else:
219                     continue
220             if chunk:
221                 data.append(chunk)
222             else:
223                 # empty chunk = EOF
224                 break
225         data = ''.join(data).split('\n',1)
226         while len(data) < 2:
227             data.append('')
228         data, self._rdbuf = data
229         
230         decoded = base64.b64decode(data)
231         return decoded.rstrip()
232
233     def send_reply(self, conn, reply):
234         encoded = base64.b64encode(reply)
235         conn.send("%s\n" % encoded)
236        
237     def cleanup(self):
238         try:
239             self._ctrl_sock.close()
240             os.remove(CTRL_SOCK)
241         except:
242             self.log_error()
243
244     def stop_action(self):
245         return "Stopping server"
246
247     def reply_action(self, msg):
248         return "Reply to: %s" % msg
249
250     def log_error(self, text = None, context = ''):
251         if text == None:
252             text = traceback.format_exc()
253         date = time.strftime("%Y-%m-%d %H:%M:%S")
254         if context:
255             context = " (%s)" % (context,)
256         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
257         return text
258
259     def log_debug(self, text):
260         if self._log_level == DEBUG_LEVEL:
261             date = time.strftime("%Y-%m-%d %H:%M:%S")
262             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
263
264 class Forwarder(object):
265     def __init__(self, root_dir = "."):
266         self._ctrl_sock = None
267         self._root_dir = root_dir
268         self._stop = False
269         self._rdbuf = ""
270
271     def forward(self):
272         self.connect()
273         print >>sys.stderr, "READY."
274         while not self._stop:
275             data = self.read_data()
276             self.send_to_server(data)
277             data = self.recv_from_server()
278             self.write_data(data)
279         self.disconnect()
280
281     def read_data(self):
282         return sys.stdin.readline()
283
284     def write_data(self, data):
285         sys.stdout.write(data)
286         # sys.stdout.write is buffered, this is why we need to do a flush()
287         sys.stdout.flush()
288
289     def send_to_server(self, data):
290         try:
291             self._ctrl_sock.send(data)
292         except IOError, e:
293             if e.errno == errno.EPIPE:
294                 self.connect()
295                 self._ctrl_sock.send(data)
296             else:
297                 raise e
298         encoded = data.rstrip() 
299         msg = base64.b64decode(encoded)
300         if msg == STOP_MSG:
301             self._stop = True
302
303     def recv_from_server(self):
304         data = [self._rdbuf]
305         chunk = data[0]
306         while '\n' not in chunk:
307             try:
308                 chunk = self._ctrl_sock.recv(1024)
309             except OSError, e:
310                 if e.errno != errno.EINTR:
311                     raise
312                 if chunk == '':
313                     continue
314             if chunk:
315                 data.append(chunk)
316             else:
317                 # empty chunk = EOF
318                 break
319         data = ''.join(data).split('\n',1)
320         while len(data) < 2:
321             data.append('')
322         data, self._rdbuf = data
323         
324         return data+'\n'
325  
326     def connect(self):
327         self.disconnect()
328         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
329         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
330         self._ctrl_sock.connect(sock_addr)
331
332     def disconnect(self):
333         try:
334             self._ctrl_sock.close()
335         except:
336             pass
337
338 class Client(object):
339     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
340             agent = None, environment_setup = ""):
341         self.root_dir = root_dir
342         self.addr = (host, port)
343         self.user = user
344         self.agent = agent
345         self.environment_setup = environment_setup
346         self._stopped = False
347         self._deferreds = collections.deque()
348         self.connect()
349     
350     def __del__(self):
351         if self._process.poll() is None:
352             os.kill(self._process.pid, signal.SIGTERM)
353         self._process.wait()
354         
355     def connect(self):
356         root_dir = self.root_dir
357         (host, port) = self.addr
358         user = self.user
359         agent = self.agent
360         
361         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
362                 c.forward()" % (root_dir,)
363         if host != None:
364             self._process = popen_ssh_subprocess(python_code, host, port, 
365                     user, agent,
366                     environment_setup = self.environment_setup)
367             # popen_ssh_subprocess already waits for readiness
368             if self._process.poll():
369                 err = proc.stderr.read()
370                 raise RuntimeError("Client could not be reached: %s" % \
371                         err)
372         else:
373             self._process = subprocess.Popen(
374                     ["python", "-c", python_code],
375                     stdin = subprocess.PIPE, 
376                     stdout = subprocess.PIPE,
377                     stderr = subprocess.PIPE
378                 )
379                 
380         # Wait for the forwarder to be ready, otherwise nobody
381         # will be able to connect to it
382         helo = self._process.stderr.readline()
383         if helo != 'READY.\n':
384             raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
385                     helo + self._process.stderr.read())
386         
387     def send_msg(self, msg):
388         encoded = base64.b64encode(msg)
389         data = "%s\n" % encoded
390         
391         try:
392             self._process.stdin.write(data)
393         except (IOError, ValueError):
394             # dead process, poll it to un-zombify
395             self._process.poll()
396             
397             # try again after reconnect
398             # If it fails again, though, give up
399             self.connect()
400             self._process.stdin.write(data)
401
402     def send_stop(self):
403         self.send_msg(STOP_MSG)
404         self._stopped = True
405
406     def defer_reply(self, transform=None):
407         defer_entry = []
408         self._deferreds.append(defer_entry)
409         return defer.Defer(
410             functools.partial(self.read_reply, defer_entry, transform)
411         )
412         
413     def _read_reply(self):
414         data = self._process.stdout.readline()
415         encoded = data.rstrip() 
416         if not encoded:
417             # empty == eof == dead process, poll it to un-zombify
418             self._process.poll()
419             
420             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
421         return base64.b64decode(encoded)
422     
423     def read_reply(self, which=None, transform=None):
424         # Test to see if someone did it already
425         if which is not None and len(which):
426             # Ok, they did it...
427             # ...just return the deferred value
428             if transform:
429                 return transform(which[0])
430             else:
431                 return which[0]
432         
433         # Process all deferreds until the one we're looking for
434         # or until the queue is empty
435         while self._deferreds:
436             try:
437                 deferred = self._deferreds.popleft()
438             except IndexError:
439                 # emptied
440                 break
441             
442             deferred.append(self._read_reply())
443             if deferred is which:
444                 # We reached the one we were looking for
445                 if transform:
446                     return transform(deferred[0])
447                 else:
448                     return deferred[0]
449         
450         if which is None:
451             # They've requested a synchronous read
452             if transform:
453                 return transform(self._read_reply())
454             else:
455                 return self._read_reply()
456
457 def _make_server_key_args(server_key, host, port, args):
458     """ 
459     Returns a reference to the created temporary file, and adds the
460     corresponding arguments to the given argument list.
461     
462     Make sure to hold onto it until the process is done with the file
463     """
464     if port is not None:
465         host = '%s:%s' % (host,port)
466     # Create a temporary server key file
467     tmp_known_hosts = tempfile.NamedTemporaryFile()
468     
469     # Add the intended host key
470     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
471     
472     # If we're not in strict mode, add user-configured keys
473     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
474         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
475         if os.access(user_hosts_path, os.R_OK):
476             f = open(user_hosts_path, "r")
477             tmp_known_hosts.write(f.read())
478             f.close()
479         
480     tmp_known_hosts.flush()
481     
482     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
483     
484     return tmp_known_hosts
485
486 def popen_ssh_command(command, host, port, user, agent, 
487             stdin="", 
488             ident_key = None,
489             server_key = None,
490             tty = False):
491         """
492         Executes a remote commands, returns ((stdout,stderr),process)
493         """
494         if TRACE:
495             print "ssh", host, command
496         
497         tmp_known_hosts = None
498         args = ['ssh',
499                 # Don't bother with localhost. Makes test easier
500                 '-o', 'NoHostAuthenticationForLocalhost=yes',
501                 '-l', user, host]
502         if agent:
503             args.append('-A')
504         if port:
505             args.append('-p%d' % port)
506         if ident_key:
507             args.extend(('-i', ident_key))
508         if tty:
509             args.append('-t')
510         if server_key:
511             # Create a temporary server key file
512             tmp_known_hosts = _make_server_key_args(
513                 server_key, host, port, args)
514         args.append(command)
515
516         # connects to the remote host and starts a remote connection
517         proc = subprocess.Popen(args, 
518                 stdout = subprocess.PIPE,
519                 stdin = subprocess.PIPE, 
520                 stderr = subprocess.PIPE)
521         
522         # attach tempfile object to the process, to make sure the file stays
523         # alive until the process is finished with it
524         proc._known_hosts = tmp_known_hosts
525         
526         out, err = proc.communicate(stdin)
527         if TRACE:
528             print " -> ", out, err
529
530         return ((out, err), proc)
531  
532 def popen_scp(source, dest, 
533             port = None, 
534             agent = None, 
535             recursive = False,
536             ident_key = None,
537             server_key = None):
538         """
539         Copies from/to remote sites.
540         
541         Source and destination should have the user and host encoded
542         as per scp specs.
543         
544         If source is a file object, a special mode will be used to
545         create the remote file with the same contents.
546         
547         If dest is a file object, the remote file (source) will be
548         read and written into dest.
549         
550         In these modes, recursive cannot be True.
551         
552         Source can be a list of files to copy to a single destination,
553         in which case it is advised that the destination be a folder.
554         """
555         
556         if TRACE:
557             print "scp", source, dest
558         
559         if isinstance(source, file) or isinstance(dest, file) \
560                 or hasattr(source, 'read')  or hasattr(dest, 'write'):
561             assert not recursive
562             
563             # Parse source/destination as <user>@<server>:<path>
564             if isinstance(dest, basestring) and ':' in dest:
565                 remspec, path = dest.split(':',1)
566             elif isinstance(source, basestring) and ':' in source:
567                 remspec, path = source.split(':',1)
568             else:
569                 raise ValueError, "Both endpoints cannot be local"
570             user,host = remspec.rsplit('@',1)
571             tmp_known_hosts = None
572             
573             args = ['ssh', '-l', user, '-C',
574                     # Don't bother with localhost. Makes test easier
575                     '-o', 'NoHostAuthenticationForLocalhost=yes',
576                     host ]
577             if port:
578                 args.append('-P%d' % port)
579             if ident_key:
580                 args.extend(('-i', ident_key))
581             if server_key:
582                 # Create a temporary server key file
583                 tmp_known_hosts = _make_server_key_args(
584                     server_key, host, port, args)
585             
586             if isinstance(source, file) or hasattr(source, 'read'):
587                 args.append('cat > %s' % (shell_escape(path),))
588             elif isinstance(dest, file) or hasattr(dest, 'write'):
589                 args.append('cat %s' % (shell_escape(path),))
590             else:
591                 raise AssertionError, "Unreachable code reached! :-Q"
592             
593             # connects to the remote host and starts a remote connection
594             if isinstance(source, file):
595                 proc = subprocess.Popen(args, 
596                         stdout = open('/dev/null','w'),
597                         stderr = subprocess.PIPE,
598                         stdin = source)
599                 err = proc.stderr.read()
600                 proc._known_hosts = tmp_known_hosts
601                 eintr_retry(proc.wait)()
602                 return ((None,err), proc)
603             elif isinstance(dest, file):
604                 proc = subprocess.Popen(args, 
605                         stdout = open('/dev/null','w'),
606                         stderr = subprocess.PIPE,
607                         stdin = source)
608                 err = proc.stderr.read()
609                 proc._known_hosts = tmp_known_hosts
610                 eintr_retry(proc.wait)()
611                 return ((None,err), proc)
612             elif hasattr(source, 'read'):
613                 # file-like (but not file) source
614                 proc = subprocess.Popen(args, 
615                         stdout = open('/dev/null','w'),
616                         stderr = subprocess.PIPE,
617                         stdin = subprocess.PIPE)
618                 
619                 buf = None
620                 err = []
621                 while True:
622                     if not buf:
623                         buf = source.read(4096)
624                     if not buf:
625                         #EOF
626                         break
627                     
628                     rdrdy, wrdy, broken = select.select(
629                         [proc.stderr],
630                         [proc.stdin],
631                         [proc.stderr,proc.stdin])
632                     
633                     if proc.stderr in rdrdy:
634                         # use os.read for fully unbuffered behavior
635                         err.append(os.read(proc.stderr.fileno(), 4096))
636                     
637                     if proc.stdin in wrdy:
638                         proc.stdin.write(buf)
639                         buf = None
640                     
641                     if broken:
642                         break
643                 proc.stdin.close()
644                 err.append(proc.stderr.read())
645                     
646                 proc._known_hosts = tmp_known_hosts
647                 eintr_retry(proc.wait)()
648                 return ((None,''.join(err)), proc)
649             elif hasattr(dest, 'write'):
650                 # file-like (but not file) dest
651                 proc = subprocess.Popen(args, 
652                         stdout = subprocess.PIPE,
653                         stderr = subprocess.PIPE,
654                         stdin = open('/dev/null','w'))
655                 
656                 buf = None
657                 err = []
658                 while True:
659                     rdrdy, wrdy, broken = select.select(
660                         [proc.stderr, proc.stdout],
661                         [],
662                         [proc.stderr, proc.stdout])
663                     
664                     if proc.stderr in rdrdy:
665                         # use os.read for fully unbuffered behavior
666                         err.append(os.read(proc.stderr.fileno(), 4096))
667                     
668                     if proc.stdout in rdrdy:
669                         # use os.read for fully unbuffered behavior
670                         buf = os.read(proc.stdout.fileno(), 4096)
671                         dest.write(buf)
672                         
673                         if not buf:
674                             #EOF
675                             break
676                     
677                     if broken:
678                         break
679                 err.append(proc.stderr.read())
680                     
681                 proc._known_hosts = tmp_known_hosts
682                 eintr_retry(proc.wait)()
683                 return ((None,''.join(err)), proc)
684             else:
685                 raise AssertionError, "Unreachable code reached! :-Q"
686         else:
687             # Parse destination as <user>@<server>:<path>
688             if isinstance(dest, basestring) and ':' in dest:
689                 remspec, path = dest.split(':',1)
690             elif isinstance(source, basestring) and ':' in source:
691                 remspec, path = source.split(':',1)
692             else:
693                 raise ValueError, "Both endpoints cannot be local"
694             user,host = remspec.rsplit('@',1)
695             
696             # plain scp
697             tmp_known_hosts = None
698             args = ['scp', '-q', '-p', '-C',
699                     # Don't bother with localhost. Makes test easier
700                     '-o', 'NoHostAuthenticationForLocalhost=yes' ]
701             if port:
702                 args.append('-P%d' % port)
703             if recursive:
704                 args.append('-r')
705             if ident_key:
706                 args.extend(('-i', ident_key))
707             if server_key:
708                 # Create a temporary server key file
709                 tmp_known_hosts = _make_server_key_args(
710                     server_key, host, port, args)
711             if isinstance(source,list):
712                 args.extend(source)
713             else:
714                 args.append(source)
715             args.append(dest)
716
717             # connects to the remote host and starts a remote connection
718             proc = subprocess.Popen(args, 
719                     stdout = subprocess.PIPE,
720                     stdin = subprocess.PIPE, 
721                     stderr = subprocess.PIPE)
722             proc._known_hosts = tmp_known_hosts
723             
724             comm = proc.communicate()
725             eintr_retry(proc.wait)()
726             return (comm, proc)
727  
728 def popen_ssh_subprocess(python_code, host, port, user, agent, 
729         python_path = None,
730         ident_key = None,
731         server_key = None,
732         tty = False,
733         environment_setup = "",
734         waitcommand = False):
735         cmd = ""
736         if python_path:
737             python_path.replace("'", r"'\''")
738             cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
739             cmd += " ; "
740         if environment_setup:
741             cmd += environment_setup
742             cmd += " ; "
743         # Uncomment for debug (to run everything under strace)
744         # We had to verify if strace works (cannot nest them)
745         #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
746         #cmd += "$CMD "
747         #cmd += "strace -f -tt -s 200 -o strace$$.out "
748         cmd += "python -c '"
749         cmd += "import base64, os\n"
750         cmd += "cmd = \"\"\n"
751         cmd += "while True:\n"
752         cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
753         cmd += " if cmd[-1] == \"\\n\": break\n"
754         cmd += "cmd = base64.b64decode(cmd)\n"
755         # Uncomment for debug
756         #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
757         if not waitcommand:
758             cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
759         cmd += "exec(cmd)\n"
760         if waitcommand:
761             cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
762         cmd += "'"
763         
764         tmp_known_hosts = None
765         args = ['ssh',
766                 # Don't bother with localhost. Makes test easier
767                 '-o', 'NoHostAuthenticationForLocalhost=yes',
768                 '-l', user, host]
769         if agent:
770             args.append('-A')
771         if port:
772             args.append('-p%d' % port)
773         if ident_key:
774             args.extend(('-i', ident_key))
775         if tty:
776             args.append('-t')
777         if server_key:
778             # Create a temporary server key file
779             tmp_known_hosts = _make_server_key_args(
780                 server_key, host, port, args)
781         args.append(cmd)
782
783         # connects to the remote host and starts a remote rpyc connection
784         proc = subprocess.Popen(args, 
785                 stdout = subprocess.PIPE,
786                 stdin = subprocess.PIPE, 
787                 stderr = subprocess.PIPE)
788         proc._known_hosts = tmp_known_hosts
789         
790         # send the command to execute
791         os.write(proc.stdin.fileno(),
792                 base64.b64encode(python_code) + "\n")
793         msg = os.read(proc.stdout.fileno(), 3)
794         if msg != "OK\n":
795             raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
796                 msg, proc.stdout.read(), proc.stderr.read())
797         return proc
798