X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=lib%2Fvconn-stream.c;h=99eb21a89d9598f36d63e4345993e8789d5ff7f6;hb=02dd3123a0e312f1d33403e744af52dd6096f12d;hp=468c112ccaa701103a6dd079bb3e47a2fce3330f;hpb=064af42167bf4fc9aaea2702d80ce08074b889c0;p=sliver-openvswitch.git diff --git a/lib/vconn-stream.c b/lib/vconn-stream.c index 468c112cc..99eb21a89 100644 --- a/lib/vconn-stream.c +++ b/lib/vconn-stream.c @@ -1,21 +1,20 @@ /* - * Copyright (c) 2008, 2009 Nicira Networks. + * Copyright (c) 2008, 2009, 2010 Nicira Networks. * - * Permission to use, copy, modify, and/or distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #include -#include "vconn-stream.h" #include #include #include @@ -23,11 +22,13 @@ #include #include #include +#include "fatal-signal.h" #include "leak-checker.h" #include "ofpbuf.h" #include "openflow/openflow.h" #include "poll-loop.h" #include "socket-util.h" +#include "stream.h" #include "util.h" #include "vconn-provider.h" #include "vconn.h" @@ -37,67 +38,92 @@ /* Active stream socket vconn. */ -struct stream_vconn +struct vconn_stream { struct vconn vconn; - int fd; + struct stream *stream; struct ofpbuf *rxbuf; struct ofpbuf *txbuf; - struct poll_waiter *tx_waiter; }; static struct vconn_class stream_vconn_class; static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25); -static void stream_clear_txbuf(struct stream_vconn *); +static void vconn_stream_clear_txbuf(struct vconn_stream *); +static int count_fields(const char *); -int -new_stream_vconn(const char *name, int fd, int connect_status, - uint32_t ip, bool reconnectable, struct vconn **vconnp) +static struct vconn * +vconn_stream_new(struct stream *stream, int connect_status) { - struct stream_vconn *s; + struct vconn_stream *s; s = xmalloc(sizeof *s); - vconn_init(&s->vconn, &stream_vconn_class, connect_status, ip, name, - reconnectable); - s->fd = fd; + vconn_init(&s->vconn, &stream_vconn_class, connect_status, + stream_get_name(stream)); + s->stream = stream; s->txbuf = NULL; - s->tx_waiter = NULL; s->rxbuf = NULL; - *vconnp = &s->vconn; + return &s->vconn; +} + +/* Creates a new vconn that will send and receive data on a stream named 'name' + * and stores a pointer to the vconn in '*vconnp'. + * + * Returns 0 if successful, otherwise a positive errno value. */ +static int +vconn_stream_open(const char *name_, char *suffix OVS_UNUSED, + struct vconn **vconnp) +{ + struct stream *stream; + char *name; + int error; + + if (!strncmp(name_, "tcp:", 4) && count_fields(name_) < 3) { + name = xasprintf("%s:%d", name_, OFP_TCP_PORT); + } else if (!strncmp(name_, "ssl:", 4) && count_fields(name_) < 3) { + name = xasprintf("%s:%d", name_, OFP_SSL_PORT); + } else { + name = xstrdup(name_); + } + error = stream_open(name, &stream); + free(name); + + if (error && error != EAGAIN) { + return error; + } + + *vconnp = vconn_stream_new(stream, error); return 0; } -static struct stream_vconn * -stream_vconn_cast(struct vconn *vconn) +static struct vconn_stream * +vconn_stream_cast(struct vconn *vconn) { - vconn_assert_class(vconn, &stream_vconn_class); - return CONTAINER_OF(vconn, struct stream_vconn, vconn); + return CONTAINER_OF(vconn, struct vconn_stream, vconn); } static void -stream_close(struct vconn *vconn) +vconn_stream_close(struct vconn *vconn) { - struct stream_vconn *s = stream_vconn_cast(vconn); - poll_cancel(s->tx_waiter); - stream_clear_txbuf(s); + struct vconn_stream *s = vconn_stream_cast(vconn); + stream_close(s->stream); + vconn_stream_clear_txbuf(s); ofpbuf_delete(s->rxbuf); - close(s->fd); free(s); } static int -stream_connect(struct vconn *vconn) +vconn_stream_connect(struct vconn *vconn) { - struct stream_vconn *s = stream_vconn_cast(vconn); - return check_connection_completion(s->fd); + struct vconn_stream *s = vconn_stream_cast(vconn); + return stream_connect(s->stream); } static int -stream_recv(struct vconn *vconn, struct ofpbuf **bufferp) +vconn_stream_recv(struct vconn *vconn, struct ofpbuf **bufferp) { - struct stream_vconn *s = stream_vconn_cast(vconn); + struct vconn_stream *s = vconn_stream_cast(vconn); struct ofpbuf *rx; size_t want_bytes; ssize_t retval; @@ -127,7 +153,7 @@ again: } ofpbuf_prealloc_tailroom(rx, want_bytes); - retval = read(s->fd, ofpbuf_tail(rx), want_bytes); + retval = stream_recv(s->stream, ofpbuf_tail(rx), want_bytes); if (retval > 0) { rx->size += retval; if (retval == want_bytes) { @@ -148,199 +174,244 @@ again: return EOF; } } else { - return errno; + return -retval; } } static void -stream_clear_txbuf(struct stream_vconn *s) +vconn_stream_clear_txbuf(struct vconn_stream *s) { ofpbuf_delete(s->txbuf); s->txbuf = NULL; - s->tx_waiter = NULL; -} - -static void -stream_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_) -{ - struct vconn *vconn = vconn_; - struct stream_vconn *s = stream_vconn_cast(vconn); - ssize_t n = write(s->fd, s->txbuf->data, s->txbuf->size); - if (n < 0) { - if (errno != EAGAIN) { - VLOG_ERR_RL(&rl, "send: %s", strerror(errno)); - stream_clear_txbuf(s); - return; - } - } else if (n > 0) { - ofpbuf_pull(s->txbuf, n); - if (!s->txbuf->size) { - stream_clear_txbuf(s); - return; - } - } - s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn); } static int -stream_send(struct vconn *vconn, struct ofpbuf *buffer) +vconn_stream_send(struct vconn *vconn, struct ofpbuf *buffer) { - struct stream_vconn *s = stream_vconn_cast(vconn); + struct vconn_stream *s = vconn_stream_cast(vconn); ssize_t retval; if (s->txbuf) { return EAGAIN; } - retval = write(s->fd, buffer->data, buffer->size); + retval = stream_send(s->stream, buffer->data, buffer->size); if (retval == buffer->size) { ofpbuf_delete(buffer); return 0; - } else if (retval >= 0 || errno == EAGAIN) { + } else if (retval >= 0 || retval == -EAGAIN) { leak_checker_claim(buffer); s->txbuf = buffer; if (retval > 0) { ofpbuf_pull(buffer, retval); } - s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn); return 0; } else { - return errno; + return -retval; + } +} + +static void +vconn_stream_run(struct vconn *vconn) +{ + struct vconn_stream *s = vconn_stream_cast(vconn); + ssize_t retval; + + if (!s->txbuf) { + return; + } + + retval = stream_send(s->stream, s->txbuf->data, s->txbuf->size); + if (retval < 0) { + if (retval != -EAGAIN) { + VLOG_ERR_RL(&rl, "send: %s", strerror(-retval)); + vconn_stream_clear_txbuf(s); + return; + } + } else if (retval > 0) { + ofpbuf_pull(s->txbuf, retval); + if (!s->txbuf->size) { + vconn_stream_clear_txbuf(s); + return; + } + } +} + +static void +vconn_stream_run_wait(struct vconn *vconn) +{ + struct vconn_stream *s = vconn_stream_cast(vconn); + + if (s->txbuf) { + stream_send_wait(s->stream); } } static void -stream_wait(struct vconn *vconn, enum vconn_wait_type wait) +vconn_stream_wait(struct vconn *vconn, enum vconn_wait_type wait) { - struct stream_vconn *s = stream_vconn_cast(vconn); + struct vconn_stream *s = vconn_stream_cast(vconn); switch (wait) { case WAIT_CONNECT: - poll_fd_wait(s->fd, POLLOUT); + stream_connect_wait(s->stream); break; case WAIT_SEND: if (!s->txbuf) { - poll_fd_wait(s->fd, POLLOUT); + stream_send_wait(s->stream); } else { - /* Nothing to do: need to drain txbuf first. */ + /* Nothing to do: need to drain txbuf first. + * vconn_stream_run_wait() will arrange to wake up when there room + * to send data, so there's no point in calling poll_fd_wait() + * redundantly here. */ } break; case WAIT_RECV: - poll_fd_wait(s->fd, POLLIN); + stream_recv_wait(s->stream); break; default: NOT_REACHED(); } } - -static struct vconn_class stream_vconn_class = { - "stream", /* name */ - NULL, /* open */ - stream_close, /* close */ - stream_connect, /* connect */ - stream_recv, /* recv */ - stream_send, /* send */ - stream_wait, /* wait */ -}; /* Passive stream socket vconn. */ -struct pstream_pvconn +struct pvconn_pstream { struct pvconn pvconn; - int fd; - int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len, - struct vconn **); + struct pstream *pstream; }; static struct pvconn_class pstream_pvconn_class; -static struct pstream_pvconn * -pstream_pvconn_cast(struct pvconn *pvconn) +static struct pvconn_pstream * +pvconn_pstream_cast(struct pvconn *pvconn) { - pvconn_assert_class(pvconn, &pstream_pvconn_class); - return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn); + return CONTAINER_OF(pvconn, struct pvconn_pstream, pvconn); } -int -new_pstream_pvconn(const char *name, int fd, - int (*accept_cb)(int fd, const struct sockaddr *, - size_t sa_len, struct vconn **), - struct pvconn **pvconnp) +/* Creates a new pvconn named 'name' that will accept new connections using + * pstream_accept() and stores a pointer to the pvconn in '*pvconnp'. + * + * Returns 0 if successful, otherwise a positive errno value. (The current + * implementation never fails.) */ +static int +pvconn_pstream_listen(const char *name_, char *suffix OVS_UNUSED, + struct pvconn **pvconnp) { - struct pstream_pvconn *ps; - int retval; - - retval = set_nonblocking(fd); - if (retval) { - close(fd); - return retval; + struct pvconn_pstream *ps; + struct pstream *pstream; + char *name; + int error; + + if (!strncmp(name_, "ptcp:", 5) && count_fields(name_) < 2) { + name = xasprintf("%s:%d", name_, OFP_TCP_PORT); + } else if (!strncmp(name_, "pssl:", 5) && count_fields(name_) < 2) { + name = xasprintf("%s:%d", name_, OFP_SSL_PORT); + } else { + name = xstrdup(name_); } - - if (listen(fd, 10) < 0) { - int error = errno; - VLOG_ERR("%s: listen: %s", name, strerror(error)); - close(fd); + error = pstream_open(name, &pstream); + free(name); + if (error) { return error; } ps = xmalloc(sizeof *ps); - pvconn_init(&ps->pvconn, &pstream_pvconn_class, name); - ps->fd = fd; - ps->accept_cb = accept_cb; + pvconn_init(&ps->pvconn, &pstream_pvconn_class, name_); + ps->pstream = pstream; *pvconnp = &ps->pvconn; return 0; } static void -pstream_close(struct pvconn *pvconn) +pvconn_pstream_close(struct pvconn *pvconn) { - struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn); - close(ps->fd); + struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); + pstream_close(ps->pstream); free(ps); } static int -pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp) +pvconn_pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp) { - struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn); - struct sockaddr_storage ss; - socklen_t ss_len = sizeof ss; - int new_fd; - int retval; - - new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len); - if (new_fd < 0) { - int retval = errno; - if (retval != EAGAIN) { - VLOG_DBG_RL(&rl, "accept: %s", strerror(retval)); + struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); + struct stream *stream; + int error; + + error = pstream_accept(ps->pstream, &stream); + if (error) { + if (error != EAGAIN) { + VLOG_DBG_RL(&rl, "%s: accept: %s", + pstream_get_name(ps->pstream), strerror(error)); } - return retval; - } - - retval = set_nonblocking(new_fd); - if (retval) { - close(new_fd); - return retval; + return error; } - return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len, - new_vconnp); + *new_vconnp = vconn_stream_new(stream, 0); + return 0; } static void -pstream_wait(struct pvconn *pvconn) +pvconn_pstream_wait(struct pvconn *pvconn) { - struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn); - poll_fd_wait(ps->fd, POLLIN); + struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); + pstream_wait(ps->pstream); } + +static int +count_fields(const char *s_) +{ + char *s, *field, *save_ptr; + int n = 0; + + save_ptr = NULL; + s = xstrdup(s_); + for (field = strtok_r(s, ":", &save_ptr); field != NULL; + field = strtok_r(NULL, ":", &save_ptr)) { + n++; + } + free(s); -static struct pvconn_class pstream_pvconn_class = { - "pstream", - NULL, - pstream_close, - pstream_accept, - pstream_wait -}; + return n; +} + +/* Stream-based vconns and pvconns. */ + +#define DEFINE_VCONN_STREAM_CLASS(NAME) \ + struct vconn_class NAME##_vconn_class = { \ + #NAME, \ + vconn_stream_open, \ + vconn_stream_close, \ + vconn_stream_connect, \ + vconn_stream_recv, \ + vconn_stream_send, \ + vconn_stream_run, \ + vconn_stream_run_wait, \ + vconn_stream_wait, \ + }; + +#define DEFINE_PVCONN_STREAM_CLASS(NAME) \ + struct pvconn_class NAME##_pvconn_class = { \ + #NAME, \ + pvconn_pstream_listen, \ + pvconn_pstream_close, \ + pvconn_pstream_accept, \ + pvconn_pstream_wait \ + }; + +static DEFINE_VCONN_STREAM_CLASS(stream); +static DEFINE_PVCONN_STREAM_CLASS(pstream); + +DEFINE_VCONN_STREAM_CLASS(tcp); +DEFINE_PVCONN_STREAM_CLASS(ptcp); + +DEFINE_VCONN_STREAM_CLASS(unix); +DEFINE_PVCONN_STREAM_CLASS(punix); + +#ifdef HAVE_OPENSSL +DEFINE_VCONN_STREAM_CLASS(ssl); +DEFINE_PVCONN_STREAM_CLASS(pssl); +#endif