+static void *
+dp_forwarder_main(void *f_)
+{
+ struct dp_forwarder *f = f_;
+ struct dp_netdev *dp = f->dp;
+ struct ofpbuf packet;
+
+ f->name = xasprintf("forwarder_%u", ovsthread_id_self());
+ set_subprogram_name("%s", f->name);
+
+ ofpbuf_init(&packet, 0);
+ while (!latch_is_set(&dp->exit_latch)) {
+ bool received_anything;
+ int i;
+
+ ovs_rwlock_rdlock(&dp->port_rwlock);
+ for (i = 0; i < 50; i++) {
+ struct dp_netdev_port *port;
+
+ received_anything = false;
+ HMAP_FOR_EACH (port, node, &f->dp->ports) {
+ if (port->rx
+ && port->node.hash >= f->min_hash
+ && port->node.hash <= f->max_hash) {
+ int buf_size;
+ int error;
+ int mtu;
+
+ if (netdev_get_mtu(port->netdev, &mtu)) {
+ mtu = ETH_PAYLOAD_MAX;
+ }
+ buf_size = DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + mtu;
+
+ ofpbuf_clear(&packet);
+ ofpbuf_reserve_with_tailroom(&packet, DP_NETDEV_HEADROOM,
+ buf_size);
+
+ error = netdev_rx_recv(port->rx, &packet);
+ if (!error) {
+ struct pkt_metadata md
+ = PKT_METADATA_INITIALIZER(port->port_no);
+ dp_netdev_port_input(dp, &packet, &md);
+
+ received_anything = true;
+ } else if (error != EAGAIN && error != EOPNOTSUPP) {
+ static struct vlog_rate_limit rl
+ = VLOG_RATE_LIMIT_INIT(1, 5);
+
+ VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
+ netdev_get_name(port->netdev),
+ ovs_strerror(error));
+ }
+ }
+ }
+
+ if (!received_anything) {
+ break;
+ }
+ }
+
+ if (received_anything) {
+ poll_immediate_wake();
+ } else {
+ struct dp_netdev_port *port;
+
+ HMAP_FOR_EACH (port, node, &f->dp->ports)
+ if (port->rx
+ && port->node.hash >= f->min_hash
+ && port->node.hash <= f->max_hash) {
+ netdev_rx_wait(port->rx);
+ }
+ seq_wait(dp->port_seq, seq_read(dp->port_seq));
+ latch_wait(&dp->exit_latch);
+ }
+ ovs_rwlock_unlock(&dp->port_rwlock);
+
+ poll_block();
+ }
+ ofpbuf_uninit(&packet);
+
+ free(f->name);
+
+ return NULL;
+}
+
+static void
+dp_netdev_set_threads(struct dp_netdev *dp, int n)
+{
+ int i;
+
+ if (n == dp->n_forwarders) {
+ return;
+ }
+
+ /* Stop existing threads. */
+ latch_set(&dp->exit_latch);
+ for (i = 0; i < dp->n_forwarders; i++) {
+ struct dp_forwarder *f = &dp->forwarders[i];
+
+ xpthread_join(f->thread, NULL);
+ }
+ latch_poll(&dp->exit_latch);
+ free(dp->forwarders);
+
+ /* Start new threads. */
+ dp->forwarders = xmalloc(n * sizeof *dp->forwarders);
+ dp->n_forwarders = n;
+ for (i = 0; i < n; i++) {
+ struct dp_forwarder *f = &dp->forwarders[i];
+
+ f->dp = dp;
+ f->min_hash = UINT32_MAX / n * i;
+ f->max_hash = UINT32_MAX / n * (i + 1) - 1;
+ if (i == n - 1) {
+ f->max_hash = UINT32_MAX;
+ }
+ xpthread_create(&f->thread, NULL, dp_forwarder_main, f);
+ }
+}
+\f