Catalli's threaded switch
[sliver-openvswitch.git] / lib / dpif-netdev.c
1 /*
2  * Copyright (c) 2009, 2010 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "dpif.h"
19
20 #include <assert.h>
21 #include <ctype.h>
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <inttypes.h>
25 #include <netinet/in.h>
26 #include <sys/socket.h>
27 #include <net/if.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <sys/ioctl.h>
31 #include <sys/stat.h>
32 #include <unistd.h>
33
34 #ifdef THREADED
35 #include <signal.h>
36 #include <pthread.h>
37
38 #include "socket-util.h"
39 #include "fatal-signal.h"
40 #include "dispatch.h"
41 #endif
42
43 #include "csum.h"
44 #include "dpif-provider.h"
45 #include "flow.h"
46 #include "hmap.h"
47 #include "list.h"
48 #include "netdev.h"
49 #include "odp-util.h"
50 #include "ofp-print.h"
51 #include "ofpbuf.h"
52 #include "packets.h"
53 #include "poll-loop.h"
54 #include "queue.h"
55 #include "timeval.h"
56 #include "util.h"
57 #include "vlog.h"
58
59 VLOG_DEFINE_THIS_MODULE(dpif_netdev)
60
61 /* We could use these macros instead of using #ifdef and #endif every time we
62  * need to call the pthread_mutex_lock/unlock.
63 #ifdef THREADED
64 #define LOCK(mutex) pthread_mutex_lock(mutex)
65 #define UNLOCK(mutex) pthread_mutex_unlock(mutex)
66 #else
67 #define LOCK(mutex)
68 #define UNLOCK(mutex)
69 #endif
70 */
71
72 /* Configuration parameters. */
73 enum { N_QUEUES = 2 };          /* Number of queues for dpif_recv(). */
74 enum { MAX_QUEUE_LEN = 100 };   /* Maximum number of packets per queue. */
75 enum { N_GROUPS = 16 };         /* Number of port groups. */
76 enum { MAX_PORTS = 256 };       /* Maximum number of ports. */
77 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
78
79 /* Enough headroom to add a vlan tag, plus an extra 2 bytes to allow IP
80  * headers to be aligned on a 4-byte boundary.  */
81 enum { DP_NETDEV_HEADROOM = 2 + VLAN_HEADER_LEN };
82
83 /* Datapath based on the network device interface from netdev.h. */
84 struct dp_netdev {
85     struct list node;
86     int dp_idx;
87     int open_cnt;
88     bool destroyed;
89
90     bool drop_frags;            /* Drop all IP fragments, if true. */
91
92 #ifdef THREADED
93     /* The pipe is used to signal the presence of a packet on the queue.
94      * - dpif_netdev_recv_wait() waits on p[0]
95      * - dpif_netdev_recv() extract from queue and read p[0]
96      * - dp_netdev_output_control() send to queue and write p[1]
97      */
98
99     /* The access to this queue is protected by the table_mutex mutex */
100     int pipe[2];    /* signal a packet on the queue */
101
102     pthread_mutex_t table_mutex;    /* mutex for the flow table */
103     pthread_mutex_t port_list_mutex;    /* port list mutex */
104 #endif
105     
106     struct ovs_queue queues[N_QUEUES];  /* messages queued for dpif_recv(). */
107
108     struct hmap flow_table;     /* Flow table. */
109
110     struct odp_port_group groups[N_GROUPS];
111
112     /* Statistics. */
113     long long int n_frags;      /* Number of dropped IP fragments. */
114     long long int n_hit;        /* Number of flow table matches. */
115     long long int n_missed;     /* Number of flow table misses. */
116     long long int n_lost;       /* Number of misses not passed to client. */
117
118     /* Ports. */
119     struct dp_netdev_port *ports[MAX_PORTS];
120     struct list port_list;
121     int n_ports;
122     unsigned int serial;
123 };
124
125 /* A port in a netdev-based datapath. */
126 struct dp_netdev_port {
127     int port_no;                /* Index into dp_netdev's 'ports'. */
128     struct list node;           /* Element in dp_netdev's 'port_list'. */
129     struct netdev *netdev;
130
131     bool internal;              /* Internal port (as ODP_PORT_INTERNAL)? */
132 #ifdef THREADED
133     struct pollfd *poll_fd;     /* Useful to manage the poll loop in the
134                                  * thread */
135 #endif
136 };
137
138 /* A flow in dp_netdev's 'flow_table'. */
139 struct dp_netdev_flow {
140     struct hmap_node node;      /* Element in dp_netdev's 'flow_table'. */
141     flow_t key;
142
143     /* Statistics. */
144     struct timespec used;       /* Last used time. */
145     long long int packet_count; /* Number of packets matched. */
146     long long int byte_count;   /* Number of bytes matched. */
147     uint16_t tcp_ctl;           /* Bitwise-OR of seen tcp_ctl values. */
148
149     /* Actions. */
150     union odp_action *actions;
151     unsigned int n_actions;
152 };
153
154 /* Interface to netdev-based datapath. */
155 struct dpif_netdev {
156     struct dpif dpif;
157     struct dp_netdev *dp;
158     int listen_mask;
159     unsigned int dp_serial;
160 };
161
162 /* All netdev-based datapaths. */
163 static struct dp_netdev *dp_netdevs[256];
164 struct list dp_netdev_list = LIST_INITIALIZER(&dp_netdev_list);
165 enum { N_DP_NETDEVS = ARRAY_SIZE(dp_netdevs) };
166
167 #ifdef THREADED
168 /* Descriptor of the thread that manages the datapaths */
169 pthread_t thread_p;
170 #endif
171
172 /* Maximum port MTU seen so far. */
173 static int max_mtu = ETH_PAYLOAD_MAX;
174
175 static int get_port_by_number(struct dp_netdev *, uint16_t port_no,
176                               struct dp_netdev_port **portp);
177 static int get_port_by_name(struct dp_netdev *, const char *devname,
178                             struct dp_netdev_port **portp);
179 static void dp_netdev_free(struct dp_netdev *);
180 static void dp_netdev_flow_flush(struct dp_netdev *);
181 static int do_add_port(struct dp_netdev *, const char *devname, uint16_t flags,
182                        uint16_t port_no);
183 static int do_del_port(struct dp_netdev *, uint16_t port_no);
184 static int dp_netdev_output_control(struct dp_netdev *, const struct ofpbuf *,
185                                     int queue_no, int port_no, uint32_t arg);
186 static int dp_netdev_execute_actions(struct dp_netdev *,
187                                      struct ofpbuf *, const flow_t *,
188                                      const union odp_action *, int n);
189
190 static struct dpif_netdev *
191 dpif_netdev_cast(const struct dpif *dpif)
192 {
193     dpif_assert_class(dpif, &dpif_netdev_class);
194     return CONTAINER_OF(dpif, struct dpif_netdev, dpif);
195 }
196
197 static struct dp_netdev *
198 get_dp_netdev(const struct dpif *dpif)
199 {
200     return dpif_netdev_cast(dpif)->dp;
201 }
202
203 static int
204 name_to_dp_idx(const char *name)
205 {
206     if (!strncmp(name, "dp", 2) && isdigit((unsigned char)name[2])) {
207         int dp_idx = atoi(name + 2);
208         if (dp_idx >= 0 && dp_idx < N_DP_NETDEVS) {
209             return dp_idx;
210         }
211     }
212     return -1;
213 }
214
215 static struct dp_netdev *
216 find_dp_netdev(const char *name)
217 {
218     int dp_idx;
219     size_t i;
220
221     dp_idx = name_to_dp_idx(name);
222     if (dp_idx >= 0) {
223         return dp_netdevs[dp_idx];
224     }
225
226     for (i = 0; i < N_DP_NETDEVS; i++) {
227         struct dp_netdev *dp = dp_netdevs[i];
228         if (dp) {
229             struct dp_netdev_port *port;
230             if (!get_port_by_name(dp, name, &port)) {
231                 return dp;
232             }
233         }
234     }
235     return NULL;
236 }
237
238 static struct dpif *
239 create_dpif_netdev(struct dp_netdev *dp)
240 {
241     struct dpif_netdev *dpif;
242     char *dpname;
243
244     dp->open_cnt++;
245
246     dpname = xasprintf("dp%d", dp->dp_idx);
247     dpif = xmalloc(sizeof *dpif);
248     dpif_init(&dpif->dpif, &dpif_netdev_class, dpname, dp->dp_idx, dp->dp_idx);
249     dpif->dp = dp;
250     dpif->listen_mask = 0;
251     dpif->dp_serial = dp->serial;
252     free(dpname);
253
254     return &dpif->dpif;
255 }
256
257 static int
258 create_dp_netdev(const char *name, int dp_idx, struct dpif **dpifp)
259 {
260     struct dp_netdev *dp;
261     int error;
262     int i;
263     
264     if (dp_netdevs[dp_idx]) {
265         return EBUSY;
266     }
267
268     /* Create datapath. */
269     dp_netdevs[dp_idx] = dp = xzalloc(sizeof *dp);
270     list_push_back(&dp_netdev_list, &dp->node);
271     dp->dp_idx = dp_idx;
272     dp->open_cnt = 0;
273     dp->drop_frags = false;
274
275 #ifdef THREADED
276     error = pipe(dp->pipe);
277     if (error) {
278         fprintf(stderr, "pipe creation error\n");
279         return errno;
280     }
281     if (set_nonblocking(dp->pipe[0]) || set_nonblocking(dp->pipe[1])) {
282         fprintf(stderr, "error set_nonblock on pipe\n");
283         return errno;
284     }
285
286     pthread_mutex_init(&dp->table_mutex, NULL);
287     pthread_mutex_init(&dp->port_list_mutex, NULL);
288 #endif
289
290     for (i = 0; i < N_QUEUES; i++) {
291         queue_init(&dp->queues[i]);
292     }
293
294     hmap_init(&dp->flow_table);
295     for (i = 0; i < N_GROUPS; i++) {
296         dp->groups[i].ports = NULL;
297         dp->groups[i].n_ports = 0;
298         dp->groups[i].group = i;
299     }
300
301     list_init(&dp->port_list);
302     error = do_add_port(dp, name, ODP_PORT_INTERNAL, ODPP_LOCAL);
303     if (error) {
304         dp_netdev_free(dp);
305         return ENODEV;
306     }
307
308     *dpifp = create_dpif_netdev(dp);
309     return 0;
310 }
311
312 static int
313 dpif_netdev_open(const char *name, const char *type OVS_UNUSED, bool create,
314                  struct dpif **dpifp)
315 {
316     if (create) {
317         if (find_dp_netdev(name)) {
318             return EEXIST;
319         } else {
320             int dp_idx = name_to_dp_idx(name);
321             if (dp_idx >= 0) {
322                 return create_dp_netdev(name, dp_idx, dpifp);
323             } else {
324                 /* Scan for unused dp_idx number. */
325                 for (dp_idx = 0; dp_idx < N_DP_NETDEVS; dp_idx++) {
326                     int error = create_dp_netdev(name, dp_idx, dpifp);
327                     if (error != EBUSY) {
328                         return error;
329                     }
330                 }
331
332                 /* All datapath numbers in use. */
333                 return ENOBUFS;
334             }
335         }
336     } else {
337         struct dp_netdev *dp = find_dp_netdev(name);
338         if (dp) {
339             *dpifp = create_dpif_netdev(dp);
340             return 0;
341         } else {
342             return ENODEV;
343         }
344     }
345 }
346
347 static void
348 dp_netdev_free(struct dp_netdev *dp)
349 {
350     int i;
351
352     dp_netdev_flow_flush(dp);
353 #ifdef THREADED
354     pthread_mutex_lock(&dp->port_list_mutex);
355 #endif
356     while (dp->n_ports > 0) {
357         struct dp_netdev_port *port = CONTAINER_OF(
358             dp->port_list.next, struct dp_netdev_port, node);
359         do_del_port(dp, port->port_no);
360     }
361 #ifdef THREADED
362     pthread_mutex_unlock(&dp->port_list_mutex);
363     pthread_mutex_lock(&dp->table_mutex);
364 #endif
365     for (i = 0; i < N_QUEUES; i++) {
366         queue_destroy(&dp->queues[i]);
367     }
368     hmap_destroy(&dp->flow_table);
369 #ifdef THREADED
370     pthread_mutex_unlock(&dp->table_mutex);
371     pthread_mutex_destroy(&dp->table_mutex);
372     pthread_mutex_destroy(&dp->port_list_mutex);
373 #endif
374
375     for (i = 0; i < N_GROUPS; i++) {
376         free(dp->groups[i].ports);
377     }
378     dp_netdevs[dp->dp_idx] = NULL;
379     list_remove(&dp->node);
380     free(dp);
381 }
382
383 static void
384 dpif_netdev_close(struct dpif *dpif)
385 {
386     struct dp_netdev *dp = get_dp_netdev(dpif);
387     assert(dp->open_cnt > 0);
388     if (--dp->open_cnt == 0 && dp->destroyed) {
389         dp_netdev_free(dp);
390     }
391     free(dpif);
392 }
393
394 static int
395 dpif_netdev_destroy(struct dpif *dpif)
396 {
397     struct dp_netdev *dp = get_dp_netdev(dpif);
398     dp->destroyed = true;
399     return 0;
400 }
401
402 static int
403 dpif_netdev_get_stats(const struct dpif *dpif, struct odp_stats *stats)
404 {
405     struct dp_netdev *dp = get_dp_netdev(dpif);
406     memset(stats, 0, sizeof *stats);
407
408 #ifdef THREADED
409     pthread_mutex_lock(&dp->table_mutex);
410 #endif
411     stats->n_flows = hmap_count(&dp->flow_table);
412     stats->cur_capacity = hmap_capacity(&dp->flow_table);
413 #ifdef THREADED
414     pthread_mutex_unlock(&dp->table_mutex);
415 #endif
416
417     stats->max_capacity = MAX_FLOWS;
418     stats->n_ports = dp->n_ports;
419     stats->max_ports = MAX_PORTS;
420     stats->max_groups = N_GROUPS;
421     stats->n_frags = dp->n_frags;
422     stats->n_hit = dp->n_hit;
423     stats->n_missed = dp->n_missed;
424     stats->n_lost = dp->n_lost;
425     stats->max_miss_queue = MAX_QUEUE_LEN;
426     stats->max_action_queue = MAX_QUEUE_LEN;
427     return 0;
428 }
429
430 static int
431 dpif_netdev_get_drop_frags(const struct dpif *dpif, bool *drop_fragsp)
432 {
433     struct dp_netdev *dp = get_dp_netdev(dpif);
434     *drop_fragsp = dp->drop_frags;
435     return 0;
436 }
437
438 static int
439 dpif_netdev_set_drop_frags(struct dpif *dpif, bool drop_frags)
440 {
441     struct dp_netdev *dp = get_dp_netdev(dpif);
442     dp->drop_frags = drop_frags;
443     return 0;
444 }
445
446 static int
447 do_add_port(struct dp_netdev *dp, const char *devname, uint16_t flags,
448             uint16_t port_no)
449 {
450     bool internal = (flags & ODP_PORT_INTERNAL) != 0;
451     struct dp_netdev_port *port;
452     struct netdev_options netdev_options;
453     struct netdev *netdev;
454     int mtu;
455     int error;
456
457     /* XXX reject devices already in some dp_netdev. */
458
459     /* Open and validate network device. */
460     memset(&netdev_options, 0, sizeof netdev_options);
461     netdev_options.name = devname;
462     netdev_options.ethertype = NETDEV_ETH_TYPE_ANY;
463     if (internal) {
464         netdev_options.type = "tap";
465     }
466
467     error = netdev_open(&netdev_options, &netdev);
468     if (error) {
469         return error;
470     }
471     /* XXX reject loopback devices */
472     /* XXX reject non-Ethernet devices */
473
474     error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, false);
475     if (error) {
476         netdev_close(netdev);
477         return error;
478     }
479
480     port = xmalloc(sizeof *port);
481     port->port_no = port_no;
482     port->netdev = netdev;
483     port->internal = internal;
484 #ifdef THREADED
485     port->poll_fd = NULL;
486 #endif
487
488     netdev_get_mtu(netdev, &mtu);
489     if (mtu > max_mtu) {
490         max_mtu = mtu;
491     }
492
493 #ifdef THREADED
494     pthread_mutex_lock(&dp->port_list_mutex);
495 #endif
496     list_push_back(&dp->port_list, &port->node);
497     dp->n_ports++;
498 #ifdef THREADED
499     pthread_mutex_unlock(&dp->port_list_mutex);
500 #endif
501     dp->ports[port_no] = port;
502     dp->serial++;
503
504     return 0;
505 }
506
507 static int
508 dpif_netdev_port_add(struct dpif *dpif, const char *devname, uint16_t flags,
509                      uint16_t *port_nop)
510 {
511     struct dp_netdev *dp = get_dp_netdev(dpif);
512     int port_no;
513
514     for (port_no = 0; port_no < MAX_PORTS; port_no++) {
515         if (!dp->ports[port_no]) {
516             *port_nop = port_no;
517             return do_add_port(dp, devname, flags, port_no);
518         }
519     }
520     return EFBIG;
521 }
522
523 static int
524 dpif_netdev_port_del(struct dpif *dpif, uint16_t port_no)
525 {
526     struct dp_netdev *dp = get_dp_netdev(dpif);
527     return port_no == ODPP_LOCAL ? EINVAL : do_del_port(dp, port_no);
528 }
529
530 static bool
531 is_valid_port_number(uint16_t port_no)
532 {
533     return port_no < MAX_PORTS;
534 }
535
536 static int
537 get_port_by_number(struct dp_netdev *dp,
538                    uint16_t port_no, struct dp_netdev_port **portp)
539 {
540     if (!is_valid_port_number(port_no)) {
541         *portp = NULL;
542         return EINVAL;
543     } else {
544         *portp = dp->ports[port_no];
545         return *portp ? 0 : ENOENT;
546     }
547 }
548
549 static int
550 get_port_by_name(struct dp_netdev *dp,
551                  const char *devname, struct dp_netdev_port **portp)
552 {
553     struct dp_netdev_port *port;
554
555 #ifdef THREADED
556     pthread_mutex_lock(&dp->port_list_mutex);
557 #endif
558     LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
559         if (!strcmp(netdev_get_name(port->netdev), devname)) {
560             *portp = port;
561 #ifdef THREADED
562             pthread_mutex_unlock(&dp->port_list_mutex);
563 #endif
564             return 0;
565         }
566     }
567 #ifdef THREADED
568     pthread_mutex_unlock(&dp->port_list_mutex);
569 #endif
570     return ENOENT;
571 }
572
573 static int
574 do_del_port(struct dp_netdev *dp, uint16_t port_no)
575 {
576     struct dp_netdev_port *port;
577     char *name;
578     int error;
579     /* XXX why no semaphores?? */
580     error = get_port_by_number(dp, port_no, &port);
581     if (error) {
582         return error;
583     }
584
585     list_remove(&port->node);
586     dp->ports[port->port_no] = NULL;
587     dp->n_ports--;
588     dp->serial++;
589
590     name = xstrdup(netdev_get_name(port->netdev));
591     netdev_close(port->netdev);
592
593     free(name);
594     free(port);
595
596     return 0;
597 }
598
599 static void
600 answer_port_query(const struct dp_netdev_port *port, struct odp_port *odp_port)
601 {
602     memset(odp_port, 0, sizeof *odp_port);
603     ovs_strlcpy(odp_port->devname, netdev_get_name(port->netdev),
604                 sizeof odp_port->devname);
605     odp_port->port = port->port_no;
606     odp_port->flags = port->internal ? ODP_PORT_INTERNAL : 0;
607 }
608
609 static int
610 dpif_netdev_port_query_by_number(const struct dpif *dpif, uint16_t port_no,
611                                  struct odp_port *odp_port)
612 {
613     struct dp_netdev *dp = get_dp_netdev(dpif);
614     struct dp_netdev_port *port;
615     int error;
616
617     error = get_port_by_number(dp, port_no, &port);
618     if (!error) {
619         answer_port_query(port, odp_port);
620     }
621     return error;
622 }
623
624 static int
625 dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
626                                struct odp_port *odp_port)
627 {
628     struct dp_netdev *dp = get_dp_netdev(dpif);
629     struct dp_netdev_port *port;
630     int error;
631
632     error = get_port_by_name(dp, devname, &port);
633     if (!error) {
634         answer_port_query(port, odp_port);
635     }
636     return error;
637 }
638
639 static void
640 dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
641 {
642 #ifdef THREADED
643     pthread_mutex_lock(&dp->table_mutex);
644 #endif
645     hmap_remove(&dp->flow_table, &flow->node);
646 #ifdef THREADED
647     pthread_mutex_unlock(&dp->table_mutex);
648 #endif
649     free(flow->actions);
650     free(flow);
651 }
652
653 static void
654 dp_netdev_flow_flush(struct dp_netdev *dp)
655 {
656     struct dp_netdev_flow *flow, *next;
657
658     HMAP_FOR_EACH_SAFE (flow, next, struct dp_netdev_flow, node,
659                         &dp->flow_table) {
660         dp_netdev_free_flow(dp, flow);
661     }
662 }
663
664 static int
665 dpif_netdev_flow_flush(struct dpif *dpif)
666 {
667     struct dp_netdev *dp = get_dp_netdev(dpif);
668     dp_netdev_flow_flush(dp);
669     return 0;
670 }
671
672 static int
673 dpif_netdev_port_list(const struct dpif *dpif, struct odp_port *ports, int n)
674 {
675     struct dp_netdev *dp = get_dp_netdev(dpif);
676     struct dp_netdev_port *port;
677     int i;
678
679     i = 0;
680 #ifdef THREADED
681     pthread_mutex_lock(&dp->port_list_mutex);
682 #endif
683     LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
684         struct odp_port *odp_port = &ports[i];
685         if (i >= n) {
686             break;
687         }
688         answer_port_query(port, odp_port);
689         i++;
690     }
691 #ifdef THREADED
692     pthread_mutex_unlock(&dp->port_list_mutex);
693 #endif
694     return dp->n_ports;
695 }
696
697 static int
698 dpif_netdev_port_poll(const struct dpif *dpif_, char **devnamep OVS_UNUSED)
699 {
700     struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
701     if (dpif->dp_serial != dpif->dp->serial) {
702         dpif->dp_serial = dpif->dp->serial;
703         return ENOBUFS;
704     } else {
705         return EAGAIN;
706     }
707 }
708
709 static void
710 dpif_netdev_port_poll_wait(const struct dpif *dpif_)
711 {
712     struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
713     if (dpif->dp_serial != dpif->dp->serial) {
714         poll_immediate_wake();
715     }
716 }
717
718 static int
719 get_port_group(const struct dpif *dpif, int group_no,
720                struct odp_port_group **groupp)
721 {
722     struct dp_netdev *dp = get_dp_netdev(dpif);
723
724     if (group_no >= 0 && group_no < N_GROUPS) {
725         *groupp = &dp->groups[group_no];
726         return 0;
727     } else {
728         *groupp = NULL;
729         return EINVAL;
730     }
731 }
732
733 static int
734 dpif_netdev_port_group_get(const struct dpif *dpif, int group_no,
735                            uint16_t ports[], int n)
736 {
737     struct odp_port_group *group;
738     int error;
739
740     if (n < 0) {
741         return -EINVAL;
742     }
743
744     error = get_port_group(dpif, group_no, &group);
745     if (!error) {
746         memcpy(ports, group->ports, MIN(n, group->n_ports) * sizeof *ports);
747         return group->n_ports;
748     } else {
749         return -error;
750     }
751 }
752
753 static int
754 dpif_netdev_port_group_set(struct dpif *dpif, int group_no,
755                            const uint16_t ports[], int n)
756 {
757     struct odp_port_group *group;
758     int error;
759
760     if (n < 0 || n > MAX_PORTS) {
761         return EINVAL;
762     }
763
764     error = get_port_group(dpif, group_no, &group);
765     if (!error) {
766         free(group->ports);
767         group->ports = xmemdup(ports, n * sizeof *group->ports);
768         group->n_ports = n;
769         group->group = group_no;
770     }
771     return error;
772 }
773
774 static struct dp_netdev_flow *
775 dp_netdev_lookup_flow(struct dp_netdev *dp, const flow_t *key)
776 {
777     struct dp_netdev_flow *flow;
778
779     assert(!key->reserved[0] && !key->reserved[1] && !key->reserved[2]);
780     
781 #ifdef THREADED
782     pthread_mutex_lock(&dp->table_mutex);
783 #endif
784     HMAP_FOR_EACH_WITH_HASH (flow, struct dp_netdev_flow, node,
785                              flow_hash(key, 0), &dp->flow_table) {
786         if (flow_equal(&flow->key, key)) {
787 #ifdef THREADED
788             pthread_mutex_unlock(&dp->table_mutex);
789 #endif
790             return flow;
791         }
792     }
793 #ifdef THREADED
794     pthread_mutex_unlock(&dp->table_mutex);
795 #endif
796     return NULL;
797 }
798
799 static void
800 answer_flow_query(struct dp_netdev_flow *flow, uint32_t query_flags,
801                   struct odp_flow *odp_flow)
802 {
803     if (flow) {
804         odp_flow->key = flow->key;
805         odp_flow->stats.n_packets = flow->packet_count;
806         odp_flow->stats.n_bytes = flow->byte_count;
807         odp_flow->stats.used_sec = flow->used.tv_sec;
808         odp_flow->stats.used_nsec = flow->used.tv_nsec;
809         odp_flow->stats.tcp_flags = TCP_FLAGS(flow->tcp_ctl);
810         odp_flow->stats.reserved = 0;
811         odp_flow->stats.error = 0;
812         if (odp_flow->n_actions > 0) {
813             unsigned int n = MIN(odp_flow->n_actions, flow->n_actions);
814             memcpy(odp_flow->actions, flow->actions,
815                    n * sizeof *odp_flow->actions);
816             odp_flow->n_actions = flow->n_actions;
817         }
818
819         if (query_flags & ODPFF_ZERO_TCP_FLAGS) {
820             flow->tcp_ctl = 0;
821         }
822
823     } else {
824         odp_flow->stats.error = ENOENT;
825     }
826 }
827
828 static int
829 dpif_netdev_flow_get(const struct dpif *dpif, struct odp_flow flows[], int n)
830 {
831     struct dp_netdev *dp = get_dp_netdev(dpif);
832     int i;
833
834     for (i = 0; i < n; i++) {
835         struct odp_flow *odp_flow = &flows[i];
836     struct dp_netdev_flow *lookup_flow;
837
838     lookup_flow = dp_netdev_lookup_flow(dp, &odp_flow->key);
839     if ( lookup_flow == NULL )
840         answer_flow_query(lookup_flow, odp_flow->flags, odp_flow);
841     }
842     return 0;
843 }
844
845 static int
846 dpif_netdev_validate_actions(const union odp_action *actions, int n_actions,
847                              bool *mutates)
848 {
849     unsigned int i;
850
851     *mutates = false;
852     for (i = 0; i < n_actions; i++) {
853         const union odp_action *a = &actions[i];
854         switch (a->type) {
855         case ODPAT_OUTPUT:
856             if (a->output.port >= MAX_PORTS) {
857                 return EINVAL;
858             }
859             break;
860
861         case ODPAT_OUTPUT_GROUP:
862             *mutates = true;
863             if (a->output_group.group >= N_GROUPS) {
864                 return EINVAL;
865             }
866             break;
867
868         case ODPAT_CONTROLLER:
869             break;
870
871         case ODPAT_SET_VLAN_VID:
872             *mutates = true;
873             if (a->vlan_vid.vlan_vid & htons(~VLAN_VID_MASK)) {
874                 return EINVAL;
875             }
876             break;
877
878         case ODPAT_SET_VLAN_PCP:
879             *mutates = true;
880             if (a->vlan_pcp.vlan_pcp & ~(VLAN_PCP_MASK >> VLAN_PCP_SHIFT)) {
881                 return EINVAL;
882             }
883             break;
884
885         case ODPAT_SET_NW_TOS:
886             *mutates = true;
887             if (a->nw_tos.nw_tos & IP_ECN_MASK) {
888                 return EINVAL;
889             }
890             break;
891
892         case ODPAT_STRIP_VLAN:
893         case ODPAT_SET_DL_SRC:
894         case ODPAT_SET_DL_DST:
895         case ODPAT_SET_NW_SRC:
896         case ODPAT_SET_NW_DST:
897         case ODPAT_SET_TP_SRC:
898         case ODPAT_SET_TP_DST:
899             *mutates = true;
900             break;
901
902         default:
903             return EOPNOTSUPP;
904         }
905     }
906     return 0;
907 }
908
909 static int
910 set_flow_actions(struct dp_netdev_flow *flow, struct odp_flow *odp_flow)
911 {
912     size_t n_bytes;
913     bool mutates;
914     int error;
915
916     if (odp_flow->n_actions >= 4096 / sizeof *odp_flow->actions) {
917         return EINVAL;
918     }
919     error = dpif_netdev_validate_actions(odp_flow->actions,
920                                          odp_flow->n_actions, &mutates);
921     if (error) {
922         return error;
923     }
924
925     n_bytes = odp_flow->n_actions * sizeof *flow->actions;
926     flow->actions = xrealloc(flow->actions, n_bytes);
927     flow->n_actions = odp_flow->n_actions;
928     memcpy(flow->actions, odp_flow->actions, n_bytes);
929     return 0;
930 }
931
932 static int
933 add_flow(struct dpif *dpif, struct odp_flow *odp_flow)
934 {
935     struct dp_netdev *dp = get_dp_netdev(dpif);
936     struct dp_netdev_flow *flow;
937     int error;
938
939     flow = xzalloc(sizeof *flow);
940     flow->key = odp_flow->key;
941     memset(flow->key.reserved, 0, sizeof flow->key.reserved);
942
943     error = set_flow_actions(flow, odp_flow);
944     if (error) {
945         free(flow);
946         return error;
947     }
948
949 #ifdef THREADED
950     pthread_mutex_lock(&dp->table_mutex);
951 #endif
952     hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0));
953 #ifdef THREADED
954     pthread_mutex_unlock(&dp->table_mutex);
955 #endif
956     return 0;
957 }
958
959 static void
960 clear_stats(struct dp_netdev_flow *flow)
961 {
962     flow->used.tv_sec = 0;
963     flow->used.tv_nsec = 0;
964     flow->packet_count = 0;
965     flow->byte_count = 0;
966     flow->tcp_ctl = 0;
967 }
968
969 static int
970 dpif_netdev_flow_put(struct dpif *dpif, struct odp_flow_put *put)
971 {
972     struct dp_netdev *dp = get_dp_netdev(dpif);
973     struct dp_netdev_flow *flow;
974     int n_flows;
975
976     flow = dp_netdev_lookup_flow(dp, &put->flow.key);
977     if (!flow) {
978         if (put->flags & ODPPF_CREATE) {
979 #ifdef THREADED
980             pthread_mutex_lock(&dp->table_mutex);
981 #endif
982             n_flows = hmap_count(&dp->flow_table);
983 #ifdef THREADED
984             pthread_mutex_unlock(&dp->table_mutex);
985 #endif
986             if (n_flows < MAX_FLOWS) {
987                 return add_flow(dpif, &put->flow);
988             } else {
989                 return EFBIG;
990             }
991         } else {
992             return ENOENT;
993         }
994     } else {
995         if (put->flags & ODPPF_MODIFY) {
996             int error = set_flow_actions(flow, &put->flow);
997             if (!error && put->flags & ODPPF_ZERO_STATS) {
998                 clear_stats(flow);
999             }
1000             return error;
1001         } else {
1002             return EEXIST;
1003         }
1004     }
1005 }
1006
1007
1008 static int
1009 dpif_netdev_flow_del(struct dpif *dpif, struct odp_flow *odp_flow)
1010 {
1011     struct dp_netdev *dp = get_dp_netdev(dpif);
1012     struct dp_netdev_flow *flow;
1013
1014     flow = dp_netdev_lookup_flow(dp, &odp_flow->key);
1015     if (flow) {
1016         answer_flow_query(flow, 0, odp_flow);
1017         dp_netdev_free_flow(dp, flow);
1018         return 0;
1019     } else {
1020         return ENOENT;
1021     }
1022 }
1023
1024 static int
1025 dpif_netdev_flow_list(const struct dpif *dpif, struct odp_flow flows[], int n)
1026 {
1027     struct dp_netdev *dp = get_dp_netdev(dpif);
1028     struct dp_netdev_flow *flow;
1029     int i, n_flows;
1030
1031     i = 0;
1032 #ifdef THREADED
1033     pthread_mutex_lock(&dp->table_mutex);
1034 #endif
1035     HMAP_FOR_EACH (flow, struct dp_netdev_flow, node, &dp->flow_table) {
1036         if (i >= n) {
1037             break;
1038         }
1039         answer_flow_query(flow, 0, &flows[i++]);
1040     }
1041     n_flows = hmap_count(&dp->flow_table);
1042 #ifdef THREADED
1043     pthread_mutex_unlock(&dp->table_mutex);
1044 #endif
1045
1046     return n_flows;
1047 }
1048
1049 static int
1050 dpif_netdev_execute(struct dpif *dpif, uint16_t in_port,
1051                     const union odp_action actions[], int n_actions,
1052                     const struct ofpbuf *packet)
1053 {
1054     struct dp_netdev *dp = get_dp_netdev(dpif);
1055     struct ofpbuf copy;
1056     bool mutates;
1057     flow_t flow;
1058     int error;
1059
1060     if (packet->size < ETH_HEADER_LEN || packet->size > UINT16_MAX) {
1061         return EINVAL;
1062     }
1063
1064     error = dpif_netdev_validate_actions(actions, n_actions, &mutates);
1065     if (error) {
1066         return error;
1067     }
1068
1069     if (mutates) {
1070         /* We need a deep copy of 'packet' since we're going to modify its
1071          * data. */
1072         ofpbuf_init(&copy, DP_NETDEV_HEADROOM + packet->size);
1073         copy.data = (char*)copy.base + DP_NETDEV_HEADROOM;
1074         ofpbuf_put(&copy, packet->data, packet->size);
1075     } else {
1076         /* We still need a shallow copy of 'packet', even though we won't
1077          * modify its data, because flow_extract() modifies packet->l2, etc.
1078          * We could probably get away with modifying those but it's more polite
1079          * if we don't. */
1080         copy = *packet;
1081     }
1082     flow_extract(&copy, 0, in_port, &flow);
1083     error = dp_netdev_execute_actions(dp, &copy, &flow, actions, n_actions);
1084     if (mutates) {
1085         ofpbuf_uninit(&copy);
1086     }
1087     return error;
1088 }
1089
1090 static int
1091 dpif_netdev_recv_get_mask(const struct dpif *dpif, int *listen_mask)
1092 {
1093     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
1094     *listen_mask = dpif_netdev->listen_mask;
1095     return 0;
1096 }
1097
1098 static int
1099 dpif_netdev_recv_set_mask(struct dpif *dpif, int listen_mask)
1100 {
1101     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
1102     if (!(listen_mask & ~ODPL_ALL)) {
1103         dpif_netdev->listen_mask = listen_mask;
1104         return 0;
1105     } else {
1106         return EINVAL;
1107     }
1108 }
1109
1110 static struct ovs_queue *
1111 find_nonempty_queue(struct dpif *dpif)
1112 {
1113     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
1114     struct dp_netdev *dp = get_dp_netdev(dpif);
1115     int mask = dpif_netdev->listen_mask;
1116     int i;
1117
1118     for (i = 0; i < N_QUEUES; i++) {
1119         struct ovs_queue *q = &dp->queues[i];
1120         if (q->n && mask & (1u << i)) {
1121             return q;
1122         }
1123     }
1124     return NULL;
1125 }
1126
1127 static int
1128 dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp OVS_UNUSED)
1129 {
1130     struct ovs_queue *q;
1131
1132 #ifdef THREADED
1133     struct dp_netdev *dp = get_dp_netdev(dpif);
1134     char c;
1135     pthread_mutex_lock(&dp->table_mutex);
1136 #endif
1137     q = find_nonempty_queue(dpif);
1138     if (q) {
1139         *bufp = queue_pop_head(q);
1140 #ifdef THREADED
1141         /* read a byte from the pipe to advertise that a packet has been
1142          * received */
1143         if (read(dp->pipe[0], &c, 1) < 0) {
1144             printf("Error reading from the pipe\n");
1145         }
1146         pthread_mutex_unlock(&dp->table_mutex);
1147 #endif
1148         return 0;
1149     } else {
1150 #ifdef THREADED
1151         pthread_mutex_unlock(&dp->table_mutex);
1152 #endif
1153         return EAGAIN;
1154     }
1155 }
1156
1157 static void
1158 dpif_netdev_recv_wait(struct dpif *dpif)
1159 {
1160 #ifdef THREADED
1161     struct dp_netdev *dp = get_dp_netdev(dpif);
1162
1163     poll_fd_wait(dp->pipe[0], POLLIN);
1164 #else 
1165     struct ovs_queue *q = find_nonempty_queue(dpif);
1166     if (q) {
1167         poll_immediate_wake();
1168     } else {
1169         /* No messages ready to be received, and dp_wait() will ensure that we
1170          * wake up to queue new messages, so there is nothing to do. */
1171     }
1172 #endif
1173 }
1174 \f
1175
1176 static void
1177 dp_netdev_flow_used(struct dp_netdev_flow *flow, const flow_t *key,
1178                     const struct ofpbuf *packet)
1179 {
1180     time_timespec(&flow->used);
1181     flow->packet_count++;
1182     flow->byte_count += packet->size;
1183     if (key->dl_type == htons(ETH_TYPE_IP) && key->nw_proto == IPPROTO_TCP) {
1184         struct tcp_header *th = packet->l4;
1185         flow->tcp_ctl |= th->tcp_ctl;
1186     }
1187 }
1188
1189 static void
1190 dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
1191                      struct ofpbuf *packet)
1192 {
1193     struct dp_netdev_flow *flow;
1194     flow_t key;
1195
1196     if (packet->size < ETH_HEADER_LEN) {
1197         return;
1198     }
1199     if (flow_extract(packet, 0, port->port_no, &key) && dp->drop_frags) {
1200         dp->n_frags++;
1201         return;
1202     }
1203
1204     flow = dp_netdev_lookup_flow(dp, &key);
1205     if (flow) {
1206         dp_netdev_flow_used(flow, &key, packet);
1207         dp_netdev_execute_actions(dp, packet, &key,
1208                                   flow->actions, flow->n_actions);
1209         dp->n_hit++;
1210     } else {
1211         dp->n_missed++;
1212         dp_netdev_output_control(dp, packet, _ODPL_MISS_NR, port->port_no, 0);
1213     }
1214 }
1215
1216 /*
1217  * This function is no longer called by the threaded version. The same task is
1218  * instead performed in the thread body.
1219  */
1220 static void
1221 dp_netdev_run(void)
1222 {
1223     struct ofpbuf packet;
1224     struct dp_netdev *dp;
1225
1226     ofpbuf_init(&packet, DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + max_mtu);
1227     LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
1228         struct dp_netdev_port *port;
1229
1230         LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
1231             int error;
1232
1233             /* Reset packet contents. */
1234             packet.data = (char*)packet.base + DP_NETDEV_HEADROOM;
1235             packet.size = 0;
1236
1237             error = netdev_recv(port->netdev, &packet);
1238             if (!error) {
1239                 dp_netdev_port_input(dp, port, &packet);
1240             } else if (error != EAGAIN) {
1241                 struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1242                 VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
1243                             netdev_get_name(port->netdev), strerror(error));
1244             }
1245         }
1246     }
1247     ofpbuf_uninit(&packet);
1248 }
1249
1250 /* This function is no longer called in the threaded version. */
1251 static void
1252 dp_netdev_wait(void)
1253 {
1254     struct dp_netdev *dp;
1255
1256     LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
1257         struct dp_netdev_port *port;
1258         LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
1259             netdev_recv_wait(port->netdev);
1260         }
1261     }
1262 }
1263
1264 #ifdef THREADED
1265 /*
1266  * pcap callback argument
1267  */
1268 struct dispatch_arg {
1269     struct dp_netdev *dp;   /* update statistics */
1270     struct dp_netdev_port *port;    /* argument to flow identifier function */
1271     struct ofpbuf buf;      /* used to process the packet */
1272 };
1273
1274 /* Process a packet.
1275  *
1276  * The port_input function will send immediately if it finds a flow match and
1277  * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP.
1278  * If a flow is not found or for the other actions, the packet is copied.
1279  */
1280 static void
1281 process_pkt(u_char *arg_p, const struct pkthdr *hdr, const u_char *packet)
1282 {
1283     struct dispatch_arg *arg = (struct dispatch_arg *)arg_p;
1284     struct ofpbuf *buf = &arg->buf;
1285
1286     /* set packet size and data pointer */
1287     buf->size = hdr->caplen; /* XXX Must the size be equal to hdr->len or
1288                               * hdr->caplen */
1289     buf->data = (void*)packet;
1290
1291     dp_netdev_port_input(arg->dp, arg->port, buf);
1292
1293     return;
1294 }
1295
1296 /* Body of the thread that manages the datapaths */
1297 static void*
1298 dp_thread_body(void *args OVS_UNUSED)
1299 {
1300     struct dp_netdev *dp;
1301     struct dp_netdev_port *port;
1302     struct dispatch_arg arg;
1303     int error;
1304     int n_fds;
1305     uint32_t batch = 50; /* max number of pkts processed by the dispatch */
1306     int processed;      /* actual number of pkts processed by the dispatch */
1307
1308     sigset_t sigmask;
1309
1310     /*XXX Since the poll involves all ports of all datapaths, the right fds
1311      * size should be MAX_PORTS * max_number_of_datapaths */
1312     struct pollfd fds[MAX_PORTS]; 
1313     
1314     /* mask the fatal signals. In this way the main thread is delegate to
1315      * manage this them. */
1316     sigemptyset(&sigmask);
1317     sigaddset(&sigmask, SIGTERM);
1318     sigaddset(&sigmask, SIGALRM);
1319     sigaddset(&sigmask, SIGINT);
1320     sigaddset(&sigmask, SIGHUP);
1321
1322     if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) {
1323         printf("Error pthread_sigmask\n");
1324     }
1325
1326     ofpbuf_init(&arg.buf, DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + max_mtu);
1327     for(;;) {
1328         n_fds = 0;
1329         /* build the structure for poll */
1330         LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
1331             pthread_mutex_lock(&dp->port_list_mutex);
1332             LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
1333                 /* insert an element in the fds structure */
1334                 fds[n_fds].fd = netdev_get_fd(port->netdev);
1335                 fds[n_fds].events = POLLIN;
1336                 port->poll_fd = &fds[n_fds];
1337                 n_fds++;
1338             }
1339             pthread_mutex_unlock(&dp->port_list_mutex);
1340         }
1341
1342         error = poll(fds, n_fds, 2000);
1343
1344         if (error < 0) {
1345             printf("poll() error: %s\n", strerror(errno));
1346             break;
1347         }
1348
1349         LIST_FOR_EACH (dp, struct dp_netdev, node, &dp_netdev_list) {
1350             arg.dp = dp;
1351             pthread_mutex_lock(&dp->port_list_mutex);
1352             LIST_FOR_EACH (port, struct dp_netdev_port, node, &dp->port_list) {
1353                 arg.port = port;
1354                 arg.buf.size = 0;
1355                 arg.buf.data = (char*)arg.buf.base + DP_NETDEV_HEADROOM;
1356                 if (port->poll_fd && (port->poll_fd->revents & POLLIN)) {
1357                     /* call the dispatch and process the packet into
1358                      * its callback. We process 'batch' packets at time */
1359                     processed = netdev_dispatch(port->netdev, batch,
1360                                          process_pkt, (u_char *)&arg);
1361                     if (processed < 0) { /* pcap returns error */
1362                         struct vlog_rate_limit rl =
1363                             VLOG_RATE_LIMIT_INIT(1, 5);
1364                         VLOG_ERR_RL(&rl, 
1365                                 "error receiving data from XXX \n"); 
1366                     }
1367                 } /* end of if poll */
1368             } /* end of port loop */
1369         pthread_mutex_unlock(&dp->port_list_mutex);
1370         } /* end of dp loop */
1371     } /* for ;; */
1372
1373     ofpbuf_uninit(&arg.buf);
1374     return NULL;
1375 }
1376
1377 /* Starts the datapath */
1378 static void
1379 dp_netdev_start(void) 
1380 {
1381     int error;
1382
1383     /* Launch thread which manages the datapath */
1384     error = pthread_create(&thread_p, NULL, dp_thread_body, NULL);
1385     return;
1386 }
1387
1388 /* This is the function that is called in response of a fatal signal (e.g.
1389  * SIGTERM) */
1390 static void
1391 dp_netdev_exit_hook(void *aux OVS_UNUSED)
1392 {
1393     pthread_cancel(thread_p);
1394     pthread_join(thread_p, NULL);
1395 }
1396 #endif /* THREADED */
1397
1398 /* Modify the TCI field of 'packet'.  If a VLAN tag is not present, one
1399  * is added with the TCI field set to 'tci'.  If a VLAN tag is present,
1400  * then 'mask' bits are cleared before 'tci' is logically OR'd into the
1401  * TCI field.
1402  *
1403  * Note that the function does not ensure that 'tci' does not affect
1404  * bits outside of 'mask'.
1405  */
1406 static void
1407 dp_netdev_modify_vlan_tci(struct ofpbuf *packet, uint16_t tci, uint16_t mask)
1408 {
1409     struct vlan_eth_header *veh;
1410     struct eth_header *eh;
1411
1412     eh = packet->l2;
1413     if (packet->size >= sizeof(struct vlan_eth_header)
1414         && eh->eth_type == htons(ETH_TYPE_VLAN)) {
1415         /* Clear 'mask' bits, but maintain other TCI bits. */
1416         veh = packet->l2;
1417         veh->veth_tci &= ~htons(mask);
1418         veh->veth_tci |= htons(tci);
1419     } else {
1420         /* Insert new 802.1Q header. */
1421         struct eth_header *eh = packet->l2;
1422         struct vlan_eth_header tmp;
1423         memcpy(tmp.veth_dst, eh->eth_dst, ETH_ADDR_LEN);
1424         memcpy(tmp.veth_src, eh->eth_src, ETH_ADDR_LEN);
1425         tmp.veth_type = htons(ETH_TYPE_VLAN);
1426         tmp.veth_tci = htons(tci);
1427         tmp.veth_next_type = eh->eth_type;
1428
1429         veh = ofpbuf_push_uninit(packet, VLAN_HEADER_LEN);
1430         memcpy(veh, &tmp, sizeof tmp);
1431         packet->l2 = (char*)packet->l2 - VLAN_HEADER_LEN;
1432     }
1433 }
1434
1435 static void
1436 dp_netdev_strip_vlan(struct ofpbuf *packet)
1437 {
1438     struct vlan_eth_header *veh = packet->l2;
1439     if (packet->size >= sizeof *veh
1440         && veh->veth_type == htons(ETH_TYPE_VLAN)) {
1441         struct eth_header tmp;
1442
1443         memcpy(tmp.eth_dst, veh->veth_dst, ETH_ADDR_LEN);
1444         memcpy(tmp.eth_src, veh->veth_src, ETH_ADDR_LEN);
1445         tmp.eth_type = veh->veth_next_type;
1446
1447         packet->size -= VLAN_HEADER_LEN;
1448         packet->data = (char*)packet->data + VLAN_HEADER_LEN;
1449         packet->l2 = (char*)packet->l2 + VLAN_HEADER_LEN;
1450         memcpy(packet->data, &tmp, sizeof tmp);
1451     }
1452 }
1453
1454 static void
1455 dp_netdev_set_dl_src(struct ofpbuf *packet, const uint8_t dl_addr[ETH_ADDR_LEN])
1456 {
1457     struct eth_header *eh = packet->l2;
1458     memcpy(eh->eth_src, dl_addr, sizeof eh->eth_src);
1459 }
1460
1461 static void
1462 dp_netdev_set_dl_dst(struct ofpbuf *packet, const uint8_t dl_addr[ETH_ADDR_LEN])
1463 {
1464     struct eth_header *eh = packet->l2;
1465     memcpy(eh->eth_dst, dl_addr, sizeof eh->eth_dst);
1466 }
1467
1468 static bool
1469 is_ip(const struct ofpbuf *packet, const flow_t *key)
1470 {
1471     return key->dl_type == htons(ETH_TYPE_IP) && packet->l4;
1472 }
1473
1474 static void
1475 dp_netdev_set_nw_addr(struct ofpbuf *packet, const flow_t *key,
1476                       const struct odp_action_nw_addr *a)
1477 {
1478     if (is_ip(packet, key)) {
1479         struct ip_header *nh = packet->l3;
1480         uint32_t *field;
1481
1482         field = a->type == ODPAT_SET_NW_SRC ? &nh->ip_src : &nh->ip_dst;
1483         if (key->nw_proto == IP_TYPE_TCP && packet->l7) {
1484             struct tcp_header *th = packet->l4;
1485             th->tcp_csum = recalc_csum32(th->tcp_csum, *field, a->nw_addr);
1486         } else if (key->nw_proto == IP_TYPE_UDP && packet->l7) {
1487             struct udp_header *uh = packet->l4;
1488             if (uh->udp_csum) {
1489                 uh->udp_csum = recalc_csum32(uh->udp_csum, *field, a->nw_addr);
1490                 if (!uh->udp_csum) {
1491                     uh->udp_csum = 0xffff;
1492                 }
1493             }
1494         }
1495         nh->ip_csum = recalc_csum32(nh->ip_csum, *field, a->nw_addr);
1496         *field = a->nw_addr;
1497     }
1498 }
1499
1500 static void
1501 dp_netdev_set_nw_tos(struct ofpbuf *packet, const flow_t *key,
1502                      const struct odp_action_nw_tos *a)
1503 {
1504     if (is_ip(packet, key)) {
1505         struct ip_header *nh = packet->l3;
1506         uint8_t *field = &nh->ip_tos;
1507
1508         /* Set the DSCP bits and preserve the ECN bits. */
1509         uint8_t new = a->nw_tos | (nh->ip_tos & IP_ECN_MASK);
1510
1511         nh->ip_csum = recalc_csum16(nh->ip_csum, htons((uint16_t)*field),
1512                 htons((uint16_t)a->nw_tos));
1513         *field = new;
1514     }
1515 }
1516
1517 static void
1518 dp_netdev_set_tp_port(struct ofpbuf *packet, const flow_t *key,
1519                       const struct odp_action_tp_port *a)
1520 {
1521     if (is_ip(packet, key)) {
1522         uint16_t *field;
1523         if (key->nw_proto == IPPROTO_TCP && packet->l7) {
1524             struct tcp_header *th = packet->l4;
1525             field = a->type == ODPAT_SET_TP_SRC ? &th->tcp_src : &th->tcp_dst;
1526             th->tcp_csum = recalc_csum16(th->tcp_csum, *field, a->tp_port);
1527             *field = a->tp_port;
1528         } else if (key->nw_proto == IPPROTO_UDP && packet->l7) {
1529             struct udp_header *uh = packet->l4;
1530             field = a->type == ODPAT_SET_TP_SRC ? &uh->udp_src : &uh->udp_dst;
1531             uh->udp_csum = recalc_csum16(uh->udp_csum, *field, a->tp_port);
1532             *field = a->tp_port;
1533         } else {
1534             return;
1535         }
1536     }
1537 }
1538
1539 static void
1540 dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet,
1541                       uint16_t out_port)
1542 {
1543     struct dp_netdev_port *p = dp->ports[out_port];
1544     if (p) {
1545         netdev_send(p->netdev, packet);
1546     }
1547 }
1548
1549 static void
1550 dp_netdev_output_group(struct dp_netdev *dp, uint16_t group, uint16_t in_port,
1551                        struct ofpbuf *packet)
1552 {
1553     struct odp_port_group *g = &dp->groups[group];
1554     int i;
1555
1556     for (i = 0; i < g->n_ports; i++) {
1557         uint16_t out_port = g->ports[i];
1558         if (out_port != in_port) {
1559             dp_netdev_output_port(dp, packet, out_port);
1560         }
1561     }
1562 }
1563
1564 static int
1565 dp_netdev_output_control(struct dp_netdev *dp, const struct ofpbuf *packet,
1566                          int queue_no, int port_no, uint32_t arg)
1567 {
1568     struct ovs_queue *q = &dp->queues[queue_no];
1569     struct odp_msg *header;
1570     struct ofpbuf *msg;
1571     size_t msg_size;
1572 #ifdef THREADED
1573     char c;
1574 #endif
1575
1576     if (q->n >= MAX_QUEUE_LEN) {
1577         dp->n_lost++;
1578         return ENOBUFS;
1579     }
1580
1581     msg_size = sizeof *header + packet->size;
1582     msg = ofpbuf_new_with_headroom(msg_size, DPIF_RECV_MSG_PADDING);
1583     header = ofpbuf_put_uninit(msg, sizeof *header);
1584     header->type = queue_no;
1585     header->length = msg_size;
1586     header->port = port_no;
1587     header->arg = arg;
1588     ofpbuf_put(msg, packet->data, packet->size);
1589 #ifdef THREADED
1590     pthread_mutex_lock(&dp->table_mutex);
1591 #endif
1592     
1593     queue_push_tail(q, msg);
1594 #ifdef THREADED
1595     /* write a byte on the pipe to advertise that a packet is ready */
1596     if (write(dp->pipe[1], &c, 1) < 0) {
1597         printf("Error writing on the pipe\n");
1598     }
1599     pthread_mutex_unlock(&dp->table_mutex);
1600 #endif
1601
1602     return 0;
1603 }
1604
1605 /* Returns true if 'packet' is an invalid Ethernet+IPv4 ARP packet: one with
1606  * screwy or truncated header fields or one whose inner and outer Ethernet
1607  * address differ. */
1608 static bool
1609 dp_netdev_is_spoofed_arp(struct ofpbuf *packet, const struct odp_flow_key *key)
1610 {
1611     struct arp_eth_header *arp;
1612     struct eth_header *eth;
1613     ptrdiff_t l3_size;
1614
1615     if (key->dl_type != htons(ETH_TYPE_ARP)) {
1616         return false;
1617     }
1618
1619     l3_size = (char *) ofpbuf_end(packet) - (char *) packet->l3;
1620     if (l3_size < sizeof(struct arp_eth_header)) {
1621         return true;
1622     }
1623
1624     eth = packet->l2;
1625     arp = packet->l3;
1626     return (arp->ar_hrd != htons(ARP_HRD_ETHERNET)
1627             || arp->ar_pro != htons(ARP_PRO_IP)
1628             || arp->ar_hln != ETH_HEADER_LEN
1629             || arp->ar_pln != 4
1630             || !eth_addr_equals(arp->ar_sha, eth->eth_src));
1631 }
1632
1633 /*
1634  * Execute the actions associated to a flow.
1635  */
1636 static int
1637 dp_netdev_execute_actions(struct dp_netdev *dp,
1638                           struct ofpbuf *packet, const flow_t *key,
1639                           const union odp_action *actions, int n_actions)
1640 {
1641     int i;
1642
1643     for (i = 0; i < n_actions; i++) {
1644         const union odp_action *a = &actions[i];
1645
1646         switch (a->type) {
1647         case ODPAT_OUTPUT:
1648             dp_netdev_output_port(dp, packet, a->output.port);
1649             break;
1650
1651         case ODPAT_OUTPUT_GROUP:
1652             dp_netdev_output_group(dp, a->output_group.group, key->in_port,
1653                                    packet);
1654             break;
1655
1656         case ODPAT_CONTROLLER:
1657             dp_netdev_output_control(dp, packet, _ODPL_ACTION_NR,
1658                                      key->in_port, a->controller.arg);
1659             break;
1660
1661         case ODPAT_SET_VLAN_VID:
1662             dp_netdev_modify_vlan_tci(packet, ntohs(a->vlan_vid.vlan_vid),
1663                                       VLAN_VID_MASK);
1664             break;
1665
1666         case ODPAT_SET_VLAN_PCP:
1667             dp_netdev_modify_vlan_tci(packet,
1668                                       a->vlan_pcp.vlan_pcp << VLAN_PCP_SHIFT,
1669                                       VLAN_PCP_MASK);
1670             break;
1671
1672         case ODPAT_STRIP_VLAN:
1673             dp_netdev_strip_vlan(packet);
1674             break;
1675
1676         case ODPAT_SET_DL_SRC:
1677             dp_netdev_set_dl_src(packet, a->dl_addr.dl_addr);
1678             break;
1679
1680         case ODPAT_SET_DL_DST:
1681             dp_netdev_set_dl_dst(packet, a->dl_addr.dl_addr);
1682             break;
1683
1684         case ODPAT_SET_NW_SRC:
1685         case ODPAT_SET_NW_DST:
1686             dp_netdev_set_nw_addr(packet, key, &a->nw_addr);
1687             break;
1688
1689         case ODPAT_SET_NW_TOS:
1690             dp_netdev_set_nw_tos(packet, key, &a->nw_tos);
1691             break;
1692
1693         case ODPAT_SET_TP_SRC:
1694         case ODPAT_SET_TP_DST:
1695             dp_netdev_set_tp_port(packet, key, &a->tp_port);
1696             break;
1697
1698         case ODPAT_DROP_SPOOFED_ARP:
1699             if (dp_netdev_is_spoofed_arp(packet, key)) {
1700                 return 0;
1701             }
1702         }
1703     }
1704     return 0;
1705 }
1706
1707 const struct dpif_class dpif_netdev_class = {
1708     "netdev",
1709     dp_netdev_run,
1710     dp_netdev_wait,
1711 #ifdef THREADED
1712     dp_netdev_start, 
1713     dp_netdev_exit_hook,
1714 #endif
1715     NULL,                       /* enumerate */
1716     dpif_netdev_open,
1717     dpif_netdev_close,
1718     NULL,                       /* get_all_names */
1719     dpif_netdev_destroy,
1720     dpif_netdev_get_stats,
1721     dpif_netdev_get_drop_frags,
1722     dpif_netdev_set_drop_frags,
1723     dpif_netdev_port_add,
1724     dpif_netdev_port_del,
1725     dpif_netdev_port_query_by_number,
1726     dpif_netdev_port_query_by_name,
1727     dpif_netdev_port_list,
1728     dpif_netdev_port_poll,
1729     dpif_netdev_port_poll_wait,
1730     dpif_netdev_port_group_get,
1731     dpif_netdev_port_group_set,
1732     dpif_netdev_flow_get,
1733     dpif_netdev_flow_put,
1734     dpif_netdev_flow_del,
1735     dpif_netdev_flow_flush,
1736     dpif_netdev_flow_list,
1737     dpif_netdev_execute,
1738     dpif_netdev_recv_get_mask,
1739     dpif_netdev_recv_set_mask,
1740     NULL,                       /* get_sflow_probability */
1741     NULL,                       /* set_sflow_probability */
1742     NULL,                       /* queue_to_priority */
1743     dpif_netdev_recv,
1744     dpif_netdev_recv_wait,
1745 };