no semantic change, make pylint happier
[sfa.git] / sfa / server / threadedserver.py
1 ##
2 # This module implements a general-purpose server layer for sfa.
3 # The same basic server should be usable on the registry, component, or
4 # other interfaces.
5 #
6 # TODO: investigate ways to combine this with existing PLC server?
7 ##
8
9 import sys
10 import socket
11 import traceback
12 import threading
13 from queue import Queue
14 import socketserver
15 import http.server
16 import xmlrpc.server
17 from OpenSSL import SSL
18
19 from sfa.util.sfalogging import logger
20 from sfa.util.config import Config
21 from sfa.util.cache import Cache
22 from sfa.trust.certificate import Certificate
23 from sfa.trust.trustedroots import TrustedRoots
24 import xmlrpc.client
25
26 # don't hard code an api class anymore here
27 from sfa.generic import Generic
28
29 ##
30 # Verification callback for pyOpenSSL. We do our own checking of keys because
31 # we have our own authentication spec. Thus we disable several of the normal
32 # prohibitions that OpenSSL places on certificates
33
34
35 def verify_callback(conn, x509, err, depth, preverify):
36     # if the cert has been preverified, then it is ok
37     if preverify:
38         # print "  preverified"
39         return 1
40
41     # the certificate verification done by openssl checks a number of things
42     # that we aren't interested in, so we look out for those error messages
43     # and ignore them
44
45     # XXX SMBAKER: I don't know what this error is, but it's being returned
46     # xxx thierry: this most likely means the cert
47     #     has a validity range in the future
48     # by newer pl nodes.
49     if err == 9:
50         # print "  X509_V_ERR_CERT_NOT_YET_VALID"
51         return 1
52
53     # allow self-signed certificates
54     if err == 18:
55         # print "  X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT"
56         return 1
57
58     # allow certs that don't have an issuer
59     if err == 20:
60         # print "  X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY"
61         return 1
62
63     # allow chained certs with self-signed roots
64     if err == 19:
65         return 1
66
67     # allow certs that are untrusted
68     if err == 21:
69         # print "  X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE"
70         return 1
71
72     # allow certs that are untrusted
73     if err == 27:
74         # print "  X509_V_ERR_CERT_UNTRUSTED"
75         return 1
76
77     # ignore X509_V_ERR_CERT_SIGNATURE_FAILURE
78     if err == 7:
79         return 1
80
81     logger.debug("  error %s in verify_callback" % err)
82
83     return 0
84
85 ##
86 # taken from the web (XXX find reference). Implements HTTPS xmlrpc request
87 # handler
88
89
90 class SecureXMLRpcRequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
91     """
92     Secure XML-RPC request handler class.
93
94     It it very similar to SimpleXMLRPCRequestHandler
95     but it uses HTTPS for transporting XML data.
96     """
97
98     def setup(self):
99         self.connection = self.request
100         self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
101         self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
102
103     def do_POST(self):
104         """
105         Handles the HTTPS POST request.
106
107         It was copied out from SimpleXMLRPCServer.py and modified to shutdown
108         the socket cleanly.
109         """
110         try:
111             peer_cert = Certificate()
112             peer_cert.load_from_pyopenssl_x509(
113                 self.connection.get_peer_certificate())
114             generic = Generic.the_flavour()
115             self.api = generic.make_api(peer_cert=peer_cert,
116                                         interface=self.server.interface,
117                                         key_file=self.server.key_file,
118                                         cert_file=self.server.cert_file,
119                                         cache=self.cache)
120             # logger.info("SecureXMLRpcRequestHandler.do_POST:")
121             # logger.info("interface=%s"%self.server.interface)
122             # logger.info("key_file=%s"%self.server.key_file)
123             # logger.info("api=%s"%self.api)
124             # logger.info("server=%s"%self.server)
125             # logger.info("handler=%s"%self)
126             # get arguments
127             request = self.rfile.read(int(self.headers["content-length"]))
128             remote_addr = (
129                 remote_ip, remote_port) = self.connection.getpeername()
130             self.api.remote_addr = remote_addr
131             response = self.api.handle(
132                 remote_addr, request, self.server.method_map)
133         except Exception as fault:
134             # This should only happen if the module is buggy
135             # internal error, report as HTTP server error
136             logger.log_exc("server.do_POST")
137             response = self.api.prepare_response(fault)
138             # self.send_response(500)
139             # self.end_headers()
140
141         # avoid session/connection leaks : do this no matter what
142         finally:
143             self.send_response(200)
144             self.send_header("Content-type", "text/xml")
145             self.send_header("Content-length", str(len(response)))
146             self.end_headers()
147             self.wfile.write(response)
148             self.wfile.flush()
149             # close db connection
150             self.api.close_dbsession()
151             # shut down the connection
152             self.connection.shutdown()  # Modified here!
153
154 ##
155 # Taken from the web (XXX find reference). Implements an HTTPS xmlrpc server
156
157
158 class SecureXMLRPCServer(http.server.HTTPServer,
159                          xmlrpc.server.SimpleXMLRPCDispatcher):
160
161     def __init__(self, server_address, HandlerClass,
162                  key_file, cert_file, logRequests=True):
163         """
164         Secure XML-RPC server.
165
166         It it very similar to SimpleXMLRPCServer
167          but it uses HTTPS for transporting XML data.
168         """
169         logger.debug(
170             f"SecureXMLRPCServer.__init__, server_address={server_address}, "
171             f"cert_file={cert_file}, key_file={key_file}")
172         self.logRequests = logRequests
173         self.interface = None
174         self.key_file = key_file
175         self.cert_file = cert_file
176         self.method_map = {}
177         # add cache to the request handler
178         HandlerClass.cache = Cache()
179         # for compatibility with python 2.4 (centos53)
180         if sys.version_info < (2, 5):
181             xmlrpc.server.SimpleXMLRPCDispatcher.__init__(self)
182         else:
183             xmlrpc.server.SimpleXMLRPCDispatcher.__init__(
184                 self, True, None)
185         socketserver.BaseServer.__init__(self, server_address, HandlerClass)
186         ctx = SSL.Context(SSL.SSLv23_METHOD)
187         ctx.use_privatekey_file(key_file)
188         ctx.use_certificate_file(cert_file)
189         # If you wanted to verify certs against known CAs..
190         # this is how you would do it
191         # ctx.load_verify_locations('/etc/sfa/trusted_roots/plc.gpo.gid')
192         config = Config()
193         trusted_cert_files = TrustedRoots(
194             config.get_trustedroots_dir()).get_file_list()
195         for cert_file in trusted_cert_files:
196             ctx.load_verify_locations(cert_file)
197         ctx.set_verify(SSL.VERIFY_PEER |
198                        SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verify_callback)
199         ctx.set_verify_depth(5)
200         ctx.set_app_data(self)
201         self.socket = SSL.Connection(ctx, socket.socket(self.address_family,
202                                                         self.socket_type))
203         self.server_bind()
204         self.server_activate()
205
206     # _dispatch
207     #
208     # Convert an exception on the server to a full stack trace and send it to
209     # the client.
210
211     def _dispatch(self, method, params):
212         logger.debug("SecureXMLRPCServer._dispatch, method=%s" % method)
213         try:
214             return xmlrpc.server.SimpleXMLRPCDispatcher._dispatch(
215                 self, method, params)
216         except:
217             # can't use format_exc() as it is not available in jython yet
218             # (even in trunk).
219             type, value, tb = sys.exc_info()
220             raise xmlrpc.client.Fault(1, ''.join(
221                 traceback.format_exception(type, value, tb)))
222
223     # override this one from the python 2.7 code
224     # originally defined in class TCPServer
225     def shutdown_request(self, request):
226         """
227         Called to shutdown and close an individual request.
228         """
229         # ----------
230         # the std python 2.7 code just attempts a
231         # request.shutdown(socket.SHUT_WR)
232         # this works fine with regular sockets
233         # However we are dealing with an instance of
234         # OpenSSL.SSL.Connection instead
235         # This one only supports shutdown(), and in addition this does not
236         # always perform as expected
237         # ---------- std python 2.7 code
238         try:
239             # explicitly shutdown.  socket.close() merely releases
240             # the socket and waits for GC to perform the actual close.
241             request.shutdown(socket.SHUT_WR)
242         except socket.error:
243             pass  # some platforms may raise ENOTCONN here
244         # ----------
245         except TypeError:
246             # we are dealing with an OpenSSL.Connection object,
247             # try to shut it down but never mind if that fails
248             try:
249                 request.shutdown()
250             except:
251                 pass
252         # ----------
253         self.close_request(request)
254
255 # From Active State code: http://code.activestate.com/recipes/574454/
256 # This is intended as a drop-in replacement for the ThreadingMixIn class in
257 # module SocketServer of the standard lib. Instead of spawning a new thread
258 # for each request, requests are processed by of pool of reusable threads.
259
260
261 class ThreadPoolMixIn(socketserver.ThreadingMixIn):
262     """
263     use a thread pool instead of a new thread on every request
264     """
265     # XX TODO: Make this configurable
266     # config = Config()
267     # numThreads = config.SFA_SERVER_NUM_THREADS
268     numThreads = 25
269     allow_reuse_address = True  # seems to fix socket.error on server restart
270
271     def serve_forever(self):
272         """
273         Handle one request at a time until doomsday.
274         """
275         # set up the threadpool
276         self.requests = Queue()
277
278         for _ in range(self.numThreads):
279             thread = threading.Thread(target=self.process_request_thread)
280             thread.setDaemon(1)
281             thread.start()
282
283         # server main loop
284         while True:
285             self.handle_request()
286
287         self.server_close()
288
289     def process_request_thread(self):
290         """
291         obtain request from queue instead of directly from server socket
292         """
293         while True:
294             socketserver.ThreadingMixIn.process_request_thread(
295                 self, *self.requests.get())
296
297     def handle_request(self):
298         """
299         simply collect requests and put them on the queue for the workers.
300         """
301         try:
302             request, client_address = self.get_request()
303         except socket.error:
304             return
305         if self.verify_request(request, client_address):
306             self.requests.put((request, client_address))
307
308
309 class ThreadedServer(ThreadPoolMixIn, SecureXMLRPCServer):
310     pass