/*
- * Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include "jsonrpc.h"
-#include <assert.h>
#include <errno.h>
#include "byteq.h"
#include "json.h"
#include "list.h"
#include "ofpbuf.h"
+#include "ovs-thread.h"
#include "poll-loop.h"
#include "reconnect.h"
#include "stream.h"
/* Input. */
struct byteq input;
+ uint8_t input_buffer[512];
struct json_parser *parser;
struct jsonrpc_msg *received;
{
struct jsonrpc *rpc;
- assert(stream != NULL);
+ ovs_assert(stream != NULL);
rpc = xzalloc(sizeof *rpc);
rpc->name = xstrdup(stream_get_name(stream));
rpc->stream = stream;
- byteq_init(&rpc->input);
+ byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
list_init(&rpc->output);
return rpc;
} else {
if (retval != -EAGAIN) {
VLOG_WARN_RL(&rl, "%s: send error: %s",
- rpc->name, strerror(-retval));
+ rpc->name, ovs_strerror(-retval));
jsonrpc_error(rpc, -retval);
}
break;
return EAGAIN;
} else {
VLOG_WARN_RL(&rl, "%s: receive error: %s",
- rpc->name, strerror(-retval));
+ rpc->name, ovs_strerror(-retval));
jsonrpc_error(rpc, -retval);
return rpc->status;
}
jsonrpc_received(rpc);
if (rpc->status) {
const struct byteq *q = &rpc->input;
- if (q->head <= BYTEQ_SIZE) {
+ if (q->head <= q->size) {
stream_report_content(q->buffer, q->head,
STREAM_JSONRPC,
THIS_MODULE, rpc->name);
static void
jsonrpc_error(struct jsonrpc *rpc, int error)
{
- assert(error);
+ ovs_assert(error);
if (!rpc->status) {
rpc->status = error;
jsonrpc_cleanup(rpc);
static struct json *
jsonrpc_create_id(void)
{
- static unsigned int id;
- return json_integer_create(id++);
+ static pthread_mutex_t mutex = PTHREAD_ADAPTIVE_MUTEX_INITIALIZER;
+ static unsigned int next_id;
+ unsigned int id;
+
+ xpthread_mutex_lock(&mutex);
+ id = next_id++;
+ xpthread_mutex_unlock(&mutex);
+
+ return json_integer_create(id);
}
struct jsonrpc_msg *
struct jsonrpc *rpc;
struct stream *stream;
struct pstream *pstream;
+ int last_error;
unsigned int seqno;
uint8_t dscp;
};
* acceptable to stream_open() or pstream_open().
*
* If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
- * jsonrpc_session connects and reconnects, with back-off, to 'name'.
+ * jsonrpc_session connects to 'name'. If 'retry' is true, then the new
+ * session connects and reconnects to 'name', with backoff. If 'retry' is
+ * false, the new session will only try to connect once and after a connection
+ * failure or a disconnection jsonrpc_session_is_alive() will return false for
+ * the new session.
*
* If 'name' is a passive connection method, e.g. "ptcp:", the new
* jsonrpc_session listens for connections to 'name'. It maintains at most one
* connection at any given time. Any new connection causes the previous one
* (if any) to be dropped. */
struct jsonrpc_session *
-jsonrpc_session_open(const char *name)
+jsonrpc_session_open(const char *name, bool retry)
{
struct jsonrpc_session *s;
s->pstream = NULL;
s->seqno = 0;
s->dscp = 0;
+ s->last_error = 0;
if (!pstream_verify_name(name)) {
reconnect_set_passive(s->reconnect, true, time_msec());
+ } else if (!retry) {
+ reconnect_set_max_tries(s->reconnect, 1);
+ reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX);
}
if (!stream_or_pstream_needs_probes(name)) {
* On the assumption that such connections are likely to be short-lived
* (e.g. from ovs-vsctl), informational logging for them is suppressed. */
struct jsonrpc_session *
-jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
+jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
{
struct jsonrpc_session *s;
reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
reconnect_set_max_tries(s->reconnect, 0);
reconnect_connected(s->reconnect, time_msec());
- s->dscp = 0;
+ s->dscp = dscp;
s->rpc = jsonrpc;
s->stream = NULL;
s->pstream = NULL;
error = jsonrpc_stream_open(name, &s->stream, s->dscp);
if (!error) {
reconnect_connecting(s->reconnect, time_msec());
+ } else {
+ s->last_error = error;
}
} else {
error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream,
if (error) {
reconnect_disconnected(s->reconnect, time_msec(), error);
jsonrpc_session_disconnect(s);
+ s->last_error = error;
}
} else if (s->stream) {
int error;
return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0;
}
+int
+jsonrpc_session_get_last_error(const struct jsonrpc_session *s)
+{
+ return s->last_error;
+}
+
void
jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s,
struct reconnect_stats *stats)
uint8_t dscp)
{
if (s->dscp != dscp) {
+ if (s->pstream) {
+ int error;
+
+ error = pstream_set_dscp(s->pstream, dscp);
+ if (error) {
+ VLOG_ERR("%s: failed set_dscp %s",
+ reconnect_get_name(s->reconnect),
+ ovs_strerror(error));
+ }
+ /*
+ * XXX race window between setting dscp to listening socket
+ * and accepting socket. accepted socket may have old dscp value.
+ * Ignore this race window for now.
+ */
+ }
s->dscp = dscp;
jsonrpc_session_force_reconnect(s);
}