Merge branch 'master' of ssh://git.onelab.eu/git/sliver-openvswitch
[sliver-openvswitch.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import socket
18
19 import ovs.poller
20 import ovs.socket_util
21 import ovs.vlog
22
23 vlog = ovs.vlog.Vlog("stream")
24
25
26 def stream_or_pstream_needs_probes(name):
27     """ 1 if the stream or pstream specified by 'name' needs periodic probes to
28     verify connectivity.  For [p]streams which need probes, it can take a long
29     time to notice the connection was dropped.  Returns 0 if probes aren't
30     needed, and -1 if 'name' is invalid"""
31
32     if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
33         # Only unix and punix are supported currently.
34         return 0
35     else:
36         return -1
37
38
39 class Stream(object):
40     """Bidirectional byte stream.  Currently only Unix domain sockets
41     are implemented."""
42
43     # States.
44     __S_CONNECTING = 0
45     __S_CONNECTED = 1
46     __S_DISCONNECTED = 2
47
48     # Kinds of events that one might wait for.
49     W_CONNECT = 0               # Connect complete (success or failure).
50     W_RECV = 1                  # Data received.
51     W_SEND = 2                  # Send buffer room available.
52
53     _SOCKET_METHODS = {}
54
55     @staticmethod
56     def register_method(method, cls):
57         Stream._SOCKET_METHODS[method + ":"] = cls
58
59     @staticmethod
60     def _find_method(name):
61         for method, cls in Stream._SOCKET_METHODS.items():
62             if name.startswith(method):
63                 return cls
64         return None
65
66     @staticmethod
67     def is_valid_name(name):
68         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
69         TYPE is a supported stream type (currently only "unix:" and "tcp:"),
70         otherwise False."""
71         return bool(Stream._find_method(name))
72
73     def __init__(self, socket, name, status):
74         self.socket = socket
75         self.name = name
76         if status == errno.EAGAIN:
77             self.state = Stream.__S_CONNECTING
78         elif status == 0:
79             self.state = Stream.__S_CONNECTED
80         else:
81             self.state = Stream.__S_DISCONNECTED
82
83         self.error = 0
84
85     # Default value of dscp bits for connection between controller and manager.
86     # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
87     # in <netinet/ip.h> is used.
88     IPTOS_PREC_INTERNETCONTROL = 0xc0
89     DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
90
91     @staticmethod
92     def open(name, dscp=DSCP_DEFAULT):
93         """Attempts to connect a stream to a remote peer.  'name' is a
94         connection name in the form "TYPE:ARGS", where TYPE is an active stream
95         class's name and ARGS are stream class-specific.  Currently the only
96         supported TYPEs are "unix" and "tcp".
97
98         Returns (error, stream): on success 'error' is 0 and 'stream' is the
99         new Stream, on failure 'error' is a positive errno value and 'stream'
100         is None.
101
102         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
103         and a new Stream.  The connect() method can be used to check for
104         successful connection completion."""
105         cls = Stream._find_method(name)
106         if not cls:
107             return errno.EAFNOSUPPORT, None
108
109         suffix = name.split(":", 1)[1]
110         error, sock = cls._open(suffix, dscp)
111         if error:
112             return error, None
113         else:
114             status = ovs.socket_util.check_connection_completion(sock)
115             return 0, Stream(sock, name, status)
116
117     @staticmethod
118     def _open(suffix, dscp):
119         raise NotImplementedError("This method must be overrided by subclass")
120
121     @staticmethod
122     def open_block((error, stream)):
123         """Blocks until a Stream completes its connection attempt, either
124         succeeding or failing.  (error, stream) should be the tuple returned by
125         Stream.open().  Returns a tuple of the same form.
126
127         Typical usage:
128         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
129
130         if not error:
131             while True:
132                 error = stream.connect()
133                 if error != errno.EAGAIN:
134                     break
135                 stream.run()
136                 poller = ovs.poller.Poller()
137                 stream.run_wait(poller)
138                 stream.connect_wait(poller)
139                 poller.block()
140             assert error != errno.EINPROGRESS
141
142         if error and stream:
143             stream.close()
144             stream = None
145         return error, stream
146
147     def close(self):
148         self.socket.close()
149
150     def __scs_connecting(self):
151         retval = ovs.socket_util.check_connection_completion(self.socket)
152         assert retval != errno.EINPROGRESS
153         if retval == 0:
154             self.state = Stream.__S_CONNECTED
155         elif retval != errno.EAGAIN:
156             self.state = Stream.__S_DISCONNECTED
157             self.error = retval
158
159     def connect(self):
160         """Tries to complete the connection on this stream.  If the connection
161         is complete, returns 0 if the connection was successful or a positive
162         errno value if it failed.  If the connection is still in progress,
163         returns errno.EAGAIN."""
164
165         if self.state == Stream.__S_CONNECTING:
166             self.__scs_connecting()
167
168         if self.state == Stream.__S_CONNECTING:
169             return errno.EAGAIN
170         elif self.state == Stream.__S_CONNECTED:
171             return 0
172         else:
173             assert self.state == Stream.__S_DISCONNECTED
174             return self.error
175
176     def recv(self, n):
177         """Tries to receive up to 'n' bytes from this stream.  Returns a
178         (error, string) tuple:
179
180             - If successful, 'error' is zero and 'string' contains between 1
181               and 'n' bytes of data.
182
183             - On error, 'error' is a positive errno value.
184
185             - If the connection has been closed in the normal fashion or if 'n'
186               is 0, the tuple is (0, "").
187
188         The recv function will not block waiting for data to arrive.  If no
189         data have been received, it returns (errno.EAGAIN, "") immediately."""
190
191         retval = self.connect()
192         if retval != 0:
193             return (retval, "")
194         elif n == 0:
195             return (0, "")
196
197         try:
198             return (0, self.socket.recv(n))
199         except socket.error, e:
200             return (ovs.socket_util.get_exception_errno(e), "")
201
202     def send(self, buf):
203         """Tries to send 'buf' on this stream.
204
205         If successful, returns the number of bytes sent, between 1 and
206         len(buf).  0 is only a valid return value if len(buf) is 0.
207
208         On error, returns a negative errno value.
209
210         Will not block.  If no bytes can be immediately accepted for
211         transmission, returns -errno.EAGAIN immediately."""
212
213         retval = self.connect()
214         if retval != 0:
215             return -retval
216         elif len(buf) == 0:
217             return 0
218
219         try:
220             return self.socket.send(buf)
221         except socket.error, e:
222             return -ovs.socket_util.get_exception_errno(e)
223
224     def run(self):
225         pass
226
227     def run_wait(self, poller):
228         pass
229
230     def wait(self, poller, wait):
231         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
232
233         if self.state == Stream.__S_DISCONNECTED:
234             poller.immediate_wake()
235             return
236
237         if self.state == Stream.__S_CONNECTING:
238             wait = Stream.W_CONNECT
239         if wait == Stream.W_RECV:
240             poller.fd_wait(self.socket, ovs.poller.POLLIN)
241         else:
242             poller.fd_wait(self.socket, ovs.poller.POLLOUT)
243
244     def connect_wait(self, poller):
245         self.wait(poller, Stream.W_CONNECT)
246
247     def recv_wait(self, poller):
248         self.wait(poller, Stream.W_RECV)
249
250     def send_wait(self, poller):
251         self.wait(poller, Stream.W_SEND)
252
253     def __del__(self):
254         # Don't delete the file: we might have forked.
255         self.socket.close()
256
257
258 class PassiveStream(object):
259     @staticmethod
260     def is_valid_name(name):
261         """Returns True if 'name' is a passive stream name in the form
262         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
263         "punix:"), otherwise False."""
264         return name.startswith("punix:")
265
266     def __init__(self, sock, name, bind_path):
267         self.name = name
268         self.socket = sock
269         self.bind_path = bind_path
270
271     @staticmethod
272     def open(name):
273         """Attempts to start listening for remote stream connections.  'name'
274         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
275         stream class's name and ARGS are stream class-specific.  Currently the
276         only supported TYPE is "punix".
277
278         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
279         new PassiveStream, on failure 'error' is a positive errno value and
280         'pstream' is None."""
281         if not PassiveStream.is_valid_name(name):
282             return errno.EAFNOSUPPORT, None
283
284         bind_path = name[6:]
285         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
286                                                        True, bind_path, None)
287         if error:
288             return error, None
289
290         try:
291             sock.listen(10)
292         except socket.error, e:
293             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
294             sock.close()
295             return e.error, None
296
297         return 0, PassiveStream(sock, name, bind_path)
298
299     def close(self):
300         """Closes this PassiveStream."""
301         self.socket.close()
302         if self.bind_path is not None:
303             ovs.fatal_signal.unlink_file_now(self.bind_path)
304             self.bind_path = None
305
306     def accept(self):
307         """Tries to accept a new connection on this passive stream.  Returns
308         (error, stream): if successful, 'error' is 0 and 'stream' is the new
309         Stream object, and on failure 'error' is a positive errno value and
310         'stream' is None.
311
312         Will not block waiting for a connection.  If no connection is ready to
313         be accepted, returns (errno.EAGAIN, None) immediately."""
314
315         while True:
316             try:
317                 sock, addr = self.socket.accept()
318                 ovs.socket_util.set_nonblocking(sock)
319                 return 0, Stream(sock, "unix:%s" % addr, 0)
320             except socket.error, e:
321                 error = ovs.socket_util.get_exception_errno(e)
322                 if error != errno.EAGAIN:
323                     # XXX rate-limit
324                     vlog.dbg("accept: %s" % os.strerror(error))
325                 return error, None
326
327     def wait(self, poller):
328         poller.fd_wait(self.socket, ovs.poller.POLLIN)
329
330     def __del__(self):
331         # Don't delete the file: we might have forked.
332         self.socket.close()
333
334
335 def usage(name):
336     return """
337 Active %s connection methods:
338   unix:FILE               Unix domain socket named FILE
339   tcp:IP:PORT             TCP socket to IP with port no of PORT
340
341 Passive %s connection methods:
342   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
343
344
345 class UnixStream(Stream):
346     @staticmethod
347     def _open(suffix, dscp):
348         connect_path = suffix
349         return  ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
350                                                  True, None, connect_path)
351 Stream.register_method("unix", UnixStream)
352
353
354 class TCPStream(Stream):
355     @staticmethod
356     def _open(suffix, dscp):
357         error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
358                                                        suffix, 0, dscp)
359         if not error:
360             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
361         return error, sock
362 Stream.register_method("tcp", TCPStream)