ovs.stream: Fix logic bug in Stream.connect().
[sliver-openvswitch.git] / python / ovs / stream.py
1 # Copyright (c) 2010 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import logging
17 import os
18 import select
19 import socket
20 import sys
21
22 import ovs.poller
23 import ovs.socket_util
24
25 class Stream(object):
26     """Bidirectional byte stream.  Currently only Unix domain sockets
27     are implemented."""
28     n_unix_sockets = 0
29
30     # States.
31     __S_CONNECTING = 0
32     __S_CONNECTED = 1
33     __S_DISCONNECTED = 2
34
35     # Kinds of events that one might wait for.
36     W_CONNECT = 0               # Connect complete (success or failure).
37     W_RECV = 1                  # Data received.
38     W_SEND = 2                  # Send buffer room available.
39
40     @staticmethod
41     def is_valid_name(name):
42         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
43         TYPE is a supported stream type (currently only "unix:"), otherwise
44         False."""
45         return name.startswith("unix:")
46
47     def __init__(self, socket, name, bind_path, status):
48         self.socket = socket
49         self.name = name
50         self.bind_path = bind_path
51         if status == errno.EAGAIN:
52             self.state = Stream.__S_CONNECTING
53         elif status == 0:
54             self.state = Stream.__S_CONNECTED
55         else:
56             self.state = Stream.__S_DISCONNECTED
57
58         self.error = 0
59
60     @staticmethod
61     def open(name):
62         """Attempts to connect a stream to a remote peer.  'name' is a
63         connection name in the form "TYPE:ARGS", where TYPE is an active stream
64         class's name and ARGS are stream class-specific.  Currently the only
65         supported TYPE is "unix".
66
67         Returns (error, stream): on success 'error' is 0 and 'stream' is the
68         new Stream, on failure 'error' is a positive errno value and 'stream'
69         is None.
70
71         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
72         and a new Stream.  The connect() method can be used to check for
73         successful connection completion."""
74         if not Stream.is_valid_name(name):
75             return errno.EAFNOSUPPORT, None
76
77         Stream.n_unix_sockets += 1
78         bind_path = "/tmp/stream-unix.%ld.%d" % (os.getpid(),
79                                                  Stream.n_unix_sockets)
80         connect_path = name[5:]
81         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
82                                                        True, bind_path,
83                                                        connect_path)
84         if error:
85             return error, None
86         else:
87             status = ovs.socket_util.check_connection_completion(sock)
88             return 0, Stream(sock, name, bind_path, status)
89
90     @staticmethod
91     def open_block(tuple):
92         """Blocks until a Stream completes its connection attempt, either
93         succeeding or failing.  'tuple' should be the tuple returned by
94         Stream.open().  Returns a tuple of the same form.
95
96         Typical usage:
97         error, stream = Stream.open_block(Stream.open("tcp:1.2.3.4:5"))"""
98
99         error, stream = tuple
100         if not error:
101             while True:
102                 error = stream.connect()
103                 if error != errno.EAGAIN:
104                     break
105                 stream.run()
106                 poller = ovs.poller.Poller()
107                 stream.run_wait()
108                 stream.connect_wait(poller)
109                 poller.block()
110             assert error != errno.EINPROGRESS
111         
112         if error and stream:
113             stream.close()
114             stream = None
115         return error, stream
116
117     def close(self):
118         self.socket.close()
119         if self.bind_path is not None:
120             ovs.fatal_signal.unlink_file_now(self.bind_path)
121             self.bind_path = None
122
123     def __scs_connecting(self):
124         retval = ovs.socket_util.check_connection_completion(self.socket)
125         assert retval != errno.EINPROGRESS
126         if retval == 0:
127             self.state = Stream.__S_CONNECTED
128         elif retval != errno.EAGAIN:
129             self.state = Stream.__S_DISCONNECTED
130             self.error = retval
131
132     def connect(self):
133         """Tries to complete the connection on this stream.  If the connection
134         is complete, returns 0 if the connection was successful or a positive
135         errno value if it failed.  If the connection is still in progress,
136         returns errno.EAGAIN."""
137         last_state = -1         # Always differs from initial self.state
138         while self.state != last_state:
139             last_state = self.state
140             if self.state == Stream.__S_CONNECTING:
141                 self.__scs_connecting()
142             elif self.state == Stream.__S_CONNECTED:
143                 return 0
144             elif self.state == Stream.__S_DISCONNECTED:
145                 return self.error
146
147     def recv(self, n):
148         """Tries to receive up to 'n' bytes from this stream.  Returns a
149         (error, string) tuple:
150         
151             - If successful, 'error' is zero and 'string' contains between 1
152               and 'n' bytes of data.
153
154             - On error, 'error' is a positive errno value.
155
156             - If the connection has been closed in the normal fashion or if 'n'
157               is 0, the tuple is (0, "").
158         
159         The recv function will not block waiting for data to arrive.  If no
160         data have been received, it returns (errno.EAGAIN, "") immediately."""
161
162         retval = self.connect()
163         if retval != 0:
164             return (retval, "")
165         elif n == 0:
166             return (0, "")
167
168         try:
169             return (0, self.socket.recv(n))
170         except socket.error, e:
171             return (ovs.socket_util.get_exception_errno(e), "")
172
173     def send(self, buf):
174         """Tries to send 'buf' on this stream.
175
176         If successful, returns the number of bytes sent, between 1 and
177         len(buf).  0 is only a valid return value if len(buf) is 0.
178
179         On error, returns a negative errno value.
180
181         Will not block.  If no bytes can be immediately accepted for
182         transmission, returns -errno.EAGAIN immediately."""
183
184         retval = self.connect()
185         if retval != 0:
186             return -retval
187         elif len(buf) == 0:
188             return 0
189
190         try:
191             return self.socket.send(buf)
192         except socket.error, e:
193             return -ovs.socket_util.get_exception_errno(e)
194
195     def run(self):
196         pass
197
198     def run_wait(self, poller):
199         pass
200
201     def wait(self, poller, wait):
202         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
203
204         if self.state == Stream.__S_DISCONNECTED:
205             poller.immediate_wake()
206             return
207
208         if self.state == Stream.__S_CONNECTING:
209             wait = Stream.W_CONNECT
210         if wait in (Stream.W_CONNECT, Stream.W_SEND):
211             poller.fd_wait(self.socket, select.POLLOUT)
212         else:
213             poller.fd_wait(self.socket, select.POLLIN)
214
215     def connect_wait(self, poller):
216         self.wait(poller, Stream.W_CONNECT)
217         
218     def recv_wait(self, poller):
219         self.wait(poller, Stream.W_RECV)
220         
221     def send_wait(self, poller):
222         self.wait(poller, Stream.W_SEND)
223         
224     def get_name(self):
225         return self.name
226         
227     def __del__(self):
228         # Don't delete the file: we might have forked.
229         self.socket.close()
230
231 class PassiveStream(object):
232     @staticmethod
233     def is_valid_name(name):
234         """Returns True if 'name' is a passive stream name in the form
235         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
236         "punix:"), otherwise False."""
237         return name.startswith("punix:")
238
239     def __init__(self, sock, name, bind_path):
240         self.name = name
241         self.socket = sock
242         self.bind_path = bind_path
243
244     @staticmethod
245     def open(name):
246         """Attempts to start listening for remote stream connections.  'name'
247         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
248         stream class's name and ARGS are stream class-specific.  Currently the
249         only supported TYPE is "punix".
250
251         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
252         new PassiveStream, on failure 'error' is a positive errno value and
253         'pstream' is None."""
254         if not PassiveStream.is_valid_name(name):
255             return errno.EAFNOSUPPORT, None
256
257         bind_path = name[6:]
258         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
259                                                        True, bind_path, None)
260         if error:
261             return error, None
262
263         try:
264             sock.listen(10)
265         except socket.error, e:
266             logging.error("%s: listen: %s" % (name, os.strerror(e.error)))
267             sock.close()
268             return e.error, None
269
270         return 0, PassiveStream(sock, name, bind_path)
271
272     def close(self):
273         """Closes this PassiveStream."""
274         self.socket.close()
275         if self.bind_path is not None:
276             ovs.fatal_signal.unlink_file_now(self.bind_path)
277             self.bind_path = None
278
279     def accept(self):
280         """Tries to accept a new connection on this passive stream.  Returns
281         (error, stream): if successful, 'error' is 0 and 'stream' is the new
282         Stream object, and on failure 'error' is a positive errno value and
283         'stream' is None.
284
285         Will not block waiting for a connection.  If no connection is ready to
286         be accepted, returns (errno.EAGAIN, None) immediately."""
287
288         while True:
289             try:
290                 sock, addr = self.socket.accept()
291                 ovs.socket_util.set_nonblocking(sock)
292                 return 0, Stream(sock, "unix:%s" % addr, None, 0)
293             except socket.error, e:
294                 error = ovs.socket_util.get_exception_errno(e)
295                 if error != errno.EAGAIN:
296                     # XXX rate-limit
297                     logging.debug("accept: %s" % os.strerror(error))
298                 return error, None
299
300     def wait(self, poller):
301         poller.fd_wait(self.socket, select.POLLIN)
302
303     def __del__(self):
304         # Don't delete the file: we might have forked.
305         self.socket.close()
306
307 def usage(name, active, passive, bootstrap):
308     print
309     if active:
310         print("Active %s connection methods:" % name)
311         print("  unix:FILE               "
312                "Unix domain socket named FILE");
313
314     if passive:
315         print("Passive %s connection methods:" % name)
316         print("  punix:FILE              "
317               "listen on Unix domain socket FILE")