d4f5de29ee3586a3e3ba6cf0290fd85f0ff2b1d6
[sliver-openvswitch.git] / lib / stream.c
1 /*
2  * Copyright (c) 2008, 2009, 2010 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "stream-provider.h"
19 #include <assert.h>
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <netinet/in.h>
23 #include <poll.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include "coverage.h"
27 #include "dynamic-string.h"
28 #include "flow.h"
29 #include "ofp-print.h"
30 #include "ofpbuf.h"
31 #include "openflow/nicira-ext.h"
32 #include "openflow/openflow.h"
33 #include "packets.h"
34 #include "poll-loop.h"
35 #include "random.h"
36 #include "util.h"
37
38 #define THIS_MODULE VLM_stream
39 #include "vlog.h"
40
41 /* State of an active stream.*/
42 enum stream_state {
43     SCS_CONNECTING,             /* Underlying stream is not connected. */
44     SCS_CONNECTED,              /* Connection established. */
45     SCS_DISCONNECTED            /* Connection failed or connection closed. */
46 };
47
48 static struct stream_class *stream_classes[] = {
49     &tcp_stream_class,
50     &unix_stream_class,
51 };
52
53 static struct pstream_class *pstream_classes[] = {
54     &ptcp_pstream_class,
55     &punix_pstream_class,
56 };
57
58 /* Check the validity of the stream class structures. */
59 static void
60 check_stream_classes(void)
61 {
62 #ifndef NDEBUG
63     size_t i;
64
65     for (i = 0; i < ARRAY_SIZE(stream_classes); i++) {
66         struct stream_class *class = stream_classes[i];
67         assert(class->name != NULL);
68         assert(class->open != NULL);
69         if (class->close || class->recv || class->send || class->run
70             || class->run_wait || class->wait) {
71             assert(class->close != NULL);
72             assert(class->recv != NULL);
73             assert(class->send != NULL);
74             assert(class->wait != NULL);
75         } else {
76             /* This class delegates to another one. */
77         }
78     }
79
80     for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) {
81         struct pstream_class *class = pstream_classes[i];
82         assert(class->name != NULL);
83         assert(class->listen != NULL);
84         if (class->close || class->accept || class->wait) {
85             assert(class->close != NULL);
86             assert(class->accept != NULL);
87             assert(class->wait != NULL);
88         } else {
89             /* This class delegates to another one. */
90         }
91     }
92 #endif
93 }
94
95 /* Prints information on active (if 'active') and passive (if 'passive')
96  * connection methods supported by the stream. */
97 void
98 stream_usage(const char *name, bool active, bool passive)
99 {
100     /* Really this should be implemented via callbacks into the stream
101      * providers, but that seems too heavy-weight to bother with at the
102      * moment. */
103
104     printf("\n");
105     if (active) {
106         printf("Active %s connection methods:\n", name);
107         printf("  tcp:IP:PORT             "
108                "PORT at remote IP\n");
109         printf("  unix:FILE               "
110                "Unix domain socket named FILE\n");
111     }
112
113     if (passive) {
114         printf("Passive %s connection methods:\n", name);
115         printf("  ptcp:PORT[:IP]          "
116                "listen to TCP PORT on IP\n");
117         printf("  punix:FILE              "
118                "listen on Unix domain socket FILE\n");
119     }
120 }
121
122 /* Attempts to connect a stream to a remote peer.  'name' is a connection name
123  * in the form "TYPE:ARGS", where TYPE is an active stream class's name and
124  * ARGS are stream class-specific.
125  *
126  * Returns 0 if successful, otherwise a positive errno value.  If successful,
127  * stores a pointer to the new connection in '*streamp', otherwise a null
128  * pointer.  */
129 int
130 stream_open(const char *name, struct stream **streamp)
131 {
132     size_t prefix_len;
133     size_t i;
134
135     COVERAGE_INC(stream_open);
136     check_stream_classes();
137
138     *streamp = NULL;
139     prefix_len = strcspn(name, ":");
140     if (prefix_len == strlen(name)) {
141         return EAFNOSUPPORT;
142     }
143     for (i = 0; i < ARRAY_SIZE(stream_classes); i++) {
144         struct stream_class *class = stream_classes[i];
145         if (strlen(class->name) == prefix_len
146             && !memcmp(class->name, name, prefix_len)) {
147             struct stream *stream;
148             char *suffix_copy = xstrdup(name + prefix_len + 1);
149             int retval = class->open(name, suffix_copy, &stream);
150             free(suffix_copy);
151             if (!retval) {
152                 assert(stream->state != SCS_CONNECTING
153                        || stream->class->connect);
154                 *streamp = stream;
155             }
156             return retval;
157         }
158     }
159     return EAFNOSUPPORT;
160 }
161
162 int
163 stream_open_block(const char *name, struct stream **streamp)
164 {
165     struct stream *stream;
166     int error;
167
168     error = stream_open(name, &stream);
169     while (error == EAGAIN) {
170         stream_run(stream);
171         stream_run_wait(stream);
172         stream_connect_wait(stream);
173         poll_block();
174         error = stream_connect(stream);
175         assert(error != EINPROGRESS);
176     }
177     if (error) {
178         stream_close(stream);
179         *streamp = NULL;
180     } else {
181         *streamp = stream;
182     }
183     return error;
184 }
185
186 /* Closes 'stream'. */
187 void
188 stream_close(struct stream *stream)
189 {
190     if (stream != NULL) {
191         char *name = stream->name;
192         (stream->class->close)(stream);
193         free(name);
194     }
195 }
196
197 /* Returns the name of 'stream', that is, the string passed to
198  * stream_open(). */
199 const char *
200 stream_get_name(const struct stream *stream)
201 {
202     return stream ? stream->name : "(null)";
203 }
204
205 /* Returns the IP address of the peer, or 0 if the peer is not connected over
206  * an IP-based protocol or if its IP address is not yet known. */
207 uint32_t
208 stream_get_remote_ip(const struct stream *stream)
209 {
210     return stream->remote_ip;
211 }
212
213 /* Returns the transport port of the peer, or 0 if the connection does not
214  * contain a port or if the port is not yet known. */
215 uint16_t
216 stream_get_remote_port(const struct stream *stream)
217 {
218     return stream->remote_port;
219 }
220
221 /* Returns the IP address used to connect to the peer, or 0 if the connection
222  * is not an IP-based protocol or if its IP address is not yet known. */
223 uint32_t
224 stream_get_local_ip(const struct stream *stream)
225 {
226     return stream->local_ip;
227 }
228
229 /* Returns the transport port used to connect to the peer, or 0 if the
230  * connection does not contain a port or if the port is not yet known. */
231 uint16_t
232 stream_get_local_port(const struct stream *stream)
233 {
234     return stream->local_port;
235 }
236
237 static void
238 scs_connecting(struct stream *stream)
239 {
240     int retval = (stream->class->connect)(stream);
241     assert(retval != EINPROGRESS);
242     if (!retval) {
243         stream->state = SCS_CONNECTED;
244     } else if (retval != EAGAIN) {
245         stream->state = SCS_DISCONNECTED;
246         stream->error = retval;
247     }
248 }
249
250 /* Tries to complete the connection on 'stream', which must be an active
251  * stream.  If 'stream''s connection is complete, returns 0 if the connection
252  * was successful or a positive errno value if it failed.  If the
253  * connection is still in progress, returns EAGAIN. */
254 int
255 stream_connect(struct stream *stream)
256 {
257     enum stream_state last_state;
258
259     do {
260         last_state = stream->state;
261         switch (stream->state) {
262         case SCS_CONNECTING:
263             scs_connecting(stream);
264             break;
265
266         case SCS_CONNECTED:
267             return 0;
268
269         case SCS_DISCONNECTED:
270             return stream->error;
271
272         default:
273             NOT_REACHED();
274         }
275     } while (stream->state != last_state);
276
277     return EAGAIN;
278 }
279
280 /* Tries to receive up to 'n' bytes from 'stream' into 'buffer', and returns:
281  *
282  *     - If successful, the number of bytes received (between 1 and 'n').
283  *
284  *     - On error, a negative errno value.
285  *
286  *     - 0, if the connection has been closed in the normal fashion, or if 'n'
287  *       is zero.
288  *
289  * The recv function will not block waiting for a packet to arrive.  If no
290  * data have been received, it returns -EAGAIN immediately. */
291 int
292 stream_recv(struct stream *stream, void *buffer, size_t n)
293 {
294     int retval = stream_connect(stream);
295     return (retval ? -retval
296             : n == 0 ? 0
297             : (stream->class->recv)(stream, buffer, n));
298 }
299
300 /* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns:
301  *
302  *     - If successful, the number of bytes sent (between 1 and 'n').  0 is
303  *       only a valid return value if 'n' is 0.
304  *
305  *     - On error, a negative errno value.
306  *
307  * The send function will not block.  If no bytes can be immediately accepted
308  * for transmission, it returns -EAGAIN immediately. */
309 int
310 stream_send(struct stream *stream, const void *buffer, size_t n)
311 {
312     int retval = stream_connect(stream);
313     return (retval ? -retval
314             : n == 0 ? 0
315             : (stream->class->send)(stream, buffer, n));
316 }
317
318 /* Allows 'stream' to perform maintenance activities, such as flushing
319  * output buffers. */
320 void
321 stream_run(struct stream *stream)
322 {
323     if (stream->class->run) {
324         (stream->class->run)(stream);
325     }
326 }
327
328 /* Arranges for the poll loop to wake up when 'stream' needs to perform
329  * maintenance activities. */
330 void
331 stream_run_wait(struct stream *stream)
332 {
333     if (stream->class->run_wait) {
334         (stream->class->run_wait)(stream);
335     }
336 }
337
338 /* Arranges for the poll loop to wake up when 'stream' is ready to take an
339  * action of the given 'type'. */
340 void
341 stream_wait(struct stream *stream, enum stream_wait_type wait)
342 {
343     assert(wait == STREAM_CONNECT || wait == STREAM_RECV
344            || wait == STREAM_SEND);
345
346     switch (stream->state) {
347     case SCS_CONNECTING:
348         wait = STREAM_CONNECT;
349         break;
350
351     case SCS_DISCONNECTED:
352         poll_immediate_wake();
353         return;
354     }
355     (stream->class->wait)(stream, wait);
356 }
357
358 void
359 stream_connect_wait(struct stream *stream)
360 {
361     stream_wait(stream, STREAM_CONNECT);
362 }
363
364 void
365 stream_recv_wait(struct stream *stream)
366 {
367     stream_wait(stream, STREAM_RECV);
368 }
369
370 void
371 stream_send_wait(struct stream *stream)
372 {
373     stream_wait(stream, STREAM_SEND);
374 }
375
376 /* Attempts to start listening for remote stream connections.  'name' is a
377  * connection name in the form "TYPE:ARGS", where TYPE is an passive stream
378  * class's name and ARGS are stream class-specific.
379  *
380  * Returns 0 if successful, otherwise a positive errno value.  If successful,
381  * stores a pointer to the new connection in '*pstreamp', otherwise a null
382  * pointer.  */
383 int
384 pstream_open(const char *name, struct pstream **pstreamp)
385 {
386     size_t prefix_len;
387     size_t i;
388
389     check_stream_classes();
390
391     *pstreamp = NULL;
392     prefix_len = strcspn(name, ":");
393     if (prefix_len == strlen(name)) {
394         return EAFNOSUPPORT;
395     }
396     for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) {
397         struct pstream_class *class = pstream_classes[i];
398         if (strlen(class->name) == prefix_len
399             && !memcmp(class->name, name, prefix_len)) {
400             char *suffix_copy = xstrdup(name + prefix_len + 1);
401             int retval = class->listen(name, suffix_copy, pstreamp);
402             free(suffix_copy);
403             if (retval) {
404                 *pstreamp = NULL;
405             }
406             return retval;
407         }
408     }
409     return EAFNOSUPPORT;
410 }
411
412 /* Returns the name that was used to open 'pstream'.  The caller must not
413  * modify or free the name. */
414 const char *
415 pstream_get_name(const struct pstream *pstream)
416 {
417     return pstream->name;
418 }
419
420 /* Closes 'pstream'. */
421 void
422 pstream_close(struct pstream *pstream)
423 {
424     if (pstream != NULL) {
425         char *name = pstream->name;
426         (pstream->class->close)(pstream);
427         free(name);
428     }
429 }
430
431 /* Tries to accept a new connection on 'pstream'.  If successful, stores the
432  * new connection in '*new_stream' and returns 0.  Otherwise, returns a
433  * positive errno value.
434  *
435  * pstream_accept() will not block waiting for a connection.  If no connection
436  * is ready to be accepted, it returns EAGAIN immediately. */
437 int
438 pstream_accept(struct pstream *pstream, struct stream **new_stream)
439 {
440     int retval = (pstream->class->accept)(pstream, new_stream);
441     if (retval) {
442         *new_stream = NULL;
443     } else {
444         assert((*new_stream)->state != SCS_CONNECTING
445                || (*new_stream)->class->connect);
446     }
447     return retval;
448 }
449
450 /* Tries to accept a new connection on 'pstream'.  If successful, stores the
451  * new connection in '*new_stream' and returns 0.  Otherwise, returns a
452  * positive errno value.
453  *
454  * pstream_accept_block() blocks until a connection is ready or until an error
455  * occurs.  It will not return EAGAIN. */
456 int
457 pstream_accept_block(struct pstream *pstream, struct stream **new_stream)
458 {
459     int error;
460
461     while ((error = pstream_accept(pstream, new_stream)) == EAGAIN) {
462         pstream_wait(pstream);
463         poll_block();
464     }
465     if (error) {
466         *new_stream = NULL;
467     }
468     return error;
469 }
470
471 void
472 pstream_wait(struct pstream *pstream)
473 {
474     (pstream->class->wait)(pstream);
475 }
476 \f
477 /* Initializes 'stream' as a new stream named 'name', implemented via 'class'.
478  * The initial connection status, supplied as 'connect_status', is interpreted
479  * as follows:
480  *
481  *      - 0: 'stream' is connected.  Its 'send' and 'recv' functions may be
482  *        called in the normal fashion.
483  *
484  *      - EAGAIN: 'stream' is trying to complete a connection.  Its 'connect'
485  *        function should be called to complete the connection.
486  *
487  *      - Other positive errno values indicate that the connection failed with
488  *        the specified error.
489  *
490  * After calling this function, stream_close() must be used to destroy
491  * 'stream', otherwise resources will be leaked.
492  *
493  * The caller retains ownership of 'name'. */
494 void
495 stream_init(struct stream *stream, struct stream_class *class,
496             int connect_status, const char *name)
497 {
498     stream->class = class;
499     stream->state = (connect_status == EAGAIN ? SCS_CONNECTING
500                     : !connect_status ? SCS_CONNECTED
501                     : SCS_DISCONNECTED);
502     stream->error = connect_status;
503     stream->name = xstrdup(name);
504     assert(stream->state != SCS_CONNECTING || class->connect);
505 }
506
507 void
508 stream_set_remote_ip(struct stream *stream, uint32_t ip)
509 {
510     stream->remote_ip = ip;
511 }
512
513 void
514 stream_set_remote_port(struct stream *stream, uint16_t port)
515 {
516     stream->remote_port = port;
517 }
518
519 void
520 stream_set_local_ip(struct stream *stream, uint32_t ip)
521 {
522     stream->local_ip = ip;
523 }
524
525 void
526 stream_set_local_port(struct stream *stream, uint16_t port)
527 {
528     stream->local_port = port;
529 }
530
531 void
532 pstream_init(struct pstream *pstream, struct pstream_class *class,
533             const char *name)
534 {
535     pstream->class = class;
536     pstream->name = xstrdup(name);
537 }