Fedora kernel-2.6.17-1.2142_FC4 patched with stable patch-2.6.17.4-vs2.0.2-rc26.diff
[linux-2.6.git] / arch / ia64 / sn / kernel / xpc_channel.c
index 0bf6fbc..8255a9b 100644 (file)
@@ -3,7 +3,7 @@
  * License.  See the file "COPYING" in the main directory of this archive
  * for more details.
  *
- * Copyright (c) 2004-2005 Silicon Graphics, Inc.  All Rights Reserved.
+ * Copyright (c) 2004-2006 Silicon Graphics, Inc.  All Rights Reserved.
  */
 
 
 #include <linux/sched.h>
 #include <linux/cache.h>
 #include <linux/interrupt.h>
-#include <linux/slab.h>
+#include <linux/mutex.h>
+#include <linux/completion.h>
 #include <asm/sn/bte.h>
 #include <asm/sn/sn_sal.h>
-#include "xpc.h"
+#include <asm/sn/xpc.h>
+
+
+/*
+ * Guarantee that the kzalloc'd memory is cacheline aligned.
+ */
+static void *
+xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
+{
+       /* see if kzalloc will give us cachline aligned memory by default */
+       *base = kzalloc(size, flags);
+       if (*base == NULL) {
+               return NULL;
+       }
+       if ((u64) *base == L1_CACHE_ALIGN((u64) *base)) {
+               return *base;
+       }
+       kfree(*base);
+
+       /* nope, we'll have to do it ourselves */
+       *base = kzalloc(size + L1_CACHE_BYTES, flags);
+       if (*base == NULL) {
+               return NULL;
+       }
+       return (void *) L1_CACHE_ALIGN((u64) *base);
+}
 
 
 /*
@@ -56,7 +82,8 @@ xpc_initialize_channels(struct xpc_partition *part, partid_t partid)
                atomic_set(&ch->n_to_notify, 0);
 
                spin_lock_init(&ch->lock);
-               sema_init(&ch->msg_to_pull_sema, 1);    /* mutex */
+               mutex_init(&ch->msg_to_pull_mutex);
+               init_completion(&ch->wdisconnect_wait);
 
                atomic_set(&ch->n_on_msg_allocate_wq, 0);
                init_waitqueue_head(&ch->msg_allocate_wq);
@@ -72,7 +99,7 @@ xpc_initialize_channels(struct xpc_partition *part, partid_t partid)
 enum xpc_retval
 xpc_setup_infrastructure(struct xpc_partition *part)
 {
-       int ret;
+       int ret, cpuid;
        struct timer_list *timer;
        partid_t partid = XPC_PARTID(part);
 
@@ -90,20 +117,19 @@ xpc_setup_infrastructure(struct xpc_partition *part)
         * Allocate all of the channel structures as a contiguous chunk of
         * memory.
         */
-       part->channels = kmalloc(sizeof(struct xpc_channel) * XPC_NCHANNELS,
+       part->channels = kzalloc(sizeof(struct xpc_channel) * XPC_NCHANNELS,
                                                                GFP_KERNEL);
        if (part->channels == NULL) {
                dev_err(xpc_chan, "can't get memory for channels\n");
                return xpcNoMemory;
        }
-       memset(part->channels, 0, sizeof(struct xpc_channel) * XPC_NCHANNELS);
 
        part->nchannels = XPC_NCHANNELS;
 
 
        /* allocate all the required GET/PUT values */
 
-       part->local_GPs = xpc_kmalloc_cacheline_aligned(XPC_GP_SIZE,
+       part->local_GPs = xpc_kzalloc_cacheline_aligned(XPC_GP_SIZE,
                                        GFP_KERNEL, &part->local_GPs_base);
        if (part->local_GPs == NULL) {
                kfree(part->channels);
@@ -112,60 +138,57 @@ xpc_setup_infrastructure(struct xpc_partition *part)
                        "values\n");
                return xpcNoMemory;
        }
-       memset(part->local_GPs, 0, XPC_GP_SIZE);
 
-       part->remote_GPs = xpc_kmalloc_cacheline_aligned(XPC_GP_SIZE,
+       part->remote_GPs = xpc_kzalloc_cacheline_aligned(XPC_GP_SIZE,
                                        GFP_KERNEL, &part->remote_GPs_base);
        if (part->remote_GPs == NULL) {
-               kfree(part->channels);
-               part->channels = NULL;
-               kfree(part->local_GPs_base);
-               part->local_GPs = NULL;
                dev_err(xpc_chan, "can't get memory for remote get/put "
                        "values\n");
+               kfree(part->local_GPs_base);
+               part->local_GPs = NULL;
+               kfree(part->channels);
+               part->channels = NULL;
                return xpcNoMemory;
        }
-       memset(part->remote_GPs, 0, XPC_GP_SIZE);
 
 
        /* allocate all the required open and close args */
 
-       part->local_openclose_args = xpc_kmalloc_cacheline_aligned(
+       part->local_openclose_args = xpc_kzalloc_cacheline_aligned(
                                        XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL,
                                        &part->local_openclose_args_base);
        if (part->local_openclose_args == NULL) {
-               kfree(part->channels);
-               part->channels = NULL;
-               kfree(part->local_GPs_base);
-               part->local_GPs = NULL;
+               dev_err(xpc_chan, "can't get memory for local connect args\n");
                kfree(part->remote_GPs_base);
                part->remote_GPs = NULL;
-               dev_err(xpc_chan, "can't get memory for local connect args\n");
+               kfree(part->local_GPs_base);
+               part->local_GPs = NULL;
+               kfree(part->channels);
+               part->channels = NULL;
                return xpcNoMemory;
        }
-       memset(part->local_openclose_args, 0, XPC_OPENCLOSE_ARGS_SIZE);
 
-       part->remote_openclose_args = xpc_kmalloc_cacheline_aligned(
+       part->remote_openclose_args = xpc_kzalloc_cacheline_aligned(
                                        XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL,
                                        &part->remote_openclose_args_base);
        if (part->remote_openclose_args == NULL) {
-               kfree(part->channels);
-               part->channels = NULL;
-               kfree(part->local_GPs_base);
-               part->local_GPs = NULL;
-               kfree(part->remote_GPs_base);
-               part->remote_GPs = NULL;
+               dev_err(xpc_chan, "can't get memory for remote connect args\n");
                kfree(part->local_openclose_args_base);
                part->local_openclose_args = NULL;
-               dev_err(xpc_chan, "can't get memory for remote connect args\n");
+               kfree(part->remote_GPs_base);
+               part->remote_GPs = NULL;
+               kfree(part->local_GPs_base);
+               part->local_GPs = NULL;
+               kfree(part->channels);
+               part->channels = NULL;
                return xpcNoMemory;
        }
-       memset(part->remote_openclose_args, 0, XPC_OPENCLOSE_ARGS_SIZE);
 
 
        xpc_initialize_channels(part, partid);
 
        atomic_set(&part->nchannels_active, 0);
+       atomic_set(&part->nchannels_engaged, 0);
 
 
        /* local_IPI_amo were set to 0 by an earlier memset() */
@@ -182,18 +205,18 @@ xpc_setup_infrastructure(struct xpc_partition *part)
        ret = request_irq(SGI_XPC_NOTIFY, xpc_notify_IRQ_handler, SA_SHIRQ,
                                part->IPI_owner, (void *) (u64) partid);
        if (ret != 0) {
-               kfree(part->channels);
-               part->channels = NULL;
-               kfree(part->local_GPs_base);
-               part->local_GPs = NULL;
-               kfree(part->remote_GPs_base);
-               part->remote_GPs = NULL;
-               kfree(part->local_openclose_args_base);
-               part->local_openclose_args = NULL;
-               kfree(part->remote_openclose_args_base);
-               part->remote_openclose_args = NULL;
                dev_err(xpc_chan, "can't register NOTIFY IRQ handler, "
                        "errno=%d\n", -ret);
+               kfree(part->remote_openclose_args_base);
+               part->remote_openclose_args = NULL;
+               kfree(part->local_openclose_args_base);
+               part->local_openclose_args = NULL;
+               kfree(part->remote_GPs_base);
+               part->remote_GPs = NULL;
+               kfree(part->local_GPs_base);
+               part->local_GPs = NULL;
+               kfree(part->channels);
+               part->channels = NULL;
                return xpcLackOfResources;
        }
 
@@ -209,7 +232,7 @@ xpc_setup_infrastructure(struct xpc_partition *part)
         * With the setting of the partition setup_state to XPC_P_SETUP, we're
         * declaring that this partition is ready to go.
         */
-       (volatile u8) part->setup_state = XPC_P_SETUP;
+       part->setup_state = XPC_P_SETUP;
 
 
        /*
@@ -223,11 +246,11 @@ xpc_setup_infrastructure(struct xpc_partition *part)
        xpc_vars_part[partid].openclose_args_pa =
                                        __pa(part->local_openclose_args);
        xpc_vars_part[partid].IPI_amo_pa = __pa(part->local_IPI_amo_va);
-       xpc_vars_part[partid].IPI_nasid = cpuid_to_nasid(smp_processor_id());
-       xpc_vars_part[partid].IPI_phys_cpuid =
-                                       cpu_physical_id(smp_processor_id());
+       cpuid = raw_smp_processor_id(); /* any CPU in this partition will do */
+       xpc_vars_part[partid].IPI_nasid = cpuid_to_nasid(cpuid);
+       xpc_vars_part[partid].IPI_phys_cpuid = cpu_physical_id(cpuid);
        xpc_vars_part[partid].nchannels = part->nchannels;
-       (volatile u64) xpc_vars_part[partid].magic = XPC_VP_MAGIC1;
+       xpc_vars_part[partid].magic = XPC_VP_MAGIC1;
 
        return xpcSuccess;
 }
@@ -355,7 +378,7 @@ xpc_pull_remote_vars_part(struct xpc_partition *part)
 
                /* let the other side know that we've pulled their variables */
 
-               (volatile u64) xpc_vars_part[partid].magic = XPC_VP_MAGIC2;
+               xpc_vars_part[partid].magic = XPC_VP_MAGIC2;
        }
 
        if (pulled_entry->magic == XPC_VP_MAGIC1) {
@@ -442,22 +465,20 @@ xpc_allocate_local_msgqueue(struct xpc_channel *ch)
        for (nentries = ch->local_nentries; nentries > 0; nentries--) {
 
                nbytes = nentries * ch->msg_size;
-               ch->local_msgqueue = xpc_kmalloc_cacheline_aligned(nbytes,
-                                               (GFP_KERNEL | GFP_DMA),
+               ch->local_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
+                                               GFP_KERNEL,
                                                &ch->local_msgqueue_base);
                if (ch->local_msgqueue == NULL) {
                        continue;
                }
-               memset(ch->local_msgqueue, 0, nbytes);
 
                nbytes = nentries * sizeof(struct xpc_notify);
-               ch->notify_queue = kmalloc(nbytes, (GFP_KERNEL | GFP_DMA));
+               ch->notify_queue = kzalloc(nbytes, GFP_KERNEL);
                if (ch->notify_queue == NULL) {
                        kfree(ch->local_msgqueue_base);
                        ch->local_msgqueue = NULL;
                        continue;
                }
-               memset(ch->notify_queue, 0, nbytes);
 
                spin_lock_irqsave(&ch->lock, irq_flags);
                if (nentries < ch->local_nentries) {
@@ -497,13 +518,12 @@ xpc_allocate_remote_msgqueue(struct xpc_channel *ch)
        for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
 
                nbytes = nentries * ch->msg_size;
-               ch->remote_msgqueue = xpc_kmalloc_cacheline_aligned(nbytes,
-                                               (GFP_KERNEL | GFP_DMA),
+               ch->remote_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
+                                               GFP_KERNEL,
                                                &ch->remote_msgqueue_base);
                if (ch->remote_msgqueue == NULL) {
                        continue;
                }
-               memset(ch->remote_msgqueue, 0, nbytes);
 
                spin_lock_irqsave(&ch->lock, irq_flags);
                if (nentries < ch->remote_nentries) {
@@ -532,7 +552,6 @@ static enum xpc_retval
 xpc_allocate_msgqueues(struct xpc_channel *ch)
 {
        unsigned long irq_flags;
-       int i;
        enum xpc_retval ret;
 
 
@@ -550,13 +569,6 @@ xpc_allocate_msgqueues(struct xpc_channel *ch)
                return ret;
        }
 
-       for (i = 0; i < ch->local_nentries; i++) {
-               /* use a semaphore as an event wait queue */
-               sema_init(&ch->notify_queue[i].sema, 0);
-       }
-
-       sema_init(&ch->teardown_sema, 0);       /* event wait */
-
        spin_lock_irqsave(&ch->lock, irq_flags);
        ch->flags |= XPC_C_SETUP;
        spin_unlock_irqrestore(&ch->lock, irq_flags);
@@ -625,6 +637,55 @@ xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
 }
 
 
+/*
+ * Notify those who wanted to be notified upon delivery of their message.
+ */
+static void
+xpc_notify_senders(struct xpc_channel *ch, enum xpc_retval reason, s64 put)
+{
+       struct xpc_notify *notify;
+       u8 notify_type;
+       s64 get = ch->w_remote_GP.get - 1;
+
+
+       while (++get < put && atomic_read(&ch->n_to_notify) > 0) {
+
+               notify = &ch->notify_queue[get % ch->local_nentries];
+
+               /*
+                * See if the notify entry indicates it was associated with
+                * a message who's sender wants to be notified. It is possible
+                * that it is, but someone else is doing or has done the
+                * notification.
+                */
+               notify_type = notify->type;
+               if (notify_type == 0 ||
+                               cmpxchg(&notify->type, notify_type, 0) !=
+                                                               notify_type) {
+                       continue;
+               }
+
+               DBUG_ON(notify_type != XPC_N_CALL);
+
+               atomic_dec(&ch->n_to_notify);
+
+               if (notify->func != NULL) {
+                       dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, "
+                               "msg_number=%ld, partid=%d, channel=%d\n",
+                               (void *) notify, get, ch->partid, ch->number);
+
+                       notify->func(reason, ch->partid, ch->number,
+                                                               notify->key);
+
+                       dev_dbg(xpc_chan, "notify->func() returned, "
+                               "notify=0x%p, msg_number=%ld, partid=%d, "
+                               "channel=%d\n", (void *) notify, get,
+                               ch->partid, ch->number);
+               }
+       }
+}
+
+
 /*
  * Free up message queues and other stuff that were allocated for the specified
  * channel.
@@ -669,9 +730,6 @@ xpc_free_msgqueues(struct xpc_channel *ch)
                ch->remote_msgqueue = NULL;
                kfree(ch->notify_queue);
                ch->notify_queue = NULL;
-
-               /* in case someone is waiting for the teardown to complete */
-               up(&ch->teardown_sema);
        }
 }
 
@@ -683,7 +741,7 @@ static void
 xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
 {
        struct xpc_partition *part = &xpc_partitions[ch->partid];
-       u32 ch_flags = ch->flags;
+       u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);
 
 
        DBUG_ON(!spin_is_locked(&ch->lock));
@@ -696,17 +754,20 @@ xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
 
        /* make sure all activity has settled down first */
 
-       if (atomic_read(&ch->references) > 0) {
+       if (atomic_read(&ch->references) > 0 ||
+                       ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
+                       !(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE))) {
                return;
        }
        DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
 
-       /* it's now safe to free the channel's message queues */
-
-       xpc_free_msgqueues(ch);
-       DBUG_ON(ch->flags & XPC_C_SETUP);
+       if (part->act_state == XPC_P_DEACTIVATING) {
+               /* can't proceed until the other side disengages from us */
+               if (xpc_partition_engaged(1UL << ch->partid)) {
+                       return;
+               }
 
-       if (part->act_state != XPC_P_DEACTIVATING) {
+       } else {
 
                /* as long as the other side is up do the full protocol */
 
@@ -724,16 +785,46 @@ xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
                }
        }
 
+       /* wake those waiting for notify completion */
+       if (atomic_read(&ch->n_to_notify) > 0) {
+               /* >>> we do callout while holding ch->lock */
+               xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put);
+       }
+
        /* both sides are disconnected now */
 
-       ch->flags = XPC_C_DISCONNECTED; /* clear all flags, but this one */
+       if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
+               spin_unlock_irqrestore(&ch->lock, *irq_flags);
+               xpc_disconnect_callout(ch, xpcDisconnected);
+               spin_lock_irqsave(&ch->lock, *irq_flags);
+       }
+
+       /* it's now safe to free the channel's message queues */
+       xpc_free_msgqueues(ch);
+
+       /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
+       ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));
 
        atomic_dec(&part->nchannels_active);
 
-       if (ch_flags & XPC_C_WASCONNECTED) {
+       if (channel_was_connected) {
                dev_info(xpc_chan, "channel %d to partition %d disconnected, "
                        "reason=%d\n", ch->number, ch->partid, ch->reason);
        }
+
+       if (ch->flags & XPC_C_WDISCONNECT) {
+               /* we won't lose the CPU since we're holding ch->lock */
+               complete(&ch->wdisconnect_wait);
+       } else if (ch->delayed_IPI_flags) {
+               if (part->act_state != XPC_P_DEACTIVATING) {
+                       /* time to take action on any delayed IPI flags */
+                       spin_lock(&part->IPI_lock);
+                       XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number,
+                                                       ch->delayed_IPI_flags);
+                       spin_unlock(&part->IPI_lock);
+               }
+               ch->delayed_IPI_flags = 0;
+       }
 }
 
 
@@ -754,6 +845,19 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
 
        spin_lock_irqsave(&ch->lock, irq_flags);
 
+again:
+
+       if ((ch->flags & XPC_C_DISCONNECTED) &&
+                                       (ch->flags & XPC_C_WDISCONNECT)) {
+               /*
+                * Delay processing IPI flags until thread waiting disconnect
+                * has had a chance to see that the channel is disconnected.
+                */
+               ch->delayed_IPI_flags |= IPI_flags;
+               spin_unlock_irqrestore(&ch->lock, irq_flags);
+               return;
+       }
+
 
        if (IPI_flags & XPC_IPI_CLOSEREQUEST) {
 
@@ -764,7 +868,7 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
                /*
                 * If RCLOSEREQUEST is set, we're probably waiting for
                 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
-                * with this RCLOSEQREUQEST in the IPI_flags.
+                * with this RCLOSEREQUEST in the IPI_flags.
                 */
 
                if (ch->flags & XPC_C_RCLOSEREQUEST) {
@@ -779,14 +883,22 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
 
                        /* both sides have finished disconnecting */
                        xpc_process_disconnect(ch, &irq_flags);
+                       DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
+                       goto again;
                }
 
                if (ch->flags & XPC_C_DISCONNECTED) {
-                       // >>> explain this section
-
                        if (!(IPI_flags & XPC_IPI_OPENREQUEST)) {
-                               DBUG_ON(part->act_state !=
-                                                       XPC_P_DEACTIVATING);
+                               if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo,
+                                        ch_number) & XPC_IPI_OPENREQUEST)) {
+
+                                       DBUG_ON(ch->delayed_IPI_flags != 0);
+                                       spin_lock(&part->IPI_lock);
+                                       XPC_SET_IPI_FLAGS(part->local_IPI_amo,
+                                                       ch_number,
+                                                       XPC_IPI_CLOSEREQUEST);
+                                       spin_unlock(&part->IPI_lock);
+                               }
                                spin_unlock_irqrestore(&ch->lock, irq_flags);
                                return;
                        }
@@ -816,9 +928,13 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
                        }
 
                        XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
-               } else {
-                       xpc_process_disconnect(ch, &irq_flags);
+
+                       DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY);
+                       spin_unlock_irqrestore(&ch->lock, irq_flags);
+                       return;
                }
+
+               xpc_process_disconnect(ch, &irq_flags);
        }
 
 
@@ -834,7 +950,20 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
                }
 
                DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
-               DBUG_ON(!(ch->flags & XPC_C_RCLOSEREQUEST));
+
+               if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
+                       if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number)
+                                               & XPC_IPI_CLOSEREQUEST)) {
+
+                               DBUG_ON(ch->delayed_IPI_flags != 0);
+                               spin_lock(&part->IPI_lock);
+                               XPC_SET_IPI_FLAGS(part->local_IPI_amo,
+                                               ch_number, XPC_IPI_CLOSEREPLY);
+                               spin_unlock(&part->IPI_lock);
+                       }
+                       spin_unlock_irqrestore(&ch->lock, irq_flags);
+                       return;
+               }
 
                ch->flags |= XPC_C_RCLOSEREPLY;
 
@@ -852,8 +981,14 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
                        "channel=%d\n", args->msg_size, args->local_nentries,
                        ch->partid, ch->number);
 
-               if ((ch->flags & XPC_C_DISCONNECTING) ||
-                                       part->act_state == XPC_P_DEACTIVATING) {
+               if (part->act_state == XPC_P_DEACTIVATING ||
+                                       (ch->flags & XPC_C_ROPENREQUEST)) {
+                       spin_unlock_irqrestore(&ch->lock, irq_flags);
+                       return;
+               }
+
+               if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
+                       ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST;
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return;
                }
@@ -867,8 +1002,11 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
                 *      msg_size = size of channel's messages in bytes
                 *      local_nentries = remote partition's local_nentries
                 */
-               DBUG_ON(args->msg_size == 0);
-               DBUG_ON(args->local_nentries == 0);
+               if (args->msg_size == 0 || args->local_nentries == 0) {
+                       /* assume OPENREQUEST was delayed by mistake */
+                       spin_unlock_irqrestore(&ch->lock, irq_flags);
+                       return;
+               }
 
                ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
                ch->remote_nentries = args->local_nentries;
@@ -906,7 +1044,13 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return;
                }
-               DBUG_ON(!(ch->flags & XPC_C_OPENREQUEST));
+               if (!(ch->flags & XPC_C_OPENREQUEST)) {
+                       XPC_DISCONNECT_CHANNEL(ch, xpcOpenCloseError,
+                                                               &irq_flags);
+                       spin_unlock_irqrestore(&ch->lock, irq_flags);
+                       return;
+               }
+
                DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
                DBUG_ON(ch->flags & XPC_C_CONNECTED);
 
@@ -960,12 +1104,12 @@ xpc_connect_channel(struct xpc_channel *ch)
        struct xpc_registration *registration = &xpc_registrations[ch->number];
 
 
-       if (down_interruptible(&registration->sema) != 0) {
-               return xpcInterrupted;
+       if (mutex_trylock(&registration->mutex) == 0) {
+               return xpcRetry;
        }
 
        if (!XPC_CHANNEL_REGISTERED(ch->number)) {
-               up(&registration->sema);
+               mutex_unlock(&registration->mutex);
                return xpcUnregistered;
        }
 
@@ -976,7 +1120,7 @@ xpc_connect_channel(struct xpc_channel *ch)
 
        if (ch->flags & XPC_C_DISCONNECTING) {
                spin_unlock_irqrestore(&ch->lock, irq_flags);
-               up(&registration->sema);
+               mutex_unlock(&registration->mutex);
                return ch->reason;
        }
 
@@ -1008,7 +1152,7 @@ xpc_connect_channel(struct xpc_channel *ch)
                         * channel lock be locked and will unlock and relock
                         * the channel lock as needed.
                         */
-                       up(&registration->sema);
+                       mutex_unlock(&registration->mutex);
                        XPC_DISCONNECT_CHANNEL(ch, xpcUnequalMsgSizes,
                                                                &irq_flags);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
@@ -1023,7 +1167,7 @@ xpc_connect_channel(struct xpc_channel *ch)
                atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
        }
 
-       up(&registration->sema);
+       mutex_unlock(&registration->mutex);
 
 
        /* initiate the connection */
@@ -1039,55 +1183,6 @@ xpc_connect_channel(struct xpc_channel *ch)
 }
 
 
-/*
- * Notify those who wanted to be notified upon delivery of their message.
- */
-static void
-xpc_notify_senders(struct xpc_channel *ch, enum xpc_retval reason, s64 put)
-{
-       struct xpc_notify *notify;
-       u8 notify_type;
-       s64 get = ch->w_remote_GP.get - 1;
-
-
-       while (++get < put && atomic_read(&ch->n_to_notify) > 0) {
-
-               notify = &ch->notify_queue[get % ch->local_nentries];
-
-               /*
-                * See if the notify entry indicates it was associated with
-                * a message who's sender wants to be notified. It is possible
-                * that it is, but someone else is doing or has done the
-                * notification.
-                */
-               notify_type = notify->type;
-               if (notify_type == 0 ||
-                               cmpxchg(&notify->type, notify_type, 0) !=
-                                                               notify_type) {
-                       continue;
-               }
-
-               DBUG_ON(notify_type != XPC_N_CALL);
-
-               atomic_dec(&ch->n_to_notify);
-
-               if (notify->func != NULL) {
-                       dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, "
-                               "msg_number=%ld, partid=%d, channel=%d\n",
-                               (void *) notify, get, ch->partid, ch->number);
-
-                       notify->func(reason, ch->partid, ch->number,
-                                                               notify->key);
-
-                       dev_dbg(xpc_chan, "notify->func() returned, "
-                               "notify=0x%p, msg_number=%ld, partid=%d, "
-                               "channel=%d\n", (void *) notify, get,
-                               ch->partid, ch->number);
-               }
-       }
-}
-
-
 /*
  * Clear some of the msg flags in the local message queue.
  */
@@ -1183,7 +1278,7 @@ xpc_process_msg_IPI(struct xpc_partition *part, int ch_number)
                 */
                xpc_clear_local_msgqueue_flags(ch);
 
-               (volatile s64) ch->w_remote_GP.get = ch->remote_GP.get;
+               ch->w_remote_GP.get = ch->remote_GP.get;
 
                dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, "
                        "channel=%d\n", ch->w_remote_GP.get, ch->partid,
@@ -1211,7 +1306,7 @@ xpc_process_msg_IPI(struct xpc_partition *part, int ch_number)
                 */
                xpc_clear_remote_msgqueue_flags(ch);
 
-               (volatile s64) ch->w_remote_GP.put = ch->remote_GP.put;
+               ch->w_remote_GP.put = ch->remote_GP.put;
 
                dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, "
                        "channel=%d\n", ch->w_remote_GP.put, ch->partid,
@@ -1223,7 +1318,7 @@ xpc_process_msg_IPI(struct xpc_partition *part, int ch_number)
                                "delivered=%d, partid=%d, channel=%d\n",
                                nmsgs_sent, ch->partid, ch->number);
 
-                       if (ch->flags & XPC_C_CONNECTCALLOUT) {
+                       if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) {
                                xpc_activate_kthreads(ch, nmsgs_sent);
                        }
                }
@@ -1240,6 +1335,7 @@ xpc_process_channel_activity(struct xpc_partition *part)
        u64 IPI_amo, IPI_flags;
        struct xpc_channel *ch;
        int ch_number;
+       u32 ch_flags;
 
 
        IPI_amo = xpc_get_IPI_flags(part);
@@ -1266,8 +1362,9 @@ xpc_process_channel_activity(struct xpc_partition *part)
                        xpc_process_openclose_IPI(part, ch_number, IPI_flags);
                }
 
+               ch_flags = ch->flags;   /* need an atomic snapshot of flags */
 
-               if (ch->flags & XPC_C_DISCONNECTING) {
+               if (ch_flags & XPC_C_DISCONNECTING) {
                        spin_lock_irqsave(&ch->lock, irq_flags);
                        xpc_process_disconnect(ch, &irq_flags);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
@@ -1278,9 +1375,9 @@ xpc_process_channel_activity(struct xpc_partition *part)
                        continue;
                }
 
-               if (!(ch->flags & XPC_C_CONNECTED)) {
-                       if (!(ch->flags & XPC_C_OPENREQUEST)) {
-                               DBUG_ON(ch->flags & XPC_C_SETUP);
+               if (!(ch_flags & XPC_C_CONNECTED)) {
+                       if (!(ch_flags & XPC_C_OPENREQUEST)) {
+                               DBUG_ON(ch_flags & XPC_C_SETUP);
                                (void) xpc_connect_channel(ch);
                        } else {
                                spin_lock_irqsave(&ch->lock, irq_flags);
@@ -1305,8 +1402,8 @@ xpc_process_channel_activity(struct xpc_partition *part)
 
 
 /*
- * XPC's heartbeat code calls this function to inform XPC that a partition has
- * gone down.  XPC responds by tearing down the XPartition Communication
+ * XPC's heartbeat code calls this function to inform XPC that a partition is
+ * going down.  XPC responds by tearing down the XPartition Communication
  * infrastructure used for the just downed partition.
  *
  * XPC's heartbeat code will never call this function and xpc_partition_up()
@@ -1314,7 +1411,7 @@ xpc_process_channel_activity(struct xpc_partition *part)
  * at the same time.
  */
 void
-xpc_partition_down(struct xpc_partition *part, enum xpc_retval reason)
+xpc_partition_going_down(struct xpc_partition *part, enum xpc_retval reason)
 {
        unsigned long irq_flags;
        int ch_number;
@@ -1330,12 +1427,11 @@ xpc_partition_down(struct xpc_partition *part, enum xpc_retval reason)
        }
 
 
-       /* disconnect all channels associated with the downed partition */
+       /* disconnect channels associated with the partition going down */
 
        for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
                ch = &part->channels[ch_number];
 
-
                xpc_msgqueue_ref(ch);
                spin_lock_irqsave(&ch->lock, irq_flags);
 
@@ -1370,6 +1466,7 @@ xpc_teardown_infrastructure(struct xpc_partition *part)
         * this partition.
         */
 
+       DBUG_ON(atomic_read(&part->nchannels_engaged) != 0);
        DBUG_ON(atomic_read(&part->nchannels_active) != 0);
        DBUG_ON(part->setup_state != XPC_P_SETUP);
        part->setup_state = XPC_P_WTEARDOWN;
@@ -1428,19 +1525,11 @@ xpc_initiate_connect(int ch_number)
                if (xpc_part_ref(part)) {
                        ch = &part->channels[ch_number];
 
-                       if (!(ch->flags & XPC_C_DISCONNECTING)) {
-                               DBUG_ON(ch->flags & XPC_C_OPENREQUEST);
-                               DBUG_ON(ch->flags & XPC_C_CONNECTED);
-                               DBUG_ON(ch->flags & XPC_C_SETUP);
-
-                               /*
-                                * Initiate the establishment of a connection
-                                * on the newly registered channel to the
-                                * remote partition.
-                                */
-                               xpc_wakeup_channel_mgr(part);
-                       }
-
+                       /*
+                        * Initiate the establishment of a connection on the
+                        * newly registered channel to the remote partition.
+                        */
+                       xpc_wakeup_channel_mgr(part);
                        xpc_part_deref(part);
                }
        }
@@ -1450,9 +1539,6 @@ xpc_initiate_connect(int ch_number)
 void
 xpc_connected_callout(struct xpc_channel *ch)
 {
-       unsigned long irq_flags;
-
-
        /* let the registerer know that a connection has been established */
 
        if (ch->func != NULL) {
@@ -1465,10 +1551,6 @@ xpc_connected_callout(struct xpc_channel *ch)
                dev_dbg(xpc_chan, "ch->func() returned, reason=xpcConnected, "
                        "partid=%d, channel=%d\n", ch->partid, ch->number);
        }
-
-       spin_lock_irqsave(&ch->lock, irq_flags);
-       ch->flags |= XPC_C_CONNECTCALLOUT;
-       spin_unlock_irqrestore(&ch->lock, irq_flags);
 }
 
 
@@ -1506,8 +1588,12 @@ xpc_initiate_disconnect(int ch_number)
 
                        spin_lock_irqsave(&ch->lock, irq_flags);
 
-                       XPC_DISCONNECT_CHANNEL(ch, xpcUnregistering,
+                       if (!(ch->flags & XPC_C_DISCONNECTED)) {
+                               ch->flags |= XPC_C_WDISCONNECT;
+
+                               XPC_DISCONNECT_CHANNEL(ch, xpcUnregistering,
                                                                &irq_flags);
+                       }
 
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
 
@@ -1523,8 +1609,9 @@ xpc_initiate_disconnect(int ch_number)
 /*
  * To disconnect a channel, and reflect it back to all who may be waiting.
  *
- * >>> An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
- * >>> xpc_free_msgqueues().
+ * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
+ * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
+ * xpc_disconnect_wait().
  *
  * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
  */
@@ -1532,7 +1619,7 @@ void
 xpc_disconnect_channel(const int line, struct xpc_channel *ch,
                        enum xpc_retval reason, unsigned long *irq_flags)
 {
-       u32 flags;
+       u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);
 
 
        DBUG_ON(!spin_is_locked(&ch->lock));
@@ -1547,61 +1634,51 @@ xpc_disconnect_channel(const int line, struct xpc_channel *ch,
 
        XPC_SET_REASON(ch, reason, line);
 
-       flags = ch->flags;
+       ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
        /* some of these may not have been set */
        ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
                        XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
                        XPC_C_CONNECTING | XPC_C_CONNECTED);
 
-       ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
        xpc_IPI_send_closerequest(ch, irq_flags);
 
-       if (flags & XPC_C_CONNECTED) {
+       if (channel_was_connected) {
                ch->flags |= XPC_C_WASCONNECTED;
        }
 
+       spin_unlock_irqrestore(&ch->lock, *irq_flags);
+
+       /* wake all idle kthreads so they can exit */
        if (atomic_read(&ch->kthreads_idle) > 0) {
-               /* wake all idle kthreads so they can exit */
                wake_up_all(&ch->idle_wq);
        }
 
-       spin_unlock_irqrestore(&ch->lock, *irq_flags);
-
-
        /* wake those waiting to allocate an entry from the local msg queue */
-
        if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) {
                wake_up(&ch->msg_allocate_wq);
        }
 
-       /* wake those waiting for notify completion */
-
-       if (atomic_read(&ch->n_to_notify) > 0) {
-               xpc_notify_senders(ch, reason, ch->w_local_GP.put);
-       }
-
        spin_lock_irqsave(&ch->lock, *irq_flags);
 }
 
 
 void
-xpc_disconnected_callout(struct xpc_channel *ch)
+xpc_disconnect_callout(struct xpc_channel *ch, enum xpc_retval reason)
 {
        /*
-        * Let the channel's registerer know that the channel is now
+        * Let the channel's registerer know that the channel is being
         * disconnected. We don't want to do this if the registerer was never
-        * informed of a connection being made, unless the disconnect was for
-        * abnormal reasons.
+        * informed of a connection being made.
         */
 
        if (ch->func != NULL) {
                dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
-                       "channel=%d\n", ch->reason, ch->partid, ch->number);
+                       "channel=%d\n", reason, ch->partid, ch->number);
 
-               ch->func(ch->reason, ch->partid, ch->number, NULL, ch->key);
+               ch->func(reason, ch->partid, ch->number, NULL, ch->key);
 
                dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
-                       "channel=%d\n", ch->reason, ch->partid, ch->number);
+                       "channel=%d\n", reason, ch->partid, ch->number);
        }
 }
 
@@ -1754,7 +1831,7 @@ xpc_initiate_allocate(partid_t partid, int ch_number, u32 flags, void **payload)
 {
        struct xpc_partition *part = &xpc_partitions[partid];
        enum xpc_retval ret = xpcUnknownReason;
-       struct xpc_msg *msg;
+       struct xpc_msg *msg = NULL;
 
 
        DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
@@ -1848,7 +1925,7 @@ xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type,
                        xpc_notify_func func, void *key)
 {
        enum xpc_retval ret = xpcSuccess;
-       struct xpc_notify *notify = NULL;   // >>> to keep the compiler happy!!
+       struct xpc_notify *notify = notify;
        s64 put, msg_number = msg->number;
 
 
@@ -1875,7 +1952,7 @@ xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type,
                notify = &ch->notify_queue[msg_number % ch->local_nentries];
                notify->func = func;
                notify->key = key;
-               (volatile u8) notify->type = notify_type;
+               notify->type = notify_type;
 
                // >>> is a mb() needed here?
 
@@ -2024,7 +2101,7 @@ xpc_pull_remote_msg(struct xpc_channel *ch, s64 get)
        enum xpc_retval ret;
 
 
-       if (down_interruptible(&ch->msg_to_pull_sema) != 0) {
+       if (mutex_lock_interruptible(&ch->msg_to_pull_mutex) != 0) {
                /* we were interrupted by a signal */
                return NULL;
        }
@@ -2060,7 +2137,7 @@ xpc_pull_remote_msg(struct xpc_channel *ch, s64 get)
 
                        XPC_DEACTIVATE_PARTITION(part, ret);
 
-                       up(&ch->msg_to_pull_sema);
+                       mutex_unlock(&ch->msg_to_pull_mutex);
                        return NULL;
                }
 
@@ -2069,7 +2146,7 @@ xpc_pull_remote_msg(struct xpc_channel *ch, s64 get)
                ch->next_msg_to_pull += nmsgs;
        }
 
-       up(&ch->msg_to_pull_sema);
+       mutex_unlock(&ch->msg_to_pull_mutex);
 
        /* return the message we were looking for */
        msg_offset = (get % ch->remote_nentries) * ch->msg_size;