X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=lib%2Fvconn-stream.c;h=9347b5ea09e54acc4052e9cbac25160a09fedb46;hb=28c5588e8e1a8d091c5d2275232c35f2968a97fa;hp=f19f3ebfa04043add23c0345011e1a9e7d33de0e;hpb=60cb3eb8b296e2aebbda6ccc161e99ad2bc7ca4a;p=sliver-openvswitch.git diff --git a/lib/vconn-stream.c b/lib/vconn-stream.c index f19f3ebfa..9347b5ea0 100644 --- a/lib/vconn-stream.c +++ b/lib/vconn-stream.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008, 2009 Nicira Networks. + * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013 Nicira, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,8 +15,6 @@ */ #include -#include "vconn-stream.h" -#include #include #include #include @@ -24,360 +22,377 @@ #include #include #include "fatal-signal.h" -#include "leak-checker.h" #include "ofpbuf.h" #include "openflow/openflow.h" #include "poll-loop.h" #include "socket-util.h" +#include "stream.h" #include "util.h" #include "vconn-provider.h" #include "vconn.h" - #include "vlog.h" -#define THIS_MODULE VLM_vconn_stream + +VLOG_DEFINE_THIS_MODULE(vconn_stream); /* Active stream socket vconn. */ -struct stream_vconn +struct vconn_stream { struct vconn vconn; - int fd; + struct stream *stream; struct ofpbuf *rxbuf; struct ofpbuf *txbuf; - char *unlink_path; + int n_packets; }; -static struct vconn_class stream_vconn_class; +static const struct vconn_class stream_vconn_class; static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25); -static void stream_clear_txbuf(struct stream_vconn *); -static void maybe_unlink_and_free(char *path); +static void vconn_stream_clear_txbuf(struct vconn_stream *); -/* Creates a new vconn named 'name' that will send and receive data on 'fd' and - * stores a pointer to the vconn in '*vconnp'. Initial connection status - * 'connect_status' is interpreted as described for vconn_init(). - * - * When '*vconnp' is closed, then 'unlink_path' (if nonnull) will be passed to - * fatal_signal_unlink_file_now() and then freed with free(). - * - * Returns 0 if successful, otherwise a positive errno value. (The current - * implementation never fails.) */ -int -new_stream_vconn(const char *name, int fd, int connect_status, - char *unlink_path, struct vconn **vconnp) +static struct vconn * +vconn_stream_new(struct stream *stream, int connect_status, + uint32_t allowed_versions) { - struct stream_vconn *s; + struct vconn_stream *s; s = xmalloc(sizeof *s); - vconn_init(&s->vconn, &stream_vconn_class, connect_status, name); - s->fd = fd; + vconn_init(&s->vconn, &stream_vconn_class, connect_status, + stream_get_name(stream), allowed_versions); + s->stream = stream; s->txbuf = NULL; s->rxbuf = NULL; - s->unlink_path = unlink_path; - *vconnp = &s->vconn; - return 0; + s->n_packets = 0; + return &s->vconn; +} + +/* Creates a new vconn that will send and receive data on a stream named 'name' + * and stores a pointer to the vconn in '*vconnp'. + * + * Returns 0 if successful, otherwise a positive errno value. */ +static int +vconn_stream_open(const char *name, uint32_t allowed_versions, + char *suffix OVS_UNUSED, struct vconn **vconnp, uint8_t dscp) +{ + struct stream *stream; + int error; + + error = stream_open_with_default_port(name, OFP_OLD_PORT, &stream, dscp); + if (!error) { + error = stream_connect(stream); + if (!error || error == EAGAIN) { + *vconnp = vconn_stream_new(stream, error, allowed_versions); + return 0; + } + } + + stream_close(stream); + return error; } -static struct stream_vconn * -stream_vconn_cast(struct vconn *vconn) +static struct vconn_stream * +vconn_stream_cast(struct vconn *vconn) { - vconn_assert_class(vconn, &stream_vconn_class); - return CONTAINER_OF(vconn, struct stream_vconn, vconn); + return CONTAINER_OF(vconn, struct vconn_stream, vconn); } static void -stream_close(struct vconn *vconn) +vconn_stream_close(struct vconn *vconn) { - struct stream_vconn *s = stream_vconn_cast(vconn); - stream_clear_txbuf(s); + struct vconn_stream *s = vconn_stream_cast(vconn); + + if ((vconn->error == EPROTO || s->n_packets < 1) && s->rxbuf) { + stream_report_content(ofpbuf_data(s->rxbuf), ofpbuf_size(s->rxbuf), STREAM_OPENFLOW, + THIS_MODULE, vconn_get_name(vconn)); + } + + stream_close(s->stream); + vconn_stream_clear_txbuf(s); ofpbuf_delete(s->rxbuf); - close(s->fd); - maybe_unlink_and_free(s->unlink_path); free(s); } static int -stream_connect(struct vconn *vconn) +vconn_stream_connect(struct vconn *vconn) { - struct stream_vconn *s = stream_vconn_cast(vconn); - return check_connection_completion(s->fd); + struct vconn_stream *s = vconn_stream_cast(vconn); + return stream_connect(s->stream); } static int -stream_recv(struct vconn *vconn, struct ofpbuf **bufferp) +vconn_stream_recv__(struct vconn_stream *s, int rx_len) { - struct stream_vconn *s = stream_vconn_cast(vconn); - struct ofpbuf *rx; - size_t want_bytes; - ssize_t retval; + struct ofpbuf *rx = s->rxbuf; + int want_bytes, retval; + + want_bytes = rx_len - ofpbuf_size(rx); + ofpbuf_prealloc_tailroom(rx, want_bytes); + retval = stream_recv(s->stream, ofpbuf_tail(rx), want_bytes); + if (retval > 0) { + ofpbuf_set_size(rx, ofpbuf_size(rx) + retval); + return retval == want_bytes ? 0 : EAGAIN; + } else if (retval == 0) { + if (ofpbuf_size(rx)) { + VLOG_ERR_RL(&rl, "connection dropped mid-packet"); + return EPROTO; + } + return EOF; + } else { + return -retval; + } +} +static int +vconn_stream_recv(struct vconn *vconn, struct ofpbuf **bufferp) +{ + struct vconn_stream *s = vconn_stream_cast(vconn); + const struct ofp_header *oh; + int rx_len; + + /* Allocate new receive buffer if we don't have one. */ if (s->rxbuf == NULL) { s->rxbuf = ofpbuf_new(1564); } - rx = s->rxbuf; -again: - if (sizeof(struct ofp_header) > rx->size) { - want_bytes = sizeof(struct ofp_header) - rx->size; - } else { - struct ofp_header *oh = rx->data; - size_t length = ntohs(oh->length); - if (length < sizeof(struct ofp_header)) { - VLOG_ERR_RL(&rl, "received too-short ofp_header (%zu bytes)", - length); - return EPROTO; - } - want_bytes = length - rx->size; - if (!want_bytes) { - *bufferp = rx; - s->rxbuf = NULL; - return 0; + /* Read ofp_header. */ + if (ofpbuf_size(s->rxbuf) < sizeof(struct ofp_header)) { + int retval = vconn_stream_recv__(s, sizeof(struct ofp_header)); + if (retval) { + return retval; } } - ofpbuf_prealloc_tailroom(rx, want_bytes); - retval = read(s->fd, ofpbuf_tail(rx), want_bytes); - if (retval > 0) { - rx->size += retval; - if (retval == want_bytes) { - if (rx->size > sizeof(struct ofp_header)) { - *bufferp = rx; - s->rxbuf = NULL; - return 0; - } else { - goto again; - } - } - return EAGAIN; - } else if (retval == 0) { - if (rx->size) { - VLOG_ERR_RL(&rl, "connection dropped mid-packet"); - return EPROTO; - } else { - return EOF; + /* Read payload. */ + oh = ofpbuf_data(s->rxbuf); + rx_len = ntohs(oh->length); + if (rx_len < sizeof(struct ofp_header)) { + VLOG_ERR_RL(&rl, "received too-short ofp_header (%d bytes)", rx_len); + return EPROTO; + } else if (ofpbuf_size(s->rxbuf) < rx_len) { + int retval = vconn_stream_recv__(s, rx_len); + if (retval) { + return retval; } - } else { - return errno; } + + s->n_packets++; + *bufferp = s->rxbuf; + s->rxbuf = NULL; + return 0; } static void -stream_clear_txbuf(struct stream_vconn *s) +vconn_stream_clear_txbuf(struct vconn_stream *s) { ofpbuf_delete(s->txbuf); s->txbuf = NULL; } static int -stream_send(struct vconn *vconn, struct ofpbuf *buffer) +vconn_stream_send(struct vconn *vconn, struct ofpbuf *buffer) { - struct stream_vconn *s = stream_vconn_cast(vconn); + struct vconn_stream *s = vconn_stream_cast(vconn); ssize_t retval; if (s->txbuf) { return EAGAIN; } - retval = write(s->fd, buffer->data, buffer->size); - if (retval == buffer->size) { + retval = stream_send(s->stream, ofpbuf_data(buffer), ofpbuf_size(buffer)); + if (retval == ofpbuf_size(buffer)) { ofpbuf_delete(buffer); return 0; - } else if (retval >= 0 || errno == EAGAIN) { - leak_checker_claim(buffer); + } else if (retval >= 0 || retval == -EAGAIN) { s->txbuf = buffer; if (retval > 0) { ofpbuf_pull(buffer, retval); } return 0; } else { - return errno; + return -retval; } } static void -stream_run(struct vconn *vconn) +vconn_stream_run(struct vconn *vconn) { - struct stream_vconn *s = stream_vconn_cast(vconn); - ssize_t n; + struct vconn_stream *s = vconn_stream_cast(vconn); + ssize_t retval; + stream_run(s->stream); if (!s->txbuf) { return; } - n = write(s->fd, s->txbuf->data, s->txbuf->size); - if (n < 0) { - if (errno != EAGAIN) { - VLOG_ERR_RL(&rl, "send: %s", strerror(errno)); - stream_clear_txbuf(s); + retval = stream_send(s->stream, ofpbuf_data(s->txbuf), ofpbuf_size(s->txbuf)); + if (retval < 0) { + if (retval != -EAGAIN) { + VLOG_ERR_RL(&rl, "send: %s", ovs_strerror(-retval)); + vconn_stream_clear_txbuf(s); return; } - } else if (n > 0) { - ofpbuf_pull(s->txbuf, n); - if (!s->txbuf->size) { - stream_clear_txbuf(s); + } else if (retval > 0) { + ofpbuf_pull(s->txbuf, retval); + if (!ofpbuf_size(s->txbuf)) { + vconn_stream_clear_txbuf(s); return; } } } static void -stream_run_wait(struct vconn *vconn) +vconn_stream_run_wait(struct vconn *vconn) { - struct stream_vconn *s = stream_vconn_cast(vconn); + struct vconn_stream *s = vconn_stream_cast(vconn); + stream_run_wait(s->stream); if (s->txbuf) { - poll_fd_wait(s->fd, POLLOUT); + stream_send_wait(s->stream); } } static void -stream_wait(struct vconn *vconn, enum vconn_wait_type wait) +vconn_stream_wait(struct vconn *vconn, enum vconn_wait_type wait) { - struct stream_vconn *s = stream_vconn_cast(vconn); + struct vconn_stream *s = vconn_stream_cast(vconn); switch (wait) { case WAIT_CONNECT: - poll_fd_wait(s->fd, POLLOUT); + stream_connect_wait(s->stream); break; case WAIT_SEND: if (!s->txbuf) { - poll_fd_wait(s->fd, POLLOUT); + stream_send_wait(s->stream); } else { - /* Nothing to do: need to drain txbuf first. stream_run_wait() - * will arrange to wake up when there room to send data, so there's - * no point in calling poll_fd_wait() redundantly here. */ + /* Nothing to do: need to drain txbuf first. + * vconn_stream_run_wait() will arrange to wake up when there room + * to send data, so there's no point in calling poll_fd_wait() + * redundantly here. */ } break; case WAIT_RECV: - poll_fd_wait(s->fd, POLLIN); + stream_recv_wait(s->stream); break; default: - NOT_REACHED(); + OVS_NOT_REACHED(); } } - -static struct vconn_class stream_vconn_class = { - "stream", /* name */ - NULL, /* open */ - stream_close, /* close */ - stream_connect, /* connect */ - stream_recv, /* recv */ - stream_send, /* send */ - stream_run, /* run */ - stream_run_wait, /* run_wait */ - stream_wait, /* wait */ -}; /* Passive stream socket vconn. */ -struct pstream_pvconn +struct pvconn_pstream { struct pvconn pvconn; - int fd; - int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len, - struct vconn **); - char *unlink_path; + struct pstream *pstream; }; -static struct pvconn_class pstream_pvconn_class; +static const struct pvconn_class pstream_pvconn_class; -static struct pstream_pvconn * -pstream_pvconn_cast(struct pvconn *pvconn) +static struct pvconn_pstream * +pvconn_pstream_cast(struct pvconn *pvconn) { - pvconn_assert_class(pvconn, &pstream_pvconn_class); - return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn); + return CONTAINER_OF(pvconn, struct pvconn_pstream, pvconn); } -/* Creates a new pvconn named 'name' that will accept new socket connections on - * 'fd' and stores a pointer to the vconn in '*pvconnp'. - * - * When a connection has been accepted, 'accept_cb' will be called with the new - * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'. - * accept_cb must return 0 if the connection is successful, in which case it - * must initialize '*vconnp' to the new vconn, or a positive errno value on - * error. In either case accept_cb takes ownership of the 'fd' passed in. - * - * When '*pvconnp' is closed, then 'unlink_path' (if nonnull) will be passed to - * fatal_signal_unlink_file_now() and freed with free(). +/* Creates a new pvconn named 'name' that will accept new connections using + * pstream_accept() and stores a pointer to the pvconn in '*pvconnp'. * * Returns 0 if successful, otherwise a positive errno value. (The current * implementation never fails.) */ -int -new_pstream_pvconn(const char *name, int fd, - int (*accept_cb)(int fd, const struct sockaddr *sa, - size_t sa_len, struct vconn **vconnp), - char *unlink_path, struct pvconn **pvconnp) +static int +pvconn_pstream_listen(const char *name, uint32_t allowed_versions, + char *suffix OVS_UNUSED, struct pvconn **pvconnp, + uint8_t dscp) { - struct pstream_pvconn *ps = xmalloc(sizeof *ps); - pvconn_init(&ps->pvconn, &pstream_pvconn_class, name); - ps->fd = fd; - ps->accept_cb = accept_cb; - ps->unlink_path = unlink_path; + struct pvconn_pstream *ps; + struct pstream *pstream; + int error; + + error = pstream_open_with_default_port(name, OFP_OLD_PORT, + &pstream, dscp); + if (error) { + return error; + } + + ps = xmalloc(sizeof *ps); + pvconn_init(&ps->pvconn, &pstream_pvconn_class, name, allowed_versions); + ps->pstream = pstream; *pvconnp = &ps->pvconn; return 0; } static void -pstream_close(struct pvconn *pvconn) +pvconn_pstream_close(struct pvconn *pvconn) { - struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn); - close(ps->fd); - maybe_unlink_and_free(ps->unlink_path); + struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); + pstream_close(ps->pstream); free(ps); } static int -pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp) +pvconn_pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp) { - struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn); - struct sockaddr_storage ss; - socklen_t ss_len = sizeof ss; - int new_fd; - int retval; - - new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len); - if (new_fd < 0) { - int retval = errno; - if (retval != EAGAIN) { - VLOG_DBG_RL(&rl, "accept: %s", strerror(retval)); + struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); + struct stream *stream; + int error; + + error = pstream_accept(ps->pstream, &stream); + if (error) { + if (error != EAGAIN) { + VLOG_DBG_RL(&rl, "%s: accept: %s", + pstream_get_name(ps->pstream), ovs_strerror(error)); } - return retval; - } - - retval = set_nonblocking(new_fd); - if (retval) { - close(new_fd); - return retval; + return error; } - return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len, - new_vconnp); + *new_vconnp = vconn_stream_new(stream, 0, pvconn->allowed_versions); + return 0; } static void -pstream_wait(struct pvconn *pvconn) +pvconn_pstream_wait(struct pvconn *pvconn) { - struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn); - poll_fd_wait(ps->fd, POLLIN); + struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); + pstream_wait(ps->pstream); } - -static struct pvconn_class pstream_pvconn_class = { - "pstream", - NULL, - pstream_close, - pstream_accept, - pstream_wait -}; -/* Helper functions. */ -static void -maybe_unlink_and_free(char *path) -{ - if (path) { - fatal_signal_unlink_file_now(path); - free(path); +/* Stream-based vconns and pvconns. */ + +#define STREAM_INIT(NAME) \ + { \ + NAME, \ + vconn_stream_open, \ + vconn_stream_close, \ + vconn_stream_connect, \ + vconn_stream_recv, \ + vconn_stream_send, \ + vconn_stream_run, \ + vconn_stream_run_wait, \ + vconn_stream_wait, \ } -} + +#define PSTREAM_INIT(NAME) \ + { \ + NAME, \ + pvconn_pstream_listen, \ + pvconn_pstream_close, \ + pvconn_pstream_accept, \ + pvconn_pstream_wait \ + } + +static const struct vconn_class stream_vconn_class = STREAM_INIT("stream"); +static const struct pvconn_class pstream_pvconn_class = PSTREAM_INIT("pstream"); + +const struct vconn_class tcp_vconn_class = STREAM_INIT("tcp"); +const struct pvconn_class ptcp_pvconn_class = PSTREAM_INIT("ptcp"); + +const struct vconn_class unix_vconn_class = STREAM_INIT("unix"); +const struct pvconn_class punix_pvconn_class = PSTREAM_INIT("punix"); + +#ifdef HAVE_OPENSSL +const struct vconn_class ssl_vconn_class = STREAM_INIT("ssl"); +const struct pvconn_class pssl_pvconn_class = PSTREAM_INIT("pssl"); +#endif