2 * RelayFS lockless scheme implementation.
4 * Copyright (C) 1999, 2000, 2001, 2002 - Karim Yaghmour (karim@opersys.com)
5 * Copyright (C) 2002, 2003 - Tom Zanussi (zanussi@us.ibm.com), IBM Corp
6 * Copyright (C) 2002, 2003 - Bob Wisniewski (bob@watson.ibm.com), IBM Corp
8 * This file is released under the GPL.
11 #include <asm/relay.h>
12 #include "relay_lockless.h"
16 * compare_and_store_volatile - atomically compare *ptr to oval, store nval if equal
17 * @ptr: ptr to the word that will receive the new value
18 * @oval: the value we think is currently in *ptr
19 * @nval: the value *ptr will get if we were right
/* NOTE(review): lines are missing from this chunk (return type, remaining
 * parameters, locals and braces are not visible); code kept verbatim. */
22 compare_and_store_volatile(volatile u32 *ptr,
/* Atomic compare-and-exchange; prev receives the value actually found in *ptr. */
29 prev = cmpxchg(ptr, oval, nval);
/* Non-zero (true) iff *ptr held oval and the store of nval took place. */
32 return (prev == oval);
36 * atomic_set_volatile - atomically set the value in ptr to nval.
37 * @ptr: ptr to the word that will receive the new value
38 * @nval: the new value
/* NOTE(review): return type, remaining parameters and braces are not
 * visible in this chunk; code kept verbatim. */
41 atomic_set_volatile(atomic_t *ptr,
/* Delegate to the architecture's atomic_set(); nval is truncated to int. */
45 atomic_set(ptr, (int)nval);
50 * atomic_add_volatile - atomically add val to the value at ptr.
51 * @ptr: ptr to the word that will receive the addition
52 * @val: the value to add to *ptr
/* NOTE(review): return type and braces are not visible in this chunk. */
55 atomic_add_volatile(atomic_t *ptr, u32 val)
/* Delegate to the architecture's atomic_add(); val is cast to int. */
58 atomic_add((int)val, ptr);
63 * atomic_sub_volatile - atomically subtract val from the value at ptr.
64 * @ptr: ptr to the word that will receive the subtraction
65 * @val: the value to subtract from *ptr
/* NOTE(review): return type and braces are not visible in this chunk. */
68 atomic_sub_volatile(atomic_t *ptr, s32 val)
/* Delegate to the architecture's atomic_sub(); val is cast to int. */
71 atomic_sub((int)val, ptr);
76 * lockless_commit - commit a reserved slot in the buffer
78 * @from: commit the length starting here
79 * @len: length committed
80 * @deliver: presumably a flag requesting delivery of the sub-buffer --
 *           the full parameter list is not visible in this chunk; verify
 *           against the header (original kernel-doc repeated "length
 *           committed" here, which was a copy-paste error)
81 * @interrupting: not used
83 * Commits len bytes and calls deliver callback if applicable.
86 lockless_commit(struct rchan *rchan,
/* Byte offset of the committed slot within the channel's buffer array. */
94 idx = from - rchan->buf;
/* Which sub-buffer the commit falls in; account the committed bytes there. */
97 bufno = RELAY_BUFNO_GET(idx, offset_bits(rchan));
98 atomic_add_volatile(&fill_count(rchan, bufno), len);
/* NOTE(review): the condition guarding this delivery branch is missing
 * from this chunk. */
102 u32 mask = offset_mask(rchan);
/* Bulk delivery hands the client the whole sub-buffer from its start,
 * so extend the range back to the sub-buffer's first byte. */
103 if (bulk_delivery(rchan)) {
104 from = rchan->buf + RELAY_BUF_OFFSET_CLEAR(idx, mask);
105 len += RELAY_BUF_OFFSET_GET(idx, mask);
107 rchan->callbacks->deliver(rchan->id, from, len);
113 * get_buffer_end - get the address of the end of buffer
114 * @rchan: the channel
115 * @buf_idx: index into channel corresponding to address
 *
 * Returns one past the last byte of the sub-buffer containing buf_idx.
118 get_buffer_end(struct rchan *rchan, u32 buf_idx)
/* NOTE(review): the start of this return expression (presumably
 * 'return rchan->buf') is missing from this chunk; kept verbatim. */
121 + RELAY_BUF_OFFSET_CLEAR(buf_idx, offset_mask(rchan))
122 + RELAY_BUF_SIZE(offset_bits(rchan));
127 * finalize_buffer - utility function consolidating end-of-buffer tasks.
128 * @rchan: the channel
129 * @end_idx: index into buffer to write the end-buffer event at
130 * @size_lost: number of unused bytes at the end of the buffer
131 * @time_stamp: the time of the end-buffer event
132 * @tsc: the timestamp counter associated with time
133 * @resetting: are we resetting the channel?
135 * This function must be called with local irqs disabled.
138 finalize_buffer(struct rchan *rchan,
141 struct timeval *time_stamp,
/* Position of the end-buffer event and the end address of its sub-buffer. */
150 cur_write_pos = rchan->buf + end_idx;
151 write_buf_end = get_buffer_end(rchan, end_idx - 1);
/* Let the client log an end-of-buffer event at the current write position. */
153 bytes_written = rchan->callbacks->buffer_end(rchan->id, cur_write_pos,
154 write_buf_end, *time_stamp, *tsc, using_tsc(rchan));
/* If the client wrote nothing, record the tail bytes as unused so
 * readers can skip them. */
155 if (bytes_written == 0)
156 rchan->unused_bytes[rchan->buf_idx % rchan->n_bufs] = size_lost;
/* Account the lost tail in the finished sub-buffer's fill count. */
158 bufno = RELAY_BUFNO_GET(end_idx, offset_bits(rchan));
159 atomic_add_volatile(&fill_count(rchan, bufno), size_lost);
/* NOTE(review): a conditional (presumably the 'resetting' case) guarding
 * the block below is missing from this chunk. */
/* Round bufs_produced up to a multiple of n_bufs and mark everything
 * consumed, zeroing the byte count and notifying readers. */
161 rchan->bufs_produced = rchan->bufs_produced + rchan->n_bufs;
162 rchan->bufs_produced -= rchan->bufs_produced % rchan->n_bufs;
163 rchan->bufs_consumed = rchan->bufs_produced;
164 rchan->bytes_consumed = 0;
165 update_readers_consumed(rchan, rchan->bufs_consumed, rchan->bytes_consumed);
/* One more sub-buffer has now been fully produced. */
167 rchan->bufs_produced++;
171 * lockless_finalize: - finalize last buffer at end of channel use
172 * @rchan: the channel
175 lockless_finalize(struct rchan *rchan)
179 unsigned long int flags;
/* Bytes left unused between the last event and the sub-buffer's end. */
183 event_end_idx = RELAY_BUF_OFFSET_GET(idx(rchan), offset_mask(rchan));
184 size_lost = RELAY_BUF_SIZE(offset_bits(rchan)) - event_end_idx;
/* finalize_buffer() requires local irqs disabled. */
186 local_irq_save(flags);
187 get_timestamp(&time, &tsc, rchan);
188 finalize_buffer(rchan, idx(rchan) & idx_mask(rchan), size_lost,
190 local_irq_restore(flags);
194 * discard_check: - determine whether a write should be discarded
195 * @rchan: the channel
196 * @old_idx: index into buffer where check for space should begin
197 * @write_len: the length of the write to check
198 * @time_stamp: the time of the end-buffer event
199 * @tsc: the timestamp counter associated with time
201 * The return value contains the result flags and is an ORed combination
204 * RELAY_WRITE_DISCARD_NONE - write should not be discarded
205 * RELAY_BUFFER_SWITCH - buffer switch occurred
206 * RELAY_WRITE_DISCARD - write should be discarded (all buffers are full)
207 * RELAY_WRITE_TOO_LONG - write won't fit into even an empty buffer
210 discard_check(struct rchan *rchan,
213 struct timeval *time_stamp,
217 u32 offset_mask = offset_mask(rchan);
218 u8 offset_bits = offset_bits(rchan);
219 u32 idx_mask = idx_mask(rchan);
221 unsigned long int flags;
/* A write longer than a whole sub-buffer can never fit anywhere. */
223 if (write_len > RELAY_BUF_SIZE(offset_bits))
224 return RELAY_WRITE_DISCARD | RELAY_WRITE_TOO_LONG;
/* Continuous (overwrite) mode never discards. */
226 if (mode_continuous(rchan))
227 return RELAY_WRITE_DISCARD_NONE;
/* The checks below read and modify channel state; irqs off around them. */
229 local_irq_save(flags);
/* Channel already suspended: discard outright. */
230 if (atomic_read(&rchan->suspended) == 1) {
231 local_irq_restore(flags);
232 return RELAY_WRITE_DISCARD;
/* NOTE(review): the closing brace of the branch above is missing from
 * this chunk. */
/* A half-switch is already pending; let the write proceed. */
234 if (rchan->half_switch) {
235 local_irq_restore(flags);
236 return RELAY_WRITE_DISCARD_NONE;
/* If only one free sub-buffer remains, suspend the channel and finalize
 * the current sub-buffer, losing its unwritten tail. */
238 buffers_ready = rchan->bufs_produced - rchan->bufs_consumed;
239 if (buffers_ready == rchan->n_bufs - 1) {
240 atomic_set(&rchan->suspended, 1);
241 size_lost = RELAY_BUF_SIZE(offset_bits)
242 - RELAY_BUF_OFFSET_GET(old_idx, offset_mask);
243 finalize_buffer(rchan, old_idx & idx_mask, size_lost,
245 rchan->half_switch = 1;
/* Park the index at the last byte of the current sub-buffer. */
246 idx(rchan) = RELAY_BUF_OFFSET_CLEAR((old_idx & idx_mask), offset_mask(rchan)) + RELAY_BUF_SIZE(offset_bits) - 1;
247 local_irq_restore(flags);
249 return RELAY_BUFFER_SWITCH | RELAY_WRITE_DISCARD;
251 local_irq_restore(flags);
253 return RELAY_WRITE_DISCARD_NONE;
257 * switch_buffers - switch over to a new sub-buffer
258 * @rchan: the channel
259 * @slot_len: the length of the slot needed for the current write
260 * @offset: the offset calculated for the new index
262 * @tsc: the timestamp counter associated with time
263 * @old_idx: the value of the buffer control index when we were called
264 * @new_idx: the new calculated value of the buffer control index
265 * @resetting: are we resetting the channel?
268 switch_buffers(struct rchan *rchan,
277 u32 size_lost = rchan->end_reserve;
278 unsigned long int flags;
279 u32 idx_mask = idx_mask(rchan);
280 u8 offset_bits = offset_bits(rchan);
283 u32 start_reserve = rchan->start_reserve;
/* NOTE(review): the conditions guarding the two size_lost adjustments
 * below are missing from this chunk; code kept verbatim. */
286 size_lost = RELAY_BUF_SIZE(offset_bits(rchan)) - old_idx % rchan->buf_size;
289 size_lost += slot_len - offset;
/* Finalize the old sub-buffer (unless a half-switch already did) and
 * stamp the start time of the new one, with irqs off. */
293 local_irq_save(flags);
294 if (!rchan->half_switch)
295 finalize_buffer(rchan, old_idx & idx_mask, size_lost,
297 rchan->half_switch = 0;
298 rchan->buf_start_time = *ts;
299 rchan->buf_start_tsc = *tsc;
300 local_irq_restore(flags);
/* Address of the first byte of the new sub-buffer. */
302 cur_write_pos = rchan->buf + RELAY_BUF_OFFSET_CLEAR((new_idx
303 & idx_mask), offset_mask(rchan));
310 rchan->unused_bytes[rchan->buf_idx % rchan->n_bufs] = 0;
/* Tell the client a new sub-buffer has started. */
312 rchan->callbacks->buffer_start(rchan->id, cur_write_pos,
313 rchan->buf_id, *ts, *tsc, using_tsc(rchan));
/* Reduce the new sub-buffer's fill count to just the start reserve. */
314 new_buf_no = RELAY_BUFNO_GET(new_idx & idx_mask, offset_bits);
315 atomic_sub_volatile(&fill_count(rchan, new_buf_no),
316 RELAY_BUF_SIZE(offset_bits) - start_reserve);
/* Clamp if concurrent activity drove the count below the reserve.
 * NOTE(review): the value stored here is truncated in this chunk. */
317 if (atomic_read(&fill_count(rchan, new_buf_no)) < start_reserve)
318 atomic_set_volatile(&fill_count(rchan, new_buf_no),
323 * lockless_reserve_slow - the slow reserve path in the lockless scheme
324 * @rchan: the channel
325 * @slot_len: the length of the slot to reserve
326 * @ts: variable that will receive the time the slot was reserved
327 * @tsc: the timestamp counter associated with time
328 * @old_idx: the value of the buffer control index when we were called
329 * @err: receives the result flags
331 * Returns pointer to the beginning of the reserved slot, NULL if error.
333 * err values same as for lockless_reserve.
336 lockless_reserve_slow(struct rchan *rchan,
344 unsigned long int flags;
345 u32 offset_mask = offset_mask(rchan);
346 u32 idx_mask = idx_mask(rchan);
347 u32 start_reserve = rchan->start_reserve;
348 u32 end_reserve = rchan->end_reserve;
352 int initializing = 0;
354 *err = RELAY_BUFFER_SWITCH_NONE;
/* Bail out early if the write must be discarded (too long, suspended,
 * or all buffers full). */
356 discard_event = discard_check(rchan, old_idx, slot_len, ts, tsc);
357 if (discard_event != RELAY_WRITE_DISCARD_NONE) {
358 *err = discard_event;
/* NOTE(review): the error return and closing brace of this branch are
 * missing from this chunk. */
/* First use of the channel: seed the index past the reserved space. */
362 local_irq_save(flags);
363 if (rchan->initialized == 0) {
364 rchan->initialized = initializing = 1;
365 idx(rchan) = rchan->start_reserve + rchan->rchan_start_reserve;
367 local_irq_restore(flags);
/* CAS loop: recompute the new index until we install it atomically.
 * NOTE(review): the 'do {' opening this loop is missing from this chunk. */
370 old_idx = idx(rchan);
371 new_idx = old_idx + slot_len;
/* Offset of the slot end (plus end reserve) within its sub-buffer. */
373 offset = RELAY_BUF_OFFSET_GET(new_idx + end_reserve,
/* Slot straddles a sub-buffer boundary: place it at the start of the
 * next sub-buffer, after the start reserve. */
375 if ((offset < slot_len) && (offset > 0)) {
376 reserved_idx = RELAY_BUF_OFFSET_CLEAR(new_idx
377 + end_reserve, offset_mask) + start_reserve;
378 new_idx = reserved_idx + slot_len;
/* Slot ends exactly at the boundary: keep it here, bump the index into
 * the next sub-buffer. */
379 } else if (offset < slot_len) {
380 reserved_idx = old_idx;
381 new_idx = RELAY_BUF_OFFSET_CLEAR(new_idx
382 + end_reserve, offset_mask) + start_reserve;
/* Common case: the slot fits in the current sub-buffer. */
384 reserved_idx = old_idx;
385 get_timestamp(ts, tsc, rchan);
386 } while (!compare_and_store_volatile(&idx(rchan), old_idx, new_idx));
/* Convert the logical index into a buffer-array offset. */
388 reserved_idx &= idx_mask;
/* On the channel's first-ever reserve, emit buffer_start ourselves. */
390 if (initializing == 1) {
391 cur_write_pos = rchan->buf
392 + RELAY_BUF_OFFSET_CLEAR((old_idx & idx_mask),
394 rchan->buf_start_time = *ts;
395 rchan->buf_start_tsc = *tsc;
396 rchan->unused_bytes[0] = 0;
398 rchan->callbacks->buffer_start(rchan->id, cur_write_pos,
399 rchan->buf_id, *ts, *tsc, using_tsc(rchan));
/* We crossed into a new sub-buffer: perform the switch bookkeeping. */
402 if (offset < slot_len) {
403 switch_buffers(rchan, slot_len, offset, ts, tsc, new_idx,
405 *err = RELAY_BUFFER_SWITCH;
408 /* If not using TSC, need to calc time delta */
409 recalc_time_delta(ts, tsc, rchan);
411 return rchan->buf + reserved_idx;
415 * lockless_reserve - reserve a slot in the buffer for an event.
416 * @rchan: the channel
417 * @slot_len: the length of the slot to reserve
418 * @ts: variable that will receive the time the slot was reserved
419 * @tsc: the timestamp counter associated with time
420 * @err: receives the result flags
421 * @interrupting: not used
423 * Returns pointer to the beginning of the reserved slot, NULL if error.
425 * The err value contains the result flags and is an ORed combination
428 * RELAY_BUFFER_SWITCH_NONE - no buffer switch occurred
429 * RELAY_EVENT_DISCARD_NONE - event should not be discarded
430 * RELAY_BUFFER_SWITCH - buffer switch occurred
431 * RELAY_EVENT_DISCARD - event should be discarded (all buffers are full)
432 * RELAY_EVENT_TOO_LONG - event won't fit into even an empty buffer
435 lockless_reserve(struct rchan *rchan,
442 u32 old_idx, new_idx, offset;
443 u32 offset_mask = offset_mask(rchan);
/* Fast-path CAS loop.
 * NOTE(review): the 'do {' opening this loop is missing from this chunk. */
446 old_idx = idx(rchan);
447 new_idx = old_idx + slot_len;
/* Where the slot (plus end reserve) would end within its sub-buffer. */
449 offset = RELAY_BUF_OFFSET_GET(new_idx + rchan->end_reserve,
/* Crossing a sub-buffer boundary: take the slow path. */
451 if (offset < slot_len)
452 return lockless_reserve_slow(rchan, slot_len,
453 ts, tsc, old_idx, err);
454 get_time_or_tsc(ts, tsc, rchan);
455 } while (!compare_and_store_volatile(&idx(rchan), old_idx, new_idx));
457 /* If not using TSC, need to calc time delta */
458 recalc_time_delta(ts, tsc, rchan);
460 *err = RELAY_BUFFER_SWITCH_NONE;
/* The reserved slot starts at the pre-increment index, masked into the
 * buffer array. */
462 return rchan->buf + (old_idx & idx_mask(rchan));
466 * lockless_get_offset - get current and max channel offsets
467 * @rchan: the channel
468 * @max_offset: maximum channel offset
470 * Returns the current and maximum channel offsets.
473 lockless_get_offset(struct rchan *rchan,
/* Highest legal offset: last byte of the last sub-buffer. */
477 *max_offset = rchan->buf_size * rchan->n_bufs - 1;
/* An uninitialized channel reports offset 0. */
479 return rchan->initialized ? idx(rchan) & idx_mask(rchan) : 0;
483 * lockless_reset - reset the channel
484 * @rchan: the channel
485 * @init: 1 if this is a first-time channel initialization
487 void lockless_reset(struct rchan *rchan, int init)
491 /* Start first buffer at 0 - (end_reserve + 1) so that it
492 gets initialized via buffer_start callback as well. */
493 idx(rchan) = 0UL - (rchan->end_reserve + 1);
/* NOTE(review): the lvalue of this mask assignment (presumably the
 * channel's index mask) is missing from this chunk; kept verbatim. */
495 (1UL << (bufno_bits(rchan) + offset_bits(rchan))) - 1;
/* Sub-buffer 0 starts with only the reserved header space filled. */
496 atomic_set(&fill_count(rchan, 0),
497 (int)rchan->start_reserve +
498 (int)rchan->rchan_start_reserve);
/* All other sub-buffers are marked completely full until switched to. */
499 for (i = 1; i < rchan->n_bufs; i++)
500 atomic_set(&fill_count(rchan, i),
501 (int)RELAY_BUF_SIZE(offset_bits(rchan)));
505 * lockless_reset_index - atomically set channel index to the beginning
506 * @rchan: the channel
507 * @old_idx: the current index
509 * If this fails, it means that something else just logged something
510 * and therefore we probably no longer want to do this. It's up to the
513 * Returns 0 if the index was successfully set, negative otherwise
516 lockless_reset_index(struct rchan *rchan, u32 old_idx)
522 if (compare_and_store_volatile(&idx(rchan), old_idx, 0)) {
523 new_idx = rchan->start_reserve;
524 switch_buffers(rchan, 0, 0, &ts, &tsc, new_idx, old_idx, 1);