linux 2.6.16.38 w/ vs2.0.3-rc1
[linux-2.6.git] / net / tipc / port.c
index b9c8c6b..72aae52 100644 (file)
 
 #define MAX_REJECT_SIZE 1024
 
-static struct sk_buff *msg_queue_head = NULL;
-static struct sk_buff *msg_queue_tail = NULL;
+static struct sk_buff *msg_queue_head = 0;
+static struct sk_buff *msg_queue_tail = 0;
 
-DEFINE_SPINLOCK(tipc_port_list_lock);
-static DEFINE_SPINLOCK(queue_lock);
+spinlock_t tipc_port_list_lock = SPIN_LOCK_UNLOCKED;
+static spinlock_t queue_lock = SPIN_LOCK_UNLOCKED;
 
 static LIST_HEAD(ports);
 static void port_handle_node_down(unsigned long ref);
@@ -67,22 +67,27 @@ static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err);
 static void port_timeout(unsigned long ref);
 
 
-static u32 port_peernode(struct port *p_ptr)
+static inline u32 port_peernode(struct port *p_ptr)
 {
        return msg_destnode(&p_ptr->publ.phdr);
 }
 
-static u32 port_peerport(struct port *p_ptr)
+static inline u32 port_peerport(struct port *p_ptr)
 {
        return msg_destport(&p_ptr->publ.phdr);
 }
 
-static u32 port_out_seqno(struct port *p_ptr)
+static inline u32 port_out_seqno(struct port *p_ptr)
 {
        return msg_transp_seqno(&p_ptr->publ.phdr);
 }
 
-static void port_incr_out_seqno(struct port *p_ptr)
+static inline void port_set_out_seqno(struct port *p_ptr, u32 seqno) 
+{
+       msg_set_transp_seqno(&p_ptr->publ.phdr,seqno);
+}
+
+static inline void port_incr_out_seqno(struct port *p_ptr)
 {
        struct tipc_msg *m = &p_ptr->publ.phdr;
 
@@ -168,6 +173,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
        struct port_list *item = dp;
        int cnt = 0;
 
+       assert(buf);
        msg = buf_msg(buf);
 
        /* Create destination port list, if one wasn't supplied */
@@ -195,7 +201,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
                        struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
 
                        if (b == NULL) {
-                               warn("Unable to deliver multicast message(s)\n");
+                               warn("Buffer allocation failure\n");
                                msg_dbg(msg, "LOST:");
                                goto exit;
                        }
@@ -226,14 +232,15 @@ u32 tipc_createport_raw(void *usr_handle,
        struct tipc_msg *msg;
        u32 ref;
 
-       p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
-       if (!p_ptr) {
-               warn("Port creation failed, no memory\n");
+       p_ptr = kmalloc(sizeof(*p_ptr), GFP_ATOMIC);
+       if (p_ptr == NULL) {
+               warn("Memory squeeze; failed to create port\n");
                return 0;
        }
+       memset(p_ptr, 0, sizeof(*p_ptr));
        ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock);
        if (!ref) {
-               warn("Port creation failed, reference table exhausted\n");
+               warn("Reference Table Exhausted\n");
                kfree(p_ptr);
                return 0;
        }
@@ -251,11 +258,11 @@ u32 tipc_createport_raw(void *usr_handle,
        p_ptr->publ.usr_handle = usr_handle;
        INIT_LIST_HEAD(&p_ptr->wait_list);
        INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
-       p_ptr->congested_link = NULL;
+       p_ptr->congested_link = 0;
        p_ptr->max_pkt = MAX_PKT_DEFAULT;
        p_ptr->dispatcher = dispatcher;
        p_ptr->wakeup = wakeup;
-       p_ptr->user_port = NULL;
+       p_ptr->user_port = 0;
        k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
        spin_lock_bh(&tipc_port_list_lock);
        INIT_LIST_HEAD(&p_ptr->publications);
@@ -269,9 +276,9 @@ u32 tipc_createport_raw(void *usr_handle,
 int tipc_deleteport(u32 ref)
 {
        struct port *p_ptr;
-       struct sk_buff *buf = NULL;
+       struct sk_buff *buf = 0;
 
-       tipc_withdraw(ref, 0, NULL);
+       tipc_withdraw(ref, 0, 0);
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr) 
                return -EINVAL;
@@ -322,13 +329,13 @@ void *tipc_get_handle(const u32 ref)
 
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr)
-               return NULL;
+               return 0;
        handle = p_ptr->publ.usr_handle;
        tipc_port_unlock(p_ptr);
        return handle;
 }
 
-static int port_unreliable(struct port *p_ptr)
+static inline int port_unreliable(struct port *p_ptr)
 {
        return msg_src_droppable(&p_ptr->publ.phdr);
 }
@@ -357,7 +364,7 @@ int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
        return TIPC_OK;
 }
 
-static int port_unreturnable(struct port *p_ptr)
+static inline int port_unreturnable(struct port *p_ptr)
 {
        return msg_dest_droppable(&p_ptr->publ.phdr);
 }
@@ -468,7 +475,7 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
 
        /* send self-abort message when rejecting on a connected port */
        if (msg_connected(msg)) {
-               struct sk_buff *abuf = NULL;
+               struct sk_buff *abuf = 0;
                struct port *p_ptr = tipc_port_lock(msg_destport(msg));
 
                if (p_ptr) {
@@ -503,7 +510,7 @@ int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
 static void port_timeout(unsigned long ref)
 {
        struct port *p_ptr = tipc_port_lock(ref);
-       struct sk_buff *buf = NULL;
+       struct sk_buff *buf = 0;
 
        if (!p_ptr || !p_ptr->publ.connected)
                return;
@@ -533,7 +540,7 @@ static void port_timeout(unsigned long ref)
 static void port_handle_node_down(unsigned long ref)
 {
        struct port *p_ptr = tipc_port_lock(ref);
-       struct sk_buff* buf = NULL;
+       struct sk_buff* buf = 0;
 
        if (!p_ptr)
                return;
@@ -548,7 +555,7 @@ static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
        u32 imp = msg_importance(&p_ptr->publ.phdr);
 
        if (!p_ptr->publ.connected)
-               return NULL;
+               return 0;
        if (imp < TIPC_CRITICAL_IMPORTANCE)
                imp++;
        return port_build_proto_msg(p_ptr->publ.ref,
@@ -568,7 +575,7 @@ static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err)
        u32 imp = msg_importance(&p_ptr->publ.phdr);
 
        if (!p_ptr->publ.connected)
-               return NULL;
+               return 0;
        if (imp < TIPC_CRITICAL_IMPORTANCE)
                imp++;
        return port_build_proto_msg(port_peerport(p_ptr),
@@ -587,8 +594,8 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)
        struct tipc_msg *msg = buf_msg(buf);
        struct port *p_ptr = tipc_port_lock(msg_destport(msg));
        u32 err = TIPC_OK;
-       struct sk_buff *r_buf = NULL;
-       struct sk_buff *abort_buf = NULL;
+       struct sk_buff *r_buf = 0;
+       struct sk_buff *abort_buf = 0;
 
        msg_dbg(msg, "PORT<RECV<:");
 
@@ -797,7 +804,7 @@ static void port_dispatcher_sigh(void *dummy)
 
        spin_lock_bh(&queue_lock);
        buf = msg_queue_head;
-       msg_queue_head = NULL;
+       msg_queue_head = 0;
        spin_unlock_bh(&queue_lock);
 
        while (buf) {
@@ -808,20 +815,18 @@ static void port_dispatcher_sigh(void *dummy)
                void *usr_handle;
                int connected;
                int published;
-               u32 message_type;
 
                struct sk_buff *next = buf->next;
                struct tipc_msg *msg = buf_msg(buf);
                u32 dref = msg_destport(msg);
                
-               message_type = msg_type(msg);
-               if (message_type > TIPC_DIRECT_MSG)
-                       goto reject;    /* Unsupported message type */
-
                p_ptr = tipc_port_lock(dref);
-               if (!p_ptr)
-                       goto reject;    /* Port deleted while msg in queue */
-
+               if (!p_ptr) {
+                       /* Port deleted while msg in queue */
+                       tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
+                       buf = next;
+                       continue;
+               }
                orig.ref = msg_origport(msg);
                orig.node = msg_orignode(msg);
                up_ptr = p_ptr->user_port;
@@ -832,7 +837,7 @@ static void port_dispatcher_sigh(void *dummy)
                if (unlikely(msg_errcode(msg)))
                        goto err;
 
-               switch (message_type) {
+               switch (msg_type(msg)) {
                
                case TIPC_CONN_MSG:{
                                tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
@@ -874,7 +879,6 @@ static void port_dispatcher_sigh(void *dummy)
                                   &orig);
                                break;
                        }
-               case TIPC_MCAST_MSG:
                case TIPC_NAMED_MSG:{
                                tipc_named_msg_event cb = up_ptr->named_msg_cb;
 
@@ -887,8 +891,7 @@ static void port_dispatcher_sigh(void *dummy)
                                        goto reject;
                                dseq.type =  msg_nametype(msg);
                                dseq.lower = msg_nameinst(msg);
-                               dseq.upper = (message_type == TIPC_NAMED_MSG)
-                                       ? dseq.lower : msg_nameupper(msg);
+                               dseq.upper = dseq.lower;
                                skb_pull(buf, msg_hdr_sz(msg));
                                cb(usr_handle, dref, &buf, msg_data(msg), 
                                   msg_data_sz(msg), msg_importance(msg),
@@ -901,7 +904,7 @@ static void port_dispatcher_sigh(void *dummy)
                buf = next;
                continue;
 err:
-               switch (message_type) {
+               switch (msg_type(msg)) {
                
                case TIPC_CONN_MSG:{
                                tipc_conn_shutdown_event cb = 
@@ -933,7 +936,6 @@ err:
                                   msg_data_sz(msg), msg_errcode(msg), &orig);
                                break;
                        }
-               case TIPC_MCAST_MSG:
                case TIPC_NAMED_MSG:{
                                tipc_named_msg_err_event cb = 
                                        up_ptr->named_err_cb;
@@ -943,8 +945,7 @@ err:
                                        break;
                                dseq.type =  msg_nametype(msg);
                                dseq.lower = msg_nameinst(msg);
-                               dseq.upper = (message_type == TIPC_NAMED_MSG)
-                                       ? dseq.lower : msg_nameupper(msg);
+                               dseq.upper = dseq.lower;
                                skb_pull(buf, msg_hdr_sz(msg));
                                cb(usr_handle, dref, &buf, msg_data(msg), 
                                   msg_data_sz(msg), msg_errcode(msg), &dseq);
@@ -990,8 +991,8 @@ static void port_wakeup_sh(unsigned long ref)
 {
        struct port *p_ptr;
        struct user_port *up_ptr;
-       tipc_continue_event cb = NULL;
-       void *uh = NULL;
+       tipc_continue_event cb = 0;
+       void *uh = 0;
 
        p_ptr = tipc_port_lock(ref);
        if (p_ptr) {
@@ -1015,7 +1016,7 @@ static void port_wakeup(struct tipc_port *p_ptr)
 void tipc_acknowledge(u32 ref, u32 ack)
 {
        struct port *p_ptr;
-       struct sk_buff *buf = NULL;
+       struct sk_buff *buf = 0;
 
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr)
@@ -1057,12 +1058,11 @@ int tipc_createport(u32 user_ref,
        struct port *p_ptr; 
        u32 ref;
 
-       up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
-       if (!up_ptr) {
-               warn("Port creation failed, no memory\n");
+       up_ptr = (struct user_port *)kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
+       if (up_ptr == NULL) {
                return -ENOMEM;
        }
-       ref = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, importance);
+       ref = tipc_createport_raw(0, port_dispatcher, port_wakeup, importance);
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr) {
                kfree(up_ptr);
@@ -1170,6 +1170,8 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr)
                return -EINVAL;
+       if (!p_ptr->publ.published)
+               goto exit;
        if (!seq) {
                list_for_each_entry_safe(publ, tpubl, 
                                         &p_ptr->publications, pport_list) {
@@ -1196,6 +1198,7 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
        }
        if (list_empty(&p_ptr->publications))
                p_ptr->publ.published = 0;
+exit:
        tipc_port_unlock(p_ptr);
        return res;
 }
@@ -1270,7 +1273,7 @@ int tipc_disconnect(u32 ref)
 int tipc_shutdown(u32 ref)
 {
        struct port *p_ptr;
-       struct sk_buff *buf = NULL;
+       struct sk_buff *buf = 0;
 
        p_ptr = tipc_port_lock(ref);
        if (!p_ptr)