SSH timeout. It tends to... hang. Whatevah...
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import errno
6 import os
7 import os.path
8 import resource
9 import select
10 import socket
11 import signal
12 import sys
13 import subprocess
14 import threading
15 import time
16 import traceback
17 import signal
18 import re
19 import tempfile
20 import defer
21 import functools
22 import collections
23
24 CTRL_SOCK = "ctrl.sock"
25 STD_ERR = "stderr.log"
26 MAX_FD = 1024
27
28 STOP_MSG = "STOP"
29
30 ERROR_LEVEL = 0
31 DEBUG_LEVEL = 1
32 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
33
34 if hasattr(os, "devnull"):
35     DEV_NULL = os.devnull
36 else:
37     DEV_NULL = "/dev/null"
38
39 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
40
41 def shell_escape(s):
42     """ Escapes strings so that they are safe to use as command-line arguments """
43     if SHELL_SAFE.match(s):
44         # safe string - no escaping needed
45         return s
46     else:
47         # unsafe string - escape
48         def escp(c):
49             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
50                 return c
51             else:
52                 return "'$'\\x%02x''" % (ord(c),)
53         s = ''.join(map(escp,s))
54         return "'%s'" % (s,)
55
56 def eintr_retry(func):
57     import functools
58     @functools.wraps(func)
59     def rv(*p, **kw):
60         retry = kw.pop("_retry", False)
61         for i in xrange(0 if retry else 4):
62             try:
63                 return func(*p, **kw)
64             except (select.error, socket.error), args:
65                 if args[0] == errno.EINTR:
66                     continue
67                 else:
68                     raise 
69             except OSError, e:
70                 if e.errno == errno.EINTR:
71                     continue
72                 else:
73                     raise
74         else:
75             return func(*p, **kw)
76     return rv
77
78 class Server(object):
79     def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
80         self._root_dir = root_dir
81         self._stop = False
82         self._ctrl_sock = None
83         self._log_level = log_level
84         self._rdbuf = ""
85         self._environment_setup = environment_setup
86
87     def run(self):
88         try:
89             if self.daemonize():
90                 self.post_daemonize()
91                 self.loop()
92                 self.cleanup()
93                 # ref: "os._exit(0)"
94                 # can not return normally after fork beacuse no exec was done.
95                 # This means that if we don't do a os._exit(0) here the code that 
96                 # follows the call to "Server.run()" in the "caller code" will be 
97                 # executed... but by now it has already been executed after the 
98                 # first process (the one that did the first fork) returned.
99                 os._exit(0)
100         except:
101             self.log_error()
102             self.cleanup()
103             os._exit(0)
104
105     def daemonize(self):
106         # pipes for process synchronization
107         (r, w) = os.pipe()
108         
109         # build root folder
110         root = os.path.normpath(self._root_dir)
111         if not os.path.exists(root):
112             os.makedirs(root, 0755)
113
114         pid1 = os.fork()
115         if pid1 > 0:
116             os.close(w)
117             while True:
118                 try:
119                     os.read(r, 1)
120                 except OSError, e: # pragma: no cover
121                     if e.errno == errno.EINTR:
122                         continue
123                     else:
124                         raise
125                 break
126             os.close(r)
127             # os.waitpid avoids leaving a <defunc> (zombie) process
128             st = os.waitpid(pid1, 0)[1]
129             if st:
130                 raise RuntimeError("Daemonization failed")
131             # return 0 to inform the caller method that this is not the 
132             # daemonized process
133             return 0
134         os.close(r)
135
136         # Decouple from parent environment.
137         os.chdir(self._root_dir)
138         os.umask(0)
139         os.setsid()
140
141         # fork 2
142         pid2 = os.fork()
143         if pid2 > 0:
144             # see ref: "os._exit(0)"
145             os._exit(0)
146
147         # close all open file descriptors.
148         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
149         if (max_fd == resource.RLIM_INFINITY):
150             max_fd = MAX_FD
151         for fd in range(3, max_fd):
152             if fd != w:
153                 try:
154                     os.close(fd)
155                 except OSError:
156                     pass
157
158         # Redirect standard file descriptors.
159         stdin = open(DEV_NULL, "r")
160         stderr = stdout = open(STD_ERR, "a", 0)
161         os.dup2(stdin.fileno(), sys.stdin.fileno())
162         # NOTE: sys.stdout.write will still be buffered, even if the file
163         # was opened with 0 buffer
164         os.dup2(stdout.fileno(), sys.stdout.fileno())
165         os.dup2(stderr.fileno(), sys.stderr.fileno())
166         
167         # setup environment
168         if self._environment_setup:
169             # parse environment variables and pass to child process
170             # do it by executing shell commands, in case there's some heavy setup involved
171             envproc = subprocess.Popen(
172                 [ "bash", "-c", 
173                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
174                         ( self._environment_setup, ) ],
175                 stdin = subprocess.PIPE, 
176                 stdout = subprocess.PIPE,
177                 stderr = subprocess.PIPE
178             )
179             out,err = envproc.communicate()
180
181             # parse new environment
182             if out:
183                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
184             
185                 # apply to current environment
186                 for name, value in environment.iteritems():
187                     os.environ[name] = value
188                 
189                 # apply pythonpath
190                 if 'PYTHONPATH' in environment:
191                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
192
193         # create control socket
194         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
195         self._ctrl_sock.bind(CTRL_SOCK)
196         self._ctrl_sock.listen(0)
197
198         # let the parent process know that the daemonization is finished
199         os.write(w, "\n")
200         os.close(w)
201         return 1
202
203     def post_daemonize(self):
204         # QT, for some strange reason, redefines the SIGCHILD handler to write
205         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
206         # Server dameonization closes all file descriptors from fileno '3',
207         # but the overloaded handler (inherited by the forked process) will
208         # keep trying to write the \0 to fileno 'x', which might have been reused 
209         # after closing, for other operations. This is bad bad bad when fileno 'x'
210         # is in use for communication pouroses, because unexpected \0 start
211         # appearing in the communication messages... this is exactly what happens 
212         # when using netns in daemonized form. Thus, be have no other alternative than
213         # restoring the SIGCHLD handler to the default here.
214         import signal
215         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
216
217     def loop(self):
218         while not self._stop:
219             conn, addr = self._ctrl_sock.accept()
220             conn.settimeout(5)
221             while not self._stop:
222                 try:
223                     msg = self.recv_msg(conn)
224                 except socket.timeout, e:
225                     self.log_error()
226                     break
227                     
228                 if msg == STOP_MSG:
229                     self._stop = True
230                     reply = self.stop_action()
231                 else:
232                     reply = self.reply_action(msg)
233                 
234                 try:
235                     self.send_reply(conn, reply)
236                 except socket.error:
237                     self.log_error()
238                     self.log_error("NOTICE: Awaiting for reconnection")
239                     break
240             try:
241                 conn.close()
242             except:
243                 # Doesn't matter
244                 self.log_error()
245
246     def recv_msg(self, conn):
247         data = [self._rdbuf]
248         chunk = data[0]
249         while '\n' not in chunk:
250             try:
251                 chunk = conn.recv(1024)
252             except (OSError, socket.error), e:
253                 if e[0] != errno.EINTR:
254                     raise
255                 else:
256                     continue
257             if chunk:
258                 data.append(chunk)
259             else:
260                 # empty chunk = EOF
261                 break
262         data = ''.join(data).split('\n',1)
263         while len(data) < 2:
264             data.append('')
265         data, self._rdbuf = data
266         
267         decoded = base64.b64decode(data)
268         return decoded.rstrip()
269
270     def send_reply(self, conn, reply):
271         encoded = base64.b64encode(reply)
272         conn.send("%s\n" % encoded)
273        
274     def cleanup(self):
275         try:
276             self._ctrl_sock.close()
277             os.remove(CTRL_SOCK)
278         except:
279             self.log_error()
280
281     def stop_action(self):
282         return "Stopping server"
283
284     def reply_action(self, msg):
285         return "Reply to: %s" % msg
286
287     def log_error(self, text = None, context = ''):
288         if text == None:
289             text = traceback.format_exc()
290         date = time.strftime("%Y-%m-%d %H:%M:%S")
291         if context:
292             context = " (%s)" % (context,)
293         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
294         return text
295
296     def log_debug(self, text):
297         if self._log_level == DEBUG_LEVEL:
298             date = time.strftime("%Y-%m-%d %H:%M:%S")
299             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
300
301 class Forwarder(object):
302     def __init__(self, root_dir = "."):
303         self._ctrl_sock = None
304         self._root_dir = root_dir
305         self._stop = False
306         self._rdbuf = ""
307
308     def forward(self):
309         self.connect()
310         print >>sys.stderr, "READY."
311         while not self._stop:
312             data = self.read_data()
313             self.send_to_server(data)
314             data = self.recv_from_server()
315             self.write_data(data)
316         self.disconnect()
317
318     def read_data(self):
319         return sys.stdin.readline()
320
321     def write_data(self, data):
322         sys.stdout.write(data)
323         # sys.stdout.write is buffered, this is why we need to do a flush()
324         sys.stdout.flush()
325
326     def send_to_server(self, data):
327         try:
328             self._ctrl_sock.send(data)
329         except (IOError, socket.error), e:
330             if e[0] == errno.EPIPE:
331                 self.connect()
332                 self._ctrl_sock.send(data)
333             else:
334                 raise e
335         encoded = data.rstrip() 
336         msg = base64.b64decode(encoded)
337         if msg == STOP_MSG:
338             self._stop = True
339
340     def recv_from_server(self):
341         data = [self._rdbuf]
342         chunk = data[0]
343         while '\n' not in chunk:
344             try:
345                 chunk = self._ctrl_sock.recv(1024)
346             except (OSError, socket.error), e:
347                 if e[0] != errno.EINTR:
348                     raise
349                 continue
350             if chunk:
351                 data.append(chunk)
352             else:
353                 # empty chunk = EOF
354                 break
355         data = ''.join(data).split('\n',1)
356         while len(data) < 2:
357             data.append('')
358         data, self._rdbuf = data
359         
360         return data+'\n'
361  
362     def connect(self):
363         self.disconnect()
364         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
365         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
366         self._ctrl_sock.connect(sock_addr)
367
368     def disconnect(self):
369         try:
370             self._ctrl_sock.close()
371         except:
372             pass
373
374 class Client(object):
375     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
376             agent = None, environment_setup = ""):
377         self.root_dir = root_dir
378         self.addr = (host, port)
379         self.user = user
380         self.agent = agent
381         self.environment_setup = environment_setup
382         self._stopped = False
383         self._deferreds = collections.deque()
384         self.connect()
385     
386     def __del__(self):
387         if self._process.poll() is None:
388             os.kill(self._process.pid, signal.SIGTERM)
389         self._process.wait()
390         
391     def connect(self):
392         root_dir = self.root_dir
393         (host, port) = self.addr
394         user = self.user
395         agent = self.agent
396         
397         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
398                 c.forward()" % (root_dir,)
399         if host != None:
400             self._process = popen_ssh_subprocess(python_code, host, port, 
401                     user, agent,
402                     environment_setup = self.environment_setup)
403             # popen_ssh_subprocess already waits for readiness
404             if self._process.poll():
405                 err = proc.stderr.read()
406                 raise RuntimeError("Client could not be reached: %s" % \
407                         err)
408         else:
409             self._process = subprocess.Popen(
410                     ["python", "-c", python_code],
411                     stdin = subprocess.PIPE, 
412                     stdout = subprocess.PIPE,
413                     stderr = subprocess.PIPE
414                 )
415                 
416         # Wait for the forwarder to be ready, otherwise nobody
417         # will be able to connect to it
418         helo = self._process.stderr.readline()
419         if helo != 'READY.\n':
420             raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
421                     helo + self._process.stderr.read())
422         
423     def send_msg(self, msg):
424         encoded = base64.b64encode(msg)
425         data = "%s\n" % encoded
426         
427         try:
428             self._process.stdin.write(data)
429         except (IOError, ValueError):
430             # dead process, poll it to un-zombify
431             self._process.poll()
432             
433             # try again after reconnect
434             # If it fails again, though, give up
435             self.connect()
436             self._process.stdin.write(data)
437
438     def send_stop(self):
439         self.send_msg(STOP_MSG)
440         self._stopped = True
441
442     def defer_reply(self, transform=None):
443         defer_entry = []
444         self._deferreds.append(defer_entry)
445         return defer.Defer(
446             functools.partial(self.read_reply, defer_entry, transform)
447         )
448         
449     def _read_reply(self):
450         data = self._process.stdout.readline()
451         encoded = data.rstrip() 
452         if not encoded:
453             # empty == eof == dead process, poll it to un-zombify
454             self._process.poll()
455             
456             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
457         return base64.b64decode(encoded)
458     
459     def read_reply(self, which=None, transform=None):
460         # Test to see if someone did it already
461         if which is not None and len(which):
462             # Ok, they did it...
463             # ...just return the deferred value
464             if transform:
465                 return transform(which[0])
466             else:
467                 return which[0]
468         
469         # Process all deferreds until the one we're looking for
470         # or until the queue is empty
471         while self._deferreds:
472             try:
473                 deferred = self._deferreds.popleft()
474             except IndexError:
475                 # emptied
476                 break
477             
478             deferred.append(self._read_reply())
479             if deferred is which:
480                 # We reached the one we were looking for
481                 if transform:
482                     return transform(deferred[0])
483                 else:
484                     return deferred[0]
485         
486         if which is None:
487             # They've requested a synchronous read
488             if transform:
489                 return transform(self._read_reply())
490             else:
491                 return self._read_reply()
492
493 def _make_server_key_args(server_key, host, port, args):
494     """ 
495     Returns a reference to the created temporary file, and adds the
496     corresponding arguments to the given argument list.
497     
498     Make sure to hold onto it until the process is done with the file
499     """
500     if port is not None:
501         host = '%s:%s' % (host,port)
502     # Create a temporary server key file
503     tmp_known_hosts = tempfile.NamedTemporaryFile()
504     
505     # Add the intended host key
506     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
507     
508     # If we're not in strict mode, add user-configured keys
509     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
510         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
511         if os.access(user_hosts_path, os.R_OK):
512             f = open(user_hosts_path, "r")
513             tmp_known_hosts.write(f.read())
514             f.close()
515         
516     tmp_known_hosts.flush()
517     
518     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
519     
520     return tmp_known_hosts
521
522 def popen_ssh_command(command, host, port, user, agent, 
523             stdin="", 
524             ident_key = None,
525             server_key = None,
526             tty = False,
527             timeout = None,
528             retry = 0,
529             err_on_timeout = True):
530         """
531         Executes a remote commands, returns ((stdout,stderr),process)
532         """
533         if TRACE:
534             print "ssh", host, command
535         
536         tmp_known_hosts = None
537         args = ['ssh',
538                 # Don't bother with localhost. Makes test easier
539                 '-o', 'NoHostAuthenticationForLocalhost=yes',
540                 '-l', user, host]
541         if agent:
542             args.append('-A')
543         if port:
544             args.append('-p%d' % port)
545         if ident_key:
546             args.extend(('-i', ident_key))
547         if tty:
548             args.append('-t')
549         if server_key:
550             # Create a temporary server key file
551             tmp_known_hosts = _make_server_key_args(
552                 server_key, host, port, args)
553         args.append(command)
554
555         while 1:
556             # connects to the remote host and starts a remote connection
557             proc = subprocess.Popen(args, 
558                     stdout = subprocess.PIPE,
559                     stdin = subprocess.PIPE, 
560                     stderr = subprocess.PIPE)
561             
562             # attach tempfile object to the process, to make sure the file stays
563             # alive until the process is finished with it
564             proc._known_hosts = tmp_known_hosts
565             
566             try:
567                 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
568                 break
569             except RuntimeError,e:
570                 if retry <= 0:
571                     raise
572                 if TRACE:
573                     print " timedout -> ", e.args
574                 retry -= 1
575             
576         if TRACE:
577             print " -> ", out, err
578
579         return ((out, err), proc)
580  
581 def popen_scp(source, dest, 
582             port = None, 
583             agent = None, 
584             recursive = False,
585             ident_key = None,
586             server_key = None):
587         """
588         Copies from/to remote sites.
589         
590         Source and destination should have the user and host encoded
591         as per scp specs.
592         
593         If source is a file object, a special mode will be used to
594         create the remote file with the same contents.
595         
596         If dest is a file object, the remote file (source) will be
597         read and written into dest.
598         
599         In these modes, recursive cannot be True.
600         
601         Source can be a list of files to copy to a single destination,
602         in which case it is advised that the destination be a folder.
603         """
604         
605         if TRACE:
606             print "scp", source, dest
607         
608         if isinstance(source, file) and source.tell() == 0:
609             source = source.name
610         elif hasattr(source, 'read'):
611             tmp = tempfile.NamedTemporaryFile()
612             while True:
613                 buf = source.read(65536)
614                 if buf:
615                     tmp.write(buf)
616                 else:
617                     break
618             tmp.seek(0)
619             source = tmp.name
620         
621         if isinstance(source, file) or isinstance(dest, file) \
622                 or hasattr(source, 'read')  or hasattr(dest, 'write'):
623             assert not recursive
624             
625             # Parse source/destination as <user>@<server>:<path>
626             if isinstance(dest, basestring) and ':' in dest:
627                 remspec, path = dest.split(':',1)
628             elif isinstance(source, basestring) and ':' in source:
629                 remspec, path = source.split(':',1)
630             else:
631                 raise ValueError, "Both endpoints cannot be local"
632             user,host = remspec.rsplit('@',1)
633             tmp_known_hosts = None
634             
635             args = ['ssh', '-l', user, '-C',
636                     # Don't bother with localhost. Makes test easier
637                     '-o', 'NoHostAuthenticationForLocalhost=yes',
638                     host ]
639             if port:
640                 args.append('-P%d' % port)
641             if ident_key:
642                 args.extend(('-i', ident_key))
643             if server_key:
644                 # Create a temporary server key file
645                 tmp_known_hosts = _make_server_key_args(
646                     server_key, host, port, args)
647             
648             if isinstance(source, file) or hasattr(source, 'read'):
649                 args.append('cat > %s' % (shell_escape(path),))
650             elif isinstance(dest, file) or hasattr(dest, 'write'):
651                 args.append('cat %s' % (shell_escape(path),))
652             else:
653                 raise AssertionError, "Unreachable code reached! :-Q"
654             
655             # connects to the remote host and starts a remote connection
656             if isinstance(source, file):
657                 proc = subprocess.Popen(args, 
658                         stdout = open('/dev/null','w'),
659                         stderr = subprocess.PIPE,
660                         stdin = source)
661                 err = proc.stderr.read()
662                 proc._known_hosts = tmp_known_hosts
663                 eintr_retry(proc.wait)()
664                 return ((None,err), proc)
665             elif isinstance(dest, file):
666                 proc = subprocess.Popen(args, 
667                         stdout = open('/dev/null','w'),
668                         stderr = subprocess.PIPE,
669                         stdin = source)
670                 err = proc.stderr.read()
671                 proc._known_hosts = tmp_known_hosts
672                 eintr_retry(proc.wait)()
673                 return ((None,err), proc)
674             elif hasattr(source, 'read'):
675                 # file-like (but not file) source
676                 proc = subprocess.Popen(args, 
677                         stdout = open('/dev/null','w'),
678                         stderr = subprocess.PIPE,
679                         stdin = subprocess.PIPE)
680                 
681                 buf = None
682                 err = []
683                 while True:
684                     if not buf:
685                         buf = source.read(4096)
686                     if not buf:
687                         #EOF
688                         break
689                     
690                     rdrdy, wrdy, broken = select.select(
691                         [proc.stderr],
692                         [proc.stdin],
693                         [proc.stderr,proc.stdin])
694                     
695                     if proc.stderr in rdrdy:
696                         # use os.read for fully unbuffered behavior
697                         err.append(os.read(proc.stderr.fileno(), 4096))
698                     
699                     if proc.stdin in wrdy:
700                         proc.stdin.write(buf)
701                         buf = None
702                     
703                     if broken:
704                         break
705                 proc.stdin.close()
706                 err.append(proc.stderr.read())
707                     
708                 proc._known_hosts = tmp_known_hosts
709                 eintr_retry(proc.wait)()
710                 return ((None,''.join(err)), proc)
711             elif hasattr(dest, 'write'):
712                 # file-like (but not file) dest
713                 proc = subprocess.Popen(args, 
714                         stdout = subprocess.PIPE,
715                         stderr = subprocess.PIPE,
716                         stdin = open('/dev/null','w'))
717                 
718                 buf = None
719                 err = []
720                 while True:
721                     rdrdy, wrdy, broken = select.select(
722                         [proc.stderr, proc.stdout],
723                         [],
724                         [proc.stderr, proc.stdout])
725                     
726                     if proc.stderr in rdrdy:
727                         # use os.read for fully unbuffered behavior
728                         err.append(os.read(proc.stderr.fileno(), 4096))
729                     
730                     if proc.stdout in rdrdy:
731                         # use os.read for fully unbuffered behavior
732                         buf = os.read(proc.stdout.fileno(), 4096)
733                         dest.write(buf)
734                         
735                         if not buf:
736                             #EOF
737                             break
738                     
739                     if broken:
740                         break
741                 err.append(proc.stderr.read())
742                     
743                 proc._known_hosts = tmp_known_hosts
744                 eintr_retry(proc.wait)()
745                 return ((None,''.join(err)), proc)
746             else:
747                 raise AssertionError, "Unreachable code reached! :-Q"
748         else:
749             # Parse destination as <user>@<server>:<path>
750             if isinstance(dest, basestring) and ':' in dest:
751                 remspec, path = dest.split(':',1)
752             elif isinstance(source, basestring) and ':' in source:
753                 remspec, path = source.split(':',1)
754             else:
755                 raise ValueError, "Both endpoints cannot be local"
756             user,host = remspec.rsplit('@',1)
757             
758             # plain scp
759             tmp_known_hosts = None
760             args = ['scp', '-q', '-p', '-C',
761                     # Don't bother with localhost. Makes test easier
762                     '-o', 'NoHostAuthenticationForLocalhost=yes' ]
763             if port:
764                 args.append('-P%d' % port)
765             if recursive:
766                 args.append('-r')
767             if ident_key:
768                 args.extend(('-i', ident_key))
769             if server_key:
770                 # Create a temporary server key file
771                 tmp_known_hosts = _make_server_key_args(
772                     server_key, host, port, args)
773             if isinstance(source,list):
774                 args.extend(source)
775             else:
776                 args.append(source)
777             args.append(dest)
778
779             # connects to the remote host and starts a remote connection
780             proc = subprocess.Popen(args, 
781                     stdout = subprocess.PIPE,
782                     stdin = subprocess.PIPE, 
783                     stderr = subprocess.PIPE)
784             proc._known_hosts = tmp_known_hosts
785             
786             comm = proc.communicate()
787             eintr_retry(proc.wait)()
788             return (comm, proc)
789  
790 def popen_ssh_subprocess(python_code, host, port, user, agent, 
791         python_path = None,
792         ident_key = None,
793         server_key = None,
794         tty = False,
795         environment_setup = "",
796         waitcommand = False):
797         cmd = ""
798         if python_path:
799             python_path.replace("'", r"'\''")
800             cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
801             cmd += " ; "
802         if environment_setup:
803             cmd += environment_setup
804             cmd += " ; "
805         # Uncomment for debug (to run everything under strace)
806         # We had to verify if strace works (cannot nest them)
807         #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
808         #cmd += "$CMD "
809         #cmd += "strace -f -tt -s 200 -o strace$$.out "
810         cmd += "python -c '"
811         cmd += "import base64, os\n"
812         cmd += "cmd = \"\"\n"
813         cmd += "while True:\n"
814         cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
815         cmd += " if cmd[-1] == \"\\n\": break\n"
816         cmd += "cmd = base64.b64decode(cmd)\n"
817         # Uncomment for debug
818         #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
819         if not waitcommand:
820             cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
821         cmd += "exec(cmd)\n"
822         if waitcommand:
823             cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
824         cmd += "'"
825         
826         tmp_known_hosts = None
827         args = ['ssh',
828                 # Don't bother with localhost. Makes test easier
829                 '-o', 'NoHostAuthenticationForLocalhost=yes',
830                 '-l', user, host]
831         if agent:
832             args.append('-A')
833         if port:
834             args.append('-p%d' % port)
835         if ident_key:
836             args.extend(('-i', ident_key))
837         if tty:
838             args.append('-t')
839         if server_key:
840             # Create a temporary server key file
841             tmp_known_hosts = _make_server_key_args(
842                 server_key, host, port, args)
843         args.append(cmd)
844
845         # connects to the remote host and starts a remote rpyc connection
846         proc = subprocess.Popen(args, 
847                 stdout = subprocess.PIPE,
848                 stdin = subprocess.PIPE, 
849                 stderr = subprocess.PIPE)
850         proc._known_hosts = tmp_known_hosts
851         
852         # send the command to execute
853         os.write(proc.stdin.fileno(),
854                 base64.b64encode(python_code) + "\n")
855         msg = os.read(proc.stdout.fileno(), 3)
856         if msg != "OK\n":
857             raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
858                 msg, proc.stdout.read(), proc.stderr.read())
859         return proc
860
861
862 # POSIX
863 def _communicate(self, input, timeout=None, err_on_timeout=True):
864     read_set = []
865     write_set = []
866     stdout = None # Return
867     stderr = None # Return
868     
869     killed = False
870     
871     if timeout is not None:
872         timelimit = time.time() + timeout
873         killtime = timelimit + 4
874         bailtime = timelimit + 4
875
876     if self.stdin:
877         # Flush stdio buffer.  This might block, if the user has
878         # been writing to .stdin in an uncontrolled fashion.
879         self.stdin.flush()
880         if input:
881             write_set.append(self.stdin)
882         else:
883             self.stdin.close()
884     if self.stdout:
885         read_set.append(self.stdout)
886         stdout = []
887     if self.stderr:
888         read_set.append(self.stderr)
889         stderr = []
890
891     input_offset = 0
892     while read_set or write_set:
893         if timeout is not None:
894             curtime = time.time()
895             if timeout is None or curtime > timelimit:
896                 if curtime > bailtime:
897                     break
898                 elif curtime > killtime:
899                     signum = signal.SIGKILL
900                 else:
901                     signum = signal.SIGTERM
902                 # Lets kill it
903                 os.kill(self.pid, signum)
904                 select_timeout = 0.5
905             else:
906                 select_timeout = timelimit - curtime + 0.1
907         else:
908             select_timeout = None
909             
910         try:
911             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
912         except select.error,e:
913             if e[0] != 4:
914                 raise
915             else:
916                 continue
917
918         if self.stdin in wlist:
919             # When select has indicated that the file is writable,
920             # we can write up to PIPE_BUF bytes without risk
921             # blocking.  POSIX defines PIPE_BUF >= 512
922             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
923             input_offset += bytes_written
924             if input_offset >= len(input):
925                 self.stdin.close()
926                 write_set.remove(self.stdin)
927
928         if self.stdout in rlist:
929             data = os.read(self.stdout.fileno(), 1024)
930             if data == "":
931                 self.stdout.close()
932                 read_set.remove(self.stdout)
933             stdout.append(data)
934
935         if self.stderr in rlist:
936             data = os.read(self.stderr.fileno(), 1024)
937             if data == "":
938                 self.stderr.close()
939                 read_set.remove(self.stderr)
940             stderr.append(data)
941     
942     # All data exchanged.  Translate lists into strings.
943     if stdout is not None:
944         stdout = ''.join(stdout)
945     if stderr is not None:
946         stderr = ''.join(stderr)
947
948     # Translate newlines, if requested.  We cannot let the file
949     # object do the translation: It is based on stdio, which is
950     # impossible to combine with select (unless forcing no
951     # buffering).
952     if self.universal_newlines and hasattr(file, 'newlines'):
953         if stdout:
954             stdout = self._translate_newlines(stdout)
955         if stderr:
956             stderr = self._translate_newlines(stderr)
957
958     if killed and err_on_timeout:
959         errcode = self.poll()
960         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
961     else:
962         if killed:
963             self.poll()
964         else:
965             self.wait()
966         return (stdout, stderr)
967