worker: Make worker_request_iovec() verify that it is not being reentered.
[sliver-openvswitch.git] / lib / worker.c
1 /* Copyright (c) 2012, 2013 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "worker.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <sys/uio.h>
27 #include <sys/wait.h>
28 #include <unistd.h>
29
30 #include "command-line.h"
31 #include "daemon.h"
32 #include "ofpbuf.h"
33 #include "poll-loop.h"
34 #include "socket-util.h"
35 #include "util.h"
36 #include "vlog.h"
37
38 VLOG_DEFINE_THIS_MODULE(worker);
39
40 /* Header for an RPC request. */
41 struct worker_request {
42     size_t request_len;              /* Length of the payload in bytes. */
43     worker_request_func *request_cb; /* Function to call in worker process. */
44     worker_reply_func *reply_cb;     /* Function to call in main process. */
45     void *reply_aux;                 /* Auxiliary data for 'reply_cb'. */
46 };
47
48 /* Header for an RPC reply. */
49 struct worker_reply {
50     size_t reply_len;            /* Length of the payload in bytes. */
51     worker_reply_func *reply_cb; /* Function to call in main process. */
52     void *reply_aux;             /* Auxiliary data for 'reply_cb'. */
53 };
54
55 /* Receive buffer for a RPC request or reply. */
56 struct rxbuf {
57     /* Header. */
58     struct ofpbuf header;       /* Header data. */
59     int fds[SOUTIL_MAX_FDS];    /* File descriptors. */
60     size_t n_fds;
61
62     /* Payload. */
63     struct ofpbuf payload;      /* Payload data. */
64 };
65
66 static int client_sock = -1;
67 static struct rxbuf client_rx;
68
69 static void rxbuf_init(struct rxbuf *);
70 static void rxbuf_clear(struct rxbuf *);
71 static int rxbuf_run(struct rxbuf *, int sock, size_t header_len);
72
73 static struct iovec *prefix_iov(void *data, size_t len,
74                                 const struct iovec *iovs, size_t n_iovs);
75
76 static void worker_broke(void);
77
78 static void worker_main(int fd) NO_RETURN;
79
80 /* Starts a worker process as a subprocess of the current process.  Currently
81  * only a single worker process is supported, so this function may only be
82  * called once.
83  *
84  * The client should call worker_run() and worker_wait() from its main loop.
85  *
86  * Call this function between daemonize_start() and daemonize_complete(). */
87 void
88 worker_start(void)
89 {
90     int work_fds[2];
91
92     assert(client_sock < 0);
93
94     /* Create non-blocking socket pair. */
95     xsocketpair(AF_UNIX, SOCK_STREAM, 0, work_fds);
96     xset_nonblocking(work_fds[0]);
97     xset_nonblocking(work_fds[1]);
98
99     if (!fork_and_clean_up()) {
100         /* In child (worker) process. */
101         daemonize_post_detach();
102         close(work_fds[0]);
103         worker_main(work_fds[1]);
104         NOT_REACHED();
105     }
106
107     /* In parent (main) process. */
108     close(work_fds[1]);
109     client_sock = work_fds[0];
110     rxbuf_init(&client_rx);
111 }
112
113 /* Returns true if this process has started a worker and the worker is not
114  * known to have malfunctioned. */
115 bool
116 worker_is_running(void)
117 {
118     return client_sock >= 0;
119 }
120
121 /* If a worker process was started, processes RPC replies from it, calling the
122  * registered 'reply_cb' callbacks.
123  *
124  * If the worker process died or malfunctioned, aborts. */
125 void
126 worker_run(void)
127 {
128     if (worker_is_running()) {
129         int error;
130
131         error = rxbuf_run(&client_rx, client_sock,
132                           sizeof(struct worker_reply));
133         if (!error) {
134             struct worker_reply *reply = client_rx.header.data;
135             reply->reply_cb(&client_rx.payload, client_rx.fds,
136                             client_rx.n_fds, reply->reply_aux);
137             rxbuf_clear(&client_rx);
138         } else if (error != EAGAIN) {
139             worker_broke();
140             VLOG_ABORT("receive from worker failed (%s)",
141                        ovs_retval_to_string(error));
142         }
143     }
144 }
145
146 /* Causes the poll loop to wake up if we need to process RPC replies. */
147 void
148 worker_wait(void)
149 {
150     if (worker_is_running()) {
151         poll_fd_wait(client_sock, POLLIN);
152     }
153 }
154 \f
155 /* Interface for main process to interact with the worker. */
156
157 /* Sends an RPC request to the worker process.  The worker process will call
158  * 'request_cb' passing the 'size' (zero or more) bytes of data in 'data' as
159  * arguments as well as the 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors
160  * in 'fds'.
161  *
162  * If and only if 'reply_cb' is nonnull, 'request_cb' must call worker_reply()
163  * or worker_reply_iovec() with a reply.  The main process will later call
164  * 'reply_cb' with the reply data (if any) and file descriptors (if any).
165  *
166  * 'request_cb' receives copies (as if by dup()) of the file descriptors in
167  * fds[].  'request_cb' takes ownership of these copies, and the caller of
168  * worker_request() retains its ownership of the originals.
169  *
170  * This function may block until the RPC request has been sent (if the socket
171  * buffer fills up) but it does not wait for the reply (if any).  If this
172  * function blocks, it may invoke reply callbacks for previous requests.
173  *
174  * The worker process executes RPC requests in strict order of submission and
175  * runs each request to completion before beginning the next request.  The main
176  * process invokes reply callbacks in strict order of request submission. */
177 void
178 worker_request(const void *data, size_t size,
179                const int fds[], size_t n_fds,
180                worker_request_func *request_cb,
181                worker_reply_func *reply_cb, void *aux)
182 {
183     if (size > 0) {
184         struct iovec iov;
185
186         iov.iov_base = (void *) data;
187         iov.iov_len = size;
188         worker_request_iovec(&iov, 1, fds, n_fds, request_cb, reply_cb, aux);
189     } else {
190         worker_request_iovec(NULL, 0, fds, n_fds, request_cb, reply_cb, aux);
191     }
192 }
193
194 static int
195 worker_send_iovec(const struct iovec iovs[], size_t n_iovs,
196                   const int fds[], size_t n_fds)
197 {
198     size_t sent = 0;
199
200     for (;;) {
201         struct pollfd pfd;
202         int error;
203
204         /* Try to send the rest of the request. */
205         error = send_iovec_and_fds_fully(client_sock, iovs, n_iovs,
206                                          fds, n_fds, sent, &sent);
207         if (error != EAGAIN) {
208             return error;
209         }
210
211         /* Process replies to avoid deadlock. */
212         worker_run();
213
214         /* Wait for 'client_sock' to become ready before trying again.  We
215          * can't use poll_block() because it sometimes calls into vlog, which
216          * calls indirectly into worker_send_iovec().  To be usable here,
217          * poll_block() would therefore need to be reentrant, but it isn't
218          * (calling it recursively causes memory corruption and an eventual
219          * crash). */
220         pfd.fd = client_sock;
221         pfd.events = POLLIN | POLLOUT;
222         do {
223             error = poll(&pfd, 1, -1) < 0 ? errno : 0;
224         } while (error == EINTR);
225         if (error) {
226             worker_broke();
227             VLOG_ABORT("poll failed (%s)", strerror(error));
228         }
229     }
230 }
231
232 /* Same as worker_request() except that the data to send is specified as an
233  * array of iovecs. */
234 void
235 worker_request_iovec(const struct iovec iovs[], size_t n_iovs,
236                      const int fds[], size_t n_fds,
237                      worker_request_func *request_cb,
238                      worker_reply_func *reply_cb, void *aux)
239 {
240     static bool recursing = false;
241     struct worker_request rq;
242     struct iovec *all_iovs;
243     int error;
244
245     assert(worker_is_running());
246     assert(!recursing);
247     recursing = true;
248
249     rq.request_len = iovec_len(iovs, n_iovs);
250     rq.request_cb = request_cb;
251     rq.reply_cb = reply_cb;
252     rq.reply_aux = aux;
253
254     all_iovs = prefix_iov(&rq, sizeof rq, iovs, n_iovs);
255     error = worker_send_iovec(all_iovs, n_iovs + 1, fds, n_fds);
256     if (error) {
257         worker_broke();
258         VLOG_ABORT("send failed (%s)", strerror(error));
259     }
260     free(all_iovs);
261
262     recursing = false;
263 }
264
265 /* Closes the client socket, if any, so that worker_is_running() will return
266  * false.
267  *
268  * The client does this just before aborting if the worker process dies or
269  * malfunctions, to prevent the logging subsystem from trying to use the
270  * worker to log the failure. */
271 static void
272 worker_broke(void)
273 {
274     if (client_sock >= 0) {
275         close(client_sock);
276         client_sock = -1;
277     }
278 }
279 \f
280 /* Interfaces for RPC implementations (running in the worker process). */
281
282 static int server_sock = -1;
283 static bool expect_reply;
284 static struct worker_request request;
285
286 /* When a call to worker_request() or worker_request_iovec() provides a
287  * 'reply_cb' callback, the 'request_cb' implementation must call this function
288  * to send its reply.  The main process will call 'reply_cb' passing the
289  * 'size' (zero or more) bytes of data in 'data' as arguments as well as the
290  * 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors in 'fds'.
291  *
292  * If a call to worker_request() or worker_request_iovec() provides no
293  * 'reply_cb' callback, the 'request_cb' implementation must not call this
294  * function.
295  *
296  * 'reply_cb' receives copies (as if by dup()) of the file descriptors in
297  * fds[].  'reply_cb' takes ownership of these copies, and the caller of
298  * worker_reply() retains its ownership of the originals.
299  *
300  * This function blocks until the RPC reply has been sent (if the socket buffer
301  * fills up) but it does not wait for the main process to receive or to process
302  * the reply. */
303 void
304 worker_reply(const void *data, size_t size, const int fds[], size_t n_fds)
305 {
306     if (size > 0) {
307         struct iovec iov;
308
309         iov.iov_base = (void *) data;
310         iov.iov_len = size;
311         worker_reply_iovec(&iov, 1, fds, n_fds);
312     } else {
313         worker_reply_iovec(NULL, 0, fds, n_fds);
314     }
315 }
316
317 /* Same as worker_reply() except that the data to send is specified as an array
318  * of iovecs. */
319 void
320 worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
321                        const int fds[], size_t n_fds)
322 {
323     struct worker_reply reply;
324     struct iovec *all_iovs;
325     int error;
326
327     assert(expect_reply);
328     expect_reply = false;
329
330     reply.reply_len = iovec_len(iovs, n_iovs);
331     reply.reply_cb = request.reply_cb;
332     reply.reply_aux = request.reply_aux;
333
334     all_iovs = prefix_iov(&reply, sizeof reply, iovs, n_iovs);
335
336     error = send_iovec_and_fds_fully_block(server_sock, all_iovs, n_iovs + 1,
337                                            fds, n_fds);
338     if (error == EPIPE) {
339         /* Parent probably died.  Continue processing any RPCs still buffered,
340          * to avoid missing log messages. */
341         VLOG_INFO("send failed (%s)", strerror(error));
342     } else if (error) {
343         VLOG_ABORT("send failed (%s)", strerror(error));
344     }
345
346     free(all_iovs);
347 }
348
349 static void
350 worker_main(int fd)
351 {
352     struct rxbuf rx;
353
354     server_sock = fd;
355
356     subprogram_name = "worker";
357     proctitle_set("worker process for pid %lu", (unsigned long int) getppid());
358     VLOG_INFO("worker process started");
359
360     rxbuf_init(&rx);
361     for (;;) {
362         int error;
363
364         error = rxbuf_run(&rx, server_sock, sizeof(struct worker_request));
365         if (!error) {
366             request = *(struct worker_request *) rx.header.data;
367
368             expect_reply = request.reply_cb != NULL;
369             request.request_cb(&rx.payload, rx.fds, rx.n_fds);
370             assert(!expect_reply);
371
372             rxbuf_clear(&rx);
373         } else if (error == EOF && !rx.header.size) {
374             /* Main process closed the IPC socket.  Exit cleanly. */
375             break;
376         } else if (error != EAGAIN) {
377             VLOG_ABORT("RPC receive failed (%s)", strerror(error));
378         }
379
380         poll_fd_wait(server_sock, POLLIN);
381         poll_block();
382     }
383
384     VLOG_INFO("worker process exiting");
385     exit(0);
386 }
387 \f
388 static void
389 rxbuf_init(struct rxbuf *rx)
390 {
391     ofpbuf_init(&rx->header, 0);
392     rx->n_fds = 0;
393     ofpbuf_init(&rx->payload, 0);
394 }
395
396 static void
397 rxbuf_clear(struct rxbuf *rx)
398 {
399     ofpbuf_clear(&rx->header);
400     rx->n_fds = 0;
401     ofpbuf_clear(&rx->payload);
402 }
403
404 static int
405 rxbuf_run(struct rxbuf *rx, int sock, size_t header_len)
406 {
407     for (;;) {
408         if (!rx->header.size) {
409             int retval;
410
411             ofpbuf_clear(&rx->header);
412             ofpbuf_prealloc_tailroom(&rx->header, header_len);
413
414             retval = recv_data_and_fds(sock, rx->header.data, header_len,
415                                        rx->fds, &rx->n_fds);
416             if (retval <= 0) {
417                 return retval ? -retval : EOF;
418             }
419             rx->header.size += retval;
420         } else if (rx->header.size < header_len) {
421             size_t bytes_read;
422             int error;
423
424             error = read_fully(sock, ofpbuf_tail(&rx->header),
425                                header_len - rx->header.size, &bytes_read);
426             rx->header.size += bytes_read;
427             if (error) {
428                 return error;
429             }
430         } else {
431             size_t payload_len = *(size_t *) rx->header.data;
432
433             if (rx->payload.size < payload_len) {
434                 size_t left = payload_len - rx->payload.size;
435                 size_t bytes_read;
436                 int error;
437
438                 ofpbuf_prealloc_tailroom(&rx->payload, left);
439                 error = read_fully(sock, ofpbuf_tail(&rx->payload), left,
440                                    &bytes_read);
441                 rx->payload.size += bytes_read;
442                 if (error) {
443                     return error;
444                 }
445             } else {
446                 return 0;
447             }
448         }
449     }
450
451     return EAGAIN;
452 }
453
454 static struct iovec *
455 prefix_iov(void *data, size_t len, const struct iovec *iovs, size_t n_iovs)
456 {
457     struct iovec *dst;
458
459     dst = xmalloc((n_iovs + 1) * sizeof *dst);
460     dst[0].iov_base = data;
461     dst[0].iov_len = len;
462     memcpy(dst + 1, iovs, n_iovs * sizeof *iovs);
463
464     return dst;
465 }