jsonrpc: Return received JSON-RPC messages immediately in jsonrpc_recv().
authorBen Pfaff <blp@nicira.com>
Thu, 3 Apr 2014 22:27:18 +0000 (15:27 -0700)
committerBen Pfaff <blp@nicira.com>
Thu, 3 Apr 2014 23:11:09 +0000 (16:11 -0700)
Until now, jsonrpc_recv() used separate iterations of its loop to receive
data, feed it to the JSON-RPC parser, and return the received message.
This is unnecessarily complicated and can occasionally mean that the
jsonrpc object has received and parsed but not returned a message.  This
commit refactors the code to receive data, feed it to the parse, and
return the received message in a single iteration, and simplifies the code
in the process.

Reported-by: Chris Hydon <chydon@aristanetworks.com>
Signed-off-by: Ben Pfaff <blp@nicira.com>
AUTHORS
lib/jsonrpc.c

diff --git a/AUTHORS b/AUTHORS
index 7c84c2b..78640b8 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
@@ -159,6 +159,7 @@ Brent Salisbury         brent.salisbury@gmail.com
 Bryan Fulton            bryan@nicira.com
 Bryan Osoro             bosoro@nicira.com
 Cedric Hobbs            cedric@nicira.com
+Chris Hydon             chydon@aristanetworks.com
 Christopher Paggen      cpaggen@cisco.com
 Dave Walker             DaveWalker@ubuntu.com
 David Palma             palma@onesource.pt
index bda0f7f..842d117 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -44,7 +44,6 @@ struct jsonrpc {
     struct byteq input;
     uint8_t input_buffer[512];
     struct json_parser *parser;
-    struct jsonrpc_msg *received;
 
     /* Output. */
     struct list output;         /* Contains "struct ofpbuf"s. */
@@ -54,7 +53,7 @@ struct jsonrpc {
 /* Rate limit for error messages. */
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
 
-static void jsonrpc_received(struct jsonrpc *);
+static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
 static void jsonrpc_cleanup(struct jsonrpc *);
 static void jsonrpc_error(struct jsonrpc *, int error);
 
@@ -293,11 +292,10 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
     }
 
     for (i = 0; i < 50; i++) {
-        if (rpc->received) {
-            *msgp = rpc->received;
-            rpc->received = NULL;
-            return 0;
-        } else if (byteq_is_empty(&rpc->input)) {
+        size_t n, used;
+
+        /* Fill our input buffer if it's empty. */
+        if (byteq_is_empty(&rpc->input)) {
             size_t chunk;
             int retval;
 
@@ -317,27 +315,31 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
                 return EOF;
             }
             byteq_advance_head(&rpc->input, retval);
-        } else {
-            size_t n, used;
+        }
 
-            if (!rpc->parser) {
-                rpc->parser = json_parser_create(0);
+        /* We have some input.  Feed it into the JSON parser. */
+        if (!rpc->parser) {
+            rpc->parser = json_parser_create(0);
+        }
+        n = byteq_tailroom(&rpc->input);
+        used = json_parser_feed(rpc->parser,
+                                (char *) byteq_tail(&rpc->input), n);
+        byteq_advance_tail(&rpc->input, used);
+
+        /* If we have complete JSON, attempt to parse it as JSON-RPC. */
+        if (json_parser_is_done(rpc->parser)) {
+            *msgp = jsonrpc_parse_received_message(rpc);
+            if (*msgp) {
+                return 0;
             }
-            n = byteq_tailroom(&rpc->input);
-            used = json_parser_feed(rpc->parser,
-                                    (char *) byteq_tail(&rpc->input), n);
-            byteq_advance_tail(&rpc->input, used);
-            if (json_parser_is_done(rpc->parser)) {
-                jsonrpc_received(rpc);
-                if (rpc->status) {
-                    const struct byteq *q = &rpc->input;
-                    if (q->head <= q->size) {
-                        stream_report_content(q->buffer, q->head,
-                                              STREAM_JSONRPC,
-                                              THIS_MODULE, rpc->name);
-                    }
-                    return rpc->status;
+
+            if (rpc->status) {
+                const struct byteq *q = &rpc->input;
+                if (q->head <= q->size) {
+                    stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
+                                          THIS_MODULE, rpc->name);
                 }
+                return rpc->status;
             }
         }
     }
@@ -350,7 +352,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
 void
 jsonrpc_recv_wait(struct jsonrpc *rpc)
 {
-    if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
+    if (rpc->status || !byteq_is_empty(&rpc->input)) {
         poll_immediate_wake_at(rpc->name);
     } else {
         stream_recv_wait(rpc->stream);
@@ -440,8 +442,11 @@ jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
     return error;
 }
 
-static void
-jsonrpc_received(struct jsonrpc *rpc)
+/* Attempts to parse the content of 'rpc->parser' (which is complete JSON) as a
+ * JSON-RPC message.  If successful, returns the JSON-RPC message.  On failure,
+ * signals an error on 'rpc' with jsonrpc_error() and returns NULL. */
+static struct jsonrpc_msg *
+jsonrpc_parse_received_message(struct jsonrpc *rpc)
 {
     struct jsonrpc_msg *msg;
     struct json *json;
@@ -454,7 +459,7 @@ jsonrpc_received(struct jsonrpc *rpc)
                      rpc->name, json_string(json));
         jsonrpc_error(rpc, EPROTO);
         json_destroy(json);
-        return;
+        return NULL;
     }
 
     error = jsonrpc_msg_from_json(json, &msg);
@@ -463,11 +468,11 @@ jsonrpc_received(struct jsonrpc *rpc)
                      rpc->name, error);
         free(error);
         jsonrpc_error(rpc, EPROTO);
-        return;
+        return NULL;
     }
 
     jsonrpc_log_msg(rpc, "received", msg);
-    rpc->received = msg;
+    return msg;
 }
 
 static void
@@ -489,9 +494,6 @@ jsonrpc_cleanup(struct jsonrpc *rpc)
     json_parser_abort(rpc->parser);
     rpc->parser = NULL;
 
-    jsonrpc_msg_destroy(rpc->received);
-    rpc->received = NULL;
-
     ofpbuf_list_delete(&rpc->output);
     rpc->backlog = 0;
 }