wdp-xflow: Remove wx structure from global list when closing.
[sliver-openvswitch.git] / ofproto / ofproto.c
1 /*
2  * Copyright (c) 2009, 2010, 2011 Nicira Networks.
3  * Copyright (c) 2010 Jean Tourrilhes - HP-Labs.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <config.h>
19 #include "ofproto.h"
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <sys/socket.h>
23 #include <net/if.h>
24 #include <netinet/in.h>
25 #include <stdbool.h>
26 #include <stdlib.h>
27 #include "classifier.h"
28 #include "coverage.h"
29 #include "discovery.h"
30 #include "dynamic-string.h"
31 #include "fail-open.h"
32 #include "hash.h"
33 #include "hmap.h"
34 #include "in-band.h"
35 #include "mac-learning.h"
36 #include "netdev.h"
37 #include "netflow.h"
38 #include "ofp-print.h"
39 #include "ofp-util.h"
40 #include "ofproto-sflow.h"
41 #include "ofpbuf.h"
42 #include "openflow/nicira-ext.h"
43 #include "openflow/openflow.h"
44 #include "openvswitch/xflow.h"
45 #include "packets.h"
46 #include "pinsched.h"
47 #include "pktbuf.h"
48 #include "poll-loop.h"
49 #include "rconn.h"
50 #include "shash.h"
51 #include "status.h"
52 #include "stream-ssl.h"
53 #include "svec.h"
54 #include "tag.h"
55 #include "timeval.h"
56 #include "unixctl.h"
57 #include "vconn.h"
58 #include "vlog.h"
59 #include "wdp.h"
60 #include "xfif.h"
61 #include "xtoxll.h"
62
63 VLOG_DEFINE_THIS_MODULE(ofproto)
64
65 #include "sflow_api.h"
66
67 struct ofproto_rule {
68     uint64_t flow_cookie;       /* Controller-issued identifier.
69                                    (Kept in network-byte order.) */
70     bool send_flow_removed;     /* Send a flow removed message? */
71     tag_type tags;              /* Tags (set only by hooks). */
72 };
73
74 static struct ofproto_rule *
75 ofproto_rule_cast(const struct wdp_rule *wdp_rule)
76 {
77     return wdp_rule->client_data;
78 }
79
80 static void
81 ofproto_rule_init(struct wdp_rule *wdp_rule)
82 {
83     wdp_rule->client_data = xzalloc(sizeof(struct ofproto_rule));
84 }
85
86 static inline bool
87 rule_is_hidden(const struct wdp_rule *rule)
88 {
89     /* Rules with priority higher than UINT16_MAX are set up by ofproto itself
90      * (e.g. by in-band control) and are intentionally hidden from the
91      * controller. */
92     if (rule->cr.flow.priority > UINT16_MAX) {
93         return true;
94     }
95
96     return false;
97 }
98
99 static int delete_flow(struct ofproto *, struct wdp_rule *, uint8_t reason);
100
101 /* ofproto supports two kinds of OpenFlow connections:
102  *
103  *   - "Primary" connections to ordinary OpenFlow controllers.  ofproto
104  *     maintains persistent connections to these controllers and by default
105  *     sends them asynchronous messages such as packet-ins.
106  *
107  *   - "Service" connections, e.g. from ovs-ofctl.  When these connections
108  *     drop, it is the other side's responsibility to reconnect them if
109  *     necessary.  ofproto does not send them asynchronous messages by default.
110  *
111  * Currently, active (tcp, ssl, unix) connections are always "primary"
112  * connections and passive (ptcp, pssl, punix) connections are always "service"
113  * connections.  There is no inherent reason for this, but it reflects the
114  * common case.
115  */
116 enum ofconn_type {
117     OFCONN_PRIMARY,             /* An ordinary OpenFlow controller. */
118     OFCONN_SERVICE              /* A service connection, e.g. "ovs-ofctl". */
119 };
120
121 /* A listener for incoming OpenFlow "service" connections. */
122 struct ofservice {
123     struct hmap_node node;      /* In struct ofproto's "services" hmap. */
124     struct pvconn *pvconn;      /* OpenFlow connection listener. */
125
126     /* These are not used by ofservice directly.  They are settings for
127      * accepted "struct ofconn"s from the pvconn. */
128     int probe_interval;         /* Max idle time before probing, in seconds. */
129     int rate_limit;             /* Max packet-in rate in packets per second. */
130     int burst_limit;            /* Limit on accumulating packet credits. */
131 };
132
133 static struct ofservice *ofservice_lookup(struct ofproto *,
134                                           const char *target);
135 static int ofservice_create(struct ofproto *,
136                             const struct ofproto_controller *);
137 static void ofservice_reconfigure(struct ofservice *,
138                                   const struct ofproto_controller *);
139 static void ofservice_destroy(struct ofproto *, struct ofservice *);
140
141 /* An OpenFlow connection. */
142 struct ofconn {
143     struct ofproto *ofproto;    /* The ofproto that owns this connection. */
144     struct list node;           /* In struct ofproto's "all_conns" list. */
145     struct rconn *rconn;        /* OpenFlow connection. */
146     enum ofconn_type type;      /* Type. */
147     bool flow_mod_table_id;     /* NXT_FLOW_MOD_TABLE_ID enabled? */
148
149     /* OFPT_PACKET_IN related data. */
150     struct rconn_packet_counter *packet_in_counter; /* # queued on 'rconn'. */
151     struct pinsched *schedulers[2]; /* Indexed by reason code; see below. */
152     struct pktbuf *pktbuf;         /* OpenFlow packet buffers. */
153     int miss_send_len;             /* Bytes to send of buffered packets. */
154
155     /* Number of OpenFlow messages queued on 'rconn' as replies to OpenFlow
156      * requests, and the maximum number before we stop reading OpenFlow
157      * requests.  */
158 #define OFCONN_REPLY_MAX 100
159     struct rconn_packet_counter *reply_counter;
160
161     /* type == OFCONN_PRIMARY only. */
162     enum nx_role role;           /* Role. */
163     struct hmap_node hmap_node;  /* In struct ofproto's "controllers" map. */
164     struct discovery *discovery; /* Controller discovery object, if enabled. */
165     struct status_category *ss;  /* Switch status category. */
166     enum ofproto_band band;      /* In-band or out-of-band? */
167 };
168
169 /* We use OFPR_NO_MATCH and OFPR_ACTION as indexes into struct ofconn's
170  * "schedulers" array.  Their values are 0 and 1, and their meanings and values
171  * coincide with WDP_CHAN_MISS and WDP_CHAN_ACTION, so this is convenient.  In
172  * case anything ever changes, check their values here.  */
173 #define N_SCHEDULERS 2
174 BUILD_ASSERT_DECL(OFPR_NO_MATCH == 0);
175 BUILD_ASSERT_DECL(OFPR_NO_MATCH == WDP_CHAN_MISS);
176 BUILD_ASSERT_DECL(OFPR_ACTION == 1);
177 BUILD_ASSERT_DECL(OFPR_ACTION == WDP_CHAN_ACTION);
178
179 static struct ofconn *ofconn_create(struct ofproto *, struct rconn *,
180                                     enum ofconn_type);
181 static void ofconn_destroy(struct ofconn *);
182 static void ofconn_run(struct ofconn *, struct ofproto *);
183 static void ofconn_wait(struct ofconn *);
184 static bool ofconn_receives_async_msgs(const struct ofconn *);
185 static char *ofconn_make_name(const struct ofproto *, const char *target);
186 static void ofconn_set_rate_limit(struct ofconn *, int rate, int burst);
187
188 static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
189                      struct rconn_packet_counter *counter);
190
191 static void send_packet_in(struct ofproto *, struct wdp_packet *);
192 static void do_send_packet_in(struct wdp_packet *, void *ofconn);
193
194 struct ofproto {
195     /* Settings. */
196     uint64_t datapath_id;       /* Datapath ID. */
197     uint64_t fallback_dpid;     /* Datapath ID if no better choice found. */
198     char *mfr_desc;             /* Manufacturer. */
199     char *hw_desc;              /* Hardware. */
200     char *sw_desc;              /* Software version. */
201     char *serial_desc;          /* Serial number. */
202     char *dp_desc;              /* Datapath description. */
203
204     /* Datapath. */
205     struct wdp *wdp;
206     uint32_t max_ports;
207
208     /* Configuration. */
209     struct switch_status *switch_status;
210     struct fail_open *fail_open;
211     struct netflow *netflow;
212     struct ofproto_sflow *sflow;
213     bool tun_id_from_cookie;    /* NXT_TUN_ID_FROM_COOKIE enabled? */
214
215     /* In-band control. */
216     struct in_band *in_band;
217     long long int next_in_band_update;
218     struct sockaddr_in *extra_in_band_remotes;
219     size_t n_extra_remotes;
220
221     /* OpenFlow connections. */
222     struct hmap controllers;   /* Controller "struct ofconn"s. */
223     struct list all_conns;     /* Contains "struct ofconn"s. */
224     enum ofproto_fail_mode fail_mode;
225
226     /* OpenFlow listeners. */
227     struct hmap services;       /* Contains "struct ofservice"s. */
228     struct pvconn **snoops;
229     size_t n_snoops;
230 };
231
232 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
233
234 static uint64_t pick_datapath_id(const struct ofproto *);
235 static uint64_t pick_fallback_dpid(void);
236
237 static void handle_wdp_packet(struct ofproto *, struct wdp_packet *);
238
239 static void handle_openflow(struct ofconn *, struct ofproto *,
240                             struct ofpbuf *);
241
242 int
243 ofproto_create(const char *datapath, const char *datapath_type,
244                const struct ofhooks *ofhooks, void *aux,
245                struct ofproto **ofprotop)
246 {
247     struct wdp_stats stats;
248     struct ofproto *p;
249     struct wdp *wdp;
250     int error;
251
252     *ofprotop = NULL;
253
254     /* Connect to datapath and start listening for messages. */
255     error = wdp_create_and_open(datapath, datapath_type, &wdp);
256     if (error) {
257         VLOG_ERR("failed to open datapath %s: %s", datapath, strerror(error));
258         return error;
259     }
260     error = wdp_get_wdp_stats(wdp, &stats);
261     if (error) {
262         VLOG_ERR("failed to obtain stats for datapath %s: %s",
263                  datapath, strerror(error));
264         wdp_close(wdp);
265         return error;
266     }
267     error = wdp_recv_set_mask(wdp, ((1 << WDP_CHAN_MISS)
268                                     | (1 << WDP_CHAN_ACTION)
269                                     | (1 << WDP_CHAN_SFLOW)));
270     if (error) {
271         VLOG_ERR("failed to listen on datapath %s: %s",
272                  datapath, strerror(error));
273         wdp_close(wdp);
274         return error;
275     }
276     wdp_flow_flush(wdp);
277     wdp_recv_purge(wdp);
278     wdp_set_ofhooks(wdp, ofhooks, aux);
279
280     /* Initialize settings. */
281     p = xzalloc(sizeof *p);
282     p->fallback_dpid = pick_fallback_dpid();
283     p->datapath_id = p->fallback_dpid;
284     p->mfr_desc = xstrdup(DEFAULT_MFR_DESC);
285     p->hw_desc = xstrdup(DEFAULT_HW_DESC);
286     p->sw_desc = xstrdup(DEFAULT_SW_DESC);
287     p->serial_desc = xstrdup(DEFAULT_SERIAL_DESC);
288     p->dp_desc = xstrdup(DEFAULT_DP_DESC);
289
290     /* Initialize datapath. */
291     p->wdp = wdp;
292     p->max_ports = stats.max_ports;
293
294     /* Initialize submodules. */
295     p->switch_status = switch_status_create(p);
296     p->in_band = NULL;
297     p->fail_open = NULL;
298     p->netflow = NULL;
299     p->sflow = NULL;
300
301     /* Initialize OpenFlow connections. */
302     list_init(&p->all_conns);
303     hmap_init(&p->controllers);
304     hmap_init(&p->services);
305     p->snoops = NULL;
306     p->n_snoops = 0;
307
308     /* Pick final datapath ID. */
309     p->datapath_id = pick_datapath_id(p);
310     VLOG_INFO("using datapath ID %016"PRIx64, p->datapath_id);
311
312     *ofprotop = p;
313     return 0;
314 }
315
316 struct wdp *
317 ofproto_get_wdp(struct ofproto *ofproto)
318 {
319     return ofproto->wdp;
320 }
321
322 void
323 ofproto_set_datapath_id(struct ofproto *p, uint64_t datapath_id)
324 {
325     uint64_t old_dpid = p->datapath_id;
326     p->datapath_id = datapath_id ? datapath_id : pick_datapath_id(p);
327     if (p->datapath_id != old_dpid) {
328         VLOG_INFO("datapath ID changed to %016"PRIx64, p->datapath_id);
329
330         /* Force all active connections to reconnect, since there is no way to
331          * notify a controller that the datapath ID has changed. */
332         ofproto_reconnect_controllers(p);
333     }
334 }
335
336 static bool
337 is_discovery_controller(const struct ofproto_controller *c)
338 {
339     return !strcmp(c->target, "discover");
340 }
341
342 static bool
343 is_in_band_controller(const struct ofproto_controller *c)
344 {
345     return is_discovery_controller(c) || c->band == OFPROTO_IN_BAND;
346 }
347
348 /* Creates a new controller in 'ofproto'.  Some of the settings are initially
349  * drawn from 'c', but update_controller() needs to be called later to finish
350  * the new ofconn's configuration. */
351 static void
352 add_controller(struct ofproto *ofproto, const struct ofproto_controller *c)
353 {
354     struct discovery *discovery;
355     struct ofconn *ofconn;
356
357     if (is_discovery_controller(c)) {
358         int error = discovery_create(c->accept_re, c->update_resolv_conf,
359                                      ofproto->wdp, ofproto->switch_status,
360                                      &discovery);
361         if (error) {
362             return;
363         }
364     } else {
365         discovery = NULL;
366     }
367
368     ofconn = ofconn_create(ofproto, rconn_create(5, 8), OFCONN_PRIMARY);
369     ofconn->pktbuf = pktbuf_create();
370     ofconn->miss_send_len = OFP_DEFAULT_MISS_SEND_LEN;
371     if (discovery) {
372         ofconn->discovery = discovery;
373     } else {
374         char *name = ofconn_make_name(ofproto, c->target);
375         rconn_connect(ofconn->rconn, c->target, name);
376         free(name);
377     }
378     hmap_insert(&ofproto->controllers, &ofconn->hmap_node,
379                 hash_string(c->target, 0));
380 }
381
382 /* Reconfigures 'ofconn' to match 'c'.  This function cannot update an ofconn's
383  * target or turn discovery on or off (these are done by creating new ofconns
384  * and deleting old ones), but it can update the rest of an ofconn's
385  * settings. */
386 static void
387 update_controller(struct ofconn *ofconn, const struct ofproto_controller *c)
388 {
389     int probe_interval;
390
391     ofconn->band = (is_in_band_controller(c)
392                     ? OFPROTO_IN_BAND : OFPROTO_OUT_OF_BAND);
393
394     rconn_set_max_backoff(ofconn->rconn, c->max_backoff);
395
396     probe_interval = c->probe_interval ? MAX(c->probe_interval, 5) : 0;
397     rconn_set_probe_interval(ofconn->rconn, probe_interval);
398
399     if (ofconn->discovery) {
400         discovery_set_update_resolv_conf(ofconn->discovery,
401                                          c->update_resolv_conf);
402         discovery_set_accept_controller_re(ofconn->discovery, c->accept_re);
403     }
404
405     ofconn_set_rate_limit(ofconn, c->rate_limit, c->burst_limit);
406 }
407
408 static const char *
409 ofconn_get_target(const struct ofconn *ofconn)
410 {
411     return ofconn->discovery ? "discover" : rconn_get_target(ofconn->rconn);
412 }
413
414 static struct ofconn *
415 find_controller_by_target(struct ofproto *ofproto, const char *target)
416 {
417     struct ofconn *ofconn;
418
419     HMAP_FOR_EACH_WITH_HASH (ofconn, hmap_node,
420                              hash_string(target, 0), &ofproto->controllers) {
421         if (!strcmp(ofconn_get_target(ofconn), target)) {
422             return ofconn;
423         }
424     }
425     return NULL;
426 }
427
428 static void
429 update_in_band_remotes(struct ofproto *ofproto)
430 {
431     const struct ofconn *ofconn;
432     struct sockaddr_in *addrs;
433     size_t max_addrs, n_addrs;
434     bool discovery;
435     size_t i;
436
437     /* Allocate enough memory for as many remotes as we could possibly have. */
438     max_addrs = ofproto->n_extra_remotes + hmap_count(&ofproto->controllers);
439     addrs = xmalloc(max_addrs * sizeof *addrs);
440     n_addrs = 0;
441
442     /* Add all the remotes. */
443     discovery = false;
444     HMAP_FOR_EACH (ofconn, hmap_node, &ofproto->controllers) {
445         struct sockaddr_in *sin = &addrs[n_addrs];
446
447         if (ofconn->band == OFPROTO_OUT_OF_BAND) {
448             continue;
449         }
450
451         sin->sin_addr.s_addr = rconn_get_remote_ip(ofconn->rconn);
452         if (sin->sin_addr.s_addr) {
453             sin->sin_port = rconn_get_remote_port(ofconn->rconn);
454             n_addrs++;
455         }
456         if (ofconn->discovery) {
457             discovery = true;
458         }
459     }
460     for (i = 0; i < ofproto->n_extra_remotes; i++) {
461         addrs[n_addrs++] = ofproto->extra_in_band_remotes[i];
462     }
463
464     /* Create or update or destroy in-band.
465      *
466      * Ordinarily we only enable in-band if there's at least one remote
467      * address, but discovery needs the in-band rules for DHCP to be installed
468      * even before we know any remote addresses. */
469     if (n_addrs || discovery) {
470         if (!ofproto->in_band) {
471             in_band_create(ofproto, ofproto->wdp, ofproto->switch_status,
472                            &ofproto->in_band);
473         }
474         if (ofproto->in_band) {
475             in_band_set_remotes(ofproto->in_band, addrs, n_addrs);
476         }
477         ofproto->next_in_band_update = time_msec() + 1000;
478     } else {
479         in_band_destroy(ofproto->in_band);
480         ofproto->in_band = NULL;
481     }
482
483     /* Clean up. */
484     free(addrs);
485 }
486
487 static void
488 update_fail_open(struct ofproto *p)
489 {
490     struct ofconn *ofconn;
491
492     if (!hmap_is_empty(&p->controllers)
493             && p->fail_mode == OFPROTO_FAIL_STANDALONE) {
494         struct rconn **rconns;
495         size_t n;
496
497         if (!p->fail_open) {
498             p->fail_open = fail_open_create(p, p->switch_status);
499         }
500
501         n = 0;
502         rconns = xmalloc(hmap_count(&p->controllers) * sizeof *rconns);
503         HMAP_FOR_EACH (ofconn, hmap_node, &p->controllers) {
504             rconns[n++] = ofconn->rconn;
505         }
506
507         fail_open_set_controllers(p->fail_open, rconns, n);
508         /* p->fail_open takes ownership of 'rconns'. */
509     } else {
510         fail_open_destroy(p->fail_open);
511         p->fail_open = NULL;
512     }
513 }
514
515 void
516 ofproto_set_controllers(struct ofproto *p,
517                         const struct ofproto_controller *controllers,
518                         size_t n_controllers)
519 {
520     struct shash new_controllers;
521     struct ofconn *ofconn, *next_ofconn;
522     struct ofservice *ofservice, *next_ofservice;
523     bool ss_exists;
524     size_t i;
525
526     /* Create newly configured controllers and services.
527      * Create a name to ofproto_controller mapping in 'new_controllers'. */
528     shash_init(&new_controllers);
529     for (i = 0; i < n_controllers; i++) {
530         const struct ofproto_controller *c = &controllers[i];
531
532         if (!vconn_verify_name(c->target) || !strcmp(c->target, "discover")) {
533             if (!find_controller_by_target(p, c->target)) {
534                 add_controller(p, c);
535             }
536         } else if (!pvconn_verify_name(c->target)) {
537             if (!ofservice_lookup(p, c->target) && ofservice_create(p, c)) {
538                 continue;
539             }
540         } else {
541             VLOG_WARN_RL(&rl, "%s: unsupported controller \"%s\"",
542                          wdp_name(p->wdp), c->target);
543             continue;
544         }
545
546         shash_add_once(&new_controllers, c->target, &controllers[i]);
547     }
548
549     /* Delete controllers that are no longer configured.
550      * Update configuration of all now-existing controllers. */
551     ss_exists = false;
552     HMAP_FOR_EACH_SAFE (ofconn, next_ofconn, hmap_node, &p->controllers) {
553         struct ofproto_controller *c;
554
555         c = shash_find_data(&new_controllers, ofconn_get_target(ofconn));
556         if (!c) {
557             ofconn_destroy(ofconn);
558         } else {
559             update_controller(ofconn, c);
560             if (ofconn->ss) {
561                 ss_exists = true;
562             }
563         }
564     }
565
566     /* Delete services that are no longer configured.
567      * Update configuration of all now-existing services. */
568     HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &p->services) {
569         struct ofproto_controller *c;
570
571         c = shash_find_data(&new_controllers,
572                             pvconn_get_name(ofservice->pvconn));
573         if (!c) {
574             ofservice_destroy(p, ofservice);
575         } else {
576             ofservice_reconfigure(ofservice, c);
577         }
578     }
579
580     shash_destroy(&new_controllers);
581
582     update_in_band_remotes(p);
583     update_fail_open(p);
584
585     if (!hmap_is_empty(&p->controllers) && !ss_exists) {
586         ofconn = CONTAINER_OF(hmap_first(&p->controllers),
587                               struct ofconn, hmap_node);
588         ofconn->ss = switch_status_register(p->switch_status, "remote",
589                                             rconn_status_cb, ofconn->rconn);
590     }
591 }
592
593 void
594 ofproto_set_fail_mode(struct ofproto *p, enum ofproto_fail_mode fail_mode)
595 {
596     p->fail_mode = fail_mode;
597     update_fail_open(p);
598 }
599
600 /* Drops the connections between 'ofproto' and all of its controllers, forcing
601  * them to reconnect. */
602 void
603 ofproto_reconnect_controllers(struct ofproto *ofproto)
604 {
605     struct ofconn *ofconn;
606
607     LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
608         rconn_reconnect(ofconn->rconn);
609     }
610 }
611
612 static bool
613 any_extras_changed(const struct ofproto *ofproto,
614                    const struct sockaddr_in *extras, size_t n)
615 {
616     size_t i;
617
618     if (n != ofproto->n_extra_remotes) {
619         return true;
620     }
621
622     for (i = 0; i < n; i++) {
623         const struct sockaddr_in *old = &ofproto->extra_in_band_remotes[i];
624         const struct sockaddr_in *new = &extras[i];
625
626         if (old->sin_addr.s_addr != new->sin_addr.s_addr ||
627             old->sin_port != new->sin_port) {
628             return true;
629         }
630     }
631
632     return false;
633 }
634
635 /* Sets the 'n' TCP port addresses in 'extras' as ones to which 'ofproto''s
636  * in-band control should guarantee access, in the same way that in-band
637  * control guarantees access to OpenFlow controllers. */
638 void
639 ofproto_set_extra_in_band_remotes(struct ofproto *ofproto,
640                                   const struct sockaddr_in *extras, size_t n)
641 {
642     if (!any_extras_changed(ofproto, extras, n)) {
643         return;
644     }
645
646     free(ofproto->extra_in_band_remotes);
647     ofproto->n_extra_remotes = n;
648     ofproto->extra_in_band_remotes = xmemdup(extras, n * sizeof *extras);
649
650     update_in_band_remotes(ofproto);
651 }
652
653 void
654 ofproto_set_desc(struct ofproto *p,
655                  const char *mfr_desc, const char *hw_desc,
656                  const char *sw_desc, const char *serial_desc,
657                  const char *dp_desc)
658 {
659     struct ofp_desc_stats *ods;
660
661     if (mfr_desc) {
662         if (strlen(mfr_desc) >= sizeof ods->mfr_desc) {
663             VLOG_WARN("truncating mfr_desc, must be less than %zu characters",
664                     sizeof ods->mfr_desc);
665         }
666         free(p->mfr_desc);
667         p->mfr_desc = xstrdup(mfr_desc);
668     }
669     if (hw_desc) {
670         if (strlen(hw_desc) >= sizeof ods->hw_desc) {
671             VLOG_WARN("truncating hw_desc, must be less than %zu characters",
672                     sizeof ods->hw_desc);
673         }
674         free(p->hw_desc);
675         p->hw_desc = xstrdup(hw_desc);
676     }
677     if (sw_desc) {
678         if (strlen(sw_desc) >= sizeof ods->sw_desc) {
679             VLOG_WARN("truncating sw_desc, must be less than %zu characters",
680                     sizeof ods->sw_desc);
681         }
682         free(p->sw_desc);
683         p->sw_desc = xstrdup(sw_desc);
684     }
685     if (serial_desc) {
686         if (strlen(serial_desc) >= sizeof ods->serial_num) {
687             VLOG_WARN("truncating serial_desc, must be less than %zu "
688                     "characters",
689                     sizeof ods->serial_num);
690         }
691         free(p->serial_desc);
692         p->serial_desc = xstrdup(serial_desc);
693     }
694     if (dp_desc) {
695         if (strlen(dp_desc) >= sizeof ods->dp_desc) {
696             VLOG_WARN("truncating dp_desc, must be less than %zu characters",
697                     sizeof ods->dp_desc);
698         }
699         free(p->dp_desc);
700         p->dp_desc = xstrdup(dp_desc);
701     }
702 }
703
704 static int
705 set_pvconns(struct pvconn ***pvconnsp, size_t *n_pvconnsp,
706             const struct svec *svec)
707 {
708     struct pvconn **pvconns = *pvconnsp;
709     size_t n_pvconns = *n_pvconnsp;
710     int retval = 0;
711     size_t i;
712
713     for (i = 0; i < n_pvconns; i++) {
714         pvconn_close(pvconns[i]);
715     }
716     free(pvconns);
717
718     pvconns = xmalloc(svec->n * sizeof *pvconns);
719     n_pvconns = 0;
720     for (i = 0; i < svec->n; i++) {
721         const char *name = svec->names[i];
722         struct pvconn *pvconn;
723         int error;
724
725         error = pvconn_open(name, &pvconn);
726         if (!error) {
727             pvconns[n_pvconns++] = pvconn;
728         } else {
729             VLOG_ERR("failed to listen on %s: %s", name, strerror(error));
730             if (!retval) {
731                 retval = error;
732             }
733         }
734     }
735
736     *pvconnsp = pvconns;
737     *n_pvconnsp = n_pvconns;
738
739     return retval;
740 }
741
742 int
743 ofproto_set_snoops(struct ofproto *ofproto, const struct svec *snoops)
744 {
745     return set_pvconns(&ofproto->snoops, &ofproto->n_snoops, snoops);
746 }
747
748 int
749 ofproto_set_netflow(struct ofproto *ofproto,
750                     const struct netflow_options *nf_options)
751 {
752     if (nf_options && nf_options->collectors.n) {
753         if (!ofproto->netflow) {
754             ofproto->netflow = netflow_create();
755         }
756         return netflow_set_options(ofproto->netflow, nf_options);
757     } else {
758         netflow_destroy(ofproto->netflow);
759         ofproto->netflow = NULL;
760         return 0;
761     }
762 }
763
764 void
765 ofproto_set_sflow(struct ofproto *ofproto,
766                   const struct ofproto_sflow_options *oso)
767 {
768     struct ofproto_sflow *os = ofproto->sflow;
769     if (oso) {
770         if (!os) {
771             os = ofproto->sflow = ofproto_sflow_create(ofproto->wdp);
772             /* XXX ofport */
773         }
774         ofproto_sflow_set_options(os, oso);
775     } else {
776         ofproto_sflow_destroy(os);
777         ofproto->sflow = NULL;
778     }
779 }
780
781 uint64_t
782 ofproto_get_datapath_id(const struct ofproto *ofproto)
783 {
784     return ofproto->datapath_id;
785 }
786
787 bool
788 ofproto_has_primary_controller(const struct ofproto *ofproto)
789 {
790     return !hmap_is_empty(&ofproto->controllers);
791 }
792
793 enum ofproto_fail_mode
794 ofproto_get_fail_mode(const struct ofproto *p)
795 {
796     return p->fail_mode;
797 }
798
799 void
800 ofproto_get_snoops(const struct ofproto *ofproto, struct svec *snoops)
801 {
802     size_t i;
803
804     for (i = 0; i < ofproto->n_snoops; i++) {
805         svec_add(snoops, pvconn_get_name(ofproto->snoops[i]));
806     }
807 }
808
809 void
810 ofproto_destroy(struct ofproto *p)
811 {
812     struct ofservice *ofservice, *next_ofservice;
813     struct ofconn *ofconn, *next_ofconn;
814     size_t i;
815
816     if (!p) {
817         return;
818     }
819
820     /* Destroy fail-open and in-band early, since they touch the classifier. */
821     fail_open_destroy(p->fail_open);
822     p->fail_open = NULL;
823
824     in_band_destroy(p->in_band);
825     p->in_band = NULL;
826     free(p->extra_in_band_remotes);
827
828     ofproto_flush_flows(p);
829
830     LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &p->all_conns) {
831         ofconn_destroy(ofconn);
832     }
833     hmap_destroy(&p->controllers);
834
835     wdp_close(p->wdp);
836
837     switch_status_destroy(p->switch_status);
838     netflow_destroy(p->netflow);
839     ofproto_sflow_destroy(p->sflow);
840
841     HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &p->services) {
842         ofservice_destroy(p, ofservice);
843     }
844     hmap_destroy(&p->services);
845
846     for (i = 0; i < p->n_snoops; i++) {
847         pvconn_close(p->snoops[i]);
848     }
849     free(p->snoops);
850
851     free(p->mfr_desc);
852     free(p->hw_desc);
853     free(p->sw_desc);
854     free(p->serial_desc);
855     free(p->dp_desc);
856
857     free(p);
858 }
859
860 int
861 ofproto_run(struct ofproto *p)
862 {
863     int error = ofproto_run1(p);
864     if (!error) {
865         error = ofproto_run2(p, false);
866     }
867     return error;
868 }
869
870 /* Returns a "preference level" for snooping 'ofconn'.  A higher return value
871  * means that 'ofconn' is more interesting for monitoring than a lower return
872  * value. */
873 static int
874 snoop_preference(const struct ofconn *ofconn)
875 {
876     switch (ofconn->role) {
877     case NX_ROLE_MASTER:
878         return 3;
879     case NX_ROLE_OTHER:
880         return 2;
881     case NX_ROLE_SLAVE:
882         return 1;
883     default:
884         /* Shouldn't happen. */
885         return 0;
886     }
887 }
888
889 /* One of ofproto's "snoop" pvconns has accepted a new connection on 'vconn'.
890  * Connects this vconn to a controller. */
891 static void
892 add_snooper(struct ofproto *ofproto, struct vconn *vconn)
893 {
894     struct ofconn *ofconn, *best;
895
896     /* Pick a controller for monitoring. */
897     best = NULL;
898     LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
899         if (ofconn->type == OFCONN_PRIMARY
900             && (!best || snoop_preference(ofconn) > snoop_preference(best))) {
901             best = ofconn;
902         }
903     }
904
905     if (best) {
906         rconn_add_monitor(best->rconn, vconn);
907     } else {
908         VLOG_INFO_RL(&rl, "no controller connection to snoop");
909         vconn_close(vconn);
910     }
911 }
912
913 static void
914 ofproto_port_poll_cb(const struct ofp_phy_port *opp, uint8_t reason,
915                      void *ofproto_)
916 {
917     /* XXX Should limit the number of queued port status change messages. */
918     struct ofproto *ofproto = ofproto_;
919     struct ofconn *ofconn;
920
921     LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
922         struct ofp_port_status *ops;
923         struct ofpbuf *b;
924
925         if (!ofconn_receives_async_msgs(ofconn)) {
926             continue;
927         }
928
929         ops = make_openflow_xid(sizeof *ops, OFPT_PORT_STATUS, 0, &b);
930         ops->reason = reason;
931         ops->desc = *opp;
932         hton_ofp_phy_port(&ops->desc);
933         queue_tx(b, ofconn, NULL);
934     }
935 }
936
937 int
938 ofproto_run1(struct ofproto *p)
939 {
940     struct ofconn *ofconn, *next_ofconn;
941     struct ofservice *ofservice;
942     int i;
943
944     for (i = 0; i < 50; i++) {
945         struct wdp_packet packet;
946         int error;
947
948         error = wdp_recv(p->wdp, &packet);
949         if (error) {
950             if (error == ENODEV) {
951                 /* Someone destroyed the datapath behind our back.  The caller
952                  * better destroy us and give up, because we're just going to
953                  * spin from here on out. */
954                 static struct vlog_rate_limit rl2 = VLOG_RATE_LIMIT_INIT(1, 5);
955                 VLOG_ERR_RL(&rl2, "%s: datapath was destroyed externally",
956                             wdp_name(p->wdp));
957                 return ENODEV;
958             }
959             break;
960         }
961
962         handle_wdp_packet(p, xmemdup(&packet, sizeof packet));
963     }
964
965     wdp_port_poll(p->wdp, ofproto_port_poll_cb, p);
966
967     if (p->in_band) {
968         if (time_msec() >= p->next_in_band_update) {
969             update_in_band_remotes(p);
970         }
971         in_band_run(p->in_band);
972     }
973
974     LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &p->all_conns) {
975         ofconn_run(ofconn, p);
976     }
977
978     /* Fail-open maintenance.  Do this after processing the ofconns since
979      * fail-open checks the status of the controller rconn. */
980     if (p->fail_open) {
981         fail_open_run(p->fail_open);
982     }
983
984     HMAP_FOR_EACH (ofservice, node, &p->services) {
985         struct vconn *vconn;
986         int retval;
987
988         retval = pvconn_accept(ofservice->pvconn, OFP_VERSION, &vconn);
989         if (!retval) {
990             struct rconn *rconn;
991             char *name;
992
993             rconn = rconn_create(ofservice->probe_interval, 0);
994             name = ofconn_make_name(p, vconn_get_name(vconn));
995             rconn_connect_unreliably(rconn, vconn, name);
996             free(name);
997
998             ofconn = ofconn_create(p, rconn, OFCONN_SERVICE);
999             ofconn_set_rate_limit(ofconn, ofservice->rate_limit,
1000                                   ofservice->burst_limit);
1001         } else if (retval != EAGAIN) {
1002             VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
1003         }
1004     }
1005
1006     for (i = 0; i < p->n_snoops; i++) {
1007         struct vconn *vconn;
1008         int retval;
1009
1010         retval = pvconn_accept(p->snoops[i], OFP_VERSION, &vconn);
1011         if (!retval) {
1012             add_snooper(p, vconn);
1013         } else if (retval != EAGAIN) {
1014             VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
1015         }
1016     }
1017
1018     if (p->netflow) {
1019         netflow_run(p->netflow);
1020     }
1021     if (p->sflow) {
1022         ofproto_sflow_run(p->sflow);
1023     }
1024
1025     return 0;
1026 }
1027
1028 int
1029 ofproto_run2(struct ofproto *p OVS_UNUSED, bool revalidate_all OVS_UNUSED)
1030 {
1031     return 0;
1032 }
1033
1034 void
1035 ofproto_wait(struct ofproto *p)
1036 {
1037     struct ofservice *ofservice;
1038     struct ofconn *ofconn;
1039     size_t i;
1040
1041     wdp_recv_wait(p->wdp);
1042     wdp_port_poll_wait(p->wdp);
1043     LIST_FOR_EACH (ofconn, node, &p->all_conns) {
1044         ofconn_wait(ofconn);
1045     }
1046     if (p->in_band) {
1047         poll_timer_wait_until(p->next_in_band_update);
1048         in_band_wait(p->in_band);
1049     }
1050     if (p->fail_open) {
1051         fail_open_wait(p->fail_open);
1052     }
1053     if (p->sflow) {
1054         ofproto_sflow_wait(p->sflow);
1055     }
1056     HMAP_FOR_EACH (ofservice, node, &p->services) {
1057         pvconn_wait(ofservice->pvconn);
1058     }
1059     for (i = 0; i < p->n_snoops; i++) {
1060         pvconn_wait(p->snoops[i]);
1061     }
1062 }
1063
1064 void
1065 ofproto_revalidate(struct ofproto *ofproto, tag_type tag)
1066 {
1067     wdp_revalidate(ofproto->wdp, tag);
1068 }
1069
1070 void
1071 ofproto_revalidate_all(struct ofproto *ofproto)
1072 {
1073     wdp_revalidate_all(ofproto->wdp);
1074 }
1075
1076 bool
1077 ofproto_is_alive(const struct ofproto *p)
1078 {
1079     return !hmap_is_empty(&p->controllers);
1080 }
1081
1082 int
1083 ofproto_send_packet(struct ofproto *p, const flow_t *flow,
1084                     const union ofp_action *actions, size_t n_actions,
1085                     const struct ofpbuf *packet)
1086 {
1087     /* XXX Should we translate the wdp_execute() errno value into an OpenFlow
1088      * error code? */
1089     wdp_execute(p->wdp, flow->in_port, actions, n_actions, packet);
1090     return 0;
1091 }
1092
1093 /* Intended for used by ofproto clients and ofproto submodules to add flows to
1094  * the flow table. */
1095 void
1096 ofproto_add_flow(struct ofproto *p, const flow_t *flow,
1097                  const union ofp_action *actions, size_t n_actions,
1098                  int idle_timeout)
1099 {
1100     struct wdp_flow_put put;
1101     struct wdp_rule *rule;
1102
1103     put.flags = WDP_PUT_CREATE | WDP_PUT_MODIFY | WDP_PUT_ALL;
1104     put.flow = flow;
1105     put.actions = actions;
1106     put.n_actions = n_actions;
1107     put.idle_timeout = idle_timeout;
1108     put.hard_timeout = 0;
1109     put.ofp_table_id = 0xff;
1110     put.cookie = htonll(0);
1111     put.xid = htonl(0);
1112
1113     if (!wdp_flow_put(p->wdp, &put, NULL, &rule)) {
1114         ofproto_rule_init(rule);
1115     }
1116 }
1117
1118 /* Intended for used by ofproto clients and ofproto submodules to delete flows
1119  * that they earlier added to the flow table. */
1120 void
1121 ofproto_delete_flow(struct ofproto *ofproto, const flow_t *flow)
1122 {
1123     struct wdp_rule *rule = wdp_flow_get(ofproto->wdp, flow, UINT_MAX);
1124     if (rule) {
1125         delete_flow(ofproto, rule, OFPRR_DELETE);
1126     }
1127 }
1128
1129 void
1130 ofproto_flush_flows(struct ofproto *ofproto)
1131 {
1132     COVERAGE_INC(ofproto_flush);
1133     wdp_flow_flush(ofproto->wdp);
1134     if (ofproto->in_band) {
1135         in_band_flushed(ofproto->in_band);
1136     }
1137     if (ofproto->fail_open) {
1138         fail_open_flushed(ofproto->fail_open);
1139     }
1140 }
1141 \f
1142 static struct ofconn *
1143 ofconn_create(struct ofproto *p, struct rconn *rconn, enum ofconn_type type)
1144 {
1145     struct ofconn *ofconn = xzalloc(sizeof *ofconn);
1146     ofconn->ofproto = p;
1147     list_push_back(&p->all_conns, &ofconn->node);
1148     ofconn->rconn = rconn;
1149     ofconn->type = type;
1150     ofconn->role = NX_ROLE_OTHER;
1151     ofconn->packet_in_counter = rconn_packet_counter_create ();
1152     ofconn->pktbuf = NULL;
1153     ofconn->miss_send_len = 0;
1154     ofconn->reply_counter = rconn_packet_counter_create ();
1155     return ofconn;
1156 }
1157
1158 static void
1159 ofconn_destroy(struct ofconn *ofconn)
1160 {
1161     if (ofconn->type == OFCONN_PRIMARY) {
1162         hmap_remove(&ofconn->ofproto->controllers, &ofconn->hmap_node);
1163     }
1164     discovery_destroy(ofconn->discovery);
1165
1166     list_remove(&ofconn->node);
1167     switch_status_unregister(ofconn->ss);
1168     rconn_destroy(ofconn->rconn);
1169     rconn_packet_counter_destroy(ofconn->packet_in_counter);
1170     rconn_packet_counter_destroy(ofconn->reply_counter);
1171     pktbuf_destroy(ofconn->pktbuf);
1172     free(ofconn);
1173 }
1174
1175 static void
1176 ofconn_run(struct ofconn *ofconn, struct ofproto *p)
1177 {
1178     int iteration;
1179     size_t i;
1180
1181     if (ofconn->discovery) {
1182         char *controller_name;
1183         if (rconn_is_connectivity_questionable(ofconn->rconn)) {
1184             discovery_question_connectivity(ofconn->discovery);
1185         }
1186         if (discovery_run(ofconn->discovery, &controller_name)) {
1187             if (controller_name) {
1188                 char *ofconn_name = ofconn_make_name(p, controller_name);
1189                 rconn_connect(ofconn->rconn, controller_name, ofconn_name);
1190                 free(ofconn_name);
1191             } else {
1192                 rconn_disconnect(ofconn->rconn);
1193             }
1194         }
1195     }
1196
1197     for (i = 0; i < N_SCHEDULERS; i++) {
1198         pinsched_run(ofconn->schedulers[i], do_send_packet_in, ofconn);
1199     }
1200
1201     rconn_run(ofconn->rconn);
1202
1203     if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
1204         /* Limit the number of iterations to prevent other tasks from
1205          * starving. */
1206         for (iteration = 0; iteration < 50; iteration++) {
1207             struct ofpbuf *of_msg = rconn_recv(ofconn->rconn);
1208             if (!of_msg) {
1209                 break;
1210             }
1211             if (p->fail_open) {
1212                 fail_open_maybe_recover(p->fail_open);
1213             }
1214             handle_openflow(ofconn, p, of_msg);
1215             ofpbuf_delete(of_msg);
1216         }
1217     }
1218
1219     if (!ofconn->discovery && !rconn_is_alive(ofconn->rconn)) {
1220         ofconn_destroy(ofconn);
1221     }
1222 }
1223
1224 static void
1225 ofconn_wait(struct ofconn *ofconn)
1226 {
1227     int i;
1228
1229     if (ofconn->discovery) {
1230         discovery_wait(ofconn->discovery);
1231     }
1232     for (i = 0; i < N_SCHEDULERS; i++) {
1233         pinsched_wait(ofconn->schedulers[i]);
1234     }
1235     rconn_run_wait(ofconn->rconn);
1236     if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
1237         rconn_recv_wait(ofconn->rconn);
1238     } else {
1239         COVERAGE_INC(ofproto_ofconn_stuck);
1240     }
1241 }
1242
1243 /* Returns true if 'ofconn' should receive asynchronous messages. */
1244 static bool
1245 ofconn_receives_async_msgs(const struct ofconn *ofconn)
1246 {
1247     if (ofconn->type == OFCONN_PRIMARY) {
1248         /* Primary controllers always get asynchronous messages unless they
1249          * have configured themselves as "slaves".  */
1250         return ofconn->role != NX_ROLE_SLAVE;
1251     } else {
1252         /* Service connections don't get asynchronous messages unless they have
1253          * explicitly asked for them by setting a nonzero miss send length. */
1254         return ofconn->miss_send_len > 0;
1255     }
1256 }
1257
1258 /* Returns a human-readable name for an OpenFlow connection between 'ofproto'
1259  * and 'target', suitable for use in log messages for identifying the
1260  * connection.
1261  *
1262  * The name is dynamically allocated.  The caller should free it (with free())
1263  * when it is no longer needed. */
1264 static char *
1265 ofconn_make_name(const struct ofproto *ofproto, const char *target)
1266 {
1267     return xasprintf("%s<->%s", wdp_base_name(ofproto->wdp), target);
1268 }
1269
1270 static void
1271 ofconn_set_rate_limit(struct ofconn *ofconn, int rate, int burst)
1272 {
1273     int i;
1274
1275     for (i = 0; i < N_SCHEDULERS; i++) {
1276         struct pinsched **s = &ofconn->schedulers[i];
1277
1278         if (rate > 0) {
1279             if (!*s) {
1280                 *s = pinsched_create(rate, burst,
1281                                      ofconn->ofproto->switch_status);
1282             } else {
1283                 pinsched_set_limits(*s, rate, burst);
1284             }
1285         } else {
1286             pinsched_destroy(*s);
1287             *s = NULL;
1288         }
1289     }
1290 }
1291 \f
1292 static void
1293 ofservice_reconfigure(struct ofservice *ofservice,
1294                       const struct ofproto_controller *c)
1295 {
1296     ofservice->probe_interval = c->probe_interval;
1297     ofservice->rate_limit = c->rate_limit;
1298     ofservice->burst_limit = c->burst_limit;
1299 }
1300
1301 /* Creates a new ofservice in 'ofproto'.  Returns 0 if successful, otherwise a
1302  * positive errno value. */
1303 static int
1304 ofservice_create(struct ofproto *ofproto, const struct ofproto_controller *c)
1305 {
1306     struct ofservice *ofservice;
1307     struct pvconn *pvconn;
1308     int error;
1309
1310     error = pvconn_open(c->target, &pvconn);
1311     if (error) {
1312         return error;
1313     }
1314
1315     ofservice = xzalloc(sizeof *ofservice);
1316     hmap_insert(&ofproto->services, &ofservice->node,
1317                 hash_string(c->target, 0));
1318     ofservice->pvconn = pvconn;
1319
1320     ofservice_reconfigure(ofservice, c);
1321
1322     return 0;
1323 }
1324
1325 static void
1326 ofservice_destroy(struct ofproto *ofproto, struct ofservice *ofservice)
1327 {
1328     hmap_remove(&ofproto->services, &ofservice->node);
1329     pvconn_close(ofservice->pvconn);
1330     free(ofservice);
1331 }
1332
1333 /* Finds and returns the ofservice within 'ofproto' that has the given
1334  * 'target', or a null pointer if none exists. */
1335 static struct ofservice *
1336 ofservice_lookup(struct ofproto *ofproto, const char *target)
1337 {
1338     struct ofservice *ofservice;
1339
1340     HMAP_FOR_EACH_WITH_HASH (ofservice, node, hash_string(target, 0),
1341                              &ofproto->services) {
1342         if (!strcmp(pvconn_get_name(ofservice->pvconn), target)) {
1343             return ofservice;
1344         }
1345     }
1346     return NULL;
1347 }
1348 \f
1349 static bool
1350 rule_has_out_port(const struct wdp_rule *rule, uint16_t out_port)
1351 {
1352     const union ofp_action *oa;
1353     struct actions_iterator i;
1354
1355     if (out_port == htons(OFPP_NONE)) {
1356         return true;
1357     }
1358     for (oa = actions_first(&i, rule->actions, rule->n_actions); oa;
1359          oa = actions_next(&i)) {
1360         if (action_outputs_to_port(oa, out_port)) {
1361             return true;
1362         }
1363     }
1364     return false;
1365 }
1366 \f
1367 static void
1368 queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
1369          struct rconn_packet_counter *counter)
1370 {
1371     update_openflow_length(msg);
1372     if (rconn_send(ofconn->rconn, msg, counter)) {
1373         ofpbuf_delete(msg);
1374     }
1375 }
1376
1377 static void
1378 send_error_oh(const struct ofconn *ofconn, const struct ofp_header *oh,
1379               int error)
1380 {
1381     struct ofpbuf *buf = make_ofp_error_msg(error, oh);
1382     if (buf) {
1383         COVERAGE_INC(ofproto_error);
1384         queue_tx(buf, ofconn, ofconn->reply_counter);
1385     }
1386 }
1387
1388 static int
1389 handle_echo_request(struct ofconn *ofconn, struct ofp_header *oh)
1390 {
1391     struct ofp_header *rq = oh;
1392     queue_tx(make_echo_reply(rq), ofconn, ofconn->reply_counter);
1393     return 0;
1394 }
1395
1396 static int
1397 handle_features_request(struct ofproto *p, struct ofconn *ofconn,
1398                         struct ofp_header *oh)
1399 {
1400     struct ofpbuf *features;
1401     int error;
1402
1403     error = wdp_get_features(p->wdp, &features);
1404     if (!error) {
1405         struct ofp_switch_features *osf = features->data;
1406
1407         update_openflow_length(features);
1408         osf->header.version = OFP_VERSION;
1409         osf->header.type = OFPT_FEATURES_REPLY;
1410         osf->header.xid = oh->xid;
1411
1412         osf->datapath_id = htonll(p->datapath_id);
1413         osf->n_buffers = htonl(pktbuf_capacity());
1414         memset(osf->pad, 0, sizeof osf->pad);
1415
1416         /* Turn on capabilities implemented by ofproto. */
1417         osf->capabilities |= htonl(OFPC_FLOW_STATS | OFPC_TABLE_STATS |
1418                                    OFPC_PORT_STATS);
1419
1420         queue_tx(features, ofconn, ofconn->reply_counter);
1421     }
1422     return error;
1423 }
1424
1425 static int
1426 handle_get_config_request(struct ofproto *p, struct ofconn *ofconn,
1427                           struct ofp_header *oh)
1428 {
1429     struct ofpbuf *buf;
1430     struct ofp_switch_config *osc;
1431     uint16_t flags;
1432     bool drop_frags;
1433
1434     /* Figure out flags. */
1435     wdp_get_drop_frags(p->wdp, &drop_frags);
1436     flags = drop_frags ? OFPC_FRAG_DROP : OFPC_FRAG_NORMAL;
1437
1438     /* Send reply. */
1439     osc = make_openflow_xid(sizeof *osc, OFPT_GET_CONFIG_REPLY, oh->xid, &buf);
1440     osc->flags = htons(flags);
1441     osc->miss_send_len = htons(ofconn->miss_send_len);
1442     queue_tx(buf, ofconn, ofconn->reply_counter);
1443
1444     return 0;
1445 }
1446
1447 static int
1448 handle_set_config(struct ofproto *p, struct ofconn *ofconn,
1449                   struct ofp_switch_config *osc)
1450 {
1451     uint16_t flags;
1452     int error;
1453
1454     error = check_ofp_message(&osc->header, OFPT_SET_CONFIG, sizeof *osc);
1455     if (error) {
1456         return error;
1457     }
1458     flags = ntohs(osc->flags);
1459
1460     if (ofconn->type == OFCONN_PRIMARY && ofconn->role != NX_ROLE_SLAVE) {
1461         switch (flags & OFPC_FRAG_MASK) {
1462         case OFPC_FRAG_NORMAL:
1463             wdp_set_drop_frags(p->wdp, false);
1464             break;
1465         case OFPC_FRAG_DROP:
1466             wdp_set_drop_frags(p->wdp, true);
1467             break;
1468         default:
1469             VLOG_WARN_RL(&rl, "requested bad fragment mode (flags=%"PRIx16")",
1470                          osc->flags);
1471             break;
1472         }
1473     }
1474
1475     ofconn->miss_send_len = ntohs(osc->miss_send_len);
1476
1477     return 0;
1478 }
1479
1480 /* Checks whether 'ofconn' is a slave controller.  If so, returns an OpenFlow
1481  * error message code (composed with ofp_mkerr()) for the caller to propagate
1482  * upward.  Otherwise, returns 0.
1483  *
1484  * 'oh' is used to make log messages more informative. */
1485 static int
1486 reject_slave_controller(struct ofconn *ofconn, const struct ofp_header *oh)
1487 {
1488     if (ofconn->type == OFCONN_PRIMARY && ofconn->role == NX_ROLE_SLAVE) {
1489         static struct vlog_rate_limit perm_rl = VLOG_RATE_LIMIT_INIT(1, 5);
1490         char *type_name;
1491
1492         type_name = ofp_message_type_to_string(oh->type);
1493         VLOG_WARN_RL(&perm_rl, "rejecting %s message from slave controller",
1494                      type_name);
1495         free(type_name);
1496
1497         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_EPERM);
1498     } else {
1499         return 0;
1500     }
1501 }
1502
1503 static int
1504 handle_packet_out(struct ofproto *p, struct ofconn *ofconn,
1505                   struct ofp_header *oh)
1506 {
1507     struct ofp_packet_out *opo;
1508     struct ofpbuf payload, *buffer;
1509     struct ofp_action_header *actions;
1510     int n_actions;
1511     uint16_t in_port;
1512     flow_t flow;
1513     int error;
1514
1515     error = reject_slave_controller(ofconn, oh);
1516     if (error) {
1517         return error;
1518     }
1519
1520     error = check_ofp_packet_out(oh, &payload, &n_actions, p->max_ports);
1521     if (error) {
1522         return error;
1523     }
1524     opo = (struct ofp_packet_out *) oh;
1525     actions = opo->actions;
1526
1527     COVERAGE_INC(ofproto_packet_out);
1528     if (opo->buffer_id != htonl(UINT32_MAX)) {
1529         error = pktbuf_retrieve(ofconn->pktbuf, ntohl(opo->buffer_id),
1530                                 &buffer, &in_port);
1531         if (error || !buffer) {
1532             return error;
1533         }
1534         payload = *buffer;
1535     } else {
1536         buffer = NULL;
1537     }
1538
1539     flow_extract(&payload, 0, ntohs(opo->in_port), &flow);
1540     wdp_execute(p->wdp, flow.in_port, (const union ofp_action *) actions,
1541                 n_actions, &payload);
1542     ofpbuf_delete(buffer);
1543
1544     return 0;
1545 }
1546
1547 static int
1548 handle_port_mod(struct ofproto *p, struct ofconn *ofconn,
1549                 struct ofp_header *oh)
1550 {
1551     const struct ofp_port_mod *opm;
1552     struct wdp_port port;
1553     int error;
1554
1555     error = reject_slave_controller(ofconn, oh);
1556     if (error) {
1557         return error;
1558     }
1559     error = check_ofp_message(oh, OFPT_PORT_MOD, sizeof *opm);
1560     if (error) {
1561         return error;
1562     }
1563     opm = (struct ofp_port_mod *) oh;
1564
1565     if (wdp_port_query_by_number(p->wdp, ntohs(opm->port_no), &port)) {
1566         error = ofp_mkerr(OFPET_PORT_MOD_FAILED, OFPPMFC_BAD_PORT);
1567     } else if (memcmp(port.opp.hw_addr, opm->hw_addr, OFP_ETH_ALEN)) {
1568         error = ofp_mkerr(OFPET_PORT_MOD_FAILED, OFPPMFC_BAD_HW_ADDR);
1569     } else {
1570         uint32_t mask, new_config;
1571
1572         mask = ntohl(opm->mask) & (OFPPC_PORT_DOWN | OFPPC_NO_STP
1573                                    | OFPPC_NO_RECV | OFPPC_NO_RECV_STP
1574                                    | OFPPC_NO_FLOOD | OFPPC_NO_FWD
1575                                    | OFPPC_NO_PACKET_IN);
1576         new_config = (port.opp.config & ~mask) | (ntohl(opm->config) & mask);
1577         if (new_config != port.opp.config) {
1578             error = wdp_port_set_config(p->wdp, ntohs(opm->port_no),
1579                                         new_config);
1580         }
1581         if (opm->advertise) {
1582             netdev_set_advertisements(port.netdev, ntohl(opm->advertise));
1583         }
1584     }
1585     wdp_port_free(&port);
1586
1587     return error;
1588 }
1589
1590 static struct ofpbuf *
1591 make_stats_reply(uint32_t xid, uint16_t type, size_t body_len)
1592 {
1593     struct ofp_stats_reply *osr;
1594     struct ofpbuf *msg;
1595
1596     msg = ofpbuf_new(MIN(sizeof *osr + body_len, UINT16_MAX));
1597     osr = put_openflow_xid(sizeof *osr, OFPT_STATS_REPLY, xid, msg);
1598     osr->type = type;
1599     osr->flags = htons(0);
1600     return msg;
1601 }
1602
1603 static struct ofpbuf *
1604 start_stats_reply(const struct ofp_stats_request *request, size_t body_len)
1605 {
1606     return make_stats_reply(request->header.xid, request->type, body_len);
1607 }
1608
1609 static void *
1610 append_stats_reply(size_t nbytes, struct ofconn *ofconn, struct ofpbuf **msgp)
1611 {
1612     struct ofpbuf *msg = *msgp;
1613     assert(nbytes <= UINT16_MAX - sizeof(struct ofp_stats_reply));
1614     if (nbytes + msg->size > UINT16_MAX) {
1615         struct ofp_stats_reply *reply = msg->data;
1616         reply->flags = htons(OFPSF_REPLY_MORE);
1617         *msgp = make_stats_reply(reply->header.xid, reply->type, nbytes);
1618         queue_tx(msg, ofconn, ofconn->reply_counter);
1619     }
1620     return ofpbuf_put_uninit(*msgp, nbytes);
1621 }
1622
1623 static int
1624 handle_desc_stats_request(struct ofproto *p, struct ofconn *ofconn,
1625                            struct ofp_stats_request *request)
1626 {
1627     struct ofp_desc_stats *ods;
1628     struct ofpbuf *msg;
1629
1630     msg = start_stats_reply(request, sizeof *ods);
1631     ods = append_stats_reply(sizeof *ods, ofconn, &msg);
1632     memset(ods, 0, sizeof *ods);
1633     ovs_strlcpy(ods->mfr_desc, p->mfr_desc, sizeof ods->mfr_desc);
1634     ovs_strlcpy(ods->hw_desc, p->hw_desc, sizeof ods->hw_desc);
1635     ovs_strlcpy(ods->sw_desc, p->sw_desc, sizeof ods->sw_desc);
1636     ovs_strlcpy(ods->serial_num, p->serial_desc, sizeof ods->serial_num);
1637     ovs_strlcpy(ods->dp_desc, p->dp_desc, sizeof ods->dp_desc);
1638     queue_tx(msg, ofconn, ofconn->reply_counter);
1639
1640     return 0;
1641 }
1642
1643 static int
1644 handle_table_stats_request(struct ofproto *p, struct ofconn *ofconn,
1645                            struct ofp_stats_request *request)
1646 {
1647     struct ofpbuf *msg;
1648     int error;
1649
1650     msg = start_stats_reply(request, sizeof(struct ofp_table_stats) * 3);
1651     error = wdp_get_table_stats(p->wdp, msg);
1652     if (!error) {
1653         queue_tx(msg, ofconn, ofconn->reply_counter);
1654     } else {
1655         ofpbuf_delete(msg);
1656     }
1657     return error;
1658 }
1659
1660 static void
1661 append_port_stat(struct wdp_port *port, struct ofconn *ofconn,
1662                  struct ofpbuf **msgp)
1663 {
1664     struct netdev_stats stats;
1665     struct ofp_port_stats *ops;
1666
1667     /* Intentionally ignore return value, since errors will set
1668      * 'stats' to all-1s, which is correct for OpenFlow, and
1669      * netdev_get_stats() will log errors. */
1670     netdev_get_stats(port->netdev, &stats);
1671
1672     ops = append_stats_reply(sizeof *ops, ofconn, msgp);
1673     ops->port_no = htons(port->opp.port_no);
1674     memset(ops->pad, 0, sizeof ops->pad);
1675     ops->rx_packets = htonll(stats.rx_packets);
1676     ops->tx_packets = htonll(stats.tx_packets);
1677     ops->rx_bytes = htonll(stats.rx_bytes);
1678     ops->tx_bytes = htonll(stats.tx_bytes);
1679     ops->rx_dropped = htonll(stats.rx_dropped);
1680     ops->tx_dropped = htonll(stats.tx_dropped);
1681     ops->rx_errors = htonll(stats.rx_errors);
1682     ops->tx_errors = htonll(stats.tx_errors);
1683     ops->rx_frame_err = htonll(stats.rx_frame_errors);
1684     ops->rx_over_err = htonll(stats.rx_over_errors);
1685     ops->rx_crc_err = htonll(stats.rx_crc_errors);
1686     ops->collisions = htonll(stats.collisions);
1687 }
1688
1689 static int
1690 handle_port_stats_request(struct ofproto *p, struct ofconn *ofconn,
1691                           struct ofp_stats_request *osr,
1692                           size_t arg_size)
1693 {
1694     struct ofp_port_stats_request *psr;
1695     struct ofp_port_stats *ops;
1696     struct ofpbuf *msg;
1697
1698     if (arg_size != sizeof *psr) {
1699         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
1700     }
1701     psr = (struct ofp_port_stats_request *) osr->body;
1702
1703     msg = start_stats_reply(osr, sizeof *ops * 16);
1704     if (psr->port_no != htons(OFPP_NONE)) {
1705         struct wdp_port port;
1706
1707         if (!wdp_port_query_by_number(p->wdp, ntohs(psr->port_no), &port)) {
1708             append_port_stat(&port, ofconn, &msg);
1709             wdp_port_free(&port);
1710         }
1711     } else {
1712         struct wdp_port *ports;
1713         size_t n_ports;
1714         size_t i;
1715
1716         wdp_port_list(p->wdp, &ports, &n_ports);
1717         for (i = 0; i < n_ports; i++) {
1718             append_port_stat(&ports[i], ofconn, &msg);
1719         }
1720         wdp_port_array_free(ports, n_ports);
1721     }
1722
1723     queue_tx(msg, ofconn, ofconn->reply_counter);
1724     return 0;
1725 }
1726
1727 struct flow_stats_cbdata {
1728     struct ofproto *ofproto;
1729     struct ofconn *ofconn;
1730     uint16_t out_port;
1731     struct ofpbuf *msg;
1732 };
1733
1734 /* Obtains statistic counters for 'rule' within 'p' and stores them into
1735  * '*packet_countp' and '*byte_countp'.  If 'rule' is a wildcarded rule, the
1736  * returned statistic include statistics for all of 'rule''s subrules. */
1737 static void
1738 query_stats(struct ofproto *p, struct wdp_rule *rule,
1739             uint64_t *packet_countp, uint64_t *byte_countp)
1740 {
1741     struct wdp_flow_stats stats;
1742
1743     if (!wdp_flow_get_stats(p->wdp, rule, &stats)) {
1744         *packet_countp = stats.n_packets;
1745         *byte_countp = stats.n_bytes;
1746     } else {
1747         *packet_countp = 0;
1748         *byte_countp = 0;
1749     }
1750 }
1751
1752 static int
1753 flow_stats_cb(struct wdp_rule *rule, void *cbdata_)
1754 {
1755     struct flow_stats_cbdata *cbdata = cbdata_;
1756     struct ofp_flow_stats *ofs;
1757     uint64_t packet_count, byte_count;
1758     size_t act_len, len;
1759     long long int tdiff = time_msec() - rule->created;
1760     uint32_t sec = tdiff / 1000;
1761     uint32_t msec = tdiff - (sec * 1000);
1762
1763     if (rule_is_hidden(rule)
1764         || !rule_has_out_port(rule, cbdata->out_port)) {
1765         return 0;
1766     }
1767
1768     act_len = sizeof *rule->actions * rule->n_actions;
1769     len = offsetof(struct ofp_flow_stats, actions) + act_len;
1770
1771     query_stats(cbdata->ofproto, rule, &packet_count, &byte_count);
1772
1773     ofs = append_stats_reply(len, cbdata->ofconn, &cbdata->msg);
1774     ofs->length = htons(len);
1775     ofs->table_id = rule->ofp_table_id;
1776     ofs->pad = 0;
1777     flow_to_match(&rule->cr.flow, cbdata->ofproto->tun_id_from_cookie,
1778                   &ofs->match);
1779     ofs->duration_sec = htonl(sec);
1780     ofs->duration_nsec = htonl(msec * 1000000);
1781     ofs->cookie = ofproto_rule_cast(rule)->flow_cookie;
1782     ofs->priority = htons(rule->cr.flow.priority);
1783     ofs->idle_timeout = htons(rule->idle_timeout);
1784     ofs->hard_timeout = htons(rule->hard_timeout);
1785     memset(ofs->pad2, 0, sizeof ofs->pad2);
1786     ofs->packet_count = htonll(packet_count);
1787     ofs->byte_count = htonll(byte_count);
1788     memcpy(ofs->actions, rule->actions, act_len);
1789
1790     return 0;
1791 }
1792
1793 static unsigned int
1794 table_id_to_include(uint8_t table_id)
1795 {
1796     return (table_id == 0xff ? UINT_MAX
1797             : table_id < 32 ? 1u << table_id
1798             : 0);
1799 }
1800
1801 static unsigned int
1802 flow_mod_table_id(const struct ofconn *ofconn, const struct ofp_flow_mod *ofm)
1803 {
1804     return ofconn->flow_mod_table_id ? ntohs(ofm->command) >> 8 : 0xff;
1805 }
1806
1807 static int
1808 handle_flow_stats_request(struct ofproto *p, struct ofconn *ofconn,
1809                           const struct ofp_stats_request *osr,
1810                           size_t arg_size)
1811 {
1812     struct ofp_flow_stats_request *fsr;
1813     struct flow_stats_cbdata cbdata;
1814     flow_t target;
1815
1816     if (arg_size != sizeof *fsr) {
1817         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
1818     }
1819     fsr = (struct ofp_flow_stats_request *) osr->body;
1820
1821     COVERAGE_INC(ofproto_flows_req);
1822     cbdata.ofproto = p;
1823     cbdata.ofconn = ofconn;
1824     cbdata.out_port = fsr->out_port;
1825     cbdata.msg = start_stats_reply(osr, 1024);
1826     flow_from_match(&fsr->match, 0, false, 0, &target);
1827     wdp_flow_for_each_match(p->wdp, &target,
1828                             table_id_to_include(fsr->table_id),
1829                             flow_stats_cb, &cbdata);
1830     queue_tx(cbdata.msg, ofconn, ofconn->reply_counter);
1831     return 0;
1832 }
1833
1834 struct flow_stats_ds_cbdata {
1835     struct ofproto *ofproto;
1836     struct ds *results;
1837 };
1838
1839 static int
1840 flow_stats_ds_cb(struct wdp_rule *rule, void *cbdata_)
1841 {
1842     struct flow_stats_ds_cbdata *cbdata = cbdata_;
1843     struct ds *results = cbdata->results;
1844     struct ofp_match match;
1845     uint64_t packet_count, byte_count;
1846     size_t act_len = sizeof *rule->actions * rule->n_actions;
1847
1848     query_stats(cbdata->ofproto, rule, &packet_count, &byte_count);
1849     flow_to_match(&rule->cr.flow, cbdata->ofproto->tun_id_from_cookie,
1850                   &match);
1851
1852     ds_put_format(results, "duration=%llds, ",
1853                   (time_msec() - rule->created) / 1000);
1854     ds_put_format(results, "priority=%u, ", rule->cr.flow.priority);
1855     ds_put_format(results, "n_packets=%"PRIu64", ", packet_count);
1856     ds_put_format(results, "n_bytes=%"PRIu64", ", byte_count);
1857     ofp_print_match(results, &match, true);
1858     ofp_print_actions(results, &rule->actions->header, act_len);
1859     ds_put_cstr(results, "\n");
1860
1861     return 0;
1862 }
1863
1864 /* Adds a pretty-printed description of all flows to 'results', including
1865  * those marked hidden by secchan (e.g., by in-band control). */
1866 void
1867 ofproto_get_all_flows(struct ofproto *p, struct ds *results)
1868 {
1869     struct flow_stats_ds_cbdata cbdata;
1870     struct ofp_match match;
1871     flow_t target;
1872
1873     memset(&match, 0, sizeof match);
1874     match.wildcards = htonl(OVSFW_ALL);
1875
1876     cbdata.ofproto = p;
1877     cbdata.results = results;
1878
1879     flow_from_match(&match, 0, false, 0, &target);
1880     wdp_flow_for_each_match(p->wdp, &target, UINT_MAX,
1881                             flow_stats_ds_cb, &cbdata);
1882 }
1883
1884 struct aggregate_stats_cbdata {
1885     struct ofproto *ofproto;
1886     uint16_t out_port;
1887     uint64_t packet_count;
1888     uint64_t byte_count;
1889     uint32_t n_flows;
1890 };
1891
1892 static int
1893 aggregate_stats_cb(struct wdp_rule *rule, void *cbdata_)
1894 {
1895     struct aggregate_stats_cbdata *cbdata = cbdata_;
1896     uint64_t packet_count, byte_count;
1897
1898     if (rule_is_hidden(rule) || !rule_has_out_port(rule, cbdata->out_port)) {
1899         return 0;
1900     }
1901
1902     query_stats(cbdata->ofproto, rule, &packet_count, &byte_count);
1903
1904     cbdata->packet_count += packet_count;
1905     cbdata->byte_count += byte_count;
1906     cbdata->n_flows++;
1907
1908     return 0;
1909 }
1910
1911 static int
1912 handle_aggregate_stats_request(struct ofproto *p, struct ofconn *ofconn,
1913                                const struct ofp_stats_request *osr,
1914                                size_t arg_size)
1915 {
1916     struct ofp_aggregate_stats_request *asr;
1917     struct ofp_aggregate_stats_reply *reply;
1918     struct aggregate_stats_cbdata cbdata;
1919     struct ofpbuf *msg;
1920     flow_t target;
1921
1922     if (arg_size != sizeof *asr) {
1923         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
1924     }
1925     asr = (struct ofp_aggregate_stats_request *) osr->body;
1926
1927     COVERAGE_INC(ofproto_agg_request);
1928     cbdata.ofproto = p;
1929     cbdata.out_port = asr->out_port;
1930     cbdata.packet_count = 0;
1931     cbdata.byte_count = 0;
1932     cbdata.n_flows = 0;
1933     flow_from_match(&asr->match, 0, false, 0, &target);
1934     wdp_flow_for_each_match(p->wdp, &target,
1935                             table_id_to_include(asr->table_id),
1936                             aggregate_stats_cb, &cbdata);
1937
1938     msg = start_stats_reply(osr, sizeof *reply);
1939     reply = append_stats_reply(sizeof *reply, ofconn, &msg);
1940     reply->flow_count = htonl(cbdata.n_flows);
1941     reply->packet_count = htonll(cbdata.packet_count);
1942     reply->byte_count = htonll(cbdata.byte_count);
1943     queue_tx(msg, ofconn, ofconn->reply_counter);
1944     return 0;
1945 }
1946
1947 struct queue_stats_cbdata {
1948     struct ofconn *ofconn;
1949     struct wdp_port *wdp_port;
1950     struct ofpbuf *msg;
1951 };
1952
1953 static void
1954 put_queue_stats(struct queue_stats_cbdata *cbdata, uint32_t queue_id,
1955                 const struct netdev_queue_stats *stats)
1956 {
1957     struct ofp_queue_stats *reply;
1958
1959     reply = append_stats_reply(sizeof *reply, cbdata->ofconn, &cbdata->msg);
1960     reply->port_no = htons(cbdata->wdp_port->opp.port_no);
1961     memset(reply->pad, 0, sizeof reply->pad);
1962     reply->queue_id = htonl(queue_id);
1963     reply->tx_bytes = htonll(stats->tx_bytes);
1964     reply->tx_packets = htonll(stats->tx_packets);
1965     reply->tx_errors = htonll(stats->tx_errors);
1966 }
1967
1968 static void
1969 handle_queue_stats_dump_cb(uint32_t queue_id,
1970                            struct netdev_queue_stats *stats,
1971                            void *cbdata_)
1972 {
1973     struct queue_stats_cbdata *cbdata = cbdata_;
1974
1975     put_queue_stats(cbdata, queue_id, stats);
1976 }
1977
1978 static void
1979 handle_queue_stats_for_port(struct wdp_port *port, uint32_t queue_id,
1980                             struct queue_stats_cbdata *cbdata)
1981 {
1982     cbdata->wdp_port = port;
1983     if (queue_id == OFPQ_ALL) {
1984         netdev_dump_queue_stats(port->netdev,
1985                                 handle_queue_stats_dump_cb, cbdata);
1986     } else {
1987         struct netdev_queue_stats stats;
1988
1989         if (!netdev_get_queue_stats(port->netdev, queue_id, &stats)) {
1990             put_queue_stats(cbdata, queue_id, &stats);
1991         }
1992     }
1993 }
1994
1995 static int
1996 handle_queue_stats_request(struct ofproto *ofproto, struct ofconn *ofconn,
1997                            const struct ofp_stats_request *osr,
1998                            size_t arg_size)
1999 {
2000     struct ofp_queue_stats_request *qsr;
2001     struct queue_stats_cbdata cbdata;
2002     unsigned int port_no;
2003     uint32_t queue_id;
2004
2005     if (arg_size != sizeof *qsr) {
2006         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
2007     }
2008     qsr = (struct ofp_queue_stats_request *) osr->body;
2009
2010     COVERAGE_INC(ofproto_queue_req);
2011
2012     cbdata.ofconn = ofconn;
2013     cbdata.msg = start_stats_reply(osr, 128);
2014
2015     port_no = ntohs(qsr->port_no);
2016     queue_id = ntohl(qsr->queue_id);
2017     if (port_no == OFPP_ALL) {
2018         struct wdp_port *ports;
2019         size_t n_ports, i;
2020
2021         wdp_port_list(ofproto->wdp, &ports, &n_ports);
2022         /* XXX deal with wdp_port_list() errors */
2023         for (i = 0; i < n_ports; i++) {
2024             handle_queue_stats_for_port(&ports[i], queue_id, &cbdata);
2025         }
2026         wdp_port_array_free(ports, n_ports);
2027     } else if (port_no < ofproto->max_ports) {
2028         struct wdp_port port;
2029         int error;
2030
2031         error = wdp_port_query_by_number(ofproto->wdp, port_no, &port);
2032         if (!error) {
2033             handle_queue_stats_for_port(&port, queue_id, &cbdata);
2034         } else {
2035             /* XXX deal with wdp_port_query_by_number() errors */
2036         }
2037         wdp_port_free(&port);
2038     } else {
2039         ofpbuf_delete(cbdata.msg);
2040         return ofp_mkerr(OFPET_QUEUE_OP_FAILED, OFPQOFC_BAD_PORT);
2041     }
2042     queue_tx(cbdata.msg, ofconn, ofconn->reply_counter);
2043
2044     return 0;
2045 }
2046
2047 static int
2048 handle_stats_request(struct ofproto *p, struct ofconn *ofconn,
2049                      struct ofp_header *oh)
2050 {
2051     struct ofp_stats_request *osr;
2052     size_t arg_size;
2053     int error;
2054
2055     error = check_ofp_message_array(oh, OFPT_STATS_REQUEST, sizeof *osr,
2056                                     1, &arg_size);
2057     if (error) {
2058         return error;
2059     }
2060     osr = (struct ofp_stats_request *) oh;
2061
2062     switch (ntohs(osr->type)) {
2063     case OFPST_DESC:
2064         return handle_desc_stats_request(p, ofconn, osr);
2065
2066     case OFPST_FLOW:
2067         return handle_flow_stats_request(p, ofconn, osr, arg_size);
2068
2069     case OFPST_AGGREGATE:
2070         return handle_aggregate_stats_request(p, ofconn, osr, arg_size);
2071
2072     case OFPST_TABLE:
2073         return handle_table_stats_request(p, ofconn, osr);
2074
2075     case OFPST_PORT:
2076         return handle_port_stats_request(p, ofconn, osr, arg_size);
2077
2078     case OFPST_QUEUE:
2079         return handle_queue_stats_request(p, ofconn, osr, arg_size);
2080
2081     case OFPST_VENDOR:
2082         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_VENDOR);
2083
2084     default:
2085         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_STAT);
2086     }
2087 }
2088
2089 /* Implements OFPFC_ADD and the cases for OFPFC_MODIFY and OFPFC_MODIFY_STRICT
2090  * in which no matching flow already exists in the flow table.
2091  *
2092  * Adds the flow specified by 'ofm', which is followed by 'n_actions'
2093  * ofp_actions, to 'p''s flow table.  Returns 0 on success or an OpenFlow error
2094  * code as encoded by ofp_mkerr() on failure.
2095  *
2096  * 'ofconn' is used to retrieve the packet buffer specified in ofm->buffer_id,
2097  * if any. */
2098 static int
2099 add_flow(struct ofproto *p, struct ofconn *ofconn,
2100          const struct ofp_flow_mod *ofm, size_t n_actions)
2101 {
2102     struct wdp_rule *rule;
2103     struct wdp_flow_put put;
2104     struct ofpbuf *packet;
2105     uint16_t in_port;
2106     flow_t flow;
2107     int error;
2108
2109     flow_from_match(&ofm->match, ntohs(ofm->priority), p->tun_id_from_cookie,
2110                     ofm->cookie, &flow);
2111     if (ofm->flags & htons(OFPFF_CHECK_OVERLAP)
2112         && wdp_flow_overlaps(p->wdp, &flow)) {
2113         return ofp_mkerr(OFPET_FLOW_MOD_FAILED, OFPFMFC_OVERLAP);
2114     }
2115
2116     put.flags = WDP_PUT_CREATE | WDP_PUT_MODIFY | WDP_PUT_ALL;
2117     put.flow = &flow;
2118     put.actions = (const union ofp_action *) ofm->actions;
2119     put.n_actions = n_actions;
2120     put.idle_timeout = ntohs(ofm->idle_timeout);
2121     put.hard_timeout = ntohs(ofm->hard_timeout);
2122     put.ofp_table_id = flow_mod_table_id(ofconn, ofm);
2123     put.cookie = ofm->cookie;
2124     put.xid = ofm->header.xid;
2125     error = wdp_flow_put(p->wdp, &put, NULL, &rule);
2126     if (error) {
2127         /* XXX wdp_flow_put should return OpenFlow error code. */
2128         return error;
2129     }
2130     ofproto_rule_init(rule);
2131
2132     if (ofm->buffer_id != htonl(UINT32_MAX)) {
2133         error = pktbuf_retrieve(ofconn->pktbuf, ntohl(ofm->buffer_id),
2134                                 &packet, &in_port);
2135         if (!error) {
2136             error = wdp_flow_inject(p->wdp, rule, in_port, packet);
2137             ofpbuf_delete(packet);
2138         }
2139     }
2140
2141     return error;
2142 }
2143
2144 static struct wdp_rule *
2145 find_flow_strict(struct ofproto *p, const struct ofconn *ofconn,
2146                  const struct ofp_flow_mod *ofm)
2147 {
2148     uint8_t table_id;
2149     flow_t flow;
2150
2151     flow_from_match(&ofm->match, ntohs(ofm->priority),
2152                     p->tun_id_from_cookie, ofm->cookie, &flow);
2153     table_id = flow_mod_table_id(ofconn, ofm);
2154     return wdp_flow_get(p->wdp, &flow, table_id_to_include(table_id));
2155 }
2156
2157 static int
2158 send_buffered_packet(struct ofproto *ofproto, struct ofconn *ofconn,
2159                      struct wdp_rule *rule, const struct ofp_flow_mod *ofm)
2160 {
2161     struct ofpbuf *packet;
2162     uint16_t in_port;
2163     int error;
2164
2165     if (ofm->buffer_id == htonl(UINT32_MAX)) {
2166         return 0;
2167     }
2168
2169     error = pktbuf_retrieve(ofconn->pktbuf, ntohl(ofm->buffer_id),
2170                             &packet, &in_port);
2171     if (error) {
2172         return error;
2173     }
2174
2175     error = wdp_flow_inject(ofproto->wdp, rule, in_port, packet);
2176     ofpbuf_delete(packet);
2177
2178     return error;
2179 }
2180 \f
2181 /* OFPFC_MODIFY and OFPFC_MODIFY_STRICT. */
2182
2183 struct modify_flows_cbdata {
2184     struct ofproto *ofproto;
2185     const struct ofp_flow_mod *ofm;
2186     size_t n_actions;
2187     struct wdp_rule *match;
2188 };
2189
2190 static int modify_flow(struct ofproto *, const struct ofp_flow_mod *,
2191                        size_t n_actions, struct wdp_rule *)
2192     WARN_UNUSED_RESULT;
2193 static int modify_flows_cb(struct wdp_rule *, void *cbdata_);
2194
2195 /* Implements OFPFC_ADD and OFPFC_MODIFY.  Returns 0 on success or an OpenFlow
2196  * error code as encoded by ofp_mkerr() on failure.
2197  *
2198  * 'ofconn' is used to retrieve the packet buffer specified in ofm->buffer_id,
2199  * if any. */
2200 static int
2201 modify_flows_loose(struct ofproto *p, struct ofconn *ofconn,
2202                    const struct ofp_flow_mod *ofm, size_t n_actions)
2203 {
2204     struct modify_flows_cbdata cbdata;
2205     uint8_t table_id;
2206     flow_t target;
2207     int error;
2208
2209     cbdata.ofproto = p;
2210     cbdata.ofm = ofm;
2211     cbdata.n_actions = n_actions;
2212     cbdata.match = NULL;
2213
2214     flow_from_match(&ofm->match, 0, p->tun_id_from_cookie, ofm->cookie,
2215                     &target);
2216
2217     table_id = flow_mod_table_id(ofconn, ofm);
2218     error = wdp_flow_for_each_match(p->wdp, &target,
2219                                     table_id_to_include(table_id),
2220                                     modify_flows_cb, &cbdata);
2221     if (error) {
2222         return error;
2223     }
2224
2225     if (cbdata.match) {
2226         /* This credits the packet to whichever flow happened to match last.
2227          * That's weird.  Maybe we should do a lookup for the flow that
2228          * actually matches the packet?  Who knows. */
2229         return send_buffered_packet(p, ofconn, cbdata.match, ofm);
2230     } else {
2231         return add_flow(p, ofconn, ofm, n_actions);
2232     }
2233 }
2234
2235 /* Implements OFPFC_MODIFY_STRICT.  Returns 0 on success or an OpenFlow error
2236  * code as encoded by ofp_mkerr() on failure.
2237  *
2238  * 'ofconn' is used to retrieve the packet buffer specified in ofm->buffer_id,
2239  * if any. */
2240 static int
2241 modify_flow_strict(struct ofproto *p, struct ofconn *ofconn,
2242                    struct ofp_flow_mod *ofm, size_t n_actions)
2243 {
2244     struct wdp_rule *rule = find_flow_strict(p, ofconn, ofm);
2245     if (rule && !rule_is_hidden(rule)) {
2246         int error = modify_flow(p, ofm, n_actions, rule);
2247         return error ? error : send_buffered_packet(p, ofconn, rule, ofm);
2248     } else {
2249         return add_flow(p, ofconn, ofm, n_actions);
2250     }
2251 }
2252
2253 /* Callback for modify_flows_loose(). */
2254 static int
2255 modify_flows_cb(struct wdp_rule *rule, void *cbdata_)
2256 {
2257     struct modify_flows_cbdata *cbdata = cbdata_;
2258
2259     if (!rule_is_hidden(rule)) {
2260         cbdata->match = rule;
2261         return modify_flow(cbdata->ofproto, cbdata->ofm, cbdata->n_actions,
2262                            rule);
2263     }
2264     return 0;
2265 }
2266
2267 /* Implements core of OFPFC_MODIFY and OFPFC_MODIFY_STRICT where 'rule' has
2268  * been identified as a flow in 'p''s flow table to be modified, by changing
2269  * the rule's actions to match those in 'ofm' (which is followed by 'n_actions'
2270  * ofp_action[] structures). */
2271 static int
2272 modify_flow(struct ofproto *p, const struct ofp_flow_mod *ofm,
2273             size_t n_actions, struct wdp_rule *rule)
2274 {
2275     const struct ofp_action_header *actions = ofm->actions;
2276     struct ofproto_rule *ofproto_rule = ofproto_rule_cast(rule);
2277     struct wdp_flow_put put;
2278
2279     ofproto_rule->flow_cookie = ofm->cookie;
2280
2281     /* If the actions are the same, do nothing. */
2282     if (n_actions == rule->n_actions
2283         && !memcmp(ofm->actions, rule->actions, sizeof *actions * n_actions))
2284     {
2285         return 0;
2286     }
2287
2288     put.flags = WDP_PUT_MODIFY | WDP_PUT_ACTIONS;
2289     put.flow = &rule->cr.flow;
2290     put.actions = (const union ofp_action *) actions;
2291     put.n_actions = n_actions;
2292     put.idle_timeout = put.hard_timeout = 0;
2293     put.ofp_table_id = rule->ofp_table_id;
2294     put.cookie = ofm->cookie;
2295     put.xid = ofm->header.xid;
2296     return wdp_flow_put(p->wdp, &put, NULL, NULL);
2297 }
2298 \f
2299 /* OFPFC_DELETE implementation. */
2300
2301 struct delete_flows_cbdata {
2302     struct ofproto *ofproto;
2303     uint16_t out_port;
2304 };
2305
2306 static int delete_flows_cb(struct wdp_rule *, void *cbdata_);
2307 static int delete_flow_core(struct ofproto *, struct wdp_rule *,
2308                              uint16_t out_port);
2309
2310 /* Implements OFPFC_DELETE. */
2311 static int
2312 delete_flows_loose(struct ofproto *p, const struct ofconn *ofconn,
2313                    const struct ofp_flow_mod *ofm)
2314 {
2315     struct delete_flows_cbdata cbdata;
2316     uint8_t table_id;
2317     flow_t target;
2318
2319     cbdata.ofproto = p;
2320     cbdata.out_port = ofm->out_port;
2321
2322     flow_from_match(&ofm->match, 0, p->tun_id_from_cookie, ofm->cookie,
2323                     &target);
2324     table_id = flow_mod_table_id(ofconn, ofm);
2325
2326     return wdp_flow_for_each_match(p->wdp, &target,
2327                                    table_id_to_include(table_id),
2328                                    delete_flows_cb, &cbdata);
2329 }
2330
2331 /* Implements OFPFC_DELETE_STRICT. */
2332 static int
2333 delete_flow_strict(struct ofproto *p, const struct ofconn *ofconn,
2334                    struct ofp_flow_mod *ofm)
2335 {
2336     struct wdp_rule *rule = find_flow_strict(p, ofconn, ofm);
2337     if (rule) {
2338         return delete_flow_core(p, rule, ofm->out_port);
2339     }
2340     return 0;
2341 }
2342
2343 /* Callback for delete_flows_loose(). */
2344 static int
2345 delete_flows_cb(struct wdp_rule *rule, void *cbdata_)
2346 {
2347     struct delete_flows_cbdata *cbdata = cbdata_;
2348
2349     return delete_flow_core(cbdata->ofproto, rule, cbdata->out_port);
2350 }
2351
2352 /* Implements core of OFPFC_DELETE and OFPFC_DELETE_STRICT where 'rule' has
2353  * been identified as a flow to delete from 'p''s flow table, by deleting the
2354  * flow and sending out a OFPT_FLOW_REMOVED message to any interested
2355  * controller.
2356  *
2357  * Will not delete 'rule' if it is hidden.  Will delete 'rule' only if
2358  * 'out_port' is htons(OFPP_NONE) or if 'rule' actually outputs to the
2359  * specified 'out_port'. */
2360 static int
2361 delete_flow_core(struct ofproto *p, struct wdp_rule *rule, uint16_t out_port)
2362 {
2363     if (rule_is_hidden(rule)) {
2364         return 0;
2365     }
2366
2367     if (out_port != htons(OFPP_NONE) && !rule_has_out_port(rule, out_port)) {
2368         return 0;
2369     }
2370
2371     return delete_flow(p, rule, OFPRR_DELETE);
2372 }
2373 \f
2374 static int
2375 handle_flow_mod(struct ofproto *p, struct ofconn *ofconn,
2376                 struct ofp_flow_mod *ofm)
2377 {
2378     struct ofp_match orig_match;
2379     size_t n_actions;
2380     int error;
2381
2382     error = reject_slave_controller(ofconn, &ofm->header);
2383     if (error) {
2384         return error;
2385     }
2386     error = check_ofp_message_array(&ofm->header, OFPT_FLOW_MOD, sizeof *ofm,
2387                                     sizeof *ofm->actions, &n_actions);
2388     if (error) {
2389         return error;
2390     }
2391
2392     /* We do not support the emergency flow cache.  It will hopefully
2393      * get dropped from OpenFlow in the near future. */
2394     if (ofm->flags & htons(OFPFF_EMERG)) {
2395         /* There isn't a good fit for an error code, so just state that the
2396          * flow table is full. */
2397         return ofp_mkerr(OFPET_FLOW_MOD_FAILED, OFPFMFC_ALL_TABLES_FULL);
2398     }
2399
2400     /* Normalize ofp->match.  If normalization actually changes anything, then
2401      * log the differences. */
2402     ofm->match.pad1[0] = ofm->match.pad2[0] = 0;
2403     orig_match = ofm->match;
2404     normalize_match(&ofm->match);
2405     if (memcmp(&ofm->match, &orig_match, sizeof orig_match)) {
2406         static struct vlog_rate_limit normal_rl = VLOG_RATE_LIMIT_INIT(1, 1);
2407         if (!VLOG_DROP_INFO(&normal_rl)) {
2408             char *old = ofp_match_to_literal_string(&orig_match);
2409             char *new = ofp_match_to_literal_string(&ofm->match);
2410             VLOG_INFO("%s: normalization changed ofp_match, details:",
2411                       rconn_get_name(ofconn->rconn));
2412             VLOG_INFO(" pre: %s", old);
2413             VLOG_INFO("post: %s", new);
2414             free(old);
2415             free(new);
2416         }
2417     }
2418
2419     if (!ofm->match.wildcards) {
2420         ofm->priority = htons(UINT16_MAX);
2421     }
2422
2423     error = validate_actions((const union ofp_action *) ofm->actions,
2424                              n_actions, p->max_ports);
2425     if (error) {
2426         return error;
2427     }
2428
2429     if (!ofconn->flow_mod_table_id && ofm->command & htons(0xff00)) {
2430         static struct vlog_rate_limit table_id_rl = VLOG_RATE_LIMIT_INIT(1, 1);
2431         VLOG_WARN_RL(&table_id_rl, "%s: flow_mod table_id feature must be "
2432                      "enabled with NXT_FLOW_MOD_TABLE_ID",
2433                      rconn_get_name(ofconn->rconn));
2434         return ofp_mkerr(OFPET_FLOW_MOD_FAILED, OFPFMFC_BAD_COMMAND);
2435     }
2436
2437     switch (ntohs(ofm->command) & 0xff) {
2438     case OFPFC_ADD:
2439         return add_flow(p, ofconn, ofm, n_actions);
2440
2441     case OFPFC_MODIFY:
2442         return modify_flows_loose(p, ofconn, ofm, n_actions);
2443
2444     case OFPFC_MODIFY_STRICT:
2445         return modify_flow_strict(p, ofconn, ofm, n_actions);
2446
2447     case OFPFC_DELETE:
2448         return delete_flows_loose(p, ofconn, ofm);
2449
2450     case OFPFC_DELETE_STRICT:
2451         return delete_flow_strict(p, ofconn, ofm);
2452
2453     default:
2454         return ofp_mkerr(OFPET_FLOW_MOD_FAILED, OFPFMFC_BAD_COMMAND);
2455     }
2456 }
2457
2458 static int
2459 handle_tun_id_from_cookie(struct ofproto *p, struct nxt_tun_id_cookie *msg)
2460 {
2461     int error;
2462
2463     error = check_ofp_message(&msg->header, OFPT_VENDOR, sizeof *msg);
2464     if (error) {
2465         return error;
2466     }
2467
2468     p->tun_id_from_cookie = !!msg->set;
2469     return 0;
2470 }
2471
2472 static int
2473 handle_flow_mod_table_id(struct ofconn *ofconn,
2474                          struct nxt_flow_mod_table_id *msg)
2475 {
2476     int error;
2477
2478     error = check_ofp_message(&msg->header, OFPT_VENDOR, sizeof *msg);
2479     if (error) {
2480         return error;
2481     }
2482
2483     ofconn->flow_mod_table_id = !!msg->set;
2484     return 0;
2485 }
2486
2487 static int
2488 handle_role_request(struct ofproto *ofproto,
2489                     struct ofconn *ofconn, struct nicira_header *msg)
2490 {
2491     struct nx_role_request *nrr;
2492     struct nx_role_request *reply;
2493     struct ofpbuf *buf;
2494     uint32_t role;
2495
2496     if (ntohs(msg->header.length) != sizeof *nrr) {
2497         VLOG_WARN_RL(&rl, "received role request of length %u (expected %zu)",
2498                      ntohs(msg->header.length), sizeof *nrr);
2499         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
2500     }
2501     nrr = (struct nx_role_request *) msg;
2502
2503     if (ofconn->type != OFCONN_PRIMARY) {
2504         VLOG_WARN_RL(&rl, "ignoring role request on non-controller "
2505                      "connection");
2506         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_EPERM);
2507     }
2508
2509     role = ntohl(nrr->role);
2510     if (role != NX_ROLE_OTHER && role != NX_ROLE_MASTER
2511         && role != NX_ROLE_SLAVE) {
2512         VLOG_WARN_RL(&rl, "received request for unknown role %"PRIu32, role);
2513
2514         /* There's no good error code for this. */
2515         return ofp_mkerr(OFPET_BAD_REQUEST, -1);
2516     }
2517
2518     if (role == NX_ROLE_MASTER) {
2519         struct ofconn *other;
2520
2521         HMAP_FOR_EACH (other, hmap_node, &ofproto->controllers) {
2522             if (other->role == NX_ROLE_MASTER) {
2523                 other->role = NX_ROLE_SLAVE;
2524             }
2525         }
2526     }
2527     ofconn->role = role;
2528
2529     reply = make_openflow_xid(sizeof *reply, OFPT_VENDOR, msg->header.xid,
2530                               &buf);
2531     reply->nxh.vendor = htonl(NX_VENDOR_ID);
2532     reply->nxh.subtype = htonl(NXT_ROLE_REPLY);
2533     reply->role = htonl(role);
2534     queue_tx(buf, ofconn, ofconn->reply_counter);
2535
2536     return 0;
2537 }
2538
2539 static int
2540 handle_vendor(struct ofproto *p, struct ofconn *ofconn, void *msg)
2541 {
2542     struct ofp_vendor_header *ovh = msg;
2543     struct nicira_header *nh;
2544
2545     if (ntohs(ovh->header.length) < sizeof(struct ofp_vendor_header)) {
2546         VLOG_WARN_RL(&rl, "received vendor message of length %u "
2547                           "(expected at least %zu)",
2548                    ntohs(ovh->header.length), sizeof(struct ofp_vendor_header));
2549         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
2550     }
2551     if (ovh->vendor != htonl(NX_VENDOR_ID)) {
2552         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_VENDOR);
2553     }
2554     if (ntohs(ovh->header.length) < sizeof(struct nicira_header)) {
2555         VLOG_WARN_RL(&rl, "received Nicira vendor message of length %u "
2556                           "(expected at least %zu)",
2557                      ntohs(ovh->header.length), sizeof(struct nicira_header));
2558         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
2559     }
2560
2561     nh = msg;
2562     switch (ntohl(nh->subtype)) {
2563     case NXT_STATUS_REQUEST:
2564         return switch_status_handle_request(p->switch_status, ofconn->rconn,
2565                                             msg);
2566
2567     case NXT_TUN_ID_FROM_COOKIE:
2568         return handle_tun_id_from_cookie(p, msg);
2569
2570     case NXT_FLOW_MOD_TABLE_ID:
2571         return handle_flow_mod_table_id(ofconn, msg);
2572
2573     case NXT_ROLE_REQUEST:
2574         return handle_role_request(p, ofconn, msg);
2575     }
2576
2577     return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_SUBTYPE);
2578 }
2579
2580 static int
2581 handle_barrier_request(struct ofconn *ofconn, struct ofp_header *oh)
2582 {
2583     struct ofp_header *ob;
2584     struct ofpbuf *buf;
2585
2586     /* Currently, everything executes synchronously, so we can just
2587      * immediately send the barrier reply. */
2588     ob = make_openflow_xid(sizeof *ob, OFPT_BARRIER_REPLY, oh->xid, &buf);
2589     queue_tx(buf, ofconn, ofconn->reply_counter);
2590     return 0;
2591 }
2592
2593 static void
2594 handle_openflow(struct ofconn *ofconn, struct ofproto *p,
2595                 struct ofpbuf *ofp_msg)
2596 {
2597     struct ofp_header *oh = ofp_msg->data;
2598     int error;
2599
2600     COVERAGE_INC(ofproto_recv_openflow);
2601     switch (oh->type) {
2602     case OFPT_ECHO_REQUEST:
2603         error = handle_echo_request(ofconn, oh);
2604         break;
2605
2606     case OFPT_ECHO_REPLY:
2607         error = 0;
2608         break;
2609
2610     case OFPT_FEATURES_REQUEST:
2611         error = handle_features_request(p, ofconn, oh);
2612         break;
2613
2614     case OFPT_GET_CONFIG_REQUEST:
2615         error = handle_get_config_request(p, ofconn, oh);
2616         break;
2617
2618     case OFPT_SET_CONFIG:
2619         error = handle_set_config(p, ofconn, ofp_msg->data);
2620         break;
2621
2622     case OFPT_PACKET_OUT:
2623         error = handle_packet_out(p, ofconn, ofp_msg->data);
2624         break;
2625
2626     case OFPT_PORT_MOD:
2627         error = handle_port_mod(p, ofconn, oh);
2628         break;
2629
2630     case OFPT_FLOW_MOD:
2631         error = handle_flow_mod(p, ofconn, ofp_msg->data);
2632         break;
2633
2634     case OFPT_STATS_REQUEST:
2635         error = handle_stats_request(p, ofconn, oh);
2636         break;
2637
2638     case OFPT_VENDOR:
2639         error = handle_vendor(p, ofconn, ofp_msg->data);
2640         break;
2641
2642     case OFPT_BARRIER_REQUEST:
2643         error = handle_barrier_request(ofconn, oh);
2644         break;
2645
2646     default:
2647         if (VLOG_IS_WARN_ENABLED()) {
2648             char *s = ofp_to_string(oh, ntohs(oh->length), 2);
2649             VLOG_DBG_RL(&rl, "OpenFlow message ignored: %s", s);
2650             free(s);
2651         }
2652         error = ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_TYPE);
2653         break;
2654     }
2655
2656     if (error) {
2657         send_error_oh(ofconn, ofp_msg->data, error);
2658     }
2659 }
2660 \f
2661 static void
2662 handle_flow_miss(struct ofproto *p, struct wdp_packet *packet)
2663 {
2664     struct wdp_rule *rule;
2665     flow_t flow;
2666
2667     flow_extract(packet->payload, packet->tun_id, packet->in_port, &flow);
2668     rule = wdp_flow_match(p->wdp, &flow);
2669     if (!rule) {
2670         /* Don't send a packet-in if OFPPC_NO_PACKET_IN asserted. */
2671         struct wdp_port port;
2672
2673         if (!wdp_port_query_by_number(p->wdp, packet->in_port, &port)) {
2674             bool no_packet_in = (port.opp.config & OFPPC_NO_PACKET_IN) != 0;
2675             wdp_port_free(&port);
2676             if (no_packet_in) {
2677                 COVERAGE_INC(ofproto_no_packet_in);
2678                 wdp_packet_destroy(packet);
2679                 return;
2680             }
2681         } else {
2682             VLOG_WARN_RL(&rl, "packet-in on unknown port %"PRIu16,
2683                          packet->in_port);
2684         }
2685
2686         COVERAGE_INC(ofproto_packet_in);
2687         send_packet_in(p, packet);
2688         return;
2689     }
2690
2691     wdp_flow_inject(p->wdp, rule, packet->in_port, packet->payload);
2692
2693     if (rule->cr.flow.priority == FAIL_OPEN_PRIORITY) {
2694         /*
2695          * Extra-special case for fail-open mode.
2696          *
2697          * We are in fail-open mode and the packet matched the fail-open rule,
2698          * but we are connected to a controller too.  We should send the packet
2699          * up to the controller in the hope that it will try to set up a flow
2700          * and thereby allow us to exit fail-open.
2701          *
2702          * See the top-level comment in fail-open.c for more information.
2703          */
2704         send_packet_in(p, packet);
2705     } else {
2706         wdp_packet_destroy(packet);
2707     }
2708 }
2709
2710 static void
2711 handle_wdp_packet(struct ofproto *p, struct wdp_packet *packet)
2712 {
2713     switch (packet->channel) {
2714     case WDP_CHAN_ACTION:
2715         COVERAGE_INC(ofproto_ctlr_action);
2716         send_packet_in(p, packet);
2717         break;
2718
2719     case WDP_CHAN_SFLOW:
2720         /* XXX */
2721         wdp_packet_destroy(packet);
2722         break;
2723
2724     case WDP_CHAN_MISS:
2725         handle_flow_miss(p, packet);
2726         break;
2727
2728     case WDP_N_CHANS:
2729     default:
2730         wdp_packet_destroy(packet);
2731         VLOG_WARN_RL(&rl, "received message on unexpected channel %d",
2732                      (int) packet->channel);
2733         break;
2734     }
2735 }
2736 \f
2737 static struct ofpbuf *
2738 compose_flow_removed(struct ofproto *p, const struct wdp_rule *rule,
2739                      uint8_t reason)
2740 {
2741     long long int tdiff = time_msec() - rule->created;
2742     uint32_t sec = tdiff / 1000;
2743     uint32_t msec = tdiff - (sec * 1000);
2744     struct ofp_flow_removed *ofr;
2745     struct ofpbuf *buf;
2746
2747     ofr = make_openflow(sizeof *ofr, OFPT_FLOW_REMOVED, &buf);
2748     flow_to_match(&rule->cr.flow, p->tun_id_from_cookie, &ofr->match);
2749     ofr->cookie = ofproto_rule_cast(rule)->flow_cookie;
2750     ofr->priority = htons(rule->cr.flow.priority);
2751     ofr->reason = reason;
2752     ofr->duration_sec = htonl(sec);
2753     ofr->duration_nsec = htonl(msec * 1000000);
2754     ofr->idle_timeout = htons(rule->idle_timeout);
2755
2756     return buf;
2757 }
2758
2759 static int
2760 delete_flow(struct ofproto *p, struct wdp_rule *rule, uint8_t reason)
2761 {
2762     /* We limit the maximum number of queued flow expirations it by accounting
2763      * them under the counter for replies.  That works because preventing
2764      * OpenFlow requests from being processed also prevents new flows from
2765      * being added (and expiring).  (It also prevents processing OpenFlow
2766      * requests that would not add new flows, so it is imperfect.) */
2767
2768     struct ofproto_rule *ofproto_rule = ofproto_rule_cast(rule);
2769     struct wdp_flow_stats stats;
2770     struct ofpbuf *buf;
2771     int error;
2772
2773     if (ofproto_rule->send_flow_removed) {
2774         /* Compose most of the ofp_flow_removed before 'rule' is destroyed. */
2775         buf = compose_flow_removed(p, rule, reason);
2776     } else {
2777         buf = NULL;
2778     }
2779
2780     error = wdp_flow_delete(p->wdp, rule, &stats);
2781     if (error) {
2782         return error;
2783     }
2784
2785     if (buf) {
2786         struct ofp_flow_removed *ofr;
2787         struct ofconn *prev = NULL;
2788         struct ofconn *ofconn;
2789
2790         /* Compose the parts of the ofp_flow_removed that require stats. */
2791         ofr = buf->data;
2792         ofr->packet_count = htonll(stats.n_packets);
2793         ofr->byte_count = htonll(stats.n_bytes);
2794
2795         LIST_FOR_EACH (ofconn, node, &p->all_conns) {
2796             if (rconn_is_connected(ofconn->rconn)) {
2797                 if (prev) {
2798                     queue_tx(ofpbuf_clone(buf), prev, prev->reply_counter);
2799                 }
2800                 prev = ofconn;
2801             }
2802         }
2803         if (prev) {
2804             queue_tx(buf, prev, prev->reply_counter);
2805         } else {
2806             ofpbuf_delete(buf);
2807         }
2808     }
2809     free(ofproto_rule);
2810
2811     return 0;
2812 }
2813
2814 /* pinsched callback for sending 'packet' on 'ofconn'. */
2815 static void
2816 do_send_packet_in(struct wdp_packet *packet, void *ofconn_)
2817 {
2818     struct ofconn *ofconn = ofconn_;
2819
2820     rconn_send_with_limit(ofconn->rconn, packet->payload,
2821                           ofconn->packet_in_counter, 100);
2822     packet->payload = NULL;
2823     wdp_packet_destroy(packet);
2824 }
2825
2826 /* Takes 'packet', which has been converted with do_convert_to_packet_in(), and
2827  * finalizes its content for sending on 'ofconn', and passes it to 'ofconn''s
2828  * packet scheduler for sending.
2829  *
2830  * 'max_len' specifies the maximum number of bytes of the packet to send on
2831  * 'ofconn' (INT_MAX specifies no limit).
2832  *
2833  * If 'clone' is true, the caller retains ownership of 'packet'.  Otherwise,
2834  * ownership is transferred to this function. */
2835 static void
2836 schedule_packet_in(struct ofconn *ofconn, struct wdp_packet *packet,
2837                    int max_len, bool clone)
2838 {
2839     struct ofproto *ofproto = ofconn->ofproto;
2840     struct ofp_packet_in *opi = packet->payload->data;
2841     int send_len, trim_size;
2842     uint32_t buffer_id;
2843
2844     /* Get buffer. */
2845     if (opi->reason == OFPR_ACTION) {
2846         buffer_id = UINT32_MAX;
2847     } else if (ofproto->fail_open && fail_open_is_active(ofproto->fail_open)) {
2848         buffer_id = pktbuf_get_null();
2849     } else if (!ofconn->pktbuf) {
2850         buffer_id = UINT32_MAX;
2851     } else {
2852         struct ofpbuf payload;
2853         payload.data = opi->data;
2854         payload.size = (packet->payload->size
2855                         - offsetof(struct ofp_packet_in, data));
2856         buffer_id = pktbuf_save(ofconn->pktbuf, &payload, packet->in_port);
2857     }
2858
2859     /* Figure out how much of the packet to send. */
2860     send_len = ntohs(opi->total_len);
2861     if (buffer_id != UINT32_MAX) {
2862         send_len = MIN(send_len, ofconn->miss_send_len);
2863     }
2864     send_len = MIN(send_len, max_len);
2865
2866     /* Adjust packet length and clone if necessary. */
2867     trim_size = offsetof(struct ofp_packet_in, data) + send_len;
2868     if (clone) {
2869         packet = wdp_packet_clone(packet, trim_size);
2870         opi = packet->payload->data;
2871     } else {
2872         packet->payload->size = trim_size;
2873     }
2874
2875     /* Update packet headers. */
2876     opi->buffer_id = htonl(buffer_id);
2877     update_openflow_length(packet->payload);
2878
2879     /* Hand over to packet scheduler.  It might immediately call into
2880      * do_send_packet_in() or it might buffer it for a while (until a later
2881      * call to pinsched_run()). */
2882     pinsched_send(ofconn->schedulers[opi->reason], packet->in_port,
2883                   packet, do_send_packet_in, ofconn);
2884 }
2885
2886 /* Converts 'packet->payload' to a struct ofp_packet_in.  It must have
2887  * sufficient headroom to do so (e.g. as returned by xfif_recv()).
2888  *
2889  * The conversion is not complete: the caller still needs to trim any unneeded
2890  * payload off the end of the buffer, set the length in the OpenFlow header,
2891  * and set buffer_id.  Those require us to know the controller settings and so
2892  * must be done on a per-controller basis.
2893  *
2894  * Returns the maximum number of bytes of the packet that should be sent to
2895  * the controller (INT_MAX if no limit). */
2896 static int
2897 do_convert_to_packet_in(struct wdp_packet *packet)
2898 {
2899     uint16_t total_len = packet->payload->size;
2900     struct ofp_packet_in *opi;
2901
2902     /* Repurpose packet buffer by overwriting header. */
2903     opi = ofpbuf_push_zeros(packet->payload,
2904                             offsetof(struct ofp_packet_in, data));
2905     opi->header.version = OFP_VERSION;
2906     opi->header.type = OFPT_PACKET_IN;
2907     opi->total_len = htons(total_len);
2908     opi->in_port = htons(packet->in_port);
2909     if (packet->channel == WDP_CHAN_MISS) {
2910         opi->reason = OFPR_NO_MATCH;
2911         return INT_MAX;
2912     } else {
2913         opi->reason = OFPR_ACTION;
2914         return packet->send_len;
2915     }
2916 }
2917
2918 /* Given 'packet' with channel WDP_CHAN_ACTION or WDP_CHAN_MISS, sends an
2919  * OFPT_PACKET_IN message to each OpenFlow controller as necessary according to
2920  * their individual configurations.
2921  *
2922  * 'packet->payload' must have sufficient headroom to convert it into a struct
2923  * ofp_packet_in (e.g. as returned by dpif_recv()).
2924  *
2925  * Takes ownership of 'packet'. */
2926 static void
2927 send_packet_in(struct ofproto *ofproto, struct wdp_packet *packet)
2928 {
2929     struct ofconn *ofconn, *prev;
2930     int max_len;
2931
2932     max_len = do_convert_to_packet_in(packet);
2933
2934     prev = NULL;
2935     LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
2936         if (ofconn_receives_async_msgs(ofconn)) {
2937             if (prev) {
2938                 schedule_packet_in(prev, packet, max_len, true);
2939             }
2940             prev = ofconn;
2941         }
2942     }
2943     if (prev) {
2944         schedule_packet_in(prev, packet, max_len, false);
2945     } else {
2946         wdp_packet_destroy(packet);
2947     }
2948 }
2949
2950 static uint64_t
2951 pick_datapath_id(const struct ofproto *ofproto)
2952 {
2953     struct wdp_port port;
2954
2955     if (!wdp_port_query_by_number(ofproto->wdp, OFPP_LOCAL, &port)) {
2956         uint8_t ea[ETH_ADDR_LEN];
2957         int error;
2958
2959         error = netdev_get_etheraddr(port.netdev, ea);
2960         if (!error) {
2961             wdp_port_free(&port);
2962             return eth_addr_to_uint64(ea);
2963         }
2964         VLOG_WARN("could not get MAC address for %s (%s)",
2965                   netdev_get_name(port.netdev), strerror(error));
2966         wdp_port_free(&port);
2967     }
2968
2969     return ofproto->fallback_dpid;
2970 }
2971
2972 static uint64_t
2973 pick_fallback_dpid(void)
2974 {
2975     uint8_t ea[ETH_ADDR_LEN];
2976     eth_addr_nicira_random(ea);
2977     return eth_addr_to_uint64(ea);
2978 }