# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Work.
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Work.
#
-# THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
-# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
+# THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent
from sfa.util.sfalogging import logger
from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent
from sfa.util.sfalogging import logger
##
# Check to see if a passphrase works for a particular private key string.
# Intended to be used by passphrase callbacks for input validation.
##
# Check to see if a passphrase works for a particular private key string.
# Intended to be used by passphrase callbacks for input validation.
- crypto.load_privatekey(crypto.FILETYPE_PEM, string, (lambda x: passphrase))
+ OpenSSL.crypto.load_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, string, (lambda x: passphrase))
def convert_public_key(key):
keyconvert_path = "/usr/bin/keyconvert.py"
if not os.path.isfile(keyconvert_path):
def convert_public_key(key):
keyconvert_path = "/usr/bin/keyconvert.py"
if not os.path.isfile(keyconvert_path):
(ssh_f, ssh_fn) = tempfile.mkstemp()
ssl_fn = tempfile.mktemp()
(ssh_f, ssh_fn) = tempfile.mkstemp()
ssl_fn = tempfile.mktemp()
# that it can be expected to see why it failed.
# TODO: for production, cleanup the temporary files
if not os.path.exists(ssl_fn):
# that it can be expected to see why it failed.
# TODO: for production, cleanup the temporary files
if not os.path.exists(ssl_fn):
# A Keypair object may represent both a public and private key pair, or it
# may represent only a public key (this usage is consistent with OpenSSL).
# A Keypair object may represent both a public and private key pair, or it
# may represent only a public key (this usage is consistent with OpenSSL).
# Creates a Keypair object
# @param create If create==True, creates a new public/private key and
# stores it in the object
# Creates a Keypair object
# @param create If create==True, creates a new public/private key and
# stores it in the object
- # @param string If string!=None, load the keypair from the string (PEM)
- # @param filename If filename!=None, load the keypair from the file
+ # @param string If string != None, load the keypair from the string (PEM)
+ # @param filename If filename != None, load the keypair from the file
def __init__(self, create=False, string=None, filename=None):
if create:
def __init__(self, create=False, string=None, filename=None):
if create:
- self.key = crypto.PKey()
- self.key.generate_key(crypto.TYPE_RSA, 1024)
+ self.key = OpenSSL.crypto.PKey()
+ self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
def save_to_file(self, filename):
open(filename, 'w').write(self.as_pem())
def save_to_file(self, filename):
open(filename, 'w').write(self.as_pem())
buffer = open(filename, 'r').read()
self.load_from_string(buffer)
##
buffer = open(filename, 'r').read()
self.load_from_string(buffer)
##
- self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string) )
- self.m2key = M2Crypto.EVP.load_key_string(string, functools.partial(glo_passphrase_callback, self, string) )
+ self.key = OpenSSL.crypto.load_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string))
+ self.m2key = M2Crypto.EVP.load_key_string(
+ string, functools.partial(glo_passphrase_callback, self, string))
self.m2key = M2Crypto.EVP.load_key_string(string)
##
# Load the public key from a string. No private key is loaded.
def load_pubkey_from_file(self, filename):
self.m2key = M2Crypto.EVP.load_key_string(string)
##
# Load the public key from a string. No private key is loaded.
def load_pubkey_from_file(self, filename):
- m2name.add_entry_by_txt(field="CN", type=0x1001, entry="junk", len=-1, loc=-1, set=0)
+ m2name.add_entry_by_txt(field="CN", type=0x1001,
+ entry="junk", len=-1, loc=-1, set=0)
m2x509 = M2Crypto.X509.X509()
m2x509.set_pubkey(self.m2key)
m2x509.set_serial_number(0)
m2x509 = M2Crypto.X509.X509()
m2x509.set_pubkey(self.m2key)
m2x509.set_serial_number(0)
# Returns a string containing the public key represented by this object.
def get_pubkey_string(self):
# Returns a string containing the public key represented by this object.
def get_pubkey_string(self):
return self.as_pem() == pkey.as_pem()
def sign_string(self, data):
return self.as_pem() == pkey.as_pem()
def sign_string(self, data):
k.sign_init()
k.sign_update(data)
return base64.b64encode(k.sign_final())
def verify_string(self, data, sig):
k.sign_init()
k.sign_update(data)
return base64.b64encode(k.sign_final())
def verify_string(self, data, sig):
- def dump (self, *args, **kwargs):
- print self.dump_string(*args, **kwargs)
+ def dump(self, *args, **kwargs):
+ print(self.dump_string(*args, **kwargs))
- def dump_string (self):
- result=""
- result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string()
- filename=self.get_filename()
- if filename: result += "Filename %s\n"%filename
+ def dump_string(self):
+ result = ""
+ result += "KEYPAIR: pubkey={:>40}...".format(self.get_pubkey_string())
+ filename = self.get_filename()
+ if filename:
+ result += "Filename {}\n".format(filename)
# When saving a certificate to a file or a string, the caller can choose
# whether to save the parent certificates as well.
# When saving a certificate to a file or a string, the caller can choose
# whether to save the parent certificates as well.
# Create a blank X509 certificate and store it in this object.
def create(self, lifeDays=1825):
# Create a blank X509 certificate and store it in this object.
def create(self, lifeDays=1825):
- self.x509.gmtime_adj_notBefore(0) # 0 means now
- self.x509.gmtime_adj_notAfter(lifeDays*60*60*24) # five years is default
- self.x509.set_version(2) # x509v3 so it can have extensions
-
+ self.x509.gmtime_adj_notBefore(0) # 0 means now
+ self.x509.gmtime_adj_notAfter(
+ lifeDays * 60 * 60 * 24) # five years is default
+ self.x509.set_version(2) # x509v3 so it can have extensions
def load_from_string(self, string):
# if it is a chain of multiple certs, then split off the first one and
def load_from_string(self, string):
# if it is a chain of multiple certs, then split off the first one and
- # load it (support for the ---parent--- tag as well as normal chained certs)
+ # load it (support for the ---parent--- tag as well as normal chained
+ # certs)
+
+ if string is None or string.strip() == "":
+ logger.warn("Empty string in load_from_string")
+ return
# If it's not in proper PEM format, wrap it
if string.count('-----BEGIN CERTIFICATE') == 0:
# If it's not in proper PEM format, wrap it
if string.count('-----BEGIN CERTIFICATE') == 0:
# If there is a PEM cert in there, but there is some other text first
# such as the text of the certificate, skip the text
beg = string.find('-----BEGIN CERTIFICATE')
if beg > 0:
# If there is a PEM cert in there, but there is some other text first
# such as the text of the certificate, skip the text
beg = string.find('-----BEGIN CERTIFICATE')
if beg > 0:
- string.count(Certificate.separator) == 0:
- parts = string.split('-----END CERTIFICATE-----',1)
+ string.count(Certificate.separator) == 0:
+ parts = string.split('-----END CERTIFICATE-----', 1)
parts[0] += '-----END CERTIFICATE-----'
else:
parts = string.split(Certificate.separator, 1)
parts[0] += '-----END CERTIFICATE-----'
else:
parts = string.split(Certificate.separator, 1)
- self.x509 = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0])
+ self.x509 = OpenSSL.crypto.load_certificate(
+ OpenSSL.crypto.FILETYPE_PEM, parts[0])
+
+ if self.x509 is None:
+ logger.warn(
+ "Loaded from string but cert is None: {}".format(string))
# if there are more certs, then create a parent and let the parent load
# itself from the remainder of the string
# if there are more certs, then create a parent and let the parent load
# itself from the remainder of the string
# @param save_parents If save_parents==True, then also save the parent certificates.
def save_to_string(self, save_parents=True):
# @param save_parents If save_parents==True, then also save the parent certificates.
def save_to_string(self, save_parents=True):
- string = crypto.dump_certificate(crypto.FILETYPE_PEM, self.x509)
+ if self.x509 is None:
+ logger.warn("None cert in certificate.save_to_string")
+ return ""
+ string = OpenSSL.crypto.dump_certificate(
+ OpenSSL.crypto.FILETYPE_PEM, self.x509)
+ if PY3 and isinstance(string, bytes):
+ string = string.decode()
# it's a mistake to use subject and cert params at the same time
assert(not cert)
if isinstance(subject, dict) or isinstance(subject, str):
# it's a mistake to use subject and cert params at the same time
assert(not cert)
if isinstance(subject, dict) or isinstance(subject, str):
for key in reqSubject.keys():
setattr(reqSubject, key, subject[key])
else:
for key in reqSubject.keys():
setattr(reqSubject, key, subject[key])
else:
# Set the subject name of the certificate
def set_subject(self, name):
# Set the subject name of the certificate
def set_subject(self, name):
##
# Get a pretty-print subject name of the certificate
# let's try to make this a little more usable as is makes logs hairy
##
# Get a pretty-print subject name of the certificate
# let's try to make this a little more usable as is makes logs hairy
def filter_chunk(self, chunk):
for field in self.pretty_fields:
if field in chunk:
def filter_chunk(self, chunk):
for field in self.pretty_fields:
if field in chunk:
data = self.get_data(field='subjectAltName')
if data:
message += " SubjectAltName:"
counter = 0
filtered = [self.filter_chunk(chunk) for chunk in data.split()]
data = self.get_data(field='subjectAltName')
if data:
message += " SubjectAltName:"
counter = 0
filtered = [self.filter_chunk(chunk) for chunk in data.split()]
- raise Exception, "Cannot set basicConstraints CA:?? more than once. Was %s, trying to set as %s" % (self.isCA, val)
+ raise Exception("Cannot set basicConstraints CA:?? more than once. "
+ "Was {}, trying to set as {}%s".format(self.isCA, val))
##
# Add an X509 extension to the certificate. Add_extension can only be called
# once for a particular extension name, due to limitations in the underlying
##
# Add an X509 extension to the certificate. Add_extension can only be called
# once for a particular extension name, due to limitations in the underlying
# FIXME: What if they are trying to set with a different value?
# Is this ever OK? Or should we raise an exception?
# elif oldExtVal:
# FIXME: What if they are trying to set with a different value?
# Is this ever OK? Or should we raise an exception?
# elif oldExtVal:
def set_data(self, str, field='subjectAltName'):
# pyOpenSSL only allows us to add extensions, so if we try to set the
# same extension more than once, it will not work
def set_data(self, str, field='subjectAltName'):
# pyOpenSSL only allows us to add extensions, so if we try to set the
# same extension more than once, it will not work
# Return the data string that was previously set with set_data
def get_data(self, field='subjectAltName'):
# Return the data string that was previously set with set_data
def get_data(self, field='subjectAltName'):
# @param pkey is a Keypair object representing a public key. If Pkey
# did not sign the certificate, then an exception will be thrown.
# @param pkey is a Keypair object representing a public key. If Pkey
# did not sign the certificate, then an exception will be thrown.
- return m2x509.verify(m2pkey)
+ # https://www.openssl.org/docs/man1.1.0/crypto/X509_verify.html
+ # verify returns
+ # 1 if it checks out
+ # 0 if if does not
+ # -1 if it could not be checked 'for some reason'
+ m2result = m2x509.verify(m2pubkey)
+ result = m2result == 1
+ if debug_verify_chain:
+ logger.debug("Certificate.verify: <- {} (m2={}) ({} x {})"
+ .format(result, m2result, self.pretty_cert(), m2pubkey))
+ return result
# @param cert certificate object
def is_signed_by_cert(self, cert):
# @param cert certificate object
def is_signed_by_cert(self, cert):
# Verify a chain of certificates. Each certificate must be signed by
# the public key contained in it's parent. The chain is recursed
# until a certificate is found that is signed by a trusted root.
# Verify a chain of certificates. Each certificate must be signed by
# the public key contained in it's parent. The chain is recursed
# until a certificate is found that is signed by a trusted root.
raise CertExpired(self.pretty_cert(), "client cert")
# if this cert is signed by a trusted_cert, then we are set
raise CertExpired(self.pretty_cert(), "client cert")
# if this cert is signed by a trusted_cert, then we are set
- for trusted_cert in trusted_certs:
+ for i, trusted_cert in enumerate(trusted_certs, 1):
+ logger.debug("Certificate.verify_chain - trying trusted #{} : {}"
+ .format(i, trusted_cert.pretty_name()))
if self.is_signed_by_cert(trusted_cert):
# verify expiration of trusted_cert ?
if not trusted_cert.x509.has_expired():
if self.is_signed_by_cert(trusted_cert):
# verify expiration of trusted_cert ?
if not trusted_cert.x509.has_expired():
- if debug_verify_chain:
- logger.debug("verify_chain: YES. Cert %s signed by trusted cert %s"%(
- self.pretty_cert(), trusted_cert.pretty_cert()))
+ if debug_verify_chain:
+ logger.debug("verify_chain: YES. Cert {} signed by trusted cert {}"
+ .format(self.pretty_name(), trusted_cert.pretty_name()))
- logger.debug("verify_chain: NO. Cert %s is signed by trusted_cert %s, but that signer is expired..."%(
- self.pretty_cert(),trusted_cert.pretty_cert()))
- raise CertExpired(self.pretty_cert()," signer trusted_cert %s"%trusted_cert.pretty_cert())
+ logger.debug("verify_chain: NO. Cert {} is signed by trusted_cert {}, "
+ "but that signer is expired..."
+ .format(self.pretty_cert(), trusted_cert.pretty_cert()))
+ raise CertExpired("{} signer trusted_cert {}"
+ .format(self.pretty_name(), trusted_cert.pretty_name()))
+ else:
+ logger.debug("verify_chain: not a direct descendant of a trusted root".
+ format(self.pretty_name(), trusted_cert))
- logger.debug("verify_chain: NO. %s has no parent and issuer %s is not in %d trusted roots"%\
- (self.pretty_cert(), self.get_issuer(), len(trusted_certs)))
- raise CertMissingParent(self.pretty_cert() + \
- ": Issuer %s is not one of the %d trusted roots, and cert has no parent." %\
- (self.get_issuer(), len(trusted_certs)))
+ logger.debug("verify_chain: NO. {} has no parent "
+ "and issuer {} is not in {} trusted roots"
+ .format(self.pretty_name(), self.get_issuer(), len(trusted_certs)))
+ raise CertMissingParent("{}: Issuer {} is not one of the {} trusted roots, "
+ "and cert has no parent."
+ .format(self.pretty_name(), self.get_issuer(), len(trusted_certs)))
- logger.debug("verify_chain: NO. %s is not signed by parent %s, but by %s"%\
- (self.pretty_cert(),
- self.parent.pretty_cert(),
- self.get_issuer()))
- raise CertNotSignedByParent("%s: Parent %s, issuer %s"\
- % (self.pretty_cert(),
- self.parent.pretty_cert(),
- self.get_issuer()))
+ logger.debug("verify_chain: NO. {} is not signed by parent {}"
+ .format(self.pretty_name(),
+ self.parent.pretty_name()))
+ self.save_to_file("/tmp/xxx-capture.pem", save_parents=True)
+ raise CertNotSignedByParent("{}: Parent {}, issuer {}"
+ .format(self.pretty_name(),
+ self.parent.pretty_name(),
+ self.get_issuer()))
# Ugly - cert objects aren't parsed so we need to read the
# extension and hope there are no other basicConstraints
if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
# Ugly - cert objects aren't parsed so we need to read the
# extension and hope there are no other basicConstraints
if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
- logger.warn("verify_chain: cert %s's parent %s is not a CA" % \
- (self.pretty_cert(), self.parent.pretty_cert()))
- raise CertNotSignedByParent("%s: Parent %s not a CA" % (self.pretty_cert(),
- self.parent.pretty_cert()))
+ logger.warn("verify_chain: cert {}'s parent {} is not a CA"
+ .format(self.pretty_name(), self.parent.pretty_name()))
+ raise CertNotSignedByParent("{}: Parent {} not a CA"
+ .format(self.pretty_name(), self.parent.pretty_name()))
- logger.debug("verify_chain: .. %s, -> verifying parent %s"%\
- (self.pretty_cert(),self.parent.pretty_cert()))
+ logger.debug("verify_chain: .. {}, -> verifying parent {}"
+ .format(self.pretty_name(),self.parent.pretty_name()))
- ext=m2x509.get_ext_at(i)
- triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) )
+ ext = m2x509.get_ext_at(i)
+ triples.append(
+ (ext.get_name(), ext.get_value(), ext.get_critical(),))
- triples.append( (name,self.get_data(name),'data',) )
+ triples.append((name, self.get_data(name), 'data',))
- def dump (self, *args, **kwargs):
- print self.dump_string(*args, **kwargs)
+ def dump(self, *args, **kwargs):
+ print(self.dump_string(*args, **kwargs))
- result += "CERTIFICATE for %s\n"%self.pretty_cert()
- result += "Issued by %s\n"%self.get_issuer()
- filename=self.get_filename()
- if filename: result += "Filename %s\n"%filename
+ result += "CERTIFICATE for {}\n".format(self.pretty_cert())
+ result += "Issued by {}\n".format(self.get_issuer())
+ filename = self.get_filename()
+ if filename:
+ result += "Filename {}\n".format(filename)
- result += " has %d extensions/data attached"%len(all_datas)
- for (n, v, c) in all_datas:
- if c=='data':
- result += " data: %s=%s\n"%(n,v)
+ result += " has {} extensions/data attached".format(len(all_datas))
+ for n, v, c in all_datas:
+ if c == 'data':
+ result += " data: {}={}\n".format(n, v)