Tons of SSH improvements:
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 if hasattr(os, "devnull"):
36     DEV_NULL = os.devnull
37 else:
38     DEV_NULL = "/dev/null"
39
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41
42 def shell_escape(s):
43     """ Escapes strings so that they are safe to use as command-line arguments """
44     if SHELL_SAFE.match(s):
45         # safe string - no escaping needed
46         return s
47     else:
48         # unsafe string - escape
49         def escp(c):
50             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
51                 return c
52             else:
53                 return "'$'\\x%02x''" % (ord(c),)
54         s = ''.join(map(escp,s))
55         return "'%s'" % (s,)
56
57 def eintr_retry(func):
58     import functools
59     @functools.wraps(func)
60     def rv(*p, **kw):
61         retry = kw.pop("_retry", False)
62         for i in xrange(0 if retry else 4):
63             try:
64                 return func(*p, **kw)
65             except (select.error, socket.error), args:
66                 if args[0] == errno.EINTR:
67                     continue
68                 else:
69                     raise 
70             except OSError, e:
71                 if e.errno == errno.EINTR:
72                     continue
73                 else:
74                     raise
75         else:
76             return func(*p, **kw)
77     return rv
78
79 class Server(object):
80     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
81             environment_setup = "", clean_root = False):
82         self._root_dir = root_dir
83         self._clean_root = clean_root
84         self._stop = False
85         self._ctrl_sock = None
86         self._log_level = log_level
87         self._rdbuf = ""
88         self._environment_setup = environment_setup
89
90     def run(self):
91         try:
92             if self.daemonize():
93                 self.post_daemonize()
94                 self.loop()
95                 self.cleanup()
96                 # ref: "os._exit(0)"
97                 # can not return normally after fork beacuse no exec was done.
98                 # This means that if we don't do a os._exit(0) here the code that 
99                 # follows the call to "Server.run()" in the "caller code" will be 
100                 # executed... but by now it has already been executed after the 
101                 # first process (the one that did the first fork) returned.
102                 os._exit(0)
103         except:
104             print >>sys.stderr, "SERVER_ERROR."
105             self.log_error()
106             self.cleanup()
107             os._exit(0)
108         print >>sys.stderr, "SERVER_READY."
109
110     def daemonize(self):
111         # pipes for process synchronization
112         (r, w) = os.pipe()
113         
114         # build root folder
115         root = os.path.normpath(self._root_dir)
116         if self._root_dir not in [".", ""] and os.path.exists(root) \
117                 and self._clean_root:
118             shutil.rmtree(root)
119         if not os.path.exists(root):
120             os.makedirs(root, 0755)
121
122         pid1 = os.fork()
123         if pid1 > 0:
124             os.close(w)
125             while True:
126                 try:
127                     os.read(r, 1)
128                 except OSError, e: # pragma: no cover
129                     if e.errno == errno.EINTR:
130                         continue
131                     else:
132                         raise
133                 break
134             os.close(r)
135             # os.waitpid avoids leaving a <defunc> (zombie) process
136             st = os.waitpid(pid1, 0)[1]
137             if st:
138                 raise RuntimeError("Daemonization failed")
139             # return 0 to inform the caller method that this is not the 
140             # daemonized process
141             return 0
142         os.close(r)
143
144         # Decouple from parent environment.
145         os.chdir(self._root_dir)
146         os.umask(0)
147         os.setsid()
148
149         # fork 2
150         pid2 = os.fork()
151         if pid2 > 0:
152             # see ref: "os._exit(0)"
153             os._exit(0)
154
155         # close all open file descriptors.
156         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
157         if (max_fd == resource.RLIM_INFINITY):
158             max_fd = MAX_FD
159         for fd in range(3, max_fd):
160             if fd != w:
161                 try:
162                     os.close(fd)
163                 except OSError:
164                     pass
165
166         # Redirect standard file descriptors.
167         stdin = open(DEV_NULL, "r")
168         stderr = stdout = open(STD_ERR, "a", 0)
169         os.dup2(stdin.fileno(), sys.stdin.fileno())
170         # NOTE: sys.stdout.write will still be buffered, even if the file
171         # was opened with 0 buffer
172         os.dup2(stdout.fileno(), sys.stdout.fileno())
173         os.dup2(stderr.fileno(), sys.stderr.fileno())
174         
175         # setup environment
176         if self._environment_setup:
177             # parse environment variables and pass to child process
178             # do it by executing shell commands, in case there's some heavy setup involved
179             envproc = subprocess.Popen(
180                 [ "bash", "-c", 
181                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
182                         ( self._environment_setup, ) ],
183                 stdin = subprocess.PIPE, 
184                 stdout = subprocess.PIPE,
185                 stderr = subprocess.PIPE
186             )
187             out,err = envproc.communicate()
188
189             # parse new environment
190             if out:
191                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
192             
193                 # apply to current environment
194                 for name, value in environment.iteritems():
195                     os.environ[name] = value
196                 
197                 # apply pythonpath
198                 if 'PYTHONPATH' in environment:
199                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
200
201         # create control socket
202         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
203         try:
204             self._ctrl_sock.bind(CTRL_SOCK)
205         except socket.error:
206             # Address in use, check pidfile
207             pid = None
208             try:
209                 pidfile = open(CTRL_PID, "r")
210                 pid = pidfile.read()
211                 pidfile.close()
212                 pid = int(pid)
213             except:
214                 # no pidfile
215                 pass
216             
217             if pid is not None:
218                 # Check process liveliness
219                 if not os.path.exists("/proc/%d" % (pid,)):
220                     # Ok, it's dead, clean the socket
221                     os.remove(CTRL_SOCK)
222             
223             # try again
224             self._ctrl_sock.bind(CTRL_SOCK)
225             
226         self._ctrl_sock.listen(0)
227         
228         # Save pidfile
229         pidfile = open(CTRL_PID, "w")
230         pidfile.write(str(os.getpid()))
231         pidfile.close()
232
233         # let the parent process know that the daemonization is finished
234         os.write(w, "\n")
235         os.close(w)
236         return 1
237
238     def post_daemonize(self):
239         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
240         # QT, for some strange reason, redefines the SIGCHILD handler to write
241         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
242         # Server dameonization closes all file descriptors from fileno '3',
243         # but the overloaded handler (inherited by the forked process) will
244         # keep trying to write the \0 to fileno 'x', which might have been reused 
245         # after closing, for other operations. This is bad bad bad when fileno 'x'
246         # is in use for communication pouroses, because unexpected \0 start
247         # appearing in the communication messages... this is exactly what happens 
248         # when using netns in daemonized form. Thus, be have no other alternative than
249         # restoring the SIGCHLD handler to the default here.
250         import signal
251         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
252
253     def loop(self):
254         while not self._stop:
255             conn, addr = self._ctrl_sock.accept()
256             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
257             conn.settimeout(5)
258             while not self._stop:
259                 try:
260                     msg = self.recv_msg(conn)
261                 except socket.timeout, e:
262                     #self.log_error("SERVER recv_msg: connection timedout ")
263                     continue
264                 
265                 if not msg:
266                     self.log_error("CONNECTION LOST")
267                     break
268                     
269                 if msg == STOP_MSG:
270                     self._stop = True
271                     reply = self.stop_action()
272                 else:
273                     reply = self.reply_action(msg)
274                 
275                 try:
276                     self.send_reply(conn, reply)
277                 except socket.error:
278                     self.log_error()
279                     self.log_error("NOTICE: Awaiting for reconnection")
280                     break
281             try:
282                 conn.close()
283             except:
284                 # Doesn't matter
285                 self.log_error()
286
287     def recv_msg(self, conn):
288         data = [self._rdbuf]
289         chunk = data[0]
290         while '\n' not in chunk:
291             try:
292                 chunk = conn.recv(1024)
293             except (OSError, socket.error), e:
294                 if e[0] != errno.EINTR:
295                     raise
296                 else:
297                     continue
298             if chunk:
299                 data.append(chunk)
300             else:
301                 # empty chunk = EOF
302                 break
303         data = ''.join(data).split('\n',1)
304         while len(data) < 2:
305             data.append('')
306         data, self._rdbuf = data
307         
308         decoded = base64.b64decode(data)
309         return decoded.rstrip()
310
311     def send_reply(self, conn, reply):
312         encoded = base64.b64encode(reply)
313         conn.send("%s\n" % encoded)
314        
315     def cleanup(self):
316         try:
317             self._ctrl_sock.close()
318             os.remove(CTRL_SOCK)
319         except:
320             self.log_error()
321
322     def stop_action(self):
323         return "Stopping server"
324
325     def reply_action(self, msg):
326         return "Reply to: %s" % msg
327
328     def log_error(self, text = None, context = ''):
329         if text == None:
330             text = traceback.format_exc()
331         date = time.strftime("%Y-%m-%d %H:%M:%S")
332         if context:
333             context = " (%s)" % (context,)
334         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
335         return text
336
337     def log_debug(self, text):
338         if self._log_level == DC.DEBUG_LEVEL:
339             date = time.strftime("%Y-%m-%d %H:%M:%S")
340             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
341
342 class Forwarder(object):
343     def __init__(self, root_dir = "."):
344         self._ctrl_sock = None
345         self._root_dir = root_dir
346         self._stop = False
347         self._rdbuf = ""
348
349     def forward(self):
350         self.connect()
351         print >>sys.stderr, "FORWARDER_READY."
352         while not self._stop:
353             data = self.read_data()
354             if not data:
355                 # Connection to client lost
356                 break
357             self.send_to_server(data)
358             
359             data = self.recv_from_server()
360             if not data:
361                 # Connection to server lost
362                 raise IOError, "Connection to server lost while "\
363                     "expecting response"
364             self.write_data(data)
365         self.disconnect()
366
367     def read_data(self):
368         return sys.stdin.readline()
369
370     def write_data(self, data):
371         sys.stdout.write(data)
372         # sys.stdout.write is buffered, this is why we need to do a flush()
373         sys.stdout.flush()
374
375     def send_to_server(self, data):
376         try:
377             self._ctrl_sock.send(data)
378         except (IOError, socket.error), e:
379             if e[0] == errno.EPIPE:
380                 self.connect()
381                 self._ctrl_sock.send(data)
382             else:
383                 raise e
384         encoded = data.rstrip() 
385         msg = base64.b64decode(encoded)
386         if msg == STOP_MSG:
387             self._stop = True
388
389     def recv_from_server(self):
390         data = [self._rdbuf]
391         chunk = data[0]
392         while '\n' not in chunk:
393             try:
394                 chunk = self._ctrl_sock.recv(1024)
395             except (OSError, socket.error), e:
396                 if e[0] != errno.EINTR:
397                     raise
398                 continue
399             if chunk:
400                 data.append(chunk)
401             else:
402                 # empty chunk = EOF
403                 break
404         data = ''.join(data).split('\n',1)
405         while len(data) < 2:
406             data.append('')
407         data, self._rdbuf = data
408         
409         return data+'\n'
410  
411     def connect(self):
412         self.disconnect()
413         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
414         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
415         self._ctrl_sock.connect(sock_addr)
416
417     def disconnect(self):
418         try:
419             self._ctrl_sock.close()
420         except:
421             pass
422
423 class Client(object):
424     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
425             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
426             environment_setup = ""):
427         self.root_dir = root_dir
428         self.addr = (host, port)
429         self.user = user
430         self.agent = agent
431         self.sudo = sudo
432         self.communication = communication
433         self.environment_setup = environment_setup
434         self._stopped = False
435         self._deferreds = collections.deque()
436         self.connect()
437     
438     def __del__(self):
439         if self._process.poll() is None:
440             os.kill(self._process.pid, signal.SIGTERM)
441         self._process.wait()
442         
443     def connect(self):
444         root_dir = self.root_dir
445         (host, port) = self.addr
446         user = self.user
447         agent = self.agent
448         sudo = self.sudo
449         communication = self.communication
450         
451         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
452                 c.forward()" % (root_dir,)
453
454         self._process = popen_python(python_code, 
455                     communication = communication,
456                     host = host, 
457                     port = port, 
458                     user = user, 
459                     agent = agent, 
460                     sudo = sudo, 
461                     environment_setup = self.environment_setup)
462                
463         # Wait for the forwarder to be ready, otherwise nobody
464         # will be able to connect to it
465         err = []
466         helo = "nope"
467         while helo:
468             helo = self._process.stderr.readline()
469             if helo == 'FORWARDER_READY.\n':
470                 break
471             err.append(helo)
472         else:
473             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
474         
475     def send_msg(self, msg):
476         encoded = base64.b64encode(msg)
477         data = "%s\n" % encoded
478         
479         try:
480             self._process.stdin.write(data)
481         except (IOError, ValueError):
482             # dead process, poll it to un-zombify
483             self._process.poll()
484             
485             # try again after reconnect
486             # If it fails again, though, give up
487             self.connect()
488             self._process.stdin.write(data)
489
490     def send_stop(self):
491         self.send_msg(STOP_MSG)
492         self._stopped = True
493
494     def defer_reply(self, transform=None):
495         defer_entry = []
496         self._deferreds.append(defer_entry)
497         return defer.Defer(
498             functools.partial(self.read_reply, defer_entry, transform)
499         )
500         
501     def _read_reply(self):
502         data = self._process.stdout.readline()
503         encoded = data.rstrip() 
504         if not encoded:
505             # empty == eof == dead process, poll it to un-zombify
506             self._process.poll()
507             
508             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
509         return base64.b64decode(encoded)
510     
511     def read_reply(self, which=None, transform=None):
512         # Test to see if someone did it already
513         if which is not None and len(which):
514             # Ok, they did it...
515             # ...just return the deferred value
516             if transform:
517                 return transform(which[0])
518             else:
519                 return which[0]
520         
521         # Process all deferreds until the one we're looking for
522         # or until the queue is empty
523         while self._deferreds:
524             try:
525                 deferred = self._deferreds.popleft()
526             except IndexError:
527                 # emptied
528                 break
529             
530             deferred.append(self._read_reply())
531             if deferred is which:
532                 # We reached the one we were looking for
533                 if transform:
534                     return transform(deferred[0])
535                 else:
536                     return deferred[0]
537         
538         if which is None:
539             # They've requested a synchronous read
540             if transform:
541                 return transform(self._read_reply())
542             else:
543                 return self._read_reply()
544
545 def _make_server_key_args(server_key, host, port, args):
546     """ 
547     Returns a reference to the created temporary file, and adds the
548     corresponding arguments to the given argument list.
549     
550     Make sure to hold onto it until the process is done with the file
551     """
552     if port is not None:
553         host = '%s:%s' % (host,port)
554     # Create a temporary server key file
555     tmp_known_hosts = tempfile.NamedTemporaryFile()
556     
557     # Add the intended host key
558     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
559     
560     # If we're not in strict mode, add user-configured keys
561     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
562         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
563         if os.access(user_hosts_path, os.R_OK):
564             f = open(user_hosts_path, "r")
565             tmp_known_hosts.write(f.read())
566             f.close()
567         
568     tmp_known_hosts.flush()
569     
570     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
571     
572     return tmp_known_hosts
573
574 def popen_ssh_command(command, host, port, user, agent, 
575         stdin="", 
576         ident_key = None,
577         server_key = None,
578         tty = False,
579         timeout = None,
580         retry = 0,
581         err_on_timeout = True,
582         connect_timeout = 30):
583     """
584     Executes a remote commands, returns ((stdout,stderr),process)
585     """
586     if TRACE:
587         print "ssh", host, command
588     
589     tmp_known_hosts = None
590     connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
591     args = ['ssh', '-C',
592             # Don't bother with localhost. Makes test easier
593             '-o', 'NoHostAuthenticationForLocalhost=yes',
594             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
595             '-o', 'ConnectionAttempts=3',
596             '-o', 'ServerAliveInterval=30',
597             '-o', 'TCPKeepAlive=yes',
598             '-o', 'ControlMaster=auto',
599             '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
600             '-o', 'ControlPersist=60',
601             '-l', user, host]
602     if agent:
603         args.append('-A')
604     if port:
605         args.append('-p%d' % port)
606     if ident_key:
607         args.extend(('-i', ident_key))
608     if tty:
609         args.append('-t')
610     if server_key:
611         # Create a temporary server key file
612         tmp_known_hosts = _make_server_key_args(
613             server_key, host, port, args)
614     args.append(command)
615
616     for x in xrange(retry or 3):
617         # connects to the remote host and starts a remote connection
618         proc = subprocess.Popen(args, 
619                 stdout = subprocess.PIPE,
620                 stdin = subprocess.PIPE, 
621                 stderr = subprocess.PIPE)
622         
623         # attach tempfile object to the process, to make sure the file stays
624         # alive until the process is finished with it
625         proc._known_hosts = tmp_known_hosts
626         
627         try:
628             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
629             if proc.poll() and err.strip().startswith('ssh: '):
630                 # SSH error, can safely retry
631                 continue
632             break
633         except RuntimeError,e:
634             if retry <= 0:
635                 raise
636             if TRACE:
637                 print " timedout -> ", e.args
638             retry -= 1
639         
640     if TRACE:
641         print " -> ", out, err
642
643     return ((out, err), proc)
644
645 def popen_scp(source, dest, 
646         port = None, 
647         agent = None, 
648         recursive = False,
649         ident_key = None,
650         server_key = None):
651     """
652     Copies from/to remote sites.
653     
654     Source and destination should have the user and host encoded
655     as per scp specs.
656     
657     If source is a file object, a special mode will be used to
658     create the remote file with the same contents.
659     
660     If dest is a file object, the remote file (source) will be
661     read and written into dest.
662     
663     In these modes, recursive cannot be True.
664     
665     Source can be a list of files to copy to a single destination,
666     in which case it is advised that the destination be a folder.
667     """
668     
669     if TRACE:
670         print "scp", source, dest
671     
672     if isinstance(source, file) and source.tell() == 0:
673         source = source.name
674     elif hasattr(source, 'read'):
675         tmp = tempfile.NamedTemporaryFile()
676         while True:
677             buf = source.read(65536)
678             if buf:
679                 tmp.write(buf)
680             else:
681                 break
682         tmp.seek(0)
683         source = tmp.name
684     
685     if isinstance(source, file) or isinstance(dest, file) \
686             or hasattr(source, 'read')  or hasattr(dest, 'write'):
687         assert not recursive
688         
689         # Parse source/destination as <user>@<server>:<path>
690         if isinstance(dest, basestring) and ':' in dest:
691             remspec, path = dest.split(':',1)
692         elif isinstance(source, basestring) and ':' in source:
693             remspec, path = source.split(':',1)
694         else:
695             raise ValueError, "Both endpoints cannot be local"
696         user,host = remspec.rsplit('@',1)
697         tmp_known_hosts = None
698         
699         connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
700         args = ['ssh', '-l', user, '-C',
701                 # Don't bother with localhost. Makes test easier
702                 '-o', 'NoHostAuthenticationForLocalhost=yes',
703                 '-o', 'ConnectTimeout=30',
704                 '-o', 'ConnectionAttempts=3',
705                 '-o', 'ServerAliveInterval=30',
706                 '-o', 'TCPKeepAlive=yes',
707                 '-o', 'ControlMaster=auto',
708                 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
709                 '-o', 'ControlPersist=60',
710                 host ]
711         if port:
712             args.append('-P%d' % port)
713         if ident_key:
714             args.extend(('-i', ident_key))
715         if server_key:
716             # Create a temporary server key file
717             tmp_known_hosts = _make_server_key_args(
718                 server_key, host, port, args)
719         
720         if isinstance(source, file) or hasattr(source, 'read'):
721             args.append('cat > %s' % (shell_escape(path),))
722         elif isinstance(dest, file) or hasattr(dest, 'write'):
723             args.append('cat %s' % (shell_escape(path),))
724         else:
725             raise AssertionError, "Unreachable code reached! :-Q"
726         
727         # connects to the remote host and starts a remote connection
728         if isinstance(source, file):
729             proc = subprocess.Popen(args, 
730                     stdout = open('/dev/null','w'),
731                     stderr = subprocess.PIPE,
732                     stdin = source)
733             err = proc.stderr.read()
734             proc._known_hosts = tmp_known_hosts
735             eintr_retry(proc.wait)()
736             return ((None,err), proc)
737         elif isinstance(dest, file):
738             proc = subprocess.Popen(args, 
739                     stdout = open('/dev/null','w'),
740                     stderr = subprocess.PIPE,
741                     stdin = source)
742             err = proc.stderr.read()
743             proc._known_hosts = tmp_known_hosts
744             eintr_retry(proc.wait)()
745             return ((None,err), proc)
746         elif hasattr(source, 'read'):
747             # file-like (but not file) source
748             proc = subprocess.Popen(args, 
749                     stdout = open('/dev/null','w'),
750                     stderr = subprocess.PIPE,
751                     stdin = subprocess.PIPE)
752             
753             buf = None
754             err = []
755             while True:
756                 if not buf:
757                     buf = source.read(4096)
758                 if not buf:
759                     #EOF
760                     break
761                 
762                 rdrdy, wrdy, broken = select.select(
763                     [proc.stderr],
764                     [proc.stdin],
765                     [proc.stderr,proc.stdin])
766                 
767                 if proc.stderr in rdrdy:
768                     # use os.read for fully unbuffered behavior
769                     err.append(os.read(proc.stderr.fileno(), 4096))
770                 
771                 if proc.stdin in wrdy:
772                     proc.stdin.write(buf)
773                     buf = None
774                 
775                 if broken:
776                     break
777             proc.stdin.close()
778             err.append(proc.stderr.read())
779                 
780             proc._known_hosts = tmp_known_hosts
781             eintr_retry(proc.wait)()
782             return ((None,''.join(err)), proc)
783         elif hasattr(dest, 'write'):
784             # file-like (but not file) dest
785             proc = subprocess.Popen(args, 
786                     stdout = subprocess.PIPE,
787                     stderr = subprocess.PIPE,
788                     stdin = open('/dev/null','w'))
789             
790             buf = None
791             err = []
792             while True:
793                 rdrdy, wrdy, broken = select.select(
794                     [proc.stderr, proc.stdout],
795                     [],
796                     [proc.stderr, proc.stdout])
797                 
798                 if proc.stderr in rdrdy:
799                     # use os.read for fully unbuffered behavior
800                     err.append(os.read(proc.stderr.fileno(), 4096))
801                 
802                 if proc.stdout in rdrdy:
803                     # use os.read for fully unbuffered behavior
804                     buf = os.read(proc.stdout.fileno(), 4096)
805                     dest.write(buf)
806                     
807                     if not buf:
808                         #EOF
809                         break
810                 
811                 if broken:
812                     break
813             err.append(proc.stderr.read())
814                 
815             proc._known_hosts = tmp_known_hosts
816             eintr_retry(proc.wait)()
817             return ((None,''.join(err)), proc)
818         else:
819             raise AssertionError, "Unreachable code reached! :-Q"
820     else:
821         # Parse destination as <user>@<server>:<path>
822         if isinstance(dest, basestring) and ':' in dest:
823             remspec, path = dest.split(':',1)
824         elif isinstance(source, basestring) and ':' in source:
825             remspec, path = source.split(':',1)
826         else:
827             raise ValueError, "Both endpoints cannot be local"
828         user,host = remspec.rsplit('@',1)
829         
830         # plain scp
831         tmp_known_hosts = None
832         args = ['scp', '-q', '-p', '-C',
833                 # Don't bother with localhost. Makes test easier
834                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
835         if port:
836             args.append('-P%d' % port)
837         if recursive:
838             args.append('-r')
839         if ident_key:
840             args.extend(('-i', ident_key))
841         if server_key:
842             # Create a temporary server key file
843             tmp_known_hosts = _make_server_key_args(
844                 server_key, host, port, args)
845         if isinstance(source,list):
846             args.extend(source)
847         else:
848             args.append(source)
849         args.append(dest)
850
851         # connects to the remote host and starts a remote connection
852         proc = subprocess.Popen(args, 
853                 stdout = subprocess.PIPE,
854                 stdin = subprocess.PIPE, 
855                 stderr = subprocess.PIPE)
856         proc._known_hosts = tmp_known_hosts
857         
858         comm = proc.communicate()
859         eintr_retry(proc.wait)()
860         return (comm, proc)
861
862 def decode_and_execute():
863     # The python code we want to execute might have characters that 
864     # are not compatible with the 'inline' mode we are using. To avoid
865     # problems we receive the encoded python code in base64 as a input 
866     # stream and decode it for execution.
867     import base64, os
868     cmd = ""
869     while True:
870         try:
871             cmd += os.read(0, 1)# one byte from stdin
872         except OSError, e:            
873             if e.errno == errno.EINTR:
874                 continue
875             else:
876                 raise
877         if cmd[-1] == "\n": 
878             break
879     cmd = base64.b64decode(cmd)
880     # Uncomment for debug
881     #os.write(2, "Executing python code: %s\n" % cmd)
882     os.write(1, "OK\n") # send a sync message
883     exec(cmd)
884
885 def popen_python(python_code, 
886         communication = DC.ACCESS_LOCAL,
887         host = None, 
888         port = None, 
889         user = None, 
890         agent = False, 
891         python_path = None,
892         ident_key = None,
893         server_key = None,
894         tty = False,
895         sudo = False, 
896         environment_setup = ""):
897
898     cmd = ""
899     if python_path:
900         python_path.replace("'", r"'\''")
901         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
902         cmd += " ; "
903     if environment_setup:
904         cmd += environment_setup
905         cmd += " ; "
906     # Uncomment for debug (to run everything under strace)
907     # We had to verify if strace works (cannot nest them)
908     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
909     #cmd += "$CMD "
910     #cmd += "strace -f -tt -s 200 -o strace$$.out "
911     import nepi
912     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
913         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
914     )
915
916     if sudo:
917         if ';' in cmd:
918             cmd = "sudo bash -c " + shell_escape(cmd)
919         else:
920             cmd = "sudo " + cmd
921
922     if communication == DC.ACCESS_SSH:
923         tmp_known_hosts = None
924         args = ['ssh', '-C',
925                 # Don't bother with localhost. Makes test easier
926                 '-o', 'NoHostAuthenticationForLocalhost=yes',
927                 '-o', 'ConnectionAttempts=3',
928                 '-o', 'ServerAliveInterval=30',
929                 '-o', 'TCPKeepAlive=yes',
930                 '-l', user, host]
931         if agent:
932             args.append('-A')
933         if port:
934             args.append('-p%d' % port)
935         if ident_key:
936             args.extend(('-i', ident_key))
937         if tty:
938             args.append('-t')
939         if server_key:
940             # Create a temporary server key file
941             tmp_known_hosts = _make_server_key_args(
942                 server_key, host, port, args)
943         args.append(cmd)
944     else:
945         args = [ "/bin/bash", "-c", cmd ]
946
947     # connects to the remote host and starts a remote
948     proc = subprocess.Popen(args,
949             shell = False, 
950             stdout = subprocess.PIPE,
951             stdin = subprocess.PIPE, 
952             stderr = subprocess.PIPE)
953
954     if communication == DC.ACCESS_SSH:
955         proc._known_hosts = tmp_known_hosts
956
957     # send the command to execute
958     os.write(proc.stdin.fileno(),
959             base64.b64encode(python_code) + "\n")
960  
961     while True: 
962         try:
963             msg = os.read(proc.stdout.fileno(), 3)
964             break
965         except OSError, e:            
966             if e.errno == errno.EINTR:
967                 continue
968             else:
969                 raise
970     
971     if msg != "OK\n":
972         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
973             msg, proc.stdout.read(), proc.stderr.read())
974
975     return proc
976
977 # POSIX
978 def _communicate(self, input, timeout=None, err_on_timeout=True):
979     read_set = []
980     write_set = []
981     stdout = None # Return
982     stderr = None # Return
983     
984     killed = False
985     
986     if timeout is not None:
987         timelimit = time.time() + timeout
988         killtime = timelimit + 4
989         bailtime = timelimit + 4
990
991     if self.stdin:
992         # Flush stdio buffer.  This might block, if the user has
993         # been writing to .stdin in an uncontrolled fashion.
994         self.stdin.flush()
995         if input:
996             write_set.append(self.stdin)
997         else:
998             self.stdin.close()
999     if self.stdout:
1000         read_set.append(self.stdout)
1001         stdout = []
1002     if self.stderr:
1003         read_set.append(self.stderr)
1004         stderr = []
1005
1006     input_offset = 0
1007     while read_set or write_set:
1008         if timeout is not None:
1009             curtime = time.time()
1010             if timeout is None or curtime > timelimit:
1011                 if curtime > bailtime:
1012                     break
1013                 elif curtime > killtime:
1014                     signum = signal.SIGKILL
1015                 else:
1016                     signum = signal.SIGTERM
1017                 # Lets kill it
1018                 os.kill(self.pid, signum)
1019                 select_timeout = 0.5
1020             else:
1021                 select_timeout = timelimit - curtime + 0.1
1022         else:
1023             select_timeout = 1.0
1024         
1025         if select_timeout > 1.0:
1026             select_timeout = 1.0
1027             
1028         try:
1029             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1030         except select.error,e:
1031             if e[0] != 4:
1032                 raise
1033             else:
1034                 continue
1035         
1036         if not rlist and not wlist and not xlist and self.poll() is not None:
1037             # timeout and process exited, say bye
1038             break
1039
1040         if self.stdin in wlist:
1041             # When select has indicated that the file is writable,
1042             # we can write up to PIPE_BUF bytes without risk
1043             # blocking.  POSIX defines PIPE_BUF >= 512
1044             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1045             input_offset += bytes_written
1046             if input_offset >= len(input):
1047                 self.stdin.close()
1048                 write_set.remove(self.stdin)
1049
1050         if self.stdout in rlist:
1051             data = os.read(self.stdout.fileno(), 1024)
1052             if data == "":
1053                 self.stdout.close()
1054                 read_set.remove(self.stdout)
1055             stdout.append(data)
1056
1057         if self.stderr in rlist:
1058             data = os.read(self.stderr.fileno(), 1024)
1059             if data == "":
1060                 self.stderr.close()
1061                 read_set.remove(self.stderr)
1062             stderr.append(data)
1063     
1064     # All data exchanged.  Translate lists into strings.
1065     if stdout is not None:
1066         stdout = ''.join(stdout)
1067     if stderr is not None:
1068         stderr = ''.join(stderr)
1069
1070     # Translate newlines, if requested.  We cannot let the file
1071     # object do the translation: It is based on stdio, which is
1072     # impossible to combine with select (unless forcing no
1073     # buffering).
1074     if self.universal_newlines and hasattr(file, 'newlines'):
1075         if stdout:
1076             stdout = self._translate_newlines(stdout)
1077         if stderr:
1078             stderr = self._translate_newlines(stderr)
1079
1080     if killed and err_on_timeout:
1081         errcode = self.poll()
1082         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1083     else:
1084         if killed:
1085             self.poll()
1086         else:
1087             self.wait()
1088         return (stdout, stderr)
1089