ovs-thread: Quiesce in xpthread_barrier_wait().
[sliver-openvswitch.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import socket
18
19 import ovs.poller
20 import ovs.socket_util
21 import ovs.vlog
22
23 vlog = ovs.vlog.Vlog("stream")
24
25
26 def stream_or_pstream_needs_probes(name):
27     """ 1 if the stream or pstream specified by 'name' needs periodic probes to
28     verify connectivity.  For [p]streams which need probes, it can take a long
29     time to notice the connection was dropped.  Returns 0 if probes aren't
30     needed, and -1 if 'name' is invalid"""
31
32     if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
33         # Only unix and punix are supported currently.
34         return 0
35     else:
36         return -1
37
38
39 class Stream(object):
40     """Bidirectional byte stream.  Currently only Unix domain sockets
41     are implemented."""
42
43     # States.
44     __S_CONNECTING = 0
45     __S_CONNECTED = 1
46     __S_DISCONNECTED = 2
47
48     # Kinds of events that one might wait for.
49     W_CONNECT = 0               # Connect complete (success or failure).
50     W_RECV = 1                  # Data received.
51     W_SEND = 2                  # Send buffer room available.
52
53     _SOCKET_METHODS = {}
54
55     @staticmethod
56     def register_method(method, cls):
57         Stream._SOCKET_METHODS[method + ":"] = cls
58
59     @staticmethod
60     def _find_method(name):
61         for method, cls in Stream._SOCKET_METHODS.items():
62             if name.startswith(method):
63                 return cls
64         return None
65
66     @staticmethod
67     def is_valid_name(name):
68         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
69         TYPE is a supported stream type (currently only "unix:" and "tcp:"),
70         otherwise False."""
71         return bool(Stream._find_method(name))
72
73     def __init__(self, socket, name, status):
74         self.socket = socket
75         self.name = name
76         if status == errno.EAGAIN:
77             self.state = Stream.__S_CONNECTING
78         elif status == 0:
79             self.state = Stream.__S_CONNECTED
80         else:
81             self.state = Stream.__S_DISCONNECTED
82
83         self.error = 0
84
85     # Default value of dscp bits for connection between controller and manager.
86     # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
87     # in <netinet/ip.h> is used.
88     IPTOS_PREC_INTERNETCONTROL = 0xc0
89     DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
90
91     @staticmethod
92     def open(name, dscp=DSCP_DEFAULT):
93         """Attempts to connect a stream to a remote peer.  'name' is a
94         connection name in the form "TYPE:ARGS", where TYPE is an active stream
95         class's name and ARGS are stream class-specific.  Currently the only
96         supported TYPEs are "unix" and "tcp".
97
98         Returns (error, stream): on success 'error' is 0 and 'stream' is the
99         new Stream, on failure 'error' is a positive errno value and 'stream'
100         is None.
101
102         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
103         and a new Stream.  The connect() method can be used to check for
104         successful connection completion."""
105         cls = Stream._find_method(name)
106         if not cls:
107             return errno.EAFNOSUPPORT, None
108
109         suffix = name.split(":", 1)[1]
110         if name.startswith("unix:"):
111             suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
112         error, sock = cls._open(suffix, dscp)
113         if error:
114             return error, None
115         else:
116             status = ovs.socket_util.check_connection_completion(sock)
117             return 0, Stream(sock, name, status)
118
119     @staticmethod
120     def _open(suffix, dscp):
121         raise NotImplementedError("This method must be overrided by subclass")
122
123     @staticmethod
124     def open_block((error, stream)):
125         """Blocks until a Stream completes its connection attempt, either
126         succeeding or failing.  (error, stream) should be the tuple returned by
127         Stream.open().  Returns a tuple of the same form.
128
129         Typical usage:
130         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
131
132         if not error:
133             while True:
134                 error = stream.connect()
135                 if error != errno.EAGAIN:
136                     break
137                 stream.run()
138                 poller = ovs.poller.Poller()
139                 stream.run_wait(poller)
140                 stream.connect_wait(poller)
141                 poller.block()
142             assert error != errno.EINPROGRESS
143
144         if error and stream:
145             stream.close()
146             stream = None
147         return error, stream
148
149     def close(self):
150         self.socket.close()
151
152     def __scs_connecting(self):
153         retval = ovs.socket_util.check_connection_completion(self.socket)
154         assert retval != errno.EINPROGRESS
155         if retval == 0:
156             self.state = Stream.__S_CONNECTED
157         elif retval != errno.EAGAIN:
158             self.state = Stream.__S_DISCONNECTED
159             self.error = retval
160
161     def connect(self):
162         """Tries to complete the connection on this stream.  If the connection
163         is complete, returns 0 if the connection was successful or a positive
164         errno value if it failed.  If the connection is still in progress,
165         returns errno.EAGAIN."""
166
167         if self.state == Stream.__S_CONNECTING:
168             self.__scs_connecting()
169
170         if self.state == Stream.__S_CONNECTING:
171             return errno.EAGAIN
172         elif self.state == Stream.__S_CONNECTED:
173             return 0
174         else:
175             assert self.state == Stream.__S_DISCONNECTED
176             return self.error
177
178     def recv(self, n):
179         """Tries to receive up to 'n' bytes from this stream.  Returns a
180         (error, string) tuple:
181
182             - If successful, 'error' is zero and 'string' contains between 1
183               and 'n' bytes of data.
184
185             - On error, 'error' is a positive errno value.
186
187             - If the connection has been closed in the normal fashion or if 'n'
188               is 0, the tuple is (0, "").
189
190         The recv function will not block waiting for data to arrive.  If no
191         data have been received, it returns (errno.EAGAIN, "") immediately."""
192
193         retval = self.connect()
194         if retval != 0:
195             return (retval, "")
196         elif n == 0:
197             return (0, "")
198
199         try:
200             return (0, self.socket.recv(n))
201         except socket.error, e:
202             return (ovs.socket_util.get_exception_errno(e), "")
203
204     def send(self, buf):
205         """Tries to send 'buf' on this stream.
206
207         If successful, returns the number of bytes sent, between 1 and
208         len(buf).  0 is only a valid return value if len(buf) is 0.
209
210         On error, returns a negative errno value.
211
212         Will not block.  If no bytes can be immediately accepted for
213         transmission, returns -errno.EAGAIN immediately."""
214
215         retval = self.connect()
216         if retval != 0:
217             return -retval
218         elif len(buf) == 0:
219             return 0
220
221         try:
222             return self.socket.send(buf)
223         except socket.error, e:
224             return -ovs.socket_util.get_exception_errno(e)
225
226     def run(self):
227         pass
228
229     def run_wait(self, poller):
230         pass
231
232     def wait(self, poller, wait):
233         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
234
235         if self.state == Stream.__S_DISCONNECTED:
236             poller.immediate_wake()
237             return
238
239         if self.state == Stream.__S_CONNECTING:
240             wait = Stream.W_CONNECT
241         if wait == Stream.W_RECV:
242             poller.fd_wait(self.socket, ovs.poller.POLLIN)
243         else:
244             poller.fd_wait(self.socket, ovs.poller.POLLOUT)
245
246     def connect_wait(self, poller):
247         self.wait(poller, Stream.W_CONNECT)
248
249     def recv_wait(self, poller):
250         self.wait(poller, Stream.W_RECV)
251
252     def send_wait(self, poller):
253         self.wait(poller, Stream.W_SEND)
254
255     def __del__(self):
256         # Don't delete the file: we might have forked.
257         self.socket.close()
258
259
260 class PassiveStream(object):
261     @staticmethod
262     def is_valid_name(name):
263         """Returns True if 'name' is a passive stream name in the form
264         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
265         "punix:"), otherwise False."""
266         return name.startswith("punix:")
267
268     def __init__(self, sock, name, bind_path):
269         self.name = name
270         self.socket = sock
271         self.bind_path = bind_path
272
273     @staticmethod
274     def open(name):
275         """Attempts to start listening for remote stream connections.  'name'
276         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
277         stream class's name and ARGS are stream class-specific.  Currently the
278         only supported TYPE is "punix".
279
280         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
281         new PassiveStream, on failure 'error' is a positive errno value and
282         'pstream' is None."""
283         if not PassiveStream.is_valid_name(name):
284             return errno.EAFNOSUPPORT, None
285
286         bind_path = name[6:]
287         if name.startswith("punix:"):
288             bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
289         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
290                                                        True, bind_path, None)
291         if error:
292             return error, None
293
294         try:
295             sock.listen(10)
296         except socket.error, e:
297             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
298             sock.close()
299             return e.error, None
300
301         return 0, PassiveStream(sock, name, bind_path)
302
303     def close(self):
304         """Closes this PassiveStream."""
305         self.socket.close()
306         if self.bind_path is not None:
307             ovs.fatal_signal.unlink_file_now(self.bind_path)
308             self.bind_path = None
309
310     def accept(self):
311         """Tries to accept a new connection on this passive stream.  Returns
312         (error, stream): if successful, 'error' is 0 and 'stream' is the new
313         Stream object, and on failure 'error' is a positive errno value and
314         'stream' is None.
315
316         Will not block waiting for a connection.  If no connection is ready to
317         be accepted, returns (errno.EAGAIN, None) immediately."""
318
319         while True:
320             try:
321                 sock, addr = self.socket.accept()
322                 ovs.socket_util.set_nonblocking(sock)
323                 return 0, Stream(sock, "unix:%s" % addr, 0)
324             except socket.error, e:
325                 error = ovs.socket_util.get_exception_errno(e)
326                 if error != errno.EAGAIN:
327                     # XXX rate-limit
328                     vlog.dbg("accept: %s" % os.strerror(error))
329                 return error, None
330
331     def wait(self, poller):
332         poller.fd_wait(self.socket, ovs.poller.POLLIN)
333
334     def __del__(self):
335         # Don't delete the file: we might have forked.
336         self.socket.close()
337
338
339 def usage(name):
340     return """
341 Active %s connection methods:
342   unix:FILE               Unix domain socket named FILE
343   tcp:IP:PORT             TCP socket to IP with port no of PORT
344
345 Passive %s connection methods:
346   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
347
348
349 class UnixStream(Stream):
350     @staticmethod
351     def _open(suffix, dscp):
352         connect_path = suffix
353         return  ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
354                                                  True, None, connect_path)
355 Stream.register_method("unix", UnixStream)
356
357
358 class TCPStream(Stream):
359     @staticmethod
360     def _open(suffix, dscp):
361         error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
362                                                        suffix, 0, dscp)
363         if not error:
364             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
365         return error, sock
366 Stream.register_method("tcp", TCPStream)