Implement initial Python bindings for Open vSwitch database.
[sliver-openvswitch.git] / python / ovs / stream.py
1 # Copyright (c) 2010 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import logging
17 import os
18 import select
19 import socket
20 import sys
21
22 import ovs.poller
23 import ovs.socket_util
24
25 class Stream(object):
26     """Bidirectional byte stream.  Currently only Unix domain sockets
27     are implemented."""
28     n_unix_sockets = 0
29
30     # States.
31     __S_CONNECTING = 0
32     __S_CONNECTED = 1
33     __S_DISCONNECTED = 2
34
35     # Kinds of events that one might wait for.
36     W_CONNECT = 0               # Connect complete (success or failure).
37     W_RECV = 1                  # Data received.
38     W_SEND = 2                  # Send buffer room available.
39
40     @staticmethod
41     def is_valid_name(name):
42         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
43         TYPE is a supported stream type (currently only "unix:"), otherwise
44         False."""
45         return name.startswith("unix:")
46
47     def __init__(self, socket, name, bind_path, status):
48         self.socket = socket
49         self.name = name
50         self.bind_path = bind_path
51         if status == errno.EAGAIN:
52             self.state = Stream.__S_CONNECTING
53         elif status == 0:
54             self.state = Stream.__S_CONNECTED
55         else:
56             self.state = Stream.__S_DISCONNECTED
57
58         self.error = 0
59
60     @staticmethod
61     def open(name):
62         """Attempts to connect a stream to a remote peer.  'name' is a
63         connection name in the form "TYPE:ARGS", where TYPE is an active stream
64         class's name and ARGS are stream class-specific.  Currently the only
65         supported TYPE is "unix".
66
67         Returns (error, stream): on success 'error' is 0 and 'stream' is the
68         new Stream, on failure 'error' is a positive errno value and 'stream'
69         is None.
70
71         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
72         and a new Stream.  The connect() method can be used to check for
73         successful connection completion."""
74         if not Stream.is_valid_name(name):
75             return errno.EAFNOSUPPORT, None
76
77         Stream.n_unix_sockets += 1
78         bind_path = "/tmp/stream-unix.%ld.%d" % (os.getpid(),
79                                                  Stream.n_unix_sockets)
80         connect_path = name[5:]
81         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
82                                                        True, bind_path,
83                                                        connect_path)
84         if error:
85             return error, None
86         else:
87             status = ovs.socket_util.check_connection_completion(sock)
88             return 0, Stream(sock, name, bind_path, status)
89
90     @staticmethod
91     def open_block(tuple):
92         """Blocks until a Stream completes its connection attempt, either
93         succeeding or failing.  'tuple' should be the tuple returned by
94         Stream.open().  Returns a tuple of the same form.
95
96         Typical usage:
97         error, stream = Stream.open_block(Stream.open("tcp:1.2.3.4:5"))"""
98
99         error, stream = tuple
100         if not error:
101             while True:
102                 error = stream.connect()
103                 if error != errno.EAGAIN:
104                     break
105                 stream.run()
106                 poller = ovs.poller.Poller()
107                 stream.run_wait()
108                 stream.connect_wait(poller)
109                 poller.block()
110             assert error != errno.EINPROGRESS
111         
112         if error and stream:
113             stream.close()
114             stream = None
115         return error, stream
116
117     def close(self):
118         self.socket.close()
119         if self.bind_path is not None:
120             ovs.fatal_signal.unlink_file_now(self.bind_path)
121             self.bind_path = None
122
123     def __scs_connecting(self):
124         retval = ovs.socket_util.check_connection_completion(self.socket)
125         assert retval != errno.EINPROGRESS
126         if retval == 0:
127             self.state = Stream.__S_CONNECTED
128         elif retval != errno.EAGAIN:
129             self.state = Stream.__S_DISCONNECTED
130             self.error = retval
131
132     def connect(self):
133         """Tries to complete the connection on this stream.  If the connection
134         is complete, returns 0 if the connection was successful or a positive
135         errno value if it failed.  If the connection is still in progress,
136         returns errno.EAGAIN."""
137         last_state = -1         # Always differs from initial self.state
138         while self.state != last_state:
139             if self.state == Stream.__S_CONNECTING:
140                 self.__scs_connecting()
141             elif self.state == Stream.__S_CONNECTED:
142                 return 0
143             elif self.state == Stream.__S_DISCONNECTED:
144                 return self.error
145
146     def recv(self, n):
147         """Tries to receive up to 'n' bytes from this stream.  Returns a
148         (error, string) tuple:
149         
150             - If successful, 'error' is zero and 'string' contains between 1
151               and 'n' bytes of data.
152
153             - On error, 'error' is a positive errno value.
154
155             - If the connection has been closed in the normal fashion or if 'n'
156               is 0, the tuple is (0, "").
157         
158         The recv function will not block waiting for data to arrive.  If no
159         data have been received, it returns (errno.EAGAIN, "") immediately."""
160
161         retval = self.connect()
162         if retval != 0:
163             return (retval, "")
164         elif n == 0:
165             return (0, "")
166
167         try:
168             return (0, self.socket.recv(n))
169         except socket.error, e:
170             return (ovs.socket_util.get_exception_errno(e), "")
171
172     def send(self, buf):
173         """Tries to send 'buf' on this stream.
174
175         If successful, returns the number of bytes sent, between 1 and
176         len(buf).  0 is only a valid return value if len(buf) is 0.
177
178         On error, returns a negative errno value.
179
180         Will not block.  If no bytes can be immediately accepted for
181         transmission, returns -errno.EAGAIN immediately."""
182
183         retval = self.connect()
184         if retval != 0:
185             return -retval
186         elif len(buf) == 0:
187             return 0
188
189         try:
190             return self.socket.send(buf)
191         except socket.error, e:
192             return -ovs.socket_util.get_exception_errno(e)
193
194     def run(self):
195         pass
196
197     def run_wait(self, poller):
198         pass
199
200     def wait(self, poller, wait):
201         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
202
203         if self.state == Stream.__S_DISCONNECTED:
204             poller.immediate_wake()
205             return
206
207         if self.state == Stream.__S_CONNECTING:
208             wait = Stream.W_CONNECT
209         if wait in (Stream.W_CONNECT, Stream.W_SEND):
210             poller.fd_wait(self.socket, select.POLLOUT)
211         else:
212             poller.fd_wait(self.socket, select.POLLIN)
213
214     def connect_wait(self, poller):
215         self.wait(poller, Stream.W_CONNECT)
216         
217     def recv_wait(self, poller):
218         self.wait(poller, Stream.W_RECV)
219         
220     def send_wait(self, poller):
221         self.wait(poller, Stream.W_SEND)
222         
223     def get_name(self):
224         return self.name
225         
226     def __del__(self):
227         # Don't delete the file: we might have forked.
228         self.socket.close()
229
230 class PassiveStream(object):
231     @staticmethod
232     def is_valid_name(name):
233         """Returns True if 'name' is a passive stream name in the form
234         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
235         "punix:"), otherwise False."""
236         return name.startswith("punix:")
237
238     def __init__(self, sock, name, bind_path):
239         self.name = name
240         self.socket = sock
241         self.bind_path = bind_path
242
243     @staticmethod
244     def open(name):
245         """Attempts to start listening for remote stream connections.  'name'
246         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
247         stream class's name and ARGS are stream class-specific.  Currently the
248         only supported TYPE is "punix".
249
250         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
251         new PassiveStream, on failure 'error' is a positive errno value and
252         'pstream' is None."""
253         if not PassiveStream.is_valid_name(name):
254             return errno.EAFNOSUPPORT, None
255
256         bind_path = name[6:]
257         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
258                                                        True, bind_path, None)
259         if error:
260             return error, None
261
262         try:
263             sock.listen(10)
264         except socket.error, e:
265             logging.error("%s: listen: %s" % (name, os.strerror(e.error)))
266             sock.close()
267             return e.error, None
268
269         return 0, PassiveStream(sock, name, bind_path)
270
271     def close(self):
272         """Closes this PassiveStream."""
273         self.socket.close()
274         if self.bind_path is not None:
275             ovs.fatal_signal.unlink_file_now(self.bind_path)
276             self.bind_path = None
277
278     def accept(self):
279         """Tries to accept a new connection on this passive stream.  Returns
280         (error, stream): if successful, 'error' is 0 and 'stream' is the new
281         Stream object, and on failure 'error' is a positive errno value and
282         'stream' is None.
283
284         Will not block waiting for a connection.  If no connection is ready to
285         be accepted, returns (errno.EAGAIN, None) immediately."""
286
287         while True:
288             try:
289                 sock, addr = self.socket.accept()
290                 ovs.socket_util.set_nonblocking(sock)
291                 return 0, Stream(sock, "unix:%s" % addr, None, 0)
292             except socket.error, e:
293                 error = ovs.socket_util.get_exception_errno(e)
294                 if error != errno.EAGAIN:
295                     # XXX rate-limit
296                     logging.debug("accept: %s" % os.strerror(error))
297                 return error, None
298
299     def wait(self, poller):
300         poller.fd_wait(self.socket, select.POLLIN)
301
302     def __del__(self):
303         # Don't delete the file: we might have forked.
304         self.socket.close()
305
306 def usage(name, active, passive, bootstrap):
307     print
308     if active:
309         print("Active %s connection methods:" % name)
310         print("  unix:FILE               "
311                "Unix domain socket named FILE");
312
313     if passive:
314         print("Passive %s connection methods:" % name)
315         print("  punix:FILE              "
316               "listen on Unix domain socket FILE")