raise RuntimeError, "PlanetLab account username not set"
if not self.authString:
raise RuntimeError, "PlanetLab account passphrase not set"
+ if not self.sliceSSHKey:
+ raise RuntimeError, "PlanetLab account key not specified"
+ if not os.path.exists(self.sliceSSHKey):
+ raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)
self._logger.setLevel(getattr(logging,self.logLevel))
self.snat = False
self.txqueuelen = None
self.pointopoint = None
+ self.multicast = False
+ self.bwlimit = None
# Enabled traces
self.capture = False
"value": False,
"validation_function": validation.is_bool
}),
+ "multicast": dict({
+ "name": "multicast",
+ "help": "Enable multicast forwarding on this device. "
+ "Note that you still need a multicast routing daemon "
+ "in the node.",
+ "type": Attribute.BOOL,
+ "value": False,
+ "validation_function": validation.is_bool
+ }),
"pointopoint": dict({
"name": "pointopoint",
"help": "If the interface is a P2P link, the remote endpoint's IP "
"flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
"validation_function": validation.is_string
}),
+ "bwlimit": dict({
+ "name": "bwlimit",
+ "help": "Emulated transmission speed (in kbytes per second)",
+ "type": Attribute.INTEGER,
+ "range" : (1,10*2**20),
+ "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+ "validation_function": validation.is_integer
+ }),
"txqueuelen": dict({
"name": "txqueuelen",
"help": "Transmission queue length (in packets)",
"configure_function": postconfigure_tuniface,
"prestart_function": wait_tuniface,
"box_attributes": [
- "up", "if_name", "mtu", "snat", "pointopoint",
+ "up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit",
"txqueuelen",
"tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher",
],
"configure_function": postconfigure_tuniface,
"prestart_function": wait_tuniface,
"box_attributes": [
- "up", "if_name", "mtu", "snat", "pointopoint",
+ "up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit",
"txqueuelen",
"tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher",
],
import sys
import iovec
+dstats = collections.defaultdict(int)
+astats = collections.defaultdict(int)
+dump_count = [0]
+
+_red = True
_size = 1000
_classes = (
"igmp.ggp.cbt.egp.igp.idrp.mhrp.narp.ospf.eigrp*p1:"
def __init__(self):
self.size = _size
self.len = 0
- self.stats = collections.defaultdict(int)
- self.dump_count = 0
# Prepare classes
self.classspec = _parse_classes(_classes)
rv = self.classmap.get(None)
return proto, rv, self.sizemap[rv]
- def append(self, packet, len=len):
+ def get_packetdrop_p(self, qlen, qsize, packet):
+ pdrop = ((qlen * 1.0 / qsize) - 0.5) * 2.0
+ pdrop *= pdrop
+ return pdrop
+
+ def append(self, packet, len=len, dstats=dstats, astats=astats, rng=random.random):
proto,qi,size = self.queuefor(packet)
q = self.queues[qi]
- if len(q) < size:
- classes = self.classes
- if qi not in classes:
- classes.add(qi)
- self.cycle_update = True
- q.append(packet)
- self.len += 1
+ lq = len(q)
+ if lq < size:
+ dropped = 0
+ if lq > (size/2) and _red:
+ pdrop = self.get_packetdrop_p(lq, size, packet)
+ if rng() < pdrop:
+ dropped = 1
+ if not dropped:
+ classes = self.classes
+ if qi not in classes:
+ classes.add(qi)
+ self.cycle_update = True
+ q.append(packet)
+ self.len += 1
# packet dropped
- elif _logdropped:
- self.stats[proto] += 1
+ else:
+ dropped = 1
+ if _logdropped:
+ if dropped:
+ dstats[proto] += 1
+ else:
+ astats[proto] += 1
self.dump_stats()
def appendleft(self, packet):
def pop(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.pop):
return self.popleft(pop=pop)
- def popleft(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.popleft):
+ def popleft(self, xrange=xrange, len=len, iter=iter, enumerate=enumerate, zip=zip, pop=collections.deque.popleft):
queues = self.queues
classes = self.classes
else:
raise IndexError, "pop from an empty queue"
- def dump_stats(self):
- if self.dump_count >= 10000:
- stats = "".join(['%s:%s\n' % (key, value) for key, value in self.stats.items()])
+ def dump_stats(self, astats=astats, dstats=dstats, dump_count=dump_count):
+ if dump_count[0] >= 10000:
+ dstatsstr = "".join(['%s:%s\n' % (key, value) for key, value in dstats.items()])
+ astatsstr = "".join(['%s:%s\n' % (key, value) for key, value in astats.items()])
fd = open('dropped_stats', 'w')
- iovec.writev(fd.fileno(), stats)
+ iovec.writev(fd.fileno(), "Dropped:\n", dstatsstr, "Accepted:\n", astatsstr)
fd.close()
- self.dump_count = 0
+ dump_count[0] = 0
else:
- self.dump_count += 1
+ dump_count[0] += 1
queueclass = ClassQueue
-def init(size = 1000, classes = _classes, logdropped = 'False'):
+def init(size = 1000, classes = _classes, logdropped = 'False', red = True):
global _size, _classes, _logdropped
_size = int(size)
_classes = classes
+ _red = red
_logdropped = logdropped.lower() in ('true','1','on')
+
+ if _logdropped:
+ # Truncate stats
+ open('dropped_stats', 'w').close()
_protomap = {
'3pc' : 34,
- 'a/n' : 107,
+ 'an' : 107,
'ah' : 51,
'argus' : 13,
'aris' : 104,
import traceback
import tunchannel
+import ipaddr2
tun_name = 'tun0'
tun_path = '/dev/net/tun'
default = None,
help =
"See mode. This specifies the interface's transmission queue length. " )
+parser.add_option(
+ "-b", "--bwlimit", dest="bwlimit", metavar="BYTESPERSECOND", type="int",
+ default = None,
+ help =
+ "This specifies the interface's emulated bandwidth in bytes per second." )
parser.add_option(
"-u", "--udp", dest="udp", metavar="PORT", type="int",
default = None,
default = None,
help = "If specified, packets won't be logged to standard output, "
"but dumped to a pcap-formatted trace in the specified file. " )
+parser.add_option(
+ "--multicast", dest="multicast",
+ action = "store_true",
+ default = False,
+ help = "If specified, multicast packets will be forwarded and IGMP "
+ "join/leave packets will be generated. Routing information "
+ "must be sent to the mroute unix socket, in a format identical "
+ "to that of the kernel's MRT ioctls, prefixed with 32-bit IOCTL "
+ "code and 32-bit data length." )
parser.add_option(
"--filter", dest="filter_module", metavar="PATH",
default = None,
IFREQ_SZ = 0x00000028
FIONREAD = 0x0000541b
+class MulticastThread(threading.Thread):
+ def __init__(self, *p, **kw):
+ super(MulticastThread, self).__init__(*p, **kw)
+ self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
+ self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
+ socket.inet_aton(options.vif_addr) )
+ self._stop = False
+ self.setDaemon(True)
+
+ def run(self):
+ devnull = open('/dev/null','r+b')
+ maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
+ cur_maddr = set()
+ while not self._stop:
+ # Get current subscriptions
+ proc = subprocess.Popen(['ip','maddr','show',tun_name],
+ stdout = subprocess.PIPE,
+ stderr = subprocess.STDOUT,
+ stdin = devnull)
+ new_maddr = set()
+ for line in proc.stdout:
+ match = maddr_re.match(line)
+ if match:
+ new_maddr.add(match.group(1))
+ proc.wait()
+
+ # Notify new subscriptions
+ for grp in new_maddr - cur_maddr:
+ self.igmp_socket.sendto(
+ ipaddr2.igmp(0x16, 0, grp),
+ 0,
+ (grp,0))
+
+ # Notify group leave
+ for grp in cur_maddr - new_maddr:
+ self.igmp_socket.sendto(
+ ipaddr2.igmp(0x17, 0, grp),
+ 0,
+ (grp,0))
+
+ cur_maddr = new_maddr
+
+ time.sleep(1)
+
+ def stop(self):
+ self._stop = True
+ self.join(5)
+
class HostLock(object):
# This class is used as a lock to prevent concurrency issues with more
# than one instance of netns running in the same machine. Both in
del lock, lockfile
-def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True):
+def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True, bwlimit = None):
global TERMINATE
tunqueue = options.vif_txqueuelen or 1000
accept_local = accept_local,
accept_remote = accept_remote,
queueclass = queueclass,
- slowlocal = slowlocal
+ slowlocal = slowlocal,
+ bwlimit = bwlimit
)
try:
tcpdump = None
reconnect = None
+ mcastthread = None
if options.pass_fd:
if accept_packet or filter_init:
# Ignore errors, we might not have enough privileges,
# or perhaps there is no os.nice support in the system
pass
+
+ if options.multicast:
+ # Start multicast forwarding daemon
+ mcastthread = MulticastThread()
+ mcastthread.start()
if not filter_init:
tun_fwd(tun, remote,
reconnect = reconnect,
accept_local = accept_packet,
accept_remote = accept_packet,
+ bwlimit = options.bwlimit,
slowlocal = True)
else:
# Hm...
tun_fwd(filter_remote_fd, remote,
reconnect = reconnect,
accept_remote = accept_packet,
+ bwlimit = options.bwlimit,
slowlocal = False)
localthread = threading.Thread(target=localside)
# tidy shutdown in every case - swallow exceptions
TERMINATE.append(None)
+ if mcastthread:
+ try:
+ mcastthread.stop()
+ except:
+ pass
+
if filter_thread:
try:
filter_thread.join()
# Install the tun_connect script and tunalloc utility
from nepi.util import tunchannel
+ from nepi.util import ipaddr2
sources = [
os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
+ re.sub(r"([.]py)[co]$", r'\1', ipaddr2.__file__, 1), # pyc/o files are version-specific
]
if local.filter_module:
filter_sources = filter(bool,map(str.strip,local.filter_module.module.split()))
local_txq = local.txqueuelen
local_p2p = local.pointopoint
local_cipher=local.tun_cipher
+ local_mcast= local.multicast
+ local_bwlim= local.bwlimit
if not local_p2p and hasattr(peer, 'address'):
local_p2p = peer.address
args.append("-N")
elif local_cap == 'pcap':
args.extend(('-c','pcap'))
+ if local_mcast:
+ args.append("--multicast")
+ if local_bwlim:
+ args.extend(("-b",str(local_bwlim*1024)))
if extra_args:
args.extend(map(str,extra_args))
if not listen and check_proto != 'fd':
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import struct
+import random
+import socket
+import array
def ipv4_dot2mask(mask):
mask = mask.split('.',4) # a.b.c.d -> [a,b,c,d]
d -= 1
return d
+def inet_cksum(packet):
+ words = array.array('H')
+ words.fromstring(packet[:len(packet)&~0x1])
+ cksum = sum(words)
+ if len(packet)&0x1:
+ cksum += ord(packet[-1])
+ cksum = (cksum >> 16) + (cksum & 0xffff)
+ cksum += (cksum >> 16)
+ return ~cksum
+
+def iphdr(src, dst, datalen, ttl, proto):
+ cksum = 0
+ src = socket.inet_aton(src)
+ dst = socket.inet_aton(dst)
+ hdr = struct.pack('!BBHHHBBH4s4s',
+ 0x45, 0, datalen + 5*32, int(random.random() * 65536) & 0xffff, 0,
+ ttl, proto, cksum & 0xffff, src, dst)
+ cksum = inet_cksum(hdr)
+ hdr = struct.pack('!BBHHHBBH4s4s',
+ 0x45, 0, datalen + 5*32, int(random.random() * 65536) & 0xffff, 0,
+ ttl, proto, cksum & 0xffff, src, dst)
+ return hdr
+
+def igmp(type, mxrt, grp):
+ cksum = 0
+ grp = socket.inet_aton(grp)
+ ighdr = struct.pack('!BBH4s', type, mxrt, cksum & 0xffff, grp)
+ cksum = inet_cksum(ighdr)
+ ighdr = struct.pack('!BBH4s', type, mxrt, cksum & 0xffff, grp)
+ return ighdr
+
+
return False
def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000,
- cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None,
- len=len, max=max, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket,
+ cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None, bwlimit=None,
+ len=len, max=max, min=min, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket,
retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
crypto_mode = False
try:
if queueclass is None:
queueclass = collections.deque
+ maxbatch = 2000
+ maxtbatch = 50
else:
maxfwbuf = maxbkbuf = 2000000000
+ maxbatch = 50
+ maxtbatch = 30
+ tunhurry = 30
fwbuf = queueclass()
bkbuf = queueclass()
os_read = os.read
os_write = os.write
+ tget = time.time
+ maxbwfree = bwfree = 1500 * tunqueue
+ lastbwtime = tget()
+
remoteok = True
while not TERMINATE:
wset = []
if packetReady(bkbuf):
wset.append(tun)
- if remoteok and packetReady(fwbuf):
+ if remoteok and packetReady(fwbuf) and (not bwlimit or bwfree > 0):
wset.append(remote)
rset = []
# check to see if we can write
#rr = wr = rt = wt = 0
if remote in wrdy:
+ sent = 0
try:
try:
- for x in xrange(2000):
+ for x in xrange(maxbatch):
packet = pullPacket(fwbuf)
if crypto_mode:
packet = encrypt_(packet, crypter)
- rwrite(remote, packet)
+ sent += rwrite(remote, packet)
#wr += 1
if not rnonblock or not packetReady(fwbuf):
# in UDP mode, we ignore errors - packet loss man...
raise
#traceback.print_exc(file=sys.stderr)
+
+ if bwlimit:
+ bwfree -= sent
if tun in wrdy:
try:
- for x in xrange(50):
+ for x in xrange(maxtbatch):
packet = pullPacket(bkbuf)
twrite(tunfd, packet)
#wt += 1
# check incoming data packets
if tun in rdrdy:
try:
- for x in xrange(2000):
+ for x in xrange(maxbatch):
packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
if not packet:
continue
if remote in rdrdy:
try:
try:
- for x in xrange(2000):
+ for x in xrange(maxbatch):
packet = rread(remote,2000)
#rr += 1
# in UDP mode, we ignore errors - packet loss man...
raise
traceback.print_exc(file=sys.stderr)
+
+ if bwlimit:
+ tnow = tget()
+ delta = tnow - lastbwtime
+ if delta > 0.001:
+ delta = int(bwlimit * delta)
+ if delta > 0:
+ bwfree = min(bwfree+delta, maxbwfree)
+ lastbwtime = tnow
#print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)