From 6c2882f90cd0b52321308998c06769741ab91c10 Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Thu, 12 Nov 2009 12:58:53 -0800 Subject: [PATCH] ovsdb-server: Reconnect to clients specified on --connect. --- ovsdb/SPECS | 22 ++++ ovsdb/jsonrpc-server.c | 226 +++++++++++++++++++++++++++++----------- ovsdb/ovsdb-server.1.in | 4 +- 3 files changed, 191 insertions(+), 61 deletions(-) diff --git a/ovsdb/SPECS b/ovsdb/SPECS index 12d97682a..f0d7748ff 100644 --- a/ovsdb/SPECS +++ b/ovsdb/SPECS @@ -277,6 +277,28 @@ form: The "cancel" notification itself has no reply. +echo +.... + +Request object members: + + "method": "echo" required + "params": any JSON value required + "id": any JSON value required + +Response object members: + + "result": same as "params" + "error": null + "id": the request "id" member + +Both the JSON-RPC client and the server must implement this request. + +This JSON-RPC request and response can be used to implement connection +keepalives, by allowing the server to check that the client is still +there or vice versa. + + Notation for the Wire Protocol ------------------------------ diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index e97a2c333..7a33d77af 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -22,6 +22,7 @@ #include "json.h" #include "jsonrpc.h" #include "ovsdb.h" +#include "reconnect.h" #include "stream.h" #include "svec.h" #include "timeval.h" @@ -44,14 +45,23 @@ static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *); struct ovsdb_jsonrpc_session { struct ovsdb_jsonrpc_server *server; struct list node; /* Element in server's sessions list. */ - struct jsonrpc *rpc; struct hmap triggers; struct list completions; /* Completed triggers. */ + + struct reconnect *reconnect; /* For back-off. */ + bool active; /* Active or passive connection? */ + struct jsonrpc *rpc; + struct stream *stream; /* Only if active == false and rpc == NULL. */ }; -static void ovsdb_jsonrpc_session_open(struct ovsdb_jsonrpc_server *, - struct stream *); +static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *, + const char *name); +static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *, + struct stream *); static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *); +static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s); +static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *); +static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *); static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *, struct jsonrpc_msg *); static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *, @@ -88,16 +98,7 @@ ovsdb_jsonrpc_server_create(struct ovsdb *db, const struct svec *active, list_init(&server->sessions); SVEC_FOR_EACH (i, name, active) { - struct stream *stream; - int error; - - error = stream_open(name, &stream); - if (!error) { - ovsdb_jsonrpc_session_open(server, stream); - } else { - ovs_error(error, "%s: connection failed", name); - retval = error; - } + ovsdb_jsonrpc_session_create_active(server, name); } SVEC_FOR_EACH (i, name, passive) { @@ -131,7 +132,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) error = pstream_accept(listener, &stream); if (!error) { - ovsdb_jsonrpc_session_open(svr, stream); + ovsdb_jsonrpc_session_create_passive(svr, stream); } else if (error == EAGAIN) { i++; } else if (error) { @@ -145,33 +146,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) /* Handle each session. */ LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node, &svr->sessions) { - struct jsonrpc_msg *msg; - int error; - - jsonrpc_run(s->rpc); - - while (!list_is_empty(&s->completions)) { - struct ovsdb_jsonrpc_trigger *t - = CONTAINER_OF(s->completions.next, - struct ovsdb_jsonrpc_trigger, trigger.node); - ovsdb_jsonrpc_trigger_complete(t); - } - - if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) { - if (msg->type == JSONRPC_REQUEST) { - ovsdb_jsonrpc_session_got_request(s, msg); - } else if (msg->type == JSONRPC_NOTIFY) { - ovsdb_jsonrpc_session_got_notify(s, msg); - } else { - VLOG_WARN("%s: received unexpected %s message", - jsonrpc_get_name(s->rpc), - jsonrpc_msg_type_to_string(msg->type)); - jsonrpc_error(s->rpc, EPROTO); - jsonrpc_msg_destroy(msg); - } - } - - error = jsonrpc_get_status(s->rpc); + int error = ovsdb_jsonrpc_session_run(s); if (error) { ovsdb_jsonrpc_session_close(s); } @@ -192,10 +167,7 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr) } LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) { - jsonrpc_wait(s->rpc); - if (!jsonrpc_get_backlog(s->rpc)) { - jsonrpc_recv_wait(s->rpc); - } + ovsdb_jsonrpc_session_wait(s); } } @@ -231,7 +203,7 @@ ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t) { struct ovsdb_jsonrpc_session *s = t->session; - if (!jsonrpc_get_status(s->rpc)) { + if (s->rpc && !jsonrpc_get_status(s->rpc)) { struct jsonrpc_msg *reply; struct json *result; @@ -251,38 +223,174 @@ ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t) free(t); } -static void -ovsdb_jsonrpc_session_open(struct ovsdb_jsonrpc_server *svr, - struct stream *stream) +static struct ovsdb_jsonrpc_session * +ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr, + const char *name, bool active) { struct ovsdb_jsonrpc_session *s; s = xzalloc(sizeof *s); s->server = svr; list_push_back(&svr->sessions, &s->node); - s->rpc = jsonrpc_open(stream); hmap_init(&s->triggers); list_init(&s->completions); + s->reconnect = reconnect_create(time_msec()); + reconnect_set_name(s->reconnect, name); + reconnect_enable(s->reconnect, time_msec()); + s->active = active; + svr->n_sessions++; + + return s; } static void -ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s) +ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr, + const char *name) { - struct ovsdb_jsonrpc_trigger *t, *next; + ovsdb_jsonrpc_session_create(svr, name, true); +} - jsonrpc_error(s->rpc, EOF); - HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node, - &s->triggers) { - ovsdb_jsonrpc_trigger_complete(t); - } +static void +ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr, + struct stream *stream) +{ + struct ovsdb_jsonrpc_session *s; - jsonrpc_close(s->rpc); + s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false); + reconnect_connected(s->reconnect, time_msec()); + s->rpc = jsonrpc_open(stream); +} +static void +ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s) +{ + ovsdb_jsonrpc_session_disconnect(s); list_remove(&s->node); s->server->n_sessions--; } +static void +ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s) +{ + reconnect_disconnected(s->reconnect, time_msec(), 0); + if (s->rpc) { + struct ovsdb_jsonrpc_trigger *t, *next; + + jsonrpc_error(s->rpc, EOF); + HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node, + &s->triggers) { + ovsdb_jsonrpc_trigger_complete(t); + } + + jsonrpc_close(s->rpc); + s->rpc = NULL; + } else if (s->stream) { + stream_close(s->stream); + s->stream = NULL; + } +} + +static void +ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s) +{ + ovsdb_jsonrpc_session_disconnect(s); + if (s->active) { + int error = stream_open(reconnect_get_name(s->reconnect), &s->stream); + if (error) { + reconnect_connect_failed(s->reconnect, time_msec(), error); + } else { + reconnect_connecting(s->reconnect, time_msec()); + } + } +} + +static int +ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s) +{ + if (s->rpc) { + struct jsonrpc_msg *msg; + int error; + + jsonrpc_run(s->rpc); + + while (!list_is_empty(&s->completions)) { + struct ovsdb_jsonrpc_trigger *t + = CONTAINER_OF(s->completions.next, + struct ovsdb_jsonrpc_trigger, trigger.node); + ovsdb_jsonrpc_trigger_complete(t); + } + + if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) { + reconnect_received(s->reconnect, time_msec()); + if (msg->type == JSONRPC_REQUEST) { + ovsdb_jsonrpc_session_got_request(s, msg); + } else if (msg->type == JSONRPC_NOTIFY) { + ovsdb_jsonrpc_session_got_notify(s, msg); + } else { + VLOG_WARN("%s: received unexpected %s message", + jsonrpc_get_name(s->rpc), + jsonrpc_msg_type_to_string(msg->type)); + jsonrpc_error(s->rpc, EPROTO); + jsonrpc_msg_destroy(msg); + } + } + + error = jsonrpc_get_status(s->rpc); + if (error) { + if (s->active) { + ovsdb_jsonrpc_session_disconnect(s); + } else { + return error; + } + } + } else if (s->stream) { + int error = stream_connect(s->stream); + if (!error) { + reconnect_connected(s->reconnect, time_msec()); + s->rpc = jsonrpc_open(s->stream); + s->stream = NULL; + } else if (error != EAGAIN) { + reconnect_connect_failed(s->reconnect, time_msec(), error); + stream_close(s->stream); + s->stream = NULL; + } + } + + switch (reconnect_run(s->reconnect, time_msec())) { + case RECONNECT_CONNECT: + ovsdb_jsonrpc_session_connect(s); + break; + + case RECONNECT_DISCONNECT: + ovsdb_jsonrpc_session_disconnect(s); + break; + + case RECONNECT_PROBE: + if (s->rpc) { + struct json *params = json_integer_create(0); + jsonrpc_send(s->rpc, jsonrpc_create_request("echo", params)); + } + break; + } + return s->active || s->rpc ? 0 : ETIMEDOUT; + +} + +static void +ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s) +{ + if (s->rpc) { + jsonrpc_wait(s->rpc); + if (!jsonrpc_get_backlog(s->rpc)) { + jsonrpc_recv_wait(s->rpc); + } + } else if (s->stream) { + stream_connect_wait(s->stream); + } + reconnect_wait(s->reconnect, time_msec()); +} + static struct jsonrpc_msg * execute_transaction(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request) @@ -329,6 +437,8 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s, } else if (!strcmp(request->method, "get_schema")) { reply = jsonrpc_create_reply( ovsdb_schema_to_json(s->server->db->schema), request->id); + } else if (!strcmp(request->method, "echo")) { + reply = jsonrpc_create_reply(json_clone(request->params), request->id); } else { reply = jsonrpc_create_error(json_string_create("unknown method"), request->id); diff --git a/ovsdb/ovsdb-server.1.in b/ovsdb/ovsdb-server.1.in index d24e443f3..9a888fc83 100644 --- a/ovsdb/ovsdb-server.1.in +++ b/ovsdb/ovsdb-server.1.in @@ -45,9 +45,7 @@ named \fIfile\fR. .IP "\fB\-\-connect=\fIremote\fR" Makes \fBovsdb\-server\fR initiate a JSON-RPC connection to \fIremote\fR, which must take one of the forms listed below. The -current implementation only attempts to connect once, and does not -reconnect after a failure or after the connection closes. This will -be fixed later. +server will reconnect to \fIremote\fR as necessary. . .RS .IP "\fBtcp:\fIip\fB:\fIport\fR" -- 2.43.0