# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Work.
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Work.
#
-# THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
-# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
+# THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent
from sfa.util.sfalogging import logger
from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent
from sfa.util.sfalogging import logger
##
# Check to see if a passphrase works for a particular private key string.
# Intended to be used by passphrase callbacks for input validation.
##
# Check to see if a passphrase works for a particular private key string.
# Intended to be used by passphrase callbacks for input validation.
- OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, string, (lambda x: passphrase))
+ OpenSSL.crypto.load_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, string, (lambda x: passphrase))
def convert_public_key(key):
keyconvert_path = "/usr/bin/keyconvert.py"
if not os.path.isfile(keyconvert_path):
def convert_public_key(key):
keyconvert_path = "/usr/bin/keyconvert.py"
if not os.path.isfile(keyconvert_path):
# that it can be expected to see why it failed.
# TODO: for production, cleanup the temporary files
if not os.path.exists(ssl_fn):
# that it can be expected to see why it failed.
# TODO: for production, cleanup the temporary files
if not os.path.exists(ssl_fn):
# A Keypair object may represent both a public and private key pair, or it
# may represent only a public key (this usage is consistent with OpenSSL).
# A Keypair object may represent both a public and private key pair, or it
# may represent only a public key (this usage is consistent with OpenSSL).
self.m2key = M2Crypto.EVP.load_key_string(
string, functools.partial(glo_passphrase_callback, self, string))
else:
self.m2key = M2Crypto.EVP.load_key_string(
string, functools.partial(glo_passphrase_callback, self, string))
else:
- m2name.add_entry_by_txt(field="CN", type=0x1001, entry="junk", len=-1, loc=-1, set=0)
+ m2name.add_entry_by_txt(field="CN", type=0x1001,
+ entry="junk", len=-1, loc=-1, set=0)
m2x509 = M2Crypto.X509.X509()
m2x509.set_pubkey(self.m2key)
m2x509.set_serial_number(0)
m2x509 = M2Crypto.X509.X509()
m2x509.set_pubkey(self.m2key)
m2x509.set_serial_number(0)
def dump(self, *args, **kwargs):
print(self.dump_string(*args, **kwargs))
def dump_string(self):
def dump(self, *args, **kwargs):
print(self.dump_string(*args, **kwargs))
def dump_string(self):
result += "KEYPAIR: pubkey={:>40}...".format(self.get_pubkey_string())
filename = self.get_filename()
result += "KEYPAIR: pubkey={:>40}...".format(self.get_pubkey_string())
filename = self.get_filename()
# When saving a certificate to a file or a string, the caller can choose
# whether to save the parent certificates as well.
# When saving a certificate to a file or a string, the caller can choose
# whether to save the parent certificates as well.
self.x509 = OpenSSL.crypto.X509()
# FIXME: Use different serial #s
self.x509.set_serial_number(3)
self.x509 = OpenSSL.crypto.X509()
# FIXME: Use different serial #s
self.x509.set_serial_number(3)
- self.x509.gmtime_adj_notBefore(0) # 0 means now
- self.x509.gmtime_adj_notAfter(lifeDays*60*60*24) # five years is default
- self.x509.set_version(2) # x509v3 so it can have extensions
-
+ self.x509.gmtime_adj_notBefore(0) # 0 means now
+ self.x509.gmtime_adj_notAfter(
+ lifeDays * 60 * 60 * 24) # five years is default
+ self.x509.set_version(2) # x509v3 so it can have extensions
def load_from_string(self, string):
# if it is a chain of multiple certs, then split off the first one and
def load_from_string(self, string):
# if it is a chain of multiple certs, then split off the first one and
# If it's not in proper PEM format, wrap it
if string.count('-----BEGIN CERTIFICATE') == 0:
string = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'\
# If it's not in proper PEM format, wrap it
if string.count('-----BEGIN CERTIFICATE') == 0:
string = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'\
# such as the text of the certificate, skip the text
beg = string.find('-----BEGIN CERTIFICATE')
if beg > 0:
# such as the text of the certificate, skip the text
beg = string.find('-----BEGIN CERTIFICATE')
if beg > 0:
- string.count(Certificate.separator) == 0:
- parts = string.split('-----END CERTIFICATE-----',1)
+ string.count(Certificate.separator) == 0:
+ parts = string.split('-----END CERTIFICATE-----', 1)
parts[0] += '-----END CERTIFICATE-----'
else:
parts = string.split(Certificate.separator, 1)
parts[0] += '-----END CERTIFICATE-----'
else:
parts = string.split(Certificate.separator, 1)
# if there are more certs, then create a parent and let the parent load
# itself from the remainder of the string
# if there are more certs, then create a parent and let the parent load
# itself from the remainder of the string
- string = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, self.x509)
- if isinstance(string, bytes):
+ string = OpenSSL.crypto.dump_certificate(
+ OpenSSL.crypto.FILETYPE_PEM, self.x509)
+ if PY3 and isinstance(string, bytes):
# let's try to make this a little more usable as is makes logs hairy
# FIXME: Consider adding 'urn:publicid' and 'uuid' back for GENI?
pretty_fields = ['email']
# let's try to make this a little more usable as is makes logs hairy
# FIXME: Consider adding 'urn:publicid' and 'uuid' back for GENI?
pretty_fields = ['email']
def filter_chunk(self, chunk):
for field in self.pretty_fields:
if field in chunk:
def filter_chunk(self, chunk):
for field in self.pretty_fields:
if field in chunk:
data = self.get_data(field='subjectAltName')
if data:
message += " SubjectAltName:"
counter = 0
filtered = [self.filter_chunk(chunk) for chunk in data.split()]
data = self.get_data(field='subjectAltName')
if data:
message += " SubjectAltName:"
counter = 0
filtered = [self.filter_chunk(chunk) for chunk in data.split()]
##
# Add an X509 extension to the certificate. Add_extension can only be called
# once for a particular extension name, due to limitations in the underlying
##
# Add an X509 extension to the certificate. Add_extension can only be called
# once for a particular extension name, due to limitations in the underlying
# @param value string containing value of the extension
def add_extension(self, name, critical, value):
# @param value string containing value of the extension
def add_extension(self, name, critical, value):
- # verify returns -1 or 0 on failure depending on how serious the
- # error conditions are
- return m2x509.verify(m2pubkey) == 1
+ # https://www.openssl.org/docs/man1.1.0/crypto/X509_verify.html
+ # verify returns
+ # 1 if it checks out
+ # 0 if if does not
+ # -1 if it could not be checked 'for some reason'
+ m2result = m2x509.verify(m2pubkey)
+ result = m2result == 1
+ if debug_verify_chain:
+ logger.debug("Certificate.verify: <- {} (m2={}) ({} x {})"
+ .format(result, m2result, self.pretty_cert(), m2pubkey))
+ return result
# @param cert certificate object
def is_signed_by_cert(self, cert):
# @param cert certificate object
def is_signed_by_cert(self, cert):
# Verify a chain of certificates. Each certificate must be signed by
# the public key contained in it's parent. The chain is recursed
# until a certificate is found that is signed by a trusted root.
# Verify a chain of certificates. Each certificate must be signed by
# the public key contained in it's parent. The chain is recursed
# until a certificate is found that is signed by a trusted root.
raise CertExpired(self.pretty_cert(), "client cert")
# if this cert is signed by a trusted_cert, then we are set
raise CertExpired(self.pretty_cert(), "client cert")
# if this cert is signed by a trusted_cert, then we are set
- for trusted_cert in trusted_certs:
+ for i, trusted_cert in enumerate(trusted_certs, 1):
+ logger.debug("Certificate.verify_chain - trying trusted #{} : {}"
+ .format(i, trusted_cert.pretty_name()))
if self.is_signed_by_cert(trusted_cert):
# verify expiration of trusted_cert ?
if not trusted_cert.x509.has_expired():
if debug_verify_chain:
logger.debug("verify_chain: YES. Cert {} signed by trusted cert {}"
if self.is_signed_by_cert(trusted_cert):
# verify expiration of trusted_cert ?
if not trusted_cert.x509.has_expired():
if debug_verify_chain:
logger.debug("verify_chain: YES. Cert {} signed by trusted cert {}"
return trusted_cert
else:
if debug_verify_chain:
logger.debug("verify_chain: NO. Cert {} is signed by trusted_cert {}, "
"but that signer is expired..."
return trusted_cert
else:
if debug_verify_chain:
logger.debug("verify_chain: NO. Cert {} is signed by trusted_cert {}, "
"but that signer is expired..."
- .format(self.pretty_cert(), trusted_cert.pretty_cert()))
+ .format(self.pretty_name(), trusted_cert.pretty_name()))
+ else:
+ logger.debug("verify_chain: not a direct descendant of a trusted root".
+ format(self.pretty_name(), trusted_cert))
# if there is no parent, then no way to verify the chain
if not self.parent:
if debug_verify_chain:
logger.debug("verify_chain: NO. {} has no parent "
"and issuer {} is not in {} trusted roots"
# if there is no parent, then no way to verify the chain
if not self.parent:
if debug_verify_chain:
logger.debug("verify_chain: NO. {} has no parent "
"and issuer {} is not in {} trusted roots"
raise CertMissingParent("{}: Issuer {} is not one of the {} trusted roots, "
"and cert has no parent."
raise CertMissingParent("{}: Issuer {} is not one of the {} trusted roots, "
"and cert has no parent."
- logger.debug("verify_chain: NO. {} is not signed by parent {}, but by {}"
- .format(self.pretty_cert(),
- self.parent.pretty_cert(),
- self.get_issuer()))
+ logger.debug("verify_chain: NO. {} is not signed by parent {}"
+ .format(self.pretty_name(),
+ self.parent.pretty_name()))
+ self.save_to_file("/tmp/xxx-capture.pem", save_parents=True)
- .format(self.pretty_cert(),
- self.parent.pretty_cert(),
+ .format(self.pretty_name(),
+ self.parent.pretty_name(),
# extension and hope there are no other basicConstraints
if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
logger.warn("verify_chain: cert {}'s parent {} is not a CA"
# extension and hope there are no other basicConstraints
if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
logger.warn("verify_chain: cert {}'s parent {} is not a CA"
# if the parent isn't verified...
if debug_verify_chain:
logger.debug("verify_chain: .. {}, -> verifying parent {}"
# if the parent isn't verified...
if debug_verify_chain:
logger.debug("verify_chain: .. {}, -> verifying parent {}"
logger.debug("X509 had {} extensions".format(nb_extensions))
for i in range(nb_extensions):
ext = m2x509.get_ext_at(i)
logger.debug("X509 had {} extensions".format(nb_extensions))
for i in range(nb_extensions):
ext = m2x509.get_ext_at(i)
- triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) )
+ triples.append(
+ (ext.get_name(), ext.get_value(), ext.get_critical(),))
- triples.append( (name,self.get_data(name),'data',) )
+ triples.append((name, self.get_data(name), 'data',))
def dump(self, *args, **kwargs):
print(self.dump_string(*args, **kwargs))
def dump(self, *args, **kwargs):
print(self.dump_string(*args, **kwargs))