+
+ ovs_rwlock_unlock(&dp->port_rwlock);
+ *ppoll_list = poll_list;
+ return poll_cnt;
+}
+
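+ /* Main loop of a pmd (poll-mode driver) thread: busy-polls every rx queue
+ * assigned to this thread and, roughly every 1024 iterations, enters an RCU
+ * quiescent state and re-reads 'change_seq' so that queue reassignment
+ * requests and the exit latch are eventually noticed. */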
+static void *
+pmd_thread_main(void *f_)
+{
+ struct pmd_thread *f = f_;
+ struct dp_netdev *dp = f->dp;
+ unsigned int lc = 0;
+ struct rxq_poll *poll_list;
+ unsigned int port_seq;
+ int poll_cnt;
+ int i;
+
+ f->name = xasprintf("pmd_%u", ovsthread_id_self());
+ set_subprogram_name("%s", f->name);
+ poll_cnt = 0;
+ poll_list = NULL;
+
+ pmd_thread_setaffinity_cpu(f->id);
+reload:
+ poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
+ atomic_read(&f->change_seq, &port_seq);
+
+ for (;;) {
+ unsigned int c_port_seq;
+ int i;
+
+ for (i = 0; i < poll_cnt; i++) {
+ dp_netdev_process_rxq_port(dp, poll_list[i].port, poll_list[i].rx);
+ }
+
+ if (lc++ > 1024) {
+ ovsrcu_quiesce();
+
+ /* TODO: We need a completely userspace-based signaling method
+ * to keep this thread entirely in userspace.
+ * For now, use an atomic counter. */
+ lc = 0;
+ atomic_read_explicit(&f->change_seq, &c_port_seq, memory_order_consume);
+ if (c_port_seq != port_seq) {
+ break;
+ }
+ }
+ }
+
+ if (!latch_is_set(&f->dp->exit_latch)) {
+ goto reload;
+ }
+
+ for (i = 0; i < poll_cnt; i++) {
+ port_unref(poll_list[i].port);
+ }
+
+ free(poll_list);
+ free(f->name);
+ return NULL;
+}
+
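+ /* Changes the number of pmd threads for 'dp' to 'n': signals the existing
+ * threads to exit via the exit latch, joins them, and then starts 'n' new
+ * threads that will redistribute the rx queues among themselves. */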
+static void
+dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
+{
+ int i;
+
+ if (n == dp->n_pmd_threads) {
+ return;
+ }
+
+ /* Stop existing threads. */
+ latch_set(&dp->exit_latch);
+ dp_netdev_reload_pmd_threads(dp);
+ for (i = 0; i < dp->n_pmd_threads; i++) {
+ struct pmd_thread *f = &dp->pmd_threads[i];
+
+ xpthread_join(f->thread, NULL);
+ }
+ latch_poll(&dp->exit_latch);
+ free(dp->pmd_threads);
+
+ /* Start new threads. */
+ dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
+ dp->n_pmd_threads = n;
+
+ for (i = 0; i < n; i++) {
+ struct pmd_thread *f = &dp->pmd_threads[i];
+
+ f->dp = dp;
+ f->id = i;
+ atomic_store(&f->change_seq, 1);
+
+ /* The pmd threads will distribute the rx queues of all the devices
+ * among themselves. */
+ xpthread_create(&f->thread, NULL, pmd_thread_main, f);
+ }
+}
+
+\f
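+ /* Constructor callback for ovsthread_stats_bucket_get(): allocates and
+ * initializes a cache-aligned per-thread stats bucket for a flow. */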
+static void *
+dp_netdev_flow_stats_new_cb(void)
+{
+ struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket);
+ ovs_mutex_init(&bucket->mutex);
+ return bucket;
+}
+
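+ /* Credits 'packet' to 'netdev_flow', updating the current thread's stats
+ * bucket: last-used timestamp, packet and byte counts, and the TCP flags
+ * taken from 'key'. */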
+static void
+dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
+ const struct ofpbuf *packet,
+ const struct miniflow *key)
+{
+ uint16_t tcp_flags = miniflow_get_tcp_flags(key);
+ long long int now = time_msec();
+ struct dp_netdev_flow_stats *bucket;
+
+ bucket = ovsthread_stats_bucket_get(&netdev_flow->stats,
+ dp_netdev_flow_stats_new_cb);
+
+ ovs_mutex_lock(&bucket->mutex);
+ bucket->used = MAX(now, bucket->used);
+ bucket->packet_count++;
+ bucket->byte_count += ofpbuf_size(packet);
+ bucket->tcp_flags |= tcp_flags;
+ ovs_mutex_unlock(&bucket->mutex);
+}
+
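+ /* Constructor callback for ovsthread_stats_bucket_get(): allocates and
+ * initializes a per-thread bucket of datapath-wide counters. */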
+static void *
+dp_netdev_stats_new_cb(void)
+{
+ struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
+ ovs_mutex_init(&bucket->mutex);
+ return bucket;
+}
+
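+ /* Increments the counter of the given 'type' in the current thread's
+ * stats bucket for 'dp'. */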
+static void
+dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type)
+{
+ struct dp_netdev_stats *bucket;
+
+ bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
+ ovs_mutex_lock(&bucket->mutex);
+ bucket->n[type]++;
+ ovs_mutex_unlock(&bucket->mutex);