X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=lib%2Flearning-switch.c;h=e674881bc52885e330f3f4751d7ba5612a2dbda7;hb=b798ce34c318f7e63a7c736b428c8b77fb069819;hp=d41602d6ba88ae34e756a565857f615ca81db2fb;hpb=b3b28afb7bef9094d05fcc8c1be4a41f9f1d5bfe;p=sliver-openvswitch.git diff --git a/lib/learning-switch.c b/lib/learning-switch.c index d41602d6b..e674881bc 100644 --- a/lib/learning-switch.c +++ b/lib/learning-switch.c @@ -40,13 +40,15 @@ #include #include -#include "buffer.h" #include "flow.h" #include "mac-learning.h" +#include "ofpbuf.h" #include "ofp-print.h" -#include "openflow.h" +#include "openflow/openflow.h" +#include "poll-loop.h" #include "queue.h" #include "rconn.h" +#include "stp.h" #include "timeval.h" #include "vconn.h" #include "xtoxll.h" @@ -54,23 +56,60 @@ #define THIS_MODULE VLM_learning_switch #include "vlog.h" +enum port_state { + P_DISABLED = 1 << 0, + P_LISTENING = 1 << 1, + P_LEARNING = 1 << 2, + P_FORWARDING = 1 << 3, + P_BLOCKING = 1 << 4 +}; + struct lswitch { /* If nonnegative, the switch sets up flows that expire after the given * number of seconds (or never expire, if the value is OFP_FLOW_PERMANENT). * Otherwise, the switch processes every packet. */ int max_idle; - uint64_t datapath_id; + unsigned long long int datapath_id; + uint32_t capabilities; time_t last_features_request; struct mac_learning *ml; /* NULL to act as hub instead of switch. */ + + /* Number of outgoing queued packets on the rconn. */ + int n_queued; + + /* Spanning tree protocol implementation. + * + * We implement STP states by, whenever a port's STP state changes, + * querying all the flows on the switch and then deleting any of them that + * are inappropriate for a port's STP state. */ + long long int next_query; /* Next time at which to query all flows. */ + long long int last_query; /* Last time we sent a query. */ + long long int last_reply; /* Last time we received a query reply. */ + unsigned int port_states[STP_MAX_PORTS]; + uint32_t query_xid; /* XID used for query. */ + int n_flows, n_no_recv, n_no_send; }; -static void queue_tx(struct lswitch *, struct rconn *, struct buffer *); +/* The log messages here could actually be useful in debugging, so keep the + * rate limit relatively high. */ +static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(30, 300); + +static void queue_tx(struct lswitch *, struct rconn *, struct ofpbuf *); static void send_features_request(struct lswitch *, struct rconn *); -static void process_packet_in(struct lswitch *, struct rconn *, - struct ofp_packet_in *); -static void process_echo_request(struct lswitch *, struct rconn *, - struct ofp_header *); +static void schedule_query(struct lswitch *, long long int delay); +static bool may_learn(const struct lswitch *, uint16_t port_no); +static bool may_recv(const struct lswitch *, uint16_t port_no, + bool any_actions); +static bool may_send(const struct lswitch *, uint16_t port_no); + +typedef void packet_handler_func(struct lswitch *, struct rconn *, void *); +static packet_handler_func process_switch_features; +static packet_handler_func process_packet_in; +static packet_handler_func process_echo_request; +static packet_handler_func process_port_status; +static packet_handler_func process_phy_port; +static packet_handler_func process_stats_reply; /* Creates and returns a new learning switch. * @@ -85,12 +124,20 @@ static void process_echo_request(struct lswitch *, struct rconn *, struct lswitch * lswitch_create(struct rconn *rconn, bool learn_macs, int max_idle) { - struct lswitch *sw = xmalloc(sizeof *sw); - memset(sw, 0, sizeof *sw); + struct lswitch *sw; + size_t i; + + sw = xcalloc(1, sizeof *sw); sw->max_idle = max_idle; sw->datapath_id = 0; sw->last_features_request = time_now() - 1; sw->ml = learn_macs ? mac_learning_create() : NULL; + sw->next_query = LLONG_MIN; + sw->last_query = LLONG_MIN; + sw->last_reply = LLONG_MIN; + for (i = 0; i < STP_MAX_PORTS; i++) { + sw->port_states[i] = P_DISABLED; + } send_features_request(sw, rconn); return sw; } @@ -105,45 +152,177 @@ lswitch_destroy(struct lswitch *sw) } } +/* Takes care of necessary 'sw' activity, except for receiving packets (which + * the caller must do). */ +void +lswitch_run(struct lswitch *sw, struct rconn *rconn) +{ + long long int now = time_msec(); + + /* If we're waiting for more replies, keeping waiting for up to 10 s. */ + if (sw->last_reply != LLONG_MIN) { + if (now - sw->last_reply > 10000) { + VLOG_ERR_RL(&rl, "%012llx: No more flow stat replies last 10 s", + sw->datapath_id); + sw->last_reply = LLONG_MIN; + sw->last_query = LLONG_MIN; + schedule_query(sw, 0); + } else { + return; + } + } + + /* If we're waiting for any reply at all, keep waiting for up to 10 s. */ + if (sw->last_query != LLONG_MIN) { + if (now - sw->last_query > 10000) { + VLOG_ERR_RL(&rl, "%012llx: No flow stat replies in last 10 s", + sw->datapath_id); + sw->last_query = LLONG_MIN; + schedule_query(sw, 0); + } else { + return; + } + } + + /* If it's time to send another query, do so. */ + if (sw->next_query != LLONG_MIN && now >= sw->next_query) { + sw->next_query = LLONG_MIN; + if (!rconn_is_connected(rconn)) { + schedule_query(sw, 1000); + } else { + struct ofp_stats_request *osr; + struct ofp_flow_stats_request *ofsr; + struct ofpbuf *b; + int error; + + VLOG_DBG("%012llx: Sending flow stats request to implement STP", + sw->datapath_id); + + sw->last_query = now; + sw->query_xid = random_uint32(); + sw->n_flows = 0; + sw->n_no_recv = 0; + sw->n_no_send = 0; + osr = make_openflow_xid(sizeof *osr + sizeof *ofsr, + OFPT_STATS_REQUEST, sw->query_xid, &b); + osr->type = htons(OFPST_FLOW); + osr->flags = htons(0); + ofsr = (struct ofp_flow_stats_request *) osr->body; + ofsr->match.wildcards = htonl(OFPFW_ALL); + ofsr->table_id = 0xff; + ofsr->out_port = htons(OFPP_NONE); + + error = rconn_send(rconn, b, NULL); + if (error) { + VLOG_WARN_RL(&rl, "%012llx: sending flow stats request " + "failed: %s", sw->datapath_id, strerror(error)); + ofpbuf_delete(b); + schedule_query(sw, 1000); + } + } + } +} + +static void +wait_timeout(long long int started) +{ + long long int now = time_msec(); + long long int timeout = 10000 - (now - started); + if (timeout <= 0) { + poll_immediate_wake(); + } else { + poll_timer_wait(timeout); + } +} + +void +lswitch_wait(struct lswitch *sw) +{ + if (sw->last_reply != LLONG_MIN) { + wait_timeout(sw->last_reply); + } else if (sw->last_query != LLONG_MIN) { + wait_timeout(sw->last_query); + } +} + /* Processes 'msg', which should be an OpenFlow received on 'rconn', according * to the learning switch state in 'sw'. The most likely result of processing * is that flow-setup and packet-out OpenFlow messages will be sent out on * 'rconn'. */ void lswitch_process_packet(struct lswitch *sw, struct rconn *rconn, - const struct buffer *msg) + const struct ofpbuf *msg) { - static const size_t min_size[UINT8_MAX + 1] = { - [0 ... UINT8_MAX] = sizeof (struct ofp_header), - [OFPT_FEATURES_REPLY] = sizeof (struct ofp_switch_features), - [OFPT_PACKET_IN] = offsetof (struct ofp_packet_in, data), + struct processor { + uint8_t type; + size_t min_size; + packet_handler_func *handler; }; + static const struct processor processors[] = { + { + OFPT_ECHO_REQUEST, + sizeof(struct ofp_header), + process_echo_request + }, + { + OFPT_FEATURES_REPLY, + sizeof(struct ofp_switch_features), + process_switch_features + }, + { + OFPT_PACKET_IN, + offsetof(struct ofp_packet_in, data), + process_packet_in + }, + { + OFPT_PORT_STATUS, + sizeof(struct ofp_port_status), + process_port_status + }, + { + OFPT_STATS_REPLY, + offsetof(struct ofp_stats_reply, body), + process_stats_reply + }, + { + OFPT_FLOW_EXPIRED, + sizeof(struct ofp_flow_expired), + NULL + }, + }; + const size_t n_processors = ARRAY_SIZE(processors); + const struct processor *p; struct ofp_header *oh; oh = msg->data; - if (msg->size < min_size[oh->type]) { - VLOG_WARN("%s: too short (%zu bytes) for type %"PRIu8" (min %zu)", - rconn_get_name(rconn), - msg->size, oh->type, min_size[oh->type]); + if (sw->datapath_id == 0 + && oh->type != OFPT_ECHO_REQUEST + && oh->type != OFPT_FEATURES_REPLY) { + send_features_request(sw, rconn); return; } - if (oh->type == OFPT_ECHO_REQUEST) { - process_echo_request(sw, rconn, msg->data); - } else if (oh->type == OFPT_FEATURES_REPLY) { - struct ofp_switch_features *osf = msg->data; - sw->datapath_id = osf->datapath_id; - } else if (sw->datapath_id == 0) { - send_features_request(sw, rconn); - } else if (oh->type == OFPT_PACKET_IN) { - process_packet_in(sw, rconn, msg->data); - } else { - if (VLOG_IS_DBG_ENABLED()) { - char *p = ofp_to_string(msg->data, msg->size, 2); - VLOG_DBG("OpenFlow packet ignored: %s", p); - free(p); + for (p = processors; p < &processors[n_processors]; p++) { + if (oh->type == p->type) { + if (msg->size < p->min_size) { + VLOG_WARN_RL(&rl, "%012llx: %s: too short (%zu bytes) for " + "type %"PRIu8" (min %zu)", sw->datapath_id, + rconn_get_name(rconn), msg->size, oh->type, + p->min_size); + return; + } + if (p->handler) { + (p->handler)(sw, rconn, msg->data); + } + return; } } + if (VLOG_IS_DBG_ENABLED()) { + char *p = ofp_to_string(msg->data, msg->size, 2); + VLOG_DBG_RL(&rl, "%012llx: OpenFlow packet ignored: %s", + sw->datapath_id, p); + free(p); + } } static void @@ -151,26 +330,15 @@ send_features_request(struct lswitch *sw, struct rconn *rconn) { time_t now = time_now(); if (now >= sw->last_features_request + 1) { - struct buffer *b; - struct ofp_header *ofr; + struct ofpbuf *b; struct ofp_switch_config *osc; /* Send OFPT_FEATURES_REQUEST. */ - b = buffer_new(0); - ofr = buffer_put_uninit(b, sizeof *ofr); - memset(ofr, 0, sizeof *ofr); - ofr->type = OFPT_FEATURES_REQUEST; - ofr->version = OFP_VERSION; - ofr->length = htons(sizeof *ofr); + make_openflow(sizeof(struct ofp_header), OFPT_FEATURES_REQUEST, &b); queue_tx(sw, rconn, b); /* Send OFPT_SET_CONFIG. */ - b = buffer_new(0); - osc = buffer_put_uninit(b, sizeof *osc); - memset(osc, 0, sizeof *osc); - osc->header.type = OFPT_SET_CONFIG; - osc->header.version = OFP_VERSION; - osc->header.length = htons(sizeof *osc); + osc = make_openflow(sizeof *osc, OFPT_SET_CONFIG, &b); osc->flags = htons(OFPC_SEND_FLOW_EXP); osc->miss_send_len = htons(OFP_DEFAULT_MISS_SEND_LEN); queue_tx(sw, rconn, b); @@ -180,32 +348,58 @@ send_features_request(struct lswitch *sw, struct rconn *rconn) } static void -queue_tx(struct lswitch *sw, struct rconn *rconn, struct buffer *b) +queue_tx(struct lswitch *sw, struct rconn *rconn, struct ofpbuf *b) { - int retval = rconn_send(rconn, b); - if (retval) { + int retval = rconn_send_with_limit(rconn, b, &sw->n_queued, 10); + if (retval && retval != ENOTCONN) { if (retval == EAGAIN) { - /* FIXME: ratelimit. */ - VLOG_WARN("%s: tx queue overflow", rconn_get_name(rconn)); - } else if (retval == ENOTCONN) { - /* Ignore. */ + VLOG_WARN_RL(&rl, "%012llx: %s: tx queue overflow", + sw->datapath_id, rconn_get_name(rconn)); } else { - /* FIXME: ratelimit. */ - VLOG_WARN("%s: send: %s", rconn_get_name(rconn), strerror(retval)); + VLOG_WARN_RL(&rl, "%012llx: %s: send: %s", + sw->datapath_id, rconn_get_name(rconn), + strerror(retval)); } - buffer_delete(b); } } static void -process_packet_in(struct lswitch *sw, struct rconn *rconn, - struct ofp_packet_in *opi) +schedule_query(struct lswitch *sw, long long int delay) +{ + long long int now = time_msec(); + if (sw->next_query == LLONG_MIN || sw->next_query > now + delay) { + sw->next_query = now + delay; + } +} + +static void +process_switch_features(struct lswitch *sw, struct rconn *rconn, void *osf_) { + struct ofp_switch_features *osf = osf_; + size_t n_ports = ((ntohs(osf->header.length) + - offsetof(struct ofp_switch_features, ports)) + / sizeof *osf->ports); + size_t i; + + sw->datapath_id = ntohll(osf->datapath_id); + sw->capabilities = ntohl(osf->capabilities); + for (i = 0; i < n_ports; i++) { + process_phy_port(sw, rconn, &osf->ports[i]); + } + if (sw->capabilities & OFPC_STP) { + schedule_query(sw, 1000); + } +} + +static void +process_packet_in(struct lswitch *sw, struct rconn *rconn, void *opi_) +{ + struct ofp_packet_in *opi = opi_; uint16_t in_port = ntohs(opi->in_port); uint16_t out_port = OFPP_FLOOD; size_t pkt_ofs, pkt_len; - struct buffer pkt; + struct ofpbuf pkt; struct flow flow; /* Extract flow data from 'opi' into 'flow'. */ @@ -215,19 +409,33 @@ process_packet_in(struct lswitch *sw, struct rconn *rconn, pkt.size = pkt_len; flow_extract(&pkt, in_port, &flow); - if (sw->ml) { + if (may_learn(sw, in_port) && sw->ml) { if (mac_learning_learn(sw->ml, flow.dl_src, in_port)) { - VLOG_DBG("learned that "ETH_ADDR_FMT" is on datapath %" - PRIx64" port %"PRIu16, ETH_ADDR_ARGS(flow.dl_src), - ntohll(sw->datapath_id), in_port); + VLOG_DBG_RL(&rl, "%012llx: learned that "ETH_ADDR_FMT" is on " + "port %"PRIu16, sw->datapath_id, + ETH_ADDR_ARGS(flow.dl_src), in_port); + } + } + + if (eth_addr_is_reserved(flow.dl_src)) { + goto drop_it; + } + + if (!may_recv(sw, in_port, false)) { + /* STP prevents receiving anything on this port. */ + goto drop_it; + } + + if (sw->ml) { + uint16_t learned_port = mac_learning_lookup(sw->ml, flow.dl_dst); + if (may_send(sw, learned_port)) { + out_port = learned_port; } - out_port = mac_learning_lookup(sw->ml, flow.dl_dst); } if (in_port == out_port) { - /* The input and output port match. Set up a flow to drop packets. */ - queue_tx(sw, rconn, make_add_flow(&flow, ntohl(opi->buffer_id), - sw->max_idle, 0)); + /* Don't send out packets on their input ports. */ + goto drop_it; } else if (sw->max_idle >= 0 && (!sw->ml || out_port != OFPP_FLOOD)) { /* The output port is known, or we always flood everything, so add a * new flow. */ @@ -242,7 +450,7 @@ process_packet_in(struct lswitch *sw, struct rconn *rconn, } else { /* We don't know that MAC, or we don't set up flows. Send along the * packet without setting up a flow. */ - struct buffer *b; + struct ofpbuf *b; if (ntohl(opi->buffer_id) == UINT32_MAX) { b = make_unbuffered_packet_out(&pkt, in_port, out_port); } else { @@ -251,11 +459,230 @@ process_packet_in(struct lswitch *sw, struct rconn *rconn, } queue_tx(sw, rconn, b); } + return; + +drop_it: + if (sw->max_idle >= 0) { + /* Set up a flow to drop packets. */ + queue_tx(sw, rconn, make_add_flow(&flow, ntohl(opi->buffer_id), + sw->max_idle, 0)); + } else { + /* Just drop the packet, since we don't set up flows at all. + * XXX we should send a packet_out with no actions if buffer_id != + * UINT32_MAX, to avoid clogging the kernel buffers. */ + } + return; } static void -process_echo_request(struct lswitch *sw, struct rconn *rconn, - struct ofp_header *rq) +process_echo_request(struct lswitch *sw, struct rconn *rconn, void *rq_) { + struct ofp_header *rq = rq_; queue_tx(sw, rconn, make_echo_reply(rq)); } + +static void +process_port_status(struct lswitch *sw, struct rconn *rconn, void *ops_) +{ + struct ofp_port_status *ops = ops_; + process_phy_port(sw, rconn, &ops->desc); +} + +static void +process_phy_port(struct lswitch *sw, struct rconn *rconn, void *opp_) +{ + const struct ofp_phy_port *opp = opp_; + uint16_t port_no = ntohs(opp->port_no); + if (sw->capabilities & OFPC_STP && port_no < STP_MAX_PORTS) { + uint32_t config = ntohl(opp->config); + uint32_t state = ntohl(opp->state); + unsigned int *port_state = &sw->port_states[port_no]; + unsigned int new_port_state; + + if (!(config & (OFPPC_NO_STP | OFPPC_PORT_DOWN)) + && !(state & OFPPS_LINK_DOWN)) + { + switch (state & OFPPS_STP_MASK) { + case OFPPS_STP_LISTEN: + new_port_state = P_LISTENING; + break; + case OFPPS_STP_LEARN: + new_port_state = P_LEARNING; + break; + case OFPPS_STP_FORWARD: + new_port_state = P_FORWARDING; + break; + case OFPPS_STP_BLOCK: + new_port_state = P_BLOCKING; + break; + default: + new_port_state = P_DISABLED; + break; + } + } else { + new_port_state = P_FORWARDING; + } + if (*port_state != new_port_state) { + *port_state = new_port_state; + schedule_query(sw, 1000); + } + } +} + +static unsigned int +get_port_state(const struct lswitch *sw, uint16_t port_no) +{ + return (port_no >= STP_MAX_PORTS || !(sw->capabilities & OFPC_STP) + ? P_FORWARDING + : sw->port_states[port_no]); +} + +static bool +may_learn(const struct lswitch *sw, uint16_t port_no) +{ + return get_port_state(sw, port_no) & (P_LEARNING | P_FORWARDING); +} + +static bool +may_recv(const struct lswitch *sw, uint16_t port_no, bool any_actions) +{ + unsigned int state = get_port_state(sw, port_no); + return !(any_actions + ? state & (P_DISABLED | P_LISTENING | P_BLOCKING) + : state & (P_DISABLED | P_LISTENING | P_BLOCKING | P_LEARNING)); +} + +static bool +may_send(const struct lswitch *sw, uint16_t port_no) +{ + return get_port_state(sw, port_no) & P_FORWARDING; +} + +static void +process_flow_stats(struct lswitch *sw, struct rconn *rconn, + const struct ofp_flow_stats *ofs) +{ + const char *end = (char *) ofs + ntohs(ofs->length); + bool delete = false; + + /* Decide to delete the flow if it matches on an STP-disabled physical + * port. But don't delete it if the flow just drops all received packets, + * because that's a perfectly reasonable thing to do for disabled physical + * ports. */ + if (!(ofs->match.wildcards & htonl(OFPFW_IN_PORT))) { + if (!may_recv(sw, ntohs(ofs->match.in_port), + end > (char *) ofs->actions)) { + delete = true; + sw->n_no_recv++; + } + } + + /* Decide to delete the flow if it forwards to an STP-disabled physical + * port. */ + if (!delete) { + const struct ofp_action_header *a; + size_t len; + + for (a = ofs->actions; (char *) a < end; a += len / 8) { + uint16_t type; + + len = ntohs(a->len); + if (len > end - (char *) a) { + VLOG_DBG_RL(&rl, "%012llx: action exceeds available space " + "(%zu > %td)", + sw->datapath_id, len, end - (char *) a); + break; + } else if (len % 8) { + VLOG_DBG_RL(&rl, "%012llx: action length (%zu) not multiple " + "of 8 bytes", sw->datapath_id, len); + break; + } + + type = ntohs(a->type); + if (a->type == htons(OFPAT_OUTPUT)) { + struct ofp_action_output *oao = (struct ofp_action_output *) a; + if (!may_send(sw, ntohs(oao->port))) { + delete = true; + sw->n_no_send++; + break; + } + } + } + } + + /* Delete the flow. */ + if (delete) { + struct ofp_flow_mod *ofm; + struct ofpbuf *b; + + ofm = make_openflow(offsetof(struct ofp_flow_mod, actions), + OFPT_FLOW_MOD, &b); + ofm->match = ofs->match; + ofm->command = OFPFC_DELETE_STRICT; + rconn_send(rconn, b, NULL); + } +} + +static void +process_stats_reply(struct lswitch *sw, struct rconn *rconn, void *osr_) +{ + struct ofp_stats_reply *osr = osr_; + const uint8_t *body = osr->body; + const uint8_t *pos = body; + size_t body_len; + + if (sw->last_query == LLONG_MIN + || osr->type != htons(OFPST_FLOW) + || osr->header.xid != sw->query_xid) { + return; + } + body_len = (ntohs(osr->header.length) + - offsetof(struct ofp_stats_reply, body)); + for (;;) { + const struct ofp_flow_stats *fs; + ptrdiff_t bytes_left = body + body_len - pos; + size_t length; + + if (bytes_left < sizeof *fs) { + if (bytes_left != 0) { + VLOG_WARN_RL(&rl, "%012llx: %td leftover bytes in flow " + "stats reply", sw->datapath_id, bytes_left); + } + break; + } + + fs = (const void *) pos; + length = ntohs(fs->length); + if (length < sizeof *fs) { + VLOG_WARN_RL(&rl, "%012llx: flow stats length %zu is shorter than " + "min %zu", sw->datapath_id, length, sizeof *fs); + break; + } else if (length > bytes_left) { + VLOG_WARN_RL(&rl, "%012llx: flow stats length %zu but only %td " + "bytes left", sw->datapath_id, length, bytes_left); + break; + } else if ((length - sizeof *fs) % sizeof fs->actions[0]) { + VLOG_WARN_RL(&rl, "%012llx: flow stats length %zu has %zu bytes " + "left over in final action", sw->datapath_id, length, + (length - sizeof *fs) % sizeof fs->actions[0]); + break; + } + + sw->n_flows++; + process_flow_stats(sw, rconn, fs); + + pos += length; + } + if (!(osr->flags & htons(OFPSF_REPLY_MORE))) { + VLOG_DBG("%012llx: Deleted %d of %d received flows to " + "implement STP, %d because of no-recv, %d because of " + "no-send", sw->datapath_id, + sw->n_no_recv + sw->n_no_send, sw->n_flows, + sw->n_no_recv, sw->n_no_send); + sw->last_query = LLONG_MIN; + sw->last_reply = LLONG_MIN; + } else { + sw->last_reply = time_msec(); + } +} +