* when it is called, it should return EAGAIN without blocking. */
int (*recv)(struct wdp *wdp, struct wdp_packet *packet);
+ /* Discards any queued messages that otherwise would be received by the
+ * 'recv' member function for 'wdp'. */
+ int (*recv_purge)(struct wdp *wdp);
+
/* Arranges for the poll loop to wake up when 'wdp' has a message queued
* to be received with the recv member function. */
void (*recv_wait)(struct wdp *wdp);
return EAGAIN;
}
+static void
+wx_recv_purge_queue__(struct wx *wx, int max, int xflow_listen_mask,
+ int *errorp)
+{
+ int error;
+
+ error = xfif_recv_set_mask(wx->xfif, xflow_listen_mask);
+ if (!error) {
+ struct ofpbuf *buf;
+
+ while (max > 0 && (error = xfif_recv(wx->xfif, &buf)) == 0) {
+ ofpbuf_delete(buf);
+ max--;
+ }
+ }
+ if (error && error != EAGAIN) {
+ *errorp = error;
+ }
+}
+
+static int
+wx_recv_purge(struct wdp *wdp)
+{
+ struct wx *wx = wx_cast(wdp);
+ struct xflow_stats xflow_stats;
+ int xflow_listen_mask;
+ int retval, error;
+
+ xfif_get_xf_stats(wx->xfif, &xflow_stats);
+
+ error = xfif_recv_get_mask(wx->xfif, &xflow_listen_mask);
+ if (error || !(xflow_listen_mask & XFLOWL_ALL)) {
+ return error;
+ }
+
+ if (xflow_listen_mask & XFLOWL_MISS) {
+ wx_recv_purge_queue__(wx, xflow_stats.max_miss_queue, XFLOWL_MISS,
+ &error);
+ }
+ if (xflow_listen_mask & XFLOWL_ACTION) {
+ wx_recv_purge_queue__(wx, xflow_stats.max_action_queue, XFLOWL_ACTION,
+ &error);
+ }
+ if (xflow_listen_mask & XFLOWL_SFLOW) {
+ wx_recv_purge_queue__(wx, xflow_stats.max_sflow_queue, XFLOWL_SFLOW,
+ &error);
+ }
+
+ retval = xfif_recv_set_mask(wx->xfif, xflow_listen_mask);
+ return retval ? retval : error;
+}
+
+
static void
wx_recv_wait(struct wdp *wdp)
{
wx_get_sflow_probability,
wx_set_sflow_probability,
wx_recv,
+ wx_recv_purge,
wx_recv_wait,
};
int
wdp_recv_purge(struct wdp *wdp)
{
- struct wdp_stats stats;
- unsigned int i;
- int error;
-
COVERAGE_INC(wdp_purge);
-
- error = wdp_get_wdp_stats(wdp, &stats);
- if (error) {
- return error;
- }
-
- for (i = 0; i < stats.max_miss_queue + stats.max_action_queue + stats.max_sflow_queue; i++) {
- struct wdp_packet packet;
-
- error = wdp_recv(wdp, &packet);
- if (error) {
- return error == EAGAIN ? 0 : error;
- }
- ofpbuf_delete(packet.payload);
- }
- return 0;
+ return wdp->wdp_class->recv_purge(wdp);
}
/* Arranges for the poll loop to wake up when 'wdp' has a message queued to be