+#ifdef THREADED
+/*
+ * pcap callback argument
+ */
+struct dispatch_arg {
+ struct dp_netdev *dp; /* update statistics */
+ struct dp_netdev_port *port; /* argument to flow identifier function */
+ struct ofpbuf buf; /* used to process the packet */
+};
+
+/* Process a packet.
+ *
+ * The port_input function will send immediately if it finds a flow match and
+ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP.
+ * If a flow is not found or for the other actions, the packet is copied.
+ */
+static void
+process_pkt(u_char *arg_p, const struct pkthdr *hdr, const u_char *packet)
+{
+ struct dispatch_arg *arg = (struct dispatch_arg *)arg_p;
+ struct ofpbuf *buf = &arg->buf;
+
+ /* set packet size and data pointer */
+ buf->size = hdr->caplen; /* XXX Must the size be equal to hdr->len or
+ * hdr->caplen */
+ buf->data = (void*)packet;
+
+ dp_netdev_port_input(arg->dp, arg->port, buf);
+
+ return;
+}
+
+/* Body of the thread that manages the datapaths */
+static void*
+dp_thread_body(void *args OVS_UNUSED)
+{
+ struct dp_netdev *dp;
+ struct dp_netdev_port *port;
+ struct dispatch_arg arg;
+ int error;
+ int n_fds;
+ uint32_t batch = 50; /* max number of pkts processed by the dispatch */
+ int processed; /* actual number of pkts processed by the dispatch */
+
+ sigset_t sigmask;
+
+ /*XXX Since the poll involves all ports of all datapaths, the right fds
+ * size should be MAX_PORTS * max_number_of_datapaths */
+ struct pollfd fds[MAX_PORTS];
+
+ /* mask the fatal signals. In this way the main thread is delegate to
+ * manage this them. */
+ sigemptyset(&sigmask);
+ sigaddset(&sigmask, SIGTERM);
+ sigaddset(&sigmask, SIGALRM);
+ sigaddset(&sigmask, SIGINT);
+ sigaddset(&sigmask, SIGHUP);
+
+ if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) {
+ printf("Error pthread_sigmask\n");
+ }
+
+ ofpbuf_init(&arg.buf, DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + max_mtu);
+ for(;;) {
+ n_fds = 0;
+ /* build the structure for poll */
+ LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
+ pthread_mutex_lock(&dp->port_list_mutex);
+ LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
+ /* insert an element in the fds structure */
+ fds[n_fds].fd = netdev_get_fd(port->netdev);
+ fds[n_fds].events = POLLIN;
+ port->poll_fd = &fds[n_fds];
+ n_fds++;
+ }
+ pthread_mutex_unlock(&dp->port_list_mutex);
+ }
+
+ error = poll(fds, n_fds, 2000);
+
+ if (error < 0) {
+ printf("poll() error: %s\n", strerror(errno));
+ break;
+ }
+
+ LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
+ arg.dp = dp;
+ pthread_mutex_lock(&dp->port_list_mutex);
+ LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
+ arg.port = port;
+ arg.buf.size = 0;
+ arg.buf.data = (char*)arg.buf.base + DP_NETDEV_HEADROOM;
+ if (port->poll_fd && (port->poll_fd->revents & POLLIN)) {
+ /* call the dispatch and process the packet into
+ * its callback. We process 'batch' packets at time */
+ processed = netdev_dispatch(port->netdev, batch,
+ process_pkt, (u_char *)&arg);
+ if (processed < 0) { /* pcap returns error */
+ struct vlog_rate_limit rl =
+ VLOG_RATE_LIMIT_INIT(1, 5);
+ VLOG_ERR_RL(&rl,
+ "error receiving data from XXX \n");
+ }
+ } /* end of if poll */
+ } /* end of port loop */
+ pthread_mutex_unlock(&dp->port_list_mutex);
+ } /* end of dp loop */
+ } /* for ;; */
+
+ ofpbuf_uninit(&arg.buf);
+ return NULL;
+}
+
+/* Starts the datapath */
+static void
+dp_netdev_start(void)
+{
+ int error;
+
+ /* Launch thread which manages the datapath */
+ error = pthread_create(&thread_p, NULL, dp_thread_body, NULL);
+ return;
+}
+
+/* This is the function that is called in response of a fatal signal (e.g.
+ * SIGTERM) */
+static void
+dp_netdev_exit_hook(void *aux OVS_UNUSED)
+{
+ pthread_cancel(thread_p);
+ pthread_join(thread_p, NULL);
+}
+#endif /* THREADED */