/*
- * Copyright (c) 2009, 2010 Nicira Networks.
+ * Copyright (c) 2009, 2010, 2011 Nicira Networks.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include "list.h"
#include "ofpbuf.h"
#include "poll-loop.h"
-#include "queue.h"
#include "reconnect.h"
#include "stream.h"
#include "timeval.h"
#include "vlog.h"
-VLOG_DEFINE_THIS_MODULE(jsonrpc)
+VLOG_DEFINE_THIS_MODULE(jsonrpc);
\f
struct jsonrpc {
struct stream *stream;
struct jsonrpc_msg *received;
/* Output. */
- struct ovs_queue output;
+ struct list output; /* Contains "struct ofpbuf"s. */
size_t backlog;
};
rpc->name = xstrdup(stream_get_name(stream));
rpc->stream = stream;
byteq_init(&rpc->input);
- queue_init(&rpc->output);
+ list_init(&rpc->output);
return rpc;
}
}
stream_run(rpc->stream);
- while (!queue_is_empty(&rpc->output)) {
- struct ofpbuf *buf = rpc->output.head;
+ while (!list_is_empty(&rpc->output)) {
+ struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
int retval;
retval = stream_send(rpc->stream, buf->data, buf->size);
rpc->backlog -= retval;
ofpbuf_pull(buf, retval);
if (!buf->size) {
- ofpbuf_delete(queue_pop_head(&rpc->output));
+ list_remove(&buf->list_node);
+ ofpbuf_delete(buf);
}
} else {
if (retval != -EAGAIN) {
{
if (!rpc->status) {
stream_run_wait(rpc->stream);
- if (!queue_is_empty(&rpc->output)) {
+ if (!list_is_empty(&rpc->output)) {
stream_send_wait(rpc->stream);
}
}
}
+/*
+ * Possible status values:
+ * - 0: no error yet
+ * - >0: errno value
+ * - EOF: end of file (remote end closed connection; not necessarily an error)
+ */
int
jsonrpc_get_status(const struct jsonrpc *rpc)
{
buf = xmalloc(sizeof *buf);
ofpbuf_use(buf, s, length);
buf->size = length;
- queue_push_tail(&rpc->output, buf);
+ list_push_back(&rpc->output, &buf->list_node);
rpc->backlog += length;
- if (rpc->output.n == 1) {
+ if (rpc->backlog == length) {
jsonrpc_run(rpc);
}
return rpc->status;
jsonrpc_recv_wait(struct jsonrpc *rpc)
{
if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
- poll_immediate_wake();
+ (poll_immediate_wake)(rpc->name);
} else {
stream_recv_wait(rpc->stream);
}
for (;;) {
jsonrpc_run(rpc);
- if (queue_is_empty(&rpc->output) || rpc->status) {
+ if (list_is_empty(&rpc->output) || rpc->status) {
return rpc->status;
}
jsonrpc_wait(rpc);
jsonrpc_msg_destroy(rpc->received);
rpc->received = NULL;
- queue_clear(&rpc->output);
+ ofpbuf_list_delete(&rpc->output);
rpc->backlog = 0;
}
\f
return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
}
+/* Always returns a pointer to a valid C string, assuming 's' was initialized
+ * correctly. */
const char *
jsonrpc_session_get_name(const struct jsonrpc_session *s)
{
return s->seqno;
}
+int
+jsonrpc_session_get_status(const struct jsonrpc_session *s)
+{
+ return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0;
+}
+
+void
+jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s,
+ struct reconnect_stats *stats)
+{
+ reconnect_get_stats(s->reconnect, time_msec(), stats);
+}
+
void
jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
{
reconnect_force_reconnect(s->reconnect, time_msec());
}
+
+void
+jsonrpc_session_set_max_backoff(struct jsonrpc_session *s, int max_backoff)
+{
+ reconnect_set_backoff(s->reconnect, 0, max_backoff);
+}
+
+void
+jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
+ int probe_interval)
+{
+ reconnect_set_probe_interval(s->reconnect, probe_interval);
+}