+pmd_load_queues(struct pmd_thread *f,
+ struct rxq_poll **ppoll_list, int poll_cnt)
+{
+ struct dp_netdev *dp = f->dp;
+ struct rxq_poll *poll_list = *ppoll_list;
+ struct dp_netdev_port *port;
+ int id = f->id;
+ int index;
+ int i;
+
+ /* Simple scheduler for netdev rx polling. */
+ ovs_rwlock_rdlock(&dp->port_rwlock);
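+ /* Drop the references taken on the ports in the previous queue
+ * assignment before building a new one. */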
+ for (i = 0; i < poll_cnt; i++) {
+ port_unref(poll_list[i].port);
+ }
+
+ poll_cnt = 0;
+ index = 0;
+
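+ /* Walk every rx queue of every PMD device.  'index' numbers rx
+ * queues globally across all ports; this thread claims the queues
+ * whose index modulo dp->n_pmd_threads equals its own id, giving a
+ * simple round-robin distribution of queues over threads. */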
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ if (netdev_is_pmd(port->netdev)) {
+ for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+ if ((index % dp->n_pmd_threads) == id) {
+ poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
+
+ port_ref(port);
+ poll_list[poll_cnt].port = port;
+ poll_list[poll_cnt].rx = port->rxq[i];
+ poll_cnt++;
+ }
+ index++;
+ }
+ }
+ }
+
+ ovs_rwlock_unlock(&dp->port_rwlock);
+ *ppoll_list = poll_list;
+ return poll_cnt;
+}
+
+static void *
+pmd_thread_main(void *f_)
+{
+ struct pmd_thread *f = f_;
+ struct dp_netdev *dp = f->dp;
+ unsigned int lc = 0;
+ struct rxq_poll *poll_list;
+ unsigned int port_seq;
+ int poll_cnt;
+ int i;
+
+ poll_cnt = 0;
+ poll_list = NULL;
+
+ pmd_thread_setaffinity_cpu(f->id);
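+ /* Control flow returns to 'reload' whenever another thread bumps
+ * f->change_seq, signaling that the set of ports has changed and
+ * the queue assignment must be rebuilt. */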
+reload:
+ poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
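+ /* Snapshot the change sequence number; the polling loop below
+ * exits once it observes a different value. */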
+ atomic_read(&f->change_seq, &port_seq);
+
+ for (;;) {
+ unsigned int c_port_seq;
+
+ for (i = 0; i < poll_cnt; i++) {
+ dp_netdev_process_rxq_port(dp, poll_list[i].port, poll_list[i].rx);
+ }
+
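+ /* Quiesce and check for reconfiguration only once every 1024
+ * iterations to keep the common polling path cheap. */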
+ if (lc++ > 1024) {
+ ovsrcu_quiesce();
+
+ /* TODO: We need a completely userspace-based signaling method
+ * to keep this thread entirely in userspace.
+ * For now, use an atomic counter. */
+ lc = 0;
+ atomic_read_explicit(&f->change_seq, &c_port_seq, memory_order_consume);
+ if (c_port_seq != port_seq) {
+ break;
+ }
+ }
+ }
+
+ if (!latch_is_set(&dp->exit_latch)) {
+ goto reload;
+ }
+
+ for (i = 0; i < poll_cnt; i++) {
+ port_unref(poll_list[i].port);
+ }
+
+ free(poll_list);
+ return NULL;
+}
+
+static void
+dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
+{
+ int i;
+
+ if (n == dp->n_pmd_threads) {
+ return;
+ }
+
+ /* Stop existing threads. */
+ latch_set(&dp->exit_latch);
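+ /* Bumping each thread's change_seq via dp_netdev_reload_pmd_threads()
+ * makes every PMD thread break out of its polling loop, notice that
+ * the exit latch is set, and return. */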
+ dp_netdev_reload_pmd_threads(dp);
+ for (i = 0; i < dp->n_pmd_threads; i++) {
+ struct pmd_thread *f = &dp->pmd_threads[i];
+
+ xpthread_join(f->thread, NULL);
+ }
+ latch_poll(&dp->exit_latch);
+ free(dp->pmd_threads);
+
+ /* Start new threads. */
+ dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
+ dp->n_pmd_threads = n;
+
+ for (i = 0; i < n; i++) {
+ struct pmd_thread *f = &dp->pmd_threads[i];
+
+ f->dp = dp;
+ f->id = i;
+ atomic_store(&f->change_seq, 1);
+
+ /* The threads distribute all devices' rx queues among
+ * themselves in pmd_load_queues(). */
+ f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+ }
+}
+
+\f
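+ /* Allocates a zeroed, cache-line-aligned stats bucket so that
+ * buckets updated by different threads do not share a cache line. */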
+static void *
+dp_netdev_flow_stats_new_cb(void)
+{
+ struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket);
+ ovs_mutex_init(&bucket->mutex);
+ return bucket;
+}
+
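+ /* Credits 'packet' to 'netdev_flow''s statistics.  Stats are split
+ * across ovsthread_stats buckets, each with its own mutex, so that
+ * concurrent threads rarely contend on the same lock. */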
+static void
+dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
+ const struct ofpbuf *packet,
+ const struct miniflow *key)
+{
+ uint16_t tcp_flags = miniflow_get_tcp_flags(key);
+ long long int now = time_msec();
+ struct dp_netdev_flow_stats *bucket;
+
+ bucket = ovsthread_stats_bucket_get(&netdev_flow->stats,
+ dp_netdev_flow_stats_new_cb);
+
+ ovs_mutex_lock(&bucket->mutex);
+ bucket->used = MAX(now, bucket->used);
+ bucket->packet_count++;
+ bucket->byte_count += ofpbuf_size(packet);
+ bucket->tcp_flags |= tcp_flags;
+ ovs_mutex_unlock(&bucket->mutex);
+}
+
+static void *
+dp_netdev_stats_new_cb(void)
+{
+ struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
+ ovs_mutex_init(&bucket->mutex);
+ return bucket;
+}
+
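+ /* Increments the datapath-wide counter for 'type' in this thread's
+ * stats bucket. */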
+static void
+dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type)
+{
+ struct dp_netdev_stats *bucket;
+
+ bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
+ ovs_mutex_lock(&bucket->mutex);
+ bucket->n[type]++;
+ ovs_mutex_unlock(&bucket->mutex);
+}
+
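+ /* Extracts the flow from 'packet', looks it up in the datapath flow
+ * table, and either executes the matching flow's actions or queues
+ * the packet to userspace as a miss upcall. */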
+static void
+dp_netdev_input(struct dp_netdev *dp, struct ofpbuf *packet,
+ struct pkt_metadata *md)
+ OVS_REQ_RDLOCK(dp->port_rwlock)
+{
+ struct dp_netdev_flow *netdev_flow;
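+ /* A miniflow with inline storage: 'buf' gives miniflow_extract()
+ * room for up to FLOW_U32S 32-bit words without a heap allocation. */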
+ struct {
+ struct miniflow flow;
+ uint32_t buf[FLOW_U32S];
+ } key;
+
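+ /* Drop packets too short to contain an Ethernet header. */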
+ if (ofpbuf_size(packet) < ETH_HEADER_LEN) {
+ ofpbuf_delete(packet);
+ return;
+ }
+ miniflow_initialize(&key.flow, key.buf);
+ miniflow_extract(packet, md, &key.flow);
+
+ netdev_flow = dp_netdev_lookup_flow(dp, &key.flow);
+ if (netdev_flow) {
+ struct dp_netdev_actions *actions;
+
+ dp_netdev_flow_used(netdev_flow, packet, &key.flow);
+
+ actions = dp_netdev_flow_get_actions(netdev_flow);
+ dp_netdev_execute_actions(dp, &key.flow, packet, true, md,
+ actions->actions, actions->size);
+ dp_netdev_count_packet(dp, DP_STAT_HIT);
+ } else if (dp->handler_queues) {
+ dp_netdev_count_packet(dp, DP_STAT_MISS);
+ dp_netdev_output_userspace(dp, packet,
+ miniflow_hash_5tuple(&key.flow, 0)
+ % dp->n_handlers,
+ DPIF_UC_MISS, &key.flow, NULL);
+ ofpbuf_delete(packet);
+ } else {
+ /* Nobody is listening for upcalls, so the packet cannot be
+ * delivered anywhere; free it to avoid a leak. */
+ dp_netdev_count_packet(dp, DP_STAT_MISS);
+ ofpbuf_delete(packet);
+ }
+}
+
+static void
+dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
+ struct pkt_metadata *md)
+ OVS_REQ_RDLOCK(dp->port_rwlock)
+{
+ uint32_t *recirc_depth = recirc_depth_get();
+
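+ /* A packet freshly received from a port has not been recirculated,
+ * so reset the per-thread recirculation depth. */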
+ *recirc_depth = 0;
+ dp_netdev_input(dp, packet, md);
+}
+
+static int
+dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
+ int queue_no, int type, const struct miniflow *key,