remove unneeded import
[sfa.git] / sfa / server / threadedserver.py
1 ##
2 # This module implements a general-purpose server layer for sfa.
3 # The same basic server should be usable on the registry, component, or
4 # other interfaces.
5 #
6 # TODO: investigate ways to combine this with existing PLC server?
7 ##
8
9 import sys
10 import socket
11 import traceback
12 import threading
13 from Queue import Queue
14 import xmlrpclib
15 import SocketServer
16 import BaseHTTPServer
17 import SimpleXMLRPCServer
18 from OpenSSL import SSL
19
20 from sfa.util.sfalogging import logger
21 from sfa.util.config import Config
22 from sfa.util.cache import Cache 
23 from sfa.trust.certificate import Certificate
24 from sfa.trust.trustedroots import TrustedRoots
25
26 # don't hard code an api class anymore here
27 from sfa.generic import Generic
28
29 ##
30 # Verification callback for pyOpenSSL. We do our own checking of keys because
31 # we have our own authentication spec. Thus we disable several of the normal
32 # prohibitions that OpenSSL places on certificates
33
34 def verify_callback(conn, x509, err, depth, preverify):
35     # if the cert has been preverified, then it is ok
36     if preverify:
37        #print "  preverified"
38        return 1
39
40
41     # the certificate verification done by openssl checks a number of things
42     # that we aren't interested in, so we look out for those error messages
43     # and ignore them
44
45     # XXX SMBAKER: I don't know what this error is, but it's being returned
46     # by newer pl nodes.
47     if err == 9:
48        #print "  X509_V_ERR_CERT_NOT_YET_VALID"
49        return 1
50
51     # allow self-signed certificates
52     if err == 18:
53        #print "  X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT"
54        return 1
55
56     # allow certs that don't have an issuer
57     if err == 20:
58        #print "  X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY"
59        return 1
60
61     # allow chained certs with self-signed roots
62     if err == 19:
63         return 1
64     
65     # allow certs that are untrusted
66     if err == 21:
67        #print "  X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE"
68        return 1
69
70     # allow certs that are untrusted
71     if err == 27:
72        #print "  X509_V_ERR_CERT_UNTRUSTED"
73        return 1
74
75     print "  error", err, "in verify_callback"
76
77     return 0
78
79 ##
80 # taken from the web (XXX find reference). Implements HTTPS xmlrpc request handler
81 class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
82     """Secure XML-RPC request handler class.
83
84     It it very similar to SimpleXMLRPCRequestHandler but it uses HTTPS for transporting XML data.
85     """
86     def setup(self):
87         self.connection = self.request
88         self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
89         self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
90
91     def do_POST(self):
92         """Handles the HTTPS POST request.
93
94         It was copied out from SimpleXMLRPCServer.py and modified to shutdown 
95         the socket cleanly.
96         """
97         try:
98             peer_cert = Certificate()
99             peer_cert.load_from_pyopenssl_x509(self.connection.get_peer_certificate())
100             generic=Generic.the_flavour()
101             self.api = generic.make_api (peer_cert = peer_cert, 
102                                          interface = self.server.interface, 
103                                          key_file = self.server.key_file, 
104                                          cert_file = self.server.cert_file,
105                                          cache = self.cache)
106             #logger.info("SecureXMLRpcRequestHandler.do_POST:")
107             #logger.info("interface=%s"%self.server.interface)
108             #logger.info("key_file=%s"%self.server.key_file)
109             #logger.info("api=%s"%self.api)
110             #logger.info("server=%s"%self.server)
111             #logger.info("handler=%s"%self)
112             # get arguments
113             request = self.rfile.read(int(self.headers["content-length"]))
114             remote_addr = (remote_ip, remote_port) = self.connection.getpeername()
115             self.api.remote_addr = remote_addr            
116             response = self.api.handle(remote_addr, request, self.server.method_map)
117         except Exception, fault:
118             # This should only happen if the module is buggy
119             # internal error, report as HTTP server error
120             logger.log_exc("server.do_POST")
121             response = self.api.prepare_response(fault)
122             #self.send_response(500)
123             #self.end_headers()
124        
125         # got a valid response
126         self.send_response(200)
127         self.send_header("Content-type", "text/xml")
128         self.send_header("Content-length", str(len(response)))
129         self.end_headers()
130         self.wfile.write(response)
131
132         # shut down the connection
133         self.wfile.flush()
134         self.connection.shutdown() # Modified here!
135
136 ##
137 # Taken from the web (XXX find reference). Implements an HTTPS xmlrpc server
138 class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
139
140     def __init__(self, server_address, HandlerClass, key_file, cert_file, logRequests=True):
141         """Secure XML-RPC server.
142
143         It it very similar to SimpleXMLRPCServer but it uses HTTPS for transporting XML data.
144         """
145         logger.debug("SecureXMLRPCServer.__init__, server_address=%s, cert_file=%s"%(server_address,cert_file))
146         self.logRequests = logRequests
147         self.interface = None
148         self.key_file = key_file
149         self.cert_file = cert_file
150         self.method_map = {}
151         # add cache to the request handler
152         HandlerClass.cache = Cache()
153         #for compatibility with python 2.4 (centos53)
154         if sys.version_info < (2, 5):
155             SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
156         else:
157            SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, True, None)
158         SocketServer.BaseServer.__init__(self, server_address, HandlerClass)
159         ctx = SSL.Context(SSL.SSLv23_METHOD)
160         ctx.use_privatekey_file(key_file)        
161         ctx.use_certificate_file(cert_file)
162         # If you wanted to verify certs against known CAs.. this is how you would do it
163         #ctx.load_verify_locations('/etc/sfa/trusted_roots/plc.gpo.gid')
164         config = Config()
165         trusted_cert_files = TrustedRoots(config.get_trustedroots_dir()).get_file_list()
166         for cert_file in trusted_cert_files:
167             ctx.load_verify_locations(cert_file)
168         ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verify_callback)
169         ctx.set_verify_depth(5)
170         ctx.set_app_data(self)
171         self.socket = SSL.Connection(ctx, socket.socket(self.address_family,
172                                                         self.socket_type))
173         self.server_bind()
174         self.server_activate()
175
176     # _dispatch
177     #
178     # Convert an exception on the server to a full stack trace and send it to
179     # the client.
180
181     def _dispatch(self, method, params):
182         logger.debug("SecureXMLRPCServer._dispatch, method=%s"%method)
183         try:
184             return SimpleXMLRPCServer.SimpleXMLRPCDispatcher._dispatch(self, method, params)
185         except:
186             # can't use format_exc() as it is not available in jython yet
187             # (even in trunk).
188             type, value, tb = sys.exc_info()
189             raise xmlrpclib.Fault(1,''.join(traceback.format_exception(type, value, tb)))
190
191     # override this one from the python 2.7 code
192     # originally defined in class TCPServer
193     def shutdown_request(self, request):
194         """Called to shutdown and close an individual request."""
195         # ---------- 
196         # the std python 2.7 code just attempts a request.shutdown(socket.SHUT_WR)
197         # this works fine with regular sockets
198         # However we are dealing with an instance of OpenSSL.SSL.Connection instead
199         # This one only supports shutdown(), and in addition this does not
200         # always perform as expected
201         # ---------- std python 2.7 code
202         try:
203             #explicitly shutdown.  socket.close() merely releases
204             #the socket and waits for GC to perform the actual close.
205             request.shutdown(socket.SHUT_WR)
206         except socket.error:
207             pass #some platforms may raise ENOTCONN here
208         # ----------
209         except TypeError:
210             # we are dealing with an OpenSSL.Connection object, 
211             # try to shut it down but never mind if that fails
212             try: request.shutdown()
213             except: pass
214         # ----------
215         self.close_request(request)
216
217 ## From Active State code: http://code.activestate.com/recipes/574454/
218 # This is intended as a drop-in replacement for the ThreadingMixIn class in 
219 # module SocketServer of the standard lib. Instead of spawning a new thread 
220 # for each request, requests are processed by of pool of reusable threads.
221 class ThreadPoolMixIn(SocketServer.ThreadingMixIn):
222     """
223     use a thread pool instead of a new thread on every request
224     """
225     # XX TODO: Make this configurable
226     # config = Config()
227     # numThreads = config.SFA_SERVER_NUM_THREADS
228     numThreads = 25
229     allow_reuse_address = True  # seems to fix socket.error on server restart
230
231     def serve_forever(self):
232         """
233         Handle one request at a time until doomsday.
234         """
235         # set up the threadpool
236         self.requests = Queue()
237
238         for x in range(self.numThreads):
239             t = threading.Thread(target = self.process_request_thread)
240             t.setDaemon(1)
241             t.start()
242
243         # server main loop
244         while True:
245             self.handle_request()
246             
247         self.server_close()
248
249     
250     def process_request_thread(self):
251         """
252         obtain request from queue instead of directly from server socket
253         """
254         while True:
255             SocketServer.ThreadingMixIn.process_request_thread(self, *self.requests.get())
256
257     
258     def handle_request(self):
259         """
260         simply collect requests and put them on the queue for the workers.
261         """
262         try:
263             request, client_address = self.get_request()
264         except socket.error:
265             return
266         if self.verify_request(request, client_address):
267             self.requests.put((request, client_address))
268
269 class ThreadedServer(ThreadPoolMixIn, SecureXMLRPCServer):
270     pass