Catalli's threaded switch
[sliver-openvswitch.git] / lib / vconn-stream.c
index 034d1d5..ba1cec6 100644 (file)
@@ -32,9 +32,9 @@
 #include "util.h"
 #include "vconn-provider.h"
 #include "vconn.h"
-
 #include "vlog.h"
-#define THIS_MODULE VLM_vconn_stream
+
+VLOG_DEFINE_THIS_MODULE(vconn_stream)
 
 /* Active stream socket vconn. */
 
@@ -44,6 +44,7 @@ struct vconn_stream
     struct stream *stream;
     struct ofpbuf *rxbuf;
     struct ofpbuf *txbuf;
+    int n_packets;
 };
 
 static struct vconn_class stream_vconn_class;
@@ -63,6 +64,7 @@ vconn_stream_new(struct stream *stream, int connect_status)
     s->stream = stream;
     s->txbuf = NULL;
     s->rxbuf = NULL;
+    s->n_packets = 0;
     s->vconn.remote_ip = stream_get_remote_ip(stream);
     s->vconn.remote_port = stream_get_remote_port(stream);
     s->vconn.local_ip = stream_get_local_ip(stream);
@@ -83,13 +85,16 @@ vconn_stream_open(const char *name, char *suffix OVS_UNUSED,
 
     error = stream_open_with_default_ports(name, OFP_TCP_PORT, OFP_SSL_PORT,
                                            &stream);
-
-    if (error && error != EAGAIN) {
-        return error;
+    if (!error) {
+        error = stream_connect(stream);
+        if (!error || error == EAGAIN) {
+            *vconnp = vconn_stream_new(stream, error);
+            return 0;
+        }
     }
 
-    *vconnp = vconn_stream_new(stream, error);
-    return 0;
+    stream_close(stream);
+    return error;
 }
 
 static struct vconn_stream *
@@ -102,6 +107,12 @@ static void
 vconn_stream_close(struct vconn *vconn)
 {
     struct vconn_stream *s = vconn_stream_cast(vconn);
+
+    if ((vconn->error == EPROTO || s->n_packets < 1) && s->rxbuf) {
+        stream_report_content(s->rxbuf->data, s->rxbuf->size, STREAM_OPENFLOW,
+                              THIS_MODULE, vconn_get_name(vconn));
+    }
+
     stream_close(s->stream);
     vconn_stream_clear_txbuf(s);
     ofpbuf_delete(s->rxbuf);
@@ -116,63 +127,67 @@ vconn_stream_connect(struct vconn *vconn)
 }
 
 static int
-vconn_stream_recv(struct vconn *vconn, struct ofpbuf **bufferp)
+vconn_stream_recv__(struct vconn_stream *s, int rx_len)
 {
-    struct vconn_stream *s = vconn_stream_cast(vconn);
-    struct ofpbuf *rx;
-    size_t want_bytes;
-    ssize_t retval;
-
-    if (s->rxbuf == NULL) {
-        s->rxbuf = ofpbuf_new(1564);
-    }
-    rx = s->rxbuf;
+    struct ofpbuf *rx = s->rxbuf;
+    int want_bytes, retval;
 
-again:
-    if (sizeof(struct ofp_header) > rx->size) {
-        want_bytes = sizeof(struct ofp_header) - rx->size;
-    } else {
-        struct ofp_header *oh = rx->data;
-        size_t length = ntohs(oh->length);
-        if (length < sizeof(struct ofp_header)) {
-            VLOG_ERR_RL(&rl, "received too-short ofp_header (%zu bytes)",
-                        length);
-            return EPROTO;
-        }
-        want_bytes = length - rx->size;
-        if (!want_bytes) {
-            *bufferp = rx;
-            s->rxbuf = NULL;
-            return 0;
-        }
-    }
+    want_bytes = rx_len - rx->size;
     ofpbuf_prealloc_tailroom(rx, want_bytes);
-
     retval = stream_recv(s->stream, ofpbuf_tail(rx), want_bytes);
     if (retval > 0) {
         rx->size += retval;
-        if (retval == want_bytes) {
-            if (rx->size > sizeof(struct ofp_header)) {
-                *bufferp = rx;
-                s->rxbuf = NULL;
-                return 0;
-            } else {
-                goto again;
-            }
-        }
-        return EAGAIN;
+        return retval == want_bytes ? 0 : EAGAIN;
     } else if (retval == 0) {
         if (rx->size) {
             VLOG_ERR_RL(&rl, "connection dropped mid-packet");
             return EPROTO;
-        } else {
-            return EOF;
         }
+        return EOF;
     } else {
         return -retval;
     }
 }
 
+static int
+vconn_stream_recv(struct vconn *vconn, struct ofpbuf **bufferp)
+{
+    struct vconn_stream *s = vconn_stream_cast(vconn);
+    const struct ofp_header *oh;
+    int rx_len;
+
+    /* Allocate new receive buffer if we don't have one. */
+    if (s->rxbuf == NULL) {
+        s->rxbuf = ofpbuf_new(1564);
+    }
+
+    /* Read ofp_header. */
+    if (s->rxbuf->size < sizeof(struct ofp_header)) {
+        int retval = vconn_stream_recv__(s, sizeof(struct ofp_header));
+        if (retval) {
+            return retval;
+        }
+    }
+
+    /* Read payload. */
+    oh = s->rxbuf->data;
+    rx_len = ntohs(oh->length);
+    if (rx_len < sizeof(struct ofp_header)) {
+        VLOG_ERR_RL(&rl, "received too-short ofp_header (%d bytes)", rx_len);
+        return EPROTO;
+    } else if (s->rxbuf->size < rx_len) {
+        int retval = vconn_stream_recv__(s, rx_len);
+        if (retval) {
+            return retval;
+        }
+    }
+
+    s->n_packets++;
+    *bufferp = s->rxbuf;
+    s->rxbuf = NULL;
+    return 0;
+}
+
 static void
 vconn_stream_clear_txbuf(struct vconn_stream *s)
 {