From: Claudio-Daniel Freire Date: Fri, 12 Aug 2011 14:25:51 +0000 (+0200) Subject: TUN/TAP filters, initial version, with tests. X-Git-Tag: nepi-3.0.0~306 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=d6f12ebd56bf2f9c8e04d9b78b7e7ea6dbf9452c;p=nepi.git TUN/TAP filters, initial version, with tests. Still not completely right: the acceptance filter seems to be applied twice. Maybe ingress and egress acceptance filters should be separate. --- diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index 9cd1299f..ddee991c 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -670,3 +670,6 @@ class TestbedController(testbed_impl.TestbedController): def _make_ns3_dependency(self, parameters): return self._make_generic(parameters, self._app.NS3Dependency) + def _make_tun_filter(self, parameters): + return self._make_generic(parameters, self._interfaces.TunFilter) + diff --git a/src/nepi/testbeds/planetlab/interfaces.py b/src/nepi/testbeds/planetlab/interfaces.py index 5b4e3a16..251010df 100644 --- a/src/nepi/testbeds/planetlab/interfaces.py +++ b/src/nepi/testbeds/planetlab/interfaces.py @@ -10,6 +10,7 @@ import os import os.path import random import ipaddr +import functools import tunproto @@ -135,6 +136,9 @@ class TunIface(object): # These get initialized when the iface is connected to its node self.node = None + # These get initialized when the iface is connected to any filter + self.filter_module = None + # These get initialized when the iface is configured self.external_iface = None @@ -228,6 +232,10 @@ class TunIface(object): raise RuntimeError, "Unsupported tunnelling protocol: %s" % (self.peer_proto,) if not self.address or not self.netprefix or not self.netmask: raise RuntimeError, "Misconfigured %s iface - missing address" % (self._KIND,) + if self.filter_module and self.peer_proto not in ('udp','tcp',None): + raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (self,) + if self.tun_cipher != 'PLAIN' and self.peer_proto not in ('udp','tcp',None): + raise RuntimeError, "Miscofnigured TUN: %s - ciphered tunnels only work with udp or tcp links" % (self,) def _impl_instance(self, home_path, listening): impl = self._PROTO_MAP[self.peer_proto]( @@ -462,3 +470,54 @@ class NetPipe(object): return local_path +class TunFilter(object): + def __init__(self, api=None): + if not api: + api = plcapi.PLCAPI() + self._api = api + + # Attributes + self.module = None + + # These get initialised when the filter is connected + self.peer_guid = None + self.peer_proto = None + self.iface_guid = None + self.peer = None + self.iface = None + + def _get(what, self): + wref = self.iface + if wref: + wref = wref() + if wref: + return getattr(wref, what) + else: + return None + + def _set(what, self, val): + wref = self.iface + if wref: + wref = wref() + if wref: + setattr(wref, what, val) + + tun_proto = property( + functools.partial(_get, 'tun_proto'), + functools.partial(_set, 'tun_proto') ) + tun_addr = property( + functools.partial(_get, 'tun_addr'), + functools.partial(_set, 'tun_addr') ) + tun_port = property( + functools.partial(_get, 'tun_port'), + functools.partial(_set, 'tun_port') ) + tun_key = property( + functools.partial(_get, 'tun_key'), + functools.partial(_set, 'tun_key') ) + tun_cipher = property( + functools.partial(_get, 'tun_cipher'), + functools.partial(_set, 'tun_cipher') ) + + del _get + del _set + diff --git a/src/nepi/testbeds/planetlab/metadata.py b/src/nepi/testbeds/planetlab/metadata.py index aac4d7b7..2e6adb46 100644 --- a/src/nepi/testbeds/planetlab/metadata.py +++ b/src/nepi/testbeds/planetlab/metadata.py @@ -16,6 +16,7 @@ from nepi.util.constants import ApplicationStatus as AS, \ import functools import os import os.path +import weakref NODE = "Node" NODEIFACE = "NodeInterface" @@ -27,6 +28,7 @@ NEPIDEPENDENCY = "NepiDependency" NS3DEPENDENCY = "NS3Dependency" INTERNET = "Internet" NETPIPE = "NetPipe" +TUNFILTER = "TunFilter" PL_TESTBED_ID = "planetlab" @@ -103,10 +105,42 @@ def connect_tun_iface_peer(proto, testbed_instance, iface_guid, peer_iface_guid) iface = testbed_instance._elements[iface_guid] peer_iface = testbed_instance._elements[peer_iface_guid] iface.peer_iface = peer_iface + peer_iface.peer_iface = iface iface.peer_proto = \ - iface.tun_proto = proto + iface.tun_proto = \ + peer_iface.peer_proto = \ + peer_iface.tun_proto = proto iface.tun_key = peer_iface.tun_key +def connect_tun_iface_filter(testbed_instance, iface_guid, filter_guid): + iface = testbed_instance._elements[iface_guid] + filt = testbed_instance._elements[filter_guid] + iface.filter_module = filt + filt.iface_guid = iface_guid + filt.iface = weakref.ref(iface) + if filt.peer_guid: + connect_tun_iface_peer(filt.peer_proto, testbed_instance, filt.iface_guid, filt.peer_guid) + +def connect_filter_peer(proto, testbed_instance, filter_guid, peer_guid): + peer = testbed_instance._elements[peer_guid] + filt = testbed_instance._elements[filter_guid] + filt.peer_proto = proto + filt.peer_guid = peer_guid + if filt.iface_guid: + connect_tun_iface_peer(filt.peer_proto, testbed_instance, filt.iface_guid, filt.peer_guid) + +def connect_filter_filter(proto, testbed_instance, filter_guid, peer_guid): + peer = testbed_instance._elements[peer_guid] + filt = testbed_instance._elements[filter_guid] + filt.peer_proto = proto + peer.peer_proto = proto + if filt.iface_guid: + peer.peer_guid = filt.iface_guid + if peer.iface_guid: + filt.peer_guid = peer.iface_guid + if filt.iface_guid and filt.peer_guid: + connect_tun_iface_peer(filt.peer_proto, testbed_instance, filt.iface_guid, filt.peer_guid) + def crossconnect_tun_iface_peer_init(proto, testbed_instance, iface_guid, peer_iface_data): iface = testbed_instance._elements[iface_guid] iface.peer_iface = None @@ -133,6 +167,21 @@ def crossconnect_tun_iface_peer_both(proto, testbed_instance, iface_guid, peer_i crossconnect_tun_iface_peer_init(proto, testbed_instance, iface_guid, peer_iface_data) crossconnect_tun_iface_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data) +def crossconnect_filter_peer_init(proto, testbed_instance, filter_guid, peer_data): + filt = testbed_instance._elements[filter_guid] + filt.peer_proto = proto + crossconnect_tun_iface_peer_init(filt.peer_proto, testbed_instance, filt.iface_guid, peer_data) + +def crossconnect_filter_peer_compl(proto, testbed_instance, filter_guid, peer_data): + filt = testbed_instance._elements[filter_guid] + filt.peer_proto = proto + crossconnect_tun_iface_peer_compl(filt.peer_proto, testbed_instance, filt.iface_guid, peer_data) + +def crossconnect_filter_peer_both(proto, testbed_instance, filter_guid, peer_data): + crossconnect_filter_peer_init(proto, testbed_instance, iface_guid, peer_iface_data) + crossconnect_filter_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data) + + def connect_dep(testbed_instance, node_guid, app_guid): node = testbed_instance._elements[node_guid] app = testbed_instance._elements[app_guid] @@ -220,6 +269,12 @@ def create_tapiface(testbed_instance, guid): testbed_instance.elements[guid] = element +def create_tunfilter(testbed_instance, guid): + parameters = testbed_instance._get_parameters(guid) + element = testbed_instance._make_tun_filter(parameters) + testbed_instance.elements[guid] = element + + def create_application(testbed_instance, guid): parameters = testbed_instance._get_parameters(guid) element = testbed_instance._make_application(parameters) @@ -509,6 +564,12 @@ connector_types = dict({ "max": 1, "min": 0 }), + "->fd": dict({ + "help": "TUN device file descriptor slot", + "name": "->fd", + "max": 1, + "min": 0 + }), }) connections = [ @@ -584,6 +645,24 @@ connections = [ "init_code": functools.partial(connect_tun_iface_peer,"gre"), "can_cross": False }), + dict({ + "from": (TESTBED_ID, TUNIFACE, "fd->"), + "to": (TESTBED_ID, TUNFILTER, "->fd"), + "init_code": connect_tun_iface_filter, + "can_cross": False + }), + dict({ + "from": (TESTBED_ID, TUNFILTER, "tcp"), + "to": (TESTBED_ID, TUNIFACE, "tcp"), + "init_code": functools.partial(connect_filter_peer,"tcp"), + "can_cross": False + }), + dict({ + "from": (TESTBED_ID, TUNFILTER, "udp"), + "to": (TESTBED_ID, TUNIFACE, "udp"), + "init_code": functools.partial(connect_filter_peer,"udp"), + "can_cross": False + }), dict({ "from": (TESTBED_ID, TAPIFACE, "tcp"), "to": (TESTBED_ID, TAPIFACE, "tcp"), @@ -602,6 +681,36 @@ connections = [ "init_code": functools.partial(connect_tun_iface_peer,"gre"), "can_cross": False }), + dict({ + "from": (TESTBED_ID, TAPIFACE, "fd->"), + "to": (TESTBED_ID, TUNFILTER, "->fd"), + "init_code": connect_tun_iface_filter, + "can_cross": False + }), + dict({ + "from": (TESTBED_ID, TUNFILTER, "tcp"), + "to": (TESTBED_ID, TAPIFACE, "tcp"), + "init_code": functools.partial(connect_filter_peer,"tcp"), + "can_cross": False + }), + dict({ + "from": (TESTBED_ID, TUNFILTER, "udp"), + "to": (TESTBED_ID, TAPIFACE, "udp"), + "init_code": functools.partial(connect_filter_peer,"udp"), + "can_cross": False + }), + dict({ + "from": (TESTBED_ID, TUNFILTER, "tcp"), + "to": (TESTBED_ID, TUNFILTER, "tcp"), + "init_code": functools.partial(connect_filter_filter,"tcp"), + "can_cross": False + }), + dict({ + "from": (TESTBED_ID, TUNFILTER, "udp"), + "to": (TESTBED_ID, TUNFILTER, "udp"), + "init_code": functools.partial(connect_filter_filter,"udp"), + "can_cross": False + }), dict({ "from": (TESTBED_ID, TUNIFACE, "tcp"), "to": (None, None, "tcp"), @@ -656,6 +765,20 @@ connections = [ "compl_code": functools.partial(crossconnect_tun_iface_peer_both,"gre"), "can_cross": True }), + dict({ + "from": (TESTBED_ID, TUNFILTER, "tcp"), + "to": (None, None, "tcp"), + "init_code": functools.partial(crossconnect_filter_peer_init,"tcp"), + "compl_code": functools.partial(crossconnect_filter_peer_compl,"tcp"), + "can_cross": True + }), + dict({ + "from": (TESTBED_ID, TUNFILTER, "udp"), + "to": (None, None, "udp"), + "init_code": functools.partial(crossconnect_filter_peer_init,"udp"), + "compl_code": functools.partial(crossconnect_filter_peer_compl,"udp"), + "can_cross": True + }), ] attributes = dict({ @@ -998,6 +1121,13 @@ attributes = dict({ "range": (0,60000), "validation_function": validation.is_integer, }), + "module": dict({ + "name": "module", + "help": "Path to a .c or .py source for a filter module, or a binary .so", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), }) traces = dict({ @@ -1029,7 +1159,7 @@ traces = dict({ }), }) -create_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ] +create_order = [ INTERNET, NODE, NODEIFACE, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ] configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ] @@ -1104,6 +1234,16 @@ factories_info = dict({ "connector_types": ["node","udp","tcp","fd->","gre"], "tags": [tags.INTERFACE, tags.ALLOW_ADDRESSES], }), + TUNFILTER: dict({ + "help": "TUN/TAP stream filter", + "category": FC.CATEGORY_CHANNELS, + "create_function": create_tunfilter, + "box_attributes": [ + "module", + "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher", + ], + "connector_types": ["->fd","udp","tcp"], + }), APPLICATION: dict({ "help": "Generic executable command line application", "category": FC.CATEGORY_APPLICATIONS, diff --git a/src/nepi/testbeds/planetlab/scripts/plr50.c b/src/nepi/testbeds/planetlab/scripts/plr50.c new file mode 100644 index 00000000..34d8f9ab --- /dev/null +++ b/src/nepi/testbeds/planetlab/scripts/plr50.c @@ -0,0 +1,7 @@ +#include + +int accept_packet(const char* packet, int direction) +{ + return (rand() > (RAND_MAX/2)); +} + diff --git a/src/nepi/testbeds/planetlab/scripts/plr50.py b/src/nepi/testbeds/planetlab/scripts/plr50.py new file mode 100644 index 00000000..fcfa6878 --- /dev/null +++ b/src/nepi/testbeds/planetlab/scripts/plr50.py @@ -0,0 +1,6 @@ +import random + +def accept_packet(packet, direction, rng=random.random): + return rng() > 0.5 + + diff --git a/src/nepi/testbeds/planetlab/scripts/tun_connect.py b/src/nepi/testbeds/planetlab/scripts/tun_connect.py index 98fc3112..3e987ec1 100644 --- a/src/nepi/testbeds/planetlab/scripts/tun_connect.py +++ b/src/nepi/testbeds/planetlab/scripts/tun_connect.py @@ -117,6 +117,45 @@ parser.add_option( default = None, help = "If specified, packets won't be logged to standard output, " "but dumped to a pcap-formatted trace in the specified file. " ) +parser.add_option( + "--filter", dest="filter_module", metavar="PATH", + default = None, + help = "If specified, it should be either a .py or .so module. " + "It will be loaded, and all incoming and outgoing packets " + "will be routed through it. The module will not be responsible " + "for buffering, packet queueing is performed in tun_connect " + "already, so it should not concern itself with it. It should " + "not, however, block in one direction if the other is congested.\n" + "\n" + "Modules are expected to have the following methods:\n" + "\taccept_packet(packet, direction):\n" + "\t\tDecide whether to drop the packet. Direction is 0 for packets " + "coming from the local side to the remote, and 1 is for packets " + "coming from the remote side to the local. Return a boolean, " + "true if the packet is not to be dropped.\n" + "\tfilter_init():\n" + "\t\tInitializes a filtering pipe (filter_run). It should " + "return two file descriptors to use as a bidirectional " + "pipe: local and remote. 'local' is where packets from the " + "local side will be written to. After filtering, those packets " + "should be written to 'remote', where tun_connect will read " + "from, and it will forward them to the remote peer. " + "Packets from the remote peer will be written to 'remote', " + "where the filter is expected to read from, and eventually " + "forward them to the local side. If the file descriptors are " + "not nonblocking, they will be set to nonblocking. So it's " + "better to set them from the start like that.\n" + "\tfilter_run(local, remote):\n" + "\t\tIf filter_init is provided, it will be called repeatedly, " + "in a separate thread until the process is killed. It should " + "sleep at most for a second.\n" + "\tfilter_close(local, remote):\n" + "\t\tCalled then the process is killed, if filter_init was provided. " + "It should, among other things, close the file descriptors.\n" + "\n" + "Python modules are expected to return a tuple in filter_init, " + "either of file descriptors or file objects, while native ones " + "will receive two int*.\n" ) (options, remaining_args) = parser.parse_args(sys.argv[1:]) @@ -425,7 +464,7 @@ def pl_vif_stop(tun_path, tun_name): del lock, lockfile -def tun_fwd(tun, remote, reconnect = None): +def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True): global TERMINATE tunqueue = options.vif_txqueuelen or 1000 @@ -443,7 +482,10 @@ def tun_fwd(tun, remote, reconnect = None): reconnect = reconnect, tunqueue = tunqueue, tunkqueue = tunkqueue, - cipher = options.cipher + cipher = options.cipher, + accept_local = accept_local, + accept_remote = accept_remote, + slowlocal = slowlocal ) @@ -492,6 +534,40 @@ tun_name = options.tun_name modeinfo = MODEINFO[options.mode] +# Try to load filter module +filter_thread = None +if options.filter_module: + if options.filter_module.endswith('.py'): + sys.path.append(os.path.dirname(options.filter_module)) + filter_module = __import__(os.path.basename(options.filter_module).rsplit('.',1)[0]) + elif options.filter_module.endswith('.so'): + filter_module = ctypes.cdll.LoadLibrary(options.filter_module) + + try: + accept_packet = filter_module.accept_packet + except: + accept_packet = None + + try: + _filter_init = filter_module.filter_init + filter_run = filter_module.filter_run + filter_close = filter_module.filter_close + + def filter_init(): + filter_local = ctypes.c_int(0) + filter_remote = ctypes.c_int(0) + _filter_init(filter_local, filter_remote) + return filter_local, filter_remote + except: + filter_init = None + filter_run = None + filter_close = None +else: + accept_packet = None + filter_init = None + filter_run = None + filter_close = None + # be careful to roll back stuff on exceptions tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name) try: @@ -518,6 +594,9 @@ try: reconnect = None if options.pass_fd: + if accept_packet or filter_init: + raise NotImplementedError, "--pass-fd and --filter are not compatible" + if options.pass_fd.startswith("base64:"): options.pass_fd = base64.b64decode( options.pass_fd[len("base64:"):]) @@ -545,13 +624,20 @@ try: # just wait forever def tun_fwd(tun, remote, **kw): - while not TERMINATE: + global TERMINATE + TERM = TERMINATE + while not TERM: time.sleep(1) remote = None elif options.mode.startswith('pl-gre'): + if accept_packet or filter_init: + raise NotImplementedError, "--mode %s and --filter are not compatible" % (options.mode,) + # just wait forever def tun_fwd(tun, remote, **kw): - while not TERMINATE: + global TERMINATE + TERM = TERMINATE + while not TERM: time.sleep(1) remote = remaining_args[0] elif options.udp: @@ -620,6 +706,22 @@ try: ["tcpdump","-l","-n","-i",tun_name, "-s", "4096"] + ["-w",options.pcap_capture,"-U"] * bool(options.pcap_capture) ) + if filter_init: + filter_local, filter_remote = filter_init() + + def filter_loop(): + global TERMINATE + TERM = TERMINATE + run = filter_run + local = filter_local + remote = filter_remote + while not TERM: + run(local, remote) + filter_close(local, remote) + + filter_thread = threading.Thread(target=filter_loop) + filter_thread.start() + print >>sys.stderr, "Connected" # Try to give us high priority @@ -630,8 +732,43 @@ try: # or perhaps there is no os.nice support in the system pass - tun_fwd(tun, remote, - reconnect = reconnect) + if not filter_init: + tun_fwd(tun, remote, + reconnect = reconnect, + accept_local = accept_packet, + accept_remote = accept_packet, + slowlocal = True) + else: + # Hm... + # ...ok, we need to: + # 1. Forward packets from tun to filter + # 2. Forward packets from remote to filter + # + # 1. needs TUN rate-limiting, while + # 2. needs reconnection + # + # 1. needs ONLY TUN-side acceptance checks, while + # 2. needs ONLY remote-side acceptance checks + if isinstance(filter_local, ctypes.c_int): + filter_local_fd = filter_local.value + else: + filter_local_fd = filter_local + if isinstance(filter_remote, ctypes.c_int): + filter_remote_fd = filter_remote.value + else: + filter_remote_fd = filter_remote + + def localside(): + tun_fwd(tun, filter_local_fd, + accept_local = accept_packet, + slowlocal = True) + + def remoteside(): + tun_fwd(filter_remote_fd, remote, + reconnect = reconnect, + accept_remote = accept_packet, + slowlocal = False) + finally: try: @@ -641,6 +778,13 @@ finally: pass # tidy shutdown in every case - swallow exceptions + TERMINATE.append(None) + + if filter_thread: + try: + filter_thread.join() + except: + pass try: if tcpdump: diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index 9521f384..3f003bab 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -58,7 +58,7 @@ class TunProtoBase(object): # they have to be created for deployment # Also remove pidfile, if there is one. # Old pidfiles from previous runs can be troublesome. - cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid" % { + cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid %(home)s/*.so" % { 'home' : server.shell_escape(self.home_path) } (out,err),proc = server.eintr_retry(server.popen_ssh_command)( @@ -90,6 +90,13 @@ class TunProtoBase(object): os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'), re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific ] + if local.filter_module: + filter_sources = filter(bool,map(str.strip,local.filter_module.module.split())) + filter_module = filter_sources[0] + sources.extend(set(filter_sources)) + else: + filter_module = None + filter_sources = None dest = "%s@%s:%s" % ( local.node.slicename, local.node.hostname, os.path.join(self.home_path,'.'),) @@ -118,6 +125,16 @@ class TunProtoBase(object): "python setup.py install --install-lib .. && " "cd .. " + + ( " && " + "gcc -fPIC -shared %(sources)s -o %(module)s.so " % { + 'module' : os.path.basename(filter_module).rsplit('.',1)[0], + 'sources' : ' '.join(map(os.path.basename,filter_sources)) + } + + if filter_module is not None and filter_module.endswith('.c') + else "" + ) + + ( " && " "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && " "mkdir -p python-passfd && " @@ -126,7 +143,8 @@ class TunProtoBase(object): "python setup.py build && " "python setup.py install --install-lib .. " - if local.tun_proto == "fd" else "" + if local.tun_proto == "fd" + else "" ) ) % { @@ -190,6 +208,16 @@ class TunProtoBase(object): if check_proto == 'gre' and local_cipher.lower() != 'plain': raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,) + + if local.filter_module: + if check_proto not in ('udp', 'tcp'): + raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (local,) + filter_module = filter(bool,map(str.strip,local.filter_module.module.split())) + filter_module = os.path.join('.',os.path.basename(filter_module[0])) + if filter_module.endswith('.c'): + filter_module = filter_module.rsplit('.',1)[0] + '.so' + else: + filter_module = None args = ["python", "tun_connect.py", "-m", str(self.mode), @@ -231,6 +259,8 @@ class TunProtoBase(object): args.extend(map(str,extra_args)) if not listen and check_proto != 'fd': args.append(str(peer_addr)) + if filter_module: + args.extend(("--filter", filter_module)) self._logger.info("Starting %s", self) @@ -353,6 +383,31 @@ class TunProtoBase(object): self._logger.warn("if_name: Could not get interface name") return self._if_name + def if_alive(self): + name = self.if_name + if name: + local = self.local() + for i in xrange(30): + (out,err),proc = server.eintr_retry(server.popen_ssh_command)( + "ip show %s >/dev/null 2>&1 && echo ALIVE || echo DEAD" % (name,), + host = local.node.hostname, + port = None, + user = local.node.slicename, + agent = None, + ident_key = local.node.ident_path, + server_key = local.node.server_key + ) + + if proc.wait(): + time.sleep(1) + continue + + if out.strip() == 'DEAD': + return False + elif out.strip() == 'ALIVE': + return True + return False + def async_launch(self, check_proto, listen, extra_args=[]): if not self._started and not self._launcher: self._launcher = threading.Thread( @@ -450,6 +505,14 @@ class TunProtoBase(object): break time.sleep(interval) interval = min(30.0, interval * 1.1) + + if self.if_name: + for i in xrange(30): + if not self.if_alive(): + self._logger.info("Device down %s", self) + break + time.sleep(interval) + interval = min(30.0, interval * 1.1) _TRACEMAP = { # tracename : (remotename, localname) diff --git a/src/nepi/util/tunchannel.py b/src/nepi/util/tunchannel.py index 9e34b45f..58048328 100644 --- a/src/nepi/util/tunchannel.py +++ b/src/nepi/util/tunchannel.py @@ -191,7 +191,7 @@ def nonblock(fd): return False def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000, - cipher='AES', + cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, len=len, max=max, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket, retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ): crypto_mode = False @@ -261,6 +261,22 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr twrite = os.write tread = os.read + if accept_local is not None: + def tread(fd, maxlen, _tread=tread, accept=accept_local): + packet = _tread(fd, maxlen) + if accept(packet, 0): + return packet + else: + return None + + if accept_remote is not None: + def rread(fd, maxlen, _rread=rread, accept=accept_remote): + packet = _rread(fd, maxlen) + if accept(packet, 1): + return packet + else: + return None + # Limited frame parsing, to preserve packet boundaries. # Which is needed, since /dev/net/tun is unbuffered maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue) @@ -361,11 +377,12 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr # behind. TUN devices discard packets if their queue is full (tunkqueue), but they # don't block either (they're always ready to write), so if we flood the device # we'll have high packet loss. - if not tnonblock or len(bkbuf) < tunhurry or not packetReady(bkbuf): + if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf): break else: - # Give some time for the kernel to process the packets - time.sleep(0) + if slowlocal: + # Give some time for the kernel to process the packets + time.sleep(0) except OSError,e: # This except handles the entire While block on PURPOSE # as an optimization (setting a try/except block is expensive) @@ -381,6 +398,8 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr try: while 1: packet = tread(tunfd,2000) # tun.read blocks until it gets 2k! + if packet is None: + continue #rt += 1 fwbuf.append(packet) @@ -397,6 +416,8 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr try: while 1: packet = rread(remote,2000) + if packet is None: + continue #rr += 1 if crypto_mode: diff --git a/test/testbeds/planetlab/execute.py b/test/testbeds/planetlab/execute.py index 755bd243..4d276863 100755 --- a/test/testbeds/planetlab/execute.py +++ b/test/testbeds/planetlab/execute.py @@ -23,6 +23,15 @@ class PlanetLabExecuteTestCase(unittest.TestCase): port_base = 2000 + (os.getpid() % 1000) * 13 + PLR50_PY = os.path.join( + os.path.dirname(planetlab.__file__), + 'scripts', + 'plr50.py') + PLR50_C = os.path.join( + os.path.dirname(planetlab.__file__), + 'scripts', + 'plr50.c') + def setUp(self): self.root_dir = tempfile.mkdtemp() self.__class__.port_base = self.port_base + 100 @@ -315,7 +324,7 @@ echo 'OKIDOKI' self.assertTrue(netpipe_stats, "Unavailable netpipe stats") @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") - def _pingtest(self, TunClass, ConnectionProto, Cipher): + def _pingtest(self, TunClass, ConnectionProto, Cipher, Filter1=None, Filter2=None): instance = self.make_instance() instance.defer_create(2, "Node") @@ -339,18 +348,38 @@ echo 'OKIDOKI' instance.defer_add_trace(8, "packets") instance.defer_add_address(8, "192.168.2.3", 24, False) instance.defer_connect(3, "devs", 8, "node") - instance.defer_connect(7, ConnectionProto, 8, ConnectionProto) instance.defer_create(9, "Application") - instance.defer_create_set(9, "command", "ping -qc1 {#[GUID-8].addr[0].[Address]#}") + instance.defer_create_set(9, "command", "ping -qc10 {#[GUID-8].addr[0].[Address]#}") instance.defer_add_trace(9, "stdout") instance.defer_add_trace(9, "stderr") instance.defer_connect(9, "node", 2, "apps") + + if Filter1: + instance.defer_create(10, "TunFilter") + instance.defer_create_set(10, "module", Filter1) + instance.defer_connect(7, "fd->", 10, "->fd") + + if Filter2: + instance.defer_create(11, "TunFilter") + instance.defer_create_set(11, "module", Filter2) + instance.defer_connect(8, "fd->", 11, "->fd") + + if Filter1 and Filter2: + plr = "[5-9][0-9]" + elif Filter1 or Filter2: + plr = "[3-9][0-9]" + else: + plr = "0" + + instance.defer_connect( + (10 if Filter1 else 7), ConnectionProto, + (11 if Filter2 else 8), ConnectionProto) comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data. --- .* ping statistics --- -1 packets transmitted, 1 received, 0% packet loss, time \d*ms.* -""" +10 packets transmitted, [0-9]+ received, %s%% packet loss, time \d*ms.* +""" % (plr,) try: instance.do_setup() @@ -410,6 +439,22 @@ echo 'OKIDOKI' def test_tap_ping_gre(self): self._pingtest("TapInterface", "gre", "PLAIN") + @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") + def test_tap_ping_udp_loss1_py(self): + self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY) + + @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") + def test_tap_ping_udp_loss2_py(self): + self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, self.PLR50_PY) + + @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") + def test_tap_ping_udp_loss1_c(self): + self._pingtest("TapInterface", "udp", "AES", self.PLR50_C) + + @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") + def test_tap_ping_udp_loss2_c(self): + self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, self.PLR50_C) + @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") def test_nepi_depends(self): instance = self.make_instance() diff --git a/tunbench.py b/tunbench.py index b8ac97c8..7af0f95c 100644 --- a/tunbench.py +++ b/tunbench.py @@ -21,7 +21,13 @@ def rread(remote, maxlen, remote_fd = remote.fileno(), os_read=os.read): bytes += len(rv) return rv -def test(cipher, passphrase): +def test(cipher, passphrase, plr=None): + if plr: + import random + def accept(packet, direction, rng=random.random): + return rng() > 0.5 + else: + accept = None TERMINATE = [] def stopme(): time.sleep(100) @@ -29,7 +35,8 @@ def test(cipher, passphrase): t = threading.Thread(target=stopme) t.start() tunchannel.tun_fwd(tun, remote, True, True, passphrase, True, TERMINATE, None, tunkqueue=500, - rwrite = rwrite, rread = rread, cipher=cipher) + rwrite = rwrite, rread = rread, cipher=cipher, + accept_local = accept, accept_remote = accept) # Swallow exceptions on decryption def decrypt(packet, crypter, super=tunchannel.decrypt): @@ -39,6 +46,7 @@ def decrypt(packet, crypter, super=tunchannel.decrypt): return packet tunchannel.decrypt = decrypt +""" for cipher in (None, 'AES', 'Blowfish', 'DES', 'DES3'): if cipher is None: passphrase = None @@ -51,5 +59,14 @@ for cipher in (None, 'AES', 'Blowfish', 'DES', 'DES3'): pstats.Stats('tunchannel.%s.profile' % cipher).strip_dirs().sort_stats('time').print_stats() print "Bandwidth (%s): %.4fMb/s" % ( cipher, bytes / 200.0 * 8 / 2**20, ) +""" + +bytes = 0 +cProfile.runctx('test(None,None,0.5)',globals(),locals(),'tunchannel.plr.profile') + +print "Profile (50% PLR):" +pstats.Stats('tunchannel.plr.profile').strip_dirs().sort_stats('time').print_stats() + +print "Bandwidth (50%% PLR): %.4fMb/s" % ( bytes / 200.0 * 8 / 2**20, )