solve conflicts
[sfa.git] / sfa / server / threadedserver.py
1 ##
2 # This module implements a general-purpose server layer for sfa.
3 # The same basic server should be usable on the registry, component, or
4 # other interfaces.
5 #
6 # TODO: investigate ways to combine this with existing PLC server?
7 ##
8
9 import sys
10 import socket
11 import traceback
12 import threading
13 from Queue import Queue
14 import SocketServer
15 import BaseHTTPServer
16 import SimpleXMLRPCServer
17 from OpenSSL import SSL
18
19 from sfa.util.sfalogging import logger
20 from sfa.util.config import Config
21 from sfa.util.cache import Cache 
22 from sfa.trust.certificate import Certificate
23 from sfa.trust.trustedroots import TrustedRoots
24
25 # don't hard code an api class anymore here
26 from sfa.generic import Generic
27
28 ##
29 # Verification callback for pyOpenSSL. We do our own checking of keys because
30 # we have our own authentication spec. Thus we disable several of the normal
31 # prohibitions that OpenSSL places on certificates
32
33 def verify_callback(conn, x509, err, depth, preverify):
34     # if the cert has been preverified, then it is ok
35     if preverify:
36        #print "  preverified"
37        return 1
38
39
40     # the certificate verification done by openssl checks a number of things
41     # that we aren't interested in, so we look out for those error messages
42     # and ignore them
43
44     # XXX SMBAKER: I don't know what this error is, but it's being returned
45     # by newer pl nodes.
46     if err == 9:
47        #print "  X509_V_ERR_CERT_NOT_YET_VALID"
48        return 1
49
50     # allow self-signed certificates
51     if err == 18:
52        #print "  X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT"
53        return 1
54
55     # allow certs that don't have an issuer
56     if err == 20:
57        #print "  X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY"
58        return 1
59
60     # allow chained certs with self-signed roots
61     if err == 19:
62         return 1
63     
64     # allow certs that are untrusted
65     if err == 21:
66        #print "  X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE"
67        return 1
68
69     # allow certs that are untrusted
70     if err == 27:
71        #print "  X509_V_ERR_CERT_UNTRUSTED"
72        return 1
73
74     print "  error", err, "in verify_callback"
75
76     return 0
77
78 ##
79 # taken from the web (XXX find reference). Implements HTTPS xmlrpc request handler
80 class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
81     """Secure XML-RPC request handler class.
82
83     It it very similar to SimpleXMLRPCRequestHandler but it uses HTTPS for transporting XML data.
84     """
85     def setup(self):
86         self.connection = self.request
87         self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
88         self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
89
90     def do_POST(self):
91         """Handles the HTTPS POST request.
92
93         It was copied out from SimpleXMLRPCServer.py and modified to shutdown 
94         the socket cleanly.
95         """
96         try:
97             peer_cert = Certificate()
98             peer_cert.load_from_pyopenssl_x509(self.connection.get_peer_certificate())
99             generic=Generic.the_flavour()
100             self.api = generic.make_api (peer_cert = peer_cert, 
101                                          interface = self.server.interface, 
102                                          key_file = self.server.key_file, 
103                                          cert_file = self.server.cert_file,
104                                          cache = self.cache)
105             #logger.info("SecureXMLRpcRequestHandler.do_POST:")
106             #logger.info("interface=%s"%self.server.interface)
107             #logger.info("key_file=%s"%self.server.key_file)
108             #logger.info("api=%s"%self.api)
109             #logger.info("server=%s"%self.server)
110             #logger.info("handler=%s"%self)
111             # get arguments
112             request = self.rfile.read(int(self.headers["content-length"]))
113             remote_addr = (remote_ip, remote_port) = self.connection.getpeername()
114             self.api.remote_addr = remote_addr            
115             response = self.api.handle(remote_addr, request, self.server.method_map)
116         except Exception, fault:
117             # This should only happen if the module is buggy
118             # internal error, report as HTTP server error
119             logger.log_exc("server.do_POST")
120             response = self.api.prepare_response(fault)
121             #self.send_response(500)
122             #self.end_headers()
123        
124         # got a valid response
125         self.send_response(200)
126         self.send_header("Content-type", "text/xml")
127         self.send_header("Content-length", str(len(response)))
128         self.end_headers()
129         self.wfile.write(response)
130
131         # shut down the connection
132         self.wfile.flush()
133         self.connection.shutdown() # Modified here!
134
135 ##
136 # Taken from the web (XXX find reference). Implements an HTTPS xmlrpc server
137 class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
138
139     def __init__(self, server_address, HandlerClass, key_file, cert_file, logRequests=True):
140         """Secure XML-RPC server.
141
142         It it very similar to SimpleXMLRPCServer but it uses HTTPS for transporting XML data.
143         """
144         logger.debug("SecureXMLRPCServer.__init__, server_address=%s, cert_file=%s"%(server_address,cert_file))
145         self.logRequests = logRequests
146         self.interface = None
147         self.key_file = key_file
148         self.cert_file = cert_file
149         self.method_map = {}
150         # add cache to the request handler
151         HandlerClass.cache = Cache()
152         #for compatibility with python 2.4 (centos53)
153         if sys.version_info < (2, 5):
154             SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
155         else:
156            SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, True, None)
157         SocketServer.BaseServer.__init__(self, server_address, HandlerClass)
158         ctx = SSL.Context(SSL.SSLv23_METHOD)
159         ctx.use_privatekey_file(key_file)        
160         ctx.use_certificate_file(cert_file)
161         # If you wanted to verify certs against known CAs.. this is how you would do it
162         #ctx.load_verify_locations('/etc/sfa/trusted_roots/plc.gpo.gid')
163         config = Config()
164         trusted_cert_files = TrustedRoots(config.get_trustedroots_dir()).get_file_list()
165         for cert_file in trusted_cert_files:
166             ctx.load_verify_locations(cert_file)
167         ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verify_callback)
168         ctx.set_verify_depth(5)
169         ctx.set_app_data(self)
170         self.socket = SSL.Connection(ctx, socket.socket(self.address_family,
171                                                         self.socket_type))
172         self.server_bind()
173         self.server_activate()
174
175     # _dispatch
176     #
177     # Convert an exception on the server to a full stack trace and send it to
178     # the client.
179
180     def _dispatch(self, method, params):
181         logger.debug("SecureXMLRPCServer._dispatch, method=%s"%method)
182         try:
183             return SimpleXMLRPCServer.SimpleXMLRPCDispatcher._dispatch(self, method, params)
184         except:
185             # can't use format_exc() as it is not available in jython yet
186             # (even in trunk).
187             type, value, tb = sys.exc_info()
188             raise xmlrpclib.Fault(1,''.join(traceback.format_exception(type, value, tb)))
189
190     # override this one from the python 2.7 code
191     # originally defined in class TCPServer
192     def shutdown_request(self, request):
193         """Called to shutdown and close an individual request."""
194         # ---------- 
195         # the std python 2.7 code just attempts a request.shutdown(socket.SHUT_WR)
196         # this works fine with regular sockets
197         # However we are dealing with an instance of OpenSSL.SSL.Connection instead
198         # This one only supports shutdown(), and in addition this does not
199         # always perform as expected
200         # ---------- std python 2.7 code
201         try:
202             #explicitly shutdown.  socket.close() merely releases
203             #the socket and waits for GC to perform the actual close.
204             request.shutdown(socket.SHUT_WR)
205         except socket.error:
206             pass #some platforms may raise ENOTCONN here
207         # ----------
208         except TypeError:
209             # we are dealing with an OpenSSL.Connection object, 
210             # try to shut it down but never mind if that fails
211             try: request.shutdown()
212             except: pass
213         # ----------
214         self.close_request(request)
215
216 ## From Active State code: http://code.activestate.com/recipes/574454/
217 # This is intended as a drop-in replacement for the ThreadingMixIn class in 
218 # module SocketServer of the standard lib. Instead of spawning a new thread 
219 # for each request, requests are processed by of pool of reusable threads.
220 class ThreadPoolMixIn(SocketServer.ThreadingMixIn):
221     """
222     use a thread pool instead of a new thread on every request
223     """
224     # XX TODO: Make this configurable
225     # config = Config()
226     # numThreads = config.SFA_SERVER_NUM_THREADS
227     numThreads = 25
228     allow_reuse_address = True  # seems to fix socket.error on server restart
229
230     def serve_forever(self):
231         """
232         Handle one request at a time until doomsday.
233         """
234         # set up the threadpool
235         self.requests = Queue()
236
237         for x in range(self.numThreads):
238             t = threading.Thread(target = self.process_request_thread)
239             t.setDaemon(1)
240             t.start()
241
242         # server main loop
243         while True:
244             self.handle_request()
245             
246         self.server_close()
247
248     
249     def process_request_thread(self):
250         """
251         obtain request from queue instead of directly from server socket
252         """
253         while True:
254             SocketServer.ThreadingMixIn.process_request_thread(self, *self.requests.get())
255
256     
257     def handle_request(self):
258         """
259         simply collect requests and put them on the queue for the workers.
260         """
261         try:
262             request, client_address = self.get_request()
263         except socket.error:
264             return
265         if self.verify_request(request, client_address):
266             self.requests.put((request, client_address))
267
268 class ThreadedServer(ThreadPoolMixIn, SecureXMLRPCServer):
269     pass