1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 import ovs.socket_util
23 vlog = ovs.vlog.Vlog("stream")
26 def stream_or_pstream_needs_probes(name):
27 """ 1 if the stream or pstream specified by 'name' needs periodic probes to
28 verify connectivity. For [p]streams which need probes, it can take a long
29 time to notice the connection was dropped. Returns 0 if probes aren't
30 needed, and -1 if 'name' is invalid"""
32 if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
33 # Only unix and punix are supported currently.
40 """Bidirectional byte stream. Currently only Unix domain sockets
48 # Kinds of events that one might wait for.
49 W_CONNECT = 0 # Connect complete (success or failure).
50 W_RECV = 1 # Data received.
51 W_SEND = 2 # Send buffer room available.
56 def register_method(method, cls):
57 Stream._SOCKET_METHODS[method + ":"] = cls
60 def _find_method(name):
61 for method, cls in Stream._SOCKET_METHODS.items():
62 if name.startswith(method):
67 def is_valid_name(name):
68 """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
69 TYPE is a supported stream type (currently only "unix:" and "tcp:"),
71 return bool(Stream._find_method(name))
73 def __init__(self, socket, name, status):
76 if status == errno.EAGAIN:
77 self.state = Stream.__S_CONNECTING
79 self.state = Stream.__S_CONNECTED
81 self.state = Stream.__S_DISCONNECTED
85 # Default value of dscp bits for connection between controller and manager.
86 # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
87 # in <netinet/ip.h> is used.
88 IPTOS_PREC_INTERNETCONTROL = 0xc0
89 DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
92 def open(name, dscp=DSCP_DEFAULT):
93 """Attempts to connect a stream to a remote peer. 'name' is a
94 connection name in the form "TYPE:ARGS", where TYPE is an active stream
95 class's name and ARGS are stream class-specific. Currently the only
96 supported TYPEs are "unix" and "tcp".
98 Returns (error, stream): on success 'error' is 0 and 'stream' is the
99 new Stream, on failure 'error' is a positive errno value and 'stream'
102 Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
103 and a new Stream. The connect() method can be used to check for
104 successful connection completion."""
105 cls = Stream._find_method(name)
107 return errno.EAFNOSUPPORT, None
109 suffix = name.split(":", 1)[1]
110 error, sock = cls._open(suffix, dscp)
114 status = ovs.socket_util.check_connection_completion(sock)
115 return 0, Stream(sock, name, status)
118 def _open(suffix, dscp):
119 raise NotImplementedError("This method must be overrided by subclass")
122 def open_block((error, stream)):
123 """Blocks until a Stream completes its connection attempt, either
124 succeeding or failing. (error, stream) should be the tuple returned by
125 Stream.open(). Returns a tuple of the same form.
128 error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
132 error = stream.connect()
133 if error != errno.EAGAIN:
136 poller = ovs.poller.Poller()
137 stream.run_wait(poller)
138 stream.connect_wait(poller)
140 assert error != errno.EINPROGRESS
150 def __scs_connecting(self):
151 retval = ovs.socket_util.check_connection_completion(self.socket)
152 assert retval != errno.EINPROGRESS
154 self.state = Stream.__S_CONNECTED
155 elif retval != errno.EAGAIN:
156 self.state = Stream.__S_DISCONNECTED
160 """Tries to complete the connection on this stream. If the connection
161 is complete, returns 0 if the connection was successful or a positive
162 errno value if it failed. If the connection is still in progress,
163 returns errno.EAGAIN."""
164 last_state = -1 # Always differs from initial self.state
165 while self.state != last_state:
166 last_state = self.state
167 if self.state == Stream.__S_CONNECTING:
168 self.__scs_connecting()
169 elif self.state == Stream.__S_CONNECTED:
171 elif self.state == Stream.__S_DISCONNECTED:
175 """Tries to receive up to 'n' bytes from this stream. Returns a
176 (error, string) tuple:
178 - If successful, 'error' is zero and 'string' contains between 1
179 and 'n' bytes of data.
181 - On error, 'error' is a positive errno value.
183 - If the connection has been closed in the normal fashion or if 'n'
184 is 0, the tuple is (0, "").
186 The recv function will not block waiting for data to arrive. If no
187 data have been received, it returns (errno.EAGAIN, "") immediately."""
189 retval = self.connect()
196 return (0, self.socket.recv(n))
197 except socket.error, e:
198 return (ovs.socket_util.get_exception_errno(e), "")
201 """Tries to send 'buf' on this stream.
203 If successful, returns the number of bytes sent, between 1 and
204 len(buf). 0 is only a valid return value if len(buf) is 0.
206 On error, returns a negative errno value.
208 Will not block. If no bytes can be immediately accepted for
209 transmission, returns -errno.EAGAIN immediately."""
211 retval = self.connect()
218 return self.socket.send(buf)
219 except socket.error, e:
220 return -ovs.socket_util.get_exception_errno(e)
225 def run_wait(self, poller):
228 def wait(self, poller, wait):
229 assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
231 if self.state == Stream.__S_DISCONNECTED:
232 poller.immediate_wake()
235 if self.state == Stream.__S_CONNECTING:
236 wait = Stream.W_CONNECT
237 if wait == Stream.W_RECV:
238 poller.fd_wait(self.socket, ovs.poller.POLLIN)
240 poller.fd_wait(self.socket, ovs.poller.POLLOUT)
242 def connect_wait(self, poller):
243 self.wait(poller, Stream.W_CONNECT)
245 def recv_wait(self, poller):
246 self.wait(poller, Stream.W_RECV)
248 def send_wait(self, poller):
249 self.wait(poller, Stream.W_SEND)
252 # Don't delete the file: we might have forked.
256 class PassiveStream(object):
258 def is_valid_name(name):
259 """Returns True if 'name' is a passive stream name in the form
260 "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
261 "punix:"), otherwise False."""
262 return name.startswith("punix:")
264 def __init__(self, sock, name, bind_path):
267 self.bind_path = bind_path
271 """Attempts to start listening for remote stream connections. 'name'
272 is a connection name in the form "TYPE:ARGS", where TYPE is an passive
273 stream class's name and ARGS are stream class-specific. Currently the
274 only supported TYPE is "punix".
276 Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
277 new PassiveStream, on failure 'error' is a positive errno value and
278 'pstream' is None."""
279 if not PassiveStream.is_valid_name(name):
280 return errno.EAFNOSUPPORT, None
283 error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
284 True, bind_path, None)
290 except socket.error, e:
291 vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
295 return 0, PassiveStream(sock, name, bind_path)
298 """Closes this PassiveStream."""
300 if self.bind_path is not None:
301 ovs.fatal_signal.unlink_file_now(self.bind_path)
302 self.bind_path = None
305 """Tries to accept a new connection on this passive stream. Returns
306 (error, stream): if successful, 'error' is 0 and 'stream' is the new
307 Stream object, and on failure 'error' is a positive errno value and
310 Will not block waiting for a connection. If no connection is ready to
311 be accepted, returns (errno.EAGAIN, None) immediately."""
315 sock, addr = self.socket.accept()
316 ovs.socket_util.set_nonblocking(sock)
317 return 0, Stream(sock, "unix:%s" % addr, 0)
318 except socket.error, e:
319 error = ovs.socket_util.get_exception_errno(e)
320 if error != errno.EAGAIN:
322 vlog.dbg("accept: %s" % os.strerror(error))
325 def wait(self, poller):
326 poller.fd_wait(self.socket, ovs.poller.POLLIN)
329 # Don't delete the file: we might have forked.
335 Active %s connection methods:
336 unix:FILE Unix domain socket named FILE
337 tcp:IP:PORT TCP socket to IP with port no of PORT
339 Passive %s connection methods:
340 punix:FILE Listen on Unix domain socket FILE""" % (name, name)
343 class UnixStream(Stream):
345 def _open(suffix, dscp):
346 connect_path = suffix
347 return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
348 True, None, connect_path)
349 Stream.register_method("unix", UnixStream)
352 class TCPStream(Stream):
354 def _open(suffix, dscp):
355 error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
358 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
360 Stream.register_method("tcp", TCPStream)