-# Copyright (c) 2010 Nicira Networks
+# Copyright (c) 2010, 2011, 2012 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# limitations under the License.
import errno
-import logging
import os
import select
import socket
-import sys
import ovs.poller
import ovs.socket_util
+import ovs.vlog
+
+vlog = ovs.vlog.Vlog("stream")
+
+
+def stream_or_pstream_needs_probes(name):
+ """ 1 if the stream or pstream specified by 'name' needs periodic probes to
+ verify connectivity. For [p]streams which need probes, it can take a long
+ time to notice the connection was dropped. Returns 0 if probes aren't
+ needed, and -1 if 'name' is invalid"""
+
+ if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
+ # Only unix and punix are supported currently.
+ return 0
+ else:
+ return -1
+
class Stream(object):
"""Bidirectional byte stream. Currently only Unix domain sockets
are implemented."""
- n_unix_sockets = 0
# States.
__S_CONNECTING = 0
False."""
return name.startswith("unix:")
- def __init__(self, socket, name, bind_path, status):
+ def __init__(self, socket, name, status):
self.socket = socket
self.name = name
- self.bind_path = bind_path
if status == errno.EAGAIN:
self.state = Stream.__S_CONNECTING
elif status == 0:
if not Stream.is_valid_name(name):
return errno.EAFNOSUPPORT, None
- Stream.n_unix_sockets += 1
- bind_path = "/tmp/stream-unix.%ld.%d" % (os.getpid(),
- Stream.n_unix_sockets)
connect_path = name[5:]
error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
- True, bind_path,
+ True, None,
connect_path)
if error:
return error, None
else:
status = ovs.socket_util.check_connection_completion(sock)
- return 0, Stream(sock, name, bind_path, status)
+ return 0, Stream(sock, name, status)
@staticmethod
- def open_block(tuple):
+ def open_block((error, stream)):
"""Blocks until a Stream completes its connection attempt, either
- succeeding or failing. 'tuple' should be the tuple returned by
+ succeeding or failing. (error, stream) should be the tuple returned by
Stream.open(). Returns a tuple of the same form.
Typical usage:
- error, stream = Stream.open_block(Stream.open("tcp:1.2.3.4:5"))"""
+ error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
- error, stream = tuple
if not error:
while True:
error = stream.connect()
break
stream.run()
poller = ovs.poller.Poller()
- stream.run_wait()
+ stream.run_wait(poller)
stream.connect_wait(poller)
poller.block()
assert error != errno.EINPROGRESS
-
+
if error and stream:
stream.close()
stream = None
def close(self):
self.socket.close()
- if self.bind_path is not None:
- ovs.fatal_signal.unlink_file_now(self.bind_path)
- self.bind_path = None
def __scs_connecting(self):
retval = ovs.socket_util.check_connection_completion(self.socket)
returns errno.EAGAIN."""
last_state = -1 # Always differs from initial self.state
while self.state != last_state:
+ last_state = self.state
if self.state == Stream.__S_CONNECTING:
self.__scs_connecting()
elif self.state == Stream.__S_CONNECTED:
def recv(self, n):
"""Tries to receive up to 'n' bytes from this stream. Returns a
(error, string) tuple:
-
+
- If successful, 'error' is zero and 'string' contains between 1
and 'n' bytes of data.
- If the connection has been closed in the normal fashion or if 'n'
is 0, the tuple is (0, "").
-
+
The recv function will not block waiting for data to arrive. If no
data have been received, it returns (errno.EAGAIN, "") immediately."""
if self.state == Stream.__S_CONNECTING:
wait = Stream.W_CONNECT
- if wait in (Stream.W_CONNECT, Stream.W_SEND):
- poller.fd_wait(self.socket, select.POLLOUT)
- else:
+ if wait == Stream.W_RECV:
poller.fd_wait(self.socket, select.POLLIN)
+ else:
+ poller.fd_wait(self.socket, select.POLLOUT)
def connect_wait(self, poller):
self.wait(poller, Stream.W_CONNECT)
-
+
def recv_wait(self, poller):
self.wait(poller, Stream.W_RECV)
-
+
def send_wait(self, poller):
self.wait(poller, Stream.W_SEND)
-
- def get_name(self):
- return self.name
-
+
def __del__(self):
# Don't delete the file: we might have forked.
self.socket.close()
+
class PassiveStream(object):
@staticmethod
def is_valid_name(name):
try:
sock.listen(10)
except socket.error, e:
- logging.error("%s: listen: %s" % (name, os.strerror(e.error)))
+ vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
sock.close()
return e.error, None
try:
sock, addr = self.socket.accept()
ovs.socket_util.set_nonblocking(sock)
- return 0, Stream(sock, "unix:%s" % addr, None, 0)
+ return 0, Stream(sock, "unix:%s" % addr, 0)
except socket.error, e:
error = ovs.socket_util.get_exception_errno(e)
if error != errno.EAGAIN:
# XXX rate-limit
- logging.debug("accept: %s" % os.strerror(error))
+ vlog.dbg("accept: %s" % os.strerror(error))
return error, None
def wait(self, poller):
# Don't delete the file: we might have forked.
self.socket.close()
-def usage(name, active, passive, bootstrap):
- print
- if active:
- print("Active %s connection methods:" % name)
- print(" unix:FILE "
- "Unix domain socket named FILE");
-
- if passive:
- print("Passive %s connection methods:" % name)
- print(" punix:FILE "
- "listen on Unix domain socket FILE")
+
+def usage(name):
+ return """
+Active %s connection methods:
+ unix:FILE Unix domain socket named FILE
+
+Passive %s connection methods:
+ punix:FILE Listen on Unix domain socket FILE""" % (name, name)