1 /* Copyright (c) 2009 Nicira Networks
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
18 #include "jsonrpc-server.h"
25 #include "ovsdb-error.h"
26 #include "ovsdb-parser.h"
28 #include "reconnect.h"
33 #include "transaction.h"
36 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
39 struct ovsdb_jsonrpc_session;
42 static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
44 static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
46 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
47 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
50 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
51 struct json *id, struct json *params);
52 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
53 struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
54 static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
55 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
56 static void ovsdb_jsonrpc_trigger_complete_done(
57 struct ovsdb_jsonrpc_session *);
60 static struct json *ovsdb_jsonrpc_monitor_create(
61 struct ovsdb_jsonrpc_session *, struct json *params);
62 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
63 struct ovsdb_jsonrpc_session *,
64 struct json_array *params,
65 const struct json *request_id);
66 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
68 /* JSON-RPC database server. */
70 struct ovsdb_jsonrpc_server {
73 struct list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
74 unsigned int n_sessions, max_sessions;
76 struct pstream **listeners;
77 size_t n_listeners, allocated_listeners;
80 struct ovsdb_jsonrpc_server *
81 ovsdb_jsonrpc_server_create(struct ovsdb *db)
83 struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
85 server->max_sessions = 64;
86 list_init(&server->sessions);
91 ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
92 struct pstream *pstream)
94 if (svr->n_listeners >= svr->allocated_listeners) {
95 svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
96 sizeof *svr->listeners);
98 svr->listeners[svr->n_listeners++] = pstream;
102 ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr,
105 ovsdb_jsonrpc_session_create_active(svr, name);
109 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
113 /* Accept new connections. */
114 for (i = 0; i < svr->n_listeners && svr->n_sessions < svr->max_sessions;) {
115 struct pstream *listener = svr->listeners[i];
116 struct stream *stream;
119 error = pstream_accept(listener, &stream);
121 ovsdb_jsonrpc_session_create_passive(svr, stream);
122 } else if (error == EAGAIN) {
125 VLOG_WARN("%s: accept failed: %s",
126 pstream_get_name(listener), strerror(error));
127 pstream_close(listener);
128 svr->listeners[i] = svr->listeners[--svr->n_listeners];
132 /* Handle each session. */
133 ovsdb_jsonrpc_session_run_all(svr);
137 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
139 if (svr->n_sessions < svr->max_sessions) {
142 for (i = 0; i < svr->n_listeners; i++) {
143 pstream_wait(svr->listeners[i]);
147 ovsdb_jsonrpc_session_wait_all(svr);
150 /* JSON-RPC database server session. */
152 struct ovsdb_jsonrpc_session {
153 struct ovsdb_jsonrpc_server *server;
154 struct list node; /* Element in server's sessions list. */
157 struct hmap triggers; /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
158 struct list completions; /* Completed triggers. */
161 struct hmap monitors; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
163 /* Network connectivity. */
164 struct jsonrpc_session *js; /* JSON-RPC session. */
165 unsigned int js_seqno; /* Last jsonrpc_session_get_seqno() value. */
168 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
169 static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
170 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
171 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
172 struct jsonrpc_msg *);
173 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
174 struct jsonrpc_msg *);
176 static struct ovsdb_jsonrpc_session *
177 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
178 struct jsonrpc_session *js)
180 struct ovsdb_jsonrpc_session *s;
182 s = xzalloc(sizeof *s);
184 list_push_back(&svr->sessions, &s->node);
185 hmap_init(&s->triggers);
186 hmap_init(&s->monitors);
187 list_init(&s->completions);
189 s->js_seqno = jsonrpc_session_get_seqno(js);
197 ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
200 ovsdb_jsonrpc_session_create(svr, jsonrpc_session_open(name));
204 ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
205 struct stream *stream)
207 ovsdb_jsonrpc_session_create(
208 svr, jsonrpc_session_open_unreliably(jsonrpc_open(stream)));
212 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
214 jsonrpc_session_close(s->js);
215 list_remove(&s->node);
216 s->server->n_sessions--;
220 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
222 jsonrpc_session_run(s->js);
223 if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
224 s->js_seqno = jsonrpc_session_get_seqno(s->js);
225 ovsdb_jsonrpc_trigger_complete_all(s);
226 ovsdb_jsonrpc_monitor_remove_all(s);
229 ovsdb_jsonrpc_trigger_complete_done(s);
231 if (!jsonrpc_session_get_backlog(s->js)) {
232 struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
234 if (msg->type == JSONRPC_REQUEST) {
235 ovsdb_jsonrpc_session_got_request(s, msg);
236 } else if (msg->type == JSONRPC_NOTIFY) {
237 ovsdb_jsonrpc_session_got_notify(s, msg);
239 VLOG_WARN("%s: received unexpected %s message",
240 jsonrpc_session_get_name(s->js),
241 jsonrpc_msg_type_to_string(msg->type));
242 jsonrpc_session_force_reconnect(s->js);
243 jsonrpc_msg_destroy(msg);
247 return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
251 ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
253 struct ovsdb_jsonrpc_session *s, *next;
255 LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
257 int error = ovsdb_jsonrpc_session_run(s);
259 ovsdb_jsonrpc_session_close(s);
265 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
267 jsonrpc_session_wait(s->js);
268 if (!jsonrpc_session_get_backlog(s->js)) {
269 jsonrpc_session_recv_wait(s->js);
274 ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr)
276 struct ovsdb_jsonrpc_session *s;
278 LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
279 ovsdb_jsonrpc_session_wait(s);
283 static struct jsonrpc_msg *
284 execute_transaction(struct ovsdb_jsonrpc_session *s,
285 struct jsonrpc_msg *request)
287 ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
289 request->params = NULL;
294 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
295 struct jsonrpc_msg *request)
297 struct jsonrpc_msg *reply;
299 if (!strcmp(request->method, "transact")) {
300 reply = execute_transaction(s, request);
301 } else if (!strcmp(request->method, "monitor")) {
302 reply = jsonrpc_create_reply(
303 ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
304 } else if (!strcmp(request->method, "monitor_cancel")) {
305 reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
307 } else if (!strcmp(request->method, "get_schema")) {
308 reply = jsonrpc_create_reply(
309 ovsdb_schema_to_json(s->server->db->schema), request->id);
310 } else if (!strcmp(request->method, "echo")) {
311 reply = jsonrpc_create_reply(json_clone(request->params), request->id);
313 reply = jsonrpc_create_error(json_string_create("unknown method"),
318 jsonrpc_msg_destroy(request);
319 jsonrpc_session_send(s->js, reply);
324 execute_cancel(struct ovsdb_jsonrpc_session *s, struct jsonrpc_msg *request)
326 if (json_array(request->params)->n == 1) {
327 struct ovsdb_jsonrpc_trigger *t;
330 id = request->params->u.array.elems[0];
331 t = ovsdb_jsonrpc_trigger_find(s, id, json_hash(id, 0));
333 ovsdb_jsonrpc_trigger_complete(t);
339 ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
340 struct jsonrpc_msg *request)
342 if (!strcmp(request->method, "cancel")) {
343 execute_cancel(s, request);
345 jsonrpc_msg_destroy(request);
348 /* JSON-RPC database server triggers.
350 * (Every transaction is treated as a trigger even if it doesn't actually have
351 * any "wait" operations.) */
353 struct ovsdb_jsonrpc_trigger {
354 struct ovsdb_trigger trigger;
355 struct ovsdb_jsonrpc_session *session;
356 struct hmap_node hmap_node; /* In session's "triggers" hmap. */
361 ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
362 struct json *id, struct json *params)
364 struct ovsdb_jsonrpc_trigger *t;
367 /* Check for duplicate ID. */
368 hash = json_hash(id, 0);
369 t = ovsdb_jsonrpc_trigger_find(s, id, hash);
371 struct jsonrpc_msg *msg;
373 msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
375 jsonrpc_session_send(s->js, msg);
377 json_destroy(params);
381 /* Insert into trigger table. */
382 t = xmalloc(sizeof *t);
383 ovsdb_trigger_init(s->server->db,
384 &t->trigger, params, &s->completions,
388 hmap_insert(&s->triggers, &t->hmap_node, hash);
390 /* Complete early if possible. */
391 if (ovsdb_trigger_is_complete(&t->trigger)) {
392 ovsdb_jsonrpc_trigger_complete(t);
396 static struct ovsdb_jsonrpc_trigger *
397 ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
398 const struct json *id, size_t hash)
400 struct ovsdb_jsonrpc_trigger *t;
402 HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
404 if (json_equal(t->id, id)) {
413 ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
415 struct ovsdb_jsonrpc_session *s = t->session;
417 if (jsonrpc_session_is_connected(s->js)) {
418 struct jsonrpc_msg *reply;
421 result = ovsdb_trigger_steal_result(&t->trigger);
423 reply = jsonrpc_create_reply(result, t->id);
425 reply = jsonrpc_create_error(json_string_create("canceled"),
428 jsonrpc_session_send(s->js, reply);
432 ovsdb_trigger_destroy(&t->trigger);
433 hmap_remove(&s->triggers, &t->hmap_node);
438 ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
440 struct ovsdb_jsonrpc_trigger *t, *next;
441 HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
443 ovsdb_jsonrpc_trigger_complete(t);
448 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
450 while (!list_is_empty(&s->completions)) {
451 struct ovsdb_jsonrpc_trigger *t
452 = CONTAINER_OF(s->completions.next,
453 struct ovsdb_jsonrpc_trigger, trigger.node);
454 ovsdb_jsonrpc_trigger_complete(t);
458 /* JSON-RPC database table monitors. */
460 enum ovsdb_jsonrpc_monitor_selection {
461 OJMS_INITIAL = 1 << 0, /* All rows when monitor is created. */
462 OJMS_INSERT = 1 << 1, /* New rows. */
463 OJMS_DELETE = 1 << 2, /* Deleted rows. */
464 OJMS_MODIFY = 1 << 3 /* Modified rows. */
467 struct ovsdb_jsonrpc_monitor_table {
468 const struct ovsdb_table *table;
469 enum ovsdb_jsonrpc_monitor_selection select;
470 struct ovsdb_column_set columns;
473 struct ovsdb_jsonrpc_monitor {
474 struct ovsdb_replica replica;
475 struct ovsdb_jsonrpc_session *session;
476 struct hmap_node node; /* In ovsdb_jsonrpc_session's "monitors". */
478 struct json *monitor_id;
479 struct shash tables; /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
482 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
484 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
485 struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
486 static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
487 static struct json *ovsdb_jsonrpc_monitor_get_initial(
488 const struct ovsdb_jsonrpc_monitor *);
491 parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
493 const struct json *json;
495 json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
496 return json ? json_boolean(json) : default_value;
499 struct ovsdb_jsonrpc_monitor *
500 ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
501 const struct json *monitor_id)
503 struct ovsdb_jsonrpc_monitor *m;
505 HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node,
506 json_hash(monitor_id, 0), &s->monitors) {
507 if (json_equal(m->monitor_id, monitor_id)) {
516 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
519 struct ovsdb_jsonrpc_monitor *m = NULL;
520 struct json *monitor_id, *monitor_requests;
521 struct ovsdb_error *error = NULL;
522 struct shash_node *node;
525 if (json_array(params)->n != 2) {
526 error = ovsdb_syntax_error(params, NULL, "invalid parameters");
529 monitor_id = params->u.array.elems[0];
530 monitor_requests = params->u.array.elems[1];
531 if (monitor_requests->type != JSON_OBJECT) {
532 error = ovsdb_syntax_error(monitor_requests, NULL,
533 "monitor-requests must be object");
537 if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
538 error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
542 m = xzalloc(sizeof *m);
543 ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
544 ovsdb_add_replica(s->server->db, &m->replica);
546 hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
547 m->monitor_id = json_clone(monitor_id);
548 shash_init(&m->tables);
550 SHASH_FOR_EACH (node, json_object(monitor_requests)) {
551 const struct ovsdb_table *table;
552 struct ovsdb_jsonrpc_monitor_table *mt;
553 const struct json *columns_json, *select_json;
554 struct ovsdb_parser parser;
556 table = ovsdb_get_table(s->server->db, node->name);
558 error = ovsdb_syntax_error(NULL, NULL,
559 "no table named %s", node->name);
563 mt = xzalloc(sizeof *mt);
565 mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
566 ovsdb_column_set_init(&mt->columns);
567 shash_add(&m->tables, table->schema->name, mt);
569 ovsdb_parser_init(&parser, node->data, "table %s", node->name);
570 columns_json = ovsdb_parser_member(&parser, "columns",
571 OP_ARRAY | OP_OPTIONAL);
572 select_json = ovsdb_parser_member(&parser, "select",
573 OP_OBJECT | OP_OPTIONAL);
574 error = ovsdb_parser_finish(&parser);
580 error = ovsdb_column_set_from_json(columns_json, table,
586 struct shash_node *node;
588 SHASH_FOR_EACH (node, &table->schema->columns) {
589 const struct ovsdb_column *column = node->data;
590 if (column->index != OVSDB_COL_UUID) {
591 ovsdb_column_set_add(&mt->columns, column);
598 ovsdb_parser_init(&parser, select_json, "table %s select",
599 table->schema->name);
600 if (parse_bool(&parser, "initial", true)) {
601 mt->select |= OJMS_INITIAL;
603 if (parse_bool(&parser, "insert", true)) {
604 mt->select |= OJMS_INSERT;
606 if (parse_bool(&parser, "delete", true)) {
607 mt->select |= OJMS_DELETE;
609 if (parse_bool(&parser, "modify", true)) {
610 mt->select |= OJMS_MODIFY;
612 error = ovsdb_parser_finish(&parser);
619 return ovsdb_jsonrpc_monitor_get_initial(m);
623 ovsdb_remove_replica(s->server->db, &m->replica);
626 json = ovsdb_error_to_json(error);
627 ovsdb_error_destroy(error);
631 static struct jsonrpc_msg *
632 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
633 struct json_array *params,
634 const struct json *request_id)
636 if (params->n != 1) {
637 return jsonrpc_create_error(json_string_create("invalid parameters"),
640 struct ovsdb_jsonrpc_monitor *m;
642 m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
644 return jsonrpc_create_error(json_string_create("unknown monitor"),
647 ovsdb_remove_replica(s->server->db, &m->replica);
648 return jsonrpc_create_reply(json_object_create(), request_id);
654 ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
656 struct ovsdb_jsonrpc_monitor *m, *next;
658 HMAP_FOR_EACH_SAFE (m, next,
659 struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
660 ovsdb_remove_replica(s->server->db, &m->replica);
664 static struct ovsdb_jsonrpc_monitor *
665 ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
667 assert(replica->class == &ovsdb_jsonrpc_replica_class);
668 return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
671 struct ovsdb_jsonrpc_monitor_aux {
672 bool initial; /* Sending initial contents of table? */
673 const struct ovsdb_jsonrpc_monitor *monitor;
674 struct json *json; /* JSON for the whole transaction. */
677 struct ovsdb_jsonrpc_monitor_table *mt;
678 struct json *table_json; /* JSON for table's transaction. */
682 ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
683 const struct ovsdb_row *new,
686 struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
687 const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
688 struct ovsdb_table *table = new ? new->table : old->table;
689 enum ovsdb_jsonrpc_monitor_selection type;
690 struct json *old_json, *new_json;
691 struct json *row_json;
692 char uuid[UUID_LEN + 1];
696 if (!aux->mt || table != aux->mt->table) {
697 aux->mt = shash_find_data(&m->tables, table->schema->name);
698 aux->table_json = NULL;
700 /* We don't care about rows in this table at all. Tell the caller
706 type = (aux->initial ? OJMS_INITIAL
710 if (!(aux->mt->select & type)) {
711 /* We don't care about this type of change (but do want to be called
712 * back for changes to other rows in the same table). */
716 old_json = new_json = NULL;
718 for (i = 0; i < aux->mt->columns.n_columns; i++) {
719 const struct ovsdb_column *column = aux->mt->columns.columns[i];
720 unsigned int idx = column->index;
721 bool changed = false;
723 if (type == OJMS_MODIFY) {
724 changed = !ovsdb_datum_equals(&old->fields[idx],
725 &new->fields[idx], &column->type);
726 n_changed += changed;
728 if (changed || type == OJMS_DELETE) {
730 old_json = json_object_create();
732 json_object_put(old_json, column->name,
733 ovsdb_datum_to_json(&old->fields[idx],
736 if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
738 new_json = json_object_create();
740 json_object_put(new_json, column->name,
741 ovsdb_datum_to_json(&new->fields[idx],
745 if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) {
746 /* No reportable changes. */
747 json_destroy(old_json);
748 json_destroy(new_json);
752 /* Create JSON object for transaction overall. */
754 aux->json = json_object_create();
757 /* Create JSON object for transaction on this table. */
758 if (!aux->table_json) {
759 aux->table_json = json_object_create();
760 json_object_put(aux->json, aux->mt->table->schema->name,
764 /* Create JSON object for transaction on this row. */
765 row_json = json_object_create();
767 json_object_put(row_json, "old", old_json);
770 json_object_put(row_json, "new", new_json);
773 /* Add JSON row to JSON table. */
774 snprintf(uuid, sizeof uuid,
775 UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
776 json_object_put(aux->table_json, uuid, row_json);
782 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
783 const struct ovsdb_jsonrpc_monitor *m,
786 aux->initial = initial;
790 aux->table_json = NULL;
793 static struct ovsdb_error *
794 ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
795 const struct ovsdb_txn *txn, bool durable UNUSED)
797 struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
798 struct ovsdb_jsonrpc_monitor_aux aux;
800 ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
801 ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
803 struct jsonrpc_msg *msg;
806 params = json_array_create_2(json_clone(aux.monitor->monitor_id),
808 msg = jsonrpc_create_notify("update", params);
809 jsonrpc_session_send(aux.monitor->session->js, msg);
816 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
818 struct ovsdb_jsonrpc_monitor_aux aux;
819 struct shash_node *node;
821 ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
822 SHASH_FOR_EACH (node, &m->tables) {
823 struct ovsdb_jsonrpc_monitor_table *mt = node->data;
825 if (mt->select & OJMS_INITIAL) {
826 struct ovsdb_row *row;
828 HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
830 ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux);
834 return aux.json ? aux.json : json_object_create();
838 ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
840 struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
841 struct shash_node *node;
843 json_destroy(m->monitor_id);
844 SHASH_FOR_EACH (node, &m->tables) {
845 struct ovsdb_jsonrpc_monitor_table *mt = node->data;
846 ovsdb_column_set_destroy(&mt->columns);
849 shash_destroy(&m->tables);
850 hmap_remove(&m->session->monitors, &m->node);
854 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
855 ovsdb_jsonrpc_monitor_commit,
856 ovsdb_jsonrpc_monitor_destroy