ofproto: New feature to notify controllers of flow table changes.
authorBen Pfaff <blp@nicira.com>
Thu, 12 Jul 2012 21:18:05 +0000 (14:18 -0700)
committerBen Pfaff <blp@nicira.com>
Thu, 12 Jul 2012 21:18:05 +0000 (14:18 -0700)
OpenFlow switching monitoring and controller coordination can be made more
efficient if the switch can notify a controller of flow table changes as
they occur, rather than periodically polling for changes.  This commit
implements such a feature.

Feature #6633.
CC: Natasha Gude <natasha@nicira.com>
Signed-off-by: Ben Pfaff <blp@nicira.com>
17 files changed:
NEWS
include/openflow/nicira-ext.h
lib/learning-switch.c
lib/ofp-errors.h
lib/ofp-parse.c
lib/ofp-parse.h
lib/ofp-print.c
lib/ofp-util.c
lib/ofp-util.h
ofproto/connmgr.c
ofproto/connmgr.h
ofproto/ofproto-provider.h
ofproto/ofproto.c
tests/ofp-print.at
tests/ofproto.at
utilities/ovs-ofctl.8.in
utilities/ovs-ofctl.c

diff --git a/NEWS b/NEWS
index f5190c0..5b9db40 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -22,6 +22,8 @@ post-v1.7.0
         queue does not exist, or for requests for a specific queue on all
         ports, if the specified queue does not exist on any port.  (Previous
         versions generally reported an empty set of results.)
         queue does not exist, or for requests for a specific queue on all
         ports, if the specified queue does not exist on any port.  (Previous
         versions generally reported an empty set of results.)
+      - New "flow monitor" feature to allow controllers to be notified of
+        flow table changes as they happen.
     - Additional protocols are not mirrored and dropped when forward-bpdu is
       false.  For a full list, see the ovs-vswitchd.conf.db man page.
     - Open vSwitch now sends RARP packets in situations where it previously
     - Additional protocols are not mirrored and dropped when forward-bpdu is
       false.  For a full list, see the ovs-vswitchd.conf.db man page.
     - Open vSwitch now sends RARP packets in situations where it previously
index 82deeb0..1104dbf 100644 (file)
@@ -117,6 +117,11 @@ enum nicira_type {
 
     NXT_SET_ASYNC_CONFIG = 19,  /* struct nx_async_config. */
     NXT_SET_CONTROLLER_ID = 20, /* struct nx_controller_id. */
 
     NXT_SET_ASYNC_CONFIG = 19,  /* struct nx_async_config. */
     NXT_SET_CONTROLLER_ID = 20, /* struct nx_controller_id. */
+
+    /* Flow table monitoring (see also NXST_FLOW_MONITOR). */
+    NXT_FLOW_MONITOR_CANCEL = 21,  /* struct nx_flow_monitor_cancel. */
+    NXT_FLOW_MONITOR_PAUSED = 22,  /* struct nicira_header. */
+    NXT_FLOW_MONITOR_RESUMED = 23, /* struct nicira_header. */
 };
 
 /* Header for Nicira vendor stats request and reply messages. */
 };
 
 /* Header for Nicira vendor stats request and reply messages. */
@@ -131,7 +136,10 @@ OFP_ASSERT(sizeof(struct nicira_stats_msg) == 24);
 enum nicira_stats_type {
     /* Flexible flow specification (aka NXM = Nicira Extended Match). */
     NXST_FLOW,                  /* Analogous to OFPST_FLOW. */
 enum nicira_stats_type {
     /* Flexible flow specification (aka NXM = Nicira Extended Match). */
     NXST_FLOW,                  /* Analogous to OFPST_FLOW. */
-    NXST_AGGREGATE              /* Analogous to OFPST_AGGREGATE. */
+    NXST_AGGREGATE,             /* Analogous to OFPST_AGGREGATE. */
+
+    /* Flow table monitoring. */
+    NXST_FLOW_MONITOR,
 };
 
 /* Fields to use when hashing flows. */
 };
 
 /* Fields to use when hashing flows. */
@@ -1976,5 +1984,240 @@ struct nx_action_controller {
     uint8_t zero;                   /* Must be zero. */
 };
 OFP_ASSERT(sizeof(struct nx_action_controller) == 16);
     uint8_t zero;                   /* Must be zero. */
 };
 OFP_ASSERT(sizeof(struct nx_action_controller) == 16);
+\f
+/* Flow Table Monitoring
+ * =====================
+ *
+ * NXST_FLOW_MONITOR allows a controller to keep track of changes to OpenFlow
+ * flow table(s) or subsets of them, with the following workflow:
+ *
+ * 1. The controller sends an NXST_FLOW_MONITOR request to begin monitoring
+ *    flows.  The 'id' in the request must be unique among all monitors that
+ *    the controller has started and not yet canceled on this OpenFlow
+ *    connection.
+ *
+ * 2. The switch responds with an NXST_FLOW_MONITOR reply.  If the request's
+ *    'flags' included NXFMF_INITIAL, the reply includes all the flows that
+ *    matched the request at the time of the request (with event NXFME_ADDED).
+ *    If 'flags' did not include NXFMF_INITIAL, the reply is empty.
+ *
+ *    The reply uses the xid of the request (as do all replies to OpenFlow
+ *    requests).
+ *
+ * 3. Whenever a change to a flow table entry matches some outstanding monitor
+ *    request's criteria and flags, the switch sends a notification to the
+ *    controller as an additional NXST_FLOW_MONITOR reply with xid 0.
+ *
+ *    When multiple outstanding monitors match a single change, only a single
+ *    notification is sent.  This merged notification includes the information
+ *    requested in any of the individual monitors.  That is, if any of the
+ *    matching monitors requests actions (NXFMF_ACTIONS), the notification
+ *    includes actions, and if any of the monitors request full changes for the
+ *    controller's own changes (NXFMF_OWN), the controller's own changes will
+ *    be included in full.
+ *
+ * 4. The controller may cancel a monitor with NXT_FLOW_MONITOR_CANCEL.  No
+ *    further notifications will be sent on the basis of the canceled monitor
+ *    afterward.
+ *
+ *
+ * Buffer Management
+ * =================
+ *
+ * OpenFlow messages for flow monitor notifications can overflow the buffer
+ * space available to the switch, either temporarily (e.g. due to network
+ * conditions slowing OpenFlow traffic) or more permanently (e.g. the sustained
+ * rate of flow table change exceeds the network bandwidth between switch and
+ * controller).
+ *
+ * When Open vSwitch's notification buffer space reaches a limiting threshold,
+ * OVS reacts as follows:
+ *
+ * 1. OVS sends an NXT_FLOW_MONITOR_PAUSED message to the controller, following
+ *    all the already queued notifications.  After it receives this message,
+ *    the controller knows that its view of the flow table, as represented by
+ *    flow monitor notifications, is incomplete.
+ *
+ * 2. As long as the notification buffer is not empty:
+ *
+ *        - NXMFE_ADD and NXFME_MODIFIED notifications will not be sent.
+ *
+ *        - NXFME_DELETED notifications will still be sent, but only for flows
+ *          that existed before OVS sent NXT_FLOW_MONITOR_PAUSED.
+ *
+ *        - NXFME_ABBREV notifications will not be sent.  They are treated as
+ *          the expanded version (and therefore only the NXFME_DELETED
+ *          components, if any, are sent).
+ *
+ * 3. When the notification buffer empties, OVS sends NXFME_ADD notifications
+ *    for flows added since the buffer reached its limit and NXFME_MODIFIED
+ *    notifications for flows that existed before the limit was reached and
+ *    changed after the limit was reached.
+ *
+ * 4. OVS sends an NXT_FLOW_MONITOR_RESUMED message to the controller.  After
+ *    it receives this message, the controller knows that its view of the flow
+ *    table, as represented by flow monitor notifications, is again complete.
+ *
+ * This allows the maximum buffer space requirement for notifications to be
+ * bounded by the limit plus the maximum number of supported flows.
+ *
+ *
+ * "Flow Removed" messages
+ * =======================
+ *
+ * The flow monitor mechanism is independent of OFPT_FLOW_REMOVED and
+ * NXT_FLOW_REMOVED.  Flow monitor updates for deletion are sent if
+ * NXFMF_DELETE is set on a monitor, regardless of whether the
+ * OFPFF_SEND_FLOW_REM flag was set when the flow was added. */
+
+/* NXST_FLOW_MONITOR request.
+ *
+ * The NXST_FLOW_MONITOR request's body consists of an array of zero or more
+ * instances of this structure.  The request arranges to monitor the flows
+ * that match the specified criteria, which are interpreted in the same way as
+ * for NXST_FLOW.
+ *
+ * 'id' identifies a particular monitor for the purpose of allowing it to be
+ * canceled later with NXT_FLOW_MONITOR_CANCEL.  'id' must be unique among
+ * existing monitors that have not already been canceled.
+ *
+ * The reply includes the initial flow matches for monitors that have the
+ * NXFMF_INITIAL flag set.  No single flow will be included in the reply more
+ * than once, even if more than one requested monitor matches that flow.  The
+ * reply will be empty if none of the monitors has NXFMF_INITIAL set or if none
+ * of the monitors initially matches any flows.
+ *
+ * For NXFMF_ADD, an event will be reported if 'out_port' matches against the
+ * actions of the flow being added or, for a flow that is replacing an existing
+ * flow, if 'out_port' matches against the actions of the flow being replaced.
+ * For NXFMF_DELETE, 'out_port' matches against the actions of a flow being
+ * deleted.  For NXFMF_MODIFY, an event will be reported if 'out_port' matches
+ * either the old or the new actions. */
+struct nx_flow_monitor_request {
+    ovs_be32 id;                /* Controller-assigned ID for this monitor. */
+    ovs_be16 flags;             /* NXFMF_*. */
+    ovs_be16 out_port;          /* Required output port, if not OFPP_NONE. */
+    ovs_be16 match_len;         /* Length of nx_match. */
+    uint8_t table_id;           /* One table's ID or 0xff for all tables. */
+    uint8_t zeros[5];           /* Align to 64 bits (must be zero). */
+    /* Followed by:
+     *   - Exactly match_len (possibly 0) bytes containing the nx_match, then
+     *   - Exactly (match_len + 7)/8*8 - match_len (between 0 and 7) bytes of
+     *     all-zero bytes. */
+};
+OFP_ASSERT(sizeof(struct nx_flow_monitor_request) == 16);
+
+/* 'flags' bits in struct nx_flow_monitor_request. */
+enum nx_flow_monitor_flags {
+    /* When to send updates. */
+    NXFMF_INITIAL = 1 << 0,     /* Initially matching flows. */
+    NXFMF_ADD = 1 << 1,         /* New matching flows as they are added. */
+    NXFMF_DELETE = 1 << 2,      /* Old matching flows as they are removed. */
+    NXFMF_MODIFY = 1 << 3,      /* Matching flows as they are changed. */
+
+    /* What to include in updates. */
+    NXFMF_ACTIONS = 1 << 4,     /* If set, actions are included. */
+    NXFMF_OWN = 1 << 5,         /* If set, include own changes in full. */
+};
+
+/* NXST_FLOW_MONITOR reply header.
+ *
+ * The body of an NXST_FLOW_MONITOR reply is an array of variable-length
+ * structures, each of which begins with this header.  The 'length' member may
+ * be used to traverse the array, and the 'event' member may be used to
+ * determine the particular structure.
+ *
+ * Every instance is a multiple of 8 bytes long. */
+struct nx_flow_update_header {
+    ovs_be16 length;            /* Length of this entry. */
+    ovs_be16 event;             /* One of NXFME_*. */
+    /* ...other data depending on 'event'... */
+};
+OFP_ASSERT(sizeof(struct nx_flow_update_header) == 4);
+
+/* 'event' values in struct nx_flow_update_header. */
+enum nx_flow_update_event {
+    /* struct nx_flow_update_full. */
+    NXFME_ADDED = 0,            /* Flow was added. */
+    NXFME_DELETED = 1,          /* Flow was deleted. */
+    NXFME_MODIFIED = 2,         /* Flow (generally its actions) was changed. */
+
+    /* struct nx_flow_update_abbrev. */
+    NXFME_ABBREV = 3,           /* Abbreviated reply. */
+};
+
+/* NXST_FLOW_MONITOR reply for NXFME_ADDED, NXFME_DELETED, and
+ * NXFME_MODIFIED. */
+struct nx_flow_update_full {
+    ovs_be16 length;            /* Length is 24. */
+    ovs_be16 event;             /* One of NXFME_*. */
+    ovs_be16 reason;            /* OFPRR_* for NXFME_DELETED, else zero. */
+    ovs_be16 priority;          /* Priority of the entry. */
+    ovs_be16 idle_timeout;      /* Number of seconds idle before expiration. */
+    ovs_be16 hard_timeout;      /* Number of seconds before expiration. */
+    ovs_be16 match_len;         /* Length of nx_match. */
+    uint8_t table_id;           /* ID of flow's table. */
+    uint8_t pad;                /* Reserved, currently zeroed. */
+    ovs_be64 cookie;            /* Opaque controller-issued identifier. */
+    /* Followed by:
+     *   - Exactly match_len (possibly 0) bytes containing the nx_match, then
+     *   - Exactly (match_len + 7)/8*8 - match_len (between 0 and 7) bytes of
+     *     all-zero bytes, then
+     *   - Actions to fill out the remainder 'length' bytes (always a multiple
+     *     of 8).  If NXFMF_ACTIONS was not specified, or 'event' is
+     *     NXFME_DELETED, no actions are included.
+     */
+};
+OFP_ASSERT(sizeof(struct nx_flow_update_full) == 24);
+
+/* NXST_FLOW_MONITOR reply for NXFME_ABBREV.
+ *
+ * When the controller does not specify NXFMF_OWN in a monitor request, any
+ * flow tables changes due to the controller's own requests (on the same
+ * OpenFlow channel) will be abbreviated, when possible, to this form, which
+ * simply specifies the 'xid' of the OpenFlow request (e.g. an OFPT_FLOW_MOD or
+ * NXT_FLOW_MOD) that caused the change.
+ *
+ * Some changes cannot be abbreviated and will be sent in full:
+ *
+ *   - Changes that only partially succeed.  This can happen if, for example,
+ *     a flow_mod with type OFPFC_MODIFY affects multiple flows, but only some
+ *     of those modifications succeed (e.g. due to hardware limitations).
+ *
+ *     This cannot occur with the current implementation of the Open vSwitch
+ *     software datapath.  It could happen with other datapath implementations.
+ *
+ *   - Changes that race with conflicting changes made by other controllers or
+ *     other flow_mods (not separated by barriers) by the same controller.
+ *
+ *     This cannot occur with the current Open vSwitch implementation
+ *     (regardless of datapath) because Open vSwitch internally serializes
+ *     potentially conflicting changes.
+ *
+ * A flow_mod that does not change the flow table will not trigger any
+ * notification, even an abbreviated one.  For example, a "modify" or "delete"
+ * flow_mod that does not match any flows will not trigger a notification.
+ * Whether an "add" or "modify" that specifies all the same parameters that a
+ * flow already has triggers a notification is unspecified and subject to
+ * change in future versions of Open vSwitch.
+ *
+ * OVS will always send the notifications for a given flow table change before
+ * the reply to a OFPT_BARRIER_REQUEST request that precedes the flow table
+ * change.  Thus, if the controller does not receive an abbreviated
+ * notification for a flow_mod before the next OFPT_BARRIER_REPLY, it will
+ * never receive one. */
+struct nx_flow_update_abbrev {
+    ovs_be16 length;            /* Length is 8. */
+    ovs_be16 event;             /* NXFME_ABBREV. */
+    ovs_be32 xid;               /* Controller-specified xid from flow_mod. */
+};
+OFP_ASSERT(sizeof(struct nx_flow_update_abbrev) == 8);
+
+/* Used by a controller to cancel an outstanding monitor. */
+struct nx_flow_monitor_cancel {
+    struct nicira_header nxh;   /* Type NXT_FLOW_MONITOR_CANCEL. */
+    ovs_be32 id;                /* 'id' from nx_flow_monitor_request. */
+};
+OFP_ASSERT(sizeof(struct nx_flow_monitor_cancel) == 20);
 
 #endif /* openflow/nicira-ext.h */
 
 #endif /* openflow/nicira-ext.h */
index cb0e49b..b41bea0 100644 (file)
@@ -295,12 +295,17 @@ lswitch_process_packet(struct lswitch *sw, struct rconn *rconn,
     case OFPUTIL_NXT_FLOW_MOD:
     case OFPUTIL_NXT_FLOW_REMOVED:
     case OFPUTIL_NXT_FLOW_AGE:
     case OFPUTIL_NXT_FLOW_MOD:
     case OFPUTIL_NXT_FLOW_REMOVED:
     case OFPUTIL_NXT_FLOW_AGE:
+    case OFPUTIL_NXT_FLOW_MONITOR_CANCEL:
+    case OFPUTIL_NXT_FLOW_MONITOR_PAUSED:
+    case OFPUTIL_NXT_FLOW_MONITOR_RESUMED:
     case OFPUTIL_NXT_SET_ASYNC_CONFIG:
     case OFPUTIL_NXT_SET_CONTROLLER_ID:
     case OFPUTIL_NXST_FLOW_REQUEST:
     case OFPUTIL_NXST_AGGREGATE_REQUEST:
     case OFPUTIL_NXT_SET_ASYNC_CONFIG:
     case OFPUTIL_NXT_SET_CONTROLLER_ID:
     case OFPUTIL_NXST_FLOW_REQUEST:
     case OFPUTIL_NXST_AGGREGATE_REQUEST:
+    case OFPUTIL_NXST_FLOW_MONITOR_REQUEST:
     case OFPUTIL_NXST_FLOW_REPLY:
     case OFPUTIL_NXST_AGGREGATE_REPLY:
     case OFPUTIL_NXST_FLOW_REPLY:
     case OFPUTIL_NXST_AGGREGATE_REPLY:
+    case OFPUTIL_NXST_FLOW_MONITOR_REPLY:
     default:
         if (VLOG_IS_DBG_ENABLED()) {
             char *s = ofp_to_string(msg->data, msg->size, 2);
     default:
         if (VLOG_IS_DBG_ENABLED()) {
             char *s = ofp_to_string(msg->data, msg->size, 2);
index 61cef41..dddf8d0 100644 (file)
@@ -129,6 +129,20 @@ enum ofperr {
      * valid. */
     OFPERR_NXBRC_BAD_REASON,
 
      * valid. */
     OFPERR_NXBRC_BAD_REASON,
 
+    /* NX1.0+(1,517).  The 'id' in an NXST_FLOW_MONITOR request is the same as
+     * an existing monitor id (or two monitors in the same NXST_FLOW_MONITOR
+     * request have the same 'id').  */
+    OFPERR_NXBRC_FM_DUPLICATE_ID,
+
+    /* NX1.0+(1,518).  The 'flags' in an NXST_FLOW_MONITOR request either does
+     * not specify at least one of the NXFMF_ADD, NXFMF_DELETE, or NXFMF_MODIFY
+     * flags, or specifies a flag bit that is not defined. */
+    OFPERR_NXBRC_FM_BAD_FLAGS,
+
+    /* NX1.0+(1,519).  The 'id' in an NXT_FLOW_MONITOR_CANCEL request is not
+     * the id of any existing monitor. */
+    OFPERR_NXBRC_FM_BAD_ID,
+
 /* ## ---------------- ## */
 /* ## OFPET_BAD_ACTION ## */
 /* ## ---------------- ## */
 /* ## ---------------- ## */
 /* ## OFPET_BAD_ACTION ## */
 /* ## ---------------- ## */
@@ -469,7 +483,6 @@ enum ofperr {
     /* NX1.0(1,513), NX1.1(1,513), OF1.2+(11,2).  Invalid role. */
     OFPERR_OFPRRFC_BAD_ROLE,
 
     /* NX1.0(1,513), NX1.1(1,513), OF1.2+(11,2).  Invalid role. */
     OFPERR_OFPRRFC_BAD_ROLE,
 
-
 /* ## ------------------ ## */
 /* ## OFPET_EXPERIMENTER ## */
 /* ## ------------------ ## */
 /* ## ------------------ ## */
 /* ## OFPET_EXPERIMENTER ## */
 /* ## ------------------ ## */
index 922e296..32d3836 100644 (file)
@@ -700,6 +700,68 @@ parse_ofp_str(struct ofputil_flow_mod *fm, int command, const char *str_,
     free(string);
 }
 
     free(string);
 }
 
+/* Convert 'str_' (as described in the documentation for the "monitor" command
+ * in the ovs-ofctl man page) into 'fmr'. */
+void
+parse_flow_monitor_request(struct ofputil_flow_monitor_request *fmr,
+                           const char *str_)
+{
+    static uint32_t id;
+
+    char *string = xstrdup(str_);
+    char *save_ptr = NULL;
+    char *name;
+
+    fmr->id = id++;
+    fmr->flags = (NXFMF_INITIAL | NXFMF_ADD | NXFMF_DELETE | NXFMF_MODIFY
+                  | NXFMF_OWN | NXFMF_ACTIONS);
+    fmr->out_port = OFPP_NONE;
+    fmr->table_id = 0xff;
+    cls_rule_init_catchall(&fmr->match, 0);
+
+    for (name = strtok_r(string, "=, \t\r\n", &save_ptr); name;
+         name = strtok_r(NULL, "=, \t\r\n", &save_ptr)) {
+        const struct protocol *p;
+
+        if (!strcmp(name, "!initial")) {
+            fmr->flags &= ~NXFMF_INITIAL;
+        } else if (!strcmp(name, "!add")) {
+            fmr->flags &= ~NXFMF_ADD;
+        } else if (!strcmp(name, "!delete")) {
+            fmr->flags &= ~NXFMF_DELETE;
+        } else if (!strcmp(name, "!modify")) {
+            fmr->flags &= ~NXFMF_MODIFY;
+        } else if (!strcmp(name, "!actions")) {
+            fmr->flags &= ~NXFMF_ACTIONS;
+        } else if (!strcmp(name, "!own")) {
+            fmr->flags &= ~NXFMF_OWN;
+        } else if (parse_protocol(name, &p)) {
+            cls_rule_set_dl_type(&fmr->match, htons(p->dl_type));
+            if (p->nw_proto) {
+                cls_rule_set_nw_proto(&fmr->match, p->nw_proto);
+            }
+        } else {
+            char *value;
+
+            value = strtok_r(NULL, ", \t\r\n", &save_ptr);
+            if (!value) {
+                ovs_fatal(0, "%s: field %s missing value", str_, name);
+            }
+
+            if (!strcmp(name, "table")) {
+                fmr->table_id = str_to_table_id(value);
+            } else if (!strcmp(name, "out_port")) {
+                fmr->out_port = atoi(value);
+            } else if (mf_from_name(name)) {
+                parse_field(mf_from_name(name), value, &fmr->match);
+            } else {
+                ovs_fatal(0, "%s: unknown keyword %s", str_, name);
+            }
+        }
+    }
+    free(string);
+}
+
 /* Parses 's' as a set of OpenFlow actions and appends the actions to
  * 'actions'.
  *
 /* Parses 's' as a set of OpenFlow actions and appends the actions to
  * 'actions'.
  *
index e930388..d2d3c3c 100644 (file)
@@ -26,6 +26,7 @@
 struct flow;
 struct ofpbuf;
 struct ofputil_flow_mod;
 struct flow;
 struct ofpbuf;
 struct ofputil_flow_mod;
+struct ofputil_flow_monitor_request;
 struct ofputil_flow_stats_request;
 
 void parse_ofp_str(struct ofputil_flow_mod *, int command, const char *str_,
 struct ofputil_flow_stats_request;
 
 void parse_ofp_str(struct ofputil_flow_mod *, int command, const char *str_,
@@ -44,4 +45,7 @@ void parse_ofpacts(const char *, struct ofpbuf *ofpacts);
 
 char *parse_ofp_exact_flow(struct flow *, const char *);
 
 
 char *parse_ofp_exact_flow(struct flow *, const char *);
 
+void parse_flow_monitor_request(struct ofputil_flow_monitor_request *,
+                                const char *);
+
 #endif /* ofp-parse.h */
 #endif /* ofp-parse.h */
index 03de5f1..48b8daa 100644 (file)
@@ -1368,6 +1368,137 @@ ofp_print_nxt_set_controller_id(struct ds *string,
     ds_put_format(string, " id=%"PRIu16, ntohs(nci->controller_id));
 }
 
     ds_put_format(string, " id=%"PRIu16, ntohs(nci->controller_id));
 }
 
+static void
+ofp_print_nxt_flow_monitor_cancel(struct ds *string,
+                                  const struct ofp_header *oh)
+{
+    ds_put_format(string, " id=%"PRIu32,
+                  ofputil_decode_flow_monitor_cancel(oh));
+}
+
+static const char *
+nx_flow_monitor_flags_to_name(uint32_t bit)
+{
+    enum nx_flow_monitor_flags fmf = bit;
+
+    switch (fmf) {
+    case NXFMF_INITIAL: return "initial";
+    case NXFMF_ADD: return "add";
+    case NXFMF_DELETE: return "delete";
+    case NXFMF_MODIFY: return "modify";
+    case NXFMF_ACTIONS: return "actions";
+    case NXFMF_OWN: return "own";
+    }
+
+    return NULL;
+}
+
+static void
+ofp_print_nxst_flow_monitor_request(struct ds *string,
+                                    const struct ofp_header *oh)
+{
+    struct ofpbuf b;
+
+    ofpbuf_use_const(&b, oh, ntohs(oh->length));
+    for (;;) {
+        struct ofputil_flow_monitor_request request;
+        int retval;
+
+        retval = ofputil_decode_flow_monitor_request(&request, &b);
+        if (retval) {
+            if (retval != EOF) {
+                ofp_print_error(string, retval);
+            }
+            return;
+        }
+
+        ds_put_format(string, "\n id=%"PRIu32" flags=", request.id);
+        ofp_print_bit_names(string, request.flags,
+                            nx_flow_monitor_flags_to_name, ',');
+
+        if (request.out_port != OFPP_NONE) {
+            ds_put_cstr(string, " out_port=");
+            ofputil_format_port(request.out_port, string);
+        }
+
+        if (request.table_id != 0xff) {
+            ds_put_format(string, " table=%"PRIu8, request.table_id);
+        }
+
+        ds_put_char(string, ' ');
+        cls_rule_format(&request.match, string);
+        ds_chomp(string, ' ');
+    }
+}
+
+static void
+ofp_print_nxst_flow_monitor_reply(struct ds *string,
+                                  const struct ofp_header *oh)
+{
+    uint64_t ofpacts_stub[1024 / 8];
+    struct ofpbuf ofpacts;
+    struct ofpbuf b;
+
+    ofpbuf_use_const(&b, oh, ntohs(oh->length));
+    ofpbuf_use_stub(&ofpacts, ofpacts_stub, sizeof ofpacts_stub);
+    for (;;) {
+        struct ofputil_flow_update update;
+        struct cls_rule match;
+        int retval;
+
+        update.match = &match;
+        retval = ofputil_decode_flow_update(&update, &b, &ofpacts);
+        if (retval) {
+            if (retval != EOF) {
+                ofp_print_error(string, retval);
+            }
+            ofpbuf_uninit(&ofpacts);
+            return;
+        }
+
+        ds_put_cstr(string, "\n event=");
+        switch (update.event) {
+        case NXFME_ADDED:
+            ds_put_cstr(string, "ADDED");
+            break;
+
+        case NXFME_DELETED:
+            ds_put_format(string, "DELETED reason=%s",
+                          ofp_flow_removed_reason_to_string(update.reason));
+            break;
+
+        case NXFME_MODIFIED:
+            ds_put_cstr(string, "MODIFIED");
+            break;
+
+        case NXFME_ABBREV:
+            ds_put_format(string, "ABBREV xid=0x%"PRIx32, ntohl(update.xid));
+            continue;
+        }
+
+        ds_put_format(string, " table=%"PRIu8, update.table_id);
+        if (update.idle_timeout != OFP_FLOW_PERMANENT) {
+            ds_put_format(string, " idle_timeout=%"PRIu16,
+                          update.idle_timeout);
+        }
+        if (update.hard_timeout != OFP_FLOW_PERMANENT) {
+            ds_put_format(string, " hard_timeout=%"PRIu16,
+                          update.hard_timeout);
+        }
+        ds_put_format(string, " cookie=%#"PRIx64, ntohll(update.cookie));
+
+        ds_put_char(string, ' ');
+        cls_rule_format(update.match, string);
+
+        if (update.ofpacts_len) {
+            if (string->string[string->length - 1] != ' ') {
+                ds_put_char(string, ' ');
+            }
+            ofpacts_format(update.ofpacts, update.ofpacts_len, string);
+        }
+    }
+}
+
 void
 ofp_print_version(const struct ofp_header *oh,
                   struct ds *string)
 void
 ofp_print_version(const struct ofp_header *oh,
                   struct ds *string)
@@ -1564,6 +1695,22 @@ ofp_to_string__(const struct ofp_header *oh,
         ofp_print_stats_reply(string, oh);
         ofp_print_nxst_aggregate_reply(string, msg);
         break;
         ofp_print_stats_reply(string, oh);
         ofp_print_nxst_aggregate_reply(string, msg);
         break;
+
+    case OFPUTIL_NXT_FLOW_MONITOR_CANCEL:
+        ofp_print_nxt_flow_monitor_cancel(string, msg);
+        break;
+
+    case OFPUTIL_NXT_FLOW_MONITOR_PAUSED:
+    case OFPUTIL_NXT_FLOW_MONITOR_RESUMED:
+        break;
+
+    case OFPUTIL_NXST_FLOW_MONITOR_REQUEST:
+        ofp_print_nxst_flow_monitor_request(string, msg);
+        break;
+
+    case OFPUTIL_NXST_FLOW_MONITOR_REPLY:
+        ofp_print_nxst_flow_monitor_reply(string, msg);
+        break;
     }
 }
 
     }
 }
 
index cd26143..5fb8d8f 100644 (file)
@@ -678,6 +678,18 @@ ofputil_decode_vendor(const struct ofp_header *oh, size_t length,
         { OFPUTIL_NXT_SET_CONTROLLER_ID, OFP10_VERSION,
           NXT_SET_CONTROLLER_ID, "NXT_SET_CONTROLLER_ID",
           sizeof(struct nx_controller_id), 0 },
         { OFPUTIL_NXT_SET_CONTROLLER_ID, OFP10_VERSION,
           NXT_SET_CONTROLLER_ID, "NXT_SET_CONTROLLER_ID",
           sizeof(struct nx_controller_id), 0 },
+
+        { OFPUTIL_NXT_FLOW_MONITOR_CANCEL, OFP10_VERSION,
+          NXT_FLOW_MONITOR_CANCEL, "NXT_FLOW_MONITOR_CANCEL",
+          sizeof(struct nx_flow_monitor_cancel), 0 },
+
+        { OFPUTIL_NXT_FLOW_MONITOR_PAUSED, OFP10_VERSION,
+          NXT_FLOW_MONITOR_PAUSED, "NXT_FLOW_MONITOR_PAUSED",
+          sizeof(struct nicira_header), 0 },
+
+        { OFPUTIL_NXT_FLOW_MONITOR_RESUMED, OFP10_VERSION,
+          NXT_FLOW_MONITOR_RESUMED, "NXT_FLOW_MONITOR_RESUMED",
+          sizeof(struct nicira_header), 0 },
     };
 
     static const struct ofputil_msg_category nxt_category = {
     };
 
     static const struct ofputil_msg_category nxt_category = {
@@ -760,6 +772,10 @@ ofputil_decode_nxst_request(const struct ofp_header *oh, size_t length,
         { OFPUTIL_NXST_AGGREGATE_REQUEST, OFP10_VERSION,
           NXST_AGGREGATE, "NXST_AGGREGATE request",
           sizeof(struct nx_aggregate_stats_request), 8 },
         { OFPUTIL_NXST_AGGREGATE_REQUEST, OFP10_VERSION,
           NXST_AGGREGATE, "NXST_AGGREGATE request",
           sizeof(struct nx_aggregate_stats_request), 8 },
+
+        { OFPUTIL_NXST_FLOW_MONITOR_REQUEST, OFP10_VERSION,
+          NXST_FLOW_MONITOR, "NXST_FLOW_MONITOR request",
+          sizeof(struct nicira_stats_msg), 8 },
     };
 
     static const struct ofputil_msg_category nxst_request_category = {
     };
 
     static const struct ofputil_msg_category nxst_request_category = {
@@ -793,6 +809,10 @@ ofputil_decode_nxst_reply(const struct ofp_header *oh, size_t length,
         { OFPUTIL_NXST_AGGREGATE_REPLY, OFP10_VERSION,
           NXST_AGGREGATE, "NXST_AGGREGATE reply",
           sizeof(struct nx_aggregate_stats_reply), 0 },
         { OFPUTIL_NXST_AGGREGATE_REPLY, OFP10_VERSION,
           NXST_AGGREGATE, "NXST_AGGREGATE reply",
           sizeof(struct nx_aggregate_stats_reply), 0 },
+
+        { OFPUTIL_NXST_FLOW_MONITOR_REPLY, OFP10_VERSION,
+          NXST_FLOW_MONITOR, "NXST_FLOW_MONITOR reply",
+          sizeof(struct nicira_stats_msg), 8 },
     };
 
     static const struct ofputil_msg_category nxst_reply_category = {
     };
 
     static const struct ofputil_msg_category nxst_reply_category = {
@@ -3095,7 +3115,268 @@ ofputil_encode_port_mod(const struct ofputil_port_mod *pm,
 
     return b;
 }
 
     return b;
 }
+\f
+/* ofputil_flow_monitor_request */
+
+/* Converts an NXST_FLOW_MONITOR request in 'msg' into an abstract
+ * ofputil_flow_monitor_request in 'rq'.
+ *
+ * Multiple NXST_FLOW_MONITOR requests can be packed into a single OpenFlow
+ * message.  Calling this function multiple times for a single 'msg' iterates
+ * through the requests.  The caller must initially leave 'msg''s layer
+ * pointers null and not modify them between calls.
+ *
+ * Returns 0 if successful, EOF if no requests were left in this 'msg',
+ * otherwise an OFPERR_* value. */
+int
+ofputil_decode_flow_monitor_request(struct ofputil_flow_monitor_request *rq,
+                                    struct ofpbuf *msg)
+{
+    struct nx_flow_monitor_request *nfmr;
+    uint16_t flags;
+
+    if (!msg->l2) {
+        msg->l2 = msg->data;
+        ofpbuf_pull(msg, sizeof(struct nicira_stats_msg));
+    }
+
+    if (!msg->size) {
+        return EOF;
+    }
+
+    nfmr = ofpbuf_try_pull(msg, sizeof *nfmr);
+    if (!nfmr) {
+        VLOG_WARN_RL(&bad_ofmsg_rl, "NXST_FLOW_MONITOR request has %zu "
+                     "leftover bytes at end", msg->size);
+        return OFPERR_OFPBRC_BAD_LEN;
+    }
+
+    flags = ntohs(nfmr->flags);
+    if (!(flags & (NXFMF_ADD | NXFMF_DELETE | NXFMF_MODIFY))
+        || flags & ~(NXFMF_INITIAL | NXFMF_ADD | NXFMF_DELETE
+                     | NXFMF_MODIFY | NXFMF_ACTIONS | NXFMF_OWN)) {
+        VLOG_WARN_RL(&bad_ofmsg_rl, "NXST_FLOW_MONITOR has bad flags %#"PRIx16,
+                     flags);
+        return OFPERR_NXBRC_FM_BAD_FLAGS;
+    }
+
+    if (!is_all_zeros(nfmr->zeros, sizeof nfmr->zeros)) {
+        return OFPERR_NXBRC_MUST_BE_ZERO;
+    }
+
+    rq->id = ntohl(nfmr->id);
+    rq->flags = flags;
+    rq->out_port = ntohs(nfmr->out_port);
+    rq->table_id = nfmr->table_id;
+
+    return nx_pull_match(msg, ntohs(nfmr->match_len), OFP_DEFAULT_PRIORITY,
+                         &rq->match, NULL, NULL);
+}
+
+void
+ofputil_append_flow_monitor_request(
+    const struct ofputil_flow_monitor_request *rq, struct ofpbuf *msg)
+{
+    struct nx_flow_monitor_request *nfmr;
+    size_t start_ofs;
+    int match_len;
+
+    if (!msg->size) {
+        ofputil_put_stats_header(alloc_xid(), OFPT10_STATS_REQUEST,
+                                 htons(OFPST_VENDOR),
+                                 htonl(NXST_FLOW_MONITOR), msg);
+    }
+
+    start_ofs = msg->size;
+    ofpbuf_put_zeros(msg, sizeof *nfmr);
+    match_len = nx_put_match(msg, false, &rq->match, htonll(0), htonll(0));
+
+    nfmr = ofpbuf_at_assert(msg, start_ofs, sizeof *nfmr);
+    nfmr->id = htonl(rq->id);
+    nfmr->flags = htons(rq->flags);
+    nfmr->out_port = htons(rq->out_port);
+    nfmr->match_len = htons(match_len);
+    nfmr->table_id = rq->table_id;
+}
+
+/* Converts an NXST_FLOW_MONITOR reply (also known as a flow update) in 'msg'
+ * into an abstract ofputil_flow_update in 'update'.  The caller must have
+ * initialized update->match to point to space allocated for a cls_rule.
+ *
+ * Uses 'ofpacts' to store the abstract OFPACT_* version of the update's
+ * actions (except for NXFME_ABBREV, which never includes actions).  The caller
+ * must initialize 'ofpacts' and retains ownership of it.  'update->ofpacts'
+ * will point into the 'ofpacts' buffer.
+ *
+ * Multiple flow updates can be packed into a single OpenFlow message.  Calling
+ * this function multiple times for a single 'msg' iterates through the
+ * updates.  The caller must initially leave 'msg''s layer pointers null and
+ * not modify them between calls.
+ *
+ * Returns 0 if successful, EOF if no updates were left in this 'msg',
+ * otherwise an OFPERR_* value. */
+int
+ofputil_decode_flow_update(struct ofputil_flow_update *update,
+                           struct ofpbuf *msg, struct ofpbuf *ofpacts)
+{
+    struct nx_flow_update_header *nfuh;
+    unsigned int length;
+
+    if (!msg->l2) {
+        msg->l2 = msg->data;
+        ofpbuf_pull(msg, sizeof(struct nicira_stats_msg));
+    }
+
+    if (!msg->size) {
+        return EOF;
+    }
+
+    if (msg->size < sizeof(struct nx_flow_update_header)) {
+        goto bad_len;
+    }
+
+    nfuh = msg->data;
+    update->event = ntohs(nfuh->event);
+    length = ntohs(nfuh->length);
+    if (length > msg->size || length % 8) {
+        goto bad_len;
+    }
 
 
+    if (update->event == NXFME_ABBREV) {
+        struct nx_flow_update_abbrev *nfua;
+
+        if (length != sizeof *nfua) {
+            goto bad_len;
+        }
+
+        nfua = ofpbuf_pull(msg, sizeof *nfua);
+        update->xid = nfua->xid;
+        return 0;
+    } else if (update->event == NXFME_ADDED
+               || update->event == NXFME_DELETED
+               || update->event == NXFME_MODIFIED) {
+        struct nx_flow_update_full *nfuf;
+        unsigned int actions_len;
+        unsigned int match_len;
+        enum ofperr error;
+
+        if (length < sizeof *nfuf) {
+            goto bad_len;
+        }
+
+        nfuf = ofpbuf_pull(msg, sizeof *nfuf);
+        match_len = ntohs(nfuf->match_len);
+        if (sizeof *nfuf + match_len > length) {
+            goto bad_len;
+        }
+
+        update->reason = ntohs(nfuf->reason);
+        update->idle_timeout = ntohs(nfuf->idle_timeout);
+        update->hard_timeout = ntohs(nfuf->hard_timeout);
+        update->table_id = nfuf->table_id;
+        update->cookie = nfuf->cookie;
+
+        error = nx_pull_match(msg, match_len, ntohs(nfuf->priority),
+                              update->match, NULL, NULL);
+        if (error) {
+            return error;
+        }
+
+        actions_len = length - sizeof *nfuf - ROUND_UP(match_len, 8);
+        error = ofpacts_pull_openflow10(msg, actions_len, ofpacts);
+        if (error) {
+            return error;
+        }
+
+        update->ofpacts = ofpacts->data;
+        update->ofpacts_len = ofpacts->size;
+        return 0;
+    } else {
+        VLOG_WARN_RL(&bad_ofmsg_rl,
+                     "NXST_FLOW_MONITOR reply has bad event %"PRIu16,
+                     ntohs(nfuh->event));
+        return OFPERR_OFPET_BAD_REQUEST;
+    }
+
+bad_len:
+    VLOG_WARN_RL(&bad_ofmsg_rl, "NXST_FLOW_MONITOR reply has %zu "
+                 "leftover bytes at end", msg->size);
+    return OFPERR_OFPBRC_BAD_LEN;
+}
+
+uint32_t
+ofputil_decode_flow_monitor_cancel(const struct ofp_header *oh)
+{
+    return ntohl(((const struct nx_flow_monitor_cancel *) oh)->id);
+}
+
+struct ofpbuf *
+ofputil_encode_flow_monitor_cancel(uint32_t id)
+{
+    struct nx_flow_monitor_cancel *nfmc;
+    struct ofpbuf *msg;
+
+    nfmc = make_nxmsg(sizeof *nfmc, NXT_FLOW_MONITOR_CANCEL, &msg);
+    nfmc->id = htonl(id);
+    return msg;
+}
+
+void
+ofputil_start_flow_update(struct list *replies)
+{
+    struct ofpbuf *msg;
+
+    msg = ofpbuf_new(1024);
+    ofputil_put_stats_header(htonl(0), OFPT10_STATS_REPLY,
+                             htons(OFPST_VENDOR),
+                             htonl(NXST_FLOW_MONITOR), msg);
+
+    list_init(replies);
+    list_push_back(replies, &msg->list_node);
+}
+
+void
+ofputil_append_flow_update(const struct ofputil_flow_update *update,
+                           struct list *replies)
+{
+    struct nx_flow_update_header *nfuh;
+    struct ofpbuf *msg;
+    size_t start_ofs;
+
+    msg = ofpbuf_from_list(list_back(replies));
+    start_ofs = msg->size;
+
+    if (update->event == NXFME_ABBREV) {
+        struct nx_flow_update_abbrev *nfua;
+
+        nfua = ofpbuf_put_zeros(msg, sizeof *nfua);
+        nfua->xid = update->xid;
+    } else {
+        struct nx_flow_update_full *nfuf;
+        int match_len;
+
+        ofpbuf_put_zeros(msg, sizeof *nfuf);
+        match_len = nx_put_match(msg, false, update->match,
+                                 htonll(0), htonll(0));
+        ofpacts_put_openflow10(update->ofpacts, update->ofpacts_len, msg);
+
+        nfuf = ofpbuf_at_assert(msg, start_ofs, sizeof *nfuf);
+        nfuf->reason = htons(update->reason);
+        nfuf->priority = htons(update->match->priority);
+        nfuf->idle_timeout = htons(update->idle_timeout);
+        nfuf->hard_timeout = htons(update->hard_timeout);
+        nfuf->match_len = htons(match_len);
+        nfuf->table_id = update->table_id;
+        nfuf->cookie = update->cookie;
+    }
+
+    nfuh = ofpbuf_at_assert(msg, start_ofs, sizeof *nfuh);
+    nfuh->length = htons(msg->size - start_ofs);
+    nfuh->event = htons(update->event);
+
+    ofputil_postappend_stats_reply(start_ofs, replies);
+}
+\f
 struct ofpbuf *
 ofputil_encode_packet_out(const struct ofputil_packet_out *po)
 {
 struct ofpbuf *
 ofputil_encode_packet_out(const struct ofputil_packet_out *po)
 {
index 5b1e8ed..f7d3307 100644 (file)
@@ -86,14 +86,19 @@ enum ofputil_msg_code {
     OFPUTIL_NXT_FLOW_AGE,
     OFPUTIL_NXT_SET_ASYNC_CONFIG,
     OFPUTIL_NXT_SET_CONTROLLER_ID,
     OFPUTIL_NXT_FLOW_AGE,
     OFPUTIL_NXT_SET_ASYNC_CONFIG,
     OFPUTIL_NXT_SET_CONTROLLER_ID,
+    OFPUTIL_NXT_FLOW_MONITOR_CANCEL,
+    OFPUTIL_NXT_FLOW_MONITOR_PAUSED,
+    OFPUTIL_NXT_FLOW_MONITOR_RESUMED,
 
     /* NXST_* stat requests. */
     OFPUTIL_NXST_FLOW_REQUEST,
     OFPUTIL_NXST_AGGREGATE_REQUEST,
 
     /* NXST_* stat requests. */
     OFPUTIL_NXST_FLOW_REQUEST,
     OFPUTIL_NXST_AGGREGATE_REQUEST,
+    OFPUTIL_NXST_FLOW_MONITOR_REQUEST,
 
     /* NXST_* stat replies. */
     OFPUTIL_NXST_FLOW_REPLY,
 
     /* NXST_* stat replies. */
     OFPUTIL_NXST_FLOW_REPLY,
-    OFPUTIL_NXST_AGGREGATE_REPLY
+    OFPUTIL_NXST_AGGREGATE_REPLY,
+    OFPUTIL_NXST_FLOW_MONITOR_REPLY,
 };
 
 struct ofputil_msg_type;
 };
 
 struct ofputil_msg_type;
@@ -506,6 +511,48 @@ enum ofperr ofputil_decode_port_mod(const struct ofp_header *,
 struct ofpbuf *ofputil_encode_port_mod(const struct ofputil_port_mod *,
                                        enum ofputil_protocol);
 
 struct ofpbuf *ofputil_encode_port_mod(const struct ofputil_port_mod *,
                                        enum ofputil_protocol);
 
+/* Abstract nx_flow_monitor_request. */
+struct ofputil_flow_monitor_request {
+    uint32_t id;
+    enum nx_flow_monitor_flags flags;
+    uint16_t out_port;
+    uint8_t table_id;
+    struct cls_rule match;
+};
+
+int ofputil_decode_flow_monitor_request(struct ofputil_flow_monitor_request *,
+                                        struct ofpbuf *msg);
+void ofputil_append_flow_monitor_request(
+    const struct ofputil_flow_monitor_request *, struct ofpbuf *msg);
+
+/* Abstract nx_flow_update. */
+struct ofputil_flow_update {
+    enum nx_flow_update_event event;
+
+    /* Used only for NXFME_ADDED, NXFME_DELETED, NXFME_MODIFIED. */
+    enum ofp_flow_removed_reason reason;
+    uint16_t idle_timeout;
+    uint16_t hard_timeout;
+    uint8_t table_id;
+    ovs_be64 cookie;
+    struct cls_rule *match;
+    struct ofpact *ofpacts;
+    size_t ofpacts_len;
+
+    /* Used only for NXFME_ABBREV. */
+    ovs_be32 xid;
+};
+
+int ofputil_decode_flow_update(struct ofputil_flow_update *,
+                               struct ofpbuf *msg, struct ofpbuf *ofpacts);
+void ofputil_start_flow_update(struct list *replies);
+void ofputil_append_flow_update(const struct ofputil_flow_update *,
+                                struct list *replies);
+
+/* Abstract nx_flow_monitor_cancel. */
+uint32_t ofputil_decode_flow_monitor_cancel(const struct ofp_header *);
+struct ofpbuf *ofputil_encode_flow_monitor_cancel(uint32_t id);
+
 /* OpenFlow protocol utility functions. */
 void *make_openflow(size_t openflow_len, uint8_t type, struct ofpbuf **);
 void *make_nxmsg(size_t openflow_len, uint32_t subtype, struct ofpbuf **);
 /* OpenFlow protocol utility functions. */
 void *make_openflow(size_t openflow_len, uint8_t type, struct ofpbuf **);
 void *make_nxmsg(size_t openflow_len, uint32_t subtype, struct ofpbuf **);
index 3e750d2..b70b070 100644 (file)
@@ -88,6 +88,13 @@ struct ofconn {
      * that the message might be generated, a 0-bit disables it. */
     uint32_t master_async_config[OAM_N_TYPES]; /* master, other */
     uint32_t slave_async_config[OAM_N_TYPES];  /* slave */
      * that the message might be generated, a 0-bit disables it. */
     uint32_t master_async_config[OAM_N_TYPES]; /* master, other */
     uint32_t slave_async_config[OAM_N_TYPES];  /* slave */
+
+    /* Flow monitors. */
+    struct hmap monitors;       /* Contains "struct ofmonitor"s. */
+    struct list updates;        /* List of "struct ofpbuf"s. */
+    bool sent_abbrev_update;    /* Does 'updates' contain NXFME_ABBREV? */
+    struct rconn_packet_counter *monitor_counter;
+    uint64_t monitor_paused;
 };
 
 static struct ofconn *ofconn_create(struct connmgr *, struct rconn *,
 };
 
 static struct ofconn *ofconn_create(struct connmgr *, struct rconn *,
@@ -162,6 +169,8 @@ struct connmgr {
 
 static void update_in_band_remotes(struct connmgr *);
 static void add_snooper(struct connmgr *, struct vconn *);
 
 static void update_in_band_remotes(struct connmgr *);
 static void add_snooper(struct connmgr *, struct vconn *);
+static void ofmonitor_run(struct connmgr *);
+static void ofmonitor_wait(struct connmgr *);
 
 /* Creates and returns a new connection manager owned by 'ofproto'.  'name' is
  * a name for the ofproto suitable for using in log messages.
 
 /* Creates and returns a new connection manager owned by 'ofproto'.  'name' is
  * a name for the ofproto suitable for using in log messages.
@@ -267,6 +276,7 @@ connmgr_run(struct connmgr *mgr,
     LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) {
         ofconn_run(ofconn, handle_openflow);
     }
     LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) {
         ofconn_run(ofconn, handle_openflow);
     }
+    ofmonitor_run(mgr);
 
     /* Fail-open maintenance.  Do this after processing the ofconns since
      * fail-open checks the status of the controller rconn. */
 
     /* Fail-open maintenance.  Do this after processing the ofconns since
      * fail-open checks the status of the controller rconn. */
@@ -326,6 +336,7 @@ connmgr_wait(struct connmgr *mgr, bool handling_openflow)
     LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
         ofconn_wait(ofconn, handling_openflow);
     }
     LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
         ofconn_wait(ofconn, handling_openflow);
     }
+    ofmonitor_wait(mgr);
     if (handling_openflow && mgr->in_band) {
         in_band_wait(mgr->in_band);
     }
     if (handling_openflow && mgr->in_band) {
         in_band_wait(mgr->in_band);
     }
@@ -1002,6 +1013,9 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type,
 
     list_init(&ofconn->opgroups);
 
 
     list_init(&ofconn->opgroups);
 
+    hmap_init(&ofconn->monitors);
+    list_init(&ofconn->updates);
+
     ofconn_flush(ofconn);
 
     return ofconn;
     ofconn_flush(ofconn);
 
     return ofconn;
@@ -1012,6 +1026,7 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type,
 static void
 ofconn_flush(struct ofconn *ofconn)
 {
 static void
 ofconn_flush(struct ofconn *ofconn)
 {
+    struct ofmonitor *monitor, *next_monitor;
     int i;
 
     ofconn->role = NX_ROLE_OTHER;
     int i;
 
     ofconn->role = NX_ROLE_OTHER;
@@ -1080,6 +1095,14 @@ ofconn_flush(struct ofconn *ofconn)
         memset(ofconn->slave_async_config, 0,
                sizeof ofconn->slave_async_config);
     }
         memset(ofconn->slave_async_config, 0,
                sizeof ofconn->slave_async_config);
     }
+
+    HMAP_FOR_EACH_SAFE (monitor, next_monitor, ofconn_node,
+                        &ofconn->monitors) {
+        ofmonitor_destroy(monitor);
+    }
+    rconn_packet_counter_destroy(ofconn->monitor_counter);
+    ofconn->monitor_counter = rconn_packet_counter_create();
+    ofpbuf_list_delete(&ofconn->updates); /* ...but it should be empty. */
 }
 
 static void
 }
 
 static void
@@ -1096,6 +1119,7 @@ ofconn_destroy(struct ofconn *ofconn)
     rconn_packet_counter_destroy(ofconn->packet_in_counter);
     rconn_packet_counter_destroy(ofconn->reply_counter);
     pktbuf_destroy(ofconn->pktbuf);
     rconn_packet_counter_destroy(ofconn->packet_in_counter);
     rconn_packet_counter_destroy(ofconn->reply_counter);
     pktbuf_destroy(ofconn->pktbuf);
+    rconn_packet_counter_destroy(ofconn->monitor_counter);
     free(ofconn);
 }
 
     free(ofconn);
 }
 
@@ -1646,3 +1670,239 @@ ofservice_lookup(struct connmgr *mgr, const char *target)
     }
     return NULL;
 }
     }
     return NULL;
 }
+\f
+/* Flow monitors (NXST_FLOW_MONITOR). */
+
+/* A counter incremented when something significant happens to an OpenFlow
+ * rule.
+ *
+ *     - When a rule is added, its 'add_seqno' and 'modify_seqno' are set to
+ *       the current value (which is then incremented).
+ *
+ *     - When a rule is modified, its 'modify_seqno' is set to the current
+ *       value (which is then incremented).
+ *
+ * Thus, by comparing an old value of monitor_seqno against a rule's
+ * 'add_seqno', one can tell whether the rule was added before or after the old
+ * value was read, and similarly for 'modify_seqno'.
+ *
+ * 32 bits should normally be sufficient (and would be nice, to save space in
+ * each rule) but then we'd have to have some special cases for wraparound.
+ *
+ * We initialize monitor_seqno to 1 to allow 0 to be used as an invalid
+ * value. */
+static uint64_t monitor_seqno = 1;
+
+COVERAGE_DEFINE(ofmonitor_pause);
+COVERAGE_DEFINE(ofmonitor_resume);
+
+enum ofperr
+ofmonitor_create(const struct ofputil_flow_monitor_request *request,
+                 struct ofconn *ofconn, struct ofmonitor **monitorp)
+{
+    struct ofmonitor *m;
+
+    *monitorp = NULL;
+
+    m = ofmonitor_lookup(ofconn, request->id);
+    if (m) {
+        return OFPERR_NXBRC_FM_DUPLICATE_ID;
+    }
+
+    m = xmalloc(sizeof *m);
+    m->ofconn = ofconn;
+    hmap_insert(&ofconn->monitors, &m->ofconn_node, hash_int(request->id, 0));
+    m->id = request->id;
+    m->flags = request->flags;
+    m->out_port = request->out_port;
+    m->table_id = request->table_id;
+    m->match = request->match;
+
+    *monitorp = m;
+    return 0;
+}
+
+struct ofmonitor *
+ofmonitor_lookup(struct ofconn *ofconn, uint32_t id)
+{
+    struct ofmonitor *m;
+
+    HMAP_FOR_EACH_IN_BUCKET (m, ofconn_node, hash_int(id, 0),
+                             &ofconn->monitors) {
+        if (m->id == id) {
+            return m;
+        }
+    }
+    return NULL;
+}
+
+void
+ofmonitor_destroy(struct ofmonitor *m)
+{
+    if (m) {
+        hmap_remove(&m->ofconn->monitors, &m->ofconn_node);
+        free(m);
+    }
+}
+
+void
+ofmonitor_report(struct connmgr *mgr, struct rule *rule,
+                 enum nx_flow_update_event event,
+                 enum ofp_flow_removed_reason reason,
+                 const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid)
+{
+    enum nx_flow_monitor_flags update;
+    struct ofconn *ofconn;
+
+    switch (event) {
+    case NXFME_ADDED:
+        update = NXFMF_ADD;
+        rule->add_seqno = rule->modify_seqno = monitor_seqno++;
+        break;
+
+    case NXFME_DELETED:
+        update = NXFMF_DELETE;
+        break;
+
+    case NXFME_MODIFIED:
+        update = NXFMF_MODIFY;
+        rule->modify_seqno = monitor_seqno++;
+        break;
+
+    default:
+    case NXFME_ABBREV:
+        NOT_REACHED();
+    }
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        enum nx_flow_monitor_flags flags = 0;
+        struct ofmonitor *m;
+
+        if (ofconn->monitor_paused) {
+            /* Only send NXFME_DELETED notifications for flows that were added
+             * before we paused. */
+            if (event != NXFME_DELETED
+                || rule->add_seqno > ofconn->monitor_paused) {
+                continue;
+            }
+        }
+
+        HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) {
+            if (m->flags & update
+                && (m->table_id == 0xff || m->table_id == rule->table_id)
+                && ofoperation_has_out_port(rule->pending, m->out_port)
+                && cls_rule_is_loose_match(&rule->cr, &m->match)) {
+                flags |= m->flags;
+            }
+        }
+
+        if (flags) {
+            if (list_is_empty(&ofconn->updates)) {
+                ofputil_start_flow_update(&ofconn->updates);
+                ofconn->sent_abbrev_update = false;
+            }
+
+            if (ofconn != abbrev_ofconn || ofconn->monitor_paused) {
+                struct ofputil_flow_update fu;
+
+                fu.event = event;
+                fu.reason = event == NXFME_DELETED ? reason : 0;
+                fu.idle_timeout = rule->idle_timeout;
+                fu.hard_timeout = rule->hard_timeout;
+                fu.table_id = rule->table_id;
+                fu.cookie = rule->flow_cookie;
+                fu.match = &rule->cr;
+                if (flags & NXFMF_ACTIONS) {
+                    fu.ofpacts = rule->ofpacts;
+                    fu.ofpacts_len = rule->ofpacts_len;
+                } else {
+                    fu.ofpacts = NULL;
+                    fu.ofpacts_len = 0;
+                }
+                ofputil_append_flow_update(&fu, &ofconn->updates);
+            } else if (!ofconn->sent_abbrev_update) {
+                struct ofputil_flow_update fu;
+
+                fu.event = NXFME_ABBREV;
+                fu.xid = abbrev_xid;
+                ofputil_append_flow_update(&fu, &ofconn->updates);
+
+                ofconn->sent_abbrev_update = true;
+            }
+        }
+    }
+}
+
+void
+ofmonitor_flush(struct connmgr *mgr)
+{
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        struct ofpbuf *msg, *next;
+
+        LIST_FOR_EACH_SAFE (msg, next, list_node, &ofconn->updates) {
+            list_remove(&msg->list_node);
+            ofconn_send(ofconn, msg, ofconn->monitor_counter);
+            if (!ofconn->monitor_paused
+                && ofconn->monitor_counter->n_bytes > 128 * 1024) {
+                struct ofpbuf *pause;
+
+                COVERAGE_INC(ofmonitor_pause);
+                ofconn->monitor_paused = monitor_seqno++;
+                make_nxmsg_xid(sizeof(struct nicira_header),
+                               NXT_FLOW_MONITOR_PAUSED, htonl(0), &pause);
+                ofconn_send(ofconn, pause, ofconn->monitor_counter);
+            }
+        }
+    }
+}
+
+static void
+ofmonitor_resume(struct ofconn *ofconn)
+{
+    struct ofpbuf *resume;
+    struct ofmonitor *m;
+    struct list rules;
+    struct list msgs;
+
+    list_init(&rules);
+    HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) {
+        ofmonitor_collect_resume_rules(m, ofconn->monitor_paused, &rules);
+    }
+
+    list_init(&msgs);
+    ofmonitor_compose_refresh_updates(&rules, &msgs);
+
+    make_nxmsg_xid(sizeof(struct nicira_header),
+                   NXT_FLOW_MONITOR_RESUMED, htonl(0), &resume);
+    list_push_back(&msgs, &resume->list_node);
+    ofconn_send_replies(ofconn, &msgs);
+
+    ofconn->monitor_paused = 0;
+}
+
+static void
+ofmonitor_run(struct connmgr *mgr)
+{
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+            COVERAGE_INC(ofmonitor_resume);
+            ofmonitor_resume(ofconn);
+        }
+    }
+}
+
+static void
+ofmonitor_wait(struct connmgr *mgr)
+{
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+            poll_immediate_wake();
+        }
+    }
+}
index dec5b71..24a33fb 100644 (file)
@@ -17,6 +17,7 @@
 #ifndef CONNMGR_H
 #define CONNMGR_H 1
 
 #ifndef CONNMGR_H
 #define CONNMGR_H 1
 
+#include "classifier.h"
 #include "hmap.h"
 #include "list.h"
 #include "ofp-errors.h"
 #include "hmap.h"
 #include "list.h"
 #include "ofp-errors.h"
@@ -30,6 +31,7 @@ struct ofopgroup;
 struct ofputil_flow_removed;
 struct ofputil_packet_in;
 struct ofputil_phy_port;
 struct ofputil_flow_removed;
 struct ofputil_packet_in;
 struct ofputil_phy_port;
+struct rule;
 struct simap;
 struct sset;
 
 struct simap;
 struct sset;
 
@@ -159,4 +161,34 @@ bool connmgr_may_set_up_flow(struct connmgr *, const struct flow *,
 /* Fail-open and in-band implementation. */
 void connmgr_flushed(struct connmgr *);
 
 /* Fail-open and in-band implementation. */
 void connmgr_flushed(struct connmgr *);
 
+/* A flow monitor managed by NXST_FLOW_MONITOR and related requests. */
+struct ofmonitor {
+    struct ofconn *ofconn;      /* Owning 'ofconn'. */
+    struct hmap_node ofconn_node; /* In ofconn's 'monitors' hmap. */
+    uint32_t id;
+
+    enum nx_flow_monitor_flags flags;
+
+    /* Matching. */
+    uint16_t out_port;
+    uint8_t table_id;
+    struct cls_rule match;
+};
+
+struct ofputil_flow_monitor_request;
+
+enum ofperr ofmonitor_create(const struct ofputil_flow_monitor_request *,
+                             struct ofconn *, struct ofmonitor **);
+struct ofmonitor *ofmonitor_lookup(struct ofconn *, uint32_t id);
+void ofmonitor_destroy(struct ofmonitor *);
+
+void ofmonitor_report(struct connmgr *, struct rule *,
+                      enum nx_flow_update_event, enum ofp_flow_removed_reason,
+                      const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid);
+void ofmonitor_flush(struct connmgr *);
+
+void ofmonitor_collect_resume_rules(struct ofmonitor *, uint64_t seqno,
+                                    struct list *rules);
+void ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs);
+
 #endif /* connmgr.h */
 #endif /* connmgr.h */
index f22c9f6..6eef106 100644 (file)
@@ -187,6 +187,11 @@ struct rule {
 
     struct ofpact *ofpacts;      /* Sequence of "struct ofpacts". */
     unsigned int ofpacts_len;    /* Size of 'ofpacts', in bytes. */
 
     struct ofpact *ofpacts;      /* Sequence of "struct ofpacts". */
     unsigned int ofpacts_len;    /* Size of 'ofpacts', in bytes. */
+
+    /* Flow monitors. */
+    enum nx_flow_monitor_flags monitor_flags;
+    uint64_t add_seqno;         /* Sequence number when added. */
+    uint64_t modify_seqno;      /* Sequence number when changed. */
 };
 
 static inline struct rule *
 };
 
 static inline struct rule *
@@ -199,9 +204,15 @@ void ofproto_rule_update_used(struct rule *, long long int used);
 void ofproto_rule_expire(struct rule *, uint8_t reason);
 void ofproto_rule_destroy(struct rule *);
 
 void ofproto_rule_expire(struct rule *, uint8_t reason);
 void ofproto_rule_destroy(struct rule *);
 
+bool ofproto_rule_has_out_port(const struct rule *, uint16_t out_port);
+
 void ofoperation_complete(struct ofoperation *, enum ofperr);
 struct rule *ofoperation_get_victim(struct ofoperation *);
 
 void ofoperation_complete(struct ofoperation *, enum ofperr);
 struct rule *ofoperation_get_victim(struct ofoperation *);
 
+bool ofoperation_has_out_port(const struct ofoperation *, uint16_t out_port);
+
+bool ofproto_rule_is_hidden(const struct rule *);
+
 /* ofproto class structure, to be defined by each ofproto implementation.
  *
  *
 /* ofproto class structure, to be defined by each ofproto implementation.
  *
  *
index 9340191..b187c86 100644 (file)
@@ -126,13 +126,17 @@ struct ofoperation {
     struct ofpact *ofpacts;
     size_t ofpacts_len;
 
     struct ofpact *ofpacts;
     size_t ofpacts_len;
 
+    /* OFOPERATION_DELETE. */
+    enum ofp_flow_removed_reason reason; /* Reason flow was removed. */
+
     ovs_be64 flow_cookie;       /* Rule's old flow cookie. */
     enum ofperr error;          /* 0 if no error. */
 };
 
 static struct ofoperation *ofoperation_create(struct ofopgroup *,
                                               struct rule *,
     ovs_be64 flow_cookie;       /* Rule's old flow cookie. */
     enum ofperr error;          /* 0 if no error. */
 };
 
 static struct ofoperation *ofoperation_create(struct ofopgroup *,
                                               struct rule *,
-                                              enum ofoperation_type);
+                                              enum ofoperation_type,
+                                              enum ofp_flow_removed_reason);
 static void ofoperation_destroy(struct ofoperation *);
 
 /* oftable. */
 static void ofoperation_destroy(struct ofoperation *);
 
 /* oftable. */
@@ -188,7 +192,6 @@ static void reinit_ports(struct ofproto *);
 static void ofproto_rule_destroy__(struct rule *);
 static void ofproto_rule_send_removed(struct rule *, uint8_t reason);
 static bool rule_is_modifiable(const struct rule *);
 static void ofproto_rule_destroy__(struct rule *);
 static void ofproto_rule_send_removed(struct rule *, uint8_t reason);
 static bool rule_is_modifiable(const struct rule *);
-static bool rule_is_hidden(const struct rule *);
 
 /* OpenFlow. */
 static enum ofperr add_flow(struct ofproto *, struct ofconn *,
 
 /* OpenFlow. */
 static enum ofperr add_flow(struct ofproto *, struct ofconn *,
@@ -952,7 +955,8 @@ ofproto_flush__(struct ofproto *ofproto)
         cls_cursor_init(&cursor, &table->cls, NULL);
         CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
             if (!rule->pending) {
         cls_cursor_init(&cursor, &table->cls, NULL);
         CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
             if (!rule->pending) {
-                ofoperation_create(group, rule, OFOPERATION_DELETE);
+                ofoperation_create(group, rule, OFOPERATION_DELETE,
+                                   OFPRR_DELETE);
                 oftable_remove_rule(rule);
                 ofproto->ofproto_class->rule_destruct(rule);
             }
                 oftable_remove_rule(rule);
                 ofproto->ofproto_class->rule_destruct(rule);
             }
@@ -1445,7 +1449,7 @@ ofproto_delete_flow(struct ofproto *ofproto, const struct cls_rule *target)
     } else {
         /* Initiate deletion -> success. */
         struct ofopgroup *group = ofopgroup_create_unattached(ofproto);
     } else {
         /* Initiate deletion -> success. */
         struct ofopgroup *group = ofopgroup_create_unattached(ofproto);
-        ofoperation_create(group, rule, OFOPERATION_DELETE);
+        ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
         oftable_remove_rule(rule);
         ofproto->ofproto_class->rule_destruct(rule);
         ofopgroup_submit(group);
         oftable_remove_rule(rule);
         ofproto->ofproto_class->rule_destruct(rule);
         ofopgroup_submit(group);
@@ -1894,13 +1898,36 @@ ofproto_rule_destroy(struct rule *rule)
 
 /* Returns true if 'rule' has an OpenFlow OFPAT_OUTPUT or OFPAT_ENQUEUE action
  * that outputs to 'port' (output to OFPP_FLOOD and OFPP_ALL doesn't count). */
 
 /* Returns true if 'rule' has an OpenFlow OFPAT_OUTPUT or OFPAT_ENQUEUE action
  * that outputs to 'port' (output to OFPP_FLOOD and OFPP_ALL doesn't count). */
-static bool
-rule_has_out_port(const struct rule *rule, uint16_t port)
+bool
+ofproto_rule_has_out_port(const struct rule *rule, uint16_t port)
 {
     return (port == OFPP_NONE
             || ofpacts_output_to_port(rule->ofpacts, rule->ofpacts_len, port));
 }
 
 {
     return (port == OFPP_NONE
             || ofpacts_output_to_port(rule->ofpacts, rule->ofpacts_len, port));
 }
 
+/* Returns true if a rule related to 'op' has an OpenFlow OFPAT_OUTPUT or
+ * OFPAT_ENQUEUE action that outputs to 'out_port'. */
+bool
+ofoperation_has_out_port(const struct ofoperation *op, uint16_t out_port)
+{
+    if (ofproto_rule_has_out_port(op->rule, out_port)) {
+        return true;
+    }
+
+    switch (op->type) {
+    case OFOPERATION_ADD:
+        return op->victim && ofproto_rule_has_out_port(op->victim, out_port);
+
+    case OFOPERATION_DELETE:
+        return false;
+
+    case OFOPERATION_MODIFY:
+        return ofpacts_output_to_port(op->ofpacts, op->ofpacts_len, out_port);
+    }
+
+    NOT_REACHED();
+}
+
 /* Executes the actions indicated by 'rule' on 'packet' and credits 'rule''s
  * statistics appropriately.  'packet' must have at least sizeof(struct
  * ofp_packet_in) bytes of headroom.
 /* Executes the actions indicated by 'rule' on 'packet' and credits 'rule''s
  * statistics appropriately.  'packet' must have at least sizeof(struct
  * ofp_packet_in) bytes of headroom.
@@ -1925,8 +1952,8 @@ rule_execute(struct rule *rule, uint16_t in_port, struct ofpbuf *packet)
  * Rules with priority higher than UINT16_MAX are set up by ofproto itself
  * (e.g. by in-band control) and are intentionally hidden from the
  * controller. */
  * Rules with priority higher than UINT16_MAX are set up by ofproto itself
  * (e.g. by in-band control) and are intentionally hidden from the
  * controller. */
-static bool
-rule_is_hidden(const struct rule *rule)
+bool
+ofproto_rule_is_hidden(const struct rule *rule)
 {
     return rule->cr.priority > UINT16_MAX;
 }
 {
     return rule->cr.priority > UINT16_MAX;
 }
@@ -2393,7 +2420,8 @@ collect_rules_loose(struct ofproto *ofproto, uint8_t table_id,
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
-            if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+            if (!ofproto_rule_is_hidden(rule)
+                && ofproto_rule_has_out_port(rule, out_port)
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -2437,7 +2465,8 @@ collect_rules_strict(struct ofproto *ofproto, uint8_t table_id,
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
             if (rule->pending) {
                 return OFPROTO_POSTPONE;
             }
-            if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+            if (!ofproto_rule_is_hidden(rule)
+                && ofproto_rule_has_out_port(rule, out_port)
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
                     && !((rule->flow_cookie ^ cookie) & cookie_mask)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -2855,6 +2884,9 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
     rule->ofpacts_len = fm->ofpacts_len;
     rule->evictable = true;
     rule->eviction_group = NULL;
     rule->ofpacts_len = fm->ofpacts_len;
     rule->evictable = true;
     rule->eviction_group = NULL;
+    rule->monitor_flags = 0;
+    rule->add_seqno = 0;
+    rule->modify_seqno = 0;
 
     /* Insert new rule. */
     victim = oftable_replace_rule(rule);
 
     /* Insert new rule. */
     victim = oftable_replace_rule(rule);
@@ -2886,7 +2918,7 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
         }
 
         group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id);
         }
 
         group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id);
-        op = ofoperation_create(group, rule, OFOPERATION_ADD);
+        op = ofoperation_create(group, rule, OFOPERATION_ADD, 0);
         op->victim = victim;
 
         error = ofproto->ofproto_class->rule_construct(rule);
         op->victim = victim;
 
         error = ofproto->ofproto_class->rule_construct(rule);
@@ -2950,7 +2982,7 @@ modify_flows__(struct ofproto *ofproto, struct ofconn *ofconn,
             continue;
         }
 
             continue;
         }
 
-        op = ofoperation_create(group, rule, OFOPERATION_MODIFY);
+        op = ofoperation_create(group, rule, OFOPERATION_MODIFY, 0);
         rule->flow_cookie = new_cookie;
         if (actions_changed) {
             op->ofpacts = rule->ofpacts;
         rule->flow_cookie = new_cookie;
         if (actions_changed) {
             op->ofpacts = rule->ofpacts;
@@ -3029,7 +3061,7 @@ delete_flow__(struct rule *rule, struct ofopgroup *group)
 
     ofproto_rule_send_removed(rule, OFPRR_DELETE);
 
 
     ofproto_rule_send_removed(rule, OFPRR_DELETE);
 
-    ofoperation_create(group, rule, OFOPERATION_DELETE);
+    ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
 }
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
 }
@@ -3094,7 +3126,7 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason)
 {
     struct ofputil_flow_removed fr;
 
 {
     struct ofputil_flow_removed fr;
 
-    if (rule_is_hidden(rule) || !rule->send_flow_removed) {
+    if (ofproto_rule_is_hidden(rule) || !rule->send_flow_removed) {
         return;
     }
 
         return;
     }
 
@@ -3144,7 +3176,7 @@ ofproto_rule_expire(struct rule *rule, uint8_t reason)
     ofproto_rule_send_removed(rule, reason);
 
     group = ofopgroup_create_unattached(ofproto);
     ofproto_rule_send_removed(rule, reason);
 
     group = ofopgroup_create_unattached(ofproto);
-    ofoperation_create(group, rule, OFOPERATION_DELETE);
+    ofoperation_create(group, rule, OFOPERATION_DELETE, reason);
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
     ofopgroup_submit(group);
     oftable_remove_rule(rule);
     ofproto->ofproto_class->rule_destruct(rule);
     ofopgroup_submit(group);
@@ -3397,6 +3429,255 @@ handle_barrier_request(struct ofconn *ofconn, const struct ofp_header *oh)
     return 0;
 }
 
     return 0;
 }
 
+static void
+ofproto_compose_flow_refresh_update(const struct rule *rule,
+                                    enum nx_flow_monitor_flags flags,
+                                    struct list *msgs)
+{
+    struct ofoperation *op = rule->pending;
+    struct ofputil_flow_update fu;
+
+    if (op && op->type == OFOPERATION_ADD && !op->victim) {
+        /* We'll report the final flow when the operation completes.  Reporting
+         * it now would cause a duplicate report later. */
+        return;
+    }
+
+    fu.event = (flags & (NXFMF_INITIAL | NXFMF_ADD)
+                ? NXFME_ADDED : NXFME_MODIFIED);
+    fu.reason = 0;
+    fu.idle_timeout = rule->idle_timeout;
+    fu.hard_timeout = rule->hard_timeout;
+    fu.table_id = rule->table_id;
+    fu.cookie = rule->flow_cookie;
+    fu.match = (struct cls_rule *) &rule->cr;
+    if (!(flags & NXFMF_ACTIONS)) {
+        fu.ofpacts = NULL;
+        fu.ofpacts_len = 0;
+    } else if (!op) {
+        fu.ofpacts = rule->ofpacts;
+        fu.ofpacts_len = rule->ofpacts_len;
+    } else {
+        /* An operation is in progress.  Use the previous version of the flow's
+         * actions, so that when the operation commits we report the change. */
+        switch (op->type) {
+        case OFOPERATION_ADD:
+            /* We already verified that there was a victim. */
+            fu.ofpacts = op->victim->ofpacts;
+            fu.ofpacts_len = op->victim->ofpacts_len;
+            break;
+
+        case OFOPERATION_MODIFY:
+            if (op->ofpacts) {
+                fu.ofpacts = op->ofpacts;
+                fu.ofpacts_len = op->ofpacts_len;
+            } else {
+                fu.ofpacts = rule->ofpacts;
+                fu.ofpacts_len = rule->ofpacts_len;
+            }
+            break;
+
+        case OFOPERATION_DELETE:
+            fu.ofpacts = rule->ofpacts;
+            fu.ofpacts_len = rule->ofpacts_len;
+            break;
+
+        default:
+            NOT_REACHED();
+        }
+    }
+
+    if (list_is_empty(msgs)) {
+        ofputil_start_flow_update(msgs);
+    }
+    ofputil_append_flow_update(&fu, msgs);
+}
+
+void
+ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs)
+{
+    struct rule *rule;
+
+    LIST_FOR_EACH (rule, ofproto_node, rules) {
+        enum nx_flow_monitor_flags flags = rule->monitor_flags;
+        rule->monitor_flags = 0;
+
+        ofproto_compose_flow_refresh_update(rule, flags, msgs);
+    }
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rule(const struct ofmonitor *m,
+                                       struct rule *rule, uint64_t seqno,
+                                       struct list *rules)
+{
+    enum nx_flow_monitor_flags update;
+
+    if (ofproto_rule_is_hidden(rule)) {
+        return;
+    }
+
+    if (!(rule->pending
+          ? ofoperation_has_out_port(rule->pending, m->out_port)
+          : ofproto_rule_has_out_port(rule, m->out_port))) {
+        return;
+    }
+
+    if (seqno) {
+        if (rule->add_seqno > seqno) {
+            update = NXFMF_ADD | NXFMF_MODIFY;
+        } else if (rule->modify_seqno > seqno) {
+            update = NXFMF_MODIFY;
+        } else {
+            return;
+        }
+
+        if (!(m->flags & update)) {
+            return;
+        }
+    } else {
+        update = NXFMF_INITIAL;
+    }
+
+    if (!rule->monitor_flags) {
+        list_push_back(rules, &rule->ofproto_node);
+    }
+    rule->monitor_flags |= update | (m->flags & NXFMF_ACTIONS);
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rules(const struct ofmonitor *m,
+                                        uint64_t seqno,
+                                        struct list *rules)
+{
+    const struct ofproto *ofproto = ofconn_get_ofproto(m->ofconn);
+    const struct ofoperation *op;
+    const struct oftable *table;
+
+    FOR_EACH_MATCHING_TABLE (table, m->table_id, ofproto) {
+        struct cls_cursor cursor;
+        struct rule *rule;
+
+        cls_cursor_init(&cursor, &table->cls, &m->match);
+        CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+            assert(!rule->pending); /* XXX */
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+
+    HMAP_FOR_EACH (op, hmap_node, &ofproto->deletions) {
+        struct rule *rule = op->rule;
+
+        if (((m->table_id == 0xff
+              ? !(ofproto->tables[rule->table_id].flags & OFTABLE_HIDDEN)
+              : m->table_id == rule->table_id))
+            && cls_rule_is_loose_match(&rule->cr, &m->match)) {
+            ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+        }
+    }
+}
+
+static void
+ofproto_collect_ofmonitor_initial_rules(struct ofmonitor *m,
+                                        struct list *rules)
+{
+    if (m->flags & NXFMF_INITIAL) {
+        ofproto_collect_ofmonitor_refresh_rules(m, 0, rules);
+    }
+}
+
+void
+ofmonitor_collect_resume_rules(struct ofmonitor *m,
+                               uint64_t seqno, struct list *rules)
+{
+    ofproto_collect_ofmonitor_refresh_rules(m, seqno, rules);
+}
+
+static enum ofperr
+handle_flow_monitor_request(struct ofconn *ofconn,
+                            const struct ofp_stats_msg *osm)
+{
+    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
+    struct ofmonitor **monitors;
+    size_t n_monitors, allocated_monitors;
+    struct list replies;
+    enum ofperr error;
+    struct list rules;
+    struct ofpbuf b;
+    size_t i;
+
+    error = 0;
+    ofpbuf_use_const(&b, osm, ntohs(osm->header.length));
+    monitors = NULL;
+    n_monitors = allocated_monitors = 0;
+    for (;;) {
+        struct ofputil_flow_monitor_request request;
+        struct ofmonitor *m;
+        int retval;
+
+        retval = ofputil_decode_flow_monitor_request(&request, &b);
+        if (retval == EOF) {
+            break;
+        } else if (retval) {
+            error = retval;
+            goto error;
+        }
+
+        if (request.table_id != 0xff
+            && request.table_id >= ofproto->n_tables) {
+            error = OFPERR_OFPBRC_BAD_TABLE_ID;
+            goto error;
+        }
+
+        error = ofmonitor_create(&request, ofconn, &m);
+        if (error) {
+            goto error;
+        }
+
+        if (n_monitors >= allocated_monitors) {
+            monitors = x2nrealloc(monitors, &allocated_monitors,
+                                  sizeof *monitors);
+        }
+        monitors[n_monitors++] = m;
+    }
+
+    list_init(&rules);
+    for (i = 0; i < n_monitors; i++) {
+        ofproto_collect_ofmonitor_initial_rules(monitors[i], &rules);
+    }
+
+    ofputil_start_stats_reply(osm, &replies);
+    ofmonitor_compose_refresh_updates(&rules, &replies);
+    ofconn_send_replies(ofconn, &replies);
+
+    free(monitors);
+
+    return 0;
+
+error:
+    for (i = 0; i < n_monitors; i++) {
+        ofmonitor_destroy(monitors[i]);
+    }
+    free(monitors);
+    return error;
+}
+
+static enum ofperr
+handle_flow_monitor_cancel(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+    struct ofmonitor *m;
+    uint32_t id;
+
+    id = ofputil_decode_flow_monitor_cancel(oh);
+    m = ofmonitor_lookup(ofconn, id);
+    if (!m) {
+        return OFPERR_NXBRC_FM_BAD_ID;
+    }
+
+    ofmonitor_destroy(m);
+    return 0;
+}
+
 static enum ofperr
 handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
 {
 static enum ofperr
 handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
 {
@@ -3462,6 +3743,9 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
         /* Nothing to do. */
         return 0;
 
         /* Nothing to do. */
         return 0;
 
+    case OFPUTIL_NXT_FLOW_MONITOR_CANCEL:
+        return handle_flow_monitor_cancel(ofconn, oh);
+
     case OFPUTIL_NXT_SET_ASYNC_CONFIG:
         return handle_nxt_set_async_config(ofconn, oh);
 
     case OFPUTIL_NXT_SET_ASYNC_CONFIG:
         return handle_nxt_set_async_config(ofconn, oh);
 
@@ -3489,6 +3773,9 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
     case OFPUTIL_OFPST_PORT_DESC_REQUEST:
         return handle_port_desc_stats_request(ofconn, msg->data);
 
     case OFPUTIL_OFPST_PORT_DESC_REQUEST:
         return handle_port_desc_stats_request(ofconn, msg->data);
 
+    case OFPUTIL_NXST_FLOW_MONITOR_REQUEST:
+        return handle_flow_monitor_request(ofconn, msg->data);
+
     case OFPUTIL_MSG_INVALID:
     case OFPUTIL_OFPT_HELLO:
     case OFPUTIL_OFPT_ERROR:
     case OFPUTIL_MSG_INVALID:
     case OFPUTIL_OFPT_HELLO:
     case OFPUTIL_OFPT_ERROR:
@@ -3510,8 +3797,11 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
     case OFPUTIL_NXT_ROLE_REPLY:
     case OFPUTIL_NXT_FLOW_REMOVED:
     case OFPUTIL_NXT_PACKET_IN:
     case OFPUTIL_NXT_ROLE_REPLY:
     case OFPUTIL_NXT_FLOW_REMOVED:
     case OFPUTIL_NXT_PACKET_IN:
+    case OFPUTIL_NXT_FLOW_MONITOR_PAUSED:
+    case OFPUTIL_NXT_FLOW_MONITOR_RESUMED:
     case OFPUTIL_NXST_FLOW_REPLY:
     case OFPUTIL_NXST_AGGREGATE_REPLY:
     case OFPUTIL_NXST_FLOW_REPLY:
     case OFPUTIL_NXST_AGGREGATE_REPLY:
+    case OFPUTIL_NXST_FLOW_MONITOR_REPLY:
     default:
         return (oh->type == OFPT10_STATS_REQUEST ||
                 oh->type == OFPT10_STATS_REPLY
     default:
         return (oh->type == OFPT10_STATS_REQUEST ||
                 oh->type == OFPT10_STATS_REPLY
@@ -3600,6 +3890,10 @@ static void
 ofopgroup_complete(struct ofopgroup *group)
 {
     struct ofproto *ofproto = group->ofproto;
 ofopgroup_complete(struct ofopgroup *group)
 {
     struct ofproto *ofproto = group->ofproto;
+
+    struct ofconn *abbrev_ofconn;
+    ovs_be32 abbrev_xid;
+
     struct ofoperation *op, *next_op;
     int error;
 
     struct ofoperation *op, *next_op;
     int error;
 
@@ -3630,8 +3924,28 @@ ofopgroup_complete(struct ofopgroup *group)
         }
     }
 
         }
     }
 
+    if (!error && !list_is_empty(&group->ofconn_node)) {
+        abbrev_ofconn = group->ofconn;
+        abbrev_xid = group->request->xid;
+    } else {
+        abbrev_ofconn = NULL;
+        abbrev_xid = htonl(0);
+    }
     LIST_FOR_EACH_SAFE (op, next_op, group_node, &group->ops) {
         struct rule *rule = op->rule;
     LIST_FOR_EACH_SAFE (op, next_op, group_node, &group->ops) {
         struct rule *rule = op->rule;
+
+        if (!op->error && !ofproto_rule_is_hidden(rule)) {
+            /* Check that we can just cast from ofoperation_type to
+             * nx_flow_update_event. */
+            BUILD_ASSERT_DECL(OFOPERATION_ADD == NXFME_ADDED);
+            BUILD_ASSERT_DECL(OFOPERATION_DELETE == NXFME_DELETED);
+            BUILD_ASSERT_DECL(OFOPERATION_MODIFY == NXFME_MODIFIED);
+
+            ofmonitor_report(ofproto->connmgr, rule,
+                             (enum nx_flow_update_event) op->type,
+                             op->reason, abbrev_ofconn, abbrev_xid);
+        }
+
         rule->pending = NULL;
 
         switch (op->type) {
         rule->pending = NULL;
 
         switch (op->type) {
@@ -3685,6 +3999,8 @@ ofopgroup_complete(struct ofopgroup *group)
         ofoperation_destroy(op);
     }
 
         ofoperation_destroy(op);
     }
 
+    ofmonitor_flush(ofproto->connmgr);
+
     if (!list_is_empty(&group->ofproto_node)) {
         assert(ofproto->n_pending > 0);
         ofproto->n_pending--;
     if (!list_is_empty(&group->ofproto_node)) {
         assert(ofproto->n_pending > 0);
         ofproto->n_pending--;
@@ -3704,11 +4020,15 @@ ofopgroup_complete(struct ofopgroup *group)
 /* Initiates a new operation on 'rule', of the specified 'type', within
  * 'group'.  Prior to calling, 'rule' must not have any pending operation.
  *
 /* Initiates a new operation on 'rule', of the specified 'type', within
  * 'group'.  Prior to calling, 'rule' must not have any pending operation.
  *
+ * For a 'type' of OFOPERATION_DELETE, 'reason' should specify the reason that
+ * the flow is being deleted.  For other 'type's, 'reason' is ignored (use 0).
+ *
  * Returns the newly created ofoperation (which is also available as
  * rule->pending). */
 static struct ofoperation *
 ofoperation_create(struct ofopgroup *group, struct rule *rule,
  * Returns the newly created ofoperation (which is also available as
  * rule->pending). */
 static struct ofoperation *
 ofoperation_create(struct ofopgroup *group, struct rule *rule,
-                   enum ofoperation_type type)
+                   enum ofoperation_type type,
+                   enum ofp_flow_removed_reason reason)
 {
     struct ofproto *ofproto = group->ofproto;
     struct ofoperation *op;
 {
     struct ofproto *ofproto = group->ofproto;
     struct ofoperation *op;
@@ -3720,6 +4040,7 @@ ofoperation_create(struct ofopgroup *group, struct rule *rule,
     list_push_back(&group->ops, &op->group_node);
     op->rule = rule;
     op->type = type;
     list_push_back(&group->ops, &op->group_node);
     op->rule = rule;
     op->type = type;
+    op->reason = reason;
     op->flow_cookie = rule->flow_cookie;
 
     group->n_running++;
     op->flow_cookie = rule->flow_cookie;
 
     group->n_running++;
@@ -3891,7 +4212,8 @@ ofproto_evict(struct ofproto *ofproto)
                 break;
             }
 
                 break;
             }
 
-            ofoperation_create(group, rule, OFOPERATION_DELETE);
+            ofoperation_create(group, rule,
+                               OFOPERATION_DELETE, OFPRR_EVICTION);
             oftable_remove_rule(rule);
             ofproto->ofproto_class->rule_destruct(rule);
         }
             oftable_remove_rule(rule);
             ofproto->ofproto_class->rule_destruct(rule);
         }
index 2014851..1fe54f1 100644 (file)
@@ -810,6 +810,34 @@ NXT_SET_CONTROLLER_ID (xid=0x3): id=123
 ])
 AT_CLEANUP
 
 ])
 AT_CLEANUP
 
+AT_SETUP([NXT_FLOW_MONITOR_CANCEL])
+AT_KEYWORDS([ofp-print])
+AT_CHECK([ovs-ofctl ofp-print "\
+01 04 00 14 00 00 00 03 00 00 23 20 00 00 00 15 \
+01 02 30 40 \
+"], [0], [dnl
+NXT_FLOW_MONITOR_CANCEL (xid=0x3): id=16920640
+])
+AT_CLEANUP
+
+AT_SETUP([NXT_FLOW_MONITOR_PAUSED])
+AT_KEYWORDS([ofp-print])
+AT_CHECK([ovs-ofctl ofp-print "\
+01 04 00 10 00 00 00 03 00 00 23 20 00 00 00 16 \
+"], [0], [dnl
+NXT_FLOW_MONITOR_PAUSED (xid=0x3):
+])
+AT_CLEANUP
+
+AT_SETUP([NXT_FLOW_MONITOR_RESUMED])
+AT_KEYWORDS([ofp-print])
+AT_CHECK([ovs-ofctl ofp-print "\
+01 04 00 10 00 00 00 03 00 00 23 20 00 00 00 17 \
+"], [0], [dnl
+NXT_FLOW_MONITOR_RESUMED (xid=0x3):
+])
+AT_CLEANUP
+
 AT_SETUP([NXT_SET_FLOW_FORMAT])
 AT_KEYWORDS([ofp-print])
 AT_CHECK([ovs-ofctl ofp-print "\
 AT_SETUP([NXT_SET_FLOW_FORMAT])
 AT_KEYWORDS([ofp-print])
 AT_CHECK([ovs-ofctl ofp-print "\
@@ -1061,3 +1089,30 @@ AT_CHECK([ovs-ofctl ofp-print "\
 NXST_AGGREGATE reply (xid=0x4): packet_count=7 byte_count=420 flow_count=7
 ])
 AT_CLEANUP
 NXST_AGGREGATE reply (xid=0x4): packet_count=7 byte_count=420 flow_count=7
 ])
 AT_CLEANUP
+
+AT_SETUP([NXST_FLOW_MONITOR request])
+AT_KEYWORDS([ofp-print OFPT_STATS_REPLY])
+AT_CHECK([ovs-ofctl ofp-print "\
+01 10 00 40 00 00 00 04 ff ff 00 00 00 00 23 20 00 00 00 02 00 00 00 00 \
+00 00 40 00 00 3f ff fe 00 00 01 00 00 00 00 00 \
+00 00 20 00 00 04 ff ff 00 06 02 00 00 00 00 00 00 00 00 02 00 01 00 00 \
+"], [0], [dnl
+NXST_FLOW_MONITOR request (xid=0x4):
+ id=16384 flags=initial,add,delete,modify,actions,own out_port=LOCAL table=1
+ id=8192 flags=delete table=2 in_port=1
+])
+AT_CLEANUP
+
+AT_SETUP([NXST_FLOW_MONITOR reply])
+AT_KEYWORDS([ofp-print OFPT_STATS_REPLY])
+AT_CHECK([ovs-ofctl ofp-print "\
+01 11 00 40 00 00 00 04 ff ff 00 00 00 00 23 20 00 00 00 02 00 00 00 00 \
+00 20 00 01 00 04 80 00 00 05 00 10 00 06 01 00 12 34 56 78 9a bc de f0 \
+00 00 00 02 00 01 00 00 \
+00 08 00 03 00 01 86 a0 \
+"], [0], [dnl
+NXST_FLOW_MONITOR reply (xid=0x4):
+ event=DELETED reason=eviction table=1 idle_timeout=5 hard_timeout=16 cookie=0x123456789abcdef0 in_port=1
+ event=ABBREV xid=0x186a0
+])
+AT_CLEANUP
index d703fa8..804965b 100644 (file)
@@ -736,3 +736,133 @@ OFPT_BARRIER_REPLY:
 
 OVS_VSWITCHD_STOP
 AT_CLEANUP
 
 OVS_VSWITCHD_STOP
 AT_CLEANUP
+
+AT_SETUP([ofproto - flow monitoring])
+AT_KEYWORDS([monitor])
+OVS_VSWITCHD_START
+
+ovs-ofctl add-flow br0 in_port=0,dl_vlan=123,actions=output:1
+
+# Start a monitor watching the flow table and check the initial reply.
+ovs-ofctl monitor br0 watch: --detach --no-chdir --pidfile >monitor.log 2>&1
+AT_CAPTURE_FILE([monitor.log])
+ovs-appctl -t ovs-ofctl ofctl/barrier
+AT_CHECK([sed 's/ (xid=0x[[1-9a-fA-F]][[0-9a-fA-F]]*)//' monitor.log], [0],
+  [NXST_FLOW_MONITOR reply:
+ event=ADDED table=0 cookie=0 in_port=0,dl_vlan=123 actions=output:1
+OFPT_BARRIER_REPLY:
+])
+
+# Add, delete, and modify some flows and check the updates.
+ovs-appctl -t ovs-ofctl ofctl/set-output-file monitor.log
+ovs-ofctl add-flow br0 in_port=0,dl_vlan=124,actions=output:2
+ovs-ofctl add-flow br0 in_port=0,dl_vlan=123,actions=output:5
+ovs-ofctl mod-flows br0 cookie=5,dl_vlan=123,actions=output:3
+ovs-ofctl del-flows br0 dl_vlan=123
+ovs-ofctl del-flows br0
+ovs-appctl -t ovs-ofctl ofctl/barrier
+AT_CHECK([sed 's/ (xid=0x[[1-9a-fA-F]][[0-9a-fA-F]]*)//' monitor.log], [0],
+[NXST_FLOW_MONITOR reply (xid=0x0):
+ event=ADDED table=0 cookie=0 in_port=0,dl_vlan=124 actions=output:2
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=ADDED table=0 cookie=0 in_port=0,dl_vlan=123 actions=output:5
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=MODIFIED table=0 cookie=0x5 in_port=0,dl_vlan=123 actions=output:3
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=DELETED reason=delete table=0 cookie=0x5 in_port=0,dl_vlan=123 actions=output:3
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=DELETED reason=delete table=0 cookie=0 in_port=0,dl_vlan=124 actions=output:2
+OFPT_BARRIER_REPLY:
+])
+
+# Check that our own changes are reported as abbreviations.
+ovs-appctl -t ovs-ofctl ofctl/set-output-file monitor.log
+ovs-ofctl add-flow br0 in_port=1,actions=output:2
+ovs-ofctl add-flow br0 in_port=2,actions=output:1
+ovs-appctl -t ovs-ofctl ofctl/send 010e004812345678003fffff00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003000000000000ffffffffffff0000
+ovs-appctl -t ovs-ofctl ofctl/barrier
+AT_CHECK([ovs-ofctl dump-flows br0 | ofctl_strip], [0], [NXST_FLOW reply:
+])
+AT_CHECK([sed 's/ (xid=0x[[1-9a-fA-F]][[0-9a-fA-F]]*)//' monitor.log], [0],
+[NXST_FLOW_MONITOR reply (xid=0x0):
+ event=ADDED table=0 cookie=0 in_port=1 actions=output:2
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=ADDED table=0 cookie=0 in_port=2 actions=output:1
+send: OFPT_FLOW_MOD: DEL priority=0 actions=drop
+NXST_FLOW_MONITOR reply (xid=0x0):
+ event=ABBREV xid=0x12345678
+OFPT_BARRIER_REPLY:
+])
+
+ovs-appctl -t ovs-ofctl exit
+OVS_VSWITCHD_STOP
+AT_CLEANUP
+
+AT_SETUP([ofproto - flow monitoring pause and resume])
+AT_KEYWORDS([monitor])
+
+# With a Linux kernel, this file has the maximum socket receive buffer
+# size.  That's important for this test, which tests behavior when the
+# receive buffer overflows.
+AT_SKIP_IF([test ! -e /proc/sys/net/core/rmem_max])
+
+# Calculate the total amount of queuing: rmem_max in the kernel, 128 kB
+# in ofproto sending userspace (see ofmonitor_flush() in connmgr.c).
+rmem_max=`cat /proc/sys/net/core/rmem_max`
+queue_size=`expr $rmem_max + 128 \* 1024`
+echo rmem_max=$rmem_max queue_size=$queue_size
+
+# Each flow update message takes up at least 48 bytes of space in queues
+# and in practice more than that.
+n_msgs=`expr $queue_size / 48`
+echo n_msgs=$n_msgs
+
+OVS_VSWITCHD_START
+
+# Start a monitor watching the flow table, then make it block.
+ovs-ofctl monitor br0 watch: --detach --no-chdir --pidfile >monitor.log 2>&1
+AT_CAPTURE_FILE([monitor.log])
+ovs-appctl -t ovs-ofctl ofctl/block
+
+# Add $n_msgs flows.
+(echo "in_port=2,actions=output:2"
+perl -e '
+    for ($i = 0; $i < '$n_msgs'; $i++) {
+        print "cookie=1,reg1=$i,actions=drop\n";
+    }
+') > flows.txt
+AT_CHECK([ovs-ofctl add-flows br0 flows.txt])
+AT_CHECK([ovs-ofctl add-flow br0 in_port=1,cookie=3,actions=drop])
+AT_CHECK([ovs-ofctl mod-flows br0 in_port=2,cookie=2,actions=output:2])
+AT_CHECK([ovs-ofctl del-flows br0 cookie=1/-1])
+
+ovs-appctl -t ovs-ofctl ofctl/unblock
+ovs-appctl -t ovs-ofctl ofctl/barrier
+
+ovs-appctl -t ovs-ofctl exit
+
+# Check that the flow monitor reported the same number of flows
+# added and deleted, but fewer than we actually added and deleted.
+adds=`grep -c 'ADDED.*reg1=' monitor.log`
+deletes=`grep -c 'DELETED.*reg1=' monitor.log`
+echo adds=$adds deletes=$deletes
+AT_CHECK([test $adds -gt 100 && test $adds -lt $n_msgs])
+AT_CHECK([test $adds = $deletes])
+
+# Check that the flow monitor reported everything in the expected order.
+AT_CHECK([ofctl_strip < monitor.log | sed -n -e '
+/reg1=0x22\b/p
+/cookie=0x[[23]]/p
+/NXT_FLOW_MONITOR_PAUSED:/p
+/NXT_FLOW_MONITOR_RESUMED:/p
+'], [0],
+[ event=ADDED table=0 cookie=0x1 reg1=0x22
+NXT_FLOW_MONITOR_PAUSED:
+ event=DELETED reason=delete table=0 cookie=0x1 reg1=0x22
+ event=ADDED table=0 cookie=0x3 in_port=1
+ event=MODIFIED table=0 cookie=0x2 in_port=2 actions=output:2
+NXT_FLOW_MONITOR_RESUMED:
+])
+
+OVS_VSWITCHD_STOP
+AT_CLEANUP
index ebfde0f..65fc6e8 100644 (file)
@@ -282,7 +282,7 @@ If a switch has no controller configured, or if
 the configured controller is disconnected, no traffic is sent, so
 monitoring will not show any traffic.
 .
 the configured controller is disconnected, no traffic is sent, so
 monitoring will not show any traffic.
 .
-.IP "\fBmonitor \fIswitch\fR [\fImiss-len\fR] [\fBinvalid_ttl\fR]"
+.IP "\fBmonitor \fIswitch\fR [\fImiss-len\fR] [\fBinvalid_ttl\fR] [\fBwatch:\fR[\fIspec\fR...]]"
 Connects to \fIswitch\fR and prints to the console all OpenFlow
 messages received.  Usually, \fIswitch\fR should specify the name of a
 bridge in the \fBovs\-vswitchd\fR database.
 Connects to \fIswitch\fR and prints to the console all OpenFlow
 messages received.  Usually, \fIswitch\fR should specify the name of a
 bridge in the \fBovs\-vswitchd\fR database.
@@ -295,15 +295,46 @@ does not send these and other asynchronous messages to an
 specified on this argument.  (Thus, if \fImiss\-len\fR is not
 specified, very little traffic will ordinarily be printed.)
 .IP
 specified on this argument.  (Thus, if \fImiss\-len\fR is not
 specified, very little traffic will ordinarily be printed.)
 .IP
-.IP
 If \fBinvalid_ttl\fR is passed, \fBovs\-ofctl\fR sends an OpenFlow ``set
 configuration'' message at connection setup time that requests
 \fBINVALID_TTL_TO_CONTROLLER\fR, so that \fBovs\-ofctl monitor\fR can
 receive ``packet-in'' messages when TTL reaches zero on \fBdec_ttl\fR action.
 .IP
 If \fBinvalid_ttl\fR is passed, \fBovs\-ofctl\fR sends an OpenFlow ``set
 configuration'' message at connection setup time that requests
 \fBINVALID_TTL_TO_CONTROLLER\fR, so that \fBovs\-ofctl monitor\fR can
 receive ``packet-in'' messages when TTL reaches zero on \fBdec_ttl\fR action.
 .IP
-
+\fBwatch:\fR[\fB\fIspec\fR...] causes \fBovs\-ofctl\fR to send a
+``monitor request'' Nicira extension message to the switch at
+connection setup time.  This message causes the switch to send
+information about flow table changes as they occur.  The following
+comma-separated \fIspec\fR syntax is available:
+.RS
+.IP "\fB!initial\fR"
+Do not report the switch's initial flow table contents.
+.IP "\fB!add\fR"
+Do not report newly added flows.
+.IP "\fB!delete\fR"
+Do not report deleted flows.
+.IP "\fB!modify\fR"
+Do not report modifications to existing flows.
+.IP "\fB!own\fR"
+Abbreviate changes made to the flow table by \fBovs\-ofctl\fR's own
+connection to the switch.  (These could only occur using the
+\fBofctl/send\fR command described below under \fBRUNTIME MANAGEMENT
+COMMANDS\fR.)
+.IP "\fB!actions\fR"
+Do not report actions as part of flow updates.
+.IP "\fBtable=\fInumber\fR"
+Limits the monitoring to the table with the given \fInumber\fR between
+0 and 254.  By default, all tables are monitored.
+.IP "\fBout_port=\fIport\fR"
+If set, only flows that output to \fIport\fR are monitored.
+.IP "\fIfield\fB=\fIvalue\fR"
+Monitors only flows that have \fIfield\fR specified as the given
+\fIvalue\fR.  Any syntax valid for matching on \fBdump\-flows\fR may
+be used.
+.RE
+.IP
 This command may be useful for debugging switch or controller
 This command may be useful for debugging switch or controller
-implementations.
+implementations.  With \fBwatch:\fR, it is particularly useful for
+observing how a controller updates flow tables.
 .
 .SS "OpenFlow Switch and Controller Commands"
 .
 .
 .SS "OpenFlow Switch and Controller Commands"
 .
index 1c75f46..d633d1c 100644 (file)
@@ -15,6 +15,7 @@
  */
 
 #include <config.h>
  */
 
 #include <config.h>
+#include <ctype.h>
 #include <errno.h>
 #include <getopt.h>
 #include <inttypes.h>
 #include <errno.h>
 #include <getopt.h>
 #include <inttypes.h>
@@ -278,7 +279,7 @@ usage(void)
            "  diff-flows SOURCE1 SOURCE2  compare flows from two sources\n"
            "  packet-out SWITCH IN_PORT ACTIONS PACKET...\n"
            "                              execute ACTIONS on PACKET\n"
            "  diff-flows SOURCE1 SOURCE2  compare flows from two sources\n"
            "  packet-out SWITCH IN_PORT ACTIONS PACKET...\n"
            "                              execute ACTIONS on PACKET\n"
-           "  monitor SWITCH [MISSLEN] [invalid_ttl]\n"
+           "  monitor SWITCH [MISSLEN] [invalid_ttl] [watch:[...]]\n"
            "                              print packets received from SWITCH\n"
            "  snoop SWITCH                snoop on SWITCH and its controller\n"
            "\nFor OpenFlow switches and controllers:\n"
            "                              print packets received from SWITCH\n"
            "  snoop SWITCH                snoop on SWITCH and its controller\n"
            "\nFor OpenFlow switches and controllers:\n"
@@ -1255,10 +1256,57 @@ ofctl_set_output_file(struct unixctl_conn *conn, int argc OVS_UNUSED,
     unixctl_command_reply(conn, NULL);
 }
 
     unixctl_command_reply(conn, NULL);
 }
 
+struct block_aux {
+    struct vconn *vconn;
+    struct unixctl_server *server;
+    bool blocked;
+};
+
+static void
+ofctl_block(struct unixctl_conn *conn, int argc OVS_UNUSED,
+            const char *argv[] OVS_UNUSED, void *block_)
+{
+    struct block_aux *block = block_;
+
+    if (block->blocked) {
+        unixctl_command_reply(conn, "already blocking");
+        return;
+    }
+
+    block->blocked = true;
+    unixctl_command_reply(conn, NULL);
+    for (;;) {
+        unixctl_server_run(block->server);
+        if (!block->blocked) {
+            break;
+        }
+        vconn_run(block->vconn);
+
+        unixctl_server_wait(block->server);
+        vconn_run_wait(block->vconn);
+        poll_block();
+    }
+}
+
+static void
+ofctl_unblock(struct unixctl_conn *conn, int argc OVS_UNUSED,
+              const char *argv[] OVS_UNUSED, void *block_)
+{
+    struct block_aux *block = block_;
+
+    if (!block->blocked) {
+        unixctl_command_reply(conn, "not blocking");
+    } else {
+        block->blocked = false;
+        unixctl_command_reply(conn, NULL);
+    }
+}
+
 static void
 monitor_vconn(struct vconn *vconn)
 {
     struct barrier_aux barrier_aux = { vconn, NULL };
 static void
 monitor_vconn(struct vconn *vconn)
 {
     struct barrier_aux barrier_aux = { vconn, NULL };
+    struct block_aux block;
     struct unixctl_server *server;
     bool exiting = false;
     int error;
     struct unixctl_server *server;
     bool exiting = false;
     int error;
@@ -1276,6 +1324,13 @@ monitor_vconn(struct vconn *vconn)
                              ofctl_barrier, &barrier_aux);
     unixctl_command_register("ofctl/set-output-file", "FILE", 1, 1,
                              ofctl_set_output_file, NULL);
                              ofctl_barrier, &barrier_aux);
     unixctl_command_register("ofctl/set-output-file", "FILE", 1, 1,
                              ofctl_set_output_file, NULL);
+
+    block.vconn = vconn;
+    block.server = server;
+    block.blocked = false;
+    unixctl_command_register("ofctl/block", "", 0, 0, ofctl_block, &block);
+    unixctl_command_register("ofctl/unblock", "", 0, 0, ofctl_unblock, &block);
+
     daemonize_complete();
 
     for (;;) {
     daemonize_complete();
 
     for (;;) {
@@ -1329,20 +1384,34 @@ static void
 ofctl_monitor(int argc, char *argv[])
 {
     struct vconn *vconn;
 ofctl_monitor(int argc, char *argv[])
 {
     struct vconn *vconn;
+    int i;
 
     open_vconn(argv[1], &vconn);
 
     open_vconn(argv[1], &vconn);
-    if (argc > 2) {
-        struct ofp_switch_config config;
+    for (i = 2; i < argc; i++) {
+        const char *arg = argv[i];
 
 
-        fetch_switch_config(vconn, &config);
-        config.miss_send_len = htons(atoi(argv[2]));
-        set_switch_config(vconn, &config);
-    }
-    if (argc > 3) {
-        if (!strcmp(argv[3], "invalid_ttl")) {
+        if (isdigit((unsigned char) *arg)) {
+            struct ofp_switch_config config;
+
+            fetch_switch_config(vconn, &config);
+            config.miss_send_len = htons(atoi(arg));
+            set_switch_config(vconn, &config);
+        } else if (!strcmp(arg, "invalid_ttl")) {
             monitor_set_invalid_ttl_to_controller(vconn);
             monitor_set_invalid_ttl_to_controller(vconn);
+        } else if (!strncmp(arg, "watch:", 6)) {
+            struct ofputil_flow_monitor_request fmr;
+            struct ofpbuf *msg;
+
+            parse_flow_monitor_request(&fmr, arg + 6);
+
+            msg = ofpbuf_new(0);
+            ofputil_append_flow_monitor_request(&fmr, msg);
+            dump_stats_transaction__(vconn, msg);
+        } else {
+            ovs_fatal(0, "%s: unsupported \"monitor\" argument", arg);
         }
     }
         }
     }
+
     if (preferred_packet_in_format >= 0) {
         set_packet_in_format(vconn, preferred_packet_in_format);
     } else {
     if (preferred_packet_in_format >= 0) {
         set_packet_in_format(vconn, preferred_packet_in_format);
     } else {