Some trivial fixes (EINTR stuff) on server.py
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import os.path
8 import resource
9 import select
10 import socket
11 import sys
12 import subprocess
13 import threading
14 import time
15 import traceback
16 import signal
17 import re
18 import tempfile
19 import defer
20 import functools
21 import collections
22
# File names used by the daemon; both are opened relative to the
# directory the daemon changes into when it starts.
CTRL_SOCK = "ctrl.sock"
STD_ERR = "stderr.log"

# Number of descriptors to sweep when the process limit is unlimited
MAX_FD = 1024

# Message that asks the server to shut down
STOP_MSG = "STOP"

# Logging verbosity levels
ERROR_LEVEL, DEBUG_LEVEL = 0, 1

# Set NEPI_TRACE to true/1/on to echo the remote commands being run
TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")

# Path of the null device, with a fallback when os does not provide it
DEV_NULL = getattr(os, "devnull", "/dev/null")
37
# Strings made only of these characters need no quoting.
# NOTE: the pattern ends in \Z rather than $, because $ also matches right
# before a trailing newline and would let "name\n" through unescaped.
SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*\\Z')

def shell_escape(s):
    """
    Escapes strings so that they are safe to use as command-line arguments.

    Strings made only of harmless characters are returned untouched.
    Anything else is wrapped in single quotes, with single quotes and
    non-printable characters spliced in as $'\\xNN' sequences.

    The empty string is returned as '' (two quotes), so that it still
    counts as one argument once the shell parses the command line.
    """
    if not s:
        # an unquoted empty string would vanish from the command line
        return "''"

    if SHELL_SAFE.match(s):
        # safe string - no escaping needed
        return s

    # unsafe string - escape
    def escp(c):
        printable = 32 <= ord(c) < 127 or c in ('\r','\n','\t')
        if printable and c != "'":
            return c
        # close the quote, emit the character as $'\xNN', reopen the quote
        return "'$'\\x%02x''" % (ord(c),)

    return "'%s'" % (''.join(map(escp, s)),)
54
def eintr_retry(func):
    """
    Wraps func so that calls interrupted by a signal (EINTR) are retried.

    The call is attempted up to 4 times while it keeps failing with EINTR,
    and then one last time with no protection, so a persistent EINTR does
    reach the caller. Any other error is propagated immediately.

    EINTR is recognized on select.error, socket.error, OSError and IOError
    alike, since each of the wrapped system calls reports it with a
    different exception type (proc.wait raises OSError, for instance).

    Passing _retry=True to the wrapped function disables the retries for
    that call; the keyword is consumed and not forwarded to func.
    """
    interruptible = (select.error, socket.error, OSError, IOError)

    @functools.wraps(func)
    def rv(*p, **kw):
        retry = kw.pop("_retry", False)
        for i in range(0 if retry else 4):
            try:
                return func(*p, **kw)
            except interruptible as e:
                # every one of these carries the errno as first argument
                if e.args and e.args[0] == errno.EINTR:
                    continue
                raise
        # out of protected attempts (or none requested): plain call
        return func(*p, **kw)
    return rv
71
class Server(object):
    """
    Daemonized request/reply server listening on a UNIX control socket.

    Messages travel in both directions as newline-terminated, base64
    encoded strings. Subclasses customize the behavior by overriding
    post_daemonize(), reply_action() and stop_action().
    """

    def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
        self._root_dir = root_dir
        self._stop = False
        self._ctrl_sock = None
        self._log_level = log_level
        # bytes received past the last complete message
        self._rdbuf = ""

    def run(self):
        """
        Daemonizes and serves requests until a STOP message arrives.

        Returns normally only in the original (non-daemon) process; the
        daemon process always ends through os._exit().
        """
        try:
            if self.daemonize():
                self.post_daemonize()
                self.loop()
                self.cleanup()
                # ref: "os._exit(0)"
                # We cannot return normally after the fork because no exec
                # was done. Without the os._exit(0) below, the code that
                # follows the call to "Server.run()" in the caller would be
                # executed again, although it already ran when the first
                # process (the one that did the first fork) returned.
                os._exit(0)
        except:
            self.log_error()
            self.cleanup()
            os._exit(0)

    def daemonize(self):
        """
        Double-forks into a daemon that owns the control socket.

        Returns 0 in the calling process (once the daemon signalled that
        it is ready) and 1 in the daemonized process.
        Raises RuntimeError in the caller if the intermediate child exits
        with a non-zero status.
        """
        # pipes for process synchronization
        (r, w) = os.pipe()
        
        # build root folder
        root = os.path.normpath(self._root_dir)
        if not os.path.exists(root):
            os.makedirs(root, 0o755)

        pid1 = os.fork()
        if pid1 > 0:
            os.close(w)
            # block until the daemon writes its "ready" byte (or dies,
            # which closes the pipe and also unblocks the read)
            while True:
                try:
                    os.read(r, 1)
                except OSError as e: # pragma: no cover
                    if e.errno == errno.EINTR:
                        continue
                    else:
                        raise
                break
            os.close(r)
            # os.waitpid avoids leaving a defunct (zombie) process
            st = os.waitpid(pid1, 0)[1]
            if st:
                raise RuntimeError("Daemonization failed")
            # return 0 to inform the caller method that this is not the 
            # daemonized process
            return 0
        os.close(r)

        # Decouple from parent environment.
        os.chdir(self._root_dir)
        os.umask(0)
        os.setsid()

        # fork 2
        pid2 = os.fork()
        if pid2 > 0:
            # see ref: "os._exit(0)"
            os._exit(0)

        # close all open file descriptors, except for the standard ones
        # and the synchronization pipe
        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if (max_fd == resource.RLIM_INFINITY):
            max_fd = MAX_FD
        for fd in range(3, max_fd):
            if fd != w:
                try:
                    os.close(fd)
                except OSError:
                    pass

        # Redirect standard file descriptors.
        stdin = open(DEV_NULL, "r")
        stderr = stdout = open(STD_ERR, "a", 0)
        os.dup2(stdin.fileno(), sys.stdin.fileno())
        # NOTE: sys.stdout.write will still be buffered, even if the file
        # was opened with 0 buffer
        os.dup2(stdout.fileno(), sys.stdout.fileno())
        os.dup2(stderr.fileno(), sys.stderr.fileno())

        # create control socket
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._ctrl_sock.bind(CTRL_SOCK)
        self._ctrl_sock.listen(0)

        # let the parent process know that the daemonization is finished
        os.write(w, "\n")
        os.close(w)
        return 1

    def post_daemonize(self):
        """ Hook run in the daemon right after daemonizing. No-op here. """
        pass

    def loop(self):
        """
        Accepts one connection at a time and answers its messages.

        A connection is dropped after 5 seconds without a message or when
        a reply cannot be sent; the server then waits for a new one.
        """
        while not self._stop:
            conn, addr = self._ctrl_sock.accept()
            conn.settimeout(5)
            while not self._stop:
                try:
                    msg = self.recv_msg(conn)
                except socket.timeout:
                    break
                    
                if msg == STOP_MSG:
                    self._stop = True
                    reply = self.stop_action()
                else:
                    reply = self.reply_action(msg)
                
                try:
                    self.send_reply(conn, reply)
                except socket.error:
                    self.log_error()
                    self.log_error("NOTICE: Awaiting for reconnection")
                    break
            try:
                conn.close()
            except:
                # Doesn't matter
                self.log_error()

    def recv_msg(self, conn):
        """
        Reads one message from conn and returns it decoded.

        Data received past the end of the message is kept in self._rdbuf
        for the next call. Reaching EOF ends the message early.
        socket.timeout (and any error other than EINTR) is propagated.
        """
        data = [self._rdbuf]
        chunk = data[0]
        while '\n' not in chunk:
            try:
                chunk = conn.recv(1024)
            except (socket.error, OSError) as e:
                # sockets report EINTR as socket.error, not plain OSError
                if not e.args or e.args[0] != errno.EINTR:
                    raise
                # nothing was read: retry without touching the data, since
                # chunk still holds what the previous iteration stored
                continue
            if chunk:
                data.append(chunk)
            else:
                # empty chunk = EOF
                break
        data = ''.join(data).split('\n',1)
        while len(data) < 2:
            data.append('')
        data, self._rdbuf = data
        
        decoded = base64.b64decode(data)
        return decoded.rstrip()

    def send_reply(self, conn, reply):
        """ Sends reply through conn, base64 encoded, as a single line. """
        encoded = base64.b64encode(reply)
        # sendall: a plain send may write only part of a large reply
        conn.sendall("%s\n" % encoded)
       
    def cleanup(self):
        """ Closes and removes the control socket, logging any failure. """
        try:
            self._ctrl_sock.close()
            os.remove(CTRL_SOCK)
        except:
            self.log_error()

    def stop_action(self):
        """ Returns the reply sent for the STOP message. """
        return "Stopping server"

    def reply_action(self, msg):
        """ Returns the reply sent for any message other than STOP. """
        return "Reply to: %s" % msg

    def log_error(self, text = None, context = ''):
        """
        Writes an error entry to stderr and returns the logged text.

        When text is not given, the traceback of the exception being
        handled is logged instead.
        """
        if text is None:
            text = traceback.format_exc()
        date = time.strftime("%Y-%m-%d %H:%M:%S")
        if context:
            context = " (%s)" % (context,)
        sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
        return text

    def log_debug(self, text):
        """ Writes a debug entry to stderr if the log level is DEBUG_LEVEL. """
        if self._log_level == DEBUG_LEVEL:
            date = time.strftime("%Y-%m-%d %H:%M:%S")
            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
255
class Forwarder(object):
    """
    Relays lines between its standard streams and a server control socket.

    Every line read from stdin is sent verbatim to the server, and the
    line the server answers with is written to stdout. Forwarding ends
    after a line that decodes to the STOP message has been relayed.
    """

    def __init__(self, root_dir = "."):
        self._ctrl_sock = None
        self._root_dir = root_dir
        self._stop = False
        # bytes received past the last complete reply
        self._rdbuf = ""

    def forward(self):
        """ Connects to the server and relays messages until STOP. """
        self.connect()
        # the peer waits for this exact line before sending anything
        sys.stderr.write("READY.\n")
        while not self._stop:
            data = self.read_data()
            self.send_to_server(data)
            data = self.recv_from_server()
            self.write_data(data)
        self.disconnect()

    def read_data(self):
        """ Returns the next line from stdin. """
        return sys.stdin.readline()

    def write_data(self, data):
        """ Writes data to stdout and flushes it. """
        sys.stdout.write(data)
        # sys.stdout.write is buffered, this is why we need to do a flush()
        sys.stdout.flush()

    def send_to_server(self, data):
        """
        Sends an already encoded line to the server.

        If the connection is found broken (EPIPE) it is reopened and the
        line is sent once more. Flags the forwarder to stop when the line
        decodes to the STOP message.
        """
        try:
            self._ctrl_sock.sendall(data)
        except (IOError, socket.error) as e:
            if e.args and e.args[0] == errno.EPIPE:
                # the server dropped us: reconnect and try again
                self.connect()
                self._ctrl_sock.sendall(data)
            else:
                raise
        encoded = data.rstrip() 
        msg = base64.b64decode(encoded)
        if msg == STOP_MSG:
            self._stop = True

    def recv_from_server(self):
        """
        Reads one reply line from the server, returned still encoded and
        with its trailing newline.

        Data received past the end of the line is kept in self._rdbuf for
        the next call. Reaching EOF ends the line early.
        """
        data = [self._rdbuf]
        chunk = data[0]
        while '\n' not in chunk:
            try:
                chunk = self._ctrl_sock.recv(1024)
            except (socket.error, OSError) as e:
                # sockets report EINTR as socket.error, not plain OSError
                if not e.args or e.args[0] != errno.EINTR:
                    raise
                # nothing was read: retry without touching the data, since
                # chunk still holds what the previous iteration stored
                continue
            if chunk:
                data.append(chunk)
            else:
                # empty chunk = EOF
                break
        data = ''.join(data).split('\n',1)
        while len(data) < 2:
            data.append('')
        data, self._rdbuf = data
        
        return data+'\n'
 
    def connect(self):
        """ (Re)opens the connection to the server's control socket. """
        self.disconnect()
        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
        self._ctrl_sock.connect(sock_addr)

    def disconnect(self):
        """ Closes the current connection, if there is one. """
        if self._ctrl_sock is None:
            # never connected
            return
        try:
            self._ctrl_sock.close()
        except (socket.error, OSError, IOError):
            # best effort: a connection that fails to close is gone anyway
            pass
329
class Client(object):
    """
    Talks to a Server through a Forwarder running in a child process.

    The forwarder is spawned locally or, when a host is given, on that
    host through ssh. Messages are written to the child's stdin and the
    replies are read back from its stdout, one base64 encoded line each.
    """

    def __init__(self, root_dir = ".", host = None, port = None, user = None, 
            agent = None, environment_setup = ""):
        self.root_dir = root_dir
        self.addr = (host, port)
        self.user = user
        self.agent = agent
        self.environment_setup = environment_setup
        self._stopped = False
        # replies promised through defer_reply() and not read yet, oldest
        # first; each entry is a list that receives its reply when read
        self._deferreds = collections.deque()
        self.connect()
    
    def __del__(self):
        # connect() may have failed before any process was ever spawned
        process = getattr(self, "_process", None)
        if process is None:
            return
        if process.poll() is None:
            try:
                os.kill(process.pid, signal.SIGTERM)
            except OSError:
                # the process ended on its own in the meantime
                pass
        process.wait()
        
    def connect(self):
        """
        Spawns the forwarder process and waits until it reports ready.

        Raises RuntimeError if the remote end could not be reached, and
        AssertionError if the forwarder does not greet with 'READY.'.
        """
        root_dir = self.root_dir
        (host, port) = self.addr
        user = self.user
        agent = self.agent
        
        python_code = "from nepi.util import server;c=server.Forwarder(%r);\
                c.forward()" % (root_dir,)
        if host is not None:
            self._process = popen_ssh_subprocess(python_code, host, port, 
                    user, agent,
                    environment_setup = self.environment_setup)
            # popen_ssh_subprocess already waits for readiness
            if self._process.poll():
                err = self._process.stderr.read()
                raise RuntimeError("Client could not be reached: %s" % \
                        err)
        else:
            self._process = subprocess.Popen(
                    ["python", "-c", python_code],
                    stdin = subprocess.PIPE, 
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE
                )
                
        # Wait for the forwarder to be ready, otherwise nobody
        # will be able to connect to it
        helo = self._process.stderr.readline()
        if helo != 'READY.\n':
            raise AssertionError("Expected 'Ready.', got %r: %s" % (helo,
                    helo + self._process.stderr.read()))
        
    def send_msg(self, msg):
        """
        Sends msg to the server.

        If the forwarder process turns out to be dead, it is respawned and
        the message is sent once more; a second failure is propagated.
        """
        encoded = base64.b64encode(msg)
        data = "%s\n" % encoded
        
        try:
            self._process.stdin.write(data)
        except (IOError, ValueError):
            # dead process, poll it to un-zombify
            self._process.poll()
            
            # try again after reconnect
            # If it fails again, though, give up
            self.connect()
            self._process.stdin.write(data)

    def send_stop(self):
        """ Asks the server to stop. """
        self.send_msg(STOP_MSG)
        self._stopped = True

    def defer_reply(self, transform=None):
        """
        Reserves the next pending reply without reading it yet.

        Returns a defer.Defer wrapping the call that reads the reply
        (passed through transform, if given).
        """
        defer_entry = []
        self._deferreds.append(defer_entry)
        return defer.Defer(
            functools.partial(self.read_reply, defer_entry, transform)
        )
        
    def _read_reply(self):
        """ Reads and decodes one reply line from the forwarder. """
        data = self._process.stdout.readline()
        encoded = data.rstrip() 
        return base64.b64decode(encoded)
    
    def read_reply(self, which=None, transform=None):
        """
        Returns a reply, passed through transform if one is given.

        With which=None the reply is read synchronously, after reading
        (and storing) the replies of every deferred entry queued before.
        Otherwise which is an entry created by defer_reply(), and replies
        are read, in order, up to the one that belongs to it.
        """
        # Test to see if someone did it already
        if which is not None and len(which):
            # Ok, they did it...
            # ...just return the deferred value
            if transform:
                return transform(which[0])
            else:
                return which[0]
        
        # Process all deferreds until the one we're looking for
        # or until the queue is empty
        while self._deferreds:
            try:
                deferred = self._deferreds.popleft()
            except IndexError:
                # emptied
                break
            
            deferred.append(self._read_reply())
            if deferred is which:
                # We reached the one we were looking for
                if transform:
                    return transform(deferred[0])
                else:
                    return deferred[0]
        
        if which is None:
            # They've requested a synchronous read
            if transform:
                return transform(self._read_reply())
            else:
                return self._read_reply()
443
444 def _make_server_key_args(server_key, host, port, args):
445     """ 
446     Returns a reference to the created temporary file, and adds the
447     corresponding arguments to the given argument list.
448     
449     Make sure to hold onto it until the process is done with the file
450     """
451     if port is not None:
452         host = '%s:%s' % (host,port)
453     # Create a temporary server key file
454     tmp_known_hosts = tempfile.NamedTemporaryFile()
455     
456     # Add the intended host key
457     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
458     
459     # If we're not in strict mode, add user-configured keys
460     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
461         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
462         if os.access(user_hosts_path, os.R_OK):
463             f = open(user_hosts_path, "r")
464             tmp_known_hosts.write(f.read())
465             f.close()
466         
467     tmp_known_hosts.flush()
468     
469     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
470     
471     return tmp_known_hosts
472
def popen_ssh_command(command, host, port, user, agent, 
            stdin="", 
            ident_key = None,
            server_key = None,
            tty = False):
    """
    Executes a remote command, returns ((stdout,stderr),process)
    
    The call blocks until the command finishes. stdin is the text fed to
    the command's standard input. With a server_key, only that host key
    (plus the user's known hosts, unless in strict mode) is accepted.
    When no user is given, ssh is left to pick the login name itself.
    """
    if TRACE:
        sys.stdout.write("ssh %s %s\n" % (host, command))
    
    tmp_known_hosts = None
    args = ['ssh',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes']
    if user:
        # a None here would make Popen fail with a TypeError
        args.extend(('-l', user))
    args.append(host)
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    if tty:
        args.append('-t')
    if server_key:
        # Create a temporary server key file
        tmp_known_hosts = _make_server_key_args(
            server_key, host, port, args)
    args.append(command)

    # connects to the remote host and starts a remote connection
    proc = subprocess.Popen(args, 
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE, 
            stderr = subprocess.PIPE)
    
    # attach tempfile object to the process, to make sure the file stays
    # alive until the process is finished with it
    proc._known_hosts = tmp_known_hosts
    
    out, err = proc.communicate(stdin)
    if TRACE:
        sys.stdout.write(" ->  %s %s\n" % (out, err))

    return ((out, err), proc)
518  
def _scp_remote_endpoint(source, dest):
    """ Splits the remote endpoint (dest first) into (user, host, path). """
    if isinstance(dest, basestring) and ':' in dest:
        remspec, path = dest.split(':',1)
    elif isinstance(source, basestring) and ':' in source:
        remspec, path = source.split(':',1)
    else:
        raise ValueError("Both endpoints cannot be local")
    user, host = remspec.rsplit('@',1)
    return user, host, path

def popen_scp(source, dest, 
            port = None, 
            agent = None, 
            recursive = False,
            ident_key = None,
            server_key = None):
    """
    Copies from/to remote sites.
    
    Source and destination should have the user and host encoded
    as per scp specs.
    
    If source is a file object, a special mode will be used to
    create the remote file with the same contents.
    
    If dest is a file object, the remote file (source) will be
    read and written into dest.
    
    In these modes, recursive cannot be True.
    
    Source can be a list of files to copy to a single destination,
    in which case it is advised that the destination be a folder.
    
    Returns ((stdout, stderr), process) once the copy has finished;
    stdout is None in the file object modes.
    Raises ValueError when neither endpoint is remote.
    """
    
    if TRACE:
        sys.stdout.write("scp %s %s\n" % (source, dest))
    
    source_is_stream = isinstance(source, file) or hasattr(source, 'read')
    dest_is_stream = isinstance(dest, file) or hasattr(dest, 'write')
    
    if source_is_stream or dest_is_stream:
        assert not recursive
        
        # Parse source/destination as <user>@<server>:<path>
        user, host, path = _scp_remote_endpoint(source, dest)
        tmp_known_hosts = None
        
        # The contents travel through the standard streams of a remote
        # "cat", so this is a plain ssh session rather than scp
        args = ['ssh', '-l', user, '-C',
                # Don't bother with localhost. Makes test easier
                '-o', 'NoHostAuthenticationForLocalhost=yes',
                host ]
        if port:
            # NOTE: ssh takes the port as -p (it is scp that uses -P)
            args.append('-p%d' % port)
        if ident_key:
            args.extend(('-i', ident_key))
        if server_key:
            # Create a temporary server key file
            tmp_known_hosts = _make_server_key_args(
                server_key, host, port, args)
        
        if source_is_stream:
            args.append('cat > %s' % (shell_escape(path),))
        else:
            args.append('cat %s' % (shell_escape(path),))
        
        # connects to the remote host and starts a remote connection
        if isinstance(source, file):
            # real file source: let ssh read straight from it
            devnull = open('/dev/null','w')
            try:
                proc = subprocess.Popen(args, 
                        stdout = devnull,
                        stderr = subprocess.PIPE,
                        stdin = source)
            finally:
                # the child holds its own copy of the descriptor
                devnull.close()
            err = proc.stderr.read()
            proc._known_hosts = tmp_known_hosts
            eintr_retry(proc.wait)()
            return ((None,err), proc)
        elif isinstance(dest, file):
            # real file dest: let ssh write straight into it
            devnull = open('/dev/null','r')
            try:
                proc = subprocess.Popen(args, 
                        stdout = dest,
                        stderr = subprocess.PIPE,
                        stdin = devnull)
            finally:
                devnull.close()
            err = proc.stderr.read()
            proc._known_hosts = tmp_known_hosts
            eintr_retry(proc.wait)()
            return ((None,err), proc)
        elif hasattr(source, 'read'):
            # file-like (but not file) source
            devnull = open('/dev/null','w')
            try:
                proc = subprocess.Popen(args, 
                        stdout = devnull,
                        stderr = subprocess.PIPE,
                        stdin = subprocess.PIPE)
            finally:
                devnull.close()
            
            buf = None
            err = []
            while True:
                if not buf:
                    buf = source.read(4096)
                if not buf:
                    #EOF
                    break
                
                # stderr is drained while writing so that a chatty remote
                # end cannot block on a full pipe
                rdrdy, wrdy, broken = select.select(
                    [proc.stderr],
                    [proc.stdin],
                    [proc.stderr,proc.stdin])
                
                if proc.stderr in rdrdy:
                    # use os.read for fully unbuffered behavior
                    err.append(os.read(proc.stderr.fileno(), 4096))
                
                if proc.stdin in wrdy:
                    proc.stdin.write(buf)
                    buf = None
                
                if broken:
                    break
            proc.stdin.close()
            err.append(proc.stderr.read())
                
            proc._known_hosts = tmp_known_hosts
            eintr_retry(proc.wait)()
            return ((None,''.join(err)), proc)
        else:
            # file-like (but not file) dest
            devnull = open('/dev/null','r')
            try:
                proc = subprocess.Popen(args, 
                        stdout = subprocess.PIPE,
                        stderr = subprocess.PIPE,
                        stdin = devnull)
            finally:
                devnull.close()
            
            err = []
            readers = [proc.stderr, proc.stdout]
            while True:
                rdrdy, wrdy, broken = select.select(readers, [], readers)
                
                if proc.stderr in rdrdy:
                    # use os.read for fully unbuffered behavior
                    chunk = os.read(proc.stderr.fileno(), 4096)
                    if chunk:
                        err.append(chunk)
                    else:
                        # stderr hit EOF: stop polling it, or select
                        # would report it ready over and over
                        readers.remove(proc.stderr)
                
                if proc.stdout in rdrdy:
                    # use os.read for fully unbuffered behavior
                    buf = os.read(proc.stdout.fileno(), 4096)
                    if not buf:
                        #EOF
                        break
                    dest.write(buf)
                
                if broken:
                    break
            err.append(proc.stderr.read())
                
            proc._known_hosts = tmp_known_hosts
            eintr_retry(proc.wait)()
            return ((None,''.join(err)), proc)
    else:
        # Parse destination as <user>@<server>:<path>
        user, host, path = _scp_remote_endpoint(source, dest)
        
        # plain scp
        tmp_known_hosts = None
        args = ['scp', '-q', '-p', '-C',
                # Don't bother with localhost. Makes test easier
                '-o', 'NoHostAuthenticationForLocalhost=yes' ]
        if port:
            args.append('-P%d' % port)
        if recursive:
            args.append('-r')
        if ident_key:
            args.extend(('-i', ident_key))
        if server_key:
            # Create a temporary server key file
            tmp_known_hosts = _make_server_key_args(
                server_key, host, port, args)
        if isinstance(source,list):
            args.extend(source)
        else:
            args.append(source)
        args.append(dest)

        # connects to the remote host and starts a remote connection
        proc = subprocess.Popen(args, 
                stdout = subprocess.PIPE,
                stdin = subprocess.PIPE, 
                stderr = subprocess.PIPE)
        # keep the key file alive until the process is done with it
        proc._known_hosts = tmp_known_hosts
        
        comm = proc.communicate()
        eintr_retry(proc.wait)()
        return (comm, proc)
714  
def popen_ssh_subprocess(python_code, host, port, user, agent, 
        python_path = None,
        ident_key = None,
        server_key = None,
        tty = False,
        environment_setup = "",
        waitcommand = False):
    """
    Runs python_code in a python interpreter on a remote host, over ssh.
    
    The remote interpreter runs a small bootstrap script that reads the
    base64 encoded code from its stdin and executes it. It answers with a
    line saying "OK" before executing the code or, if waitcommand is set,
    after the code has finished.
    
    environment_setup is shell code run before launching the interpreter,
    and python_path is appended to the remote PYTHONPATH. When no user is
    given, ssh is left to pick the login name itself.
    
    Returns the ssh process, with its standard streams piped.
    Raises RuntimeError if the "OK" answer is not received.
    """
    cmd = ""
    if python_path:
        # escape single quotes, as the path goes inside a quoted string
        python_path = python_path.replace("'", r"'\''")
        cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
        cmd += " ; "
    if environment_setup:
        cmd += environment_setup
        cmd += " ; "
    # Uncomment for debug (to run everything under strace)
    # We had to verify if strace works (cannot nest them)
    #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
    #cmd += "$CMD "
    #cmd += "strace -f -tt -s 200 -o strace$$.out "
    cmd += "python -c '"
    cmd += "import base64, os\n"
    cmd += "cmd = \"\"\n"
    cmd += "while True:\n"
    cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
    cmd += " if cmd[-1] == \"\\n\": break\n"
    cmd += "cmd = base64.b64decode(cmd)\n"
    # Uncomment for debug
    #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
    if not waitcommand:
        cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
    cmd += "exec(cmd)\n"
    if waitcommand:
        cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
    cmd += "'"
    
    tmp_known_hosts = None
    args = ['ssh',
            # Don't bother with localhost. Makes test easier
            '-o', 'NoHostAuthenticationForLocalhost=yes']
    if user:
        # a None here would make Popen fail with a TypeError
        args.extend(('-l', user))
    args.append(host)
    if agent:
        args.append('-A')
    if port:
        args.append('-p%d' % port)
    if ident_key:
        args.extend(('-i', ident_key))
    if tty:
        args.append('-t')
    if server_key:
        # Create a temporary server key file
        tmp_known_hosts = _make_server_key_args(
            server_key, host, port, args)
    args.append(cmd)

    # connects to the remote host and starts the remote interpreter
    proc = subprocess.Popen(args, 
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE, 
            stderr = subprocess.PIPE)
    # keep the key file alive until the process is done with it
    proc._known_hosts = tmp_known_hosts
    
    # send the command to execute
    os.write(proc.stdin.fileno(),
            base64.b64encode(python_code) + "\n")
    # wait for the sync message of the bootstrap script
    msg = os.read(proc.stdout.fileno(), 3)
    if msg != "OK\n":
        raise RuntimeError("Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
            msg, proc.stdout.read(), proc.stderr.read()))
    return proc
785