Setting tag sliver-openvswitch-2.2.90-1
[sliver-openvswitch.git] / lib / jsonrpc.c
1 /*
2  * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include "jsonrpc.h"
20
21 #include <errno.h>
22
23 #include "byteq.h"
24 #include "dynamic-string.h"
25 #include "fatal-signal.h"
26 #include "json.h"
27 #include "list.h"
28 #include "ofpbuf.h"
29 #include "ovs-thread.h"
30 #include "poll-loop.h"
31 #include "reconnect.h"
32 #include "stream.h"
33 #include "timeval.h"
34 #include "vlog.h"
35
36 VLOG_DEFINE_THIS_MODULE(jsonrpc);
37 \f
38 struct jsonrpc {
39     struct stream *stream;
40     char *name;
41     int status;
42
43     /* Input. */
44     struct byteq input;
45     uint8_t input_buffer[512];
46     struct json_parser *parser;
47
48     /* Output. */
49     struct list output;         /* Contains "struct ofpbuf"s. */
50     size_t backlog;
51 };
52
53 /* Rate limit for error messages. */
54 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
55
56 static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
57 static void jsonrpc_cleanup(struct jsonrpc *);
58 static void jsonrpc_error(struct jsonrpc *, int error);
59
60 /* This is just the same as stream_open() except that it uses the default
61  * JSONRPC port if none is specified. */
62 int
63 jsonrpc_stream_open(const char *name, struct stream **streamp, uint8_t dscp)
64 {
65     return stream_open_with_default_port(name, OVSDB_OLD_PORT,
66                                          streamp, dscp);
67 }
68
69 /* This is just the same as pstream_open() except that it uses the default
70  * JSONRPC port if none is specified. */
71 int
72 jsonrpc_pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
73 {
74     return pstream_open_with_default_port(name, OVSDB_OLD_PORT,
75                                           pstreamp, dscp);
76 }
77
78 /* Returns a new JSON-RPC stream that uses 'stream' for input and output.  The
79  * new jsonrpc object takes ownership of 'stream'. */
80 struct jsonrpc *
81 jsonrpc_open(struct stream *stream)
82 {
83     struct jsonrpc *rpc;
84
85     ovs_assert(stream != NULL);
86
87     rpc = xzalloc(sizeof *rpc);
88     rpc->name = xstrdup(stream_get_name(stream));
89     rpc->stream = stream;
90     byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
91     list_init(&rpc->output);
92
93     return rpc;
94 }
95
96 /* Destroys 'rpc', closing the stream on which it is based, and frees its
97  * memory. */
98 void
99 jsonrpc_close(struct jsonrpc *rpc)
100 {
101     if (rpc) {
102         jsonrpc_cleanup(rpc);
103         free(rpc->name);
104         free(rpc);
105     }
106 }
107
108 /* Performs periodic maintenance on 'rpc', such as flushing output buffers. */
109 void
110 jsonrpc_run(struct jsonrpc *rpc)
111 {
112     if (rpc->status) {
113         return;
114     }
115
116     stream_run(rpc->stream);
117     while (!list_is_empty(&rpc->output)) {
118         struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
119         int retval;
120
121         retval = stream_send(rpc->stream, ofpbuf_data(buf), ofpbuf_size(buf));
122         if (retval >= 0) {
123             rpc->backlog -= retval;
124             ofpbuf_pull(buf, retval);
125             if (!ofpbuf_size(buf)) {
126                 list_remove(&buf->list_node);
127                 ofpbuf_delete(buf);
128             }
129         } else {
130             if (retval != -EAGAIN) {
131                 VLOG_WARN_RL(&rl, "%s: send error: %s",
132                              rpc->name, ovs_strerror(-retval));
133                 jsonrpc_error(rpc, -retval);
134             }
135             break;
136         }
137     }
138 }
139
140 /* Arranges for the poll loop to wake up when 'rpc' needs to perform
141  * maintenance activities. */
142 void
143 jsonrpc_wait(struct jsonrpc *rpc)
144 {
145     if (!rpc->status) {
146         stream_run_wait(rpc->stream);
147         if (!list_is_empty(&rpc->output)) {
148             stream_send_wait(rpc->stream);
149         }
150     }
151 }
152
153 /*
154  * Returns the current status of 'rpc'.  The possible return values are:
155  * - 0: no error yet
156  * - >0: errno value
157  * - EOF: end of file (remote end closed connection; not necessarily an error).
158  *
159  * When this functions nonzero, 'rpc' is effectively out of commission.  'rpc'
160  * will not receive any more messages and any further messages that one
161  * attempts to send with 'rpc' will be discarded.  The caller can keep 'rpc'
162  * around as long as it wants, but it's not going to provide any more useful
163  * services.
164  */
165 int
166 jsonrpc_get_status(const struct jsonrpc *rpc)
167 {
168     return rpc->status;
169 }
170
171 /* Returns the number of bytes buffered by 'rpc' to be written to the
172  * underlying stream.  Always returns 0 if 'rpc' has encountered an error or if
173  * the remote end closed the connection. */
174 size_t
175 jsonrpc_get_backlog(const struct jsonrpc *rpc)
176 {
177     return rpc->status ? 0 : rpc->backlog;
178 }
179
180 /* Returns the number of bytes that have been received on 'rpc''s underlying
181  * stream.  (The value wraps around if it exceeds UINT_MAX.) */
182 unsigned int
183 jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
184 {
185     return rpc->input.head;
186 }
187
188 /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
189  * the stream underlying 'rpc' when 'rpc' was created. */
190 const char *
191 jsonrpc_get_name(const struct jsonrpc *rpc)
192 {
193     return rpc->name;
194 }
195
196 static void
197 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
198                 const struct jsonrpc_msg *msg)
199 {
200     if (VLOG_IS_DBG_ENABLED()) {
201         struct ds s = DS_EMPTY_INITIALIZER;
202         if (msg->method) {
203             ds_put_format(&s, ", method=\"%s\"", msg->method);
204         }
205         if (msg->params) {
206             ds_put_cstr(&s, ", params=");
207             json_to_ds(msg->params, 0, &s);
208         }
209         if (msg->result) {
210             ds_put_cstr(&s, ", result=");
211             json_to_ds(msg->result, 0, &s);
212         }
213         if (msg->error) {
214             ds_put_cstr(&s, ", error=");
215             json_to_ds(msg->error, 0, &s);
216         }
217         if (msg->id) {
218             ds_put_cstr(&s, ", id=");
219             json_to_ds(msg->id, 0, &s);
220         }
221         VLOG_DBG("%s: %s %s%s", rpc->name, title,
222                  jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
223         ds_destroy(&s);
224     }
225 }
226
227 /* Schedules 'msg' to be sent on 'rpc' and returns 'rpc''s status (as with
228  * jsonrpc_get_status()).
229  *
230  * If 'msg' cannot be sent immediately, it is appended to a buffer.  The caller
231  * is responsible for ensuring that the amount of buffered data is somehow
232  * limited.  (jsonrpc_get_backlog() returns the amount of data currently
233  * buffered in 'rpc'.)
234  *
235  * Always takes ownership of 'msg', regardless of success. */
236 int
237 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
238 {
239     struct ofpbuf *buf;
240     struct json *json;
241     size_t length;
242     char *s;
243
244     if (rpc->status) {
245         jsonrpc_msg_destroy(msg);
246         return rpc->status;
247     }
248
249     jsonrpc_log_msg(rpc, "send", msg);
250
251     json = jsonrpc_msg_to_json(msg);
252     s = json_to_string(json, 0);
253     length = strlen(s);
254     json_destroy(json);
255
256     buf = xmalloc(sizeof *buf);
257     ofpbuf_use(buf, s, length);
258     ofpbuf_set_size(buf, length);
259     list_push_back(&rpc->output, &buf->list_node);
260     rpc->backlog += length;
261
262     if (rpc->backlog == length) {
263         jsonrpc_run(rpc);
264     }
265     return rpc->status;
266 }
267
268 /* Attempts to receive a message from 'rpc'.
269  *
270  * If successful, stores the received message in '*msgp' and returns 0.  The
271  * caller takes ownership of '*msgp' and must eventually destroy it with
272  * jsonrpc_msg_destroy().
273  *
274  * Otherwise, stores NULL in '*msgp' and returns one of the following:
275  *
276  *   - EAGAIN: No message has been received.
277  *
278  *   - EOF: The remote end closed the connection gracefully.
279  *
280  *   - Otherwise an errno value that represents a JSON-RPC protocol violation
281  *     or another error fatal to the connection.  'rpc' will not send or
282  *     receive any more messages.
283  */
284 int
285 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
286 {
287     int i;
288
289     *msgp = NULL;
290     if (rpc->status) {
291         return rpc->status;
292     }
293
294     for (i = 0; i < 50; i++) {
295         size_t n, used;
296
297         /* Fill our input buffer if it's empty. */
298         if (byteq_is_empty(&rpc->input)) {
299             size_t chunk;
300             int retval;
301
302             chunk = byteq_headroom(&rpc->input);
303             retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
304             if (retval < 0) {
305                 if (retval == -EAGAIN) {
306                     return EAGAIN;
307                 } else {
308                     VLOG_WARN_RL(&rl, "%s: receive error: %s",
309                                  rpc->name, ovs_strerror(-retval));
310                     jsonrpc_error(rpc, -retval);
311                     return rpc->status;
312                 }
313             } else if (retval == 0) {
314                 jsonrpc_error(rpc, EOF);
315                 return EOF;
316             }
317             byteq_advance_head(&rpc->input, retval);
318         }
319
320         /* We have some input.  Feed it into the JSON parser. */
321         if (!rpc->parser) {
322             rpc->parser = json_parser_create(0);
323         }
324         n = byteq_tailroom(&rpc->input);
325         used = json_parser_feed(rpc->parser,
326                                 (char *) byteq_tail(&rpc->input), n);
327         byteq_advance_tail(&rpc->input, used);
328
329         /* If we have complete JSON, attempt to parse it as JSON-RPC. */
330         if (json_parser_is_done(rpc->parser)) {
331             *msgp = jsonrpc_parse_received_message(rpc);
332             if (*msgp) {
333                 return 0;
334             }
335
336             if (rpc->status) {
337                 const struct byteq *q = &rpc->input;
338                 if (q->head <= q->size) {
339                     stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
340                                           THIS_MODULE, rpc->name);
341                 }
342                 return rpc->status;
343             }
344         }
345     }
346
347     return EAGAIN;
348 }
349
350 /* Causes the poll loop to wake up when jsonrpc_recv() may return a value other
351  * than EAGAIN. */
352 void
353 jsonrpc_recv_wait(struct jsonrpc *rpc)
354 {
355     if (rpc->status || !byteq_is_empty(&rpc->input)) {
356         poll_immediate_wake_at(rpc->name);
357     } else {
358         stream_recv_wait(rpc->stream);
359     }
360 }
361
362 /* Sends 'msg' on 'rpc' and waits for it to be successfully queued to the
363  * underlying stream.  Returns 0 if 'msg' was sent successfully, otherwise a
364  * status value (see jsonrpc_get_status()).
365  *
366  * Always takes ownership of 'msg', regardless of success. */
367 int
368 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
369 {
370     int error;
371
372     fatal_signal_run();
373
374     error = jsonrpc_send(rpc, msg);
375     if (error) {
376         return error;
377     }
378
379     for (;;) {
380         jsonrpc_run(rpc);
381         if (list_is_empty(&rpc->output) || rpc->status) {
382             return rpc->status;
383         }
384         jsonrpc_wait(rpc);
385         poll_block();
386     }
387 }
388
389 /* Waits for a message to be received on 'rpc'.  Same semantics as
390  * jsonrpc_recv() except that EAGAIN will never be returned. */
391 int
392 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
393 {
394     for (;;) {
395         int error = jsonrpc_recv(rpc, msgp);
396         if (error != EAGAIN) {
397             fatal_signal_run();
398             return error;
399         }
400
401         jsonrpc_run(rpc);
402         jsonrpc_wait(rpc);
403         jsonrpc_recv_wait(rpc);
404         poll_block();
405     }
406 }
407
408 /* Sends 'request' to 'rpc' then waits for a reply.  The return value is 0 if
409  * successful, in which case '*replyp' is set to the reply, which the caller
410  * must eventually free with jsonrpc_msg_destroy().  Otherwise returns a status
411  * value (see jsonrpc_get_status()).
412  *
413  * Discards any message received on 'rpc' that is not a reply to 'request'
414  * (based on message id).
415  *
416  * Always takes ownership of 'request', regardless of success. */
417 int
418 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
419                        struct jsonrpc_msg **replyp)
420 {
421     struct jsonrpc_msg *reply = NULL;
422     struct json *id;
423     int error;
424
425     id = json_clone(request->id);
426     error = jsonrpc_send_block(rpc, request);
427     if (!error) {
428         for (;;) {
429             error = jsonrpc_recv_block(rpc, &reply);
430             if (error) {
431                 break;
432             }
433             if ((reply->type == JSONRPC_REPLY || reply->type == JSONRPC_ERROR)
434                 && json_equal(id, reply->id)) {
435                 break;
436             }
437             jsonrpc_msg_destroy(reply);
438         }
439     }
440     *replyp = error ? NULL : reply;
441     json_destroy(id);
442     return error;
443 }
444
445 /* Attempts to parse the content of 'rpc->parser' (which is complete JSON) as a
446  * JSON-RPC message.  If successful, returns the JSON-RPC message.  On failure,
447  * signals an error on 'rpc' with jsonrpc_error() and returns NULL. */
448 static struct jsonrpc_msg *
449 jsonrpc_parse_received_message(struct jsonrpc *rpc)
450 {
451     struct jsonrpc_msg *msg;
452     struct json *json;
453     char *error;
454
455     json = json_parser_finish(rpc->parser);
456     rpc->parser = NULL;
457     if (json->type == JSON_STRING) {
458         VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
459                      rpc->name, json_string(json));
460         jsonrpc_error(rpc, EPROTO);
461         json_destroy(json);
462         return NULL;
463     }
464
465     error = jsonrpc_msg_from_json(json, &msg);
466     if (error) {
467         VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
468                      rpc->name, error);
469         free(error);
470         jsonrpc_error(rpc, EPROTO);
471         return NULL;
472     }
473
474     jsonrpc_log_msg(rpc, "received", msg);
475     return msg;
476 }
477
478 static void
479 jsonrpc_error(struct jsonrpc *rpc, int error)
480 {
481     ovs_assert(error);
482     if (!rpc->status) {
483         rpc->status = error;
484         jsonrpc_cleanup(rpc);
485     }
486 }
487
488 static void
489 jsonrpc_cleanup(struct jsonrpc *rpc)
490 {
491     stream_close(rpc->stream);
492     rpc->stream = NULL;
493
494     json_parser_abort(rpc->parser);
495     rpc->parser = NULL;
496
497     ofpbuf_list_delete(&rpc->output);
498     rpc->backlog = 0;
499 }
500 \f
501 static struct jsonrpc_msg *
502 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
503                 struct json *params, struct json *result, struct json *error,
504                 struct json *id)
505 {
506     struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
507     msg->type = type;
508     msg->method = method ? xstrdup(method) : NULL;
509     msg->params = params;
510     msg->result = result;
511     msg->error = error;
512     msg->id = id;
513     return msg;
514 }
515
516 static struct json *
517 jsonrpc_create_id(void)
518 {
519     static atomic_uint next_id = ATOMIC_VAR_INIT(0);
520     unsigned int id;
521
522     atomic_add(&next_id, 1, &id);
523     return json_integer_create(id);
524 }
525
526 struct jsonrpc_msg *
527 jsonrpc_create_request(const char *method, struct json *params,
528                        struct json **idp)
529 {
530     struct json *id = jsonrpc_create_id();
531     if (idp) {
532         *idp = json_clone(id);
533     }
534     return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
535 }
536
537 struct jsonrpc_msg *
538 jsonrpc_create_notify(const char *method, struct json *params)
539 {
540     return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
541 }
542
543 struct jsonrpc_msg *
544 jsonrpc_create_reply(struct json *result, const struct json *id)
545 {
546     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
547                            json_clone(id));
548 }
549
550 struct jsonrpc_msg *
551 jsonrpc_create_error(struct json *error, const struct json *id)
552 {
553     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
554                            json_clone(id));
555 }
556
557 const char *
558 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
559 {
560     switch (type) {
561     case JSONRPC_REQUEST:
562         return "request";
563
564     case JSONRPC_NOTIFY:
565         return "notification";
566
567     case JSONRPC_REPLY:
568         return "reply";
569
570     case JSONRPC_ERROR:
571         return "error";
572     }
573     return "(null)";
574 }
575
576 char *
577 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
578 {
579     const char *type_name;
580     unsigned int pattern;
581
582     if (m->params && m->params->type != JSON_ARRAY) {
583         return xstrdup("\"params\" must be JSON array");
584     }
585
586     switch (m->type) {
587     case JSONRPC_REQUEST:
588         pattern = 0x11001;
589         break;
590
591     case JSONRPC_NOTIFY:
592         pattern = 0x11000;
593         break;
594
595     case JSONRPC_REPLY:
596         pattern = 0x00101;
597         break;
598
599     case JSONRPC_ERROR:
600         pattern = 0x00011;
601         break;
602
603     default:
604         return xasprintf("invalid JSON-RPC message type %d", m->type);
605     }
606
607     type_name = jsonrpc_msg_type_to_string(m->type);
608     if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
609         return xasprintf("%s must%s have \"method\"",
610                          type_name, (pattern & 0x10000) ? "" : " not");
611
612     }
613     if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
614         return xasprintf("%s must%s have \"params\"",
615                          type_name, (pattern & 0x1000) ? "" : " not");
616
617     }
618     if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
619         return xasprintf("%s must%s have \"result\"",
620                          type_name, (pattern & 0x100) ? "" : " not");
621
622     }
623     if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
624         return xasprintf("%s must%s have \"error\"",
625                          type_name, (pattern & 0x10) ? "" : " not");
626
627     }
628     if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
629         return xasprintf("%s must%s have \"id\"",
630                          type_name, (pattern & 0x1) ? "" : " not");
631
632     }
633     return NULL;
634 }
635
636 void
637 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
638 {
639     if (m) {
640         free(m->method);
641         json_destroy(m->params);
642         json_destroy(m->result);
643         json_destroy(m->error);
644         json_destroy(m->id);
645         free(m);
646     }
647 }
648
649 static struct json *
650 null_from_json_null(struct json *json)
651 {
652     if (json && json->type == JSON_NULL) {
653         json_destroy(json);
654         return NULL;
655     }
656     return json;
657 }
658
659 char *
660 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
661 {
662     struct json *method = NULL;
663     struct jsonrpc_msg *msg = NULL;
664     struct shash *object;
665     char *error;
666
667     if (json->type != JSON_OBJECT) {
668         error = xstrdup("message is not a JSON object");
669         goto exit;
670     }
671     object = json_object(json);
672
673     method = shash_find_and_delete(object, "method");
674     if (method && method->type != JSON_STRING) {
675         error = xstrdup("method is not a JSON string");
676         goto exit;
677     }
678
679     msg = xzalloc(sizeof *msg);
680     msg->method = method ? xstrdup(method->u.string) : NULL;
681     msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
682     msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
683     msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
684     msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
685     msg->type = (msg->result ? JSONRPC_REPLY
686                  : msg->error ? JSONRPC_ERROR
687                  : msg->id ? JSONRPC_REQUEST
688                  : JSONRPC_NOTIFY);
689     if (!shash_is_empty(object)) {
690         error = xasprintf("message has unexpected member \"%s\"",
691                           shash_first(object)->name);
692         goto exit;
693     }
694     error = jsonrpc_msg_is_valid(msg);
695     if (error) {
696         goto exit;
697     }
698
699 exit:
700     json_destroy(method);
701     json_destroy(json);
702     if (error) {
703         jsonrpc_msg_destroy(msg);
704         msg = NULL;
705     }
706     *msgp = msg;
707     return error;
708 }
709
710 struct json *
711 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
712 {
713     struct json *json = json_object_create();
714
715     if (m->method) {
716         json_object_put(json, "method", json_string_create_nocopy(m->method));
717     }
718
719     if (m->params) {
720         json_object_put(json, "params", m->params);
721     }
722
723     if (m->result) {
724         json_object_put(json, "result", m->result);
725     } else if (m->type == JSONRPC_ERROR) {
726         json_object_put(json, "result", json_null_create());
727     }
728
729     if (m->error) {
730         json_object_put(json, "error", m->error);
731     } else if (m->type == JSONRPC_REPLY) {
732         json_object_put(json, "error", json_null_create());
733     }
734
735     if (m->id) {
736         json_object_put(json, "id", m->id);
737     } else if (m->type == JSONRPC_NOTIFY) {
738         json_object_put(json, "id", json_null_create());
739     }
740
741     free(m);
742
743     return json;
744 }
745 \f
746 /* A JSON-RPC session with reconnection. */
747
748 struct jsonrpc_session {
749     struct reconnect *reconnect;
750     struct jsonrpc *rpc;
751     struct stream *stream;
752     struct pstream *pstream;
753     int last_error;
754     unsigned int seqno;
755     uint8_t dscp;
756 };
757
758 /* Creates and returns a jsonrpc_session to 'name', which should be a string
759  * acceptable to stream_open() or pstream_open().
760  *
761  * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
762  * jsonrpc_session connects to 'name'.  If 'retry' is true, then the new
763  * session connects and reconnects to 'name', with backoff.  If 'retry' is
764  * false, the new session will only try to connect once and after a connection
765  * failure or a disconnection jsonrpc_session_is_alive() will return false for
766  * the new session.
767  *
768  * If 'name' is a passive connection method, e.g. "ptcp:", the new
769  * jsonrpc_session listens for connections to 'name'.  It maintains at most one
770  * connection at any given time.  Any new connection causes the previous one
771  * (if any) to be dropped. */
772 struct jsonrpc_session *
773 jsonrpc_session_open(const char *name, bool retry)
774 {
775     struct jsonrpc_session *s;
776
777     s = xmalloc(sizeof *s);
778     s->reconnect = reconnect_create(time_msec());
779     reconnect_set_name(s->reconnect, name);
780     reconnect_enable(s->reconnect, time_msec());
781     s->rpc = NULL;
782     s->stream = NULL;
783     s->pstream = NULL;
784     s->seqno = 0;
785     s->dscp = 0;
786     s->last_error = 0;
787
788     if (!pstream_verify_name(name)) {
789         reconnect_set_passive(s->reconnect, true, time_msec());
790     } else if (!retry) {
791         reconnect_set_max_tries(s->reconnect, 1);
792         reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX);
793     }
794
795     if (!stream_or_pstream_needs_probes(name)) {
796         reconnect_set_probe_interval(s->reconnect, 0);
797     }
798
799     return s;
800 }
801
802 /* Creates and returns a jsonrpc_session that is initially connected to
803  * 'jsonrpc'.  If the connection is dropped, it will not be reconnected.
804  *
805  * On the assumption that such connections are likely to be short-lived
806  * (e.g. from ovs-vsctl), informational logging for them is suppressed. */
807 struct jsonrpc_session *
808 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
809 {
810     struct jsonrpc_session *s;
811
812     s = xmalloc(sizeof *s);
813     s->reconnect = reconnect_create(time_msec());
814     reconnect_set_quiet(s->reconnect, true);
815     reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
816     reconnect_set_max_tries(s->reconnect, 0);
817     reconnect_connected(s->reconnect, time_msec());
818     s->dscp = dscp;
819     s->rpc = jsonrpc;
820     s->stream = NULL;
821     s->pstream = NULL;
822     s->seqno = 0;
823
824     return s;
825 }
826
827 void
828 jsonrpc_session_close(struct jsonrpc_session *s)
829 {
830     if (s) {
831         jsonrpc_close(s->rpc);
832         reconnect_destroy(s->reconnect);
833         stream_close(s->stream);
834         pstream_close(s->pstream);
835         free(s);
836     }
837 }
838
839 static void
840 jsonrpc_session_disconnect(struct jsonrpc_session *s)
841 {
842     if (s->rpc) {
843         jsonrpc_error(s->rpc, EOF);
844         jsonrpc_close(s->rpc);
845         s->rpc = NULL;
846         s->seqno++;
847     } else if (s->stream) {
848         stream_close(s->stream);
849         s->stream = NULL;
850         s->seqno++;
851     }
852 }
853
854 static void
855 jsonrpc_session_connect(struct jsonrpc_session *s)
856 {
857     const char *name = reconnect_get_name(s->reconnect);
858     int error;
859
860     jsonrpc_session_disconnect(s);
861     if (!reconnect_is_passive(s->reconnect)) {
862         error = jsonrpc_stream_open(name, &s->stream, s->dscp);
863         if (!error) {
864             reconnect_connecting(s->reconnect, time_msec());
865         } else {
866             s->last_error = error;
867         }
868     } else {
869         error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream,
870                                                       s->dscp);
871         if (!error) {
872             reconnect_listening(s->reconnect, time_msec());
873         }
874     }
875
876     if (error) {
877         reconnect_connect_failed(s->reconnect, time_msec(), error);
878     }
879     s->seqno++;
880 }
881
882 void
883 jsonrpc_session_run(struct jsonrpc_session *s)
884 {
885     if (s->pstream) {
886         struct stream *stream;
887         int error;
888
889         error = pstream_accept(s->pstream, &stream);
890         if (!error) {
891             if (s->rpc || s->stream) {
892                 VLOG_INFO_RL(&rl,
893                              "%s: new connection replacing active connection",
894                              reconnect_get_name(s->reconnect));
895                 jsonrpc_session_disconnect(s);
896             }
897             reconnect_connected(s->reconnect, time_msec());
898             s->rpc = jsonrpc_open(stream);
899         } else if (error != EAGAIN) {
900             reconnect_listen_error(s->reconnect, time_msec(), error);
901             pstream_close(s->pstream);
902             s->pstream = NULL;
903         }
904     }
905
906     if (s->rpc) {
907         size_t backlog;
908         int error;
909
910         backlog = jsonrpc_get_backlog(s->rpc);
911         jsonrpc_run(s->rpc);
912         if (jsonrpc_get_backlog(s->rpc) < backlog) {
913             /* Data previously caught in a queue was successfully sent (or
914              * there's an error, which we'll catch below.)
915              *
916              * We don't count data that is successfully sent immediately as
917              * activity, because there's a lot of queuing downstream from us,
918              * which means that we can push a lot of data into a connection
919              * that has stalled and won't ever recover.
920              */
921             reconnect_activity(s->reconnect, time_msec());
922         }
923
924         error = jsonrpc_get_status(s->rpc);
925         if (error) {
926             reconnect_disconnected(s->reconnect, time_msec(), error);
927             jsonrpc_session_disconnect(s);
928             s->last_error = error;
929         }
930     } else if (s->stream) {
931         int error;
932
933         stream_run(s->stream);
934         error = stream_connect(s->stream);
935         if (!error) {
936             reconnect_connected(s->reconnect, time_msec());
937             s->rpc = jsonrpc_open(s->stream);
938             s->stream = NULL;
939         } else if (error != EAGAIN) {
940             reconnect_connect_failed(s->reconnect, time_msec(), error);
941             stream_close(s->stream);
942             s->stream = NULL;
943         }
944     }
945
946     switch (reconnect_run(s->reconnect, time_msec())) {
947     case RECONNECT_CONNECT:
948         jsonrpc_session_connect(s);
949         break;
950
951     case RECONNECT_DISCONNECT:
952         reconnect_disconnected(s->reconnect, time_msec(), 0);
953         jsonrpc_session_disconnect(s);
954         break;
955
956     case RECONNECT_PROBE:
957         if (s->rpc) {
958             struct json *params;
959             struct jsonrpc_msg *request;
960
961             params = json_array_create_empty();
962             request = jsonrpc_create_request("echo", params, NULL);
963             json_destroy(request->id);
964             request->id = json_string_create("echo");
965             jsonrpc_send(s->rpc, request);
966         }
967         break;
968     }
969 }
970
971 void
972 jsonrpc_session_wait(struct jsonrpc_session *s)
973 {
974     if (s->rpc) {
975         jsonrpc_wait(s->rpc);
976     } else if (s->stream) {
977         stream_run_wait(s->stream);
978         stream_connect_wait(s->stream);
979     }
980     if (s->pstream) {
981         pstream_wait(s->pstream);
982     }
983     reconnect_wait(s->reconnect, time_msec());
984 }
985
986 size_t
987 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
988 {
989     return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
990 }
991
992 /* Always returns a pointer to a valid C string, assuming 's' was initialized
993  * correctly. */
994 const char *
995 jsonrpc_session_get_name(const struct jsonrpc_session *s)
996 {
997     return reconnect_get_name(s->reconnect);
998 }
999
1000 /* Always takes ownership of 'msg', regardless of success. */
1001 int
1002 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
1003 {
1004     if (s->rpc) {
1005         return jsonrpc_send(s->rpc, msg);
1006     } else {
1007         jsonrpc_msg_destroy(msg);
1008         return ENOTCONN;
1009     }
1010 }
1011
1012 struct jsonrpc_msg *
1013 jsonrpc_session_recv(struct jsonrpc_session *s)
1014 {
1015     if (s->rpc) {
1016         unsigned int received_bytes;
1017         struct jsonrpc_msg *msg;
1018
1019         received_bytes = jsonrpc_get_received_bytes(s->rpc);
1020         jsonrpc_recv(s->rpc, &msg);
1021         if (received_bytes != jsonrpc_get_received_bytes(s->rpc)) {
1022             /* Data was successfully received.
1023              *
1024              * Previously we only counted receiving a full message as activity,
1025              * but with large messages or a slow connection that policy could
1026              * time out the session mid-message. */
1027             reconnect_activity(s->reconnect, time_msec());
1028         }
1029
1030         if (msg) {
1031             if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
1032                 /* Echo request.  Send reply. */
1033                 struct jsonrpc_msg *reply;
1034
1035                 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
1036                 jsonrpc_session_send(s, reply);
1037             } else if (msg->type == JSONRPC_REPLY
1038                        && msg->id && msg->id->type == JSON_STRING
1039                        && !strcmp(msg->id->u.string, "echo")) {
1040                 /* It's a reply to our echo request.  Suppress it. */
1041             } else {
1042                 return msg;
1043             }
1044             jsonrpc_msg_destroy(msg);
1045         }
1046     }
1047     return NULL;
1048 }
1049
1050 void
1051 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
1052 {
1053     if (s->rpc) {
1054         jsonrpc_recv_wait(s->rpc);
1055     }
1056 }
1057
1058 bool
1059 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
1060 {
1061     return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
1062 }
1063
1064 bool
1065 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
1066 {
1067     return s->rpc != NULL;
1068 }
1069
1070 unsigned int
1071 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
1072 {
1073     return s->seqno;
1074 }
1075
1076 int
1077 jsonrpc_session_get_status(const struct jsonrpc_session *s)
1078 {
1079     return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0;
1080 }
1081
1082 int
1083 jsonrpc_session_get_last_error(const struct jsonrpc_session *s)
1084 {
1085     return s->last_error;
1086 }
1087
1088 void
1089 jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s,
1090                                     struct reconnect_stats *stats)
1091 {
1092     reconnect_get_stats(s->reconnect, time_msec(), stats);
1093 }
1094
1095 void
1096 jsonrpc_session_enable_reconnect(struct jsonrpc_session *s)
1097 {
1098     reconnect_set_max_tries(s->reconnect, UINT_MAX);
1099     reconnect_set_backoff(s->reconnect, RECONNECT_DEFAULT_MIN_BACKOFF,
1100                           RECONNECT_DEFAULT_MAX_BACKOFF);
1101 }
1102
1103 void
1104 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
1105 {
1106     reconnect_force_reconnect(s->reconnect, time_msec());
1107 }
1108
1109 void
1110 jsonrpc_session_set_max_backoff(struct jsonrpc_session *s, int max_backoff)
1111 {
1112     reconnect_set_backoff(s->reconnect, 0, max_backoff);
1113 }
1114
1115 void
1116 jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
1117                                    int probe_interval)
1118 {
1119     reconnect_set_probe_interval(s->reconnect, probe_interval);
1120 }
1121
1122 void
1123 jsonrpc_session_set_dscp(struct jsonrpc_session *s,
1124                          uint8_t dscp)
1125 {
1126     if (s->dscp != dscp) {
1127         if (s->pstream) {
1128             int error;
1129
1130             error = pstream_set_dscp(s->pstream, dscp);
1131             if (error) {
1132                 VLOG_ERR("%s: failed set_dscp %s",
1133                          reconnect_get_name(s->reconnect),
1134                          ovs_strerror(error));
1135             }
1136             /*
1137              * XXX race window between setting dscp to listening socket
1138              * and accepting socket. accepted socket may have old dscp value.
1139              * Ignore this race window for now.
1140              */
1141         }
1142         s->dscp = dscp;
1143         jsonrpc_session_force_reconnect(s);
1144     }
1145 }