4430d6ef5901df90482b112a4c7b8c41d317b605
[nepi.git] / src / nepi / util / server.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.util.constants import DeploymentConfiguration as DC
5
6 import base64
7 import errno
8 import os
9 import os.path
10 import resource
11 import select
12 import shutil
13 import signal
14 import socket
15 import sys
16 import subprocess
17 import threading
18 import time
19 import traceback
20 import re
21 import tempfile
22 import defer
23 import functools
24 import collections
25 import hashlib
26
27 CTRL_SOCK = "ctrl.sock"
28 CTRL_PID = "ctrl.pid"
29 STD_ERR = "stderr.log"
30 MAX_FD = 1024
31
32 STOP_MSG = "STOP"
33
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35
36 OPENSSH_HAS_PERSIST = None
37
38 if hasattr(os, "devnull"):
39     DEV_NULL = os.devnull
40 else:
41     DEV_NULL = "/dev/null"
42
43 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44
45 def openssh_has_persist():
46     global OPENSSH_HAS_PERSIST
47     if OPENSSH_HAS_PERSIST is None:
48         proc = subprocess.Popen(["ssh","-v"],
49             stdout = subprocess.PIPE,
50             stderr = subprocess.STDOUT,
51             stdin = open("/dev/null","r") )
52         out,err = proc.communicate()
53         proc.wait()
54         
55         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
56         OPENSSH_HAS_PERSIST = bool(vre.match(out))
57     return OPENSSH_HAS_PERSIST
58
59 def shell_escape(s):
60     """ Escapes strings so that they are safe to use as command-line arguments """
61     if SHELL_SAFE.match(s):
62         # safe string - no escaping needed
63         return s
64     else:
65         # unsafe string - escape
66         def escp(c):
67             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
68                 return c
69             else:
70                 return "'$'\\x%02x''" % (ord(c),)
71         s = ''.join(map(escp,s))
72         return "'%s'" % (s,)
73
74 def eintr_retry(func):
75     import functools
76     @functools.wraps(func)
77     def rv(*p, **kw):
78         retry = kw.pop("_retry", False)
79         for i in xrange(0 if retry else 4):
80             try:
81                 return func(*p, **kw)
82             except (select.error, socket.error), args:
83                 if args[0] == errno.EINTR:
84                     continue
85                 else:
86                     raise 
87             except OSError, e:
88                 if e.errno == errno.EINTR:
89                     continue
90                 else:
91                     raise
92         else:
93             return func(*p, **kw)
94     return rv
95
96 class Server(object):
97     def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
98             environment_setup = "", clean_root = False):
99         self._root_dir = root_dir
100         self._clean_root = clean_root
101         self._stop = False
102         self._ctrl_sock = None
103         self._log_level = log_level
104         self._rdbuf = ""
105         self._environment_setup = environment_setup
106
107     def run(self):
108         try:
109             if self.daemonize():
110                 self.post_daemonize()
111                 self.loop()
112                 self.cleanup()
113                 # ref: "os._exit(0)"
114                 # can not return normally after fork beacuse no exec was done.
115                 # This means that if we don't do a os._exit(0) here the code that 
116                 # follows the call to "Server.run()" in the "caller code" will be 
117                 # executed... but by now it has already been executed after the 
118                 # first process (the one that did the first fork) returned.
119                 os._exit(0)
120         except:
121             print >>sys.stderr, "SERVER_ERROR."
122             self.log_error()
123             self.cleanup()
124             os._exit(0)
125         print >>sys.stderr, "SERVER_READY."
126
127     def daemonize(self):
128         # pipes for process synchronization
129         (r, w) = os.pipe()
130         
131         # build root folder
132         root = os.path.normpath(self._root_dir)
133         if self._root_dir not in [".", ""] and os.path.exists(root) \
134                 and self._clean_root:
135             shutil.rmtree(root)
136         if not os.path.exists(root):
137             os.makedirs(root, 0755)
138
139         pid1 = os.fork()
140         if pid1 > 0:
141             os.close(w)
142             while True:
143                 try:
144                     os.read(r, 1)
145                 except OSError, e: # pragma: no cover
146                     if e.errno == errno.EINTR:
147                         continue
148                     else:
149                         raise
150                 break
151             os.close(r)
152             # os.waitpid avoids leaving a <defunc> (zombie) process
153             st = os.waitpid(pid1, 0)[1]
154             if st:
155                 raise RuntimeError("Daemonization failed")
156             # return 0 to inform the caller method that this is not the 
157             # daemonized process
158             return 0
159         os.close(r)
160
161         # Decouple from parent environment.
162         os.chdir(self._root_dir)
163         os.umask(0)
164         os.setsid()
165
166         # fork 2
167         pid2 = os.fork()
168         if pid2 > 0:
169             # see ref: "os._exit(0)"
170             os._exit(0)
171
172         # close all open file descriptors.
173         max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
174         if (max_fd == resource.RLIM_INFINITY):
175             max_fd = MAX_FD
176         for fd in range(3, max_fd):
177             if fd != w:
178                 try:
179                     os.close(fd)
180                 except OSError:
181                     pass
182
183         # Redirect standard file descriptors.
184         stdin = open(DEV_NULL, "r")
185         stderr = stdout = open(STD_ERR, "a", 0)
186         os.dup2(stdin.fileno(), sys.stdin.fileno())
187         # NOTE: sys.stdout.write will still be buffered, even if the file
188         # was opened with 0 buffer
189         os.dup2(stdout.fileno(), sys.stdout.fileno())
190         os.dup2(stderr.fileno(), sys.stderr.fileno())
191         
192         # setup environment
193         if self._environment_setup:
194             # parse environment variables and pass to child process
195             # do it by executing shell commands, in case there's some heavy setup involved
196             envproc = subprocess.Popen(
197                 [ "bash", "-c", 
198                     "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
199                         ( self._environment_setup, ) ],
200                 stdin = subprocess.PIPE, 
201                 stdout = subprocess.PIPE,
202                 stderr = subprocess.PIPE
203             )
204             out,err = envproc.communicate()
205
206             # parse new environment
207             if out:
208                 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
209             
210                 # apply to current environment
211                 for name, value in environment.iteritems():
212                     os.environ[name] = value
213                 
214                 # apply pythonpath
215                 if 'PYTHONPATH' in environment:
216                     sys.path = environment['PYTHONPATH'].split(':') + sys.path
217
218         # create control socket
219         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
220         try:
221             self._ctrl_sock.bind(CTRL_SOCK)
222         except socket.error:
223             # Address in use, check pidfile
224             pid = None
225             try:
226                 pidfile = open(CTRL_PID, "r")
227                 pid = pidfile.read()
228                 pidfile.close()
229                 pid = int(pid)
230             except:
231                 # no pidfile
232                 pass
233             
234             if pid is not None:
235                 # Check process liveliness
236                 if not os.path.exists("/proc/%d" % (pid,)):
237                     # Ok, it's dead, clean the socket
238                     os.remove(CTRL_SOCK)
239             
240             # try again
241             self._ctrl_sock.bind(CTRL_SOCK)
242             
243         self._ctrl_sock.listen(0)
244         
245         # Save pidfile
246         pidfile = open(CTRL_PID, "w")
247         pidfile.write(str(os.getpid()))
248         pidfile.close()
249
250         # let the parent process know that the daemonization is finished
251         os.write(w, "\n")
252         os.close(w)
253         return 1
254
255     def post_daemonize(self):
256         os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
257         # QT, for some strange reason, redefines the SIGCHILD handler to write
258         # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
259         # Server dameonization closes all file descriptors from fileno '3',
260         # but the overloaded handler (inherited by the forked process) will
261         # keep trying to write the \0 to fileno 'x', which might have been reused 
262         # after closing, for other operations. This is bad bad bad when fileno 'x'
263         # is in use for communication pouroses, because unexpected \0 start
264         # appearing in the communication messages... this is exactly what happens 
265         # when using netns in daemonized form. Thus, be have no other alternative than
266         # restoring the SIGCHLD handler to the default here.
267         import signal
268         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
269
270     def loop(self):
271         while not self._stop:
272             conn, addr = self._ctrl_sock.accept()
273             self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
274             conn.settimeout(5)
275             while not self._stop:
276                 try:
277                     msg = self.recv_msg(conn)
278                 except socket.timeout, e:
279                     #self.log_error("SERVER recv_msg: connection timedout ")
280                     continue
281                 
282                 if not msg:
283                     self.log_error("CONNECTION LOST")
284                     break
285                     
286                 if msg == STOP_MSG:
287                     self._stop = True
288                     reply = self.stop_action()
289                 else:
290                     reply = self.reply_action(msg)
291                 
292                 try:
293                     self.send_reply(conn, reply)
294                 except socket.error:
295                     self.log_error()
296                     self.log_error("NOTICE: Awaiting for reconnection")
297                     break
298             try:
299                 conn.close()
300             except:
301                 # Doesn't matter
302                 self.log_error()
303
304     def recv_msg(self, conn):
305         data = [self._rdbuf]
306         chunk = data[0]
307         while '\n' not in chunk:
308             try:
309                 chunk = conn.recv(1024)
310             except (OSError, socket.error), e:
311                 if e[0] != errno.EINTR:
312                     raise
313                 else:
314                     continue
315             if chunk:
316                 data.append(chunk)
317             else:
318                 # empty chunk = EOF
319                 break
320         data = ''.join(data).split('\n',1)
321         while len(data) < 2:
322             data.append('')
323         data, self._rdbuf = data
324         
325         decoded = base64.b64decode(data)
326         return decoded.rstrip()
327
328     def send_reply(self, conn, reply):
329         encoded = base64.b64encode(reply)
330         conn.send("%s\n" % encoded)
331        
332     def cleanup(self):
333         try:
334             self._ctrl_sock.close()
335             os.remove(CTRL_SOCK)
336         except:
337             self.log_error()
338
339     def stop_action(self):
340         return "Stopping server"
341
342     def reply_action(self, msg):
343         return "Reply to: %s" % msg
344
345     def log_error(self, text = None, context = ''):
346         if text == None:
347             text = traceback.format_exc()
348         date = time.strftime("%Y-%m-%d %H:%M:%S")
349         if context:
350             context = " (%s)" % (context,)
351         sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
352         return text
353
354     def log_debug(self, text):
355         if self._log_level == DC.DEBUG_LEVEL:
356             date = time.strftime("%Y-%m-%d %H:%M:%S")
357             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
358
359 class Forwarder(object):
360     def __init__(self, root_dir = "."):
361         self._ctrl_sock = None
362         self._root_dir = root_dir
363         self._stop = False
364         self._rdbuf = ""
365
366     def forward(self):
367         self.connect()
368         print >>sys.stderr, "FORWARDER_READY."
369         while not self._stop:
370             data = self.read_data()
371             if not data:
372                 # Connection to client lost
373                 break
374             self.send_to_server(data)
375             
376             data = self.recv_from_server()
377             if not data:
378                 # Connection to server lost
379                 raise IOError, "Connection to server lost while "\
380                     "expecting response"
381             self.write_data(data)
382         self.disconnect()
383
384     def read_data(self):
385         return sys.stdin.readline()
386
387     def write_data(self, data):
388         sys.stdout.write(data)
389         # sys.stdout.write is buffered, this is why we need to do a flush()
390         sys.stdout.flush()
391
392     def send_to_server(self, data):
393         try:
394             self._ctrl_sock.send(data)
395         except (IOError, socket.error), e:
396             if e[0] == errno.EPIPE:
397                 self.connect()
398                 self._ctrl_sock.send(data)
399             else:
400                 raise e
401         encoded = data.rstrip() 
402         msg = base64.b64decode(encoded)
403         if msg == STOP_MSG:
404             self._stop = True
405
406     def recv_from_server(self):
407         data = [self._rdbuf]
408         chunk = data[0]
409         while '\n' not in chunk:
410             try:
411                 chunk = self._ctrl_sock.recv(1024)
412             except (OSError, socket.error), e:
413                 if e[0] != errno.EINTR:
414                     raise
415                 continue
416             if chunk:
417                 data.append(chunk)
418             else:
419                 # empty chunk = EOF
420                 break
421         data = ''.join(data).split('\n',1)
422         while len(data) < 2:
423             data.append('')
424         data, self._rdbuf = data
425         
426         return data+'\n'
427  
428     def connect(self):
429         self.disconnect()
430         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
431         sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
432         self._ctrl_sock.connect(sock_addr)
433
434     def disconnect(self):
435         try:
436             self._ctrl_sock.close()
437         except:
438             pass
439
440 class Client(object):
441     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
442             agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
443             environment_setup = ""):
444         self.root_dir = root_dir
445         self.addr = (host, port)
446         self.user = user
447         self.agent = agent
448         self.sudo = sudo
449         self.communication = communication
450         self.environment_setup = environment_setup
451         self._stopped = False
452         self._deferreds = collections.deque()
453         self.connect()
454     
455     def __del__(self):
456         if self._process.poll() is None:
457             os.kill(self._process.pid, signal.SIGTERM)
458         self._process.wait()
459         
460     def connect(self):
461         root_dir = self.root_dir
462         (host, port) = self.addr
463         user = self.user
464         agent = self.agent
465         sudo = self.sudo
466         communication = self.communication
467         
468         python_code = "from nepi.util import server;c=server.Forwarder(%r);\
469                 c.forward()" % (root_dir,)
470
471         self._process = popen_python(python_code, 
472                     communication = communication,
473                     host = host, 
474                     port = port, 
475                     user = user, 
476                     agent = agent, 
477                     sudo = sudo, 
478                     environment_setup = self.environment_setup)
479                
480         # Wait for the forwarder to be ready, otherwise nobody
481         # will be able to connect to it
482         err = []
483         helo = "nope"
484         while helo:
485             helo = self._process.stderr.readline()
486             if helo == 'FORWARDER_READY.\n':
487                 break
488             err.append(helo)
489         else:
490             raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
491         
492     def send_msg(self, msg):
493         encoded = base64.b64encode(msg)
494         data = "%s\n" % encoded
495         
496         try:
497             self._process.stdin.write(data)
498         except (IOError, ValueError):
499             # dead process, poll it to un-zombify
500             self._process.poll()
501             
502             # try again after reconnect
503             # If it fails again, though, give up
504             self.connect()
505             self._process.stdin.write(data)
506
507     def send_stop(self):
508         self.send_msg(STOP_MSG)
509         self._stopped = True
510
511     def defer_reply(self, transform=None):
512         defer_entry = []
513         self._deferreds.append(defer_entry)
514         return defer.Defer(
515             functools.partial(self.read_reply, defer_entry, transform)
516         )
517         
518     def _read_reply(self):
519         data = self._process.stdout.readline()
520         encoded = data.rstrip() 
521         if not encoded:
522             # empty == eof == dead process, poll it to un-zombify
523             self._process.poll()
524             
525             raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
526         return base64.b64decode(encoded)
527     
528     def read_reply(self, which=None, transform=None):
529         # Test to see if someone did it already
530         if which is not None and len(which):
531             # Ok, they did it...
532             # ...just return the deferred value
533             if transform:
534                 return transform(which[0])
535             else:
536                 return which[0]
537         
538         # Process all deferreds until the one we're looking for
539         # or until the queue is empty
540         while self._deferreds:
541             try:
542                 deferred = self._deferreds.popleft()
543             except IndexError:
544                 # emptied
545                 break
546             
547             deferred.append(self._read_reply())
548             if deferred is which:
549                 # We reached the one we were looking for
550                 if transform:
551                     return transform(deferred[0])
552                 else:
553                     return deferred[0]
554         
555         if which is None:
556             # They've requested a synchronous read
557             if transform:
558                 return transform(self._read_reply())
559             else:
560                 return self._read_reply()
561
562 def _make_server_key_args(server_key, host, port, args):
563     """ 
564     Returns a reference to the created temporary file, and adds the
565     corresponding arguments to the given argument list.
566     
567     Make sure to hold onto it until the process is done with the file
568     """
569     if port is not None:
570         host = '%s:%s' % (host,port)
571     # Create a temporary server key file
572     tmp_known_hosts = tempfile.NamedTemporaryFile()
573     
574     # Add the intended host key
575     tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
576     
577     # If we're not in strict mode, add user-configured keys
578     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
579         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
580         if os.access(user_hosts_path, os.R_OK):
581             f = open(user_hosts_path, "r")
582             tmp_known_hosts.write(f.read())
583             f.close()
584         
585     tmp_known_hosts.flush()
586     
587     args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
588     
589     return tmp_known_hosts
590
591 def make_connkey(user, host, port):
592     connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
593     if len(connkey) > 60:
594         connkey = hashlib.sha1(connkey).hexdigest()
595     return connkey
596
597 def popen_ssh_command(command, host, port, user, agent, 
598         stdin="", 
599         ident_key = None,
600         server_key = None,
601         tty = False,
602         timeout = None,
603         retry = 0,
604         err_on_timeout = True,
605         connect_timeout = 30,
606         persistent = True,
607         hostip = None):
608     """
609     Executes a remote commands, returns ((stdout,stderr),process)
610     """
611     if TRACE:
612         print "ssh", host, command
613     
614     tmp_known_hosts = None
615     connkey = make_connkey(user,host,port)
616     args = ['ssh', '-C',
617             # Don't bother with localhost. Makes test easier
618             '-o', 'NoHostAuthenticationForLocalhost=yes',
619             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
620             '-o', 'ConnectionAttempts=3',
621             '-o', 'ServerAliveInterval=30',
622             '-o', 'TCPKeepAlive=yes',
623             '-l', user, hostip or host]
624     if persistent and openssh_has_persist():
625         args.extend([
626             '-o', 'ControlMaster=auto',
627             '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
628             '-o', 'ControlPersist=60' ])
629     if agent:
630         args.append('-A')
631     if port:
632         args.append('-p%d' % port)
633     if ident_key:
634         args.extend(('-i', ident_key))
635     if tty:
636         args.append('-t')
637     if server_key:
638         # Create a temporary server key file
639         tmp_known_hosts = _make_server_key_args(
640             server_key, host, port, args)
641     args.append(command)
642
643     for x in xrange(retry or 3):
644         # connects to the remote host and starts a remote connection
645         proc = subprocess.Popen(args, 
646                 stdout = subprocess.PIPE,
647                 stdin = subprocess.PIPE, 
648                 stderr = subprocess.PIPE)
649         
650         # attach tempfile object to the process, to make sure the file stays
651         # alive until the process is finished with it
652         proc._known_hosts = tmp_known_hosts
653         
654         try:
655             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
656             if proc.poll():
657                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
658                     # SSH error, can safely retry
659                     continue
660                 elif retry:
661                     # Probably timed out or plain failed but can retry
662                     continue
663             break
664         except RuntimeError,e:
665             if retry <= 0:
666                 raise
667             if TRACE:
668                 print " timedout -> ", e.args
669             retry -= 1
670         
671     if TRACE:
672         print " -> ", out, err
673
674     return ((out, err), proc)
675
676 def popen_scp(source, dest, 
677         port = None, 
678         agent = None, 
679         recursive = False,
680         ident_key = None,
681         server_key = None):
682     """
683     Copies from/to remote sites.
684     
685     Source and destination should have the user and host encoded
686     as per scp specs.
687     
688     If source is a file object, a special mode will be used to
689     create the remote file with the same contents.
690     
691     If dest is a file object, the remote file (source) will be
692     read and written into dest.
693     
694     In these modes, recursive cannot be True.
695     
696     Source can be a list of files to copy to a single destination,
697     in which case it is advised that the destination be a folder.
698     """
699     
700     if TRACE:
701         print "scp", source, dest
702     
703     if isinstance(source, file) and source.tell() == 0:
704         source = source.name
705     elif hasattr(source, 'read'):
706         tmp = tempfile.NamedTemporaryFile()
707         while True:
708             buf = source.read(65536)
709             if buf:
710                 tmp.write(buf)
711             else:
712                 break
713         tmp.seek(0)
714         source = tmp.name
715     
716     if isinstance(source, file) or isinstance(dest, file) \
717             or hasattr(source, 'read')  or hasattr(dest, 'write'):
718         assert not recursive
719         
720         # Parse source/destination as <user>@<server>:<path>
721         if isinstance(dest, basestring) and ':' in dest:
722             remspec, path = dest.split(':',1)
723         elif isinstance(source, basestring) and ':' in source:
724             remspec, path = source.split(':',1)
725         else:
726             raise ValueError, "Both endpoints cannot be local"
727         user,host = remspec.rsplit('@',1)
728         tmp_known_hosts = None
729         
730         connkey = make_connkey(user,host,port)
731         args = ['ssh', '-l', user, '-C',
732                 # Don't bother with localhost. Makes test easier
733                 '-o', 'NoHostAuthenticationForLocalhost=yes',
734                 '-o', 'ConnectTimeout=30',
735                 '-o', 'ConnectionAttempts=3',
736                 '-o', 'ServerAliveInterval=30',
737                 '-o', 'TCPKeepAlive=yes',
738                 host ]
739         if openssh_has_persist():
740             args.extend([
741                 '-o', 'ControlMaster=auto',
742                 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
743                 '-o', 'ControlPersist=60' ])
744         if port:
745             args.append('-P%d' % port)
746         if ident_key:
747             args.extend(('-i', ident_key))
748         if server_key:
749             # Create a temporary server key file
750             tmp_known_hosts = _make_server_key_args(
751                 server_key, host, port, args)
752         
753         if isinstance(source, file) or hasattr(source, 'read'):
754             args.append('cat > %s' % (shell_escape(path),))
755         elif isinstance(dest, file) or hasattr(dest, 'write'):
756             args.append('cat %s' % (shell_escape(path),))
757         else:
758             raise AssertionError, "Unreachable code reached! :-Q"
759         
760         # connects to the remote host and starts a remote connection
761         if isinstance(source, file):
762             proc = subprocess.Popen(args, 
763                     stdout = open('/dev/null','w'),
764                     stderr = subprocess.PIPE,
765                     stdin = source)
766             err = proc.stderr.read()
767             proc._known_hosts = tmp_known_hosts
768             eintr_retry(proc.wait)()
769             return ((None,err), proc)
770         elif isinstance(dest, file):
771             proc = subprocess.Popen(args, 
772                     stdout = open('/dev/null','w'),
773                     stderr = subprocess.PIPE,
774                     stdin = source)
775             err = proc.stderr.read()
776             proc._known_hosts = tmp_known_hosts
777             eintr_retry(proc.wait)()
778             return ((None,err), proc)
779         elif hasattr(source, 'read'):
780             # file-like (but not file) source
781             proc = subprocess.Popen(args, 
782                     stdout = open('/dev/null','w'),
783                     stderr = subprocess.PIPE,
784                     stdin = subprocess.PIPE)
785             
786             buf = None
787             err = []
788             while True:
789                 if not buf:
790                     buf = source.read(4096)
791                 if not buf:
792                     #EOF
793                     break
794                 
795                 rdrdy, wrdy, broken = select.select(
796                     [proc.stderr],
797                     [proc.stdin],
798                     [proc.stderr,proc.stdin])
799                 
800                 if proc.stderr in rdrdy:
801                     # use os.read for fully unbuffered behavior
802                     err.append(os.read(proc.stderr.fileno(), 4096))
803                 
804                 if proc.stdin in wrdy:
805                     proc.stdin.write(buf)
806                     buf = None
807                 
808                 if broken:
809                     break
810             proc.stdin.close()
811             err.append(proc.stderr.read())
812                 
813             proc._known_hosts = tmp_known_hosts
814             eintr_retry(proc.wait)()
815             return ((None,''.join(err)), proc)
816         elif hasattr(dest, 'write'):
817             # file-like (but not file) dest
818             proc = subprocess.Popen(args, 
819                     stdout = subprocess.PIPE,
820                     stderr = subprocess.PIPE,
821                     stdin = open('/dev/null','w'))
822             
823             buf = None
824             err = []
825             while True:
826                 rdrdy, wrdy, broken = select.select(
827                     [proc.stderr, proc.stdout],
828                     [],
829                     [proc.stderr, proc.stdout])
830                 
831                 if proc.stderr in rdrdy:
832                     # use os.read for fully unbuffered behavior
833                     err.append(os.read(proc.stderr.fileno(), 4096))
834                 
835                 if proc.stdout in rdrdy:
836                     # use os.read for fully unbuffered behavior
837                     buf = os.read(proc.stdout.fileno(), 4096)
838                     dest.write(buf)
839                     
840                     if not buf:
841                         #EOF
842                         break
843                 
844                 if broken:
845                     break
846             err.append(proc.stderr.read())
847                 
848             proc._known_hosts = tmp_known_hosts
849             eintr_retry(proc.wait)()
850             return ((None,''.join(err)), proc)
851         else:
852             raise AssertionError, "Unreachable code reached! :-Q"
853     else:
854         # Parse destination as <user>@<server>:<path>
855         if isinstance(dest, basestring) and ':' in dest:
856             remspec, path = dest.split(':',1)
857         elif isinstance(source, basestring) and ':' in source:
858             remspec, path = source.split(':',1)
859         else:
860             raise ValueError, "Both endpoints cannot be local"
861         user,host = remspec.rsplit('@',1)
862         
863         # plain scp
864         tmp_known_hosts = None
865         args = ['scp', '-q', '-p', '-C',
866                 # Don't bother with localhost. Makes test easier
867                 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
868         if port:
869             args.append('-P%d' % port)
870         if recursive:
871             args.append('-r')
872         if ident_key:
873             args.extend(('-i', ident_key))
874         if server_key:
875             # Create a temporary server key file
876             tmp_known_hosts = _make_server_key_args(
877                 server_key, host, port, args)
878         if isinstance(source,list):
879             args.extend(source)
880         else:
881             args.append(source)
882         args.append(dest)
883
884         # connects to the remote host and starts a remote connection
885         proc = subprocess.Popen(args, 
886                 stdout = subprocess.PIPE,
887                 stdin = subprocess.PIPE, 
888                 stderr = subprocess.PIPE)
889         proc._known_hosts = tmp_known_hosts
890         
891         comm = proc.communicate()
892         eintr_retry(proc.wait)()
893         return (comm, proc)
894
895 def decode_and_execute():
896     # The python code we want to execute might have characters that 
897     # are not compatible with the 'inline' mode we are using. To avoid
898     # problems we receive the encoded python code in base64 as a input 
899     # stream and decode it for execution.
900     import base64, os
901     cmd = ""
902     while True:
903         try:
904             cmd += os.read(0, 1)# one byte from stdin
905         except OSError, e:            
906             if e.errno == errno.EINTR:
907                 continue
908             else:
909                 raise
910         if cmd[-1] == "\n": 
911             break
912     cmd = base64.b64decode(cmd)
913     # Uncomment for debug
914     #os.write(2, "Executing python code: %s\n" % cmd)
915     os.write(1, "OK\n") # send a sync message
916     exec(cmd)
917
918 def popen_python(python_code, 
919         communication = DC.ACCESS_LOCAL,
920         host = None, 
921         port = None, 
922         user = None, 
923         agent = False, 
924         python_path = None,
925         ident_key = None,
926         server_key = None,
927         tty = False,
928         sudo = False, 
929         environment_setup = ""):
930
931     cmd = ""
932     if python_path:
933         python_path.replace("'", r"'\''")
934         cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
935         cmd += " ; "
936     if environment_setup:
937         cmd += environment_setup
938         cmd += " ; "
939     # Uncomment for debug (to run everything under strace)
940     # We had to verify if strace works (cannot nest them)
941     #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
942     #cmd += "$CMD "
943     #cmd += "strace -f -tt -s 200 -o strace$$.out "
944     import nepi
945     cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
946         repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
947     )
948
949     if sudo:
950         if ';' in cmd:
951             cmd = "sudo bash -c " + shell_escape(cmd)
952         else:
953             cmd = "sudo " + cmd
954
955     if communication == DC.ACCESS_SSH:
956         tmp_known_hosts = None
957         args = ['ssh', '-C',
958                 # Don't bother with localhost. Makes test easier
959                 '-o', 'NoHostAuthenticationForLocalhost=yes',
960                 '-o', 'ConnectionAttempts=3',
961                 '-o', 'ServerAliveInterval=30',
962                 '-o', 'TCPKeepAlive=yes',
963                 '-l', user, host]
964         if agent:
965             args.append('-A')
966         if port:
967             args.append('-p%d' % port)
968         if ident_key:
969             args.extend(('-i', ident_key))
970         if tty:
971             args.append('-t')
972         if server_key:
973             # Create a temporary server key file
974             tmp_known_hosts = _make_server_key_args(
975                 server_key, host, port, args)
976         args.append(cmd)
977     else:
978         args = [ "/bin/bash", "-c", cmd ]
979
980     # connects to the remote host and starts a remote
981     proc = subprocess.Popen(args,
982             shell = False, 
983             stdout = subprocess.PIPE,
984             stdin = subprocess.PIPE, 
985             stderr = subprocess.PIPE)
986
987     if communication == DC.ACCESS_SSH:
988         proc._known_hosts = tmp_known_hosts
989
990     # send the command to execute
991     os.write(proc.stdin.fileno(),
992             base64.b64encode(python_code) + "\n")
993  
994     while True: 
995         try:
996             msg = os.read(proc.stdout.fileno(), 3)
997             break
998         except OSError, e:            
999             if e.errno == errno.EINTR:
1000                 continue
1001             else:
1002                 raise
1003     
1004     if msg != "OK\n":
1005         raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1006             msg, proc.stdout.read(), proc.stderr.read())
1007
1008     return proc
1009
1010 # POSIX
1011 def _communicate(self, input, timeout=None, err_on_timeout=True):
1012     read_set = []
1013     write_set = []
1014     stdout = None # Return
1015     stderr = None # Return
1016     
1017     killed = False
1018     
1019     if timeout is not None:
1020         timelimit = time.time() + timeout
1021         killtime = timelimit + 4
1022         bailtime = timelimit + 4
1023
1024     if self.stdin:
1025         # Flush stdio buffer.  This might block, if the user has
1026         # been writing to .stdin in an uncontrolled fashion.
1027         self.stdin.flush()
1028         if input:
1029             write_set.append(self.stdin)
1030         else:
1031             self.stdin.close()
1032     if self.stdout:
1033         read_set.append(self.stdout)
1034         stdout = []
1035     if self.stderr:
1036         read_set.append(self.stderr)
1037         stderr = []
1038
1039     input_offset = 0
1040     while read_set or write_set:
1041         if timeout is not None:
1042             curtime = time.time()
1043             if timeout is None or curtime > timelimit:
1044                 if curtime > bailtime:
1045                     break
1046                 elif curtime > killtime:
1047                     signum = signal.SIGKILL
1048                 else:
1049                     signum = signal.SIGTERM
1050                 # Lets kill it
1051                 os.kill(self.pid, signum)
1052                 select_timeout = 0.5
1053             else:
1054                 select_timeout = timelimit - curtime + 0.1
1055         else:
1056             select_timeout = 1.0
1057         
1058         if select_timeout > 1.0:
1059             select_timeout = 1.0
1060             
1061         try:
1062             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1063         except select.error,e:
1064             if e[0] != 4:
1065                 raise
1066             else:
1067                 continue
1068         
1069         if not rlist and not wlist and not xlist and self.poll() is not None:
1070             # timeout and process exited, say bye
1071             break
1072
1073         if self.stdin in wlist:
1074             # When select has indicated that the file is writable,
1075             # we can write up to PIPE_BUF bytes without risk
1076             # blocking.  POSIX defines PIPE_BUF >= 512
1077             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1078             input_offset += bytes_written
1079             if input_offset >= len(input):
1080                 self.stdin.close()
1081                 write_set.remove(self.stdin)
1082
1083         if self.stdout in rlist:
1084             data = os.read(self.stdout.fileno(), 1024)
1085             if data == "":
1086                 self.stdout.close()
1087                 read_set.remove(self.stdout)
1088             stdout.append(data)
1089
1090         if self.stderr in rlist:
1091             data = os.read(self.stderr.fileno(), 1024)
1092             if data == "":
1093                 self.stderr.close()
1094                 read_set.remove(self.stderr)
1095             stderr.append(data)
1096     
1097     # All data exchanged.  Translate lists into strings.
1098     if stdout is not None:
1099         stdout = ''.join(stdout)
1100     if stderr is not None:
1101         stderr = ''.join(stderr)
1102
1103     # Translate newlines, if requested.  We cannot let the file
1104     # object do the translation: It is based on stdio, which is
1105     # impossible to combine with select (unless forcing no
1106     # buffering).
1107     if self.universal_newlines and hasattr(file, 'newlines'):
1108         if stdout:
1109             stdout = self._translate_newlines(stdout)
1110         if stderr:
1111             stderr = self._translate_newlines(stderr)
1112
1113     if killed and err_on_timeout:
1114         errcode = self.poll()
1115         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1116     else:
1117         if killed:
1118             self.poll()
1119         else:
1120             self.wait()
1121         return (stdout, stderr)
1122