Enable usage of persistent connections with popen_scp (limited though)
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25 import hashlib
26
27 CTRL_SOCK = "ctrl.sock"
28 CTRL_PID = "ctrl.pid"
29 STD_ERR = "stderr.log"
30 MAX_FD = 1024
31
32 STOP_MSG = "STOP"
33
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35
36 OPENSSH_HAS_PERSIST = None
37
38 if hasattr(os, "devnull"):
39     DEV_NULL = os.devnull
40 else:
41     DEV_NULL = "/dev/null"
42
43 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44
45 def openssh_has_persist():
46     global OPENSSH_HAS_PERSIST
47     if OPENSSH_HAS_PERSIST is None:
48         proc = subprocess.Popen(["ssh","-v"],
49             stdout = subprocess.PIPE,
50             stderr = subprocess.STDOUT,
51             stdin = open("/dev/null","r") )
52         out,err = proc.communicate()
53         proc.wait()
54         
55         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
56         OPENSSH_HAS_PERSIST = bool(vre.match(out))
57     return OPENSSH_HAS_PERSIST
58
59 def shell_escape(s):
60     """ Escapes strings so that they are safe to use as command-line arguments """
61     if SHELL_SAFE.match(s):
62         # safe string - no escaping needed
63         return s
64     else:
65         # unsafe string - escape
66         def escp(c):
67             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
68                 return c
69             else:
70                 return "'$'\\x%02x''" % (ord(c),)
71         s = ''.join(map(escp,s))
72         return "'%s'" % (s,)
73
74 def eintr_retry(func):
75     import functools
76     @functools.wraps(func)
77     def rv(*p, **kw):
78         retry = kw.pop("_retry", False)
79         for i in xrange(0 if retry else 4):
80             try:
81                 return func(*p, **kw)
82             except (select.error, socket.error), args:
83                 if args[0] == errno.EINTR:
84                     continue
85                 else:
86                     raise 
87             except OSError, e:
88                 if e.errno == errno.EINTR:
89                     continue
90                 else:
91                     raise
92         else:
93             return func(*p, **kw)
94     return rv
95
96 class Server(object):
97     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
98             environment_setup = "", clean_root = False):
99         self._root_dir = root_dir
100         self._clean_root = clean_root
101         self._stop = False
102         self._ctrl_sock = None
103         self._log_level = log_level
104         self._rdbuf = ""
105         self._environment_setup = environment_setup
106
107     def run(self):
108         try:
109             if self.daemonize():
110                 self.post_daemonize()
111                 self.loop()
112                 self.cleanup()
113                 # ref: "os._exit(0)"
114                 # can not return normally after fork beacuse no exec was done.
115                 # This means that if we don't do a os._exit(0) here the code that 
116                 # follows the call to "Server.run()" in the "caller code" will be 
117                 # executed... but by now it has already been executed after the 
118                 # first process (the one that did the first fork) returned.
119                 os._exit(0)
120         except:
121             print >>sys.stderr, "SERVER_ERROR."
122             self.log_error()
123             self.cleanup()
124             os._exit(0)
125         print >>sys.stderr, "SERVER_READY."
126
127     def daemonize(self):
128         # pipes for process synchronization
129         (r, w) = os.pipe()
130         
131         # build root folder
132         root = os.path.normpath(self._root_dir)
133         if self._root_dir not in [".", ""] and os.path.exists(root) \
134                 and self._clean_root:
135             shutil.rmtree(root)
136         if not os.path.exists(root):
137             os.makedirs(root, 0755)
138
139         pid1 = os.fork()
140         if pid1 > 0:
141             os.close(w)
142             while True:
143                 try:
144                     os.read(r, 1)
145                 except OSError, e: # pragma: no cover
146                     if e.errno == errno.EINTR:
147                         continue
148                     else:
149                         raise
150                 break
151             os.close(r)
152             # os.waitpid avoids leaving a <defunc> (zombie) process
153             st = os.waitpid(pid1, 0)[1]
154             if st:
155                 raise RuntimeError("Daemonization failed")
156             # return 0 to inform the caller method that this is not the 
157             # daemonized process
158             return 0
159         os.close(r)
160
161         # Decouple from parent environment.
162         os.chdir(self._root_dir)
163         os.umask(0)
164         os.setsid()
165
166         # fork 2
167         pid2 = os.fork()
168         if pid2 > 0:
169             # see ref: "os._exit(0)"
170             os._exit(0)
171
172         # close all open file descriptors.
173         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
174         if (max_fd == resource.RLIM_INFINITY):
175             max_fd = MAX_FD
176         for fd in range(3, max_fd):
177             if fd != w:
178                 try:
179                     os.close(fd)
180                 except OSError:
181                     pass
182
183         # Redirect standard file descriptors.
184         stdin = open(DEV_NULL, "r")
185         stderr = stdout = open(STD_ERR, "a", 0)
186         os.dup2(stdin.fileno(), sys.stdin.fileno())
187         # NOTE: sys.stdout.write will still be buffered, even if the file
188         # was opened with 0 buffer
189         os.dup2(stdout.fileno(), sys.stdout.fileno())
190         os.dup2(stderr.fileno(), sys.stderr.fileno())
191         
192         # setup environment
193         if self._environment_setup:
194             # parse environment variables and pass to child process
195             # do it by executing shell commands, in case there's some heavy setup involved
196             envproc = subprocess.Popen(
197                 [ "bash", "-c", 
198                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
199                         ( self._environment_setup, ) ],
200                 stdin = subprocess.PIPE, 
201                 stdout = subprocess.PIPE,
202                 stderr = subprocess.PIPE
203             )
204             out,err = envproc.communicate()
205
206             # parse new environment
207             if out:
208                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
209             
210                 # apply to current environment
211                 for name, value in environment.iteritems():
212                     os.environ[name] = value
213                 
214                 # apply pythonpath
215                 if 'PYTHONPATH' in environment:
216                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
217
218         # create control socket
219         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
220         try:
221             self._ctrl_sock.bind(CTRL_SOCK)
222         except socket.error:
223             # Address in use, check pidfile
224             pid = None
225             try:
226                 pidfile = open(CTRL_PID, "r")
227                 pid = pidfile.read()
228                 pidfile.close()
229                 pid = int(pid)
230             except:
231                 # no pidfile
232                 pass
233             
234             if pid is not None:
235                 # Check process liveliness
236                 if not os.path.exists("/proc/%d" % (pid,)):
237                     # Ok, it's dead, clean the socket
238                     os.remove(CTRL_SOCK)
239             
240             # try again
241             self._ctrl_sock.bind(CTRL_SOCK)
242             
243         self._ctrl_sock.listen(0)
244         
245         # Save pidfile
246         pidfile = open(CTRL_PID, "w")
247         pidfile.write(str(os.getpid()))
248         pidfile.close()
249
250         # let the parent process know that the daemonization is finished
251         os.write(w, "\n")
252         os.close(w)
253         return 1
254
255     def post_daemonize(self):
256         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
257         # QT, for some strange reason, redefines the SIGCHILD handler to write
258         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
259         # Server dameonization closes all file descriptors from fileno '3',
260         # but the overloaded handler (inherited by the forked process) will
261         # keep trying to write the \0 to fileno 'x', which might have been reused 
262         # after closing, for other operations. This is bad bad bad when fileno 'x'
263         # is in use for communication pouroses, because unexpected \0 start
264         # appearing in the communication messages... this is exactly what happens 
265         # when using netns in daemonized form. Thus, be have no other alternative than
266         # restoring the SIGCHLD handler to the default here.
267         import signal
268         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
269
270     def loop(self):
271         while not self._stop:
272             conn, addr = self._ctrl_sock.accept()
273             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
274             conn.settimeout(5)
275             while not self._stop:
276                 try:
277                     msg = self.recv_msg(conn)
278                 except socket.timeout, e:
279                     #self.log_error("SERVER recv_msg: connection timedout ")
280                     continue
281                 
282                 if not msg:
283                     self.log_error("CONNECTION LOST")
284                     break
285                     
286                 if msg == STOP_MSG:
287                     self._stop = True
288                     reply = self.stop_action()
289                 else:
290                     reply = self.reply_action(msg)
291                 
292                 try:
293                     self.send_reply(conn, reply)
294                 except socket.error:
295                     self.log_error()
296                     self.log_error("NOTICE: Awaiting for reconnection")
297                     break
298             try:
299                 conn.close()
300             except:
301                 # Doesn't matter
302                 self.log_error()
303
304     def recv_msg(self, conn):
305         data = [self._rdbuf]
306         chunk = data[0]
307         while '\n' not in chunk:
308             try:
309                 chunk = conn.recv(1024)
310             except (OSError, socket.error), e:
311                 if e[0] != errno.EINTR:
312                     raise
313                 else:
314                     continue
315             if chunk:
316                 data.append(chunk)
317             else:
318                 # empty chunk = EOF
319                 break
320         data = ''.join(data).split('\n',1)
321         while len(data) < 2:
322             data.append('')
323         data, self._rdbuf = data
324         
325         decoded = base64.b64decode(data)
326         return decoded.rstrip()
327
328     def send_reply(self, conn, reply):
329         encoded = base64.b64encode(reply)
330         conn.send("%s\n" % encoded)
331        
332     def cleanup(self):
333         try:
334             self._ctrl_sock.close()
335             os.remove(CTRL_SOCK)
336         except:
337             self.log_error()
338
339     def stop_action(self):
340         return "Stopping server"
341
342     def reply_action(self, msg):
343         return "Reply to: %s" % msg
344
345     def log_error(self, text = None, context = ''):
346         if text == None:
347             text = traceback.format_exc()
348         date = time.strftime("%Y-%m-%d %H:%M:%S")
349         if context:
350             context = " (%s)" % (context,)
351         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
352         return text
353
354     def log_debug(self, text):
355         if self._log_level == DC.DEBUG_LEVEL:
356             date = time.strftime("%Y-%m-%d %H:%M:%S")
357             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
358
359 class Forwarder(object):
360     def __init__(self, root_dir = "."):
361         self._ctrl_sock = None
362         self._root_dir = root_dir
363         self._stop = False
364         self._rdbuf = ""
365
366     def forward(self):
367         self.connect()
368         print >>sys.stderr, "FORWARDER_READY."
369         while not self._stop:
370             data = self.read_data()
371             if not data:
372                 # Connection to client lost
373                 break
374             self.send_to_server(data)
375             
376             data = self.recv_from_server()
377             if not data:
378                 # Connection to server lost
379                 raise IOError, "Connection to server lost while "\
380                     "expecting response"
381             self.write_data(data)
382         self.disconnect()
383
384     def read_data(self):
385         return sys.stdin.readline()
386
387     def write_data(self, data):
388         sys.stdout.write(data)
389         # sys.stdout.write is buffered, this is why we need to do a flush()
390         sys.stdout.flush()
391
392     def send_to_server(self, data):
393         try:
394             self._ctrl_sock.send(data)
395         except (IOError, socket.error), e:
396             if e[0] == errno.EPIPE:
397                 self.connect()
398                 self._ctrl_sock.send(data)
399             else:
400                 raise e
401         encoded = data.rstrip() 
402         msg = base64.b64decode(encoded)
403         if msg == STOP_MSG:
404             self._stop = True
405
406     def recv_from_server(self):
407         data = [self._rdbuf]
408         chunk = data[0]
409         while '\n' not in chunk:
410             try:
411                 chunk = self._ctrl_sock.recv(1024)
412             except (OSError, socket.error), e:
413                 if e[0] != errno.EINTR:
414                     raise
415                 continue
416             if chunk:
417                 data.append(chunk)
418             else:
419                 # empty chunk = EOF
420                 break
421         data = ''.join(data).split('\n',1)
422         while len(data) < 2:
423             data.append('')
424         data, self._rdbuf = data
425         
426         return data+'\n'
427  
428     def connect(self):
429         self.disconnect()
430         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
431         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
432         self._ctrl_sock.connect(sock_addr)
433
434     def disconnect(self):
435         try:
436             self._ctrl_sock.close()
437         except:
438             pass
439
440 class Client(object):
441     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
442             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
443             environment_setup = ""):
444         self.root_dir = root_dir
445         self.addr = (host, port)
446         self.user = user
447         self.agent = agent
448         self.sudo = sudo
449         self.communication = communication
450         self.environment_setup = environment_setup
451         self._stopped = False
452         self._deferreds = collections.deque()
453         self.connect()
454     
455     def __del__(self):
456         if self._process.poll() is None:
457             os.kill(self._process.pid, signal.SIGTERM)
458         self._process.wait()
459         
460     def connect(self):
461         root_dir = self.root_dir
462         (host, port) = self.addr
463         user = self.user
464         agent = self.agent
465         sudo = self.sudo
466         communication = self.communication
467         
468         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
469                 c.forward()" % (root_dir,)
470
471         self._process = popen_python(python_code, 
472                     communication = communication,
473                     host = host, 
474                     port = port, 
475                     user = user, 
476                     agent = agent, 
477                     sudo = sudo, 
478                     environment_setup = self.environment_setup)
479                
480         # Wait for the forwarder to be ready, otherwise nobody
481         # will be able to connect to it
482         err = []
483         helo = "nope"
484         while helo:
485             helo = self._process.stderr.readline()
486             if helo == 'FORWARDER_READY.\n':
487                 break
488             err.append(helo)
489         else:
490             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
491         
492     def send_msg(self, msg):
493         encoded = base64.b64encode(msg)
494         data = "%s\n" % encoded
495         
496         try:
497             self._process.stdin.write(data)
498         except (IOError, ValueError):
499             # dead process, poll it to un-zombify
500             self._process.poll()
501             
502             # try again after reconnect
503             # If it fails again, though, give up
504             self.connect()
505             self._process.stdin.write(data)
506
507     def send_stop(self):
508         self.send_msg(STOP_MSG)
509         self._stopped = True
510
511     def defer_reply(self, transform=None):
512         defer_entry = []
513         self._deferreds.append(defer_entry)
514         return defer.Defer(
515             functools.partial(self.read_reply, defer_entry, transform)
516         )
517         
518     def _read_reply(self):
519         data = self._process.stdout.readline()
520         encoded = data.rstrip() 
521         if not encoded:
522             # empty == eof == dead process, poll it to un-zombify
523             self._process.poll()
524             
525             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
526         return base64.b64decode(encoded)
527     
528     def read_reply(self, which=None, transform=None):
529         # Test to see if someone did it already
530         if which is not None and len(which):
531             # Ok, they did it...
532             # ...just return the deferred value
533             if transform:
534                 return transform(which[0])
535             else:
536                 return which[0]
537         
538         # Process all deferreds until the one we're looking for
539         # or until the queue is empty
540         while self._deferreds:
541             try:
542                 deferred = self._deferreds.popleft()
543             except IndexError:
544                 # emptied
545                 break
546             
547             deferred.append(self._read_reply())
548             if deferred is which:
549                 # We reached the one we were looking for
550                 if transform:
551                     return transform(deferred[0])
552                 else:
553                     return deferred[0]
554         
555         if which is None:
556             # They've requested a synchronous read
557             if transform:
558                 return transform(self._read_reply())
559             else:
560                 return self._read_reply()
561
562 def _make_server_key_args(server_key, host, port, args):
563     """ 
564     Returns a reference to the created temporary file, and adds the
565     corresponding arguments to the given argument list.
566     
567     Make sure to hold onto it until the process is done with the file
568     """
569     if port is not None:
570         host = '%s:%s' % (host,port)
571     # Create a temporary server key file
572     tmp_known_hosts = tempfile.NamedTemporaryFile()
573     
574     # Add the intended host key
575     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
576     
577     # If we're not in strict mode, add user-configured keys
578     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
579         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
580         if os.access(user_hosts_path, os.R_OK):
581             f = open(user_hosts_path, "r")
582             tmp_known_hosts.write(f.read())
583             f.close()
584         
585     tmp_known_hosts.flush()
586     
587     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
588     
589     return tmp_known_hosts
590
591 def make_connkey(user, host, port):
592     connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
593     if len(connkey) > 60:
594         connkey = hashlib.sha1(connkey).hexdigest()
595     return connkey
596
597 def popen_ssh_command(command, host, port, user, agent, 
598         stdin="", 
599         ident_key = None,
600         server_key = None,
601         tty = False,
602         timeout = None,
603         retry = 0,
604         err_on_timeout = True,
605         connect_timeout = 30,
606         persistent = True,
607         hostip = None):
608     """
609     Executes a remote commands, returns ((stdout,stderr),process)
610     """
611     if TRACE:
612         print "ssh", host, command
613     
614     tmp_known_hosts = None
615     connkey = make_connkey(user,host,port)
616     args = ['ssh', '-C',
617             # Don't bother with localhost. Makes test easier
618             '-o', 'NoHostAuthenticationForLocalhost=yes',
619             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
620             '-o', 'ConnectionAttempts=3',
621             '-o', 'ServerAliveInterval=30',
622             '-o', 'TCPKeepAlive=yes',
623             '-l', user, hostip or host]
624     if persistent and openssh_has_persist():
625         args.extend([
626             '-o', 'ControlMaster=auto',
627             '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
628             '-o', 'ControlPersist=60' ])
629     if agent:
630         args.append('-A')
631     if port:
632         args.append('-p%d' % port)
633     if ident_key:
634         args.extend(('-i', ident_key))
635     if tty:
636         args.append('-t')
637     if server_key:
638         # Create a temporary server key file
639         tmp_known_hosts = _make_server_key_args(
640             server_key, host, port, args)
641     args.append(command)
642
643     for x in xrange(retry or 3):
644         # connects to the remote host and starts a remote connection
645         proc = subprocess.Popen(args, 
646                 stdout = subprocess.PIPE,
647                 stdin = subprocess.PIPE, 
648                 stderr = subprocess.PIPE)
649         
650         # attach tempfile object to the process, to make sure the file stays
651         # alive until the process is finished with it
652         proc._known_hosts = tmp_known_hosts
653         
654         try:
655             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
656             if proc.poll():
657                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
658                     # SSH error, can safely retry
659                     continue
660                 elif retry:
661                     # Probably timed out or plain failed but can retry
662                     continue
663             break
664         except RuntimeError,e:
665             if retry <= 0:
666                 raise
667             if TRACE:
668                 print " timedout -> ", e.args
669             retry -= 1
670         
671     if TRACE:
672         print " -> ", out, err
673
674     return ((out, err), proc)
675
676 def popen_scp(source, dest, 
677         port = None, 
678         agent = None, 
679         recursive = False,
680         ident_key = None,
681         server_key = None):
682     """
683     Copies from/to remote sites.
684     
685     Source and destination should have the user and host encoded
686     as per scp specs.
687     
688     If source is a file object, a special mode will be used to
689     create the remote file with the same contents.
690     
691     If dest is a file object, the remote file (source) will be
692     read and written into dest.
693     
694     In these modes, recursive cannot be True.
695     
696     Source can be a list of files to copy to a single destination,
697     in which case it is advised that the destination be a folder.
698     """
699     
700     if TRACE:
701         print "scp", source, dest
702     
703     if isinstance(source, file) and source.tell() == 0:
704         source = source.name
705     elif hasattr(source, 'read'):
706         tmp = tempfile.NamedTemporaryFile()
707         while True:
708             buf = source.read(65536)
709             if buf:
710                 tmp.write(buf)
711             else:
712                 break
713         tmp.seek(0)
714         source = tmp.name
715     
716     if isinstance(source, file) or isinstance(dest, file) \
717             or hasattr(source, 'read')  or hasattr(dest, 'write'):
718         assert not recursive
719         
720         # Parse source/destination as <user>@<server>:<path>
721         if isinstance(dest, basestring) and ':' in dest:
722             remspec, path = dest.split(':',1)
723         elif isinstance(source, basestring) and ':' in source:
724             remspec, path = source.split(':',1)
725         else:
726             raise ValueError, "Both endpoints cannot be local"
727         user,host = remspec.rsplit('@',1)
728         tmp_known_hosts = None
729         
730         connkey = make_connkey(user,host,port)
731         args = ['ssh', '-l', user, '-C',
732                 # Don't bother with localhost. Makes test easier
733                 '-o', 'NoHostAuthenticationForLocalhost=yes',
734                 '-o', 'ConnectTimeout=30',
735                 '-o', 'ConnectionAttempts=3',
736                 '-o', 'ServerAliveInterval=30',
737                 '-o', 'TCPKeepAlive=yes',
738                 host ]
739         if openssh_has_persist():
740             args.extend([
741                 '-o', 'ControlMaster=auto',
742                 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
743                 '-o', 'ControlPersist=60' ])
744         if port:
745             args.append('-P%d' % port)
746         if ident_key:
747             args.extend(('-i', ident_key))
748         if server_key:
749             # Create a temporary server key file
750             tmp_known_hosts = _make_server_key_args(
751                 server_key, host, port, args)
752         
753         if isinstance(source, file) or hasattr(source, 'read'):
754             args.append('cat > %s' % (shell_escape(path),))
755         elif isinstance(dest, file) or hasattr(dest, 'write'):
756             args.append('cat %s' % (shell_escape(path),))
757         else:
758             raise AssertionError, "Unreachable code reached! :-Q"
759         
760         # connects to the remote host and starts a remote connection
761         if isinstance(source, file):
762             proc = subprocess.Popen(args, 
763                     stdout = open('/dev/null','w'),
764                     stderr = subprocess.PIPE,
765                     stdin = source)
766             err = proc.stderr.read()
767             proc._known_hosts = tmp_known_hosts
768             eintr_retry(proc.wait)()
769             return ((None,err), proc)
770         elif isinstance(dest, file):
771             proc = subprocess.Popen(args, 
772                     stdout = open('/dev/null','w'),
773                     stderr = subprocess.PIPE,
774                     stdin = source)
775             err = proc.stderr.read()
776             proc._known_hosts = tmp_known_hosts
777             eintr_retry(proc.wait)()
778             return ((None,err), proc)
779         elif hasattr(source, 'read'):
780             # file-like (but not file) source
781             proc = subprocess.Popen(args, 
782                     stdout = open('/dev/null','w'),
783                     stderr = subprocess.PIPE,
784                     stdin = subprocess.PIPE)
785             
786             buf = None
787             err = []
788             while True:
789                 if not buf:
790                     buf = source.read(4096)
791                 if not buf:
792                     #EOF
793                     break
794                 
795                 rdrdy, wrdy, broken = select.select(
796                     [proc.stderr],
797                     [proc.stdin],
798                     [proc.stderr,proc.stdin])
799                 
800                 if proc.stderr in rdrdy:
801                     # use os.read for fully unbuffered behavior
802                     err.append(os.read(proc.stderr.fileno(), 4096))
803                 
804                 if proc.stdin in wrdy:
805                     proc.stdin.write(buf)
806                     buf = None
807                 
808                 if broken:
809                     break
810             proc.stdin.close()
811             err.append(proc.stderr.read())
812                 
813             proc._known_hosts = tmp_known_hosts
814             eintr_retry(proc.wait)()
815             return ((None,''.join(err)), proc)
816         elif hasattr(dest, 'write'):
817             # file-like (but not file) dest
818             proc = subprocess.Popen(args, 
819                     stdout = subprocess.PIPE,
820                     stderr = subprocess.PIPE,
821                     stdin = open('/dev/null','w'))
822             
823             buf = None
824             err = []
825             while True:
826                 rdrdy, wrdy, broken = select.select(
827                     [proc.stderr, proc.stdout],
828                     [],
829                     [proc.stderr, proc.stdout])
830                 
831                 if proc.stderr in rdrdy:
832                     # use os.read for fully unbuffered behavior
833                     err.append(os.read(proc.stderr.fileno(), 4096))
834                 
835                 if proc.stdout in rdrdy:
836                     # use os.read for fully unbuffered behavior
837                     buf = os.read(proc.stdout.fileno(), 4096)
838                     dest.write(buf)
839                     
840                     if not buf:
841                         #EOF
842                         break
843                 
844                 if broken:
845                     break
846             err.append(proc.stderr.read())
847                 
848             proc._known_hosts = tmp_known_hosts
849             eintr_retry(proc.wait)()
850             return ((None,''.join(err)), proc)
851         else:
852             raise AssertionError, "Unreachable code reached! :-Q"
853     else:
854         # Parse destination as <user>@<server>:<path>
855         if isinstance(dest, basestring) and ':' in dest:
856             remspec, path = dest.split(':',1)
857         elif isinstance(source, basestring) and ':' in source:
858             remspec, path = source.split(':',1)
859         else:
860             raise ValueError, "Both endpoints cannot be local"
861         user,host = remspec.rsplit('@',1)
862         
863         # plain scp
864         tmp_known_hosts = None
865         args = ['scp', '-q', '-p', '-C',
866                 # Don't bother with localhost. Makes test easier
867                 '-o', 'NoHostAuthenticationForLocalhost=yes',
868                 '-o', 'ConnectTimeout=30',
869                 '-o', 'ConnectionAttempts=3',
870                 '-o', 'ServerAliveInterval=30',
871                 '-o', 'TCPKeepAlive=yes' ]
872                 
873         if port:
874             args.append('-P%d' % port)
875         if recursive:
876             args.append('-r')
877         if ident_key:
878             args.extend(('-i', ident_key))
879         if server_key:
880             # Create a temporary server key file
881             tmp_known_hosts = _make_server_key_args(
882                 server_key, host, port, args)
883         if isinstance(source,list):
884             args.extend(source)
885         else:
886             if openssh_has_persist():
887                 connkey = make_connkey(user,host,port)
888                 args.extend([
889                     '-o', 'ControlMaster=no',
890                     '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ) ])
891             args.append(source)
892         args.append(dest)
893
894         # connects to the remote host and starts a remote connection
895         proc = subprocess.Popen(args, 
896                 stdout = subprocess.PIPE,
897                 stdin = subprocess.PIPE, 
898                 stderr = subprocess.PIPE)
899         proc._known_hosts = tmp_known_hosts
900         
901         comm = proc.communicate()
902         eintr_retry(proc.wait)()
903         return (comm, proc)
904
905 def decode_and_execute():
906     # The python code we want to execute might have characters that 
907     # are not compatible with the 'inline' mode we are using. To avoid
908     # problems we receive the encoded python code in base64 as a input 
909     # stream and decode it for execution.
910     import base64, os
911     cmd = ""
912     while True:
913         try:
914             cmd += os.read(0, 1)# one byte from stdin
915         except OSError, e:            
916             if e.errno == errno.EINTR:
917                 continue
918             else:
919                 raise
920         if cmd[-1] == "\n": 
921             break
922     cmd = base64.b64decode(cmd)
923     # Uncomment for debug
924     #os.write(2, "Executing python code: %s\n" % cmd)
925     os.write(1, "OK\n") # send a sync message
926     exec(cmd)
927
928 def popen_python(python_code, 
929         communication = DC.ACCESS_LOCAL,
930         host = None, 
931         port = None, 
932         user = None, 
933         agent = False, 
934         python_path = None,
935         ident_key = None,
936         server_key = None,
937         tty = False,
938         sudo = False, 
939         environment_setup = ""):
940
941     cmd = ""
942     if python_path:
943         python_path.replace("'", r"'\''")
944         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
945         cmd += " ; "
946     if environment_setup:
947         cmd += environment_setup
948         cmd += " ; "
949     # Uncomment for debug (to run everything under strace)
950     # We had to verify if strace works (cannot nest them)
951     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
952     #cmd += "$CMD "
953     #cmd += "strace -f -tt -s 200 -o strace$$.out "
954     import nepi
955     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
956         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
957     )
958
959     if sudo:
960         if ';' in cmd:
961             cmd = "sudo bash -c " + shell_escape(cmd)
962         else:
963             cmd = "sudo " + cmd
964
965     if communication == DC.ACCESS_SSH:
966         tmp_known_hosts = None
967         args = ['ssh', '-C',
968                 # Don't bother with localhost. Makes test easier
969                 '-o', 'NoHostAuthenticationForLocalhost=yes',
970                 '-o', 'ConnectionAttempts=3',
971                 '-o', 'ServerAliveInterval=30',
972                 '-o', 'TCPKeepAlive=yes',
973                 '-l', user, host]
974         if agent:
975             args.append('-A')
976         if port:
977             args.append('-p%d' % port)
978         if ident_key:
979             args.extend(('-i', ident_key))
980         if tty:
981             args.append('-t')
982         if server_key:
983             # Create a temporary server key file
984             tmp_known_hosts = _make_server_key_args(
985                 server_key, host, port, args)
986         args.append(cmd)
987     else:
988         args = [ "/bin/bash", "-c", cmd ]
989
990     # connects to the remote host and starts a remote
991     proc = subprocess.Popen(args,
992             shell = False, 
993             stdout = subprocess.PIPE,
994             stdin = subprocess.PIPE, 
995             stderr = subprocess.PIPE)
996
997     if communication == DC.ACCESS_SSH:
998         proc._known_hosts = tmp_known_hosts
999
1000     # send the command to execute
1001     os.write(proc.stdin.fileno(),
1002             base64.b64encode(python_code) + "\n")
1003  
1004     while True: 
1005         try:
1006             msg = os.read(proc.stdout.fileno(), 3)
1007             break
1008         except OSError, e:            
1009             if e.errno == errno.EINTR:
1010                 continue
1011             else:
1012                 raise
1013     
1014     if msg != "OK\n":
1015         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1016             msg, proc.stdout.read(), proc.stderr.read())
1017
1018     return proc
1019
1020 # POSIX
1021 def _communicate(self, input, timeout=None, err_on_timeout=True):
1022     read_set = []
1023     write_set = []
1024     stdout = None # Return
1025     stderr = None # Return
1026     
1027     killed = False
1028     
1029     if timeout is not None:
1030         timelimit = time.time() + timeout
1031         killtime = timelimit + 4
1032         bailtime = timelimit + 4
1033
1034     if self.stdin:
1035         # Flush stdio buffer.  This might block, if the user has
1036         # been writing to .stdin in an uncontrolled fashion.
1037         self.stdin.flush()
1038         if input:
1039             write_set.append(self.stdin)
1040         else:
1041             self.stdin.close()
1042     if self.stdout:
1043         read_set.append(self.stdout)
1044         stdout = []
1045     if self.stderr:
1046         read_set.append(self.stderr)
1047         stderr = []
1048
1049     input_offset = 0
1050     while read_set or write_set:
1051         if timeout is not None:
1052             curtime = time.time()
1053             if timeout is None or curtime > timelimit:
1054                 if curtime > bailtime:
1055                     break
1056                 elif curtime > killtime:
1057                     signum = signal.SIGKILL
1058                 else:
1059                     signum = signal.SIGTERM
1060                 # Lets kill it
1061                 os.kill(self.pid, signum)
1062                 select_timeout = 0.5
1063             else:
1064                 select_timeout = timelimit - curtime + 0.1
1065         else:
1066             select_timeout = 1.0
1067         
1068         if select_timeout > 1.0:
1069             select_timeout = 1.0
1070             
1071         try:
1072             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1073         except select.error,e:
1074             if e[0] != 4:
1075                 raise
1076             else:
1077                 continue
1078         
1079         if not rlist and not wlist and not xlist and self.poll() is not None:
1080             # timeout and process exited, say bye
1081             break
1082
1083         if self.stdin in wlist:
1084             # When select has indicated that the file is writable,
1085             # we can write up to PIPE_BUF bytes without risk
1086             # blocking.  POSIX defines PIPE_BUF >= 512
1087             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1088             input_offset += bytes_written
1089             if input_offset >= len(input):
1090                 self.stdin.close()
1091                 write_set.remove(self.stdin)
1092
1093         if self.stdout in rlist:
1094             data = os.read(self.stdout.fileno(), 1024)
1095             if data == "":
1096                 self.stdout.close()
1097                 read_set.remove(self.stdout)
1098             stdout.append(data)
1099
1100         if self.stderr in rlist:
1101             data = os.read(self.stderr.fileno(), 1024)
1102             if data == "":
1103                 self.stderr.close()
1104                 read_set.remove(self.stderr)
1105             stderr.append(data)
1106     
1107     # All data exchanged.  Translate lists into strings.
1108     if stdout is not None:
1109         stdout = ''.join(stdout)
1110     if stderr is not None:
1111         stderr = ''.join(stderr)
1112
1113     # Translate newlines, if requested.  We cannot let the file
1114     # object do the translation: It is based on stdio, which is
1115     # impossible to combine with select (unless forcing no
1116     # buffering).
1117     if self.universal_newlines and hasattr(file, 'newlines'):
1118         if stdout:
1119             stdout = self._translate_newlines(stdout)
1120         if stderr:
1121             stderr = self._translate_newlines(stderr)
1122
1123     if killed and err_on_timeout:
1124         errcode = self.poll()
1125         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1126     else:
1127         if killed:
1128             self.poll()
1129         else:
1130             self.wait()
1131         return (stdout, stderr)
1132