Merging with head
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import os.path
8 import resource
9 import select
10 import socket
11 import sys
12 import subprocess
13 import threading
14 import time
15 import traceback
16 import signal
17 import re
18 import tempfile
19 import defer
20 import functools
21 import collections
22
23 CTRL_SOCK = "ctrl.sock"
24 STD_ERR = "stderr.log"
25 MAX_FD = 1024
26
27 STOP_MSG = "STOP"
28
29 ERROR_LEVEL = 0
30 DEBUG_LEVEL = 1
31 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
32
33 if hasattr(os, "devnull"):
34     DEV_NULL = os.devnull
35 else:
36     DEV_NULL = "/dev/null"
37
38 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
39
40 def shell_escape(s):
41     """ Escapes strings so that they are safe to use as command-line arguments """
42     if SHELL_SAFE.match(s):
43         # safe string - no escaping needed
44         return s
45     else:
46         # unsafe string - escape
47         def escp(c):
48             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
49                 return c
50             else:
51                 return "'$'\\x%02x''" % (ord(c),)
52         s = ''.join(map(escp,s))
53         return "'%s'" % (s,)
54
55 def eintr_retry(func):
56     import functools
57     @functools.wraps(func)
58     def rv(*p, **kw):
59         retry = kw.pop("_retry", False)
60         for i in xrange(0 if retry else 4):
61             try:
62                 return func(*p, **kw)
63             except select.error, args:
64                 if args[0] == errno.EINTR:
65                     continue
66                 else:
67                     raise 
68         else:
69             return func(*p, **kw)
70     return rv
71
72 class Server(object):
73     def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
74         self._root_dir = root_dir
75         self._stop = False
76         self._ctrl_sock = None
77         self._log_level = log_level
78         self._rdbuf = ""
79
80     def run(self):
81         try:
82             if self.daemonize():
83                 self.post_daemonize()
84                 self.loop()
85                 self.cleanup()
86                 # ref: "os._exit(0)"
87                 # can not return normally after fork beacuse no exec was done.
88                 # This means that if we don't do a os._exit(0) here the code that 
89                 # follows the call to "Server.run()" in the "caller code" will be 
90                 # executed... but by now it has already been executed after the 
91                 # first process (the one that did the first fork) returned.
92                 os._exit(0)
93         except:
94             self.log_error()
95             self.cleanup()
96             os._exit(0)
97
98     def daemonize(self):
99         # pipes for process synchronization
100         (r, w) = os.pipe()
101         
102         # build root folder
103         root = os.path.normpath(self._root_dir)
104         if not os.path.exists(root):
105             os.makedirs(root, 0755)
106
107         pid1 = os.fork()
108         if pid1 > 0:
109             os.close(w)
110             while True:
111                 try:
112                     os.read(r, 1)
113                 except OSError, e: # pragma: no cover
114                     if e.errno == errno.EINTR:
115                         continue
116                     else:
117                         raise
118                 break
119             os.close(r)
120             # os.waitpid avoids leaving a <defunc> (zombie) process
121             st = os.waitpid(pid1, 0)[1]
122             if st:
123                 raise RuntimeError("Daemonization failed")
124             # return 0 to inform the caller method that this is not the 
125             # daemonized process
126             return 0
127         os.close(r)
128
129         # Decouple from parent environment.
130         os.chdir(self._root_dir)
131         os.umask(0)
132         os.setsid()
133
134         # fork 2
135         pid2 = os.fork()
136         if pid2 > 0:
137             # see ref: "os._exit(0)"
138             os._exit(0)
139
140         # close all open file descriptors.
141         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
142         if (max_fd == resource.RLIM_INFINITY):
143             max_fd = MAX_FD
144         for fd in range(3, max_fd):
145             if fd != w:
146                 try:
147                     os.close(fd)
148                 except OSError:
149                     pass
150
151         # Redirect standard file descriptors.
152         stdin = open(DEV_NULL, "r")
153         stderr = stdout = open(STD_ERR, "a", 0)
154         os.dup2(stdin.fileno(), sys.stdin.fileno())
155         # NOTE: sys.stdout.write will still be buffered, even if the file
156         # was opened with 0 buffer
157         os.dup2(stdout.fileno(), sys.stdout.fileno())
158         os.dup2(stderr.fileno(), sys.stderr.fileno())
159
160         # create control socket
161         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
162         self._ctrl_sock.bind(CTRL_SOCK)
163         self._ctrl_sock.listen(0)
164
165         # let the parent process know that the daemonization is finished
166         os.write(w, "\n")
167         os.close(w)
168         return 1
169
170     def post_daemonize(self):
171         pass
172
173     def loop(self):
174         while not self._stop:
175             conn, addr = self._ctrl_sock.accept()
176             conn.settimeout(5)
177             while not self._stop:
178                 try:
179                     msg = self.recv_msg(conn)
180                 except socket.timeout, e:
181                     break
182                     
183                 if msg == STOP_MSG:
184                     self._stop = True
185                     reply = self.stop_action()
186                 else:
187                     reply = self.reply_action(msg)
188                 
189                 try:
190                     self.send_reply(conn, reply)
191                 except socket.error:
192                     self.log_error()
193                     self.log_error("NOTICE: Awaiting for reconnection")
194                     break
195             try:
196                 conn.close()
197             except:
198                 # Doesn't matter
199                 self.log_error()
200
201     def recv_msg(self, conn):
202         data = [self._rdbuf]
203         chunk = data[0]
204         while '\n' not in chunk:
205             try:
206                 chunk = conn.recv(1024)
207             except OSError, e:
208                 if e.errno != errno.EINTR:
209                     raise
210                 if chunk == '':
211                     continue
212             if chunk:
213                 data.append(chunk)
214             else:
215                 # empty chunk = EOF
216                 break
217         data = ''.join(data).split('\n',1)
218         while len(data) < 2:
219             data.append('')
220         data, self._rdbuf = data
221         
222         decoded = base64.b64decode(data)
223         return decoded.rstrip()
224
225     def send_reply(self, conn, reply):
226         encoded = base64.b64encode(reply)
227         conn.send("%s\n" % encoded)
228        
229     def cleanup(self):
230         try:
231             self._ctrl_sock.close()
232             os.remove(CTRL_SOCK)
233         except:
234             self.log_error()
235
236     def stop_action(self):
237         return "Stopping server"
238
239     def reply_action(self, msg):
240         return "Reply to: %s" % msg
241
242     def log_error(self, text = None, context = ''):
243         if text == None:
244             text = traceback.format_exc()
245         date = time.strftime("%Y-%m-%d %H:%M:%S")
246         if context:
247             context = " (%s)" % (context,)
248         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
249         return text
250
251     def log_debug(self, text):
252         if self._log_level == DEBUG_LEVEL:
253             date = time.strftime("%Y-%m-%d %H:%M:%S")
254             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
255
256 class Forwarder(object):
257     def __init__(self, root_dir = "."):
258         self._ctrl_sock = None
259         self._root_dir = root_dir
260         self._stop = False
261         self._rdbuf = ""
262
263     def forward(self):
264         self.connect()
265         print >>sys.stderr, "READY."
266         while not self._stop:
267             data = self.read_data()
268             self.send_to_server(data)
269             data = self.recv_from_server()
270             self.write_data(data)
271         self.disconnect()
272
273     def read_data(self):
274         return sys.stdin.readline()
275
276     def write_data(self, data):
277         sys.stdout.write(data)
278         # sys.stdout.write is buffered, this is why we need to do a flush()
279         sys.stdout.flush()
280
281     def send_to_server(self, data):
282         try:
283             self._ctrl_sock.send(data)
284         except IOError, e:
285             if e.errno == errno.EPIPE:
286                 self.connect()
287                 self._ctrl_sock.send(data)
288             else:
289                 raise e
290         encoded = data.rstrip() 
291         msg = base64.b64decode(encoded)
292         if msg == STOP_MSG:
293             self._stop = True
294
295     def recv_from_server(self):
296         data = [self._rdbuf]
297         chunk = data[0]
298         while '\n' not in chunk:
299             try:
300                 chunk = self._ctrl_sock.recv(1024)
301             except OSError, e:
302                 if e.errno != errno.EINTR:
303                     raise
304                 if chunk == '':
305                     continue
306             if chunk:
307                 data.append(chunk)
308             else:
309                 # empty chunk = EOF
310                 break
311         data = ''.join(data).split('\n',1)
312         while len(data) < 2:
313             data.append('')
314         data, self._rdbuf = data
315         
316         return data+'\n'
317  
318     def connect(self):
319         self.disconnect()
320         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
321         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
322         self._ctrl_sock.connect(sock_addr)
323
324     def disconnect(self):
325         try:
326             self._ctrl_sock.close()
327         except:
328             pass
329
330 class Client(object):
331     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
332             agent = None, environment_setup = ""):
333         self.root_dir = root_dir
334         self.addr = (host, port)
335         self.user = user
336         self.agent = agent
337         self.environment_setup = environment_setup
338         self._stopped = False
339         self._deferreds = collections.deque()
340         self.connect()
341     
342     def __del__(self):
343         if self._process.poll() is None:
344             os.kill(self._process.pid, signal.SIGTERM)
345         self._process.wait()
346         
347     def connect(self):
348         root_dir = self.root_dir
349         (host, port) = self.addr
350         user = self.user
351         agent = self.agent
352         
353         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
354                 c.forward()" % (root_dir,)
355         if host != None:
356             self._process = popen_ssh_subprocess(python_code, host, port, 
357                     user, agent,
358                     environment_setup = self.environment_setup)
359             # popen_ssh_subprocess already waits for readiness
360             if self._process.poll():
361                 err = proc.stderr.read()
362                 raise RuntimeError("Client could not be reached: %s" % \
363                         err)
364         else:
365             self._process = subprocess.Popen(
366                     ["python", "-c", python_code],
367                     stdin = subprocess.PIPE, 
368                     stdout = subprocess.PIPE,
369                     stderr = subprocess.PIPE
370                 )
371                 
372         # Wait for the forwarder to be ready, otherwise nobody
373         # will be able to connect to it
374         helo = self._process.stderr.readline()
375         if helo != 'READY.\n':
376             raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
377                     helo + self._process.stderr.read())
378         
379     def send_msg(self, msg):
380         encoded = base64.b64encode(msg)
381         data = "%s\n" % encoded
382         
383         try:
384             self._process.stdin.write(data)
385         except (IOError, ValueError):
386             # dead process, poll it to un-zombify
387             self._process.poll()
388             
389             # try again after reconnect
390             # If it fails again, though, give up
391             self.connect()
392             self._process.stdin.write(data)
393
394     def send_stop(self):
395         self.send_msg(STOP_MSG)
396         self._stopped = True
397
398     def defer_reply(self, transform=None):
399         defer_entry = []
400         self._deferreds.append(defer_entry)
401         return defer.Defer(
402             functools.partial(self.read_reply, defer_entry, transform)
403         )
404         
405     def _read_reply(self):
406         data = self._process.stdout.readline()
407         encoded = data.rstrip() 
408         if not encoded:
409             # empty == eof == dead process, poll it to un-zombify
410             self._process.poll()
411             
412             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
413         return base64.b64decode(encoded)
414     
415     def read_reply(self, which=None, transform=None):
416         # Test to see if someone did it already
417         if which is not None and len(which):
418             # Ok, they did it...
419             # ...just return the deferred value
420             if transform:
421                 return transform(which[0])
422             else:
423                 return which[0]
424         
425         # Process all deferreds until the one we're looking for
426         # or until the queue is empty
427         while self._deferreds:
428             try:
429                 deferred = self._deferreds.popleft()
430             except IndexError:
431                 # emptied
432                 break
433             
434             deferred.append(self._read_reply())
435             if deferred is which:
436                 # We reached the one we were looking for
437                 if transform:
438                     return transform(deferred[0])
439                 else:
440                     return deferred[0]
441         
442         if which is None:
443             # They've requested a synchronous read
444             if transform:
445                 return transform(self._read_reply())
446             else:
447                 return self._read_reply()
448
449 def _make_server_key_args(server_key, host, port, args):
450     """ 
451     Returns a reference to the created temporary file, and adds the
452     corresponding arguments to the given argument list.
453     
454     Make sure to hold onto it until the process is done with the file
455     """
456     if port is not None:
457         host = '%s:%s' % (host,port)
458     # Create a temporary server key file
459     tmp_known_hosts = tempfile.NamedTemporaryFile()
460     
461     # Add the intended host key
462     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
463     
464     # If we're not in strict mode, add user-configured keys
465     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
466         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
467         if os.access(user_hosts_path, os.R_OK):
468             f = open(user_hosts_path, "r")
469             tmp_known_hosts.write(f.read())
470             f.close()
471         
472     tmp_known_hosts.flush()
473     
474     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
475     
476     return tmp_known_hosts
477
478 def popen_ssh_command(command, host, port, user, agent, 
479             stdin="", 
480             ident_key = None,
481             server_key = None,
482             tty = False):
483         """
484         Executes a remote commands, returns ((stdout,stderr),process)
485         """
486         if TRACE:
487             print "ssh", host, command
488         
489         tmp_known_hosts = None
490         args = ['ssh',
491                 # Don't bother with localhost. Makes test easier
492                 '-o', 'NoHostAuthenticationForLocalhost=yes',
493                 '-l', user, host]
494         if agent:
495             args.append('-A')
496         if port:
497             args.append('-p%d' % port)
498         if ident_key:
499             args.extend(('-i', ident_key))
500         if tty:
501             args.append('-t')
502         if server_key:
503             # Create a temporary server key file
504             tmp_known_hosts = _make_server_key_args(
505                 server_key, host, port, args)
506         args.append(command)
507
508         # connects to the remote host and starts a remote connection
509         proc = subprocess.Popen(args, 
510                 stdout = subprocess.PIPE,
511                 stdin = subprocess.PIPE, 
512                 stderr = subprocess.PIPE)
513         
514         # attach tempfile object to the process, to make sure the file stays
515         # alive until the process is finished with it
516         proc._known_hosts = tmp_known_hosts
517         
518         out, err = proc.communicate(stdin)
519         if TRACE:
520             print " -> ", out, err
521
522         return ((out, err), proc)
523  
524 def popen_scp(source, dest, 
525             port = None, 
526             agent = None, 
527             recursive = False,
528             ident_key = None,
529             server_key = None):
530         """
531         Copies from/to remote sites.
532         
533         Source and destination should have the user and host encoded
534         as per scp specs.
535         
536         If source is a file object, a special mode will be used to
537         create the remote file with the same contents.
538         
539         If dest is a file object, the remote file (source) will be
540         read and written into dest.
541         
542         In these modes, recursive cannot be True.
543         
544         Source can be a list of files to copy to a single destination,
545         in which case it is advised that the destination be a folder.
546         """
547         
548         if TRACE:
549             print "scp", source, dest
550         
551         if isinstance(source, file) or isinstance(dest, file) \
552                 or hasattr(source, 'read')  or hasattr(dest, 'write'):
553             assert not recursive
554             
555             # Parse source/destination as <user>@<server>:<path>
556             if isinstance(dest, basestring) and ':' in dest:
557                 remspec, path = dest.split(':',1)
558             elif isinstance(source, basestring) and ':' in source:
559                 remspec, path = source.split(':',1)
560             else:
561                 raise ValueError, "Both endpoints cannot be local"
562             user,host = remspec.rsplit('@',1)
563             tmp_known_hosts = None
564             
565             args = ['ssh', '-l', user, '-C',
566                     # Don't bother with localhost. Makes test easier
567                     '-o', 'NoHostAuthenticationForLocalhost=yes',
568                     host ]
569             if port:
570                 args.append('-P%d' % port)
571             if ident_key:
572                 args.extend(('-i', ident_key))
573             if server_key:
574                 # Create a temporary server key file
575                 tmp_known_hosts = _make_server_key_args(
576                     server_key, host, port, args)
577             
578             if isinstance(source, file) or hasattr(source, 'read'):
579                 args.append('cat > %s' % (shell_escape(path),))
580             elif isinstance(dest, file) or hasattr(dest, 'write'):
581                 args.append('cat %s' % (shell_escape(path),))
582             else:
583                 raise AssertionError, "Unreachable code reached! :-Q"
584             
585             # connects to the remote host and starts a remote connection
586             if isinstance(source, file):
587                 proc = subprocess.Popen(args, 
588                         stdout = open('/dev/null','w'),
589                         stderr = subprocess.PIPE,
590                         stdin = source)
591                 err = proc.stderr.read()
592                 proc._known_hosts = tmp_known_hosts
593                 eintr_retry(proc.wait)()
594                 return ((None,err), proc)
595             elif isinstance(dest, file):
596                 proc = subprocess.Popen(args, 
597                         stdout = open('/dev/null','w'),
598                         stderr = subprocess.PIPE,
599                         stdin = source)
600                 err = proc.stderr.read()
601                 proc._known_hosts = tmp_known_hosts
602                 eintr_retry(proc.wait)()
603                 return ((None,err), proc)
604             elif hasattr(source, 'read'):
605                 # file-like (but not file) source
606                 proc = subprocess.Popen(args, 
607                         stdout = open('/dev/null','w'),
608                         stderr = subprocess.PIPE,
609                         stdin = subprocess.PIPE)
610                 
611                 buf = None
612                 err = []
613                 while True:
614                     if not buf:
615                         buf = source.read(4096)
616                     if not buf:
617                         #EOF
618                         break
619                     
620                     rdrdy, wrdy, broken = select.select(
621                         [proc.stderr],
622                         [proc.stdin],
623                         [proc.stderr,proc.stdin])
624                     
625                     if proc.stderr in rdrdy:
626                         # use os.read for fully unbuffered behavior
627                         err.append(os.read(proc.stderr.fileno(), 4096))
628                     
629                     if proc.stdin in wrdy:
630                         proc.stdin.write(buf)
631                         buf = None
632                     
633                     if broken:
634                         break
635                 proc.stdin.close()
636                 err.append(proc.stderr.read())
637                     
638                 proc._known_hosts = tmp_known_hosts
639                 eintr_retry(proc.wait)()
640                 return ((None,''.join(err)), proc)
641             elif hasattr(dest, 'write'):
642                 # file-like (but not file) dest
643                 proc = subprocess.Popen(args, 
644                         stdout = subprocess.PIPE,
645                         stderr = subprocess.PIPE,
646                         stdin = open('/dev/null','w'))
647                 
648                 buf = None
649                 err = []
650                 while True:
651                     rdrdy, wrdy, broken = select.select(
652                         [proc.stderr, proc.stdout],
653                         [],
654                         [proc.stderr, proc.stdout])
655                     
656                     if proc.stderr in rdrdy:
657                         # use os.read for fully unbuffered behavior
658                         err.append(os.read(proc.stderr.fileno(), 4096))
659                     
660                     if proc.stdout in rdrdy:
661                         # use os.read for fully unbuffered behavior
662                         buf = os.read(proc.stdout.fileno(), 4096)
663                         dest.write(buf)
664                         
665                         if not buf:
666                             #EOF
667                             break
668                     
669                     if broken:
670                         break
671                 err.append(proc.stderr.read())
672                     
673                 proc._known_hosts = tmp_known_hosts
674                 eintr_retry(proc.wait)()
675                 return ((None,''.join(err)), proc)
676             else:
677                 raise AssertionError, "Unreachable code reached! :-Q"
678         else:
679             # Parse destination as <user>@<server>:<path>
680             if isinstance(dest, basestring) and ':' in dest:
681                 remspec, path = dest.split(':',1)
682             elif isinstance(source, basestring) and ':' in source:
683                 remspec, path = source.split(':',1)
684             else:
685                 raise ValueError, "Both endpoints cannot be local"
686             user,host = remspec.rsplit('@',1)
687             
688             # plain scp
689             tmp_known_hosts = None
690             args = ['scp', '-q', '-p', '-C',
691                     # Don't bother with localhost. Makes test easier
692                     '-o', 'NoHostAuthenticationForLocalhost=yes' ]
693             if port:
694                 args.append('-P%d' % port)
695             if recursive:
696                 args.append('-r')
697             if ident_key:
698                 args.extend(('-i', ident_key))
699             if server_key:
700                 # Create a temporary server key file
701                 tmp_known_hosts = _make_server_key_args(
702                     server_key, host, port, args)
703             if isinstance(source,list):
704                 args.extend(source)
705             else:
706                 args.append(source)
707             args.append(dest)
708
709             # connects to the remote host and starts a remote connection
710             proc = subprocess.Popen(args, 
711                     stdout = subprocess.PIPE,
712                     stdin = subprocess.PIPE, 
713                     stderr = subprocess.PIPE)
714             proc._known_hosts = tmp_known_hosts
715             
716             comm = proc.communicate()
717             eintr_retry(proc.wait)()
718             return (comm, proc)
719  
720 def popen_ssh_subprocess(python_code, host, port, user, agent, 
721         python_path = None,
722         ident_key = None,
723         server_key = None,
724         tty = False,
725         environment_setup = "",
726         waitcommand = False):
727         cmd = ""
728         if python_path:
729             python_path.replace("'", r"'\''")
730             cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
731             cmd += " ; "
732         if environment_setup:
733             cmd += environment_setup
734             cmd += " ; "
735         # Uncomment for debug (to run everything under strace)
736         # We had to verify if strace works (cannot nest them)
737         #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
738         #cmd += "$CMD "
739         #cmd += "strace -f -tt -s 200 -o strace$$.out "
740         cmd += "python -c '"
741         cmd += "import base64, os\n"
742         cmd += "cmd = \"\"\n"
743         cmd += "while True:\n"
744         cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
745         cmd += " if cmd[-1] == \"\\n\": break\n"
746         cmd += "cmd = base64.b64decode(cmd)\n"
747         # Uncomment for debug
748         #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
749         if not waitcommand:
750             cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
751         cmd += "exec(cmd)\n"
752         if waitcommand:
753             cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
754         cmd += "'"
755         
756         tmp_known_hosts = None
757         args = ['ssh',
758                 # Don't bother with localhost. Makes test easier
759                 '-o', 'NoHostAuthenticationForLocalhost=yes',
760                 '-l', user, host]
761         if agent:
762             args.append('-A')
763         if port:
764             args.append('-p%d' % port)
765         if ident_key:
766             args.extend(('-i', ident_key))
767         if tty:
768             args.append('-t')
769         if server_key:
770             # Create a temporary server key file
771             tmp_known_hosts = _make_server_key_args(
772                 server_key, host, port, args)
773         args.append(cmd)
774
775         # connects to the remote host and starts a remote rpyc connection
776         proc = subprocess.Popen(args, 
777                 stdout = subprocess.PIPE,
778                 stdin = subprocess.PIPE, 
779                 stderr = subprocess.PIPE)
780         proc._known_hosts = tmp_known_hosts
781         
782         # send the command to execute
783         os.write(proc.stdin.fileno(),
784                 base64.b64encode(python_code) + "\n")
785         msg = os.read(proc.stdout.fileno(), 3)
786         if msg != "OK\n":
787             raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
788                 msg, proc.stdout.read(), proc.stderr.read())
789         return proc
790