seq: New module for race-free, pollable, thread-safe sequence number.
[sliver-openvswitch.git] / lib / seq.c
1 /*
2  * Copyright (c) 2013 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include "seq.h"
20
21 #include <stdbool.h>
22
23 #include "hash.h"
24 #include "hmap.h"
25 #include "latch.h"
26 #include "list.h"
27 #include "ovs-thread.h"
28 #include "poll-loop.h"
29
30 /* A sequence number object. */
31 struct seq {
32     uint64_t value OVS_GUARDED;
33     struct hmap waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */
34 };
35
36 /* A thread waiting on a particular seq. */
37 struct seq_waiter {
38     struct seq *seq OVS_GUARDED;            /* Seq being waited for. */
39     struct hmap_node hmap_node OVS_GUARDED; /* In 'seq->waiters'. */
40     unsigned int ovsthread_id OVS_GUARDED;  /* Key in 'waiters' hmap. */
41
42     struct seq_thread *thread OVS_GUARDED; /* Thread preparing to wait. */
43     struct list list_node OVS_GUARDED;     /* In 'thread->waiters'. */
44
45     uint64_t value OVS_GUARDED; /* seq->value we're waiting to change. */
46 };
47
48 /* A thread that might be waiting on one or more seqs. */
49 struct seq_thread {
50     struct list waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */
51     struct latch latch OVS_GUARDED;  /* Wakeup latch for this thread. */
52     bool waiting OVS_GUARDED;        /* True if latch_wait() already called. */
53 };
54
55 static struct ovs_mutex seq_mutex = OVS_ADAPTIVE_MUTEX_INITIALIZER;
56
57 static uint64_t seq_next OVS_GUARDED_BY(seq_mutex) = 1;
58
59 static pthread_key_t seq_thread_key;
60
61 static void seq_init(void);
62 static struct seq_thread *seq_thread_get(void) OVS_REQUIRES(seq_mutex);
63 static void seq_thread_exit(void *thread_) OVS_EXCLUDED(seq_mutex);
64 static void seq_thread_woke(struct seq_thread *) OVS_REQUIRES(seq_mutex);
65 static void seq_waiter_destroy(struct seq_waiter *) OVS_REQUIRES(seq_mutex);
66 static void seq_wake_waiters(struct seq *) OVS_REQUIRES(seq_mutex);
67
68 /* Creates and returns a new 'seq' object. */
69 struct seq * OVS_EXCLUDED(seq_mutex)
70 seq_create(void)
71 {
72     struct seq *seq;
73
74     seq_init();
75
76     seq = xmalloc(sizeof *seq);
77     ovs_mutex_lock(&seq_mutex);
78     seq->value = seq_next++;
79     hmap_init(&seq->waiters);
80     ovs_mutex_unlock(&seq_mutex);
81
82     return seq;
83 }
84
85 /* Destroys 'seq', waking up threads that were waiting on it, if any. */
86 void
87 seq_destroy(struct seq *seq)
88      OVS_EXCLUDED(seq_mutex)
89 {
90     ovs_mutex_lock(&seq_mutex);
91     seq_wake_waiters(seq);
92     hmap_destroy(&seq->waiters);
93     free(seq);
94     ovs_mutex_unlock(&seq_mutex);
95 }
96
97 /* Increments 'seq''s sequence number, waking up any threads that are waiting
98  * on 'seq'. */
99 void
100 seq_change(struct seq *seq)
101     OVS_EXCLUDED(seq_mutex)
102 {
103     ovs_mutex_lock(&seq_mutex);
104     seq->value = seq_next++;
105     seq_wake_waiters(seq);
106     ovs_mutex_unlock(&seq_mutex);
107 }
108
109 /* Returns 'seq''s current sequence number (which could change immediately). */
110 uint64_t
111 seq_read(const struct seq *seq)
112     OVS_EXCLUDED(seq_mutex)
113 {
114     uint64_t value;
115
116     ovs_mutex_lock(&seq_mutex);
117     value = seq->value;
118     ovs_mutex_unlock(&seq_mutex);
119
120     return value;
121 }
122
123 static void
124 seq_wait__(struct seq *seq, uint64_t value)
125     OVS_REQUIRES(seq_mutex)
126 {
127     unsigned int id = ovsthread_id_self();
128     uint32_t hash = hash_int(id, 0);
129     struct seq_waiter *waiter;
130
131     HMAP_FOR_EACH_IN_BUCKET (waiter, hmap_node, hash, &seq->waiters) {
132         if (waiter->ovsthread_id == id) {
133             if (waiter->value != value) {
134                 /* The current value is different from the value we've already
135                  * waited for, */
136                 poll_immediate_wake();
137             } else {
138                 /* Already waiting on 'value', nothing more to do. */
139             }
140             return;
141         }
142     }
143
144     waiter = xmalloc(sizeof *waiter);
145     waiter->seq = seq;
146     hmap_insert(&seq->waiters, &waiter->hmap_node, hash);
147     waiter->value = value;
148     waiter->thread = seq_thread_get();
149     list_push_back(&waiter->thread->waiters, &waiter->list_node);
150
151     if (!waiter->thread->waiting) {
152         latch_wait(&waiter->thread->latch);
153         waiter->thread->waiting = true;
154     }
155 }
156
157 /* Causes the following poll_block() to wake up when 'seq''s sequence number
158  * changes from 'value'.  (If 'seq''s sequence number isn't 'value', then
159  * poll_block() won't block at all.) */
160 void
161 seq_wait(const struct seq *seq_, uint64_t value)
162     OVS_EXCLUDED(seq_mutex)
163 {
164     struct seq *seq = CONST_CAST(struct seq *, seq_);
165
166     ovs_mutex_lock(&seq_mutex);
167     if (value == seq->value) {
168         seq_wait__(seq, value);
169     } else {
170         poll_immediate_wake();
171     }
172     ovs_mutex_unlock(&seq_mutex);
173 }
174
175 /* Called by poll_block() just before it returns, this function destroys any
176  * seq_waiter objects associated with the current thread. */
177 void
178 seq_woke(void)
179     OVS_EXCLUDED(seq_mutex)
180 {
181     struct seq_thread *thread;
182
183     seq_init();
184
185     thread = pthread_getspecific(seq_thread_key);
186     if (thread) {
187         ovs_mutex_lock(&seq_mutex);
188         seq_thread_woke(thread);
189         thread->waiting = false;
190         ovs_mutex_unlock(&seq_mutex);
191     }
192 }
193 \f
194 static void
195 seq_init(void)
196 {
197     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
198
199     if (ovsthread_once_start(&once)) {
200         xpthread_key_create(&seq_thread_key, seq_thread_exit);
201         ovsthread_once_done(&once);
202     }
203 }
204
205 static struct seq_thread *
206 seq_thread_get(void)
207     OVS_REQUIRES(seq_mutex)
208 {
209     struct seq_thread *thread = pthread_getspecific(seq_thread_key);
210     if (!thread) {
211         thread = xmalloc(sizeof *thread);
212         list_init(&thread->waiters);
213         latch_init(&thread->latch);
214         thread->waiting = false;
215
216         xpthread_setspecific(seq_thread_key, thread);
217     }
218     return thread;
219 }
220
221 static void
222 seq_thread_exit(void *thread_)
223     OVS_EXCLUDED(seq_mutex)
224 {
225     struct seq_thread *thread = thread_;
226
227     ovs_mutex_lock(&seq_mutex);
228     seq_thread_woke(thread);
229     latch_destroy(&thread->latch);
230     free(thread);
231     ovs_mutex_unlock(&seq_mutex);
232 }
233
234 static void
235 seq_thread_woke(struct seq_thread *thread)
236     OVS_REQUIRES(seq_mutex)
237 {
238     struct seq_waiter *waiter, *next_waiter;
239
240     LIST_FOR_EACH_SAFE (waiter, next_waiter, list_node, &thread->waiters) {
241         ovs_assert(waiter->thread == thread);
242         seq_waiter_destroy(waiter);
243     }
244     latch_poll(&thread->latch);
245 }
246
247 static void
248 seq_waiter_destroy(struct seq_waiter *waiter)
249     OVS_REQUIRES(seq_mutex)
250 {
251     hmap_remove(&waiter->seq->waiters, &waiter->hmap_node);
252     list_remove(&waiter->list_node);
253     free(waiter);
254 }
255
256 static void
257 seq_wake_waiters(struct seq *seq)
258     OVS_REQUIRES(seq_mutex)
259 {
260     struct seq_waiter *waiter, *next_waiter;
261
262     HMAP_FOR_EACH_SAFE (waiter, next_waiter, hmap_node, &seq->waiters) {
263         latch_set(&waiter->thread->latch);
264         seq_waiter_destroy(waiter);
265     }
266 }