2 * Copyright (c) 2013 Nicira, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include "ovs-thread.h"
25 #include "poll-loop.h"
26 #include "socket-util.h"
30 /* Omit the definitions in this file because they are somewhat difficult to
31 * write without prompting "sparse" complaints, without ugliness or
32 * cut-and-paste. Since "sparse" is just a checker, not a compiler, it
33 * doesn't matter that we don't define them. */
37 VLOG_DEFINE_THIS_MODULE(ovs_thread);
39 /* If there is a reason that we cannot fork anymore (unless the fork will be
40 * immediately followed by an exec), then this points to a string that
42 static const char *must_not_fork;
44 /* True if we created any threads beyond the main initial thread. */
45 static bool multithreaded;
47 #define LOCK_FUNCTION(TYPE, FUN) \
49 ovs_##TYPE##_##FUN##_at(const struct ovs_##TYPE *l_, \
51 OVS_NO_THREAD_SAFETY_ANALYSIS \
53 struct ovs_##TYPE *l = CONST_CAST(struct ovs_##TYPE *, l_); \
54 int error = pthread_##TYPE##_##FUN(&l->lock); \
55 if (OVS_UNLIKELY(error)) { \
56 ovs_abort(error, "pthread_%s_%s failed", #TYPE, #FUN); \
60 LOCK_FUNCTION(mutex, lock);
61 LOCK_FUNCTION(rwlock, rdlock);
62 LOCK_FUNCTION(rwlock, wrlock);
64 #define TRY_LOCK_FUNCTION(TYPE, FUN) \
66 ovs_##TYPE##_##FUN##_at(const struct ovs_##TYPE *l_, \
68 OVS_NO_THREAD_SAFETY_ANALYSIS \
70 struct ovs_##TYPE *l = CONST_CAST(struct ovs_##TYPE *, l_); \
71 int error = pthread_##TYPE##_##FUN(&l->lock); \
72 if (OVS_UNLIKELY(error) && error != EBUSY) { \
73 ovs_abort(error, "pthread_%s_%s failed", #TYPE, #FUN); \
80 TRY_LOCK_FUNCTION(mutex, trylock);
81 TRY_LOCK_FUNCTION(rwlock, tryrdlock);
82 TRY_LOCK_FUNCTION(rwlock, trywrlock);
84 #define UNLOCK_FUNCTION(TYPE, FUN) \
86 ovs_##TYPE##_##FUN(const struct ovs_##TYPE *l_) \
87 OVS_NO_THREAD_SAFETY_ANALYSIS \
89 struct ovs_##TYPE *l = CONST_CAST(struct ovs_##TYPE *, l_); \
92 error = pthread_##TYPE##_##FUN(&l->lock); \
93 if (OVS_UNLIKELY(error)) { \
94 ovs_abort(error, "pthread_%s_%sfailed", #TYPE, #FUN); \
97 UNLOCK_FUNCTION(mutex, unlock);
98 UNLOCK_FUNCTION(mutex, destroy);
99 UNLOCK_FUNCTION(rwlock, unlock);
100 UNLOCK_FUNCTION(rwlock, destroy);
102 #define XPTHREAD_FUNC1(FUNCTION, PARAM1) \
104 x##FUNCTION(PARAM1 arg1) \
106 int error = FUNCTION(arg1); \
107 if (OVS_UNLIKELY(error)) { \
108 ovs_abort(error, "%s failed", #FUNCTION); \
111 #define XPTHREAD_FUNC2(FUNCTION, PARAM1, PARAM2) \
113 x##FUNCTION(PARAM1 arg1, PARAM2 arg2) \
115 int error = FUNCTION(arg1, arg2); \
116 if (OVS_UNLIKELY(error)) { \
117 ovs_abort(error, "%s failed", #FUNCTION); \
121 XPTHREAD_FUNC1(pthread_mutex_lock, pthread_mutex_t *);
122 XPTHREAD_FUNC1(pthread_mutex_unlock, pthread_mutex_t *);
123 XPTHREAD_FUNC1(pthread_mutexattr_init, pthread_mutexattr_t *);
124 XPTHREAD_FUNC1(pthread_mutexattr_destroy, pthread_mutexattr_t *);
125 XPTHREAD_FUNC2(pthread_mutexattr_settype, pthread_mutexattr_t *, int);
126 XPTHREAD_FUNC2(pthread_mutexattr_gettype, pthread_mutexattr_t *, int *);
128 XPTHREAD_FUNC2(pthread_cond_init, pthread_cond_t *, pthread_condattr_t *);
129 XPTHREAD_FUNC1(pthread_cond_destroy, pthread_cond_t *);
130 XPTHREAD_FUNC1(pthread_cond_signal, pthread_cond_t *);
131 XPTHREAD_FUNC1(pthread_cond_broadcast, pthread_cond_t *);
133 XPTHREAD_FUNC2(pthread_join, pthread_t, void **);
135 typedef void destructor_func(void *);
136 XPTHREAD_FUNC2(pthread_key_create, pthread_key_t *, destructor_func *);
137 XPTHREAD_FUNC2(pthread_setspecific, pthread_key_t, const void *);
140 ovs_mutex_init__(const struct ovs_mutex *l_, int type)
142 struct ovs_mutex *l = CONST_CAST(struct ovs_mutex *, l_);
143 pthread_mutexattr_t attr;
147 xpthread_mutexattr_init(&attr);
148 xpthread_mutexattr_settype(&attr, type);
149 error = pthread_mutex_init(&l->lock, &attr);
150 if (OVS_UNLIKELY(error)) {
151 ovs_abort(error, "pthread_mutex_init failed");
153 xpthread_mutexattr_destroy(&attr);
156 /* Initializes 'mutex' as a normal (non-recursive) mutex. */
158 ovs_mutex_init(const struct ovs_mutex *mutex)
160 ovs_mutex_init__(mutex, PTHREAD_MUTEX_ERRORCHECK);
163 /* Initializes 'mutex' as a recursive mutex. */
165 ovs_mutex_init_recursive(const struct ovs_mutex *mutex)
167 ovs_mutex_init__(mutex, PTHREAD_MUTEX_RECURSIVE);
171 ovs_rwlock_init(const struct ovs_rwlock *l_)
173 struct ovs_rwlock *l = CONST_CAST(struct ovs_rwlock *, l_);
177 error = pthread_rwlock_init(&l->lock, NULL);
178 if (OVS_UNLIKELY(error)) {
179 ovs_abort(error, "pthread_rwlock_init failed");
184 ovs_mutex_cond_wait(pthread_cond_t *cond, const struct ovs_mutex *mutex_)
186 struct ovs_mutex *mutex = CONST_CAST(struct ovs_mutex *, mutex_);
187 int error = pthread_cond_wait(cond, &mutex->lock);
188 if (OVS_UNLIKELY(error)) {
189 ovs_abort(error, "pthread_cond_wait failed");
193 DEFINE_EXTERN_PER_THREAD_DATA(ovsthread_id, 0);
195 struct ovsthread_aux {
196 void *(*start)(void *);
201 ovsthread_wrapper(void *aux_)
203 static atomic_uint next_id = ATOMIC_VAR_INIT(1);
205 struct ovsthread_aux *auxp = aux_;
206 struct ovsthread_aux aux;
209 atomic_add(&next_id, 1, &id);
210 *ovsthread_id_get() = id;
215 return aux.start(aux.arg);
219 xpthread_create(pthread_t *threadp, pthread_attr_t *attr,
220 void *(*start)(void *), void *arg)
222 struct ovsthread_aux *aux;
226 forbid_forking("multiple threads exist");
227 multithreaded = true;
229 aux = xmalloc(sizeof *aux);
233 error = pthread_create(threadp ? threadp : &thread, attr,
234 ovsthread_wrapper, aux);
236 ovs_abort(error, "pthread_create failed");
241 ovsthread_once_start__(struct ovsthread_once *once)
243 ovs_mutex_lock(&once->mutex);
244 if (!ovsthread_once_is_done__(once)) {
247 ovs_mutex_unlock(&once->mutex);
252 ovsthread_once_done(struct ovsthread_once *once)
254 atomic_store(&once->done, true);
255 ovs_mutex_unlock(&once->mutex);
258 /* Asserts that the process has not yet created any threads (beyond the initial
261 * ('where' is used in logging. Commonly one would use
262 * assert_single_threaded() to automatically provide the caller's source file
263 * and line number for 'where'.) */
265 assert_single_threaded_at(const char *where)
268 VLOG_FATAL("%s: attempted operation not allowed when multithreaded",
273 /* Forks the current process (checking that this is allowed). Aborts with
274 * VLOG_FATAL if fork() returns an error, and otherwise returns the value
275 * returned by fork().
277 * ('where' is used in logging. Commonly one would use xfork() to
278 * automatically provide the caller's source file and line number for
281 xfork_at(const char *where)
286 VLOG_FATAL("%s: attempted to fork but forking not allowed (%s)",
287 where, must_not_fork);
292 VLOG_FATAL("%s: fork failed (%s)", where, ovs_strerror(errno));
297 /* Notes that the process must not call fork() from now on, for the specified
298 * 'reason'. (The process may still fork() if it execs itself immediately
301 forbid_forking(const char *reason)
303 ovs_assert(reason != NULL);
304 must_not_fork = reason;
307 /* Returns true if the process is allowed to fork, false otherwise. */
311 return !must_not_fork;
314 /* ovsthread_counter.
316 * We implement the counter as an array of N_COUNTERS individual counters, each
317 * with its own lock. Each thread uses one of the counters chosen based on a
318 * hash of the thread's ID, the idea being that, statistically, different
319 * threads will tend to use different counters and therefore avoid
320 * interfering with each other.
322 * Undoubtedly, better implementations are possible. */
324 /* Basic counter structure. */
325 struct ovsthread_counter__ {
326 struct ovs_mutex mutex;
327 unsigned long long int value;
330 /* Pad the basic counter structure to 64 bytes to avoid cache line
332 struct ovsthread_counter {
333 struct ovsthread_counter__ c;
334 char pad[ROUND_UP(sizeof(struct ovsthread_counter__), 64)
335 - sizeof(struct ovsthread_counter__)];
338 #define N_COUNTERS 16
340 struct ovsthread_counter *
341 ovsthread_counter_create(void)
343 struct ovsthread_counter *c;
346 c = xmalloc(N_COUNTERS * sizeof *c);
347 for (i = 0; i < N_COUNTERS; i++) {
348 ovs_mutex_init(&c[i].c.mutex);
355 ovsthread_counter_destroy(struct ovsthread_counter *c)
360 for (i = 0; i < N_COUNTERS; i++) {
361 ovs_mutex_destroy(&c[i].c.mutex);
368 ovsthread_counter_inc(struct ovsthread_counter *c, unsigned long long int n)
370 c = &c[hash_int(ovsthread_id_self(), 0) % N_COUNTERS];
372 ovs_mutex_lock(&c->c.mutex);
374 ovs_mutex_unlock(&c->c.mutex);
377 unsigned long long int
378 ovsthread_counter_read(const struct ovsthread_counter *c)
380 unsigned long long int sum;
384 for (i = 0; i < N_COUNTERS; i++) {
385 ovs_mutex_lock(&c[i].c.mutex);
387 ovs_mutex_unlock(&c[i].c.mutex);
392 /* Parses /proc/cpuinfo for the total number of physical cores on this system
393 * across all CPU packages, not counting hyper-threads.
395 * Sets *n_cores to the total number of cores on this system, or 0 if the
396 * number cannot be determined. */
398 parse_cpuinfo(long int *n_cores)
400 static const char file_name[] = "/proc/cpuinfo";
402 uint64_t cpu = 0; /* Support up to 64 CPU packages on a single system. */
406 stream = fopen(file_name, "r");
408 VLOG_DBG("%s: open failed (%s)", file_name, ovs_strerror(errno));
412 while (fgets(line, sizeof line, stream)) {
415 /* Find the next CPU package. */
416 if (ovs_scan(line, "physical id%*[^:]: %u", &id)) {
418 VLOG_WARN("Counted over 64 CPU packages on this system. "
419 "Parsing %s for core count may be inaccurate.",
/* 'cpu' is a 64-bit bitmap of package ids, so the constant being shifted
 * must itself be 64 bits wide: shifting a plain int by 31 or more bits is
 * undefined behavior and could never test the upper half of the bitmap.
 * NOTE(review): any statement that sets bits in 'cpu' should use the same
 * 64-bit constant. */
425 if (cpu & ((uint64_t) 1 << id)) {
426 /* We've already counted this package's cores. */
431 /* Find the number of cores for this package. */
432 while (fgets(line, sizeof line, stream)) {
435 if (ovs_scan(line, "cpu cores%*[^:]: %u", &count)) {
447 /* Returns the total number of cores on this system, or 0 if the number cannot
450 * Tries not to count hyper-threads, but may be inaccurate - particularly on
451 * platforms that do not provide /proc/cpuinfo, but also if /proc/cpuinfo is
452 * formatted differently from the layout that parse_cpuinfo() expects. */
454 count_cpu_cores(void)
456 static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
457 static long int n_cores;
459 if (ovsthread_once_start(&once)) {
460 parse_cpuinfo(&n_cores);
462 n_cores = sysconf(_SC_NPROCESSORS_ONLN);
464 ovsthread_once_done(&once);
467 return n_cores > 0 ? n_cores : 0;