Global replace of Nicira Networks.
[sliver-openvswitch.git] / python / ovs / stream.py
index 82d4557..82ea0c1 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2010, 2011 Nicira Networks
+# Copyright (c) 2010, 2011, 2012 Nicira, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # limitations under the License.
 
 import errno
 # limitations under the License.
 
 import errno
-import logging
 import os
 import select
 import socket
 import os
 import select
 import socket
-import sys
 
 import ovs.poller
 import ovs.socket_util
 
 import ovs.poller
 import ovs.socket_util
+import ovs.vlog
+
+vlog = ovs.vlog.Vlog("stream")
+
+
+def stream_or_pstream_needs_probes(name):
+    """ 1 if the stream or pstream specified by 'name' needs periodic probes to
+    verify connectivty.  For [p]streams which need probes, it can take a long
+    time to notice the connection was dropped.  Returns 0 if probes aren't
+    needed, and -1 if 'name' is invalid"""
+
+    if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
+        # Only unix and punix are supported currently.
+        return 0
+    else:
+        return -1
+
 
 class Stream(object):
     """Bidirectional byte stream.  Currently only Unix domain sockets
     are implemented."""
 
 class Stream(object):
     """Bidirectional byte stream.  Currently only Unix domain sockets
     are implemented."""
-    n_unix_sockets = 0
 
     # States.
     __S_CONNECTING = 0
 
     # States.
     __S_CONNECTING = 0
@@ -44,10 +58,9 @@ class Stream(object):
         False."""
         return name.startswith("unix:")
 
         False."""
         return name.startswith("unix:")
 
-    def __init__(self, socket, name, bind_path, status):
+    def __init__(self, socket, name, status):
         self.socket = socket
         self.name = name
         self.socket = socket
         self.name = name
-        self.bind_path = bind_path
         if status == errno.EAGAIN:
             self.state = Stream.__S_CONNECTING
         elif status == 0:
         if status == errno.EAGAIN:
             self.state = Stream.__S_CONNECTING
         elif status == 0:
@@ -74,18 +87,15 @@ class Stream(object):
         if not Stream.is_valid_name(name):
             return errno.EAFNOSUPPORT, None
 
         if not Stream.is_valid_name(name):
             return errno.EAFNOSUPPORT, None
 
-        Stream.n_unix_sockets += 1
-        bind_path = "/tmp/stream-unix.%d.%d" % (os.getpid(),
-                                                Stream.n_unix_sockets)
         connect_path = name[5:]
         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
         connect_path = name[5:]
         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
-                                                       True, bind_path,
+                                                       True, None,
                                                        connect_path)
         if error:
             return error, None
         else:
             status = ovs.socket_util.check_connection_completion(sock)
                                                        connect_path)
         if error:
             return error, None
         else:
             status = ovs.socket_util.check_connection_completion(sock)
-            return 0, Stream(sock, name, bind_path, status)
+            return 0, Stream(sock, name, status)
 
     @staticmethod
     def open_block((error, stream)):
 
     @staticmethod
     def open_block((error, stream)):
@@ -107,7 +117,7 @@ class Stream(object):
                 stream.connect_wait(poller)
                 poller.block()
             assert error != errno.EINPROGRESS
                 stream.connect_wait(poller)
                 poller.block()
             assert error != errno.EINPROGRESS
-        
+
         if error and stream:
             stream.close()
             stream = None
         if error and stream:
             stream.close()
             stream = None
@@ -115,9 +125,6 @@ class Stream(object):
 
     def close(self):
         self.socket.close()
 
     def close(self):
         self.socket.close()
-        if self.bind_path is not None:
-            ovs.fatal_signal.unlink_file_now(self.bind_path)
-            self.bind_path = None
 
     def __scs_connecting(self):
         retval = ovs.socket_util.check_connection_completion(self.socket)
 
     def __scs_connecting(self):
         retval = ovs.socket_util.check_connection_completion(self.socket)
@@ -146,7 +153,7 @@ class Stream(object):
     def recv(self, n):
         """Tries to receive up to 'n' bytes from this stream.  Returns a
         (error, string) tuple:
     def recv(self, n):
         """Tries to receive up to 'n' bytes from this stream.  Returns a
         (error, string) tuple:
-        
+
             - If successful, 'error' is zero and 'string' contains between 1
               and 'n' bytes of data.
 
             - If successful, 'error' is zero and 'string' contains between 1
               and 'n' bytes of data.
 
@@ -154,7 +161,7 @@ class Stream(object):
 
             - If the connection has been closed in the normal fashion or if 'n'
               is 0, the tuple is (0, "").
 
             - If the connection has been closed in the normal fashion or if 'n'
               is 0, the tuple is (0, "").
-        
+
         The recv function will not block waiting for data to arrive.  If no
         data have been received, it returns (errno.EAGAIN, "") immediately."""
 
         The recv function will not block waiting for data to arrive.  If no
         data have been received, it returns (errno.EAGAIN, "") immediately."""
 
@@ -213,17 +220,18 @@ class Stream(object):
 
     def connect_wait(self, poller):
         self.wait(poller, Stream.W_CONNECT)
 
     def connect_wait(self, poller):
         self.wait(poller, Stream.W_CONNECT)
-        
+
     def recv_wait(self, poller):
         self.wait(poller, Stream.W_RECV)
     def recv_wait(self, poller):
         self.wait(poller, Stream.W_RECV)
-        
+
     def send_wait(self, poller):
         self.wait(poller, Stream.W_SEND)
     def send_wait(self, poller):
         self.wait(poller, Stream.W_SEND)
-        
+
     def __del__(self):
         # Don't delete the file: we might have forked.
         self.socket.close()
 
     def __del__(self):
         # Don't delete the file: we might have forked.
         self.socket.close()
 
+
 class PassiveStream(object):
     @staticmethod
     def is_valid_name(name):
 class PassiveStream(object):
     @staticmethod
     def is_valid_name(name):
@@ -259,7 +267,7 @@ class PassiveStream(object):
         try:
             sock.listen(10)
         except socket.error, e:
         try:
             sock.listen(10)
         except socket.error, e:
-            logging.error("%s: listen: %s" % (name, os.strerror(e.error)))
+            vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
             sock.close()
             return e.error, None
 
             sock.close()
             return e.error, None
 
@@ -285,12 +293,12 @@ class PassiveStream(object):
             try:
                 sock, addr = self.socket.accept()
                 ovs.socket_util.set_nonblocking(sock)
             try:
                 sock, addr = self.socket.accept()
                 ovs.socket_util.set_nonblocking(sock)
-                return 0, Stream(sock, "unix:%s" % addr, None, 0)
+                return 0, Stream(sock, "unix:%s" % addr, 0)
             except socket.error, e:
                 error = ovs.socket_util.get_exception_errno(e)
                 if error != errno.EAGAIN:
                     # XXX rate-limit
             except socket.error, e:
                 error = ovs.socket_util.get_exception_errno(e)
                 if error != errno.EAGAIN:
                     # XXX rate-limit
-                    logging.debug("accept: %s" % os.strerror(error))
+                    vlog.dbg("accept: %s" % os.strerror(error))
                 return error, None
 
     def wait(self, poller):
                 return error, None
 
     def wait(self, poller):
@@ -300,14 +308,11 @@ class PassiveStream(object):
         # Don't delete the file: we might have forked.
         self.socket.close()
 
         # Don't delete the file: we might have forked.
         self.socket.close()
 
-def usage(name, active, passive, bootstrap):
-    print
-    if active:
-        print("Active %s connection methods:" % name)
-        print("  unix:FILE               "
-               "Unix domain socket named FILE");
-
-    if passive:
-        print("Passive %s connection methods:" % name)
-        print("  punix:FILE              "
-              "listen on Unix domain socket FILE")
+
+def usage(name):
+    return """
+Active %s connection methods:
+  unix:FILE               Unix domain socket named FILE
+
+Passive %s connection methods:
+  punix:FILE              Listen on Unix domain socket FILE""" % (name, name)