fedora core 6 1.2949 + vserver 2.2.0
[linux-2.6.git] / net / tipc / subscr.c
index 5ff38b9..ddade73 100644 (file)
@@ -86,7 +86,7 @@ static struct top_srv topsrv = { 0 };
  * Returns converted value
  */
 
-static inline u32 htohl(u32 in, int swap)
+static u32 htohl(u32 in, int swap)
 {
        char *c = (char *)∈
 
@@ -155,7 +155,7 @@ void tipc_subscr_report_overlap(struct subscription *sub,
            sub->seq.upper, found_lower, found_upper);
        if (!tipc_subscr_overlap(sub, found_lower, found_upper))
                return;
-       if (!must && (sub->filter != TIPC_SUB_PORTS))
+       if (!must && !(sub->filter & TIPC_SUB_PORTS))
                return;
        subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
 }
@@ -176,6 +176,13 @@ static void subscr_timeout(struct subscription *sub)
        if (subscriber == NULL)
                return;
 
+       /* Validate timeout (in case subscription is being cancelled) */
+
+       if (sub->timeout == TIPC_WAIT_FOREVER) {
+               tipc_ref_unlock(subscriber_ref);
+               return;
+       }
+
        /* Unlink subscription from name table */
 
        tipc_nametbl_unsubscribe(sub);
@@ -198,6 +205,20 @@ static void subscr_timeout(struct subscription *sub)
        atomic_dec(&topsrv.subscription_count);
 }
 
+/**
+ * subscr_del - delete a subscription within a subscription list
+ *
+ * Called with subscriber locked.
+ */
+
+static void subscr_del(struct subscription *sub)
+{
+       tipc_nametbl_unsubscribe(sub);
+       list_del(&sub->subscription_list);
+       kfree(sub);
+       atomic_dec(&topsrv.subscription_count);
+}
+
 /**
  * subscr_terminate - terminate communication with a subscriber
  * 
@@ -227,12 +248,9 @@ static void subscr_terminate(struct subscriber *subscriber)
                        k_cancel_timer(&sub->timer);
                        k_term_timer(&sub->timer);
                }
-               tipc_nametbl_unsubscribe(sub);
-               list_del(&sub->subscription_list);
-               dbg("Term: Removed sub %u,%u,%u from subscriber %x list\n",
+               dbg("Term: Removing sub %u,%u,%u from subscriber %x list\n",
                    sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber);
-               kfree(sub);
-               atomic_dec(&topsrv.subscription_count);
+               subscr_del(sub);
        }
 
        /* Sever connection to subscriber */
@@ -252,6 +270,49 @@ static void subscr_terminate(struct subscriber *subscriber)
        kfree(subscriber);
 }
 
+/**
+ * subscr_cancel - handle subscription cancellation request
+ *
+ * Called with subscriber locked.  Routine must temporarily release this lock
+ * to enable the subscription timeout routine to finish without deadlocking;
+ * the lock is then reclaimed to allow caller to release it upon return.
+ *
+ * Note that fields of 's' use subscriber's endianness!
+ */
+
+static void subscr_cancel(struct tipc_subscr *s,
+                         struct subscriber *subscriber)
+{
+       struct subscription *sub;
+       struct subscription *sub_temp;
+       int found = 0;
+
+       /* Find first matching subscription, exit if not found */
+
+       list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
+                                subscription_list) {
+               if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
+                       found = 1;
+                       break;
+               }
+       }
+       if (!found)
+               return;
+
+       /* Cancel subscription timer (if used), then delete subscription */
+
+       if (sub->timeout != TIPC_WAIT_FOREVER) {
+               sub->timeout = TIPC_WAIT_FOREVER;
+               spin_unlock_bh(subscriber->lock);
+               k_cancel_timer(&sub->timer);
+               k_term_timer(&sub->timer);
+               spin_lock_bh(subscriber->lock);
+       }
+       dbg("Cancel: removing sub %u,%u,%u from subscriber %x list\n",
+           sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber);
+       subscr_del(sub);
+}
+
 /**
  * subscr_subscribe - create subscription for subscriber
  * 
@@ -263,43 +324,50 @@ static void subscr_subscribe(struct tipc_subscr *s,
 {
        struct subscription *sub;
 
+       /* Determine/update subscriber's endianness */
+
+       if (s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE))
+               subscriber->swap = 0;
+       else
+               subscriber->swap = 1;
+
+       /* Detect & process a subscription cancellation request */
+
+       if (s->filter & htohl(TIPC_SUB_CANCEL, subscriber->swap)) {
+               s->filter &= ~htohl(TIPC_SUB_CANCEL, subscriber->swap);
+               subscr_cancel(s, subscriber);
+               return;
+       }
+
        /* Refuse subscription if global limit exceeded */
 
        if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) {
-               warn("Failed: max %u subscriptions\n", tipc_max_subscriptions);
+               warn("Subscription rejected, subscription limit reached (%u)\n",
+                    tipc_max_subscriptions);
                subscr_terminate(subscriber);
                return;
        }
 
        /* Allocate subscription object */
 
-       sub = kmalloc(sizeof(*sub), GFP_ATOMIC);
-       if (sub == NULL) {
-               warn("Memory squeeze; ignoring subscription\n");
+       sub = kzalloc(sizeof(*sub), GFP_ATOMIC);
+       if (!sub) {
+               warn("Subscription rejected, no memory\n");
                subscr_terminate(subscriber);
                return;
        }
 
-       /* Determine/update subscriber's endianness */
-
-       if ((s->filter == TIPC_SUB_PORTS) || (s->filter == TIPC_SUB_SERVICE))
-               subscriber->swap = 0;
-       else
-               subscriber->swap = 1;
-
        /* Initialize subscription object */
 
-       memset(sub, 0, sizeof(*sub));
        sub->seq.type = htohl(s->seq.type, subscriber->swap);
        sub->seq.lower = htohl(s->seq.lower, subscriber->swap);
        sub->seq.upper = htohl(s->seq.upper, subscriber->swap);
        sub->timeout = htohl(s->timeout, subscriber->swap);
        sub->filter = htohl(s->filter, subscriber->swap);
-       if ((((sub->filter != TIPC_SUB_PORTS) 
-             && (sub->filter != TIPC_SUB_SERVICE)))
+       if ((!(sub->filter & TIPC_SUB_PORTS)
+            == !(sub->filter & TIPC_SUB_SERVICE))
            || (sub->seq.lower > sub->seq.upper)) {
-               warn("Rejecting illegal subscription %u,%u,%u\n",
-                    sub->seq.type, sub->seq.lower, sub->seq.upper);
+               warn("Subscription rejected, illegal request\n");
                kfree(sub);
                subscr_terminate(subscriber);
                return;
@@ -381,29 +449,28 @@ static void subscr_named_msg_event(void *usr_handle,
                                   struct tipc_name_seq const *dest)
 {
        struct subscriber *subscriber;
-       struct iovec msg_sect = {0, 0};
+       struct iovec msg_sect = {NULL, 0};
        spinlock_t *subscriber_lock;
 
        dbg("subscr_named_msg_event: orig = %x own = %x,\n",
            orig->node, tipc_own_addr);
        if (size && (size != sizeof(struct tipc_subscr))) {
-               warn("Received tipc_subscr of invalid size\n");
+               warn("Subscriber rejected, invalid subscription size\n");
                return;
        }
 
        /* Create subscriber object */
 
-       subscriber = kmalloc(sizeof(struct subscriber), GFP_ATOMIC);
+       subscriber = kzalloc(sizeof(struct subscriber), GFP_ATOMIC);
        if (subscriber == NULL) {
-               warn("Memory squeeze; ignoring subscriber setup\n");
+               warn("Subscriber rejected, no memory\n");
                return;
        }
-       memset(subscriber, 0, sizeof(struct subscriber));
        INIT_LIST_HEAD(&subscriber->subscription_list);
        INIT_LIST_HEAD(&subscriber->subscriber_list);
        subscriber->ref = tipc_ref_acquire(subscriber, &subscriber->lock);
        if (subscriber->ref == 0) {
-               warn("Failed to acquire subscriber reference\n");
+               warn("Subscriber rejected, reference table exhausted\n");
                kfree(subscriber);
                return;
        }
@@ -413,16 +480,16 @@ static void subscr_named_msg_event(void *usr_handle,
        tipc_createport(topsrv.user_ref,
                        (void *)(unsigned long)subscriber->ref,
                        importance,
-                       0,
-                       0,
+                       NULL,
+                       NULL,
                        subscr_conn_shutdown_event,
-                       0,
-                       0,
+                       NULL,
+                       NULL,
                        subscr_conn_msg_event,
-                       0,
+                       NULL,
                        &subscriber->port_ref);
        if (subscriber->port_ref == 0) {
-               warn("Memory squeeze; failed to create subscription port\n");
+               warn("Subscriber rejected, unable to create port\n");
                tipc_ref_discard(subscriber->ref);
                kfree(subscriber);
                return;
@@ -457,26 +524,26 @@ int tipc_subscr_start(void)
        int res = -1;
 
        memset(&topsrv, 0, sizeof (topsrv));
-       topsrv.lock = SPIN_LOCK_UNLOCKED;
+       spin_lock_init(&topsrv.lock);
        INIT_LIST_HEAD(&topsrv.subscriber_list);
 
        spin_lock_bh(&topsrv.lock);
-       res = tipc_attach(&topsrv.user_ref, 0, 0);
+       res = tipc_attach(&topsrv.user_ref, NULL, NULL);
        if (res) {
                spin_unlock_bh(&topsrv.lock);
                return res;
        }
 
        res = tipc_createport(topsrv.user_ref,
-                             0,
+                             NULL,
                              TIPC_CRITICAL_IMPORTANCE,
-                             0,
-                             0,
-                             0,
-                             0,
+                             NULL,
+                             NULL,
+                             NULL,
+                             NULL,
                              subscr_named_msg_event,
-                             0,
-                             0,
+                             NULL,
+                             NULL,
                              &topsrv.setup_port);
        if (res)
                goto failed;