Merge "master" into "wdp".
[sliver-openvswitch.git] / ofproto / ofproto.c
1 /*
2  * Copyright (c) 2009, 2010 Nicira Networks.
3  * Copyright (c) 2010 Jean Tourrilhes - HP-Labs.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <config.h>
19 #include "ofproto.h"
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <sys/socket.h>
23 #include <net/if.h>
24 #include <netinet/in.h>
25 #include <stdbool.h>
26 #include <stdlib.h>
27 #include "classifier.h"
28 #include "coverage.h"
29 #include "discovery.h"
30 #include "dynamic-string.h"
31 #include "fail-open.h"
32 #include "hash.h"
33 #include "hmap.h"
34 #include "in-band.h"
35 #include "mac-learning.h"
36 #include "netdev.h"
37 #include "netflow.h"
38 #include "ofp-print.h"
39 #include "ofp-util.h"
40 #include "ofproto-sflow.h"
41 #include "ofpbuf.h"
42 #include "openflow/nicira-ext.h"
43 #include "openflow/openflow.h"
44 #include "openvswitch/xflow.h"
45 #include "packets.h"
46 #include "pinsched.h"
47 #include "pktbuf.h"
48 #include "poll-loop.h"
49 #include "rconn.h"
50 #include "shash.h"
51 #include "status.h"
52 #include "stream-ssl.h"
53 #include "svec.h"
54 #include "tag.h"
55 #include "timeval.h"
56 #include "unixctl.h"
57 #include "vconn.h"
58 #include "vlog.h"
59 #include "wdp.h"
60 #include "xfif.h"
61 #include "xtoxll.h"
62
63 VLOG_DEFINE_THIS_MODULE(ofproto)
64
65 #include "sflow_api.h"
66
67 struct ofproto_rule {
68     uint64_t flow_cookie;       /* Controller-issued identifier.
69                                    (Kept in network-byte order.) */
70     bool send_flow_removed;     /* Send a flow removed message? */
71     tag_type tags;              /* Tags (set only by hooks). */
72 };
73
74 static struct ofproto_rule *
75 ofproto_rule_cast(const struct wdp_rule *wdp_rule)
76 {
77     return wdp_rule->client_data;
78 }
79
80 static void
81 ofproto_rule_init(struct wdp_rule *wdp_rule)
82 {
83     wdp_rule->client_data = xzalloc(sizeof(struct ofproto_rule));
84 }
85
86 static inline bool
87 rule_is_hidden(const struct wdp_rule *rule)
88 {
89     /* Rules with priority higher than UINT16_MAX are set up by ofproto itself
90      * (e.g. by in-band control) and are intentionally hidden from the
91      * controller. */
92     if (rule->cr.flow.priority > UINT16_MAX) {
93         return true;
94     }
95
96     return false;
97 }
98
99 static int delete_flow(struct ofproto *, struct wdp_rule *, uint8_t reason);
100
101 /* ofproto supports two kinds of OpenFlow connections:
102  *
103  *   - "Primary" connections to ordinary OpenFlow controllers.  ofproto
104  *     maintains persistent connections to these controllers and by default
105  *     sends them asynchronous messages such as packet-ins.
106  *
107  *   - "Service" connections, e.g. from ovs-ofctl.  When these connections
108  *     drop, it is the other side's responsibility to reconnect them if
109  *     necessary.  ofproto does not send them asynchronous messages by default.
110  *
111  * Currently, active (tcp, ssl, unix) connections are always "primary"
112  * connections and passive (ptcp, pssl, punix) connections are always "service"
113  * connections.  There is no inherent reason for this, but it reflects the
114  * common case.
115  */
116 enum ofconn_type {
117     OFCONN_PRIMARY,             /* An ordinary OpenFlow controller. */
118     OFCONN_SERVICE              /* A service connection, e.g. "ovs-ofctl". */
119 };
120
121 /* A listener for incoming OpenFlow "service" connections. */
122 struct ofservice {
123     struct hmap_node node;      /* In struct ofproto's "services" hmap. */
124     struct pvconn *pvconn;      /* OpenFlow connection listener. */
125
126     /* These are not used by ofservice directly.  They are settings for
127      * accepted "struct ofconn"s from the pvconn. */
128     int probe_interval;         /* Max idle time before probing, in seconds. */
129     int rate_limit;             /* Max packet-in rate in packets per second. */
130     int burst_limit;            /* Limit on accumulating packet credits. */
131 };
132
133 static struct ofservice *ofservice_lookup(struct ofproto *,
134                                           const char *target);
135 static int ofservice_create(struct ofproto *,
136                             const struct ofproto_controller *);
137 static void ofservice_reconfigure(struct ofservice *,
138                                   const struct ofproto_controller *);
139 static void ofservice_destroy(struct ofproto *, struct ofservice *);
140
141 /* An OpenFlow connection. */
142 struct ofconn {
143     struct ofproto *ofproto;    /* The ofproto that owns this connection. */
144     struct list node;           /* In struct ofproto's "all_conns" list. */
145     struct rconn *rconn;        /* OpenFlow connection. */
146     enum ofconn_type type;      /* Type. */
147     bool flow_mod_table_id;     /* NXT_FLOW_MOD_TABLE_ID enabled? */
148
149     /* OFPT_PACKET_IN related data. */
150     struct rconn_packet_counter *packet_in_counter; /* # queued on 'rconn'. */
151     struct pinsched *schedulers[2]; /* Indexed by reason code; see below. */
152     struct pktbuf *pktbuf;         /* OpenFlow packet buffers. */
153     int miss_send_len;             /* Bytes to send of buffered packets. */
154
155     /* Number of OpenFlow messages queued on 'rconn' as replies to OpenFlow
156      * requests, and the maximum number before we stop reading OpenFlow
157      * requests.  */
158 #define OFCONN_REPLY_MAX 100
159     struct rconn_packet_counter *reply_counter;
160
161     /* type == OFCONN_PRIMARY only. */
162     enum nx_role role;           /* Role. */
163     struct hmap_node hmap_node;  /* In struct ofproto's "controllers" map. */
164     struct discovery *discovery; /* Controller discovery object, if enabled. */
165     struct status_category *ss;  /* Switch status category. */
166     enum ofproto_band band;      /* In-band or out-of-band? */
167 };
168
169 /* We use OFPR_NO_MATCH and OFPR_ACTION as indexes into struct ofconn's
170  * "schedulers" array.  Their values are 0 and 1, and their meanings and values
171  * coincide with WDP_CHAN_MISS and WDP_CHAN_ACTION, so this is convenient.  In
172  * case anything ever changes, check their values here.  */
173 #define N_SCHEDULERS 2
174 BUILD_ASSERT_DECL(OFPR_NO_MATCH == 0);
175 BUILD_ASSERT_DECL(OFPR_NO_MATCH == WDP_CHAN_MISS);
176 BUILD_ASSERT_DECL(OFPR_ACTION == 1);
177 BUILD_ASSERT_DECL(OFPR_ACTION == WDP_CHAN_ACTION);
178
179 static struct ofconn *ofconn_create(struct ofproto *, struct rconn *,
180                                     enum ofconn_type);
181 static void ofconn_destroy(struct ofconn *);
182 static void ofconn_run(struct ofconn *, struct ofproto *);
183 static void ofconn_wait(struct ofconn *);
184 static bool ofconn_receives_async_msgs(const struct ofconn *);
185 static char *ofconn_make_name(const struct ofproto *, const char *target);
186 static void ofconn_set_rate_limit(struct ofconn *, int rate, int burst);
187
188 static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
189                      struct rconn_packet_counter *counter);
190
191 static void send_packet_in(struct ofproto *, struct wdp_packet *);
192 static void do_send_packet_in(struct wdp_packet *, void *ofconn);
193
194 struct ofproto {
195     /* Settings. */
196     uint64_t datapath_id;       /* Datapath ID. */
197     uint64_t fallback_dpid;     /* Datapath ID if no better choice found. */
198     char *mfr_desc;             /* Manufacturer. */
199     char *hw_desc;              /* Hardware. */
200     char *sw_desc;              /* Software version. */
201     char *serial_desc;          /* Serial number. */
202     char *dp_desc;              /* Datapath description. */
203
204     /* Datapath. */
205     struct wdp *wdp;
206     uint32_t max_ports;
207
208     /* Configuration. */
209     struct switch_status *switch_status;
210     struct fail_open *fail_open;
211     struct netflow *netflow;
212     struct ofproto_sflow *sflow;
213     bool tun_id_from_cookie;    /* NXT_TUN_ID_FROM_COOKIE enabled? */
214
215     /* In-band control. */
216     struct in_band *in_band;
217     long long int next_in_band_update;
218     struct sockaddr_in *extra_in_band_remotes;
219     size_t n_extra_remotes;
220
221     /* OpenFlow connections. */
222     struct hmap controllers;   /* Controller "struct ofconn"s. */
223     struct list all_conns;     /* Contains "struct ofconn"s. */
224     enum ofproto_fail_mode fail_mode;
225
226     /* OpenFlow listeners. */
227     struct hmap services;       /* Contains "struct ofservice"s. */
228     struct pvconn **snoops;
229     size_t n_snoops;
230 };
231
232 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
233
234 static uint64_t pick_datapath_id(const struct ofproto *);
235 static uint64_t pick_fallback_dpid(void);
236
237 static void handle_wdp_packet(struct ofproto *, struct wdp_packet *);
238
239 static void handle_openflow(struct ofconn *, struct ofproto *,
240                             struct ofpbuf *);
241
242 int
243 ofproto_create(const char *datapath, const char *datapath_type,
244                const struct ofhooks *ofhooks, void *aux,
245                struct ofproto **ofprotop)
246 {
247     struct wdp_stats stats;
248     struct ofproto *p;
249     struct wdp *wdp;
250     int error;
251
252     *ofprotop = NULL;
253
254     /* Connect to datapath and start listening for messages. */
255     error = wdp_open(datapath, datapath_type, &wdp);
256     if (error) {
257         VLOG_ERR("failed to open datapath %s: %s", datapath, strerror(error));
258         return error;
259     }
260     error = wdp_get_wdp_stats(wdp, &stats);
261     if (error) {
262         VLOG_ERR("failed to obtain stats for datapath %s: %s",
263                  datapath, strerror(error));
264         wdp_close(wdp);
265         return error;
266     }
267     error = wdp_recv_set_mask(wdp, ((1 << WDP_CHAN_MISS)
268                                     | (1 << WDP_CHAN_ACTION)
269                                     | (1 << WDP_CHAN_SFLOW)));
270     if (error) {
271         VLOG_ERR("failed to listen on datapath %s: %s",
272                  datapath, strerror(error));
273         wdp_close(wdp);
274         return error;
275     }
276     wdp_flow_flush(wdp);
277     wdp_recv_purge(wdp);
278     wdp_set_ofhooks(wdp, ofhooks, aux);
279
280     /* Initialize settings. */
281     p = xzalloc(sizeof *p);
282     p->fallback_dpid = pick_fallback_dpid();
283     p->datapath_id = p->fallback_dpid;
284     p->mfr_desc = xstrdup(DEFAULT_MFR_DESC);
285     p->hw_desc = xstrdup(DEFAULT_HW_DESC);
286     p->sw_desc = xstrdup(DEFAULT_SW_DESC);
287     p->serial_desc = xstrdup(DEFAULT_SERIAL_DESC);
288     p->dp_desc = xstrdup(DEFAULT_DP_DESC);
289
290     /* Initialize datapath. */
291     p->wdp = wdp;
292     p->max_ports = stats.max_ports;
293
294     /* Initialize submodules. */
295     p->switch_status = switch_status_create(p);
296     p->in_band = NULL;
297     p->fail_open = NULL;
298     p->netflow = NULL;
299     p->sflow = NULL;
300
301     /* Initialize OpenFlow connections. */
302     list_init(&p->all_conns);
303     hmap_init(&p->controllers);
304     hmap_init(&p->services);
305     p->snoops = NULL;
306     p->n_snoops = 0;
307
308     /* Pick final datapath ID. */
309     p->datapath_id = pick_datapath_id(p);
310     VLOG_INFO("using datapath ID %016"PRIx64, p->datapath_id);
311
312     *ofprotop = p;
313     return 0;
314 }
315
316 void
317 ofproto_set_datapath_id(struct ofproto *p, uint64_t datapath_id)
318 {
319     uint64_t old_dpid = p->datapath_id;
320     p->datapath_id = datapath_id ? datapath_id : pick_datapath_id(p);
321     if (p->datapath_id != old_dpid) {
322         VLOG_INFO("datapath ID changed to %016"PRIx64, p->datapath_id);
323
324         /* Force all active connections to reconnect, since there is no way to
325          * notify a controller that the datapath ID has changed. */
326         ofproto_reconnect_controllers(p);
327     }
328 }
329
330 static bool
331 is_discovery_controller(const struct ofproto_controller *c)
332 {
333     return !strcmp(c->target, "discover");
334 }
335
336 static bool
337 is_in_band_controller(const struct ofproto_controller *c)
338 {
339     return is_discovery_controller(c) || c->band == OFPROTO_IN_BAND;
340 }
341
342 /* Creates a new controller in 'ofproto'.  Some of the settings are initially
343  * drawn from 'c', but update_controller() needs to be called later to finish
344  * the new ofconn's configuration. */
345 static void
346 add_controller(struct ofproto *ofproto, const struct ofproto_controller *c)
347 {
348     struct discovery *discovery;
349     struct ofconn *ofconn;
350
351     if (is_discovery_controller(c)) {
352         int error = discovery_create(c->accept_re, c->update_resolv_conf,
353                                      ofproto->wdp, ofproto->switch_status,
354                                      &discovery);
355         if (error) {
356             return;
357         }
358     } else {
359         discovery = NULL;
360     }
361
362     ofconn = ofconn_create(ofproto, rconn_create(5, 8), OFCONN_PRIMARY);
363     ofconn->pktbuf = pktbuf_create();
364     ofconn->miss_send_len = OFP_DEFAULT_MISS_SEND_LEN;
365     if (discovery) {
366         ofconn->discovery = discovery;
367     } else {
368         char *name = ofconn_make_name(ofproto, c->target);
369         rconn_connect(ofconn->rconn, c->target, name);
370         free(name);
371     }
372     hmap_insert(&ofproto->controllers, &ofconn->hmap_node,
373                 hash_string(c->target, 0));
374 }
375
376 /* Reconfigures 'ofconn' to match 'c'.  This function cannot update an ofconn's
377  * target or turn discovery on or off (these are done by creating new ofconns
378  * and deleting old ones), but it can update the rest of an ofconn's
379  * settings. */
380 static void
381 update_controller(struct ofconn *ofconn, const struct ofproto_controller *c)
382 {
383     int probe_interval;
384
385     ofconn->band = (is_in_band_controller(c)
386                     ? OFPROTO_IN_BAND : OFPROTO_OUT_OF_BAND);
387
388     rconn_set_max_backoff(ofconn->rconn, c->max_backoff);
389
390     probe_interval = c->probe_interval ? MAX(c->probe_interval, 5) : 0;
391     rconn_set_probe_interval(ofconn->rconn, probe_interval);
392
393     if (ofconn->discovery) {
394         discovery_set_update_resolv_conf(ofconn->discovery,
395                                          c->update_resolv_conf);
396         discovery_set_accept_controller_re(ofconn->discovery, c->accept_re);
397     }
398
399     ofconn_set_rate_limit(ofconn, c->rate_limit, c->burst_limit);
400 }
401
402 static const char *
403 ofconn_get_target(const struct ofconn *ofconn)
404 {
405     return ofconn->discovery ? "discover" : rconn_get_target(ofconn->rconn);
406 }
407
408 static struct ofconn *
409 find_controller_by_target(struct ofproto *ofproto, const char *target)
410 {
411     struct ofconn *ofconn;
412
413     HMAP_FOR_EACH_WITH_HASH (ofconn, hmap_node,
414                              hash_string(target, 0), &ofproto->controllers) {
415         if (!strcmp(ofconn_get_target(ofconn), target)) {
416             return ofconn;
417         }
418     }
419     return NULL;
420 }
421
422 static void
423 update_in_band_remotes(struct ofproto *ofproto)
424 {
425     const struct ofconn *ofconn;
426     struct sockaddr_in *addrs;
427     size_t max_addrs, n_addrs;
428     bool discovery;
429     size_t i;
430
431     /* Allocate enough memory for as many remotes as we could possibly have. */
432     max_addrs = ofproto->n_extra_remotes + hmap_count(&ofproto->controllers);
433     addrs = xmalloc(max_addrs * sizeof *addrs);
434     n_addrs = 0;
435
436     /* Add all the remotes. */
437     discovery = false;
438     HMAP_FOR_EACH (ofconn, hmap_node, &ofproto->controllers) {
439         struct sockaddr_in *sin = &addrs[n_addrs];
440
441         if (ofconn->band == OFPROTO_OUT_OF_BAND) {
442             continue;
443         }
444
445         sin->sin_addr.s_addr = rconn_get_remote_ip(ofconn->rconn);
446         if (sin->sin_addr.s_addr) {
447             sin->sin_port = rconn_get_remote_port(ofconn->rconn);
448             n_addrs++;
449         }
450         if (ofconn->discovery) {
451             discovery = true;
452         }
453     }
454     for (i = 0; i < ofproto->n_extra_remotes; i++) {
455         addrs[n_addrs++] = ofproto->extra_in_band_remotes[i];
456     }
457
458     /* Create or update or destroy in-band.
459      *
460      * Ordinarily we only enable in-band if there's at least one remote
461      * address, but discovery needs the in-band rules for DHCP to be installed
462      * even before we know any remote addresses. */
463     if (n_addrs || discovery) {
464         if (!ofproto->in_band) {
465             in_band_create(ofproto, ofproto->wdp, ofproto->switch_status,
466                            &ofproto->in_band);
467         }
468         if (ofproto->in_band) {
469             in_band_set_remotes(ofproto->in_band, addrs, n_addrs);
470         }
471         ofproto->next_in_band_update = time_msec() + 1000;
472     } else {
473         in_band_destroy(ofproto->in_band);
474         ofproto->in_band = NULL;
475     }
476
477     /* Clean up. */
478     free(addrs);
479 }
480
481 static void
482 update_fail_open(struct ofproto *p)
483 {
484     struct ofconn *ofconn;
485
486     if (!hmap_is_empty(&p->controllers)
487             && p->fail_mode == OFPROTO_FAIL_STANDALONE) {
488         struct rconn **rconns;
489         size_t n;
490
491         if (!p->fail_open) {
492             p->fail_open = fail_open_create(p, p->switch_status);
493         }
494
495         n = 0;
496         rconns = xmalloc(hmap_count(&p->controllers) * sizeof *rconns);
497         HMAP_FOR_EACH (ofconn, hmap_node, &p->controllers) {
498             rconns[n++] = ofconn->rconn;
499         }
500
501         fail_open_set_controllers(p->fail_open, rconns, n);
502         /* p->fail_open takes ownership of 'rconns'. */
503     } else {
504         fail_open_destroy(p->fail_open);
505         p->fail_open = NULL;
506     }
507 }
508
509 void
510 ofproto_set_controllers(struct ofproto *p,
511                         const struct ofproto_controller *controllers,
512                         size_t n_controllers)
513 {
514     struct shash new_controllers;
515     struct ofconn *ofconn, *next_ofconn;
516     struct ofservice *ofservice, *next_ofservice;
517     bool ss_exists;
518     size_t i;
519
520     /* Create newly configured controllers and services.
521      * Create a name to ofproto_controller mapping in 'new_controllers'. */
522     shash_init(&new_controllers);
523     for (i = 0; i < n_controllers; i++) {
524         const struct ofproto_controller *c = &controllers[i];
525
526         if (!vconn_verify_name(c->target) || !strcmp(c->target, "discover")) {
527             if (!find_controller_by_target(p, c->target)) {
528                 add_controller(p, c);
529             }
530         } else if (!pvconn_verify_name(c->target)) {
531             if (!ofservice_lookup(p, c->target) && ofservice_create(p, c)) {
532                 continue;
533             }
534         } else {
535             VLOG_WARN_RL(&rl, "%s: unsupported controller \"%s\"",
536                          wdp_name(p->wdp), c->target);
537             continue;
538         }
539
540         shash_add_once(&new_controllers, c->target, &controllers[i]);
541     }
542
543     /* Delete controllers that are no longer configured.
544      * Update configuration of all now-existing controllers. */
545     ss_exists = false;
546     HMAP_FOR_EACH_SAFE (ofconn, next_ofconn, hmap_node, &p->controllers) {
547         struct ofproto_controller *c;
548
549         c = shash_find_data(&new_controllers, ofconn_get_target(ofconn));
550         if (!c) {
551             ofconn_destroy(ofconn);
552         } else {
553             update_controller(ofconn, c);
554             if (ofconn->ss) {
555                 ss_exists = true;
556             }
557         }
558     }
559
560     /* Delete services that are no longer configured.
561      * Update configuration of all now-existing services. */
562     HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &p->services) {
563         struct ofproto_controller *c;
564
565         c = shash_find_data(&new_controllers,
566                             pvconn_get_name(ofservice->pvconn));
567         if (!c) {
568             ofservice_destroy(p, ofservice);
569         } else {
570             ofservice_reconfigure(ofservice, c);
571         }
572     }
573
574     shash_destroy(&new_controllers);
575
576     update_in_band_remotes(p);
577     update_fail_open(p);
578
579     if (!hmap_is_empty(&p->controllers) && !ss_exists) {
580         ofconn = CONTAINER_OF(hmap_first(&p->controllers),
581                               struct ofconn, hmap_node);
582         ofconn->ss = switch_status_register(p->switch_status, "remote",
583                                             rconn_status_cb, ofconn->rconn);
584     }
585 }
586
587 void
588 ofproto_set_fail_mode(struct ofproto *p, enum ofproto_fail_mode fail_mode)
589 {
590     p->fail_mode = fail_mode;
591     update_fail_open(p);
592 }
593
594 /* Drops the connections between 'ofproto' and all of its controllers, forcing
595  * them to reconnect. */
596 void
597 ofproto_reconnect_controllers(struct ofproto *ofproto)
598 {
599     struct ofconn *ofconn;
600
601     LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
602         rconn_reconnect(ofconn->rconn);
603     }
604 }
605
606 static bool
607 any_extras_changed(const struct ofproto *ofproto,
608                    const struct sockaddr_in *extras, size_t n)
609 {
610     size_t i;
611
612     if (n != ofproto->n_extra_remotes) {
613         return true;
614     }
615
616     for (i = 0; i < n; i++) {
617         const struct sockaddr_in *old = &ofproto->extra_in_band_remotes[i];
618         const struct sockaddr_in *new = &extras[i];
619
620         if (old->sin_addr.s_addr != new->sin_addr.s_addr ||
621             old->sin_port != new->sin_port) {
622             return true;
623         }
624     }
625
626     return false;
627 }
628
629 /* Sets the 'n' TCP port addresses in 'extras' as ones to which 'ofproto''s
630  * in-band control should guarantee access, in the same way that in-band
631  * control guarantees access to OpenFlow controllers. */
632 void
633 ofproto_set_extra_in_band_remotes(struct ofproto *ofproto,
634                                   const struct sockaddr_in *extras, size_t n)
635 {
636     if (!any_extras_changed(ofproto, extras, n)) {
637         return;
638     }
639
640     free(ofproto->extra_in_band_remotes);
641     ofproto->n_extra_remotes = n;
642     ofproto->extra_in_band_remotes = xmemdup(extras, n * sizeof *extras);
643
644     update_in_band_remotes(ofproto);
645 }
646
647 void
648 ofproto_set_desc(struct ofproto *p,
649                  const char *mfr_desc, const char *hw_desc,
650                  const char *sw_desc, const char *serial_desc,
651                  const char *dp_desc)
652 {
653     struct ofp_desc_stats *ods;
654
655     if (mfr_desc) {
656         if (strlen(mfr_desc) >= sizeof ods->mfr_desc) {
657             VLOG_WARN("truncating mfr_desc, must be less than %zu characters",
658                     sizeof ods->mfr_desc);
659         }
660         free(p->mfr_desc);
661         p->mfr_desc = xstrdup(mfr_desc);
662     }
663     if (hw_desc) {
664         if (strlen(hw_desc) >= sizeof ods->hw_desc) {
665             VLOG_WARN("truncating hw_desc, must be less than %zu characters",
666                     sizeof ods->hw_desc);
667         }
668         free(p->hw_desc);
669         p->hw_desc = xstrdup(hw_desc);
670     }
671     if (sw_desc) {
672         if (strlen(sw_desc) >= sizeof ods->sw_desc) {
673             VLOG_WARN("truncating sw_desc, must be less than %zu characters",
674                     sizeof ods->sw_desc);
675         }
676         free(p->sw_desc);
677         p->sw_desc = xstrdup(sw_desc);
678     }
679     if (serial_desc) {
680         if (strlen(serial_desc) >= sizeof ods->serial_num) {
681             VLOG_WARN("truncating serial_desc, must be less than %zu "
682                     "characters",
683                     sizeof ods->serial_num);
684         }
685         free(p->serial_desc);
686         p->serial_desc = xstrdup(serial_desc);
687     }
688     if (dp_desc) {
689         if (strlen(dp_desc) >= sizeof ods->dp_desc) {
690             VLOG_WARN("truncating dp_desc, must be less than %zu characters",
691                     sizeof ods->dp_desc);
692         }
693         free(p->dp_desc);
694         p->dp_desc = xstrdup(dp_desc);
695     }
696 }
697
698 static int
699 set_pvconns(struct pvconn ***pvconnsp, size_t *n_pvconnsp,
700             const struct svec *svec)
701 {
702     struct pvconn **pvconns = *pvconnsp;
703     size_t n_pvconns = *n_pvconnsp;
704     int retval = 0;
705     size_t i;
706
707     for (i = 0; i < n_pvconns; i++) {
708         pvconn_close(pvconns[i]);
709     }
710     free(pvconns);
711
712     pvconns = xmalloc(svec->n * sizeof *pvconns);
713     n_pvconns = 0;
714     for (i = 0; i < svec->n; i++) {
715         const char *name = svec->names[i];
716         struct pvconn *pvconn;
717         int error;
718
719         error = pvconn_open(name, &pvconn);
720         if (!error) {
721             pvconns[n_pvconns++] = pvconn;
722         } else {
723             VLOG_ERR("failed to listen on %s: %s", name, strerror(error));
724             if (!retval) {
725                 retval = error;
726             }
727         }
728     }
729
730     *pvconnsp = pvconns;
731     *n_pvconnsp = n_pvconns;
732
733     return retval;
734 }
735
736 int
737 ofproto_set_snoops(struct ofproto *ofproto, const struct svec *snoops)
738 {
739     return set_pvconns(&ofproto->snoops, &ofproto->n_snoops, snoops);
740 }
741
742 int
743 ofproto_set_netflow(struct ofproto *ofproto,
744                     const struct netflow_options *nf_options)
745 {
746     if (nf_options && nf_options->collectors.n) {
747         if (!ofproto->netflow) {
748             ofproto->netflow = netflow_create();
749         }
750         return netflow_set_options(ofproto->netflow, nf_options);
751     } else {
752         netflow_destroy(ofproto->netflow);
753         ofproto->netflow = NULL;
754         return 0;
755     }
756 }
757
758 void
759 ofproto_set_sflow(struct ofproto *ofproto,
760                   const struct ofproto_sflow_options *oso)
761 {
762     struct ofproto_sflow *os = ofproto->sflow;
763     if (oso) {
764         if (!os) {
765             os = ofproto->sflow = ofproto_sflow_create(ofproto->wdp);
766             /* XXX ofport */
767         }
768         ofproto_sflow_set_options(os, oso);
769     } else {
770         ofproto_sflow_destroy(os);
771         ofproto->sflow = NULL;
772     }
773 }
774
775 uint64_t
776 ofproto_get_datapath_id(const struct ofproto *ofproto)
777 {
778     return ofproto->datapath_id;
779 }
780
781 bool
782 ofproto_has_primary_controller(const struct ofproto *ofproto)
783 {
784     return !hmap_is_empty(&ofproto->controllers);
785 }
786
787 enum ofproto_fail_mode
788 ofproto_get_fail_mode(const struct ofproto *p)
789 {
790     return p->fail_mode;
791 }
792
793 void
794 ofproto_get_snoops(const struct ofproto *ofproto, struct svec *snoops)
795 {
796     size_t i;
797
798     for (i = 0; i < ofproto->n_snoops; i++) {
799         svec_add(snoops, pvconn_get_name(ofproto->snoops[i]));
800     }
801 }
802
803 void
804 ofproto_destroy(struct ofproto *p)
805 {
806     struct ofservice *ofservice, *next_ofservice;
807     struct ofconn *ofconn, *next_ofconn;
808     size_t i;
809
810     if (!p) {
811         return;
812     }
813
814     /* Destroy fail-open and in-band early, since they touch the classifier. */
815     fail_open_destroy(p->fail_open);
816     p->fail_open = NULL;
817
818     in_band_destroy(p->in_band);
819     p->in_band = NULL;
820     free(p->extra_in_band_remotes);
821
822     ofproto_flush_flows(p);
823
824     LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &p->all_conns) {
825         ofconn_destroy(ofconn);
826     }
827     hmap_destroy(&p->controllers);
828
829     wdp_close(p->wdp);
830
831     switch_status_destroy(p->switch_status);
832     netflow_destroy(p->netflow);
833     ofproto_sflow_destroy(p->sflow);
834
835     HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &p->services) {
836         ofservice_destroy(p, ofservice);
837     }
838     hmap_destroy(&p->services);
839
840     for (i = 0; i < p->n_snoops; i++) {
841         pvconn_close(p->snoops[i]);
842     }
843     free(p->snoops);
844
845     free(p->mfr_desc);
846     free(p->hw_desc);
847     free(p->sw_desc);
848     free(p->serial_desc);
849     free(p->dp_desc);
850
851     free(p);
852 }
853
854 int
855 ofproto_run(struct ofproto *p)
856 {
857     int error = ofproto_run1(p);
858     if (!error) {
859         error = ofproto_run2(p, false);
860     }
861     return error;
862 }
863
864 /* Returns a "preference level" for snooping 'ofconn'.  A higher return value
865  * means that 'ofconn' is more interesting for monitoring than a lower return
866  * value. */
867 static int
868 snoop_preference(const struct ofconn *ofconn)
869 {
870     switch (ofconn->role) {
871     case NX_ROLE_MASTER:
872         return 3;
873     case NX_ROLE_OTHER:
874         return 2;
875     case NX_ROLE_SLAVE:
876         return 1;
877     default:
878         /* Shouldn't happen. */
879         return 0;
880     }
881 }
882
883 /* One of ofproto's "snoop" pvconns has accepted a new connection on 'vconn'.
884  * Connects this vconn to a controller. */
885 static void
886 add_snooper(struct ofproto *ofproto, struct vconn *vconn)
887 {
888     struct ofconn *ofconn, *best;
889
890     /* Pick a controller for monitoring. */
891     best = NULL;
892     LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
893         if (ofconn->type == OFCONN_PRIMARY
894             && (!best || snoop_preference(ofconn) > snoop_preference(best))) {
895             best = ofconn;
896         }
897     }
898
899     if (best) {
900         rconn_add_monitor(best->rconn, vconn);
901     } else {
902         VLOG_INFO_RL(&rl, "no controller connection to snoop");
903         vconn_close(vconn);
904     }
905 }
906
907 static void
908 ofproto_port_poll_cb(const struct ofp_phy_port *opp, uint8_t reason,
909                      void *ofproto_)
910 {
911     /* XXX Should limit the number of queued port status change messages. */
912     struct ofproto *ofproto = ofproto_;
913     struct ofconn *ofconn;
914
915     LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
916         struct ofp_port_status *ops;
917         struct ofpbuf *b;
918
919         if (!ofconn_receives_async_msgs(ofconn)) {
920             continue;
921         }
922
923         ops = make_openflow_xid(sizeof *ops, OFPT_PORT_STATUS, 0, &b);
924         ops->reason = reason;
925         ops->desc = *opp;
926         hton_ofp_phy_port(&ops->desc);
927         queue_tx(b, ofconn, NULL);
928     }
929 }
930
931 int
932 ofproto_run1(struct ofproto *p)
933 {
934     struct ofconn *ofconn, *next_ofconn;
935     struct ofservice *ofservice;
936     int i;
937
938     for (i = 0; i < 50; i++) {
939         struct wdp_packet packet;
940         int error;
941
942         error = wdp_recv(p->wdp, &packet);
943         if (error) {
944             if (error == ENODEV) {
945                 /* Someone destroyed the datapath behind our back.  The caller
946                  * better destroy us and give up, because we're just going to
947                  * spin from here on out. */
948                 static struct vlog_rate_limit rl2 = VLOG_RATE_LIMIT_INIT(1, 5);
949                 VLOG_ERR_RL(&rl2, "%s: datapath was destroyed externally",
950                             wdp_name(p->wdp));
951                 return ENODEV;
952             }
953             break;
954         }
955
956         handle_wdp_packet(p, xmemdup(&packet, sizeof packet));
957     }
958
959     wdp_port_poll(p->wdp, ofproto_port_poll_cb, p);
960
961     if (p->in_band) {
962         if (time_msec() >= p->next_in_band_update) {
963             update_in_band_remotes(p);
964         }
965         in_band_run(p->in_band);
966     }
967
968     LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &p->all_conns) {
969         ofconn_run(ofconn, p);
970     }
971
972     /* Fail-open maintenance.  Do this after processing the ofconns since
973      * fail-open checks the status of the controller rconn. */
974     if (p->fail_open) {
975         fail_open_run(p->fail_open);
976     }
977
978     HMAP_FOR_EACH (ofservice, node, &p->services) {
979         struct vconn *vconn;
980         int retval;
981
982         retval = pvconn_accept(ofservice->pvconn, OFP_VERSION, &vconn);
983         if (!retval) {
984             struct rconn *rconn;
985             char *name;
986
987             rconn = rconn_create(ofservice->probe_interval, 0);
988             name = ofconn_make_name(p, vconn_get_name(vconn));
989             rconn_connect_unreliably(rconn, vconn, name);
990             free(name);
991
992             ofconn = ofconn_create(p, rconn, OFCONN_SERVICE);
993             ofconn_set_rate_limit(ofconn, ofservice->rate_limit,
994                                   ofservice->burst_limit);
995         } else if (retval != EAGAIN) {
996             VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
997         }
998     }
999
1000     for (i = 0; i < p->n_snoops; i++) {
1001         struct vconn *vconn;
1002         int retval;
1003
1004         retval = pvconn_accept(p->snoops[i], OFP_VERSION, &vconn);
1005         if (!retval) {
1006             add_snooper(p, vconn);
1007         } else if (retval != EAGAIN) {
1008             VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
1009         }
1010     }
1011
1012     if (p->netflow) {
1013         netflow_run(p->netflow);
1014     }
1015     if (p->sflow) {
1016         ofproto_sflow_run(p->sflow);
1017     }
1018
1019     return 0;
1020 }
1021
1022 int
1023 ofproto_run2(struct ofproto *p OVS_UNUSED, bool revalidate_all OVS_UNUSED)
1024 {
1025     return 0;
1026 }
1027
1028 void
1029 ofproto_wait(struct ofproto *p)
1030 {
1031     struct ofservice *ofservice;
1032     struct ofconn *ofconn;
1033     size_t i;
1034
1035     wdp_recv_wait(p->wdp);
1036     wdp_port_poll_wait(p->wdp);
1037     LIST_FOR_EACH (ofconn, node, &p->all_conns) {
1038         ofconn_wait(ofconn);
1039     }
1040     if (p->in_band) {
1041         poll_timer_wait_until(p->next_in_band_update);
1042         in_band_wait(p->in_band);
1043     }
1044     if (p->fail_open) {
1045         fail_open_wait(p->fail_open);
1046     }
1047     if (p->sflow) {
1048         ofproto_sflow_wait(p->sflow);
1049     }
1050     HMAP_FOR_EACH (ofservice, node, &p->services) {
1051         pvconn_wait(ofservice->pvconn);
1052     }
1053     for (i = 0; i < p->n_snoops; i++) {
1054         pvconn_wait(p->snoops[i]);
1055     }
1056 }
1057
1058 void
1059 ofproto_revalidate(struct ofproto *ofproto, tag_type tag)
1060 {
1061     wdp_revalidate(ofproto->wdp, tag);
1062 }
1063
1064 void
1065 ofproto_revalidate_all(struct ofproto *ofproto)
1066 {
1067     wdp_revalidate_all(ofproto->wdp);
1068 }
1069
1070 bool
1071 ofproto_is_alive(const struct ofproto *p)
1072 {
1073     return !hmap_is_empty(&p->controllers);
1074 }
1075
1076 int
1077 ofproto_send_packet(struct ofproto *p, const flow_t *flow,
1078                     const union ofp_action *actions, size_t n_actions,
1079                     const struct ofpbuf *packet)
1080 {
1081     /* XXX Should we translate the wdp_execute() errno value into an OpenFlow
1082      * error code? */
1083     wdp_execute(p->wdp, flow->in_port, actions, n_actions, packet);
1084     return 0;
1085 }
1086
1087 /* Intended for used by ofproto clients and ofproto submodules to add flows to
1088  * the flow table. */
1089 void
1090 ofproto_add_flow(struct ofproto *p, const flow_t *flow,
1091                  const union ofp_action *actions, size_t n_actions,
1092                  int idle_timeout)
1093 {
1094     struct wdp_flow_put put;
1095     struct wdp_rule *rule;
1096
1097     put.flags = WDP_PUT_CREATE | WDP_PUT_MODIFY | WDP_PUT_ALL;
1098     put.flow = flow;
1099     put.actions = actions;
1100     put.n_actions = n_actions;
1101     put.idle_timeout = idle_timeout;
1102     put.hard_timeout = 0;
1103     put.ofp_table_id = 0xff;
1104     put.cookie = htonll(0);
1105     put.xid = htonl(0);
1106
1107     if (!wdp_flow_put(p->wdp, &put, NULL, &rule)) {
1108         ofproto_rule_init(rule);
1109     }
1110 }
1111
1112 /* Intended for used by ofproto clients and ofproto submodules to delete flows
1113  * that they earlier added to the flow table. */
1114 void
1115 ofproto_delete_flow(struct ofproto *ofproto, const flow_t *flow)
1116 {
1117     struct wdp_rule *rule = wdp_flow_get(ofproto->wdp, flow, UINT_MAX);
1118     if (rule) {
1119         delete_flow(ofproto, rule, OFPRR_DELETE);
1120     }
1121 }
1122
1123 void
1124 ofproto_flush_flows(struct ofproto *ofproto)
1125 {
1126     COVERAGE_INC(ofproto_flush);
1127     wdp_flow_flush(ofproto->wdp);
1128     if (ofproto->in_band) {
1129         in_band_flushed(ofproto->in_band);
1130     }
1131     if (ofproto->fail_open) {
1132         fail_open_flushed(ofproto->fail_open);
1133     }
1134 }
1135 \f
1136 static struct ofconn *
1137 ofconn_create(struct ofproto *p, struct rconn *rconn, enum ofconn_type type)
1138 {
1139     struct ofconn *ofconn = xzalloc(sizeof *ofconn);
1140     ofconn->ofproto = p;
1141     list_push_back(&p->all_conns, &ofconn->node);
1142     ofconn->rconn = rconn;
1143     ofconn->type = type;
1144     ofconn->role = NX_ROLE_OTHER;
1145     ofconn->packet_in_counter = rconn_packet_counter_create ();
1146     ofconn->pktbuf = NULL;
1147     ofconn->miss_send_len = 0;
1148     ofconn->reply_counter = rconn_packet_counter_create ();
1149     return ofconn;
1150 }
1151
1152 static void
1153 ofconn_destroy(struct ofconn *ofconn)
1154 {
1155     if (ofconn->type == OFCONN_PRIMARY) {
1156         hmap_remove(&ofconn->ofproto->controllers, &ofconn->hmap_node);
1157     }
1158     discovery_destroy(ofconn->discovery);
1159
1160     list_remove(&ofconn->node);
1161     switch_status_unregister(ofconn->ss);
1162     rconn_destroy(ofconn->rconn);
1163     rconn_packet_counter_destroy(ofconn->packet_in_counter);
1164     rconn_packet_counter_destroy(ofconn->reply_counter);
1165     pktbuf_destroy(ofconn->pktbuf);
1166     free(ofconn);
1167 }
1168
1169 static void
1170 ofconn_run(struct ofconn *ofconn, struct ofproto *p)
1171 {
1172     int iteration;
1173     size_t i;
1174
1175     if (ofconn->discovery) {
1176         char *controller_name;
1177         if (rconn_is_connectivity_questionable(ofconn->rconn)) {
1178             discovery_question_connectivity(ofconn->discovery);
1179         }
1180         if (discovery_run(ofconn->discovery, &controller_name)) {
1181             if (controller_name) {
1182                 char *ofconn_name = ofconn_make_name(p, controller_name);
1183                 rconn_connect(ofconn->rconn, controller_name, ofconn_name);
1184                 free(ofconn_name);
1185             } else {
1186                 rconn_disconnect(ofconn->rconn);
1187             }
1188         }
1189     }
1190
1191     for (i = 0; i < N_SCHEDULERS; i++) {
1192         pinsched_run(ofconn->schedulers[i], do_send_packet_in, ofconn);
1193     }
1194
1195     rconn_run(ofconn->rconn);
1196
1197     if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
1198         /* Limit the number of iterations to prevent other tasks from
1199          * starving. */
1200         for (iteration = 0; iteration < 50; iteration++) {
1201             struct ofpbuf *of_msg = rconn_recv(ofconn->rconn);
1202             if (!of_msg) {
1203                 break;
1204             }
1205             if (p->fail_open) {
1206                 fail_open_maybe_recover(p->fail_open);
1207             }
1208             handle_openflow(ofconn, p, of_msg);
1209             ofpbuf_delete(of_msg);
1210         }
1211     }
1212
1213     if (!ofconn->discovery && !rconn_is_alive(ofconn->rconn)) {
1214         ofconn_destroy(ofconn);
1215     }
1216 }
1217
1218 static void
1219 ofconn_wait(struct ofconn *ofconn)
1220 {
1221     int i;
1222
1223     if (ofconn->discovery) {
1224         discovery_wait(ofconn->discovery);
1225     }
1226     for (i = 0; i < N_SCHEDULERS; i++) {
1227         pinsched_wait(ofconn->schedulers[i]);
1228     }
1229     rconn_run_wait(ofconn->rconn);
1230     if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
1231         rconn_recv_wait(ofconn->rconn);
1232     } else {
1233         COVERAGE_INC(ofproto_ofconn_stuck);
1234     }
1235 }
1236
1237 /* Returns true if 'ofconn' should receive asynchronous messages. */
1238 static bool
1239 ofconn_receives_async_msgs(const struct ofconn *ofconn)
1240 {
1241     if (ofconn->type == OFCONN_PRIMARY) {
1242         /* Primary controllers always get asynchronous messages unless they
1243          * have configured themselves as "slaves".  */
1244         return ofconn->role != NX_ROLE_SLAVE;
1245     } else {
1246         /* Service connections don't get asynchronous messages unless they have
1247          * explicitly asked for them by setting a nonzero miss send length. */
1248         return ofconn->miss_send_len > 0;
1249     }
1250 }
1251
1252 /* Returns a human-readable name for an OpenFlow connection between 'ofproto'
1253  * and 'target', suitable for use in log messages for identifying the
1254  * connection.
1255  *
1256  * The name is dynamically allocated.  The caller should free it (with free())
1257  * when it is no longer needed. */
1258 static char *
1259 ofconn_make_name(const struct ofproto *ofproto, const char *target)
1260 {
1261     return xasprintf("%s<->%s", wdp_base_name(ofproto->wdp), target);
1262 }
1263
1264 static void
1265 ofconn_set_rate_limit(struct ofconn *ofconn, int rate, int burst)
1266 {
1267     int i;
1268
1269     for (i = 0; i < N_SCHEDULERS; i++) {
1270         struct pinsched **s = &ofconn->schedulers[i];
1271
1272         if (rate > 0) {
1273             if (!*s) {
1274                 *s = pinsched_create(rate, burst,
1275                                      ofconn->ofproto->switch_status);
1276             } else {
1277                 pinsched_set_limits(*s, rate, burst);
1278             }
1279         } else {
1280             pinsched_destroy(*s);
1281             *s = NULL;
1282         }
1283     }
1284 }
1285 \f
1286 static void
1287 ofservice_reconfigure(struct ofservice *ofservice,
1288                       const struct ofproto_controller *c)
1289 {
1290     ofservice->probe_interval = c->probe_interval;
1291     ofservice->rate_limit = c->rate_limit;
1292     ofservice->burst_limit = c->burst_limit;
1293 }
1294
1295 /* Creates a new ofservice in 'ofproto'.  Returns 0 if successful, otherwise a
1296  * positive errno value. */
1297 static int
1298 ofservice_create(struct ofproto *ofproto, const struct ofproto_controller *c)
1299 {
1300     struct ofservice *ofservice;
1301     struct pvconn *pvconn;
1302     int error;
1303
1304     error = pvconn_open(c->target, &pvconn);
1305     if (error) {
1306         return error;
1307     }
1308
1309     ofservice = xzalloc(sizeof *ofservice);
1310     hmap_insert(&ofproto->services, &ofservice->node,
1311                 hash_string(c->target, 0));
1312     ofservice->pvconn = pvconn;
1313
1314     ofservice_reconfigure(ofservice, c);
1315
1316     return 0;
1317 }
1318
1319 static void
1320 ofservice_destroy(struct ofproto *ofproto, struct ofservice *ofservice)
1321 {
1322     hmap_remove(&ofproto->services, &ofservice->node);
1323     pvconn_close(ofservice->pvconn);
1324     free(ofservice);
1325 }
1326
1327 /* Finds and returns the ofservice within 'ofproto' that has the given
1328  * 'target', or a null pointer if none exists. */
1329 static struct ofservice *
1330 ofservice_lookup(struct ofproto *ofproto, const char *target)
1331 {
1332     struct ofservice *ofservice;
1333
1334     HMAP_FOR_EACH_WITH_HASH (ofservice, node, hash_string(target, 0),
1335                              &ofproto->services) {
1336         if (!strcmp(pvconn_get_name(ofservice->pvconn), target)) {
1337             return ofservice;
1338         }
1339     }
1340     return NULL;
1341 }
1342 \f
1343 static bool
1344 rule_has_out_port(const struct wdp_rule *rule, uint16_t out_port)
1345 {
1346     const union ofp_action *oa;
1347     struct actions_iterator i;
1348
1349     if (out_port == htons(OFPP_NONE)) {
1350         return true;
1351     }
1352     for (oa = actions_first(&i, rule->actions, rule->n_actions); oa;
1353          oa = actions_next(&i)) {
1354         if (action_outputs_to_port(oa, out_port)) {
1355             return true;
1356         }
1357     }
1358     return false;
1359 }
1360 \f
1361 static void
1362 queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
1363          struct rconn_packet_counter *counter)
1364 {
1365     update_openflow_length(msg);
1366     if (rconn_send(ofconn->rconn, msg, counter)) {
1367         ofpbuf_delete(msg);
1368     }
1369 }
1370
1371 static void
1372 send_error_oh(const struct ofconn *ofconn, const struct ofp_header *oh,
1373               int error)
1374 {
1375     struct ofpbuf *buf = make_ofp_error_msg(error, oh);
1376     if (buf) {
1377         COVERAGE_INC(ofproto_error);
1378         queue_tx(buf, ofconn, ofconn->reply_counter);
1379     }
1380 }
1381
1382 static int
1383 handle_echo_request(struct ofconn *ofconn, struct ofp_header *oh)
1384 {
1385     struct ofp_header *rq = oh;
1386     queue_tx(make_echo_reply(rq), ofconn, ofconn->reply_counter);
1387     return 0;
1388 }
1389
1390 static int
1391 handle_features_request(struct ofproto *p, struct ofconn *ofconn,
1392                         struct ofp_header *oh)
1393 {
1394     struct ofpbuf *features;
1395     int error;
1396
1397     error = wdp_get_features(p->wdp, &features);
1398     if (!error) {
1399         struct ofp_switch_features *osf = features->data;
1400
1401         update_openflow_length(features);
1402         osf->header.version = OFP_VERSION;
1403         osf->header.type = OFPT_FEATURES_REPLY;
1404         osf->header.xid = oh->xid;
1405
1406         osf->datapath_id = htonll(p->datapath_id);
1407         osf->n_buffers = htonl(pktbuf_capacity());
1408         memset(osf->pad, 0, sizeof osf->pad);
1409
1410         /* Turn on capabilities implemented by ofproto. */
1411         osf->capabilities |= htonl(OFPC_FLOW_STATS | OFPC_TABLE_STATS |
1412                                    OFPC_PORT_STATS);
1413
1414         queue_tx(features, ofconn, ofconn->reply_counter);
1415     }
1416     return error;
1417 }
1418
1419 static int
1420 handle_get_config_request(struct ofproto *p, struct ofconn *ofconn,
1421                           struct ofp_header *oh)
1422 {
1423     struct ofpbuf *buf;
1424     struct ofp_switch_config *osc;
1425     uint16_t flags;
1426     bool drop_frags;
1427
1428     /* Figure out flags. */
1429     wdp_get_drop_frags(p->wdp, &drop_frags);
1430     flags = drop_frags ? OFPC_FRAG_DROP : OFPC_FRAG_NORMAL;
1431
1432     /* Send reply. */
1433     osc = make_openflow_xid(sizeof *osc, OFPT_GET_CONFIG_REPLY, oh->xid, &buf);
1434     osc->flags = htons(flags);
1435     osc->miss_send_len = htons(ofconn->miss_send_len);
1436     queue_tx(buf, ofconn, ofconn->reply_counter);
1437
1438     return 0;
1439 }
1440
1441 static int
1442 handle_set_config(struct ofproto *p, struct ofconn *ofconn,
1443                   struct ofp_switch_config *osc)
1444 {
1445     uint16_t flags;
1446     int error;
1447
1448     error = check_ofp_message(&osc->header, OFPT_SET_CONFIG, sizeof *osc);
1449     if (error) {
1450         return error;
1451     }
1452     flags = ntohs(osc->flags);
1453
1454     if (ofconn->type == OFCONN_PRIMARY && ofconn->role != NX_ROLE_SLAVE) {
1455         switch (flags & OFPC_FRAG_MASK) {
1456         case OFPC_FRAG_NORMAL:
1457             wdp_set_drop_frags(p->wdp, false);
1458             break;
1459         case OFPC_FRAG_DROP:
1460             wdp_set_drop_frags(p->wdp, true);
1461             break;
1462         default:
1463             VLOG_WARN_RL(&rl, "requested bad fragment mode (flags=%"PRIx16")",
1464                          osc->flags);
1465             break;
1466         }
1467     }
1468
1469     ofconn->miss_send_len = ntohs(osc->miss_send_len);
1470
1471     return 0;
1472 }
1473
1474 /* Checks whether 'ofconn' is a slave controller.  If so, returns an OpenFlow
1475  * error message code (composed with ofp_mkerr()) for the caller to propagate
1476  * upward.  Otherwise, returns 0.
1477  *
1478  * 'oh' is used to make log messages more informative. */
1479 static int
1480 reject_slave_controller(struct ofconn *ofconn, const struct ofp_header *oh)
1481 {
1482     if (ofconn->type == OFCONN_PRIMARY && ofconn->role == NX_ROLE_SLAVE) {
1483         static struct vlog_rate_limit perm_rl = VLOG_RATE_LIMIT_INIT(1, 5);
1484         char *type_name;
1485
1486         type_name = ofp_message_type_to_string(oh->type);
1487         VLOG_WARN_RL(&perm_rl, "rejecting %s message from slave controller",
1488                      type_name);
1489         free(type_name);
1490
1491         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_EPERM);
1492     } else {
1493         return 0;
1494     }
1495 }
1496
1497 static int
1498 handle_packet_out(struct ofproto *p, struct ofconn *ofconn,
1499                   struct ofp_header *oh)
1500 {
1501     struct ofp_packet_out *opo;
1502     struct ofpbuf payload, *buffer;
1503     struct ofp_action_header *actions;
1504     int n_actions;
1505     uint16_t in_port;
1506     flow_t flow;
1507     int error;
1508
1509     error = reject_slave_controller(ofconn, oh);
1510     if (error) {
1511         return error;
1512     }
1513
1514     error = check_ofp_packet_out(oh, &payload, &n_actions, p->max_ports);
1515     if (error) {
1516         return error;
1517     }
1518     opo = (struct ofp_packet_out *) oh;
1519     actions = opo->actions;
1520
1521     COVERAGE_INC(ofproto_packet_out);
1522     if (opo->buffer_id != htonl(UINT32_MAX)) {
1523         error = pktbuf_retrieve(ofconn->pktbuf, ntohl(opo->buffer_id),
1524                                 &buffer, &in_port);
1525         if (error || !buffer) {
1526             return error;
1527         }
1528         payload = *buffer;
1529     } else {
1530         buffer = NULL;
1531     }
1532
1533     flow_extract(&payload, 0, ntohs(opo->in_port), &flow);
1534     wdp_execute(p->wdp, flow.in_port, (const union ofp_action *) actions,
1535                 n_actions, &payload);
1536     ofpbuf_delete(buffer);
1537
1538     return 0;
1539 }
1540
1541 static int
1542 handle_port_mod(struct ofproto *p, struct ofconn *ofconn,
1543                 struct ofp_header *oh)
1544 {
1545     const struct ofp_port_mod *opm;
1546     struct wdp_port port;
1547     int error;
1548
1549     error = reject_slave_controller(ofconn, oh);
1550     if (error) {
1551         return error;
1552     }
1553     error = check_ofp_message(oh, OFPT_PORT_MOD, sizeof *opm);
1554     if (error) {
1555         return error;
1556     }
1557     opm = (struct ofp_port_mod *) oh;
1558
1559     if (wdp_port_query_by_number(p->wdp, ntohs(opm->port_no), &port)) {
1560         error = ofp_mkerr(OFPET_PORT_MOD_FAILED, OFPPMFC_BAD_PORT);
1561     } else if (memcmp(port.opp.hw_addr, opm->hw_addr, OFP_ETH_ALEN)) {
1562         error = ofp_mkerr(OFPET_PORT_MOD_FAILED, OFPPMFC_BAD_HW_ADDR);
1563     } else {
1564         uint32_t mask, new_config;
1565
1566         mask = ntohl(opm->mask) & (OFPPC_PORT_DOWN | OFPPC_NO_STP
1567                                    | OFPPC_NO_RECV | OFPPC_NO_RECV_STP
1568                                    | OFPPC_NO_FLOOD | OFPPC_NO_FWD
1569                                    | OFPPC_NO_PACKET_IN);
1570         new_config = (port.opp.config & ~mask) | (ntohl(opm->config) & mask);
1571         if (new_config != port.opp.config) {
1572             error = wdp_port_set_config(p->wdp, ntohs(opm->port_no),
1573                                         new_config);
1574         }
1575         if (opm->advertise) {
1576             netdev_set_advertisements(port.netdev, ntohl(opm->advertise));
1577         }
1578     }
1579     wdp_port_free(&port);
1580
1581     return error;
1582 }
1583
1584 static struct ofpbuf *
1585 make_stats_reply(uint32_t xid, uint16_t type, size_t body_len)
1586 {
1587     struct ofp_stats_reply *osr;
1588     struct ofpbuf *msg;
1589
1590     msg = ofpbuf_new(MIN(sizeof *osr + body_len, UINT16_MAX));
1591     osr = put_openflow_xid(sizeof *osr, OFPT_STATS_REPLY, xid, msg);
1592     osr->type = type;
1593     osr->flags = htons(0);
1594     return msg;
1595 }
1596
1597 static struct ofpbuf *
1598 start_stats_reply(const struct ofp_stats_request *request, size_t body_len)
1599 {
1600     return make_stats_reply(request->header.xid, request->type, body_len);
1601 }
1602
1603 static void *
1604 append_stats_reply(size_t nbytes, struct ofconn *ofconn, struct ofpbuf **msgp)
1605 {
1606     struct ofpbuf *msg = *msgp;
1607     assert(nbytes <= UINT16_MAX - sizeof(struct ofp_stats_reply));
1608     if (nbytes + msg->size > UINT16_MAX) {
1609         struct ofp_stats_reply *reply = msg->data;
1610         reply->flags = htons(OFPSF_REPLY_MORE);
1611         *msgp = make_stats_reply(reply->header.xid, reply->type, nbytes);
1612         queue_tx(msg, ofconn, ofconn->reply_counter);
1613     }
1614     return ofpbuf_put_uninit(*msgp, nbytes);
1615 }
1616
1617 static int
1618 handle_desc_stats_request(struct ofproto *p, struct ofconn *ofconn,
1619                            struct ofp_stats_request *request)
1620 {
1621     struct ofp_desc_stats *ods;
1622     struct ofpbuf *msg;
1623
1624     msg = start_stats_reply(request, sizeof *ods);
1625     ods = append_stats_reply(sizeof *ods, ofconn, &msg);
1626     memset(ods, 0, sizeof *ods);
1627     ovs_strlcpy(ods->mfr_desc, p->mfr_desc, sizeof ods->mfr_desc);
1628     ovs_strlcpy(ods->hw_desc, p->hw_desc, sizeof ods->hw_desc);
1629     ovs_strlcpy(ods->sw_desc, p->sw_desc, sizeof ods->sw_desc);
1630     ovs_strlcpy(ods->serial_num, p->serial_desc, sizeof ods->serial_num);
1631     ovs_strlcpy(ods->dp_desc, p->dp_desc, sizeof ods->dp_desc);
1632     queue_tx(msg, ofconn, ofconn->reply_counter);
1633
1634     return 0;
1635 }
1636
1637 static int
1638 handle_table_stats_request(struct ofproto *p, struct ofconn *ofconn,
1639                            struct ofp_stats_request *request)
1640 {
1641     struct ofpbuf *msg;
1642     int error;
1643
1644     msg = start_stats_reply(request, sizeof(struct ofp_table_stats) * 3);
1645     error = wdp_get_table_stats(p->wdp, msg);
1646     if (!error) {
1647         queue_tx(msg, ofconn, ofconn->reply_counter);
1648     } else {
1649         ofpbuf_delete(msg);
1650     }
1651     return error;
1652 }
1653
1654 static void
1655 append_port_stat(struct wdp_port *port, struct ofconn *ofconn,
1656                  struct ofpbuf **msgp)
1657 {
1658     struct netdev_stats stats;
1659     struct ofp_port_stats *ops;
1660
1661     /* Intentionally ignore return value, since errors will set
1662      * 'stats' to all-1s, which is correct for OpenFlow, and
1663      * netdev_get_stats() will log errors. */
1664     netdev_get_stats(port->netdev, &stats);
1665
1666     ops = append_stats_reply(sizeof *ops, ofconn, msgp);
1667     ops->port_no = htons(port->opp.port_no);
1668     memset(ops->pad, 0, sizeof ops->pad);
1669     ops->rx_packets = htonll(stats.rx_packets);
1670     ops->tx_packets = htonll(stats.tx_packets);
1671     ops->rx_bytes = htonll(stats.rx_bytes);
1672     ops->tx_bytes = htonll(stats.tx_bytes);
1673     ops->rx_dropped = htonll(stats.rx_dropped);
1674     ops->tx_dropped = htonll(stats.tx_dropped);
1675     ops->rx_errors = htonll(stats.rx_errors);
1676     ops->tx_errors = htonll(stats.tx_errors);
1677     ops->rx_frame_err = htonll(stats.rx_frame_errors);
1678     ops->rx_over_err = htonll(stats.rx_over_errors);
1679     ops->rx_crc_err = htonll(stats.rx_crc_errors);
1680     ops->collisions = htonll(stats.collisions);
1681 }
1682
1683 static int
1684 handle_port_stats_request(struct ofproto *p, struct ofconn *ofconn,
1685                           struct ofp_stats_request *osr,
1686                           size_t arg_size)
1687 {
1688     struct ofp_port_stats_request *psr;
1689     struct ofp_port_stats *ops;
1690     struct ofpbuf *msg;
1691
1692     if (arg_size != sizeof *psr) {
1693         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
1694     }
1695     psr = (struct ofp_port_stats_request *) osr->body;
1696
1697     msg = start_stats_reply(osr, sizeof *ops * 16);
1698     if (psr->port_no != htons(OFPP_NONE)) {
1699         struct wdp_port port;
1700
1701         if (!wdp_port_query_by_number(p->wdp, ntohs(psr->port_no), &port)) {
1702             append_port_stat(&port, ofconn, &msg);
1703             wdp_port_free(&port);
1704         }
1705     } else {
1706         struct wdp_port *ports;
1707         size_t n_ports;
1708         size_t i;
1709
1710         wdp_port_list(p->wdp, &ports, &n_ports);
1711         for (i = 0; i < n_ports; i++) {
1712             append_port_stat(&ports[i], ofconn, &msg);
1713         }
1714         wdp_port_array_free(ports, n_ports);
1715     }
1716
1717     queue_tx(msg, ofconn, ofconn->reply_counter);
1718     return 0;
1719 }
1720
1721 struct flow_stats_cbdata {
1722     struct ofproto *ofproto;
1723     struct ofconn *ofconn;
1724     uint16_t out_port;
1725     struct ofpbuf *msg;
1726 };
1727
1728 /* Obtains statistic counters for 'rule' within 'p' and stores them into
1729  * '*packet_countp' and '*byte_countp'.  If 'rule' is a wildcarded rule, the
1730  * returned statistic include statistics for all of 'rule''s subrules. */
1731 static void
1732 query_stats(struct ofproto *p, struct wdp_rule *rule,
1733             uint64_t *packet_countp, uint64_t *byte_countp)
1734 {
1735     struct wdp_flow_stats stats;
1736
1737     if (!wdp_flow_get_stats(p->wdp, rule, &stats)) {
1738         *packet_countp = stats.n_packets;
1739         *byte_countp = stats.n_bytes;
1740     } else {
1741         *packet_countp = 0;
1742         *byte_countp = 0;
1743     }
1744 }
1745
1746 static int
1747 flow_stats_cb(struct wdp_rule *rule, void *cbdata_)
1748 {
1749     struct flow_stats_cbdata *cbdata = cbdata_;
1750     struct ofp_flow_stats *ofs;
1751     uint64_t packet_count, byte_count;
1752     size_t act_len, len;
1753     long long int tdiff = time_msec() - rule->created;
1754     uint32_t sec = tdiff / 1000;
1755     uint32_t msec = tdiff - (sec * 1000);
1756
1757     if (rule_is_hidden(rule)
1758         || !rule_has_out_port(rule, cbdata->out_port)) {
1759         return 0;
1760     }
1761
1762     act_len = sizeof *rule->actions * rule->n_actions;
1763     len = offsetof(struct ofp_flow_stats, actions) + act_len;
1764
1765     query_stats(cbdata->ofproto, rule, &packet_count, &byte_count);
1766
1767     ofs = append_stats_reply(len, cbdata->ofconn, &cbdata->msg);
1768     ofs->length = htons(len);
1769     ofs->table_id = rule->ofp_table_id;
1770     ofs->pad = 0;
1771     flow_to_match(&rule->cr.flow, cbdata->ofproto->tun_id_from_cookie,
1772                   &ofs->match);
1773     ofs->duration_sec = htonl(sec);
1774     ofs->duration_nsec = htonl(msec * 1000000);
1775     ofs->cookie = ofproto_rule_cast(rule)->flow_cookie;
1776     ofs->priority = htons(rule->cr.flow.priority);
1777     ofs->idle_timeout = htons(rule->idle_timeout);
1778     ofs->hard_timeout = htons(rule->hard_timeout);
1779     memset(ofs->pad2, 0, sizeof ofs->pad2);
1780     ofs->packet_count = htonll(packet_count);
1781     ofs->byte_count = htonll(byte_count);
1782     memcpy(ofs->actions, rule->actions, act_len);
1783
1784     return 0;
1785 }
1786
1787 static unsigned int
1788 table_id_to_include(uint8_t table_id)
1789 {
1790     return (table_id == 0xff ? UINT_MAX
1791             : table_id < 32 ? 1u << table_id
1792             : 0);
1793 }
1794
1795 static unsigned int
1796 flow_mod_table_id(const struct ofconn *ofconn, const struct ofp_flow_mod *ofm)
1797 {
1798     return ofconn->flow_mod_table_id ? ntohs(ofm->command) >> 8 : 0xff;
1799 }
1800
1801 static int
1802 handle_flow_stats_request(struct ofproto *p, struct ofconn *ofconn,
1803                           const struct ofp_stats_request *osr,
1804                           size_t arg_size)
1805 {
1806     struct ofp_flow_stats_request *fsr;
1807     struct flow_stats_cbdata cbdata;
1808     flow_t target;
1809
1810     if (arg_size != sizeof *fsr) {
1811         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
1812     }
1813     fsr = (struct ofp_flow_stats_request *) osr->body;
1814
1815     COVERAGE_INC(ofproto_flows_req);
1816     cbdata.ofproto = p;
1817     cbdata.ofconn = ofconn;
1818     cbdata.out_port = fsr->out_port;
1819     cbdata.msg = start_stats_reply(osr, 1024);
1820     flow_from_match(&fsr->match, 0, false, 0, &target);
1821     wdp_flow_for_each_match(p->wdp, &target,
1822                             table_id_to_include(fsr->table_id),
1823                             flow_stats_cb, &cbdata);
1824     queue_tx(cbdata.msg, ofconn, ofconn->reply_counter);
1825     return 0;
1826 }
1827
1828 struct flow_stats_ds_cbdata {
1829     struct ofproto *ofproto;
1830     struct ds *results;
1831 };
1832
1833 static int
1834 flow_stats_ds_cb(struct wdp_rule *rule, void *cbdata_)
1835 {
1836     struct flow_stats_ds_cbdata *cbdata = cbdata_;
1837     struct ds *results = cbdata->results;
1838     struct ofp_match match;
1839     uint64_t packet_count, byte_count;
1840     size_t act_len = sizeof *rule->actions * rule->n_actions;
1841
1842     query_stats(cbdata->ofproto, rule, &packet_count, &byte_count);
1843     flow_to_match(&rule->cr.flow, cbdata->ofproto->tun_id_from_cookie,
1844                   &match);
1845
1846     ds_put_format(results, "duration=%llds, ",
1847                   (time_msec() - rule->created) / 1000);
1848     ds_put_format(results, "priority=%u, ", rule->cr.flow.priority);
1849     ds_put_format(results, "n_packets=%"PRIu64", ", packet_count);
1850     ds_put_format(results, "n_bytes=%"PRIu64", ", byte_count);
1851     ofp_print_match(results, &match, true);
1852     ofp_print_actions(results, &rule->actions->header, act_len);
1853     ds_put_cstr(results, "\n");
1854
1855     return 0;
1856 }
1857
1858 /* Adds a pretty-printed description of all flows to 'results', including
1859  * those marked hidden by secchan (e.g., by in-band control). */
1860 void
1861 ofproto_get_all_flows(struct ofproto *p, struct ds *results)
1862 {
1863     struct flow_stats_ds_cbdata cbdata;
1864     struct ofp_match match;
1865     flow_t target;
1866
1867     memset(&match, 0, sizeof match);
1868     match.wildcards = htonl(OVSFW_ALL);
1869
1870     cbdata.ofproto = p;
1871     cbdata.results = results;
1872
1873     flow_from_match(&match, 0, false, 0, &target);
1874     wdp_flow_for_each_match(p->wdp, &target, UINT_MAX,
1875                             flow_stats_ds_cb, &cbdata);
1876 }
1877
1878 struct aggregate_stats_cbdata {
1879     struct ofproto *ofproto;
1880     uint16_t out_port;
1881     uint64_t packet_count;
1882     uint64_t byte_count;
1883     uint32_t n_flows;
1884 };
1885
1886 static int
1887 aggregate_stats_cb(struct wdp_rule *rule, void *cbdata_)
1888 {
1889     struct aggregate_stats_cbdata *cbdata = cbdata_;
1890     uint64_t packet_count, byte_count;
1891
1892     if (rule_is_hidden(rule) || !rule_has_out_port(rule, cbdata->out_port)) {
1893         return 0;
1894     }
1895
1896     query_stats(cbdata->ofproto, rule, &packet_count, &byte_count);
1897
1898     cbdata->packet_count += packet_count;
1899     cbdata->byte_count += byte_count;
1900     cbdata->n_flows++;
1901
1902     return 0;
1903 }
1904
1905 static int
1906 handle_aggregate_stats_request(struct ofproto *p, struct ofconn *ofconn,
1907                                const struct ofp_stats_request *osr,
1908                                size_t arg_size)
1909 {
1910     struct ofp_aggregate_stats_request *asr;
1911     struct ofp_aggregate_stats_reply *reply;
1912     struct aggregate_stats_cbdata cbdata;
1913     struct ofpbuf *msg;
1914     flow_t target;
1915
1916     if (arg_size != sizeof *asr) {
1917         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
1918     }
1919     asr = (struct ofp_aggregate_stats_request *) osr->body;
1920
1921     COVERAGE_INC(ofproto_agg_request);
1922     cbdata.ofproto = p;
1923     cbdata.out_port = asr->out_port;
1924     cbdata.packet_count = 0;
1925     cbdata.byte_count = 0;
1926     cbdata.n_flows = 0;
1927     flow_from_match(&asr->match, 0, false, 0, &target);
1928     wdp_flow_for_each_match(p->wdp, &target,
1929                             table_id_to_include(asr->table_id),
1930                             aggregate_stats_cb, &cbdata);
1931
1932     msg = start_stats_reply(osr, sizeof *reply);
1933     reply = append_stats_reply(sizeof *reply, ofconn, &msg);
1934     reply->flow_count = htonl(cbdata.n_flows);
1935     reply->packet_count = htonll(cbdata.packet_count);
1936     reply->byte_count = htonll(cbdata.byte_count);
1937     queue_tx(msg, ofconn, ofconn->reply_counter);
1938     return 0;
1939 }
1940
1941 struct queue_stats_cbdata {
1942     struct ofconn *ofconn;
1943     struct wdp_port *wdp_port;
1944     struct ofpbuf *msg;
1945 };
1946
1947 static void
1948 put_queue_stats(struct queue_stats_cbdata *cbdata, uint32_t queue_id,
1949                 const struct netdev_queue_stats *stats)
1950 {
1951     struct ofp_queue_stats *reply;
1952
1953     reply = append_stats_reply(sizeof *reply, cbdata->ofconn, &cbdata->msg);
1954     reply->port_no = htons(cbdata->wdp_port->opp.port_no);
1955     memset(reply->pad, 0, sizeof reply->pad);
1956     reply->queue_id = htonl(queue_id);
1957     reply->tx_bytes = htonll(stats->tx_bytes);
1958     reply->tx_packets = htonll(stats->tx_packets);
1959     reply->tx_errors = htonll(stats->tx_errors);
1960 }
1961
1962 static void
1963 handle_queue_stats_dump_cb(uint32_t queue_id,
1964                            struct netdev_queue_stats *stats,
1965                            void *cbdata_)
1966 {
1967     struct queue_stats_cbdata *cbdata = cbdata_;
1968
1969     put_queue_stats(cbdata, queue_id, stats);
1970 }
1971
1972 static void
1973 handle_queue_stats_for_port(struct wdp_port *port, uint32_t queue_id,
1974                             struct queue_stats_cbdata *cbdata)
1975 {
1976     cbdata->wdp_port = port;
1977     if (queue_id == OFPQ_ALL) {
1978         netdev_dump_queue_stats(port->netdev,
1979                                 handle_queue_stats_dump_cb, cbdata);
1980     } else {
1981         struct netdev_queue_stats stats;
1982
1983         if (!netdev_get_queue_stats(port->netdev, queue_id, &stats)) {
1984             put_queue_stats(cbdata, queue_id, &stats);
1985         }
1986     }
1987 }
1988
1989 static int
1990 handle_queue_stats_request(struct ofproto *ofproto, struct ofconn *ofconn,
1991                            const struct ofp_stats_request *osr,
1992                            size_t arg_size)
1993 {
1994     struct ofp_queue_stats_request *qsr;
1995     struct queue_stats_cbdata cbdata;
1996     unsigned int port_no;
1997     uint32_t queue_id;
1998
1999     if (arg_size != sizeof *qsr) {
2000         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
2001     }
2002     qsr = (struct ofp_queue_stats_request *) osr->body;
2003
2004     COVERAGE_INC(ofproto_queue_req);
2005
2006     cbdata.ofconn = ofconn;
2007     cbdata.msg = start_stats_reply(osr, 128);
2008
2009     port_no = ntohs(qsr->port_no);
2010     queue_id = ntohl(qsr->queue_id);
2011     if (port_no == OFPP_ALL) {
2012         struct wdp_port *ports;
2013         size_t n_ports, i;
2014
2015         wdp_port_list(ofproto->wdp, &ports, &n_ports);
2016         /* XXX deal with wdp_port_list() errors */
2017         for (i = 0; i < n_ports; i++) {
2018             handle_queue_stats_for_port(&ports[i], queue_id, &cbdata);
2019         }
2020         wdp_port_array_free(ports, n_ports);
2021     } else if (port_no < ofproto->max_ports) {
2022         struct wdp_port port;
2023         int error;
2024
2025         error = wdp_port_query_by_number(ofproto->wdp, port_no, &port);
2026         if (!error) {
2027             handle_queue_stats_for_port(&port, queue_id, &cbdata);
2028         } else {
2029             /* XXX deal with wdp_port_query_by_number() errors */
2030         }
2031         wdp_port_free(&port);
2032     } else {
2033         ofpbuf_delete(cbdata.msg);
2034         return ofp_mkerr(OFPET_QUEUE_OP_FAILED, OFPQOFC_BAD_PORT);
2035     }
2036     queue_tx(cbdata.msg, ofconn, ofconn->reply_counter);
2037
2038     return 0;
2039 }
2040
2041 static int
2042 handle_stats_request(struct ofproto *p, struct ofconn *ofconn,
2043                      struct ofp_header *oh)
2044 {
2045     struct ofp_stats_request *osr;
2046     size_t arg_size;
2047     int error;
2048
2049     error = check_ofp_message_array(oh, OFPT_STATS_REQUEST, sizeof *osr,
2050                                     1, &arg_size);
2051     if (error) {
2052         return error;
2053     }
2054     osr = (struct ofp_stats_request *) oh;
2055
2056     switch (ntohs(osr->type)) {
2057     case OFPST_DESC:
2058         return handle_desc_stats_request(p, ofconn, osr);
2059
2060     case OFPST_FLOW:
2061         return handle_flow_stats_request(p, ofconn, osr, arg_size);
2062
2063     case OFPST_AGGREGATE:
2064         return handle_aggregate_stats_request(p, ofconn, osr, arg_size);
2065
2066     case OFPST_TABLE:
2067         return handle_table_stats_request(p, ofconn, osr);
2068
2069     case OFPST_PORT:
2070         return handle_port_stats_request(p, ofconn, osr, arg_size);
2071
2072     case OFPST_QUEUE:
2073         return handle_queue_stats_request(p, ofconn, osr, arg_size);
2074
2075     case OFPST_VENDOR:
2076         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_VENDOR);
2077
2078     default:
2079         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_STAT);
2080     }
2081 }
2082
2083 /* Implements OFPFC_ADD and the cases for OFPFC_MODIFY and OFPFC_MODIFY_STRICT
2084  * in which no matching flow already exists in the flow table.
2085  *
2086  * Adds the flow specified by 'ofm', which is followed by 'n_actions'
2087  * ofp_actions, to 'p''s flow table.  Returns 0 on success or an OpenFlow error
2088  * code as encoded by ofp_mkerr() on failure.
2089  *
2090  * 'ofconn' is used to retrieve the packet buffer specified in ofm->buffer_id,
2091  * if any. */
2092 static int
2093 add_flow(struct ofproto *p, struct ofconn *ofconn,
2094          const struct ofp_flow_mod *ofm, size_t n_actions)
2095 {
2096     struct wdp_rule *rule;
2097     struct wdp_flow_put put;
2098     struct ofpbuf *packet;
2099     uint16_t in_port;
2100     flow_t flow;
2101     int error;
2102
2103     flow_from_match(&ofm->match, ntohs(ofm->priority), p->tun_id_from_cookie,
2104                     ofm->cookie, &flow);
2105     if (ofm->flags & htons(OFPFF_CHECK_OVERLAP)
2106         && wdp_flow_overlaps(p->wdp, &flow)) {
2107         return ofp_mkerr(OFPET_FLOW_MOD_FAILED, OFPFMFC_OVERLAP);
2108     }
2109
2110     put.flags = WDP_PUT_CREATE | WDP_PUT_MODIFY | WDP_PUT_ALL;
2111     put.flow = &flow;
2112     put.actions = (const union ofp_action *) ofm->actions;
2113     put.n_actions = n_actions;
2114     put.idle_timeout = ntohs(ofm->idle_timeout);
2115     put.hard_timeout = ntohs(ofm->hard_timeout);
2116     put.ofp_table_id = flow_mod_table_id(ofconn, ofm);
2117     put.cookie = ofm->cookie;
2118     put.xid = ofm->header.xid;
2119     error = wdp_flow_put(p->wdp, &put, NULL, &rule);
2120     if (error) {
2121         /* XXX wdp_flow_put should return OpenFlow error code. */
2122         return error;
2123     }
2124     ofproto_rule_init(rule);
2125
2126     if (ofm->buffer_id != htonl(UINT32_MAX)) {
2127         error = pktbuf_retrieve(ofconn->pktbuf, ntohl(ofm->buffer_id),
2128                                 &packet, &in_port);
2129         if (!error) {
2130             error = wdp_flow_inject(p->wdp, rule, in_port, packet);
2131             ofpbuf_delete(packet);
2132         }
2133     }
2134
2135     return error;
2136 }
2137
2138 static struct wdp_rule *
2139 find_flow_strict(struct ofproto *p, const struct ofconn *ofconn,
2140                  const struct ofp_flow_mod *ofm)
2141 {
2142     uint8_t table_id;
2143     flow_t flow;
2144
2145     flow_from_match(&ofm->match, ntohs(ofm->priority),
2146                     p->tun_id_from_cookie, ofm->cookie, &flow);
2147     table_id = flow_mod_table_id(ofconn, ofm);
2148     return wdp_flow_get(p->wdp, &flow, table_id_to_include(table_id));
2149 }
2150
2151 static int
2152 send_buffered_packet(struct ofproto *ofproto, struct ofconn *ofconn,
2153                      struct wdp_rule *rule, const struct ofp_flow_mod *ofm)
2154 {
2155     struct ofpbuf *packet;
2156     uint16_t in_port;
2157     int error;
2158
2159     if (ofm->buffer_id == htonl(UINT32_MAX)) {
2160         return 0;
2161     }
2162
2163     error = pktbuf_retrieve(ofconn->pktbuf, ntohl(ofm->buffer_id),
2164                             &packet, &in_port);
2165     if (error) {
2166         return error;
2167     }
2168
2169     error = wdp_flow_inject(ofproto->wdp, rule, in_port, packet);
2170     ofpbuf_delete(packet);
2171
2172     return error;
2173 }
2174 \f
2175 /* OFPFC_MODIFY and OFPFC_MODIFY_STRICT. */
2176
2177 struct modify_flows_cbdata {
2178     struct ofproto *ofproto;
2179     const struct ofp_flow_mod *ofm;
2180     size_t n_actions;
2181     struct wdp_rule *match;
2182 };
2183
2184 static int modify_flow(struct ofproto *, const struct ofp_flow_mod *,
2185                        size_t n_actions, struct wdp_rule *)
2186     WARN_UNUSED_RESULT;
2187 static int modify_flows_cb(struct wdp_rule *, void *cbdata_);
2188
2189 /* Implements OFPFC_ADD and OFPFC_MODIFY.  Returns 0 on success or an OpenFlow
2190  * error code as encoded by ofp_mkerr() on failure.
2191  *
2192  * 'ofconn' is used to retrieve the packet buffer specified in ofm->buffer_id,
2193  * if any. */
2194 static int
2195 modify_flows_loose(struct ofproto *p, struct ofconn *ofconn,
2196                    const struct ofp_flow_mod *ofm, size_t n_actions)
2197 {
2198     struct modify_flows_cbdata cbdata;
2199     uint8_t table_id;
2200     flow_t target;
2201     int error;
2202
2203     cbdata.ofproto = p;
2204     cbdata.ofm = ofm;
2205     cbdata.n_actions = n_actions;
2206     cbdata.match = NULL;
2207
2208     flow_from_match(&ofm->match, 0, p->tun_id_from_cookie, ofm->cookie,
2209                     &target);
2210
2211     table_id = flow_mod_table_id(ofconn, ofm);
2212     error = wdp_flow_for_each_match(p->wdp, &target,
2213                                     table_id_to_include(table_id),
2214                                     modify_flows_cb, &cbdata);
2215     if (error) {
2216         return error;
2217     }
2218
2219     if (cbdata.match) {
2220         /* This credits the packet to whichever flow happened to match last.
2221          * That's weird.  Maybe we should do a lookup for the flow that
2222          * actually matches the packet?  Who knows. */
2223         return send_buffered_packet(p, ofconn, cbdata.match, ofm);
2224     } else {
2225         return add_flow(p, ofconn, ofm, n_actions);
2226     }
2227 }
2228
2229 /* Implements OFPFC_MODIFY_STRICT.  Returns 0 on success or an OpenFlow error
2230  * code as encoded by ofp_mkerr() on failure.
2231  *
2232  * 'ofconn' is used to retrieve the packet buffer specified in ofm->buffer_id,
2233  * if any. */
2234 static int
2235 modify_flow_strict(struct ofproto *p, struct ofconn *ofconn,
2236                    struct ofp_flow_mod *ofm, size_t n_actions)
2237 {
2238     struct wdp_rule *rule = find_flow_strict(p, ofconn, ofm);
2239     if (rule && !rule_is_hidden(rule)) {
2240         int error = modify_flow(p, ofm, n_actions, rule);
2241         return error ? error : send_buffered_packet(p, ofconn, rule, ofm);
2242     } else {
2243         return add_flow(p, ofconn, ofm, n_actions);
2244     }
2245 }
2246
2247 /* Callback for modify_flows_loose(). */
2248 static int
2249 modify_flows_cb(struct wdp_rule *rule, void *cbdata_)
2250 {
2251     struct modify_flows_cbdata *cbdata = cbdata_;
2252
2253     if (!rule_is_hidden(rule)) {
2254         cbdata->match = rule;
2255         return modify_flow(cbdata->ofproto, cbdata->ofm, cbdata->n_actions,
2256                            rule);
2257     }
2258     return 0;
2259 }
2260
2261 /* Implements core of OFPFC_MODIFY and OFPFC_MODIFY_STRICT where 'rule' has
2262  * been identified as a flow in 'p''s flow table to be modified, by changing
2263  * the rule's actions to match those in 'ofm' (which is followed by 'n_actions'
2264  * ofp_action[] structures). */
2265 static int
2266 modify_flow(struct ofproto *p, const struct ofp_flow_mod *ofm,
2267             size_t n_actions, struct wdp_rule *rule)
2268 {
2269     const struct ofp_action_header *actions = ofm->actions;
2270     struct ofproto_rule *ofproto_rule = ofproto_rule_cast(rule);
2271     struct wdp_flow_put put;
2272
2273     ofproto_rule->flow_cookie = ofm->cookie;
2274
2275     /* If the actions are the same, do nothing. */
2276     if (n_actions == rule->n_actions
2277         && !memcmp(ofm->actions, rule->actions, sizeof *actions * n_actions))
2278     {
2279         return 0;
2280     }
2281
2282     put.flags = WDP_PUT_MODIFY | WDP_PUT_ACTIONS;
2283     put.flow = &rule->cr.flow;
2284     put.actions = (const union ofp_action *) actions;
2285     put.n_actions = n_actions;
2286     put.idle_timeout = put.hard_timeout = 0;
2287     put.ofp_table_id = rule->ofp_table_id;
2288     put.cookie = ofm->cookie;
2289     put.xid = ofm->header.xid;
2290     return wdp_flow_put(p->wdp, &put, NULL, NULL);
2291 }
2292 \f
2293 /* OFPFC_DELETE implementation. */
2294
2295 struct delete_flows_cbdata {
2296     struct ofproto *ofproto;
2297     uint16_t out_port;
2298 };
2299
2300 static int delete_flows_cb(struct wdp_rule *, void *cbdata_);
2301 static int delete_flow_core(struct ofproto *, struct wdp_rule *,
2302                              uint16_t out_port);
2303
2304 /* Implements OFPFC_DELETE. */
2305 static int
2306 delete_flows_loose(struct ofproto *p, const struct ofconn *ofconn,
2307                    const struct ofp_flow_mod *ofm)
2308 {
2309     struct delete_flows_cbdata cbdata;
2310     uint8_t table_id;
2311     flow_t target;
2312
2313     cbdata.ofproto = p;
2314     cbdata.out_port = ofm->out_port;
2315
2316     flow_from_match(&ofm->match, 0, p->tun_id_from_cookie, ofm->cookie,
2317                     &target);
2318     table_id = flow_mod_table_id(ofconn, ofm);
2319
2320     return wdp_flow_for_each_match(p->wdp, &target,
2321                                    table_id_to_include(table_id),
2322                                    delete_flows_cb, &cbdata);
2323 }
2324
2325 /* Implements OFPFC_DELETE_STRICT. */
2326 static int
2327 delete_flow_strict(struct ofproto *p, const struct ofconn *ofconn,
2328                    struct ofp_flow_mod *ofm)
2329 {
2330     struct wdp_rule *rule = find_flow_strict(p, ofconn, ofm);
2331     if (rule) {
2332         return delete_flow_core(p, rule, ofm->out_port);
2333     }
2334     return 0;
2335 }
2336
2337 /* Callback for delete_flows_loose(). */
2338 static int
2339 delete_flows_cb(struct wdp_rule *rule, void *cbdata_)
2340 {
2341     struct delete_flows_cbdata *cbdata = cbdata_;
2342
2343     return delete_flow_core(cbdata->ofproto, rule, cbdata->out_port);
2344 }
2345
2346 /* Implements core of OFPFC_DELETE and OFPFC_DELETE_STRICT where 'rule' has
2347  * been identified as a flow to delete from 'p''s flow table, by deleting the
2348  * flow and sending out a OFPT_FLOW_REMOVED message to any interested
2349  * controller.
2350  *
2351  * Will not delete 'rule' if it is hidden.  Will delete 'rule' only if
2352  * 'out_port' is htons(OFPP_NONE) or if 'rule' actually outputs to the
2353  * specified 'out_port'. */
2354 static int
2355 delete_flow_core(struct ofproto *p, struct wdp_rule *rule, uint16_t out_port)
2356 {
2357     if (rule_is_hidden(rule)) {
2358         return 0;
2359     }
2360
2361     if (out_port != htons(OFPP_NONE) && !rule_has_out_port(rule, out_port)) {
2362         return 0;
2363     }
2364
2365     return delete_flow(p, rule, OFPRR_DELETE);
2366 }
2367 \f
2368 static int
2369 handle_flow_mod(struct ofproto *p, struct ofconn *ofconn,
2370                 struct ofp_flow_mod *ofm)
2371 {
2372     struct ofp_match orig_match;
2373     size_t n_actions;
2374     int error;
2375
2376     error = reject_slave_controller(ofconn, &ofm->header);
2377     if (error) {
2378         return error;
2379     }
2380     error = check_ofp_message_array(&ofm->header, OFPT_FLOW_MOD, sizeof *ofm,
2381                                     sizeof *ofm->actions, &n_actions);
2382     if (error) {
2383         return error;
2384     }
2385
2386     /* We do not support the emergency flow cache.  It will hopefully
2387      * get dropped from OpenFlow in the near future. */
2388     if (ofm->flags & htons(OFPFF_EMERG)) {
2389         /* There isn't a good fit for an error code, so just state that the
2390          * flow table is full. */
2391         return ofp_mkerr(OFPET_FLOW_MOD_FAILED, OFPFMFC_ALL_TABLES_FULL);
2392     }
2393
2394     /* Normalize ofp->match.  If normalization actually changes anything, then
2395      * log the differences. */
2396     ofm->match.pad1[0] = ofm->match.pad2[0] = 0;
2397     orig_match = ofm->match;
2398     normalize_match(&ofm->match);
2399     if (memcmp(&ofm->match, &orig_match, sizeof orig_match)) {
2400         static struct vlog_rate_limit normal_rl = VLOG_RATE_LIMIT_INIT(1, 1);
2401         if (!VLOG_DROP_INFO(&normal_rl)) {
2402             char *old = ofp_match_to_literal_string(&orig_match);
2403             char *new = ofp_match_to_literal_string(&ofm->match);
2404             VLOG_INFO("%s: normalization changed ofp_match, details:",
2405                       rconn_get_name(ofconn->rconn));
2406             VLOG_INFO(" pre: %s", old);
2407             VLOG_INFO("post: %s", new);
2408             free(old);
2409             free(new);
2410         }
2411     }
2412
2413     if (!ofm->match.wildcards) {
2414         ofm->priority = htons(UINT16_MAX);
2415     }
2416
2417     error = validate_actions((const union ofp_action *) ofm->actions,
2418                              n_actions, p->max_ports);
2419     if (error) {
2420         return error;
2421     }
2422
2423     if (!ofconn->flow_mod_table_id && ofm->command & htons(0xff00)) {
2424         static struct vlog_rate_limit table_id_rl = VLOG_RATE_LIMIT_INIT(1, 1);
2425         VLOG_WARN_RL(&table_id_rl, "%s: flow_mod table_id feature must be "
2426                      "enabled with NXT_FLOW_MOD_TABLE_ID",
2427                      rconn_get_name(ofconn->rconn));
2428         return ofp_mkerr(OFPET_FLOW_MOD_FAILED, OFPFMFC_BAD_COMMAND);
2429     }
2430
2431     switch (ntohs(ofm->command) & 0xff) {
2432     case OFPFC_ADD:
2433         return add_flow(p, ofconn, ofm, n_actions);
2434
2435     case OFPFC_MODIFY:
2436         return modify_flows_loose(p, ofconn, ofm, n_actions);
2437
2438     case OFPFC_MODIFY_STRICT:
2439         return modify_flow_strict(p, ofconn, ofm, n_actions);
2440
2441     case OFPFC_DELETE:
2442         return delete_flows_loose(p, ofconn, ofm);
2443
2444     case OFPFC_DELETE_STRICT:
2445         return delete_flow_strict(p, ofconn, ofm);
2446
2447     default:
2448         return ofp_mkerr(OFPET_FLOW_MOD_FAILED, OFPFMFC_BAD_COMMAND);
2449     }
2450 }
2451
2452 static int
2453 handle_tun_id_from_cookie(struct ofproto *p, struct nxt_tun_id_cookie *msg)
2454 {
2455     int error;
2456
2457     error = check_ofp_message(&msg->header, OFPT_VENDOR, sizeof *msg);
2458     if (error) {
2459         return error;
2460     }
2461
2462     p->tun_id_from_cookie = !!msg->set;
2463     return 0;
2464 }
2465
2466 static int
2467 handle_flow_mod_table_id(struct ofconn *ofconn,
2468                          struct nxt_flow_mod_table_id *msg)
2469 {
2470     int error;
2471
2472     error = check_ofp_message(&msg->header, OFPT_VENDOR, sizeof *msg);
2473     if (error) {
2474         return error;
2475     }
2476
2477     ofconn->flow_mod_table_id = !!msg->set;
2478     return 0;
2479 }
2480
2481 static int
2482 handle_role_request(struct ofproto *ofproto,
2483                     struct ofconn *ofconn, struct nicira_header *msg)
2484 {
2485     struct nx_role_request *nrr;
2486     struct nx_role_request *reply;
2487     struct ofpbuf *buf;
2488     uint32_t role;
2489
2490     if (ntohs(msg->header.length) != sizeof *nrr) {
2491         VLOG_WARN_RL(&rl, "received role request of length %u (expected %zu)",
2492                      ntohs(msg->header.length), sizeof *nrr);
2493         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
2494     }
2495     nrr = (struct nx_role_request *) msg;
2496
2497     if (ofconn->type != OFCONN_PRIMARY) {
2498         VLOG_WARN_RL(&rl, "ignoring role request on non-controller "
2499                      "connection");
2500         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_EPERM);
2501     }
2502
2503     role = ntohl(nrr->role);
2504     if (role != NX_ROLE_OTHER && role != NX_ROLE_MASTER
2505         && role != NX_ROLE_SLAVE) {
2506         VLOG_WARN_RL(&rl, "received request for unknown role %"PRIu32, role);
2507
2508         /* There's no good error code for this. */
2509         return ofp_mkerr(OFPET_BAD_REQUEST, -1);
2510     }
2511
2512     if (role == NX_ROLE_MASTER) {
2513         struct ofconn *other;
2514
2515         HMAP_FOR_EACH (other, hmap_node, &ofproto->controllers) {
2516             if (other->role == NX_ROLE_MASTER) {
2517                 other->role = NX_ROLE_SLAVE;
2518             }
2519         }
2520     }
2521     ofconn->role = role;
2522
2523     reply = make_openflow_xid(sizeof *reply, OFPT_VENDOR, msg->header.xid,
2524                               &buf);
2525     reply->nxh.vendor = htonl(NX_VENDOR_ID);
2526     reply->nxh.subtype = htonl(NXT_ROLE_REPLY);
2527     reply->role = htonl(role);
2528     queue_tx(buf, ofconn, ofconn->reply_counter);
2529
2530     return 0;
2531 }
2532
2533 static int
2534 handle_vendor(struct ofproto *p, struct ofconn *ofconn, void *msg)
2535 {
2536     struct ofp_vendor_header *ovh = msg;
2537     struct nicira_header *nh;
2538
2539     if (ntohs(ovh->header.length) < sizeof(struct ofp_vendor_header)) {
2540         VLOG_WARN_RL(&rl, "received vendor message of length %u "
2541                           "(expected at least %zu)",
2542                    ntohs(ovh->header.length), sizeof(struct ofp_vendor_header));
2543         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
2544     }
2545     if (ovh->vendor != htonl(NX_VENDOR_ID)) {
2546         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_VENDOR);
2547     }
2548     if (ntohs(ovh->header.length) < sizeof(struct nicira_header)) {
2549         VLOG_WARN_RL(&rl, "received Nicira vendor message of length %u "
2550                           "(expected at least %zu)",
2551                      ntohs(ovh->header.length), sizeof(struct nicira_header));
2552         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
2553     }
2554
2555     nh = msg;
2556     switch (ntohl(nh->subtype)) {
2557     case NXT_STATUS_REQUEST:
2558         return switch_status_handle_request(p->switch_status, ofconn->rconn,
2559                                             msg);
2560
2561     case NXT_TUN_ID_FROM_COOKIE:
2562         return handle_tun_id_from_cookie(p, msg);
2563
2564     case NXT_FLOW_MOD_TABLE_ID:
2565         return handle_flow_mod_table_id(ofconn, msg);
2566
2567     case NXT_ROLE_REQUEST:
2568         return handle_role_request(p, ofconn, msg);
2569     }
2570
2571     return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_SUBTYPE);
2572 }
2573
2574 static int
2575 handle_barrier_request(struct ofconn *ofconn, struct ofp_header *oh)
2576 {
2577     struct ofp_header *ob;
2578     struct ofpbuf *buf;
2579
2580     /* Currently, everything executes synchronously, so we can just
2581      * immediately send the barrier reply. */
2582     ob = make_openflow_xid(sizeof *ob, OFPT_BARRIER_REPLY, oh->xid, &buf);
2583     queue_tx(buf, ofconn, ofconn->reply_counter);
2584     return 0;
2585 }
2586
2587 static void
2588 handle_openflow(struct ofconn *ofconn, struct ofproto *p,
2589                 struct ofpbuf *ofp_msg)
2590 {
2591     struct ofp_header *oh = ofp_msg->data;
2592     int error;
2593
2594     COVERAGE_INC(ofproto_recv_openflow);
2595     switch (oh->type) {
2596     case OFPT_ECHO_REQUEST:
2597         error = handle_echo_request(ofconn, oh);
2598         break;
2599
2600     case OFPT_ECHO_REPLY:
2601         error = 0;
2602         break;
2603
2604     case OFPT_FEATURES_REQUEST:
2605         error = handle_features_request(p, ofconn, oh);
2606         break;
2607
2608     case OFPT_GET_CONFIG_REQUEST:
2609         error = handle_get_config_request(p, ofconn, oh);
2610         break;
2611
2612     case OFPT_SET_CONFIG:
2613         error = handle_set_config(p, ofconn, ofp_msg->data);
2614         break;
2615
2616     case OFPT_PACKET_OUT:
2617         error = handle_packet_out(p, ofconn, ofp_msg->data);
2618         break;
2619
2620     case OFPT_PORT_MOD:
2621         error = handle_port_mod(p, ofconn, oh);
2622         break;
2623
2624     case OFPT_FLOW_MOD:
2625         error = handle_flow_mod(p, ofconn, ofp_msg->data);
2626         break;
2627
2628     case OFPT_STATS_REQUEST:
2629         error = handle_stats_request(p, ofconn, oh);
2630         break;
2631
2632     case OFPT_VENDOR:
2633         error = handle_vendor(p, ofconn, ofp_msg->data);
2634         break;
2635
2636     case OFPT_BARRIER_REQUEST:
2637         error = handle_barrier_request(ofconn, oh);
2638         break;
2639
2640     default:
2641         if (VLOG_IS_WARN_ENABLED()) {
2642             char *s = ofp_to_string(oh, ntohs(oh->length), 2);
2643             VLOG_DBG_RL(&rl, "OpenFlow message ignored: %s", s);
2644             free(s);
2645         }
2646         error = ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_BAD_TYPE);
2647         break;
2648     }
2649
2650     if (error) {
2651         send_error_oh(ofconn, ofp_msg->data, error);
2652     }
2653 }
2654 \f
2655 static void
2656 handle_flow_miss(struct ofproto *p, struct wdp_packet *packet)
2657 {
2658     struct wdp_rule *rule;
2659     flow_t flow;
2660
2661     flow_extract(packet->payload, packet->tun_id, packet->in_port, &flow);
2662     rule = wdp_flow_match(p->wdp, &flow);
2663     if (!rule) {
2664         /* Don't send a packet-in if OFPPC_NO_PACKET_IN asserted. */
2665         struct wdp_port port;
2666
2667         if (!wdp_port_query_by_number(p->wdp, packet->in_port, &port)) {
2668             bool no_packet_in = (port.opp.config & OFPPC_NO_PACKET_IN) != 0;
2669             wdp_port_free(&port);
2670             if (no_packet_in) {
2671                 COVERAGE_INC(ofproto_no_packet_in);
2672                 wdp_packet_destroy(packet);
2673                 return;
2674             }
2675         } else {
2676             VLOG_WARN_RL(&rl, "packet-in on unknown port %"PRIu16,
2677                          packet->in_port);
2678         }
2679
2680         COVERAGE_INC(ofproto_packet_in);
2681         send_packet_in(p, packet);
2682         return;
2683     }
2684
2685     wdp_flow_inject(p->wdp, rule, packet->in_port, packet->payload);
2686
2687     if (rule->cr.flow.priority == FAIL_OPEN_PRIORITY) {
2688         /*
2689          * Extra-special case for fail-open mode.
2690          *
2691          * We are in fail-open mode and the packet matched the fail-open rule,
2692          * but we are connected to a controller too.  We should send the packet
2693          * up to the controller in the hope that it will try to set up a flow
2694          * and thereby allow us to exit fail-open.
2695          *
2696          * See the top-level comment in fail-open.c for more information.
2697          */
2698         send_packet_in(p, packet);
2699     } else {
2700         wdp_packet_destroy(packet);
2701     }
2702 }
2703
2704 static void
2705 handle_wdp_packet(struct ofproto *p, struct wdp_packet *packet)
2706 {
2707     switch (packet->channel) {
2708     case WDP_CHAN_ACTION:
2709         COVERAGE_INC(ofproto_ctlr_action);
2710         send_packet_in(p, packet);
2711         break;
2712
2713     case WDP_CHAN_SFLOW:
2714         /* XXX */
2715         wdp_packet_destroy(packet);
2716         break;
2717
2718     case WDP_CHAN_MISS:
2719         handle_flow_miss(p, packet);
2720         break;
2721
2722     case WDP_N_CHANS:
2723     default:
2724         wdp_packet_destroy(packet);
2725         VLOG_WARN_RL(&rl, "received message on unexpected channel %d",
2726                      (int) packet->channel);
2727         break;
2728     }
2729 }
2730 \f
2731 static struct ofpbuf *
2732 compose_flow_removed(struct ofproto *p, const struct wdp_rule *rule,
2733                      uint8_t reason)
2734 {
2735     long long int tdiff = time_msec() - rule->created;
2736     uint32_t sec = tdiff / 1000;
2737     uint32_t msec = tdiff - (sec * 1000);
2738     struct ofp_flow_removed *ofr;
2739     struct ofpbuf *buf;
2740
2741     ofr = make_openflow(sizeof *ofr, OFPT_FLOW_REMOVED, &buf);
2742     flow_to_match(&rule->cr.flow, p->tun_id_from_cookie, &ofr->match);
2743     ofr->cookie = ofproto_rule_cast(rule)->flow_cookie;
2744     ofr->priority = htons(rule->cr.flow.priority);
2745     ofr->reason = reason;
2746     ofr->duration_sec = htonl(sec);
2747     ofr->duration_nsec = htonl(msec * 1000000);
2748     ofr->idle_timeout = htons(rule->idle_timeout);
2749
2750     return buf;
2751 }
2752
2753 static int
2754 delete_flow(struct ofproto *p, struct wdp_rule *rule, uint8_t reason)
2755 {
2756     /* We limit the maximum number of queued flow expirations it by accounting
2757      * them under the counter for replies.  That works because preventing
2758      * OpenFlow requests from being processed also prevents new flows from
2759      * being added (and expiring).  (It also prevents processing OpenFlow
2760      * requests that would not add new flows, so it is imperfect.) */
2761
2762     struct ofproto_rule *ofproto_rule = ofproto_rule_cast(rule);
2763     struct wdp_flow_stats stats;
2764     struct ofpbuf *buf;
2765     int error;
2766
2767     if (ofproto_rule->send_flow_removed) {
2768         /* Compose most of the ofp_flow_removed before 'rule' is destroyed. */
2769         buf = compose_flow_removed(p, rule, reason);
2770     } else {
2771         buf = NULL;
2772     }
2773
2774     error = wdp_flow_delete(p->wdp, rule, &stats);
2775     if (error) {
2776         return error;
2777     }
2778
2779     if (buf) {
2780         struct ofp_flow_removed *ofr;
2781         struct ofconn *prev = NULL;
2782         struct ofconn *ofconn;
2783
2784         /* Compose the parts of the ofp_flow_removed that require stats. */
2785         ofr = buf->data;
2786         ofr->packet_count = htonll(stats.n_packets);
2787         ofr->byte_count = htonll(stats.n_bytes);
2788
2789         LIST_FOR_EACH (ofconn, node, &p->all_conns) {
2790             if (rconn_is_connected(ofconn->rconn)) {
2791                 if (prev) {
2792                     queue_tx(ofpbuf_clone(buf), prev, prev->reply_counter);
2793                 }
2794                 prev = ofconn;
2795             }
2796         }
2797         if (prev) {
2798             queue_tx(buf, prev, prev->reply_counter);
2799         } else {
2800             ofpbuf_delete(buf);
2801         }
2802     }
2803     free(ofproto_rule);
2804
2805     return 0;
2806 }
2807
2808 /* pinsched callback for sending 'packet' on 'ofconn'. */
2809 static void
2810 do_send_packet_in(struct wdp_packet *packet, void *ofconn_)
2811 {
2812     struct ofconn *ofconn = ofconn_;
2813
2814     rconn_send_with_limit(ofconn->rconn, packet->payload,
2815                           ofconn->packet_in_counter, 100);
2816     packet->payload = NULL;
2817     wdp_packet_destroy(packet);
2818 }
2819
2820 /* Takes 'packet', which has been converted with do_convert_to_packet_in(), and
2821  * finalizes its content for sending on 'ofconn', and passes it to 'ofconn''s
2822  * packet scheduler for sending.
2823  *
2824  * 'max_len' specifies the maximum number of bytes of the packet to send on
2825  * 'ofconn' (INT_MAX specifies no limit).
2826  *
2827  * If 'clone' is true, the caller retains ownership of 'packet'.  Otherwise,
2828  * ownership is transferred to this function. */
2829 static void
2830 schedule_packet_in(struct ofconn *ofconn, struct wdp_packet *packet,
2831                    int max_len, bool clone)
2832 {
2833     struct ofproto *ofproto = ofconn->ofproto;
2834     struct ofp_packet_in *opi = packet->payload->data;
2835     int send_len, trim_size;
2836     uint32_t buffer_id;
2837
2838     /* Get buffer. */
2839     if (opi->reason == OFPR_ACTION) {
2840         buffer_id = UINT32_MAX;
2841     } else if (ofproto->fail_open && fail_open_is_active(ofproto->fail_open)) {
2842         buffer_id = pktbuf_get_null();
2843     } else if (!ofconn->pktbuf) {
2844         buffer_id = UINT32_MAX;
2845     } else {
2846         struct ofpbuf payload;
2847         payload.data = opi->data;
2848         payload.size = (packet->payload->size
2849                         - offsetof(struct ofp_packet_in, data));
2850         buffer_id = pktbuf_save(ofconn->pktbuf, &payload, packet->in_port);
2851     }
2852
2853     /* Figure out how much of the packet to send. */
2854     send_len = ntohs(opi->total_len);
2855     if (buffer_id != UINT32_MAX) {
2856         send_len = MIN(send_len, ofconn->miss_send_len);
2857     }
2858     send_len = MIN(send_len, max_len);
2859
2860     /* Adjust packet length and clone if necessary. */
2861     trim_size = offsetof(struct ofp_packet_in, data) + send_len;
2862     if (clone) {
2863         packet = wdp_packet_clone(packet, trim_size);
2864         opi = packet->payload->data;
2865     } else {
2866         packet->payload->size = trim_size;
2867     }
2868
2869     /* Update packet headers. */
2870     opi->buffer_id = htonl(buffer_id);
2871     update_openflow_length(packet->payload);
2872
2873     /* Hand over to packet scheduler.  It might immediately call into
2874      * do_send_packet_in() or it might buffer it for a while (until a later
2875      * call to pinsched_run()). */
2876     pinsched_send(ofconn->schedulers[opi->reason], packet->in_port,
2877                   packet, do_send_packet_in, ofconn);
2878 }
2879
2880 /* Converts 'packet->payload' to a struct ofp_packet_in.  It must have
2881  * sufficient headroom to do so (e.g. as returned by xfif_recv()).
2882  *
2883  * The conversion is not complete: the caller still needs to trim any unneeded
2884  * payload off the end of the buffer, set the length in the OpenFlow header,
2885  * and set buffer_id.  Those require us to know the controller settings and so
2886  * must be done on a per-controller basis.
2887  *
2888  * Returns the maximum number of bytes of the packet that should be sent to
2889  * the controller (INT_MAX if no limit). */
2890 static int
2891 do_convert_to_packet_in(struct wdp_packet *packet)
2892 {
2893     uint16_t total_len = packet->payload->size;
2894     struct ofp_packet_in *opi;
2895
2896     /* Repurpose packet buffer by overwriting header. */
2897     opi = ofpbuf_push_zeros(packet->payload,
2898                             offsetof(struct ofp_packet_in, data));
2899     opi->header.version = OFP_VERSION;
2900     opi->header.type = OFPT_PACKET_IN;
2901     opi->total_len = htons(total_len);
2902     opi->in_port = htons(packet->in_port);
2903     if (packet->channel == WDP_CHAN_MISS) {
2904         opi->reason = OFPR_NO_MATCH;
2905         return INT_MAX;
2906     } else {
2907         opi->reason = OFPR_ACTION;
2908         return packet->send_len;
2909     }
2910 }
2911
2912 /* Given 'packet' with channel WDP_CHAN_ACTION or WDP_CHAN_MISS, sends an
2913  * OFPT_PACKET_IN message to each OpenFlow controller as necessary according to
2914  * their individual configurations.
2915  *
2916  * 'packet->payload' must have sufficient headroom to convert it into a struct
2917  * ofp_packet_in (e.g. as returned by dpif_recv()).
2918  *
2919  * Takes ownership of 'packet'. */
2920 static void
2921 send_packet_in(struct ofproto *ofproto, struct wdp_packet *packet)
2922 {
2923     struct ofconn *ofconn, *prev;
2924     int max_len;
2925
2926     max_len = do_convert_to_packet_in(packet);
2927
2928     prev = NULL;
2929     LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
2930         if (ofconn_receives_async_msgs(ofconn)) {
2931             if (prev) {
2932                 schedule_packet_in(prev, packet, max_len, true);
2933             }
2934             prev = ofconn;
2935         }
2936     }
2937     if (prev) {
2938         schedule_packet_in(prev, packet, max_len, false);
2939     } else {
2940         wdp_packet_destroy(packet);
2941     }
2942 }
2943
2944 static uint64_t
2945 pick_datapath_id(const struct ofproto *ofproto)
2946 {
2947     struct wdp_port port;
2948
2949     if (!wdp_port_query_by_number(ofproto->wdp, OFPP_LOCAL, &port)) {
2950         uint8_t ea[ETH_ADDR_LEN];
2951         int error;
2952
2953         error = netdev_get_etheraddr(port.netdev, ea);
2954         if (!error) {
2955             wdp_port_free(&port);
2956             return eth_addr_to_uint64(ea);
2957         }
2958         VLOG_WARN("could not get MAC address for %s (%s)",
2959                   netdev_get_name(port.netdev), strerror(error));
2960         wdp_port_free(&port);
2961     }
2962
2963     return ofproto->fallback_dpid;
2964 }
2965
2966 static uint64_t
2967 pick_fallback_dpid(void)
2968 {
2969     uint8_t ea[ETH_ADDR_LEN];
2970     eth_addr_nicira_random(ea);
2971     return eth_addr_to_uint64(ea);
2972 }