Catalli's threaded switch orig-parallel
authorGiuseppe Lettieri <g.lettieri@iet.unipi.it>
Thu, 17 May 2012 13:11:28 +0000 (15:11 +0200)
committerGiuseppe Lettieri <g.lettieri@iet.unipi.it>
Thu, 17 May 2012 13:11:28 +0000 (15:11 +0200)
20 files changed:
configure.ac
lib/dispatch.h [new file with mode: 0644]
lib/dpif-linux.c
lib/dpif-netdev.c
lib/dpif-provider.h
lib/dpif.c
lib/dpif.h
lib/fatal-signal.c
lib/fatal-signal.h
lib/netdev-linux.c
lib/netdev-patch.c
lib/netdev-provider.h
lib/netdev.c
lib/netdev.h
m4/openvswitch.m4
ofproto/ofproto.c
utilities/ovs-ofctl.c
utilities/ovs-openflowd.c
vswitchd/bridge.c
vswitchd/ovs-vswitchd.c

index 08a6f0f..2ef1383 100644 (file)
@@ -42,6 +42,7 @@ AC_SYS_LARGEFILE
 AC_SEARCH_LIBS([pow], [m])
 AC_SEARCH_LIBS([clock_gettime], [rt])
 
+OVS_CHECK_THREADED
 OVS_CHECK_COVERAGE
 OVS_CHECK_NDEBUG
 OVS_CHECK_NETLINK
diff --git a/lib/dispatch.h b/lib/dispatch.h
new file mode 100644 (file)
index 0000000..1bc1167
--- /dev/null
@@ -0,0 +1,16 @@
+#include <sys/types.h>
+#include <sys/time.h>
+
+#ifndef DISPATCH_H
+#define DISPATCH_H 1
+
+struct pkthdr {
+       struct timeval ts;      /* time stamp */
+       uint32_t caplen;        /* length of portion present */
+       uint32_t len;   /* length this packet (off wire) */
+};
+
+typedef void (*pkt_handler)(u_char *user, const struct pkthdr *h, 
+        const u_char *pkt);
+
+#endif /* DISPATCH_H */
index 2c688e3..6297f8c 100644 (file)
@@ -524,6 +524,10 @@ const struct dpif_class dpif_linux_class = {
     "system",
     NULL,
     NULL,
+#ifdef THREADED
+    NULL,
+    NULL,
+#endif
     dpif_linux_enumerate,
     dpif_linux_open,
     dpif_linux_close,
index 323f364..ba05eca 100644 (file)
 #include <sys/stat.h>
 #include <unistd.h>
 
+#ifdef THREADED
+#include <signal.h>
+#include <pthread.h>
+
+#include "socket-util.h"
+#include "fatal-signal.h"
+#include "dispatch.h"
+#endif
+
 #include "csum.h"
 #include "dpif-provider.h"
 #include "flow.h"
 
 VLOG_DEFINE_THIS_MODULE(dpif_netdev)
 
+/* We could use these macros instead of using #ifdef and #endif every time we
+ * need to call the pthread_mutex_lock/unlock.
+#ifdef THREADED
+#define LOCK(mutex) pthread_mutex_lock(mutex)
+#define UNLOCK(mutex) pthread_mutex_unlock(mutex)
+#else
+#define LOCK(mutex)
+#define UNLOCK(mutex)
+#endif
+*/
+
 /* Configuration parameters. */
 enum { N_QUEUES = 2 };          /* Number of queues for dpif_recv(). */
 enum { MAX_QUEUE_LEN = 100 };   /* Maximum number of packets per queue. */
@@ -68,8 +88,25 @@ struct dp_netdev {
     bool destroyed;
 
     bool drop_frags;            /* Drop all IP fragments, if true. */
-    struct ovs_queue queues[N_QUEUES]; /* Messages queued for dpif_recv(). */
+
+#ifdef THREADED
+    /* The pipe is used to signal the presence of a packet on the queue.
+     * - dpif_netdev_recv_wait() waits on p[0]
+     * - dpif_netdev_recv() extract from queue and read p[0]
+     * - dp_netdev_output_control() send to queue and write p[1]
+     */
+
+    /* The access to this queue is protected by the table_mutex mutex */
+    int pipe[2];    /* signal a packet on the queue */
+
+    pthread_mutex_t table_mutex;    /* mutex for the flow table */
+    pthread_mutex_t port_list_mutex;    /* port list mutex */
+#endif
+    
+    struct ovs_queue queues[N_QUEUES];  /* messages queued for dpif_recv(). */
+
     struct hmap flow_table;     /* Flow table. */
+
     struct odp_port_group groups[N_GROUPS];
 
     /* Statistics. */
@@ -79,9 +116,9 @@ struct dp_netdev {
     long long int n_lost;       /* Number of misses not passed to client. */
 
     /* Ports. */
-    int n_ports;
     struct dp_netdev_port *ports[MAX_PORTS];
     struct list port_list;
+    int n_ports;
     unsigned int serial;
 };
 
@@ -90,7 +127,12 @@ struct dp_netdev_port {
     int port_no;                /* Index into dp_netdev's 'ports'. */
     struct list node;           /* Element in dp_netdev's 'port_list'. */
     struct netdev *netdev;
+
     bool internal;              /* Internal port (as ODP_PORT_INTERNAL)? */
+#ifdef THREADED
+    struct pollfd *poll_fd;     /* Useful to manage the poll loop in the
+                                 * thread */
+#endif
 };
 
 /* A flow in dp_netdev's 'flow_table'. */
@@ -122,6 +164,11 @@ static struct dp_netdev *dp_netdevs[256];
 struct list dp_netdev_list = LIST_INITIALIZER(&dp_netdev_list);
 enum { N_DP_NETDEVS = ARRAY_SIZE(dp_netdevs) };
 
+#ifdef THREADED
+/* Descriptor of the thread that manages the datapaths */
+pthread_t thread_p;
+#endif
+
 /* Maximum port MTU seen so far. */
 static int max_mtu = ETH_PAYLOAD_MAX;
 
@@ -213,7 +260,7 @@ create_dp_netdev(const char *name, int dp_idx, struct dpif **dpifp)
     struct dp_netdev *dp;
     int error;
     int i;
-
+    
     if (dp_netdevs[dp_idx]) {
         return EBUSY;
     }
@@ -224,15 +271,33 @@ create_dp_netdev(const char *name, int dp_idx, struct dpif **dpifp)
     dp->dp_idx = dp_idx;
     dp->open_cnt = 0;
     dp->drop_frags = false;
+
+#ifdef THREADED
+    error = pipe(dp->pipe);
+    if (error) {
+        fprintf(stderr, "pipe creation error\n");
+        return errno;
+    }
+    if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) {
+        fprintf(stderr, "error set_nonblock on pipe\n");
+        return errno;
+    }
+
+    pthread_mutex_init(&dp->table_mutex, NULL);
+    pthread_mutex_init(&dp->port_list_mutex, NULL);
+#endif
+
     for (i = 0; i < N_QUEUES; i++) {
         queue_init(&dp->queues[i]);
     }
+
     hmap_init(&dp->flow_table);
     for (i = 0; i < N_GROUPS; i++) {
         dp->groups[i].ports = NULL;
         dp->groups[i].n_ports = 0;
         dp->groups[i].group = i;
     }
+
     list_init(&dp->port_list);
     error = do_add_port(dp, name, ODP_PORT_INTERNAL, ODPP_LOCAL);
     if (error) {
@@ -285,15 +350,28 @@ dp_netdev_free(struct dp_netdev *dp)
     int i;
 
     dp_netdev_flow_flush(dp);
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     while (dp->n_ports > 0) {
         struct dp_netdev_port *port = CONTAINER_OF(
             dp->port_list.next, struct dp_netdev_port, node);
         do_del_port(dp, port->port_no);
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     for (i = 0; i < N_QUEUES; i++) {
         queue_destroy(&dp->queues[i]);
     }
     hmap_destroy(&dp->flow_table);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+    pthread_mutex_destroy(&dp->table_mutex);
+    pthread_mutex_destroy(&dp->port_list_mutex);
+#endif
+
     for (i = 0; i < N_GROUPS; i++) {
         free(dp->groups[i].ports);
     }
@@ -326,8 +404,16 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct odp_stats *stats)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     memset(stats, 0, sizeof *stats);
+
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     stats->n_flows = hmap_count(&dp->flow_table);
     stats->cur_capacity = hmap_capacity(&dp->flow_table);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
+
     stats->max_capacity = MAX_FLOWS;
     stats->n_ports = dp->n_ports;
     stats->max_ports = MAX_PORTS;
@@ -395,15 +481,24 @@ do_add_port(struct dp_netdev *dp, const char *devname, uint16_t flags,
     port->port_no = port_no;
     port->netdev = netdev;
     port->internal = internal;
+#ifdef THREADED
+    port->poll_fd = NULL;
+#endif
 
     netdev_get_mtu(netdev, &mtu);
     if (mtu > max_mtu) {
         max_mtu = mtu;
     }
 
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     list_push_back(&dp->port_list, &port->node);
-    dp->ports[port_no] = port;
     dp->n_ports++;
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
+    dp->ports[port_no] = port;
     dp->serial++;
 
     return 0;
@@ -457,12 +552,21 @@ get_port_by_name(struct dp_netdev *dp,
 {
     struct dp_netdev_port *port;
 
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
         if (!strcmp(netdev_get_name(port->netdev), devname)) {
             *portp = port;
+#ifdef THREADED
+            pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
             return 0;
         }
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
     return ENOENT;
 }
 
@@ -472,7 +576,7 @@ do_del_port(struct dp_netdev *dp, uint16_t port_no)
     struct dp_netdev_port *port;
     char *name;
     int error;
-
+    /* XXX why no semaphores?? */
     error = get_port_by_number(dp, port_no, &port);
     if (error) {
         return error;
@@ -535,7 +639,13 @@ dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
 static void
 dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
 {
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     hmap_remove(&dp->flow_table, &flow->node);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
     free(flow->actions);
     free(flow);
 }
@@ -567,6 +677,9 @@ dpif_netdev_port_list(const struct dpif *dpif, struct odp_port *ports, int n)
     int i;
 
     i = 0;
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
         struct odp_port *odp_port = &ports[i];
         if (i >= n) {
@@ -575,6 +688,9 @@ dpif_netdev_port_list(const struct dpif *dpif, struct odp_port *ports, int n)
         answer_port_query(port, odp_port);
         i++;
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
     return dp->n_ports;
 }
 
@@ -656,17 +772,27 @@ dpif_netdev_port_group_set(struct dpif *dpif, int group_no,
 }
 
 static struct dp_netdev_flow *
-dp_netdev_lookup_flow(const struct dp_netdev *dp, const flow_t *key)
+dp_netdev_lookup_flow(struct dp_netdev *dp, const flow_t *key)
 {
     struct dp_netdev_flow *flow;
 
     assert(!key->reserved[0] && !key->reserved[1] && !key->reserved[2]);
+    
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     HMAP_FOR_EACH_WITH_HASH (flow, struct dp_netdev_flow, node,
                              flow_hash(key, 0), &dp->flow_table) {
         if (flow_equal(&flow->key, key)) {
+#ifdef THREADED
+            pthread_mutex_unlock(&dp->table_mutex);
+#endif
             return flow;
         }
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
     return NULL;
 }
 
@@ -707,8 +833,11 @@ dpif_netdev_flow_get(const struct dpif *dpif, struct odp_flow flows[], int n)
 
     for (i = 0; i < n; i++) {
         struct odp_flow *odp_flow = &flows[i];
-        answer_flow_query(dp_netdev_lookup_flow(dp, &odp_flow->key),
-                          odp_flow->flags, odp_flow);
+    struct dp_netdev_flow *lookup_flow;
+
+    lookup_flow = dp_netdev_lookup_flow(dp, &odp_flow->key);
+    if ( lookup_flow == NULL )
+        answer_flow_query(lookup_flow, odp_flow->flags, odp_flow);
     }
     return 0;
 }
@@ -817,7 +946,13 @@ add_flow(struct dpif *dpif, struct odp_flow *odp_flow)
         return error;
     }
 
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0));
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
     return 0;
 }
 
@@ -836,11 +971,19 @@ dpif_netdev_flow_put(struct dpif *dpif, struct odp_flow_put *put)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_flow *flow;
+    int n_flows;
 
     flow = dp_netdev_lookup_flow(dp, &put->flow.key);
     if (!flow) {
         if (put->flags & ODPPF_CREATE) {
-            if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
+#ifdef THREADED
+            pthread_mutex_lock(&dp->table_mutex);
+#endif
+            n_flows = hmap_count(&dp->flow_table);
+#ifdef THREADED
+            pthread_mutex_unlock(&dp->table_mutex);
+#endif
+            if (n_flows < MAX_FLOWS) {
                 return add_flow(dpif, &put->flow);
             } else {
                 return EFBIG;
@@ -883,16 +1026,24 @@ dpif_netdev_flow_list(const struct dpif *dpif, struct odp_flow flows[], int n)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_flow *flow;
-    int i;
+    int i, n_flows;
 
     i = 0;
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     HMAP_FOR_EACH (flow, struct dp_netdev_flow, node, &dp->flow_table) {
         if (i >= n) {
             break;
         }
         answer_flow_query(flow, 0, &flows[i++]);
     }
-    return hmap_count(&dp->flow_table);
+    n_flows = hmap_count(&dp->flow_table);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
+
+    return n_flows;
 }
 
 static int
@@ -974,13 +1125,31 @@ find_nonempty_queue(struct dpif *dpif)
 }
 
 static int
-dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp)
+dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp OVS_UNUSED)
 {
-    struct ovs_queue *q = find_nonempty_queue(dpif);
+    struct ovs_queue *q;
+
+#ifdef THREADED
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    char c;
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
+    q = find_nonempty_queue(dpif);
     if (q) {
         *bufp = queue_pop_head(q);
+#ifdef THREADED
+        /* read a byte from the pipe to advertise that a packet has been
+         * received */
+        if (read(dp->pipe[0], &c, 1) < 0) {
+            printf("Error reading from the pipe\n");
+        }
+        pthread_mutex_unlock(&dp->table_mutex);
+#endif
         return 0;
     } else {
+#ifdef THREADED
+        pthread_mutex_unlock(&dp->table_mutex);
+#endif
         return EAGAIN;
     }
 }
@@ -988,6 +1157,11 @@ dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp)
 static void
 dpif_netdev_recv_wait(struct dpif *dpif)
 {
+#ifdef THREADED
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    poll_fd_wait(dp->pipe[0], POLLIN);
+#else 
     struct ovs_queue *q = find_nonempty_queue(dpif);
     if (q) {
         poll_immediate_wake();
@@ -995,8 +1169,10 @@ dpif_netdev_recv_wait(struct dpif *dpif)
         /* No messages ready to be received, and dp_wait() will ensure that we
          * wake up to queue new messages, so there is nothing to do. */
     }
+#endif
 }
 \f
+
 static void
 dp_netdev_flow_used(struct dp_netdev_flow *flow, const flow_t *key,
                     const struct ofpbuf *packet)
@@ -1037,13 +1213,17 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
     }
 }
 
+/*
+ * This function is no longer called by the threaded version. The same task is
+ * instead performed in the thread body.
+ */
 static void
 dp_netdev_run(void)
 {
     struct ofpbuf packet;
     struct dp_netdev *dp;
 
-    ofpbuf_init(&packet, DP_NETDEV_HEADROOM + max_mtu);
+    ofpbuf_init(&packet, DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + max_mtu);
     LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
         struct dp_netdev_port *port;
 
@@ -1067,6 +1247,7 @@ dp_netdev_run(void)
     ofpbuf_uninit(&packet);
 }
 
+/* This function is no longer called in the threaded version. */
 static void
 dp_netdev_wait(void)
 {
@@ -1080,6 +1261,139 @@ dp_netdev_wait(void)
     }
 }
 
+#ifdef THREADED
+/*
+ * pcap callback argument
+ */
+struct dispatch_arg {
+    struct dp_netdev *dp;   /* update statistics */
+    struct dp_netdev_port *port;    /* argument to flow identifier function */
+    struct ofpbuf buf;      /* used to process the packet */
+};
+
+/* Process a packet.
+ *
+ * The port_input function will send immediately if it finds a flow match and
+ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP.
+ * If a flow is not found or for the other actions, the packet is copied.
+ */
+static void
+process_pkt(u_char *arg_p, const struct pkthdr *hdr, const u_char *packet)
+{
+    struct dispatch_arg *arg = (struct dispatch_arg *)arg_p;
+    struct ofpbuf *buf = &arg->buf;
+
+    /* set packet size and data pointer */
+    buf->size = hdr->caplen; /* XXX Must the size be equal to hdr->len or
+                              * hdr->caplen */
+    buf->data = (void*)packet;
+
+    dp_netdev_port_input(arg->dp, arg->port, buf);
+
+    return;
+}
+
+/* Body of the thread that manages the datapaths */
+static void*
+dp_thread_body(void *args OVS_UNUSED)
+{
+    struct dp_netdev *dp;
+    struct dp_netdev_port *port;
+    struct dispatch_arg arg;
+    int error;
+    int n_fds;
+    uint32_t batch = 50; /* max number of pkts processed by the dispatch */
+    int processed;     /* actual number of pkts processed by the dispatch */
+
+    sigset_t sigmask;
+
+    /*XXX Since the poll involves all ports of all datapaths, the right fds
+     * size should be MAX_PORTS * max_number_of_datapaths */
+    struct pollfd fds[MAX_PORTS]; 
+    
+    /* mask the fatal signals. In this way the main thread is delegate to
+     * manage this them. */
+    sigemptyset(&sigmask);
+    sigaddset(&sigmask, SIGTERM);
+    sigaddset(&sigmask, SIGALRM);
+    sigaddset(&sigmask, SIGINT);
+    sigaddset(&sigmask, SIGHUP);
+
+    if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) {
+        printf("Error pthread_sigmask\n");
+    }
+
+    ofpbuf_init(&arg.buf, DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + max_mtu);
+    for(;;) {
+        n_fds = 0;
+        /* build the structure for poll */
+        LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
+            pthread_mutex_lock(&dp->port_list_mutex);
+            LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
+                /* insert an element in the fds structure */
+                fds[n_fds].fd = netdev_get_fd(port->netdev);
+                fds[n_fds].events = POLLIN;
+                port->poll_fd = &fds[n_fds];
+                n_fds++;
+            }
+            pthread_mutex_unlock(&dp->port_list_mutex);
+        }
+
+        error = poll(fds, n_fds, 2000);
+
+        if (error < 0) {
+            printf("poll() error: %s\n", strerror(errno));
+            break;
+        }
+
+        LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
+            arg.dp = dp;
+            pthread_mutex_lock(&dp->port_list_mutex);
+            LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
+                arg.port = port;
+                arg.buf.size = 0;
+                arg.buf.data = (char*)arg.buf.base + DP_NETDEV_HEADROOM;
+                if (port->poll_fd && (port->poll_fd->revents & POLLIN)) {
+                    /* call the dispatch and process the packet into
+                     * its callback. We process 'batch' packets at time */
+                    processed = netdev_dispatch(port->netdev, batch,
+                                         process_pkt, (u_char *)&arg);
+                    if (processed < 0) { /* pcap returns error */
+                        struct vlog_rate_limit rl =
+                            VLOG_RATE_LIMIT_INIT(1, 5);
+                        VLOG_ERR_RL(&rl, 
+                                "error receiving data from XXX \n"); 
+                    }
+                } /* end of if poll */
+            } /* end of port loop */
+        pthread_mutex_unlock(&dp->port_list_mutex);
+        } /* end of dp loop */
+    } /* for ;; */
+
+    ofpbuf_uninit(&arg.buf);
+    return NULL;
+}
+
+/* Starts the datapath */
+static void
+dp_netdev_start(void) 
+{
+    int error;
+
+    /* Launch thread which manages the datapath */
+    error = pthread_create(&thread_p, NULL, dp_thread_body, NULL);
+    return;
+}
+
+/* This is the function that is called in response of a fatal signal (e.g.
+ * SIGTERM) */
+static void
+dp_netdev_exit_hook(void *aux OVS_UNUSED)
+{
+    pthread_cancel(thread_p);
+    pthread_join(thread_p, NULL);
+}
+#endif /* THREADED */
 
 /* Modify the TCI field of 'packet'.  If a VLAN tag is not present, one
  * is added with the TCI field set to 'tci'.  If a VLAN tag is present,
@@ -1204,7 +1518,7 @@ static void
 dp_netdev_set_tp_port(struct ofpbuf *packet, const flow_t *key,
                       const struct odp_action_tp_port *a)
 {
-       if (is_ip(packet, key)) {
+    if (is_ip(packet, key)) {
         uint16_t *field;
         if (key->nw_proto == IPPROTO_TCP && packet->l7) {
             struct tcp_header *th = packet->l4;
@@ -1255,6 +1569,9 @@ dp_netdev_output_control(struct dp_netdev *dp, const struct ofpbuf *packet,
     struct odp_msg *header;
     struct ofpbuf *msg;
     size_t msg_size;
+#ifdef THREADED
+    char c;
+#endif
 
     if (q->n >= MAX_QUEUE_LEN) {
         dp->n_lost++;
@@ -1269,7 +1586,18 @@ dp_netdev_output_control(struct dp_netdev *dp, const struct ofpbuf *packet,
     header->port = port_no;
     header->arg = arg;
     ofpbuf_put(msg, packet->data, packet->size);
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
+    
     queue_push_tail(q, msg);
+#ifdef THREADED
+    /* write a byte on the pipe to advertise that a packet is ready */
+    if (write(dp->pipe[1], &c, 1) < 0) {
+        printf("Error writing on the pipe\n");
+    }
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
 
     return 0;
 }
@@ -1302,12 +1630,16 @@ dp_netdev_is_spoofed_arp(struct ofpbuf *packet, const struct odp_flow_key *key)
             || !eth_addr_equals(arp->ar_sha, eth->eth_src));
 }
 
+/*
+ * Execute the actions associated to a flow.
+ */
 static int
 dp_netdev_execute_actions(struct dp_netdev *dp,
                           struct ofpbuf *packet, const flow_t *key,
                           const union odp_action *actions, int n_actions)
 {
     int i;
+
     for (i = 0; i < n_actions; i++) {
         const union odp_action *a = &actions[i];
 
@@ -1376,6 +1708,10 @@ const struct dpif_class dpif_netdev_class = {
     "netdev",
     dp_netdev_run,
     dp_netdev_wait,
+#ifdef THREADED
+    dp_netdev_start, 
+    dp_netdev_exit_hook,
+#endif
     NULL,                       /* enumerate */
     dpif_netdev_open,
     dpif_netdev_close,
index 1106db8..5a36af3 100644 (file)
@@ -77,6 +77,16 @@ struct dpif_class {
      * to be called. */
     void (*wait)(void);
 
+#ifdef THREADED
+    /* Starts the datapath management. This function is thought for a scenario
+     * in which the datapath and the ofproto modules are managed in different
+     * threads/processes */
+    void (*start)(void);
+
+    /* Function called in the arrival of a fatal signal (e.g. SIGTERM) */
+    void (*exit_hook)(void*);
+#endif
+
     /* Enumerates the names of all known created datapaths, if possible, into
      * 'all_dps'.  The caller has already initialized 'all_dps' and other dpif
      * classes might already have added names to it.
index 01e905d..28b936b 100644 (file)
@@ -37,6 +37,7 @@
 #include "svec.h"
 #include "util.h"
 #include "valgrind.h"
+#include "fatal-signal.h"
 #include "vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(dpif)
@@ -54,6 +55,7 @@ struct registered_dpif_class {
 };
 static struct shash dpif_classes = SHASH_INITIALIZER(&dpif_classes);
 
+
 /* Rate limit for individual messages going to or from the datapath, output at
  * DBG level.  This is very high because, if these are enabled, it is because
  * we really need to see them. */
@@ -78,14 +80,30 @@ dp_initialize(void)
 
     if (status < 0) {
         int i;
+#ifdef THREADED
+        struct shash_node *node;
+#endif
 
         status = 0;
         for (i = 0; i < ARRAY_SIZE(base_dpif_classes); i++) {
             dp_register_provider(base_dpif_classes[i]);
         }
+        
+#ifdef THREADED
+        /* register an exit handler for the registered classes */
+        SHASH_FOR_EACH(node, &dpif_classes) {
+            const struct registered_dpif_class *registered_class = node->data;
+            if (registered_class->dpif_class.exit_hook) {
+                fatal_signal_add_hook(registered_class->dpif_class.exit_hook,
+                        NULL, NULL, true);
+            }
+        }
+#endif
     }
 }
 
+
+
 /* Performs periodic work needed by all the various kinds of dpifs.
  *
  * If your program opens any dpifs, it must call both this function and
@@ -118,13 +136,34 @@ dp_wait(void)
     }
 }
 
+#ifdef THREADED
+/* Start the datapath management.
+ * 
+ * This function has been thought for a scenario in which the management of the
+ * datapath module and the ofproto module are performed in separate
+ * threads/processes module. */
+void
+dp_start(void)
+{
+    struct shash_node *node;
+
+    SHASH_FOR_EACH(node, &dpif_classes) {
+        const struct registered_dpif_class *registered_class = node->data;
+        if (registered_class->dpif_class.start) {
+            registered_class->dpif_class.start();
+        }
+    }
+}
+#endif
+
+
 /* Registers a new datapath provider.  After successful registration, new
  * datapaths of that type can be opened using dpif_open(). */
 int
 dp_register_provider(const struct dpif_class *new_class)
 {
     struct registered_dpif_class *registered_class;
-
+       
     if (shash_find(&dpif_classes, new_class->type)) {
         VLOG_WARN("attempted to register duplicate datapath provider: %s",
                   new_class->type);
index 1496c22..d6a0cde 100644 (file)
@@ -36,6 +36,9 @@ struct dpif_class;
 
 void dp_run(void);
 void dp_wait(void);
+#ifdef THREADED
+void dp_start(void);
+#endif
 
 int dp_register_provider(const struct dpif_class *);
 int dp_unregister_provider(const char *type);
index 3f118f8..fbc4bbf 100644 (file)
@@ -17,7 +17,6 @@
 #include "fatal-signal.h"
 #include <assert.h>
 #include <errno.h>
-#include <signal.h>
 #include <stdbool.h>
 #include <stdio.h>
 #include <stdint.h>
index 94a1f1f..a41eb05 100644 (file)
@@ -18,6 +18,7 @@
 #define FATAL_SIGNAL_H 1
 
 #include <stdbool.h>
+#include <signal.h>
 
 /* Basic interface. */
 void fatal_signal_add_hook(void (*hook_cb)(void *aux),
index e6036bf..430474e 100644 (file)
@@ -2079,6 +2079,10 @@ const struct netdev_class netdev_linux_class = {
 
     netdev_linux_recv,
     netdev_linux_recv_wait,
+#ifdef THREADED
+    NULL, /* dispatch */
+    NULL, /* get_fd */
+#endif
     netdev_linux_drain,
 
     netdev_linux_send,
@@ -2139,6 +2143,10 @@ const struct netdev_class netdev_tap_class = {
 
     netdev_linux_recv,
     netdev_linux_recv_wait,
+#ifdef THREADED
+    NULL, /* dispatch */
+    NULL, /* get_fd */
+#endif
     netdev_linux_drain,
 
     netdev_linux_send,
index 7e8b199..ea0f88b 100644 (file)
@@ -191,6 +191,10 @@ const struct netdev_class netdev_patch_class = {
 
     NULL,                       /* recv */
     NULL,                       /* recv_wait */
+#ifdef THREADED
+    NULL,                       /* dispatch */
+    NULL,                       /* get_fd */
+#endif
     NULL,                       /* drain */
 
     NULL,                       /* send */
index c0ed4ef..46884ed 100644 (file)
@@ -24,6 +24,9 @@
 #include "netdev.h"
 #include "list.h"
 #include "shash.h"
+#ifdef THREADED
+#include "dispatch.h"
+#endif
 
 #ifdef  __cplusplus
 extern "C" {
@@ -194,6 +197,22 @@ struct netdev_class {
      * implement packet reception through the 'recv' member function. */
     void (*recv_wait)(struct netdev *netdev);
 
+#ifdef THREADED
+    /* Attempts to receive 'batch' packets from 'netdev' and process them
+     * through the 'handler' callback. This function is used in the 'THREADED'
+     * version in order to optimize the forwarding process, since it permits to
+     * process packets directly in the netdev memory.
+     * 
+     * Returns the number of packets processed on success; this can be 0 if no
+     * packets are available to be read. Returns -1 if an error occurred.
+     */
+    int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler,
+                    u_char *user);
+
+    /* Return the file descriptor of the device */
+    int (*get_fd)(struct netdev *netdev);
+#endif
+
     /* Discards all packets waiting to be received from 'netdev'.
      *
      * May be null if not needed, such as for a network device that does not
@@ -551,6 +570,12 @@ extern const struct netdev_class netdev_tap_class;
 extern const struct netdev_class netdev_patch_class;
 extern const struct netdev_class netdev_gre_class;
 extern const struct netdev_class netdev_capwap_class;
+#ifdef __FreeBSD__
+extern const struct netdev_class netdev_bsd_class;
+#ifdef NETMAP
+extern const struct netdev_class netdev_netmap_class;
+#endif
+#endif
 
 #ifdef  __cplusplus
 }
index 24c2a88..00ded43 100644 (file)
@@ -74,7 +74,6 @@ netdev_initialize(void)
 
     if (status < 0) {
         int i;
-
         fatal_signal_add_hook(close_all_netdevs, NULL, NULL, true);
 
         status = 0;
@@ -122,7 +121,7 @@ int
 netdev_register_provider(const struct netdev_class *new_class)
 {
     struct netdev_class *new_provider;
-
+    
     if (shash_find(&netdev_classes, new_class->type)) {
         VLOG_WARN("attempted to register duplicate netdev provider: %s",
                    new_class->type);
@@ -505,6 +504,28 @@ netdev_recv_wait(struct netdev *netdev)
     }
 }
 
+#ifdef THREADED
+/* Attempts to receive and process 'batch' packets from 'netdev'. */
+int
+netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user)
+{
+    int (*dispatch)(struct netdev*, int, pkt_handler, u_char *);
+
+    dispatch = netdev_get_dev(netdev)->netdev_class->dispatch;
+    return dispatch ? dispatch(netdev, batch, h, user) : 0;
+}
+
+/* Returns the file descriptor */
+int
+netdev_get_fd(struct netdev *netdev)
+{
+    int (*get_fd)(struct netdev *);
+
+    get_fd = netdev_get_dev(netdev)->netdev_class->get_fd;
+    return get_fd ? get_fd(netdev) : 0;
+}
+#endif
+
 /* Discards all packets waiting to be received from 'netdev'. */
 int
 netdev_drain(struct netdev *netdev)
index cd5c8c3..a9081eb 100644 (file)
 #include <stddef.h>
 #include <stdint.h>
 
+#ifdef THREADED
+#include "dispatch.h"
+#endif
+
 #ifdef  __cplusplus
 extern "C" {
 #endif
@@ -117,6 +121,10 @@ int netdev_get_ifindex(const struct netdev *);
 /* Packet send and receive. */
 int netdev_recv(struct netdev *, struct ofpbuf *);
 void netdev_recv_wait(struct netdev *);
+#ifdef THREADED
+int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *);
+int netdev_get_fd(struct netdev *);
+#endif
 int netdev_drain(struct netdev *);
 
 int netdev_send(struct netdev *, const struct ofpbuf *);
index 5d77ca2..4cb5891 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+dnl Check for --enable-threaded and updates CFLAGS.
+AC_DEFUN([OVS_CHECK_THREADED],
+  [AC_REQUIRE([AC_PROG_CC])
+   AC_ARG_ENABLE(
+     [threaded],
+     [AC_HELP_STRING([--enable-threaded], 
+                     [Enable threaded version of userspace implementation])],
+     [case "${enableval}" in
+        (yes) coverage=true ;;
+        (no)  coverage=false ;;
+        (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;;
+      esac],
+     [threaded=false])
+   if $threaded; then
+      AC_DEFINE([THREADED], [1],
+                [Define to 1 if the threaded version of userspace
+                implementation is enabled.])
+   fi])
+
 dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately.
 AC_DEFUN([OVS_CHECK_COVERAGE],
   [AC_REQUIRE([AC_PROG_CC])
index 844083d..d9a278e 100644 (file)
@@ -995,6 +995,7 @@ ofproto_destroy(struct ofproto *p)
 int
 ofproto_run(struct ofproto *p)
 {
+    /* handle protocol messages coming from the datapath */
     int error = ofproto_run1(p);
     if (!error) {
         error = ofproto_run2(p, false);
@@ -1056,6 +1057,11 @@ add_snooper(struct ofproto *ofproto, struct vconn *vconn)
     }
 }
 
+/*
+ * Calls the netdevice dpif_netdev_recv() callback,
+ * that read a protocol packet from the dpif queue
+ * and handle the message
+ */
 int
 ofproto_run1(struct ofproto *p)
 {
@@ -1278,6 +1284,7 @@ ofproto_send_packet(struct ofproto *p, const flow_t *flow,
 
     /* XXX Should we translate the dpif_execute() errno value into an OpenFlow
      * error code? */
+    fprintf(stderr, "OFPROTO EXECUTE\n");
     dpif_execute(p->dpif, flow->in_port, odp_actions.actions,
                  odp_actions.n_actions, packet);
     return 0;
index dc6d5e3..39a2e2f 100644 (file)
@@ -18,6 +18,7 @@
 #include <errno.h>
 #include <getopt.h>
 #include <inttypes.h>
+#include <sys/socket.h> //XXX
 #include <net/if.h>
 #include <signal.h>
 #include <stdlib.h>
index 8cb50e4..5d780e1 100644 (file)
@@ -152,18 +152,30 @@ main(int argc, char *argv[])
 
     daemonize_complete();
 
+#ifdef THREADED
+    /* Data thread started */
+    fprintf(stdout, "THREADED version running!\n");
+    dp_start();
+#endif
+
+    /* The following loop polls on protocol messages 
+     * and on messages related to topology changes */
     while (ofproto_is_alive(ofproto)) {
         error = ofproto_run(ofproto);
         if (error) {
             ovs_fatal(error, "unrecoverable datapath error");
         }
         unixctl_server_run(unixctl);
+#ifndef THREADED
         dp_run();
+#endif
         netdev_run();
 
         ofproto_wait(ofproto);
         unixctl_server_wait(unixctl);
+#ifndef THREADED
         dp_wait();
+#endif
         netdev_wait();
         poll_block();
     }
index 598b001..3e4458a 100644 (file)
@@ -1260,6 +1260,7 @@ bridge_create(const struct ovsrec_bridge *br_cfg)
 {
     struct bridge *br;
     int error;
+    static int first = 1;
 
     assert(!bridge_lookup(br_cfg->name));
     br = xzalloc(sizeof *br);
@@ -1299,6 +1300,15 @@ bridge_create(const struct ovsrec_bridge *br_cfg)
 
     VLOG_INFO("created bridge %s on %s", br->name, dpif_name(br->dpif));
 
+#ifdef THREADED
+    /* The first time a bridge is created, we launch the datapath thread */
+    if (first) {
+        fprintf(stderr, "THREADED version running!\n");
+        dp_start();
+        first = 0;
+    }
+#endif
+
     return br;
 }
 
index 541cdcb..9ed2404 100644 (file)
@@ -91,13 +91,17 @@ main(int argc, char *argv[])
         }
         bridge_run();
         unixctl_server_run(unixctl);
+#ifndef THREADED
         dp_run();
+#endif
         netdev_run();
 
         signal_wait(sighup);
         bridge_wait();
         unixctl_server_wait(unixctl);
+#ifndef THREADED
         dp_wait();
+#endif
         netdev_wait();
         poll_block();
     }