- Incomplete multicast support (initial work)
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 23 Aug 2011 16:13:19 +0000 (18:13 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 23 Aug 2011 16:13:19 +0000 (18:13 +0200)
 - Bandwidth limit for TUN/TAP
 - Classful queue improvements, especially on tunchannel when using custom queues

src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/metadata.py
src/nepi/testbeds/planetlab/scripts/classqueue.py
src/nepi/testbeds/planetlab/scripts/tun_connect.py
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/ipaddr2.py
src/nepi/util/tunchannel.py

index f86026a..90028f6 100644 (file)
@@ -155,6 +155,10 @@ class TestbedController(testbed_impl.TestbedController):
             raise RuntimeError, "PlanetLab account username not set"
         if not self.authString:
             raise RuntimeError, "PlanetLab account passphrase not set"
+        if not self.sliceSSHKey:
+            raise RuntimeError, "PlanetLab account key not specified"
+        if not os.path.exists(self.sliceSSHKey):
+            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)
         
         self._logger.setLevel(getattr(logging,self.logLevel))
         
index 80f834a..c853a37 100644 (file)
@@ -128,6 +128,8 @@ class TunIface(object):
         self.snat = False
         self.txqueuelen = None
         self.pointopoint = None
+        self.multicast = False
+        self.bwlimit = None
         
         # Enabled traces
         self.capture = False
index f008c5c..98c0f42 100644 (file)
@@ -987,6 +987,15 @@ attributes = dict({
                 "value": False,
                 "validation_function": validation.is_bool
             }),
+    "multicast":  dict({
+                "name": "multicast", 
+                "help": "Enable multicast forwarding on this device. "
+                        "Note that you still need a multicast routing daemon "
+                        "in the node.",
+                "type": Attribute.BOOL,
+                "value": False,
+                "validation_function": validation.is_bool
+            }),
     "pointopoint":  dict({
                 "name": "pointopoint", 
                 "help": "If the interface is a P2P link, the remote endpoint's IP "
@@ -995,6 +1004,14 @@ attributes = dict({
                 "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
                 "validation_function": validation.is_string
             }),
+    "bwlimit":  dict({
+                "name": "bwlimit", 
+                "help": "Emulated transmission speed (in kbytes per second)",
+                "type": Attribute.INTEGER,
+                "range" : (1,10*2**20),
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_integer
+            }),
     "txqueuelen":  dict({
                 "name": "txqueuelen", 
                 "help": "Transmission queue length (in packets)",
@@ -1257,7 +1274,7 @@ factories_info = dict({
             "configure_function": postconfigure_tuniface,
             "prestart_function": wait_tuniface,
             "box_attributes": [
-                "up", "if_name", "mtu", "snat", "pointopoint",
+                "up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit",
                 "txqueuelen",
                 "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher",
             ],
@@ -1273,7 +1290,7 @@ factories_info = dict({
             "configure_function": postconfigure_tuniface,
             "prestart_function": wait_tuniface,
             "box_attributes": [
-                "up", "if_name", "mtu", "snat", "pointopoint",
+                "up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit",
                 "txqueuelen",
                 "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher",
             ],
index fc79962..0fcae27 100644 (file)
@@ -5,6 +5,11 @@ import re
 import sys
 import iovec
 
+dstats = collections.defaultdict(int)
+astats = collections.defaultdict(int)
+dump_count = [0]
+
+_red = True
 _size = 1000
 _classes = (
     "igmp.ggp.cbt.egp.igp.idrp.mhrp.narp.ospf.eigrp*p1:"
@@ -73,8 +78,6 @@ class ClassQueue(object):
     def __init__(self):
         self.size = _size
         self.len = 0
-        self.stats = collections.defaultdict(int)
-        self.dump_count = 0
 
         # Prepare classes
         self.classspec = _parse_classes(_classes)
@@ -148,19 +151,36 @@ class ClassQueue(object):
             rv = self.classmap.get(None)
         return proto, rv, self.sizemap[rv]
     
-    def append(self, packet, len=len):
+    def get_packetdrop_p(self, qlen, qsize, packet):
+        pdrop = ((qlen * 1.0 / qsize) - 0.5) * 2.0
+        pdrop *= pdrop
+        return pdrop
+    
+    def append(self, packet, len=len, dstats=dstats, astats=astats, rng=random.random):
         proto,qi,size = self.queuefor(packet)
         q = self.queues[qi]
-        if len(q) < size:
-            classes = self.classes
-            if qi not in classes:
-                classes.add(qi)
-                self.cycle_update = True
-            q.append(packet)
-            self.len += 1
+        lq = len(q)
+        if lq < size:
+            dropped = 0
+            if lq > (size/2) and _red:
+                pdrop = self.get_packetdrop_p(lq, size, packet)
+                if rng() < pdrop:
+                    dropped = 1
+            if not dropped:
+                classes = self.classes
+                if qi not in classes:
+                    classes.add(qi)
+                    self.cycle_update = True
+                q.append(packet)
+                self.len += 1
         # packet dropped
-        elif _logdropped:
-            self.stats[proto] += 1
+        else:
+            dropped = 1
+        if _logdropped:
+            if dropped:
+                dstats[proto] += 1
+            else:
+                astats[proto] += 1
             self.dump_stats()
 
     def appendleft(self, packet):
@@ -170,7 +190,7 @@ class ClassQueue(object):
     def pop(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.pop):
         return self.popleft(pop=pop)
     
-    def popleft(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.popleft):
+    def popleft(self, xrange=xrange, len=len, iter=iter, enumerate=enumerate, zip=zip, pop=collections.deque.popleft):
         queues = self.queues
         classes = self.classes
 
@@ -206,27 +226,33 @@ class ClassQueue(object):
         else:
             raise IndexError, "pop from an empty queue"
 
-    def dump_stats(self):
-        if self.dump_count >= 10000:
-            stats = "".join(['%s:%s\n' % (key, value) for key, value in self.stats.items()])
+    def dump_stats(self, astats=astats, dstats=dstats, dump_count=dump_count):
+        if dump_count[0] >= 10000:
+            dstatsstr = "".join(['%s:%s\n' % (key, value) for key, value in dstats.items()])
+            astatsstr = "".join(['%s:%s\n' % (key, value) for key, value in astats.items()])
             fd = open('dropped_stats', 'w')
-            iovec.writev(fd.fileno(), stats)
+            iovec.writev(fd.fileno(), "Dropped:\n", dstatsstr, "Accepted:\n", astatsstr)
             fd.close()
-            self.dump_count = 0
+            dump_count[0] = 0
         else:
-            self.dump_count += 1
+            dump_count[0] += 1
 
 queueclass = ClassQueue
 
-def init(size = 1000, classes = _classes, logdropped = 'False'):
+def init(size = 1000, classes = _classes, logdropped = 'False', red = True):
     global _size, _classes, _logdropped
     _size = int(size)
     _classes = classes
+    _red = red
     _logdropped = logdropped.lower() in ('true','1','on')
+    
+    if _logdropped:
+        # Truncate stats
+        open('dropped_stats', 'w').close()
 
 _protomap = {
     '3pc'      :       34,
-    'a/n'      :       107,
+    'an'       :       107,
     'ah'       :       51,
     'argus'    :       13,
     'aris'     :       104,
index 3753631..ae7a0cc 100644 (file)
@@ -19,6 +19,7 @@ import base64
 import traceback
 
 import tunchannel
+import ipaddr2
 
 tun_name = 'tun0'
 tun_path = '/dev/net/tun'
@@ -85,6 +86,11 @@ parser.add_option(
     default = None,
     help = 
         "See mode. This specifies the interface's transmission queue length. " )
+parser.add_option(
+    "-b", "--bwlimit", dest="bwlimit", metavar="BYTESPERSECOND", type="int",
+    default = None,
+    help = 
+        "This specifies the interface's emulated bandwidth in bytes per second." )
 parser.add_option(
     "-u", "--udp", dest="udp", metavar="PORT", type="int",
     default = None,
@@ -117,6 +123,15 @@ parser.add_option(
     default = None,
     help = "If specified, packets won't be logged to standard output, "
            "but dumped to a pcap-formatted trace in the specified file. " )
+parser.add_option(
+    "--multicast", dest="multicast", 
+    action = "store_true",
+    default = False,
+    help = "If specified, multicast packets will be forwarded and IGMP "
+           "join/leave packets will be generated. Routing information "
+           "must be sent to the mroute unix socket, in a format identical "
+           "to that of the kernel's MRT ioctls, prefixed with 32-bit IOCTL "
+           "code and 32-bit data length." )
 parser.add_option(
     "--filter", dest="filter_module", metavar="PATH",
     default = None,
@@ -198,6 +213,54 @@ IFNAMSIZ = 0x00000010
 IFREQ_SZ = 0x00000028
 FIONREAD = 0x0000541b
 
+class MulticastThread(threading.Thread):
+    def __init__(self, *p, **kw):
+        super(MulticastThread, self).__init__(*p, **kw)
+        self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
+        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
+            socket.inet_aton(options.vif_addr) )
+        self._stop = False
+        self.setDaemon(True)
+    
+    def run(self):
+        devnull = open('/dev/null','r+b')
+        maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
+        cur_maddr = set()
+        while not self._stop:
+            # Get current subscriptions
+            proc = subprocess.Popen(['ip','maddr','show',tun_name],
+                stdout = subprocess.PIPE,
+                stderr = subprocess.STDOUT,
+                stdin = devnull)
+            new_maddr = set()
+            for line in proc.stdout:
+                match = maddr_re.match(line)
+                if match:
+                    new_maddr.add(match.group(1))
+            proc.wait()
+            
+            # Notify new subscriptions
+            for grp in new_maddr - cur_maddr:
+                self.igmp_socket.sendto(
+                    ipaddr2.igmp(0x16, 0, grp), 
+                    0, 
+                    (grp,0))
+
+            # Notify group leave
+            for grp in cur_maddr - new_maddr:
+                self.igmp_socket.sendto(
+                    ipaddr2.igmp(0x17, 0, grp), 
+                    0, 
+                    (grp,0))
+
+            cur_maddr = new_maddr
+            
+            time.sleep(1)
+    
+    def stop(self):
+        self._stop = True
+        self.join(5)
+
 class HostLock(object):
     # This class is used as a lock to prevent concurrency issues with more
     # than one instance of netns running in the same machine. Both in 
@@ -484,7 +547,7 @@ def pl_vif_stop(tun_path, tun_name):
     del lock, lockfile
 
 
-def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True):
+def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True, bwlimit = None):
     global TERMINATE
     
     tunqueue = options.vif_txqueuelen or 1000
@@ -506,7 +569,8 @@ def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote =
         accept_local = accept_local,
         accept_remote = accept_remote,
         queueclass = queueclass,
-        slowlocal = slowlocal
+        slowlocal = slowlocal,
+        bwlimit = bwlimit
     )
 
 
@@ -634,6 +698,7 @@ signal.signal(signal.SIGTERM, _finalize)
 try:
     tcpdump = None
     reconnect = None
+    mcastthread = None
     
     if options.pass_fd:
         if accept_packet or filter_init:
@@ -812,12 +877,18 @@ try:
         # Ignore errors, we might not have enough privileges,
         # or perhaps there is no os.nice support in the system
         pass
+    
+    if options.multicast:
+        # Start multicast forwarding daemon
+        mcastthread = MulticastThread()
+        mcastthread.start()
 
     if not filter_init:
         tun_fwd(tun, remote,
             reconnect = reconnect,
             accept_local = accept_packet,
             accept_remote = accept_packet,
+            bwlimit = options.bwlimit,
             slowlocal = True)
     else:
         # Hm...
@@ -848,6 +919,7 @@ try:
             tun_fwd(filter_remote_fd, remote,
                 reconnect = reconnect,
                 accept_remote = accept_packet,
+                bwlimit = options.bwlimit,
                 slowlocal = False)
         
         localthread = threading.Thread(target=localside)
@@ -867,6 +939,12 @@ finally:
     # tidy shutdown in every case - swallow exceptions
     TERMINATE.append(None)
     
+    if mcastthread:
+        try:
+            mcastthread.stop()
+        except:
+            pass
+    
     if filter_thread:
         try:
             filter_thread.join()
index 15bfa94..84e2d9c 100644 (file)
@@ -88,10 +88,12 @@ class TunProtoBase(object):
         
         # Install the tun_connect script and tunalloc utility
         from nepi.util import tunchannel
+        from nepi.util import ipaddr2
         sources = [
             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
             re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
+            re.sub(r"([.]py)[co]$", r'\1', ipaddr2.__file__, 1), # pyc/o files are version-specific
         ]
         if local.filter_module:
             filter_sources = filter(bool,map(str.strip,local.filter_module.module.split()))
@@ -205,6 +207,8 @@ class TunProtoBase(object):
         local_txq  = local.txqueuelen
         local_p2p  = local.pointopoint
         local_cipher=local.tun_cipher
+        local_mcast= local.multicast
+        local_bwlim= local.bwlimit
         
         if not local_p2p and hasattr(peer, 'address'):
             local_p2p = peer.address
@@ -272,6 +276,10 @@ class TunProtoBase(object):
             args.append("-N")
         elif local_cap == 'pcap':
             args.extend(('-c','pcap'))
+        if local_mcast:
+            args.append("--multicast")
+        if local_bwlim:
+            args.extend(("-b",str(local_bwlim*1024)))
         if extra_args:
             args.extend(map(str,extra_args))
         if not listen and check_proto != 'fd':
index a01067d..7211404 100644 (file)
@@ -1,6 +1,9 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 import struct
+import random
+import socket
+import array
 
 def ipv4_dot2mask(mask):
     mask = mask.split('.',4) # a.b.c.d -> [a,b,c,d]
@@ -43,3 +46,35 @@ def ipdistn(a,b):
         d -= 1
     return d
 
+def inet_cksum(packet):
+    words = array.array('H')
+    words.fromstring(packet[:len(packet)&~0x1])
+    cksum = sum(words)
+    if len(packet)&0x1:
+       cksum += ord(packet[-1])
+    cksum = (cksum >> 16) + (cksum & 0xffff)
+    cksum += (cksum >> 16)
+    return ~cksum
+
+def iphdr(src, dst, datalen, ttl, proto):
+    cksum = 0
+    src = socket.inet_aton(src)
+    dst = socket.inet_aton(dst)
+    hdr = struct.pack('!BBHHHBBH4s4s', 
+        0x45, 0, datalen + 5*32, int(random.random() * 65536) & 0xffff, 0, 
+        ttl, proto, cksum & 0xffff, src, dst)
+    cksum = inet_cksum(hdr)
+    hdr = struct.pack('!BBHHHBBH4s4s', 
+        0x45, 0, datalen + 5*32, int(random.random() * 65536) & 0xffff, 0, 
+        ttl, proto, cksum & 0xffff, src, dst)
+    return hdr
+
+def igmp(type, mxrt, grp):
+    cksum = 0
+    grp = socket.inet_aton(grp)
+    ighdr = struct.pack('!BBH4s', type, mxrt, cksum & 0xffff, grp)
+    cksum = inet_cksum(ighdr)
+    ighdr = struct.pack('!BBH4s', type, mxrt, cksum & 0xffff, grp)
+    return ighdr
+
+
index af18de5..49f0e6c 100644 (file)
@@ -192,8 +192,8 @@ def nonblock(fd):
         return False
 
 def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000,
-        cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None,
-        len=len, max=max, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket,
+        cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None, bwlimit=None,
+        len=len, max=max, min=min, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket,
         retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
     crypto_mode = False
     try:
@@ -295,8 +295,13 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
     
     if queueclass is None:
         queueclass = collections.deque
+        maxbatch = 2000
+        maxtbatch = 50
     else:
         maxfwbuf = maxbkbuf = 2000000000
+        maxbatch = 50
+        maxtbatch = 30
+        tunhurry = 30
     
     fwbuf = queueclass()
     bkbuf = queueclass()
@@ -314,13 +319,17 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
     os_read = os.read
     os_write = os.write
     
+    tget = time.time
+    maxbwfree = bwfree = 1500 * tunqueue
+    lastbwtime = tget()
+    
     remoteok = True
     
     while not TERMINATE:
         wset = []
         if packetReady(bkbuf):
             wset.append(tun)
-        if remoteok and packetReady(fwbuf):
+        if remoteok and packetReady(fwbuf) and (not bwlimit or bwfree > 0):
             wset.append(remote)
         
         rset = []
@@ -363,15 +372,16 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
         # check to see if we can write
         #rr = wr = rt = wt = 0
         if remote in wrdy:
+            sent = 0
             try:
                 try:
-                    for x in xrange(2000):
+                    for x in xrange(maxbatch):
                         packet = pullPacket(fwbuf)
 
                         if crypto_mode:
                             packet = encrypt_(packet, crypter)
                         
-                        rwrite(remote, packet)
+                        sent += rwrite(remote, packet)
                         #wr += 1
                         
                         if not rnonblock or not packetReady(fwbuf):
@@ -396,9 +406,12 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
                     # in UDP mode, we ignore errors - packet loss man...
                     raise
                 #traceback.print_exc(file=sys.stderr)
+            
+            if bwlimit:
+                bwfree -= sent
         if tun in wrdy:
             try:
-                for x in xrange(50):
+                for x in xrange(maxtbatch):
                     packet = pullPacket(bkbuf)
                     twrite(tunfd, packet)
                     #wt += 1
@@ -426,7 +439,7 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
         # check incoming data packets
         if tun in rdrdy:
             try:
-                for x in xrange(2000):
+                for x in xrange(maxbatch):
                     packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
                     if not packet:
                         continue
@@ -444,7 +457,7 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
         if remote in rdrdy:
             try:
                 try:
-                    for x in xrange(2000):
+                    for x in xrange(maxbatch):
                         packet = rread(remote,2000)
                         #rr += 1
                         
@@ -480,6 +493,15 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
                     # in UDP mode, we ignore errors - packet loss man...
                     raise
                 traceback.print_exc(file=sys.stderr)
+
+        if bwlimit:
+            tnow = tget()
+            delta = tnow - lastbwtime
+            if delta > 0.001:
+                delta = int(bwlimit * delta)
+                if delta > 0:
+                    bwfree = min(bwfree+delta, maxbwfree)
+                    lastbwtime = tnow
         
         #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)