fedora core 6 1.2949 + vserver 2.2.0
[linux-2.6.git] / net / tipc / port.c
index 72aae52..b7f3199 100644 (file)
 
 #define MAX_REJECT_SIZE 1024
 
-static struct sk_buff *msg_queue_head = 0;
-static struct sk_buff *msg_queue_tail = 0;
+static struct sk_buff *msg_queue_head = NULL;
+static struct sk_buff *msg_queue_tail = NULL;
 
-spinlock_t tipc_port_list_lock = SPIN_LOCK_UNLOCKED;
-static spinlock_t queue_lock = SPIN_LOCK_UNLOCKED;
+DEFINE_SPINLOCK(tipc_port_list_lock);
+static DEFINE_SPINLOCK(queue_lock);
 
 static LIST_HEAD(ports);
 static void port_handle_node_down(unsigned long ref);
@@ -67,27 +67,22 @@ static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err);
 static void port_timeout(unsigned long ref);
 
 
-static inline u32 port_peernode(struct port *p_ptr)
+static u32 port_peernode(struct port *p_ptr)
 {
        return msg_destnode(&p_ptr->publ.phdr);
 }
 
-static inline u32 port_peerport(struct port *p_ptr)
+static u32 port_peerport(struct port *p_ptr)
 {
        return msg_destport(&p_ptr->publ.phdr);
 }
 
-static inline u32 port_out_seqno(struct port *p_ptr)
+static u32 port_out_seqno(struct port *p_ptr)
 {
        return msg_transp_seqno(&p_ptr->publ.phdr);
 }
 
-static inline void port_set_out_seqno(struct port *p_ptr, u32 seqno) 
-{
-       msg_set_transp_seqno(&p_ptr->publ.phdr,seqno);
-}
-
-static inline void port_incr_out_seqno(struct port *p_ptr)
+static void port_incr_out_seqno(struct port *p_ptr)
 {
        struct tipc_msg *m = &p_ptr->publ.phdr;
 
@@ -173,7 +168,6 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
        struct port_list *item = dp;
        int cnt = 0;
 
-       assert(buf);
        msg = buf_msg(buf);
 
        /* Create destination port list, if one wasn't supplied */
@@ -201,7 +195,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
                        struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
 
                        if (b == NULL) {
-                               warn("Buffer allocation failure\n");
+                               warn("Unable to deliver multicast message(s)\n");
                                msg_dbg(msg, "LOST:");
                                goto exit;
                        }
@@ -232,15 +226,14 @@ u32 tipc_createport_raw(void *usr_handle,
        struct tipc_msg *msg;
        u32 ref;
 
-       p_ptr = kmalloc(sizeof(*p_ptr), GFP_ATOMIC);
-       if (p_ptr == NULL) {
-               warn("Memory squeeze; failed to create port\n");
+       p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
+       if (!p_ptr) {
+               warn("Port creation failed, no memory\n");
                return 0;
        }
-       memset(p_ptr, 0, sizeof(*p_ptr));
        ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock);
        if (!ref) {
-               warn("Reference Table Exhausted\n");
+               warn("Port creation failed, reference table exhausted\n");
                kfree(p_ptr);
                return 0;
        }
@@ -258,11 +251,11 @@ u32 tipc_createport_raw(void *usr_handle,
        p_ptr->publ.usr_handle = usr_handle;
        INIT_LIST_HEAD(&p_ptr->wait_list);
        INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
-       p_ptr->congested_link = 0;
+       p_ptr->congested_link = NULL;
        p_ptr->max_pkt = MAX_PKT_DEFAULT;
        p_ptr->dispatcher = dispatcher;
        p_ptr->wakeup = wakeup;
-       p_ptr->user_port = 0;
+       p_ptr->user_port = NULL;
        k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
        spin_lock_bh(&tipc_port_list_lock);
        INIT_LIST_HEAD(&p_ptr->publications);
@@ -276,9 +269,9 @@ u32 tipc_createport_raw(void *usr_handle,
 int tipc_deleteport(u32 ref)
 {
        struct port *p_ptr;
-       struct sk_buff *buf = 0;
+       struct sk_buff *buf = NULL;
 
-       tipc_withdraw(ref, 0, 0);
+       tipc_withdraw(ref, 0, NULL);
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr) 
                return -EINVAL;
@@ -329,13 +322,13 @@ void *tipc_get_handle(const u32 ref)
 
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr)
-               return 0;
+               return NULL;
        handle = p_ptr->publ.usr_handle;
        tipc_port_unlock(p_ptr);
        return handle;
 }
 
-static inline int port_unreliable(struct port *p_ptr)
+static int port_unreliable(struct port *p_ptr)
 {
        return msg_src_droppable(&p_ptr->publ.phdr);
 }
@@ -364,7 +357,7 @@ int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
        return TIPC_OK;
 }
 
-static inline int port_unreturnable(struct port *p_ptr)
+static int port_unreturnable(struct port *p_ptr)
 {
        return msg_dest_droppable(&p_ptr->publ.phdr);
 }
@@ -475,7 +468,7 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
 
        /* send self-abort message when rejecting on a connected port */
        if (msg_connected(msg)) {
-               struct sk_buff *abuf = 0;
+               struct sk_buff *abuf = NULL;
                struct port *p_ptr = tipc_port_lock(msg_destport(msg));
 
                if (p_ptr) {
@@ -510,10 +503,15 @@ int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
 static void port_timeout(unsigned long ref)
 {
        struct port *p_ptr = tipc_port_lock(ref);
-       struct sk_buff *buf = 0;
+       struct sk_buff *buf = NULL;
 
-       if (!p_ptr || !p_ptr->publ.connected)
+       if (!p_ptr)
+               return;
+
+       if (!p_ptr->publ.connected) {
+               tipc_port_unlock(p_ptr);
                return;
+       }
 
        /* Last probe answered ? */
        if (p_ptr->probing_state == PROBING) {
@@ -540,7 +538,7 @@ static void port_timeout(unsigned long ref)
 static void port_handle_node_down(unsigned long ref)
 {
        struct port *p_ptr = tipc_port_lock(ref);
-       struct sk_buff* buf = 0;
+       struct sk_buff* buf = NULL;
 
        if (!p_ptr)
                return;
@@ -555,7 +553,7 @@ static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
        u32 imp = msg_importance(&p_ptr->publ.phdr);
 
        if (!p_ptr->publ.connected)
-               return 0;
+               return NULL;
        if (imp < TIPC_CRITICAL_IMPORTANCE)
                imp++;
        return port_build_proto_msg(p_ptr->publ.ref,
@@ -575,7 +573,7 @@ static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err)
        u32 imp = msg_importance(&p_ptr->publ.phdr);
 
        if (!p_ptr->publ.connected)
-               return 0;
+               return NULL;
        if (imp < TIPC_CRITICAL_IMPORTANCE)
                imp++;
        return port_build_proto_msg(port_peerport(p_ptr),
@@ -594,8 +592,8 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)
        struct tipc_msg *msg = buf_msg(buf);
        struct port *p_ptr = tipc_port_lock(msg_destport(msg));
        u32 err = TIPC_OK;
-       struct sk_buff *r_buf = 0;
-       struct sk_buff *abort_buf = 0;
+       struct sk_buff *r_buf = NULL;
+       struct sk_buff *abort_buf = NULL;
 
        msg_dbg(msg, "PORT<RECV<:");
 
@@ -804,7 +802,7 @@ static void port_dispatcher_sigh(void *dummy)
 
        spin_lock_bh(&queue_lock);
        buf = msg_queue_head;
-       msg_queue_head = 0;
+       msg_queue_head = NULL;
        spin_unlock_bh(&queue_lock);
 
        while (buf) {
@@ -815,18 +813,20 @@ static void port_dispatcher_sigh(void *dummy)
                void *usr_handle;
                int connected;
                int published;
+               u32 message_type;
 
                struct sk_buff *next = buf->next;
                struct tipc_msg *msg = buf_msg(buf);
                u32 dref = msg_destport(msg);
                
+               message_type = msg_type(msg);
+               if (message_type > TIPC_DIRECT_MSG)
+                       goto reject;    /* Unsupported message type */
+
                p_ptr = tipc_port_lock(dref);
-               if (!p_ptr) {
-                       /* Port deleted while msg in queue */
-                       tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
-                       buf = next;
-                       continue;
-               }
+               if (!p_ptr)
+                       goto reject;    /* Port deleted while msg in queue */
+
                orig.ref = msg_origport(msg);
                orig.node = msg_orignode(msg);
                up_ptr = p_ptr->user_port;
@@ -837,7 +837,7 @@ static void port_dispatcher_sigh(void *dummy)
                if (unlikely(msg_errcode(msg)))
                        goto err;
 
-               switch (msg_type(msg)) {
+               switch (message_type) {
                
                case TIPC_CONN_MSG:{
                                tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
@@ -879,6 +879,7 @@ static void port_dispatcher_sigh(void *dummy)
                                   &orig);
                                break;
                        }
+               case TIPC_MCAST_MSG:
                case TIPC_NAMED_MSG:{
                                tipc_named_msg_event cb = up_ptr->named_msg_cb;
 
@@ -891,7 +892,8 @@ static void port_dispatcher_sigh(void *dummy)
                                        goto reject;
                                dseq.type =  msg_nametype(msg);
                                dseq.lower = msg_nameinst(msg);
-                               dseq.upper = dseq.lower;
+                               dseq.upper = (message_type == TIPC_NAMED_MSG)
+                                       ? dseq.lower : msg_nameupper(msg);
                                skb_pull(buf, msg_hdr_sz(msg));
                                cb(usr_handle, dref, &buf, msg_data(msg), 
                                   msg_data_sz(msg), msg_importance(msg),
@@ -904,7 +906,7 @@ static void port_dispatcher_sigh(void *dummy)
                buf = next;
                continue;
 err:
-               switch (msg_type(msg)) {
+               switch (message_type) {
                
                case TIPC_CONN_MSG:{
                                tipc_conn_shutdown_event cb = 
@@ -936,6 +938,7 @@ err:
                                   msg_data_sz(msg), msg_errcode(msg), &orig);
                                break;
                        }
+               case TIPC_MCAST_MSG:
                case TIPC_NAMED_MSG:{
                                tipc_named_msg_err_event cb = 
                                        up_ptr->named_err_cb;
@@ -945,7 +948,8 @@ err:
                                        break;
                                dseq.type =  msg_nametype(msg);
                                dseq.lower = msg_nameinst(msg);
-                               dseq.upper = dseq.lower;
+                               dseq.upper = (message_type == TIPC_NAMED_MSG)
+                                       ? dseq.lower : msg_nameupper(msg);
                                skb_pull(buf, msg_hdr_sz(msg));
                                cb(usr_handle, dref, &buf, msg_data(msg), 
                                   msg_data_sz(msg), msg_errcode(msg), &dseq);
@@ -991,8 +995,8 @@ static void port_wakeup_sh(unsigned long ref)
 {
        struct port *p_ptr;
        struct user_port *up_ptr;
-       tipc_continue_event cb = 0;
-       void *uh = 0;
+       tipc_continue_event cb = NULL;
+       void *uh = NULL;
 
        p_ptr = tipc_port_lock(ref);
        if (p_ptr) {
@@ -1016,7 +1020,7 @@ static void port_wakeup(struct tipc_port *p_ptr)
 void tipc_acknowledge(u32 ref, u32 ack)
 {
        struct port *p_ptr;
-       struct sk_buff *buf = 0;
+       struct sk_buff *buf = NULL;
 
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr)
@@ -1058,11 +1062,12 @@ int tipc_createport(u32 user_ref,
        struct port *p_ptr; 
        u32 ref;
 
-       up_ptr = (struct user_port *)kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
-       if (up_ptr == NULL) {
+       up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
+       if (!up_ptr) {
+               warn("Port creation failed, no memory\n");
                return -ENOMEM;
        }
-       ref = tipc_createport_raw(0, port_dispatcher, port_wakeup, importance);
+       ref = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, importance);
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr) {
                kfree(up_ptr);
@@ -1131,11 +1136,12 @@ int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
        int res = -EINVAL;
 
        p_ptr = tipc_port_lock(ref);
+       if (!p_ptr)
+               return -EINVAL;
+
        dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, "
            "lower = %u, upper = %u\n",
            ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper);
-       if (!p_ptr)
-               return -EINVAL;
        if (p_ptr->publ.connected)
                goto exit;
        if (seq->lower > seq->upper)
@@ -1170,8 +1176,6 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr)
                return -EINVAL;
-       if (!p_ptr->publ.published)
-               goto exit;
        if (!seq) {
                list_for_each_entry_safe(publ, tpubl, 
                                         &p_ptr->publications, pport_list) {
@@ -1198,7 +1202,6 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
        }
        if (list_empty(&p_ptr->publications))
                p_ptr->publ.published = 0;
-exit:
        tipc_port_unlock(p_ptr);
        return res;
 }
@@ -1273,7 +1276,7 @@ int tipc_disconnect(u32 ref)
 int tipc_shutdown(u32 ref)
 {
        struct port *p_ptr;
-       struct sk_buff *buf = 0;
+       struct sk_buff *buf = NULL;
 
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr)