ofproto: Fully construct rules before putting them in the classifier.
authorBen Pfaff <blp@nicira.com>
Mon, 26 Aug 2013 19:45:55 +0000 (12:45 -0700)
committerBen Pfaff <blp@nicira.com>
Wed, 28 Aug 2013 04:45:37 +0000 (21:45 -0700)
add_flow() in ofproto.c has a race: it adds a new flow to the flow table
before calling ->rule_construct().  That means that (in ofproto-dpif) the
new flow is visible to the forwarder threads before gets properly
initialized.

One solution would be to lock the flow table across the entire operation,
but this is not desirable:

   * We would need a write-lock but this would be expensive for
     implementing "learn" flow_mods that often don't really modify anything
     at all.

   * The code would need to keep the lock across a couple of different calls
     into the client, which seems undesirable if it can be avoided.

   * The code in add_flow() is difficult to understand already.

Instead, I've decided to refactor add_flow() to simplify it and make it
easier to understand.  I think this will also improve the potential for
concurrency.

This commit completes the initial refactoring and solves the race.  It makes
two key changes:

    1. It disentangles insertion of a new flow from evicting some existing
       flow to make room for it (if necessary).  Instead, if inserting a
       new flow would make the flow table overfull, it evicts a flow and
       commits that operation before it attempts the insertion.  This is
       a user-visible change in behavior, in that previously a flow table
       insertion could never cause the number of flows in the flow table
       to decrease, and now it theoretically could if the eviction succeeds
       but the insertion fails.  However, I do not think that this is a
       serious problem.

    2. It adds two new steps to the life cycle of a rule.  Previously,
       rules had "alloc", "construct", "destruct", and "dealloc" steps,
       like other ofproto objects do.  This adds "insert" and "delete"
       steps between "construct" and "destruct".  The new steps are
       intended to handle the actual insertion and deletion into the
       datapath flow table.

Signed-off-by: Ben Pfaff <blp@nicira.com>
Acked-by: Ethan Jackson <ethan@nicira.com>
ofproto/ofproto-dpif.c
ofproto/ofproto-provider.h
ofproto/ofproto.c

index 8d82512..d4c8f73 100644 (file)
@@ -1437,10 +1437,11 @@ destruct(struct ofproto *ofproto_)
         ovs_rwlock_wrlock(&table->cls.rwlock);
         cls_cursor_init(&cursor, &table->cls, NULL);
         CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, up.cr, &cursor) {
-            ofproto_rule_destroy(&ofproto->up, &table->cls, &rule->up);
+            ofproto_rule_delete(&ofproto->up, &table->cls, &rule->up);
         }
         ovs_rwlock_unlock(&table->cls.rwlock);
     }
+    complete_operations(ofproto);
 
     ovs_mutex_lock(&ofproto->flow_mod_mutex);
     LIST_FOR_EACH_SAFE (fm, next_fm, list_node, &ofproto->flow_mods) {
@@ -3975,12 +3976,8 @@ rule_expire(struct rule_dpif *rule)
         return;
     }
 
-    if (!ovs_rwlock_trywrlock(&rule->up.evict)) {
-        COVERAGE_INC(ofproto_dpif_expired);
-
-        /* Get rid of the rule. */
-        ofproto_rule_expire(&rule->up, reason);
-    }
+    COVERAGE_INC(ofproto_dpif_expired);
+    ofproto_rule_expire(&rule->up, reason);
 }
 \f
 /* Facets. */
@@ -4892,15 +4889,27 @@ rule_construct(struct rule *rule_)
     rule->packet_count = 0;
     rule->byte_count = 0;
     ovs_mutex_unlock(&rule->stats_mutex);
-    complete_operation(rule);
     return 0;
 }
 
 static void
-rule_destruct(struct rule *rule_)
+rule_insert(struct rule *rule_)
 {
     struct rule_dpif *rule = rule_dpif_cast(rule_);
     complete_operation(rule);
+}
+
+static void
+rule_delete(struct rule *rule_)
+{
+    struct rule_dpif *rule = rule_dpif_cast(rule_);
+    complete_operation(rule);
+}
+
+static void
+rule_destruct(struct rule *rule_)
+{
+    struct rule_dpif *rule = rule_dpif_cast(rule_);
     ovs_mutex_destroy(&rule->stats_mutex);
 }
 
@@ -6350,6 +6359,8 @@ const struct ofproto_class ofproto_dpif_class = {
     NULL,                       /* rule_choose_table */
     rule_alloc,
     rule_construct,
+    rule_insert,
+    rule_delete,
     rule_destruct,
     rule_dealloc,
     rule_get_stats,
index 43273ec..63d2687 100644 (file)
@@ -277,10 +277,9 @@ rule_from_cls_rule(const struct cls_rule *cls_rule)
 }
 
 void ofproto_rule_update_used(struct rule *, long long int used);
-void ofproto_rule_expire(struct rule *rule, uint8_t reason)
-    OVS_RELEASES(rule->evict);
-void ofproto_rule_destroy(struct ofproto *, struct classifier *cls,
-                          struct rule *) OVS_REQ_WRLOCK(cls->rwlock);
+void ofproto_rule_expire(struct rule *rule, uint8_t reason);
+void ofproto_rule_delete(struct ofproto *, struct classifier *cls,
+                         struct rule *) OVS_REQ_WRLOCK(cls->rwlock);
 void ofproto_rule_reduce_timeouts(struct rule *rule, uint16_t idle_timeout,
                                   uint16_t hard_timeout)
     OVS_EXCLUDED(rule->ofproto->expirable_mutex, rule->timeout_mutex);
@@ -330,6 +329,10 @@ bool ofproto_rule_is_hidden(const struct rule *);
  *   ofport   ->port_alloc  ->port_construct  ->port_destruct  ->port_dealloc
  *   rule     ->rule_alloc  ->rule_construct  ->rule_destruct  ->rule_dealloc
  *
+ * "ofproto" and "ofport" have this exact life cycle.  The "rule" data
+ * structure also follow this life cycle with some additional elaborations
+ * described under "Rule Life Cycle" below.
+ *
  * Any instance of a given data structure goes through the following life
  * cycle:
  *
@@ -518,8 +521,16 @@ struct ofproto_class {
      * must complete all of them by calling ofoperation_complete().
      *
      * ->destruct() must also destroy all remaining rules in the ofproto's
-     * tables, by passing each remaining rule to ofproto_rule_destroy().  The
-     * client will destroy the flow tables themselves after ->destruct()
+     * tables, by passing each remaining rule to ofproto_rule_delete(), and
+     * then complete each of those deletions in turn by calling
+     * ofoperation_complete().
+     *
+     * (Thus, there is a multi-step process for any rule currently being
+     * inserted or modified at the beginning of destruction: first
+     * ofoperation_complete() that operation, then ofproto_rule_delete() the
+     * rule, then ofoperation_complete() the deletion operation.)
+     *
+     * The client will destroy the flow tables themselves after ->destruct()
      * returns.
      */
     struct ofproto *(*alloc)(void);
@@ -874,17 +885,49 @@ struct ofproto_class {
                                      const struct match *match,
                                      uint8_t *table_idp);
 
-    /* Life-cycle functions for a "struct rule" (see "Life Cycle" above).
+    /* Life-cycle functions for a "struct rule".
+     *
+     *
+     * Rule Life Cycle
+     * ===============
+     *
+     * The life cycle of a struct rule is an elaboration of the basic life
+     * cycle described above under "Life Cycle".
+     *
+     * After a rule is successfully constructed, it is then inserted.  If
+     * insertion completes successfully, then before it is later destructed, it
+     * is deleted.
+     *
+     * You can think of a rule as having the following extra steps inserted
+     * between "Life Cycle" steps 4 and 5:
+     *
+     *   4.1. The client inserts the rule into the flow table, making it
+     *        visible in flow table lookups.
+     *
+     *   4.2. The client calls "rule_insert".  Immediately or eventually, the
+     *        implementation calls ofoperation_complete() to indicate that the
+     *        insertion completed.  If the operation failed, skip to step 5.
+     *
+     *   4.3. The rule is now installed in the flow table.  Eventually it will
+     *        be deleted.
+     *
+     *   4.4. The client removes the rule from the flow table.  It is no longer
+     *        visible in flow table lookups.
+     *
+     *   4.5. The client calls "rule_delete".  Immediately or eventually, the
+     *        implementation calls ofoperation_complete() to indicate that the
+     *        deletion completed.  Deletion is not allowed to fail, so it must
+     *        be successful.
      *
      *
      * Asynchronous Operation Support
      * ==============================
      *
-     * The life-cycle operations on rules can operate asynchronously, meaning
-     * that ->rule_construct() and ->rule_destruct() only need to initiate
-     * their respective operations and do not need to wait for them to complete
-     * before they return.  ->rule_modify_actions() also operates
-     * asynchronously.
+     * The "insert" and "delete" life-cycle operations on rules can operate
+     * asynchronously, meaning that ->rule_insert() and ->rule_delete() only
+     * need to initiate their respective operations and do not need to wait for
+     * them to complete before they return.  ->rule_modify_actions() also
+     * operates asynchronously.
      *
      * An ofproto implementation reports the success or failure of an
      * asynchronous operation on a rule using the rule's 'pending' member,
@@ -895,9 +938,9 @@ struct ofproto_class {
      *
      * Only the following contexts may call ofoperation_complete():
      *
-     *   - The function called to initiate the operation,
-     *     e.g. ->rule_construct() or ->rule_destruct().  This is the best
-     *     choice if the operation completes quickly.
+     *   - The function called to initiate the operation, e.g. ->rule_insert()
+     *     or ->rule_delete().  This is the best choice if the operation
+     *     completes quickly.
      *
      *   - The implementation's ->run() function.
      *
@@ -907,12 +950,12 @@ struct ofproto_class {
      * that the operation will probably succeed:
      *
      *   - ofproto adds the rule in the flow table before calling
-     *     ->rule_construct().
+     *     ->rule_insert().
      *
      *   - ofproto updates the rule's actions and other properties before
      *     calling ->rule_modify_actions().
      *
-     *   - ofproto removes the rule before calling ->rule_destruct().
+     *   - ofproto removes the rule before calling ->rule_delete().
      *
      * With one exception, when an asynchronous operation completes with an
      * error, ofoperation_complete() backs out the already applied changes:
@@ -937,59 +980,77 @@ struct ofproto_class {
      * Construction
      * ============
      *
-     * When ->rule_construct() is called, 'rule' is a new rule in its flow
-     * table.  The caller has already inserted 'rule' into 'rule->ofproto''s
-     * flow table numbered 'rule->table_id'.
+     * When ->rule_construct() is called, 'rule' is a new rule that is not yet
+     * inserted into a flow table.  ->rule_construct() should initialize enough
+     * of the rule's derived state for 'rule' to be suitable for inserting into
+     * a flow table.  ->rule_construct() should not modify any base members of
+     * struct rule.
      *
-     * ->rule_construct() should set the following in motion:
+     * If ->rule_construct() fails (as indicated by returning a nonzero
+     * OpenFlow error code), the ofproto base code will uninitialize and
+     * deallocate 'rule'.  See "Rule Life Cycle" above for more details.
      *
-     *   - Validate that the matching rule in 'rule->cr' is supported by the
+     * ->rule_construct() may also:
+     *
+     *   - Validate that the datapath supports the matching rule in 'rule->cr'
      *     datapath.  For example, if the rule's table does not support
      *     registers, then it is an error if 'rule->cr' does not wildcard all
      *     registers.
      *
      *   - Validate that the datapath can correctly implement 'rule->ofpacts'.
      *
-     *   - If the rule is valid, add the new rule to the datapath flow table.
+     * Some implementations might need to defer these tasks to ->rule_insert(),
+     * which is also acceptable.
+     *
+     *
+     * Insertion
+     * =========
+     *
+     * Following successful construction, the ofproto base case inserts 'rule'
+     * into its flow table, then it calls ->rule_insert().  ->rule_insert()
+     * should set in motion adding the new rule to the datapath flow table.  It
+     * must act as follows:
      *
-     * (On failure, the ofproto code will roll back the insertion from the flow
-     * table by removing 'rule'.)
+     *   - If it completes insertion, either by succeeding or failing, it must
+     *     call ofoperation_complete()
      *
-     * ->rule_construct() must act in one of the following ways:
+     *   - If insertion is only partially complete, then it must return without
+     *     calling ofoperation_complete().  Later, when the insertion is
+     *     complete, the ->run() or ->destruct() function must call
+     *     ofoperation_complete() to report success or failure.
      *
-     *   - If it succeeds, it must call ofoperation_complete() and return 0.
+     * If ->rule_insert() fails, the ofproto base code will remove 'rule' from
+     * the flow table, destruct, uninitialize, and deallocate 'rule'.  See
+     * "Rule Life Cycle" above for more details.
      *
-     *   - If it fails, it must act in one of the following ways:
      *
-     *       * Call ofoperation_complete() and return 0.
+     * Deletion
+     * ========
      *
-     *       * Return an OpenFlow error code.  (Do not call
-     *         ofoperation_complete() in this case.)
+     * The ofproto base code removes 'rule' from its flow table before it calls
+     * ->rule_delete().  ->rule_delete() should set in motion removing 'rule'
+     * from the datapath flow table.  It must act as follows:
      *
-     *     Either way, ->rule_destruct() will not be called for 'rule', but
-     *     ->rule_dealloc() will be.
+     *   - If it completes deletion, it must call ofoperation_complete().
      *
-     *   - If the operation is only partially complete, then it must return 0.
-     *     Later, when the operation is complete, the ->run() or ->destruct()
-     *     function must call ofoperation_complete() to report success or
-     *     failure.
+     *   - If deletion is only partially complete, then it must return without
+     *     calling ofoperation_complete().  Later, when the deletion is
+     *     complete, the ->run() or ->destruct() function must call
+     *     ofoperation_complete().
      *
-     * ->rule_construct() should not modify any base members of struct rule.
+     * Rule deletion must not fail.
      *
      *
      * Destruction
      * ===========
      *
-     * When ->rule_destruct() is called, the caller has already removed 'rule'
-     * from 'rule->ofproto''s flow table.  ->rule_destruct() should set in
-     * motion removing 'rule' from the datapath flow table.  If removal
-     * completes synchronously, it should call ofoperation_complete().
-     * Otherwise, the ->run() or ->destruct() function must later call
-     * ofoperation_complete() after the operation completes.
+     * ->rule_destruct() must uninitialize derived state.
      *
      * Rule destruction must not fail. */
     struct rule *(*rule_alloc)(void);
     enum ofperr (*rule_construct)(struct rule *rule);
+    void (*rule_insert)(struct rule *rule);
+    void (*rule_delete)(struct rule *rule);
     void (*rule_destruct)(struct rule *rule);
     void (*rule_dealloc)(struct rule *rule);
 
index 623039b..e5ad442 100644 (file)
@@ -1074,17 +1074,37 @@ ofproto_get_snoops(const struct ofproto *ofproto, struct sset *snoops)
     connmgr_get_snoops(ofproto->connmgr, snoops);
 }
 
+/* Deletes 'rule' from 'cls' within 'ofproto'.
+ *
+ * The 'cls' argument is redundant (it is &ofproto->tables[rule->table_id].cls)
+ * but it allows Clang to do better checking. */
 static void
-ofproto_flush__(struct ofproto *ofproto)
+ofproto_delete_rule(struct ofproto *ofproto, struct classifier *cls,
+                    struct rule *rule)
+    OVS_REQ_WRLOCK(cls->rwlock)
 {
     struct ofopgroup *group;
+
+    ovs_assert(!rule->pending);
+    ovs_assert(cls == &ofproto->tables[rule->table_id].cls);
+
+    group = ofopgroup_create_unattached(ofproto);
+    ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
+    ovs_rwlock_wrlock(&rule->evict);
+    oftable_remove_rule__(ofproto, cls, rule);
+    ofproto->ofproto_class->rule_delete(rule);
+    ofopgroup_submit(group);
+}
+
+static void
+ofproto_flush__(struct ofproto *ofproto)
+{
     struct oftable *table;
 
     if (ofproto->ofproto_class->flush) {
         ofproto->ofproto_class->flush(ofproto);
     }
 
-    group = ofopgroup_create_unattached(ofproto);
     OFPROTO_FOR_EACH_TABLE (table, ofproto) {
         struct rule *rule, *next_rule;
         struct cls_cursor cursor;
@@ -1097,16 +1117,11 @@ ofproto_flush__(struct ofproto *ofproto)
         cls_cursor_init(&cursor, &table->cls, NULL);
         CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
             if (!rule->pending) {
-                ofoperation_create(group, rule, OFOPERATION_DELETE,
-                                   OFPRR_DELETE);
-                ovs_rwlock_wrlock(&rule->evict);
-                oftable_remove_rule__(ofproto, &table->cls, rule);
-                ofproto->ofproto_class->rule_destruct(rule);
+                ofproto_delete_rule(ofproto, &table->cls, rule);
             }
         }
         ovs_rwlock_unlock(&table->cls.rwlock);
     }
-    ofopgroup_submit(group);
 }
 
 static void
@@ -1684,12 +1699,13 @@ bool
 ofproto_delete_flow(struct ofproto *ofproto,
                     const struct match *target, unsigned int priority)
 {
+    struct classifier *cls = &ofproto->tables[0].cls;
     struct rule *rule;
 
-    ovs_rwlock_rdlock(&ofproto->tables[0].cls.rwlock);
-    rule = rule_from_cls_rule(classifier_find_match_exactly(
-                                  &ofproto->tables[0].cls, target, priority));
-    ovs_rwlock_unlock(&ofproto->tables[0].cls.rwlock);
+    ovs_rwlock_rdlock(&cls->rwlock);
+    rule = rule_from_cls_rule(classifier_find_match_exactly(cls, target,
+                                                            priority));
+    ovs_rwlock_unlock(&cls->rwlock);
     if (!rule) {
         /* No such rule -> success. */
         return true;
@@ -1699,12 +1715,10 @@ ofproto_delete_flow(struct ofproto *ofproto,
         return false;
     } else {
         /* Initiate deletion -> success. */
-        struct ofopgroup *group = ofopgroup_create_unattached(ofproto);
-        ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
-        ovs_rwlock_wrlock(&rule->evict);
-        oftable_remove_rule(rule);
-        ofproto->ofproto_class->rule_destruct(rule);
-        ofopgroup_submit(group);
+        ovs_rwlock_wrlock(&cls->rwlock);
+        ofproto_delete_rule(ofproto, cls, rule);
+        ovs_rwlock_unlock(&cls->rwlock);
+
         return true;
     }
 
@@ -2202,6 +2216,7 @@ static void
 ofproto_rule_destroy__(struct rule *rule)
 {
     if (rule) {
+        rule->ofproto->ofproto_class->rule_destruct(rule);
         cls_rule_destroy(&rule->cr);
         free(rule->ofpacts);
         ovs_mutex_destroy(&rule->timeout_mutex);
@@ -2211,24 +2226,18 @@ ofproto_rule_destroy__(struct rule *rule)
 }
 
 /* This function allows an ofproto implementation to destroy any rules that
- * remain when its ->destruct() function is called.  The caller must have
- * already uninitialized any derived members of 'rule' (step 5 described in the
- * large comment in ofproto/ofproto-provider.h titled "Life Cycle").
- * This function implements steps 6 and 7.
+ * remain when its ->destruct() function is called..  This function implements
+ * steps 4.4 and 4.5 in the section titled "Rule Life Cycle" in
+ * ofproto-provider.h.
  *
  * This function should only be called from an ofproto implementation's
  * ->destruct() function.  It is not suitable elsewhere. */
 void
-ofproto_rule_destroy(struct ofproto *ofproto, struct classifier *cls,
-                     struct rule *rule) OVS_REQ_WRLOCK(cls->rwlock)
+ofproto_rule_delete(struct ofproto *ofproto, struct classifier *cls,
+                    struct rule *rule)
+    OVS_REQ_WRLOCK(cls->rwlock)
 {
-    ovs_assert(!rule->pending);
-    if (!ovs_rwlock_trywrlock(&rule->evict)) {
-        oftable_remove_rule__(ofproto, cls, rule);
-    } else {
-        NOT_REACHED();
-    }
-    ofproto_rule_destroy__(rule);
+    ofproto_delete_rule(ofproto, cls, rule);
 }
 
 /* Returns true if 'rule' has an OpenFlow OFPAT_OUTPUT or OFPAT_ENQUEUE action
@@ -3342,6 +3351,34 @@ is_flow_deletion_pending(const struct ofproto *ofproto,
     return false;
 }
 
+static enum ofperr
+evict_rule_from_table(struct ofproto *ofproto, struct oftable *table)
+{
+    struct rule *rule;
+    size_t n_rules;
+
+    ovs_rwlock_rdlock(&table->cls.rwlock);
+    n_rules = classifier_count(&table->cls);
+    ovs_rwlock_unlock(&table->cls.rwlock);
+
+    if (n_rules < table->max_flows) {
+        return 0;
+    } else if (!choose_rule_to_evict(table, &rule)) {
+        return OFPERR_OFPFMFC_TABLE_FULL;
+    } else if (rule->pending) {
+        ovs_rwlock_unlock(&rule->evict);
+        return OFPROTO_POSTPONE;
+    } else {
+        struct ofopgroup *group;
+
+        group = ofopgroup_create_unattached(ofproto);
+        delete_flow__(rule, group, OFPRR_EVICTION);
+        ofopgroup_submit(group);
+
+        return 0;
+    }
+}
+
 /* Implements OFPFC_ADD and the cases for OFPFC_MODIFY and OFPFC_MODIFY_STRICT
  * in which no matching flow already exists in the flow table.
  *
@@ -3361,10 +3398,8 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
 {
     struct oftable *table;
     struct ofopgroup *group;
-    struct ofoperation *op;
-    struct rule *evict;
+    struct cls_rule cr;
     struct rule *rule;
-    size_t n_rules;
     uint8_t table_id;
     int error;
 
@@ -3398,13 +3433,14 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
         return OFPERR_OFPBRC_EPERM;
     }
 
+    cls_rule_init(&cr, &fm->match, fm->priority);
+
     /* Transform "add" into "modify" if there's an existing identical flow. */
     ovs_rwlock_rdlock(&table->cls.rwlock);
-    rule = rule_from_cls_rule(classifier_find_match_exactly(&table->cls,
-                                                            &fm->match,
-                                                            fm->priority));
+    rule = rule_from_cls_rule(classifier_find_rule_exactly(&table->cls, &cr));
     ovs_rwlock_unlock(&table->cls.rwlock);
     if (rule) {
+        cls_rule_destroy(&cr);
         if (!rule_is_modifiable(rule)) {
             return OFPERR_OFPBRC_EPERM;
         } else if (rule->pending) {
@@ -3426,19 +3462,9 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
         return error;
     }
 
-    /* Allocate new rule and initialize classifier rule. */
-    rule = ofproto->ofproto_class->rule_alloc();
-    if (!rule) {
-        VLOG_WARN_RL(&rl, "%s: failed to create rule (%s)",
-                     ofproto->name, ovs_strerror(error));
-        return ENOMEM;
-    }
-    cls_rule_init(&rule->cr, &fm->match, fm->priority);
-
     /* Serialize against pending deletion. */
-    if (is_flow_deletion_pending(ofproto, &rule->cr, table_id)) {
-        cls_rule_destroy(&rule->cr);
-        ofproto->ofproto_class->rule_dealloc(rule);
+    if (is_flow_deletion_pending(ofproto, &cr, table_id)) {
+        cls_rule_destroy(&cr);
         return OFPROTO_POSTPONE;
     }
 
@@ -3447,19 +3473,34 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
         bool overlaps;
 
         ovs_rwlock_rdlock(&table->cls.rwlock);
-        overlaps = classifier_rule_overlaps(&table->cls, &rule->cr);
+        overlaps = classifier_rule_overlaps(&table->cls, &cr);
         ovs_rwlock_unlock(&table->cls.rwlock);
 
         if (overlaps) {
-            cls_rule_destroy(&rule->cr);
-            ofproto->ofproto_class->rule_dealloc(rule);
+            cls_rule_destroy(&cr);
             return OFPERR_OFPFMFC_OVERLAP;
         }
     }
 
-    /* FIXME: Implement OFPFF12_RESET_COUNTS */
+    /* If necessary, evict an existing rule to clear out space. */
+    error = evict_rule_from_table(ofproto, table);
+    if (error) {
+        cls_rule_destroy(&cr);
+        return error;
+    }
 
+    /* Allocate new rule. */
+    rule = ofproto->ofproto_class->rule_alloc();
+    if (!rule) {
+        cls_rule_destroy(&cr);
+        VLOG_WARN_RL(&rl, "%s: failed to create rule (%s)",
+                     ofproto->name, ovs_strerror(error));
+        return ENOMEM;
+    }
+
+    /* Initialize base state. */
     rule->ofproto = ofproto;
+    cls_rule_move(&rule->cr, &cr);
     rule->pending = NULL;
     rule->flow_cookie = fm->new_cookie;
     rule->created = rule->modified = rule->used = time_msec();
@@ -3483,56 +3524,21 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
     rule->modify_seqno = 0;
     ovs_rwlock_init(&rule->evict);
 
-    /* Insert new rule. */
-    oftable_insert_rule(rule);
-
-    ovs_rwlock_rdlock(&table->cls.rwlock);
-    n_rules = classifier_count(&table->cls);
-    ovs_rwlock_unlock(&table->cls.rwlock);
-    if (n_rules > table->max_flows) {
-        ovs_rwlock_rdlock(&rule->evict);
-        if (choose_rule_to_evict(table, &evict)) {
-            ovs_rwlock_unlock(&rule->evict);
-            ovs_rwlock_unlock(&evict->evict);
-            if (evict->pending) {
-                error = OFPROTO_POSTPONE;
-                goto exit;
-            }
-        } else {
-            ovs_rwlock_unlock(&rule->evict);
-            error = OFPERR_OFPFMFC_TABLE_FULL;
-            goto exit;
-        }
-    } else {
-        evict = NULL;
-    }
-
-    group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id);
-    op = ofoperation_create(group, rule, OFOPERATION_ADD, 0);
-
+    /* Construct rule, initializing derived state. */
     error = ofproto->ofproto_class->rule_construct(rule);
     if (error) {
-        op->group->n_running--;
-        ofoperation_destroy(rule->pending);
-    } else if (evict) {
-        /* It would be better if we maintained the lock we took in
-         * choose_rule_to_evict() earlier, but that confuses the thread
-         * safety analysis, and this code is fragile enough that we really
-         * need it.  In the worst case, we'll have to block a little while
-         * before we perform the eviction, which doesn't seem like a big
-         * problem. */
-        ovs_rwlock_wrlock(&evict->evict);
-        delete_flow__(evict, group, OFPRR_EVICTION);
+        ofproto_rule_destroy__(rule);
+        return error;
     }
+
+    /* Insert rule. */
+    oftable_insert_rule(rule);
+
+    group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id);
+    ofoperation_create(group, rule, OFOPERATION_ADD, 0);
+    ofproto->ofproto_class->rule_insert(rule);
     ofopgroup_submit(group);
 
-exit:
-    /* Back out if an error occurred. */
-    if (error) {
-        ovs_rwlock_wrlock(&rule->evict);
-        oftable_remove_rule(rule);
-        ofproto_rule_destroy__(rule);
-    }
     return error;
 }
 \f
@@ -3699,7 +3705,7 @@ delete_flow__(struct rule *rule, struct ofopgroup *group,
 
     ofoperation_create(group, rule, OFOPERATION_DELETE, reason);
     oftable_remove_rule(rule);
-    ofproto->ofproto_class->rule_destruct(rule);
+    ofproto->ofproto_class->rule_delete(rule);
 }
 
 /* Deletes the rules listed in 'rules'.
@@ -3813,17 +3819,14 @@ void
 ofproto_rule_expire(struct rule *rule, uint8_t reason)
 {
     struct ofproto *ofproto = rule->ofproto;
-    struct ofopgroup *group;
+    struct classifier *cls = &ofproto->tables[rule->table_id].cls;
 
     ovs_assert(reason == OFPRR_HARD_TIMEOUT || reason == OFPRR_IDLE_TIMEOUT);
-
     ofproto_rule_send_removed(rule, reason);
 
-    group = ofopgroup_create_unattached(ofproto);
-    ofoperation_create(group, rule, OFOPERATION_DELETE, reason);
-    oftable_remove_rule(rule);
-    ofproto->ofproto_class->rule_destruct(rule);
-    ofopgroup_submit(group);
+    ovs_rwlock_wrlock(&cls->rwlock);
+    ofproto_delete_rule(ofproto, cls, rule);
+    ovs_rwlock_unlock(&cls->rwlock);
 }
 
 /* Reduces '*timeout' to no more than 'max'.  A value of zero in either case
@@ -5269,7 +5272,7 @@ ofproto_evict(struct ofproto *ofproto)
             ofoperation_create(group, rule,
                                OFOPERATION_DELETE, OFPRR_EVICTION);
             oftable_remove_rule(rule);
-            ofproto->ofproto_class->rule_destruct(rule);
+            ofproto->ofproto_class->rule_delete(rule);
         }
     }
     ofopgroup_submit(group);