vconn: Remove unused "reconnectable" member from vconn.
[sliver-openvswitch.git] / lib / vconn-stream.c
1 /*
2  * Copyright (c) 2008, 2009 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "vconn-stream.h"
19 #include <assert.h>
20 #include <errno.h>
21 #include <poll.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26 #include "leak-checker.h"
27 #include "ofpbuf.h"
28 #include "openflow/openflow.h"
29 #include "poll-loop.h"
30 #include "socket-util.h"
31 #include "util.h"
32 #include "vconn-provider.h"
33 #include "vconn.h"
34
35 #include "vlog.h"
36 #define THIS_MODULE VLM_vconn_stream
37
38 /* Active stream socket vconn. */
39
40 struct stream_vconn
41 {
42     struct vconn vconn;
43     int fd;
44     struct ofpbuf *rxbuf;
45     struct ofpbuf *txbuf;
46     struct poll_waiter *tx_waiter;
47 };
48
49 static struct vconn_class stream_vconn_class;
50
51 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
52
53 static void stream_clear_txbuf(struct stream_vconn *);
54
55 int
56 new_stream_vconn(const char *name, int fd, int connect_status,
57                  struct vconn **vconnp)
58 {
59     struct stream_vconn *s;
60
61     s = xmalloc(sizeof *s);
62     vconn_init(&s->vconn, &stream_vconn_class, connect_status, name);
63     s->fd = fd;
64     s->txbuf = NULL;
65     s->tx_waiter = NULL;
66     s->rxbuf = NULL;
67     *vconnp = &s->vconn;
68     return 0;
69 }
70
71 static struct stream_vconn *
72 stream_vconn_cast(struct vconn *vconn)
73 {
74     vconn_assert_class(vconn, &stream_vconn_class);
75     return CONTAINER_OF(vconn, struct stream_vconn, vconn);
76 }
77
78 static void
79 stream_close(struct vconn *vconn)
80 {
81     struct stream_vconn *s = stream_vconn_cast(vconn);
82     poll_cancel(s->tx_waiter);
83     stream_clear_txbuf(s);
84     ofpbuf_delete(s->rxbuf);
85     close(s->fd);
86     free(s);
87 }
88
89 static int
90 stream_connect(struct vconn *vconn)
91 {
92     struct stream_vconn *s = stream_vconn_cast(vconn);
93     return check_connection_completion(s->fd);
94 }
95
96 static int
97 stream_recv(struct vconn *vconn, struct ofpbuf **bufferp)
98 {
99     struct stream_vconn *s = stream_vconn_cast(vconn);
100     struct ofpbuf *rx;
101     size_t want_bytes;
102     ssize_t retval;
103
104     if (s->rxbuf == NULL) {
105         s->rxbuf = ofpbuf_new(1564);
106     }
107     rx = s->rxbuf;
108
109 again:
110     if (sizeof(struct ofp_header) > rx->size) {
111         want_bytes = sizeof(struct ofp_header) - rx->size;
112     } else {
113         struct ofp_header *oh = rx->data;
114         size_t length = ntohs(oh->length);
115         if (length < sizeof(struct ofp_header)) {
116             VLOG_ERR_RL(&rl, "received too-short ofp_header (%zu bytes)",
117                         length);
118             return EPROTO;
119         }
120         want_bytes = length - rx->size;
121         if (!want_bytes) {
122             *bufferp = rx;
123             s->rxbuf = NULL;
124             return 0;
125         }
126     }
127     ofpbuf_prealloc_tailroom(rx, want_bytes);
128
129     retval = read(s->fd, ofpbuf_tail(rx), want_bytes);
130     if (retval > 0) {
131         rx->size += retval;
132         if (retval == want_bytes) {
133             if (rx->size > sizeof(struct ofp_header)) {
134                 *bufferp = rx;
135                 s->rxbuf = NULL;
136                 return 0;
137             } else {
138                 goto again;
139             }
140         }
141         return EAGAIN;
142     } else if (retval == 0) {
143         if (rx->size) {
144             VLOG_ERR_RL(&rl, "connection dropped mid-packet");
145             return EPROTO;
146         } else {
147             return EOF;
148         }
149     } else {
150         return errno;
151     }
152 }
153
154 static void
155 stream_clear_txbuf(struct stream_vconn *s)
156 {
157     ofpbuf_delete(s->txbuf);
158     s->txbuf = NULL;
159     s->tx_waiter = NULL;
160 }
161
162 static void
163 stream_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_)
164 {
165     struct vconn *vconn = vconn_;
166     struct stream_vconn *s = stream_vconn_cast(vconn);
167     ssize_t n = write(s->fd, s->txbuf->data, s->txbuf->size);
168     if (n < 0) {
169         if (errno != EAGAIN) {
170             VLOG_ERR_RL(&rl, "send: %s", strerror(errno));
171             stream_clear_txbuf(s);
172             return;
173         }
174     } else if (n > 0) {
175         ofpbuf_pull(s->txbuf, n);
176         if (!s->txbuf->size) {
177             stream_clear_txbuf(s);
178             return;
179         }
180     }
181     s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn);
182 }
183
184 static int
185 stream_send(struct vconn *vconn, struct ofpbuf *buffer)
186 {
187     struct stream_vconn *s = stream_vconn_cast(vconn);
188     ssize_t retval;
189
190     if (s->txbuf) {
191         return EAGAIN;
192     }
193
194     retval = write(s->fd, buffer->data, buffer->size);
195     if (retval == buffer->size) {
196         ofpbuf_delete(buffer);
197         return 0;
198     } else if (retval >= 0 || errno == EAGAIN) {
199         leak_checker_claim(buffer);
200         s->txbuf = buffer;
201         if (retval > 0) {
202             ofpbuf_pull(buffer, retval);
203         }
204         s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn);
205         return 0;
206     } else {
207         return errno;
208     }
209 }
210
211 static void
212 stream_wait(struct vconn *vconn, enum vconn_wait_type wait)
213 {
214     struct stream_vconn *s = stream_vconn_cast(vconn);
215     switch (wait) {
216     case WAIT_CONNECT:
217         poll_fd_wait(s->fd, POLLOUT);
218         break;
219
220     case WAIT_SEND:
221         if (!s->txbuf) {
222             poll_fd_wait(s->fd, POLLOUT);
223         } else {
224             /* Nothing to do: need to drain txbuf first. */
225         }
226         break;
227
228     case WAIT_RECV:
229         poll_fd_wait(s->fd, POLLIN);
230         break;
231
232     default:
233         NOT_REACHED();
234     }
235 }
236
237 static struct vconn_class stream_vconn_class = {
238     "stream",                   /* name */
239     NULL,                       /* open */
240     stream_close,               /* close */
241     stream_connect,             /* connect */
242     stream_recv,                /* recv */
243     stream_send,                /* send */
244     stream_wait,                /* wait */
245 };
246 \f
247 /* Passive stream socket vconn. */
248
249 struct pstream_pvconn
250 {
251     struct pvconn pvconn;
252     int fd;
253     int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
254                      struct vconn **);
255 };
256
257 static struct pvconn_class pstream_pvconn_class;
258
259 static struct pstream_pvconn *
260 pstream_pvconn_cast(struct pvconn *pvconn)
261 {
262     pvconn_assert_class(pvconn, &pstream_pvconn_class);
263     return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn);
264 }
265
266 int
267 new_pstream_pvconn(const char *name, int fd,
268                   int (*accept_cb)(int fd, const struct sockaddr *,
269                                    size_t sa_len, struct vconn **),
270                   struct pvconn **pvconnp)
271 {
272     struct pstream_pvconn *ps = xmalloc(sizeof *ps);
273     pvconn_init(&ps->pvconn, &pstream_pvconn_class, name);
274     ps->fd = fd;
275     ps->accept_cb = accept_cb;
276     *pvconnp = &ps->pvconn;
277     return 0;
278 }
279
280 static void
281 pstream_close(struct pvconn *pvconn)
282 {
283     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
284     close(ps->fd);
285     free(ps);
286 }
287
288 static int
289 pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp)
290 {
291     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
292     struct sockaddr_storage ss;
293     socklen_t ss_len = sizeof ss;
294     int new_fd;
295     int retval;
296
297     new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len);
298     if (new_fd < 0) {
299         int retval = errno;
300         if (retval != EAGAIN) {
301             VLOG_DBG_RL(&rl, "accept: %s", strerror(retval));
302         }
303         return retval;
304     }
305
306     retval = set_nonblocking(new_fd);
307     if (retval) {
308         close(new_fd);
309         return retval;
310     }
311
312     return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len,
313                          new_vconnp);
314 }
315
316 static void
317 pstream_wait(struct pvconn *pvconn)
318 {
319     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
320     poll_fd_wait(ps->fd, POLLIN);
321 }
322
323 static struct pvconn_class pstream_pvconn_class = {
324     "pstream",
325     NULL,
326     pstream_close,
327     pstream_accept,
328     pstream_wait
329 };