Better network failure recovery: added some retries on connection error in applicatio...
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25
26 CTRL_SOCK = "ctrl.sock"
27 CTRL_PID = "ctrl.pid"
28 STD_ERR = "stderr.log"
29 MAX_FD = 1024
30
31 STOP_MSG = "STOP"
32
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
34
35 if hasattr(os, "devnull"):
36     DEV_NULL = os.devnull
37 else:
38     DEV_NULL = "/dev/null"
39
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
41
42 def shell_escape(s):
43     """ Escapes strings so that they are safe to use as command-line arguments """
44     if SHELL_SAFE.match(s):
45         # safe string - no escaping needed
46         return s
47     else:
48         # unsafe string - escape
49         def escp(c):
50             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
51                 return c
52             else:
53                 return "'$'\\x%02x''" % (ord(c),)
54         s = ''.join(map(escp,s))
55         return "'%s'" % (s,)
56
57 def eintr_retry(func):
58     import functools
59     @functools.wraps(func)
60     def rv(*p, **kw):
61         retry = kw.pop("_retry", False)
62         for i in xrange(0 if retry else 4):
63             try:
64                 return func(*p, **kw)
65             except (select.error, socket.error), args:
66                 if args[0] == errno.EINTR:
67                     continue
68                 else:
69                     raise 
70             except OSError, e:
71                 if e.errno == errno.EINTR:
72                     continue
73                 else:
74                     raise
75         else:
76             return func(*p, **kw)
77     return rv
78
79 class Server(object):
80     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
81             environment_setup = "", clean_root = False):
82         self._root_dir = root_dir
83         self._clean_root = clean_root
84         self._stop = False
85         self._ctrl_sock = None
86         self._log_level = log_level
87         self._rdbuf = ""
88         self._environment_setup = environment_setup
89
90     def run(self):
91         try:
92             if self.daemonize():
93                 self.post_daemonize()
94                 self.loop()
95                 self.cleanup()
96                 # ref: "os._exit(0)"
97                 # can not return normally after fork beacuse no exec was done.
98                 # This means that if we don't do a os._exit(0) here the code that 
99                 # follows the call to "Server.run()" in the "caller code" will be 
100                 # executed... but by now it has already been executed after the 
101                 # first process (the one that did the first fork) returned.
102                 os._exit(0)
103         except:
104             print >>sys.stderr, "SERVER_ERROR."
105             self.log_error()
106             self.cleanup()
107             os._exit(0)
108         print >>sys.stderr, "SERVER_READY."
109
110     def daemonize(self):
111         # pipes for process synchronization
112         (r, w) = os.pipe()
113         
114         # build root folder
115         root = os.path.normpath(self._root_dir)
116         if self._root_dir not in [".", ""] and os.path.exists(root) \
117                 and self._clean_root:
118             shutil.rmtree(root)
119         if not os.path.exists(root):
120             os.makedirs(root, 0755)
121
122         pid1 = os.fork()
123         if pid1 > 0:
124             os.close(w)
125             while True:
126                 try:
127                     os.read(r, 1)
128                 except OSError, e: # pragma: no cover
129                     if e.errno == errno.EINTR:
130                         continue
131                     else:
132                         raise
133                 break
134             os.close(r)
135             # os.waitpid avoids leaving a <defunc> (zombie) process
136             st = os.waitpid(pid1, 0)[1]
137             if st:
138                 raise RuntimeError("Daemonization failed")
139             # return 0 to inform the caller method that this is not the 
140             # daemonized process
141             return 0
142         os.close(r)
143
144         # Decouple from parent environment.
145         os.chdir(self._root_dir)
146         os.umask(0)
147         os.setsid()
148
149         # fork 2
150         pid2 = os.fork()
151         if pid2 > 0:
152             # see ref: "os._exit(0)"
153             os._exit(0)
154
155         # close all open file descriptors.
156         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
157         if (max_fd == resource.RLIM_INFINITY):
158             max_fd = MAX_FD
159         for fd in range(3, max_fd):
160             if fd != w:
161                 try:
162                     os.close(fd)
163                 except OSError:
164                     pass
165
166         # Redirect standard file descriptors.
167         stdin = open(DEV_NULL, "r")
168         stderr = stdout = open(STD_ERR, "a", 0)
169         os.dup2(stdin.fileno(), sys.stdin.fileno())
170         # NOTE: sys.stdout.write will still be buffered, even if the file
171         # was opened with 0 buffer
172         os.dup2(stdout.fileno(), sys.stdout.fileno())
173         os.dup2(stderr.fileno(), sys.stderr.fileno())
174         
175         # setup environment
176         if self._environment_setup:
177             # parse environment variables and pass to child process
178             # do it by executing shell commands, in case there's some heavy setup involved
179             envproc = subprocess.Popen(
180                 [ "bash", "-c", 
181                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
182                         ( self._environment_setup, ) ],
183                 stdin = subprocess.PIPE, 
184                 stdout = subprocess.PIPE,
185                 stderr = subprocess.PIPE
186             )
187             out,err = envproc.communicate()
188
189             # parse new environment
190             if out:
191                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
192             
193                 # apply to current environment
194                 for name, value in environment.iteritems():
195                     os.environ[name] = value
196                 
197                 # apply pythonpath
198                 if 'PYTHONPATH' in environment:
199                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
200
201         # create control socket
202         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
203         try:
204             self._ctrl_sock.bind(CTRL_SOCK)
205         except socket.error:
206             # Address in use, check pidfile
207             pid = None
208             try:
209                 pidfile = open(CTRL_PID, "r")
210                 pid = pidfile.read()
211                 pidfile.close()
212                 pid = int(pid)
213             except:
214                 # no pidfile
215                 pass
216             
217             if pid is not None:
218                 # Check process liveliness
219                 if not os.path.exists("/proc/%d" % (pid,)):
220                     # Ok, it's dead, clean the socket
221                     os.remove(CTRL_SOCK)
222             
223             # try again
224             self._ctrl_sock.bind(CTRL_SOCK)
225             
226         self._ctrl_sock.listen(0)
227         
228         # Save pidfile
229         pidfile = open(CTRL_PID, "w")
230         pidfile.write(str(os.getpid()))
231         pidfile.close()
232
233         # let the parent process know that the daemonization is finished
234         os.write(w, "\n")
235         os.close(w)
236         return 1
237
238     def post_daemonize(self):
239         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
240         # QT, for some strange reason, redefines the SIGCHILD handler to write
241         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
242         # Server dameonization closes all file descriptors from fileno '3',
243         # but the overloaded handler (inherited by the forked process) will
244         # keep trying to write the \0 to fileno 'x', which might have been reused 
245         # after closing, for other operations. This is bad bad bad when fileno 'x'
246         # is in use for communication pouroses, because unexpected \0 start
247         # appearing in the communication messages... this is exactly what happens 
248         # when using netns in daemonized form. Thus, be have no other alternative than
249         # restoring the SIGCHLD handler to the default here.
250         import signal
251         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
252
253     def loop(self):
254         while not self._stop:
255             conn, addr = self._ctrl_sock.accept()
256             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
257             conn.settimeout(5)
258             while not self._stop:
259                 try:
260                     msg = self.recv_msg(conn)
261                 except socket.timeout, e:
262                     #self.log_error("SERVER recv_msg: connection timedout ")
263                     continue
264                 
265                 if not msg:
266                     self.log_error("CONNECTION LOST")
267                     break
268                     
269                 if msg == STOP_MSG:
270                     self._stop = True
271                     reply = self.stop_action()
272                 else:
273                     reply = self.reply_action(msg)
274                 
275                 try:
276                     self.send_reply(conn, reply)
277                 except socket.error:
278                     self.log_error()
279                     self.log_error("NOTICE: Awaiting for reconnection")
280                     break
281             try:
282                 conn.close()
283             except:
284                 # Doesn't matter
285                 self.log_error()
286
287     def recv_msg(self, conn):
288         data = [self._rdbuf]
289         chunk = data[0]
290         while '\n' not in chunk:
291             try:
292                 chunk = conn.recv(1024)
293             except (OSError, socket.error), e:
294                 if e[0] != errno.EINTR:
295                     raise
296                 else:
297                     continue
298             if chunk:
299                 data.append(chunk)
300             else:
301                 # empty chunk = EOF
302                 break
303         data = ''.join(data).split('\n',1)
304         while len(data) < 2:
305             data.append('')
306         data, self._rdbuf = data
307         
308         decoded = base64.b64decode(data)
309         return decoded.rstrip()
310
311     def send_reply(self, conn, reply):
312         encoded = base64.b64encode(reply)
313         conn.send("%s\n" % encoded)
314        
315     def cleanup(self):
316         try:
317             self._ctrl_sock.close()
318             os.remove(CTRL_SOCK)
319         except:
320             self.log_error()
321
322     def stop_action(self):
323         return "Stopping server"
324
325     def reply_action(self, msg):
326         return "Reply to: %s" % msg
327
328     def log_error(self, text = None, context = ''):
329         if text == None:
330             text = traceback.format_exc()
331         date = time.strftime("%Y-%m-%d %H:%M:%S")
332         if context:
333             context = " (%s)" % (context,)
334         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
335         return text
336
337     def log_debug(self, text):
338         if self._log_level == DC.DEBUG_LEVEL:
339             date = time.strftime("%Y-%m-%d %H:%M:%S")
340             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
341
342 class Forwarder(object):
343     def __init__(self, root_dir = "."):
344         self._ctrl_sock = None
345         self._root_dir = root_dir
346         self._stop = False
347         self._rdbuf = ""
348
349     def forward(self):
350         self.connect()
351         print >>sys.stderr, "FORWARDER_READY."
352         while not self._stop:
353             data = self.read_data()
354             if not data:
355                 # Connection to client lost
356                 break
357             self.send_to_server(data)
358             
359             data = self.recv_from_server()
360             if not data:
361                 # Connection to server lost
362                 raise IOError, "Connection to server lost while "\
363                     "expecting response"
364             self.write_data(data)
365         self.disconnect()
366
367     def read_data(self):
368         return sys.stdin.readline()
369
370     def write_data(self, data):
371         sys.stdout.write(data)
372         # sys.stdout.write is buffered, this is why we need to do a flush()
373         sys.stdout.flush()
374
375     def send_to_server(self, data):
376         try:
377             self._ctrl_sock.send(data)
378         except (IOError, socket.error), e:
379             if e[0] == errno.EPIPE:
380                 self.connect()
381                 self._ctrl_sock.send(data)
382             else:
383                 raise e
384         encoded = data.rstrip() 
385         msg = base64.b64decode(encoded)
386         if msg == STOP_MSG:
387             self._stop = True
388
389     def recv_from_server(self):
390         data = [self._rdbuf]
391         chunk = data[0]
392         while '\n' not in chunk:
393             try:
394                 chunk = self._ctrl_sock.recv(1024)
395             except (OSError, socket.error), e:
396                 if e[0] != errno.EINTR:
397                     raise
398                 continue
399             if chunk:
400                 data.append(chunk)
401             else:
402                 # empty chunk = EOF
403                 break
404         data = ''.join(data).split('\n',1)
405         while len(data) < 2:
406             data.append('')
407         data, self._rdbuf = data
408         
409         return data+'\n'
410  
411     def connect(self):
412         self.disconnect()
413         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
414         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
415         self._ctrl_sock.connect(sock_addr)
416
417     def disconnect(self):
418         try:
419             self._ctrl_sock.close()
420         except:
421             pass
422
423 class Client(object):
424     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
425             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
426             environment_setup = ""):
427         self.root_dir = root_dir
428         self.addr = (host, port)
429         self.user = user
430         self.agent = agent
431         self.sudo = sudo
432         self.communication = communication
433         self.environment_setup = environment_setup
434         self._stopped = False
435         self._deferreds = collections.deque()
436         self.connect()
437     
438     def __del__(self):
439         if self._process.poll() is None:
440             os.kill(self._process.pid, signal.SIGTERM)
441         self._process.wait()
442         
443     def connect(self):
444         root_dir = self.root_dir
445         (host, port) = self.addr
446         user = self.user
447         agent = self.agent
448         sudo = self.sudo
449         communication = self.communication
450         
451         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
452                 c.forward()" % (root_dir,)
453
454         self._process = popen_python(python_code, 
455                     communication = communication,
456                     host = host, 
457                     port = port, 
458                     user = user, 
459                     agent = agent, 
460                     sudo = sudo, 
461                     environment_setup = self.environment_setup)
462                
463         # Wait for the forwarder to be ready, otherwise nobody
464         # will be able to connect to it
465         err = []
466         helo = "nope"
467         while helo:
468             helo = self._process.stderr.readline()
469             if helo == 'FORWARDER_READY.\n':
470                 break
471             err.append(helo)
472         else:
473             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
474         
475     def send_msg(self, msg):
476         encoded = base64.b64encode(msg)
477         data = "%s\n" % encoded
478         
479         try:
480             self._process.stdin.write(data)
481         except (IOError, ValueError):
482             # dead process, poll it to un-zombify
483             self._process.poll()
484             
485             # try again after reconnect
486             # If it fails again, though, give up
487             self.connect()
488             self._process.stdin.write(data)
489
490     def send_stop(self):
491         self.send_msg(STOP_MSG)
492         self._stopped = True
493
494     def defer_reply(self, transform=None):
495         defer_entry = []
496         self._deferreds.append(defer_entry)
497         return defer.Defer(
498             functools.partial(self.read_reply, defer_entry, transform)
499         )
500         
501     def _read_reply(self):
502         data = self._process.stdout.readline()
503         encoded = data.rstrip() 
504         if not encoded:
505             # empty == eof == dead process, poll it to un-zombify
506             self._process.poll()
507             
508             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
509         return base64.b64decode(encoded)
510     
511     def read_reply(self, which=None, transform=None):
512         # Test to see if someone did it already
513         if which is not None and len(which):
514             # Ok, they did it...
515             # ...just return the deferred value
516             if transform:
517                 return transform(which[0])
518             else:
519                 return which[0]
520         
521         # Process all deferreds until the one we're looking for
522         # or until the queue is empty
523         while self._deferreds:
524             try:
525                 deferred = self._deferreds.popleft()
526             except IndexError:
527                 # emptied
528                 break
529             
530             deferred.append(self._read_reply())
531             if deferred is which:
532                 # We reached the one we were looking for
533                 if transform:
534                     return transform(deferred[0])
535                 else:
536                     return deferred[0]
537         
538         if which is None:
539             # They've requested a synchronous read
540             if transform:
541                 return transform(self._read_reply())
542             else:
543                 return self._read_reply()
544
545 def _make_server_key_args(server_key, host, port, args):
546     """ 
547     Returns a reference to the created temporary file, and adds the
548     corresponding arguments to the given argument list.
549     
550     Make sure to hold onto it until the process is done with the file
551     """
552     if port is not None:
553         host = '%s:%s' % (host,port)
554     # Create a temporary server key file
555     tmp_known_hosts = tempfile.NamedTemporaryFile()
556     
557     # Add the intended host key
558     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
559     
560     # If we're not in strict mode, add user-configured keys
561     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
562         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
563         if os.access(user_hosts_path, os.R_OK):
564             f = open(user_hosts_path, "r")
565             tmp_known_hosts.write(f.read())
566             f.close()
567         
568     tmp_known_hosts.flush()
569     
570     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
571     
572     return tmp_known_hosts
573
574 def popen_ssh_command(command, host, port, user, agent, 
575         stdin="", 
576         ident_key = None,
577         server_key = None,
578         tty = False,
579         timeout = None,
580         retry = 0,
581         err_on_timeout = True,
582         connect_timeout = 30):
583     """
584     Executes a remote commands, returns ((stdout,stderr),process)
585     """
586     if TRACE:
587         print "ssh", host, command
588     
589     tmp_known_hosts = None
590     args = ['ssh',
591             # Don't bother with localhost. Makes test easier
592             '-o', 'NoHostAuthenticationForLocalhost=yes,ConnectTimeout=%s' % (connect_timeout,),
593             '-l', user, host]
594     if agent:
595         args.append('-A')
596     if port:
597         args.append('-p%d' % port)
598     if ident_key:
599         args.extend(('-i', ident_key))
600     if tty:
601         args.append('-t')
602     if server_key:
603         # Create a temporary server key file
604         tmp_known_hosts = _make_server_key_args(
605             server_key, host, port, args)
606     args.append(command)
607
608     for x in xrange(retry or 3):
609         # connects to the remote host and starts a remote connection
610         proc = subprocess.Popen(args, 
611                 stdout = subprocess.PIPE,
612                 stdin = subprocess.PIPE, 
613                 stderr = subprocess.PIPE)
614         
615         # attach tempfile object to the process, to make sure the file stays
616         # alive until the process is finished with it
617         proc._known_hosts = tmp_known_hosts
618         
619         try:
620             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
621             if proc.poll() and err.strip().startswith('ssh: '):
622                 # SSH error, can safely retry
623                 continue
624             break
625         except RuntimeError,e:
626             if retry <= 0:
627                 raise
628             if TRACE:
629                 print " timedout -> ", e.args
630             retry -= 1
631         
632     if TRACE:
633         print " -> ", out, err
634
635     return ((out, err), proc)
636
637 def popen_scp(source, dest, 
638         port = None, 
639         agent = None, 
640         recursive = False,
641         ident_key = None,
642         server_key = None):
643     """
644     Copies from/to remote sites.
645     
646     Source and destination should have the user and host encoded
647     as per scp specs.
648     
649     If source is a file object, a special mode will be used to
650     create the remote file with the same contents.
651     
652     If dest is a file object, the remote file (source) will be
653     read and written into dest.
654     
655     In these modes, recursive cannot be True.
656     
657     Source can be a list of files to copy to a single destination,
658     in which case it is advised that the destination be a folder.
659     """
660     
661     if TRACE:
662         print "scp", source, dest
663     
664     if isinstance(source, file) and source.tell() == 0:
665         source = source.name
666     elif hasattr(source, 'read'):
667         tmp = tempfile.NamedTemporaryFile()
668         while True:
669             buf = source.read(65536)
670             if buf:
671                 tmp.write(buf)
672             else:
673                 break
674         tmp.seek(0)
675         source = tmp.name
676     
677     if isinstance(source, file) or isinstance(dest, file) \
678             or hasattr(source, 'read')  or hasattr(dest, 'write'):
679         assert not recursive
680         
681         # Parse source/destination as <user>@<server>:<path>
682         if isinstance(dest, basestring) and ':' in dest:
683             remspec, path = dest.split(':',1)
684         elif isinstance(source, basestring) and ':' in source:
685             remspec, path = source.split(':',1)
686         else:
687             raise ValueError, "Both endpoints cannot be local"
688         user,host = remspec.rsplit('@',1)
689         tmp_known_hosts = None
690         
691         args = ['ssh', '-l', user, '-C',
692                 # Don't bother with localhost. Makes test easier
693                 '-o', 'NoHostAuthenticationForLocalhost=yes',
694                 host ]
695         if port:
696             args.append('-P%d' % port)
697         if ident_key:
698             args.extend(('-i', ident_key))
699         if server_key:
700             # Create a temporary server key file
701             tmp_known_hosts = _make_server_key_args(
702                 server_key, host, port, args)
703         
704         if isinstance(source, file) or hasattr(source, 'read'):
705             args.append('cat > %s' % (shell_escape(path),))
706         elif isinstance(dest, file) or hasattr(dest, 'write'):
707             args.append('cat %s' % (shell_escape(path),))
708         else:
709             raise AssertionError, "Unreachable code reached! :-Q"
710         
711         # connects to the remote host and starts a remote connection
712         if isinstance(source, file):
713             proc = subprocess.Popen(args, 
714                     stdout = open('/dev/null','w'),
715                     stderr = subprocess.PIPE,
716                     stdin = source)
717             err = proc.stderr.read()
718             proc._known_hosts = tmp_known_hosts
719             eintr_retry(proc.wait)()
720             return ((None,err), proc)
721         elif isinstance(dest, file):
722             proc = subprocess.Popen(args, 
723                     stdout = open('/dev/null','w'),
724                     stderr = subprocess.PIPE,
725                     stdin = source)
726             err = proc.stderr.read()
727             proc._known_hosts = tmp_known_hosts
728             eintr_retry(proc.wait)()
729             return ((None,err), proc)
730         elif hasattr(source, 'read'):
731             # file-like (but not file) source
732             proc = subprocess.Popen(args, 
733                     stdout = open('/dev/null','w'),
734                     stderr = subprocess.PIPE,
735                     stdin = subprocess.PIPE)
736             
737             buf = None
738             err = []
739             while True:
740                 if not buf:
741                     buf = source.read(4096)
742                 if not buf:
743                     #EOF
744                     break
745                 
746                 rdrdy, wrdy, broken = select.select(
747                     [proc.stderr],
748                     [proc.stdin],
749                     [proc.stderr,proc.stdin])
750                 
751                 if proc.stderr in rdrdy:
752                     # use os.read for fully unbuffered behavior
753                     err.append(os.read(proc.stderr.fileno(), 4096))
754                 
755                 if proc.stdin in wrdy:
756                     proc.stdin.write(buf)
757                     buf = None
758                 
759                 if broken:
760                     break
761             proc.stdin.close()
762             err.append(proc.stderr.read())
763                 
764             proc._known_hosts = tmp_known_hosts
765             eintr_retry(proc.wait)()
766             return ((None,''.join(err)), proc)
767         elif hasattr(dest, 'write'):
768             # file-like (but not file) dest
769             proc = subprocess.Popen(args, 
770                     stdout = subprocess.PIPE,
771                     stderr = subprocess.PIPE,
772                     stdin = open('/dev/null','w'))
773             
774             buf = None
775             err = []
776             while True:
777                 rdrdy, wrdy, broken = select.select(
778                     [proc.stderr, proc.stdout],
779                     [],
780                     [proc.stderr, proc.stdout])
781                 
782                 if proc.stderr in rdrdy:
783                     # use os.read for fully unbuffered behavior
784                     err.append(os.read(proc.stderr.fileno(), 4096))
785                 
786                 if proc.stdout in rdrdy:
787                     # use os.read for fully unbuffered behavior
788                     buf = os.read(proc.stdout.fileno(), 4096)
789                     dest.write(buf)
790                     
791                     if not buf:
792                         #EOF
793                         break
794                 
795                 if broken:
796                     break
797             err.append(proc.stderr.read())
798                 
799             proc._known_hosts = tmp_known_hosts
800             eintr_retry(proc.wait)()
801             return ((None,''.join(err)), proc)
802         else:
803             raise AssertionError, "Unreachable code reached! :-Q"
804     else:
805         # Parse destination as <user>@<server>:<path>
806         if isinstance(dest, basestring) and ':' in dest:
807             remspec, path = dest.split(':',1)
808         elif isinstance(source, basestring) and ':' in source:
809             remspec, path = source.split(':',1)
810         else:
811             raise ValueError, "Both endpoints cannot be local"
812         user,host = remspec.rsplit('@',1)
813         
814         # plain scp
815         tmp_known_hosts = None
816         args = ['scp', '-q', '-p', '-C',
817                 # Don't bother with localhost. Makes test easier
818                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
819         if port:
820             args.append('-P%d' % port)
821         if recursive:
822             args.append('-r')
823         if ident_key:
824             args.extend(('-i', ident_key))
825         if server_key:
826             # Create a temporary server key file
827             tmp_known_hosts = _make_server_key_args(
828                 server_key, host, port, args)
829         if isinstance(source,list):
830             args.extend(source)
831         else:
832             args.append(source)
833         args.append(dest)
834
835         # connects to the remote host and starts a remote connection
836         proc = subprocess.Popen(args, 
837                 stdout = subprocess.PIPE,
838                 stdin = subprocess.PIPE, 
839                 stderr = subprocess.PIPE)
840         proc._known_hosts = tmp_known_hosts
841         
842         comm = proc.communicate()
843         eintr_retry(proc.wait)()
844         return (comm, proc)
845
846 def decode_and_execute():
847     # The python code we want to execute might have characters that 
848     # are not compatible with the 'inline' mode we are using. To avoid
849     # problems we receive the encoded python code in base64 as a input 
850     # stream and decode it for execution.
851     import base64, os
852     cmd = ""
853     while True:
854         try:
855             cmd += os.read(0, 1)# one byte from stdin
856         except OSError, e:            
857             if e.errno == errno.EINTR:
858                 continue
859             else:
860                 raise
861         if cmd[-1] == "\n": 
862             break
863     cmd = base64.b64decode(cmd)
864     # Uncomment for debug
865     #os.write(2, "Executing python code: %s\n" % cmd)
866     os.write(1, "OK\n") # send a sync message
867     exec(cmd)
868
869 def popen_python(python_code, 
870         communication = DC.ACCESS_LOCAL,
871         host = None, 
872         port = None, 
873         user = None, 
874         agent = False, 
875         python_path = None,
876         ident_key = None,
877         server_key = None,
878         tty = False,
879         sudo = False, 
880         environment_setup = ""):
881
882     cmd = ""
883     if python_path:
884         python_path.replace("'", r"'\''")
885         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
886         cmd += " ; "
887     if environment_setup:
888         cmd += environment_setup
889         cmd += " ; "
890     # Uncomment for debug (to run everything under strace)
891     # We had to verify if strace works (cannot nest them)
892     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
893     #cmd += "$CMD "
894     #cmd += "strace -f -tt -s 200 -o strace$$.out "
895     import nepi
896     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
897         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
898     )
899
900     if sudo:
901         if ';' in cmd:
902             cmd = "sudo bash -c " + shell_escape(cmd)
903         else:
904             cmd = "sudo " + cmd
905
906     if communication == DC.ACCESS_SSH:
907         tmp_known_hosts = None
908         args = ['ssh',
909                 # Don't bother with localhost. Makes test easier
910                 '-o', 'NoHostAuthenticationForLocalhost=yes',
911                 '-l', user, host]
912         if agent:
913             args.append('-A')
914         if port:
915             args.append('-p%d' % port)
916         if ident_key:
917             args.extend(('-i', ident_key))
918         if tty:
919             args.append('-t')
920         if server_key:
921             # Create a temporary server key file
922             tmp_known_hosts = _make_server_key_args(
923                 server_key, host, port, args)
924         args.append(cmd)
925     else:
926         args = [ "/bin/bash", "-c", cmd ]
927
928     # connects to the remote host and starts a remote
929     proc = subprocess.Popen(args,
930             shell = False, 
931             stdout = subprocess.PIPE,
932             stdin = subprocess.PIPE, 
933             stderr = subprocess.PIPE)
934
935     if communication == DC.ACCESS_SSH:
936         proc._known_hosts = tmp_known_hosts
937
938     # send the command to execute
939     os.write(proc.stdin.fileno(),
940             base64.b64encode(python_code) + "\n")
941  
942     while True: 
943         try:
944             msg = os.read(proc.stdout.fileno(), 3)
945             break
946         except OSError, e:            
947             if e.errno == errno.EINTR:
948                 continue
949             else:
950                 raise
951     
952     if msg != "OK\n":
953         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
954             msg, proc.stdout.read(), proc.stderr.read())
955
956     return proc
957
958 # POSIX
959 def _communicate(self, input, timeout=None, err_on_timeout=True):
960     read_set = []
961     write_set = []
962     stdout = None # Return
963     stderr = None # Return
964     
965     killed = False
966     
967     if timeout is not None:
968         timelimit = time.time() + timeout
969         killtime = timelimit + 4
970         bailtime = timelimit + 4
971
972     if self.stdin:
973         # Flush stdio buffer.  This might block, if the user has
974         # been writing to .stdin in an uncontrolled fashion.
975         self.stdin.flush()
976         if input:
977             write_set.append(self.stdin)
978         else:
979             self.stdin.close()
980     if self.stdout:
981         read_set.append(self.stdout)
982         stdout = []
983     if self.stderr:
984         read_set.append(self.stderr)
985         stderr = []
986
987     input_offset = 0
988     while read_set or write_set:
989         if timeout is not None:
990             curtime = time.time()
991             if timeout is None or curtime > timelimit:
992                 if curtime > bailtime:
993                     break
994                 elif curtime > killtime:
995                     signum = signal.SIGKILL
996                 else:
997                     signum = signal.SIGTERM
998                 # Lets kill it
999                 os.kill(self.pid, signum)
1000                 select_timeout = 0.5
1001             else:
1002                 select_timeout = timelimit - curtime + 0.1
1003         else:
1004             select_timeout = None
1005             
1006         try:
1007             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1008         except select.error,e:
1009             if e[0] != 4:
1010                 raise
1011             else:
1012                 continue
1013
1014         if self.stdin in wlist:
1015             # When select has indicated that the file is writable,
1016             # we can write up to PIPE_BUF bytes without risk
1017             # blocking.  POSIX defines PIPE_BUF >= 512
1018             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1019             input_offset += bytes_written
1020             if input_offset >= len(input):
1021                 self.stdin.close()
1022                 write_set.remove(self.stdin)
1023
1024         if self.stdout in rlist:
1025             data = os.read(self.stdout.fileno(), 1024)
1026             if data == "":
1027                 self.stdout.close()
1028                 read_set.remove(self.stdout)
1029             stdout.append(data)
1030
1031         if self.stderr in rlist:
1032             data = os.read(self.stderr.fileno(), 1024)
1033             if data == "":
1034                 self.stderr.close()
1035                 read_set.remove(self.stderr)
1036             stderr.append(data)
1037     
1038     # All data exchanged.  Translate lists into strings.
1039     if stdout is not None:
1040         stdout = ''.join(stdout)
1041     if stderr is not None:
1042         stderr = ''.join(stderr)
1043
1044     # Translate newlines, if requested.  We cannot let the file
1045     # object do the translation: It is based on stdio, which is
1046     # impossible to combine with select (unless forcing no
1047     # buffering).
1048     if self.universal_newlines and hasattr(file, 'newlines'):
1049         if stdout:
1050             stdout = self._translate_newlines(stdout)
1051         if stderr:
1052             stderr = self._translate_newlines(stderr)
1053
1054     if killed and err_on_timeout:
1055         errcode = self.poll()
1056         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1057     else:
1058         if killed:
1059             self.poll()
1060         else:
1061             self.wait()
1062         return (stdout, stderr)
1063