PlanetLab bugfixes: ssh conection timeouts and ControlPath key generation
[nepi.git] / src / nepi / util / server.py
1 # -*- coding: utf-8 -*-
2
3 from nepi.util.constants import DeploymentConfiguration as DC
4
5 import base64
6 import errno
7 import os
8 import os.path
9 import resource
10 import select
11 import shutil
12 import signal
13 import socket
14 import sys
15 import subprocess
16 import threading
17 import time
18 import traceback
19 import re
20 import tempfile
21 import defer
22 import functools
23 import collections
24 import hashlib
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 OPENSSH_HAS_PERSIST = None
36
37 if hasattr(os, "devnull"):
38     DEV_NULL = os.devnull
39 else:
40     DEV_NULL = "/dev/null"
41
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43
44 hostbyname_cache = dict()
45
46 def gethostbyname(host):
47     hostbyname = hostbyname_cache.get(host)
48     if not hostbyname:
49         hostbyname = socket.gethostbyname(host)
50         hostbyname_cache[host] = hostbyname
51     return hostbyname
52
53 def openssh_has_persist():
54     global OPENSSH_HAS_PERSIST
55     if OPENSSH_HAS_PERSIST is None:
56         proc = subprocess.Popen(["ssh","-v"],
57             stdout = subprocess.PIPE,
58             stderr = subprocess.STDOUT,
59             stdin = open("/dev/null","r") )
60         out,err = proc.communicate()
61         proc.wait()
62         
63         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
64         OPENSSH_HAS_PERSIST = bool(vre.match(out))
65     return OPENSSH_HAS_PERSIST
66
67 def shell_escape(s):
68     """ Escapes strings so that they are safe to use as command-line arguments """
69     if SHELL_SAFE.match(s):
70         # safe string - no escaping needed
71         return s
72     else:
73         # unsafe string - escape
74         def escp(c):
75             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
76                 return c
77             else:
78                 return "'$'\\x%02x''" % (ord(c),)
79         s = ''.join(map(escp,s))
80         return "'%s'" % (s,)
81
82 def eintr_retry(func):
83     import functools
84     @functools.wraps(func)
85     def rv(*p, **kw):
86         retry = kw.pop("_retry", False)
87         for i in xrange(0 if retry else 4):
88             try:
89                 return func(*p, **kw)
90             except (select.error, socket.error), args:
91                 if args[0] == errno.EINTR:
92                     continue
93                 else:
94                     raise 
95             except OSError, e:
96                 if e.errno == errno.EINTR:
97                     continue
98                 else:
99                     raise
100         else:
101             return func(*p, **kw)
102     return rv
103
104 class Server(object):
105     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
106             environment_setup = "", clean_root = False):
107         self._root_dir = root_dir
108         self._clean_root = clean_root
109         self._stop = False
110         self._ctrl_sock = None
111         self._log_level = log_level
112         self._rdbuf = ""
113         self._environment_setup = environment_setup
114
115     def run(self):
116         try:
117             if self.daemonize():
118                 self.post_daemonize()
119                 self.loop()
120                 self.cleanup()
121                 # ref: "os._exit(0)"
122                 # can not return normally after fork beacuse no exec was done.
123                 # This means that if we don't do a os._exit(0) here the code that 
124                 # follows the call to "Server.run()" in the "caller code" will be 
125                 # executed... but by now it has already been executed after the 
126                 # first process (the one that did the first fork) returned.
127                 os._exit(0)
128         except:
129             print >>sys.stderr, "SERVER_ERROR."
130             self.log_error()
131             self.cleanup()
132             os._exit(0)
133         print >>sys.stderr, "SERVER_READY."
134
135     def daemonize(self):
136         # pipes for process synchronization
137         (r, w) = os.pipe()
138         
139         # build root folder
140         root = os.path.normpath(self._root_dir)
141         if self._root_dir not in [".", ""] and os.path.exists(root) \
142                 and self._clean_root:
143             shutil.rmtree(root)
144         if not os.path.exists(root):
145             os.makedirs(root, 0755)
146
147         pid1 = os.fork()
148         if pid1 > 0:
149             os.close(w)
150             while True:
151                 try:
152                     os.read(r, 1)
153                 except OSError, e: # pragma: no cover
154                     if e.errno == errno.EINTR:
155                         continue
156                     else:
157                         raise
158                 break
159             os.close(r)
160             # os.waitpid avoids leaving a <defunc> (zombie) process
161             st = os.waitpid(pid1, 0)[1]
162             if st:
163                 raise RuntimeError("Daemonization failed")
164             # return 0 to inform the caller method that this is not the 
165             # daemonized process
166             return 0
167         os.close(r)
168
169         # Decouple from parent environment.
170         os.chdir(self._root_dir)
171         os.umask(0)
172         os.setsid()
173
174         # fork 2
175         pid2 = os.fork()
176         if pid2 > 0:
177             # see ref: "os._exit(0)"
178             os._exit(0)
179
180         # close all open file descriptors.
181         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
182         if (max_fd == resource.RLIM_INFINITY):
183             max_fd = MAX_FD
184         for fd in range(3, max_fd):
185             if fd != w:
186                 try:
187                     os.close(fd)
188                 except OSError:
189                     pass
190
191         # Redirect standard file descriptors.
192         stdin = open(DEV_NULL, "r")
193         stderr = stdout = open(STD_ERR, "a", 0)
194         os.dup2(stdin.fileno(), sys.stdin.fileno())
195         # NOTE: sys.stdout.write will still be buffered, even if the file
196         # was opened with 0 buffer
197         os.dup2(stdout.fileno(), sys.stdout.fileno())
198         os.dup2(stderr.fileno(), sys.stderr.fileno())
199         
200         # setup environment
201         if self._environment_setup:
202             # parse environment variables and pass to child process
203             # do it by executing shell commands, in case there's some heavy setup involved
204             envproc = subprocess.Popen(
205                 [ "bash", "-c", 
206                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
207                         ( self._environment_setup, ) ],
208                 stdin = subprocess.PIPE, 
209                 stdout = subprocess.PIPE,
210                 stderr = subprocess.PIPE
211             )
212             out,err = envproc.communicate()
213
214             # parse new environment
215             if out:
216                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
217             
218                 # apply to current environment
219                 for name, value in environment.iteritems():
220                     os.environ[name] = value
221                 
222                 # apply pythonpath
223                 if 'PYTHONPATH' in environment:
224                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
225
226         # create control socket
227         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
228         try:
229             self._ctrl_sock.bind(CTRL_SOCK)
230         except socket.error:
231             # Address in use, check pidfile
232             pid = None
233             try:
234                 pidfile = open(CTRL_PID, "r")
235                 pid = pidfile.read()
236                 pidfile.close()
237                 pid = int(pid)
238             except:
239                 # no pidfile
240                 pass
241             
242             if pid is not None:
243                 # Check process liveliness
244                 if not os.path.exists("/proc/%d" % (pid,)):
245                     # Ok, it's dead, clean the socket
246                     os.remove(CTRL_SOCK)
247             
248             # try again
249             self._ctrl_sock.bind(CTRL_SOCK)
250             
251         self._ctrl_sock.listen(0)
252         
253         # Save pidfile
254         pidfile = open(CTRL_PID, "w")
255         pidfile.write(str(os.getpid()))
256         pidfile.close()
257
258         # let the parent process know that the daemonization is finished
259         os.write(w, "\n")
260         os.close(w)
261         return 1
262
263     def post_daemonize(self):
264         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
265         # QT, for some strange reason, redefines the SIGCHILD handler to write
266         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
267         # Server dameonization closes all file descriptors from fileno '3',
268         # but the overloaded handler (inherited by the forked process) will
269         # keep trying to write the \0 to fileno 'x', which might have been reused 
270         # after closing, for other operations. This is bad bad bad when fileno 'x'
271         # is in use for communication pouroses, because unexpected \0 start
272         # appearing in the communication messages... this is exactly what happens 
273         # when using netns in daemonized form. Thus, be have no other alternative than
274         # restoring the SIGCHLD handler to the default here.
275         import signal
276         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
277
278     def loop(self):
279         while not self._stop:
280             conn, addr = self._ctrl_sock.accept()
281             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
282             conn.settimeout(5)
283             while not self._stop:
284                 try:
285                     msg = self.recv_msg(conn)
286                 except socket.timeout, e:
287                     #self.log_error("SERVER recv_msg: connection timedout ")
288                     continue
289                 
290                 if not msg:
291                     self.log_error("CONNECTION LOST")
292                     break
293                     
294                 if msg == STOP_MSG:
295                     self._stop = True
296                     reply = self.stop_action()
297                 else:
298                     reply = self.reply_action(msg)
299                 
300                 try:
301                     self.send_reply(conn, reply)
302                 except socket.error:
303                     self.log_error()
304                     self.log_error("NOTICE: Awaiting for reconnection")
305                     break
306             try:
307                 conn.close()
308             except:
309                 # Doesn't matter
310                 self.log_error()
311
312     def recv_msg(self, conn):
313         data = [self._rdbuf]
314         chunk = data[0]
315         while '\n' not in chunk:
316             try:
317                 chunk = conn.recv(1024)
318             except (OSError, socket.error), e:
319                 if e[0] != errno.EINTR:
320                     raise
321                 else:
322                     continue
323             if chunk:
324                 data.append(chunk)
325             else:
326                 # empty chunk = EOF
327                 break
328         data = ''.join(data).split('\n',1)
329         while len(data) < 2:
330             data.append('')
331         data, self._rdbuf = data
332         
333         decoded = base64.b64decode(data)
334         return decoded.rstrip()
335
336     def send_reply(self, conn, reply):
337         encoded = base64.b64encode(reply)
338         conn.send("%s\n" % encoded)
339        
340     def cleanup(self):
341         try:
342             self._ctrl_sock.close()
343             os.remove(CTRL_SOCK)
344         except:
345             self.log_error()
346
347     def stop_action(self):
348         return "Stopping server"
349
350     def reply_action(self, msg):
351         return "Reply to: %s" % msg
352
353     def log_error(self, text = None, context = ''):
354         if text == None:
355             text = traceback.format_exc()
356         date = time.strftime("%Y-%m-%d %H:%M:%S")
357         if context:
358             context = " (%s)" % (context,)
359         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
360         return text
361
362     def log_debug(self, text):
363         if self._log_level == DC.DEBUG_LEVEL:
364             date = time.strftime("%Y-%m-%d %H:%M:%S")
365             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
366
367 class Forwarder(object):
368     def __init__(self, root_dir = "."):
369         self._ctrl_sock = None
370         self._root_dir = root_dir
371         self._stop = False
372         self._rdbuf = ""
373
374     def forward(self):
375         self.connect()
376         print >>sys.stderr, "FORWARDER_READY."
377         while not self._stop:
378             data = self.read_data()
379             if not data:
380                 # Connection to client lost
381                 break
382             self.send_to_server(data)
383             
384             data = self.recv_from_server()
385             if not data:
386                 # Connection to server lost
387                 raise IOError, "Connection to server lost while "\
388                     "expecting response"
389             self.write_data(data)
390         self.disconnect()
391
392     def read_data(self):
393         return sys.stdin.readline()
394
395     def write_data(self, data):
396         sys.stdout.write(data)
397         # sys.stdout.write is buffered, this is why we need to do a flush()
398         sys.stdout.flush()
399
400     def send_to_server(self, data):
401         try:
402             self._ctrl_sock.send(data)
403         except (IOError, socket.error), e:
404             if e[0] == errno.EPIPE:
405                 self.connect()
406                 self._ctrl_sock.send(data)
407             else:
408                 raise e
409         encoded = data.rstrip() 
410         msg = base64.b64decode(encoded)
411         if msg == STOP_MSG:
412             self._stop = True
413
414     def recv_from_server(self):
415         data = [self._rdbuf]
416         chunk = data[0]
417         while '\n' not in chunk:
418             try:
419                 chunk = self._ctrl_sock.recv(1024)
420             except (OSError, socket.error), e:
421                 if e[0] != errno.EINTR:
422                     raise
423                 continue
424             if chunk:
425                 data.append(chunk)
426             else:
427                 # empty chunk = EOF
428                 break
429         data = ''.join(data).split('\n',1)
430         while len(data) < 2:
431             data.append('')
432         data, self._rdbuf = data
433         
434         return data+'\n'
435  
436     def connect(self):
437         self.disconnect()
438         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
439         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
440         self._ctrl_sock.connect(sock_addr)
441
442     def disconnect(self):
443         try:
444             self._ctrl_sock.close()
445         except:
446             pass
447
448 class Client(object):
449     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
450             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
451             environment_setup = ""):
452         self.root_dir = root_dir
453         self.addr = (host, port)
454         self.user = user
455         self.agent = agent
456         self.sudo = sudo
457         self.communication = communication
458         self.environment_setup = environment_setup
459         self._stopped = False
460         self._deferreds = collections.deque()
461         self.connect()
462     
463     def __del__(self):
464         if self._process.poll() is None:
465             os.kill(self._process.pid, signal.SIGTERM)
466         self._process.wait()
467         
468     def connect(self):
469         root_dir = self.root_dir
470         (host, port) = self.addr
471         user = self.user
472         agent = self.agent
473         sudo = self.sudo
474         communication = self.communication
475         
476         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
477                 c.forward()" % (root_dir,)
478
479         self._process = popen_python(python_code, 
480                     communication = communication,
481                     host = host, 
482                     port = port, 
483                     user = user, 
484                     agent = agent, 
485                     sudo = sudo, 
486                     environment_setup = self.environment_setup)
487                
488         # Wait for the forwarder to be ready, otherwise nobody
489         # will be able to connect to it
490         err = []
491         helo = "nope"
492         while helo:
493             helo = self._process.stderr.readline()
494             if helo == 'FORWARDER_READY.\n':
495                 break
496             err.append(helo)
497         else:
498             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
499         
500     def send_msg(self, msg):
501         encoded = base64.b64encode(msg)
502         data = "%s\n" % encoded
503         
504         try:
505             self._process.stdin.write(data)
506         except (IOError, ValueError):
507             # dead process, poll it to un-zombify
508             self._process.poll()
509             
510             # try again after reconnect
511             # If it fails again, though, give up
512             self.connect()
513             self._process.stdin.write(data)
514
515     def send_stop(self):
516         self.send_msg(STOP_MSG)
517         self._stopped = True
518
519     def defer_reply(self, transform=None):
520         defer_entry = []
521         self._deferreds.append(defer_entry)
522         return defer.Defer(
523             functools.partial(self.read_reply, defer_entry, transform)
524         )
525         
526     def _read_reply(self):
527         data = self._process.stdout.readline()
528         encoded = data.rstrip() 
529         if not encoded:
530             # empty == eof == dead process, poll it to un-zombify
531             self._process.poll()
532             
533             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
534         return base64.b64decode(encoded)
535     
536     def read_reply(self, which=None, transform=None):
537         # Test to see if someone did it already
538         if which is not None and len(which):
539             # Ok, they did it...
540             # ...just return the deferred value
541             if transform:
542                 return transform(which[0])
543             else:
544                 return which[0]
545         
546         # Process all deferreds until the one we're looking for
547         # or until the queue is empty
548         while self._deferreds:
549             try:
550                 deferred = self._deferreds.popleft()
551             except IndexError:
552                 # emptied
553                 break
554             
555             deferred.append(self._read_reply())
556             if deferred is which:
557                 # We reached the one we were looking for
558                 if transform:
559                     return transform(deferred[0])
560                 else:
561                     return deferred[0]
562         
563         if which is None:
564             # They've requested a synchronous read
565             if transform:
566                 return transform(self._read_reply())
567             else:
568                 return self._read_reply()
569
570 def _make_server_key_args(server_key, host, port, args):
571     """ 
572     Returns a reference to the created temporary file, and adds the
573     corresponding arguments to the given argument list.
574     
575     Make sure to hold onto it until the process is done with the file
576     """
577     if port is not None:
578         host = '%s:%s' % (host,port)
579     # Create a temporary server key file
580     tmp_known_hosts = tempfile.NamedTemporaryFile()
581    
582     hostbyname = gethostbyname(host) 
583
584     # Add the intended host key
585     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
586     
587     # If we're not in strict mode, add user-configured keys
588     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
589         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
590         if os.access(user_hosts_path, os.R_OK):
591             f = open(user_hosts_path, "r")
592             tmp_known_hosts.write(f.read())
593             f.close()
594         
595     tmp_known_hosts.flush()
596     
597     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
598     
599     return tmp_known_hosts
600
601 def popen_ssh_command(command, host, port, user, agent, 
602         stdin="", 
603         ident_key = None,
604         server_key = None,
605         tty = False,
606         timeout = None,
607         retry = 0,
608         err_on_timeout = True,
609         connect_timeout = 900,
610         persistent = True,
611         hostip = None):
612     """
613     Executes a remote commands, returns ((stdout,stderr),process)
614     """
615     if TRACE:
616         print "ssh", host, command
617     
618     tmp_known_hosts = None
619     args = ['ssh', '-C',
620             # Don't bother with localhost. Makes test easier
621             '-o', 'NoHostAuthenticationForLocalhost=yes',
622             # XXX: Security vulnerability
623             #'-o', 'StrictHostKeyChecking=no',
624             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
625             '-o', 'ConnectionAttempts=3',
626             '-o', 'ServerAliveInterval=30',
627             '-o', 'TCPKeepAlive=yes',
628             '-l', user, hostip or host]
629     if persistent and openssh_has_persist():
630         args.extend([
631             '-o', 'ControlMaster=auto',
632             '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
633             '-o', 'ControlPersist=60' ])
634     if agent:
635         args.append('-A')
636     if port:
637         args.append('-p%d' % port)
638     if ident_key:
639         args.extend(('-i', ident_key))
640     if tty:
641         args.append('-t')
642     if server_key:
643         # Create a temporary server key file
644         tmp_known_hosts = _make_server_key_args(
645             server_key, host, port, args)
646     args.append(command)
647
648     for x in xrange(retry or 3):
649         # connects to the remote host and starts a remote connection
650         proc = subprocess.Popen(args, 
651                 stdout = subprocess.PIPE,
652                 stdin = subprocess.PIPE, 
653                 stderr = subprocess.PIPE)
654         
655         # attach tempfile object to the process, to make sure the file stays
656         # alive until the process is finished with it
657         proc._known_hosts = tmp_known_hosts
658     
659         try:
660             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
661             if proc.poll():
662                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
663                     # SSH error, can safely retry
664                     continue
665                 elif retry:
666                     # Probably timed out or plain failed but can retry
667                     continue
668             break
669         except RuntimeError,e:
670             if retry <= 0:
671                 raise
672             if TRACE:
673                 print " timedout -> ", e.args
674             retry -= 1
675         
676     if TRACE:
677         print " -> ", out, err
678
679     return ((out, err), proc)
680
681 def popen_scp(source, dest, 
682         port = None, 
683         agent = None, 
684         recursive = False,
685         ident_key = None,
686         server_key = None):
687     """
688     Copies from/to remote sites.
689     
690     Source and destination should have the user and host encoded
691     as per scp specs.
692     
693     If source is a file object, a special mode will be used to
694     create the remote file with the same contents.
695     
696     If dest is a file object, the remote file (source) will be
697     read and written into dest.
698     
699     In these modes, recursive cannot be True.
700     
701     Source can be a list of files to copy to a single destination,
702     in which case it is advised that the destination be a folder.
703     """
704     
705     if TRACE:
706         print "scp", source, dest
707     
708     if isinstance(source, file) and source.tell() == 0:
709         source = source.name
710     elif hasattr(source, 'read'):
711         tmp = tempfile.NamedTemporaryFile()
712         while True:
713             buf = source.read(65536)
714             if buf:
715                 tmp.write(buf)
716             else:
717                 break
718         tmp.seek(0)
719         source = tmp.name
720     
721     if isinstance(source, file) or isinstance(dest, file) \
722             or hasattr(source, 'read')  or hasattr(dest, 'write'):
723         assert not recursive
724         
725         # Parse source/destination as <user>@<server>:<path>
726         if isinstance(dest, basestring) and ':' in dest:
727             remspec, path = dest.split(':',1)
728         elif isinstance(source, basestring) and ':' in source:
729             remspec, path = source.split(':',1)
730         else:
731             raise ValueError, "Both endpoints cannot be local"
732         user,host = remspec.rsplit('@',1)
733         tmp_known_hosts = None
734         
735         args = ['ssh', '-l', user, '-C',
736                 # Don't bother with localhost. Makes test easier
737                 '-o', 'NoHostAuthenticationForLocalhost=yes',
738                 # XXX: Security vulnerability
739                 #'-o', 'StrictHostKeyChecking=no',
740                 '-o', 'ConnectTimeout=900',
741                 '-o', 'ConnectionAttempts=3',
742                 '-o', 'ServerAliveInterval=30',
743                 '-o', 'TCPKeepAlive=yes',
744                 host ]
745         if openssh_has_persist():
746             args.extend([
747                 '-o', 'ControlMaster=auto',
748                 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
749                 '-o', 'ControlPersist=60' ])
750         if port:
751             args.append('-P%d' % port)
752         if ident_key:
753             args.extend(('-i', ident_key))
754         if server_key:
755             # Create a temporary server key file
756             tmp_known_hosts = _make_server_key_args(
757                 server_key, host, port, args)
758         
759         if isinstance(source, file) or hasattr(source, 'read'):
760             args.append('cat > %s' % (shell_escape(path),))
761         elif isinstance(dest, file) or hasattr(dest, 'write'):
762             args.append('cat %s' % (shell_escape(path),))
763         else:
764             raise AssertionError, "Unreachable code reached! :-Q"
765         
766         # connects to the remote host and starts a remote connection
767         if isinstance(source, file):
768             proc = subprocess.Popen(args, 
769                     stdout = open('/dev/null','w'),
770                     stderr = subprocess.PIPE,
771                     stdin = source)
772             err = proc.stderr.read()
773             proc._known_hosts = tmp_known_hosts
774             eintr_retry(proc.wait)()
775             return ((None,err), proc)
776         elif isinstance(dest, file):
777             proc = subprocess.Popen(args, 
778                     stdout = open('/dev/null','w'),
779                     stderr = subprocess.PIPE,
780                     stdin = source)
781             err = proc.stderr.read()
782             proc._known_hosts = tmp_known_hosts
783             eintr_retry(proc.wait)()
784             return ((None,err), proc)
785         elif hasattr(source, 'read'):
786             # file-like (but not file) source
787             proc = subprocess.Popen(args, 
788                     stdout = open('/dev/null','w'),
789                     stderr = subprocess.PIPE,
790                     stdin = subprocess.PIPE)
791             
792             buf = None
793             err = []
794             while True:
795                 if not buf:
796                     buf = source.read(4096)
797                 if not buf:
798                     #EOF
799                     break
800                 
801                 rdrdy, wrdy, broken = select.select(
802                     [proc.stderr],
803                     [proc.stdin],
804                     [proc.stderr,proc.stdin])
805                 
806                 if proc.stderr in rdrdy:
807                     # use os.read for fully unbuffered behavior
808                     err.append(os.read(proc.stderr.fileno(), 4096))
809                 
810                 if proc.stdin in wrdy:
811                     proc.stdin.write(buf)
812                     buf = None
813                 
814                 if broken:
815                     break
816             proc.stdin.close()
817             err.append(proc.stderr.read())
818                 
819             proc._known_hosts = tmp_known_hosts
820             eintr_retry(proc.wait)()
821             return ((None,''.join(err)), proc)
822         elif hasattr(dest, 'write'):
823             # file-like (but not file) dest
824             proc = subprocess.Popen(args, 
825                     stdout = subprocess.PIPE,
826                     stderr = subprocess.PIPE,
827                     stdin = open('/dev/null','w'))
828             
829             buf = None
830             err = []
831             while True:
832                 rdrdy, wrdy, broken = select.select(
833                     [proc.stderr, proc.stdout],
834                     [],
835                     [proc.stderr, proc.stdout])
836                 
837                 if proc.stderr in rdrdy:
838                     # use os.read for fully unbuffered behavior
839                     err.append(os.read(proc.stderr.fileno(), 4096))
840                 
841                 if proc.stdout in rdrdy:
842                     # use os.read for fully unbuffered behavior
843                     buf = os.read(proc.stdout.fileno(), 4096)
844                     dest.write(buf)
845                     
846                     if not buf:
847                         #EOF
848                         break
849                 
850                 if broken:
851                     break
852             err.append(proc.stderr.read())
853                 
854             proc._known_hosts = tmp_known_hosts
855             eintr_retry(proc.wait)()
856             return ((None,''.join(err)), proc)
857         else:
858             raise AssertionError, "Unreachable code reached! :-Q"
859     else:
860         # Parse destination as <user>@<server>:<path>
861         if isinstance(dest, basestring) and ':' in dest:
862             remspec, path = dest.split(':',1)
863         elif isinstance(source, basestring) and ':' in source:
864             remspec, path = source.split(':',1)
865         else:
866             raise ValueError, "Both endpoints cannot be local"
867         user,host = remspec.rsplit('@',1)
868         
869         # plain scp
870         tmp_known_hosts = None
871         args = ['scp', '-q', '-p', '-C',
872                 # Don't bother with localhost. Makes test easier
873                 '-o', 'NoHostAuthenticationForLocalhost=yes',
874                 # XXX: Security vulnerability
875                 #'-o', 'StrictHostKeyChecking=no',
876                 '-o', 'ConnectTimeout=900',
877                 '-o', 'ConnectionAttempts=3',
878                 '-o', 'ServerAliveInterval=30',
879                 '-o', 'TCPKeepAlive=yes' ]
880                 
881         if port:
882             args.append('-P%d' % port)
883         if recursive:
884             args.append('-r')
885         if ident_key:
886             args.extend(('-i', ident_key))
887         if server_key:
888             # Create a temporary server key file
889             tmp_known_hosts = _make_server_key_args(
890                 server_key, host, port, args)
891         if isinstance(source,list):
892             args.extend(source)
893         else:
894             if openssh_has_persist():
895                 args.extend([
896                     '-o', 'ControlMaster=auto',
897                     '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p'])
898             args.append(source)
899         args.append(dest)
900
901         # connects to the remote host and starts a remote connection
902         proc = subprocess.Popen(args, 
903                 stdout = subprocess.PIPE,
904                 stdin = subprocess.PIPE, 
905                 stderr = subprocess.PIPE)
906         proc._known_hosts = tmp_known_hosts
907         
908         comm = proc.communicate()
909         eintr_retry(proc.wait)()
910         return (comm, proc)
911
912 def decode_and_execute():
913     # The python code we want to execute might have characters that 
914     # are not compatible with the 'inline' mode we are using. To avoid
915     # problems we receive the encoded python code in base64 as a input 
916     # stream and decode it for execution.
917     import base64, os
918     cmd = ""
919     while True:
920         try:
921             cmd += os.read(0, 1)# one byte from stdin
922         except OSError, e:            
923             if e.errno == errno.EINTR:
924                 continue
925             else:
926                 raise
927         if cmd[-1] == "\n": 
928             break
929     cmd = base64.b64decode(cmd)
930     # Uncomment for debug
931     #os.write(2, "Executing python code: %s\n" % cmd)
932     os.write(1, "OK\n") # send a sync message
933     exec(cmd)
934
935 def popen_python(python_code, 
936         communication = DC.ACCESS_LOCAL,
937         host = None, 
938         port = None, 
939         user = None, 
940         agent = False, 
941         python_path = None,
942         ident_key = None,
943         server_key = None,
944         tty = False,
945         sudo = False, 
946         environment_setup = ""):
947
948     cmd = ""
949     if python_path:
950         python_path.replace("'", r"'\''")
951         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
952         cmd += " ; "
953     if environment_setup:
954         cmd += environment_setup
955         cmd += " ; "
956     # Uncomment for debug (to run everything under strace)
957     # We had to verify if strace works (cannot nest them)
958     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
959     #cmd += "$CMD "
960     #cmd += "strace -f -tt -s 200 -o strace$$.out "
961     import nepi
962     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
963         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
964     )
965
966     if sudo:
967         if ';' in cmd:
968             cmd = "sudo bash -c " + shell_escape(cmd)
969         else:
970             cmd = "sudo " + cmd
971
972     if communication == DC.ACCESS_SSH:
973         tmp_known_hosts = None
974         args = ['ssh', '-C',
975                 # Don't bother with localhost. Makes test easier
976                 '-o', 'NoHostAuthenticationForLocalhost=yes',
977                 # XXX: Security vulnerability
978                 #'-o', 'StrictHostKeyChecking=no',
979                 '-o', 'ConnectionAttempts=3',
980                 '-o', 'ServerAliveInterval=30',
981                 '-o', 'TCPKeepAlive=yes',
982                 '-l', user, host]
983         if agent:
984             args.append('-A')
985         if port:
986             args.append('-p%d' % port)
987         if ident_key:
988             args.extend(('-i', ident_key))
989         if tty:
990             args.append('-t')
991         if server_key:
992             # Create a temporary server key file
993             tmp_known_hosts = _make_server_key_args(
994                 server_key, host, port, args)
995         args.append(cmd)
996     else:
997         args = [ "/bin/bash", "-c", cmd ]
998
999     # connects to the remote host and starts a remote
1000     proc = subprocess.Popen(args,
1001             shell = False, 
1002             stdout = subprocess.PIPE,
1003             stdin = subprocess.PIPE, 
1004             stderr = subprocess.PIPE)
1005
1006     if communication == DC.ACCESS_SSH:
1007         proc._known_hosts = tmp_known_hosts
1008
1009     # send the command to execute
1010     os.write(proc.stdin.fileno(),
1011             base64.b64encode(python_code) + "\n")
1012  
1013     while True: 
1014         try:
1015             msg = os.read(proc.stdout.fileno(), 3)
1016             break
1017         except OSError, e:            
1018             if e.errno == errno.EINTR:
1019                 continue
1020             else:
1021                 raise
1022     
1023     if msg != "OK\n":
1024         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1025             msg, proc.stdout.read(), proc.stderr.read())
1026
1027     return proc
1028
1029 # POSIX
1030 def _communicate(self, input, timeout=None, err_on_timeout=True):
1031     read_set = []
1032     write_set = []
1033     stdout = None # Return
1034     stderr = None # Return
1035     
1036     killed = False
1037     
1038     if timeout is not None:
1039         timelimit = time.time() + timeout
1040         killtime = timelimit + 4
1041         bailtime = timelimit + 4
1042
1043     if self.stdin:
1044         # Flush stdio buffer.  This might block, if the user has
1045         # been writing to .stdin in an uncontrolled fashion.
1046         self.stdin.flush()
1047         if input:
1048             write_set.append(self.stdin)
1049         else:
1050             self.stdin.close()
1051     if self.stdout:
1052         read_set.append(self.stdout)
1053         stdout = []
1054     if self.stderr:
1055         read_set.append(self.stderr)
1056         stderr = []
1057
1058     input_offset = 0
1059     while read_set or write_set:
1060         if timeout is not None:
1061             curtime = time.time()
1062             if timeout is None or curtime > timelimit:
1063                 if curtime > bailtime:
1064                     break
1065                 elif curtime > killtime:
1066                     signum = signal.SIGKILL
1067                 else:
1068                     signum = signal.SIGTERM
1069                 # Lets kill it
1070                 os.kill(self.pid, signum)
1071                 select_timeout = 0.5
1072             else:
1073                 select_timeout = timelimit - curtime + 0.1
1074         else:
1075             select_timeout = 1.0
1076         
1077         if select_timeout > 1.0:
1078             select_timeout = 1.0
1079             
1080         try:
1081             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1082         except select.error,e:
1083             if e[0] != 4:
1084                 raise
1085             else:
1086                 continue
1087         
1088         if not rlist and not wlist and not xlist and self.poll() is not None:
1089             # timeout and process exited, say bye
1090             break
1091
1092         if self.stdin in wlist:
1093             # When select has indicated that the file is writable,
1094             # we can write up to PIPE_BUF bytes without risk
1095             # blocking.  POSIX defines PIPE_BUF >= 512
1096             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1097             input_offset += bytes_written
1098             if input_offset >= len(input):
1099                 self.stdin.close()
1100                 write_set.remove(self.stdin)
1101
1102         if self.stdout in rlist:
1103             data = os.read(self.stdout.fileno(), 1024)
1104             if data == "":
1105                 self.stdout.close()
1106                 read_set.remove(self.stdout)
1107             stdout.append(data)
1108
1109         if self.stderr in rlist:
1110             data = os.read(self.stderr.fileno(), 1024)
1111             if data == "":
1112                 self.stderr.close()
1113                 read_set.remove(self.stderr)
1114             stderr.append(data)
1115     
1116     # All data exchanged.  Translate lists into strings.
1117     if stdout is not None:
1118         stdout = ''.join(stdout)
1119     if stderr is not None:
1120         stderr = ''.join(stderr)
1121
1122     # Translate newlines, if requested.  We cannot let the file
1123     # object do the translation: It is based on stdio, which is
1124     # impossible to combine with select (unless forcing no
1125     # buffering).
1126     if self.universal_newlines and hasattr(file, 'newlines'):
1127         if stdout:
1128             stdout = self._translate_newlines(stdout)
1129         if stderr:
1130             stderr = self._translate_newlines(stderr)
1131
1132     if killed and err_on_timeout:
1133         errcode = self.poll()
1134         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1135     else:
1136         if killed:
1137             self.poll()
1138         else:
1139             self.wait()
1140         return (stdout, stderr)
1141