Maintain separate async and sync connections to nl:0 in secchan.
authorBen Pfaff <blp@nicira.com>
Fri, 21 Nov 2008 21:05:37 +0000 (13:05 -0800)
committerBen Pfaff <blp@nicira.com>
Fri, 21 Nov 2008 21:05:37 +0000 (13:05 -0800)
When a network link is flooded with traffic, secchan's OpenFlow socket
queue becomes congested with traffic.  This leaves no room for replies
to OpenFlow requests relayed to that socket by secchan.

This commit modifies secchan to use separate sockets for asynchronous
traffic and for OpenFlow requests/replies, which should avoid the problem.

secchan/secchan.c
secchan/secchan.h

index 6f0a385..fa8d6e8 100644 (file)
@@ -87,11 +87,12 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(60, 60);
 static void parse_options(int argc, char *argv[], struct settings *);
 static void usage(void) NO_RETURN;
 
+static char *vconn_name_without_subscription(const char *);
 static struct pvconn *open_passive_vconn(const char *name);
 static struct vconn *accept_vconn(struct pvconn *pvconn);
 
-static struct relay *relay_create(struct rconn *local, struct rconn *remote,
-                                  bool is_mgmt_conn);
+static struct relay *relay_create(struct rconn *async,
+                                  struct rconn *local, struct rconn *remote);
 static struct relay *relay_accept(const struct settings *, struct pvconn *);
 static void relay_run(struct relay *, struct secchan *);
 static void relay_wait(struct relay *);
@@ -111,7 +112,8 @@ main(int argc, char *argv[])
     struct pvconn *listeners[MAX_MGMT];
     size_t n_listeners;
 
-    struct rconn *local_rconn, *remote_rconn;
+    char *local_rconn_name;
+    struct rconn *async_rconn, *local_rconn, *remote_rconn;
     struct relay *controller_relay;
     struct discovery *discovery;
     struct switch_status *switch_status;
@@ -152,14 +154,24 @@ main(int argc, char *argv[])
     VLOG_WARN("OpenFlow reference implementation version %s", VERSION BUILDNR);
     VLOG_WARN("OpenFlow protocol version 0x%02x", OFP_VERSION);
 
-    /* Connect to datapath. */
+    /* Check datapath name, to try to catch command-line invocation errors. */
     if (strncmp(s.dp_name, "nl:", 3) && strncmp(s.dp_name, "unix:", 5)
         && !s.controller_name) {
         VLOG_WARN("Controller not specified and datapath is not nl: or "
                   "unix:.  (Did you forget to specify the datapath?)");
     }
+
+    /* Connect to datapath with a subscription for asynchronous events. */
+    async_rconn = rconn_create(0, s.max_backoff);
+    rconn_connect(async_rconn, s.dp_name);
+    switch_status_register_category(switch_status, "async",
+                                    rconn_status_cb, async_rconn);
+
+    /* Connect to datapath without a subscription, for requests and replies. */
+    local_rconn_name = vconn_name_without_subscription(s.dp_name);
     local_rconn = rconn_create(0, s.max_backoff);
-    rconn_connect(local_rconn, s.dp_name);
+    rconn_connect(local_rconn, local_rconn_name);
+    free(local_rconn_name);
     switch_status_register_category(switch_status, "local",
                                     rconn_status_cb, local_rconn);
 
@@ -175,7 +187,7 @@ main(int argc, char *argv[])
                                     rconn_status_cb, remote_rconn);
 
     /* Start relaying. */
-    controller_relay = relay_create(local_rconn, remote_rconn, false);
+    controller_relay = relay_create(async_rconn, local_rconn, remote_rconn);
     list_push_back(&relays, &controller_relay->node);
 
     /* Set up hooks. */
@@ -221,6 +233,8 @@ main(int argc, char *argv[])
         if (monitor) {
             struct vconn *new = accept_vconn(monitor);
             if (new) {
+                /* XXX should monitor async_rconn too but rconn_add_monitor()
+                 * takes ownership of the vconn passed in. */
                 rconn_add_monitor(local_rconn, new);
             }
         }
@@ -341,36 +355,43 @@ get_ofp_packet_eth_header(struct relay *r, struct ofp_packet_in **opip,
 \f
 /* OpenFlow message relaying. */
 
-static struct relay *
-relay_accept(const struct settings *s, struct pvconn *pvconn)
+/* Returns a malloc'd string containing a copy of 'vconn_name' modified not to
+ * subscribe to asynchronous messages such as 'ofp_packet_in' events (if
+ * possible). */
+static char *
+vconn_name_without_subscription(const char *vconn_name)
 {
-    struct vconn *new_remote, *new_local;
-    struct rconn *r1, *r2;
-    char *vconn_name;
     int nl_index;
-    int retval;
-
-    new_remote = accept_vconn(pvconn);
-    if (!new_remote) {
-        return NULL;
-    }
-
-    if (sscanf(s->dp_name, "nl:%d", &nl_index) == 1) {
+    if (sscanf(vconn_name, "nl:%d", &nl_index) == 1) {
         /* nl:123 or nl:123:1 opens a netlink connection to local datapath 123.
          * nl:123:0 opens a netlink connection to local datapath 123 without
          * obtaining a subscription for ofp_packet_in or ofp_flow_expired
-         * messages.  That's what we want here; management connections should
-         * not receive those messages, at least by default. */
-        vconn_name = xasprintf("nl:%d:0", nl_index);
+         * messages. */
+        return xasprintf("nl:%d:0", nl_index);
     } else {
         /* We don't have a way to specify not to subscribe to those messages
          * for other transports.  (That's a defect: really this should be in
          * the OpenFlow protocol, not the Netlink transport). */
         VLOG_WARN_RL(&rl, "new management connection will receive "
                      "asynchronous messages");
-        vconn_name = xstrdup(s->dp_name);
+        return xstrdup(vconn_name);
+    }
+}
+
+static struct relay *
+relay_accept(const struct settings *s, struct pvconn *pvconn)
+{
+    struct vconn *new_remote, *new_local;
+    struct rconn *r1, *r2;
+    char *vconn_name;
+    int retval;
+
+    new_remote = accept_vconn(pvconn);
+    if (!new_remote) {
+        return NULL;
     }
 
+    vconn_name = vconn_name_without_subscription(s->dp_name);
     retval = vconn_open(vconn_name, OFP_VERSION, &new_local);
     if (retval) {
         VLOG_ERR_RL(&rl, "could not connect to %s (%s)",
@@ -388,16 +409,17 @@ relay_accept(const struct settings *s, struct pvconn *pvconn)
     r2 = rconn_create(0, 0);
     rconn_connect_unreliably(r2, "passive", new_remote);
 
-    return relay_create(r1, r2, true);
+    return relay_create(NULL, r1, r2);
 }
 
 static struct relay *
-relay_create(struct rconn *local, struct rconn *remote, bool is_mgmt_conn)
+relay_create(struct rconn *async, struct rconn *local, struct rconn *remote)
 {
     struct relay *r = xcalloc(1, sizeof *r);
     r->halves[HALF_LOCAL].rconn = local;
     r->halves[HALF_REMOTE].rconn = remote;
-    r->is_mgmt_conn = is_mgmt_conn;
+    r->is_mgmt_conn = async == NULL;
+    r->async_rconn = async;
     return r;
 }
 
@@ -433,6 +455,9 @@ relay_run(struct relay *r, struct secchan *secchan)
     int iteration;
     int i;
 
+    if (r->async_rconn) {
+        rconn_run(r->async_rconn);
+    }
     for (i = 0; i < 2; i++) {
         rconn_run(r->halves[i].rconn);
     }
@@ -446,6 +471,9 @@ relay_run(struct relay *r, struct secchan *secchan)
 
             if (!this->rxbuf) {
                 this->rxbuf = rconn_recv(this->rconn);
+                if (!this->rxbuf && i == HALF_LOCAL && r->async_rconn) {
+                    this->rxbuf = rconn_recv(r->async_rconn);
+                }
                 if (this->rxbuf && (i == HALF_REMOTE || !r->is_mgmt_conn)) {
                     if (i == HALF_LOCAL
                         ? call_local_packet_cbs(secchan, r)
@@ -493,12 +521,18 @@ relay_wait(struct relay *r)
 {
     int i;
 
+    if (r->async_rconn) {
+        rconn_run_wait(r->async_rconn);
+    }
     for (i = 0; i < 2; i++) {
         struct half *this = &r->halves[i];
 
         rconn_run_wait(this->rconn);
         if (!this->rxbuf) {
             rconn_recv_wait(this->rconn);
+            if (i == HALF_LOCAL && r->async_rconn) {
+                rconn_recv_wait(r->async_rconn);
+            }
         }
     }
 }
@@ -509,6 +543,7 @@ relay_destroy(struct relay *r)
     int i;
 
     list_remove(&r->node);
+    rconn_destroy(r->async_rconn);
     for (i = 0; i < 2; i++) {
         struct half *this = &r->halves[i];
         rconn_destroy(this->rconn);
index c7ead5a..1b08c69 100644 (file)
@@ -100,7 +100,18 @@ struct relay {
 #define HALF_REMOTE 1
     struct half halves[2];
 
-    bool is_mgmt_conn;
+    /* The secchan has a primary connection (relay) to an OpenFlow controller.
+     * This primary connection actually makes two connections to the datapath:
+     * one for OpenFlow requests and responses, and one that is only used for
+     * receiving asynchronous events such as 'ofp_packet_in' events.  This
+     * design keeps replies to OpenFlow requests from being dropped by the
+     * kernel due to a flooded network device.
+     *
+     * The secchan may also have any number of secondary "management"
+     * connections (relays).  These connections do not receive asychronous
+     * events and thus have a null 'async_rconn'. */
+    bool is_mgmt_conn;          /* Is this a management connection? */
+    struct rconn *async_rconn;  /* For receiving asynchronous events. */
 };
 
 struct hook_class {