+ struct dp_forwarder *f = f_;
+ struct dp_netdev *dp = f->dp;
+ struct ofpbuf packet;
+
+ f->name = xasprintf("forwarder_%u", ovsthread_id_self());
+ set_subprogram_name("%s", f->name);
+
+ ofpbuf_init(&packet, 0);
+ while (!latch_is_set(&dp->exit_latch)) {
+ bool received_anything;
+ int i;
+
+ ovs_rwlock_rdlock(&dp->port_rwlock);
+ for (i = 0; i < 50; i++) {
+ struct dp_netdev_port *port;
+
+ received_anything = false;
+ HMAP_FOR_EACH (port, node, &f->dp->ports) {
+ if (port->rx
+ && port->node.hash >= f->min_hash
+ && port->node.hash <= f->max_hash) {
+ int buf_size;
+ int error;
+ int mtu;
+
+ if (netdev_get_mtu(port->netdev, &mtu)) {
+ mtu = ETH_PAYLOAD_MAX;
+ }
+ buf_size = DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + mtu;
+
+ ofpbuf_clear(&packet);
+ ofpbuf_reserve_with_tailroom(&packet, DP_NETDEV_HEADROOM,
+ buf_size);
+
+ error = netdev_rx_recv(port->rx, &packet);
+ if (!error) {
+ struct pkt_metadata md
+ = PKT_METADATA_INITIALIZER(port->port_no);
+
+ dp_netdev_port_input(dp, &packet, &md);
+ received_anything = true;
+ } else if (error != EAGAIN && error != EOPNOTSUPP) {
+ static struct vlog_rate_limit rl
+ = VLOG_RATE_LIMIT_INIT(1, 5);
+
+ VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
+ netdev_get_name(port->netdev),
+ ovs_strerror(error));
+ }
+ }
+ }
+
+ if (!received_anything) {
+ break;
+ }
+ }
+
+ if (received_anything) {
+ poll_immediate_wake();
+ } else {
+ struct dp_netdev_port *port;
+
+ HMAP_FOR_EACH (port, node, &f->dp->ports)
+ if (port->rx
+ && port->node.hash >= f->min_hash
+ && port->node.hash <= f->max_hash) {
+ netdev_rx_wait(port->rx);
+ }
+ seq_wait(dp->port_seq, seq_read(dp->port_seq));
+ latch_wait(&dp->exit_latch);
+ }
+ ovs_rwlock_unlock(&dp->port_rwlock);
+
+ poll_block();
+ }
+ ofpbuf_uninit(&packet);
+
+ free(f->name);
+
+ return NULL;