1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 import ovs.socket_util
23 vlog = ovs.vlog.Vlog("stream")
26 def stream_or_pstream_needs_probes(name):
27 """Returns 1 if the stream or pstream specified by 'name' needs periodic probes to
28 verify connectivity. For [p]streams which need probes, it can take a long
29 time to notice the connection was dropped. Returns 0 if probes aren't
30 needed, and -1 if 'name' is invalid"""
32 if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
33 # Only unix and punix are supported currently.
40 """Bidirectional byte stream. Currently only Unix domain sockets
48 # Kinds of events that one might wait for.
49 W_CONNECT = 0 # Connect complete (success or failure).
50 W_RECV = 1 # Data received.
51 W_SEND = 2 # Send buffer room available.
def register_method(method, cls):
    """Registers 'cls' as the stream class that handles connection names of
    type 'method'.

    'method' is the bare TYPE from a "TYPE:ARGS" connection name, without
    the colon.  The colon is appended here, so entries in
    Stream._SOCKET_METHODS are keyed by prefixes such as "unix:"."""
    registry = Stream._SOCKET_METHODS
    registry[method + ":"] = cls
60 def _find_method(name):
61 for method, cls in Stream._SOCKET_METHODS.items():
62 if name.startswith(method):
67 def is_valid_name(name):
68 """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
69 TYPE is a supported stream type (currently only "unix:" and "tcp:"),
71 return bool(Stream._find_method(name))
73 def __init__(self, socket, name, status):
76 if status == errno.EAGAIN:
77 self.state = Stream.__S_CONNECTING
79 self.state = Stream.__S_CONNECTED
81 self.state = Stream.__S_DISCONNECTED
85 # Default value of dscp bits for connection between controller and manager.
86 # The value used is IPTOS_PREC_INTERNETCONTROL (0xc0), as defined in
87 # <netinet/ip.h>.
88 IPTOS_PREC_INTERNETCONTROL = 0xc0
89 DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
92 def open(name, dscp=DSCP_DEFAULT):
93 """Attempts to connect a stream to a remote peer. 'name' is a
94 connection name in the form "TYPE:ARGS", where TYPE is an active stream
95 class's name and ARGS are stream class-specific. Currently the only
96 supported TYPEs are "unix" and "tcp".
98 Returns (error, stream): on success 'error' is 0 and 'stream' is the
99 new Stream, on failure 'error' is a positive errno value and 'stream'
102 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
103 and a new Stream. The connect() method can be used to check for
104 successful connection completion."""
105 cls = Stream._find_method(name)
107 return errno.EAFNOSUPPORT, None
109 suffix = name.split(":", 1)[1]
110 if name.startswith("unix:"):
111 suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
112 error, sock = cls._open(suffix, dscp)
116 status = ovs.socket_util.check_connection_completion(sock)
117 return 0, Stream(sock, name, status)
120 def _open(suffix, dscp):
121 raise NotImplementedError("This method must be overrided by subclass")
124 def open_block((error, stream)):
125 """Blocks until a Stream completes its connection attempt, either
126 succeeding or failing. (error, stream) should be the tuple returned by
127 Stream.open(). Returns a tuple of the same form.
130 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
134 error = stream.connect()
135 if error != errno.EAGAIN:
138 poller = ovs.poller.Poller()
139 stream.run_wait(poller)
140 stream.connect_wait(poller)
142 assert error != errno.EINPROGRESS
152 def __scs_connecting(self):
153 retval = ovs.socket_util.check_connection_completion(self.socket)
154 assert retval != errno.EINPROGRESS
156 self.state = Stream.__S_CONNECTED
157 elif retval != errno.EAGAIN:
158 self.state = Stream.__S_DISCONNECTED
162 """Tries to complete the connection on this stream. If the connection
163 is complete, returns 0 if the connection was successful or a positive
164 errno value if it failed. If the connection is still in progress,
165 returns errno.EAGAIN."""
167 if self.state == Stream.__S_CONNECTING:
168 self.__scs_connecting()
170 if self.state == Stream.__S_CONNECTING:
172 elif self.state == Stream.__S_CONNECTED:
175 assert self.state == Stream.__S_DISCONNECTED
179 """Tries to receive up to 'n' bytes from this stream. Returns a
180 (error, string) tuple:
182 - If successful, 'error' is zero and 'string' contains between 1
183 and 'n' bytes of data.
185 - On error, 'error' is a positive errno value.
187 - If the connection has been closed in the normal fashion or if 'n'
188 is 0, the tuple is (0, "").
190 The recv function will not block waiting for data to arrive. If no
191 data have been received, it returns (errno.EAGAIN, "") immediately."""
193 retval = self.connect()
200 return (0, self.socket.recv(n))
201 except socket.error, e:
202 return (ovs.socket_util.get_exception_errno(e), "")
205 """Tries to send 'buf' on this stream.
207 If successful, returns the number of bytes sent, between 1 and
208 len(buf). 0 is only a valid return value if len(buf) is 0.
210 On error, returns a negative errno value.
212 Will not block. If no bytes can be immediately accepted for
213 transmission, returns -errno.EAGAIN immediately."""
215 retval = self.connect()
222 return self.socket.send(buf)
223 except socket.error, e:
224 return -ovs.socket_util.get_exception_errno(e)
229 def run_wait(self, poller):
232 def wait(self, poller, wait):
233 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
235 if self.state == Stream.__S_DISCONNECTED:
236 poller.immediate_wake()
239 if self.state == Stream.__S_CONNECTING:
240 wait = Stream.W_CONNECT
241 if wait == Stream.W_RECV:
242 poller.fd_wait(self.socket, ovs.poller.POLLIN)
244 poller.fd_wait(self.socket, ovs.poller.POLLOUT)
def connect_wait(self, poller):
    """Arranges for 'poller' to wake up when the connection attempt on this
    stream completes (successfully or not).

    Shorthand for wait() with Stream.W_CONNECT."""
    event = Stream.W_CONNECT
    self.wait(poller, event)
def recv_wait(self, poller):
    """Arranges for 'poller' to wake up when data has been received on this
    stream.

    Shorthand for wait() with Stream.W_RECV."""
    event = Stream.W_RECV
    self.wait(poller, event)
def send_wait(self, poller):
    """Arranges for 'poller' to wake up when send buffer room is available
    on this stream.

    Shorthand for wait() with Stream.W_SEND."""
    event = Stream.W_SEND
    self.wait(poller, event)
256 # Don't delete the file: we might have forked.
260 class PassiveStream(object):
def is_valid_name(name):
    """Reports whether 'name' names a supported passive stream.

    A passive stream name has the form "TYPE:ARGS".  The only TYPE accepted
    here is "punix", so this returns True exactly when 'name' begins with
    "punix:" and False otherwise."""
    passive_prefix = "punix:"
    return name.startswith(passive_prefix)
268 def __init__(self, sock, name, bind_path):
271 self.bind_path = bind_path
275 """Attempts to start listening for remote stream connections. 'name'
276 is a connection name in the form "TYPE:ARGS", where TYPE is a passive
277 stream class's name and ARGS are stream class-specific. Currently the
278 only supported TYPE is "punix".
280 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
281 new PassiveStream, on failure 'error' is a positive errno value and
282 'pstream' is None."""
283 if not PassiveStream.is_valid_name(name):
284 return errno.EAFNOSUPPORT, None
287 if name.startswith("punix:"):
288 bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
289 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
290 True, bind_path, None)
296 except socket.error, e:
297 vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
301 return 0, PassiveStream(sock, name, bind_path)
304 """Closes this PassiveStream."""
306 if self.bind_path is not None:
307 ovs.fatal_signal.unlink_file_now(self.bind_path)
308 self.bind_path = None
311 """Tries to accept a new connection on this passive stream. Returns
312 (error, stream): if successful, 'error' is 0 and 'stream' is the new
313 Stream object, and on failure 'error' is a positive errno value and
316 Will not block waiting for a connection. If no connection is ready to
317 be accepted, returns (errno.EAGAIN, None) immediately."""
321 sock, addr = self.socket.accept()
322 ovs.socket_util.set_nonblocking(sock)
323 return 0, Stream(sock, "unix:%s" % addr, 0)
324 except socket.error, e:
325 error = ovs.socket_util.get_exception_errno(e)
326 if error != errno.EAGAIN:
328 vlog.dbg("accept: %s" % os.strerror(error))
def wait(self, poller):
    """Registers this passive stream's socket with 'poller' for POLLIN
    events."""
    listening_socket = self.socket
    poller.fd_wait(listening_socket, ovs.poller.POLLIN)
335 # Don't delete the file: we might have forked.
341 Active %s connection methods:
342 unix:FILE Unix domain socket named FILE
343 tcp:IP:PORT TCP socket to IP with port no of PORT
345 Passive %s connection methods:
346 punix:FILE Listen on Unix domain socket FILE""" % (name, name)
349 class UnixStream(Stream):
def _open(suffix, dscp):
    """Opens a stream-type Unix domain socket for the path 'suffix'.

    'suffix' is handed to ovs.socket_util.make_unix_socket() as the connect
    path, with no bind path.  'dscp' is accepted so the signature matches
    the other stream classes; it is not used here.

    Returns the result of make_unix_socket() unchanged; Stream.open()
    unpacks it as an (error, socket) tuple."""
    # Argument order: socket style, True, bind path (none), connect path.
    return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, True,
                                            None, suffix)
355 Stream.register_method("unix", UnixStream)
358 class TCPStream(Stream):
360 def _open(suffix, dscp):
361 error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
364 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
366 Stream.register_method("tcp", TCPStream)