Merge branch 'upstreammaster'
[sfa.git] / sfa / server / threadedserver.py
1 ##
2 # This module implements a general-purpose server layer for sfa.
3 # The same basic server should be usable on the registry, component, or
4 # other interfaces.
5 #
6 # TODO: investigate ways to combine this with existing PLC server?
7 ##
8
9 import sys
10 import socket
11 import traceback
12 import threading
13 from Queue import Queue
14 import xmlrpclib
15 import SocketServer
16 import BaseHTTPServer
17 import SimpleXMLRPCServer
18 from OpenSSL import SSL
19
20 from sfa.util.sfalogging import logger
21 from sfa.util.config import Config
22 from sfa.util.cache import Cache 
23 from sfa.trust.certificate import Certificate
24 from sfa.trust.trustedroots import TrustedRoots
25
26 # don't hard code an api class anymore here
27 from sfa.generic import Generic
28
29 ##
30 # Verification callback for pyOpenSSL. We do our own checking of keys because
31 # we have our own authentication spec. Thus we disable several of the normal
32 # prohibitions that OpenSSL places on certificates
33
34 def verify_callback(conn, x509, err, depth, preverify):
35     # if the cert has been preverified, then it is ok
36     if preverify:
37        #print "  preverified"
38        return 1
39
40
41     # the certificate verification done by openssl checks a number of things
42     # that we aren't interested in, so we look out for those error messages
43     # and ignore them
44
45     # XXX SMBAKER: I don't know what this error is, but it's being returned
46     # xxx thierry: this most likely means the cert has a validity range in the future
47     # by newer pl nodes.
48     if err == 9:
49        #print "  X509_V_ERR_CERT_NOT_YET_VALID"
50        return 1
51
52     # allow self-signed certificates
53     if err == 18:
54        #print "  X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT"
55        return 1
56
57     # allow certs that don't have an issuer
58     if err == 20:
59        #print "  X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY"
60        return 1
61
62     # allow chained certs with self-signed roots
63     if err == 19:
64         return 1
65     
66     # allow certs that are untrusted
67     if err == 21:
68        #print "  X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE"
69        return 1
70
71     # allow certs that are untrusted
72     if err == 27:
73        #print "  X509_V_ERR_CERT_UNTRUSTED"
74        return 1
75
76     logger.debug("  error %s in verify_callback"%err)
77
78     return 0
79
80 ##
81 # taken from the web (XXX find reference). Implements HTTPS xmlrpc request handler
82 class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
83     """Secure XML-RPC request handler class.
84
85     It it very similar to SimpleXMLRPCRequestHandler but it uses HTTPS for transporting XML data.
86     """
87     def setup(self):
88         self.connection = self.request
89         self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
90         self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
91
92     def do_POST(self):
93         """Handles the HTTPS POST request.
94
95         It was copied out from SimpleXMLRPCServer.py and modified to shutdown 
96         the socket cleanly.
97         """
98         try:
99             peer_cert = Certificate()
100             peer_cert.load_from_pyopenssl_x509(self.connection.get_peer_certificate())
101             generic=Generic.the_flavour()
102             self.api = generic.make_api (peer_cert = peer_cert, 
103                                          interface = self.server.interface, 
104                                          key_file = self.server.key_file, 
105                                          cert_file = self.server.cert_file,
106                                          cache = self.cache)
107             #logger.info("SecureXMLRpcRequestHandler.do_POST:")
108             #logger.info("interface=%s"%self.server.interface)
109             #logger.info("key_file=%s"%self.server.key_file)
110             #logger.info("api=%s"%self.api)
111             #logger.info("server=%s"%self.server)
112             #logger.info("handler=%s"%self)
113             # get arguments
114             request = self.rfile.read(int(self.headers["content-length"]))
115             remote_addr = (remote_ip, remote_port) = self.connection.getpeername()
116             self.api.remote_addr = remote_addr            
117             response = self.api.handle(remote_addr, request, self.server.method_map)
118         except Exception, fault:
119             # This should only happen if the module is buggy
120             # internal error, report as HTTP server error
121             logger.log_exc("server.do_POST")
122             response = self.api.prepare_response(fault)
123             #self.send_response(500)
124             #self.end_headers()
125        
126         # got a valid response
127         self.send_response(200)
128         self.send_header("Content-type", "text/xml")
129         self.send_header("Content-length", str(len(response)))
130         self.end_headers()
131         self.wfile.write(response)
132
133         # shut down the connection
134         self.wfile.flush()
135         self.connection.shutdown() # Modified here!
136
137 ##
138 # Taken from the web (XXX find reference). Implements an HTTPS xmlrpc server
139 class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
140
141     def __init__(self, server_address, HandlerClass, key_file, cert_file, logRequests=True):
142         """Secure XML-RPC server.
143
144         It it very similar to SimpleXMLRPCServer but it uses HTTPS for transporting XML data.
145         """
146         logger.debug("SecureXMLRPCServer.__init__, server_address=%s, cert_file=%s"%(server_address,cert_file))
147         self.logRequests = logRequests
148         self.interface = None
149         self.key_file = key_file
150         self.cert_file = cert_file
151         self.method_map = {}
152         # add cache to the request handler
153         HandlerClass.cache = Cache()
154         #for compatibility with python 2.4 (centos53)
155         if sys.version_info < (2, 5):
156             SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
157         else:
158            SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, True, None)
159         SocketServer.BaseServer.__init__(self, server_address, HandlerClass)
160         ctx = SSL.Context(SSL.SSLv23_METHOD)
161         ctx.use_privatekey_file(key_file)        
162         ctx.use_certificate_file(cert_file)
163         # If you wanted to verify certs against known CAs.. this is how you would do it
164         #ctx.load_verify_locations('/etc/sfa/trusted_roots/plc.gpo.gid')
165         config = Config()
166         trusted_cert_files = TrustedRoots(config.get_trustedroots_dir()).get_file_list()
167         for cert_file in trusted_cert_files:
168             ctx.load_verify_locations(cert_file)
169         ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verify_callback)
170         ctx.set_verify_depth(5)
171         ctx.set_app_data(self)
172         self.socket = SSL.Connection(ctx, socket.socket(self.address_family,
173                                                         self.socket_type))
174         self.server_bind()
175         self.server_activate()
176
177     # _dispatch
178     #
179     # Convert an exception on the server to a full stack trace and send it to
180     # the client.
181
182     def _dispatch(self, method, params):
183         logger.debug("SecureXMLRPCServer._dispatch, method=%s"%method)
184         try:
185             return SimpleXMLRPCServer.SimpleXMLRPCDispatcher._dispatch(self, method, params)
186         except:
187             # can't use format_exc() as it is not available in jython yet
188             # (even in trunk).
189             type, value, tb = sys.exc_info()
190             raise xmlrpclib.Fault(1,''.join(traceback.format_exception(type, value, tb)))
191
192     # override this one from the python 2.7 code
193     # originally defined in class TCPServer
194     def shutdown_request(self, request):
195         """Called to shutdown and close an individual request."""
196         # ---------- 
197         # the std python 2.7 code just attempts a request.shutdown(socket.SHUT_WR)
198         # this works fine with regular sockets
199         # However we are dealing with an instance of OpenSSL.SSL.Connection instead
200         # This one only supports shutdown(), and in addition this does not
201         # always perform as expected
202         # ---------- std python 2.7 code
203         try:
204             #explicitly shutdown.  socket.close() merely releases
205             #the socket and waits for GC to perform the actual close.
206             request.shutdown(socket.SHUT_WR)
207         except socket.error:
208             pass #some platforms may raise ENOTCONN here
209         # ----------
210         except TypeError:
211             # we are dealing with an OpenSSL.Connection object, 
212             # try to shut it down but never mind if that fails
213             try: request.shutdown()
214             except: pass
215         # ----------
216         self.close_request(request)
217
218 ## From Active State code: http://code.activestate.com/recipes/574454/
219 # This is intended as a drop-in replacement for the ThreadingMixIn class in 
220 # module SocketServer of the standard lib. Instead of spawning a new thread 
221 # for each request, requests are processed by of pool of reusable threads.
222 class ThreadPoolMixIn(SocketServer.ThreadingMixIn):
223     """
224     use a thread pool instead of a new thread on every request
225     """
226     # XX TODO: Make this configurable
227     # config = Config()
228     # numThreads = config.SFA_SERVER_NUM_THREADS
229     numThreads = 25
230     allow_reuse_address = True  # seems to fix socket.error on server restart
231
232     def serve_forever(self):
233         """
234         Handle one request at a time until doomsday.
235         """
236         # set up the threadpool
237         self.requests = Queue()
238
239         for x in range(self.numThreads):
240             t = threading.Thread(target = self.process_request_thread)
241             t.setDaemon(1)
242             t.start()
243
244         # server main loop
245         while True:
246             self.handle_request()
247             
248         self.server_close()
249
250     
251     def process_request_thread(self):
252         """
253         obtain request from queue instead of directly from server socket
254         """
255         while True:
256             SocketServer.ThreadingMixIn.process_request_thread(self, *self.requests.get())
257
258     
259     def handle_request(self):
260         """
261         simply collect requests and put them on the queue for the workers.
262         """
263         try:
264             request, client_address = self.get_request()
265         except socket.error:
266             return
267         if self.verify_request(request, client_address):
268             self.requests.put((request, client_address))
269
270 class ThreadedServer(ThreadPoolMixIn, SecureXMLRPCServer):
271     pass