Forwarder must also forward EOF!!
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import socket
13 import signal
14 import sys
15 import subprocess
16 import threading
17 import time
18 import traceback
19 import signal
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25
26 CTRL_SOCK = "ctrl.sock"
27 STD_ERR = "stderr.log"
28 MAX_FD = 1024
29
30 STOP_MSG = "STOP"
31
32 ERROR_LEVEL = 0
33 DEBUG_LEVEL = 1
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35
36 if hasattr(os, "devnull"):
37     DEV_NULL = os.devnull
38 else:
39     DEV_NULL = "/dev/null"
40
41 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
42
43 def shell_escape(s):
44     """ Escapes strings so that they are safe to use as command-line arguments """
45     if SHELL_SAFE.match(s):
46         # safe string - no escaping needed
47         return s
48     else:
49         # unsafe string - escape
50         def escp(c):
51             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
52                 return c
53             else:
54                 return "'$'\\x%02x''" % (ord(c),)
55         s = ''.join(map(escp,s))
56         return "'%s'" % (s,)
57
58 def eintr_retry(func):
59     import functools
60     @functools.wraps(func)
61     def rv(*p, **kw):
62         retry = kw.pop("_retry", False)
63         for i in xrange(0 if retry else 4):
64             try:
65                 return func(*p, **kw)
66             except (select.error, socket.error), args:
67                 if args[0] == errno.EINTR:
68                     continue
69                 else:
70                     raise 
71             except OSError, e:
72                 if e.errno == errno.EINTR:
73                     continue
74                 else:
75                     raise
76         else:
77             return func(*p, **kw)
78     return rv
79
80 class Server(object):
81     def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
82         self._root_dir = root_dir
83         self._stop = False
84         self._ctrl_sock = None
85         self._log_level = log_level
86         self._rdbuf = ""
87         self._environment_setup = environment_setup
88
89     def run(self):
90         try:
91             if self.daemonize():
92                 self.post_daemonize()
93                 self.loop()
94                 self.cleanup()
95                 # ref: "os._exit(0)"
96                 # can not return normally after fork beacuse no exec was done.
97                 # This means that if we don't do a os._exit(0) here the code that 
98                 # follows the call to "Server.run()" in the "caller code" will be 
99                 # executed... but by now it has already been executed after the 
100                 # first process (the one that did the first fork) returned.
101                 os._exit(0)
102         except:
103             print >>sys.stderr, "SERVER_ERROR."
104             self.log_error()
105             self.cleanup()
106             os._exit(0)
107         print >>sys.stderr, "SERVER_READY."
108
109     def daemonize(self):
110         # pipes for process synchronization
111         (r, w) = os.pipe()
112         
113         # build root folder
114         root = os.path.normpath(self._root_dir)
115         if not os.path.exists(root):
116             os.makedirs(root, 0755)
117
118         pid1 = os.fork()
119         if pid1 > 0:
120             os.close(w)
121             while True:
122                 try:
123                     os.read(r, 1)
124                 except OSError, e: # pragma: no cover
125                     if e.errno == errno.EINTR:
126                         continue
127                     else:
128                         raise
129                 break
130             os.close(r)
131             # os.waitpid avoids leaving a <defunc> (zombie) process
132             st = os.waitpid(pid1, 0)[1]
133             if st:
134                 raise RuntimeError("Daemonization failed")
135             # return 0 to inform the caller method that this is not the 
136             # daemonized process
137             return 0
138         os.close(r)
139
140         # Decouple from parent environment.
141         os.chdir(self._root_dir)
142         os.umask(0)
143         os.setsid()
144
145         # fork 2
146         pid2 = os.fork()
147         if pid2 > 0:
148             # see ref: "os._exit(0)"
149             os._exit(0)
150
151         # close all open file descriptors.
152         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
153         if (max_fd == resource.RLIM_INFINITY):
154             max_fd = MAX_FD
155         for fd in range(3, max_fd):
156             if fd != w:
157                 try:
158                     os.close(fd)
159                 except OSError:
160                     pass
161
162         # Redirect standard file descriptors.
163         stdin = open(DEV_NULL, "r")
164         stderr = stdout = open(STD_ERR, "a", 0)
165         os.dup2(stdin.fileno(), sys.stdin.fileno())
166         # NOTE: sys.stdout.write will still be buffered, even if the file
167         # was opened with 0 buffer
168         os.dup2(stdout.fileno(), sys.stdout.fileno())
169         os.dup2(stderr.fileno(), sys.stderr.fileno())
170         
171         # setup environment
172         if self._environment_setup:
173             # parse environment variables and pass to child process
174             # do it by executing shell commands, in case there's some heavy setup involved
175             envproc = subprocess.Popen(
176                 [ "bash", "-c", 
177                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
178                         ( self._environment_setup, ) ],
179                 stdin = subprocess.PIPE, 
180                 stdout = subprocess.PIPE,
181                 stderr = subprocess.PIPE
182             )
183             out,err = envproc.communicate()
184
185             # parse new environment
186             if out:
187                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
188             
189                 # apply to current environment
190                 for name, value in environment.iteritems():
191                     os.environ[name] = value
192                 
193                 # apply pythonpath
194                 if 'PYTHONPATH' in environment:
195                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
196
197         # create control socket
198         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
199         self._ctrl_sock.bind(CTRL_SOCK)
200         self._ctrl_sock.listen(0)
201
202         # let the parent process know that the daemonization is finished
203         os.write(w, "\n")
204         os.close(w)
205         return 1
206
207     def post_daemonize(self):
208         # QT, for some strange reason, redefines the SIGCHILD handler to write
209         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
210         # Server dameonization closes all file descriptors from fileno '3',
211         # but the overloaded handler (inherited by the forked process) will
212         # keep trying to write the \0 to fileno 'x', which might have been reused 
213         # after closing, for other operations. This is bad bad bad when fileno 'x'
214         # is in use for communication pouroses, because unexpected \0 start
215         # appearing in the communication messages... this is exactly what happens 
216         # when using netns in daemonized form. Thus, be have no other alternative than
217         # restoring the SIGCHLD handler to the default here.
218         import signal
219         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
220
221     def loop(self):
222         while not self._stop:
223             conn, addr = self._ctrl_sock.accept()
224             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
225             conn.settimeout(5)
226             while not self._stop:
227                 try:
228                     msg = self.recv_msg(conn)
229                 except socket.timeout, e:
230                     self.log_error()
231                     break
232                 
233                 if not msg:
234                     self.log_error("CONNECTION LOST")
235                     break
236                     
237                 if msg == STOP_MSG:
238                     self._stop = True
239                     reply = self.stop_action()
240                 else:
241                     reply = self.reply_action(msg)
242                 
243                 try:
244                     self.send_reply(conn, reply)
245                 except socket.error:
246                     self.log_error()
247                     self.log_error("NOTICE: Awaiting for reconnection")
248                     break
249             try:
250                 conn.close()
251             except:
252                 # Doesn't matter
253                 self.log_error()
254
255     def recv_msg(self, conn):
256         data = [self._rdbuf]
257         chunk = data[0]
258         while '\n' not in chunk:
259             try:
260                 chunk = conn.recv(1024)
261             except (OSError, socket.error), e:
262                 if e[0] != errno.EINTR:
263                     raise
264                 else:
265                     continue
266             if chunk:
267                 data.append(chunk)
268             else:
269                 # empty chunk = EOF
270                 break
271         data = ''.join(data).split('\n',1)
272         while len(data) < 2:
273             data.append('')
274         data, self._rdbuf = data
275         
276         decoded = base64.b64decode(data)
277         return decoded.rstrip()
278
279     def send_reply(self, conn, reply):
280         encoded = base64.b64encode(reply)
281         conn.send("%s\n" % encoded)
282        
283     def cleanup(self):
284         try:
285             self._ctrl_sock.close()
286             os.remove(CTRL_SOCK)
287         except:
288             self.log_error()
289
290     def stop_action(self):
291         return "Stopping server"
292
293     def reply_action(self, msg):
294         return "Reply to: %s" % msg
295
296     def log_error(self, text = None, context = ''):
297         if text == None:
298             text = traceback.format_exc()
299         date = time.strftime("%Y-%m-%d %H:%M:%S")
300         if context:
301             context = " (%s)" % (context,)
302         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
303         return text
304
305     def log_debug(self, text):
306         if self._log_level == DEBUG_LEVEL:
307             date = time.strftime("%Y-%m-%d %H:%M:%S")
308             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
309
310 class Forwarder(object):
311     def __init__(self, root_dir = "."):
312         self._ctrl_sock = None
313         self._root_dir = root_dir
314         self._stop = False
315         self._rdbuf = ""
316
317     def forward(self):
318         self.connect()
319         print >>sys.stderr, "FORWARDER_READY."
320         while not self._stop:
321             data = self.read_data()
322             if not data:
323                 # Connection to client lost
324                 break
325             self.send_to_server(data)
326             
327             data = self.recv_from_server()
328             if not data:
329                 # Connection to server lost
330                 raise IOError, "Connection to server lost while "\
331                     "expecting response"
332             self.write_data(data)
333         self.disconnect()
334
335     def read_data(self):
336         return sys.stdin.readline()
337
338     def write_data(self, data):
339         sys.stdout.write(data)
340         # sys.stdout.write is buffered, this is why we need to do a flush()
341         sys.stdout.flush()
342
343     def send_to_server(self, data):
344         try:
345             self._ctrl_sock.send(data)
346         except (IOError, socket.error), e:
347             if e[0] == errno.EPIPE:
348                 self.connect()
349                 self._ctrl_sock.send(data)
350             else:
351                 raise e
352         encoded = data.rstrip() 
353         msg = base64.b64decode(encoded)
354         if msg == STOP_MSG:
355             self._stop = True
356
357     def recv_from_server(self):
358         data = [self._rdbuf]
359         chunk = data[0]
360         while '\n' not in chunk:
361             try:
362                 chunk = self._ctrl_sock.recv(1024)
363             except (OSError, socket.error), e:
364                 if e[0] != errno.EINTR:
365                     raise
366                 continue
367             if chunk:
368                 data.append(chunk)
369             else:
370                 # empty chunk = EOF
371                 break
372         data = ''.join(data).split('\n',1)
373         while len(data) < 2:
374             data.append('')
375         data, self._rdbuf = data
376         
377         return data+'\n'
378  
379     def connect(self):
380         self.disconnect()
381         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
382         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
383         self._ctrl_sock.connect(sock_addr)
384
385     def disconnect(self):
386         try:
387             self._ctrl_sock.close()
388         except:
389             pass
390
391 class Client(object):
392     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
393             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
394             environment_setup = ""):
395         self.root_dir = root_dir
396         self.addr = (host, port)
397         self.user = user
398         self.agent = agent
399         self.sudo = sudo
400         self.communication = communication
401         self.environment_setup = environment_setup
402         self._stopped = False
403         self._deferreds = collections.deque()
404         self.connect()
405     
406     def __del__(self):
407         if self._process.poll() is None:
408             os.kill(self._process.pid, signal.SIGTERM)
409         self._process.wait()
410         
411     def connect(self):
412         root_dir = self.root_dir
413         (host, port) = self.addr
414         user = self.user
415         agent = self.agent
416         sudo = self.sudo
417         communication = self.communication
418         
419         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
420                 c.forward()" % (root_dir,)
421
422         self._process = popen_python(python_code, 
423                     communication = communication,
424                     host = host, 
425                     port = port, 
426                     user = user, 
427                     agent = agent, 
428                     sudo = sudo, 
429                     environment_setup = self.environment_setup)
430                
431         # Wait for the forwarder to be ready, otherwise nobody
432         # will be able to connect to it
433         helo = self._process.stderr.readline()
434         if helo != 'FORWARDER_READY.\n':
435             raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
436                     helo + self._process.stderr.read())
437         
438     def send_msg(self, msg):
439         encoded = base64.b64encode(msg)
440         data = "%s\n" % encoded
441         
442         try:
443             self._process.stdin.write(data)
444         except (IOError, ValueError):
445             # dead process, poll it to un-zombify
446             self._process.poll()
447             
448             # try again after reconnect
449             # If it fails again, though, give up
450             self.connect()
451             self._process.stdin.write(data)
452
453     def send_stop(self):
454         self.send_msg(STOP_MSG)
455         self._stopped = True
456
457     def defer_reply(self, transform=None):
458         defer_entry = []
459         self._deferreds.append(defer_entry)
460         return defer.Defer(
461             functools.partial(self.read_reply, defer_entry, transform)
462         )
463         
464     def _read_reply(self):
465         data = self._process.stdout.readline()
466         encoded = data.rstrip() 
467         if not encoded:
468             # empty == eof == dead process, poll it to un-zombify
469             self._process.poll()
470             
471             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
472         return base64.b64decode(encoded)
473     
474     def read_reply(self, which=None, transform=None):
475         # Test to see if someone did it already
476         if which is not None and len(which):
477             # Ok, they did it...
478             # ...just return the deferred value
479             if transform:
480                 return transform(which[0])
481             else:
482                 return which[0]
483         
484         # Process all deferreds until the one we're looking for
485         # or until the queue is empty
486         while self._deferreds:
487             try:
488                 deferred = self._deferreds.popleft()
489             except IndexError:
490                 # emptied
491                 break
492             
493             deferred.append(self._read_reply())
494             if deferred is which:
495                 # We reached the one we were looking for
496                 if transform:
497                     return transform(deferred[0])
498                 else:
499                     return deferred[0]
500         
501         if which is None:
502             # They've requested a synchronous read
503             if transform:
504                 return transform(self._read_reply())
505             else:
506                 return self._read_reply()
507
508 def _make_server_key_args(server_key, host, port, args):
509     """ 
510     Returns a reference to the created temporary file, and adds the
511     corresponding arguments to the given argument list.
512     
513     Make sure to hold onto it until the process is done with the file
514     """
515     if port is not None:
516         host = '%s:%s' % (host,port)
517     # Create a temporary server key file
518     tmp_known_hosts = tempfile.NamedTemporaryFile()
519     
520     # Add the intended host key
521     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
522     
523     # If we're not in strict mode, add user-configured keys
524     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
525         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
526         if os.access(user_hosts_path, os.R_OK):
527             f = open(user_hosts_path, "r")
528             tmp_known_hosts.write(f.read())
529             f.close()
530         
531     tmp_known_hosts.flush()
532     
533     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
534     
535     return tmp_known_hosts
536
537 def popen_ssh_command(command, host, port, user, agent, 
538         stdin="", 
539         ident_key = None,
540         server_key = None,
541         tty = False,
542         timeout = None,
543         retry = 0,
544         err_on_timeout = True):
545     """
546     Executes a remote commands, returns ((stdout,stderr),process)
547     """
548     if TRACE:
549         print "ssh", host, command
550     
551     tmp_known_hosts = None
552     args = ['ssh',
553             # Don't bother with localhost. Makes test easier
554             '-o', 'NoHostAuthenticationForLocalhost=yes',
555             '-l', user, host]
556     if agent:
557         args.append('-A')
558     if port:
559         args.append('-p%d' % port)
560     if ident_key:
561         args.extend(('-i', ident_key))
562     if tty:
563         args.append('-t')
564     if server_key:
565         # Create a temporary server key file
566         tmp_known_hosts = _make_server_key_args(
567             server_key, host, port, args)
568     args.append(command)
569
570     while 1:
571         # connects to the remote host and starts a remote connection
572         proc = subprocess.Popen(args, 
573                 stdout = subprocess.PIPE,
574                 stdin = subprocess.PIPE, 
575                 stderr = subprocess.PIPE)
576         
577         # attach tempfile object to the process, to make sure the file stays
578         # alive until the process is finished with it
579         proc._known_hosts = tmp_known_hosts
580         
581         try:
582             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
583             break
584         except RuntimeError,e:
585             if retry <= 0:
586                 raise
587             if TRACE:
588                 print " timedout -> ", e.args
589             retry -= 1
590         
591     if TRACE:
592         print " -> ", out, err
593
594     return ((out, err), proc)
595
596 def popen_scp(source, dest, 
597         port = None, 
598         agent = None, 
599         recursive = False,
600         ident_key = None,
601         server_key = None):
602     """
603     Copies from/to remote sites.
604     
605     Source and destination should have the user and host encoded
606     as per scp specs.
607     
608     If source is a file object, a special mode will be used to
609     create the remote file with the same contents.
610     
611     If dest is a file object, the remote file (source) will be
612     read and written into dest.
613     
614     In these modes, recursive cannot be True.
615     
616     Source can be a list of files to copy to a single destination,
617     in which case it is advised that the destination be a folder.
618     """
619     
620     if TRACE:
621         print "scp", source, dest
622     
623     if isinstance(source, file) and source.tell() == 0:
624         source = source.name
625     elif hasattr(source, 'read'):
626         tmp = tempfile.NamedTemporaryFile()
627         while True:
628             buf = source.read(65536)
629             if buf:
630                 tmp.write(buf)
631             else:
632                 break
633         tmp.seek(0)
634         source = tmp.name
635     
636     if isinstance(source, file) or isinstance(dest, file) \
637             or hasattr(source, 'read')  or hasattr(dest, 'write'):
638         assert not recursive
639         
640         # Parse source/destination as <user>@<server>:<path>
641         if isinstance(dest, basestring) and ':' in dest:
642             remspec, path = dest.split(':',1)
643         elif isinstance(source, basestring) and ':' in source:
644             remspec, path = source.split(':',1)
645         else:
646             raise ValueError, "Both endpoints cannot be local"
647         user,host = remspec.rsplit('@',1)
648         tmp_known_hosts = None
649         
650         args = ['ssh', '-l', user, '-C',
651                 # Don't bother with localhost. Makes test easier
652                 '-o', 'NoHostAuthenticationForLocalhost=yes',
653                 host ]
654         if port:
655             args.append('-P%d' % port)
656         if ident_key:
657             args.extend(('-i', ident_key))
658         if server_key:
659             # Create a temporary server key file
660             tmp_known_hosts = _make_server_key_args(
661                 server_key, host, port, args)
662         
663         if isinstance(source, file) or hasattr(source, 'read'):
664             args.append('cat > %s' % (shell_escape(path),))
665         elif isinstance(dest, file) or hasattr(dest, 'write'):
666             args.append('cat %s' % (shell_escape(path),))
667         else:
668             raise AssertionError, "Unreachable code reached! :-Q"
669         
670         # connects to the remote host and starts a remote connection
671         if isinstance(source, file):
672             proc = subprocess.Popen(args, 
673                     stdout = open('/dev/null','w'),
674                     stderr = subprocess.PIPE,
675                     stdin = source)
676             err = proc.stderr.read()
677             proc._known_hosts = tmp_known_hosts
678             eintr_retry(proc.wait)()
679             return ((None,err), proc)
680         elif isinstance(dest, file):
681             proc = subprocess.Popen(args, 
682                     stdout = open('/dev/null','w'),
683                     stderr = subprocess.PIPE,
684                     stdin = source)
685             err = proc.stderr.read()
686             proc._known_hosts = tmp_known_hosts
687             eintr_retry(proc.wait)()
688             return ((None,err), proc)
689         elif hasattr(source, 'read'):
690             # file-like (but not file) source
691             proc = subprocess.Popen(args, 
692                     stdout = open('/dev/null','w'),
693                     stderr = subprocess.PIPE,
694                     stdin = subprocess.PIPE)
695             
696             buf = None
697             err = []
698             while True:
699                 if not buf:
700                     buf = source.read(4096)
701                 if not buf:
702                     #EOF
703                     break
704                 
705                 rdrdy, wrdy, broken = select.select(
706                     [proc.stderr],
707                     [proc.stdin],
708                     [proc.stderr,proc.stdin])
709                 
710                 if proc.stderr in rdrdy:
711                     # use os.read for fully unbuffered behavior
712                     err.append(os.read(proc.stderr.fileno(), 4096))
713                 
714                 if proc.stdin in wrdy:
715                     proc.stdin.write(buf)
716                     buf = None
717                 
718                 if broken:
719                     break
720             proc.stdin.close()
721             err.append(proc.stderr.read())
722                 
723             proc._known_hosts = tmp_known_hosts
724             eintr_retry(proc.wait)()
725             return ((None,''.join(err)), proc)
726         elif hasattr(dest, 'write'):
727             # file-like (but not file) dest
728             proc = subprocess.Popen(args, 
729                     stdout = subprocess.PIPE,
730                     stderr = subprocess.PIPE,
731                     stdin = open('/dev/null','w'))
732             
733             buf = None
734             err = []
735             while True:
736                 rdrdy, wrdy, broken = select.select(
737                     [proc.stderr, proc.stdout],
738                     [],
739                     [proc.stderr, proc.stdout])
740                 
741                 if proc.stderr in rdrdy:
742                     # use os.read for fully unbuffered behavior
743                     err.append(os.read(proc.stderr.fileno(), 4096))
744                 
745                 if proc.stdout in rdrdy:
746                     # use os.read for fully unbuffered behavior
747                     buf = os.read(proc.stdout.fileno(), 4096)
748                     dest.write(buf)
749                     
750                     if not buf:
751                         #EOF
752                         break
753                 
754                 if broken:
755                     break
756             err.append(proc.stderr.read())
757                 
758             proc._known_hosts = tmp_known_hosts
759             eintr_retry(proc.wait)()
760             return ((None,''.join(err)), proc)
761         else:
762             raise AssertionError, "Unreachable code reached! :-Q"
763     else:
764         # Parse destination as <user>@<server>:<path>
765         if isinstance(dest, basestring) and ':' in dest:
766             remspec, path = dest.split(':',1)
767         elif isinstance(source, basestring) and ':' in source:
768             remspec, path = source.split(':',1)
769         else:
770             raise ValueError, "Both endpoints cannot be local"
771         user,host = remspec.rsplit('@',1)
772         
773         # plain scp
774         tmp_known_hosts = None
775         args = ['scp', '-q', '-p', '-C',
776                 # Don't bother with localhost. Makes test easier
777                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
778         if port:
779             args.append('-P%d' % port)
780         if recursive:
781             args.append('-r')
782         if ident_key:
783             args.extend(('-i', ident_key))
784         if server_key:
785             # Create a temporary server key file
786             tmp_known_hosts = _make_server_key_args(
787                 server_key, host, port, args)
788         if isinstance(source,list):
789             args.extend(source)
790         else:
791             args.append(source)
792         args.append(dest)
793
794         # connects to the remote host and starts a remote connection
795         proc = subprocess.Popen(args, 
796                 stdout = subprocess.PIPE,
797                 stdin = subprocess.PIPE, 
798                 stderr = subprocess.PIPE)
799         proc._known_hosts = tmp_known_hosts
800         
801         comm = proc.communicate()
802         eintr_retry(proc.wait)()
803         return (comm, proc)
804
805 def decode_and_execute():
806     # The python code we want to execute might have characters that 
807     # are not compatible with the 'inline' mode we are using. To avoid
808     # problems we receive the encoded python code in base64 as a input 
809     # stream and decode it for execution.
810     import base64, os
811     cmd = ""
812     while True:
813         cmd += os.read(0, 1)# one byte from stdin
814         if cmd[-1] == "\n": 
815             break
816     cmd = base64.b64decode(cmd)
817     # Uncomment for debug
818     #os.write(2, "Executing python code: %s\n" % cmd)
819     os.write(1, "OK\n") # send a sync message
820     exec(cmd)
821
822 def popen_python(python_code, 
823         communication = DC.ACCESS_LOCAL,
824         host = None, 
825         port = None, 
826         user = None, 
827         agent = False, 
828         python_path = None,
829         ident_key = None,
830         server_key = None,
831         tty = False,
832         sudo = False, 
833         environment_setup = ""):
834
835     shell = False
836     cmd = ""
837     if sudo:
838         cmd +="sudo "
839     if python_path:
840         python_path.replace("'", r"'\''")
841         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
842         cmd += " ; "
843     if environment_setup:
844         cmd += environment_setup
845         cmd += " ; "
846     # Uncomment for debug (to run everything under strace)
847     # We had to verify if strace works (cannot nest them)
848     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
849     #cmd += "$CMD "
850     #cmd += "strace -f -tt -s 200 -o strace$$.out "
851     import nepi
852     cmd += "python -c 'import sys; sys.path.append(%s); from nepi.util import server; server.decode_and_execute()'" % (
853         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
854     )
855
856     if communication == DC.ACCESS_SSH:
857         tmp_known_hosts = None
858         args = ['ssh',
859                 # Don't bother with localhost. Makes test easier
860                 '-o', 'NoHostAuthenticationForLocalhost=yes',
861                 '-l', user, host]
862         if agent:
863             args.append('-A')
864         if port:
865             args.append('-p%d' % port)
866         if ident_key:
867             args.extend(('-i', ident_key))
868         if tty:
869             args.append('-t')
870         if server_key:
871             # Create a temporary server key file
872             tmp_known_hosts = _make_server_key_args(
873                 server_key, host, port, args)
874         args.append(cmd)
875     else:
876         args = [cmd]
877         shell = True
878
879     # connects to the remote host and starts a remote
880     proc = subprocess.Popen(args,
881             shell = shell, 
882             stdout = subprocess.PIPE,
883             stdin = subprocess.PIPE, 
884             stderr = subprocess.PIPE)
885
886     if communication == DC.ACCESS_SSH:
887         proc._known_hosts = tmp_known_hosts
888
889     # send the command to execute
890     os.write(proc.stdin.fileno(),
891             base64.b64encode(python_code) + "\n")
892     msg = os.read(proc.stdout.fileno(), 3)
893     if msg != "OK\n":
894         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
895             msg, proc.stdout.read(), proc.stderr.read())
896
897     return proc
898
899 # POSIX
900 def _communicate(self, input, timeout=None, err_on_timeout=True):
901     read_set = []
902     write_set = []
903     stdout = None # Return
904     stderr = None # Return
905     
906     killed = False
907     
908     if timeout is not None:
909         timelimit = time.time() + timeout
910         killtime = timelimit + 4
911         bailtime = timelimit + 4
912
913     if self.stdin:
914         # Flush stdio buffer.  This might block, if the user has
915         # been writing to .stdin in an uncontrolled fashion.
916         self.stdin.flush()
917         if input:
918             write_set.append(self.stdin)
919         else:
920             self.stdin.close()
921     if self.stdout:
922         read_set.append(self.stdout)
923         stdout = []
924     if self.stderr:
925         read_set.append(self.stderr)
926         stderr = []
927
928     input_offset = 0
929     while read_set or write_set:
930         if timeout is not None:
931             curtime = time.time()
932             if timeout is None or curtime > timelimit:
933                 if curtime > bailtime:
934                     break
935                 elif curtime > killtime:
936                     signum = signal.SIGKILL
937                 else:
938                     signum = signal.SIGTERM
939                 # Lets kill it
940                 os.kill(self.pid, signum)
941                 select_timeout = 0.5
942             else:
943                 select_timeout = timelimit - curtime + 0.1
944         else:
945             select_timeout = None
946             
947         try:
948             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
949         except select.error,e:
950             if e[0] != 4:
951                 raise
952             else:
953                 continue
954
955         if self.stdin in wlist:
956             # When select has indicated that the file is writable,
957             # we can write up to PIPE_BUF bytes without risk
958             # blocking.  POSIX defines PIPE_BUF >= 512
959             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
960             input_offset += bytes_written
961             if input_offset >= len(input):
962                 self.stdin.close()
963                 write_set.remove(self.stdin)
964
965         if self.stdout in rlist:
966             data = os.read(self.stdout.fileno(), 1024)
967             if data == "":
968                 self.stdout.close()
969                 read_set.remove(self.stdout)
970             stdout.append(data)
971
972         if self.stderr in rlist:
973             data = os.read(self.stderr.fileno(), 1024)
974             if data == "":
975                 self.stderr.close()
976                 read_set.remove(self.stderr)
977             stderr.append(data)
978     
979     # All data exchanged.  Translate lists into strings.
980     if stdout is not None:
981         stdout = ''.join(stdout)
982     if stderr is not None:
983         stderr = ''.join(stderr)
984
985     # Translate newlines, if requested.  We cannot let the file
986     # object do the translation: It is based on stdio, which is
987     # impossible to combine with select (unless forcing no
988     # buffering).
989     if self.universal_newlines and hasattr(file, 'newlines'):
990         if stdout:
991             stdout = self._translate_newlines(stdout)
992         if stderr:
993             stderr = self._translate_newlines(stderr)
994
995     if killed and err_on_timeout:
996         errcode = self.poll()
997         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
998     else:
999         if killed:
1000             self.poll()
1001         else:
1002             self.wait()
1003         return (stdout, stderr)
1004