vconn: Introduce infrastructure for stream socket-based vconns.
authorBen Pfaff <blp@nicira.com>
Wed, 30 Jul 2008 22:44:22 +0000 (15:44 -0700)
committerBen Pfaff <blp@nicira.com>
Wed, 30 Jul 2008 22:44:22 +0000 (15:44 -0700)
include/Makefile.am
include/vconn-stream.h [new file with mode: 0644]
include/vlog.h
lib/Makefile.am
lib/vconn-stream.c [new file with mode: 0644]

index c37f142..4f0a465 100644 (file)
@@ -32,6 +32,7 @@ noinst_HEADERS = \
        util.h \
        vconn.h \
        vconn-ssl.h \
+       vconn-stream.h \
        vlog-socket.h \
        vlog.h \
        xtoxll.h
diff --git a/include/vconn-stream.h b/include/vconn-stream.h
new file mode 100644 (file)
index 0000000..d7eb59f
--- /dev/null
@@ -0,0 +1,50 @@
+/* Copyright (c) 2008 The Board of Trustees of The Leland Stanford
+ * Junior University
+ * 
+ * We are making the OpenFlow specification and associated documentation
+ * (Software) available for public use and benefit with the expectation
+ * that others will use, modify and enhance the Software and contribute
+ * those enhancements back to the community. However, since we would
+ * like to make the Software available for broadest use, with as few
+ * restrictions as possible permission is hereby granted, free of
+ * charge, to any person obtaining a copy of this Software to deal in
+ * the Software under the copyrights without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ * 
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ * 
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * 
+ * The name and trademarks of copyright holder(s) may NOT be used in
+ * advertising or publicity pertaining to the Software or any
+ * derivatives without specific, written prior permission.
+ */
+
+#ifndef VCONN_STREAM_H
+#define VCONN_STREAM_H 1
+
+#include <stddef.h>
+#include <stdint.h>
+
+struct vconn;
+struct sockaddr;
+
+int new_stream_vconn(const char *name, int fd, int connect_status,
+                     uint32_t ip, struct vconn **vconnp);
+int new_pstream_vconn(const char *name, int fd,
+                      int (*accept_cb)(int fd, const struct sockaddr *,
+                                       size_t sa_len, struct vconn **),
+                      struct vconn **vconnp);
+
+#endif /* vconn-stream.h */
index 2ecaa41..a25c528 100644 (file)
@@ -85,6 +85,7 @@ enum vlog_facility vlog_get_facility_val(const char *name);
         VLOG_MODULE(vconn_netlink)              \
         VLOG_MODULE(vconn_tcp)                  \
         VLOG_MODULE(vconn_ssl)                  \
+        VLOG_MODULE(vconn_stream)               \
         VLOG_MODULE(vconn)                      \
         VLOG_MODULE(vlog)                       \
 
index bedd77f..6c13fc5 100644 (file)
@@ -26,6 +26,7 @@ libopenflow_a_SOURCES = \
        socket-util.c \
        util.c \
        vconn-tcp.c \
+       vconn-stream.c \
        vconn.c \
        vlog-socket.c \
        vlog.c
diff --git a/lib/vconn-stream.c b/lib/vconn-stream.c
new file mode 100644 (file)
index 0000000..28d9635
--- /dev/null
@@ -0,0 +1,354 @@
+/* Copyright (c) 2008 The Board of Trustees of The Leland Stanford
+ * Junior University
+ *
+ * We are making the OpenFlow specification and associated documentation
+ * (Software) available for public use and benefit with the expectation
+ * that others will use, modify and enhance the Software and contribute
+ * those enhancements back to the community. However, since we would
+ * like to make the Software available for broadest use, with as few
+ * restrictions as possible permission is hereby granted, free of
+ * charge, to any person obtaining a copy of this Software to deal in
+ * the Software under the copyrights without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * The name and trademarks of copyright holder(s) may NOT be used in
+ * advertising or publicity pertaining to the Software or any
+ * derivatives without specific, written prior permission.
+ */
+
+#include <config.h>
+#include "vconn-stream.h"
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include "buffer.h"
+#include "util.h"
+#include "openflow.h"
+#include "poll-loop.h"
+#include "socket-util.h"
+#include "vconn.h"
+
+#include "vlog.h"
+#define THIS_MODULE VLM_vconn_stream
+
+/* Active stream socket vconn. */
+
+struct stream_vconn
+{
+    struct vconn vconn;
+    int fd;
+    struct buffer *rxbuf;
+    struct buffer *txbuf;
+    struct poll_waiter *tx_waiter;
+};
+
+static struct vconn_class stream_vconn_class;
+
+int
+new_stream_vconn(const char *name, int fd, int connect_status,
+                 uint32_t ip, struct vconn **vconnp)
+{
+    struct stream_vconn *s;
+
+    s = xmalloc(sizeof *s);
+    s->vconn.class = &stream_vconn_class;
+    s->vconn.connect_status = connect_status;
+    s->vconn.ip = ip;
+    s->fd = fd;
+    s->txbuf = NULL;
+    s->tx_waiter = NULL;
+    s->rxbuf = NULL;
+    *vconnp = &s->vconn;
+    return 0;
+}
+
+static struct stream_vconn *
+stream_vconn_cast(struct vconn *vconn)
+{
+    assert(vconn->class == &stream_vconn_class);
+    return CONTAINER_OF(vconn, struct stream_vconn, vconn);
+}
+
+static void
+stream_close(struct vconn *vconn)
+{
+    struct stream_vconn *s = stream_vconn_cast(vconn);
+    poll_cancel(s->tx_waiter);
+    close(s->fd);
+    free(s);
+}
+
+static int
+stream_connect(struct vconn *vconn)
+{
+    struct stream_vconn *s = stream_vconn_cast(vconn);
+    return check_connection_completion(s->fd);
+}
+
+static int
+stream_recv(struct vconn *vconn, struct buffer **bufferp)
+{
+    struct stream_vconn *s = stream_vconn_cast(vconn);
+    struct buffer *rx;
+    size_t want_bytes;
+    ssize_t retval;
+
+    if (s->rxbuf == NULL) {
+        s->rxbuf = buffer_new(1564);
+    }
+    rx = s->rxbuf;
+
+again:
+    if (sizeof(struct ofp_header) > rx->size) {
+        want_bytes = sizeof(struct ofp_header) - rx->size;
+    } else {
+        struct ofp_header *oh = rx->data;
+        size_t length = ntohs(oh->length);
+        if (length < sizeof(struct ofp_header)) {
+            VLOG_ERR("received too-short ofp_header (%zu bytes)", length);
+            return EPROTO;
+        }
+        want_bytes = length - rx->size;
+        if (!want_bytes) {
+            *bufferp = rx;
+            s->rxbuf = NULL;
+            return 0;
+        }
+    }
+    buffer_prealloc_tailroom(rx, want_bytes);
+
+    retval = read(s->fd, buffer_tail(rx), want_bytes);
+    if (retval > 0) {
+        rx->size += retval;
+        if (retval == want_bytes) {
+            if (rx->size > sizeof(struct ofp_header)) {
+                *bufferp = rx;
+                s->rxbuf = NULL;
+                return 0;
+            } else {
+                goto again;
+            }
+        }
+        return EAGAIN;
+    } else if (retval == 0) {
+        if (rx->size) {
+            VLOG_ERR("connection dropped mid-packet");
+            return EPROTO;
+        } else {
+            return EOF;
+        }
+    } else {
+        return retval ? errno : EAGAIN;
+    }
+}
+
+static void
+stream_clear_txbuf(struct stream_vconn *s)
+{
+    buffer_delete(s->txbuf);
+    s->txbuf = NULL;
+    s->tx_waiter = NULL;
+}
+
+static void
+stream_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_)
+{
+    struct vconn *vconn = vconn_;
+    struct stream_vconn *s = stream_vconn_cast(vconn);
+    ssize_t n = write(s->fd, s->txbuf->data, s->txbuf->size);
+    if (n < 0) {
+        if (errno != EAGAIN) {
+            VLOG_ERR("send: %s", strerror(errno));
+            stream_clear_txbuf(s);
+            return;
+        }
+    } else if (n > 0) {
+        buffer_pull(s->txbuf, n);
+        if (!s->txbuf->size) {
+            stream_clear_txbuf(s);
+            return;
+        }
+    }
+    s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn);
+}
+
+static int
+stream_send(struct vconn *vconn, struct buffer *buffer)
+{
+    struct stream_vconn *s = stream_vconn_cast(vconn);
+    ssize_t retval;
+
+    if (s->txbuf) {
+        return EAGAIN;
+    }
+
+    retval = write(s->fd, buffer->data, buffer->size);
+    if (retval == buffer->size) {
+        buffer_delete(buffer);
+        return 0;
+    } else if (retval >= 0 || errno == EAGAIN) {
+        s->txbuf = buffer;
+        if (retval > 0) {
+            buffer_pull(buffer, retval);
+        }
+        s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn);
+        return 0;
+    } else {
+        return errno;
+    }
+}
+
+static void
+stream_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+    struct stream_vconn *s = stream_vconn_cast(vconn);
+    switch (wait) {
+    case WAIT_CONNECT:
+        poll_fd_wait(s->fd, POLLOUT);
+        break;
+
+    case WAIT_SEND:
+        if (!s->txbuf) {
+            poll_fd_wait(s->fd, POLLOUT);
+        } else {
+            /* Nothing to do: need to drain txbuf first. */
+        }
+        break;
+
+    case WAIT_RECV:
+        poll_fd_wait(s->fd, POLLIN);
+        break;
+
+    default:
+        NOT_REACHED();
+    }
+}
+
+static struct vconn_class stream_vconn_class = {
+    .name = "stream",
+    .close = stream_close,
+    .connect = stream_connect,
+    .recv = stream_recv,
+    .send = stream_send,
+    .wait = stream_wait,
+};
+\f
+/* Passive stream socket vconn. */
+
+struct pstream_vconn
+{
+    struct vconn vconn;
+    int fd;
+    int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
+                     struct vconn **);
+};
+
+static struct vconn_class pstream_vconn_class;
+
+static struct pstream_vconn *
+pstream_vconn_cast(struct vconn *vconn)
+{
+    assert(vconn->class == &pstream_vconn_class);
+    return CONTAINER_OF(vconn, struct pstream_vconn, vconn);
+}
+
+int
+new_pstream_vconn(const char *name, int fd,
+                  int (*accept_cb)(int fd, const struct sockaddr *,
+                                   size_t sa_len, struct vconn **),
+                  struct vconn **vconnp)
+{
+    struct pstream_vconn *ps;
+    int retval;
+
+    retval = set_nonblocking(fd);
+    if (retval) {
+        close(fd);
+        return retval;
+    }
+
+    if (listen(fd, 10) < 0) {
+        int error = errno;
+        VLOG_ERR("%s: listen: %s", name, strerror(error));
+        close(fd);
+        return error;
+    }
+
+    ps = xmalloc(sizeof *ps);
+    ps->vconn.class = &pstream_vconn_class;
+    ps->vconn.connect_status = 0;
+    ps->fd = fd;
+    ps->accept_cb = accept_cb;
+    *vconnp = &ps->vconn;
+    return 0;
+}
+
+static void
+pstream_close(struct vconn *vconn)
+{
+    struct pstream_vconn *ps = pstream_vconn_cast(vconn);
+    close(ps->fd);
+    free(ps);
+}
+
+static int
+pstream_accept(struct vconn *vconn, struct vconn **new_vconnp)
+{
+    struct pstream_vconn *ps = pstream_vconn_cast(vconn);
+    struct sockaddr_storage ss;
+    socklen_t ss_len = sizeof ss;
+    int new_fd;
+    int retval;
+
+    new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len);
+    if (new_fd < 0) {
+        int retval = errno;
+        if (retval != EAGAIN) {
+            VLOG_DBG("accept: %s", strerror(retval));
+        }
+        return retval;
+    }
+
+    retval = set_nonblocking(new_fd);
+    if (retval) {
+        close(new_fd);
+        return retval;
+    }
+
+    return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len,
+                         new_vconnp);
+}
+
+static void
+pstream_wait(struct vconn *vconn, enum vconn_wait_type wait)
+{
+    struct pstream_vconn *ps = pstream_vconn_cast(vconn);
+    assert(wait == WAIT_ACCEPT);
+    poll_fd_wait(ps->fd, POLLIN);
+}
+
+static struct vconn_class pstream_vconn_class = {
+    .name = "pstream",
+    .close = pstream_close,
+    .accept = pstream_accept,
+    .wait = pstream_wait
+};