Adding tuncahnnel and pl-vif-tunnconnect scripts
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 15 Jul 2013 05:39:44 +0000 (22:39 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 15 Jul 2013 05:39:44 +0000 (22:39 -0700)
setup.py
src/nepi/resources/all/scripts/tunchannel.py [new file with mode: 0644]
src/nepi/resources/linux/application.py
src/nepi/resources/planetlab/scripts/.pl-tap-create.py.swp [deleted file]
src/nepi/resources/planetlab/scripts/pl-vif-create.py
src/nepi/resources/planetlab/scripts/pl-vif-tunconnect.py [new file with mode: 0644]
src/nepi/resources/planetlab/tap.py
test/execution/resource.py
test/resources/linux/nping.py [changed mode: 0644->0755]
test/resources/planetlab/tap.py

index bebb10a..407e842 100755 (executable)
--- a/setup.py
+++ b/setup.py
@@ -26,5 +26,8 @@ setup(
             "nepi.resources.planetlab",
             "nepi.util"],
         package_dir = {"": "src"},
-        package_data = {"nepi.resources.planetlab" : [ "scripts/*.py" ]}
+        package_data = {
+            "nepi.resources.planetlab" : [ "scripts/*.py" ],
+            "nepi.resources.all" : [ "scripts/*.py" ]
+            }
     )
diff --git a/src/nepi/resources/all/scripts/tunchannel.py b/src/nepi/resources/all/scripts/tunchannel.py
new file mode 100644 (file)
index 0000000..cd30b8b
--- /dev/null
@@ -0,0 +1,792 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Claudio Freire <claudio-daniel.freire@inria.fr>
+#
+
+
+import select
+import sys
+import os
+import struct
+import socket
+import threading
+import traceback
+import errno
+import fcntl
+import random
+import traceback
+import functools
+import collections
+import ctypes
+import time
+
+def ipfmt(ip):
+    ipbytes = map(ord,ip.decode("hex"))
+    return '.'.join(map(str,ipbytes))
+
+tagtype = {
+    '0806' : 'arp',
+    '0800' : 'ipv4',
+    '8870' : 'jumbo',
+    '8863' : 'PPPoE discover',
+    '8864' : 'PPPoE',
+    '86dd' : 'ipv6',
+}
+
+def etherProto(packet, len=len):
+    if len(packet) > 14:
+        if packet[12] == "\x81" and packet[13] == "\x00":
+            # tagged
+            return packet[16:18]
+        else:
+            # untagged
+            return packet[12:14]
+    # default: ip
+    return "\x08\x00"
+
+def formatPacket(packet, ether_mode):
+    if ether_mode:
+        stripped_packet = etherStrip(packet)
+        if not stripped_packet:
+            packet = packet.encode("hex")
+            if len(packet) < 28:
+                return "malformed eth " + packet.encode("hex")
+            else:
+                if packet[24:28] == "8100":
+                    # tagged
+                    ethertype = tagtype.get(packet[32:36], 'eth')
+                    return ethertype + " " + ( '-'.join( (
+                        packet[0:12], # MAC dest
+                        packet[12:24], # MAC src
+                        packet[24:32], # VLAN tag
+                        packet[32:36], # Ethertype/len
+                        packet[36:], # Payload
+                    ) ) )
+                else:
+                    # untagged
+                    ethertype = tagtype.get(packet[24:28], 'eth')
+                    return ethertype + " " + ( '-'.join( (
+                        packet[0:12], # MAC dest
+                        packet[12:24], # MAC src
+                        packet[24:28], # Ethertype/len
+                        packet[28:], # Payload
+                    ) ) )
+        else:
+            packet = stripped_packet
+    packet = packet.encode("hex")
+    if len(packet) < 48:
+        return "malformed ip " + packet
+    else:
+        return "ip " + ( '-'.join( (
+            packet[0:1], #version
+            packet[1:2], #header length
+            packet[2:4], #diffserv/ECN
+            packet[4:8], #total length
+            packet[8:12], #ident
+            packet[12:16], #flags/fragment offs
+            packet[16:18], #ttl
+            packet[18:20], #ip-proto
+            packet[20:24], #checksum
+            ipfmt(packet[24:32]), # src-ip
+            ipfmt(packet[32:40]), # dst-ip
+            packet[40:48] if (int(packet[1],16) > 5) else "", # options
+            packet[48:] if (int(packet[1],16) > 5) else packet[40:], # payload
+        ) ) )
+
+def _packetReady(buf, ether_mode=False, len=len, str=str):
+    if not buf:
+        return False
+        
+    rv = False
+    while not rv:
+        if len(buf[0]) < 4:
+            rv = False
+        elif ether_mode:
+            rv = True
+        else:
+            _,totallen = struct.unpack('HH',buf[0][:4])
+            totallen = socket.htons(totallen)
+            rv = len(buf[0]) >= totallen
+        if not rv and len(buf) > 1:
+            # collapse only first two buffers
+            # as needed, to mantain len(buf) meaningful
+            p1 = buf.popleft()
+            buf[0] = p1+str(buf[0])
+        else:
+            return rv
+    return rv
+
+def _pullPacket(buf, ether_mode=False, len=len, buffer=buffer):
+    if ether_mode:
+        return buf.popleft()
+    else:
+        _,totallen = struct.unpack('HH',buf[0][:4])
+        totallen = socket.htons(totallen)
+        if len(buf[0]) > totallen:
+            rv = buffer(buf[0],0,totallen)
+            buf[0] = buffer(buf[0],totallen)
+        else:
+            rv = buf.popleft()
+        return rv
+
+def etherStrip(buf, buffer=buffer, len=len):
+    if len(buf) < 14:
+        return ""
+    if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
+        # tagged ethernet frame
+        return buffer(buf, 18)
+    elif buf[12:14] == '\x08\x00':
+        # untagged ethernet frame
+        return buffer(buf, 14)
+    else:
+        return ""
+
+def etherWrap(packet):
+    return ''.join((
+        "\x00"*6*2 # bogus src and dst mac
+        +"\x08\x00", # IPv4
+        packet, # payload
+        "\x00"*4, # bogus crc
+    ))
+
+def piStrip(buf, len=len):
+    if len(buf) < 4:
+        return buf
+    else:
+        return buffer(buf,4)
+    
+def piWrap(buf, ether_mode, etherProto=etherProto):
+    if ether_mode:
+        proto = etherProto(buf)
+    else:
+        proto = "\x08\x00"
+    return ''.join((
+        "\x00\x00", # PI: 16 bits flags
+        proto, # 16 bits proto
+        buf,
+    ))
+
+_padmap = [ chr(padding) * padding for padding in xrange(127) ]
+del padding
+
+def encrypt(packet, crypter, len=len, padmap=_padmap):
+    # pad
+    padding = crypter.block_size - len(packet) % crypter.block_size
+    packet += padmap[padding]
+    
+    # encrypt
+    return crypter.encrypt(packet)
+
+def decrypt(packet, crypter, ord=ord):
+    if packet:
+        # decrypt
+        packet = crypter.decrypt(packet)
+        
+        # un-pad
+        padding = ord(packet[-1])
+        if not (0 < padding <= crypter.block_size):
+            # wrong padding
+            raise RuntimeError, "Truncated packet %s"
+        packet = packet[:-padding]
+    
+    return packet
+
+def nonblock(fd):
+    try:
+        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
+        fl |= os.O_NONBLOCK
+        fcntl.fcntl(fd, fcntl.F_SETFL, fl)
+        return True
+    except:
+        traceback.print_exc(file=sys.stderr)
+        # Just ignore
+        return False
+
+def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, SUSPEND,
+        stderr = sys.stderr, reconnect = None, rwrite = None, rread = None,
+        tunqueue = 1000, tunkqueue = 1000, cipher = 'AES', accept_local = None, 
+        accept_remote = None, slowlocal = True, queueclass = None, 
+        bwlimit = None, len = len, max = max, min = min, buffer = buffer,
+        OSError = OSError, select = select.select, selecterror = select.error, 
+        os = os, socket = socket,
+        retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
+    crypto_mode = False
+    crypter = None
+
+    try:
+        if cipher_key and cipher:
+            import Crypto.Cipher
+            import hashlib
+            __import__('Crypto.Cipher.'+cipher)
+            
+            ciphername = cipher
+            cipher = getattr(Crypto.Cipher, cipher)
+            hashed_key = hashlib.sha256(cipher_key).digest()
+
+            if ciphername == 'AES':
+                hashed_key = hashed_key[:16]
+            elif ciphername == 'Blowfish':
+                hashed_key = hashed_key[:24]
+            elif ciphername == 'DES':
+                hashed_key = hashed_key[:8]
+            elif ciphername == 'DES3':
+                hashed_key = hashed_key[:24]
+
+            crypter = cipher.new(
+                hashed_key, 
+                cipher.MODE_ECB)
+            crypto_mode = True
+    except:
+        # We don't want decription to work only on one side,
+        # This could break things really bad
+        #crypto_mode = False
+        #crypter = None
+        traceback.print_exc(file=sys.stderr)
+        raise
+
+    if stderr is not None:
+        if crypto_mode:
+            print >>stderr, "Packets are transmitted in CIPHER"
+        else:
+            print >>stderr, "Packets are transmitted in PLAINTEXT"
+    
+    if hasattr(remote, 'fileno'):
+        remote_fd = remote.fileno()
+        if rwrite is None:
+            def rwrite(remote, packet, os_write=os.write):
+                return os_write(remote_fd, packet)
+        if rread is None:
+            def rread(remote, maxlen, os_read=os.read):
+                return os_read(remote_fd, maxlen)
+    rnonblock = nonblock(remote)
+    tnonblock = nonblock(tun)
+    
+    # Pick up TUN/TAP writing method
+    if with_pi:
+        try:
+            import iovec
+            
+            # We have iovec, so we can skip PI injection
+            # and use iovec which does it natively
+            if ether_mode:
+                twrite = iovec.ethpiwrite
+                tread = iovec.piread2
+            else:
+                twrite = iovec.ippiwrite
+                tread = iovec.piread2
+        except ImportError:
+            # We have to inject PI headers pythonically
+            def twrite(fd, packet, oswrite=os.write, piWrap=piWrap, ether_mode=ether_mode):
+                return oswrite(fd, piWrap(packet, ether_mode))
+            
+            # For reading, we strip PI headers with buffer slicing and that's it
+            def tread(fd, maxlen, osread=os.read, piStrip=piStrip):
+                return piStrip(osread(fd, maxlen))
+    else:
+        # No need to inject PI headers
+        twrite = os.write
+        tread = os.read
+    
+    encrypt_ = encrypt
+    decrypt_ = decrypt
+    xrange_ = xrange
+
+    if accept_local is not None:
+        def tread(fd, maxlen, _tread=tread, accept=accept_local):
+            packet = _tread(fd, maxlen)
+            if accept(packet, 0):
+                return packet
+            else:
+                return None
+
+    if accept_remote is not None:
+        if crypto_mode:
+            def decrypt_(packet, crypter, decrypt_=decrypt_, accept=accept_remote):
+                packet = decrypt_(packet, crypter)
+                if accept(packet, 1):
+                    return packet
+                else:
+                    return None
+        else:
+            def rread(fd, maxlen, _rread=rread, accept=accept_remote):
+                packet = _rread(fd, maxlen)
+                if accept(packet, 1):
+                    return packet
+                else:
+                    return None
+    
+    maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
+    tunhurry = max(0,maxbkbuf/2)
+    
+    if queueclass is None:
+        queueclass = collections.deque
+        maxbatch = 2000
+        maxtbatch = 50
+    else:
+        maxfwbuf = maxbkbuf = 2000000000
+        maxbatch = 50
+        maxtbatch = 30
+        tunhurry = 30
+    
+    fwbuf = queueclass()
+    bkbuf = queueclass()
+    nfwbuf = 0
+    nbkbuf = 0
+    
+    # backwards queue functions
+    # they may need packet inspection to 
+    # reconstruct packet boundaries
+    if ether_mode or udp:
+        packetReady = bool
+        pullPacket = queueclass.popleft
+        reschedule = queueclass.appendleft
+    else:
+        packetReady = _packetReady
+        pullPacket = _pullPacket
+        reschedule = queueclass.appendleft
+    
+    # forward queue functions
+    # no packet inspection needed
+    fpacketReady = bool
+    fpullPacket = queueclass.popleft
+    freschedule = queueclass.appendleft
+    
+    tunfd = tun.fileno()
+    os_read = os.read
+    os_write = os.write
+    
+    tget = time.time
+    maxbwfree = bwfree = 1500 * tunqueue
+    lastbwtime = tget()
+    
+    remoteok = True
+    
+    
+    while not TERMINATE:
+        # The SUSPEND flag has been set. This means we need to wait on
+        # the SUSPEND condition until it is released.
+        while SUSPEND and not TERMINATE:
+            time.sleep(0.5)
+
+        wset = []
+        if packetReady(bkbuf):
+            wset.append(tun)
+        if remoteok and fpacketReady(fwbuf) and (not bwlimit or bwfree > 0):
+            wset.append(remote)
+        
+        rset = []
+        if len(fwbuf) < maxfwbuf:
+            rset.append(tun)
+        if remoteok and len(bkbuf) < maxbkbuf:
+            rset.append(remote)
+        
+        if remoteok:
+            eset = (tun,remote)
+        else:
+            eset = (tun,)
+        
+        try:
+            rdrdy, wrdy, errs = select(rset,wset,eset,1)
+        except selecterror, e:
+            if e.args[0] == errno.EINTR:
+                # just retry
+                continue
+            else:
+                traceback.print_exc(file=sys.stderr)
+                # If the SUSPEND flag has been set, then the TUN will be in a bad
+                # state and the select error should be ignores.
+                if SUSPEND:
+                    continue
+                else:
+                    raise
+
+        # check for errors
+        if errs:
+            if reconnect is not None and remote in errs and tun not in errs:
+                remote = reconnect()
+                if hasattr(remote, 'fileno'):
+                    remote_fd = remote.fileno()
+            elif udp and remote in errs and tun not in errs:
+                # In UDP mode, those are always transient errors
+                # Usually, an error will imply a read-ready socket
+                # that will raise an "Connection refused" error, so
+                # disable read-readiness just for now, and retry
+                # the select
+                remoteok = False
+                continue
+            else:
+                break
+        else:
+            remoteok = True
+        
+        # check to see if we can write
+        #rr = wr = rt = wt = 0
+        if remote in wrdy:
+            sent = 0
+            try:
+                try:
+                    for x in xrange(maxbatch):
+                        packet = pullPacket(fwbuf)
+
+                        if crypto_mode:
+                            packet = encrypt_(packet, crypter)
+                        
+                        sentnow = rwrite(remote, packet)
+                        sent += sentnow
+                        #wr += 1
+                        
+                        if not udp and 0 <= sentnow < len(packet):
+                            # packet partially sent
+                            # reschedule the remaining part
+                            # this doesn't happen ever in udp mode
+                            freschedule(fwbuf, buffer(packet,sentnow))
+                        
+                        if not rnonblock or not fpacketReady(fwbuf):
+                            break
+                except OSError,e:
+                    # This except handles the entire While block on PURPOSE
+                    # as an optimization (setting a try/except block is expensive)
+                    # The only operation that can raise this exception is rwrite
+                    if e.errno in retrycodes:
+                        # re-schedule packet
+                        freschedule(fwbuf, packet)
+                    else:
+                        raise
+            except:
+                if reconnect is not None:
+                    # in UDP mode, sometimes connected sockets can return a connection refused.
+                    # Give the caller a chance to reconnect
+                    remote = reconnect()
+                    if hasattr(remote, 'fileno'):
+                        remote_fd = remote.fileno()
+                elif not udp:
+                    # in UDP mode, we ignore errors - packet loss man...
+                    raise
+                #traceback.print_exc(file=sys.stderr)
+            
+            if bwlimit:
+                bwfree -= sent
+        if tun in wrdy:
+            try:
+                for x in xrange(maxtbatch):
+                    packet = pullPacket(bkbuf)
+                    twrite(tunfd, packet)
+                    #wt += 1
+                    
+                    # Do not inject packets into the TUN faster than they arrive, unless we're falling
+                    # behind. TUN devices discard packets if their queue is full (tunkqueue), but they
+                    # don't block either (they're always ready to write), so if we flood the device 
+                    # we'll have high packet loss.
+                    if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf):
+                        break
+                else:
+                    if slowlocal:
+                        # Give some time for the kernel to process the packets
+                        time.sleep(0)
+            except OSError,e:
+                # This except handles the entire While block on PURPOSE
+                # as an optimization (setting a try/except block is expensive)
+                # The only operation that can raise this exception is os_write
+                if e.errno in retrycodes:
+                    # re-schedule packet
+                    reschedule(bkbuf, packet)
+                else:
+                    raise
+        
+        # check incoming data packets
+        if tun in rdrdy:
+            try:
+                for x in xrange(maxbatch):
+                    packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
+                    if not packet:
+                        continue
+                    #rt += 1
+                    fwbuf.append(packet)
+                    
+                    if not tnonblock or len(fwbuf) >= maxfwbuf:
+                        break
+            except OSError,e:
+                # This except handles the entire While block on PURPOSE
+                # as an optimization (setting a try/except block is expensive)
+                # The only operation that can raise this exception is os_read
+                if e.errno not in retrycodes:
+                    raise
+        if remote in rdrdy:
+            try:
+                try:
+                    for x in xrange(maxbatch):
+                        packet = rread(remote,2000)
+                        
+                        #rr += 1
+                        
+                        if crypto_mode:
+                            packet = decrypt_(packet, crypter)
+                            if not packet:
+                                continue
+                        elif not packet:
+                            if not udp and packet == "":
+                                # Connection broken, try to reconnect (or just die)
+                                raise RuntimeError, "Connection broken"
+                            else:
+                                continue
+
+                        bkbuf.append(packet)
+                        
+                        if not rnonblock or len(bkbuf) >= maxbkbuf:
+                            break
+                except OSError,e:
+                    # This except handles the entire While block on PURPOSE
+                    # as an optimization (setting a try/except block is expensive)
+                    # The only operation that can raise this exception is rread
+                    if e.errno not in retrycodes:
+                        raise
+            except Exception, e:
+                if reconnect is not None:
+                    # in UDP mode, sometimes connected sockets can return a connection refused
+                    # on read. Give the caller a chance to reconnect
+                    remote = reconnect()
+                    if hasattr(remote, 'fileno'):
+                        remote_fd = remote.fileno()
+                elif not udp:
+                    # in UDP mode, we ignore errors - packet loss man...
+                    raise
+                traceback.print_exc(file=sys.stderr)
+
+        if bwlimit:
+            tnow = tget()
+            delta = tnow - lastbwtime
+            if delta > 0.001:
+                delta = int(bwlimit * delta)
+                if delta > 0:
+                    bwfree = min(bwfree+delta, maxbwfree)
+                    lastbwtime = tnow
+        
+        #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
+
+def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+    rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
+    retrydelay = 1.0
+    for i in xrange(30):
+        # TERMINATE is a array. An item can be added to TERMINATE, from
+        # outside this function to force termination of the loop
+        if TERMINATE:
+            raise OSError, "Killed"
+        try:
+            rsock.bind((local_addr, local_port))
+            break
+        except socket.error:
+            # wait a while, retry
+            print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+            time.sleep(min(30.0,retrydelay))
+            retrydelay *= 1.1
+    else:
+        rsock.bind((local_addr, local_port))
+    print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port)
+    print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port)
+    rsock.connect((peer_addr, peer_port))
+    return rsock
+
+def udp_handshake(TERMINATE, rsock):
+    endme = False
+    def keepalive():
+        while not endme and not TERMINATE:
+            try:
+                rsock.send('')
+            except:
+                pass
+            time.sleep(1)
+        try:
+            rsock.send('')
+        except:
+            pass
+    keepalive_thread = threading.Thread(target=keepalive)
+    keepalive_thread.start()
+    for i in xrange(900):
+        if TERMINATE:
+            raise OSError, "Killed"
+        try:
+            heartbeat = rsock.recv(10)
+            break
+        except:
+            time.sleep(1)
+    else:
+        heartbeat = rsock.recv(10)
+    endme = True
+    keepalive_thread.join()
+
+def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+    rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
+            peer_port)
+    udp_handshake(TERMINATE, rsock)
+    return rsock 
+
+def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
+    sock = None
+    retrydelay = 1.0
+    # The peer has a firewall that prevents a response to the connect, we 
+    # will be forever blocked in the connect, so we put a reasonable timeout.
+    rsock.settimeout(10) 
+    # We wait for 
+    for i in xrange(30):
+        if stop:
+            break
+        if TERMINATE:
+            raise OSError, "Killed"
+        try:
+            rsock.connect((peer_addr, peer_port))
+            sock = rsock
+            break
+        except socket.error:
+            # wait a while, retry
+            print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
+            time.sleep(min(30.0,retrydelay))
+            retrydelay *= 1.1
+    else:
+        rsock.connect((peer_addr, peer_port))
+        sock = rsock
+    if sock:
+        print >>sys.stderr, "tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port)
+        sock.settimeout(0) 
+        
+        print >>sys.stderr, "tcp_connect: disabling NAGLE"
+        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+    return sock
+
+def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
+    sock = None
+    retrydelay = 1.0
+    # We try to bind to the local virtual interface. 
+    # It might not exist yet so we wait in a loop.
+    for i in xrange(30):
+        if stop:
+            break
+        if TERMINATE:
+            raise OSError, "Killed"
+        try:
+            lsock.bind((local_addr, local_port))
+            break
+        except socket.error:
+            # wait a while, retry
+            print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+            time.sleep(min(30.0,retrydelay))
+            retrydelay *= 1.1
+    else:
+        lsock.bind((local_addr, local_port))
+
+    print >>sys.stderr, "tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port)
+    # Now we wait until the other side connects. 
+    # The other side might not be ready yet, so we also wait in a loop for timeouts.
+    timeout = 1
+    lsock.listen(1)
+    for i in xrange(30):
+        if TERMINATE:
+            raise OSError, "Killed"
+        rlist, wlist, xlist = select.select([lsock], [], [], timeout)
+        if stop:
+            break
+        if lsock in rlist:
+            sock,raddr = lsock.accept()
+            print >>sys.stderr, "tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port)
+            break
+        timeout += 5
+    return sock
+
+def tcp_handshake(rsock, listen, hand):
+    # we are going to use a barrier algorithm to decide wich side listen.
+    # each side will "roll a dice" and send the resulting value to the other 
+    # side. 
+    win = False
+    rsock.settimeout(10)
+    try:
+        rsock.send(hand)
+        peer_hand = rsock.recv(4)
+        if not peer_hand:
+            print >>sys.stderr, "tcp_handshake: connection reset by peer"
+            return False
+        else:
+            print >>sys.stderr, "tcp_handshake: hand %r, peer_hand %r" % (hand, peer_hand)
+        if hand < peer_hand:
+            if listen:
+                win = True
+        elif hand > peer_hand:
+            if not listen:
+                win = True
+    finally:
+        rsock.settimeout(0)
+    return win
+
+def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+    def listen(stop, hand, lsock, lresult):
+        win = False
+        rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
+        if rsock:
+            win = tcp_handshake(rsock, True, hand)
+            stop.append(True)
+        lresult.append((win, rsock))
+
+    def connect(stop, hand, rsock, rresult):
+        win = False
+        rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
+        if rsock:
+            win = tcp_handshake(rsock, False, hand)
+            stop.append(True)
+        rresult.append((win, rsock))
+  
+    end = False
+    sock = None
+    for i in xrange(0, 50):
+        if end:
+            break
+        if TERMINATE:
+            raise OSError, "Killed"
+        hand = struct.pack("!L", random.randint(0, 2**30))
+        stop = []
+        lresult = []
+        rresult = []
+        lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+        rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+        listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult))
+        connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult))
+        connect_thread.start()
+        listen_thread.start()
+        connect_thread.join()
+        listen_thread.join()
+        (lwin, lrsock) = lresult[0]
+        (rwin, rrsock) = rresult[0]
+        if not lrsock or not rrsock:
+            if not lrsock:
+                sock = rrsock
+            if not rrsock:
+                sock = lrsock
+            end = True
+        # both socket are connected
+        else:
+           if lwin:
+                sock = lrsock
+                end = True
+           elif rwin: 
+                sock = rrsock
+                end = True
+
+    if not sock:
+        raise OSError, "Error: tcp_establish could not establish connection."
+    return sock
+
+
index 493c899..8b975ac 100644 (file)
@@ -334,7 +334,8 @@ class LinuxApplication(ResourceManager):
 
             self.node.upload_command(command, 
                     shfile = shfile,
-                    env = env)
+                    env = env,
+                    overwrite = False)
 
     def execute_deploy_command(self, command):
         if command:
diff --git a/src/nepi/resources/planetlab/scripts/.pl-tap-create.py.swp b/src/nepi/resources/planetlab/scripts/.pl-tap-create.py.swp
deleted file mode 100644 (file)
index 9f6e2d6..0000000
Binary files a/src/nepi/resources/planetlab/scripts/.pl-tap-create.py.swp and /dev/null differ
index 758cfc4..998ed53 100644 (file)
 
 import base64
 import errno
-import vsys
+import passfd
 import socket
+import vsys
 from optparse import OptionParser, SUPPRESS_HELP
 
 # TODO: GRE OPTION!! CONFIGURE THE VIF-UP IN GRE MODE!!
 
 STOP_MSG = "STOP"
+PASSFD_MSG = "PASSFD"
 
 def create_socket(socket_name):
     sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -52,8 +54,18 @@ def recv_msg(conn):
             break
 
     msg = ''.join(msg).split('\n')[0]
-    decoded = base64.b64decode(msg)
-    return decoded.rstrip()
+    # The message might have arguments that will be appended
+    # as a '|' separated list after the message type
+    args = msg.split("|")
+    msg = args.pop(0)
+
+    dmsg = base64.b64decode(msg)
+    dargs = []
+    for arg in args:
+        darg = base64.b64decode(arg)
+        dargs.append(darg.rstrip())
+
+    return (dmsg.rstrip(), dargs)
 
 def send_reply(conn, reply):
     encoded = base64.b64encode(reply)
@@ -62,8 +74,13 @@ def send_reply(conn, reply):
 def stop_action():
     return "STOP-ACK"
 
-def reply_action(msg):
-    return "Reply to: %s" % msg
+def passfd_action(fd, args):
+    address = args.pop(0)
+    print address
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+    sock.connect(address)
+    passfd.sendfd(sock, fd, '0')
+    return "PASSFD-ACK"
 
 def get_options():
     usage = ("usage: %prog -t <vif-type> -a <ip4-address> -n <net-prefix> "
@@ -135,7 +152,7 @@ if __name__ == '__main__':
 
         while not stop:
             try:
-                msg = recv_msg(conn)
+                (msg, args) = recv_msg(conn)
             except socket.timeout, e:
                 # Ingore time-out
                 continue
@@ -147,8 +164,8 @@ if __name__ == '__main__':
             if msg == STOP_MSG:
                 stop = True
                 reply = stop_action()
-            else:
-                reply = reply_action(msg)
+            elif msg == PASSFD_MSG:
+                reply = passfd_action(fd, args)
 
             try:
                 send_reply(conn, reply)
diff --git a/src/nepi/resources/planetlab/scripts/pl-vif-tunconnect.py b/src/nepi/resources/planetlab/scripts/pl-vif-tunconnect.py
new file mode 100644 (file)
index 0000000..98fa923
--- /dev/null
@@ -0,0 +1,70 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import base64
+import errno
+import passfd
+import vsys
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
+
+PASSFD_MSG = "PASSFD"
+
+def get_options():
+    usage = ("usage: %prog -S <socket-name>")
+    
+    parser = OptionParser(usage = usage)
+
+    parser.add_option("-S", "--socket-name", dest="socket_name",
+        help = "Name for the unix socket used to interact with this process", 
+        default = "tap.sock", type="str")
+
+    (options, args) = parser.parse_args()
+    
+    return (options.socket_name)
+
+if __name__ == '__main__':
+
+    (socket_name) = get_options()
+
+    # Socket to recive the file descriptor
+    fdsock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+    fdsock.bind("")
+    address = fdsock.getsockname()
+
+    # vif-create-socket to send the PASSFD message
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    sock.connect(socket_name)
+    emsg = base64.b64encode(PASSFD_MSG)
+    eargs = base64.b64encode(address)
+    encoded = "%s|%s\n" % (emsg, eargs)
+    sock.send(encoded)
+
+    # Receive fd
+    (fd, msg) = passfd.recvfd(fdsock)
+    
+    # Receive reply
+    reply = sock.recv(1024)
+    reply = base64.b64decode(reply)
+
+    print reply, fd
+
+
+
+
index f4afcf6..54d1863 100644 (file)
@@ -28,8 +28,6 @@ import os
 import time
 
 # TODO: - routes!!!
-#       - Instead of doing an infinite loop, open a port for communication allowing
-#           to pass the fd to another process
 
 PYTHON_VSYS_VERSION = "1.0"
 
@@ -89,50 +87,36 @@ class PlanetlabTap(LinuxApplication):
         return None
 
     def upload_sources(self):
-        depends = "mercurial make gcc"
-        self.set("depends", depends)
+        # upload vif-creation python script
+        pl_vif_create = os.path.join(os.path.dirname(__file__), "scripts",
+                "pl-vif-create.py")
 
-        install = ( " ( "
-                    "   python -c 'import vsys, os;  vsys.__version__ == \"%(version)s\" or os._exit(1)' "
-                    " ) "
-                    " ||"
-                    " ( "
-                    "   cd ${SRC} ; "
-                    "   hg clone http://nepi.inria.fr/code/python-vsys ; "
-                    "   cd python-vsys ; "
-                    "   make all ; "
-                    "   sudo -S make install "
-                    " )" ) % ({
-                        "version": PYTHON_VSYS_VERSION
-                        })
+        self.node.upload(pl_vif_create,
+                os.path.join(self.app_home, "pl-vif-create.py"),
+                overwrite = False)
 
-        self.set("install", install)
+        # upload vif-stop python script
+        pl_vif_stop = os.path.join(os.path.dirname(__file__), "scripts",
+                "pl-vif-stop.py")
 
-    def upload_start_command(self):
-        # upload tap-creation python script
-        pl_tap_create = os.path.join(os.path.dirname(__file__), "scripts",
-                "pl-tap-create.py")
-        self.node.upload(pl_tap_create,
-                os.path.join(self.app_home, "pl-vif-create.py"),
+        self.node.upload(pl_vif_stop,
+                os.path.join(self.app_home, "pl-vif-stop.py"),
                 overwrite = False)
 
-        # upload start.sh
-        start_command = self.replace_paths(self._start_command)
-        
-        self.info("Uploading command '%s'" % start_command)
-        
-        self.set("command", start_command)
+        # upload vif-connect python script
+        pl_vif_connect = os.path.join(os.path.dirname(__file__), "scripts",
+                "pl-vif-tunconnect.py")
 
-        self.node.upload(start_command,
-                os.path.join(self.app_home, "start.sh"),
-                text = True, 
+        self.node.upload(pl_vif_connect,
+                os.path.join(self.app_home, "pl-vif-connect.py"),
                 overwrite = False)
 
-        # upload tap-stop python script
-        pl_tap_stop = os.path.join(os.path.dirname(__file__), "scripts",
-                "pl-tap-stop.py")
-        self.node.upload(pl_tap_stop,
-                os.path.join(self.app_home, "pl-vif-stop.py"),
+        # upload tun-connect python script
+        tunchannel = os.path.join(os.path.dirname(__file__), "..", "all", "scripts",
+                "tunchannel.py")
+
+        self.node.upload(tunchannel,
+                os.path.join(self.app_home, "tunchannel.py"),
                 overwrite = False)
 
         # upload stop.sh script
@@ -142,6 +126,9 @@ class PlanetlabTap(LinuxApplication):
                 text = True, 
                 overwrite = False)
 
+    def upload_start_command(self):
+        super(PlanetlabTap, self).upload_start_command()
+
         # We want to make sure the device is up and running
         # before the deploy finishes (so things will be ready
         # before other stuff starts running).
@@ -158,6 +145,14 @@ class PlanetlabTap(LinuxApplication):
         if not self.node or self.node.state < ResourceState.PROVISIONED:
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
+            if not self.get("command"):
+                self.set("command", self._start_command)
+
+            if not self.get("depends"):
+                self.set("depends", self._dependencies)
+
+            if not self.get("install"):
+                self.set("install", self._install)
 
             try:
                 self.discover()
@@ -273,6 +268,38 @@ class PlanetlabTap(LinuxApplication):
     def sock_name(self):
         return os.path.join(self.run_home, "tap.sock")
 
+    @property
+    def _dependencies(self):
+        return "mercurial make gcc"
+
+    @property
+    def _install(self):
+        install_vsys = ( " ( "
+                    "   python -c 'import vsys, os;  vsys.__version__ == \"%(version)s\" or os._exit(1)' "
+                    " ) "
+                    " || "
+                    " ( "
+                    "   cd ${SRC} ; "
+                    "   hg clone http://nepi.inria.fr/code/python-vsys ; "
+                    "   cd python-vsys ; "
+                    "   make all ; "
+                    "   sudo -S make install "
+                    " )" ) % ({
+                        "version": PYTHON_VSYS_VERSION
+                        })
+
+        install_passfd = ( " ( python -c 'import passfd' ) "
+                    " || "
+                    " ( "
+                    "   cd ${SRC} ; "
+                    "   hg clone http://nepi.inria.fr/code/python-passfd ; "
+                    "   cd python-passfd ; "
+                    "   make all ; "
+                    "   sudo -S make install "
+                    " )" )
+
+        return "%s ; %s" % ( install_vsys, install_passfd )
+
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
index 365ca08..091c43e 100755 (executable)
@@ -125,6 +125,7 @@ class ResourceFactoryTestCase(unittest.TestCase):
     def test_add_resource_factory(self):
         from nepi.execution.resource import ResourceFactory
 
+        ResourceFactory._resource_types = dict()
         ResourceFactory.register_type(MyResource)
         ResourceFactory.register_type(AnotherResource)
 
@@ -138,6 +139,11 @@ class ResourceFactoryTestCase(unittest.TestCase):
         self.assertEquals(len(AnotherResource._attributes), 0)
 
         self.assertEquals(len(ResourceFactory.resource_types()), 2)
+        
+        # restore factory state for other tests
+        from nepi.execution.resource import populate_factory
+        ResourceFactory._resource_types = dict()
+        populate_factory()
 
 class ResourceManagerTestCase(unittest.TestCase):
     def test_register_condition(self):
old mode 100644 (file)
new mode 100755 (executable)
index f08a54c..67fa147 100755 (executable)
@@ -28,7 +28,7 @@ import unittest
 
 class PlanetlabTapTestCase(unittest.TestCase):
     def setUp(self):
-        self.host = "nepi2.pl.sophia.inria.fr"
+        self.host = "nepi5.pl.sophia.inria.fr"
         self.user = "inria_nepi"
 
     @skipIfNotAlive