Implement initial Python bindings for Open vSwitch database.
[sliver-openvswitch.git] / python / ovs / reconnect.py
1 # Copyright (c) 2010 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 import os
17
18 # Values returned by Reconnect.run()
19 CONNECT = 'connect'
20 DISCONNECT = 'disconnect'
21 PROBE = 'probe'
22
23 EOF = -1
24
25 class Reconnect(object):
26     """A finite-state machine for connecting and reconnecting to a network
27     resource with exponential backoff.  It also provides optional support for
28     detecting a connection on which the peer is no longer responding.
29
30     The library does not implement anything networking related, only an FSM for
31     networking code to use.
32
33     Many Reconnect methods take a "now" argument.  This makes testing easier
34     since there is no hidden state.  When not testing, just pass the return
35     value of ovs.time.msec().  (Perhaps this design should be revisited
36     later.)"""
37
38     class Void(object):
39         name = "VOID"
40         is_connected = False
41
42         @staticmethod
43         def deadline(fsm):
44             return None
45
46         @staticmethod
47         def run(fsm, now):
48             return None
49
50     class Listening(object):
51         name = "LISTENING"
52         is_connected = False
53
54         @staticmethod
55         def deadline(fsm):
56             return None
57
58         @staticmethod
59         def run(fsm, now):
60             return None
61
62     class Backoff(object):
63         name = "BACKOFF"
64         is_connected = False
65
66         @staticmethod
67         def deadline(fsm):
68             return fsm.state_entered + fsm.backoff
69
70         @staticmethod
71         def run(fsm, now):
72             return CONNECT
73
74     class ConnectInProgress(object):
75         name = "CONNECT_IN_PROGRESS"
76         is_connected = False
77
78         @staticmethod
79         def deadline(fsm):
80             return fsm.state_entered + max(1000, fsm.backoff)
81
82         @staticmethod
83         def run(fsm, now):
84             return DISCONNECT
85
86     class Active(object):
87         name = "ACTIVE"
88         is_connected = True
89
90         @staticmethod
91         def deadline(fsm):
92             if fsm.probe_interval:
93                 base = max(fsm.last_received, fsm.state_entered)
94                 return base + fsm.probe_interval
95             return None
96
97         @staticmethod
98         def run(fsm, now):
99             logging.debug("%s: idle %d ms, sending inactivity probe"
100                           % (fsm.name,
101                              now - max(fsm.last_received, fsm.state_entered)))
102             fsm._transition(now, Reconnect.Idle)
103             return PROBE
104
105     class Idle(object):
106         name = "IDLE"
107         is_connected = True
108
109         @staticmethod
110         def deadline(fsm):
111             return fsm.state_entered + fsm.probe_interval
112
113         @staticmethod
114         def run(fsm, now):
115             logging.error("%s: no response to inactivity probe after %.3g "
116                           "seconds, disconnecting"
117                           % (fsm.name, (now - fsm.state_entered) / 1000.0))
118             return DISCONNECT
119
120     class Reconnect:
121         name = "RECONNECT"
122         is_connected = False
123
124         @staticmethod
125         def deadline(fsm):
126             return fsm.state_entered
127
128         @staticmethod
129         def run(fsm, now):
130             return DISCONNECT
131
132     def __init__(self, now):
133         """Creates and returns a new reconnect FSM with default settings.  The
134         FSM is initially disabled.  The caller will likely want to call
135         self.enable() and self.set_name() on the returned object."""
136
137         self.name = "void"
138         self.min_backoff = 1000
139         self.max_backoff = 8000
140         self.probe_interval = 5000
141         self.passive = False
142         self.info_level = logging.info
143
144         self.state = Reconnect.Void
145         self.state_entered = now
146         self.backoff = 0
147         self.last_received = now
148         self.last_connected = now
149         self.max_tries = None
150
151         self.creation_time = now
152         self.n_attempted_connections = 0
153         self.n_successful_connections = 0
154         self.total_connected_duration = 0
155         self.seqno = 0
156
157     def set_quiet(self, quiet):
158         """If 'quiet' is true, this object will log informational messages at
159         debug level, by default keeping them out of log files.  This is
160         appropriate if the connection is one that is expected to be
161         short-lived, so that the log messages are merely distracting.
162         
163         If 'quiet' is false, this object logs informational messages at info
164         level.  This is the default.
165         
166         This setting has no effect on the log level of debugging, warning, or
167         error messages."""
168         if quiet:
169             self.info_level = logging.debug
170         else:
171             self.info_level = logging.info
172
173     def get_name(self):
174         return self.name
175
176     def set_name(self, name):
177         """Sets this object's name to 'name'.  If 'name' is None, then "void"
178         is used instead.
179         
180         The name is used in log messages."""
181         if name is None:
182             self.name = "void"
183         else:
184             self.name = name
185
186     def get_min_backoff(self):
187         """Return the minimum number of milliseconds to back off between
188         consecutive connection attempts.  The default is 1000 ms."""
189         return self.min_backoff
190
191     def get_max_backoff(self):
192         """Return the maximum number of milliseconds to back off between
193         consecutive connection attempts.  The default is 8000 ms."""
194         return self.max_backoff
195
196     def get_probe_interval(self):
197         """Returns the "probe interval" in milliseconds.  If this is zero, it
198         disables the connection keepalive feature.  If it is nonzero, then if
199         the interval passes while the FSM is connected and without
200         self.received() being called, self.run() returns ovs.reconnect.PROBE.
201         If the interval passes again without self.received() being called,
202         self.run() returns ovs.reconnect.DISCONNECT."""
203         return self.probe_interval
204
205     def set_max_tries(self, max_tries):
206         """Limits the maximum number of times that this object will ask the
207         client to try to reconnect to 'max_tries'.  None (the default) means an
208         unlimited number of tries.
209
210         After the number of tries has expired, the FSM will disable itself
211         instead of backing off and retrying."""
212         self.max_tries = max_tries
213
214     def get_max_tries(self):
215         """Returns the current remaining number of connection attempts,
216         None if the number is unlimited."""
217         return self.max_tries
218
219     def set_backoff(self, min_backoff, max_backoff):
220         """Configures the backoff parameters for this FSM.  'min_backoff' is
221         the minimum number of milliseconds, and 'max_backoff' is the maximum,
222         between connection attempts.
223
224         'min_backoff' must be at least 1000, and 'max_backoff' must be greater
225         than or equal to 'min_backoff'."""
226         self.min_backoff = max(min_backoff, 1000)
227         if self.max_backoff:
228             self.max_backoff = max(max_backoff, 1000)
229         else:
230             self.max_backoff = 8000
231         if self.min_backoff > self.max_backoff:
232             self.max_backoff = self.min_backoff
233
234         if (self.state == Reconnect.Backoff and
235             self.backoff > self.max_backoff):
236                 self.backoff = self.max_backoff
237         
238     def set_probe_interval(self, probe_interval):
239         """Sets the "probe interval" to 'probe_interval', in milliseconds.  If
240         this is zero, it disables the connection keepalive feature.  If it is
241         nonzero, then if the interval passes while this FSM is connected and
242         without self.received() being called, self.run() returns
243         ovs.reconnect.PROBE.  If the interval passes again without
244         self.received() being called, self.run() returns
245         ovs.reconnect.DISCONNECT.
246
247         If 'probe_interval' is nonzero, then it will be forced to a value of at
248         least 1000 ms."""
249         if probe_interval:
250             self.probe_interval = max(1000, probe_interval)
251         else:
252             self.probe_interval = 0
253
254     def is_passive(self):
255         """Returns true if 'fsm' is in passive mode, false if 'fsm' is in
256         active mode (the default)."""
257         return self.passive
258
259     def set_passive(self, passive, now):
260         """Configures this FSM for active or passive mode.  In active mode (the
261         default), the FSM is attempting to connect to a remote host.  In
262         passive mode, the FSM is listening for connections from a remote host."""
263         if self.passive != passive:
264             self.passive = passive
265
266             if ((passive and self.state in (Reconnect.ConnectInProgress,
267                                             Reconnect.Reconnect)) or
268                 (not passive and self.state == Reconnect.Listening
269                  and self.__may_retry())):
270                 self._transition(now, Reconnect.Backoff)
271                 self.backoff = 0
272
273     def is_enabled(self):
274         """Returns true if this FSM has been enabled with self.enable().
275         Calling another function that indicates a change in connection state,
276         such as self.disconnected() or self.force_reconnect(), will also enable
277         a reconnect FSM."""
278         return self.state != Reconnect.Void
279
280     def enable(self, now):
281         """If this FSM is disabled (the default for newly created FSMs),
282         enables it, so that the next call to reconnect_run() for 'fsm' will
283         return ovs.reconnect.CONNECT.
284
285         If this FSM is not disabled, this function has no effect."""
286         if self.state == Reconnect.Void and self.__may_retry():
287             self._transition(now, Reconnect.Backoff)
288             self.backoff = 0
289
290     def disable(self, now):
291         """Disables this FSM.  Until 'fsm' is enabled again, self.run() will
292         always return 0."""
293         if self.state != Reconnect.Void:
294             self._transition(now, Reconnect.Void)
295
296     def force_reconnect(self, now):
297         """If this FSM is enabled and currently connected (or attempting to
298         connect), forces self.run() to return ovs.reconnect.DISCONNECT the next
299         time it is called, which should cause the client to drop the connection
300         (or attempt), back off, and then reconnect."""
301         if self.state in (Reconnect.ConnectInProgress,
302                           Reconnect.Active,
303                           Reconnect.Idle):
304             self._transition(now, Reconnect.Reconnect)
305
306     def disconnected(self, now, error):
307         """Tell this FSM that the connection dropped or that a connection
308         attempt failed.  'error' specifies the reason: a positive value
309         represents an errno value, EOF indicates that the connection was closed
310         by the peer (e.g. read() returned 0), and 0 indicates no specific
311         error.
312
313         The FSM will back off, then reconnect."""
314         if self.state not in (Reconnect.Backoff, Reconnect.Void):
315             # Report what happened
316             if self.state in (Reconnect.Active, Reconnect.Idle):
317                 if error > 0:
318                     logging.warning("%s: connection dropped (%s)"
319                                     % (self.name, os.strerror(error)))
320                 elif error == EOF:
321                     self.info_level("%s: connection closed by peer"
322                                     % self.name)
323                 else:
324                     self.info_level("%s: connection dropped" % self.name)
325             elif self.state == Reconnect.Listening:
326                 if error > 0:
327                     logging.warning("%s: error listening for connections (%s)"
328                                     % (self.name, os.strerror(error)))
329                 else:
330                     self.info_level("%s: error listening for connections"
331                                     % self.name)
332             else:
333                 if self.passive:
334                     type = "listen"
335                 else:
336                     type = "connection"
337                 if error > 0:
338                     logging.warning("%s: %s attempt failed (%s)"
339                                     % (self.name, type, os.strerror(error)))
340                 else:
341                     self.info_level("%s: %s attempt timed out"
342                                     % (self.name, type))
343
344             # Back off
345             if (self.state in (Reconnect.Active, Reconnect.Idle) and
346                 (self.last_received - self.last_connected >= self.backoff or
347                  self.passive)):
348                 if self.passive:
349                     self.backoff = 0
350                 else:
351                     self.backoff = self.min_backoff
352             else:
353                 if self.backoff < self.min_backoff:
354                     self.backoff = self.min_backoff
355                 elif self.backoff >= self.max_backoff / 2:
356                     self.backoff = self.max_backoff
357                 else:
358                     self.backoff *= 2
359
360                 if self.passive:
361                     self.info_level("%s: waiting %.3g seconds before trying "
362                                     "to listen again"
363                                     % (self.name, self.backoff / 1000.0))
364                 else:
365                     self.info_level("%s: waiting %.3g seconds before reconnect"
366                                     % (self.name, self.backoff / 1000.0))
367
368             if self.__may_retry():
369                 self._transition(now, Reconnect.Backoff)
370             else:
371                 self._transition(now, Reconnect.Void)
372
373     def connecting(self, now):
374         """Tell this FSM that a connection or listening attempt is in progress.
375
376         The FSM will start a timer, after which the connection or listening
377         attempt will be aborted (by returning ovs.reconnect.DISCONNECT from
378         self.run())."""
379         if self.state != Reconnect.ConnectInProgress:
380             if self.passive:
381                 self.info_level("%s: listening..." % self.name)
382             else:
383                 self.info_level("%s: connecting..." % self.name)
384             self._transition(now, Reconnect.ConnectInProgress)
385             
386     def listening(self, now):
387         """Tell this FSM that the client is listening for connection attempts.
388         This state last indefinitely until the client reports some change.
389         
390         The natural progression from this state is for the client to report
391         that a connection has been accepted or is in progress of being
392         accepted, by calling self.connecting() or self.connected().
393         
394         The client may also report that listening failed (e.g. accept()
395         returned an unexpected error such as ENOMEM) by calling
396         self.listen_error(), in which case the FSM will back off and eventually
397         return ovs.reconnect.CONNECT from self.run() to tell the client to try
398         listening again."""
399         if self.state != Reconnect.Listening:
400             self.info_level("%s: listening..." % self.name)
401             self._transition(now, Reconnect.Listening)
402
403     def listen_error(self, now, error):
404         """Tell this FSM that the client's attempt to accept a connection
405         failed (e.g. accept() returned an unexpected error such as ENOMEM).
406         
407         If the FSM is currently listening (self.listening() was called), it
408         will back off and eventually return ovs.reconnect.CONNECT from
409         self.run() to tell the client to try listening again.  If there is an
410         active connection, this will be delayed until that connection drops."""
411         if self.state == Reconnect.Listening:
412             self.disconnected(now, error)
413
414     def connected(self, now):
415         """Tell this FSM that the connection was successful.
416
417         The FSM will start the probe interval timer, which is reset by
418         self.received().  If the timer expires, a probe will be sent (by
419         returning ovs.reconnect.PROBE from self.run().  If the timer expires
420         again without being reset, the connection will be aborted (by returning
421         ovs.reconnect.DISCONNECT from self.run()."""
422         if not self.state.is_connected:
423             self.connecting(now)
424
425             self.info_level("%s: connected" % self.name)
426             self._transition(now, Reconnect.Active)
427             self.last_connected = now
428
429     def connect_failed(self, now, error):
430         """Tell this FSM that the connection attempt failed.
431
432         The FSM will back off and attempt to reconnect."""
433         self.connecting(now)
434         self.disconnected(now, error)
435
436     def received(self, now):
437         """Tell this FSM that some data was received.  This resets the probe
438         interval timer, so that the connection is known not to be idle."""
439         if self.state != Reconnect.Active:
440             self._transition(now, Reconnect.Active)
441         self.last_received = now
442
443     def _transition(self, now, state):
444         if self.state == Reconnect.ConnectInProgress:
445             self.n_attempted_connections += 1
446             if state == Reconnect.Active:
447                 self.n_successful_connections += 1
448
449         connected_before = self.state.is_connected
450         connected_now = state.is_connected
451         if connected_before != connected_now:
452             if connected_before:
453                 self.total_connected_duration += now - self.last_connected
454             self.seqno += 1
455             
456         logging.debug("%s: entering %s" % (self.name, state.name))
457         self.state = state
458         self.state_entered = now
459
460     def run(self, now):
461         """Assesses whether any action should be taken on this FSM.  The return
462         value is one of:
463         
464             - None: The client need not take any action.
465         
466             - Active client, ovs.reconnect.CONNECT: The client should start a
467               connection attempt and indicate this by calling
468               self.connecting().  If the connection attempt has definitely
469               succeeded, it should call self.connected().  If the connection
470               attempt has definitely failed, it should call
471               self.connect_failed().
472         
473               The FSM is smart enough to back off correctly after successful
474               connections that quickly abort, so it is OK to call
475               self.connected() after a low-level successful connection
476               (e.g. connect()) even if the connection might soon abort due to a
477               failure at a high-level (e.g. SSL negotiation failure).
478         
479             - Passive client, ovs.reconnect.CONNECT: The client should try to
480               listen for a connection, if it is not already listening.  It
481               should call self.listening() if successful, otherwise
482               self.connecting() or reconnected_connect_failed() if the attempt
483               is in progress or definitely failed, respectively.
484         
485               A listening passive client should constantly attempt to accept a
486               new connection and report an accepted connection with
487               self.connected().
488         
489             - ovs.reconnect.DISCONNECT: The client should abort the current
490               connection or connection attempt or listen attempt and call
491               self.disconnected() or self.connect_failed() to indicate it.
492         
493             - ovs.reconnect.PROBE: The client should send some kind of request
494               to the peer that will elicit a response, to ensure that the
495               connection is indeed in working order.  (This will only be
496               returned if the "probe interval" is nonzero--see
497               self.set_probe_interval())."""
498         if now >= self.state.deadline(self):
499             return self.state.run(self, now)
500         else:
501             return None
502         
503     def wait(self, poller, now):
504         """Causes the next call to poller.block() to wake up when self.run()
505         should be called."""
506         timeout = self.timeout(now)
507         if timeout >= 0:
508             poller.timer_wait(timeout)
509
510     def timeout(self, now):
511         """Returns the number of milliseconds after which self.run() should be
512         called if nothing else notable happens in the meantime, or a negative
513         number if this is currently unnecessary."""
514         deadline = self.state.deadline(self)
515         if deadline is not None:
516             remaining = deadline - now
517             return max(0, remaining)
518         else:
519             return None
520
521     def is_connected(self):
522         """Returns True if this FSM is currently believed to be connected, that
523         is, if self.connected() was called more recently than any call to
524         self.connect_failed() or self.disconnected() or self.disable(), and
525         False otherwise."""
526         return self.state.is_connected
527
528     def get_connection_duration(self, now):
529         """Returns the number of milliseconds for which this FSM has been
530         continuously connected to its peer.  (If this FSM is not currently
531         connected, this is 0.)"""
532         if self.is_connected():
533             return now - self.last_connected
534         else:
535             return 0
536
537     def get_stats(self, now):
538         class Stats(object):
539             pass
540         stats = Stats()
541         stats.creation_time = self.creation_time
542         stats.last_connected = self.last_connected
543         stats.last_received = self.last_received
544         stats.backoff = self.backoff
545         stats.seqno = self.seqno
546         stats.is_connected = self.is_connected()
547         stats.current_connection_duration = self.get_connection_duration(now)
548         stats.total_connected_duration = (stats.current_connection_duration +
549                                           self.total_connected_duration)
550         stats.n_attempted_connections = self.n_attempted_connections
551         stats.n_successful_connections = self.n_successful_connections
552         stats.state = self.state.name
553         stats.state_elapsed = now - self.state_entered
554         return stats
555
556     def __may_retry(self):
557         if self.max_tries is None:
558             return True
559         elif self.max_tries > 0:
560             self.max_tries -= 1
561             return True
562         else:
563             return False