stream: Add stream_run(), stream_run_wait() functions.
[sliver-openvswitch.git] / lib / jsonrpc.c
1 /*
2  * Copyright (c) 2009, 2010 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include "jsonrpc.h"
20
21 #include <errno.h>
22
23 #include "byteq.h"
24 #include "dynamic-string.h"
25 #include "json.h"
26 #include "list.h"
27 #include "ofpbuf.h"
28 #include "poll-loop.h"
29 #include "queue.h"
30 #include "reconnect.h"
31 #include "stream.h"
32 #include "timeval.h"
33
34 #define THIS_MODULE VLM_jsonrpc
35 #include "vlog.h"
36 \f
37 struct jsonrpc {
38     struct stream *stream;
39     char *name;
40     int status;
41
42     /* Input. */
43     struct byteq input;
44     struct json_parser *parser;
45     struct jsonrpc_msg *received;
46
47     /* Output. */
48     struct ovs_queue output;
49     size_t backlog;
50 };
51
52 /* Rate limit for error messages. */
53 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
54
55 static void jsonrpc_received(struct jsonrpc *);
56 static void jsonrpc_cleanup(struct jsonrpc *);
57
58 struct jsonrpc *
59 jsonrpc_open(struct stream *stream)
60 {
61     struct jsonrpc *rpc;
62
63     assert(stream != NULL);
64
65     rpc = xzalloc(sizeof *rpc);
66     rpc->name = xstrdup(stream_get_name(stream));
67     rpc->stream = stream;
68     byteq_init(&rpc->input);
69     queue_init(&rpc->output);
70
71     return rpc;
72 }
73
74 void
75 jsonrpc_close(struct jsonrpc *rpc)
76 {
77     if (rpc) {
78         jsonrpc_cleanup(rpc);
79         free(rpc->name);
80         free(rpc);
81     }
82 }
83
84 void
85 jsonrpc_run(struct jsonrpc *rpc)
86 {
87     if (rpc->status) {
88         return;
89     }
90
91     stream_run(rpc->stream);
92     while (!queue_is_empty(&rpc->output)) {
93         struct ofpbuf *buf = rpc->output.head;
94         int retval;
95
96         retval = stream_send(rpc->stream, buf->data, buf->size);
97         if (retval >= 0) {
98             rpc->backlog -= retval;
99             ofpbuf_pull(buf, retval);
100             if (!buf->size) {
101                 ofpbuf_delete(queue_pop_head(&rpc->output));
102             }
103         } else {
104             if (retval != -EAGAIN) {
105                 VLOG_WARN_RL(&rl, "%s: send error: %s",
106                              rpc->name, strerror(-retval));
107                 jsonrpc_error(rpc, -retval);
108             }
109             break;
110         }
111     }
112 }
113
114 void
115 jsonrpc_wait(struct jsonrpc *rpc)
116 {
117     if (!rpc->status) {
118         stream_run_wait(rpc->stream);
119         if (!queue_is_empty(&rpc->output)) {
120             stream_send_wait(rpc->stream);
121         }
122     }
123 }
124
125 int
126 jsonrpc_get_status(const struct jsonrpc *rpc)
127 {
128     return rpc->status;
129 }
130
131 size_t
132 jsonrpc_get_backlog(const struct jsonrpc *rpc)
133 {
134     return rpc->status ? 0 : rpc->backlog;
135 }
136
137 const char *
138 jsonrpc_get_name(const struct jsonrpc *rpc)
139 {
140     return rpc->name;
141 }
142
143 static void
144 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
145                 const struct jsonrpc_msg *msg)
146 {
147     if (VLOG_IS_DBG_ENABLED()) {
148         struct ds s = DS_EMPTY_INITIALIZER;
149         if (msg->method) {
150             ds_put_format(&s, ", method=\"%s\"", msg->method);
151         }
152         if (msg->params) {
153             ds_put_cstr(&s, ", params=");
154             ds_put_and_free_cstr(&s, json_to_string(msg->params, 0));
155         }
156         if (msg->result) {
157             ds_put_cstr(&s, ", result=");
158             ds_put_and_free_cstr(&s, json_to_string(msg->result, 0));
159         }
160         if (msg->error) {
161             ds_put_cstr(&s, ", error=");
162             ds_put_and_free_cstr(&s, json_to_string(msg->error, 0));
163         }
164         if (msg->id) {
165             ds_put_cstr(&s, ", id=");
166             ds_put_and_free_cstr(&s, json_to_string(msg->id, 0));
167         }
168         VLOG_DBG("%s: %s %s%s", rpc->name, title,
169                  jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
170         ds_destroy(&s);
171     }
172 }
173
174 int
175 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
176 {
177     struct ofpbuf *buf;
178     struct json *json;
179     size_t length;
180     char *s;
181
182     if (rpc->status) {
183         jsonrpc_msg_destroy(msg);
184         return rpc->status;
185     }
186
187     jsonrpc_log_msg(rpc, "send", msg);
188
189     json = jsonrpc_msg_to_json(msg);
190     s = json_to_string(json, 0);
191     length = strlen(s);
192     json_destroy(json);
193
194     buf = xmalloc(sizeof *buf);
195     ofpbuf_use(buf, s, length);
196     buf->size = length;
197     queue_push_tail(&rpc->output, buf);
198     rpc->backlog += length;
199
200     if (rpc->output.n == 1) {
201         jsonrpc_run(rpc);
202     }
203     return rpc->status;
204 }
205
206 int
207 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
208 {
209     *msgp = NULL;
210     if (rpc->status) {
211         return rpc->status;
212     }
213
214     while (!rpc->received) {
215         if (byteq_is_empty(&rpc->input)) {
216             size_t chunk;
217             int retval;
218
219             chunk = byteq_headroom(&rpc->input);
220             retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
221             if (retval < 0) {
222                 if (retval == -EAGAIN) {
223                     return EAGAIN;
224                 } else {
225                     VLOG_WARN_RL(&rl, "%s: receive error: %s",
226                                  rpc->name, strerror(-retval));
227                     jsonrpc_error(rpc, -retval);
228                     return rpc->status;
229                 }
230             } else if (retval == 0) {
231                 VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
232                 jsonrpc_error(rpc, EOF);
233                 return EOF;
234             }
235             byteq_advance_head(&rpc->input, retval);
236         } else {
237             size_t n, used;
238
239             if (!rpc->parser) {
240                 rpc->parser = json_parser_create(0);
241             }
242             n = byteq_tailroom(&rpc->input);
243             used = json_parser_feed(rpc->parser,
244                                     (char *) byteq_tail(&rpc->input), n);
245             byteq_advance_tail(&rpc->input, used);
246             if (json_parser_is_done(rpc->parser)) {
247                 jsonrpc_received(rpc);
248                 if (rpc->status) {
249                     return rpc->status;
250                 }
251             }
252         }
253     }
254
255     *msgp = rpc->received;
256     rpc->received = NULL;
257     return 0;
258 }
259
260 void
261 jsonrpc_recv_wait(struct jsonrpc *rpc)
262 {
263     if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
264         poll_immediate_wake();
265     } else {
266         stream_recv_wait(rpc->stream);
267     }
268 }
269
270 int
271 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
272 {
273     int error;
274
275     error = jsonrpc_send(rpc, msg);
276     if (error) {
277         return error;
278     }
279
280     while (!queue_is_empty(&rpc->output) && !rpc->status) {
281         jsonrpc_run(rpc);
282         jsonrpc_wait(rpc);
283         poll_block();
284     }
285     return rpc->status;
286 }
287
288 int
289 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
290 {
291     for (;;) {
292         int error = jsonrpc_recv(rpc, msgp);
293         if (error != EAGAIN) {
294             return error;
295         }
296
297         jsonrpc_run(rpc);
298         jsonrpc_wait(rpc);
299         jsonrpc_recv_wait(rpc);
300         poll_block();
301     }
302 }
303
304 int
305 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
306                        struct jsonrpc_msg **replyp)
307 {
308     struct jsonrpc_msg *reply = NULL;
309     struct json *id;
310     int error;
311
312     id = json_clone(request->id);
313     error = jsonrpc_send_block(rpc, request);
314     if (!error) {
315         for (;;) {
316             error = jsonrpc_recv_block(rpc, &reply);
317             if (error
318                 || (reply->type == JSONRPC_REPLY
319                     && json_equal(id, reply->id))) {
320                 break;
321             }
322             jsonrpc_msg_destroy(reply);
323         }
324     }
325     *replyp = error ? NULL : reply;
326     json_destroy(id);
327     return error;
328 }
329
330 static void
331 jsonrpc_received(struct jsonrpc *rpc)
332 {
333     struct jsonrpc_msg *msg;
334     struct json *json;
335     char *error;
336
337     json = json_parser_finish(rpc->parser);
338     rpc->parser = NULL;
339     if (json->type == JSON_STRING) {
340         VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
341                      rpc->name, json_string(json));
342         jsonrpc_error(rpc, EPROTO);
343         json_destroy(json);
344         return;
345     }
346
347     error = jsonrpc_msg_from_json(json, &msg);
348     if (error) {
349         VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
350                      rpc->name, error);
351         free(error);
352         jsonrpc_error(rpc, EPROTO);
353         return;
354     }
355
356     jsonrpc_log_msg(rpc, "received", msg);
357     rpc->received = msg;
358 }
359
360 void
361 jsonrpc_error(struct jsonrpc *rpc, int error)
362 {
363     assert(error);
364     if (!rpc->status) {
365         rpc->status = error;
366         jsonrpc_cleanup(rpc);
367     }
368 }
369
370 static void
371 jsonrpc_cleanup(struct jsonrpc *rpc)
372 {
373     stream_close(rpc->stream);
374     rpc->stream = NULL;
375
376     json_parser_abort(rpc->parser);
377     rpc->parser = NULL;
378
379     jsonrpc_msg_destroy(rpc->received);
380     rpc->received = NULL;
381
382     queue_clear(&rpc->output);
383     rpc->backlog = 0;
384 }
385 \f
386 static struct jsonrpc_msg *
387 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
388                 struct json *params, struct json *result, struct json *error,
389                 struct json *id)
390 {
391     struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
392     msg->type = type;
393     msg->method = method ? xstrdup(method) : NULL;
394     msg->params = params;
395     msg->result = result;
396     msg->error = error;
397     msg->id = id;
398     return msg;
399 }
400
401 static struct json *
402 jsonrpc_create_id(void)
403 {
404     static unsigned int id;
405     return json_integer_create(id++);
406 }
407
408 struct jsonrpc_msg *
409 jsonrpc_create_request(const char *method, struct json *params,
410                        struct json **idp)
411 {
412     struct json *id = jsonrpc_create_id();
413     if (idp) {
414         *idp = json_clone(id);
415     }
416     return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
417 }
418
419 struct jsonrpc_msg *
420 jsonrpc_create_notify(const char *method, struct json *params)
421 {
422     return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
423 }
424
425 struct jsonrpc_msg *
426 jsonrpc_create_reply(struct json *result, const struct json *id)
427 {
428     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
429                            json_clone(id));
430 }
431
432 struct jsonrpc_msg *
433 jsonrpc_create_error(struct json *error, const struct json *id)
434 {
435     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
436                            json_clone(id));
437 }
438
439 const char *
440 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
441 {
442     switch (type) {
443     case JSONRPC_REQUEST:
444         return "request";
445
446     case JSONRPC_NOTIFY:
447         return "notification";
448
449     case JSONRPC_REPLY:
450         return "reply";
451
452     case JSONRPC_ERROR:
453         return "error";
454     }
455     return "(null)";
456 }
457
458 char *
459 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
460 {
461     const char *type_name;
462     unsigned int pattern;
463
464     if (m->params && m->params->type != JSON_ARRAY) {
465         return xstrdup("\"params\" must be JSON array");
466     }
467
468     switch (m->type) {
469     case JSONRPC_REQUEST:
470         pattern = 0x11001;
471         break;
472
473     case JSONRPC_NOTIFY:
474         pattern = 0x11000;
475         break;
476
477     case JSONRPC_REPLY:
478         pattern = 0x00101;
479         break;
480
481     case JSONRPC_ERROR:
482         pattern = 0x00011;
483         break;
484
485     default:
486         return xasprintf("invalid JSON-RPC message type %d", m->type);
487     }
488
489     type_name = jsonrpc_msg_type_to_string(m->type);
490     if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
491         return xasprintf("%s must%s have \"method\"",
492                          type_name, (pattern & 0x10000) ? "" : " not");
493
494     }
495     if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
496         return xasprintf("%s must%s have \"params\"",
497                          type_name, (pattern & 0x1000) ? "" : " not");
498
499     }
500     if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
501         return xasprintf("%s must%s have \"result\"",
502                          type_name, (pattern & 0x100) ? "" : " not");
503
504     }
505     if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
506         return xasprintf("%s must%s have \"error\"",
507                          type_name, (pattern & 0x10) ? "" : " not");
508
509     }
510     if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
511         return xasprintf("%s must%s have \"id\"",
512                          type_name, (pattern & 0x1) ? "" : " not");
513
514     }
515     return NULL;
516 }
517
518 void
519 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
520 {
521     if (m) {
522         free(m->method);
523         json_destroy(m->params);
524         json_destroy(m->result);
525         json_destroy(m->error);
526         json_destroy(m->id);
527         free(m);
528     }
529 }
530
531 static struct json *
532 null_from_json_null(struct json *json)
533 {
534     if (json && json->type == JSON_NULL) {
535         json_destroy(json);
536         return NULL;
537     }
538     return json;
539 }
540
541 char *
542 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
543 {
544     struct json *method = NULL;
545     struct jsonrpc_msg *msg = NULL;
546     struct shash *object;
547     char *error;
548
549     if (json->type != JSON_OBJECT) {
550         error = xstrdup("message is not a JSON object");
551         goto exit;
552     }
553     object = json_object(json);
554
555     method = shash_find_and_delete(object, "method");
556     if (method && method->type != JSON_STRING) {
557         error = xstrdup("method is not a JSON string");
558         goto exit;
559     }
560
561     msg = xzalloc(sizeof *msg);
562     msg->method = method ? xstrdup(method->u.string) : NULL;
563     msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
564     msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
565     msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
566     msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
567     msg->type = (msg->result ? JSONRPC_REPLY
568                  : msg->error ? JSONRPC_ERROR
569                  : msg->id ? JSONRPC_REQUEST
570                  : JSONRPC_NOTIFY);
571     if (!shash_is_empty(object)) {
572         error = xasprintf("message has unexpected member \"%s\"",
573                           shash_first(object)->name);
574         goto exit;
575     }
576     error = jsonrpc_msg_is_valid(msg);
577     if (error) {
578         goto exit;
579     }
580
581 exit:
582     json_destroy(method);
583     json_destroy(json);
584     if (error) {
585         jsonrpc_msg_destroy(msg);
586         msg = NULL;
587     }
588     *msgp = msg;
589     return error;
590 }
591
592 struct json *
593 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
594 {
595     struct json *json = json_object_create();
596
597     if (m->method) {
598         json_object_put(json, "method", json_string_create_nocopy(m->method));
599     }
600
601     if (m->params) {
602         json_object_put(json, "params", m->params);
603     }
604
605     if (m->result) {
606         json_object_put(json, "result", m->result);
607     } else if (m->type == JSONRPC_ERROR) {
608         json_object_put(json, "result", json_null_create());
609     }
610
611     if (m->error) {
612         json_object_put(json, "error", m->error);
613     } else if (m->type == JSONRPC_REPLY) {
614         json_object_put(json, "error", json_null_create());
615     }
616
617     if (m->id) {
618         json_object_put(json, "id", m->id);
619     } else if (m->type == JSONRPC_NOTIFY) {
620         json_object_put(json, "id", json_null_create());
621     }
622
623     free(m);
624
625     return json;
626 }
627 \f
628 /* A JSON-RPC session with reconnection. */
629
630 struct jsonrpc_session {
631     struct reconnect *reconnect;
632     struct jsonrpc *rpc;
633     struct stream *stream;
634     unsigned int seqno;
635 };
636
637 /* Creates and returns a jsonrpc_session that connects and reconnects, with
638  * back-off, to 'name', which should be a string acceptable to
639  * stream_open(). */
640 struct jsonrpc_session *
641 jsonrpc_session_open(const char *name)
642 {
643     struct jsonrpc_session *s;
644
645     s = xmalloc(sizeof *s);
646     s->reconnect = reconnect_create(time_msec());
647     reconnect_set_name(s->reconnect, name);
648     reconnect_enable(s->reconnect, time_msec());
649     s->rpc = NULL;
650     s->stream = NULL;
651     s->seqno = 0;
652
653     return s;
654 }
655
656 /* Creates and returns a jsonrpc_session that is initially connected to
657  * 'jsonrpc'.  If the connection is dropped, it will not be reconnected. */
658 struct jsonrpc_session *
659 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
660 {
661     struct jsonrpc_session *s;
662
663     s = xmalloc(sizeof *s);
664     s->reconnect = reconnect_create(time_msec());
665     reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
666     reconnect_set_max_tries(s->reconnect, 0);
667     reconnect_connected(s->reconnect, time_msec());
668     s->rpc = jsonrpc;
669     s->stream = NULL;
670     s->seqno = 0;
671
672     return s;
673 }
674
675 void
676 jsonrpc_session_close(struct jsonrpc_session *s)
677 {
678     if (s) {
679         jsonrpc_close(s->rpc);
680         reconnect_destroy(s->reconnect);
681         free(s);
682     }
683 }
684
685 static void
686 jsonrpc_session_disconnect(struct jsonrpc_session *s)
687 {
688     reconnect_disconnected(s->reconnect, time_msec(), 0);
689     if (s->rpc) {
690         jsonrpc_error(s->rpc, EOF);
691         jsonrpc_close(s->rpc);
692         s->rpc = NULL;
693         s->seqno++;
694     } else if (s->stream) {
695         stream_close(s->stream);
696         s->stream = NULL;
697         s->seqno++;
698     }
699 }
700
701 static void
702 jsonrpc_session_connect(struct jsonrpc_session *s)
703 {
704     int error;
705
706     jsonrpc_session_disconnect(s);
707     error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
708     if (error) {
709         reconnect_connect_failed(s->reconnect, time_msec(), error);
710     } else {
711         reconnect_connecting(s->reconnect, time_msec());
712     }
713     s->seqno++;
714 }
715
716 void
717 jsonrpc_session_run(struct jsonrpc_session *s)
718 {
719     if (s->rpc) {
720         int error;
721
722         jsonrpc_run(s->rpc);
723         error = jsonrpc_get_status(s->rpc);
724         if (error) {
725             jsonrpc_session_disconnect(s);
726         }
727     } else if (s->stream) {
728         int error;
729
730         stream_run(s->stream);
731         error = stream_connect(s->stream);
732         if (!error) {
733             reconnect_connected(s->reconnect, time_msec());
734             s->rpc = jsonrpc_open(s->stream);
735             s->stream = NULL;
736         } else if (error != EAGAIN) {
737             reconnect_connect_failed(s->reconnect, time_msec(), error);
738             stream_close(s->stream);
739             s->stream = NULL;
740         }
741     }
742
743     switch (reconnect_run(s->reconnect, time_msec())) {
744     case RECONNECT_CONNECT:
745         jsonrpc_session_connect(s);
746         break;
747
748     case RECONNECT_DISCONNECT:
749         jsonrpc_session_disconnect(s);
750         break;
751
752     case RECONNECT_PROBE:
753         if (s->rpc) {
754             struct json *params;
755             struct jsonrpc_msg *request;
756
757             params = json_array_create_empty();
758             request = jsonrpc_create_request("echo", params, NULL);
759             json_destroy(request->id);
760             request->id = json_string_create("echo");
761             jsonrpc_send(s->rpc, request);
762         }
763         break;
764     }
765 }
766
767 void
768 jsonrpc_session_wait(struct jsonrpc_session *s)
769 {
770     if (s->rpc) {
771         jsonrpc_wait(s->rpc);
772     } else if (s->stream) {
773         stream_run_wait(s->stream);
774         stream_connect_wait(s->stream);
775     }
776     reconnect_wait(s->reconnect, time_msec());
777 }
778
779 size_t
780 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
781 {
782     return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
783 }
784
785 const char *
786 jsonrpc_session_get_name(const struct jsonrpc_session *s)
787 {
788     return reconnect_get_name(s->reconnect);
789 }
790
791 int
792 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
793 {
794     return s->rpc ? jsonrpc_send(s->rpc, msg) : ENOTCONN;
795 }
796
797 struct jsonrpc_msg *
798 jsonrpc_session_recv(struct jsonrpc_session *s)
799 {
800     if (s->rpc) {
801         struct jsonrpc_msg *msg;
802         jsonrpc_recv(s->rpc, &msg);
803         if (msg) {
804             reconnect_received(s->reconnect, time_msec());
805             if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
806                 /* Echo request.  Send reply. */
807                 struct jsonrpc_msg *reply;
808
809                 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
810                 jsonrpc_session_send(s, reply);
811             } else if (msg->type == JSONRPC_REPLY
812                 && msg->id && msg->id->type == JSON_STRING
813                 && !strcmp(msg->id->u.string, "echo")) {
814                 /* It's a reply to our echo request.  Suppress it. */
815             } else {
816                 return msg;
817             }
818             jsonrpc_msg_destroy(msg);
819         }
820     }
821     return NULL;
822 }
823
824 void
825 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
826 {
827     if (s->rpc) {
828         jsonrpc_recv_wait(s->rpc);
829     }
830 }
831
832 bool
833 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
834 {
835     return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
836 }
837
838 bool
839 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
840 {
841     return s->rpc != NULL;
842 }
843
844 unsigned int
845 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
846 {
847     return s->seqno;
848 }
849
850 void
851 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
852 {
853     reconnect_force_reconnect(s->reconnect, time_msec());
854 }