ovsdb: Fix use of non-array for JSON-RPC parameters.
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "json.h"
23 #include "jsonrpc.h"
24 #include "ovsdb.h"
25 #include "reconnect.h"
26 #include "stream.h"
27 #include "svec.h"
28 #include "timeval.h"
29 #include "trigger.h"
30
31 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
32 #include "vlog.h"
33
34 struct ovsdb_jsonrpc_trigger {
35     struct ovsdb_trigger trigger;
36     struct ovsdb_jsonrpc_session *session;
37     struct hmap_node hmap_node; /* Element in session's trigger table. */
38     struct json *id;
39 };
40
/* Trigger helpers (defined further down in this file). */
static struct ovsdb_jsonrpc_trigger *
ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
                           const struct json *id, size_t hash);
static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t);
44
45 struct ovsdb_jsonrpc_session {
46     struct ovsdb_jsonrpc_server *server;
47     struct list node;           /* Element in server's sessions list. */
48     struct hmap triggers;
49     struct list completions;    /* Completed triggers. */
50
51     struct reconnect *reconnect; /* For back-off. */
52     bool active;                /* Active or passive connection? */
53     struct jsonrpc *rpc;
54     struct stream *stream;      /* Only if active == false and rpc == NULL. */
55 };
56
/* Session helpers (defined further down in this file). */
static void ovsdb_jsonrpc_session_create_active(
    struct ovsdb_jsonrpc_server *svr, const char *name);
static void ovsdb_jsonrpc_session_create_passive(
    struct ovsdb_jsonrpc_server *svr, struct stream *stream);
static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_session_got_request(
    struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request);
static void ovsdb_jsonrpc_session_got_notify(
    struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request);
69
70 struct ovsdb_jsonrpc_server {
71     struct ovsdb *db;
72
73     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
74     unsigned int n_sessions, max_sessions;
75     unsigned int max_triggers;
76
77     struct pstream **listeners;
78     size_t n_listeners, allocated_listeners;
79 };
80
/* Listener bookkeeping (defined further down in this file). */
static void ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
                                        struct pstream *pstream);
83
84 int
85 ovsdb_jsonrpc_server_create(struct ovsdb *db, const struct svec *active,
86                             const struct svec *passive,
87                             struct ovsdb_jsonrpc_server **serverp)
88 {
89     struct ovsdb_jsonrpc_server *server;
90     const char *name;
91     int retval = 0;
92     size_t i;
93
94     server = xzalloc(sizeof *server);
95     server->db = db;
96     server->max_sessions = 64;
97     server->max_triggers = 64;
98     list_init(&server->sessions);
99
100     SVEC_FOR_EACH (i, name, active) {
101         ovsdb_jsonrpc_session_create_active(server, name);
102     }
103
104     SVEC_FOR_EACH (i, name, passive) {
105         struct pstream *pstream;
106         int error;
107
108         error = pstream_open(name, &pstream);
109         if (!error) {
110             ovsdb_jsonrpc_server_listen(server, pstream);
111         } else {
112             ovs_error(error, "failed to listen on %s", name);
113             retval = error;
114         }
115     }
116
117     *serverp = server;
118     return retval;
119 }
120
121 void
122 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
123 {
124     struct ovsdb_jsonrpc_session *s, *next;
125     size_t i;
126
127     /* Accept new connections. */
128     for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
129         struct pstream *listener = svr->listeners[i];
130         struct stream *stream;
131         int error;
132
133         error = pstream_accept(listener, &stream);
134         if (!error) {
135             ovsdb_jsonrpc_session_create_passive(svr, stream);
136         } else if (error == EAGAIN) {
137             i++;
138         } else if (error) {
139             VLOG_WARN("%s: accept failed: %s",
140                       pstream_get_name(listener), strerror(error));
141             pstream_close(listener);
142             svr->listeners[i] = svr->listeners[--svr->n_listeners];
143         }
144     }
145
146     /* Handle each session. */
147     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
148                         &svr->sessions) {
149         int error = ovsdb_jsonrpc_session_run(s);
150         if (error) {
151             ovsdb_jsonrpc_session_close(s);
152         }
153     }
154 }
155
156 void
157 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
158 {
159     struct ovsdb_jsonrpc_session *s;
160
161     if (svr->n_sessions < svr->max_sessions) {
162         size_t i;
163
164         for (i = 0; i < svr->n_listeners; i++) {
165             pstream_wait(svr->listeners[i]);
166         }
167     }
168
169     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
170         ovsdb_jsonrpc_session_wait(s);
171     }
172 }
173
174 static void
175 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
176                             struct pstream *pstream)
177 {
178     if (svr->n_listeners >= svr->allocated_listeners) {
179         svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
180                                     sizeof *svr->listeners);
181     }
182     svr->listeners[svr->n_listeners++] = pstream;
183 }
184
185 static struct ovsdb_jsonrpc_trigger *
186 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
187                            const struct json *id, size_t hash)
188 {
189     struct ovsdb_jsonrpc_trigger *t;
190
191     HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
192                              &s->triggers) {
193         if (json_equal(t->id, id)) {
194             return t;
195         }
196     }
197
198     return NULL;
199 }
200
201 static void
202 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
203 {
204     struct ovsdb_jsonrpc_session *s = t->session;
205
206     if (s->rpc && !jsonrpc_get_status(s->rpc)) {
207         struct jsonrpc_msg *reply;
208         struct json *result;
209
210         result = ovsdb_trigger_steal_result(&t->trigger);
211         if (result) {
212             reply = jsonrpc_create_reply(result, t->id);
213         } else {
214             reply = jsonrpc_create_error(json_string_create("canceled"),
215                                          t->id);
216         }
217         jsonrpc_send(s->rpc, reply);
218     }
219
220     json_destroy(t->id);
221     ovsdb_trigger_destroy(&t->trigger);
222     hmap_remove(&s->triggers, &t->hmap_node);
223     free(t);
224 }
225
226 static struct ovsdb_jsonrpc_session *
227 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
228                              const char *name, bool active)
229 {
230     struct ovsdb_jsonrpc_session *s;
231
232     s = xzalloc(sizeof *s);
233     s->server = svr;
234     list_push_back(&svr->sessions, &s->node);
235     hmap_init(&s->triggers);
236     list_init(&s->completions);
237     s->reconnect = reconnect_create(time_msec());
238     reconnect_set_name(s->reconnect, name);
239     reconnect_enable(s->reconnect, time_msec());
240     s->active = active;
241
242     svr->n_sessions++;
243
244     return s;
245 }
246
/* Adds an active session to 'svr' for the remote called 'name'.  No
 * connection is opened here; the session starts out unconnected. */
static void
ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
                                    const char *name)
{
    const bool active = true;

    ovsdb_jsonrpc_session_create(svr, name, active);
}
253
254 static void
255 ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
256                                      struct stream *stream)
257 {
258     struct ovsdb_jsonrpc_session *s;
259
260     s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
261     reconnect_connected(s->reconnect, time_msec());
262     s->rpc = jsonrpc_open(stream);
263 }
264
265 static void
266 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
267 {
268     ovsdb_jsonrpc_session_disconnect(s);
269     list_remove(&s->node);
270     s->server->n_sessions--;
271 }
272
273 static void
274 ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
275 {
276     reconnect_disconnected(s->reconnect, time_msec(), 0);
277     if (s->rpc) {
278         struct ovsdb_jsonrpc_trigger *t, *next;
279
280         jsonrpc_error(s->rpc, EOF);
281         HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
282                             &s->triggers) {
283             ovsdb_jsonrpc_trigger_complete(t);
284         }
285
286         jsonrpc_close(s->rpc);
287         s->rpc = NULL;
288     } else if (s->stream) {
289         stream_close(s->stream);
290         s->stream = NULL;
291     }
292 }
293
294 static void
295 ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
296 {
297     ovsdb_jsonrpc_session_disconnect(s);
298     if (s->active) {
299         int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
300         if (error) {
301             reconnect_connect_failed(s->reconnect, time_msec(), error);
302         } else {
303             reconnect_connecting(s->reconnect, time_msec());
304         }
305     }
306 }
307
308 static int
309 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
310 {
311     if (s->rpc) {
312         struct jsonrpc_msg *msg;
313         int error;
314
315         jsonrpc_run(s->rpc);
316
317         while (!list_is_empty(&s->completions)) {
318             struct ovsdb_jsonrpc_trigger *t
319                 = CONTAINER_OF(s->completions.next,
320                                struct ovsdb_jsonrpc_trigger, trigger.node);
321             ovsdb_jsonrpc_trigger_complete(t);
322         }
323
324         if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
325             reconnect_received(s->reconnect, time_msec());
326             if (msg->type == JSONRPC_REQUEST) {
327                 ovsdb_jsonrpc_session_got_request(s, msg);
328             } else if (msg->type == JSONRPC_NOTIFY) {
329                 ovsdb_jsonrpc_session_got_notify(s, msg);
330             } else {
331                 VLOG_WARN("%s: received unexpected %s message",
332                           jsonrpc_get_name(s->rpc),
333                           jsonrpc_msg_type_to_string(msg->type));
334                 jsonrpc_error(s->rpc, EPROTO);
335                 jsonrpc_msg_destroy(msg);
336             }
337         }
338
339         error = jsonrpc_get_status(s->rpc);
340         if (error) {
341             if (s->active) {
342                 ovsdb_jsonrpc_session_disconnect(s);
343             } else {
344                 return error;
345             }
346         }
347     } else if (s->stream) {
348         int error = stream_connect(s->stream);
349         if (!error) {
350             reconnect_connected(s->reconnect, time_msec());
351             s->rpc = jsonrpc_open(s->stream);
352             s->stream = NULL;
353         } else if (error != EAGAIN) {
354             reconnect_connect_failed(s->reconnect, time_msec(), error);
355             stream_close(s->stream);
356             s->stream = NULL;
357         }
358     }
359
360     switch (reconnect_run(s->reconnect, time_msec())) {
361     case RECONNECT_CONNECT:
362         ovsdb_jsonrpc_session_connect(s);
363         break;
364
365     case RECONNECT_DISCONNECT:
366         ovsdb_jsonrpc_session_disconnect(s);
367         break;
368
369     case RECONNECT_PROBE:
370         if (s->rpc) {
371             struct json *params = json_array_create_empty();
372             jsonrpc_send(s->rpc, jsonrpc_create_request("echo", params));
373         }
374         break;
375     }
376     return s->active || s->rpc ? 0 : ETIMEDOUT;
377
378 }
379
380 static void
381 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
382 {
383     if (s->rpc) {
384         jsonrpc_wait(s->rpc);
385         if (!jsonrpc_get_backlog(s->rpc)) {
386             jsonrpc_recv_wait(s->rpc);
387         }
388     } else if (s->stream) {
389         stream_connect_wait(s->stream);
390     }
391     reconnect_wait(s->reconnect, time_msec());
392 }
393
394 static struct jsonrpc_msg *
395 execute_transaction(struct ovsdb_jsonrpc_session *s,
396                     struct jsonrpc_msg *request)
397 {
398     struct ovsdb_jsonrpc_trigger *t;
399     size_t hash;
400
401     /* Check for duplicate ID. */
402     hash = json_hash(request->id, 0);
403     t = ovsdb_jsonrpc_trigger_find(s, request->id, hash);
404     if (t) {
405         return jsonrpc_create_error(
406             json_string_create("duplicate request ID"), request->id);
407     }
408
409     /* Insert into trigger table. */
410     t = xmalloc(sizeof *t);
411     ovsdb_trigger_init(s->server->db,
412                        &t->trigger, request->params, &s->completions,
413                        time_msec());
414     t->session = s;
415     t->id = request->id;
416     hmap_insert(&s->triggers, &t->hmap_node, hash);
417
418     request->id = NULL;
419     request->params = NULL;
420
421     /* Complete early if possible. */
422     if (ovsdb_trigger_is_complete(&t->trigger)) {
423         ovsdb_jsonrpc_trigger_complete(t);
424     }
425
426     return NULL;
427 }
428
429 static void
430 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
431                                   struct jsonrpc_msg *request)
432 {
433     struct jsonrpc_msg *reply;
434
435     if (!strcmp(request->method, "transact")) {
436         reply = execute_transaction(s, request);
437     } else if (!strcmp(request->method, "get_schema")) {
438         reply = jsonrpc_create_reply(
439             ovsdb_schema_to_json(s->server->db->schema), request->id);
440     } else if (!strcmp(request->method, "echo")) {
441         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
442     } else {
443         reply = jsonrpc_create_error(json_string_create("unknown method"),
444                                      request->id);
445     }
446
447     if (reply) {
448         jsonrpc_msg_destroy(request);
449         jsonrpc_send(s->rpc, reply);
450     }
451 }
452
453 static void
454 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
455 {
456     if (json_array(request->params)->n == 1) {
457         struct ovsdb_jsonrpc_trigger *t;
458         struct json *id;
459
460         id = request->params->u.array.elems[0];
461         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
462         if (t) {
463             ovsdb_jsonrpc_trigger_complete(t);
464         }
465     }
466 }
467
468 static void
469 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
470                                  struct jsonrpc_msg *request)
471 {
472     if (!strcmp(request->method, "cancel")) {
473         execute_cancel(s, request);
474     }
475     jsonrpc_msg_destroy(request);
476 }