ovsdb: Support replicating a table without including any columns.
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009, 2010 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <assert.h>
21 #include <errno.h>
22
23 #include "bitmap.h"
24 #include "column.h"
25 #include "json.h"
26 #include "jsonrpc.h"
27 #include "ovsdb-error.h"
28 #include "ovsdb-parser.h"
29 #include "ovsdb.h"
30 #include "reconnect.h"
31 #include "row.h"
32 #include "stream.h"
33 #include "table.h"
34 #include "timeval.h"
35 #include "transaction.h"
36 #include "trigger.h"
37 #include "vlog.h"
38
39 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
40
41 struct ovsdb_jsonrpc_remote;
42 struct ovsdb_jsonrpc_session;
43
44 /* Message rate-limiting. */
45 struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
46
/* Sessions: per-remote operations defined further down in this file. */
static struct ovsdb_jsonrpc_session *
ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
                             struct jsonrpc_session *js);
static void
ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote);
static void
ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote);
static void
ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote);
static void
ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote);
static void
ovsdb_jsonrpc_session_set_all_options(
    struct ovsdb_jsonrpc_remote *remote,
    const struct ovsdb_jsonrpc_options *options);
56
/* Triggers: one per "transact" request, keyed by the request's JSON id. */
static void
ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
                             struct json *id, struct json *params);
static struct ovsdb_jsonrpc_trigger *
ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
                           const struct json *id, size_t hash);
static void
ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t);
static void
ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s);
static void
ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s);
66
/* Monitors: created by "monitor" requests, removed by "monitor_cancel". */
static struct json *
ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
                             struct json *params);
static struct jsonrpc_msg *
ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
                             struct json_array *params,
                             const struct json *request_id);
static void
ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s);
75 \f
76 /* JSON-RPC database server. */
77
78 struct ovsdb_jsonrpc_server {
79     struct ovsdb *db;
80     unsigned int n_sessions, max_sessions;
81     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
82 };
83
84 /* A configured remote.  This is either a passive stream listener plus a list
85  * of the currently connected sessions, or a list of exactly one active
86  * session. */
87 struct ovsdb_jsonrpc_remote {
88     struct ovsdb_jsonrpc_server *server;
89     struct pstream *listener;   /* Listener, if passive. */
90     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
91 };
92
static struct ovsdb_jsonrpc_remote *
ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
                                const char *name);
static void
ovsdb_jsonrpc_server_del_remote(struct shash_node *node);
96
97 struct ovsdb_jsonrpc_server *
98 ovsdb_jsonrpc_server_create(struct ovsdb *db)
99 {
100     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
101     server->db = db;
102     server->max_sessions = 64;
103     shash_init(&server->remotes);
104     return server;
105 }
106
107 void
108 ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
109 {
110     struct shash_node *node, *next;
111
112     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
113         ovsdb_jsonrpc_server_del_remote(node);
114     }
115     shash_destroy(&svr->remotes);
116     free(svr);
117 }
118
119 struct ovsdb_jsonrpc_options *
120 ovsdb_jsonrpc_default_options(void)
121 {
122     struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
123     options->probe_interval = RECONNECT_DEFAULT_PROBE_INTERVAL;
124     options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
125     return options;
126 }
127
128 /* Sets 'svr''s current set of remotes to the names in 'new_remotes', with
129  * options in the struct ovsdb_jsonrpc_options supplied as the data values.
130  *
131  * A remote is an active or passive stream connection method, e.g. "pssl:" or
132  * "tcp:1.2.3.4". */
133 void
134 ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
135                                  const struct shash *new_remotes)
136 {
137     struct shash_node *node, *next;
138
139     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
140         if (!shash_find(new_remotes, node->name)) {
141             ovsdb_jsonrpc_server_del_remote(node);
142         }
143     }
144     SHASH_FOR_EACH (node, new_remotes) {
145         const struct ovsdb_jsonrpc_options *options = node->data;
146         struct ovsdb_jsonrpc_remote *remote;
147
148         remote = shash_find_data(&svr->remotes, node->name);
149         if (!remote) {
150             remote = ovsdb_jsonrpc_server_add_remote(svr, node->name);
151             if (!remote) {
152                 continue;
153             }
154         }
155
156         ovsdb_jsonrpc_session_set_all_options(remote, options);
157     }
158 }
159
160 static struct ovsdb_jsonrpc_remote *
161 ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
162                                 const char *name)
163 {
164     struct ovsdb_jsonrpc_remote *remote;
165     struct pstream *listener;
166     int error;
167
168     error = jsonrpc_pstream_open(name, &listener);
169     if (error && error != EAFNOSUPPORT) {
170         VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
171         return NULL;
172     }
173
174     remote = xmalloc(sizeof *remote);
175     remote->server = svr;
176     remote->listener = listener;
177     list_init(&remote->sessions);
178     shash_add(&svr->remotes, name, remote);
179
180     if (!listener) {
181         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
182     }
183     return remote;
184 }
185
186 static void
187 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
188 {
189     struct ovsdb_jsonrpc_remote *remote = node->data;
190
191     ovsdb_jsonrpc_session_close_all(remote);
192     pstream_close(remote->listener);
193     shash_delete(&remote->server->remotes, node);
194     free(remote);
195 }
196
197 /* Forces all of the JSON-RPC sessions managed by 'svr' to disconnect and
198  * reconnect. */
199 void
200 ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
201 {
202     struct shash_node *node;
203
204     SHASH_FOR_EACH (node, &svr->remotes) {
205         struct ovsdb_jsonrpc_remote *remote = node->data;
206
207         ovsdb_jsonrpc_session_reconnect_all(remote);
208     }
209 }
210
211 void
212 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
213 {
214     struct shash_node *node;
215
216     SHASH_FOR_EACH (node, &svr->remotes) {
217         struct ovsdb_jsonrpc_remote *remote = node->data;
218
219         if (remote->listener && svr->n_sessions < svr->max_sessions) {
220             struct stream *stream;
221             int error;
222
223             error = pstream_accept(remote->listener, &stream);
224             if (!error) {
225                 struct jsonrpc_session *js;
226                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream));
227                 ovsdb_jsonrpc_session_create(remote, js);
228             } else if (error != EAGAIN) {
229                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
230                              pstream_get_name(remote->listener),
231                              strerror(error));
232             }
233         }
234
235         ovsdb_jsonrpc_session_run_all(remote);
236     }
237 }
238
239 void
240 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
241 {
242     struct shash_node *node;
243
244     SHASH_FOR_EACH (node, &svr->remotes) {
245         struct ovsdb_jsonrpc_remote *remote = node->data;
246
247         if (remote->listener && svr->n_sessions < svr->max_sessions) {
248             pstream_wait(remote->listener);
249         }
250
251         ovsdb_jsonrpc_session_wait_all(remote);
252     }
253 }
254 \f
255 /* JSON-RPC database server session. */
256
257 struct ovsdb_jsonrpc_session {
258     struct ovsdb_jsonrpc_remote *remote;
259     struct list node;           /* Element in remote's sessions list. */
260
261     /* Triggers. */
262     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
263     struct list completions;    /* Completed triggers. */
264
265     /* Monitors. */
266     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
267
268     /* Network connectivity. */
269     struct jsonrpc_session *js;  /* JSON-RPC session. */
270     unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
271 };
272
static void
ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s);
static int
ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s);
static void
ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s);
static void
ovsdb_jsonrpc_session_set_options(
    struct ovsdb_jsonrpc_session *session,
    const struct ovsdb_jsonrpc_options *options);
static void
ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
                                  struct jsonrpc_msg *request);
static void
ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
                                 struct jsonrpc_msg *request);
282
283 static struct ovsdb_jsonrpc_session *
284 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
285                              struct jsonrpc_session *js)
286 {
287     struct ovsdb_jsonrpc_session *s;
288
289     s = xzalloc(sizeof *s);
290     s->remote = remote;
291     list_push_back(&remote->sessions, &s->node);
292     hmap_init(&s->triggers);
293     hmap_init(&s->monitors);
294     list_init(&s->completions);
295     s->js = js;
296     s->js_seqno = jsonrpc_session_get_seqno(js);
297
298     remote->server->n_sessions++;
299
300     return s;
301 }
302
303 static void
304 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
305 {
306     ovsdb_jsonrpc_monitor_remove_all(s);
307     jsonrpc_session_close(s->js);
308     list_remove(&s->node);
309     s->remote->server->n_sessions--;
310     free(s);
311 }
312
313 static int
314 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
315 {
316     jsonrpc_session_run(s->js);
317     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
318         s->js_seqno = jsonrpc_session_get_seqno(s->js);
319         ovsdb_jsonrpc_trigger_complete_all(s);
320         ovsdb_jsonrpc_monitor_remove_all(s);
321     }
322
323     ovsdb_jsonrpc_trigger_complete_done(s);
324
325     if (!jsonrpc_session_get_backlog(s->js)) {
326         struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
327         if (msg) {
328             if (msg->type == JSONRPC_REQUEST) {
329                 ovsdb_jsonrpc_session_got_request(s, msg);
330             } else if (msg->type == JSONRPC_NOTIFY) {
331                 ovsdb_jsonrpc_session_got_notify(s, msg);
332             } else {
333                 VLOG_WARN("%s: received unexpected %s message",
334                           jsonrpc_session_get_name(s->js),
335                           jsonrpc_msg_type_to_string(msg->type));
336                 jsonrpc_session_force_reconnect(s->js);
337                 jsonrpc_msg_destroy(msg);
338             }
339         }
340     }
341     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
342 }
343
344 static void
345 ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
346                                   const struct ovsdb_jsonrpc_options *options)
347 {
348     jsonrpc_session_set_max_backoff(session->js, options->max_backoff);
349     jsonrpc_session_set_probe_interval(session->js, options->probe_interval);
350 }
351
352 static void
353 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
354 {
355     struct ovsdb_jsonrpc_session *s, *next;
356
357     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
358         int error = ovsdb_jsonrpc_session_run(s);
359         if (error) {
360             ovsdb_jsonrpc_session_close(s);
361         }
362     }
363 }
364
365 static void
366 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
367 {
368     jsonrpc_session_wait(s->js);
369     if (!jsonrpc_session_get_backlog(s->js)) {
370         jsonrpc_session_recv_wait(s->js);
371     }
372 }
373
374 static void
375 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
376 {
377     struct ovsdb_jsonrpc_session *s;
378
379     LIST_FOR_EACH (s, node, &remote->sessions) {
380         ovsdb_jsonrpc_session_wait(s);
381     }
382 }
383
384 static void
385 ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
386 {
387     struct ovsdb_jsonrpc_session *s, *next;
388
389     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
390         ovsdb_jsonrpc_session_close(s);
391     }
392 }
393
394 /* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
395  * reconnect. */
396 static void
397 ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
398 {
399     struct ovsdb_jsonrpc_session *s, *next;
400
401     LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
402         jsonrpc_session_force_reconnect(s->js);
403         if (!jsonrpc_session_is_alive(s->js)) {
404             ovsdb_jsonrpc_session_close(s);
405         }
406     }
407 }
408
409 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
410  * 'options'. */
411 static void
412 ovsdb_jsonrpc_session_set_all_options(
413     struct ovsdb_jsonrpc_remote *remote,
414     const struct ovsdb_jsonrpc_options *options)
415 {
416     struct ovsdb_jsonrpc_session *s;
417
418     LIST_FOR_EACH (s, node, &remote->sessions) {
419         ovsdb_jsonrpc_session_set_options(s, options);
420     }
421 }
422
423 static const char *
424 get_db_name(const struct ovsdb_jsonrpc_session *s)
425 {
426     return s->remote->server->db->schema->name;
427 }
428
429 static struct jsonrpc_msg *
430 ovsdb_jsonrpc_check_db_name(const struct ovsdb_jsonrpc_session *s,
431                             const struct jsonrpc_msg *request)
432 {
433     struct json_array *params;
434     const char *want_db_name;
435     const char *have_db_name;
436     struct ovsdb_error *error;
437     struct jsonrpc_msg *reply;
438
439     params = json_array(request->params);
440     if (!params->n || params->elems[0]->type != JSON_STRING) {
441         error = ovsdb_syntax_error(
442             request->params, NULL,
443             "%s request params must begin with <db-name>", request->method);
444         goto error;
445     }
446
447     want_db_name = params->elems[0]->u.string;
448     have_db_name = get_db_name(s);
449     if (strcmp(want_db_name, have_db_name)) {
450         error = ovsdb_syntax_error(
451             request->params, "unknown database",
452             "%s request specifies unknown database %s",
453             request->method, want_db_name);
454         goto error;
455     }
456
457     return NULL;
458
459 error:
460     reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
461     ovsdb_error_destroy(error);
462     return reply;
463 }
464
465 static struct jsonrpc_msg *
466 execute_transaction(struct ovsdb_jsonrpc_session *s,
467                     struct jsonrpc_msg *request)
468 {
469     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
470     request->id = NULL;
471     request->params = NULL;
472     jsonrpc_msg_destroy(request);
473     return NULL;
474 }
475
476 static void
477 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
478                                   struct jsonrpc_msg *request)
479 {
480     struct jsonrpc_msg *reply;
481
482     if (!strcmp(request->method, "transact")) {
483         reply = ovsdb_jsonrpc_check_db_name(s, request);
484         if (!reply) {
485             reply = execute_transaction(s, request);
486         }
487     } else if (!strcmp(request->method, "monitor")) {
488         reply = ovsdb_jsonrpc_check_db_name(s, request);
489         if (!reply) {
490             reply = jsonrpc_create_reply(
491                 ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
492         }
493     } else if (!strcmp(request->method, "monitor_cancel")) {
494         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
495                                              request->id);
496     } else if (!strcmp(request->method, "get_schema")) {
497         reply = ovsdb_jsonrpc_check_db_name(s, request);
498         if (!reply) {
499             reply = jsonrpc_create_reply(
500                 ovsdb_schema_to_json(s->remote->server->db->schema),
501                 request->id);
502         }
503     } else if (!strcmp(request->method, "list_dbs")) {
504         reply = jsonrpc_create_reply(
505             json_array_create_1(json_string_create(get_db_name(s))),
506             request->id);
507     } else if (!strcmp(request->method, "echo")) {
508         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
509     } else {
510         reply = jsonrpc_create_error(json_string_create("unknown method"),
511                                      request->id);
512     }
513
514     if (reply) {
515         jsonrpc_msg_destroy(request);
516         jsonrpc_session_send(s->js, reply);
517     }
518 }
519
520 static void
521 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
522 {
523     if (json_array(request->params)->n == 1) {
524         struct ovsdb_jsonrpc_trigger *t;
525         struct json *id;
526
527         id = request->params->u.array.elems[0];
528         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
529         if (t) {
530             ovsdb_jsonrpc_trigger_complete(t);
531         }
532     }
533 }
534
535 static void
536 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
537                                  struct jsonrpc_msg *request)
538 {
539     if (!strcmp(request->method, "cancel")) {
540         execute_cancel(s, request);
541     }
542     jsonrpc_msg_destroy(request);
543 }
544 \f
545 /* JSON-RPC database server triggers.
546  *
547  * (Every transaction is treated as a trigger even if it doesn't actually have
548  * any "wait" operations.) */
549
550 struct ovsdb_jsonrpc_trigger {
551     struct ovsdb_trigger trigger;
552     struct ovsdb_jsonrpc_session *session;
553     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
554     struct json *id;
555 };
556
557 static void
558 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
559                              struct json *id, struct json *params)
560 {
561     struct ovsdb_jsonrpc_trigger *t;
562     size_t hash;
563
564     /* Check for duplicate ID. */
565     hash = json_hash(id, 0);
566     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
567     if (t) {
568         struct jsonrpc_msg *msg;
569
570         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
571                                    id);
572         jsonrpc_session_send(s->js, msg);
573         json_destroy(id);
574         json_destroy(params);
575         return;
576     }
577
578     /* Insert into trigger table. */
579     t = xmalloc(sizeof *t);
580     ovsdb_trigger_init(s->remote->server->db,
581                        &t->trigger, params, &s->completions,
582                        time_msec());
583     t->session = s;
584     t->id = id;
585     hmap_insert(&s->triggers, &t->hmap_node, hash);
586
587     /* Complete early if possible. */
588     if (ovsdb_trigger_is_complete(&t->trigger)) {
589         ovsdb_jsonrpc_trigger_complete(t);
590     }
591 }
592
593 static struct ovsdb_jsonrpc_trigger *
594 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
595                            const struct json *id, size_t hash)
596 {
597     struct ovsdb_jsonrpc_trigger *t;
598
599     HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &s->triggers) {
600         if (json_equal(t->id, id)) {
601             return t;
602         }
603     }
604
605     return NULL;
606 }
607
608 static void
609 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
610 {
611     struct ovsdb_jsonrpc_session *s = t->session;
612
613     if (jsonrpc_session_is_connected(s->js)) {
614         struct jsonrpc_msg *reply;
615         struct json *result;
616
617         result = ovsdb_trigger_steal_result(&t->trigger);
618         if (result) {
619             reply = jsonrpc_create_reply(result, t->id);
620         } else {
621             reply = jsonrpc_create_error(json_string_create("canceled"),
622                                          t->id);
623         }
624         jsonrpc_session_send(s->js, reply);
625     }
626
627     json_destroy(t->id);
628     ovsdb_trigger_destroy(&t->trigger);
629     hmap_remove(&s->triggers, &t->hmap_node);
630     free(t);
631 }
632
633 static void
634 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
635 {
636     struct ovsdb_jsonrpc_trigger *t, *next;
637     HMAP_FOR_EACH_SAFE (t, next, hmap_node, &s->triggers) {
638         ovsdb_jsonrpc_trigger_complete(t);
639     }
640 }
641
642 static void
643 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
644 {
645     while (!list_is_empty(&s->completions)) {
646         struct ovsdb_jsonrpc_trigger *t
647             = CONTAINER_OF(s->completions.next,
648                            struct ovsdb_jsonrpc_trigger, trigger.node);
649         ovsdb_jsonrpc_trigger_complete(t);
650     }
651 }
652 \f
653 /* JSON-RPC database table monitors. */
654
/* Bit flags choosing which kinds of row changes a monitor reports. */
enum ovsdb_jsonrpc_monitor_selection {
    OJMS_INITIAL = 1 << 0,      /* Rows present when the monitor is created. */
    OJMS_INSERT = 1 << 1,       /* Rows added afterward. */
    OJMS_DELETE = 1 << 2,       /* Rows removed. */
    OJMS_MODIFY = 1 << 3        /* Rows changed. */
};
661
662 /* A particular column being monitored. */
663 struct ovsdb_jsonrpc_monitor_column {
664     const struct ovsdb_column *column;
665     enum ovsdb_jsonrpc_monitor_selection select;
666 };
667
668 /* A particular table being monitored. */
669 struct ovsdb_jsonrpc_monitor_table {
670     const struct ovsdb_table *table;
671
672     /* This is the union (bitwise-OR) of the 'select' values in all of the
673      * members of 'columns' below. */
674     enum ovsdb_jsonrpc_monitor_selection select;
675
676     /* Columns being monitored. */
677     struct ovsdb_jsonrpc_monitor_column *columns;
678     size_t n_columns;
679 };
680
681 /* A collection of tables being monitored. */
682 struct ovsdb_jsonrpc_monitor {
683     struct ovsdb_replica replica;
684     struct ovsdb_jsonrpc_session *session;
685     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
686
687     struct json *monitor_id;
688     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
689 };
690
691 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
692
693 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
694     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
695 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
696 static struct json *ovsdb_jsonrpc_monitor_get_initial(
697     const struct ovsdb_jsonrpc_monitor *);
698
699 static bool
700 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
701 {
702     const struct json *json;
703
704     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
705     return json ? json_boolean(json) : default_value;
706 }
707
708 struct ovsdb_jsonrpc_monitor *
709 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
710                            const struct json *monitor_id)
711 {
712     struct ovsdb_jsonrpc_monitor *m;
713
714     HMAP_FOR_EACH_WITH_HASH (m, node, json_hash(monitor_id, 0), &s->monitors) {
715         if (json_equal(m->monitor_id, monitor_id)) {
716             return m;
717         }
718     }
719
720     return NULL;
721 }
722
723 static void
724 ovsdb_jsonrpc_add_monitor_column(struct ovsdb_jsonrpc_monitor_table *mt,
725                                  const struct ovsdb_column *column,
726                                  enum ovsdb_jsonrpc_monitor_selection select,
727                                  size_t *allocated_columns)
728 {
729     struct ovsdb_jsonrpc_monitor_column *c;
730
731     if (mt->n_columns >= *allocated_columns) {
732         mt->columns = x2nrealloc(mt->columns, allocated_columns,
733                                  sizeof *mt->columns);
734     }
735
736     c = &mt->columns[mt->n_columns++];
737     c->column = column;
738     c->select = select;
739 }
740
741 static int
742 compare_ovsdb_jsonrpc_monitor_column(const void *a_, const void *b_)
743 {
744     const struct ovsdb_jsonrpc_monitor_column *a = a_;
745     const struct ovsdb_jsonrpc_monitor_column *b = b_;
746
747     return a->column < b->column ? -1 : a->column > b->column;
748 }
749
750 static struct ovsdb_error * WARN_UNUSED_RESULT
751 ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
752                                     const struct json *monitor_request,
753                                     size_t *allocated_columns)
754 {
755     const struct ovsdb_table_schema *ts = mt->table->schema;
756     enum ovsdb_jsonrpc_monitor_selection select;
757     const struct json *columns, *select_json;
758     struct ovsdb_parser parser;
759     struct ovsdb_error *error;
760
761     ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
762     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
763     select_json = ovsdb_parser_member(&parser, "select",
764                                       OP_OBJECT | OP_OPTIONAL);
765     error = ovsdb_parser_finish(&parser);
766     if (error) {
767         return error;
768     }
769
770     if (select_json) {
771         select = 0;
772         ovsdb_parser_init(&parser, select_json, "table %s select", ts->name);
773         if (parse_bool(&parser, "initial", true)) {
774             select |= OJMS_INITIAL;
775         }
776         if (parse_bool(&parser, "insert", true)) {
777             select |= OJMS_INSERT;
778         }
779         if (parse_bool(&parser, "delete", true)) {
780             select |= OJMS_DELETE;
781         }
782         if (parse_bool(&parser, "modify", true)) {
783             select |= OJMS_MODIFY;
784         }
785         error = ovsdb_parser_finish(&parser);
786         if (error) {
787             return error;
788         }
789     } else {
790         select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
791     }
792     mt->select |= select;
793
794     if (columns) {
795         size_t i;
796
797         if (columns->type != JSON_ARRAY) {
798             return ovsdb_syntax_error(columns, NULL,
799                                       "array of column names expected");
800         }
801
802         for (i = 0; i < columns->u.array.n; i++) {
803             const struct ovsdb_column *column;
804             const char *s;
805
806             if (columns->u.array.elems[i]->type != JSON_STRING) {
807                 return ovsdb_syntax_error(columns, NULL,
808                                           "array of column names expected");
809             }
810
811             s = columns->u.array.elems[i]->u.string;
812             column = shash_find_data(&mt->table->schema->columns, s);
813             if (!column) {
814                 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
815                                           "column name", s);
816             }
817             ovsdb_jsonrpc_add_monitor_column(mt, column, select,
818                                              allocated_columns);
819         }
820     } else {
821         struct shash_node *node;
822
823         SHASH_FOR_EACH (node, &ts->columns) {
824             const struct ovsdb_column *column = node->data;
825             if (column->index != OVSDB_COL_UUID) {
826                 ovsdb_jsonrpc_add_monitor_column(mt, column, select,
827                                                  allocated_columns);
828             }
829         }
830     }
831
832     return NULL;
833 }
834
835 static struct json *
836 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
837                              struct json *params)
838 {
839     struct ovsdb_jsonrpc_monitor *m = NULL;
840     struct json *monitor_id, *monitor_requests;
841     struct ovsdb_error *error = NULL;
842     struct shash_node *node;
843     struct json *json;
844
845     if (json_array(params)->n != 3) {
846         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
847         goto error;
848     }
849     monitor_id = params->u.array.elems[1];
850     monitor_requests = params->u.array.elems[2];
851     if (monitor_requests->type != JSON_OBJECT) {
852         error = ovsdb_syntax_error(monitor_requests, NULL,
853                                    "monitor-requests must be object");
854         goto error;
855     }
856
857     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
858         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
859         goto error;
860     }
861
862     m = xzalloc(sizeof *m);
863     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
864     ovsdb_add_replica(s->remote->server->db, &m->replica);
865     m->session = s;
866     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
867     m->monitor_id = json_clone(monitor_id);
868     shash_init(&m->tables);
869
870     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
871         const struct ovsdb_table *table;
872         struct ovsdb_jsonrpc_monitor_table *mt;
873         size_t allocated_columns;
874         const struct json *mr_value;
875         size_t i;
876
877         table = ovsdb_get_table(s->remote->server->db, node->name);
878         if (!table) {
879             error = ovsdb_syntax_error(NULL, NULL,
880                                        "no table named %s", node->name);
881             goto error;
882         }
883
884         mt = xzalloc(sizeof *mt);
885         mt->table = table;
886         shash_add(&m->tables, table->schema->name, mt);
887
888         /* Parse columns. */
889         mr_value = node->data;
890         allocated_columns = 0;
891         if (mr_value->type == JSON_ARRAY) {
892             const struct json_array *array = &mr_value->u.array;
893
894             for (i = 0; i < array->n; i++) {
895                 error = ovsdb_jsonrpc_parse_monitor_request(
896                     mt, array->elems[i], &allocated_columns);
897                 if (error) {
898                     goto error;
899                 }
900             }
901         } else {
902             error = ovsdb_jsonrpc_parse_monitor_request(
903                 mt, mr_value, &allocated_columns);
904             if (error) {
905                 goto error;
906             }
907         }
908
909         /* Check for duplicate columns. */
910         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
911               compare_ovsdb_jsonrpc_monitor_column);
912         for (i = 1; i < mt->n_columns; i++) {
913             if (mt->columns[i].column == mt->columns[i - 1].column) {
914                 error = ovsdb_syntax_error(mr_value, NULL, "column %s "
915                                            "mentioned more than once",
916                                            mt->columns[i].column->name);
917                 goto error;
918             }
919         }
920     }
921
922     return ovsdb_jsonrpc_monitor_get_initial(m);
923
924 error:
925     if (m) {
926         ovsdb_remove_replica(s->remote->server->db, &m->replica);
927     }
928
929     json = ovsdb_error_to_json(error);
930     ovsdb_error_destroy(error);
931     return json;
932 }
933
934 static struct jsonrpc_msg *
935 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
936                              struct json_array *params,
937                              const struct json *request_id)
938 {
939     if (params->n != 1) {
940         return jsonrpc_create_error(json_string_create("invalid parameters"),
941                                     request_id);
942     } else {
943         struct ovsdb_jsonrpc_monitor *m;
944
945         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
946         if (!m) {
947             return jsonrpc_create_error(json_string_create("unknown monitor"),
948                                         request_id);
949         } else {
950             ovsdb_remove_replica(s->remote->server->db, &m->replica);
951             return jsonrpc_create_reply(json_object_create(), request_id);
952         }
953     }
954 }
955
956 static void
957 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
958 {
959     struct ovsdb_jsonrpc_monitor *m, *next;
960
961     HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
962         ovsdb_remove_replica(s->remote->server->db, &m->replica);
963     }
964 }
965
966 static struct ovsdb_jsonrpc_monitor *
967 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
968 {
969     assert(replica->class == &ovsdb_jsonrpc_replica_class);
970     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
971 }
972
/* Scratch state threaded through ovsdb_jsonrpc_monitor_change_cb() while the
 * JSON describing a set of row changes is being assembled. */
struct ovsdb_jsonrpc_monitor_aux {
    /* True while the initial table contents are being reported, false while
     * the changes made by a transaction are being reported. */
    bool initial;

    /* The monitor on whose behalf changes are being collected. */
    const struct ovsdb_jsonrpc_monitor *monitor;

    /* JSON object covering everything reported so far, or NULL if nothing
     * has been reported yet. */
    struct json *json;

    /* Cached state for the table most recently seen by the callback. */
    struct ovsdb_jsonrpc_monitor_table *mt; /* Monitored table, or NULL. */
    struct json *table_json;    /* JSON object for 'mt' inside 'json', or
                                 * NULL if not yet created. */
};
982
983 static bool
984 any_reportable_change(const struct ovsdb_jsonrpc_monitor_table *mt,
985                       const unsigned long int *changed)
986 {
987     size_t i;
988
989     for (i = 0; i < mt->n_columns; i++) {
990         const struct ovsdb_jsonrpc_monitor_column *c = &mt->columns[i];
991         unsigned int idx = c->column->index;
992
993         if (c->select & OJMS_MODIFY && bitmap_is_set(changed, idx)) {
994             return true;
995         }
996     }
997
998     return false;
999 }
1000
1001 static bool
1002 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
1003                                 const struct ovsdb_row *new,
1004                                 const unsigned long int *changed,
1005                                 void *aux_)
1006 {
1007     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
1008     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
1009     struct ovsdb_table *table = new ? new->table : old->table;
1010     enum ovsdb_jsonrpc_monitor_selection type;
1011     struct json *old_json, *new_json;
1012     struct json *row_json;
1013     char uuid[UUID_LEN + 1];
1014     size_t i;
1015
1016     if (!aux->mt || table != aux->mt->table) {
1017         aux->mt = shash_find_data(&m->tables, table->schema->name);
1018         aux->table_json = NULL;
1019         if (!aux->mt) {
1020             /* We don't care about rows in this table at all.  Tell the caller
1021              * to skip it.  */
1022             return false;
1023         }
1024     }
1025
1026     type = (aux->initial ? OJMS_INITIAL
1027             : !old ? OJMS_INSERT
1028             : !new ? OJMS_DELETE
1029             : OJMS_MODIFY);
1030     if (!(aux->mt->select & type)) {
1031         /* We don't care about this type of change (but do want to be called
1032          * back for changes to other rows in the same table). */
1033         return true;
1034     }
1035
1036     if (type == OJMS_MODIFY && !any_reportable_change(aux->mt, changed)) {
1037         /* Nothing of interest changed. */
1038         return true;
1039     }
1040
1041     old_json = new_json = NULL;
1042     if (type & (OJMS_DELETE | OJMS_MODIFY)) {
1043         old_json = json_object_create();
1044     }
1045     if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1046         new_json = json_object_create();
1047     }
1048     for (i = 0; i < aux->mt->n_columns; i++) {
1049         const struct ovsdb_jsonrpc_monitor_column *c = &aux->mt->columns[i];
1050         const struct ovsdb_column *column = c->column;
1051         unsigned int idx = c->column->index;
1052
1053         if (!(type & c->select)) {
1054             /* We don't care about this type of change for this particular
1055              * column (but we will care about it for some other column). */
1056             continue;
1057         }
1058
1059         if ((type == OJMS_MODIFY && bitmap_is_set(changed, idx))
1060             || type == OJMS_DELETE) {
1061             json_object_put(old_json, column->name,
1062                             ovsdb_datum_to_json(&old->fields[idx],
1063                                                 &column->type));
1064         }
1065         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
1066             json_object_put(new_json, column->name,
1067                             ovsdb_datum_to_json(&new->fields[idx],
1068                                                 &column->type));
1069         }
1070     }
1071
1072     /* Create JSON object for transaction overall. */
1073     if (!aux->json) {
1074         aux->json = json_object_create();
1075     }
1076
1077     /* Create JSON object for transaction on this table. */
1078     if (!aux->table_json) {
1079         aux->table_json = json_object_create();
1080         json_object_put(aux->json, aux->mt->table->schema->name,
1081                         aux->table_json);
1082     }
1083
1084     /* Create JSON object for transaction on this row. */
1085     row_json = json_object_create();
1086     if (old_json) {
1087         json_object_put(row_json, "old", old_json);
1088     }
1089     if (new_json) {
1090         json_object_put(row_json, "new", new_json);
1091     }
1092
1093     /* Add JSON row to JSON table. */
1094     snprintf(uuid, sizeof uuid,
1095              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
1096     json_object_put(aux->table_json, uuid, row_json);
1097
1098     return true;
1099 }
1100
1101 static void
1102 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
1103                                const struct ovsdb_jsonrpc_monitor *m,
1104                                bool initial)
1105 {
1106     aux->initial = initial;
1107     aux->monitor = m;
1108     aux->json = NULL;
1109     aux->mt = NULL;
1110     aux->table_json = NULL;
1111 }
1112
1113 static struct ovsdb_error *
1114 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
1115                              const struct ovsdb_txn *txn,
1116                              bool durable OVS_UNUSED)
1117 {
1118     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1119     struct ovsdb_jsonrpc_monitor_aux aux;
1120
1121     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
1122     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
1123     if (aux.json) {
1124         struct jsonrpc_msg *msg;
1125         struct json *params;
1126
1127         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
1128                                      aux.json);
1129         msg = jsonrpc_create_notify("update", params);
1130         jsonrpc_session_send(aux.monitor->session->js, msg);
1131     }
1132
1133     return NULL;
1134 }
1135
1136 static struct json *
1137 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
1138 {
1139     struct ovsdb_jsonrpc_monitor_aux aux;
1140     struct shash_node *node;
1141
1142     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
1143     SHASH_FOR_EACH (node, &m->tables) {
1144         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1145
1146         if (mt->select & OJMS_INITIAL) {
1147             struct ovsdb_row *row;
1148
1149             HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
1150                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, NULL, &aux);
1151             }
1152         }
1153     }
1154     return aux.json ? aux.json : json_object_create();
1155 }
1156
1157 static void
1158 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
1159 {
1160     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
1161     struct shash_node *node;
1162
1163     json_destroy(m->monitor_id);
1164     SHASH_FOR_EACH (node, &m->tables) {
1165         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
1166         free(mt->columns);
1167         free(mt);
1168     }
1169     shash_destroy(&m->tables);
1170     hmap_remove(&m->session->monitors, &m->node);
1171     free(m);
1172 }
1173
1174 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
1175     ovsdb_jsonrpc_monitor_commit,
1176     ovsdb_jsonrpc_monitor_destroy
1177 };