Merge branch 'mainstream'
authorGiuseppe Lettieri <g.lettieri@iet.unipi.it>
Thu, 10 Apr 2014 15:20:49 +0000 (17:20 +0200)
committerGiuseppe Lettieri <g.lettieri@iet.unipi.it>
Thu, 10 Apr 2014 15:20:49 +0000 (17:20 +0200)
Conflicts:
lib/dpif-netdev.c
lib/netdev.c

1  2 
.gitignore
Makefile.am
lib/automake.mk
lib/dpif-netdev.c
lib/dpif-provider.h
lib/dpif.c
lib/netdev-pltap.c
lib/netdev-provider.h
lib/netdev-tunnel.c
lib/netdev.c

diff --combined .gitignore
@@@ -7,7 -7,12 +7,12 @@@
  *.loT
  *.mod.c
  *.o
- *.o
+ *.obj
+ *.exe
+ *.exp
+ *.ilk
+ *.lib
+ *.pdb
  *.pyc
  *.so
  *~
@@@ -52,5 -57,4 +57,5 @@@ Module.symver
  TAGS
  cscope.*
  tags
 +myexp/
  _debian
diff --combined Makefile.am
@@@ -14,14 -14,16 +14,16 @@@ AM_LDFLAGS = $(SSL_LDFLAGS
  
  if WIN32
  AM_CPPFLAGS += -I $(top_srcdir)/include/windows
+ AM_CPPFLAGS += $(PTHREAD_INCLUDES)
+ AM_LDFLAGS += $(PTHREAD_LDFLAGS)
  endif
  
- AM_CPPFLAGS += $(SSL_INCLUDES)
  AM_CPPFLAGS += -I $(top_srcdir)/include
  AM_CPPFLAGS += -I $(top_srcdir)/lib
  AM_CPPFLAGS += -I $(top_builddir)/lib
  
+ AM_CPPFLAGS += $(SSL_INCLUDES)
  AM_CFLAGS = -Wstrict-prototypes
  AM_CFLAGS += $(WARNING_FLAGS)
  
@@@ -61,6 -63,7 +63,7 @@@ EXTRA_DIST = 
        FAQ \
        INSTALL \
        INSTALL.Debian \
+         INSTALL.DPDK \
        INSTALL.Fedora \
        INSTALL.KVM \
        INSTALL.Libvirt \
@@@ -125,7 -128,6 +128,7 @@@ ro_shell = printf '\043 Generated autom
  
  SUFFIXES += .in
  .in:
 +      @mkdir -p $$(dirname $@)
        $(PERL) $(srcdir)/build-aux/soexpand.pl -I$(srcdir) < $< | \
            sed \
                -e 's,[@]PKIDIR[@],$(PKIDIR),g' \
@@@ -164,10 -166,10 +167,10 @@@ dist-hook-git: distfile
        @if test -e $(srcdir)/.git && (git --version) >/dev/null 2>&1; then \
          (cd datapath && $(MAKE) distfiles);                               \
          (cat distfiles; sed 's|^|datapath/|' datapath/distfiles) |        \
 -          LC_ALL=C sort -u > all-distfiles;                               \
 -        (cd $(srcdir) && git ls-files) | grep -v '\.gitignore$$' |        \
 -          LC_ALL=C sort -u > all-gitfiles;                                \
 -        LC_ALL=C comm -1 -3 all-distfiles all-gitfiles > missing-distfiles; \
 +          LC_ALL=C sort -u > all-distfiles;                                       \
 +        (cd $(srcdir) && git ls-files) | grep -vFf $(srcdir)/.non-distfiles |     \
 +          LC_ALL=C sort -u > all-gitfiles;                                        \
 +        LC_ALL=C comm -1 -3 all-distfiles all-gitfiles > missing-distfiles;       \
          if test -s missing-distfiles; then                                \
            echo "The distribution is missing the following files:";        \
            cat missing-distfiles;                                          \
@@@ -305,6 -307,5 +308,6 @@@ include rhel/automake.m
  include xenserver/automake.mk
  include python/automake.mk
  include python/compat/automake.mk
 +include planetlab/automake.mk
  include tutorial/automake.mk
  include vtep/automake.mk
diff --combined lib/automake.mk
@@@ -8,6 -8,11 +8,11 @@@
  lib_LTLIBRARIES += lib/libopenvswitch.la
  
  lib_libopenvswitch_la_LIBADD = $(SSL_LIBS)
+ if WIN32
+ lib_libopenvswitch_la_LIBADD += ${PTHREAD_LIBS}
+ endif
  lib_libopenvswitch_la_LDFLAGS = -release $(VERSION)
  
  lib_libopenvswitch_la_SOURCES = \
@@@ -47,6 -52,7 +52,7 @@@
        lib/dhparams.h \
        lib/dirs.h \
        lib/dpif-netdev.c \
+       lib/dpif-netdev.h \
        lib/dpif-provider.h \
        lib/dpif.c \
        lib/dpif.h \
        lib/multipath.c \
        lib/multipath.h \
        lib/netdev-dummy.c \
 +      lib/netdev-tunnel.c \
 +      lib/netdev-pltap.c \
        lib/netdev-provider.h \
        lib/netdev-vport.c \
        lib/netdev-vport.h \
        lib/ovs-atomic-c11.h \
        lib/ovs-atomic-clang.h \
        lib/ovs-atomic-flag-gcc4.7+.h \
-       lib/ovs-atomic-gcc4+.c \
        lib/ovs-atomic-gcc4+.h \
        lib/ovs-atomic-gcc4.7+.h \
-       lib/ovs-atomic-pthreads.c \
+       lib/ovs-atomic-locked.c \
+       lib/ovs-atomic-locked.h \
        lib/ovs-atomic-pthreads.h \
        lib/ovs-atomic.h \
+       lib/ovs-rcu.c \
+       lib/ovs-rcu.h \
        lib/ovs-thread.c \
        lib/ovs-thread.h \
        lib/ovsdb-data.c \
        lib/timeval.h \
        lib/token-bucket.c \
        lib/token-bucket.h \
 +      lib/tunalloc.c \
 +      lib/tunalloc.h \
        lib/type-props.h \
        lib/unaligned.h \
        lib/unicode.c \
@@@ -242,6 -246,8 +250,8 @@@ lib_libopenvswitch_la_SOURCES += 
        lib/getopt_long.c \
        lib/getrusage-windows.c \
        lib/latch-windows.c \
+       lib/route-table-stub.c \
+       lib/strsep.c \
        lib/stream-fd-windows.c
  else
  lib_libopenvswitch_la_SOURCES += \
@@@ -296,6 -302,12 +306,12 @@@ lib_libopenvswitch_la_SOURCES += 
        lib/route-table.h
  endif
  
+ if DPDK_NETDEV
+ lib_libopenvswitch_la_SOURCES += \
+        lib/netdev-dpdk.c \
+        lib/netdev-dpdk.h
+ endif
  if HAVE_POSIX_AIO
  lib_libopenvswitch_la_SOURCES += lib/async-append-aio.c
  else
diff --combined lib/dpif-netdev.c
  #include "list.h"
  #include "meta-flow.h"
  #include "netdev.h"
+ #include "netdev-dpdk.h"
  #include "netdev-vport.h"
  #include "netlink.h"
  #include "odp-execute.h"
  #include "odp-util.h"
  #include "ofp-print.h"
  #include "ofpbuf.h"
+ #include "ovs-rcu.h"
  #include "packets.h"
  #include "poll-loop.h"
  #include "random.h"
@@@ -65,15 -67,12 +67,12 @@@ VLOG_DEFINE_THIS_MODULE(dpif_netdev)
  /* By default, choose a priority in the middle. */
  #define NETDEV_RULE_PRIORITY 0x8000
  
+ #define NR_THREADS 1
  /* Configuration parameters. */
  enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
  
- /* Enough headroom to add a vlan tag, plus an extra 2 bytes to allow IP
-  * headers to be aligned on a 4-byte boundary.  */
- enum { DP_NETDEV_HEADROOM = 2 + VLAN_HEADER_LEN };
  /* Queues. */
- enum { N_QUEUES = 2 };          /* Number of queues for dpif_recv(). */
  enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
  enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
  BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
@@@ -90,14 -89,17 +89,17 @@@ struct dp_netdev_upcall 
      struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
  };
  
- /* A queue passing packets from a struct dp_netdev to its clients.
+ /* A queue passing packets from a struct dp_netdev to its clients (handlers).
   *
   *
   * Thread-safety
   * =============
   *
-  * Any access at all requires the owning 'dp_netdev''s queue_mutex. */
+  * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
+  * its own mutex. */
  struct dp_netdev_queue {
+     struct ovs_mutex mutex;
+     struct seq *seq;      /* Incremented whenever a packet is queued. */
      struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
      unsigned int head OVS_GUARDED;
      unsigned int tail OVS_GUARDED;
   *    port_rwlock
   *    flow_mutex
   *    cls.rwlock
-  *    queue_mutex
+  *    queue_rwlock
   */
  struct dp_netdev {
      const struct dpif_class *const class;
  
      /* Queues.
       *
-      * Everything in 'queues' is protected by 'queue_mutex'. */
-     struct ovs_mutex queue_mutex;
-     struct dp_netdev_queue queues[N_QUEUES];
-     struct seq *queue_seq;      /* Incremented whenever a packet is queued. */
+      * 'queue_rwlock' protects the modification of 'handler_queues' and
+      * 'n_handlers'.  The queue elements are protected by its
+      * 'handler_queues''s mutex. */
+     struct fat_rwlock queue_rwlock;
+     struct dp_netdev_queue *handler_queues;
+     uint32_t n_handlers;
  
      /* Statistics.
       *
-      * ovsthread_counter is internally synchronized. */
-     struct ovsthread_counter *n_hit;    /* Number of flow table matches. */
-     struct ovsthread_counter *n_missed; /* Number of flow table misses. */
-     struct ovsthread_counter *n_lost;   /* Number of misses not passed up. */
+      * ovsthread_stats is internally synchronized. */
+     struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */
  
      /* Ports.
       *
  
      /* Forwarding threads. */
      struct latch exit_latch;
-     struct dp_forwarder *forwarders;
-     size_t n_forwarders;
+     struct pmd_thread *pmd_threads;
+     size_t n_pmd_threads;
+     int pmd_count;
  };
  
  static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
                                                      odp_port_t)
      OVS_REQ_RDLOCK(dp->port_rwlock);
  
+ enum dp_stat_type {
+     DP_STAT_HIT,                /* Packets that matched in the flow table. */
+     DP_STAT_MISS,               /* Packets that did not match. */
+     DP_STAT_LOST,               /* Packets not passed up to the client. */
+     DP_N_STATS
+ };
+ /* Contained by struct dp_netdev's 'stats' member.  */
+ struct dp_netdev_stats {
+     struct ovs_mutex mutex;          /* Protects 'n'. */
+     /* Indexed by DP_STAT_*, protected by 'mutex'. */
+     unsigned long long int n[DP_N_STATS] OVS_GUARDED;
+ };
  /* A port in a netdev-based datapath. */
  struct dp_netdev_port {
      struct hmap_node node;      /* Node in dp_netdev's 'ports'. */
      odp_port_t port_no;
      struct netdev *netdev;
      struct netdev_saved_flags *sf;
-     struct netdev_rx *rx;
+     struct netdev_rxq **rxq;
+     struct ovs_refcount ref_cnt;
      char *type;                 /* Port type as requested by user. */
  };
  
@@@ -231,12 -251,6 +251,6 @@@ struct dp_netdev_flow 
      const struct hmap_node node; /* In owning dp_netdev's 'flow_table'. */
      const struct flow flow;      /* The flow that created this entry. */
  
-     /* Number of references.
-      * The classifier owns one reference.
-      * Any thread trying to keep a rule from being freed should hold its own
-      * reference. */
-     struct ovs_refcount ref_cnt;
      /* Protects members marked OVS_GUARDED.
       *
       * Acquire after datapath's flow_mutex. */
      /* Statistics.
       *
       * Reading or writing these members requires 'mutex'. */
-     long long int used OVS_GUARDED; /* Last used time, in monotonic msecs. */
-     long long int packet_count OVS_GUARDED; /* Number of packets matched. */
-     long long int byte_count OVS_GUARDED;   /* Number of bytes matched. */
-     uint16_t tcp_flags OVS_GUARDED; /* Bitwise-OR of seen tcp_flags values. */
+     struct ovsthread_stats stats; /* Contains "struct dp_netdev_flow_stats". */
  
      /* Actions.
       *
       * Reading 'actions' requires 'mutex'.
       * Writing 'actions' requires 'mutex' and (to allow for transactions) the
       * datapath's flow_mutex. */
-     struct dp_netdev_actions *actions OVS_GUARDED;
+     OVSRCU_TYPE(struct dp_netdev_actions *) actions;
  };
  
- static struct dp_netdev_flow *dp_netdev_flow_ref(
-     const struct dp_netdev_flow *);
- static void dp_netdev_flow_unref(struct dp_netdev_flow *);
+ static void dp_netdev_flow_free(struct dp_netdev_flow *);
+ /* Contained by struct dp_netdev_flow's 'stats' member.  */
+ struct dp_netdev_flow_stats {
+     struct ovs_mutex mutex;         /* Guards all the other members. */
+     long long int used OVS_GUARDED; /* Last used time, in monotonic msecs. */
+     long long int packet_count OVS_GUARDED; /* Number of packets matched. */
+     long long int byte_count OVS_GUARDED;   /* Number of bytes matched. */
+     uint16_t tcp_flags OVS_GUARDED; /* Bitwise-OR of seen tcp_flags values. */
+ };
  
  /* A set of datapath actions within a "struct dp_netdev_flow".
   *
   * 'flow' is the dp_netdev_flow for which 'flow->actions == actions') or that
   * owns a reference to 'actions->ref_cnt' (or both). */
  struct dp_netdev_actions {
-     struct ovs_refcount ref_cnt;
      /* These members are immutable: they do not change during the struct's
       * lifetime.  */
      struct nlattr *actions;     /* Sequence of OVS_ACTION_ATTR_* attributes. */
  
  struct dp_netdev_actions *dp_netdev_actions_create(const struct nlattr *,
                                                     size_t);
- struct dp_netdev_actions *dp_netdev_actions_ref(
-     const struct dp_netdev_actions *);
void dp_netdev_actions_unref(struct dp_netdev_actions *);
+ struct dp_netdev_actions *dp_netdev_flow_get_actions(
+     const struct dp_netdev_flow *);
static void dp_netdev_actions_free(struct dp_netdev_actions *);
  
- /* A thread that receives packets from some ports, looks them up in the flow
-  * table, and executes the actions it finds. */
- struct dp_forwarder {
+ /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
+  * the performance overhead of interrupt processing.  Therefore netdev can
+  * not implement rx-wait for these devices.  dpif-netdev needs to poll
+  * these device to check for recv buffer.  pmd-thread does polling for
+  * devices assigned to itself thread.
+  *
+  * DPDK used PMD for accessing NIC.
+  *
+  * A thread that receives packets from PMD ports, looks them up in the flow
+  * table, and executes the actions it finds.
+  **/
+ struct pmd_thread {
      struct dp_netdev *dp;
      pthread_t thread;
+     int id;
+     atomic_uint change_seq;
      char *name;
-     uint32_t min_hash, max_hash;
  };
  
  /* Interface to netdev-based datapath. */
@@@ -317,22 -344,23 +344,23 @@@ static int do_add_port(struct dp_netde
      OVS_REQ_WRLOCK(dp->port_rwlock);
  static int do_del_port(struct dp_netdev *dp, odp_port_t port_no)
      OVS_REQ_WRLOCK(dp->port_rwlock);
+ static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
+     OVS_REQ_WRLOCK(dp->queue_rwlock);
  static int dpif_netdev_open(const struct dpif_class *, const char *name,
                              bool create, struct dpif **);
  static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
-                                     int queue_no, const struct flow *,
-                                     const struct nlattr *userdata)
-     OVS_EXCLUDED(dp->queue_mutex);
+                                       int queue_no, int type,
+                                       const struct flow *,
+                                       const struct nlattr *userdata);
  static void dp_netdev_execute_actions(struct dp_netdev *dp,
-                                       const struct flow *, struct ofpbuf *,
+                                       const struct flow *, struct ofpbuf *, bool may_steal,
                                        struct pkt_metadata *,
                                        const struct nlattr *actions,
-                                       size_t actions_len)
-     OVS_REQ_RDLOCK(dp->port_rwlock);
+                                       size_t actions_len);
  static void dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
-                                  struct pkt_metadata *)
-     OVS_REQ_RDLOCK(dp->port_rwlock);
- static void dp_netdev_set_threads(struct dp_netdev *, int n);
+                                  struct pkt_metadata *);
+ static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
  
  static struct dpif_netdev *
  dpif_netdev_cast(const struct dpif *dpif)
@@@ -367,17 -395,10 +395,17 @@@ dpif_netdev_class_is_dummy(const struc
      return class != &dpif_netdev_class;
  }
  
 +static bool
 +dpif_netdev_class_is_planetlab(const struct dpif_class *class)
 +{
 +    return class == &dpif_planetlab_class;
 +}
 +
  static const char *
  dpif_netdev_port_open_type(const struct dpif_class *class, const char *type)
  {
      return strcmp(type, "internal") ? type
 +                  : dpif_netdev_class_is_planetlab(class) ? "pltap"
                    : dpif_netdev_class_is_dummy(class) ? "dummy"
                    : "tap";
  }
@@@ -406,8 -427,7 +434,8 @@@ choose_port(struct dp_netdev *dp, cons
  {
      uint32_t port_no;
  
 -    if (dp->class != &dpif_netdev_class) {
 +    if (dp->class != &dpif_netdev_class && 
 +        dp->class != &dpif_planetlab_class) {
          const char *p;
          int start_no = 0;
  
@@@ -448,7 -468,6 +476,6 @@@ create_dp_netdev(const char *name, cons
  {
      struct dp_netdev *dp;
      int error;
-     int i;
  
      dp = xzalloc(sizeof *dp);
      shash_add(&dp_netdevs, name, dp);
      *CONST_CAST(const struct dpif_class **, &dp->class) = class;
      *CONST_CAST(const char **, &dp->name) = xstrdup(name);
      ovs_refcount_init(&dp->ref_cnt);
-     atomic_flag_init(&dp->destroyed);
+     atomic_flag_clear(&dp->destroyed);
  
      ovs_mutex_init(&dp->flow_mutex);
      classifier_init(&dp->cls, NULL);
      hmap_init(&dp->flow_table);
  
-     ovs_mutex_init(&dp->queue_mutex);
-     ovs_mutex_lock(&dp->queue_mutex);
-     for (i = 0; i < N_QUEUES; i++) {
-         dp->queues[i].head = dp->queues[i].tail = 0;
-     }
-     ovs_mutex_unlock(&dp->queue_mutex);
-     dp->queue_seq = seq_create();
+     fat_rwlock_init(&dp->queue_rwlock);
  
-     dp->n_hit = ovsthread_counter_create();
-     dp->n_missed = ovsthread_counter_create();
-     dp->n_lost = ovsthread_counter_create();
+     ovsthread_stats_init(&dp->stats);
  
      ovs_rwlock_init(&dp->port_rwlock);
      hmap_init(&dp->ports);
          dp_netdev_free(dp);
          return error;
      }
-     dp_netdev_set_threads(dp, 2);
  
      *dpp = dp;
      return 0;
@@@ -518,20 -528,21 +536,21 @@@ dpif_netdev_open(const struct dpif_clas
  
  static void
  dp_netdev_purge_queues(struct dp_netdev *dp)
+     OVS_REQ_WRLOCK(dp->queue_rwlock)
  {
      int i;
  
-     ovs_mutex_lock(&dp->queue_mutex);
-     for (i = 0; i < N_QUEUES; i++) {
-         struct dp_netdev_queue *q = &dp->queues[i];
+     for (i = 0; i < dp->n_handlers; i++) {
+         struct dp_netdev_queue *q = &dp->handler_queues[i];
  
+         ovs_mutex_lock(&q->mutex);
          while (q->tail != q->head) {
              struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
              ofpbuf_uninit(&u->upcall.packet);
              ofpbuf_uninit(&u->buf);
          }
+         ovs_mutex_unlock(&q->mutex);
      }
-     ovs_mutex_unlock(&dp->queue_mutex);
  }
  
  /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
@@@ -541,11 -552,13 +560,13 @@@ dp_netdev_free(struct dp_netdev *dp
      OVS_REQUIRES(dp_netdev_mutex)
  {
      struct dp_netdev_port *port, *next;
+     struct dp_netdev_stats *bucket;
+     int i;
  
      shash_find_and_delete(&dp_netdevs, dp->name);
  
-     dp_netdev_set_threads(dp, 0);
-     free(dp->forwarders);
+     dp_netdev_set_pmd_threads(dp, 0);
+     free(dp->pmd_threads);
  
      dp_netdev_flow_flush(dp);
      ovs_rwlock_wrlock(&dp->port_rwlock);
          do_del_port(dp, port->port_no);
      }
      ovs_rwlock_unlock(&dp->port_rwlock);
-     ovsthread_counter_destroy(dp->n_hit);
-     ovsthread_counter_destroy(dp->n_missed);
-     ovsthread_counter_destroy(dp->n_lost);
  
-     dp_netdev_purge_queues(dp);
-     seq_destroy(dp->queue_seq);
-     ovs_mutex_destroy(&dp->queue_mutex);
+     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
+         ovs_mutex_destroy(&bucket->mutex);
+         free_cacheline(bucket);
+     }
+     ovsthread_stats_destroy(&dp->stats);
+     fat_rwlock_wrlock(&dp->queue_rwlock);
+     dp_netdev_destroy_all_queues(dp);
+     fat_rwlock_unlock(&dp->queue_rwlock);
+     fat_rwlock_destroy(&dp->queue_rwlock);
  
      classifier_destroy(&dp->cls);
      hmap_destroy(&dp->flow_table);
      ovs_mutex_destroy(&dp->flow_mutex);
      seq_destroy(dp->port_seq);
      hmap_destroy(&dp->ports);
-     atomic_flag_destroy(&dp->destroyed);
-     ovs_refcount_destroy(&dp->ref_cnt);
      latch_destroy(&dp->exit_latch);
      free(CONST_CAST(char *, dp->name));
      free(dp);
@@@ -615,20 -631,40 +639,40 @@@ static in
  dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
  {
      struct dp_netdev *dp = get_dp_netdev(dpif);
+     struct dp_netdev_stats *bucket;
+     size_t i;
  
      fat_rwlock_rdlock(&dp->cls.rwlock);
      stats->n_flows = hmap_count(&dp->flow_table);
      fat_rwlock_unlock(&dp->cls.rwlock);
  
-     stats->n_hit = ovsthread_counter_read(dp->n_hit);
-     stats->n_missed = ovsthread_counter_read(dp->n_missed);
-     stats->n_lost = ovsthread_counter_read(dp->n_lost);
+     stats->n_hit = stats->n_missed = stats->n_lost = 0;
+     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
+         ovs_mutex_lock(&bucket->mutex);
+         stats->n_hit += bucket->n[DP_STAT_HIT];
+         stats->n_missed += bucket->n[DP_STAT_MISS];
+         stats->n_lost += bucket->n[DP_STAT_LOST];
+         ovs_mutex_unlock(&bucket->mutex);
+     }
      stats->n_masks = UINT32_MAX;
      stats->n_mask_hit = UINT64_MAX;
  
      return 0;
  }
  
+ static void
+ dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
+ {
+     int i;
+     for (i = 0; i < dp->n_pmd_threads; i++) {
+         struct pmd_thread *f = &dp->pmd_threads[i];
+         int id;
+         atomic_add(&f->change_seq, 1, &id);
+    }
+ }
  static int
  do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
              odp_port_t port_no)
      struct netdev_saved_flags *sf;
      struct dp_netdev_port *port;
      struct netdev *netdev;
-     struct netdev_rx *rx;
      enum netdev_flags flags;
      const char *open_type;
      int error;
+     int i;
  
      /* XXX reject devices already in some dp_netdev. */
  
          return EINVAL;
      }
  
-     error = netdev_rx_open(netdev, &rx);
-     if (error
-         && !(error == EOPNOTSUPP && dpif_netdev_class_is_dummy(dp->class))) {
-         VLOG_ERR("%s: cannot receive packets on this network device (%s)",
-                  devname, ovs_strerror(errno));
-         netdev_close(netdev);
-         return error;
+     port = xzalloc(sizeof *port);
+     port->port_no = port_no;
+     port->netdev = netdev;
+     port->rxq = xmalloc(sizeof *port->rxq * netdev_n_rxq(netdev));
+     port->type = xstrdup(type);
+     for (i = 0; i < netdev_n_rxq(netdev); i++) {
+         error = netdev_rxq_open(netdev, &port->rxq[i], i);
+         if (error
+             && !(error == EOPNOTSUPP && dpif_netdev_class_is_dummy(dp->class))) {
+             VLOG_ERR("%s: cannot receive packets on this network device (%s)",
+                      devname, ovs_strerror(errno));
+             netdev_close(netdev);
+             return error;
+         }
      }
  
      error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
      if (error) {
-         netdev_rx_close(rx);
+         for (i = 0; i < netdev_n_rxq(netdev); i++) {
+             netdev_rxq_close(port->rxq[i]);
+         }
          netdev_close(netdev);
+         free(port->rxq);
+         free(port);
          return error;
      }
-     port = xmalloc(sizeof *port);
-     port->port_no = port_no;
-     port->netdev = netdev;
      port->sf = sf;
-     port->rx = rx;
-     port->type = xstrdup(type);
+     if (netdev_is_pmd(netdev)) {
+         dp->pmd_count++;
+         dp_netdev_set_pmd_threads(dp, NR_THREADS);
+         dp_netdev_reload_pmd_threads(dp);
+     }
+     ovs_refcount_init(&port->ref_cnt);
  
      hmap_insert(&dp->ports, &port->node, hash_int(odp_to_u32(port_no), 0));
      seq_change(dp->port_seq);
@@@ -764,6 -812,31 +820,31 @@@ get_port_by_number(struct dp_netdev *dp
      }
  }
  
+ static void
+ port_ref(struct dp_netdev_port *port)
+ {
+     if (port) {
+         ovs_refcount_ref(&port->ref_cnt);
+     }
+ }
+ static void
+ port_unref(struct dp_netdev_port *port)
+ {
+     if (port && ovs_refcount_unref(&port->ref_cnt) == 1) {
+         int i;
+         netdev_close(port->netdev);
+         netdev_restore_flags(port->sf);
+         for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+             netdev_rxq_close(port->rxq[i]);
+         }
+         free(port->type);
+         free(port);
+     }
+ }
  static int
  get_port_by_name(struct dp_netdev *dp,
                   const char *devname, struct dp_netdev_port **portp)
@@@ -794,13 -867,11 +875,11 @@@ do_del_port(struct dp_netdev *dp, odp_p
  
      hmap_remove(&dp->ports, &port->node);
      seq_change(dp->port_seq);
+     if (netdev_is_pmd(port->netdev)) {
+         dp_netdev_reload_pmd_threads(dp);
+     }
  
-     netdev_close(port->netdev);
-     netdev_restore_flags(port->sf);
-     netdev_rx_close(port->rx);
-     free(port->type);
-     free(port);
+     port_unref(port);
      return 0;
  }
  
@@@ -849,6 -920,24 +928,24 @@@ dpif_netdev_port_query_by_name(const st
      return error;
  }
  
+ static void
+ dp_netdev_flow_free(struct dp_netdev_flow *flow)
+ {
+     struct dp_netdev_flow_stats *bucket;
+     size_t i;
+     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &flow->stats) {
+         ovs_mutex_destroy(&bucket->mutex);
+         free_cacheline(bucket);
+     }
+     ovsthread_stats_destroy(&flow->stats);
+     cls_rule_destroy(CONST_CAST(struct cls_rule *, &flow->cr));
+     dp_netdev_actions_free(dp_netdev_flow_get_actions(flow));
+     ovs_mutex_destroy(&flow->mutex);
+     free(flow);
+ }
  static void
  dp_netdev_remove_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
      OVS_REQ_WRLOCK(dp->cls.rwlock)
  
      classifier_remove(&dp->cls, cr);
      hmap_remove(&dp->flow_table, node);
-     dp_netdev_flow_unref(flow);
- }
- static struct dp_netdev_flow *
- dp_netdev_flow_ref(const struct dp_netdev_flow *flow_)
- {
-     struct dp_netdev_flow *flow = CONST_CAST(struct dp_netdev_flow *, flow_);
-     if (flow) {
-         ovs_refcount_ref(&flow->ref_cnt);
-     }
-     return flow;
- }
- static void
- dp_netdev_flow_unref(struct dp_netdev_flow *flow)
- {
-     if (flow && ovs_refcount_unref(&flow->ref_cnt) == 1) {
-         cls_rule_destroy(CONST_CAST(struct cls_rule *, &flow->cr));
-         ovs_mutex_lock(&flow->mutex);
-         dp_netdev_actions_unref(flow->actions);
-         ovs_refcount_destroy(&flow->ref_cnt);
-         ovs_mutex_unlock(&flow->mutex);
-         ovs_mutex_destroy(&flow->mutex);
-         free(flow);
-     }
+     ovsrcu_postpone(dp_netdev_flow_free, flow);
  }
  
  static void
@@@ -1002,7 -1067,6 +1075,6 @@@ dp_netdev_lookup_flow(const struct dp_n
  
      fat_rwlock_rdlock(&dp->cls.rwlock);
      netdev_flow = dp_netdev_flow_cast(classifier_lookup(&dp->cls, flow, NULL));
-     dp_netdev_flow_ref(netdev_flow);
      fat_rwlock_unlock(&dp->cls.rwlock);
  
      return netdev_flow;
@@@ -1017,7 -1081,7 +1089,7 @@@ dp_netdev_find_flow(const struct dp_net
      HMAP_FOR_EACH_WITH_HASH (netdev_flow, node, flow_hash(flow, 0),
                               &dp->flow_table) {
          if (flow_equal(&netdev_flow->flow, flow)) {
-             return dp_netdev_flow_ref(netdev_flow);
+             return netdev_flow;
          }
      }
  
  static void
  get_dpif_flow_stats(struct dp_netdev_flow *netdev_flow,
                      struct dpif_flow_stats *stats)
-     OVS_REQ_RDLOCK(netdev_flow->mutex)
  {
-     stats->n_packets = netdev_flow->packet_count;
-     stats->n_bytes = netdev_flow->byte_count;
-     stats->used = netdev_flow->used;
-     stats->tcp_flags = netdev_flow->tcp_flags;
+     struct dp_netdev_flow_stats *bucket;
+     size_t i;
+     memset(stats, 0, sizeof *stats);
+     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
+         ovs_mutex_lock(&bucket->mutex);
+         stats->n_packets += bucket->packet_count;
+         stats->n_bytes += bucket->byte_count;
+         stats->used = MAX(stats->used, bucket->used);
+         stats->tcp_flags |= bucket->tcp_flags;
+         ovs_mutex_unlock(&bucket->mutex);
+     }
  }
  
  static int
@@@ -1141,24 -1212,17 +1220,17 @@@ dpif_netdev_flow_get(const struct dpif 
      fat_rwlock_unlock(&dp->cls.rwlock);
  
      if (netdev_flow) {
-         struct dp_netdev_actions *actions = NULL;
-         ovs_mutex_lock(&netdev_flow->mutex);
          if (stats) {
              get_dpif_flow_stats(netdev_flow, stats);
          }
-         if (actionsp) {
-             actions = dp_netdev_actions_ref(netdev_flow->actions);
-         }
-         ovs_mutex_unlock(&netdev_flow->mutex);
-         dp_netdev_flow_unref(netdev_flow);
  
          if (actionsp) {
+             struct dp_netdev_actions *actions;
+             actions = dp_netdev_flow_get_actions(netdev_flow);
              *actionsp = ofpbuf_clone_data(actions->actions, actions->size);
-             dp_netdev_actions_unref(actions);
          }
-     } else {
+      } else {
          error = ENOENT;
      }
  
@@@ -1177,12 -1241,13 +1249,13 @@@ dp_netdev_flow_add(struct dp_netdev *dp
  
      netdev_flow = xzalloc(sizeof *netdev_flow);
      *CONST_CAST(struct flow *, &netdev_flow->flow) = *flow;
-     ovs_refcount_init(&netdev_flow->ref_cnt);
  
      ovs_mutex_init(&netdev_flow->mutex);
-     ovs_mutex_lock(&netdev_flow->mutex);
  
-     netdev_flow->actions = dp_netdev_actions_create(actions, actions_len);
+     ovsthread_stats_init(&netdev_flow->stats);
+     ovsrcu_set(&netdev_flow->actions,
+                dp_netdev_actions_create(actions, actions_len));
  
      match_init(&match, flow, wc);
      cls_rule_init(CONST_CAST(struct cls_rule *, &netdev_flow->cr),
                  flow_hash(flow, 0));
      fat_rwlock_unlock(&dp->cls.rwlock);
  
-     ovs_mutex_unlock(&netdev_flow->mutex);
      return 0;
  }
  
  static void
  clear_stats(struct dp_netdev_flow *netdev_flow)
-     OVS_REQUIRES(netdev_flow->mutex)
  {
-     netdev_flow->used = 0;
-     netdev_flow->packet_count = 0;
-     netdev_flow->byte_count = 0;
-     netdev_flow->tcp_flags = 0;
+     struct dp_netdev_flow_stats *bucket;
+     size_t i;
+     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
+         ovs_mutex_lock(&bucket->mutex);
+         bucket->used = 0;
+         bucket->packet_count = 0;
+         bucket->byte_count = 0;
+         bucket->tcp_flags = 0;
+         ovs_mutex_unlock(&bucket->mutex);
+     }
  }
  
  static int
@@@ -1255,25 -1324,23 +1332,23 @@@ dpif_netdev_flow_put(struct dpif *dpif
              new_actions = dp_netdev_actions_create(put->actions,
                                                     put->actions_len);
  
-             ovs_mutex_lock(&netdev_flow->mutex);
-             old_actions = netdev_flow->actions;
-             netdev_flow->actions = new_actions;
+             old_actions = dp_netdev_flow_get_actions(netdev_flow);
+             ovsrcu_set(&netdev_flow->actions, new_actions);
              if (put->stats) {
                  get_dpif_flow_stats(netdev_flow, put->stats);
              }
              if (put->flags & DPIF_FP_ZERO_STATS) {
                  clear_stats(netdev_flow);
              }
-             ovs_mutex_unlock(&netdev_flow->mutex);
  
-             dp_netdev_actions_unref(old_actions);
+             ovsrcu_postpone(dp_netdev_actions_free, old_actions);
          } else if (put->flags & DPIF_FP_CREATE) {
              error = EEXIST;
          } else {
              /* Overlapping flow. */
              error = EINVAL;
          }
-         dp_netdev_flow_unref(netdev_flow);
      }
      ovs_mutex_unlock(&dp->flow_mutex);
  
@@@ -1298,12 -1365,9 +1373,9 @@@ dpif_netdev_flow_del(struct dpif *dpif
      netdev_flow = dp_netdev_find_flow(dp, &key);
      if (netdev_flow) {
          if (del->stats) {
-             ovs_mutex_lock(&netdev_flow->mutex);
              get_dpif_flow_stats(netdev_flow, del->stats);
-             ovs_mutex_unlock(&netdev_flow->mutex);
          }
          dp_netdev_remove_flow(dp, netdev_flow);
-         dp_netdev_flow_unref(netdev_flow);
      } else {
          error = ENOENT;
      }
@@@ -1341,7 -1405,6 +1413,6 @@@ dpif_netdev_flow_dump_state_uninit(voi
  {
      struct dp_netdev_flow_state *state = state_;
  
-     dp_netdev_actions_unref(state->actions);
      free(state);
  }
  
@@@ -1358,6 -1421,7 +1429,7 @@@ dpif_netdev_flow_dump_start(const struc
      return 0;
  }
  
+ /* XXX the caller must use 'actions' without quiescing */
  static int
  dpif_netdev_flow_dump_next(const struct dpif *dpif, void *iter_, void *state_,
                             const struct nlattr **key, size_t *key_len,
          node = hmap_at_position(&dp->flow_table, &iter->bucket, &iter->offset);
          if (node) {
              netdev_flow = CONTAINER_OF(node, struct dp_netdev_flow, node);
-             dp_netdev_flow_ref(netdev_flow);
          }
          fat_rwlock_unlock(&dp->cls.rwlock);
          if (!node) {
      }
  
      if (actions || stats) {
-         dp_netdev_actions_unref(state->actions);
          state->actions = NULL;
  
-         ovs_mutex_lock(&netdev_flow->mutex);
          if (actions) {
-             state->actions = dp_netdev_actions_ref(netdev_flow->actions);
+             state->actions = dp_netdev_flow_get_actions(netdev_flow);
              *actions = state->actions->actions;
              *actions_len = state->actions->size;
          }
          if (stats) {
              get_dpif_flow_stats(netdev_flow, &state->stats);
              *stats = &state->stats;
          }
-         ovs_mutex_unlock(&netdev_flow->mutex);
      }
  
-     dp_netdev_flow_unref(netdev_flow);
      return 0;
  }
  
@@@ -1465,16 -1524,84 +1532,84 @@@ dpif_netdev_execute(struct dpif *dpif, 
      flow_extract(execute->packet, md, &key);
  
      ovs_rwlock_rdlock(&dp->port_rwlock);
-     dp_netdev_execute_actions(dp, &key, execute->packet, md, execute->actions,
-                               execute->actions_len);
+     dp_netdev_execute_actions(dp, &key, execute->packet, false, md,
+                               execute->actions, execute->actions_len);
      ovs_rwlock_unlock(&dp->port_rwlock);
  
      return 0;
  }
  
+ static void
+ dp_netdev_destroy_all_queues(struct dp_netdev *dp)
+     OVS_REQ_WRLOCK(dp->queue_rwlock)
+ {
+     size_t i;
+     dp_netdev_purge_queues(dp);
+     for (i = 0; i < dp->n_handlers; i++) {
+         struct dp_netdev_queue *q = &dp->handler_queues[i];
+         ovs_mutex_destroy(&q->mutex);
+         seq_destroy(q->seq);
+     }
+     free(dp->handler_queues);
+     dp->handler_queues = NULL;
+     dp->n_handlers = 0;
+ }
+ static void
+ dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
+     OVS_REQ_WRLOCK(dp->queue_rwlock)
+ {
+     if (dp->n_handlers != n_handlers) {
+         size_t i;
+         dp_netdev_destroy_all_queues(dp);
+         dp->n_handlers = n_handlers;
+         dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
+         for (i = 0; i < n_handlers; i++) {
+             struct dp_netdev_queue *q = &dp->handler_queues[i];
+             ovs_mutex_init(&q->mutex);
+             q->seq = seq_create();
+         }
+     }
+ }
  static int
- dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED)
+ dpif_netdev_recv_set(struct dpif *dpif, bool enable)
  {
+     struct dp_netdev *dp = get_dp_netdev(dpif);
+     if ((dp->handler_queues != NULL) == enable) {
+         return 0;
+     }
+     fat_rwlock_wrlock(&dp->queue_rwlock);
+     if (!enable) {
+         dp_netdev_destroy_all_queues(dp);
+     } else {
+         dp_netdev_refresh_queues(dp, 1);
+     }
+     fat_rwlock_unlock(&dp->queue_rwlock);
+     return 0;
+ }
+ static int
+ dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
+ {
+     struct dp_netdev *dp = get_dp_netdev(dpif);
+     fat_rwlock_wrlock(&dp->queue_rwlock);
+     if (dp->handler_queues) {
+         dp_netdev_refresh_queues(dp, n_handlers);
+     }
+     fat_rwlock_unlock(&dp->queue_rwlock);
      return 0;
  }
  
@@@ -1486,62 -1613,86 +1621,86 @@@ dpif_netdev_queue_to_priority(const str
      return 0;
  }
  
- static struct dp_netdev_queue *
find_nonempty_queue(struct dp_netdev *dp)
-     OVS_REQUIRES(dp->queue_mutex)
+ static bool
dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
+     OVS_REQ_RDLOCK(dp->queue_rwlock)
  {
-     int i;
+     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
  
-     for (i = 0; i < N_QUEUES; i++) {
-         struct dp_netdev_queue *q = &dp->queues[i];
-         if (q->head != q->tail) {
-             return q;
-         }
+     if (!dp->handler_queues) {
+         VLOG_WARN_RL(&rl, "receiving upcall disabled");
+         return false;
      }
-     return NULL;
+     if (handler_id >= dp->n_handlers) {
+         VLOG_WARN_RL(&rl, "handler index out of bound");
+         return false;
+     }
+     return true;
  }
  
  static int
- dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
-                  struct ofpbuf *buf)
+ dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
+                  struct dpif_upcall *upcall, struct ofpbuf *buf)
  {
      struct dp_netdev *dp = get_dp_netdev(dpif);
      struct dp_netdev_queue *q;
-     int error;
+     int error = 0;
+     fat_rwlock_rdlock(&dp->queue_rwlock);
  
-     ovs_mutex_lock(&dp->queue_mutex);
-     q = find_nonempty_queue(dp);
-     if (q) {
+     if (!dp_netdev_recv_check(dp, handler_id)) {
+         error = EAGAIN;
+         goto out;
+     }
+     q = &dp->handler_queues[handler_id];
+     ovs_mutex_lock(&q->mutex);
+     if (q->head != q->tail) {
          struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
  
          *upcall = u->upcall;
  
          ofpbuf_uninit(buf);
          *buf = u->buf;
-         error = 0;
      } else {
          error = EAGAIN;
      }
-     ovs_mutex_unlock(&dp->queue_mutex);
+     ovs_mutex_unlock(&q->mutex);
+ out:
+     fat_rwlock_unlock(&dp->queue_rwlock);
  
      return error;
  }
  
  static void
- dpif_netdev_recv_wait(struct dpif *dpif)
+ dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
  {
      struct dp_netdev *dp = get_dp_netdev(dpif);
+     struct dp_netdev_queue *q;
      uint64_t seq;
  
-     ovs_mutex_lock(&dp->queue_mutex);
-     seq = seq_read(dp->queue_seq);
-     if (find_nonempty_queue(dp)) {
+     fat_rwlock_rdlock(&dp->queue_rwlock);
+     if (!dp_netdev_recv_check(dp, handler_id)) {
+         goto out;
+     }
+     q = &dp->handler_queues[handler_id];
+     ovs_mutex_lock(&q->mutex);
+     seq = seq_read(q->seq);
+     if (q->head != q->tail) {
          poll_immediate_wake();
      } else {
-         seq_wait(dp->queue_seq, seq);
+         seq_wait(q->seq, seq);
      }
-     ovs_mutex_unlock(&dp->queue_mutex);
+     ovs_mutex_unlock(&q->mutex);
+ out:
+     fat_rwlock_unlock(&dp->queue_rwlock);
  }
  
  static void
@@@ -1549,7 -1700,9 +1708,9 @@@ dpif_netdev_recv_purge(struct dpif *dpi
  {
      struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
  
+     fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
      dp_netdev_purge_queues(dpif_netdev->dp);
+     fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
  }
  \f
  /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
@@@ -1561,178 -1714,291 +1722,291 @@@ dp_netdev_actions_create(const struct n
      struct dp_netdev_actions *netdev_actions;
  
      netdev_actions = xmalloc(sizeof *netdev_actions);
-     ovs_refcount_init(&netdev_actions->ref_cnt);
      netdev_actions->actions = xmemdup(actions, size);
      netdev_actions->size = size;
  
      return netdev_actions;
  }
  
- /* Increments 'actions''s refcount. */
  struct dp_netdev_actions *
- dp_netdev_actions_ref(const struct dp_netdev_actions *actions_)
+ dp_netdev_flow_get_actions(const struct dp_netdev_flow *flow)
  {
-     struct dp_netdev_actions *actions;
+     return ovsrcu_get(struct dp_netdev_actions *, &flow->actions);
+ }
+ static void
+ dp_netdev_actions_free(struct dp_netdev_actions *actions)
+ {
+     free(actions->actions);
+     free(actions);
+ }
\f
+ static void
+ dp_netdev_process_rxq_port(struct dp_netdev *dp,
+                           struct dp_netdev_port *port,
+                           struct netdev_rxq *rxq)
+ {
+     struct ofpbuf *packet[NETDEV_MAX_RX_BATCH];
+     int error, c;
+     error = netdev_rxq_recv(rxq, packet, &c);
+     if (!error) {
+         struct pkt_metadata md = PKT_METADATA_INITIALIZER(port->port_no);
+         int i;
  
-     actions = CONST_CAST(struct dp_netdev_actions *, actions_);
-     if (actions) {
-         ovs_refcount_ref(&actions->ref_cnt);
+         for (i = 0; i < c; i++) {
+             dp_netdev_port_input(dp, packet[i], &md);
+         }
+     } else if (error != EAGAIN && error != EOPNOTSUPP) {
+         static struct vlog_rate_limit rl
+             = VLOG_RATE_LIMIT_INIT(1, 5);
+         VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
+                     netdev_get_name(port->netdev),
+                     ovs_strerror(error));
      }
-     return actions;
  }
  
- /* Decrements 'actions''s refcount and frees 'actions' if the refcount reaches
-  * 0. */
- void
- dp_netdev_actions_unref(struct dp_netdev_actions *actions)
+ static void
+ dpif_netdev_run(struct dpif *dpif)
  {
-     if (actions && ovs_refcount_unref(&actions->ref_cnt) == 1) {
-         ovs_refcount_destroy(&actions->ref_cnt);
-         free(actions->actions);
-         free(actions);
+     struct dp_netdev_port *port;
+     struct dp_netdev *dp = get_dp_netdev(dpif);
+     ovs_rwlock_rdlock(&dp->port_rwlock);
+     HMAP_FOR_EACH (port, node, &dp->ports) {
+         if (!netdev_is_pmd(port->netdev)) {
+             int i;
+             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+                 dp_netdev_process_rxq_port(dp, port, port->rxq[i]);
+             }
+         }
      }
+     ovs_rwlock_unlock(&dp->port_rwlock);
  }
\f
+ static void
+ dpif_netdev_wait(struct dpif *dpif)
+ {
+     struct dp_netdev_port *port;
+     struct dp_netdev *dp = get_dp_netdev(dpif);
+     ovs_rwlock_rdlock(&dp->port_rwlock);
+     HMAP_FOR_EACH (port, node, &dp->ports) {
+         if (!netdev_is_pmd(port->netdev)) {
+             int i;
+             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+                 netdev_rxq_wait(port->rxq[i]);
+             }
+         }
+     }
+     ovs_rwlock_unlock(&dp->port_rwlock);
+ }
+ struct rxq_poll {
+     struct dp_netdev_port *port;
+     struct netdev_rxq *rx;
+ };
+ static int
+ pmd_load_queues(struct pmd_thread *f,
+                 struct rxq_poll **ppoll_list, int poll_cnt)
+ {
+     struct dp_netdev *dp = f->dp;
+     struct rxq_poll *poll_list = *ppoll_list;
+     struct dp_netdev_port *port;
+     int id = f->id;
+     int index;
+     int i;
+     /* Simple scheduler for netdev rx polling. */
+     ovs_rwlock_rdlock(&dp->port_rwlock);
+     for (i = 0; i < poll_cnt; i++) {
+          port_unref(poll_list[i].port);
+     }
+     poll_cnt = 0;
+     index = 0;
+     HMAP_FOR_EACH (port, node, &f->dp->ports) {
+         if (netdev_is_pmd(port->netdev)) {
+             int i;
+             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+                 if ((index % dp->n_pmd_threads) == id) {
+                     poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
+                     port_ref(port);
+                     poll_list[poll_cnt].port = port;
+                     poll_list[poll_cnt].rx = port->rxq[i];
+                     poll_cnt++;
+                 }
+                 index++;
+             }
+         }
+     }
+     ovs_rwlock_unlock(&dp->port_rwlock);
+     *ppoll_list = poll_list;
+     return poll_cnt;
+ }
  static void *
dp_forwarder_main(void *f_)
pmd_thread_main(void *f_)
  {
-     struct dp_forwarder *f = f_;
+     struct pmd_thread *f = f_;
      struct dp_netdev *dp = f->dp;
-     struct ofpbuf packet;
+     unsigned int lc = 0;
+     struct rxq_poll *poll_list;
+     unsigned int port_seq;
+     int poll_cnt;
+     int i;
  
-     f->name = xasprintf("forwarder_%u", ovsthread_id_self());
+     f->name = xasprintf("pmd_%u", ovsthread_id_self());
      set_subprogram_name("%s", f->name);
+     poll_cnt = 0;
+     poll_list = NULL;
+     pmd_thread_setaffinity_cpu(f->id);
+ reload:
+     poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
+     atomic_read(&f->change_seq, &port_seq);
  
-     ofpbuf_init(&packet, 0);
-     while (!latch_is_set(&dp->exit_latch)) {
-         bool received_anything;
+     for (;;) {
+         unsigned int c_port_seq;
          int i;
  
-         ovs_rwlock_rdlock(&dp->port_rwlock);
-         for (i = 0; i < 50; i++) {
-             struct dp_netdev_port *port;
-             received_anything = false;
-             HMAP_FOR_EACH (port, node, &f->dp->ports) {
-                 if (port->rx
-                     && port->node.hash >= f->min_hash
-                     && port->node.hash <= f->max_hash) {
-                     int buf_size;
-                     int error;
-                     int mtu;
-                     if (netdev_get_mtu(port->netdev, &mtu)) {
-                         mtu = ETH_PAYLOAD_MAX;
-                     }
-                     buf_size = DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + mtu;
-                     ofpbuf_clear(&packet);
-                     ofpbuf_reserve_with_tailroom(&packet, DP_NETDEV_HEADROOM,
-                                                  buf_size);
-                     error = netdev_rx_recv(port->rx, &packet);
-                     if (!error) {
-                         struct pkt_metadata md
-                             = PKT_METADATA_INITIALIZER(port->port_no);
-                         dp_netdev_port_input(dp, &packet, &md);
-                         received_anything = true;
-                     } else if (error != EAGAIN && error != EOPNOTSUPP) {
-                         static struct vlog_rate_limit rl
-                             = VLOG_RATE_LIMIT_INIT(1, 5);
-                         VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
-                                     netdev_get_name(port->netdev),
-                                     ovs_strerror(error));
-                     }
-                 }
-             }
+         for (i = 0; i < poll_cnt; i++) {
+             dp_netdev_process_rxq_port(dp,  poll_list[i].port, poll_list[i].rx);
+         }
+         if (lc++ > 1024) {
+             ovsrcu_quiesce();
  
-             if (!received_anything) {
+             /* TODO: need completely userspace based signaling method.
+              * to keep this thread entirely in userspace.
+              * For now using atomic counter. */
+             lc = 0;
+             atomic_read_explicit(&f->change_seq, &c_port_seq, memory_order_consume);
+             if (c_port_seq != port_seq) {
                  break;
              }
          }
+     }
  
-         if (received_anything) {
-             poll_immediate_wake();
-         } else {
-             struct dp_netdev_port *port;
-             HMAP_FOR_EACH (port, node, &f->dp->ports)
-                 if (port->rx
-                     && port->node.hash >= f->min_hash
-                     && port->node.hash <= f->max_hash) {
-                     netdev_rx_wait(port->rx);
-                 }
-             seq_wait(dp->port_seq, seq_read(dp->port_seq));
-             latch_wait(&dp->exit_latch);
-         }
-         ovs_rwlock_unlock(&dp->port_rwlock);
+     if (!latch_is_set(&f->dp->exit_latch)){
+         goto reload;
+     }
  
-         poll_block();
+     for (i = 0; i < poll_cnt; i++) {
+          port_unref(poll_list[i].port);
      }
-     ofpbuf_uninit(&packet);
  
+     free(poll_list);
      free(f->name);
      return NULL;
  }
  
  static void
- dp_netdev_set_threads(struct dp_netdev *dp, int n)
+ dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
  {
      int i;
  
-     if (n == dp->n_forwarders) {
+     if (n == dp->n_pmd_threads) {
          return;
      }
  
      /* Stop existing threads. */
      latch_set(&dp->exit_latch);
-     for (i = 0; i < dp->n_forwarders; i++) {
-         struct dp_forwarder *f = &dp->forwarders[i];
+     dp_netdev_reload_pmd_threads(dp);
+     for (i = 0; i < dp->n_pmd_threads; i++) {
+         struct pmd_thread *f = &dp->pmd_threads[i];
  
          xpthread_join(f->thread, NULL);
      }
      latch_poll(&dp->exit_latch);
-     free(dp->forwarders);
+     free(dp->pmd_threads);
  
      /* Start new threads. */
-     dp->forwarders = xmalloc(n * sizeof *dp->forwarders);
-     dp->n_forwarders = n;
+     dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
+     dp->n_pmd_threads = n;
      for (i = 0; i < n; i++) {
-         struct dp_forwarder *f = &dp->forwarders[i];
+         struct pmd_thread *f = &dp->pmd_threads[i];
  
          f->dp = dp;
-         f->min_hash = UINT32_MAX / n * i;
-         f->max_hash = UINT32_MAX / n * (i + 1) - 1;
-         if (i == n - 1) {
-             f->max_hash = UINT32_MAX;
-         }
-         xpthread_create(&f->thread, NULL, dp_forwarder_main, f);
+         f->id = i;
+         atomic_store(&f->change_seq, 1);
+         /* Each thread will distribute all devices rx-queues among
+          * themselves. */
+         xpthread_create(&f->thread, NULL, pmd_thread_main, f);
      }
  }
  \f
+ static void *
+ dp_netdev_flow_stats_new_cb(void)
+ {
+     struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket);
+     ovs_mutex_init(&bucket->mutex);
+     return bucket;
+ }
  static void
  dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
-                     const struct ofpbuf *packet)
-     OVS_REQUIRES(netdev_flow->mutex)
+                     const struct ofpbuf *packet,
+                     const struct flow *key)
+ {
+     uint16_t tcp_flags = ntohs(key->tcp_flags);
+     long long int now = time_msec();
+     struct dp_netdev_flow_stats *bucket;
+     bucket = ovsthread_stats_bucket_get(&netdev_flow->stats,
+                                         dp_netdev_flow_stats_new_cb);
+     ovs_mutex_lock(&bucket->mutex);
+     bucket->used = MAX(now, bucket->used);
+     bucket->packet_count++;
+     bucket->byte_count += packet->size;
+     bucket->tcp_flags |= tcp_flags;
+     ovs_mutex_unlock(&bucket->mutex);
+ }
+ static void *
+ dp_netdev_stats_new_cb(void)
+ {
+     struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
+     ovs_mutex_init(&bucket->mutex);
+     return bucket;
+ }
+ static void
+ dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type)
  {
-     netdev_flow->used = time_msec();
-     netdev_flow->packet_count++;
-     netdev_flow->byte_count += packet->size;
-     netdev_flow->tcp_flags |= packet_get_tcp_flags(packet, &netdev_flow->flow);
+     struct dp_netdev_stats *bucket;
+     bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
+     ovs_mutex_lock(&bucket->mutex);
+     bucket->n[type]++;
+     ovs_mutex_unlock(&bucket->mutex);
  }
  
  static void
  dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
                       struct pkt_metadata *md)
-     OVS_REQ_RDLOCK(dp->port_rwlock)
  {
      struct dp_netdev_flow *netdev_flow;
      struct flow key;
  
      if (packet->size < ETH_HEADER_LEN) {
+         ofpbuf_delete(packet);
          return;
      }
      flow_extract(packet, md, &key);
      if (netdev_flow) {
          struct dp_netdev_actions *actions;
  
-         ovs_mutex_lock(&netdev_flow->mutex);
-         dp_netdev_flow_used(netdev_flow, packet);
-         actions = dp_netdev_actions_ref(netdev_flow->actions);
-         ovs_mutex_unlock(&netdev_flow->mutex);
+         dp_netdev_flow_used(netdev_flow, packet, &key);
  
-         dp_netdev_execute_actions(dp, &key, packet, md,
+         actions = dp_netdev_flow_get_actions(netdev_flow);
+         dp_netdev_execute_actions(dp, &key, packet, true, md,
                                    actions->actions, actions->size);
-         dp_netdev_actions_unref(actions);
-         dp_netdev_flow_unref(netdev_flow);
-         ovsthread_counter_inc(dp->n_hit, 1);
-     } else {
-         ovsthread_counter_inc(dp->n_missed, 1);
-         dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, NULL);
+         dp_netdev_count_packet(dp, DP_STAT_HIT);
+     } else if (dp->handler_queues) {
+         dp_netdev_count_packet(dp, DP_STAT_MISS);
+         dp_netdev_output_userspace(dp, packet,
+                                    flow_hash_5tuple(&key, 0) % dp->n_handlers,
+                                    DPIF_UC_MISS, &key, NULL);
+         ofpbuf_delete(packet);
      }
  }
  
  static int
  dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
-                            int queue_no, const struct flow *flow,
+                            int queue_no, int type, const struct flow *flow,
                             const struct nlattr *userdata)
-     OVS_EXCLUDED(dp->queue_mutex)
  {
-     struct dp_netdev_queue *q = &dp->queues[queue_no];
+     struct dp_netdev_queue *q;
      int error;
  
-     ovs_mutex_lock(&dp->queue_mutex);
+     fat_rwlock_rdlock(&dp->queue_rwlock);
+     q = &dp->handler_queues[queue_no];
+     ovs_mutex_lock(&q->mutex);
      if (q->head - q->tail < MAX_QUEUE_LEN) {
          struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
          struct dpif_upcall *upcall = &u->upcall;
          struct ofpbuf *buf = &u->buf;
          size_t buf_size;
  
-         upcall->type = queue_no;
+         upcall->type = type;
  
          /* Allocate buffer big enough for everything. */
          buf_size = ODPUTIL_FLOW_KEY_BYTES;
          if (userdata) {
              buf_size += NLA_ALIGN(userdata->nla_len);
          }
+         buf_size += packet->size;
          ofpbuf_init(buf, buf_size);
  
          /* Put ODP flow. */
                                            NLA_ALIGN(userdata->nla_len));
          }
  
-         /* Steal packet data. */
-         ovs_assert(packet->source == OFPBUF_MALLOC);
-         upcall->packet = *packet;
-         ofpbuf_use(packet, NULL, 0);
+         upcall->packet.data = ofpbuf_put(buf, packet->data, packet->size);
+         upcall->packet.size = packet->size;
  
-         seq_change(dp->queue_seq);
+         seq_change(q->seq);
  
          error = 0;
      } else {
-         ovsthread_counter_inc(dp->n_lost, 1);
+         dp_netdev_count_packet(dp, DP_STAT_LOST);
          error = ENOBUFS;
      }
-     ovs_mutex_unlock(&dp->queue_mutex);
+     ovs_mutex_unlock(&q->mutex);
+     fat_rwlock_unlock(&dp->queue_rwlock);
  
      return error;
  }
@@@ -1816,7 -2082,7 +2090,7 @@@ struct dp_netdev_execute_aux 
  
  static void
  dp_execute_cb(void *aux_, struct ofpbuf *packet,
-               const struct pkt_metadata *md OVS_UNUSED,
+               struct pkt_metadata *md,
                const struct nlattr *a, bool may_steal)
      OVS_NO_THREAD_SAFETY_ANALYSIS
  {
      case OVS_ACTION_ATTR_OUTPUT:
          p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a)));
          if (p) {
-             netdev_send(p->netdev, packet);
+             netdev_send(p->netdev, packet, may_steal);
          }
          break;
  
  
          userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
  
-         /* Make a copy if we are not allowed to steal the packet's data. */
-         if (!may_steal) {
-             packet = ofpbuf_clone_with_headroom(packet, DP_NETDEV_HEADROOM);
-         }
-         dp_netdev_output_userspace(aux->dp, packet, DPIF_UC_ACTION, aux->key,
+         dp_netdev_output_userspace(aux->dp, packet,
+                                    flow_hash_5tuple(aux->key, 0)
+                                        % aux->dp->n_handlers,
+                                    DPIF_UC_ACTION, aux->key,
                                     userdata);
-         if (!may_steal) {
-             ofpbuf_uninit(packet);
+         if (may_steal) {
+             ofpbuf_delete(packet);
          }
          break;
      }
+     case OVS_ACTION_ATTR_RECIRC: {
+         const struct ovs_action_recirc *act;
+         act = nl_attr_get(a);
+         md->recirc_id = act->recirc_id;
+         md->dp_hash = 0;
+         if (act->hash_alg == OVS_RECIRC_HASH_ALG_L4) {
+             struct flow flow;
+             flow_extract(packet, md, &flow);
+             md->dp_hash = flow_hash_symmetric_l4(&flow, act->hash_bias);
+         }
+         dp_netdev_port_input(aux->dp, packet, md);
+         break;
+     }
      case OVS_ACTION_ATTR_PUSH_VLAN:
      case OVS_ACTION_ATTR_POP_VLAN:
      case OVS_ACTION_ATTR_PUSH_MPLS:
      case __OVS_ACTION_ATTR_MAX:
          OVS_NOT_REACHED();
      }
  }
  
  static void
  dp_netdev_execute_actions(struct dp_netdev *dp, const struct flow *key,
-                           struct ofpbuf *packet, struct pkt_metadata *md,
+                           struct ofpbuf *packet, bool may_steal,
+                           struct pkt_metadata *md,
                            const struct nlattr *actions, size_t actions_len)
-     OVS_REQ_RDLOCK(dp->port_rwlock)
  {
      struct dp_netdev_execute_aux aux = {dp, key};
  
-     odp_execute_actions(&aux, packet, md, actions, actions_len, dp_execute_cb);
+     odp_execute_actions(&aux, packet, may_steal, md,
+                         actions, actions_len, dp_execute_cb);
  }
  
-     NULL,                                     \
-     NULL,                                     \
 +#define DPIF_NETDEV_CLASS_FUNCTIONS                   \
 +    dpif_netdev_enumerate,                            \
 +    dpif_netdev_port_open_type,                               \
 +    dpif_netdev_open,                                 \
 +    dpif_netdev_close,                                        \
 +    dpif_netdev_destroy,                              \
++    dpif_netdev_run,                                  \
++    dpif_netdev_wait,                                 \
 +    dpif_netdev_get_stats,                            \
 +    dpif_netdev_port_add,                             \
 +    dpif_netdev_port_del,                             \
 +    dpif_netdev_port_query_by_number,                 \
 +    dpif_netdev_port_query_by_name,                   \
 +    NULL,                       /* port_get_pid */    \
 +    dpif_netdev_port_dump_start,                      \
 +    dpif_netdev_port_dump_next,                               \
 +    dpif_netdev_port_dump_done,                               \
 +    dpif_netdev_port_poll,                            \
 +    dpif_netdev_port_poll_wait,                               \
 +    dpif_netdev_flow_get,                             \
 +    dpif_netdev_flow_put,                             \
 +    dpif_netdev_flow_del,                             \
 +    dpif_netdev_flow_flush,                           \
 +    dpif_netdev_flow_dump_state_init,   \
 +    dpif_netdev_flow_dump_start,                      \
 +    dpif_netdev_flow_dump_next,                               \
 +    NULL,                                   \
 +    dpif_netdev_flow_dump_done,                               \
 +    dpif_netdev_flow_dump_state_uninit,     \
 +    dpif_netdev_execute,                              \
 +    NULL,                       /* operate */         \
 +    dpif_netdev_recv_set,                             \
++    dpif_netdev_handlers_set,           \
 +    dpif_netdev_queue_to_priority,                    \
 +    dpif_netdev_recv,                                 \
 +    dpif_netdev_recv_wait,                            \
 +    dpif_netdev_recv_purge,                           \
 +
  const struct dpif_class dpif_netdev_class = {
      "netdev",
 -    dpif_netdev_enumerate,
 -    dpif_netdev_port_open_type,
 -    dpif_netdev_open,
 -    dpif_netdev_close,
 -    dpif_netdev_destroy,
 -    dpif_netdev_run,
 -    dpif_netdev_wait,
 -    dpif_netdev_get_stats,
 -    dpif_netdev_port_add,
 -    dpif_netdev_port_del,
 -    dpif_netdev_port_query_by_number,
 -    dpif_netdev_port_query_by_name,
 -    NULL,                       /* port_get_pid */
 -    dpif_netdev_port_dump_start,
 -    dpif_netdev_port_dump_next,
 -    dpif_netdev_port_dump_done,
 -    dpif_netdev_port_poll,
 -    dpif_netdev_port_poll_wait,
 -    dpif_netdev_flow_get,
 -    dpif_netdev_flow_put,
 -    dpif_netdev_flow_del,
 -    dpif_netdev_flow_flush,
 -    dpif_netdev_flow_dump_state_init,
 -    dpif_netdev_flow_dump_start,
 -    dpif_netdev_flow_dump_next,
 -    NULL,
 -    dpif_netdev_flow_dump_done,
 -    dpif_netdev_flow_dump_state_uninit,
 -    dpif_netdev_execute,
 -    NULL,                       /* operate */
 -    dpif_netdev_recv_set,
 -    dpif_netdev_handlers_set,
 -    dpif_netdev_queue_to_priority,
 -    dpif_netdev_recv,
 -    dpif_netdev_recv_wait,
 -    dpif_netdev_recv_purge,
 +    DPIF_NETDEV_CLASS_FUNCTIONS
 +};
 +
 +const struct dpif_class dpif_planetlab_class = {
 +    "planetlab",
 +    DPIF_NETDEV_CLASS_FUNCTIONS
  };
  
  static void
@@@ -1996,4 -2276,3 +2292,4 @@@ dpif_dummy_register(bool override
                               "DP PORT NEW-NUMBER",
                               3, 3, dpif_dummy_change_port_number, NULL);
  }
 +
diff --combined lib/dpif-provider.h
@@@ -146,7 -146,16 +146,16 @@@ struct dpif_class 
  
      /* Returns the Netlink PID value to supply in OVS_ACTION_ATTR_USERSPACE
       * actions as the OVS_USERSPACE_ATTR_PID attribute's value, for use in
-      * flows whose packets arrived on port 'port_no'.
+      * flows whose packets arrived on port 'port_no'.  In the case where the
+      * provider allocates multiple Netlink PIDs to a single port, it may use
+      * 'hash' to spread load among them.  The caller need not use a particular
+      * hash function; a 5-tuple hash is suitable.
+      *
+      * (The datapath implementation might use some different hash function for
+      * distributing packets received via flow misses among PIDs.  This means
+      * that packets received via flow misses might be reordered relative to
+      * packets received via userspace actions.  This is not ordinarily a
+      * problem.)
       *
       * A 'port_no' of UINT32_MAX should be treated as a special case.  The
       * implementation should return a reserved PID, not allocated to any port,
       *
       * A dpif provider that doesn't have meaningful Netlink PIDs can use NULL
       * for this function.  This is equivalent to always returning 0. */
-     uint32_t (*port_get_pid)(const struct dpif *dpif, odp_port_t port_no);
+     uint32_t (*port_get_pid)(const struct dpif *dpif, odp_port_t port_no,
+                              uint32_t hash);
  
      /* Attempts to begin dumping the ports in a dpif.  On success, returns 0
       * and initializes '*statep' with any data needed for iteration.  On
       * updating flows as necessary if it does this. */
      int (*recv_set)(struct dpif *dpif, bool enable);
  
+     /* Refreshes the poll loops and Netlink sockets associated to each port,
+      * when the number of upcall handlers (upcall receiving thread) is changed
+      * to 'n_handlers' and receiving packets for 'dpif' is enabled by
+      * recv_set().
+      *
+      * Since multiple upcall handlers can read upcalls simultaneously from
+      * 'dpif', each port can have multiple Netlink sockets, one per upcall
+      * handler.  So, handlers_set() is responsible for the following tasks:
+      *
+      *    When receiving upcall is enabled, extends or creates the
+      *    configuration to support:
+      *
+      *        - 'n_handlers' Netlink sockets for each port.
+      *
+      *        - 'n_handlers' poll loops, one for each upcall handler.
+      *
+      *        - registering the Netlink sockets for the same upcall handler to
+      *          the corresponding poll loop.
+      * */
+     int (*handlers_set)(struct dpif *dpif, uint32_t n_handlers);
      /* Translates OpenFlow queue ID 'queue_id' (in host byte order) into a
       * priority value used for setting packet priority. */
      int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,
                               uint32_t *priority);
  
-     /* Polls for an upcall from 'dpif'.  If successful, stores the upcall into
-      * '*upcall', using 'buf' for storage.  Should only be called if 'recv_set'
-      * has been used to enable receiving packets from 'dpif'.
+     /* Polls for an upcall from 'dpif' for an upcall handler.  Since there
+      * can be multiple poll loops (see ->handlers_set()), 'handler_id' is
+      * needed as index to identify the corresponding poll loop.  If
+      * successful, stores the upcall into '*upcall', using 'buf' for
+      * storage.  Should only be called if 'recv_set' has been used to enable
+      * receiving packets from 'dpif'.
       *
       * The implementation should point 'upcall->key' and 'upcall->userdata'
       * (if any) into data in the caller-provided 'buf'.  The implementation may
       *
       * This function must not block.  If no upcall is pending when it is
       * called, it should return EAGAIN without blocking. */
-     int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall,
-                 struct ofpbuf *buf);
-     /* Arranges for the poll loop to wake up when 'dpif' has a message queued
-      * to be received with the recv member function. */
-     void (*recv_wait)(struct dpif *dpif);
+     int (*recv)(struct dpif *dpif, uint32_t handler_id,
+                 struct dpif_upcall *upcall, struct ofpbuf *buf);
+     /* Arranges for the poll loop for an upcall handler to wake up when 'dpif'
+      * has a message queued to be received with the recv member functions.
+      * Since there can be multiple poll loops (see ->handlers_set()),
+      * 'handler_id' is needed as index to identify the corresponding poll loop.
+      * */
+     void (*recv_wait)(struct dpif *dpif, uint32_t handler_id);
  
      /* Throws away any queued upcalls that 'dpif' currently has ready to
       * return. */
  
  extern const struct dpif_class dpif_linux_class;
  extern const struct dpif_class dpif_netdev_class;
 +extern const struct dpif_class dpif_planetlab_class;
  
  #ifdef  __cplusplus
  }
diff --combined lib/dpif.c
@@@ -61,7 -61,6 +61,7 @@@ static const struct dpif_class *base_dp
      &dpif_linux_class,
  #endif
      &dpif_netdev_class,
 +    &dpif_planetlab_class,
  };
  
  struct registered_dpif_class {
@@@ -633,9 -632,18 +633,18 @@@ dpif_port_query_by_name(const struct dp
      return error;
  }
  
- /* Returns the Netlink PID value to supply in OVS_ACTION_ATTR_USERSPACE actions
-  * as the OVS_USERSPACE_ATTR_PID attribute's value, for use in flows whose
-  * packets arrived on port 'port_no'.
+ /* Returns the Netlink PID value to supply in OVS_ACTION_ATTR_USERSPACE
+  * actions as the OVS_USERSPACE_ATTR_PID attribute's value, for use in
+  * flows whose packets arrived on port 'port_no'.  In the case where the
+  * provider allocates multiple Netlink PIDs to a single port, it may use
+  * 'hash' to spread load among them.  The caller need not use a particular
+  * hash function; a 5-tuple hash is suitable.
+  *
+  * (The datapath implementation might use some different hash function for
+  * distributing packets received via flow misses among PIDs.  This means
+  * that packets received via flow misses might be reordered relative to
+  * packets received via userspace actions.  This is not ordinarily a
+  * problem.)
   *
   * A 'port_no' of ODPP_NONE is a special case: it returns a reserved PID, not
   * allocated to any port, that the client may use for special purposes.
   * update all of the flows that it installed that contain
   * OVS_ACTION_ATTR_USERSPACE actions. */
  uint32_t
- dpif_port_get_pid(const struct dpif *dpif, odp_port_t port_no)
+ dpif_port_get_pid(const struct dpif *dpif, odp_port_t port_no, uint32_t hash)
  {
      return (dpif->dpif_class->port_get_pid
-             ? (dpif->dpif_class->port_get_pid)(dpif, port_no)
+             ? (dpif->dpif_class->port_get_pid)(dpif, port_no, hash)
              : 0);
  }
  
@@@ -779,7 -787,7 +788,7 @@@ voi
  dpif_flow_stats_extract(const struct flow *flow, const struct ofpbuf *packet,
                          long long int used, struct dpif_flow_stats *stats)
  {
-     stats->tcp_flags = packet_get_tcp_flags(packet, flow);
+     stats->tcp_flags = ntohs(flow->tcp_flags);
      stats->n_bytes = packet->size;
      stats->n_packets = 1;
      stats->used = used;
@@@ -1100,7 -1108,7 +1109,7 @@@ struct dpif_execute_helper_aux 
   * meaningful. */
  static void
  dpif_execute_helper_cb(void *aux_, struct ofpbuf *packet,
-                        const struct pkt_metadata *md,
+                        struct pkt_metadata *md,
                         const struct nlattr *action, bool may_steal OVS_UNUSED)
  {
      struct dpif_execute_helper_aux *aux = aux_;
      case OVS_ACTION_ATTR_SET:
      case OVS_ACTION_ATTR_SAMPLE:
      case OVS_ACTION_ATTR_UNSPEC:
+     case OVS_ACTION_ATTR_RECIRC:
      case __OVS_ACTION_ATTR_MAX:
          OVS_NOT_REACHED();
      }
@@@ -1142,7 -1151,7 +1152,7 @@@ dpif_execute_with_help(struct dpif *dpi
  
      COVERAGE_INC(dpif_execute_with_help);
  
-     odp_execute_actions(&aux, execute->packet, &execute->md,
+     odp_execute_actions(&aux, execute->packet, false, &execute->md,
                          execute->actions, execute->actions_len,
                          dpif_execute_helper_cb);
      return aux.error;
@@@ -1295,9 -1304,39 +1305,39 @@@ dpif_recv_set(struct dpif *dpif, bool e
      return error;
  }
  
- /* Polls for an upcall from 'dpif'.  If successful, stores the upcall into
-  * '*upcall', using 'buf' for storage.  Should only be called if
-  * dpif_recv_set() has been used to enable receiving packets on 'dpif'.
+ /* Refreshes the poll loops and Netlink sockets associated to each port,
+  * when the number of upcall handlers (upcall receiving thread) is changed
+  * to 'n_handlers' and receiving packets for 'dpif' is enabled by
+  * recv_set().
+  *
+  * Since multiple upcall handlers can read upcalls simultaneously from
+  * 'dpif', each port can have multiple Netlink sockets, one per upcall
+  * handler.  So, handlers_set() is responsible for the following tasks:
+  *
+  *    When receiving upcall is enabled, extends or creates the
+  *    configuration to support:
+  *
+  *        - 'n_handlers' Netlink sockets for each port.
+  *
+  *        - 'n_handlers' poll loops, one for each upcall handler.
+  *
+  *        - registering the Netlink sockets for the same upcall handler to
+  *          the corresponding poll loop.
+  *
+  * Returns 0 if successful, otherwise a positive errno value. */
+ int
+ dpif_handlers_set(struct dpif *dpif, uint32_t n_handlers)
+ {
+     int error = dpif->dpif_class->handlers_set(dpif, n_handlers);
+     log_operation(dpif, "handlers_set", error);
+     return error;
+ }
+ /* Polls for an upcall from 'dpif' for an upcall handler.  Since there
+  * there can be multiple poll loops, 'handler_id' is needed as index to
+  * identify the corresponding poll loop.  If successful, stores the upcall
+  * into '*upcall', using 'buf' for storage.  Should only be called if
+  * 'recv_set' has been used to enable receiving packets from 'dpif'.
   *
   * 'upcall->key' and 'upcall->userdata' point into data in the caller-provided
   * 'buf', so their memory cannot be freed separately from 'buf'.
   * Returns 0 if successful, otherwise a positive errno value.  Returns EAGAIN
   * if no upcall is immediately available. */
  int
- dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf *buf)
+ dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall *upcall,
+           struct ofpbuf *buf)
  {
-     int error = dpif->dpif_class->recv(dpif, upcall, buf);
+     int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
      if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
          struct ds flow;
          char *packet;
@@@ -1348,12 -1388,14 +1389,14 @@@ dpif_recv_purge(struct dpif *dpif
      }
  }
  
- /* Arranges for the poll loop to wake up when 'dpif' has a message queued to be
-  * received with dpif_recv(). */
+ /* Arranges for the poll loop for an upcall handler to wake up when 'dpif'
+  * 'dpif' has a message queued to be received with the recv member
+  * function.  Since there can be multiple poll loops, 'handler_id' is
+  * needed as index to identify the corresponding poll loop. */
  void
- dpif_recv_wait(struct dpif *dpif)
+ dpif_recv_wait(struct dpif *dpif, uint32_t handler_id)
  {
-     dpif->dpif_class->recv_wait(dpif);
+     dpif->dpif_class->recv_wait(dpif, handler_id);
  }
  
  /* Obtains the NetFlow engine type and engine ID for 'dpif' into '*engine_type'
diff --combined lib/netdev-pltap.c
index 1875c0e,0000000..cbb76ea
mode 100644,000000..100644
--- /dev/null
@@@ -1,874 -1,0 +1,905 @@@
- struct netdev_rx_pltap {
-     struct netdev_rx up;    
 +/*
 + * Copyright (c) 2012 Giuseppe Lettieri
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at:
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +#include <config.h>
 +
 +#include <sys/types.h>
 +#include <unistd.h>
 +#include <fcntl.h>
 +#include <arpa/inet.h>
 +#include <sys/ioctl.h>
 +#include <sys/socket.h>
 +#include <net/if.h>
 +#include <net/if_arp.h>
 +#include <linux/if_tun.h>
 +#include <netinet/in.h>
 +#include <errno.h>
 +
 +#include "flow.h"
 +#include "list.h"
++#include "dpif-netdev.h"
 +#include "netdev-provider.h"
 +#include "odp-util.h"
 +#include "ofp-print.h"
 +#include "ofpbuf.h"
 +#include "packets.h"
 +#include "poll-loop.h"
 +#include "shash.h"
 +#include "sset.h"
 +#include "unixctl.h"
 +#include "socket-util.h"
 +#include "vlog.h"
 +#include "tunalloc.h"
 +
 +VLOG_DEFINE_THIS_MODULE(netdev_pltap);
 +
 +/* Protects 'sync_list'. */
 +static struct ovs_mutex sync_list_mutex = OVS_MUTEX_INITIALIZER;
 +
 +static struct list sync_list OVS_GUARDED_BY(sync_list_mutex)
 +    = LIST_INITIALIZER(&sync_list);
 +
 +struct netdev_pltap {
 +    struct netdev up;
 +
 +    /* In sync_list. */
 +    struct list sync_list OVS_GUARDED_BY(sync_list_mutex);
 +
 +    /* Protects all members below. */
 +    struct ovs_mutex mutex OVS_ACQ_AFTER(sync_list_mutex);
 +
 +    char *real_name;
 +    struct netdev_stats stats;
 +    enum netdev_flags new_flags;
 +    enum netdev_flags flags;
 +    int fd;
 +    struct sockaddr_in local_addr;
 +    int local_netmask;
 +    bool valid_local_ip;
 +    bool valid_local_netmask;
 +    bool sync_flags_needed;
 +    unsigned int change_seq;
 +};
 +
 +
- static struct netdev_rx_pltap*
- netdev_rx_pltap_cast(const struct netdev_rx *rx)
++struct netdev_rxq_pltap {
++    struct netdev_rxq up;    
 +    int fd;
 +};
 +
 +static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
 +
 +/* Protects 'pltap_netdevs' */
 +static struct ovs_mutex pltap_netdevs_mutex = OVS_MUTEX_INITIALIZER;
 +static struct shash pltap_netdevs OVS_GUARDED_BY(pltap_netdevs_mutex)
 +    = SHASH_INITIALIZER(&pltap_netdevs);
 +
 +static int netdev_pltap_construct(struct netdev *netdev_);
 +
 +static void netdev_pltap_update_seq(struct netdev_pltap *) 
 +    OVS_REQUIRES(dev->mutex);
 +static int get_flags(struct netdev_pltap *dev, enum netdev_flags *flags)
 +    OVS_REQUIRES(dev->mutex);
 +
 +static bool
 +netdev_pltap_finalized(struct netdev_pltap *dev)
 +    OVS_REQUIRES(dev->mutex)
 +{
 +    return dev->valid_local_ip && dev->valid_local_netmask;
 +}
 +
 +static bool
 +is_netdev_pltap_class(const struct netdev_class *class)
 +{
 +    return class->construct == netdev_pltap_construct;
 +}
 +
 +static struct netdev_pltap *
 +netdev_pltap_cast(const struct netdev *netdev)
 +{
 +    ovs_assert(is_netdev_pltap_class(netdev_get_class(netdev)));
 +    return CONTAINER_OF(netdev, struct netdev_pltap, up);
 +}
 +
-     return CONTAINER_OF(rx, struct netdev_rx_pltap, up);
++static struct netdev_rxq_pltap*
++netdev_rxq_pltap_cast(const struct netdev_rxq *rx)
 +{
 +    ovs_assert(is_netdev_pltap_class(netdev_get_class(rx->netdev)));
- static struct netdev_rx *
- netdev_pltap_rx_alloc(void)
++    return CONTAINER_OF(rx, struct netdev_rxq_pltap, up);
 +}
 +
 +static void sync_needed(struct netdev_pltap *dev)
 +    OVS_REQUIRES(dev->mutex, sync_list_mutex)
 +{
 +    if (dev->sync_flags_needed)
 +        return;
 +
 +    dev->sync_flags_needed = true;
 +    list_insert(&sync_list, &dev->sync_list);
 +}
 +
 +static void sync_done(struct netdev_pltap *dev)
 +    OVS_REQUIRES(dev->mutex, sync_list_mutex)
 +{
 +    if (!dev->sync_flags_needed)
 +        return;
 +
 +    (void) list_remove(&dev->sync_list);
 +    dev->sync_flags_needed = false;
 +}
 +
 +static struct netdev *
 +netdev_pltap_alloc(void)
 +{
 +    struct netdev_pltap *netdev = xzalloc(sizeof *netdev);
 +    return &netdev->up;
 +}
 +
 +static int
 +netdev_pltap_construct(struct netdev *netdev_)
 +{
 +    struct netdev_pltap *netdev = netdev_pltap_cast(netdev_);
 +    int error;
 +
 +    ovs_mutex_init(&netdev->mutex);
 +    netdev->real_name = xzalloc(IFNAMSIZ + 1);
 +    memset(&netdev->local_addr, 0, sizeof(netdev->local_addr));
 +    netdev->valid_local_ip = false;
 +    netdev->valid_local_netmask = false;
 +    netdev->flags = 0;
 +    netdev->sync_flags_needed = false;
 +    netdev->change_seq = 1;
 +
 +
 +    /* Open tap device. */
 +    netdev->fd = tun_alloc(IFF_TAP, netdev->real_name);
 +    if (netdev->fd < 0) {
 +        error = errno;
 +        VLOG_WARN("tun_alloc(IFF_TAP, %s) failed: %s",
 +            netdev_get_name(netdev_), ovs_strerror(error));
 +        return error;
 +    }
 +    VLOG_DBG("real_name = %s", netdev->real_name);
 +
 +    /* Make non-blocking. */
 +    error = set_nonblocking(netdev->fd);
 +    if (error) {
 +        return error;
 +    }
 +
 +    ovs_mutex_lock(&pltap_netdevs_mutex);
 +    shash_add(&pltap_netdevs, netdev_get_name(netdev_), netdev);
 +    ovs_mutex_unlock(&pltap_netdevs_mutex);
 +    return 0;
 +}
 +
 +static void
 +netdev_pltap_destruct(struct netdev *netdev_)
 +{
 +    struct netdev_pltap *netdev = netdev_pltap_cast(netdev_);
 +
 +    ovs_mutex_lock(&pltap_netdevs_mutex);
 +    if (netdev->fd != -1)
 +      close(netdev->fd);
 +
 +    if (netdev->sync_flags_needed) {
 +        ovs_mutex_lock(&sync_list_mutex);
 +        (void) list_remove(&netdev->sync_list);
 +        ovs_mutex_unlock(&sync_list_mutex);
 +    }
 +
 +    shash_find_and_delete(&pltap_netdevs,
 +                          netdev_get_name(netdev_));
 +    ovs_mutex_unlock(&pltap_netdevs_mutex);
 +    ovs_mutex_destroy(&netdev->mutex);
 +}
 +
 +static void
 +netdev_pltap_dealloc(struct netdev *netdev_)
 +{
 +    struct netdev_pltap *netdev = netdev_pltap_cast(netdev_);
 +    free(netdev);
 +}
 +
 +static int netdev_pltap_up(struct netdev_pltap *dev) OVS_REQUIRES(dev->mutex);
 +
-     struct netdev_rx_pltap *rx = xzalloc(sizeof *rx);
++static struct netdev_rxq *
++netdev_pltap_rxq_alloc(void)
 +{
- netdev_pltap_rx_construct(struct netdev_rx *rx_)
++    struct netdev_rxq_pltap *rx = xzalloc(sizeof *rx);
 +    return &rx->up;
 +}
 +
 +static int
-     struct netdev_rx_pltap *rx = netdev_rx_pltap_cast(rx_);
++netdev_pltap_rxq_construct(struct netdev_rxq *rx_)
 +{
- netdev_pltap_rx_destruct(struct netdev_rx *rx_ OVS_UNUSED)
++    struct netdev_rxq_pltap *rx = netdev_rxq_pltap_cast(rx_);
 +    struct netdev *netdev_ = rx->up.netdev;
 +    struct netdev_pltap *netdev =
 +        netdev_pltap_cast(netdev_);
 +    int error = 0;
 +
 +    ovs_mutex_lock(&netdev->mutex);
 +    rx->fd = netdev->fd;
 +    if (!netdev_pltap_finalized(netdev))
 +        goto out;
 +    error = netdev_pltap_up(netdev);
 +    if (error) {
 +        goto out;
 +    }
 +out:
 +    ovs_mutex_unlock(&netdev->mutex);
 +    return error;
 +}
 +
 +static void
- netdev_pltap_rx_dealloc(struct netdev_rx *rx_)
++netdev_pltap_rxq_destruct(struct netdev_rxq *rx_ OVS_UNUSED)
 +{
 +}
 +
 +static void
-     struct netdev_rx_pltap *rx = netdev_rx_pltap_cast(rx_);
++netdev_pltap_rxq_dealloc(struct netdev_rxq *rx_)
 +{
- netdev_pltap_rx_recv(struct netdev_rx *rx_, struct ofpbuf *buffer)
++    struct netdev_rxq_pltap *rx = netdev_rxq_pltap_cast(rx_);
 +
 +    free(rx);
 +}
 +
 +static int vsys_transaction(const char *script,
 +      const char **preply, char *format, ...)
 +{
 +    char *msg = NULL, *reply = NULL;
 +    const size_t reply_size = 1024;
 +    int ifd = -1, ofd = -1, maxfd;
 +    size_t bytes_to_write, bytes_to_read,
 +           bytes_written = 0, bytes_read = 0;
 +    int error = 0;
 +    char *ofname = NULL, *ifname = NULL;
 +    va_list args;
 +
 +    va_start(args, format);
 +    msg = xvasprintf(format, args);
 +    va_end(args);
 +    reply = (char*)xmalloc(reply_size);
 +    if (!msg || !reply) {
 +      VLOG_ERR("Out of memory");
 +      error = ENOMEM;
 +      goto cleanup;
 +    }
 +
 +    ofname = xasprintf("/vsys/%s.out", script);
 +    ifname = xasprintf("/vsys/%s.in", script);
 +    if (!ofname || !ifname) {
 +      VLOG_ERR("Out of memory");
 +      error = ENOMEM;
 +      goto cleanup;
 +    }
 +
 +    ofd = open(ofname, O_RDONLY | O_NONBLOCK);
 +    if (ofd < 0) {
 +        VLOG_ERR("Cannot open %s: %s", ofname, ovs_strerror(errno));
 +      error = errno;
 +      goto cleanup;
 +    }
 +    ifd = open(ifname, O_WRONLY | O_NONBLOCK);
 +    if (ifd < 0) {
 +        VLOG_ERR("Cannot open %s: %s", ifname, ovs_strerror(errno));
 +      error = errno;
 +      goto cleanup;
 +    }
 +    maxfd = (ifd < ofd) ? ofd : ifd;
 +
 +    bytes_to_write = strlen(msg);
 +    bytes_to_read = reply_size;
 +    while (bytes_to_write || bytes_to_read) {
 +        fd_set readset, writeset, errorset;
 +
 +      FD_ZERO(&readset);
 +      FD_ZERO(&writeset);
 +      FD_ZERO(&errorset);
 +      if (bytes_to_write) {
 +          FD_SET(ifd, &writeset);
 +          FD_SET(ifd, &errorset);
 +      }
 +      FD_SET(ofd, &readset);
 +      FD_SET(ofd, &errorset);
 +      if (select(maxfd + 1, &readset, &writeset, &errorset, NULL) < 0) {
 +          if (errno == EINTR)
 +              continue;
 +          VLOG_ERR("selec error: %s", ovs_strerror(errno));
 +          error = errno;
 +          goto cleanup;
 +      }
 +      if (FD_ISSET(ifd, &errorset) || FD_ISSET(ofd, &errorset)) {
 +          VLOG_ERR("error condition on ifd or ofd");
 +          goto cleanup;
 +      }
 +      if (FD_ISSET(ifd, &writeset)) {
 +          ssize_t n = write(ifd, msg + bytes_written, bytes_to_write);    
 +          if (n < 0) {
 +              if (errno != EAGAIN && errno != EINTR) {
 +                  VLOG_ERR("write on %s: %s", ifname, ovs_strerror(errno));
 +                  error = errno;
 +                  goto cleanup;
 +              }
 +            } else {
 +              bytes_written += n;
 +              bytes_to_write -= n;
 +              if (bytes_to_write == 0)
 +                  close(ifd);
 +          }
 +      }
 +      if (FD_ISSET(ofd, &readset)) {
 +          ssize_t n = read(ofd, reply + bytes_read, bytes_to_read);    
 +          if (n < 0) {
 +              if (errno != EAGAIN && errno != EINTR) {
 +                  VLOG_ERR("read on %s: %s", ofname, ovs_strerror(errno));
 +                  error = errno;
 +                  goto cleanup;
 +              }
 +            } else if (n == 0) {
 +              bytes_to_read = 0;
 +            } else {
 +              bytes_read += n;
 +              bytes_to_read -= n;
 +          }
 +      }
 +    }
 +    if (bytes_read) {
 +      reply[bytes_read] = '\0';
 +      if (preply) {
 +              *preply = reply;
 +              reply = NULL; /* prevent freeing the reply msg */
 +      } else {
 +              VLOG_ERR("%s returned: %s", script, reply);
 +      }
 +      error = EAGAIN;
 +      goto cleanup;
 +    }
 +
 +cleanup:
 +    free(msg);
 +    free(reply);
 +    free(ofname);
 +    free(ifname);
 +    close(ifd);
 +    close(ofd);
 +    return error;
 +}
 +
 +static int
 +netdev_pltap_up(struct netdev_pltap *dev)
 +    OVS_REQUIRES(dev->mutex)
 +{
 +    if (!netdev_pltap_finalized(dev)) {
 +        return 0;
 +    }
 +    
 +    return vsys_transaction("vif_up", NULL, "%s\n"IP_FMT"\n%d\n",
 +             dev->real_name,
 +             IP_ARGS(dev->local_addr.sin_addr.s_addr),
 +             dev->local_netmask);
 +}
 +
 +static int
 +netdev_pltap_down(struct netdev_pltap *dev)
 +    OVS_REQUIRES(dev->mutex)
 +{
 +    if (!netdev_pltap_finalized(dev)) {
 +        return 0;
 +    }
 +    
 +    return vsys_transaction("vif_down", NULL, "%s\n", dev->real_name);
 +}
 +
 +static int
 +netdev_pltap_promisc(struct netdev_pltap *dev, bool promisc)
 +    OVS_REQUIRES(dev-mutex)
 +{
 +    if (!netdev_pltap_finalized(dev)) {
 +        return 0;
 +    }
 +
 +    return vsys_transaction("promisc", NULL, "%s\n%s",
 +             dev->real_name,
 +             (promisc ? "" : "-\n"));
 +}
 +
 +static void
 +netdev_pltap_sync_flags(struct netdev_pltap *dev)
 +    OVS_REQUIRES(sync_list_mutex)
 +{
 +
 +    ovs_mutex_lock(&dev->mutex);
 +
 +    if (dev->fd < 0 || !netdev_pltap_finalized(dev)) {
 +      goto out;
 +    }
 +    
 +    VLOG_DBG("sync_flags(%s): current: %s %s  target: %s %s",
 +        dev->real_name,
 +      (dev->flags & NETDEV_UP ? "UP" : "-"),
 +      (dev->flags & NETDEV_PROMISC ? "PROMISC" : "-"),
 +      (dev->new_flags & NETDEV_UP ? "UP" : "-"),
 +      (dev->new_flags & NETDEV_PROMISC ? "PROMISC" : "-"));
 +
 +    if ((dev->new_flags & NETDEV_UP) && !(dev->flags & NETDEV_UP)) {
 +        (void) netdev_pltap_up(dev);
 +    } else if (!(dev->new_flags & NETDEV_UP) && (dev->flags & NETDEV_UP)) {
 +        (void) netdev_pltap_down(dev);
 +    }
 +
 +    if ((dev->new_flags & NETDEV_PROMISC) ^ (dev->flags & NETDEV_PROMISC)) {
 +        (void) netdev_pltap_promisc(dev, dev->new_flags & NETDEV_PROMISC);
 +    }
 +
 +    netdev_pltap_update_seq(dev);
 +
 +out:
 +    sync_done(dev);
 +    ovs_mutex_unlock(&dev->mutex);
 +}
 +
 +
 +static int
 +netdev_pltap_get_config(const struct netdev *dev_, struct smap *args)
 +{
 +    struct netdev_pltap *netdev = netdev_pltap_cast(dev_);
 +
 +    ovs_mutex_lock(&netdev->mutex);
 +    if (netdev->valid_local_ip)
 +      smap_add_format(args, "local_ip", IP_FMT,
 +            IP_ARGS(netdev->local_addr.sin_addr.s_addr));
 +    if (netdev->valid_local_netmask)
 +        smap_add_format(args, "local_netmask", "%"PRIu32,
 +            ntohs(netdev->local_netmask));
 +    ovs_mutex_unlock(&netdev->mutex);
 +    return 0;
 +}
 +
 +static int
 +netdev_pltap_set_config(struct netdev *dev_, const struct smap *args)
 +{
 +    struct netdev_pltap *netdev = netdev_pltap_cast(dev_);
 +    struct shash_node *node;
 +
 +    ovs_mutex_lock(&sync_list_mutex);
 +    ovs_mutex_lock(&netdev->mutex);
 +    VLOG_DBG("pltap_set_config(%s)", netdev_get_name(dev_));
 +    SMAP_FOR_EACH(node, args) {
 +        VLOG_DBG("arg: %s->%s", node->name, (char*)node->data);
 +      if (!strcmp(node->name, "local_ip")) {
 +          struct in_addr addr;
 +          if (lookup_ip(node->data, &addr)) {
 +              VLOG_WARN("%s: bad 'local_ip'", node->name);
 +          } else {
 +              netdev->local_addr.sin_addr = addr;
 +              netdev->valid_local_ip = true;
 +          }
 +      } else if (!strcmp(node->name, "local_netmask")) {
 +          netdev->local_netmask = atoi(node->data);
 +          // XXX check valididy
 +          netdev->valid_local_netmask = true;
 +      } else {
 +          VLOG_WARN("%s: unknown argument '%s'", 
 +              netdev_get_name(dev_), node->name);
 +      }
 +    }
 +    if (netdev_pltap_finalized(netdev)) {
 +        netdev->new_flags |= NETDEV_UP;
 +        sync_needed(netdev);
 +    }
 +    ovs_mutex_unlock(&netdev->mutex);
 +    ovs_mutex_unlock(&sync_list_mutex);
 +    return 0;
 +}
 +
 +static int
-     size_t size = ofpbuf_tailroom(buffer);
-     struct netdev_rx_pltap *rx = netdev_rx_pltap_cast(rx_);
++netdev_pltap_rxq_recv(struct netdev_rxq *rx_, struct ofpbuf **packet, int *c)
 +{
-           { .iov_base = buffer->data, .iov_len = size }
++    struct netdev_rxq_pltap *rx = netdev_rxq_pltap_cast(rx_);
 +    struct tun_pi pi;
 +    struct iovec iov[2] = {
 +        { .iov_base = &pi, .iov_len = sizeof(pi) },
-               return 0;
 +    };
++    struct ofpbuf *buffer = NULL;
++    size_t size;
++    int error = 0;
++
++    buffer = ofpbuf_new_with_headroom(VLAN_ETH_HEADER_LEN + ETH_PAYLOAD_MAX,
++        DP_NETDEV_HEADROOM);
++    size = ofpbuf_tailroom(buffer);
++    iov[1].iov_base = buffer->data;
++    iov[1].iov_len = size;
 +    for (;;) {
 +        ssize_t retval;
 +        retval = readv(rx->fd, iov, 2);
 +        if (retval >= 0) {
 +            if (retval <= size) {
 +            buffer->size += retval;
-               return EMSGSIZE;
++              goto out;
 +          } else {
-                     netdev_rx_get_name(rx_), ovs_strerror(errno));
++              error = EMSGSIZE;
++            goto out;
 +          }
 +        } else if (errno != EINTR) {
 +            if (errno != EAGAIN) {
 +                VLOG_WARN_RL(&rl, "error receiveing Ethernet packet on %s: %s",
-             return errno;
++                    netdev_rxq_get_name(rx_), ovs_strerror(errno));
 +            }
- netdev_pltap_rx_wait(struct netdev_rx *rx_)
++            error = errno;
++            goto out;
 +        }
 +    }
++out:
++    if (error) {
++        ofpbuf_delete(buffer);
++    } else {
++        dp_packet_pad(buffer);
++        packet[0] = buffer;
++        *c = 1;
++    }
++
++    return error;
 +}
 +
 +static void
-     struct netdev_rx_pltap *rx = netdev_rx_pltap_cast(rx_);
++netdev_pltap_rxq_wait(struct netdev_rxq *rx_)
 +{
- netdev_pltap_send(struct netdev *netdev_, const void *buffer, size_t size)
++    struct netdev_rxq_pltap *rx = netdev_rxq_pltap_cast(rx_);
 +    struct netdev_pltap *netdev =
 +        netdev_pltap_cast(rx->up.netdev);
 +    if (rx->fd >= 0 && netdev_pltap_finalized(netdev)) {
 +        poll_fd_wait(rx->fd, POLLIN);
 +    }
 +}
 +
 +static int
-       netdev_pltap_cast(netdev_);
++netdev_pltap_send(struct netdev *netdev_, struct ofpbuf *pkt, bool may_steal)
 +{
++    const void *buffer = pkt->data;
++    size_t size = pkt->size;
 +    struct netdev_pltap *dev = 
-       { .iov_base = (char*) buffer, .iov_len = size }
++        netdev_pltap_cast(netdev_);
++    int error = 0;
 +    struct tun_pi pi = { 0, 0x86 };
 +    struct iovec iov[2] = {
 +        { .iov_base = &pi, .iov_len = sizeof(pi) },
-     if (dev->fd < 0)
-         return EAGAIN;
++        { .iov_base = (char*) buffer, .iov_len = size }
 +    };
-           if (retval != size + 4) {
-               VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIdSIZE" bytes of %"PRIuSIZE") on %s",
-                            retval, size + 4, netdev_get_name(netdev_));
-           }
-             return 0;
++    if (dev->fd < 0) {
++        error = EAGAIN;
++        goto out;
++    }
 +    for (;;) {
 +        ssize_t retval;
 +        retval = writev(dev->fd, iov, 2);
 +        if (retval >= 0) {
-                     netdev_get_name(netdev_), ovs_strerror(errno));
++            if (retval != size + 4) {
++                VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIdSIZE" bytes of %"PRIuSIZE") on %s",
++                        retval, size + 4, netdev_get_name(netdev_));
++            }
++            goto out;
 +        } else if (errno != EINTR) {
 +            if (errno != EAGAIN) {
 +                VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s",
-             return errno;
++                        netdev_get_name(netdev_), ovs_strerror(errno));
 +            }
- netdev_pltap_rx_drain(struct netdev_rx *rx_)
++            error = errno;
++            goto out;
 +        }
 +    }
++out:
++    if (may_steal) {
++        ofpbuf_delete(pkt);
++    }
++    return error;
 +}
 +
 +static void
 +netdev_pltap_send_wait(struct netdev *netdev_)
 +{
 +    struct netdev_pltap *dev = 
 +      netdev_pltap_cast(netdev_);
 +    if (dev->fd >= 0 && netdev_pltap_finalized(dev)) {
 +        poll_fd_wait(dev->fd, POLLOUT);
 +    }
 +}
 +
 +static int
-     struct netdev_rx_pltap *rx = netdev_rx_pltap_cast(rx_);
++netdev_pltap_rxq_drain(struct netdev_rxq *rx_)
 +{
-     netdev_pltap_rx_alloc,
-     netdev_pltap_rx_construct,
-     netdev_pltap_rx_destruct,
-     netdev_pltap_rx_dealloc,
-     netdev_pltap_rx_recv,
-     netdev_pltap_rx_wait,
-     netdev_pltap_rx_drain,
++    struct netdev_rxq_pltap *rx = netdev_rxq_pltap_cast(rx_);
 +    char buffer[128];
 +    int error;
 +
 +    if (rx->fd < 0)
 +      return EAGAIN;
 +    for (;;) {
 +      error = recv(rx->fd, buffer, 128, MSG_TRUNC);
 +      if (error) {
 +            if (error == -EAGAIN)
 +              break;
 +            else if (error != -EMSGSIZE)
 +              return error;
 +      }
 +    }
 +    return 0;
 +}
 +
 +static int
 +netdev_pltap_set_etheraddr(struct netdev *netdevi OVS_UNUSED,
 +                           const uint8_t mac[ETH_ADDR_LEN] OVS_UNUSED)
 +{
 +    return ENOTSUP;
 +}
 +
 +
 +// XXX from netdev-linux.c
 +static int
 +get_etheraddr(struct netdev_pltap *dev, uint8_t ea[ETH_ADDR_LEN])
 +    OVS_REQUIRES(dev->mutex)
 +{
 +    struct ifreq ifr;
 +    int hwaddr_family;
 +    int error;
 +
 +    memset(&ifr, 0, sizeof ifr);
 +    ovs_strzcpy(ifr.ifr_name, dev->real_name, sizeof ifr.ifr_name);
 +    error = af_inet_ifreq_ioctl(dev->real_name, &ifr,
 +        SIOCGIFHWADDR, "SIOCGIFHWADDR");
 +    if (error) {
 +        return error;
 +    }
 +    hwaddr_family = ifr.ifr_hwaddr.sa_family;
 +    if (hwaddr_family != AF_UNSPEC && hwaddr_family != ARPHRD_ETHER) {
 +        VLOG_WARN("%s device has unknown hardware address family %d",
 +                  dev->real_name, hwaddr_family);
 +    }
 +    memcpy(ea, ifr.ifr_hwaddr.sa_data, ETH_ADDR_LEN);
 +    return 0;
 +}
 +
 +static int
 +get_flags(struct netdev_pltap *dev, enum netdev_flags *flags)
 +    OVS_REQUIRES(dev->mutex)
 +{
 +    struct ifreq ifr;
 +    int error;
 +
 +    error = af_inet_ifreq_ioctl(dev->real_name, &ifr,
 +        SIOCGIFFLAGS, "SIOCGIFFLAGS");
 +    if (error) {
 +        return error;
 +    }
 +    *flags = 0;
 +    if (ifr.ifr_flags & IFF_UP)
 +      *flags |= NETDEV_UP;
 +    if (ifr.ifr_flags & IFF_PROMISC)
 +        *flags |= NETDEV_PROMISC;
 +    return 0;
 +}
 +
 +static int
 +netdev_pltap_get_etheraddr(const struct netdev *netdev,
 +                           uint8_t mac[ETH_ADDR_LEN])
 +{
 +    struct netdev_pltap *dev = 
 +      netdev_pltap_cast(netdev);
 +    int error = 0;
 +
 +    ovs_mutex_lock(&dev->mutex);
 +    if (dev->fd < 0) {
 +        error = EAGAIN;
 +        goto out;
 +    }
 +    error = get_etheraddr(dev, mac);
 +
 +out:
 +    ovs_mutex_unlock(&dev->mutex);
 +    return error;
 +}
 +
 +
 +// XXX can we read stats in planetlab?
 +static int
 +netdev_pltap_get_stats(const struct netdev *netdev OVS_UNUSED, struct netdev_stats *stats OVS_UNUSED)
 +{
 +    return ENOTSUP;
 +}
 +
 +static int
 +netdev_pltap_set_stats(struct netdev *netdev OVS_UNUSED, const struct netdev_stats *stats OVS_UNUSED)
 +{
 +    return ENOTSUP;
 +}
 +
 +
 +static int
 +netdev_pltap_update_flags(struct netdev *dev_,
 +                          enum netdev_flags off, enum netdev_flags on,
 +                          enum netdev_flags *old_flagsp)
 +{
 +    struct netdev_pltap *netdev =
 +        netdev_pltap_cast(dev_);
 +    int error = 0;
 +
 +    ovs_mutex_lock(&sync_list_mutex);
 +    ovs_mutex_lock(&netdev->mutex);
 +    if ((off | on) & ~(NETDEV_UP | NETDEV_PROMISC)) {
 +        error = EINVAL;
 +        goto out;
 +    }
 +
 +    if (netdev_pltap_finalized(netdev)) {
 +        error = get_flags(netdev, &netdev->flags);
 +    }
 +    *old_flagsp = netdev->flags;
 +    netdev->new_flags |= on;
 +    netdev->new_flags &= ~off;
 +    if (netdev->flags != netdev->new_flags) {
 +      /* we cannot sync here, since we may be in a signal handler */
 +        sync_needed(netdev);
 +    }
 +
 +out:
 +    ovs_mutex_unlock(&netdev->mutex);
 +    ovs_mutex_unlock(&sync_list_mutex);
 +    return error;
 +}
 +
 +static unsigned int
 +netdev_pltap_change_seq(const struct netdev *netdev)
 +{
 +    struct netdev_pltap *dev =
 +        netdev_pltap_cast(netdev);
 +    unsigned int change_seq;
 +
 +    ovs_mutex_lock(&dev->mutex);
 +    change_seq = dev->change_seq;
 +    ovs_mutex_unlock(&dev->mutex);
 +
 +    return change_seq;
 +}
 +\f
 +/* Helper functions. */
 +
 +static void
 +netdev_pltap_update_seq(struct netdev_pltap *dev)
 +    OVS_REQUIRES(dev->mutex)
 +{
 +    dev->change_seq++;
 +    if (!dev->change_seq) {
 +        dev->change_seq++;
 +    }
 +}
 +
 +static void
 +netdev_pltap_get_real_name(struct unixctl_conn *conn,
 +                     int argc OVS_UNUSED, const char *argv[], void *aux OVS_UNUSED)
 +{
 +    struct netdev_pltap *pltap_dev;
 +
 +    ovs_mutex_lock(&pltap_netdevs_mutex);
 +    pltap_dev = shash_find_data(&pltap_netdevs, argv[1]);
 +    if (!pltap_dev) {
 +        unixctl_command_reply_error(conn, "no such pltap netdev");
 +        goto out;
 +    }
 +    if (pltap_dev->fd < 0) {
 +      unixctl_command_reply_error(conn, "no real device attached");
 +        goto out;     
 +    }
 +
 +    unixctl_command_reply(conn, pltap_dev->real_name);
 +
 +out:
 +    ovs_mutex_unlock(&pltap_netdevs_mutex);
 +}
 +
 +static int
 +netdev_pltap_init(void)
 +{
 +    unixctl_command_register("netdev-pltap/get-tapname", "port",
 +                             1, 1, netdev_pltap_get_real_name, NULL);
 +    return 0;
 +}
 +
 +static void
 +netdev_pltap_run(void)
 +{
 +    struct netdev_pltap *iter, *next;
 +    ovs_mutex_lock(&sync_list_mutex);
 +    LIST_FOR_EACH_SAFE(iter, next, sync_list, &sync_list) {
 +        netdev_pltap_sync_flags(iter);
 +    }
 +    ovs_mutex_unlock(&sync_list_mutex);
 +}
 +
 +static void
 +netdev_pltap_wait(void)
 +{
 +    ovs_mutex_lock(&sync_list_mutex);
 +    if (!list_is_empty(&sync_list)) {
 +        VLOG_DBG("netdev_pltap: scheduling sync");
 +        poll_immediate_wake();
 +    }
 +    ovs_mutex_unlock(&sync_list_mutex);
 +}
 +
 +const struct netdev_class netdev_pltap_class = {
 +    "pltap",
 +    netdev_pltap_init,
 +    netdev_pltap_run,  
 +    netdev_pltap_wait,            
 +
 +    netdev_pltap_alloc,
 +    netdev_pltap_construct,
 +    netdev_pltap_destruct,
 +    netdev_pltap_dealloc,
 +    netdev_pltap_get_config,
 +    netdev_pltap_set_config, 
 +    NULL,                                 /* get_tunnel_config */
 +
 +    netdev_pltap_send, 
 +    netdev_pltap_send_wait,  
 +
 +    netdev_pltap_set_etheraddr,
 +    netdev_pltap_get_etheraddr,
 +    NULL,                                 /* get_mtu */
 +    NULL,                                 /* set_mtu */
 +    NULL,                       /* get_ifindex */
 +    NULL,                                 /* get_carrier */
 +    NULL,                       /* get_carrier_resets */
 +    NULL,                       /* get_miimon */
 +    netdev_pltap_get_stats,
 +    netdev_pltap_set_stats,
 +
 +    NULL,                       /* get_features */
 +    NULL,                       /* set_advertisements */
 +
 +    NULL,                       /* set_policing */
 +    NULL,                       /* get_qos_types */
 +    NULL,                       /* get_qos_capabilities */
 +    NULL,                       /* get_qos */
 +    NULL,                       /* set_qos */
 +    NULL,                       /* get_queue */
 +    NULL,                       /* set_queue */
 +    NULL,                       /* delete_queue */
 +    NULL,                       /* get_queue_stats */
 +    NULL,                       /* queue_dump_start */
 +    NULL,                       /* queue_dump_next */
 +    NULL,                       /* queue_dump_done */
 +    NULL,                       /* dump_queue_stats */
 +
 +    NULL,                       /* get_in4 */
 +    NULL,                       /* set_in4 */
 +    NULL,                       /* get_in6 */
 +    NULL,                       /* add_router */
 +    NULL,                       /* get_next_hop */
 +    NULL,                       /* get_drv_info */
 +    NULL,                       /* arp_lookup */
 +
 +    netdev_pltap_update_flags,
 +
++    netdev_pltap_rxq_alloc,
++    netdev_pltap_rxq_construct,
++    netdev_pltap_rxq_destruct,
++    netdev_pltap_rxq_dealloc,
++    netdev_pltap_rxq_recv,
++    netdev_pltap_rxq_wait,
++    netdev_pltap_rxq_drain,
 +};
diff --combined lib/netdev-provider.h
@@@ -39,6 -39,7 +39,7 @@@ struct netdev 
                                                  this device. */
  
      /* The following are protected by 'netdev_mutex' (internal to netdev.c). */
+     int n_rxq;
      int ref_cnt;                        /* Times this devices was opened. */
      struct shash_node *node;            /* Pointer to element in global map. */
      struct list saved_flags_list; /* Contains "struct netdev_saved_flags". */
@@@ -56,12 -57,13 +57,13 @@@ void netdev_get_devices(const struct ne
   * Network device implementations may read these members but should not modify
   * them.
   *
-  * None of these members change during the lifetime of a struct netdev_rx. */
- struct netdev_rx {
+  * None of these members change during the lifetime of a struct netdev_rxq. */
+ struct netdev_rxq {
      struct netdev *netdev;      /* Owns a reference to the netdev. */
+     int queue_id;
  };
  
- struct netdev *netdev_rx_get_netdev(const struct netdev_rx *);
+ struct netdev *netdev_rxq_get_netdev(const struct netdev_rxq *);
  
  /* Network device class structure, to be defined by each implementation of a
   * network device.
@@@ -77,7 -79,7 +79,7 @@@
   *
   *   - "struct netdev", which represents a network device.
   *
-  *   - "struct netdev_rx", which represents a handle for capturing packets
+  *   - "struct netdev_rxq", which represents a handle for capturing packets
   *     received on a network device
   *
   * Each of these data structures contains all of the implementation-independent
   *
   * Four stylized functions accompany each of these data structures:
   *
-  *            "alloc"       "construct"       "destruct"       "dealloc"
-  *            ------------  ----------------  ---------------  --------------
-  * netdev     ->alloc       ->construct       ->destruct       ->dealloc
-  * netdev_rx  ->rx_alloc    ->rx_construct    ->rx_destruct    ->rx_dealloc
+  *            "alloc"          "construct"        "destruct"       "dealloc"
+  *            ------------   ----------------  ---------------  --------------
+  * netdev      ->alloc        ->construct        ->destruct        ->dealloc
+  * netdev_rxq  ->rxq_alloc    ->rxq_construct    ->rxq_destruct    ->rxq_dealloc
   *
   * Any instance of a given data structure goes through the following life
   * cycle:
   *      implementation must not refer to base or derived state in the data
   *      structure, because it has already been uninitialized.
   *
+  * If netdev support multi-queue IO then netdev->construct should set initialize
+  * netdev->n_rxq to number of queues.
+  *
   * Each "alloc" function allocates and returns a new instance of the respective
   * data structure.  The "alloc" function is not given any information about the
   * use of the new data structure, so it cannot perform much initialization.
@@@ -223,13 -228,13 +228,13 @@@ struct netdev_class 
      const struct netdev_tunnel_config *
          (*get_tunnel_config)(const struct netdev *netdev);
  
-     /* Sends the 'size'-byte packet in 'buffer' on 'netdev'.  Returns 0 if
-      * successful, otherwise a positive errno value.  Returns EAGAIN without
-      * blocking if the packet cannot be queued immediately.  Returns EMSGSIZE
-      * if a partial packet was transmitted or if the packet is too big or too
-      * small to transmit on the device.
+     /* Sends the buffer on 'netdev'.
+      * Returns 0 if successful, otherwise a positive errno value.  Returns
+      * EAGAIN without blocking if the packet cannot be queued immediately.
+      * Returns EMSGSIZE if a partial packet was transmitted or if the packet
+      * is too big or too small to transmit on the device.
       *
-      * The caller retains ownership of 'buffer' in all cases.
+      * To retain ownership of 'buffer' caller can set may_steal to false.
       *
       * The network device is expected to maintain a packet transmission queue,
       * so that the caller does not ordinarily have to do additional queuing of
       * network device from being usefully used by the netdev-based "userspace
       * datapath".  It will also prevent the OVS implementation of bonding from
       * working properly over 'netdev'.) */
-     int (*send)(struct netdev *netdev, const void *buffer, size_t size);
+     int (*send)(struct netdev *netdev, struct ofpbuf *buffer, bool may_steal);
  
      /* Registers with the poll loop to wake up from the next call to
       * poll_block() when the packet transmission queue for 'netdev' has
      int (*update_flags)(struct netdev *netdev, enum netdev_flags off,
                          enum netdev_flags on, enum netdev_flags *old_flags);
  
- /* ## ------------------- ## */
- /* ## netdev_rx Functions ## */
- /* ## ------------------- ## */
+ /* ## -------------------- ## */
+ /* ## netdev_rxq Functions ## */
+ /* ## -------------------- ## */
  
  /* If a particular netdev class does not support receiving packets, all these
   * function pointers must be NULL. */
  
-     /* Life-cycle functions for a netdev_rx.  See the large comment above on
+     /* Life-cycle functions for a netdev_rxq.  See the large comment above on
       * struct netdev_class. */
-     struct netdev_rx *(*rx_alloc)(void);
-     int (*rx_construct)(struct netdev_rx *);
-     void (*rx_destruct)(struct netdev_rx *);
-     void (*rx_dealloc)(struct netdev_rx *);
-     /* Attempts to receive a packet from 'rx' into the tailroom of 'buffer',
-      * which should initially be empty.  If successful, returns 0 and
-      * increments 'buffer->size' by the number of bytes in the received packet,
-      * otherwise a positive errno value.  Returns EAGAIN immediately if no
-      * packet is ready to be received.
-      *
-      * Must return EMSGSIZE, and discard the packet, if the received packet
-      * is longer than 'ofpbuf_tailroom(buffer)'.
-      *
-      * Implementations may make use of VLAN_HEADER_LEN bytes of tailroom to
-      * add a VLAN header which is obtained out-of-band to the packet. If
-      * this occurs then VLAN_HEADER_LEN bytes of tailroom will no longer be
-      * available for the packet, otherwise it may be used for the packet
-      * itself.
-      *
-      * It is advised that the tailroom of 'buffer' should be
-      * VLAN_HEADER_LEN bytes longer than the MTU to allow space for an
-      * out-of-band VLAN header to be added to the packet.
+     struct netdev_rxq *(*rxq_alloc)(void);
+     int (*rxq_construct)(struct netdev_rxq *);
+     void (*rxq_destruct)(struct netdev_rxq *);
+     void (*rxq_dealloc)(struct netdev_rxq *);
+     /* Attempts to receive batch of packets from 'rx' and place array of pointers
+      * into '*pkt'. netdev is responsible for allocating buffers.
+      * '*cnt' points to packet count for given batch. Once packets are returned
+      * to caller, netdev should give up ownership of ofpbuf data.
+      *
+      * Implementations should allocate buffer with DP_NETDEV_HEADROOM headroom
+      * and add a VLAN header which is obtained out-of-band to the packet.
       *
+      * Caller is expected to pass array of size MAX_RX_BATCH.
       * This function may be set to null if it would always return EOPNOTSUPP
       * anyhow. */
-     int (*rx_recv)(struct netdev_rx *rx, struct ofpbuf *buffer);
+     int (*rxq_recv)(struct netdev_rxq *rx, struct ofpbuf **pkt, int *cnt);
  
      /* Registers with the poll loop to wake up from the next call to
-      * poll_block() when a packet is ready to be received with netdev_rx_recv()
+      * poll_block() when a packet is ready to be received with netdev_rxq_recv()
       * on 'rx'. */
-     void (*rx_wait)(struct netdev_rx *rx);
+     void (*rxq_wait)(struct netdev_rxq *rx);
  
      /* Discards all packets waiting to be received from 'rx'. */
-     int (*rx_drain)(struct netdev_rx *rx);
+     int (*rxq_drain)(struct netdev_rxq *rx);
  };
  
  int netdev_register_provider(const struct netdev_class *);
@@@ -676,9 -671,6 +671,9 @@@ extern const struct netdev_class netdev
  extern const struct netdev_class netdev_bsd_class;
  #endif
  
 +extern const struct netdev_class netdev_tunnel_class;
 +extern const struct netdev_class netdev_pltap_class;
 +
  #ifdef  __cplusplus
  }
  #endif
diff --combined lib/netdev-tunnel.c
index 13cca1f,0000000..dcc5e2c
mode 100644,000000..100644
--- /dev/null
@@@ -1,630 -1,0 +1,662 @@@
- struct netdev_rx_tunnel {
-     struct netdev_rx up;
 +/*
 + * Copyright (c) 2010, 2011, 2012 Nicira Networks.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at:
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +#include <config.h>
 +
 +#include <unistd.h>
 +#include <sys/socket.h>
 +#include <netinet/in.h>
 +#include <arpa/inet.h>
 +#include <errno.h>
 +
 +#include "flow.h"
 +#include "list.h"
++#include "dpif-netdev.h"
 +#include "netdev-provider.h"
 +#include "odp-util.h"
 +#include "ofp-print.h"
 +#include "ofpbuf.h"
 +#include "packets.h"
 +#include "poll-loop.h"
 +#include "shash.h"
 +#include "sset.h"
 +#include "unixctl.h"
 +#include "socket-util.h"
 +#include "vlog.h"
 +
 +VLOG_DEFINE_THIS_MODULE(netdev_tunnel);
 +
 +struct netdev_tunnel {
 +    struct netdev up;
 +
 +    /* Protects all members below. */
 +    struct ovs_mutex mutex;
 +
 +    uint8_t hwaddr[ETH_ADDR_LEN];
 +    struct netdev_stats stats;
 +    enum netdev_flags flags;
 +    int sockfd;
 +    struct sockaddr_storage local_addr;
 +    struct sockaddr_storage remote_addr;
 +    bool valid_remote_ip;
 +    bool valid_remote_port;
 +    bool connected;
 +    unsigned int change_seq;
 +};
 +
- static struct netdev_rx_tunnel *
- netdev_rx_tunnel_cast(const struct netdev_rx *rx)
++struct netdev_rxq_tunnel {
++    struct netdev_rxq up;
 +    int fd;
 +};
 +
 +static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
 +
 +static struct ovs_mutex tunnel_netdevs_mutex = OVS_MUTEX_INITIALIZER;
 +static struct shash tunnel_netdevs OVS_GUARDED_BY(tunnel_netdevs_mutex)
 +    = SHASH_INITIALIZER(&tunnel_netdevs);
 +
 +static int netdev_tunnel_construct(struct netdev *netdevp_);
 +static void netdev_tunnel_update_seq(struct netdev_tunnel *);
 +
 +static bool
 +is_netdev_tunnel_class(const struct netdev_class *class)
 +{
 +    return class->construct == netdev_tunnel_construct;
 +}
 +
 +static struct netdev_tunnel *
 +netdev_tunnel_cast(const struct netdev *netdev)
 +{
 +    ovs_assert(is_netdev_tunnel_class(netdev_get_class(netdev)));
 +    return CONTAINER_OF(netdev, struct netdev_tunnel, up);
 +}
 +
-     return CONTAINER_OF(rx, struct netdev_rx_tunnel, up);
++static struct netdev_rxq_tunnel *
++netdev_rxq_tunnel_cast(const struct netdev_rxq *rx)
 +{
 +    ovs_assert(is_netdev_tunnel_class(netdev_get_class(rx->netdev)));
-       smap_add_format(args, "remote_ip", IP_FMT,
-               IP_ARGS(sin->sin_addr.s_addr));
++    return CONTAINER_OF(rx, struct netdev_rxq_tunnel, up);
 +}
 +
 +static struct netdev *
 +netdev_tunnel_alloc(void)
 +{
 +    struct netdev_tunnel *netdev = xzalloc(sizeof *netdev);
 +    return &netdev->up;
 +}
 +
 +static int
 +netdev_tunnel_construct(struct netdev *netdev_)
 +{
 +    static atomic_uint next_n = ATOMIC_VAR_INIT(0);
 +    struct netdev_tunnel *netdev = netdev_tunnel_cast(netdev_);
 +    unsigned int n;
 +
 +    atomic_add(&next_n, 1, &n);
 +
 +    ovs_mutex_init(&netdev->mutex);
 +    netdev->hwaddr[0] = 0xfe;
 +    netdev->hwaddr[1] = 0xff;
 +    netdev->hwaddr[2] = 0xff;
 +    netdev->hwaddr[3] = n >> 16;
 +    netdev->hwaddr[4] = n >> 8;
 +    netdev->hwaddr[5] = n;
 +    netdev->flags = 0;
 +    netdev->change_seq = 1;
 +    memset(&netdev->remote_addr, 0, sizeof(netdev->remote_addr));
 +    netdev->valid_remote_ip = false;
 +    netdev->valid_remote_port = false;
 +    netdev->connected = false;
 +
 +
 +    netdev->sockfd = inet_open_passive(SOCK_DGRAM, "", 0,
 +        &netdev->local_addr, 0);
 +    if (netdev->sockfd < 0) {
 +      return netdev->sockfd;
 +    }
 +
 +
 +    shash_add(&tunnel_netdevs, netdev_get_name(netdev_), netdev);
 +
 +    n++;
 +
 +    VLOG_DBG("tunnel_create: name=%s, fd=%d, port=%d",
 +        netdev_get_name(netdev_), netdev->sockfd, ss_get_port(&netdev->local_addr));
 +
 +    return 0;
 +
 +}
 +
 +static void
 +netdev_tunnel_destruct(struct netdev *netdev_)
 +{
 +    struct netdev_tunnel *netdev = netdev_tunnel_cast(netdev_);
 +
 +    ovs_mutex_lock(&tunnel_netdevs_mutex);
 +
 +    if (netdev->sockfd != -1)
 +      close(netdev->sockfd);
 +
 +    shash_find_and_delete(&tunnel_netdevs,
 +                          netdev_get_name(netdev_));
 +
 +    ovs_mutex_destroy(&netdev->mutex);
 +    ovs_mutex_unlock(&tunnel_netdevs_mutex);
 +}
 +
 +static void
 +netdev_tunnel_dealloc(struct netdev *netdev_)
 +{
 +    struct netdev_tunnel *netdev = netdev_tunnel_cast(netdev_);
 +    free(netdev);
 +}
 +
 +static int
 +netdev_tunnel_get_config(const struct netdev *dev_, struct smap *args)
 +{
 +    struct netdev_tunnel *netdev = netdev_tunnel_cast(dev_);
 +
 +    ovs_mutex_lock(&netdev->mutex);
 +    if (netdev->valid_remote_ip) {
 +        const struct sockaddr_in *sin =
 +            ALIGNED_CAST(const struct sockaddr_in *, &netdev->remote_addr);
-               ss_get_port(&netdev->remote_addr));
++        smap_add_format(args, "remote_ip", IP_FMT,
++                IP_ARGS(sin->sin_addr.s_addr));
 +    }
 +    if (netdev->valid_remote_port)
 +        smap_add_format(args, "remote_port", "%"PRIu16,
-             ovs_strerror(errno));
++                ss_get_port(&netdev->remote_addr));
 +    ovs_mutex_unlock(&netdev->mutex);
 +    return 0;
 +}
 +
 +static int
 +netdev_tunnel_connect(struct netdev_tunnel *dev)
 +    OVS_REQUIRES(dev->mutex)
 +{
 +    char buf[1024];
 +    struct sockaddr_in *sin =
 +        ALIGNED_CAST(struct sockaddr_in *, &dev->remote_addr);
 +    if (dev->sockfd < 0)
 +        return EBADF;
 +    if (!dev->valid_remote_ip || !dev->valid_remote_port)
 +        return 0;
 +    if (connect(dev->sockfd, (struct sockaddr*) sin, sizeof(*sin)) < 0) {
 +        VLOG_DBG("%s: connect returned %s", netdev_get_name(&dev->up),
-         inet_ntop(AF_INET, &sin->sin_addr.s_addr, buf, 1024),
-         ss_get_port(&dev->remote_addr));
++                ovs_strerror(errno));
 +        return errno;
 +    }
 +    dev->connected = true;
 +    netdev_tunnel_update_seq(dev);
 +    VLOG_DBG("%s: connected to (%s, %d)", netdev_get_name(&dev->up),
-       if (!strcmp(node->name, "remote_ip")) {
-           struct in_addr addr;
-           if (lookup_ip(node->data, &addr)) {
-               VLOG_WARN("%s: bad 'remote_ip'", node->name);
-           } else {
-         sin->sin_family = AF_INET;
-               sin->sin_addr = addr;
-               netdev->valid_remote_ip = true;
-           }
-       } else if (!strcmp(node->name, "remote_port")) {
-           sin->sin_port = htons(atoi(node->data));
-           netdev->valid_remote_port = true;
-       } else {
-           VLOG_WARN("%s: unknown argument '%s'", 
-               netdev_get_name(dev_), node->name);
-       }
++            inet_ntop(AF_INET, &sin->sin_addr.s_addr, buf, 1024),
++            ss_get_port(&dev->remote_addr));
 +    return 0;
 +}
 +
 +static int
 +netdev_tunnel_set_config(struct netdev *dev_, const struct smap *args)
 +{
 +    struct netdev_tunnel *netdev = netdev_tunnel_cast(dev_);
 +    struct shash_node *node;
 +    int error;
 +    struct sockaddr_in *sin =
 +        ALIGNED_CAST(struct sockaddr_in *, &netdev->remote_addr);
 +
 +    ovs_mutex_lock(&netdev->mutex);
 +    VLOG_DBG("tunnel_set_config(%s)", netdev_get_name(dev_));
 +    SMAP_FOR_EACH(node, args) {
 +        VLOG_DBG("arg: %s->%s", node->name, (char*)node->data);
- static struct netdev_rx *
- netdev_tunnel_rx_alloc(void)
++        if (!strcmp(node->name, "remote_ip")) {
++            struct in_addr addr;
++            if (lookup_ip(node->data, &addr)) {
++                VLOG_WARN("%s: bad 'remote_ip'", node->name);
++            } else {
++                sin->sin_family = AF_INET;
++                sin->sin_addr = addr;
++                netdev->valid_remote_ip = true;
++            }
++        } else if (!strcmp(node->name, "remote_port")) {
++            sin->sin_port = htons(atoi(node->data));
++            netdev->valid_remote_port = true;
++        } else {
++            VLOG_WARN("%s: unknown argument '%s'", 
++                    netdev_get_name(dev_), node->name);
++        }
 +    }
 +    error = netdev_tunnel_connect(netdev);        
 +    ovs_mutex_unlock(&netdev->mutex);
 +    return error;
 +}
 +
-     struct netdev_rx_tunnel *rx = xzalloc(sizeof *rx);
++static struct netdev_rxq *
++netdev_tunnel_rxq_alloc(void)
 +{
- netdev_tunnel_rx_construct(struct netdev_rx *rx_)
++    struct netdev_rxq_tunnel *rx = xzalloc(sizeof *rx);
 +    return &rx->up;
 +}
 +
 +static int
-     struct netdev_rx_tunnel *rx = netdev_rx_tunnel_cast(rx_);
++netdev_tunnel_rxq_construct(struct netdev_rxq *rx_)
 +{   
- netdev_tunnel_rx_destruct(struct netdev_rx *rx_ OVS_UNUSED)
++    struct netdev_rxq_tunnel *rx = netdev_rxq_tunnel_cast(rx_);
 +    struct netdev *netdev_ = rx->up.netdev;
 +    struct netdev_tunnel *netdev = netdev_tunnel_cast(netdev_);
 +
 +    ovs_mutex_lock(&netdev->mutex);
 +    rx->fd = netdev->sockfd;
 +    ovs_mutex_unlock(&netdev->mutex);
 +    return 0;
 +}
 +
 +static void
- netdev_tunnel_rx_dealloc(struct netdev_rx *rx_)
++netdev_tunnel_rxq_destruct(struct netdev_rxq *rx_ OVS_UNUSED)
 +{
 +}
 +
 +static void
-     struct netdev_rx_tunnel *rx = netdev_rx_tunnel_cast(rx_);
++netdev_tunnel_rxq_dealloc(struct netdev_rxq *rx_)
 +{
- netdev_tunnel_rx_recv(struct netdev_rx *rx_, struct ofpbuf *buffer)
++    struct netdev_rxq_tunnel *rx = netdev_rxq_tunnel_cast(rx_);
 +
 +    free(rx);
 +}
 +
 +static int
-     size_t size = ofpbuf_tailroom(buffer);
-     struct netdev_rx_tunnel *rx = netdev_rx_tunnel_cast(rx_);
++netdev_tunnel_rxq_recv(struct netdev_rxq *rx_, struct ofpbuf **packet, int *c)
 +{
-           VLOG_DBG("%s: recv(%"PRIxPTR", %"PRIuSIZE", MSG_TRUNC) = %"PRIdSIZE,
-                   netdev_rx_get_name(rx_), (uintptr_t)buffer->data, size, retval);
++    struct netdev_rxq_tunnel *rx = netdev_rxq_tunnel_cast(rx_);
 +    struct netdev_tunnel *netdev =
 +        netdev_tunnel_cast(rx_->netdev);
++    struct ofpbuf *buffer = NULL;
++    size_t size;
++    int error = 0;
++
 +    if (!netdev->connected)
 +        return EAGAIN;
++    buffer = ofpbuf_new_with_headroom(VLAN_ETH_HEADER_LEN + ETH_PAYLOAD_MAX,
++        DP_NETDEV_HEADROOM);
++    size = ofpbuf_tailroom(buffer);
++
 +    for (;;) {
 +        ssize_t retval;
 +        retval = recv(rx->fd, buffer->data, size, MSG_TRUNC);
-               netdev->stats.rx_packets++;
-               netdev->stats.rx_bytes += retval;
++        VLOG_DBG("%s: recv(%"PRIxPTR", %"PRIuSIZE", MSG_TRUNC) = %"PRIdSIZE,
++                netdev_rxq_get_name(rx_), (uintptr_t)buffer->data, size, retval);
 +        if (retval >= 0) {
-                   return 0;
++            netdev->stats.rx_packets++;
++            netdev->stats.rx_bytes += retval;
 +            if (retval <= size) {
 +                buffer->size += retval;
-                 return EMSGSIZE;
++                goto out;
 +            } else {
 +                netdev->stats.rx_errors++;
 +                netdev->stats.rx_length_errors++;
-                     netdev_rx_get_name(rx_), ovs_strerror(errno));
-                   netdev->stats.rx_errors++;
++                error = EMSGSIZE;
++                goto out;
 +            }
 +        } else if (errno != EINTR) {
 +            if (errno != EAGAIN) {
 +                VLOG_WARN_RL(&rl, "error receiveing Ethernet packet on %s: %s",
-             return errno;
++                        netdev_rxq_get_name(rx_), ovs_strerror(errno));
++                netdev->stats.rx_errors++;
 +            }
- netdev_tunnel_rx_wait(struct netdev_rx *rx_)
++            error = errno;
++            goto out;
 +        }
 +    }
++out:
++    if (error) {
++        ofpbuf_delete(buffer);
++    } else {
++        dp_packet_pad(buffer);
++        packet[0] = buffer;
++        *c = 1;
++    }
++
++    return error;
 +}
 +
 +static void
-     struct netdev_rx_tunnel *rx = 
-       netdev_rx_tunnel_cast(rx_);
++netdev_tunnel_rxq_wait(struct netdev_rxq *rx_)
 +{
- netdev_tunnel_send(struct netdev *netdev_, const void *buffer, size_t size)
++    struct netdev_rxq_tunnel *rx = 
++        netdev_rxq_tunnel_cast(rx_);
 +    if (rx->fd >= 0) {
 +        poll_fd_wait(rx->fd, POLLIN);
 +    }
 +}
 +
 +static int
-       netdev_tunnel_cast(netdev_);
-     if (!dev->connected)
-         return EAGAIN;
++netdev_tunnel_send(struct netdev *netdev_, struct ofpbuf *pkt, bool may_steal)
 +{
++    const void *buffer = pkt->data;
++    size_t size = pkt->size;
 +    struct netdev_tunnel *dev = 
-       VLOG_DBG("%s: send(%"PRIxPTR", %"PRIuSIZE") = %"PRIdSIZE,
-                netdev_get_name(netdev_), (uintptr_t)buffer, size, retval);
++        netdev_tunnel_cast(netdev_);
++    int error = 0;
++    if (!dev->connected) {
++        error = EAGAIN;
++        goto out;
++    }
 +    for (;;) {
 +        ssize_t retval;
 +        retval = send(dev->sockfd, buffer, size, 0);
-           dev->stats.tx_packets++;
-           dev->stats.tx_bytes += retval;
-           if (retval != size) {
-               VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIdSIZE" bytes of "
-                            "%"PRIuSIZE") on %s", retval, size, netdev_get_name(netdev_));
-               dev->stats.tx_errors++;
-           }
-             return 0;
++        VLOG_DBG("%s: send(%"PRIxPTR", %"PRIuSIZE") = %"PRIdSIZE,
++                netdev_get_name(netdev_), (uintptr_t)buffer, size, retval);
 +        if (retval >= 0) {
-                     netdev_get_name(netdev_), ovs_strerror(errno));
-               dev->stats.tx_errors++;
++            dev->stats.tx_packets++;
++            dev->stats.tx_bytes += retval;
++            if (retval != size) {
++                VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIdSIZE" bytes of "
++                        "%"PRIuSIZE") on %s", retval, size, netdev_get_name(netdev_));
++                dev->stats.tx_errors++;
++            }
++            goto out;
 +        } else if (errno != EINTR) {
 +            if (errno != EAGAIN) {
 +                VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s",
-             return errno;
++                        netdev_get_name(netdev_), ovs_strerror(errno));
++                dev->stats.tx_errors++;
 +            }
- netdev_tunnel_rx_drain(struct netdev_rx *rx_)
++            error = errno;
++            goto out;
 +        }
 +    }
++out:
++    if (may_steal) {
++        ofpbuf_delete(pkt);
++    }
++
++    return error;
 +}
 +
 +static void
 +netdev_tunnel_send_wait(struct netdev *netdev_)
 +{
 +    struct netdev_tunnel *dev = netdev_tunnel_cast(netdev_);
 +    if (dev->sockfd >= 0) {
 +        poll_fd_wait(dev->sockfd, POLLOUT);
 +    }
 +}
 +
 +static int
-     struct netdev_rx_tunnel *rx = 
-       netdev_rx_tunnel_cast(rx_);
++netdev_tunnel_rxq_drain(struct netdev_rxq *rx_)
 +{
 +    struct netdev_tunnel *netdev =
 +        netdev_tunnel_cast(rx_->netdev);
-       return 0;
++    struct netdev_rxq_tunnel *rx = 
++        netdev_rxq_tunnel_cast(rx_);
 +    char buffer[128];
 +    int error;
 +
 +    if (!netdev->connected)
-       error = recv(rx->fd, buffer, 128, MSG_TRUNC);
-       if (error) {
++        return 0;
 +    for (;;) {
-               break;
++        error = recv(rx->fd, buffer, 128, MSG_TRUNC);
++        if (error) {
 +            if (error == -EAGAIN)
-               return error;
-       }
++                break;
 +            else if (error != -EMSGSIZE)
-     netdev_tunnel_rx_alloc,
-     netdev_tunnel_rx_construct,
-     netdev_tunnel_rx_destruct,
-     netdev_tunnel_rx_dealloc,
-     netdev_tunnel_rx_recv,
-     netdev_tunnel_rx_wait,
-     netdev_tunnel_rx_drain,
++                return error;
++        }
 +    }
 +    return 0;
 +}
 +
 +static int
 +netdev_tunnel_set_etheraddr(struct netdev *netdev,
 +                           const uint8_t mac[ETH_ADDR_LEN])
 +{
 +    struct netdev_tunnel *dev = netdev_tunnel_cast(netdev);
 +
 +    ovs_mutex_lock(&dev->mutex);
 +    if (!eth_addr_equals(dev->hwaddr, mac)) {
 +        memcpy(dev->hwaddr, mac, ETH_ADDR_LEN);
 +        netdev_tunnel_update_seq(dev);
 +    }
 +    ovs_mutex_unlock(&dev->mutex);
 +
 +    return 0;
 +}
 +
 +static int
 +netdev_tunnel_get_etheraddr(const struct netdev *netdev,
 +                           uint8_t mac[ETH_ADDR_LEN])
 +{
 +    const struct netdev_tunnel *dev = netdev_tunnel_cast(netdev);
 +
 +    ovs_mutex_lock(&dev->mutex);
 +    memcpy(mac, dev->hwaddr, ETH_ADDR_LEN);
 +    ovs_mutex_unlock(&dev->mutex);
 +    return 0;
 +}
 +
 +
 +static int
 +netdev_tunnel_get_stats(const struct netdev *netdev, struct netdev_stats *stats)
 +{
 +    const struct netdev_tunnel *dev = netdev_tunnel_cast(netdev);
 +
 +    ovs_mutex_lock(&dev->mutex);
 +    *stats = dev->stats;
 +    ovs_mutex_unlock(&dev->mutex);
 +    return 0;
 +}
 +
 +static int
 +netdev_tunnel_set_stats(struct netdev *netdev, const struct netdev_stats *stats)
 +{
 +    struct netdev_tunnel *dev = netdev_tunnel_cast(netdev);
 +
 +    ovs_mutex_lock(&dev->mutex);
 +    dev->stats = *stats;
 +    ovs_mutex_unlock(&dev->mutex);
 +    return 0;
 +}
 +
 +static int
 +netdev_tunnel_update_flags(struct netdev *dev_,
 +                          enum netdev_flags off, enum netdev_flags on,
 +                          enum netdev_flags *old_flagsp)
 +{
 +    struct netdev_tunnel *netdev =
 +        netdev_tunnel_cast(dev_);
 +    int error = 0;
 +
 +    ovs_mutex_lock(&netdev->mutex);
 +    if ((off | on) & ~(NETDEV_UP | NETDEV_PROMISC)) {
 +        error = EINVAL;
 +        goto out;
 +    }
 +
 +    // XXX should we actually do something with these flags?
 +    *old_flagsp = netdev->flags;
 +    netdev->flags |= on;
 +    netdev->flags &= ~off;
 +    if (*old_flagsp != netdev->flags) {
 +        netdev_tunnel_update_seq(netdev);
 +    }
 +
 +out:
 +    ovs_mutex_unlock(&netdev->mutex);
 +    return error;
 +}
 +
 +\f
 +/* Helper functions. */
 +
 +static void
 +netdev_tunnel_update_seq(struct netdev_tunnel *dev)
 +    OVS_REQUIRES(dev->mutex)
 +{
 +    dev->change_seq++;
 +    if (!dev->change_seq) {
 +        dev->change_seq++;
 +    }
 +}
 +
 +static void
 +netdev_tunnel_get_port(struct unixctl_conn *conn,
 +                     int argc OVS_UNUSED, const char *argv[], void *aux OVS_UNUSED)
 +{
 +    struct netdev_tunnel *tunnel_dev;
 +    char buf[6];
 +
 +    ovs_mutex_lock(&tunnel_netdevs_mutex);
 +    tunnel_dev = shash_find_data(&tunnel_netdevs, argv[1]);
 +    if (!tunnel_dev) {
 +        unixctl_command_reply_error(conn, "no such tunnel netdev");
 +        goto out;
 +    }
 +
 +    ovs_mutex_lock(&tunnel_dev->mutex);
 +    sprintf(buf, "%d", ss_get_port(&tunnel_dev->local_addr));
 +    ovs_mutex_unlock(&tunnel_dev->mutex);
 +
 +    unixctl_command_reply(conn, buf);
 +out:
 +    ovs_mutex_unlock(&tunnel_netdevs_mutex);
 +}
 +
 +static void
 +netdev_tunnel_get_tx_bytes(struct unixctl_conn *conn,
 +                     int argc OVS_UNUSED, const char *argv[], void *aux OVS_UNUSED)
 +{
 +    struct netdev_tunnel *tunnel_dev;
 +    char buf[128];
 +
 +    ovs_mutex_lock(&tunnel_netdevs_mutex);
 +    tunnel_dev = shash_find_data(&tunnel_netdevs, argv[1]);
 +    if (!tunnel_dev) {
 +        unixctl_command_reply_error(conn, "no such tunnel netdev");
 +        goto out;
 +    }
 +
 +    ovs_mutex_lock(&tunnel_dev->mutex);
 +    sprintf(buf, "%"PRIu64, tunnel_dev->stats.tx_bytes);
 +    ovs_mutex_unlock(&tunnel_dev->mutex);
 +    unixctl_command_reply(conn, buf);
 +out:
 +    ovs_mutex_unlock(&tunnel_netdevs_mutex);
 +}
 +
 +static void
 +netdev_tunnel_get_rx_bytes(struct unixctl_conn *conn,
 +                     int argc OVS_UNUSED, const char *argv[], void *aux OVS_UNUSED)
 +{
 +    struct netdev_tunnel *tunnel_dev;
 +    char buf[128];
 +
 +    ovs_mutex_lock(&tunnel_netdevs_mutex);
 +    tunnel_dev = shash_find_data(&tunnel_netdevs, argv[1]);
 +    if (!tunnel_dev) {
 +        unixctl_command_reply_error(conn, "no such tunnel netdev");
 +        goto out;
 +    }
 +
 +    sprintf(buf, "%"PRIu64, tunnel_dev->stats.rx_bytes);
 +    unixctl_command_reply(conn, buf);
 +out:
 +    ovs_mutex_unlock(&tunnel_netdevs_mutex);
 +}
 +
 +
 +static int
 +netdev_tunnel_init(void)
 +{
 +    unixctl_command_register("netdev-tunnel/get-port", "NAME",
 +                             1, 1, netdev_tunnel_get_port, NULL);
 +    unixctl_command_register("netdev-tunnel/get-tx-bytes", "NAME",
 +                             1, 1, netdev_tunnel_get_tx_bytes, NULL);
 +    unixctl_command_register("netdev-tunnel/get-rx-bytes", "NAME",
 +                             1, 1, netdev_tunnel_get_rx_bytes, NULL);
 +    return 0;
 +}
 +
 +static void
 +netdev_tunnel_run(void)
 +{
 +}
 +
 +static void
 +netdev_tunnel_wait(void)
 +{
 +}
 +
 +const struct netdev_class netdev_tunnel_class = {
 +    "tunnel",
 +    netdev_tunnel_init,    
 +    netdev_tunnel_run,      
 +    netdev_tunnel_wait,   
 +
 +    netdev_tunnel_alloc,
 +    netdev_tunnel_construct,
 +    netdev_tunnel_destruct,
 +    netdev_tunnel_dealloc,
 +    netdev_tunnel_get_config,
 +    netdev_tunnel_set_config, 
 +    NULL,                                 /* get_tunnel_config */
 +
 +    netdev_tunnel_send, 
 +    netdev_tunnel_send_wait,  
 +
 +    netdev_tunnel_set_etheraddr,
 +    netdev_tunnel_get_etheraddr,
 +    NULL,                                 /* get_mtu */
 +    NULL,                                 /* set_mtu */
 +    NULL,                       /* get_ifindex */
 +    NULL,                                 /* get_carrier */
 +    NULL,                       /* get_carrier_resets */
 +    NULL,                       /* get_miimon */
 +    netdev_tunnel_get_stats,
 +    netdev_tunnel_set_stats,
 +
 +    NULL,                       /* get_features */
 +    NULL,                       /* set_advertisements */
 +
 +    NULL,                       /* set_policing */
 +    NULL,                       /* get_qos_types */
 +    NULL,                       /* get_qos_capabilities */
 +    NULL,                       /* get_qos */
 +    NULL,                       /* set_qos */
 +    NULL,                       /* get_queue */
 +    NULL,                       /* set_queue */
 +    NULL,                       /* delete_queue */
 +    NULL,                       /* get_queue_stats */
 +    NULL,                       /* queue_dump_start */
 +    NULL,                       /* queue_dump_next */
 +    NULL,                       /* queue_dump_done */
 +    NULL,                       /* dump_queue_stats */
 +
 +    NULL,                       /* get_in4 */
 +    NULL,                       /* set_in4 */
 +    NULL,                       /* get_in6 */
 +    NULL,                       /* add_router */
 +    NULL,                       /* get_next_hop */
 +    NULL,                       /* get_status */
 +    NULL,                       /* arp_lookup */
 +
 +    netdev_tunnel_update_flags,
 +
++    netdev_tunnel_rxq_alloc,
++    netdev_tunnel_rxq_construct,
++    netdev_tunnel_rxq_destruct,
++    netdev_tunnel_rxq_dealloc,
++    netdev_tunnel_rxq_recv,
++    netdev_tunnel_rxq_wait,
++    netdev_tunnel_rxq_drain,
 +};
diff --combined lib/netdev.c
@@@ -31,6 -31,7 +31,7 @@@
  #include "fatal-signal.h"
  #include "hash.h"
  #include "list.h"
+ #include "netdev-dpdk.h"
  #include "netdev-provider.h"
  #include "netdev-vport.h"
  #include "ofpbuf.h"
@@@ -90,6 -91,18 +91,18 @@@ static struct vlog_rate_limit rl = VLOG
  static void restore_all_flags(void *aux OVS_UNUSED);
  void update_device_args(struct netdev *, const struct shash *args);
  
+ int
+ netdev_n_rxq(const struct netdev *netdev)
+ {
+     return netdev->n_rxq;
+ }
+ bool
+ netdev_is_pmd(const struct netdev *netdev)
+ {
+     return !strcmp(netdev->netdev_class->type, "dpdk");
+ }
  static void
  netdev_initialize(void)
      OVS_EXCLUDED(netdev_class_mutex, netdev_mutex)
          netdev_register_provider(&netdev_tap_class);
          netdev_register_provider(&netdev_bsd_class);
  #endif
 +        netdev_register_provider(&netdev_tunnel_class);
 +        netdev_register_provider(&netdev_pltap_class);
+         netdev_dpdk_register();
  
          ovsthread_once_done(&once);
      }
@@@ -227,7 -239,6 +241,6 @@@ netdev_unregister_provider(const char *
          atomic_read(&rc->ref_cnt, &ref_cnt);
          if (!ref_cnt) {
              hmap_remove(&netdev_classes, &rc->hmap_node);
-             atomic_destroy(&rc->ref_cnt);
              free(rc);
              error = 0;
          } else {
@@@ -330,6 -341,13 +343,13 @@@ netdev_open(const char *name, const cha
                  netdev->netdev_class = rc->class;
                  netdev->name = xstrdup(name);
                  netdev->node = shash_add(&netdev_shash, name, netdev);
+                 /* By default enable one rx queue per netdev. */
+                 if (netdev->netdev_class->rxq_alloc) {
+                     netdev->n_rxq = 1;
+                 } else {
+                     netdev->n_rxq = 0;
+                 }
                  list_init(&netdev->saved_flags_list);
  
                  error = rc->class->construct(netdev);
@@@ -503,24 -521,25 +523,25 @@@ netdev_parse_name(const char *netdev_na
      }
  }
  
- /* Attempts to open a netdev_rx handle for obtaining packets received on
-  * 'netdev'.  On success, returns 0 and stores a nonnull 'netdev_rx *' into
+ /* Attempts to open a netdev_rxq handle for obtaining packets received on
+  * 'netdev'.  On success, returns 0 and stores a nonnull 'netdev_rxq *' into
   * '*rxp'.  On failure, returns a positive errno value and stores NULL into
   * '*rxp'.
   *
   * Some kinds of network devices might not support receiving packets.  This
   * function returns EOPNOTSUPP in that case.*/
  int
- netdev_rx_open(struct netdev *netdev, struct netdev_rx **rxp)
+ netdev_rxq_open(struct netdev *netdev, struct netdev_rxq **rxp, int id)
      OVS_EXCLUDED(netdev_mutex)
  {
      int error;
  
-     if (netdev->netdev_class->rx_alloc) {
-         struct netdev_rx *rx = netdev->netdev_class->rx_alloc();
+     if (netdev->netdev_class->rxq_alloc && id < netdev->n_rxq) {
+         struct netdev_rxq *rx = netdev->netdev_class->rxq_alloc();
          if (rx) {
              rx->netdev = netdev;
-             error = netdev->netdev_class->rx_construct(rx);
+             rx->queue_id = id;
+             error = netdev->netdev_class->rxq_construct(rx);
              if (!error) {
                  ovs_mutex_lock(&netdev_mutex);
                  netdev->ref_cnt++;
                  *rxp = rx;
                  return 0;
              }
-             netdev->netdev_class->rx_dealloc(rx);
+             netdev->netdev_class->rxq_dealloc(rx);
          } else {
              error = ENOMEM;
          }
  
  /* Closes 'rx'. */
  void
- netdev_rx_close(struct netdev_rx *rx)
+ netdev_rxq_close(struct netdev_rxq *rx)
      OVS_EXCLUDED(netdev_mutex)
  {
      if (rx) {
          struct netdev *netdev = rx->netdev;
-         netdev->netdev_class->rx_destruct(rx);
-         netdev->netdev_class->rx_dealloc(rx);
+         netdev->netdev_class->rxq_destruct(rx);
+         netdev->netdev_class->rxq_dealloc(rx);
          netdev_close(netdev);
      }
  }
  
- /* Attempts to receive a packet from 'rx' into the tailroom of 'buffer', which
-  * must initially be empty.  If successful, returns 0 and increments
-  * 'buffer->size' by the number of bytes in the received packet, otherwise a
-  * positive errno value.
+ /* Attempts to receive batch of packets from 'rx'.
   *
   * Returns EAGAIN immediately if no packet is ready to be received.
   *
   * Returns EMSGSIZE, and discards the packet, if the received packet is longer
   * than 'ofpbuf_tailroom(buffer)'.
   *
-  * Implementations may make use of VLAN_HEADER_LEN bytes of tailroom to
-  * add a VLAN header which is obtained out-of-band to the packet. If
-  * this occurs then VLAN_HEADER_LEN bytes of tailroom will no longer be
-  * available for the packet, otherwise it may be used for the packet
-  * itself.
-  *
   * It is advised that the tailroom of 'buffer' should be
   * VLAN_HEADER_LEN bytes longer than the MTU to allow space for an
   * out-of-band VLAN header to be added to the packet.  At the very least,
   * This function may be set to null if it would always return EOPNOTSUPP
   * anyhow. */
  int
- netdev_rx_recv(struct netdev_rx *rx, struct ofpbuf *buffer)
+ netdev_rxq_recv(struct netdev_rxq *rx, struct ofpbuf **buffers, int *cnt)
  {
      int retval;
  
-     ovs_assert(buffer->size == 0);
-     ovs_assert(ofpbuf_tailroom(buffer) >= ETH_TOTAL_MIN);
-     retval = rx->netdev->netdev_class->rx_recv(rx, buffer);
+     retval = rx->netdev->netdev_class->rxq_recv(rx, buffers, cnt);
      if (!retval) {
          COVERAGE_INC(netdev_received);
-         if (buffer->size < ETH_TOTAL_MIN) {
-             ofpbuf_put_zeros(buffer, ETH_TOTAL_MIN - buffer->size);
-         }
-         return 0;
-     } else {
-         return retval;
      }
+     return retval;
  }
  
  /* Arranges for poll_block() to wake up when a packet is ready to be received
   * on 'rx'. */
  void
- netdev_rx_wait(struct netdev_rx *rx)
+ netdev_rxq_wait(struct netdev_rxq *rx)
  {
-     rx->netdev->netdev_class->rx_wait(rx);
+     rx->netdev->netdev_class->rxq_wait(rx);
  }
  
  /* Discards any packets ready to be received on 'rx'. */
  int
- netdev_rx_drain(struct netdev_rx *rx)
+ netdev_rxq_drain(struct netdev_rxq *rx)
  {
-     return (rx->netdev->netdev_class->rx_drain
-             ? rx->netdev->netdev_class->rx_drain(rx)
+     return (rx->netdev->netdev_class->rxq_drain
+             ? rx->netdev->netdev_class->rxq_drain(rx)
              : 0);
  }
  
   * immediately.  Returns EMSGSIZE if a partial packet was transmitted or if
   * the packet is too big or too small to transmit on the device.
   *
-  * The caller retains ownership of 'buffer' in all cases.
+  * To retain ownership of 'buffer' caller can set may_steal to false.
   *
   * The kernel maintains a packet transmission queue, so the caller is not
   * expected to do additional queuing of packets.
   * Some network devices may not implement support for this function.  In such
   * cases this function will always return EOPNOTSUPP. */
  int
- netdev_send(struct netdev *netdev, const struct ofpbuf *buffer)
+ netdev_send(struct netdev *netdev, struct ofpbuf *buffer, bool may_steal)
  {
      int error;
  
      error = (netdev->netdev_class->send
-              ? netdev->netdev_class->send(netdev, buffer->data, buffer->size)
+              ? netdev->netdev_class->send(netdev, buffer, may_steal)
               : EOPNOTSUPP);
      if (!error) {
          COVERAGE_INC(netdev_sent);
@@@ -1608,16 -1610,16 +1612,16 @@@ netdev_get_type_from_name(const char *n
  }
  \f
  struct netdev *
- netdev_rx_get_netdev(const struct netdev_rx *rx)
+ netdev_rxq_get_netdev(const struct netdev_rxq *rx)
  {
      ovs_assert(rx->netdev->ref_cnt > 0);
      return rx->netdev;
  }
  
  const char *
- netdev_rx_get_name(const struct netdev_rx *rx)
+ netdev_rxq_get_name(const struct netdev_rxq *rx)
  {
-     return netdev_get_name(netdev_rx_get_netdev(rx));
+     return netdev_get_name(netdev_rxq_get_netdev(rx));
  }
  
  static void