X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Ftrust%2Fcertificate.py;h=6f3ecc6268f25c222fc77d6270b6b9f47e4d7c81;hb=ecc85e0b923922cf7117d29b380f5284edb88f21;hp=64ac865f38a5588d2c448882dfa400eba0fe3f06;hpb=b1775bb47ea5e242d337dbd34f5d58d10a57a028;p=sfa.git diff --git a/sfa/trust/certificate.py b/sfa/trust/certificate.py index 64ac865f..6f3ecc62 100644 --- a/sfa/trust/certificate.py +++ b/sfa/trust/certificate.py @@ -34,32 +34,67 @@ # This module exports two classes: Keypair and Certificate. ## # -### $Id$ -### $URL$ -# +from __future__ import print_function + +import functools import os import tempfile import base64 -import traceback from tempfile import mkstemp from OpenSSL import crypto import M2Crypto from M2Crypto import X509 -from sfa.util.sfalogging import sfa_logger -from sfa.util.namespace import urn_to_hrn -from sfa.util.faults import * +from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent +from sfa.util.sfalogging import logger + +# this tends to generate quite some logs for little or no value +debug_verify_chain = False + +glo_passphrase_callback = None + +## +# A global callback may be implemented for requesting passphrases from the +# user. The function will be called with three arguments: +# +# keypair_obj: the keypair object that is calling the passphrase +# string: the string containing the private key that's being loaded +# x: unknown, appears to be 0, comes from pyOpenSSL and/or m2crypto +# +# The callback should return a string containing the passphrase. + +def set_passphrase_callback(callback_func): + global glo_passphrase_callback + + glo_passphrase_callback = callback_func + +## +# Sets a fixed passphrase. + +def set_passphrase(passphrase): + set_passphrase_callback( lambda k,s,x: passphrase ) + +## +# Check to see if a passphrase works for a particular private key string. +# Intended to be used by passphrase callbacks for input validation. + +def test_passphrase(string, passphrase): + try: + crypto.load_privatekey(crypto.FILETYPE_PEM, string, (lambda x: passphrase)) + return True + except: + return False def convert_public_key(key): keyconvert_path = "/usr/bin/keyconvert.py" if not os.path.isfile(keyconvert_path): - raise IOError, "Could not find keyconvert in %s" % keyconvert_path + raise IOError("Could not find keyconvert in %s" % keyconvert_path) # we can only convert rsa keys if "ssh-dss" in key: - return None + raise Exception("keyconvert: dss keys are not supported") (ssh_f, ssh_fn) = tempfile.mkstemp() ssl_fn = tempfile.mktemp() @@ -73,20 +108,21 @@ def convert_public_key(key): # that it can be expected to see why it failed. # TODO: for production, cleanup the temporary files if not os.path.exists(ssl_fn): - return None + raise Exception("keyconvert: generated certificate not found. keyconvert may have failed.") k = Keypair() try: k.load_pubkey_from_file(ssl_fn) + return k except: - sfa_logger.log_exc("convert_public_key caught exception") - k = None - - # remove the temporary files - os.remove(ssh_fn) - os.remove(ssl_fn) - - return k + logger.log_exc("convert_public_key caught exception") + raise + finally: + # remove the temporary files + if os.path.exists(ssh_fn): + os.remove(ssh_fn) + if os.path.exists(ssl_fn): + os.remove(ssl_fn) ## # Public-private key pairs are implemented by the Keypair class. @@ -117,7 +153,7 @@ class Keypair: def create(self): self.key = crypto.PKey() - self.key.generate_key(crypto.TYPE_RSA, 1024) + self.key.generate_key(crypto.TYPE_RSA, 2048) ## # Save the private key to a file @@ -125,11 +161,13 @@ class Keypair: def save_to_file(self, filename): open(filename, 'w').write(self.as_pem()) + self.filename=filename ## # Load the private key from a file. Implicity the private key includes the public key. def load_from_file(self, filename): + self.filename=filename buffer = open(filename, 'r').read() self.load_from_string(buffer) @@ -137,8 +175,14 @@ class Keypair: # Load the private key from a string. Implicitly the private key includes the public key. def load_from_string(self, string): - self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string) - self.m2key = M2Crypto.EVP.load_key_string(string) + if glo_passphrase_callback: + self.key = crypto.load_privatekey( + crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string)) + self.m2key = M2Crypto.EVP.load_key_string( + string, functools.partial(glo_passphrase_callback, self, string)) + else: + self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string) + self.m2key = M2Crypto.EVP.load_key_string(string) ## # Load the public key from a string. No private key is loaded. @@ -161,8 +205,11 @@ class Keypair: ASN1.set_time(500) m2x509.set_not_before(ASN1) m2x509.set_not_after(ASN1) + # x509v3 so it can have extensions + # prob not necc since this cert itself is junk but still... + m2x509.set_version(2) junk_key = Keypair(create=True) - m2x509.sign(pkey=junk_key.get_m2_pkey(), md="sha1") + m2x509.sign(pkey=junk_key.get_m2_pubkey(), md="sha1") # convert the m2 x509 cert to a pyopenssl x509 m2pem = m2x509.as_pem() @@ -170,6 +217,7 @@ class Keypair: # get the pyopenssl pkey from the pyopenssl x509 self.key = pyx509.get_pubkey() + self.filename=filename ## # Load the public key from a string. No private key is loaded. @@ -190,7 +238,7 @@ class Keypair: ## # Return an M2Crypto key object - def get_m2_pkey(self): + def get_m2_pubkey(self): if not self.m2key: self.m2key = M2Crypto.EVP.load_key_string(self.as_pem()) return self.m2key @@ -199,7 +247,7 @@ class Keypair: # Returns a string containing the public key represented by this object. def get_pubkey_string(self): - m2pkey = self.get_m2_pkey() + m2pkey = self.get_m2_pubkey() return base64.b64encode(m2pkey.as_der()) ## @@ -208,7 +256,6 @@ class Keypair: def get_openssl_pkey(self): return self.key - ## # Given another Keypair object, return TRUE if the two keys are the same. @@ -216,13 +263,13 @@ class Keypair: return self.as_pem() == pkey.as_pem() def sign_string(self, data): - k = self.get_m2_pkey() + k = self.get_m2_pubkey() k.sign_init() k.sign_update(data) return base64.b64encode(k.sign_final()) def verify_string(self, data, sig): - k = self.get_m2_pkey() + k = self.get_m2_pubkey() k.verify_init() k.verify_update(data) return M2Crypto.m2.verify_final(k.ctx, base64.b64decode(sig), k.pkey) @@ -230,6 +277,20 @@ class Keypair: def compute_hash(self, value): return self.sign_string(str(value)) + # only informative + def get_filename(self): + return getattr(self,'filename',None) + + def dump (self, *args, **kwargs): + print(self.dump_string(*args, **kwargs)) + + def dump_string (self): + result="" + result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string() + filename=self.get_filename() + if filename: result += "Filename %s\n"%filename + return result + ## # The certificate class implements a general purpose X509 certificate, making # use of the appropriate pyOpenSSL or M2Crypto abstractions. It also adds @@ -243,28 +304,37 @@ class Keypair: # whether to save the parent certificates as well. class Certificate: - digest = "md5" + digest = "sha256" - cert = None - issuerKey = None - issuerSubject = None - parent = None +# x509 = None +# issuerKey = None +# issuerSubject = None +# parent = None + isCA = None # will be a boolean once set separator="-----parent-----" ## # Create a certificate object. # + # @param lifeDays life of cert in days - default is 1825==5 years # @param create If create==True, then also create a blank X509 certificate. # @param subject If subject!=None, then create a blank certificate and set # it's subject name. # @param string If string!=None, load the certficate from the string. # @param filename If filename!=None, load the certficiate from the file. + # @param isCA If !=None, set whether this cert is for a CA + + def __init__(self, lifeDays=1825, create=False, subject=None, string=None, filename=None, isCA=None): + # these used to be defined in the class ! + self.x509 = None + self.issuerKey = None + self.issuerSubject = None + self.parent = None - def __init__(self, create=False, subject=None, string=None, filename=None, intermediate=None): self.data = {} if create or subject: - self.create() + self.create(lifeDays) if subject: self.set_subject(subject) if string: @@ -272,24 +342,27 @@ class Certificate: if filename: self.load_from_file(filename) - if intermediate: - self.set_intermediate_ca(intermediate) + # Set the CA bit if a value was supplied + if isCA != None: + self.set_is_ca(isCA) - ## # Create a blank X509 certificate and store it in this object. - def create(self): - self.cert = crypto.X509() - self.cert.set_serial_number(3) - self.cert.gmtime_adj_notBefore(0) - self.cert.gmtime_adj_notAfter(60*60*24*365*5) # five years + def create(self, lifeDays=1825): + self.x509 = crypto.X509() + # FIXME: Use different serial #s + self.x509.set_serial_number(3) + self.x509.gmtime_adj_notBefore(0) # 0 means now + self.x509.gmtime_adj_notAfter(lifeDays*60*60*24) # five years is default + self.x509.set_version(2) # x509v3 so it can have extensions + ## # Given a pyOpenSSL X509 object, store that object inside of this # certificate object. def load_from_pyopenssl_x509(self, x509): - self.cert = x509 + self.x509 = x509 ## # Load the certificate from a string @@ -298,12 +371,23 @@ class Certificate: # if it is a chain of multiple certs, then split off the first one and # load it (support for the ---parent--- tag as well as normal chained certs) - string = string.strip() - + if string is None or string.strip() == "": + logger.warn("Empty string in load_from_string") + return - if not string.startswith('-----'): + string = string.strip() + + # If it's not in proper PEM format, wrap it + if string.count('-----BEGIN CERTIFICATE') == 0: string = '-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----' % string + # If there is a PEM cert in there, but there is some other text first + # such as the text of the certificate, skip the text + beg = string.find('-----BEGIN CERTIFICATE') + if beg > 0: + # skipping over non cert beginning + string = string[beg:] + parts = [] if string.count('-----BEGIN CERTIFICATE-----') > 1 and \ @@ -313,7 +397,10 @@ class Certificate: else: parts = string.split(Certificate.separator, 1) - self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0]) + self.x509 = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0]) + + if self.x509 is None: + logger.warn("Loaded from string but cert is None: %s" % string) # if there are more certs, then create a parent and let the parent load # itself from the remainder of the string @@ -328,6 +415,7 @@ class Certificate: file = open(filename) string = file.read() self.load_from_string(string) + self.filename=filename ## # Save the certificate to a string. @@ -335,7 +423,10 @@ class Certificate: # @param save_parents If save_parents==True, then also save the parent certificates. def save_to_string(self, save_parents=True): - string = crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert) + if self.x509 is None: + logger.warn("None cert in certificate.save_to_string") + return "" + string = crypto.dump_certificate(crypto.FILETYPE_PEM, self.x509) if save_parents and self.parent: string = string + self.parent.save_to_string(save_parents) return string @@ -352,6 +443,7 @@ class Certificate: f = open(filename, 'w') f.write(string) f.close() + self.filename=filename ## # Save the certificate to a random file in /tmp/ @@ -386,7 +478,7 @@ class Certificate: self.issuerReq = req if cert: # if a cert was supplied, then get the subject from the cert - subject = cert.cert.get_subject() + subject = cert.x509.get_subject() assert(subject) self.issuerSubject = subject @@ -394,7 +486,7 @@ class Certificate: # Get the issuer name def get_issuer(self, which="CN"): - x = self.cert.get_issuer() + x = self.x509.get_issuer() return getattr(x, which) ## @@ -408,14 +500,44 @@ class Certificate: setattr(subj, key, name[key]) else: setattr(subj, "CN", name) - self.cert.set_subject(subj) + self.x509.set_subject(subj) + ## # Get the subject name of the certificate def get_subject(self, which="CN"): - x = self.cert.get_subject() + x = self.x509.get_subject() return getattr(x, which) + ## + # Get a pretty-print subject name of the certificate + # let's try to make this a little more usable as is makes logs hairy + # FIXME: Consider adding 'urn:publicid' and 'uuid' back for GENI? + pretty_fields = ['email'] + def filter_chunk(self, chunk): + for field in self.pretty_fields: + if field in chunk: + return " "+chunk + + def pretty_cert(self): + message = "[Cert." + x = self.x509.get_subject() + ou = getattr(x, "OU") + if ou: message += " OU: {}".format(ou) + cn = getattr(x, "CN") + if cn: message += " CN: {}".format(cn) + data = self.get_data(field='subjectAltName') + if data: + message += " SubjectAltName:" + counter = 0 + filtered = [self.filter_chunk(chunk) for chunk in data.split()] + message += " ".join( [f for f in filtered if f]) + omitted = len ([f for f in filtered if not f]) + if omitted: + message += "..+{} omitted".format(omitted) + message += "]" + return message + ## # Get the public key of the certificate. # @@ -423,7 +545,7 @@ class Certificate: def set_pubkey(self, key): assert(isinstance(key, Keypair)) - self.cert.set_pubkey(key.get_openssl_pkey()) + self.x509.set_pubkey(key.get_openssl_pkey()) ## # Get the public key of the certificate. @@ -432,14 +554,29 @@ class Certificate: def get_pubkey(self): m2x509 = X509.load_cert_string(self.save_to_string()) pkey = Keypair() - pkey.key = self.cert.get_pubkey() + pkey.key = self.x509.get_pubkey() pkey.m2key = m2x509.get_pubkey() return pkey def set_intermediate_ca(self, val): - self.intermediate = val + return self.set_is_ca(val) + + # Set whether this cert is for a CA. All signers and only signers should be CAs. + # The local member starts unset, letting us check that you only set it once + # @param val Boolean indicating whether this cert is for a CA + def set_is_ca(self, val): + if val is None: + return + + if self.isCA != None: + # Can't double set properties + raise Exception("Cannot set basicConstraints CA:?? more than once. Was %s, trying to set as %s" % (self.isCA, val)) + + self.isCA = val if val: self.add_extension('basicConstraints', 1, 'CA:TRUE') + else: + self.add_extension('basicConstraints', 1, 'CA:FALSE') @@ -452,16 +589,48 @@ class Certificate: # @param value string containing value of the extension def add_extension(self, name, critical, value): + oldExtVal = None + try: + oldExtVal = self.get_extension(name) + except: + # M2Crypto LookupError when the extension isn't there (yet) + pass + + # This code limits you from adding the extension with the same value + # The method comment says you shouldn't do this with the same name + # But actually it (m2crypto) appears to allow you to do this. + if oldExtVal and oldExtVal == value: + # don't add this extension again + # just do nothing as here + return + # FIXME: What if they are trying to set with a different value? + # Is this ever OK? Or should we raise an exception? +# elif oldExtVal: +# raise "Cannot add extension %s which had val %s with new val %s" % (name, oldExtVal, value) + ext = crypto.X509Extension (name, critical, value) - self.cert.add_extensions([ext]) + self.x509.add_extensions([ext]) ## # Get an X509 extension from the certificate def get_extension(self, name): + + if name is None: + return None + + certstr = self.save_to_string() + if certstr is None or certstr == "": + return None # pyOpenSSL does not have a way to get extensions - m2x509 = X509.load_cert_string(self.save_to_string()) + m2x509 = X509.load_cert_string(certstr) + if m2x509 is None: + logger.warn("No cert loaded in get_extension") + return None + if m2x509.get_ext(name) is None: + return None value = m2x509.get_ext(name).get_value() + return value ## @@ -496,27 +665,30 @@ class Certificate: # Sign the certificate using the issuer private key and issuer subject previous set with set_issuer(). def sign(self): - assert self.cert != None + logger.debug('certificate.sign') + assert self.x509 != None assert self.issuerSubject != None assert self.issuerKey != None - self.cert.set_issuer(self.issuerSubject) - self.cert.sign(self.issuerKey.get_openssl_pkey(), self.digest) + self.x509.set_issuer(self.issuerSubject) + self.x509.sign(self.issuerKey.get_openssl_pkey(), self.digest) ## # Verify the authenticity of a certificate. # @param pkey is a Keypair object representing a public key. If Pkey # did not sign the certificate, then an exception will be thrown. - def verify(self, pkey): + def verify(self, pubkey): # pyOpenSSL does not have a way to verify signatures m2x509 = X509.load_cert_string(self.save_to_string()) - m2pkey = pkey.get_m2_pkey() + m2pubkey = pubkey.get_m2_pubkey() # verify it - return m2x509.verify(m2pkey) + # verify returns -1 or 0 on failure depending on how serious the + # error conditions are + return m2x509.verify(m2pubkey) == 1 # XXX alternatively, if openssl has been patched, do the much simpler: # try: - # self.cert.verify(pkey.get_openssl_key()) + # self.x509.verify(pkey.get_openssl_key()) # return 1 # except: # return 0 @@ -569,6 +741,7 @@ class Certificate: # child. If a parent did not sign a child, then an exception is thrown. If # the bottom of the recursion is reached and the certificate does not match # a trusted root, then an exception is thrown. + # Also require that parents are CAs. # # @param Trusted_certs is a list of certificates that are trusted. # @@ -579,30 +752,107 @@ class Certificate: # until a certificate is found that is signed by a trusted root. # verify expiration time - if self.cert.has_expired(): - raise CertExpired(self.get_subject(), "client cert") - + if self.x509.has_expired(): + if debug_verify_chain: + logger.debug("verify_chain: NO, Certificate %s has expired" % self.pretty_cert()) + raise CertExpired(self.pretty_cert(), "client cert") + # if this cert is signed by a trusted_cert, then we are set for trusted_cert in trusted_certs: if self.is_signed_by_cert(trusted_cert): - sfa_logger.debug("Cert %s signed by trusted cert %s", self.get_subject(), trusted_cert.get_subject()) # verify expiration of trusted_cert ? - if not trusted_cert.cert.has_expired(): + if not trusted_cert.x509.has_expired(): + if debug_verify_chain: + logger.debug("verify_chain: YES. Cert %s signed by trusted cert %s"%( + self.pretty_cert(), trusted_cert.pretty_cert())) return trusted_cert else: - sfa_logger.debug("Trusted cert %s is expired", trusted_cert.get_subject()) + if debug_verify_chain: + logger.debug("verify_chain: NO. Cert %s is signed by trusted_cert %s, but that signer is expired..."%( + self.pretty_cert(),trusted_cert.pretty_cert())) + raise CertExpired(self.pretty_cert()," signer trusted_cert %s"%trusted_cert.pretty_cert()) # if there is no parent, then no way to verify the chain if not self.parent: - sfa_logger.debug("%r has no parent"%self.get_subject()) - raise CertMissingParent(self.get_subject()) + if debug_verify_chain: + logger.debug("verify_chain: NO. %s has no parent and issuer %s is not in %d trusted roots"%\ + (self.pretty_cert(), self.get_issuer(), len(trusted_certs))) + raise CertMissingParent(self.pretty_cert() + \ + ": Issuer %s is not one of the %d trusted roots, and cert has no parent." %\ + (self.get_issuer(), len(trusted_certs))) # if it wasn't signed by the parent... if not self.is_signed_by_cert(self.parent): - sfa_logger.debug("%r is not signed by parent"%self.get_subject()) - return CertNotSignedByParent(self.get_subject()) + if debug_verify_chain: + logger.debug("verify_chain: NO. %s is not signed by parent %s, but by %s"%\ + (self.pretty_cert(), + self.parent.pretty_cert(), + self.get_issuer())) + raise CertNotSignedByParent("%s: Parent %s, issuer %s"\ + % (self.pretty_cert(), + self.parent.pretty_cert(), + self.get_issuer())) + + # Confirm that the parent is a CA. Only CAs can be trusted as + # signers. + # Note that trusted roots are not parents, so don't need to be + # CAs. + # Ugly - cert objects aren't parsed so we need to read the + # extension and hope there are no other basicConstraints + if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'): + logger.warn("verify_chain: cert %s's parent %s is not a CA" % \ + (self.pretty_cert(), self.parent.pretty_cert())) + raise CertNotSignedByParent("%s: Parent %s not a CA" % (self.pretty_cert(), + self.parent.pretty_cert())) # if the parent isn't verified... + if debug_verify_chain: + logger.debug("verify_chain: .. %s, -> verifying parent %s"%\ + (self.pretty_cert(),self.parent.pretty_cert())) self.parent.verify_chain(trusted_certs) return + + ### more introspection + def get_extensions(self): + # pyOpenSSL does not have a way to get extensions + triples = [] + m2x509 = X509.load_cert_string(self.save_to_string()) + nb_extensions = m2x509.get_ext_count() + logger.debug("X509 had %d extensions"%nb_extensions) + for i in range(nb_extensions): + ext=m2x509.get_ext_at(i) + triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) ) + return triples + + def get_data_names(self): + return self.data.keys() + + def get_all_datas (self): + triples = self.get_extensions() + for name in self.get_data_names(): + triples.append( (name,self.get_data(name),'data',) ) + return triples + + # only informative + def get_filename(self): + return getattr(self,'filename',None) + + def dump (self, *args, **kwargs): + print(self.dump_string(*args, **kwargs)) + + def dump_string (self,show_extensions=False): + result = "" + result += "CERTIFICATE for %s\n"%self.pretty_cert() + result += "Issued by %s\n"%self.get_issuer() + filename=self.get_filename() + if filename: result += "Filename %s\n"%filename + if show_extensions: + all_datas = self.get_all_datas() + result += " has %d extensions/data attached"%len(all_datas) + for (n, v, c) in all_datas: + if c=='data': + result += " data: %s=%s\n"%(n,v) + else: + result += " ext: %s (crit=%s)=<<<%s>>>\n"%(n,c,v) + return result