ovsdb-server: Ignore replies to echo requests.
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "column.h"
23 #include "json.h"
24 #include "jsonrpc.h"
25 #include "ovsdb.h"
26 #include "reconnect.h"
27 #include "stream.h"
28 #include "timeval.h"
29 #include "trigger.h"
30
31 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
32 #include "vlog.h"
33
34 struct ovsdb_jsonrpc_session;
35
36 static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
37                                                 const char *name);
38 static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
39                                                  struct stream *);
40 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
41 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
42
43 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
44                                          struct json *id, struct json *params);
45 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
46     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
47 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
48 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
49 static void ovsdb_jsonrpc_trigger_complete_done(
50     struct ovsdb_jsonrpc_session *);
51 \f
52 /* JSON-RPC database server. */
53
54 struct ovsdb_jsonrpc_server {
55     struct ovsdb *db;
56
57     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
58     unsigned int n_sessions, max_sessions;
59     unsigned int max_triggers;
60
61     struct pstream **listeners;
62     size_t n_listeners, allocated_listeners;
63 };
64
65 struct ovsdb_jsonrpc_server *
66 ovsdb_jsonrpc_server_create(struct ovsdb *db)
67 {
68     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
69     server->db = db;
70     server->max_sessions = 64;
71     server->max_triggers = 64;
72     list_init(&server->sessions);
73     return server;
74 }
75
76 int
77 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr, const char *name)
78 {
79     struct pstream *pstream;
80     int error;
81
82     error = pstream_open(name, &pstream);
83     if (error) {
84         return error;
85     }
86
87     if (svr->n_listeners >= svr->allocated_listeners) {
88         svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
89                                     sizeof *svr->listeners);
90     }
91     svr->listeners[svr->n_listeners++] = pstream;
92     return 0;
93 }
94
95 void
96 ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr,
97                              const char *name)
98 {
99     ovsdb_jsonrpc_session_create_active(svr, name);
100 }
101
102 void
103 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
104 {
105     size_t i;
106
107     /* Accept new connections. */
108     for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
109         struct pstream *listener = svr->listeners[i];
110         struct stream *stream;
111         int error;
112
113         error = pstream_accept(listener, &stream);
114         if (!error) {
115             ovsdb_jsonrpc_session_create_passive(svr, stream);
116         } else if (error == EAGAIN) {
117             i++;
118         } else if (error) {
119             VLOG_WARN("%s: accept failed: %s",
120                       pstream_get_name(listener), strerror(error));
121             pstream_close(listener);
122             svr->listeners[i] = svr->listeners[--svr->n_listeners];
123         }
124     }
125
126     /* Handle each session. */
127     ovsdb_jsonrpc_session_run_all(svr);
128 }
129
130 void
131 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
132 {
133     if (svr->n_sessions < svr->max_sessions) {
134         size_t i;
135
136         for (i = 0; i < svr->n_listeners; i++) {
137             pstream_wait(svr->listeners[i]);
138         }
139     }
140
141     ovsdb_jsonrpc_session_wait_all(svr);
142 }
143 \f
144 /* JSON-RPC database server session. */
145
146 struct ovsdb_jsonrpc_session {
147     struct ovsdb_jsonrpc_server *server;
148     struct list node;           /* Element in server's sessions list. */
149
150     /* Triggers. */
151     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
152     struct list completions;    /* Completed triggers. */
153
154     /* Connecting and reconnecting. */
155     struct reconnect *reconnect; /* For back-off. */
156     bool active;                /* Active or passive connection? */
157     struct jsonrpc *rpc;
158     struct stream *stream;      /* Only if active == false and rpc == NULL. */
159 };
160
161 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
162 static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
163 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
164 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
165 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
166                                              struct jsonrpc_msg *);
167 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
168                                              struct jsonrpc_msg *);
169
170 static struct ovsdb_jsonrpc_session *
171 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
172                              const char *name, bool active)
173 {
174     struct ovsdb_jsonrpc_session *s;
175
176     s = xzalloc(sizeof *s);
177     s->server = svr;
178     list_push_back(&svr->sessions, &s->node);
179     hmap_init(&s->triggers);
180     list_init(&s->completions);
181     s->reconnect = reconnect_create(time_msec());
182     reconnect_set_name(s->reconnect, name);
183     reconnect_enable(s->reconnect, time_msec());
184     s->active = active;
185
186     svr->n_sessions++;
187
188     return s;
189 }
190
191 static void
192 ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
193                                     const char *name)
194 {
195     ovsdb_jsonrpc_session_create(svr, name, true);
196 }
197
198 static void
199 ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
200                                      struct stream *stream)
201 {
202     struct ovsdb_jsonrpc_session *s;
203
204     s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
205     reconnect_connected(s->reconnect, time_msec());
206     s->rpc = jsonrpc_open(stream);
207 }
208
209 static void
210 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
211 {
212     ovsdb_jsonrpc_session_disconnect(s);
213     list_remove(&s->node);
214     s->server->n_sessions--;
215 }
216
217 static void
218 ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
219 {
220     reconnect_disconnected(s->reconnect, time_msec(), 0);
221     if (s->rpc) {
222         jsonrpc_error(s->rpc, EOF);
223         ovsdb_jsonrpc_trigger_complete_all(s);
224         jsonrpc_close(s->rpc);
225         s->rpc = NULL;
226     } else if (s->stream) {
227         stream_close(s->stream);
228         s->stream = NULL;
229     }
230 }
231
232 static void
233 ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
234 {
235     ovsdb_jsonrpc_session_disconnect(s);
236     if (s->active) {
237         int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
238         if (error) {
239             reconnect_connect_failed(s->reconnect, time_msec(), error);
240         } else {
241             reconnect_connecting(s->reconnect, time_msec());
242         }
243     }
244 }
245
246 static int
247 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
248 {
249     if (s->rpc) {
250         struct jsonrpc_msg *msg;
251         int error;
252
253         jsonrpc_run(s->rpc);
254
255         ovsdb_jsonrpc_trigger_complete_done(s);
256
257         if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
258             reconnect_received(s->reconnect, time_msec());
259             if (msg->type == JSONRPC_REQUEST) {
260                 ovsdb_jsonrpc_session_got_request(s, msg);
261             } else if (msg->type == JSONRPC_NOTIFY) {
262                 ovsdb_jsonrpc_session_got_notify(s, msg);
263             } else if (msg->type == JSONRPC_REPLY
264                        && msg->id && msg->id->type == JSON_STRING
265                        && !strcmp(msg->id->u.string, "echo")) {
266                 /* It's a reply to our echo request.  Ignore it. */
267             } else {
268                 VLOG_WARN("%s: received unexpected %s message",
269                           jsonrpc_get_name(s->rpc),
270                           jsonrpc_msg_type_to_string(msg->type));
271                 jsonrpc_error(s->rpc, EPROTO);
272                 jsonrpc_msg_destroy(msg);
273             }
274         }
275
276         error = jsonrpc_get_status(s->rpc);
277         if (error) {
278             if (s->active) {
279                 ovsdb_jsonrpc_session_disconnect(s);
280             } else {
281                 return error;
282             }
283         }
284     } else if (s->stream) {
285         int error = stream_connect(s->stream);
286         if (!error) {
287             reconnect_connected(s->reconnect, time_msec());
288             s->rpc = jsonrpc_open(s->stream);
289             s->stream = NULL;
290         } else if (error != EAGAIN) {
291             reconnect_connect_failed(s->reconnect, time_msec(), error);
292             stream_close(s->stream);
293             s->stream = NULL;
294         }
295     }
296
297     switch (reconnect_run(s->reconnect, time_msec())) {
298     case RECONNECT_CONNECT:
299         ovsdb_jsonrpc_session_connect(s);
300         break;
301
302     case RECONNECT_DISCONNECT:
303         ovsdb_jsonrpc_session_disconnect(s);
304         break;
305
306     case RECONNECT_PROBE:
307         if (s->rpc) {
308             struct json *params;
309             struct jsonrpc_msg *request;
310
311             params = json_array_create_empty();
312             request = jsonrpc_create_request("echo", params);
313             json_destroy(request->id);
314             request->id = json_string_create("echo");
315             jsonrpc_send(s->rpc, request);
316         }
317         break;
318     }
319     return s->active || s->rpc ? 0 : ETIMEDOUT;
320 }
321
322 static void
323 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
324 {
325     struct ovsdb_jsonrpc_session *s, *next;
326
327     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
328                         &svr->sessions) {
329         int error = ovsdb_jsonrpc_session_run(s);
330         if (error) {
331             ovsdb_jsonrpc_session_close(s);
332         }
333     }
334 }
335
336 static void
337 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
338 {
339     if (s->rpc) {
340         jsonrpc_wait(s->rpc);
341         if (!jsonrpc_get_backlog(s->rpc)) {
342             jsonrpc_recv_wait(s->rpc);
343         }
344     } else if (s->stream) {
345         stream_connect_wait(s->stream);
346     }
347     reconnect_wait(s->reconnect, time_msec());
348 }
349
350 static void
351 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr)
352 {
353     struct ovsdb_jsonrpc_session *s;
354
355     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
356         ovsdb_jsonrpc_session_wait(s);
357     }
358 }
359
360 static struct jsonrpc_msg *
361 execute_transaction(struct ovsdb_jsonrpc_session *s,
362                     struct jsonrpc_msg *request)
363 {
364     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
365     request->id = NULL;
366     request->params = NULL;
367     return NULL;
368 }
369
370 static void
371 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
372                                   struct jsonrpc_msg *request)
373 {
374     struct jsonrpc_msg *reply;
375
376     if (!strcmp(request->method, "transact")) {
377         reply = execute_transaction(s, request);
378     } else if (!strcmp(request->method, "get_schema")) {
379         reply = jsonrpc_create_reply(
380             ovsdb_schema_to_json(s->server->db->schema), request->id);
381     } else if (!strcmp(request->method, "echo")) {
382         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
383     } else {
384         reply = jsonrpc_create_error(json_string_create("unknown method"),
385                                      request->id);
386     }
387
388     if (reply) {
389         jsonrpc_msg_destroy(request);
390         jsonrpc_send(s->rpc, reply);
391     }
392 }
393
394 static void
395 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
396 {
397     if (json_array(request->params)->n == 1) {
398         struct ovsdb_jsonrpc_trigger *t;
399         struct json *id;
400
401         id = request->params->u.array.elems[0];
402         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
403         if (t) {
404             ovsdb_jsonrpc_trigger_complete(t);
405         }
406     }
407 }
408
409 static void
410 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
411                                  struct jsonrpc_msg *request)
412 {
413     if (!strcmp(request->method, "cancel")) {
414         execute_cancel(s, request);
415     }
416     jsonrpc_msg_destroy(request);
417 }
418 \f
419 /* JSON-RPC database server triggers.
420  *
421  * (Every transaction is treated as a trigger even if it doesn't actually have
422  * any "wait" operations.) */
423
424 struct ovsdb_jsonrpc_trigger {
425     struct ovsdb_trigger trigger;
426     struct ovsdb_jsonrpc_session *session;
427     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
428     struct json *id;
429 };
430
431 static void
432 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
433                              struct json *id, struct json *params)
434 {
435     struct ovsdb_jsonrpc_trigger *t;
436     size_t hash;
437
438     /* Check for duplicate ID. */
439     hash = json_hash(id, 0);
440     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
441     if (t) {
442         jsonrpc_send(s->rpc, jsonrpc_create_error(
443                          json_string_create("duplicate request ID"), id));
444         json_destroy(id);
445         json_destroy(params);
446         return;
447     }
448
449     /* Insert into trigger table. */
450     t = xmalloc(sizeof *t);
451     ovsdb_trigger_init(s->server->db,
452                        &t->trigger, params, &s->completions,
453                        time_msec());
454     t->session = s;
455     t->id = id;
456     hmap_insert(&s->triggers, &t->hmap_node, hash);
457
458     /* Complete early if possible. */
459     if (ovsdb_trigger_is_complete(&t->trigger)) {
460         ovsdb_jsonrpc_trigger_complete(t);
461     }
462 }
463
464 static struct ovsdb_jsonrpc_trigger *
465 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
466                            const struct json *id, size_t hash)
467 {
468     struct ovsdb_jsonrpc_trigger *t;
469
470     HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
471                              &s->triggers) {
472         if (json_equal(t->id, id)) {
473             return t;
474         }
475     }
476
477     return NULL;
478 }
479
480 static void
481 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
482 {
483     struct ovsdb_jsonrpc_session *s = t->session;
484
485     if (s->rpc && !jsonrpc_get_status(s->rpc)) {
486         struct jsonrpc_msg *reply;
487         struct json *result;
488
489         result = ovsdb_trigger_steal_result(&t->trigger);
490         if (result) {
491             reply = jsonrpc_create_reply(result, t->id);
492         } else {
493             reply = jsonrpc_create_error(json_string_create("canceled"),
494                                          t->id);
495         }
496         jsonrpc_send(s->rpc, reply);
497     }
498
499     json_destroy(t->id);
500     ovsdb_trigger_destroy(&t->trigger);
501     hmap_remove(&s->triggers, &t->hmap_node);
502     free(t);
503 }
504
505 static void
506 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
507 {
508     struct ovsdb_jsonrpc_trigger *t, *next;
509     HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
510                         &s->triggers) {
511         ovsdb_jsonrpc_trigger_complete(t);
512     }
513 }
514
515 static void
516 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
517 {
518     while (!list_is_empty(&s->completions)) {
519         struct ovsdb_jsonrpc_trigger *t
520             = CONTAINER_OF(s->completions.next,
521                            struct ovsdb_jsonrpc_trigger, trigger.node);
522         ovsdb_jsonrpc_trigger_complete(t);
523     }
524 }