Merge with head
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import os.path
8 import resource
9 import select
10 import socket
11 import sys
12 import subprocess
13 import threading
14 import time
15 import traceback
16 import signal
17 import re
18 import tempfile
19 import defer
20 import functools
21 import collections
22
23 CTRL_SOCK = "ctrl.sock"
24 STD_ERR = "stderr.log"
25 MAX_FD = 1024
26
27 STOP_MSG = "STOP"
28
29 ERROR_LEVEL = 0
30 DEBUG_LEVEL = 1
31 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
32
33 if hasattr(os, "devnull"):
34     DEV_NULL = os.devnull
35 else:
36     DEV_NULL = "/dev/null"
37
38 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
39
40 def shell_escape(s):
41     """ Escapes strings so that they are safe to use as command-line arguments """
42     if SHELL_SAFE.match(s):
43         # safe string - no escaping needed
44         return s
45     else:
46         # unsafe string - escape
47         def escp(c):
48             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
49                 return c
50             else:
51                 return "'$'\\x%02x''" % (ord(c),)
52         s = ''.join(map(escp,s))
53         return "'%s'" % (s,)
54
55 def eintr_retry(func):
56     import functools
57     @functools.wraps(func)
58     def rv(*p, **kw):
59         retry = kw.pop("_retry", False)
60         for i in xrange(0 if retry else 4):
61             try:
62                 return func(*p, **kw)
63             except (select.error, socket.error), args:
64                 if args[0] == errno.EINTR:
65                     continue
66                 else:
67                     raise 
68             except OSError, e:
69                 if e.errno == errno.EINTR:
70                     continue
71                 else:
72                     raise
73         else:
74             return func(*p, **kw)
75     return rv
76
77 class Server(object):
78     def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
79         self._root_dir = root_dir
80         self._stop = False
81         self._ctrl_sock = None
82         self._log_level = log_level
83         self._rdbuf = ""
84         self._environment_setup = environment_setup
85
86     def run(self):
87         try:
88             if self.daemonize():
89                 self.post_daemonize()
90                 self.loop()
91                 self.cleanup()
92                 # ref: "os._exit(0)"
93                 # can not return normally after fork beacuse no exec was done.
94                 # This means that if we don't do a os._exit(0) here the code that 
95                 # follows the call to "Server.run()" in the "caller code" will be 
96                 # executed... but by now it has already been executed after the 
97                 # first process (the one that did the first fork) returned.
98                 os._exit(0)
99         except:
100             self.log_error()
101             self.cleanup()
102             os._exit(0)
103
104     def daemonize(self):
105         # pipes for process synchronization
106         (r, w) = os.pipe()
107         
108         # build root folder
109         root = os.path.normpath(self._root_dir)
110         if not os.path.exists(root):
111             os.makedirs(root, 0755)
112
113         pid1 = os.fork()
114         if pid1 > 0:
115             os.close(w)
116             while True:
117                 try:
118                     os.read(r, 1)
119                 except OSError, e: # pragma: no cover
120                     if e.errno == errno.EINTR:
121                         continue
122                     else:
123                         raise
124                 break
125             os.close(r)
126             # os.waitpid avoids leaving a <defunc> (zombie) process
127             st = os.waitpid(pid1, 0)[1]
128             if st:
129                 raise RuntimeError("Daemonization failed")
130             # return 0 to inform the caller method that this is not the 
131             # daemonized process
132             return 0
133         os.close(r)
134
135         # Decouple from parent environment.
136         os.chdir(self._root_dir)
137         os.umask(0)
138         os.setsid()
139
140         # fork 2
141         pid2 = os.fork()
142         if pid2 > 0:
143             # see ref: "os._exit(0)"
144             os._exit(0)
145
146         # close all open file descriptors.
147         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
148         if (max_fd == resource.RLIM_INFINITY):
149             max_fd = MAX_FD
150         for fd in range(3, max_fd):
151             if fd != w:
152                 try:
153                     os.close(fd)
154                 except OSError:
155                     pass
156
157         # Redirect standard file descriptors.
158         stdin = open(DEV_NULL, "r")
159         stderr = stdout = open(STD_ERR, "a", 0)
160         os.dup2(stdin.fileno(), sys.stdin.fileno())
161         # NOTE: sys.stdout.write will still be buffered, even if the file
162         # was opened with 0 buffer
163         os.dup2(stdout.fileno(), sys.stdout.fileno())
164         os.dup2(stderr.fileno(), sys.stderr.fileno())
165         
166         # setup environment
167         if self._environment_setup:
168             # parse environment variables and pass to child process
169             # do it by executing shell commands, in case there's some heavy setup involved
170             envproc = subprocess.Popen(
171                 [ "bash", "-c", 
172                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
173                         ( self._environment_setup, ) ],
174                 stdin = subprocess.PIPE, 
175                 stdout = subprocess.PIPE,
176                 stderr = subprocess.PIPE
177             )
178             out,err = envproc.communicate()
179
180             # parse new environment
181             if out:
182                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
183             
184                 # apply to current environment
185                 for name, value in environment.iteritems():
186                     os.environ[name] = value
187                 
188                 # apply pythonpath
189                 if 'PYTHONPATH' in environment:
190                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
191
192         # create control socket
193         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
194         self._ctrl_sock.bind(CTRL_SOCK)
195         self._ctrl_sock.listen(0)
196
197         # let the parent process know that the daemonization is finished
198         os.write(w, "\n")
199         os.close(w)
200         return 1
201
202     def post_daemonize(self):
203         # QT, for some strange reason, redefines the SIGCHILD handler to write
204         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
205         # Server dameonization closes all file descriptors from fileno '3',
206         # but the overloaded handler (inherited by the forked process) will
207         # keep trying to write the \0 to fileno 'x', which might have been reused 
208         # after closing, for other operations. This is bad bad bad when fileno 'x'
209         # is in use for communication pouroses, because unexpected \0 start
210         # appearing in the communication messages... this is exactly what happens 
211         # when using netns in daemonized form. Thus, be have no other alternative than
212         # restoring the SIGCHLD handler to the default here.
213         import signal
214         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
215
216     def loop(self):
217         while not self._stop:
218             conn, addr = self._ctrl_sock.accept()
219             conn.settimeout(5)
220             while not self._stop:
221                 try:
222                     msg = self.recv_msg(conn)
223                 except socket.timeout, e:
224                     self.log_error()
225                     break
226                     
227                 if msg == STOP_MSG:
228                     self._stop = True
229                     reply = self.stop_action()
230                 else:
231                     reply = self.reply_action(msg)
232                 
233                 try:
234                     self.send_reply(conn, reply)
235                 except socket.error:
236                     self.log_error()
237                     self.log_error("NOTICE: Awaiting for reconnection")
238                     break
239             try:
240                 conn.close()
241             except:
242                 # Doesn't matter
243                 self.log_error()
244
245     def recv_msg(self, conn):
246         data = [self._rdbuf]
247         chunk = data[0]
248         while '\n' not in chunk:
249             try:
250                 chunk = conn.recv(1024)
251             except (OSError, socket.error), e:
252                 if e[0] != errno.EINTR:
253                     raise
254                 else:
255                     continue
256             if chunk:
257                 data.append(chunk)
258             else:
259                 # empty chunk = EOF
260                 break
261         data = ''.join(data).split('\n',1)
262         while len(data) < 2:
263             data.append('')
264         data, self._rdbuf = data
265         
266         decoded = base64.b64decode(data)
267         return decoded.rstrip()
268
269     def send_reply(self, conn, reply):
270         encoded = base64.b64encode(reply)
271         conn.send("%s\n" % encoded)
272        
273     def cleanup(self):
274         try:
275             self._ctrl_sock.close()
276             os.remove(CTRL_SOCK)
277         except:
278             self.log_error()
279
280     def stop_action(self):
281         return "Stopping server"
282
283     def reply_action(self, msg):
284         return "Reply to: %s" % msg
285
286     def log_error(self, text = None, context = ''):
287         if text == None:
288             text = traceback.format_exc()
289         date = time.strftime("%Y-%m-%d %H:%M:%S")
290         if context:
291             context = " (%s)" % (context,)
292         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
293         return text
294
295     def log_debug(self, text):
296         if self._log_level == DEBUG_LEVEL:
297             date = time.strftime("%Y-%m-%d %H:%M:%S")
298             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
299
300 class Forwarder(object):
301     def __init__(self, root_dir = "."):
302         self._ctrl_sock = None
303         self._root_dir = root_dir
304         self._stop = False
305         self._rdbuf = ""
306
307     def forward(self):
308         self.connect()
309         print >>sys.stderr, "READY."
310         while not self._stop:
311             data = self.read_data()
312             self.send_to_server(data)
313             data = self.recv_from_server()
314             self.write_data(data)
315         self.disconnect()
316
317     def read_data(self):
318         return sys.stdin.readline()
319
320     def write_data(self, data):
321         sys.stdout.write(data)
322         # sys.stdout.write is buffered, this is why we need to do a flush()
323         sys.stdout.flush()
324
325     def send_to_server(self, data):
326         try:
327             self._ctrl_sock.send(data)
328         except (IOError, socket.error), e:
329             if e[0] == errno.EPIPE:
330                 self.connect()
331                 self._ctrl_sock.send(data)
332             else:
333                 raise e
334         encoded = data.rstrip() 
335         msg = base64.b64decode(encoded)
336         if msg == STOP_MSG:
337             self._stop = True
338
339     def recv_from_server(self):
340         data = [self._rdbuf]
341         chunk = data[0]
342         while '\n' not in chunk:
343             try:
344                 chunk = self._ctrl_sock.recv(1024)
345             except (OSError, socket.error), e:
346                 if e[0] != errno.EINTR:
347                     raise
348                 continue
349             if chunk:
350                 data.append(chunk)
351             else:
352                 # empty chunk = EOF
353                 break
354         data = ''.join(data).split('\n',1)
355         while len(data) < 2:
356             data.append('')
357         data, self._rdbuf = data
358         
359         return data+'\n'
360  
361     def connect(self):
362         self.disconnect()
363         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
364         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
365         self._ctrl_sock.connect(sock_addr)
366
367     def disconnect(self):
368         try:
369             self._ctrl_sock.close()
370         except:
371             pass
372
373 class Client(object):
374     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
375             agent = None, environment_setup = ""):
376         self.root_dir = root_dir
377         self.addr = (host, port)
378         self.user = user
379         self.agent = agent
380         self.environment_setup = environment_setup
381         self._stopped = False
382         self._deferreds = collections.deque()
383         self.connect()
384     
385     def __del__(self):
386         if self._process.poll() is None:
387             os.kill(self._process.pid, signal.SIGTERM)
388         self._process.wait()
389         
390     def connect(self):
391         root_dir = self.root_dir
392         (host, port) = self.addr
393         user = self.user
394         agent = self.agent
395         
396         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
397                 c.forward()" % (root_dir,)
398         if host != None:
399             self._process = popen_ssh_subprocess(python_code, host, port, 
400                     user, agent,
401                     environment_setup = self.environment_setup)
402             # popen_ssh_subprocess already waits for readiness
403             if self._process.poll():
404                 err = proc.stderr.read()
405                 raise RuntimeError("Client could not be reached: %s" % \
406                         err)
407         else:
408             self._process = subprocess.Popen(
409                     ["python", "-c", python_code],
410                     stdin = subprocess.PIPE, 
411                     stdout = subprocess.PIPE,
412                     stderr = subprocess.PIPE
413                 )
414                 
415         # Wait for the forwarder to be ready, otherwise nobody
416         # will be able to connect to it
417         helo = self._process.stderr.readline()
418         if helo != 'READY.\n':
419             raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
420                     helo + self._process.stderr.read())
421         
422     def send_msg(self, msg):
423         encoded = base64.b64encode(msg)
424         data = "%s\n" % encoded
425         
426         try:
427             self._process.stdin.write(data)
428         except (IOError, ValueError):
429             # dead process, poll it to un-zombify
430             self._process.poll()
431             
432             # try again after reconnect
433             # If it fails again, though, give up
434             self.connect()
435             self._process.stdin.write(data)
436
437     def send_stop(self):
438         self.send_msg(STOP_MSG)
439         self._stopped = True
440
441     def defer_reply(self, transform=None):
442         defer_entry = []
443         self._deferreds.append(defer_entry)
444         return defer.Defer(
445             functools.partial(self.read_reply, defer_entry, transform)
446         )
447         
448     def _read_reply(self):
449         data = self._process.stdout.readline()
450         encoded = data.rstrip() 
451         if not encoded:
452             # empty == eof == dead process, poll it to un-zombify
453             self._process.poll()
454             
455             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
456         return base64.b64decode(encoded)
457     
458     def read_reply(self, which=None, transform=None):
459         # Test to see if someone did it already
460         if which is not None and len(which):
461             # Ok, they did it...
462             # ...just return the deferred value
463             if transform:
464                 return transform(which[0])
465             else:
466                 return which[0]
467         
468         # Process all deferreds until the one we're looking for
469         # or until the queue is empty
470         while self._deferreds:
471             try:
472                 deferred = self._deferreds.popleft()
473             except IndexError:
474                 # emptied
475                 break
476             
477             deferred.append(self._read_reply())
478             if deferred is which:
479                 # We reached the one we were looking for
480                 if transform:
481                     return transform(deferred[0])
482                 else:
483                     return deferred[0]
484         
485         if which is None:
486             # They've requested a synchronous read
487             if transform:
488                 return transform(self._read_reply())
489             else:
490                 return self._read_reply()
491
492 def _make_server_key_args(server_key, host, port, args):
493     """ 
494     Returns a reference to the created temporary file, and adds the
495     corresponding arguments to the given argument list.
496     
497     Make sure to hold onto it until the process is done with the file
498     """
499     if port is not None:
500         host = '%s:%s' % (host,port)
501     # Create a temporary server key file
502     tmp_known_hosts = tempfile.NamedTemporaryFile()
503     
504     # Add the intended host key
505     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
506     
507     # If we're not in strict mode, add user-configured keys
508     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
509         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
510         if os.access(user_hosts_path, os.R_OK):
511             f = open(user_hosts_path, "r")
512             tmp_known_hosts.write(f.read())
513             f.close()
514         
515     tmp_known_hosts.flush()
516     
517     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
518     
519     return tmp_known_hosts
520
521 def popen_ssh_command(command, host, port, user, agent, 
522             stdin="", 
523             ident_key = None,
524             server_key = None,
525             tty = False):
526         """
527         Executes a remote commands, returns ((stdout,stderr),process)
528         """
529         if TRACE:
530             print "ssh", host, command
531         
532         tmp_known_hosts = None
533         args = ['ssh',
534                 # Don't bother with localhost. Makes test easier
535                 '-o', 'NoHostAuthenticationForLocalhost=yes',
536                 '-l', user, host]
537         if agent:
538             args.append('-A')
539         if port:
540             args.append('-p%d' % port)
541         if ident_key:
542             args.extend(('-i', ident_key))
543         if tty:
544             args.append('-t')
545         if server_key:
546             # Create a temporary server key file
547             tmp_known_hosts = _make_server_key_args(
548                 server_key, host, port, args)
549         args.append(command)
550
551         # connects to the remote host and starts a remote connection
552         proc = subprocess.Popen(args, 
553                 stdout = subprocess.PIPE,
554                 stdin = subprocess.PIPE, 
555                 stderr = subprocess.PIPE)
556         
557         # attach tempfile object to the process, to make sure the file stays
558         # alive until the process is finished with it
559         proc._known_hosts = tmp_known_hosts
560         
561         out, err = proc.communicate(stdin)
562         if TRACE:
563             print " -> ", out, err
564
565         return ((out, err), proc)
566  
567 def popen_scp(source, dest, 
568             port = None, 
569             agent = None, 
570             recursive = False,
571             ident_key = None,
572             server_key = None):
573         """
574         Copies from/to remote sites.
575         
576         Source and destination should have the user and host encoded
577         as per scp specs.
578         
579         If source is a file object, a special mode will be used to
580         create the remote file with the same contents.
581         
582         If dest is a file object, the remote file (source) will be
583         read and written into dest.
584         
585         In these modes, recursive cannot be True.
586         
587         Source can be a list of files to copy to a single destination,
588         in which case it is advised that the destination be a folder.
589         """
590         
591         if TRACE:
592             print "scp", source, dest
593         
594         if isinstance(source, file) and source.tell() == 0:
595             source = source.name
596         elif hasattr(source, 'read'):
597             tmp = tempfile.NamedTemporaryFile()
598             while True:
599                 buf = source.read(65536)
600                 if buf:
601                     tmp.write(buf)
602                 else:
603                     break
604             tmp.seek(0)
605             source = tmp.name
606         
607         if isinstance(source, file) or isinstance(dest, file) \
608                 or hasattr(source, 'read')  or hasattr(dest, 'write'):
609             assert not recursive
610             
611             # Parse source/destination as <user>@<server>:<path>
612             if isinstance(dest, basestring) and ':' in dest:
613                 remspec, path = dest.split(':',1)
614             elif isinstance(source, basestring) and ':' in source:
615                 remspec, path = source.split(':',1)
616             else:
617                 raise ValueError, "Both endpoints cannot be local"
618             user,host = remspec.rsplit('@',1)
619             tmp_known_hosts = None
620             
621             args = ['ssh', '-l', user, '-C',
622                     # Don't bother with localhost. Makes test easier
623                     '-o', 'NoHostAuthenticationForLocalhost=yes',
624                     host ]
625             if port:
626                 args.append('-P%d' % port)
627             if ident_key:
628                 args.extend(('-i', ident_key))
629             if server_key:
630                 # Create a temporary server key file
631                 tmp_known_hosts = _make_server_key_args(
632                     server_key, host, port, args)
633             
634             if isinstance(source, file) or hasattr(source, 'read'):
635                 args.append('cat > %s' % (shell_escape(path),))
636             elif isinstance(dest, file) or hasattr(dest, 'write'):
637                 args.append('cat %s' % (shell_escape(path),))
638             else:
639                 raise AssertionError, "Unreachable code reached! :-Q"
640             
641             # connects to the remote host and starts a remote connection
642             if isinstance(source, file):
643                 proc = subprocess.Popen(args, 
644                         stdout = open('/dev/null','w'),
645                         stderr = subprocess.PIPE,
646                         stdin = source)
647                 err = proc.stderr.read()
648                 proc._known_hosts = tmp_known_hosts
649                 eintr_retry(proc.wait)()
650                 return ((None,err), proc)
651             elif isinstance(dest, file):
652                 proc = subprocess.Popen(args, 
653                         stdout = open('/dev/null','w'),
654                         stderr = subprocess.PIPE,
655                         stdin = source)
656                 err = proc.stderr.read()
657                 proc._known_hosts = tmp_known_hosts
658                 eintr_retry(proc.wait)()
659                 return ((None,err), proc)
660             elif hasattr(source, 'read'):
661                 # file-like (but not file) source
662                 proc = subprocess.Popen(args, 
663                         stdout = open('/dev/null','w'),
664                         stderr = subprocess.PIPE,
665                         stdin = subprocess.PIPE)
666                 
667                 buf = None
668                 err = []
669                 while True:
670                     if not buf:
671                         buf = source.read(4096)
672                     if not buf:
673                         #EOF
674                         break
675                     
676                     rdrdy, wrdy, broken = select.select(
677                         [proc.stderr],
678                         [proc.stdin],
679                         [proc.stderr,proc.stdin])
680                     
681                     if proc.stderr in rdrdy:
682                         # use os.read for fully unbuffered behavior
683                         err.append(os.read(proc.stderr.fileno(), 4096))
684                     
685                     if proc.stdin in wrdy:
686                         proc.stdin.write(buf)
687                         buf = None
688                     
689                     if broken:
690                         break
691                 proc.stdin.close()
692                 err.append(proc.stderr.read())
693                     
694                 proc._known_hosts = tmp_known_hosts
695                 eintr_retry(proc.wait)()
696                 return ((None,''.join(err)), proc)
697             elif hasattr(dest, 'write'):
698                 # file-like (but not file) dest
699                 proc = subprocess.Popen(args, 
700                         stdout = subprocess.PIPE,
701                         stderr = subprocess.PIPE,
702                         stdin = open('/dev/null','w'))
703                 
704                 buf = None
705                 err = []
706                 while True:
707                     rdrdy, wrdy, broken = select.select(
708                         [proc.stderr, proc.stdout],
709                         [],
710                         [proc.stderr, proc.stdout])
711                     
712                     if proc.stderr in rdrdy:
713                         # use os.read for fully unbuffered behavior
714                         err.append(os.read(proc.stderr.fileno(), 4096))
715                     
716                     if proc.stdout in rdrdy:
717                         # use os.read for fully unbuffered behavior
718                         buf = os.read(proc.stdout.fileno(), 4096)
719                         dest.write(buf)
720                         
721                         if not buf:
722                             #EOF
723                             break
724                     
725                     if broken:
726                         break
727                 err.append(proc.stderr.read())
728                     
729                 proc._known_hosts = tmp_known_hosts
730                 eintr_retry(proc.wait)()
731                 return ((None,''.join(err)), proc)
732             else:
733                 raise AssertionError, "Unreachable code reached! :-Q"
734         else:
735             # Parse destination as <user>@<server>:<path>
736             if isinstance(dest, basestring) and ':' in dest:
737                 remspec, path = dest.split(':',1)
738             elif isinstance(source, basestring) and ':' in source:
739                 remspec, path = source.split(':',1)
740             else:
741                 raise ValueError, "Both endpoints cannot be local"
742             user,host = remspec.rsplit('@',1)
743             
744             # plain scp
745             tmp_known_hosts = None
746             args = ['scp', '-q', '-p', '-C',
747                     # Don't bother with localhost. Makes test easier
748                     '-o', 'NoHostAuthenticationForLocalhost=yes' ]
749             if port:
750                 args.append('-P%d' % port)
751             if recursive:
752                 args.append('-r')
753             if ident_key:
754                 args.extend(('-i', ident_key))
755             if server_key:
756                 # Create a temporary server key file
757                 tmp_known_hosts = _make_server_key_args(
758                     server_key, host, port, args)
759             if isinstance(source,list):
760                 args.extend(source)
761             else:
762                 args.append(source)
763             args.append(dest)
764
765             # connects to the remote host and starts a remote connection
766             proc = subprocess.Popen(args, 
767                     stdout = subprocess.PIPE,
768                     stdin = subprocess.PIPE, 
769                     stderr = subprocess.PIPE)
770             proc._known_hosts = tmp_known_hosts
771             
772             comm = proc.communicate()
773             eintr_retry(proc.wait)()
774             return (comm, proc)
775  
776 def popen_ssh_subprocess(python_code, host, port, user, agent, 
777         python_path = None,
778         ident_key = None,
779         server_key = None,
780         tty = False,
781         environment_setup = "",
782         waitcommand = False):
783         cmd = ""
784         if python_path:
785             python_path.replace("'", r"'\''")
786             cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
787             cmd += " ; "
788         if environment_setup:
789             cmd += environment_setup
790             cmd += " ; "
791         # Uncomment for debug (to run everything under strace)
792         # We had to verify if strace works (cannot nest them)
793         #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
794         #cmd += "$CMD "
795         #cmd += "strace -f -tt -s 200 -o strace$$.out "
796         cmd += "python -c '"
797         cmd += "import base64, os\n"
798         cmd += "cmd = \"\"\n"
799         cmd += "while True:\n"
800         cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
801         cmd += " if cmd[-1] == \"\\n\": break\n"
802         cmd += "cmd = base64.b64decode(cmd)\n"
803         # Uncomment for debug
804         #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
805         if not waitcommand:
806             cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
807         cmd += "exec(cmd)\n"
808         if waitcommand:
809             cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
810         cmd += "'"
811         
812         tmp_known_hosts = None
813         args = ['ssh',
814                 # Don't bother with localhost. Makes test easier
815                 '-o', 'NoHostAuthenticationForLocalhost=yes',
816                 '-l', user, host]
817         if agent:
818             args.append('-A')
819         if port:
820             args.append('-p%d' % port)
821         if ident_key:
822             args.extend(('-i', ident_key))
823         if tty:
824             args.append('-t')
825         if server_key:
826             # Create a temporary server key file
827             tmp_known_hosts = _make_server_key_args(
828                 server_key, host, port, args)
829         args.append(cmd)
830
831         # connects to the remote host and starts a remote rpyc connection
832         proc = subprocess.Popen(args, 
833                 stdout = subprocess.PIPE,
834                 stdin = subprocess.PIPE, 
835                 stderr = subprocess.PIPE)
836         proc._known_hosts = tmp_known_hosts
837         
838         # send the command to execute
839         os.write(proc.stdin.fileno(),
840                 base64.b64encode(python_code) + "\n")
841         msg = os.read(proc.stdout.fileno(), 3)
842         if msg != "OK\n":
843             raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
844                 msg, proc.stdout.read(), proc.stderr.read())
845         return proc
846