-/* Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
unsigned int avg_n_flows;
/* Following fields are accessed and modified by different threads. */
- atomic_llong max_idle; /* Maximum datapath flow idle time. */
atomic_uint flow_limit; /* Datapath flow hard limit. */
+
+ /* n_flows_mutex prevents multiple threads updating these concurrently. */
+ atomic_uint64_t n_flows; /* Number of flows in the datapath. */
+ atomic_llong n_flows_timestamp; /* Last time n_flows was updated. */
+ struct ovs_mutex n_flows_mutex;
};
enum upcall_type {
struct ofproto_dpif *ofproto;
struct flow flow;
- enum odp_key_fitness key_fitness;
const struct nlattr *key;
size_t key_len;
enum dpif_upcall_type upcall_type;
struct odputil_keybuf mask_buf;
struct xlate_out xout;
+
+ bool put;
};
static void upcall_destroy(struct upcall *);
static void *udpif_dispatcher(void *);
static void *udpif_upcall_handler(void *);
static void *udpif_revalidator(void *);
-static uint64_t udpif_get_n_flows(const struct udpif *);
+static uint64_t udpif_get_n_flows(struct udpif *);
static void revalidate_udumps(struct revalidator *, struct list *udumps);
static void revalidator_sweep(struct revalidator *);
+static void revalidator_purge(struct revalidator *);
static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
const char *argv[], void *aux);
static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
const char *argv[], void *aux);
static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
const char *argv[], void *aux);
+static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
+ const char *argv[], void *aux);
static void ukey_delete(struct revalidator *, struct udpif_key *);
static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
upcall_unixctl_disable_megaflows, NULL);
unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
upcall_unixctl_enable_megaflows, NULL);
+ unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
+ upcall_unixctl_set_flow_limit, NULL);
ovsthread_once_done(&once);
}
udpif->dpif = dpif;
udpif->backer = backer;
- atomic_init(&udpif->max_idle, 5000);
atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
udpif->secret = random_uint32();
udpif->reval_seq = seq_create();
udpif->dump_seq = seq_create();
latch_init(&udpif->exit_latch);
list_push_back(&all_udpifs, &udpif->list_node);
+ atomic_init(&udpif->n_flows, 0);
+ atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
+ ovs_mutex_init(&udpif->n_flows_mutex);
return udpif;
}
udpif_destroy(struct udpif *udpif)
{
udpif_set_threads(udpif, 0, 0);
- udpif_flush();
+ udpif_flush(udpif);
list_remove(&udpif->list_node);
latch_destroy(&udpif->exit_latch);
seq_destroy(udpif->reval_seq);
seq_destroy(udpif->dump_seq);
- atomic_destroy(&udpif->max_idle);
- atomic_destroy(&udpif->flow_limit);
+ ovs_mutex_destroy(&udpif->n_flows_mutex);
free(udpif);
}
for (i = 0; i < udpif->n_revalidators; i++) {
struct revalidator *revalidator = &udpif->revalidators[i];
struct udpif_flow_dump *udump, *next_udump;
- struct udpif_key *ukey, *next_ukey;
LIST_FOR_EACH_SAFE (udump, next_udump, list_node,
&revalidator->udumps) {
free(udump);
}
- HMAP_FOR_EACH_SAFE (ukey, next_ukey, hmap_node,
- &revalidator->ukeys) {
- ukey_delete(revalidator, ukey);
- }
+ /* Delete ukeys, and delete all flows from the datapath to prevent
+ * double-counting stats. */
+ revalidator_purge(revalidator);
hmap_destroy(&revalidator->ukeys);
ovs_mutex_destroy(&revalidator->mutex);
}
}
+/* Waits for all ongoing upcall translations to complete. This ensures that
+ * there are no transient references to any removed ofprotos (or other
+ * objects). In particular, this should be called after an ofproto is removed
+ * (e.g. via xlate_remove_ofproto()) but before it is destroyed. */
+void
+udpif_synchronize(struct udpif *udpif)
+{
+ /* This is stronger than necessary. It would be sufficient to ensure
+ * (somehow) that each handler and revalidator thread had passed through
+ * its main loop once. */
+ size_t n_handlers = udpif->n_handlers;
+ size_t n_revalidators = udpif->n_revalidators;
+ udpif_set_threads(udpif, 0, 0);
+ udpif_set_threads(udpif, n_handlers, n_revalidators);
+}
+
/* Notifies 'udpif' that something changed which may render previous
* xlate_actions() results invalid. */
void
}
}
-/* Removes all flows from all datapaths. */
+/* Remove flows from a single datapath. */
void
-udpif_flush(void)
+udpif_flush(struct udpif *udpif)
+{
+ size_t n_handlers, n_revalidators;
+
+ n_handlers = udpif->n_handlers;
+ n_revalidators = udpif->n_revalidators;
+
+ udpif_set_threads(udpif, 0, 0);
+ dpif_flow_flush(udpif->dpif);
+ udpif_set_threads(udpif, n_handlers, n_revalidators);
+}
+
+/* Removes all flows from all datapaths. */
+static void
+udpif_flush_all_datapaths(void)
{
struct udpif *udpif;
LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
- dpif_flow_flush(udpif->dpif);
+ udpif_flush(udpif);
}
}
+
\f
/* Destroys and deallocates 'upcall'. */
static void
}
static uint64_t
-udpif_get_n_flows(const struct udpif *udpif)
+udpif_get_n_flows(struct udpif *udpif)
{
- struct dpif_dp_stats stats;
-
- dpif_get_dp_stats(udpif->dpif, &stats);
- return stats.n_flows;
+ long long int time, now;
+ uint64_t flow_count;
+
+ now = time_msec();
+ atomic_read(&udpif->n_flows_timestamp, &time);
+ if (time < now - 100 && !ovs_mutex_trylock(&udpif->n_flows_mutex)) {
+ struct dpif_dp_stats stats;
+
+ atomic_store(&udpif->n_flows_timestamp, now);
+ dpif_get_dp_stats(udpif->dpif, &stats);
+ flow_count = stats.n_flows;
+ atomic_store(&udpif->n_flows, flow_count);
+ ovs_mutex_unlock(&udpif->n_flows_mutex);
+ } else {
+ atomic_read(&udpif->n_flows, &flow_count);
+ }
+ return flow_count;
}
/* The dispatcher thread is responsible for receiving upcalls from the kernel,
struct dpif_flow_dump dump;
size_t key_len, mask_len;
unsigned int flow_limit;
- long long int max_idle;
bool need_revalidate;
uint64_t reval_seq;
size_t n_flows, i;
+ int error;
+ void *state = NULL;
reval_seq = seq_read(udpif->reval_seq);
need_revalidate = udpif->last_reval_seq != reval_seq;
udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
- atomic_read(&udpif->flow_limit, &flow_limit);
- if (n_flows < flow_limit / 8) {
- max_idle = 5000;
- } else if (n_flows < flow_limit / 4) {
- max_idle = 2000;
- } else if (n_flows < flow_limit / 2) {
- max_idle = 1000;
- } else {
- max_idle = 500;
- }
- atomic_store(&udpif->max_idle, max_idle);
-
start_time = time_msec();
- dpif_flow_dump_start(&dump, udpif->dpif);
- while (dpif_flow_dump_next(&dump, &key, &key_len, &mask, &mask_len,
- NULL, NULL, &stats)
+ error = dpif_flow_dump_start(&dump, udpif->dpif);
+ if (error) {
+ VLOG_INFO("Failed to start flow dump (%s)", ovs_strerror(error));
+ goto skip;
+ }
+ dpif_flow_dump_state_init(udpif->dpif, &state);
+ while (dpif_flow_dump_next(&dump, state, &key, &key_len,
+ &mask, &mask_len, NULL, NULL, &stats)
&& !latch_is_set(&udpif->exit_latch)) {
struct udpif_flow_dump *udump = xmalloc(sizeof *udump);
struct revalidator *revalidator;
xpthread_cond_signal(&revalidator->wake_cond);
ovs_mutex_unlock(&revalidator->mutex);
}
+ dpif_flow_dump_state_uninit(udpif->dpif, state);
dpif_flow_dump_done(&dump);
/* Let all the revalidators finish and garbage collect. */
ovs_mutex_unlock(&revalidator->mutex);
}
- duration = time_msec() - start_time;
+ duration = MAX(time_msec() - start_time, 1);
udpif->dump_duration = duration;
+ atomic_read(&udpif->flow_limit, &flow_limit);
if (duration > 2000) {
flow_limit /= duration / 1000;
} else if (duration > 1300) {
duration);
}
- poll_timer_wait_until(start_time + MIN(max_idle, 500));
+skip:
+ poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
seq_wait(udpif->reval_seq, udpif->last_reval_seq);
latch_wait(&udpif->exit_latch);
poll_block();
}
memset(&cookie, 0, sizeof cookie);
memcpy(&cookie, nl_attr_get(dpif_upcall->userdata), userdata_len);
- if (userdata_len == sizeof cookie.sflow
+ if (userdata_len == MAX(8, sizeof cookie.sflow)
&& cookie.type == USER_ACTION_COOKIE_SFLOW) {
return SFLOW_UPCALL;
- } else if (userdata_len == sizeof cookie.slow_path
+ } else if (userdata_len == MAX(8, sizeof cookie.slow_path)
&& cookie.type == USER_ACTION_COOKIE_SLOW_PATH) {
return MISS_UPCALL;
- } else if (userdata_len == sizeof cookie.flow_sample
+ } else if (userdata_len == MAX(8, sizeof cookie.flow_sample)
&& cookie.type == USER_ACTION_COOKIE_FLOW_SAMPLE) {
return FLOW_SAMPLE_UPCALL;
- } else if (userdata_len == sizeof cookie.ipfix
+ } else if (userdata_len == MAX(8, sizeof cookie.ipfix)
&& cookie.type == USER_ACTION_COOKIE_IPFIX) {
return IPFIX_UPCALL;
} else {
int error;
error = xlate_receive(udpif->backer, packet, dupcall->key,
- dupcall->key_len, &flow, &miss->key_fitness,
+ dupcall->key_len, &flow,
&ofproto, &ipfix, &sflow, NULL, &odp_in_port);
if (error) {
if (error == ENODEV) {
type = classify_upcall(upcall);
if (type == MISS_UPCALL) {
uint32_t hash;
+ struct pkt_metadata md;
- flow_extract(packet, flow.skb_priority, flow.pkt_mark,
- &flow.tunnel, &flow.in_port, &miss->flow);
+ pkt_metadata_from_flow(&md, &flow);
+ flow_extract(packet, &md, &miss->flow);
hash = flow_hash(&miss->flow, 0);
existing_miss = flow_miss_find(&misses, ofproto, &miss->flow,
miss->stats.used = time_msec();
miss->stats.tcp_flags = 0;
miss->odp_in_port = odp_in_port;
+ miss->put = false;
n_misses++;
} else {
miss->flow.vlan_tci = 0;
}
- if (may_put) {
+ /* Do not install a flow into the datapath if:
+ *
+ * - The datapath already has too many flows.
+ *
+ * - An earlier iteration of this loop already put the same flow.
+ *
+ * - We received this packet via some flow installed in the kernel
+ * already. */
+ if (may_put
+ && !miss->put
+ && upcall->dpif_upcall.type == DPIF_UC_MISS) {
struct ofpbuf mask;
bool megaflow;
+ miss->put = true;
+
atomic_read(&enable_megaflows, &megaflow);
ofpbuf_use_stack(&mask, &miss->mask_buf, sizeof miss->mask_buf);
if (megaflow) {
+ size_t max_mpls;
+
+ max_mpls = ofproto_dpif_get_max_mpls_depth(miss->ofproto);
odp_flow_key_from_mask(&mask, &miss->xout.wc.masks,
- &miss->flow, UINT32_MAX);
+ &miss->flow, UINT32_MAX, max_mpls);
}
op = &ops[n_ops++];
return NULL;
}
+static struct udpif_key *
+ukey_create(const struct nlattr *key, size_t key_len, long long int used)
+{
+ struct udpif_key *ukey = xmalloc(sizeof *ukey);
+
+ ukey->key = (struct nlattr *) &ukey->key_buf;
+ memcpy(&ukey->key_buf, key, key_len);
+ ukey->key_len = key_len;
+
+ ukey->mark = false;
+ ukey->created = used ? used : time_msec();
+ memset(&ukey->stats, 0, sizeof ukey->stats);
+
+ return ukey;
+}
+
static void
ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
{
}
error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
- NULL, &ofproto, NULL, NULL, NULL, &odp_in_port);
+ &ofproto, NULL, NULL, NULL, &odp_in_port);
if (error) {
goto exit;
}
return ok;
}
+struct dump_op {
+ struct udpif_key *ukey;
+ struct udpif_flow_dump *udump;
+ struct dpif_flow_stats stats; /* Stats for 'op'. */
+ struct dpif_op op; /* Flow del operation. */
+};
+
static void
-revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
+dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
+ struct udpif_key *ukey, struct udpif_flow_dump *udump)
+{
+ op->ukey = ukey;
+ op->udump = udump;
+ op->op.type = DPIF_OP_FLOW_DEL;
+ op->op.u.flow_del.key = key;
+ op->op.u.flow_del.key_len = key_len;
+ op->op.u.flow_del.stats = &op->stats;
+}
+
+static void
+push_dump_ops(struct revalidator *revalidator,
+ struct dump_op *ops, size_t n_ops)
{
struct udpif *udpif = revalidator->udpif;
+ struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
+ size_t i;
- struct {
- struct dpif_flow_stats ukey_stats; /* Stats stored in the ukey. */
- struct dpif_flow_stats stats; /* Stats for 'op'. */
- struct dpif_op op; /* Flow del operation. */
- } ops[REVALIDATE_MAX_BATCH];
+ ovs_assert(n_ops <= REVALIDATE_MAX_BATCH);
+ for (i = 0; i < n_ops; i++) {
+ opsp[i] = &ops[i].op;
+ }
+ dpif_operate(udpif->dpif, opsp, n_ops);
- struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
+ for (i = 0; i < n_ops; i++) {
+ struct dump_op *op = &ops[i];
+ struct dpif_flow_stats *push, *stats, push_buf;
+
+ stats = op->op.u.flow_del.stats;
+ if (op->ukey) {
+ push = &push_buf;
+ push->used = MAX(stats->used, op->ukey->stats.used);
+ push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
+ push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
+ push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
+ } else {
+ push = stats;
+ }
+
+ if (push->n_packets || netflow_exists()) {
+ struct ofproto_dpif *ofproto;
+ struct netflow *netflow;
+ struct flow flow;
+
+ if (!xlate_receive(udpif->backer, NULL, op->op.u.flow_del.key,
+ op->op.u.flow_del.key_len, &flow, &ofproto,
+ NULL, NULL, &netflow, NULL)) {
+ struct xlate_in xin;
+
+ xlate_in_init(&xin, ofproto, &flow, NULL, push->tcp_flags,
+ NULL);
+ xin.resubmit_stats = push->n_packets ? push : NULL;
+ xin.may_learn = push->n_packets > 0;
+ xin.skip_wildcards = true;
+ xlate_actions_for_side_effects(&xin);
+
+ if (netflow) {
+ netflow_expire(netflow, &flow);
+ netflow_flow_clear(netflow, &flow);
+ netflow_unref(netflow);
+ }
+ }
+ }
+ }
+
+ for (i = 0; i < n_ops; i++) {
+ struct udpif_key *ukey;
+
+ /* If there's a udump, this ukey came directly from a datapath flow
+ * dump. Sometimes a datapath can send duplicates in flow dumps, in
+ * which case we wouldn't want to double-free a ukey, so avoid that by
+ * looking up the ukey again.
+ *
+ * If there's no udump then we know what we're doing. */
+ ukey = (ops[i].udump
+ ? ukey_lookup(revalidator, ops[i].udump)
+ : ops[i].ukey);
+ if (ukey) {
+ ukey_delete(revalidator, ukey);
+ }
+ }
+}
+
+static void
+revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
+{
+ struct udpif *udpif = revalidator->udpif;
+
+ struct dump_op ops[REVALIDATE_MAX_BATCH];
struct udpif_flow_dump *udump, *next_udump;
- size_t n_ops, i, n_flows;
+ size_t n_ops, n_flows;
unsigned int flow_limit;
long long int max_idle;
bool must_del;
- atomic_read(&udpif->max_idle, &max_idle);
atomic_read(&udpif->flow_limit, &flow_limit);
n_flows = udpif_get_n_flows(udpif);
must_del = false;
+ max_idle = ofproto_max_idle;
if (n_flows > flow_limit) {
must_del = n_flows > 2 * flow_limit;
max_idle = 100;
}
if (must_del || (used && used < now - max_idle)) {
- struct dpif_flow_stats *ukey_stats = &ops[n_ops].ukey_stats;
- struct dpif_op *op = &ops[n_ops].op;
-
- op->type = DPIF_OP_FLOW_DEL;
- op->u.flow_del.key = udump->key;
- op->u.flow_del.key_len = udump->key_len;
- op->u.flow_del.stats = &ops[n_ops].stats;
- n_ops++;
-
- if (ukey) {
- *ukey_stats = ukey->stats;
- ukey_delete(revalidator, ukey);
- } else {
- memset(ukey_stats, 0, sizeof *ukey_stats);
- }
+ struct dump_op *dop = &ops[n_ops++];
+ dump_op_init(dop, udump->key, udump->key_len, ukey, udump);
continue;
}
if (!ukey) {
- ukey = xmalloc(sizeof *ukey);
-
- ukey->key = (struct nlattr *) &ukey->key_buf;
- memcpy(ukey->key, udump->key, udump->key_len);
- ukey->key_len = udump->key_len;
-
- ukey->created = used ? used : now;
- memset(&ukey->stats, 0, sizeof ukey->stats);
-
- ukey->mark = false;
-
+ ukey = ukey_create(udump->key, udump->key_len, used);
hmap_insert(&revalidator->ukeys, &ukey->hmap_node,
udump->key_hash);
}
free(udump);
}
- for (i = 0; i < n_ops; i++) {
- opsp[i] = &ops[i].op;
- }
- dpif_operate(udpif->dpif, opsp, n_ops);
-
- for (i = 0; i < n_ops; i++) {
- struct dpif_flow_stats push, *stats, *ukey_stats;
-
- ukey_stats = &ops[i].ukey_stats;
- stats = ops[i].op.u.flow_del.stats;
- push.used = MAX(stats->used, ukey_stats->used);
- push.tcp_flags = stats->tcp_flags | ukey_stats->tcp_flags;
- push.n_packets = stats->n_packets - ukey_stats->n_packets;
- push.n_bytes = stats->n_bytes - ukey_stats->n_bytes;
-
- if (push.n_packets || netflow_exists()) {
- struct ofproto_dpif *ofproto;
- struct netflow *netflow;
- struct flow flow;
-
- if (!xlate_receive(udpif->backer, NULL, ops[i].op.u.flow_del.key,
- ops[i].op.u.flow_del.key_len, &flow, NULL,
- &ofproto, NULL, NULL, &netflow, NULL)) {
- struct xlate_in xin;
-
- xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags,
- NULL);
- xin.resubmit_stats = push.n_packets ? &push : NULL;
- xin.may_learn = push.n_packets > 0;
- xin.skip_wildcards = true;
- xlate_actions_for_side_effects(&xin);
-
- if (netflow) {
- netflow_expire(netflow, &flow);
- netflow_flow_clear(netflow, &flow);
- netflow_unref(netflow);
- }
- }
- }
- }
+ push_dump_ops(revalidator, ops, n_ops);
LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
list_remove(&udump->list_node);
}
static void
-revalidator_sweep(struct revalidator *revalidator)
+revalidator_sweep__(struct revalidator *revalidator, bool purge)
{
+ struct dump_op ops[REVALIDATE_MAX_BATCH];
struct udpif_key *ukey, *next;
+ size_t n_ops;
+
+ n_ops = 0;
HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, &revalidator->ukeys) {
- if (ukey->mark) {
+ if (!purge && ukey->mark) {
ukey->mark = false;
} else {
- ukey_delete(revalidator, ukey);
+ struct dump_op *op = &ops[n_ops++];
+
+ /* If we have previously seen a flow in the datapath, but didn't
+ * see it during the most recent dump, delete it. This allows us
+ * to clean up the ukey and keep the statistics consistent. */
+ dump_op_init(op, ukey->key, ukey->key_len, ukey, NULL);
+ if (n_ops == REVALIDATE_MAX_BATCH) {
+ push_dump_ops(revalidator, ops, n_ops);
+ n_ops = 0;
+ }
}
}
+
+ if (n_ops) {
+ push_dump_ops(revalidator, ops, n_ops);
+ }
+}
+
+static void
+revalidator_sweep(struct revalidator *revalidator)
+{
+ revalidator_sweep__(revalidator, false);
+}
+
+static void
+revalidator_purge(struct revalidator *revalidator)
+{
+ revalidator_sweep__(revalidator, true);
}
\f
static void
LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
unsigned int flow_limit;
- long long int max_idle;
size_t i;
atomic_read(&udpif->flow_limit, &flow_limit);
- atomic_read(&udpif->max_idle, &max_idle);
ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
ds_put_format(&ds, "\tflows : (current %"PRIu64")"
" (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
- ds_put_format(&ds, "\tmax idle : %lldms\n", max_idle);
ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
ds_put_char(&ds, '\n');
void *aux OVS_UNUSED)
{
atomic_store(&enable_megaflows, false);
- udpif_flush();
+ udpif_flush_all_datapaths();
unixctl_command_reply(conn, "megaflows disabled");
}
void *aux OVS_UNUSED)
{
atomic_store(&enable_megaflows, true);
- udpif_flush();
+ udpif_flush_all_datapaths();
unixctl_command_reply(conn, "megaflows enabled");
}
+
+/* Set the flow limit.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
+ int argc OVS_UNUSED,
+ const char *argv[] OVS_UNUSED,
+ void *aux OVS_UNUSED)
+{
+ struct ds ds = DS_EMPTY_INITIALIZER;
+ struct udpif *udpif;
+ unsigned int flow_limit = atoi(argv[1]);
+
+ LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+ atomic_store(&udpif->flow_limit, flow_limit);
+ }
+ ds_put_format(&ds, "set flow_limit to %u\n", flow_limit);
+ unixctl_command_reply(conn, ds_cstr(&ds));
+ ds_destroy(&ds);
+}