From dadecc92b7cd8fb8732b53f6fbbc7bd1519c4ff6 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Sun, 14 Jul 2013 22:39:44 -0700 Subject: [PATCH] Adding tuncahnnel and pl-vif-tunnconnect scripts --- setup.py | 5 +- src/nepi/resources/all/scripts/tunchannel.py | 792 ++++++++++++++++++ src/nepi/resources/linux/application.py | 3 +- .../planetlab/scripts/.pl-tap-create.py.swp | Bin 20480 -> 0 bytes .../planetlab/scripts/pl-vif-create.py | 33 +- .../planetlab/scripts/pl-vif-tunconnect.py | 70 ++ src/nepi/resources/planetlab/tap.py | 103 ++- test/execution/resource.py | 6 + test/resources/linux/nping.py | 0 test/resources/planetlab/tap.py | 2 +- 10 files changed, 965 insertions(+), 49 deletions(-) create mode 100644 src/nepi/resources/all/scripts/tunchannel.py delete mode 100644 src/nepi/resources/planetlab/scripts/.pl-tap-create.py.swp create mode 100644 src/nepi/resources/planetlab/scripts/pl-vif-tunconnect.py mode change 100644 => 100755 test/resources/linux/nping.py diff --git a/setup.py b/setup.py index bebb10aa..407e8422 100755 --- a/setup.py +++ b/setup.py @@ -26,5 +26,8 @@ setup( "nepi.resources.planetlab", "nepi.util"], package_dir = {"": "src"}, - package_data = {"nepi.resources.planetlab" : [ "scripts/*.py" ]} + package_data = { + "nepi.resources.planetlab" : [ "scripts/*.py" ], + "nepi.resources.all" : [ "scripts/*.py" ] + } ) diff --git a/src/nepi/resources/all/scripts/tunchannel.py b/src/nepi/resources/all/scripts/tunchannel.py new file mode 100644 index 00000000..cd30b8b8 --- /dev/null +++ b/src/nepi/resources/all/scripts/tunchannel.py @@ -0,0 +1,792 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# Claudio Freire +# + + +import select +import sys +import os +import struct +import socket +import threading +import traceback +import errno +import fcntl +import random +import traceback +import functools +import collections +import ctypes +import time + +def ipfmt(ip): + ipbytes = map(ord,ip.decode("hex")) + return '.'.join(map(str,ipbytes)) + +tagtype = { + '0806' : 'arp', + '0800' : 'ipv4', + '8870' : 'jumbo', + '8863' : 'PPPoE discover', + '8864' : 'PPPoE', + '86dd' : 'ipv6', +} + +def etherProto(packet, len=len): + if len(packet) > 14: + if packet[12] == "\x81" and packet[13] == "\x00": + # tagged + return packet[16:18] + else: + # untagged + return packet[12:14] + # default: ip + return "\x08\x00" + +def formatPacket(packet, ether_mode): + if ether_mode: + stripped_packet = etherStrip(packet) + if not stripped_packet: + packet = packet.encode("hex") + if len(packet) < 28: + return "malformed eth " + packet.encode("hex") + else: + if packet[24:28] == "8100": + # tagged + ethertype = tagtype.get(packet[32:36], 'eth') + return ethertype + " " + ( '-'.join( ( + packet[0:12], # MAC dest + packet[12:24], # MAC src + packet[24:32], # VLAN tag + packet[32:36], # Ethertype/len + packet[36:], # Payload + ) ) ) + else: + # untagged + ethertype = tagtype.get(packet[24:28], 'eth') + return ethertype + " " + ( '-'.join( ( + packet[0:12], # MAC dest + packet[12:24], # MAC src + packet[24:28], # Ethertype/len + packet[28:], # Payload + ) ) ) + else: + packet = stripped_packet + packet = packet.encode("hex") + if len(packet) < 48: + return "malformed ip " + packet + else: + return "ip " + ( '-'.join( ( + packet[0:1], #version + packet[1:2], #header length + packet[2:4], #diffserv/ECN + packet[4:8], #total length + packet[8:12], #ident + packet[12:16], #flags/fragment offs + packet[16:18], #ttl + packet[18:20], #ip-proto + packet[20:24], #checksum + ipfmt(packet[24:32]), # src-ip + ipfmt(packet[32:40]), # dst-ip + packet[40:48] if (int(packet[1],16) > 5) else "", # options + packet[48:] if (int(packet[1],16) > 5) else packet[40:], # payload + ) ) ) + +def _packetReady(buf, ether_mode=False, len=len, str=str): + if not buf: + return False + + rv = False + while not rv: + if len(buf[0]) < 4: + rv = False + elif ether_mode: + rv = True + else: + _,totallen = struct.unpack('HH',buf[0][:4]) + totallen = socket.htons(totallen) + rv = len(buf[0]) >= totallen + if not rv and len(buf) > 1: + # collapse only first two buffers + # as needed, to mantain len(buf) meaningful + p1 = buf.popleft() + buf[0] = p1+str(buf[0]) + else: + return rv + return rv + +def _pullPacket(buf, ether_mode=False, len=len, buffer=buffer): + if ether_mode: + return buf.popleft() + else: + _,totallen = struct.unpack('HH',buf[0][:4]) + totallen = socket.htons(totallen) + if len(buf[0]) > totallen: + rv = buffer(buf[0],0,totallen) + buf[0] = buffer(buf[0],totallen) + else: + rv = buf.popleft() + return rv + +def etherStrip(buf, buffer=buffer, len=len): + if len(buf) < 14: + return "" + if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00': + # tagged ethernet frame + return buffer(buf, 18) + elif buf[12:14] == '\x08\x00': + # untagged ethernet frame + return buffer(buf, 14) + else: + return "" + +def etherWrap(packet): + return ''.join(( + "\x00"*6*2 # bogus src and dst mac + +"\x08\x00", # IPv4 + packet, # payload + "\x00"*4, # bogus crc + )) + +def piStrip(buf, len=len): + if len(buf) < 4: + return buf + else: + return buffer(buf,4) + +def piWrap(buf, ether_mode, etherProto=etherProto): + if ether_mode: + proto = etherProto(buf) + else: + proto = "\x08\x00" + return ''.join(( + "\x00\x00", # PI: 16 bits flags + proto, # 16 bits proto + buf, + )) + +_padmap = [ chr(padding) * padding for padding in xrange(127) ] +del padding + +def encrypt(packet, crypter, len=len, padmap=_padmap): + # pad + padding = crypter.block_size - len(packet) % crypter.block_size + packet += padmap[padding] + + # encrypt + return crypter.encrypt(packet) + +def decrypt(packet, crypter, ord=ord): + if packet: + # decrypt + packet = crypter.decrypt(packet) + + # un-pad + padding = ord(packet[-1]) + if not (0 < padding <= crypter.block_size): + # wrong padding + raise RuntimeError, "Truncated packet %s" + packet = packet[:-padding] + + return packet + +def nonblock(fd): + try: + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fl |= os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, fl) + return True + except: + traceback.print_exc(file=sys.stderr) + # Just ignore + return False + +def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, SUSPEND, + stderr = sys.stderr, reconnect = None, rwrite = None, rread = None, + tunqueue = 1000, tunkqueue = 1000, cipher = 'AES', accept_local = None, + accept_remote = None, slowlocal = True, queueclass = None, + bwlimit = None, len = len, max = max, min = min, buffer = buffer, + OSError = OSError, select = select.select, selecterror = select.error, + os = os, socket = socket, + retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ): + crypto_mode = False + crypter = None + + try: + if cipher_key and cipher: + import Crypto.Cipher + import hashlib + __import__('Crypto.Cipher.'+cipher) + + ciphername = cipher + cipher = getattr(Crypto.Cipher, cipher) + hashed_key = hashlib.sha256(cipher_key).digest() + + if ciphername == 'AES': + hashed_key = hashed_key[:16] + elif ciphername == 'Blowfish': + hashed_key = hashed_key[:24] + elif ciphername == 'DES': + hashed_key = hashed_key[:8] + elif ciphername == 'DES3': + hashed_key = hashed_key[:24] + + crypter = cipher.new( + hashed_key, + cipher.MODE_ECB) + crypto_mode = True + except: + # We don't want decription to work only on one side, + # This could break things really bad + #crypto_mode = False + #crypter = None + traceback.print_exc(file=sys.stderr) + raise + + if stderr is not None: + if crypto_mode: + print >>stderr, "Packets are transmitted in CIPHER" + else: + print >>stderr, "Packets are transmitted in PLAINTEXT" + + if hasattr(remote, 'fileno'): + remote_fd = remote.fileno() + if rwrite is None: + def rwrite(remote, packet, os_write=os.write): + return os_write(remote_fd, packet) + if rread is None: + def rread(remote, maxlen, os_read=os.read): + return os_read(remote_fd, maxlen) + + rnonblock = nonblock(remote) + tnonblock = nonblock(tun) + + # Pick up TUN/TAP writing method + if with_pi: + try: + import iovec + + # We have iovec, so we can skip PI injection + # and use iovec which does it natively + if ether_mode: + twrite = iovec.ethpiwrite + tread = iovec.piread2 + else: + twrite = iovec.ippiwrite + tread = iovec.piread2 + except ImportError: + # We have to inject PI headers pythonically + def twrite(fd, packet, oswrite=os.write, piWrap=piWrap, ether_mode=ether_mode): + return oswrite(fd, piWrap(packet, ether_mode)) + + # For reading, we strip PI headers with buffer slicing and that's it + def tread(fd, maxlen, osread=os.read, piStrip=piStrip): + return piStrip(osread(fd, maxlen)) + else: + # No need to inject PI headers + twrite = os.write + tread = os.read + + encrypt_ = encrypt + decrypt_ = decrypt + xrange_ = xrange + + if accept_local is not None: + def tread(fd, maxlen, _tread=tread, accept=accept_local): + packet = _tread(fd, maxlen) + if accept(packet, 0): + return packet + else: + return None + + if accept_remote is not None: + if crypto_mode: + def decrypt_(packet, crypter, decrypt_=decrypt_, accept=accept_remote): + packet = decrypt_(packet, crypter) + if accept(packet, 1): + return packet + else: + return None + else: + def rread(fd, maxlen, _rread=rread, accept=accept_remote): + packet = _rread(fd, maxlen) + if accept(packet, 1): + return packet + else: + return None + + maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue) + tunhurry = max(0,maxbkbuf/2) + + if queueclass is None: + queueclass = collections.deque + maxbatch = 2000 + maxtbatch = 50 + else: + maxfwbuf = maxbkbuf = 2000000000 + maxbatch = 50 + maxtbatch = 30 + tunhurry = 30 + + fwbuf = queueclass() + bkbuf = queueclass() + nfwbuf = 0 + nbkbuf = 0 + + # backwards queue functions + # they may need packet inspection to + # reconstruct packet boundaries + if ether_mode or udp: + packetReady = bool + pullPacket = queueclass.popleft + reschedule = queueclass.appendleft + else: + packetReady = _packetReady + pullPacket = _pullPacket + reschedule = queueclass.appendleft + + # forward queue functions + # no packet inspection needed + fpacketReady = bool + fpullPacket = queueclass.popleft + freschedule = queueclass.appendleft + + tunfd = tun.fileno() + os_read = os.read + os_write = os.write + + tget = time.time + maxbwfree = bwfree = 1500 * tunqueue + lastbwtime = tget() + + remoteok = True + + + while not TERMINATE: + # The SUSPEND flag has been set. This means we need to wait on + # the SUSPEND condition until it is released. + while SUSPEND and not TERMINATE: + time.sleep(0.5) + + wset = [] + if packetReady(bkbuf): + wset.append(tun) + if remoteok and fpacketReady(fwbuf) and (not bwlimit or bwfree > 0): + wset.append(remote) + + rset = [] + if len(fwbuf) < maxfwbuf: + rset.append(tun) + if remoteok and len(bkbuf) < maxbkbuf: + rset.append(remote) + + if remoteok: + eset = (tun,remote) + else: + eset = (tun,) + + try: + rdrdy, wrdy, errs = select(rset,wset,eset,1) + except selecterror, e: + if e.args[0] == errno.EINTR: + # just retry + continue + else: + traceback.print_exc(file=sys.stderr) + # If the SUSPEND flag has been set, then the TUN will be in a bad + # state and the select error should be ignores. + if SUSPEND: + continue + else: + raise + + # check for errors + if errs: + if reconnect is not None and remote in errs and tun not in errs: + remote = reconnect() + if hasattr(remote, 'fileno'): + remote_fd = remote.fileno() + elif udp and remote in errs and tun not in errs: + # In UDP mode, those are always transient errors + # Usually, an error will imply a read-ready socket + # that will raise an "Connection refused" error, so + # disable read-readiness just for now, and retry + # the select + remoteok = False + continue + else: + break + else: + remoteok = True + + # check to see if we can write + #rr = wr = rt = wt = 0 + if remote in wrdy: + sent = 0 + try: + try: + for x in xrange(maxbatch): + packet = pullPacket(fwbuf) + + if crypto_mode: + packet = encrypt_(packet, crypter) + + sentnow = rwrite(remote, packet) + sent += sentnow + #wr += 1 + + if not udp and 0 <= sentnow < len(packet): + # packet partially sent + # reschedule the remaining part + # this doesn't happen ever in udp mode + freschedule(fwbuf, buffer(packet,sentnow)) + + if not rnonblock or not fpacketReady(fwbuf): + break + except OSError,e: + # This except handles the entire While block on PURPOSE + # as an optimization (setting a try/except block is expensive) + # The only operation that can raise this exception is rwrite + if e.errno in retrycodes: + # re-schedule packet + freschedule(fwbuf, packet) + else: + raise + except: + if reconnect is not None: + # in UDP mode, sometimes connected sockets can return a connection refused. + # Give the caller a chance to reconnect + remote = reconnect() + if hasattr(remote, 'fileno'): + remote_fd = remote.fileno() + elif not udp: + # in UDP mode, we ignore errors - packet loss man... + raise + #traceback.print_exc(file=sys.stderr) + + if bwlimit: + bwfree -= sent + if tun in wrdy: + try: + for x in xrange(maxtbatch): + packet = pullPacket(bkbuf) + twrite(tunfd, packet) + #wt += 1 + + # Do not inject packets into the TUN faster than they arrive, unless we're falling + # behind. TUN devices discard packets if their queue is full (tunkqueue), but they + # don't block either (they're always ready to write), so if we flood the device + # we'll have high packet loss. + if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf): + break + else: + if slowlocal: + # Give some time for the kernel to process the packets + time.sleep(0) + except OSError,e: + # This except handles the entire While block on PURPOSE + # as an optimization (setting a try/except block is expensive) + # The only operation that can raise this exception is os_write + if e.errno in retrycodes: + # re-schedule packet + reschedule(bkbuf, packet) + else: + raise + + # check incoming data packets + if tun in rdrdy: + try: + for x in xrange(maxbatch): + packet = tread(tunfd,2000) # tun.read blocks until it gets 2k! + if not packet: + continue + #rt += 1 + fwbuf.append(packet) + + if not tnonblock or len(fwbuf) >= maxfwbuf: + break + except OSError,e: + # This except handles the entire While block on PURPOSE + # as an optimization (setting a try/except block is expensive) + # The only operation that can raise this exception is os_read + if e.errno not in retrycodes: + raise + if remote in rdrdy: + try: + try: + for x in xrange(maxbatch): + packet = rread(remote,2000) + + #rr += 1 + + if crypto_mode: + packet = decrypt_(packet, crypter) + if not packet: + continue + elif not packet: + if not udp and packet == "": + # Connection broken, try to reconnect (or just die) + raise RuntimeError, "Connection broken" + else: + continue + + bkbuf.append(packet) + + if not rnonblock or len(bkbuf) >= maxbkbuf: + break + except OSError,e: + # This except handles the entire While block on PURPOSE + # as an optimization (setting a try/except block is expensive) + # The only operation that can raise this exception is rread + if e.errno not in retrycodes: + raise + except Exception, e: + if reconnect is not None: + # in UDP mode, sometimes connected sockets can return a connection refused + # on read. Give the caller a chance to reconnect + remote = reconnect() + if hasattr(remote, 'fileno'): + remote_fd = remote.fileno() + elif not udp: + # in UDP mode, we ignore errors - packet loss man... + raise + traceback.print_exc(file=sys.stderr) + + if bwlimit: + tnow = tget() + delta = tnow - lastbwtime + if delta > 0.001: + delta = int(bwlimit * delta) + if delta > 0: + bwfree = min(bwfree+delta, maxbwfree) + lastbwtime = tnow + + #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt) + +def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port): + rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + retrydelay = 1.0 + for i in xrange(30): + # TERMINATE is a array. An item can be added to TERMINATE, from + # outside this function to force termination of the loop + if TERMINATE: + raise OSError, "Killed" + try: + rsock.bind((local_addr, local_port)) + break + except socket.error: + # wait a while, retry + print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 + else: + rsock.bind((local_addr, local_port)) + print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port) + print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port) + rsock.connect((peer_addr, peer_port)) + return rsock + +def udp_handshake(TERMINATE, rsock): + endme = False + def keepalive(): + while not endme and not TERMINATE: + try: + rsock.send('') + except: + pass + time.sleep(1) + try: + rsock.send('') + except: + pass + keepalive_thread = threading.Thread(target=keepalive) + keepalive_thread.start() + for i in xrange(900): + if TERMINATE: + raise OSError, "Killed" + try: + heartbeat = rsock.recv(10) + break + except: + time.sleep(1) + else: + heartbeat = rsock.recv(10) + endme = True + keepalive_thread.join() + +def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port): + rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr, + peer_port) + udp_handshake(TERMINATE, rsock) + return rsock + +def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port): + sock = None + retrydelay = 1.0 + # The peer has a firewall that prevents a response to the connect, we + # will be forever blocked in the connect, so we put a reasonable timeout. + rsock.settimeout(10) + # We wait for + for i in xrange(30): + if stop: + break + if TERMINATE: + raise OSError, "Killed" + try: + rsock.connect((peer_addr, peer_port)) + sock = rsock + break + except socket.error: + # wait a while, retry + print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 + else: + rsock.connect((peer_addr, peer_port)) + sock = rsock + if sock: + print >>sys.stderr, "tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port) + sock.settimeout(0) + + print >>sys.stderr, "tcp_connect: disabling NAGLE" + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + return sock + +def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port): + sock = None + retrydelay = 1.0 + # We try to bind to the local virtual interface. + # It might not exist yet so we wait in a loop. + for i in xrange(30): + if stop: + break + if TERMINATE: + raise OSError, "Killed" + try: + lsock.bind((local_addr, local_port)) + break + except socket.error: + # wait a while, retry + print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 + else: + lsock.bind((local_addr, local_port)) + + print >>sys.stderr, "tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port) + # Now we wait until the other side connects. + # The other side might not be ready yet, so we also wait in a loop for timeouts. + timeout = 1 + lsock.listen(1) + for i in xrange(30): + if TERMINATE: + raise OSError, "Killed" + rlist, wlist, xlist = select.select([lsock], [], [], timeout) + if stop: + break + if lsock in rlist: + sock,raddr = lsock.accept() + print >>sys.stderr, "tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port) + break + timeout += 5 + return sock + +def tcp_handshake(rsock, listen, hand): + # we are going to use a barrier algorithm to decide wich side listen. + # each side will "roll a dice" and send the resulting value to the other + # side. + win = False + rsock.settimeout(10) + try: + rsock.send(hand) + peer_hand = rsock.recv(4) + if not peer_hand: + print >>sys.stderr, "tcp_handshake: connection reset by peer" + return False + else: + print >>sys.stderr, "tcp_handshake: hand %r, peer_hand %r" % (hand, peer_hand) + if hand < peer_hand: + if listen: + win = True + elif hand > peer_hand: + if not listen: + win = True + finally: + rsock.settimeout(0) + return win + +def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port): + def listen(stop, hand, lsock, lresult): + win = False + rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port) + if rsock: + win = tcp_handshake(rsock, True, hand) + stop.append(True) + lresult.append((win, rsock)) + + def connect(stop, hand, rsock, rresult): + win = False + rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port) + if rsock: + win = tcp_handshake(rsock, False, hand) + stop.append(True) + rresult.append((win, rsock)) + + end = False + sock = None + for i in xrange(0, 50): + if end: + break + if TERMINATE: + raise OSError, "Killed" + hand = struct.pack("!L", random.randint(0, 2**30)) + stop = [] + lresult = [] + rresult = [] + lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult)) + connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult)) + connect_thread.start() + listen_thread.start() + connect_thread.join() + listen_thread.join() + (lwin, lrsock) = lresult[0] + (rwin, rrsock) = rresult[0] + if not lrsock or not rrsock: + if not lrsock: + sock = rrsock + if not rrsock: + sock = lrsock + end = True + # both socket are connected + else: + if lwin: + sock = lrsock + end = True + elif rwin: + sock = rrsock + end = True + + if not sock: + raise OSError, "Error: tcp_establish could not establish connection." + return sock + + diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 493c8997..8b975acb 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -334,7 +334,8 @@ class LinuxApplication(ResourceManager): self.node.upload_command(command, shfile = shfile, - env = env) + env = env, + overwrite = False) def execute_deploy_command(self, command): if command: diff --git a/src/nepi/resources/planetlab/scripts/.pl-tap-create.py.swp b/src/nepi/resources/planetlab/scripts/.pl-tap-create.py.swp deleted file mode 100644 index 9f6e2d644382a158450598c63a39601f8c3d0e22..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 20480 zcmeI4TZ|-C8OKYOyWE6$jWK6?T)T&K*UauLV8)q+X0~Ux!)|xe-LuQOi)BsssqWhC zt}5$PP46UTffr&F6a>7yh(JV;2NfSg4Qiqff(V$HK!_LQqKWZ>7tq9b`G2RXx~peq zck#(+P9?wT?o;P{=R4oIpY!dmOz$u6V^`=i8lH~lU^!WF&_;bs;_sQ=|*2PQY#KG!ASwLAp zSwLApSwLApSwLApSwLApSwLApSwLCfzhD8gt!cq|n)WQ9`~PJB|IZ6G?J@8Wcn}!i z&EUxkH0=xEKJWqX4)DzR2m|+l!{ANeLO?Q~0k<3qYD?tvN4bB38d84L101WVcupLZ;SI>nE za056F=D{oHXxit&E#PBdFVMgfXKUIc;OjtuX)pzTe->;5?gDp$Yr(nTmuG6)^Wbh! z1MdObz~9f%wAa9M;BIgOI1aXhmvO-I5_k;!0Ne`pgJqBdFXDjZ=isa0cJN`~fIgT5 zZvk7tY2Yaw=6oAG3LXSEgQMUGsDmPSC)n&^Cwcm!!{(XKTdM}X=31`fOls-)fxk98 z3>b9n9%qglFcG-ktTuc>>2$$^!0vN5404x^gwU%zGuv$+840&_R5x2K?ghCh%BCVo zI$|%HJ;7xj6Kr7G0kd46^|=schYJ?C%=CSGl{0v%Fwb}4YJ$)%??-ycyn40>xRaZX zGNT~KjoeNQ`o)7>^kFz=pGUDK;s`8s>|;`89#C$u5~u8JM+AP@3T)RAQC61T>bZi) zMY8mPZwEYQTPdLy@^w9^5{DWVOqpldK{m;>VXoRv2hs!XTV@M(hDMZcHl0M8L;hJ6 zYeHYeyUxmXA~o!=3Y$lf!3!F^qR z7!-(lkx?0*Hn+^M7a&>&HLjCkGI>UoKA%B5&qVEnVs?3tdS#|U?UK48`r23@2?cLc zgOyF#Yz1s!2VKOnMbbSXStC)gMrf&VQ^{ot4ZO@03Q^4zsPQA9%|!q`MlCFZc0m&8 zNvR&!@+mZ5q~Qw+Vtn!O1?sQsZKe=*$3ZDp*5vCdDWy?fn`vxC8rzoCwm?04sB*g5 zrFA};VQZZGjMHF7-E?T;k&aA?PwK1UIy`?u<-AC_RG%2?HWoHACu$5s8r3?iV_=A* z(om((XG)H_0@F1r#pXt};wG}0g+!zT^^9Te!yidGKWVS!+Uiak3kR-$6b+Ha!;OW|Svg?+Py}oxY8fyt z%Sc(970ihW2C0Qe;;?w5k>OO;PC8~1y#(*{IV#;9q?(^BW1fwbn?~r@QQmkGpw=EW z>S4#Exg}lNo{L!r(WpC|xWHyqU<`(ZSoE;OsR1%wWLdo3(dZ&)>v*K1Iy&k2vfuWD z(Cnr9CbbBil`xv1gT!JO2B3Ikgv1k86tz2yDi}2jBxR^{GK?k~N0p4>ttzLknjA87 zAuTl!C_kf-Q064dRuF8C8uKnz@Yr0cQz5`&bLn9LAq_;9!rI=plj5m!vX zFW;%JT)s03%gM|qr5*yqo*{qBHE165F%(PkkfqQjSTPK`u0P9)J?CXq`En3oI9m6@B>jr^0J2f>J4D^l@ z>aO3JqJhQ{@MGSkN1_h8AroCU?6q0f#DXwh7`BI-{{m1gnk^4@auAa#Ppa@u_0BjA zvvPLqdp#So;Q$M5(+Q|#k&I?Mr;`O4ruR0M*}e_Qh-^n`yIAc)h#4>}+y{2AM*}NX z2v*q3C*{xK?HZiK<8{v@R`P4C2bXZ4t#V&rHj0Z)$}IXQMZ>iMXoFvF=(J6mu=0#Y z14EpI%_J2NTN`UZKT;c-AQhyOJmk_BNz{|ngo)J))t>0eb;0Haut+%(D>qApVlC4_ z%NSXP?0_}i6zyT$wr#B?R#-rr@$F7GV7Y}!wqttx3@caa;$1A9Rp`~iFXpMdX!?}0Ca zd%+#xlOP0Lup7J`yoxi2Uw}u!1K^9`E-(N#D1$4&<=`UlTioS84W0rIgIhok90k{b zt3eT53eE-RfY)%(|0Q68!(ayN0Dr|@|HI%Wa3i=9{0aB`_kxduTR{b!2A;${|2M%m zKnQk#?cf#M^FIVW4L$`L;5_gm?)hH;_kcRcfh_n3?)d)(ehTgZH-Q_$4EQzf_kRU` z2p$KY0M~&7;8O51?)0ArUjui5;{g9`Lvz6{@HTJ;I34^L_4r-z9q-u#KJLxt=-(}OMBTVH1fCIK@v{4=a$^!t-8Od#Y>Vj(&IJ)N>g((jJt-)@FN_1nJPe{R z>CvE3FN;>W*{ps9u2nKv7r5IFa@oU9cJk2l^&>YZioS8TH^F${!wq<=8#?4eTBd*(4Bo za1@Dcou@R8tCz}^W_`46j<=3EI0ul(#Dg>1(@)AhWFm?_lOjt`585c?womWaIV#f7 zML6>1C5y`KE*t^UMNPzHvl>Z_YUI%Mu^UQy`!Vi=LS8d61%1(1Y}hpA4;ZOI0jQKH zT>3B$9eefSqOn{lUpFjeqq=a7(P-97#eI?Q$%);8p-6Hh0O=X}p27dK#hTUK)mgSw zFR^N^S*}*LZedT#-7sNnifZiik18#2e9P3tls2D -a -n " @@ -135,7 +152,7 @@ if __name__ == '__main__': while not stop: try: - msg = recv_msg(conn) + (msg, args) = recv_msg(conn) except socket.timeout, e: # Ingore time-out continue @@ -147,8 +164,8 @@ if __name__ == '__main__': if msg == STOP_MSG: stop = True reply = stop_action() - else: - reply = reply_action(msg) + elif msg == PASSFD_MSG: + reply = passfd_action(fd, args) try: send_reply(conn, reply) diff --git a/src/nepi/resources/planetlab/scripts/pl-vif-tunconnect.py b/src/nepi/resources/planetlab/scripts/pl-vif-tunconnect.py new file mode 100644 index 00000000..98fa9232 --- /dev/null +++ b/src/nepi/resources/planetlab/scripts/pl-vif-tunconnect.py @@ -0,0 +1,70 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +import base64 +import errno +import passfd +import vsys +import socket +from optparse import OptionParser, SUPPRESS_HELP + +PASSFD_MSG = "PASSFD" + +def get_options(): + usage = ("usage: %prog -S ") + + parser = OptionParser(usage = usage) + + parser.add_option("-S", "--socket-name", dest="socket_name", + help = "Name for the unix socket used to interact with this process", + default = "tap.sock", type="str") + + (options, args) = parser.parse_args() + + return (options.socket_name) + +if __name__ == '__main__': + + (socket_name) = get_options() + + # Socket to recive the file descriptor + fdsock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + fdsock.bind("") + address = fdsock.getsockname() + + # vif-create-socket to send the PASSFD message + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(socket_name) + emsg = base64.b64encode(PASSFD_MSG) + eargs = base64.b64encode(address) + encoded = "%s|%s\n" % (emsg, eargs) + sock.send(encoded) + + # Receive fd + (fd, msg) = passfd.recvfd(fdsock) + + # Receive reply + reply = sock.recv(1024) + reply = base64.b64decode(reply) + + print reply, fd + + + + diff --git a/src/nepi/resources/planetlab/tap.py b/src/nepi/resources/planetlab/tap.py index f4afcf63..54d18637 100644 --- a/src/nepi/resources/planetlab/tap.py +++ b/src/nepi/resources/planetlab/tap.py @@ -28,8 +28,6 @@ import os import time # TODO: - routes!!! -# - Instead of doing an infinite loop, open a port for communication allowing -# to pass the fd to another process PYTHON_VSYS_VERSION = "1.0" @@ -89,50 +87,36 @@ class PlanetlabTap(LinuxApplication): return None def upload_sources(self): - depends = "mercurial make gcc" - self.set("depends", depends) + # upload vif-creation python script + pl_vif_create = os.path.join(os.path.dirname(__file__), "scripts", + "pl-vif-create.py") - install = ( " ( " - " python -c 'import vsys, os; vsys.__version__ == \"%(version)s\" or os._exit(1)' " - " ) " - " ||" - " ( " - " cd ${SRC} ; " - " hg clone http://nepi.inria.fr/code/python-vsys ; " - " cd python-vsys ; " - " make all ; " - " sudo -S make install " - " )" ) % ({ - "version": PYTHON_VSYS_VERSION - }) + self.node.upload(pl_vif_create, + os.path.join(self.app_home, "pl-vif-create.py"), + overwrite = False) - self.set("install", install) + # upload vif-stop python script + pl_vif_stop = os.path.join(os.path.dirname(__file__), "scripts", + "pl-vif-stop.py") - def upload_start_command(self): - # upload tap-creation python script - pl_tap_create = os.path.join(os.path.dirname(__file__), "scripts", - "pl-tap-create.py") - self.node.upload(pl_tap_create, - os.path.join(self.app_home, "pl-vif-create.py"), + self.node.upload(pl_vif_stop, + os.path.join(self.app_home, "pl-vif-stop.py"), overwrite = False) - # upload start.sh - start_command = self.replace_paths(self._start_command) - - self.info("Uploading command '%s'" % start_command) - - self.set("command", start_command) + # upload vif-connect python script + pl_vif_connect = os.path.join(os.path.dirname(__file__), "scripts", + "pl-vif-tunconnect.py") - self.node.upload(start_command, - os.path.join(self.app_home, "start.sh"), - text = True, + self.node.upload(pl_vif_connect, + os.path.join(self.app_home, "pl-vif-connect.py"), overwrite = False) - # upload tap-stop python script - pl_tap_stop = os.path.join(os.path.dirname(__file__), "scripts", - "pl-tap-stop.py") - self.node.upload(pl_tap_stop, - os.path.join(self.app_home, "pl-vif-stop.py"), + # upload tun-connect python script + tunchannel = os.path.join(os.path.dirname(__file__), "..", "all", "scripts", + "tunchannel.py") + + self.node.upload(tunchannel, + os.path.join(self.app_home, "tunchannel.py"), overwrite = False) # upload stop.sh script @@ -142,6 +126,9 @@ class PlanetlabTap(LinuxApplication): text = True, overwrite = False) + def upload_start_command(self): + super(PlanetlabTap, self).upload_start_command() + # We want to make sure the device is up and running # before the deploy finishes (so things will be ready # before other stuff starts running). @@ -158,6 +145,14 @@ class PlanetlabTap(LinuxApplication): if not self.node or self.node.state < ResourceState.PROVISIONED: self.ec.schedule(reschedule_delay, self.deploy) else: + if not self.get("command"): + self.set("command", self._start_command) + + if not self.get("depends"): + self.set("depends", self._dependencies) + + if not self.get("install"): + self.set("install", self._install) try: self.discover() @@ -273,6 +268,38 @@ class PlanetlabTap(LinuxApplication): def sock_name(self): return os.path.join(self.run_home, "tap.sock") + @property + def _dependencies(self): + return "mercurial make gcc" + + @property + def _install(self): + install_vsys = ( " ( " + " python -c 'import vsys, os; vsys.__version__ == \"%(version)s\" or os._exit(1)' " + " ) " + " || " + " ( " + " cd ${SRC} ; " + " hg clone http://nepi.inria.fr/code/python-vsys ; " + " cd python-vsys ; " + " make all ; " + " sudo -S make install " + " )" ) % ({ + "version": PYTHON_VSYS_VERSION + }) + + install_passfd = ( " ( python -c 'import passfd' ) " + " || " + " ( " + " cd ${SRC} ; " + " hg clone http://nepi.inria.fr/code/python-passfd ; " + " cd python-passfd ; " + " make all ; " + " sudo -S make install " + " )" ) + + return "%s ; %s" % ( install_vsys, install_passfd ) + def valid_connection(self, guid): # TODO: Validate! return True diff --git a/test/execution/resource.py b/test/execution/resource.py index 365ca08c..091c43e1 100755 --- a/test/execution/resource.py +++ b/test/execution/resource.py @@ -125,6 +125,7 @@ class ResourceFactoryTestCase(unittest.TestCase): def test_add_resource_factory(self): from nepi.execution.resource import ResourceFactory + ResourceFactory._resource_types = dict() ResourceFactory.register_type(MyResource) ResourceFactory.register_type(AnotherResource) @@ -138,6 +139,11 @@ class ResourceFactoryTestCase(unittest.TestCase): self.assertEquals(len(AnotherResource._attributes), 0) self.assertEquals(len(ResourceFactory.resource_types()), 2) + + # restore factory state for other tests + from nepi.execution.resource import populate_factory + ResourceFactory._resource_types = dict() + populate_factory() class ResourceManagerTestCase(unittest.TestCase): def test_register_condition(self): diff --git a/test/resources/linux/nping.py b/test/resources/linux/nping.py old mode 100644 new mode 100755 diff --git a/test/resources/planetlab/tap.py b/test/resources/planetlab/tap.py index f08a54c9..67fa147b 100755 --- a/test/resources/planetlab/tap.py +++ b/test/resources/planetlab/tap.py @@ -28,7 +28,7 @@ import unittest class PlanetlabTapTestCase(unittest.TestCase): def setUp(self): - self.host = "nepi2.pl.sophia.inria.fr" + self.host = "nepi5.pl.sophia.inria.fr" self.user = "inria_nepi" @skipIfNotAlive -- 2.43.0