f3421438158fd651be1cc708f54e9f68235072f7
[sliver-openvswitch.git] / python / ovs / reconnect.py
1 # Copyright (c) 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 import os
17
18 # Values returned by Reconnect.run()
19 CONNECT = 'connect'
20 DISCONNECT = 'disconnect'
21 PROBE = 'probe'
22
23 EOF = -1
24
25
26 class Reconnect(object):
27     """A finite-state machine for connecting and reconnecting to a network
28     resource with exponential backoff.  It also provides optional support for
29     detecting a connection on which the peer is no longer responding.
30
31     The library does not implement anything networking related, only an FSM for
32     networking code to use.
33
34     Many Reconnect methods take a "now" argument.  This makes testing easier
35     since there is no hidden state.  When not testing, just pass the return
36     value of ovs.time.msec().  (Perhaps this design should be revisited
37     later.)"""
38
39     class Void(object):
40         name = "VOID"
41         is_connected = False
42
43         @staticmethod
44         def deadline(fsm):
45             return None
46
47         @staticmethod
48         def run(fsm, now):
49             return None
50
51     class Listening(object):
52         name = "LISTENING"
53         is_connected = False
54
55         @staticmethod
56         def deadline(fsm):
57             return None
58
59         @staticmethod
60         def run(fsm, now):
61             return None
62
63     class Backoff(object):
64         name = "BACKOFF"
65         is_connected = False
66
67         @staticmethod
68         def deadline(fsm):
69             return fsm.state_entered + fsm.backoff
70
71         @staticmethod
72         def run(fsm, now):
73             return CONNECT
74
75     class ConnectInProgress(object):
76         name = "CONNECTING"
77         is_connected = False
78
79         @staticmethod
80         def deadline(fsm):
81             return fsm.state_entered + max(1000, fsm.backoff)
82
83         @staticmethod
84         def run(fsm, now):
85             return DISCONNECT
86
87     class Active(object):
88         name = "ACTIVE"
89         is_connected = True
90
91         @staticmethod
92         def deadline(fsm):
93             if fsm.probe_interval:
94                 base = max(fsm.last_received, fsm.state_entered)
95                 return base + fsm.probe_interval
96             return None
97
98         @staticmethod
99         def run(fsm, now):
100             logging.debug("%s: idle %d ms, sending inactivity probe"
101                           % (fsm.name,
102                              now - max(fsm.last_received, fsm.state_entered)))
103             fsm._transition(now, Reconnect.Idle)
104             return PROBE
105
106     class Idle(object):
107         name = "IDLE"
108         is_connected = True
109
110         @staticmethod
111         def deadline(fsm):
112             return fsm.state_entered + fsm.probe_interval
113
114         @staticmethod
115         def run(fsm, now):
116             logging.error("%s: no response to inactivity probe after %.3g "
117                           "seconds, disconnecting"
118                           % (fsm.name, (now - fsm.state_entered) / 1000.0))
119             return DISCONNECT
120
121     class Reconnect(object):
122         name = "RECONNECT"
123         is_connected = False
124
125         @staticmethod
126         def deadline(fsm):
127             return fsm.state_entered
128
129         @staticmethod
130         def run(fsm, now):
131             return DISCONNECT
132
133     def __init__(self, now):
134         """Creates and returns a new reconnect FSM with default settings.  The
135         FSM is initially disabled.  The caller will likely want to call
136         self.enable() and self.set_name() on the returned object."""
137
138         self.name = "void"
139         self.min_backoff = 1000
140         self.max_backoff = 8000
141         self.probe_interval = 5000
142         self.passive = False
143         self.info_level = logging.info
144
145         self.state = Reconnect.Void
146         self.state_entered = now
147         self.backoff = 0
148         self.last_received = now
149         self.last_connected = None
150         self.last_disconnected = None
151         self.max_tries = None
152
153         self.creation_time = now
154         self.n_attempted_connections = 0
155         self.n_successful_connections = 0
156         self.total_connected_duration = 0
157         self.seqno = 0
158
159     def set_quiet(self, quiet):
160         """If 'quiet' is true, this object will log informational messages at
161         debug level, by default keeping them out of log files.  This is
162         appropriate if the connection is one that is expected to be
163         short-lived, so that the log messages are merely distracting.
164
165         If 'quiet' is false, this object logs informational messages at info
166         level.  This is the default.
167
168         This setting has no effect on the log level of debugging, warning, or
169         error messages."""
170         if quiet:
171             self.info_level = logging.debug
172         else:
173             self.info_level = logging.info
174
175     def get_name(self):
176         return self.name
177
178     def set_name(self, name):
179         """Sets this object's name to 'name'.  If 'name' is None, then "void"
180         is used instead.
181
182         The name is used in log messages."""
183         if name is None:
184             self.name = "void"
185         else:
186             self.name = name
187
188     def get_min_backoff(self):
189         """Return the minimum number of milliseconds to back off between
190         consecutive connection attempts.  The default is 1000 ms."""
191         return self.min_backoff
192
193     def get_max_backoff(self):
194         """Return the maximum number of milliseconds to back off between
195         consecutive connection attempts.  The default is 8000 ms."""
196         return self.max_backoff
197
198     def get_probe_interval(self):
199         """Returns the "probe interval" in milliseconds.  If this is zero, it
200         disables the connection keepalive feature.  If it is nonzero, then if
201         the interval passes while the FSM is connected and without
202         self.received() being called, self.run() returns ovs.reconnect.PROBE.
203         If the interval passes again without self.received() being called,
204         self.run() returns ovs.reconnect.DISCONNECT."""
205         return self.probe_interval
206
207     def set_max_tries(self, max_tries):
208         """Limits the maximum number of times that this object will ask the
209         client to try to reconnect to 'max_tries'.  None (the default) means an
210         unlimited number of tries.
211
212         After the number of tries has expired, the FSM will disable itself
213         instead of backing off and retrying."""
214         self.max_tries = max_tries
215
216     def get_max_tries(self):
217         """Returns the current remaining number of connection attempts,
218         None if the number is unlimited."""
219         return self.max_tries
220
221     def set_backoff(self, min_backoff, max_backoff):
222         """Configures the backoff parameters for this FSM.  'min_backoff' is
223         the minimum number of milliseconds, and 'max_backoff' is the maximum,
224         between connection attempts.
225
226         'min_backoff' must be at least 1000, and 'max_backoff' must be greater
227         than or equal to 'min_backoff'."""
228         self.min_backoff = max(min_backoff, 1000)
229         if self.max_backoff:
230             self.max_backoff = max(max_backoff, 1000)
231         else:
232             self.max_backoff = 8000
233         if self.min_backoff > self.max_backoff:
234             self.max_backoff = self.min_backoff
235
236         if (self.state == Reconnect.Backoff and
237             self.backoff > self.max_backoff):
238                 self.backoff = self.max_backoff
239
240     def set_probe_interval(self, probe_interval):
241         """Sets the "probe interval" to 'probe_interval', in milliseconds.  If
242         this is zero, it disables the connection keepalive feature.  If it is
243         nonzero, then if the interval passes while this FSM is connected and
244         without self.received() being called, self.run() returns
245         ovs.reconnect.PROBE.  If the interval passes again without
246         self.received() being called, self.run() returns
247         ovs.reconnect.DISCONNECT.
248
249         If 'probe_interval' is nonzero, then it will be forced to a value of at
250         least 1000 ms."""
251         if probe_interval:
252             self.probe_interval = max(1000, probe_interval)
253         else:
254             self.probe_interval = 0
255
256     def is_passive(self):
257         """Returns true if 'fsm' is in passive mode, false if 'fsm' is in
258         active mode (the default)."""
259         return self.passive
260
261     def set_passive(self, passive, now):
262         """Configures this FSM for active or passive mode.  In active mode (the
263         default), the FSM is attempting to connect to a remote host.  In
264         passive mode, the FSM is listening for connections from a remote
265         host."""
266         if self.passive != passive:
267             self.passive = passive
268
269             if ((passive and self.state in (Reconnect.ConnectInProgress,
270                                             Reconnect.Reconnect)) or
271                 (not passive and self.state == Reconnect.Listening
272                  and self.__may_retry())):
273                 self._transition(now, Reconnect.Backoff)
274                 self.backoff = 0
275
276     def is_enabled(self):
277         """Returns true if this FSM has been enabled with self.enable().
278         Calling another function that indicates a change in connection state,
279         such as self.disconnected() or self.force_reconnect(), will also enable
280         a reconnect FSM."""
281         return self.state != Reconnect.Void
282
283     def enable(self, now):
284         """If this FSM is disabled (the default for newly created FSMs),
285         enables it, so that the next call to reconnect_run() for 'fsm' will
286         return ovs.reconnect.CONNECT.
287
288         If this FSM is not disabled, this function has no effect."""
289         if self.state == Reconnect.Void and self.__may_retry():
290             self._transition(now, Reconnect.Backoff)
291             self.backoff = 0
292
293     def disable(self, now):
294         """Disables this FSM.  Until 'fsm' is enabled again, self.run() will
295         always return 0."""
296         if self.state != Reconnect.Void:
297             self._transition(now, Reconnect.Void)
298
299     def force_reconnect(self, now):
300         """If this FSM is enabled and currently connected (or attempting to
301         connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
302         time it is called, which should cause the client to drop the connection
303         (or attempt), back off, and then reconnect."""
304         if self.state in (Reconnect.ConnectInProgress,
305                           Reconnect.Active,
306                           Reconnect.Idle):
307             self._transition(now, Reconnect.Reconnect)
308
309     def disconnected(self, now, error):
310         """Tell this FSM that the connection dropped or that a connection
311         attempt failed.  'error' specifies the reason: a positive value
312         represents an errno value, EOF indicates that the connection was closed
313         by the peer (e.g. read() returned 0), and 0 indicates no specific
314         error.
315
316         The FSM will back off, then reconnect."""
317         if self.state not in (Reconnect.Backoff, Reconnect.Void):
318             # Report what happened
319             if self.state in (Reconnect.Active, Reconnect.Idle):
320                 if error > 0:
321                     logging.warning("%s: connection dropped (%s)"
322                                     % (self.name, os.strerror(error)))
323                 elif error == EOF:
324                     self.info_level("%s: connection closed by peer"
325                                     % self.name)
326                 else:
327                     self.info_level("%s: connection dropped" % self.name)
328             elif self.state == Reconnect.Listening:
329                 if error > 0:
330                     logging.warning("%s: error listening for connections (%s)"
331                                     % (self.name, os.strerror(error)))
332                 else:
333                     self.info_level("%s: error listening for connections"
334                                     % self.name)
335             else:
336                 if self.passive:
337                     type_ = "listen"
338                 else:
339                     type_ = "connection"
340                 if error > 0:
341                     logging.warning("%s: %s attempt failed (%s)"
342                                     % (self.name, type_, os.strerror(error)))
343                 else:
344                     self.info_level("%s: %s attempt timed out"
345                                     % (self.name, type_))
346
347             if (self.state in (Reconnect.Active, Reconnect.Idle)):
348                 self.last_disconnected = now
349
350             # Back off
351             if (self.state in (Reconnect.Active, Reconnect.Idle) and
352                 (self.last_received - self.last_connected >= self.backoff or
353                  self.passive)):
354                 if self.passive:
355                     self.backoff = 0
356                 else:
357                     self.backoff = self.min_backoff
358             else:
359                 if self.backoff < self.min_backoff:
360                     self.backoff = self.min_backoff
361                 elif self.backoff >= self.max_backoff / 2:
362                     self.backoff = self.max_backoff
363                 else:
364                     self.backoff *= 2
365
366                 if self.passive:
367                     self.info_level("%s: waiting %.3g seconds before trying "
368                                     "to listen again"
369                                     % (self.name, self.backoff / 1000.0))
370                 else:
371                     self.info_level("%s: waiting %.3g seconds before reconnect"
372                                     % (self.name, self.backoff / 1000.0))
373
374             if self.__may_retry():
375                 self._transition(now, Reconnect.Backoff)
376             else:
377                 self._transition(now, Reconnect.Void)
378
379     def connecting(self, now):
380         """Tell this FSM that a connection or listening attempt is in progress.
381
382         The FSM will start a timer, after which the connection or listening
383         attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
384         self.run())."""
385         if self.state != Reconnect.ConnectInProgress:
386             if self.passive:
387                 self.info_level("%s: listening..." % self.name)
388             else:
389                 self.info_level("%s: connecting..." % self.name)
390             self._transition(now, Reconnect.ConnectInProgress)
391
392     def listening(self, now):
393         """Tell this FSM that the client is listening for connection attempts.
394         This state last indefinitely until the client reports some change.
395
396         The natural progression from this state is for the client to report
397         that a connection has been accepted or is in progress of being
398         accepted, by calling self.connecting() or self.connected().
399
400         The client may also report that listening failed (e.g. accept()
401         returned an unexpected error such as ENOMEM) by calling
402         self.listen_error(), in which case the FSM will back off and eventually
403         return ovs.reconnect.CONNECT from self.run() to tell the client to try
404         listening again."""
405         if self.state != Reconnect.Listening:
406             self.info_level("%s: listening..." % self.name)
407             self._transition(now, Reconnect.Listening)
408
409     def listen_error(self, now, error):
410         """Tell this FSM that the client's attempt to accept a connection
411         failed (e.g. accept() returned an unexpected error such as ENOMEM).
412
413         If the FSM is currently listening (self.listening() was called), it
414         will back off and eventually return ovs.reconnect.CONNECT from
415         self.run() to tell the client to try listening again.  If there is an
416         active connection, this will be delayed until that connection drops."""
417         if self.state == Reconnect.Listening:
418             self.disconnected(now, error)
419
420     def connected(self, now):
421         """Tell this FSM that the connection was successful.
422
423         The FSM will start the probe interval timer, which is reset by
424         self.received().  If the timer expires, a probe will be sent (by
425         returning ovs.reconnect.PROBE from self.run().  If the timer expires
426         again without being reset, the connection will be aborted (by returning
427         ovs.reconnect.DISCONNECT from self.run()."""
428         if not self.state.is_connected:
429             self.connecting(now)
430
431             self.info_level("%s: connected" % self.name)
432             self._transition(now, Reconnect.Active)
433             self.last_connected = now
434
435     def connect_failed(self, now, error):
436         """Tell this FSM that the connection attempt failed.
437
438         The FSM will back off and attempt to reconnect."""
439         self.connecting(now)
440         self.disconnected(now, error)
441
442     def received(self, now):
443         """Tell this FSM that some data was received.  This resets the probe
444         interval timer, so that the connection is known not to be idle."""
445         if self.state != Reconnect.Active:
446             self._transition(now, Reconnect.Active)
447         self.last_received = now
448
449     def _transition(self, now, state):
450         if self.state == Reconnect.ConnectInProgress:
451             self.n_attempted_connections += 1
452             if state == Reconnect.Active:
453                 self.n_successful_connections += 1
454
455         connected_before = self.state.is_connected
456         connected_now = state.is_connected
457         if connected_before != connected_now:
458             if connected_before:
459                 self.total_connected_duration += now - self.last_connected
460             self.seqno += 1
461
462         logging.debug("%s: entering %s" % (self.name, state.name))
463         self.state = state
464         self.state_entered = now
465
466     def run(self, now):
467         """Assesses whether any action should be taken on this FSM.  The return
468         value is one of:
469
470             - None: The client need not take any action.
471
472             - Active client, ovs.reconnect.CONNECT: The client should start a
473               connection attempt and indicate this by calling
474               self.connecting().  If the connection attempt has definitely
475               succeeded, it should call self.connected().  If the connection
476               attempt has definitely failed, it should call
477               self.connect_failed().
478
479               The FSM is smart enough to back off correctly after successful
480               connections that quickly abort, so it is OK to call
481               self.connected() after a low-level successful connection
482               (e.g. connect()) even if the connection might soon abort due to a
483               failure at a high-level (e.g. SSL negotiation failure).
484
485             - Passive client, ovs.reconnect.CONNECT: The client should try to
486               listen for a connection, if it is not already listening.  It
487               should call self.listening() if successful, otherwise
488               self.connecting() or reconnected_connect_failed() if the attempt
489               is in progress or definitely failed, respectively.
490
491               A listening passive client should constantly attempt to accept a
492               new connection and report an accepted connection with
493               self.connected().
494
495             - ovs.reconnect.DISCONNECT: The client should abort the current
496               connection or connection attempt or listen attempt and call
497               self.disconnected() or self.connect_failed() to indicate it.
498
499             - ovs.reconnect.PROBE: The client should send some kind of request
500               to the peer that will elicit a response, to ensure that the
501               connection is indeed in working order.  (This will only be
502               returned if the "probe interval" is nonzero--see
503               self.set_probe_interval())."""
504         if now >= self.state.deadline(self):
505             return self.state.run(self, now)
506         else:
507             return None
508
509     def wait(self, poller, now):
510         """Causes the next call to poller.block() to wake up when self.run()
511         should be called."""
512         timeout = self.timeout(now)
513         if timeout >= 0:
514             poller.timer_wait(timeout)
515
516     def timeout(self, now):
517         """Returns the number of milliseconds after which self.run() should be
518         called if nothing else notable happens in the meantime, or None if this
519         is currently unnecessary."""
520         deadline = self.state.deadline(self)
521         if deadline is not None:
522             remaining = deadline - now
523             return max(0, remaining)
524         else:
525             return None
526
527     def is_connected(self):
528         """Returns True if this FSM is currently believed to be connected, that
529         is, if self.connected() was called more recently than any call to
530         self.connect_failed() or self.disconnected() or self.disable(), and
531         False otherwise."""
532         return self.state.is_connected
533
534     def get_last_connect_elapsed(self, now):
535         """Returns the number of milliseconds since 'fsm' was last connected
536         to its peer. Returns None if never connected."""
537         if self.last_connected:
538             return now - self.last_connected
539         else:
540             return None
541
542     def get_last_disconnect_elapsed(self, now):
543         """Returns the number of milliseconds since 'fsm' was last disconnected
544         from its peer. Returns None if never disconnected."""
545         if self.last_disconnected:
546             return now - self.last_disconnected
547         else:
548             return None
549
550     def get_stats(self, now):
551         class Stats(object):
552             pass
553         stats = Stats()
554         stats.creation_time = self.creation_time
555         stats.last_connected = self.last_connected
556         stats.last_disconnected = self.last_disconnected
557         stats.last_received = self.last_received
558         stats.backoff = self.backoff
559         stats.seqno = self.seqno
560         stats.is_connected = self.is_connected()
561         stats.msec_since_connect = self.get_last_connect_elapsed(now)
562         stats.msec_since_disconnect = self.get_last_disconnect_elapsed(now)
563         stats.total_connected_duration = self.total_connected_duration
564         if self.is_connected():
565             stats.total_connected_duration += (
566                     self.get_last_connect_elapsed(now))
567         stats.n_attempted_connections = self.n_attempted_connections
568         stats.n_successful_connections = self.n_successful_connections
569         stats.state = self.state.name
570         stats.state_elapsed = now - self.state_entered
571         return stats
572
573     def __may_retry(self):
574         if self.max_tries is None:
575             return True
576         elif self.max_tries > 0:
577             self.max_tries -= 1
578             return True
579         else:
580             return False