Merge
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import os.path
8 import resource
9 import select
10 import socket
11 import sys
12 import subprocess
13 import threading
14 import time
15 import traceback
16 import signal
17 import re
18 import tempfile
19 import defer
20 import functools
21 import collections
22
23 CTRL_SOCK = "ctrl.sock"
24 STD_ERR = "stderr.log"
25 MAX_FD = 1024
26
27 STOP_MSG = "STOP"
28
29 ERROR_LEVEL = 0
30 DEBUG_LEVEL = 1
31 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
32
33 if hasattr(os, "devnull"):
34     DEV_NULL = os.devnull
35 else:
36     DEV_NULL = "/dev/null"
37
38 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
39
40 def shell_escape(s):
41     """ Escapes strings so that they are safe to use as command-line arguments """
42     if SHELL_SAFE.match(s):
43         # safe string - no escaping needed
44         return s
45     else:
46         # unsafe string - escape
47         def escp(c):
48             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
49                 return c
50             else:
51                 return "'$'\\x%02x''" % (ord(c),)
52         s = ''.join(map(escp,s))
53         return "'%s'" % (s,)
54
55 def eintr_retry(func):
56     import functools
57     @functools.wraps(func)
58     def rv(*p, **kw):
59         retry = kw.pop("_retry", False)
60         for i in xrange(0 if retry else 4):
61             try:
62                 return func(*p, **kw)
63             except (select.error, socket.error), args:
64                 if args[0] == errno.EINTR:
65                     continue
66                 else:
67                     raise 
68             except OSError, e:
69                 if e.errno == errno.EINTR:
70                     continue
71                 else:
72                     raise
73         else:
74             return func(*p, **kw)
75     return rv
76
77 class Server(object):
78     def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
79         self._root_dir = root_dir
80         self._stop = False
81         self._ctrl_sock = None
82         self._log_level = log_level
83         self._rdbuf = ""
84         self._environment_setup = environment_setup
85
86     def run(self):
87         try:
88             if self.daemonize():
89                 self.post_daemonize()
90                 self.loop()
91                 self.cleanup()
92                 # ref: "os._exit(0)"
93                 # can not return normally after fork beacuse no exec was done.
94                 # This means that if we don't do a os._exit(0) here the code that 
95                 # follows the call to "Server.run()" in the "caller code" will be 
96                 # executed... but by now it has already been executed after the 
97                 # first process (the one that did the first fork) returned.
98                 os._exit(0)
99         except:
100             self.log_error()
101             self.cleanup()
102             os._exit(0)
103
104     def daemonize(self):
105         # pipes for process synchronization
106         (r, w) = os.pipe()
107         
108         # build root folder
109         root = os.path.normpath(self._root_dir)
110         if not os.path.exists(root):
111             os.makedirs(root, 0755)
112
113         pid1 = os.fork()
114         if pid1 > 0:
115             os.close(w)
116             while True:
117                 try:
118                     os.read(r, 1)
119                 except OSError, e: # pragma: no cover
120                     if e.errno == errno.EINTR:
121                         continue
122                     else:
123                         raise
124                 break
125             os.close(r)
126             # os.waitpid avoids leaving a <defunc> (zombie) process
127             st = os.waitpid(pid1, 0)[1]
128             if st:
129                 raise RuntimeError("Daemonization failed")
130             # return 0 to inform the caller method that this is not the 
131             # daemonized process
132             return 0
133         os.close(r)
134
135         # Decouple from parent environment.
136         os.chdir(self._root_dir)
137         os.umask(0)
138         os.setsid()
139
140         # fork 2
141         pid2 = os.fork()
142         if pid2 > 0:
143             # see ref: "os._exit(0)"
144             os._exit(0)
145
146         # close all open file descriptors.
147         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
148         if (max_fd == resource.RLIM_INFINITY):
149             max_fd = MAX_FD
150         for fd in range(3, max_fd):
151             if fd != w:
152                 try:
153                     os.close(fd)
154                 except OSError:
155                     pass
156
157         # Redirect standard file descriptors.
158         stdin = open(DEV_NULL, "r")
159         stderr = stdout = open(STD_ERR, "a", 0)
160         os.dup2(stdin.fileno(), sys.stdin.fileno())
161         # NOTE: sys.stdout.write will still be buffered, even if the file
162         # was opened with 0 buffer
163         os.dup2(stdout.fileno(), sys.stdout.fileno())
164         os.dup2(stderr.fileno(), sys.stderr.fileno())
165         
166         # setup environment
167         if self._environment_setup:
168             # parse environment variables and pass to child process
169             # do it by executing shell commands, in case there's some heavy setup involved
170             envproc = subprocess.Popen(
171                 [ "bash", "-c", 
172                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
173                         ( self._environment_setup, ) ],
174                 stdin = subprocess.PIPE, 
175                 stdout = subprocess.PIPE,
176                 stderr = subprocess.PIPE
177             )
178             out,err = envproc.communicate()
179
180             # parse new environment
181             if out:
182                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
183             
184                 # apply to current environment
185                 for name, value in environment.iteritems():
186                     os.environ[name] = value
187                 
188                 # apply pythonpath
189                 if 'PYTHONPATH' in environment:
190                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
191
192         # create control socket
193         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
194         self._ctrl_sock.bind(CTRL_SOCK)
195         self._ctrl_sock.listen(0)
196
197         # let the parent process know that the daemonization is finished
198         os.write(w, "\n")
199         os.close(w)
200         return 1
201
202     def post_daemonize(self):
203         pass
204
205     def loop(self):
206         while not self._stop:
207             conn, addr = self._ctrl_sock.accept()
208             conn.settimeout(5)
209             while not self._stop:
210                 try:
211                     msg = self.recv_msg(conn)
212                 except socket.timeout, e:
213                     self.log_error()
214                     break
215                     
216                 if msg == STOP_MSG:
217                     self._stop = True
218                     reply = self.stop_action()
219                 else:
220                     reply = self.reply_action(msg)
221                 
222                 try:
223                     self.send_reply(conn, reply)
224                 except socket.error:
225                     self.log_error()
226                     self.log_error("NOTICE: Awaiting for reconnection")
227                     break
228             try:
229                 conn.close()
230             except:
231                 # Doesn't matter
232                 self.log_error()
233
234     def recv_msg(self, conn):
235         data = [self._rdbuf]
236         chunk = data[0]
237         while '\n' not in chunk:
238             try:
239                 chunk = conn.recv(1024)
240             except (OSError, socket.error), e:
241                 if e[0] != errno.EINTR:
242                     raise
243                 else:
244                     continue
245             if chunk:
246                 data.append(chunk)
247             else:
248                 # empty chunk = EOF
249                 break
250         data = ''.join(data).split('\n',1)
251         while len(data) < 2:
252             data.append('')
253         data, self._rdbuf = data
254         
255         decoded = base64.b64decode(data)
256         return decoded.rstrip()
257
258     def send_reply(self, conn, reply):
259         encoded = base64.b64encode(reply)
260         conn.send("%s\n" % encoded)
261        
262     def cleanup(self):
263         try:
264             self._ctrl_sock.close()
265             os.remove(CTRL_SOCK)
266         except:
267             self.log_error()
268
269     def stop_action(self):
270         return "Stopping server"
271
272     def reply_action(self, msg):
273         return "Reply to: %s" % msg
274
275     def log_error(self, text = None, context = ''):
276         if text == None:
277             text = traceback.format_exc()
278         date = time.strftime("%Y-%m-%d %H:%M:%S")
279         if context:
280             context = " (%s)" % (context,)
281         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
282         return text
283
284     def log_debug(self, text):
285         if self._log_level == DEBUG_LEVEL:
286             date = time.strftime("%Y-%m-%d %H:%M:%S")
287             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
288
289 class Forwarder(object):
290     def __init__(self, root_dir = "."):
291         self._ctrl_sock = None
292         self._root_dir = root_dir
293         self._stop = False
294         self._rdbuf = ""
295
296     def forward(self):
297         self.connect()
298         print >>sys.stderr, "READY."
299         while not self._stop:
300             data = self.read_data()
301             self.send_to_server(data)
302             data = self.recv_from_server()
303             self.write_data(data)
304         self.disconnect()
305
306     def read_data(self):
307         return sys.stdin.readline()
308
309     def write_data(self, data):
310         sys.stdout.write(data)
311         # sys.stdout.write is buffered, this is why we need to do a flush()
312         sys.stdout.flush()
313
314     def send_to_server(self, data):
315         try:
316             self._ctrl_sock.send(data)
317         except (IOError, socket.error), e:
318             if e[0] == errno.EPIPE:
319                 self.connect()
320                 self._ctrl_sock.send(data)
321             else:
322                 raise e
323         encoded = data.rstrip() 
324         msg = base64.b64decode(encoded)
325         if msg == STOP_MSG:
326             self._stop = True
327
328     def recv_from_server(self):
329         data = [self._rdbuf]
330         chunk = data[0]
331         while '\n' not in chunk:
332             try:
333                 chunk = self._ctrl_sock.recv(1024)
334             except (OSError, socket.error), e:
335                 if e[0] != errno.EINTR:
336                     raise
337                 continue
338             if chunk:
339                 data.append(chunk)
340             else:
341                 # empty chunk = EOF
342                 break
343         data = ''.join(data).split('\n',1)
344         while len(data) < 2:
345             data.append('')
346         data, self._rdbuf = data
347         
348         return data+'\n'
349  
350     def connect(self):
351         self.disconnect()
352         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
353         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
354         self._ctrl_sock.connect(sock_addr)
355
356     def disconnect(self):
357         try:
358             self._ctrl_sock.close()
359         except:
360             pass
361
362 class Client(object):
363     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
364             agent = None, environment_setup = ""):
365         self.root_dir = root_dir
366         self.addr = (host, port)
367         self.user = user
368         self.agent = agent
369         self.environment_setup = environment_setup
370         self._stopped = False
371         self._deferreds = collections.deque()
372         self.connect()
373     
374     def __del__(self):
375         if self._process.poll() is None:
376             os.kill(self._process.pid, signal.SIGTERM)
377         self._process.wait()
378         
379     def connect(self):
380         root_dir = self.root_dir
381         (host, port) = self.addr
382         user = self.user
383         agent = self.agent
384         
385         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
386                 c.forward()" % (root_dir,)
387         if host != None:
388             self._process = popen_ssh_subprocess(python_code, host, port, 
389                     user, agent,
390                     environment_setup = self.environment_setup)
391             # popen_ssh_subprocess already waits for readiness
392             if self._process.poll():
393                 err = proc.stderr.read()
394                 raise RuntimeError("Client could not be reached: %s" % \
395                         err)
396         else:
397             self._process = subprocess.Popen(
398                     ["python", "-c", python_code],
399                     stdin = subprocess.PIPE, 
400                     stdout = subprocess.PIPE,
401                     stderr = subprocess.PIPE
402                 )
403                 
404         # Wait for the forwarder to be ready, otherwise nobody
405         # will be able to connect to it
406         helo = self._process.stderr.readline()
407         if helo != 'READY.\n':
408             raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
409                     helo + self._process.stderr.read())
410         
411     def send_msg(self, msg):
412         encoded = base64.b64encode(msg)
413         data = "%s\n" % encoded
414         
415         try:
416             self._process.stdin.write(data)
417         except (IOError, ValueError):
418             # dead process, poll it to un-zombify
419             self._process.poll()
420             
421             # try again after reconnect
422             # If it fails again, though, give up
423             self.connect()
424             self._process.stdin.write(data)
425
426     def send_stop(self):
427         self.send_msg(STOP_MSG)
428         self._stopped = True
429
430     def defer_reply(self, transform=None):
431         defer_entry = []
432         self._deferreds.append(defer_entry)
433         return defer.Defer(
434             functools.partial(self.read_reply, defer_entry, transform)
435         )
436         
437     def _read_reply(self):
438         data = self._process.stdout.readline()
439         encoded = data.rstrip() 
440         if not encoded:
441             # empty == eof == dead process, poll it to un-zombify
442             self._process.poll()
443             
444             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
445         return base64.b64decode(encoded)
446     
447     def read_reply(self, which=None, transform=None):
448         # Test to see if someone did it already
449         if which is not None and len(which):
450             # Ok, they did it...
451             # ...just return the deferred value
452             if transform:
453                 return transform(which[0])
454             else:
455                 return which[0]
456         
457         # Process all deferreds until the one we're looking for
458         # or until the queue is empty
459         while self._deferreds:
460             try:
461                 deferred = self._deferreds.popleft()
462             except IndexError:
463                 # emptied
464                 break
465             
466             deferred.append(self._read_reply())
467             if deferred is which:
468                 # We reached the one we were looking for
469                 if transform:
470                     return transform(deferred[0])
471                 else:
472                     return deferred[0]
473         
474         if which is None:
475             # They've requested a synchronous read
476             if transform:
477                 return transform(self._read_reply())
478             else:
479                 return self._read_reply()
480
481 def _make_server_key_args(server_key, host, port, args):
482     """ 
483     Returns a reference to the created temporary file, and adds the
484     corresponding arguments to the given argument list.
485     
486     Make sure to hold onto it until the process is done with the file
487     """
488     if port is not None:
489         host = '%s:%s' % (host,port)
490     # Create a temporary server key file
491     tmp_known_hosts = tempfile.NamedTemporaryFile()
492     
493     # Add the intended host key
494     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
495     
496     # If we're not in strict mode, add user-configured keys
497     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
498         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
499         if os.access(user_hosts_path, os.R_OK):
500             f = open(user_hosts_path, "r")
501             tmp_known_hosts.write(f.read())
502             f.close()
503         
504     tmp_known_hosts.flush()
505     
506     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
507     
508     return tmp_known_hosts
509
510 def popen_ssh_command(command, host, port, user, agent, 
511             stdin="", 
512             ident_key = None,
513             server_key = None,
514             tty = False):
515         """
516         Executes a remote commands, returns ((stdout,stderr),process)
517         """
518         if TRACE:
519             print "ssh", host, command
520         
521         tmp_known_hosts = None
522         args = ['ssh',
523                 # Don't bother with localhost. Makes test easier
524                 '-o', 'NoHostAuthenticationForLocalhost=yes',
525                 '-l', user, host]
526         if agent:
527             args.append('-A')
528         if port:
529             args.append('-p%d' % port)
530         if ident_key:
531             args.extend(('-i', ident_key))
532         if tty:
533             args.append('-t')
534         if server_key:
535             # Create a temporary server key file
536             tmp_known_hosts = _make_server_key_args(
537                 server_key, host, port, args)
538         args.append(command)
539
540         # connects to the remote host and starts a remote connection
541         proc = subprocess.Popen(args, 
542                 stdout = subprocess.PIPE,
543                 stdin = subprocess.PIPE, 
544                 stderr = subprocess.PIPE)
545         
546         # attach tempfile object to the process, to make sure the file stays
547         # alive until the process is finished with it
548         proc._known_hosts = tmp_known_hosts
549         
550         out, err = proc.communicate(stdin)
551         if TRACE:
552             print " -> ", out, err
553
554         return ((out, err), proc)
555  
556 def popen_scp(source, dest, 
557             port = None, 
558             agent = None, 
559             recursive = False,
560             ident_key = None,
561             server_key = None):
562         """
563         Copies from/to remote sites.
564         
565         Source and destination should have the user and host encoded
566         as per scp specs.
567         
568         If source is a file object, a special mode will be used to
569         create the remote file with the same contents.
570         
571         If dest is a file object, the remote file (source) will be
572         read and written into dest.
573         
574         In these modes, recursive cannot be True.
575         
576         Source can be a list of files to copy to a single destination,
577         in which case it is advised that the destination be a folder.
578         """
579         
580         if TRACE:
581             print "scp", source, dest
582         
583         if isinstance(source, file) or isinstance(dest, file) \
584                 or hasattr(source, 'read')  or hasattr(dest, 'write'):
585             assert not recursive
586             
587             # Parse source/destination as <user>@<server>:<path>
588             if isinstance(dest, basestring) and ':' in dest:
589                 remspec, path = dest.split(':',1)
590             elif isinstance(source, basestring) and ':' in source:
591                 remspec, path = source.split(':',1)
592             else:
593                 raise ValueError, "Both endpoints cannot be local"
594             user,host = remspec.rsplit('@',1)
595             tmp_known_hosts = None
596             
597             args = ['ssh', '-l', user, '-C',
598                     # Don't bother with localhost. Makes test easier
599                     '-o', 'NoHostAuthenticationForLocalhost=yes',
600                     host ]
601             if port:
602                 args.append('-P%d' % port)
603             if ident_key:
604                 args.extend(('-i', ident_key))
605             if server_key:
606                 # Create a temporary server key file
607                 tmp_known_hosts = _make_server_key_args(
608                     server_key, host, port, args)
609             
610             if isinstance(source, file) or hasattr(source, 'read'):
611                 args.append('cat > %s' % (shell_escape(path),))
612             elif isinstance(dest, file) or hasattr(dest, 'write'):
613                 args.append('cat %s' % (shell_escape(path),))
614             else:
615                 raise AssertionError, "Unreachable code reached! :-Q"
616             
617             # connects to the remote host and starts a remote connection
618             if isinstance(source, file):
619                 proc = subprocess.Popen(args, 
620                         stdout = open('/dev/null','w'),
621                         stderr = subprocess.PIPE,
622                         stdin = source)
623                 err = proc.stderr.read()
624                 proc._known_hosts = tmp_known_hosts
625                 eintr_retry(proc.wait)()
626                 return ((None,err), proc)
627             elif isinstance(dest, file):
628                 proc = subprocess.Popen(args, 
629                         stdout = open('/dev/null','w'),
630                         stderr = subprocess.PIPE,
631                         stdin = source)
632                 err = proc.stderr.read()
633                 proc._known_hosts = tmp_known_hosts
634                 eintr_retry(proc.wait)()
635                 return ((None,err), proc)
636             elif hasattr(source, 'read'):
637                 # file-like (but not file) source
638                 proc = subprocess.Popen(args, 
639                         stdout = open('/dev/null','w'),
640                         stderr = subprocess.PIPE,
641                         stdin = subprocess.PIPE)
642                 
643                 buf = None
644                 err = []
645                 while True:
646                     if not buf:
647                         buf = source.read(4096)
648                     if not buf:
649                         #EOF
650                         break
651                     
652                     rdrdy, wrdy, broken = select.select(
653                         [proc.stderr],
654                         [proc.stdin],
655                         [proc.stderr,proc.stdin])
656                     
657                     if proc.stderr in rdrdy:
658                         # use os.read for fully unbuffered behavior
659                         err.append(os.read(proc.stderr.fileno(), 4096))
660                     
661                     if proc.stdin in wrdy:
662                         proc.stdin.write(buf)
663                         buf = None
664                     
665                     if broken:
666                         break
667                 proc.stdin.close()
668                 err.append(proc.stderr.read())
669                     
670                 proc._known_hosts = tmp_known_hosts
671                 eintr_retry(proc.wait)()
672                 return ((None,''.join(err)), proc)
673             elif hasattr(dest, 'write'):
674                 # file-like (but not file) dest
675                 proc = subprocess.Popen(args, 
676                         stdout = subprocess.PIPE,
677                         stderr = subprocess.PIPE,
678                         stdin = open('/dev/null','w'))
679                 
680                 buf = None
681                 err = []
682                 while True:
683                     rdrdy, wrdy, broken = select.select(
684                         [proc.stderr, proc.stdout],
685                         [],
686                         [proc.stderr, proc.stdout])
687                     
688                     if proc.stderr in rdrdy:
689                         # use os.read for fully unbuffered behavior
690                         err.append(os.read(proc.stderr.fileno(), 4096))
691                     
692                     if proc.stdout in rdrdy:
693                         # use os.read for fully unbuffered behavior
694                         buf = os.read(proc.stdout.fileno(), 4096)
695                         dest.write(buf)
696                         
697                         if not buf:
698                             #EOF
699                             break
700                     
701                     if broken:
702                         break
703                 err.append(proc.stderr.read())
704                     
705                 proc._known_hosts = tmp_known_hosts
706                 eintr_retry(proc.wait)()
707                 return ((None,''.join(err)), proc)
708             else:
709                 raise AssertionError, "Unreachable code reached! :-Q"
710         else:
711             # Parse destination as <user>@<server>:<path>
712             if isinstance(dest, basestring) and ':' in dest:
713                 remspec, path = dest.split(':',1)
714             elif isinstance(source, basestring) and ':' in source:
715                 remspec, path = source.split(':',1)
716             else:
717                 raise ValueError, "Both endpoints cannot be local"
718             user,host = remspec.rsplit('@',1)
719             
720             # plain scp
721             tmp_known_hosts = None
722             args = ['scp', '-q', '-p', '-C',
723                     # Don't bother with localhost. Makes test easier
724                     '-o', 'NoHostAuthenticationForLocalhost=yes' ]
725             if port:
726                 args.append('-P%d' % port)
727             if recursive:
728                 args.append('-r')
729             if ident_key:
730                 args.extend(('-i', ident_key))
731             if server_key:
732                 # Create a temporary server key file
733                 tmp_known_hosts = _make_server_key_args(
734                     server_key, host, port, args)
735             if isinstance(source,list):
736                 args.extend(source)
737             else:
738                 args.append(source)
739             args.append(dest)
740
741             # connects to the remote host and starts a remote connection
742             proc = subprocess.Popen(args, 
743                     stdout = subprocess.PIPE,
744                     stdin = subprocess.PIPE, 
745                     stderr = subprocess.PIPE)
746             proc._known_hosts = tmp_known_hosts
747             
748             comm = proc.communicate()
749             eintr_retry(proc.wait)()
750             return (comm, proc)
751  
752 def popen_ssh_subprocess(python_code, host, port, user, agent, 
753         python_path = None,
754         ident_key = None,
755         server_key = None,
756         tty = False,
757         environment_setup = "",
758         waitcommand = False):
759         cmd = ""
760         if python_path:
761             python_path.replace("'", r"'\''")
762             cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
763             cmd += " ; "
764         if environment_setup:
765             cmd += environment_setup
766             cmd += " ; "
767         # Uncomment for debug (to run everything under strace)
768         # We had to verify if strace works (cannot nest them)
769         #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
770         #cmd += "$CMD "
771         #cmd += "strace -f -tt -s 200 -o strace$$.out "
772         cmd += "python -c '"
773         cmd += "import base64, os\n"
774         cmd += "cmd = \"\"\n"
775         cmd += "while True:\n"
776         cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
777         cmd += " if cmd[-1] == \"\\n\": break\n"
778         cmd += "cmd = base64.b64decode(cmd)\n"
779         # Uncomment for debug
780         #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
781         if not waitcommand:
782             cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
783         cmd += "exec(cmd)\n"
784         if waitcommand:
785             cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
786         cmd += "'"
787         
788         tmp_known_hosts = None
789         args = ['ssh',
790                 # Don't bother with localhost. Makes test easier
791                 '-o', 'NoHostAuthenticationForLocalhost=yes',
792                 '-l', user, host]
793         if agent:
794             args.append('-A')
795         if port:
796             args.append('-p%d' % port)
797         if ident_key:
798             args.extend(('-i', ident_key))
799         if tty:
800             args.append('-t')
801         if server_key:
802             # Create a temporary server key file
803             tmp_known_hosts = _make_server_key_args(
804                 server_key, host, port, args)
805         args.append(cmd)
806
807         # connects to the remote host and starts a remote rpyc connection
808         proc = subprocess.Popen(args, 
809                 stdout = subprocess.PIPE,
810                 stdin = subprocess.PIPE, 
811                 stderr = subprocess.PIPE)
812         proc._known_hosts = tmp_known_hosts
813         
814         # send the command to execute
815         os.write(proc.stdin.fileno(),
816                 base64.b64encode(python_code) + "\n")
817         msg = os.read(proc.stdout.fileno(), 3)
818         if msg != "OK\n":
819             raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
820                 msg, proc.stdout.read(), proc.stderr.read())
821         return proc
822