From: Alina Quereilhac Date: Tue, 30 Aug 2011 17:54:26 +0000 (+0200) Subject: Added TCP-handshake for TunChannel and tun_connect.py X-Git-Tag: nepi-3.0.0~253^2~2 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=4bea9f9369373d6f6be4b4b781de6d615bfaf610;p=nepi.git Added TCP-handshake for TunChannel and tun_connect.py --- diff --git a/examples/roads09.py b/examples/roads09.py index 772eec02..47b42b20 100644 --- a/examples/roads09.py +++ b/examples/roads09.py @@ -110,7 +110,7 @@ class Roads09Ns3PLExample(object): def add_ns3_fdnd(self, node, ns3_desc): fdnd = ns3_desc.create("ns3::FdNetDevice") node.connector("devs").connect(fdnd.connector("node")) - fdnd.enable_trace("FileDescriptorPcapTrace") + fdnd.enable_trace("FdPcapTrace") return fdnd def add_ns3_node(self, ns3_desc): diff --git a/src/nepi/core/testbed_impl.py b/src/nepi/core/testbed_impl.py index a08533b4..4ee31107 100644 --- a/src/nepi/core/testbed_impl.py +++ b/src/nepi/core/testbed_impl.py @@ -329,8 +329,14 @@ class TestbedController(execute.TestbedController): cross_connector_type_name, True) if connect_code: - self._logger.debug("Cross-connect: guid: %d, connect_code: %s " % ( - guid, connect_code.func.__name__)) + if hasattr(connect_code, "func"): + func_name = connect_code.func.__name__ + elif hasattr(connect_code, "__name__"): + func_name = connect_code.__name__ + else: + func_name = repr(connect_code) + self._logger.debug("Cross-connect - guid: %d, connect_code: %s " % ( + guid, func_name)) elem_cross_data = cross_data[cross_testbed_guid][cross_guid] connect_code(self, guid, elem_cross_data) diff --git a/src/nepi/testbeds/netns/execute.py b/src/nepi/testbeds/netns/execute.py index 0a2c93dd..020b97f1 100644 --- a/src/nepi/testbeds/netns/execute.py +++ b/src/nepi/testbeds/netns/execute.py @@ -123,7 +123,7 @@ class TestbedController(testbed_impl.TestbedController): trace.close() for guid, element in self._elements.iteritems(): if isinstance(element, self.TunChannel): - element.Cleanup() + element.cleanup() else: factory_id = self._create[guid] if factory_id == "Node": diff --git a/src/nepi/testbeds/netns/metadata.py b/src/nepi/testbeds/netns/metadata.py index c4ddc610..dc875a31 100644 --- a/src/nepi/testbeds/netns/metadata.py +++ b/src/nepi/testbeds/netns/metadata.py @@ -10,7 +10,7 @@ from nepi.util.constants import ApplicationStatus as AS, \ from nepi.util.tunchannel_impl import \ preconfigure_tunchannel, postconfigure_tunchannel, \ - wait_tunchannel, create_tunchannel, \ + prestart_tunchannel, create_tunchannel, \ crossconnect_tunchannel_peer_init, \ crossconnect_tunchannel_peer_compl @@ -530,7 +530,7 @@ factories_info = dict({ "create_function": create_tunchannel, "preconfigure_function": preconfigure_tunchannel, "configure_function": postconfigure_tunchannel, - "prestart_function": wait_tunchannel, + "prestart_function": prestart_tunchannel, "help": "Channel to forward "+TAPIFACE+" data to " "other TAP interfaces supporting the NEPI tunneling protocol.", "connector_types": ["->fd", "udp", "tcp"], diff --git a/src/nepi/testbeds/ns3/connection_metadata.py b/src/nepi/testbeds/ns3/connection_metadata.py index 9c338c30..62818063 100644 --- a/src/nepi/testbeds/ns3/connection_metadata.py +++ b/src/nepi/testbeds/ns3/connection_metadata.py @@ -145,7 +145,7 @@ def connect_tunchannel_fd(testbed_instance, tun_guid, fdnd_guid): # Store a reference to the endpoint to keep the socket alive fdnd._endpoint_socket = sock1 fdnd.SetFileDescriptor(sock1.fileno()) - + # Send the other endpoint to the TUN channel tun.tun_socket = sock2 @@ -154,6 +154,7 @@ def connect_tunchannel_fd(testbed_instance, tun_guid, fdnd_guid): # the default presence of PI headers) tun.with_pi = True + ### Connector information ### connector_types = dict({ diff --git a/src/nepi/testbeds/ns3/execute.py b/src/nepi/testbeds/ns3/execute.py index 928002fd..0635f262 100644 --- a/src/nepi/testbeds/ns3/execute.py +++ b/src/nepi/testbeds/ns3/execute.py @@ -164,7 +164,7 @@ class TestbedController(testbed_impl.TestbedController): for element in self._elements.itervalues(): if isinstance(element, self.LOCAL_TYPES): # graceful shutdown of locally-implemented objects - element.Cleanup() + element.cleanup() if self.ns3: if not self.ns3.Simulator.IsFinished(): self.stop() diff --git a/src/nepi/testbeds/ns3/factories_metadata.py b/src/nepi/testbeds/ns3/factories_metadata.py index ace0f271..f45dca40 100644 --- a/src/nepi/testbeds/ns3/factories_metadata.py +++ b/src/nepi/testbeds/ns3/factories_metadata.py @@ -6,7 +6,7 @@ from nepi.util.constants import AF_INET, ApplicationStatus as AS, \ FactoryCategories as FC from nepi.util.tunchannel_impl import \ preconfigure_tunchannel, postconfigure_tunchannel, \ - wait_tunchannel, create_tunchannel + prestart_tunchannel, create_tunchannel import re wifi_standards = dict({ @@ -1057,7 +1057,7 @@ factories_info = dict({ "create_function": create_tunchannel, "preconfigure_function": preconfigure_tunchannel, "configure_function": postconfigure_tunchannel, - "prestart_function": wait_tunchannel, + "prestart_function": prestart_tunchannel, "help": "Channel to forward FdNetDevice data to " "other TAP interfaces supporting the NEPI tunneling protocol.", "connector_types": ["fd->", "udp", "tcp"], diff --git a/src/nepi/testbeds/planetlab/interfaces.py b/src/nepi/testbeds/planetlab/interfaces.py index c853a378..1f313c14 100644 --- a/src/nepi/testbeds/planetlab/interfaces.py +++ b/src/nepi/testbeds/planetlab/interfaces.py @@ -238,9 +238,9 @@ class TunIface(object): if self.tun_cipher != 'PLAIN' and self.peer_proto not in ('udp','tcp',None): raise RuntimeError, "Miscofnigured TUN: %s - ciphered tunnels only work with udp or tcp links" % (self,) - def _impl_instance(self, home_path, listening): + def _impl_instance(self, home_path): impl = self._PROTO_MAP[self.peer_proto]( - self, self.peer_iface, home_path, self.tun_key, listening) + self, self.peer_iface, home_path, self.tun_key) impl.port = self.tun_port return impl @@ -253,8 +253,8 @@ class TunIface(object): else: self._delay_recover = True - def prepare(self, home_path, listening): - if not self.peer_iface and (self.peer_proto and (listening or (self.peer_addr and self.peer_port))): + def prepare(self, home_path): + if not self.peer_iface and (self.peer_proto and self.peer_addr and self.peer_port): # Ad-hoc peer_iface self.peer_iface = _CrossIface( self.peer_proto, @@ -263,15 +263,13 @@ class TunIface(object): self.peer_cipher) if self.peer_iface: if not self.peer_proto_impl: - self.peer_proto_impl = self._impl_instance(home_path, listening) + self.peer_proto_impl = self._impl_instance(home_path) if self._delay_recover: self.peer_proto_impl.recover() - else: - self.peer_proto_impl.prepare() - def setup(self): + def launch(self): if self.peer_proto_impl: - self.peer_proto_impl.setup() + self.peer_proto_impl.launch() def cleanup(self): if self.peer_proto_impl: @@ -282,9 +280,9 @@ class TunIface(object): self.peer_proto_impl.destroy() self.peer_proto_impl = None - def async_launch_wait(self): + def wait(self): if self.peer_proto_impl: - self.peer_proto_impl.async_launch_wait() + self.peer_proto_impl.wait() def sync_trace(self, local_dir, whichtrace, tracemap = None): if self.peer_proto_impl: diff --git a/src/nepi/testbeds/planetlab/metadata.py b/src/nepi/testbeds/planetlab/metadata.py index 98c0f423..7754c0a3 100644 --- a/src/nepi/testbeds/planetlab/metadata.py +++ b/src/nepi/testbeds/planetlab/metadata.py @@ -423,36 +423,19 @@ def preconfigure_tuniface(testbed_instance, guid): element.validate() # First-phase setup - if element.peer_proto: - if element.peer_iface and isinstance(element.peer_iface, testbed_instance._interfaces.TunIface): - # intra tun - listening = id(element) < id(element.peer_iface) - else: - # cross tun - if not element.tun_addr or not element.tun_port: - listening = True - elif not element.peer_addr or not element.peer_port: - listening = True - else: - # both have addresses... - # ...the one with the lesser address listens - listening = element.tun_addr < element.peer_addr - element.prepare( - 'tun-%s' % (guid,), - listening) + element.prepare('tun-%s' % (guid,)) def postconfigure_tuniface(testbed_instance, guid): element = testbed_instance._elements[guid] # Second-phase setup - element.setup() + element.launch() -def wait_tuniface(testbed_instance, guid): +def prestart_tuniface(testbed_instance, guid): element = testbed_instance._elements[guid] # Second-phase setup - element.async_launch_wait() - + element.wait() def configure_node(testbed_instance, guid): node = testbed_instance._elements[guid] @@ -1272,7 +1255,7 @@ factories_info = dict({ "create_function": create_tuniface, "preconfigure_function": preconfigure_tuniface, "configure_function": postconfigure_tuniface, - "prestart_function": wait_tuniface, + "prestart_function": prestart_tuniface, "box_attributes": [ "up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit", "txqueuelen", @@ -1288,7 +1271,7 @@ factories_info = dict({ "create_function": create_tapiface, "preconfigure_function": preconfigure_tuniface, "configure_function": postconfigure_tuniface, - "prestart_function": wait_tuniface, + "prestart_function": prestart_tuniface, "box_attributes": [ "up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit", "txqueuelen", diff --git a/src/nepi/testbeds/planetlab/scripts/tun_connect.py b/src/nepi/testbeds/planetlab/scripts/tun_connect.py index dce367bf..08d50502 100644 --- a/src/nepi/testbeds/planetlab/scripts/tun_connect.py +++ b/src/nepi/testbeds/planetlab/scripts/tun_connect.py @@ -25,7 +25,7 @@ tun_name = 'tun0' tun_path = '/dev/net/tun' hostaddr = socket.gethostbyname(socket.gethostname()) -usage = "usage: %prog [options] " +usage = "usage: %prog [options]" parser = optparse.OptionParser(usage=usage) @@ -38,9 +38,9 @@ parser.add_option( default = "/dev/net/tun", help = "TUN/TAP device file path or file descriptor number") parser.add_option( - "-p", "--port", dest="port", metavar="PORT", type="int", + "-p", "--peer-port", dest="peer-port", metavar="PEER_PORT", type="int", default = 15000, - help = "Peering TCP port to connect or listen to.") + help = "Remote TCP/UDP port to connect to.") parser.add_option( "--pass-fd", dest="pass_fd", metavar="UNIX_SOCKET", default = None, @@ -48,7 +48,6 @@ parser.add_option( "If given, all other connectivity options are ignored, tun_connect will " "simply wait to be killed after passing the file descriptor, and it will be " "the receiver's responsability to handle the tunneling.") - parser.add_option( "-m", "--mode", dest="mode", metavar="MODE", default = "none", @@ -57,6 +56,11 @@ parser.add_option( "by using the proper interface (tunctl for tun/tap, /vsys/fd_tuntap.control for pl-tun/pl-tap), " "and it will be brought up (with ifconfig for tun/tap, with /vsys/vif_up for pl-tun/pl-tap). You have " "to specify an VIF_ADDRESS and VIF_MASK in any case (except for none).") +parser.add_option( + "-t", "--protocol", dest="protocol", metavar="PROTOCOL", + default = None, + help = + "Set protocol. One of tcp, udp, fd, gre. In any mode except none, a TUN/TAP will be created.") parser.add_option( "-A", "--vif-address", dest="vif_addr", metavar="VIF_ADDRESS", default = None, @@ -69,13 +73,18 @@ parser.add_option( help = "See mode. This specifies the VIF_MASK, " "a number indicating the network type (ie: 24 for a C-class network).") +parser.add_option( + "-P", "--port", dest="port", type="int", metavar="PORT", + default = None, + help = + "This specifies the LOCAL_PORT. This will be the local bind port for UDP/TCP.") parser.add_option( "-S", "--vif-snat", dest="vif_snat", action = "store_true", default = False, help = "See mode. This specifies whether SNAT will be enabled for the virtual interface. " ) parser.add_option( - "-P", "--vif-pointopoint", dest="vif_pointopoint", metavar="DST_ADDR", + "-Z", "--vif-pointopoint", dest="vif_pointopoint", metavar="DST_ADDR", default = None, help = "See mode. This specifies the remote endpoint's virtual address, " @@ -92,11 +101,11 @@ parser.add_option( help = "This specifies the interface's emulated bandwidth in bytes per second." ) parser.add_option( - "-u", "--udp", dest="udp", metavar="PORT", type="int", + "-a", "--peer-address", dest="peer_addr", metavar="PEER_ADDRESS", default = None, help = - "Bind to the specified UDP port locally, and send UDP datagrams to the " - "remote endpoint, creating a tunnel through UDP rather than TCP." ) + "This specifies the PEER_ADDRESS, " + "the IP address of the remote interface.") parser.add_option( "-k", "--key", dest="cipher_key", metavar="KEY", default = None, @@ -190,7 +199,7 @@ parser.add_option( help = "If specified, packets won't be logged to standard output, " "but dumped to a pcap-formatted trace in the specified file. " ) -(options, remaining_args) = parser.parse_args(sys.argv[1:]) +(options,) = parser.parse_args(sys.argv[1:]) options.cipher = { 'aes' : 'AES', @@ -570,7 +579,7 @@ def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = with_pi = options.mode.startswith('pl-'), ether_mode = tun_name.startswith('tap'), cipher_key = options.cipher_key, - udp = options.udp, + udp = options.protocol == 'udp', TERMINATE = TERMINATE, stderr = None, reconnect = reconnect, @@ -711,7 +720,7 @@ try: reconnect = None mcastthread = None - if options.pass_fd: + if options.protocol == 'fd': if accept_packet or filter_init: raise NotImplementedError, "--pass-fd and --filter are not compatible" @@ -749,7 +758,7 @@ try: while not TERM: time.sleep(1) remote = None - elif options.mode.startswith('pl-gre'): + elif options.protocol == "gre": if accept_packet or filter_init: raise NotImplementedError, "--mode %s and --filter are not compatible" % (options.mode,) @@ -759,75 +768,27 @@ try: TERM = TERMINATE while not TERM: time.sleep(1) - remote = remaining_args[0] - elif options.udp: + remote = options.peer_addr + elif options.protocol == "udp": # connect to remote endpoint - if remaining_args and not remaining_args[0].startswith('-'): - print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.udp) - print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port) - rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) - retrydelay = 1.0 - for i in xrange(30): - if TERMINATE: - raise OSError, "Killed" - try: - rsock.bind((hostaddr,options.udp)) - break - except socket.error: - # wait a while, retry - print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),) - time.sleep(min(30.0,retrydelay)) - retrydelay *= 1.1 - else: - rsock.bind((hostaddr,options.udp)) - rsock.connect((remaining_args[0],options.port)) + if options.peer_addr and options.peer_port: + remote = tunchannel.udp_establish(TERMINATE, hostaddr, + options.port, options.peer_addr, options.peer_port) else: print >>sys.stderr, "Error: need a remote endpoint in UDP mode" raise AssertionError, "Error: need a remote endpoint in UDP mode" - - # Wait for other peer - tunchannel.udp_handshake(TERMINATE, rsock) - - remote = os.fdopen(rsock.fileno(), 'r+b', 0) - else: + elif options.protocol == "tcp": # connect to remote endpoint - if remaining_args and not remaining_args[0].startswith('-'): - print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port) - rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - retrydelay = 1.0 - for i in xrange(30): - if TERMINATE: - raise OSError, "Killed" - try: - rsock.connect((remaining_args[0],options.port)) - break - except socket.error: - # wait a while, retry - print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),) - time.sleep(min(30.0,retrydelay)) - retrydelay *= 1.1 - else: - rsock.connect((remaining_args[0],options.port)) + if options.peer_addr and options.peer_port: + remote = tunchannel.tcp_establish(TERMINATE, hostaddr, + options.port, options.peer_addr, options.peer_port) else: - print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port) - lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - retrydelay = 1.0 - for i in xrange(30): - if TERMINATE: - raise OSError, "Killed" - try: - lsock.bind((hostaddr,options.port)) - break - except socket.error: - # wait a while, retry - print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),) - time.sleep(min(30.0,retrydelay)) - retrydelay *= 1.1 - else: - lsock.bind((hostaddr,options.port)) - lsock.listen(1) - rsock,raddr = lsock.accept() - remote = os.fdopen(rsock.fileno(), 'r+b', 0) + print >>sys.stderr, "Error: need a remote endpoint in TCP mode" + raise AssertionError, "Error: need a remote endpoint in TCP mode" + else: + msg = "Error: Invalid protocol %s" % options.protocol + print >>sys.stderr, msg + raise AssertionError, msg if filter_init: filter_local, filter_remote = filter_init() diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index 84e2d9c6..0f4537a6 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -28,11 +28,9 @@ class TunProtoBase(object): self.key = key self.home_path = home_path - - self._launcher = None + self._started = False - self._started_listening = False - self._starting = False + self._pid = None self._ppid = None self._if_name = None @@ -76,7 +74,6 @@ class TunProtoBase(object): if proc.wait(): raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,) - def _install_scripts(self): local = self.local() @@ -182,12 +179,7 @@ class TunProtoBase(object): if proc.wait(): raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,) - def launch(self, check_proto, listen, extra_args=[]): - if self._starting: - raise AssertionError, "Double start" - - self._starting = True - + def launch(self, check_proto): peer = self.peer() local = self.local() @@ -196,8 +188,8 @@ class TunProtoBase(object): peer_port = peer.tun_port peer_addr = peer.tun_addr - peer_proto= peer.tun_proto - peer_cipher=peer.tun_cipher + peer_proto = peer.tun_proto + peer_cipher = peer.tun_cipher local_port = self.port local_cap = local.capture @@ -206,9 +198,9 @@ class TunProtoBase(object): local_snat = local.snat local_txq = local.txqueuelen local_p2p = local.pointopoint - local_cipher=local.tun_cipher - local_mcast= local.multicast - local_bwlim= local.bwlimit + local_cipher = local.tun_cipher + local_mcast = local.multicast + local_bwlim = local.bwlimit if not local_p2p and hasattr(peer, 'address'): local_p2p = peer.address @@ -219,12 +211,6 @@ class TunProtoBase(object): if local_cipher != peer_cipher: raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher) - if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr): - raise RuntimeError, "Misconfigured peer: %s" % (peer,) - - if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask): - raise RuntimeError, "Misconfigured TUN: %s" % (local,) - if check_proto == 'gre' and local_cipher.lower() != 'plain': raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,) @@ -242,6 +228,7 @@ class TunProtoBase(object): args = ["python", "tun_connect.py", "-m", str(self.mode), + "-t", str(check_proto), "-A", str(local_addr), "-M", str(local_mask), "-C", str(local_cipher)] @@ -258,18 +245,22 @@ class TunProtoBase(object): ]) elif check_proto == 'gre': args.extend([ - "-K", str(min(local_port, peer_port)) + "-K", str(min(local_port, peer_port)), + "-a", str(peer_addr), ]) + # both udp and tcp else: args.extend([ - "-p", str(local_port if listen else peer_port), + "-P", str(local_port), + "-p", str(peer_port), + "-a", str(peer_addr), "-k", str(self.key) ]) if local_snat: args.append("-S") if local_p2p: - args.extend(("-P",str(local_p2p))) + args.extend(("-Z",str(local_p2p))) if local_txq: args.extend(("-Q",str(local_txq))) if not local_cap: @@ -280,10 +271,6 @@ class TunProtoBase(object): args.append("--multicast") if local_bwlim: args.extend(("-b",str(local_bwlim*1024))) - if extra_args: - args.extend(map(str,extra_args)) - if not listen and check_proto != 'fd': - args.append(str(peer_addr)) if filter_module: args.extend(("--filter", filter_module)) if filter_args: @@ -316,35 +303,18 @@ class TunProtoBase(object): if proc.wait(): raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,) - + self._started = True def recover(self): # Tunnel should be still running in its node # Just check its pidfile and we're done self._started = True - self._started_listening = True self.checkpid() - def _launch_and_wait(self, *p, **kw): - try: - self.__launch_and_wait(*p, **kw) - except: - if self._launcher: - import sys - self._launcher._exc.append(sys.exc_info()) - else: - raise - - def __launch_and_wait(self, *p, **kw): + def wait(self): local = self.local() - self.launch(*p, **kw) - - # Wait for the process to be started - while self.status() == rspawn.NOT_STARTED: - time.sleep(1.0) - # Wait for the connection to be established retrytime = 2.0 for spin in xrange(30): @@ -385,9 +355,6 @@ class TunProtoBase(object): ) proc.wait() - if out.strip() == '1': - self._started_listening = True - time.sleep(min(30.0, retrytime)) retrytime *= 1.1 else: @@ -476,38 +443,6 @@ class TunProtoBase(object): return True return False - def async_launch(self, check_proto, listen, extra_args=[]): - if not self._started and not self._launcher: - self._launcher = threading.Thread( - target = self._launch_and_wait, - args = (check_proto, listen, extra_args)) - self._launcher._exc = [] - self._launcher.start() - - def async_launch_wait(self): - if self._launcher: - self._launcher.join() - - if self._launcher._exc: - exctyp,exval,exctrace = self._launcher._exc[0] - raise exctyp,exval,exctrace - elif not self._started: - raise RuntimeError, "Failed to launch TUN forwarder" - elif not self._started: - self.launch() - - def async_launch_wait_listening(self): - if self._launcher: - for x in xrange(180): - if self._launcher._exc: - exctyp,exval,exctrace = self._launcher._exc[0] - raise exctyp,exval,exctrace - elif self._started and self._started_listening: - break - time.sleep(1) - elif not self._started: - self.launch() - def checkpid(self): local = self.local() @@ -605,7 +540,6 @@ class TunProtoBase(object): def remote_trace_path(self, whichtrace, tracemap = None): tracemap = self._TRACEMAP if not tracemap else tracemap - if whichtrace not in tracemap: return None @@ -650,153 +584,61 @@ class TunProtoBase(object): return local_path - - def prepare(self): - """ - First-phase setup - - eg: set up listening ports - """ - raise NotImplementedError - - def setup(self): - """ - Second-phase setup - - eg: connect to peer - """ - raise NotImplementedError - def shutdown(self): - """ - Cleanup - """ - raise NotImplementedError + self.kill() def destroy(self): - """ - Second-phase cleanup - """ - pass - + self.waitkill() class TunProtoUDP(TunProtoBase): - def __init__(self, local, peer, home_path, key, listening): + def __init__(self, local, peer, home_path, key): super(TunProtoUDP, self).__init__(local, peer, home_path, key) - self.listening = listening - - def prepare(self): - pass - - def setup(self): - self.async_launch('udp', False, ("-u",str(self.port))) - def shutdown(self): - self.kill() - - def destroy(self): - self.waitkill() - - def launch(self, check_proto='udp', listen=False, extra_args=None): - if extra_args is None: - extra_args = ("-u",str(self.port)) - super(TunProtoUDP, self).launch(check_proto, listen, extra_args) + def launch(self): + super(TunProtoUDP, self).launch('udp') class TunProtoFD(TunProtoBase): - def __init__(self, local, peer, home_path, key, listening): + def __init__(self, local, peer, home_path, key): super(TunProtoFD, self).__init__(local, peer, home_path, key) - self.listening = listening - - def prepare(self): - pass - - def setup(self): - self.async_launch('fd', False) - def shutdown(self): - self.kill() - - def destroy(self): - self.waitkill() - - def launch(self, check_proto='fd', listen=False, extra_args=[]): - super(TunProtoFD, self).launch(check_proto, listen, extra_args) + def launch(self): + super(TunProtoFD, self).launch('fd') class TunProtoGRE(TunProtoBase): - def __init__(self, local, peer, home_path, key, listening): + def __init__(self, local, peer, home_path, key): super(TunProtoGRE, self).__init__(local, peer, home_path, key) - self.listening = listening self.mode = 'pl-gre-ip' - - def prepare(self): - pass - - def setup(self): - self.async_launch('gre', False) - - def shutdown(self): - self.kill() - def destroy(self): - self.waitkill() - - def launch(self, check_proto='gre', listen=False, extra_args=[]): - super(TunProtoGRE, self).launch(check_proto, listen, extra_args) + def launch(self): + super(TunProtoGRE, self).launch('gre') class TunProtoTCP(TunProtoBase): - def __init__(self, local, peer, home_path, key, listening): + def __init__(self, local, peer, home_path, key): super(TunProtoTCP, self).__init__(local, peer, home_path, key) - self.listening = listening - def prepare(self): - if self.listening: - self.async_launch('tcp', True) - - def setup(self): - if not self.listening: - # make sure our peer is ready - peer = self.peer() - if peer and peer.peer_proto_impl: - peer.peer_proto_impl.async_launch_wait_listening() - - if not self._started: - self.async_launch('tcp', False) - - self.checkpid() - - def shutdown(self): - self.kill() - - def destroy(self): - self.waitkill() - - def launch(self, check_proto='tcp', listen=None, extra_args=[]): - if listen is None: - listen = self.listening - super(TunProtoTCP, self).launch(check_proto, listen, extra_args) + def launch(self): + super(TunProtoTCP, self).launch('tcp') class TapProtoUDP(TunProtoUDP): - def __init__(self, local, peer, home_path, key, listening): - super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening) + def __init__(self, local, peer, home_path, key): + super(TapProtoUDP, self).__init__(local, peer, home_path, key) self.mode = 'pl-tap' class TapProtoTCP(TunProtoTCP): - def __init__(self, local, peer, home_path, key, listening): - super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening) + def __init__(self, local, peer, home_path, key): + super(TapProtoTCP, self).__init__(local, peer, home_path, key) self.mode = 'pl-tap' class TapProtoFD(TunProtoFD): - def __init__(self, local, peer, home_path, key, listening): - super(TapProtoFD, self).__init__(local, peer, home_path, key, listening) + def __init__(self, local, peer, home_path, key): + super(TapProtoFD, self).__init__(local, peer, home_path, key) self.mode = 'pl-tap' class TapProtoGRE(TunProtoGRE): - def __init__(self, local, peer, home_path, key, listening): - super(TapProtoGRE, self).__init__(local, peer, home_path, key, listening) + def __init__(self, local, peer, home_path, key): + super(TapProtoGRE, self).__init__(local, peer, home_path, key) self.mode = 'pl-gre-eth' - - TUN_PROTO_MAP = { 'tcp' : TunProtoTCP, 'udp' : TunProtoUDP, @@ -811,4 +653,3 @@ TAP_PROTO_MAP = { 'gre' : TapProtoGRE, } - diff --git a/src/nepi/util/tunchannel.py b/src/nepi/util/tunchannel.py index c44803f9..fc627df4 100644 --- a/src/nepi/util/tunchannel.py +++ b/src/nepi/util/tunchannel.py @@ -6,6 +6,7 @@ import socket import threading import errno import fcntl +import random import traceback import functools import collections @@ -197,6 +198,7 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ): crypto_mode = False crypter = None + try: if cipher_key and cipher: import Crypto.Cipher @@ -233,7 +235,7 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr if rread is None: def rread(remote, maxlen, os_read=os.read): return os_read(remote_fd, maxlen) - + rnonblock = nonblock(remote) tnonblock = nonblock(tun) @@ -326,6 +328,7 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr remoteok = True + while not TERMINATE: wset = [] if packetReady(bkbuf): @@ -350,6 +353,8 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr if e.args[0] == errno.EINTR: # just retry continue + else: + raise # check for errors if errs: @@ -460,6 +465,7 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr try: for x in xrange(maxbatch): packet = rread(remote,2000) + #rr += 1 if crypto_mode: @@ -506,6 +512,28 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt) +def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port): + rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + retrydelay = 1.0 + for i in xrange(30): + # TERMINATE is a array. An item can be added to TERMINATE, from + # outside this function to force termination of the loop + if TERMINATE: + raise OSError, "Killed" + try: + rsock.bind((local_addr, local_port)) + break + except socket.error: + # wait a while, retry + print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 + else: + rsock.bind((local_addr, local_port)) + print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port) + print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port) + rsock.connect((peer_addr, peer_port)) + return rsock def udp_handshake(TERMINATE, rsock): endme = False @@ -537,3 +565,176 @@ def udp_handshake(TERMINATE, rsock): endme = True keepalive_thread.join() +def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port): + rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr, + peer_port) + udp_handshake(TERMINATE, rsock) + return rsock + +def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port): + sock = None + retrydelay = 1.0 + # The peer has a firewall that prevents a response to the connect, we + # will be forever blocked in the connect, so we put a reasonable timeout. + sock.settimeout(10) + # We wait for + for i in xrange(30): + if stop: + break + if TERMINATE: + raise OSError, "Killed" + try: + rsock.connect((peer_addr, peer_port)) + sock = rsock + break + except socket.error: + # wait a while, retry + print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 + else: + rsock.connect((peer_addr, peer_port)) + sock = rsock + if sock: + sock.settimeout(0) + return sock + +def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port): + sock = None + retrydelay = 1.0 + # We try to bind to the local virtual interface. + # It might not exist yet so we wait in a loop. + for i in xrange(30): + if stop: + break + if TERMINATE: + raise OSError, "Killed" + try: + lsock.bind((local_addr, local_port)) + break + except socket.error: + # wait a while, retry + print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 + else: + lsock.bind((local_addr, local_port)) + + # Now we wait until the other side connects. + # The other side might not be ready yet, so we also wait in a loop for timeouts. + timeout = 1 + lsock.listen(1) + for i in xrange(30): + if TERMINATE: + raise OSError, "Killed" + rlist, wlist, xlist = select.select([lsock], [], [], timeout) + if stop: + break + if lsock in rlist: + sock,raddr = lsock.accept() + break + timeout += 5 + return sock + +def tcp_handshake(TERMINATE, sock, listen, dice): + # we are going to use a barrier algorithm to decide wich side listen. + # each side will "roll a dice" and send the resulting value to the other + # side. + result = None + sock.settimeout(5) + for i in xrange(100): + if TERMINATE: + raise OSError, "Killed" + try: + hand = dice.read() + sock.send(hand) + peer_hand = sock.recv(1) + if hand < peer_hand: + if listen: + result = sock + break + elif hand > peer_hand: + if not listen: + result = sock + break + else: + dice.release() + dice.throw() + except socket.error: + dice.release() + break + if result: + sock.settimeout(0) + return result + +def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port): + def listen(stop, result, lsock, dice): + lsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port) + if lsock: + lsock = tcp_handshake(TERMINATE, lsock, True, dice) + if lsock: + stop[0] = True + result[0] = lsock + + def connect(stop, result, rsock, dice): + rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port) + if rsock: + rsock = tcp_handshake(TERMINATE, rsock, False, dice) + if sock: + stop[0] = True + result[0] = rsock + + dice = Dice() + dice.throw() + stop = [] + result = [] + lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + connect_thread = threading.Thread(target=connect, args=(stop, result, rsock, dice)) + listen_thread = threading.Thread(target=listen, args=(stop, result, lsock, dice)) + connect_thread.start() + listen_thread.start() + connect_thread.join() + listen_thread.join() + if not result: + raise OSError, "Error: tcp_establish could not establish connection." + sock = result[0] + if sock == lsock: + rsock.close() + else: + lsock.close() + return sock + +class Dice(object): + def __init__(self): + self._condition = threading.Condition(threading.Lock()) + self._readers = 0 + self._value = None + + def read(self): + self._condition.acquire() + try: + self._readers += 1 + finally: + self._condition.release() + return self._value + + def release(self): + self._condition.acquire() + try: + if self._readers > 0: + self._readers -= 1 + if self._readers == 0: + self._condition.notifyAll() + finally: + self._condition.release() + + def throw(self): + self._condition.acquire() + try: + while self._readers > 0: + self._condition.wait() + self._value = random.randint(1, 6) + finally: + self._condition.release() + diff --git a/src/nepi/util/tunchannel_impl.py b/src/nepi/util/tunchannel_impl.py index 1a38aef1..c358429b 100644 --- a/src/nepi/util/tunchannel_impl.py +++ b/src/nepi/util/tunchannel_impl.py @@ -7,7 +7,7 @@ import select import weakref import time -from tunchannel import tun_fwd, udp_handshake +from tunchannel import tun_fwd, udp_establish, tcp_establish class TunChannel(object): """ @@ -30,19 +30,12 @@ class TunChannel(object): tun_key: the agreed upon encryption key. - listen: if set to True (and in TCP mode), it marks a - listening endpoint. Be certain that any TCP connection - is made between a listening and a non-listening - endpoint, or it won't work. - with_pi: set if the incoming packet stream (see tun_socket) contains PI headers - if so, they will be stripped. ethernet_mode: set if the incoming packet stream is composed of ethernet frames (as opposed of IP packets). - udp: set to use UDP datagrams instead of TCP connections. - tun_socket: a socket or file object that can be read from and written to. Packets will be read when available, remote packets will be forwarded as writes. @@ -57,7 +50,6 @@ class TunChannel(object): def __init__(self): # Some operational attributes - self.listen = False self.ethernet_mode = True self.with_pi = False @@ -85,7 +77,7 @@ class TunChannel(object): self._exc = [] # exception store, to relay exceptions from the forwarder thread self._connected = threading.Event() self._forwarder_thread = None - + # trace to stderr self.stderr = sys.stderr @@ -109,23 +101,20 @@ class TunChannel(object): self.tun_cipher, ) - def Prepare(self): - if self.tun_proto: - udp = self.tun_proto == "udp" - if not udp and self.listen and not self._forwarder_thread: - if self.listen or (self.peer_addr and self.peer_port and self.peer_proto): - self._launch() - - def Setup(self): + def launch(self): + # self.tun_proto is only set if the channel is connected + # launch has to be a no-op in unconnected channels because + # it is called at configuration time, which for cross connections + # happens before connection. if self.tun_proto: if not self._forwarder_thread: self._launch() - def Cleanup(self): + def cleanup(self): if self._forwarder_thread: - self.Kill() + self.kill() - def Wait(self): + def wait(self): if self._forwarder_thread: self._connected.wait() for exc in self._exc: @@ -133,7 +122,7 @@ class TunChannel(object): eTyp, eVal, eLoc = exc raise eTyp, eVal, eLoc - def Kill(self): + def kill(self): if self._forwarder_thread: if not self._terminate: self._terminate.append(None) @@ -186,66 +175,31 @@ class TunChannel(object): if local_cipher != peer_cipher: raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher) - udp = local_proto == 'udp' - listen = self.listen - - if (udp or not listen) and (not peer_port or not peer_addr): + if not peer_port or not peer_addr: raise RuntimeError, "Misconfigured peer for: %s" % (self,) - if (udp or listen) and (not local_port or not local_addr): + if not local_port or not local_addr: raise RuntimeError, "Misconfigured TUN: %s" % (self,) TERMINATE = self._terminate cipher_key = self.tun_key tun = self.tun_socket - + udp = local_proto == 'udp' + if not tun: raise RuntimeError, "Unconnected TUN channel %s" % (self,) - - if udp: - # listen on udp port - rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) - for i in xrange(30): - try: - rsock.bind((local_addr,local_port)) - break - except socket.error: - # wait a while, retry - time.sleep(1) - else: - rsock.bind((local_addr,local_port)) - rsock.connect((peer_addr,peer_port)) - udp_handshake(TERMINATE, rsock) + + if local_proto == 'udp': + rsock = udp_establish(TERMINATE, local_addr, local_port, + peer_addr, peer_port) remote = os.fdopen(rsock.fileno(), 'r+b', 0) - elif listen: - # accept tcp connections - lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - for i in xrange(30): - try: - lsock.bind((local_addr,local_port)) - break - except socket.error: - # wait a while, retry - time.sleep(1) - else: - lsock.bind((local_addr,local_port)) - lsock.listen(1) - rsock,raddr = lsock.accept() + elif local_proto == 'tcp': + rsock = tcp_establish(TERMINATE, local_addr, local_port, + peer_addr, peer_port) remote = os.fdopen(rsock.fileno(), 'r+b', 0) else: - # connect to tcp server - rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - for i in xrange(30): - try: - rsock.connect((peer_addr,peer_port)) - break - except socket.error: - # wait a while, retry - time.sleep(1) - else: - rsock.connect((peer_addr,peer_port)) - remote = os.fdopen(rsock.fileno(), 'r+b', 0) - + raise RuntimeError, "Bad protocol for %s: %r" % (self,local_proto) + # notify that we're ready self._connected.set() @@ -305,20 +259,6 @@ def preconfigure_tunchannel(testbed_instance, guid): if not element.tun_port and element.tun_addr: element.tun_port = 15000 + int(guid) - # First-phase setup - if element.peer_proto: - # cross tun - if not element.tun_addr or not element.tun_port: - listening = True - elif not element.peer_addr or not element.peer_port: - listening = True - else: - # both have addresses... - # ...the one with the lesser address listens - listening = element.tun_addr < element.peer_addr - element.listen = listening - element.Prepare() - def postconfigure_tunchannel(testbed_instance, guid): """ TunChannel preconfiguration. @@ -329,10 +269,8 @@ def postconfigure_tunchannel(testbed_instance, guid): Should be adequate for most implementations. """ element = testbed_instance._elements[guid] - - # Second-phase setup - element.Setup() - + + element.launch() def crossconnect_tunchannel_peer_init(proto, testbed_instance, tun_guid, peer_data, preconfigure_tunchannel = preconfigure_tunchannel): @@ -353,7 +291,7 @@ def crossconnect_tunchannel_peer_init(proto, testbed_instance, tun_guid, peer_da tun.peer_cipher = peer_data.get("tun_cipher") tun.tun_key = min(tun.tun_key, peer_data.get("tun_key")) tun.tun_proto = proto - + preconfigure_tunchannel(testbed_instance, tun_guid) def crossconnect_tunchannel_peer_compl(proto, testbed_instance, tun_guid, peer_data, @@ -374,12 +312,10 @@ def crossconnect_tunchannel_peer_compl(proto, testbed_instance, tun_guid, peer_d tun.peer_proto = peer_data.get("tun_proto") or proto tun.peer_port = peer_data.get("tun_port") tun.peer_cipher = peer_data.get("tun_cipher") - + postconfigure_tunchannel(testbed_instance, tun_guid) - - -def wait_tunchannel(testbed_instance, guid): +def prestart_tunchannel(testbed_instance, guid): """ Wait for the channel forwarder to be up and running. @@ -387,5 +323,5 @@ def wait_tunchannel(testbed_instance, guid): be certain to start TunChannels before applications that might require them. """ element = testbed_instance.elements[guid] - element.Wait() + element.wait() diff --git a/test/testbeds/planetlab/integration_cross.py b/test/testbeds/planetlab/integration_cross.py index 50d0ed9c..a5e8216e 100755 --- a/test/testbeds/planetlab/integration_cross.py +++ b/test/testbeds/planetlab/integration_cross.py @@ -162,7 +162,7 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase): ns1.connector("protos").connect(arp1.connector("node")) ns1.connector("protos").connect(icmp1.connector("node")) ns1if = ns3.create("ns3::FdNetDevice") - ns1if.enable_trace("FileDescriptorPcapTrace") + ns1if.enable_trace("FdPcapTrace") ns1if.set_attribute_value("label", "ns1if") ns1tc = ns3.create("ns3::Nepi::TunChannel") ns1.connector("devs").connect(ns1if.connector("node")) diff --git a/test/testbeds/planetlab/integration_ns3.py b/test/testbeds/planetlab/integration_ns3.py index d1d86950..c79e98f0 100755 --- a/test/testbeds/planetlab/integration_ns3.py +++ b/test/testbeds/planetlab/integration_ns3.py @@ -194,7 +194,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase): ns1.connector("protos").connect(arp1.connector("node")) ns1.connector("protos").connect(icmp1.connector("node")) ns1if = ns3_desc.create("ns3::FdNetDevice") - ns1if.enable_trace("FileDescriptorPcapTrace") + ns1if.enable_trace("FdPcapTrace") ns1if.set_attribute_value("label", "ns1if") ns1.connector("devs").connect(ns1if.connector("node")) tap1.connector("fd->").connect(ns1if.connector("->fd")) @@ -281,7 +281,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase): ns1.connector("protos").connect(arp1.connector("node")) ns1.connector("protos").connect(icmp1.connector("node")) ns1if = ns3_desc.create("ns3::FdNetDevice") - ns1if.enable_trace("FileDescriptorPcapTrace") + ns1if.enable_trace("FdPcapTrace") ns1if.set_attribute_value("label", "ns1if") ns1.connector("devs").connect(ns1if.connector("node")) tap1.connector("fd->").connect(ns1if.connector("->fd")) @@ -394,7 +394,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase): ns1.connector("protos").connect(arp1.connector("node")) ns1.connector("protos").connect(icmp1.connector("node")) ns1if = ns3_desc.create("ns3::FdNetDevice") - ns1if.enable_trace("FileDescriptorPcapTrace") + ns1if.enable_trace("FdPcapTrace") ns1if.set_attribute_value("label", "ns1if") ns1.connector("devs").connect(ns1if.connector("node")) tap0.connector("fd->").connect(ns1if.connector("->fd"))