Retry operations on networking errors. Really common from wan
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 if hasattr(os, "devnull"):
36     DEV_NULL = os.devnull
37 else:
38     DEV_NULL = "/dev/null"
39
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41
42 def shell_escape(s):
43     """ Escapes strings so that they are safe to use as command-line arguments """
44     if SHELL_SAFE.match(s):
45         # safe string - no escaping needed
46         return s
47     else:
48         # unsafe string - escape
49         def escp(c):
50             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
51                 return c
52             else:
53                 return "'$'\\x%02x''" % (ord(c),)
54         s = ''.join(map(escp,s))
55         return "'%s'" % (s,)
56
57 def eintr_retry(func):
58     import functools
59     @functools.wraps(func)
60     def rv(*p, **kw):
61         retry = kw.pop("_retry", False)
62         for i in xrange(0 if retry else 4):
63             try:
64                 return func(*p, **kw)
65             except (select.error, socket.error), args:
66                 if args[0] == errno.EINTR:
67                     continue
68                 else:
69                     raise 
70             except OSError, e:
71                 if e.errno == errno.EINTR:
72                     continue
73                 else:
74                     raise
75         else:
76             return func(*p, **kw)
77     return rv
78
79 class Server(object):
80     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
81             environment_setup = "", clean_root = False):
82         self._root_dir = root_dir
83         self._clean_root = clean_root
84         self._stop = False
85         self._ctrl_sock = None
86         self._log_level = log_level
87         self._rdbuf = ""
88         self._environment_setup = environment_setup
89
90     def run(self):
91         try:
92             if self.daemonize():
93                 self.post_daemonize()
94                 self.loop()
95                 self.cleanup()
96                 # ref: "os._exit(0)"
97                 # can not return normally after fork beacuse no exec was done.
98                 # This means that if we don't do a os._exit(0) here the code that 
99                 # follows the call to "Server.run()" in the "caller code" will be 
100                 # executed... but by now it has already been executed after the 
101                 # first process (the one that did the first fork) returned.
102                 os._exit(0)
103         except:
104             print >>sys.stderr, "SERVER_ERROR."
105             self.log_error()
106             self.cleanup()
107             os._exit(0)
108         print >>sys.stderr, "SERVER_READY."
109
110     def daemonize(self):
111         # pipes for process synchronization
112         (r, w) = os.pipe()
113         
114         # build root folder
115         root = os.path.normpath(self._root_dir)
116         if self._root_dir not in [".", ""] and os.path.exists(root) \
117                 and self._clean_root:
118             shutil.rmtree(root)
119         if not os.path.exists(root):
120             os.makedirs(root, 0755)
121
122         pid1 = os.fork()
123         if pid1 > 0:
124             os.close(w)
125             while True:
126                 try:
127                     os.read(r, 1)
128                 except OSError, e: # pragma: no cover
129                     if e.errno == errno.EINTR:
130                         continue
131                     else:
132                         raise
133                 break
134             os.close(r)
135             # os.waitpid avoids leaving a <defunc> (zombie) process
136             st = os.waitpid(pid1, 0)[1]
137             if st:
138                 raise RuntimeError("Daemonization failed")
139             # return 0 to inform the caller method that this is not the 
140             # daemonized process
141             return 0
142         os.close(r)
143
144         # Decouple from parent environment.
145         os.chdir(self._root_dir)
146         os.umask(0)
147         os.setsid()
148
149         # fork 2
150         pid2 = os.fork()
151         if pid2 > 0:
152             # see ref: "os._exit(0)"
153             os._exit(0)
154
155         # close all open file descriptors.
156         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
157         if (max_fd == resource.RLIM_INFINITY):
158             max_fd = MAX_FD
159         for fd in range(3, max_fd):
160             if fd != w:
161                 try:
162                     os.close(fd)
163                 except OSError:
164                     pass
165
166         # Redirect standard file descriptors.
167         stdin = open(DEV_NULL, "r")
168         stderr = stdout = open(STD_ERR, "a", 0)
169         os.dup2(stdin.fileno(), sys.stdin.fileno())
170         # NOTE: sys.stdout.write will still be buffered, even if the file
171         # was opened with 0 buffer
172         os.dup2(stdout.fileno(), sys.stdout.fileno())
173         os.dup2(stderr.fileno(), sys.stderr.fileno())
174         
175         # setup environment
176         if self._environment_setup:
177             # parse environment variables and pass to child process
178             # do it by executing shell commands, in case there's some heavy setup involved
179             envproc = subprocess.Popen(
180                 [ "bash", "-c", 
181                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
182                         ( self._environment_setup, ) ],
183                 stdin = subprocess.PIPE, 
184                 stdout = subprocess.PIPE,
185                 stderr = subprocess.PIPE
186             )
187             out,err = envproc.communicate()
188
189             # parse new environment
190             if out:
191                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
192             
193                 # apply to current environment
194                 for name, value in environment.iteritems():
195                     os.environ[name] = value
196                 
197                 # apply pythonpath
198                 if 'PYTHONPATH' in environment:
199                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
200
201         # create control socket
202         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
203         try:
204             self._ctrl_sock.bind(CTRL_SOCK)
205         except socket.error:
206             # Address in use, check pidfile
207             pid = None
208             try:
209                 pidfile = open(CTRL_PID, "r")
210                 pid = pidfile.read()
211                 pidfile.close()
212                 pid = int(pid)
213             except:
214                 # no pidfile
215                 pass
216             
217             if pid is not None:
218                 # Check process liveliness
219                 if not os.path.exists("/proc/%d" % (pid,)):
220                     # Ok, it's dead, clean the socket
221                     os.remove(CTRL_SOCK)
222             
223             # try again
224             self._ctrl_sock.bind(CTRL_SOCK)
225             
226         self._ctrl_sock.listen(0)
227         
228         # Save pidfile
229         pidfile = open(CTRL_PID, "w")
230         pidfile.write(str(os.getpid()))
231         pidfile.close()
232
233         # let the parent process know that the daemonization is finished
234         os.write(w, "\n")
235         os.close(w)
236         return 1
237
238     def post_daemonize(self):
239         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
240         # QT, for some strange reason, redefines the SIGCHILD handler to write
241         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
242         # Server dameonization closes all file descriptors from fileno '3',
243         # but the overloaded handler (inherited by the forked process) will
244         # keep trying to write the \0 to fileno 'x', which might have been reused 
245         # after closing, for other operations. This is bad bad bad when fileno 'x'
246         # is in use for communication pouroses, because unexpected \0 start
247         # appearing in the communication messages... this is exactly what happens 
248         # when using netns in daemonized form. Thus, be have no other alternative than
249         # restoring the SIGCHLD handler to the default here.
250         import signal
251         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
252
253     def loop(self):
254         while not self._stop:
255             conn, addr = self._ctrl_sock.accept()
256             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
257             conn.settimeout(5)
258             while not self._stop:
259                 try:
260                     msg = self.recv_msg(conn)
261                 except socket.timeout, e:
262                     #self.log_error("SERVER recv_msg: connection timedout ")
263                     continue
264                 
265                 if not msg:
266                     self.log_error("CONNECTION LOST")
267                     break
268                     
269                 if msg == STOP_MSG:
270                     self._stop = True
271                     reply = self.stop_action()
272                 else:
273                     reply = self.reply_action(msg)
274                 
275                 try:
276                     self.send_reply(conn, reply)
277                 except socket.error:
278                     self.log_error()
279                     self.log_error("NOTICE: Awaiting for reconnection")
280                     break
281             try:
282                 conn.close()
283             except:
284                 # Doesn't matter
285                 self.log_error()
286
287     def recv_msg(self, conn):
288         data = [self._rdbuf]
289         chunk = data[0]
290         while '\n' not in chunk:
291             try:
292                 chunk = conn.recv(1024)
293             except (OSError, socket.error), e:
294                 if e[0] != errno.EINTR:
295                     raise
296                 else:
297                     continue
298             if chunk:
299                 data.append(chunk)
300             else:
301                 # empty chunk = EOF
302                 break
303         data = ''.join(data).split('\n',1)
304         while len(data) < 2:
305             data.append('')
306         data, self._rdbuf = data
307         
308         decoded = base64.b64decode(data)
309         return decoded.rstrip()
310
311     def send_reply(self, conn, reply):
312         encoded = base64.b64encode(reply)
313         conn.send("%s\n" % encoded)
314        
315     def cleanup(self):
316         try:
317             self._ctrl_sock.close()
318             os.remove(CTRL_SOCK)
319         except:
320             self.log_error()
321
322     def stop_action(self):
323         return "Stopping server"
324
325     def reply_action(self, msg):
326         return "Reply to: %s" % msg
327
328     def log_error(self, text = None, context = ''):
329         if text == None:
330             text = traceback.format_exc()
331         date = time.strftime("%Y-%m-%d %H:%M:%S")
332         if context:
333             context = " (%s)" % (context,)
334         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
335         return text
336
337     def log_debug(self, text):
338         if self._log_level == DC.DEBUG_LEVEL:
339             date = time.strftime("%Y-%m-%d %H:%M:%S")
340             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
341
342 class Forwarder(object):
343     def __init__(self, root_dir = "."):
344         self._ctrl_sock = None
345         self._root_dir = root_dir
346         self._stop = False
347         self._rdbuf = ""
348
349     def forward(self):
350         self.connect()
351         print >>sys.stderr, "FORWARDER_READY."
352         while not self._stop:
353             data = self.read_data()
354             if not data:
355                 # Connection to client lost
356                 break
357             self.send_to_server(data)
358             
359             data = self.recv_from_server()
360             if not data:
361                 # Connection to server lost
362                 raise IOError, "Connection to server lost while "\
363                     "expecting response"
364             self.write_data(data)
365         self.disconnect()
366
367     def read_data(self):
368         return sys.stdin.readline()
369
370     def write_data(self, data):
371         sys.stdout.write(data)
372         # sys.stdout.write is buffered, this is why we need to do a flush()
373         sys.stdout.flush()
374
375     def send_to_server(self, data):
376         try:
377             self._ctrl_sock.send(data)
378         except (IOError, socket.error), e:
379             if e[0] == errno.EPIPE:
380                 self.connect()
381                 self._ctrl_sock.send(data)
382             else:
383                 raise e
384         encoded = data.rstrip() 
385         msg = base64.b64decode(encoded)
386         if msg == STOP_MSG:
387             self._stop = True
388
389     def recv_from_server(self):
390         data = [self._rdbuf]
391         chunk = data[0]
392         while '\n' not in chunk:
393             try:
394                 chunk = self._ctrl_sock.recv(1024)
395             except (OSError, socket.error), e:
396                 if e[0] != errno.EINTR:
397                     raise
398                 continue
399             if chunk:
400                 data.append(chunk)
401             else:
402                 # empty chunk = EOF
403                 break
404         data = ''.join(data).split('\n',1)
405         while len(data) < 2:
406             data.append('')
407         data, self._rdbuf = data
408         
409         return data+'\n'
410  
411     def connect(self):
412         self.disconnect()
413         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
414         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
415         self._ctrl_sock.connect(sock_addr)
416
417     def disconnect(self):
418         try:
419             self._ctrl_sock.close()
420         except:
421             pass
422
423 class Client(object):
424     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
425             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
426             environment_setup = ""):
427         self.root_dir = root_dir
428         self.addr = (host, port)
429         self.user = user
430         self.agent = agent
431         self.sudo = sudo
432         self.communication = communication
433         self.environment_setup = environment_setup
434         self._stopped = False
435         self._deferreds = collections.deque()
436         self.connect()
437     
438     def __del__(self):
439         if self._process.poll() is None:
440             os.kill(self._process.pid, signal.SIGTERM)
441         self._process.wait()
442         
443     def connect(self):
444         root_dir = self.root_dir
445         (host, port) = self.addr
446         user = self.user
447         agent = self.agent
448         sudo = self.sudo
449         communication = self.communication
450         
451         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
452                 c.forward()" % (root_dir,)
453
454         self._process = popen_python(python_code, 
455                     communication = communication,
456                     host = host, 
457                     port = port, 
458                     user = user, 
459                     agent = agent, 
460                     sudo = sudo, 
461                     environment_setup = self.environment_setup)
462                
463         # Wait for the forwarder to be ready, otherwise nobody
464         # will be able to connect to it
465         err = []
466         helo = "nope"
467         while helo:
468             helo = self._process.stderr.readline()
469             if helo == 'FORWARDER_READY.\n':
470                 break
471             err.append(helo)
472         else:
473             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
474         
475     def send_msg(self, msg):
476         encoded = base64.b64encode(msg)
477         data = "%s\n" % encoded
478         
479         try:
480             self._process.stdin.write(data)
481         except (IOError, ValueError):
482             # dead process, poll it to un-zombify
483             self._process.poll()
484             
485             # try again after reconnect
486             # If it fails again, though, give up
487             self.connect()
488             self._process.stdin.write(data)
489
490     def send_stop(self):
491         self.send_msg(STOP_MSG)
492         self._stopped = True
493
494     def defer_reply(self, transform=None):
495         defer_entry = []
496         self._deferreds.append(defer_entry)
497         return defer.Defer(
498             functools.partial(self.read_reply, defer_entry, transform)
499         )
500         
501     def _read_reply(self):
502         data = self._process.stdout.readline()
503         encoded = data.rstrip() 
504         if not encoded:
505             # empty == eof == dead process, poll it to un-zombify
506             self._process.poll()
507             
508             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
509         return base64.b64decode(encoded)
510     
511     def read_reply(self, which=None, transform=None):
512         # Test to see if someone did it already
513         if which is not None and len(which):
514             # Ok, they did it...
515             # ...just return the deferred value
516             if transform:
517                 return transform(which[0])
518             else:
519                 return which[0]
520         
521         # Process all deferreds until the one we're looking for
522         # or until the queue is empty
523         while self._deferreds:
524             try:
525                 deferred = self._deferreds.popleft()
526             except IndexError:
527                 # emptied
528                 break
529             
530             deferred.append(self._read_reply())
531             if deferred is which:
532                 # We reached the one we were looking for
533                 if transform:
534                     return transform(deferred[0])
535                 else:
536                     return deferred[0]
537         
538         if which is None:
539             # They've requested a synchronous read
540             if transform:
541                 return transform(self._read_reply())
542             else:
543                 return self._read_reply()
544
545 def _make_server_key_args(server_key, host, port, args):
546     """ 
547     Returns a reference to the created temporary file, and adds the
548     corresponding arguments to the given argument list.
549     
550     Make sure to hold onto it until the process is done with the file
551     """
552     if port is not None:
553         host = '%s:%s' % (host,port)
554     # Create a temporary server key file
555     tmp_known_hosts = tempfile.NamedTemporaryFile()
556     
557     # Add the intended host key
558     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
559     
560     # If we're not in strict mode, add user-configured keys
561     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
562         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
563         if os.access(user_hosts_path, os.R_OK):
564             f = open(user_hosts_path, "r")
565             tmp_known_hosts.write(f.read())
566             f.close()
567         
568     tmp_known_hosts.flush()
569     
570     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
571     
572     return tmp_known_hosts
573
574 def popen_ssh_command(command, host, port, user, agent, 
575         stdin="", 
576         ident_key = None,
577         server_key = None,
578         tty = False,
579         timeout = None,
580         retry = 0,
581         err_on_timeout = True):
582     """
583     Executes a remote commands, returns ((stdout,stderr),process)
584     """
585     if TRACE:
586         print "ssh", host, command
587     
588     tmp_known_hosts = None
589     args = ['ssh',
590             # Don't bother with localhost. Makes test easier
591             '-o', 'NoHostAuthenticationForLocalhost=yes',
592             '-l', user, host]
593     if agent:
594         args.append('-A')
595     if port:
596         args.append('-p%d' % port)
597     if ident_key:
598         args.extend(('-i', ident_key))
599     if tty:
600         args.append('-t')
601     if server_key:
602         # Create a temporary server key file
603         tmp_known_hosts = _make_server_key_args(
604             server_key, host, port, args)
605     args.append(command)
606
607     for x in xrange(retry or 3):
608         # connects to the remote host and starts a remote connection
609         proc = subprocess.Popen(args, 
610                 stdout = subprocess.PIPE,
611                 stdin = subprocess.PIPE, 
612                 stderr = subprocess.PIPE)
613         
614         # attach tempfile object to the process, to make sure the file stays
615         # alive until the process is finished with it
616         proc._known_hosts = tmp_known_hosts
617         
618         try:
619             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
620             if proc.poll() and err.strip().startswith('ssh: '):
621                 # SSH error, can safely retry
622                 continue
623             break
624         except RuntimeError,e:
625             if retry <= 0:
626                 raise
627             if TRACE:
628                 print " timedout -> ", e.args
629             retry -= 1
630         
631     if TRACE:
632         print " -> ", out, err
633
634     return ((out, err), proc)
635
636 def popen_scp(source, dest, 
637         port = None, 
638         agent = None, 
639         recursive = False,
640         ident_key = None,
641         server_key = None):
642     """
643     Copies from/to remote sites.
644     
645     Source and destination should have the user and host encoded
646     as per scp specs.
647     
648     If source is a file object, a special mode will be used to
649     create the remote file with the same contents.
650     
651     If dest is a file object, the remote file (source) will be
652     read and written into dest.
653     
654     In these modes, recursive cannot be True.
655     
656     Source can be a list of files to copy to a single destination,
657     in which case it is advised that the destination be a folder.
658     """
659     
660     if TRACE:
661         print "scp", source, dest
662     
663     if isinstance(source, file) and source.tell() == 0:
664         source = source.name
665     elif hasattr(source, 'read'):
666         tmp = tempfile.NamedTemporaryFile()
667         while True:
668             buf = source.read(65536)
669             if buf:
670                 tmp.write(buf)
671             else:
672                 break
673         tmp.seek(0)
674         source = tmp.name
675     
676     if isinstance(source, file) or isinstance(dest, file) \
677             or hasattr(source, 'read')  or hasattr(dest, 'write'):
678         assert not recursive
679         
680         # Parse source/destination as <user>@<server>:<path>
681         if isinstance(dest, basestring) and ':' in dest:
682             remspec, path = dest.split(':',1)
683         elif isinstance(source, basestring) and ':' in source:
684             remspec, path = source.split(':',1)
685         else:
686             raise ValueError, "Both endpoints cannot be local"
687         user,host = remspec.rsplit('@',1)
688         tmp_known_hosts = None
689         
690         args = ['ssh', '-l', user, '-C',
691                 # Don't bother with localhost. Makes test easier
692                 '-o', 'NoHostAuthenticationForLocalhost=yes',
693                 host ]
694         if port:
695             args.append('-P%d' % port)
696         if ident_key:
697             args.extend(('-i', ident_key))
698         if server_key:
699             # Create a temporary server key file
700             tmp_known_hosts = _make_server_key_args(
701                 server_key, host, port, args)
702         
703         if isinstance(source, file) or hasattr(source, 'read'):
704             args.append('cat > %s' % (shell_escape(path),))
705         elif isinstance(dest, file) or hasattr(dest, 'write'):
706             args.append('cat %s' % (shell_escape(path),))
707         else:
708             raise AssertionError, "Unreachable code reached! :-Q"
709         
710         # connects to the remote host and starts a remote connection
711         if isinstance(source, file):
712             proc = subprocess.Popen(args, 
713                     stdout = open('/dev/null','w'),
714                     stderr = subprocess.PIPE,
715                     stdin = source)
716             err = proc.stderr.read()
717             proc._known_hosts = tmp_known_hosts
718             eintr_retry(proc.wait)()
719             return ((None,err), proc)
720         elif isinstance(dest, file):
721             proc = subprocess.Popen(args, 
722                     stdout = open('/dev/null','w'),
723                     stderr = subprocess.PIPE,
724                     stdin = source)
725             err = proc.stderr.read()
726             proc._known_hosts = tmp_known_hosts
727             eintr_retry(proc.wait)()
728             return ((None,err), proc)
729         elif hasattr(source, 'read'):
730             # file-like (but not file) source
731             proc = subprocess.Popen(args, 
732                     stdout = open('/dev/null','w'),
733                     stderr = subprocess.PIPE,
734                     stdin = subprocess.PIPE)
735             
736             buf = None
737             err = []
738             while True:
739                 if not buf:
740                     buf = source.read(4096)
741                 if not buf:
742                     #EOF
743                     break
744                 
745                 rdrdy, wrdy, broken = select.select(
746                     [proc.stderr],
747                     [proc.stdin],
748                     [proc.stderr,proc.stdin])
749                 
750                 if proc.stderr in rdrdy:
751                     # use os.read for fully unbuffered behavior
752                     err.append(os.read(proc.stderr.fileno(), 4096))
753                 
754                 if proc.stdin in wrdy:
755                     proc.stdin.write(buf)
756                     buf = None
757                 
758                 if broken:
759                     break
760             proc.stdin.close()
761             err.append(proc.stderr.read())
762                 
763             proc._known_hosts = tmp_known_hosts
764             eintr_retry(proc.wait)()
765             return ((None,''.join(err)), proc)
766         elif hasattr(dest, 'write'):
767             # file-like (but not file) dest
768             proc = subprocess.Popen(args, 
769                     stdout = subprocess.PIPE,
770                     stderr = subprocess.PIPE,
771                     stdin = open('/dev/null','w'))
772             
773             buf = None
774             err = []
775             while True:
776                 rdrdy, wrdy, broken = select.select(
777                     [proc.stderr, proc.stdout],
778                     [],
779                     [proc.stderr, proc.stdout])
780                 
781                 if proc.stderr in rdrdy:
782                     # use os.read for fully unbuffered behavior
783                     err.append(os.read(proc.stderr.fileno(), 4096))
784                 
785                 if proc.stdout in rdrdy:
786                     # use os.read for fully unbuffered behavior
787                     buf = os.read(proc.stdout.fileno(), 4096)
788                     dest.write(buf)
789                     
790                     if not buf:
791                         #EOF
792                         break
793                 
794                 if broken:
795                     break
796             err.append(proc.stderr.read())
797                 
798             proc._known_hosts = tmp_known_hosts
799             eintr_retry(proc.wait)()
800             return ((None,''.join(err)), proc)
801         else:
802             raise AssertionError, "Unreachable code reached! :-Q"
803     else:
804         # Parse destination as <user>@<server>:<path>
805         if isinstance(dest, basestring) and ':' in dest:
806             remspec, path = dest.split(':',1)
807         elif isinstance(source, basestring) and ':' in source:
808             remspec, path = source.split(':',1)
809         else:
810             raise ValueError, "Both endpoints cannot be local"
811         user,host = remspec.rsplit('@',1)
812         
813         # plain scp
814         tmp_known_hosts = None
815         args = ['scp', '-q', '-p', '-C',
816                 # Don't bother with localhost. Makes test easier
817                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
818         if port:
819             args.append('-P%d' % port)
820         if recursive:
821             args.append('-r')
822         if ident_key:
823             args.extend(('-i', ident_key))
824         if server_key:
825             # Create a temporary server key file
826             tmp_known_hosts = _make_server_key_args(
827                 server_key, host, port, args)
828         if isinstance(source,list):
829             args.extend(source)
830         else:
831             args.append(source)
832         args.append(dest)
833
834         # connects to the remote host and starts a remote connection
835         proc = subprocess.Popen(args, 
836                 stdout = subprocess.PIPE,
837                 stdin = subprocess.PIPE, 
838                 stderr = subprocess.PIPE)
839         proc._known_hosts = tmp_known_hosts
840         
841         comm = proc.communicate()
842         eintr_retry(proc.wait)()
843         return (comm, proc)
844
845 def decode_and_execute():
846     # The python code we want to execute might have characters that 
847     # are not compatible with the 'inline' mode we are using. To avoid
848     # problems we receive the encoded python code in base64 as a input 
849     # stream and decode it for execution.
850     import base64, os
851     cmd = ""
852     while True:
853         try:
854             cmd += os.read(0, 1)# one byte from stdin
855         except OSError, e:            
856             if e.errno == errno.EINTR:
857                 continue
858             else:
859                 raise
860         if cmd[-1] == "\n": 
861             break
862     cmd = base64.b64decode(cmd)
863     # Uncomment for debug
864     #os.write(2, "Executing python code: %s\n" % cmd)
865     os.write(1, "OK\n") # send a sync message
866     exec(cmd)
867
868 def popen_python(python_code, 
869         communication = DC.ACCESS_LOCAL,
870         host = None, 
871         port = None, 
872         user = None, 
873         agent = False, 
874         python_path = None,
875         ident_key = None,
876         server_key = None,
877         tty = False,
878         sudo = False, 
879         environment_setup = ""):
880
881     cmd = ""
882     if python_path:
883         python_path.replace("'", r"'\''")
884         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
885         cmd += " ; "
886     if environment_setup:
887         cmd += environment_setup
888         cmd += " ; "
889     # Uncomment for debug (to run everything under strace)
890     # We had to verify if strace works (cannot nest them)
891     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
892     #cmd += "$CMD "
893     #cmd += "strace -f -tt -s 200 -o strace$$.out "
894     import nepi
895     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
896         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
897     )
898
899     if sudo:
900         if ';' in cmd:
901             cmd = "sudo bash -c " + shell_escape(cmd)
902         else:
903             cmd = "sudo " + cmd
904
905     if communication == DC.ACCESS_SSH:
906         tmp_known_hosts = None
907         args = ['ssh',
908                 # Don't bother with localhost. Makes test easier
909                 '-o', 'NoHostAuthenticationForLocalhost=yes',
910                 '-l', user, host]
911         if agent:
912             args.append('-A')
913         if port:
914             args.append('-p%d' % port)
915         if ident_key:
916             args.extend(('-i', ident_key))
917         if tty:
918             args.append('-t')
919         if server_key:
920             # Create a temporary server key file
921             tmp_known_hosts = _make_server_key_args(
922                 server_key, host, port, args)
923         args.append(cmd)
924     else:
925         args = [ "/bin/bash", "-c", cmd ]
926
927     # connects to the remote host and starts a remote
928     proc = subprocess.Popen(args,
929             shell = False, 
930             stdout = subprocess.PIPE,
931             stdin = subprocess.PIPE, 
932             stderr = subprocess.PIPE)
933
934     if communication == DC.ACCESS_SSH:
935         proc._known_hosts = tmp_known_hosts
936
937     # send the command to execute
938     os.write(proc.stdin.fileno(),
939             base64.b64encode(python_code) + "\n")
940  
941     while True: 
942         try:
943             msg = os.read(proc.stdout.fileno(), 3)
944             break
945         except OSError, e:            
946             if e.errno == errno.EINTR:
947                 continue
948             else:
949                 raise
950     
951     if msg != "OK\n":
952         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
953             msg, proc.stdout.read(), proc.stderr.read())
954
955     return proc
956
957 # POSIX
958 def _communicate(self, input, timeout=None, err_on_timeout=True):
959     read_set = []
960     write_set = []
961     stdout = None # Return
962     stderr = None # Return
963     
964     killed = False
965     
966     if timeout is not None:
967         timelimit = time.time() + timeout
968         killtime = timelimit + 4
969         bailtime = timelimit + 4
970
971     if self.stdin:
972         # Flush stdio buffer.  This might block, if the user has
973         # been writing to .stdin in an uncontrolled fashion.
974         self.stdin.flush()
975         if input:
976             write_set.append(self.stdin)
977         else:
978             self.stdin.close()
979     if self.stdout:
980         read_set.append(self.stdout)
981         stdout = []
982     if self.stderr:
983         read_set.append(self.stderr)
984         stderr = []
985
986     input_offset = 0
987     while read_set or write_set:
988         if timeout is not None:
989             curtime = time.time()
990             if timeout is None or curtime > timelimit:
991                 if curtime > bailtime:
992                     break
993                 elif curtime > killtime:
994                     signum = signal.SIGKILL
995                 else:
996                     signum = signal.SIGTERM
997                 # Lets kill it
998                 os.kill(self.pid, signum)
999                 select_timeout = 0.5
1000             else:
1001                 select_timeout = timelimit - curtime + 0.1
1002         else:
1003             select_timeout = None
1004             
1005         try:
1006             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1007         except select.error,e:
1008             if e[0] != 4:
1009                 raise
1010             else:
1011                 continue
1012
1013         if self.stdin in wlist:
1014             # When select has indicated that the file is writable,
1015             # we can write up to PIPE_BUF bytes without risk
1016             # blocking.  POSIX defines PIPE_BUF >= 512
1017             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1018             input_offset += bytes_written
1019             if input_offset >= len(input):
1020                 self.stdin.close()
1021                 write_set.remove(self.stdin)
1022
1023         if self.stdout in rlist:
1024             data = os.read(self.stdout.fileno(), 1024)
1025             if data == "":
1026                 self.stdout.close()
1027                 read_set.remove(self.stdout)
1028             stdout.append(data)
1029
1030         if self.stderr in rlist:
1031             data = os.read(self.stderr.fileno(), 1024)
1032             if data == "":
1033                 self.stderr.close()
1034                 read_set.remove(self.stderr)
1035             stderr.append(data)
1036     
1037     # All data exchanged.  Translate lists into strings.
1038     if stdout is not None:
1039         stdout = ''.join(stdout)
1040     if stderr is not None:
1041         stderr = ''.join(stderr)
1042
1043     # Translate newlines, if requested.  We cannot let the file
1044     # object do the translation: It is based on stdio, which is
1045     # impossible to combine with select (unless forcing no
1046     # buffering).
1047     if self.universal_newlines and hasattr(file, 'newlines'):
1048         if stdout:
1049             stdout = self._translate_newlines(stdout)
1050         if stderr:
1051             stderr = self._translate_newlines(stderr)
1052
1053     if killed and err_on_timeout:
1054         errcode = self.poll()
1055         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1056     else:
1057         if killed:
1058             self.poll()
1059         else:
1060             self.wait()
1061         return (stdout, stderr)
1062