- msg_size = sizeof *header + packet->size;
- msg = ofpbuf_new_with_headroom(msg_size, DPIF_RECV_MSG_PADDING);
- header = ofpbuf_put_uninit(msg, sizeof *header);
- header->type = queue_no;
- header->length = msg_size;
- header->port = port_no;
- header->arg = arg;
- ofpbuf_put(msg, packet->data, packet->size);
- list_push_back(&dp->queues[queue_no], &msg->list_node);
- dp->queue_len[queue_no]++;
+ buf = ofpbuf_new(ODPUTIL_FLOW_KEY_BYTES + 2 + packet->size);
+ odp_flow_key_from_flow(buf, flow);
+ key_len = buf->size;
+ ofpbuf_pull(buf, key_len);
+ ofpbuf_reserve(buf, 2);
+ ofpbuf_put(buf, packet->data, packet->size);
+
+ upcall = xzalloc(sizeof *upcall);
+ upcall->type = queue_no;
+ upcall->packet = buf;
+ upcall->key = buf->base;
+ upcall->key_len = key_len;
+ upcall->userdata = arg;
+
+ q->upcalls[++q->head & QUEUE_MASK] = upcall;