From 352d4735755a77d58af16690e1c1af6c7447789a Mon Sep 17 00:00:00 2001 From: Scott Baker Date: Thu, 14 Aug 2014 16:10:59 -0700 Subject: [PATCH] nat support for neutron --- .../1:2013.2.2-0ubuntu1~cloud0/nat.py | 50 ++ .../1:2013.2.2-0ubuntu1~cloud0/ovs_db_v2.py | 425 +++++++++++ .../ovs_models_v2.py | 118 +++ .../ovs_neutron_plugin.py | 697 ++++++++++++++++++ 4 files changed, 1290 insertions(+) create mode 100644 planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/nat.py create mode 100644 planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_db_v2.py create mode 100644 planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_models_v2.py create mode 100644 planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_neutron_plugin.py diff --git a/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/nat.py b/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/nat.py new file mode 100644 index 0000000..04e39f1 --- /dev/null +++ b/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/nat.py @@ -0,0 +1,50 @@ +from neutron.api.v2 import attributes + +FORWARD_PORTS = 'nat:forward_ports' + +EXTENDED_ATTRIBUTES_2_0 = { + 'ports': { + FORWARD_PORTS: {'allow_post': True, 'allow_put': True, + 'default': attributes.ATTR_NOT_SPECIFIED, + 'is_visible': True}, + } +} + + +class Nat(object): + """Extension class supporting OpenCloud NAT networking + + This class is used by Quantum's extension framework to make + metadata about the OpenCloud Port extension available to + clients. No new resources are defined by this extension. Instead, + the existing Port resource's request and response messages are + extended with attributes in the OpenCloud namespace. + """ + + @classmethod + def get_name(cls): + return "OpenCloud NAT Networking Extension" + + @classmethod + def get_alias(cls): + return "nat" + + @classmethod + def get_description(cls): + return "Add TCP/UDP port forwarding through NAT to Quantum Port objects" + + @classmethod + def get_namespace(cls): + # return "http://docs.openstack.org/ext/provider/api/v1.0" + # Nothing there right now + return "http://www.vicci.org/ext/opencloud/nat/api/v0.1" + + @classmethod + def get_updated(cls): + return "2013-09-12T10:00:00-00:00" + + def get_extended_resources(self, version): + if version == "2.0": + return EXTENDED_ATTRIBUTES_2_0 + else: + return {} diff --git a/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_db_v2.py b/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_db_v2.py new file mode 100644 index 0000000..39cf315 --- /dev/null +++ b/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_db_v2.py @@ -0,0 +1,425 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright 2011 Nicira Networks, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# @author: Aaron Rosen, Nicira Networks, Inc. +# @author: Bob Kukura, Red Hat, Inc. + +from sqlalchemy import func +from sqlalchemy.orm import exc + +from neutron.common import exceptions as q_exc +import neutron.db.api as db +from neutron.db import models_v2 +from neutron.db import securitygroups_db as sg_db +from neutron.extensions import securitygroup as ext_sg +from neutron import manager +from neutron.openstack.common.db import exception as db_exc +from neutron.openstack.common import log as logging +from neutron.plugins.openvswitch.common import constants +from neutron.plugins.openvswitch import ovs_models_v2 + +LOG = logging.getLogger(__name__) + + +def initialize(): + db.configure_db() + + +def get_network_binding(session, network_id): + session = session or db.get_session() + try: + binding = (session.query(ovs_models_v2.NetworkBinding). + filter_by(network_id=network_id). + one()) + return binding + except exc.NoResultFound: + return + + +def add_network_binding(session, network_id, network_type, + physical_network, segmentation_id): + with session.begin(subtransactions=True): + binding = ovs_models_v2.NetworkBinding(network_id, network_type, + physical_network, + segmentation_id) + session.add(binding) + +def get_port_forwarding(session, port_id): + session = session or db.get_session() + try: + forward = (session.query(ovs_models_v2.PortForwarding). + filter_by(port_id=port_id).one()) + return forward['forward_ports'] + except exc.NoResultFound: + return + +def clear_port_forwarding(session, port_id): + with session.begin(subtransactions=True): + try: + # Get rid of old port bindings + forward = (session.query(ovs_models_v2.PortForwarding). + filter_by(port_id=port_id).one()) + if forward: + session.delete(forward) + except exc.NoResultFound: + pass + +def add_port_forwarding(session, port_id, forward_ports): + with session.begin(subtransactions=True): + forward = ovs_models_v2.PortForwarding(port_id, forward_ports) + session.add(forward) + +def sync_vlan_allocations(network_vlan_ranges): + """Synchronize vlan_allocations table with configured VLAN ranges.""" + + session = db.get_session() + with session.begin(): + # get existing allocations for all physical networks + allocations = dict() + allocs = (session.query(ovs_models_v2.VlanAllocation). + all()) + for alloc in allocs: + if alloc.physical_network not in allocations: + allocations[alloc.physical_network] = set() + allocations[alloc.physical_network].add(alloc) + + # process vlan ranges for each configured physical network + for physical_network, vlan_ranges in network_vlan_ranges.iteritems(): + # determine current configured allocatable vlans for this + # physical network + vlan_ids = set() + for vlan_range in vlan_ranges: + vlan_ids |= set(xrange(vlan_range[0], vlan_range[1] + 1)) + + # remove from table unallocated vlans not currently allocatable + if physical_network in allocations: + for alloc in allocations[physical_network]: + try: + # see if vlan is allocatable + vlan_ids.remove(alloc.vlan_id) + except KeyError: + # it's not allocatable, so check if its allocated + if not alloc.allocated: + # it's not, so remove it from table + LOG.debug(_("Removing vlan %(vlan_id)s on " + "physical network " + "%(physical_network)s from pool"), + {'vlan_id': alloc.vlan_id, + 'physical_network': physical_network}) + session.delete(alloc) + del allocations[physical_network] + + # add missing allocatable vlans to table + for vlan_id in sorted(vlan_ids): + alloc = ovs_models_v2.VlanAllocation(physical_network, vlan_id) + session.add(alloc) + + # remove from table unallocated vlans for any unconfigured physical + # networks + for allocs in allocations.itervalues(): + for alloc in allocs: + if not alloc.allocated: + LOG.debug(_("Removing vlan %(vlan_id)s on physical " + "network %(physical_network)s from pool"), + {'vlan_id': alloc.vlan_id, + 'physical_network': alloc.physical_network}) + session.delete(alloc) + + +def get_vlan_allocation(physical_network, vlan_id): + session = db.get_session() + try: + alloc = (session.query(ovs_models_v2.VlanAllocation). + filter_by(physical_network=physical_network, + vlan_id=vlan_id). + one()) + return alloc + except exc.NoResultFound: + return + + +def reserve_vlan(session): + with session.begin(subtransactions=True): + alloc = (session.query(ovs_models_v2.VlanAllocation). + filter_by(allocated=False). + with_lockmode('update'). + first()) + if alloc: + LOG.debug(_("Reserving vlan %(vlan_id)s on physical network " + "%(physical_network)s from pool"), + {'vlan_id': alloc.vlan_id, + 'physical_network': alloc.physical_network}) + alloc.allocated = True + return (alloc.physical_network, alloc.vlan_id) + raise q_exc.NoNetworkAvailable() + + +def reserve_specific_vlan(session, physical_network, vlan_id): + with session.begin(subtransactions=True): + try: + alloc = (session.query(ovs_models_v2.VlanAllocation). + filter_by(physical_network=physical_network, + vlan_id=vlan_id). + with_lockmode('update'). + one()) + if alloc.allocated: + if vlan_id == constants.FLAT_VLAN_ID: + raise q_exc.FlatNetworkInUse( + physical_network=physical_network) + else: + raise q_exc.VlanIdInUse(vlan_id=vlan_id, + physical_network=physical_network) + LOG.debug(_("Reserving specific vlan %(vlan_id)s on physical " + "network %(physical_network)s from pool"), + {'vlan_id': vlan_id, + 'physical_network': physical_network}) + alloc.allocated = True + except exc.NoResultFound: + LOG.debug(_("Reserving specific vlan %(vlan_id)s on physical " + "network %(physical_network)s outside pool"), + {'vlan_id': vlan_id, + 'physical_network': physical_network}) + alloc = ovs_models_v2.VlanAllocation(physical_network, vlan_id) + alloc.allocated = True + session.add(alloc) + + +def release_vlan(session, physical_network, vlan_id, network_vlan_ranges): + with session.begin(subtransactions=True): + try: + alloc = (session.query(ovs_models_v2.VlanAllocation). + filter_by(physical_network=physical_network, + vlan_id=vlan_id). + with_lockmode('update'). + one()) + alloc.allocated = False + inside = False + for vlan_range in network_vlan_ranges.get(physical_network, []): + if vlan_id >= vlan_range[0] and vlan_id <= vlan_range[1]: + inside = True + break + if not inside: + session.delete(alloc) + LOG.debug(_("Releasing vlan %(vlan_id)s on physical network " + "%(physical_network)s outside pool"), + {'vlan_id': vlan_id, + 'physical_network': physical_network}) + else: + LOG.debug(_("Releasing vlan %(vlan_id)s on physical network " + "%(physical_network)s to pool"), + {'vlan_id': vlan_id, + 'physical_network': physical_network}) + except exc.NoResultFound: + LOG.warning(_("vlan_id %(vlan_id)s on physical network " + "%(physical_network)s not found"), + {'vlan_id': vlan_id, + 'physical_network': physical_network}) + + +def sync_tunnel_allocations(tunnel_id_ranges): + """Synchronize tunnel_allocations table with configured tunnel ranges.""" + + # determine current configured allocatable tunnels + tunnel_ids = set() + for tunnel_id_range in tunnel_id_ranges: + tun_min, tun_max = tunnel_id_range + if tun_max + 1 - tun_min > 1000000: + LOG.error(_("Skipping unreasonable tunnel ID range " + "%(tun_min)s:%(tun_max)s"), + {'tun_min': tun_min, 'tun_max': tun_max}) + else: + tunnel_ids |= set(xrange(tun_min, tun_max + 1)) + + session = db.get_session() + with session.begin(): + # remove from table unallocated tunnels not currently allocatable + allocs = (session.query(ovs_models_v2.TunnelAllocation). + all()) + for alloc in allocs: + try: + # see if tunnel is allocatable + tunnel_ids.remove(alloc.tunnel_id) + except KeyError: + # it's not allocatable, so check if its allocated + if not alloc.allocated: + # it's not, so remove it from table + LOG.debug(_("Removing tunnel %s from pool"), + alloc.tunnel_id) + session.delete(alloc) + + # add missing allocatable tunnels to table + for tunnel_id in sorted(tunnel_ids): + alloc = ovs_models_v2.TunnelAllocation(tunnel_id) + session.add(alloc) + + +def get_tunnel_allocation(tunnel_id): + session = db.get_session() + try: + alloc = (session.query(ovs_models_v2.TunnelAllocation). + filter_by(tunnel_id=tunnel_id). + with_lockmode('update'). + one()) + return alloc + except exc.NoResultFound: + return + + +def reserve_tunnel(session): + with session.begin(subtransactions=True): + alloc = (session.query(ovs_models_v2.TunnelAllocation). + filter_by(allocated=False). + with_lockmode('update'). + first()) + if alloc: + LOG.debug(_("Reserving tunnel %s from pool"), alloc.tunnel_id) + alloc.allocated = True + return alloc.tunnel_id + raise q_exc.NoNetworkAvailable() + + +def reserve_specific_tunnel(session, tunnel_id): + with session.begin(subtransactions=True): + try: + alloc = (session.query(ovs_models_v2.TunnelAllocation). + filter_by(tunnel_id=tunnel_id). + with_lockmode('update'). + one()) + if alloc.allocated: + raise q_exc.TunnelIdInUse(tunnel_id=tunnel_id) + LOG.debug(_("Reserving specific tunnel %s from pool"), tunnel_id) + alloc.allocated = True + except exc.NoResultFound: + LOG.debug(_("Reserving specific tunnel %s outside pool"), + tunnel_id) + alloc = ovs_models_v2.TunnelAllocation(tunnel_id) + alloc.allocated = True + session.add(alloc) + + +def release_tunnel(session, tunnel_id, tunnel_id_ranges): + with session.begin(subtransactions=True): + try: + alloc = (session.query(ovs_models_v2.TunnelAllocation). + filter_by(tunnel_id=tunnel_id). + with_lockmode('update'). + one()) + alloc.allocated = False + inside = False + for tunnel_id_range in tunnel_id_ranges: + if (tunnel_id >= tunnel_id_range[0] + and tunnel_id <= tunnel_id_range[1]): + inside = True + break + if not inside: + session.delete(alloc) + LOG.debug(_("Releasing tunnel %s outside pool"), tunnel_id) + else: + LOG.debug(_("Releasing tunnel %s to pool"), tunnel_id) + except exc.NoResultFound: + LOG.warning(_("tunnel_id %s not found"), tunnel_id) + + +def get_port(port_id): + session = db.get_session() + try: + port = session.query(models_v2.Port).filter_by(id=port_id).one() + except exc.NoResultFound: + port = None + return port + + +def get_port_from_device(port_id): + """Get port from database.""" + LOG.debug(_("get_port_with_securitygroups() called:port_id=%s"), port_id) + session = db.get_session() + sg_binding_port = sg_db.SecurityGroupPortBinding.port_id + + query = session.query(models_v2.Port, + sg_db.SecurityGroupPortBinding.security_group_id) + query = query.outerjoin(sg_db.SecurityGroupPortBinding, + models_v2.Port.id == sg_binding_port) + query = query.filter(models_v2.Port.id == port_id) + port_and_sgs = query.all() + if not port_and_sgs: + return None + port = port_and_sgs[0][0] + plugin = manager.NeutronManager.get_plugin() + port_dict = plugin._make_port_dict(port) + port_dict[ext_sg.SECURITYGROUPS] = [ + sg_id for port_, sg_id in port_and_sgs if sg_id] + port_dict['security_group_rules'] = [] + port_dict['security_group_source_groups'] = [] + port_dict['fixed_ips'] = [ip['ip_address'] + for ip in port['fixed_ips']] + return port_dict + + +def set_port_status(port_id, status): + session = db.get_session() + try: + port = session.query(models_v2.Port).filter_by(id=port_id).one() + port['status'] = status + session.merge(port) + session.flush() + except exc.NoResultFound: + raise q_exc.PortNotFound(port_id=port_id) + + +def get_tunnel_endpoints(): + session = db.get_session() + + tunnels = session.query(ovs_models_v2.TunnelEndpoint) + return [{'id': tunnel.id, + 'ip_address': tunnel.ip_address} for tunnel in tunnels] + + +def _generate_tunnel_id(session): + max_tunnel_id = session.query( + func.max(ovs_models_v2.TunnelEndpoint.id)).scalar() or 0 + return max_tunnel_id + 1 + + +def add_tunnel_endpoint(ip, max_retries=10): + """Return the endpoint of the given IP address or generate a new one.""" + + # NOTE(rpodolyaka): generation of a new tunnel endpoint must be put into a + # repeatedly executed transactional block to ensure it + # doesn't conflict with any other concurrently executed + # DB transactions in spite of the specified transactions + # isolation level value + for i in xrange(max_retries): + LOG.debug(_('Adding a tunnel endpoint for %s'), ip) + try: + session = db.get_session() + with session.begin(subtransactions=True): + tunnel = (session.query(ovs_models_v2.TunnelEndpoint). + filter_by(ip_address=ip).with_lockmode('update'). + first()) + + if tunnel is None: + tunnel_id = _generate_tunnel_id(session) + tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id) + session.add(tunnel) + + return tunnel + except db_exc.DBDuplicateEntry: + # a concurrent transaction has been commited, try again + LOG.debug(_('Adding a tunnel endpoint failed due to a concurrent' + 'transaction had been commited (%s attempts left)'), + max_retries - (i + 1)) + + raise q_exc.NeutronException( + message=_('Unable to generate a new tunnel id')) diff --git a/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_models_v2.py b/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_models_v2.py new file mode 100644 index 0000000..7e022f5 --- /dev/null +++ b/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_models_v2.py @@ -0,0 +1,118 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright 2011 Nicira Networks, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# @author: Aaron Rosen, Nicira Networks, Inc. +# @author: Bob Kukura, Red Hat, Inc. + + +from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, PickleType +from sqlalchemy.schema import UniqueConstraint + +from neutron.db.models_v2 import model_base + + +class VlanAllocation(model_base.BASEV2): + """Represents allocation state of vlan_id on physical network.""" + __tablename__ = 'ovs_vlan_allocations' + + physical_network = Column(String(64), nullable=False, primary_key=True) + vlan_id = Column(Integer, nullable=False, primary_key=True, + autoincrement=False) + allocated = Column(Boolean, nullable=False) + + def __init__(self, physical_network, vlan_id): + self.physical_network = physical_network + self.vlan_id = vlan_id + self.allocated = False + + def __repr__(self): + return "" % (self.physical_network, + self.vlan_id, self.allocated) + + +class TunnelAllocation(model_base.BASEV2): + """Represents allocation state of tunnel_id.""" + __tablename__ = 'ovs_tunnel_allocations' + + tunnel_id = Column(Integer, nullable=False, primary_key=True, + autoincrement=False) + allocated = Column(Boolean, nullable=False) + + def __init__(self, tunnel_id): + self.tunnel_id = tunnel_id + self.allocated = False + + def __repr__(self): + return "" % (self.tunnel_id, self.allocated) + + +class NetworkBinding(model_base.BASEV2): + """Represents binding of virtual network to physical realization.""" + __tablename__ = 'ovs_network_bindings' + + network_id = Column(String(36), + ForeignKey('networks.id', ondelete="CASCADE"), + primary_key=True) + # 'gre', 'vlan', 'flat', 'local' + network_type = Column(String(32), nullable=False) + physical_network = Column(String(64)) + segmentation_id = Column(Integer) # tunnel_id or vlan_id + + def __init__(self, network_id, network_type, physical_network, + segmentation_id): + self.network_id = network_id + self.network_type = network_type + self.physical_network = physical_network + self.segmentation_id = segmentation_id + + def __repr__(self): + return "" % (self.network_id, + self.network_type, + self.physical_network, + self.segmentation_id) + +class PortForwarding(model_base.BASEV2): + """Ports to be forwarded through NAT """ + __tablename__ = 'ovs_port_forwarding' + + port_id = Column(String(36), + ForeignKey('ports.id', ondelete="CASCADE"), + primary_key=True) + forward_ports = Column(PickleType) + + def __init__(self, port_id, forward_ports): + self.port_id = port_id + self.forward_ports = forward_ports + + def __repr__(self): + return "" % (self.port_id, self.forward_ports) + +class TunnelEndpoint(model_base.BASEV2): + """Represents tunnel endpoint in RPC mode.""" + __tablename__ = 'ovs_tunnel_endpoints' + __table_args__ = ( + UniqueConstraint('id', name='uniq_ovs_tunnel_endpoints0id'), + ) + + ip_address = Column(String(64), primary_key=True) + id = Column(Integer, nullable=False) + + def __init__(self, ip_address, id): + self.ip_address = ip_address + self.id = id + + def __repr__(self): + return "" % (self.ip_address, self.id) + diff --git a/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_neutron_plugin.py b/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_neutron_plugin.py new file mode 100644 index 0000000..cacf165 --- /dev/null +++ b/planetstack/neutron_extension/1:2013.2.2-0ubuntu1~cloud0/ovs_neutron_plugin.py @@ -0,0 +1,697 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright 2011 Nicira Networks, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# @author: Somik Behera, Nicira Networks, Inc. +# @author: Brad Hall, Nicira Networks, Inc. +# @author: Dan Wendlandt, Nicira Networks, Inc. +# @author: Dave Lapsley, Nicira Networks, Inc. +# @author: Aaron Rosen, Nicira Networks, Inc. +# @author: Bob Kukura, Red Hat, Inc. +# @author: Seetharama Ayyadevara, Freescale Semiconductor, Inc. + +import sys + +from oslo.config import cfg + +from neutron.agent import securitygroups_rpc as sg_rpc +from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api +from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api +from neutron.api.v2 import attributes +from neutron.common import constants as q_const +from neutron.common import exceptions as q_exc +from neutron.common import rpc as q_rpc +from neutron.common import topics +from neutron.common import utils +from neutron.db import agents_db +from neutron.db import agentschedulers_db +from neutron.db import allowedaddresspairs_db as addr_pair_db +from neutron.db import db_base_plugin_v2 +from neutron.db import dhcp_rpc_base +from neutron.db import external_net_db +from neutron.db import extradhcpopt_db +from neutron.db import extraroute_db +from neutron.db import l3_agentschedulers_db +from neutron.db import l3_gwmode_db +from neutron.db import l3_rpc_base +from neutron.db import portbindings_db +from neutron.db import quota_db # noqa +from neutron.db import securitygroups_rpc_base as sg_db_rpc +from neutron.extensions import allowedaddresspairs as addr_pair +from neutron.extensions import extra_dhcp_opt as edo_ext +from neutron.extensions import portbindings +from neutron.extensions import providernet as provider +from neutron.extensions import nat +from neutron import manager +from neutron.openstack.common import importutils +from neutron.openstack.common import log as logging +from neutron.openstack.common import rpc +from neutron.openstack.common.rpc import proxy +from neutron.plugins.common import constants as svc_constants +from neutron.plugins.common import utils as plugin_utils +from neutron.plugins.openvswitch.common import config # noqa +from neutron.plugins.openvswitch.common import constants +from neutron.plugins.openvswitch import ovs_db_v2 + + +LOG = logging.getLogger(__name__) + + +class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, + l3_rpc_base.L3RpcCallbackMixin, + sg_db_rpc.SecurityGroupServerRpcCallbackMixin): + + # history + # 1.0 Initial version + # 1.1 Support Security Group RPC + + RPC_API_VERSION = '1.1' + + def __init__(self, notifier, tunnel_type): + self.notifier = notifier + self.tunnel_type = tunnel_type + + def create_rpc_dispatcher(self): + '''Get the rpc dispatcher for this manager. + + If a manager would like to set an rpc API version, or support more than + one class as the target of rpc messages, override this method. + ''' + return q_rpc.PluginRpcDispatcher([self, + agents_db.AgentExtRpcCallback()]) + + @classmethod + def get_port_from_device(cls, device): + port = ovs_db_v2.get_port_from_device(device) + if port: + port['device'] = device + return port + + def get_device_details(self, rpc_context, **kwargs): + """Agent requests device details.""" + agent_id = kwargs.get('agent_id') + device = kwargs.get('device') + LOG.debug(_("Device %(device)s details requested from %(agent_id)s"), + {'device': device, 'agent_id': agent_id}) + port = ovs_db_v2.get_port(device) + if port: + binding = ovs_db_v2.get_network_binding(None, port['network_id']) + entry = {'device': device, + 'network_id': port['network_id'], + 'port_id': port['id'], + 'admin_state_up': port['admin_state_up'], + 'network_type': binding.network_type, + 'segmentation_id': binding.segmentation_id, + 'physical_network': binding.physical_network} + new_status = (q_const.PORT_STATUS_ACTIVE if port['admin_state_up'] + else q_const.PORT_STATUS_DOWN) + if port['status'] != new_status: + ovs_db_v2.set_port_status(port['id'], new_status) + else: + entry = {'device': device} + LOG.debug(_("%s can not be found in database"), device) + return entry + + def update_device_down(self, rpc_context, **kwargs): + """Device no longer exists on agent.""" + agent_id = kwargs.get('agent_id') + device = kwargs.get('device') + host = kwargs.get('host') + port = ovs_db_v2.get_port(device) + LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"), + {'device': device, 'agent_id': agent_id}) + if port: + entry = {'device': device, + 'exists': True} + plugin = manager.NeutronManager.get_plugin() + if (host and + not plugin.get_port_host(rpc_context, port['id']) == host): + LOG.debug(_("Device %(device)s not bound to the" + " agent host %(host)s"), + {'device': device, 'host': host}) + elif port['status'] != q_const.PORT_STATUS_DOWN: + # Set port status to DOWN + ovs_db_v2.set_port_status(port['id'], + q_const.PORT_STATUS_DOWN) + else: + entry = {'device': device, + 'exists': False} + LOG.debug(_("%s can not be found in database"), device) + return entry + + def update_device_up(self, rpc_context, **kwargs): + """Device is up on agent.""" + agent_id = kwargs.get('agent_id') + device = kwargs.get('device') + host = kwargs.get('host') + port = ovs_db_v2.get_port(device) + LOG.debug(_("Device %(device)s up on %(agent_id)s"), + {'device': device, 'agent_id': agent_id}) + plugin = manager.NeutronManager.get_plugin() + if port: + if (host and + not plugin.get_port_host(rpc_context, port['id']) == host): + LOG.debug(_("Device %(device)s not bound to the" + " agent host %(host)s"), + {'device': device, 'host': host}) + return + elif port['status'] != q_const.PORT_STATUS_ACTIVE: + ovs_db_v2.set_port_status(port['id'], + q_const.PORT_STATUS_ACTIVE) + else: + LOG.debug(_("%s can not be found in database"), device) + + def tunnel_sync(self, rpc_context, **kwargs): + """Update new tunnel. + + Updates the datbase with the tunnel IP. All listening agents will also + be notified about the new tunnel IP. + """ + tunnel_ip = kwargs.get('tunnel_ip') + # Update the database with the IP + tunnel = ovs_db_v2.add_tunnel_endpoint(tunnel_ip) + tunnels = ovs_db_v2.get_tunnel_endpoints() + entry = dict() + entry['tunnels'] = tunnels + # Notify all other listening agents + self.notifier.tunnel_update(rpc_context, tunnel.ip_address, + tunnel.id, self.tunnel_type) + # Return the list of tunnels IP's to the agent + return entry + + +class AgentNotifierApi(proxy.RpcProxy, + sg_rpc.SecurityGroupAgentRpcApiMixin): + '''Agent side of the openvswitch rpc API. + + API version history: + 1.0 - Initial version. + + ''' + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic): + super(AgentNotifierApi, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + self.topic_network_delete = topics.get_topic_name(topic, + topics.NETWORK, + topics.DELETE) + self.topic_port_update = topics.get_topic_name(topic, + topics.PORT, + topics.UPDATE) + self.topic_tunnel_update = topics.get_topic_name(topic, + constants.TUNNEL, + topics.UPDATE) + + def network_delete(self, context, network_id): + self.fanout_cast(context, + self.make_msg('network_delete', + network_id=network_id), + topic=self.topic_network_delete) + + def port_update(self, context, port, network_type, segmentation_id, + physical_network): + self.fanout_cast(context, + self.make_msg('port_update', + port=port, + network_type=network_type, + segmentation_id=segmentation_id, + physical_network=physical_network), + topic=self.topic_port_update) + + def tunnel_update(self, context, tunnel_ip, tunnel_id, tunnel_type): + self.fanout_cast(context, + self.make_msg('tunnel_update', + tunnel_ip=tunnel_ip, + tunnel_id=tunnel_id, + tunnel_type=tunnel_type), + topic=self.topic_tunnel_update) + + +class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, + external_net_db.External_net_db_mixin, + extraroute_db.ExtraRoute_db_mixin, + l3_gwmode_db.L3_NAT_db_mixin, + sg_db_rpc.SecurityGroupServerRpcMixin, + l3_agentschedulers_db.L3AgentSchedulerDbMixin, + agentschedulers_db.DhcpAgentSchedulerDbMixin, + portbindings_db.PortBindingMixin, + extradhcpopt_db.ExtraDhcpOptMixin, + addr_pair_db.AllowedAddressPairsMixin): + + """Implement the Neutron abstractions using Open vSwitch. + + Depending on whether tunneling is enabled, either a GRE, VXLAN tunnel or + a new VLAN is created for each network. An agent is relied upon to + perform the actual OVS configuration on each host. + + The provider extension is also supported. As discussed in + https://bugs.launchpad.net/neutron/+bug/1023156, this class could + be simplified, and filtering on extended attributes could be + handled, by adding support for extended attributes to the + NeutronDbPluginV2 base class. When that occurs, this class should + be updated to take advantage of it. + + The port binding extension enables an external application relay + information to and from the plugin. + """ + + # This attribute specifies whether the plugin supports or not + # bulk/pagination/sorting operations. Name mangling is used in + # order to ensure it is qualified by class + __native_bulk_support = True + __native_pagination_support = True + __native_sorting_support = True + + _supported_extension_aliases = ["provider", "external-net", "router", + "ext-gw-mode", "binding", "quotas", + "security-group", "agent", "extraroute", + "l3_agent_scheduler", + "dhcp_agent_scheduler", + "extra_dhcp_opt", + "allowed-address-pairs", + "nat"] + + @property + def supported_extension_aliases(self): + if not hasattr(self, '_aliases'): + aliases = self._supported_extension_aliases[:] + sg_rpc.disable_security_group_extension_if_noop_driver(aliases) + self._aliases = aliases + return self._aliases + + def __init__(self, configfile=None): + self.base_binding_dict = { + portbindings.VIF_TYPE: portbindings.VIF_TYPE_OVS, + portbindings.CAPABILITIES: { + portbindings.CAP_PORT_FILTER: + 'security-group' in self.supported_extension_aliases}} + ovs_db_v2.initialize() + self._parse_network_vlan_ranges() + ovs_db_v2.sync_vlan_allocations(self.network_vlan_ranges) + self.tenant_network_type = cfg.CONF.OVS.tenant_network_type + if self.tenant_network_type not in [constants.TYPE_LOCAL, + constants.TYPE_VLAN, + constants.TYPE_GRE, + constants.TYPE_VXLAN, + constants.TYPE_NONE]: + LOG.error(_("Invalid tenant_network_type: %s. " + "Server terminated!"), + self.tenant_network_type) + sys.exit(1) + self.enable_tunneling = cfg.CONF.OVS.enable_tunneling + self.tunnel_type = None + if self.enable_tunneling: + self.tunnel_type = cfg.CONF.OVS.tunnel_type or constants.TYPE_GRE + elif cfg.CONF.OVS.tunnel_type: + self.tunnel_type = cfg.CONF.OVS.tunnel_type + self.enable_tunneling = True + self.tunnel_id_ranges = [] + if self.enable_tunneling: + self._parse_tunnel_id_ranges() + ovs_db_v2.sync_tunnel_allocations(self.tunnel_id_ranges) + elif self.tenant_network_type in constants.TUNNEL_NETWORK_TYPES: + LOG.error(_("Tunneling disabled but tenant_network_type is '%s'. " + "Server terminated!"), self.tenant_network_type) + sys.exit(1) + self.setup_rpc() + self.network_scheduler = importutils.import_object( + cfg.CONF.network_scheduler_driver + ) + self.router_scheduler = importutils.import_object( + cfg.CONF.router_scheduler_driver + ) + + def setup_rpc(self): + # RPC support + self.service_topics = {svc_constants.CORE: topics.PLUGIN, + svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN} + self.conn = rpc.create_connection(new=True) + self.notifier = AgentNotifierApi(topics.AGENT) + self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = ( + dhcp_rpc_agent_api.DhcpAgentNotifyAPI() + ) + self.agent_notifiers[q_const.AGENT_TYPE_L3] = ( + l3_rpc_agent_api.L3AgentNotify + ) + self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type) + self.dispatcher = self.callbacks.create_rpc_dispatcher() + for svc_topic in self.service_topics.values(): + self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) + # Consume from all consumers in a thread + self.conn.consume_in_thread() + + def _parse_network_vlan_ranges(self): + try: + self.network_vlan_ranges = plugin_utils.parse_network_vlan_ranges( + cfg.CONF.OVS.network_vlan_ranges) + except Exception as ex: + LOG.error(_("%s. Server terminated!"), ex) + sys.exit(1) + LOG.info(_("Network VLAN ranges: %s"), self.network_vlan_ranges) + + def _parse_tunnel_id_ranges(self): + for entry in cfg.CONF.OVS.tunnel_id_ranges: + entry = entry.strip() + try: + tun_min, tun_max = entry.split(':') + self.tunnel_id_ranges.append((int(tun_min), int(tun_max))) + except ValueError as ex: + LOG.error(_("Invalid tunnel ID range: " + "'%(range)s' - %(e)s. Server terminated!"), + {'range': entry, 'e': ex}) + sys.exit(1) + LOG.info(_("Tunnel ID ranges: %s"), self.tunnel_id_ranges) + + def _extend_network_dict_provider(self, context, network): + binding = ovs_db_v2.get_network_binding(context.session, + network['id']) + network[provider.NETWORK_TYPE] = binding.network_type + if binding.network_type in constants.TUNNEL_NETWORK_TYPES: + network[provider.PHYSICAL_NETWORK] = None + network[provider.SEGMENTATION_ID] = binding.segmentation_id + elif binding.network_type == constants.TYPE_FLAT: + network[provider.PHYSICAL_NETWORK] = binding.physical_network + network[provider.SEGMENTATION_ID] = None + elif binding.network_type == constants.TYPE_VLAN: + network[provider.PHYSICAL_NETWORK] = binding.physical_network + network[provider.SEGMENTATION_ID] = binding.segmentation_id + elif binding.network_type == constants.TYPE_LOCAL: + network[provider.PHYSICAL_NETWORK] = None + network[provider.SEGMENTATION_ID] = None + + def _process_provider_create(self, context, attrs): + network_type = attrs.get(provider.NETWORK_TYPE) + physical_network = attrs.get(provider.PHYSICAL_NETWORK) + segmentation_id = attrs.get(provider.SEGMENTATION_ID) + + network_type_set = attributes.is_attr_set(network_type) + physical_network_set = attributes.is_attr_set(physical_network) + segmentation_id_set = attributes.is_attr_set(segmentation_id) + + if not (network_type_set or physical_network_set or + segmentation_id_set): + return (None, None, None) + + if not network_type_set: + msg = _("provider:network_type required") + raise q_exc.InvalidInput(error_message=msg) + elif network_type == constants.TYPE_FLAT: + if segmentation_id_set: + msg = _("provider:segmentation_id specified for flat network") + raise q_exc.InvalidInput(error_message=msg) + else: + segmentation_id = constants.FLAT_VLAN_ID + elif network_type == constants.TYPE_VLAN: + if not segmentation_id_set: + msg = _("provider:segmentation_id required") + raise q_exc.InvalidInput(error_message=msg) + if not utils.is_valid_vlan_tag(segmentation_id): + msg = (_("provider:segmentation_id out of range " + "(%(min_id)s through %(max_id)s)") % + {'min_id': q_const.MIN_VLAN_TAG, + 'max_id': q_const.MAX_VLAN_TAG}) + raise q_exc.InvalidInput(error_message=msg) + elif network_type in constants.TUNNEL_NETWORK_TYPES: + if not self.enable_tunneling: + msg = _("%s networks are not enabled") % network_type + raise q_exc.InvalidInput(error_message=msg) + if physical_network_set: + msg = _("provider:physical_network specified for %s " + "network") % network_type + raise q_exc.InvalidInput(error_message=msg) + else: + physical_network = None + if not segmentation_id_set: + msg = _("provider:segmentation_id required") + raise q_exc.InvalidInput(error_message=msg) + elif network_type == constants.TYPE_LOCAL: + if physical_network_set: + msg = _("provider:physical_network specified for local " + "network") + raise q_exc.InvalidInput(error_message=msg) + else: + physical_network = None + if segmentation_id_set: + msg = _("provider:segmentation_id specified for local " + "network") + raise q_exc.InvalidInput(error_message=msg) + else: + segmentation_id = None + else: + msg = _("provider:network_type %s not supported") % network_type + raise q_exc.InvalidInput(error_message=msg) + + if network_type in [constants.TYPE_VLAN, constants.TYPE_FLAT]: + if physical_network_set: + if physical_network not in self.network_vlan_ranges: + msg = _("Unknown provider:physical_network " + "%s") % physical_network + raise q_exc.InvalidInput(error_message=msg) + elif 'default' in self.network_vlan_ranges: + physical_network = 'default' + else: + msg = _("provider:physical_network required") + raise q_exc.InvalidInput(error_message=msg) + + return (network_type, physical_network, segmentation_id) + + def create_network(self, context, network): + (network_type, physical_network, + segmentation_id) = self._process_provider_create(context, + network['network']) + + session = context.session + #set up default security groups + tenant_id = self._get_tenant_id_for_create( + context, network['network']) + self._ensure_default_security_group(context, tenant_id) + + with session.begin(subtransactions=True): + if not network_type: + # tenant network + network_type = self.tenant_network_type + if network_type == constants.TYPE_NONE: + raise q_exc.TenantNetworksDisabled() + elif network_type == constants.TYPE_VLAN: + (physical_network, + segmentation_id) = ovs_db_v2.reserve_vlan(session) + elif network_type in constants.TUNNEL_NETWORK_TYPES: + segmentation_id = ovs_db_v2.reserve_tunnel(session) + # no reservation needed for TYPE_LOCAL + else: + # provider network + if network_type in [constants.TYPE_VLAN, constants.TYPE_FLAT]: + ovs_db_v2.reserve_specific_vlan(session, physical_network, + segmentation_id) + elif network_type in constants.TUNNEL_NETWORK_TYPES: + ovs_db_v2.reserve_specific_tunnel(session, segmentation_id) + # no reservation needed for TYPE_LOCAL + net = super(OVSNeutronPluginV2, self).create_network(context, + network) + ovs_db_v2.add_network_binding(session, net['id'], network_type, + physical_network, segmentation_id) + + self._process_l3_create(context, net, network['network']) + self._extend_network_dict_provider(context, net) + # note - exception will rollback entire transaction + LOG.debug(_("Created network: %s"), net['id']) + return net + + def update_network(self, context, id, network): + provider._raise_if_updates_provider_attributes(network['network']) + + session = context.session + with session.begin(subtransactions=True): + net = super(OVSNeutronPluginV2, self).update_network(context, id, + network) + self._process_l3_update(context, net, network['network']) + self._extend_network_dict_provider(context, net) + return net + + def delete_network(self, context, id): + session = context.session + with session.begin(subtransactions=True): + binding = ovs_db_v2.get_network_binding(session, id) + super(OVSNeutronPluginV2, self).delete_network(context, id) + if binding.network_type in constants.TUNNEL_NETWORK_TYPES: + ovs_db_v2.release_tunnel(session, binding.segmentation_id, + self.tunnel_id_ranges) + elif binding.network_type in [constants.TYPE_VLAN, + constants.TYPE_FLAT]: + ovs_db_v2.release_vlan(session, binding.physical_network, + binding.segmentation_id, + self.network_vlan_ranges) + # the network_binding record is deleted via cascade from + # the network record, so explicit removal is not necessary + self.notifier.network_delete(context, id) + + def get_network(self, context, id, fields=None): + session = context.session + with session.begin(subtransactions=True): + net = super(OVSNeutronPluginV2, self).get_network(context, + id, None) + self._extend_network_dict_provider(context, net) + return self._fields(net, fields) + + def get_networks(self, context, filters=None, fields=None, + sorts=None, + limit=None, marker=None, page_reverse=False): + session = context.session + with session.begin(subtransactions=True): + nets = super(OVSNeutronPluginV2, + self).get_networks(context, filters, None, sorts, + limit, marker, page_reverse) + for net in nets: + self._extend_network_dict_provider(context, net) + + return [self._fields(net, fields) for net in nets] + + def create_port(self, context, port): + # Set port status as 'DOWN'. This will be updated by agent + port['port']['status'] = q_const.PORT_STATUS_DOWN + port_data = port['port'] + session = context.session + with session.begin(subtransactions=True): + self._ensure_default_security_group_on_port(context, port) + sgids = self._get_security_groups_on_port(context, port) + dhcp_opts = port['port'].get(edo_ext.EXTRADHCPOPTS, []) + port = super(OVSNeutronPluginV2, self).create_port(context, port) + self._process_portbindings_create_and_update(context, + port_data, port) + self._process_port_create_security_group(context, port, sgids) + self._process_port_create_extra_dhcp_opts(context, port, + dhcp_opts) + port[addr_pair.ADDRESS_PAIRS] = ( + self._process_create_allowed_address_pairs( + context, port, + port_data.get(addr_pair.ADDRESS_PAIRS))) + self.notify_security_groups_member_updated(context, port) + return port + + def _extend_port_dict_nat(self, context, port): + forward = ovs_db_v2.get_port_forwarding(context.session, port['id']) + if forward: + port[nat.FORWARD_PORTS] = forward + else: + port[nat.FORWARD_PORTS] = None + + def _process_nat_update(self, context, attrs, id): + forward_ports = attrs.get(nat.FORWARD_PORTS) + forward_ports_set = attributes.is_attr_set(forward_ports) + + if not forward_ports_set: + return None + + # LOG.info("forward ports %s" % forward_ports) + valid_protocols = ["tcp", "udp"] + for entry in forward_ports: + if not isinstance(entry, dict): + msg = _("nat:forward_ports: must specify a list of dicts (ex: 'l4_protocol=tcp,l4_port=80')") + raise q_exc.InvalidInput(error_message=msg) + if not ("l4_protocol" in entry and "l4_port" in entry): + msg = _("nat:forward_ports: dict is missing l4_protocol and l4_port (ex: 'l4_protocol=tcp,l4_port=80')") + raise q_exc.InvalidInput(error_message=msg) + if entry['l4_protocol'] not in valid_protocols: + msg = _("nat:forward_ports: invalid protocol (only tcp and udp allowed)") + raise q_exc.InvalidInput(error_message=msg) + try: + l4_port = int(entry['l4_port']) + except: + msg = _("nat:forward_ports: l4_port must be an integer") + raise q_exc.InvalidInput(error_message=msg) + + return forward_ports + + def get_port(self, context, id, fields=None): + session = context.session + with session.begin(subtransactions=True): + port = super(OVSNeutronPluginV2, self).get_port(context, id, None) + self._extend_port_dict_nat(context, port) + return self._fields(port, fields) + + def get_ports(self, context, filters=None, fields=None): + session = context.session + with session.begin(subtransactions=True): + ports = super(OVSNeutronPluginV2, self).get_ports(context, filters, + None) + for port in ports: + self._extend_port_dict_nat(context, port) + + return [self._fields(port, fields) for port in ports] + + def update_port(self, context, id, port): + forward_ports = self._process_nat_update(context, port['port'], id) + + session = context.session + need_port_update_notify = False + changed_fixed_ips = 'fixed_ips' in port['port'] + with session.begin(subtransactions=True): + original_port = super(OVSNeutronPluginV2, self).get_port( + context, id) + updated_port = super(OVSNeutronPluginV2, self).update_port( + context, id, port) + if addr_pair.ADDRESS_PAIRS in port['port']: + self._delete_allowed_address_pairs(context, id) + self._process_create_allowed_address_pairs( + context, updated_port, + port['port'][addr_pair.ADDRESS_PAIRS]) + need_port_update_notify = True + elif changed_fixed_ips: + self._check_fixed_ips_and_address_pairs_no_overlap( + context, updated_port) + + if forward_ports: + ovs_db_v2.clear_port_forwarding(session, updated_port['id']) + ovs_db_v2.add_port_forwarding(session, updated_port['id'], forward_ports) + self._extend_port_dict_nat(context, updated_port) + + need_port_update_notify |= self.update_security_group_on_port( + context, id, port, original_port, updated_port) + self._process_portbindings_create_and_update(context, + port['port'], + updated_port) + need_port_update_notify |= self._update_extra_dhcp_opts_on_port( + context, id, port, updated_port) + + need_port_update_notify |= self.is_security_group_member_updated( + context, original_port, updated_port) + if original_port['admin_state_up'] != updated_port['admin_state_up']: + need_port_update_notify = True + + if need_port_update_notify: + binding = ovs_db_v2.get_network_binding(None, + updated_port['network_id']) + self.notifier.port_update(context, updated_port, + binding.network_type, + binding.segmentation_id, + binding.physical_network) + return updated_port + + def delete_port(self, context, id, l3_port_check=True): + + # if needed, check to see if this is a port owned by + # and l3-router. If so, we should prevent deletion. + if l3_port_check: + self.prevent_l3_port_deletion(context, id) + + session = context.session + with session.begin(subtransactions=True): + self.disassociate_floatingips(context, id) + port = self.get_port(context, id) + self._delete_port_security_group_bindings(context, id) + super(OVSNeutronPluginV2, self).delete_port(context, id) + + self.notify_security_groups_member_updated(context, port) -- 2.43.0