#include "util.h"
#include "openflow.h"
#include "ofp-print.h"
+#include "poll-loop.h"
#include "vlog.h"
#define THIS_MODULE VLM_vconn_tcp
int fd;
struct buffer *rxbuf;
struct buffer *txbuf;
+ struct poll_waiter *tx_waiter;
};
static int
-new_tcp_vconn(const char *name, int fd, struct vconn **vconnp)
+new_tcp_vconn(const char *name, int fd, int connect_status,
+ struct vconn **vconnp)
{
struct tcp_vconn *tcp;
int on = 1;
int retval;
- retval = set_nonblocking(fd);
- if (retval) {
- VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
- close(fd);
- return retval;
- }
-
retval = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
if (retval) {
VLOG_ERR("%s: setsockopt(TCP_NODELAY): %s", name, strerror(errno));
tcp = xmalloc(sizeof *tcp);
tcp->vconn.class = &tcp_vconn_class;
+ tcp->vconn.connect_status = connect_status;
tcp->fd = fd;
tcp->txbuf = NULL;
+ tcp->tx_waiter = NULL;
tcp->rxbuf = NULL;
*vconnp = &tcp->vconn;
return 0;
}
static struct tcp_vconn *
-tcp_vconn_cast(struct vconn *vconn)
+tcp_vconn_cast(struct vconn *vconn)
{
assert(vconn->class == &tcp_vconn_class);
- return CONTAINER_OF(vconn, struct tcp_vconn, vconn);
+ return CONTAINER_OF(vconn, struct tcp_vconn, vconn);
}
return errno;
}
- retval = connect(fd, (struct sockaddr *) &sin, sizeof sin);
- if (retval < 0) {
- int error = errno;
- VLOG_ERR("%s: connect: %s", name, strerror(error));
+ retval = set_nonblocking(fd);
+ if (retval) {
close(fd);
- return error;
+ return retval;
}
- return new_tcp_vconn(name, fd, vconnp);
+ retval = connect(fd, (struct sockaddr *) &sin, sizeof sin);
+ if (retval < 0) {
+ if (errno == EINPROGRESS) {
+ return new_tcp_vconn(name, fd, EAGAIN, vconnp);
+ } else {
+ int error = errno;
+ VLOG_ERR("%s: connect: %s", name, strerror(error));
+ close(fd);
+ return error;
+ }
+ } else {
+ return new_tcp_vconn(name, fd, 0, vconnp);
+ }
}
static void
-tcp_close(struct vconn *vconn)
+tcp_close(struct vconn *vconn)
{
struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+ poll_cancel(tcp->tx_waiter);
close(tcp->fd);
free(tcp);
}
-static bool
-tcp_prepoll(struct vconn *vconn, int want, struct pollfd *pfd)
+static int
+tcp_connect(struct vconn *vconn)
{
struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
- pfd->fd = tcp->fd;
- if (want & WANT_RECV) {
- pfd->events |= POLLIN;
- }
- if (want & WANT_SEND || tcp->txbuf) {
- pfd->events |= POLLOUT;
- }
- return false;
-}
-
-static void
-tcp_postpoll(struct vconn *vconn, short int *revents)
-{
- struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
- if (*revents & POLLOUT && tcp->txbuf) {
- ssize_t n = write(tcp->fd, tcp->txbuf->data, tcp->txbuf->size);
- if (n < 0) {
- if (errno != EAGAIN) {
- VLOG_ERR("send: %s", strerror(errno));
- *revents |= POLLERR;
- }
- } else if (n > 0) {
- buffer_pull(tcp->txbuf, n);
- if (tcp->txbuf->size == 0) {
- buffer_delete(tcp->txbuf);
- tcp->txbuf = NULL;
- }
- }
- if (tcp->txbuf) {
- *revents &= ~POLLOUT;
- }
- }
+ return check_connection_completion(tcp->fd);
}
static int
}
}
+static void
+tcp_clear_txbuf(struct tcp_vconn *tcp)
+{
+ buffer_delete(tcp->txbuf);
+ tcp->txbuf = NULL;
+ tcp->tx_waiter = NULL;
+}
+
+static void
+tcp_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_)
+{
+ struct vconn *vconn = vconn_;
+ struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+ ssize_t n = write(tcp->fd, tcp->txbuf->data, tcp->txbuf->size);
+ if (n < 0) {
+ if (errno != EAGAIN) {
+ VLOG_ERR("send: %s", strerror(errno));
+ tcp_clear_txbuf(tcp);
+ return;
+ }
+ } else if (n > 0) {
+ buffer_pull(tcp->txbuf, n);
+ if (!tcp->txbuf->size) {
+ tcp_clear_txbuf(tcp);
+ return;
+ }
+ }
+ tcp->tx_waiter = poll_fd_callback(tcp->fd, POLLOUT, tcp_do_tx, vconn);
+}
+
static int
-tcp_send(struct vconn *vconn, struct buffer *buffer)
+tcp_send(struct vconn *vconn, struct buffer *buffer)
{
struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
ssize_t retval;
if (retval > 0) {
buffer_pull(buffer, retval);
}
+ tcp->tx_waiter = poll_fd_callback(tcp->fd, POLLOUT, tcp_do_tx, vconn);
return 0;
} else {
return errno;
}
}
+static void
+tcp_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+ struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+ switch (wait) {
+ case WAIT_CONNECT:
+ poll_fd_wait(tcp->fd, POLLOUT, NULL);
+ break;
+
+ case WAIT_SEND:
+ if (!tcp->txbuf) {
+ poll_fd_wait(tcp->fd, POLLOUT, NULL);
+ } else {
+ /* Nothing to do: need to drain txbuf first. */
+ }
+ break;
+
+ case WAIT_RECV:
+ poll_fd_wait(tcp->fd, POLLIN, NULL);
+ break;
+
+ default:
+ NOT_REACHED();
+ }
+}
+
struct vconn_class tcp_vconn_class = {
.name = "tcp",
.open = tcp_open,
.close = tcp_close,
- .prepoll = tcp_prepoll,
- .postpoll = tcp_postpoll,
+ .connect = tcp_connect,
.recv = tcp_recv,
.send = tcp_send,
+ .wait = tcp_wait,
};
\f
/* Passive TCP. */
};
static struct ptcp_vconn *
-ptcp_vconn_cast(struct vconn *vconn)
+ptcp_vconn_cast(struct vconn *vconn)
{
assert(vconn->class == &ptcp_vconn_class);
- return CONTAINER_OF(vconn, struct ptcp_vconn, vconn);
+ return CONTAINER_OF(vconn, struct ptcp_vconn, vconn);
}
static int
retval = set_nonblocking(fd);
if (retval) {
- VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
close(fd);
return retval;
}
ptcp = xmalloc(sizeof *ptcp);
ptcp->vconn.class = &ptcp_vconn_class;
+ ptcp->vconn.connect_status = 0;
ptcp->fd = fd;
*vconnp = &ptcp->vconn;
return 0;
}
static void
-ptcp_close(struct vconn *vconn)
+ptcp_close(struct vconn *vconn)
{
struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
close(ptcp->fd);
free(ptcp);
}
-static bool
-ptcp_prepoll(struct vconn *vconn, int want, struct pollfd *pfd)
-{
- struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
- pfd->fd = ptcp->fd;
- if (want & WANT_ACCEPT) {
- pfd->events |= POLLIN;
- }
- return false;
-}
-
static int
-ptcp_accept(struct vconn *vconn, struct vconn **new_vconnp)
+ptcp_accept(struct vconn *vconn, struct vconn **new_vconnp)
{
struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
int new_fd;
+ int error;
new_fd = accept(ptcp->fd, NULL, NULL);
if (new_fd < 0) {
- return errno;
+ int error = errno;
+ if (error != EAGAIN) {
+ VLOG_DBG("accept: %s", strerror(error));
+ }
+ return error;
}
- return new_tcp_vconn("tcp" /* FIXME */, new_fd, new_vconnp);
+ error = set_nonblocking(new_fd);
+ if (error) {
+ close(new_fd);
+ return error;
+ }
+
+ return new_tcp_vconn("tcp" /* FIXME */, new_fd, 0, new_vconnp);
+}
+
+static void
+ptcp_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+ struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
+ assert(wait == WAIT_ACCEPT);
+ poll_fd_wait(ptcp->fd, POLLIN, NULL);
}
struct vconn_class ptcp_vconn_class = {
.name = "ptcp",
.open = ptcp_open,
.close = ptcp_close,
- .prepoll = ptcp_prepoll,
.accept = ptcp_accept,
+ .wait = ptcp_wait
};