1 # Copyright (c) 2010, 2011 Nicira Networks
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 # Values returned by Reconnect.run()
20 DISCONNECT = 'disconnect'
26 class Reconnect(object):
27 """A finite-state machine for connecting and reconnecting to a network
28 resource with exponential backoff. It also provides optional support for
29 detecting a connection on which the peer is no longer responding.
31 The library does not implement anything networking related, only an FSM for
32 networking code to use.
34 Many Reconnect methods take a "now" argument. This makes testing easier
35 since there is no hidden state. When not testing, just pass the return
36 value of ovs.time.msec(). (Perhaps this design should be revisited
51 class Listening(object):
63 class Backoff(object):
69 return fsm.state_entered + fsm.backoff
75 class ConnectInProgress(object):
81 return fsm.state_entered + max(1000, fsm.backoff)
93 if fsm.probe_interval:
94 base = max(fsm.last_received, fsm.state_entered)
95 return base + fsm.probe_interval
100 logging.debug("%s: idle %d ms, sending inactivity probe"
102 now - max(fsm.last_received, fsm.state_entered)))
103 fsm._transition(now, Reconnect.Idle)
112 return fsm.state_entered + fsm.probe_interval
116 logging.error("%s: no response to inactivity probe after %.3g "
117 "seconds, disconnecting"
118 % (fsm.name, (now - fsm.state_entered) / 1000.0))
121 class Reconnect(object):
127 return fsm.state_entered
133 def __init__(self, now):
134 """Creates and returns a new reconnect FSM with default settings. The
135 FSM is initially disabled. The caller will likely want to call
136 self.enable() and self.set_name() on the returned object."""
139 self.min_backoff = 1000
140 self.max_backoff = 8000
141 self.probe_interval = 5000
143 self.info_level = logging.info
145 self.state = Reconnect.Void
146 self.state_entered = now
148 self.last_received = now
149 self.last_connected = None
150 self.last_disconnected = None
151 self.max_tries = None
153 self.creation_time = now
154 self.n_attempted_connections = 0
155 self.n_successful_connections = 0
156 self.total_connected_duration = 0
159 def set_quiet(self, quiet):
160 """If 'quiet' is true, this object will log informational messages at
161 debug level, by default keeping them out of log files. This is
162 appropriate if the connection is one that is expected to be
163 short-lived, so that the log messages are merely distracting.
165 If 'quiet' is false, this object logs informational messages at info
166 level. This is the default.
168 This setting has no effect on the log level of debugging, warning, or
171 self.info_level = logging.debug
173 self.info_level = logging.info
178 def set_name(self, name):
179 """Sets this object's name to 'name'. If 'name' is None, then "void"
182 The name is used in log messages."""
def get_min_backoff(self):
    """Report the lower bound on the delay between connection attempts.

    Returns the value of self.min_backoff, in milliseconds.  It is 1000 ms
    unless changed with self.set_backoff()."""
    floor_ms = self.min_backoff
    return floor_ms
def get_max_backoff(self):
    """Report the upper bound on the delay between connection attempts.

    Returns the value of self.max_backoff, in milliseconds.  It is 8000 ms
    unless changed with self.set_backoff()."""
    ceiling_ms = self.max_backoff
    return ceiling_ms
def get_probe_interval(self):
    """Report the inactivity "probe interval", in milliseconds.

    A value of zero means the connection keepalive feature is disabled.

    With a nonzero value, when the interval elapses while the FSM is
    connected and self.received() has not been called, self.run() returns
    ovs.reconnect.PROBE.  When it elapses a second time, still without
    self.received() being called, self.run() returns
    ovs.reconnect.DISCONNECT."""
    interval_ms = self.probe_interval
    return interval_ms
def set_max_tries(self, max_tries):
    """Cap how many times this object asks the client to (re)connect.

    'max_tries' is the number of attempts allowed; None, the default, places
    no limit on the number of tries.

    Once the allowed tries are used up, the FSM disables itself rather than
    backing off and retrying."""
    self.max_tries = max_tries
def get_max_tries(self):
    """Report how many connection attempts remain.

    Returns self.max_tries; None means the number of attempts is
    unlimited."""
    tries_left = self.max_tries
    return tries_left
221 def set_backoff(self, min_backoff, max_backoff):
222 """Configures the backoff parameters for this FSM. 'min_backoff' is
223 the minimum number of milliseconds, and 'max_backoff' is the maximum,
224 between connection attempts.
226 'min_backoff' must be at least 1000, and 'max_backoff' must be greater
227 than or equal to 'min_backoff'."""
228 self.min_backoff = max(min_backoff, 1000)
230 self.max_backoff = max(max_backoff, 1000)
232 self.max_backoff = 8000
233 if self.min_backoff > self.max_backoff:
234 self.max_backoff = self.min_backoff
236 if (self.state == Reconnect.Backoff and
237 self.backoff > self.max_backoff):
238 self.backoff = self.max_backoff
240 def set_probe_interval(self, probe_interval):
241 """Sets the "probe interval" to 'probe_interval', in milliseconds. If
242 this is zero, it disables the connection keepalive feature. If it is
243 nonzero, then if the interval passes while this FSM is connected and
244 without self.received() being called, self.run() returns
245 ovs.reconnect.PROBE. If the interval passes again without
246 self.received() being called, self.run() returns
247 ovs.reconnect.DISCONNECT.
249 If 'probe_interval' is nonzero, then it will be forced to a value of at
252 self.probe_interval = max(1000, probe_interval)
254 self.probe_interval = 0
256 def is_passive(self):
257 """Returns true if this FSM is in passive mode, false if it is in
258 active mode (the default)."""
261 def set_passive(self, passive, now):
262 """Configures this FSM for active or passive mode. In active mode (the
263 default), the FSM is attempting to connect to a remote host. In
264 passive mode, the FSM is listening for connections from a remote
266 if self.passive != passive:
267 self.passive = passive
269 if ((passive and self.state in (Reconnect.ConnectInProgress,
270 Reconnect.Reconnect)) or
271 (not passive and self.state == Reconnect.Listening
272 and self.__may_retry())):
273 self._transition(now, Reconnect.Backoff)
def is_enabled(self):
    """Tell whether this FSM is currently enabled.

    The FSM counts as enabled whenever its state is anything other than
    Reconnect.Void.  self.enable() moves a disabled FSM out of the Void
    state and self.disable() moves it back in."""
    disabled = self.state == Reconnect.Void
    return not disabled
283 def enable(self, now):
284 """If this FSM is disabled (the default for newly created FSMs),
285 enables it, so that the next call to self.run() will
286 return ovs.reconnect.CONNECT.
288 If this FSM is not disabled, this function has no effect."""
289 if self.state == Reconnect.Void and self.__may_retry():
290 self._transition(now, Reconnect.Backoff)
def disable(self, now):
    """Disable this FSM by moving it to the Reconnect.Void state.

    'now' is the current time in milliseconds; it is recorded as the time
    the Void state was entered.  Calling this on an FSM that is already
    disabled does nothing."""
    already_disabled = self.state == Reconnect.Void
    if already_disabled:
        return
    self._transition(now, Reconnect.Void)
299 def force_reconnect(self, now):
300 """If this FSM is enabled and currently connected (or attempting to
301 connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
302 time it is called, which should cause the client to drop the connection
303 (or attempt), back off, and then reconnect."""
304 if self.state in (Reconnect.ConnectInProgress,
307 self._transition(now, Reconnect.Reconnect)
309 def disconnected(self, now, error):
310 """Tell this FSM that the connection dropped or that a connection
311 attempt failed. 'error' specifies the reason: a positive value
312 represents an errno value, EOF indicates that the connection was closed
313 by the peer (e.g. read() returned 0), and 0 indicates no specific
316 The FSM will back off, then reconnect."""
317 if self.state not in (Reconnect.Backoff, Reconnect.Void):
318 # Report what happened
319 if self.state in (Reconnect.Active, Reconnect.Idle):
321 logging.warning("%s: connection dropped (%s)"
322 % (self.name, os.strerror(error)))
324 self.info_level("%s: connection closed by peer"
327 self.info_level("%s: connection dropped" % self.name)
328 elif self.state == Reconnect.Listening:
330 logging.warning("%s: error listening for connections (%s)"
331 % (self.name, os.strerror(error)))
333 self.info_level("%s: error listening for connections"
341 logging.warning("%s: %s attempt failed (%s)"
342 % (self.name, type_, os.strerror(error)))
344 self.info_level("%s: %s attempt timed out"
345 % (self.name, type_))
347 if (self.state in (Reconnect.Active, Reconnect.Idle)):
348 self.last_disconnected = now
351 if (self.state in (Reconnect.Active, Reconnect.Idle) and
352 (self.last_received - self.last_connected >= self.backoff or
357 self.backoff = self.min_backoff
359 if self.backoff < self.min_backoff:
360 self.backoff = self.min_backoff
361 elif self.backoff >= self.max_backoff / 2:
362 self.backoff = self.max_backoff
367 self.info_level("%s: waiting %.3g seconds before trying "
369 % (self.name, self.backoff / 1000.0))
371 self.info_level("%s: waiting %.3g seconds before reconnect"
372 % (self.name, self.backoff / 1000.0))
374 if self.__may_retry():
375 self._transition(now, Reconnect.Backoff)
377 self._transition(now, Reconnect.Void)
379 def connecting(self, now):
380 """Tell this FSM that a connection or listening attempt is in progress.
382 The FSM will start a timer, after which the connection or listening
383 attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
385 if self.state != Reconnect.ConnectInProgress:
387 self.info_level("%s: listening..." % self.name)
389 self.info_level("%s: connecting..." % self.name)
390 self._transition(now, Reconnect.ConnectInProgress)
def listening(self, now):
    """Record that the client is now listening for incoming connections.

    This state lasts indefinitely, until the client reports some change.

    Normally the client next reports that a connection has been accepted,
    or is in the process of being accepted, by calling self.connected() or
    self.connecting().

    If listening itself fails (e.g. accept() returned an unexpected error
    such as ENOMEM), the client should call self.listen_error(); the FSM
    then backs off and eventually returns ovs.reconnect.CONNECT from
    self.run() so that the client tries listening again.

    Calling this while already in the Listening state has no effect."""
    if self.state == Reconnect.Listening:
        return

    # Log first, then switch state, exactly once per entry into Listening.
    message = "%s: listening..." % self.name
    self.info_level(message)
    self._transition(now, Reconnect.Listening)
def listen_error(self, now, error):
    """Record that the client failed to accept a connection.

    A typical cause is accept() returning an unexpected error such as
    ENOMEM.  'error' is passed through unchanged to self.disconnected().

    The failure is only acted on while the FSM is in the Listening state
    (i.e. after self.listening() was called): it is then handled as a
    disconnection, so the FSM backs off and eventually returns
    ovs.reconnect.CONNECT from self.run() to make the client listen again.
    In any other state, for example while a connection is active, the call
    is ignored."""
    is_listening = self.state == Reconnect.Listening
    if not is_listening:
        return
    self.disconnected(now, error)
420 def connected(self, now):
421 """Tell this FSM that the connection was successful.
423 The FSM will start the probe interval timer, which is reset by
424 self.received(). If the timer expires, a probe will be sent (by
425 returning ovs.reconnect.PROBE from self.run(). If the timer expires
426 again without being reset, the connection will be aborted (by returning
427 ovs.reconnect.DISCONNECT from self.run()."""
428 if not self.state.is_connected:
431 self.info_level("%s: connected" % self.name)
432 self._transition(now, Reconnect.Active)
433 self.last_connected = now
435 def connect_failed(self, now, error):
436 """Tell this FSM that the connection attempt failed.
438 The FSM will back off and attempt to reconnect."""
440 self.disconnected(now, error)
def received(self, now):
    """Record that data arrived from the peer at time 'now'.

    The FSM is moved to the Active state if it is not already there, and
    self.last_received is updated.  This restarts the probe interval timer,
    so the connection is known not to be idle."""
    already_active = self.state == Reconnect.Active
    if not already_active:
        self._transition(now, Reconnect.Active)
    # Always refresh the timestamp, whether or not a transition happened.
    self.last_received = now
449 def _transition(self, now, state):
450 if self.state == Reconnect.ConnectInProgress:
451 self.n_attempted_connections += 1
452 if state == Reconnect.Active:
453 self.n_successful_connections += 1
455 connected_before = self.state.is_connected
456 connected_now = state.is_connected
457 if connected_before != connected_now:
459 self.total_connected_duration += now - self.last_connected
462 logging.debug("%s: entering %s" % (self.name, state.name))
464 self.state_entered = now
467 """Assesses whether any action should be taken on this FSM. The return
470 - None: The client need not take any action.
472 - Active client, ovs.reconnect.CONNECT: The client should start a
473 connection attempt and indicate this by calling
474 self.connecting(). If the connection attempt has definitely
475 succeeded, it should call self.connected(). If the connection
476 attempt has definitely failed, it should call
477 self.connect_failed().
479 The FSM is smart enough to back off correctly after successful
480 connections that quickly abort, so it is OK to call
481 self.connected() after a low-level successful connection
482 (e.g. connect()) even if the connection might soon abort due to a
483 failure at a high-level (e.g. SSL negotiation failure).
485 - Passive client, ovs.reconnect.CONNECT: The client should try to
486 listen for a connection, if it is not already listening. It
487 should call self.listening() if successful, otherwise
488 self.connecting() or self.connect_failed() if the attempt
489 is in progress or definitely failed, respectively.
491 A listening passive client should constantly attempt to accept a
492 new connection and report an accepted connection with
495 - ovs.reconnect.DISCONNECT: The client should abort the current
496 connection or connection attempt or listen attempt and call
497 self.disconnected() or self.connect_failed() to indicate it.
499 - ovs.reconnect.PROBE: The client should send some kind of request
500 to the peer that will elicit a response, to ensure that the
501 connection is indeed in working order. (This will only be
502 returned if the "probe interval" is nonzero--see
503 self.set_probe_interval())."""
504 if now >= self.state.deadline(self):
505 return self.state.run(self, now)
509 def wait(self, poller, now):
510 """Causes the next call to poller.block() to wake up when self.run()
512 timeout = self.timeout(now)
514 poller.timer_wait(timeout)
def timeout(self, now):
    """Compute how long the caller may wait before calling self.run().

    Asks the current state for its deadline and returns the number of
    milliseconds from 'now' until that deadline, never less than zero.
    Returns None when the current state has no deadline, meaning no timed
    wakeup is needed."""
    due = self.state.deadline(self)
    if due is None:
        return None
    # A deadline that has already passed yields 0 rather than a negative wait.
    return max(0, due - now)
def is_connected(self):
    """Tell whether this FSM currently believes it is connected.

    The answer is the 'is_connected' attribute of the current state, so it
    reflects whichever of self.connected(), self.connect_failed(),
    self.disconnected() or self.disable() most recently changed the
    state."""
    current_state = self.state
    return current_state.is_connected
def get_last_connect_elapsed(self, now):
    """Return the milliseconds elapsed since this FSM last connected.

    'now' is the current time in milliseconds.  Returns None if the FSM has
    never been connected to its peer (self.last_connected is None)."""
    # Compare against None explicitly: a timestamp of 0 is a legitimate
    # connection time and must not be mistaken for "never connected".
    if self.last_connected is None:
        return None
    return now - self.last_connected
def get_last_disconnect_elapsed(self, now):
    """Return the milliseconds elapsed since this FSM last disconnected.

    'now' is the current time in milliseconds.  Returns None if the FSM has
    never been disconnected from its peer (self.last_disconnected is
    None)."""
    # Compare against None explicitly: a timestamp of 0 is a legitimate
    # disconnection time and must not be mistaken for "never disconnected".
    if self.last_disconnected is None:
        return None
    return now - self.last_disconnected
550 def get_stats(self, now):
554 stats.creation_time = self.creation_time
555 stats.last_connected = self.last_connected
556 stats.last_disconnected = self.last_disconnected
557 stats.last_received = self.last_received
558 stats.backoff = self.backoff
559 stats.seqno = self.seqno
560 stats.is_connected = self.is_connected()
561 stats.msec_since_connect = self.get_last_connect_elapsed(now)
562 stats.msec_since_disconnect = self.get_last_disconnect_elapsed(now)
563 stats.total_connected_duration = self.total_connected_duration
564 if self.is_connected():
565 stats.total_connected_duration += (
566 self.get_last_connect_elapsed(now))
567 stats.n_attempted_connections = self.n_attempted_connections
568 stats.n_successful_connections = self.n_successful_connections
569 stats.state = self.state.name
570 stats.state_elapsed = now - self.state_entered
573 def __may_retry(self):
574 if self.max_tries is None:
576 elif self.max_tries > 0: