7dfac7d78d98bcba84dfe6f08d644305b6a8a505
[sfa.git] / sfa / server / threadedserver.py
1 ##
2 # This module implements a general-purpose server layer for sfa.
3 # The same basic server should be usable on the registry, component, or
4 # other interfaces.
5 #
6 # TODO: investigate ways to combine this with existing PLC server?
7 ##
8
9 import sys
10 import socket
11 import traceback
12 import threading
13 from Queue import Queue
14 import xmlrpclib
15 import SocketServer
16 import BaseHTTPServer
17 import SimpleXMLRPCServer
18 from OpenSSL import SSL
19
20 from sfa.util.sfalogging import logger
21 from sfa.util.config import Config
22 from sfa.util.cache import Cache 
23 from sfa.trust.certificate import Certificate
24 from sfa.trust.trustedroots import TrustedRoots
25
26 # don't hard code an api class anymore here
27 from sfa.generic import Generic
28
29 ##
30 # Verification callback for pyOpenSSL. We do our own checking of keys because
31 # we have our own authentication spec. Thus we disable several of the normal
32 # prohibitions that OpenSSL places on certificates
33
34 def verify_callback(conn, x509, err, depth, preverify):
35     # if the cert has been preverified, then it is ok
36     if preverify:
37        #print "  preverified"
38        return 1
39
40
41     # the certificate verification done by openssl checks a number of things
42     # that we aren't interested in, so we look out for those error messages
43     # and ignore them
44
45     # XXX SMBAKER: I don't know what this error is, but it's being returned
46     # xxx thierry: this most likely means the cert has a validity range in the future
47     # by newer pl nodes.
48     if err == 9:
49        #print "  X509_V_ERR_CERT_NOT_YET_VALID"
50        return 1
51
52     # allow self-signed certificates
53     if err == 18:
54        #print "  X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT"
55        return 1
56
57     # allow certs that don't have an issuer
58     if err == 20:
59        #print "  X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY"
60        return 1
61
62     # allow chained certs with self-signed roots
63     if err == 19:
64         return 1
65     
66     # allow certs that are untrusted
67     if err == 21:
68        #print "  X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE"
69        return 1
70
71     # allow certs that are untrusted
72     if err == 27:
73        #print "  X509_V_ERR_CERT_UNTRUSTED"
74        return 1
75
76     # ignore X509_V_ERR_CERT_SIGNATURE_FAILURE
77     if err == 7:
78        return 1         
79
80     logger.debug("  error %s in verify_callback"%err)
81
82     return 0
83
84 ##
85 # taken from the web (XXX find reference). Implements HTTPS xmlrpc request handler
86 class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
87     """Secure XML-RPC request handler class.
88
89     It it very similar to SimpleXMLRPCRequestHandler but it uses HTTPS for transporting XML data.
90     """
91     def setup(self):
92         self.connection = self.request
93         self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
94         self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
95
96     def do_POST(self):
97         """Handles the HTTPS POST request.
98
99         It was copied out from SimpleXMLRPCServer.py and modified to shutdown 
100         the socket cleanly.
101         """
102         try:
103             peer_cert = Certificate()
104             peer_cert.load_from_pyopenssl_x509(self.connection.get_peer_certificate())
105             generic=Generic.the_flavour()
106             self.api = generic.make_api (peer_cert = peer_cert, 
107                                          interface = self.server.interface, 
108                                          key_file = self.server.key_file, 
109                                          cert_file = self.server.cert_file,
110                                          cache = self.cache)
111             #logger.info("SecureXMLRpcRequestHandler.do_POST:")
112             #logger.info("interface=%s"%self.server.interface)
113             #logger.info("key_file=%s"%self.server.key_file)
114             #logger.info("api=%s"%self.api)
115             #logger.info("server=%s"%self.server)
116             #logger.info("handler=%s"%self)
117             # get arguments
118             request = self.rfile.read(int(self.headers["content-length"]))
119             remote_addr = (remote_ip, remote_port) = self.connection.getpeername()
120             self.api.remote_addr = remote_addr            
121             response = self.api.handle(remote_addr, request, self.server.method_map)
122         except Exception, fault:
123             # This should only happen if the module is buggy
124             # internal error, report as HTTP server error
125             logger.log_exc("server.do_POST")
126             response = self.api.prepare_response(fault)
127             #self.send_response(500)
128             #self.end_headers()
129        
130         # avoid session/connection leaks : do this no matter what 
131         finally:
132             self.send_response(200)
133             self.send_header("Content-type", "text/xml")
134             self.send_header("Content-length", str(len(response)))
135             self.end_headers()
136             self.wfile.write(response)
137             self.wfile.flush()
138             # close db connection
139             self.api.close_dbsession()
140             # shut down the connection
141             self.connection.shutdown() # Modified here!
142
143 ##
144 # Taken from the web (XXX find reference). Implements an HTTPS xmlrpc server
145 class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
146
147     def __init__(self, server_address, HandlerClass, key_file, cert_file, logRequests=True):
148         """
149         Secure XML-RPC server.
150
151         It it very similar to SimpleXMLRPCServer but it uses HTTPS for transporting XML data.
152         """
153         logger.debug("SecureXMLRPCServer.__init__, server_address=%s, " 
154                      "cert_file=%s, key_file=%s"%(server_address,cert_file,key_file))
155         self.logRequests = logRequests
156         self.interface = None
157         self.key_file = key_file
158         self.cert_file = cert_file
159         self.method_map = {}
160         # add cache to the request handler
161         HandlerClass.cache = Cache()
162         #for compatibility with python 2.4 (centos53)
163         if sys.version_info < (2, 5):
164             SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
165         else:
166            SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, True, None)
167         SocketServer.BaseServer.__init__(self, server_address, HandlerClass)
168         ctx = SSL.Context(SSL.SSLv23_METHOD)
169         ctx.use_privatekey_file(key_file)        
170         ctx.use_certificate_file(cert_file)
171         # If you wanted to verify certs against known CAs.. this is how you would do it
172         #ctx.load_verify_locations('/etc/sfa/trusted_roots/plc.gpo.gid')
173         config = Config()
174         trusted_cert_files = TrustedRoots(config.get_trustedroots_dir()).get_file_list()
175         for cert_file in trusted_cert_files:
176             ctx.load_verify_locations(cert_file)
177         ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verify_callback)
178         ctx.set_verify_depth(5)
179         ctx.set_app_data(self)
180         self.socket = SSL.Connection(ctx, socket.socket(self.address_family,
181                                                         self.socket_type))
182         self.server_bind()
183         self.server_activate()
184
185     # _dispatch
186     #
187     # Convert an exception on the server to a full stack trace and send it to
188     # the client.
189
190     def _dispatch(self, method, params):
191         logger.debug("SecureXMLRPCServer._dispatch, method=%s"%method)
192         try:
193             return SimpleXMLRPCServer.SimpleXMLRPCDispatcher._dispatch(self, method, params)
194         except:
195             # can't use format_exc() as it is not available in jython yet
196             # (even in trunk).
197             type, value, tb = sys.exc_info()
198             raise xmlrpclib.Fault(1,''.join(traceback.format_exception(type, value, tb)))
199
200     # override this one from the python 2.7 code
201     # originally defined in class TCPServer
202     def shutdown_request(self, request):
203         """Called to shutdown and close an individual request."""
204         # ---------- 
205         # the std python 2.7 code just attempts a request.shutdown(socket.SHUT_WR)
206         # this works fine with regular sockets
207         # However we are dealing with an instance of OpenSSL.SSL.Connection instead
208         # This one only supports shutdown(), and in addition this does not
209         # always perform as expected
210         # ---------- std python 2.7 code
211         try:
212             #explicitly shutdown.  socket.close() merely releases
213             #the socket and waits for GC to perform the actual close.
214             request.shutdown(socket.SHUT_WR)
215         except socket.error:
216             pass #some platforms may raise ENOTCONN here
217         # ----------
218         except TypeError:
219             # we are dealing with an OpenSSL.Connection object, 
220             # try to shut it down but never mind if that fails
221             try: request.shutdown()
222             except: pass
223         # ----------
224         self.close_request(request)
225
226 ## From Active State code: http://code.activestate.com/recipes/574454/
227 # This is intended as a drop-in replacement for the ThreadingMixIn class in 
228 # module SocketServer of the standard lib. Instead of spawning a new thread 
229 # for each request, requests are processed by of pool of reusable threads.
230 class ThreadPoolMixIn(SocketServer.ThreadingMixIn):
231     """
232     use a thread pool instead of a new thread on every request
233     """
234     # XX TODO: Make this configurable
235     # config = Config()
236     # numThreads = config.SFA_SERVER_NUM_THREADS
237     numThreads = 25
238     allow_reuse_address = True  # seems to fix socket.error on server restart
239
240     def serve_forever(self):
241         """
242         Handle one request at a time until doomsday.
243         """
244         # set up the threadpool
245         self.requests = Queue()
246
247         for x in range(self.numThreads):
248             t = threading.Thread(target = self.process_request_thread)
249             t.setDaemon(1)
250             t.start()
251
252         # server main loop
253         while True:
254             self.handle_request()
255             
256         self.server_close()
257
258     
259     def process_request_thread(self):
260         """
261         obtain request from queue instead of directly from server socket
262         """
263         while True:
264             SocketServer.ThreadingMixIn.process_request_thread(self, *self.requests.get())
265
266     
267     def handle_request(self):
268         """
269         simply collect requests and put them on the queue for the workers.
270         """
271         try:
272             request, client_address = self.get_request()
273         except socket.error:
274             return
275         if self.verify_request(request, client_address):
276             self.requests.put((request, client_address))
277
278 class ThreadedServer(ThreadPoolMixIn, SecureXMLRPCServer):
279     pass