#
# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
# Author: Alina Quereilhac
# Alexandros Kouvakas
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
ResourceState, failtrap
from nepi.resources.linux.application import LinuxApplication
from nepi.resources.planetlab.node import PlanetlabNode
from nepi.resources.planetlab.openvswitch.ovs import OVSWitch
from nepi.util.timefuncs import tnow, tdiffsec
import os
import time
import socket
reschedule_delay = "0.5s"
@clsinit_copy
class OVSTunnel(LinuxApplication):
"""
.. class:: Class Args :
:param ec: The Experiment controller
:type ec: ExperimentController
:param guid: guid of the RM
:type guid: int
:param creds: Credentials to communicate with the rm
:type creds: dict
"""
_rtype = "OVSTunnel"
_authorized_connections = ["OVSPort", "PlanetlabTap"]
@classmethod
def _register_attributes(cls):
""" Register the attributes of Connection RM
"""
cipher = Attribute("cipher",
"Cipher to encript communication. "
"One of PLAIN, AES, Blowfish, DES, DES3. ",
default = None,
allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
type = Types.Enumerate,
flags = Flags.ExecReadOnly)
cipher_key = Attribute("cipherKey",
"Specify a symmetric encryption key with which to protect "
"packets across the tunnel. python-crypto must be installed "
"on the system." ,
flags = Flags.ExecReadOnly)
txqueuelen = Attribute("txQueueLen",
"Specifies the interface's transmission queue length. "
"Defaults to 1000. ",
type = Types.Integer,
flags = Flags.ExecReadOnly)
bwlimit = Attribute("bwLimit",
"Specifies the interface's emulated bandwidth in bytes "
"per second.",
type = Types.Integer,
flags = Flags.ExecReadOnly)
cls._register_attribute(cipher)
cls._register_attribute(cipher_key)
cls._register_attribute(txqueuelen)
cls._register_attribute(bwlimit)
def __init__(self, ec, guid):
"""
:param ec: The Experiment controller
:type ec: ExperimentController
:param guid: guid of the RM
:type guid: int
"""
super(OVSTunnel, self).__init__(ec, guid)
self._home = "tunnel-%s" % self.guid
self.port_info_tunl = []
self._nodes = []
self._pid = None
self._ppid = None
@property
def node(self):
return self._nodes[0]
def app_home(self, node):
return os.path.join(node.exp_home, self._home)
def run_home(self, node):
return os.path.join(self.app_home(node), self.ec.run_id)
def port_endpoints(self):
# Switch-Switch connection
connected = []
for guid in self.connections:
rm = self.ec.get_resource(guid)
if hasattr(rm, "create_port"):
connected.append(rm)
return connected
def mixed_endpoints(self):
# Switch-Host connection
connected = [1, 2]
for guid in self.connections:
rm = self.ec.get_resource(guid)
if hasattr(rm, "create_port"):
connected[0] = rm
elif hasattr(rm, "udp_connect_command"):
connected[1] = rm
return connected
def get_node(self, endpoint):
# Get connected to the nodes
if hasattr(endpoint, "create_port"):
res = []
rm_list = endpoint.get_connected(OVSWitch.rtype())
if rm_list:
rm = rm_list[0].get_connected(PlanetlabNode.rtype())
if rm:
res.append(rm[0])
return res
else:
res = []
rm = endpoint.get_connected(PlanetlabNode.rtype())
if rm :
res.append(rm[0])
return res
@property
def endpoint1(self):
if self.check_endpoints():
port_endpoints = self.port_endpoints()
if port_endpoints: return port_endpoints[0]
else:
mixed_endpoints = self.mixed_endpoints()
if mixed_endpoints: return mixed_endpoints[0]
@property
def endpoint2(self):
if self.check_endpoints():
port_endpoints = self.port_endpoints()
if port_endpoints: return port_endpoints[1]
else:
mixed_endpoints = self.mixed_endpoints()
if mixed_endpoints: return mixed_endpoints[1]
def check_endpoints(self):
""" Check if the links are between switches
or switch-host. Return False for latter.
"""
port_endpoints = self.port_endpoints()
if len(port_endpoints) == 2:
return True
else:
return False
def get_port_info(self, endpoint, rem_endpoint):
""" Retrieve the port_info list for each port
:param port_info_tunl: [hostname, publ_IP_addr, port_name,
virtual_ip, local_port_Numb]
:type port_info_tunl: list
"""
self.port_info_tunl = []
if self.check_endpoints():
# Use for the link switch-->switch
self.port_info_tunl.append(endpoint.port_info)
host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
self.port_info_tunl.append(rem_endpoint.port_info)
host1, ip1, pname1, virt_ip1, pnumber1 = self.port_info_tunl[1]
return (pname0, ip1, pnumber1)
else:
# Use for the link host-->switch
self.port_info_tunl.append(endpoint.port_info)
host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
return pnumber0
def udp_connect(self, endpoint, rem_endpoint):
# Collect info from rem_endpoint
self._nodes = self.get_node(rem_endpoint)
remote_ip = socket.gethostbyname(self.node.get("hostname"))
# Collect info from endpoint
self._nodes = self.get_node(endpoint)
local_port_file = os.path.join(self.run_home(self.node),
"local_port")
remote_port_file = os.path.join(self.run_home(self.node),
"remote_port")
ret_file = os.path.join(self.run_home(self.node),
"ret_file")
cipher = self.get("cipher")
cipher_key = self.get("cipherKey")
bwlimit = self.get("bwLimit")
txqueuelen = self.get("txQueueLen")
rem_port = str(self.get_port_info(rem_endpoint, endpoint))
# Upload the remote port in a file
self.node.upload(rem_port,
remote_port_file,
text = True,
overwrite = False)
udp_connect_command = endpoint.udp_connect_command(
remote_ip, local_port_file, remote_port_file,
ret_file, cipher, cipher_key, bwlimit, txqueuelen)
# upload command to host_connect.sh script
shfile = os.path.join(self.app_home(self.node), "host_connect.sh")
self.node.upload(udp_connect_command,
shfile,
text = True,
overwrite = False)
# invoke connect script
cmd = "bash %s" % shfile
(out, err), proc = self.node.run(cmd, self.run_home(self.node),
sudo = True,
stdout = "udp_stdout",
stderr = "udp_stderr")
# check if execution errors
msg = "Failed to connect endpoints"
if proc.poll():
self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
msg = "Connection on host %s configured" \
% self.node.get("hostname")
self.info(msg)
# Wait for pid file to be generated
self._nodes = self.get_node(endpoint)
pid, ppid = self.node.wait_pid(self.run_home(self.node))
# If the process is not running, check for error information
# on the remote machine
if not pid or not ppid:
(out, err), proc = self.node.check_errors(self.run_home(self.node))
# Out is what was written in the stderr file
if err:
self.fail()
msg = " Failed to start command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
return (pid, ppid)
def switch_connect(self, endpoint, rem_endpoint):
""" Get switch connect command
"""
# Get and configure switch connection command
(local_port_name, remote_ip, remote_port_num) = self.get_port_info(
endpoint, rem_endpoint)
switch_connect_command = endpoint.switch_connect_command(
local_port_name, remote_ip, remote_port_num)
self._nodes = self.get_node(endpoint)
# Upload command to the file sw_connect.sh
shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
self.node.upload(switch_connect_command,
shfile,
text = True,
overwrite = False)
#invoke connect script
cmd = "bash %s" % shfile
(out, err), proc = self.node.run(cmd, self.run_home(self.node),
sudo = True,
stdout = "sw_stdout",
stderr = "sw_stderr")
# check if execution errors occured
msg = "Failed to connect endpoints"
if proc.poll():
self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
else:
msg = "Connection on port %s configured" % local_port_name
self.info(msg)
return
def sw_host_connect(self, endpoint, rem_endpoint):
"""Link switch--> host
"""
# Retrieve remote port number from rem_endpoint
local_port_name = endpoint.get('port_name')
self._nodes = self.get_node(rem_endpoint)
time.sleep(2) # Without this, sometimes I get nothing in remote_port_num
remote_port_num = ''
(out, err), proc = self.node.check_output(self.run_home(self.node), 'local_port')
remote_port_num = int(out)
remote_ip = socket.gethostbyname(self.node.get("hostname"))
switch_connect_command = endpoint.switch_connect_command(
local_port_name, remote_ip, remote_port_num)
# Upload command to the file sw_connect.sh
self._nodes = self.get_node(endpoint)
shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
self.node.upload(switch_connect_command,
shfile,
text = True,
overwrite = False)
#invoke connect script
cmd = "bash %s" % shfile
(out, err), proc = self.node.run(cmd, self.run_home(self.node),
sudo = True,
stdout = "sw_stdout",
stderr = "sw_stderr")
# check if execution errors occured
msg = "Failed to connect endpoints"
if proc.poll():
self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
else:
msg = "Connection on port %s configured" % local_port_name
self.info(msg)
return
@failtrap
def provision(self):
""" Provision the tunnel
"""
# Create folders
self._nodes = self.get_node(self.endpoint1)
self.node.mkdir(self.run_home(self.node))
self._nodes = self.get_node(self.endpoint2)
self.node.mkdir(self.run_home(self.node))
if self.check_endpoints():
#Invoke connect script between switches
switch_connect1 = self.switch_connect(self.endpoint1, self.endpoint2)
switch_connect2 = self.switch_connect(self.endpoint2, self.endpoint1)
else:
# Invoke connect script between switch & host
(self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2)
super(OVSTunnel, self).provision()
@failtrap
def deploy(self):
if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
(not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
self.ec.schedule(reschedule_delay, self.deploy)
else:
self.discover()
self.provision()
super(OVSTunnel, self).deploy()
def release(self):
""" Release the udp_tunnel on endpoint2.
On endpoint1 means nothing special.
"""
try:
if not self.check_endpoints():
# Kill the TAP devices
# TODO: Make more generic Release method of PLTAP
if self._pid and self._ppid:
self._nodes = self.get_node(self.endpoint2)
(out, err), proc = self.node.kill(self._pid,
self._ppid, sudo = True)
if err or proc.poll():
# check if execution errors occurred
msg = " Failed to delete TAP device"
self.error(msg, err, err)
self.fail()
except:
import traceback
err = traceback.format_exc()
self.error(err)
super(OVSTunnel, self).release()