From: Ben Pfaff Date: Tue, 6 Aug 2013 16:39:10 +0000 (-0700) Subject: seq: New module for race-free, pollable, thread-safe sequence number. X-Git-Tag: sliver-openvswitch-2.0.90-1~27^2~38 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=55b403558b3525108c72846db2cf09fda59bb22b;p=sliver-openvswitch.git seq: New module for race-free, pollable, thread-safe sequence number. Signed-off-by: Ben Pfaff Acked-by: Andy Zhou --- diff --git a/lib/automake.mk b/lib/automake.mk index b46a8680a..f936897bf 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -159,6 +159,8 @@ lib_libopenvswitch_a_SOURCES = \ lib/reconnect.c \ lib/reconnect.h \ lib/sat-math.h \ + lib/seq.c \ + lib/seq.h \ lib/sha1.c \ lib/sha1.h \ lib/shash.c \ diff --git a/lib/poll-loop.c b/lib/poll-loop.c index 5f9b9cdfd..4eb118701 100644 --- a/lib/poll-loop.c +++ b/lib/poll-loop.c @@ -26,6 +26,7 @@ #include "fatal-signal.h" #include "list.h" #include "ovs-thread.h" +#include "seq.h" #include "socket-util.h" #include "timeval.h" #include "vlog.h" @@ -248,6 +249,8 @@ poll_block(void) /* Handle any pending signals before doing anything else. */ fatal_signal_run(); + + seq_woke(); } static void diff --git a/lib/seq.c b/lib/seq.c new file mode 100644 index 000000000..abe1ad8e1 --- /dev/null +++ b/lib/seq.c @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2013 Nicira, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "seq.h" + +#include + +#include "hash.h" +#include "hmap.h" +#include "latch.h" +#include "list.h" +#include "ovs-thread.h" +#include "poll-loop.h" + +/* A sequence number object. */ +struct seq { + uint64_t value OVS_GUARDED; + struct hmap waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */ +}; + +/* A thread waiting on a particular seq. */ +struct seq_waiter { + struct seq *seq OVS_GUARDED; /* Seq being waited for. */ + struct hmap_node hmap_node OVS_GUARDED; /* In 'seq->waiters'. */ + unsigned int ovsthread_id OVS_GUARDED; /* Key in 'waiters' hmap. */ + + struct seq_thread *thread OVS_GUARDED; /* Thread preparing to wait. */ + struct list list_node OVS_GUARDED; /* In 'thread->waiters'. */ + + uint64_t value OVS_GUARDED; /* seq->value we're waiting to change. */ +}; + +/* A thread that might be waiting on one or more seqs. */ +struct seq_thread { + struct list waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */ + struct latch latch OVS_GUARDED; /* Wakeup latch for this thread. */ + bool waiting OVS_GUARDED; /* True if latch_wait() already called. */ +}; + +static struct ovs_mutex seq_mutex = OVS_ADAPTIVE_MUTEX_INITIALIZER; + +static uint64_t seq_next OVS_GUARDED_BY(seq_mutex) = 1; + +static pthread_key_t seq_thread_key; + +static void seq_init(void); +static struct seq_thread *seq_thread_get(void) OVS_REQUIRES(seq_mutex); +static void seq_thread_exit(void *thread_) OVS_EXCLUDED(seq_mutex); +static void seq_thread_woke(struct seq_thread *) OVS_REQUIRES(seq_mutex); +static void seq_waiter_destroy(struct seq_waiter *) OVS_REQUIRES(seq_mutex); +static void seq_wake_waiters(struct seq *) OVS_REQUIRES(seq_mutex); + +/* Creates and returns a new 'seq' object. */ +struct seq * OVS_EXCLUDED(seq_mutex) +seq_create(void) +{ + struct seq *seq; + + seq_init(); + + seq = xmalloc(sizeof *seq); + ovs_mutex_lock(&seq_mutex); + seq->value = seq_next++; + hmap_init(&seq->waiters); + ovs_mutex_unlock(&seq_mutex); + + return seq; +} + +/* Destroys 'seq', waking up threads that were waiting on it, if any. */ +void +seq_destroy(struct seq *seq) + OVS_EXCLUDED(seq_mutex) +{ + ovs_mutex_lock(&seq_mutex); + seq_wake_waiters(seq); + hmap_destroy(&seq->waiters); + free(seq); + ovs_mutex_unlock(&seq_mutex); +} + +/* Increments 'seq''s sequence number, waking up any threads that are waiting + * on 'seq'. */ +void +seq_change(struct seq *seq) + OVS_EXCLUDED(seq_mutex) +{ + ovs_mutex_lock(&seq_mutex); + seq->value = seq_next++; + seq_wake_waiters(seq); + ovs_mutex_unlock(&seq_mutex); +} + +/* Returns 'seq''s current sequence number (which could change immediately). */ +uint64_t +seq_read(const struct seq *seq) + OVS_EXCLUDED(seq_mutex) +{ + uint64_t value; + + ovs_mutex_lock(&seq_mutex); + value = seq->value; + ovs_mutex_unlock(&seq_mutex); + + return value; +} + +static void +seq_wait__(struct seq *seq, uint64_t value) + OVS_REQUIRES(seq_mutex) +{ + unsigned int id = ovsthread_id_self(); + uint32_t hash = hash_int(id, 0); + struct seq_waiter *waiter; + + HMAP_FOR_EACH_IN_BUCKET (waiter, hmap_node, hash, &seq->waiters) { + if (waiter->ovsthread_id == id) { + if (waiter->value != value) { + /* The current value is different from the value we've already + * waited for, */ + poll_immediate_wake(); + } else { + /* Already waiting on 'value', nothing more to do. */ + } + return; + } + } + + waiter = xmalloc(sizeof *waiter); + waiter->seq = seq; + hmap_insert(&seq->waiters, &waiter->hmap_node, hash); + waiter->value = value; + waiter->thread = seq_thread_get(); + list_push_back(&waiter->thread->waiters, &waiter->list_node); + + if (!waiter->thread->waiting) { + latch_wait(&waiter->thread->latch); + waiter->thread->waiting = true; + } +} + +/* Causes the following poll_block() to wake up when 'seq''s sequence number + * changes from 'value'. (If 'seq''s sequence number isn't 'value', then + * poll_block() won't block at all.) */ +void +seq_wait(const struct seq *seq_, uint64_t value) + OVS_EXCLUDED(seq_mutex) +{ + struct seq *seq = CONST_CAST(struct seq *, seq_); + + ovs_mutex_lock(&seq_mutex); + if (value == seq->value) { + seq_wait__(seq, value); + } else { + poll_immediate_wake(); + } + ovs_mutex_unlock(&seq_mutex); +} + +/* Called by poll_block() just before it returns, this function destroys any + * seq_waiter objects associated with the current thread. */ +void +seq_woke(void) + OVS_EXCLUDED(seq_mutex) +{ + struct seq_thread *thread; + + seq_init(); + + thread = pthread_getspecific(seq_thread_key); + if (thread) { + ovs_mutex_lock(&seq_mutex); + seq_thread_woke(thread); + thread->waiting = false; + ovs_mutex_unlock(&seq_mutex); + } +} + +static void +seq_init(void) +{ + static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER; + + if (ovsthread_once_start(&once)) { + xpthread_key_create(&seq_thread_key, seq_thread_exit); + ovsthread_once_done(&once); + } +} + +static struct seq_thread * +seq_thread_get(void) + OVS_REQUIRES(seq_mutex) +{ + struct seq_thread *thread = pthread_getspecific(seq_thread_key); + if (!thread) { + thread = xmalloc(sizeof *thread); + list_init(&thread->waiters); + latch_init(&thread->latch); + thread->waiting = false; + + xpthread_setspecific(seq_thread_key, thread); + } + return thread; +} + +static void +seq_thread_exit(void *thread_) + OVS_EXCLUDED(seq_mutex) +{ + struct seq_thread *thread = thread_; + + ovs_mutex_lock(&seq_mutex); + seq_thread_woke(thread); + latch_destroy(&thread->latch); + free(thread); + ovs_mutex_unlock(&seq_mutex); +} + +static void +seq_thread_woke(struct seq_thread *thread) + OVS_REQUIRES(seq_mutex) +{ + struct seq_waiter *waiter, *next_waiter; + + LIST_FOR_EACH_SAFE (waiter, next_waiter, list_node, &thread->waiters) { + ovs_assert(waiter->thread == thread); + seq_waiter_destroy(waiter); + } + latch_poll(&thread->latch); +} + +static void +seq_waiter_destroy(struct seq_waiter *waiter) + OVS_REQUIRES(seq_mutex) +{ + hmap_remove(&waiter->seq->waiters, &waiter->hmap_node); + list_remove(&waiter->list_node); + free(waiter); +} + +static void +seq_wake_waiters(struct seq *seq) + OVS_REQUIRES(seq_mutex) +{ + struct seq_waiter *waiter, *next_waiter; + + HMAP_FOR_EACH_SAFE (waiter, next_waiter, hmap_node, &seq->waiters) { + latch_set(&waiter->thread->latch); + seq_waiter_destroy(waiter); + } +} diff --git a/lib/seq.h b/lib/seq.h new file mode 100644 index 000000000..3423e217d --- /dev/null +++ b/lib/seq.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2013 Nicira, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SEQ_H +#define SEQ_H 1 + +/* Thread-safe, pollable sequence number. + * + * + * Background + * ========== + * + * It is sometimes desirable to take an action whenever an object changes. + * Suppose we associate a sequence number with an object and increment the + * sequence number whenver we change the object. An observer can then record + * the sequence number it sees. Later on, if the current sequence number + * differs from the one it saw last, then the observer knows to examine the + * object for changes. + * + * Code that wants to run when a sequence number changes is challenging to + * implement in a multithreaded environment. A naive implementation, that + * simply checks whether the sequence number changed and, if so, calls + * poll_immediate_wake(), will fail when another thread increments the sequence + * number after the check (including during poll_block()). + * + * struct seq is a solution. It implements a sequence number along with enough + * internal infrastructure so that a thread waiting on a particular value will + * wake up if the sequence number changes, or even if the "struct seq" is + * destroyed. + * + * + * Usage + * ===== + * + * The object that includes a sequence number should use seq_create() and + * seq_destroy() at creation and destruction, and seq_change() whenever the + * object's observable state changes. + * + * An observer may seq_read() to read the current sequence number and + * seq_wait() to cause poll_block() to wake up when the sequence number changes + * from a specified value. + * + * To avoid races, observers should use seq_read() to check for changes, + * process any changes, and then use seq_wait() to wait for a change from the + * previously read value. That is, a correct usage looks something like this: + * + * new_seq = seq_read(seq); + * if (new_seq != last_seq) { + * ...process changes... + * last_seq = new_seq; + * } + * seq_wait(seq, new_seq); + * poll_block(); + * + * + * Thread-safety + * ============= + * + * Fully thread safe. + */ + +#include + +/* For implementation of an object with a sequence number attached. */ +struct seq *seq_create(void); +void seq_destroy(struct seq *); +void seq_change(struct seq *); + +/* For observers. */ +uint64_t seq_read(const struct seq *); +void seq_wait(const struct seq *, uint64_t value); + +/* For poll_block() internal use. */ +void seq_woke(void); + +#endif /* seq.h */