ofproto: Send OpenFlow messages as ports are added or deleted or changed.
authorBen Pfaff <blp@nicira.com>
Mon, 28 Jun 2010 21:06:26 +0000 (14:06 -0700)
committerBen Pfaff <blp@nicira.com>
Mon, 28 Jun 2010 21:06:26 +0000 (14:06 -0700)
This functionality (a required part of OpenFlow) was dropped in the move
from dpif to wdp.  This commit restores this functionality.

ofproto/ofproto.c
ofproto/wdp-provider.h
ofproto/wdp-xflow.c
ofproto/wdp.c
ofproto/wdp.h

index 541d0f7..ebb24d1 100644 (file)
@@ -902,6 +902,30 @@ add_snooper(struct ofproto *ofproto, struct vconn *vconn)
     }
 }
 
+static void
+ofproto_port_poll_cb(const struct ofp_phy_port *opp, uint8_t reason,
+                     void *ofproto_)
+{
+    /* XXX Should limit the number of queued port status change messages. */
+    struct ofproto *ofproto = ofproto_;
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, struct ofconn, node, &ofproto->all_conns) {
+        struct ofp_port_status *ops;
+        struct ofpbuf *b;
+
+        if (!ofconn_receives_async_msgs(ofconn)) {
+            continue;
+        }
+
+        ops = make_openflow_xid(sizeof *ops, OFPT_PORT_STATUS, 0, &b);
+        ops->reason = reason;
+        ops->desc = *opp;
+        hton_ofp_phy_port(&ops->desc);
+        queue_tx(b, ofconn, NULL);
+    }
+}
+
 int
 ofproto_run1(struct ofproto *p)
 {
@@ -929,6 +953,8 @@ ofproto_run1(struct ofproto *p)
         handle_wdp_packet(p, xmemdup(&packet, sizeof packet));
     }
 
+    wdp_port_poll(p->wdp, ofproto_port_poll_cb, p);
+
     if (p->in_band) {
         if (time_msec() >= p->next_in_band_update) {
             update_in_band_remotes(p);
@@ -2721,7 +2747,7 @@ schedule_packet_in(struct ofconn *ofconn, struct wdp_packet *packet,
 }
 
 /* Converts 'packet->payload' to a struct ofp_packet_in.  It must have
- * sufficient headroom to do so (e.g. as returned by dpif_recv()).
+ * sufficient headroom to do so (e.g. as returned by xfif_recv()).
  *
  * The conversion is not complete: the caller still needs to trim any unneeded
  * payload off the end of the buffer, set the length in the OpenFlow header,
index 4efb383..1d189eb 100644 (file)
@@ -210,35 +210,36 @@ struct wdp_class {
     int (*port_set_config)(struct wdp *wdp, uint16_t port_no,
                            uint32_t config);
 
-    /* Polls for changes in the set of ports in 'wdp'.  If the set of ports
-     * in 'wdp' has changed, then this function should do one of the
-     * following:
+    /* Polls for changes in the set of ports in 'wdp' since the last call to
+     * this function or, if this is the first call, since this wdp was opened.
+     * For each change, calls 'cb' passing 'aux' and:
      *
-     * - Preferably: store the name of the device that was added to or deleted
-     *   from 'wdp' in '*devnamep' and return 0.  The caller is responsible
-     *   for freeing '*devnamep' (with free()) when it no longer needs it.
+     *   - For a port that has been added, OFPPR_ADD as 'reason' and the new
+     *     port's "struct ofp_phy_port" as 'opp'.
      *
-     * - Alternatively: return ENOBUFS, without indicating the device that was
-     *   added or deleted.
+     *   - For a port that has been removed, OFPPR_DELETE as 'reason' and the
+     *     deleted port's former "struct ofp_phy_port" as 'opp'.
      *
-     * Occasional 'false positives', in which the function returns 0 while
-     * indicating a device that was not actually added or deleted or returns
-     * ENOBUFS without any change, are acceptable.
+     *   - For a port whose configuration has changed, OFPPR_MODIFY as 'reason'
+     *     and the modified port's new "struct ofp_phy_port" as 'opp'.
      *
-     * If the set of ports in 'wdp' has not changed, returns EAGAIN.  May
-     * also return other positive errno values to indicate that something has
-     * gone wrong.
+     * If 'wdp' has a fixed set of ports, this function must still be present
+     * (to report changes to port configurations) but it will never report
+     * OFPPR_ADD or OFPPR_DELETE as a reason.
      *
-     * If 'wdp' has a fixed set of ports, this function may be null, which is
-     * equivalent to always returning EAGAIN.
-     */
-    int (*port_poll)(const struct wdp *wdp, char **devnamep);
-
-    /* Arranges for the poll loop to wake up when 'port_poll' will return a
-     * value other than EAGAIN.
+     * 'opp' is in *host* byte order.
      *
-     * If 'wdp' has a fixed set of ports, this function may be null. */
-    void (*port_poll_wait)(const struct wdp *wdp);
+     * Normally returns 0.  May also return a positive errno value to indicate
+     * that something has gone wrong.
+     */
+    int (*port_poll)(struct wdp *wdp,
+                     void (*cb)(const struct ofp_phy_port *opp,
+                                uint8_t reason, void *aux),
+                     void *aux);
+
+    /* Arranges for the poll loop to wake up when 'port_poll' will call its
+     * callback. */
+    int (*port_poll_wait)(const struct wdp *wdp);
 
     /* If 'wdp' contains a flow exactly equal to 'flow', returns that flow.
      * Otherwise returns null. */
index 56067e6..26946a0 100644 (file)
@@ -71,7 +71,8 @@ struct wx {
 static struct list all_wx = LIST_INITIALIZER(&all_wx);
 
 static int wx_port_init(struct wx *);
-static void wx_port_run(struct wx *);
+static void wx_port_process_change(struct wx *wx, int error, char *devname,
+                                   wdp_port_poll_cb_func *cb, void *aux);
 static void wx_port_refresh_groups(struct wx *);
 
 enum {
@@ -1130,8 +1131,6 @@ revalidate_cb(struct cls_rule *sub_, void *cbdata_)
 static void
 wx_run_one(struct wx *wx)
 {
-    wx_port_run(wx);
-
     if (time_msec() >= wx->next_expiration) {
         COVERAGE_INC(wx_expiration);
         wx->next_expiration = time_msec() + 1000;
@@ -1169,8 +1168,6 @@ wx_run(void)
 static void
 wx_wait_one(struct wx *wx)
 {
-    xfif_port_poll_wait(wx->xfif);
-    netdev_monitor_poll_wait(wx->netdev_monitor);
     if (wx->need_revalidate /*|| !tag_set_is_empty(&p->revalidate_set)*/) {
         poll_immediate_wake();
     } else if (wx->next_expiration != LLONG_MAX) {
@@ -1474,19 +1471,38 @@ wx_port_list(const struct wdp *wdp, struct wdp_port **portsp, size_t *n_portsp)
 }
 
 static int
-wx_port_poll(const struct wdp *wdp, char **devnamep)
+wx_port_poll(struct wdp *wdp, wdp_port_poll_cb_func *cb, void *aux)
 {
     struct wx *wx = wx_cast(wdp);
+    char *devname;
+    int retval;
+    int error;
 
-    return xfif_port_poll(wx->xfif, devnamep);
+    retval = 0;
+    while ((error = xfif_port_poll(wx->xfif, &devname)) != EAGAIN) {
+        wx_port_process_change(wx, error, devname, cb, aux);
+        if (error && error != ENOBUFS) {
+            retval = error;
+        }
+    }
+    while ((error = netdev_monitor_poll(wx->netdev_monitor,
+                                        &devname)) != EAGAIN) {
+        wx_port_process_change(wx, error, devname, cb, aux);
+        if (error && error != ENOBUFS) {
+            retval = error;
+        }
+    }
+    return retval;
 }
 
-static void
+static int
 wx_port_poll_wait(const struct wdp *wdp)
 {
     struct wx *wx = wx_cast(wdp);
 
     xfif_port_poll_wait(wx->xfif);
+    netdev_monitor_poll_wait(wx->netdev_monitor);
+    return 0;
 }
 
 static struct wdp_rule *
@@ -1951,35 +1967,22 @@ wx_recv_wait(struct wdp *wdp)
     xfif_recv_wait(wx->xfif);
 }
 \f
-static void wx_port_update(struct wx *, const char *devname);
-static void wx_port_reinit(struct wx *);
+static void wx_port_update(struct wx *, const char *devname,
+                           wdp_port_poll_cb_func *cb, void *aux);
+static void wx_port_reinit(struct wx *, wdp_port_poll_cb_func *cb, void *aux);
 
 static void
-wx_port_process_change(struct wx *wx, int error, char *devname)
+wx_port_process_change(struct wx *wx, int error, char *devname,
+                       wdp_port_poll_cb_func *cb, void *aux)
 {
     if (error == ENOBUFS) {
-        wx_port_reinit(wx);
+        wx_port_reinit(wx, cb, aux);
     } else if (!error) {
-        wx_port_update(wx, devname);
+        wx_port_update(wx, devname, cb, aux);
         free(devname);
     }
 }
 
-static void
-wx_port_run(struct wx *wx)
-{
-    char *devname;
-    int error;
-
-    while ((error = xfif_port_poll(wx->xfif, &devname)) != EAGAIN) {
-        wx_port_process_change(wx, error, devname);
-    }
-    while ((error = netdev_monitor_poll(wx->netdev_monitor,
-                                        &devname)) != EAGAIN) {
-        wx_port_process_change(wx, error, devname);
-    }
-}
-
 static size_t
 wx_port_refresh_group(struct wx *wx, unsigned int group)
 {
@@ -2011,7 +2014,7 @@ wx_port_refresh_groups(struct wx *wx)
 }
 
 static void
-wx_port_reinit(struct wx *wx)
+wx_port_reinit(struct wx *wx, wdp_port_poll_cb_func *cb, void *aux)
 {
     struct svec devnames;
     struct wdp_port *wdp_port;
@@ -2032,7 +2035,7 @@ wx_port_reinit(struct wx *wx)
 
     svec_sort_unique(&devnames);
     for (i = 0; i < devnames.n; i++) {
-        wx_port_update(wx, devnames.names[i]);
+        wx_port_update(wx, devnames.names[i], cb, aux);
     }
     svec_destroy(&devnames);
 
@@ -2151,7 +2154,8 @@ wx_port_free(struct wdp_port *wdp_port)
 }
 
 static void
-wx_port_update(struct wx *wx, const char *devname)
+wx_port_update(struct wx *wx, const char *devname,
+               wdp_port_poll_cb_func *cb, void *aux)
 {
     struct xflow_port xflow_port;
     struct wdp_port *old_wdp_port;
@@ -2212,10 +2216,21 @@ wx_port_update(struct wx *wx, const char *devname)
     if (new_wdp_port) {
         wx_port_install(wx, new_wdp_port);
     }
-    wx_port_free(old_wdp_port);
+
+    /* Call back. */
+    if (!old_wdp_port) {
+        (*cb)(&new_wdp_port->opp, OFPPR_ADD, aux);
+    } else if (!new_wdp_port) {
+        (*cb)(&old_wdp_port->opp, OFPPR_DELETE, aux);
+    } else {
+        (*cb)(&new_wdp_port->opp, OFPPR_MODIFY, aux);
+    }
 
     /* Update port groups. */
     wx_port_refresh_groups(wx);
+
+    /* Clean up. */
+    wx_port_free(old_wdp_port);
 }
 
 static int
index 7e04110..bf8e406 100644 (file)
@@ -714,42 +714,36 @@ wdp_port_set_config(struct wdp *wdp, uint16_t port_no, uint32_t config)
     return wdp->wdp_class->port_set_config(wdp, port_no, config);
 }
 
-/* Polls for changes in the set of ports in 'wdp'.  If the set of ports in
- * 'wdp' has changed, this function does one of the following:
+/* Polls for changes in the set of ports in 'wdp' since the last call to this
+ * function or, if this is the first call, since 'wdp' was opened.  For each
+ * change, calls 'cb' passing 'aux' and:
  *
- * - Stores the name of the device that was added to or deleted from 'wdp' in
- *   '*devnamep' and returns 0.  The caller is responsible for freeing
- *   '*devnamep' (with free()) when it no longer needs it.
+ *   - For a port that has been added, OFPPR_ADD as 'reason' and the new port's
+ *     "struct ofp_phy_port" as 'opp'.
  *
- * - Returns ENOBUFS and sets '*devnamep' to NULL.
+ *   - For a port that has been removed, OFPPR_DELETE as 'reason' and the
+ *     deleted port's former "struct ofp_phy_port" as 'opp'.
  *
- * This function may also return 'false positives', where it returns 0 and
- * '*devnamep' names a device that was not actually added or deleted or it
- * returns ENOBUFS without any change.
+ *   - For a port whose configuration has changed, OFPPR_MODIFY as 'reason' and
+ *     the modified port's new "struct ofp_phy_port" as 'opp'.
  *
- * Returns EAGAIN if the set of ports in 'wdp' has not changed.  May also
- * return other positive errno values to indicate that something has gone
- * wrong. */
+ * 'opp' is in *host* byte order.
+ *
+ * Normally returns 0.  May also return a positive errno value to indicate
+ * that something has gone wrong.
+ */
 int
-wdp_port_poll(const struct wdp *wdp, char **devnamep)
+wdp_port_poll(struct wdp *wdp, wdp_port_poll_cb_func *cb, void *aux)
 {
-    int error = (wdp->wdp_class->port_poll
-                 ? wdp->wdp_class->port_poll(wdp, devnamep)
-                 : EAGAIN);
-    if (error) {
-        *devnamep = NULL;
-    }
-    return error;
+    return wdp->wdp_class->port_poll(wdp, cb, aux);
 }
 
-/* Arranges for the poll loop to wake up when port_poll(wdp) will return a
- * value other than EAGAIN. */
-void
+/* Arranges for the poll loop to wake up when 'port_poll' will call its
+ * callback. */
+int
 wdp_port_poll_wait(const struct wdp *wdp)
 {
-    if (wdp->wdp_class->port_poll_wait) {
-        wdp->wdp_class->port_poll_wait(wdp);
-    }
+    return wdp->wdp_class->port_poll_wait(wdp);
 }
 
 /* Deletes all flows from 'wdp'.  Returns 0 if successful, otherwise a
index 0b90bd4..724377b 100644 (file)
@@ -135,8 +135,10 @@ int wdp_port_list(const struct wdp *, struct wdp_port **, size_t *n_ports);
 
 int wdp_port_set_config(struct wdp *, uint16_t port_no, uint32_t config);
 
-int wdp_port_poll(const struct wdp *, char **devnamep);
-void wdp_port_poll_wait(const struct wdp *);
+typedef void wdp_port_poll_cb_func(const struct ofp_phy_port *opp,
+                                   uint8_t reason, void *aux);
+int wdp_port_poll(struct wdp *, wdp_port_poll_cb_func *cb, void *aux);
+int wdp_port_poll_wait(const struct wdp *);
 
 int wdp_flow_flush(struct wdp *);