1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 import ovs.socket_util
# Module-wide logging handle, created under the name "stream".  The code
# below reports through its err() and dbg() methods.
vlog = ovs.vlog.Vlog("stream")
def stream_or_pstream_needs_probes(name):
    """Reports whether the stream or pstream specified by 'name' needs
    periodic probes to verify connectivity.

    For [p]streams which need probes, it can take a long time to notice that
    the connection was dropped.

    Returns 1 if probes are needed, 0 if probes aren't needed, and -1 if
    'name' is invalid, that is, if neither PassiveStream.is_valid_name() nor
    Stream.is_valid_name() accepts it.  No type handled here needs probes, so
    1 is never actually returned."""
    if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
        # Only unix and punix are supported currently.
        # NOTE(review): "tcp:" names are also accepted by
        # Stream.is_valid_name() and therefore report 0 as well -- confirm
        # that TCP connections really should go unprobed.
        return 0
    return -1
"""Bidirectional byte stream.  Currently only Unix domain sockets and TCP
sockets are implemented."""

# Kinds of events that one might wait for.  These are the only values that
# wait() accepts for its 'wait' argument.
W_CONNECT = 0               # Connect complete (success or failure).
W_RECV = 1                  # Data received.
W_SEND = 2                  # Send buffer room available.
@staticmethod
def register_method(method, cls):
    """Registers 'cls' as the implementation of stream type 'method'.

    'method' is the bare type name without the trailing colon (for example
    "unix").  It is stored with ":" appended so that _find_method() can match
    it as a prefix of a "TYPE:ARGS" connection name.  Registering the same
    'method' again replaces the class stored earlier."""
    # The function takes no 'self' and is invoked on the class itself, hence
    # the @staticmethod.
    Stream._SOCKET_METHODS[method + ":"] = cls
@staticmethod
def _find_method(name):
    """Returns the class registered for the stream type that 'name' starts
    with, or None if 'name' begins with no registered "TYPE:" prefix."""
    for method, cls in Stream._SOCKET_METHODS.items():
        # Registered keys already end in ":", so a prefix match on the key
        # matches the whole "TYPE:" part of the name.
        if name.startswith(method):
            return cls
    return None
@staticmethod
def is_valid_name(name):
    """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
    TYPE is a supported stream type (currently only "unix:" and "tcp:"),
    otherwise False."""
    return bool(Stream._find_method(name))
def __init__(self, socket, name, status):
    """Wraps 'socket' in a Stream named 'name'.

    'status' describes the connection attempt made on 'socket':
    errno.EAGAIN means that it is still in progress, 0 means that it
    completed successfully, and any other value means that it failed."""
    self.socket = socket
    self.name = name
    if status == errno.EAGAIN:
        self.state = Stream.__S_CONNECTING
    elif status == 0:
        self.state = Stream.__S_CONNECTED
    else:
        self.state = Stream.__S_DISCONNECTED

    # Error code that connect() reports for a disconnected stream.
    # NOTE(review): a stream created with a failing 'status' starts out
    # disconnected with error 0 -- confirm whether 'status' should be
    # recorded here instead.
    self.error = 0
# Default value of the DSCP bits for connections between controller and
# manager.  It is derived from IPTOS_PREC_INTERNETCONTROL (0xc0, as defined
# in <netinet/ip.h>) by shifting right by two bits, which yields 0x30 (48).
IPTOS_PREC_INTERNETCONTROL = 0xc0
DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
@staticmethod
def open(name, dscp=DSCP_DEFAULT):
    """Attempts to connect a stream to a remote peer.  'name' is a
    connection name in the form "TYPE:ARGS", where TYPE is an active stream
    class's name and ARGS are stream class-specific.  Currently the only
    supported TYPEs are "unix" and "tcp".  'dscp' is handed unchanged to the
    stream class's _open() method.

    Returns (error, stream): on success 'error' is 0 and 'stream' is the
    new Stream, on failure 'error' is a positive errno value and 'stream'
    is None.  A 'name' whose TYPE is not registered fails with
    errno.EAFNOSUPPORT.

    Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
    and a new Stream.  The connect() method can be used to check for
    successful connection completion."""
    cls = Stream._find_method(name)
    if not cls:
        return errno.EAFNOSUPPORT, None

    # Everything after the first ":" is the class-specific ARGS part.
    suffix = name.split(":", 1)[1]
    error, sock = cls._open(suffix, dscp)
    if error:
        return error, None

    # The connection may still be in progress; the Stream records whatever
    # state the completion check reports.
    status = ovs.socket_util.check_connection_completion(sock)
    return 0, Stream(sock, name, status)
@staticmethod
def _open(suffix, dscp):
    """Opens the socket for one stream type; subclasses must override this.

    'suffix' is the ARGS part of a "TYPE:ARGS" connection name and 'dscp' is
    the value given to Stream.open().  Stream.open() unpacks the result as
    an (error, socket) pair.

    This base implementation always raises NotImplementedError."""
    # Called as cls._open(suffix, dscp) on the class itself, hence the
    # @staticmethod.
    raise NotImplementedError("This method must be overrided by subclass")
@staticmethod
def open_block(error_stream):
    """Blocks until a Stream completes its connection attempt, either
    succeeding or failing.  'error_stream' should be the (error, stream)
    tuple returned by Stream.open().  Returns a tuple of the same form: on
    failure the stream, if any, is closed and None is returned in its place.

    Typical usage:
    error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
    # Tuple parameters in the signature are not valid in Python 3
    # (PEP 3113), so the pair is unpacked here instead.
    error, stream = error_stream

    if not error:
        while True:
            error = stream.connect()
            if error != errno.EAGAIN:
                break
            # Still connecting: sleep until the socket reports progress.
            poller = ovs.poller.Poller()
            stream.run_wait(poller)
            stream.connect_wait(poller)
            poller.block()
        assert error != errno.EINPROGRESS

    if error and stream:
        stream.close()
        stream = None
    return error, stream
def __scs_connecting(self):
    """Advances a connecting stream according to the result of
    ovs.socket_util.check_connection_completion() on its socket: 0 moves the
    stream to the connected state, errno.EAGAIN leaves it connecting, and
    any other value moves it to the disconnected state."""
    retval = ovs.socket_util.check_connection_completion(self.socket)
    assert retval != errno.EINPROGRESS
    if retval == 0:
        self.state = Stream.__S_CONNECTED
    elif retval != errno.EAGAIN:
        self.state = Stream.__S_DISCONNECTED
        # Remember why the attempt failed so that connect() can report it.
        self.error = retval
def connect(self):
    """Tries to complete the connection on this stream.  If the connection
    is complete, returns 0 if the connection was successful or a positive
    errno value if it failed.  If the connection is still in progress,
    returns errno.EAGAIN."""
    # Give a pending connection attempt one chance to make progress first.
    if self.state == Stream.__S_CONNECTING:
        self.__scs_connecting()

    if self.state == Stream.__S_CONNECTING:
        return errno.EAGAIN
    elif self.state == Stream.__S_CONNECTED:
        return 0
    else:
        assert self.state == Stream.__S_DISCONNECTED
        return self.error
def recv(self, n):
    """Tries to receive up to 'n' bytes from this stream.  Returns a
    (error, string) tuple:

        - If successful, 'error' is zero and 'string' contains between 1
          and 'n' bytes of data.

        - On error, 'error' is a positive errno value.

        - If the connection has been closed in the normal fashion or if 'n'
          is 0, the tuple is (0, "").

    The recv function will not block waiting for data to arrive.  If no
    data have been received, it returns (errno.EAGAIN, "") immediately."""
    # Nothing can be read until the connection attempt has succeeded.
    retval = self.connect()
    if retval != 0:
        return (retval, "")
    elif n == 0:
        return (0, "")

    try:
        return (0, self.socket.recv(n))
    except socket.error as e:
        return (ovs.socket_util.get_exception_errno(e), "")
def send(self, buf):
    """Tries to send 'buf' on this stream.

    If successful, returns the number of bytes sent, between 1 and
    len(buf).  0 is only a valid return value if len(buf) is 0.

    On error, returns a negative errno value.

    Will not block.  If no bytes can be immediately accepted for
    transmission, returns -errno.EAGAIN immediately."""
    # Nothing can be written until the connection attempt has succeeded.
    retval = self.connect()
    if retval != 0:
        return -retval
    elif len(buf) == 0:
        return 0

    try:
        return self.socket.send(buf)
    except socket.error as e:
        return -ovs.socket_util.get_exception_errno(e)
def run_wait(self, poller):
    """Called by open_block() with a fresh 'poller' before it blocks.

    NOTE(review): the original body is not visible here; it is assumed to be
    a no-op because a plain socket stream has nothing to register beyond
    what connect_wait(), recv_wait() and send_wait() add -- confirm."""
    pass
def wait(self, poller, wait):
    """Arranges for 'poller' to wake up when the event 'wait' can happen on
    this stream.  'wait' must be one of Stream.W_CONNECT, Stream.W_RECV or
    Stream.W_SEND.

    A disconnected stream wakes 'poller' immediately.  While the stream is
    still connecting, any 'wait' is treated as Stream.W_CONNECT."""
    assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)

    if self.state == Stream.__S_DISCONNECTED:
        poller.immediate_wake()
        return

    if self.state == Stream.__S_CONNECTING:
        wait = Stream.W_CONNECT
    if wait == Stream.W_RECV:
        poller.fd_wait(self.socket, ovs.poller.POLLIN)
    else:
        # Both connection completion and send-buffer room are waited for as
        # writability of the socket.
        poller.fd_wait(self.socket, ovs.poller.POLLOUT)
def connect_wait(self, poller):
    """Arranges for 'poller' to wake up on the Stream.W_CONNECT event of
    this stream; shorthand for wait() with that event."""
    event = Stream.W_CONNECT
    self.wait(poller, event)
def recv_wait(self, poller):
    """Arranges for 'poller' to wake up on the Stream.W_RECV event of this
    stream; shorthand for wait() with that event."""
    event = Stream.W_RECV
    self.wait(poller, event)
def send_wait(self, poller):
    """Arranges for 'poller' to wake up on the Stream.W_SEND event of this
    stream; shorthand for wait() with that event."""
    event = Stream.W_SEND
    self.wait(poller, event)
254 # Don't delete the file: we might have forked.
class PassiveStream(object):
    """Listening endpoint that accepts incoming connections and hands each
    one back as a Stream.  Only Unix domain sockets ("punix:") are
    supported."""

    @staticmethod
    def is_valid_name(name):
        """Returns True if 'name' is a passive stream name in the form
        "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
        "punix:"), otherwise False."""
        return name.startswith("punix:")

    def __init__(self, sock, name, bind_path):
        """Wraps the listening socket 'sock' as a PassiveStream named 'name'.
        'bind_path' is the file system path that close() unlinks, or None if
        there is nothing to unlink."""
        self.name = name
        self.socket = sock
        self.bind_path = bind_path

    @staticmethod
    def open(name):
        """Attempts to start listening for remote stream connections.  'name'
        is a connection name in the form "TYPE:ARGS", where TYPE is an passive
        stream class's name and ARGS are stream class-specific.  Currently the
        only supported TYPE is "punix".

        Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
        new PassiveStream, on failure 'error' is a positive errno value and
        'pstream' is None."""
        if not PassiveStream.is_valid_name(name):
            return errno.EAFNOSUPPORT, None

        # ARGS is the path of the Unix domain socket to bind.
        bind_path = name[len("punix:"):]
        error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
                                                       True, bind_path, None)
        if error:
            return error, None

        try:
            # NOTE(review): the backlog value is not visible in the damaged
            # original; 10 is assumed -- confirm.
            sock.listen(10)
        except socket.error as e:
            # socket.error has no 'error' attribute; extract the errno the
            # same way the rest of this file does.
            error = ovs.socket_util.get_exception_errno(e)
            vlog.err("%s: listen: %s" % (name, os.strerror(error)))
            sock.close()
            return error, None

        return 0, PassiveStream(sock, name, bind_path)

    def close(self):
        """Closes this PassiveStream."""
        self.socket.close()
        if self.bind_path is not None:
            ovs.fatal_signal.unlink_file_now(self.bind_path)
            # Cleared so that a second close() does not unlink again.
            self.bind_path = None

    def accept(self):
        """Tries to accept a new connection on this passive stream.  Returns
        (error, stream): if successful, 'error' is 0 and 'stream' is the new
        Stream object, and on failure 'error' is a positive errno value and
        'stream' is None.

        Will not block waiting for a connection.  If no connection is ready to
        be accepted, returns (errno.EAGAIN, None) immediately."""
        try:
            sock, addr = self.socket.accept()
            ovs.socket_util.set_nonblocking(sock)
            return 0, Stream(sock, "unix:%s" % addr, 0)
        except socket.error as e:
            error = ovs.socket_util.get_exception_errno(e)
            # EAGAIN only means that nothing is pending, so it is not logged.
            if error != errno.EAGAIN:
                vlog.dbg("accept: %s" % os.strerror(error))
            return error, None

    def wait(self, poller):
        """Arranges for 'poller' to wake up when the listening socket becomes
        readable."""
        poller.fd_wait(self.socket, ovs.poller.POLLIN)
331 # Don't delete the file: we might have forked.
337 Active %s connection methods:
338 unix:FILE Unix domain socket named FILE
339 tcp:IP:PORT TCP socket to IP with port no of PORT
341 Passive %s connection methods:
342 punix:FILE Listen on Unix domain socket FILE""" % (name, name)
class UnixStream(Stream):
    """Stream type "unix": connects to a Unix domain socket."""

    @staticmethod
    def _open(suffix, dscp):
        """Opens a stream socket connected to the Unix domain socket whose
        path is 'suffix'.  'dscp' is accepted for interface compatibility and
        not used.  Returns whatever ovs.socket_util.make_unix_socket()
        returns, which Stream.open() unpacks as (error, socket)."""
        connect_path = suffix
        return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
                                                True, None, connect_path)


# Makes "unix:FILE" names resolve to UnixStream.
Stream.register_method("unix", UnixStream)
class TCPStream(Stream):
    """Stream type "tcp": connects to a TCP socket."""

    @staticmethod
    def _open(suffix, dscp):
        """Opens a TCP connection to the target named by 'suffix', passing
        'dscp' on to ovs.socket_util.inet_open_active().  Returns
        (error, socket) as produced by that function; TCP_NODELAY is set on
        the socket when no error is reported."""
        # NOTE(review): the arguments after socket.SOCK_STREAM are not visible
        # in the damaged original; (target, default_port=0, dscp) is assumed
        # -- confirm against ovs.socket_util.inet_open_active().
        error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
                                                       suffix, 0, dscp)
        if not error:
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        return error, sock


# Makes "tcp:IP:PORT" names resolve to TCPStream.
Stream.register_method("tcp", TCPStream)