This commit was manufactured by cvs2svn to create branch 'vserver'.
[linux-2.6.git] / fs / relayfs / relay.c
1 /*
2  * Public API and common code for RelayFS.
3  *
4  * Please see Documentation/filesystems/relayfs.txt for API description.
5  * 
6  * Copyright (C) 2002, 2003 - Tom Zanussi (zanussi@us.ibm.com), IBM Corp
7  * Copyright (C) 1999, 2000, 2001, 2002 - Karim Yaghmour (karim@opersys.com)
8  *
9  * This file is released under the GPL.
10  */
11
12 #include <linux/init.h>
13 #include <linux/errno.h>
14 #include <linux/stddef.h>
15 #include <linux/slab.h>
16 #include <linux/module.h>
17 #include <linux/sched.h>
18 #include <linux/string.h>
19 #include <linux/time.h>
20 #include <linux/page-flags.h>
21 #include <linux/vmalloc.h>
22 #include <linux/mm.h>
23 #include <linux/mman.h>
24 #include <linux/delay.h>
25
26 #include <asm/io.h>
27 #include <asm/current.h>
28 #include <asm/uaccess.h>
29 #include <asm/bitops.h>
30 #include <asm/pgtable.h>
31 #include <asm/relay.h>
32 #include <asm/hardirq.h>
33
34 #include "relay_lockless.h"
35 #include "relay_locking.h"
36 #include "resize.h"
37
/*
 * Relay channel table, indexed by channel id.  A NULL slot is free;
 * slots are claimed by rchan_alloc_id() and cleared by rchan_free_id().
 */
static struct rchan *   rchan_table[RELAY_MAX_CHANNELS];
/* Guards rchan_table: lookups take it for reading, slot updates for writing */
static rwlock_t         rchan_table_lock = RW_LOCK_UNLOCKED;
41
42 /* Relay operation structs, one per scheme */
43 static struct relay_ops lockless_ops = {
44         .reserve = lockless_reserve,
45         .commit = lockless_commit,
46         .get_offset = lockless_get_offset,
47         .finalize = lockless_finalize,
48         .reset = lockless_reset,
49         .reset_index = lockless_reset_index
50 };
51
52 static struct relay_ops locking_ops = {
53         .reserve = locking_reserve,
54         .commit = locking_commit,
55         .get_offset = locking_get_offset,
56         .finalize = locking_finalize,
57         .reset = locking_reset,
58         .reset_index = locking_reset_index
59 };
60
61 /*
62  * Low-level relayfs kernel API.  These functions should not normally be 
63  * used by clients.  See high-level kernel API below.
64  */
65
66 /**
67  *      rchan_get - get channel associated with id, incrementing refcount 
68  *      @rchan_id: the channel id
69  *
70  *      Returns channel if successful, NULL otherwise.
71  */
72 struct rchan *
73 rchan_get(int rchan_id)
74 {
75         struct rchan *rchan;
76         
77         if ((rchan_id < 0) || (rchan_id >= RELAY_MAX_CHANNELS))
78                 return NULL;
79         
80         read_lock(&rchan_table_lock);
81         rchan = rchan_table[rchan_id];
82         if (rchan)
83                 atomic_inc(&rchan->refcount);
84         read_unlock(&rchan_table_lock);
85
86         return rchan;
87 }
88
89 /**
90  *      clear_readers - clear non-VFS readers
91  *      @rchan: the channel
92  *
93  *      Clear the channel pointers of all non-VFS readers open on the channel.
94  */
95 static inline void
96 clear_readers(struct rchan *rchan)
97 {
98         struct list_head *p;
99         struct rchan_reader *reader;
100         
101         read_lock(&rchan->open_readers_lock);
102         list_for_each(p, &rchan->open_readers) {
103                 reader = list_entry(p, struct rchan_reader, list);
104                 if (!reader->vfs_reader)
105                         reader->rchan = NULL;
106         }
107         read_unlock(&rchan->open_readers_lock);
108 }
109
110 /**
111  *      rchan_alloc_id - reserve a channel id and store associated channel
112  *      @rchan: the channel
113  *
114  *      Returns channel id if successful, -1 otherwise.
115  */
116 static inline int
117 rchan_alloc_id(struct rchan *rchan)
118 {
119         int i;
120         int rchan_id = -1;
121         
122         if (rchan == NULL)
123                 return -1;
124
125         write_lock(&rchan_table_lock);
126         for (i = 0; i < RELAY_MAX_CHANNELS; i++) {
127                 if (rchan_table[i] == NULL) {
128                         rchan_table[i] = rchan;
129                         rchan_id = rchan->id = i;
130                         break;
131                 }
132         }
133         if (rchan_id != -1)
134                 atomic_inc(&rchan->refcount);
135         write_unlock(&rchan_table_lock);
136         
137         return rchan_id;
138 }
139
140 /**
141  *      rchan_free_id - revoke a channel id and remove associated channel
142  *      @rchan_id: the channel id
143  */
144 static inline void
145 rchan_free_id(int rchan_id)
146 {
147         struct rchan *rchan;
148
149         if ((rchan_id < 0) || (rchan_id >= RELAY_MAX_CHANNELS))
150                 return;
151
152         write_lock(&rchan_table_lock);
153         rchan = rchan_table[rchan_id];
154         rchan_table[rchan_id] = NULL;
155         write_unlock(&rchan_table_lock);
156 }
157
158 /**
159  *      rchan_destroy_buf - destroy the current channel buffer
160  *      @rchan: the channel
161  */
162 static inline void
163 rchan_destroy_buf(struct rchan *rchan)
164 {
165         if (rchan->buf && !rchan->init_buf)
166                 free_rchan_buf(rchan->buf,
167                                rchan->buf_page_array,
168                                rchan->buf_page_count);
169 }
170
171 /**
172  *      relay_release - perform end-of-buffer processing for last buffer
173  *      @rchan: the channel
174  *
175  *      Returns 0 if successful, negative otherwise.
176  *
177  *      Releases the channel buffer, destroys the channel, and removes the
178  *      relay file from the relayfs filesystem.  Should only be called from 
179  *      rchan_put().  If we're here, it means by definition refcount is 0.
180  */
181 static int 
182 relay_release(struct rchan *rchan)
183 {
184         if (rchan == NULL)
185                 return -EBADF;
186
187         rchan_destroy_buf(rchan);
188         rchan_free_id(rchan->id);
189         relayfs_remove_file(rchan->dentry);
190         clear_readers(rchan);
191         kfree(rchan);
192
193         return 0;
194 }
195
196 /**
197  *      rchan_get - decrement channel refcount, releasing it if 0
198  *      @rchan: the channel
199  *
200  *      If the refcount reaches 0, the channel will be destroyed.
201  */
202 void 
203 rchan_put(struct rchan *rchan)
204 {
205         if (atomic_dec_and_test(&rchan->refcount))
206                 relay_release(rchan);
207 }
208
209 /**
210  *      relay_reserve -  reserve a slot in the channel buffer
211  *      @rchan: the channel
212  *      @len: the length of the slot to reserve
213  *      @td: the time delta between buffer start and current write, or TSC
214  *      @err: receives the result flags
215  *      @interrupting: 1 if interrupting previous, used only in locking scheme
216  *
217  *      Returns pointer to the beginning of the reserved slot, NULL if error.
218  *
219  *      The errcode value contains the result flags and is an ORed combination 
220  *      of the following:
221  *
222  *      RELAY_BUFFER_SWITCH_NONE - no buffer switch occurred
223  *      RELAY_EVENT_DISCARD_NONE - event should not be discarded
224  *      RELAY_BUFFER_SWITCH - buffer switch occurred
225  *      RELAY_EVENT_DISCARD - event should be discarded (all buffers are full)
226  *      RELAY_EVENT_TOO_LONG - event won't fit into even an empty buffer
227  *
228  *      buffer_start and buffer_end callbacks are triggered at this point
229  *      if applicable.
230  */
231 char *
232 relay_reserve(struct rchan *rchan,
233               u32 len,
234               struct timeval *ts,
235               u32 *td,
236               int *err,
237               int *interrupting)
238 {
239         if (rchan == NULL)
240                 return NULL;
241         
242         *interrupting = 0;
243
244         return rchan->relay_ops->reserve(rchan, len, ts, td, err, interrupting);
245 }
246
247
248 /**
249  *      wakeup_readers - wake up VFS readers waiting on a channel
250  *      @private: the channel
251  *
252  *      This is the work function used to defer reader waking.  The
253  *      reason waking is deferred is that calling directly from commit
254  *      causes problems if you're writing from say the scheduler.
255  */
256 static void 
257 wakeup_readers(void *private)
258 {
259         struct rchan *rchan = (struct rchan *)private;
260
261         wake_up_interruptible(&rchan->read_wait);
262 }
263
264
265 /**
266  *      relay_commit - commit a reserved slot in the buffer
267  *      @rchan: the channel
268  *      @from: commit the length starting here
269  *      @len: length committed
270  *      @interrupting: 1 if interrupting previous, used only in locking scheme
271  *
272  *      After the write into the reserved buffer has been complted, this
273  *      function must be called in order for the relay to determine whether 
274  *      buffers are complete and to wake up VFS readers.
275  *
276  *      delivery callback is triggered at this point if applicable.
277  */
278 void
279 relay_commit(struct rchan *rchan,
280              char *from,
281              u32 len,
282              int reserve_code,
283              int interrupting)
284 {
285         int deliver;
286
287         if (rchan == NULL)
288                 return;
289         
290         deliver = packet_delivery(rchan) || 
291                    (reserve_code & RELAY_BUFFER_SWITCH);
292
293         rchan->relay_ops->commit(rchan, from, len, deliver, interrupting);
294
295         /* The params are always the same, so no worry about re-queuing */
296         if (deliver &&  waitqueue_active(&rchan->read_wait)) {
297                 PREPARE_WORK(&rchan->wake_readers, wakeup_readers, rchan);
298                 schedule_delayed_work(&rchan->wake_readers, 1);
299         }
300 }
301
302 /**
303  *      relay_get_offset - get current and max channel buffer offsets
304  *      @rchan: the channel
305  *      @max_offset: maximum channel offset
306  *
307  *      Returns the current and maximum channel buffer offsets.
308  */
309 u32
310 relay_get_offset(struct rchan *rchan, u32 *max_offset)
311 {
312         return rchan->relay_ops->get_offset(rchan, max_offset);
313 }
314
315 /**
316  *      reset_index - try once to reset the current channel index
317  *      @rchan: the channel
318  *      @old_index: the index read before reset
319  *
320  *      Attempts to reset the channel index to 0.  It tries once, and
321  *      if it fails, returns negative, 0 otherwise.
322  */
323 int
324 reset_index(struct rchan *rchan, u32 old_index)
325 {
326         return rchan->relay_ops->reset_index(rchan, old_index);
327 }
328
329 /*
330  * close() vm_op implementation for relayfs file mapping.
331  */
332 static void
333 relay_file_mmap_close(struct vm_area_struct *vma)
334 {
335         struct file *filp = vma->vm_file;
336         struct rchan_reader *reader;
337         struct rchan *rchan;
338
339         reader = (struct rchan_reader *)filp->private_data;
340         rchan = reader->rchan;
341
342         atomic_dec(&rchan->mapped);
343
344         rchan->callbacks->fileop_notify(reader->rchan->id, filp,
345                                         RELAY_FILE_UNMAP);
346 }
347
348 /*
349  * vm_ops for relay file mappings.
350  */
351 static struct vm_operations_struct relay_file_mmap_ops = {
352         .close = relay_file_mmap_close
353 };
354
355 /* \begin{Code inspired from BTTV driver} */
356 static inline unsigned long 
357 kvirt_to_pa(unsigned long adr)
358 {
359         unsigned long kva, ret;
360
361         kva = (unsigned long) page_address(vmalloc_to_page((void *) adr));
362         kva |= adr & (PAGE_SIZE - 1);
363         ret = __pa(kva);
364         return ret;
365 }
366
367 static int
368 relay_mmap_region(struct vm_area_struct *vma,
369                   const char *adr,
370                   const char *start_pos,
371                   unsigned long size)
372 {
373         unsigned long start = (unsigned long) adr;
374         unsigned long page, pos;
375
376         pos = (unsigned long) start_pos;
377
378         while (size > 0) {
379                 page = kvirt_to_pa(pos);
380                 if (remap_page_range(vma, start, page, PAGE_SIZE, PAGE_SHARED))
381                         return -EAGAIN;
382                 start += PAGE_SIZE;
383                 pos += PAGE_SIZE;
384                 size -= PAGE_SIZE;
385         }
386
387         return 0;
388 }
389 /* \end{Code inspired from BTTV driver} */
390
391 /**
392  *      relay_mmap_buffer: - mmap buffer to process address space
393  *      @rchan_id: relay channel id
394  *      @vma: vm_area_struct describing memory to be mapped
395  *
396  *      Returns:
397  *      0 if ok
398  *      -EAGAIN, when remap failed
399  *      -EINVAL, invalid requested length
400  *
401  *      Caller should already have grabbed mmap_sem.
402  */
403 int 
404 __relay_mmap_buffer(struct rchan *rchan,
405                     struct vm_area_struct *vma)
406 {
407         int err = 0;
408         unsigned long length = vma->vm_end - vma->vm_start;
409         struct file *filp = vma->vm_file;
410
411         if (rchan == NULL) {
412                 err = -EBADF;
413                 goto exit;
414         }
415
416         if (rchan->init_buf) {
417                 err = -EPERM;
418                 goto exit;
419         }
420         
421         if (length != (unsigned long)rchan->alloc_size) {
422                 err = -EINVAL;
423                 goto exit;
424         }
425
426         err = relay_mmap_region(vma,
427                                 (char *)vma->vm_start,
428                                 rchan->buf,
429                                 rchan->alloc_size);
430
431         if (err == 0) {
432                 vma->vm_ops = &relay_file_mmap_ops;
433                 err = rchan->callbacks->fileop_notify(rchan->id, filp,
434                                                       RELAY_FILE_MAP);
435                 if (err == 0)
436                         atomic_inc(&rchan->mapped);
437         }
438 exit:   
439         return err;
440 }
441
/*
 * High-level relayfs kernel API.  See Documentation/filesystems/relayfs.txt.
 */
445
446 /*
447  * rchan_callback implementations defining default channel behavior.  Used
448  * in place of corresponding NULL values in client callback struct.
449  */
450
451 /*
452  * buffer_end() default callback.  Does nothing.
453  */
454 static int 
455 buffer_end_default_callback(int rchan_id,
456                             char *current_write_pos,
457                             char *end_of_buffer,
458                             struct timeval end_time,
459                             u32 end_tsc,
460                             int using_tsc) 
461 {
462         return 0;
463 }
464
465 /*
466  * buffer_start() default callback.  Does nothing.
467  */
468 static int 
469 buffer_start_default_callback(int rchan_id,
470                               char *current_write_pos,
471                               u32 buffer_id,
472                               struct timeval start_time,
473                               u32 start_tsc,
474                               int using_tsc)
475 {
476         return 0;
477 }
478
479 /*
480  * deliver() default callback.  Does nothing.
481  */
482 static void 
483 deliver_default_callback(int rchan_id, char *from, u32 len)
484 {
485 }
486
487 /*
488  * user_deliver() default callback.  Does nothing.
489  */
490 static void 
491 user_deliver_default_callback(int rchan_id, char *from, u32 len)
492 {
493 }
494
495 /*
496  * needs_resize() default callback.  Does nothing.
497  */
498 static void
499 needs_resize_default_callback(int rchan_id,
500                               int resize_type,
501                               u32 suggested_buf_size,
502                               u32 suggested_n_bufs)
503 {
504 }
505
506 /*
507  * fileop_notify() default callback.  Does nothing.
508  */
509 static int
510 fileop_notify_default_callback(int rchan_id,
511                                struct file *filp,
512                                enum relay_fileop fileop)
513 {
514         return 0;
515 }
516
/*
 * Default ioctl() callback: ignores the command and returns 0.
 */
static int
ioctl_default_callback(int rchan_id, unsigned int cmd, unsigned long arg)
{
        return 0;
}
527
528 /* relay channel default callbacks */
529 static struct rchan_callbacks default_channel_callbacks = {
530         .buffer_start = buffer_start_default_callback,
531         .buffer_end = buffer_end_default_callback,
532         .deliver = deliver_default_callback,
533         .user_deliver = user_deliver_default_callback,
534         .needs_resize = needs_resize_default_callback,
535         .fileop_notify = fileop_notify_default_callback,
536         .ioctl = ioctl_default_callback,
537 };
538
539 /**
540  *      check_attribute_flags - check sanity of channel attributes
541  *      @flags: channel attributes
542  *      @resizeable: 1 if true
543  *
544  *      Returns 0 if successful, negative otherwise.
545  */
546 static int
547 check_attribute_flags(u32 *attribute_flags, int resizeable)
548 {
549         u32 flags = *attribute_flags;
550         
551         if (!(flags & RELAY_DELIVERY_BULK) && !(flags & RELAY_DELIVERY_PACKET))
552                 return -EINVAL; /* Delivery mode must be specified */
553         
554         if (!(flags & RELAY_USAGE_SMP) && !(flags & RELAY_USAGE_GLOBAL))
555                 return -EINVAL; /* Usage must be specified */
556         
557         if (resizeable) {  /* Resizeable can never be continuous */
558                 *attribute_flags &= ~RELAY_MODE_CONTINUOUS;
559                 *attribute_flags |= RELAY_MODE_NO_OVERWRITE;
560         }
561         
562         if ((flags & RELAY_MODE_CONTINUOUS) &&
563             (flags & RELAY_MODE_NO_OVERWRITE))
564                 return -EINVAL; /* Can't have it both ways */
565         
566         if (!(flags & RELAY_MODE_CONTINUOUS) &&
567             !(flags & RELAY_MODE_NO_OVERWRITE))
568                 *attribute_flags |= RELAY_MODE_CONTINUOUS; /* Default to continuous */
569         
570         if (!(flags & RELAY_SCHEME_ANY))
571                 return -EINVAL; /* One or both must be specified */
572         else if (flags & RELAY_SCHEME_LOCKLESS) {
573                 if (have_cmpxchg())
574                         *attribute_flags &= ~RELAY_SCHEME_LOCKING;
575                 else if (flags & RELAY_SCHEME_LOCKING)
576                         *attribute_flags &= ~RELAY_SCHEME_LOCKLESS;
577                 else
578                         return -EINVAL; /* Locking scheme not an alternative */
579         }
580         
581         if (!(flags & RELAY_TIMESTAMP_ANY))
582                 return -EINVAL; /* One or both must be specified */
583         else if (flags & RELAY_TIMESTAMP_TSC) {
584                 if (have_tsc())
585                         *attribute_flags &= ~RELAY_TIMESTAMP_GETTIMEOFDAY;
586                 else if (flags & RELAY_TIMESTAMP_GETTIMEOFDAY)
587                         *attribute_flags &= ~RELAY_TIMESTAMP_TSC;
588                 else
589                         return -EINVAL; /* gettimeofday not an alternative */
590         }
591
592         return 0;
593 }
594
595 /*
596  * High-level API functions.
597  */
598
599 /**
600  *      __relay_reset - internal reset function
601  *      @rchan: the channel
602  *      @init: 1 if this is a first-time channel initialization
603  *
604  *      See relay_reset for description of effect.
605  */
606 void
607 __relay_reset(struct rchan *rchan, int init)
608 {
609         int i;
610         
611         if (init) {
612                 rchan->version = RELAYFS_CHANNEL_VERSION;
613                 init_MUTEX(&rchan->resize_sem);
614                 init_waitqueue_head(&rchan->read_wait);
615                 init_waitqueue_head(&rchan->write_wait);
616                 atomic_set(&rchan->refcount, 0);
617                 INIT_LIST_HEAD(&rchan->open_readers);
618                 rchan->open_readers_lock = RW_LOCK_UNLOCKED;
619         }
620         
621         rchan->buf_id = rchan->buf_idx = 0;
622         atomic_set(&rchan->suspended, 0);
623         atomic_set(&rchan->mapped, 0);
624         rchan->half_switch = 0;
625         rchan->bufs_produced = 0;
626         rchan->bufs_consumed = 0;
627         rchan->bytes_consumed = 0;
628         rchan->initialized = 0;
629         rchan->finalized = 0;
630         rchan->resize_min = rchan->resize_max = 0;
631         rchan->resizing = 0;
632         rchan->replace_buffer = 0;
633         rchan->resize_buf = NULL;
634         rchan->resize_buf_size = 0;
635         rchan->resize_alloc_size = 0;
636         rchan->resize_n_bufs = 0;
637         rchan->resize_err = 0;
638         rchan->resize_failures = 0;
639         rchan->resize_order = 0;
640
641         rchan->expand_page_array = NULL;
642         rchan->expand_page_count = 0;
643         rchan->shrink_page_array = NULL;
644         rchan->shrink_page_count = 0;
645         rchan->resize_page_array = NULL;
646         rchan->resize_page_count = 0;
647         rchan->old_buf_page_array = NULL;
648         rchan->expand_buf_id = 0;
649
650         INIT_WORK(&rchan->wake_readers, NULL, NULL);
651         INIT_WORK(&rchan->wake_writers, NULL, NULL);
652
653         for (i = 0; i < RELAY_MAX_BUFS; i++)
654                 rchan->unused_bytes[i] = 0;
655         
656         rchan->relay_ops->reset(rchan, init);
657 }
658
659 /**
660  *      relay_reset - reset the channel
661  *      @rchan: the channel
662  *
663  *      Returns 0 if successful, negative if not.
664  *
665  *      This has the effect of erasing all data from the buffer and
666  *      restarting the channel in its initial state.  The buffer itself
667  *      is not freed, so any mappings are still in effect.
668  *
669  *      NOTE: Care should be taken that the channnel isn't actually
670  *      being used by anything when this call is made.
671  */
672 int
673 relay_reset(int rchan_id)
674 {
675         struct rchan *rchan;
676
677         rchan = rchan_get(rchan_id);
678         if (rchan == NULL)
679                 return -EBADF;
680
681         __relay_reset(rchan, 0);
682         update_readers_consumed(rchan, 0, 0);
683
684         rchan_put(rchan);
685
686         return 0;
687 }
688
/**
 *      check_init_buf - check the sanity of init_buf, if present
 *      @init_buf: the initbuf
 *      @init_buf_size: the total initbuf size
 *      @bufsize: the channel's sub-buffer size
 *      @nbufs: the number of sub-buffers in the channel
 *
 *      Returns 0 if ok (always the case when @init_buf is NULL),
 *      -EINVAL if an init_buf is given with a single sub-buffer or if
 *      @bufsize * @nbufs does not equal @init_buf_size.
 */
static int
check_init_buf(char *init_buf, u32 init_buf_size, u32 bufsize, u32 nbufs)
{
        if (init_buf == NULL)
                return 0;

        if (nbufs == 1) /* 1 sub-buffer makes no sense */
                return -EINVAL;

        /*
         * Multiply in 64 bits: a 32-bit product could wrap around and
         * compare equal to init_buf_size for a buffer that is far too
         * small.
         */
        if ((u64)bufsize * nbufs != init_buf_size)
                return -EINVAL;

        return 0;
}
711
712 /**
713  *      rchan_create_buf - allocate the initial channel buffer
714  *      @rchan: the channel
715  *      @size_alloc: the total size of the channel buffer
716  *
717  *      Returns 0 if successful, negative otherwise.
718  */
719 static inline int
720 rchan_create_buf(struct rchan *rchan, int size_alloc)
721 {
722         struct page **page_array;
723         int page_count;
724
725         if ((rchan->buf = (char *)alloc_rchan_buf(size_alloc, &page_array, &page_count)) == NULL) {
726                 rchan->buf_page_array = NULL;
727                 rchan->buf_page_count = 0;
728                 return -ENOMEM;
729         }
730
731         rchan->buf_page_array = page_array;
732         rchan->buf_page_count = page_count;
733
734         return 0;
735 }
736
737 /**
738  *      rchan_create - allocate and initialize a channel, including buffer
739  *      @chanpath: path specifying the relayfs channel file to create
740  *      @bufsize: the size of the sub-buffers within the channel buffer
741  *      @nbufs: the number of sub-buffers within the channel buffer
742  *      @rchan_flags: flags specifying buffer attributes
743  *      @err: err code
744  *
745  *      Returns channel if successful, NULL otherwise, err receives errcode.
746  *
747  *      Allocates a struct rchan representing a relay channel, according
748  *      to the attributes passed in via rchan_flags.  Does some basic sanity
749  *      checking but doesn't try to do anything smart.  In particular, the
750  *      number of buffers must be a power of 2, and if the lockless scheme
751  *      is being used, the sub-buffer size must also be a power of 2.  The
752  *      locking scheme can use buffers of any size.
753  */
754 static struct rchan *
755 rchan_create(const char *chanpath, 
756              int bufsize, 
757              int nbufs, 
758              u32 rchan_flags,
759              char *init_buf,
760              u32 init_buf_size,
761              int *err)
762 {
763         int size_alloc;
764         struct rchan *rchan = NULL;
765
766         *err = 0;
767
768         rchan = (struct rchan *)kmalloc(sizeof(struct rchan), GFP_KERNEL);
769         if (rchan == NULL) {
770                 *err = -ENOMEM;
771                 return NULL;
772         }
773         rchan->buf = rchan->init_buf = NULL;
774
775         *err = check_init_buf(init_buf, init_buf_size, bufsize, nbufs);
776         if (*err)
777                 goto exit;
778         
779         if (nbufs == 1 && bufsize) {
780                 rchan->n_bufs = nbufs;
781                 rchan->buf_size = bufsize;
782                 size_alloc = bufsize;
783                 goto alloc;
784         }
785         
786         if (bufsize <= 0 ||
787             (rchan_flags & RELAY_SCHEME_LOCKLESS && hweight32(bufsize) != 1) ||
788             hweight32(nbufs) != 1 ||
789             nbufs < RELAY_MIN_BUFS ||
790             nbufs > RELAY_MAX_BUFS) {
791                 *err = -EINVAL;
792                 goto exit;
793         }
794
795         size_alloc = FIX_SIZE(bufsize * nbufs);
796         if (size_alloc > RELAY_MAX_BUF_SIZE) {
797                 *err = -EINVAL;
798                 goto exit;
799         }
800         rchan->n_bufs = nbufs;
801         rchan->buf_size = bufsize;
802
803         if (rchan_flags & RELAY_SCHEME_LOCKLESS) {
804                 offset_bits(rchan) = ffs(bufsize) - 1;
805                 offset_mask(rchan) =  RELAY_BUF_OFFSET_MASK(offset_bits(rchan));
806                 bufno_bits(rchan) = ffs(nbufs) - 1;
807         }
808 alloc:
809         if (rchan_alloc_id(rchan) == -1) {
810                 *err = -ENOMEM;
811                 goto exit;
812         }
813
814         if (init_buf == NULL) {
815                 *err = rchan_create_buf(rchan, size_alloc);
816                 if (*err) {
817                         rchan_free_id(rchan->id);
818                         goto exit;
819                 }
820         } else
821                 rchan->buf = rchan->init_buf = init_buf;
822         
823         rchan->alloc_size = size_alloc;
824
825         if (rchan_flags & RELAY_SCHEME_LOCKLESS)
826                 rchan->relay_ops = &lockless_ops;
827         else
828                 rchan->relay_ops = &locking_ops;
829
830 exit:
831         if (*err) {
832                 kfree(rchan);
833                 rchan = NULL;
834         }
835
836         return rchan;
837 }
838
839
840 static char tmpname[NAME_MAX];
841
842 /**
843  *      rchan_create_dir - create directory for file
844  *      @chanpath: path to file, including filename
845  *      @residual: filename remaining after parse
846  *      @topdir: the directory filename should be created in
847  *
848  *      Returns 0 if successful, negative otherwise.
849  *
850  *      Inspired by xlate_proc_name() in procfs.  Given a file path which
851  *      includes the filename, creates any and all directories necessary 
852  *      to create the file.
853  */
854 static int 
855 rchan_create_dir(const char * chanpath, 
856                  const char **residual, 
857                  struct dentry **topdir)
858 {
859         const char *cp = chanpath, *next;
860         struct dentry *parent = NULL;
861         int len, err = 0;
862         
863         while (1) {
864                 next = strchr(cp, '/');
865                 if (!next)
866                         break;
867
868                 len = next - cp;
869
870                 strncpy(tmpname, cp, len);
871                 tmpname[len] = '\0';
872                 err = relayfs_create_dir(tmpname, parent, &parent);
873                 if (err && (err != -EEXIST))
874                         return err;
875                 cp += len + 1;
876         }
877
878         *residual = cp;
879         *topdir = parent;
880
881         return err;
882 }
883
884 /**
885  *      rchan_create_file - create file, including parent directories
886  *      @chanpath: path to file, including filename
887  *      @dentry: result dentry
888  *      @data: data to associate with the file
889  *
890  *      Returns 0 if successful, negative otherwise.
891  */
892 static int 
893 rchan_create_file(const char * chanpath, 
894                   struct dentry **dentry, 
895                   struct rchan * data,
896                   int mode)
897 {
898         int err;
899         const char * fname;
900         struct dentry *topdir;
901
902         err = rchan_create_dir(chanpath, &fname, &topdir);
903         if (err && (err != -EEXIST))
904                 return err;
905
906         err = relayfs_create_file(fname, topdir, dentry, (void *)data, mode);
907
908         return err;
909 }
910
911 /**
912  *      relay_open - create a new file/channel buffer in relayfs
913  *      @chanpath: name of file to create, including path
914  *      @bufsize: size of sub-buffers
915  *      @nbufs: number of sub-buffers
916  *      @flags: channel attributes
917  *      @callbacks: client callback functions
918  *      @start_reserve: number of bytes to reserve at start of each sub-buffer
919  *      @end_reserve: number of bytes to reserve at end of each sub-buffer
920  *      @rchan_start_reserve: additional reserve at start of first sub-buffer
921  *      @resize_min: minimum total buffer size, if set
922  *      @resize_max: maximum total buffer size, if set
923  *      @mode: the perms to be given to the relayfs file, 0 to accept defaults
924  *      @init_buf: initial memory buffer to start out with, NULL if N/A
925  *      @init_buf_size: initial memory buffer size to start out with, 0 if N/A
926  *
927  *      Returns channel id if successful, negative otherwise.
928  *
929  *      Creates a relay channel using the sizes and attributes specified.
930  *      The default permissions, used if mode == 0 are S_IRUSR | S_IWUSR.  See
931  *      Documentation/filesystems/relayfs.txt for details.
932  */
933 int
934 relay_open(const char *chanpath,
935            int bufsize,
936            int nbufs,
937            u32 flags,
938            struct rchan_callbacks *channel_callbacks,
939            u32 start_reserve,
940            u32 end_reserve,
941            u32 rchan_start_reserve,
942            u32 resize_min,
943            u32 resize_max,
944            int mode,
945            char *init_buf,
946            u32 init_buf_size)
947 {
948         int err;
949         struct rchan *rchan;
950         struct dentry *dentry;
951         struct rchan_callbacks *callbacks = NULL;
952
953         if (chanpath == NULL)
954                 return -EINVAL;
955
956         if (nbufs != 1) {
957                 err = check_attribute_flags(&flags, resize_min ? 1 : 0);
958                 if (err)
959                         return err;
960         }
961
962         rchan = rchan_create(chanpath, bufsize, nbufs, flags, init_buf, init_buf_size, &err);
963
964         if (err < 0)
965                 return err;
966
967         /* Create file in fs */
968         if ((err = rchan_create_file(chanpath, &dentry, rchan, mode)) < 0) {
969                 rchan_destroy_buf(rchan);
970                 rchan_free_id(rchan->id);
971                 kfree(rchan);
972                 return err;
973         }
974
975         rchan->dentry = dentry;
976
977         if (channel_callbacks == NULL)
978                 callbacks = &default_channel_callbacks;
979         else
980                 callbacks = channel_callbacks;
981
982         if (callbacks->buffer_end == NULL)
983                 callbacks->buffer_end = buffer_end_default_callback;
984         if (callbacks->buffer_start == NULL)
985                 callbacks->buffer_start = buffer_start_default_callback;
986         if (callbacks->deliver == NULL)
987                 callbacks->deliver = deliver_default_callback;
988         if (callbacks->user_deliver == NULL)
989                 callbacks->user_deliver = user_deliver_default_callback;
990         if (callbacks->needs_resize == NULL)
991                 callbacks->needs_resize = needs_resize_default_callback;
992         if (callbacks->fileop_notify == NULL)
993                 callbacks->fileop_notify = fileop_notify_default_callback;
994         if (callbacks->ioctl == NULL)
995                 callbacks->ioctl = ioctl_default_callback;
996         rchan->callbacks = callbacks;
997
998         /* Just to let the client know the sizes used */
999         rchan->callbacks->needs_resize(rchan->id,
1000                                        RELAY_RESIZE_REPLACED,
1001                                        rchan->buf_size,
1002                                        rchan->n_bufs);
1003
1004         rchan->flags = flags;
1005         rchan->start_reserve = start_reserve;
1006         rchan->end_reserve = end_reserve;
1007         rchan->rchan_start_reserve = rchan_start_reserve;
1008
1009         __relay_reset(rchan, 1);
1010
1011         if (resize_min > 0 && resize_max > 0 && 
1012            resize_max < RELAY_MAX_TOTAL_BUF_SIZE) {
1013                 rchan->resize_min = resize_min;
1014                 rchan->resize_max = resize_max;
1015                 init_shrink_timer(rchan);
1016         }
1017
1018         rchan_get(rchan->id);
1019
1020         return rchan->id;
1021 }
1022
1023 /**
1024  *      relay_discard_init_buf - alloc channel buffer and copy init_buf into it
1025  *      @rchan_id: the channel id
1026  *
1027  *      Returns 0 if successful, negative otherwise.
1028  *
1029  *      NOTE: May sleep.  Should also be called only when the channel isn't
1030  *      actively being written into.
1031  */
1032 int
1033 relay_discard_init_buf(int rchan_id)
1034 {
1035         struct rchan *rchan;
1036         int err = 0;
1037         
1038         rchan = rchan_get(rchan_id);
1039         if (rchan == NULL)
1040                 return -EBADF;
1041
1042         if (rchan->init_buf == NULL) {
1043                 err = -EINVAL;
1044                 goto out;
1045         }
1046         
1047         err = rchan_create_buf(rchan, rchan->alloc_size);
1048         if (err)
1049                 goto out;
1050         
1051         memcpy(rchan->buf, rchan->init_buf, rchan->n_bufs * rchan->buf_size);
1052         rchan->init_buf = NULL;
1053 out:
1054         rchan_put(rchan);
1055         
1056         return err;
1057 }
1058
1059 /**
1060  *      relay_finalize - perform end-of-buffer processing for last buffer
1061  *      @rchan_id: the channel id
1062  *      @releasing: true if called when releasing file
1063  *
1064  *      Returns 0 if successful, negative otherwise.
1065  */
1066 static int 
1067 relay_finalize(int rchan_id)
1068 {
1069         struct rchan *rchan = rchan_get(rchan_id);
1070         if (rchan == NULL)
1071                 return -EBADF;
1072
1073         if (rchan->finalized == 0) {
1074                 rchan->relay_ops->finalize(rchan);
1075                 rchan->finalized = 1;
1076         }
1077
1078         if (waitqueue_active(&rchan->read_wait)) {
1079                 PREPARE_WORK(&rchan->wake_readers, wakeup_readers, rchan);
1080                 schedule_delayed_work(&rchan->wake_readers, 1);
1081         }
1082
1083         rchan_put(rchan);
1084
1085         return 0;
1086 }
1087
1088 /**
1089  *      restore_callbacks - restore default channel callbacks
1090  *      @rchan: the channel
1091  *
1092  *      Restore callbacks to the default versions.
1093  */
1094 static inline void
1095 restore_callbacks(struct rchan *rchan)
1096 {
1097         if (rchan->callbacks != &default_channel_callbacks)
1098                 rchan->callbacks = &default_channel_callbacks;
1099 }
1100
1101 /**
1102  *      relay_close - close the channel
1103  *      @rchan_id: relay channel id
1104  *      
1105  *      Finalizes the last sub-buffer and marks the channel as finalized.
1106  *      The channel buffer and channel data structure are then freed
1107  *      automatically when the last reference to the channel is given up.
1108  */
1109 int 
1110 relay_close(int rchan_id)
1111 {
1112         int err;
1113         struct rchan *rchan;
1114
1115         if ((rchan_id < 0) || (rchan_id >= RELAY_MAX_CHANNELS))
1116                 return -EBADF;
1117
1118         err = relay_finalize(rchan_id);
1119
1120         if (!err) {
1121                 read_lock(&rchan_table_lock);
1122                 rchan = rchan_table[rchan_id];
1123                 read_unlock(&rchan_table_lock);
1124
1125                 if (rchan) {
1126                         restore_callbacks(rchan);
1127                         if (rchan->resize_min)
1128                                 del_timer(&rchan->shrink_timer);
1129                         rchan_put(rchan);
1130                 }
1131         }
1132         
1133         return err;
1134 }
1135
1136 /**
1137  *      relay_write - reserve a slot in the channel and write data into it
1138  *      @rchan_id: relay channel id
1139  *      @data_ptr: data to be written into reserved slot
1140  *      @count: number of bytes to write
1141  *      @td_offset: optional offset where time delta should be written
1142  *      @wrote_pos: optional ptr returning buf pos written to, ignored if NULL 
1143  *
1144  *      Returns the number of bytes written, 0 or negative on failure.
1145  *
1146  *      Reserves space in the channel and writes count bytes of data_ptr
1147  *      to it.  Automatically performs any necessary locking, depending
1148  *      on the scheme and SMP usage in effect (no locking is done for the
1149  *      lockless scheme regardless of usage). 
1150  *
1151  *      If td_offset is >= 0, the internal time delta calculated when
1152  *      slot was reserved will be written at that offset.
1153  *
1154  *      If wrote_pos is non-NULL, it will receive the location the data
1155  *      was written to, which may be needed for some applications but is not
1156  *      normally interesting.
1157  */
1158 int
1159 relay_write(int rchan_id, 
1160             const void *data_ptr, 
1161             size_t count,
1162             int td_offset,
1163             void **wrote_pos)
1164 {
1165         unsigned long flags;
1166         char *reserved, *write_pos;
1167         int bytes_written = 0;
1168         int reserve_code, interrupting;
1169         struct timeval ts;
1170         u32 td;
1171         struct rchan *rchan;
1172         
1173         rchan = rchan_get(rchan_id);
1174         if (rchan == NULL)
1175                 return -EBADF;
1176
1177         relay_lock_channel(rchan, flags); /* nop for lockless */
1178
1179         write_pos = reserved = relay_reserve(rchan, count, &ts, &td, 
1180                                              &reserve_code, &interrupting);
1181
1182         if (reserved != NULL) {
1183                 relay_write_direct(write_pos, data_ptr, count);
1184                 if ((td_offset >= 0) && (td_offset < count - sizeof(td)))
1185                         *((u32 *)(reserved + td_offset)) = td;
1186                 bytes_written = count;
1187         } else if (reserve_code == RELAY_WRITE_TOO_LONG)
1188                 bytes_written = -EINVAL;
1189
1190         if (bytes_written > 0)
1191                 relay_commit(rchan, reserved, bytes_written, reserve_code, interrupting);
1192
1193         relay_unlock_channel(rchan, flags); /* nop for lockless */
1194
1195         rchan_put(rchan);
1196
1197         if (wrote_pos)
1198                 *wrote_pos = reserved;
1199         
1200         return bytes_written;
1201 }
1202
1203 /**
1204  *      wakeup_writers - wake up VFS writers waiting on a channel
1205  *      @private: the channel
1206  *
1207  *      This is the work function used to defer writer waking.  The
1208  *      reason waking is deferred is that calling directly from 
1209  *      buffers_consumed causes problems if you're writing from say 
1210  *      the scheduler.
1211  */
1212 static void 
1213 wakeup_writers(void *private)
1214 {
1215         struct rchan *rchan = (struct rchan *)private;
1216         
1217         wake_up_interruptible(&rchan->write_wait);
1218 }
1219
1220
1221 /**
1222  *      __relay_buffers_consumed - internal version of relay_buffers_consumed
1223  *      @rchan: the relay channel
1224  *      @bufs_consumed: number of buffers to add to current count for channel
1225  *      
1226  *      Internal - updates the channel's consumed buffer count.
1227  */
1228 static void
1229 __relay_buffers_consumed(struct rchan *rchan, u32 bufs_consumed)
1230 {
1231         rchan->bufs_consumed += bufs_consumed;
1232         
1233         if (rchan->bufs_consumed > rchan->bufs_produced)
1234                 rchan->bufs_consumed = rchan->bufs_produced;
1235         
1236         atomic_set(&rchan->suspended, 0);
1237
1238         PREPARE_WORK(&rchan->wake_writers, wakeup_writers, rchan);
1239         schedule_delayed_work(&rchan->wake_writers, 1);
1240 }
1241
1242 /**
1243  *      __reader_buffers_consumed - update reader/channel consumed buffer count
1244  *      @reader: channel reader
1245  *      @bufs_consumed: number of buffers to add to current count for channel
1246  *      
1247  *      Internal - updates the reader's consumed buffer count.  If the reader's
1248  *      resulting total is greater than the channel's, update the channel's.
1249 */
1250 static void
1251 __reader_buffers_consumed(struct rchan_reader *reader, u32 bufs_consumed)
1252 {
1253         reader->bufs_consumed += bufs_consumed;
1254         
1255         if (reader->bufs_consumed > reader->rchan->bufs_consumed)
1256                 __relay_buffers_consumed(reader->rchan, bufs_consumed);
1257 }
1258
1259 /**
1260  *      relay_buffers_consumed - add to the # buffers consumed for the channel
1261  *      @reader: channel reader
1262  *      @bufs_consumed: number of buffers to add to current count for channel
1263  *      
1264  *      Adds to the channel's consumed buffer count.  buffers_consumed should
1265  *      be the number of buffers newly consumed, not the total number consumed.
1266  *
1267  *      NOTE: kernel clients don't need to call this function if the reader
1268  *      is auto-consuming or the channel is MODE_CONTINUOUS.
1269  */
1270 void 
1271 relay_buffers_consumed(struct rchan_reader *reader, u32 bufs_consumed)
1272 {
1273         if (reader && reader->rchan)
1274                 __reader_buffers_consumed(reader, bufs_consumed);
1275 }
1276
1277 /**
1278  *      __relay_bytes_consumed - internal version of relay_bytes_consumed 
1279  *      @rchan: the relay channel
1280  *      @bytes_consumed: number of bytes to add to current count for channel
1281  *      @read_offset: where the bytes were consumed from
1282  *      
1283  *      Internal - updates the channel's consumed count.
1284 */
1285 static void
1286 __relay_bytes_consumed(struct rchan *rchan, u32 bytes_consumed, u32 read_offset)
1287 {
1288         u32 consuming_idx;
1289         u32 unused;
1290
1291         consuming_idx = read_offset / rchan->buf_size;
1292
1293         if (consuming_idx >= rchan->n_bufs)
1294                 consuming_idx = rchan->n_bufs - 1;
1295         rchan->bytes_consumed += bytes_consumed;
1296
1297         unused = rchan->unused_bytes[consuming_idx];
1298         
1299         if (rchan->bytes_consumed + unused >= rchan->buf_size) {
1300                 __relay_buffers_consumed(rchan, 1);
1301                 rchan->bytes_consumed = 0;
1302         }
1303 }
1304
1305 /**
1306  *      __reader_bytes_consumed - update reader/channel consumed count
1307  *      @reader: channel reader
1308  *      @bytes_consumed: number of bytes to add to current count for channel
1309  *      @read_offset: where the bytes were consumed from
1310  *      
1311  *      Internal - updates the reader's consumed count.  If the reader's
1312  *      resulting total is greater than the channel's, update the channel's.
1313 */
1314 static void
1315 __reader_bytes_consumed(struct rchan_reader *reader, u32 bytes_consumed, u32 read_offset)
1316 {
1317         u32 consuming_idx;
1318         u32 unused;
1319
1320         consuming_idx = read_offset / reader->rchan->buf_size;
1321
1322         if (consuming_idx >= reader->rchan->n_bufs)
1323                 consuming_idx = reader->rchan->n_bufs - 1;
1324
1325         reader->bytes_consumed += bytes_consumed;
1326         
1327         unused = reader->rchan->unused_bytes[consuming_idx];
1328         
1329         if (reader->bytes_consumed + unused >= reader->rchan->buf_size) {
1330                 reader->bufs_consumed++;
1331                 reader->bytes_consumed = 0;
1332         }
1333
1334         if ((reader->bufs_consumed > reader->rchan->bufs_consumed) ||
1335             ((reader->bufs_consumed == reader->rchan->bufs_consumed) &&
1336              (reader->bytes_consumed > reader->rchan->bytes_consumed)))
1337                 __relay_bytes_consumed(reader->rchan, bytes_consumed, read_offset);
1338 }
1339
1340 /**
1341  *      relay_bytes_consumed - add to the # bytes consumed for the channel
1342  *      @reader: channel reader
1343  *      @bytes_consumed: number of bytes to add to current count for channel
1344  *      @read_offset: where the bytes were consumed from
1345  *      
1346  *      Adds to the channel's consumed count.  bytes_consumed should be the
1347  *      number of bytes actually read e.g. return value of relay_read() and
1348  *      the read_offset should be the actual offset the bytes were read from
1349  *      e.g. the actual_read_offset set by relay_read(). See
1350  *      Documentation/filesystems/relayfs.txt for more details.
1351  *
1352  *      NOTE: kernel clients don't need to call this function if the reader
1353  *      is auto-consuming or the channel is MODE_CONTINUOUS.
1354  */
1355 void
1356 relay_bytes_consumed(struct rchan_reader *reader, u32 bytes_consumed, u32 read_offset)
1357 {
1358         if (reader && reader->rchan)
1359                 __reader_bytes_consumed(reader, bytes_consumed, read_offset);
1360 }
1361
1362 /**
1363  *      update_readers_consumed - apply offset change to reader
1364  *      @rchan: the channel
1365  *
1366  *      Apply the consumed counts to all readers open on the channel.
1367  */
1368 void
1369 update_readers_consumed(struct rchan *rchan, u32 bufs_consumed, u32 bytes_consumed)
1370 {
1371         struct list_head *p;
1372         struct rchan_reader *reader;
1373         
1374         read_lock(&rchan->open_readers_lock);
1375         list_for_each(p, &rchan->open_readers) {
1376                 reader = list_entry(p, struct rchan_reader, list);
1377                 reader->bufs_consumed = bufs_consumed;
1378                 reader->bytes_consumed = bytes_consumed;
1379                 if (reader->vfs_reader) 
1380                         reader->pos.file->f_pos = 0;
1381                 else
1382                         reader->pos.f_pos = 0;
1383                 reader->offset_changed = 1;
1384         }
1385         read_unlock(&rchan->open_readers_lock);
1386 }
1387
1388 /**
1389  *      do_read - utility function to do the actual read to user
1390  *      @rchan: the channel
1391  *      @buf: user buf to read into, NULL if just getting info
1392  *      @count: bytes requested
1393  *      @read_offset: offset into channel
1394  *      @new_offset: new offset into channel after read
1395  *      @actual_read_offset: read offset actually used
1396  *
1397  *      Returns the number of bytes read, 0 if none.
1398  */
1399 static ssize_t
1400 do_read(struct rchan *rchan, char *buf, size_t count, u32 read_offset, u32 *new_offset, u32 *actual_read_offset)
1401 {
1402         u32 read_bufno, cur_bufno;
1403         u32 avail_offset, cur_idx, max_offset, buf_end_offset;
1404         u32 avail_count, buf_size;
1405         int unused_bytes = 0;
1406         size_t read_count = 0;
1407         u32 last_buf_byte_offset;
1408
1409         *actual_read_offset = read_offset;
1410         
1411         buf_size = rchan->buf_size;
1412         if (unlikely(!buf_size)) BUG();
1413
1414         read_bufno = read_offset / buf_size;
1415         if (unlikely(read_bufno >= RELAY_MAX_BUFS)) BUG();
1416         unused_bytes = rchan->unused_bytes[read_bufno];
1417
1418         avail_offset = cur_idx = relay_get_offset(rchan, &max_offset);
1419
1420         if (cur_idx == read_offset) {
1421                 if (atomic_read(&rchan->suspended) == 1) {
1422                         read_offset += 1;
1423                         if (read_offset >= max_offset)
1424                                 read_offset = 0;
1425                         *actual_read_offset = read_offset;
1426                 } else {
1427                         *new_offset = read_offset;
1428                         return 0;
1429                 }
1430         } else {
1431                 last_buf_byte_offset = (read_bufno + 1) * buf_size - 1;
1432                 if (read_offset == last_buf_byte_offset) {
1433                         if (unused_bytes != 1) {
1434                                 read_offset += 1;
1435                                 if (read_offset >= max_offset)
1436                                         read_offset = 0;
1437                                 *actual_read_offset = read_offset;
1438                         }
1439                 }
1440         }
1441
1442         read_bufno = read_offset / buf_size;
1443         if (unlikely(read_bufno >= RELAY_MAX_BUFS)) BUG();
1444         unused_bytes = rchan->unused_bytes[read_bufno];
1445
1446         cur_bufno = cur_idx / buf_size;
1447
1448         buf_end_offset = (read_bufno + 1) * buf_size - unused_bytes;
1449         if (avail_offset > buf_end_offset)
1450                 avail_offset = buf_end_offset;
1451         else if (avail_offset < read_offset)
1452                 avail_offset = buf_end_offset;
1453         avail_count = avail_offset - read_offset;
1454         read_count = avail_count >= count ? count : avail_count;
1455
1456         if (read_count && buf != NULL)
1457                 if (copy_to_user(buf, rchan->buf + read_offset, read_count))
1458                         return -EFAULT;
1459
1460         if (read_bufno == cur_bufno)
1461                 if (read_count && (read_offset + read_count >= buf_end_offset) && (read_offset + read_count <= cur_idx)) {
1462                         *new_offset = cur_idx;
1463                         return read_count;
1464                 }
1465
1466         if (read_offset + read_count + unused_bytes > max_offset)
1467                 *new_offset = 0;
1468         else if (read_offset + read_count >= buf_end_offset)
1469                 *new_offset = read_offset + read_count + unused_bytes;
1470         else
1471                 *new_offset = read_offset + read_count;
1472
1473         return read_count;
1474 }
1475
1476 /**
1477  *      __relay_read - read bytes from channel, relative to current reader pos
1478  *      @reader: channel reader
1479  *      @buf: user buf to read into, NULL if just getting info
1480  *      @count: bytes requested
1481  *      @read_offset: offset into channel
1482  *      @new_offset: new offset into channel after read
1483  *      @actual_read_offset: read offset actually used
1484  *      @wait: if non-zero, wait for something to read
1485  *
1486  *      Internal - see relay_read() for details.
1487  *
1488  *      Returns the number of bytes read, 0 if none, negative on failure.
1489  */
1490 static ssize_t
1491 __relay_read(struct rchan_reader *reader, char *buf, size_t count, u32 read_offset, u32 *new_offset, u32 *actual_read_offset, int wait)
1492 {
1493         int err = 0;
1494         size_t read_count = 0;
1495         struct rchan *rchan = reader->rchan;
1496
1497         if (!wait && !rchan->initialized)
1498                 return -EAGAIN;
1499
1500         if (using_lockless(rchan))
1501                 read_offset &= idx_mask(rchan);
1502
1503         if (read_offset >= rchan->n_bufs * rchan->buf_size) {
1504                 *new_offset = 0;
1505                 if (!wait)
1506                         return -EAGAIN;
1507                 else
1508                         return -EINTR;
1509         }
1510         
1511         if (buf != NULL && wait) {
1512                 err = wait_event_interruptible(rchan->read_wait,
1513                        ((rchan->finalized == 1) ||
1514                         (atomic_read(&rchan->suspended) == 1) ||
1515                         (relay_get_offset(rchan, NULL) != read_offset)));
1516
1517                 if (rchan->finalized)
1518                         return 0;
1519
1520                 if (reader->offset_changed) {
1521                         reader->offset_changed = 0;
1522                         return -EINTR;
1523                 }
1524                 
1525                 if (err)
1526                         return err;
1527         }
1528
1529         read_count = do_read(rchan, buf, count, read_offset, new_offset, actual_read_offset);
1530
1531         if (read_count < 0)
1532                 err = read_count;
1533         
1534         if (err)
1535                 return err;
1536         else
1537                 return read_count;
1538 }
1539
1540 /**
1541  *      relay_read - read bytes from channel, relative to current reader pos
1542  *      @reader: channel reader
1543  *      @buf: user buf to read into, NULL if just getting info
1544  *      @count: bytes requested
1545  *      @wait: if non-zero, wait for something to read
1546  *      @actual_read_offset: set read offset actually used, must not be NULL
1547  *
1548  *      Reads count bytes from the channel, or as much as is available within
1549  *      the sub-buffer currently being read.  The read offset that will be
1550  *      read from is the position contained within the reader object.  If the
1551  *      wait flag is set, buf is non-NULL, and there is nothing available,
1552  *      it will wait until there is.  If the wait flag is 0 and there is
1553  *      nothing available, -EAGAIN is returned.  If buf is NULL, the value
1554  *      returned is the number of bytes that would have been read.
1555  *      actual_read_offset is the value that should be passed as the read
1556  *      offset to relay_bytes_consumed, needed only if the reader is not
1557  *      auto-consuming and the channel is MODE_NO_OVERWRITE, but in any case,
1558  *      it must not be NULL.  See Documentation/filesystems/relayfs.txt for
1559  *      more details.
1560  */
1561 ssize_t
1562 relay_read(struct rchan_reader *reader, char *buf, size_t count, int wait, u32 *actual_read_offset)
1563 {
1564         u32 new_offset;
1565         u32 read_offset;
1566         ssize_t read_count;
1567         
1568         if (reader == NULL || reader->rchan == NULL)
1569                 return -EBADF;
1570
1571         if (actual_read_offset == NULL)
1572                 return -EINVAL;
1573
1574         if (reader->vfs_reader)
1575                 read_offset = (u32)(reader->pos.file->f_pos);
1576         else
1577                 read_offset = reader->pos.f_pos;
1578         *actual_read_offset = read_offset;
1579         
1580         read_count = __relay_read(reader, buf, count, read_offset,
1581                                   &new_offset, actual_read_offset, wait);
1582
1583         if (read_count < 0)
1584                 return read_count;
1585
1586         if (reader->vfs_reader)
1587                 reader->pos.file->f_pos = new_offset;
1588         else
1589                 reader->pos.f_pos = new_offset;
1590
1591         if (reader->auto_consume && ((read_count) || (new_offset != read_offset)))
1592                 __reader_bytes_consumed(reader, read_count, *actual_read_offset);
1593
1594         if (read_count == 0 && !wait)
1595                 return -EAGAIN;
1596         
1597         return read_count;
1598 }
1599
1600 /**
1601  *      relay_bytes_avail - number of bytes available in current sub-buffer
1602  *      @reader: channel reader
1603  *      
1604  *      Returns the number of bytes available relative to the reader's
1605  *      current read position within the corresponding sub-buffer, 0 if
1606  *      there is nothing available.  See Documentation/filesystems/relayfs.txt
1607  *      for more details.
1608  */
1609 ssize_t
1610 relay_bytes_avail(struct rchan_reader *reader)
1611 {
1612         u32 f_pos;
1613         u32 new_offset;
1614         u32 actual_read_offset;
1615         ssize_t bytes_read;
1616         
1617         if (reader == NULL || reader->rchan == NULL)
1618                 return -EBADF;
1619         
1620         if (reader->vfs_reader)
1621                 f_pos = (u32)reader->pos.file->f_pos;
1622         else
1623                 f_pos = reader->pos.f_pos;
1624         new_offset = f_pos;
1625
1626         bytes_read = __relay_read(reader, NULL, reader->rchan->buf_size,
1627                                   f_pos, &new_offset, &actual_read_offset, 0);
1628
1629         if ((new_offset != f_pos) &&
1630             ((bytes_read == -EINTR) || (bytes_read == 0)))
1631                 bytes_read = -EAGAIN;
1632         else if ((bytes_read < 0) && (bytes_read != -EAGAIN))
1633                 bytes_read = 0;
1634
1635         return bytes_read;
1636 }
1637
1638 /**
1639  *      rchan_empty - boolean, is the channel empty wrt reader?
1640  *      @reader: channel reader
1641  *      
1642  *      Returns 1 if the channel is empty, 0 otherwise.
1643  */
1644 int
1645 rchan_empty(struct rchan_reader *reader)
1646 {
1647         ssize_t avail_count;
1648         u32 buffers_ready;
1649         struct rchan *rchan = reader->rchan;
1650         u32 cur_idx, curbuf_bytes;
1651         int mapped;
1652
1653         if (atomic_read(&rchan->suspended) == 1)
1654                 return 0;
1655
1656         mapped = atomic_read(&rchan->mapped);
1657         
1658         if (mapped && bulk_delivery(rchan)) {
1659                 buffers_ready = rchan->bufs_produced - rchan->bufs_consumed;
1660                 return buffers_ready ? 0 : 1;
1661         }
1662
1663         if (mapped && packet_delivery(rchan)) {
1664                 buffers_ready = rchan->bufs_produced - rchan->bufs_consumed;
1665                 if (buffers_ready)
1666                         return 0;
1667                 else {
1668                         cur_idx = relay_get_offset(rchan, NULL);
1669                         curbuf_bytes = cur_idx % rchan->buf_size;
1670                         return curbuf_bytes == rchan->bytes_consumed ? 1 : 0;
1671                 }
1672         }
1673
1674         avail_count = relay_bytes_avail(reader);
1675
1676         return avail_count ? 0 : 1;
1677 }
1678
1679 /**
1680  *      rchan_full - boolean, is the channel full wrt consuming reader?
1681  *      @reader: channel reader
1682  *      
1683  *      Returns 1 if the channel is full, 0 otherwise.
1684  */
1685 int
1686 rchan_full(struct rchan_reader *reader)
1687 {
1688         u32 buffers_ready;
1689         struct rchan *rchan = reader->rchan;
1690
1691         if (mode_continuous(rchan))
1692                 return 0;
1693
1694         buffers_ready = rchan->bufs_produced - rchan->bufs_consumed;
1695
1696         return buffers_ready > reader->rchan->n_bufs - 1 ? 1 : 0;
1697 }
1698
1699 /**
1700  *      relay_info - get status and other information about a relay channel
1701  *      @rchan_id: relay channel id
1702  *      @rchan_info: pointer to the rchan_info struct to be filled in
1703  *      
1704  *      Fills in an rchan_info struct with channel status and attribute 
1705  *      information.  See Documentation/filesystems/relayfs.txt for details.
1706  *
1707  *      Returns 0 if successful, negative otherwise.
1708  */
1709 int 
1710 relay_info(int rchan_id, struct rchan_info *rchan_info)
1711 {
1712         int i;
1713         struct rchan *rchan;
1714
1715         rchan = rchan_get(rchan_id);
1716         if (rchan == NULL)
1717                 return -EBADF;
1718
1719         rchan_info->flags = rchan->flags;
1720         rchan_info->buf_size = rchan->buf_size;
1721         rchan_info->buf_addr = rchan->buf;
1722         rchan_info->alloc_size = rchan->alloc_size;
1723         rchan_info->n_bufs = rchan->n_bufs;
1724         rchan_info->cur_idx = relay_get_offset(rchan, NULL);
1725         rchan_info->bufs_produced = rchan->bufs_produced;
1726         rchan_info->bufs_consumed = rchan->bufs_consumed;
1727         rchan_info->buf_id = rchan->buf_id;
1728
1729         for (i = 0; i < rchan->n_bufs; i++) {
1730                 rchan_info->unused_bytes[i] = rchan->unused_bytes[i];
1731                 if (using_lockless(rchan))
1732                         rchan_info->buffer_complete[i] = (atomic_read(&fill_count(rchan, i)) == rchan->buf_size);
1733                 else
1734                         rchan_info->buffer_complete[i] = 0;
1735         }
1736
1737         rchan_put(rchan);
1738
1739         return 0;
1740 }
1741
1742 /**
1743  *      __add_rchan_reader - creates and adds a reader to a channel
1744  *      @rchan: relay channel
1745  *      @filp: the file associated with rchan, if applicable
1746  *      @auto_consume: boolean, whether reader's reads automatically consume
1747  *      @map_reader: boolean, whether reader's reading via a channel mapping
1748  *
1749  *      Returns a pointer to the reader object create, NULL if unsuccessful
1750  *
1751  *      Creates and initializes an rchan_reader object for reading the channel.
1752  *      If filp is non-NULL, the reader is a VFS reader, otherwise not.
1753  *
1754  *      If the reader is a map reader, it isn't considered a VFS reader for
1755  *      our purposes.  Also, map_readers can't be auto-consuming.
1756  */
1757 struct rchan_reader *
1758 __add_rchan_reader(struct rchan *rchan, struct file *filp, int auto_consume, int map_reader)
1759 {
1760         struct rchan_reader *reader;
1761         u32 will_read;
1762         
1763         reader = kmalloc(sizeof(struct rchan_reader), GFP_KERNEL);
1764
1765         if (reader) {
1766                 write_lock(&rchan->open_readers_lock);
1767                 reader->rchan = rchan;
1768                 if (filp) {
1769                         reader->vfs_reader = 1;
1770                         reader->pos.file = filp;
1771                 } else {
1772                         reader->vfs_reader = 0;
1773                         reader->pos.f_pos = 0;
1774                 }
1775                 reader->map_reader = map_reader;
1776                 reader->auto_consume = auto_consume;
1777
1778                 if (!map_reader) {
1779                         will_read = rchan->bufs_produced % rchan->n_bufs;
1780                         if (!will_read && atomic_read(&rchan->suspended))
1781                                 will_read = rchan->n_bufs;
1782                         reader->bufs_consumed = rchan->bufs_produced - will_read;
1783                         rchan->bufs_consumed = reader->bufs_consumed;
1784                         rchan->bytes_consumed = reader->bytes_consumed = 0;
1785                         reader->offset_changed = 0;
1786                 }
1787                 
1788                 list_add(&reader->list, &rchan->open_readers);
1789                 write_unlock(&rchan->open_readers_lock);
1790         }
1791
1792         return reader;
1793 }
1794
/**
 *      add_rchan_reader - create a reader for a channel
 *      @rchan_id: relay channel handle
 *      @auto_consume: boolean, whether reader's reads automatically consume
 *
 *      Returns a pointer to the reader object created, NULL if unsuccessful
 *      (no channel for @rchan_id, or the reader could not be created).
 *
 *      Creates and initializes an rchan_reader object for reading the channel.
 *      This function is useful only for non-VFS readers.
 *
 *      The channel obtained via rchan_get() stays held for the lifetime of
 *      the reader; remove_rchan_reader() releases it with rchan_put().
 */
struct rchan_reader *
add_rchan_reader(int rchan_id, int auto_consume)
{
        struct rchan_reader *reader;
        struct rchan *rchan = rchan_get(rchan_id);

        if (rchan == NULL)
                return NULL;

        reader = __add_rchan_reader(rchan, NULL, auto_consume, 0);

        /*
         * No reader was created, so remove_rchan_reader() will never be
         * called to balance the rchan_get() above; release the channel here.
         */
        if (reader == NULL)
                rchan_put(rchan);

        return reader;
}
1814
/**
 *      add_map_reader - create a map reader for a channel
 *      @rchan_id: relay channel handle
 *
 *      Returns a pointer to the reader object created, NULL if unsuccessful
 *      (no channel for @rchan_id, or the reader could not be created).
 *
 *      Creates and initializes an rchan_reader object for reading the channel.
 *      This function is useful only for map readers.  Map readers are never
 *      auto-consuming, so auto_consume is always passed as 0.
 *
 *      The channel obtained via rchan_get() stays held for the lifetime of
 *      the reader; remove_map_reader() releases it.
 */
struct rchan_reader *
add_map_reader(int rchan_id)
{
        struct rchan_reader *reader;
        struct rchan *rchan = rchan_get(rchan_id);

        if (rchan == NULL)
                return NULL;

        reader = __add_rchan_reader(rchan, NULL, 0, 1);

        /*
         * No reader was created, so nothing will later balance the
         * rchan_get() above; release the channel here.
         */
        if (reader == NULL)
                rchan_put(rchan);

        return reader;
}
1833
1834 /**
1835  *      __remove_rchan_reader - destroy a channel reader
1836  *      @reader: channel reader
1837  *
1838  *      Internal - removes reader from the open readers list, and frees it.
1839  */
1840 void
1841 __remove_rchan_reader(struct rchan_reader *reader)
1842 {
1843         struct list_head *p;
1844         struct rchan_reader *found_reader = NULL;
1845         
1846         write_lock(&reader->rchan->open_readers_lock);
1847         list_for_each(p, &reader->rchan->open_readers) {
1848                 found_reader = list_entry(p, struct rchan_reader, list);
1849                 if (found_reader == reader) {
1850                         list_del(&found_reader->list);
1851                         break;
1852                 }
1853         }
1854         write_unlock(&reader->rchan->open_readers_lock);
1855
1856         if (found_reader)
1857                 kfree(found_reader);
1858 }
1859
1860 /**
1861  *      remove_rchan_reader - destroy a channel reader
1862  *      @reader: channel reader
1863  *
1864  *      Finds and removes the given reader from the channel.  This function
1865  *      is useful only for non-VFS readers.
1866  *
1867  *      Returns 0 if successful, negative otherwise.
1868  */
1869 int 
1870 remove_rchan_reader(struct rchan_reader *reader)
1871 {
1872         int err = 0;
1873         
1874         if (reader) {
1875                 rchan_put(reader->rchan);
1876                 __remove_rchan_reader(reader);
1877         } else
1878                 err = -EINVAL;
1879
1880         return err;
1881 }
1882
/**
 *      remove_map_reader - destroy a map reader
 *      @reader: channel reader
 *
 *      Map-reader counterpart of remove_rchan_reader(); the work is
 *      delegated to that function unchanged.
 *
 *      Returns whatever remove_rchan_reader() returns: 0 if successful,
 *      negative otherwise.
 */
int
remove_map_reader(struct rchan_reader *reader)
{
        int status;

        status = remove_rchan_reader(reader);

        return status;
}
1897
/*
 * Exported symbols: the relay channel entry points below are made
 * available through EXPORT_SYMBOL.
 */
EXPORT_SYMBOL(relay_open);
EXPORT_SYMBOL(relay_close);
EXPORT_SYMBOL(relay_reset);
EXPORT_SYMBOL(relay_reserve);
EXPORT_SYMBOL(relay_commit);
EXPORT_SYMBOL(relay_read);
EXPORT_SYMBOL(relay_write);
EXPORT_SYMBOL(relay_bytes_avail);
EXPORT_SYMBOL(relay_buffers_consumed);
EXPORT_SYMBOL(relay_bytes_consumed);
EXPORT_SYMBOL(relay_info);
EXPORT_SYMBOL(relay_discard_init_buf);
1910
1911