Use persistent connectiosn only when supported
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25 import hashlib
26
27 CTRL_SOCK = "ctrl.sock"
28 CTRL_PID = "ctrl.pid"
29 STD_ERR = "stderr.log"
30 MAX_FD = 1024
31
32 STOP_MSG = "STOP"
33
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35
36 OPENSSH_HAS_PERSIST = None
37
38 if hasattr(os, "devnull"):
39     DEV_NULL = os.devnull
40 else:
41     DEV_NULL = "/dev/null"
42
43 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44
45 def openssh_has_persist():
46     global OPENSSH_HAS_PERSIST
47     if OPENSSH_HAS_PERSIST is None:
48         proc = subprocess.Popen(["ssh","-v"],
49             stdout = subprocess.PIPE,
50             stderr = subprocess.STDOUT,
51             stdin = open("/dev/null","r") )
52         out,err = proc.communicate()
53         proc.wait()
54         
55         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
56         OPENSSH_HAS_PERSIST = bool(vre.match(out))
57     return OPENSSH_HAS_PERSIST
58
59 def shell_escape(s):
60     """ Escapes strings so that they are safe to use as command-line arguments """
61     if SHELL_SAFE.match(s):
62         # safe string - no escaping needed
63         return s
64     else:
65         # unsafe string - escape
66         def escp(c):
67             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
68                 return c
69             else:
70                 return "'$'\\x%02x''" % (ord(c),)
71         s = ''.join(map(escp,s))
72         return "'%s'" % (s,)
73
74 def eintr_retry(func):
75     import functools
76     @functools.wraps(func)
77     def rv(*p, **kw):
78         retry = kw.pop("_retry", False)
79         for i in xrange(0 if retry else 4):
80             try:
81                 return func(*p, **kw)
82             except (select.error, socket.error), args:
83                 if args[0] == errno.EINTR:
84                     continue
85                 else:
86                     raise 
87             except OSError, e:
88                 if e.errno == errno.EINTR:
89                     continue
90                 else:
91                     raise
92         else:
93             return func(*p, **kw)
94     return rv
95
96 class Server(object):
97     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
98             environment_setup = "", clean_root = False):
99         self._root_dir = root_dir
100         self._clean_root = clean_root
101         self._stop = False
102         self._ctrl_sock = None
103         self._log_level = log_level
104         self._rdbuf = ""
105         self._environment_setup = environment_setup
106
107     def run(self):
108         try:
109             if self.daemonize():
110                 self.post_daemonize()
111                 self.loop()
112                 self.cleanup()
113                 # ref: "os._exit(0)"
114                 # can not return normally after fork beacuse no exec was done.
115                 # This means that if we don't do a os._exit(0) here the code that 
116                 # follows the call to "Server.run()" in the "caller code" will be 
117                 # executed... but by now it has already been executed after the 
118                 # first process (the one that did the first fork) returned.
119                 os._exit(0)
120         except:
121             print >>sys.stderr, "SERVER_ERROR."
122             self.log_error()
123             self.cleanup()
124             os._exit(0)
125         print >>sys.stderr, "SERVER_READY."
126
127     def daemonize(self):
128         # pipes for process synchronization
129         (r, w) = os.pipe()
130         
131         # build root folder
132         root = os.path.normpath(self._root_dir)
133         if self._root_dir not in [".", ""] and os.path.exists(root) \
134                 and self._clean_root:
135             shutil.rmtree(root)
136         if not os.path.exists(root):
137             os.makedirs(root, 0755)
138
139         pid1 = os.fork()
140         if pid1 > 0:
141             os.close(w)
142             while True:
143                 try:
144                     os.read(r, 1)
145                 except OSError, e: # pragma: no cover
146                     if e.errno == errno.EINTR:
147                         continue
148                     else:
149                         raise
150                 break
151             os.close(r)
152             # os.waitpid avoids leaving a <defunc> (zombie) process
153             st = os.waitpid(pid1, 0)[1]
154             if st:
155                 raise RuntimeError("Daemonization failed")
156             # return 0 to inform the caller method that this is not the 
157             # daemonized process
158             return 0
159         os.close(r)
160
161         # Decouple from parent environment.
162         os.chdir(self._root_dir)
163         os.umask(0)
164         os.setsid()
165
166         # fork 2
167         pid2 = os.fork()
168         if pid2 > 0:
169             # see ref: "os._exit(0)"
170             os._exit(0)
171
172         # close all open file descriptors.
173         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
174         if (max_fd == resource.RLIM_INFINITY):
175             max_fd = MAX_FD
176         for fd in range(3, max_fd):
177             if fd != w:
178                 try:
179                     os.close(fd)
180                 except OSError:
181                     pass
182
183         # Redirect standard file descriptors.
184         stdin = open(DEV_NULL, "r")
185         stderr = stdout = open(STD_ERR, "a", 0)
186         os.dup2(stdin.fileno(), sys.stdin.fileno())
187         # NOTE: sys.stdout.write will still be buffered, even if the file
188         # was opened with 0 buffer
189         os.dup2(stdout.fileno(), sys.stdout.fileno())
190         os.dup2(stderr.fileno(), sys.stderr.fileno())
191         
192         # setup environment
193         if self._environment_setup:
194             # parse environment variables and pass to child process
195             # do it by executing shell commands, in case there's some heavy setup involved
196             envproc = subprocess.Popen(
197                 [ "bash", "-c", 
198                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
199                         ( self._environment_setup, ) ],
200                 stdin = subprocess.PIPE, 
201                 stdout = subprocess.PIPE,
202                 stderr = subprocess.PIPE
203             )
204             out,err = envproc.communicate()
205
206             # parse new environment
207             if out:
208                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
209             
210                 # apply to current environment
211                 for name, value in environment.iteritems():
212                     os.environ[name] = value
213                 
214                 # apply pythonpath
215                 if 'PYTHONPATH' in environment:
216                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
217
218         # create control socket
219         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
220         try:
221             self._ctrl_sock.bind(CTRL_SOCK)
222         except socket.error:
223             # Address in use, check pidfile
224             pid = None
225             try:
226                 pidfile = open(CTRL_PID, "r")
227                 pid = pidfile.read()
228                 pidfile.close()
229                 pid = int(pid)
230             except:
231                 # no pidfile
232                 pass
233             
234             if pid is not None:
235                 # Check process liveliness
236                 if not os.path.exists("/proc/%d" % (pid,)):
237                     # Ok, it's dead, clean the socket
238                     os.remove(CTRL_SOCK)
239             
240             # try again
241             self._ctrl_sock.bind(CTRL_SOCK)
242             
243         self._ctrl_sock.listen(0)
244         
245         # Save pidfile
246         pidfile = open(CTRL_PID, "w")
247         pidfile.write(str(os.getpid()))
248         pidfile.close()
249
250         # let the parent process know that the daemonization is finished
251         os.write(w, "\n")
252         os.close(w)
253         return 1
254
255     def post_daemonize(self):
256         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
257         # QT, for some strange reason, redefines the SIGCHILD handler to write
258         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
259         # Server dameonization closes all file descriptors from fileno '3',
260         # but the overloaded handler (inherited by the forked process) will
261         # keep trying to write the \0 to fileno 'x', which might have been reused 
262         # after closing, for other operations. This is bad bad bad when fileno 'x'
263         # is in use for communication pouroses, because unexpected \0 start
264         # appearing in the communication messages... this is exactly what happens 
265         # when using netns in daemonized form. Thus, be have no other alternative than
266         # restoring the SIGCHLD handler to the default here.
267         import signal
268         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
269
270     def loop(self):
271         while not self._stop:
272             conn, addr = self._ctrl_sock.accept()
273             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
274             conn.settimeout(5)
275             while not self._stop:
276                 try:
277                     msg = self.recv_msg(conn)
278                 except socket.timeout, e:
279                     #self.log_error("SERVER recv_msg: connection timedout ")
280                     continue
281                 
282                 if not msg:
283                     self.log_error("CONNECTION LOST")
284                     break
285                     
286                 if msg == STOP_MSG:
287                     self._stop = True
288                     reply = self.stop_action()
289                 else:
290                     reply = self.reply_action(msg)
291                 
292                 try:
293                     self.send_reply(conn, reply)
294                 except socket.error:
295                     self.log_error()
296                     self.log_error("NOTICE: Awaiting for reconnection")
297                     break
298             try:
299                 conn.close()
300             except:
301                 # Doesn't matter
302                 self.log_error()
303
304     def recv_msg(self, conn):
305         data = [self._rdbuf]
306         chunk = data[0]
307         while '\n' not in chunk:
308             try:
309                 chunk = conn.recv(1024)
310             except (OSError, socket.error), e:
311                 if e[0] != errno.EINTR:
312                     raise
313                 else:
314                     continue
315             if chunk:
316                 data.append(chunk)
317             else:
318                 # empty chunk = EOF
319                 break
320         data = ''.join(data).split('\n',1)
321         while len(data) < 2:
322             data.append('')
323         data, self._rdbuf = data
324         
325         decoded = base64.b64decode(data)
326         return decoded.rstrip()
327
328     def send_reply(self, conn, reply):
329         encoded = base64.b64encode(reply)
330         conn.send("%s\n" % encoded)
331        
332     def cleanup(self):
333         try:
334             self._ctrl_sock.close()
335             os.remove(CTRL_SOCK)
336         except:
337             self.log_error()
338
339     def stop_action(self):
340         return "Stopping server"
341
342     def reply_action(self, msg):
343         return "Reply to: %s" % msg
344
345     def log_error(self, text = None, context = ''):
346         if text == None:
347             text = traceback.format_exc()
348         date = time.strftime("%Y-%m-%d %H:%M:%S")
349         if context:
350             context = " (%s)" % (context,)
351         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
352         return text
353
354     def log_debug(self, text):
355         if self._log_level == DC.DEBUG_LEVEL:
356             date = time.strftime("%Y-%m-%d %H:%M:%S")
357             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
358
359 class Forwarder(object):
360     def __init__(self, root_dir = "."):
361         self._ctrl_sock = None
362         self._root_dir = root_dir
363         self._stop = False
364         self._rdbuf = ""
365
366     def forward(self):
367         self.connect()
368         print >>sys.stderr, "FORWARDER_READY."
369         while not self._stop:
370             data = self.read_data()
371             if not data:
372                 # Connection to client lost
373                 break
374             self.send_to_server(data)
375             
376             data = self.recv_from_server()
377             if not data:
378                 # Connection to server lost
379                 raise IOError, "Connection to server lost while "\
380                     "expecting response"
381             self.write_data(data)
382         self.disconnect()
383
384     def read_data(self):
385         return sys.stdin.readline()
386
387     def write_data(self, data):
388         sys.stdout.write(data)
389         # sys.stdout.write is buffered, this is why we need to do a flush()
390         sys.stdout.flush()
391
392     def send_to_server(self, data):
393         try:
394             self._ctrl_sock.send(data)
395         except (IOError, socket.error), e:
396             if e[0] == errno.EPIPE:
397                 self.connect()
398                 self._ctrl_sock.send(data)
399             else:
400                 raise e
401         encoded = data.rstrip() 
402         msg = base64.b64decode(encoded)
403         if msg == STOP_MSG:
404             self._stop = True
405
406     def recv_from_server(self):
407         data = [self._rdbuf]
408         chunk = data[0]
409         while '\n' not in chunk:
410             try:
411                 chunk = self._ctrl_sock.recv(1024)
412             except (OSError, socket.error), e:
413                 if e[0] != errno.EINTR:
414                     raise
415                 continue
416             if chunk:
417                 data.append(chunk)
418             else:
419                 # empty chunk = EOF
420                 break
421         data = ''.join(data).split('\n',1)
422         while len(data) < 2:
423             data.append('')
424         data, self._rdbuf = data
425         
426         return data+'\n'
427  
428     def connect(self):
429         self.disconnect()
430         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
431         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
432         self._ctrl_sock.connect(sock_addr)
433
434     def disconnect(self):
435         try:
436             self._ctrl_sock.close()
437         except:
438             pass
439
440 class Client(object):
441     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
442             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
443             environment_setup = ""):
444         self.root_dir = root_dir
445         self.addr = (host, port)
446         self.user = user
447         self.agent = agent
448         self.sudo = sudo
449         self.communication = communication
450         self.environment_setup = environment_setup
451         self._stopped = False
452         self._deferreds = collections.deque()
453         self.connect()
454     
455     def __del__(self):
456         if self._process.poll() is None:
457             os.kill(self._process.pid, signal.SIGTERM)
458         self._process.wait()
459         
460     def connect(self):
461         root_dir = self.root_dir
462         (host, port) = self.addr
463         user = self.user
464         agent = self.agent
465         sudo = self.sudo
466         communication = self.communication
467         
468         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
469                 c.forward()" % (root_dir,)
470
471         self._process = popen_python(python_code, 
472                     communication = communication,
473                     host = host, 
474                     port = port, 
475                     user = user, 
476                     agent = agent, 
477                     sudo = sudo, 
478                     environment_setup = self.environment_setup)
479                
480         # Wait for the forwarder to be ready, otherwise nobody
481         # will be able to connect to it
482         err = []
483         helo = "nope"
484         while helo:
485             helo = self._process.stderr.readline()
486             if helo == 'FORWARDER_READY.\n':
487                 break
488             err.append(helo)
489         else:
490             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
491         
492     def send_msg(self, msg):
493         encoded = base64.b64encode(msg)
494         data = "%s\n" % encoded
495         
496         try:
497             self._process.stdin.write(data)
498         except (IOError, ValueError):
499             # dead process, poll it to un-zombify
500             self._process.poll()
501             
502             # try again after reconnect
503             # If it fails again, though, give up
504             self.connect()
505             self._process.stdin.write(data)
506
507     def send_stop(self):
508         self.send_msg(STOP_MSG)
509         self._stopped = True
510
511     def defer_reply(self, transform=None):
512         defer_entry = []
513         self._deferreds.append(defer_entry)
514         return defer.Defer(
515             functools.partial(self.read_reply, defer_entry, transform)
516         )
517         
518     def _read_reply(self):
519         data = self._process.stdout.readline()
520         encoded = data.rstrip() 
521         if not encoded:
522             # empty == eof == dead process, poll it to un-zombify
523             self._process.poll()
524             
525             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
526         return base64.b64decode(encoded)
527     
528     def read_reply(self, which=None, transform=None):
529         # Test to see if someone did it already
530         if which is not None and len(which):
531             # Ok, they did it...
532             # ...just return the deferred value
533             if transform:
534                 return transform(which[0])
535             else:
536                 return which[0]
537         
538         # Process all deferreds until the one we're looking for
539         # or until the queue is empty
540         while self._deferreds:
541             try:
542                 deferred = self._deferreds.popleft()
543             except IndexError:
544                 # emptied
545                 break
546             
547             deferred.append(self._read_reply())
548             if deferred is which:
549                 # We reached the one we were looking for
550                 if transform:
551                     return transform(deferred[0])
552                 else:
553                     return deferred[0]
554         
555         if which is None:
556             # They've requested a synchronous read
557             if transform:
558                 return transform(self._read_reply())
559             else:
560                 return self._read_reply()
561
562 def _make_server_key_args(server_key, host, port, args):
563     """ 
564     Returns a reference to the created temporary file, and adds the
565     corresponding arguments to the given argument list.
566     
567     Make sure to hold onto it until the process is done with the file
568     """
569     if port is not None:
570         host = '%s:%s' % (host,port)
571     # Create a temporary server key file
572     tmp_known_hosts = tempfile.NamedTemporaryFile()
573     
574     # Add the intended host key
575     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
576     
577     # If we're not in strict mode, add user-configured keys
578     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
579         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
580         if os.access(user_hosts_path, os.R_OK):
581             f = open(user_hosts_path, "r")
582             tmp_known_hosts.write(f.read())
583             f.close()
584         
585     tmp_known_hosts.flush()
586     
587     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
588     
589     return tmp_known_hosts
590
591 def make_connkey(user, host, port):
592     connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
593     if len(connkey) > 60:
594         connkey = hashlib.sha1(connkey).hexdigest()
595     return connkey
596
597 def popen_ssh_command(command, host, port, user, agent, 
598         stdin="", 
599         ident_key = None,
600         server_key = None,
601         tty = False,
602         timeout = None,
603         retry = 0,
604         err_on_timeout = True,
605         connect_timeout = 30,
606         persistent = True):
607     """
608     Executes a remote commands, returns ((stdout,stderr),process)
609     """
610     if TRACE:
611         print "ssh", host, command
612     
613     tmp_known_hosts = None
614     connkey = make_connkey(user,host,port)
615     args = ['ssh', '-C',
616             # Don't bother with localhost. Makes test easier
617             '-o', 'NoHostAuthenticationForLocalhost=yes',
618             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
619             '-o', 'ConnectionAttempts=3',
620             '-o', 'ServerAliveInterval=30',
621             '-o', 'TCPKeepAlive=yes',
622             '-l', user, host]
623     if persistent and openssh_has_persist():
624         args.extend([
625             '-o', 'ControlMaster=auto',
626             '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
627             '-o', 'ControlPersist=60' ])
628     if agent:
629         args.append('-A')
630     if port:
631         args.append('-p%d' % port)
632     if ident_key:
633         args.extend(('-i', ident_key))
634     if tty:
635         args.append('-t')
636     if server_key:
637         # Create a temporary server key file
638         tmp_known_hosts = _make_server_key_args(
639             server_key, host, port, args)
640     args.append(command)
641
642     for x in xrange(retry or 3):
643         # connects to the remote host and starts a remote connection
644         proc = subprocess.Popen(args, 
645                 stdout = subprocess.PIPE,
646                 stdin = subprocess.PIPE, 
647                 stderr = subprocess.PIPE)
648         
649         # attach tempfile object to the process, to make sure the file stays
650         # alive until the process is finished with it
651         proc._known_hosts = tmp_known_hosts
652         
653         try:
654             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
655             if proc.poll():
656                 if err.strip().startswith('ssh: '):
657                     # SSH error, can safely retry
658                     continue
659                 elif retry:
660                     # Probably timed out or plain failed but can retry
661                     continue
662             break
663         except RuntimeError,e:
664             if retry <= 0:
665                 raise
666             if TRACE:
667                 print " timedout -> ", e.args
668             retry -= 1
669         
670     if TRACE:
671         print " -> ", out, err
672
673     return ((out, err), proc)
674
675 def popen_scp(source, dest, 
676         port = None, 
677         agent = None, 
678         recursive = False,
679         ident_key = None,
680         server_key = None):
681     """
682     Copies from/to remote sites.
683     
684     Source and destination should have the user and host encoded
685     as per scp specs.
686     
687     If source is a file object, a special mode will be used to
688     create the remote file with the same contents.
689     
690     If dest is a file object, the remote file (source) will be
691     read and written into dest.
692     
693     In these modes, recursive cannot be True.
694     
695     Source can be a list of files to copy to a single destination,
696     in which case it is advised that the destination be a folder.
697     """
698     
699     if TRACE:
700         print "scp", source, dest
701     
702     if isinstance(source, file) and source.tell() == 0:
703         source = source.name
704     elif hasattr(source, 'read'):
705         tmp = tempfile.NamedTemporaryFile()
706         while True:
707             buf = source.read(65536)
708             if buf:
709                 tmp.write(buf)
710             else:
711                 break
712         tmp.seek(0)
713         source = tmp.name
714     
715     if isinstance(source, file) or isinstance(dest, file) \
716             or hasattr(source, 'read')  or hasattr(dest, 'write'):
717         assert not recursive
718         
719         # Parse source/destination as <user>@<server>:<path>
720         if isinstance(dest, basestring) and ':' in dest:
721             remspec, path = dest.split(':',1)
722         elif isinstance(source, basestring) and ':' in source:
723             remspec, path = source.split(':',1)
724         else:
725             raise ValueError, "Both endpoints cannot be local"
726         user,host = remspec.rsplit('@',1)
727         tmp_known_hosts = None
728         
729         connkey = make_connkey(user,host,port)
730         args = ['ssh', '-l', user, '-C',
731                 # Don't bother with localhost. Makes test easier
732                 '-o', 'NoHostAuthenticationForLocalhost=yes',
733                 '-o', 'ConnectTimeout=30',
734                 '-o', 'ConnectionAttempts=3',
735                 '-o', 'ServerAliveInterval=30',
736                 '-o', 'TCPKeepAlive=yes',
737                 host ]
738         if openssh_has_persist():
739             args.extend([
740                 '-o', 'ControlMaster=auto',
741                 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
742                 '-o', 'ControlPersist=60' ])
743         if port:
744             args.append('-P%d' % port)
745         if ident_key:
746             args.extend(('-i', ident_key))
747         if server_key:
748             # Create a temporary server key file
749             tmp_known_hosts = _make_server_key_args(
750                 server_key, host, port, args)
751         
752         if isinstance(source, file) or hasattr(source, 'read'):
753             args.append('cat > %s' % (shell_escape(path),))
754         elif isinstance(dest, file) or hasattr(dest, 'write'):
755             args.append('cat %s' % (shell_escape(path),))
756         else:
757             raise AssertionError, "Unreachable code reached! :-Q"
758         
759         # connects to the remote host and starts a remote connection
760         if isinstance(source, file):
761             proc = subprocess.Popen(args, 
762                     stdout = open('/dev/null','w'),
763                     stderr = subprocess.PIPE,
764                     stdin = source)
765             err = proc.stderr.read()
766             proc._known_hosts = tmp_known_hosts
767             eintr_retry(proc.wait)()
768             return ((None,err), proc)
769         elif isinstance(dest, file):
770             proc = subprocess.Popen(args, 
771                     stdout = open('/dev/null','w'),
772                     stderr = subprocess.PIPE,
773                     stdin = source)
774             err = proc.stderr.read()
775             proc._known_hosts = tmp_known_hosts
776             eintr_retry(proc.wait)()
777             return ((None,err), proc)
778         elif hasattr(source, 'read'):
779             # file-like (but not file) source
780             proc = subprocess.Popen(args, 
781                     stdout = open('/dev/null','w'),
782                     stderr = subprocess.PIPE,
783                     stdin = subprocess.PIPE)
784             
785             buf = None
786             err = []
787             while True:
788                 if not buf:
789                     buf = source.read(4096)
790                 if not buf:
791                     #EOF
792                     break
793                 
794                 rdrdy, wrdy, broken = select.select(
795                     [proc.stderr],
796                     [proc.stdin],
797                     [proc.stderr,proc.stdin])
798                 
799                 if proc.stderr in rdrdy:
800                     # use os.read for fully unbuffered behavior
801                     err.append(os.read(proc.stderr.fileno(), 4096))
802                 
803                 if proc.stdin in wrdy:
804                     proc.stdin.write(buf)
805                     buf = None
806                 
807                 if broken:
808                     break
809             proc.stdin.close()
810             err.append(proc.stderr.read())
811                 
812             proc._known_hosts = tmp_known_hosts
813             eintr_retry(proc.wait)()
814             return ((None,''.join(err)), proc)
815         elif hasattr(dest, 'write'):
816             # file-like (but not file) dest
817             proc = subprocess.Popen(args, 
818                     stdout = subprocess.PIPE,
819                     stderr = subprocess.PIPE,
820                     stdin = open('/dev/null','w'))
821             
822             buf = None
823             err = []
824             while True:
825                 rdrdy, wrdy, broken = select.select(
826                     [proc.stderr, proc.stdout],
827                     [],
828                     [proc.stderr, proc.stdout])
829                 
830                 if proc.stderr in rdrdy:
831                     # use os.read for fully unbuffered behavior
832                     err.append(os.read(proc.stderr.fileno(), 4096))
833                 
834                 if proc.stdout in rdrdy:
835                     # use os.read for fully unbuffered behavior
836                     buf = os.read(proc.stdout.fileno(), 4096)
837                     dest.write(buf)
838                     
839                     if not buf:
840                         #EOF
841                         break
842                 
843                 if broken:
844                     break
845             err.append(proc.stderr.read())
846                 
847             proc._known_hosts = tmp_known_hosts
848             eintr_retry(proc.wait)()
849             return ((None,''.join(err)), proc)
850         else:
851             raise AssertionError, "Unreachable code reached! :-Q"
852     else:
853         # Parse destination as <user>@<server>:<path>
854         if isinstance(dest, basestring) and ':' in dest:
855             remspec, path = dest.split(':',1)
856         elif isinstance(source, basestring) and ':' in source:
857             remspec, path = source.split(':',1)
858         else:
859             raise ValueError, "Both endpoints cannot be local"
860         user,host = remspec.rsplit('@',1)
861         
862         # plain scp
863         tmp_known_hosts = None
864         args = ['scp', '-q', '-p', '-C',
865                 # Don't bother with localhost. Makes test easier
866                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
867         if port:
868             args.append('-P%d' % port)
869         if recursive:
870             args.append('-r')
871         if ident_key:
872             args.extend(('-i', ident_key))
873         if server_key:
874             # Create a temporary server key file
875             tmp_known_hosts = _make_server_key_args(
876                 server_key, host, port, args)
877         if isinstance(source,list):
878             args.extend(source)
879         else:
880             args.append(source)
881         args.append(dest)
882
883         # connects to the remote host and starts a remote connection
884         proc = subprocess.Popen(args, 
885                 stdout = subprocess.PIPE,
886                 stdin = subprocess.PIPE, 
887                 stderr = subprocess.PIPE)
888         proc._known_hosts = tmp_known_hosts
889         
890         comm = proc.communicate()
891         eintr_retry(proc.wait)()
892         return (comm, proc)
893
894 def decode_and_execute():
895     # The python code we want to execute might have characters that 
896     # are not compatible with the 'inline' mode we are using. To avoid
897     # problems we receive the encoded python code in base64 as a input 
898     # stream and decode it for execution.
899     import base64, os
900     cmd = ""
901     while True:
902         try:
903             cmd += os.read(0, 1)# one byte from stdin
904         except OSError, e:            
905             if e.errno == errno.EINTR:
906                 continue
907             else:
908                 raise
909         if cmd[-1] == "\n": 
910             break
911     cmd = base64.b64decode(cmd)
912     # Uncomment for debug
913     #os.write(2, "Executing python code: %s\n" % cmd)
914     os.write(1, "OK\n") # send a sync message
915     exec(cmd)
916
917 def popen_python(python_code, 
918         communication = DC.ACCESS_LOCAL,
919         host = None, 
920         port = None, 
921         user = None, 
922         agent = False, 
923         python_path = None,
924         ident_key = None,
925         server_key = None,
926         tty = False,
927         sudo = False, 
928         environment_setup = ""):
929
930     cmd = ""
931     if python_path:
932         python_path.replace("'", r"'\''")
933         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
934         cmd += " ; "
935     if environment_setup:
936         cmd += environment_setup
937         cmd += " ; "
938     # Uncomment for debug (to run everything under strace)
939     # We had to verify if strace works (cannot nest them)
940     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
941     #cmd += "$CMD "
942     #cmd += "strace -f -tt -s 200 -o strace$$.out "
943     import nepi
944     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
945         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
946     )
947
948     if sudo:
949         if ';' in cmd:
950             cmd = "sudo bash -c " + shell_escape(cmd)
951         else:
952             cmd = "sudo " + cmd
953
954     if communication == DC.ACCESS_SSH:
955         tmp_known_hosts = None
956         args = ['ssh', '-C',
957                 # Don't bother with localhost. Makes test easier
958                 '-o', 'NoHostAuthenticationForLocalhost=yes',
959                 '-o', 'ConnectionAttempts=3',
960                 '-o', 'ServerAliveInterval=30',
961                 '-o', 'TCPKeepAlive=yes',
962                 '-l', user, host]
963         if agent:
964             args.append('-A')
965         if port:
966             args.append('-p%d' % port)
967         if ident_key:
968             args.extend(('-i', ident_key))
969         if tty:
970             args.append('-t')
971         if server_key:
972             # Create a temporary server key file
973             tmp_known_hosts = _make_server_key_args(
974                 server_key, host, port, args)
975         args.append(cmd)
976     else:
977         args = [ "/bin/bash", "-c", cmd ]
978
979     # connects to the remote host and starts a remote
980     proc = subprocess.Popen(args,
981             shell = False, 
982             stdout = subprocess.PIPE,
983             stdin = subprocess.PIPE, 
984             stderr = subprocess.PIPE)
985
986     if communication == DC.ACCESS_SSH:
987         proc._known_hosts = tmp_known_hosts
988
989     # send the command to execute
990     os.write(proc.stdin.fileno(),
991             base64.b64encode(python_code) + "\n")
992  
993     while True: 
994         try:
995             msg = os.read(proc.stdout.fileno(), 3)
996             break
997         except OSError, e:            
998             if e.errno == errno.EINTR:
999                 continue
1000             else:
1001                 raise
1002     
1003     if msg != "OK\n":
1004         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1005             msg, proc.stdout.read(), proc.stderr.read())
1006
1007     return proc
1008
1009 # POSIX
1010 def _communicate(self, input, timeout=None, err_on_timeout=True):
1011     read_set = []
1012     write_set = []
1013     stdout = None # Return
1014     stderr = None # Return
1015     
1016     killed = False
1017     
1018     if timeout is not None:
1019         timelimit = time.time() + timeout
1020         killtime = timelimit + 4
1021         bailtime = timelimit + 4
1022
1023     if self.stdin:
1024         # Flush stdio buffer.  This might block, if the user has
1025         # been writing to .stdin in an uncontrolled fashion.
1026         self.stdin.flush()
1027         if input:
1028             write_set.append(self.stdin)
1029         else:
1030             self.stdin.close()
1031     if self.stdout:
1032         read_set.append(self.stdout)
1033         stdout = []
1034     if self.stderr:
1035         read_set.append(self.stderr)
1036         stderr = []
1037
1038     input_offset = 0
1039     while read_set or write_set:
1040         if timeout is not None:
1041             curtime = time.time()
1042             if timeout is None or curtime > timelimit:
1043                 if curtime > bailtime:
1044                     break
1045                 elif curtime > killtime:
1046                     signum = signal.SIGKILL
1047                 else:
1048                     signum = signal.SIGTERM
1049                 # Lets kill it
1050                 os.kill(self.pid, signum)
1051                 select_timeout = 0.5
1052             else:
1053                 select_timeout = timelimit - curtime + 0.1
1054         else:
1055             select_timeout = 1.0
1056         
1057         if select_timeout > 1.0:
1058             select_timeout = 1.0
1059             
1060         try:
1061             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1062         except select.error,e:
1063             if e[0] != 4:
1064                 raise
1065             else:
1066                 continue
1067         
1068         if not rlist and not wlist and not xlist and self.poll() is not None:
1069             # timeout and process exited, say bye
1070             break
1071
1072         if self.stdin in wlist:
1073             # When select has indicated that the file is writable,
1074             # we can write up to PIPE_BUF bytes without risk
1075             # blocking.  POSIX defines PIPE_BUF >= 512
1076             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1077             input_offset += bytes_written
1078             if input_offset >= len(input):
1079                 self.stdin.close()
1080                 write_set.remove(self.stdin)
1081
1082         if self.stdout in rlist:
1083             data = os.read(self.stdout.fileno(), 1024)
1084             if data == "":
1085                 self.stdout.close()
1086                 read_set.remove(self.stdout)
1087             stdout.append(data)
1088
1089         if self.stderr in rlist:
1090             data = os.read(self.stderr.fileno(), 1024)
1091             if data == "":
1092                 self.stderr.close()
1093                 read_set.remove(self.stderr)
1094             stderr.append(data)
1095     
1096     # All data exchanged.  Translate lists into strings.
1097     if stdout is not None:
1098         stdout = ''.join(stdout)
1099     if stderr is not None:
1100         stderr = ''.join(stderr)
1101
1102     # Translate newlines, if requested.  We cannot let the file
1103     # object do the translation: It is based on stdio, which is
1104     # impossible to combine with select (unless forcing no
1105     # buffering).
1106     if self.universal_newlines and hasattr(file, 'newlines'):
1107         if stdout:
1108             stdout = self._translate_newlines(stdout)
1109         if stderr:
1110             stderr = self._translate_newlines(stderr)
1111
1112     if killed and err_on_timeout:
1113         errcode = self.poll()
1114         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1115     else:
1116         if killed:
1117             self.poll()
1118         else:
1119             self.wait()
1120         return (stdout, stderr)
1121