jsonrpc: Make it easy to get a new JSON-RPC request's id.
[sliver-openvswitch.git] / lib / jsonrpc.c
1 /*
2  * Copyright (c) 2009 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include "jsonrpc.h"
20
21 #include <errno.h>
22
23 #include "byteq.h"
24 #include "dynamic-string.h"
25 #include "json.h"
26 #include "list.h"
27 #include "ofpbuf.h"
28 #include "poll-loop.h"
29 #include "queue.h"
30 #include "reconnect.h"
31 #include "stream.h"
32 #include "timeval.h"
33
34 #define THIS_MODULE VLM_jsonrpc
35 #include "vlog.h"
36 \f
37 struct jsonrpc {
38     struct stream *stream;
39     char *name;
40     int status;
41
42     /* Input. */
43     struct byteq input;
44     struct json_parser *parser;
45     struct jsonrpc_msg *received;
46
47     /* Output. */
48     struct ovs_queue output;
49     size_t backlog;
50 };
51
52 /* Rate limit for error messages. */
53 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
54
55 static void jsonrpc_received(struct jsonrpc *);
56 static void jsonrpc_cleanup(struct jsonrpc *);
57
58 struct jsonrpc *
59 jsonrpc_open(struct stream *stream)
60 {
61     struct jsonrpc *rpc;
62
63     assert(stream != NULL);
64
65     rpc = xzalloc(sizeof *rpc);
66     rpc->name = xstrdup(stream_get_name(stream));
67     rpc->stream = stream;
68     byteq_init(&rpc->input);
69     queue_init(&rpc->output);
70
71     return rpc;
72 }
73
74 void
75 jsonrpc_close(struct jsonrpc *rpc)
76 {
77     if (rpc) {
78         jsonrpc_cleanup(rpc);
79         free(rpc->name);
80         free(rpc);
81     }
82 }
83
84 void
85 jsonrpc_run(struct jsonrpc *rpc)
86 {
87     if (rpc->status) {
88         return;
89     }
90
91     while (!queue_is_empty(&rpc->output)) {
92         struct ofpbuf *buf = rpc->output.head;
93         int retval;
94
95         retval = stream_send(rpc->stream, buf->data, buf->size);
96         if (retval >= 0) {
97             rpc->backlog -= retval;
98             ofpbuf_pull(buf, retval);
99             if (!buf->size) {
100                 ofpbuf_delete(queue_pop_head(&rpc->output));
101             }
102         } else {
103             if (retval != -EAGAIN) {
104                 VLOG_WARN_RL(&rl, "%s: send error: %s",
105                              rpc->name, strerror(-retval));
106                 jsonrpc_error(rpc, -retval);
107             }
108             break;
109         }
110     }
111 }
112
113 void
114 jsonrpc_wait(struct jsonrpc *rpc)
115 {
116     if (!rpc->status && !queue_is_empty(&rpc->output)) {
117         stream_send_wait(rpc->stream);
118     }
119 }
120
121 int
122 jsonrpc_get_status(const struct jsonrpc *rpc)
123 {
124     return rpc->status;
125 }
126
127 size_t
128 jsonrpc_get_backlog(const struct jsonrpc *rpc)
129 {
130     return rpc->status ? 0 : rpc->backlog;
131 }
132
133 const char *
134 jsonrpc_get_name(const struct jsonrpc *rpc)
135 {
136     return rpc->name;
137 }
138
139 static void
140 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
141                 const struct jsonrpc_msg *msg)
142 {
143     if (VLOG_IS_DBG_ENABLED()) {
144         struct ds s = DS_EMPTY_INITIALIZER;
145         if (msg->method) {
146             ds_put_format(&s, ", method=\"%s\"", msg->method);
147         }
148         if (msg->params) {
149             ds_put_cstr(&s, ", params=");
150             ds_put_and_free_cstr(&s, json_to_string(msg->params, 0));
151         }
152         if (msg->result) {
153             ds_put_cstr(&s, ", result=");
154             ds_put_and_free_cstr(&s, json_to_string(msg->result, 0));
155         }
156         if (msg->error) {
157             ds_put_cstr(&s, ", error=");
158             ds_put_and_free_cstr(&s, json_to_string(msg->error, 0));
159         }
160         if (msg->id) {
161             ds_put_cstr(&s, ", id=");
162             ds_put_and_free_cstr(&s, json_to_string(msg->id, 0));
163         }
164         VLOG_DBG("%s: %s %s%s", rpc->name, title,
165                  jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
166         ds_destroy(&s);
167     }
168 }
169
170 int
171 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
172 {
173     struct ofpbuf *buf;
174     struct json *json;
175     size_t length;
176     char *s;
177
178     if (rpc->status) {
179         jsonrpc_msg_destroy(msg);
180         return rpc->status;
181     }
182
183     jsonrpc_log_msg(rpc, "send", msg);
184
185     json = jsonrpc_msg_to_json(msg);
186     s = json_to_string(json, 0);
187     length = strlen(s);
188     json_destroy(json);
189
190     buf = xmalloc(sizeof *buf);
191     ofpbuf_use(buf, s, length);
192     buf->size = length;
193     queue_push_tail(&rpc->output, buf);
194     rpc->backlog += length;
195
196     if (rpc->output.n == 1) {
197         jsonrpc_run(rpc);
198     }
199     return rpc->status;
200 }
201
202 int
203 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
204 {
205     *msgp = NULL;
206     if (rpc->status) {
207         return rpc->status;
208     }
209
210     while (!rpc->received) {
211         if (byteq_is_empty(&rpc->input)) {
212             size_t chunk;
213             int retval;
214
215             chunk = byteq_headroom(&rpc->input);
216             retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
217             if (retval < 0) {
218                 if (retval == -EAGAIN) {
219                     return EAGAIN;
220                 } else {
221                     VLOG_WARN_RL(&rl, "%s: receive error: %s",
222                                  rpc->name, strerror(-retval));
223                     jsonrpc_error(rpc, -retval);
224                     return rpc->status;
225                 }
226             } else if (retval == 0) {
227                 VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
228                 jsonrpc_error(rpc, EOF);
229                 return EOF;
230             }
231             byteq_advance_head(&rpc->input, retval);
232         } else {
233             size_t n, used;
234
235             if (!rpc->parser) {
236                 rpc->parser = json_parser_create(0);
237             }
238             n = byteq_tailroom(&rpc->input);
239             used = json_parser_feed(rpc->parser,
240                                     (char *) byteq_tail(&rpc->input), n);
241             byteq_advance_tail(&rpc->input, used);
242             if (json_parser_is_done(rpc->parser)) {
243                 jsonrpc_received(rpc);
244                 if (rpc->status) {
245                     return rpc->status;
246                 }
247             }
248         }
249     }
250
251     *msgp = rpc->received;
252     rpc->received = NULL;
253     return 0;
254 }
255
256 void
257 jsonrpc_recv_wait(struct jsonrpc *rpc)
258 {
259     if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
260         poll_immediate_wake();
261     } else {
262         stream_recv_wait(rpc->stream);
263     }
264 }
265
266 int
267 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
268 {
269     int error;
270
271     error = jsonrpc_send(rpc, msg);
272     if (error) {
273         return error;
274     }
275
276     while (!queue_is_empty(&rpc->output) && !rpc->status) {
277         jsonrpc_run(rpc);
278         jsonrpc_wait(rpc);
279         poll_block();
280     }
281     return rpc->status;
282 }
283
284 int
285 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
286 {
287     for (;;) {
288         int error = jsonrpc_recv(rpc, msgp);
289         if (error != EAGAIN) {
290             return error;
291         }
292
293         jsonrpc_run(rpc);
294         jsonrpc_wait(rpc);
295         jsonrpc_recv_wait(rpc);
296         poll_block();
297     }
298 }
299
300 int
301 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
302                        struct jsonrpc_msg **replyp)
303 {
304     struct jsonrpc_msg *reply = NULL;
305     struct json *id;
306     int error;
307
308     id = json_clone(request->id);
309     error = jsonrpc_send_block(rpc, request);
310     if (!error) {
311         for (;;) {
312             error = jsonrpc_recv_block(rpc, &reply);
313             if (error
314                 || (reply->type == JSONRPC_REPLY
315                     && json_equal(id, reply->id))) {
316                 break;
317             }
318             jsonrpc_msg_destroy(reply);
319         }
320     }
321     *replyp = error ? NULL : reply;
322     json_destroy(id);
323     return error;
324 }
325
326 static void
327 jsonrpc_received(struct jsonrpc *rpc)
328 {
329     struct jsonrpc_msg *msg;
330     struct json *json;
331     char *error;
332
333     json = json_parser_finish(rpc->parser);
334     rpc->parser = NULL;
335     if (json->type == JSON_STRING) {
336         VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
337                      rpc->name, json_string(json));
338         jsonrpc_error(rpc, EPROTO);
339         json_destroy(json);
340         return;
341     }
342
343     error = jsonrpc_msg_from_json(json, &msg);
344     if (error) {
345         VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
346                      rpc->name, error);
347         free(error);
348         jsonrpc_error(rpc, EPROTO);
349         return;
350     }
351
352     jsonrpc_log_msg(rpc, "received", msg);
353     rpc->received = msg;
354 }
355
356 void
357 jsonrpc_error(struct jsonrpc *rpc, int error)
358 {
359     assert(error);
360     if (!rpc->status) {
361         rpc->status = error;
362         jsonrpc_cleanup(rpc);
363     }
364 }
365
366 static void
367 jsonrpc_cleanup(struct jsonrpc *rpc)
368 {
369     stream_close(rpc->stream);
370     rpc->stream = NULL;
371
372     json_parser_abort(rpc->parser);
373     rpc->parser = NULL;
374
375     jsonrpc_msg_destroy(rpc->received);
376     rpc->received = NULL;
377
378     queue_clear(&rpc->output);
379     rpc->backlog = 0;
380 }
381 \f
382 static struct jsonrpc_msg *
383 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
384                 struct json *params, struct json *result, struct json *error,
385                 struct json *id)
386 {
387     struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
388     msg->type = type;
389     msg->method = method ? xstrdup(method) : NULL;
390     msg->params = params;
391     msg->result = result;
392     msg->error = error;
393     msg->id = id;
394     return msg;
395 }
396
397 static struct json *
398 jsonrpc_create_id(void)
399 {
400     static unsigned int id;
401     return json_integer_create(id++);
402 }
403
404 struct jsonrpc_msg *
405 jsonrpc_create_request(const char *method, struct json *params,
406                        struct json **idp)
407 {
408     struct json *id = jsonrpc_create_id();
409     if (idp) {
410         *idp = json_clone(id);
411     }
412     return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
413 }
414
415 struct jsonrpc_msg *
416 jsonrpc_create_notify(const char *method, struct json *params)
417 {
418     return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
419 }
420
421 struct jsonrpc_msg *
422 jsonrpc_create_reply(struct json *result, const struct json *id)
423 {
424     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
425                            json_clone(id));
426 }
427
428 struct jsonrpc_msg *
429 jsonrpc_create_error(struct json *error, const struct json *id)
430 {
431     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
432                            json_clone(id));
433 }
434
435 const char *
436 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
437 {
438     switch (type) {
439     case JSONRPC_REQUEST:
440         return "request";
441
442     case JSONRPC_NOTIFY:
443         return "notification";
444
445     case JSONRPC_REPLY:
446         return "reply";
447
448     case JSONRPC_ERROR:
449         return "error";
450     }
451     return "(null)";
452 }
453
454 char *
455 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
456 {
457     const char *type_name;
458     unsigned int pattern;
459
460     if (m->params && m->params->type != JSON_ARRAY) {
461         return xstrdup("\"params\" must be JSON array");
462     }
463
464     switch (m->type) {
465     case JSONRPC_REQUEST:
466         pattern = 0x11001;
467         break;
468
469     case JSONRPC_NOTIFY:
470         pattern = 0x11000;
471         break;
472
473     case JSONRPC_REPLY:
474         pattern = 0x00101;
475         break;
476
477     case JSONRPC_ERROR:
478         pattern = 0x00011;
479         break;
480
481     default:
482         return xasprintf("invalid JSON-RPC message type %d", m->type);
483     }
484
485     type_name = jsonrpc_msg_type_to_string(m->type);
486     if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
487         return xasprintf("%s must%s have \"method\"",
488                          type_name, (pattern & 0x10000) ? "" : " not");
489
490     }
491     if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
492         return xasprintf("%s must%s have \"params\"",
493                          type_name, (pattern & 0x1000) ? "" : " not");
494
495     }
496     if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
497         return xasprintf("%s must%s have \"result\"",
498                          type_name, (pattern & 0x100) ? "" : " not");
499
500     }
501     if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
502         return xasprintf("%s must%s have \"error\"",
503                          type_name, (pattern & 0x10) ? "" : " not");
504
505     }
506     if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
507         return xasprintf("%s must%s have \"id\"",
508                          type_name, (pattern & 0x1) ? "" : " not");
509
510     }
511     return NULL;
512 }
513
514 void
515 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
516 {
517     if (m) {
518         free(m->method);
519         json_destroy(m->params);
520         json_destroy(m->result);
521         json_destroy(m->error);
522         json_destroy(m->id);
523         free(m);
524     }
525 }
526
527 static struct json *
528 null_from_json_null(struct json *json)
529 {
530     if (json && json->type == JSON_NULL) {
531         json_destroy(json);
532         return NULL;
533     }
534     return json;
535 }
536
537 char *
538 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
539 {
540     struct json *method = NULL;
541     struct jsonrpc_msg *msg = NULL;
542     struct shash *object;
543     char *error;
544
545     if (json->type != JSON_OBJECT) {
546         error = xstrdup("message is not a JSON object");
547         goto exit;
548     }
549     object = json_object(json);
550
551     method = shash_find_and_delete(object, "method");
552     if (method && method->type != JSON_STRING) {
553         error = xstrdup("method is not a JSON string");
554         goto exit;
555     }
556
557     msg = xzalloc(sizeof *msg);
558     msg->method = method ? xstrdup(method->u.string) : NULL;
559     msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
560     msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
561     msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
562     msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
563     msg->type = (msg->result ? JSONRPC_REPLY
564                  : msg->error ? JSONRPC_ERROR
565                  : msg->id ? JSONRPC_REQUEST
566                  : JSONRPC_NOTIFY);
567     if (!shash_is_empty(object)) {
568         error = xasprintf("message has unexpected member \"%s\"",
569                           shash_first(object)->name);
570         goto exit;
571     }
572     error = jsonrpc_msg_is_valid(msg);
573     if (error) {
574         goto exit;
575     }
576
577 exit:
578     json_destroy(method);
579     json_destroy(json);
580     if (error) {
581         jsonrpc_msg_destroy(msg);
582         msg = NULL;
583     }
584     *msgp = msg;
585     return error;
586 }
587
588 struct json *
589 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
590 {
591     struct json *json = json_object_create();
592
593     if (m->method) {
594         json_object_put(json, "method", json_string_create_nocopy(m->method));
595     }
596
597     if (m->params) {
598         json_object_put(json, "params", m->params);
599     }
600
601     if (m->result) {
602         json_object_put(json, "result", m->result);
603     } else if (m->type == JSONRPC_ERROR) {
604         json_object_put(json, "result", json_null_create());
605     }
606
607     if (m->error) {
608         json_object_put(json, "error", m->error);
609     } else if (m->type == JSONRPC_REPLY) {
610         json_object_put(json, "error", json_null_create());
611     }
612
613     if (m->id) {
614         json_object_put(json, "id", m->id);
615     } else if (m->type == JSONRPC_NOTIFY) {
616         json_object_put(json, "id", json_null_create());
617     }
618
619     free(m);
620
621     return json;
622 }
623 \f
624 /* A JSON-RPC session with reconnection. */
625
626 struct jsonrpc_session {
627     struct reconnect *reconnect;
628     struct jsonrpc *rpc;
629     struct stream *stream;
630     unsigned int seqno;
631 };
632
633 struct jsonrpc_session *
634 jsonrpc_session_open(const char *name)
635 {
636     struct jsonrpc_session *s;
637
638     s = xmalloc(sizeof *s);
639     s->reconnect = reconnect_create(time_msec());
640     reconnect_set_name(s->reconnect, name);
641     reconnect_enable(s->reconnect, time_msec());
642     s->rpc = NULL;
643     s->stream = NULL;
644     s->seqno = 0;
645
646     return s;
647 }
648
649 void
650 jsonrpc_session_close(struct jsonrpc_session *s)
651 {
652     if (s) {
653         jsonrpc_close(s->rpc);
654         reconnect_destroy(s->reconnect);
655         free(s);
656     }
657 }
658
659 static void
660 jsonrpc_session_disconnect(struct jsonrpc_session *s)
661 {
662     reconnect_disconnected(s->reconnect, time_msec(), 0);
663     if (s->rpc) {
664         jsonrpc_error(s->rpc, EOF);
665         jsonrpc_close(s->rpc);
666         s->rpc = NULL;
667         s->seqno++;
668     } else if (s->stream) {
669         stream_close(s->stream);
670         s->stream = NULL;
671         s->seqno++;
672     }
673 }
674
675 static void
676 jsonrpc_session_connect(struct jsonrpc_session *s)
677 {
678     int error;
679
680     jsonrpc_session_disconnect(s);
681     error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
682     if (error) {
683         reconnect_connect_failed(s->reconnect, time_msec(), error);
684     } else {
685         reconnect_connecting(s->reconnect, time_msec());
686     }
687     s->seqno++;
688 }
689
690 void
691 jsonrpc_session_run(struct jsonrpc_session *s)
692 {
693     if (s->rpc) {
694         int error;
695
696         jsonrpc_run(s->rpc);
697         error = jsonrpc_get_status(s->rpc);
698         if (error) {
699             jsonrpc_session_disconnect(s);
700         }
701     } else if (s->stream) {
702         int error = stream_connect(s->stream);
703         if (!error) {
704             reconnect_connected(s->reconnect, time_msec());
705             s->rpc = jsonrpc_open(s->stream);
706             s->stream = NULL;
707         } else if (error != EAGAIN) {
708             reconnect_connect_failed(s->reconnect, time_msec(), error);
709             stream_close(s->stream);
710             s->stream = NULL;
711         }
712     }
713
714     switch (reconnect_run(s->reconnect, time_msec())) {
715     case RECONNECT_CONNECT:
716         jsonrpc_session_connect(s);
717         break;
718
719     case RECONNECT_DISCONNECT:
720         jsonrpc_session_disconnect(s);
721         break;
722
723     case RECONNECT_PROBE:
724         if (s->rpc) {
725             struct json *params;
726             struct jsonrpc_msg *request;
727
728             params = json_array_create_empty();
729             request = jsonrpc_create_request("echo", params, NULL);
730             json_destroy(request->id);
731             request->id = json_string_create("echo");
732             jsonrpc_send(s->rpc, request);
733         }
734         break;
735     }
736 }
737
738 void
739 jsonrpc_session_wait(struct jsonrpc_session *s)
740 {
741     if (s->rpc) {
742         jsonrpc_wait(s->rpc);
743     } else if (s->stream) {
744         stream_connect_wait(s->stream);
745     }
746     reconnect_wait(s->reconnect, time_msec());
747 }
748
749 size_t
750 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
751 {
752     return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
753 }
754
755 const char *
756 jsonrpc_session_get_name(const struct jsonrpc_session *s)
757 {
758     return reconnect_get_name(s->reconnect);
759 }
760
761 int
762 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
763 {
764     return s->rpc ? jsonrpc_send(s->rpc, msg) : ENOTCONN;
765 }
766
767 struct jsonrpc_msg *
768 jsonrpc_session_recv(struct jsonrpc_session *s)
769 {
770     struct jsonrpc_msg *msg = NULL;
771     if (s->rpc) {
772         jsonrpc_recv(s->rpc, &msg);
773         if (msg) {
774             reconnect_received(s->reconnect, time_msec());
775         }
776     }
777     return msg;
778 }
779
780 void
781 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
782 {
783     if (s->rpc) {
784         jsonrpc_recv_wait(s->rpc);
785     }
786 }
787
788 bool
789 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
790 {
791     return s->rpc != NULL;
792 }
793
794 unsigned int
795 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
796 {
797     return s->seqno;
798 }
799
800 void
801 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
802 {
803     reconnect_force_reconnect(s->reconnect, time_msec());
804 }