python: Honor zero probe interval in reconnect.py
[sliver-openvswitch.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import select
18 import socket
19
20 import ovs.poller
21 import ovs.socket_util
22 import ovs.vlog
23
24 vlog = ovs.vlog.Vlog("stream")
25
26
27 class Stream(object):
28     """Bidirectional byte stream.  Currently only Unix domain sockets
29     are implemented."""
30
31     # States.
32     __S_CONNECTING = 0
33     __S_CONNECTED = 1
34     __S_DISCONNECTED = 2
35
36     # Kinds of events that one might wait for.
37     W_CONNECT = 0               # Connect complete (success or failure).
38     W_RECV = 1                  # Data received.
39     W_SEND = 2                  # Send buffer room available.
40
41     @staticmethod
42     def is_valid_name(name):
43         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
44         TYPE is a supported stream type (currently only "unix:"), otherwise
45         False."""
46         return name.startswith("unix:")
47
48     def __init__(self, socket, name, status):
49         self.socket = socket
50         self.name = name
51         if status == errno.EAGAIN:
52             self.state = Stream.__S_CONNECTING
53         elif status == 0:
54             self.state = Stream.__S_CONNECTED
55         else:
56             self.state = Stream.__S_DISCONNECTED
57
58         self.error = 0
59
60     @staticmethod
61     def open(name):
62         """Attempts to connect a stream to a remote peer.  'name' is a
63         connection name in the form "TYPE:ARGS", where TYPE is an active stream
64         class's name and ARGS are stream class-specific.  Currently the only
65         supported TYPE is "unix".
66
67         Returns (error, stream): on success 'error' is 0 and 'stream' is the
68         new Stream, on failure 'error' is a positive errno value and 'stream'
69         is None.
70
71         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
72         and a new Stream.  The connect() method can be used to check for
73         successful connection completion."""
74         if not Stream.is_valid_name(name):
75             return errno.EAFNOSUPPORT, None
76
77         connect_path = name[5:]
78         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
79                                                        True, None,
80                                                        connect_path)
81         if error:
82             return error, None
83         else:
84             status = ovs.socket_util.check_connection_completion(sock)
85             return 0, Stream(sock, name, status)
86
87     @staticmethod
88     def open_block((error, stream)):
89         """Blocks until a Stream completes its connection attempt, either
90         succeeding or failing.  (error, stream) should be the tuple returned by
91         Stream.open().  Returns a tuple of the same form.
92
93         Typical usage:
94         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
95
96         if not error:
97             while True:
98                 error = stream.connect()
99                 if error != errno.EAGAIN:
100                     break
101                 stream.run()
102                 poller = ovs.poller.Poller()
103                 stream.run_wait()
104                 stream.connect_wait(poller)
105                 poller.block()
106             assert error != errno.EINPROGRESS
107
108         if error and stream:
109             stream.close()
110             stream = None
111         return error, stream
112
113     def close(self):
114         self.socket.close()
115
116     def __scs_connecting(self):
117         retval = ovs.socket_util.check_connection_completion(self.socket)
118         assert retval != errno.EINPROGRESS
119         if retval == 0:
120             self.state = Stream.__S_CONNECTED
121         elif retval != errno.EAGAIN:
122             self.state = Stream.__S_DISCONNECTED
123             self.error = retval
124
125     def connect(self):
126         """Tries to complete the connection on this stream.  If the connection
127         is complete, returns 0 if the connection was successful or a positive
128         errno value if it failed.  If the connection is still in progress,
129         returns errno.EAGAIN."""
130         last_state = -1         # Always differs from initial self.state
131         while self.state != last_state:
132             last_state = self.state
133             if self.state == Stream.__S_CONNECTING:
134                 self.__scs_connecting()
135             elif self.state == Stream.__S_CONNECTED:
136                 return 0
137             elif self.state == Stream.__S_DISCONNECTED:
138                 return self.error
139
140     def recv(self, n):
141         """Tries to receive up to 'n' bytes from this stream.  Returns a
142         (error, string) tuple:
143
144             - If successful, 'error' is zero and 'string' contains between 1
145               and 'n' bytes of data.
146
147             - On error, 'error' is a positive errno value.
148
149             - If the connection has been closed in the normal fashion or if 'n'
150               is 0, the tuple is (0, "").
151
152         The recv function will not block waiting for data to arrive.  If no
153         data have been received, it returns (errno.EAGAIN, "") immediately."""
154
155         retval = self.connect()
156         if retval != 0:
157             return (retval, "")
158         elif n == 0:
159             return (0, "")
160
161         try:
162             return (0, self.socket.recv(n))
163         except socket.error, e:
164             return (ovs.socket_util.get_exception_errno(e), "")
165
166     def send(self, buf):
167         """Tries to send 'buf' on this stream.
168
169         If successful, returns the number of bytes sent, between 1 and
170         len(buf).  0 is only a valid return value if len(buf) is 0.
171
172         On error, returns a negative errno value.
173
174         Will not block.  If no bytes can be immediately accepted for
175         transmission, returns -errno.EAGAIN immediately."""
176
177         retval = self.connect()
178         if retval != 0:
179             return -retval
180         elif len(buf) == 0:
181             return 0
182
183         try:
184             return self.socket.send(buf)
185         except socket.error, e:
186             return -ovs.socket_util.get_exception_errno(e)
187
188     def run(self):
189         pass
190
191     def run_wait(self, poller):
192         pass
193
194     def wait(self, poller, wait):
195         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
196
197         if self.state == Stream.__S_DISCONNECTED:
198             poller.immediate_wake()
199             return
200
201         if self.state == Stream.__S_CONNECTING:
202             wait = Stream.W_CONNECT
203         if wait == Stream.W_RECV:
204             poller.fd_wait(self.socket, select.POLLIN)
205         else:
206             poller.fd_wait(self.socket, select.POLLOUT)
207
208     def connect_wait(self, poller):
209         self.wait(poller, Stream.W_CONNECT)
210
211     def recv_wait(self, poller):
212         self.wait(poller, Stream.W_RECV)
213
214     def send_wait(self, poller):
215         self.wait(poller, Stream.W_SEND)
216
217     def __del__(self):
218         # Don't delete the file: we might have forked.
219         self.socket.close()
220
221
222 class PassiveStream(object):
223     @staticmethod
224     def is_valid_name(name):
225         """Returns True if 'name' is a passive stream name in the form
226         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
227         "punix:"), otherwise False."""
228         return name.startswith("punix:")
229
230     def __init__(self, sock, name, bind_path):
231         self.name = name
232         self.socket = sock
233         self.bind_path = bind_path
234
235     @staticmethod
236     def open(name):
237         """Attempts to start listening for remote stream connections.  'name'
238         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
239         stream class's name and ARGS are stream class-specific.  Currently the
240         only supported TYPE is "punix".
241
242         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
243         new PassiveStream, on failure 'error' is a positive errno value and
244         'pstream' is None."""
245         if not PassiveStream.is_valid_name(name):
246             return errno.EAFNOSUPPORT, None
247
248         bind_path = name[6:]
249         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
250                                                        True, bind_path, None)
251         if error:
252             return error, None
253
254         try:
255             sock.listen(10)
256         except socket.error, e:
257             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
258             sock.close()
259             return e.error, None
260
261         return 0, PassiveStream(sock, name, bind_path)
262
263     def close(self):
264         """Closes this PassiveStream."""
265         self.socket.close()
266         if self.bind_path is not None:
267             ovs.fatal_signal.unlink_file_now(self.bind_path)
268             self.bind_path = None
269
270     def accept(self):
271         """Tries to accept a new connection on this passive stream.  Returns
272         (error, stream): if successful, 'error' is 0 and 'stream' is the new
273         Stream object, and on failure 'error' is a positive errno value and
274         'stream' is None.
275
276         Will not block waiting for a connection.  If no connection is ready to
277         be accepted, returns (errno.EAGAIN, None) immediately."""
278
279         while True:
280             try:
281                 sock, addr = self.socket.accept()
282                 ovs.socket_util.set_nonblocking(sock)
283                 return 0, Stream(sock, "unix:%s" % addr, 0)
284             except socket.error, e:
285                 error = ovs.socket_util.get_exception_errno(e)
286                 if error != errno.EAGAIN:
287                     # XXX rate-limit
288                     vlog.dbg("accept: %s" % os.strerror(error))
289                 return error, None
290
291     def wait(self, poller):
292         poller.fd_wait(self.socket, select.POLLIN)
293
294     def __del__(self):
295         # Don't delete the file: we might have forked.
296         self.socket.close()
297
298
299 def usage(name):
300     return """
301 Active %s connection methods:
302   unix:FILE               Unix domain socket named FILE
303
304 Passive %s connection methods:
305   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)