X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=python%2Fovs%2Fstream.py;h=fb083eeeb4e9025edda10a86bee316a598939965;hb=2c487bc808ba6a4a297523f2c6b78ca3e358073a;hp=7ea9e46e8b59c7b6d175460ef31a268d5b8f1607;hpb=3a656eafb96ab8a474e943baabdb2679d0a6b0ef;p=sliver-openvswitch.git diff --git a/python/ovs/stream.py b/python/ovs/stream.py index 7ea9e46e8..fb083eeeb 100644 --- a/python/ovs/stream.py +++ b/python/ovs/stream.py @@ -1,4 +1,4 @@ -# Copyright (c) 2010, 2011 Nicira Networks +# Copyright (c) 2010, 2011, 2012 Nicira, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ import errno import os -import select import socket import ovs.poller @@ -24,10 +23,22 @@ import ovs.vlog vlog = ovs.vlog.Vlog("stream") +def stream_or_pstream_needs_probes(name): + """ 1 if the stream or pstream specified by 'name' needs periodic probes to + verify connectivity. For [p]streams which need probes, it can take a long + time to notice the connection was dropped. Returns 0 if probes aren't + needed, and -1 if 'name' is invalid""" + + if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name): + # Only unix and punix are supported currently. + return 0 + else: + return -1 + + class Stream(object): """Bidirectional byte stream. Currently only Unix domain sockets are implemented.""" - n_unix_sockets = 0 # States. __S_CONNECTING = 0 @@ -39,17 +50,29 @@ class Stream(object): W_RECV = 1 # Data received. W_SEND = 2 # Send buffer room available. + _SOCKET_METHODS = {} + + @staticmethod + def register_method(method, cls): + Stream._SOCKET_METHODS[method + ":"] = cls + + @staticmethod + def _find_method(name): + for method, cls in Stream._SOCKET_METHODS.items(): + if name.startswith(method): + return cls + return None + @staticmethod def is_valid_name(name): """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and - TYPE is a supported stream type (currently only "unix:"), otherwise - False.""" - return name.startswith("unix:") + TYPE is a supported stream type (currently only "unix:" and "tcp:"), + otherwise False.""" + return bool(Stream._find_method(name)) - def __init__(self, socket, name, bind_path, status): + def __init__(self, socket, name, status): self.socket = socket self.name = name - self.bind_path = bind_path if status == errno.EAGAIN: self.state = Stream.__S_CONNECTING elif status == 0: @@ -59,12 +82,18 @@ class Stream(object): self.error = 0 + # Default value of dscp bits for connection between controller and manager. + # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined + # in is used. + IPTOS_PREC_INTERNETCONTROL = 0xc0 + DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2 + @staticmethod - def open(name): + def open(name, dscp=DSCP_DEFAULT): """Attempts to connect a stream to a remote peer. 'name' is a connection name in the form "TYPE:ARGS", where TYPE is an active stream class's name and ARGS are stream class-specific. Currently the only - supported TYPE is "unix". + supported TYPEs are "unix" and "tcp". Returns (error, stream): on success 'error' is 0 and 'stream' is the new Stream, on failure 'error' is a positive errno value and 'stream' @@ -73,21 +102,23 @@ class Stream(object): Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0 and a new Stream. The connect() method can be used to check for successful connection completion.""" - if not Stream.is_valid_name(name): + cls = Stream._find_method(name) + if not cls: return errno.EAFNOSUPPORT, None - Stream.n_unix_sockets += 1 - bind_path = "/tmp/stream-unix.%d.%d" % (os.getpid(), - Stream.n_unix_sockets) - connect_path = name[5:] - error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, - True, bind_path, - connect_path) + suffix = name.split(":", 1)[1] + if name.startswith("unix:"): + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) + error, sock = cls._open(suffix, dscp) if error: return error, None else: status = ovs.socket_util.check_connection_completion(sock) - return 0, Stream(sock, name, bind_path, status) + return 0, Stream(sock, name, status) + + @staticmethod + def _open(suffix, dscp): + raise NotImplementedError("This method must be overrided by subclass") @staticmethod def open_block((error, stream)): @@ -105,7 +136,7 @@ class Stream(object): break stream.run() poller = ovs.poller.Poller() - stream.run_wait() + stream.run_wait(poller) stream.connect_wait(poller) poller.block() assert error != errno.EINPROGRESS @@ -117,9 +148,6 @@ class Stream(object): def close(self): self.socket.close() - if self.bind_path is not None: - ovs.fatal_signal.unlink_file_now(self.bind_path) - self.bind_path = None def __scs_connecting(self): retval = ovs.socket_util.check_connection_completion(self.socket) @@ -135,15 +163,17 @@ class Stream(object): is complete, returns 0 if the connection was successful or a positive errno value if it failed. If the connection is still in progress, returns errno.EAGAIN.""" - last_state = -1 # Always differs from initial self.state - while self.state != last_state: - last_state = self.state - if self.state == Stream.__S_CONNECTING: - self.__scs_connecting() - elif self.state == Stream.__S_CONNECTED: - return 0 - elif self.state == Stream.__S_DISCONNECTED: - return self.error + + if self.state == Stream.__S_CONNECTING: + self.__scs_connecting() + + if self.state == Stream.__S_CONNECTING: + return errno.EAGAIN + elif self.state == Stream.__S_CONNECTED: + return 0 + else: + assert self.state == Stream.__S_DISCONNECTED + return self.error def recv(self, n): """Tries to receive up to 'n' bytes from this stream. Returns a @@ -209,9 +239,9 @@ class Stream(object): if self.state == Stream.__S_CONNECTING: wait = Stream.W_CONNECT if wait == Stream.W_RECV: - poller.fd_wait(self.socket, select.POLLIN) + poller.fd_wait(self.socket, ovs.poller.POLLIN) else: - poller.fd_wait(self.socket, select.POLLOUT) + poller.fd_wait(self.socket, ovs.poller.POLLOUT) def connect_wait(self, poller): self.wait(poller, Stream.W_CONNECT) @@ -254,6 +284,8 @@ class PassiveStream(object): return errno.EAFNOSUPPORT, None bind_path = name[6:] + if name.startswith("punix:"): + bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path) error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, True, bind_path, None) if error: @@ -288,7 +320,7 @@ class PassiveStream(object): try: sock, addr = self.socket.accept() ovs.socket_util.set_nonblocking(sock) - return 0, Stream(sock, "unix:%s" % addr, None, 0) + return 0, Stream(sock, "unix:%s" % addr, 0) except socket.error, e: error = ovs.socket_util.get_exception_errno(e) if error != errno.EAGAIN: @@ -297,7 +329,7 @@ class PassiveStream(object): return error, None def wait(self, poller): - poller.fd_wait(self.socket, select.POLLIN) + poller.fd_wait(self.socket, ovs.poller.POLLIN) def __del__(self): # Don't delete the file: we might have forked. @@ -308,6 +340,27 @@ def usage(name): return """ Active %s connection methods: unix:FILE Unix domain socket named FILE + tcp:IP:PORT TCP socket to IP with port no of PORT Passive %s connection methods: punix:FILE Listen on Unix domain socket FILE""" % (name, name) + + +class UnixStream(Stream): + @staticmethod + def _open(suffix, dscp): + connect_path = suffix + return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, + True, None, connect_path) +Stream.register_method("unix", UnixStream) + + +class TCPStream(Stream): + @staticmethod + def _open(suffix, dscp): + error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM, + suffix, 0, dscp) + if not error: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + return error, sock +Stream.register_method("tcp", TCPStream)