ovs.stream: Use %d in place of %ld since the two are equivalent in Python.
[sliver-openvswitch.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import logging
17 import os
18 import select
19 import socket
20 import sys
21
22 import ovs.poller
23 import ovs.socket_util
24
25 class Stream(object):
26     """Bidirectional byte stream.  Currently only Unix domain sockets
27     are implemented."""
28     n_unix_sockets = 0
29
30     # States.
31     __S_CONNECTING = 0
32     __S_CONNECTED = 1
33     __S_DISCONNECTED = 2
34
35     # Kinds of events that one might wait for.
36     W_CONNECT = 0               # Connect complete (success or failure).
37     W_RECV = 1                  # Data received.
38     W_SEND = 2                  # Send buffer room available.
39
40     @staticmethod
41     def is_valid_name(name):
42         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
43         TYPE is a supported stream type (currently only "unix:"), otherwise
44         False."""
45         return name.startswith("unix:")
46
47     def __init__(self, socket, name, bind_path, status):
48         self.socket = socket
49         self.name = name
50         self.bind_path = bind_path
51         if status == errno.EAGAIN:
52             self.state = Stream.__S_CONNECTING
53         elif status == 0:
54             self.state = Stream.__S_CONNECTED
55         else:
56             self.state = Stream.__S_DISCONNECTED
57
58         self.error = 0
59
60     @staticmethod
61     def open(name):
62         """Attempts to connect a stream to a remote peer.  'name' is a
63         connection name in the form "TYPE:ARGS", where TYPE is an active stream
64         class's name and ARGS are stream class-specific.  Currently the only
65         supported TYPE is "unix".
66
67         Returns (error, stream): on success 'error' is 0 and 'stream' is the
68         new Stream, on failure 'error' is a positive errno value and 'stream'
69         is None.
70
71         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
72         and a new Stream.  The connect() method can be used to check for
73         successful connection completion."""
74         if not Stream.is_valid_name(name):
75             return errno.EAFNOSUPPORT, None
76
77         Stream.n_unix_sockets += 1
78         bind_path = "/tmp/stream-unix.%d.%d" % (os.getpid(),
79                                                 Stream.n_unix_sockets)
80         connect_path = name[5:]
81         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
82                                                        True, bind_path,
83                                                        connect_path)
84         if error:
85             return error, None
86         else:
87             status = ovs.socket_util.check_connection_completion(sock)
88             return 0, Stream(sock, name, bind_path, status)
89
90     @staticmethod
91     def open_block((error, stream)):
92         """Blocks until a Stream completes its connection attempt, either
93         succeeding or failing.  (error, stream) should be the tuple returned by
94         Stream.open().  Returns a tuple of the same form.
95
96         Typical usage:
97         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
98
99         if not error:
100             while True:
101                 error = stream.connect()
102                 if error != errno.EAGAIN:
103                     break
104                 stream.run()
105                 poller = ovs.poller.Poller()
106                 stream.run_wait()
107                 stream.connect_wait(poller)
108                 poller.block()
109             assert error != errno.EINPROGRESS
110         
111         if error and stream:
112             stream.close()
113             stream = None
114         return error, stream
115
116     def close(self):
117         self.socket.close()
118         if self.bind_path is not None:
119             ovs.fatal_signal.unlink_file_now(self.bind_path)
120             self.bind_path = None
121
122     def __scs_connecting(self):
123         retval = ovs.socket_util.check_connection_completion(self.socket)
124         assert retval != errno.EINPROGRESS
125         if retval == 0:
126             self.state = Stream.__S_CONNECTED
127         elif retval != errno.EAGAIN:
128             self.state = Stream.__S_DISCONNECTED
129             self.error = retval
130
131     def connect(self):
132         """Tries to complete the connection on this stream.  If the connection
133         is complete, returns 0 if the connection was successful or a positive
134         errno value if it failed.  If the connection is still in progress,
135         returns errno.EAGAIN."""
136         last_state = -1         # Always differs from initial self.state
137         while self.state != last_state:
138             last_state = self.state
139             if self.state == Stream.__S_CONNECTING:
140                 self.__scs_connecting()
141             elif self.state == Stream.__S_CONNECTED:
142                 return 0
143             elif self.state == Stream.__S_DISCONNECTED:
144                 return self.error
145
146     def recv(self, n):
147         """Tries to receive up to 'n' bytes from this stream.  Returns a
148         (error, string) tuple:
149         
150             - If successful, 'error' is zero and 'string' contains between 1
151               and 'n' bytes of data.
152
153             - On error, 'error' is a positive errno value.
154
155             - If the connection has been closed in the normal fashion or if 'n'
156               is 0, the tuple is (0, "").
157         
158         The recv function will not block waiting for data to arrive.  If no
159         data have been received, it returns (errno.EAGAIN, "") immediately."""
160
161         retval = self.connect()
162         if retval != 0:
163             return (retval, "")
164         elif n == 0:
165             return (0, "")
166
167         try:
168             return (0, self.socket.recv(n))
169         except socket.error, e:
170             return (ovs.socket_util.get_exception_errno(e), "")
171
172     def send(self, buf):
173         """Tries to send 'buf' on this stream.
174
175         If successful, returns the number of bytes sent, between 1 and
176         len(buf).  0 is only a valid return value if len(buf) is 0.
177
178         On error, returns a negative errno value.
179
180         Will not block.  If no bytes can be immediately accepted for
181         transmission, returns -errno.EAGAIN immediately."""
182
183         retval = self.connect()
184         if retval != 0:
185             return -retval
186         elif len(buf) == 0:
187             return 0
188
189         try:
190             return self.socket.send(buf)
191         except socket.error, e:
192             return -ovs.socket_util.get_exception_errno(e)
193
194     def run(self):
195         pass
196
197     def run_wait(self, poller):
198         pass
199
200     def wait(self, poller, wait):
201         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
202
203         if self.state == Stream.__S_DISCONNECTED:
204             poller.immediate_wake()
205             return
206
207         if self.state == Stream.__S_CONNECTING:
208             wait = Stream.W_CONNECT
209         if wait in (Stream.W_CONNECT, Stream.W_SEND):
210             poller.fd_wait(self.socket, select.POLLOUT)
211         else:
212             poller.fd_wait(self.socket, select.POLLIN)
213
214     def connect_wait(self, poller):
215         self.wait(poller, Stream.W_CONNECT)
216         
217     def recv_wait(self, poller):
218         self.wait(poller, Stream.W_RECV)
219         
220     def send_wait(self, poller):
221         self.wait(poller, Stream.W_SEND)
222         
223     def get_name(self):
224         return self.name
225         
226     def __del__(self):
227         # Don't delete the file: we might have forked.
228         self.socket.close()
229
230 class PassiveStream(object):
231     @staticmethod
232     def is_valid_name(name):
233         """Returns True if 'name' is a passive stream name in the form
234         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
235         "punix:"), otherwise False."""
236         return name.startswith("punix:")
237
238     def __init__(self, sock, name, bind_path):
239         self.name = name
240         self.socket = sock
241         self.bind_path = bind_path
242
243     @staticmethod
244     def open(name):
245         """Attempts to start listening for remote stream connections.  'name'
246         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
247         stream class's name and ARGS are stream class-specific.  Currently the
248         only supported TYPE is "punix".
249
250         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
251         new PassiveStream, on failure 'error' is a positive errno value and
252         'pstream' is None."""
253         if not PassiveStream.is_valid_name(name):
254             return errno.EAFNOSUPPORT, None
255
256         bind_path = name[6:]
257         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
258                                                        True, bind_path, None)
259         if error:
260             return error, None
261
262         try:
263             sock.listen(10)
264         except socket.error, e:
265             logging.error("%s: listen: %s" % (name, os.strerror(e.error)))
266             sock.close()
267             return e.error, None
268
269         return 0, PassiveStream(sock, name, bind_path)
270
271     def close(self):
272         """Closes this PassiveStream."""
273         self.socket.close()
274         if self.bind_path is not None:
275             ovs.fatal_signal.unlink_file_now(self.bind_path)
276             self.bind_path = None
277
278     def accept(self):
279         """Tries to accept a new connection on this passive stream.  Returns
280         (error, stream): if successful, 'error' is 0 and 'stream' is the new
281         Stream object, and on failure 'error' is a positive errno value and
282         'stream' is None.
283
284         Will not block waiting for a connection.  If no connection is ready to
285         be accepted, returns (errno.EAGAIN, None) immediately."""
286
287         while True:
288             try:
289                 sock, addr = self.socket.accept()
290                 ovs.socket_util.set_nonblocking(sock)
291                 return 0, Stream(sock, "unix:%s" % addr, None, 0)
292             except socket.error, e:
293                 error = ovs.socket_util.get_exception_errno(e)
294                 if error != errno.EAGAIN:
295                     # XXX rate-limit
296                     logging.debug("accept: %s" % os.strerror(error))
297                 return error, None
298
299     def wait(self, poller):
300         poller.fd_wait(self.socket, select.POLLIN)
301
302     def __del__(self):
303         # Don't delete the file: we might have forked.
304         self.socket.close()
305
306 def usage(name, active, passive, bootstrap):
307     print
308     if active:
309         print("Active %s connection methods:" % name)
310         print("  unix:FILE               "
311                "Unix domain socket named FILE");
312
313     if passive:
314         print("Passive %s connection methods:" % name)
315         print("  punix:FILE              "
316               "listen on Unix domain socket FILE")