Merging with HEAD
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
index a9f4888..0ea63b9 100644 (file)
@@ -6,11 +6,15 @@ import os
 import os.path
 import rspawn
 import subprocess
+import threading
+import base64
+import time
+import re
 
 from nepi.util import server
 
 class TunProtoBase(object):
-    def __init__(self, local, peer, home_path):
+    def __init__(self, local, peer, home_path, key):
         # Weak references, since ifaces do have a reference to the
         # tunneling protocol implementation - we don't want strong
         # circular references.
@@ -19,12 +23,15 @@ class TunProtoBase(object):
         
         self.port = 15000
         self.mode = 'pl-tun'
+        self.key = key
         
         self.home_path = home_path
         
+        self._launcher = None
         self._started = False
         self._pid = None
         self._ppid = None
+        self._if_name = None
 
     def _make_home(self):
         local = self.local()
@@ -60,32 +67,44 @@ class TunProtoBase(object):
             raise RuntimeError, "Unconnected TUN - missing node"
         
         # Install the tun_connect script and tunalloc utility
-        source = os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py')
+        from nepi.util import tunchannel
+        sources = [
+            os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
+            os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
+            re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
+        ]
         dest = "%s@%s:%s" % (
             local.node.slicename, local.node.hostname, 
             os.path.join(self.home_path,'.'),)
         (out,err),proc = server.popen_scp(
-            source,
+            sources,
             dest,
             ident_key = local.node.ident_path,
             server_key = local.node.server_key
             )
     
         if proc.wait():
-            raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (source, out,err,)
-
-        source = os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c')
-        (out,err),proc = server.popen_scp(
-            source,
-            dest,
-            ident_key = local.node.ident_path,
-            server_key = local.node.server_key
-            )
-    
-        if proc.wait():
-            raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (source, out,err,)
+            raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
+        
+        # Make sure all dependencies are satisfied
+        local.node.wait_dependencies()
 
-        cmd = "cd %s && gcc -shared tunalloc.c -o tunalloc.so" % (server.shell_escape(self.home_path),)
+        cmd = ( (
+            "cd %(home)s && gcc -fPIC -shared tunalloc.c -o tunalloc.so"
+            + ( " && "
+                "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
+                "mkdir -p python-passfd && "
+                "cd python-passfd && "
+                "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
+                "python setup.py build && "
+                "python setup.py install --install-lib .. "
+                
+                if local.tun_proto == "fd" else ""
+            ) )
+        % {
+            'home' : server.shell_escape(self.home_path),
+            'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
+        } )
         (out,err),proc = server.popen_ssh_command(
             cmd,
             host = local.node.hostname,
@@ -99,7 +118,6 @@ class TunProtoBase(object):
         if proc.wait():
             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
         
-    
     def launch(self, check_proto, listen, extra_args=[]):
         peer = self.peer()
         local = self.local()
@@ -117,29 +135,52 @@ class TunProtoBase(object):
         local_mask = local.netprefix
         local_snat = local.snat
         local_txq  = local.txqueuelen
+        local_p2p  = local.pointopoint
         
+        if not local_p2p and hasattr(peer, 'address'):
+            local_p2p = peer.address
+
         if check_proto != peer_proto:
             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
         
-        if not listen and (not peer_port or not peer_addr):
+        if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
         
-        if listen and (not local_port or not local_addr or not local_mask):
+        if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
         
         args = ["python", "tun_connect.py", 
             "-m", str(self.mode),
-            "-p", str(local_port if listen else peer_port),
             "-A", str(local_addr),
             "-M", str(local_mask)]
         
+        if check_proto == 'fd':
+            passfd_arg = str(peer_addr)
+            if passfd_arg.startswith('\x00'):
+                # cannot shell_encode null characters :(
+                passfd_arg = "base64:"+base64.b64encode(passfd_arg)
+            else:
+                passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
+            args.extend([
+                "--pass-fd", passfd_arg
+            ])
+        else:
+            args.extend([
+                "-p", str(local_port if listen else peer_port),
+                "-k", str(self.key)
+            ])
+        
         if local_snat:
             args.append("-S")
+        if local_p2p:
+            args.extend(("-P",str(local_p2p)))
         if local_txq:
             args.extend(("-Q",str(local_txq)))
+        if not local_cap:
+            args.append("-N")
         if extra_args:
             args.extend(map(str,extra_args))
-        if not listen:
+        if not listen and check_proto != 'fd':
             args.append(str(peer_addr))
         
         self._make_home()
@@ -153,7 +194,7 @@ class TunProtoBase(object):
             pidfile = './pid',
             home = self.home_path,
             stdin = '/dev/null',
-            stdout = 'capture' if local_cap else '/dev/null',
+            stdout = 'capture',
             stderr = rspawn.STDOUT,
             sudo = True,
             
@@ -166,9 +207,85 @@ class TunProtoBase(object):
             )
         
         if proc.wait():
-            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
-
+            raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
+        
         self._started = True
+    
+    def _launch_and_wait(self, *p, **kw):
+        local = self.local()
+        
+        self.launch(*p, **kw)
+        
+        # Wait for the process to be started
+        while self.status() == rspawn.NOT_STARTED:
+            time.sleep(1.0)
+        
+        # Wait for the connection to be established
+        for spin in xrange(30):
+            if self.status() != rspawn.RUNNING:
+                break
+            
+            (out,err),proc = server.popen_ssh_command(
+                "cd %(home)s ; grep -c Connected capture" % dict(
+                    home = server.shell_escape(self.home_path)),
+                host = local.node.hostname,
+                port = None,
+                user = local.node.slicename,
+                agent = None,
+                ident_key = local.node.ident_path,
+                server_key = local.node.server_key
+                )
+            
+            if proc.wait():
+                break
+            
+            if out.strip() != '0':
+                break
+            
+            time.sleep(1.0)
+    
+    @property
+    def if_name(self):
+        if not self._if_name:
+            # Inspect the trace to check the assigned iface
+            local = self.local()
+            if local:
+                for spin in xrange(30):
+                    (out,err),proc = server.popen_ssh_command(
+                        "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
+                            home = server.shell_escape(self.home_path)),
+                        host = local.node.hostname,
+                        port = None,
+                        user = local.node.slicename,
+                        agent = None,
+                        ident_key = local.node.ident_path,
+                        server_key = local.node.server_key
+                        )
+                    
+                    if proc.wait():
+                        return
+                    
+                    out = out.strip()
+                    
+                    match = re.match(r"Using +tun: +([-a-zA-Z0-9]*) +.*",out)
+                    if match:
+                        self._if_name = match.group(1)
+        return self._if_name
+    
+    def async_launch(self, check_proto, listen, extra_args=[]):
+        if not self._launcher:
+            self._launcher = threading.Thread(
+                target = self._launch_and_wait,
+                args = (check_proto, listen, extra_args))
+            self._launcher.start()
+    
+    def async_launch_wait(self):
+        if self._launcher:
+            self._launcher.join()
+            if not self._started:
+                raise RuntimeError, "Failed to launch TUN forwarder"
+        elif not self._started:
+            self.launch()
 
     def checkpid(self):            
         local = self.local()
@@ -295,40 +412,100 @@ class TunProtoBase(object):
         
 
 class TunProtoUDP(TunProtoBase):
-    def __init__(self, local, peer, home_path, listening):
-        super(TunProtoTCP, self).__init__(local, peer, home_path)
+    def __init__(self, local, peer, home_path, key, listening):
+        super(TunProtoUDP, self).__init__(local, peer, home_path, key)
+        self.listening = listening
+    
+    def prepare(self):
+        pass
+    
+    def setup(self):
+        self.async_launch('udp', False, ("-u",str(self.port)))
+    
+    def shutdown(self):
+        self.kill()
+
+    def launch(self, check_proto='udp', listen=False, extra_args=None):
+        if extra_args is None:
+            extra_args = ("-u",str(self.port))
+        super(TunProtoUDP, self).launch(check_proto, listen, extra_args)
+
+class TunProtoFD(TunProtoBase):
+    def __init__(self, local, peer, home_path, key, listening):
+        super(TunProtoFD, self).__init__(local, peer, home_path, key)
         self.listening = listening
     
     def prepare(self):
         pass
     
     def setup(self):
-        self.launch('udp', False, ("-U",))
+        self.async_launch('fd', False)
     
     def shutdown(self):
         self.kill()
 
+    def launch(self, check_proto='fd', listen=False, extra_args=[]):
+        super(TunProtoFD, self).launch(check_proto, listen, extra_args)
+
 class TunProtoTCP(TunProtoBase):
-    def __init__(self, local, peer, home_path, listening):
-        super(TunProtoTCP, self).__init__(local, peer, home_path)
+    def __init__(self, local, peer, home_path, key, listening):
+        super(TunProtoTCP, self).__init__(local, peer, home_path, key)
         self.listening = listening
     
     def prepare(self):
         if self.listening:
-            self.launch('tcp', True)
+            self.async_launch('tcp', True)
     
     def setup(self):
         if not self.listening:
-            self.launch('tcp', False)
+            # make sure our peer is ready
+            peer = self.peer()
+            if peer and peer.peer_proto_impl:
+                peer.peer_proto_impl.async_launch_wait()
+            
+            if not self._started:
+                self.launch('tcp', False)
+        else:
+            # make sure WE are ready
+            self.async_launch_wait()
         
         self.checkpid()
     
     def shutdown(self):
         self.kill()
 
-PROTO_MAP = {
+    def launch(self, check_proto='tcp', listen=None, extra_args=[]):
+        if listen is None:
+            listen = self.listening
+        super(TunProtoTCP, self).launch(check_proto, listen, extra_args)
+
+class TapProtoUDP(TunProtoUDP):
+    def __init__(self, local, peer, home_path, key, listening):
+        super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
+        self.mode = 'pl-tap'
+
+class TapProtoTCP(TunProtoTCP):
+    def __init__(self, local, peer, home_path, key, listening):
+        super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
+        self.mode = 'pl-tap'
+
+class TapProtoFD(TunProtoFD):
+    def __init__(self, local, peer, home_path, key, listening):
+        super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
+        self.mode = 'pl-tap'
+
+
+
+TUN_PROTO_MAP = {
     'tcp' : TunProtoTCP,
     'udp' : TunProtoUDP,
+    'fd'  : TunProtoFD,
+}
+
+TAP_PROTO_MAP = {
+    'tcp' : TapProtoTCP,
+    'udp' : TapProtoUDP,
+    'fd'  : TapProtoFD,
 }