ovsdb: Check for changed columns only once per transaction commit.
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <assert.h>
21 #include <errno.h>
22
23 #include "bitmap.h"
24 #include "column.h"
25 #include "json.h"
26 #include "jsonrpc.h"
27 #include "ovsdb-error.h"
28 #include "ovsdb-parser.h"
29 #include "ovsdb.h"
30 #include "reconnect.h"
31 #include "row.h"
32 #include "stream.h"
33 #include "table.h"
34 #include "timeval.h"
35 #include "transaction.h"
36 #include "trigger.h"
37
38 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
39 #include "vlog.h"
40
41 struct ovsdb_jsonrpc_remote;
42 struct ovsdb_jsonrpc_session;
43
44 /* Message rate-limiting. */
45 struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
46
47 /* Sessions. */
48 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
49     struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
50 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
51 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
52 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
53
54 /* Triggers. */
55 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
56                                          struct json *id, struct json *params);
57 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
58     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
59 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
60 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
61 static void ovsdb_jsonrpc_trigger_complete_done(
62     struct ovsdb_jsonrpc_session *);
63
64 /* Monitors. */
65 static struct json *ovsdb_jsonrpc_monitor_create(
66     struct ovsdb_jsonrpc_session *, struct json *params);
67 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
68     struct ovsdb_jsonrpc_session *,
69     struct json_array *params,
70     const struct json *request_id);
71 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
72 \f
73 /* JSON-RPC database server. */
74
75 struct ovsdb_jsonrpc_server {
76     struct ovsdb *db;
77     unsigned int n_sessions, max_sessions;
78     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
79 };
80
81 /* A configured remote.  This is either a passive stream listener plus a list
82  * of the currently connected sessions, or a list of exactly one active
83  * session. */
84 struct ovsdb_jsonrpc_remote {
85     struct ovsdb_jsonrpc_server *server;
86     struct pstream *listener;   /* Listener, if passive. */
87     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
88 };
89
90 static void ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *,
91                                             const char *name);
92 static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
93
94 struct ovsdb_jsonrpc_server *
95 ovsdb_jsonrpc_server_create(struct ovsdb *db)
96 {
97     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
98     server->db = db;
99     server->max_sessions = 64;
100     shash_init(&server->remotes);
101     return server;
102 }
103
104 void
105 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
106 {
107     struct shash_node *node, *next;
108
109     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
110         ovsdb_jsonrpc_server_del_remote(node);
111     }
112     shash_destroy(&svr->remotes);
113     free(svr);
114 }
115
116 /* Sets 'svr''s current set of remotes to the names in 'new_remotes'.  The data
117  * values in 'new_remotes' are ignored.
118  *
119  * A remote is an active or passive stream connection method, e.g. "pssl:" or
120  * "tcp:1.2.3.4". */
121 void
122 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
123                                  const struct shash *new_remotes)
124 {
125     struct shash_node *node, *next;
126
127     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
128         if (!shash_find(new_remotes, node->name)) {
129             ovsdb_jsonrpc_server_del_remote(node);
130         }
131     }
132     SHASH_FOR_EACH (node, new_remotes) {
133         if (!shash_find(&svr->remotes, node->name)) {
134             ovsdb_jsonrpc_server_add_remote(svr, node->name);
135         }
136     }
137 }
138
139 static void
140 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
141                                 const char *name)
142 {
143     struct ovsdb_jsonrpc_remote *remote;
144     struct pstream *listener;
145     int error;
146
147     error = pstream_open(name, &listener);
148     if (error && error != EAFNOSUPPORT) {
149         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
150         return;
151     }
152
153     remote = xmalloc(sizeof *remote);
154     remote->server = svr;
155     remote->listener = listener;
156     list_init(&remote->sessions);
157     shash_add(&svr->remotes, name, remote);
158
159     if (!listener) {
160         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
161     }
162 }
163
164 static void
165 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
166 {
167     struct ovsdb_jsonrpc_remote *remote = node->data;
168
169     ovsdb_jsonrpc_session_close_all(remote);
170     pstream_close(remote->listener);
171     shash_delete(&remote->server->remotes, node);
172     free(remote);
173 }
174
175 void
176 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
177 {
178     struct shash_node *node;
179
180     SHASH_FOR_EACH (node, &svr->remotes) {
181         struct ovsdb_jsonrpc_remote *remote = node->data;
182
183         if (remote->listener && svr->n_sessions < svr->max_sessions) {
184             struct stream *stream;
185             int error;
186
187             error = pstream_accept(remote->listener, &stream);
188             if (!error) {
189                 struct jsonrpc_session *js;
190                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream));
191                 ovsdb_jsonrpc_session_create(remote, js);
192             } else if (error != EAGAIN) {
193                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
194                              pstream_get_name(remote->listener),
195                              strerror(error));
196             }
197         }
198
199         ovsdb_jsonrpc_session_run_all(remote);
200     }
201 }
202
203 void
204 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
205 {
206     struct shash_node *node;
207
208     SHASH_FOR_EACH (node, &svr->remotes) {
209         struct ovsdb_jsonrpc_remote *remote = node->data;
210
211         if (remote->listener && svr->n_sessions < svr->max_sessions) {
212             pstream_wait(remote->listener);
213         }
214
215         ovsdb_jsonrpc_session_wait_all(remote);
216     }
217 }
218 \f
219 /* JSON-RPC database server session. */
220
221 struct ovsdb_jsonrpc_session {
222     struct ovsdb_jsonrpc_remote *remote;
223     struct list node;           /* Element in remote's sessions list. */
224
225     /* Triggers. */
226     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
227     struct list completions;    /* Completed triggers. */
228
229     /* Monitors. */
230     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
231
232     /* Network connectivity. */
233     struct jsonrpc_session *js;  /* JSON-RPC session. */
234     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
235 };
236
237 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
238 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
239 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
240 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
241                                              struct jsonrpc_msg *);
242 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
243                                              struct jsonrpc_msg *);
244
245 static struct ovsdb_jsonrpc_session *
246 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
247                              struct jsonrpc_session *js)
248 {
249     struct ovsdb_jsonrpc_session *s;
250
251     s = xzalloc(sizeof *s);
252     s->remote = remote;
253     list_push_back(&remote->sessions, &s->node);
254     hmap_init(&s->triggers);
255     hmap_init(&s->monitors);
256     list_init(&s->completions);
257     s->js = js;
258     s->js_seqno = jsonrpc_session_get_seqno(js);
259
260     remote->server->n_sessions++;
261
262     return s;
263 }
264
265 static void
266 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
267 {
268     ovsdb_jsonrpc_monitor_remove_all(s);
269     jsonrpc_session_close(s->js);
270     list_remove(&s->node);
271     s->remote->server->n_sessions--;
272     free(s);
273 }
274
275 static int
276 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
277 {
278     jsonrpc_session_run(s->js);
279     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
280         s->js_seqno = jsonrpc_session_get_seqno(s->js);
281         ovsdb_jsonrpc_trigger_complete_all(s);
282         ovsdb_jsonrpc_monitor_remove_all(s);
283     }
284
285     ovsdb_jsonrpc_trigger_complete_done(s);
286
287     if (!jsonrpc_session_get_backlog(s->js)) {
288         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
289         if (msg) {
290             if (msg->type == JSONRPC_REQUEST) {
291                 ovsdb_jsonrpc_session_got_request(s, msg);
292             } else if (msg->type == JSONRPC_NOTIFY) {
293                 ovsdb_jsonrpc_session_got_notify(s, msg);
294             } else {
295                 VLOG_WARN("%s: received unexpected %s message",
296                           jsonrpc_session_get_name(s->js),
297                           jsonrpc_msg_type_to_string(msg->type));
298                 jsonrpc_session_force_reconnect(s->js);
299                 jsonrpc_msg_destroy(msg);
300             }
301         }
302     }
303     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
304 }
305
306 static void
307 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
308 {
309     struct ovsdb_jsonrpc_session *s, *next;
310
311     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
312                         &remote->sessions) {
313         int error = ovsdb_jsonrpc_session_run(s);
314         if (error) {
315             ovsdb_jsonrpc_session_close(s);
316         }
317     }
318 }
319
320 static void
321 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
322 {
323     jsonrpc_session_wait(s->js);
324     if (!jsonrpc_session_get_backlog(s->js)) {
325         jsonrpc_session_recv_wait(s->js);
326     }
327 }
328
329 static void
330 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
331 {
332     struct ovsdb_jsonrpc_session *s;
333
334     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &remote->sessions) {
335         ovsdb_jsonrpc_session_wait(s);
336     }
337 }
338
339 static void
340 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
341 {
342     struct ovsdb_jsonrpc_session *s, *next;
343
344     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
345                         &remote->sessions) {
346         ovsdb_jsonrpc_session_close(s);
347     }
348 }
349
350 static const char *
351 get_db_name(const struct ovsdb_jsonrpc_session *s)
352 {
353     return s->remote->server->db->schema->name;
354 }
355
356 static struct jsonrpc_msg *
357 ovsdb_jsonrpc_check_db_name(const struct ovsdb_jsonrpc_session *s,
358                             const struct jsonrpc_msg *request)
359 {
360     struct json_array *params;
361     const char *want_db_name;
362     const char *have_db_name;
363     struct ovsdb_error *error;
364     struct jsonrpc_msg *reply;
365
366     params = json_array(request->params);
367     if (!params->n || params->elems[0]->type != JSON_STRING) {
368         error = ovsdb_syntax_error(
369             request->params, NULL,
370             "%s request params must begin with <db-name>", request->method);
371         goto error;
372     }
373
374     want_db_name = params->elems[0]->u.string;
375     have_db_name = get_db_name(s);
376     if (strcmp(want_db_name, have_db_name)) {
377         error = ovsdb_syntax_error(
378             request->params, "unknown database",
379             "%s request specifies unknown database %s",
380             request->method, want_db_name);
381         goto error;
382     }
383
384     return NULL;
385
386 error:
387     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
388     ovsdb_error_destroy(error);
389     return reply;
390 }
391
392 static struct jsonrpc_msg *
393 execute_transaction(struct ovsdb_jsonrpc_session *s,
394                     struct jsonrpc_msg *request)
395 {
396     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
397     request->id = NULL;
398     request->params = NULL;
399     jsonrpc_msg_destroy(request);
400     return NULL;
401 }
402
403 static void
404 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
405                                   struct jsonrpc_msg *request)
406 {
407     struct jsonrpc_msg *reply;
408
409     if (!strcmp(request->method, "transact")) {
410         reply = ovsdb_jsonrpc_check_db_name(s, request);
411         if (!reply) {
412             reply = execute_transaction(s, request);
413         }
414     } else if (!strcmp(request->method, "monitor")) {
415         reply = ovsdb_jsonrpc_check_db_name(s, request);
416         if (!reply) {
417             reply = jsonrpc_create_reply(
418                 ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
419         }
420     } else if (!strcmp(request->method, "monitor_cancel")) {
421         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
422                                              request->id);
423     } else if (!strcmp(request->method, "get_schema")) {
424         reply = ovsdb_jsonrpc_check_db_name(s, request);
425         if (!reply) {
426             reply = jsonrpc_create_reply(
427                 ovsdb_schema_to_json(s->remote->server->db->schema),
428                 request->id);
429         }
430     } else if (!strcmp(request->method, "list_dbs")) {
431         reply = jsonrpc_create_reply(
432             json_array_create_1(json_string_create(get_db_name(s))),
433             request->id);
434     } else if (!strcmp(request->method, "echo")) {
435         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
436     } else {
437         reply = jsonrpc_create_error(json_string_create("unknown method"),
438                                      request->id);
439     }
440
441     if (reply) {
442         jsonrpc_msg_destroy(request);
443         jsonrpc_session_send(s->js, reply);
444     }
445 }
446
447 static void
448 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
449 {
450     if (json_array(request->params)->n == 1) {
451         struct ovsdb_jsonrpc_trigger *t;
452         struct json *id;
453
454         id = request->params->u.array.elems[0];
455         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
456         if (t) {
457             ovsdb_jsonrpc_trigger_complete(t);
458         }
459     }
460 }
461
462 static void
463 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
464                                  struct jsonrpc_msg *request)
465 {
466     if (!strcmp(request->method, "cancel")) {
467         execute_cancel(s, request);
468     }
469     jsonrpc_msg_destroy(request);
470 }
471 \f
472 /* JSON-RPC database server triggers.
473  *
474  * (Every transaction is treated as a trigger even if it doesn't actually have
475  * any "wait" operations.) */
476
477 struct ovsdb_jsonrpc_trigger {
478     struct ovsdb_trigger trigger;
479     struct ovsdb_jsonrpc_session *session;
480     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
481     struct json *id;
482 };
483
484 static void
485 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
486                              struct json *id, struct json *params)
487 {
488     struct ovsdb_jsonrpc_trigger *t;
489     size_t hash;
490
491     /* Check for duplicate ID. */
492     hash = json_hash(id, 0);
493     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
494     if (t) {
495         struct jsonrpc_msg *msg;
496
497         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
498                                    id);
499         jsonrpc_session_send(s->js, msg);
500         json_destroy(id);
501         json_destroy(params);
502         return;
503     }
504
505     /* Insert into trigger table. */
506     t = xmalloc(sizeof *t);
507     ovsdb_trigger_init(s->remote->server->db,
508                        &t->trigger, params, &s->completions,
509                        time_msec());
510     t->session = s;
511     t->id = id;
512     hmap_insert(&s->triggers, &t->hmap_node, hash);
513
514     /* Complete early if possible. */
515     if (ovsdb_trigger_is_complete(&t->trigger)) {
516         ovsdb_jsonrpc_trigger_complete(t);
517     }
518 }
519
520 static struct ovsdb_jsonrpc_trigger *
521 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
522                            const struct json *id, size_t hash)
523 {
524     struct ovsdb_jsonrpc_trigger *t;
525
526     HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
527                              &s->triggers) {
528         if (json_equal(t->id, id)) {
529             return t;
530         }
531     }
532
533     return NULL;
534 }
535
536 static void
537 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
538 {
539     struct ovsdb_jsonrpc_session *s = t->session;
540
541     if (jsonrpc_session_is_connected(s->js)) {
542         struct jsonrpc_msg *reply;
543         struct json *result;
544
545         result = ovsdb_trigger_steal_result(&t->trigger);
546         if (result) {
547             reply = jsonrpc_create_reply(result, t->id);
548         } else {
549             reply = jsonrpc_create_error(json_string_create("canceled"),
550                                          t->id);
551         }
552         jsonrpc_session_send(s->js, reply);
553     }
554
555     json_destroy(t->id);
556     ovsdb_trigger_destroy(&t->trigger);
557     hmap_remove(&s->triggers, &t->hmap_node);
558     free(t);
559 }
560
561 static void
562 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
563 {
564     struct ovsdb_jsonrpc_trigger *t, *next;
565     HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
566                         &s->triggers) {
567         ovsdb_jsonrpc_trigger_complete(t);
568     }
569 }
570
571 static void
572 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
573 {
574     while (!list_is_empty(&s->completions)) {
575         struct ovsdb_jsonrpc_trigger *t
576             = CONTAINER_OF(s->completions.next,
577                            struct ovsdb_jsonrpc_trigger, trigger.node);
578         ovsdb_jsonrpc_trigger_complete(t);
579     }
580 }
581 \f
582 /* JSON-RPC database table monitors. */
583
584 enum ovsdb_jsonrpc_monitor_selection {
585     OJMS_INITIAL = 1 << 0,      /* All rows when monitor is created. */
586     OJMS_INSERT = 1 << 1,       /* New rows. */
587     OJMS_DELETE = 1 << 2,       /* Deleted rows. */
588     OJMS_MODIFY = 1 << 3        /* Modified rows. */
589 };
590
591 struct ovsdb_jsonrpc_monitor_table {
592     const struct ovsdb_table *table;
593     enum ovsdb_jsonrpc_monitor_selection select;
594     struct ovsdb_column_set columns;
595 };
596
597 struct ovsdb_jsonrpc_monitor {
598     struct ovsdb_replica replica;
599     struct ovsdb_jsonrpc_session *session;
600     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
601
602     struct json *monitor_id;
603     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
604 };
605
606 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
607
608 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
609     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
610 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
611 static struct json *ovsdb_jsonrpc_monitor_get_initial(
612     const struct ovsdb_jsonrpc_monitor *);
613
614 static bool
615 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
616 {
617     const struct json *json;
618
619     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
620     return json ? json_boolean(json) : default_value;
621 }
622
623 struct ovsdb_jsonrpc_monitor *
624 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
625                            const struct json *monitor_id)
626 {
627     struct ovsdb_jsonrpc_monitor *m;
628
629     HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node,
630                              json_hash(monitor_id, 0), &s->monitors) {
631         if (json_equal(m->monitor_id, monitor_id)) {
632             return m;
633         }
634     }
635
636     return NULL;
637 }
638
639 static struct json *
640 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
641                              struct json *params)
642 {
643     struct ovsdb_jsonrpc_monitor *m = NULL;
644     struct json *monitor_id, *monitor_requests;
645     struct ovsdb_error *error = NULL;
646     struct shash_node *node;
647     struct json *json;
648
649     if (json_array(params)->n != 3) {
650         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
651         goto error;
652     }
653     monitor_id = params->u.array.elems[1];
654     monitor_requests = params->u.array.elems[2];
655     if (monitor_requests->type != JSON_OBJECT) {
656         error = ovsdb_syntax_error(monitor_requests, NULL,
657                                    "monitor-requests must be object");
658         goto error;
659     }
660
661     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
662         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
663         goto error;
664     }
665
666     m = xzalloc(sizeof *m);
667     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
668     ovsdb_add_replica(s->remote->server->db, &m->replica);
669     m->session = s;
670     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
671     m->monitor_id = json_clone(monitor_id);
672     shash_init(&m->tables);
673
674     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
675         const struct ovsdb_table *table;
676         struct ovsdb_jsonrpc_monitor_table *mt;
677         const struct json *columns_json, *select_json;
678         struct ovsdb_parser parser;
679
680         table = ovsdb_get_table(s->remote->server->db, node->name);
681         if (!table) {
682             error = ovsdb_syntax_error(NULL, NULL,
683                                        "no table named %s", node->name);
684             goto error;
685         }
686
687         mt = xzalloc(sizeof *mt);
688         mt->table = table;
689         mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
690         ovsdb_column_set_init(&mt->columns);
691         shash_add(&m->tables, table->schema->name, mt);
692
693         ovsdb_parser_init(&parser, node->data, "table %s", node->name);
694         columns_json = ovsdb_parser_member(&parser, "columns",
695                                            OP_ARRAY | OP_OPTIONAL);
696         select_json = ovsdb_parser_member(&parser, "select",
697                                           OP_OBJECT | OP_OPTIONAL);
698         error = ovsdb_parser_finish(&parser);
699         if (error) {
700             goto error;
701         }
702
703         if (columns_json) {
704             error = ovsdb_column_set_from_json(columns_json, table,
705                                                &mt->columns);
706             if (error) {
707                 goto error;
708             }
709         } else {
710             struct shash_node *node;
711
712             SHASH_FOR_EACH (node, &table->schema->columns) {
713                 const struct ovsdb_column *column = node->data;
714                 if (column->index != OVSDB_COL_UUID) {
715                     ovsdb_column_set_add(&mt->columns, column);
716                 }
717             }
718         }
719
720         if (select_json) {
721             mt->select = 0;
722             ovsdb_parser_init(&parser, select_json, "table %s select",
723                               table->schema->name);
724             if (parse_bool(&parser, "initial", true)) {
725                 mt->select |= OJMS_INITIAL;
726             }
727             if (parse_bool(&parser, "insert", true)) {
728                 mt->select |= OJMS_INSERT;
729             }
730             if (parse_bool(&parser, "delete", true)) {
731                 mt->select |= OJMS_DELETE;
732             }
733             if (parse_bool(&parser, "modify", true)) {
734                 mt->select |= OJMS_MODIFY;
735             }
736             error = ovsdb_parser_finish(&parser);
737             if (error) {
738                 goto error;
739             }
740         }
741     }
742
743     return ovsdb_jsonrpc_monitor_get_initial(m);
744
745 error:
746     if (m) {
747         ovsdb_remove_replica(s->remote->server->db, &m->replica);
748     }
749
750     json = ovsdb_error_to_json(error);
751     ovsdb_error_destroy(error);
752     return json;
753 }
754
755 static struct jsonrpc_msg *
756 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
757                              struct json_array *params,
758                              const struct json *request_id)
759 {
760     if (params->n != 1) {
761         return jsonrpc_create_error(json_string_create("invalid parameters"),
762                                     request_id);
763     } else {
764         struct ovsdb_jsonrpc_monitor *m;
765
766         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
767         if (!m) {
768             return jsonrpc_create_error(json_string_create("unknown monitor"),
769                                         request_id);
770         } else {
771             ovsdb_remove_replica(s->remote->server->db, &m->replica);
772             return jsonrpc_create_reply(json_object_create(), request_id);
773         }
774     }
775 }
776
777 static void
778 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
779 {
780     struct ovsdb_jsonrpc_monitor *m, *next;
781
782     HMAP_FOR_EACH_SAFE (m, next,
783                         struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
784         ovsdb_remove_replica(s->remote->server->db, &m->replica);
785     }
786 }
787
788 static struct ovsdb_jsonrpc_monitor *
789 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
790 {
791     assert(replica->class == &ovsdb_jsonrpc_replica_class);
792     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
793 }
794
795 struct ovsdb_jsonrpc_monitor_aux {
796     bool initial;               /* Sending initial contents of table? */
797     const struct ovsdb_jsonrpc_monitor *monitor;
798     struct json *json;          /* JSON for the whole transaction. */
799
800     /* Current table.  */
801     struct ovsdb_jsonrpc_monitor_table *mt;
802     struct json *table_json;    /* JSON for table's transaction. */
803 };
804
805 static bool
806 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
807                                 const struct ovsdb_row *new,
808                                 const unsigned long int *changed,
809                                 void *aux_)
810 {
811     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
812     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
813     struct ovsdb_table *table = new ? new->table : old->table;
814     enum ovsdb_jsonrpc_monitor_selection type;
815     struct json *old_json, *new_json;
816     struct json *row_json;
817     char uuid[UUID_LEN + 1];
818     int n_changed;
819     size_t i;
820
821     if (!aux->mt || table != aux->mt->table) {
822         aux->mt = shash_find_data(&m->tables, table->schema->name);
823         aux->table_json = NULL;
824         if (!aux->mt) {
825             /* We don't care about rows in this table at all.  Tell the caller
826              * to skip it.  */
827             return false;
828         }
829     }
830
831     type = (aux->initial ? OJMS_INITIAL
832             : !old ? OJMS_INSERT
833             : !new ? OJMS_DELETE
834             : OJMS_MODIFY);
835     if (!(aux->mt->select & type)) {
836         /* We don't care about this type of change (but do want to be called
837          * back for changes to other rows in the same table). */
838         return true;
839     }
840
841     old_json = new_json = NULL;
842     n_changed = 0;
843     for (i = 0; i < aux->mt->columns.n_columns; i++) {
844         const struct ovsdb_column *column = aux->mt->columns.columns[i];
845         unsigned int idx = column->index;
846         bool column_changed = false;
847
848         if (type == OJMS_MODIFY) {
849             column_changed = bitmap_is_set(changed, idx);
850             n_changed += column_changed;
851         }
852         if (column_changed || type == OJMS_DELETE) {
853             if (!old_json) {
854                 old_json = json_object_create();
855             }
856             json_object_put(old_json, column->name,
857                             ovsdb_datum_to_json(&old->fields[idx],
858                                                 &column->type));
859         }
860         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
861             if (!new_json) {
862                 new_json = json_object_create();
863             }
864             json_object_put(new_json, column->name,
865                             ovsdb_datum_to_json(&new->fields[idx],
866                                                 &column->type));
867         }
868     }
869     if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) {
870         /* No reportable changes. */
871         json_destroy(old_json);
872         json_destroy(new_json);
873         return true;
874     }
875
876     /* Create JSON object for transaction overall. */
877     if (!aux->json) {
878         aux->json = json_object_create();
879     }
880
881     /* Create JSON object for transaction on this table. */
882     if (!aux->table_json) {
883         aux->table_json = json_object_create();
884         json_object_put(aux->json, aux->mt->table->schema->name,
885                         aux->table_json);
886     }
887
888     /* Create JSON object for transaction on this row. */
889     row_json = json_object_create();
890     if (old_json) {
891         json_object_put(row_json, "old", old_json);
892     }
893     if (new_json) {
894         json_object_put(row_json, "new", new_json);
895     }
896
897     /* Add JSON row to JSON table. */
898     snprintf(uuid, sizeof uuid,
899              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
900     json_object_put(aux->table_json, uuid, row_json);
901
902     return true;
903 }
904
905 static void
906 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
907                                const struct ovsdb_jsonrpc_monitor *m,
908                                bool initial)
909 {
910     aux->initial = initial;
911     aux->monitor = m;
912     aux->json = NULL;
913     aux->mt = NULL;
914     aux->table_json = NULL;
915 }
916
917 static struct ovsdb_error *
918 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
919                              const struct ovsdb_txn *txn,
920                              bool durable OVS_UNUSED)
921 {
922     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
923     struct ovsdb_jsonrpc_monitor_aux aux;
924
925     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
926     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
927     if (aux.json) {
928         struct jsonrpc_msg *msg;
929         struct json *params;
930
931         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
932                                      aux.json);
933         msg = jsonrpc_create_notify("update", params);
934         jsonrpc_session_send(aux.monitor->session->js, msg);
935     }
936
937     return NULL;
938 }
939
940 static struct json *
941 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
942 {
943     struct ovsdb_jsonrpc_monitor_aux aux;
944     struct shash_node *node;
945
946     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
947     SHASH_FOR_EACH (node, &m->tables) {
948         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
949
950         if (mt->select & OJMS_INITIAL) {
951             struct ovsdb_row *row;
952
953             HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
954                            &mt->table->rows) {
955                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, NULL, &aux);
956             }
957         }
958     }
959     return aux.json ? aux.json : json_object_create();
960 }
961
962 static void
963 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
964 {
965     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
966     struct shash_node *node;
967
968     json_destroy(m->monitor_id);
969     SHASH_FOR_EACH (node, &m->tables) {
970         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
971         ovsdb_column_set_destroy(&mt->columns);
972         free(mt);
973     }
974     shash_destroy(&m->tables);
975     hmap_remove(&m->session->monitors, &m->node);
976     free(m);
977 }
978
979 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
980     ovsdb_jsonrpc_monitor_commit,
981     ovsdb_jsonrpc_monitor_destroy
982 };