-# Copyright (c) 2010, 2011 Nicira Networks
+# Copyright (c) 2010, 2011, 2012 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
import errno
import os
-import select
import socket
import ovs.poller
vlog = ovs.vlog.Vlog("stream")
+def stream_or_pstream_needs_probes(name):
+ """ 1 if the stream or pstream specified by 'name' needs periodic probes to
+ verify connectivity. For [p]streams which need probes, it can take a long
+ time to notice the connection was dropped. Returns 0 if probes aren't
+ needed, and -1 if 'name' is invalid"""
+
+ if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
+ # Only unix and punix are supported currently.
+ return 0
+ else:
+ return -1
+
+
class Stream(object):
"""Bidirectional byte stream. Currently only Unix domain sockets
are implemented."""
- n_unix_sockets = 0
# States.
__S_CONNECTING = 0
W_RECV = 1 # Data received.
W_SEND = 2 # Send buffer room available.
+ _SOCKET_METHODS = {}
+
+ @staticmethod
+ def register_method(method, cls):
+ Stream._SOCKET_METHODS[method + ":"] = cls
+
+ @staticmethod
+ def _find_method(name):
+ for method, cls in Stream._SOCKET_METHODS.items():
+ if name.startswith(method):
+ return cls
+ return None
+
@staticmethod
def is_valid_name(name):
"""Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
- TYPE is a supported stream type (currently only "unix:"), otherwise
- False."""
- return name.startswith("unix:")
+ TYPE is a supported stream type (currently only "unix:" and "tcp:"),
+ otherwise False."""
+ return bool(Stream._find_method(name))
- def __init__(self, socket, name, bind_path, status):
+ def __init__(self, socket, name, status):
self.socket = socket
self.name = name
- self.bind_path = bind_path
if status == errno.EAGAIN:
self.state = Stream.__S_CONNECTING
elif status == 0:
self.error = 0
+ # Default value of dscp bits for connection between controller and manager.
+ # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
+ # in <netinet/ip.h> is used.
+ IPTOS_PREC_INTERNETCONTROL = 0xc0
+ DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
+
@staticmethod
- def open(name):
+ def open(name, dscp=DSCP_DEFAULT):
"""Attempts to connect a stream to a remote peer. 'name' is a
connection name in the form "TYPE:ARGS", where TYPE is an active stream
class's name and ARGS are stream class-specific. Currently the only
- supported TYPE is "unix".
+ supported TYPEs are "unix" and "tcp".
Returns (error, stream): on success 'error' is 0 and 'stream' is the
new Stream, on failure 'error' is a positive errno value and 'stream'
Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0
and a new Stream. The connect() method can be used to check for
successful connection completion."""
- if not Stream.is_valid_name(name):
+ cls = Stream._find_method(name)
+ if not cls:
return errno.EAFNOSUPPORT, None
- Stream.n_unix_sockets += 1
- bind_path = "/tmp/stream-unix.%d.%d" % (os.getpid(),
- Stream.n_unix_sockets)
- connect_path = name[5:]
- error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
- True, bind_path,
- connect_path)
+ suffix = name.split(":", 1)[1]
+ if name.startswith("unix:"):
+ suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
+ error, sock = cls._open(suffix, dscp)
if error:
return error, None
else:
status = ovs.socket_util.check_connection_completion(sock)
- return 0, Stream(sock, name, bind_path, status)
+ return 0, Stream(sock, name, status)
+
+ @staticmethod
+ def _open(suffix, dscp):
+ raise NotImplementedError("This method must be overrided by subclass")
@staticmethod
def open_block((error, stream)):
break
stream.run()
poller = ovs.poller.Poller()
- stream.run_wait()
+ stream.run_wait(poller)
stream.connect_wait(poller)
poller.block()
assert error != errno.EINPROGRESS
def close(self):
self.socket.close()
- if self.bind_path is not None:
- ovs.fatal_signal.unlink_file_now(self.bind_path)
- self.bind_path = None
def __scs_connecting(self):
retval = ovs.socket_util.check_connection_completion(self.socket)
is complete, returns 0 if the connection was successful or a positive
errno value if it failed. If the connection is still in progress,
returns errno.EAGAIN."""
- last_state = -1 # Always differs from initial self.state
- while self.state != last_state:
- last_state = self.state
- if self.state == Stream.__S_CONNECTING:
- self.__scs_connecting()
- elif self.state == Stream.__S_CONNECTED:
- return 0
- elif self.state == Stream.__S_DISCONNECTED:
- return self.error
+
+ if self.state == Stream.__S_CONNECTING:
+ self.__scs_connecting()
+
+ if self.state == Stream.__S_CONNECTING:
+ return errno.EAGAIN
+ elif self.state == Stream.__S_CONNECTED:
+ return 0
+ else:
+ assert self.state == Stream.__S_DISCONNECTED
+ return self.error
def recv(self, n):
"""Tries to receive up to 'n' bytes from this stream. Returns a
if self.state == Stream.__S_CONNECTING:
wait = Stream.W_CONNECT
if wait == Stream.W_RECV:
- poller.fd_wait(self.socket, select.POLLIN)
+ poller.fd_wait(self.socket, ovs.poller.POLLIN)
else:
- poller.fd_wait(self.socket, select.POLLOUT)
+ poller.fd_wait(self.socket, ovs.poller.POLLOUT)
def connect_wait(self, poller):
self.wait(poller, Stream.W_CONNECT)
return errno.EAFNOSUPPORT, None
bind_path = name[6:]
+ if name.startswith("punix:"):
+ bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
True, bind_path, None)
if error:
try:
sock, addr = self.socket.accept()
ovs.socket_util.set_nonblocking(sock)
- return 0, Stream(sock, "unix:%s" % addr, None, 0)
+ return 0, Stream(sock, "unix:%s" % addr, 0)
except socket.error, e:
error = ovs.socket_util.get_exception_errno(e)
if error != errno.EAGAIN:
return error, None
def wait(self, poller):
- poller.fd_wait(self.socket, select.POLLIN)
+ poller.fd_wait(self.socket, ovs.poller.POLLIN)
def __del__(self):
# Don't delete the file: we might have forked.
return """
Active %s connection methods:
unix:FILE Unix domain socket named FILE
+ tcp:IP:PORT TCP socket to IP with port no of PORT
Passive %s connection methods:
punix:FILE Listen on Unix domain socket FILE""" % (name, name)
+
+
+class UnixStream(Stream):
+ @staticmethod
+ def _open(suffix, dscp):
+ connect_path = suffix
+ return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
+ True, None, connect_path)
+Stream.register_method("unix", UnixStream)
+
+
+class TCPStream(Stream):
+ @staticmethod
+ def _open(suffix, dscp):
+ error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
+ suffix, 0, dscp)
+ if not error:
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ return error, sock
+Stream.register_method("tcp", TCPStream)