Make dumping large numbers of flows possible.
author Ben Pfaff <blp@cs.stanford.edu>
Tue, 29 Apr 2008 01:16:05 +0000 (18:16 -0700)
committer Ben Pfaff <blp@nicira.com>
Tue, 29 Apr 2008 17:12:46 +0000 (10:12 -0700)
This changes the kernel switch implementation to use the kernel Netlink
"dump" interface to allow flow stats that don't fit in the socket buffer
to be dumped gradually as the caller drains the socket buffer.

One of the changes here is a bug fix for nla_unreserve.  Because Netlink
attributes' lengths are rounded up to a multiple of 4 bytes, reducing
the length of the payload by N bytes doesn't necessarily reduce the
length of the skb by N bytes.  Instead, we need to know the original
length and final length of the attribute.  This means that using 'len'
as a difference in bytes doesn't really make sense, so this changes
'len' to be the new length of the attribute payload and renames the
function to nla_shrink to (IMO) better reflect what it is now doing.

Since we have to release the RCU read lock between calls to the dump
function, we need table iterators that persist across RCU epochs.  One
way to do this would be to add new "iterator_save" and "iterator_restore"
functions, but this seemed like overkill since there would then be a
total of 5 iterator functions that have only one user (flow stats dumping).
Instead, this patch refactors table iteration into a single "iterate"
function that takes a callback.  This simplifies the table iteration code
significantly.

This change also modifies dpctl to understand the new format of flow
stats.

datapath/datapath.c
datapath/flow.h
datapath/forward.c
datapath/table-hash.c
datapath/table-linear.c
datapath/table.h
datapath/table_t.c
lib/dpif.c
utilities/dpctl.c

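diff --git a/datapath/datapath.c b/datapath/datapath.c
index a039590..7d2a44c 100644
--- a/datapath/datapath.c
+++ b/datapath/datapath.c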
@@ -78,78 +78,116 @@ static int dp_maint_func(void *data);
 static int send_port_status(struct net_bridge_port *p, uint8_t status);
 
 
-/* nla_unreserve - reduce amount of space reserved by nla_reserve  
+/* nla_shrink - reduce amount of space reserved by nla_reserve
  * @skb: socket buffer from which to recover room
  * @nla: netlink attribute to adjust
- * @len: amount by which to reduce attribute payload
+ * @len: new length of attribute payload
  *
  * Reduces amount of space reserved by a call to nla_reserve.
  *
  * No other attributes may be added between calling nla_reserve and this
  * function, since it will create a hole in the message.
  */
-void nla_unreserve(struct sk_buff *skb, struct nlattr *nla, int len)
+void nla_shrink(struct sk_buff *skb, struct nlattr *nla, int len)
 {
-       skb->tail -= len;
-       skb->len  -= len;
-
-       nla->nla_len -= len;
+       int delta = nla_total_size(len) - nla_total_size(nla_len(nla));
+       BUG_ON(delta > 0);
+       skb->tail += delta;
+       skb->len  += delta;
+       nla->nla_len = nla_attr_size(len);
 }
 
+/* Puts a set of openflow headers for a message of the given 'type' into 'skb'.
+ * If 'sender' is nonnull, then it is used as the message's destination.  'dp'
+ * must specify the datapath to use.
+ *
+ * '*max_openflow_len' receives the maximum number of bytes that are available
+ * for the embedded OpenFlow message.  The caller must call
+ * resize_openflow_skb() to set the actual size of the message to this number
+ * of bytes or less.
+ *
+ * Returns the openflow header if successful, otherwise (if 'skb' is too small)
+ * an ERR_PTR()-encoded error code (check with IS_ERR()). */
 static void *
-alloc_openflow_skb(struct datapath *dp, size_t openflow_len, uint8_t type,
-                  const struct sender *sender, struct sk_buff **pskb) 
+put_openflow_headers(struct datapath *dp, struct sk_buff *skb, uint8_t type,
+                    const struct sender *sender, int *max_openflow_len)
 {
-       size_t genl_len;
-       struct sk_buff *skb;
-       struct nlattr *attr;
        struct ofp_header *oh;
-
-       genl_len = nlmsg_total_size(GENL_HDRLEN + dp_genl_family.hdrsize);
-       genl_len += nla_total_size(sizeof(uint32_t)); /* DP_GENL_A_DP_IDX */
-       genl_len += nla_total_size(openflow_len);    /* DP_GENL_A_OPENFLOW */
-       skb = *pskb = genlmsg_new(genl_len, GFP_ATOMIC);
-       if (!skb) {
-               if (net_ratelimit())
-                       printk("alloc_openflow_skb: genlmsg_new failed\n");
-               return NULL;
-       }
+       struct nlattr *attr;
+       int openflow_len;
 
        /* Assemble the Generic Netlink wrapper. */
        if (!genlmsg_put(skb,
                         sender ? sender->pid : 0,
                         sender ? sender->seq : 0,
                         &dp_genl_family, 0, DP_GENL_C_OPENFLOW))
-               BUG();
+               return ERR_PTR(-ENOBUFS);
        if (nla_put_u32(skb, DP_GENL_A_DP_IDX, dp->dp_idx) < 0)
-               BUG();
+               return ERR_PTR(-ENOBUFS);
+       openflow_len = (skb_tailroom(skb) - NLA_HDRLEN) & ~(NLA_ALIGNTO - 1);
+       if (openflow_len < sizeof *oh)
+               return ERR_PTR(-ENOBUFS);
+       *max_openflow_len = openflow_len;
        attr = nla_reserve(skb, DP_GENL_A_OPENFLOW, openflow_len);
        BUG_ON(!attr);
-       nlmsg_end(skb, (struct nlmsghdr *) skb->data);
 
-       /* Fill in the header. */
+       /* Fill in the header.  The caller is responsible for the length. */
        oh = nla_data(attr);
        oh->version = OFP_VERSION;
        oh->type = type;
-       oh->length = htons(openflow_len);
        oh->xid = sender ? sender->xid : 0;
 
        return oh;
 }
 
+/* Resizes OpenFlow header 'oh', which must be at the tail end of 'skb', to new
+ * length 'new_length' (in bytes), adjusting pointers and size values as
+ * necessary. */
 static void
 resize_openflow_skb(struct sk_buff *skb,
                    struct ofp_header *oh, size_t new_length)
 {
-       struct nlattr *attr;
-
-       BUG_ON(new_length > ntohs(oh->length));
-       attr = ((void *) oh) - NLA_HDRLEN;
-       nla_unreserve(skb, attr, ntohs(oh->length) - new_length);
+       struct nlattr *attr = ((void *) oh) - NLA_HDRLEN;
+       nla_shrink(skb, attr, new_length);
        oh->length = htons(new_length);
        nlmsg_end(skb, (struct nlmsghdr *) skb->data);
 }
 
+/* Allocates a new skb to contain an OpenFlow message 'openflow_len' bytes in
+ * length.  Returns a null pointer if memory is unavailable, otherwise returns
+ * the OpenFlow header and stores a pointer to the skb in '*pskb'. 
+ *
+ * 'type' is the OpenFlow message type.  If 'sender' is nonnull, then it is
+ * used as the message's destination.  'dp' must specify the datapath to
+ * use.  */
+static void *
+alloc_openflow_skb(struct datapath *dp, size_t openflow_len, uint8_t type,
+                  const struct sender *sender, struct sk_buff **pskb) 
+{
+       struct ofp_header *oh;
+       size_t genl_len;
+       struct sk_buff *skb;
+       int max_openflow_len;
+
+       genl_len = nlmsg_total_size(GENL_HDRLEN + dp_genl_family.hdrsize);
+       genl_len += nla_total_size(sizeof(uint32_t)); /* DP_GENL_A_DP_IDX */
+       genl_len += nla_total_size(openflow_len);    /* DP_GENL_A_OPENFLOW */
+       skb = *pskb = genlmsg_new(genl_len, GFP_ATOMIC);
+       if (!skb) {
+               if (net_ratelimit())
+                       printk("alloc_openflow_skb: genlmsg_new failed\n");
+               return NULL;
+       }
+
+       oh = put_openflow_headers(dp, skb, type, sender, &max_openflow_len);
+       BUG_ON(!oh || IS_ERR(oh));
+       resize_openflow_skb(skb, oh, openflow_len);
+
+       return oh;
+}
+
+/* Sends 'skb' to 'sender' if it is nonnull, otherwise multicasts 'skb' to all
+ * listeners. */
 static int
 send_openflow_skb(struct sk_buff *skb, const struct sender *sender) 
 {
@@ -789,58 +827,6 @@ fill_flow_stats(struct ofp_flow_stats *ofs, struct sw_flow *flow,
        ofs->byte_count      = cpu_to_be64(flow->byte_count);
 }
 
-int
-dp_send_flow_stats(struct datapath *dp, const struct sender *sender,
-                  const struct ofp_match *match)
-{
-       struct sk_buff *skb;
-       struct ofp_flow_stats_reply *fsr;
-       size_t header_size, fudge, flow_size;
-       struct sw_flow_key match_key;
-       int table_idx, n_flows, max_flows;
-
-       header_size = offsetof(struct ofp_flow_stats_reply, flows);
-       fudge = 128;
-       flow_size = sizeof fsr->flows[0];
-       max_flows = (NLMSG_GOODSIZE - header_size - fudge) / flow_size;
-       fsr = alloc_openflow_skb(dp, header_size + max_flows * flow_size,
-                                OFPT_FLOW_STATS_REPLY, sender, &skb);
-       if (!fsr)
-               return -ENOMEM;
-
-       n_flows = 0;
-       flow_extract_match(&match_key, match);
-       for (table_idx = 0; table_idx < dp->chain->n_tables; table_idx++) {
-               struct sw_table *table = dp->chain->tables[table_idx];
-               struct swt_iterator iter;
-
-               if (n_flows >= max_flows) {
-                       break;
-               }
-
-               if (!table->iterator(table, &iter)) {
-                       if (net_ratelimit())
-                               printk("iterator failed for table %d\n",
-                                      table_idx);
-                       continue;
-               }
-
-               for (; iter.flow; table->iterator_next(&iter)) {
-                       if (flow_matches(&match_key, &iter.flow->key)) {
-                               fill_flow_stats(&fsr->flows[n_flows],
-                                               iter.flow, table_idx);
-                               if (++n_flows >= max_flows) {
-                                       break;
-                               }
-                       }
-               }
-               table->iterator_destroy(&iter);
-       }
-       resize_openflow_skb(skb, &fsr->header,
-                           header_size + flow_size * n_flows);
-       return send_openflow_skb(skb, sender);
-}
-
 static int 
 fill_port_stats_reply(struct datapath *dp, struct ofp_port_stats_reply *psr)
 {
@@ -1145,12 +1131,156 @@ static struct nla_policy dp_genl_openflow_policy[DP_GENL_A_MAX + 1] = {
        [DP_GENL_A_DP_IDX] = { .type = NLA_U32 },
 };
 
+struct flow_stats_cb_state {
+       int dp_idx;
+       int table_idx;
+       struct sw_table_position position;
+       struct ofp_flow_stats_request *rq;
+       int sent_terminator;
+
+       struct ofp_flow_stats *flows;
+       int n_flows, max_flows;
+};
+
+static int muster_callback(struct sw_flow *flow, void *private)
+{
+       struct flow_stats_cb_state *s = private;
+
+       fill_flow_stats(&s->flows[s->n_flows], flow, s->table_idx);
+       return ++s->n_flows >= s->max_flows;
+}
+
+int
+muster_flow_stats(struct datapath *dp, struct flow_stats_cb_state *s,
+                 const struct sender *sender, struct sk_buff *skb)
+{
+       struct ofp_flow_stats_reply *fsr;
+       size_t header_size, flow_size;
+       struct sw_flow_key match_key;
+       int max_openflow_len;
+       size_t size;
+
+       fsr = put_openflow_headers(dp, skb, OFPT_FLOW_STATS_REPLY, sender,
+                                  &max_openflow_len);
+       if (IS_ERR(fsr))
+               return PTR_ERR(fsr);
+       resize_openflow_skb(skb, &fsr->header, max_openflow_len);
+
+       header_size = offsetof(struct ofp_flow_stats_reply, flows);
+       flow_size = sizeof fsr->flows[0];
+       s->max_flows = (max_openflow_len - header_size) / flow_size;
+       if (s->max_flows <= 0)
+               return -ENOMEM;
+       s->flows = fsr->flows;
+
+       flow_extract_match(&match_key, &s->rq->match);
+       s->n_flows = 0;
+       while (s->table_idx < dp->chain->n_tables
+              && (s->rq->table_id == 0xff || s->rq->table_id == s->table_idx))
+       {
+               struct sw_table *table = dp->chain->tables[s->table_idx];
+
+               if (table->iterate(table, &match_key, &s->position,
+                                  muster_callback, s))
+                       break;
+
+               s->table_idx++;
+               memset(&s->position, 0, sizeof s->position);
+       }
+       if (!s->n_flows) {
+               /* Signal dump completion. */
+               if (s->sent_terminator) {
+                       return 0;
+               }
+               s->sent_terminator = 1;
+       }
+       size = header_size + flow_size * s->n_flows;
+       resize_openflow_skb(skb, &fsr->header, size);
+       return skb->len;
+}
+
+static int
+dp_genl_openflow_dumpit(struct sk_buff *skb, struct netlink_callback *cb)
+{
+       struct datapath *dp;
+       struct sender sender;
+       struct flow_stats_cb_state *state;
+       int err;
+
+       if (!cb->args[0]) {
+               struct nlattr *attrs[DP_GENL_A_MAX + 1];
+               struct ofp_flow_stats_request *rq;
+               struct nlattr *va;
+
+               err = nlmsg_parse(cb->nlh, GENL_HDRLEN, attrs, DP_GENL_A_MAX,
+                                 dp_genl_openflow_policy);
+               if (err < 0)
+                       return err;
+
+               if (!attrs[DP_GENL_A_DP_IDX])
+                       return -EINVAL;
+
+               va = attrs[DP_GENL_A_OPENFLOW];
+               if (!va || nla_len(va) != sizeof *state->rq)
+                       return -EINVAL;
+
+               rq = nla_data(va);
+               if (rq->header.version != OFP_VERSION
+                   || rq->header.type != OFPT_FLOW_STATS_REQUEST
+                   || ntohs(rq->header.length) != sizeof *rq)
+                       return -EINVAL;
+
+               state = kmalloc(sizeof *state, GFP_KERNEL);
+               if (!state)
+                       return -ENOMEM;
+               state->dp_idx = nla_get_u32(attrs[DP_GENL_A_DP_IDX]);
+               state->table_idx = rq->table_id == 0xff ? 0 : rq->table_id;
+               memset(&state->position, 0, sizeof state->position);
+               state->rq = rq;
+               state->sent_terminator = 0;
+
+               cb->args[0] = (long) state;
+       } else {
+               state = (struct flow_stats_cb_state *) cb->args[0];
+       }
+
+       if (state->rq->type != OFPFS_INDIV) {
+               return -ENOTSUPP;
+       }
+
+       rcu_read_lock();
+       dp = dp_get(state->dp_idx);
+       if (!dp) {
+               err = -ENOENT;
+               goto out;
+       }
+
+       sender.xid = state->rq->header.xid;
+       sender.pid = NETLINK_CB(cb->skb).pid;
+       sender.seq = cb->nlh->nlmsg_seq;
+       err = muster_flow_stats(dp, state, &sender, skb);
+
+out:
+       rcu_read_unlock();
+       return err;
+}
+
+static int
+dp_genl_openflow_done(struct netlink_callback *cb)
+{
+       struct flow_stats_cb_state *state;
+       state = (struct flow_stats_cb_state *) cb->args[0];
+       kfree(state);
+       return 0;
+}
+
 static struct genl_ops dp_genl_ops_openflow = {
        .cmd = DP_GENL_C_OPENFLOW,
        .flags = GENL_ADMIN_PERM, /* Requires CAP_NET_ADMIN privilege. */
        .policy = dp_genl_openflow_policy,
        .doit = dp_genl_openflow,
-       .dumpit = NULL,
+       .dumpit = dp_genl_openflow_dumpit,
+       .done = dp_genl_openflow_done,
 };
 
 static struct nla_policy dp_genl_benchmark_policy[DP_GENL_A_MAX + 1] = {
diff --git a/datapath/flow.h b/datapath/flow.h
index 3a34a9e..5d1e471 100644
--- a/datapath/flow.h
+++ b/datapath/flow.h
@@ -79,6 +79,8 @@ struct sw_flow {
                struct list_head node;
                struct hlist_node hnode;
        } u;
+       struct list_head iter_node;
+       unsigned long serial;
 
        spinlock_t lock;         /* Lock this entry...mostly for stat updates */
        unsigned long init_time; /* When the flow was created (in jiffies). */
diff --git a/datapath/forward.c b/datapath/forward.c
index aa40000..bfc5815 100644
--- a/datapath/forward.c
+++ b/datapath/forward.c
@@ -438,19 +438,6 @@ recv_flow(struct sw_chain *chain, const struct sender *sender, const void *msg)
        }
 }
 
-static int
-recv_flow_stats_request(struct sw_chain *chain, const struct sender *sender,
-                        const void *msg)
-{
-       const struct ofp_flow_stats_request *fsr = msg;
-       if (fsr->type == OFPFS_INDIV) {
-               return dp_send_flow_stats(chain->dp, sender, &fsr->match); 
-       } else {
-               /* FIXME */
-               return -ENOTSUPP;
-       }
-}
-
 static int
 recv_port_stats_request(struct sw_chain *chain, const struct sender *sender,
                         const void *msg)
@@ -503,10 +490,6 @@ fwd_control_input(struct sw_chain *chain, const struct sender *sender,
                        sizeof (struct ofp_port_mod),
                        recv_port_mod,
                },
-               [OFPT_FLOW_STATS_REQUEST] = {
-                       sizeof (struct ofp_flow_stats_request),
-                       recv_flow_stats_request,
-               },
                [OFPT_PORT_STATS_REQUEST] = {
                        sizeof (struct ofp_port_stats_request),
                        recv_port_stats_request,
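diff --git a/datapath/table-hash.c b/datapath/table-hash.c
index 184aa30..c87048f 100644
--- a/datapath/table-hash.c
+++ b/datapath/table-hash.c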
@@ -144,58 +144,37 @@ static void table_hash_destroy(struct sw_table *swt)
        kfree(th);
 }
 
-struct swt_iterator_hash {
-       struct sw_table_hash *th;
-       unsigned int bucket_i;
-};
-
-static struct sw_flow *next_flow(struct swt_iterator_hash *ih)
+static int table_hash_iterate(struct sw_table *swt,
+                             const struct sw_flow_key *key,
+                             struct sw_table_position *position,
+                             int (*callback)(struct sw_flow *, void *private),
+                             void *private) 
 {
-       for (;ih->bucket_i <= ih->th->bucket_mask; ih->bucket_i++) {
-               struct sw_flow *f = ih->th->buckets[ih->bucket_i];
-               if (f != NULL)
-                       return f;
-       }
-
-       return NULL;
-}
-
-static int table_hash_iterator(struct sw_table *swt,
-                               struct swt_iterator *swt_iter)
-{
-       struct swt_iterator_hash *ih;
-
-       swt_iter->private = ih = kmalloc(sizeof *ih, GFP_ATOMIC);
+       struct sw_table_hash *th = (struct sw_table_hash *) swt;
 
-       if (ih == NULL)
+       if (position->private[0] > th->bucket_mask)
                return 0;
 
-       ih->th = (struct sw_table_hash *) swt;
-
-       ih->bucket_i = 0;
-       swt_iter->flow = next_flow(ih);
-
-       return 1;
-}
-
-static void table_hash_next(struct swt_iterator *swt_iter)
-{
-       struct swt_iterator_hash *ih;
-
-       if (swt_iter->flow == NULL)
-               return;
-
-       ih = (struct swt_iterator_hash *) swt_iter->private;
-
-       ih->bucket_i++;
-       swt_iter->flow = next_flow(ih);
-}
-
-static void table_hash_iterator_destroy(struct swt_iterator *swt_iter)
-{
-       kfree(swt_iter->private);
+       if (key->wildcards == 0) {
+               struct sw_flow *flow = table_hash_lookup(swt, key);
+               position->private[0] = -1;
+               return flow ? callback(flow, private) : 0;
+       } else {
+               int i;
+
+               for (i = position->private[0]; i <= th->bucket_mask; i++) {
+                       struct sw_flow *flow = th->buckets[i];
+                       if (flow && flow_matches(key, &flow->key)) {
+                               int error = callback(flow, private);
+                               if (error) {
+                                       position->private[0] = i + 1;
+                                       return error;
+                               }
+                       }
+               }
+               return 0;
+       }
 }
-
 static void table_hash_stats(struct sw_table *swt,
                                 struct sw_table_stats *stats) 
 {
@@ -230,9 +209,7 @@ struct sw_table *table_hash_create(unsigned int polynomial,
        swt->delete = table_hash_delete;
        swt->timeout = table_hash_timeout;
        swt->destroy = table_hash_destroy;
-       swt->iterator = table_hash_iterator;
-       swt->iterator_next = table_hash_next;
-       swt->iterator_destroy = table_hash_iterator_destroy;
+       swt->iterate = table_hash_iterate;
        swt->stats = table_hash_stats;
 
        spin_lock_init(&th->lock);
@@ -295,79 +272,25 @@ static void table_hash2_destroy(struct sw_table *swt)
        kfree(t2);
 }
 
-struct swt_iterator_hash2 {
-       struct sw_table_hash2 *th2;
-       struct swt_iterator ih;
-       uint8_t table_i;
-};
-
-static int table_hash2_iterator(struct sw_table *swt,
-                               struct swt_iterator *swt_iter)
-{
-       struct swt_iterator_hash2 *ih2;
-
-       swt_iter->private = ih2 = kmalloc(sizeof *ih2, GFP_ATOMIC);
-       if (ih2 == NULL)
-               return 0;
-
-       ih2->th2 = (struct sw_table_hash2 *) swt;
-       if (!table_hash_iterator(ih2->th2->subtable[0], &ih2->ih)) {
-               kfree(ih2);
-               return 0;
-       }
-
-       if (ih2->ih.flow != NULL) {
-               swt_iter->flow = ih2->ih.flow;
-               ih2->table_i = 0;
-       } else {
-               table_hash_iterator_destroy(&ih2->ih);
-               ih2->table_i = 1;
-               if (!table_hash_iterator(ih2->th2->subtable[1], &ih2->ih)) {
-                       kfree(ih2);
-                       return 0;
-               }
-               swt_iter->flow = ih2->ih.flow;
-       }
-
-       return 1;
-}
-
-static void table_hash2_next(struct swt_iterator *swt_iter) 
+static int table_hash2_iterate(struct sw_table *swt,
+                              const struct sw_flow_key *key,
+                              struct sw_table_position *position,
+                              int (*callback)(struct sw_flow *, void *),
+                              void *private)
 {
-       struct swt_iterator_hash2 *ih2;
-
-       if (swt_iter->flow == NULL)
-               return;
-
-       ih2 = (struct swt_iterator_hash2 *) swt_iter->private;
-       table_hash_next(&ih2->ih);
+       struct sw_table_hash2 *t2 = (struct sw_table_hash2 *) swt;
+       int i;
 
-       if (ih2->ih.flow != NULL) {
-               swt_iter->flow = ih2->ih.flow;
-       } else {
-               if (ih2->table_i == 0) {
-                       table_hash_iterator_destroy(&ih2->ih);
-                       ih2->table_i = 1;
-                       if (!table_hash_iterator(ih2->th2->subtable[1], &ih2->ih)) {
-                               ih2->ih.private = NULL;
-                               swt_iter->flow = NULL;
-                       } else {
-                               swt_iter->flow = ih2->ih.flow;
-                       }
-               } else {
-                       swt_iter->flow = NULL;
+       for (i = position->private[1]; i < 2; i++) {
+               int error = table_hash_iterate(t2->subtable[i], key, position,
+                                              callback, private);
+               if (error) {
+                       return error;
                }
+               position->private[0] = 0;
+               position->private[1]++;
        }
-}
-
-static void table_hash2_iterator_destroy(struct swt_iterator *swt_iter)
-{
-       struct swt_iterator_hash2 *ih2;
-
-       ih2 = (struct swt_iterator_hash2 *) swt_iter->private;
-       if (ih2->ih.private != NULL)
-               table_hash_iterator_destroy(&ih2->ih);
-       kfree(ih2);
+       return 0;
 }
 
 static void table_hash2_stats(struct sw_table *swt,
@@ -409,12 +332,9 @@ struct sw_table *table_hash2_create(unsigned int poly0, unsigned int buckets0,
        swt->delete = table_hash2_delete;
        swt->timeout = table_hash2_timeout;
        swt->destroy = table_hash2_destroy;
+       swt->iterate = table_hash2_iterate;
        swt->stats = table_hash2_stats;
 
-       swt->iterator = table_hash2_iterator;
-       swt->iterator_next = table_hash2_next;
-       swt->iterator_destroy = table_hash2_iterator_destroy;
-
        return swt;
 
 out_free_subtable0:
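diff --git a/datapath/table-linear.c b/datapath/table-linear.c
index a82b63e..d14d032 100644
--- a/datapath/table-linear.c
+++ b/datapath/table-linear.c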
@@ -19,6 +19,8 @@ struct sw_table_linear {
        unsigned int max_flows;
        atomic_t n_flows;
        struct list_head flows;
+       struct list_head iter_flows;
+       unsigned long int next_serial;
 };
 
 static struct sw_flow *table_linear_lookup(struct sw_table *swt,
@@ -50,7 +52,9 @@ static int table_linear_insert(struct sw_table *swt, struct sw_flow *flow)
                                && f->key.wildcards == flow->key.wildcards
                                && flow_matches(&f->key, &flow->key)
                                && flow_del(f)) {
+                       flow->serial = f->serial;
                        list_replace_rcu(&f->u.node, &flow->u.node);
+                       list_replace_rcu(&f->iter_node, &flow->iter_node);
                        spin_unlock_irqrestore(&tl->lock, flags);
                        flow_deferred_free(f);
                        return 1;
@@ -69,6 +73,7 @@ static int table_linear_insert(struct sw_table *swt, struct sw_flow *flow)
 
        /* Insert the entry immediately in front of where we're pointing. */
        list_add_tail_rcu(&flow->u.node, &f->u.node);
+       list_add_rcu(&flow->iter_node, &tl->iter_flows);
        spin_unlock_irqrestore(&tl->lock, flags);
        return 1;
 }
@@ -77,6 +82,7 @@ static int do_delete(struct sw_table *swt, struct sw_flow *flow)
 {
        if (flow_del(flow)) {
                list_del_rcu(&flow->u.node);
+               list_del_rcu(&flow->iter_node);
                flow_deferred_free(flow);
                return 1;
        }
@@ -132,44 +138,29 @@ static void table_linear_destroy(struct sw_table *swt)
        kfree(tl);
 }
 
-/* Linear table's private data is just a pointer to the table */
-
-static int table_linear_iterator(struct sw_table *swt,
-                                struct swt_iterator *swt_iter) 
+static int table_linear_iterate(struct sw_table *swt,
+                               const struct sw_flow_key *key,
+                               struct sw_table_position *position,
+                               int (*callback)(struct sw_flow *, void *),
+                               void *private)
 {
        struct sw_table_linear *tl = (struct sw_table_linear *) swt;
-
-       swt_iter->private = tl;
-
-       if (atomic_read(&tl->n_flows) == 0)
-               swt_iter->flow = NULL;
-       else
-               swt_iter->flow = list_entry(tl->flows.next,
-                               struct sw_flow, u.node);
-
-       return 1;
-}
-
-static void table_linear_next(struct swt_iterator *swt_iter)
-{
-       struct sw_table_linear *tl;
-       struct list_head *next;
-
-       if (swt_iter->flow == NULL)
-               return;
-
-       tl = (struct sw_table_linear *) swt_iter->private;
-
-       next = swt_iter->flow->u.node.next;
-       if (next == &tl->flows)
-               swt_iter->flow = NULL;
-       else
-               swt_iter->flow = list_entry(next, struct sw_flow, u.node);
+       struct sw_flow *flow;
+       unsigned long start;
+
+       start = ~position->private[0];
+       list_for_each_entry_rcu (flow, &tl->iter_flows, iter_node) {
+               if (flow->serial <= start && flow_matches(key, &flow->key)) {
+                       int error = callback(flow, private);
+                       if (error) {
+                               position->private[0] = ~(flow->serial - 1);
+                               return error;
+                       }
+               }
+       }
+       return 0;
 }
 
-static void table_linear_iterator_destroy(struct swt_iterator *swt_iter)
-{}
-
 static void table_linear_stats(struct sw_table *swt,
                                struct sw_table_stats *stats)
 {
@@ -195,16 +186,15 @@ struct sw_table *table_linear_create(unsigned int max_flows)
        swt->delete = table_linear_delete;
        swt->timeout = table_linear_timeout;
        swt->destroy = table_linear_destroy;
+       swt->iterate = table_linear_iterate;
        swt->stats = table_linear_stats;
 
-               swt->iterator = table_linear_iterator;
-       swt->iterator_next = table_linear_next;
-       swt->iterator_destroy = table_linear_iterator_destroy;
-
        tl->max_flows = max_flows;
        atomic_set(&tl->n_flows, 0);
        INIT_LIST_HEAD(&tl->flows);
+       INIT_LIST_HEAD(&tl->iter_flows);
        spin_lock_init(&tl->lock);
+       tl->next_serial = 0;
 
        return swt;
 }
diff --git a/datapath/table.h b/datapath/table.h
index dcb72b9..34028b4 100644
--- a/datapath/table.h
+++ b/datapath/table.h
@@ -8,12 +8,6 @@ struct sw_flow;
 struct sw_flow_key;
 struct datapath;
 
-/* Iterator through the flows stored in a table. */
-struct swt_iterator {
-       struct sw_flow *flow;   /* Current flow, for use by client. */
-       void *private;
-};
-
 /* Table statistics. */
 struct sw_table_stats {
        const char *name;       /* Human-readable name. */
@@ -21,6 +15,14 @@ struct sw_table_stats {
        unsigned long int max_flows; /* Flow capacity. */
 };
 
+/* Position within an iteration of a sw_table.
+ *
+ * The contents are private to the table implementation, except that a position
+ * initialized to all-zero-bits represents the start of a table. */
+struct sw_table_position {
+       unsigned long private[4];
+};
+
 /* A single table of flows.
  *
  * All functions, except destroy, must be called holding the
@@ -55,9 +57,23 @@ struct sw_table {
        /* Destroys 'table', which must not have any users. */
        void (*destroy)(struct sw_table *table);
 
-       int (*iterator)(struct sw_table *, struct swt_iterator *);
-       void (*iterator_next)(struct swt_iterator *);
-       void (*iterator_destroy)(struct swt_iterator *);
+       /* Iterates through the flow entries in 'table', passing each one that
+        * matches 'key' to 'callback'.  The callback function should return 0
+        * to continue iteration or a nonzero error code to stop.  The iterator
+        * function returns either 0 if the table iteration completed or the
+        * value returned by the callback function otherwise.
+        *
+        * The iteration starts at 'position', which may be initialized to
+        * all-zero-bits to iterate from the beginning of the table.  If the
+        * iteration terminates due to an error from the callback function,
+        * 'position' is updated to a value that can be passed back to the
+        * iterator function to resume iteration later with the following
+        * flow. */
+       int (*iterate)(struct sw_table *table,
+                      const struct sw_flow_key *key,
+                      struct sw_table_position *position,
+                      int (*callback)(struct sw_flow *flow, void *private),
+                      void *private);
 
        /* Dumps statistics for 'table' into 'stats'. */
        void (*stats)(struct sw_table *table, struct sw_table_stats *stats);
diff --git a/datapath/table_t.c b/datapath/table_t.c
index 42687cf..cb017b8 100644
--- a/datapath/table_t.c
+++ b/datapath/table_t.c
@@ -279,6 +279,31 @@ check_no_lookup(struct sw_table *swt, struct list_head *keys)
 }
 
 
+struct check_iteration_state
+{
+       int n_found;
+       struct list_head *to_find;
+       struct list_head *found;
+};
+
+static int
+check_iteration_callback(struct sw_flow *flow, void *private) 
+{
+       struct check_iteration_state *s = private;
+       struct flow_key_entry *entry;
+
+       entry = find_flow(s->to_find, flow);
+       if (entry == NULL) {
+               unit_fail("UNKNOWN ITERATOR FLOW %p", flow);
+               rcu_read_unlock();
+               return 1;
+       }
+       s->n_found++;
+       list_del(&entry->node);
+       list_add(&entry->node, s->found);
+       return 0;
+}
+
 /*
  * Compares an iterator's view of the 'swt' table to the list of
  * flow_key_entrys in 'to_find'.  flow_key_entrys that are matched are removed
@@ -293,36 +318,24 @@ check_no_lookup(struct sw_table *swt, struct list_head *keys)
 static int
 check_iteration(struct sw_table *swt, struct list_head *to_find, struct list_head *found)
 {
-       struct swt_iterator iter;
-       struct flow_key_entry *entry;
-       int n_found = 0;
+       struct sw_flow_key key;
+       struct sw_table_position position;
+       struct check_iteration_state state;
 
-       rcu_read_lock();
-       if (!swt->iterator(swt, &iter)) {
-               rcu_read_unlock();
-               unit_fail("Could not initialize iterator");
-               return -1;
-       }
+       memset(&key, 0, sizeof key);
+       key.wildcards = -1;
 
-       while (iter.flow != NULL) {
-               entry = find_flow(to_find, iter.flow);
-               if (entry == NULL) {
-                       unit_fail("UNKNOWN ITERATOR FLOW %p",
-                                 iter.flow);
-                       swt->iterator_destroy(&iter);
-                       rcu_read_unlock();
-                       return -1;
-               }
-               n_found++;
-               list_del(&entry->node);
-               list_add(&entry->node, found);
-               swt->iterator_next(&iter);
-       }
+       memset(&position, 0, sizeof position);
 
-       swt->iterator_destroy(&iter);
+       state.n_found = 0;
+       state.to_find = to_find;
+       state.found = found;
+
+       rcu_read_lock();
+       swt->iterate(swt, &key, &position, check_iteration_callback, &state);
        rcu_read_unlock();
 
-       return n_found;
+       return state.n_found;
 }
 
 /*
diff --git a/lib/dpif.c b/lib/dpif.c
index 50d34f6..3426899 100644
--- a/lib/dpif.c
+++ b/lib/dpif.c
@@ -192,6 +192,8 @@ error:
 int
 dpif_send_openflow(struct dpif *dp, struct buffer *buffer, bool wait) 
 {
+    struct ofp_header *oh;
+    unsigned int dump_flag;
     struct buffer hdr;
     struct nlattr *nla;
     uint32_t fixed_buffer[64 / 4];
@@ -200,9 +202,14 @@ dpif_send_openflow(struct dpif *dp, struct buffer *buffer, bool wait)
     int n_iov;
     int retval;
 
+    /* The reply to OFPT_FLOW_STATS_REQUEST may be multiple segments long, so
+     * we need to specify NLM_F_DUMP in the request. */
+    oh = buffer_at_assert(buffer, 0, sizeof *oh);
+    dump_flag = oh->type == OFPT_FLOW_STATS_REQUEST ? NLM_F_DUMP : 0;
+
     buffer_use(&hdr, fixed_buffer, sizeof fixed_buffer);
     nl_msg_put_genlmsghdr(&hdr, dp->sock, 32, openflow_family,
-                          NLM_F_REQUEST, DP_GENL_C_OPENFLOW, 1);
+                          NLM_F_REQUEST | dump_flag, DP_GENL_C_OPENFLOW, 1);
     nl_msg_put_u32(&hdr, DP_GENL_A_DP_IDX, dp->dp_idx);
     nla = buffer_put_uninit(&hdr, sizeof *nla);
     nla->nla_len = sizeof *nla + buffer->size;
diff --git a/utilities/dpctl.c b/utilities/dpctl.c
index fc1638d..97feba2 100644
--- a/utilities/dpctl.c
+++ b/utilities/dpctl.c
@@ -562,8 +562,10 @@ str_to_flow(char *string, struct ofp_match *match, struct ofp_action *action,
 static void do_dump_flows(int argc, char *argv[])
 {
     struct vconn *vconn;
-    struct buffer *request, *reply;
+    struct buffer *request;
     struct ofp_flow_stats_request *fsr;
+    uint32_t send_xid;
+    bool done = false;
 
     run(vconn_open_block(argv[1], &vconn), "connecting to %s", argv[1]);
     fsr = alloc_openflow_buffer(sizeof *fsr, OFPT_FLOW_STATS_REQUEST, &request);
@@ -571,8 +573,32 @@ static void do_dump_flows(int argc, char *argv[])
             NULL);
     fsr->type = OFPFS_INDIV;
     fsr->pad = 0;
-    reply = transact_openflow(vconn, request);
-    ofp_print(stdout, reply->data, reply->size, 1);
+
+    send_xid = ((struct ofp_header *) request->data)->xid;
+    send_openflow_buffer(vconn, request);
+    while (!done) {
+        uint32_t recv_xid;
+        struct buffer *reply;
+
+        run(vconn_recv_block(vconn, &reply), "OpenFlow packet receive failed");
+        recv_xid = ((struct ofp_header *) reply->data)->xid;
+        if (send_xid == recv_xid) {
+            struct ofp_flow_stats_reply *fsr;
+
+            ofp_print(stdout, reply->data, reply->size, 1);
+
+            fsr = buffer_at(reply, 0, sizeof *fsr);
+            done = (!fsr
+                    || fsr->header.version != OFP_VERSION
+                    || fsr->header.type != OFPT_FLOW_STATS_REPLY
+                    || (ntohs(fsr->header.length)
+                        < sizeof fsr->header + sizeof *fsr->flows));
+        } else {
+            VLOG_DBG("received reply with xid %08"PRIx32" "
+                     "!= expected %08"PRIx32, recv_xid, send_xid);
+        }
+        buffer_delete(reply);
+    }
     vconn_close(vconn);
 }