#
# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
# Author: Alina Quereilhac
# Alexandros Kouvakas
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, ResourceFactory, clsinit_copy, \
ResourceState
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.resources.planetlab.openvswitch.ovs import OVSWitch
from nepi.util.timefuncs import tnow, tdiffsec
from nepi.resources.planetlab.vroute import PlanetlabVroute
from nepi.resources.planetlab.tap import PlanetlabTap
import os
import time
import socket
reschedule_delay = "0.5s"
@clsinit_copy
class OVSTunnel(LinuxApplication):
"""
.. class:: Class Args :
:param ec: The Experiment controller
:type ec: ExperimentController
:param guid: guid of the RM
:type guid: int
:param creds: Credentials to communicate with the rm
:type creds: dict
"""
_rtype = "OVSTunnel"
_authorized_connections = ["OVSPort", "PlanetlabTap"]
@classmethod
def _register_attributes(cls):
""" Register the attributes of Connection RM
"""
network = Attribute("network", "IPv4 Network Address",
flags = Flags.ExecReadOnly)
cipher = Attribute("cipher",
"Cipher to encript communication. "
"One of PLAIN, AES, Blowfish, DES, DES3. ",
default = None,
allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
type = Types.Enumerate,
flags = Flags.ExecReadOnly)
cipher_key = Attribute("cipherKey",
"Specify a symmetric encryption key with which to protect "
"packets across the tunnel. python-crypto must be installed "
"on the system." ,
flags = Flags.ExecReadOnly)
txqueuelen = Attribute("txQueueLen",
"Specifies the interface's transmission queue length. "
"Defaults to 1000. ",
type = Types.Integer,
flags = Flags.ExecReadOnly)
bwlimit = Attribute("bwLimit",
"Specifies the interface's emulated bandwidth in bytes "
"per second.",
type = Types.Integer,
flags = Flags.ExecReadOnly)
cls._register_attribute(network)
cls._register_attribute(cipher)
cls._register_attribute(cipher_key)
cls._register_attribute(txqueuelen)
cls._register_attribute(bwlimit)
def __init__(self, ec, guid):
"""
:param ec: The Experiment controller
:type ec: ExperimentController
:param guid: guid of the RM
:type guid: int
"""
super(OVSTunnel, self).__init__(ec, guid)
self._home = "tunnel-%s" % self.guid
self.port_info_tunl = []
self._nodes = []
self._pid = None
self._ppid = None
self._vroute = None
def log_message(self, msg):
return " guid %d - Tunnel - %s " % (self.guid, msg)
@property
def node(self):
if self._nodes:
return self._nodes[0]
def app_home(self, node):
return os.path.join(self.node.exp_home, self._home)
def run_home(self, node):
return os.path.join(self.app_home(node), self.ec.run_id)
def port_endpoints(self):
# Switch-Switch connection
connected = []
for guid in self.connections:
rm = self.ec.get_resource(guid)
if hasattr(rm, "create_port"):
connected.append(rm)
return connected
def mixed_endpoints(self):
# Switch-Host connection
connected = [1, 2]
for guid in self.connections:
rm = self.ec.get_resource(guid)
if hasattr(rm, "create_port"):
connected[0] = rm
elif hasattr(rm, "udp_connect_command"):
connected[1] = rm
return connected
def get_node(self, endpoint):
# Get connected to the nodes
res = []
if hasattr(endpoint, "create_port"):
rm_list = endpoint.get_connected(OVSWitch.get_rtype())
if rm_list:
rm = rm_list[0].get_connected(PlanetlabNode.get_rtype())
else:
rm = endpoint.get_connected(PlanetlabNode.get_rtype())
if rm :
res.append(rm[0])
return res
@property
def endpoint1(self):
if self.check_endpoints:
port_endpoints = self.port_endpoints()
if port_endpoints: return port_endpoints[0]
else:
mixed_endpoints = self.mixed_endpoints()
if mixed_endpoints: return mixed_endpoints[0]
@property
def endpoint2(self):
if self.check_endpoints:
port_endpoints = self.port_endpoints()
if port_endpoints: return port_endpoints[1]
else:
mixed_endpoints = self.mixed_endpoints()
if mixed_endpoints: return mixed_endpoints[1]
@property
def check_endpoints(self):
""" Check if the links are between switches
or switch-host. Return False for latter.
"""
port_endpoints = self.port_endpoints()
if len(port_endpoints) == 2:
return True
return False
def get_port_info(self, endpoint, rem_endpoint):
""" Retrieve the port_info list for each port
:param port_info_tunl: [hostname, publ_IP_addr, port_name,
virtual_ip, local_port_Numb]
:type port_info_tunl: list
"""
self.port_info_tunl = []
if self.check_endpoints:
# Use for the link switch-->switch
self.port_info_tunl.append(endpoint.port_info)
host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
self.port_info_tunl.append(rem_endpoint.port_info)
host1, ip1, pname1, virt_ip1, pnumber1 = self.port_info_tunl[1]
return (pname0, ip1, pnumber1)
# Use for the link host-->switch
self.port_info_tunl.append(endpoint.port_info)
host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
return pnumber0
def udp_connect(self, endpoint, rem_endpoint):
# Collect info from rem_endpoint
self._nodes = self.get_node(rem_endpoint)
remote_ip = socket.gethostbyname(self.node.get("hostname"))
# Collect info from endpoint
self._nodes = self.get_node(endpoint)
local_port_file = os.path.join(self.run_home(self.node),
"local_port")
remote_port_file = os.path.join(self.run_home(self.node),
"remote_port")
ret_file = os.path.join(self.run_home(self.node),
"ret_file")
cipher = self.get("cipher")
cipher_key = self.get("cipherKey")
bwlimit = self.get("bwLimit")
txqueuelen = self.get("txQueueLen")
rem_port = str(self.get_port_info(rem_endpoint, endpoint))
# Upload the remote port in a file
self.node.upload(rem_port,
remote_port_file,
text = True,
overwrite = False)
udp_connect_command = endpoint.udp_connect_command(
remote_ip, local_port_file, remote_port_file,
ret_file, cipher, cipher_key, bwlimit, txqueuelen)
# upload command to host_connect.sh script
shfile = os.path.join(self.app_home(self.node), "host_connect.sh")
self.node.upload(udp_connect_command,
shfile,
text = True,
overwrite = False)
# invoke connect script
cmd = "bash %s" % shfile
(out, err), proc = self.node.run(cmd, self.run_home(self.node),
sudo = True,
stdout = "udp_stdout",
stderr = "udp_stderr")
# check if execution errors
msg = "Failed to connect endpoints"
if proc.poll():
self.error(msg, out, err)
raise RuntimeError, msg
msg = "Connection on host %s configured" \
% self.node.get("hostname")
self.debug(msg)
# Wait for pid file to be generated
self._nodes = self.get_node(endpoint)
pid, ppid = self.node.wait_pid(self.run_home(self.node))
# If the process is not running, check for error information
# on the remote machine
if not pid or not ppid:
(out, err), proc = self.node.check_errors(self.run_home(self.node))
# Out is what was written in the stderr file
if err:
msg = " Failed to start command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
return (pid, ppid)
def switch_connect(self, endpoint, rem_endpoint):
""" Get switch connect command
"""
# Get and configure switch connection command
(local_port_name, remote_ip, remote_port_num) = self.get_port_info(
endpoint, rem_endpoint)
switch_connect_command = endpoint.switch_connect_command(
local_port_name, remote_ip, remote_port_num)
self._nodes = self.get_node(endpoint)
# Upload command to the file sw_connect.sh
shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
self.node.upload(switch_connect_command,
shfile,
text = True,
overwrite = False)
#invoke connect script
cmd = "bash %s" % shfile
(out, err), proc = self.node.run(cmd, self.run_home(self.node),
sudo = True,
stdout = "sw_stdout",
stderr = "sw_stderr")
# check if execution errors occured
if proc.poll():
msg = "Failed to connect endpoints"
self.error(msg, out, err)
raise RuntimeError, msg
# For debugging
msg = "Connection on port %s configured" % local_port_name
self.info(msg)
def wait_local_port(self):
""" Waits until the if_name file for the command is generated,
and returns the if_name for the device """
local_port = None
delay = 1.0
for i in xrange(10):
(out, err), proc = self.node.check_output(self.run_home(self.node), 'local_port')
if out:
local_port = int(out)
break
else:
time.sleep(delay)
delay = delay * 1.5
else:
msg = "Couldn't retrieve local_port"
self.error(msg, out, err)
raise RuntimeError, msg
return local_port
def sw_host_connect(self, endpoint, rem_endpoint):
"""Link switch--> host
"""
# Retrieve remote port number from rem_endpoint
local_port_name = endpoint.get('port_name')
self._nodes = self.get_node(rem_endpoint)
# time.sleep(4) # Without this, sometimes I get nothing in remote_port_num
out = err= ''
remote_port_num = self.wait_local_port()
remote_ip = socket.gethostbyname(self.node.get("hostname"))
switch_connect_command = endpoint.switch_connect_command(
local_port_name, remote_ip, remote_port_num)
# Upload command to the file sw_connect.sh
self._nodes = self.get_node(endpoint)
shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
self.node.upload(switch_connect_command,
shfile,
text = True,
overwrite = False)
# Invoke connect script
cmd = "bash %s" % shfile
(out, err), proc = self.node.run(cmd, self.run_home(self.node),
sudo = True,
stdout = "sw_stdout",
stderr = "sw_stderr")
# Check if execution errors occured
if proc.poll():
msg = "Failed to connect endpoints"
self.error(msg, out, err)
raise RuntimeError, msg
# For debugging
msg = "Connection on port %s configured" % local_port_name
self.debug(msg)
def do_provision(self):
""" Provision the tunnel
"""
# Create folders
self._nodes = self.get_node(self.endpoint1)
self.node.mkdir(self.run_home(self.node))
self._nodes = self.get_node(self.endpoint2)
self.node.mkdir(self.run_home(self.node))
if self.check_endpoints:
#Invoke connect script between switches
self.switch_connect(self.endpoint1, self.endpoint2)
self.switch_connect(self.endpoint2, self.endpoint1)
else:
# Invoke connect script between switch & host
(self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
self.sw_host_connect(self.endpoint1, self.endpoint2)
super(OVSTunnel, self).do_provision()
@property
def tap(self):
rclass = ResourceFactory.get_resource_type(PlanetlabTap.get_rtype())
for guid in self.connections:
rm = self.ec.get_resource(guid)
if isinstance(rm, rclass):
return rm
@property
def ovswitch(self):
for guid in self.connections:
rm_port = self.ec.get_resource(guid)
if hasattr(rm_port, "create_port"):
rm_list = rm_port.get_connected(OVSWitch.get_rtype())
if rm_list:
return rm_list[0]
def configure(self):
if not self.check_endpoints:
self._vroute = self.ec.register_resource("PlanetlabVroute")
self.ec.set(self._vroute, "action", "add")
self.ec.set(self._vroute, "network", self.get("network"))
self.ec.register_connection(self._vroute, self.tap.guid)
# schedule deploy
self.ec.deploy(guids=[self._vroute], group = self.deployment_group)
def do_deploy(self):
if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
(not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
self.ec.schedule(reschedule_delay, self.deploy)
return
self.do_discover()
self.do_provision()
self.configure()
super(OVSTunnel, self).do_deploy()
def do_release(self):
""" Release the udp_tunnel on endpoint2.
On endpoint1 means nothing special.
"""
if not self.check_endpoints:
# Kill the TAP devices
# TODO: Make more generic Release method of PLTAP
if self._pid and self._ppid:
self._nodes = self.get_node(self.endpoint2)
(out, err), proc = self.node.kill(self._pid,
self._ppid, sudo = True)
if err or proc.poll():
# check if execution errors occurred
msg = " Failed to delete TAP device"
self.error(msg, err, err)
super(OVSTunnel, self).do_release()