X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Fplanetlab%2Fopenvswitch%2Ftunnel.py;h=682d796e64a184392240f8b21f4b2ac0522b8500;hb=4c05030074b51677a854bfd9d71c9a83de44c90c;hp=c1f81fe948d0357d45ecacd947f8188389914325;hpb=99d8b2a4431d8fafd0385e189375106d46f1abd9;p=nepi.git diff --git a/src/nepi/resources/planetlab/openvswitch/tunnel.py b/src/nepi/resources/planetlab/openvswitch/tunnel.py index c1f81fe9..682d796e 100644 --- a/src/nepi/resources/planetlab/openvswitch/tunnel.py +++ b/src/nepi/resources/planetlab/openvswitch/tunnel.py @@ -15,16 +15,20 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . # -# Author: Alina Quereilhac -# Alexandros Kouvakas +# Authors: Alina Quereilhac +# Alexandros Kouvakas +# Julien Tribino + from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, failtrap +from nepi.execution.resource import ResourceManager, ResourceFactory, clsinit_copy, \ + ResourceState from nepi.resources.linux.application import LinuxApplication from nepi.resources.planetlab.node import PlanetlabNode -from nepi.resources.planetlab.openvswitch.ovs import OVSWitch -from nepi.util.timefuncs import tnow, tdiffsec +from nepi.resources.planetlab.openvswitch.ovs import OVSSwitch +from nepi.util.timefuncs import tnow, tdiffsec +from nepi.resources.planetlab.vroute import PlanetlabVroute +from nepi.resources.planetlab.tap import PlanetlabTap import os import time @@ -51,35 +55,39 @@ class OVSTunnel(LinuxApplication): @classmethod def _register_attributes(cls): - """ Register the attributes of Connection RM + """ Register the attributes of OVSTunnel RM """ + network = Attribute("network", "IPv4 Network Address", + flags = Flags.Design) + cipher = Attribute("cipher", "Cipher to encript communication. " "One of PLAIN, AES, Blowfish, DES, DES3. ", default = None, allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"], type = Types.Enumerate, - flags = Flags.ExecReadOnly) + flags = Flags.Design) cipher_key = Attribute("cipherKey", "Specify a symmetric encryption key with which to protect " "packets across the tunnel. python-crypto must be installed " "on the system." , - flags = Flags.ExecReadOnly) + flags = Flags.Design) txqueuelen = Attribute("txQueueLen", "Specifies the interface's transmission queue length. " "Defaults to 1000. ", type = Types.Integer, - flags = Flags.ExecReadOnly) + flags = Flags.Design) bwlimit = Attribute("bwLimit", "Specifies the interface's emulated bandwidth in bytes " "per second.", type = Types.Integer, - flags = Flags.ExecReadOnly) + flags = Flags.Design) + cls._register_attribute(network) cls._register_attribute(cipher) cls._register_attribute(cipher_key) cls._register_attribute(txqueuelen) @@ -96,13 +104,14 @@ class OVSTunnel(LinuxApplication): super(OVSTunnel, self).__init__(ec, guid) self._home = "tunnel-%s" % self.guid self.port_info_tunl = [] - self._nodes = [] self._pid = None self._ppid = None + self._vroute = None + self._node_endpoint1 = None + self._node_endpoint2 = None - @property - def node(self): - return self._nodes[0] + def log_message(self, msg): + return " guid %d - Tunnel - %s " % (self.guid, msg) def app_home(self, node): return os.path.join(node.exp_home, self._home) @@ -110,296 +119,273 @@ class OVSTunnel(LinuxApplication): def run_home(self, node): return os.path.join(self.app_home(node), self.ec.run_id) - def port_endpoints(self): - # Switch-Switch connection - connected = [] + @property + def tap(self): + """ Return the Tap RM if it exists """ + rclass = ResourceFactory.get_resource_type(PlanetlabTap.get_rtype()) for guid in self.connections: rm = self.ec.get_resource(guid) - if hasattr(rm, "create_port"): - connected.append(rm) - return connected + if isinstance(rm, rclass): + return rm + + @property + def ovsswitch(self): + """ Return the 1st switch """ + for guid in self.connections: + rm_port = self.ec.get_resource(guid) + if hasattr(rm_port, "create_port"): + rm_list = rm_port.get_connected(OVSSwitch.get_rtype()) + if rm_list: + return rm_list[0] + + @property + def check_switch_host_link(self): + """ Check if the links are between switches + or switch-host. Return False for the latter. + """ + if self.tap : + return True + return False - def mixed_endpoints(self): - # Switch-Host connection - connected = [1, 2] + + def endpoints(self): + """ Return the list with the two connected elements. + Either Switch-Switch or Switch-Host + """ + connected = [1, 1] + position = 0 for guid in self.connections: rm = self.ec.get_resource(guid) if hasattr(rm, "create_port"): - connected[0] = rm - elif hasattr(rm, "udp_connect_command"): + connected[position] = rm + position += 1 + elif hasattr(rm, "udp_connect"): connected[1] = rm return connected def get_node(self, endpoint): - # Get connected to the nodes + """ Get the nodes of the endpoint + """ + rm = [] if hasattr(endpoint, "create_port"): - res = [] - rm_list = endpoint.get_connected(OVSWitch.rtype()) + rm_list = endpoint.get_connected(OVSSwitch.get_rtype()) if rm_list: - rm = rm_list[0].get_connected(PlanetlabNode.rtype()) - if rm: - res.append(rm[0]) - return res + rm = rm_list[0].get_connected(PlanetlabNode.get_rtype()) else: - res = [] - rm = endpoint.get_connected(PlanetlabNode.rtype()) - if rm : - res.append(rm[0]) - return res + rm = endpoint.get_connected(PlanetlabNode.get_rtype()) + + if rm : + return rm[0] @property def endpoint1(self): - if self.check_endpoints(): - port_endpoints = self.port_endpoints() - if port_endpoints: return port_endpoints[0] - else: - mixed_endpoints = self.mixed_endpoints() - if mixed_endpoints: return mixed_endpoints[0] + """ Return the first endpoint : Always a Switch + """ + endpoint = self.endpoints() + return endpoint[0] @property def endpoint2(self): - if self.check_endpoints(): - port_endpoints = self.port_endpoints() - if port_endpoints: return port_endpoints[1] - else: - mixed_endpoints = self.mixed_endpoints() - if mixed_endpoints: return mixed_endpoints[1] - - def check_endpoints(self): - """ Check if the links are between switches - or switch-host. Return False for latter. + """ Return the second endpoint : Either a Switch or a TAP """ - port_endpoints = self.port_endpoints() - if len(port_endpoints) == 2: - return True - else: - return False + endpoint = self.endpoints() + return endpoint[1] - def get_port_info(self, endpoint, rem_endpoint): + def get_port_info(self, endpoint1, endpoint2): + #TODO : Need to change it. Really bad to have method that return different type of things !!!!! """ Retrieve the port_info list for each port - :param port_info_tunl: [hostname, publ_IP_addr, port_name, - virtual_ip, local_port_Numb] - :type port_info_tunl: list """ - self.port_info_tunl = [] - if self.check_endpoints(): - # Use for the link switch-->switch - self.port_info_tunl.append(endpoint.port_info) - host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0] - self.port_info_tunl.append(rem_endpoint.port_info) - host1, ip1, pname1, virt_ip1, pnumber1 = self.port_info_tunl[1] - return (pname0, ip1, pnumber1) - - else: - # Use for the link host-->switch - self.port_info_tunl.append(endpoint.port_info) - host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0] + if self.check_switch_host_link : + host0, ip0, pname0, virt_ip0, pnumber0 = endpoint1.port_info return pnumber0 - - def udp_connect(self, endpoint, rem_endpoint): - # Collect info from rem_endpoint - self._nodes = self.get_node(rem_endpoint) - remote_ip = socket.gethostbyname(self.node.get("hostname")) - # Collect info from endpoint - self._nodes = self.get_node(endpoint) - local_port_file = os.path.join(self.run_home(self.node), - "local_port") - remote_port_file = os.path.join(self.run_home(self.node), - "remote_port") - ret_file = os.path.join(self.run_home(self.node), - "ret_file") - cipher = self.get("cipher") - cipher_key = self.get("cipherKey") - bwlimit = self.get("bwLimit") - txqueuelen = self.get("txQueueLen") - - rem_port = str(self.get_port_info(rem_endpoint, endpoint)) - # Upload the remote port in a file - self.node.upload(rem_port, - remote_port_file, - text = True, - overwrite = False) - - udp_connect_command = endpoint.udp_connect_command( - remote_ip, local_port_file, remote_port_file, - ret_file, cipher, cipher_key, bwlimit, txqueuelen) - - # upload command to host_connect.sh script - shfile = os.path.join(self.app_home(self.node), "host_connect.sh") - self.node.upload(udp_connect_command, - shfile, - text = True, - overwrite = False) - - # invoke connect script - cmd = "bash %s" % shfile - (out, err), proc = self.node.run(cmd, self.run_home(self.node), - sudo = True, - stdout = "udp_stdout", - stderr = "udp_stderr") - # check if execution errors - msg = "Failed to connect endpoints" + host0, ip0, pname0, virt_ip0, pnumber0 = endpoint1.port_info + host1, ip1, pname1, virt_ip1, pnumber1 = endpoint2.port_info - if proc.poll(): - self.fail() + return pname0, ip1, pnumber1 + + def wait_local_port(self, node_endpoint): + """ Waits until the if_name file for the command is generated, + and returns the if_name for the device """ + + local_port = None + delay = 1.0 + + #TODO : Need to change it with reschedule to avoid the problem + # of the order of connection + for i in xrange(10): + (out, err), proc = node_endpoint.check_output(self.run_home(node_endpoint), 'local_port') + if out: + local_port = int(out) + break + else: + time.sleep(delay) + delay = delay * 1.5 + else: + msg = "Couldn't retrieve local_port" self.error(msg, out, err) raise RuntimeError, msg - msg = "Connection on host %s configured" \ - % self.node.get("hostname") - self.info(msg) - - # Wait for pid file to be generated - self._nodes = self.get_node(endpoint) - pid, ppid = self.node.wait_pid(self.run_home(self.node)) - - # If the process is not running, check for error information - # on the remote machine - if not pid or not ppid: - (out, err), proc = self.node.check_errors(self.run_home(self.node)) - # Out is what was written in the stderr file - if err: - self.fail() - msg = " Failed to start command '%s' " % command - self.error(msg, out, err) - raise RuntimeError, msg - - return (pid, ppid) - - def switch_connect(self, endpoint, rem_endpoint): - """ Get switch connect command + return local_port + + def connection(self, local_endpoint, rm_endpoint): + """ Create the connect command for each case : + - Host - Switch, + - Switch - Switch, + - Switch - Host """ - # Get and configure switch connection command - (local_port_name, remote_ip, remote_port_num) = self.get_port_info( - endpoint, rem_endpoint) - switch_connect_command = endpoint.switch_connect_command( - local_port_name, remote_ip, remote_port_num) - self._nodes = self.get_node(endpoint) - - # Upload command to the file sw_connect.sh - shfile = os.path.join(self.app_home(self.node), "sw_connect.sh") - self.node.upload(switch_connect_command, - shfile, - text = True, - overwrite = False) + local_node = self.get_node(local_endpoint) + local_node.mkdir(self.run_home(local_node)) - #invoke connect script - cmd = "bash %s" % shfile - (out, err), proc = self.node.run(cmd, self.run_home(self.node), - sudo = True, - stdout = "sw_stdout", - stderr = "sw_stderr") - - # check if execution errors occured - msg = "Failed to connect endpoints" + rm_node = self.get_node(rm_endpoint) + rm_node.mkdir(self.run_home(rm_node)) - if proc.poll(): - self.fail() - self.error(msg, out, err) - raise RuntimeError, msg - else: - msg = "Connection on port %s configured" % local_port_name - self.info(msg) - return + # Host to switch + if self.check_switch_host_link and local_endpoint == self.endpoint2 : + # Collect info from rem_endpoint + remote_ip = socket.gethostbyname(rm_node.get("hostname")) - def sw_host_connect(self, endpoint, rem_endpoint): - """Link switch--> host + # Collect info from endpoint + connection_run_home = self.run_home(local_node) + connection_app_home = self.app_home(local_node) + cipher = self.get("cipher") + cipher_key = self.get("cipherKey") + bwlimit = self.get("bwLimit") + txqueuelen = self.get("txQueueLen") + + + # Upload the remote port in a file + rem_port = str(self.get_port_info(rm_endpoint,local_endpoint)) + rem_port_file = os.path.join(self.run_home(local_node), "remote_port") + local_node.upload(rem_port, rem_port_file, + text = True, + overwrite = False) + + self._pid, self._ppid = local_endpoint.udp_connect( + rm_node, connection_run_home, connection_app_home, + cipher, cipher_key, bwlimit, txqueuelen) + + +# connect_command = local_endpoint.udp_connect_command( +# remote_ip, local_port_file, rem_port_file, +# ret_file, cipher, cipher_key, bwlimit, txqueuelen) + +# self.connection_command(connect_command, local_node, rm_node) + +# # Wait for pid file to be generated +# self._pid, self._ppid = local_node.wait_pid(self.run_home(local_node)) + + if not self._pid or not self._ppid: + (out, err), proc = local_node.check_errors(self.run_home(local_node)) + # Out is what was written in the stderr file + if err: + msg = " Failed to start connection of the OVS Tunnel " + self.error(msg, out, err) + raise RuntimeError, msg + return + + # Switch to Host + if self.check_switch_host_link and local_endpoint == self.endpoint1: + local_port_name = local_endpoint.get('port_name') + remote_port_num = self.wait_local_port(rm_node) + remote_ip = socket.gethostbyname(rm_node.get("hostname")) + + # Switch to Switch + if not self.check_switch_host_link : + local_port_name, remote_ip, remote_port_num = self.get_port_info(local_endpoint, rm_endpoint) + + connect_command = local_endpoint.switch_connect_command( + local_port_name, remote_ip, remote_port_num) + + self.connection_command(connect_command, local_node, rm_node) + + def connection_command(self, command, node_endpoint, rm_node_endpoint): + """ Execute the connection command on the node and check if the processus is + correctly running on the node. """ - # Retrieve remote port number from rem_endpoint - local_port_name = endpoint.get('port_name') - self._nodes = self.get_node(rem_endpoint) - time.sleep(2) # Without this, sometimes I get nothing in remote_port_num - remote_port_num = '' - (out, err), proc = self.node.check_output(self.run_home(self.node), 'local_port') - remote_port_num = int(out) - remote_ip = socket.gethostbyname(self.node.get("hostname")) - switch_connect_command = endpoint.switch_connect_command( - local_port_name, remote_ip, remote_port_num) - - # Upload command to the file sw_connect.sh - self._nodes = self.get_node(endpoint) - shfile = os.path.join(self.app_home(self.node), "sw_connect.sh") - self.node.upload(switch_connect_command, + shfile = os.path.join(self.app_home(node_endpoint), "sw_connect.sh") + node_endpoint.upload(command, shfile, text = True, overwrite = False) - #invoke connect script + # Invoke connect script + out = err= '' cmd = "bash %s" % shfile - (out, err), proc = self.node.run(cmd, self.run_home(self.node), + (out, err), proc = node_endpoint.run(cmd, self.run_home(node_endpoint), sudo = True, stdout = "sw_stdout", stderr = "sw_stderr") - # check if execution errors occured - msg = "Failed to connect endpoints" + # Check if execution errors occured if proc.poll(): - self.fail() + msg = "Failed to connect endpoints" self.error(msg, out, err) raise RuntimeError, msg - else: - msg = "Connection on port %s configured" % local_port_name - self.info(msg) - return - @failtrap - def provision(self): + # For debugging + msg = "Connection on port configured" + self.debug(msg) + + def do_provision(self): """ Provision the tunnel """ - # Create folders - self._nodes = self.get_node(self.endpoint1) - self.node.mkdir(self.run_home(self.node)) - self._nodes = self.get_node(self.endpoint2) - self.node.mkdir(self.run_home(self.node)) - - if self.check_endpoints(): - #Invoke connect script between switches - switch_connect1 = self.switch_connect(self.endpoint1, self.endpoint2) - switch_connect2 = self.switch_connect(self.endpoint2, self.endpoint1) - - else: - # Invoke connect script between switch & host - (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1) - switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2) - - super(OVSTunnel, self).provision() - - @failtrap - def deploy(self): + + #TODO : The order of the connection is important for now ! + # Need to change the code of wait local port + self.connection(self.endpoint2, self.endpoint1) + self.connection(self.endpoint1, self.endpoint2) + + def configure_route(self): + """ Configure the route for the tap device + + .. note : In case of a conection between a switch and a host, a route + was missing on the node with the Tap Device. This method create + the missing route. + """ + + if self.check_switch_host_link: + self._vroute = self.ec.register_resource("PlanetlabVroute") + self.ec.set(self._vroute, "action", "add") + self.ec.set(self._vroute, "network", self.get("network")) + + self.ec.register_connection(self._vroute, self.tap.guid) + self.ec.deploy(guids=[self._vroute], group = self.deployment_group) + + def do_deploy(self): + """ Deploy the tunnel after the endpoint get ready + """ if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \ (not self.endpoint2 or self.endpoint2.state < ResourceState.READY): self.ec.schedule(reschedule_delay, self.deploy) - else: - self.discover() - self.provision() - - super(OVSTunnel, self).deploy() + return + + self.do_discover() + self.do_provision() + self.configure_route() + + # Cannot call the deploy of the linux application + # because of a log error. + # Need to investigate if it is right that the tunnel + # inherits from the linux application + # super(OVSTunnel, self).do_deploy() + self.set_ready() - def release(self): - """ Release the udp_tunnel on endpoint2. - On endpoint1 means nothing special. + def do_release(self): + """ Release the tunnel by releasing the Tap Device if exists """ - try: - if not self.check_endpoints(): - # Kill the TAP devices - # TODO: Make more generic Release method of PLTAP - if self._pid and self._ppid: - self._nodes = self.get_node(self.endpoint2) - (out, err), proc = self.node.kill(self._pid, - self._ppid, sudo = True) - if err or proc.poll(): - # check if execution errors occurred - msg = " Failed to delete TAP device" - self.error(msg, err, err) - self.fail() - except: - import traceback - err = traceback.format_exc() - self.error(err) + if self.check_switch_host_link: + # TODO: Make more generic Release method of PLTAP + tap_node = self.get_node(self.endpoint2) + if self._pid and self._ppid: + (out, err), proc = tap_node.kill(self._pid, + self._ppid, sudo = True) - super(OVSTunnel, self).release() + if err or proc.poll(): + msg = " Failed to delete TAP device" + self.error(msg, out, err) + super(OVSTunnel, self).do_release()