* when it is called, it should return EAGAIN without blocking. */
     int (*recv)(struct wdp *wdp, struct wdp_packet *packet);
 
+    /* Discards any queued messages that otherwise would be received by the
+     * 'recv' member function for 'wdp'. */
+    int (*recv_purge)(struct wdp *wdp);
+
     /* Arranges for the poll loop to wake up when 'wdp' has a message queued
      * to be received with the recv member function. */
     void (*recv_wait)(struct wdp *wdp);
 
     return EAGAIN;
 }
 
+static void
+wx_recv_purge_queue__(struct wx *wx, int max, int xflow_listen_mask,
+                      int *errorp)
+{
+    int error;
+
+    error = xfif_recv_set_mask(wx->xfif, xflow_listen_mask);
+    if (!error) {
+        struct ofpbuf *buf;
+
+        while (max > 0 && (error = xfif_recv(wx->xfif, &buf)) == 0) {
+            ofpbuf_delete(buf);
+            max--;
+        }
+    }
+    if (error && error != EAGAIN) {
+        *errorp = error;
+    }
+}
+
+static int
+wx_recv_purge(struct wdp *wdp)
+{
+    struct wx *wx = wx_cast(wdp);
+    struct xflow_stats xflow_stats;
+    int xflow_listen_mask;
+    int retval, error;
+
+    xfif_get_xf_stats(wx->xfif, &xflow_stats);
+
+    error = xfif_recv_get_mask(wx->xfif, &xflow_listen_mask);
+    if (error || !(xflow_listen_mask & XFLOWL_ALL)) {
+        return error;
+    }
+
+    if (xflow_listen_mask & XFLOWL_MISS) {
+        wx_recv_purge_queue__(wx, xflow_stats.max_miss_queue, XFLOWL_MISS,
+                              &error);
+    }
+    if (xflow_listen_mask & XFLOWL_ACTION) {
+        wx_recv_purge_queue__(wx, xflow_stats.max_action_queue, XFLOWL_ACTION,
+                              &error);
+    }
+    if (xflow_listen_mask & XFLOWL_SFLOW) {
+        wx_recv_purge_queue__(wx, xflow_stats.max_sflow_queue, XFLOWL_SFLOW,
+                              &error);
+    }
+
+    retval = xfif_recv_set_mask(wx->xfif, xflow_listen_mask);
+    return retval ? retval : error;
+}
+
+
 static void
 wx_recv_wait(struct wdp *wdp)
 {
         wx_get_sflow_probability,
         wx_set_sflow_probability,
         wx_recv,
+        wx_recv_purge,
         wx_recv_wait,
     };
 
 
 int
 wdp_recv_purge(struct wdp *wdp)
 {
-    struct wdp_stats stats;
-    unsigned int i;
-    int error;
-
     COVERAGE_INC(wdp_purge);
-
-    error = wdp_get_wdp_stats(wdp, &stats);
-    if (error) {
-        return error;
-    }
-
-    for (i = 0; i < stats.max_miss_queue + stats.max_action_queue + stats.max_sflow_queue; i++) {
-        struct wdp_packet packet;
-
-        error = wdp_recv(wdp, &packet);
-        if (error) {
-            return error == EAGAIN ? 0 : error;
-        }
-        ofpbuf_delete(packet.payload);
-    }
-    return 0;
+    return wdp->wdp_class->recv_purge(wdp);
 }
 
 /* Arranges for the poll loop to wake up when 'wdp' has a message queued to be