don't use select.POLL* constants
[sliver-openvswitch.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import socket
18
19 import ovs.poller
20 import ovs.socket_util
21 import ovs.vlog
22
23 vlog = ovs.vlog.Vlog("stream")
24
25
26 def stream_or_pstream_needs_probes(name):
27     """ 1 if the stream or pstream specified by 'name' needs periodic probes to
28     verify connectivity.  For [p]streams which need probes, it can take a long
29     time to notice the connection was dropped.  Returns 0 if probes aren't
30     needed, and -1 if 'name' is invalid"""
31
32     if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
33         # Only unix and punix are supported currently.
34         return 0
35     else:
36         return -1
37
38
39 class Stream(object):
40     """Bidirectional byte stream.  Currently only Unix domain sockets
41     are implemented."""
42
43     # States.
44     __S_CONNECTING = 0
45     __S_CONNECTED = 1
46     __S_DISCONNECTED = 2
47
48     # Kinds of events that one might wait for.
49     W_CONNECT = 0               # Connect complete (success or failure).
50     W_RECV = 1                  # Data received.
51     W_SEND = 2                  # Send buffer room available.
52
53     _SOCKET_METHODS = {}
54
55     @staticmethod
56     def register_method(method, cls):
57         Stream._SOCKET_METHODS[method + ":"] = cls
58
59     @staticmethod
60     def _find_method(name):
61         for method, cls in Stream._SOCKET_METHODS.items():
62             if name.startswith(method):
63                 return cls
64         return None
65
66     @staticmethod
67     def is_valid_name(name):
68         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
69         TYPE is a supported stream type (currently only "unix:" and "tcp:"),
70         otherwise False."""
71         return bool(Stream._find_method(name))
72
73     def __init__(self, socket, name, status):
74         self.socket = socket
75         self.name = name
76         if status == errno.EAGAIN:
77             self.state = Stream.__S_CONNECTING
78         elif status == 0:
79             self.state = Stream.__S_CONNECTED
80         else:
81             self.state = Stream.__S_DISCONNECTED
82
83         self.error = 0
84
85     # Default value of dscp bits for connection between controller and manager.
86     # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
87     # in <netinet/ip.h> is used.
88     IPTOS_PREC_INTERNETCONTROL = 0xc0
89     DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
90
91     @staticmethod
92     def open(name, dscp=DSCP_DEFAULT):
93         """Attempts to connect a stream to a remote peer.  'name' is a
94         connection name in the form "TYPE:ARGS", where TYPE is an active stream
95         class's name and ARGS are stream class-specific.  Currently the only
96         supported TYPEs are "unix" and "tcp".
97
98         Returns (error, stream): on success 'error' is 0 and 'stream' is the
99         new Stream, on failure 'error' is a positive errno value and 'stream'
100         is None.
101
102         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
103         and a new Stream.  The connect() method can be used to check for
104         successful connection completion."""
105         cls = Stream._find_method(name)
106         if not cls:
107             return errno.EAFNOSUPPORT, None
108
109         suffix = name.split(":", 1)[1]
110         error, sock = cls._open(suffix, dscp)
111         if error:
112             return error, None
113         else:
114             status = ovs.socket_util.check_connection_completion(sock)
115             return 0, Stream(sock, name, status)
116
117     @staticmethod
118     def _open(suffix, dscp):
119         raise NotImplementedError("This method must be overrided by subclass")
120
121     @staticmethod
122     def open_block((error, stream)):
123         """Blocks until a Stream completes its connection attempt, either
124         succeeding or failing.  (error, stream) should be the tuple returned by
125         Stream.open().  Returns a tuple of the same form.
126
127         Typical usage:
128         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
129
130         if not error:
131             while True:
132                 error = stream.connect()
133                 if error != errno.EAGAIN:
134                     break
135                 stream.run()
136                 poller = ovs.poller.Poller()
137                 stream.run_wait(poller)
138                 stream.connect_wait(poller)
139                 poller.block()
140             assert error != errno.EINPROGRESS
141
142         if error and stream:
143             stream.close()
144             stream = None
145         return error, stream
146
147     def close(self):
148         self.socket.close()
149
150     def __scs_connecting(self):
151         retval = ovs.socket_util.check_connection_completion(self.socket)
152         assert retval != errno.EINPROGRESS
153         if retval == 0:
154             self.state = Stream.__S_CONNECTED
155         elif retval != errno.EAGAIN:
156             self.state = Stream.__S_DISCONNECTED
157             self.error = retval
158
159     def connect(self):
160         """Tries to complete the connection on this stream.  If the connection
161         is complete, returns 0 if the connection was successful or a positive
162         errno value if it failed.  If the connection is still in progress,
163         returns errno.EAGAIN."""
164         last_state = -1         # Always differs from initial self.state
165         while self.state != last_state:
166             last_state = self.state
167             if self.state == Stream.__S_CONNECTING:
168                 self.__scs_connecting()
169             elif self.state == Stream.__S_CONNECTED:
170                 return 0
171             elif self.state == Stream.__S_DISCONNECTED:
172                 return self.error
173
174     def recv(self, n):
175         """Tries to receive up to 'n' bytes from this stream.  Returns a
176         (error, string) tuple:
177
178             - If successful, 'error' is zero and 'string' contains between 1
179               and 'n' bytes of data.
180
181             - On error, 'error' is a positive errno value.
182
183             - If the connection has been closed in the normal fashion or if 'n'
184               is 0, the tuple is (0, "").
185
186         The recv function will not block waiting for data to arrive.  If no
187         data have been received, it returns (errno.EAGAIN, "") immediately."""
188
189         retval = self.connect()
190         if retval != 0:
191             return (retval, "")
192         elif n == 0:
193             return (0, "")
194
195         try:
196             return (0, self.socket.recv(n))
197         except socket.error, e:
198             return (ovs.socket_util.get_exception_errno(e), "")
199
200     def send(self, buf):
201         """Tries to send 'buf' on this stream.
202
203         If successful, returns the number of bytes sent, between 1 and
204         len(buf).  0 is only a valid return value if len(buf) is 0.
205
206         On error, returns a negative errno value.
207
208         Will not block.  If no bytes can be immediately accepted for
209         transmission, returns -errno.EAGAIN immediately."""
210
211         retval = self.connect()
212         if retval != 0:
213             return -retval
214         elif len(buf) == 0:
215             return 0
216
217         try:
218             return self.socket.send(buf)
219         except socket.error, e:
220             return -ovs.socket_util.get_exception_errno(e)
221
222     def run(self):
223         pass
224
225     def run_wait(self, poller):
226         pass
227
228     def wait(self, poller, wait):
229         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
230
231         if self.state == Stream.__S_DISCONNECTED:
232             poller.immediate_wake()
233             return
234
235         if self.state == Stream.__S_CONNECTING:
236             wait = Stream.W_CONNECT
237         if wait == Stream.W_RECV:
238             poller.fd_wait(self.socket, ovs.poller.POLLIN)
239         else:
240             poller.fd_wait(self.socket, ovs.poller.POLLOUT)
241
242     def connect_wait(self, poller):
243         self.wait(poller, Stream.W_CONNECT)
244
245     def recv_wait(self, poller):
246         self.wait(poller, Stream.W_RECV)
247
248     def send_wait(self, poller):
249         self.wait(poller, Stream.W_SEND)
250
251     def __del__(self):
252         # Don't delete the file: we might have forked.
253         self.socket.close()
254
255
256 class PassiveStream(object):
257     @staticmethod
258     def is_valid_name(name):
259         """Returns True if 'name' is a passive stream name in the form
260         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
261         "punix:"), otherwise False."""
262         return name.startswith("punix:")
263
264     def __init__(self, sock, name, bind_path):
265         self.name = name
266         self.socket = sock
267         self.bind_path = bind_path
268
269     @staticmethod
270     def open(name):
271         """Attempts to start listening for remote stream connections.  'name'
272         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
273         stream class's name and ARGS are stream class-specific.  Currently the
274         only supported TYPE is "punix".
275
276         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
277         new PassiveStream, on failure 'error' is a positive errno value and
278         'pstream' is None."""
279         if not PassiveStream.is_valid_name(name):
280             return errno.EAFNOSUPPORT, None
281
282         bind_path = name[6:]
283         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
284                                                        True, bind_path, None)
285         if error:
286             return error, None
287
288         try:
289             sock.listen(10)
290         except socket.error, e:
291             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
292             sock.close()
293             return e.error, None
294
295         return 0, PassiveStream(sock, name, bind_path)
296
297     def close(self):
298         """Closes this PassiveStream."""
299         self.socket.close()
300         if self.bind_path is not None:
301             ovs.fatal_signal.unlink_file_now(self.bind_path)
302             self.bind_path = None
303
304     def accept(self):
305         """Tries to accept a new connection on this passive stream.  Returns
306         (error, stream): if successful, 'error' is 0 and 'stream' is the new
307         Stream object, and on failure 'error' is a positive errno value and
308         'stream' is None.
309
310         Will not block waiting for a connection.  If no connection is ready to
311         be accepted, returns (errno.EAGAIN, None) immediately."""
312
313         while True:
314             try:
315                 sock, addr = self.socket.accept()
316                 ovs.socket_util.set_nonblocking(sock)
317                 return 0, Stream(sock, "unix:%s" % addr, 0)
318             except socket.error, e:
319                 error = ovs.socket_util.get_exception_errno(e)
320                 if error != errno.EAGAIN:
321                     # XXX rate-limit
322                     vlog.dbg("accept: %s" % os.strerror(error))
323                 return error, None
324
325     def wait(self, poller):
326         poller.fd_wait(self.socket, ovs.poller.POLLIN)
327
328     def __del__(self):
329         # Don't delete the file: we might have forked.
330         self.socket.close()
331
332
333 def usage(name):
334     return """
335 Active %s connection methods:
336   unix:FILE               Unix domain socket named FILE
337   tcp:IP:PORT             TCP socket to IP with port no of PORT
338
339 Passive %s connection methods:
340   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
341
342
343 class UnixStream(Stream):
344     @staticmethod
345     def _open(suffix, dscp):
346         connect_path = suffix
347         return  ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
348                                                  True, None, connect_path)
349 Stream.register_method("unix", UnixStream)
350
351
352 class TCPStream(Stream):
353     @staticmethod
354     def _open(suffix, dscp):
355         error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
356                                                        suffix, 0, dscp)
357         if not error:
358             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
359         return error, sock
360 Stream.register_method("tcp", TCPStream)