Merge TCP handshake stuff
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Sun, 4 Sep 2011 17:27:37 +0000 (19:27 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Sun, 4 Sep 2011 17:27:37 +0000 (19:27 +0200)
1  2 
src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/metadata.py
src/nepi/testbeds/planetlab/scripts/tun_connect.py
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/tunchannel.py

@@@ -139,7 -139,6 +139,7 @@@ class TunIface(object)
          
          # These get initialized when the iface is connected to any filter
          self.filter_module = None
 +        self.multicast_forwarder = None
          
          # These get initialized when the iface is configured
          self.external_iface = None
          if self.tun_cipher != 'PLAIN' and self.peer_proto not in ('udp','tcp',None):
              raise RuntimeError, "Miscofnigured TUN: %s - ciphered tunnels only work with udp or tcp links" % (self,)
      
-     def _impl_instance(self, home_path, listening):
+     def _impl_instance(self, home_path):
          impl = self._PROTO_MAP[self.peer_proto](
-             self, self.peer_iface, home_path, self.tun_key, listening)
+             self, self.peer_iface, home_path, self.tun_key)
          impl.port = self.tun_port
 +        impl.cross_slice = not self.peer_iface or isinstance(self.peer_iface, _CrossIface)
          return impl
      
      def recover(self):
          else:
              self._delay_recover = True
      
-     def prepare(self, home_path, listening):
-         if not self.peer_iface and (self.peer_proto and (listening or (self.peer_addr and self.peer_port))):
+     def prepare(self, home_path):
+         if not self.peer_iface and (self.peer_proto and self.peer_addr and self.peer_port):
              # Ad-hoc peer_iface
              self.peer_iface = _CrossIface(
                  self.peer_proto,
                  self.peer_cipher)
          if self.peer_iface:
              if not self.peer_proto_impl:
-                 self.peer_proto_impl = self._impl_instance(home_path, listening)
+                 self.peer_proto_impl = self._impl_instance(home_path)
              if self._delay_recover:
                  self.peer_proto_impl.recover()
-             else:
-                 self.peer_proto_impl.prepare()
      
-     def setup(self):
+     def launch(self):
          if self.peer_proto_impl:
-             self.peer_proto_impl.setup()
+             self.peer_proto_impl.launch()
      
      def cleanup(self):
          if self.peer_proto_impl:
              self.peer_proto_impl.destroy()
              self.peer_proto_impl = None
  
-     def async_launch_wait(self):
+     def wait(self):
          if self.peer_proto_impl:
-             self.peer_proto_impl.async_launch_wait()
+             self.peer_proto_impl.wait()
  
      def sync_trace(self, local_dir, whichtrace, tracemap = None):
          if self.peer_proto_impl:
@@@ -31,9 -31,6 +31,9 @@@ NETPIPE = "NetPipe
  TUNFILTER = "TunFilter"
  CLASSQUEUEFILTER = "ClassQueueFilter"
  TOSQUEUEFILTER = "TosQueueFilter"
 +MULTICASTFORWARDER = "MulticastForwarder"
 +MULTICASTANNOUNCER = "MulticastAnnouncer"
 +MULTICASTROUTER = "MulticastRouter"
  
  TUNFILTERS = (TUNFILTER, CLASSQUEUEFILTER, TOSQUEUEFILTER)
  TAPFILTERS = (TUNFILTER, )
@@@ -195,9 -192,9 +195,9 @@@ def crossconnect_filter_peer_both(proto
      crossconnect_filter_peer_init(proto, testbed_instance, iface_guid, peer_iface_data)
      crossconnect_filter_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data)
  
 -def connect_dep(testbed_instance, node_guid, app_guid):
 -    node = testbed_instance._elements[node_guid]
 -    app = testbed_instance._elements[app_guid]
 +def connect_dep(testbed_instance, node_guid, app_guid, node=None, app=None):
 +    node = node or testbed_instance._elements[node_guid]
 +    app = app or testbed_instance._elements[app_guid]
      app.node = node
      
      if app.depends:
      if app.rpmFusion:
          node.rpmFusion = True
  
 +def connect_forwarder(testbed_instance, node_guid, fwd_guid):
 +    node = testbed_instance._elements[node_guid]
 +    fwd = testbed_instance._elements[fwd_guid]
 +    node.multicast_forwarder = fwd
 +    
 +    if fwd.router:
 +        connect_dep(testbed_instance, node_guid, None, app=fwd.router)
 +
 +    connect_dep(testbed_instance, node_guid, fwd_guid)
 +
 +def connect_router(testbed_instance, fwd_guid, router_guid):
 +    fwd = testbed_instance._elements[fwd_guid]
 +    router = testbed_instance._elements[router_guid]
 +    fwd.router = router
 +    
 +    if fwd.node:
 +        connect_dep(testbed_instance, None, router_guid, node=fwd.node)
 +
  def connect_node_netpipe(testbed_instance, node_guid, netpipe_guid):
      node = testbed_instance._elements[node_guid]
      netpipe = testbed_instance._elements[netpipe_guid]
@@@ -351,33 -330,6 +351,33 @@@ def create_ns3_dependency(testbed_insta
      
      testbed_instance.elements[guid] = element
  
 +def create_multicast_forwarder(testbed_instance, guid):
 +    parameters = testbed_instance._get_parameters(guid)
 +    element = testbed_instance._make_multicast_forwarder(parameters)
 +    
 +    # Just inject configuration stuff
 +    element.home_path = "nepi-mcfwd-%s" % (guid,)
 +    
 +    testbed_instance.elements[guid] = element
 +
 +def create_multicast_announcer(testbed_instance, guid):
 +    parameters = testbed_instance._get_parameters(guid)
 +    element = testbed_instance._make_multicast_announcer(parameters)
 +    
 +    # Just inject configuration stuff
 +    element.home_path = "nepi-mcann-%s" % (guid,)
 +    
 +    testbed_instance.elements[guid] = element
 +
 +def create_multicast_router(testbed_instance, guid):
 +    parameters = testbed_instance._get_parameters(guid)
 +    element = testbed_instance._make_multicast_router(parameters)
 +    
 +    # Just inject configuration stuff
 +    element.home_path = "nepi-mcrt-%s" % (guid,)
 +    
 +    testbed_instance.elements[guid] = element
 +
  def create_internet(testbed_instance, guid):
      parameters = testbed_instance._get_parameters(guid)
      element = testbed_instance._make_internet(parameters)
@@@ -471,36 -423,19 +471,19 @@@ def preconfigure_tuniface(testbed_insta
      element.validate()
      
      # First-phase setup
-     if element.peer_proto:
-         if element.peer_iface and isinstance(element.peer_iface, testbed_instance._interfaces.TunIface):
-             # intra tun
-             listening = id(element) < id(element.peer_iface)
-         else:
-             # cross tun
-             if not element.tun_addr or not element.tun_port:
-                 listening = True
-             elif not element.peer_addr or not element.peer_port:
-                 listening = True
-             else:
-                 # both have addresses...
-                 # ...the one with the lesser address listens
-                 listening = element.tun_addr < element.peer_addr
-         element.prepare( 
-             'tun-%s' % (guid,),
-              listening)
+     element.prepare('tun-%s' % (guid,))
  
  def postconfigure_tuniface(testbed_instance, guid):
      element = testbed_instance._elements[guid]
      
      # Second-phase setup
-     element.setup()
+     element.launch()
      
- def wait_tuniface(testbed_instance, guid):
+ def prestart_tuniface(testbed_instance, guid):
      element = testbed_instance._elements[guid]
      
      # Second-phase setup
-     element.async_launch_wait()
-     
+     element.wait()
  
  def configure_node(testbed_instance, guid):
      node = testbed_instance._elements[guid]
@@@ -555,41 -490,6 +538,41 @@@ def configure_dependency(testbed_instan
      # Install stuff
      dep.async_setup()
  
 +def configure_announcer(testbed_instance, guid):
 +    # Link ifaces
 +    fwd = testbed_instance._elements[guid]
 +    fwd.ifaces = [ dev
 +        for node_guid in testbed_instance.get_connected(guid, "node", "apps")
 +        for dev_guid in testbed_instance.get_connected(node_guid, "devs", "node")
 +        for dev in ( testbed_instance._elements.get(dev_guid) ,)
 +        if dev and isinstance(dev, testbed_instance._interfaces.TunIface)
 +            and dev.multicast ]
 +    
 +    # Install stuff
 +    configure_dependency(testbed_instance, guid)
 +
 +def configure_forwarder(testbed_instance, guid):
 +    configure_announcer(testbed_instance, guid)
 +    
 +    # Link ifaces to forwarder
 +    fwd = testbed_instance._elements[guid]
 +    for iface in fwd.ifaces:
 +        iface.multicast_forwarder = '/var/run/mcastfwd'
 +
 +def configure_router(testbed_instance, guid):
 +    # Link ifaces
 +    rt = testbed_instance._elements[guid]
 +    rt.nonifaces = [ dev
 +        for fwd_guid in testbed_instance.get_connected(guid, "fwd", "router")
 +        for node_guid in testbed_instance.get_connected(fwd_guid, "node", "apps")
 +        for dev_guid in testbed_instance.get_connected(node_guid, "devs", "node")
 +        for dev in ( testbed_instance._elements.get(dev_guid) ,)
 +        if dev and isinstance(dev, testbed_instance._interfaces.TunIface)
 +            and not dev.multicast ]
 +    
 +    # Install stuff
 +    configure_dependency(testbed_instance, guid)
 +
  def configure_netpipe(testbed_instance, guid):
      netpipe = testbed_instance._elements[guid]
      
@@@ -636,18 -536,6 +619,18 @@@ connector_types = dict(
                  "max": 1, 
                  "min": 1
              }),
 +    "router": dict({
 +                "help": "Connector to a routing daemon", 
 +                "name": "router",
 +                "max": 1, 
 +                "min": 1
 +            }),
 +    "fwd": dict({
 +                "help": "Forwarder this routing daemon communicates with", 
 +                "name": "fwd",
 +                "max": 1, 
 +                "min": 1
 +            }),
      "pipes": dict({
                  "help": "Connector to a NetPipe", 
                  "name": "pipes",
@@@ -714,20 -602,32 +697,20 @@@ connections = 
      }),
      dict({
          "from": (TESTBED_ID, NODE, "apps"),
 -        "to":   (TESTBED_ID, APPLICATION, "node"),
 +        "to":   (TESTBED_ID, (APPLICATION, DEPENDENCY, NEPIDEPENDENCY, NS3DEPENDENCY, MULTICASTANNOUNCER), "node"),
          "init_code": connect_dep,
          "can_cross": False
      }),
      dict({
 -        "from": (TESTBED_ID, NODE, "deps"),
 -        "to":   (TESTBED_ID, DEPENDENCY, "node"),
 -        "init_code": connect_dep,
 -        "can_cross": False
 -    }),
 -    dict({
 -        "from": (TESTBED_ID, NODE, "deps"),
 -        "to":   (TESTBED_ID, NEPIDEPENDENCY, "node"),
 -        "init_code": connect_dep,
 -        "can_cross": False
 -    }),
 -    dict({
 -        "from": (TESTBED_ID, NODE, "deps"),
 -        "to":   (TESTBED_ID, NS3DEPENDENCY, "node"),
 -        "init_code": connect_dep,
 +        "from": (TESTBED_ID, NODE, "apps"),
 +        "to":   (TESTBED_ID, MULTICASTFORWARDER, "node"),
 +        "init_code": connect_forwarder,
          "can_cross": False
      }),
      dict({
 -        "from": (TESTBED_ID, NODE, "pipes"),
 -        "to":   (TESTBED_ID, NETPIPE, "node"),
 -        "init_code": connect_node_netpipe,
 +        "from": (TESTBED_ID, MULTICASTFORWARDER, "router"),
 +        "to":   (TESTBED_ID, MULTICASTROUTER, "fwd"),
 +        "init_code": connect_router,
          "can_cross": False
      }),
      dict({
@@@ -1267,15 -1167,6 +1250,15 @@@ attributes = dict(
                  "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
                  "validation_function": validation.is_string
              }),
 +    "routing_algorithm": dict({      
 +            "name": "algorithm",
 +            "help": "Routing algorithm.",
 +            "value": "dvmrp",
 +            "type": Attribute.ENUM, 
 +            "allowed": ["dvmrp"],
 +            "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
 +            "validation_function": validation.is_enum,
 +        }),
      })
  
  traces = dict({
              }),
      })
  
 -create_order = [ INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
 +create_order = [ 
 +    INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER, 
 +    MULTICASTANNOUNCER, MULTICASTFORWARDER, MULTICASTROUTER, 
 +    TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, 
 +    NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
  
 -configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
 +configure_order = [ 
 +    INTERNET, Parallel(NODE), 
 +    NODEIFACE, 
 +    Parallel(MULTICASTANNOUNCER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTROUTER), 
 +    Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, 
 +    Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
  
  # Start (and prestart) node after ifaces, because the node needs the ifaces in order to set up routes
 -start_order = [ INTERNET, NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NODE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
 +start_order = [ INTERNET, 
 +    NODEIFACE, 
 +    Parallel(TAPIFACE), Parallel(TUNIFACE), 
 +    Parallel(NODE), NETPIPE, 
 +    Parallel(MULTICASTANNOUNCER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTROUTER), 
 +    Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
  
  # cleanup order
 -shutdown_order = [ Parallel(APPLICATION), Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NETPIPE), Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), NODEIFACE, Parallel(NODE) ]
 +shutdown_order = [ 
 +    Parallel(APPLICATION), 
 +    Parallel(MULTICASTROUTER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTANNOUNCER), 
 +    Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NETPIPE), 
 +    Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), 
 +    NODEIFACE, Parallel(NODE) ]
  
  factories_info = dict({
      NODE: dict({
              "create_function": create_tuniface,
              "preconfigure_function": preconfigure_tuniface,
              "configure_function": postconfigure_tuniface,
-             "prestart_function": wait_tuniface,
+             "prestart_function": prestart_tuniface,
              "box_attributes": [
                  "up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit",
                  "txqueuelen",
              "create_function": create_tapiface,
              "preconfigure_function": preconfigure_tuniface,
              "configure_function": postconfigure_tuniface,
-             "prestart_function": wait_tuniface,
+             "prestart_function": prestart_tuniface,
              "box_attributes": [
                  "up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit",
                  "txqueuelen",
              "connector_types": ["node"],
              "traces": ["buildlog"],
          }),
 +    MULTICASTFORWARDER: dict({
 +            "help": "This application installs a userspace packet forwarder "
 +                    "that, when connected to a node, filters all packets "
 +                    "flowing through multicast-capable virtual interfaces "
 +                    "and applies custom-specified routing policies.",
 +            "category": FC.CATEGORY_APPLICATIONS,
 +            "create_function": create_multicast_forwarder,
 +            "preconfigure_function": configure_forwarder,
 +            "start_function": start_application,
 +            "status_function": status_application,
 +            "stop_function": stop_application,
 +            "box_attributes": [ ],
 +            "connector_types": ["node","router"],
 +            "traces": ["buildlog","stderr"],
 +        }),
 +    MULTICASTANNOUNCER: dict({
 +            "help": "This application installs a userspace daemon that "
 +                    "monitors multicast membership and announces it on all "
 +                    "multicast-capable interfaces.\n"
 +                    "This does not usually happen automatically on PlanetLab slivers.",
 +            "category": FC.CATEGORY_APPLICATIONS,
 +            "create_function": create_multicast_announcer,
 +            "preconfigure_function": configure_announcer,
 +            "start_function": start_application,
 +            "status_function": status_application,
 +            "stop_function": stop_application,
 +            "box_attributes": [ ],
 +            "connector_types": ["node"],
 +            "traces": ["buildlog","stderr"],
 +        }),
 +    MULTICASTROUTER: dict({
 +            "help": "This application installs a userspace daemon that "
 +                    "monitors multicast membership and announces it on all "
 +                    "multicast-capable interfaces.\n"
 +                    "This does not usually happen automatically on PlanetLab slivers.",
 +            "category": FC.CATEGORY_APPLICATIONS,
 +            "create_function": create_multicast_router,
 +            "preconfigure_function": configure_router,
 +            "start_function": start_application,
 +            "status_function": status_application,
 +            "stop_function": stop_application,
 +            "box_attributes": ["routing_algorithm"],
 +            "connector_types": ["fwd"],
 +            "traces": ["buildlog","stdout","stderr"],
 +        }),
      INTERNET: dict({
              "help": "Internet routing",
              "category": FC.CATEGORY_CHANNELS,
@@@ -19,18 -19,13 +19,18 @@@ import base6
  import traceback
  
  import tunchannel
 -import ipaddr2
 +
 +try:
 +    import iovec
 +    HAS_IOVEC = True
 +except:
 +    HAS_IOVEC = False
  
  tun_name = 'tun0'
  tun_path = '/dev/net/tun'
  hostaddr = socket.gethostbyname(socket.gethostname())
  
- usage = "usage: %prog [options] <remote-endpoint>"
+ usage = "usage: %prog [options]"
  
  parser = optparse.OptionParser(usage=usage)
  
@@@ -43,9 -38,9 +43,9 @@@ parser.add_option
      default = "/dev/net/tun",
      help = "TUN/TAP device file path or file descriptor number")
  parser.add_option(
-     "-p", "--port", dest="port", metavar="PORT", type="int",
+     "-p", "--peer-port", dest="peer_port", metavar="PEER_PORT", type="int",
      default = 15000,
-     help = "Peering TCP port to connect or listen to.")
+     help = "Remote TCP/UDP port to connect to.")
  parser.add_option(
      "--pass-fd", dest="pass_fd", metavar="UNIX_SOCKET",
      default = None,
@@@ -53,7 -48,6 +53,6 @@@
             "If given, all other connectivity options are ignored, tun_connect will "
             "simply wait to be killed after passing the file descriptor, and it will be "
             "the receiver's responsability to handle the tunneling.")
  parser.add_option(
      "-m", "--mode", dest="mode", metavar="MODE",
      default = "none",
          "by using the proper interface (tunctl for tun/tap, /vsys/fd_tuntap.control for pl-tun/pl-tap), "
          "and it will be brought up (with ifconfig for tun/tap, with /vsys/vif_up for pl-tun/pl-tap). You have "
          "to specify an VIF_ADDRESS and VIF_MASK in any case (except for none).")
+ parser.add_option(
+     "-t", "--protocol", dest="protocol", metavar="PROTOCOL",
+     default = None,
+     help = 
+         "Set protocol. One of tcp, udp, fd, gre. In any mode except none, a TUN/TAP will be created.")
  parser.add_option(
      "-A", "--vif-address", dest="vif_addr", metavar="VIF_ADDRESS",
      default = None,
@@@ -74,13 -73,18 +78,18 @@@ parser.add_option
      help = 
          "See mode. This specifies the VIF_MASK, "
          "a number indicating the network type (ie: 24 for a C-class network).")
+ parser.add_option(
+     "-P", "--port", dest="port", type="int", metavar="PORT", 
+     default = None,
+     help = 
+         "This specifies the LOCAL_PORT. This will be the local bind port for UDP/TCP.")
  parser.add_option(
      "-S", "--vif-snat", dest="vif_snat", 
      action = "store_true",
      default = False,
      help = "See mode. This specifies whether SNAT will be enabled for the virtual interface. " )
  parser.add_option(
-     "-P", "--vif-pointopoint", dest="vif_pointopoint",  metavar="DST_ADDR",
+     "-Z", "--vif-pointopoint", dest="vif_pointopoint",  metavar="DST_ADDR",
      default = None,
      help = 
          "See mode. This specifies the remote endpoint's virtual address, "
@@@ -97,11 -101,11 +106,11 @@@ parser.add_option
      help = 
          "This specifies the interface's emulated bandwidth in bytes per second." )
  parser.add_option(
-     "-u", "--udp", dest="udp", metavar="PORT", type="int",
+     "-a", "--peer-address", dest="peer_addr", metavar="PEER_ADDRESS",
      default = None,
      help = 
-         "Bind to the specified UDP port locally, and send UDP datagrams to the "
-         "remote endpoint, creating a tunnel through UDP rather than TCP." )
+         "This specifies the PEER_ADDRESS, "
+         "the IP address of the remote interface.")
  parser.add_option(
      "-k", "--key", dest="cipher_key", metavar="KEY",
      default = None,
          "Specify a symmetric encryption key with which to protect packets across "
          "the tunnel. python-crypto must be installed on the system." )
  parser.add_option(
 -    "-K", "--gre-key", dest="gre_key", metavar="KEY", type="int",
 -    default = None,
 +    "-K", "--gre-key", dest="gre_key", metavar="KEY", type="string",
 +    default = "true",
      help = 
          "Specify a demultiplexing 32-bit numeric key for GRE." )
  parser.add_option(
@@@ -129,12 -133,14 +138,12 @@@ parser.add_option
      help = "If specified, packets won't be logged to standard output, "
             "but dumped to a pcap-formatted trace in the specified file. " )
  parser.add_option(
 -    "--multicast", dest="multicast", 
 -    action = "store_true",
 -    default = False,
 -    help = "If specified, multicast packets will be forwarded and IGMP "
 -           "join/leave packets will be generated. Routing information "
 -           "must be sent to the mroute unix socket, in a format identical "
 -           "to that of the kernel's MRT ioctls, prefixed with 32-bit IOCTL "
 -           "code and 32-bit data length." )
 +    "--multicast-forwarder", dest="multicast_fwd", 
 +    default = None,
 +    help = "If specified, multicast packets will be forwarded to "
 +           "the specified unix-domain socket. If the device uses ethernet "
 +           "frames, ethernet headers will be stripped and IP packets "
 +           "will be forwarded, prefixed with the interface's address." )
  parser.add_option(
      "--filter", dest="filter_module", metavar="PATH",
      default = None,
@@@ -193,7 -199,7 +202,7 @@@ parser.add_option
      help = "If specified, packets won't be logged to standard output, "
             "but dumped to a pcap-formatted trace in the specified file. " )
  
- (options, remaining_args) = parser.parse_args(sys.argv[1:])
+ (options,args) = parser.parse_args(sys.argv[1:])
  
  options.cipher = {
      'aes' : 'AES',
@@@ -216,6 -222,65 +225,6 @@@ IFNAMSIZ = 0x0000001
  IFREQ_SZ = 0x00000028
  FIONREAD = 0x0000541b
  
 -class MulticastThread(threading.Thread):
 -    def __init__(self, *p, **kw):
 -        super(MulticastThread, self).__init__(*p, **kw)
 -        self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
 -        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
 -            socket.inet_aton(options.vif_addr) )
 -        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
 -        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
 -        self._stop = False
 -        self.setDaemon(True)
 -    
 -    def run(self):
 -        devnull = open('/dev/null','r+b')
 -        maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
 -        cur_maddr = set()
 -        lastfullrefresh = time.time()
 -        while not self._stop:
 -            # Get current subscriptions
 -            proc = subprocess.Popen(['ip','maddr','show',tun_name],
 -                stdout = subprocess.PIPE,
 -                stderr = subprocess.STDOUT,
 -                stdin = devnull)
 -            new_maddr = set()
 -            for line in proc.stdout:
 -                match = maddr_re.match(line)
 -                if match:
 -                    new_maddr.add(match.group(1))
 -            proc.wait()
 -            
 -            # Every now and then, send a full report
 -            now = time.time()
 -            report_new = new_maddr
 -            if (now - lastfullrefresh) <= 30.0:
 -                report_new = report_new - cur_maddr
 -            else:
 -                lastfullrefresh = now
 -            
 -            # Report subscriptions
 -            for grp in report_new:
 -                igmpp = ipaddr2.ipigmp(
 -                    options.vif_addr, '224.0.0.2', 1, 0x16, 0, grp, 
 -                    noipcksum=True)
 -                self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
 -
 -            # Notify group leave
 -            for grp in cur_maddr - new_maddr:
 -                igmpp = ipaddr2.ipigmp(
 -                    options.vif_addr, '224.0.0.2', 1, 0x17, 0, grp, 
 -                    noipcksum=True)
 -                self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
 -
 -            cur_maddr = new_maddr
 -            
 -            time.sleep(1)
 -    
 -    def stop(self):
 -        self._stop = True
 -        self.join(5)
 -
  class HostLock(object):
      # This class is used as a lock to prevent concurrency issues with more
      # than one instance of netns running in the same machine. Both in 
@@@ -452,8 -517,8 +461,8 @@@ def pl_vif_start(tun_path, tun_name)
      if options.vif_txqueuelen is not None:
          stdin.write("txqueuelen=%d\n" % (options.vif_txqueuelen,))
      if options.mode.startswith('pl-gre'):
 -        stdin.write("gre=%d\n" % (options.gre_key,))
 +        stdin.write("gre=%s\n" % (options.gre_key,))
-         stdin.write("remote=%s\n" % (remaining_args[0],))
+         stdin.write("remote=%s\n" % (options.peer_addr,))
      stdin.close()
      
      t.join()
@@@ -514,7 -579,7 +523,7 @@@ def tun_fwd(tun, remote, reconnect = No
          with_pi = options.mode.startswith('pl-'),
          ether_mode = tun_name.startswith('tap'),
          cipher_key = options.cipher_key,
-         udp = options.udp,
+         udp = options.protocol == 'udp',
          TERMINATE = TERMINATE,
          stderr = None,
          reconnect = reconnect,
@@@ -629,12 -694,6 +638,12 @@@ else
      filter_close = None
      queueclass = None
  
 +# install multicast forwarding hook
 +if options.multicast_fwd:
 +    print >>sys.stderr, "Connecting to mcast filter"
 +    mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
 +    tunchannel.nonblock(mcfwd_sock.fileno())
 +
  # be careful to roll back stuff on exceptions
  tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
  try:
@@@ -660,63 -719,8 +669,63 @@@ try
      tcpdump = None
      reconnect = None
      mcastthread = None
 +
 +    # install multicast forwarding hook
 +    if options.multicast_fwd:
 +        print >>sys.stderr, "Installing mcast filter"
 +        
 +        if HAS_IOVEC:
 +            writev = iovec.writev
 +        else:
 +            os_write = os.write
 +            map_ = map
 +            str_ = str
 +            def writev(fileno, *stuff):
 +                os_write(''.join(map_(str_,stuff)))
 +        
 +        def accept_packet(packet, direction, 
 +                _up_accept=accept_packet, 
 +                sock=mcfwd_sock, 
 +                sockno=mcfwd_sock.fileno(),
 +                etherProto=tunchannel.etherProto,
 +                etherStrip=tunchannel.etherStrip,
 +                etherMode=tun_name.startswith('tap'),
 +                multicast_fwd = options.multicast_fwd,
 +                vif_addr = socket.inet_aton(options.vif_addr),
 +                connected = [], writev=writev,
 +                len=len, ord=ord):
 +            if _up_accept:
 +                rv = _up_accept(packet, direction)
 +                if not rv:
 +                    return rv
 +
 +            if direction == 1:
 +                # Incoming... what?
 +                if etherMode:
 +                    if etherProto(packet)=='\x08\x00':
 +                        fwd = etherStrip(packet)
 +                    else:
 +                        fwd = None
 +                else:
 +                    fwd = packet
 +                if fwd is not None and len(fwd) >= 20:
 +                    if (ord(fwd[16]) & 0xf0) == 0xe0:
 +                        # Forward it
 +                        if not connected:
 +                            try:
 +                                sock.connect(multicast_fwd)
 +                                connected.append(None)
 +                            except:
 +                                traceback.print_exc(file=sys.stderr)
 +                        if connected:
 +                            try:
 +                                writev(sockno, vif_addr,fwd)
 +                            except:
 +                                traceback.print_exc(file=sys.stderr)
 +            return 1
 +
      
-     if options.pass_fd:
+     if options.protocol == 'fd':
          if accept_packet or filter_init:
              raise NotImplementedError, "--pass-fd and --filter are not compatible"
          
              while not TERM:
                  time.sleep(1)
          remote = None
-     elif options.mode.startswith('pl-gre'):
+     elif options.protocol == "gre":
          if accept_packet or filter_init:
              raise NotImplementedError, "--mode %s and --filter are not compatible" % (options.mode,)
          
              TERM = TERMINATE
              while not TERM:
                  time.sleep(1)
-         remote = remaining_args[0]
-     elif options.udp:
+         remote = options.peer_addr
+     elif options.protocol == "udp":
          # connect to remote endpoint
-         if remaining_args and not remaining_args[0].startswith('-'):
-             print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.udp)
-             print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
-             rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
-             retrydelay = 1.0
-             for i in xrange(30):
-                 if TERMINATE:
-                     raise OSError, "Killed"
-                 try:
-                     rsock.bind((hostaddr,options.udp))
-                     break
-                 except socket.error:
-                     # wait a while, retry
-                     print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
-                     time.sleep(min(30.0,retrydelay))
-                     retrydelay *= 1.1
-             else:
-                 rsock.bind((hostaddr,options.udp))
-             rsock.connect((remaining_args[0],options.port))
+         if options.peer_addr and options.peer_port:
+             rsock = tunchannel.udp_establish(TERMINATE, hostaddr, options.port, 
+                     options.peer_addr, options.peer_port)
+             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
          else:
              print >>sys.stderr, "Error: need a remote endpoint in UDP mode"
              raise AssertionError, "Error: need a remote endpoint in UDP mode"
-         
-         # Wait for other peer
-         tunchannel.udp_handshake(TERMINATE, rsock)
-         
-         remote = os.fdopen(rsock.fileno(), 'r+b', 0)
-     else:
+     elif options.protocol == "tcp":
          # connect to remote endpoint
-         if remaining_args and not remaining_args[0].startswith('-'):
-             print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
-             rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-             retrydelay = 1.0
-             for i in xrange(30):
-                 if TERMINATE:
-                     raise OSError, "Killed"
-                 try:
-                     rsock.connect((remaining_args[0],options.port))
-                     break
-                 except socket.error:
-                     # wait a while, retry
-                     print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
-                     time.sleep(min(30.0,retrydelay))
-                     retrydelay *= 1.1
-             else:
-                 rsock.connect((remaining_args[0],options.port))
+         if options.peer_addr and options.peer_port:
+             rsock = tunchannel.tcp_establish(TERMINATE, hostaddr, options.port,
+                     options.peer_addr, options.peer_port)
+             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
          else:
-             print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port)
-             lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-             retrydelay = 1.0
-             for i in xrange(30):
-                 if TERMINATE:
-                     raise OSError, "Killed"
-                 try:
-                     lsock.bind((hostaddr,options.port))
-                     break
-                 except socket.error:
-                     # wait a while, retry
-                     print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
-                     time.sleep(min(30.0,retrydelay))
-                     retrydelay *= 1.1
-             else:
-                 lsock.bind((hostaddr,options.port))
-             lsock.listen(1)
-             rsock,raddr = lsock.accept()
-         remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+             print >>sys.stderr, "Error: need a remote endpoint in TCP mode"
+             raise AssertionError, "Error: need a remote endpoint in TCP mode"
+     else:
+         msg = "Error: Invalid protocol %s" % options.protocol
+         print >>sys.stderr, msg 
+         raise AssertionError, msg
  
      if filter_init:
          filter_local, filter_remote = filter_init()
          # or perhaps there is no os.nice support in the system
          pass
      
 -    if options.multicast:
 -        # Start multicast forwarding daemon
 -        mcastthread = MulticastThread()
 -        mcastthread.start()
 -
      if not filter_init:
          tun_fwd(tun, remote,
              reconnect = reconnect,
@@@ -26,14 -26,11 +26,12 @@@ class TunProtoBase(object)
          self.port = 15000
          self.mode = 'pl-tun'
          self.key = key
 +        self.cross_slice = False
          
          self.home_path = home_path
-         
-         self._launcher = None
+        
          self._started = False
-         self._started_listening = False
-         self._starting = False
          self._pid = None
          self._ppid = None
          self._if_name = None
@@@ -77,7 -74,6 +75,6 @@@
          
          if proc.wait():
              raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
-         
      
      def _install_scripts(self):
          local = self.local()
          if proc.wait():
              raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
          
-     def launch(self, check_proto, listen, extra_args=[]):
-         if self._starting:
-             raise AssertionError, "Double start"
-         
-         self._starting = True
-         
+     def launch(self, check_proto):
          peer = self.peer()
          local = self.local()
          
          
          peer_port = peer.tun_port
          peer_addr = peer.tun_addr
-         peer_proto= peer.tun_proto
-         peer_cipher=peer.tun_cipher
+         peer_proto = peer.tun_proto
+         peer_cipher = peer.tun_cipher
          
          local_port = self.port
          local_cap  = local.capture
          local_snat = local.snat
          local_txq  = local.txqueuelen
          local_p2p  = local.pointopoint
 -        local_cipher = local.tun_cipher
 -        local_mcast = local.multicast
 -        local_bwlim = local.bwlimit
 +        local_cipher=local.tun_cipher
 +        local_mcast= local.multicast
 +        local_bwlim= local.bwlimit
 +        local_mcastfwd = local.multicast_forwarder
          
          if not local_p2p and hasattr(peer, 'address'):
              local_p2p = peer.address
          if local_cipher != peer_cipher:
              raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
          
-         if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
-             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
-         
-         if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
-             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
          if check_proto == 'gre' and local_cipher.lower() != 'plain':
              raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,)
  
          
          args = ["python", "tun_connect.py", 
              "-m", str(self.mode),
+             "-t", str(check_proto),
              "-A", str(local_addr),
              "-M", str(local_mask),
 -            "-C", str(local_cipher)]
 +            "-C", str(local_cipher),
 +            ]
          
          if check_proto == 'fd':
              passfd_arg = str(peer_addr)
                  "--pass-fd", passfd_arg
              ])
          elif check_proto == 'gre':
 -                "-K", str(min(local_port, peer_port)),
 +            if self.cross_slice:
 +                args.extend([
 +                    "-K", str(self.key.strip('='))
 +                ])
++
+             args.extend([
+                 "-a", str(peer_addr),
+             ])
+         # both udp and tcp
          else:
              args.extend([
-                 "-p", str(local_port if listen else peer_port),
+                 "-P", str(local_port),
+                 "-p", str(peer_port),
+                 "-a", str(peer_addr),
                  "-k", str(self.key)
              ])
          
          if local_snat:
              args.append("-S")
          if local_p2p:
-             args.extend(("-P",str(local_p2p)))
+             args.extend(("-Z",str(local_p2p)))
          if local_txq:
              args.extend(("-Q",str(local_txq)))
          if not local_cap:
              args.append("-N")
          elif local_cap == 'pcap':
              args.extend(('-c','pcap'))
 -        if local_mcast:
 -            args.append("--multicast")
          if local_bwlim:
              args.extend(("-b",str(local_bwlim*1024)))
-         if extra_args:
-             args.extend(map(str,extra_args))
-         if not listen and check_proto != 'fd':
-             args.append(str(peer_addr))
          if filter_module:
              args.extend(("--filter", filter_module))
          if filter_args:
              args.extend(("--filter-args", filter_args))
 +        if local_mcast and local_mcastfwd:
 +            args.extend(("--multicast-forwarder", local_mcastfwd))
  
          self._logger.info("Starting %s", self)
          
          
          if proc.wait():
              raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
-         
+        
          self._started = True
      
      def recover(self):
          # Tunnel should be still running in its node
          # Just check its pidfile and we're done
          self._started = True
-         self._started_listening = True
          self.checkpid()
      
-     def _launch_and_wait(self, *p, **kw):
-         try:
-             self.__launch_and_wait(*p, **kw)
-         except:
-             if self._launcher:
-                 import sys
-                 self._launcher._exc.append(sys.exc_info())
-             else:
-                 raise
-             
-     def __launch_and_wait(self, *p, **kw):
+     def wait(self):
          local = self.local()
          
-         self.launch(*p, **kw)
-         
-         # Wait for the process to be started
-         while self.status() == rspawn.NOT_STARTED:
-             time.sleep(1.0)
-         
          # Wait for the connection to be established
          retrytime = 2.0
          for spin in xrange(30):
              
              # Connected?
              (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
 -                "cd %(home)s ; grep -c Connected capture" % dict(
 +                "cd %(home)s ; grep -a -c Connected capture" % dict(
                      home = server.shell_escape(self.home_path)),
                  host = local.node.hostname,
                  port = None,
  
              # At least listening?
              (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
 -                "cd %(home)s ; grep -c Listening capture" % dict(
 +                "cd %(home)s ; grep -a -c Listening capture" % dict(
                      home = server.shell_escape(self.home_path)),
                  host = local.node.hostname,
                  port = None,
                  )
              proc.wait()
  
-             if out.strip() == '1':
-                 self._started_listening = True
-             
              time.sleep(min(30.0, retrytime))
              retrytime *= 1.1
          else:
              # Inspect the trace to check the assigned iface
              local = self.local()
              if local:
 -                cmd = "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
 +                cmd = "cd %(home)s ; grep -a 'Using tun:' capture | head -1" % dict(
                              home = server.shell_escape(self.home_path))
                  for spin in xrange(30):
                      (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
                      return True
          return False
      
-     def async_launch(self, check_proto, listen, extra_args=[]):
-         if not self._started and not self._launcher:
-             self._launcher = threading.Thread(
-                 target = self._launch_and_wait,
-                 args = (check_proto, listen, extra_args))
-             self._launcher._exc = []
-             self._launcher.start()
-     
-     def async_launch_wait(self):
-         if self._launcher:
-             self._launcher.join()
-             if self._launcher._exc:
-                 exctyp,exval,exctrace = self._launcher._exc[0]
-                 raise exctyp,exval,exctrace
-             elif not self._started:
-                 raise RuntimeError, "Failed to launch TUN forwarder"
-         elif not self._started:
-             self.launch()
-     def async_launch_wait_listening(self):
-         if self._launcher:
-             for x in xrange(180):
-                 if self._launcher._exc:
-                     exctyp,exval,exctrace = self._launcher._exc[0]
-                     raise exctyp,exval,exctrace
-                 elif self._started and self._started_listening:
-                     break
-                 time.sleep(1)
-         elif not self._started:
-             self.launch()
      def checkpid(self):            
          local = self.local()
          
      def remote_trace_path(self, whichtrace, tracemap = None):
          tracemap = self._TRACEMAP if not tracemap else tracemap
          
-         
          if whichtrace not in tracemap:
              return None
          
          
          return local_path
          
-         
-     def prepare(self):
-         """
-         First-phase setup
-         
-         eg: set up listening ports
-         """
-         raise NotImplementedError
-     
-     def setup(self):
-         """
-         Second-phase setup
-         
-         eg: connect to peer
-         """
-         raise NotImplementedError
-     
      def shutdown(self):
-         """
-         Cleanup
-         """
-         raise NotImplementedError
+         self.kill()
      
      def destroy(self):
-         """
-         Second-phase cleanup
-         """
-         pass
-         
+         self.waitkill()
  
  class TunProtoUDP(TunProtoBase):
-     def __init__(self, local, peer, home_path, key, listening):
+     def __init__(self, local, peer, home_path, key):
          super(TunProtoUDP, self).__init__(local, peer, home_path, key)
-         self.listening = listening
-     
-     def prepare(self):
-         pass
      
-     def setup(self):
-         self.async_launch('udp', False, ("-u",str(self.port)))
-     
-     def shutdown(self):
-         self.kill()
-     def destroy(self):
-         self.waitkill()
-     def launch(self, check_proto='udp', listen=False, extra_args=None):
-         if extra_args is None:
-             extra_args = ("-u",str(self.port))
-         super(TunProtoUDP, self).launch(check_proto, listen, extra_args)
+     def launch(self):
+         super(TunProtoUDP, self).launch('udp')
  
  class TunProtoFD(TunProtoBase):
-     def __init__(self, local, peer, home_path, key, listening):
+     def __init__(self, local, peer, home_path, key):
          super(TunProtoFD, self).__init__(local, peer, home_path, key)
-         self.listening = listening
-     
-     def prepare(self):
-         pass
      
-     def setup(self):
-         self.async_launch('fd', False)
-     
-     def shutdown(self):
-         self.kill()
-     def destroy(self):
-         self.waitkill()
-     def launch(self, check_proto='fd', listen=False, extra_args=[]):
-         super(TunProtoFD, self).launch(check_proto, listen, extra_args)
+     def launch(self):
+         super(TunProtoFD, self).launch('fd')
  
  class TunProtoGRE(TunProtoBase):
-     def __init__(self, local, peer, home_path, key, listening):
+     def __init__(self, local, peer, home_path, key):
          super(TunProtoGRE, self).__init__(local, peer, home_path, key)
-         self.listening = listening
          self.mode = 'pl-gre-ip'
-     
-     def prepare(self):
-         pass
-     
-     def setup(self):
-         self.async_launch('gre', False)
-     
-     def shutdown(self):
-         self.kill()
-     def destroy(self):
-         self.waitkill()
  
-     def launch(self, check_proto='gre', listen=False, extra_args=[]):
-         super(TunProtoGRE, self).launch(check_proto, listen, extra_args)
+     def launch(self):
+         super(TunProtoGRE, self).launch('gre')
  
  class TunProtoTCP(TunProtoBase):
-     def __init__(self, local, peer, home_path, key, listening):
+     def __init__(self, local, peer, home_path, key):
          super(TunProtoTCP, self).__init__(local, peer, home_path, key)
-         self.listening = listening
-     
-     def prepare(self):
-         if self.listening:
-             self.async_launch('tcp', True)
-     
-     def setup(self):
-         if not self.listening:
-             # make sure our peer is ready
-             peer = self.peer()
-             if peer and peer.peer_proto_impl:
-                 peer.peer_proto_impl.async_launch_wait_listening()
-             
-             if not self._started:
-                 self.async_launch('tcp', False)
-         
-         self.checkpid()
      
-     def shutdown(self):
-         self.kill()
-     def destroy(self):
-         self.waitkill()
-     def launch(self, check_proto='tcp', listen=None, extra_args=[]):
-         if listen is None:
-             listen = self.listening
-         super(TunProtoTCP, self).launch(check_proto, listen, extra_args)
+     def launch(self):
+         super(TunProtoTCP, self).launch('tcp')
  
  class TapProtoUDP(TunProtoUDP):
-     def __init__(self, local, peer, home_path, key, listening):
-         super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
+     def __init__(self, local, peer, home_path, key):
+         super(TapProtoUDP, self).__init__(local, peer, home_path, key)
          self.mode = 'pl-tap'
  
  class TapProtoTCP(TunProtoTCP):
-     def __init__(self, local, peer, home_path, key, listening):
-         super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
+     def __init__(self, local, peer, home_path, key):
+         super(TapProtoTCP, self).__init__(local, peer, home_path, key)
          self.mode = 'pl-tap'
  
  class TapProtoFD(TunProtoFD):
-     def __init__(self, local, peer, home_path, key, listening):
-         super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
+     def __init__(self, local, peer, home_path, key):
+         super(TapProtoFD, self).__init__(local, peer, home_path, key)
          self.mode = 'pl-tap'
  
  class TapProtoGRE(TunProtoGRE):
-     def __init__(self, local, peer, home_path, key, listening):
-         super(TapProtoGRE, self).__init__(local, peer, home_path, key, listening)
+     def __init__(self, local, peer, home_path, key):
+         super(TapProtoGRE, self).__init__(local, peer, home_path, key)
          self.mode = 'pl-gre-eth'
  
  TUN_PROTO_MAP = {
      'tcp' : TunProtoTCP,
      'udp' : TunProtoUDP,
@@@ -815,4 -653,3 +660,3 @@@ TAP_PROTO_MAP = 
      'gre' : TapProtoGRE,
  }
  
@@@ -6,6 -6,7 +6,7 @@@ import socke
  import threading
  import errno
  import fcntl
+ import random
  import traceback
  import functools
  import collections
@@@ -118,15 -119,15 +119,15 @@@ def _pullPacket(buf, ether_mode=False, 
              rv = buf.popleft()
          return rv
  
 -def etherStrip(buf):
 +def etherStrip(buf, buffer=buffer, len=len):
      if len(buf) < 14:
          return ""
      if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
          # tagged ethernet frame
 -        return buf[18:]
 +        return buffer(buf, 18)
      elif buf[12:14] == '\x08\x00':
          # untagged ethernet frame
 -        return buf[14:]
 +        return buffer(buf, 14)
      else:
          return ""
  
@@@ -197,6 -198,7 +198,7 @@@ def tun_fwd(tun, remote, with_pi, ether
          retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
      crypto_mode = False
      crypter = None
      try:
          if cipher_key and cipher:
              import Crypto.Cipher
          if rread is None:
              def rread(remote, maxlen, os_read=os.read):
                  return os_read(remote_fd, maxlen)
-     
+  
      rnonblock = nonblock(remote)
      tnonblock = nonblock(tun)
      
      
      remoteok = True
      
+     
      while not TERMINATE:
          wset = []
          if packetReady(bkbuf):
              if e.args[0] == errno.EINTR:
                  # just retry
                  continue
+             else:
+                 raise
  
          # check for errors
          if errs:
                  try:
                      for x in xrange(maxbatch):
                          packet = rread(remote,2000)
+                         
                          #rr += 1
                          
                          if crypto_mode:
          
          #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
  
+ def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+     rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
+     retrydelay = 1.0
+     for i in xrange(30):
+         # TERMINATE is a array. An item can be added to TERMINATE, from
+         # outside this function to force termination of the loop
+         if TERMINATE:
+             raise OSError, "Killed"
+         try:
+             rsock.bind((local_addr, local_port))
+             break
+         except socket.error:
+             # wait a while, retry
+             print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+             time.sleep(min(30.0,retrydelay))
+             retrydelay *= 1.1
+     else:
+         rsock.bind((local_addr, local_port))
+     print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port)
+     print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port)
+     rsock.connect((peer_addr, peer_port))
+     return rsock
  
  def udp_handshake(TERMINATE, rsock):
      endme = False
      endme = True
      keepalive_thread.join()
  
+ def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+     rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
+             peer_port)
+     udp_handshake(TERMINATE, rsock)
+     return rsock 
+ def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
+     sock = None
+     retrydelay = 1.0
+     # The peer has a firewall that prevents a response to the connect, we 
+     # will be forever blocked in the connect, so we put a reasonable timeout.
+     rsock.settimeout(10) 
+     # We wait for 
+     for i in xrange(30):
+         if stop:
+             break
+         if TERMINATE:
+             raise OSError, "Killed"
+         try:
+             rsock.connect((peer_addr, peer_port))
+             sock = rsock
+             break
+         except socket.error:
+             # wait a while, retry
+             print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
+             time.sleep(min(30.0,retrydelay))
+             retrydelay *= 1.1
+     else:
+         rsock.connect((peer_addr, peer_port))
+         sock = rsock
+     if sock:
+         print >>sys.stderr, "tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port)
+         sock.settimeout(0) 
+     return sock
+ def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
+     sock = None
+     retrydelay = 1.0
+     # We try to bind to the local virtual interface. 
+     # It might not exist yet so we wait in a loop.
+     for i in xrange(30):
+         if stop:
+             break
+         if TERMINATE:
+             raise OSError, "Killed"
+         try:
+             lsock.bind((local_addr, local_port))
+             break
+         except socket.error:
+             # wait a while, retry
+             print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+             time.sleep(min(30.0,retrydelay))
+             retrydelay *= 1.1
+     else:
+         lsock.bind((local_addr, local_port))
+     print >>sys.stderr, "tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port)
+     # Now we wait until the other side connects. 
+     # The other side might not be ready yet, so we also wait in a loop for timeouts.
+     timeout = 1
+     lsock.listen(1)
+     for i in xrange(30):
+         if TERMINATE:
+             raise OSError, "Killed"
+         rlist, wlist, xlist = select.select([lsock], [], [], timeout)
+         if stop:
+             break
+         if lsock in rlist:
+             sock,raddr = lsock.accept()
+             print >>sys.stderr, "tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port)
+             break
+         timeout += 5
+     return sock
+ def tcp_handshake(rsock, listen, hand):
+     # we are going to use a barrier algorithm to decide wich side listen.
+     # each side will "roll a dice" and send the resulting value to the other 
+     # side. 
+     win = False
+     rsock.settimeout(10)
+     try:
+         rsock.send(hand)
+         peer_hand = rsock.recv(1)
+         print >>sys.stderr, "tcp_handshake: hand %s, peer_hand %s" % (hand, peer_hand)
+         if hand < peer_hand:
+             if listen:
+                 win = True
+         elif hand > peer_hand:
+             if not listen:
+                 win = True
+     except socket.timeout:
+         pass
+     rsock.settimeout(0)
+     return win
+ def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+     def listen(stop, hand, lsock, lresult):
+         win = False
+         rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
+         if rsock:
+             win = tcp_handshake(rsock, True, hand)
+             stop.append(True)
+         lresult.append((win, rsock))
+     def connect(stop, hand, rsock, rresult):
+         win = False
+         rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
+         if rsock:
+             win = tcp_handshake(rsock, False, hand)
+             stop.append(True)
+         rresult.append((win, rsock))
+   
+     end = False
+     sock = None
+     while not end:
+         if TERMINATE:
+             raise OSError, "Killed"
+         hand = str(random.randint(1, 6))
+         stop = []
+         lresult = []
+         rresult = []
+         lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+         rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+         listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult))
+         connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult))
+         connect_thread.start()
+         listen_thread.start()
+         connect_thread.join()
+         listen_thread.join()
+         (lwin, lrsock) = lresult[0]
+         (rwin, rrsock) = rresult[0]
+         if not lrsock or not rrsock:
+             if not lrsock:
+                 sock = rrsock
+             if not rrsock:
+                 sock = lrsock
+             end = True
+         # both socket are connected
+         else:
+            if lwin:
+                 sock = lrsock
+                 end = True
+            elif rwin: 
+                 sock = rrsock
+                 end = True
+     if not sock:
+         raise OSError, "Error: tcp_establish could not establish connection."
+     return sock