1 /* Copyright (c) 2012, 2013 Nicira, Inc.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
24 #include <sys/socket.h>
25 #include <sys/types.h>
30 #include "command-line.h"
33 #include "poll-loop.h"
34 #include "socket-util.h"
38 VLOG_DEFINE_THIS_MODULE(worker);
40 /* ovs_assert() logs the assertion message and logging sometimes goes through a
41 * worker, so using ovs_assert() in this source file could cause recursion. */
43 #define ovs_assert use_assert_instead_of_ovs_assert_in_this_module
45 /* Header for an RPC request. */
46 struct worker_request {
47 size_t request_len; /* Length of the payload in bytes. */
48 worker_request_func *request_cb; /* Function to call in worker process. */
49 worker_reply_func *reply_cb; /* Function to call in main process. */
50 void *reply_aux; /* Auxiliary data for 'reply_cb'. */
53 /* Header for an RPC reply. */
55 size_t reply_len; /* Length of the payload in bytes. */
56 worker_reply_func *reply_cb; /* Function to call in main process. */
57 void *reply_aux; /* Auxiliary data for 'reply_cb'. */
60 /* Receive buffer for an RPC request or reply. */
63 struct ofpbuf header; /* Header data. */
64 int fds[SOUTIL_MAX_FDS]; /* File descriptors. */
68 struct ofpbuf payload; /* Payload data. */
71 static int client_sock = -1;    /* Main-process end of the worker socket pair; -1 when no worker is running. */
72 static struct rxbuf client_rx;  /* Receive buffer for replies read from 'client_sock'. */
74 static void rxbuf_init(struct rxbuf *);   /* Prepares an rxbuf for first use. */
75 static void rxbuf_clear(struct rxbuf *);  /* Discards buffered data so the rxbuf can receive the next message. */
76 static int rxbuf_run(struct rxbuf *, int sock, size_t header_len);  /* Reads more of a header+payload message from 'sock'. */
78 static struct iovec *prefix_iov(void *data, size_t len,
79 const struct iovec *iovs, size_t n_iovs);  /* Returns a new xmalloc()'d array: ('data', 'len') followed by 'iovs'. */
81 static void worker_broke(void);
83 static void worker_main(int fd) NO_RETURN;  /* Worker (child) process main routine; does not return. */
85 /* Starts a worker process as a subprocess of the current process. Currently
86 * only a single worker process is supported, so this function may only be
89 * The client should call worker_run() and worker_wait() from its main loop.
91 * Call this function between daemonize_start() and daemonize_complete(). */
97 assert(client_sock < 0);
99 /* Create non-blocking socket pair. */
100 xsocketpair(AF_UNIX, SOCK_STREAM, 0, work_fds);
101 xset_nonblocking(work_fds[0]);
102 xset_nonblocking(work_fds[1]);
104 if (!fork_and_clean_up()) {
105 /* In child (worker) process. */
106 daemonize_post_detach();
108 worker_main(work_fds[1]);
112 /* In parent (main) process. */
114 client_sock = work_fds[0];
115 rxbuf_init(&client_rx);
118 /* Returns true if this process has started a worker and the worker is not
119 * known to have malfunctioned. */
121 worker_is_running(void)
123 return client_sock >= 0;
126 /* If a worker process was started, processes RPC replies from it, calling the
127 * registered 'reply_cb' callbacks.
129 * If the worker process died or malfunctioned, aborts. */
133 if (worker_is_running()) {
136 error = rxbuf_run(&client_rx, client_sock,
137 sizeof(struct worker_reply));
139 struct worker_reply *reply = client_rx.header.data;
140 reply->reply_cb(&client_rx.payload, client_rx.fds,
141 client_rx.n_fds, reply->reply_aux);
142 rxbuf_clear(&client_rx);
143 } else if (error != EAGAIN) {
145 VLOG_ABORT("receive from worker failed (%s)",
146 ovs_retval_to_string(error));
151 /* Causes the poll loop to wake up if we need to process RPC replies. */
155 if (worker_is_running()) {
156 poll_fd_wait(client_sock, POLLIN);
160 /* Interface for main process to interact with the worker. */
162 /* Sends an RPC request to the worker process. The worker process will call
163 * 'request_cb' passing the 'size' (zero or more) bytes of data in 'data' as
164 * arguments as well as the 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors
167 * If and only if 'reply_cb' is nonnull, 'request_cb' must call worker_reply()
168 * or worker_reply_iovec() with a reply. The main process will later call
169 * 'reply_cb' with the reply data (if any) and file descriptors (if any).
171 * 'request_cb' receives copies (as if by dup()) of the file descriptors in
172 * fds[]. 'request_cb' takes ownership of these copies, and the caller of
173 * worker_request() retains its ownership of the originals.
175 * This function may block until the RPC request has been sent (if the socket
176 * buffer fills up) but it does not wait for the reply (if any). If this
177 * function blocks, it may invoke reply callbacks for previous requests.
179 * The worker process executes RPC requests in strict order of submission and
180 * runs each request to completion before beginning the next request. The main
181 * process invokes reply callbacks in strict order of request submission. */
183 worker_request(const void *data, size_t size,
184 const int fds[], size_t n_fds,
185 worker_request_func *request_cb,
186 worker_reply_func *reply_cb, void *aux)
191 iov.iov_base = (void *) data;
193 worker_request_iovec(&iov, 1, fds, n_fds, request_cb, reply_cb, aux);
195 worker_request_iovec(NULL, 0, fds, n_fds, request_cb, reply_cb, aux);
200 worker_send_iovec(const struct iovec iovs[], size_t n_iovs,
201 const int fds[], size_t n_fds)
209 /* Try to send the rest of the request. */
210 error = send_iovec_and_fds_fully(client_sock, iovs, n_iovs,
211 fds, n_fds, sent, &sent);
212 if (error != EAGAIN) {
216 /* Process replies to avoid deadlock. */
219 /* Wait for 'client_sock' to become ready before trying again. We
220 * can't use poll_block() because it sometimes calls into vlog, which
221 * calls indirectly into worker_send_iovec(). To be usable here,
222 * poll_block() would therefore need to be reentrant, but it isn't
223 * (calling it recursively causes memory corruption and an eventual
225 pfd.fd = client_sock;
226 pfd.events = POLLIN | POLLOUT;
228 error = poll(&pfd, 1, -1) < 0 ? errno : 0;
229 } while (error == EINTR);
232 VLOG_ABORT("poll failed (%s)", strerror(error));
237 /* Same as worker_request() except that the data to send is specified as an
238 * array of iovecs. */
240 worker_request_iovec(const struct iovec iovs[], size_t n_iovs,
241 const int fds[], size_t n_fds,
242 worker_request_func *request_cb,
243 worker_reply_func *reply_cb, void *aux)
245 static bool recursing = false;
246 struct worker_request rq;
247 struct iovec *all_iovs;
250 assert(worker_is_running());
254 rq.request_len = iovec_len(iovs, n_iovs);
255 rq.request_cb = request_cb;
256 rq.reply_cb = reply_cb;
259 all_iovs = prefix_iov(&rq, sizeof rq, iovs, n_iovs);
260 error = worker_send_iovec(all_iovs, n_iovs + 1, fds, n_fds);
263 VLOG_ABORT("send failed (%s)", strerror(error));
270 /* Closes the client socket, if any, so that worker_is_running() will return
273 * The client does this just before aborting if the worker process dies or
274 * malfunctions, to prevent the logging subsystem from trying to use the
275 * worker to log the failure. */
279 if (client_sock >= 0) {
285 /* Interfaces for RPC implementations (running in the worker process). */
287 static int server_sock = -1;
288 static bool expect_reply;
289 static struct worker_request request;
291 /* When a call to worker_request() or worker_request_iovec() provides a
292 * 'reply_cb' callback, the 'request_cb' implementation must call this function
293 * to send its reply. The main process will call 'reply_cb' passing the
294 * 'size' (zero or more) bytes of data in 'data' as arguments as well as the
295 * 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors in 'fds'.
297 * If a call to worker_request() or worker_request_iovec() provides no
298 * 'reply_cb' callback, the 'request_cb' implementation must not call this
301 * 'reply_cb' receives copies (as if by dup()) of the file descriptors in
302 * fds[]. 'reply_cb' takes ownership of these copies, and the caller of
303 * worker_reply() retains its ownership of the originals.
305 * This function blocks until the RPC reply has been sent (if the socket buffer
306 * fills up) but it does not wait for the main process to receive or to process
309 worker_reply(const void *data, size_t size, const int fds[], size_t n_fds)
314 iov.iov_base = (void *) data;
316 worker_reply_iovec(&iov, 1, fds, n_fds);
318 worker_reply_iovec(NULL, 0, fds, n_fds);
322 /* Same as worker_reply() except that the data to send is specified as an array
325 worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
326 const int fds[], size_t n_fds)
328 struct worker_reply reply;
329 struct iovec *all_iovs;
332 assert(expect_reply);
333 expect_reply = false;
335 reply.reply_len = iovec_len(iovs, n_iovs);
336 reply.reply_cb = request.reply_cb;
337 reply.reply_aux = request.reply_aux;
339 all_iovs = prefix_iov(&reply, sizeof reply, iovs, n_iovs);
341 error = send_iovec_and_fds_fully_block(server_sock, all_iovs, n_iovs + 1,
343 if (error == EPIPE) {
344 /* Parent probably died. Continue processing any RPCs still buffered,
345 * to avoid missing log messages. */
346 VLOG_INFO("send failed (%s)", strerror(error));
348 VLOG_FATAL("send failed (%s)", strerror(error));
361 subprogram_name = "worker";
362 proctitle_set("worker process for pid %lu", (unsigned long int) getppid());
363 VLOG_INFO("worker process started");
369 error = rxbuf_run(&rx, server_sock, sizeof(struct worker_request));
371 request = *(struct worker_request *) rx.header.data;
373 expect_reply = request.reply_cb != NULL;
374 request.request_cb(&rx.payload, rx.fds, rx.n_fds);
375 assert(!expect_reply);
378 } else if (error == EOF && !rx.header.size) {
379 /* Main process closed the IPC socket. Exit cleanly. */
381 } else if (error != EAGAIN) {
382 VLOG_FATAL("RPC receive failed (%s)", ovs_retval_to_string(error));
385 poll_fd_wait(server_sock, POLLIN);
389 VLOG_INFO("worker process exiting");
394 rxbuf_init(struct rxbuf *rx)
396 ofpbuf_init(&rx->header, 0);
398 ofpbuf_init(&rx->payload, 0);
402 rxbuf_clear(struct rxbuf *rx)
404 ofpbuf_clear(&rx->header);
406 ofpbuf_clear(&rx->payload);
410 rxbuf_run(struct rxbuf *rx, int sock, size_t header_len)
413 if (!rx->header.size) {
416 ofpbuf_clear(&rx->header);
417 ofpbuf_prealloc_tailroom(&rx->header, header_len);
419 retval = recv_data_and_fds(sock, rx->header.data, header_len,
420 rx->fds, &rx->n_fds);
422 return retval ? -retval : EOF;
424 rx->header.size += retval;
425 } else if (rx->header.size < header_len) {
429 error = read_fully(sock, ofpbuf_tail(&rx->header),
430 header_len - rx->header.size, &bytes_read);
431 rx->header.size += bytes_read;
436 size_t payload_len = *(size_t *) rx->header.data;
438 if (rx->payload.size < payload_len) {
439 size_t left = payload_len - rx->payload.size;
443 ofpbuf_prealloc_tailroom(&rx->payload, left);
444 error = read_fully(sock, ofpbuf_tail(&rx->payload), left,
446 rx->payload.size += bytes_read;
457 static struct iovec *
458 prefix_iov(void *data, size_t len, const struct iovec *iovs, size_t n_iovs)
462 dst = xmalloc((n_iovs + 1) * sizeof *dst);
463 dst[0].iov_base = data;
464 dst[0].iov_len = len;
465 memcpy(dst + 1, iovs, n_iovs * sizeof *iovs);