Add the ability to connect to a vconn asynchronously.
[sliver-openvswitch.git] / lib / vconn-tcp.c
index 1878d2d..996ac17 100644 (file)
@@ -35,6 +35,7 @@
 #include "util.h"
 #include "openflow.h"
 #include "ofp-print.h"
+#include "poll-loop.h"
 
 #include "vlog.h"
 #define THIS_MODULE VLM_vconn_tcp
@@ -47,22 +48,17 @@ struct tcp_vconn
     int fd;
     struct buffer *rxbuf;
     struct buffer *txbuf;
+    struct poll_waiter *tx_waiter;
 };
 
 static int
-new_tcp_vconn(const char *name, int fd, struct vconn **vconnp) 
+new_tcp_vconn(const char *name, int fd, int connect_status,
+              struct vconn **vconnp)
 {
     struct tcp_vconn *tcp;
     int on = 1;
     int retval;
 
-    retval = set_nonblocking(fd);
-    if (retval) {
-        VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
-        close(fd);
-        return retval;
-    }
-
     retval = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
     if (retval) {
         VLOG_ERR("%s: setsockopt(TCP_NODELAY): %s", name, strerror(errno));
@@ -72,18 +68,20 @@ new_tcp_vconn(const char *name, int fd, struct vconn **vconnp)
 
     tcp = xmalloc(sizeof *tcp);
     tcp->vconn.class = &tcp_vconn_class;
+    tcp->vconn.connect_status = connect_status;
     tcp->fd = fd;
     tcp->txbuf = NULL;
+    tcp->tx_waiter = NULL;
     tcp->rxbuf = NULL;
     *vconnp = &tcp->vconn;
     return 0;
 }
 
 static struct tcp_vconn *
-tcp_vconn_cast(struct vconn *vconn) 
+tcp_vconn_cast(struct vconn *vconn)
 {
     assert(vconn->class == &tcp_vconn_class);
-    return CONTAINER_OF(vconn, struct tcp_vconn, vconn); 
+    return CONTAINER_OF(vconn, struct tcp_vconn, vconn);
 }
 
 
@@ -120,60 +118,41 @@ tcp_open(const char *name, char *suffix, struct vconn **vconnp)
         return errno;
     }
 
-    retval = connect(fd, (struct sockaddr *) &sin, sizeof sin);
-    if (retval < 0) {
-        int error = errno;
-        VLOG_ERR("%s: connect: %s", name, strerror(error));
+    retval = set_nonblocking(fd);
+    if (retval) {
         close(fd);
-        return error;
+        return retval;
     }
 
-    return new_tcp_vconn(name, fd, vconnp);
+    retval = connect(fd, (struct sockaddr *) &sin, sizeof sin);
+    if (retval < 0) {
+        if (errno == EINPROGRESS) {
+            return new_tcp_vconn(name, fd, EAGAIN, vconnp);
+        } else {
+            int error = errno;
+            VLOG_ERR("%s: connect: %s", name, strerror(error));
+            close(fd);
+            return error;
+        }
+    } else {
+        return new_tcp_vconn(name, fd, 0, vconnp);
+    }
 }
 
 static void
-tcp_close(struct vconn *vconn) 
+tcp_close(struct vconn *vconn)
 {
     struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+    poll_cancel(tcp->tx_waiter);
     close(tcp->fd);
     free(tcp);
 }
 
-static void
-tcp_prepoll(struct vconn *vconn, int want, struct pollfd *pfd) 
-{
-    struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
-    pfd->fd = tcp->fd;
-    if (want & WANT_RECV) {
-        pfd->events |= POLLIN;
-    }
-    if (want & WANT_SEND || tcp->txbuf) {
-        pfd->events |= POLLOUT;
-    }
-}
-
-static void
-tcp_postpoll(struct vconn *vconn, short int *revents)
+static int
+tcp_connect(struct vconn *vconn)
 {
     struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
-    if (*revents & POLLOUT && tcp->txbuf) {
-        ssize_t n = write(tcp->fd, tcp->txbuf->data, tcp->txbuf->size);
-        if (n < 0) {
-            if (errno != EAGAIN) {
-                VLOG_ERR("send: %s", strerror(errno));
-                *revents |= POLLERR;
-            }
-        } else if (n > 0) {
-            buffer_pull(tcp->txbuf, n);
-            if (tcp->txbuf->size == 0) {
-                buffer_delete(tcp->txbuf);
-                tcp->txbuf = NULL;
-            }
-        }
-        if (tcp->txbuf) {
-            *revents &= ~POLLOUT;
-        }
-    }
+    return check_connection_completion(tcp->fd);
 }
 
 static int
@@ -223,8 +202,38 @@ again:
     }
 }
 
+static void
+tcp_clear_txbuf(struct tcp_vconn *tcp)
+{
+    buffer_delete(tcp->txbuf);
+    tcp->txbuf = NULL;
+    tcp->tx_waiter = NULL;
+}
+
+static void
+tcp_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_)
+{
+    struct vconn *vconn = vconn_;
+    struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+    ssize_t n = write(tcp->fd, tcp->txbuf->data, tcp->txbuf->size);
+    if (n < 0) {
+        if (errno != EAGAIN) {
+            VLOG_ERR("send: %s", strerror(errno));
+            tcp_clear_txbuf(tcp);
+            return;
+        }
+    } else if (n > 0) {
+        buffer_pull(tcp->txbuf, n);
+        if (!tcp->txbuf->size) {
+            tcp_clear_txbuf(tcp);
+            return;
+        }
+    }
+    tcp->tx_waiter = poll_fd_callback(tcp->fd, POLLOUT, tcp_do_tx, vconn);
+}
+
 static int
-tcp_send(struct vconn *vconn, struct buffer *buffer) 
+tcp_send(struct vconn *vconn, struct buffer *buffer)
 {
     struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
     ssize_t retval;
@@ -242,20 +251,47 @@ tcp_send(struct vconn *vconn, struct buffer *buffer)
         if (retval > 0) {
             buffer_pull(buffer, retval);
         }
+        tcp->tx_waiter = poll_fd_callback(tcp->fd, POLLOUT, tcp_do_tx, vconn);
         return 0;
     } else {
         return errno;
     }
 }
 
+static void
+tcp_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+    struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+    switch (wait) {
+    case WAIT_CONNECT:
+        poll_fd_wait(tcp->fd, POLLOUT, NULL);
+        break;
+
+    case WAIT_SEND:
+        if (!tcp->txbuf) {
+            poll_fd_wait(tcp->fd, POLLOUT, NULL);
+        } else {
+            /* Nothing to do: need to drain txbuf first. */
+        }
+        break;
+
+    case WAIT_RECV:
+        poll_fd_wait(tcp->fd, POLLIN, NULL);
+        break;
+
+    default:
+        NOT_REACHED();
+    }
+}
+
 struct vconn_class tcp_vconn_class = {
     .name = "tcp",
     .open = tcp_open,
     .close = tcp_close,
-    .prepoll = tcp_prepoll,
-    .postpoll = tcp_postpoll,
+    .connect = tcp_connect,
     .recv = tcp_recv,
     .send = tcp_send,
+    .wait = tcp_wait,
 };
 \f
 /* Passive TCP. */
@@ -267,10 +303,10 @@ struct ptcp_vconn
 };
 
 static struct ptcp_vconn *
-ptcp_vconn_cast(struct vconn *vconn) 
+ptcp_vconn_cast(struct vconn *vconn)
 {
     assert(vconn->class == &ptcp_vconn_class);
-    return CONTAINER_OF(vconn, struct ptcp_vconn, vconn); 
+    return CONTAINER_OF(vconn, struct ptcp_vconn, vconn);
 }
 
 static int
@@ -316,55 +352,64 @@ ptcp_open(const char *name, char *suffix, struct vconn **vconnp)
 
     retval = set_nonblocking(fd);
     if (retval) {
-        VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
         close(fd);
         return retval;
     }
 
     ptcp = xmalloc(sizeof *ptcp);
     ptcp->vconn.class = &ptcp_vconn_class;
+    ptcp->vconn.connect_status = 0;
     ptcp->fd = fd;
     *vconnp = &ptcp->vconn;
     return 0;
 }
 
 static void
-ptcp_close(struct vconn *vconn) 
+ptcp_close(struct vconn *vconn)
 {
     struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
     close(ptcp->fd);
     free(ptcp);
 }
 
-static void
-ptcp_prepoll(struct vconn *vconn, int want, struct pollfd *pfd) 
-{
-    struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
-    pfd->fd = ptcp->fd;
-    if (want & WANT_ACCEPT) {
-        pfd->events |= POLLIN;
-    }
-}
-
 static int
-ptcp_accept(struct vconn *vconn, struct vconn **new_vconnp) 
+ptcp_accept(struct vconn *vconn, struct vconn **new_vconnp)
 {
     struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
     int new_fd;
+    int error;
 
     new_fd = accept(ptcp->fd, NULL, NULL);
     if (new_fd < 0) {
-        return errno;
+        int error = errno;
+        if (error != EAGAIN) {
+            VLOG_DBG("accept: %s", strerror(error));
+        }
+        return error;
     }
 
-    return new_tcp_vconn("tcp" /* FIXME */, new_fd, new_vconnp);
+    error = set_nonblocking(new_fd);
+    if (error) {
+        close(new_fd);
+        return error;
+    }
+
+    return new_tcp_vconn("tcp" /* FIXME */, new_fd, 0, new_vconnp);
+}
+
+static void
+ptcp_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+    struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
+    assert(wait == WAIT_ACCEPT);
+    poll_fd_wait(ptcp->fd, POLLIN, NULL);
 }
 
 struct vconn_class ptcp_vconn_class = {
     .name = "ptcp",
     .open = ptcp_open,
     .close = ptcp_close,
-    .prepoll = ptcp_prepoll,
     .accept = ptcp_accept,
+    .wait = ptcp_wait
 };