This commit was manufactured by cvs2svn to create branch 'vserver'.
[linux-2.6.git] / drivers / xen / xenbus / xenbus_xs.c
1 /******************************************************************************
2  * xenbus_xs.c
3  *
4  * This is the kernel equivalent of the "xs" library.  We don't need everything
5  * and we use xenbus_comms for communication.
6  *
7  * Copyright (C) 2005 Rusty Russell, IBM Corporation
8  * 
9  * This program is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU General Public License version 2
11  * as published by the Free Software Foundation; or, when distributed
12  * separately from the Linux kernel or incorporated into other
13  * software packages, subject to the following license:
14  * 
15  * Permission is hereby granted, free of charge, to any person obtaining a copy
16  * of this source file (the "Software"), to deal in the Software without
17  * restriction, including without limitation the rights to use, copy, modify,
18  * merge, publish, distribute, sublicense, and/or sell copies of the Software,
19  * and to permit persons to whom the Software is furnished to do so, subject to
20  * the following conditions:
21  * 
22  * The above copyright notice and this permission notice shall be included in
23  * all copies or substantial portions of the Software.
24  * 
25  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
26  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
27  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
28  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
29  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
30  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
31  * IN THE SOFTWARE.
32  */
33
34 #include <linux/unistd.h>
35 #include <linux/errno.h>
36 #include <linux/types.h>
37 #include <linux/uio.h>
38 #include <linux/kernel.h>
39 #include <linux/string.h>
40 #include <linux/err.h>
41 #include <linux/slab.h>
42 #include <linux/fcntl.h>
43 #include <linux/kthread.h>
44 #include <linux/rwsem.h>
45 #include <xen/xenbus.h>
46 #include "xenbus_comms.h"
47
48 /* xenbus_probe.c */
49 extern char *kasprintf(const char *fmt, ...);
50
51 struct xs_stored_msg {
52         struct list_head list;
53
54         struct xsd_sockmsg hdr;
55
56         union {
57                 /* Queued replies. */
58                 struct {
59                         char *body;
60                 } reply;
61
62                 /* Queued watch events. */
63                 struct {
64                         struct xenbus_watch *handle;
65                         char **vec;
66                         unsigned int vec_size;
67                 } watch;
68         } u;
69 };
70
71 struct xs_handle {
72         /* A list of replies. Currently only one will ever be outstanding. */
73         struct list_head reply_list;
74         spinlock_t reply_lock;
75         wait_queue_head_t reply_waitq;
76
77         /* One request at a time. */
78         struct mutex request_mutex;
79
80         /* Protect transactions against save/restore. */
81         struct rw_semaphore suspend_mutex;
82 };
83
84 static struct xs_handle xs_state;
85
86 /* List of registered watches, and a lock to protect it. */
87 static LIST_HEAD(watches);
88 static DEFINE_SPINLOCK(watches_lock);
89
90 /* List of pending watch callback events, and a lock to protect it. */
91 static LIST_HEAD(watch_events);
92 static DEFINE_SPINLOCK(watch_events_lock);
93
94 /*
95  * Details of the xenwatch callback kernel thread. The thread waits on the
96  * watch_events_waitq for work to do (queued on watch_events list). When it
97  * wakes up it acquires the xenwatch_mutex before reading the list and
98  * carrying out work.
99  */
100 static pid_t xenwatch_pid;
101 /* static */ DEFINE_MUTEX(xenwatch_mutex);
102 static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq);
103
104 static int get_error(const char *errorstring)
105 {
106         unsigned int i;
107
108         for (i = 0; strcmp(errorstring, xsd_errors[i].errstring) != 0; i++) {
109                 if (i == ARRAY_SIZE(xsd_errors) - 1) {
110                         printk(KERN_WARNING
111                                "XENBUS xen store gave: unknown error %s",
112                                errorstring);
113                         return EINVAL;
114                 }
115         }
116         return xsd_errors[i].errnum;
117 }
118
119 static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
120 {
121         struct xs_stored_msg *msg;
122         char *body;
123
124         spin_lock(&xs_state.reply_lock);
125
126         while (list_empty(&xs_state.reply_list)) {
127                 spin_unlock(&xs_state.reply_lock);
128                 /* XXX FIXME: Avoid synchronous wait for response here. */
129                 wait_event(xs_state.reply_waitq,
130                            !list_empty(&xs_state.reply_list));
131                 spin_lock(&xs_state.reply_lock);
132         }
133
134         msg = list_entry(xs_state.reply_list.next,
135                          struct xs_stored_msg, list);
136         list_del(&msg->list);
137
138         spin_unlock(&xs_state.reply_lock);
139
140         *type = msg->hdr.type;
141         if (len)
142                 *len = msg->hdr.len;
143         body = msg->u.reply.body;
144
145         kfree(msg);
146
147         return body;
148 }
149
150 /* Emergency write. */
151 void xenbus_debug_write(const char *str, unsigned int count)
152 {
153         struct xsd_sockmsg msg = { 0 };
154
155         msg.type = XS_DEBUG;
156         msg.len = sizeof("print") + count + 1;
157
158         mutex_lock(&xs_state.request_mutex);
159         xb_write(&msg, sizeof(msg));
160         xb_write("print", sizeof("print"));
161         xb_write(str, count);
162         xb_write("", 1);
163         mutex_unlock(&xs_state.request_mutex);
164 }
165
166 void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
167 {
168         void *ret;
169         struct xsd_sockmsg req_msg = *msg;
170         int err;
171
172         if (req_msg.type == XS_TRANSACTION_START)
173                 down_read(&xs_state.suspend_mutex);
174
175         mutex_lock(&xs_state.request_mutex);
176
177         err = xb_write(msg, sizeof(*msg) + msg->len);
178         if (err) {
179                 msg->type = XS_ERROR;
180                 ret = ERR_PTR(err);
181         } else
182                 ret = read_reply(&msg->type, &msg->len);
183
184         mutex_unlock(&xs_state.request_mutex);
185
186         if ((req_msg.type == XS_TRANSACTION_END) ||
187             ((req_msg.type == XS_TRANSACTION_START) &&
188              (msg->type == XS_ERROR)))
189                 up_read(&xs_state.suspend_mutex);
190
191         return ret;
192 }
193
194 /* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
195 static void *xs_talkv(struct xenbus_transaction t,
196                       enum xsd_sockmsg_type type,
197                       const struct kvec *iovec,
198                       unsigned int num_vecs,
199                       unsigned int *len)
200 {
201         struct xsd_sockmsg msg;
202         void *ret = NULL;
203         unsigned int i;
204         int err;
205
206         msg.tx_id = t.id;
207         msg.req_id = 0;
208         msg.type = type;
209         msg.len = 0;
210         for (i = 0; i < num_vecs; i++)
211                 msg.len += iovec[i].iov_len;
212
213         mutex_lock(&xs_state.request_mutex);
214
215         err = xb_write(&msg, sizeof(msg));
216         if (err) {
217                 mutex_unlock(&xs_state.request_mutex);
218                 return ERR_PTR(err);
219         }
220
221         for (i = 0; i < num_vecs; i++) {
222                 err = xb_write(iovec[i].iov_base, iovec[i].iov_len);;
223                 if (err) {
224                         mutex_unlock(&xs_state.request_mutex);
225                         return ERR_PTR(err);
226                 }
227         }
228
229         ret = read_reply(&msg.type, len);
230
231         mutex_unlock(&xs_state.request_mutex);
232
233         if (IS_ERR(ret))
234                 return ret;
235
236         if (msg.type == XS_ERROR) {
237                 err = get_error(ret);
238                 kfree(ret);
239                 return ERR_PTR(-err);
240         }
241
242         if (msg.type != type) {
243                 if (printk_ratelimit())
244                         printk(KERN_WARNING
245                                "XENBUS unexpected type [%d], expected [%d]\n",
246                                msg.type, type);
247                 kfree(ret);
248                 return ERR_PTR(-EINVAL);
249         }
250         return ret;
251 }
252
253 /* Simplified version of xs_talkv: single message. */
254 static void *xs_single(struct xenbus_transaction t,
255                        enum xsd_sockmsg_type type,
256                        const char *string,
257                        unsigned int *len)
258 {
259         struct kvec iovec;
260
261         iovec.iov_base = (void *)string;
262         iovec.iov_len = strlen(string) + 1;
263         return xs_talkv(t, type, &iovec, 1, len);
264 }
265
266 /* Many commands only need an ack, don't care what it says. */
267 static int xs_error(char *reply)
268 {
269         if (IS_ERR(reply))
270                 return PTR_ERR(reply);
271         kfree(reply);
272         return 0;
273 }
274
275 static unsigned int count_strings(const char *strings, unsigned int len)
276 {
277         unsigned int num;
278         const char *p;
279
280         for (p = strings, num = 0; p < strings + len; p += strlen(p) + 1)
281                 num++;
282
283         return num;
284 }
285
286 /* Return the path to dir with /name appended. Buffer must be kfree()'ed. */
287 static char *join(const char *dir, const char *name)
288 {
289         char *buffer;
290
291         if (strlen(name) == 0)
292                 buffer = kasprintf("%s", dir);
293         else
294                 buffer = kasprintf("%s/%s", dir, name);
295         return (!buffer) ? ERR_PTR(-ENOMEM) : buffer;
296 }
297
298 static char **split(char *strings, unsigned int len, unsigned int *num)
299 {
300         char *p, **ret;
301
302         /* Count the strings. */
303         *num = count_strings(strings, len);
304
305         /* Transfer to one big alloc for easy freeing. */
306         ret = kmalloc(*num * sizeof(char *) + len, GFP_KERNEL);
307         if (!ret) {
308                 kfree(strings);
309                 return ERR_PTR(-ENOMEM);
310         }
311         memcpy(&ret[*num], strings, len);
312         kfree(strings);
313
314         strings = (char *)&ret[*num];
315         for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
316                 ret[(*num)++] = p;
317
318         return ret;
319 }
320
321 char **xenbus_directory(struct xenbus_transaction t,
322                         const char *dir, const char *node, unsigned int *num)
323 {
324         char *strings, *path;
325         unsigned int len;
326
327         path = join(dir, node);
328         if (IS_ERR(path))
329                 return (char **)path;
330
331         strings = xs_single(t, XS_DIRECTORY, path, &len);
332         kfree(path);
333         if (IS_ERR(strings))
334                 return (char **)strings;
335
336         return split(strings, len, num);
337 }
338 EXPORT_SYMBOL_GPL(xenbus_directory);
339
340 /* Check if a path exists. Return 1 if it does. */
341 int xenbus_exists(struct xenbus_transaction t,
342                   const char *dir, const char *node)
343 {
344         char **d;
345         int dir_n;
346
347         d = xenbus_directory(t, dir, node, &dir_n);
348         if (IS_ERR(d))
349                 return 0;
350         kfree(d);
351         return 1;
352 }
353 EXPORT_SYMBOL_GPL(xenbus_exists);
354
355 /* Get the value of a single file.
356  * Returns a kmalloced value: call free() on it after use.
357  * len indicates length in bytes.
358  */
359 void *xenbus_read(struct xenbus_transaction t,
360                   const char *dir, const char *node, unsigned int *len)
361 {
362         char *path;
363         void *ret;
364
365         path = join(dir, node);
366         if (IS_ERR(path))
367                 return (void *)path;
368
369         ret = xs_single(t, XS_READ, path, len);
370         kfree(path);
371         return ret;
372 }
373 EXPORT_SYMBOL_GPL(xenbus_read);
374
375 /* Write the value of a single file.
376  * Returns -err on failure.
377  */
378 int xenbus_write(struct xenbus_transaction t,
379                  const char *dir, const char *node, const char *string)
380 {
381         const char *path;
382         struct kvec iovec[2];
383         int ret;
384
385         path = join(dir, node);
386         if (IS_ERR(path))
387                 return PTR_ERR(path);
388
389         iovec[0].iov_base = (void *)path;
390         iovec[0].iov_len = strlen(path) + 1;
391         iovec[1].iov_base = (void *)string;
392         iovec[1].iov_len = strlen(string);
393
394         ret = xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
395         kfree(path);
396         return ret;
397 }
398 EXPORT_SYMBOL_GPL(xenbus_write);
399
400 /* Create a new directory. */
401 int xenbus_mkdir(struct xenbus_transaction t,
402                  const char *dir, const char *node)
403 {
404         char *path;
405         int ret;
406
407         path = join(dir, node);
408         if (IS_ERR(path))
409                 return PTR_ERR(path);
410
411         ret = xs_error(xs_single(t, XS_MKDIR, path, NULL));
412         kfree(path);
413         return ret;
414 }
415 EXPORT_SYMBOL_GPL(xenbus_mkdir);
416
417 /* Destroy a file or directory (directories must be empty). */
418 int xenbus_rm(struct xenbus_transaction t, const char *dir, const char *node)
419 {
420         char *path;
421         int ret;
422
423         path = join(dir, node);
424         if (IS_ERR(path))
425                 return PTR_ERR(path);
426
427         ret = xs_error(xs_single(t, XS_RM, path, NULL));
428         kfree(path);
429         return ret;
430 }
431 EXPORT_SYMBOL_GPL(xenbus_rm);
432
433 /* Start a transaction: changes by others will not be seen during this
434  * transaction, and changes will not be visible to others until end.
435  */
436 int xenbus_transaction_start(struct xenbus_transaction *t)
437 {
438         char *id_str;
439
440         down_read(&xs_state.suspend_mutex);
441
442         id_str = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL);
443         if (IS_ERR(id_str)) {
444                 up_read(&xs_state.suspend_mutex);
445                 return PTR_ERR(id_str);
446         }
447
448         t->id = simple_strtoul(id_str, NULL, 0);
449         kfree(id_str);
450         return 0;
451 }
452 EXPORT_SYMBOL_GPL(xenbus_transaction_start);
453
454 /* End a transaction.
455  * If abandon is true, transaction is discarded instead of committed.
456  */
457 int xenbus_transaction_end(struct xenbus_transaction t, int abort)
458 {
459         char abortstr[2];
460         int err;
461
462         if (abort)
463                 strcpy(abortstr, "F");
464         else
465                 strcpy(abortstr, "T");
466
467         err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
468
469         up_read(&xs_state.suspend_mutex);
470
471         return err;
472 }
473 EXPORT_SYMBOL_GPL(xenbus_transaction_end);
474
475 /* Single read and scanf: returns -errno or num scanned. */
476 int xenbus_scanf(struct xenbus_transaction t,
477                  const char *dir, const char *node, const char *fmt, ...)
478 {
479         va_list ap;
480         int ret;
481         char *val;
482
483         val = xenbus_read(t, dir, node, NULL);
484         if (IS_ERR(val))
485                 return PTR_ERR(val);
486
487         va_start(ap, fmt);
488         ret = vsscanf(val, fmt, ap);
489         va_end(ap);
490         kfree(val);
491         /* Distinctive errno. */
492         if (ret == 0)
493                 return -ERANGE;
494         return ret;
495 }
496 EXPORT_SYMBOL_GPL(xenbus_scanf);
497
498 /* Single printf and write: returns -errno or 0. */
499 int xenbus_printf(struct xenbus_transaction t,
500                   const char *dir, const char *node, const char *fmt, ...)
501 {
502         va_list ap;
503         int ret;
504 #define PRINTF_BUFFER_SIZE 4096
505         char *printf_buffer;
506
507         printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL);
508         if (printf_buffer == NULL)
509                 return -ENOMEM;
510
511         va_start(ap, fmt);
512         ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
513         va_end(ap);
514
515         BUG_ON(ret > PRINTF_BUFFER_SIZE-1);
516         ret = xenbus_write(t, dir, node, printf_buffer);
517
518         kfree(printf_buffer);
519
520         return ret;
521 }
522 EXPORT_SYMBOL_GPL(xenbus_printf);
523
524 /* Takes tuples of names, scanf-style args, and void **, NULL terminated. */
525 int xenbus_gather(struct xenbus_transaction t, const char *dir, ...)
526 {
527         va_list ap;
528         const char *name;
529         int ret = 0;
530
531         va_start(ap, dir);
532         while (ret == 0 && (name = va_arg(ap, char *)) != NULL) {
533                 const char *fmt = va_arg(ap, char *);
534                 void *result = va_arg(ap, void *);
535                 char *p;
536
537                 p = xenbus_read(t, dir, name, NULL);
538                 if (IS_ERR(p)) {
539                         ret = PTR_ERR(p);
540                         break;
541                 }
542                 if (fmt) {
543                         if (sscanf(p, fmt, result) == 0)
544                                 ret = -EINVAL;
545                         kfree(p);
546                 } else
547                         *(char **)result = p;
548         }
549         va_end(ap);
550         return ret;
551 }
552 EXPORT_SYMBOL_GPL(xenbus_gather);
553
554 static int xs_watch(const char *path, const char *token)
555 {
556         struct kvec iov[2];
557
558         iov[0].iov_base = (void *)path;
559         iov[0].iov_len = strlen(path) + 1;
560         iov[1].iov_base = (void *)token;
561         iov[1].iov_len = strlen(token) + 1;
562
563         return xs_error(xs_talkv(XBT_NIL, XS_WATCH, iov,
564                                  ARRAY_SIZE(iov), NULL));
565 }
566
567 static int xs_unwatch(const char *path, const char *token)
568 {
569         struct kvec iov[2];
570
571         iov[0].iov_base = (char *)path;
572         iov[0].iov_len = strlen(path) + 1;
573         iov[1].iov_base = (char *)token;
574         iov[1].iov_len = strlen(token) + 1;
575
576         return xs_error(xs_talkv(XBT_NIL, XS_UNWATCH, iov,
577                                  ARRAY_SIZE(iov), NULL));
578 }
579
580 static struct xenbus_watch *find_watch(const char *token)
581 {
582         struct xenbus_watch *i, *cmp;
583
584         cmp = (void *)simple_strtoul(token, NULL, 16);
585
586         list_for_each_entry(i, &watches, list)
587                 if (i == cmp)
588                         return i;
589
590         return NULL;
591 }
592
593 /* Register callback to watch this node. */
594 int register_xenbus_watch(struct xenbus_watch *watch)
595 {
596         /* Pointer in ascii is the token. */
597         char token[sizeof(watch) * 2 + 1];
598         int err;
599
600         sprintf(token, "%lX", (long)watch);
601
602         down_read(&xs_state.suspend_mutex);
603
604         spin_lock(&watches_lock);
605         BUG_ON(find_watch(token));
606         list_add(&watch->list, &watches);
607         spin_unlock(&watches_lock);
608
609         err = xs_watch(watch->node, token);
610
611         /* Ignore errors due to multiple registration. */
612         if ((err != 0) && (err != -EEXIST)) {
613                 spin_lock(&watches_lock);
614                 list_del(&watch->list);
615                 spin_unlock(&watches_lock);
616         }
617
618         up_read(&xs_state.suspend_mutex);
619
620         return err;
621 }
622 EXPORT_SYMBOL_GPL(register_xenbus_watch);
623
624 void unregister_xenbus_watch(struct xenbus_watch *watch)
625 {
626         struct xs_stored_msg *msg, *tmp;
627         char token[sizeof(watch) * 2 + 1];
628         int err;
629
630         sprintf(token, "%lX", (long)watch);
631
632         down_read(&xs_state.suspend_mutex);
633
634         spin_lock(&watches_lock);
635         BUG_ON(!find_watch(token));
636         list_del(&watch->list);
637         spin_unlock(&watches_lock);
638
639         err = xs_unwatch(watch->node, token);
640         if (err)
641                 printk(KERN_WARNING
642                        "XENBUS Failed to release watch %s: %i\n",
643                        watch->node, err);
644
645         up_read(&xs_state.suspend_mutex);
646
647         /* Cancel pending watch events. */
648         spin_lock(&watch_events_lock);
649         list_for_each_entry_safe(msg, tmp, &watch_events, list) {
650                 if (msg->u.watch.handle != watch)
651                         continue;
652                 list_del(&msg->list);
653                 kfree(msg->u.watch.vec);
654                 kfree(msg);
655         }
656         spin_unlock(&watch_events_lock);
657
658         /* Flush any currently-executing callback, unless we are it. :-) */
659         if (current->pid != xenwatch_pid) {
660                 mutex_lock(&xenwatch_mutex);
661                 mutex_unlock(&xenwatch_mutex);
662         }
663 }
664 EXPORT_SYMBOL_GPL(unregister_xenbus_watch);
665
666 void xs_suspend(void)
667 {
668         down_write(&xs_state.suspend_mutex);
669         mutex_lock(&xs_state.request_mutex);
670 }
671
672 void xs_resume(void)
673 {
674         struct xenbus_watch *watch;
675         char token[sizeof(watch) * 2 + 1];
676
677         mutex_unlock(&xs_state.request_mutex);
678
679         /* No need for watches_lock: the suspend_mutex is sufficient. */
680         list_for_each_entry(watch, &watches, list) {
681                 sprintf(token, "%lX", (long)watch);
682                 xs_watch(watch->node, token);
683         }
684
685         up_write(&xs_state.suspend_mutex);
686 }
687
688 static int xenwatch_handle_callback(void *data)
689 {
690         struct xs_stored_msg *msg = data;
691
692         msg->u.watch.handle->callback(msg->u.watch.handle,
693                                       (const char **)msg->u.watch.vec,
694                                       msg->u.watch.vec_size);
695
696         kfree(msg->u.watch.vec);
697         kfree(msg);
698
699         /* Kill this kthread if we were spawned just for this callback. */
700         if (current->pid != xenwatch_pid)
701                 do_exit(0);
702
703         return 0;
704 }
705
706 static int xenwatch_thread(void *unused)
707 {
708         struct list_head *ent;
709         struct xs_stored_msg *msg;
710
711         for (;;) {
712                 wait_event_interruptible(watch_events_waitq,
713                                          !list_empty(&watch_events));
714
715                 if (kthread_should_stop())
716                         break;
717
718                 mutex_lock(&xenwatch_mutex);
719
720                 spin_lock(&watch_events_lock);
721                 ent = watch_events.next;
722                 if (ent != &watch_events)
723                         list_del(ent);
724                 spin_unlock(&watch_events_lock);
725
726                 if (ent != &watch_events) {
727                         msg = list_entry(ent, struct xs_stored_msg, list);
728                         if (msg->u.watch.handle->flags & XBWF_new_thread)
729                                 kthread_run(xenwatch_handle_callback,
730                                             msg, "xenwatch_cb");
731                         else
732                                 xenwatch_handle_callback(msg);
733                 }
734
735                 mutex_unlock(&xenwatch_mutex);
736         }
737
738         return 0;
739 }
740
741 static int process_msg(void)
742 {
743         struct xs_stored_msg *msg;
744         char *body;
745         int err;
746
747         msg = kmalloc(sizeof(*msg), GFP_KERNEL);
748         if (msg == NULL)
749                 return -ENOMEM;
750
751         err = xb_read(&msg->hdr, sizeof(msg->hdr));
752         if (err) {
753                 kfree(msg);
754                 return err;
755         }
756
757         body = kmalloc(msg->hdr.len + 1, GFP_KERNEL);
758         if (body == NULL) {
759                 kfree(msg);
760                 return -ENOMEM;
761         }
762
763         err = xb_read(body, msg->hdr.len);
764         if (err) {
765                 kfree(body);
766                 kfree(msg);
767                 return err;
768         }
769         body[msg->hdr.len] = '\0';
770
771         if (msg->hdr.type == XS_WATCH_EVENT) {
772                 msg->u.watch.vec = split(body, msg->hdr.len,
773                                          &msg->u.watch.vec_size);
774                 if (IS_ERR(msg->u.watch.vec)) {
775                         kfree(msg);
776                         return PTR_ERR(msg->u.watch.vec);
777                 }
778
779                 spin_lock(&watches_lock);
780                 msg->u.watch.handle = find_watch(
781                         msg->u.watch.vec[XS_WATCH_TOKEN]);
782                 if (msg->u.watch.handle != NULL) {
783                         spin_lock(&watch_events_lock);
784                         list_add_tail(&msg->list, &watch_events);
785                         wake_up(&watch_events_waitq);
786                         spin_unlock(&watch_events_lock);
787                 } else {
788                         kfree(msg->u.watch.vec);
789                         kfree(msg);
790                 }
791                 spin_unlock(&watches_lock);
792         } else {
793                 msg->u.reply.body = body;
794                 spin_lock(&xs_state.reply_lock);
795                 list_add_tail(&msg->list, &xs_state.reply_list);
796                 spin_unlock(&xs_state.reply_lock);
797                 wake_up(&xs_state.reply_waitq);
798         }
799
800         return 0;
801 }
802
803 static int xenbus_thread(void *unused)
804 {
805         int err;
806
807         for (;;) {
808                 err = process_msg();
809                 if (err)
810                         printk(KERN_WARNING "XENBUS error %d while reading "
811                                "message\n", err);
812                 if (kthread_should_stop())
813                         break;
814         }
815
816         return 0;
817 }
818
819 int xs_init(void)
820 {
821         int err;
822         struct task_struct *task;
823
824         INIT_LIST_HEAD(&xs_state.reply_list);
825         spin_lock_init(&xs_state.reply_lock);
826         init_waitqueue_head(&xs_state.reply_waitq);
827
828         mutex_init(&xs_state.request_mutex);
829         init_rwsem(&xs_state.suspend_mutex);
830
831         /* Initialize the shared memory rings to talk to xenstored */
832         err = xb_init_comms();
833         if (err)
834                 return err;
835
836         task = kthread_run(xenwatch_thread, NULL, "xenwatch");
837         if (IS_ERR(task))
838                 return PTR_ERR(task);
839         xenwatch_pid = task->pid;
840
841         task = kthread_run(xenbus_thread, NULL, "xenbus");
842         if (IS_ERR(task))
843                 return PTR_ERR(task);
844
845         return 0;
846 }