2 * Copyright (c) 2009, 2010 Nicira Networks.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 #include "dynamic-string.h"
29 #include "poll-loop.h"
31 #include "reconnect.h"
35 #define THIS_MODULE VLM_jsonrpc
/* Underlying byte stream; every send and receive in this file goes through
 * it. */
39 struct stream *stream;
/* JSON parser for the message currently being read; jsonrpc_recv() assigns it
 * from json_parser_create() and jsonrpc_received() finishes it. */
45 struct json_parser *parser;
/* Message that has been parsed but not yet handed out; jsonrpc_recv() passes
 * it to its caller and resets this pointer to NULL. */
46 struct jsonrpc_msg *received;
/* Queue of buffers waiting to be transmitted; jsonrpc_send() appends to it and
 * jsonrpc_run() drains it from the head. */
49 struct ovs_queue output;
53 /* Rate limit for error messages. */
54 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
/* Forward declarations of static helpers that are defined further down in
 * this file but used before their definitions. */
56 static void jsonrpc_received(struct jsonrpc *);
57 static void jsonrpc_cleanup(struct jsonrpc *);
59 /* This is just the same as stream_open() except that it uses the default
60 * JSONRPC ports if none is specified. */
/* Thin wrapper: forwards 'name' and 'streamp' to
 * stream_open_with_default_ports(), supplying JSONRPC_TCP_PORT and
 * JSONRPC_SSL_PORT as the defaults, and returns that function's result
 * unchanged. */
62 jsonrpc_stream_open(const char *name, struct stream **streamp)
64 return stream_open_with_default_ports(name, JSONRPC_TCP_PORT,
65 JSONRPC_SSL_PORT, streamp);
68 /* This is just the same as pstream_open() except that it uses the default
69 * JSONRPC ports if none is specified. */
/* Thin wrapper: forwards 'name' and 'pstreamp' to
 * pstream_open_with_default_ports(), supplying JSONRPC_TCP_PORT and
 * JSONRPC_SSL_PORT as the defaults, and returns that function's result
 * unchanged. */
71 jsonrpc_pstream_open(const char *name, struct pstream **pstreamp)
73 return pstream_open_with_default_ports(name, JSONRPC_TCP_PORT,
74 JSONRPC_SSL_PORT, pstreamp);
/* Builds a jsonrpc connection object for 'stream', which must be nonnull
 * (this is asserted).  The object is allocated zero-filled, gets its own copy
 * of the stream's name, and has its input byte queue and output queue
 * initialized. */
78 jsonrpc_open(struct stream *stream)
82 assert(stream != NULL);
/* xzalloc() zero-fills, so fields not set explicitly below start out 0/NULL. */
84 rpc = xzalloc(sizeof *rpc);
85 rpc->name = xstrdup(stream_get_name(stream));
87 byteq_init(&rpc->input);
88 queue_init(&rpc->output);
94 jsonrpc_close(struct jsonrpc *rpc)
/* Performs periodic work for 'rpc': runs the underlying stream and then tries
 * to transmit the buffers queued on 'rpc->output', starting from the head. */
104 jsonrpc_run(struct jsonrpc *rpc)
110 stream_run(rpc->stream);
111 while (!queue_is_empty(&rpc->output)) {
112 struct ofpbuf *buf = rpc->output.head;
115 retval = stream_send(rpc->stream, buf->data, buf->size);
/* Subtract the number of bytes sent from the backlog and advance the buffer
 * past them. */
117 rpc->backlog -= retval;
118 ofpbuf_pull(buf, retval);
/* Unlink the head buffer from the queue and free it. */
120 ofpbuf_delete(queue_pop_head(&rpc->output));
/* -EAGAIN is not treated as a failure; any other negative result is logged
 * (rate-limited) and recorded on 'rpc' through jsonrpc_error(). */
123 if (retval != -EAGAIN) {
124 VLOG_WARN_RL(&rl, "%s: send error: %s",
125 rpc->name, strerror(-retval));
126 jsonrpc_error(rpc, -retval);
/* Wait-side counterpart of jsonrpc_run(): calls stream_run_wait() on the
 * underlying stream and, while the output queue still holds data, also calls
 * stream_send_wait(). */
134 jsonrpc_wait(struct jsonrpc *rpc)
137 stream_run_wait(rpc->stream);
138 if (!queue_is_empty(&rpc->output)) {
139 stream_send_wait(rpc->stream);
145 jsonrpc_get_status(const struct jsonrpc *rpc)
/* Returns 'rpc->backlog', the running count that jsonrpc_send() increases by
 * each queued message's length and jsonrpc_run() decreases as bytes are sent.
 * Returns 0 instead whenever 'rpc->status' is nonzero. */
151 jsonrpc_get_backlog(const struct jsonrpc *rpc)
153 return rpc->status ? 0 : rpc->backlog;
157 jsonrpc_get_name(const struct jsonrpc *rpc)
/* Logs 'msg' at debug level when debug logging is enabled.  The log line has
 * the form "<rpc name>: <title> <message type>" followed by the ", method=",
 * ", params=", ", result=", ", error=" and ", id=" pieces that are appended to
 * a temporary dynamic string.
 *
 * NOTE(review): the conditions guarding each appended field are on lines not
 * shown here; presumably each field is only printed when present. */
163 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
164 const struct jsonrpc_msg *msg)
/* Skip all of the formatting work unless it will actually be logged. */
166 if (VLOG_IS_DBG_ENABLED()) {
167 struct ds s = DS_EMPTY_INITIALIZER;
169 ds_put_format(&s, ", method=\"%s\"", msg->method);
172 ds_put_cstr(&s, ", params=");
173 json_to_ds(msg->params, 0, &s);
176 ds_put_cstr(&s, ", result=");
177 json_to_ds(msg->result, 0, &s);
180 ds_put_cstr(&s, ", error=");
181 json_to_ds(msg->error, 0, &s);
184 ds_put_cstr(&s, ", id=");
185 json_to_ds(msg->id, 0, &s);
187 VLOG_DBG("%s: %s %s%s", rpc->name, title,
188 jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
193 /* Always takes ownership of 'msg', regardless of success. */
/* Queues 'msg' for transmission on 'rpc'.  The message is logged, converted to
 * a JSON value, serialized to a string, and wrapped in a newly allocated
 * ofpbuf that is appended to 'rpc->output'; 'rpc->backlog' grows by the
 * buffer's length. */
195 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
/* NOTE(review): presumably the early-exit path for an 'rpc' that already has
 * an error status, where 'msg' is freed without being sent -- the guarding
 * condition is not visible here. */
203 jsonrpc_msg_destroy(msg);
207 jsonrpc_log_msg(rpc, "send", msg);
209 json = jsonrpc_msg_to_json(msg);
210 s = json_to_string(json, 0);
/* Wrap the serialized text 's' in an ofpbuf so it can sit on the output
 * queue. */
214 buf = xmalloc(sizeof *buf);
215 ofpbuf_use(buf, s, length);
217 queue_push_tail(&rpc->output, buf);
218 rpc->backlog += length;
/* True when the buffer just queued is the only one in the output queue. */
220 if (rpc->output.n == 1) {
/* Tries to obtain one complete message from 'rpc' and pass it to the caller
 * through '*msgp'.
 *
 * Loops until 'rpc->received' is set.  When the input byte queue is empty,
 * more bytes are read from the stream into it; buffered bytes are fed to the
 * JSON parser, and once the parser reports that it is done,
 * jsonrpc_received() is called. */
227 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
234 while (!rpc->received) {
235 if (byteq_is_empty(&rpc->input)) {
/* Nothing buffered: read up to the queue's headroom from the stream. */
239 chunk = byteq_headroom(&rpc->input);
240 retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
/* -EAGAIN is distinguished from other negative results, which are logged
 * (rate-limited) and recorded through jsonrpc_error(). */
242 if (retval == -EAGAIN) {
245 VLOG_WARN_RL(&rl, "%s: receive error: %s",
246 rpc->name, strerror(-retval));
247 jsonrpc_error(rpc, -retval);
/* A zero-byte read is treated as the connection having been closed and is
 * recorded as EOF. */
250 } else if (retval == 0) {
251 VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
252 jsonrpc_error(rpc, EOF);
/* Make the newly received bytes available at the head of the queue. */
255 byteq_advance_head(&rpc->input, retval);
260 rpc->parser = json_parser_create(0);
/* Feed the parser what is buffered, then drop only the bytes it consumed. */
262 n = byteq_tailroom(&rpc->input);
263 used = json_parser_feed(rpc->parser,
264 (char *) byteq_tail(&rpc->input), n);
265 byteq_advance_tail(&rpc->input, used);
266 if (json_parser_is_done(rpc->parser)) {
267 jsonrpc_received(rpc);
/* Hand the message to the caller and clear the slot for the next one. */
275 *msgp = rpc->received;
276 rpc->received = NULL;
/* Wait-side counterpart of jsonrpc_recv().  Requests an immediate wakeup when
 * 'rpc' has an error status, a message already parsed, or bytes still
 * buffered in its input queue; stream_recv_wait() covers waiting on the
 * stream itself.
 *
 * NOTE(review): the line between the two calls is not visible here;
 * presumably stream_recv_wait() is the else branch. */
281 jsonrpc_recv_wait(struct jsonrpc *rpc)
283 if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
284 poll_immediate_wake();
286 stream_recv_wait(rpc->stream);
290 /* Always takes ownership of 'msg', regardless of success. */
/* Blocking variant of jsonrpc_send(): hands 'msg' to jsonrpc_send() and then
 * keeps going until the visible exit test below is satisfied. */
292 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
296 error = jsonrpc_send(rpc, msg);
/* Finished once everything queued has been sent or 'rpc' has an error
 * status. */
303 if (queue_is_empty(&rpc->output) || rpc->status) {
/* Blocking variant of jsonrpc_recv(): calls jsonrpc_recv() and treats any
 * result other than EAGAIN as final.  jsonrpc_recv_wait() is called on the
 * path where more waiting is needed. */
312 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
315 int error = jsonrpc_recv(rpc, msgp);
/* EAGAIN means no message is available yet; anything else ends the wait. */
316 if (error != EAGAIN) {
322 jsonrpc_recv_wait(rpc);
327 /* Always takes ownership of 'request', regardless of success. */
/* Sends 'request' on 'rpc' and waits for the reply that carries the same id.
 * On return '*replyp' is the reply when no error occurred and NULL
 * otherwise. */
329 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
330 struct jsonrpc_msg **replyp)
332 struct jsonrpc_msg *reply = NULL;
/* Keep a private copy of the id: jsonrpc_send_block() takes ownership of
 * 'request', so 'request->id' cannot be relied on afterward. */
336 id = json_clone(request->id);
337 error = jsonrpc_send_block(rpc, request);
/* A received message matches when it is a JSONRPC_REPLY whose id equals the
 * saved id. */
340 error = jsonrpc_recv_block(rpc, &reply);
342 || (reply->type == JSONRPC_REPLY
343 && json_equal(id, reply->id))) {
346 jsonrpc_msg_destroy(reply);
349 *replyp = error ? NULL : reply;
/* Handles a completed parse on 'rpc': finishes the parser to get the JSON
 * value, converts it to a jsonrpc_msg, and logs the message as "received".
 * Both failure paths below log a rate-limited warning and put 'rpc' into the
 * EPROTO error state. */
355 jsonrpc_received(struct jsonrpc *rpc)
357 struct jsonrpc_msg *msg;
361 json = json_parser_finish(rpc->parser);
/* A JSON_STRING here is handled as a parse failure whose text is the string's
 * content. */
363 if (json->type == JSON_STRING) {
364 VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
365 rpc->name, json_string(json));
366 jsonrpc_error(rpc, EPROTO);
/* Well-formed JSON may still fail to be a valid JSON-RPC message. */
371 error = jsonrpc_msg_from_json(json, &msg);
373 VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
376 jsonrpc_error(rpc, EPROTO);
380 jsonrpc_log_msg(rpc, "received", msg);
/* Reacts to 'error' having occurred on 'rpc'.  The visible statement releases
 * the connection's resources through jsonrpc_cleanup().
 *
 * NOTE(review): presumably 'error' is also stored in 'rpc->status' on a line
 * not shown here -- confirm against the full source. */
385 jsonrpc_error(struct jsonrpc *rpc, int error)
390 jsonrpc_cleanup(rpc);
/* Releases the resources attached to 'rpc': closes its stream, aborts any
 * parse in progress, destroys a message that was parsed but never handed out
 * (resetting the pointer to NULL), and clears the output queue. */
395 jsonrpc_cleanup(struct jsonrpc *rpc)
397 stream_close(rpc->stream);
400 json_parser_abort(rpc->parser);
403 jsonrpc_msg_destroy(rpc->received);
404 rpc->received = NULL;
406 queue_clear(&rpc->output);
/* Common constructor behind the jsonrpc_create_*() functions: allocates a
 * jsonrpc_msg and fills it in from the arguments.  'method' is duplicated when
 * nonnull; the 'params' and 'result' pointers are stored as-is rather than
 * cloned, so the new message takes them over (jsonrpc_msg_destroy() later
 * destroys them). */
410 static struct jsonrpc_msg *
411 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
412 struct json *params, struct json *result, struct json *error,
/* xmalloc() does not zero the memory, so the fields are assigned
 * individually. */
415 struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
417 msg->method = method ? xstrdup(method) : NULL;
418 msg->params = params;
419 msg->result = result;
/* Returns a new JSON integer to use as a request id.  The value comes from a
 * function-local static counter that is post-incremented on every call, so
 * successive calls produce 0, 1, 2, and so on. */
426 jsonrpc_create_id(void)
428 static unsigned int id;
429 return json_integer_create(id++);
/* Creates a JSONRPC_REQUEST message that invokes 'method' with 'params' and
 * carries a newly generated id.  A clone of that id is stored in '*idp'.
 *
 * NOTE(review): a caller in this file passes NULL for 'idp', so the store is
 * presumably guarded by a null check on a line not shown here. */
433 jsonrpc_create_request(const char *method, struct json *params,
436 struct json *id = jsonrpc_create_id();
/* The caller gets its own copy; the original id goes into the message. */
438 *idp = json_clone(id);
440 return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
/* Creates a JSONRPC_NOTIFY message for 'method' with 'params'.  The result,
 * error, and id of the new message are all NULL. */
444 jsonrpc_create_notify(const char *method, struct json *params)
446 return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
/* Creates a JSONRPC_REPLY message that carries 'result'; its method, params,
 * and error are NULL.  The id argument passed on to jsonrpc_create() is
 * derived from 'id' on a line not shown here. */
450 jsonrpc_create_reply(struct json *result, const struct json *id)
452 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
/* Creates a message that carries 'error'; its method, params, and result are
 * NULL.  Note that the type passed to jsonrpc_create() is JSONRPC_REPLY, as
 * written below.  The id argument is derived from 'id' on a line not shown
 * here. */
457 jsonrpc_create_error(struct json *error, const struct json *id)
459 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
/* Returns a constant, human-readable name for 'type', selected by a switch
 * over the enum values; "notification" is one of the names returned.  Used in
 * this file for log and validation messages. */
464 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
467 case JSONRPC_REQUEST:
471 return "notification";
/* Checks that 'm' has exactly the members its message type calls for.  On the
 * first violation found, returns a newly allocated string (from xstrdup() or
 * xasprintf()) that describes it.
 *
 * 'pattern' is a per-type mask written in hex, one digit per member:
 *
 *     0x10000 method, 0x1000 params, 0x100 result, 0x10 error, 0x1 id.
 *
 * A member must be present exactly when its digit is set.
 *
 * NOTE(review): the return value for a valid message is on a line not shown
 * here; presumably NULL. */
483 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
485 const char *type_name;
486 unsigned int pattern;
/* Independent of the message type: "params", when present, must be an
 * array. */
488 if (m->params && m->params->type != JSON_ARRAY) {
489 return xstrdup("\"params\" must be JSON array");
493 case JSONRPC_REQUEST:
510 return xasprintf("invalid JSON-RPC message type %d", m->type);
/* Each test below compares "member is present" against "pattern demands it"
 * and reports a mismatch in either direction. */
513 type_name = jsonrpc_msg_type_to_string(m->type);
514 if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
515 return xasprintf("%s must%s have \"method\"",
516 type_name, (pattern & 0x10000) ? "" : " not");
519 if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
520 return xasprintf("%s must%s have \"params\"",
521 type_name, (pattern & 0x1000) ? "" : " not");
524 if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
525 return xasprintf("%s must%s have \"result\"",
526 type_name, (pattern & 0x100) ? "" : " not");
529 if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
530 return xasprintf("%s must%s have \"error\"",
531 type_name, (pattern & 0x10) ? "" : " not");
534 if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
535 return xasprintf("%s must%s have \"id\"",
536 type_name, (pattern & 0x1) ? "" : " not");
/* Frees message 'm'.  The visible statements destroy the JSON values held in
 * its 'params', 'result', and 'error' members; the remaining cleanup is on
 * lines not shown here. */
543 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
547 json_destroy(m->params);
548 json_destroy(m->result);
549 json_destroy(m->error);
/* Helper for jsonrpc_msg_from_json(): tests whether 'json' is a nonnull value
 * of type JSON_NULL.
 *
 * NOTE(review): what is returned in each case is not visible here; judging by
 * the name, presumably a JSON null is turned into a NULL pointer and anything
 * else is returned unchanged -- confirm. */
556 null_from_json_null(struct json *json)
558 if (json && json->type == JSON_NULL) {
/* Converts 'json', which must be a JSON object, into a jsonrpc_msg.
 *
 * The "method", "params", "result", "error" and "id" members are removed from
 * the object as they are consumed, so anything still left in it afterward is
 * reported as an unexpected member.  The assembled message is finally checked
 * with jsonrpc_msg_is_valid().  Error descriptions are newly allocated
 * strings. */
566 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
568 struct json *method = NULL;
569 struct jsonrpc_msg *msg = NULL;
570 struct shash *object;
573 if (json->type != JSON_OBJECT) {
574 error = xstrdup("message is not a JSON object");
577 object = json_object(json);
579 method = shash_find_and_delete(object, "method");
580 if (method && method->type != JSON_STRING) {
581 error = xstrdup("method is not a JSON string");
/* xzalloc() zero-fills, so members that are not assigned stay NULL. */
585 msg = xzalloc(sizeof *msg);
586 msg->method = method ? xstrdup(method->u.string) : NULL;
587 msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
588 msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
589 msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
590 msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
/* The type is inferred from which members are present, in priority order:
 * a result means reply, otherwise an error means error, otherwise an id means
 * request. */
591 msg->type = (msg->result ? JSONRPC_REPLY
592 : msg->error ? JSONRPC_ERROR
593 : msg->id ? JSONRPC_REQUEST
/* Every recognized member was deleted above, so a nonempty object holds a
 * member that should not be there. */
595 if (!shash_is_empty(object)) {
596 error = xasprintf("message has unexpected member \"%s\"",
597 shash_first(object)->name);
600 error = jsonrpc_msg_is_valid(msg);
/* 'method' was detached from the object above and its string was copied into
 * the message, so the JSON value is destroyed here. */
606 json_destroy(method);
609 jsonrpc_msg_destroy(msg);
/* Builds the JSON object form of 'm' with members "method", "params",
 * "result", "error" and "id".  The JSON values held by 'm' are placed into the
 * object directly rather than cloned.
 *
 * Members that 'm' lacks are written as explicit JSON nulls in three cases:
 * "result" for a JSONRPC_ERROR, "error" for a JSONRPC_REPLY, and "id" for a
 * JSONRPC_NOTIFY.
 *
 * NOTE(review): the conditions on the first branch of each pair are on lines
 * not shown here; presumably they test whether the member is nonnull. */
617 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
619 struct json *json = json_object_create();
622 json_object_put(json, "method", json_string_create_nocopy(m->method));
626 json_object_put(json, "params", m->params);
630 json_object_put(json, "result", m->result);
631 } else if (m->type == JSONRPC_ERROR) {
632 json_object_put(json, "result", json_null_create());
636 json_object_put(json, "error", m->error);
637 } else if (m->type == JSONRPC_REPLY) {
638 json_object_put(json, "error", json_null_create());
642 json_object_put(json, "id", m->id);
643 } else if (m->type == JSONRPC_NOTIFY) {
644 json_object_put(json, "id", json_null_create());
652 /* A JSON-RPC session with reconnection. */
654 struct jsonrpc_session {
/* Reconnection state; jsonrpc_session_run() acts on what reconnect_run()
 * reports for it. */
655 struct reconnect *reconnect;
/* Outgoing stream on which jsonrpc_session_run() calls stream_connect()
 * before passing it to jsonrpc_open(). */
657 struct stream *stream;
/* Listener on which jsonrpc_session_run() calls pstream_accept(). */
658 struct pstream *pstream;
662 /* Creates and returns a jsonrpc_session to 'name', which should be a string
663 * acceptable to stream_open() or pstream_open().
665 * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
666 * jsonrpc_session connects and reconnects, with back-off, to 'name'.
668 * If 'name' is a passive connection method, e.g. "ptcp:", the new
669 * jsonrpc_session listens for connections to 'name'. It maintains at most one
670 * connection at any given time. Any new connection causes the previous one
671 * (if any) to be dropped. */
/* Implementation notes: the session is allocated with xmalloc(), and its
 * reconnect object is created, named after 'name', and enabled, all stamped
 * with the current time_msec(). */
672 struct jsonrpc_session *
673 jsonrpc_session_open(const char *name)
675 struct jsonrpc_session *s;
677 s = xmalloc(sizeof *s);
678 s->reconnect = reconnect_create(time_msec());
679 reconnect_set_name(s->reconnect, name);
680 reconnect_enable(s->reconnect, time_msec());
/* A zero result from pstream_verify_name() switches the reconnect object into
 * passive mode. */
686 if (!pstream_verify_name(name)) {
687 reconnect_set_passive(s->reconnect, true, time_msec());
693 /* Creates and returns a jsonrpc_session that is initially connected to
694 * 'jsonrpc'. If the connection is dropped, it will not be reconnected. */
/* Implementation notes: the reconnect object takes its name from 'jsonrpc',
 * has its maximum number of tries set to 0, and is marked connected right
 * away. */
695 struct jsonrpc_session *
696 jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
698 struct jsonrpc_session *s;
700 s = xmalloc(sizeof *s);
701 s->reconnect = reconnect_create(time_msec());
702 reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
/* Zero tries: the session is not meant to reconnect once it drops. */
703 reconnect_set_max_tries(s->reconnect, 0);
704 reconnect_connected(s->reconnect, time_msec());
/* Tears down session 's': closes its jsonrpc connection, destroys its
 * reconnect object, and closes both its stream and its passive stream. */
714 jsonrpc_session_close(struct jsonrpc_session *s)
717 jsonrpc_close(s->rpc);
718 reconnect_destroy(s->reconnect);
719 stream_close(s->stream);
720 pstream_close(s->pstream);
/* Drops whatever connection 's' currently has.  A jsonrpc connection is put
 * into the EOF error state and then closed; failing that, a stream that
 * exists on its own is closed. */
726 jsonrpc_session_disconnect(struct jsonrpc_session *s)
729 jsonrpc_error(s->rpc, EOF);
730 jsonrpc_close(s->rpc);
733 } else if (s->stream) {
734 stream_close(s->stream);
/* Starts a connection attempt for 's' under the name held by its reconnect
 * object.  Any existing connection is dropped first.  An active session opens
 * a new outgoing stream; a passive one opens its listener.  The reconnect
 * object is told about the new state, or about the failure. */
741 jsonrpc_session_connect(struct jsonrpc_session *s)
743 const char *name = reconnect_get_name(s->reconnect);
746 jsonrpc_session_disconnect(s);
747 if (!reconnect_is_passive(s->reconnect)) {
748 error = jsonrpc_stream_open(name, &s->stream);
750 reconnect_connecting(s->reconnect, time_msec());
/* An already-open listener is reused instead of being reopened. */
753 error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream);
755 reconnect_listening(s->reconnect, time_msec());
760 reconnect_connect_failed(s->reconnect, time_msec(), error);
/* Performs periodic work for session 's':
 *
 *   - accepts a new connection on the passive stream, replacing any
 *     connection that is currently active;
 *   - checks the jsonrpc connection's status, or advances an outgoing
 *     connection attempt;
 *   - carries out the action that reconnect_run() requests: connect,
 *     disconnect, or send an "echo" probe. */
766 jsonrpc_session_run(struct jsonrpc_session *s)
769 struct stream *stream;
772 error = pstream_accept(s->pstream, &stream);
/* Only one connection is kept: a newly accepted one displaces the old. */
774 if (s->rpc || s->stream) {
776 "%s: new connection replacing active connection",
777 reconnect_get_name(s->reconnect));
778 jsonrpc_session_disconnect(s);
780 reconnect_connected(s->reconnect, time_msec());
781 s->rpc = jsonrpc_open(stream);
/* EAGAIN from pstream_accept() is not treated as a listen error. */
782 } else if (error != EAGAIN) {
783 reconnect_listen_error(s->reconnect, time_msec(), error);
784 pstream_close(s->pstream);
793 error = jsonrpc_get_status(s->rpc);
/* Note that 0, not 'error', is passed to reconnect_disconnected() here. */
795 reconnect_disconnected(s->reconnect, time_msec(), 0);
796 jsonrpc_session_disconnect(s);
798 } else if (s->stream) {
/* Outgoing connection still in progress: poll it for completion. */
801 stream_run(s->stream);
802 error = stream_connect(s->stream);
804 reconnect_connected(s->reconnect, time_msec());
805 s->rpc = jsonrpc_open(s->stream);
/* EAGAIN from stream_connect() is not treated as a failure. */
807 } else if (error != EAGAIN) {
808 reconnect_connect_failed(s->reconnect, time_msec(), error);
809 stream_close(s->stream);
814 switch (reconnect_run(s->reconnect, time_msec())) {
815 case RECONNECT_CONNECT:
816 jsonrpc_session_connect(s);
819 case RECONNECT_DISCONNECT:
820 reconnect_disconnected(s->reconnect, time_msec(), 0);
821 jsonrpc_session_disconnect(s);
824 case RECONNECT_PROBE:
827 struct jsonrpc_msg *request;
/* The probe is an "echo" request with no parameters.  Its generated id is
 * replaced with the string "echo", which is what jsonrpc_session_recv() looks
 * for to recognize and suppress the reply. */
829 params = json_array_create_empty();
830 request = jsonrpc_create_request("echo", params, NULL);
831 json_destroy(request->id);
832 request->id = json_string_create("echo");
833 jsonrpc_send(s->rpc, request);
/* Wait-side counterpart of jsonrpc_session_run(): waits on the jsonrpc
 * connection, or else on an outgoing stream that is still connecting, and
 * also on the passive stream and the reconnect object. */
840 jsonrpc_session_wait(struct jsonrpc_session *s)
843 jsonrpc_wait(s->rpc);
844 } else if (s->stream) {
845 stream_run_wait(s->stream);
846 stream_connect_wait(s->stream);
849 pstream_wait(s->pstream);
851 reconnect_wait(s->reconnect, time_msec());
/* Returns the backlog reported by jsonrpc_get_backlog() for the session's
 * jsonrpc connection, or 0 when 's' has no such connection. */
855 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
857 return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
/* Returns the session's name, which is the name stored in its reconnect
 * object. */
861 jsonrpc_session_get_name(const struct jsonrpc_session *s)
863 return reconnect_get_name(s->reconnect);
866 /* Always takes ownership of 'msg', regardless of success. */
/* Sends 'msg' over session 's'.  There are two visible outcomes: 'msg' is
 * forwarded to jsonrpc_send() on the session's jsonrpc connection, whose
 * result is returned, or 'msg' is destroyed without being sent.
 *
 * NOTE(review): the condition choosing between them is not visible here;
 * presumably it is whether 's->rpc' is nonnull. */
868 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
871 return jsonrpc_send(s->rpc, msg);
873 jsonrpc_msg_destroy(msg);
/* Receives a message on session 's' and handles the "echo" keepalive traffic
 * internally: an incoming "echo" request is answered with a reply that echoes
 * its parameters, and a reply whose id is the string "echo" is recognized as
 * the answer to this session's own probe.  A received message is also
 * reported to the reconnect object as activity. */
879 jsonrpc_session_recv(struct jsonrpc_session *s)
882 struct jsonrpc_msg *msg;
883 jsonrpc_recv(s->rpc, &msg);
885 reconnect_received(s->reconnect, time_msec());
886 if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
887 /* Echo request. Send reply. */
888 struct jsonrpc_msg *reply;
/* The reply gets its own clone of the parameters; 'msg' keeps the
 * originals. */
890 reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
891 jsonrpc_session_send(s, reply);
892 } else if (msg->type == JSONRPC_REPLY
893 && msg->id && msg->id->type == JSON_STRING
894 && !strcmp(msg->id->u.string, "echo")) {
895 /* It's a reply to our echo request. Suppress it. */
899 jsonrpc_msg_destroy(msg);
/* Wait-side counterpart of jsonrpc_session_recv(): delegates to
 * jsonrpc_recv_wait() on the session's jsonrpc connection. */
906 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
909 jsonrpc_recv_wait(s->rpc);
/* Returns true while 's' has a jsonrpc connection, has a stream, or has a
 * reconnect object whose maximum number of tries is nonzero. */
914 jsonrpc_session_is_alive(const struct jsonrpc_session *s)
916 return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
/* Returns true when 's' currently has a jsonrpc connection ('s->rpc' is
 * nonnull). */
920 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
922 return s->rpc != NULL;
926 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
/* Asks the session's reconnect object to force a reconnection, stamped with
 * the current time_msec(). */
932 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
934 reconnect_force_reconnect(s->reconnect, time_msec());