+
+    /* Stop existing threads. */
+    latch_set(&dp->exit_latch);
+    dp_netdev_reload_pmd_threads(dp);
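+    /* Reloading with the exit latch set causes each pmd thread to break
+     * out of its polling loop in pmd_thread_main(), so it can be joined
+     * below. */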
+    for (i = 0; i < dp->n_pmd_threads; i++) {
+        struct pmd_thread *f = &dp->pmd_threads[i];
+
+        xpthread_join(f->thread, NULL);
+    }
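+    /* Clear the latch so that the threads started below do not exit
+     * right away. */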
+    latch_poll(&dp->exit_latch);
+    free(dp->pmd_threads);
+
+    /* Start new threads. */
+    dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
+    dp->n_pmd_threads = n;
+
+    for (i = 0; i < n; i++) {
+        struct pmd_thread *f = &dp->pmd_threads[i];
+
+        f->dp = dp;
+        f->id = i;
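+        /* dp_netdev_reload_pmd_threads() bumps 'change_seq' to tell the
+         * thread to re-examine the rx queues it polls. */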
+        atomic_store(&f->change_seq, 1);
+
+        /* The pmd threads will distribute the devices' rx queues among
+         * themselves. */
+        xpthread_create(&f->thread, NULL, pmd_thread_main, f);
+    }
+}
+
+\f
+static void *
+dp_netdev_flow_stats_new_cb(void)
+{
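+    /* Cacheline-aligned allocation keeps per-thread buckets from false
+     * sharing with one another. */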
+    struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket);
+    ovs_mutex_init(&bucket->mutex);
+    return bucket;
+}
+
+static void
+dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
+                    const struct ofpbuf *packet,
+                    const struct miniflow *key)
+{
+    uint16_t tcp_flags = miniflow_get_tcp_flags(key);
+    long long int now = time_msec();
+    struct dp_netdev_flow_stats *bucket;
+
+    bucket = ovsthread_stats_bucket_get(&netdev_flow->stats,
+                                        dp_netdev_flow_stats_new_cb);
+
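+    /* The bucket belongs to the calling thread, so the mutex is rarely
+     * contended: other threads take it only to read the stats. */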
+    ovs_mutex_lock(&bucket->mutex);
+    bucket->used = MAX(now, bucket->used);
+    bucket->packet_count++;
+    bucket->byte_count += ofpbuf_size(packet);
+    bucket->tcp_flags |= tcp_flags;
+    ovs_mutex_unlock(&bucket->mutex);
+}
+
+static void *
+dp_netdev_stats_new_cb(void)
+{
+    struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
+    ovs_mutex_init(&bucket->mutex);
+    return bucket;
+}
+
+static void
+dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type)
+{
+    struct dp_netdev_stats *bucket;
+
+    bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
+    ovs_mutex_lock(&bucket->mutex);
+    bucket->n[type]++;
+    ovs_mutex_unlock(&bucket->mutex);
+}
+
+static void
+dp_netdev_input(struct dp_netdev *dp, struct ofpbuf *packet,
+                struct pkt_metadata *md)
+    OVS_REQ_RDLOCK(dp->port_rwlock)
+{
+    struct dp_netdev_flow *netdev_flow;
+    struct miniflow key;
+    uint32_t buf[FLOW_U32S];
+
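+    /* Discard packets too short to hold an Ethernet header. */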
+    if (ofpbuf_size(packet) < ETH_HEADER_LEN) {
+        ofpbuf_delete(packet);
+        return;
+    }
+    miniflow_initialize(&key, buf);
+    miniflow_extract(packet, md, &key);
+
+    netdev_flow = dp_netdev_lookup_flow(dp, &key);
+    if (netdev_flow) {
+        struct dp_netdev_actions *actions;
+
+        dp_netdev_flow_used(netdev_flow, packet, &key);
+
+        actions = dp_netdev_flow_get_actions(netdev_flow);
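+        /* 'true' here is 'may_steal': executing the actions may consume
+         * 'packet'. */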
+        dp_netdev_execute_actions(dp, &key, packet, true, md,
+                                  actions->actions, actions->size);
+        dp_netdev_count_packet(dp, DP_STAT_HIT);
+    } else if (dp->handler_queues) {
+        dp_netdev_count_packet(dp, DP_STAT_MISS);
+        dp_netdev_output_userspace(dp, packet,
+                                   miniflow_hash_5tuple(&key, 0)
+                                   % dp->n_handlers,
+                                   DPIF_UC_MISS, &key, NULL);
+        /* dp_netdev_output_userspace() copies the packet data into the
+         * handler queue, so the original must still be freed. */
+        ofpbuf_delete(packet);
+    } else {
+        /* No handler queues, so there is nowhere to send the upcall.
+         * Free the packet instead of leaking it. */
+        ofpbuf_delete(packet);
+    }
+}
+
+static void
+dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
+                     struct pkt_metadata *md)
+    OVS_REQ_RDLOCK(dp->port_rwlock)
+{
+    uint32_t *recirc_depth = recirc_depth_get();
+
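+    /* Packets entering the datapath from a port start over at
+     * recirculation depth zero. */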
+    *recirc_depth = 0;
+    dp_netdev_input(dp, packet, md);