This commit was manufactured by cvs2svn to create branch 'vserver'.
[linux-2.6.git] / fs / dlm / lowcomms-sctp.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 /*
15  * lowcomms.c
16  *
17  * This is the "low-level" comms layer.
18  *
19  * It is responsible for sending/receiving messages
20  * from other nodes in the cluster.
21  *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
28  *
 * The comms level is two kernel threads: a receive thread that deals
 * mainly with receiving messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. (In this SCTP version these
 * roles are carried out by the receive and send workqueues.)
 * Other threads are not allowed to send their own data because it may
 * cause them to wait in times of high load. Also, this way, the sending
 * thread can collect together messages bound for one node and send them
 * in one block.
38  *
39  * I don't see any problem with the recv thread executing the locking
40  * code on behalf of remote processes as the locking code is
41  * short, efficient and never (well, hardly ever) waits.
42  *
43  */
44
45 #include <asm/ioctls.h>
46 #include <net/sock.h>
47 #include <net/tcp.h>
48 #include <net/sctp/user.h>
49 #include <linux/pagemap.h>
50 #include <linux/socket.h>
51 #include <linux/idr.h>
52
53 #include "dlm_internal.h"
54 #include "lowcomms.h"
55 #include "config.h"
56 #include "midcomms.h"
57
58 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
59 static int                      dlm_local_count;
60 static int                      dlm_local_nodeid;
61
62 /* One of these per connected node */
63
64 #define NI_INIT_PENDING 1
65 #define NI_WRITE_PENDING 2
66
67 struct nodeinfo {
68         spinlock_t              lock;
69         sctp_assoc_t            assoc_id;
70         unsigned long           flags;
71         struct list_head        write_list; /* nodes with pending writes */
72         struct list_head        writequeue; /* outgoing writequeue_entries */
73         spinlock_t              writequeue_lock;
74         int                     nodeid;
75         struct work_struct      swork; /* Send workqueue */
76         struct work_struct      lwork; /* Locking workqueue */
77 };
78
79 static DEFINE_IDR(nodeinfo_idr);
80 static DECLARE_RWSEM(nodeinfo_lock);
81 static int max_nodeid;
82
83 struct cbuf {
84         unsigned int base;
85         unsigned int len;
86         unsigned int mask;
87 };
88
89 /* Just the one of these, now. But this struct keeps
90    the connection-specific variables together */
91
92 #define CF_READ_PENDING 1
93
94 struct connection {
95         struct socket           *sock;
96         unsigned long           flags;
97         struct page             *rx_page;
98         atomic_t                waiting_requests;
99         struct cbuf             cb;
100         int                     eagain_flag;
101         struct work_struct      work; /* Send workqueue */
102 };
103
104 /* An entry waiting to be sent */
105
106 struct writequeue_entry {
107         struct list_head        list;
108         struct page             *page;
109         int                     offset;
110         int                     len;
111         int                     end;
112         int                     users;
113         struct nodeinfo         *ni;
114 };
115
116 static void cbuf_add(struct cbuf *cb, int n)
117 {
118         cb->len += n;
119 }
120
121 static int cbuf_data(struct cbuf *cb)
122 {
123         return ((cb->base + cb->len) & cb->mask);
124 }
125
126 static void cbuf_init(struct cbuf *cb, int size)
127 {
128         cb->base = cb->len = 0;
129         cb->mask = size-1;
130 }
131
132 static void cbuf_eat(struct cbuf *cb, int n)
133 {
134         cb->len  -= n;
135         cb->base += n;
136         cb->base &= cb->mask;
137 }
138
139 /* List of nodes which have writes pending */
140 static LIST_HEAD(write_nodes);
141 static DEFINE_SPINLOCK(write_nodes_lock);
142
143
144 /* Maximum number of incoming messages to process before
145  * doing a schedule()
146  */
147 #define MAX_RX_MSG_COUNT 25
148
149 /* Work queues */
150 static struct workqueue_struct *recv_workqueue;
151 static struct workqueue_struct *send_workqueue;
152 static struct workqueue_struct *lock_workqueue;
153
154 /* The SCTP connection */
155 static struct connection sctp_con;
156
157 static void process_send_sockets(struct work_struct *work);
158 static void process_recv_sockets(struct work_struct *work);
159 static void process_lock_request(struct work_struct *work);
160
161 static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
162 {
163         struct sockaddr_storage addr;
164         int error;
165
166         if (!dlm_local_count)
167                 return -1;
168
169         error = dlm_nodeid_to_addr(nodeid, &addr);
170         if (error)
171                 return error;
172
173         if (dlm_local_addr[0]->ss_family == AF_INET) {
174                 struct sockaddr_in *in4  = (struct sockaddr_in *) &addr;
175                 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
176                 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
177         } else {
178                 struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &addr;
179                 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
180                 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
181                        sizeof(in6->sin6_addr));
182         }
183
184         return 0;
185 }
186
187 /* If alloc is 0 here we will not attempt to allocate a new
188    nodeinfo struct */
189 static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
190 {
191         struct nodeinfo *ni;
192         int r;
193         int n;
194
195         down_read(&nodeinfo_lock);
196         ni = idr_find(&nodeinfo_idr, nodeid);
197         up_read(&nodeinfo_lock);
198
199         if (ni || !alloc)
200                 return ni;
201
202         down_write(&nodeinfo_lock);
203
204         ni = idr_find(&nodeinfo_idr, nodeid);
205         if (ni)
206                 goto out_up;
207
208         r = idr_pre_get(&nodeinfo_idr, alloc);
209         if (!r)
210                 goto out_up;
211
212         ni = kmalloc(sizeof(struct nodeinfo), alloc);
213         if (!ni)
214                 goto out_up;
215
216         r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
217         if (r) {
218                 kfree(ni);
219                 ni = NULL;
220                 goto out_up;
221         }
222         if (n != nodeid) {
223                 idr_remove(&nodeinfo_idr, n);
224                 kfree(ni);
225                 ni = NULL;
226                 goto out_up;
227         }
228         memset(ni, 0, sizeof(struct nodeinfo));
229         spin_lock_init(&ni->lock);
230         INIT_LIST_HEAD(&ni->writequeue);
231         spin_lock_init(&ni->writequeue_lock);
232         INIT_WORK(&ni->lwork, process_lock_request);
233         INIT_WORK(&ni->swork, process_send_sockets);
234         ni->nodeid = nodeid;
235
236         if (nodeid > max_nodeid)
237                 max_nodeid = nodeid;
238 out_up:
239         up_write(&nodeinfo_lock);
240
241         return ni;
242 }
243
244 /* Don't call this too often... */
245 static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
246 {
247         int i;
248         struct nodeinfo *ni;
249
250         for (i=1; i<=max_nodeid; i++) {
251                 ni = nodeid2nodeinfo(i, 0);
252                 if (ni && ni->assoc_id == assoc)
253                         return ni;
254         }
255         return NULL;
256 }
257
258 /* Data or notification available on socket */
259 static void lowcomms_data_ready(struct sock *sk, int count_unused)
260 {
261         if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
262                 queue_work(recv_workqueue, &sctp_con.work);
263 }
264
265
266 /* Add the port number to an IP6 or 4 sockaddr and return the address length.
267    Also padd out the struct with zeros to make comparisons meaningful */
268
269 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
270                           int *addr_len)
271 {
272         struct sockaddr_in *local4_addr;
273         struct sockaddr_in6 *local6_addr;
274
275         if (!dlm_local_count)
276                 return;
277
278         if (!port) {
279                 if (dlm_local_addr[0]->ss_family == AF_INET) {
280                         local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
281                         port = be16_to_cpu(local4_addr->sin_port);
282                 } else {
283                         local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
284                         port = be16_to_cpu(local6_addr->sin6_port);
285                 }
286         }
287
288         saddr->ss_family = dlm_local_addr[0]->ss_family;
289         if (dlm_local_addr[0]->ss_family == AF_INET) {
290                 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
291                 in4_addr->sin_port = cpu_to_be16(port);
292                 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
293                 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
294                        sizeof(struct sockaddr_in));
295                 *addr_len = sizeof(struct sockaddr_in);
296         } else {
297                 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
298                 in6_addr->sin6_port = cpu_to_be16(port);
299                 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
300                        sizeof(struct sockaddr_in6));
301                 *addr_len = sizeof(struct sockaddr_in6);
302         }
303 }
304
305 /* Close the connection and tidy up */
306 static void close_connection(void)
307 {
308         if (sctp_con.sock) {
309                 sock_release(sctp_con.sock);
310                 sctp_con.sock = NULL;
311         }
312
313         if (sctp_con.rx_page) {
314                 __free_page(sctp_con.rx_page);
315                 sctp_con.rx_page = NULL;
316         }
317 }
318
319 /* We only send shutdown messages to nodes that are not part of the cluster */
320 static void send_shutdown(sctp_assoc_t associd)
321 {
322         static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
323         struct msghdr outmessage;
324         struct cmsghdr *cmsg;
325         struct sctp_sndrcvinfo *sinfo;
326         int ret;
327
328         outmessage.msg_name = NULL;
329         outmessage.msg_namelen = 0;
330         outmessage.msg_control = outcmsg;
331         outmessage.msg_controllen = sizeof(outcmsg);
332         outmessage.msg_flags = MSG_EOR;
333
334         cmsg = CMSG_FIRSTHDR(&outmessage);
335         cmsg->cmsg_level = IPPROTO_SCTP;
336         cmsg->cmsg_type = SCTP_SNDRCV;
337         cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
338         outmessage.msg_controllen = cmsg->cmsg_len;
339         sinfo = CMSG_DATA(cmsg);
340         memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
341
342         sinfo->sinfo_flags |= MSG_EOF;
343         sinfo->sinfo_assoc_id = associd;
344
345         ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
346
347         if (ret != 0)
348                 log_print("send EOF to node failed: %d", ret);
349 }
350
351
352 /* INIT failed but we don't know which node...
353    restart INIT on all pending nodes */
354 static void init_failed(void)
355 {
356         int i;
357         struct nodeinfo *ni;
358
359         for (i=1; i<=max_nodeid; i++) {
360                 ni = nodeid2nodeinfo(i, 0);
361                 if (!ni)
362                         continue;
363
364                 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
365                         ni->assoc_id = 0;
366                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
367                                 spin_lock_bh(&write_nodes_lock);
368                                 list_add_tail(&ni->write_list, &write_nodes);
369                                 spin_unlock_bh(&write_nodes_lock);
370                                 queue_work(send_workqueue, &ni->swork);
371                         }
372                 }
373         }
374 }
375
376 /* Something happened to an association */
377 static void process_sctp_notification(struct msghdr *msg, char *buf)
378 {
379         union sctp_notification *sn = (union sctp_notification *)buf;
380
381         if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
382                 switch (sn->sn_assoc_change.sac_state) {
383
384                 case SCTP_COMM_UP:
385                 case SCTP_RESTART:
386                 {
387                         /* Check that the new node is in the lockspace */
388                         struct sctp_prim prim;
389                         mm_segment_t fs;
390                         int nodeid;
391                         int prim_len, ret;
392                         int addr_len;
393                         struct nodeinfo *ni;
394
395                         /* This seems to happen when we received a connection
396                          * too early... or something...  anyway, it happens but
397                          * we always seem to get a real message too, see
398                          * receive_from_sock */
399
400                         if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
401                                 log_print("COMM_UP for invalid assoc ID %d",
402                                           (int)sn->sn_assoc_change.sac_assoc_id);
403                                 init_failed();
404                                 return;
405                         }
406                         memset(&prim, 0, sizeof(struct sctp_prim));
407                         prim_len = sizeof(struct sctp_prim);
408                         prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
409
410                         fs = get_fs();
411                         set_fs(get_ds());
412                         ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
413                                                              IPPROTO_SCTP,
414                                                              SCTP_PRIMARY_ADDR,
415                                                              (char*)&prim,
416                                                              &prim_len);
417                         set_fs(fs);
418                         if (ret < 0) {
419                                 struct nodeinfo *ni;
420
421                                 log_print("getsockopt/sctp_primary_addr on "
422                                           "new assoc %d failed : %d",
423                                           (int)sn->sn_assoc_change.sac_assoc_id,
424                                           ret);
425
426                                 /* Retry INIT later */
427                                 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
428                                 if (ni)
429                                         clear_bit(NI_INIT_PENDING, &ni->flags);
430                                 return;
431                         }
432                         make_sockaddr(&prim.ssp_addr, 0, &addr_len);
433                         if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
434                                 log_print("reject connect from unknown addr");
435                                 send_shutdown(prim.ssp_assoc_id);
436                                 return;
437                         }
438
439                         ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
440                         if (!ni)
441                                 return;
442
443                         /* Save the assoc ID */
444                         ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
445
446                         log_print("got new/restarted association %d nodeid %d",
447                                   (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
448
449                         /* Send any pending writes */
450                         clear_bit(NI_INIT_PENDING, &ni->flags);
451                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
452                                 spin_lock_bh(&write_nodes_lock);
453                                 list_add_tail(&ni->write_list, &write_nodes);
454                                 spin_unlock_bh(&write_nodes_lock);
455                                 queue_work(send_workqueue, &ni->swork);
456                         }
457                 }
458                 break;
459
460                 case SCTP_COMM_LOST:
461                 case SCTP_SHUTDOWN_COMP:
462                 {
463                         struct nodeinfo *ni;
464
465                         ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
466                         if (ni) {
467                                 spin_lock(&ni->lock);
468                                 ni->assoc_id = 0;
469                                 spin_unlock(&ni->lock);
470                         }
471                 }
472                 break;
473
474                 /* We don't know which INIT failed, so clear the PENDING flags
475                  * on them all.  if assoc_id is zero then it will then try
476                  * again */
477
478                 case SCTP_CANT_STR_ASSOC:
479                 {
480                         log_print("Can't start SCTP association - retrying");
481                         init_failed();
482                 }
483                 break;
484
485                 default:
486                         log_print("unexpected SCTP assoc change id=%d state=%d",
487                                   (int)sn->sn_assoc_change.sac_assoc_id,
488                                   sn->sn_assoc_change.sac_state);
489                 }
490         }
491 }
492
493 /* Data received from remote end */
494 static int receive_from_sock(void)
495 {
496         int ret = 0;
497         struct msghdr msg;
498         struct kvec iov[2];
499         unsigned len;
500         int r;
501         struct sctp_sndrcvinfo *sinfo;
502         struct cmsghdr *cmsg;
503         struct nodeinfo *ni;
504
505         /* These two are marginally too big for stack allocation, but this
506          * function is (currently) only called by dlm_recvd so static should be
507          * OK.
508          */
509         static struct sockaddr_storage msgname;
510         static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
511
512         if (sctp_con.sock == NULL)
513                 goto out;
514
515         if (sctp_con.rx_page == NULL) {
516                 /*
517                  * This doesn't need to be atomic, but I think it should
518                  * improve performance if it is.
519                  */
520                 sctp_con.rx_page = alloc_page(GFP_ATOMIC);
521                 if (sctp_con.rx_page == NULL)
522                         goto out_resched;
523                 cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
524         }
525
526         memset(&incmsg, 0, sizeof(incmsg));
527         memset(&msgname, 0, sizeof(msgname));
528
529         msg.msg_name = &msgname;
530         msg.msg_namelen = sizeof(msgname);
531         msg.msg_flags = 0;
532         msg.msg_control = incmsg;
533         msg.msg_controllen = sizeof(incmsg);
534         msg.msg_iovlen = 1;
535
536         /* I don't see why this circular buffer stuff is necessary for SCTP
537          * which is a packet-based protocol, but the whole thing breaks under
538          * load without it! The overhead is minimal (and is in the TCP lowcomms
539          * anyway, of course) so I'll leave it in until I can figure out what's
540          * really happening.
541          */
542
543         /*
544          * iov[0] is the bit of the circular buffer between the current end
545          * point (cb.base + cb.len) and the end of the buffer.
546          */
547         iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
548         iov[0].iov_base = page_address(sctp_con.rx_page) +
549                 cbuf_data(&sctp_con.cb);
550         iov[1].iov_len = 0;
551
552         /*
553          * iov[1] is the bit of the circular buffer between the start of the
554          * buffer and the start of the currently used section (cb.base)
555          */
556         if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
557                 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
558                 iov[1].iov_len = sctp_con.cb.base;
559                 iov[1].iov_base = page_address(sctp_con.rx_page);
560                 msg.msg_iovlen = 2;
561         }
562         len = iov[0].iov_len + iov[1].iov_len;
563
564         r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
565                                  MSG_NOSIGNAL | MSG_DONTWAIT);
566         if (ret <= 0)
567                 goto out_close;
568
569         msg.msg_control = incmsg;
570         msg.msg_controllen = sizeof(incmsg);
571         cmsg = CMSG_FIRSTHDR(&msg);
572         sinfo = CMSG_DATA(cmsg);
573
574         if (msg.msg_flags & MSG_NOTIFICATION) {
575                 process_sctp_notification(&msg, page_address(sctp_con.rx_page));
576                 return 0;
577         }
578
579         /* Is this a new association ? */
580         ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
581         if (ni) {
582                 ni->assoc_id = sinfo->sinfo_assoc_id;
583                 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
584
585                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
586                                 spin_lock_bh(&write_nodes_lock);
587                                 list_add_tail(&ni->write_list, &write_nodes);
588                                 spin_unlock_bh(&write_nodes_lock);
589                                 queue_work(send_workqueue, &ni->swork);
590                         }
591                 }
592         }
593
594         /* INIT sends a message with length of 1 - ignore it */
595         if (r == 1)
596                 return 0;
597
598         cbuf_add(&sctp_con.cb, ret);
599         // PJC: TODO: Add to node's workqueue....can we ??
600         ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
601                                           page_address(sctp_con.rx_page),
602                                           sctp_con.cb.base, sctp_con.cb.len,
603                                           PAGE_CACHE_SIZE);
604         if (ret < 0)
605                 goto out_close;
606         cbuf_eat(&sctp_con.cb, ret);
607
608 out:
609         ret = 0;
610         goto out_ret;
611
612 out_resched:
613         lowcomms_data_ready(sctp_con.sock->sk, 0);
614         ret = 0;
615         cond_resched();
616         goto out_ret;
617
618 out_close:
619         if (ret != -EAGAIN)
620                 log_print("error reading from sctp socket: %d", ret);
621 out_ret:
622         return ret;
623 }
624
625 /* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
626 static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
627 {
628         mm_segment_t fs;
629         int result = 0;
630
631         fs = get_fs();
632         set_fs(get_ds());
633         if (num == 1)
634                 result = sctp_con.sock->ops->bind(sctp_con.sock,
635                                                   (struct sockaddr *) addr,
636                                                   addr_len);
637         else
638                 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
639                                                         SCTP_SOCKOPT_BINDX_ADD,
640                                                         (char *)addr, addr_len);
641         set_fs(fs);
642
643         if (result < 0)
644                 log_print("Can't bind to port %d addr number %d",
645                           dlm_config.ci_tcp_port, num);
646
647         return result;
648 }
649
650 static void init_local(void)
651 {
652         struct sockaddr_storage sas, *addr;
653         int i;
654
655         dlm_local_nodeid = dlm_our_nodeid();
656
657         for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
658                 if (dlm_our_addr(&sas, i))
659                         break;
660
661                 addr = kmalloc(sizeof(*addr), GFP_KERNEL);
662                 if (!addr)
663                         break;
664                 memcpy(addr, &sas, sizeof(*addr));
665                 dlm_local_addr[dlm_local_count++] = addr;
666         }
667 }
668
669 /* Initialise SCTP socket and bind to all interfaces */
670 static int init_sock(void)
671 {
672         mm_segment_t fs;
673         struct socket *sock = NULL;
674         struct sockaddr_storage localaddr;
675         struct sctp_event_subscribe subscribe;
676         int result = -EINVAL, num = 1, i, addr_len;
677
678         if (!dlm_local_count) {
679                 init_local();
680                 if (!dlm_local_count) {
681                         log_print("no local IP address has been set");
682                         goto out;
683                 }
684         }
685
686         result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
687                                   IPPROTO_SCTP, &sock);
688         if (result < 0) {
689                 log_print("Can't create comms socket, check SCTP is loaded");
690                 goto out;
691         }
692
693         /* Listen for events */
694         memset(&subscribe, 0, sizeof(subscribe));
695         subscribe.sctp_data_io_event = 1;
696         subscribe.sctp_association_event = 1;
697         subscribe.sctp_send_failure_event = 1;
698         subscribe.sctp_shutdown_event = 1;
699         subscribe.sctp_partial_delivery_event = 1;
700
701         fs = get_fs();
702         set_fs(get_ds());
703         result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
704                                        (char *)&subscribe, sizeof(subscribe));
705         set_fs(fs);
706
707         if (result < 0) {
708                 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
709                           result);
710                 goto create_delsock;
711         }
712
713         /* Init con struct */
714         sock->sk->sk_user_data = &sctp_con;
715         sctp_con.sock = sock;
716         sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;
717
718         /* Bind to all interfaces. */
719         for (i = 0; i < dlm_local_count; i++) {
720                 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
721                 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
722
723                 result = add_bind_addr(&localaddr, addr_len, num);
724                 if (result)
725                         goto create_delsock;
726                 ++num;
727         }
728
729         result = sock->ops->listen(sock, 5);
730         if (result < 0) {
731                 log_print("Can't set socket listening");
732                 goto create_delsock;
733         }
734
735         return 0;
736
737 create_delsock:
738         sock_release(sock);
739         sctp_con.sock = NULL;
740 out:
741         return result;
742 }
743
744
745 static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
746 {
747         struct writequeue_entry *entry;
748
749         entry = kmalloc(sizeof(struct writequeue_entry), allocation);
750         if (!entry)
751                 return NULL;
752
753         entry->page = alloc_page(allocation);
754         if (!entry->page) {
755                 kfree(entry);
756                 return NULL;
757         }
758
759         entry->offset = 0;
760         entry->len = 0;
761         entry->end = 0;
762         entry->users = 0;
763
764         return entry;
765 }
766
767 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
768 {
769         struct writequeue_entry *e;
770         int offset = 0;
771         int users = 0;
772         struct nodeinfo *ni;
773
774         ni = nodeid2nodeinfo(nodeid, allocation);
775         if (!ni)
776                 return NULL;
777
778         spin_lock(&ni->writequeue_lock);
779         e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
780         if ((&e->list == &ni->writequeue) ||
781             (PAGE_CACHE_SIZE - e->end < len)) {
782                 e = NULL;
783         } else {
784                 offset = e->end;
785                 e->end += len;
786                 users = e->users++;
787         }
788         spin_unlock(&ni->writequeue_lock);
789
790         if (e) {
791         got_one:
792                 if (users == 0)
793                         kmap(e->page);
794                 *ppc = page_address(e->page) + offset;
795                 return e;
796         }
797
798         e = new_writequeue_entry(allocation);
799         if (e) {
800                 spin_lock(&ni->writequeue_lock);
801                 offset = e->end;
802                 e->end += len;
803                 e->ni = ni;
804                 users = e->users++;
805                 list_add_tail(&e->list, &ni->writequeue);
806                 spin_unlock(&ni->writequeue_lock);
807                 goto got_one;
808         }
809         return NULL;
810 }
811
812 void dlm_lowcomms_commit_buffer(void *arg)
813 {
814         struct writequeue_entry *e = (struct writequeue_entry *) arg;
815         int users;
816         struct nodeinfo *ni = e->ni;
817
818         spin_lock(&ni->writequeue_lock);
819         users = --e->users;
820         if (users)
821                 goto out;
822         e->len = e->end - e->offset;
823         kunmap(e->page);
824         spin_unlock(&ni->writequeue_lock);
825
826         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
827                 spin_lock_bh(&write_nodes_lock);
828                 list_add_tail(&ni->write_list, &write_nodes);
829                 spin_unlock_bh(&write_nodes_lock);
830
831                 queue_work(send_workqueue, &ni->swork);
832         }
833         return;
834
835 out:
836         spin_unlock(&ni->writequeue_lock);
837         return;
838 }
839
840 static void free_entry(struct writequeue_entry *e)
841 {
842         __free_page(e->page);
843         kfree(e);
844 }
845
846 /* Initiate an SCTP association. In theory we could just use sendmsg() on
847    the first IP address and it should work, but this allows us to set up the
848    association before sending any valuable data that we can't afford to lose.
849    It also keeps the send path clean as it can now always use the association ID */
850 static void initiate_association(int nodeid)
851 {
852         struct sockaddr_storage rem_addr;
853         static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
854         struct msghdr outmessage;
855         struct cmsghdr *cmsg;
856         struct sctp_sndrcvinfo *sinfo;
857         int ret;
858         int addrlen;
859         char buf[1];
860         struct kvec iov[1];
861         struct nodeinfo *ni;
862
863         log_print("Initiating association with node %d", nodeid);
864
865         ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
866         if (!ni)
867                 return;
868
869         if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
870                 log_print("no address for nodeid %d", nodeid);
871                 return;
872         }
873
874         make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
875
876         outmessage.msg_name = &rem_addr;
877         outmessage.msg_namelen = addrlen;
878         outmessage.msg_control = outcmsg;
879         outmessage.msg_controllen = sizeof(outcmsg);
880         outmessage.msg_flags = MSG_EOR;
881
882         iov[0].iov_base = buf;
883         iov[0].iov_len = 1;
884
885         /* Real INIT messages seem to cause trouble. Just send a 1 byte message
886            we can afford to lose */
887         cmsg = CMSG_FIRSTHDR(&outmessage);
888         cmsg->cmsg_level = IPPROTO_SCTP;
889         cmsg->cmsg_type = SCTP_SNDRCV;
890         cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
891         sinfo = CMSG_DATA(cmsg);
892         memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
893         sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
894
895         outmessage.msg_controllen = cmsg->cmsg_len;
896         ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
897         if (ret < 0) {
898                 log_print("send INIT to node failed: %d", ret);
899                 /* Try again later */
900                 clear_bit(NI_INIT_PENDING, &ni->flags);
901         }
902 }
903
904 /* Send a message */
905 static void send_to_sock(struct nodeinfo *ni)
906 {
907         int ret = 0;
908         struct writequeue_entry *e;
909         int len, offset;
910         struct msghdr outmsg;
911         static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
912         struct cmsghdr *cmsg;
913         struct sctp_sndrcvinfo *sinfo;
914         struct kvec iov;
915
916         /* See if we need to init an association before we start
917            sending precious messages */
918         spin_lock(&ni->lock);
919         if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
920                 spin_unlock(&ni->lock);
921                 initiate_association(ni->nodeid);
922                 return;
923         }
924         spin_unlock(&ni->lock);
925
926         outmsg.msg_name = NULL; /* We use assoc_id */
927         outmsg.msg_namelen = 0;
928         outmsg.msg_control = outcmsg;
929         outmsg.msg_controllen = sizeof(outcmsg);
930         outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
931
932         cmsg = CMSG_FIRSTHDR(&outmsg);
933         cmsg->cmsg_level = IPPROTO_SCTP;
934         cmsg->cmsg_type = SCTP_SNDRCV;
935         cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
936         sinfo = CMSG_DATA(cmsg);
937         memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
938         sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
939         sinfo->sinfo_assoc_id = ni->assoc_id;
940         outmsg.msg_controllen = cmsg->cmsg_len;
941
942         spin_lock(&ni->writequeue_lock);
943         for (;;) {
944                 if (list_empty(&ni->writequeue))
945                         break;
946                 e = list_entry(ni->writequeue.next, struct writequeue_entry,
947                                list);
948                 len = e->len;
949                 offset = e->offset;
950                 BUG_ON(len == 0 && e->users == 0);
951                 spin_unlock(&ni->writequeue_lock);
952                 kmap(e->page);
953
954                 ret = 0;
955                 if (len) {
956                         iov.iov_base = page_address(e->page)+offset;
957                         iov.iov_len = len;
958
959                         ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
960                                              len);
961                         if (ret == -EAGAIN) {
962                                 sctp_con.eagain_flag = 1;
963                                 goto out;
964                         } else if (ret < 0)
965                                 goto send_error;
966                 } else {
967                         /* Don't starve people filling buffers */
968                         cond_resched();
969                 }
970
971                 spin_lock(&ni->writequeue_lock);
972                 e->offset += ret;
973                 e->len -= ret;
974
975                 if (e->len == 0 && e->users == 0) {
976                         list_del(&e->list);
977                         kunmap(e->page);
978                         free_entry(e);
979                         continue;
980                 }
981         }
982         spin_unlock(&ni->writequeue_lock);
983 out:
984         return;
985
986 send_error:
987         log_print("Error sending to node %d %d", ni->nodeid, ret);
988         spin_lock(&ni->lock);
989         if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
990                 ni->assoc_id = 0;
991                 spin_unlock(&ni->lock);
992                 initiate_association(ni->nodeid);
993         } else
994                 spin_unlock(&ni->lock);
995
996         return;
997 }
998
999 /* Try to send any messages that are pending */
1000 static void process_output_queue(void)
1001 {
1002         struct list_head *list;
1003         struct list_head *temp;
1004
1005         spin_lock_bh(&write_nodes_lock);
1006         list_for_each_safe(list, temp, &write_nodes) {
1007                 struct nodeinfo *ni =
1008                         list_entry(list, struct nodeinfo, write_list);
1009                 clear_bit(NI_WRITE_PENDING, &ni->flags);
1010                 list_del(&ni->write_list);
1011
1012                 spin_unlock_bh(&write_nodes_lock);
1013
1014                 send_to_sock(ni);
1015                 spin_lock_bh(&write_nodes_lock);
1016         }
1017         spin_unlock_bh(&write_nodes_lock);
1018 }
1019
1020 /* Called after we've had -EAGAIN and been woken up */
1021 static void refill_write_queue(void)
1022 {
1023         int i;
1024
1025         for (i=1; i<=max_nodeid; i++) {
1026                 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1027
1028                 if (ni) {
1029                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
1030                                 spin_lock_bh(&write_nodes_lock);
1031                                 list_add_tail(&ni->write_list, &write_nodes);
1032                                 spin_unlock_bh(&write_nodes_lock);
1033                         }
1034                 }
1035         }
1036 }
1037
1038 static void clean_one_writequeue(struct nodeinfo *ni)
1039 {
1040         struct list_head *list;
1041         struct list_head *temp;
1042
1043         spin_lock(&ni->writequeue_lock);
1044         list_for_each_safe(list, temp, &ni->writequeue) {
1045                 struct writequeue_entry *e =
1046                         list_entry(list, struct writequeue_entry, list);
1047                 list_del(&e->list);
1048                 free_entry(e);
1049         }
1050         spin_unlock(&ni->writequeue_lock);
1051 }
1052
1053 static void clean_writequeues(void)
1054 {
1055         int i;
1056
1057         for (i=1; i<=max_nodeid; i++) {
1058                 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1059                 if (ni)
1060                         clean_one_writequeue(ni);
1061         }
1062 }
1063
1064
1065 static void dealloc_nodeinfo(void)
1066 {
1067         int i;
1068
1069         for (i=1; i<=max_nodeid; i++) {
1070                 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1071                 if (ni) {
1072                         idr_remove(&nodeinfo_idr, i);
1073                         kfree(ni);
1074                 }
1075         }
1076 }
1077
1078 int dlm_lowcomms_close(int nodeid)
1079 {
1080         struct nodeinfo *ni;
1081
1082         ni = nodeid2nodeinfo(nodeid, 0);
1083         if (!ni)
1084                 return -1;
1085
1086         spin_lock(&ni->lock);
1087         if (ni->assoc_id) {
1088                 ni->assoc_id = 0;
1089                 /* Don't send shutdown here, sctp will just queue it
1090                    till the node comes back up! */
1091         }
1092         spin_unlock(&ni->lock);
1093
1094         clean_one_writequeue(ni);
1095         clear_bit(NI_INIT_PENDING, &ni->flags);
1096         return 0;
1097 }
1098
1099 // PJC: The work queue function for receiving.
1100 static void process_recv_sockets(struct work_struct *work)
1101 {
1102         if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
1103                 int ret;
1104                 int count = 0;
1105
1106                 do {
1107                         ret = receive_from_sock();
1108
1109                         /* Don't starve out everyone else */
1110                         if (++count >= MAX_RX_MSG_COUNT) {
1111                                 cond_resched();
1112                                 count = 0;
1113                         }
1114                 } while (!kthread_should_stop() && ret >=0);
1115         }
1116         cond_resched();
1117 }
1118
1119 // PJC: the work queue function for sending
1120 static void process_send_sockets(struct work_struct *work)
1121 {
1122         if (sctp_con.eagain_flag) {
1123                 sctp_con.eagain_flag = 0;
1124                 refill_write_queue();
1125         }
1126         process_output_queue();
1127 }
1128
/*
 * Placeholder work function, intended (per the original author's note) to
 * process lock requests from a particular node.  It currently does nothing.
 *
 * TODO(PJC): can we optimise this out on UP?
 */
static void process_lock_request(struct work_struct *work)
{
	/* Intentionally empty. */
}
1134
1135 static void daemons_stop(void)
1136 {
1137         destroy_workqueue(recv_workqueue);
1138         destroy_workqueue(send_workqueue);
1139         destroy_workqueue(lock_workqueue);
1140 }
1141
1142 static int daemons_start(void)
1143 {
1144         int error;
1145         recv_workqueue = create_workqueue("dlm_recv");
1146         error = IS_ERR(recv_workqueue);
1147         if (error) {
1148                 log_print("can't start dlm_recv %d", error);
1149                 return error;
1150         }
1151
1152         send_workqueue = create_singlethread_workqueue("dlm_send");
1153         error = IS_ERR(send_workqueue);
1154         if (error) {
1155                 log_print("can't start dlm_send %d", error);
1156                 destroy_workqueue(recv_workqueue);
1157                 return error;
1158         }
1159
1160         lock_workqueue = create_workqueue("dlm_rlock");
1161         error = IS_ERR(lock_workqueue);
1162         if (error) {
1163                 log_print("can't start dlm_rlock %d", error);
1164                 destroy_workqueue(send_workqueue);
1165                 destroy_workqueue(recv_workqueue);
1166                 return error;
1167         }
1168
1169         return 0;
1170 }
1171
1172 /*
1173  * This is quite likely to sleep...
1174  */
1175 int dlm_lowcomms_start(void)
1176 {
1177         int error;
1178
1179         INIT_WORK(&sctp_con.work, process_recv_sockets);
1180
1181         error = init_sock();
1182         if (error)
1183                 goto fail_sock;
1184         error = daemons_start();
1185         if (error)
1186                 goto fail_sock;
1187         return 0;
1188
1189 fail_sock:
1190         close_connection();
1191         return error;
1192 }
1193
1194 void dlm_lowcomms_stop(void)
1195 {
1196         int i;
1197
1198         sctp_con.flags = 0x7;
1199         daemons_stop();
1200         clean_writequeues();
1201         close_connection();
1202         dealloc_nodeinfo();
1203         max_nodeid = 0;
1204
1205         dlm_local_count = 0;
1206         dlm_local_nodeid = 0;
1207
1208         for (i = 0; i < dlm_local_count; i++)
1209                 kfree(dlm_local_addr[i]);
1210 }