1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 import ovs.socket_util
24 vlog = ovs.vlog.Vlog("stream")
27 def stream_or_pstream_needs_probes(name):
28 """ 1 if the stream or pstream specified by 'name' needs periodic probes to
29 verify connectivity. For [p]streams which need probes, it can take a long
30 time to notice the connection was dropped. Returns 0 if probes aren't
31 needed, and -1 if 'name' is invalid"""
33 if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
34 # Only unix and punix are supported currently.
41 """Bidirectional byte stream. Currently only Unix domain sockets
49 # Kinds of events that one might wait for.
50 W_CONNECT = 0 # Connect complete (success or failure).
51 W_RECV = 1 # Data received.
52 W_SEND = 2 # Send buffer room available.
57 def register_method(method):
58 def _register_method(cls):
59 Stream._SOCKET_METHODS[method + ":"] = cls
61 return _register_method
64 def _find_method(name):
65 for method, cls in Stream._SOCKET_METHODS.items():
66 if name.startswith(method):
71 def is_valid_name(name):
72 """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
73 TYPE is a supported stream type (currently only "unix:" and "tcp:"),
75 return bool(Stream._find_method(name))
77 def __init__(self, socket, name, status):
80 if status == errno.EAGAIN:
81 self.state = Stream.__S_CONNECTING
83 self.state = Stream.__S_CONNECTED
85 self.state = Stream.__S_DISCONNECTED
89 # Default value of dscp bits for connection between controller and manager.
90 # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
91 # in <netinet/ip.h> is used.
92 IPTOS_PREC_INTERNETCONTROL = 0xc0
93 DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
96 def open(name, dscp=DSCP_DEFAULT):
97 """Attempts to connect a stream to a remote peer. 'name' is a
98 connection name in the form "TYPE:ARGS", where TYPE is an active stream
99 class's name and ARGS are stream class-specific. Currently the only
100 supported TYPEs are "unix" and "tcp".
102 Returns (error, stream): on success 'error' is 0 and 'stream' is the
103 new Stream, on failure 'error' is a positive errno value and 'stream'
106 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
107 and a new Stream. The connect() method can be used to check for
108 successful connection completion."""
109 cls = Stream._find_method(name)
111 return errno.EAFNOSUPPORT, None
113 suffix = name.split(":", 1)[1]
114 error, sock = cls._open(suffix, dscp)
118 status = ovs.socket_util.check_connection_completion(sock)
119 return 0, Stream(sock, name, status)
122 def _open(suffix, dscp):
123 raise NotImplementedError("This method must be overrided by subclass")
126 def open_block((error, stream)):
127 """Blocks until a Stream completes its connection attempt, either
128 succeeding or failing. (error, stream) should be the tuple returned by
129 Stream.open(). Returns a tuple of the same form.
132 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
136 error = stream.connect()
137 if error != errno.EAGAIN:
140 poller = ovs.poller.Poller()
141 stream.run_wait(poller)
142 stream.connect_wait(poller)
144 assert error != errno.EINPROGRESS
154 def __scs_connecting(self):
155 retval = ovs.socket_util.check_connection_completion(self.socket)
156 assert retval != errno.EINPROGRESS
158 self.state = Stream.__S_CONNECTED
159 elif retval != errno.EAGAIN:
160 self.state = Stream.__S_DISCONNECTED
164 """Tries to complete the connection on this stream. If the connection
165 is complete, returns 0 if the connection was successful or a positive
166 errno value if it failed. If the connection is still in progress,
167 returns errno.EAGAIN."""
168 last_state = -1 # Always differs from initial self.state
169 while self.state != last_state:
170 last_state = self.state
171 if self.state == Stream.__S_CONNECTING:
172 self.__scs_connecting()
173 elif self.state == Stream.__S_CONNECTED:
175 elif self.state == Stream.__S_DISCONNECTED:
179 """Tries to receive up to 'n' bytes from this stream. Returns a
180 (error, string) tuple:
182 - If successful, 'error' is zero and 'string' contains between 1
183 and 'n' bytes of data.
185 - On error, 'error' is a positive errno value.
187 - If the connection has been closed in the normal fashion or if 'n'
188 is 0, the tuple is (0, "").
190 The recv function will not block waiting for data to arrive. If no
191 data have been received, it returns (errno.EAGAIN, "") immediately."""
193 retval = self.connect()
200 return (0, self.socket.recv(n))
201 except socket.error, e:
202 return (ovs.socket_util.get_exception_errno(e), "")
205 """Tries to send 'buf' on this stream.
207 If successful, returns the number of bytes sent, between 1 and
208 len(buf). 0 is only a valid return value if len(buf) is 0.
210 On error, returns a negative errno value.
212 Will not block. If no bytes can be immediately accepted for
213 transmission, returns -errno.EAGAIN immediately."""
215 retval = self.connect()
222 return self.socket.send(buf)
223 except socket.error, e:
224 return -ovs.socket_util.get_exception_errno(e)
229 def run_wait(self, poller):
232 def wait(self, poller, wait):
233 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
235 if self.state == Stream.__S_DISCONNECTED:
236 poller.immediate_wake()
239 if self.state == Stream.__S_CONNECTING:
240 wait = Stream.W_CONNECT
241 if wait == Stream.W_RECV:
242 poller.fd_wait(self.socket, select.POLLIN)
244 poller.fd_wait(self.socket, select.POLLOUT)
246 def connect_wait(self, poller):
247 self.wait(poller, Stream.W_CONNECT)
249 def recv_wait(self, poller):
250 self.wait(poller, Stream.W_RECV)
252 def send_wait(self, poller):
253 self.wait(poller, Stream.W_SEND)
256 # Don't delete the file: we might have forked.
260 class PassiveStream(object):
262 def is_valid_name(name):
263 """Returns True if 'name' is a passive stream name in the form
264 "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
265 "punix:"), otherwise False."""
266 return name.startswith("punix:")
268 def __init__(self, sock, name, bind_path):
271 self.bind_path = bind_path
275 """Attempts to start listening for remote stream connections. 'name'
276 is a connection name in the form "TYPE:ARGS", where TYPE is an passive
277 stream class's name and ARGS are stream class-specific. Currently the
278 only supported TYPE is "punix".
280 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
281 new PassiveStream, on failure 'error' is a positive errno value and
282 'pstream' is None."""
283 if not PassiveStream.is_valid_name(name):
284 return errno.EAFNOSUPPORT, None
287 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
288 True, bind_path, None)
294 except socket.error, e:
295 vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
299 return 0, PassiveStream(sock, name, bind_path)
302 """Closes this PassiveStream."""
304 if self.bind_path is not None:
305 ovs.fatal_signal.unlink_file_now(self.bind_path)
306 self.bind_path = None
309 """Tries to accept a new connection on this passive stream. Returns
310 (error, stream): if successful, 'error' is 0 and 'stream' is the new
311 Stream object, and on failure 'error' is a positive errno value and
314 Will not block waiting for a connection. If no connection is ready to
315 be accepted, returns (errno.EAGAIN, None) immediately."""
319 sock, addr = self.socket.accept()
320 ovs.socket_util.set_nonblocking(sock)
321 return 0, Stream(sock, "unix:%s" % addr, 0)
322 except socket.error, e:
323 error = ovs.socket_util.get_exception_errno(e)
324 if error != errno.EAGAIN:
326 vlog.dbg("accept: %s" % os.strerror(error))
329 def wait(self, poller):
330 poller.fd_wait(self.socket, select.POLLIN)
333 # Don't delete the file: we might have forked.
339 Active %s connection methods:
340 unix:FILE Unix domain socket named FILE
341 tcp:IP:PORT TCP socket to IP with port no of PORT
343 Passive %s connection methods:
344 punix:FILE Listen on Unix domain socket FILE""" % (name, name)
347 class UnixStream(Stream):
349 def _open(suffix, dscp):
350 connect_path = suffix
351 return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
352 True, None, connect_path)
353 UnixStream = Stream.register_method("unix")(UnixStream)
356 class TCPStream(Stream):
358 def _open(suffix, dscp):
359 error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
362 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
364 TCPStream = Stream.register_method("tcp")(TCPStream)