first stab at a design where each incoming API call has its own dbsession
[sfa.git] / sfa / server / threadedserver.py
1 ##
2 # This module implements a general-purpose server layer for sfa.
3 # The same basic server should be usable on the registry, component, or
4 # other interfaces.
5 #
6 # TODO: investigate ways to combine this with existing PLC server?
7 ##
8
9 import sys
10 import socket
11 import traceback
12 import threading
13 from Queue import Queue
14 import xmlrpclib
15 import SocketServer
16 import BaseHTTPServer
17 import SimpleXMLRPCServer
18 from OpenSSL import SSL
19
20 from sfa.util.sfalogging import logger
21 from sfa.util.config import Config
22 from sfa.util.cache import Cache 
23 from sfa.trust.certificate import Certificate
24 from sfa.trust.trustedroots import TrustedRoots
25
26 # don't hard code an api class anymore here
27 from sfa.generic import Generic
28
29 ##
30 # Verification callback for pyOpenSSL. We do our own checking of keys because
31 # we have our own authentication spec. Thus we disable several of the normal
32 # prohibitions that OpenSSL places on certificates
33
34 def verify_callback(conn, x509, err, depth, preverify):
35     # if the cert has been preverified, then it is ok
36     if preverify:
37        #print "  preverified"
38        return 1
39
40
41     # the certificate verification done by openssl checks a number of things
42     # that we aren't interested in, so we look out for those error messages
43     # and ignore them
44
45     # XXX SMBAKER: I don't know what this error is, but it's being returned
46     # xxx thierry: this most likely means the cert has a validity range in the future
47     # by newer pl nodes.
48     if err == 9:
49        #print "  X509_V_ERR_CERT_NOT_YET_VALID"
50        return 1
51
52     # allow self-signed certificates
53     if err == 18:
54        #print "  X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT"
55        return 1
56
57     # allow certs that don't have an issuer
58     if err == 20:
59        #print "  X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY"
60        return 1
61
62     # allow chained certs with self-signed roots
63     if err == 19:
64         return 1
65     
66     # allow certs that are untrusted
67     if err == 21:
68        #print "  X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE"
69        return 1
70
71     # allow certs that are untrusted
72     if err == 27:
73        #print "  X509_V_ERR_CERT_UNTRUSTED"
74        return 1
75
76     # ignore X509_V_ERR_CERT_SIGNATURE_FAILURE
77     if err == 7:
78        return 1         
79
80     logger.debug("  error %s in verify_callback"%err)
81
82     return 0
83
84 ##
85 # taken from the web (XXX find reference). Implements HTTPS xmlrpc request handler
86 class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
87     """Secure XML-RPC request handler class.
88
89     It it very similar to SimpleXMLRPCRequestHandler but it uses HTTPS for transporting XML data.
90     """
91     def setup(self):
92         self.connection = self.request
93         self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
94         self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
95
96     def do_POST(self):
97         """Handles the HTTPS POST request.
98
99         It was copied out from SimpleXMLRPCServer.py and modified to shutdown 
100         the socket cleanly.
101         """
102         try:
103             peer_cert = Certificate()
104             peer_cert.load_from_pyopenssl_x509(self.connection.get_peer_certificate())
105             generic=Generic.the_flavour()
106             self.api = generic.make_api (peer_cert = peer_cert, 
107                                          interface = self.server.interface, 
108                                          key_file = self.server.key_file, 
109                                          cert_file = self.server.cert_file,
110                                          cache = self.cache)
111             #logger.info("SecureXMLRpcRequestHandler.do_POST:")
112             #logger.info("interface=%s"%self.server.interface)
113             #logger.info("key_file=%s"%self.server.key_file)
114             #logger.info("api=%s"%self.api)
115             #logger.info("server=%s"%self.server)
116             #logger.info("handler=%s"%self)
117             # get arguments
118             request = self.rfile.read(int(self.headers["content-length"]))
119             remote_addr = (remote_ip, remote_port) = self.connection.getpeername()
120             self.api.remote_addr = remote_addr            
121             response = self.api.handle(remote_addr, request, self.server.method_map)
122         except Exception, fault:
123             # This should only happen if the module is buggy
124             # internal error, report as HTTP server error
125             logger.log_exc("server.do_POST")
126             response = self.api.prepare_response(fault)
127             #self.send_response(500)
128             #self.end_headers()
129        
130         # got a valid response
131         self.send_response(200)
132         self.send_header("Content-type", "text/xml")
133         self.send_header("Content-length", str(len(response)))
134         self.end_headers()
135         self.wfile.write(response)
136         self.wfile.flush()
137         # close db connection
138         self.api.close_dbsession()
139         # shut down the connection
140         self.connection.shutdown() # Modified here!
141
142 ##
143 # Taken from the web (XXX find reference). Implements an HTTPS xmlrpc server
144 class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
145
146     def __init__(self, server_address, HandlerClass, key_file, cert_file, logRequests=True):
147         """Secure XML-RPC server.
148
149         It it very similar to SimpleXMLRPCServer but it uses HTTPS for transporting XML data.
150         """
151         logger.debug("SecureXMLRPCServer.__init__, server_address=%s, cert_file=%s, key_file=%s"%(server_address,cert_file,key_file))
152         self.logRequests = logRequests
153         self.interface = None
154         self.key_file = key_file
155         self.cert_file = cert_file
156         self.method_map = {}
157         # add cache to the request handler
158         HandlerClass.cache = Cache()
159         #for compatibility with python 2.4 (centos53)
160         if sys.version_info < (2, 5):
161             SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
162         else:
163            SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, True, None)
164         SocketServer.BaseServer.__init__(self, server_address, HandlerClass)
165         ctx = SSL.Context(SSL.SSLv23_METHOD)
166         ctx.use_privatekey_file(key_file)        
167         ctx.use_certificate_file(cert_file)
168         # If you wanted to verify certs against known CAs.. this is how you would do it
169         #ctx.load_verify_locations('/etc/sfa/trusted_roots/plc.gpo.gid')
170         config = Config()
171         trusted_cert_files = TrustedRoots(config.get_trustedroots_dir()).get_file_list()
172         for cert_file in trusted_cert_files:
173             ctx.load_verify_locations(cert_file)
174         ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verify_callback)
175         ctx.set_verify_depth(5)
176         ctx.set_app_data(self)
177         self.socket = SSL.Connection(ctx, socket.socket(self.address_family,
178                                                         self.socket_type))
179         self.server_bind()
180         self.server_activate()
181
182     # _dispatch
183     #
184     # Convert an exception on the server to a full stack trace and send it to
185     # the client.
186
187     def _dispatch(self, method, params):
188         logger.debug("SecureXMLRPCServer._dispatch, method=%s"%method)
189         try:
190             return SimpleXMLRPCServer.SimpleXMLRPCDispatcher._dispatch(self, method, params)
191         except:
192             # can't use format_exc() as it is not available in jython yet
193             # (even in trunk).
194             type, value, tb = sys.exc_info()
195             raise xmlrpclib.Fault(1,''.join(traceback.format_exception(type, value, tb)))
196
197     # override this one from the python 2.7 code
198     # originally defined in class TCPServer
199     def shutdown_request(self, request):
200         """Called to shutdown and close an individual request."""
201         # ---------- 
202         # the std python 2.7 code just attempts a request.shutdown(socket.SHUT_WR)
203         # this works fine with regular sockets
204         # However we are dealing with an instance of OpenSSL.SSL.Connection instead
205         # This one only supports shutdown(), and in addition this does not
206         # always perform as expected
207         # ---------- std python 2.7 code
208         try:
209             #explicitly shutdown.  socket.close() merely releases
210             #the socket and waits for GC to perform the actual close.
211             request.shutdown(socket.SHUT_WR)
212         except socket.error:
213             pass #some platforms may raise ENOTCONN here
214         # ----------
215         except TypeError:
216             # we are dealing with an OpenSSL.Connection object, 
217             # try to shut it down but never mind if that fails
218             try: request.shutdown()
219             except: pass
220         # ----------
221         self.close_request(request)
222
223 ## From Active State code: http://code.activestate.com/recipes/574454/
224 # This is intended as a drop-in replacement for the ThreadingMixIn class in 
225 # module SocketServer of the standard lib. Instead of spawning a new thread 
226 # for each request, requests are processed by of pool of reusable threads.
227 class ThreadPoolMixIn(SocketServer.ThreadingMixIn):
228     """
229     use a thread pool instead of a new thread on every request
230     """
231     # XX TODO: Make this configurable
232     # config = Config()
233     # numThreads = config.SFA_SERVER_NUM_THREADS
234     numThreads = 25
235     allow_reuse_address = True  # seems to fix socket.error on server restart
236
237     def serve_forever(self):
238         """
239         Handle one request at a time until doomsday.
240         """
241         # set up the threadpool
242         self.requests = Queue()
243
244         for x in range(self.numThreads):
245             t = threading.Thread(target = self.process_request_thread)
246             t.setDaemon(1)
247             t.start()
248
249         # server main loop
250         while True:
251             self.handle_request()
252             
253         self.server_close()
254
255     
256     def process_request_thread(self):
257         """
258         obtain request from queue instead of directly from server socket
259         """
260         while True:
261             SocketServer.ThreadingMixIn.process_request_thread(self, *self.requests.get())
262
263     
264     def handle_request(self):
265         """
266         simply collect requests and put them on the queue for the workers.
267         """
268         try:
269             request, client_address = self.get_request()
270         except socket.error:
271             return
272         if self.verify_request(request, client_address):
273             self.requests.put((request, client_address))
274
275 class ThreadedServer(ThreadPoolMixIn, SecureXMLRPCServer):
276     pass