+ ovs_rwlock_unlock(&dp->port_rwlock);
+}
+
+struct rxq_poll {
+ struct dp_netdev_port *port;
+ struct netdev_rxq *rx;
+};
+
+static int
+pmd_load_queues(struct pmd_thread *f,
+ struct rxq_poll **ppoll_list, int poll_cnt)
+{
+ struct dp_netdev *dp = f->dp;
+ struct rxq_poll *poll_list = *ppoll_list;
+ struct dp_netdev_port *port;
+ int id = f->id;
+ int index;
+ int i;
+
+ /* Simple scheduler for netdev rx polling. */
+ ovs_rwlock_rdlock(&dp->port_rwlock);
+ for (i = 0; i < poll_cnt; i++) {
+ port_unref(poll_list[i].port);
+ }
+
+ poll_cnt = 0;
+ index = 0;
+
+ HMAP_FOR_EACH (port, node, &f->dp->ports) {
+ if (netdev_is_pmd(port->netdev)) {
+ int i;
+
+ for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+ if ((index % dp->n_pmd_threads) == id) {
+ poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
+
+ port_ref(port);
+ poll_list[poll_cnt].port = port;
+ poll_list[poll_cnt].rx = port->rxq[i];
+ poll_cnt++;
+ }
+ index++;
+ }
+ }
+ }
+
+ ovs_rwlock_unlock(&dp->port_rwlock);
+ *ppoll_list = poll_list;
+ return poll_cnt;
+}
+
+static void *
+pmd_thread_main(void *f_)
+{
+ struct pmd_thread *f = f_;
+ struct dp_netdev *dp = f->dp;
+ unsigned int lc = 0;
+ struct rxq_poll *poll_list;
+ unsigned int port_seq;
+ int poll_cnt;
+ int i;
+
+ f->name = xasprintf("pmd_%u", ovsthread_id_self());
+ set_subprogram_name("%s", f->name);
+ poll_cnt = 0;
+ poll_list = NULL;
+
+ pmd_thread_setaffinity_cpu(f->id);
+reload:
+ poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
+ atomic_read(&f->change_seq, &port_seq);
+
+ for (;;) {
+ unsigned int c_port_seq;
+ int i;
+
+ for (i = 0; i < poll_cnt; i++) {
+ dp_netdev_process_rxq_port(dp, poll_list[i].port, poll_list[i].rx);
+ }
+
+ if (lc++ > 1024) {
+ ovsrcu_quiesce();
+
+ /* TODO: need completely userspace based signaling method.
+ * to keep this thread entirely in userspace.
+ * For now using atomic counter. */
+ lc = 0;
+ atomic_read_explicit(&f->change_seq, &c_port_seq, memory_order_consume);
+ if (c_port_seq != port_seq) {
+ break;
+ }
+ }
+ }
+
+ if (!latch_is_set(&f->dp->exit_latch)){
+ goto reload;
+ }
+
+ for (i = 0; i < poll_cnt; i++) {
+ port_unref(poll_list[i].port);
+ }
+
+ free(poll_list);
+ free(f->name);
+ return NULL;
+}
+
+static void
+dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
+{
+ int i;
+
+ if (n == dp->n_pmd_threads) {
+ return;
+ }
+
+ /* Stop existing threads. */
+ latch_set(&dp->exit_latch);
+ dp_netdev_reload_pmd_threads(dp);
+ for (i = 0; i < dp->n_pmd_threads; i++) {
+ struct pmd_thread *f = &dp->pmd_threads[i];
+
+ xpthread_join(f->thread, NULL);
+ }
+ latch_poll(&dp->exit_latch);
+ free(dp->pmd_threads);
+
+ /* Start new threads. */
+ dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
+ dp->n_pmd_threads = n;
+
+ for (i = 0; i < n; i++) {
+ struct pmd_thread *f = &dp->pmd_threads[i];
+
+ f->dp = dp;
+ f->id = i;
+ atomic_store(&f->change_seq, 1);
+
+ /* Each thread will distribute all devices rx-queues among
+ * themselves. */
+ xpthread_create(&f->thread, NULL, pmd_thread_main, f);
+ }
+}
+
+\f
+static void *
+dp_netdev_flow_stats_new_cb(void)
+{
+ struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket);
+ ovs_mutex_init(&bucket->mutex);
+ return bucket;
+}
+
+static void
+dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
+ const struct ofpbuf *packet,
+ const struct miniflow *key)
+{
+ uint16_t tcp_flags = miniflow_get_tcp_flags(key);
+ long long int now = time_msec();
+ struct dp_netdev_flow_stats *bucket;
+
+ bucket = ovsthread_stats_bucket_get(&netdev_flow->stats,
+ dp_netdev_flow_stats_new_cb);
+
+ ovs_mutex_lock(&bucket->mutex);
+ bucket->used = MAX(now, bucket->used);
+ bucket->packet_count++;
+ bucket->byte_count += ofpbuf_size(packet);
+ bucket->tcp_flags |= tcp_flags;
+ ovs_mutex_unlock(&bucket->mutex);
+}
+
+static void *
+dp_netdev_stats_new_cb(void)
+{
+ struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
+ ovs_mutex_init(&bucket->mutex);
+ return bucket;