ovs.daemon: Fix bug introduced by "pychecker" warning fixes.
[sliver-openvswitch.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import select
18 import socket
19
20 import ovs.poller
21 import ovs.socket_util
22 import ovs.vlog
23
24 vlog = ovs.vlog.Vlog("stream")
25
26
27 class Stream(object):
28     """Bidirectional byte stream.  Currently only Unix domain sockets
29     are implemented."""
30     n_unix_sockets = 0
31
32     # States.
33     __S_CONNECTING = 0
34     __S_CONNECTED = 1
35     __S_DISCONNECTED = 2
36
37     # Kinds of events that one might wait for.
38     W_CONNECT = 0               # Connect complete (success or failure).
39     W_RECV = 1                  # Data received.
40     W_SEND = 2                  # Send buffer room available.
41
42     @staticmethod
43     def is_valid_name(name):
44         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
45         TYPE is a supported stream type (currently only "unix:"), otherwise
46         False."""
47         return name.startswith("unix:")
48
49     def __init__(self, socket, name, bind_path, status):
50         self.socket = socket
51         self.name = name
52         self.bind_path = bind_path
53         if status == errno.EAGAIN:
54             self.state = Stream.__S_CONNECTING
55         elif status == 0:
56             self.state = Stream.__S_CONNECTED
57         else:
58             self.state = Stream.__S_DISCONNECTED
59
60         self.error = 0
61
62     @staticmethod
63     def open(name):
64         """Attempts to connect a stream to a remote peer.  'name' is a
65         connection name in the form "TYPE:ARGS", where TYPE is an active stream
66         class's name and ARGS are stream class-specific.  Currently the only
67         supported TYPE is "unix".
68
69         Returns (error, stream): on success 'error' is 0 and 'stream' is the
70         new Stream, on failure 'error' is a positive errno value and 'stream'
71         is None.
72
73         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
74         and a new Stream.  The connect() method can be used to check for
75         successful connection completion."""
76         if not Stream.is_valid_name(name):
77             return errno.EAFNOSUPPORT, None
78
79         Stream.n_unix_sockets += 1
80         bind_path = "/tmp/stream-unix.%d.%d" % (os.getpid(),
81                                                 Stream.n_unix_sockets)
82         connect_path = name[5:]
83         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
84                                                        True, bind_path,
85                                                        connect_path)
86         if error:
87             return error, None
88         else:
89             status = ovs.socket_util.check_connection_completion(sock)
90             return 0, Stream(sock, name, bind_path, status)
91
92     @staticmethod
93     def open_block((error, stream)):
94         """Blocks until a Stream completes its connection attempt, either
95         succeeding or failing.  (error, stream) should be the tuple returned by
96         Stream.open().  Returns a tuple of the same form.
97
98         Typical usage:
99         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
100
101         if not error:
102             while True:
103                 error = stream.connect()
104                 if error != errno.EAGAIN:
105                     break
106                 stream.run()
107                 poller = ovs.poller.Poller()
108                 stream.run_wait()
109                 stream.connect_wait(poller)
110                 poller.block()
111             assert error != errno.EINPROGRESS
112
113         if error and stream:
114             stream.close()
115             stream = None
116         return error, stream
117
118     def close(self):
119         self.socket.close()
120         if self.bind_path is not None:
121             ovs.fatal_signal.unlink_file_now(self.bind_path)
122             self.bind_path = None
123
124     def __scs_connecting(self):
125         retval = ovs.socket_util.check_connection_completion(self.socket)
126         assert retval != errno.EINPROGRESS
127         if retval == 0:
128             self.state = Stream.__S_CONNECTED
129         elif retval != errno.EAGAIN:
130             self.state = Stream.__S_DISCONNECTED
131             self.error = retval
132
133     def connect(self):
134         """Tries to complete the connection on this stream.  If the connection
135         is complete, returns 0 if the connection was successful or a positive
136         errno value if it failed.  If the connection is still in progress,
137         returns errno.EAGAIN."""
138         last_state = -1         # Always differs from initial self.state
139         while self.state != last_state:
140             last_state = self.state
141             if self.state == Stream.__S_CONNECTING:
142                 self.__scs_connecting()
143             elif self.state == Stream.__S_CONNECTED:
144                 return 0
145             elif self.state == Stream.__S_DISCONNECTED:
146                 return self.error
147
148     def recv(self, n):
149         """Tries to receive up to 'n' bytes from this stream.  Returns a
150         (error, string) tuple:
151
152             - If successful, 'error' is zero and 'string' contains between 1
153               and 'n' bytes of data.
154
155             - On error, 'error' is a positive errno value.
156
157             - If the connection has been closed in the normal fashion or if 'n'
158               is 0, the tuple is (0, "").
159
160         The recv function will not block waiting for data to arrive.  If no
161         data have been received, it returns (errno.EAGAIN, "") immediately."""
162
163         retval = self.connect()
164         if retval != 0:
165             return (retval, "")
166         elif n == 0:
167             return (0, "")
168
169         try:
170             return (0, self.socket.recv(n))
171         except socket.error, e:
172             return (ovs.socket_util.get_exception_errno(e), "")
173
174     def send(self, buf):
175         """Tries to send 'buf' on this stream.
176
177         If successful, returns the number of bytes sent, between 1 and
178         len(buf).  0 is only a valid return value if len(buf) is 0.
179
180         On error, returns a negative errno value.
181
182         Will not block.  If no bytes can be immediately accepted for
183         transmission, returns -errno.EAGAIN immediately."""
184
185         retval = self.connect()
186         if retval != 0:
187             return -retval
188         elif len(buf) == 0:
189             return 0
190
191         try:
192             return self.socket.send(buf)
193         except socket.error, e:
194             return -ovs.socket_util.get_exception_errno(e)
195
196     def run(self):
197         pass
198
199     def run_wait(self, poller):
200         pass
201
202     def wait(self, poller, wait):
203         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
204
205         if self.state == Stream.__S_DISCONNECTED:
206             poller.immediate_wake()
207             return
208
209         if self.state == Stream.__S_CONNECTING:
210             wait = Stream.W_CONNECT
211         if wait == Stream.W_RECV:
212             poller.fd_wait(self.socket, select.POLLIN)
213         else:
214             poller.fd_wait(self.socket, select.POLLOUT)
215
216     def connect_wait(self, poller):
217         self.wait(poller, Stream.W_CONNECT)
218
219     def recv_wait(self, poller):
220         self.wait(poller, Stream.W_RECV)
221
222     def send_wait(self, poller):
223         self.wait(poller, Stream.W_SEND)
224
225     def __del__(self):
226         # Don't delete the file: we might have forked.
227         self.socket.close()
228
229
230 class PassiveStream(object):
231     @staticmethod
232     def is_valid_name(name):
233         """Returns True if 'name' is a passive stream name in the form
234         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
235         "punix:"), otherwise False."""
236         return name.startswith("punix:")
237
238     def __init__(self, sock, name, bind_path):
239         self.name = name
240         self.socket = sock
241         self.bind_path = bind_path
242
243     @staticmethod
244     def open(name):
245         """Attempts to start listening for remote stream connections.  'name'
246         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
247         stream class's name and ARGS are stream class-specific.  Currently the
248         only supported TYPE is "punix".
249
250         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
251         new PassiveStream, on failure 'error' is a positive errno value and
252         'pstream' is None."""
253         if not PassiveStream.is_valid_name(name):
254             return errno.EAFNOSUPPORT, None
255
256         bind_path = name[6:]
257         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
258                                                        True, bind_path, None)
259         if error:
260             return error, None
261
262         try:
263             sock.listen(10)
264         except socket.error, e:
265             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
266             sock.close()
267             return e.error, None
268
269         return 0, PassiveStream(sock, name, bind_path)
270
271     def close(self):
272         """Closes this PassiveStream."""
273         self.socket.close()
274         if self.bind_path is not None:
275             ovs.fatal_signal.unlink_file_now(self.bind_path)
276             self.bind_path = None
277
278     def accept(self):
279         """Tries to accept a new connection on this passive stream.  Returns
280         (error, stream): if successful, 'error' is 0 and 'stream' is the new
281         Stream object, and on failure 'error' is a positive errno value and
282         'stream' is None.
283
284         Will not block waiting for a connection.  If no connection is ready to
285         be accepted, returns (errno.EAGAIN, None) immediately."""
286
287         while True:
288             try:
289                 sock, addr = self.socket.accept()
290                 ovs.socket_util.set_nonblocking(sock)
291                 return 0, Stream(sock, "unix:%s" % addr, None, 0)
292             except socket.error, e:
293                 error = ovs.socket_util.get_exception_errno(e)
294                 if error != errno.EAGAIN:
295                     # XXX rate-limit
296                     vlog.dbg("accept: %s" % os.strerror(error))
297                 return error, None
298
299     def wait(self, poller):
300         poller.fd_wait(self.socket, select.POLLIN)
301
302     def __del__(self):
303         # Don't delete the file: we might have forked.
304         self.socket.close()
305
306
307 def usage(name):
308     return """
309 Active %s connection methods:
310   unix:FILE               Unix domain socket named FILE
311
312 Passive %s connection methods:
313   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)