X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=python%2Fovs%2Fstream.py;h=fb083eeeb4e9025edda10a86bee316a598939965;hb=003ce655b7116d18c86a74c50391e54990346931;hp=9c10612d920b32c74482d8daeb6eae4922edbffa;hpb=293d49bdd6e8cedc3fade25af3eccb122c4cd3e1;p=sliver-openvswitch.git diff --git a/python/ovs/stream.py b/python/ovs/stream.py index 9c10612d9..fb083eeeb 100644 --- a/python/ovs/stream.py +++ b/python/ovs/stream.py @@ -14,7 +14,6 @@ import errno import os -import select import socket import ovs.poller @@ -51,12 +50,25 @@ class Stream(object): W_RECV = 1 # Data received. W_SEND = 2 # Send buffer room available. + _SOCKET_METHODS = {} + + @staticmethod + def register_method(method, cls): + Stream._SOCKET_METHODS[method + ":"] = cls + + @staticmethod + def _find_method(name): + for method, cls in Stream._SOCKET_METHODS.items(): + if name.startswith(method): + return cls + return None + @staticmethod def is_valid_name(name): """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and - TYPE is a supported stream type (currently only "unix:"), otherwise - False.""" - return name.startswith("unix:") + TYPE is a supported stream type (currently only "unix:" and "tcp:"), + otherwise False.""" + return bool(Stream._find_method(name)) def __init__(self, socket, name, status): self.socket = socket @@ -70,12 +82,18 @@ class Stream(object): self.error = 0 + # Default value of dscp bits for connection between controller and manager. + # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined + # in is used. + IPTOS_PREC_INTERNETCONTROL = 0xc0 + DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2 + @staticmethod - def open(name): + def open(name, dscp=DSCP_DEFAULT): """Attempts to connect a stream to a remote peer. 'name' is a connection name in the form "TYPE:ARGS", where TYPE is an active stream class's name and ARGS are stream class-specific. Currently the only - supported TYPE is "unix". + supported TYPEs are "unix" and "tcp". Returns (error, stream): on success 'error' is 0 and 'stream' is the new Stream, on failure 'error' is a positive errno value and 'stream' @@ -84,19 +102,24 @@ class Stream(object): Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0 and a new Stream. The connect() method can be used to check for successful connection completion.""" - if not Stream.is_valid_name(name): + cls = Stream._find_method(name) + if not cls: return errno.EAFNOSUPPORT, None - connect_path = name[5:] - error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, - True, None, - connect_path) + suffix = name.split(":", 1)[1] + if name.startswith("unix:"): + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) + error, sock = cls._open(suffix, dscp) if error: return error, None else: status = ovs.socket_util.check_connection_completion(sock) return 0, Stream(sock, name, status) + @staticmethod + def _open(suffix, dscp): + raise NotImplementedError("This method must be overrided by subclass") + @staticmethod def open_block((error, stream)): """Blocks until a Stream completes its connection attempt, either @@ -140,15 +163,17 @@ class Stream(object): is complete, returns 0 if the connection was successful or a positive errno value if it failed. If the connection is still in progress, returns errno.EAGAIN.""" - last_state = -1 # Always differs from initial self.state - while self.state != last_state: - last_state = self.state - if self.state == Stream.__S_CONNECTING: - self.__scs_connecting() - elif self.state == Stream.__S_CONNECTED: - return 0 - elif self.state == Stream.__S_DISCONNECTED: - return self.error + + if self.state == Stream.__S_CONNECTING: + self.__scs_connecting() + + if self.state == Stream.__S_CONNECTING: + return errno.EAGAIN + elif self.state == Stream.__S_CONNECTED: + return 0 + else: + assert self.state == Stream.__S_DISCONNECTED + return self.error def recv(self, n): """Tries to receive up to 'n' bytes from this stream. Returns a @@ -214,9 +239,9 @@ class Stream(object): if self.state == Stream.__S_CONNECTING: wait = Stream.W_CONNECT if wait == Stream.W_RECV: - poller.fd_wait(self.socket, select.POLLIN) + poller.fd_wait(self.socket, ovs.poller.POLLIN) else: - poller.fd_wait(self.socket, select.POLLOUT) + poller.fd_wait(self.socket, ovs.poller.POLLOUT) def connect_wait(self, poller): self.wait(poller, Stream.W_CONNECT) @@ -259,6 +284,8 @@ class PassiveStream(object): return errno.EAFNOSUPPORT, None bind_path = name[6:] + if name.startswith("punix:"): + bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path) error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, True, bind_path, None) if error: @@ -302,7 +329,7 @@ class PassiveStream(object): return error, None def wait(self, poller): - poller.fd_wait(self.socket, select.POLLIN) + poller.fd_wait(self.socket, ovs.poller.POLLIN) def __del__(self): # Don't delete the file: we might have forked. @@ -313,6 +340,27 @@ def usage(name): return """ Active %s connection methods: unix:FILE Unix domain socket named FILE + tcp:IP:PORT TCP socket to IP with port no of PORT Passive %s connection methods: punix:FILE Listen on Unix domain socket FILE""" % (name, name) + + +class UnixStream(Stream): + @staticmethod + def _open(suffix, dscp): + connect_path = suffix + return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, + True, None, connect_path) +Stream.register_method("unix", UnixStream) + + +class TCPStream(Stream): + @staticmethod + def _open(suffix, dscp): + error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM, + suffix, 0, dscp) + if not error: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + return error, sock +Stream.register_method("tcp", TCPStream)