-#----------------------------------------------------------------------
+# ----------------------------------------------------------------------
# Copyright (c) 2008 Board of Trustees, Princeton University
#
# Permission is hereby granted, free of charge, to any person obtaining
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
# IN THE WORK.
-#----------------------------------------------------------------------
+# ----------------------------------------------------------------------
-##
+#
# SFA uses two crypto libraries: pyOpenSSL and M2Crypto to implement
# the necessary crypto functionality. Ideally just one of these libraries
# would be used, but unfortunately each of these libraries is independently
##
#
+# Notes on using the openssl command line
+#
+# for verifying the chain in a gid,
+# assuming it is split into pieces p1.pem p2.pem p3.pem
+# you can use openssl to verify the chain using this command
+# openssl verify -verbose -CAfile <(cat p2.pem p3.pem) p1.pem
+# also you can use sfax509 to invoke openssl x509 on all parts of the gid
+#
+
+
from __future__ import print_function
import functools
import OpenSSL
# M2Crypto is imported on the fly to minimize crashes
-#import M2Crypto
+# import M2Crypto
from sfa.util.py23 import PY3
-from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent
+from sfa.util.faults import (CertExpired, CertMissingParent,
+ CertNotSignedByParent)
from sfa.util.sfalogging import logger
# this tends to generate quite some logs for little or no value
# TODO: for production, cleanup the temporary files
if not os.path.exists(ssl_fn):
raise Exception(
- "keyconvert: generated certificate not found. keyconvert may have failed.")
+ "generated certificate not found. keyconvert may have failed.")
k = Keypair()
try:
import M2Crypto
if glo_passphrase_callback:
self.key = OpenSSL.crypto.load_privatekey(
- OpenSSL.crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string))
+ OpenSSL.crypto.FILETYPE_PEM, string,
+ functools.partial(glo_passphrase_callback, self, string))
self.m2key = M2Crypto.EVP.load_key_string(
- string, functools.partial(glo_passphrase_callback, self, string))
+ string, functools.partial(glo_passphrase_callback,
+ self, string))
else:
self.key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, string)
# Return the private key in PEM format.
def as_pem(self):
- return OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, self.key)
+ return OpenSSL.crypto.dump_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, self.key)
##
# Return an M2Crypto key object
# @param filename If filename!=None, load the certficiate from the file.
# @param isCA If !=None, set whether this cert is for a CA
- def __init__(self, lifeDays=1825, create=False, subject=None, string=None, filename=None, isCA=None):
+ def __init__(self, lifeDays=1825, create=False, subject=None, string=None,
+ filename=None, isCA=None):
# these used to be defined in the class !
self.x509 = None
self.issuerKey = None
self.load_from_file(filename)
# Set the CA bit if a value was supplied
- if isCA != None:
+ if isCA is not None:
self.set_is_ca(isCA)
# Create a blank X509 certificate and store it in this object.
# If it's not in proper PEM format, wrap it
if string.count('-----BEGIN CERTIFICATE') == 0:
- string = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'\
+ string = '-----BEGIN CERTIFICATE-----'\
+ '\n{}\n-----END CERTIFICATE-----'\
.format(string)
# If there is a PEM cert in there, but there is some other text first
##
# Save the certificate to a string.
#
- # @param save_parents If save_parents==True, then also save the parent certificates.
+ # @param save_parents If save_parents==True,
+ # then also save the parent certificates.
def save_to_string(self, save_parents=True):
if self.x509 is None:
##
# Save the certificate to a file.
- # @param save_parents If save_parents==True, then also save the parent certificates.
+ # @param save_parents If save_parents==True,
+ # then also save the parent certificates.
def save_to_file(self, filename, save_parents=True, filep=None):
string = self.save_to_string(save_parents=save_parents)
##
# Save the certificate to a random file in /tmp/
- # @param save_parents If save_parents==True, then also save the parent certificates.
+ # @param save_parents If save_parents==True,
+ # then also save the parent certificates.
def save_to_random_tmp_file(self, save_parents=True):
fp, filename = mkstemp(suffix='cert', text=True)
fp = os.fdopen(fp, "w")
# Sets the issuer private key and name
# @param key Keypair object containing the private key of the issuer
# @param subject String containing the name of the issuer
- # @param cert (optional) Certificate object containing the name of the issuer
+ # @param cert (optional)
+ # Certificate object containing the name of the issuer
def set_issuer(self, key, subject=None, cert=None):
self.issuerKey = key
message += "]"
return message
+ def pretty_chain(self):
+ message = "{}".format(self.x509.get_subject())
+ parent = self.parent
+ while parent:
+ message += "->{}".format(parent.x509.get_subject())
+ parent = parent.parent
+ return message
+
+ def pretty_name(self):
+ return self.get_filename() or self.pretty_chain()
+
##
# Get the public key of the certificate.
#
def set_intermediate_ca(self, val):
return self.set_is_ca(val)
- # Set whether this cert is for a CA. All signers and only signers should be CAs.
+ # Set whether this cert is for a CA.
+ # All signers and only signers should be CAs.
# The local member starts unset, letting us check that you only set it once
# @param val Boolean indicating whether this cert is for a CA
def set_is_ca(self, val):
if val is None:
return
- if self.isCA != None:
+ if self.isCA is not None:
# Can't double set properties
- raise Exception("Cannot set basicConstraints CA:?? more than once. "
- "Was {}, trying to set as {}%s".format(self.isCA, val))
+ raise Exception(
+ "Cannot set basicConstraints CA:?? more than once. "
+ "Was {}, trying to set as {}".format(self.isCA, val))
self.isCA = val
if val:
self.add_extension('basicConstraints', 1, 'CA:FALSE')
##
- # Add an X509 extension to the certificate. Add_extension can only be called
- # once for a particular extension name, due to limitations in the underlying
- # library.
+ # Add an X509 extension to the certificate. Add_extension can only
+ # be called once for a particular extension name, due to
+ # limitations in the underlying library.
#
# @param name string containing name of extension
# @param value string containing value of the extension
def add_extension(self, name, critical, value):
- import M2Crypto
oldExtVal = None
try:
oldExtVal = self.get_extension(name)
# FIXME: What if they are trying to set with a different value?
# Is this ever OK? Or should we raise an exception?
# elif oldExtVal:
-# raise "Cannot add extension {} which had val {} with new val {}".format(name, oldExtVal, value)
+# raise "Cannot add extension {} which had val {} with new val {}"\
+# .format(name, oldExtVal, value)
ext = OpenSSL.crypto.X509Extension(name, critical, value)
self.x509.add_extensions([ext])
return value
##
- # Set_data is a wrapper around add_extension. It stores the parameter str in
- # the X509 subject_alt_name extension. Set_data can only be called once, due
- # to limitations in the underlying library.
+ # Set_data is a wrapper around add_extension. It stores the
+ # parameter str in the X509 subject_alt_name extension. Set_data
+ # can only be called once, due to limitations in the underlying
+ # library.
- def set_data(self, str, field='subjectAltName'):
+ def set_data(self, string, field='subjectAltName'):
# pyOpenSSL only allows us to add extensions, so if we try to set the
# same extension more than once, it will not work
if field in self.data:
raise Exception("Cannot set {} more than once".format(field))
- self.data[field] = str
- self.add_extension(field, 0, str)
+ self.data[field] = string
+ self.add_extension(field, 0, string)
##
# Return the data string that was previously set with set_data
def sign(self):
logger.debug('certificate.sign')
- assert self.x509 != None
- assert self.issuerSubject != None
- assert self.issuerKey != None
+ assert self.x509 is not None
+ assert self.issuerSubject is not None
+ assert self.issuerKey is not None
self.x509.set_issuer(self.issuerSubject)
self.x509.sign(self.issuerKey.get_openssl_pkey(), self.digest)
m2x509 = M2Crypto.X509.load_cert_string(self.save_to_string())
m2pubkey = pubkey.get_m2_pubkey()
# verify it
- # verify returns -1 or 0 on failure depending on how serious the
- # error conditions are
- return m2x509.verify(m2pubkey) == 1
+ # https://www.openssl.org/docs/man1.1.0/crypto/X509_verify.html
+ # verify returns
+ # 1 if it checks out
+ # 0 if if does not
+ # -1 if it could not be checked 'for some reason'
+ m2result = m2x509.verify(m2pubkey)
+ result = m2result == 1
+ if debug_verify_chain:
+ logger.debug("Certificate.verify: <- {} (m2={}) ({} x {})"
+ .format(result, m2result,
+ self.pretty_cert(), m2pubkey))
+ return result
# XXX alternatively, if openssl has been patched, do the much simpler:
# try:
# return 0
##
- # Return True if pkey is identical to the public key that is contained in the certificate.
+ # Return True if pkey is identical to the public key that is
+ # contained in the certificate.
# @param pkey Keypair object
def is_pubkey(self, pkey):
# @param cert certificate object
def is_signed_by_cert(self, cert):
+ logger.debug("Certificate.is_signed_by_cert -> invoking verify")
k = cert.get_pubkey()
result = self.verify(k)
return result
# the public key contained in it's parent. The chain is recursed
# until a certificate is found that is signed by a trusted root.
+ logger.debug("Certificate.verify_chain {}".format(self.pretty_name()))
# verify expiration time
if self.x509.has_expired():
if debug_verify_chain:
raise CertExpired(self.pretty_cert(), "client cert")
# if this cert is signed by a trusted_cert, then we are set
- for trusted_cert in trusted_certs:
+ for i, trusted_cert in enumerate(trusted_certs, 1):
+ logger.debug("Certificate.verify_chain - trying trusted #{} : {}"
+ .format(i, trusted_cert.pretty_name()))
if self.is_signed_by_cert(trusted_cert):
# verify expiration of trusted_cert ?
if not trusted_cert.x509.has_expired():
if debug_verify_chain:
- logger.debug("verify_chain: YES. Cert {} signed by trusted cert {}"
- .format(self.pretty_cert(), trusted_cert.pretty_cert()))
+ logger.debug("verify_chain: YES."
+ " Cert {} signed by trusted cert {}"
+ .format(self.pretty_name(),
+ trusted_cert.pretty_name()))
return trusted_cert
else:
if debug_verify_chain:
- logger.debug("verify_chain: NO. Cert {} is signed by trusted_cert {}, "
+ logger.debug("verify_chain: NO. Cert {} "
+ "is signed by trusted_cert {}, "
"but that signer is expired..."
- .format(self.pretty_cert(), trusted_cert.pretty_cert()))
+ .format(self.pretty_cert(),
+ trusted_cert.pretty_cert()))
raise CertExpired("{} signer trusted_cert {}"
- .format(self.pretty_cert(), trusted_cert.pretty_cert()))
+ .format(self.pretty_name(),
+ trusted_cert.pretty_name()))
+ else:
+ logger.debug("verify_chain: not a direct"
+ " descendant of a trusted root")
# if there is no parent, then no way to verify the chain
if not self.parent:
if debug_verify_chain:
logger.debug("verify_chain: NO. {} has no parent "
"and issuer {} is not in {} trusted roots"
- .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs)))
- raise CertMissingParent("{}: Issuer {} is not one of the {} trusted roots, "
+ .format(self.pretty_name(), self.get_issuer(),
+ len(trusted_certs)))
+ raise CertMissingParent("{}: Issuer {} is not "
+ "one of the {} trusted roots, "
"and cert has no parent."
- .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs)))
+ .format(self.pretty_name(),
+ self.get_issuer(),
+ len(trusted_certs)))
# if it wasn't signed by the parent...
if not self.is_signed_by_cert(self.parent):
if debug_verify_chain:
- logger.debug("verify_chain: NO. {} is not signed by parent {}, but by {}"
- .format(self.pretty_cert(),
- self.parent.pretty_cert(),
- self.get_issuer()))
+ logger.debug("verify_chain: NO. {} is not signed by parent {}"
+ .format(self.pretty_name(),
+ self.parent.pretty_name()))
+ self.save_to_file("/tmp/xxx-capture.pem", save_parents=True)
raise CertNotSignedByParent("{}: Parent {}, issuer {}"
- .format(self.pretty_cert(),
- self.parent.pretty_cert(),
+ .format(self.pretty_name(),
+ self.parent.pretty_name(),
self.get_issuer()))
# Confirm that the parent is a CA. Only CAs can be trusted as
# CAs.
# Ugly - cert objects aren't parsed so we need to read the
# extension and hope there are no other basicConstraints
- if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
+ if not self.parent.isCA and not (
+ self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
logger.warn("verify_chain: cert {}'s parent {} is not a CA"
- .format(self.pretty_cert(), self.parent.pretty_cert()))
+ .format(self.pretty_name(), self.parent.pretty_name()))
raise CertNotSignedByParent("{}: Parent {} not a CA"
- .format(self.pretty_cert(), self.parent.pretty_cert()))
+ .format(self.pretty_name(),
+ self.parent.pretty_name()))
# if the parent isn't verified...
if debug_verify_chain:
logger.debug("verify_chain: .. {}, -> verifying parent {}"
- .format(self.pretty_cert(), self.parent.pretty_cert()))
+ .format(self.pretty_name(),
+ self.parent.pretty_name()))
self.parent.verify_chain(trusted_certs)
return