Global replace of Nicira Networks.
[sliver-openvswitch.git] / python / ovs / reconnect.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import os
16
17 import ovs.vlog
18 import ovs.util
19
20 # Values returned by Reconnect.run()
21 CONNECT = 'connect'
22 DISCONNECT = 'disconnect'
23 PROBE = 'probe'
24
25 EOF = ovs.util.EOF
26 vlog = ovs.vlog.Vlog("reconnect")
27
28
29 class Reconnect(object):
30     """A finite-state machine for connecting and reconnecting to a network
31     resource with exponential backoff.  It also provides optional support for
32     detecting a connection on which the peer is no longer responding.
33
34     The library does not implement anything networking related, only an FSM for
35     networking code to use.
36
37     Many Reconnect methods take a "now" argument.  This makes testing easier
38     since there is no hidden state.  When not testing, just pass the return
39     value of ovs.time.msec().  (Perhaps this design should be revisited
40     later.)"""
41
42     class Void(object):
43         name = "VOID"
44         is_connected = False
45
46         @staticmethod
47         def deadline(fsm):
48             return None
49
50         @staticmethod
51         def run(fsm, now):
52             return None
53
54     class Listening(object):
55         name = "LISTENING"
56         is_connected = False
57
58         @staticmethod
59         def deadline(fsm):
60             return None
61
62         @staticmethod
63         def run(fsm, now):
64             return None
65
66     class Backoff(object):
67         name = "BACKOFF"
68         is_connected = False
69
70         @staticmethod
71         def deadline(fsm):
72             return fsm.state_entered + fsm.backoff
73
74         @staticmethod
75         def run(fsm, now):
76             return CONNECT
77
78     class ConnectInProgress(object):
79         name = "CONNECTING"
80         is_connected = False
81
82         @staticmethod
83         def deadline(fsm):
84             return fsm.state_entered + max(1000, fsm.backoff)
85
86         @staticmethod
87         def run(fsm, now):
88             return DISCONNECT
89
90     class Active(object):
91         name = "ACTIVE"
92         is_connected = True
93
94         @staticmethod
95         def deadline(fsm):
96             if fsm.probe_interval:
97                 base = max(fsm.last_received, fsm.state_entered)
98                 return base + fsm.probe_interval
99             return None
100
101         @staticmethod
102         def run(fsm, now):
103             vlog.dbg("%s: idle %d ms, sending inactivity probe"
104                      % (fsm.name,
105                         now - max(fsm.last_received, fsm.state_entered)))
106             fsm._transition(now, Reconnect.Idle)
107             return PROBE
108
109     class Idle(object):
110         name = "IDLE"
111         is_connected = True
112
113         @staticmethod
114         def deadline(fsm):
115             if fsm.probe_interval:
116                 return fsm.state_entered + fsm.probe_interval
117             return None
118
119         @staticmethod
120         def run(fsm, now):
121             vlog.err("%s: no response to inactivity probe after %.3g "
122                      "seconds, disconnecting"
123                       % (fsm.name, (now - fsm.state_entered) / 1000.0))
124             return DISCONNECT
125
126     class Reconnect(object):
127         name = "RECONNECT"
128         is_connected = False
129
130         @staticmethod
131         def deadline(fsm):
132             return fsm.state_entered
133
134         @staticmethod
135         def run(fsm, now):
136             return DISCONNECT
137
138     def __init__(self, now):
139         """Creates and returns a new reconnect FSM with default settings.  The
140         FSM is initially disabled.  The caller will likely want to call
141         self.enable() and self.set_name() on the returned object."""
142
143         self.name = "void"
144         self.min_backoff = 1000
145         self.max_backoff = 8000
146         self.probe_interval = 5000
147         self.passive = False
148         self.info_level = vlog.info
149
150         self.state = Reconnect.Void
151         self.state_entered = now
152         self.backoff = 0
153         self.last_received = now
154         self.last_connected = None
155         self.last_disconnected = None
156         self.max_tries = None
157
158         self.creation_time = now
159         self.n_attempted_connections = 0
160         self.n_successful_connections = 0
161         self.total_connected_duration = 0
162         self.seqno = 0
163
164     def set_quiet(self, quiet):
165         """If 'quiet' is true, this object will log informational messages at
166         debug level, by default keeping them out of log files.  This is
167         appropriate if the connection is one that is expected to be
168         short-lived, so that the log messages are merely distracting.
169
170         If 'quiet' is false, this object logs informational messages at info
171         level.  This is the default.
172
173         This setting has no effect on the log level of debugging, warning, or
174         error messages."""
175         if quiet:
176             self.info_level = vlog.dbg
177         else:
178             self.info_level = vlog.info
179
180     def get_name(self):
181         return self.name
182
183     def set_name(self, name):
184         """Sets this object's name to 'name'.  If 'name' is None, then "void"
185         is used instead.
186
187         The name is used in log messages."""
188         if name is None:
189             self.name = "void"
190         else:
191             self.name = name
192
193     def get_min_backoff(self):
194         """Return the minimum number of milliseconds to back off between
195         consecutive connection attempts.  The default is 1000 ms."""
196         return self.min_backoff
197
198     def get_max_backoff(self):
199         """Return the maximum number of milliseconds to back off between
200         consecutive connection attempts.  The default is 8000 ms."""
201         return self.max_backoff
202
203     def get_probe_interval(self):
204         """Returns the "probe interval" in milliseconds.  If this is zero, it
205         disables the connection keepalive feature.  If it is nonzero, then if
206         the interval passes while the FSM is connected and without
207         self.received() being called, self.run() returns ovs.reconnect.PROBE.
208         If the interval passes again without self.received() being called,
209         self.run() returns ovs.reconnect.DISCONNECT."""
210         return self.probe_interval
211
212     def set_max_tries(self, max_tries):
213         """Limits the maximum number of times that this object will ask the
214         client to try to reconnect to 'max_tries'.  None (the default) means an
215         unlimited number of tries.
216
217         After the number of tries has expired, the FSM will disable itself
218         instead of backing off and retrying."""
219         self.max_tries = max_tries
220
221     def get_max_tries(self):
222         """Returns the current remaining number of connection attempts,
223         None if the number is unlimited."""
224         return self.max_tries
225
226     def set_backoff(self, min_backoff, max_backoff):
227         """Configures the backoff parameters for this FSM.  'min_backoff' is
228         the minimum number of milliseconds, and 'max_backoff' is the maximum,
229         between connection attempts.
230
231         'min_backoff' must be at least 1000, and 'max_backoff' must be greater
232         than or equal to 'min_backoff'."""
233         self.min_backoff = max(min_backoff, 1000)
234         if self.max_backoff:
235             self.max_backoff = max(max_backoff, 1000)
236         else:
237             self.max_backoff = 8000
238         if self.min_backoff > self.max_backoff:
239             self.max_backoff = self.min_backoff
240
241         if (self.state == Reconnect.Backoff and
242             self.backoff > self.max_backoff):
243                 self.backoff = self.max_backoff
244
245     def set_probe_interval(self, probe_interval):
246         """Sets the "probe interval" to 'probe_interval', in milliseconds.  If
247         this is zero, it disables the connection keepalive feature.  If it is
248         nonzero, then if the interval passes while this FSM is connected and
249         without self.received() being called, self.run() returns
250         ovs.reconnect.PROBE.  If the interval passes again without
251         self.received() being called, self.run() returns
252         ovs.reconnect.DISCONNECT.
253
254         If 'probe_interval' is nonzero, then it will be forced to a value of at
255         least 1000 ms."""
256         if probe_interval:
257             self.probe_interval = max(1000, probe_interval)
258         else:
259             self.probe_interval = 0
260
261     def is_passive(self):
262         """Returns true if 'fsm' is in passive mode, false if 'fsm' is in
263         active mode (the default)."""
264         return self.passive
265
266     def set_passive(self, passive, now):
267         """Configures this FSM for active or passive mode.  In active mode (the
268         default), the FSM is attempting to connect to a remote host.  In
269         passive mode, the FSM is listening for connections from a remote
270         host."""
271         if self.passive != passive:
272             self.passive = passive
273
274             if ((passive and self.state in (Reconnect.ConnectInProgress,
275                                             Reconnect.Reconnect)) or
276                 (not passive and self.state == Reconnect.Listening
277                  and self.__may_retry())):
278                 self._transition(now, Reconnect.Backoff)
279                 self.backoff = 0
280
281     def is_enabled(self):
282         """Returns true if this FSM has been enabled with self.enable().
283         Calling another function that indicates a change in connection state,
284         such as self.disconnected() or self.force_reconnect(), will also enable
285         a reconnect FSM."""
286         return self.state != Reconnect.Void
287
288     def enable(self, now):
289         """If this FSM is disabled (the default for newly created FSMs),
290         enables it, so that the next call to reconnect_run() for 'fsm' will
291         return ovs.reconnect.CONNECT.
292
293         If this FSM is not disabled, this function has no effect."""
294         if self.state == Reconnect.Void and self.__may_retry():
295             self._transition(now, Reconnect.Backoff)
296             self.backoff = 0
297
298     def disable(self, now):
299         """Disables this FSM.  Until 'fsm' is enabled again, self.run() will
300         always return 0."""
301         if self.state != Reconnect.Void:
302             self._transition(now, Reconnect.Void)
303
304     def force_reconnect(self, now):
305         """If this FSM is enabled and currently connected (or attempting to
306         connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
307         time it is called, which should cause the client to drop the connection
308         (or attempt), back off, and then reconnect."""
309         if self.state in (Reconnect.ConnectInProgress,
310                           Reconnect.Active,
311                           Reconnect.Idle):
312             self._transition(now, Reconnect.Reconnect)
313
314     def disconnected(self, now, error):
315         """Tell this FSM that the connection dropped or that a connection
316         attempt failed.  'error' specifies the reason: a positive value
317         represents an errno value, EOF indicates that the connection was closed
318         by the peer (e.g. read() returned 0), and 0 indicates no specific
319         error.
320
321         The FSM will back off, then reconnect."""
322         if self.state not in (Reconnect.Backoff, Reconnect.Void):
323             # Report what happened
324             if self.state in (Reconnect.Active, Reconnect.Idle):
325                 if error > 0:
326                     vlog.warn("%s: connection dropped (%s)"
327                               % (self.name, os.strerror(error)))
328                 elif error == EOF:
329                     self.info_level("%s: connection closed by peer"
330                                     % self.name)
331                 else:
332                     self.info_level("%s: connection dropped" % self.name)
333             elif self.state == Reconnect.Listening:
334                 if error > 0:
335                     vlog.warn("%s: error listening for connections (%s)"
336                               % (self.name, os.strerror(error)))
337                 else:
338                     self.info_level("%s: error listening for connections"
339                                     % self.name)
340             else:
341                 if self.passive:
342                     type_ = "listen"
343                 else:
344                     type_ = "connection"
345                 if error > 0:
346                     vlog.warn("%s: %s attempt failed (%s)"
347                               % (self.name, type_, os.strerror(error)))
348                 else:
349                     self.info_level("%s: %s attempt timed out"
350                                     % (self.name, type_))
351
352             if (self.state in (Reconnect.Active, Reconnect.Idle)):
353                 self.last_disconnected = now
354
355             # Back off
356             if (self.state in (Reconnect.Active, Reconnect.Idle) and
357                 (self.last_received - self.last_connected >= self.backoff or
358                  self.passive)):
359                 if self.passive:
360                     self.backoff = 0
361                 else:
362                     self.backoff = self.min_backoff
363             else:
364                 if self.backoff < self.min_backoff:
365                     self.backoff = self.min_backoff
366                 elif self.backoff >= self.max_backoff / 2:
367                     self.backoff = self.max_backoff
368                 else:
369                     self.backoff *= 2
370
371                 if self.passive:
372                     self.info_level("%s: waiting %.3g seconds before trying "
373                                     "to listen again"
374                                     % (self.name, self.backoff / 1000.0))
375                 else:
376                     self.info_level("%s: waiting %.3g seconds before reconnect"
377                                     % (self.name, self.backoff / 1000.0))
378
379             if self.__may_retry():
380                 self._transition(now, Reconnect.Backoff)
381             else:
382                 self._transition(now, Reconnect.Void)
383
384     def connecting(self, now):
385         """Tell this FSM that a connection or listening attempt is in progress.
386
387         The FSM will start a timer, after which the connection or listening
388         attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
389         self.run())."""
390         if self.state != Reconnect.ConnectInProgress:
391             if self.passive:
392                 self.info_level("%s: listening..." % self.name)
393             else:
394                 self.info_level("%s: connecting..." % self.name)
395             self._transition(now, Reconnect.ConnectInProgress)
396
397     def listening(self, now):
398         """Tell this FSM that the client is listening for connection attempts.
399         This state last indefinitely until the client reports some change.
400
401         The natural progression from this state is for the client to report
402         that a connection has been accepted or is in progress of being
403         accepted, by calling self.connecting() or self.connected().
404
405         The client may also report that listening failed (e.g. accept()
406         returned an unexpected error such as ENOMEM) by calling
407         self.listen_error(), in which case the FSM will back off and eventually
408         return ovs.reconnect.CONNECT from self.run() to tell the client to try
409         listening again."""
410         if self.state != Reconnect.Listening:
411             self.info_level("%s: listening..." % self.name)
412             self._transition(now, Reconnect.Listening)
413
414     def listen_error(self, now, error):
415         """Tell this FSM that the client's attempt to accept a connection
416         failed (e.g. accept() returned an unexpected error such as ENOMEM).
417
418         If the FSM is currently listening (self.listening() was called), it
419         will back off and eventually return ovs.reconnect.CONNECT from
420         self.run() to tell the client to try listening again.  If there is an
421         active connection, this will be delayed until that connection drops."""
422         if self.state == Reconnect.Listening:
423             self.disconnected(now, error)
424
425     def connected(self, now):
426         """Tell this FSM that the connection was successful.
427
428         The FSM will start the probe interval timer, which is reset by
429         self.received().  If the timer expires, a probe will be sent (by
430         returning ovs.reconnect.PROBE from self.run().  If the timer expires
431         again without being reset, the connection will be aborted (by returning
432         ovs.reconnect.DISCONNECT from self.run()."""
433         if not self.state.is_connected:
434             self.connecting(now)
435
436             self.info_level("%s: connected" % self.name)
437             self._transition(now, Reconnect.Active)
438             self.last_connected = now
439
440     def connect_failed(self, now, error):
441         """Tell this FSM that the connection attempt failed.
442
443         The FSM will back off and attempt to reconnect."""
444         self.connecting(now)
445         self.disconnected(now, error)
446
447     def received(self, now):
448         """Tell this FSM that some data was received.  This resets the probe
449         interval timer, so that the connection is known not to be idle."""
450         if self.state != Reconnect.Active:
451             self._transition(now, Reconnect.Active)
452         self.last_received = now
453
454     def _transition(self, now, state):
455         if self.state == Reconnect.ConnectInProgress:
456             self.n_attempted_connections += 1
457             if state == Reconnect.Active:
458                 self.n_successful_connections += 1
459
460         connected_before = self.state.is_connected
461         connected_now = state.is_connected
462         if connected_before != connected_now:
463             if connected_before:
464                 self.total_connected_duration += now - self.last_connected
465             self.seqno += 1
466
467         vlog.dbg("%s: entering %s" % (self.name, state.name))
468         self.state = state
469         self.state_entered = now
470
471     def run(self, now):
472         """Assesses whether any action should be taken on this FSM.  The return
473         value is one of:
474
475             - None: The client need not take any action.
476
477             - Active client, ovs.reconnect.CONNECT: The client should start a
478               connection attempt and indicate this by calling
479               self.connecting().  If the connection attempt has definitely
480               succeeded, it should call self.connected().  If the connection
481               attempt has definitely failed, it should call
482               self.connect_failed().
483
484               The FSM is smart enough to back off correctly after successful
485               connections that quickly abort, so it is OK to call
486               self.connected() after a low-level successful connection
487               (e.g. connect()) even if the connection might soon abort due to a
488               failure at a high-level (e.g. SSL negotiation failure).
489
490             - Passive client, ovs.reconnect.CONNECT: The client should try to
491               listen for a connection, if it is not already listening.  It
492               should call self.listening() if successful, otherwise
493               self.connecting() or reconnected_connect_failed() if the attempt
494               is in progress or definitely failed, respectively.
495
496               A listening passive client should constantly attempt to accept a
497               new connection and report an accepted connection with
498               self.connected().
499
500             - ovs.reconnect.DISCONNECT: The client should abort the current
501               connection or connection attempt or listen attempt and call
502               self.disconnected() or self.connect_failed() to indicate it.
503
504             - ovs.reconnect.PROBE: The client should send some kind of request
505               to the peer that will elicit a response, to ensure that the
506               connection is indeed in working order.  (This will only be
507               returned if the "probe interval" is nonzero--see
508               self.set_probe_interval())."""
509
510         deadline = self.state.deadline(self)
511         if deadline is not None and now >= deadline:
512             return self.state.run(self, now)
513         else:
514             return None
515
516     def wait(self, poller, now):
517         """Causes the next call to poller.block() to wake up when self.run()
518         should be called."""
519         timeout = self.timeout(now)
520         if timeout >= 0:
521             poller.timer_wait(timeout)
522
523     def timeout(self, now):
524         """Returns the number of milliseconds after which self.run() should be
525         called if nothing else notable happens in the meantime, or None if this
526         is currently unnecessary."""
527         deadline = self.state.deadline(self)
528         if deadline is not None:
529             remaining = deadline - now
530             return max(0, remaining)
531         else:
532             return None
533
534     def is_connected(self):
535         """Returns True if this FSM is currently believed to be connected, that
536         is, if self.connected() was called more recently than any call to
537         self.connect_failed() or self.disconnected() or self.disable(), and
538         False otherwise."""
539         return self.state.is_connected
540
541     def get_last_connect_elapsed(self, now):
542         """Returns the number of milliseconds since 'fsm' was last connected
543         to its peer. Returns None if never connected."""
544         if self.last_connected:
545             return now - self.last_connected
546         else:
547             return None
548
549     def get_last_disconnect_elapsed(self, now):
550         """Returns the number of milliseconds since 'fsm' was last disconnected
551         from its peer. Returns None if never disconnected."""
552         if self.last_disconnected:
553             return now - self.last_disconnected
554         else:
555             return None
556
557     def get_stats(self, now):
558         class Stats(object):
559             pass
560         stats = Stats()
561         stats.creation_time = self.creation_time
562         stats.last_connected = self.last_connected
563         stats.last_disconnected = self.last_disconnected
564         stats.last_received = self.last_received
565         stats.backoff = self.backoff
566         stats.seqno = self.seqno
567         stats.is_connected = self.is_connected()
568         stats.msec_since_connect = self.get_last_connect_elapsed(now)
569         stats.msec_since_disconnect = self.get_last_disconnect_elapsed(now)
570         stats.total_connected_duration = self.total_connected_duration
571         if self.is_connected():
572             stats.total_connected_duration += (
573                     self.get_last_connect_elapsed(now))
574         stats.n_attempted_connections = self.n_attempted_connections
575         stats.n_successful_connections = self.n_successful_connections
576         stats.state = self.state.name
577         stats.state_elapsed = now - self.state_entered
578         return stats
579
580     def __may_retry(self):
581         if self.max_tries is None:
582             return True
583         elif self.max_tries > 0:
584             self.max_tries -= 1
585             return True
586         else:
587             return False