Global replace of Nicira Networks.
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <assert.h>
21 #include <errno.h>
22
23 #include "bitmap.h"
24 #include "column.h"
25 #include "dynamic-string.h"
26 #include "json.h"
27 #include "jsonrpc.h"
28 #include "ovsdb-error.h"
29 #include "ovsdb-parser.h"
30 #include "ovsdb.h"
31 #include "reconnect.h"
32 #include "row.h"
33 #include "server.h"
34 #include "stream.h"
35 #include "table.h"
36 #include "timeval.h"
37 #include "transaction.h"
38 #include "trigger.h"
39 #include "vlog.h"
40
41 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
42
43 struct ovsdb_jsonrpc_remote;
44 struct ovsdb_jsonrpc_session;
45
46 /* Message rate-limiting. */
47 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
48
49 /* Sessions. */
50 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
51     struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
52 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
53 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
54 static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
55 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
56 static void ovsdb_jsonrpc_session_set_all_options(
57     struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
58 static bool ovsdb_jsonrpc_session_get_status(
59     const struct ovsdb_jsonrpc_remote *,
60     struct ovsdb_jsonrpc_remote_status *);
61 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
62 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
63
64 /* Triggers. */
65 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
66                                          struct json *id, struct json *params);
67 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
68     struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
69 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
70 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
71 static void ovsdb_jsonrpc_trigger_complete_done(
72     struct ovsdb_jsonrpc_session *);
73
74 /* Monitors. */
75 static struct json *ovsdb_jsonrpc_monitor_create(
76     struct ovsdb_jsonrpc_session *, struct json *params);
77 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
78     struct ovsdb_jsonrpc_session *,
79     struct json_array *params,
80     const struct json *request_id);
81 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
82 \f
83 /* JSON-RPC database server. */
84
85 struct ovsdb_jsonrpc_server {
86     struct ovsdb_server up;
87     unsigned int n_sessions, max_sessions;
88     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
89 };
90
91 /* A configured remote.  This is either a passive stream listener plus a list
92  * of the currently connected sessions, or a list of exactly one active
93  * session. */
94 struct ovsdb_jsonrpc_remote {
95     struct ovsdb_jsonrpc_server *server;
96     struct pstream *listener;   /* Listener, if passive. */
97     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
98 };
99
100 static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
101     struct ovsdb_jsonrpc_server *, const char *name,
102     const struct ovsdb_jsonrpc_options *options
103 );
104 static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
105
106 struct ovsdb_jsonrpc_server *
107 ovsdb_jsonrpc_server_create(struct ovsdb *db)
108 {
109     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
110     ovsdb_server_init(&server->up, db);
111     server->max_sessions = 64;
112     shash_init(&server->remotes);
113     return server;
114 }
115
116 void
117 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
118 {
119     struct shash_node *node, *next;
120
121     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
122         ovsdb_jsonrpc_server_del_remote(node);
123     }
124     shash_destroy(&svr->remotes);
125     ovsdb_server_destroy(&svr->up);
126     free(svr);
127 }
128
129 struct ovsdb_jsonrpc_options *
130 ovsdb_jsonrpc_default_options(const char *target)
131 {
132     struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
133     options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
134     options->probe_interval = (stream_or_pstream_needs_probes(target)
135                                ? RECONNECT_DEFAULT_PROBE_INTERVAL
136                                : 0);
137     return options;
138 }
139
140 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
141  * options in the struct ovsdb_jsonrpc_options supplied as the data values.
142  *
143  * A remote is an active or passive stream connection method, e.g. "pssl:" or
144  * "tcp:1.2.3.4". */
145 void
146 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
147                                  const struct shash *new_remotes)
148 {
149     struct shash_node *node, *next;
150
151     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
152         if (!shash_find(new_remotes, node->name)) {
153             VLOG_INFO("%s: remote deconfigured", node->name);
154             ovsdb_jsonrpc_server_del_remote(node);
155         }
156     }
157     SHASH_FOR_EACH (node, new_remotes) {
158         const struct ovsdb_jsonrpc_options *options = node->data;
159         struct ovsdb_jsonrpc_remote *remote;
160
161         remote = shash_find_data(&svr->remotes, node->name);
162         if (!remote) {
163             remote = ovsdb_jsonrpc_server_add_remote(svr, node->name, options);
164             if (!remote) {
165                 continue;
166             }
167         }
168
169         ovsdb_jsonrpc_session_set_all_options(remote, options);
170     }
171 }
172
173 static struct ovsdb_jsonrpc_remote *
174 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
175                                 const char *name,
176                                 const struct ovsdb_jsonrpc_options *options)
177 {
178     struct ovsdb_jsonrpc_remote *remote;
179     struct pstream *listener;
180     int error;
181
182     error = jsonrpc_pstream_open(name, &listener, options->dscp);
183     if (error && error != EAFNOSUPPORT) {
184         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
185         return NULL;
186     }
187
188     remote = xmalloc(sizeof *remote);
189     remote->server = svr;
190     remote->listener = listener;
191     list_init(&remote->sessions);
192     shash_add(&svr->remotes, name, remote);
193
194     if (!listener) {
195         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
196     }
197     return remote;
198 }
199
200 static void
201 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
202 {
203     struct ovsdb_jsonrpc_remote *remote = node->data;
204
205     ovsdb_jsonrpc_session_close_all(remote);
206     pstream_close(remote->listener);
207     shash_delete(&remote->server->remotes, node);
208     free(remote);
209 }
210
211 /* Stores status information for the remote named 'target', which should have
212  * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
213  * into '*status'.  On success returns true, on failure (if 'svr' doesn't have
214  * a remote named 'target' or if that remote is an inbound remote that has no
215  * active connections) returns false.  On failure, 'status' will be zeroed.
216  */
217 bool
218 ovsdb_jsonrpc_server_get_remote_status(
219     const struct ovsdb_jsonrpc_server *svr, const char *target,
220     struct ovsdb_jsonrpc_remote_status *status)
221 {
222     const struct ovsdb_jsonrpc_remote *remote;
223
224     memset(status, 0, sizeof *status);
225
226     remote = shash_find_data(&svr->remotes, target);
227     return remote && ovsdb_jsonrpc_session_get_status(remote, status);
228 }
229
230 void
231 ovsdb_jsonrpc_server_free_remote_status(
232     struct ovsdb_jsonrpc_remote_status *status)
233 {
234     free(status->locks_held);
235     free(status->locks_waiting);
236     free(status->locks_lost);
237 }
238
239 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
240  * reconnect. */
241 void
242 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
243 {
244     struct shash_node *node;
245
246     SHASH_FOR_EACH (node, &svr->remotes) {
247         struct ovsdb_jsonrpc_remote *remote = node->data;
248
249         ovsdb_jsonrpc_session_reconnect_all(remote);
250     }
251 }
252
253 void
254 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
255 {
256     struct shash_node *node;
257
258     SHASH_FOR_EACH (node, &svr->remotes) {
259         struct ovsdb_jsonrpc_remote *remote = node->data;
260
261         if (remote->listener && svr->n_sessions < svr->max_sessions) {
262             struct stream *stream;
263             int error;
264
265             error = pstream_accept(remote->listener, &stream);
266             if (!error) {
267                 struct jsonrpc_session *js;
268                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream));
269                 ovsdb_jsonrpc_session_create(remote, js);
270             } else if (error != EAGAIN) {
271                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
272                              pstream_get_name(remote->listener),
273                              strerror(error));
274             }
275         }
276
277         ovsdb_jsonrpc_session_run_all(remote);
278     }
279 }
280
281 void
282 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
283 {
284     struct shash_node *node;
285
286     SHASH_FOR_EACH (node, &svr->remotes) {
287         struct ovsdb_jsonrpc_remote *remote = node->data;
288
289         if (remote->listener && svr->n_sessions < svr->max_sessions) {
290             pstream_wait(remote->listener);
291         }
292
293         ovsdb_jsonrpc_session_wait_all(remote);
294     }
295 }
296 \f
297 /* JSON-RPC database server session. */
298
299 struct ovsdb_jsonrpc_session {
300     struct list node;           /* Element in remote's sessions list. */
301     struct ovsdb_session up;
302     struct ovsdb_jsonrpc_remote *remote;
303
304     /* Triggers. */
305     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
306
307     /* Monitors. */
308     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
309
310     /* Network connectivity. */
311     struct jsonrpc_session *js;  /* JSON-RPC session. */
312     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
313 };
314
315 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
316 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
317 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
318 static void ovsdb_jsonrpc_session_set_options(
319     struct ovsdb_jsonrpc_session *, const struct ovsdb_jsonrpc_options *);
320 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
321                                              struct jsonrpc_msg *);
322 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
323                                              struct jsonrpc_msg *);
324
325 static struct ovsdb_jsonrpc_session *
326 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
327                              struct jsonrpc_session *js)
328 {
329     struct ovsdb_jsonrpc_session *s;
330
331     s = xzalloc(sizeof *s);
332     ovsdb_session_init(&s->up, remote->server->up.db);
333     s->remote = remote;
334     list_push_back(&remote->sessions, &s->node);
335     hmap_init(&s->triggers);
336     hmap_init(&s->monitors);
337     s->js = js;
338     s->js_seqno = jsonrpc_session_get_seqno(js);
339
340     remote->server->n_sessions++;
341
342     return s;
343 }
344
345 static void
346 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
347 {
348     ovsdb_jsonrpc_monitor_remove_all(s);
349     ovsdb_jsonrpc_session_unlock_all(s);
350     jsonrpc_session_close(s->js);
351     list_remove(&s->node);
352     ovsdb_session_destroy(&s->up);
353     s->remote->server->n_sessions--;
354     ovsdb_session_destroy(&s->up);
355     free(s);
356 }
357
358 static int
359 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
360 {
361     jsonrpc_session_run(s->js);
362     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
363         s->js_seqno = jsonrpc_session_get_seqno(s->js);
364         ovsdb_jsonrpc_trigger_complete_all(s);
365         ovsdb_jsonrpc_monitor_remove_all(s);
366         ovsdb_jsonrpc_session_unlock_all(s);
367     }
368
369     ovsdb_jsonrpc_trigger_complete_done(s);
370
371     if (!jsonrpc_session_get_backlog(s->js)) {
372         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
373         if (msg) {
374             if (msg->type == JSONRPC_REQUEST) {
375                 ovsdb_jsonrpc_session_got_request(s, msg);
376             } else if (msg->type == JSONRPC_NOTIFY) {
377                 ovsdb_jsonrpc_session_got_notify(s, msg);
378             } else {
379                 VLOG_WARN("%s: received unexpected %s message",
380                           jsonrpc_session_get_name(s->js),
381                           jsonrpc_msg_type_to_string(msg->type));
382                 jsonrpc_session_force_reconnect(s->js);
383                 jsonrpc_msg_destroy(msg);
384             }
385         }
386     }
387     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
388 }
389
390 static void
391 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
392                                   const struct ovsdb_jsonrpc_options *options)
393 {
394     jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
395     jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
396     jsonrpc_session_set_dscp(session->js, options->dscp);
397 }
398
399 static void
400 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
401 {
402     struct ovsdb_jsonrpc_session *s, *next;
403
404     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
405         int error = ovsdb_jsonrpc_session_run(s);
406         if (error) {
407             ovsdb_jsonrpc_session_close(s);
408         }
409     }
410 }
411
412 static void
413 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
414 {
415     jsonrpc_session_wait(s->js);
416     if (!jsonrpc_session_get_backlog(s->js)) {
417         jsonrpc_session_recv_wait(s->js);
418     }
419 }
420
421 static void
422 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
423 {
424     struct ovsdb_jsonrpc_session *s;
425
426     LIST_FOR_EACH (s, node, &remote->sessions) {
427         ovsdb_jsonrpc_session_wait(s);
428     }
429 }
430
431 static void
432 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
433 {
434     struct ovsdb_jsonrpc_session *s, *next;
435
436     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
437         ovsdb_jsonrpc_session_close(s);
438     }
439 }
440
441 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
442  * reconnect. */
443 static void
444 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
445 {
446     struct ovsdb_jsonrpc_session *s, *next;
447
448     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
449         jsonrpc_session_force_reconnect(s->js);
450         if (!jsonrpc_session_is_alive(s->js)) {
451             ovsdb_jsonrpc_session_close(s);
452         }
453     }
454 }
455
456 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
457  * 'options'. */
458 static void
459 ovsdb_jsonrpc_session_set_all_options(
460     struct ovsdb_jsonrpc_remote *remote,
461     const struct ovsdb_jsonrpc_options *options)
462 {
463     struct ovsdb_jsonrpc_session *s;
464
465     LIST_FOR_EACH (s, node, &remote->sessions) {
466         ovsdb_jsonrpc_session_set_options(s, options);
467     }
468 }
469
470 static bool
471 ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote *remote,
472                                  struct ovsdb_jsonrpc_remote_status *status)
473 {
474     const struct ovsdb_jsonrpc_session *s;
475     const struct jsonrpc_session *js;
476     struct ovsdb_lock_waiter *waiter;
477     struct reconnect_stats rstats;
478     struct ds locks_held, locks_waiting, locks_lost;
479
480     if (list_is_empty(&remote->sessions)) {
481         return false;
482     }
483     s = CONTAINER_OF(remote->sessions.next, struct ovsdb_jsonrpc_session, node);
484     js = s->js;
485
486     status->is_connected = jsonrpc_session_is_connected(js);
487     status->last_error = jsonrpc_session_get_status(js);
488
489     jsonrpc_session_get_reconnect_stats(js, &rstats);
490     status->state = rstats.state;
491     status->sec_since_connect = rstats.msec_since_connect == UINT_MAX
492         ? UINT_MAX : rstats.msec_since_connect / 1000;
493     status->sec_since_disconnect = rstats.msec_since_disconnect == UINT_MAX
494         ? UINT_MAX : rstats.msec_since_disconnect / 1000;
495
496     ds_init(&locks_held);
497     ds_init(&locks_waiting);
498     ds_init(&locks_lost);
499     HMAP_FOR_EACH (waiter, session_node, &s->up.waiters) {
500         struct ds *string;
501
502         string = (ovsdb_lock_waiter_is_owner(waiter) ? &locks_held
503                   : waiter->mode == OVSDB_LOCK_WAIT ? &locks_waiting
504                   : &locks_lost);
505         if (string->length) {
506             ds_put_char(string, ' ');
507         }
508         ds_put_cstr(string, waiter->lock_name);
509     }
510     status->locks_held = ds_steal_cstr(&locks_held);
511     status->locks_waiting = ds_steal_cstr(&locks_waiting);
512     status->locks_lost = ds_steal_cstr(&locks_lost);
513
514     status->n_connections = list_size(&remote->sessions);
515
516     return true;
517 }
518
519 static const char *
520 get_db_name(const struct ovsdb_jsonrpc_session *s)
521 {
522     return s->remote->server->up.db->schema->name;
523 }
524
525 static struct jsonrpc_msg *
526 ovsdb_jsonrpc_check_db_name(const struct ovsdb_jsonrpc_session *s,
527                             const struct jsonrpc_msg *request)
528 {
529     struct json_array *params;
530     const char *want_db_name;
531     const char *have_db_name;
532     struct ovsdb_error *error;
533     struct jsonrpc_msg *reply;
534
535     params = json_array(request->params);
536     if (!params->n || params->elems[0]->type != JSON_STRING) {
537         error = ovsdb_syntax_error(
538             request->params, NULL,
539             "%s request params must begin with <db-name>", request->method);
540         goto error;
541     }
542
543     want_db_name = params->elems[0]->u.string;
544     have_db_name = get_db_name(s);
545     if (strcmp(want_db_name, have_db_name)) {
546         error = ovsdb_syntax_error(
547             request->params, "unknown database",
548             "%s request specifies unknown database %s",
549             request->method, want_db_name);
550         goto error;
551     }
552
553     return NULL;
554
555 error:
556     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
557     ovsdb_error_destroy(error);
558     return reply;
559 }
560
561 static struct ovsdb_error *
562 ovsdb_jsonrpc_session_parse_lock_name(const struct jsonrpc_msg *request,
563                                       const char **lock_namep)
564 {
565     const struct json_array *params;
566
567     params = json_array(request->params);
568     if (params->n != 1 || params->elems[0]->type != JSON_STRING ||
569         !ovsdb_parser_is_id(json_string(params->elems[0]))) {
570         *lock_namep = NULL;
571         return ovsdb_syntax_error(request->params, NULL,
572                                   "%s request params must be <id>",
573                                   request->method);
574     }
575
576     *lock_namep = json_string(params->elems[0]);
577     return NULL;
578 }
579
580 static void
581 ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
582                              const char *lock_name,
583                              const char *method)
584 {
585     struct ovsdb_jsonrpc_session *s;
586     struct json *params;
587
588     s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
589     params = json_array_create_1(json_string_create(lock_name));
590     jsonrpc_session_send(s->js, jsonrpc_create_notify(method, params));
591 }
592
593 static struct jsonrpc_msg *
594 ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
595                            struct jsonrpc_msg *request,
596                            enum ovsdb_lock_mode mode)
597 {
598     struct ovsdb_lock_waiter *waiter;
599     struct jsonrpc_msg *reply;
600     struct ovsdb_error *error;
601     struct ovsdb_session *victim;
602     const char *lock_name;
603     struct json *result;
604
605     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
606     if (error) {
607         goto error;
608     }
609
610     /* Report error if this session has issued a "lock" or "steal" without a
611      * matching "unlock" for this lock. */
612     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
613     if (waiter) {
614         error = ovsdb_syntax_error(
615             request->params, NULL,
616             "must issue \"unlock\" before new \"%s\"", request->method);
617         goto error;
618     }
619
620     /* Get the lock, add us as a waiter. */
621     waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
622                                &victim);
623     if (victim) {
624         ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
625     }
626
627     result = json_object_create();
628     json_object_put(result, "locked",
629                     json_boolean_create(ovsdb_lock_waiter_is_owner(waiter)));
630
631     return jsonrpc_create_reply(result, request->id);
632
633 error:
634     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
635     ovsdb_error_destroy(error);
636     return reply;
637 }
638
639 static void
640 ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *s)
641 {
642     struct ovsdb_lock_waiter *waiter, *next;
643
644     HMAP_FOR_EACH_SAFE (waiter, next, session_node, &s->up.waiters) {
645         ovsdb_jsonrpc_session_unlock__(waiter);
646     }
647 }
648
649 static void
650 ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *waiter)
651 {
652     struct ovsdb_lock *lock = waiter->lock;
653
654     if (lock) {
655         struct ovsdb_session *new_owner = ovsdb_lock_waiter_remove(waiter);
656         if (new_owner) {
657             ovsdb_jsonrpc_session_notify(new_owner, lock->name, "locked");
658         } else {
659             /* ovsdb_server_lock() might have freed 'lock'. */
660         }
661     }
662
663     ovsdb_lock_waiter_destroy(waiter);
664 }
665
666 static struct jsonrpc_msg *
667 ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
668                              struct jsonrpc_msg *request)
669 {
670     struct ovsdb_lock_waiter *waiter;
671     struct jsonrpc_msg *reply;
672     struct ovsdb_error *error;
673     const char *lock_name;
674
675     error = ovsdb_jsonrpc_session_parse_lock_name(request, &lock_name);
676     if (error) {
677         goto error;
678     }
679
680     /* Report error if this session has not issued a "lock" or "steal" for this
681      * lock. */
682     waiter = ovsdb_session_get_lock_waiter(&s->up, lock_name);
683     if (!waiter) {
684         error = ovsdb_syntax_error(
685             request->params, NULL, "\"unlock\" without \"lock\" or \"steal\"");
686         goto error;
687     }
688
689     ovsdb_jsonrpc_session_unlock__(waiter);
690
691     return jsonrpc_create_reply(json_object_create(), request->id);
692
693 error:
694     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
695     ovsdb_error_destroy(error);
696     return reply;
697 }
698
699 static struct jsonrpc_msg *
700 execute_transaction(struct ovsdb_jsonrpc_session *s,
701                     struct jsonrpc_msg *request)
702 {
703     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
704     request->id = NULL;
705     request->params = NULL;
706     jsonrpc_msg_destroy(request);
707     return NULL;
708 }
709
710 static void
711 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
712                                   struct jsonrpc_msg *request)
713 {
714     struct jsonrpc_msg *reply;
715
716     if (!strcmp(request->method, "transact")) {
717         reply = ovsdb_jsonrpc_check_db_name(s, request);
718         if (!reply) {
719             reply = execute_transaction(s, request);
720         }
721     } else if (!strcmp(request->method, "monitor")) {
722         reply = ovsdb_jsonrpc_check_db_name(s, request);
723         if (!reply) {
724             reply = jsonrpc_create_reply(
725                 ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
726         }
727     } else if (!strcmp(request->method, "monitor_cancel")) {
728         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
729                                              request->id);
730     } else if (!strcmp(request->method, "get_schema")) {
731         reply = ovsdb_jsonrpc_check_db_name(s, request);
732         if (!reply) {
733             reply = jsonrpc_create_reply(
734                 ovsdb_schema_to_json(s->remote->server->up.db->schema),
735                 request->id);
736         }
737     } else if (!strcmp(request->method, "list_dbs")) {
738         reply = jsonrpc_create_reply(
739             json_array_create_1(json_string_create(get_db_name(s))),
740             request->id);
741     } else if (!strcmp(request->method, "lock")) {
742         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_WAIT);
743     } else if (!strcmp(request->method, "steal")) {
744         reply = ovsdb_jsonrpc_session_lock(s, request, OVSDB_LOCK_STEAL);
745     } else if (!strcmp(request->method, "unlock")) {
746         reply = ovsdb_jsonrpc_session_unlock(s, request);
747     } else if (!strcmp(request->method, "echo")) {
748         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
749     } else {
750         reply = jsonrpc_create_error(json_string_create("unknown method"),
751                                      request->id);
752     }
753
754     if (reply) {
755         jsonrpc_msg_destroy(request);
756         jsonrpc_session_send(s->js, reply);
757     }
758 }
759
760 static void
761 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
762 {
763     if (json_array(request->params)->n == 1) {
764         struct ovsdb_jsonrpc_trigger *t;
765         struct json *id;
766
767         id = request->params->u.array.elems[0];
768         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
769         if (t) {
770             ovsdb_jsonrpc_trigger_complete(t);
771         }
772     }
773 }
774
775 static void
776 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
777                                  struct jsonrpc_msg *request)
778 {
779     if (!strcmp(request->method, "cancel")) {
780         execute_cancel(s, request);
781     }
782     jsonrpc_msg_destroy(request);
783 }
784 \f
785 /* JSON-RPC database server triggers.
786  *
787  * (Every transaction is treated as a trigger even if it doesn't actually have
788  * any "wait" operations.) */
789
790 struct ovsdb_jsonrpc_trigger {
791     struct ovsdb_trigger trigger;
792     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
793     struct json *id;
794 };
795
796 static void
797 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
798                              struct json *id, struct json *params)
799 {
800     struct ovsdb_jsonrpc_trigger *t;
801     size_t hash;
802
803     /* Check for duplicate ID. */
804     hash = json_hash(id, 0);
805     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
806     if (t) {
807         struct jsonrpc_msg *msg;
808
809         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
810                                    id);
811         jsonrpc_session_send(s->js, msg);
812         json_destroy(id);
813         json_destroy(params);
814         return;
815     }
816
817     /* Insert into trigger table. */
818     t = xmalloc(sizeof *t);
819     ovsdb_trigger_init(&s->up, &t->trigger, params, time_msec());
820     t->id = id;
821     hmap_insert(&s->triggers, &t->hmap_node, hash);
822
823     /* Complete early if possible. */
824     if (ovsdb_trigger_is_complete(&t->trigger)) {
825         ovsdb_jsonrpc_trigger_complete(t);
826     }
827 }
828
829 static struct ovsdb_jsonrpc_trigger *
830 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
831                            const struct json *id, size_t hash)
832 {
833     struct ovsdb_jsonrpc_trigger *t;
834
835     HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
836         if (json_equal(t->id, id)) {
837             return t;
838         }
839     }
840
841     return NULL;
842 }
843
844 static void
845 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
846 {
847     struct ovsdb_jsonrpc_session *s;
848
849     s = CONTAINER_OF(t->trigger.session, struct ovsdb_jsonrpc_session, up);
850
851     if (jsonrpc_session_is_connected(s->js)) {
852         struct jsonrpc_msg *reply;
853         struct json *result;
854
855         result = ovsdb_trigger_steal_result(&t->trigger);
856         if (result) {
857             reply = jsonrpc_create_reply(result, t->id);
858         } else {
859             reply = jsonrpc_create_error(json_string_create("canceled"),
860                                          t->id);
861         }
862         jsonrpc_session_send(s->js, reply);
863     }
864
865     json_destroy(t->id);
866     ovsdb_trigger_destroy(&t->trigger);
867     hmap_remove(&s->triggers, &t->hmap_node);
868     free(t);
869 }
870
871 static void
872 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
873 {
874     struct ovsdb_jsonrpc_trigger *t, *next;
875     HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
876         ovsdb_jsonrpc_trigger_complete(t);
877     }
878 }
879
880 static void
881 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
882 {
883     while (!list_is_empty(&s->up.completions)) {
884         struct ovsdb_jsonrpc_trigger *t
885             = CONTAINER_OF(s->up.completions.next,
886                            struct ovsdb_jsonrpc_trigger, trigger.node);
887         ovsdb_jsonrpc_trigger_complete(t);
888     }
889 }
890 \f
891 /* JSON-RPC database table monitors. */
892
/* Bit flags that select which kinds of row events a monitor reports.  The
 * values are distinct bits so that they can be combined with bitwise OR. */
enum ovsdb_jsonrpc_monitor_selection {
    OJMS_INITIAL = 0x1,         /* Rows present when the monitor is made. */
    OJMS_INSERT = 0x2,          /* Rows that are added. */
    OJMS_DELETE = 0x4,          /* Rows that are removed. */
    OJMS_MODIFY = 0x8           /* Rows that are changed. */
};
899
900 /* A particular column being monitored. */
901 struct ovsdb_jsonrpc_monitor_column {
902     const struct ovsdb_column *column;
903     enum ovsdb_jsonrpc_monitor_selection select;
904 };
905
906 /* A particular table being monitored. */
907 struct ovsdb_jsonrpc_monitor_table {
908     const struct ovsdb_table *table;
909
910     /* This is the union (bitwise-OR) of the 'select' values in all of the
911      * members of 'columns' below. */
912     enum ovsdb_jsonrpc_monitor_selection select;
913
914     /* Columns being monitored. */
915     struct ovsdb_jsonrpc_monitor_column *columns;
916     size_t n_columns;
917 };
918
919 /* A collection of tables being monitored. */
920 struct ovsdb_jsonrpc_monitor {
921     struct ovsdb_replica replica;
922     struct ovsdb_jsonrpc_session *session;
923     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
924
925     struct json *monitor_id;
926     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
927 };
928
929 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
930
931 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
932     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
933 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
934 static struct json *ovsdb_jsonrpc_monitor_get_initial(
935     const struct ovsdb_jsonrpc_monitor *);
936
937 static bool
938 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
939 {
940     const struct json *json;
941
942     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
943     return json ? json_boolean(json) : default_value;
944 }
945
946 struct ovsdb_jsonrpc_monitor *
947 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
948                            const struct json *monitor_id)
949 {
950     struct ovsdb_jsonrpc_monitor *m;
951
952     HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
953         if (json_equal(m->monitor_id, monitor_id)) {
954             return m;
955         }
956     }
957
958     return NULL;
959 }
960
961 static void
962 ovsdb_jsonrpc_add_monitor_column(struct ovsdb_jsonrpc_monitor_table *mt,
963                                  const struct ovsdb_column *column,
964                                  enum ovsdb_jsonrpc_monitor_selection select,
965                                  size_t *allocated_columns)
966 {
967     struct ovsdb_jsonrpc_monitor_column *c;
968
969     if (mt->n_columns >= *allocated_columns) {
970         mt->columns = x2nrealloc(mt->columns, allocated_columns,
971                                  sizeof *mt->columns);
972     }
973
974     c = &mt->columns[mt->n_columns++];
975     c->column = column;
976     c->select = select;
977 }
978
979 static int
980 compare_ovsdb_jsonrpc_monitor_column(const void *a_, const void *b_)
981 {
982     const struct ovsdb_jsonrpc_monitor_column *a = a_;
983     const struct ovsdb_jsonrpc_monitor_column *b = b_;
984
985     return a->column < b->column ? -1 : a->column > b->column;
986 }
987
988 static struct ovsdb_error * WARN_UNUSED_RESULT
989 ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
990                                     const struct json *monitor_request,
991                                     size_t *allocated_columns)
992 {
993     const struct ovsdb_table_schema *ts = mt->table->schema;
994     enum ovsdb_jsonrpc_monitor_selection select;
995     const struct json *columns, *select_json;
996     struct ovsdb_parser parser;
997     struct ovsdb_error *error;
998
999     ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
1000     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
1001     select_json = ovsdb_parser_member(&parser, "select",
1002                                       OP_OBJECT | OP_OPTIONAL);
1003     error = ovsdb_parser_finish(&parser);
1004     if (error) {
1005         return error;
1006     }
1007
1008     if (select_json) {
1009         select = 0;
1010         ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
1011         if (parse_bool(&parser, "initial", true)) {
1012             select |= OJMS_INITIAL;
1013         }
1014         if (parse_bool(&parser, "insert", true)) {
1015             select |= OJMS_INSERT;
1016         }
1017         if (parse_bool(&parser, "delete", true)) {
1018             select |= OJMS_DELETE;
1019         }
1020         if (parse_bool(&parser, "modify", true)) {
1021             select |= OJMS_MODIFY;
1022         }
1023         error = ovsdb_parser_finish(&parser);
1024         if (error) {
1025             return error;
1026         }
1027     } else {
1028         select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
1029     }
1030     mt->select |= select;
1031
1032     if (columns) {
1033         size_t i;
1034
1035         if (columns->type != JSON_ARRAY) {
1036             return ovsdb_syntax_error(columns, NULL,
1037                                       "array of column names expected");
1038         }
1039
1040         for (i = 0; i < columns->u.array.n; i++) {
1041             const struct ovsdb_column *column;
1042             const char *s;
1043
1044             if (columns->u.array.elems[i]->type != JSON_STRING) {
1045                 return ovsdb_syntax_error(columns, NULL,
1046                                           "array of column names expected");
1047             }
1048
1049             s = columns->u.array.elems[i]->u.string;
1050             column = shash_find_data(&mt->table->schema->columns, s);
1051             if (!column) {
1052                 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
1053                                           "column name", s);
1054             }
1055             ovsdb_jsonrpc_add_monitor_column(mt, column, select,
1056                                              allocated_columns);
1057         }
1058     } else {
1059         struct shash_node *node;
1060
1061         SHASH_FOR_EACH (node, &ts->columns) {
1062             const struct ovsdb_column *column = node->data;
1063             if (column->index != OVSDB_COL_UUID) {
1064                 ovsdb_jsonrpc_add_monitor_column(mt, column, select,
1065                                                  allocated_columns);
1066             }
1067         }
1068     }
1069
1070     return NULL;
1071 }
1072
1073 static struct json *
1074 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
1075                              struct json *params)
1076 {
1077     struct ovsdb_jsonrpc_monitor *m = NULL;
1078     struct json *monitor_id, *monitor_requests;
1079     struct ovsdb_error *error = NULL;
1080     struct shash_node *node;
1081     struct json *json;
1082
1083     if (json_array(params)->n != 3) {
1084         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
1085         goto error;
1086     }
1087     monitor_id = params->u.array.elems[1];
1088     monitor_requests = params->u.array.elems[2];
1089     if (monitor_requests->type != JSON_OBJECT) {
1090         error = ovsdb_syntax_error(monitor_requests, NULL,
1091                                    "monitor-requests must be object");
1092         goto error;
1093     }
1094
1095     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
1096         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
1097         goto error;
1098     }
1099
1100     m = xzalloc(sizeof *m);
1101     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
1102     ovsdb_add_replica(s->remote->server->up.db, &m->replica);
1103     m->session = s;
1104     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
1105     m->monitor_id = json_clone(monitor_id);
1106     shash_init(&m->tables);
1107
1108     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
1109         const struct ovsdb_table *table;
1110         struct ovsdb_jsonrpc_monitor_table *mt;
1111         size_t allocated_columns;
1112         const struct json *mr_value;
1113         size_t i;
1114
1115         table = ovsdb_get_table(s->remote->server->up.db, node->name);
1116         if (!table) {
1117             error = ovsdb_syntax_error(NULL, NULL,
1118                                        "no table named %s", node->name);
1119             goto error;
1120         }
1121
1122         mt = xzalloc(sizeof *mt);
1123         mt->table = table;
1124         shash_add(&m->tables, table->schema->name, mt);
1125
1126         /* Parse columns. */
1127         mr_value = node->data;
1128         allocated_columns = 0;
1129         if (mr_value->type == JSON_ARRAY) {
1130             const struct json_array *array = &mr_value->u.array;
1131
1132             for (i = 0; i < array->n; i++) {
1133                 error = ovsdb_jsonrpc_parse_monitor_request(
1134                     mt, array->elems[i], &allocated_columns);
1135                 if (error) {
1136                     goto error;
1137                 }
1138             }
1139         } else {
1140             error = ovsdb_jsonrpc_parse_monitor_request(
1141                 mt, mr_value, &allocated_columns);
1142             if (error) {
1143                 goto error;
1144             }
1145         }
1146
1147         /* Check for duplicate columns. */
1148         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
1149               compare_ovsdb_jsonrpc_monitor_column);
1150         for (i = 1; i < mt->n_columns; i++) {
1151             if (mt->columns[i].column == mt->columns[i - 1].column) {
1152                 error = ovsdb_syntax_error(mr_value, NULL, "column %s "
1153                                            "mentioned more than once",
1154                                            mt->columns[i].column->name);
1155                 goto error;
1156             }
1157         }
1158     }
1159
1160     return ovsdb_jsonrpc_monitor_get_initial(m);
1161
1162 error:
1163     if (m) {
1164         ovsdb_remove_replica(s->remote->server->up.db, &m->replica);
1165     }
1166
1167     json = ovsdb_error_to_json(error);
1168     ovsdb_error_destroy(error);
1169     return json;
1170 }
1171
1172 static struct jsonrpc_msg *
1173 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
1174                              struct json_array *params,
1175                              const struct json *request_id)
1176 {
1177     if (params->n != 1) {
1178         return jsonrpc_create_error(json_string_create("invalid parameters"),
1179                                     request_id);
1180     } else {
1181         struct ovsdb_jsonrpc_monitor *m;
1182
1183         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
1184         if (!m) {
1185             return jsonrpc_create_error(json_string_create("unknown monitor"),
1186                                         request_id);
1187         } else {
1188             ovsdb_remove_replica(s->remote->server->up.db, &m->replica);
1189             return jsonrpc_create_reply(json_object_create(), request_id);
1190         }
1191     }
1192 }
1193
1194 static void
1195 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
1196 {
1197     struct ovsdb_jsonrpc_monitor *m, *next;
1198
1199     HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
1200         ovsdb_remove_replica(s->remote->server->up.db, &m->replica);
1201     }
1202 }
1203
1204 static struct ovsdb_jsonrpc_monitor *
1205 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
1206 {
1207     assert(replica->class == &ovsdb_jsonrpc_replica_class);
1208     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
1209 }
1210
/* State carried across calls to ovsdb_jsonrpc_monitor_change_cb() while the
 * JSON for one update (or for a monitor's initial contents) is built. */
struct ovsdb_jsonrpc_monitor_aux {
    bool initial;               /* Sending initial contents of table? */
    const struct ovsdb_jsonrpc_monitor *monitor;    /* Monitor in use. */
    struct json *json;          /* JSON for the whole transaction, or NULL
                                 * if nothing has been reported yet. */

    /* Table of the most recently processed row. */
    struct ovsdb_jsonrpc_monitor_table *mt;     /* NULL if not monitored. */
    struct json *table_json;    /* JSON for table's transaction, or NULL if
                                 * not yet created. */
};
1220
1221 static bool
1222 any_reportable_change(const struct ovsdb_jsonrpc_monitor_table *mt,
1223                       const unsigned long int *changed)
1224 {
1225     size_t i;
1226
1227     for (i = 0; i < mt->n_columns; i++) {
1228         const struct ovsdb_jsonrpc_monitor_column *c = &mt->columns[i];
1229         unsigned int idx = c->column->index;
1230
1231         if (c->select & OJMS_MODIFY && bitmap_is_set(changed, idx)) {
1232             return true;
1233         }
1234     }
1235
1236     return false;
1237 }
1238
1239 static bool
1240 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
1241                                 const struct ovsdb_row *new,
1242                                 const unsigned long int *changed,
1243                                 void *aux_)
1244 {
1245     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
1246     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
1247     struct ovsdb_table *table = new ? new->table : old->table;
1248     enum ovsdb_jsonrpc_monitor_selection type;
1249     struct json *old_json, *new_json;
1250     struct json *row_json;
1251     char uuid[UUID_LEN + 1];
1252     size_t i;
1253
1254     if (!aux->mt || table != aux->mt->table) {
1255         aux->mt = shash_find_data(&m->tables, table->schema->name);
1256         aux->table_json = NULL;
1257         if (!aux->mt) {
1258             /* We don't care about rows in this table at all.  Tell the caller
1259              * to skip it.  */
1260             return false;
1261         }
1262     }
1263
1264     type = (aux->initial ? OJMS_INITIAL
1265             : !old ? OJMS_INSERT
1266             : !new ? OJMS_DELETE
1267             : OJMS_MODIFY);
1268     if (!(aux->mt->select & type)) {
1269         /* We don't care about this type of change (but do want to be called
1270          * back for changes to other rows in the same table). */
1271         return true;
1272     }
1273
1274     if (type == OJMS_MODIFY && !any_reportable_change(aux->mt, changed)) {
1275         /* Nothing of interest changed. */
1276         return true;
1277     }
1278
1279     old_json = new_json = NULL;
1280     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
1281         old_json = json_object_create();
1282     }
1283     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1284         new_json = json_object_create();
1285     }
1286     for (i = 0; i < aux->mt->n_columns; i++) {
1287         const struct ovsdb_jsonrpc_monitor_column *c = &aux->mt->columns[i];
1288         const struct ovsdb_column *column = c->column;
1289         unsigned int idx = c->column->index;
1290
1291         if (!(type & c->select)) {
1292             /* We don't care about this type of change for this particular
1293              * column (but we will care about it for some other column). */
1294             continue;
1295         }
1296
1297         if ((type == OJMS_MODIFY && bitmap_is_set(changed, idx))
1298             || type == OJMS_DELETE) {
1299             json_object_put(old_json, column->name,
1300                             ovsdb_datum_to_json(&old->fields[idx],
1301                                                 &column->type));
1302         }
1303         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1304             json_object_put(new_json, column->name,
1305                             ovsdb_datum_to_json(&new->fields[idx],
1306                                                 &column->type));
1307         }
1308     }
1309
1310     /* Create JSON object for transaction overall. */
1311     if (!aux->json) {
1312         aux->json = json_object_create();
1313     }
1314
1315     /* Create JSON object for transaction on this table. */
1316     if (!aux->table_json) {
1317         aux->table_json = json_object_create();
1318         json_object_put(aux->json, aux->mt->table->schema->name,
1319                         aux->table_json);
1320     }
1321
1322     /* Create JSON object for transaction on this row. */
1323     row_json = json_object_create();
1324     if (old_json) {
1325         json_object_put(row_json, "old", old_json);
1326     }
1327     if (new_json) {
1328         json_object_put(row_json, "new", new_json);
1329     }
1330
1331     /* Add JSON row to JSON table. */
1332     snprintf(uuid, sizeof uuid,
1333              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
1334     json_object_put(aux->table_json, uuid, row_json);
1335
1336     return true;
1337 }
1338
1339 static void
1340 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
1341                                const struct ovsdb_jsonrpc_monitor *m,
1342                                bool initial)
1343 {
1344     aux->initial = initial;
1345     aux->monitor = m;
1346     aux->json = NULL;
1347     aux->mt = NULL;
1348     aux->table_json = NULL;
1349 }
1350
1351 static struct ovsdb_error *
1352 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
1353                              const struct ovsdb_txn *txn,
1354                              bool durable OVS_UNUSED)
1355 {
1356     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1357     struct ovsdb_jsonrpc_monitor_aux aux;
1358
1359     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
1360     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
1361     if (aux.json) {
1362         struct jsonrpc_msg *msg;
1363         struct json *params;
1364
1365         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
1366                                      aux.json);
1367         msg = jsonrpc_create_notify("update", params);
1368         jsonrpc_session_send(aux.monitor->session->js, msg);
1369     }
1370
1371     return NULL;
1372 }
1373
1374 static struct json *
1375 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
1376 {
1377     struct ovsdb_jsonrpc_monitor_aux aux;
1378     struct shash_node *node;
1379
1380     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
1381     SHASH_FOR_EACH (node, &m->tables) {
1382         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1383
1384         if (mt->select & OJMS_INITIAL) {
1385             struct ovsdb_row *row;
1386
1387             HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1388                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, NULL, &aux);
1389             }
1390         }
1391     }
1392     return aux.json ? aux.json : json_object_create();
1393 }
1394
1395 static void
1396 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
1397 {
1398     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1399     struct shash_node *node;
1400
1401     json_destroy(m->monitor_id);
1402     SHASH_FOR_EACH (node, &m->tables) {
1403         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1404         free(mt->columns);
1405         free(mt);
1406     }
1407     shash_destroy(&m->tables);
1408     hmap_remove(&m->session->monitors, &m->node);
1409     free(m);
1410 }
1411
1412 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1413     ovsdb_jsonrpc_monitor_commit,
1414     ovsdb_jsonrpc_monitor_destroy
1415 };