+static void
+ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
+{
+ reconnect_disconnected(s->reconnect, time_msec(), 0);
+ if (s->rpc) {
+ struct ovsdb_jsonrpc_trigger *t, *next;
+
+ jsonrpc_error(s->rpc, EOF);
+ HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
+ &s->triggers) {
+ ovsdb_jsonrpc_trigger_complete(t);
+ }
+
+ jsonrpc_close(s->rpc);
+ s->rpc = NULL;
+ } else if (s->stream) {
+ stream_close(s->stream);
+ s->stream = NULL;
+ }
+}
+
+static void
+ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
+{
+ ovsdb_jsonrpc_session_disconnect(s);
+ if (s->active) {
+ int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
+ if (error) {
+ reconnect_connect_failed(s->reconnect, time_msec(), error);
+ } else {
+ reconnect_connecting(s->reconnect, time_msec());
+ }
+ }
+}
+
+static int
+ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
+{
+ if (s->rpc) {
+ struct jsonrpc_msg *msg;
+ int error;
+
+ jsonrpc_run(s->rpc);
+
+ while (!list_is_empty(&s->completions)) {
+ struct ovsdb_jsonrpc_trigger *t
+ = CONTAINER_OF(s->completions.next,
+ struct ovsdb_jsonrpc_trigger, trigger.node);
+ ovsdb_jsonrpc_trigger_complete(t);
+ }
+
+ if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
+ reconnect_received(s->reconnect, time_msec());
+ if (msg->type == JSONRPC_REQUEST) {
+ ovsdb_jsonrpc_session_got_request(s, msg);
+ } else if (msg->type == JSONRPC_NOTIFY) {
+ ovsdb_jsonrpc_session_got_notify(s, msg);
+ } else {
+ VLOG_WARN("%s: received unexpected %s message",
+ jsonrpc_get_name(s->rpc),
+ jsonrpc_msg_type_to_string(msg->type));
+ jsonrpc_error(s->rpc, EPROTO);
+ jsonrpc_msg_destroy(msg);
+ }
+ }
+
+ error = jsonrpc_get_status(s->rpc);
+ if (error) {
+ if (s->active) {
+ ovsdb_jsonrpc_session_disconnect(s);
+ } else {
+ return error;
+ }
+ }
+ } else if (s->stream) {
+ int error = stream_connect(s->stream);
+ if (!error) {
+ reconnect_connected(s->reconnect, time_msec());
+ s->rpc = jsonrpc_open(s->stream);
+ s->stream = NULL;
+ } else if (error != EAGAIN) {
+ reconnect_connect_failed(s->reconnect, time_msec(), error);
+ stream_close(s->stream);
+ s->stream = NULL;
+ }
+ }
+
+ switch (reconnect_run(s->reconnect, time_msec())) {
+ case RECONNECT_CONNECT:
+ ovsdb_jsonrpc_session_connect(s);
+ break;
+
+ case RECONNECT_DISCONNECT:
+ ovsdb_jsonrpc_session_disconnect(s);
+ break;
+
+ case RECONNECT_PROBE:
+ if (s->rpc) {
+ struct json *params = json_integer_create(0);
+ jsonrpc_send(s->rpc, jsonrpc_create_request("echo", params));
+ }
+ break;
+ }
+ return s->active || s->rpc ? 0 : ETIMEDOUT;
+
+}
+
+static void
+ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
+{
+ if (s->rpc) {
+ jsonrpc_wait(s->rpc);
+ if (!jsonrpc_get_backlog(s->rpc)) {
+ jsonrpc_recv_wait(s->rpc);
+ }
+ } else if (s->stream) {
+ stream_connect_wait(s->stream);
+ }
+ reconnect_wait(s->reconnect, time_msec());
+}
+