Have rconn and vconn export information about IPs and ports
[sliver-openvswitch.git] / lib / vconn-stream.c
1 /*
2  * Copyright (c) 2008, 2009 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "vconn-stream.h"
19 #include <assert.h>
20 #include <errno.h>
21 #include <poll.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26 #include "leak-checker.h"
27 #include "ofpbuf.h"
28 #include "openflow/openflow.h"
29 #include "poll-loop.h"
30 #include "socket-util.h"
31 #include "util.h"
32 #include "vconn-provider.h"
33 #include "vconn.h"
34
35 #include "vlog.h"
36 #define THIS_MODULE VLM_vconn_stream
37
38 /* Active stream socket vconn. */
39
40 struct stream_vconn
41 {
42     struct vconn vconn;
43     int fd;
44     void (*connect_success_cb)(struct vconn *, int);
45     struct ofpbuf *rxbuf;
46     struct ofpbuf *txbuf;
47     struct poll_waiter *tx_waiter;
48 };
49
50 static struct vconn_class stream_vconn_class;
51
52 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
53
54 static void stream_clear_txbuf(struct stream_vconn *);
55
56 int
57 new_stream_vconn(const char *name, int fd, int connect_status,
58                  uint32_t remote_ip, uint16_t remote_port, 
59                  bool reconnectable, 
60                  connect_success_cb_func *connect_success_cb,
61                  struct vconn **vconnp)
62 {
63     struct stream_vconn *s;
64
65     s = xmalloc(sizeof *s);
66     vconn_init(&s->vconn, &stream_vconn_class, connect_status, remote_ip, 
67                remote_port, name, reconnectable);
68     s->fd = fd;
69     s->txbuf = NULL;
70     s->tx_waiter = NULL;
71     s->rxbuf = NULL;
72     s->connect_success_cb = connect_success_cb;
73     *vconnp = &s->vconn;
74     return 0;
75 }
76
77 static struct stream_vconn *
78 stream_vconn_cast(struct vconn *vconn)
79 {
80     vconn_assert_class(vconn, &stream_vconn_class);
81     return CONTAINER_OF(vconn, struct stream_vconn, vconn);
82 }
83
84 static void
85 stream_close(struct vconn *vconn)
86 {
87     struct stream_vconn *s = stream_vconn_cast(vconn);
88     poll_cancel(s->tx_waiter);
89     stream_clear_txbuf(s);
90     ofpbuf_delete(s->rxbuf);
91     close(s->fd);
92     free(s);
93 }
94
95 static int
96 stream_connect(struct vconn *vconn)
97 {
98     struct stream_vconn *s = stream_vconn_cast(vconn);
99     int retval = check_connection_completion(s->fd);
100     if (retval) {
101         return retval;
102     }
103     if (s->connect_success_cb) {
104         s->connect_success_cb(vconn, s->fd);
105     }
106     return 0;
107 }
108
109 static int
110 stream_recv(struct vconn *vconn, struct ofpbuf **bufferp)
111 {
112     struct stream_vconn *s = stream_vconn_cast(vconn);
113     struct ofpbuf *rx;
114     size_t want_bytes;
115     ssize_t retval;
116
117     if (s->rxbuf == NULL) {
118         s->rxbuf = ofpbuf_new(1564);
119     }
120     rx = s->rxbuf;
121
122 again:
123     if (sizeof(struct ofp_header) > rx->size) {
124         want_bytes = sizeof(struct ofp_header) - rx->size;
125     } else {
126         struct ofp_header *oh = rx->data;
127         size_t length = ntohs(oh->length);
128         if (length < sizeof(struct ofp_header)) {
129             VLOG_ERR_RL(&rl, "received too-short ofp_header (%zu bytes)",
130                         length);
131             return EPROTO;
132         }
133         want_bytes = length - rx->size;
134         if (!want_bytes) {
135             *bufferp = rx;
136             s->rxbuf = NULL;
137             return 0;
138         }
139     }
140     ofpbuf_prealloc_tailroom(rx, want_bytes);
141
142     retval = read(s->fd, ofpbuf_tail(rx), want_bytes);
143     if (retval > 0) {
144         rx->size += retval;
145         if (retval == want_bytes) {
146             if (rx->size > sizeof(struct ofp_header)) {
147                 *bufferp = rx;
148                 s->rxbuf = NULL;
149                 return 0;
150             } else {
151                 goto again;
152             }
153         }
154         return EAGAIN;
155     } else if (retval == 0) {
156         if (rx->size) {
157             VLOG_ERR_RL(&rl, "connection dropped mid-packet");
158             return EPROTO;
159         } else {
160             return EOF;
161         }
162     } else {
163         return errno;
164     }
165 }
166
167 static void
168 stream_clear_txbuf(struct stream_vconn *s)
169 {
170     ofpbuf_delete(s->txbuf);
171     s->txbuf = NULL;
172     s->tx_waiter = NULL;
173 }
174
175 static void
176 stream_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_)
177 {
178     struct vconn *vconn = vconn_;
179     struct stream_vconn *s = stream_vconn_cast(vconn);
180     ssize_t n = write(s->fd, s->txbuf->data, s->txbuf->size);
181     if (n < 0) {
182         if (errno != EAGAIN) {
183             VLOG_ERR_RL(&rl, "send: %s", strerror(errno));
184             stream_clear_txbuf(s);
185             return;
186         }
187     } else if (n > 0) {
188         ofpbuf_pull(s->txbuf, n);
189         if (!s->txbuf->size) {
190             stream_clear_txbuf(s);
191             return;
192         }
193     }
194     s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn);
195 }
196
197 static int
198 stream_send(struct vconn *vconn, struct ofpbuf *buffer)
199 {
200     struct stream_vconn *s = stream_vconn_cast(vconn);
201     ssize_t retval;
202
203     if (s->txbuf) {
204         return EAGAIN;
205     }
206
207     retval = write(s->fd, buffer->data, buffer->size);
208     if (retval == buffer->size) {
209         ofpbuf_delete(buffer);
210         return 0;
211     } else if (retval >= 0 || errno == EAGAIN) {
212         leak_checker_claim(buffer);
213         s->txbuf = buffer;
214         if (retval > 0) {
215             ofpbuf_pull(buffer, retval);
216         }
217         s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn);
218         return 0;
219     } else {
220         return errno;
221     }
222 }
223
224 static void
225 stream_wait(struct vconn *vconn, enum vconn_wait_type wait)
226 {
227     struct stream_vconn *s = stream_vconn_cast(vconn);
228     switch (wait) {
229     case WAIT_CONNECT:
230         poll_fd_wait(s->fd, POLLOUT);
231         break;
232
233     case WAIT_SEND:
234         if (!s->txbuf) {
235             poll_fd_wait(s->fd, POLLOUT);
236         } else {
237             /* Nothing to do: need to drain txbuf first. */
238         }
239         break;
240
241     case WAIT_RECV:
242         poll_fd_wait(s->fd, POLLIN);
243         break;
244
245     default:
246         NOT_REACHED();
247     }
248 }
249
250 static struct vconn_class stream_vconn_class = {
251     "stream",                   /* name */
252     NULL,                       /* open */
253     stream_close,               /* close */
254     stream_connect,             /* connect */
255     stream_recv,                /* recv */
256     stream_send,                /* send */
257     stream_wait,                /* wait */
258 };
259 \f
260 /* Passive stream socket vconn. */
261
262 struct pstream_pvconn
263 {
264     struct pvconn pvconn;
265     int fd;
266     int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
267                      struct vconn **);
268 };
269
270 static struct pvconn_class pstream_pvconn_class;
271
272 static struct pstream_pvconn *
273 pstream_pvconn_cast(struct pvconn *pvconn)
274 {
275     pvconn_assert_class(pvconn, &pstream_pvconn_class);
276     return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn);
277 }
278
279 int
280 new_pstream_pvconn(const char *name, int fd,
281                   int (*accept_cb)(int fd, const struct sockaddr *,
282                                    size_t sa_len, struct vconn **),
283                   struct pvconn **pvconnp)
284 {
285     struct pstream_pvconn *ps;
286     int retval;
287
288     retval = set_nonblocking(fd);
289     if (retval) {
290         close(fd);
291         return retval;
292     }
293
294     if (listen(fd, 10) < 0) {
295         int error = errno;
296         VLOG_ERR("%s: listen: %s", name, strerror(error));
297         close(fd);
298         return error;
299     }
300
301     ps = xmalloc(sizeof *ps);
302     pvconn_init(&ps->pvconn, &pstream_pvconn_class, name);
303     ps->fd = fd;
304     ps->accept_cb = accept_cb;
305     *pvconnp = &ps->pvconn;
306     return 0;
307 }
308
309 static void
310 pstream_close(struct pvconn *pvconn)
311 {
312     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
313     close(ps->fd);
314     free(ps);
315 }
316
317 static int
318 pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp)
319 {
320     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
321     struct sockaddr_storage ss;
322     socklen_t ss_len = sizeof ss;
323     int new_fd;
324     int retval;
325
326     new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len);
327     if (new_fd < 0) {
328         int retval = errno;
329         if (retval != EAGAIN) {
330             VLOG_DBG_RL(&rl, "accept: %s", strerror(retval));
331         }
332         return retval;
333     }
334
335     retval = set_nonblocking(new_fd);
336     if (retval) {
337         close(new_fd);
338         return retval;
339     }
340
341     return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len,
342                          new_vconnp);
343 }
344
345 static void
346 pstream_wait(struct pvconn *pvconn)
347 {
348     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
349     poll_fd_wait(ps->fd, POLLIN);
350 }
351
352 static struct pvconn_class pstream_pvconn_class = {
353     "pstream",
354     NULL,
355     pstream_close,
356     pstream_accept,
357     pstream_wait
358 };