#include <string.h>
#include <sys/types.h>
#include <unistd.h>
-#include "buffer.h"
-#include "util.h"
-#include "openflow.h"
+#include "ofpbuf.h"
+#include "openflow/openflow.h"
#include "poll-loop.h"
#include "socket-util.h"
+#include "util.h"
+#include "vconn-provider.h"
#include "vconn.h"
#include "vlog.h"
{
struct vconn vconn;
int fd;
- struct buffer *rxbuf;
- struct buffer *txbuf;
+ struct ofpbuf *rxbuf;
+ struct ofpbuf *txbuf;
struct poll_waiter *tx_waiter;
};
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
+static void stream_clear_txbuf(struct stream_vconn *);
+
int
new_stream_vconn(const char *name, int fd, int connect_status,
uint32_t ip, struct vconn **vconnp)
struct stream_vconn *s;
s = xmalloc(sizeof *s);
- s->vconn.class = &stream_vconn_class;
- s->vconn.connect_status = connect_status;
- s->vconn.ip = ip;
+ vconn_init(&s->vconn, &stream_vconn_class, connect_status, ip, name);
s->fd = fd;
s->txbuf = NULL;
s->tx_waiter = NULL;
static struct stream_vconn *
stream_vconn_cast(struct vconn *vconn)
{
- assert(vconn->class == &stream_vconn_class);
+ vconn_assert_class(vconn, &stream_vconn_class);
return CONTAINER_OF(vconn, struct stream_vconn, vconn);
}
{
struct stream_vconn *s = stream_vconn_cast(vconn);
poll_cancel(s->tx_waiter);
+ stream_clear_txbuf(s);
+ ofpbuf_delete(s->rxbuf);
close(s->fd);
free(s);
}
}
static int
-stream_recv(struct vconn *vconn, struct buffer **bufferp)
+stream_recv(struct vconn *vconn, struct ofpbuf **bufferp)
{
struct stream_vconn *s = stream_vconn_cast(vconn);
- struct buffer *rx;
+ struct ofpbuf *rx;
size_t want_bytes;
ssize_t retval;
if (s->rxbuf == NULL) {
- s->rxbuf = buffer_new(1564);
+ s->rxbuf = ofpbuf_new(1564);
}
rx = s->rxbuf;
return 0;
}
}
- buffer_prealloc_tailroom(rx, want_bytes);
+ ofpbuf_prealloc_tailroom(rx, want_bytes);
- retval = read(s->fd, buffer_tail(rx), want_bytes);
+ retval = read(s->fd, ofpbuf_tail(rx), want_bytes);
if (retval > 0) {
rx->size += retval;
if (retval == want_bytes) {
return EOF;
}
} else {
- return retval ? errno : EAGAIN;
+ return errno;
}
}
static void
stream_clear_txbuf(struct stream_vconn *s)
{
- buffer_delete(s->txbuf);
+ ofpbuf_delete(s->txbuf);
s->txbuf = NULL;
s->tx_waiter = NULL;
}
return;
}
} else if (n > 0) {
- buffer_pull(s->txbuf, n);
+ ofpbuf_pull(s->txbuf, n);
if (!s->txbuf->size) {
stream_clear_txbuf(s);
return;
}
static int
-stream_send(struct vconn *vconn, struct buffer *buffer)
+stream_send(struct vconn *vconn, struct ofpbuf *buffer)
{
struct stream_vconn *s = stream_vconn_cast(vconn);
ssize_t retval;
retval = write(s->fd, buffer->data, buffer->size);
if (retval == buffer->size) {
- buffer_delete(buffer);
+ ofpbuf_delete(buffer);
return 0;
} else if (retval >= 0 || errno == EAGAIN) {
s->txbuf = buffer;
if (retval > 0) {
- buffer_pull(buffer, retval);
+ ofpbuf_pull(buffer, retval);
}
s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn);
return 0;
}
static struct vconn_class stream_vconn_class = {
- .name = "stream",
- .close = stream_close,
- .connect = stream_connect,
- .recv = stream_recv,
- .send = stream_send,
- .wait = stream_wait,
+ "stream", /* name */
+ NULL, /* open */
+ stream_close, /* close */
+ stream_connect, /* connect */
+ stream_recv, /* recv */
+ stream_send, /* send */
+ stream_wait, /* wait */
};
\f
/* Passive stream socket vconn. */
-struct pstream_vconn
+struct pstream_pvconn
{
- struct vconn vconn;
+ struct pvconn pvconn;
int fd;
int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
struct vconn **);
};
-static struct vconn_class pstream_vconn_class;
+static struct pvconn_class pstream_pvconn_class;
-static struct pstream_vconn *
-pstream_vconn_cast(struct vconn *vconn)
+static struct pstream_pvconn *
+pstream_pvconn_cast(struct pvconn *pvconn)
{
- assert(vconn->class == &pstream_vconn_class);
- return CONTAINER_OF(vconn, struct pstream_vconn, vconn);
+ pvconn_assert_class(pvconn, &pstream_pvconn_class);
+ return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn);
}
int
-new_pstream_vconn(const char *name, int fd,
+new_pstream_pvconn(const char *name, int fd,
int (*accept_cb)(int fd, const struct sockaddr *,
size_t sa_len, struct vconn **),
- struct vconn **vconnp)
+ struct pvconn **pvconnp)
{
- struct pstream_vconn *ps;
+ struct pstream_pvconn *ps;
int retval;
retval = set_nonblocking(fd);
}
ps = xmalloc(sizeof *ps);
- ps->vconn.class = &pstream_vconn_class;
- ps->vconn.connect_status = 0;
+ pvconn_init(&ps->pvconn, &pstream_pvconn_class, name);
ps->fd = fd;
ps->accept_cb = accept_cb;
- *vconnp = &ps->vconn;
+ *pvconnp = &ps->pvconn;
return 0;
}
static void
-pstream_close(struct vconn *vconn)
+pstream_close(struct pvconn *pvconn)
{
- struct pstream_vconn *ps = pstream_vconn_cast(vconn);
+ struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
close(ps->fd);
free(ps);
}
static int
-pstream_accept(struct vconn *vconn, struct vconn **new_vconnp)
+pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp)
{
- struct pstream_vconn *ps = pstream_vconn_cast(vconn);
+ struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
struct sockaddr_storage ss;
socklen_t ss_len = sizeof ss;
int new_fd;
}
static void
-pstream_wait(struct vconn *vconn, enum vconn_wait_type wait)
+pstream_wait(struct pvconn *pvconn)
{
- struct pstream_vconn *ps = pstream_vconn_cast(vconn);
- assert(wait == WAIT_ACCEPT);
+ struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
poll_fd_wait(ps->fd, POLLIN);
}
-static struct vconn_class pstream_vconn_class = {
- .name = "pstream",
- .close = pstream_close,
- .accept = pstream_accept,
- .wait = pstream_wait
+static struct pvconn_class pstream_pvconn_class = {
+ "pstream",
+ NULL,
+ pstream_close,
+ pstream_accept,
+ pstream_wait
};