From: Ben Pfaff Date: Thu, 17 Dec 2009 23:16:43 +0000 (-0800) Subject: ovsdb-server: Factor out complication by using jsonrpc_session. X-Git-Tag: v1.0.0~259^2~353 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=4931f33ad9e25eb2df70ff20f1eeb4df813f8b2c;p=sliver-openvswitch.git ovsdb-server: Factor out complication by using jsonrpc_session. --- diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c index bd019f738..0f535155e 100644 --- a/lib/jsonrpc.c +++ b/lib/jsonrpc.c @@ -630,6 +630,9 @@ struct jsonrpc_session { unsigned int seqno; }; +/* Creates and returns a jsonrpc_session that connects and reconnects, with + * back-off, to 'name', which should be a string acceptable to + * stream_open(). */ struct jsonrpc_session * jsonrpc_session_open(const char *name) { @@ -646,6 +649,25 @@ jsonrpc_session_open(const char *name) return s; } +/* Creates and returns a jsonrpc_session that is initially connected to + * 'jsonrpc'. If the connection is dropped, it will not be reconnected. */ +struct jsonrpc_session * +jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc) +{ + struct jsonrpc_session *s; + + s = xmalloc(sizeof *s); + s->reconnect = reconnect_create(time_msec()); + reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc)); + reconnect_set_max_tries(s->reconnect, 0); + reconnect_connected(s->reconnect, time_msec()); + s->rpc = jsonrpc; + s->stream = NULL; + s->seqno = 0; + + return s; +} + void jsonrpc_session_close(struct jsonrpc_session *s) { @@ -767,14 +789,28 @@ jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg) struct jsonrpc_msg * jsonrpc_session_recv(struct jsonrpc_session *s) { - struct jsonrpc_msg *msg = NULL; if (s->rpc) { + struct jsonrpc_msg *msg; jsonrpc_recv(s->rpc, &msg); if (msg) { reconnect_received(s->reconnect, time_msec()); + if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) { + /* Echo request. Send reply. */ + struct jsonrpc_msg *reply; + + reply = jsonrpc_create_reply(json_clone(msg->params), msg->id); + jsonrpc_session_send(s, reply); + } else if (msg->type == JSONRPC_REPLY + && msg->id && msg->id->type == JSON_STRING + && !strcmp(msg->id->u.string, "echo")) { + /* It's a reply to our echo request. Suppress it. */ + } else { + return msg; + } + jsonrpc_msg_destroy(msg); } } - return msg; + return NULL; } void @@ -785,6 +821,12 @@ jsonrpc_session_recv_wait(struct jsonrpc_session *s) } } +bool +jsonrpc_session_is_alive(const struct jsonrpc_session *s) +{ + return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect); +} + bool jsonrpc_session_is_connected(const struct jsonrpc_session *s) { diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h index 93ac2e860..ae8b9de31 100644 --- a/lib/jsonrpc.h +++ b/lib/jsonrpc.h @@ -86,6 +86,7 @@ struct json *jsonrpc_msg_to_json(struct jsonrpc_msg *); /* A JSON-RPC session with reconnection. */ struct jsonrpc_session *jsonrpc_session_open(const char *name); +struct jsonrpc_session *jsonrpc_session_open_unreliably(struct jsonrpc *); void jsonrpc_session_close(struct jsonrpc_session *); void jsonrpc_session_run(struct jsonrpc_session *); @@ -98,6 +99,7 @@ int jsonrpc_session_send(struct jsonrpc_session *, struct jsonrpc_msg *); struct jsonrpc_msg *jsonrpc_session_recv(struct jsonrpc_session *); void jsonrpc_session_recv_wait(struct jsonrpc_session *); +bool jsonrpc_session_is_alive(const struct jsonrpc_session *); bool jsonrpc_session_is_connected(const struct jsonrpc_session *); unsigned int jsonrpc_session_get_seqno(const struct jsonrpc_session *); void jsonrpc_session_force_reconnect(struct jsonrpc_session *); diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c index 877fa3e28..29d1d0c40 100644 --- a/lib/ovsdb-idl.c +++ b/lib/ovsdb-idl.c @@ -244,9 +244,7 @@ ovsdb_idl_run(struct ovsdb_idl *idl) } reply = NULL; - if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) { - reply = jsonrpc_create_reply(json_clone(msg->params), msg->id); - } else if (msg->type == JSONRPC_NOTIFY + if (msg->type == JSONRPC_NOTIFY && !strcmp(msg->method, "update") && msg->params->type == JSON_ARRAY && msg->params->u.array.n == 2 diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index a8e724a2f..0bf11378e 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -160,15 +160,12 @@ struct ovsdb_jsonrpc_session { /* Monitors. */ struct hmap monitors; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */ - /* Connecting and reconnecting. */ - struct reconnect *reconnect; /* For back-off. */ - bool active; /* Active or passive connection? */ - struct jsonrpc *rpc; - struct stream *stream; /* Only if active == false and rpc == NULL. */ + /* Network connectivity. */ + struct jsonrpc_session *js; /* JSON-RPC session. */ + unsigned int js_seqno; /* Last jsonrpc_session_get_seqno() value. */ }; static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *); -static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s); static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *); static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *); static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *, @@ -178,7 +175,7 @@ static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *, static struct ovsdb_jsonrpc_session * ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr, - const char *name, bool active) + struct jsonrpc_session *js) { struct ovsdb_jsonrpc_session *s; @@ -188,10 +185,8 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr, hmap_init(&s->triggers); hmap_init(&s->monitors); list_init(&s->completions); - s->reconnect = reconnect_create(time_msec()); - reconnect_set_name(s->reconnect, name); - reconnect_enable(s->reconnect, time_msec()); - s->active = active; + s->js = js; + s->js_seqno = jsonrpc_session_get_seqno(js); svr->n_sessions++; @@ -202,132 +197,54 @@ static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr, const char *name) { - ovsdb_jsonrpc_session_create(svr, name, true); + ovsdb_jsonrpc_session_create(svr, jsonrpc_session_open(name)); } static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr, struct stream *stream) { - struct ovsdb_jsonrpc_session *s; - - s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false); - reconnect_connected(s->reconnect, time_msec()); - s->rpc = jsonrpc_open(stream); + ovsdb_jsonrpc_session_create( + svr, jsonrpc_session_open_unreliably(jsonrpc_open(stream))); } static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s) { - ovsdb_jsonrpc_session_disconnect(s); + jsonrpc_session_close(s->js); list_remove(&s->node); s->server->n_sessions--; } -static void -ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s) +static int +ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s) { - reconnect_disconnected(s->reconnect, time_msec(), 0); - if (s->rpc) { - jsonrpc_error(s->rpc, EOF); + jsonrpc_session_run(s->js); + if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) { + s->js_seqno = jsonrpc_session_get_seqno(s->js); ovsdb_jsonrpc_trigger_complete_all(s); ovsdb_jsonrpc_monitor_remove_all(s); - jsonrpc_close(s->rpc); - s->rpc = NULL; - } else if (s->stream) { - stream_close(s->stream); - s->stream = NULL; } -} -static void -ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s) -{ - ovsdb_jsonrpc_session_disconnect(s); - if (s->active) { - int error = stream_open(reconnect_get_name(s->reconnect), &s->stream); - if (error) { - reconnect_connect_failed(s->reconnect, time_msec(), error); - } else { - reconnect_connecting(s->reconnect, time_msec()); - } - } -} - -static int -ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s) -{ - if (s->rpc) { - struct jsonrpc_msg *msg; - int error; + ovsdb_jsonrpc_trigger_complete_done(s); - jsonrpc_run(s->rpc); - - ovsdb_jsonrpc_trigger_complete_done(s); - - if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) { - reconnect_received(s->reconnect, time_msec()); + if (!jsonrpc_session_get_backlog(s->js)) { + struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js); + if (msg) { if (msg->type == JSONRPC_REQUEST) { ovsdb_jsonrpc_session_got_request(s, msg); } else if (msg->type == JSONRPC_NOTIFY) { ovsdb_jsonrpc_session_got_notify(s, msg); - } else if (msg->type == JSONRPC_REPLY - && msg->id && msg->id->type == JSON_STRING - && !strcmp(msg->id->u.string, "echo")) { - /* It's a reply to our echo request. Ignore it. */ } else { VLOG_WARN("%s: received unexpected %s message", - jsonrpc_get_name(s->rpc), + jsonrpc_session_get_name(s->js), jsonrpc_msg_type_to_string(msg->type)); - jsonrpc_error(s->rpc, EPROTO); + jsonrpc_session_force_reconnect(s->js); jsonrpc_msg_destroy(msg); } } - - error = jsonrpc_get_status(s->rpc); - if (error) { - if (s->active) { - ovsdb_jsonrpc_session_disconnect(s); - } else { - return error; - } - } - } else if (s->stream) { - int error = stream_connect(s->stream); - if (!error) { - reconnect_connected(s->reconnect, time_msec()); - s->rpc = jsonrpc_open(s->stream); - s->stream = NULL; - } else if (error != EAGAIN) { - reconnect_connect_failed(s->reconnect, time_msec(), error); - stream_close(s->stream); - s->stream = NULL; - } } - - switch (reconnect_run(s->reconnect, time_msec())) { - case RECONNECT_CONNECT: - ovsdb_jsonrpc_session_connect(s); - break; - - case RECONNECT_DISCONNECT: - ovsdb_jsonrpc_session_disconnect(s); - break; - - case RECONNECT_PROBE: - if (s->rpc) { - struct json *params; - struct jsonrpc_msg *request; - - params = json_array_create_empty(); - request = jsonrpc_create_request("echo", params, NULL); - json_destroy(request->id); - request->id = json_string_create("echo"); - jsonrpc_send(s->rpc, request); - } - break; - } - return s->active || s->rpc ? 0 : ETIMEDOUT; + return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT; } static void @@ -347,15 +264,10 @@ ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr) static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s) { - if (s->rpc) { - jsonrpc_wait(s->rpc); - if (!jsonrpc_get_backlog(s->rpc)) { - jsonrpc_recv_wait(s->rpc); - } - } else if (s->stream) { - stream_connect_wait(s->stream); + jsonrpc_session_wait(s->js); + if (!jsonrpc_session_get_backlog(s->js)) { + jsonrpc_session_recv_wait(s->js); } - reconnect_wait(s->reconnect, time_msec()); } static void @@ -404,7 +316,7 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s, if (reply) { jsonrpc_msg_destroy(request); - jsonrpc_send(s->rpc, reply); + jsonrpc_session_send(s->js, reply); } } @@ -456,8 +368,11 @@ ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, hash = json_hash(id, 0); t = ovsdb_jsonrpc_trigger_find(s, id, hash); if (t) { - jsonrpc_send(s->rpc, jsonrpc_create_error( - json_string_create("duplicate request ID"), id)); + struct jsonrpc_msg *msg; + + msg = jsonrpc_create_error(json_string_create("duplicate request ID"), + id); + jsonrpc_session_send(s->js, msg); json_destroy(id); json_destroy(params); return; @@ -499,7 +414,7 @@ ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t) { struct ovsdb_jsonrpc_session *s = t->session; - if (s->rpc && !jsonrpc_get_status(s->rpc)) { + if (jsonrpc_session_is_connected(s->js)) { struct jsonrpc_msg *reply; struct json *result; @@ -510,7 +425,7 @@ ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t) reply = jsonrpc_create_error(json_string_create("canceled"), t->id); } - jsonrpc_send(s->rpc, reply); + jsonrpc_session_send(s->js, reply); } json_destroy(t->id); @@ -891,7 +806,7 @@ ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica, params = json_array_create_2(json_clone(aux.monitor->monitor_id), aux.json); msg = jsonrpc_create_notify("update", params); - jsonrpc_send(aux.monitor->session->rpc, msg); + jsonrpc_session_send(aux.monitor->session->js, msg); } return NULL;