Global replace of Nicira Networks.
[sliver-openvswitch.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import select
18 import socket
19
20 import ovs.poller
21 import ovs.socket_util
22 import ovs.vlog
23
24 vlog = ovs.vlog.Vlog("stream")
25
26
27 def stream_or_pstream_needs_probes(name):
28     """ 1 if the stream or pstream specified by 'name' needs periodic probes to
29     verify connectivty.  For [p]streams which need probes, it can take a long
30     time to notice the connection was dropped.  Returns 0 if probes aren't
31     needed, and -1 if 'name' is invalid"""
32
33     if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
34         # Only unix and punix are supported currently.
35         return 0
36     else:
37         return -1
38
39
40 class Stream(object):
41     """Bidirectional byte stream.  Currently only Unix domain sockets
42     are implemented."""
43
44     # States.
45     __S_CONNECTING = 0
46     __S_CONNECTED = 1
47     __S_DISCONNECTED = 2
48
49     # Kinds of events that one might wait for.
50     W_CONNECT = 0               # Connect complete (success or failure).
51     W_RECV = 1                  # Data received.
52     W_SEND = 2                  # Send buffer room available.
53
54     @staticmethod
55     def is_valid_name(name):
56         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
57         TYPE is a supported stream type (currently only "unix:"), otherwise
58         False."""
59         return name.startswith("unix:")
60
61     def __init__(self, socket, name, status):
62         self.socket = socket
63         self.name = name
64         if status == errno.EAGAIN:
65             self.state = Stream.__S_CONNECTING
66         elif status == 0:
67             self.state = Stream.__S_CONNECTED
68         else:
69             self.state = Stream.__S_DISCONNECTED
70
71         self.error = 0
72
73     @staticmethod
74     def open(name):
75         """Attempts to connect a stream to a remote peer.  'name' is a
76         connection name in the form "TYPE:ARGS", where TYPE is an active stream
77         class's name and ARGS are stream class-specific.  Currently the only
78         supported TYPE is "unix".
79
80         Returns (error, stream): on success 'error' is 0 and 'stream' is the
81         new Stream, on failure 'error' is a positive errno value and 'stream'
82         is None.
83
84         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
85         and a new Stream.  The connect() method can be used to check for
86         successful connection completion."""
87         if not Stream.is_valid_name(name):
88             return errno.EAFNOSUPPORT, None
89
90         connect_path = name[5:]
91         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
92                                                        True, None,
93                                                        connect_path)
94         if error:
95             return error, None
96         else:
97             status = ovs.socket_util.check_connection_completion(sock)
98             return 0, Stream(sock, name, status)
99
100     @staticmethod
101     def open_block((error, stream)):
102         """Blocks until a Stream completes its connection attempt, either
103         succeeding or failing.  (error, stream) should be the tuple returned by
104         Stream.open().  Returns a tuple of the same form.
105
106         Typical usage:
107         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
108
109         if not error:
110             while True:
111                 error = stream.connect()
112                 if error != errno.EAGAIN:
113                     break
114                 stream.run()
115                 poller = ovs.poller.Poller()
116                 stream.run_wait()
117                 stream.connect_wait(poller)
118                 poller.block()
119             assert error != errno.EINPROGRESS
120
121         if error and stream:
122             stream.close()
123             stream = None
124         return error, stream
125
126     def close(self):
127         self.socket.close()
128
129     def __scs_connecting(self):
130         retval = ovs.socket_util.check_connection_completion(self.socket)
131         assert retval != errno.EINPROGRESS
132         if retval == 0:
133             self.state = Stream.__S_CONNECTED
134         elif retval != errno.EAGAIN:
135             self.state = Stream.__S_DISCONNECTED
136             self.error = retval
137
138     def connect(self):
139         """Tries to complete the connection on this stream.  If the connection
140         is complete, returns 0 if the connection was successful or a positive
141         errno value if it failed.  If the connection is still in progress,
142         returns errno.EAGAIN."""
143         last_state = -1         # Always differs from initial self.state
144         while self.state != last_state:
145             last_state = self.state
146             if self.state == Stream.__S_CONNECTING:
147                 self.__scs_connecting()
148             elif self.state == Stream.__S_CONNECTED:
149                 return 0
150             elif self.state == Stream.__S_DISCONNECTED:
151                 return self.error
152
153     def recv(self, n):
154         """Tries to receive up to 'n' bytes from this stream.  Returns a
155         (error, string) tuple:
156
157             - If successful, 'error' is zero and 'string' contains between 1
158               and 'n' bytes of data.
159
160             - On error, 'error' is a positive errno value.
161
162             - If the connection has been closed in the normal fashion or if 'n'
163               is 0, the tuple is (0, "").
164
165         The recv function will not block waiting for data to arrive.  If no
166         data have been received, it returns (errno.EAGAIN, "") immediately."""
167
168         retval = self.connect()
169         if retval != 0:
170             return (retval, "")
171         elif n == 0:
172             return (0, "")
173
174         try:
175             return (0, self.socket.recv(n))
176         except socket.error, e:
177             return (ovs.socket_util.get_exception_errno(e), "")
178
179     def send(self, buf):
180         """Tries to send 'buf' on this stream.
181
182         If successful, returns the number of bytes sent, between 1 and
183         len(buf).  0 is only a valid return value if len(buf) is 0.
184
185         On error, returns a negative errno value.
186
187         Will not block.  If no bytes can be immediately accepted for
188         transmission, returns -errno.EAGAIN immediately."""
189
190         retval = self.connect()
191         if retval != 0:
192             return -retval
193         elif len(buf) == 0:
194             return 0
195
196         try:
197             return self.socket.send(buf)
198         except socket.error, e:
199             return -ovs.socket_util.get_exception_errno(e)
200
201     def run(self):
202         pass
203
204     def run_wait(self, poller):
205         pass
206
207     def wait(self, poller, wait):
208         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
209
210         if self.state == Stream.__S_DISCONNECTED:
211             poller.immediate_wake()
212             return
213
214         if self.state == Stream.__S_CONNECTING:
215             wait = Stream.W_CONNECT
216         if wait == Stream.W_RECV:
217             poller.fd_wait(self.socket, select.POLLIN)
218         else:
219             poller.fd_wait(self.socket, select.POLLOUT)
220
221     def connect_wait(self, poller):
222         self.wait(poller, Stream.W_CONNECT)
223
224     def recv_wait(self, poller):
225         self.wait(poller, Stream.W_RECV)
226
227     def send_wait(self, poller):
228         self.wait(poller, Stream.W_SEND)
229
230     def __del__(self):
231         # Don't delete the file: we might have forked.
232         self.socket.close()
233
234
235 class PassiveStream(object):
236     @staticmethod
237     def is_valid_name(name):
238         """Returns True if 'name' is a passive stream name in the form
239         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
240         "punix:"), otherwise False."""
241         return name.startswith("punix:")
242
243     def __init__(self, sock, name, bind_path):
244         self.name = name
245         self.socket = sock
246         self.bind_path = bind_path
247
248     @staticmethod
249     def open(name):
250         """Attempts to start listening for remote stream connections.  'name'
251         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
252         stream class's name and ARGS are stream class-specific.  Currently the
253         only supported TYPE is "punix".
254
255         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
256         new PassiveStream, on failure 'error' is a positive errno value and
257         'pstream' is None."""
258         if not PassiveStream.is_valid_name(name):
259             return errno.EAFNOSUPPORT, None
260
261         bind_path = name[6:]
262         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
263                                                        True, bind_path, None)
264         if error:
265             return error, None
266
267         try:
268             sock.listen(10)
269         except socket.error, e:
270             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
271             sock.close()
272             return e.error, None
273
274         return 0, PassiveStream(sock, name, bind_path)
275
276     def close(self):
277         """Closes this PassiveStream."""
278         self.socket.close()
279         if self.bind_path is not None:
280             ovs.fatal_signal.unlink_file_now(self.bind_path)
281             self.bind_path = None
282
283     def accept(self):
284         """Tries to accept a new connection on this passive stream.  Returns
285         (error, stream): if successful, 'error' is 0 and 'stream' is the new
286         Stream object, and on failure 'error' is a positive errno value and
287         'stream' is None.
288
289         Will not block waiting for a connection.  If no connection is ready to
290         be accepted, returns (errno.EAGAIN, None) immediately."""
291
292         while True:
293             try:
294                 sock, addr = self.socket.accept()
295                 ovs.socket_util.set_nonblocking(sock)
296                 return 0, Stream(sock, "unix:%s" % addr, 0)
297             except socket.error, e:
298                 error = ovs.socket_util.get_exception_errno(e)
299                 if error != errno.EAGAIN:
300                     # XXX rate-limit
301                     vlog.dbg("accept: %s" % os.strerror(error))
302                 return error, None
303
304     def wait(self, poller):
305         poller.fd_wait(self.socket, select.POLLIN)
306
307     def __del__(self):
308         # Don't delete the file: we might have forked.
309         self.socket.close()
310
311
312 def usage(name):
313     return """
314 Active %s connection methods:
315   unix:FILE               Unix domain socket named FILE
316
317 Passive %s connection methods:
318   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)