ovsdb: Use port 6632 as a default port for database connections.
[sliver-openvswitch.git] / lib / jsonrpc.c
1 /*
2  * Copyright (c) 2009, 2010 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include "jsonrpc.h"
20
21 #include <assert.h>
22 #include <errno.h>
23
24 #include "byteq.h"
25 #include "dynamic-string.h"
26 #include "json.h"
27 #include "list.h"
28 #include "ofpbuf.h"
29 #include "poll-loop.h"
30 #include "queue.h"
31 #include "reconnect.h"
32 #include "stream.h"
33 #include "timeval.h"
34
35 #define THIS_MODULE VLM_jsonrpc
36 #include "vlog.h"
37 \f
38 struct jsonrpc {
39     struct stream *stream;
40     char *name;
41     int status;
42
43     /* Input. */
44     struct byteq input;
45     struct json_parser *parser;
46     struct jsonrpc_msg *received;
47
48     /* Output. */
49     struct ovs_queue output;
50     size_t backlog;
51 };
52
53 /* Rate limit for error messages. */
54 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
55
56 static void jsonrpc_received(struct jsonrpc *);
57 static void jsonrpc_cleanup(struct jsonrpc *);
58
59 /* This is just the same as stream_open() except that it uses the default
60  * JSONRPC ports if none is specified. */
61 int
62 jsonrpc_stream_open(const char *name, struct stream **streamp)
63 {
64     return stream_open_with_default_ports(name, JSONRPC_TCP_PORT,
65                                           JSONRPC_SSL_PORT, streamp);
66 }
67
68 /* This is just the same as pstream_open() except that it uses the default
69  * JSONRPC ports if none is specified. */
70 int
71 jsonrpc_pstream_open(const char *name, struct pstream **pstreamp)
72 {
73     return pstream_open_with_default_ports(name, JSONRPC_TCP_PORT,
74                                            JSONRPC_SSL_PORT, pstreamp);
75 }
76
77 struct jsonrpc *
78 jsonrpc_open(struct stream *stream)
79 {
80     struct jsonrpc *rpc;
81
82     assert(stream != NULL);
83
84     rpc = xzalloc(sizeof *rpc);
85     rpc->name = xstrdup(stream_get_name(stream));
86     rpc->stream = stream;
87     byteq_init(&rpc->input);
88     queue_init(&rpc->output);
89
90     return rpc;
91 }
92
93 void
94 jsonrpc_close(struct jsonrpc *rpc)
95 {
96     if (rpc) {
97         jsonrpc_cleanup(rpc);
98         free(rpc->name);
99         free(rpc);
100     }
101 }
102
103 void
104 jsonrpc_run(struct jsonrpc *rpc)
105 {
106     if (rpc->status) {
107         return;
108     }
109
110     stream_run(rpc->stream);
111     while (!queue_is_empty(&rpc->output)) {
112         struct ofpbuf *buf = rpc->output.head;
113         int retval;
114
115         retval = stream_send(rpc->stream, buf->data, buf->size);
116         if (retval >= 0) {
117             rpc->backlog -= retval;
118             ofpbuf_pull(buf, retval);
119             if (!buf->size) {
120                 ofpbuf_delete(queue_pop_head(&rpc->output));
121             }
122         } else {
123             if (retval != -EAGAIN) {
124                 VLOG_WARN_RL(&rl, "%s: send error: %s",
125                              rpc->name, strerror(-retval));
126                 jsonrpc_error(rpc, -retval);
127             }
128             break;
129         }
130     }
131 }
132
133 void
134 jsonrpc_wait(struct jsonrpc *rpc)
135 {
136     if (!rpc->status) {
137         stream_run_wait(rpc->stream);
138         if (!queue_is_empty(&rpc->output)) {
139             stream_send_wait(rpc->stream);
140         }
141     }
142 }
143
144 int
145 jsonrpc_get_status(const struct jsonrpc *rpc)
146 {
147     return rpc->status;
148 }
149
150 size_t
151 jsonrpc_get_backlog(const struct jsonrpc *rpc)
152 {
153     return rpc->status ? 0 : rpc->backlog;
154 }
155
156 const char *
157 jsonrpc_get_name(const struct jsonrpc *rpc)
158 {
159     return rpc->name;
160 }
161
162 static void
163 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
164                 const struct jsonrpc_msg *msg)
165 {
166     if (VLOG_IS_DBG_ENABLED()) {
167         struct ds s = DS_EMPTY_INITIALIZER;
168         if (msg->method) {
169             ds_put_format(&s, ", method=\"%s\"", msg->method);
170         }
171         if (msg->params) {
172             ds_put_cstr(&s, ", params=");
173             json_to_ds(msg->params, 0, &s);
174         }
175         if (msg->result) {
176             ds_put_cstr(&s, ", result=");
177             json_to_ds(msg->result, 0, &s);
178         }
179         if (msg->error) {
180             ds_put_cstr(&s, ", error=");
181             json_to_ds(msg->error, 0, &s);
182         }
183         if (msg->id) {
184             ds_put_cstr(&s, ", id=");
185             json_to_ds(msg->id, 0, &s);
186         }
187         VLOG_DBG("%s: %s %s%s", rpc->name, title,
188                  jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
189         ds_destroy(&s);
190     }
191 }
192
193 /* Always takes ownership of 'msg', regardless of success. */
194 int
195 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
196 {
197     struct ofpbuf *buf;
198     struct json *json;
199     size_t length;
200     char *s;
201
202     if (rpc->status) {
203         jsonrpc_msg_destroy(msg);
204         return rpc->status;
205     }
206
207     jsonrpc_log_msg(rpc, "send", msg);
208
209     json = jsonrpc_msg_to_json(msg);
210     s = json_to_string(json, 0);
211     length = strlen(s);
212     json_destroy(json);
213
214     buf = xmalloc(sizeof *buf);
215     ofpbuf_use(buf, s, length);
216     buf->size = length;
217     queue_push_tail(&rpc->output, buf);
218     rpc->backlog += length;
219
220     if (rpc->output.n == 1) {
221         jsonrpc_run(rpc);
222     }
223     return rpc->status;
224 }
225
226 int
227 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
228 {
229     *msgp = NULL;
230     if (rpc->status) {
231         return rpc->status;
232     }
233
234     while (!rpc->received) {
235         if (byteq_is_empty(&rpc->input)) {
236             size_t chunk;
237             int retval;
238
239             chunk = byteq_headroom(&rpc->input);
240             retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
241             if (retval < 0) {
242                 if (retval == -EAGAIN) {
243                     return EAGAIN;
244                 } else {
245                     VLOG_WARN_RL(&rl, "%s: receive error: %s",
246                                  rpc->name, strerror(-retval));
247                     jsonrpc_error(rpc, -retval);
248                     return rpc->status;
249                 }
250             } else if (retval == 0) {
251                 VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
252                 jsonrpc_error(rpc, EOF);
253                 return EOF;
254             }
255             byteq_advance_head(&rpc->input, retval);
256         } else {
257             size_t n, used;
258
259             if (!rpc->parser) {
260                 rpc->parser = json_parser_create(0);
261             }
262             n = byteq_tailroom(&rpc->input);
263             used = json_parser_feed(rpc->parser,
264                                     (char *) byteq_tail(&rpc->input), n);
265             byteq_advance_tail(&rpc->input, used);
266             if (json_parser_is_done(rpc->parser)) {
267                 jsonrpc_received(rpc);
268                 if (rpc->status) {
269                     return rpc->status;
270                 }
271             }
272         }
273     }
274
275     *msgp = rpc->received;
276     rpc->received = NULL;
277     return 0;
278 }
279
280 void
281 jsonrpc_recv_wait(struct jsonrpc *rpc)
282 {
283     if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
284         poll_immediate_wake();
285     } else {
286         stream_recv_wait(rpc->stream);
287     }
288 }
289
290 /* Always takes ownership of 'msg', regardless of success. */
291 int
292 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
293 {
294     int error;
295
296     error = jsonrpc_send(rpc, msg);
297     if (error) {
298         return error;
299     }
300
301     for (;;) {
302         jsonrpc_run(rpc);
303         if (queue_is_empty(&rpc->output) || rpc->status) {
304             return rpc->status;
305         }
306         jsonrpc_wait(rpc);
307         poll_block();
308     }
309 }
310
311 int
312 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
313 {
314     for (;;) {
315         int error = jsonrpc_recv(rpc, msgp);
316         if (error != EAGAIN) {
317             return error;
318         }
319
320         jsonrpc_run(rpc);
321         jsonrpc_wait(rpc);
322         jsonrpc_recv_wait(rpc);
323         poll_block();
324     }
325 }
326
327 /* Always takes ownership of 'request', regardless of success. */
328 int
329 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
330                        struct jsonrpc_msg **replyp)
331 {
332     struct jsonrpc_msg *reply = NULL;
333     struct json *id;
334     int error;
335
336     id = json_clone(request->id);
337     error = jsonrpc_send_block(rpc, request);
338     if (!error) {
339         for (;;) {
340             error = jsonrpc_recv_block(rpc, &reply);
341             if (error
342                 || (reply->type == JSONRPC_REPLY
343                     && json_equal(id, reply->id))) {
344                 break;
345             }
346             jsonrpc_msg_destroy(reply);
347         }
348     }
349     *replyp = error ? NULL : reply;
350     json_destroy(id);
351     return error;
352 }
353
354 static void
355 jsonrpc_received(struct jsonrpc *rpc)
356 {
357     struct jsonrpc_msg *msg;
358     struct json *json;
359     char *error;
360
361     json = json_parser_finish(rpc->parser);
362     rpc->parser = NULL;
363     if (json->type == JSON_STRING) {
364         VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
365                      rpc->name, json_string(json));
366         jsonrpc_error(rpc, EPROTO);
367         json_destroy(json);
368         return;
369     }
370
371     error = jsonrpc_msg_from_json(json, &msg);
372     if (error) {
373         VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
374                      rpc->name, error);
375         free(error);
376         jsonrpc_error(rpc, EPROTO);
377         return;
378     }
379
380     jsonrpc_log_msg(rpc, "received", msg);
381     rpc->received = msg;
382 }
383
384 void
385 jsonrpc_error(struct jsonrpc *rpc, int error)
386 {
387     assert(error);
388     if (!rpc->status) {
389         rpc->status = error;
390         jsonrpc_cleanup(rpc);
391     }
392 }
393
394 static void
395 jsonrpc_cleanup(struct jsonrpc *rpc)
396 {
397     stream_close(rpc->stream);
398     rpc->stream = NULL;
399
400     json_parser_abort(rpc->parser);
401     rpc->parser = NULL;
402
403     jsonrpc_msg_destroy(rpc->received);
404     rpc->received = NULL;
405
406     queue_clear(&rpc->output);
407     rpc->backlog = 0;
408 }
409 \f
410 static struct jsonrpc_msg *
411 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
412                 struct json *params, struct json *result, struct json *error,
413                 struct json *id)
414 {
415     struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
416     msg->type = type;
417     msg->method = method ? xstrdup(method) : NULL;
418     msg->params = params;
419     msg->result = result;
420     msg->error = error;
421     msg->id = id;
422     return msg;
423 }
424
425 static struct json *
426 jsonrpc_create_id(void)
427 {
428     static unsigned int id;
429     return json_integer_create(id++);
430 }
431
432 struct jsonrpc_msg *
433 jsonrpc_create_request(const char *method, struct json *params,
434                        struct json **idp)
435 {
436     struct json *id = jsonrpc_create_id();
437     if (idp) {
438         *idp = json_clone(id);
439     }
440     return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
441 }
442
443 struct jsonrpc_msg *
444 jsonrpc_create_notify(const char *method, struct json *params)
445 {
446     return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
447 }
448
449 struct jsonrpc_msg *
450 jsonrpc_create_reply(struct json *result, const struct json *id)
451 {
452     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
453                            json_clone(id));
454 }
455
456 struct jsonrpc_msg *
457 jsonrpc_create_error(struct json *error, const struct json *id)
458 {
459     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
460                            json_clone(id));
461 }
462
463 const char *
464 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
465 {
466     switch (type) {
467     case JSONRPC_REQUEST:
468         return "request";
469
470     case JSONRPC_NOTIFY:
471         return "notification";
472
473     case JSONRPC_REPLY:
474         return "reply";
475
476     case JSONRPC_ERROR:
477         return "error";
478     }
479     return "(null)";
480 }
481
482 char *
483 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
484 {
485     const char *type_name;
486     unsigned int pattern;
487
488     if (m->params && m->params->type != JSON_ARRAY) {
489         return xstrdup("\"params\" must be JSON array");
490     }
491
492     switch (m->type) {
493     case JSONRPC_REQUEST:
494         pattern = 0x11001;
495         break;
496
497     case JSONRPC_NOTIFY:
498         pattern = 0x11000;
499         break;
500
501     case JSONRPC_REPLY:
502         pattern = 0x00101;
503         break;
504
505     case JSONRPC_ERROR:
506         pattern = 0x00011;
507         break;
508
509     default:
510         return xasprintf("invalid JSON-RPC message type %d", m->type);
511     }
512
513     type_name = jsonrpc_msg_type_to_string(m->type);
514     if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
515         return xasprintf("%s must%s have \"method\"",
516                          type_name, (pattern & 0x10000) ? "" : " not");
517
518     }
519     if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
520         return xasprintf("%s must%s have \"params\"",
521                          type_name, (pattern & 0x1000) ? "" : " not");
522
523     }
524     if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
525         return xasprintf("%s must%s have \"result\"",
526                          type_name, (pattern & 0x100) ? "" : " not");
527
528     }
529     if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
530         return xasprintf("%s must%s have \"error\"",
531                          type_name, (pattern & 0x10) ? "" : " not");
532
533     }
534     if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
535         return xasprintf("%s must%s have \"id\"",
536                          type_name, (pattern & 0x1) ? "" : " not");
537
538     }
539     return NULL;
540 }
541
542 void
543 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
544 {
545     if (m) {
546         free(m->method);
547         json_destroy(m->params);
548         json_destroy(m->result);
549         json_destroy(m->error);
550         json_destroy(m->id);
551         free(m);
552     }
553 }
554
555 static struct json *
556 null_from_json_null(struct json *json)
557 {
558     if (json && json->type == JSON_NULL) {
559         json_destroy(json);
560         return NULL;
561     }
562     return json;
563 }
564
565 char *
566 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
567 {
568     struct json *method = NULL;
569     struct jsonrpc_msg *msg = NULL;
570     struct shash *object;
571     char *error;
572
573     if (json->type != JSON_OBJECT) {
574         error = xstrdup("message is not a JSON object");
575         goto exit;
576     }
577     object = json_object(json);
578
579     method = shash_find_and_delete(object, "method");
580     if (method && method->type != JSON_STRING) {
581         error = xstrdup("method is not a JSON string");
582         goto exit;
583     }
584
585     msg = xzalloc(sizeof *msg);
586     msg->method = method ? xstrdup(method->u.string) : NULL;
587     msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
588     msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
589     msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
590     msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
591     msg->type = (msg->result ? JSONRPC_REPLY
592                  : msg->error ? JSONRPC_ERROR
593                  : msg->id ? JSONRPC_REQUEST
594                  : JSONRPC_NOTIFY);
595     if (!shash_is_empty(object)) {
596         error = xasprintf("message has unexpected member \"%s\"",
597                           shash_first(object)->name);
598         goto exit;
599     }
600     error = jsonrpc_msg_is_valid(msg);
601     if (error) {
602         goto exit;
603     }
604
605 exit:
606     json_destroy(method);
607     json_destroy(json);
608     if (error) {
609         jsonrpc_msg_destroy(msg);
610         msg = NULL;
611     }
612     *msgp = msg;
613     return error;
614 }
615
616 struct json *
617 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
618 {
619     struct json *json = json_object_create();
620
621     if (m->method) {
622         json_object_put(json, "method", json_string_create_nocopy(m->method));
623     }
624
625     if (m->params) {
626         json_object_put(json, "params", m->params);
627     }
628
629     if (m->result) {
630         json_object_put(json, "result", m->result);
631     } else if (m->type == JSONRPC_ERROR) {
632         json_object_put(json, "result", json_null_create());
633     }
634
635     if (m->error) {
636         json_object_put(json, "error", m->error);
637     } else if (m->type == JSONRPC_REPLY) {
638         json_object_put(json, "error", json_null_create());
639     }
640
641     if (m->id) {
642         json_object_put(json, "id", m->id);
643     } else if (m->type == JSONRPC_NOTIFY) {
644         json_object_put(json, "id", json_null_create());
645     }
646
647     free(m);
648
649     return json;
650 }
651 \f
652 /* A JSON-RPC session with reconnection. */
653
654 struct jsonrpc_session {
655     struct reconnect *reconnect;
656     struct jsonrpc *rpc;
657     struct stream *stream;
658     struct pstream *pstream;
659     unsigned int seqno;
660 };
661
662 /* Creates and returns a jsonrpc_session to 'name', which should be a string
663  * acceptable to stream_open() or pstream_open().
664  *
665  * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
666  * jsonrpc_session connects and reconnects, with back-off, to 'name'.
667  *
668  * If 'name' is a passive connection method, e.g. "ptcp:", the new
669  * jsonrpc_session listens for connections to 'name'.  It maintains at most one
670  * connection at any given time.  Any new connection causes the previous one
671  * (if any) to be dropped. */
672 struct jsonrpc_session *
673 jsonrpc_session_open(const char *name)
674 {
675     struct jsonrpc_session *s;
676
677     s = xmalloc(sizeof *s);
678     s->reconnect = reconnect_create(time_msec());
679     reconnect_set_name(s->reconnect, name);
680     reconnect_enable(s->reconnect, time_msec());
681     s->rpc = NULL;
682     s->stream = NULL;
683     s->pstream = NULL;
684     s->seqno = 0;
685
686     if (!pstream_verify_name(name)) {
687         reconnect_set_passive(s->reconnect, true, time_msec());
688     }
689
690     return s;
691 }
692
693 /* Creates and returns a jsonrpc_session that is initially connected to
694  * 'jsonrpc'.  If the connection is dropped, it will not be reconnected. */
695 struct jsonrpc_session *
696 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
697 {
698     struct jsonrpc_session *s;
699
700     s = xmalloc(sizeof *s);
701     s->reconnect = reconnect_create(time_msec());
702     reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
703     reconnect_set_max_tries(s->reconnect, 0);
704     reconnect_connected(s->reconnect, time_msec());
705     s->rpc = jsonrpc;
706     s->stream = NULL;
707     s->pstream = NULL;
708     s->seqno = 0;
709
710     return s;
711 }
712
713 void
714 jsonrpc_session_close(struct jsonrpc_session *s)
715 {
716     if (s) {
717         jsonrpc_close(s->rpc);
718         reconnect_destroy(s->reconnect);
719         stream_close(s->stream);
720         pstream_close(s->pstream);
721         free(s);
722     }
723 }
724
725 static void
726 jsonrpc_session_disconnect(struct jsonrpc_session *s)
727 {
728     if (s->rpc) {
729         jsonrpc_error(s->rpc, EOF);
730         jsonrpc_close(s->rpc);
731         s->rpc = NULL;
732         s->seqno++;
733     } else if (s->stream) {
734         stream_close(s->stream);
735         s->stream = NULL;
736         s->seqno++;
737     }
738 }
739
740 static void
741 jsonrpc_session_connect(struct jsonrpc_session *s)
742 {
743     const char *name = reconnect_get_name(s->reconnect);
744     int error;
745
746     jsonrpc_session_disconnect(s);
747     if (!reconnect_is_passive(s->reconnect)) {
748         error = jsonrpc_stream_open(name, &s->stream);
749         if (!error) {
750             reconnect_connecting(s->reconnect, time_msec());
751         }
752     } else {
753         error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream);
754         if (!error) {
755             reconnect_listening(s->reconnect, time_msec());
756         }
757     }
758
759     if (error) {
760         reconnect_connect_failed(s->reconnect, time_msec(), error);
761     }
762     s->seqno++;
763 }
764
765 void
766 jsonrpc_session_run(struct jsonrpc_session *s)
767 {
768     if (s->pstream) {
769         struct stream *stream;
770         int error;
771
772         error = pstream_accept(s->pstream, &stream);
773         if (!error) {
774             if (s->rpc || s->stream) {
775                 VLOG_INFO_RL(&rl,
776                              "%s: new connection replacing active connection",
777                              reconnect_get_name(s->reconnect));
778                 jsonrpc_session_disconnect(s);
779             }
780             reconnect_connected(s->reconnect, time_msec());
781             s->rpc = jsonrpc_open(stream);
782         } else if (error != EAGAIN) {
783             reconnect_listen_error(s->reconnect, time_msec(), error);
784             pstream_close(s->pstream);
785             s->pstream = NULL;
786         }
787     }
788
789     if (s->rpc) {
790         int error;
791
792         jsonrpc_run(s->rpc);
793         error = jsonrpc_get_status(s->rpc);
794         if (error) {
795             reconnect_disconnected(s->reconnect, time_msec(), 0);
796             jsonrpc_session_disconnect(s);
797         }
798     } else if (s->stream) {
799         int error;
800
801         stream_run(s->stream);
802         error = stream_connect(s->stream);
803         if (!error) {
804             reconnect_connected(s->reconnect, time_msec());
805             s->rpc = jsonrpc_open(s->stream);
806             s->stream = NULL;
807         } else if (error != EAGAIN) {
808             reconnect_connect_failed(s->reconnect, time_msec(), error);
809             stream_close(s->stream);
810             s->stream = NULL;
811         }
812     }
813
814     switch (reconnect_run(s->reconnect, time_msec())) {
815     case RECONNECT_CONNECT:
816         jsonrpc_session_connect(s);
817         break;
818
819     case RECONNECT_DISCONNECT:
820         reconnect_disconnected(s->reconnect, time_msec(), 0);
821         jsonrpc_session_disconnect(s);
822         break;
823
824     case RECONNECT_PROBE:
825         if (s->rpc) {
826             struct json *params;
827             struct jsonrpc_msg *request;
828
829             params = json_array_create_empty();
830             request = jsonrpc_create_request("echo", params, NULL);
831             json_destroy(request->id);
832             request->id = json_string_create("echo");
833             jsonrpc_send(s->rpc, request);
834         }
835         break;
836     }
837 }
838
839 void
840 jsonrpc_session_wait(struct jsonrpc_session *s)
841 {
842     if (s->rpc) {
843         jsonrpc_wait(s->rpc);
844     } else if (s->stream) {
845         stream_run_wait(s->stream);
846         stream_connect_wait(s->stream);
847     }
848     if (s->pstream) {
849         pstream_wait(s->pstream);
850     }
851     reconnect_wait(s->reconnect, time_msec());
852 }
853
854 size_t
855 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
856 {
857     return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
858 }
859
860 const char *
861 jsonrpc_session_get_name(const struct jsonrpc_session *s)
862 {
863     return reconnect_get_name(s->reconnect);
864 }
865
866 /* Always takes ownership of 'msg', regardless of success. */
867 int
868 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
869 {
870     if (s->rpc) {
871         return jsonrpc_send(s->rpc, msg);
872     } else {
873         jsonrpc_msg_destroy(msg);
874         return ENOTCONN;
875     }
876 }
877
878 struct jsonrpc_msg *
879 jsonrpc_session_recv(struct jsonrpc_session *s)
880 {
881     if (s->rpc) {
882         struct jsonrpc_msg *msg;
883         jsonrpc_recv(s->rpc, &msg);
884         if (msg) {
885             reconnect_received(s->reconnect, time_msec());
886             if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
887                 /* Echo request.  Send reply. */
888                 struct jsonrpc_msg *reply;
889
890                 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
891                 jsonrpc_session_send(s, reply);
892             } else if (msg->type == JSONRPC_REPLY
893                 && msg->id && msg->id->type == JSON_STRING
894                 && !strcmp(msg->id->u.string, "echo")) {
895                 /* It's a reply to our echo request.  Suppress it. */
896             } else {
897                 return msg;
898             }
899             jsonrpc_msg_destroy(msg);
900         }
901     }
902     return NULL;
903 }
904
905 void
906 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
907 {
908     if (s->rpc) {
909         jsonrpc_recv_wait(s->rpc);
910     }
911 }
912
913 bool
914 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
915 {
916     return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
917 }
918
919 bool
920 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
921 {
922     return s->rpc != NULL;
923 }
924
925 unsigned int
926 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
927 {
928     return s->seqno;
929 }
930
931 void
932 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
933 {
934     reconnect_force_reconnect(s->reconnect, time_msec());
935 }