From dadecc92b7cd8fb8732b53f6fbbc7bd1519c4ff6 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Sun, 14 Jul 2013 22:39:44 -0700 Subject: [PATCH] Adding tuncahnnel and pl-vif-tunnconnect scripts --- setup.py | 5 +- src/nepi/resources/all/scripts/tunchannel.py | 792 ++++++++++++++++++ src/nepi/resources/linux/application.py | 3 +- .../planetlab/scripts/.pl-tap-create.py.swp | Bin 20480 -> 0 bytes .../planetlab/scripts/pl-vif-create.py | 33 +- .../planetlab/scripts/pl-vif-tunconnect.py | 70 ++ src/nepi/resources/planetlab/tap.py | 103 ++- test/execution/resource.py | 6 + test/resources/linux/nping.py | 0 test/resources/planetlab/tap.py | 2 +- 10 files changed, 965 insertions(+), 49 deletions(-) create mode 100644 src/nepi/resources/all/scripts/tunchannel.py delete mode 100644 src/nepi/resources/planetlab/scripts/.pl-tap-create.py.swp create mode 100644 src/nepi/resources/planetlab/scripts/pl-vif-tunconnect.py mode change 100644 => 100755 test/resources/linux/nping.py diff --git a/setup.py b/setup.py index bebb10aa..407e8422 100755 --- a/setup.py +++ b/setup.py @@ -26,5 +26,8 @@ setup( "nepi.resources.planetlab", "nepi.util"], package_dir = {"": "src"}, - package_data = {"nepi.resources.planetlab" : [ "scripts/*.py" ]} + package_data = { + "nepi.resources.planetlab" : [ "scripts/*.py" ], + "nepi.resources.all" : [ "scripts/*.py" ] + } ) diff --git a/src/nepi/resources/all/scripts/tunchannel.py b/src/nepi/resources/all/scripts/tunchannel.py new file mode 100644 index 00000000..cd30b8b8 --- /dev/null +++ b/src/nepi/resources/all/scripts/tunchannel.py @@ -0,0 +1,792 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# Claudio Freire +# + + +import select +import sys +import os +import struct +import socket +import threading +import traceback +import errno +import fcntl +import random +import traceback +import functools +import collections +import ctypes +import time + +def ipfmt(ip): + ipbytes = map(ord,ip.decode("hex")) + return '.'.join(map(str,ipbytes)) + +tagtype = { + '0806' : 'arp', + '0800' : 'ipv4', + '8870' : 'jumbo', + '8863' : 'PPPoE discover', + '8864' : 'PPPoE', + '86dd' : 'ipv6', +} + +def etherProto(packet, len=len): + if len(packet) > 14: + if packet[12] == "\x81" and packet[13] == "\x00": + # tagged + return packet[16:18] + else: + # untagged + return packet[12:14] + # default: ip + return "\x08\x00" + +def formatPacket(packet, ether_mode): + if ether_mode: + stripped_packet = etherStrip(packet) + if not stripped_packet: + packet = packet.encode("hex") + if len(packet) < 28: + return "malformed eth " + packet.encode("hex") + else: + if packet[24:28] == "8100": + # tagged + ethertype = tagtype.get(packet[32:36], 'eth') + return ethertype + " " + ( '-'.join( ( + packet[0:12], # MAC dest + packet[12:24], # MAC src + packet[24:32], # VLAN tag + packet[32:36], # Ethertype/len + packet[36:], # Payload + ) ) ) + else: + # untagged + ethertype = tagtype.get(packet[24:28], 'eth') + return ethertype + " " + ( '-'.join( ( + packet[0:12], # MAC dest + packet[12:24], # MAC src + packet[24:28], # Ethertype/len + packet[28:], # Payload + ) ) ) + else: + packet = stripped_packet + packet = packet.encode("hex") + if len(packet) < 48: + return "malformed ip " + packet + else: + return "ip " + ( '-'.join( ( + packet[0:1], #version + packet[1:2], #header length + packet[2:4], #diffserv/ECN + packet[4:8], #total length + packet[8:12], #ident + packet[12:16], #flags/fragment offs + packet[16:18], #ttl + packet[18:20], #ip-proto + packet[20:24], #checksum + ipfmt(packet[24:32]), # src-ip + ipfmt(packet[32:40]), # dst-ip + packet[40:48] if (int(packet[1],16) > 5) else "", # options + packet[48:] if (int(packet[1],16) > 5) else packet[40:], # payload + ) ) ) + +def _packetReady(buf, ether_mode=False, len=len, str=str): + if not buf: + return False + + rv = False + while not rv: + if len(buf[0]) < 4: + rv = False + elif ether_mode: + rv = True + else: + _,totallen = struct.unpack('HH',buf[0][:4]) + totallen = socket.htons(totallen) + rv = len(buf[0]) >= totallen + if not rv and len(buf) > 1: + # collapse only first two buffers + # as needed, to mantain len(buf) meaningful + p1 = buf.popleft() + buf[0] = p1+str(buf[0]) + else: + return rv + return rv + +def _pullPacket(buf, ether_mode=False, len=len, buffer=buffer): + if ether_mode: + return buf.popleft() + else: + _,totallen = struct.unpack('HH',buf[0][:4]) + totallen = socket.htons(totallen) + if len(buf[0]) > totallen: + rv = buffer(buf[0],0,totallen) + buf[0] = buffer(buf[0],totallen) + else: + rv = buf.popleft() + return rv + +def etherStrip(buf, buffer=buffer, len=len): + if len(buf) < 14: + return "" + if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00': + # tagged ethernet frame + return buffer(buf, 18) + elif buf[12:14] == '\x08\x00': + # untagged ethernet frame + return buffer(buf, 14) + else: + return "" + +def etherWrap(packet): + return ''.join(( + "\x00"*6*2 # bogus src and dst mac + +"\x08\x00", # IPv4 + packet, # payload + "\x00"*4, # bogus crc + )) + +def piStrip(buf, len=len): + if len(buf) < 4: + return buf + else: + return buffer(buf,4) + +def piWrap(buf, ether_mode, etherProto=etherProto): + if ether_mode: + proto = etherProto(buf) + else: + proto = "\x08\x00" + return ''.join(( + "\x00\x00", # PI: 16 bits flags + proto, # 16 bits proto + buf, + )) + +_padmap = [ chr(padding) * padding for padding in xrange(127) ] +del padding + +def encrypt(packet, crypter, len=len, padmap=_padmap): + # pad + padding = crypter.block_size - len(packet) % crypter.block_size + packet += padmap[padding] + + # encrypt + return crypter.encrypt(packet) + +def decrypt(packet, crypter, ord=ord): + if packet: + # decrypt + packet = crypter.decrypt(packet) + + # un-pad + padding = ord(packet[-1]) + if not (0 < padding <= crypter.block_size): + # wrong padding + raise RuntimeError, "Truncated packet %s" + packet = packet[:-padding] + + return packet + +def nonblock(fd): + try: + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fl |= os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, fl) + return True + except: + traceback.print_exc(file=sys.stderr) + # Just ignore + return False + +def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, SUSPEND, + stderr = sys.stderr, reconnect = None, rwrite = None, rread = None, + tunqueue = 1000, tunkqueue = 1000, cipher = 'AES', accept_local = None, + accept_remote = None, slowlocal = True, queueclass = None, + bwlimit = None, len = len, max = max, min = min, buffer = buffer, + OSError = OSError, select = select.select, selecterror = select.error, + os = os, socket = socket, + retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ): + crypto_mode = False + crypter = None + + try: + if cipher_key and cipher: + import Crypto.Cipher + import hashlib + __import__('Crypto.Cipher.'+cipher) + + ciphername = cipher + cipher = getattr(Crypto.Cipher, cipher) + hashed_key = hashlib.sha256(cipher_key).digest() + + if ciphername == 'AES': + hashed_key = hashed_key[:16] + elif ciphername == 'Blowfish': + hashed_key = hashed_key[:24] + elif ciphername == 'DES': + hashed_key = hashed_key[:8] + elif ciphername == 'DES3': + hashed_key = hashed_key[:24] + + crypter = cipher.new( + hashed_key, + cipher.MODE_ECB) + crypto_mode = True + except: + # We don't want decription to work only on one side, + # This could break things really bad + #crypto_mode = False + #crypter = None + traceback.print_exc(file=sys.stderr) + raise + + if stderr is not None: + if crypto_mode: + print >>stderr, "Packets are transmitted in CIPHER" + else: + print >>stderr, "Packets are transmitted in PLAINTEXT" + + if hasattr(remote, 'fileno'): + remote_fd = remote.fileno() + if rwrite is None: + def rwrite(remote, packet, os_write=os.write): + return os_write(remote_fd, packet) + if rread is None: + def rread(remote, maxlen, os_read=os.read): + return os_read(remote_fd, maxlen) + + rnonblock = nonblock(remote) + tnonblock = nonblock(tun) + + # Pick up TUN/TAP writing method + if with_pi: + try: + import iovec + + # We have iovec, so we can skip PI injection + # and use iovec which does it natively + if ether_mode: + twrite = iovec.ethpiwrite + tread = iovec.piread2 + else: + twrite = iovec.ippiwrite + tread = iovec.piread2 + except ImportError: + # We have to inject PI headers pythonically + def twrite(fd, packet, oswrite=os.write, piWrap=piWrap, ether_mode=ether_mode): + return oswrite(fd, piWrap(packet, ether_mode)) + + # For reading, we strip PI headers with buffer slicing and that's it + def tread(fd, maxlen, osread=os.read, piStrip=piStrip): + return piStrip(osread(fd, maxlen)) + else: + # No need to inject PI headers + twrite = os.write + tread = os.read + + encrypt_ = encrypt + decrypt_ = decrypt + xrange_ = xrange + + if accept_local is not None: + def tread(fd, maxlen, _tread=tread, accept=accept_local): + packet = _tread(fd, maxlen) + if accept(packet, 0): + return packet + else: + return None + + if accept_remote is not None: + if crypto_mode: + def decrypt_(packet, crypter, decrypt_=decrypt_, accept=accept_remote): + packet = decrypt_(packet, crypter) + if accept(packet, 1): + return packet + else: + return None + else: + def rread(fd, maxlen, _rread=rread, accept=accept_remote): + packet = _rread(fd, maxlen) + if accept(packet, 1): + return packet + else: + return None + + maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue) + tunhurry = max(0,maxbkbuf/2) + + if queueclass is None: + queueclass = collections.deque + maxbatch = 2000 + maxtbatch = 50 + else: + maxfwbuf = maxbkbuf = 2000000000 + maxbatch = 50 + maxtbatch = 30 + tunhurry = 30 + + fwbuf = queueclass() + bkbuf = queueclass() + nfwbuf = 0 + nbkbuf = 0 + + # backwards queue functions + # they may need packet inspection to + # reconstruct packet boundaries + if ether_mode or udp: + packetReady = bool + pullPacket = queueclass.popleft + reschedule = queueclass.appendleft + else: + packetReady = _packetReady + pullPacket = _pullPacket + reschedule = queueclass.appendleft + + # forward queue functions + # no packet inspection needed + fpacketReady = bool + fpullPacket = queueclass.popleft + freschedule = queueclass.appendleft + + tunfd = tun.fileno() + os_read = os.read + os_write = os.write + + tget = time.time + maxbwfree = bwfree = 1500 * tunqueue + lastbwtime = tget() + + remoteok = True + + + while not TERMINATE: + # The SUSPEND flag has been set. This means we need to wait on + # the SUSPEND condition until it is released. + while SUSPEND and not TERMINATE: + time.sleep(0.5) + + wset = [] + if packetReady(bkbuf): + wset.append(tun) + if remoteok and fpacketReady(fwbuf) and (not bwlimit or bwfree > 0): + wset.append(remote) + + rset = [] + if len(fwbuf) < maxfwbuf: + rset.append(tun) + if remoteok and len(bkbuf) < maxbkbuf: + rset.append(remote) + + if remoteok: + eset = (tun,remote) + else: + eset = (tun,) + + try: + rdrdy, wrdy, errs = select(rset,wset,eset,1) + except selecterror, e: + if e.args[0] == errno.EINTR: + # just retry + continue + else: + traceback.print_exc(file=sys.stderr) + # If the SUSPEND flag has been set, then the TUN will be in a bad + # state and the select error should be ignores. + if SUSPEND: + continue + else: + raise + + # check for errors + if errs: + if reconnect is not None and remote in errs and tun not in errs: + remote = reconnect() + if hasattr(remote, 'fileno'): + remote_fd = remote.fileno() + elif udp and remote in errs and tun not in errs: + # In UDP mode, those are always transient errors + # Usually, an error will imply a read-ready socket + # that will raise an "Connection refused" error, so + # disable read-readiness just for now, and retry + # the select + remoteok = False + continue + else: + break + else: + remoteok = True + + # check to see if we can write + #rr = wr = rt = wt = 0 + if remote in wrdy: + sent = 0 + try: + try: + for x in xrange(maxbatch): + packet = pullPacket(fwbuf) + + if crypto_mode: + packet = encrypt_(packet, crypter) + + sentnow = rwrite(remote, packet) + sent += sentnow + #wr += 1 + + if not udp and 0 <= sentnow < len(packet): + # packet partially sent + # reschedule the remaining part + # this doesn't happen ever in udp mode + freschedule(fwbuf, buffer(packet,sentnow)) + + if not rnonblock or not fpacketReady(fwbuf): + break + except OSError,e: + # This except handles the entire While block on PURPOSE + # as an optimization (setting a try/except block is expensive) + # The only operation that can raise this exception is rwrite + if e.errno in retrycodes: + # re-schedule packet + freschedule(fwbuf, packet) + else: + raise + except: + if reconnect is not None: + # in UDP mode, sometimes connected sockets can return a connection refused. + # Give the caller a chance to reconnect + remote = reconnect() + if hasattr(remote, 'fileno'): + remote_fd = remote.fileno() + elif not udp: + # in UDP mode, we ignore errors - packet loss man... + raise + #traceback.print_exc(file=sys.stderr) + + if bwlimit: + bwfree -= sent + if tun in wrdy: + try: + for x in xrange(maxtbatch): + packet = pullPacket(bkbuf) + twrite(tunfd, packet) + #wt += 1 + + # Do not inject packets into the TUN faster than they arrive, unless we're falling + # behind. TUN devices discard packets if their queue is full (tunkqueue), but they + # don't block either (they're always ready to write), so if we flood the device + # we'll have high packet loss. + if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf): + break + else: + if slowlocal: + # Give some time for the kernel to process the packets + time.sleep(0) + except OSError,e: + # This except handles the entire While block on PURPOSE + # as an optimization (setting a try/except block is expensive) + # The only operation that can raise this exception is os_write + if e.errno in retrycodes: + # re-schedule packet + reschedule(bkbuf, packet) + else: + raise + + # check incoming data packets + if tun in rdrdy: + try: + for x in xrange(maxbatch): + packet = tread(tunfd,2000) # tun.read blocks until it gets 2k! + if not packet: + continue + #rt += 1 + fwbuf.append(packet) + + if not tnonblock or len(fwbuf) >= maxfwbuf: + break + except OSError,e: + # This except handles the entire While block on PURPOSE + # as an optimization (setting a try/except block is expensive) + # The only operation that can raise this exception is os_read + if e.errno not in retrycodes: + raise + if remote in rdrdy: + try: + try: + for x in xrange(maxbatch): + packet = rread(remote,2000) + + #rr += 1 + + if crypto_mode: + packet = decrypt_(packet, crypter) + if not packet: + continue + elif not packet: + if not udp and packet == "": + # Connection broken, try to reconnect (or just die) + raise RuntimeError, "Connection broken" + else: + continue + + bkbuf.append(packet) + + if not rnonblock or len(bkbuf) >= maxbkbuf: + break + except OSError,e: + # This except handles the entire While block on PURPOSE + # as an optimization (setting a try/except block is expensive) + # The only operation that can raise this exception is rread + if e.errno not in retrycodes: + raise + except Exception, e: + if reconnect is not None: + # in UDP mode, sometimes connected sockets can return a connection refused + # on read. Give the caller a chance to reconnect + remote = reconnect() + if hasattr(remote, 'fileno'): + remote_fd = remote.fileno() + elif not udp: + # in UDP mode, we ignore errors - packet loss man... + raise + traceback.print_exc(file=sys.stderr) + + if bwlimit: + tnow = tget() + delta = tnow - lastbwtime + if delta > 0.001: + delta = int(bwlimit * delta) + if delta > 0: + bwfree = min(bwfree+delta, maxbwfree) + lastbwtime = tnow + + #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt) + +def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port): + rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + retrydelay = 1.0 + for i in xrange(30): + # TERMINATE is a array. An item can be added to TERMINATE, from + # outside this function to force termination of the loop + if TERMINATE: + raise OSError, "Killed" + try: + rsock.bind((local_addr, local_port)) + break + except socket.error: + # wait a while, retry + print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 + else: + rsock.bind((local_addr, local_port)) + print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port) + print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port) + rsock.connect((peer_addr, peer_port)) + return rsock + +def udp_handshake(TERMINATE, rsock): + endme = False + def keepalive(): + while not endme and not TERMINATE: + try: + rsock.send('') + except: + pass + time.sleep(1) + try: + rsock.send('') + except: + pass + keepalive_thread = threading.Thread(target=keepalive) + keepalive_thread.start() + for i in xrange(900): + if TERMINATE: + raise OSError, "Killed" + try: + heartbeat = rsock.recv(10) + break + except: + time.sleep(1) + else: + heartbeat = rsock.recv(10) + endme = True + keepalive_thread.join() + +def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port): + rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr, + peer_port) + udp_handshake(TERMINATE, rsock) + return rsock + +def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port): + sock = None + retrydelay = 1.0 + # The peer has a firewall that prevents a response to the connect, we + # will be forever blocked in the connect, so we put a reasonable timeout. + rsock.settimeout(10) + # We wait for + for i in xrange(30): + if stop: + break + if TERMINATE: + raise OSError, "Killed" + try: + rsock.connect((peer_addr, peer_port)) + sock = rsock + break + except socket.error: + # wait a while, retry + print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 + else: + rsock.connect((peer_addr, peer_port)) + sock = rsock + if sock: + print >>sys.stderr, "tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port) + sock.settimeout(0) + + print >>sys.stderr, "tcp_connect: disabling NAGLE" + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + return sock + +def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port): + sock = None + retrydelay = 1.0 + # We try to bind to the local virtual interface. + # It might not exist yet so we wait in a loop. + for i in xrange(30): + if stop: + break + if TERMINATE: + raise OSError, "Killed" + try: + lsock.bind((local_addr, local_port)) + break + except socket.error: + # wait a while, retry + print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 + else: + lsock.bind((local_addr, local_port)) + + print >>sys.stderr, "tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port) + # Now we wait until the other side connects. + # The other side might not be ready yet, so we also wait in a loop for timeouts. + timeout = 1 + lsock.listen(1) + for i in xrange(30): + if TERMINATE: + raise OSError, "Killed" + rlist, wlist, xlist = select.select([lsock], [], [], timeout) + if stop: + break + if lsock in rlist: + sock,raddr = lsock.accept() + print >>sys.stderr, "tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port) + break + timeout += 5 + return sock + +def tcp_handshake(rsock, listen, hand): + # we are going to use a barrier algorithm to decide wich side listen. + # each side will "roll a dice" and send the resulting value to the other + # side. + win = False + rsock.settimeout(10) + try: + rsock.send(hand) + peer_hand = rsock.recv(4) + if not peer_hand: + print >>sys.stderr, "tcp_handshake: connection reset by peer" + return False + else: + print >>sys.stderr, "tcp_handshake: hand %r, peer_hand %r" % (hand, peer_hand) + if hand < peer_hand: + if listen: + win = True + elif hand > peer_hand: + if not listen: + win = True + finally: + rsock.settimeout(0) + return win + +def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port): + def listen(stop, hand, lsock, lresult): + win = False + rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port) + if rsock: + win = tcp_handshake(rsock, True, hand) + stop.append(True) + lresult.append((win, rsock)) + + def connect(stop, hand, rsock, rresult): + win = False + rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port) + if rsock: + win = tcp_handshake(rsock, False, hand) + stop.append(True) + rresult.append((win, rsock)) + + end = False + sock = None + for i in xrange(0, 50): + if end: + break + if TERMINATE: + raise OSError, "Killed" + hand = struct.pack("!L", random.randint(0, 2**30)) + stop = [] + lresult = [] + rresult = [] + lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult)) + connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult)) + connect_thread.start() + listen_thread.start() + connect_thread.join() + listen_thread.join() + (lwin, lrsock) = lresult[0] + (rwin, rrsock) = rresult[0] + if not lrsock or not rrsock: + if not lrsock: + sock = rrsock + if not rrsock: + sock = lrsock + end = True + # both socket are connected + else: + if lwin: + sock = lrsock + end = True + elif rwin: + sock = rrsock + end = True + + if not sock: + raise OSError, "Error: tcp_establish could not establish connection." + return sock + + diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 493c8997..8b975acb 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -334,7 +334,8 @@ class LinuxApplication(ResourceManager): self.node.upload_command(command, shfile = shfile, - env = env) + env = env, + overwrite = False) def execute_deploy_command(self, command): if command: diff --git a/src/nepi/resources/planetlab/scripts/.pl-tap-create.py.swp b/src/nepi/resources/planetlab/scripts/.pl-tap-create.py.swp deleted file mode 100644 index 9f6e2d644382a158450598c63a39601f8c3d0e22..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 20480 zcmeHNS&Sn`8UC<6w~r+erx5zr!!q8;_RJnh&@i6F+4byN+4W4uo=uibFg8hUDkx-782tfh~NjP2zNtBR~2SRutfC5PHKtf1@gp?@a2suDP9N|bnLL92L{ulnoyS7(lukJ82YeE_fnm^}Q_6~Dc87&{IDTE{RoS$+;QG_ILV z=bTlqQv|0SW?OvPWVSKQord?|FmAa{gYjv*qnS)}wAE?ea12|>lAO@&T*G0SV0nAv z9NdHm5n>x$q37P0z`nhEX5_cY^iDeeyp?U$LWBqrB1DJ~Awq-*@&5~9M{zxXh%e%K z{{Ne{{Kx$FQ~rN+%lF{@{4MnYIw3-Y2oWMgh!7z{ga{ELM2P<$v?d_pz;ki${y*6N z|04xFo&p|90S~5tdJ1?;3V13B+?NFIO#&ZG0`E)$&nAHT62QF);OYeM<^+&Rcr>4l z1GgoCTay4!0(ug-ED5|V3H&Jm{4@c4GXdQ1)3`kWxCx+@01hR93lqTp1dvPszm5Y> z#epBjfrsP3$Kt@XalnWJ$Kt@Hap00Tkck7MabP44{CNU+U;?O503VnD_DldX6ToZZ zz{BIf_2a;G<3M2?cy$c;!WeM#81V5i;K&$&G2qEj;L%awYoh=k1!hKp=~3W!Bfx_r zz?~z&9V5UMBf$6w@QY#Kg<;^XVPI((c<(T6SxX#<|vFoPKA(B-*U&@um9e7+f$j zGvjq8i>N?e+DTo$F_H29>ZaCZUad*L0=cGfO0{7^_U5n#lie967mnKyhGlZED?Q)n zSe*GH>G^fX5G-TpeMEICkYzJ7D6Q*8hh=GMeJbd*s;wDjiwsjRN7ovRBuBE}DW8yP zqST>&*7yd`uNivXwF9DAGVJ|zscXi0mP{tyUm-m*{|VQgv!hY2TWlF`HiVAcW zXtlhsP_M02lpGCq+kX-V=I)RF&WI{Fzu;$lSfkI1AA30lV#7yob55!Hzs<#4U}yHOc zcZ>#`;%X4o*Gw6rwN{?eo<{opwY;>pU&g|9%Q;D2Jlrlsp2~u{E*G@wd4^>&N}o}? zrYD%mM9!nwQ|@MDIG5R#OaWhTV*rkq>) zrR`W|OMaB2`OE@qzOUQ+b3W;w#C-v>Ut}fTLC?yH4?j{#8=Z9+95Wo@YMnm6mmZO) zk|Em63CNJ;Kdw~rXW?7=T{@RrGxVIK(qlRZqXEaB_fmlt)zr15LT1L5OARWJ`u8HC zatS3{j?ravXw3c3Gc4vcxHA}mAns&)`pK9aC^13(jGhN== z+OAq#T*?&>T<-NC*JVNFpr*g{Gn<+8E`GHsi8HfdHCfZsm;XWqTGcqaXn%h7qWxZ3 zMs>bh@xhtdtgCmm2F-em1$`(fFiAyMv@K_riZY9tu5g*dj80o?(40y`&VRU9a;afD zhL+czc}!wbnoCoDJ}@J-9IcxtD(iAdnYgz^xlQTX& zpZ7=|w_M_F%k4C&t*x9iu@HqXRe_F zR9DVYO(ry>!~L$0mX;43E>>#A%gRT}wc~OSER<`NQngA8i_27`rQ&j}d|>5BahaA@ zmX{W*r94rUshH+*zv>FQvRpg7xKg8H;5VUtfdPDN zXUUSzBiF5hT(il%m5X3bmj_kPVhN3tEv?7fYF%)81?ZAF?lU(lg`$RLlEXGNBYW4( z=Cs_ySWQFU2()m(vNs%~)fSXFu$%VI?Ab@<%5u5r^;9V>m9wOg?r2@6rZ8bqS2JZ` z3MLPB%4Uus=MRoawf}!o?y-~J{y(_Ce?A2~ngZ@h0iRC+H>H3RDWH@B4yJ(jq=3;B z@M02pCJEf{p9|cP1UgCJ@+5F!(xbai(hdb*@{}g0&cT>EL8p$z#X#?dI$@H% zoQ874GKFDo;mId%w$w$1(PcTy9i%yE4*%jcOKeu{|I+~8E$@SOpqJnO&lK>76!3Bi zcrFDzkpe!K0zR7pKIG>E98Cd7Qb0BZ{5}c%I0<|&34AvRd^ri+lLT&00-s6(ZW3rG zfrCll9ZBG|1n~0&@K^$PAOU~3z#qqed&YoIi~+Zd0hKWzG6p;~3VdS}`1&Z|jskl}fjy(Zt0TZeBfw`yfKQJA z)e#^*0=zT~yf_TpJq#=l1DRo9Y8d$Y5b)O_;3q@C-9x~QL%3wR%K$K`$y8=R-jUbMwx3?;7Y<|?cx5W1L9KYbRqoVO{#wiM z5*_l~?ieC7b+tLQ`^3yOgB#NAyS+(bU0ZBWqwSh9hqSb~&|6Ax-;{QFE3etM%!*JA zHaDY&?vdEC5j^cCu(!KphAA9UTW;R#HGrcTewJ(iLnn4(<{H{rkZnvWUn*B>%bVL~ zf9sf8td`_1KFHwo_Vl~e9x@;$5s0kpEK`LuduI0T-xR5rBJAbzN{y-Uhc`6(}8D!;7C9wKFa4bFiEw=gNw6tXt_j-OSST1W#>*huvl3rA6i*1 zQSESvj+Ga3D@# -a -n " @@ -135,7 +152,7 @@ if __name__ == '__main__': while not stop: try: - msg = recv_msg(conn) + (msg, args) = recv_msg(conn) except socket.timeout, e: # Ingore time-out continue @@ -147,8 +164,8 @@ if __name__ == '__main__': if msg == STOP_MSG: stop = True reply = stop_action() - else: - reply = reply_action(msg) + elif msg == PASSFD_MSG: + reply = passfd_action(fd, args) try: send_reply(conn, reply) diff --git a/src/nepi/resources/planetlab/scripts/pl-vif-tunconnect.py b/src/nepi/resources/planetlab/scripts/pl-vif-tunconnect.py new file mode 100644 index 00000000..98fa9232 --- /dev/null +++ b/src/nepi/resources/planetlab/scripts/pl-vif-tunconnect.py @@ -0,0 +1,70 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +import base64 +import errno +import passfd +import vsys +import socket +from optparse import OptionParser, SUPPRESS_HELP + +PASSFD_MSG = "PASSFD" + +def get_options(): + usage = ("usage: %prog -S ") + + parser = OptionParser(usage = usage) + + parser.add_option("-S", "--socket-name", dest="socket_name", + help = "Name for the unix socket used to interact with this process", + default = "tap.sock", type="str") + + (options, args) = parser.parse_args() + + return (options.socket_name) + +if __name__ == '__main__': + + (socket_name) = get_options() + + # Socket to recive the file descriptor + fdsock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + fdsock.bind("") + address = fdsock.getsockname() + + # vif-create-socket to send the PASSFD message + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(socket_name) + emsg = base64.b64encode(PASSFD_MSG) + eargs = base64.b64encode(address) + encoded = "%s|%s\n" % (emsg, eargs) + sock.send(encoded) + + # Receive fd + (fd, msg) = passfd.recvfd(fdsock) + + # Receive reply + reply = sock.recv(1024) + reply = base64.b64decode(reply) + + print reply, fd + + + + diff --git a/src/nepi/resources/planetlab/tap.py b/src/nepi/resources/planetlab/tap.py index f4afcf63..54d18637 100644 --- a/src/nepi/resources/planetlab/tap.py +++ b/src/nepi/resources/planetlab/tap.py @@ -28,8 +28,6 @@ import os import time # TODO: - routes!!! -# - Instead of doing an infinite loop, open a port for communication allowing -# to pass the fd to another process PYTHON_VSYS_VERSION = "1.0" @@ -89,50 +87,36 @@ class PlanetlabTap(LinuxApplication): return None def upload_sources(self): - depends = "mercurial make gcc" - self.set("depends", depends) + # upload vif-creation python script + pl_vif_create = os.path.join(os.path.dirname(__file__), "scripts", + "pl-vif-create.py") - install = ( " ( " - " python -c 'import vsys, os; vsys.__version__ == \"%(version)s\" or os._exit(1)' " - " ) " - " ||" - " ( " - " cd ${SRC} ; " - " hg clone http://nepi.inria.fr/code/python-vsys ; " - " cd python-vsys ; " - " make all ; " - " sudo -S make install " - " )" ) % ({ - "version": PYTHON_VSYS_VERSION - }) + self.node.upload(pl_vif_create, + os.path.join(self.app_home, "pl-vif-create.py"), + overwrite = False) - self.set("install", install) + # upload vif-stop python script + pl_vif_stop = os.path.join(os.path.dirname(__file__), "scripts", + "pl-vif-stop.py") - def upload_start_command(self): - # upload tap-creation python script - pl_tap_create = os.path.join(os.path.dirname(__file__), "scripts", - "pl-tap-create.py") - self.node.upload(pl_tap_create, - os.path.join(self.app_home, "pl-vif-create.py"), + self.node.upload(pl_vif_stop, + os.path.join(self.app_home, "pl-vif-stop.py"), overwrite = False) - # upload start.sh - start_command = self.replace_paths(self._start_command) - - self.info("Uploading command '%s'" % start_command) - - self.set("command", start_command) + # upload vif-connect python script + pl_vif_connect = os.path.join(os.path.dirname(__file__), "scripts", + "pl-vif-tunconnect.py") - self.node.upload(start_command, - os.path.join(self.app_home, "start.sh"), - text = True, + self.node.upload(pl_vif_connect, + os.path.join(self.app_home, "pl-vif-connect.py"), overwrite = False) - # upload tap-stop python script - pl_tap_stop = os.path.join(os.path.dirname(__file__), "scripts", - "pl-tap-stop.py") - self.node.upload(pl_tap_stop, - os.path.join(self.app_home, "pl-vif-stop.py"), + # upload tun-connect python script + tunchannel = os.path.join(os.path.dirname(__file__), "..", "all", "scripts", + "tunchannel.py") + + self.node.upload(tunchannel, + os.path.join(self.app_home, "tunchannel.py"), overwrite = False) # upload stop.sh script @@ -142,6 +126,9 @@ class PlanetlabTap(LinuxApplication): text = True, overwrite = False) + def upload_start_command(self): + super(PlanetlabTap, self).upload_start_command() + # We want to make sure the device is up and running # before the deploy finishes (so things will be ready # before other stuff starts running). @@ -158,6 +145,14 @@ class PlanetlabTap(LinuxApplication): if not self.node or self.node.state < ResourceState.PROVISIONED: self.ec.schedule(reschedule_delay, self.deploy) else: + if not self.get("command"): + self.set("command", self._start_command) + + if not self.get("depends"): + self.set("depends", self._dependencies) + + if not self.get("install"): + self.set("install", self._install) try: self.discover() @@ -273,6 +268,38 @@ class PlanetlabTap(LinuxApplication): def sock_name(self): return os.path.join(self.run_home, "tap.sock") + @property + def _dependencies(self): + return "mercurial make gcc" + + @property + def _install(self): + install_vsys = ( " ( " + " python -c 'import vsys, os; vsys.__version__ == \"%(version)s\" or os._exit(1)' " + " ) " + " || " + " ( " + " cd ${SRC} ; " + " hg clone http://nepi.inria.fr/code/python-vsys ; " + " cd python-vsys ; " + " make all ; " + " sudo -S make install " + " )" ) % ({ + "version": PYTHON_VSYS_VERSION + }) + + install_passfd = ( " ( python -c 'import passfd' ) " + " || " + " ( " + " cd ${SRC} ; " + " hg clone http://nepi.inria.fr/code/python-passfd ; " + " cd python-passfd ; " + " make all ; " + " sudo -S make install " + " )" ) + + return "%s ; %s" % ( install_vsys, install_passfd ) + def valid_connection(self, guid): # TODO: Validate! return True diff --git a/test/execution/resource.py b/test/execution/resource.py index 365ca08c..091c43e1 100755 --- a/test/execution/resource.py +++ b/test/execution/resource.py @@ -125,6 +125,7 @@ class ResourceFactoryTestCase(unittest.TestCase): def test_add_resource_factory(self): from nepi.execution.resource import ResourceFactory + ResourceFactory._resource_types = dict() ResourceFactory.register_type(MyResource) ResourceFactory.register_type(AnotherResource) @@ -138,6 +139,11 @@ class ResourceFactoryTestCase(unittest.TestCase): self.assertEquals(len(AnotherResource._attributes), 0) self.assertEquals(len(ResourceFactory.resource_types()), 2) + + # restore factory state for other tests + from nepi.execution.resource import populate_factory + ResourceFactory._resource_types = dict() + populate_factory() class ResourceManagerTestCase(unittest.TestCase): def test_register_condition(self): diff --git a/test/resources/linux/nping.py b/test/resources/linux/nping.py old mode 100644 new mode 100755 diff --git a/test/resources/planetlab/tap.py b/test/resources/planetlab/tap.py index f08a54c9..67fa147b 100755 --- a/test/resources/planetlab/tap.py +++ b/test/resources/planetlab/tap.py @@ -28,7 +28,7 @@ import unittest class PlanetlabTapTestCase(unittest.TestCase): def setUp(self): - self.host = "nepi2.pl.sophia.inria.fr" + self.host = "nepi5.pl.sophia.inria.fr" self.user = "inria_nepi" @skipIfNotAlive -- 2.47.0