Adding ICN PlanetLab large experiment scenarios
[nepi.git] / src / nepi / util / server.py
index 79c7ee6..ed9bccd 100644 (file)
-#!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
+from nepi.util.constants import DeploymentConfiguration as DC
+
 import base64
 import errno
 import os
+import os.path
+import resource
 import select
+import shutil
+import signal
 import socket
 import sys
 import subprocess
 import threading
+import time
+import traceback
+import re
+import tempfile
+import defer
+import functools
+import collections
+import hashlib
 
 CTRL_SOCK = "ctrl.sock"
+CTRL_PID = "ctrl.pid"
 STD_ERR = "stderr.log"
 MAX_FD = 1024
 
 STOP_MSG = "STOP"
 
+TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
+
+OPENSSH_HAS_PERSIST = None
+
+if hasattr(os, "devnull"):
+    DEV_NULL = os.devnull
+else:
+    DEV_NULL = "/dev/null"
+
+SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
+
+hostbyname_cache = dict()
+
+def gethostbyname(host):
+    hostbyname = hostbyname_cache.get(host)
+    if not hostbyname:
+        hostbyname = socket.gethostbyname(host)
+        hostbyname_cache[host] = hostbyname
+    return hostbyname
+
+def openssh_has_persist():
+    global OPENSSH_HAS_PERSIST
+    if OPENSSH_HAS_PERSIST is None:
+        proc = subprocess.Popen(["ssh","-v"],
+            stdout = subprocess.PIPE,
+            stderr = subprocess.STDOUT,
+            stdin = open("/dev/null","r") )
+        out,err = proc.communicate()
+        proc.wait()
+        
+        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
+        OPENSSH_HAS_PERSIST = bool(vre.match(out))
+    return OPENSSH_HAS_PERSIST
+
+def shell_escape(s):
+    """ Escapes strings so that they are safe to use as command-line arguments """
+    if SHELL_SAFE.match(s):
+        # safe string - no escaping needed
+        return s
+    else:
+        # unsafe string - escape
+        def escp(c):
+            if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
+                return c
+            else:
+                return "'$'\\x%02x''" % (ord(c),)
+        s = ''.join(map(escp,s))
+        return "'%s'" % (s,)
+
+def eintr_retry(func):
+    import functools
+    @functools.wraps(func)
+    def rv(*p, **kw):
+        retry = kw.pop("_retry", False)
+        for i in xrange(0 if retry else 4):
+            try:
+                return func(*p, **kw)
+            except (select.error, socket.error), args:
+                if args[0] == errno.EINTR:
+                    continue
+                else:
+                    raise 
+            except OSError, e:
+                if e.errno == errno.EINTR:
+                    continue
+                else:
+                    raise
+        else:
+            return func(*p, **kw)
+    return rv
+
 class Server(object):
-    def __init__(self, root_dir = "."):
+    def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL, 
+            environment_setup = "", clean_root = False):
         self._root_dir = root_dir
+        self._clean_root = clean_root
         self._stop = False
         self._ctrl_sock = None
-        self._stderr = None 
+        self._log_level = log_level
+        self._rdbuf = ""
+        self._environment_setup = environment_setup
 
     def run(self):
-        if self.daemonize():
-            self.post_daemonize()
-            self.loop()
+        try:
+            if self.daemonize():
+                self.post_daemonize()
+                self.loop()
+                self.cleanup()
+                # ref: "os._exit(0)"
+                # can not return normally after fork beacuse no exec was done.
+                # This means that if we don't do a os._exit(0) here the code that 
+                # follows the call to "Server.run()" in the "caller code" will be 
+                # executed... but by now it has already been executed after the 
+                # first process (the one that did the first fork) returned.
+                os._exit(0)
+        except:
+            print >>sys.stderr, "SERVER_ERROR."
+            self.log_error()
             self.cleanup()
-            # ref: "os._exit(0)"
-            # can not return normally after fork beacuse no exec was done.
-            # This means that if we don't do a os._exit(0) here the code that 
-            # follows the call to "Server.run()" in the "caller code" will be 
-            # executed... but by now it has already been executed after the 
-            # first process (the one that did the first fork) returned.
             os._exit(0)
+        print >>sys.stderr, "SERVER_READY."
 
     def daemonize(self):
+        # pipes for process synchronization
+        (r, w) = os.pipe()
+        
+        # build root folder
+        root = os.path.normpath(self._root_dir)
+        if self._root_dir not in [".", ""] and os.path.exists(root) \
+                and self._clean_root:
+            shutil.rmtree(root)
+        if not os.path.exists(root):
+            os.makedirs(root, 0755)
+
         pid1 = os.fork()
         if pid1 > 0:
-            # we do os.waitpid to avoid leaving a <defunc> (zombie) process
-            os.waitpid(pid1, 0)
+            os.close(w)
+            while True:
+                try:
+                    os.read(r, 1)
+                except OSError, e: # pragma: no cover
+                    if e.errno == errno.EINTR:
+                        continue
+                    else:
+                        raise
+                break
+            os.close(r)
+            # os.waitpid avoids leaving a <defunc> (zombie) process
+            st = os.waitpid(pid1, 0)[1]
+            if st:
+                raise RuntimeError("Daemonization failed")
             # return 0 to inform the caller method that this is not the 
             # daemonized process
             return 0
+        os.close(r)
 
         # Decouple from parent environment.
         os.chdir(self._root_dir)
@@ -57,67 +178,171 @@ class Server(object):
             os._exit(0)
 
         # close all open file descriptors.
-        for fd in range(2, MAX_FD):
-            try:
-                os.close(fd)
-            except OSError:
-                pass
+        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
+        if (max_fd == resource.RLIM_INFINITY):
+            max_fd = MAX_FD
+        for fd in range(3, max_fd):
+            if fd != w:
+                try:
+                    os.close(fd)
+                except OSError:
+                    pass
 
         # Redirect standard file descriptors.
-        self._stderr = stdout = file(STD_ERR, "a", 0)
-        stdin = open('/dev/null', 'r')
+        stdin = open(DEV_NULL, "r")
+        stderr = stdout = open(STD_ERR, "a", 0)
         os.dup2(stdin.fileno(), sys.stdin.fileno())
+        # NOTE: sys.stdout.write will still be buffered, even if the file
+        # was opened with 0 buffer
         os.dup2(stdout.fileno(), sys.stdout.fileno())
-        os.dup2(self._stderr.fileno(), sys.stderr.fileno())
+        os.dup2(stderr.fileno(), sys.stderr.fileno())
+        
+        # setup environment
+        if self._environment_setup:
+            # parse environment variables and pass to child process
+            # do it by executing shell commands, in case there's some heavy setup involved
+            envproc = subprocess.Popen(
+                [ "bash", "-c", 
+                    "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
+                        ( self._environment_setup, ) ],
+                stdin = subprocess.PIPE, 
+                stdout = subprocess.PIPE,
+                stderr = subprocess.PIPE
+            )
+            out,err = envproc.communicate()
+
+            # parse new environment
+            if out:
+                environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
+            
+                # apply to current environment
+                for name, value in environment.iteritems():
+                    os.environ[name] = value
+                
+                # apply pythonpath
+                if 'PYTHONPATH' in environment:
+                    sys.path = environment['PYTHONPATH'].split(':') + sys.path
+
+        # create control socket
+        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        try:
+            self._ctrl_sock.bind(CTRL_SOCK)
+        except socket.error:
+            # Address in use, check pidfile
+            pid = None
+            try:
+                pidfile = open(CTRL_PID, "r")
+                pid = pidfile.read()
+                pidfile.close()
+                pid = int(pid)
+            except:
+                # no pidfile
+                pass
+            
+            if pid is not None:
+                # Check process liveliness
+                if not os.path.exists("/proc/%d" % (pid,)):
+                    # Ok, it's dead, clean the socket
+                    os.remove(CTRL_SOCK)
+            
+            # try again
+            self._ctrl_sock.bind(CTRL_SOCK)
+            
+        self._ctrl_sock.listen(0)
+        
+        # Save pidfile
+        pidfile = open(CTRL_PID, "w")
+        pidfile.write(str(os.getpid()))
+        pidfile.close()
+
+        # let the parent process know that the daemonization is finished
+        os.write(w, "\n")
+        os.close(w)
         return 1
 
     def post_daemonize(self):
-        pass
+        os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
+        # QT, for some strange reason, redefines the SIGCHILD handler to write
+        # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
+        # Server dameonization closes all file descriptors from fileno '3',
+        # but the overloaded handler (inherited by the forked process) will
+        # keep trying to write the \0 to fileno 'x', which might have been reused 
+        # after closing, for other operations. This is bad bad bad when fileno 'x'
+        # is in use for communication pouroses, because unexpected \0 start
+        # appearing in the communication messages... this is exactly what happens 
+        # when using netns in daemonized form. Thus, be have no other alternative than
+        # restoring the SIGCHLD handler to the default here.
+        import signal
+        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
 
     def loop(self):
-        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        self._ctrl_sock.bind(CTRL_SOCK)
-        self._ctrl_sock.listen(0)
         while not self._stop:
             conn, addr = self._ctrl_sock.accept()
+            self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
             conn.settimeout(5)
-            while True:
+            while not self._stop:
                 try:
                     msg = self.recv_msg(conn)
                 except socket.timeout, e:
+                    #self.log_error("SERVER recv_msg: connection timedout ")
+                    continue
+                
+                if not msg:
+                    self.log_error("CONNECTION LOST")
                     break
                     
                 if msg == STOP_MSG:
                     self._stop = True
-                    try:
-                        reply = self.stop_action()
-                    except e:
-                        sys.stderr.write("ERROR: %s\n" % sys.exc_info()[0])
-                    self.send_reply(conn, reply)
-                    break
+                    reply = self.stop_action()
                 else:
-                    try:
-                        reply = self.reply_action(msg)
-                    except e:
-                        sys.stderr.write("ERROR: %s\n" % sys.exc_info()[0])
+                    reply = self.reply_action(msg)
+                
+                try:
                     self.send_reply(conn, reply)
-            conn.close()
+                except socket.error:
+                    self.log_error()
+                    self.log_error("NOTICE: Awaiting for reconnection")
+                    break
+            try:
+                conn.close()
+            except:
+                # Doesn't matter
+                self.log_error()
 
     def recv_msg(self, conn):
-       data = conn.recv(1024)
-       decoded = base64.b64decode(data)
-       return decoded.rstrip()
+        data = [self._rdbuf]
+        chunk = data[0]
+        while '\n' not in chunk:
+            try:
+                chunk = conn.recv(1024)
+            except (OSError, socket.error), e:
+                if e[0] != errno.EINTR:
+                    raise
+                else:
+                    continue
+            if chunk:
+                data.append(chunk)
+            else:
+                # empty chunk = EOF
+                break
+        data = ''.join(data).split('\n',1)
+        while len(data) < 2:
+            data.append('')
+        data, self._rdbuf = data
+        
+        decoded = base64.b64decode(data)
+        return decoded.rstrip()
 
     def send_reply(self, conn, reply):
-       encoded = base64.b64encode(reply)
-       conn.send("%s\n" % encoded)
+        encoded = base64.b64encode(reply)
+        conn.send("%s\n" % encoded)
        
     def cleanup(self):
         try:
             self._ctrl_sock.close()
             os.remove(CTRL_SOCK)
-        except e:
-            sys.stderr.write("ERROR: %s\n" % sys.exc_info()[0])
+        except:
+            self.log_error()
 
     def stop_action(self):
         return "Stopping server"
@@ -125,18 +350,42 @@ class Server(object):
     def reply_action(self, msg):
         return "Reply to: %s" % msg
 
+    def log_error(self, text = None, context = ''):
+        if text == None:
+            text = traceback.format_exc()
+        date = time.strftime("%Y-%m-%d %H:%M:%S")
+        if context:
+            context = " (%s)" % (context,)
+        sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
+        return text
+
+    def log_debug(self, text):
+        if self._log_level == DC.DEBUG_LEVEL:
+            date = time.strftime("%Y-%m-%d %H:%M:%S")
+            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
+
 class Forwarder(object):
     def __init__(self, root_dir = "."):
         self._ctrl_sock = None
         self._root_dir = root_dir
         self._stop = False
+        self._rdbuf = ""
 
     def forward(self):
         self.connect()
+        print >>sys.stderr, "FORWARDER_READY."
         while not self._stop:
             data = self.read_data()
+            if not data:
+                # Connection to client lost
+                break
             self.send_to_server(data)
+            
             data = self.recv_from_server()
+            if not data:
+                # Connection to server lost
+                raise IOError, "Connection to server lost while "\
+                    "expecting response"
             self.write_data(data)
         self.disconnect()
 
@@ -145,13 +394,14 @@ class Forwarder(object):
 
     def write_data(self, data):
         sys.stdout.write(data)
+        # sys.stdout.write is buffered, this is why we need to do a flush()
         sys.stdout.flush()
 
     def send_to_server(self, data):
         try:
             self._ctrl_sock.send(data)
-        except IOError, e:
-            if e.errno == errno.EPIPE:
+        except (IOError, socket.error), e:
+            if e[0] == errno.EPIPE:
                 self.connect()
                 self._ctrl_sock.send(data)
             else:
@@ -162,7 +412,26 @@ class Forwarder(object):
             self._stop = True
 
     def recv_from_server(self):
-        return self._ctrl_sock.recv(1024)
+        data = [self._rdbuf]
+        chunk = data[0]
+        while '\n' not in chunk:
+            try:
+                chunk = self._ctrl_sock.recv(1024)
+            except (OSError, socket.error), e:
+                if e[0] != errno.EINTR:
+                    raise
+                continue
+            if chunk:
+                data.append(chunk)
+            else:
+                # empty chunk = EOF
+                break
+        data = ''.join(data).split('\n',1)
+        while len(data) < 2:
+            data.append('')
+        data, self._rdbuf = data
+        
+        return data+'\n'
  
     def connect(self):
         self.disconnect()
@@ -177,26 +446,694 @@ class Forwarder(object):
             pass
 
 class Client(object):
-    def __init__(self, root_dir = "."):
-        self._process = subprocess.Popen(
-                ["python", "-c", 
-                "from nepi.util import server;c=server.Forwarder('%s');\
-                        c.forward()" % root_dir
-                ],
-                stdin = subprocess.PIPE, 
-                stdout = subprocess.PIPE, 
-                env = os.environ)
+    def __init__(self, root_dir = ".", host = None, port = None, user = None, 
+            agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
+            environment_setup = ""):
+        self.root_dir = root_dir
+        self.addr = (host, port)
+        self.user = user
+        self.agent = agent
+        self.sudo = sudo
+        self.communication = communication
+        self.environment_setup = environment_setup
+        self._stopped = False
+        self._deferreds = collections.deque()
+        self.connect()
+    
+    def __del__(self):
+        if self._process.poll() is None:
+            os.kill(self._process.pid, signal.SIGTERM)
+        self._process.wait()
+        
+    def connect(self):
+        root_dir = self.root_dir
+        (host, port) = self.addr
+        user = self.user
+        agent = self.agent
+        sudo = self.sudo
+        communication = self.communication
+        
+        python_code = "from nepi.util import server;c=server.Forwarder(%r);\
+                c.forward()" % (root_dir,)
 
+        self._process = popen_python(python_code, 
+                    communication = communication,
+                    host = host, 
+                    port = port, 
+                    user = user, 
+                    agent = agent, 
+                    sudo = sudo, 
+                    environment_setup = self.environment_setup)
+               
+        # Wait for the forwarder to be ready, otherwise nobody
+        # will be able to connect to it
+        err = []
+        helo = "nope"
+        while helo:
+            helo = self._process.stderr.readline()
+            if helo == 'FORWARDER_READY.\n':
+                break
+            err.append(helo)
+        else:
+            raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
+        
     def send_msg(self, msg):
         encoded = base64.b64encode(msg)
         data = "%s\n" % encoded
-        self._process.stdin.write(data)
+        
+        try:
+            self._process.stdin.write(data)
+        except (IOError, ValueError):
+            # dead process, poll it to un-zombify
+            self._process.poll()
+            
+            # try again after reconnect
+            # If it fails again, though, give up
+            self.connect()
+            self._process.stdin.write(data)
 
     def send_stop(self):
         self.send_msg(STOP_MSG)
+        self._stopped = True
 
-    def read_reply(self):
+    def defer_reply(self, transform=None):
+        defer_entry = []
+        self._deferreds.append(defer_entry)
+        return defer.Defer(
+            functools.partial(self.read_reply, defer_entry, transform)
+        )
+        
+    def _read_reply(self):
         data = self._process.stdout.readline()
         encoded = data.rstrip() 
+        if not encoded:
+            # empty == eof == dead process, poll it to un-zombify
+            self._process.poll()
+            
+            raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
         return base64.b64decode(encoded)
+    
+    def read_reply(self, which=None, transform=None):
+        # Test to see if someone did it already
+        if which is not None and len(which):
+            # Ok, they did it...
+            # ...just return the deferred value
+            if transform:
+                return transform(which[0])
+            else:
+                return which[0]
+        
+        # Process all deferreds until the one we're looking for
+        # or until the queue is empty
+        while self._deferreds:
+            try:
+                deferred = self._deferreds.popleft()
+            except IndexError:
+                # emptied
+                break
+            
+            deferred.append(self._read_reply())
+            if deferred is which:
+                # We reached the one we were looking for
+                if transform:
+                    return transform(deferred[0])
+                else:
+                    return deferred[0]
+        
+        if which is None:
+            # They've requested a synchronous read
+            if transform:
+                return transform(self._read_reply())
+            else:
+                return self._read_reply()
+
+def _make_server_key_args(server_key, host, port, args):
+    """ 
+    Returns a reference to the created temporary file, and adds the
+    corresponding arguments to the given argument list.
+    
+    Make sure to hold onto it until the process is done with the file
+    """
+    if port is not None:
+        host = '%s:%s' % (host,port)
+    # Create a temporary server key file
+    tmp_known_hosts = tempfile.NamedTemporaryFile()
+   
+    hostbyname = gethostbyname(host) 
+
+    # Add the intended host key
+    tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
+    
+    # If we're not in strict mode, add user-configured keys
+    if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
+        user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
+        if os.access(user_hosts_path, os.R_OK):
+            f = open(user_hosts_path, "r")
+            tmp_known_hosts.write(f.read())
+            f.close()
+        
+    tmp_known_hosts.flush()
+    
+    args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
+    
+    return tmp_known_hosts
+
+def popen_ssh_command(command, host, port, user, agent, 
+        stdin="", 
+        ident_key = None,
+        server_key = None,
+        tty = False,
+        timeout = None,
+        retry = 0,
+        err_on_timeout = True,
+        connect_timeout = 60,
+        persistent = True,
+        hostip = None):
+    """
+    Executes a remote commands, returns ((stdout,stderr),process)
+    """
+   
+    tmp_known_hosts = None
+    args = ['ssh', '-C',
+            # Don't bother with localhost. Makes test easier
+            '-o', 'NoHostAuthenticationForLocalhost=yes',
+            '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
+            '-o', 'ConnectionAttempts=3',
+            '-o', 'ServerAliveInterval=30',
+            '-o', 'TCPKeepAlive=yes',
+            '-l', user, hostip or host]
+    if persistent and openssh_has_persist():
+        args.extend([
+            '-o', 'ControlMaster=auto',
+            '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
+            '-o', 'ControlPersist=60' ])
+    if agent:
+        args.append('-A')
+    if port:
+        args.append('-p%d' % port)
+    if ident_key:
+        args.extend(('-i', ident_key))
+    if tty:
+        args.append('-t')
+        args.append('-t')
+    if server_key:
+        # Create a temporary server key file
+        tmp_known_hosts = _make_server_key_args(
+            server_key, host, port, args)
+    args.append(command)
+
+    for x in xrange(retry or 3):
+        # connects to the remote host and starts a remote connection
+        proc = subprocess.Popen(args, 
+                stdout = subprocess.PIPE,
+                stdin = subprocess.PIPE, 
+                stderr = subprocess.PIPE)
+        
+        # attach tempfile object to the process, to make sure the file stays
+        # alive until the process is finished with it
+        proc._known_hosts = tmp_known_hosts
+    
+        try:
+            out, err = _communicate(proc, stdin, timeout, err_on_timeout)
+            if TRACE:
+                print "COMMAND host %s, command %s, out %s, error %s" % (host, " ".join(args), out, err)
+
+            if proc.poll():
+                if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
+                    # SSH error, can safely retry
+                    continue
+                elif :
+                    ControlSocket /tmp/nepi_ssh-inria_alina@planetlab04.cnds.unibe.ch:22 already exists, disabling multiplexing
+                    # SSH error, can safely retry (but need to delete controlpath file)
+                    # TODO: delete file
+                    continue
+                elif retry:
+                    # Probably timed out or plain failed but can retry
+                    continue
+            break
+        except RuntimeError,e:
+            if TRACE:
+                print "EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT ->  %s" % (
+                        host, " ".join(args), out, err, e.args)
+
+            if retry <= 0:
+                raise
+            retry -= 1
+        
+    return ((out, err), proc)
+
+def popen_scp(source, dest, 
+        port = None, 
+        agent = None, 
+        recursive = False,
+        ident_key = None,
+        server_key = None):
+    """
+    Copies from/to remote sites.
+    
+    Source and destination should have the user and host encoded
+    as per scp specs.
+    
+    If source is a file object, a special mode will be used to
+    create the remote file with the same contents.
+    
+    If dest is a file object, the remote file (source) will be
+    read and written into dest.
+    
+    In these modes, recursive cannot be True.
+    
+    Source can be a list of files to copy to a single destination,
+    in which case it is advised that the destination be a folder.
+    """
+    
+    if TRACE:
+        print "scp", source, dest
+    
+    if isinstance(source, file) and source.tell() == 0:
+        source = source.name
+    elif hasattr(source, 'read'):
+        tmp = tempfile.NamedTemporaryFile()
+        while True:
+            buf = source.read(65536)
+            if buf:
+                tmp.write(buf)
+            else:
+                break
+        tmp.seek(0)
+        source = tmp.name
+    
+    if isinstance(source, file) or isinstance(dest, file) \
+            or hasattr(source, 'read')  or hasattr(dest, 'write'):
+        assert not recursive
+        
+        # Parse source/destination as <user>@<server>:<path>
+        if isinstance(dest, basestring) and ':' in dest:
+            remspec, path = dest.split(':',1)
+        elif isinstance(source, basestring) and ':' in source:
+            remspec, path = source.split(':',1)
+        else:
+            raise ValueError, "Both endpoints cannot be local"
+        user,host = remspec.rsplit('@',1)
+        tmp_known_hosts = None
+        
+        args = ['ssh', '-l', user, '-C',
+                # Don't bother with localhost. Makes test easier
+                '-o', 'NoHostAuthenticationForLocalhost=yes',
+                '-o', 'ConnectTimeout=60',
+                '-o', 'ConnectionAttempts=3',
+                '-o', 'ServerAliveInterval=30',
+                '-o', 'TCPKeepAlive=yes',
+                host ]
+        if openssh_has_persist():
+            args.extend([
+                '-o', 'ControlMaster=auto',
+                '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
+                '-o', 'ControlPersist=60' ])
+        if port:
+            args.append('-P%d' % port)
+        if ident_key:
+            args.extend(('-i', ident_key))
+        if server_key:
+            # Create a temporary server key file
+            tmp_known_hosts = _make_server_key_args(
+                server_key, host, port, args)
+        
+        if isinstance(source, file) or hasattr(source, 'read'):
+            args.append('cat > %s' % (shell_escape(path),))
+        elif isinstance(dest, file) or hasattr(dest, 'write'):
+            args.append('cat %s' % (shell_escape(path),))
+        else:
+            raise AssertionError, "Unreachable code reached! :-Q"
+        
+        # connects to the remote host and starts a remote connection
+        if isinstance(source, file):
+            proc = subprocess.Popen(args, 
+                    stdout = open('/dev/null','w'),
+                    stderr = subprocess.PIPE,
+                    stdin = source)
+            err = proc.stderr.read()
+            proc._known_hosts = tmp_known_hosts
+            eintr_retry(proc.wait)()
+            return ((None,err), proc)
+        elif isinstance(dest, file):
+            proc = subprocess.Popen(args, 
+                    stdout = open('/dev/null','w'),
+                    stderr = subprocess.PIPE,
+                    stdin = source)
+            err = proc.stderr.read()
+            proc._known_hosts = tmp_known_hosts
+            eintr_retry(proc.wait)()
+            return ((None,err), proc)
+        elif hasattr(source, 'read'):
+            # file-like (but not file) source
+            proc = subprocess.Popen(args, 
+                    stdout = open('/dev/null','w'),
+                    stderr = subprocess.PIPE,
+                    stdin = subprocess.PIPE)
+            
+            buf = None
+            err = []
+            while True:
+                if not buf:
+                    buf = source.read(4096)
+                if not buf:
+                    #EOF
+                    break
+                
+                rdrdy, wrdy, broken = select.select(
+                    [proc.stderr],
+                    [proc.stdin],
+                    [proc.stderr,proc.stdin])
+                
+                if proc.stderr in rdrdy:
+                    # use os.read for fully unbuffered behavior
+                    err.append(os.read(proc.stderr.fileno(), 4096))
+                
+                if proc.stdin in wrdy:
+                    proc.stdin.write(buf)
+                    buf = None
+                
+                if broken:
+                    break
+            proc.stdin.close()
+            err.append(proc.stderr.read())
+                
+            proc._known_hosts = tmp_known_hosts
+            eintr_retry(proc.wait)()
+            return ((None,''.join(err)), proc)
+        elif hasattr(dest, 'write'):
+            # file-like (but not file) dest
+            proc = subprocess.Popen(args, 
+                    stdout = subprocess.PIPE,
+                    stderr = subprocess.PIPE,
+                    stdin = open('/dev/null','w'))
+            
+            buf = None
+            err = []
+            while True:
+                rdrdy, wrdy, broken = select.select(
+                    [proc.stderr, proc.stdout],
+                    [],
+                    [proc.stderr, proc.stdout])
+                
+                if proc.stderr in rdrdy:
+                    # use os.read for fully unbuffered behavior
+                    err.append(os.read(proc.stderr.fileno(), 4096))
+                
+                if proc.stdout in rdrdy:
+                    # use os.read for fully unbuffered behavior
+                    buf = os.read(proc.stdout.fileno(), 4096)
+                    dest.write(buf)
+                    
+                    if not buf:
+                        #EOF
+                        break
+                
+                if broken:
+                    break
+            err.append(proc.stderr.read())
+                
+            proc._known_hosts = tmp_known_hosts
+            eintr_retry(proc.wait)()
+            return ((None,''.join(err)), proc)
+        else:
+            raise AssertionError, "Unreachable code reached! :-Q"
+    else:
+        # Parse destination as <user>@<server>:<path>
+        if isinstance(dest, basestring) and ':' in dest:
+            remspec, path = dest.split(':',1)
+        elif isinstance(source, basestring) and ':' in source:
+            remspec, path = source.split(':',1)
+        else:
+            raise ValueError, "Both endpoints cannot be local"
+        user,host = remspec.rsplit('@',1)
+        
+        # plain scp
+        tmp_known_hosts = None
+        args = ['scp', '-q', '-p', '-C',
+                # Don't bother with localhost. Makes test easier
+                '-o', 'NoHostAuthenticationForLocalhost=yes',
+                '-o', 'ConnectTimeout=60',
+                '-o', 'ConnectionAttempts=3',
+                '-o', 'ServerAliveInterval=30',
+                '-o', 'TCPKeepAlive=yes' ]
+                
+        if port:
+            args.append('-P%d' % port)
+        if recursive:
+            args.append('-r')
+        if ident_key:
+            args.extend(('-i', ident_key))
+        if server_key:
+            # Create a temporary server key file
+            tmp_known_hosts = _make_server_key_args(
+                server_key, host, port, args)
+        if isinstance(source,list):
+            args.extend(source)
+        else:
+            if openssh_has_persist():
+                args.extend([
+                    '-o', 'ControlMaster=auto',
+                    '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p'])
+            args.append(source)
+        args.append(dest)
+
+        # connects to the remote host and starts a remote connection
+        proc = subprocess.Popen(args, 
+                stdout = subprocess.PIPE,
+                stdin = subprocess.PIPE, 
+                stderr = subprocess.PIPE)
+        proc._known_hosts = tmp_known_hosts
+        
+        comm = proc.communicate()
+        eintr_retry(proc.wait)()
+        return (comm, proc)
+
+def decode_and_execute():
+    # The python code we want to execute might have characters that 
+    # are not compatible with the 'inline' mode we are using. To avoid
+    # problems we receive the encoded python code in base64 as a input 
+    # stream and decode it for execution.
+    import base64, os
+    cmd = ""
+    while True:
+        try:
+            cmd += os.read(0, 1)# one byte from stdin
+        except OSError, e:            
+            if e.errno == errno.EINTR:
+                continue
+            else:
+                raise
+        if cmd[-1] == "\n": 
+            break
+    cmd = base64.b64decode(cmd)
+    # Uncomment for debug
+    #os.write(2, "Executing python code: %s\n" % cmd)
+    os.write(1, "OK\n") # send a sync message
+    exec(cmd)
+
+def popen_python(python_code, 
+        communication = DC.ACCESS_LOCAL,
+        host = None, 
+        port = None, 
+        user = None, 
+        agent = False, 
+        python_path = None,
+        ident_key = None,
+        server_key = None,
+        tty = False,
+        sudo = False, 
+        environment_setup = ""):
+
+    cmd = ""
+    if python_path:
+        python_path.replace("'", r"'\''")
+        cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
+        cmd += " ; "
+    if environment_setup:
+        cmd += environment_setup
+        cmd += " ; "
+    # Uncomment for debug (to run everything under strace)
+    # We had to verify if strace works (cannot nest them)
+    #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
+    #cmd += "$CMD "
+    #cmd += "strace -f -tt -s 200 -o strace$$.out "
+    import nepi
+    cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
+        repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
+    )
+
+    if sudo:
+        if ';' in cmd:
+            cmd = "sudo bash -c " + shell_escape(cmd)
+        else:
+            cmd = "sudo " + cmd
+
+    if communication == DC.ACCESS_SSH:
+        tmp_known_hosts = None
+        args = ['ssh', '-C',
+                # Don't bother with localhost. Makes test easier
+                '-o', 'NoHostAuthenticationForLocalhost=yes',
+                '-o', 'ConnectionAttempts=3',
+                '-o', 'ServerAliveInterval=30',
+                '-o', 'TCPKeepAlive=yes',
+                '-l', user, host]
+        if agent:
+            args.append('-A')
+        if port:
+            args.append('-p%d' % port)
+        if ident_key:
+            args.extend(('-i', ident_key))
+        if tty:
+            args.append('-t')
+        if server_key:
+            # Create a temporary server key file
+            tmp_known_hosts = _make_server_key_args(
+                server_key, host, port, args)
+        args.append(cmd)
+    else:
+        args = [ "/bin/bash", "-c", cmd ]
+
+    # connects to the remote host and starts a remote
+    proc = subprocess.Popen(args,
+            shell = False, 
+            stdout = subprocess.PIPE,
+            stdin = subprocess.PIPE, 
+            stderr = subprocess.PIPE)
+
+    if communication == DC.ACCESS_SSH:
+        proc._known_hosts = tmp_known_hosts
+
+    # send the command to execute
+    os.write(proc.stdin.fileno(),
+            base64.b64encode(python_code) + "\n")
+    while True: 
+        try:
+            msg = os.read(proc.stdout.fileno(), 3)
+            break
+        except OSError, e:            
+            if e.errno == errno.EINTR:
+                continue
+            else:
+                raise
+    
+    if msg != "OK\n":
+        raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
+            msg, proc.stdout.read(), proc.stderr.read())
+
+    return proc
+
+# POSIX
+def _communicate(self, input, timeout=None, err_on_timeout=True):
+    read_set = []
+    write_set = []
+    stdout = None # Return
+    stderr = None # Return
+    
+    killed = False
+    
+    if timeout is not None:
+        timelimit = time.time() + timeout
+        killtime = timelimit + 4
+        bailtime = timelimit + 4
+
+    if self.stdin:
+        # Flush stdio buffer.  This might block, if the user has
+        # been writing to .stdin in an uncontrolled fashion.
+        self.stdin.flush()
+        if input:
+            write_set.append(self.stdin)
+        else:
+            self.stdin.close()
+    if self.stdout:
+        read_set.append(self.stdout)
+        stdout = []
+    if self.stderr:
+        read_set.append(self.stderr)
+        stderr = []
+
+    input_offset = 0
+    while read_set or write_set:
+        if timeout is not None:
+            curtime = time.time()
+            if timeout is None or curtime > timelimit:
+                if curtime > bailtime:
+                    break
+                elif curtime > killtime:
+                    signum = signal.SIGKILL
+                else:
+                    signum = signal.SIGTERM
+                # Lets kill it
+                os.kill(self.pid, signum)
+                select_timeout = 0.5
+            else:
+                select_timeout = timelimit - curtime + 0.1
+        else:
+            select_timeout = 1.0
+        
+        if select_timeout > 1.0:
+            select_timeout = 1.0
+            
+        try:
+            rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
+        except select.error,e:
+            if e[0] != 4:
+                raise
+            else:
+                continue
+        
+        if not rlist and not wlist and not xlist and self.poll() is not None:
+            # timeout and process exited, say bye
+            break
+
+        if self.stdin in wlist:
+            # When select has indicated that the file is writable,
+            # we can write up to PIPE_BUF bytes without risk
+            # blocking.  POSIX defines PIPE_BUF >= 512
+            bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
+            input_offset += bytes_written
+            if input_offset >= len(input):
+                self.stdin.close()
+                write_set.remove(self.stdin)
+
+        if self.stdout in rlist:
+            data = os.read(self.stdout.fileno(), 1024)
+            if data == "":
+                self.stdout.close()
+                read_set.remove(self.stdout)
+            stdout.append(data)
+
+        if self.stderr in rlist:
+            data = os.read(self.stderr.fileno(), 1024)
+            if data == "":
+                self.stderr.close()
+                read_set.remove(self.stderr)
+            stderr.append(data)
+    
+    # All data exchanged.  Translate lists into strings.
+    if stdout is not None:
+        stdout = ''.join(stdout)
+    if stderr is not None:
+        stderr = ''.join(stderr)
+
+    # Translate newlines, if requested.  We cannot let the file
+    # object do the translation: It is based on stdio, which is
+    # impossible to combine with select (unless forcing no
+    # buffering).
+    if self.universal_newlines and hasattr(file, 'newlines'):
+        if stdout:
+            stdout = self._translate_newlines(stdout)
+        if stderr:
+            stderr = self._translate_newlines(stderr)
+
+    if killed and err_on_timeout:
+        errcode = self.poll()
+        raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
+    else:
+        if killed:
+            self.poll()
+        else:
+            self.wait()
+        return (stdout, stderr)