Implement initial Python bindings for Open vSwitch database.
[sliver-openvswitch.git] / python / ovs / reconnect.py
diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py
new file mode 100644 (file)
index 0000000..9048579
--- /dev/null
@@ -0,0 +1,563 @@
+# Copyright (c) 2010 Nicira Networks
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+
+# Values returned by Reconnect.run()
+CONNECT = 'connect'
+DISCONNECT = 'disconnect'
+PROBE = 'probe'
+
+EOF = -1
+
+class Reconnect(object):
+    """A finite-state machine for connecting and reconnecting to a network
+    resource with exponential backoff.  It also provides optional support for
+    detecting a connection on which the peer is no longer responding.
+
+    The library does not implement anything networking related, only an FSM for
+    networking code to use.
+
+    Many Reconnect methods take a "now" argument.  This makes testing easier
+    since there is no hidden state.  When not testing, just pass the return
+    value of ovs.time.msec().  (Perhaps this design should be revisited
+    later.)"""
+
+    class Void(object):
+        name = "VOID"
+        is_connected = False
+
+        @staticmethod
+        def deadline(fsm):
+            return None
+
+        @staticmethod
+        def run(fsm, now):
+            return None
+
+    class Listening(object):
+        name = "LISTENING"
+        is_connected = False
+
+        @staticmethod
+        def deadline(fsm):
+            return None
+
+        @staticmethod
+        def run(fsm, now):
+            return None
+
+    class Backoff(object):
+        name = "BACKOFF"
+        is_connected = False
+
+        @staticmethod
+        def deadline(fsm):
+            return fsm.state_entered + fsm.backoff
+
+        @staticmethod
+        def run(fsm, now):
+            return CONNECT
+
+    class ConnectInProgress(object):
+        name = "CONNECT_IN_PROGRESS"
+        is_connected = False
+
+        @staticmethod
+        def deadline(fsm):
+            return fsm.state_entered + max(1000, fsm.backoff)
+
+        @staticmethod
+        def run(fsm, now):
+            return DISCONNECT
+
+    class Active(object):
+        name = "ACTIVE"
+        is_connected = True
+
+        @staticmethod
+        def deadline(fsm):
+            if fsm.probe_interval:
+                base = max(fsm.last_received, fsm.state_entered)
+                return base + fsm.probe_interval
+            return None
+
+        @staticmethod
+        def run(fsm, now):
+            logging.debug("%s: idle %d ms, sending inactivity probe"
+                          % (fsm.name,
+                             now - max(fsm.last_received, fsm.state_entered)))
+            fsm._transition(now, Reconnect.Idle)
+            return PROBE
+
+    class Idle(object):
+        name = "IDLE"
+        is_connected = True
+
+        @staticmethod
+        def deadline(fsm):
+            return fsm.state_entered + fsm.probe_interval
+
+        @staticmethod
+        def run(fsm, now):
+            logging.error("%s: no response to inactivity probe after %.3g "
+                          "seconds, disconnecting"
+                          % (fsm.name, (now - fsm.state_entered) / 1000.0))
+            return DISCONNECT
+
+    class Reconnect:
+        name = "RECONNECT"
+        is_connected = False
+
+        @staticmethod
+        def deadline(fsm):
+            return fsm.state_entered
+
+        @staticmethod
+        def run(fsm, now):
+            return DISCONNECT
+
+    def __init__(self, now):
+        """Creates and returns a new reconnect FSM with default settings.  The
+        FSM is initially disabled.  The caller will likely want to call
+        self.enable() and self.set_name() on the returned object."""
+
+        self.name = "void"
+        self.min_backoff = 1000
+        self.max_backoff = 8000
+        self.probe_interval = 5000
+        self.passive = False
+        self.info_level = logging.info
+
+        self.state = Reconnect.Void
+        self.state_entered = now
+        self.backoff = 0
+        self.last_received = now
+        self.last_connected = now
+        self.max_tries = None
+
+        self.creation_time = now
+        self.n_attempted_connections = 0
+        self.n_successful_connections = 0
+        self.total_connected_duration = 0
+        self.seqno = 0
+
+    def set_quiet(self, quiet):
+        """If 'quiet' is true, this object will log informational messages at
+        debug level, by default keeping them out of log files.  This is
+        appropriate if the connection is one that is expected to be
+        short-lived, so that the log messages are merely distracting.
+        
+        If 'quiet' is false, this object logs informational messages at info
+        level.  This is the default.
+        
+        This setting has no effect on the log level of debugging, warning, or
+        error messages."""
+        if quiet:
+            self.info_level = logging.debug
+        else:
+            self.info_level = logging.info
+
+    def get_name(self):
+        return self.name
+
+    def set_name(self, name):
+        """Sets this object's name to 'name'.  If 'name' is None, then "void"
+        is used instead.
+        
+        The name is used in log messages."""
+        if name is None:
+            self.name = "void"
+        else:
+            self.name = name
+
+    def get_min_backoff(self):
+        """Return the minimum number of milliseconds to back off between
+        consecutive connection attempts.  The default is 1000 ms."""
+        return self.min_backoff
+
+    def get_max_backoff(self):
+        """Return the maximum number of milliseconds to back off between
+        consecutive connection attempts.  The default is 8000 ms."""
+        return self.max_backoff
+
+    def get_probe_interval(self):
+        """Returns the "probe interval" in milliseconds.  If this is zero, it
+        disables the connection keepalive feature.  If it is nonzero, then if
+        the interval passes while the FSM is connected and without
+        self.received() being called, self.run() returns ovs.reconnect.PROBE.
+        If the interval passes again without self.received() being called,
+        self.run() returns ovs.reconnect.DISCONNECT."""
+        return self.probe_interval
+
+    def set_max_tries(self, max_tries):
+        """Limits the maximum number of times that this object will ask the
+        client to try to reconnect to 'max_tries'.  None (the default) means an
+        unlimited number of tries.
+
+        After the number of tries has expired, the FSM will disable itself
+        instead of backing off and retrying."""
+        self.max_tries = max_tries
+
+    def get_max_tries(self):
+        """Returns the current remaining number of connection attempts,
+        None if the number is unlimited."""
+        return self.max_tries
+
+    def set_backoff(self, min_backoff, max_backoff):
+        """Configures the backoff parameters for this FSM.  'min_backoff' is
+        the minimum number of milliseconds, and 'max_backoff' is the maximum,
+        between connection attempts.
+
+        'min_backoff' must be at least 1000, and 'max_backoff' must be greater
+        than or equal to 'min_backoff'."""
+        self.min_backoff = max(min_backoff, 1000)
+        if self.max_backoff:
+            self.max_backoff = max(max_backoff, 1000)
+        else:
+            self.max_backoff = 8000
+        if self.min_backoff > self.max_backoff:
+            self.max_backoff = self.min_backoff
+
+        if (self.state == Reconnect.Backoff and
+            self.backoff > self.max_backoff):
+                self.backoff = self.max_backoff
+        
+    def set_probe_interval(self, probe_interval):
+        """Sets the "probe interval" to 'probe_interval', in milliseconds.  If
+        this is zero, it disables the connection keepalive feature.  If it is
+        nonzero, then if the interval passes while this FSM is connected and
+        without self.received() being called, self.run() returns
+        ovs.reconnect.PROBE.  If the interval passes again without
+        self.received() being called, self.run() returns
+        ovs.reconnect.DISCONNECT.
+
+        If 'probe_interval' is nonzero, then it will be forced to a value of at
+        least 1000 ms."""
+        if probe_interval:
+            self.probe_interval = max(1000, probe_interval)
+        else:
+            self.probe_interval = 0
+
+    def is_passive(self):
+        """Returns true if 'fsm' is in passive mode, false if 'fsm' is in
+        active mode (the default)."""
+        return self.passive
+
+    def set_passive(self, passive, now):
+        """Configures this FSM for active or passive mode.  In active mode (the
+        default), the FSM is attempting to connect to a remote host.  In
+        passive mode, the FSM is listening for connections from a remote host."""
+        if self.passive != passive:
+            self.passive = passive
+
+            if ((passive and self.state in (Reconnect.ConnectInProgress,
+                                            Reconnect.Reconnect)) or
+                (not passive and self.state == Reconnect.Listening
+                 and self.__may_retry())):
+                self._transition(now, Reconnect.Backoff)
+                self.backoff = 0
+
+    def is_enabled(self):
+        """Returns true if this FSM has been enabled with self.enable().
+        Calling another function that indicates a change in connection state,
+        such as self.disconnected() or self.force_reconnect(), will also enable
+        a reconnect FSM."""
+        return self.state != Reconnect.Void
+
+    def enable(self, now):
+        """If this FSM is disabled (the default for newly created FSMs),
+        enables it, so that the next call to reconnect_run() for 'fsm' will
+        return ovs.reconnect.CONNECT.
+
+        If this FSM is not disabled, this function has no effect."""
+        if self.state == Reconnect.Void and self.__may_retry():
+            self._transition(now, Reconnect.Backoff)
+            self.backoff = 0
+
+    def disable(self, now):
+        """Disables this FSM.  Until 'fsm' is enabled again, self.run() will
+        always return 0."""
+        if self.state != Reconnect.Void:
+            self._transition(now, Reconnect.Void)
+
+    def force_reconnect(self, now):
+        """If this FSM is enabled and currently connected (or attempting to
+        connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
+        time it is called, which should cause the client to drop the connection
+        (or attempt), back off, and then reconnect."""
+        if self.state in (Reconnect.ConnectInProgress,
+                          Reconnect.Active,
+                          Reconnect.Idle):
+            self._transition(now, Reconnect.Reconnect)
+
+    def disconnected(self, now, error):
+        """Tell this FSM that the connection dropped or that a connection
+        attempt failed.  'error' specifies the reason: a positive value
+        represents an errno value, EOF indicates that the connection was closed
+        by the peer (e.g. read() returned 0), and 0 indicates no specific
+        error.
+
+        The FSM will back off, then reconnect."""
+        if self.state not in (Reconnect.Backoff, Reconnect.Void):
+            # Report what happened
+            if self.state in (Reconnect.Active, Reconnect.Idle):
+                if error > 0:
+                    logging.warning("%s: connection dropped (%s)"
+                                    % (self.name, os.strerror(error)))
+                elif error == EOF:
+                    self.info_level("%s: connection closed by peer"
+                                    % self.name)
+                else:
+                    self.info_level("%s: connection dropped" % self.name)
+            elif self.state == Reconnect.Listening:
+                if error > 0:
+                    logging.warning("%s: error listening for connections (%s)"
+                                    % (self.name, os.strerror(error)))
+                else:
+                    self.info_level("%s: error listening for connections"
+                                    % self.name)
+            else:
+                if self.passive:
+                    type = "listen"
+                else:
+                    type = "connection"
+                if error > 0:
+                    logging.warning("%s: %s attempt failed (%s)"
+                                    % (self.name, type, os.strerror(error)))
+                else:
+                    self.info_level("%s: %s attempt timed out"
+                                    % (self.name, type))
+
+            # Back off
+            if (self.state in (Reconnect.Active, Reconnect.Idle) and
+                (self.last_received - self.last_connected >= self.backoff or
+                 self.passive)):
+                if self.passive:
+                    self.backoff = 0
+                else:
+                    self.backoff = self.min_backoff
+            else:
+                if self.backoff < self.min_backoff:
+                    self.backoff = self.min_backoff
+                elif self.backoff >= self.max_backoff / 2:
+                    self.backoff = self.max_backoff
+                else:
+                    self.backoff *= 2
+
+                if self.passive:
+                    self.info_level("%s: waiting %.3g seconds before trying "
+                                    "to listen again"
+                                    % (self.name, self.backoff / 1000.0))
+                else:
+                    self.info_level("%s: waiting %.3g seconds before reconnect"
+                                    % (self.name, self.backoff / 1000.0))
+
+            if self.__may_retry():
+                self._transition(now, Reconnect.Backoff)
+            else:
+                self._transition(now, Reconnect.Void)
+
+    def connecting(self, now):
+        """Tell this FSM that a connection or listening attempt is in progress.
+
+        The FSM will start a timer, after which the connection or listening
+        attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
+        self.run())."""
+        if self.state != Reconnect.ConnectInProgress:
+            if self.passive:
+                self.info_level("%s: listening..." % self.name)
+            else:
+                self.info_level("%s: connecting..." % self.name)
+            self._transition(now, Reconnect.ConnectInProgress)
+            
+    def listening(self, now):
+        """Tell this FSM that the client is listening for connection attempts.
+        This state last indefinitely until the client reports some change.
+        
+        The natural progression from this state is for the client to report
+        that a connection has been accepted or is in progress of being
+        accepted, by calling self.connecting() or self.connected().
+        
+        The client may also report that listening failed (e.g. accept()
+        returned an unexpected error such as ENOMEM) by calling
+        self.listen_error(), in which case the FSM will back off and eventually
+        return ovs.reconnect.CONNECT from self.run() to tell the client to try
+        listening again."""
+        if self.state != Reconnect.Listening:
+            self.info_level("%s: listening..." % self.name)
+            self._transition(now, Reconnect.Listening)
+
+    def listen_error(self, now, error):
+        """Tell this FSM that the client's attempt to accept a connection
+        failed (e.g. accept() returned an unexpected error such as ENOMEM).
+        
+        If the FSM is currently listening (self.listening() was called), it
+        will back off and eventually return ovs.reconnect.CONNECT from
+        self.run() to tell the client to try listening again.  If there is an
+        active connection, this will be delayed until that connection drops."""
+        if self.state == Reconnect.Listening:
+            self.disconnected(now, error)
+
+    def connected(self, now):
+        """Tell this FSM that the connection was successful.
+
+        The FSM will start the probe interval timer, which is reset by
+        self.received().  If the timer expires, a probe will be sent (by
+        returning ovs.reconnect.PROBE from self.run().  If the timer expires
+        again without being reset, the connection will be aborted (by returning
+        ovs.reconnect.DISCONNECT from self.run()."""
+        if not self.state.is_connected:
+            self.connecting(now)
+
+            self.info_level("%s: connected" % self.name)
+            self._transition(now, Reconnect.Active)
+            self.last_connected = now
+
+    def connect_failed(self, now, error):
+        """Tell this FSM that the connection attempt failed.
+
+        The FSM will back off and attempt to reconnect."""
+        self.connecting(now)
+        self.disconnected(now, error)
+
+    def received(self, now):
+        """Tell this FSM that some data was received.  This resets the probe
+        interval timer, so that the connection is known not to be idle."""
+        if self.state != Reconnect.Active:
+            self._transition(now, Reconnect.Active)
+        self.last_received = now
+
+    def _transition(self, now, state):
+        if self.state == Reconnect.ConnectInProgress:
+            self.n_attempted_connections += 1
+            if state == Reconnect.Active:
+                self.n_successful_connections += 1
+
+        connected_before = self.state.is_connected
+        connected_now = state.is_connected
+        if connected_before != connected_now:
+            if connected_before:
+                self.total_connected_duration += now - self.last_connected
+            self.seqno += 1
+            
+        logging.debug("%s: entering %s" % (self.name, state.name))
+        self.state = state
+        self.state_entered = now
+
+    def run(self, now):
+        """Assesses whether any action should be taken on this FSM.  The return
+        value is one of:
+        
+            - None: The client need not take any action.
+        
+            - Active client, ovs.reconnect.CONNECT: The client should start a
+              connection attempt and indicate this by calling
+              self.connecting().  If the connection attempt has definitely
+              succeeded, it should call self.connected().  If the connection
+              attempt has definitely failed, it should call
+              self.connect_failed().
+        
+              The FSM is smart enough to back off correctly after successful
+              connections that quickly abort, so it is OK to call
+              self.connected() after a low-level successful connection
+              (e.g. connect()) even if the connection might soon abort due to a
+              failure at a high-level (e.g. SSL negotiation failure).
+        
+            - Passive client, ovs.reconnect.CONNECT: The client should try to
+              listen for a connection, if it is not already listening.  It
+              should call self.listening() if successful, otherwise
+              self.connecting() or reconnected_connect_failed() if the attempt
+              is in progress or definitely failed, respectively.
+        
+              A listening passive client should constantly attempt to accept a
+              new connection and report an accepted connection with
+              self.connected().
+        
+            - ovs.reconnect.DISCONNECT: The client should abort the current
+              connection or connection attempt or listen attempt and call
+              self.disconnected() or self.connect_failed() to indicate it.
+        
+            - ovs.reconnect.PROBE: The client should send some kind of request
+              to the peer that will elicit a response, to ensure that the
+              connection is indeed in working order.  (This will only be
+              returned if the "probe interval" is nonzero--see
+              self.set_probe_interval())."""
+        if now >= self.state.deadline(self):
+            return self.state.run(self, now)
+        else:
+            return None
+        
+    def wait(self, poller, now):
+        """Causes the next call to poller.block() to wake up when self.run()
+        should be called."""
+        timeout = self.timeout(now)
+        if timeout >= 0:
+            poller.timer_wait(timeout)
+
+    def timeout(self, now):
+        """Returns the number of milliseconds after which self.run() should be
+        called if nothing else notable happens in the meantime, or a negative
+        number if this is currently unnecessary."""
+        deadline = self.state.deadline(self)
+        if deadline is not None:
+            remaining = deadline - now
+            return max(0, remaining)
+        else:
+            return None
+
+    def is_connected(self):
+        """Returns True if this FSM is currently believed to be connected, that
+        is, if self.connected() was called more recently than any call to
+        self.connect_failed() or self.disconnected() or self.disable(), and
+        False otherwise."""
+        return self.state.is_connected
+
+    def get_connection_duration(self, now):
+        """Returns the number of milliseconds for which this FSM has been
+        continuously connected to its peer.  (If this FSM is not currently
+        connected, this is 0.)"""
+        if self.is_connected():
+            return now - self.last_connected
+        else:
+            return 0
+
+    def get_stats(self, now):
+        class Stats(object):
+            pass
+        stats = Stats()
+        stats.creation_time = self.creation_time
+        stats.last_connected = self.last_connected
+        stats.last_received = self.last_received
+        stats.backoff = self.backoff
+        stats.seqno = self.seqno
+        stats.is_connected = self.is_connected()
+        stats.current_connection_duration = self.get_connection_duration(now)
+        stats.total_connected_duration = (stats.current_connection_duration +
+                                          self.total_connected_duration)
+        stats.n_attempted_connections = self.n_attempted_connections
+        stats.n_successful_connections = self.n_successful_connections
+        stats.state = self.state.name
+        stats.state_elapsed = now - self.state_entered
+        return stats
+
+    def __may_retry(self):
+        if self.max_tries is None:
+            return True
+        elif self.max_tries > 0:
+            self.max_tries -= 1
+            return True
+        else:
+            return False