From caf6491f3e077d58e0c09e64a3183f34fcbdf672 Mon Sep 17 00:00:00 2001 From: Jarno Rajahalme Date: Wed, 28 Aug 2013 12:06:07 -0700 Subject: [PATCH] ofproto-dpif-upcall: Batch upcalls. Batching reduces overheads and enables upto 4 times the upcall processing performance in a specialized test case. Signed-off-by: Jarno Rajahalme Signed-off-by: Ben Pfaff --- ofproto/ofproto-dpif-upcall.c | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c index db78af381..22cdf1b8a 100644 --- a/ofproto/ofproto-dpif-upcall.c +++ b/ofproto/ofproto-dpif-upcall.c @@ -55,6 +55,8 @@ struct handler { struct list upcalls OVS_GUARDED; size_t n_upcalls OVS_GUARDED; + size_t n_new_upcalls; /* Only changed by the dispatcher. */ + pthread_cond_t wake_cond; /* Wakes 'thread' while holding 'mutex'. */ }; @@ -515,6 +517,10 @@ static void recv_upcalls(struct udpif *udpif) { static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(60, 60); + size_t n_udpif_new_upcalls = 0; + struct handler *handler; + int n; + for (;;) { struct upcall *upcall; int error; @@ -535,7 +541,6 @@ recv_upcalls(struct udpif *udpif) } else if (upcall->type == MISS_UPCALL) { struct dpif_upcall *dupcall = &upcall->dpif_upcall; uint32_t hash = udpif->secret; - struct handler *handler; struct nlattr *nla; size_t n_bytes, left; @@ -561,8 +566,11 @@ recv_upcalls(struct udpif *udpif) ovs_mutex_lock(&handler->mutex); if (handler->n_upcalls < MAX_QUEUE_LENGTH) { list_push_back(&handler->upcalls, &upcall->list_node); - handler->n_upcalls++; - xpthread_cond_signal(&handler->wake_cond); + handler->n_new_upcalls = ++handler->n_upcalls; + + if (handler->n_new_upcalls >= FLOW_MISS_MAX_BATCH) { + xpthread_cond_signal(&handler->wake_cond); + } ovs_mutex_unlock(&handler->mutex); if (!VLOG_DROP_DBG(&rl)) { struct ds ds = DS_EMPTY_INITIALIZER; @@ -581,10 +589,13 @@ recv_upcalls(struct udpif *udpif) } else { ovs_mutex_lock(&udpif->upcall_mutex); if (udpif->n_upcalls < MAX_QUEUE_LENGTH) { - udpif->n_upcalls++; + n_udpif_new_upcalls = ++udpif->n_upcalls; list_push_back(&udpif->upcalls, &upcall->list_node); ovs_mutex_unlock(&udpif->upcall_mutex); - seq_change(udpif->wait_seq); + + if (n_udpif_new_upcalls >= FLOW_MISS_MAX_BATCH) { + seq_change(udpif->wait_seq); + } } else { ovs_mutex_unlock(&udpif->upcall_mutex); COVERAGE_INC(upcall_queue_overflow); @@ -592,6 +603,18 @@ recv_upcalls(struct udpif *udpif) } } } + for (n = 0; n < udpif->n_handlers; ++n) { + handler = &udpif->handlers[n]; + if (handler->n_new_upcalls) { + handler->n_new_upcalls = 0; + ovs_mutex_lock(&handler->mutex); + xpthread_cond_signal(&handler->wake_cond); + ovs_mutex_unlock(&handler->mutex); + } + } + if (n_udpif_new_upcalls) { + seq_change(udpif->wait_seq); + } } static struct flow_miss * -- 2.43.0