dpif-netdev: Make internally thread-safe by introducing a global mutex.
[sliver-openvswitch.git] / lib / dpif-netdev.c
1 /*
2  * Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "dpif.h"
19
20 #include <ctype.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <inttypes.h>
24 #include <netinet/in.h>
25 #include <sys/socket.h>
26 #include <net/if.h>
27 #include <stdint.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <sys/ioctl.h>
31 #include <sys/stat.h>
32 #include <unistd.h>
33
34 #include "csum.h"
35 #include "dpif.h"
36 #include "dpif-provider.h"
37 #include "dummy.h"
38 #include "dynamic-string.h"
39 #include "flow.h"
40 #include "hmap.h"
41 #include "list.h"
42 #include "netdev.h"
43 #include "netdev-vport.h"
44 #include "netlink.h"
45 #include "odp-execute.h"
46 #include "odp-util.h"
47 #include "ofp-print.h"
48 #include "ofpbuf.h"
49 #include "packets.h"
50 #include "poll-loop.h"
51 #include "random.h"
52 #include "shash.h"
53 #include "sset.h"
54 #include "timeval.h"
55 #include "util.h"
56 #include "vlog.h"
57
58 VLOG_DEFINE_THIS_MODULE(dpif_netdev);
59
60 /* Configuration parameters. */
61 enum { MAX_PORTS = 256 };       /* Maximum number of ports. */
62 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
63
64 /* Enough headroom to add a vlan tag, plus an extra 2 bytes to allow IP
65  * headers to be aligned on a 4-byte boundary.  */
66 enum { DP_NETDEV_HEADROOM = 2 + VLAN_HEADER_LEN };
67
68 /* Queues. */
69 enum { N_QUEUES = 2 };          /* Number of queues for dpif_recv(). */
70 enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
71 enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
72 BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
73
74 struct dp_netdev_upcall {
75     struct dpif_upcall upcall;  /* Queued upcall information. */
76     struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
77 };
78
79 struct dp_netdev_queue {
80     struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN];
81     unsigned int head, tail;
82 };
83
84 /* Datapath based on the network device interface from netdev.h. */
85 struct dp_netdev {
86     const struct dpif_class *class;
87     char *name;
88     int open_cnt;
89     bool destroyed;
90     int max_mtu;                /* Maximum MTU of any port added so far. */
91
92     struct dp_netdev_queue queues[N_QUEUES];
93     struct hmap flow_table;     /* Flow table. */
94
95     /* Statistics. */
96     long long int n_hit;        /* Number of flow table matches. */
97     long long int n_missed;     /* Number of flow table misses. */
98     long long int n_lost;       /* Number of misses not passed to client. */
99
100     /* Ports. */
101     struct dp_netdev_port *ports[MAX_PORTS];
102     struct list port_list;
103     unsigned int serial;
104 };
105
106 /* A port in a netdev-based datapath. */
107 struct dp_netdev_port {
108     odp_port_t port_no;         /* Index into dp_netdev's 'ports'. */
109     struct list node;           /* Element in dp_netdev's 'port_list'. */
110     struct netdev *netdev;
111     struct netdev_saved_flags *sf;
112     struct netdev_rx *rx;
113     char *type;                 /* Port type as requested by user. */
114 };
115
116 /* A flow in dp_netdev's 'flow_table'. */
117 struct dp_netdev_flow {
118     struct hmap_node node;      /* Element in dp_netdev's 'flow_table'. */
119     struct flow key;
120
121     /* Statistics. */
122     long long int used;         /* Last used time, in monotonic msecs. */
123     long long int packet_count; /* Number of packets matched. */
124     long long int byte_count;   /* Number of bytes matched. */
125     uint8_t tcp_flags;          /* Bitwise-OR of seen tcp_flags values. */
126
127     /* Actions. */
128     struct nlattr *actions;
129     size_t actions_len;
130 };
131
132 /* Interface to netdev-based datapath. */
133 struct dpif_netdev {
134     struct dpif dpif;
135     struct dp_netdev *dp;
136     unsigned int dp_serial;
137 };
138
139 /* All netdev-based datapaths. */
140 static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs);
141
142 /* Global lock for all data. */
143 static pthread_mutex_t dp_netdev_mutex = PTHREAD_MUTEX_INITIALIZER;
144
145 static int get_port_by_number(struct dp_netdev *, odp_port_t port_no,
146                               struct dp_netdev_port **portp);
147 static int get_port_by_name(struct dp_netdev *, const char *devname,
148                             struct dp_netdev_port **portp);
149 static void dp_netdev_free(struct dp_netdev *);
150 static void dp_netdev_flow_flush(struct dp_netdev *);
151 static int do_add_port(struct dp_netdev *, const char *devname,
152                        const char *type, odp_port_t port_no);
153 static int do_del_port(struct dp_netdev *, odp_port_t port_no);
154 static int dpif_netdev_open(const struct dpif_class *, const char *name,
155                             bool create, struct dpif **);
156 static int dp_netdev_output_userspace(struct dp_netdev *, const struct ofpbuf *,
157                                     int queue_no, const struct flow *,
158                                     const struct nlattr *userdata);
159 static void dp_netdev_execute_actions(struct dp_netdev *,
160                                       struct ofpbuf *, struct flow *,
161                                       const struct nlattr *actions,
162                                       size_t actions_len);
163 static void dp_netdev_port_input(struct dp_netdev *dp,
164                                  struct dp_netdev_port *port,
165                                  struct ofpbuf *packet, uint32_t skb_priority,
166                                  uint32_t skb_mark, const struct flow_tnl *tnl);
167
168 static struct dpif_netdev *
169 dpif_netdev_cast(const struct dpif *dpif)
170 {
171     ovs_assert(dpif->dpif_class->open == dpif_netdev_open);
172     return CONTAINER_OF(dpif, struct dpif_netdev, dpif);
173 }
174
175 static struct dp_netdev *
176 get_dp_netdev(const struct dpif *dpif)
177 {
178     return dpif_netdev_cast(dpif)->dp;
179 }
180
181 static int
182 dpif_netdev_enumerate(struct sset *all_dps)
183 {
184     struct shash_node *node;
185
186     xpthread_mutex_lock(&dp_netdev_mutex);
187     SHASH_FOR_EACH(node, &dp_netdevs) {
188         sset_add(all_dps, node->name);
189     }
190     xpthread_mutex_unlock(&dp_netdev_mutex);
191
192     return 0;
193 }
194
195 static bool
196 dpif_netdev_class_is_dummy(const struct dpif_class *class)
197 {
198     return class != &dpif_netdev_class;
199 }
200
201 static const char *
202 dpif_netdev_port_open_type(const struct dpif_class *class, const char *type)
203 {
204     return strcmp(type, "internal") ? type
205                   : dpif_netdev_class_is_dummy(class) ? "dummy"
206                   : "tap";
207 }
208
209 static struct dpif *
210 create_dpif_netdev(struct dp_netdev *dp)
211 {
212     uint16_t netflow_id = hash_string(dp->name, 0);
213     struct dpif_netdev *dpif;
214
215     dp->open_cnt++;
216
217     dpif = xmalloc(sizeof *dpif);
218     dpif_init(&dpif->dpif, dp->class, dp->name, netflow_id >> 8, netflow_id);
219     dpif->dp = dp;
220     dpif->dp_serial = dp->serial;
221
222     return &dpif->dpif;
223 }
224
225 /* Choose an unused, non-zero port number and return it on success.
226  * Return ODPP_NONE on failure. */
227 static odp_port_t
228 choose_port(struct dp_netdev *dp, const char *name)
229 {
230     uint32_t port_no;
231
232     if (dp->class != &dpif_netdev_class) {
233         const char *p;
234         int start_no = 0;
235
236         /* If the port name begins with "br", start the number search at
237          * 100 to make writing tests easier. */
238         if (!strncmp(name, "br", 2)) {
239             start_no = 100;
240         }
241
242         /* If the port name contains a number, try to assign that port number.
243          * This can make writing unit tests easier because port numbers are
244          * predictable. */
245         for (p = name; *p != '\0'; p++) {
246             if (isdigit((unsigned char) *p)) {
247                 port_no = start_no + strtol(p, NULL, 10);
248                 if (port_no > 0 && port_no < MAX_PORTS
249                     && !dp->ports[port_no]) {
250                     return u32_to_odp(port_no);
251                 }
252                 break;
253             }
254         }
255     }
256
257     for (port_no = 1; port_no < MAX_PORTS; port_no++) {
258         if (!dp->ports[port_no]) {
259             return u32_to_odp(port_no);
260         }
261     }
262
263     return ODPP_NONE;
264 }
265
266 static int
267 create_dp_netdev(const char *name, const struct dpif_class *class,
268                  struct dp_netdev **dpp)
269 {
270     struct dp_netdev *dp;
271     int error;
272     int i;
273
274     dp = xzalloc(sizeof *dp);
275     dp->class = class;
276     dp->name = xstrdup(name);
277     dp->open_cnt = 0;
278     dp->max_mtu = ETH_PAYLOAD_MAX;
279     for (i = 0; i < N_QUEUES; i++) {
280         dp->queues[i].head = dp->queues[i].tail = 0;
281     }
282     hmap_init(&dp->flow_table);
283     list_init(&dp->port_list);
284
285     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
286     if (error) {
287         dp_netdev_free(dp);
288         return error;
289     }
290
291     shash_add(&dp_netdevs, name, dp);
292
293     *dpp = dp;
294     return 0;
295 }
296
297 static int
298 dpif_netdev_open(const struct dpif_class *class, const char *name,
299                  bool create, struct dpif **dpifp)
300 {
301     struct dp_netdev *dp;
302     int error;
303
304     xpthread_mutex_lock(&dp_netdev_mutex);
305     dp = shash_find_data(&dp_netdevs, name);
306     if (!dp) {
307         error = create ? create_dp_netdev(name, class, &dp) : ENODEV;
308     } else {
309         error = (dp->class != class ? EINVAL
310                  : create ? EEXIST
311                  : 0);
312     }
313     if (!error) {
314         *dpifp = create_dpif_netdev(dp);
315     }
316     xpthread_mutex_unlock(&dp_netdev_mutex);
317
318     return error;
319 }
320
321 static void
322 dp_netdev_purge_queues(struct dp_netdev *dp)
323 {
324     int i;
325
326     for (i = 0; i < N_QUEUES; i++) {
327         struct dp_netdev_queue *q = &dp->queues[i];
328
329         while (q->tail != q->head) {
330             struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
331             ofpbuf_uninit(&u->buf);
332         }
333     }
334 }
335
336 static void
337 dp_netdev_free(struct dp_netdev *dp)
338 {
339     struct dp_netdev_port *port, *next;
340
341     dp_netdev_flow_flush(dp);
342     LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) {
343         do_del_port(dp, port->port_no);
344     }
345     dp_netdev_purge_queues(dp);
346     hmap_destroy(&dp->flow_table);
347     free(dp->name);
348     free(dp);
349 }
350
351 static void
352 dpif_netdev_close(struct dpif *dpif)
353 {
354     struct dp_netdev *dp = get_dp_netdev(dpif);
355
356     xpthread_mutex_lock(&dp_netdev_mutex);
357
358     ovs_assert(dp->open_cnt > 0);
359     if (--dp->open_cnt == 0 && dp->destroyed) {
360         shash_find_and_delete(&dp_netdevs, dp->name);
361         dp_netdev_free(dp);
362     }
363     free(dpif);
364
365     xpthread_mutex_unlock(&dp_netdev_mutex);
366 }
367
368 static int
369 dpif_netdev_destroy(struct dpif *dpif)
370 {
371     struct dp_netdev *dp = get_dp_netdev(dpif);
372
373     xpthread_mutex_lock(&dp_netdev_mutex);
374     dp->destroyed = true;
375     xpthread_mutex_unlock(&dp_netdev_mutex);
376
377     return 0;
378 }
379
380 static int
381 dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
382 {
383     struct dp_netdev *dp = get_dp_netdev(dpif);
384
385     xpthread_mutex_lock(&dp_netdev_mutex);
386     stats->n_flows = hmap_count(&dp->flow_table);
387     stats->n_hit = dp->n_hit;
388     stats->n_missed = dp->n_missed;
389     stats->n_lost = dp->n_lost;
390     xpthread_mutex_unlock(&dp_netdev_mutex);
391
392     return 0;
393 }
394
395 static int
396 do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
397             odp_port_t port_no)
398 {
399     struct netdev_saved_flags *sf;
400     struct dp_netdev_port *port;
401     struct netdev *netdev;
402     struct netdev_rx *rx;
403     const char *open_type;
404     int mtu;
405     int error;
406
407     /* XXX reject devices already in some dp_netdev. */
408
409     /* Open and validate network device. */
410     open_type = dpif_netdev_port_open_type(dp->class, type);
411     error = netdev_open(devname, open_type, &netdev);
412     if (error) {
413         return error;
414     }
415     /* XXX reject loopback devices */
416     /* XXX reject non-Ethernet devices */
417
418     error = netdev_rx_open(netdev, &rx);
419     if (error
420         && !(error == EOPNOTSUPP && dpif_netdev_class_is_dummy(dp->class))) {
421         VLOG_ERR("%s: cannot receive packets on this network device (%s)",
422                  devname, ovs_strerror(errno));
423         netdev_close(netdev);
424         return error;
425     }
426
427     error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
428     if (error) {
429         netdev_rx_close(rx);
430         netdev_close(netdev);
431         return error;
432     }
433
434     port = xmalloc(sizeof *port);
435     port->port_no = port_no;
436     port->netdev = netdev;
437     port->sf = sf;
438     port->rx = rx;
439     port->type = xstrdup(type);
440
441     error = netdev_get_mtu(netdev, &mtu);
442     if (!error && mtu > dp->max_mtu) {
443         dp->max_mtu = mtu;
444     }
445
446     list_push_back(&dp->port_list, &port->node);
447     dp->ports[odp_to_u32(port_no)] = port;
448     dp->serial++;
449
450     return 0;
451 }
452
453 static int
454 dpif_netdev_port_add(struct dpif *dpif, struct netdev *netdev,
455                      odp_port_t *port_nop)
456 {
457     struct dp_netdev *dp = get_dp_netdev(dpif);
458     char namebuf[NETDEV_VPORT_NAME_BUFSIZE];
459     const char *dpif_port;
460     odp_port_t port_no;
461     int error;
462
463     xpthread_mutex_lock(&dp_netdev_mutex);
464     dpif_port = netdev_vport_get_dpif_port(netdev, namebuf, sizeof namebuf);
465     if (*port_nop != ODPP_NONE) {
466         uint32_t port_idx = odp_to_u32(*port_nop);
467         if (port_idx >= MAX_PORTS) {
468             error = EFBIG;
469         } else if (dp->ports[port_idx]) {
470             error = EBUSY;
471         } else {
472             error = 0;
473             port_no = *port_nop;
474         }
475     } else {
476         port_no = choose_port(dp, dpif_port);
477         error = port_no == ODPP_NONE ? EFBIG : 0;
478     }
479     if (!error) {
480         *port_nop = port_no;
481         error = do_add_port(dp, dpif_port, netdev_get_type(netdev), port_no);
482     }
483     xpthread_mutex_unlock(&dp_netdev_mutex);
484
485     return error;
486 }
487
488 static int
489 dpif_netdev_port_del(struct dpif *dpif, odp_port_t port_no)
490 {
491     struct dp_netdev *dp = get_dp_netdev(dpif);
492     int error;
493
494     xpthread_mutex_lock(&dp_netdev_mutex);
495     error = port_no == ODPP_LOCAL ? EINVAL : do_del_port(dp, port_no);
496     xpthread_mutex_unlock(&dp_netdev_mutex);
497
498     return error;
499 }
500
501 static bool
502 is_valid_port_number(odp_port_t port_no)
503 {
504     return odp_to_u32(port_no) < MAX_PORTS;
505 }
506
507 static int
508 get_port_by_number(struct dp_netdev *dp,
509                    odp_port_t port_no, struct dp_netdev_port **portp)
510 {
511     if (!is_valid_port_number(port_no)) {
512         *portp = NULL;
513         return EINVAL;
514     } else {
515         *portp = dp->ports[odp_to_u32(port_no)];
516         return *portp ? 0 : ENOENT;
517     }
518 }
519
520 static int
521 get_port_by_name(struct dp_netdev *dp,
522                  const char *devname, struct dp_netdev_port **portp)
523 {
524     struct dp_netdev_port *port;
525
526     LIST_FOR_EACH (port, node, &dp->port_list) {
527         if (!strcmp(netdev_get_name(port->netdev), devname)) {
528             *portp = port;
529             return 0;
530         }
531     }
532     return ENOENT;
533 }
534
535 static int
536 do_del_port(struct dp_netdev *dp, odp_port_t port_no)
537 {
538     struct dp_netdev_port *port;
539     int error;
540
541     error = get_port_by_number(dp, port_no, &port);
542     if (error) {
543         return error;
544     }
545
546     list_remove(&port->node);
547     dp->ports[odp_to_u32(port_no)] = NULL;
548     dp->serial++;
549
550     netdev_close(port->netdev);
551     netdev_restore_flags(port->sf);
552     netdev_rx_close(port->rx);
553     free(port->type);
554     free(port);
555
556     return 0;
557 }
558
559 static void
560 answer_port_query(const struct dp_netdev_port *port,
561                   struct dpif_port *dpif_port)
562 {
563     dpif_port->name = xstrdup(netdev_get_name(port->netdev));
564     dpif_port->type = xstrdup(port->type);
565     dpif_port->port_no = port->port_no;
566 }
567
568 static int
569 dpif_netdev_port_query_by_number(const struct dpif *dpif, odp_port_t port_no,
570                                  struct dpif_port *dpif_port)
571 {
572     struct dp_netdev *dp = get_dp_netdev(dpif);
573     struct dp_netdev_port *port;
574     int error;
575
576     xpthread_mutex_lock(&dp_netdev_mutex);
577     error = get_port_by_number(dp, port_no, &port);
578     if (!error && dpif_port) {
579         answer_port_query(port, dpif_port);
580     }
581     xpthread_mutex_unlock(&dp_netdev_mutex);
582
583     return error;
584 }
585
586 static int
587 dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
588                                struct dpif_port *dpif_port)
589 {
590     struct dp_netdev *dp = get_dp_netdev(dpif);
591     struct dp_netdev_port *port;
592     int error;
593
594     xpthread_mutex_lock(&dp_netdev_mutex);
595     error = get_port_by_name(dp, devname, &port);
596     if (!error && dpif_port) {
597         answer_port_query(port, dpif_port);
598     }
599     xpthread_mutex_unlock(&dp_netdev_mutex);
600
601     return error;
602 }
603
604 static odp_port_t
605 dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED)
606 {
607     return u32_to_odp(MAX_PORTS);
608 }
609
610 static void
611 dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
612 {
613     hmap_remove(&dp->flow_table, &flow->node);
614     free(flow->actions);
615     free(flow);
616 }
617
618 static void
619 dp_netdev_flow_flush(struct dp_netdev *dp)
620 {
621     struct dp_netdev_flow *flow, *next;
622
623     HMAP_FOR_EACH_SAFE (flow, next, node, &dp->flow_table) {
624         dp_netdev_free_flow(dp, flow);
625     }
626 }
627
628 static int
629 dpif_netdev_flow_flush(struct dpif *dpif)
630 {
631     struct dp_netdev *dp = get_dp_netdev(dpif);
632
633     xpthread_mutex_lock(&dp_netdev_mutex);
634     dp_netdev_flow_flush(dp);
635     xpthread_mutex_unlock(&dp_netdev_mutex);
636
637     return 0;
638 }
639
640 struct dp_netdev_port_state {
641     odp_port_t port_no;
642     char *name;
643 };
644
645 static int
646 dpif_netdev_port_dump_start(const struct dpif *dpif OVS_UNUSED, void **statep)
647 {
648     *statep = xzalloc(sizeof(struct dp_netdev_port_state));
649     return 0;
650 }
651
652 static int
653 dpif_netdev_port_dump_next(const struct dpif *dpif, void *state_,
654                            struct dpif_port *dpif_port)
655 {
656     struct dp_netdev_port_state *state = state_;
657     struct dp_netdev *dp = get_dp_netdev(dpif);
658     uint32_t port_idx;
659
660     xpthread_mutex_lock(&dp_netdev_mutex);
661     for (port_idx = odp_to_u32(state->port_no);
662          port_idx < MAX_PORTS; port_idx++) {
663         struct dp_netdev_port *port = dp->ports[port_idx];
664         if (port) {
665             free(state->name);
666             state->name = xstrdup(netdev_get_name(port->netdev));
667             dpif_port->name = state->name;
668             dpif_port->type = port->type;
669             dpif_port->port_no = port->port_no;
670             state->port_no = u32_to_odp(port_idx + 1);
671             xpthread_mutex_unlock(&dp_netdev_mutex);
672
673             return 0;
674         }
675     }
676     xpthread_mutex_unlock(&dp_netdev_mutex);
677
678     return EOF;
679 }
680
681 static int
682 dpif_netdev_port_dump_done(const struct dpif *dpif OVS_UNUSED, void *state_)
683 {
684     struct dp_netdev_port_state *state = state_;
685     free(state->name);
686     free(state);
687     return 0;
688 }
689
690 static int
691 dpif_netdev_port_poll(const struct dpif *dpif_, char **devnamep OVS_UNUSED)
692 {
693     struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
694     int error;
695
696     xpthread_mutex_lock(&dp_netdev_mutex);
697     if (dpif->dp_serial != dpif->dp->serial) {
698         dpif->dp_serial = dpif->dp->serial;
699         error = ENOBUFS;
700     } else {
701         error = EAGAIN;
702     }
703     xpthread_mutex_unlock(&dp_netdev_mutex);
704
705     return error;
706 }
707
708 static void
709 dpif_netdev_port_poll_wait(const struct dpif *dpif_)
710 {
711     struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
712
713     /* XXX In a multithreaded process, there is a race window between this
714      * function and the poll_block() in one thread and a change in
715      * dpif->dp->serial in another thread. */
716
717     xpthread_mutex_lock(&dp_netdev_mutex);
718     if (dpif->dp_serial != dpif->dp->serial) {
719         poll_immediate_wake();
720     }
721     xpthread_mutex_unlock(&dp_netdev_mutex);
722 }
723
724 static struct dp_netdev_flow *
725 dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
726 {
727     struct dp_netdev_flow *flow;
728
729     HMAP_FOR_EACH_WITH_HASH (flow, node, flow_hash(key, 0), &dp->flow_table) {
730         if (flow_equal(&flow->key, key)) {
731             return flow;
732         }
733     }
734     return NULL;
735 }
736
737 static void
738 get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats)
739 {
740     stats->n_packets = flow->packet_count;
741     stats->n_bytes = flow->byte_count;
742     stats->used = flow->used;
743     stats->tcp_flags = flow->tcp_flags;
744 }
745
746 static int
747 dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
748                               struct flow *flow)
749 {
750     odp_port_t in_port;
751
752     if (odp_flow_key_to_flow(key, key_len, flow) != ODP_FIT_PERFECT) {
753         /* This should not happen: it indicates that odp_flow_key_from_flow()
754          * and odp_flow_key_to_flow() disagree on the acceptable form of a
755          * flow.  Log the problem as an error, with enough details to enable
756          * debugging. */
757         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
758
759         if (!VLOG_DROP_ERR(&rl)) {
760             struct ds s;
761
762             ds_init(&s);
763             odp_flow_key_format(key, key_len, &s);
764             VLOG_ERR("internal error parsing flow key %s", ds_cstr(&s));
765             ds_destroy(&s);
766         }
767
768         return EINVAL;
769     }
770
771     in_port = flow->in_port.odp_port;
772     if (!is_valid_port_number(in_port) && in_port != ODPP_NONE) {
773         return EINVAL;
774     }
775
776     return 0;
777 }
778
779 static int
780 dpif_netdev_flow_get(const struct dpif *dpif,
781                      const struct nlattr *nl_key, size_t nl_key_len,
782                      struct ofpbuf **actionsp, struct dpif_flow_stats *stats)
783 {
784     struct dp_netdev *dp = get_dp_netdev(dpif);
785     struct dp_netdev_flow *flow;
786     struct flow key;
787     int error;
788
789     error = dpif_netdev_flow_from_nlattrs(nl_key, nl_key_len, &key);
790     if (error) {
791         return error;
792     }
793
794     xpthread_mutex_lock(&dp_netdev_mutex);
795     flow = dp_netdev_lookup_flow(dp, &key);
796     if (flow) {
797         if (stats) {
798             get_dpif_flow_stats(flow, stats);
799         }
800         if (actionsp) {
801             *actionsp = ofpbuf_clone_data(flow->actions, flow->actions_len);
802         }
803     } else {
804         error = ENOENT;
805     }
806     xpthread_mutex_unlock(&dp_netdev_mutex);
807
808     return error;
809 }
810
811 static int
812 set_flow_actions(struct dp_netdev_flow *flow,
813                  const struct nlattr *actions, size_t actions_len)
814 {
815     flow->actions = xrealloc(flow->actions, actions_len);
816     flow->actions_len = actions_len;
817     memcpy(flow->actions, actions, actions_len);
818     return 0;
819 }
820
821 static int
822 dp_netdev_flow_add(struct dp_netdev *dp, const struct flow *key,
823                    const struct nlattr *actions, size_t actions_len)
824 {
825     struct dp_netdev_flow *flow;
826     int error;
827
828     flow = xzalloc(sizeof *flow);
829     flow->key = *key;
830
831     error = set_flow_actions(flow, actions, actions_len);
832     if (error) {
833         free(flow);
834         return error;
835     }
836
837     hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0));
838     return 0;
839 }
840
841 static void
842 clear_stats(struct dp_netdev_flow *flow)
843 {
844     flow->used = 0;
845     flow->packet_count = 0;
846     flow->byte_count = 0;
847     flow->tcp_flags = 0;
848 }
849
850 static int
851 dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
852 {
853     struct dp_netdev *dp = get_dp_netdev(dpif);
854     struct dp_netdev_flow *flow;
855     struct flow key;
856     int error;
857
858     error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key);
859     if (error) {
860         return error;
861     }
862
863     xpthread_mutex_lock(&dp_netdev_mutex);
864     flow = dp_netdev_lookup_flow(dp, &key);
865     if (!flow) {
866         if (put->flags & DPIF_FP_CREATE) {
867             if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
868                 if (put->stats) {
869                     memset(put->stats, 0, sizeof *put->stats);
870                 }
871                 error = dp_netdev_flow_add(dp, &key, put->actions,
872                                            put->actions_len);
873             } else {
874                 error = EFBIG;
875             }
876         } else {
877             error = ENOENT;
878         }
879     } else {
880         if (put->flags & DPIF_FP_MODIFY) {
881             error = set_flow_actions(flow, put->actions, put->actions_len);
882             if (!error) {
883                 if (put->stats) {
884                     get_dpif_flow_stats(flow, put->stats);
885                 }
886                 if (put->flags & DPIF_FP_ZERO_STATS) {
887                     clear_stats(flow);
888                 }
889             }
890         } else {
891             error = EEXIST;
892         }
893     }
894     xpthread_mutex_unlock(&dp_netdev_mutex);
895
896     return error;
897 }
898
899 static int
900 dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
901 {
902     struct dp_netdev *dp = get_dp_netdev(dpif);
903     struct dp_netdev_flow *flow;
904     struct flow key;
905     int error;
906
907     error = dpif_netdev_flow_from_nlattrs(del->key, del->key_len, &key);
908     if (error) {
909         return error;
910     }
911
912     xpthread_mutex_lock(&dp_netdev_mutex);
913     flow = dp_netdev_lookup_flow(dp, &key);
914     if (flow) {
915         if (del->stats) {
916             get_dpif_flow_stats(flow, del->stats);
917         }
918         dp_netdev_free_flow(dp, flow);
919     } else {
920         error = ENOENT;
921     }
922     xpthread_mutex_unlock(&dp_netdev_mutex);
923
924     return error;
925 }
926
927 struct dp_netdev_flow_state {
928     uint32_t bucket;
929     uint32_t offset;
930     struct nlattr *actions;
931     struct odputil_keybuf keybuf;
932     struct dpif_flow_stats stats;
933 };
934
935 static int
936 dpif_netdev_flow_dump_start(const struct dpif *dpif OVS_UNUSED, void **statep)
937 {
938     struct dp_netdev_flow_state *state;
939
940     *statep = state = xmalloc(sizeof *state);
941     state->bucket = 0;
942     state->offset = 0;
943     state->actions = NULL;
944     return 0;
945 }
946
947 static int
948 dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_,
949                            const struct nlattr **key, size_t *key_len,
950                            const struct nlattr **mask, size_t *mask_len,
951                            const struct nlattr **actions, size_t *actions_len,
952                            const struct dpif_flow_stats **stats)
953 {
954     struct dp_netdev_flow_state *state = state_;
955     struct dp_netdev *dp = get_dp_netdev(dpif);
956     struct dp_netdev_flow *flow;
957     struct hmap_node *node;
958
959     xpthread_mutex_lock(&dp_netdev_mutex);
960     node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset);
961     if (!node) {
962         xpthread_mutex_unlock(&dp_netdev_mutex);
963         return EOF;
964     }
965
966     flow = CONTAINER_OF(node, struct dp_netdev_flow, node);
967
968     if (key) {
969         struct ofpbuf buf;
970
971         ofpbuf_use_stack(&buf, &state->keybuf, sizeof state->keybuf);
972         odp_flow_key_from_flow(&buf, &flow->key, flow->key.in_port.odp_port);
973
974         *key = buf.data;
975         *key_len = buf.size;
976     }
977
978     if (mask) {
979         *mask = NULL;
980         *mask_len = 0;
981     }
982
983     if (actions) {
984         free(state->actions);
985         state->actions = xmemdup(flow->actions, flow->actions_len);
986
987         *actions = state->actions;
988         *actions_len = flow->actions_len;
989     }
990
991     if (stats) {
992         get_dpif_flow_stats(flow, &state->stats);
993         *stats = &state->stats;
994     }
995
996     xpthread_mutex_unlock(&dp_netdev_mutex);
997     return 0;
998 }
999
1000 static int
1001 dpif_netdev_flow_dump_done(const struct dpif *dpif OVS_UNUSED, void *state_)
1002 {
1003     struct dp_netdev_flow_state *state = state_;
1004
1005     free(state->actions);
1006     free(state);
1007     return 0;
1008 }
1009
1010 static int
1011 dpif_netdev_execute(struct dpif *dpif, const struct dpif_execute *execute)
1012 {
1013     struct dp_netdev *dp = get_dp_netdev(dpif);
1014     struct ofpbuf copy;
1015     struct flow key;
1016     int error;
1017
1018     if (execute->packet->size < ETH_HEADER_LEN ||
1019         execute->packet->size > UINT16_MAX) {
1020         return EINVAL;
1021     }
1022
1023     /* Make a deep copy of 'packet', because we might modify its data. */
1024     ofpbuf_init(&copy, DP_NETDEV_HEADROOM + execute->packet->size);
1025     ofpbuf_reserve(&copy, DP_NETDEV_HEADROOM);
1026     ofpbuf_put(&copy, execute->packet->data, execute->packet->size);
1027
1028     flow_extract(&copy, 0, 0, NULL, NULL, &key);
1029     error = dpif_netdev_flow_from_nlattrs(execute->key, execute->key_len,
1030                                           &key);
1031     if (!error) {
1032         xpthread_mutex_lock(&dp_netdev_mutex);
1033         dp_netdev_execute_actions(dp, &copy, &key,
1034                                   execute->actions, execute->actions_len);
1035         xpthread_mutex_unlock(&dp_netdev_mutex);
1036     }
1037
1038     ofpbuf_uninit(&copy);
1039     return error;
1040 }
1041
1042 static int
1043 dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED)
1044 {
1045     return 0;
1046 }
1047
1048 static int
1049 dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
1050                               uint32_t queue_id, uint32_t *priority)
1051 {
1052     *priority = queue_id;
1053     return 0;
1054 }
1055
1056 static struct dp_netdev_queue *
1057 find_nonempty_queue(struct dpif *dpif)
1058 {
1059     struct dp_netdev *dp = get_dp_netdev(dpif);
1060     int i;
1061
1062     for (i = 0; i < N_QUEUES; i++) {
1063         struct dp_netdev_queue *q = &dp->queues[i];
1064         if (q->head != q->tail) {
1065             return q;
1066         }
1067     }
1068     return NULL;
1069 }
1070
1071 static int
1072 dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
1073                  struct ofpbuf *buf)
1074 {
1075     struct dp_netdev_queue *q;
1076     int error;
1077
1078     xpthread_mutex_lock(&dp_netdev_mutex);
1079     q = find_nonempty_queue(dpif);
1080     if (q) {
1081         struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
1082
1083         *upcall = u->upcall;
1084         upcall->packet = buf;
1085
1086         ofpbuf_uninit(buf);
1087         *buf = u->buf;
1088
1089         error = 0;
1090     } else {
1091         error = EAGAIN;
1092     }
1093     xpthread_mutex_unlock(&dp_netdev_mutex);
1094
1095     return error;
1096 }
1097
1098 static void
1099 dpif_netdev_recv_wait(struct dpif *dpif)
1100 {
1101     /* XXX In a multithreaded process, there is a race window between this
1102      * function and the poll_block() in one thread and a packet being queued in
1103      * another thread. */
1104
1105     xpthread_mutex_lock(&dp_netdev_mutex);
1106     if (find_nonempty_queue(dpif)) {
1107         poll_immediate_wake();
1108     }
1109     xpthread_mutex_unlock(&dp_netdev_mutex);
1110 }
1111
1112 static void
1113 dpif_netdev_recv_purge(struct dpif *dpif)
1114 {
1115     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
1116     xpthread_mutex_lock(&dp_netdev_mutex);
1117     dp_netdev_purge_queues(dpif_netdev->dp);
1118     xpthread_mutex_unlock(&dp_netdev_mutex);
1119 }
1120 \f
1121 static void
1122 dp_netdev_flow_used(struct dp_netdev_flow *flow, const struct ofpbuf *packet)
1123 {
1124     flow->used = time_msec();
1125     flow->packet_count++;
1126     flow->byte_count += packet->size;
1127     flow->tcp_flags |= packet_get_tcp_flags(packet, &flow->key);
1128 }
1129
1130 static void
1131 dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
1132                      struct ofpbuf *packet, uint32_t skb_priority,
1133                      uint32_t skb_mark, const struct flow_tnl *tnl)
1134 {
1135     struct dp_netdev_flow *flow;
1136     struct flow key;
1137     union flow_in_port in_port_;
1138
1139     if (packet->size < ETH_HEADER_LEN) {
1140         return;
1141     }
1142     in_port_.odp_port = port->port_no;
1143     flow_extract(packet, skb_priority, skb_mark, tnl, &in_port_, &key);
1144     flow = dp_netdev_lookup_flow(dp, &key);
1145     if (flow) {
1146         dp_netdev_flow_used(flow, packet);
1147         dp_netdev_execute_actions(dp, packet, &key,
1148                                   flow->actions, flow->actions_len);
1149         dp->n_hit++;
1150     } else {
1151         dp->n_missed++;
1152         dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, NULL);
1153     }
1154 }
1155
1156 static void
1157 dpif_netdev_run(struct dpif *dpif)
1158 {
1159     struct dp_netdev_port *port;
1160     struct dp_netdev *dp;
1161     struct ofpbuf packet;
1162
1163     xpthread_mutex_lock(&dp_netdev_mutex);
1164     dp = get_dp_netdev(dpif);
1165     ofpbuf_init(&packet,
1166                 DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + dp->max_mtu);
1167
1168     LIST_FOR_EACH (port, node, &dp->port_list) {
1169         int error;
1170
1171         /* Reset packet contents. */
1172         ofpbuf_clear(&packet);
1173         ofpbuf_reserve(&packet, DP_NETDEV_HEADROOM);
1174
1175         error = port->rx ? netdev_rx_recv(port->rx, &packet) : EOPNOTSUPP;
1176         if (!error) {
1177             dp_netdev_port_input(dp, port, &packet, 0, 0, NULL);
1178         } else if (error != EAGAIN && error != EOPNOTSUPP) {
1179             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1180
1181             VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
1182                         netdev_get_name(port->netdev), ovs_strerror(error));
1183         }
1184     }
1185     ofpbuf_uninit(&packet);
1186     xpthread_mutex_unlock(&dp_netdev_mutex);
1187 }
1188
1189 static void
1190 dpif_netdev_wait(struct dpif *dpif)
1191 {
1192     struct dp_netdev_port *port;
1193
1194     /* There is a race here, if thread A calls dpif_netdev_wait(dpif) and
1195      * thread B calls dpif_port_add(dpif) or dpif_port_remove(dpif) before
1196      * A makes it to poll_block().
1197      *
1198      * But I think it doesn't matter:
1199      *
1200      *     - In the dpif_port_add() case, A will not wake up when a packet
1201      *       arrives on the new port, but this would also happen if the
1202      *       ordering were reversed.
1203      *
1204      *     - In the dpif_port_remove() case, A might wake up spuriously, but
1205      *       that is harmless. */
1206
1207     xpthread_mutex_lock(&dp_netdev_mutex);
1208     LIST_FOR_EACH (port, node, &get_dp_netdev(dpif)->port_list) {
1209         if (port->rx) {
1210             netdev_rx_wait(port->rx);
1211         }
1212     }
1213     xpthread_mutex_unlock(&dp_netdev_mutex);
1214 }
1215
1216 static void
1217 dp_netdev_output_port(void *dp_, struct ofpbuf *packet, uint32_t out_port)
1218 {
1219     struct dp_netdev *dp = dp_;
1220     struct dp_netdev_port *p = dp->ports[out_port];
1221     if (p) {
1222         netdev_send(p->netdev, packet);
1223     }
1224 }
1225
1226 static int
1227 dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
1228                            int queue_no, const struct flow *flow,
1229                            const struct nlattr *userdata)
1230 {
1231     struct dp_netdev_queue *q = &dp->queues[queue_no];
1232     if (q->head - q->tail < MAX_QUEUE_LEN) {
1233         struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
1234         struct dpif_upcall *upcall = &u->upcall;
1235         struct ofpbuf *buf = &u->buf;
1236         size_t buf_size;
1237
1238         upcall->type = queue_no;
1239
1240         /* Allocate buffer big enough for everything. */
1241         buf_size = ODPUTIL_FLOW_KEY_BYTES + 2 + packet->size;
1242         if (userdata) {
1243             buf_size += NLA_ALIGN(userdata->nla_len);
1244         }
1245         ofpbuf_init(buf, buf_size);
1246
1247         /* Put ODP flow. */
1248         odp_flow_key_from_flow(buf, flow, flow->in_port.odp_port);
1249         upcall->key = buf->data;
1250         upcall->key_len = buf->size;
1251
1252         /* Put userdata. */
1253         if (userdata) {
1254             upcall->userdata = ofpbuf_put(buf, userdata,
1255                                           NLA_ALIGN(userdata->nla_len));
1256         }
1257
1258         /* Put packet.
1259          *
1260          * We adjust 'data' and 'size' in 'buf' so that only the packet itself
1261          * is visible in 'upcall->packet'.  The ODP flow and (if present)
1262          * userdata become part of the headroom. */
1263         ofpbuf_put_zeros(buf, 2);
1264         buf->data = ofpbuf_put(buf, packet->data, packet->size);
1265         buf->size = packet->size;
1266         upcall->packet = buf;
1267
1268         return 0;
1269     } else {
1270         dp->n_lost++;
1271         return ENOBUFS;
1272     }
1273 }
1274
1275 static void
1276 dp_netdev_action_userspace(void *dp, struct ofpbuf *packet,
1277                            const struct flow *key,
1278                            const struct nlattr *userdata)
1279 {
1280     dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata);
1281 }
1282
1283 static void
1284 dp_netdev_execute_actions(struct dp_netdev *dp,
1285                           struct ofpbuf *packet, struct flow *key,
1286                           const struct nlattr *actions,
1287                           size_t actions_len)
1288 {
1289     odp_execute_actions(dp, packet, key, actions, actions_len,
1290                         dp_netdev_output_port, dp_netdev_action_userspace);
1291 }
1292
1293 const struct dpif_class dpif_netdev_class = {
1294     "netdev",
1295     dpif_netdev_enumerate,
1296     dpif_netdev_port_open_type,
1297     dpif_netdev_open,
1298     dpif_netdev_close,
1299     dpif_netdev_destroy,
1300     dpif_netdev_run,
1301     dpif_netdev_wait,
1302     dpif_netdev_get_stats,
1303     dpif_netdev_port_add,
1304     dpif_netdev_port_del,
1305     dpif_netdev_port_query_by_number,
1306     dpif_netdev_port_query_by_name,
1307     dpif_netdev_get_max_ports,
1308     NULL,                       /* port_get_pid */
1309     dpif_netdev_port_dump_start,
1310     dpif_netdev_port_dump_next,
1311     dpif_netdev_port_dump_done,
1312     dpif_netdev_port_poll,
1313     dpif_netdev_port_poll_wait,
1314     dpif_netdev_flow_get,
1315     dpif_netdev_flow_put,
1316     dpif_netdev_flow_del,
1317     dpif_netdev_flow_flush,
1318     dpif_netdev_flow_dump_start,
1319     dpif_netdev_flow_dump_next,
1320     dpif_netdev_flow_dump_done,
1321     dpif_netdev_execute,
1322     NULL,                       /* operate */
1323     dpif_netdev_recv_set,
1324     dpif_netdev_queue_to_priority,
1325     dpif_netdev_recv,
1326     dpif_netdev_recv_wait,
1327     dpif_netdev_recv_purge,
1328 };
1329
1330 static void
1331 dpif_dummy_register__(const char *type)
1332 {
1333     struct dpif_class *class;
1334
1335     class = xmalloc(sizeof *class);
1336     *class = dpif_netdev_class;
1337     class->type = xstrdup(type);
1338     dp_register_provider(class);
1339 }
1340
1341 void
1342 dpif_dummy_register(bool override)
1343 {
1344     if (override) {
1345         struct sset types;
1346         const char *type;
1347
1348         sset_init(&types);
1349         dp_enumerate_types(&types);
1350         SSET_FOR_EACH (type, &types) {
1351             if (!dp_unregister_provider(type)) {
1352                 dpif_dummy_register__(type);
1353             }
1354         }
1355         sset_destroy(&types);
1356     }
1357
1358     dpif_dummy_register__("dummy");
1359 }