X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=ipc%2Fmsg.c;h=095f2b74732433d4de85809993acf0f6076c715a;hb=c7b5ebbddf7bcd3651947760f423e3783bbe6573;hp=c4d3b2a7a699ebe223ef200c0c98d5662b52db07;hpb=9bf4aaab3e101692164d49b7ca357651eb691cb6;p=linux-2.6.git diff --git a/ipc/msg.c b/ipc/msg.c index c4d3b2a7a..095f2b747 100644 --- a/ipc/msg.c +++ b/ipc/msg.c @@ -103,14 +103,14 @@ static int newque (key_t key, int msgflg) msq->q_perm.security = NULL; retval = security_msg_queue_alloc(msq); if (retval) { - ipc_rcu_free(msq, sizeof(*msq)); + ipc_rcu_putref(msq); return retval; } id = ipc_addid(&msg_ids, &msq->q_perm, msg_ctlmni); if(id == -1) { security_msg_queue_free(msq); - ipc_rcu_free(msq, sizeof(*msq)); + ipc_rcu_putref(msq); return -ENOSPC; } @@ -166,8 +166,10 @@ static void expunge_all(struct msg_queue* msq, int res) msr = list_entry(tmp,struct msg_receiver,r_list); tmp = tmp->next; - msr->r_msg = ERR_PTR(res); + msr->r_msg = NULL; wake_up_process(msr->r_tsk); + smp_mb(); + msr->r_msg = ERR_PTR(res); } } /* @@ -196,7 +198,7 @@ static void freeque (struct msg_queue *msq, int id) } atomic_sub(msq->q_cbytes, &msg_bytes); security_msg_queue_free(msq); - ipc_rcu_free(msq, sizeof(struct msg_queue)); + ipc_rcu_putref(msq); } asmlinkage long sys_msgget (key_t key, int msgflg) @@ -528,13 +530,17 @@ static inline int pipelined_send(struct msg_queue* msq, struct msg_msg* msg) !security_msg_queue_msgrcv(msq, msg, msr->r_tsk, msr->r_msgtype, msr->r_mode)) { list_del(&msr->r_list); if(msr->r_maxsize < msg->m_ts) { - msr->r_msg = ERR_PTR(-E2BIG); + msr->r_msg = NULL; wake_up_process(msr->r_tsk); + smp_mb(); + msr->r_msg = ERR_PTR(-E2BIG); } else { - msr->r_msg = msg; + msr->r_msg = NULL; msq->q_lrpid = msr->r_tsk->pid; msq->q_rtime = get_seconds(); wake_up_process(msr->r_tsk); + smp_mb(); + msr->r_msg = msg; return 1; } } @@ -567,43 +573,49 @@ asmlinkage long sys_msgsnd (int msqid, struct msgbuf __user *msgp, size_t msgsz, err=-EINVAL; if(msq==NULL) goto out_free; -retry: + err= 
-EIDRM; if (msg_checkid(msq,msqid)) goto out_unlock_free; - err=-EACCES; - if (ipcperms(&msq->q_perm, S_IWUGO)) - goto out_unlock_free; + for (;;) { + struct msg_sender s; - err = security_msg_queue_msgsnd(msq, msg, msgflg); - if (err) - goto out_unlock_free; + err=-EACCES; + if (ipcperms(&msq->q_perm, S_IWUGO)) + goto out_unlock_free; - if(msgsz + msq->q_cbytes > msq->q_qbytes || - 1 + msq->q_qnum > msq->q_qbytes) { - struct msg_sender s; + err = security_msg_queue_msgsnd(msq, msg, msgflg); + if (err) + goto out_unlock_free; + if(msgsz + msq->q_cbytes <= msq->q_qbytes && + 1 + msq->q_qnum <= msq->q_qbytes) { + break; + } + + /* queue full, wait: */ if(msgflg&IPC_NOWAIT) { err=-EAGAIN; goto out_unlock_free; } ss_add(msq, &s); + ipc_rcu_getref(msq); msg_unlock(msq); schedule(); - current->state= TASK_RUNNING; - msq = msg_lock(msqid); - err = -EIDRM; - if(msq==NULL) - goto out_free; + ipc_lock_by_ptr(&msq->q_perm); + ipc_rcu_putref(msq); + if (msq->q_perm.deleted) { + err = -EIDRM; + goto out_unlock_free; + } ss_del(&s); if (signal_pending(current)) { - err=-EINTR; + err=-ERESTARTNOHAND; goto out_unlock_free; } - goto retry; } msq->q_lspid = current->tgid; @@ -652,10 +664,7 @@ asmlinkage long sys_msgrcv (int msqid, struct msgbuf __user *msgp, size_t msgsz, long msgtyp, int msgflg) { struct msg_queue *msq; - struct msg_receiver msr_d; - struct list_head* tmp; - struct msg_msg* msg, *found_msg; - int err; + struct msg_msg *msg; int mode; if (msqid < 0 || (long) msgsz < 0) @@ -665,62 +674,57 @@ asmlinkage long sys_msgrcv (int msqid, struct msgbuf __user *msgp, size_t msgsz, msq = msg_lock(msqid); if(msq==NULL) return -EINVAL; -retry: - err = -EIDRM; + + msg = ERR_PTR(-EIDRM); if (msg_checkid(msq,msqid)) goto out_unlock; - err=-EACCES; - if (ipcperms (&msq->q_perm, S_IRUGO)) - goto out_unlock; + for (;;) { + struct msg_receiver msr_d; + struct list_head* tmp; - tmp = msq->q_messages.next; - found_msg=NULL; - while (tmp != &msq->q_messages) { - msg = list_entry(tmp,struct 
msg_msg,m_list); - if(testmsg(msg,msgtyp,mode) && - !security_msg_queue_msgrcv(msq, msg, current, msgtyp, mode)) { - found_msg = msg; - if(mode == SEARCH_LESSEQUAL && msg->m_type != 1) { - found_msg=msg; - msgtyp=msg->m_type-1; - } else { - found_msg=msg; - break; - } - } - tmp = tmp->next; - } - if(found_msg) { - msg=found_msg; - if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) { - err=-E2BIG; + msg = ERR_PTR(-EACCES); + if (ipcperms (&msq->q_perm, S_IRUGO)) goto out_unlock; + + msg = ERR_PTR(-EAGAIN); + tmp = msq->q_messages.next; + while (tmp != &msq->q_messages) { + struct msg_msg *walk_msg; + walk_msg = list_entry(tmp,struct msg_msg,m_list); + if(testmsg(walk_msg,msgtyp,mode) && + !security_msg_queue_msgrcv(msq, walk_msg, current, msgtyp, mode)) { + msg = walk_msg; + if(mode == SEARCH_LESSEQUAL && walk_msg->m_type != 1) { + msg=walk_msg; + msgtyp=walk_msg->m_type-1; + } else { + msg=walk_msg; + break; + } + } + tmp = tmp->next; } - list_del(&msg->m_list); - msq->q_qnum--; - msq->q_rtime = get_seconds(); - msq->q_lrpid = current->tgid; - msq->q_cbytes -= msg->m_ts; - atomic_sub(msg->m_ts,&msg_bytes); - atomic_dec(&msg_hdrs); - ss_wakeup(&msq->q_senders,0); - msg_unlock(msq); -out_success: - msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz; - if (put_user (msg->m_type, &msgp->mtype) || - store_msg(msgp->mtext, msg, msgsz)) { - msgsz = -EFAULT; + if(!IS_ERR(msg)) { + /* Found a suitable message. Unlink it from the queue. */ + if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) { + msg = ERR_PTR(-E2BIG); + goto out_unlock; + } + list_del(&msg->m_list); + msq->q_qnum--; + msq->q_rtime = get_seconds(); + msq->q_lrpid = current->tgid; + msq->q_cbytes -= msg->m_ts; + atomic_sub(msg->m_ts,&msg_bytes); + atomic_dec(&msg_hdrs); + ss_wakeup(&msq->q_senders,0); + msg_unlock(msq); + break; } - free_msg(msg); - return msgsz; - } else - { - /* no message waiting. Prepare for pipelined - * receive. - */ + /* No message waiting. 
Wait for a message */ if (msgflg & IPC_NOWAIT) { - err=-ENOMSG; + msg = ERR_PTR(-ENOMSG); goto out_unlock; } list_add_tail(&msr_d.r_list,&msq->q_receivers); @@ -730,52 +734,76 @@ out_success: if(msgflg & MSG_NOERROR) msr_d.r_maxsize = INT_MAX; else - msr_d.r_maxsize = msgsz; + msr_d.r_maxsize = msgsz; msr_d.r_msg = ERR_PTR(-EAGAIN); current->state = TASK_INTERRUPTIBLE; msg_unlock(msq); schedule(); - /* - * The below optimisation is buggy. A sleeping thread that is - * woken up checks if it got a message and if so, copies it to - * userspace and just returns without taking any locks. - * But this return to user space can be faster than the message - * send, and if the receiver immediately exits the - * wake_up_process performed by the sender will oops. + /* Lockless receive, part 1: + * Disable preemption. We don't hold a reference to the queue + * and getting a reference would defeat the idea of a lockless + * operation, thus the code relies on rcu to guarantee the + * existence of msq: + * Prior to destruction, expunge_all(-EIDRM) changes r_msg. + * Thus if r_msg is -EAGAIN, then the queue is not yet destroyed. + * rcu_read_lock() prevents preemption between reading r_msg + * and the spin_lock() inside ipc_lock_by_ptr(). + */ + rcu_read_lock(); + + /* Lockless receive, part 2: + * Wait until pipelined_send or expunge_all are outside of + * wake_up_process(). There is a race with exit(), see + * ipc/mqueue.c for the details. */ -#if 0 msg = (struct msg_msg*) msr_d.r_msg; - if(!IS_ERR(msg)) - goto out_success; -#endif + while (msg == NULL) { + cpu_relax(); + msg = (struct msg_msg*) msr_d.r_msg; + } - msq = msg_lock(msqid); - msg = (struct msg_msg*)msr_d.r_msg; - if(!IS_ERR(msg)) { - /* our message arived while we waited for - * the spinlock. Process it. - */ - if(msq) - msg_unlock(msq); - goto out_success; + /* Lockless receive, part 3: + * If there is a message or an error then accept it without + * locking. 
+ */ + if(msg != ERR_PTR(-EAGAIN)) { + rcu_read_unlock(); + break; } - err = PTR_ERR(msg); - if(err == -EAGAIN) { - if(!msq) - BUG(); - list_del(&msr_d.r_list); - if (signal_pending(current)) - err=-EINTR; - else - goto retry; + + /* Lockless receive, part 4: + * Acquire the queue spinlock. + */ + ipc_lock_by_ptr(&msq->q_perm); + rcu_read_unlock(); + + /* Lockless receive, part 5: + * Repeat test after acquiring the spinlock. + */ + msg = (struct msg_msg*)msr_d.r_msg; + if(msg != ERR_PTR(-EAGAIN)) + goto out_unlock; + + list_del(&msr_d.r_list); + if (signal_pending(current)) { + msg = ERR_PTR(-ERESTARTNOHAND); +out_unlock: + msg_unlock(msq); + break; } } -out_unlock: - if(msq) - msg_unlock(msq); - return err; + if (IS_ERR(msg)) + return PTR_ERR(msg); + + msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz; + if (put_user (msg->m_type, &msgp->mtype) || + store_msg(msgp->mtext, msg, msgsz)) { + msgsz = -EFAULT; + } + free_msg(msg); + return msgsz; } #ifdef CONFIG_PROC_FS @@ -794,7 +822,7 @@ static int sysvipc_msg_read_proc(char *buffer, char **start, off_t offset, int l if (msq) { if (!vx_check(msq->q_perm.xid, VX_IDENT)) { msg_unlock(msq); - continue; + continue; } len += sprintf(buffer + len, "%10d %10d %4o %10lu %10lu %5u %5u %5u %5u %5u %5u %10lu %10lu %10lu\n", msq->q_perm.key,