Catalli's threaded switch
[sliver-openvswitch.git] / lib / dpif-netdev.c
index 323f364..ba05eca 100644 (file)
 #include <sys/stat.h>
 #include <unistd.h>
 
+#ifdef THREADED
+#include <signal.h>
+#include <pthread.h>
+
+#include "socket-util.h"
+#include "fatal-signal.h"
+#include "dispatch.h"
+#endif
+
 #include "csum.h"
 #include "dpif-provider.h"
 #include "flow.h"
 
 VLOG_DEFINE_THIS_MODULE(dpif_netdev)
 
+/* We could use these macros instead of using #ifdef and #endif every time we
+ * need to call the pthread_mutex_lock/unlock.
+#ifdef THREADED
+#define LOCK(mutex) pthread_mutex_lock(mutex)
+#define UNLOCK(mutex) pthread_mutex_unlock(mutex)
+#else
+#define LOCK(mutex)
+#define UNLOCK(mutex)
+#endif
+*/
+
 /* Configuration parameters. */
 enum { N_QUEUES = 2 };          /* Number of queues for dpif_recv(). */
 enum { MAX_QUEUE_LEN = 100 };   /* Maximum number of packets per queue. */
@@ -68,8 +88,25 @@ struct dp_netdev {
     bool destroyed;
 
     bool drop_frags;            /* Drop all IP fragments, if true. */
-    struct ovs_queue queues[N_QUEUES]; /* Messages queued for dpif_recv(). */
+
+#ifdef THREADED
+    /* The pipe is used to signal the presence of a packet on the queue.
+     * - dpif_netdev_recv_wait() waits on p[0]
+     * - dpif_netdev_recv() extract from queue and read p[0]
+     * - dp_netdev_output_control() send to queue and write p[1]
+     */
+
+    /* The access to this queue is protected by the table_mutex mutex */
+    int pipe[2];    /* signal a packet on the queue */
+
+    pthread_mutex_t table_mutex;    /* mutex for the flow table */
+    pthread_mutex_t port_list_mutex;    /* port list mutex */
+#endif
+    
+    struct ovs_queue queues[N_QUEUES];  /* messages queued for dpif_recv(). */
+
     struct hmap flow_table;     /* Flow table. */
+
     struct odp_port_group groups[N_GROUPS];
 
     /* Statistics. */
@@ -79,9 +116,9 @@ struct dp_netdev {
     long long int n_lost;       /* Number of misses not passed to client. */
 
     /* Ports. */
-    int n_ports;
     struct dp_netdev_port *ports[MAX_PORTS];
     struct list port_list;
+    int n_ports;
     unsigned int serial;
 };
 
@@ -90,7 +127,12 @@ struct dp_netdev_port {
     int port_no;                /* Index into dp_netdev's 'ports'. */
     struct list node;           /* Element in dp_netdev's 'port_list'. */
     struct netdev *netdev;
+
     bool internal;              /* Internal port (as ODP_PORT_INTERNAL)? */
+#ifdef THREADED
+    struct pollfd *poll_fd;     /* Useful to manage the poll loop in the
+                                 * thread */
+#endif
 };
 
 /* A flow in dp_netdev's 'flow_table'. */
@@ -122,6 +164,11 @@ static struct dp_netdev *dp_netdevs[256];
 struct list dp_netdev_list = LIST_INITIALIZER(&dp_netdev_list);
 enum { N_DP_NETDEVS = ARRAY_SIZE(dp_netdevs) };
 
+#ifdef THREADED
+/* Descriptor of the thread that manages the datapaths */
+pthread_t thread_p;
+#endif
+
 /* Maximum port MTU seen so far. */
 static int max_mtu = ETH_PAYLOAD_MAX;
 
@@ -213,7 +260,7 @@ create_dp_netdev(const char *name, int dp_idx, struct dpif **dpifp)
     struct dp_netdev *dp;
     int error;
     int i;
-
+    
     if (dp_netdevs[dp_idx]) {
         return EBUSY;
     }
@@ -224,15 +271,33 @@ create_dp_netdev(const char *name, int dp_idx, struct dpif **dpifp)
     dp->dp_idx = dp_idx;
     dp->open_cnt = 0;
     dp->drop_frags = false;
+
+#ifdef THREADED
+    error = pipe(dp->pipe);
+    if (error) {
+        fprintf(stderr, "pipe creation error\n");
+        return errno;
+    }
+    if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) {
+        fprintf(stderr, "error set_nonblock on pipe\n");
+        return errno;
+    }
+
+    pthread_mutex_init(&dp->table_mutex, NULL);
+    pthread_mutex_init(&dp->port_list_mutex, NULL);
+#endif
+
     for (i = 0; i < N_QUEUES; i++) {
         queue_init(&dp->queues[i]);
     }
+
     hmap_init(&dp->flow_table);
     for (i = 0; i < N_GROUPS; i++) {
         dp->groups[i].ports = NULL;
         dp->groups[i].n_ports = 0;
         dp->groups[i].group = i;
     }
+
     list_init(&dp->port_list);
     error = do_add_port(dp, name, ODP_PORT_INTERNAL, ODPP_LOCAL);
     if (error) {
@@ -285,15 +350,28 @@ dp_netdev_free(struct dp_netdev *dp)
     int i;
 
     dp_netdev_flow_flush(dp);
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     while (dp->n_ports > 0) {
         struct dp_netdev_port *port = CONTAINER_OF(
             dp->port_list.next, struct dp_netdev_port, node);
         do_del_port(dp, port->port_no);
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     for (i = 0; i < N_QUEUES; i++) {
         queue_destroy(&dp->queues[i]);
     }
     hmap_destroy(&dp->flow_table);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+    pthread_mutex_destroy(&dp->table_mutex);
+    pthread_mutex_destroy(&dp->port_list_mutex);
+#endif
+
     for (i = 0; i < N_GROUPS; i++) {
         free(dp->groups[i].ports);
     }
@@ -326,8 +404,16 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct odp_stats *stats)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     memset(stats, 0, sizeof *stats);
+
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     stats->n_flows = hmap_count(&dp->flow_table);
     stats->cur_capacity = hmap_capacity(&dp->flow_table);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
+
     stats->max_capacity = MAX_FLOWS;
     stats->n_ports = dp->n_ports;
     stats->max_ports = MAX_PORTS;
@@ -395,15 +481,24 @@ do_add_port(struct dp_netdev *dp, const char *devname, uint16_t flags,
     port->port_no = port_no;
     port->netdev = netdev;
     port->internal = internal;
+#ifdef THREADED
+    port->poll_fd = NULL;
+#endif
 
     netdev_get_mtu(netdev, &mtu);
     if (mtu > max_mtu) {
         max_mtu = mtu;
     }
 
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     list_push_back(&dp->port_list, &port->node);
-    dp->ports[port_no] = port;
     dp->n_ports++;
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
+    dp->ports[port_no] = port;
     dp->serial++;
 
     return 0;
@@ -457,12 +552,21 @@ get_port_by_name(struct dp_netdev *dp,
 {
     struct dp_netdev_port *port;
 
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
         if (!strcmp(netdev_get_name(port->netdev), devname)) {
             *portp = port;
+#ifdef THREADED
+            pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
             return 0;
         }
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
     return ENOENT;
 }
 
@@ -472,7 +576,7 @@ do_del_port(struct dp_netdev *dp, uint16_t port_no)
     struct dp_netdev_port *port;
     char *name;
     int error;
-
+    /* XXX why no semaphores?? */
     error = get_port_by_number(dp, port_no, &port);
     if (error) {
         return error;
@@ -535,7 +639,13 @@ dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
 static void
 dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
 {
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     hmap_remove(&dp->flow_table, &flow->node);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
     free(flow->actions);
     free(flow);
 }
@@ -567,6 +677,9 @@ dpif_netdev_port_list(const struct dpif *dpif, struct odp_port *ports, int n)
     int i;
 
     i = 0;
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
         struct odp_port *odp_port = &ports[i];
         if (i >= n) {
@@ -575,6 +688,9 @@ dpif_netdev_port_list(const struct dpif *dpif, struct odp_port *ports, int n)
         answer_port_query(port, odp_port);
         i++;
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
     return dp->n_ports;
 }
 
@@ -656,17 +772,27 @@ dpif_netdev_port_group_set(struct dpif *dpif, int group_no,
 }
 
 static struct dp_netdev_flow *
-dp_netdev_lookup_flow(const struct dp_netdev *dp, const flow_t *key)
+dp_netdev_lookup_flow(struct dp_netdev *dp, const flow_t *key)
 {
     struct dp_netdev_flow *flow;
 
     assert(!key->reserved[0] && !key->reserved[1] && !key->reserved[2]);
+    
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     HMAP_FOR_EACH_WITH_HASH (flow, struct dp_netdev_flow, node,
                              flow_hash(key, 0), &dp->flow_table) {
         if (flow_equal(&flow->key, key)) {
+#ifdef THREADED
+            pthread_mutex_unlock(&dp->table_mutex);
+#endif
             return flow;
         }
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
     return NULL;
 }
 
@@ -707,8 +833,11 @@ dpif_netdev_flow_get(const struct dpif *dpif, struct odp_flow flows[], int n)
 
     for (i = 0; i < n; i++) {
         struct odp_flow *odp_flow = &flows[i];
-        answer_flow_query(dp_netdev_lookup_flow(dp, &odp_flow->key),
-                          odp_flow->flags, odp_flow);
+    struct dp_netdev_flow *lookup_flow;
+
+    lookup_flow = dp_netdev_lookup_flow(dp, &odp_flow->key);
+    if ( lookup_flow == NULL )
+        answer_flow_query(lookup_flow, odp_flow->flags, odp_flow);
     }
     return 0;
 }
@@ -817,7 +946,13 @@ add_flow(struct dpif *dpif, struct odp_flow *odp_flow)
         return error;
     }
 
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0));
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
     return 0;
 }
 
@@ -836,11 +971,19 @@ dpif_netdev_flow_put(struct dpif *dpif, struct odp_flow_put *put)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_flow *flow;
+    int n_flows;
 
     flow = dp_netdev_lookup_flow(dp, &put->flow.key);
     if (!flow) {
         if (put->flags & ODPPF_CREATE) {
-            if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
+#ifdef THREADED
+            pthread_mutex_lock(&dp->table_mutex);
+#endif
+            n_flows = hmap_count(&dp->flow_table);
+#ifdef THREADED
+            pthread_mutex_unlock(&dp->table_mutex);
+#endif
+            if (n_flows < MAX_FLOWS) {
                 return add_flow(dpif, &put->flow);
             } else {
                 return EFBIG;
@@ -883,16 +1026,24 @@ dpif_netdev_flow_list(const struct dpif *dpif, struct odp_flow flows[], int n)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_flow *flow;
-    int i;
+    int i, n_flows;
 
     i = 0;
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     HMAP_FOR_EACH (flow, struct dp_netdev_flow, node, &dp->flow_table) {
         if (i >= n) {
             break;
         }
         answer_flow_query(flow, 0, &flows[i++]);
     }
-    return hmap_count(&dp->flow_table);
+    n_flows = hmap_count(&dp->flow_table);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
+
+    return n_flows;
 }
 
 static int
@@ -974,13 +1125,31 @@ find_nonempty_queue(struct dpif *dpif)
 }
 
 static int
-dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp)
+dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp OVS_UNUSED)
 {
-    struct ovs_queue *q = find_nonempty_queue(dpif);
+    struct ovs_queue *q;
+
+#ifdef THREADED
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    char c;
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
+    q = find_nonempty_queue(dpif);
     if (q) {
         *bufp = queue_pop_head(q);
+#ifdef THREADED
+        /* read a byte from the pipe to advertise that a packet has been
+         * received */
+        if (read(dp->pipe[0], &c, 1) < 0) {
+            printf("Error reading from the pipe\n");
+        }
+        pthread_mutex_unlock(&dp->table_mutex);
+#endif
         return 0;
     } else {
+#ifdef THREADED
+        pthread_mutex_unlock(&dp->table_mutex);
+#endif
         return EAGAIN;
     }
 }
@@ -988,6 +1157,11 @@ dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp)
 static void
 dpif_netdev_recv_wait(struct dpif *dpif)
 {
+#ifdef THREADED
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    poll_fd_wait(dp->pipe[0], POLLIN);
+#else 
     struct ovs_queue *q = find_nonempty_queue(dpif);
     if (q) {
         poll_immediate_wake();
@@ -995,8 +1169,10 @@ dpif_netdev_recv_wait(struct dpif *dpif)
         /* No messages ready to be received, and dp_wait() will ensure that we
          * wake up to queue new messages, so there is nothing to do. */
     }
+#endif
 }
 \f
+
 static void
 dp_netdev_flow_used(struct dp_netdev_flow *flow, const flow_t *key,
                     const struct ofpbuf *packet)
@@ -1037,13 +1213,17 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
     }
 }
 
+/*
+ * This function is no longer called by the threaded version. The same task is
+ * instead performed in the thread body.
+ */
 static void
 dp_netdev_run(void)
 {
     struct ofpbuf packet;
     struct dp_netdev *dp;
 
-    ofpbuf_init(&packet, DP_NETDEV_HEADROOM + max_mtu);
+    ofpbuf_init(&packet, DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + max_mtu);
     LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
         struct dp_netdev_port *port;
 
@@ -1067,6 +1247,7 @@ dp_netdev_run(void)
     ofpbuf_uninit(&packet);
 }
 
+/* This function is no longer called in the threaded version. */
 static void
 dp_netdev_wait(void)
 {
@@ -1080,6 +1261,139 @@ dp_netdev_wait(void)
     }
 }
 
+#ifdef THREADED
+/*
+ * pcap callback argument
+ */
+struct dispatch_arg {
+    struct dp_netdev *dp;   /* update statistics */
+    struct dp_netdev_port *port;    /* argument to flow identifier function */
+    struct ofpbuf buf;      /* used to process the packet */
+};
+
+/* Process a packet.
+ *
+ * The port_input function will send immediately if it finds a flow match and
+ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP.
+ * If a flow is not found or for the other actions, the packet is copied.
+ */
+static void
+process_pkt(u_char *arg_p, const struct pkthdr *hdr, const u_char *packet)
+{
+    struct dispatch_arg *arg = (struct dispatch_arg *)arg_p;
+    struct ofpbuf *buf = &arg->buf;
+
+    /* set packet size and data pointer */
+    buf->size = hdr->caplen; /* XXX Must the size be equal to hdr->len or
+                              * hdr->caplen */
+    buf->data = (void*)packet;
+
+    dp_netdev_port_input(arg->dp, arg->port, buf);
+
+    return;
+}
+
+/* Body of the thread that manages the datapaths */
+static void*
+dp_thread_body(void *args OVS_UNUSED)
+{
+    struct dp_netdev *dp;
+    struct dp_netdev_port *port;
+    struct dispatch_arg arg;
+    int error;
+    int n_fds;
+    uint32_t batch = 50; /* max number of pkts processed by the dispatch */
+    int processed;     /* actual number of pkts processed by the dispatch */
+
+    sigset_t sigmask;
+
+    /*XXX Since the poll involves all ports of all datapaths, the right fds
+     * size should be MAX_PORTS * max_number_of_datapaths */
+    struct pollfd fds[MAX_PORTS]; 
+    
+    /* mask the fatal signals. In this way the main thread is delegate to
+     * manage this them. */
+    sigemptyset(&sigmask);
+    sigaddset(&sigmask, SIGTERM);
+    sigaddset(&sigmask, SIGALRM);
+    sigaddset(&sigmask, SIGINT);
+    sigaddset(&sigmask, SIGHUP);
+
+    if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) {
+        printf("Error pthread_sigmask\n");
+    }
+
+    ofpbuf_init(&arg.buf, DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + max_mtu);
+    for(;;) {
+        n_fds = 0;
+        /* build the structure for poll */
+        LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
+            pthread_mutex_lock(&dp->port_list_mutex);
+            LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
+                /* insert an element in the fds structure */
+                fds[n_fds].fd = netdev_get_fd(port->netdev);
+                fds[n_fds].events = POLLIN;
+                port->poll_fd = &fds[n_fds];
+                n_fds++;
+            }
+            pthread_mutex_unlock(&dp->port_list_mutex);
+        }
+
+        error = poll(fds, n_fds, 2000);
+
+        if (error < 0) {
+            printf("poll() error: %s\n", strerror(errno));
+            break;
+        }
+
+        LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
+            arg.dp = dp;
+            pthread_mutex_lock(&dp->port_list_mutex);
+            LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
+                arg.port = port;
+                arg.buf.size = 0;
+                arg.buf.data = (char*)arg.buf.base + DP_NETDEV_HEADROOM;
+                if (port->poll_fd && (port->poll_fd->revents & POLLIN)) {
+                    /* call the dispatch and process the packet into
+                     * its callback. We process 'batch' packets at time */
+                    processed = netdev_dispatch(port->netdev, batch,
+                                         process_pkt, (u_char *)&arg);
+                    if (processed < 0) { /* pcap returns error */
+                        struct vlog_rate_limit rl =
+                            VLOG_RATE_LIMIT_INIT(1, 5);
+                        VLOG_ERR_RL(&rl, 
+                                "error receiving data from XXX \n"); 
+                    }
+                } /* end of if poll */
+            } /* end of port loop */
+        pthread_mutex_unlock(&dp->port_list_mutex);
+        } /* end of dp loop */
+    } /* for ;; */
+
+    ofpbuf_uninit(&arg.buf);
+    return NULL;
+}
+
+/* Starts the datapath */
+static void
+dp_netdev_start(void) 
+{
+    int error;
+
+    /* Launch thread which manages the datapath */
+    error = pthread_create(&thread_p, NULL, dp_thread_body, NULL);
+    return;
+}
+
+/* This is the function that is called in response of a fatal signal (e.g.
+ * SIGTERM) */
+static void
+dp_netdev_exit_hook(void *aux OVS_UNUSED)
+{
+    pthread_cancel(thread_p);
+    pthread_join(thread_p, NULL);
+}
+#endif /* THREADED */
 
 /* Modify the TCI field of 'packet'.  If a VLAN tag is not present, one
  * is added with the TCI field set to 'tci'.  If a VLAN tag is present,
@@ -1204,7 +1518,7 @@ static void
 dp_netdev_set_tp_port(struct ofpbuf *packet, const flow_t *key,
                       const struct odp_action_tp_port *a)
 {
-       if (is_ip(packet, key)) {
+    if (is_ip(packet, key)) {
         uint16_t *field;
         if (key->nw_proto == IPPROTO_TCP && packet->l7) {
             struct tcp_header *th = packet->l4;
@@ -1255,6 +1569,9 @@ dp_netdev_output_control(struct dp_netdev *dp, const struct ofpbuf *packet,
     struct odp_msg *header;
     struct ofpbuf *msg;
     size_t msg_size;
+#ifdef THREADED
+    char c;
+#endif
 
     if (q->n >= MAX_QUEUE_LEN) {
         dp->n_lost++;
@@ -1269,7 +1586,18 @@ dp_netdev_output_control(struct dp_netdev *dp, const struct ofpbuf *packet,
     header->port = port_no;
     header->arg = arg;
     ofpbuf_put(msg, packet->data, packet->size);
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
+    
     queue_push_tail(q, msg);
+#ifdef THREADED
+    /* write a byte on the pipe to advertise that a packet is ready */
+    if (write(dp->pipe[1], &c, 1) < 0) {
+        printf("Error writing on the pipe\n");
+    }
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
 
     return 0;
 }
@@ -1302,12 +1630,16 @@ dp_netdev_is_spoofed_arp(struct ofpbuf *packet, const struct odp_flow_key *key)
             || !eth_addr_equals(arp->ar_sha, eth->eth_src));
 }
 
+/*
+ * Execute the actions associated to a flow.
+ */
 static int
 dp_netdev_execute_actions(struct dp_netdev *dp,
                           struct ofpbuf *packet, const flow_t *key,
                           const union odp_action *actions, int n_actions)
 {
     int i;
+
     for (i = 0; i < n_actions; i++) {
         const union odp_action *a = &actions[i];
 
@@ -1376,6 +1708,10 @@ const struct dpif_class dpif_netdev_class = {
     "netdev",
     dp_netdev_run,
     dp_netdev_wait,
+#ifdef THREADED
+    dp_netdev_start, 
+    dp_netdev_exit_hook,
+#endif
     NULL,                       /* enumerate */
     dpif_netdev_open,
     dpif_netdev_close,