Merge citrix branch into master.
[sliver-openvswitch.git] / lib / vconn-stream.c
1 /*
2  * Copyright (c) 2008, 2009 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "vconn-stream.h"
19 #include <assert.h>
20 #include <errno.h>
21 #include <poll.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26 #include "leak-checker.h"
27 #include "ofpbuf.h"
28 #include "openflow/openflow.h"
29 #include "poll-loop.h"
30 #include "socket-util.h"
31 #include "util.h"
32 #include "vconn-provider.h"
33 #include "vconn.h"
34
35 #include "vlog.h"
36 #define THIS_MODULE VLM_vconn_stream
37
38 /* Active stream socket vconn. */
39
40 struct stream_vconn
41 {
42     struct vconn vconn;
43     int fd;
44     struct ofpbuf *rxbuf;
45     struct ofpbuf *txbuf;
46     struct poll_waiter *tx_waiter;
47 };
48
49 static struct vconn_class stream_vconn_class;
50
51 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
52
53 static void stream_clear_txbuf(struct stream_vconn *);
54
55 int
56 new_stream_vconn(const char *name, int fd, int connect_status,
57                  bool reconnectable, struct vconn **vconnp)
58 {
59     struct stream_vconn *s;
60
61     s = xmalloc(sizeof *s);
62     vconn_init(&s->vconn, &stream_vconn_class, connect_status,
63                name, reconnectable);
64     s->fd = fd;
65     s->txbuf = NULL;
66     s->tx_waiter = NULL;
67     s->rxbuf = NULL;
68     *vconnp = &s->vconn;
69     return 0;
70 }
71
72 static struct stream_vconn *
73 stream_vconn_cast(struct vconn *vconn)
74 {
75     vconn_assert_class(vconn, &stream_vconn_class);
76     return CONTAINER_OF(vconn, struct stream_vconn, vconn);
77 }
78
79 static void
80 stream_close(struct vconn *vconn)
81 {
82     struct stream_vconn *s = stream_vconn_cast(vconn);
83     poll_cancel(s->tx_waiter);
84     stream_clear_txbuf(s);
85     ofpbuf_delete(s->rxbuf);
86     close(s->fd);
87     free(s);
88 }
89
90 static int
91 stream_connect(struct vconn *vconn)
92 {
93     struct stream_vconn *s = stream_vconn_cast(vconn);
94     return check_connection_completion(s->fd);
95 }
96
97 static int
98 stream_recv(struct vconn *vconn, struct ofpbuf **bufferp)
99 {
100     struct stream_vconn *s = stream_vconn_cast(vconn);
101     struct ofpbuf *rx;
102     size_t want_bytes;
103     ssize_t retval;
104
105     if (s->rxbuf == NULL) {
106         s->rxbuf = ofpbuf_new(1564);
107     }
108     rx = s->rxbuf;
109
110 again:
111     if (sizeof(struct ofp_header) > rx->size) {
112         want_bytes = sizeof(struct ofp_header) - rx->size;
113     } else {
114         struct ofp_header *oh = rx->data;
115         size_t length = ntohs(oh->length);
116         if (length < sizeof(struct ofp_header)) {
117             VLOG_ERR_RL(&rl, "received too-short ofp_header (%zu bytes)",
118                         length);
119             return EPROTO;
120         }
121         want_bytes = length - rx->size;
122         if (!want_bytes) {
123             *bufferp = rx;
124             s->rxbuf = NULL;
125             return 0;
126         }
127     }
128     ofpbuf_prealloc_tailroom(rx, want_bytes);
129
130     retval = read(s->fd, ofpbuf_tail(rx), want_bytes);
131     if (retval > 0) {
132         rx->size += retval;
133         if (retval == want_bytes) {
134             if (rx->size > sizeof(struct ofp_header)) {
135                 *bufferp = rx;
136                 s->rxbuf = NULL;
137                 return 0;
138             } else {
139                 goto again;
140             }
141         }
142         return EAGAIN;
143     } else if (retval == 0) {
144         if (rx->size) {
145             VLOG_ERR_RL(&rl, "connection dropped mid-packet");
146             return EPROTO;
147         } else {
148             return EOF;
149         }
150     } else {
151         return errno;
152     }
153 }
154
155 static void
156 stream_clear_txbuf(struct stream_vconn *s)
157 {
158     ofpbuf_delete(s->txbuf);
159     s->txbuf = NULL;
160     s->tx_waiter = NULL;
161 }
162
163 static void
164 stream_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_)
165 {
166     struct vconn *vconn = vconn_;
167     struct stream_vconn *s = stream_vconn_cast(vconn);
168     ssize_t n = write(s->fd, s->txbuf->data, s->txbuf->size);
169     if (n < 0) {
170         if (errno != EAGAIN) {
171             VLOG_ERR_RL(&rl, "send: %s", strerror(errno));
172             stream_clear_txbuf(s);
173             return;
174         }
175     } else if (n > 0) {
176         ofpbuf_pull(s->txbuf, n);
177         if (!s->txbuf->size) {
178             stream_clear_txbuf(s);
179             return;
180         }
181     }
182     s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn);
183 }
184
185 static int
186 stream_send(struct vconn *vconn, struct ofpbuf *buffer)
187 {
188     struct stream_vconn *s = stream_vconn_cast(vconn);
189     ssize_t retval;
190
191     if (s->txbuf) {
192         return EAGAIN;
193     }
194
195     retval = write(s->fd, buffer->data, buffer->size);
196     if (retval == buffer->size) {
197         ofpbuf_delete(buffer);
198         return 0;
199     } else if (retval >= 0 || errno == EAGAIN) {
200         leak_checker_claim(buffer);
201         s->txbuf = buffer;
202         if (retval > 0) {
203             ofpbuf_pull(buffer, retval);
204         }
205         s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn);
206         return 0;
207     } else {
208         return errno;
209     }
210 }
211
212 static void
213 stream_wait(struct vconn *vconn, enum vconn_wait_type wait)
214 {
215     struct stream_vconn *s = stream_vconn_cast(vconn);
216     switch (wait) {
217     case WAIT_CONNECT:
218         poll_fd_wait(s->fd, POLLOUT);
219         break;
220
221     case WAIT_SEND:
222         if (!s->txbuf) {
223             poll_fd_wait(s->fd, POLLOUT);
224         } else {
225             /* Nothing to do: need to drain txbuf first. */
226         }
227         break;
228
229     case WAIT_RECV:
230         poll_fd_wait(s->fd, POLLIN);
231         break;
232
233     default:
234         NOT_REACHED();
235     }
236 }
237
238 static struct vconn_class stream_vconn_class = {
239     "stream",                   /* name */
240     NULL,                       /* open */
241     stream_close,               /* close */
242     stream_connect,             /* connect */
243     stream_recv,                /* recv */
244     stream_send,                /* send */
245     stream_wait,                /* wait */
246 };
247 \f
248 /* Passive stream socket vconn. */
249
250 struct pstream_pvconn
251 {
252     struct pvconn pvconn;
253     int fd;
254     int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
255                      struct vconn **);
256 };
257
258 static struct pvconn_class pstream_pvconn_class;
259
260 static struct pstream_pvconn *
261 pstream_pvconn_cast(struct pvconn *pvconn)
262 {
263     pvconn_assert_class(pvconn, &pstream_pvconn_class);
264     return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn);
265 }
266
267 int
268 new_pstream_pvconn(const char *name, int fd,
269                   int (*accept_cb)(int fd, const struct sockaddr *,
270                                    size_t sa_len, struct vconn **),
271                   struct pvconn **pvconnp)
272 {
273     struct pstream_pvconn *ps = xmalloc(sizeof *ps);
274     pvconn_init(&ps->pvconn, &pstream_pvconn_class, name);
275     ps->fd = fd;
276     ps->accept_cb = accept_cb;
277     *pvconnp = &ps->pvconn;
278     return 0;
279 }
280
281 static void
282 pstream_close(struct pvconn *pvconn)
283 {
284     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
285     close(ps->fd);
286     free(ps);
287 }
288
289 static int
290 pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp)
291 {
292     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
293     struct sockaddr_storage ss;
294     socklen_t ss_len = sizeof ss;
295     int new_fd;
296     int retval;
297
298     new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len);
299     if (new_fd < 0) {
300         int retval = errno;
301         if (retval != EAGAIN) {
302             VLOG_DBG_RL(&rl, "accept: %s", strerror(retval));
303         }
304         return retval;
305     }
306
307     retval = set_nonblocking(new_fd);
308     if (retval) {
309         close(new_fd);
310         return retval;
311     }
312
313     return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len,
314                          new_vconnp);
315 }
316
317 static void
318 pstream_wait(struct pvconn *pvconn)
319 {
320     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
321     poll_fd_wait(ps->fd, POLLIN);
322 }
323
324 static struct pvconn_class pstream_pvconn_class = {
325     "pstream",
326     NULL,
327     pstream_close,
328     pstream_accept,
329     pstream_wait
330 };