ovsdb: Fix bug in JSON-RPC server.
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "json.h"
23 #include "jsonrpc.h"
24 #include "ovsdb.h"
25 #include "stream.h"
26 #include "svec.h"
27 #include "timeval.h"
28 #include "trigger.h"
29
30 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
31 #include "vlog.h"
32
33 struct ovsdb_jsonrpc_trigger {
34     struct ovsdb_trigger trigger;
35     struct ovsdb_jsonrpc_session *session;
36     struct hmap_node hmap_node; /* Element in session's trigger table. */
37     struct json *id;
38 };
39
40 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
41     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
42 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
43
44 struct ovsdb_jsonrpc_session {
45     struct ovsdb_jsonrpc_server *server;
46     struct list node;           /* Element in server's sessions list. */
47     struct jsonrpc *rpc;
48     struct hmap triggers;
49     struct list completions;    /* Completed triggers. */
50 };
51
52 static void ovsdb_jsonrpc_session_open(struct ovsdb_jsonrpc_server *,
53                                        struct stream *);
54 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
55 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
56                                              struct jsonrpc_msg *);
57 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
58                                              struct jsonrpc_msg *);
59
60 struct ovsdb_jsonrpc_server {
61     struct ovsdb *db;
62
63     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
64     unsigned int n_sessions, max_sessions;
65     unsigned int max_triggers;
66
67     struct pstream **listeners;
68     size_t n_listeners, allocated_listeners;
69 };
70
71 static void ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *,
72                                         struct pstream *);
73
74 int
75 ovsdb_jsonrpc_server_create(struct ovsdb *db, const struct svec *active,
76                             const struct svec *passive,
77                             struct ovsdb_jsonrpc_server **serverp)
78 {
79     struct ovsdb_jsonrpc_server *server;
80     const char *name;
81     int retval = 0;
82     size_t i;
83
84     server = xzalloc(sizeof *server);
85     server->db = db;
86     server->max_sessions = 64;
87     server->max_triggers = 64;
88     list_init(&server->sessions);
89
90     SVEC_FOR_EACH (i, name, active) {
91         struct stream *stream;
92         int error;
93
94         error = stream_open(name, &stream);
95         if (!error) {
96             ovsdb_jsonrpc_session_open(server, stream);
97         } else {
98             ovs_error(error, "%s: connection failed", name);
99             retval = error;
100         }
101     }
102
103     SVEC_FOR_EACH (i, name, passive) {
104         struct pstream *pstream;
105         int error;
106
107         error = pstream_open(name, &pstream);
108         if (!error) {
109             ovsdb_jsonrpc_server_listen(server, pstream);
110         } else {
111             ovs_error(error, "failed to listen on %s", name);
112             retval = error;
113         }
114     }
115
116     *serverp = server;
117     return retval;
118 }
119
120 void
121 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
122 {
123     struct ovsdb_jsonrpc_session *s, *next;
124     size_t i;
125
126     /* Accept new connections. */
127     for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
128         struct pstream *listener = svr->listeners[i];
129         struct stream *stream;
130         int error;
131
132         error = pstream_accept(listener, &stream);
133         if (!error) {
134             ovsdb_jsonrpc_session_open(svr, stream);
135         } else if (error == EAGAIN) {
136             i++;
137         } else if (error) {
138             VLOG_WARN("%s: accept failed: %s",
139                       pstream_get_name(listener), strerror(error));
140             pstream_close(listener);
141             svr->listeners[i] = svr->listeners[--svr->n_listeners];
142         }
143     }
144
145     /* Handle each session. */
146     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
147                         &svr->sessions) {
148         struct jsonrpc_msg *msg;
149         int error;
150
151         jsonrpc_run(s->rpc);
152
153         while (!list_is_empty(&s->completions)) {
154             struct ovsdb_jsonrpc_trigger *t
155                 = CONTAINER_OF(s->completions.next,
156                                struct ovsdb_jsonrpc_trigger, trigger.node);
157             ovsdb_jsonrpc_trigger_complete(t);
158         }
159
160         if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
161             if (msg->type == JSONRPC_REQUEST) {
162                 ovsdb_jsonrpc_session_got_request(s, msg);
163             } else if (msg->type == JSONRPC_NOTIFY) {
164                 ovsdb_jsonrpc_session_got_notify(s, msg);
165             } else {
166                 VLOG_WARN("%s: received unexpected %s message",
167                           jsonrpc_get_name(s->rpc),
168                           jsonrpc_msg_type_to_string(msg->type));
169                 jsonrpc_error(s->rpc, EPROTO);
170                 jsonrpc_msg_destroy(msg);
171             }
172         }
173
174         error = jsonrpc_get_status(s->rpc);
175         if (error) {
176             ovsdb_jsonrpc_session_close(s);
177         }
178     }
179 }
180
181 void
182 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
183 {
184     struct ovsdb_jsonrpc_session *s;
185
186     if (svr->n_sessions < svr->max_sessions) {
187         size_t i;
188
189         for (i = 0; i < svr->n_listeners; i++) {
190             pstream_wait(svr->listeners[i]);
191         }
192     }
193
194     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
195         jsonrpc_wait(s->rpc);
196         if (!jsonrpc_get_backlog(s->rpc)) {
197             jsonrpc_recv_wait(s->rpc);
198         }
199     }
200 }
201
202 static void
203 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
204                             struct pstream *pstream)
205 {
206     if (svr->n_listeners >= svr->allocated_listeners) {
207         svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
208                                     sizeof *svr->listeners);
209     }
210     svr->listeners[svr->n_listeners++] = pstream;
211 }
212
213 static struct ovsdb_jsonrpc_trigger *
214 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
215                            const struct json *id, size_t hash)
216 {
217     struct ovsdb_jsonrpc_trigger *t;
218
219     HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
220                              &s->triggers) {
221         if (json_equal(t->id, id)) {
222             return t;
223         }
224     }
225
226     return NULL;
227 }
228
229 static void
230 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
231 {
232     struct ovsdb_jsonrpc_session *s = t->session;
233
234     if (!jsonrpc_get_status(s->rpc)) {
235         struct jsonrpc_msg *reply;
236         struct json *result;
237
238         result = ovsdb_trigger_steal_result(&t->trigger);
239         if (result) {
240             reply = jsonrpc_create_reply(result, t->id);
241         } else {
242             reply = jsonrpc_create_error(json_string_create("canceled"),
243                                          t->id);
244         }
245         jsonrpc_send(s->rpc, reply);
246     }
247
248     json_destroy(t->id);
249     ovsdb_trigger_destroy(&t->trigger);
250     hmap_remove(&s->triggers, &t->hmap_node);
251     free(t);
252 }
253
254 static void
255 ovsdb_jsonrpc_session_open(struct ovsdb_jsonrpc_server *svr,
256                            struct stream *stream)
257 {
258     struct ovsdb_jsonrpc_session *s;
259
260     s = xzalloc(sizeof *s);
261     s->server = svr;
262     list_push_back(&svr->sessions, &s->node);
263     s->rpc = jsonrpc_open(stream);
264     hmap_init(&s->triggers);
265     list_init(&s->completions);
266 }
267
268 static void
269 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
270 {
271     struct ovsdb_jsonrpc_trigger *t, *next;
272
273     jsonrpc_error(s->rpc, EOF);
274     HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
275                         &s->triggers) {
276         ovsdb_jsonrpc_trigger_complete(t);
277     }
278
279     jsonrpc_close(s->rpc);
280
281     list_remove(&s->node);
282     s->server->n_sessions--;
283 }
284
285 static struct jsonrpc_msg *
286 execute_transaction(struct ovsdb_jsonrpc_session *s,
287                     struct jsonrpc_msg *request)
288 {
289     struct ovsdb_jsonrpc_trigger *t;
290     size_t hash;
291
292     /* Check for duplicate ID. */
293     hash = json_hash(request->id, 0);
294     t = ovsdb_jsonrpc_trigger_find(s, request->id, hash);
295     if (t) {
296         return jsonrpc_create_error(
297             json_string_create("duplicate request ID"), request->id);
298     }
299
300     /* Insert into trigger table. */
301     t = xmalloc(sizeof *t);
302     ovsdb_trigger_init(s->server->db,
303                        &t->trigger, request->params, &s->completions,
304                        time_msec());
305     t->session = s;
306     t->id = request->id;
307     hmap_insert(&s->triggers, &t->hmap_node, hash);
308
309     request->id = NULL;
310     request->params = NULL;
311
312     /* Complete early if possible. */
313     if (ovsdb_trigger_is_complete(&t->trigger)) {
314         ovsdb_jsonrpc_trigger_complete(t);
315     }
316
317     return NULL;
318 }
319
320 static void
321 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
322                                   struct jsonrpc_msg *request)
323 {
324     struct jsonrpc_msg *reply;
325
326     if (!strcmp(request->method, "transact")) {
327         reply = execute_transaction(s, request);
328     } else if (!strcmp(request->method, "get_schema")) {
329         reply = jsonrpc_create_reply(
330             ovsdb_schema_to_json(s->server->db->schema), request->id);
331     } else {
332         reply = jsonrpc_create_error(json_string_create("unknown method"),
333                                      request->id);
334     }
335
336     if (reply) {
337         jsonrpc_msg_destroy(request);
338         jsonrpc_send(s->rpc, reply);
339     }
340 }
341
342 static void
343 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
344 {
345     size_t hash = json_hash(request->id, 0);
346     struct ovsdb_jsonrpc_trigger *t;
347
348     t = ovsdb_jsonrpc_trigger_find(s, request->params, hash);
349     if (t) {
350         ovsdb_jsonrpc_trigger_complete(t);
351     }
352 }
353
354 static void
355 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
356                                  struct jsonrpc_msg *request)
357 {
358     if (!strcmp(request->method, "cancel")) {
359         execute_cancel(s, request);
360     }
361     jsonrpc_msg_destroy(request);
362 }