Add the ability to connect to a vconn asynchronously.
authorBen Pfaff <blp@nicira.com>
Thu, 27 Mar 2008 22:16:19 +0000 (15:16 -0700)
committerBen Pfaff <blp@nicira.com>
Fri, 28 Mar 2008 00:50:33 +0000 (17:50 -0700)
Until now, vconn_connect() has always completed the connection
synchronously, blocking as necessary.  In the userspace
switch, we want to be able to continue forwarding packets even
if the connection to the controller drops.  Thus, this change set
that makes that possible.

The approach taken is perhaps more ambitious than needed, as it
actually adds a new high-level mechanism for polling on arbitrary
file descriptors.  This necessitates quite a bit of change to
each of the userspace programs that use vconns, but it also has
the effect of simplifying them.  The new structure of these programs
is a lot less fragile than the old one (which tended to end up
livelocking or hanging when something wasn't quite right), so it
seems like the changes are worth it.

17 files changed:
controller/controller.c
include/Makefile.am
include/poll-loop.h [new file with mode: 0644]
include/socket-util.h
include/vconn.h
include/vlog-socket.h
include/vlog.h
lib/Makefile.am
lib/poll-loop.c [new file with mode: 0644]
lib/socket-util.c
lib/vconn-netlink.c
lib/vconn-ssl.c
lib/vconn-tcp.c
lib/vconn.c
lib/vlog-socket.c
secchan/secchan.c
utilities/dpctl.c

index 43c0b3f..f4a53f5 100644 (file)
@@ -39,6 +39,7 @@
 #include "mac.h"
 #include "ofp-print.h"
 #include "openflow.h"
+#include "poll-loop.h"
 #include "time.h"
 #include "util.h"
 #include "vconn-ssl.h"
@@ -55,7 +56,6 @@
 struct switch_ {
     char *name;
     struct vconn *vconn;
-    struct pollfd *pollfd;
 
     uint64_t datapath_id;
     time_t last_control_hello;
@@ -95,8 +95,6 @@ int
 main(int argc, char *argv[])
 {
     struct switch_ *switches[MAX_SWITCHES];
-    struct pollfd pollfds[MAX_SWITCHES + 1];
-    struct vlog_server *vlog_server;
     int n_switches;
     int retval;
     int i;
@@ -114,7 +112,7 @@ main(int argc, char *argv[])
         fatal(0, "at least one vconn argument required; use --help for usage");
     }
 
-    retval = vlog_server_listen(NULL, &vlog_server);
+    retval = vlog_server_listen(NULL, NULL);
     if (retval) {
         fatal(retval, "Could not listen for vlog connections");
     }
@@ -132,99 +130,67 @@ main(int argc, char *argv[])
     if (n_switches == 0) {
         fatal(0, "could not connect to any switches");
     }
-    
-    while (n_switches > 0) {
-        size_t n_ready;
-        int retval;
-
-        /* Wait until there's something to do. */
-        n_ready = 0;
-        for (i = 0; i < n_switches; i++) {
-            struct switch_ *this = switches[i];
-            int want;
-
-            if (vconn_is_passive(this->vconn)) {
-                want = n_switches < MAX_SWITCHES ? WANT_ACCEPT : 0;
-            } else {
-                want = WANT_RECV;
-                if (this->n_txq) {
-                    want |= WANT_SEND;
-                }
-            }
-
-            this->pollfd = &pollfds[i];
-            this->pollfd->fd = -1;
-            this->pollfd->events = 0;
-            n_ready += vconn_prepoll(this->vconn, want, this->pollfd);
-        }
-        if (vlog_server) {
-            pollfds[n_switches].fd = vlog_server_get_fd(vlog_server);
-            pollfds[n_switches].events = POLLIN;
-        }
-        do {
-            retval = poll(pollfds, n_switches + (vlog_server != NULL),
-                          n_ready ? 0 : -1);
-        } while (retval < 0 && errno == EINTR);
-        if (retval < 0 || (retval == 0 && !n_ready)) {
-            fatal(retval < 0 ? errno : 0, "poll");
-        }
-
-        /* Let each connection deal with any pending operations. */
-        for (i = 0; i < n_switches; i++) {
-            struct switch_ *this = switches[i];
-            vconn_postpoll(this->vconn, &this->pollfd->revents);
-            if (this->pollfd->revents & POLLERR) {
-                this->pollfd->revents |= POLLIN | POLLOUT;
-            }
-        }
-        if (vlog_server && pollfds[n_switches].revents) {
-            vlog_server_poll(vlog_server);
-        }
 
-        for (i = 0; i < n_switches; ) {
-            struct switch_ *this = switches[i];
+    while (n_switches > 0) {
+        /* Do some work.  Limit the number of iterations so that callbacks
+         * registered with the poll loop don't starve. */
+        int iteration;
+        int i;
+        for (iteration = 0; iteration < 50; iteration++) {
+            bool progress = false;
+            for (i = 0; i < n_switches; ) {
+                struct switch_ *this = switches[i];
+                int retval;
 
-            if (this->pollfd) {
-                retval = 0;
                 if (vconn_is_passive(this->vconn)) {
-                    if (this->pollfd->revents & POLLIN) {
+                    retval = 0;
+                    while (n_switches < MAX_SWITCHES) {
                         struct vconn *new_vconn;
-                        while (n_switches < MAX_SWITCHES 
-                               && (retval = vconn_accept(this->vconn,
-                                                         &new_vconn)) == 0) {
-                            switches[n_switches++] = new_switch("tcp",
-                                                                new_vconn);
+                        retval = vconn_accept(this->vconn, &new_vconn);
+                        if (retval) {
+                            break;
                         }
+                        switches[n_switches++] = new_switch("tcp", new_vconn);
                     }
                 } else {
-                    bool may_read = this->pollfd->revents & POLLIN;
-                    bool may_write = this->pollfd->revents & POLLOUT;
-                    if (may_read) {
-                        retval = do_switch_recv(this);
-                        if (!retval || retval == EAGAIN) {
-                            retval = 0;
-
-                            /* Enable writing to avoid round trip through poll
-                             * in common case. */
-                            may_write = true;
-                        }
-                    }
-                    while ((!retval || retval == EAGAIN) && may_write) {
-                        retval = do_switch_send(this);
-                        may_write = !retval;
+                    retval = do_switch_recv(this);
+                    if (!retval || retval == EAGAIN) {
+                        do {
+                            retval = do_switch_send(this);
+                            if (!retval) {
+                                progress = true;
+                            }
+                        } while (!retval);
                     }
                 }
 
                 if (retval && retval != EAGAIN) {
                     close_switch(this);
                     switches[i] = switches[--n_switches];
-                    continue;
+                } else {
+                    i++;
+                }
+            }
+            if (!progress) {
+                break;
+            }
+        }
+
+        /* Wait for something to happen. */
+        for (i = 0; i < n_switches; i++) {
+            struct switch_ *this = switches[i];
+            if (vconn_is_passive(this->vconn)) {
+                if (n_switches < MAX_SWITCHES) {
+                    vconn_accept_wait(this->vconn);
                 }
             } else {
-                /* New switch that hasn't been polled yet. */
+                vconn_recv_wait(this->vconn);
+                if (this->n_txq) {
+                    vconn_send_wait(this->vconn);
+                }
             }
-            i++;
         }
+        poll_block();
     }
 
     return 0;
@@ -288,7 +254,6 @@ new_switch(const char *name, struct vconn *vconn)
     memset(this, 0, sizeof *this);
     this->name = xstrdup(name);
     this->vconn = vconn;
-    this->pollfd = NULL;
     this->n_txq = 0;
     this->txq = NULL;
     this->tx_tail = NULL;
index 035769e..c1651e1 100644 (file)
@@ -11,12 +11,12 @@ noinst_HEADERS = \
        ip.h \
        list.h \
        mac.h \
-       Makefile.am \
        netlink.h \
        ofp-print.h \
        openflow.h \
        openflow-netlink.h \
        packets.h \
+       poll-loop.h \
        socket-util.h \
        util.h \
        vconn.h \
diff --git a/include/poll-loop.h b/include/poll-loop.h
new file mode 100644 (file)
index 0000000..87d541a
--- /dev/null
@@ -0,0 +1,38 @@
+/* Copyright (C) 2008 Board of Trustees, Leland Stanford Jr. University.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#ifndef POLL_LOOP_H
+#define POLL_LOOP_H 1
+
+#include <poll.h>
+
+typedef void poll_fd_func(int fd, short int revents, void *aux);
+
+struct poll_waiter *poll_fd_callback(int fd, short int events,
+                                     poll_fd_func *, void *aux);
+struct poll_waiter *poll_fd_wait(int fd, short int events, short int *revents);
+void poll_cancel(struct poll_waiter *);
+
+void poll_immediate_wake(void);
+void poll_timer_wait(int msec);
+void poll_block(void);
+
+#endif /* poll-loop.h */
index 2d167f9..4a42736 100644 (file)
@@ -27,5 +27,7 @@
 
 int set_nonblocking(int fd);
 int lookup_ip(const char *host_name, struct in_addr *address);
+int get_socket_error(int sock);
+int check_connection_completion(int fd);
 
 #endif /* socket-util.h */
index 064947c..a305523 100644 (file)
@@ -34,24 +34,31 @@ struct pollfd;
 /* Virtual connection to an OpenFlow device. */
 struct vconn {
     struct vconn_class *class;
-};
-
-/* What kind of operation do we want to perform? */
-enum {
-    WANT_ACCEPT = 1 << 0,          /* Want to accept a new connection. */
-    WANT_RECV = 1 << 1,            /* Want to receive a message. */
-    WANT_SEND = 1 << 2             /* Want to send a message. */
+    int connect_status;
 };
 
 int vconn_open(const char *name, struct vconn **);
 void vconn_close(struct vconn *);
 bool vconn_is_passive(const struct vconn *);
-bool vconn_prepoll(struct vconn *, int want, struct pollfd *);
-void vconn_postpoll(struct vconn *, short int *revents);
+int vconn_connect(struct vconn *);
 int vconn_accept(struct vconn *, struct vconn **);
 int vconn_recv(struct vconn *, struct buffer **);
 int vconn_send(struct vconn *, struct buffer *);
-int vconn_send_wait(struct vconn *, struct buffer *);
+
+int vconn_open_block(const char *name, struct vconn **);
+int vconn_send_block(struct vconn *, struct buffer *);
+
+enum vconn_wait_type {
+    WAIT_CONNECT,
+    WAIT_ACCEPT,
+    WAIT_RECV,
+    WAIT_SEND
+};
+void vconn_wait(struct vconn *, enum vconn_wait_type);
+void vconn_connect_wait(struct vconn *);
+void vconn_accept_wait(struct vconn *);
+void vconn_recv_wait(struct vconn *);
+void vconn_send_wait(struct vconn *);
 
 struct buffer *make_add_simple_flow(const struct flow *,
                                     uint32_t buffer_id, uint16_t out_port);
@@ -69,43 +76,29 @@ struct vconn_class {
     /* Attempts to connect to an OpenFlow device.  'name' is the full
      * connection name provided by the user, e.g. "nl:0", "tcp:1.2.3.4".  This
      * name is useful for error messages but must not be modified.
-     * 
+     *
      * 'suffix' is a copy of 'name' following the colon and may be modified.
      *
      * Returns 0 if successful, otherwise a positive errno value.  If
-     * successful, stores a pointer to the new connection in '*vconnp'. */
+     * successful, stores a pointer to the new connection in '*vconnp'.
+     *
+     * The open function must not block waiting for a connection to complete.
+     * If the connection cannot be completed immediately, it should return
+     * EAGAIN (not EINPROGRESS, as returned by the connect system call) and
+     * continue the connection in the background. */
     int (*open)(const char *name, char *suffix, struct vconn **vconnp);
 
     /* Closes 'vconn' and frees associated memory. */
     void (*close)(struct vconn *vconn);
 
-    /* Called by the main loop before calling poll(), this function must
-     * initialize 'pfd->fd' and 'pfd->events' appropriately so that poll() will
-     * wake up when the connection becomes available for the operations
-     * specified in 'want'.  The prepoll function may also set bits in 'pfd' to
-     * allow for internal processing.
-     *
-     * Should return false normally.  May return true to indicate that no
-     * blocking should happen in poll() because the connection is available for
-     * some operation specified in 'want' but that status cannot be detected
-     * via poll() and thus poll() could block forever otherwise. */
-    bool (*prepoll)(struct vconn *, int want, struct pollfd *pfd);
-
-    /* Called by the main loop after calling poll(), this function may perform
-     * any internal processing needed by the connection.  It is provided with
-     * the vconn file descriptor's status in '*revents', as reported by poll().
+    /* Tries to complete the connection on 'vconn', which must be an active
+     * vconn.  If 'vconn''s connection is complete, returns 0 if the connection
+     * was successful or a positive errno value if it failed.  If the
+     * connection is still in progress, returns EAGAIN.
      *
-     * The postpoll function should adjust '*revents' to reflect the status of
-     * the connection from the caller's point of view: that is, upon return
-     * '*revents & POLLIN' should indicate that a packet is (potentially) ready
-     * to be read (for an active vconn) or a new connection is ready to be
-     * accepted (for a passive vconn) and '*revents & POLLOUT' should indicate
-     * that a packet is (potentially) ready to be written.
-     *
-     * This function may be a null pointer in a vconn class that has no use for
-     * it, that is, if the vconn does not need to do any internal processing
-     * and poll's revents out properly reflects the vconn's status.  */
-    void (*postpoll)(struct vconn *, short int *revents);
+     * The connect function must not block waiting for the connection to
+     * complete; instead, it should return EAGAIN immediately. */
+    int (*connect)(struct vconn *vconn);
 
     /* Tries to accept a new connection on 'vconn', which must be a passive
      * vconn.  If successful, stores the new connection in '*new_vconnp' and
@@ -114,7 +107,7 @@ struct vconn_class {
      * The accept function must not block waiting for a connection.  If no
      * connection is ready to be accepted, it should return EAGAIN.
      *
-     * Nonnull iff this is a passive vconn (one that accepts connection and
+     * Nonnull iff this is a passive vconn (one that accepts connections and
      * does not transfer data). */
     int (*accept)(struct vconn *vconn, struct vconn **new_vconnp);
 
@@ -148,6 +141,8 @@ struct vconn_class {
      * Nonnull iff this is an active vconn (one that transfers data and does
      * not accept connections). */
     int (*send)(struct vconn *vconn, struct buffer *msg);
+
+    void (*wait)(struct vconn *vconn, enum vconn_wait_type);
 };
 
 extern struct vconn_class tcp_vconn_class;
index 90ec97c..2c41e24 100644 (file)
@@ -26,8 +26,6 @@
 struct vlog_server;
 int vlog_server_listen(const char *path, struct vlog_server **);
 void vlog_server_close(struct vlog_server *);
-int vlog_server_get_fd(const struct vlog_server *);
-void vlog_server_poll(struct vlog_server *);
 
 /* Client for Vlog control connection. */
 struct vlog_client;
index b324c4f..9be199c 100644 (file)
@@ -49,14 +49,20 @@ enum vlog_facility vlog_get_facility_val(const char *name);
 
 /* Modules that can emit log messages. */
 #define VLOG_MODULES                            \
+        VLOG_MODULE(chain)                      \
         VLOG_MODULE(controller)                 \
+        VLOG_MODULE(controller_connection)      \
         VLOG_MODULE(ctlpath)                    \
+        VLOG_MODULE(datapath)                   \
         VLOG_MODULE(dpif)                       \
         VLOG_MODULE(dpctl)                      \
         VLOG_MODULE(fault)                      \
         VLOG_MODULE(flow)                       \
+        VLOG_MODULE(netdev)                     \
         VLOG_MODULE(netlink)                    \
+        VLOG_MODULE(poll_loop)                  \
         VLOG_MODULE(secchan)                    \
+        VLOG_MODULE(switch)                     \
         VLOG_MODULE(socket_util)                \
         VLOG_MODULE(vconn_netlink)              \
         VLOG_MODULE(vconn_tcp)                  \
index f0c1ac6..906b5d8 100644 (file)
@@ -12,6 +12,7 @@ libopenflow_la_SOURCES = \
        hash.c \
        list.c \
        ofp-print.c \
+       poll-loop.c \
        socket-util.c \
        util.c \
        vconn-tcp.c \
diff --git a/lib/poll-loop.c b/lib/poll-loop.c
new file mode 100644 (file)
index 0000000..ac31e32
--- /dev/null
@@ -0,0 +1,166 @@
+/* Copyright (C) 2008 Board of Trustees, Leland Stanford Jr. University.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "poll-loop.h"
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+#include <string.h>
+#include "list.h"
+
+#define THIS_MODULE VLM_poll_loop
+#include "vlog.h"
+
+struct poll_waiter {
+    struct list node;
+    int fd;
+    short int events;
+    struct pollfd *pollfd;
+
+    short int *revents;
+
+    poll_fd_func *function;
+    void *aux;
+};
+
+static struct list waiters = LIST_INITIALIZER(&waiters);
+static size_t n_waiters;
+static int timeout = -1;
+
+#ifndef NDEBUG
+static struct poll_waiter *running_cb;
+#endif
+
+static struct poll_waiter *
+new_waiter(int fd, short int events)
+{
+    struct poll_waiter *waiter = xcalloc(1, sizeof *waiter);
+    assert(fd >= 0);
+    waiter->fd = fd;
+    waiter->events = events;
+    list_push_back(&waiters, &waiter->node);
+    n_waiters++;
+    return waiter;
+}
+
+struct poll_waiter *
+poll_fd_callback(int fd, short int events, poll_fd_func *function, void *aux)
+{
+    struct poll_waiter *pw = new_waiter(fd, events);
+    pw->function = function;
+    pw->aux = aux;
+    return pw;
+}
+
+struct poll_waiter *
+poll_fd_wait(int fd, short int events, short int *revents)
+{
+    struct poll_waiter *pw = new_waiter(fd, events);
+    pw->revents = revents;
+    if (revents) {
+        *revents = 0;
+    }
+    return pw;
+}
+
+void
+poll_cancel(struct poll_waiter *pw)
+{
+    if (pw) {
+        assert(pw != running_cb);
+        list_remove(&pw->node);
+        n_waiters--;
+    }
+}
+
+void
+poll_immediate_wake(void)
+{
+    timeout = 0;
+}
+
+void
+poll_timer_wait(int msec)
+{
+    if (timeout < 0 || msec < timeout) {
+        timeout = MAX(0, msec);
+    }
+}
+
+void
+poll_block(void)
+{
+    static struct pollfd *pollfds;
+    static size_t max_pollfds;
+
+    struct poll_waiter *pw;
+    struct list *node;
+    int n_pollfds;
+    int retval;
+
+    assert(!running_cb);
+    if (max_pollfds < n_waiters) {
+        max_pollfds = n_waiters;
+        pollfds = xrealloc(pollfds, max_pollfds * sizeof *pollfds);
+    }
+
+    n_pollfds = 0;
+    LIST_FOR_EACH (pw, struct poll_waiter, node, &waiters) {
+        pw->pollfd = &pollfds[n_pollfds];
+        pollfds[n_pollfds].fd = pw->fd;
+        pollfds[n_pollfds].events = pw->events;
+        pollfds[n_pollfds].revents = 0;
+        n_pollfds++;
+    }
+
+    do {
+        retval = poll(pollfds, n_pollfds, timeout);
+    } while (retval < 0 && errno == EINTR);
+    if (retval < 0) {
+        VLOG_ERR("poll: %s", strerror(errno));
+    }
+
+    for (node = waiters.next; node != &waiters; ) {
+        pw = CONTAINER_OF(node, struct poll_waiter, node);
+        if (!pw->pollfd || !pw->pollfd->revents) {
+            if (pw->function) {
+                node = node->next;
+                continue;
+            }
+        } else {
+            if (pw->function) {
+#ifndef NDEBUG
+                running_cb = pw;
+#endif
+                pw->function(pw->fd, pw->pollfd->revents, pw->aux);
+#ifndef NDEBUG
+                running_cb = NULL;
+#endif
+            } else if (pw->revents) {
+                *pw->revents = pw->pollfd->revents;
+            }
+        }
+        node = list_remove(node);
+        n_waiters--;
+    }
+
+    timeout = -1;
+}
index 3397fdb..dac3594 100644 (file)
@@ -24,7 +24,9 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <netdb.h>
+#include <poll.h>
 #include <stdio.h>
+#include <string.h>
 
 #include "vlog.h"
 #define THIS_MODULE VLM_socket_util
@@ -36,8 +38,14 @@ set_nonblocking(int fd)
 {
     int flags = fcntl(fd, F_GETFL, 0);
     if (flags != -1) {
-        return fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1 ? 0 : errno;
+        if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1) {
+            return 0;
+        } else {
+            VLOG_ERR("fcntl(F_SETFL) failed: %s", strerror(errno));
+            return errno;
+        }
     } else {
+        VLOG_ERR("fcntl(F_GETFL) failed: %s", strerror(errno));
         return errno;
     }
 }
@@ -63,3 +71,38 @@ lookup_ip(const char *host_name, struct in_addr *addr)
     }
     return 0;
 }
+
+/* Returns the error condition associated with socket 'fd' and resets the
+ * socket's error status. */
+int
+get_socket_error(int fd) 
+{
+    int error;
+    socklen_t len = sizeof(error);
+    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
+        error = errno;
+        VLOG_ERR("getsockopt(SO_ERROR): %s", strerror(error));
+    }
+    return error;
+}
+
+int
+check_connection_completion(int fd) 
+{
+    struct pollfd pfd;
+    int retval;
+
+    pfd.fd = fd;
+    pfd.events = POLLOUT;
+    do {
+        retval = poll(&pfd, 1, 0);
+    } while (retval < 0 && errno == EINTR);
+    if (retval == 1) {
+        return get_socket_error(fd);
+    } else if (retval < 0) {
+        VLOG_ERR("poll: %s", strerror(errno));
+        return errno;
+    } else {
+        return EAGAIN;
+    }
+}
index ab07739..5a4a617 100644 (file)
@@ -34,6 +34,7 @@
 #include "buffer.h"
 #include "dpif.h"
 #include "netlink.h"
+#include "poll-loop.h"
 #include "socket-util.h"
 #include "util.h"
 #include "openflow.h"
@@ -67,6 +68,7 @@ netlink_open(const char *name, char *suffix, struct vconn **vconnp)
 
     netlink = xmalloc(sizeof *netlink);
     netlink->vconn.class = &netlink_vconn_class;
+    netlink->vconn.connect_status = 0;
     retval = dpif_open(dp_idx, true, &netlink->dp);
     if (retval) {
         free(netlink);
@@ -85,20 +87,6 @@ netlink_close(struct vconn *vconn)
     free(netlink);
 }
 
-static bool
-netlink_prepoll(struct vconn *vconn, int want, struct pollfd *pfd) 
-{
-    struct netlink_vconn *netlink = netlink_vconn_cast(vconn);
-    pfd->fd = nl_sock_fd(netlink->dp.sock);
-    if (want & WANT_RECV) {
-        pfd->events |= POLLIN;
-    }
-    if (want & WANT_SEND) {
-        pfd->events |= POLLOUT;
-    }
-    return false;
-}
-
 static int
 netlink_recv(struct vconn *vconn, struct buffer **bufferp)
 {
@@ -117,11 +105,31 @@ netlink_send(struct vconn *vconn, struct buffer *buffer)
     return retval;
 }
 
+static void
+netlink_wait(struct vconn *vconn, enum vconn_wait_type wait) 
+{
+    struct netlink_vconn *netlink = netlink_vconn_cast(vconn);
+    short int events = 0;
+    switch (wait) {
+    case WAIT_RECV:
+        events = POLLIN;
+        break;
+
+    case WAIT_SEND:
+        events = 0;
+        break;
+
+    default:
+        NOT_REACHED();
+    }
+    poll_fd_wait(nl_sock_fd(netlink->dp.sock), events, NULL);
+}
+
 struct vconn_class netlink_vconn_class = {
     .name = "nl",
     .open = netlink_open,
     .close = netlink_close,
-    .prepoll = netlink_prepoll,
     .recv = netlink_recv,
     .send = netlink_send,
+    .wait = netlink_wait,
 };
index 199c987..ed2e035 100644 (file)
@@ -33,7 +33,9 @@
 #include "socket-util.h"
 #include "util.h"
 #include "openflow.h"
+#include "poll-loop.h"
 #include "ofp-print.h"
+#include "socket-util.h"
 #include "vconn.h"
 
 #include "vlog.h"
@@ -42,8 +44,8 @@
 /* Active SSL. */
 
 enum ssl_state {
-    STATE_SSL_CONNECTING,
-    STATE_CONNECTED
+    STATE_TCP_CONNECTING,
+    STATE_SSL_CONNECTING
 };
 
 enum session_type {
@@ -61,6 +63,7 @@ struct ssl_vconn
     SSL *ssl;
     struct buffer *rxbuf;
     struct buffer *txbuf;
+    struct poll_waiter *tx_waiter;
 };
 
 /* SSL context created by ssl_init(). */
@@ -71,15 +74,15 @@ static bool has_private_key, has_certificate, has_ca_cert;
 
 static int ssl_init(void);
 static int do_ssl_init(void);
-static void connect_completed(struct ssl_vconn *, int error);
 static bool ssl_wants_io(int ssl_error);
 static void ssl_close(struct vconn *);
-static bool state_machine(struct ssl_vconn *sslv);
+static int interpret_ssl_error(const char *function, int ret, int error);
+static void ssl_do_tx(int fd, short int revents, void *vconn_);
 static DH *tmp_dh_callback(SSL *ssl, int is_export UNUSED, int keylength);
 
 static int
 new_ssl_vconn(const char *name, int fd, enum session_type type,
-              struct vconn **vconnp)
+              enum ssl_state state, struct vconn **vconnp)
 {
     struct ssl_vconn *sslv;
     SSL *ssl = NULL;
@@ -104,13 +107,7 @@ new_ssl_vconn(const char *name, int fd, enum session_type type,
         goto error;
     }
 
-    /* Make 'fd' non-blocking and disable Nagle. */
-    retval = set_nonblocking(fd);
-    if (retval) {
-        VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
-        close(fd);
-        return retval;
-    }
+    /* Disable Nagle. */
     retval = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
     if (retval) {
         VLOG_ERR("%s: setsockopt(TCP_NODELAY): %s", name, strerror(errno));
@@ -133,12 +130,14 @@ new_ssl_vconn(const char *name, int fd, enum session_type type,
     /* Create and return the ssl_vconn. */
     sslv = xmalloc(sizeof *sslv);
     sslv->vconn.class = &ssl_vconn_class;
-    sslv->state = STATE_SSL_CONNECTING;
+    sslv->vconn.connect_status = EAGAIN;
+    sslv->state = state;
     sslv->type = type;
     sslv->fd = fd;
     sslv->ssl = ssl;
     sslv->rxbuf = NULL;
     sslv->txbuf = NULL;
+    sslv->tx_waiter = NULL;
     *vconnp = &sslv->vconn;
     return 0;
 
@@ -194,95 +193,74 @@ ssl_open(const char *name, char *suffix, struct vconn **vconnp)
         VLOG_ERR("%s: socket: %s", name, strerror(errno));
         return errno;
     }
+    retval = set_nonblocking(fd);
+    if (retval) {
+        close(fd);
+        return retval;
+    }
 
-    /* Connect socket (blocking). */
+    /* Connect socket. */
     retval = connect(fd, (struct sockaddr *) &sin, sizeof sin);
     if (retval < 0) {
-        int error = errno;
-        VLOG_ERR("%s: connect: %s", name, strerror(error));
-        close(fd);
-        return error;
+        if (errno == EINPROGRESS) {
+            return new_ssl_vconn(name, fd, CLIENT, STATE_TCP_CONNECTING,
+                                 vconnp);
+        } else {
+            int error = errno;
+            VLOG_ERR("%s: connect: %s", name, strerror(error));
+            close(fd);
+            return error;
+        }
+    } else {
+        return new_ssl_vconn(name, fd, CLIENT, STATE_SSL_CONNECTING,
+                             vconnp);
     }
-
-    /* Make an ssl_vconn for the socket. */
-    return new_ssl_vconn(name, fd, CLIENT, vconnp);
 }
 
-static void
-ssl_close(struct vconn *vconn)
+static int
+ssl_connect(struct vconn *vconn)
 {
     struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
-    SSL_free(sslv->ssl);
-    close(sslv->fd);
-    free(sslv);
-}
+    int retval;
 
-static bool
-ssl_want_io_to_events(SSL *ssl, short int *events)
-{
-    if (SSL_want_read(ssl)) {
-        *events |= POLLIN;
-        return true;
-    } else if (SSL_want_write(ssl)) {
-        *events |= POLLOUT;
-        return true;
-    } else {
-        return false;
-    }
-}
+    switch (sslv->state) {
+    case STATE_TCP_CONNECTING:
+        retval = check_connection_completion(sslv->fd);
+        if (retval) {
+            return retval;
+        }
+        sslv->state = STATE_SSL_CONNECTING;
+        /* Fall through. */
 
-static bool
-ssl_prepoll(struct vconn *vconn, int want, struct pollfd *pfd)
-{
-    struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
-    pfd->fd = sslv->fd;
-    if (!state_machine(sslv)) {
-        switch (sslv->state) {
-        case STATE_SSL_CONNECTING:
-            if (!ssl_want_io_to_events(sslv->ssl, &pfd->events)) {
-                /* state_machine() should have transitioned us away to another
-                 * state. */
-                NOT_REACHED();
+    case STATE_SSL_CONNECTING:
+        retval = (sslv->type == CLIENT
+                   ? SSL_connect(sslv->ssl) : SSL_accept(sslv->ssl));
+        if (retval != 1) {
+            int error = SSL_get_error(sslv->ssl, retval);
+            if (retval < 0 && ssl_wants_io(error)) {
+                return EAGAIN;
+            } else {
+                interpret_ssl_error((sslv->type == CLIENT ? "SSL_connect"
+                                     : "SSL_accept"), retval, error);
+                shutdown(sslv->fd, SHUT_RDWR);
+                return EPROTO;
             }
-            break;
-        default:
-            NOT_REACHED();
-        }
-    } else if (sslv->connect_error) {
-        pfd->events = 0;
-        return true;
-    } else if (!ssl_want_io_to_events(sslv->ssl, &pfd->events)) {
-        if (want & WANT_RECV) {
-            pfd->events |= POLLIN;
-        }
-        if (want & WANT_SEND || sslv->txbuf) {
-            pfd->events |= POLLOUT;
+        } else {
+            return 0;
         }
     }
-    return false;
+
+    NOT_REACHED();
 }
 
 static void
-ssl_postpoll(struct vconn *vconn, short int *revents)
+ssl_close(struct vconn *vconn)
 {
     struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
-    if (!state_machine(sslv)) {
-        *revents = 0;
-    } else if (sslv->connect_error) {
-        *revents |= POLLERR;
-    } else if (*revents & POLLOUT && sslv->txbuf) {
-        ssize_t n = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
-        if (n > 0) {
-            buffer_pull(sslv->txbuf, n);
-            if (sslv->txbuf->size == 0) {
-                buffer_delete(sslv->txbuf);
-                sslv->txbuf = NULL;
-            }
-        }
-        if (sslv->txbuf) {
-            *revents &= ~POLLOUT;
-        }
-    }
+    poll_cancel(sslv->tx_waiter);
+    SSL_free(sslv->ssl);
+    close(sslv->fd);
+    free(sslv);
 }
 
 static int
@@ -355,12 +333,6 @@ ssl_recv(struct vconn *vconn, struct buffer **bufferp)
     size_t want_bytes;
     ssize_t ret;
 
-    if (!state_machine(sslv)) {
-        return EAGAIN;
-    } else if (sslv->connect_error) {
-        return sslv->connect_error;
-    }
-
     if (sslv->rxbuf == NULL) {
         sslv->rxbuf = buffer_new(1564);
     }
@@ -412,18 +384,53 @@ again:
     }
 }
 
+static void
+ssl_clear_txbuf(struct ssl_vconn *sslv)
+{
+    buffer_delete(sslv->txbuf);
+    sslv->txbuf = NULL;
+    sslv->tx_waiter = NULL;
+}
+
+static void
+ssl_register_tx_waiter(struct vconn *vconn) 
+{
+    struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
+    short int events = SSL_want_read(sslv->ssl) ? POLLIN : POLLOUT;
+    sslv->tx_waiter = poll_fd_callback(sslv->fd, events, ssl_do_tx, vconn);
+}
+
+static void
+ssl_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_)
+{
+    struct vconn *vconn = vconn_;
+    struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
+    int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
+    if (ret > 0) {
+        buffer_pull(sslv->txbuf, ret);
+        if (sslv->txbuf->size == 0) {
+            ssl_clear_txbuf(sslv);
+            return;
+        }
+    } else {
+        int error = SSL_get_error(sslv->ssl, ret);
+        if (error == SSL_ERROR_ZERO_RETURN) {
+            /* Connection closed (EOF). */
+            VLOG_WARN("SSL_write: connection close");
+        } else if (interpret_ssl_error("SSL_write", ret, error) != EAGAIN) {
+            ssl_clear_txbuf(sslv);
+            return;
+        }
+    }
+    ssl_register_tx_waiter(vconn);
+}
+
 static int
 ssl_send(struct vconn *vconn, struct buffer *buffer)
 {
     struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
     ssize_t ret;
 
-    if (!state_machine(sslv)) {
-        return EAGAIN;
-    } else if (sslv->connect_error) {
-        return sslv->connect_error;
-    }
-
     if (sslv->txbuf) {
         return EAGAIN;
     }
@@ -435,6 +442,7 @@ ssl_send(struct vconn *vconn, struct buffer *buffer)
         } else {
             sslv->txbuf = buffer;
             buffer_pull(buffer, ret);
+            ssl_register_tx_waiter(vconn);
         }
         return 0;
     } else {
@@ -449,14 +457,65 @@ ssl_send(struct vconn *vconn, struct buffer *buffer)
     }
 }
 
+static bool
+ssl_needs_wait(struct ssl_vconn *sslv) 
+{
+    if (SSL_want_read(sslv->ssl)) {
+        poll_fd_wait(sslv->fd, POLLIN, NULL);
+        return true;
+    } else if (SSL_want_write(sslv->ssl)) {
+        poll_fd_wait(sslv->fd, POLLOUT, NULL);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+static void
+ssl_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+    struct ssl_vconn *sslv = ssl_vconn_cast(vconn);
+
+    switch (wait) {
+    case WAIT_CONNECT:
+        if (vconn_connect(vconn) != EAGAIN) {
+            poll_immediate_wake();
+        } else if (sslv->state == STATE_TCP_CONNECTING) {
+            poll_fd_wait(sslv->fd, POLLOUT, NULL);
+        } else if (!ssl_needs_wait(sslv)) {
+            NOT_REACHED();
+        }
+        break;
+
+    case WAIT_RECV:
+        if (!ssl_needs_wait(sslv)) {
+            if (SSL_pending(sslv->ssl)) {
+                poll_immediate_wake();
+            } else {
+                poll_fd_wait(sslv->fd, POLLIN, NULL);
+            }
+        }
+        break;
+
+    case WAIT_SEND:
+        if (!sslv->txbuf && !ssl_needs_wait(sslv)) {
+            poll_fd_wait(sslv->fd, POLLOUT, NULL);
+        }
+        break;
+
+    default:
+        NOT_REACHED();
+    }
+}
+
 struct vconn_class ssl_vconn_class = {
     .name = "ssl",
     .open = ssl_open,
     .close = ssl_close,
-    .prepoll = ssl_prepoll,
-    .postpoll = ssl_postpoll,
+    .connect = ssl_connect,
     .recv = ssl_recv,
     .send = ssl_send,
+    .wait = ssl_wait,
 };
 \f
 /* Passive SSL. */
@@ -524,13 +583,13 @@ pssl_open(const char *name, char *suffix, struct vconn **vconnp)
 
     retval = set_nonblocking(fd);
     if (retval) {
-        VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
         close(fd);
         return retval;
     }
 
     pssl = xmalloc(sizeof *pssl);
     pssl->vconn.class = &pssl_vconn_class;
+    pssl->vconn.connect_status = 0;
     pssl->fd = fd;
     *vconnp = &pssl->vconn;
     return 0;
@@ -544,41 +603,46 @@ pssl_close(struct vconn *vconn)
     free(pssl);
 }
 
-static bool
-pssl_prepoll(struct vconn *vconn, int want, struct pollfd *pfd)
-{
-    struct pssl_vconn *pssl = pssl_vconn_cast(vconn);
-    pfd->fd = pssl->fd;
-    if (want & WANT_ACCEPT) {
-        pfd->events |= POLLIN;
-    }
-    return false;
-}
-
 static int
 pssl_accept(struct vconn *vconn, struct vconn **new_vconnp)
 {
     struct pssl_vconn *pssl = pssl_vconn_cast(vconn);
     int new_fd;
+    int error;
 
     new_fd = accept(pssl->fd, NULL, NULL);
     if (new_fd < 0) {
         int error = errno;
         if (error != EAGAIN) {
-            VLOG_DBG("pssl: accept: %s", strerror(error));
+            VLOG_DBG("accept: %s", strerror(error));
         }
         return error;
     }
 
-    return new_ssl_vconn("ssl" /* FIXME */, new_fd, SERVER, new_vconnp);
+    error = set_nonblocking(new_fd);
+    if (error) {
+        close(new_fd);
+        return error;
+    }
+
+    return new_ssl_vconn("ssl" /* FIXME */, new_fd,
+                         SERVER, STATE_SSL_CONNECTING, new_vconnp);
+}
+
+static void
+pssl_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+    struct pssl_vconn *pssl = pssl_vconn_cast(vconn);
+    assert(wait == WAIT_ACCEPT);
+    poll_fd_wait(pssl->fd, POLLIN, NULL);
 }
 
 struct vconn_class pssl_vconn_class = {
     .name = "pssl",
     .open = pssl_open,
     .close = pssl_close,
-    .prepoll = pssl_prepoll,
     .accept = pssl_accept,
+    .wait = pssl_wait,
 };
 \f
 /*
@@ -633,37 +697,6 @@ do_ssl_init(void)
     return 0;
 }
 
-static bool
-state_machine(struct ssl_vconn *sslv)
-{
-    if (sslv->state == STATE_SSL_CONNECTING) {
-        int ret = (sslv->type == CLIENT
-                   ? SSL_connect(sslv->ssl) : SSL_accept(sslv->ssl));
-        if (ret != 1) {
-            int error = SSL_get_error(sslv->ssl, ret);
-            if (ret < 0 && ssl_wants_io(error)) {
-                /* Stay in this state to repeat the SSL_connect later. */
-                return false;
-            } else {
-                interpret_ssl_error((sslv->type == CLIENT ? "SSL_connect"
-                                     : "SSL_accept"), ret, error);
-                shutdown(sslv->fd, SHUT_RDWR);
-                connect_completed(sslv, EPROTO);
-            }
-        } else {
-            connect_completed(sslv, 0);
-        }
-    }
-    return sslv->state == STATE_CONNECTED;
-}
-
-static void
-connect_completed(struct ssl_vconn *sslv, int error)
-{
-    sslv->state = STATE_CONNECTED;
-    sslv->connect_error = error;
-}
-
 static DH *
 tmp_dh_callback(SSL *ssl, int is_export UNUSED, int keylength)
 {
index f97761f..996ac17 100644 (file)
@@ -35,6 +35,7 @@
 #include "util.h"
 #include "openflow.h"
 #include "ofp-print.h"
+#include "poll-loop.h"
 
 #include "vlog.h"
 #define THIS_MODULE VLM_vconn_tcp
@@ -47,22 +48,17 @@ struct tcp_vconn
     int fd;
     struct buffer *rxbuf;
     struct buffer *txbuf;
+    struct poll_waiter *tx_waiter;
 };
 
 static int
-new_tcp_vconn(const char *name, int fd, struct vconn **vconnp) 
+new_tcp_vconn(const char *name, int fd, int connect_status,
+              struct vconn **vconnp)
 {
     struct tcp_vconn *tcp;
     int on = 1;
     int retval;
 
-    retval = set_nonblocking(fd);
-    if (retval) {
-        VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
-        close(fd);
-        return retval;
-    }
-
     retval = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
     if (retval) {
         VLOG_ERR("%s: setsockopt(TCP_NODELAY): %s", name, strerror(errno));
@@ -72,18 +68,20 @@ new_tcp_vconn(const char *name, int fd, struct vconn **vconnp)
 
     tcp = xmalloc(sizeof *tcp);
     tcp->vconn.class = &tcp_vconn_class;
+    tcp->vconn.connect_status = connect_status;
     tcp->fd = fd;
     tcp->txbuf = NULL;
+    tcp->tx_waiter = NULL;
     tcp->rxbuf = NULL;
     *vconnp = &tcp->vconn;
     return 0;
 }
 
 static struct tcp_vconn *
-tcp_vconn_cast(struct vconn *vconn) 
+tcp_vconn_cast(struct vconn *vconn)
 {
     assert(vconn->class == &tcp_vconn_class);
-    return CONTAINER_OF(vconn, struct tcp_vconn, vconn); 
+    return CONTAINER_OF(vconn, struct tcp_vconn, vconn);
 }
 
 
@@ -120,61 +118,41 @@ tcp_open(const char *name, char *suffix, struct vconn **vconnp)
         return errno;
     }
 
-    retval = connect(fd, (struct sockaddr *) &sin, sizeof sin);
-    if (retval < 0) {
-        int error = errno;
-        VLOG_ERR("%s: connect: %s", name, strerror(error));
+    retval = set_nonblocking(fd);
+    if (retval) {
         close(fd);
-        return error;
+        return retval;
     }
 
-    return new_tcp_vconn(name, fd, vconnp);
+    retval = connect(fd, (struct sockaddr *) &sin, sizeof sin);
+    if (retval < 0) {
+        if (errno == EINPROGRESS) {
+            return new_tcp_vconn(name, fd, EAGAIN, vconnp);
+        } else {
+            int error = errno;
+            VLOG_ERR("%s: connect: %s", name, strerror(error));
+            close(fd);
+            return error;
+        }
+    } else {
+        return new_tcp_vconn(name, fd, 0, vconnp);
+    }
 }
 
 static void
-tcp_close(struct vconn *vconn) 
+tcp_close(struct vconn *vconn)
 {
     struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+    poll_cancel(tcp->tx_waiter);
     close(tcp->fd);
     free(tcp);
 }
 
-static bool
-tcp_prepoll(struct vconn *vconn, int want, struct pollfd *pfd) 
+static int
+tcp_connect(struct vconn *vconn)
 {
     struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
-    pfd->fd = tcp->fd;
-    if (want & WANT_RECV) {
-        pfd->events |= POLLIN;
-    }
-    if (want & WANT_SEND || tcp->txbuf) {
-        pfd->events |= POLLOUT;
-    }
-    return false;
-}
-
-static void
-tcp_postpoll(struct vconn *vconn, short int *revents)
-{
-    struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
-    if (*revents & POLLOUT && tcp->txbuf) {
-        ssize_t n = write(tcp->fd, tcp->txbuf->data, tcp->txbuf->size);
-        if (n < 0) {
-            if (errno != EAGAIN) {
-                VLOG_ERR("send: %s", strerror(errno));
-                *revents |= POLLERR;
-            }
-        } else if (n > 0) {
-            buffer_pull(tcp->txbuf, n);
-            if (tcp->txbuf->size == 0) {
-                buffer_delete(tcp->txbuf);
-                tcp->txbuf = NULL;
-            }
-        }
-        if (tcp->txbuf) {
-            *revents &= ~POLLOUT;
-        }
-    }
+    return check_connection_completion(tcp->fd);
 }
 
 static int
@@ -224,8 +202,38 @@ again:
     }
 }
 
+static void
+tcp_clear_txbuf(struct tcp_vconn *tcp)
+{
+    buffer_delete(tcp->txbuf);
+    tcp->txbuf = NULL;
+    tcp->tx_waiter = NULL;
+}
+
+static void
+tcp_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_)
+{
+    struct vconn *vconn = vconn_;
+    struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+    ssize_t n = write(tcp->fd, tcp->txbuf->data, tcp->txbuf->size);
+    if (n < 0) {
+        if (errno != EAGAIN) {
+            VLOG_ERR("send: %s", strerror(errno));
+            tcp_clear_txbuf(tcp);
+            return;
+        }
+    } else if (n > 0) {
+        buffer_pull(tcp->txbuf, n);
+        if (!tcp->txbuf->size) {
+            tcp_clear_txbuf(tcp);
+            return;
+        }
+    }
+    tcp->tx_waiter = poll_fd_callback(tcp->fd, POLLOUT, tcp_do_tx, vconn);
+}
+
 static int
-tcp_send(struct vconn *vconn, struct buffer *buffer) 
+tcp_send(struct vconn *vconn, struct buffer *buffer)
 {
     struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
     ssize_t retval;
@@ -243,20 +251,47 @@ tcp_send(struct vconn *vconn, struct buffer *buffer)
         if (retval > 0) {
             buffer_pull(buffer, retval);
         }
+        tcp->tx_waiter = poll_fd_callback(tcp->fd, POLLOUT, tcp_do_tx, vconn);
         return 0;
     } else {
         return errno;
     }
 }
 
+static void
+tcp_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+    struct tcp_vconn *tcp = tcp_vconn_cast(vconn);
+    switch (wait) {
+    case WAIT_CONNECT:
+        poll_fd_wait(tcp->fd, POLLOUT, NULL);
+        break;
+
+    case WAIT_SEND:
+        if (!tcp->txbuf) {
+            poll_fd_wait(tcp->fd, POLLOUT, NULL);
+        } else {
+            /* Nothing to do: need to drain txbuf first. */
+        }
+        break;
+
+    case WAIT_RECV:
+        poll_fd_wait(tcp->fd, POLLIN, NULL);
+        break;
+
+    default:
+        NOT_REACHED();
+    }
+}
+
 struct vconn_class tcp_vconn_class = {
     .name = "tcp",
     .open = tcp_open,
     .close = tcp_close,
-    .prepoll = tcp_prepoll,
-    .postpoll = tcp_postpoll,
+    .connect = tcp_connect,
     .recv = tcp_recv,
     .send = tcp_send,
+    .wait = tcp_wait,
 };
 \f
 /* Passive TCP. */
@@ -268,10 +303,10 @@ struct ptcp_vconn
 };
 
 static struct ptcp_vconn *
-ptcp_vconn_cast(struct vconn *vconn) 
+ptcp_vconn_cast(struct vconn *vconn)
 {
     assert(vconn->class == &ptcp_vconn_class);
-    return CONTAINER_OF(vconn, struct ptcp_vconn, vconn); 
+    return CONTAINER_OF(vconn, struct ptcp_vconn, vconn);
 }
 
 static int
@@ -317,56 +352,64 @@ ptcp_open(const char *name, char *suffix, struct vconn **vconnp)
 
     retval = set_nonblocking(fd);
     if (retval) {
-        VLOG_ERR("%s: set_nonblocking: %s", name, strerror(retval));
         close(fd);
         return retval;
     }
 
     ptcp = xmalloc(sizeof *ptcp);
     ptcp->vconn.class = &ptcp_vconn_class;
+    ptcp->vconn.connect_status = 0;
     ptcp->fd = fd;
     *vconnp = &ptcp->vconn;
     return 0;
 }
 
 static void
-ptcp_close(struct vconn *vconn) 
+ptcp_close(struct vconn *vconn)
 {
     struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
     close(ptcp->fd);
     free(ptcp);
 }
 
-static bool
-ptcp_prepoll(struct vconn *vconn, int want, struct pollfd *pfd) 
-{
-    struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
-    pfd->fd = ptcp->fd;
-    if (want & WANT_ACCEPT) {
-        pfd->events |= POLLIN;
-    }
-    return false;
-}
-
 static int
-ptcp_accept(struct vconn *vconn, struct vconn **new_vconnp) 
+ptcp_accept(struct vconn *vconn, struct vconn **new_vconnp)
 {
     struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
     int new_fd;
+    int error;
 
     new_fd = accept(ptcp->fd, NULL, NULL);
     if (new_fd < 0) {
-        return errno;
+        int error = errno;
+        if (error != EAGAIN) {
+            VLOG_DBG("accept: %s", strerror(error));
+        }
+        return error;
     }
 
-    return new_tcp_vconn("tcp" /* FIXME */, new_fd, new_vconnp);
+    error = set_nonblocking(new_fd);
+    if (error) {
+        close(new_fd);
+        return error;
+    }
+
+    return new_tcp_vconn("tcp" /* FIXME */, new_fd, 0, new_vconnp);
+}
+
+static void
+ptcp_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+    struct ptcp_vconn *ptcp = ptcp_vconn_cast(vconn);
+    assert(wait == WAIT_ACCEPT);
+    poll_fd_wait(ptcp->fd, POLLIN, NULL);
 }
 
 struct vconn_class ptcp_vconn_class = {
     .name = "ptcp",
     .open = ptcp_open,
     .close = ptcp_close,
-    .prepoll = ptcp_prepoll,
     .accept = ptcp_accept,
+    .wait = ptcp_wait
 };
 
index be96ef9..4cc1baf 100644 (file)
 #include "buffer.h"
 #include "flow.h"
 #include "openflow.h"
+#include "poll-loop.h"
 #include "util.h"
 
+#define THIS_MODULE VLM_vconn
+#include "vlog.h"
+
 static struct vconn_class *vconn_classes[] = {
     &tcp_vconn_class,
     &ptcp_vconn_class,
@@ -45,7 +49,7 @@ static struct vconn_class *vconn_classes[] = {
 
 /* Check the validity of the vconn class structures. */
 static void
-check_vconn_classes(void) 
+check_vconn_classes(void)
 {
 #ifndef NDEBUG
     size_t i;
@@ -55,10 +59,10 @@ check_vconn_classes(void)
         assert(class->name != NULL);
         assert(class->open != NULL);
         assert(class->close != NULL);
-        assert(class->prepoll != NULL);
         assert(class->accept
                ? !class->recv && !class->send
-               : class->recv && class->send);
+               :  class->recv && class->send);
+        assert(class->wait != NULL);
     }
 #endif
 }
@@ -71,7 +75,7 @@ check_vconn_classes(void)
  * stores a pointer to the new connection in '*vconnp', otherwise a null
  * pointer.  */
 int
-vconn_open(const char *name, struct vconn **vconnp) 
+vconn_open(const char *name, struct vconn **vconnp)
 {
     size_t prefix_len;
     size_t i;
@@ -91,6 +95,9 @@ vconn_open(const char *name, struct vconn **vconnp)
             free(suffix_copy);
             if (retval) {
                 *vconnp = NULL;
+            } else {
+                assert((*vconnp)->connect_status != EAGAIN
+                       || (*vconnp)->class->connect);
             }
             return retval;
         }
@@ -99,9 +106,31 @@ vconn_open(const char *name, struct vconn **vconnp)
     abort();
 }
 
+int
+vconn_open_block(const char *name, struct vconn **vconnp)
+{
+    struct vconn *vconn;
+    int error;
+
+    error = vconn_open(name, &vconn);
+    while (error == EAGAIN) {
+        vconn_connect_wait(vconn);
+        poll_block();
+        error = vconn_connect(vconn);
+        assert(error != EINPROGRESS);
+    }
+    if (error) {
+        vconn_close(vconn);
+        *vconnp = NULL;
+    } else {
+        *vconnp = vconn;
+    }
+    return error;
+}
+
 /* Closes 'vconn'. */
 void
-vconn_close(struct vconn *vconn) 
+vconn_close(struct vconn *vconn)
 {
     if (vconn != NULL) {
         (vconn->class->close)(vconn);
@@ -113,40 +142,23 @@ vconn_close(struct vconn *vconn)
  * 'vconn' is an active vconn, that is, its purpose is to transfer data, not
  * to wait for new connections to arrive. */
 bool
-vconn_is_passive(const struct vconn *vconn) 
+vconn_is_passive(const struct vconn *vconn)
 {
     return vconn->class->accept != NULL;
 }
 
-/* Initializes 'pfd->fd' and 'pfd->events' appropriately so that poll() will
- * wake up when the connection becomes available for the operations specified
- * in 'want', or for performing the vconn's needed internal processing.
- *
- * Normally returns false.  Returns true to indicate that no blocking should
- * happen in poll() because the connection is available for some operation
- * specified in 'want' but that status cannot be detected via poll() and thus
- * poll() could block forever otherwise. */
-bool
-vconn_prepoll(struct vconn *vconn, int want, struct pollfd *pollfd)
-{
-    return (vconn->class->prepoll)(vconn, want, pollfd);
-}
-
-/* Perform any internal processing needed by the connections.  The vconn file
- * descriptor's status, as reported by poll(), must be provided in '*revents'.
- *
- * The postpoll function adjusts '*revents' to reflect the status of the
- * connection from the caller's point of view.  That is, upon return '*revents
- * & POLLIN' indicates that a packet is (potentially) ready to be read (for an
- * active vconn) or a new connection is ready to be accepted (for a passive
- * vconn) and '*revents & POLLOUT' indicates that a packet is (potentially)
- * ready to be written. */
-void
-vconn_postpoll(struct vconn *vconn, short int *revents) 
+/* Tries to complete the connection on 'vconn', which must be an active
+ * vconn.  If 'vconn''s connection is complete, returns 0 if the connection
+ * was successful or a positive errno value if it failed.  If the
+ * connection is still in progress, returns EAGAIN. */
+int
+vconn_connect(struct vconn *vconn)
 {
-    if (vconn->class->postpoll) {
-        (vconn->class->postpoll)(vconn, revents);
-    } 
+    if (vconn->connect_status == EAGAIN) {
+        vconn->connect_status = (vconn->class->connect)(vconn);
+        assert(vconn->connect_status != EINPROGRESS);
+    }
+    return vconn->connect_status;
 }
 
 /* Tries to accept a new connection on 'vconn', which must be a passive vconn.
@@ -156,11 +168,17 @@ vconn_postpoll(struct vconn *vconn, short int *revents)
  * vconn_accept will not block waiting for a connection.  If no connection is
  * ready to be accepted, it returns EAGAIN immediately. */
 int
-vconn_accept(struct vconn *vconn, struct vconn **new_vconn) 
+vconn_accept(struct vconn *vconn, struct vconn **new_vconn)
 {
-    int retval = (vconn->class->accept)(vconn, new_vconn);
+    int retval;
+
+    retval = (vconn->class->accept)(vconn, new_vconn);
+
     if (retval) {
         *new_vconn = NULL;
+    } else {
+        assert((*new_vconn)->connect_status != EAGAIN
+               || (*new_vconn)->class->connect);
     }
     return retval;
 }
@@ -174,9 +192,12 @@ vconn_accept(struct vconn *vconn, struct vconn **new_vconn)
  * vconn_recv will not block waiting for a packet to arrive.  If no packets
  * have been received, it returns EAGAIN immediately. */
 int
-vconn_recv(struct vconn *vconn, struct buffer **msgp) 
+vconn_recv(struct vconn *vconn, struct buffer **msgp)
 {
-    int retval = (vconn->class->recv)(vconn, msgp);
+    int retval = vconn_connect(vconn);
+    if (!retval) {
+        retval = (vconn->class->recv)(vconn, msgp);
+    }
     if (retval) {
         *msgp = NULL;
     }
@@ -195,37 +216,76 @@ vconn_recv(struct vconn *vconn, struct buffer **msgp)
  * vconn_send will not block.  If 'msg' cannot be immediately accepted for
  * transmission, it returns EAGAIN immediately. */
 int
-vconn_send(struct vconn *vconn, struct buffer *msg) 
+vconn_send(struct vconn *vconn, struct buffer *msg)
 {
-    return (vconn->class->send)(vconn, msg);
+    int retval = vconn_connect(vconn);
+    if (!retval) {
+        retval = (vconn->class->send)(vconn, msg);
+    }
+    return retval;
 }
 
 /* Same as vconn_send, except that it waits until 'msg' can be transmitted. */
 int
-vconn_send_wait(struct vconn *vconn, struct buffer *msg) 
+vconn_send_block(struct vconn *vconn, struct buffer *msg)
 {
     int retval;
     while ((retval = vconn_send(vconn, msg)) == EAGAIN) {
-        struct pollfd pfd;
+        vconn_send_wait(vconn);
+        poll_block();
+    }
+    return retval;
+}
+
+void
+vconn_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+    int connect_status;
+
+    assert(vconn_is_passive(vconn)
+           ? wait == WAIT_ACCEPT || wait == WAIT_CONNECT
+           : wait == WAIT_CONNECT || wait == WAIT_RECV || wait == WAIT_SEND);
 
-        pfd.fd = -1;
-        pfd.events = 0;
-        vconn_prepoll(vconn, WANT_SEND, &pfd);
-        do {
-            retval = poll(&pfd, 1, -1);
-        } while (retval < 0 && errno == EINTR);
-        if (retval < 0) {
-            return errno;
+    connect_status = vconn_connect(vconn);
+    if (connect_status) {
+        if (connect_status == EAGAIN) {
+            wait = WAIT_CONNECT;
+        } else {
+            poll_immediate_wake();
+            return;
         }
-        assert(retval == 1);
-        vconn_postpoll(vconn, &pfd.revents);
     }
-    return retval;
+
+    (vconn->class->wait)(vconn, wait);
+}
+
+void
+vconn_connect_wait(struct vconn *vconn)
+{
+    vconn_wait(vconn, WAIT_CONNECT);
+}
+
+void
+vconn_accept_wait(struct vconn *vconn)
+{
+    vconn_wait(vconn, WAIT_ACCEPT);
+}
+
+void
+vconn_recv_wait(struct vconn *vconn)
+{
+    vconn_wait(vconn, WAIT_RECV);
+}
+
+void
+vconn_send_wait(struct vconn *vconn)
+{
+    vconn_wait(vconn, WAIT_SEND);
 }
 
 struct buffer *
 make_add_simple_flow(const struct flow *flow,
-                     uint32_t buffer_id, uint16_t out_port) 
+                     uint32_t buffer_id, uint16_t out_port)
 {
     struct ofp_flow_mod *ofm;
     size_t size = sizeof *ofm + sizeof ofm->actions[0];
index b3c2a28..08a8e95 100644 (file)
@@ -31,6 +31,7 @@
 #include <sys/types.h>
 #include <unistd.h>
 #include "fatal-signal.h"
+#include "poll-loop.h"
 #include "util.h"
 #include "vlog.h"
 
@@ -43,10 +44,13 @@ static int make_unix_socket(bool nonblock, bool passcred,
 \f
 /* Server for Vlog control connection. */
 struct vlog_server {
+    struct poll_waiter *waiter;
     char *path;
     int fd;
 };
 
+static void poll_server(int fd, short int events, void *server_);
+
 /* Start listening for connections from clients and processing their
  * requests.  'path' may be:
  *
@@ -80,10 +84,17 @@ vlog_server_listen(const char *path, struct vlog_server **serverp)
         free(server);
         fprintf(stderr, "Could not initialize vlog configuration socket: %s\n",
                 strerror(-server->fd));
-        *serverp = NULL;
+        if (serverp) {
+            *serverp = NULL; 
+        }
         return fd;
     }
-    *serverp = server;
+
+    server->waiter = poll_fd_callback(server->fd, POLLIN, poll_server, server);
+
+    if (serverp) {
+        *serverp = server; 
+    }
     return 0;
 }
 
@@ -92,6 +103,7 @@ void
 vlog_server_close(struct vlog_server *server)
 {
     if (server) {
+        poll_cancel(server->waiter);
         close(server->fd);
         unlink(server->path);
         fatal_signal_remove_file_to_unlink(server->path);
@@ -100,14 +112,6 @@ vlog_server_close(struct vlog_server *server)
     }
 }
 
-/* Returns the fd used by 'server'.  The caller can poll this fd (POLLIN) to
- * determine when to call vlog_server_poll(). */
-int
-vlog_server_get_fd(const struct vlog_server *server)
-{
-    return server->fd;
-}
-
 static int
 recv_with_creds(const struct vlog_server *server,
                 char *cmd_buf, size_t cmd_buf_size,
@@ -212,9 +216,10 @@ recv_with_creds(const struct vlog_server *server,
 }
 
 /* Processes incoming requests for 'server'. */
-void
-vlog_server_poll(struct vlog_server *server)
+static void
+poll_server(int fd UNUSED, short int events, void *server_)
 {
+    struct vlog_server *server = server_;
     for (;;) {
         char cmd_buf[512];
         struct sockaddr_un un;
@@ -228,7 +233,7 @@ vlog_server_poll(struct vlog_server *server)
                 fprintf(stderr, "vlog: reading configuration socket: %s",
                         strerror(errno));
             }
-            return;
+            break;
         } else if (error < 0) {
             continue;
         }
@@ -246,6 +251,7 @@ vlog_server_poll(struct vlog_server *server)
                (struct sockaddr*) &un, un_len);
         free(reply);
     }
+    server->waiter = poll_fd_callback(server->fd, POLLIN, poll_server, server);
 }
 \f
 /* Client for Vlog control connection. */
index f50b3ac..dac24fd 100644 (file)
@@ -36,6 +36,7 @@
 #include "vconn.h"
 #include "vlog-socket.h"
 #include "openflow.h"
+#include "poll-loop.h"
 
 #include "vlog.h"
 #define THIS_MODULE VLM_secchan
@@ -48,7 +49,6 @@ static bool reliable = true;
 struct half {
     const char *name;
     struct vconn *vconn;
-    struct pollfd *pollfd;
     struct buffer *rxbuf;
     time_t backoff_deadline;
     int backoff;
@@ -60,8 +60,6 @@ int
 main(int argc, char *argv[])
 {
     struct half halves[2];
-    struct pollfd pollfds[2 + 1];
-    struct vlog_server *vlog_server;
     int retval;
     int i;
 
@@ -74,7 +72,7 @@ main(int argc, char *argv[])
         fatal(0, "exactly two peer arguments required; use --help for usage");
     }
 
-    retval = vlog_server_listen(NULL, &vlog_server);
+    retval = vlog_server_listen(NULL, NULL);
     if (retval) {
         fatal(retval, "Could not listen for vlog connections");
     }
@@ -82,82 +80,65 @@ main(int argc, char *argv[])
     for (i = 0; i < 2; i++) {
         halves[i].name = argv[optind + i];
         halves[i].vconn = NULL;
-        halves[i].pollfd = &pollfds[i];
         halves[i].rxbuf = NULL;
         halves[i].backoff_deadline = 0;
         halves[i].backoff = 1;
         reconnect(&halves[i]);
     }
     for (;;) {
-        size_t n_ready;
-        
-        /* Wait until there's something to do. */
-        n_ready = 0;
-        for (i = 0; i < 2; i++) {
-            struct half *this = &halves[i];
-            struct half *peer = &halves[!i];
-            int want = 0;
-            if (peer->rxbuf) {
-                want |= WANT_SEND;
-            }
-            if (!this->rxbuf) {
-                want |= WANT_RECV;
-            }
-            this->pollfd->fd = -1;
-            this->pollfd->events = 0;
-            n_ready += vconn_prepoll(this->vconn, want, this->pollfd);
-        }
-        if (vlog_server) {
-            pollfds[2].fd = vlog_server_get_fd(vlog_server);
-            pollfds[2].events = POLLIN;
-        }
-        do {
-            retval = poll(pollfds, 2 + (vlog_server != NULL),
-                          n_ready ? 0 : -1);
-        } while (retval < 0 && errno == EINTR);
-        if (retval < 0 || (retval == 0 && !n_ready)) {
-            fatal(retval < 0 ? errno : 0, "poll");
-        }
+        /* Do some work.  Limit the number of iterations so that callbacks
+         * registered with the poll loop don't starve. */
+        int iteration;
+        for (iteration = 0; iteration < 50; iteration++) {
+            bool progress = false;
+            for (i = 0; i < 2; i++) {
+                struct half *this = &halves[i];
+                struct half *peer = &halves[!i];
+
+                if (!this->rxbuf) {
+                    retval = vconn_recv(this->vconn, &this->rxbuf);
+                    if (retval && retval != EAGAIN) {
+                        if (retval == EOF) {
+                            VLOG_DBG("%s: connection closed by remote host",
+                                     this->name); 
+                        } else {
+                            VLOG_DBG("%s: recv: closing connection: %s",
+                                     this->name, strerror(retval));
+                        }
+                        reconnect(this);
+                        break;
+                    }
+                }
 
-        /* Let each connection deal with any pending operations. */
-        for (i = 0; i < 2; i++) {
-            struct half *this = &halves[i];
-            vconn_postpoll(this->vconn, &this->pollfd->revents);
-            if (this->pollfd->revents & POLLERR) {
-                this->pollfd->revents |= POLLIN | POLLOUT;
+                if (this->rxbuf) {
+                    retval = vconn_send(peer->vconn, this->rxbuf);
+                    if (!retval) {
+                        this->rxbuf = NULL;
+                        progress = true;
+                    } else if (retval != EAGAIN) {
+                        VLOG_DBG("%s: send: closing connection: %s",
+                                 peer->name, strerror(retval));
+                        reconnect(peer);
+                        break;
+                    }
+                }
+            }
+            if (!progress) {
+                break;
             }
-        }
-        if (vlog_server && pollfds[2].revents) {
-            vlog_server_poll(vlog_server);
         }
 
-        /* Do as much work as we can without waiting. */
+        /* Wait for something to happen. */
         for (i = 0; i < 2; i++) {
             struct half *this = &halves[i];
             struct half *peer = &halves[!i];
-
-            if (this->pollfd->revents & POLLIN && !this->rxbuf) {
-                retval = vconn_recv(this->vconn, &this->rxbuf);
-                if (retval && retval != EAGAIN) {
-                    VLOG_DBG("%s: recv: closing connection: %s",
-                             this->name, strerror(retval));
-                    reconnect(this);
-                    break;
-                }
+            if (!this->rxbuf) {
+                vconn_recv_wait(this->vconn);
+            } else {
+                vconn_send_wait(peer->vconn);
             }
-
-            if (peer->pollfd->revents & POLLOUT && this->rxbuf) {
-                retval = vconn_send(peer->vconn, this->rxbuf);
-                if (!retval) {
-                    this->rxbuf = NULL;
-                } else if (retval != EAGAIN) {
-                    VLOG_DBG("%s: send: closing connection: %s",
-                             peer->name, strerror(retval));
-                    reconnect(peer); 
-                    break;
-                }
-            } 
         }
+        poll_block();
     }
 
     return 0;
@@ -177,7 +158,6 @@ reconnect(struct half *this)
         buffer_delete(this->rxbuf);
         this->rxbuf = NULL;
     }
-    this->pollfd->revents = POLLIN | POLLOUT;
 
     for (;;) {
         time_t now = time(0);
@@ -192,10 +172,11 @@ reconnect(struct half *this)
             }
             VLOG_WARN("%s: waiting %d seconds before reconnect\n",
                       this->name, (int) (this->backoff_deadline - now));
-            sleep(this->backoff_deadline - now);
+            poll_timer_wait((this->backoff_deadline - now) * 1000);
+            poll_block();
         }
 
-        retval = vconn_open(this->name, &this->vconn);
+        retval = vconn_open_block(this->name, &this->vconn);
         if (!retval) {
             VLOG_WARN("%s: connected", this->name);
             if (vconn_is_passive(this->vconn)) {
@@ -209,7 +190,7 @@ reconnect(struct half *this)
         if (!reliable) {
             fatal(0, "%s: connection failed", this->name);
         }
-        VLOG_WARN("%s: connection failed (%s)", this->name, strerror(errno));
+        VLOG_WARN("%s: connection failed (%s)", this->name, strerror(retval));
         this->backoff_deadline = time(0) + this->backoff;
     }
 }
index 1bf3699..a608362 100644 (file)
@@ -480,7 +480,7 @@ static void do_add_flows(int argc, char *argv[])
         ofm->group_id = htonl(0);
         str_to_flow(line, &ofm->match, &ofm->actions[0]);
 
-        retval = vconn_send_wait(vconn, buffer);
+        retval = vconn_send_block(vconn, buffer);
         if (retval) {
             fatal(retval, "sending to datapath");
         }