1 /* Copyright (c) 2009 Nicira Networks
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include "jsonrpc-server.h"
26 #include "reconnect.h"
31 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
34 struct ovsdb_jsonrpc_session;
36 static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
38 static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
40 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
41 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
43 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
44 struct json *id, struct json *params);
45 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
46 struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
47 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
48 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
49 static void ovsdb_jsonrpc_trigger_complete_done(
50 struct ovsdb_jsonrpc_session *);
52 /* JSON-RPC database server. */
54 struct ovsdb_jsonrpc_server {
57 struct list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
58 unsigned int n_sessions, max_sessions;
59 unsigned int max_triggers;
61 struct pstream **listeners;
62 size_t n_listeners, allocated_listeners;
65 struct ovsdb_jsonrpc_server *
66 ovsdb_jsonrpc_server_create(struct ovsdb *db)
68 struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
70 server->max_sessions = 64;
71 server->max_triggers = 64;
72 list_init(&server->sessions);
77 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr, const char *name)
79 struct pstream *pstream;
82 error = pstream_open(name, &pstream);
87 if (svr->n_listeners >= svr->allocated_listeners) {
88 svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
89 sizeof *svr->listeners);
91 svr->listeners[svr->n_listeners++] = pstream;
96 ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr,
99 ovsdb_jsonrpc_session_create_active(svr, name);
103 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
107 /* Accept new connections. */
108 for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
109 struct pstream *listener = svr->listeners[i];
110 struct stream *stream;
113 error = pstream_accept(listener, &stream);
115 ovsdb_jsonrpc_session_create_passive(svr, stream);
116 } else if (error == EAGAIN) {
119 VLOG_WARN("%s: accept failed: %s",
120 pstream_get_name(listener), strerror(error));
121 pstream_close(listener);
122 svr->listeners[i] = svr->listeners[--svr->n_listeners];
126 /* Handle each session. */
127 ovsdb_jsonrpc_session_run_all(svr);
131 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
133 if (svr->n_sessions < svr->max_sessions) {
136 for (i = 0; i < svr->n_listeners; i++) {
137 pstream_wait(svr->listeners[i]);
141 ovsdb_jsonrpc_session_wait_all(svr);
144 /* JSON-RPC database server session. */
146 struct ovsdb_jsonrpc_session {
147 struct ovsdb_jsonrpc_server *server;
148 struct list node; /* Element in server's sessions list. */
151 struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
152 struct list completions; /* Completed triggers. */
154 /* Connecting and reconnecting. */
155 struct reconnect *reconnect; /* For back-off. */
156 bool active; /* Active or passive connection? */
158 struct stream *stream; /* Only if active == false and rpc == NULL. */
161 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
162 static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
163 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
164 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
165 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
166 struct jsonrpc_msg *);
167 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
168 struct jsonrpc_msg *);
170 static struct ovsdb_jsonrpc_session *
171 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
172 const char *name, bool active)
174 struct ovsdb_jsonrpc_session *s;
176 s = xzalloc(sizeof *s);
178 list_push_back(&svr->sessions, &s->node);
179 hmap_init(&s->triggers);
180 list_init(&s->completions);
181 s->reconnect = reconnect_create(time_msec());
182 reconnect_set_name(s->reconnect, name);
183 reconnect_enable(s->reconnect, time_msec());
192 ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
195 ovsdb_jsonrpc_session_create(svr, name, true);
199 ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
200 struct stream *stream)
202 struct ovsdb_jsonrpc_session *s;
204 s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
205 reconnect_connected(s->reconnect, time_msec());
206 s->rpc = jsonrpc_open(stream);
210 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
212 ovsdb_jsonrpc_session_disconnect(s);
213 list_remove(&s->node);
214 s->server->n_sessions--;
218 ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
220 reconnect_disconnected(s->reconnect, time_msec(), 0);
222 jsonrpc_error(s->rpc, EOF);
223 ovsdb_jsonrpc_trigger_complete_all(s);
224 jsonrpc_close(s->rpc);
226 } else if (s->stream) {
227 stream_close(s->stream);
233 ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
235 ovsdb_jsonrpc_session_disconnect(s);
237 int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
239 reconnect_connect_failed(s->reconnect, time_msec(), error);
241 reconnect_connecting(s->reconnect, time_msec());
247 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
250 struct jsonrpc_msg *msg;
255 ovsdb_jsonrpc_trigger_complete_done(s);
257 if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
258 reconnect_received(s->reconnect, time_msec());
259 if (msg->type == JSONRPC_REQUEST) {
260 ovsdb_jsonrpc_session_got_request(s, msg);
261 } else if (msg->type == JSONRPC_NOTIFY) {
262 ovsdb_jsonrpc_session_got_notify(s, msg);
263 } else if (msg->type == JSONRPC_REPLY
264 && msg->id && msg->id->type == JSON_STRING
265 && !strcmp(msg->id->u.string, "echo")) {
266 /* It's a reply to our echo request. Ignore it. */
268 VLOG_WARN("%s: received unexpected %s message",
269 jsonrpc_get_name(s->rpc),
270 jsonrpc_msg_type_to_string(msg->type));
271 jsonrpc_error(s->rpc, EPROTO);
272 jsonrpc_msg_destroy(msg);
276 error = jsonrpc_get_status(s->rpc);
279 ovsdb_jsonrpc_session_disconnect(s);
284 } else if (s->stream) {
285 int error = stream_connect(s->stream);
287 reconnect_connected(s->reconnect, time_msec());
288 s->rpc = jsonrpc_open(s->stream);
290 } else if (error != EAGAIN) {
291 reconnect_connect_failed(s->reconnect, time_msec(), error);
292 stream_close(s->stream);
297 switch (reconnect_run(s->reconnect, time_msec())) {
298 case RECONNECT_CONNECT:
299 ovsdb_jsonrpc_session_connect(s);
302 case RECONNECT_DISCONNECT:
303 ovsdb_jsonrpc_session_disconnect(s);
306 case RECONNECT_PROBE:
309 struct jsonrpc_msg *request;
311 params = json_array_create_empty();
312 request = jsonrpc_create_request("echo", params);
313 json_destroy(request->id);
314 request->id = json_string_create("echo");
315 jsonrpc_send(s->rpc, request);
319 return s->active || s->rpc ? 0 : ETIMEDOUT;
323 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
325 struct ovsdb_jsonrpc_session *s, *next;
327 LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
329 int error = ovsdb_jsonrpc_session_run(s);
331 ovsdb_jsonrpc_session_close(s);
337 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
340 jsonrpc_wait(s->rpc);
341 if (!jsonrpc_get_backlog(s->rpc)) {
342 jsonrpc_recv_wait(s->rpc);
344 } else if (s->stream) {
345 stream_connect_wait(s->stream);
347 reconnect_wait(s->reconnect, time_msec());
351 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr)
353 struct ovsdb_jsonrpc_session *s;
355 LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
356 ovsdb_jsonrpc_session_wait(s);
360 static struct jsonrpc_msg *
361 execute_transaction(struct ovsdb_jsonrpc_session *s,
362 struct jsonrpc_msg *request)
364 ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
366 request->params = NULL;
371 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
372 struct jsonrpc_msg *request)
374 struct jsonrpc_msg *reply;
376 if (!strcmp(request->method, "transact")) {
377 reply = execute_transaction(s, request);
378 } else if (!strcmp(request->method, "get_schema")) {
379 reply = jsonrpc_create_reply(
380 ovsdb_schema_to_json(s->server->db->schema), request->id);
381 } else if (!strcmp(request->method, "echo")) {
382 reply = jsonrpc_create_reply(json_clone(request->params), request->id);
384 reply = jsonrpc_create_error(json_string_create("unknown method"),
389 jsonrpc_msg_destroy(request);
390 jsonrpc_send(s->rpc, reply);
395 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
397 if (json_array(request->params)->n == 1) {
398 struct ovsdb_jsonrpc_trigger *t;
401 id = request->params->u.array.elems[0];
402 t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
404 ovsdb_jsonrpc_trigger_complete(t);
410 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
411 struct jsonrpc_msg *request)
413 if (!strcmp(request->method, "cancel")) {
414 execute_cancel(s, request);
416 jsonrpc_msg_destroy(request);
419 /* JSON-RPC database server triggers.
421 * (Every transaction is treated as a trigger even if it doesn't actually have
422 * any "wait" operations.) */
424 struct ovsdb_jsonrpc_trigger {
425 struct ovsdb_trigger trigger;
426 struct ovsdb_jsonrpc_session *session;
427 struct hmap_node hmap_node; /* In session's "triggers" hmap. */
432 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
433 struct json *id, struct json *params)
435 struct ovsdb_jsonrpc_trigger *t;
438 /* Check for duplicate ID. */
439 hash = json_hash(id, 0);
440 t = ovsdb_jsonrpc_trigger_find(s, id, hash);
442 jsonrpc_send(s->rpc, jsonrpc_create_error(
443 json_string_create("duplicate request ID"), id));
445 json_destroy(params);
449 /* Insert into trigger table. */
450 t = xmalloc(sizeof *t);
451 ovsdb_trigger_init(s->server->db,
452 &t->trigger, params, &s->completions,
456 hmap_insert(&s->triggers, &t->hmap_node, hash);
458 /* Complete early if possible. */
459 if (ovsdb_trigger_is_complete(&t->trigger)) {
460 ovsdb_jsonrpc_trigger_complete(t);
464 static struct ovsdb_jsonrpc_trigger *
465 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
466 const struct json *id, size_t hash)
468 struct ovsdb_jsonrpc_trigger *t;
470 HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
472 if (json_equal(t->id, id)) {
481 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
483 struct ovsdb_jsonrpc_session *s = t->session;
485 if (s->rpc && !jsonrpc_get_status(s->rpc)) {
486 struct jsonrpc_msg *reply;
489 result = ovsdb_trigger_steal_result(&t->trigger);
491 reply = jsonrpc_create_reply(result, t->id);
493 reply = jsonrpc_create_error(json_string_create("canceled"),
496 jsonrpc_send(s->rpc, reply);
500 ovsdb_trigger_destroy(&t->trigger);
501 hmap_remove(&s->triggers, &t->hmap_node);
506 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
508 struct ovsdb_jsonrpc_trigger *t, *next;
509 HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
511 ovsdb_jsonrpc_trigger_complete(t);
516 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
518 while (!list_is_empty(&s->completions)) {
519 struct ovsdb_jsonrpc_trigger *t
520 = CONTAINER_OF(s->completions.next,
521 struct ovsdb_jsonrpc_trigger, trigger.node);
522 ovsdb_jsonrpc_trigger_complete(t);