ovsdb-server: Free memory on exit.
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <assert.h>
21 #include <errno.h>
22
23 #include "column.h"
24 #include "json.h"
25 #include "jsonrpc.h"
26 #include "ovsdb-error.h"
27 #include "ovsdb-parser.h"
28 #include "ovsdb.h"
29 #include "reconnect.h"
30 #include "row.h"
31 #include "stream.h"
32 #include "table.h"
33 #include "timeval.h"
34 #include "transaction.h"
35 #include "trigger.h"
36
37 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
38 #include "vlog.h"
39
/* Incomplete types, defined further down in this file; declared here so the
 * prototypes below can take pointers to them. */
struct ovsdb_jsonrpc_session;
struct ovsdb_jsonrpc_remote;
42
43 /* Message rate-limiting. */
44 struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
45
/* Session helpers that operate on a remote (defined in the session section
 * of this file). */
static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
    struct ovsdb_jsonrpc_remote *remote, struct jsonrpc_session *js);
static void ovsdb_jsonrpc_session_run_all(
    struct ovsdb_jsonrpc_remote *remote);
static void ovsdb_jsonrpc_session_wait_all(
    struct ovsdb_jsonrpc_remote *remote);
static void ovsdb_jsonrpc_session_close_all(
    struct ovsdb_jsonrpc_remote *remote);

/* Trigger helpers (one trigger per "transact" request). */
static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
                                         struct json *id,
                                         struct json *params);
static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
    struct ovsdb_jsonrpc_session *s, const struct json *id, size_t hash);
static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t);
static void ovsdb_jsonrpc_trigger_complete_all(
    struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_trigger_complete_done(
    struct ovsdb_jsonrpc_session *s);

/* Monitor helpers ("monitor" and "monitor_cancel" requests). */
static struct json *ovsdb_jsonrpc_monitor_create(
    struct ovsdb_jsonrpc_session *s, struct json *params);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
    struct ovsdb_jsonrpc_session *s,
    struct json_array *params,
    const struct json *request_id);
static void ovsdb_jsonrpc_monitor_remove_all(
    struct ovsdb_jsonrpc_session *s);
71 \f
72 /* JSON-RPC database server. */
73
74 struct ovsdb_jsonrpc_server {
75     struct ovsdb *db;
76     unsigned int n_sessions, max_sessions;
77     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
78 };
79
80 /* A configured remote.  This is either a passive stream listener plus a list
81  * of the currently connected sessions, or a list of exactly one active
82  * session. */
83 struct ovsdb_jsonrpc_remote {
84     struct ovsdb_jsonrpc_server *server;
85     struct pstream *listener;   /* Listener, if passive. */
86     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
87 };
88
/* Adding and removing individual remotes. */
static void ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
                                            const char *name);
static void ovsdb_jsonrpc_server_del_remote(struct shash_node *node);
92
93 struct ovsdb_jsonrpc_server *
94 ovsdb_jsonrpc_server_create(struct ovsdb *db)
95 {
96     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
97     server->db = db;
98     server->max_sessions = 64;
99     shash_init(&server->remotes);
100     return server;
101 }
102
103 void
104 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
105 {
106     struct shash_node *node, *next;
107
108     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
109         ovsdb_jsonrpc_server_del_remote(node);
110     }
111     shash_destroy(&svr->remotes);
112     free(svr);
113 }
114
115 /* Sets 'svr''s current set of remotes to the names in 'new_remotes'.  The data
116  * values in 'new_remotes' are ignored.
117  *
118  * A remote is an active or passive stream connection method, e.g. "pssl:" or
119  * "tcp:1.2.3.4". */
120 void
121 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
122                                  const struct shash *new_remotes)
123 {
124     struct shash_node *node, *next;
125
126     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
127         if (!shash_find(new_remotes, node->name)) {
128             ovsdb_jsonrpc_server_del_remote(node);
129         }
130     }
131     SHASH_FOR_EACH (node, new_remotes) {
132         if (!shash_find(&svr->remotes, node->name)) {
133             ovsdb_jsonrpc_server_add_remote(svr, node->name);
134         }
135     }
136 }
137
138 static void
139 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
140                                 const char *name)
141 {
142     struct ovsdb_jsonrpc_remote *remote;
143     struct pstream *listener;
144     int error;
145
146     error = pstream_open(name, &listener);
147     if (error && error != EAFNOSUPPORT) {
148         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
149         return;
150     }
151
152     remote = xmalloc(sizeof *remote);
153     remote->server = svr;
154     remote->listener = listener;
155     list_init(&remote->sessions);
156     shash_add(&svr->remotes, name, remote);
157
158     if (!listener) {
159         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
160     }
161 }
162
163 static void
164 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
165 {
166     struct ovsdb_jsonrpc_remote *remote = node->data;
167
168     ovsdb_jsonrpc_session_close_all(remote);
169     pstream_close(remote->listener);
170     shash_delete(&remote->server->remotes, node);
171     free(remote);
172 }
173
174 void
175 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
176 {
177     struct shash_node *node;
178
179     SHASH_FOR_EACH (node, &svr->remotes) {
180         struct ovsdb_jsonrpc_remote *remote = node->data;
181
182         if (remote->listener && svr->n_sessions < svr->max_sessions) {
183             struct stream *stream;
184             int error;
185
186             error = pstream_accept(remote->listener, &stream);
187             if (!error) {
188                 struct jsonrpc_session *js;
189                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream));
190                 ovsdb_jsonrpc_session_create(remote, js);
191             } else if (error != EAGAIN) {
192                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
193                              pstream_get_name(remote->listener),
194                              strerror(error));
195             }
196         }
197
198         ovsdb_jsonrpc_session_run_all(remote);
199     }
200 }
201
202 void
203 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
204 {
205     struct shash_node *node;
206
207     SHASH_FOR_EACH (node, &svr->remotes) {
208         struct ovsdb_jsonrpc_remote *remote = node->data;
209
210         if (remote->listener && svr->n_sessions < svr->max_sessions) {
211             pstream_wait(remote->listener);
212         }
213
214         ovsdb_jsonrpc_session_wait_all(remote);
215     }
216 }
217 \f
218 /* JSON-RPC database server session. */
219
220 struct ovsdb_jsonrpc_session {
221     struct ovsdb_jsonrpc_remote *remote;
222     struct list node;           /* Element in remote's sessions list. */
223
224     /* Triggers. */
225     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
226     struct list completions;    /* Completed triggers. */
227
228     /* Monitors. */
229     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
230
231     /* Network connectivity. */
232     struct jsonrpc_session *js;  /* JSON-RPC session. */
233     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
234 };
235
/* Per-session operations. */
static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s);
static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s);
static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s);

/* Handlers for incoming messages. */
static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
                                              struct jsonrpc_msg *request);
static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
                                             struct jsonrpc_msg *request);
243
244 static struct ovsdb_jsonrpc_session *
245 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
246                              struct jsonrpc_session *js)
247 {
248     struct ovsdb_jsonrpc_session *s;
249
250     s = xzalloc(sizeof *s);
251     s->remote = remote;
252     list_push_back(&remote->sessions, &s->node);
253     hmap_init(&s->triggers);
254     hmap_init(&s->monitors);
255     list_init(&s->completions);
256     s->js = js;
257     s->js_seqno = jsonrpc_session_get_seqno(js);
258
259     remote->server->n_sessions++;
260
261     return s;
262 }
263
264 static void
265 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
266 {
267     ovsdb_jsonrpc_monitor_remove_all(s);
268     jsonrpc_session_close(s->js);
269     list_remove(&s->node);
270     s->remote->server->n_sessions--;
271     free(s);
272 }
273
274 static int
275 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
276 {
277     jsonrpc_session_run(s->js);
278     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
279         s->js_seqno = jsonrpc_session_get_seqno(s->js);
280         ovsdb_jsonrpc_trigger_complete_all(s);
281         ovsdb_jsonrpc_monitor_remove_all(s);
282     }
283
284     ovsdb_jsonrpc_trigger_complete_done(s);
285
286     if (!jsonrpc_session_get_backlog(s->js)) {
287         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
288         if (msg) {
289             if (msg->type == JSONRPC_REQUEST) {
290                 ovsdb_jsonrpc_session_got_request(s, msg);
291             } else if (msg->type == JSONRPC_NOTIFY) {
292                 ovsdb_jsonrpc_session_got_notify(s, msg);
293             } else {
294                 VLOG_WARN("%s: received unexpected %s message",
295                           jsonrpc_session_get_name(s->js),
296                           jsonrpc_msg_type_to_string(msg->type));
297                 jsonrpc_session_force_reconnect(s->js);
298                 jsonrpc_msg_destroy(msg);
299             }
300         }
301     }
302     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
303 }
304
305 static void
306 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
307 {
308     struct ovsdb_jsonrpc_session *s, *next;
309
310     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
311                         &remote->sessions) {
312         int error = ovsdb_jsonrpc_session_run(s);
313         if (error) {
314             ovsdb_jsonrpc_session_close(s);
315         }
316     }
317 }
318
319 static void
320 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
321 {
322     jsonrpc_session_wait(s->js);
323     if (!jsonrpc_session_get_backlog(s->js)) {
324         jsonrpc_session_recv_wait(s->js);
325     }
326 }
327
328 static void
329 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
330 {
331     struct ovsdb_jsonrpc_session *s;
332
333     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &remote->sessions) {
334         ovsdb_jsonrpc_session_wait(s);
335     }
336 }
337
338 static void
339 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
340 {
341     struct ovsdb_jsonrpc_session *s, *next;
342
343     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
344                         &remote->sessions) {
345         ovsdb_jsonrpc_session_close(s);
346     }
347 }
348
349 static struct jsonrpc_msg *
350 execute_transaction(struct ovsdb_jsonrpc_session *s,
351                     struct jsonrpc_msg *request)
352 {
353     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
354     request->id = NULL;
355     request->params = NULL;
356     jsonrpc_msg_destroy(request);
357     return NULL;
358 }
359
360 static void
361 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
362                                   struct jsonrpc_msg *request)
363 {
364     struct jsonrpc_msg *reply;
365
366     if (!strcmp(request->method, "transact")) {
367         reply = execute_transaction(s, request);
368     } else if (!strcmp(request->method, "monitor")) {
369         reply = jsonrpc_create_reply(
370             ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
371     } else if (!strcmp(request->method, "monitor_cancel")) {
372         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
373                                              request->id);
374     } else if (!strcmp(request->method, "get_schema")) {
375         reply = jsonrpc_create_reply(
376             ovsdb_schema_to_json(s->remote->server->db->schema), request->id);
377     } else if (!strcmp(request->method, "echo")) {
378         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
379     } else {
380         reply = jsonrpc_create_error(json_string_create("unknown method"),
381                                      request->id);
382     }
383
384     if (reply) {
385         jsonrpc_msg_destroy(request);
386         jsonrpc_session_send(s->js, reply);
387     }
388 }
389
390 static void
391 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
392 {
393     if (json_array(request->params)->n == 1) {
394         struct ovsdb_jsonrpc_trigger *t;
395         struct json *id;
396
397         id = request->params->u.array.elems[0];
398         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
399         if (t) {
400             ovsdb_jsonrpc_trigger_complete(t);
401         }
402     }
403 }
404
405 static void
406 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
407                                  struct jsonrpc_msg *request)
408 {
409     if (!strcmp(request->method, "cancel")) {
410         execute_cancel(s, request);
411     }
412     jsonrpc_msg_destroy(request);
413 }
414 \f
415 /* JSON-RPC database server triggers.
416  *
417  * (Every transaction is treated as a trigger even if it doesn't actually have
418  * any "wait" operations.) */
419
420 struct ovsdb_jsonrpc_trigger {
421     struct ovsdb_trigger trigger;
422     struct ovsdb_jsonrpc_session *session;
423     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
424     struct json *id;
425 };
426
427 static void
428 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
429                              struct json *id, struct json *params)
430 {
431     struct ovsdb_jsonrpc_trigger *t;
432     size_t hash;
433
434     /* Check for duplicate ID. */
435     hash = json_hash(id, 0);
436     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
437     if (t) {
438         struct jsonrpc_msg *msg;
439
440         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
441                                    id);
442         jsonrpc_session_send(s->js, msg);
443         json_destroy(id);
444         json_destroy(params);
445         return;
446     }
447
448     /* Insert into trigger table. */
449     t = xmalloc(sizeof *t);
450     ovsdb_trigger_init(s->remote->server->db,
451                        &t->trigger, params, &s->completions,
452                        time_msec());
453     t->session = s;
454     t->id = id;
455     hmap_insert(&s->triggers, &t->hmap_node, hash);
456
457     /* Complete early if possible. */
458     if (ovsdb_trigger_is_complete(&t->trigger)) {
459         ovsdb_jsonrpc_trigger_complete(t);
460     }
461 }
462
463 static struct ovsdb_jsonrpc_trigger *
464 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
465                            const struct json *id, size_t hash)
466 {
467     struct ovsdb_jsonrpc_trigger *t;
468
469     HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
470                              &s->triggers) {
471         if (json_equal(t->id, id)) {
472             return t;
473         }
474     }
475
476     return NULL;
477 }
478
479 static void
480 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
481 {
482     struct ovsdb_jsonrpc_session *s = t->session;
483
484     if (jsonrpc_session_is_connected(s->js)) {
485         struct jsonrpc_msg *reply;
486         struct json *result;
487
488         result = ovsdb_trigger_steal_result(&t->trigger);
489         if (result) {
490             reply = jsonrpc_create_reply(result, t->id);
491         } else {
492             reply = jsonrpc_create_error(json_string_create("canceled"),
493                                          t->id);
494         }
495         jsonrpc_session_send(s->js, reply);
496     }
497
498     json_destroy(t->id);
499     ovsdb_trigger_destroy(&t->trigger);
500     hmap_remove(&s->triggers, &t->hmap_node);
501     free(t);
502 }
503
504 static void
505 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
506 {
507     struct ovsdb_jsonrpc_trigger *t, *next;
508     HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
509                         &s->triggers) {
510         ovsdb_jsonrpc_trigger_complete(t);
511     }
512 }
513
514 static void
515 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
516 {
517     while (!list_is_empty(&s->completions)) {
518         struct ovsdb_jsonrpc_trigger *t
519             = CONTAINER_OF(s->completions.next,
520                            struct ovsdb_jsonrpc_trigger, trigger.node);
521         ovsdb_jsonrpc_trigger_complete(t);
522     }
523 }
524 \f
525 /* JSON-RPC database table monitors. */
526
/* Kinds of row change that a monitored table can report.  The values are
 * distinct bits so that they can be OR'd together into a selection mask. */
enum ovsdb_jsonrpc_monitor_selection {
    OJMS_INITIAL = 1 << 0,      /* Rows present when the monitor is created. */
    OJMS_INSERT = 1 << 1,       /* Rows added afterward. */
    OJMS_DELETE = 1 << 2,       /* Rows removed. */
    OJMS_MODIFY = 1 << 3        /* Rows whose contents changed. */
};
533
534 struct ovsdb_jsonrpc_monitor_table {
535     const struct ovsdb_table *table;
536     enum ovsdb_jsonrpc_monitor_selection select;
537     struct ovsdb_column_set columns;
538 };
539
540 struct ovsdb_jsonrpc_monitor {
541     struct ovsdb_replica replica;
542     struct ovsdb_jsonrpc_session *session;
543     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
544
545     struct json *monitor_id;
546     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
547 };
548
549 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
550
551 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
552     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
553 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
554 static struct json *ovsdb_jsonrpc_monitor_get_initial(
555     const struct ovsdb_jsonrpc_monitor *);
556
557 static bool
558 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
559 {
560     const struct json *json;
561
562     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
563     return json ? json_boolean(json) : default_value;
564 }
565
566 struct ovsdb_jsonrpc_monitor *
567 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
568                            const struct json *monitor_id)
569 {
570     struct ovsdb_jsonrpc_monitor *m;
571
572     HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node,
573                              json_hash(monitor_id, 0), &s->monitors) {
574         if (json_equal(m->monitor_id, monitor_id)) {
575             return m;
576         }
577     }
578
579     return NULL;
580 }
581
582 static struct json *
583 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
584                              struct json *params)
585 {
586     struct ovsdb_jsonrpc_monitor *m = NULL;
587     struct json *monitor_id, *monitor_requests;
588     struct ovsdb_error *error = NULL;
589     struct shash_node *node;
590     struct json *json;
591
592     if (json_array(params)->n != 2) {
593         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
594         goto error;
595     }
596     monitor_id = params->u.array.elems[0];
597     monitor_requests = params->u.array.elems[1];
598     if (monitor_requests->type != JSON_OBJECT) {
599         error = ovsdb_syntax_error(monitor_requests, NULL,
600                                    "monitor-requests must be object");
601         goto error;
602     }
603
604     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
605         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
606         goto error;
607     }
608
609     m = xzalloc(sizeof *m);
610     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
611     ovsdb_add_replica(s->remote->server->db, &m->replica);
612     m->session = s;
613     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
614     m->monitor_id = json_clone(monitor_id);
615     shash_init(&m->tables);
616
617     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
618         const struct ovsdb_table *table;
619         struct ovsdb_jsonrpc_monitor_table *mt;
620         const struct json *columns_json, *select_json;
621         struct ovsdb_parser parser;
622
623         table = ovsdb_get_table(s->remote->server->db, node->name);
624         if (!table) {
625             error = ovsdb_syntax_error(NULL, NULL,
626                                        "no table named %s", node->name);
627             goto error;
628         }
629
630         mt = xzalloc(sizeof *mt);
631         mt->table = table;
632         mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
633         ovsdb_column_set_init(&mt->columns);
634         shash_add(&m->tables, table->schema->name, mt);
635
636         ovsdb_parser_init(&parser, node->data, "table %s", node->name);
637         columns_json = ovsdb_parser_member(&parser, "columns",
638                                            OP_ARRAY | OP_OPTIONAL);
639         select_json = ovsdb_parser_member(&parser, "select",
640                                           OP_OBJECT | OP_OPTIONAL);
641         error = ovsdb_parser_finish(&parser);
642         if (error) {
643             goto error;
644         }
645
646         if (columns_json) {
647             error = ovsdb_column_set_from_json(columns_json, table,
648                                                &mt->columns);
649             if (error) {
650                 goto error;
651             }
652         } else {
653             struct shash_node *node;
654
655             SHASH_FOR_EACH (node, &table->schema->columns) {
656                 const struct ovsdb_column *column = node->data;
657                 if (column->index != OVSDB_COL_UUID) {
658                     ovsdb_column_set_add(&mt->columns, column);
659                 }
660             }
661         }
662
663         if (select_json) {
664             mt->select = 0;
665             ovsdb_parser_init(&parser, select_json, "table %s select",
666                               table->schema->name);
667             if (parse_bool(&parser, "initial", true)) {
668                 mt->select |= OJMS_INITIAL;
669             }
670             if (parse_bool(&parser, "insert", true)) {
671                 mt->select |= OJMS_INSERT;
672             }
673             if (parse_bool(&parser, "delete", true)) {
674                 mt->select |= OJMS_DELETE;
675             }
676             if (parse_bool(&parser, "modify", true)) {
677                 mt->select |= OJMS_MODIFY;
678             }
679             error = ovsdb_parser_finish(&parser);
680             if (error) {
681                 goto error;
682             }
683         }
684     }
685
686     return ovsdb_jsonrpc_monitor_get_initial(m);
687
688 error:
689     if (m) {
690         ovsdb_remove_replica(s->remote->server->db, &m->replica);
691     }
692
693     json = ovsdb_error_to_json(error);
694     ovsdb_error_destroy(error);
695     return json;
696 }
697
698 static struct jsonrpc_msg *
699 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
700                              struct json_array *params,
701                              const struct json *request_id)
702 {
703     if (params->n != 1) {
704         return jsonrpc_create_error(json_string_create("invalid parameters"),
705                                     request_id);
706     } else {
707         struct ovsdb_jsonrpc_monitor *m;
708
709         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
710         if (!m) {
711             return jsonrpc_create_error(json_string_create("unknown monitor"),
712                                         request_id);
713         } else {
714             ovsdb_remove_replica(s->remote->server->db, &m->replica);
715             return jsonrpc_create_reply(json_object_create(), request_id);
716         }
717     }
718 }
719
720 static void
721 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
722 {
723     struct ovsdb_jsonrpc_monitor *m, *next;
724
725     HMAP_FOR_EACH_SAFE (m, next,
726                         struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
727         ovsdb_remove_replica(s->remote->server->db, &m->replica);
728     }
729 }
730
731 static struct ovsdb_jsonrpc_monitor *
732 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
733 {
734     assert(replica->class == &ovsdb_jsonrpc_replica_class);
735     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
736 }
737
/* Working state passed to ovsdb_jsonrpc_monitor_change_cb() while the JSON
 * for one update is being accumulated. */
struct ovsdb_jsonrpc_monitor_aux {
    bool initial;               /* True while reporting initial contents. */
    const struct ovsdb_jsonrpc_monitor *monitor;    /* Monitor reported to. */
    struct json *json;          /* JSON for the whole update; null until the
                                 * first reportable row. */

    /* Table of the most recently visited row. */
    struct ovsdb_jsonrpc_monitor_table *mt;
    struct json *table_json;    /* That table's object inside 'json'; null
                                 * until its first reportable row. */
};
747
748 static bool
749 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
750                                 const struct ovsdb_row *new,
751                                 void *aux_)
752 {
753     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
754     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
755     struct ovsdb_table *table = new ? new->table : old->table;
756     enum ovsdb_jsonrpc_monitor_selection type;
757     struct json *old_json, *new_json;
758     struct json *row_json;
759     char uuid[UUID_LEN + 1];
760     int n_changed;
761     size_t i;
762
763     if (!aux->mt || table != aux->mt->table) {
764         aux->mt = shash_find_data(&m->tables, table->schema->name);
765         aux->table_json = NULL;
766         if (!aux->mt) {
767             /* We don't care about rows in this table at all.  Tell the caller
768              * to skip it.  */
769             return false;
770         }
771     }
772
773     type = (aux->initial ? OJMS_INITIAL
774             : !old ? OJMS_INSERT
775             : !new ? OJMS_DELETE
776             : OJMS_MODIFY);
777     if (!(aux->mt->select & type)) {
778         /* We don't care about this type of change (but do want to be called
779          * back for changes to other rows in the same table). */
780         return true;
781     }
782
783     old_json = new_json = NULL;
784     n_changed = 0;
785     for (i = 0; i < aux->mt->columns.n_columns; i++) {
786         const struct ovsdb_column *column = aux->mt->columns.columns[i];
787         unsigned int idx = column->index;
788         bool changed = false;
789
790         if (type == OJMS_MODIFY) {
791             changed = !ovsdb_datum_equals(&old->fields[idx],
792                                           &new->fields[idx], &column->type);
793             n_changed += changed;
794         }
795         if (changed || type == OJMS_DELETE) {
796             if (!old_json) {
797                 old_json = json_object_create();
798             }
799             json_object_put(old_json, column->name,
800                             ovsdb_datum_to_json(&old->fields[idx],
801                                                 &column->type));
802         }
803         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
804             if (!new_json) {
805                 new_json = json_object_create();
806             }
807             json_object_put(new_json, column->name,
808                             ovsdb_datum_to_json(&new->fields[idx],
809                                                 &column->type));
810         }
811     }
812     if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) {
813         /* No reportable changes. */
814         json_destroy(old_json);
815         json_destroy(new_json);
816         return true;
817     }
818
819     /* Create JSON object for transaction overall. */
820     if (!aux->json) {
821         aux->json = json_object_create();
822     }
823
824     /* Create JSON object for transaction on this table. */
825     if (!aux->table_json) {
826         aux->table_json = json_object_create();
827         json_object_put(aux->json, aux->mt->table->schema->name,
828                         aux->table_json);
829     }
830
831     /* Create JSON object for transaction on this row. */
832     row_json = json_object_create();
833     if (old_json) {
834         json_object_put(row_json, "old", old_json);
835     }
836     if (new_json) {
837         json_object_put(row_json, "new", new_json);
838     }
839
840     /* Add JSON row to JSON table. */
841     snprintf(uuid, sizeof uuid,
842              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
843     json_object_put(aux->table_json, uuid, row_json);
844
845     return true;
846 }
847
848 static void
849 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
850                                const struct ovsdb_jsonrpc_monitor *m,
851                                bool initial)
852 {
853     aux->initial = initial;
854     aux->monitor = m;
855     aux->json = NULL;
856     aux->mt = NULL;
857     aux->table_json = NULL;
858 }
859
860 static struct ovsdb_error *
861 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
862                              const struct ovsdb_txn *txn, bool durable UNUSED)
863 {
864     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
865     struct ovsdb_jsonrpc_monitor_aux aux;
866
867     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
868     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
869     if (aux.json) {
870         struct jsonrpc_msg *msg;
871         struct json *params;
872
873         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
874                                      aux.json);
875         msg = jsonrpc_create_notify("update", params);
876         jsonrpc_session_send(aux.monitor->session->js, msg);
877     }
878
879     return NULL;
880 }
881
882 static struct json *
883 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
884 {
885     struct ovsdb_jsonrpc_monitor_aux aux;
886     struct shash_node *node;
887
888     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
889     SHASH_FOR_EACH (node, &m->tables) {
890         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
891
892         if (mt->select & OJMS_INITIAL) {
893             struct ovsdb_row *row;
894
895             HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
896                            &mt->table->rows) {
897                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux);
898             }
899         }
900     }
901     return aux.json ? aux.json : json_object_create();
902 }
903
904 static void
905 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
906 {
907     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
908     struct shash_node *node;
909
910     json_destroy(m->monitor_id);
911     SHASH_FOR_EACH (node, &m->tables) {
912         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
913         ovsdb_column_set_destroy(&mt->columns);
914         free(mt);
915     }
916     shash_destroy(&m->tables);
917     hmap_remove(&m->session->monitors, &m->node);
918     free(m);
919 }
920
921 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
922     ovsdb_jsonrpc_monitor_commit,
923     ovsdb_jsonrpc_monitor_destroy
924 };