ovsdb-server: Remove write-only struct member.
[sliver-openvswitch.git] / ovsdb / jsonrpc-server.c
1 /* Copyright (c) 2009 Nicira Networks
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17
18 #include "jsonrpc-server.h"
19
20 #include <errno.h>
21
22 #include "column.h"
23 #include "json.h"
24 #include "jsonrpc.h"
25 #include "ovsdb-error.h"
26 #include "ovsdb-parser.h"
27 #include "ovsdb.h"
28 #include "reconnect.h"
29 #include "row.h"
30 #include "stream.h"
31 #include "table.h"
32 #include "timeval.h"
33 #include "transaction.h"
34 #include "trigger.h"
35
36 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
37 #include "vlog.h"
38
struct ovsdb_jsonrpc_session;

/* Session helpers used by the server code below. */
static void ovsdb_jsonrpc_session_create_active(
    struct ovsdb_jsonrpc_server *, const char *name);
static void ovsdb_jsonrpc_session_create_passive(
    struct ovsdb_jsonrpc_server *, struct stream *);
static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);

/* Trigger helpers (one trigger per "transact" request). */
static void ovsdb_jsonrpc_trigger_create(
    struct ovsdb_jsonrpc_session *, struct json *id, struct json *params);
static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
    struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
static void ovsdb_jsonrpc_trigger_complete_all(
    struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_trigger_complete_done(
    struct ovsdb_jsonrpc_session *);

/* Monitor helpers ("monitor" and "monitor_cancel" requests). */
static struct json *ovsdb_jsonrpc_monitor_create(
    struct ovsdb_jsonrpc_session *, struct json *params);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
    struct ovsdb_jsonrpc_session *, struct json_array *params,
    const struct json *request_id);
static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
67 \f
68 /* JSON-RPC database server. */
69
70 struct ovsdb_jsonrpc_server {
71     struct ovsdb *db;
72
73     struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
74     unsigned int n_sessions, max_sessions;
75
76     struct pstream **listeners;
77     size_t n_listeners, allocated_listeners;
78 };
79
80 struct ovsdb_jsonrpc_server *
81 ovsdb_jsonrpc_server_create(struct ovsdb *db)
82 {
83     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
84     server->db = db;
85     server->max_sessions = 64;
86     list_init(&server->sessions);
87     return server;
88 }
89
90 void
91 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
92                             struct pstream *pstream)
93 {
94     if (svr->n_listeners >= svr->allocated_listeners) {
95         svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
96                                     sizeof *svr->listeners);
97     }
98     svr->listeners[svr->n_listeners++] = pstream;
99 }
100
/* Adds to 'svr' an active (outbound) session for the remote called 'name'.
 * The connection itself is attempted later, from the session's run
 * function. */
void
ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr,
                             const char *name)
{
    ovsdb_jsonrpc_session_create_active(svr, name);
}
107
108 void
109 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
110 {
111     size_t i;
112
113     /* Accept new connections. */
114     for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
115         struct pstream *listener = svr->listeners[i];
116         struct stream *stream;
117         int error;
118
119         error = pstream_accept(listener, &stream);
120         if (!error) {
121             ovsdb_jsonrpc_session_create_passive(svr, stream);
122         } else if (error == EAGAIN) {
123             i++;
124         } else if (error) {
125             VLOG_WARN("%s: accept failed: %s",
126                       pstream_get_name(listener), strerror(error));
127             pstream_close(listener);
128             svr->listeners[i] = svr->listeners[--svr->n_listeners];
129         }
130     }
131
132     /* Handle each session. */
133     ovsdb_jsonrpc_session_run_all(svr);
134 }
135
136 void
137 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
138 {
139     if (svr->n_sessions < svr->max_sessions) {
140         size_t i;
141
142         for (i = 0; i < svr->n_listeners; i++) {
143             pstream_wait(svr->listeners[i]);
144         }
145     }
146
147     ovsdb_jsonrpc_session_wait_all(svr);
148 }
149 \f
150 /* JSON-RPC database server session. */
151
152 struct ovsdb_jsonrpc_session {
153     struct ovsdb_jsonrpc_server *server;
154     struct list node;           /* Element in server's sessions list. */
155
156     /* Triggers. */
157     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
158     struct list completions;    /* Completed triggers. */
159
160     /* Monitors. */
161     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
162
163     /* Connecting and reconnecting. */
164     struct reconnect *reconnect; /* For back-off. */
165     bool active;                /* Active or passive connection? */
166     struct jsonrpc *rpc;
167     struct stream *stream;      /* Only if active == false and rpc == NULL. */
168 };
169
/* Session functions that are used before their definitions. */
static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_got_request(
    struct ovsdb_jsonrpc_session *, struct jsonrpc_msg *);
static void ovsdb_jsonrpc_session_got_notify(
    struct ovsdb_jsonrpc_session *, struct jsonrpc_msg *);
178
179 static struct ovsdb_jsonrpc_session *
180 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
181                              const char *name, bool active)
182 {
183     struct ovsdb_jsonrpc_session *s;
184
185     s = xzalloc(sizeof *s);
186     s->server = svr;
187     list_push_back(&svr->sessions, &s->node);
188     hmap_init(&s->triggers);
189     hmap_init(&s->monitors);
190     list_init(&s->completions);
191     s->reconnect = reconnect_create(time_msec());
192     reconnect_set_name(s->reconnect, name);
193     reconnect_enable(s->reconnect, time_msec());
194     s->active = active;
195
196     svr->n_sessions++;
197
198     return s;
199 }
200
/* Creates on 'svr' an active session for the remote called 'name'. */
static void
ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
                                    const char *name)
{
    const bool active = true;

    ovsdb_jsonrpc_session_create(svr, name, active);
}
207
208 static void
209 ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
210                                      struct stream *stream)
211 {
212     struct ovsdb_jsonrpc_session *s;
213
214     s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
215     reconnect_connected(s->reconnect, time_msec());
216     s->rpc = jsonrpc_open(stream);
217 }
218
219 static void
220 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
221 {
222     ovsdb_jsonrpc_session_disconnect(s);
223     list_remove(&s->node);
224     s->server->n_sessions--;
225 }
226
227 static void
228 ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
229 {
230     reconnect_disconnected(s->reconnect, time_msec(), 0);
231     if (s->rpc) {
232         jsonrpc_error(s->rpc, EOF);
233         ovsdb_jsonrpc_trigger_complete_all(s);
234         ovsdb_jsonrpc_monitor_remove_all(s);
235         jsonrpc_close(s->rpc);
236         s->rpc = NULL;
237     } else if (s->stream) {
238         stream_close(s->stream);
239         s->stream = NULL;
240     }
241 }
242
243 static void
244 ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
245 {
246     ovsdb_jsonrpc_session_disconnect(s);
247     if (s->active) {
248         int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
249         if (error) {
250             reconnect_connect_failed(s->reconnect, time_msec(), error);
251         } else {
252             reconnect_connecting(s->reconnect, time_msec());
253         }
254     }
255 }
256
257 static int
258 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
259 {
260     if (s->rpc) {
261         struct jsonrpc_msg *msg;
262         int error;
263
264         jsonrpc_run(s->rpc);
265
266         ovsdb_jsonrpc_trigger_complete_done(s);
267
268         if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
269             reconnect_received(s->reconnect, time_msec());
270             if (msg->type == JSONRPC_REQUEST) {
271                 ovsdb_jsonrpc_session_got_request(s, msg);
272             } else if (msg->type == JSONRPC_NOTIFY) {
273                 ovsdb_jsonrpc_session_got_notify(s, msg);
274             } else if (msg->type == JSONRPC_REPLY
275                        && msg->id && msg->id->type == JSON_STRING
276                        && !strcmp(msg->id->u.string, "echo")) {
277                 /* It's a reply to our echo request.  Ignore it. */
278             } else {
279                 VLOG_WARN("%s: received unexpected %s message",
280                           jsonrpc_get_name(s->rpc),
281                           jsonrpc_msg_type_to_string(msg->type));
282                 jsonrpc_error(s->rpc, EPROTO);
283                 jsonrpc_msg_destroy(msg);
284             }
285         }
286
287         error = jsonrpc_get_status(s->rpc);
288         if (error) {
289             if (s->active) {
290                 ovsdb_jsonrpc_session_disconnect(s);
291             } else {
292                 return error;
293             }
294         }
295     } else if (s->stream) {
296         int error = stream_connect(s->stream);
297         if (!error) {
298             reconnect_connected(s->reconnect, time_msec());
299             s->rpc = jsonrpc_open(s->stream);
300             s->stream = NULL;
301         } else if (error != EAGAIN) {
302             reconnect_connect_failed(s->reconnect, time_msec(), error);
303             stream_close(s->stream);
304             s->stream = NULL;
305         }
306     }
307
308     switch (reconnect_run(s->reconnect, time_msec())) {
309     case RECONNECT_CONNECT:
310         ovsdb_jsonrpc_session_connect(s);
311         break;
312
313     case RECONNECT_DISCONNECT:
314         ovsdb_jsonrpc_session_disconnect(s);
315         break;
316
317     case RECONNECT_PROBE:
318         if (s->rpc) {
319             struct json *params;
320             struct jsonrpc_msg *request;
321
322             params = json_array_create_empty();
323             request = jsonrpc_create_request("echo", params, NULL);
324             json_destroy(request->id);
325             request->id = json_string_create("echo");
326             jsonrpc_send(s->rpc, request);
327         }
328         break;
329     }
330     return s->active || s->rpc ? 0 : ETIMEDOUT;
331 }
332
333 static void
334 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
335 {
336     struct ovsdb_jsonrpc_session *s, *next;
337
338     LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
339                         &svr->sessions) {
340         int error = ovsdb_jsonrpc_session_run(s);
341         if (error) {
342             ovsdb_jsonrpc_session_close(s);
343         }
344     }
345 }
346
347 static void
348 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
349 {
350     if (s->rpc) {
351         jsonrpc_wait(s->rpc);
352         if (!jsonrpc_get_backlog(s->rpc)) {
353             jsonrpc_recv_wait(s->rpc);
354         }
355     } else if (s->stream) {
356         stream_connect_wait(s->stream);
357     }
358     reconnect_wait(s->reconnect, time_msec());
359 }
360
361 static void
362 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr)
363 {
364     struct ovsdb_jsonrpc_session *s;
365
366     LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
367         ovsdb_jsonrpc_session_wait(s);
368     }
369 }
370
371 static struct jsonrpc_msg *
372 execute_transaction(struct ovsdb_jsonrpc_session *s,
373                     struct jsonrpc_msg *request)
374 {
375     ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
376     request->id = NULL;
377     request->params = NULL;
378     return NULL;
379 }
380
381 static void
382 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
383                                   struct jsonrpc_msg *request)
384 {
385     struct jsonrpc_msg *reply;
386
387     if (!strcmp(request->method, "transact")) {
388         reply = execute_transaction(s, request);
389     } else if (!strcmp(request->method, "monitor")) {
390         reply = jsonrpc_create_reply(
391             ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
392     } else if (!strcmp(request->method, "monitor_cancel")) {
393         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
394                                              request->id);
395     } else if (!strcmp(request->method, "get_schema")) {
396         reply = jsonrpc_create_reply(
397             ovsdb_schema_to_json(s->server->db->schema), request->id);
398     } else if (!strcmp(request->method, "echo")) {
399         reply = jsonrpc_create_reply(json_clone(request->params), request->id);
400     } else {
401         reply = jsonrpc_create_error(json_string_create("unknown method"),
402                                      request->id);
403     }
404
405     if (reply) {
406         jsonrpc_msg_destroy(request);
407         jsonrpc_send(s->rpc, reply);
408     }
409 }
410
411 static void
412 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
413 {
414     if (json_array(request->params)->n == 1) {
415         struct ovsdb_jsonrpc_trigger *t;
416         struct json *id;
417
418         id = request->params->u.array.elems[0];
419         t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
420         if (t) {
421             ovsdb_jsonrpc_trigger_complete(t);
422         }
423     }
424 }
425
426 static void
427 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
428                                  struct jsonrpc_msg *request)
429 {
430     if (!strcmp(request->method, "cancel")) {
431         execute_cancel(s, request);
432     }
433     jsonrpc_msg_destroy(request);
434 }
435 \f
436 /* JSON-RPC database server triggers.
437  *
438  * (Every transaction is treated as a trigger even if it doesn't actually have
439  * any "wait" operations.) */
440
441 struct ovsdb_jsonrpc_trigger {
442     struct ovsdb_trigger trigger;
443     struct ovsdb_jsonrpc_session *session;
444     struct hmap_node hmap_node; /* In session's "triggers" hmap. */
445     struct json *id;
446 };
447
448 static void
449 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
450                              struct json *id, struct json *params)
451 {
452     struct ovsdb_jsonrpc_trigger *t;
453     size_t hash;
454
455     /* Check for duplicate ID. */
456     hash = json_hash(id, 0);
457     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
458     if (t) {
459         jsonrpc_send(s->rpc, jsonrpc_create_error(
460                          json_string_create("duplicate request ID"), id));
461         json_destroy(id);
462         json_destroy(params);
463         return;
464     }
465
466     /* Insert into trigger table. */
467     t = xmalloc(sizeof *t);
468     ovsdb_trigger_init(s->server->db,
469                        &t->trigger, params, &s->completions,
470                        time_msec());
471     t->session = s;
472     t->id = id;
473     hmap_insert(&s->triggers, &t->hmap_node, hash);
474
475     /* Complete early if possible. */
476     if (ovsdb_trigger_is_complete(&t->trigger)) {
477         ovsdb_jsonrpc_trigger_complete(t);
478     }
479 }
480
481 static struct ovsdb_jsonrpc_trigger *
482 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
483                            const struct json *id, size_t hash)
484 {
485     struct ovsdb_jsonrpc_trigger *t;
486
487     HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
488                              &s->triggers) {
489         if (json_equal(t->id, id)) {
490             return t;
491         }
492     }
493
494     return NULL;
495 }
496
497 static void
498 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
499 {
500     struct ovsdb_jsonrpc_session *s = t->session;
501
502     if (s->rpc && !jsonrpc_get_status(s->rpc)) {
503         struct jsonrpc_msg *reply;
504         struct json *result;
505
506         result = ovsdb_trigger_steal_result(&t->trigger);
507         if (result) {
508             reply = jsonrpc_create_reply(result, t->id);
509         } else {
510             reply = jsonrpc_create_error(json_string_create("canceled"),
511                                          t->id);
512         }
513         jsonrpc_send(s->rpc, reply);
514     }
515
516     json_destroy(t->id);
517     ovsdb_trigger_destroy(&t->trigger);
518     hmap_remove(&s->triggers, &t->hmap_node);
519     free(t);
520 }
521
522 static void
523 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
524 {
525     struct ovsdb_jsonrpc_trigger *t, *next;
526     HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
527                         &s->triggers) {
528         ovsdb_jsonrpc_trigger_complete(t);
529     }
530 }
531
532 static void
533 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
534 {
535     while (!list_is_empty(&s->completions)) {
536         struct ovsdb_jsonrpc_trigger *t
537             = CONTAINER_OF(s->completions.next,
538                            struct ovsdb_jsonrpc_trigger, trigger.node);
539         ovsdb_jsonrpc_trigger_complete(t);
540     }
541 }
542 \f
543 /* JSON-RPC database table monitors. */
544
/* Kinds of row change that a monitored table can report; a bitmap. */
enum ovsdb_jsonrpc_monitor_selection {
    OJMS_INITIAL = 1 << 0,      /* Rows present when the monitor is created. */
    OJMS_INSERT = 1 << 1,       /* Rows that get added. */
    OJMS_DELETE = 1 << 2,       /* Rows that get removed. */
    OJMS_MODIFY = 1 << 3        /* Rows that get changed. */
};
551
552 struct ovsdb_jsonrpc_monitor_table {
553     const struct ovsdb_table *table;
554     enum ovsdb_jsonrpc_monitor_selection select;
555     struct ovsdb_column_set columns;
556 };
557
558 struct ovsdb_jsonrpc_monitor {
559     struct ovsdb_replica replica;
560     struct ovsdb_jsonrpc_session *session;
561     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
562
563     struct json *monitor_id;
564     struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
565 };
566
567 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
568
569 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
570     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
571 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
572 static struct json *ovsdb_jsonrpc_monitor_get_initial(
573     const struct ovsdb_jsonrpc_monitor *);
574
575 static bool
576 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
577 {
578     const struct json *json;
579
580     json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
581     return json ? json_boolean(json) : default_value;
582 }
583
584 struct ovsdb_jsonrpc_monitor *
585 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
586                            const struct json *monitor_id)
587 {
588     struct ovsdb_jsonrpc_monitor *m;
589
590     HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node,
591                              json_hash(monitor_id, 0), &s->monitors) {
592         if (json_equal(m->monitor_id, monitor_id)) {
593             return m;
594         }
595     }
596
597     return NULL;
598 }
599
600 static struct json *
601 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
602                              struct json *params)
603 {
604     struct ovsdb_jsonrpc_monitor *m = NULL;
605     struct json *monitor_id, *monitor_requests;
606     struct ovsdb_error *error = NULL;
607     struct shash_node *node;
608     struct json *json;
609
610     if (json_array(params)->n != 2) {
611         error = ovsdb_syntax_error(params, NULL, "invalid parameters");
612         goto error;
613     }
614     monitor_id = params->u.array.elems[0];
615     monitor_requests = params->u.array.elems[1];
616     if (monitor_requests->type != JSON_OBJECT) {
617         error = ovsdb_syntax_error(monitor_requests, NULL,
618                                    "monitor-requests must be object");
619         goto error;
620     }
621
622     if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
623         error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
624         goto error;
625     }
626
627     m = xzalloc(sizeof *m);
628     ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
629     ovsdb_add_replica(s->server->db, &m->replica);
630     m->session = s;
631     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
632     m->monitor_id = json_clone(monitor_id);
633     shash_init(&m->tables);
634
635     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
636         const struct ovsdb_table *table;
637         struct ovsdb_jsonrpc_monitor_table *mt;
638         const struct json *columns_json, *select_json;
639         struct ovsdb_parser parser;
640
641         table = ovsdb_get_table(s->server->db, node->name);
642         if (!table) {
643             error = ovsdb_syntax_error(NULL, NULL,
644                                        "no table named %s", node->name);
645             goto error;
646         }
647
648         mt = xzalloc(sizeof *mt);
649         mt->table = table;
650         mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
651         ovsdb_column_set_init(&mt->columns);
652         shash_add(&m->tables, table->schema->name, mt);
653
654         ovsdb_parser_init(&parser, node->data, "table %s", node->name);
655         columns_json = ovsdb_parser_member(&parser, "columns",
656                                            OP_ARRAY | OP_OPTIONAL);
657         select_json = ovsdb_parser_member(&parser, "select",
658                                           OP_OBJECT | OP_OPTIONAL);
659         error = ovsdb_parser_finish(&parser);
660         if (error) {
661             goto error;
662         }
663
664         if (columns_json) {
665             error = ovsdb_column_set_from_json(columns_json, table,
666                                                &mt->columns);
667             if (error) {
668                 goto error;
669             }
670         } else {
671             struct shash_node *node;
672
673             SHASH_FOR_EACH (node, &table->schema->columns) {
674                 const struct ovsdb_column *column = node->data;
675                 if (column->index != OVSDB_COL_UUID) {
676                     ovsdb_column_set_add(&mt->columns, column);
677                 }
678             }
679         }
680
681         if (select_json) {
682             mt->select = 0;
683             ovsdb_parser_init(&parser, select_json, "table %s select",
684                               table->schema->name);
685             if (parse_bool(&parser, "initial", true)) {
686                 mt->select |= OJMS_INITIAL;
687             }
688             if (parse_bool(&parser, "insert", true)) {
689                 mt->select |= OJMS_INSERT;
690             }
691             if (parse_bool(&parser, "delete", true)) {
692                 mt->select |= OJMS_DELETE;
693             }
694             if (parse_bool(&parser, "modify", true)) {
695                 mt->select |= OJMS_MODIFY;
696             }
697             error = ovsdb_parser_finish(&parser);
698             if (error) {
699                 goto error;
700             }
701         }
702     }
703
704     return ovsdb_jsonrpc_monitor_get_initial(m);
705
706 error:
707     if (m) {
708         ovsdb_remove_replica(s->server->db, &m->replica);
709     }
710
711     json = ovsdb_error_to_json(error);
712     ovsdb_error_destroy(error);
713     return json;
714 }
715
716 static struct jsonrpc_msg *
717 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
718                              struct json_array *params,
719                              const struct json *request_id)
720 {
721     if (params->n != 1) {
722         return jsonrpc_create_error(json_string_create("invalid parameters"),
723                                     request_id);
724     } else {
725         struct ovsdb_jsonrpc_monitor *m;
726
727         m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
728         if (!m) {
729             return jsonrpc_create_error(json_string_create("unknown monitor"),
730                                         request_id);
731         } else {
732             ovsdb_remove_replica(s->server->db, &m->replica);
733             return jsonrpc_create_reply(json_object_create(), request_id);
734         }
735     }
736 }
737
738 static void
739 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
740 {
741     struct ovsdb_jsonrpc_monitor *m, *next;
742
743     HMAP_FOR_EACH_SAFE (m, next,
744                         struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
745         ovsdb_remove_replica(s->server->db, &m->replica);
746     }
747 }
748
749 static struct ovsdb_jsonrpc_monitor *
750 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
751 {
752     assert(replica->class == &ovsdb_jsonrpc_replica_class);
753     return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
754 }
755
/* State carried across calls to ovsdb_jsonrpc_monitor_change_cb() while one
 * update is being assembled. */
struct ovsdb_jsonrpc_monitor_aux {
    bool initial;               /* True while sending a table's initial rows. */
    const struct ovsdb_jsonrpc_monitor *monitor; /* Monitor being served. */
    struct json *json;          /* JSON for the whole transaction. */

    /* Table that the most recent row belonged to. */
    struct ovsdb_jsonrpc_monitor_table *mt; /* NULL before the first row. */
    struct json *table_json;    /* JSON for table's transaction. */
};
765
766 static bool
767 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
768                                 const struct ovsdb_row *new,
769                                 void *aux_)
770 {
771     struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
772     const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
773     struct ovsdb_table *table = new ? new->table : old->table;
774     enum ovsdb_jsonrpc_monitor_selection type;
775     struct json *old_json, *new_json;
776     struct json *row_json;
777     char uuid[UUID_LEN + 1];
778     int n_changed;
779     size_t i;
780
781     if (!aux->mt || table != aux->mt->table) {
782         aux->mt = shash_find_data(&m->tables, table->schema->name);
783         aux->table_json = NULL;
784         if (!aux->mt) {
785             /* We don't care about rows in this table at all.  Tell the caller
786              * to skip it.  */
787             return false;
788         }
789     }
790
791     type = (aux->initial ? OJMS_INITIAL
792             : !old ? OJMS_INSERT
793             : !new ? OJMS_DELETE
794             : OJMS_MODIFY);
795     if (!(aux->mt->select & type)) {
796         /* We don't care about this type of change (but do want to be called
797          * back for changes to other rows in the same table). */
798         return true;
799     }
800
801     old_json = new_json = NULL;
802     n_changed = 0;
803     for (i = 0; i < aux->mt->columns.n_columns; i++) {
804         const struct ovsdb_column *column = aux->mt->columns.columns[i];
805         unsigned int idx = column->index;
806         bool changed = false;
807
808         if (type == OJMS_MODIFY) {
809             changed = !ovsdb_datum_equals(&old->fields[idx],
810                                           &new->fields[idx], &column->type);
811             n_changed += changed;
812         }
813         if (changed || type == OJMS_DELETE) {
814             if (!old_json) {
815                 old_json = json_object_create();
816             }
817             json_object_put(old_json, column->name,
818                             ovsdb_datum_to_json(&old->fields[idx],
819                                                 &column->type));
820         }
821         if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
822             if (!new_json) {
823                 new_json = json_object_create();
824             }
825             json_object_put(new_json, column->name,
826                             ovsdb_datum_to_json(&new->fields[idx],
827                                                 &column->type));
828         }
829     }
830     if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) {
831         /* No reportable changes. */
832         json_destroy(old_json);
833         json_destroy(new_json);
834         return true;
835     }
836
837     /* Create JSON object for transaction overall. */
838     if (!aux->json) {
839         aux->json = json_object_create();
840     }
841
842     /* Create JSON object for transaction on this table. */
843     if (!aux->table_json) {
844         aux->table_json = json_object_create();
845         json_object_put(aux->json, aux->mt->table->schema->name,
846                         aux->table_json);
847     }
848
849     /* Create JSON object for transaction on this row. */
850     row_json = json_object_create();
851     if (old_json) {
852         json_object_put(row_json, "old", old_json);
853     }
854     if (new_json) {
855         json_object_put(row_json, "new", new_json);
856     }
857
858     /* Add JSON row to JSON table. */
859     snprintf(uuid, sizeof uuid,
860              UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
861     json_object_put(aux->table_json, uuid, row_json);
862
863     return true;
864 }
865
866 static void
867 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
868                                const struct ovsdb_jsonrpc_monitor *m,
869                                bool initial)
870 {
871     aux->initial = initial;
872     aux->monitor = m;
873     aux->json = NULL;
874     aux->mt = NULL;
875     aux->table_json = NULL;
876 }
877
878 static struct ovsdb_error *
879 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
880                              const struct ovsdb_txn *txn, bool durable UNUSED)
881 {
882     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
883     struct ovsdb_jsonrpc_monitor_aux aux;
884
885     ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
886     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
887     if (aux.json) {
888         struct jsonrpc_msg *msg;
889         struct json *params;
890
891         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
892                                      aux.json);
893         msg = jsonrpc_create_notify("update", params);
894         jsonrpc_send(aux.monitor->session->rpc, msg);
895     }
896
897     return NULL;
898 }
899
900 static struct json *
901 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
902 {
903     struct ovsdb_jsonrpc_monitor_aux aux;
904     struct shash_node *node;
905
906     ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
907     SHASH_FOR_EACH (node, &m->tables) {
908         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
909
910         if (mt->select & OJMS_INITIAL) {
911             struct ovsdb_row *row;
912
913             HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
914                            &mt->table->rows) {
915                 ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux);
916             }
917         }
918     }
919     return aux.json ? aux.json : json_object_create();
920 }
921
922 static void
923 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
924 {
925     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
926     struct shash_node *node;
927
928     json_destroy(m->monitor_id);
929     SHASH_FOR_EACH (node, &m->tables) {
930         struct ovsdb_jsonrpc_monitor_table *mt = node->data;
931         ovsdb_column_set_destroy(&mt->columns);
932         free(mt);
933     }
934     shash_destroy(&m->tables);
935     hmap_remove(&m->session->monitors, &m->node);
936     free(m);
937 }
938
939 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
940     ovsdb_jsonrpc_monitor_commit,
941     ovsdb_jsonrpc_monitor_destroy
942 };