From: Claudio-Daniel Freire Date: Tue, 23 Aug 2011 16:13:19 +0000 (+0200) Subject: - Incomplete multicast support (initial work) X-Git-Tag: nepi-3.0.0~274^2 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=d642d20d4708b957521241f6064d01db3da86789;p=nepi.git - Incomplete multicast support (initial work) - Bandwidth limit for TUN/TAP - Classful queue improvements, especially on tunchannel when using custom queues --- diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index f86026a9..90028f6b 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -155,6 +155,10 @@ class TestbedController(testbed_impl.TestbedController): raise RuntimeError, "PlanetLab account username not set" if not self.authString: raise RuntimeError, "PlanetLab account passphrase not set" + if not self.sliceSSHKey: + raise RuntimeError, "PlanetLab account key not specified" + if not os.path.exists(self.sliceSSHKey): + raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,) self._logger.setLevel(getattr(logging,self.logLevel)) diff --git a/src/nepi/testbeds/planetlab/interfaces.py b/src/nepi/testbeds/planetlab/interfaces.py index 80f834a3..c853a378 100644 --- a/src/nepi/testbeds/planetlab/interfaces.py +++ b/src/nepi/testbeds/planetlab/interfaces.py @@ -128,6 +128,8 @@ class TunIface(object): self.snat = False self.txqueuelen = None self.pointopoint = None + self.multicast = False + self.bwlimit = None # Enabled traces self.capture = False diff --git a/src/nepi/testbeds/planetlab/metadata.py b/src/nepi/testbeds/planetlab/metadata.py index f008c5cc..98c0f423 100644 --- a/src/nepi/testbeds/planetlab/metadata.py +++ b/src/nepi/testbeds/planetlab/metadata.py @@ -987,6 +987,15 @@ attributes = dict({ "value": False, "validation_function": validation.is_bool }), + "multicast": dict({ + "name": "multicast", + "help": "Enable multicast forwarding on this device. " + "Note that you still need a multicast routing daemon " + "in the node.", + "type": Attribute.BOOL, + "value": False, + "validation_function": validation.is_bool + }), "pointopoint": dict({ "name": "pointopoint", "help": "If the interface is a P2P link, the remote endpoint's IP " @@ -995,6 +1004,14 @@ attributes = dict({ "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, "validation_function": validation.is_string }), + "bwlimit": dict({ + "name": "bwlimit", + "help": "Emulated transmission speed (in kbytes per second)", + "type": Attribute.INTEGER, + "range" : (1,10*2**20), + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_integer + }), "txqueuelen": dict({ "name": "txqueuelen", "help": "Transmission queue length (in packets)", @@ -1257,7 +1274,7 @@ factories_info = dict({ "configure_function": postconfigure_tuniface, "prestart_function": wait_tuniface, "box_attributes": [ - "up", "if_name", "mtu", "snat", "pointopoint", + "up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit", "txqueuelen", "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher", ], @@ -1273,7 +1290,7 @@ factories_info = dict({ "configure_function": postconfigure_tuniface, "prestart_function": wait_tuniface, "box_attributes": [ - "up", "if_name", "mtu", "snat", "pointopoint", + "up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit", "txqueuelen", "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher", ], diff --git a/src/nepi/testbeds/planetlab/scripts/classqueue.py b/src/nepi/testbeds/planetlab/scripts/classqueue.py index fc799627..0fcae273 100644 --- a/src/nepi/testbeds/planetlab/scripts/classqueue.py +++ b/src/nepi/testbeds/planetlab/scripts/classqueue.py @@ -5,6 +5,11 @@ import re import sys import iovec +dstats = collections.defaultdict(int) +astats = collections.defaultdict(int) +dump_count = [0] + +_red = True _size = 1000 _classes = ( "igmp.ggp.cbt.egp.igp.idrp.mhrp.narp.ospf.eigrp*p1:" @@ -73,8 +78,6 @@ class ClassQueue(object): def __init__(self): self.size = _size self.len = 0 - self.stats = collections.defaultdict(int) - self.dump_count = 0 # Prepare classes self.classspec = _parse_classes(_classes) @@ -148,19 +151,36 @@ class ClassQueue(object): rv = self.classmap.get(None) return proto, rv, self.sizemap[rv] - def append(self, packet, len=len): + def get_packetdrop_p(self, qlen, qsize, packet): + pdrop = ((qlen * 1.0 / qsize) - 0.5) * 2.0 + pdrop *= pdrop + return pdrop + + def append(self, packet, len=len, dstats=dstats, astats=astats, rng=random.random): proto,qi,size = self.queuefor(packet) q = self.queues[qi] - if len(q) < size: - classes = self.classes - if qi not in classes: - classes.add(qi) - self.cycle_update = True - q.append(packet) - self.len += 1 + lq = len(q) + if lq < size: + dropped = 0 + if lq > (size/2) and _red: + pdrop = self.get_packetdrop_p(lq, size, packet) + if rng() < pdrop: + dropped = 1 + if not dropped: + classes = self.classes + if qi not in classes: + classes.add(qi) + self.cycle_update = True + q.append(packet) + self.len += 1 # packet dropped - elif _logdropped: - self.stats[proto] += 1 + else: + dropped = 1 + if _logdropped: + if dropped: + dstats[proto] += 1 + else: + astats[proto] += 1 self.dump_stats() def appendleft(self, packet): @@ -170,7 +190,7 @@ class ClassQueue(object): def pop(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.pop): return self.popleft(pop=pop) - def popleft(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.popleft): + def popleft(self, xrange=xrange, len=len, iter=iter, enumerate=enumerate, zip=zip, pop=collections.deque.popleft): queues = self.queues classes = self.classes @@ -206,27 +226,33 @@ class ClassQueue(object): else: raise IndexError, "pop from an empty queue" - def dump_stats(self): - if self.dump_count >= 10000: - stats = "".join(['%s:%s\n' % (key, value) for key, value in self.stats.items()]) + def dump_stats(self, astats=astats, dstats=dstats, dump_count=dump_count): + if dump_count[0] >= 10000: + dstatsstr = "".join(['%s:%s\n' % (key, value) for key, value in dstats.items()]) + astatsstr = "".join(['%s:%s\n' % (key, value) for key, value in astats.items()]) fd = open('dropped_stats', 'w') - iovec.writev(fd.fileno(), stats) + iovec.writev(fd.fileno(), "Dropped:\n", dstatsstr, "Accepted:\n", astatsstr) fd.close() - self.dump_count = 0 + dump_count[0] = 0 else: - self.dump_count += 1 + dump_count[0] += 1 queueclass = ClassQueue -def init(size = 1000, classes = _classes, logdropped = 'False'): +def init(size = 1000, classes = _classes, logdropped = 'False', red = True): global _size, _classes, _logdropped _size = int(size) _classes = classes + _red = red _logdropped = logdropped.lower() in ('true','1','on') + + if _logdropped: + # Truncate stats + open('dropped_stats', 'w').close() _protomap = { '3pc' : 34, - 'a/n' : 107, + 'an' : 107, 'ah' : 51, 'argus' : 13, 'aris' : 104, diff --git a/src/nepi/testbeds/planetlab/scripts/tun_connect.py b/src/nepi/testbeds/planetlab/scripts/tun_connect.py index 3753631e..ae7a0cc4 100644 --- a/src/nepi/testbeds/planetlab/scripts/tun_connect.py +++ b/src/nepi/testbeds/planetlab/scripts/tun_connect.py @@ -19,6 +19,7 @@ import base64 import traceback import tunchannel +import ipaddr2 tun_name = 'tun0' tun_path = '/dev/net/tun' @@ -85,6 +86,11 @@ parser.add_option( default = None, help = "See mode. This specifies the interface's transmission queue length. " ) +parser.add_option( + "-b", "--bwlimit", dest="bwlimit", metavar="BYTESPERSECOND", type="int", + default = None, + help = + "This specifies the interface's emulated bandwidth in bytes per second." ) parser.add_option( "-u", "--udp", dest="udp", metavar="PORT", type="int", default = None, @@ -117,6 +123,15 @@ parser.add_option( default = None, help = "If specified, packets won't be logged to standard output, " "but dumped to a pcap-formatted trace in the specified file. " ) +parser.add_option( + "--multicast", dest="multicast", + action = "store_true", + default = False, + help = "If specified, multicast packets will be forwarded and IGMP " + "join/leave packets will be generated. Routing information " + "must be sent to the mroute unix socket, in a format identical " + "to that of the kernel's MRT ioctls, prefixed with 32-bit IOCTL " + "code and 32-bit data length." ) parser.add_option( "--filter", dest="filter_module", metavar="PATH", default = None, @@ -198,6 +213,54 @@ IFNAMSIZ = 0x00000010 IFREQ_SZ = 0x00000028 FIONREAD = 0x0000541b +class MulticastThread(threading.Thread): + def __init__(self, *p, **kw): + super(MulticastThread, self).__init__(*p, **kw) + self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP) + self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, + socket.inet_aton(options.vif_addr) ) + self._stop = False + self.setDaemon(True) + + def run(self): + devnull = open('/dev/null','r+b') + maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*") + cur_maddr = set() + while not self._stop: + # Get current subscriptions + proc = subprocess.Popen(['ip','maddr','show',tun_name], + stdout = subprocess.PIPE, + stderr = subprocess.STDOUT, + stdin = devnull) + new_maddr = set() + for line in proc.stdout: + match = maddr_re.match(line) + if match: + new_maddr.add(match.group(1)) + proc.wait() + + # Notify new subscriptions + for grp in new_maddr - cur_maddr: + self.igmp_socket.sendto( + ipaddr2.igmp(0x16, 0, grp), + 0, + (grp,0)) + + # Notify group leave + for grp in cur_maddr - new_maddr: + self.igmp_socket.sendto( + ipaddr2.igmp(0x17, 0, grp), + 0, + (grp,0)) + + cur_maddr = new_maddr + + time.sleep(1) + + def stop(self): + self._stop = True + self.join(5) + class HostLock(object): # This class is used as a lock to prevent concurrency issues with more # than one instance of netns running in the same machine. Both in @@ -484,7 +547,7 @@ def pl_vif_stop(tun_path, tun_name): del lock, lockfile -def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True): +def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True, bwlimit = None): global TERMINATE tunqueue = options.vif_txqueuelen or 1000 @@ -506,7 +569,8 @@ def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = accept_local = accept_local, accept_remote = accept_remote, queueclass = queueclass, - slowlocal = slowlocal + slowlocal = slowlocal, + bwlimit = bwlimit ) @@ -634,6 +698,7 @@ signal.signal(signal.SIGTERM, _finalize) try: tcpdump = None reconnect = None + mcastthread = None if options.pass_fd: if accept_packet or filter_init: @@ -812,12 +877,18 @@ try: # Ignore errors, we might not have enough privileges, # or perhaps there is no os.nice support in the system pass + + if options.multicast: + # Start multicast forwarding daemon + mcastthread = MulticastThread() + mcastthread.start() if not filter_init: tun_fwd(tun, remote, reconnect = reconnect, accept_local = accept_packet, accept_remote = accept_packet, + bwlimit = options.bwlimit, slowlocal = True) else: # Hm... @@ -848,6 +919,7 @@ try: tun_fwd(filter_remote_fd, remote, reconnect = reconnect, accept_remote = accept_packet, + bwlimit = options.bwlimit, slowlocal = False) localthread = threading.Thread(target=localside) @@ -867,6 +939,12 @@ finally: # tidy shutdown in every case - swallow exceptions TERMINATE.append(None) + if mcastthread: + try: + mcastthread.stop() + except: + pass + if filter_thread: try: filter_thread.join() diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index 15bfa943..84e2d9c6 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -88,10 +88,12 @@ class TunProtoBase(object): # Install the tun_connect script and tunalloc utility from nepi.util import tunchannel + from nepi.util import ipaddr2 sources = [ os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'), os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'), re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific + re.sub(r"([.]py)[co]$", r'\1', ipaddr2.__file__, 1), # pyc/o files are version-specific ] if local.filter_module: filter_sources = filter(bool,map(str.strip,local.filter_module.module.split())) @@ -205,6 +207,8 @@ class TunProtoBase(object): local_txq = local.txqueuelen local_p2p = local.pointopoint local_cipher=local.tun_cipher + local_mcast= local.multicast + local_bwlim= local.bwlimit if not local_p2p and hasattr(peer, 'address'): local_p2p = peer.address @@ -272,6 +276,10 @@ class TunProtoBase(object): args.append("-N") elif local_cap == 'pcap': args.extend(('-c','pcap')) + if local_mcast: + args.append("--multicast") + if local_bwlim: + args.extend(("-b",str(local_bwlim*1024))) if extra_args: args.extend(map(str,extra_args)) if not listen and check_proto != 'fd': diff --git a/src/nepi/util/ipaddr2.py b/src/nepi/util/ipaddr2.py index a01067d0..7211404e 100644 --- a/src/nepi/util/ipaddr2.py +++ b/src/nepi/util/ipaddr2.py @@ -1,6 +1,9 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import struct +import random +import socket +import array def ipv4_dot2mask(mask): mask = mask.split('.',4) # a.b.c.d -> [a,b,c,d] @@ -43,3 +46,35 @@ def ipdistn(a,b): d -= 1 return d +def inet_cksum(packet): + words = array.array('H') + words.fromstring(packet[:len(packet)&~0x1]) + cksum = sum(words) + if len(packet)&0x1: + cksum += ord(packet[-1]) + cksum = (cksum >> 16) + (cksum & 0xffff) + cksum += (cksum >> 16) + return ~cksum + +def iphdr(src, dst, datalen, ttl, proto): + cksum = 0 + src = socket.inet_aton(src) + dst = socket.inet_aton(dst) + hdr = struct.pack('!BBHHHBBH4s4s', + 0x45, 0, datalen + 5*32, int(random.random() * 65536) & 0xffff, 0, + ttl, proto, cksum & 0xffff, src, dst) + cksum = inet_cksum(hdr) + hdr = struct.pack('!BBHHHBBH4s4s', + 0x45, 0, datalen + 5*32, int(random.random() * 65536) & 0xffff, 0, + ttl, proto, cksum & 0xffff, src, dst) + return hdr + +def igmp(type, mxrt, grp): + cksum = 0 + grp = socket.inet_aton(grp) + ighdr = struct.pack('!BBH4s', type, mxrt, cksum & 0xffff, grp) + cksum = inet_cksum(ighdr) + ighdr = struct.pack('!BBH4s', type, mxrt, cksum & 0xffff, grp) + return ighdr + + diff --git a/src/nepi/util/tunchannel.py b/src/nepi/util/tunchannel.py index af18de59..49f0e6c5 100644 --- a/src/nepi/util/tunchannel.py +++ b/src/nepi/util/tunchannel.py @@ -192,8 +192,8 @@ def nonblock(fd): return False def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000, - cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None, - len=len, max=max, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket, + cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None, bwlimit=None, + len=len, max=max, min=min, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket, retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ): crypto_mode = False try: @@ -295,8 +295,13 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr if queueclass is None: queueclass = collections.deque + maxbatch = 2000 + maxtbatch = 50 else: maxfwbuf = maxbkbuf = 2000000000 + maxbatch = 50 + maxtbatch = 30 + tunhurry = 30 fwbuf = queueclass() bkbuf = queueclass() @@ -314,13 +319,17 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr os_read = os.read os_write = os.write + tget = time.time + maxbwfree = bwfree = 1500 * tunqueue + lastbwtime = tget() + remoteok = True while not TERMINATE: wset = [] if packetReady(bkbuf): wset.append(tun) - if remoteok and packetReady(fwbuf): + if remoteok and packetReady(fwbuf) and (not bwlimit or bwfree > 0): wset.append(remote) rset = [] @@ -363,15 +372,16 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr # check to see if we can write #rr = wr = rt = wt = 0 if remote in wrdy: + sent = 0 try: try: - for x in xrange(2000): + for x in xrange(maxbatch): packet = pullPacket(fwbuf) if crypto_mode: packet = encrypt_(packet, crypter) - rwrite(remote, packet) + sent += rwrite(remote, packet) #wr += 1 if not rnonblock or not packetReady(fwbuf): @@ -396,9 +406,12 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr # in UDP mode, we ignore errors - packet loss man... raise #traceback.print_exc(file=sys.stderr) + + if bwlimit: + bwfree -= sent if tun in wrdy: try: - for x in xrange(50): + for x in xrange(maxtbatch): packet = pullPacket(bkbuf) twrite(tunfd, packet) #wt += 1 @@ -426,7 +439,7 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr # check incoming data packets if tun in rdrdy: try: - for x in xrange(2000): + for x in xrange(maxbatch): packet = tread(tunfd,2000) # tun.read blocks until it gets 2k! if not packet: continue @@ -444,7 +457,7 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr if remote in rdrdy: try: try: - for x in xrange(2000): + for x in xrange(maxbatch): packet = rread(remote,2000) #rr += 1 @@ -480,6 +493,15 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr # in UDP mode, we ignore errors - packet loss man... raise traceback.print_exc(file=sys.stderr) + + if bwlimit: + tnow = tget() + delta = tnow - lastbwtime + if delta > 0.001: + delta = int(bwlimit * delta) + if delta > 0: + bwfree = min(bwfree+delta, maxbwfree) + lastbwtime = tnow #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)