6ca05cdbec04aba04017734b7522bff1b442b543
[sliver-openvswitch.git] / lib / worker.c
1 /* Copyright (c) 2012, 2013 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "worker.h"
19
20 #include <assert.h>
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <sys/uio.h>
27 #include <sys/wait.h>
28 #include <unistd.h>
29
30 #include "command-line.h"
31 #include "daemon.h"
32 #include "ofpbuf.h"
33 #include "poll-loop.h"
34 #include "socket-util.h"
35 #include "util.h"
36 #include "vlog.h"
37
38 VLOG_DEFINE_THIS_MODULE(worker);
39
40 /* ovs_assert() logs the assertion message and logging sometimes goes through a
41  * worker, so using ovs_assert() in this source file could cause recursion. */
42 #undef ovs_assert
43 #define ovs_assert use_assert_instead_of_ovs_assert_in_this_module
44
45 /* Header for an RPC request. */
46 struct worker_request {
47     size_t request_len;              /* Length of the payload in bytes. */
48     worker_request_func *request_cb; /* Function to call in worker process. */
49     worker_reply_func *reply_cb;     /* Function to call in main process. */
50     void *reply_aux;                 /* Auxiliary data for 'reply_cb'. */
51 };
52
53 /* Header for an RPC reply. */
54 struct worker_reply {
55     size_t reply_len;            /* Length of the payload in bytes. */
56     worker_reply_func *reply_cb; /* Function to call in main process. */
57     void *reply_aux;             /* Auxiliary data for 'reply_cb'. */
58 };
59
60 /* Receive buffer for a RPC request or reply. */
61 struct rxbuf {
62     /* Header. */
63     struct ofpbuf header;       /* Header data. */
64     int fds[SOUTIL_MAX_FDS];    /* File descriptors. */
65     size_t n_fds;
66
67     /* Payload. */
68     struct ofpbuf payload;      /* Payload data. */
69 };
70
71 static int client_sock = -1;
72 static struct rxbuf client_rx;
73
74 static void rxbuf_init(struct rxbuf *);
75 static void rxbuf_clear(struct rxbuf *);
76 static int rxbuf_run(struct rxbuf *, int sock, size_t header_len);
77
78 static struct iovec *prefix_iov(void *data, size_t len,
79                                 const struct iovec *iovs, size_t n_iovs);
80
81 static void worker_broke(void);
82
83 static void worker_main(int fd) NO_RETURN;
84
85 /* Starts a worker process as a subprocess of the current process.  Currently
86  * only a single worker process is supported, so this function may only be
87  * called once.
88  *
89  * The client should call worker_run() and worker_wait() from its main loop.
90  *
91  * Call this function between daemonize_start() and daemonize_complete(). */
92 void
93 worker_start(void)
94 {
95     int work_fds[2];
96
97     assert(client_sock < 0);
98
99     /* Create non-blocking socket pair. */
100     xsocketpair(AF_UNIX, SOCK_STREAM, 0, work_fds);
101     xset_nonblocking(work_fds[0]);
102     xset_nonblocking(work_fds[1]);
103
104     if (!fork_and_clean_up()) {
105         /* In child (worker) process. */
106         daemonize_post_detach();
107         close(work_fds[0]);
108         worker_main(work_fds[1]);
109         NOT_REACHED();
110     }
111
112     /* In parent (main) process. */
113     close(work_fds[1]);
114     client_sock = work_fds[0];
115     rxbuf_init(&client_rx);
116 }
117
118 /* Returns true if this process has started a worker and the worker is not
119  * known to have malfunctioned. */
120 bool
121 worker_is_running(void)
122 {
123     return client_sock >= 0;
124 }
125
126 /* If a worker process was started, processes RPC replies from it, calling the
127  * registered 'reply_cb' callbacks.
128  *
129  * If the worker process died or malfunctioned, aborts. */
130 void
131 worker_run(void)
132 {
133     if (worker_is_running()) {
134         int error;
135
136         error = rxbuf_run(&client_rx, client_sock,
137                           sizeof(struct worker_reply));
138         if (!error) {
139             struct worker_reply *reply = client_rx.header.data;
140             reply->reply_cb(&client_rx.payload, client_rx.fds,
141                             client_rx.n_fds, reply->reply_aux);
142             rxbuf_clear(&client_rx);
143         } else if (error != EAGAIN) {
144             worker_broke();
145             VLOG_ABORT("receive from worker failed (%s)",
146                        ovs_retval_to_string(error));
147         }
148     }
149 }
150
151 /* Causes the poll loop to wake up if we need to process RPC replies. */
152 void
153 worker_wait(void)
154 {
155     if (worker_is_running()) {
156         poll_fd_wait(client_sock, POLLIN);
157     }
158 }
159 \f
160 /* Interface for main process to interact with the worker. */
161
162 /* Sends an RPC request to the worker process.  The worker process will call
163  * 'request_cb' passing the 'size' (zero or more) bytes of data in 'data' as
164  * arguments as well as the 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors
165  * in 'fds'.
166  *
167  * If and only if 'reply_cb' is nonnull, 'request_cb' must call worker_reply()
168  * or worker_reply_iovec() with a reply.  The main process will later call
169  * 'reply_cb' with the reply data (if any) and file descriptors (if any).
170  *
171  * 'request_cb' receives copies (as if by dup()) of the file descriptors in
172  * fds[].  'request_cb' takes ownership of these copies, and the caller of
173  * worker_request() retains its ownership of the originals.
174  *
175  * This function may block until the RPC request has been sent (if the socket
176  * buffer fills up) but it does not wait for the reply (if any).  If this
177  * function blocks, it may invoke reply callbacks for previous requests.
178  *
179  * The worker process executes RPC requests in strict order of submission and
180  * runs each request to completion before beginning the next request.  The main
181  * process invokes reply callbacks in strict order of request submission. */
182 void
183 worker_request(const void *data, size_t size,
184                const int fds[], size_t n_fds,
185                worker_request_func *request_cb,
186                worker_reply_func *reply_cb, void *aux)
187 {
188     if (size > 0) {
189         struct iovec iov;
190
191         iov.iov_base = (void *) data;
192         iov.iov_len = size;
193         worker_request_iovec(&iov, 1, fds, n_fds, request_cb, reply_cb, aux);
194     } else {
195         worker_request_iovec(NULL, 0, fds, n_fds, request_cb, reply_cb, aux);
196     }
197 }
198
199 static int
200 worker_send_iovec(const struct iovec iovs[], size_t n_iovs,
201                   const int fds[], size_t n_fds)
202 {
203     size_t sent = 0;
204
205     for (;;) {
206         struct pollfd pfd;
207         int error;
208
209         /* Try to send the rest of the request. */
210         error = send_iovec_and_fds_fully(client_sock, iovs, n_iovs,
211                                          fds, n_fds, sent, &sent);
212         if (error != EAGAIN) {
213             return error;
214         }
215
216         /* Process replies to avoid deadlock. */
217         worker_run();
218
219         /* Wait for 'client_sock' to become ready before trying again.  We
220          * can't use poll_block() because it sometimes calls into vlog, which
221          * calls indirectly into worker_send_iovec().  To be usable here,
222          * poll_block() would therefore need to be reentrant, but it isn't
223          * (calling it recursively causes memory corruption and an eventual
224          * crash). */
225         pfd.fd = client_sock;
226         pfd.events = POLLIN | POLLOUT;
227         do {
228             error = poll(&pfd, 1, -1) < 0 ? errno : 0;
229         } while (error == EINTR);
230         if (error) {
231             worker_broke();
232             VLOG_ABORT("poll failed (%s)", strerror(error));
233         }
234     }
235 }
236
237 /* Same as worker_request() except that the data to send is specified as an
238  * array of iovecs. */
239 void
240 worker_request_iovec(const struct iovec iovs[], size_t n_iovs,
241                      const int fds[], size_t n_fds,
242                      worker_request_func *request_cb,
243                      worker_reply_func *reply_cb, void *aux)
244 {
245     static bool recursing = false;
246     struct worker_request rq;
247     struct iovec *all_iovs;
248     int error;
249
250     assert(worker_is_running());
251     assert(!recursing);
252     recursing = true;
253
254     rq.request_len = iovec_len(iovs, n_iovs);
255     rq.request_cb = request_cb;
256     rq.reply_cb = reply_cb;
257     rq.reply_aux = aux;
258
259     all_iovs = prefix_iov(&rq, sizeof rq, iovs, n_iovs);
260     error = worker_send_iovec(all_iovs, n_iovs + 1, fds, n_fds);
261     if (error) {
262         worker_broke();
263         VLOG_ABORT("send failed (%s)", strerror(error));
264     }
265     free(all_iovs);
266
267     recursing = false;
268 }
269
270 /* Closes the client socket, if any, so that worker_is_running() will return
271  * false.
272  *
273  * The client does this just before aborting if the worker process dies or
274  * malfunctions, to prevent the logging subsystem from trying to use the
275  * worker to log the failure. */
276 static void
277 worker_broke(void)
278 {
279     if (client_sock >= 0) {
280         close(client_sock);
281         client_sock = -1;
282     }
283 }
284 \f
285 /* Interfaces for RPC implementations (running in the worker process). */
286
287 static int server_sock = -1;
288 static bool expect_reply;
289 static struct worker_request request;
290
291 /* When a call to worker_request() or worker_request_iovec() provides a
292  * 'reply_cb' callback, the 'request_cb' implementation must call this function
293  * to send its reply.  The main process will call 'reply_cb' passing the
294  * 'size' (zero or more) bytes of data in 'data' as arguments as well as the
295  * 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors in 'fds'.
296  *
297  * If a call to worker_request() or worker_request_iovec() provides no
298  * 'reply_cb' callback, the 'request_cb' implementation must not call this
299  * function.
300  *
301  * 'reply_cb' receives copies (as if by dup()) of the file descriptors in
302  * fds[].  'reply_cb' takes ownership of these copies, and the caller of
303  * worker_reply() retains its ownership of the originals.
304  *
305  * This function blocks until the RPC reply has been sent (if the socket buffer
306  * fills up) but it does not wait for the main process to receive or to process
307  * the reply. */
308 void
309 worker_reply(const void *data, size_t size, const int fds[], size_t n_fds)
310 {
311     if (size > 0) {
312         struct iovec iov;
313
314         iov.iov_base = (void *) data;
315         iov.iov_len = size;
316         worker_reply_iovec(&iov, 1, fds, n_fds);
317     } else {
318         worker_reply_iovec(NULL, 0, fds, n_fds);
319     }
320 }
321
322 /* Same as worker_reply() except that the data to send is specified as an array
323  * of iovecs. */
324 void
325 worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
326                        const int fds[], size_t n_fds)
327 {
328     struct worker_reply reply;
329     struct iovec *all_iovs;
330     int error;
331
332     assert(expect_reply);
333     expect_reply = false;
334
335     reply.reply_len = iovec_len(iovs, n_iovs);
336     reply.reply_cb = request.reply_cb;
337     reply.reply_aux = request.reply_aux;
338
339     all_iovs = prefix_iov(&reply, sizeof reply, iovs, n_iovs);
340
341     error = send_iovec_and_fds_fully_block(server_sock, all_iovs, n_iovs + 1,
342                                            fds, n_fds);
343     if (error == EPIPE) {
344         /* Parent probably died.  Continue processing any RPCs still buffered,
345          * to avoid missing log messages. */
346         VLOG_INFO("send failed (%s)", strerror(error));
347     } else if (error) {
348         VLOG_ABORT("send failed (%s)", strerror(error));
349     }
350
351     free(all_iovs);
352 }
353
354 static void
355 worker_main(int fd)
356 {
357     struct rxbuf rx;
358
359     server_sock = fd;
360
361     subprogram_name = "worker";
362     proctitle_set("worker process for pid %lu", (unsigned long int) getppid());
363     VLOG_INFO("worker process started");
364
365     rxbuf_init(&rx);
366     for (;;) {
367         int error;
368
369         error = rxbuf_run(&rx, server_sock, sizeof(struct worker_request));
370         if (!error) {
371             request = *(struct worker_request *) rx.header.data;
372
373             expect_reply = request.reply_cb != NULL;
374             request.request_cb(&rx.payload, rx.fds, rx.n_fds);
375             assert(!expect_reply);
376
377             rxbuf_clear(&rx);
378         } else if (error == EOF && !rx.header.size) {
379             /* Main process closed the IPC socket.  Exit cleanly. */
380             break;
381         } else if (error != EAGAIN) {
382             VLOG_ABORT("RPC receive failed (%s)", strerror(error));
383         }
384
385         poll_fd_wait(server_sock, POLLIN);
386         poll_block();
387     }
388
389     VLOG_INFO("worker process exiting");
390     exit(0);
391 }
392 \f
393 static void
394 rxbuf_init(struct rxbuf *rx)
395 {
396     ofpbuf_init(&rx->header, 0);
397     rx->n_fds = 0;
398     ofpbuf_init(&rx->payload, 0);
399 }
400
401 static void
402 rxbuf_clear(struct rxbuf *rx)
403 {
404     ofpbuf_clear(&rx->header);
405     rx->n_fds = 0;
406     ofpbuf_clear(&rx->payload);
407 }
408
409 static int
410 rxbuf_run(struct rxbuf *rx, int sock, size_t header_len)
411 {
412     for (;;) {
413         if (!rx->header.size) {
414             int retval;
415
416             ofpbuf_clear(&rx->header);
417             ofpbuf_prealloc_tailroom(&rx->header, header_len);
418
419             retval = recv_data_and_fds(sock, rx->header.data, header_len,
420                                        rx->fds, &rx->n_fds);
421             if (retval <= 0) {
422                 return retval ? -retval : EOF;
423             }
424             rx->header.size += retval;
425         } else if (rx->header.size < header_len) {
426             size_t bytes_read;
427             int error;
428
429             error = read_fully(sock, ofpbuf_tail(&rx->header),
430                                header_len - rx->header.size, &bytes_read);
431             rx->header.size += bytes_read;
432             if (error) {
433                 return error;
434             }
435         } else {
436             size_t payload_len = *(size_t *) rx->header.data;
437
438             if (rx->payload.size < payload_len) {
439                 size_t left = payload_len - rx->payload.size;
440                 size_t bytes_read;
441                 int error;
442
443                 ofpbuf_prealloc_tailroom(&rx->payload, left);
444                 error = read_fully(sock, ofpbuf_tail(&rx->payload), left,
445                                    &bytes_read);
446                 rx->payload.size += bytes_read;
447                 if (error) {
448                     return error;
449                 }
450             } else {
451                 return 0;
452             }
453         }
454     }
455 }
456
457 static struct iovec *
458 prefix_iov(void *data, size_t len, const struct iovec *iovs, size_t n_iovs)
459 {
460     struct iovec *dst;
461
462     dst = xmalloc((n_iovs + 1) * sizeof *dst);
463     dst[0].iov_base = data;
464     dst[0].iov_len = len;
465     memcpy(dst + 1, iovs, n_iovs * sizeof *iovs);
466
467     return dst;
468 }