Merge branch 'master' of ssh://git.planet-lab.org/git/sfa
[sfa.git] / sfa / util / server.py
index 757d945..2d8e13b 100644 (file)
@@ -13,21 +13,18 @@ import sys
 import traceback
 import threading
 import socket, os
-
 import SocketServer
 import BaseHTTPServer
 import SimpleHTTPServer
 import SimpleXMLRPCServer
-
 from OpenSSL import SSL
+from Queue import Queue
 
 from sfa.trust.certificate import Keypair, Certificate
 from sfa.trust.credential import *
-
 from sfa.util.faults import *
-from sfa.plc.api import SfaAPI 
-from sfa.util.debug import log
-
+from sfa.plc.api import SfaAPI
+from sfa.util.cache import Cache 
 ##
 # Verification callback for pyOpenSSL. We do our own checking of keys because
 # we have our own authentication spec. Thus we disable several of the normal
@@ -39,16 +36,6 @@ def verify_callback(conn, x509, err, depth, preverify):
        #print "  preverified"
        return 1
 
-    # we're only passing single certificates, not chains
-    if depth > 0:
-       #print "  depth > 0 in verify_callback"
-       return 0
-
-    # create a Certificate object and load it from the client's x509
-    ctx = conn.get_context()
-    server = ctx.get_app_data()
-    server.peer_cert = Certificate()
-    server.peer_cert.load_from_pyopenssl_x509(x509)
 
     # the certificate verification done by openssl checks a number of things
     # that we aren't interested in, so we look out for those error messages
@@ -70,6 +57,10 @@ def verify_callback(conn, x509, err, depth, preverify):
        #print "  X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY"
        return 1
 
+    # allow chained certs with self-signed roots
+    if err == 19:
+        return 1
+    
     # allow certs that are untrusted
     if err == 21:
        #print "  X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE"
@@ -85,8 +76,57 @@ def verify_callback(conn, x509, err, depth, preverify):
     return 0
 
 ##
-# Taken from the web (XXX find reference). Implements an HTTPS xmlrpc server
+# taken from the web (XXX find reference). Implents HTTPS xmlrpc request handler
+class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
+    """Secure XML-RPC request handler class.
 
+    It it very similar to SimpleXMLRPCRequestHandler but it uses HTTPS for transporting XML data.
+    """
+    def setup(self):
+        self.connection = self.request
+        self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
+        self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
+
+    def do_POST(self):
+        """Handles the HTTPS POST request.
+
+        It was copied out from SimpleXMLRPCServer.py and modified to shutdown 
+        the socket cleanly.
+        """
+        try:
+            peer_cert = Certificate()
+            peer_cert.load_from_pyopenssl_x509(self.connection.get_peer_certificate())
+            self.api = SfaAPI(peer_cert = peer_cert, 
+                              interface = self.server.interface, 
+                              key_file = self.server.key_file, 
+                              cert_file = self.server.cert_file,
+                              cache = self.cache)
+            # get arguments
+            request = self.rfile.read(int(self.headers["content-length"]))
+            remote_addr = (remote_ip, remote_port) = self.connection.getpeername()
+            self.api.remote_addr = remote_addr            
+            response = self.api.handle(remote_addr, request, self.server.method_map)
+        except Exception, fault:
+            # This should only happen if the module is buggy
+            # internal error, report as HTTP server error
+            sfa_error.log_exc("server.do_POST")
+            response = self.api.prepare_response(fault)
+            #self.send_response(500)
+            #self.end_headers()
+       
+        # got a valid response
+        self.send_response(200)
+        self.send_header("Content-type", "text/xml")
+        self.send_header("Content-length", str(len(response)))
+        self.end_headers()
+        self.wfile.write(response)
+
+        # shut down the connection
+        self.wfile.flush()
+        self.connection.shutdown() # Modified here!
+
+##
+# Taken from the web (XXX find reference). Implements an HTTPS xmlrpc server
 class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
     def __init__(self, server_address, HandlerClass, key_file, cert_file, logRequests=True):
         """Secure XML-RPC server.
@@ -97,16 +137,22 @@ class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLR
         self.interface = None
         self.key_file = key_file
         self.cert_file = cert_file
-       #for compatibility with python 2.4 (centos53)
-       if sys.version_info < (2, 5):
-          SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
-       else:
+        self.method_map = {}
+        # add cache to the request handler
+        HandlerClass.cache = Cache()
+        #for compatibility with python 2.4 (centos53)
+        if sys.version_info < (2, 5):
+            SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
+        else:
            SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, True, None)
         SocketServer.BaseServer.__init__(self, server_address, HandlerClass)
         ctx = SSL.Context(SSL.SSLv23_METHOD)
-        ctx.use_privatekey_file(key_file)
+        ctx.use_privatekey_file(key_file)        
         ctx.use_certificate_file(cert_file)
+        # If you wanted to verify certs against known CAs.. this is how you would do it
+        #ctx.load_verify_locations('/etc/sfa/trusted_roots/plc.gpo.gid')
         ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verify_callback)
+        ctx.set_verify_depth(5)
         ctx.set_app_data(self)
         self.socket = SSL.Connection(ctx, socket.socket(self.address_family,
                                                         self.socket_type))
@@ -127,60 +173,60 @@ class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLR
             type, value, tb = sys.exc_info()
             raise xmlrpclib.Fault(1,''.join(traceback.format_exception(type, value, tb)))
 
-##
-# taken from the web (XXX find reference). Implents HTTPS xmlrpc request handler
-
-class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
-    """Secure XML-RPC request handler class.
-
-    It it very similar to SimpleXMLRPCRequestHandler but it uses HTTPS for transporting XML data.
+## From Active State code: http://code.activestate.com/recipes/574454/
+# This is intended as a drop-in replacement for the ThreadingMixIn class in 
+# module SocketServer of the standard lib. Instead of spawning a new thread 
+# for each request, requests are processed by of pool of reusable threads.
+class ThreadPoolMixIn(SocketServer.ThreadingMixIn):
     """
-    def setup(self):
-        self.connection = self.request
-        self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
-        self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
+    use a thread pool instead of a new thread on every request
+    """
+    # XX TODO: Make this configurable
+    # config = Config()
+    # numThreads = config.SFA_SERVER_NUM_THREADS
+    numThreads = 25
+    allow_reuse_address = True  # seems to fix socket.error on server restart
 
-    def do_POST(self):
-        """Handles the HTTPS POST request.
+    def serve_forever(self):
+        """
+        Handle one request at a time until doomsday.
+        """
+        # set up the threadpool
+        self.requests = Queue()
+
+        for x in range(self.numThreads):
+            t = threading.Thread(target = self.process_request_thread)
+            t.setDaemon(1)
+            t.start()
+
+        # server main loop
+        while True:
+            self.handle_request()
+            
+        self.server_close()
+
+    
+    def process_request_thread(self):
+        """
+        obtain request from queue instead of directly from server socket
+        """
+        while True:
+            SocketServer.ThreadingMixIn.process_request_thread(self, *self.requests.get())
 
-        It was copied out from SimpleXMLRPCServer.py and modified to shutdown the socket cleanly.
+    
+    def handle_request(self):
+        """
+        simply collect requests and put them on the queue for the workers.
         """
         try:
-            self.api = SfaAPI(peer_cert = self.server.peer_cert, 
-                              interface = self.server.interface, 
-                              key_file = self.server.key_file, 
-                              cert_file = self.server.cert_file)
-            # get arguments
-            request = self.rfile.read(int(self.headers["content-length"]))
-            # In previous versions of SimpleXMLRPCServer, _dispatch
-            # could be overridden in this class, instead of in
-            # SimpleXMLRPCDispatcher. To maintain backwards compatibility,
-            # check to see if a subclass implements _dispatch and dispatch
-            # using that method if present.
-            #response = self.server._marshaled_dispatch(request, getattr(self, '_dispatch', None))
-            remote_addr = (remote_ip, remote_port) = self.connection.getpeername()
-            self.api.remote_addr = remote_addr
-            response = self.api.handle(remote_addr, request)
-
-        
-        except Exception, fault:
-            raise
-            # This should only happen if the module is buggy
-            # internal error, report as HTTP server error
-            self.send_response(500)
-            self.end_headers()
-        else:
-            # got a valid XML RPC response
-            self.send_response(200)
-            self.send_header("Content-type", "text/xml")
-            self.send_header("Content-length", str(len(response)))
-            self.end_headers()
-            self.wfile.write(response)
-
-            # shut down the connection
-            self.wfile.flush()
-            self.connection.shutdown() # Modified here!
-
+            request, client_address = self.get_request()
+        except socket.error:
+            return
+        if self.verify_request(request, client_address):
+            self.requests.put((request, client_address))
+
+class ThreadedServer(ThreadPoolMixIn, SecureXMLRPCServer):
+    pass
 ##
 # Implements an HTTPS XML-RPC server. Generally it is expected that SFA
 # functions will take a credential string, which is passed to
@@ -203,14 +249,15 @@ class SfaServer(threading.Thread):
         threading.Thread.__init__(self)
         self.key = Keypair(filename = key_file)
         self.cert = Certificate(filename = cert_file)
-        self.server = SecureXMLRPCServer((ip, port), SecureXMLRpcRequestHandler, key_file, cert_file)
+        #self.server = SecureXMLRPCServer((ip, port), SecureXMLRpcRequestHandler, key_file, cert_file)
+        self.server = ThreadedServer((ip, port), SecureXMLRpcRequestHandler, key_file, cert_file)
         self.trusted_cert_list = None
         self.register_functions()
 
 
     ##
     # Register functions that will be served by the XMLRPC server. This
-    # function should be overrided by each descendant class.
+    # function should be overridden by each descendant class.
 
     def register_functions(self):
         self.server.register_function(self.noop)
@@ -221,7 +268,6 @@ class SfaServer(threading.Thread):
 
     def noop(self, cred, anything):
         self.decode_authentication(cred)
-
         return anything
 
     ##