Setting tag sliver-openvswitch-1.9.90-2
[sliver-openvswitch.git] / python / ovs / stream.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17 import select
18 import socket
19
20 import ovs.poller
21 import ovs.socket_util
22 import ovs.vlog
23
24 vlog = ovs.vlog.Vlog("stream")
25
26
27 def stream_or_pstream_needs_probes(name):
28     """ 1 if the stream or pstream specified by 'name' needs periodic probes to
29     verify connectivity.  For [p]streams which need probes, it can take a long
30     time to notice the connection was dropped.  Returns 0 if probes aren't
31     needed, and -1 if 'name' is invalid"""
32
33     if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
34         # Only unix and punix are supported currently.
35         return 0
36     else:
37         return -1
38
39
40 class Stream(object):
41     """Bidirectional byte stream.  Currently only Unix domain sockets
42     are implemented."""
43
44     # States.
45     __S_CONNECTING = 0
46     __S_CONNECTED = 1
47     __S_DISCONNECTED = 2
48
49     # Kinds of events that one might wait for.
50     W_CONNECT = 0               # Connect complete (success or failure).
51     W_RECV = 1                  # Data received.
52     W_SEND = 2                  # Send buffer room available.
53
54     _SOCKET_METHODS = {}
55
56     @staticmethod
57     def register_method(method, cls):
58         Stream._SOCKET_METHODS[method + ":"] = cls
59
60     @staticmethod
61     def _find_method(name):
62         for method, cls in Stream._SOCKET_METHODS.items():
63             if name.startswith(method):
64                 return cls
65         return None
66
67     @staticmethod
68     def is_valid_name(name):
69         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
70         TYPE is a supported stream type (currently only "unix:" and "tcp:"),
71         otherwise False."""
72         return bool(Stream._find_method(name))
73
74     def __init__(self, socket, name, status):
75         self.socket = socket
76         self.name = name
77         if status == errno.EAGAIN:
78             self.state = Stream.__S_CONNECTING
79         elif status == 0:
80             self.state = Stream.__S_CONNECTED
81         else:
82             self.state = Stream.__S_DISCONNECTED
83
84         self.error = 0
85
86     # Default value of dscp bits for connection between controller and manager.
87     # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
88     # in <netinet/ip.h> is used.
89     IPTOS_PREC_INTERNETCONTROL = 0xc0
90     DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
91
92     @staticmethod
93     def open(name, dscp=DSCP_DEFAULT):
94         """Attempts to connect a stream to a remote peer.  'name' is a
95         connection name in the form "TYPE:ARGS", where TYPE is an active stream
96         class's name and ARGS are stream class-specific.  Currently the only
97         supported TYPEs are "unix" and "tcp".
98
99         Returns (error, stream): on success 'error' is 0 and 'stream' is the
100         new Stream, on failure 'error' is a positive errno value and 'stream'
101         is None.
102
103         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
104         and a new Stream.  The connect() method can be used to check for
105         successful connection completion."""
106         cls = Stream._find_method(name)
107         if not cls:
108             return errno.EAFNOSUPPORT, None
109
110         suffix = name.split(":", 1)[1]
111         error, sock = cls._open(suffix, dscp)
112         if error:
113             return error, None
114         else:
115             status = ovs.socket_util.check_connection_completion(sock)
116             return 0, Stream(sock, name, status)
117
118     @staticmethod
119     def _open(suffix, dscp):
120         raise NotImplementedError("This method must be overrided by subclass")
121
122     @staticmethod
123     def open_block((error, stream)):
124         """Blocks until a Stream completes its connection attempt, either
125         succeeding or failing.  (error, stream) should be the tuple returned by
126         Stream.open().  Returns a tuple of the same form.
127
128         Typical usage:
129         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
130
131         if not error:
132             while True:
133                 error = stream.connect()
134                 if error != errno.EAGAIN:
135                     break
136                 stream.run()
137                 poller = ovs.poller.Poller()
138                 stream.run_wait(poller)
139                 stream.connect_wait(poller)
140                 poller.block()
141             assert error != errno.EINPROGRESS
142
143         if error and stream:
144             stream.close()
145             stream = None
146         return error, stream
147
148     def close(self):
149         self.socket.close()
150
151     def __scs_connecting(self):
152         retval = ovs.socket_util.check_connection_completion(self.socket)
153         assert retval != errno.EINPROGRESS
154         if retval == 0:
155             self.state = Stream.__S_CONNECTED
156         elif retval != errno.EAGAIN:
157             self.state = Stream.__S_DISCONNECTED
158             self.error = retval
159
160     def connect(self):
161         """Tries to complete the connection on this stream.  If the connection
162         is complete, returns 0 if the connection was successful or a positive
163         errno value if it failed.  If the connection is still in progress,
164         returns errno.EAGAIN."""
165         last_state = -1         # Always differs from initial self.state
166         while self.state != last_state:
167             last_state = self.state
168             if self.state == Stream.__S_CONNECTING:
169                 self.__scs_connecting()
170             elif self.state == Stream.__S_CONNECTED:
171                 return 0
172             elif self.state == Stream.__S_DISCONNECTED:
173                 return self.error
174
175     def recv(self, n):
176         """Tries to receive up to 'n' bytes from this stream.  Returns a
177         (error, string) tuple:
178
179             - If successful, 'error' is zero and 'string' contains between 1
180               and 'n' bytes of data.
181
182             - On error, 'error' is a positive errno value.
183
184             - If the connection has been closed in the normal fashion or if 'n'
185               is 0, the tuple is (0, "").
186
187         The recv function will not block waiting for data to arrive.  If no
188         data have been received, it returns (errno.EAGAIN, "") immediately."""
189
190         retval = self.connect()
191         if retval != 0:
192             return (retval, "")
193         elif n == 0:
194             return (0, "")
195
196         try:
197             return (0, self.socket.recv(n))
198         except socket.error, e:
199             return (ovs.socket_util.get_exception_errno(e), "")
200
201     def send(self, buf):
202         """Tries to send 'buf' on this stream.
203
204         If successful, returns the number of bytes sent, between 1 and
205         len(buf).  0 is only a valid return value if len(buf) is 0.
206
207         On error, returns a negative errno value.
208
209         Will not block.  If no bytes can be immediately accepted for
210         transmission, returns -errno.EAGAIN immediately."""
211
212         retval = self.connect()
213         if retval != 0:
214             return -retval
215         elif len(buf) == 0:
216             return 0
217
218         try:
219             return self.socket.send(buf)
220         except socket.error, e:
221             return -ovs.socket_util.get_exception_errno(e)
222
223     def run(self):
224         pass
225
226     def run_wait(self, poller):
227         pass
228
229     def wait(self, poller, wait):
230         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
231
232         if self.state == Stream.__S_DISCONNECTED:
233             poller.immediate_wake()
234             return
235
236         if self.state == Stream.__S_CONNECTING:
237             wait = Stream.W_CONNECT
238         if wait == Stream.W_RECV:
239             poller.fd_wait(self.socket, select.POLLIN)
240         else:
241             poller.fd_wait(self.socket, select.POLLOUT)
242
243     def connect_wait(self, poller):
244         self.wait(poller, Stream.W_CONNECT)
245
246     def recv_wait(self, poller):
247         self.wait(poller, Stream.W_RECV)
248
249     def send_wait(self, poller):
250         self.wait(poller, Stream.W_SEND)
251
252     def __del__(self):
253         # Don't delete the file: we might have forked.
254         self.socket.close()
255
256
257 class PassiveStream(object):
258     @staticmethod
259     def is_valid_name(name):
260         """Returns True if 'name' is a passive stream name in the form
261         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
262         "punix:"), otherwise False."""
263         return name.startswith("punix:")
264
265     def __init__(self, sock, name, bind_path):
266         self.name = name
267         self.socket = sock
268         self.bind_path = bind_path
269
270     @staticmethod
271     def open(name):
272         """Attempts to start listening for remote stream connections.  'name'
273         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
274         stream class's name and ARGS are stream class-specific.  Currently the
275         only supported TYPE is "punix".
276
277         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
278         new PassiveStream, on failure 'error' is a positive errno value and
279         'pstream' is None."""
280         if not PassiveStream.is_valid_name(name):
281             return errno.EAFNOSUPPORT, None
282
283         bind_path = name[6:]
284         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
285                                                        True, bind_path, None)
286         if error:
287             return error, None
288
289         try:
290             sock.listen(10)
291         except socket.error, e:
292             vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
293             sock.close()
294             return e.error, None
295
296         return 0, PassiveStream(sock, name, bind_path)
297
298     def close(self):
299         """Closes this PassiveStream."""
300         self.socket.close()
301         if self.bind_path is not None:
302             ovs.fatal_signal.unlink_file_now(self.bind_path)
303             self.bind_path = None
304
305     def accept(self):
306         """Tries to accept a new connection on this passive stream.  Returns
307         (error, stream): if successful, 'error' is 0 and 'stream' is the new
308         Stream object, and on failure 'error' is a positive errno value and
309         'stream' is None.
310
311         Will not block waiting for a connection.  If no connection is ready to
312         be accepted, returns (errno.EAGAIN, None) immediately."""
313
314         while True:
315             try:
316                 sock, addr = self.socket.accept()
317                 ovs.socket_util.set_nonblocking(sock)
318                 return 0, Stream(sock, "unix:%s" % addr, 0)
319             except socket.error, e:
320                 error = ovs.socket_util.get_exception_errno(e)
321                 if error != errno.EAGAIN:
322                     # XXX rate-limit
323                     vlog.dbg("accept: %s" % os.strerror(error))
324                 return error, None
325
326     def wait(self, poller):
327         poller.fd_wait(self.socket, select.POLLIN)
328
329     def __del__(self):
330         # Don't delete the file: we might have forked.
331         self.socket.close()
332
333
334 def usage(name):
335     return """
336 Active %s connection methods:
337   unix:FILE               Unix domain socket named FILE
338   tcp:IP:PORT             TCP socket to IP with port no of PORT
339
340 Passive %s connection methods:
341   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
342
343
344 class UnixStream(Stream):
345     @staticmethod
346     def _open(suffix, dscp):
347         connect_path = suffix
348         return  ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
349                                                  True, None, connect_path)
350 Stream.register_method("unix", UnixStream)
351
352
353 class TCPStream(Stream):
354     @staticmethod
355     def _open(suffix, dscp):
356         error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
357                                                        suffix, 0, dscp)
358         if not error:
359             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
360         return error, sock
361 Stream.register_method("tcp", TCPStream)