Replace all uses of strerror() by ovs_strerror(), for thread safety.
[sliver-openvswitch.git] / lib / worker.c
1 /* Copyright (c) 2012, 2013 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "worker.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <sys/uio.h>
27 #include <sys/wait.h>
28 #include <unistd.h>
29
30 #include "command-line.h"
31 #include "daemon.h"
32 #include "ofpbuf.h"
33 #include "poll-loop.h"
34 #include "socket-util.h"
35 #include "util.h"
36 #include "vlog.h"
37
38 VLOG_DEFINE_THIS_MODULE(worker);
39
40 /* ovs_assert() logs the assertion message and logging sometimes goes through a
41  * worker, so using ovs_assert() in this source file could cause recursion. */
42 #undef ovs_assert
43 #define ovs_assert use_assert_instead_of_ovs_assert_in_this_module
44
45 /* Header for an RPC request. */
46 struct worker_request {
47     size_t request_len;              /* Length of the payload in bytes. */
48     worker_request_func *request_cb; /* Function to call in worker process. */
49     worker_reply_func *reply_cb;     /* Function to call in main process. */
50     void *reply_aux;                 /* Auxiliary data for 'reply_cb'. */
51 };
52
53 /* Header for an RPC reply. */
54 struct worker_reply {
55     size_t reply_len;            /* Length of the payload in bytes. */
56     worker_reply_func *reply_cb; /* Function to call in main process. */
57     void *reply_aux;             /* Auxiliary data for 'reply_cb'. */
58 };
59
60 /* Receive buffer for a RPC request or reply. */
61 struct rxbuf {
62     /* Header. */
63     struct ofpbuf header;       /* Header data. */
64     int fds[SOUTIL_MAX_FDS];    /* File descriptors. */
65     size_t n_fds;
66
67     /* Payload. */
68     struct ofpbuf payload;      /* Payload data. */
69 };
70
71 static int client_sock = -1;
72 static struct rxbuf client_rx;
73
74 static void rxbuf_init(struct rxbuf *);
75 static void rxbuf_clear(struct rxbuf *);
76 static int rxbuf_run(struct rxbuf *, int sock, size_t header_len);
77
78 static struct iovec *prefix_iov(void *data, size_t len,
79                                 const struct iovec *iovs, size_t n_iovs);
80
81 static void worker_broke(void);
82
83 static void worker_main(int fd) NO_RETURN;
84
85 /* Starts a worker process as a subprocess of the current process.  Currently
86  * only a single worker process is supported, so this function may only be
87  * called once.
88  *
89  * The client should call worker_run() and worker_wait() from its main loop.
90  *
91  * Call this function between daemonize_start() and daemonize_complete(). */
92 void
93 worker_start(void)
94 {
95     int work_fds[2];
96
97     assert(client_sock < 0);
98
99     /* Create non-blocking socket pair. */
100     xsocketpair(AF_UNIX, SOCK_STREAM, 0, work_fds);
101     xset_nonblocking(work_fds[0]);
102     xset_nonblocking(work_fds[1]);
103
104     /* Don't let the worker process own the responsibility to delete
105      * the pidfile.  Register it again after the fork. */
106     remove_pidfile_from_unlink();
107     if (!fork_and_clean_up()) {
108         /* In child (worker) process. */
109         daemonize_post_detach();
110         close(work_fds[0]);
111         worker_main(work_fds[1]);
112         NOT_REACHED();
113     }
114
115     /* In parent (main) process. */
116     add_pidfile_to_unlink();
117     close(work_fds[1]);
118     client_sock = work_fds[0];
119     rxbuf_init(&client_rx);
120 }
121
122 /* Returns true if this process has started a worker and the worker is not
123  * known to have malfunctioned. */
124 bool
125 worker_is_running(void)
126 {
127     return client_sock >= 0;
128 }
129
130 /* If a worker process was started, processes RPC replies from it, calling the
131  * registered 'reply_cb' callbacks.
132  *
133  * If the worker process died or malfunctioned, aborts. */
134 void
135 worker_run(void)
136 {
137     if (worker_is_running()) {
138         int error;
139
140         error = rxbuf_run(&client_rx, client_sock,
141                           sizeof(struct worker_reply));
142         if (!error) {
143             struct worker_reply *reply = client_rx.header.data;
144             reply->reply_cb(&client_rx.payload, client_rx.fds,
145                             client_rx.n_fds, reply->reply_aux);
146             rxbuf_clear(&client_rx);
147         } else if (error != EAGAIN) {
148             worker_broke();
149             VLOG_ABORT("receive from worker failed (%s)",
150                        ovs_retval_to_string(error));
151         }
152     }
153 }
154
155 /* Causes the poll loop to wake up if we need to process RPC replies. */
156 void
157 worker_wait(void)
158 {
159     if (worker_is_running()) {
160         poll_fd_wait(client_sock, POLLIN);
161     }
162 }
163 \f
164 /* Interface for main process to interact with the worker. */
165
166 /* Sends an RPC request to the worker process.  The worker process will call
167  * 'request_cb' passing the 'size' (zero or more) bytes of data in 'data' as
168  * arguments as well as the 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors
169  * in 'fds'.
170  *
171  * If and only if 'reply_cb' is nonnull, 'request_cb' must call worker_reply()
172  * or worker_reply_iovec() with a reply.  The main process will later call
173  * 'reply_cb' with the reply data (if any) and file descriptors (if any).
174  *
175  * 'request_cb' receives copies (as if by dup()) of the file descriptors in
176  * fds[].  'request_cb' takes ownership of these copies, and the caller of
177  * worker_request() retains its ownership of the originals.
178  *
179  * This function may block until the RPC request has been sent (if the socket
180  * buffer fills up) but it does not wait for the reply (if any).  If this
181  * function blocks, it may invoke reply callbacks for previous requests.
182  *
183  * The worker process executes RPC requests in strict order of submission and
184  * runs each request to completion before beginning the next request.  The main
185  * process invokes reply callbacks in strict order of request submission. */
186 void
187 worker_request(const void *data, size_t size,
188                const int fds[], size_t n_fds,
189                worker_request_func *request_cb,
190                worker_reply_func *reply_cb, void *aux)
191 {
192     if (size > 0) {
193         struct iovec iov;
194
195         iov.iov_base = (void *) data;
196         iov.iov_len = size;
197         worker_request_iovec(&iov, 1, fds, n_fds, request_cb, reply_cb, aux);
198     } else {
199         worker_request_iovec(NULL, 0, fds, n_fds, request_cb, reply_cb, aux);
200     }
201 }
202
203 static int
204 worker_send_iovec(const struct iovec iovs[], size_t n_iovs,
205                   const int fds[], size_t n_fds)
206 {
207     size_t sent = 0;
208
209     for (;;) {
210         struct pollfd pfd;
211         int error;
212
213         /* Try to send the rest of the request. */
214         error = send_iovec_and_fds_fully(client_sock, iovs, n_iovs,
215                                          fds, n_fds, sent, &sent);
216         if (error != EAGAIN) {
217             return error;
218         }
219
220         /* Process replies to avoid deadlock. */
221         worker_run();
222
223         /* Wait for 'client_sock' to become ready before trying again.  We
224          * can't use poll_block() because it sometimes calls into vlog, which
225          * calls indirectly into worker_send_iovec().  To be usable here,
226          * poll_block() would therefore need to be reentrant, but it isn't
227          * (calling it recursively causes memory corruption and an eventual
228          * crash). */
229         pfd.fd = client_sock;
230         pfd.events = POLLIN | POLLOUT;
231         do {
232             error = poll(&pfd, 1, -1) < 0 ? errno : 0;
233         } while (error == EINTR);
234         if (error) {
235             worker_broke();
236             VLOG_ABORT("poll failed (%s)", ovs_strerror(error));
237         }
238     }
239 }
240
241 /* Same as worker_request() except that the data to send is specified as an
242  * array of iovecs. */
243 void
244 worker_request_iovec(const struct iovec iovs[], size_t n_iovs,
245                      const int fds[], size_t n_fds,
246                      worker_request_func *request_cb,
247                      worker_reply_func *reply_cb, void *aux)
248 {
249     static bool recursing = false;
250     struct worker_request rq;
251     struct iovec *all_iovs;
252     int error;
253
254     assert(worker_is_running());
255     assert(!recursing);
256     recursing = true;
257
258     rq.request_len = iovec_len(iovs, n_iovs);
259     rq.request_cb = request_cb;
260     rq.reply_cb = reply_cb;
261     rq.reply_aux = aux;
262
263     all_iovs = prefix_iov(&rq, sizeof rq, iovs, n_iovs);
264     error = worker_send_iovec(all_iovs, n_iovs + 1, fds, n_fds);
265     if (error) {
266         worker_broke();
267         VLOG_ABORT("send failed (%s)", ovs_strerror(error));
268     }
269     free(all_iovs);
270
271     recursing = false;
272 }
273
274 /* Closes the client socket, if any, so that worker_is_running() will return
275  * false.
276  *
277  * The client does this just before aborting if the worker process dies or
278  * malfunctions, to prevent the logging subsystem from trying to use the
279  * worker to log the failure. */
280 static void
281 worker_broke(void)
282 {
283     if (client_sock >= 0) {
284         close(client_sock);
285         client_sock = -1;
286     }
287 }
288 \f
289 /* Interfaces for RPC implementations (running in the worker process). */
290
291 static int server_sock = -1;
292 static bool expect_reply;
293 static struct worker_request request;
294
295 /* When a call to worker_request() or worker_request_iovec() provides a
296  * 'reply_cb' callback, the 'request_cb' implementation must call this function
297  * to send its reply.  The main process will call 'reply_cb' passing the
298  * 'size' (zero or more) bytes of data in 'data' as arguments as well as the
299  * 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors in 'fds'.
300  *
301  * If a call to worker_request() or worker_request_iovec() provides no
302  * 'reply_cb' callback, the 'request_cb' implementation must not call this
303  * function.
304  *
305  * 'reply_cb' receives copies (as if by dup()) of the file descriptors in
306  * fds[].  'reply_cb' takes ownership of these copies, and the caller of
307  * worker_reply() retains its ownership of the originals.
308  *
309  * This function blocks until the RPC reply has been sent (if the socket buffer
310  * fills up) but it does not wait for the main process to receive or to process
311  * the reply. */
312 void
313 worker_reply(const void *data, size_t size, const int fds[], size_t n_fds)
314 {
315     if (size > 0) {
316         struct iovec iov;
317
318         iov.iov_base = (void *) data;
319         iov.iov_len = size;
320         worker_reply_iovec(&iov, 1, fds, n_fds);
321     } else {
322         worker_reply_iovec(NULL, 0, fds, n_fds);
323     }
324 }
325
326 /* Same as worker_reply() except that the data to send is specified as an array
327  * of iovecs. */
328 void
329 worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
330                        const int fds[], size_t n_fds)
331 {
332     struct worker_reply reply;
333     struct iovec *all_iovs;
334     int error;
335
336     assert(expect_reply);
337     expect_reply = false;
338
339     reply.reply_len = iovec_len(iovs, n_iovs);
340     reply.reply_cb = request.reply_cb;
341     reply.reply_aux = request.reply_aux;
342
343     all_iovs = prefix_iov(&reply, sizeof reply, iovs, n_iovs);
344
345     error = send_iovec_and_fds_fully_block(server_sock, all_iovs, n_iovs + 1,
346                                            fds, n_fds);
347     if (error == EPIPE) {
348         /* Parent probably died.  Continue processing any RPCs still buffered,
349          * to avoid missing log messages. */
350         VLOG_INFO("send failed (%s)", ovs_strerror(error));
351     } else if (error) {
352         VLOG_FATAL("send failed (%s)", ovs_strerror(error));
353     }
354
355     free(all_iovs);
356 }
357
358 static void
359 worker_main(int fd)
360 {
361     struct rxbuf rx;
362
363     server_sock = fd;
364
365     subprogram_name = "worker";
366     proctitle_set("worker process for pid %lu", (unsigned long int) getppid());
367     VLOG_INFO("worker process started");
368
369     rxbuf_init(&rx);
370     for (;;) {
371         int error;
372
373         error = rxbuf_run(&rx, server_sock, sizeof(struct worker_request));
374         if (!error) {
375             request = *(struct worker_request *) rx.header.data;
376
377             expect_reply = request.reply_cb != NULL;
378             request.request_cb(&rx.payload, rx.fds, rx.n_fds);
379             assert(!expect_reply);
380
381             rxbuf_clear(&rx);
382         } else if (error == EOF && !rx.header.size) {
383             /* Main process closed the IPC socket.  Exit cleanly. */
384             break;
385         } else if (error != EAGAIN) {
386             VLOG_FATAL("RPC receive failed (%s)", ovs_retval_to_string(error));
387         }
388
389         poll_fd_wait(server_sock, POLLIN);
390         poll_block();
391     }
392
393     VLOG_INFO("worker process exiting");
394     exit(0);
395 }
396 \f
397 static void
398 rxbuf_init(struct rxbuf *rx)
399 {
400     ofpbuf_init(&rx->header, 0);
401     rx->n_fds = 0;
402     ofpbuf_init(&rx->payload, 0);
403 }
404
405 static void
406 rxbuf_clear(struct rxbuf *rx)
407 {
408     ofpbuf_clear(&rx->header);
409     rx->n_fds = 0;
410     ofpbuf_clear(&rx->payload);
411 }
412
413 static int
414 rxbuf_run(struct rxbuf *rx, int sock, size_t header_len)
415 {
416     for (;;) {
417         if (!rx->header.size) {
418             int retval;
419
420             ofpbuf_clear(&rx->header);
421             ofpbuf_prealloc_tailroom(&rx->header, header_len);
422
423             retval = recv_data_and_fds(sock, rx->header.data, header_len,
424                                        rx->fds, &rx->n_fds);
425             if (retval <= 0) {
426                 return retval ? -retval : EOF;
427             }
428             rx->header.size += retval;
429         } else if (rx->header.size < header_len) {
430             size_t bytes_read;
431             int error;
432
433             error = read_fully(sock, ofpbuf_tail(&rx->header),
434                                header_len - rx->header.size, &bytes_read);
435             rx->header.size += bytes_read;
436             if (error) {
437                 return error;
438             }
439         } else {
440             size_t payload_len = *(size_t *) rx->header.data;
441
442             if (rx->payload.size < payload_len) {
443                 size_t left = payload_len - rx->payload.size;
444                 size_t bytes_read;
445                 int error;
446
447                 ofpbuf_prealloc_tailroom(&rx->payload, left);
448                 error = read_fully(sock, ofpbuf_tail(&rx->payload), left,
449                                    &bytes_read);
450                 rx->payload.size += bytes_read;
451                 if (error) {
452                     return error;
453                 }
454             } else {
455                 return 0;
456             }
457         }
458     }
459 }
460
461 static struct iovec *
462 prefix_iov(void *data, size_t len, const struct iovec *iovs, size_t n_iovs)
463 {
464     struct iovec *dst;
465
466     dst = xmalloc((n_iovs + 1) * sizeof *dst);
467     dst[0].iov_base = data;
468     dst[0].iov_len = len;
469     memcpy(dst + 1, iovs, n_iovs * sizeof *iovs);
470
471     return dst;
472 }