Daemonized servers are now always launched with popen, and not directly invoked in...
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import socket
13 import signal
14 import sys
15 import subprocess
16 import threading
17 import time
18 import traceback
19 import signal
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25
26 CTRL_SOCK = "ctrl.sock"
27 STD_ERR = "stderr.log"
28 MAX_FD = 1024
29
30 STOP_MSG = "STOP"
31
32 ERROR_LEVEL = 0
33 DEBUG_LEVEL = 1
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35
36 if hasattr(os, "devnull"):
37     DEV_NULL = os.devnull
38 else:
39     DEV_NULL = "/dev/null"
40
41 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
42
43 def shell_escape(s):
44     """ Escapes strings so that they are safe to use as command-line arguments """
45     if SHELL_SAFE.match(s):
46         # safe string - no escaping needed
47         return s
48     else:
49         # unsafe string - escape
50         def escp(c):
51             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
52                 return c
53             else:
54                 return "'$'\\x%02x''" % (ord(c),)
55         s = ''.join(map(escp,s))
56         return "'%s'" % (s,)
57
58 def eintr_retry(func):
59     import functools
60     @functools.wraps(func)
61     def rv(*p, **kw):
62         retry = kw.pop("_retry", False)
63         for i in xrange(0 if retry else 4):
64             try:
65                 return func(*p, **kw)
66             except (select.error, socket.error), args:
67                 if args[0] == errno.EINTR:
68                     continue
69                 else:
70                     raise 
71             except OSError, e:
72                 if e.errno == errno.EINTR:
73                     continue
74                 else:
75                     raise
76         else:
77             return func(*p, **kw)
78     return rv
79
80 class Server(object):
81     def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
82         self._root_dir = root_dir
83         self._stop = False
84         self._ctrl_sock = None
85         self._log_level = log_level
86         self._rdbuf = ""
87         self._environment_setup = environment_setup
88
89     def run(self):
90         try:
91             if self.daemonize():
92                 self.post_daemonize()
93                 self.loop()
94                 self.cleanup()
95                 # ref: "os._exit(0)"
96                 # can not return normally after fork beacuse no exec was done.
97                 # This means that if we don't do a os._exit(0) here the code that 
98                 # follows the call to "Server.run()" in the "caller code" will be 
99                 # executed... but by now it has already been executed after the 
100                 # first process (the one that did the first fork) returned.
101                 os._exit(0)
102         except:
103             print >>sys.stderr, "SERVER_ERROR."
104             self.log_error()
105             self.cleanup()
106             os._exit(0)
107         print >>sys.stderr, "SERVER_READY."
108
109     def daemonize(self):
110         # pipes for process synchronization
111         (r, w) = os.pipe()
112         
113         # build root folder
114         root = os.path.normpath(self._root_dir)
115         if not os.path.exists(root):
116             os.makedirs(root, 0755)
117
118         pid1 = os.fork()
119         if pid1 > 0:
120             os.close(w)
121             while True:
122                 try:
123                     os.read(r, 1)
124                 except OSError, e: # pragma: no cover
125                     if e.errno == errno.EINTR:
126                         continue
127                     else:
128                         raise
129                 break
130             os.close(r)
131             # os.waitpid avoids leaving a <defunc> (zombie) process
132             st = os.waitpid(pid1, 0)[1]
133             if st:
134                 raise RuntimeError("Daemonization failed")
135             # return 0 to inform the caller method that this is not the 
136             # daemonized process
137             return 0
138         os.close(r)
139
140         # Decouple from parent environment.
141         os.chdir(self._root_dir)
142         os.umask(0)
143         os.setsid()
144
145         # fork 2
146         pid2 = os.fork()
147         if pid2 > 0:
148             # see ref: "os._exit(0)"
149             os._exit(0)
150
151         # close all open file descriptors.
152         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
153         if (max_fd == resource.RLIM_INFINITY):
154             max_fd = MAX_FD
155         for fd in range(3, max_fd):
156             if fd != w:
157                 try:
158                     os.close(fd)
159                 except OSError:
160                     pass
161
162         # Redirect standard file descriptors.
163         stdin = open(DEV_NULL, "r")
164         stderr = stdout = open(STD_ERR, "a", 0)
165         os.dup2(stdin.fileno(), sys.stdin.fileno())
166         # NOTE: sys.stdout.write will still be buffered, even if the file
167         # was opened with 0 buffer
168         os.dup2(stdout.fileno(), sys.stdout.fileno())
169         os.dup2(stderr.fileno(), sys.stderr.fileno())
170         
171         # setup environment
172         if self._environment_setup:
173             # parse environment variables and pass to child process
174             # do it by executing shell commands, in case there's some heavy setup involved
175             envproc = subprocess.Popen(
176                 [ "bash", "-c", 
177                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
178                         ( self._environment_setup, ) ],
179                 stdin = subprocess.PIPE, 
180                 stdout = subprocess.PIPE,
181                 stderr = subprocess.PIPE
182             )
183             out,err = envproc.communicate()
184
185             # parse new environment
186             if out:
187                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
188             
189                 # apply to current environment
190                 for name, value in environment.iteritems():
191                     os.environ[name] = value
192                 
193                 # apply pythonpath
194                 if 'PYTHONPATH' in environment:
195                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
196
197         # create control socket
198         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
199         self._ctrl_sock.bind(CTRL_SOCK)
200         self._ctrl_sock.listen(0)
201
202         # let the parent process know that the daemonization is finished
203         os.write(w, "\n")
204         os.close(w)
205         return 1
206
207     def post_daemonize(self):
208         # QT, for some strange reason, redefines the SIGCHILD handler to write
209         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
210         # Server dameonization closes all file descriptors from fileno '3',
211         # but the overloaded handler (inherited by the forked process) will
212         # keep trying to write the \0 to fileno 'x', which might have been reused 
213         # after closing, for other operations. This is bad bad bad when fileno 'x'
214         # is in use for communication pouroses, because unexpected \0 start
215         # appearing in the communication messages... this is exactly what happens 
216         # when using netns in daemonized form. Thus, be have no other alternative than
217         # restoring the SIGCHLD handler to the default here.
218         import signal
219         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
220
221     def loop(self):
222         while not self._stop:
223             conn, addr = self._ctrl_sock.accept()
224             conn.settimeout(5)
225             while not self._stop:
226                 try:
227                     msg = self.recv_msg(conn)
228                 except socket.timeout, e:
229                     self.log_error()
230                     break
231                     
232                 if msg == STOP_MSG:
233                     self._stop = True
234                     reply = self.stop_action()
235                 else:
236                     reply = self.reply_action(msg)
237                 
238                 try:
239                     self.send_reply(conn, reply)
240                 except socket.error:
241                     self.log_error()
242                     self.log_error("NOTICE: Awaiting for reconnection")
243                     break
244             try:
245                 conn.close()
246             except:
247                 # Doesn't matter
248                 self.log_error()
249
250     def recv_msg(self, conn):
251         data = [self._rdbuf]
252         chunk = data[0]
253         while '\n' not in chunk:
254             try:
255                 chunk = conn.recv(1024)
256             except (OSError, socket.error), e:
257                 if e[0] != errno.EINTR:
258                     raise
259                 else:
260                     continue
261             if chunk:
262                 data.append(chunk)
263             else:
264                 # empty chunk = EOF
265                 break
266         data = ''.join(data).split('\n',1)
267         while len(data) < 2:
268             data.append('')
269         data, self._rdbuf = data
270         
271         decoded = base64.b64decode(data)
272         return decoded.rstrip()
273
274     def send_reply(self, conn, reply):
275         encoded = base64.b64encode(reply)
276         conn.send("%s\n" % encoded)
277        
278     def cleanup(self):
279         try:
280             self._ctrl_sock.close()
281             os.remove(CTRL_SOCK)
282         except:
283             self.log_error()
284
285     def stop_action(self):
286         return "Stopping server"
287
288     def reply_action(self, msg):
289         return "Reply to: %s" % msg
290
291     def log_error(self, text = None, context = ''):
292         if text == None:
293             text = traceback.format_exc()
294         date = time.strftime("%Y-%m-%d %H:%M:%S")
295         if context:
296             context = " (%s)" % (context,)
297         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
298         return text
299
300     def log_debug(self, text):
301         if self._log_level == DEBUG_LEVEL:
302             date = time.strftime("%Y-%m-%d %H:%M:%S")
303             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
304
305 class Forwarder(object):
306     def __init__(self, root_dir = "."):
307         self._ctrl_sock = None
308         self._root_dir = root_dir
309         self._stop = False
310         self._rdbuf = ""
311
312     def forward(self):
313         self.connect()
314         print >>sys.stderr, "FORWARDER_READY."
315         while not self._stop:
316             data = self.read_data()
317             self.send_to_server(data)
318             data = self.recv_from_server()
319             self.write_data(data)
320         self.disconnect()
321
322     def read_data(self):
323         return sys.stdin.readline()
324
325     def write_data(self, data):
326         sys.stdout.write(data)
327         # sys.stdout.write is buffered, this is why we need to do a flush()
328         sys.stdout.flush()
329
330     def send_to_server(self, data):
331         try:
332             self._ctrl_sock.send(data)
333         except (IOError, socket.error), e:
334             if e[0] == errno.EPIPE:
335                 self.connect()
336                 self._ctrl_sock.send(data)
337             else:
338                 raise e
339         encoded = data.rstrip() 
340         msg = base64.b64decode(encoded)
341         if msg == STOP_MSG:
342             self._stop = True
343
344     def recv_from_server(self):
345         data = [self._rdbuf]
346         chunk = data[0]
347         while '\n' not in chunk:
348             try:
349                 chunk = self._ctrl_sock.recv(1024)
350             except (OSError, socket.error), e:
351                 if e[0] != errno.EINTR:
352                     raise
353                 continue
354             if chunk:
355                 data.append(chunk)
356             else:
357                 # empty chunk = EOF
358                 break
359         data = ''.join(data).split('\n',1)
360         while len(data) < 2:
361             data.append('')
362         data, self._rdbuf = data
363         
364         return data+'\n'
365  
366     def connect(self):
367         self.disconnect()
368         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
369         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
370         self._ctrl_sock.connect(sock_addr)
371
372     def disconnect(self):
373         try:
374             self._ctrl_sock.close()
375         except:
376             pass
377
378 class Client(object):
379     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
380             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
381             environment_setup = ""):
382         self.root_dir = root_dir
383         self.addr = (host, port)
384         self.user = user
385         self.agent = agent
386         self.sudo = sudo
387         self.communication = communication
388         self.environment_setup = environment_setup
389         self._stopped = False
390         self._deferreds = collections.deque()
391         self.connect()
392     
393     def __del__(self):
394         if self._process.poll() is None:
395             os.kill(self._process.pid, signal.SIGTERM)
396         self._process.wait()
397         
398     def connect(self):
399         root_dir = self.root_dir
400         (host, port) = self.addr
401         user = self.user
402         agent = self.agent
403         sudo = self.sudo
404         communication = self.communication
405         
406         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
407                 c.forward()" % (root_dir,)
408
409         self._process = popen_python(python_code, 
410                     communication = communication,
411                     host = host, 
412                     port = port, 
413                     user = user, 
414                     agent = agent, 
415                     sudo = sudo, 
416                     environment_setup = self.environment_setup)
417                
418         # Wait for the forwarder to be ready, otherwise nobody
419         # will be able to connect to it
420         helo = self._process.stderr.readline()
421         if helo != 'FORWARDER_READY.\n':
422             raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
423                     helo + self._process.stderr.read())
424         
425     def send_msg(self, msg):
426         encoded = base64.b64encode(msg)
427         data = "%s\n" % encoded
428         
429         try:
430             self._process.stdin.write(data)
431         except (IOError, ValueError):
432             # dead process, poll it to un-zombify
433             self._process.poll()
434             
435             # try again after reconnect
436             # If it fails again, though, give up
437             self.connect()
438             self._process.stdin.write(data)
439
440     def send_stop(self):
441         self.send_msg(STOP_MSG)
442         self._stopped = True
443
444     def defer_reply(self, transform=None):
445         defer_entry = []
446         self._deferreds.append(defer_entry)
447         return defer.Defer(
448             functools.partial(self.read_reply, defer_entry, transform)
449         )
450         
451     def _read_reply(self):
452         data = self._process.stdout.readline()
453         encoded = data.rstrip() 
454         if not encoded:
455             # empty == eof == dead process, poll it to un-zombify
456             self._process.poll()
457             
458             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
459         return base64.b64decode(encoded)
460     
461     def read_reply(self, which=None, transform=None):
462         # Test to see if someone did it already
463         if which is not None and len(which):
464             # Ok, they did it...
465             # ...just return the deferred value
466             if transform:
467                 return transform(which[0])
468             else:
469                 return which[0]
470         
471         # Process all deferreds until the one we're looking for
472         # or until the queue is empty
473         while self._deferreds:
474             try:
475                 deferred = self._deferreds.popleft()
476             except IndexError:
477                 # emptied
478                 break
479             
480             deferred.append(self._read_reply())
481             if deferred is which:
482                 # We reached the one we were looking for
483                 if transform:
484                     return transform(deferred[0])
485                 else:
486                     return deferred[0]
487         
488         if which is None:
489             # They've requested a synchronous read
490             if transform:
491                 return transform(self._read_reply())
492             else:
493                 return self._read_reply()
494
495 def _make_server_key_args(server_key, host, port, args):
496     """ 
497     Returns a reference to the created temporary file, and adds the
498     corresponding arguments to the given argument list.
499     
500     Make sure to hold onto it until the process is done with the file
501     """
502     if port is not None:
503         host = '%s:%s' % (host,port)
504     # Create a temporary server key file
505     tmp_known_hosts = tempfile.NamedTemporaryFile()
506     
507     # Add the intended host key
508     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
509     
510     # If we're not in strict mode, add user-configured keys
511     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
512         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
513         if os.access(user_hosts_path, os.R_OK):
514             f = open(user_hosts_path, "r")
515             tmp_known_hosts.write(f.read())
516             f.close()
517         
518     tmp_known_hosts.flush()
519     
520     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
521     
522     return tmp_known_hosts
523
524 def popen_ssh_command(command, host, port, user, agent, 
525         stdin="", 
526         ident_key = None,
527         server_key = None,
528         tty = False,
529         timeout = None,
530         retry = 0,
531         err_on_timeout = True):
532     """
533     Executes a remote commands, returns ((stdout,stderr),process)
534     """
535     if TRACE:
536         print "ssh", host, command
537     
538     tmp_known_hosts = None
539     args = ['ssh',
540             # Don't bother with localhost. Makes test easier
541             '-o', 'NoHostAuthenticationForLocalhost=yes',
542             '-l', user, host]
543     if agent:
544         args.append('-A')
545     if port:
546         args.append('-p%d' % port)
547     if ident_key:
548         args.extend(('-i', ident_key))
549     if tty:
550         args.append('-t')
551     if server_key:
552         # Create a temporary server key file
553         tmp_known_hosts = _make_server_key_args(
554             server_key, host, port, args)
555     args.append(command)
556
557     while 1:
558         # connects to the remote host and starts a remote connection
559         proc = subprocess.Popen(args, 
560                 stdout = subprocess.PIPE,
561                 stdin = subprocess.PIPE, 
562                 stderr = subprocess.PIPE)
563         
564         # attach tempfile object to the process, to make sure the file stays
565         # alive until the process is finished with it
566         proc._known_hosts = tmp_known_hosts
567         
568         try:
569             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
570             break
571         except RuntimeError,e:
572             if retry <= 0:
573                 raise
574             if TRACE:
575                 print " timedout -> ", e.args
576             retry -= 1
577         
578     if TRACE:
579         print " -> ", out, err
580
581     return ((out, err), proc)
582
583 def popen_scp(source, dest, 
584         port = None, 
585         agent = None, 
586         recursive = False,
587         ident_key = None,
588         server_key = None):
589     """
590     Copies from/to remote sites.
591     
592     Source and destination should have the user and host encoded
593     as per scp specs.
594     
595     If source is a file object, a special mode will be used to
596     create the remote file with the same contents.
597     
598     If dest is a file object, the remote file (source) will be
599     read and written into dest.
600     
601     In these modes, recursive cannot be True.
602     
603     Source can be a list of files to copy to a single destination,
604     in which case it is advised that the destination be a folder.
605     """
606     
607     if TRACE:
608         print "scp", source, dest
609     
610     if isinstance(source, file) and source.tell() == 0:
611         source = source.name
612     elif hasattr(source, 'read'):
613         tmp = tempfile.NamedTemporaryFile()
614         while True:
615             buf = source.read(65536)
616             if buf:
617                 tmp.write(buf)
618             else:
619                 break
620         tmp.seek(0)
621         source = tmp.name
622     
623     if isinstance(source, file) or isinstance(dest, file) \
624             or hasattr(source, 'read')  or hasattr(dest, 'write'):
625         assert not recursive
626         
627         # Parse source/destination as <user>@<server>:<path>
628         if isinstance(dest, basestring) and ':' in dest:
629             remspec, path = dest.split(':',1)
630         elif isinstance(source, basestring) and ':' in source:
631             remspec, path = source.split(':',1)
632         else:
633             raise ValueError, "Both endpoints cannot be local"
634         user,host = remspec.rsplit('@',1)
635         tmp_known_hosts = None
636         
637         args = ['ssh', '-l', user, '-C',
638                 # Don't bother with localhost. Makes test easier
639                 '-o', 'NoHostAuthenticationForLocalhost=yes',
640                 host ]
641         if port:
642             args.append('-P%d' % port)
643         if ident_key:
644             args.extend(('-i', ident_key))
645         if server_key:
646             # Create a temporary server key file
647             tmp_known_hosts = _make_server_key_args(
648                 server_key, host, port, args)
649         
650         if isinstance(source, file) or hasattr(source, 'read'):
651             args.append('cat > %s' % (shell_escape(path),))
652         elif isinstance(dest, file) or hasattr(dest, 'write'):
653             args.append('cat %s' % (shell_escape(path),))
654         else:
655             raise AssertionError, "Unreachable code reached! :-Q"
656         
657         # connects to the remote host and starts a remote connection
658         if isinstance(source, file):
659             proc = subprocess.Popen(args, 
660                     stdout = open('/dev/null','w'),
661                     stderr = subprocess.PIPE,
662                     stdin = source)
663             err = proc.stderr.read()
664             proc._known_hosts = tmp_known_hosts
665             eintr_retry(proc.wait)()
666             return ((None,err), proc)
667         elif isinstance(dest, file):
668             proc = subprocess.Popen(args, 
669                     stdout = open('/dev/null','w'),
670                     stderr = subprocess.PIPE,
671                     stdin = source)
672             err = proc.stderr.read()
673             proc._known_hosts = tmp_known_hosts
674             eintr_retry(proc.wait)()
675             return ((None,err), proc)
676         elif hasattr(source, 'read'):
677             # file-like (but not file) source
678             proc = subprocess.Popen(args, 
679                     stdout = open('/dev/null','w'),
680                     stderr = subprocess.PIPE,
681                     stdin = subprocess.PIPE)
682             
683             buf = None
684             err = []
685             while True:
686                 if not buf:
687                     buf = source.read(4096)
688                 if not buf:
689                     #EOF
690                     break
691                 
692                 rdrdy, wrdy, broken = select.select(
693                     [proc.stderr],
694                     [proc.stdin],
695                     [proc.stderr,proc.stdin])
696                 
697                 if proc.stderr in rdrdy:
698                     # use os.read for fully unbuffered behavior
699                     err.append(os.read(proc.stderr.fileno(), 4096))
700                 
701                 if proc.stdin in wrdy:
702                     proc.stdin.write(buf)
703                     buf = None
704                 
705                 if broken:
706                     break
707             proc.stdin.close()
708             err.append(proc.stderr.read())
709                 
710             proc._known_hosts = tmp_known_hosts
711             eintr_retry(proc.wait)()
712             return ((None,''.join(err)), proc)
713         elif hasattr(dest, 'write'):
714             # file-like (but not file) dest
715             proc = subprocess.Popen(args, 
716                     stdout = subprocess.PIPE,
717                     stderr = subprocess.PIPE,
718                     stdin = open('/dev/null','w'))
719             
720             buf = None
721             err = []
722             while True:
723                 rdrdy, wrdy, broken = select.select(
724                     [proc.stderr, proc.stdout],
725                     [],
726                     [proc.stderr, proc.stdout])
727                 
728                 if proc.stderr in rdrdy:
729                     # use os.read for fully unbuffered behavior
730                     err.append(os.read(proc.stderr.fileno(), 4096))
731                 
732                 if proc.stdout in rdrdy:
733                     # use os.read for fully unbuffered behavior
734                     buf = os.read(proc.stdout.fileno(), 4096)
735                     dest.write(buf)
736                     
737                     if not buf:
738                         #EOF
739                         break
740                 
741                 if broken:
742                     break
743             err.append(proc.stderr.read())
744                 
745             proc._known_hosts = tmp_known_hosts
746             eintr_retry(proc.wait)()
747             return ((None,''.join(err)), proc)
748         else:
749             raise AssertionError, "Unreachable code reached! :-Q"
750     else:
751         # Parse destination as <user>@<server>:<path>
752         if isinstance(dest, basestring) and ':' in dest:
753             remspec, path = dest.split(':',1)
754         elif isinstance(source, basestring) and ':' in source:
755             remspec, path = source.split(':',1)
756         else:
757             raise ValueError, "Both endpoints cannot be local"
758         user,host = remspec.rsplit('@',1)
759         
760         # plain scp
761         tmp_known_hosts = None
762         args = ['scp', '-q', '-p', '-C',
763                 # Don't bother with localhost. Makes test easier
764                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
765         if port:
766             args.append('-P%d' % port)
767         if recursive:
768             args.append('-r')
769         if ident_key:
770             args.extend(('-i', ident_key))
771         if server_key:
772             # Create a temporary server key file
773             tmp_known_hosts = _make_server_key_args(
774                 server_key, host, port, args)
775         if isinstance(source,list):
776             args.extend(source)
777         else:
778             args.append(source)
779         args.append(dest)
780
781         # connects to the remote host and starts a remote connection
782         proc = subprocess.Popen(args, 
783                 stdout = subprocess.PIPE,
784                 stdin = subprocess.PIPE, 
785                 stderr = subprocess.PIPE)
786         proc._known_hosts = tmp_known_hosts
787         
788         comm = proc.communicate()
789         eintr_retry(proc.wait)()
790         return (comm, proc)
791
792 def decode_and_execute():
793     # The python code we want to execute might have characters that 
794     # are not compatible with the 'inline' mode we are using. To avoid
795     # problems we receive the encoded python code in base64 as a input 
796     # stream and decode it for execution.
797     import base64, os
798     cmd = ""
799     while True:
800         cmd += os.read(0, 1)# one byte from stdin
801         if cmd[-1] == "\n": 
802             break
803     cmd = base64.b64decode(cmd)
804     # Uncomment for debug
805     #os.write(2, "Executing python code: %s\n" % cmd)
806     os.write(1, "OK\n") # send a sync message
807     exec(cmd)
808
809 def popen_python(python_code, 
810         communication = DC.ACCESS_LOCAL,
811         host = None, 
812         port = None, 
813         user = None, 
814         agent = False, 
815         python_path = None,
816         ident_key = None,
817         server_key = None,
818         tty = False,
819         sudo = False, 
820         environment_setup = ""):
821
822
823     shell = False
824     cmd = ""
825     if python_path:
826         python_path.replace("'", r"'\''")
827         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
828         cmd += " ; "
829     if environment_setup:
830         cmd += environment_setup
831         cmd += " ; "
832     # Uncomment for debug (to run everything under strace)
833     # We had to verify if strace works (cannot nest them)
834     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
835     #cmd += "$CMD "
836     #cmd += "strace -f -tt -s 200 -o strace$$.out "
837     cmd += "python -c 'from nepi.util import server; server.decode_and_execute()'"
838
839     if communication == DC.ACCESS_SSH:
840         tmp_known_hosts = None
841         args = ['ssh',
842                 # Don't bother with localhost. Makes test easier
843                 '-o', 'NoHostAuthenticationForLocalhost=yes',
844                 '-l', user, host]
845         if agent:
846             args.append('-A')
847         if port:
848             args.append('-p%d' % port)
849         if ident_key:
850             args.extend(('-i', ident_key))
851         if tty:
852             args.append('-t')
853         if server_key:
854             # Create a temporary server key file
855             tmp_known_hosts = _make_server_key_args(
856                 server_key, host, port, args)
857         args.append(cmd)
858     else:
859         args = [cmd]
860         shell = True
861
862     # connects to the remote host and starts a remote
863     proc = subprocess.Popen(args,
864             shell = shell, 
865             stdout = subprocess.PIPE,
866             stdin = subprocess.PIPE, 
867             stderr = subprocess.PIPE)
868
869     if communication == DC.ACCESS_SSH:
870         proc._known_hosts = tmp_known_hosts
871
872     # send the command to execute
873     os.write(proc.stdin.fileno(),
874             base64.b64encode(python_code) + "\n")
875     msg = os.read(proc.stdout.fileno(), 3)
876     if msg != "OK\n":
877         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
878             msg, proc.stdout.read(), proc.stderr.read())
879
880     return proc
881
882 # POSIX
883 def _communicate(self, input, timeout=None, err_on_timeout=True):
884     read_set = []
885     write_set = []
886     stdout = None # Return
887     stderr = None # Return
888     
889     killed = False
890     
891     if timeout is not None:
892         timelimit = time.time() + timeout
893         killtime = timelimit + 4
894         bailtime = timelimit + 4
895
896     if self.stdin:
897         # Flush stdio buffer.  This might block, if the user has
898         # been writing to .stdin in an uncontrolled fashion.
899         self.stdin.flush()
900         if input:
901             write_set.append(self.stdin)
902         else:
903             self.stdin.close()
904     if self.stdout:
905         read_set.append(self.stdout)
906         stdout = []
907     if self.stderr:
908         read_set.append(self.stderr)
909         stderr = []
910
911     input_offset = 0
912     while read_set or write_set:
913         if timeout is not None:
914             curtime = time.time()
915             if timeout is None or curtime > timelimit:
916                 if curtime > bailtime:
917                     break
918                 elif curtime > killtime:
919                     signum = signal.SIGKILL
920                 else:
921                     signum = signal.SIGTERM
922                 # Lets kill it
923                 os.kill(self.pid, signum)
924                 select_timeout = 0.5
925             else:
926                 select_timeout = timelimit - curtime + 0.1
927         else:
928             select_timeout = None
929             
930         try:
931             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
932         except select.error,e:
933             if e[0] != 4:
934                 raise
935             else:
936                 continue
937
938         if self.stdin in wlist:
939             # When select has indicated that the file is writable,
940             # we can write up to PIPE_BUF bytes without risk
941             # blocking.  POSIX defines PIPE_BUF >= 512
942             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
943             input_offset += bytes_written
944             if input_offset >= len(input):
945                 self.stdin.close()
946                 write_set.remove(self.stdin)
947
948         if self.stdout in rlist:
949             data = os.read(self.stdout.fileno(), 1024)
950             if data == "":
951                 self.stdout.close()
952                 read_set.remove(self.stdout)
953             stdout.append(data)
954
955         if self.stderr in rlist:
956             data = os.read(self.stderr.fileno(), 1024)
957             if data == "":
958                 self.stderr.close()
959                 read_set.remove(self.stderr)
960             stderr.append(data)
961     
962     # All data exchanged.  Translate lists into strings.
963     if stdout is not None:
964         stdout = ''.join(stdout)
965     if stderr is not None:
966         stderr = ''.join(stderr)
967
968     # Translate newlines, if requested.  We cannot let the file
969     # object do the translation: It is based on stdio, which is
970     # impossible to combine with select (unless forcing no
971     # buffering).
972     if self.universal_newlines and hasattr(file, 'newlines'):
973         if stdout:
974             stdout = self._translate_newlines(stdout)
975         if stderr:
976             stderr = self._translate_newlines(stderr)
977
978     if killed and err_on_timeout:
979         errcode = self.poll()
980         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
981     else:
982         if killed:
983             self.poll()
984         else:
985             self.wait()
986         return (stdout, stderr)
987