2 * Copyright (c) 2009, 2010 Nicira Networks.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include "dynamic-string.h"
28 #include "poll-loop.h"
30 #include "reconnect.h"
34 #define THIS_MODULE VLM_jsonrpc
/* Connection members referenced through 'rpc->' by the functions below.
 * NOTE(review): the enclosing struct header and closing brace, and the other
 * members used below ('name', 'status', 'input', 'backlog'), are not visible
 * in this excerpt. */
38 struct stream *stream;
/* Parser created by jsonrpc_recv() and fed with bytes taken from 'input'. */
44 struct json_parser *parser;
/* Message that jsonrpc_recv() hands to its caller, after which it resets this
 * member to NULL. */
45 struct jsonrpc_msg *received;
/* Buffers appended by jsonrpc_send() and sent from the head by
 * jsonrpc_run(). */
48 struct ovs_queue output;
52 /* Rate limit for error messages; passed to the VLOG_WARN_RL() and
 * VLOG_INFO_RL() calls in this file. */
53 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
/* Forward declarations of helpers that are defined further down but called
 * earlier in this file. */
55 static void jsonrpc_received(struct jsonrpc *);
56 static void jsonrpc_cleanup(struct jsonrpc *);
/* Sets up JSON-RPC state for 'stream', which must not be null: allocates the
 * structure with xzalloc(), stores a copy of the stream's name in it, and
 * initializes its input byte queue and its output queue.
 *
 * NOTE(review): the return type, the declaration of 'rpc', any assignment of
 * 'rpc->stream' and the return statement are not visible in this excerpt.
 * jsonrpc_session_run() stores the result in 's->rpc'. */
59 jsonrpc_open(struct stream *stream)
63 assert(stream != NULL);
65 rpc = xzalloc(sizeof *rpc);
/* Private copy of the name; it prefixes the log messages in this file. */
66 rpc->name = xstrdup(stream_get_name(stream));
68 byteq_init(&rpc->input);
69 queue_init(&rpc->output);
/* NOTE(review): only the signature is visible in this excerpt.  Within this
 * file, jsonrpc_session_close() and jsonrpc_session_disconnect() call this
 * function on 's->rpc'. */
75 jsonrpc_close(struct jsonrpc *rpc)
/* Performs periodic work on 'rpc': runs the underlying stream, then tries to
 * send the buffers waiting in the output queue.
 *
 * NOTE(review): declarations, some conditionals and the closing braces are
 * missing from this excerpt, so the exact branch structure (in particular
 * which lines run on success versus failure of stream_send()) cannot be
 * confirmed from here. */
85 jsonrpc_run(struct jsonrpc *rpc)
91 stream_run(rpc->stream);
/* Work from the head of the queue for as long as anything is queued. */
92 while (!queue_is_empty(&rpc->output)) {
93 struct ofpbuf *buf = rpc->output.head;
96 retval = stream_send(rpc->stream, buf->data, buf->size);
/* Account for the 'retval' bytes that were sent: shrink the backlog and drop
 * those bytes from the front of the buffer. */
98 rpc->backlog -= retval;
99 ofpbuf_pull(buf, retval);
/* Remove the head buffer from the queue and free it. */
101 ofpbuf_delete(queue_pop_head(&rpc->output));
/* -EAGAIN is neither logged nor reported; any other value is logged
 * (rate-limited) and passed, negated, to jsonrpc_error(). */
104 if (retval != -EAGAIN) {
105 VLOG_WARN_RL(&rl, "%s: send error: %s",
106 rpc->name, strerror(-retval));
107 jsonrpc_error(rpc, -retval);
/* Registers the wait conditions that correspond to jsonrpc_run(): calls
 * stream_run_wait() on the stream and, when output is queued,
 * stream_send_wait() as well.
 *
 * NOTE(review): some lines of this function are missing from this excerpt. */
115 jsonrpc_wait(struct jsonrpc *rpc)
118 stream_run_wait(rpc->stream);
/* Only wait for send capacity when there is something to send. */
119 if (!queue_is_empty(&rpc->output)) {
120 stream_send_wait(rpc->stream);
/* NOTE(review): only the signature is visible in this excerpt.
 * jsonrpc_session_run() stores the result in a variable named 'error', so it
 * is presumably the connection's 'status' member -- confirm against the
 * full source. */
126 jsonrpc_get_status(const struct jsonrpc *rpc)
/* Returns 'rpc->backlog', or 0 if 'rpc->status' is nonzero.  jsonrpc_send()
 * adds each queued message's length to the backlog and jsonrpc_run()
 * subtracts the bytes it sends. */
132 jsonrpc_get_backlog(const struct jsonrpc *rpc)
134 return rpc->status ? 0 : rpc->backlog;
/* NOTE(review): only the signature is visible in this excerpt.
 * jsonrpc_session_open_unreliably() passes the result to
 * reconnect_set_name(). */
138 jsonrpc_get_name(const struct jsonrpc *rpc)
/* When debug logging is enabled, logs 'msg' on behalf of 'rpc': the
 * connection name, 'title' (callers in this file pass "send" or "received"),
 * the message type, and the method, params, result, error and id members.
 *
 * NOTE(review): whatever conditions guard the individual members are not
 * visible in this excerpt, nor is the release of 's'. */
144 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
145 const struct jsonrpc_msg *msg)
/* Skip building the string when it would not be logged anyway. */
147 if (VLOG_IS_DBG_ENABLED()) {
148 struct ds s = DS_EMPTY_INITIALIZER;
150 ds_put_format(&s, ", method=\"%s\"", msg->method);
/* Each JSON member is converted to a string and appended after its label. */
153 ds_put_cstr(&s, ", params=");
154 ds_put_and_free_cstr(&s, json_to_string(msg->params, 0));
157 ds_put_cstr(&s, ", result=");
158 ds_put_and_free_cstr(&s, json_to_string(msg->result, 0));
161 ds_put_cstr(&s, ", error=");
162 ds_put_and_free_cstr(&s, json_to_string(msg->error, 0));
165 ds_put_cstr(&s, ", id=");
166 ds_put_and_free_cstr(&s, json_to_string(msg->id, 0));
168 VLOG_DBG("%s: %s %s%s", rpc->name, title,
169 jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
/* Queues 'msg' for transmission on 'rpc': logs it, converts it to JSON and
 * then to a string, wraps that string in a newly allocated ofpbuf, appends the
 * buffer to the output queue and adds its length to the backlog.
 *
 * Callers in this file use the result as an error code
 * (jsonrpc_session_send() returns it in place of ENOTCONN).
 *
 * NOTE(review): the declarations, the condition guarding the first
 * jsonrpc_msg_destroy() call, the computation of 'length', the body of the
 * final 'if' and the return statements are not visible in this excerpt. */
175 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
183 jsonrpc_msg_destroy(msg);
187 jsonrpc_log_msg(rpc, "send", msg);
189 json = jsonrpc_msg_to_json(msg);
190 s = json_to_string(json, 0);
194 buf = xmalloc(sizeof *buf);
/* The buffer is set up over the string 's' itself. */
195 ofpbuf_use(buf, s, length);
197 queue_push_tail(&rpc->output, buf);
198 rpc->backlog += length;
/* A count of exactly 1 after the push means the queue was empty before it. */
200 if (rpc->output.n == 1) {
/* Tries to receive one message from 'rpc'.  Loops until a complete message
 * is available in 'rpc->received', reading more bytes from the stream when
 * the input queue is empty and feeding queued bytes to the JSON parser.  The
 * received message is stored in '*msgp' and 'rpc->received' is reset to NULL.
 *
 * NOTE(review): declarations, several conditionals, the return statements and
 * the handling of '*msgp' on failure are not visible in this excerpt. */
207 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
214 while (!rpc->received) {
/* Nothing buffered: read from the stream into the queue's free space. */
215 if (byteq_is_empty(&rpc->input)) {
219 chunk = byteq_headroom(&rpc->input);
220 retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
/* -EAGAIN is handled separately from the errors that are logged and
 * reported below. */
222 if (retval == -EAGAIN) {
225 VLOG_WARN_RL(&rl, "%s: receive error: %s",
226 rpc->name, strerror(-retval));
227 jsonrpc_error(rpc, -retval);
/* A return value of 0 is logged as the connection having been closed and is
 * reported as EOF. */
230 } else if (retval == 0) {
231 VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
232 jsonrpc_error(rpc, EOF);
/* Otherwise 'retval' bytes were added to the queue. */
235 byteq_advance_head(&rpc->input, retval);
240 rpc->parser = json_parser_create(0);
/* Feed the buffered bytes to the parser and consume only as many as it
 * reports having used. */
242 n = byteq_tailroom(&rpc->input);
243 used = json_parser_feed(rpc->parser,
244 (char *) byteq_tail(&rpc->input), n);
245 byteq_advance_tail(&rpc->input, used);
/* A finished parse is turned into a message by jsonrpc_received(). */
246 if (json_parser_is_done(rpc->parser)) {
247 jsonrpc_received(rpc);
/* Hand the message to the caller and forget it here. */
255 *msgp = rpc->received;
256 rpc->received = NULL;
/* Registers the wait condition that corresponds to jsonrpc_recv().  If 'rpc'
 * has a nonzero status, a parsed message, or buffered input, it requests an
 * immediate wakeup; stream_recv_wait() waits on the stream.
 *
 * NOTE(review): the line that presumably makes stream_recv_wait() the 'else'
 * branch is not visible in this excerpt. */
261 jsonrpc_recv_wait(struct jsonrpc *rpc)
263 if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
264 poll_immediate_wake();
266 stream_recv_wait(rpc->stream);
/* Queues 'msg' with jsonrpc_send(), then loops for as long as output remains
 * queued and 'rpc->status' is zero.
 *
 * NOTE(review): the loop body, the handling of 'error' and the return
 * statements are not visible in this excerpt. */
271 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
275 error = jsonrpc_send(rpc, msg);
280 while (!queue_is_empty(&rpc->output) && !rpc->status) {
/* Calls jsonrpc_recv() on 'rpc' and treats EAGAIN differently from every
 * other result; jsonrpc_recv_wait() registers the wait for more input.
 *
 * NOTE(review): the enclosing loop, the body of the 'if' and the blocking
 * call are not visible in this excerpt. */
289 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
292 int error = jsonrpc_recv(rpc, msgp);
293 if (error != EAGAIN) {
299 jsonrpc_recv_wait(rpc);
/* Sends 'request' on 'rpc' with jsonrpc_send_block(), then receives messages
 * with jsonrpc_recv_block() and compares each against the request's id.
 * Stores the reply in '*replyp', or NULL if 'error' is nonzero.
 *
 * NOTE(review): the loop construct, the first operand of the '||' condition,
 * the release of 'id' and the return statement are not visible in this
 * excerpt. */
305 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
306 struct jsonrpc_msg **replyp)
308 struct jsonrpc_msg *reply = NULL;
/* Take a copy of the id before sending -- presumably because 'request' is
 * consumed by the send; confirm against jsonrpc_send(). */
312 id = json_clone(request->id);
313 error = jsonrpc_send_block(rpc, request);
316 error = jsonrpc_recv_block(rpc, &reply);
/* A message of type JSONRPC_REPLY whose id equals the saved id satisfies this
 * condition. */
318 || (reply->type == JSONRPC_REPLY
319 && json_equal(id, reply->id))) {
322 jsonrpc_msg_destroy(reply);
325 *replyp = error ? NULL : reply;
/* Finishes the JSON parse in progress on 'rpc' and converts the result into a
 * JSON-RPC message, logging it as "received".  A JSON string result from the
 * parser is logged as a stream parsing error, and a failed conversion is
 * logged as a bad JSON-RPC message; both are reported as EPROTO.
 *
 * NOTE(review): the remaining declarations, the condition on 'error', the
 * arguments of the second warning, the cleanup of 'json' and the assignment
 * to 'rpc->received' are not visible in this excerpt. */
331 jsonrpc_received(struct jsonrpc *rpc)
333 struct jsonrpc_msg *msg;
337 json = json_parser_finish(rpc->parser);
/* The string's contents are used as the error text in the log message. */
339 if (json->type == JSON_STRING) {
340 VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
341 rpc->name, json_string(json));
342 jsonrpc_error(rpc, EPROTO);
347 error = jsonrpc_msg_from_json(json, &msg);
349 VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
352 jsonrpc_error(rpc, EPROTO);
356 jsonrpc_log_msg(rpc, "received", msg);
/* Reports 'error' on 'rpc' and releases the connection's resources with
 * jsonrpc_cleanup().  Callers in this file pass positive errno values,
 * EPROTO, or EOF.
 *
 * NOTE(review): how 'error' is recorded (presumably in 'rpc->status') is not
 * visible in this excerpt. */
361 jsonrpc_error(struct jsonrpc *rpc, int error)
366 jsonrpc_cleanup(rpc);
/* Releases what 'rpc' holds: closes the stream, aborts the JSON parser,
 * destroys the pending received message and empties the output queue.
 *
 * NOTE(review): lines between these statements are missing from this excerpt,
 * so any resetting of 'rpc->stream', 'rpc->parser' or 'rpc->backlog' cannot
 * be confirmed from here. */
371 jsonrpc_cleanup(struct jsonrpc *rpc)
373 stream_close(rpc->stream);
376 json_parser_abort(rpc->parser);
379 jsonrpc_msg_destroy(rpc->received);
/* Clear the pointer so that the destroyed message is not referenced again. */
380 rpc->received = NULL;
382 queue_clear(&rpc->output);
/* Allocates a message and fills it in from the arguments.  'method' is
 * copied when nonnull; 'params' and 'result' are stored as the pointers passed
 * in, without copying.
 *
 * NOTE(review): the end of the parameter list, the assignments of the
 * remaining members and the return statement are not visible in this
 * excerpt. */
386 static struct jsonrpc_msg *
387 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
388 struct json *params, struct json *result, struct json *error,
391 struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
393 msg->method = method ? xstrdup(method) : NULL;
394 msg->params = params;
395 msg->result = result;
/* Returns a new JSON integer holding the next value of a counter that
 * persists across calls.  The counter has static storage duration, so it
 * starts at 0, and it is incremented after each use. */
402 jsonrpc_create_id(void)
404 static unsigned int id;
405 return json_integer_create(id++);
/* Creates a JSONRPC_REQUEST message for 'method' with 'params' and a freshly
 * generated id.  A clone of the id is stored in '*idp'.
 *
 * NOTE(review): the end of the parameter list and any null check on 'idp' are
 * not visible in this excerpt; jsonrpc_session_run() passes NULL for it. */
409 jsonrpc_create_request(const char *method, struct json *params,
412 struct json *id = jsonrpc_create_id();
/* The caller gets its own copy; the original goes into the message. */
414 *idp = json_clone(id);
416 return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
/* Creates a JSONRPC_NOTIFY message for 'method' with 'params'.  The result,
 * error and id arguments passed to jsonrpc_create() are all NULL. */
420 jsonrpc_create_notify(const char *method, struct json *params)
422 return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
/* Creates a JSONRPC_REPLY message that carries 'result' and has no method,
 * params or error.
 *
 * NOTE(review): the final argument, which presumably derives from 'id', is
 * not visible in this excerpt. */
426 jsonrpc_create_reply(struct json *result, const struct json *id)
428 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
/* Creates a message that carries 'error' and has no method, params or
 * result.
 *
 * NOTE(review): the visible call passes JSONRPC_REPLY as the type even though
 * a JSONRPC_ERROR type is used elsewhere in this file -- confirm this is
 * intended.  The final argument, which presumably derives from 'id', is not
 * visible in this excerpt. */
433 jsonrpc_create_error(struct json *error, const struct json *id)
435 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
/* Returns a human-readable name for 'type'; jsonrpc_log_msg() and
 * jsonrpc_msg_is_valid() use it in their messages.
 *
 * NOTE(review): the switch statement, most of its cases and their return
 * values are not visible in this excerpt, so which case returns
 * "notification" cannot be confirmed from here. */
440 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
443 case JSONRPC_REQUEST:
447 return "notification";
/* Checks that 'm' has exactly the members that its type calls for.  Every
 * failure path visible here returns a string built by xstrdup() or
 * xasprintf() that describes the problem.
 *
 * 'pattern' encodes the expected members as hexadecimal digits: 0x10000 for
 * "method", 0x1000 for "params", 0x100 for "result", 0x10 for "error" and 0x1
 * for "id".  Each check compares "member is present" against "pattern bit is
 * set" and reports a mismatch in either direction.
 *
 * NOTE(review): the switch that selects 'pattern' for each message type and
 * the return value for a valid message are not visible in this excerpt. */
459 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
461 const char *type_name;
462 unsigned int pattern;
/* Regardless of type, "params" must be an array whenever it is present. */
464 if (m->params && m->params->type != JSON_ARRAY) {
465 return xstrdup("\"params\" must be JSON array");
469 case JSONRPC_REQUEST:
486 return xasprintf("invalid JSON-RPC message type %d", m->type);
489 type_name = jsonrpc_msg_type_to_string(m->type);
/* The message says "must have" when the bit is set and "must not have" when
 * it is clear. */
490 if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
491 return xasprintf("%s must%s have \"method\"",
492 type_name, (pattern & 0x10000) ? "" : " not");
495 if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
496 return xasprintf("%s must%s have \"params\"",
497 type_name, (pattern & 0x1000) ? "" : " not");
500 if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
501 return xasprintf("%s must%s have \"result\"",
502 type_name, (pattern & 0x100) ? "" : " not");
505 if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
506 return xasprintf("%s must%s have \"error\"",
507 type_name, (pattern & 0x10) ? "" : " not");
510 if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
511 return xasprintf("%s must%s have \"id\"",
512 type_name, (pattern & 0x1) ? "" : " not");
/* Destroys 'm', including the JSON values held in its params, result and
 * error members.
 *
 * NOTE(review): any null check on 'm' and the release of its method, id and
 * the message itself are not visible in this excerpt.  jsonrpc_cleanup()
 * passes 'rpc->received' without checking it for NULL. */
519 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
523 json_destroy(m->params);
524 json_destroy(m->result);
525 json_destroy(m->error);
/* Helper used by jsonrpc_msg_from_json() on each member it extracts.  It
 * singles out a nonnull 'json' whose type is JSON_NULL.
 *
 * NOTE(review): the body of the 'if' and the return statements are not
 * visible in this excerpt; the name suggests that a JSON null is converted to
 * a C NULL pointer -- confirm against the full source. */
532 null_from_json_null(struct json *json)
534 if (json && json->type == JSON_NULL) {
/* Converts 'json' into a JSON-RPC message.  'json' must be a JSON object and
 * its "method" member, if present, must be a string; the message is then
 * populated from the "method", "params", "result", "error" and "id" members,
 * its type is inferred from which members are present, and it is checked with
 * jsonrpc_msg_is_valid().  Problems are described by an error string.
 *
 * jsonrpc_received() stores the return value in a variable named 'error'.
 *
 * NOTE(review): the declaration of 'error', the jumps to the cleanup code,
 * the conditions around the final destroy calls, the store to '*msgp' and the
 * return statement are not visible in this excerpt. */
542 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
544 struct json *method = NULL;
545 struct jsonrpc_msg *msg = NULL;
546 struct shash *object;
549 if (json->type != JSON_OBJECT) {
550 error = xstrdup("message is not a JSON object");
553 object = json_object(json);
/* Members are removed from the object as they are extracted, so whatever is
 * left afterward is unexpected. */
555 method = shash_find_and_delete(object, "method");
556 if (method && method->type != JSON_STRING) {
557 error = xstrdup("method is not a JSON string");
561 msg = xzalloc(sizeof *msg);
/* The method name is copied; 'method' itself is destroyed further down. */
562 msg->method = method ? xstrdup(method->u.string) : NULL;
563 msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
564 msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
565 msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
566 msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
/* Infer the type: a result means a reply, otherwise an error means an error
 * message, otherwise an id means a request.  The final alternative is not
 * visible in this excerpt. */
567 msg->type = (msg->result ? JSONRPC_REPLY
568 : msg->error ? JSONRPC_ERROR
569 : msg->id ? JSONRPC_REQUEST
571 if (!shash_is_empty(object)) {
572 error = xasprintf("message has unexpected member \"%s\"",
573 shash_first(object)->name);
576 error = jsonrpc_msg_is_valid(msg);
582 json_destroy(method);
585 jsonrpc_msg_destroy(msg);
/* Builds a JSON object from 'm' with "method", "params", "result", "error"
 * and "id" members.  The message's own JSON values and method string are
 * placed into the object directly rather than copied.  Where a member is
 * absent, a JSON null is stored instead for certain message types: "result"
 * for JSONRPC_ERROR, "error" for JSONRPC_REPLY and "id" for JSONRPC_NOTIFY.
 *
 * NOTE(review): the conditions that test whether each member is present, the
 * release of 'm' and the return statement are not visible in this excerpt. */
593 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
595 struct json *json = json_object_create();
/* The "_nocopy" variant is given 'm->method' itself, not a duplicate. */
598 json_object_put(json, "method", json_string_create_nocopy(m->method));
602 json_object_put(json, "params", m->params);
606 json_object_put(json, "result", m->result);
607 } else if (m->type == JSONRPC_ERROR) {
608 json_object_put(json, "result", json_null_create());
612 json_object_put(json, "error", m->error);
613 } else if (m->type == JSONRPC_REPLY) {
614 json_object_put(json, "error", json_null_create());
618 json_object_put(json, "id", m->id);
619 } else if (m->type == JSONRPC_NOTIFY) {
620 json_object_put(json, "id", json_null_create());
628 /* A JSON-RPC session with reconnection. */
/* NOTE(review): the 'rpc' member that the functions below use, any other
 * members, and the closing brace are not visible in this excerpt. */
630 struct jsonrpc_session {
/* Reconnection state machine; also holds the session's name. */
631 struct reconnect *reconnect;
/* Stream opened by jsonrpc_session_connect() whose connection attempt is
 * advanced by jsonrpc_session_run(). */
633 struct stream *stream;
637 /* Creates and returns a jsonrpc_session that connects and reconnects, with
638 * back-off, to 'name', which should be a string acceptable to
 * stream_open(). */
/* Allocates a session, creates its reconnect object, names that object
 * 'name' and enables it.  jsonrpc_session_connect() later passes the same
 * name to stream_open().
 *
 * NOTE(review): the initialization of the remaining members and the return
 * statement are not visible in this excerpt. */
640 struct jsonrpc_session *
641 jsonrpc_session_open(const char *name)
643 struct jsonrpc_session *s;
645 s = xmalloc(sizeof *s);
646 s->reconnect = reconnect_create(time_msec());
647 reconnect_set_name(s->reconnect, name);
648 reconnect_enable(s->reconnect, time_msec());
656 /* Creates and returns a jsonrpc_session that is initially connected to
657 * 'jsonrpc'. If the connection is dropped, it will not be reconnected. */
/* NOTE(review): the initialization of the remaining members (including the
 * one that presumably stores 'jsonrpc') and the return statement are not
 * visible in this excerpt. */
658 struct jsonrpc_session *
659 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
661 struct jsonrpc_session *s;
663 s = xmalloc(sizeof *s);
664 s->reconnect = reconnect_create(time_msec());
/* The session takes its name from the existing connection. */
665 reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
/* The maximum number of connection tries is set to 0, and the reconnect
 * object is told that the session is already connected. */
666 reconnect_set_max_tries(s->reconnect, 0);
667 reconnect_connected(s->reconnect, time_msec());
/* Closes 's': closes its JSON-RPC connection and destroys its reconnect
 * object.
 *
 * NOTE(review): any null check on 's', the handling of 's->stream' and the
 * release of 's' itself are not visible in this excerpt. */
676 jsonrpc_session_close(struct jsonrpc_session *s)
679 jsonrpc_close(s->rpc);
680 reconnect_destroy(s->reconnect);
/* Drops whatever connection 's' has.  Tells the reconnect object that the
 * session is disconnected, then either reports EOF on the JSON-RPC connection
 * and closes it or, failing that, closes a stream that is still present.
 *
 * NOTE(review): the condition that opens the first branch (presumably a test
 * of 's->rpc') and the resetting of 's->rpc' and 's->stream' are not visible
 * in this excerpt. */
686 jsonrpc_session_disconnect(struct jsonrpc_session *s)
688 reconnect_disconnected(s->reconnect, time_msec(), 0);
690 jsonrpc_error(s->rpc, EOF);
691 jsonrpc_close(s->rpc);
694 } else if (s->stream) {
695 stream_close(s->stream);
/* Starts a new connection attempt for 's': drops any existing connection,
 * opens a stream to the name held by the reconnect object, and tells the
 * reconnect object either that the attempt failed (with the error code) or
 * that a connection is in progress.
 *
 * NOTE(review): the declaration of 'error' and the conditional that selects
 * between the two reconnect calls are not visible in this excerpt. */
702 jsonrpc_session_connect(struct jsonrpc_session *s)
706 jsonrpc_session_disconnect(s);
707 error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
709 reconnect_connect_failed(s->reconnect, time_msec(), error);
711 reconnect_connecting(s->reconnect, time_msec());
/* Performs periodic work on 's'.  The visible code checks the status of an
 * existing JSON-RPC connection and disconnects, or otherwise advances a
 * stream that is still connecting; it then carries out whatever action the
 * reconnect state machine requests (connect, disconnect, or probe).
 *
 * NOTE(review): the conditions that open the first branches, the declaration
 * of 'error' and 'params', the 'break' statements, any remaining switch
 * cases and the closing braces are not visible in this excerpt. */
717 jsonrpc_session_run(struct jsonrpc_session *s)
723 error = jsonrpc_get_status(s->rpc);
725 jsonrpc_session_disconnect(s);
/* No JSON-RPC connection yet, but a stream exists: keep connecting. */
727 } else if (s->stream) {
730 stream_run(s->stream);
731 error = stream_connect(s->stream);
/* The connection is up: wrap the stream in a JSON-RPC connection. */
733 reconnect_connected(s->reconnect, time_msec());
734 s->rpc = jsonrpc_open(s->stream);
/* EAGAIN is not treated as a failure; any other error abandons the stream. */
736 } else if (error != EAGAIN) {
737 reconnect_connect_failed(s->reconnect, time_msec(), error);
738 stream_close(s->stream);
743 switch (reconnect_run(s->reconnect, time_msec())) {
744 case RECONNECT_CONNECT:
745 jsonrpc_session_connect(s);
748 case RECONNECT_DISCONNECT:
749 jsonrpc_session_disconnect(s);
/* Probe the peer with an "echo" request that has no parameters.  The
 * generated id is replaced with the string "echo", which is what
 * jsonrpc_session_recv() looks for in order to suppress the reply. */
752 case RECONNECT_PROBE:
755 struct jsonrpc_msg *request;
757 params = json_array_create_empty();
758 request = jsonrpc_create_request("echo", params, NULL);
759 json_destroy(request->id);
760 request->id = json_string_create("echo");
761 jsonrpc_send(s->rpc, request);
/* Registers the wait conditions that correspond to jsonrpc_session_run():
 * those of the JSON-RPC connection or, failing that, those of a stream that
 * is still connecting, plus the reconnect object's own timer.
 *
 * NOTE(review): the condition that opens the first branch (presumably a test
 * of 's->rpc') is not visible in this excerpt. */
768 jsonrpc_session_wait(struct jsonrpc_session *s)
771 jsonrpc_wait(s->rpc);
772 } else if (s->stream) {
773 stream_run_wait(s->stream);
774 stream_connect_wait(s->stream);
776 reconnect_wait(s->reconnect, time_msec());
/* Returns the backlog of the session's JSON-RPC connection as reported by
 * jsonrpc_get_backlog(), or 0 if 's' has no such connection. */
780 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
782 return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
/* Returns the name held by the session's reconnect object, which the
 * session constructors in this file set with reconnect_set_name(). */
786 jsonrpc_session_get_name(const struct jsonrpc_session *s)
788 return reconnect_get_name(s->reconnect);
/* Sends 'msg' on the session's JSON-RPC connection and returns
 * jsonrpc_send()'s result, or returns ENOTCONN without calling it if 's' has
 * no such connection.
 *
 * NOTE(review): in the ENOTCONN case nothing here destroys 'msg' -- confirm
 * whether the caller is expected to. */
792 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
794 return s->rpc ? jsonrpc_send(s->rpc, msg) : ENOTCONN;
/* Receives a message on the session's JSON-RPC connection and notes the
 * activity with reconnect_received().  An "echo" request is answered here
 * with a reply that carries a clone of the request's params, and a reply
 * whose id is the string "echo" is suppressed.
 *
 * NOTE(review): the return type, the checks on 's->rpc' and 'msg', the branch
 * that returns other messages to the caller and the final return statement
 * are not visible in this excerpt. */
798 jsonrpc_session_recv(struct jsonrpc_session *s)
801 struct jsonrpc_msg *msg;
802 jsonrpc_recv(s->rpc, &msg);
804 reconnect_received(s->reconnect, time_msec());
805 if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
806 /* Echo request. Send reply. */
807 struct jsonrpc_msg *reply;
809 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
810 jsonrpc_session_send(s, reply);
/* The string id "echo" is the one that jsonrpc_session_run() puts on its
 * probe requests. */
811 } else if (msg->type == JSONRPC_REPLY
812 && msg->id && msg->id->type == JSON_STRING
813 && !strcmp(msg->id->u.string, "echo")) {
814 /* It's a reply to our echo request. Suppress it. */
818 jsonrpc_msg_destroy(msg);
/* Registers the wait condition that corresponds to jsonrpc_session_recv() by
 * delegating to jsonrpc_recv_wait().
 *
 * NOTE(review): any check that 's->rpc' is nonnull is not visible in this
 * excerpt. */
825 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
828 jsonrpc_recv_wait(s->rpc);
/* Returns nonzero if 's' has a JSON-RPC connection, has a stream, or has a
 * reconnect object whose maximum number of tries is nonzero. */
833 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
835 return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
/* Returns nonzero if 's' currently has a JSON-RPC connection, that is, if
 * 's->rpc' is nonnull. */
839 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
841 return s->rpc != NULL;
/* NOTE(review): only the signature is visible in this excerpt; what the
 * sequence number counts cannot be determined from here. */
845 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
/* Asks the session's reconnect object to force a reconnection, passing it
 * the current time.  The reconnect object's response is acted on by the
 * switch on reconnect_run() in jsonrpc_session_run(). */
851 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
853 reconnect_force_reconnect(s->reconnect, time_msec());