#include "ovsdb-error.h"
#include "ovsdb-parser.h"
#include "ovsdb.h"
+#include "poll-loop.h"
#include "reconnect.h"
#include "row.h"
#include "server.h"
struct ovsdb_jsonrpc_remote_status *);
static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
+static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
+ struct jsonrpc_msg *);
/* Triggers. */
static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
struct json_array *params,
const struct json *request_id);
static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
+static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
\f
/* JSON-RPC database server. */
ovsdb_jsonrpc_trigger_complete_done(s);
if (!jsonrpc_session_get_backlog(s->js)) {
- struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
+ struct jsonrpc_msg *msg;
+
+ ovsdb_jsonrpc_monitor_flush_all(s);
+
+ msg = jsonrpc_session_recv(s->js);
if (msg) {
if (msg->type == JSONRPC_REQUEST) {
ovsdb_jsonrpc_session_got_request(s, msg);
{
jsonrpc_session_wait(s->js);
if (!jsonrpc_session_get_backlog(s->js)) {
- jsonrpc_session_recv_wait(s->js);
+ if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
+ poll_immediate_wake();
+ } else {
+ jsonrpc_session_recv_wait(s->js);
+ }
}
}
s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
params = json_array_create_1(json_string_create(lock_name));
- jsonrpc_session_send(s->js, jsonrpc_create_notify(method, params));
+ ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params));
}
static struct jsonrpc_msg *
if (reply) {
jsonrpc_msg_destroy(request);
- jsonrpc_session_send(s->js, reply);
+ ovsdb_jsonrpc_session_send(s, reply);
}
}
}
jsonrpc_msg_destroy(request);
}
+
+static void
+ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
+ struct jsonrpc_msg *msg)
+{
+ ovsdb_jsonrpc_monitor_flush_all(s);
+ jsonrpc_session_send(s->js, msg);
+}
\f
/* JSON-RPC database server triggers.
*
msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
id);
- jsonrpc_session_send(s->js, msg);
+ ovsdb_jsonrpc_session_send(s, msg);
json_destroy(id);
json_destroy(params);
return;
reply = jsonrpc_create_error(json_string_create("canceled"),
t->id);
}
- jsonrpc_session_send(s->js, reply);
+ ovsdb_jsonrpc_session_send(s, reply);
}
json_destroy(t->id);
return json;
}
+static bool
+ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
+{
+ struct ovsdb_jsonrpc_monitor *m;
+
+ HMAP_FOR_EACH (m, node, &s->monitors) {
+ struct shash_node *node;
+
+ SHASH_FOR_EACH (node, &m->tables) {
+ struct ovsdb_jsonrpc_monitor_table *mt = node->data;
+
+ if (!hmap_is_empty(&mt->changes)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
+static void
+ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
+{
+ struct ovsdb_jsonrpc_monitor *m;
+
+ HMAP_FOR_EACH (m, node, &s->monitors) {
+ struct json *json;
+
+ json = ovsdb_jsonrpc_monitor_compose_table_update(m, false);
+ if (json) {
+ struct jsonrpc_msg *msg;
+ struct json *params;
+
+ params = json_array_create_2(json_clone(m->monitor_id), json);
+ msg = jsonrpc_create_notify("update", params);
+ jsonrpc_session_send(s->js, msg);
+ }
+ }
+}
+
static void
ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
const struct ovsdb_jsonrpc_monitor *m)
{
struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
struct ovsdb_jsonrpc_monitor_aux aux;
- struct json *json;
ovsdb_jsonrpc_monitor_init_aux(&aux, m);
ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
- json = ovsdb_jsonrpc_monitor_compose_table_update(m, false);
- if (json) {
- struct jsonrpc_msg *msg;
- struct json *params;
-
- params = json_array_create_2(json_clone(aux.monitor->monitor_id),
- json);
- msg = jsonrpc_create_notify("update", params);
- jsonrpc_session_send(aux.monitor->session->js, msg);
- }
return NULL;
}
EXECUTION_EXAMPLES
\f
+AT_BANNER([ovsdb-server miscellaneous features])
+
AT_SETUP([truncating corrupted database log])
AT_KEYWORDS([ovsdb server positive unix])
OVS_RUNDIR=`pwd`; export OVS_RUNDIR
], [], [test ! -e pid || kill `cat pid`])
OVSDB_SERVER_SHUTDOWN
AT_CLEANUP
+
+AT_SETUP([ovsdb-server combines updates on backlogged connections])
+OVS_LOGDIR=`pwd`; export OVS_LOGDIR
+OVS_RUNDIR=`pwd`; export OVS_RUNDIR
+ON_EXIT([kill `cat *.pid`])
+
+# The maximum socket receive buffer size is important for this test, which
+# tests behavior when the receive buffer overflows.
+if test -e /proc/sys/net/core/rmem_max; then
+ # Linux
+ rmem_max=`cat /proc/sys/net/core/rmem_max`
+elif rmem_max=`sysctl -n net.inet.tcp.recvbuf_max 2>/dev/null`; then
+ : # FreeBSD
+else
+ # Don't know how to get maximum socket receive buffer on this OS
+ AT_SKIP_IF([:])
+fi
+
+# Calculate the number of iterations we need to queue. Each of the
+# iterations we execute, by itself, yields a monitor update of about
+# 25 kB, so fill up that much space plus a few for luck.
+n_iterations=`expr $rmem_max / 2500 + 5`
+echo rmem_max=$rmem_max n_iterations=$n_iterations
+
+# Calculate the exact number of monitor updates expected for $n_iterations,
+# assuming no updates are combined. The "extra" update is for the initial
+# contents of the database.
+n_updates=`expr $n_iterations \* 3 + 1`
+
+# Start an ovsdb-server with the vswitchd schema.
+OVSDB_INIT([db])
+AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --log-file --remote=punix:db.sock db],
+ [0], [ignore], [ignore])
+
+# Executes a set of transactions that add a bridge with 100 ports, and
+# then deletes that bridge. This yields three monitor updates that
+# add up to about 25 kB in size.
+#
+# The update also increments a counter held in the database so that we can
+# verify that the overall effect of the transactions took effect (e.g.
+# monitor updates at the end weren't just dropped). We add an arbitrary
+# string to the counter to make grepping for it more reliable.
+counter=0
+trigger_big_update () {
+ counter=`expr $counter + 1`
+ ovs-vsctl --no-wait -- set open_vswitch . system_version=xyzzy$counter
+ ovs-vsctl --no-wait -- add-br br0 $add
+ ovs-vsctl --no-wait -- del-br br0
+}
+add_ports () {
+ for j in `seq 1 100`; do
+ printf " -- add-port br0 p%d" $j
+ done
+}
+add=`add_ports`
+
+AT_CAPTURE_FILE([ovsdb-client.err])
+
+# Start an ovsdb-client monitoring all changes to the database,
+# make it block to force the buffers to fill up, and then execute
+# enough iterations that ovsdb-server starts combining updates.
+AT_CHECK([ovsdb-client --detach --no-chdir --pidfile monitor ALL >ovsdb-client.out 2>ovsdb-client.err])
+AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/block])
+for i in `seq 1 $n_iterations`; do
+ echo "blocked update ($i of $n_iterations)"
+ trigger_big_update $i
+done
+AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/unblock])
+OVS_WAIT_UNTIL([grep "\"xyzzy$counter\"" ovsdb-client.out])
+AT_CHECK([ovs-appctl -t ovsdb-client exit])
+OVS_WAIT_WHILE([test -e ovsdb-client.pid])
+
+# Count the number of updates in the ovsdb-client output, by counting
+# the number of changes to the Open_vSwitch table. (All of our
+# transactions modify the Open_vSwitch table.) It should be less than
+# $n_updates updates.
+#
+# Check that the counter is what we expect.
+logged_updates=`grep -c '^Open_vSwitch' ovsdb-client.out`
+echo "logged_updates=$logged_updates (expected less than $n_updates)"
+AT_CHECK([test $logged_updates -lt $n_updates])
+AT_CHECK_UNQUOTED([ovs-vsctl get open_vswitch . system_version], [0],
+ ["xyzzy$counter"
+])
+
+# Start an ovsdb-client monitoring all changes to the database,
+# without making it block, and then execute the same transactions that
+# we did before.
+AT_CHECK([ovsdb-client --detach --no-chdir --pidfile monitor ALL >ovsdb-client.out 2>ovsdb-client.err])
+for i in `seq 1 $n_iterations`; do
+ echo "unblocked update ($i of $n_iterations)"
+ trigger_big_update
+
+ # Make sure that ovsdb-client gets enough CPU time to process the updates.
+ ovs-appctl -t ovsdb-client version > /dev/null
+done
+OVS_WAIT_UNTIL([grep "\"xyzzy$counter\"" ovsdb-client.out])
+AT_CHECK([ovs-appctl -t ovsdb-client exit])
+OVS_WAIT_WHILE([test -e ovsdb-client.pid])
+
+# The ovsdb-client output should have exactly $n_updates updates.
+#
+# Also check that the counter is what we expect.
+logged_updates=`grep -c '^Open_vSwitch' ovsdb-client.out`
+echo "logged_updates=$logged_updates (expected $n_updates)"
+AT_CHECK([test $logged_updates -eq $n_updates])
+AT_CHECK_UNQUOTED([ovs-vsctl get open_vswitch . system_version], [0],
+ ["xyzzy$counter"
+])
+AT_CLEANUP
\f
AT_BANNER([OVSDB -- ovsdb-server transactions (SSL IPv4 sockets)])