/* By default, choose a priority in the middle. */
#define NETDEV_RULE_PRIORITY 0x8000
+#define NR_THREADS 1
+
/* Configuration parameters. */
enum { MAX_FLOWS = 65536 }; /* Maximum number of flows in flow table. */
/* Forwarding threads. */
struct latch exit_latch;
- struct dp_forwarder *forwarders;
- size_t n_forwarders;
+ struct pmd_thread *pmd_threads;
+ size_t n_pmd_threads;
+ int pmd_count;
};
static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
odp_port_t port_no;
struct netdev *netdev;
struct netdev_saved_flags *sf;
- struct netdev_rx *rx;
+ struct netdev_rxq **rxq;
+ struct ovs_refcount ref_cnt;
char *type; /* Port type as requested by user. */
};
const struct dp_netdev_flow *);
static void dp_netdev_actions_free(struct dp_netdev_actions *);
-/* A thread that receives packets from some ports, looks them up in the flow
- * table, and executes the actions it finds. */
-struct dp_forwarder {
+/* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate
+ * the performance overhead of interrupt processing. Therefore netdev can
+ * not implement rx-wait for these devices. dpif-netdev needs to poll
+ * these device to check for recv buffer. pmd-thread does polling for
+ * devices assigned to itself thread.
+ *
+ * DPDK used PMD for accessing NIC.
+ *
+ * A thread that receives packets from PMD ports, looks them up in the flow
+ * table, and executes the actions it finds.
+ **/
+struct pmd_thread {
struct dp_netdev *dp;
pthread_t thread;
+ int id;
+ atomic_uint change_seq;
char *name;
- uint32_t min_hash, max_hash;
};
/* Interface to netdev-based datapath. */
static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
int queue_no, int type,
const struct flow *,
- const struct nlattr *userdata)
- OVS_EXCLUDED(dp->queue_rwlock);
+ const struct nlattr *userdata);
static void dp_netdev_execute_actions(struct dp_netdev *dp,
const struct flow *, struct ofpbuf *, bool may_steal,
struct pkt_metadata *,
const struct nlattr *actions,
- size_t actions_len)
- OVS_REQ_RDLOCK(dp->port_rwlock);
+ size_t actions_len);
static void dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
- struct pkt_metadata *)
- OVS_REQ_RDLOCK(dp->port_rwlock);
-static void dp_netdev_set_threads(struct dp_netdev *, int n);
+ struct pkt_metadata *);
+
+static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
static struct dpif_netdev *
dpif_netdev_cast(const struct dpif *dpif)
dp_netdev_free(dp);
return error;
}
- dp_netdev_set_threads(dp, 2);
*dpp = dp;
return 0;
shash_find_and_delete(&dp_netdevs, dp->name);
- dp_netdev_set_threads(dp, 0);
- free(dp->forwarders);
+ dp_netdev_set_pmd_threads(dp, 0);
+ free(dp->pmd_threads);
dp_netdev_flow_flush(dp);
ovs_rwlock_wrlock(&dp->port_rwlock);
return 0;
}
+static void
+dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
+{
+ int i;
+
+ for (i = 0; i < dp->n_pmd_threads; i++) {
+ struct pmd_thread *f = &dp->pmd_threads[i];
+ int id;
+
+ atomic_add(&f->change_seq, 1, &id);
+ }
+}
+
static int
do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
odp_port_t port_no)
struct netdev_saved_flags *sf;
struct dp_netdev_port *port;
struct netdev *netdev;
- struct netdev_rx *rx;
enum netdev_flags flags;
const char *open_type;
int error;
+ int i;
/* XXX reject devices already in some dp_netdev. */
return EINVAL;
}
- error = netdev_rx_open(netdev, &rx);
- if (error
- && !(error == EOPNOTSUPP && dpif_netdev_class_is_dummy(dp->class))) {
- VLOG_ERR("%s: cannot receive packets on this network device (%s)",
- devname, ovs_strerror(errno));
- netdev_close(netdev);
- return error;
+ port = xzalloc(sizeof *port);
+ port->port_no = port_no;
+ port->netdev = netdev;
+ port->rxq = xmalloc(sizeof *port->rxq * netdev_n_rxq(netdev));
+ port->type = xstrdup(type);
+ for (i = 0; i < netdev_n_rxq(netdev); i++) {
+ error = netdev_rxq_open(netdev, &port->rxq[i], i);
+ if (error
+ && !(error == EOPNOTSUPP && dpif_netdev_class_is_dummy(dp->class))) {
+ VLOG_ERR("%s: cannot receive packets on this network device (%s)",
+ devname, ovs_strerror(errno));
+ netdev_close(netdev);
+ return error;
+ }
}
error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
if (error) {
- netdev_rx_close(rx);
+ for (i = 0; i < netdev_n_rxq(netdev); i++) {
+ netdev_rxq_close(port->rxq[i]);
+ }
netdev_close(netdev);
+ free(port->rxq);
+ free(port);
return error;
}
-
- port = xmalloc(sizeof *port);
- port->port_no = port_no;
- port->netdev = netdev;
port->sf = sf;
- port->rx = rx;
- port->type = xstrdup(type);
+
+ if (netdev_is_pmd(netdev)) {
+ dp->pmd_count++;
+ dp_netdev_set_pmd_threads(dp, NR_THREADS);
+ dp_netdev_reload_pmd_threads(dp);
+ }
+ ovs_refcount_init(&port->ref_cnt);
hmap_insert(&dp->ports, &port->node, hash_int(odp_to_u32(port_no), 0));
seq_change(dp->port_seq);
}
}
+static void
+port_ref(struct dp_netdev_port *port)
+{
+ if (port) {
+ ovs_refcount_ref(&port->ref_cnt);
+ }
+}
+
+static void
+port_unref(struct dp_netdev_port *port)
+{
+ if (port && ovs_refcount_unref(&port->ref_cnt) == 1) {
+ int i;
+
+ netdev_close(port->netdev);
+ netdev_restore_flags(port->sf);
+
+ for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+ netdev_rxq_close(port->rxq[i]);
+ }
+ free(port->type);
+ free(port);
+ }
+}
+
static int
get_port_by_name(struct dp_netdev *dp,
const char *devname, struct dp_netdev_port **portp)
hmap_remove(&dp->ports, &port->node);
seq_change(dp->port_seq);
+ if (netdev_is_pmd(port->netdev)) {
+ dp_netdev_reload_pmd_threads(dp);
+ }
- netdev_close(port->netdev);
- netdev_restore_flags(port->sf);
- netdev_rx_close(port->rx);
- free(port->type);
- free(port);
-
+ port_unref(port);
return 0;
}
free(actions);
}
\f
+
+inline static void
+dp_netdev_process_rxq_port(struct dp_netdev *dp,
+ struct dp_netdev_port *port,
+ struct netdev_rxq *rxq)
+{
+ struct ofpbuf *packet[NETDEV_MAX_RX_BATCH];
+ int error, c;
+
+ error = netdev_rxq_recv(rxq, packet, &c);
+ if (!error) {
+ struct pkt_metadata md = PKT_METADATA_INITIALIZER(port->port_no);
+ int i;
+
+ for (i = 0; i < c; i++) {
+ dp_netdev_port_input(dp, packet[i], &md);
+ }
+ } else if (error != EAGAIN && error != EOPNOTSUPP) {
+ static struct vlog_rate_limit rl
+ = VLOG_RATE_LIMIT_INIT(1, 5);
+
+ VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
+ netdev_get_name(port->netdev),
+ ovs_strerror(error));
+ }
+}
+
+static void
+dpif_netdev_run(struct dpif *dpif)
+{
+ struct dp_netdev_port *port;
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+
+ ovs_rwlock_rdlock(&dp->port_rwlock);
+
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ if (!netdev_is_pmd(port->netdev)) {
+ int i;
+
+ for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+ dp_netdev_process_rxq_port(dp, port, port->rxq[i]);
+ }
+ }
+ }
+
+ ovs_rwlock_unlock(&dp->port_rwlock);
+}
+
+static void
+dpif_netdev_wait(struct dpif *dpif)
+{
+ struct dp_netdev_port *port;
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+
+ ovs_rwlock_rdlock(&dp->port_rwlock);
+
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ if (!netdev_is_pmd(port->netdev)) {
+ int i;
+
+ for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+ netdev_rxq_wait(port->rxq[i]);
+ }
+ }
+ }
+ ovs_rwlock_unlock(&dp->port_rwlock);
+}
+
+struct rxq_poll {
+ struct dp_netdev_port *port;
+ struct netdev_rxq *rx;
+};
+
+static int
+pmd_load_queues(struct pmd_thread *f,
+ struct rxq_poll **ppoll_list, int poll_cnt)
+{
+ struct dp_netdev *dp = f->dp;
+ struct rxq_poll *poll_list = *ppoll_list;
+ struct dp_netdev_port *port;
+ int id = f->id;
+ int index;
+ int i;
+
+ /* Simple scheduler for netdev rx polling. */
+ ovs_rwlock_rdlock(&dp->port_rwlock);
+ for (i = 0; i < poll_cnt; i++) {
+ port_unref(poll_list[i].port);
+ }
+
+ poll_cnt = 0;
+ index = 0;
+
+ HMAP_FOR_EACH (port, node, &f->dp->ports) {
+ if (netdev_is_pmd(port->netdev)) {
+ int i;
+
+ for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+ if ((index % dp->n_pmd_threads) == id) {
+ poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
+
+ port_ref(port);
+ poll_list[poll_cnt].port = port;
+ poll_list[poll_cnt].rx = port->rxq[i];
+ poll_cnt++;
+ }
+ index++;
+ }
+ }
+ }
+
+ ovs_rwlock_unlock(&dp->port_rwlock);
+ *ppoll_list = poll_list;
+ return poll_cnt;
+}
+
static void *
-dp_forwarder_main(void *f_)
+pmd_thread_main(void *f_)
{
- struct dp_forwarder *f = f_;
+ struct pmd_thread *f = f_;
struct dp_netdev *dp = f->dp;
+ unsigned int lc = 0;
+ struct rxq_poll *poll_list;
+ unsigned int port_seq;
+ int poll_cnt;
+ int i;
- f->name = xasprintf("forwarder_%u", ovsthread_id_self());
+ f->name = xasprintf("pmd_%u", ovsthread_id_self());
set_subprogram_name("%s", f->name);
+ poll_cnt = 0;
+ poll_list = NULL;
- while (!latch_is_set(&dp->exit_latch)) {
- bool received_anything;
+reload:
+ poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
+ atomic_read(&f->change_seq, &port_seq);
+
+ for (;;) {
+ unsigned int c_port_seq;
int i;
- ovs_rwlock_rdlock(&dp->port_rwlock);
- for (i = 0; i < 50; i++) {
- struct dp_netdev_port *port;
-
- received_anything = false;
- HMAP_FOR_EACH (port, node, &f->dp->ports) {
- if (port->rx
- && port->node.hash >= f->min_hash
- && port->node.hash <= f->max_hash) {
- struct ofpbuf *packets[NETDEV_MAX_RX_BATCH];
- int count;
- int error;
-
- error = netdev_rx_recv(port->rx, packets, &count);
- if (!error) {
- int i;
- struct pkt_metadata md
- = PKT_METADATA_INITIALIZER(port->port_no);
-
- for (i = 0; i < count; i++) {
- dp_netdev_port_input(dp, packets[i], &md);
- }
- received_anything = true;
- } else if (error != EAGAIN && error != EOPNOTSUPP) {
- static struct vlog_rate_limit rl
- = VLOG_RATE_LIMIT_INIT(1, 5);
-
- VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
- netdev_get_name(port->netdev),
- ovs_strerror(error));
- }
- }
- }
+ for (i = 0; i < poll_cnt; i++) {
+ dp_netdev_process_rxq_port(dp, poll_list[i].port, poll_list[i].rx);
+ }
- if (!received_anything) {
+ if (lc++ > 1024) {
+ ovsrcu_quiesce();
+
+ /* TODO: need completely userspace based signaling method.
+ * to keep this thread entirely in userspace.
+ * For now using atomic counter. */
+ lc = 0;
+ atomic_read_explicit(&f->change_seq, &c_port_seq, memory_order_consume);
+ if (c_port_seq != port_seq) {
break;
}
}
+ }
- if (received_anything) {
- poll_immediate_wake();
- } else {
- struct dp_netdev_port *port;
-
- HMAP_FOR_EACH (port, node, &f->dp->ports)
- if (port->rx
- && port->node.hash >= f->min_hash
- && port->node.hash <= f->max_hash) {
- netdev_rx_wait(port->rx);
- }
- seq_wait(dp->port_seq, seq_read(dp->port_seq));
- latch_wait(&dp->exit_latch);
- }
- ovs_rwlock_unlock(&dp->port_rwlock);
+ if (!latch_is_set(&f->dp->exit_latch)){
+ goto reload;
+ }
- poll_block();
+ for (i = 0; i < poll_cnt; i++) {
+ port_unref(poll_list[i].port);
}
+ free(poll_list);
free(f->name);
-
return NULL;
}
static void
-dp_netdev_set_threads(struct dp_netdev *dp, int n)
+dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
{
int i;
- if (n == dp->n_forwarders) {
+ if (n == dp->n_pmd_threads) {
return;
}
/* Stop existing threads. */
latch_set(&dp->exit_latch);
- for (i = 0; i < dp->n_forwarders; i++) {
- struct dp_forwarder *f = &dp->forwarders[i];
+ dp_netdev_reload_pmd_threads(dp);
+ for (i = 0; i < dp->n_pmd_threads; i++) {
+ struct pmd_thread *f = &dp->pmd_threads[i];
xpthread_join(f->thread, NULL);
}
latch_poll(&dp->exit_latch);
- free(dp->forwarders);
+ free(dp->pmd_threads);
/* Start new threads. */
- dp->forwarders = xmalloc(n * sizeof *dp->forwarders);
- dp->n_forwarders = n;
+ dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
+ dp->n_pmd_threads = n;
+
for (i = 0; i < n; i++) {
- struct dp_forwarder *f = &dp->forwarders[i];
+ struct pmd_thread *f = &dp->pmd_threads[i];
f->dp = dp;
- f->min_hash = UINT32_MAX / n * i;
- f->max_hash = UINT32_MAX / n * (i + 1) - 1;
- if (i == n - 1) {
- f->max_hash = UINT32_MAX;
- }
- xpthread_create(&f->thread, NULL, dp_forwarder_main, f);
+ f->id = i;
+ atomic_store(&f->change_seq, 1);
+
+ /* Each thread will distribute all devices rx-queues among
+ * themselves. */
+ xpthread_create(&f->thread, NULL, pmd_thread_main, f);
}
}
+
\f
static void *
dp_netdev_flow_stats_new_cb(void)
static void
dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
struct pkt_metadata *md)
- OVS_REQ_RDLOCK(dp->port_rwlock)
{
struct dp_netdev_flow *netdev_flow;
struct flow key;
dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
int queue_no, int type, const struct flow *flow,
const struct nlattr *userdata)
- OVS_EXCLUDED(dp->queue_rwlock)
{
struct dp_netdev_queue *q;
int error;
case OVS_ACTION_ATTR_OUTPUT:
p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a)));
if (p) {
- netdev_send(p->netdev, packet);
+ netdev_send(p->netdev, packet, may_steal);
}
break;
% aux->dp->n_handlers,
DPIF_UC_ACTION, aux->key,
userdata);
+
+ if (may_steal) {
+ ofpbuf_delete(packet);
+ }
break;
}
case OVS_ACTION_ATTR_PUSH_VLAN:
OVS_NOT_REACHED();
}
- if (may_steal) {
- ofpbuf_delete(packet);
- }
}
static void
struct ofpbuf *packet, bool may_steal,
struct pkt_metadata *md,
const struct nlattr *actions, size_t actions_len)
- OVS_REQ_RDLOCK(dp->port_rwlock)
{
struct dp_netdev_execute_aux aux = {dp, key};
dpif_netdev_open,
dpif_netdev_close,
dpif_netdev_destroy,
- NULL, /* run */
- NULL, /* wait */
+ dpif_netdev_run,
+ dpif_netdev_wait,
dpif_netdev_get_stats,
dpif_netdev_port_add,
dpif_netdev_port_del,