X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=lib%2Fvconn-stream.c;h=f19f3ebfa04043add23c0345011e1a9e7d33de0e;hb=60cb3eb8b296e2aebbda6ccc161e99ad2bc7ca4a;hp=46279e57ea03ab0a5bf79075e2e1f8235f9cf02f;hpb=193456d581423f894e57e8463ff5049c0d802f0a;p=sliver-openvswitch.git diff --git a/lib/vconn-stream.c b/lib/vconn-stream.c index 46279e57e..f19f3ebfa 100644 --- a/lib/vconn-stream.c +++ b/lib/vconn-stream.c @@ -23,6 +23,7 @@ #include #include #include +#include "fatal-signal.h" #include "leak-checker.h" #include "ofpbuf.h" #include "openflow/openflow.h" @@ -41,10 +42,9 @@ struct stream_vconn { struct vconn vconn; int fd; - void (*connect_success_cb)(struct vconn *, int); struct ofpbuf *rxbuf; struct ofpbuf *txbuf; - struct poll_waiter *tx_waiter; + char *unlink_path; }; static struct vconn_class stream_vconn_class; @@ -52,24 +52,29 @@ static struct vconn_class stream_vconn_class; static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25); static void stream_clear_txbuf(struct stream_vconn *); +static void maybe_unlink_and_free(char *path); +/* Creates a new vconn named 'name' that will send and receive data on 'fd' and + * stores a pointer to the vconn in '*vconnp'. Initial connection status + * 'connect_status' is interpreted as described for vconn_init(). + * + * When '*vconnp' is closed, then 'unlink_path' (if nonnull) will be passed to + * fatal_signal_unlink_file_now() and then freed with free(). + * + * Returns 0 if successful, otherwise a positive errno value. (The current + * implementation never fails.) */ int new_stream_vconn(const char *name, int fd, int connect_status, - uint32_t remote_ip, uint16_t remote_port, - bool reconnectable, - connect_success_cb_func *connect_success_cb, - struct vconn **vconnp) + char *unlink_path, struct vconn **vconnp) { struct stream_vconn *s; s = xmalloc(sizeof *s); - vconn_init(&s->vconn, &stream_vconn_class, connect_status, remote_ip, - remote_port, name, reconnectable); + vconn_init(&s->vconn, &stream_vconn_class, connect_status, name); s->fd = fd; s->txbuf = NULL; - s->tx_waiter = NULL; s->rxbuf = NULL; - s->connect_success_cb = connect_success_cb; + s->unlink_path = unlink_path; *vconnp = &s->vconn; return 0; } @@ -85,10 +90,10 @@ static void stream_close(struct vconn *vconn) { struct stream_vconn *s = stream_vconn_cast(vconn); - poll_cancel(s->tx_waiter); stream_clear_txbuf(s); ofpbuf_delete(s->rxbuf); close(s->fd); + maybe_unlink_and_free(s->unlink_path); free(s); } @@ -96,14 +101,7 @@ static int stream_connect(struct vconn *vconn) { struct stream_vconn *s = stream_vconn_cast(vconn); - int retval = check_connection_completion(s->fd); - if (retval) { - return retval; - } - if (s->connect_success_cb) { - s->connect_success_cb(vconn, s->fd); - } - return 0; + return check_connection_completion(s->fd); } static int @@ -169,29 +167,6 @@ stream_clear_txbuf(struct stream_vconn *s) { ofpbuf_delete(s->txbuf); s->txbuf = NULL; - s->tx_waiter = NULL; -} - -static void -stream_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_) -{ - struct vconn *vconn = vconn_; - struct stream_vconn *s = stream_vconn_cast(vconn); - ssize_t n = write(s->fd, s->txbuf->data, s->txbuf->size); - if (n < 0) { - if (errno != EAGAIN) { - VLOG_ERR_RL(&rl, "send: %s", strerror(errno)); - stream_clear_txbuf(s); - return; - } - } else if (n > 0) { - ofpbuf_pull(s->txbuf, n); - if (!s->txbuf->size) { - stream_clear_txbuf(s); - return; - } - } - s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn); } static int @@ -214,13 +189,48 @@ stream_send(struct vconn *vconn, struct ofpbuf *buffer) if (retval > 0) { ofpbuf_pull(buffer, retval); } - s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn); return 0; } else { return errno; } } +static void +stream_run(struct vconn *vconn) +{ + struct stream_vconn *s = stream_vconn_cast(vconn); + ssize_t n; + + if (!s->txbuf) { + return; + } + + n = write(s->fd, s->txbuf->data, s->txbuf->size); + if (n < 0) { + if (errno != EAGAIN) { + VLOG_ERR_RL(&rl, "send: %s", strerror(errno)); + stream_clear_txbuf(s); + return; + } + } else if (n > 0) { + ofpbuf_pull(s->txbuf, n); + if (!s->txbuf->size) { + stream_clear_txbuf(s); + return; + } + } +} + +static void +stream_run_wait(struct vconn *vconn) +{ + struct stream_vconn *s = stream_vconn_cast(vconn); + + if (s->txbuf) { + poll_fd_wait(s->fd, POLLOUT); + } +} + static void stream_wait(struct vconn *vconn, enum vconn_wait_type wait) { @@ -234,7 +244,9 @@ stream_wait(struct vconn *vconn, enum vconn_wait_type wait) if (!s->txbuf) { poll_fd_wait(s->fd, POLLOUT); } else { - /* Nothing to do: need to drain txbuf first. */ + /* Nothing to do: need to drain txbuf first. stream_run_wait() + * will arrange to wake up when there room to send data, so there's + * no point in calling poll_fd_wait() redundantly here. */ } break; @@ -254,6 +266,8 @@ static struct vconn_class stream_vconn_class = { stream_connect, /* connect */ stream_recv, /* recv */ stream_send, /* send */ + stream_run, /* run */ + stream_run_wait, /* run_wait */ stream_wait, /* wait */ }; @@ -265,6 +279,7 @@ struct pstream_pvconn int fd; int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len, struct vconn **); + char *unlink_path; }; static struct pvconn_class pstream_pvconn_class; @@ -276,32 +291,31 @@ pstream_pvconn_cast(struct pvconn *pvconn) return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn); } +/* Creates a new pvconn named 'name' that will accept new socket connections on + * 'fd' and stores a pointer to the vconn in '*pvconnp'. + * + * When a connection has been accepted, 'accept_cb' will be called with the new + * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'. + * accept_cb must return 0 if the connection is successful, in which case it + * must initialize '*vconnp' to the new vconn, or a positive errno value on + * error. In either case accept_cb takes ownership of the 'fd' passed in. + * + * When '*pvconnp' is closed, then 'unlink_path' (if nonnull) will be passed to + * fatal_signal_unlink_file_now() and freed with free(). + * + * Returns 0 if successful, otherwise a positive errno value. (The current + * implementation never fails.) */ int new_pstream_pvconn(const char *name, int fd, - int (*accept_cb)(int fd, const struct sockaddr *, - size_t sa_len, struct vconn **), - struct pvconn **pvconnp) + int (*accept_cb)(int fd, const struct sockaddr *sa, + size_t sa_len, struct vconn **vconnp), + char *unlink_path, struct pvconn **pvconnp) { - struct pstream_pvconn *ps; - int retval; - - retval = set_nonblocking(fd); - if (retval) { - close(fd); - return retval; - } - - if (listen(fd, 10) < 0) { - int error = errno; - VLOG_ERR("%s: listen: %s", name, strerror(error)); - close(fd); - return error; - } - - ps = xmalloc(sizeof *ps); + struct pstream_pvconn *ps = xmalloc(sizeof *ps); pvconn_init(&ps->pvconn, &pstream_pvconn_class, name); ps->fd = fd; ps->accept_cb = accept_cb; + ps->unlink_path = unlink_path; *pvconnp = &ps->pvconn; return 0; } @@ -311,6 +325,7 @@ pstream_close(struct pvconn *pvconn) { struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn); close(ps->fd); + maybe_unlink_and_free(ps->unlink_path); free(ps); } @@ -356,3 +371,13 @@ static struct pvconn_class pstream_pvconn_class = { pstream_accept, pstream_wait }; + +/* Helper functions. */ +static void +maybe_unlink_and_free(char *path) +{ + if (path) { + fatal_signal_unlink_file_now(path); + free(path); + } +}