ovsdb: Refactor JSON-RPC database server implementation.
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "column.h"
23 #include "json.h"
24 #include "jsonrpc.h"
25 #include "ovsdb.h"
26 #include "reconnect.h"
27 #include "stream.h"
28 #include "timeval.h"
29 #include "trigger.h"
30
31 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
32 #include "vlog.h"
33
34 struct ovsdb_jsonrpc_session;
35
36 static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
37                                                 const char *name);
38 static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
39                                                  struct stream *);
40 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
41 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
42
43 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
44                                          struct json *id, struct json *params);
45 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
46     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
47 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
48 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
49 static void ovsdb_jsonrpc_trigger_complete_done(
50     struct ovsdb_jsonrpc_session *);
51 \f
52 /* JSON-RPC database server. */
53
54 struct ovsdb_jsonrpc_server {
55     struct ovsdb *db;
56
57     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
58     unsigned int n_sessions, max_sessions;
59     unsigned int max_triggers;
60
61     struct pstream **listeners;
62     size_t n_listeners, allocated_listeners;
63 };
64
65 struct ovsdb_jsonrpc_server *
66 ovsdb_jsonrpc_server_create(struct ovsdb *db)
67 {
68     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
69     server->db = db;
70     server->max_sessions = 64;
71     server->max_triggers = 64;
72     list_init(&server->sessions);
73     return server;
74 }
75
76 int
77 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr, const char *name)
78 {
79     struct pstream *pstream;
80     int error;
81
82     error = pstream_open(name, &pstream);
83     if (error) {
84         return error;
85     }
86
87     if (svr->n_listeners >= svr->allocated_listeners) {
88         svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
89                                     sizeof *svr->listeners);
90     }
91     svr->listeners[svr->n_listeners++] = pstream;
92     return 0;
93 }
94
95 void
96 ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr,
97                              const char *name)
98 {
99     ovsdb_jsonrpc_session_create_active(svr, name);
100 }
101
102 void
103 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
104 {
105     size_t i;
106
107     /* Accept new connections. */
108     for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
109         struct pstream *listener = svr->listeners[i];
110         struct stream *stream;
111         int error;
112
113         error = pstream_accept(listener, &stream);
114         if (!error) {
115             ovsdb_jsonrpc_session_create_passive(svr, stream);
116         } else if (error == EAGAIN) {
117             i++;
118         } else if (error) {
119             VLOG_WARN("%s: accept failed: %s",
120                       pstream_get_name(listener), strerror(error));
121             pstream_close(listener);
122             svr->listeners[i] = svr->listeners[--svr->n_listeners];
123         }
124     }
125
126     /* Handle each session. */
127     ovsdb_jsonrpc_session_run_all(svr);
128 }
129
130 void
131 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
132 {
133     if (svr->n_sessions < svr->max_sessions) {
134         size_t i;
135
136         for (i = 0; i < svr->n_listeners; i++) {
137             pstream_wait(svr->listeners[i]);
138         }
139     }
140
141     ovsdb_jsonrpc_session_wait_all(svr);
142 }
143 \f
144 /* JSON-RPC database server session. */
145
146 struct ovsdb_jsonrpc_session {
147     struct ovsdb_jsonrpc_server *server;
148     struct list node;           /* Element in server's sessions list. */
149
150     /* Triggers. */
151     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
152     struct list completions;    /* Completed triggers. */
153
154     /* Connecting and reconnecting. */
155     struct reconnect *reconnect; /* For back-off. */
156     bool active;                /* Active or passive connection? */
157     struct jsonrpc *rpc;
158     struct stream *stream;      /* Only if active == false and rpc == NULL. */
159 };
160
161 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
162 static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
163 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
164 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
165 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
166                                              struct jsonrpc_msg *);
167 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
168                                              struct jsonrpc_msg *);
169
170 static struct ovsdb_jsonrpc_session *
171 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
172                              const char *name, bool active)
173 {
174     struct ovsdb_jsonrpc_session *s;
175
176     s = xzalloc(sizeof *s);
177     s->server = svr;
178     list_push_back(&svr->sessions, &s->node);
179     hmap_init(&s->triggers);
180     list_init(&s->completions);
181     s->reconnect = reconnect_create(time_msec());
182     reconnect_set_name(s->reconnect, name);
183     reconnect_enable(s->reconnect, time_msec());
184     s->active = active;
185
186     svr->n_sessions++;
187
188     return s;
189 }
190
191 static void
192 ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
193                                     const char *name)
194 {
195     ovsdb_jsonrpc_session_create(svr, name, true);
196 }
197
198 static void
199 ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
200                                      struct stream *stream)
201 {
202     struct ovsdb_jsonrpc_session *s;
203
204     s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
205     reconnect_connected(s->reconnect, time_msec());
206     s->rpc = jsonrpc_open(stream);
207 }
208
209 static void
210 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
211 {
212     ovsdb_jsonrpc_session_disconnect(s);
213     list_remove(&s->node);
214     s->server->n_sessions--;
215 }
216
217 static void
218 ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
219 {
220     reconnect_disconnected(s->reconnect, time_msec(), 0);
221     if (s->rpc) {
222         jsonrpc_error(s->rpc, EOF);
223         ovsdb_jsonrpc_trigger_complete_all(s);
224         jsonrpc_close(s->rpc);
225         s->rpc = NULL;
226     } else if (s->stream) {
227         stream_close(s->stream);
228         s->stream = NULL;
229     }
230 }
231
232 static void
233 ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
234 {
235     ovsdb_jsonrpc_session_disconnect(s);
236     if (s->active) {
237         int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
238         if (error) {
239             reconnect_connect_failed(s->reconnect, time_msec(), error);
240         } else {
241             reconnect_connecting(s->reconnect, time_msec());
242         }
243     }
244 }
245
246 static int
247 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
248 {
249     if (s->rpc) {
250         struct jsonrpc_msg *msg;
251         int error;
252
253         jsonrpc_run(s->rpc);
254
255         ovsdb_jsonrpc_trigger_complete_done(s);
256
257         if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
258             reconnect_received(s->reconnect, time_msec());
259             if (msg->type == JSONRPC_REQUEST) {
260                 ovsdb_jsonrpc_session_got_request(s, msg);
261             } else if (msg->type == JSONRPC_NOTIFY) {
262                 ovsdb_jsonrpc_session_got_notify(s, msg);
263             } else {
264                 VLOG_WARN("%s: received unexpected %s message",
265                           jsonrpc_get_name(s->rpc),
266                           jsonrpc_msg_type_to_string(msg->type));
267                 jsonrpc_error(s->rpc, EPROTO);
268                 jsonrpc_msg_destroy(msg);
269             }
270         }
271
272         error = jsonrpc_get_status(s->rpc);
273         if (error) {
274             if (s->active) {
275                 ovsdb_jsonrpc_session_disconnect(s);
276             } else {
277                 return error;
278             }
279         }
280     } else if (s->stream) {
281         int error = stream_connect(s->stream);
282         if (!error) {
283             reconnect_connected(s->reconnect, time_msec());
284             s->rpc = jsonrpc_open(s->stream);
285             s->stream = NULL;
286         } else if (error != EAGAIN) {
287             reconnect_connect_failed(s->reconnect, time_msec(), error);
288             stream_close(s->stream);
289             s->stream = NULL;
290         }
291     }
292
293     switch (reconnect_run(s->reconnect, time_msec())) {
294     case RECONNECT_CONNECT:
295         ovsdb_jsonrpc_session_connect(s);
296         break;
297
298     case RECONNECT_DISCONNECT:
299         ovsdb_jsonrpc_session_disconnect(s);
300         break;
301
302     case RECONNECT_PROBE:
303         if (s->rpc) {
304             struct json *params = json_array_create_empty();
305             jsonrpc_send(s->rpc, jsonrpc_create_request("echo", params));
306         }
307         break;
308     }
309     return s->active || s->rpc ? 0 : ETIMEDOUT;
310 }
311
312 static void
313 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
314 {
315     struct ovsdb_jsonrpc_session *s, *next;
316
317     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
318                         &svr->sessions) {
319         int error = ovsdb_jsonrpc_session_run(s);
320         if (error) {
321             ovsdb_jsonrpc_session_close(s);
322         }
323     }
324 }
325
326 static void
327 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
328 {
329     if (s->rpc) {
330         jsonrpc_wait(s->rpc);
331         if (!jsonrpc_get_backlog(s->rpc)) {
332             jsonrpc_recv_wait(s->rpc);
333         }
334     } else if (s->stream) {
335         stream_connect_wait(s->stream);
336     }
337     reconnect_wait(s->reconnect, time_msec());
338 }
339
340 static void
341 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr)
342 {
343     struct ovsdb_jsonrpc_session *s;
344
345     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
346         ovsdb_jsonrpc_session_wait(s);
347     }
348 }
349
350 static struct jsonrpc_msg *
351 execute_transaction(struct ovsdb_jsonrpc_session *s,
352                     struct jsonrpc_msg *request)
353 {
354     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
355     request->id = NULL;
356     request->params = NULL;
357     return NULL;
358 }
359
360 static void
361 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
362                                   struct jsonrpc_msg *request)
363 {
364     struct jsonrpc_msg *reply;
365
366     if (!strcmp(request->method, "transact")) {
367         reply = execute_transaction(s, request);
368     } else if (!strcmp(request->method, "get_schema")) {
369         reply = jsonrpc_create_reply(
370             ovsdb_schema_to_json(s->server->db->schema), request->id);
371     } else if (!strcmp(request->method, "echo")) {
372         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
373     } else {
374         reply = jsonrpc_create_error(json_string_create("unknown method"),
375                                      request->id);
376     }
377
378     if (reply) {
379         jsonrpc_msg_destroy(request);
380         jsonrpc_send(s->rpc, reply);
381     }
382 }
383
384 static void
385 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
386 {
387     if (json_array(request->params)->n == 1) {
388         struct ovsdb_jsonrpc_trigger *t;
389         struct json *id;
390
391         id = request->params->u.array.elems[0];
392         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
393         if (t) {
394             ovsdb_jsonrpc_trigger_complete(t);
395         }
396     }
397 }
398
399 static void
400 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
401                                  struct jsonrpc_msg *request)
402 {
403     if (!strcmp(request->method, "cancel")) {
404         execute_cancel(s, request);
405     }
406     jsonrpc_msg_destroy(request);
407 }
408 \f
409 /* JSON-RPC database server triggers.
410  *
411  * (Every transaction is treated as a trigger even if it doesn't actually have
412  * any "wait" operations.) */
413
414 struct ovsdb_jsonrpc_trigger {
415     struct ovsdb_trigger trigger;
416     struct ovsdb_jsonrpc_session *session;
417     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
418     struct json *id;
419 };
420
421 static void
422 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
423                              struct json *id, struct json *params)
424 {
425     struct ovsdb_jsonrpc_trigger *t;
426     size_t hash;
427
428     /* Check for duplicate ID. */
429     hash = json_hash(id, 0);
430     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
431     if (t) {
432         jsonrpc_send(s->rpc, jsonrpc_create_error(
433                          json_string_create("duplicate request ID"), id));
434         json_destroy(id);
435         json_destroy(params);
436         return;
437     }
438
439     /* Insert into trigger table. */
440     t = xmalloc(sizeof *t);
441     ovsdb_trigger_init(s->server->db,
442                        &t->trigger, params, &s->completions,
443                        time_msec());
444     t->session = s;
445     t->id = id;
446     hmap_insert(&s->triggers, &t->hmap_node, hash);
447
448     /* Complete early if possible. */
449     if (ovsdb_trigger_is_complete(&t->trigger)) {
450         ovsdb_jsonrpc_trigger_complete(t);
451     }
452 }
453
454 static struct ovsdb_jsonrpc_trigger *
455 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
456                            const struct json *id, size_t hash)
457 {
458     struct ovsdb_jsonrpc_trigger *t;
459
460     HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
461                              &s->triggers) {
462         if (json_equal(t->id, id)) {
463             return t;
464         }
465     }
466
467     return NULL;
468 }
469
470 static void
471 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
472 {
473     struct ovsdb_jsonrpc_session *s = t->session;
474
475     if (s->rpc && !jsonrpc_get_status(s->rpc)) {
476         struct jsonrpc_msg *reply;
477         struct json *result;
478
479         result = ovsdb_trigger_steal_result(&t->trigger);
480         if (result) {
481             reply = jsonrpc_create_reply(result, t->id);
482         } else {
483             reply = jsonrpc_create_error(json_string_create("canceled"),
484                                          t->id);
485         }
486         jsonrpc_send(s->rpc, reply);
487     }
488
489     json_destroy(t->id);
490     ovsdb_trigger_destroy(&t->trigger);
491     hmap_remove(&s->triggers, &t->hmap_node);
492     free(t);
493 }
494
495 static void
496 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
497 {
498     struct ovsdb_jsonrpc_trigger *t, *next;
499     HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
500                         &s->triggers) {
501         ovsdb_jsonrpc_trigger_complete(t);
502     }
503 }
504
505 static void
506 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
507 {
508     while (!list_is_empty(&s->completions)) {
509         struct ovsdb_jsonrpc_trigger *t
510             = CONTAINER_OF(s->completions.next,
511                            struct ovsdb_jsonrpc_trigger, trigger.node);
512         ovsdb_jsonrpc_trigger_complete(t);
513     }
514 }