pull in additional changes from 2.0 branch.
[monitor.git] / monitor / Rpyc / Servers / ServerUtils.py
diff --git a/monitor/Rpyc/Servers/ServerUtils.py b/monitor/Rpyc/Servers/ServerUtils.py
new file mode 100644 (file)
index 0000000..e9b361d
--- /dev/null
@@ -0,0 +1,90 @@
+import os\r
+import socket\r
+import sys\r
+import gc\r
+from threading import Thread\r
+from Rpyc.Connection import Connection\r
+from Rpyc.Stream import SocketStream, PipeStream\r
+from Rpyc.Channel import Channel\r
+from Rpyc.Lib import DEFAULT_PORT\r
+\r
+\r
+class Logger(object):\r
+    def __init__(self, logfile = None, active = True):\r
+        self.logfile = logfile\r
+        self.active = active\r
+    def __call__(self, *args):\r
+        if not self.logfile:\r
+            return\r
+        if not self.active:\r
+            return\r
+        text = " ".join([str(a) for a in args])\r
+        self.logfile.write("[%d] %s\n" % (os.getpid(), text))\r
+        self.logfile.flush()\r
+        \r
+log = Logger(sys.stdout)\r
+\r
+def _serve(chan):\r
+    conn = Connection(chan)\r
+    try:\r
+        try:\r
+            while True:\r
+                conn.serve()\r
+        except EOFError:\r
+            pass\r
+    finally:\r
+        conn.close()\r
+        gc.collect()\r
+\r
+def serve_stream(stream, authenticate = False, users = None):\r
+    chan = Channel(stream)\r
+    \r
+    if authenticate:\r
+        from Rpyc.Authentication import accept\r
+        log("requiring authentication")\r
+        if accept(chan, users):\r
+            log("authenication successful")\r
+        else:\r
+            log("authentication failed")\r
+            return\r
+    \r
+    _serve(chan)\r
+\r
+def create_listener_socket(port):\r
+    sock = socket.socket()\r
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)\r
+    #sock.bind(("", port))\r
+    sock.bind(("localhost", port))\r
+    sock.listen(4)\r
+    log("listening on", sock.getsockname())\r
+    return sock\r
+\r
+def serve_socket(sock, **kw):\r
+    sockname = sock.getpeername()\r
+    log("welcome", sockname)\r
+    try:\r
+        try:\r
+            serve_stream(SocketStream(sock), **kw)\r
+        except socket.error:\r
+            pass\r
+    finally:\r
+        log("goodbye", sockname)\r
+\r
+def serve_pipes(incoming, outgoing, **kw):\r
+    serve_stream(PipeStream(incoming, outgoing), **kw)\r
+\r
+def threaded_server(port = DEFAULT_PORT, **kwargs):\r
+    sock = create_listener_socket(port)\r
+    while True:\r
+        newsock, name = sock.accept()\r
+        t = Thread(target = serve_socket, args = (newsock,), kwargs = kwargs)\r
+        t.setDaemon(True)\r
+        t.start()\r
+\r
+def start_threaded_server(*args, **kwargs):\r
+    """starts the threaded_server on a separate thread. this turns the \r
+    threaded_server into a mix-in you can place anywhere in your code"""\r
+    t = Thread(target = threaded_server, args = args, kwargs = kwargs)\r
+    t.setDaemon(True)\r
+    t.start()\r
+\r