/* * Copyright (c) 2008, 2009, 2010 Nicira Networks. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include #include "fatal-signal.h" #include "leak-checker.h" #include "ofpbuf.h" #include "openflow/openflow.h" #include "poll-loop.h" #include "socket-util.h" #include "stream.h" #include "util.h" #include "vconn-provider.h" #include "vconn.h" #include "vlog.h" #define THIS_MODULE VLM_vconn_stream /* Active stream socket vconn. */ struct vconn_stream { struct vconn vconn; struct stream *stream; struct ofpbuf *rxbuf; struct ofpbuf *txbuf; }; static struct vconn_class stream_vconn_class; static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25); static void vconn_stream_clear_txbuf(struct vconn_stream *); static int count_fields(const char *); static struct vconn * vconn_stream_new(struct stream *stream, int connect_status) { struct vconn_stream *s; s = xmalloc(sizeof *s); vconn_init(&s->vconn, &stream_vconn_class, connect_status, stream_get_name(stream)); s->stream = stream; s->txbuf = NULL; s->rxbuf = NULL; s->vconn.remote_ip = stream_get_remote_ip(stream); s->vconn.remote_port = stream_get_remote_port(stream); s->vconn.local_ip = stream_get_local_ip(stream); s->vconn.local_port = stream_get_local_port(stream); return &s->vconn; } /* Creates a new vconn that will send and receive data on a stream named 'name' * and stores a pointer to the vconn in '*vconnp'. * * Returns 0 if successful, otherwise a positive errno value. */ static int vconn_stream_open(const char *name_, char *suffix OVS_UNUSED, struct vconn **vconnp) { struct stream *stream; char *name; int error; if (!strncmp(name_, "tcp:", 4) && count_fields(name_) < 3) { name = xasprintf("%s:%d", name_, OFP_TCP_PORT); } else if (!strncmp(name_, "ssl:", 4) && count_fields(name_) < 3) { name = xasprintf("%s:%d", name_, OFP_SSL_PORT); } else { name = xstrdup(name_); } error = stream_open(name, &stream); free(name); if (error && error != EAGAIN) { return error; } *vconnp = vconn_stream_new(stream, error); return 0; } static struct vconn_stream * vconn_stream_cast(struct vconn *vconn) { return CONTAINER_OF(vconn, struct vconn_stream, vconn); } static void vconn_stream_close(struct vconn *vconn) { struct vconn_stream *s = vconn_stream_cast(vconn); stream_close(s->stream); vconn_stream_clear_txbuf(s); ofpbuf_delete(s->rxbuf); free(s); } static int vconn_stream_connect(struct vconn *vconn) { struct vconn_stream *s = vconn_stream_cast(vconn); return stream_connect(s->stream); } static int vconn_stream_recv(struct vconn *vconn, struct ofpbuf **bufferp) { struct vconn_stream *s = vconn_stream_cast(vconn); struct ofpbuf *rx; size_t want_bytes; ssize_t retval; if (s->rxbuf == NULL) { s->rxbuf = ofpbuf_new(1564); } rx = s->rxbuf; again: if (sizeof(struct ofp_header) > rx->size) { want_bytes = sizeof(struct ofp_header) - rx->size; } else { struct ofp_header *oh = rx->data; size_t length = ntohs(oh->length); if (length < sizeof(struct ofp_header)) { VLOG_ERR_RL(&rl, "received too-short ofp_header (%zu bytes)", length); return EPROTO; } want_bytes = length - rx->size; if (!want_bytes) { *bufferp = rx; s->rxbuf = NULL; return 0; } } ofpbuf_prealloc_tailroom(rx, want_bytes); retval = stream_recv(s->stream, ofpbuf_tail(rx), want_bytes); if (retval > 0) { rx->size += retval; if (retval == want_bytes) { if (rx->size > sizeof(struct ofp_header)) { *bufferp = rx; s->rxbuf = NULL; return 0; } else { goto again; } } return EAGAIN; } else if (retval == 0) { if (rx->size) { VLOG_ERR_RL(&rl, "connection dropped mid-packet"); return EPROTO; } else { return EOF; } } else { return -retval; } } static void vconn_stream_clear_txbuf(struct vconn_stream *s) { ofpbuf_delete(s->txbuf); s->txbuf = NULL; } static int vconn_stream_send(struct vconn *vconn, struct ofpbuf *buffer) { struct vconn_stream *s = vconn_stream_cast(vconn); ssize_t retval; if (s->txbuf) { return EAGAIN; } retval = stream_send(s->stream, buffer->data, buffer->size); if (retval == buffer->size) { ofpbuf_delete(buffer); return 0; } else if (retval >= 0 || retval == -EAGAIN) { leak_checker_claim(buffer); s->txbuf = buffer; if (retval > 0) { ofpbuf_pull(buffer, retval); } return 0; } else { return -retval; } } static void vconn_stream_run(struct vconn *vconn) { struct vconn_stream *s = vconn_stream_cast(vconn); ssize_t retval; if (!s->txbuf) { return; } retval = stream_send(s->stream, s->txbuf->data, s->txbuf->size); if (retval < 0) { if (retval != -EAGAIN) { VLOG_ERR_RL(&rl, "send: %s", strerror(-retval)); vconn_stream_clear_txbuf(s); return; } } else if (retval > 0) { ofpbuf_pull(s->txbuf, retval); if (!s->txbuf->size) { vconn_stream_clear_txbuf(s); return; } } } static void vconn_stream_run_wait(struct vconn *vconn) { struct vconn_stream *s = vconn_stream_cast(vconn); if (s->txbuf) { stream_send_wait(s->stream); } } static void vconn_stream_wait(struct vconn *vconn, enum vconn_wait_type wait) { struct vconn_stream *s = vconn_stream_cast(vconn); switch (wait) { case WAIT_CONNECT: stream_connect_wait(s->stream); break; case WAIT_SEND: if (!s->txbuf) { stream_send_wait(s->stream); } else { /* Nothing to do: need to drain txbuf first. * vconn_stream_run_wait() will arrange to wake up when there room * to send data, so there's no point in calling poll_fd_wait() * redundantly here. */ } break; case WAIT_RECV: stream_recv_wait(s->stream); break; default: NOT_REACHED(); } } /* Passive stream socket vconn. */ struct pvconn_pstream { struct pvconn pvconn; struct pstream *pstream; }; static struct pvconn_class pstream_pvconn_class; static struct pvconn_pstream * pvconn_pstream_cast(struct pvconn *pvconn) { return CONTAINER_OF(pvconn, struct pvconn_pstream, pvconn); } /* Creates a new pvconn named 'name' that will accept new connections using * pstream_accept() and stores a pointer to the pvconn in '*pvconnp'. * * Returns 0 if successful, otherwise a positive errno value. (The current * implementation never fails.) */ static int pvconn_pstream_listen(const char *name_, char *suffix OVS_UNUSED, struct pvconn **pvconnp) { struct pvconn_pstream *ps; struct pstream *pstream; char *name; int error; if (!strncmp(name_, "ptcp:", 5) && count_fields(name_) < 2) { name = xasprintf("%s%d", name_, OFP_TCP_PORT); } else if (!strncmp(name_, "pssl:", 5) && count_fields(name_) < 2) { name = xasprintf("%s%d", name_, OFP_SSL_PORT); } else { name = xstrdup(name_); } error = pstream_open(name, &pstream); free(name); if (error) { return error; } ps = xmalloc(sizeof *ps); pvconn_init(&ps->pvconn, &pstream_pvconn_class, name_); ps->pstream = pstream; *pvconnp = &ps->pvconn; return 0; } static void pvconn_pstream_close(struct pvconn *pvconn) { struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); pstream_close(ps->pstream); free(ps); } static int pvconn_pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp) { struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); struct stream *stream; int error; error = pstream_accept(ps->pstream, &stream); if (error) { if (error != EAGAIN) { VLOG_DBG_RL(&rl, "%s: accept: %s", pstream_get_name(ps->pstream), strerror(error)); } return error; } *new_vconnp = vconn_stream_new(stream, 0); return 0; } static void pvconn_pstream_wait(struct pvconn *pvconn) { struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); pstream_wait(ps->pstream); } static int count_fields(const char *s_) { char *s, *field, *save_ptr; int n = 0; save_ptr = NULL; s = xstrdup(s_); for (field = strtok_r(s, ":", &save_ptr); field != NULL; field = strtok_r(NULL, ":", &save_ptr)) { n++; } free(s); return n; } /* Stream-based vconns and pvconns. */ #define DEFINE_VCONN_STREAM_CLASS(NAME) \ struct vconn_class NAME##_vconn_class = { \ #NAME, \ vconn_stream_open, \ vconn_stream_close, \ vconn_stream_connect, \ vconn_stream_recv, \ vconn_stream_send, \ vconn_stream_run, \ vconn_stream_run_wait, \ vconn_stream_wait, \ }; #define DEFINE_PVCONN_STREAM_CLASS(NAME) \ struct pvconn_class NAME##_pvconn_class = { \ #NAME, \ pvconn_pstream_listen, \ pvconn_pstream_close, \ pvconn_pstream_accept, \ pvconn_pstream_wait \ }; static DEFINE_VCONN_STREAM_CLASS(stream); static DEFINE_PVCONN_STREAM_CLASS(pstream); DEFINE_VCONN_STREAM_CLASS(tcp); DEFINE_PVCONN_STREAM_CLASS(ptcp); DEFINE_VCONN_STREAM_CLASS(unix); DEFINE_PVCONN_STREAM_CLASS(punix); #ifdef HAVE_OPENSSL DEFINE_VCONN_STREAM_CLASS(ssl); DEFINE_PVCONN_STREAM_CLASS(pssl); #endif