Catalli's threaded switch
[sliver-openvswitch.git] / lib / dpif-netdev.c
index 1dee14b..ba05eca 100644 (file)
 #include <errno.h>
 #include <fcntl.h>
 #include <inttypes.h>
-#include <net/if.h>
 #include <netinet/in.h>
+#include <sys/socket.h>
+#include <net/if.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/ioctl.h>
 #include <sys/stat.h>
 #include <unistd.h>
 
+#ifdef THREADED
+#include <signal.h>
+#include <pthread.h>
+
+#include "socket-util.h"
+#include "fatal-signal.h"
+#include "dispatch.h"
+#endif
+
 #include "csum.h"
 #include "dpif-provider.h"
 #include "flow.h"
 #include "queue.h"
 #include "timeval.h"
 #include "util.h"
-
 #include "vlog.h"
-#define THIS_MODULE VLM_dpif_netdev
+
+VLOG_DEFINE_THIS_MODULE(dpif_netdev)
+
+/* We could use these macros instead of using #ifdef and #endif every time we
+ * need to call the pthread_mutex_lock/unlock.
+#ifdef THREADED
+#define LOCK(mutex) pthread_mutex_lock(mutex)
+#define UNLOCK(mutex) pthread_mutex_unlock(mutex)
+#else
+#define LOCK(mutex)
+#define UNLOCK(mutex)
+#endif
+*/
 
 /* Configuration parameters. */
 enum { N_QUEUES = 2 };          /* Number of queues for dpif_recv(). */
@@ -67,8 +88,25 @@ struct dp_netdev {
     bool destroyed;
 
     bool drop_frags;            /* Drop all IP fragments, if true. */
-    struct ovs_queue queues[N_QUEUES]; /* Messages queued for dpif_recv(). */
+
+#ifdef THREADED
+    /* The pipe is used to signal the presence of a packet on the queue.
+     * - dpif_netdev_recv_wait() waits on p[0]
+     * - dpif_netdev_recv() extract from queue and read p[0]
+     * - dp_netdev_output_control() send to queue and write p[1]
+     */
+
+    /* The access to this queue is protected by the table_mutex mutex */
+    int pipe[2];    /* signal a packet on the queue */
+
+    pthread_mutex_t table_mutex;    /* mutex for the flow table */
+    pthread_mutex_t port_list_mutex;    /* port list mutex */
+#endif
+    
+    struct ovs_queue queues[N_QUEUES];  /* messages queued for dpif_recv(). */
+
     struct hmap flow_table;     /* Flow table. */
+
     struct odp_port_group groups[N_GROUPS];
 
     /* Statistics. */
@@ -78,9 +116,9 @@ struct dp_netdev {
     long long int n_lost;       /* Number of misses not passed to client. */
 
     /* Ports. */
-    int n_ports;
     struct dp_netdev_port *ports[MAX_PORTS];
     struct list port_list;
+    int n_ports;
     unsigned int serial;
 };
 
@@ -89,7 +127,12 @@ struct dp_netdev_port {
     int port_no;                /* Index into dp_netdev's 'ports'. */
     struct list node;           /* Element in dp_netdev's 'port_list'. */
     struct netdev *netdev;
+
     bool internal;              /* Internal port (as ODP_PORT_INTERNAL)? */
+#ifdef THREADED
+    struct pollfd *poll_fd;     /* Useful to manage the poll loop in the
+                                 * thread */
+#endif
 };
 
 /* A flow in dp_netdev's 'flow_table'. */
@@ -98,11 +141,10 @@ struct dp_netdev_flow {
     flow_t key;
 
     /* Statistics. */
-       struct timeval used;        /* Last used time, in milliseconds. */
-       long long int packet_count; /* Number of packets matched. */
-       long long int byte_count;   /* Number of bytes matched. */
-       uint8_t ip_tos;             /* IP TOS value. */
-       uint16_t tcp_ctl;           /* Bitwise-OR of seen tcp_ctl values. */
+    struct timespec used;       /* Last used time. */
+    long long int packet_count; /* Number of packets matched. */
+    long long int byte_count;   /* Number of bytes matched. */
+    uint16_t tcp_ctl;           /* Bitwise-OR of seen tcp_ctl values. */
 
     /* Actions. */
     union odp_action *actions;
@@ -122,6 +164,11 @@ static struct dp_netdev *dp_netdevs[256];
 struct list dp_netdev_list = LIST_INITIALIZER(&dp_netdev_list);
 enum { N_DP_NETDEVS = ARRAY_SIZE(dp_netdevs) };
 
+#ifdef THREADED
+/* Descriptor of the thread that manages the datapaths */
+pthread_t thread_p;
+#endif
+
 /* Maximum port MTU seen so far. */
 static int max_mtu = ETH_PAYLOAD_MAX;
 
@@ -137,7 +184,7 @@ static int do_del_port(struct dp_netdev *, uint16_t port_no);
 static int dp_netdev_output_control(struct dp_netdev *, const struct ofpbuf *,
                                     int queue_no, int port_no, uint32_t arg);
 static int dp_netdev_execute_actions(struct dp_netdev *,
-                                     struct ofpbuf *, flow_t *,
+                                     struct ofpbuf *, const flow_t *,
                                      const union odp_action *, int n);
 
 static struct dpif_netdev *
@@ -213,7 +260,7 @@ create_dp_netdev(const char *name, int dp_idx, struct dpif **dpifp)
     struct dp_netdev *dp;
     int error;
     int i;
-
+    
     if (dp_netdevs[dp_idx]) {
         return EBUSY;
     }
@@ -224,15 +271,33 @@ create_dp_netdev(const char *name, int dp_idx, struct dpif **dpifp)
     dp->dp_idx = dp_idx;
     dp->open_cnt = 0;
     dp->drop_frags = false;
+
+#ifdef THREADED
+    error = pipe(dp->pipe);
+    if (error) {
+        fprintf(stderr, "pipe creation error\n");
+        return errno;
+    }
+    if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) {
+        fprintf(stderr, "error set_nonblock on pipe\n");
+        return errno;
+    }
+
+    pthread_mutex_init(&dp->table_mutex, NULL);
+    pthread_mutex_init(&dp->port_list_mutex, NULL);
+#endif
+
     for (i = 0; i < N_QUEUES; i++) {
         queue_init(&dp->queues[i]);
     }
+
     hmap_init(&dp->flow_table);
     for (i = 0; i < N_GROUPS; i++) {
         dp->groups[i].ports = NULL;
         dp->groups[i].n_ports = 0;
         dp->groups[i].group = i;
     }
+
     list_init(&dp->port_list);
     error = do_add_port(dp, name, ODP_PORT_INTERNAL, ODPP_LOCAL);
     if (error) {
@@ -285,15 +350,28 @@ dp_netdev_free(struct dp_netdev *dp)
     int i;
 
     dp_netdev_flow_flush(dp);
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     while (dp->n_ports > 0) {
         struct dp_netdev_port *port = CONTAINER_OF(
             dp->port_list.next, struct dp_netdev_port, node);
         do_del_port(dp, port->port_no);
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     for (i = 0; i < N_QUEUES; i++) {
         queue_destroy(&dp->queues[i]);
     }
     hmap_destroy(&dp->flow_table);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+    pthread_mutex_destroy(&dp->table_mutex);
+    pthread_mutex_destroy(&dp->port_list_mutex);
+#endif
+
     for (i = 0; i < N_GROUPS; i++) {
         free(dp->groups[i].ports);
     }
@@ -326,8 +404,16 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct odp_stats *stats)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     memset(stats, 0, sizeof *stats);
+
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     stats->n_flows = hmap_count(&dp->flow_table);
     stats->cur_capacity = hmap_capacity(&dp->flow_table);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
+
     stats->max_capacity = MAX_FLOWS;
     stats->n_ports = dp->n_ports;
     stats->max_ports = MAX_PORTS;
@@ -374,11 +460,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, uint16_t flags,
     memset(&netdev_options, 0, sizeof netdev_options);
     netdev_options.name = devname;
     netdev_options.ethertype = NETDEV_ETH_TYPE_ANY;
-    netdev_options.may_create = true;
     if (internal) {
         netdev_options.type = "tap";
-    } else {
-        netdev_options.may_open = true;
     }
 
     error = netdev_open(&netdev_options, &netdev);
@@ -398,15 +481,24 @@ do_add_port(struct dp_netdev *dp, const char *devname, uint16_t flags,
     port->port_no = port_no;
     port->netdev = netdev;
     port->internal = internal;
+#ifdef THREADED
+    port->poll_fd = NULL;
+#endif
 
     netdev_get_mtu(netdev, &mtu);
     if (mtu > max_mtu) {
         max_mtu = mtu;
     }
 
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     list_push_back(&dp->port_list, &port->node);
-    dp->ports[port_no] = port;
     dp->n_ports++;
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
+    dp->ports[port_no] = port;
     dp->serial++;
 
     return 0;
@@ -460,12 +552,21 @@ get_port_by_name(struct dp_netdev *dp,
 {
     struct dp_netdev_port *port;
 
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
         if (!strcmp(netdev_get_name(port->netdev), devname)) {
             *portp = port;
+#ifdef THREADED
+            pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
             return 0;
         }
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
     return ENOENT;
 }
 
@@ -475,7 +576,7 @@ do_del_port(struct dp_netdev *dp, uint16_t port_no)
     struct dp_netdev_port *port;
     char *name;
     int error;
-
+    /* XXX why no semaphores?? */
     error = get_port_by_number(dp, port_no, &port);
     if (error) {
         return error;
@@ -538,7 +639,13 @@ dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
 static void
 dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
 {
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     hmap_remove(&dp->flow_table, &flow->node);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
     free(flow->actions);
     free(flow);
 }
@@ -570,6 +677,9 @@ dpif_netdev_port_list(const struct dpif *dpif, struct odp_port *ports, int n)
     int i;
 
     i = 0;
+#ifdef THREADED
+    pthread_mutex_lock(&dp->port_list_mutex);
+#endif
     LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
         struct odp_port *odp_port = &ports[i];
         if (i >= n) {
@@ -578,6 +688,9 @@ dpif_netdev_port_list(const struct dpif *dpif, struct odp_port *ports, int n)
         answer_port_query(port, odp_port);
         i++;
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->port_list_mutex);
+#endif
     return dp->n_ports;
 }
 
@@ -659,17 +772,27 @@ dpif_netdev_port_group_set(struct dpif *dpif, int group_no,
 }
 
 static struct dp_netdev_flow *
-dp_netdev_lookup_flow(const struct dp_netdev *dp, const flow_t *key)
+dp_netdev_lookup_flow(struct dp_netdev *dp, const flow_t *key)
 {
     struct dp_netdev_flow *flow;
 
-    assert(key->reserved == 0);
+    assert(!key->reserved[0] && !key->reserved[1] && !key->reserved[2]);
+    
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     HMAP_FOR_EACH_WITH_HASH (flow, struct dp_netdev_flow, node,
                              flow_hash(key, 0), &dp->flow_table) {
         if (flow_equal(&flow->key, key)) {
+#ifdef THREADED
+            pthread_mutex_unlock(&dp->table_mutex);
+#endif
             return flow;
         }
     }
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
     return NULL;
 }
 
@@ -682,9 +805,9 @@ answer_flow_query(struct dp_netdev_flow *flow, uint32_t query_flags,
         odp_flow->stats.n_packets = flow->packet_count;
         odp_flow->stats.n_bytes = flow->byte_count;
         odp_flow->stats.used_sec = flow->used.tv_sec;
-        odp_flow->stats.used_nsec = flow->used.tv_usec * 1000;
+        odp_flow->stats.used_nsec = flow->used.tv_nsec;
         odp_flow->stats.tcp_flags = TCP_FLAGS(flow->tcp_ctl);
-        odp_flow->stats.ip_tos = flow->ip_tos;
+        odp_flow->stats.reserved = 0;
         odp_flow->stats.error = 0;
         if (odp_flow->n_actions > 0) {
             unsigned int n = MIN(odp_flow->n_actions, flow->n_actions);
@@ -710,8 +833,11 @@ dpif_netdev_flow_get(const struct dpif *dpif, struct odp_flow flows[], int n)
 
     for (i = 0; i < n; i++) {
         struct odp_flow *odp_flow = &flows[i];
-        answer_flow_query(dp_netdev_lookup_flow(dp, &odp_flow->key),
-                          odp_flow->flags, odp_flow);
+    struct dp_netdev_flow *lookup_flow;
+
+    lookup_flow = dp_netdev_lookup_flow(dp, &odp_flow->key);
+    if ( lookup_flow == NULL )
+        answer_flow_query(lookup_flow, odp_flow->flags, odp_flow);
     }
     return 0;
 }
@@ -720,41 +846,48 @@ static int
 dpif_netdev_validate_actions(const union odp_action *actions, int n_actions,
                              bool *mutates)
 {
-       unsigned int i;
+    unsigned int i;
 
     *mutates = false;
-       for (i = 0; i < n_actions; i++) {
-               const union odp_action *a = &actions[i];
-               switch (a->type) {
-               case ODPAT_OUTPUT:
-                       if (a->output.port >= MAX_PORTS) {
-                               return EINVAL;
+    for (i = 0; i < n_actions; i++) {
+        const union odp_action *a = &actions[i];
+        switch (a->type) {
+        case ODPAT_OUTPUT:
+            if (a->output.port >= MAX_PORTS) {
+                return EINVAL;
             }
-                       break;
+            break;
 
-               case ODPAT_OUTPUT_GROUP:
+        case ODPAT_OUTPUT_GROUP:
             *mutates = true;
-                       if (a->output_group.group >= N_GROUPS) {
-                               return EINVAL;
+            if (a->output_group.group >= N_GROUPS) {
+                return EINVAL;
             }
-                       break;
+            break;
 
         case ODPAT_CONTROLLER:
             break;
 
-               case ODPAT_SET_VLAN_VID:
+        case ODPAT_SET_VLAN_VID:
+            *mutates = true;
+            if (a->vlan_vid.vlan_vid & htons(~VLAN_VID_MASK)) {
+                return EINVAL;
+            }
+            break;
+
+        case ODPAT_SET_VLAN_PCP:
             *mutates = true;
-                       if (a->vlan_vid.vlan_vid & htons(~VLAN_VID_MASK)) {
-                               return EINVAL;
+            if (a->vlan_pcp.vlan_pcp & ~(VLAN_PCP_MASK >> VLAN_PCP_SHIFT)) {
+                return EINVAL;
             }
-                       break;
+            break;
 
-               case ODPAT_SET_VLAN_PCP:
+        case ODPAT_SET_NW_TOS:
             *mutates = true;
-                       if (a->vlan_pcp.vlan_pcp & ~VLAN_PCP_MASK) {
-                               return EINVAL;
+            if (a->nw_tos.nw_tos & IP_ECN_MASK) {
+                return EINVAL;
             }
-                       break;
+            break;
 
         case ODPAT_STRIP_VLAN:
         case ODPAT_SET_DL_SRC:
@@ -766,11 +899,11 @@ dpif_netdev_validate_actions(const union odp_action *actions, int n_actions,
             *mutates = true;
             break;
 
-               default:
+        default:
             return EOPNOTSUPP;
-               }
-       }
-       return 0;
+        }
+    }
+    return 0;
 }
 
 static int
@@ -805,7 +938,7 @@ add_flow(struct dpif *dpif, struct odp_flow *odp_flow)
 
     flow = xzalloc(sizeof *flow);
     flow->key = odp_flow->key;
-    flow->key.reserved = 0;
+    memset(flow->key.reserved, 0, sizeof flow->key.reserved);
 
     error = set_flow_actions(flow, odp_flow);
     if (error) {
@@ -813,7 +946,13 @@ add_flow(struct dpif *dpif, struct odp_flow *odp_flow)
         return error;
     }
 
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0));
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
     return 0;
 }
 
@@ -821,10 +960,9 @@ static void
 clear_stats(struct dp_netdev_flow *flow)
 {
     flow->used.tv_sec = 0;
-    flow->used.tv_usec = 0;
+    flow->used.tv_nsec = 0;
     flow->packet_count = 0;
     flow->byte_count = 0;
-    flow->ip_tos = 0;
     flow->tcp_ctl = 0;
 }
 
@@ -833,11 +971,19 @@ dpif_netdev_flow_put(struct dpif *dpif, struct odp_flow_put *put)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_flow *flow;
+    int n_flows;
 
     flow = dp_netdev_lookup_flow(dp, &put->flow.key);
     if (!flow) {
         if (put->flags & ODPPF_CREATE) {
-            if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
+#ifdef THREADED
+            pthread_mutex_lock(&dp->table_mutex);
+#endif
+            n_flows = hmap_count(&dp->flow_table);
+#ifdef THREADED
+            pthread_mutex_unlock(&dp->table_mutex);
+#endif
+            if (n_flows < MAX_FLOWS) {
                 return add_flow(dpif, &put->flow);
             } else {
                 return EFBIG;
@@ -880,16 +1026,24 @@ dpif_netdev_flow_list(const struct dpif *dpif, struct odp_flow flows[], int n)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_flow *flow;
-    int i;
+    int i, n_flows;
 
     i = 0;
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
     HMAP_FOR_EACH (flow, struct dp_netdev_flow, node, &dp->flow_table) {
         if (i >= n) {
             break;
         }
         answer_flow_query(flow, 0, &flows[i++]);
     }
-    return hmap_count(&dp->flow_table);
+    n_flows = hmap_count(&dp->flow_table);
+#ifdef THREADED
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
+
+    return n_flows;
 }
 
 static int
@@ -925,7 +1079,7 @@ dpif_netdev_execute(struct dpif *dpif, uint16_t in_port,
          * if we don't. */
         copy = *packet;
     }
-    flow_extract(&copy, in_port, &flow);
+    flow_extract(&copy, 0, in_port, &flow);
     error = dp_netdev_execute_actions(dp, &copy, &flow, actions, n_actions);
     if (mutates) {
         ofpbuf_uninit(&copy);
@@ -971,13 +1125,31 @@ find_nonempty_queue(struct dpif *dpif)
 }
 
 static int
-dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp)
+dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp OVS_UNUSED)
 {
-    struct ovs_queue *q = find_nonempty_queue(dpif);
+    struct ovs_queue *q;
+
+#ifdef THREADED
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    char c;
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
+    q = find_nonempty_queue(dpif);
     if (q) {
         *bufp = queue_pop_head(q);
+#ifdef THREADED
+        /* read a byte from the pipe to advertise that a packet has been
+         * received */
+        if (read(dp->pipe[0], &c, 1) < 0) {
+            printf("Error reading from the pipe\n");
+        }
+        pthread_mutex_unlock(&dp->table_mutex);
+#endif
         return 0;
     } else {
+#ifdef THREADED
+        pthread_mutex_unlock(&dp->table_mutex);
+#endif
         return EAGAIN;
     }
 }
@@ -985,6 +1157,11 @@ dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp)
 static void
 dpif_netdev_recv_wait(struct dpif *dpif)
 {
+#ifdef THREADED
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    poll_fd_wait(dp->pipe[0], POLLIN);
+#else 
     struct ovs_queue *q = find_nonempty_queue(dpif);
     if (q) {
         poll_immediate_wake();
@@ -992,23 +1169,20 @@ dpif_netdev_recv_wait(struct dpif *dpif)
         /* No messages ready to be received, and dp_wait() will ensure that we
          * wake up to queue new messages, so there is nothing to do. */
     }
+#endif
 }
 \f
+
 static void
 dp_netdev_flow_used(struct dp_netdev_flow *flow, const flow_t *key,
                     const struct ofpbuf *packet)
 {
-    time_timeval(&flow->used);
+    time_timespec(&flow->used);
     flow->packet_count++;
     flow->byte_count += packet->size;
-    if (key->dl_type == htons(ETH_TYPE_IP)) {
-        struct ip_header *nh = packet->l3;
-        flow->ip_tos = nh->ip_tos;
-
-        if (key->nw_proto == IPPROTO_TCP) {
-            struct tcp_header *th = packet->l4;
-            flow->tcp_ctl |= th->tcp_ctl;
-        }
+    if (key->dl_type == htons(ETH_TYPE_IP) && key->nw_proto == IPPROTO_TCP) {
+        struct tcp_header *th = packet->l4;
+        flow->tcp_ctl |= th->tcp_ctl;
     }
 }
 
@@ -1019,7 +1193,10 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
     struct dp_netdev_flow *flow;
     flow_t key;
 
-    if (flow_extract(packet, port->port_no, &key) && dp->drop_frags) {
+    if (packet->size < ETH_HEADER_LEN) {
+        return;
+    }
+    if (flow_extract(packet, 0, port->port_no, &key) && dp->drop_frags) {
         dp->n_frags++;
         return;
     }
@@ -1036,13 +1213,17 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
     }
 }
 
+/*
+ * This function is no longer called by the threaded version. The same task is
+ * instead performed in the thread body.
+ */
 static void
 dp_netdev_run(void)
 {
     struct ofpbuf packet;
     struct dp_netdev *dp;
 
-    ofpbuf_init(&packet, DP_NETDEV_HEADROOM + max_mtu);
+    ofpbuf_init(&packet, DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + max_mtu);
     LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
         struct dp_netdev_port *port;
 
@@ -1066,6 +1247,7 @@ dp_netdev_run(void)
     ofpbuf_uninit(&packet);
 }
 
+/* This function is no longer called in the threaded version. */
 static void
 dp_netdev_wait(void)
 {
@@ -1079,14 +1261,158 @@ dp_netdev_wait(void)
     }
 }
 
+#ifdef THREADED
+/*
+ * pcap callback argument
+ */
+struct dispatch_arg {
+    struct dp_netdev *dp;   /* update statistics */
+    struct dp_netdev_port *port;    /* argument to flow identifier function */
+    struct ofpbuf buf;      /* used to process the packet */
+};
+
+/* Process a packet.
+ *
+ * The port_input function will send immediately if it finds a flow match and
+ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP.
+ * If a flow is not found or for the other actions, the packet is copied.
+ */
+static void
+process_pkt(u_char *arg_p, const struct pkthdr *hdr, const u_char *packet)
+{
+    struct dispatch_arg *arg = (struct dispatch_arg *)arg_p;
+    struct ofpbuf *buf = &arg->buf;
+
+    /* set packet size and data pointer */
+    buf->size = hdr->caplen; /* XXX Must the size be equal to hdr->len or
+                              * hdr->caplen */
+    buf->data = (void*)packet;
+
+    dp_netdev_port_input(arg->dp, arg->port, buf);
+
+    return;
+}
+
+/* Body of the thread that manages the datapaths */
+static void*
+dp_thread_body(void *args OVS_UNUSED)
+{
+    struct dp_netdev *dp;
+    struct dp_netdev_port *port;
+    struct dispatch_arg arg;
+    int error;
+    int n_fds;
+    uint32_t batch = 50; /* max number of pkts processed by the dispatch */
+    int processed;     /* actual number of pkts processed by the dispatch */
+
+    sigset_t sigmask;
+
+    /*XXX Since the poll involves all ports of all datapaths, the right fds
+     * size should be MAX_PORTS * max_number_of_datapaths */
+    struct pollfd fds[MAX_PORTS]; 
+    
+    /* mask the fatal signals. In this way the main thread is delegate to
+     * manage this them. */
+    sigemptyset(&sigmask);
+    sigaddset(&sigmask, SIGTERM);
+    sigaddset(&sigmask, SIGALRM);
+    sigaddset(&sigmask, SIGINT);
+    sigaddset(&sigmask, SIGHUP);
+
+    if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) {
+        printf("Error pthread_sigmask\n");
+    }
+
+    ofpbuf_init(&arg.buf, DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + max_mtu);
+    for(;;) {
+        n_fds = 0;
+        /* build the structure for poll */
+        LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
+            pthread_mutex_lock(&dp->port_list_mutex);
+            LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
+                /* insert an element in the fds structure */
+                fds[n_fds].fd = netdev_get_fd(port->netdev);
+                fds[n_fds].events = POLLIN;
+                port->poll_fd = &fds[n_fds];
+                n_fds++;
+            }
+            pthread_mutex_unlock(&dp->port_list_mutex);
+        }
+
+        error = poll(fds, n_fds, 2000);
+
+        if (error < 0) {
+            printf("poll() error: %s\n", strerror(errno));
+            break;
+        }
+
+        LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
+            arg.dp = dp;
+            pthread_mutex_lock(&dp->port_list_mutex);
+            LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
+                arg.port = port;
+                arg.buf.size = 0;
+                arg.buf.data = (char*)arg.buf.base + DP_NETDEV_HEADROOM;
+                if (port->poll_fd && (port->poll_fd->revents & POLLIN)) {
+                    /* call the dispatch and process the packet into
+                     * its callback. We process 'batch' packets at time */
+                    processed = netdev_dispatch(port->netdev, batch,
+                                         process_pkt, (u_char *)&arg);
+                    if (processed < 0) { /* pcap returns error */
+                        struct vlog_rate_limit rl =
+                            VLOG_RATE_LIMIT_INIT(1, 5);
+                        VLOG_ERR_RL(&rl, 
+                                "error receiving data from XXX \n"); 
+                    }
+                } /* end of if poll */
+            } /* end of port loop */
+        pthread_mutex_unlock(&dp->port_list_mutex);
+        } /* end of dp loop */
+    } /* for ;; */
+
+    ofpbuf_uninit(&arg.buf);
+    return NULL;
+}
+
+/* Starts the datapath */
+static void
+dp_netdev_start(void) 
+{
+    int error;
+
+    /* Launch thread which manages the datapath */
+    error = pthread_create(&thread_p, NULL, dp_thread_body, NULL);
+    return;
+}
+
+/* This is the function that is called in response of a fatal signal (e.g.
+ * SIGTERM) */
 static void
-dp_netdev_modify_vlan_tci(struct ofpbuf *packet, flow_t *key,
-                          uint16_t tci, uint16_t mask)
+dp_netdev_exit_hook(void *aux OVS_UNUSED)
+{
+    pthread_cancel(thread_p);
+    pthread_join(thread_p, NULL);
+}
+#endif /* THREADED */
+
+/* Modify the TCI field of 'packet'.  If a VLAN tag is not present, one
+ * is added with the TCI field set to 'tci'.  If a VLAN tag is present,
+ * then 'mask' bits are cleared before 'tci' is logically OR'd into the
+ * TCI field.
+ *
+ * Note that the function does not ensure that 'tci' does not affect
+ * bits outside of 'mask'.
+ */
+static void
+dp_netdev_modify_vlan_tci(struct ofpbuf *packet, uint16_t tci, uint16_t mask)
 {
     struct vlan_eth_header *veh;
+    struct eth_header *eh;
 
-    if (key->dl_vlan != htons(ODP_VLAN_NONE)) {
-        /* Modify 'mask' bits, but maintain other TCI bits. */
+    eh = packet->l2;
+    if (packet->size >= sizeof(struct vlan_eth_header)
+        && eh->eth_type == htons(ETH_TYPE_VLAN)) {
+        /* Clear 'mask' bits, but maintain other TCI bits. */
         veh = packet->l2;
         veh->veth_tci &= ~htons(mask);
         veh->veth_tci |= htons(tci);
@@ -1104,15 +1430,14 @@ dp_netdev_modify_vlan_tci(struct ofpbuf *packet, flow_t *key,
         memcpy(veh, &tmp, sizeof tmp);
         packet->l2 = (char*)packet->l2 - VLAN_HEADER_LEN;
     }
-
-    key->dl_vlan = veh->veth_tci & htons(VLAN_VID_MASK);
 }
 
 static void
-dp_netdev_strip_vlan(struct ofpbuf *packet, flow_t *key)
+dp_netdev_strip_vlan(struct ofpbuf *packet)
 {
     struct vlan_eth_header *veh = packet->l2;
-    if (veh->veth_type == htons(ETH_TYPE_VLAN)) {
+    if (packet->size >= sizeof *veh
+        && veh->veth_type == htons(ETH_TYPE_VLAN)) {
         struct eth_header tmp;
 
         memcpy(tmp.eth_dst, veh->veth_dst, ETH_ADDR_LEN);
@@ -1123,40 +1448,42 @@ dp_netdev_strip_vlan(struct ofpbuf *packet, flow_t *key)
         packet->data = (char*)packet->data + VLAN_HEADER_LEN;
         packet->l2 = (char*)packet->l2 + VLAN_HEADER_LEN;
         memcpy(packet->data, &tmp, sizeof tmp);
-
-        key->dl_vlan = htons(ODP_VLAN_NONE);
     }
 }
 
 static void
-dp_netdev_set_dl_src(struct ofpbuf *packet,
-                     const uint8_t dl_addr[ETH_ADDR_LEN])
+dp_netdev_set_dl_src(struct ofpbuf *packet, const uint8_t dl_addr[ETH_ADDR_LEN])
 {
     struct eth_header *eh = packet->l2;
     memcpy(eh->eth_src, dl_addr, sizeof eh->eth_src);
 }
 
 static void
-dp_netdev_set_dl_dst(struct ofpbuf *packet,
-                     const uint8_t dl_addr[ETH_ADDR_LEN])
+dp_netdev_set_dl_dst(struct ofpbuf *packet, const uint8_t dl_addr[ETH_ADDR_LEN])
 {
     struct eth_header *eh = packet->l2;
     memcpy(eh->eth_dst, dl_addr, sizeof eh->eth_dst);
 }
 
+static bool
+is_ip(const struct ofpbuf *packet, const flow_t *key)
+{
+    return key->dl_type == htons(ETH_TYPE_IP) && packet->l4;
+}
+
 static void
-dp_netdev_set_nw_addr(struct ofpbuf *packet, flow_t *key,
+dp_netdev_set_nw_addr(struct ofpbuf *packet, const flow_t *key,
                       const struct odp_action_nw_addr *a)
 {
-    if (key->dl_type == htons(ETH_TYPE_IP)) {
+    if (is_ip(packet, key)) {
         struct ip_header *nh = packet->l3;
         uint32_t *field;
 
         field = a->type == ODPAT_SET_NW_SRC ? &nh->ip_src : &nh->ip_dst;
-        if (key->nw_proto == IP_TYPE_TCP) {
+        if (key->nw_proto == IP_TYPE_TCP && packet->l7) {
             struct tcp_header *th = packet->l4;
             th->tcp_csum = recalc_csum32(th->tcp_csum, *field, a->nw_addr);
-        } else if (key->nw_proto == IP_TYPE_UDP) {
+        } else if (key->nw_proto == IP_TYPE_UDP && packet->l7) {
             struct udp_header *uh = packet->l4;
             if (uh->udp_csum) {
                 uh->udp_csum = recalc_csum32(uh->udp_csum, *field, a->nw_addr);
@@ -1171,21 +1498,40 @@ dp_netdev_set_nw_addr(struct ofpbuf *packet, flow_t *key,
 }
 
 static void
-dp_netdev_set_tp_port(struct ofpbuf *packet, flow_t *key,
+dp_netdev_set_nw_tos(struct ofpbuf *packet, const flow_t *key,
+                     const struct odp_action_nw_tos *a)
+{
+    if (is_ip(packet, key)) {
+        struct ip_header *nh = packet->l3;
+        uint8_t *field = &nh->ip_tos;
+
+        /* Set the DSCP bits and preserve the ECN bits. */
+        uint8_t new = a->nw_tos | (nh->ip_tos & IP_ECN_MASK);
+
+        nh->ip_csum = recalc_csum16(nh->ip_csum, htons((uint16_t)*field),
+                htons((uint16_t)a->nw_tos));
+        *field = new;
+    }
+}
+
+static void
+dp_netdev_set_tp_port(struct ofpbuf *packet, const flow_t *key,
                       const struct odp_action_tp_port *a)
 {
-       if (key->dl_type == htons(ETH_TYPE_IP)) {
+    if (is_ip(packet, key)) {
         uint16_t *field;
-        if (key->nw_proto == IPPROTO_TCP) {
+        if (key->nw_proto == IPPROTO_TCP && packet->l7) {
             struct tcp_header *th = packet->l4;
             field = a->type == ODPAT_SET_TP_SRC ? &th->tcp_src : &th->tcp_dst;
             th->tcp_csum = recalc_csum16(th->tcp_csum, *field, a->tp_port);
             *field = a->tp_port;
-        } else if (key->nw_proto == IPPROTO_UDP) {
+        } else if (key->nw_proto == IPPROTO_UDP && packet->l7) {
             struct udp_header *uh = packet->l4;
             field = a->type == ODPAT_SET_TP_SRC ? &uh->udp_src : &uh->udp_dst;
             uh->udp_csum = recalc_csum16(uh->udp_csum, *field, a->tp_port);
             *field = a->tp_port;
+        } else {
+            return;
         }
     }
 }
@@ -1194,7 +1540,7 @@ static void
 dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet,
                       uint16_t out_port)
 {
-       struct dp_netdev_port *p = dp->ports[out_port];
+    struct dp_netdev_port *p = dp->ports[out_port];
     if (p) {
         netdev_send(p->netdev, packet);
     }
@@ -1204,15 +1550,15 @@ static void
 dp_netdev_output_group(struct dp_netdev *dp, uint16_t group, uint16_t in_port,
                        struct ofpbuf *packet)
 {
-       struct odp_port_group *g = &dp->groups[group];
-       int i;
+    struct odp_port_group *g = &dp->groups[group];
+    int i;
 
-       for (i = 0; i < g->n_ports; i++) {
+    for (i = 0; i < g->n_ports; i++) {
         uint16_t out_port = g->ports[i];
         if (out_port != in_port) {
             dp_netdev_output_port(dp, packet, out_port);
         }
-       }
+    }
 }
 
 static int
@@ -1223,6 +1569,9 @@ dp_netdev_output_control(struct dp_netdev *dp, const struct ofpbuf *packet,
     struct odp_msg *header;
     struct ofpbuf *msg;
     size_t msg_size;
+#ifdef THREADED
+    char c;
+#endif
 
     if (q->n >= MAX_QUEUE_LEN) {
         dp->n_lost++;
@@ -1230,75 +1579,128 @@ dp_netdev_output_control(struct dp_netdev *dp, const struct ofpbuf *packet,
     }
 
     msg_size = sizeof *header + packet->size;
-    msg = ofpbuf_new(msg_size);
+    msg = ofpbuf_new_with_headroom(msg_size, DPIF_RECV_MSG_PADDING);
     header = ofpbuf_put_uninit(msg, sizeof *header);
     header->type = queue_no;
     header->length = msg_size;
     header->port = port_no;
     header->arg = arg;
     ofpbuf_put(msg, packet->data, packet->size);
+#ifdef THREADED
+    pthread_mutex_lock(&dp->table_mutex);
+#endif
+    
     queue_push_tail(q, msg);
+#ifdef THREADED
+    /* write a byte on the pipe to advertise that a packet is ready */
+    if (write(dp->pipe[1], &c, 1) < 0) {
+        printf("Error writing on the pipe\n");
+    }
+    pthread_mutex_unlock(&dp->table_mutex);
+#endif
 
     return 0;
 }
 
+/* Returns true if 'packet' is an invalid Ethernet+IPv4 ARP packet: one with
+ * screwy or truncated header fields or one whose inner and outer Ethernet
+ * address differ. */
+static bool
+dp_netdev_is_spoofed_arp(struct ofpbuf *packet, const struct odp_flow_key *key)
+{
+    struct arp_eth_header *arp;
+    struct eth_header *eth;
+    ptrdiff_t l3_size;
+
+    if (key->dl_type != htons(ETH_TYPE_ARP)) {
+        return false;
+    }
+
+    l3_size = (char *) ofpbuf_end(packet) - (char *) packet->l3;
+    if (l3_size < sizeof(struct arp_eth_header)) {
+        return true;
+    }
+
+    eth = packet->l2;
+    arp = packet->l3;
+    return (arp->ar_hrd != htons(ARP_HRD_ETHERNET)
+            || arp->ar_pro != htons(ARP_PRO_IP)
+            || arp->ar_hln != ETH_HEADER_LEN
+            || arp->ar_pln != 4
+            || !eth_addr_equals(arp->ar_sha, eth->eth_src));
+}
+
+/*
+ * Execute the actions associated to a flow.
+ */
 static int
 dp_netdev_execute_actions(struct dp_netdev *dp,
-                          struct ofpbuf *packet, flow_t *key,
+                          struct ofpbuf *packet, const flow_t *key,
                           const union odp_action *actions, int n_actions)
 {
     int i;
+
     for (i = 0; i < n_actions; i++) {
         const union odp_action *a = &actions[i];
 
-               switch (a->type) {
-               case ODPAT_OUTPUT:
+        switch (a->type) {
+        case ODPAT_OUTPUT:
             dp_netdev_output_port(dp, packet, a->output.port);
-                       break;
+            break;
 
-               case ODPAT_OUTPUT_GROUP:
-                       dp_netdev_output_group(dp, a->output_group.group, key->in_port,
+        case ODPAT_OUTPUT_GROUP:
+            dp_netdev_output_group(dp, a->output_group.group, key->in_port,
                                    packet);
-                       break;
+            break;
 
-               case ODPAT_CONTROLLER:
+        case ODPAT_CONTROLLER:
             dp_netdev_output_control(dp, packet, _ODPL_ACTION_NR,
                                      key->in_port, a->controller.arg);
-                       break;
+            break;
 
-               case ODPAT_SET_VLAN_VID:
-                       dp_netdev_modify_vlan_tci(packet, key, ntohs(a->vlan_vid.vlan_vid),
+        case ODPAT_SET_VLAN_VID:
+            dp_netdev_modify_vlan_tci(packet, ntohs(a->vlan_vid.vlan_vid),
                                       VLAN_VID_MASK);
             break;
 
-               case ODPAT_SET_VLAN_PCP:
-                       dp_netdev_modify_vlan_tci(packet, key, a->vlan_pcp.vlan_pcp << 13,
+        case ODPAT_SET_VLAN_PCP:
+            dp_netdev_modify_vlan_tci(packet,
+                                      a->vlan_pcp.vlan_pcp << VLAN_PCP_SHIFT,
                                       VLAN_PCP_MASK);
             break;
 
-               case ODPAT_STRIP_VLAN:
-                       dp_netdev_strip_vlan(packet, key);
-                       break;
+        case ODPAT_STRIP_VLAN:
+            dp_netdev_strip_vlan(packet);
+            break;
 
-               case ODPAT_SET_DL_SRC:
+        case ODPAT_SET_DL_SRC:
             dp_netdev_set_dl_src(packet, a->dl_addr.dl_addr);
-                       break;
+            break;
 
-               case ODPAT_SET_DL_DST:
+        case ODPAT_SET_DL_DST:
             dp_netdev_set_dl_dst(packet, a->dl_addr.dl_addr);
-                       break;
-
-               case ODPAT_SET_NW_SRC:
-               case ODPAT_SET_NW_DST:
-                       dp_netdev_set_nw_addr(packet, key, &a->nw_addr);
-                       break;
-
-               case ODPAT_SET_TP_SRC:
-               case ODPAT_SET_TP_DST:
-                       dp_netdev_set_tp_port(packet, key, &a->tp_port);
-                       break;
-               }
-       }
+            break;
+
+        case ODPAT_SET_NW_SRC:
+        case ODPAT_SET_NW_DST:
+            dp_netdev_set_nw_addr(packet, key, &a->nw_addr);
+            break;
+
+        case ODPAT_SET_NW_TOS:
+            dp_netdev_set_nw_tos(packet, key, &a->nw_tos);
+            break;
+
+        case ODPAT_SET_TP_SRC:
+        case ODPAT_SET_TP_DST:
+            dp_netdev_set_tp_port(packet, key, &a->tp_port);
+            break;
+
+        case ODPAT_DROP_SPOOFED_ARP:
+            if (dp_netdev_is_spoofed_arp(packet, key)) {
+                return 0;
+            }
+        }
+    }
     return 0;
 }
 
@@ -1306,6 +1708,10 @@ const struct dpif_class dpif_netdev_class = {
     "netdev",
     dp_netdev_run,
     dp_netdev_wait,
+#ifdef THREADED
+    dp_netdev_start, 
+    dp_netdev_exit_hook,
+#endif
     NULL,                       /* enumerate */
     dpif_netdev_open,
     dpif_netdev_close,
@@ -1333,6 +1739,7 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_recv_set_mask,
     NULL,                       /* get_sflow_probability */
     NULL,                       /* set_sflow_probability */
+    NULL,                       /* queue_to_priority */
     dpif_netdev_recv,
     dpif_netdev_recv_wait,
 };