667a23ff370facf6d8266aa976afc993bd992d6b
[sliver-openvswitch.git] / lib / stream.c
1 /*
2  * Copyright (c) 2008, 2009, 2010 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "stream-provider.h"
19 #include <assert.h>
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <netinet/in.h>
23 #include <poll.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include "coverage.h"
27 #include "dynamic-string.h"
28 #include "fatal-signal.h"
29 #include "flow.h"
30 #include "ofp-print.h"
31 #include "ofpbuf.h"
32 #include "openflow/nicira-ext.h"
33 #include "openflow/openflow.h"
34 #include "packets.h"
35 #include "poll-loop.h"
36 #include "random.h"
37 #include "util.h"
38
39 #define THIS_MODULE VLM_stream
40 #include "vlog.h"
41
42 /* State of an active stream.*/
43 enum stream_state {
44     SCS_CONNECTING,             /* Underlying stream is not connected. */
45     SCS_CONNECTED,              /* Connection established. */
46     SCS_DISCONNECTED            /* Connection failed or connection closed. */
47 };
48
49 static struct stream_class *stream_classes[] = {
50     &tcp_stream_class,
51     &unix_stream_class,
52 #ifdef HAVE_OPENSSL
53     &ssl_stream_class,
54 #endif
55 };
56
57 static struct pstream_class *pstream_classes[] = {
58     &ptcp_pstream_class,
59     &punix_pstream_class,
60 #ifdef HAVE_OPENSSL
61     &pssl_pstream_class,
62 #endif
63 };
64
65 /* Check the validity of the stream class structures. */
66 static void
67 check_stream_classes(void)
68 {
69 #ifndef NDEBUG
70     size_t i;
71
72     for (i = 0; i < ARRAY_SIZE(stream_classes); i++) {
73         struct stream_class *class = stream_classes[i];
74         assert(class->name != NULL);
75         assert(class->open != NULL);
76         if (class->close || class->recv || class->send || class->run
77             || class->run_wait || class->wait) {
78             assert(class->close != NULL);
79             assert(class->recv != NULL);
80             assert(class->send != NULL);
81             assert(class->wait != NULL);
82         } else {
83             /* This class delegates to another one. */
84         }
85     }
86
87     for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) {
88         struct pstream_class *class = pstream_classes[i];
89         assert(class->name != NULL);
90         assert(class->listen != NULL);
91         if (class->close || class->accept || class->wait) {
92             assert(class->close != NULL);
93             assert(class->accept != NULL);
94             assert(class->wait != NULL);
95         } else {
96             /* This class delegates to another one. */
97         }
98     }
99 #endif
100 }
101
102 /* Prints information on active (if 'active') and passive (if 'passive')
103  * connection methods supported by the stream. */
104 void
105 stream_usage(const char *name, bool active, bool passive,
106              bool bootstrap OVS_UNUSED)
107 {
108     /* Really this should be implemented via callbacks into the stream
109      * providers, but that seems too heavy-weight to bother with at the
110      * moment. */
111
112     printf("\n");
113     if (active) {
114         printf("Active %s connection methods:\n", name);
115         printf("  tcp:IP:PORT             "
116                "PORT at remote IP\n");
117 #ifdef HAVE_OPENSSL
118         printf("  ssl:IP:PORT             "
119                "SSL PORT at remote IP\n");
120 #endif
121         printf("  unix:FILE               "
122                "Unix domain socket named FILE\n");
123     }
124
125     if (passive) {
126         printf("Passive %s connection methods:\n", name);
127         printf("  ptcp:PORT[:IP]          "
128                "listen to TCP PORT on IP\n");
129 #ifdef HAVE_OPENSSL
130         printf("  pssl:PORT[:IP]          "
131                "listen for SSL on PORT on IP\n");
132 #endif
133         printf("  punix:FILE              "
134                "listen on Unix domain socket FILE\n");
135     }
136
137 #ifdef HAVE_OPENSSL
138     printf("PKI configuration (required to use SSL):\n"
139            "  -p, --private-key=FILE  file with private key\n"
140            "  -c, --certificate=FILE  file with certificate for private key\n"
141            "  -C, --ca-cert=FILE      file with peer CA certificate\n");
142     if (bootstrap) {
143         printf("  --bootstrap-ca-cert=FILE  file with peer CA certificate "
144                "to read or create\n");
145     }
146 #endif
147 }
148
149 /* Given 'name', a stream name in the form "TYPE:ARGS", stores the class
150  * named "TYPE" into '*classp' and returns 0.  Returns EAFNOSUPPORT and stores
151  * a null pointer into '*classp' if 'name' is in the wrong form or if no such
152  * class exists. */
153 static int
154 stream_lookup_class(const char *name, struct stream_class **classp)
155 {
156     size_t prefix_len;
157     size_t i;
158
159     check_stream_classes();
160
161     *classp = NULL;
162     prefix_len = strcspn(name, ":");
163     if (name[prefix_len] == '\0') {
164         return EAFNOSUPPORT;
165     }
166     for (i = 0; i < ARRAY_SIZE(stream_classes); i++) {
167         struct stream_class *class = stream_classes[i];
168         if (strlen(class->name) == prefix_len
169             && !memcmp(class->name, name, prefix_len)) {
170             *classp = class;
171             return 0;
172         }
173     }
174     return EAFNOSUPPORT;
175 }
176
177 /* Returns 0 if 'name' is a stream name in the form "TYPE:ARGS" and TYPE is
178  * a supported stream type, otherwise EAFNOSUPPORT.  */
179 int
180 stream_verify_name(const char *name)
181 {
182     struct stream_class *class;
183     return stream_lookup_class(name, &class);
184 }
185
186 /* Attempts to connect a stream to a remote peer.  'name' is a connection name
187  * in the form "TYPE:ARGS", where TYPE is an active stream class's name and
188  * ARGS are stream class-specific.
189  *
190  * Returns 0 if successful, otherwise a positive errno value.  If successful,
191  * stores a pointer to the new connection in '*streamp', otherwise a null
192  * pointer.  */
193 int
194 stream_open(const char *name, struct stream **streamp)
195 {
196     struct stream_class *class;
197     struct stream *stream;
198     char *suffix_copy;
199     int error;
200
201     COVERAGE_INC(stream_open);
202
203     /* Look up the class. */
204     error = stream_lookup_class(name, &class);
205     if (!class) {
206         goto error;
207     }
208
209     /* Call class's "open" function. */
210     suffix_copy = xstrdup(strchr(name, ':') + 1);
211     error = class->open(name, suffix_copy, &stream);
212     free(suffix_copy);
213     if (error) {
214         goto error;
215     }
216
217     /* Success. */
218     *streamp = stream;
219     return 0;
220
221 error:
222     *streamp = NULL;
223     return error;
224 }
225
226 /* Blocks until a previously started stream connection attempt succeeds or
227  * fails.  'error' should be the value returned by stream_open() and 'streamp'
228  * should point to the stream pointer set by stream_open().  Returns 0 if
229  * successful, otherwise a positive errno value other than EAGAIN or
230  * EINPROGRESS.  If successful, leaves '*streamp' untouched; on error, closes
231  * '*streamp' and sets '*streamp' to null.
232  *
233  * Typical usage:
234  *   error = stream_open_block(stream_open("tcp:1.2.3.4:5", &stream), &stream);
235  */
236 int
237 stream_open_block(int error, struct stream **streamp)
238 {
239     struct stream *stream = *streamp;
240
241     fatal_signal_run();
242
243     while (error == EAGAIN) {
244         stream_run(stream);
245         stream_run_wait(stream);
246         stream_connect_wait(stream);
247         poll_block();
248         error = stream_connect(stream);
249         assert(error != EINPROGRESS);
250     }
251     if (error) {
252         stream_close(stream);
253         *streamp = NULL;
254     } else {
255         *streamp = stream;
256     }
257     return error;
258 }
259
260 /* Closes 'stream'. */
261 void
262 stream_close(struct stream *stream)
263 {
264     if (stream != NULL) {
265         char *name = stream->name;
266         (stream->class->close)(stream);
267         free(name);
268     }
269 }
270
271 /* Returns the name of 'stream', that is, the string passed to
272  * stream_open(). */
273 const char *
274 stream_get_name(const struct stream *stream)
275 {
276     return stream ? stream->name : "(null)";
277 }
278
279 /* Returns the IP address of the peer, or 0 if the peer is not connected over
280  * an IP-based protocol or if its IP address is not yet known. */
281 uint32_t
282 stream_get_remote_ip(const struct stream *stream)
283 {
284     return stream->remote_ip;
285 }
286
287 /* Returns the transport port of the peer, or 0 if the connection does not
288  * contain a port or if the port is not yet known. */
289 uint16_t
290 stream_get_remote_port(const struct stream *stream)
291 {
292     return stream->remote_port;
293 }
294
295 /* Returns the IP address used to connect to the peer, or 0 if the connection
296  * is not an IP-based protocol or if its IP address is not yet known. */
297 uint32_t
298 stream_get_local_ip(const struct stream *stream)
299 {
300     return stream->local_ip;
301 }
302
303 /* Returns the transport port used to connect to the peer, or 0 if the
304  * connection does not contain a port or if the port is not yet known. */
305 uint16_t
306 stream_get_local_port(const struct stream *stream)
307 {
308     return stream->local_port;
309 }
310
311 static void
312 scs_connecting(struct stream *stream)
313 {
314     int retval = (stream->class->connect)(stream);
315     assert(retval != EINPROGRESS);
316     if (!retval) {
317         stream->state = SCS_CONNECTED;
318     } else if (retval != EAGAIN) {
319         stream->state = SCS_DISCONNECTED;
320         stream->error = retval;
321     }
322 }
323
324 /* Tries to complete the connection on 'stream', which must be an active
325  * stream.  If 'stream''s connection is complete, returns 0 if the connection
326  * was successful or a positive errno value if it failed.  If the
327  * connection is still in progress, returns EAGAIN. */
328 int
329 stream_connect(struct stream *stream)
330 {
331     enum stream_state last_state;
332
333     do {
334         last_state = stream->state;
335         switch (stream->state) {
336         case SCS_CONNECTING:
337             scs_connecting(stream);
338             break;
339
340         case SCS_CONNECTED:
341             return 0;
342
343         case SCS_DISCONNECTED:
344             return stream->error;
345
346         default:
347             NOT_REACHED();
348         }
349     } while (stream->state != last_state);
350
351     return EAGAIN;
352 }
353
354 /* Tries to receive up to 'n' bytes from 'stream' into 'buffer', and returns:
355  *
356  *     - If successful, the number of bytes received (between 1 and 'n').
357  *
358  *     - On error, a negative errno value.
359  *
360  *     - 0, if the connection has been closed in the normal fashion, or if 'n'
361  *       is zero.
362  *
363  * The recv function will not block waiting for a packet to arrive.  If no
364  * data have been received, it returns -EAGAIN immediately. */
365 int
366 stream_recv(struct stream *stream, void *buffer, size_t n)
367 {
368     int retval = stream_connect(stream);
369     return (retval ? -retval
370             : n == 0 ? 0
371             : (stream->class->recv)(stream, buffer, n));
372 }
373
374 /* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns:
375  *
376  *     - If successful, the number of bytes sent (between 1 and 'n').  0 is
377  *       only a valid return value if 'n' is 0.
378  *
379  *     - On error, a negative errno value.
380  *
381  * The send function will not block.  If no bytes can be immediately accepted
382  * for transmission, it returns -EAGAIN immediately. */
383 int
384 stream_send(struct stream *stream, const void *buffer, size_t n)
385 {
386     int retval = stream_connect(stream);
387     return (retval ? -retval
388             : n == 0 ? 0
389             : (stream->class->send)(stream, buffer, n));
390 }
391
392 /* Allows 'stream' to perform maintenance activities, such as flushing
393  * output buffers. */
394 void
395 stream_run(struct stream *stream)
396 {
397     if (stream->class->run) {
398         (stream->class->run)(stream);
399     }
400 }
401
402 /* Arranges for the poll loop to wake up when 'stream' needs to perform
403  * maintenance activities. */
404 void
405 stream_run_wait(struct stream *stream)
406 {
407     if (stream->class->run_wait) {
408         (stream->class->run_wait)(stream);
409     }
410 }
411
412 /* Arranges for the poll loop to wake up when 'stream' is ready to take an
413  * action of the given 'type'. */
414 void
415 stream_wait(struct stream *stream, enum stream_wait_type wait)
416 {
417     assert(wait == STREAM_CONNECT || wait == STREAM_RECV
418            || wait == STREAM_SEND);
419
420     switch (stream->state) {
421     case SCS_CONNECTING:
422         wait = STREAM_CONNECT;
423         break;
424
425     case SCS_DISCONNECTED:
426         poll_immediate_wake();
427         return;
428     }
429     (stream->class->wait)(stream, wait);
430 }
431
432 void
433 stream_connect_wait(struct stream *stream)
434 {
435     stream_wait(stream, STREAM_CONNECT);
436 }
437
438 void
439 stream_recv_wait(struct stream *stream)
440 {
441     stream_wait(stream, STREAM_RECV);
442 }
443
444 void
445 stream_send_wait(struct stream *stream)
446 {
447     stream_wait(stream, STREAM_SEND);
448 }
449
450 /* Given 'name', a pstream name in the form "TYPE:ARGS", stores the class
451  * named "TYPE" into '*classp' and returns 0.  Returns EAFNOSUPPORT and stores
452  * a null pointer into '*classp' if 'name' is in the wrong form or if no such
453  * class exists. */
454 static int
455 pstream_lookup_class(const char *name, struct pstream_class **classp)
456 {
457     size_t prefix_len;
458     size_t i;
459
460     check_stream_classes();
461
462     *classp = NULL;
463     prefix_len = strcspn(name, ":");
464     if (name[prefix_len] == '\0') {
465         return EAFNOSUPPORT;
466     }
467     for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) {
468         struct pstream_class *class = pstream_classes[i];
469         if (strlen(class->name) == prefix_len
470             && !memcmp(class->name, name, prefix_len)) {
471             *classp = class;
472             return 0;
473         }
474     }
475     return EAFNOSUPPORT;
476 }
477
478 /* Returns 0 if 'name' is a pstream name in the form "TYPE:ARGS" and TYPE is
479  * a supported pstream type, otherwise EAFNOSUPPORT.  */
480 int
481 pstream_verify_name(const char *name)
482 {
483     struct pstream_class *class;
484     return pstream_lookup_class(name, &class);
485 }
486
487 /* Attempts to start listening for remote stream connections.  'name' is a
488  * connection name in the form "TYPE:ARGS", where TYPE is an passive stream
489  * class's name and ARGS are stream class-specific.
490  *
491  * Returns 0 if successful, otherwise a positive errno value.  If successful,
492  * stores a pointer to the new connection in '*pstreamp', otherwise a null
493  * pointer.  */
494 int
495 pstream_open(const char *name, struct pstream **pstreamp)
496 {
497     struct pstream_class *class;
498     struct pstream *pstream;
499     char *suffix_copy;
500     int error;
501
502     COVERAGE_INC(pstream_open);
503
504     /* Look up the class. */
505     error = pstream_lookup_class(name, &class);
506     if (!class) {
507         goto error;
508     }
509
510     /* Call class's "open" function. */
511     suffix_copy = xstrdup(strchr(name, ':') + 1);
512     error = class->listen(name, suffix_copy, &pstream);
513     free(suffix_copy);
514     if (error) {
515         goto error;
516     }
517
518     /* Success. */
519     *pstreamp = pstream;
520     return 0;
521
522 error:
523     *pstreamp = NULL;
524     return error;
525 }
526
527 /* Returns the name that was used to open 'pstream'.  The caller must not
528  * modify or free the name. */
529 const char *
530 pstream_get_name(const struct pstream *pstream)
531 {
532     return pstream->name;
533 }
534
535 /* Closes 'pstream'. */
536 void
537 pstream_close(struct pstream *pstream)
538 {
539     if (pstream != NULL) {
540         char *name = pstream->name;
541         (pstream->class->close)(pstream);
542         free(name);
543     }
544 }
545
546 /* Tries to accept a new connection on 'pstream'.  If successful, stores the
547  * new connection in '*new_stream' and returns 0.  Otherwise, returns a
548  * positive errno value.
549  *
550  * pstream_accept() will not block waiting for a connection.  If no connection
551  * is ready to be accepted, it returns EAGAIN immediately. */
552 int
553 pstream_accept(struct pstream *pstream, struct stream **new_stream)
554 {
555     int retval = (pstream->class->accept)(pstream, new_stream);
556     if (retval) {
557         *new_stream = NULL;
558     } else {
559         assert((*new_stream)->state != SCS_CONNECTING
560                || (*new_stream)->class->connect);
561     }
562     return retval;
563 }
564
565 /* Tries to accept a new connection on 'pstream'.  If successful, stores the
566  * new connection in '*new_stream' and returns 0.  Otherwise, returns a
567  * positive errno value.
568  *
569  * pstream_accept_block() blocks until a connection is ready or until an error
570  * occurs.  It will not return EAGAIN. */
571 int
572 pstream_accept_block(struct pstream *pstream, struct stream **new_stream)
573 {
574     int error;
575
576     fatal_signal_run();
577     while ((error = pstream_accept(pstream, new_stream)) == EAGAIN) {
578         pstream_wait(pstream);
579         poll_block();
580     }
581     if (error) {
582         *new_stream = NULL;
583     }
584     return error;
585 }
586
587 void
588 pstream_wait(struct pstream *pstream)
589 {
590     (pstream->class->wait)(pstream);
591 }
592 \f
593 /* Initializes 'stream' as a new stream named 'name', implemented via 'class'.
594  * The initial connection status, supplied as 'connect_status', is interpreted
595  * as follows:
596  *
597  *      - 0: 'stream' is connected.  Its 'send' and 'recv' functions may be
598  *        called in the normal fashion.
599  *
600  *      - EAGAIN: 'stream' is trying to complete a connection.  Its 'connect'
601  *        function should be called to complete the connection.
602  *
603  *      - Other positive errno values indicate that the connection failed with
604  *        the specified error.
605  *
606  * After calling this function, stream_close() must be used to destroy
607  * 'stream', otherwise resources will be leaked.
608  *
609  * The caller retains ownership of 'name'. */
610 void
611 stream_init(struct stream *stream, struct stream_class *class,
612             int connect_status, const char *name)
613 {
614     stream->class = class;
615     stream->state = (connect_status == EAGAIN ? SCS_CONNECTING
616                     : !connect_status ? SCS_CONNECTED
617                     : SCS_DISCONNECTED);
618     stream->error = connect_status;
619     stream->name = xstrdup(name);
620     assert(stream->state != SCS_CONNECTING || class->connect);
621 }
622
623 void
624 stream_set_remote_ip(struct stream *stream, uint32_t ip)
625 {
626     stream->remote_ip = ip;
627 }
628
629 void
630 stream_set_remote_port(struct stream *stream, uint16_t port)
631 {
632     stream->remote_port = port;
633 }
634
635 void
636 stream_set_local_ip(struct stream *stream, uint32_t ip)
637 {
638     stream->local_ip = ip;
639 }
640
641 void
642 stream_set_local_port(struct stream *stream, uint16_t port)
643 {
644     stream->local_port = port;
645 }
646
647 void
648 pstream_init(struct pstream *pstream, struct pstream_class *class,
649             const char *name)
650 {
651     pstream->class = class;
652     pstream->name = xstrdup(name);
653 }
654 \f
655 static int
656 count_fields(const char *s_)
657 {
658     char *s, *field, *save_ptr;
659     int n = 0;
660
661     save_ptr = NULL;
662     s = xstrdup(s_);
663     for (field = strtok_r(s, ":", &save_ptr); field != NULL;
664          field = strtok_r(NULL, ":", &save_ptr)) {
665         n++;
666     }
667     free(s);
668
669     return n;
670 }
671
672 /* Like stream_open(), but for tcp streams the port defaults to
673  * 'default_tcp_port' if no port number is given and for SSL streams the port
674  * defaults to 'default_ssl_port' if no port number is given. */
675 int
676 stream_open_with_default_ports(const char *name_,
677                                uint16_t default_tcp_port,
678                                uint16_t default_ssl_port,
679                                struct stream **streamp)
680 {
681     char *name;
682     int error;
683
684     if (!strncmp(name_, "tcp:", 4) && count_fields(name_) < 3) {
685         name = xasprintf("%s:%d", name_, default_tcp_port);
686     } else if (!strncmp(name_, "ssl:", 4) && count_fields(name_) < 3) {
687         name = xasprintf("%s:%d", name_, default_ssl_port);
688     } else {
689         name = xstrdup(name_);
690     }
691     error = stream_open(name, streamp);
692     free(name);
693
694     return error;
695 }
696
697 /* Like pstream_open(), but for ptcp streams the port defaults to
698  * 'default_ptcp_port' if no port number is given and for passive SSL streams
699  * the port defaults to 'default_pssl_port' if no port number is given. */
700 int
701 pstream_open_with_default_ports(const char *name_,
702                                 uint16_t default_ptcp_port,
703                                 uint16_t default_pssl_port,
704                                 struct pstream **pstreamp)
705 {
706     char *name;
707     int error;
708
709     if (!strncmp(name_, "ptcp:", 5) && count_fields(name_) < 2) {
710         name = xasprintf("%s%d", name_, default_ptcp_port);
711     } else if (!strncmp(name_, "pssl:", 5) && count_fields(name_) < 2) {
712         name = xasprintf("%s%d", name_, default_pssl_port);
713     } else {
714         name = xstrdup(name_);
715     }
716     error = pstream_open(name, pstreamp);
717     free(name);
718
719     return error;
720 }
721 \f
722 /* Attempts to guess the content type of a stream whose first few bytes were
723  * the 'size' bytes of 'data'. */
724 static enum stream_content_type
725 stream_guess_content(const uint8_t *data, size_t size)
726 {
727     if (size >= 2) {
728 #define PAIR(A, B) (((A) << 8) | (B))
729         switch (PAIR(data[0], data[1])) {
730         case PAIR(0x16, 0x03):  /* Handshake, version 3. */
731             return STREAM_SSL;
732         case PAIR('{', '"'):
733             return STREAM_JSONRPC;
734         case PAIR(OFP_VERSION, OFPT_HELLO):
735             return STREAM_OPENFLOW;
736         }
737     }
738
739     return STREAM_UNKNOWN;
740 }
741
742 /* Returns a string represenation of 'type'. */
743 static const char *
744 stream_content_type_to_string(enum stream_content_type type)
745 {
746     switch (type) {
747     case STREAM_UNKNOWN:
748     default:
749         return "unknown";
750
751     case STREAM_JSONRPC:
752         return "JSON-RPC";
753
754     case STREAM_OPENFLOW:
755         return "OpenFlow";
756
757     case STREAM_SSL:
758         return "SSL";
759     }
760 }
761
762 /* Attempts to guess the content type of a stream whose first few bytes were
763  * the 'size' bytes of 'data'.  If this is done successfully, and the guessed
764  * content type is other than 'expected_type', then log a message in vlog
765  * module 'module', naming 'stream_name' as the source, explaining what
766  * content was expected and what was actually received. */
767 void
768 stream_report_content(const void *data, size_t size,
769                       enum stream_content_type expected_type,
770                       enum vlog_module module, const char *stream_name)
771 {
772     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
773     enum stream_content_type actual_type;
774
775     actual_type = stream_guess_content(data, size);
776     if (actual_type != expected_type && actual_type != STREAM_UNKNOWN) {
777         vlog_rate_limit(module, VLL_WARN, &rl,
778                         "%s: received %s data on %s channel",
779                         stream_name,
780                         stream_content_type_to_string(expected_type),
781                         stream_content_type_to_string(actual_type));
782     }
783 }