6bd0ccbe8b22c931e07e22644de410631801f2d1
[sliver-openvswitch.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import select
18 import socket
19
20 import ovs.poller
21 import ovs.socket_util
22 import ovs.vlog
23
24 vlog = ovs.vlog.Vlog("stream")
25
26
27 def stream_or_pstream_needs_probes(name):
28     """ 1 if the stream or pstream specified by 'name' needs periodic probes to
29     verify connectivity.  For [p]streams which need probes, it can take a long
30     time to notice the connection was dropped.  Returns 0 if probes aren't
31     needed, and -1 if 'name' is invalid"""
32
33     if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
34         # Only unix and punix are supported currently.
35         return 0
36     else:
37         return -1
38
39
40 class Stream(object):
41     """Bidirectional byte stream.  Currently only Unix domain sockets
42     are implemented."""
43
44     # States.
45     __S_CONNECTING = 0
46     __S_CONNECTED = 1
47     __S_DISCONNECTED = 2
48
49     # Kinds of events that one might wait for.
50     W_CONNECT = 0               # Connect complete (success or failure).
51     W_RECV = 1                  # Data received.
52     W_SEND = 2                  # Send buffer room available.
53
54     _SOCKET_METHODS = {}
55
56     @staticmethod
57     def register_method(method):
58         def _register_method(cls):
59             Stream._SOCKET_METHODS[method + ":"] = cls
60             return cls
61         return _register_method
62
63     @staticmethod
64     def _find_method(name):
65         for method, cls in Stream._SOCKET_METHODS.items():
66             if name.startswith(method):
67                 return cls
68         return None
69
70     @staticmethod
71     def is_valid_name(name):
72         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
73         TYPE is a supported stream type (currently only "unix:" and "tcp:"),
74         otherwise False."""
75         return bool(Stream._find_method(name))
76
77     def __init__(self, socket, name, status):
78         self.socket = socket
79         self.name = name
80         if status == errno.EAGAIN:
81             self.state = Stream.__S_CONNECTING
82         elif status == 0:
83             self.state = Stream.__S_CONNECTED
84         else:
85             self.state = Stream.__S_DISCONNECTED
86
87         self.error = 0
88
89     # Default value of dscp bits for connection between controller and manager.
90     # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
91     # in <netinet/ip.h> is used.
92     IPTOS_PREC_INTERNETCONTROL = 0xc0
93     DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
94
95     @staticmethod
96     def open(name, dscp=DSCP_DEFAULT):
97         """Attempts to connect a stream to a remote peer.  'name' is a
98         connection name in the form "TYPE:ARGS", where TYPE is an active stream
99         class's name and ARGS are stream class-specific.  Currently the only
100         supported TYPEs are "unix" and "tcp".
101
102         Returns (error, stream): on success 'error' is 0 and 'stream' is the
103         new Stream, on failure 'error' is a positive errno value and 'stream'
104         is None.
105
106         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
107         and a new Stream.  The connect() method can be used to check for
108         successful connection completion."""
109         cls = Stream._find_method(name)
110         if not cls:
111             return errno.EAFNOSUPPORT, None
112
113         suffix = name.split(":", 1)[1]
114         error, sock = cls._open(suffix, dscp)
115         if error:
116             return error, None
117         else:
118             status = ovs.socket_util.check_connection_completion(sock)
119             return 0, Stream(sock, name, status)
120
121     @staticmethod
122     def _open(suffix, dscp):
123         raise NotImplementedError("This method must be overrided by subclass")
124
125     @staticmethod
126     def open_block((error, stream)):
127         """Blocks until a Stream completes its connection attempt, either
128         succeeding or failing.  (error, stream) should be the tuple returned by
129         Stream.open().  Returns a tuple of the same form.
130
131         Typical usage:
132         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
133
134         if not error:
135             while True:
136                 error = stream.connect()
137                 if error != errno.EAGAIN:
138                     break
139                 stream.run()
140                 poller = ovs.poller.Poller()
141                 stream.run_wait(poller)
142                 stream.connect_wait(poller)
143                 poller.block()
144             assert error != errno.EINPROGRESS
145
146         if error and stream:
147             stream.close()
148             stream = None
149         return error, stream
150
151     def close(self):
152         self.socket.close()
153
154     def __scs_connecting(self):
155         retval = ovs.socket_util.check_connection_completion(self.socket)
156         assert retval != errno.EINPROGRESS
157         if retval == 0:
158             self.state = Stream.__S_CONNECTED
159         elif retval != errno.EAGAIN:
160             self.state = Stream.__S_DISCONNECTED
161             self.error = retval
162
163     def connect(self):
164         """Tries to complete the connection on this stream.  If the connection
165         is complete, returns 0 if the connection was successful or a positive
166         errno value if it failed.  If the connection is still in progress,
167         returns errno.EAGAIN."""
168         last_state = -1         # Always differs from initial self.state
169         while self.state != last_state:
170             last_state = self.state
171             if self.state == Stream.__S_CONNECTING:
172                 self.__scs_connecting()
173             elif self.state == Stream.__S_CONNECTED:
174                 return 0
175             elif self.state == Stream.__S_DISCONNECTED:
176                 return self.error
177
178     def recv(self, n):
179         """Tries to receive up to 'n' bytes from this stream.  Returns a
180         (error, string) tuple:
181
182             - If successful, 'error' is zero and 'string' contains between 1
183               and 'n' bytes of data.
184
185             - On error, 'error' is a positive errno value.
186
187             - If the connection has been closed in the normal fashion or if 'n'
188               is 0, the tuple is (0, "").
189
190         The recv function will not block waiting for data to arrive.  If no
191         data have been received, it returns (errno.EAGAIN, "") immediately."""
192
193         retval = self.connect()
194         if retval != 0:
195             return (retval, "")
196         elif n == 0:
197             return (0, "")
198
199         try:
200             return (0, self.socket.recv(n))
201         except socket.error, e:
202             return (ovs.socket_util.get_exception_errno(e), "")
203
204     def send(self, buf):
205         """Tries to send 'buf' on this stream.
206
207         If successful, returns the number of bytes sent, between 1 and
208         len(buf).  0 is only a valid return value if len(buf) is 0.
209
210         On error, returns a negative errno value.
211
212         Will not block.  If no bytes can be immediately accepted for
213         transmission, returns -errno.EAGAIN immediately."""
214
215         retval = self.connect()
216         if retval != 0:
217             return -retval
218         elif len(buf) == 0:
219             return 0
220
221         try:
222             return self.socket.send(buf)
223         except socket.error, e:
224             return -ovs.socket_util.get_exception_errno(e)
225
226     def run(self):
227         pass
228
229     def run_wait(self, poller):
230         pass
231
232     def wait(self, poller, wait):
233         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
234
235         if self.state == Stream.__S_DISCONNECTED:
236             poller.immediate_wake()
237             return
238
239         if self.state == Stream.__S_CONNECTING:
240             wait = Stream.W_CONNECT
241         if wait == Stream.W_RECV:
242             poller.fd_wait(self.socket, select.POLLIN)
243         else:
244             poller.fd_wait(self.socket, select.POLLOUT)
245
246     def connect_wait(self, poller):
247         self.wait(poller, Stream.W_CONNECT)
248
249     def recv_wait(self, poller):
250         self.wait(poller, Stream.W_RECV)
251
252     def send_wait(self, poller):
253         self.wait(poller, Stream.W_SEND)
254
255     def __del__(self):
256         # Don't delete the file: we might have forked.
257         self.socket.close()
258
259
260 class PassiveStream(object):
261     @staticmethod
262     def is_valid_name(name):
263         """Returns True if 'name' is a passive stream name in the form
264         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
265         "punix:"), otherwise False."""
266         return name.startswith("punix:")
267
268     def __init__(self, sock, name, bind_path):
269         self.name = name
270         self.socket = sock
271         self.bind_path = bind_path
272
273     @staticmethod
274     def open(name):
275         """Attempts to start listening for remote stream connections.  'name'
276         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
277         stream class's name and ARGS are stream class-specific.  Currently the
278         only supported TYPE is "punix".
279
280         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
281         new PassiveStream, on failure 'error' is a positive errno value and
282         'pstream' is None."""
283         if not PassiveStream.is_valid_name(name):
284             return errno.EAFNOSUPPORT, None
285
286         bind_path = name[6:]
287         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
288                                                        True, bind_path, None)
289         if error:
290             return error, None
291
292         try:
293             sock.listen(10)
294         except socket.error, e:
295             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
296             sock.close()
297             return e.error, None
298
299         return 0, PassiveStream(sock, name, bind_path)
300
301     def close(self):
302         """Closes this PassiveStream."""
303         self.socket.close()
304         if self.bind_path is not None:
305             ovs.fatal_signal.unlink_file_now(self.bind_path)
306             self.bind_path = None
307
308     def accept(self):
309         """Tries to accept a new connection on this passive stream.  Returns
310         (error, stream): if successful, 'error' is 0 and 'stream' is the new
311         Stream object, and on failure 'error' is a positive errno value and
312         'stream' is None.
313
314         Will not block waiting for a connection.  If no connection is ready to
315         be accepted, returns (errno.EAGAIN, None) immediately."""
316
317         while True:
318             try:
319                 sock, addr = self.socket.accept()
320                 ovs.socket_util.set_nonblocking(sock)
321                 return 0, Stream(sock, "unix:%s" % addr, 0)
322             except socket.error, e:
323                 error = ovs.socket_util.get_exception_errno(e)
324                 if error != errno.EAGAIN:
325                     # XXX rate-limit
326                     vlog.dbg("accept: %s" % os.strerror(error))
327                 return error, None
328
329     def wait(self, poller):
330         poller.fd_wait(self.socket, select.POLLIN)
331
332     def __del__(self):
333         # Don't delete the file: we might have forked.
334         self.socket.close()
335
336
337 def usage(name):
338     return """
339 Active %s connection methods:
340   unix:FILE               Unix domain socket named FILE
341   tcp:IP:PORT             TCP socket to IP with port no of PORT
342
343 Passive %s connection methods:
344   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
345
346
347 class UnixStream(Stream):
348     @staticmethod
349     def _open(suffix, dscp):
350         connect_path = suffix
351         return  ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
352                                                  True, None, connect_path)
353 UnixStream = Stream.register_method("unix")(UnixStream)
354
355
356 class TCPStream(Stream):
357     @staticmethod
358     def _open(suffix, dscp):
359         error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
360                                                        suffix, 0, dscp)
361         if not error:
362             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
363         return error, sock
364 TCPStream = Stream.register_method("tcp")(TCPStream)