# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Work.
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Work.
#
-# THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
-# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
+# THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent
from sfa.util.sfalogging import logger
from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent
from sfa.util.sfalogging import logger
##
# Check to see if a passphrase works for a particular private key string.
# Intended to be used by passphrase callbacks for input validation.
##
# Check to see if a passphrase works for a particular private key string.
# Intended to be used by passphrase callbacks for input validation.
- crypto.load_privatekey(crypto.FILETYPE_PEM, string, (lambda x: passphrase))
+ OpenSSL.crypto.load_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, string, (lambda x: passphrase))
def convert_public_key(key):
keyconvert_path = "/usr/bin/keyconvert.py"
if not os.path.isfile(keyconvert_path):
def convert_public_key(key):
keyconvert_path = "/usr/bin/keyconvert.py"
if not os.path.isfile(keyconvert_path):
# that it can be expected to see why it failed.
# TODO: for production, cleanup the temporary files
if not os.path.exists(ssl_fn):
# that it can be expected to see why it failed.
# TODO: for production, cleanup the temporary files
if not os.path.exists(ssl_fn):
# A Keypair object may represent both a public and private key pair, or it
# may represent only a public key (this usage is consistent with OpenSSL).
# A Keypair object may represent both a public and private key pair, or it
# may represent only a public key (this usage is consistent with OpenSSL).
# Creates a Keypair object
# @param create If create==True, creates a new public/private key and
# stores it in the object
# Creates a Keypair object
# @param create If create==True, creates a new public/private key and
# stores it in the object
- # @param string If string!=None, load the keypair from the string (PEM)
- # @param filename If filename!=None, load the keypair from the file
+ # @param string If string != None, load the keypair from the string (PEM)
+ # @param filename If filename != None, load the keypair from the file
def __init__(self, create=False, string=None, filename=None):
if create:
def __init__(self, create=False, string=None, filename=None):
if create:
- self.key = crypto.PKey()
- self.key.generate_key(crypto.TYPE_RSA, 2048)
+ self.key = OpenSSL.crypto.PKey()
+ self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
def save_to_file(self, filename):
open(filename, 'w').write(self.as_pem())
def save_to_file(self, filename):
open(filename, 'w').write(self.as_pem())
buffer = open(filename, 'r').read()
self.load_from_string(buffer)
##
buffer = open(filename, 'r').read()
self.load_from_string(buffer)
##
- self.key = crypto.load_privatekey(
- crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string))
+ self.key = OpenSSL.crypto.load_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string))
self.m2key = M2Crypto.EVP.load_key_string(
string, functools.partial(glo_passphrase_callback, self, string))
else:
self.m2key = M2Crypto.EVP.load_key_string(
string, functools.partial(glo_passphrase_callback, self, string))
else:
self.m2key = M2Crypto.EVP.load_key_string(string)
##
# Load the public key from a string. No private key is loaded.
def load_pubkey_from_file(self, filename):
self.m2key = M2Crypto.EVP.load_key_string(string)
##
# Load the public key from a string. No private key is loaded.
def load_pubkey_from_file(self, filename):
- m2name.add_entry_by_txt(field="CN", type=0x1001, entry="junk", len=-1, loc=-1, set=0)
+ m2name.add_entry_by_txt(field="CN", type=0x1001,
+ entry="junk", len=-1, loc=-1, set=0)
m2x509 = M2Crypto.X509.X509()
m2x509.set_pubkey(self.m2key)
m2x509.set_serial_number(0)
m2x509 = M2Crypto.X509.X509()
m2x509.set_pubkey(self.m2key)
m2x509.set_serial_number(0)
return base64.b64encode(k.sign_final())
def verify_string(self, data, sig):
return base64.b64encode(k.sign_final())
def verify_string(self, data, sig):
- def dump_string (self):
- result=""
- result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string()
- filename=self.get_filename()
- if filename: result += "Filename %s\n"%filename
+ def dump_string(self):
+ result = ""
+ result += "KEYPAIR: pubkey={:>40}...".format(self.get_pubkey_string())
+ filename = self.get_filename()
+ if filename:
+ result += "Filename {}\n".format(filename)
# When saving a certificate to a file or a string, the caller can choose
# whether to save the parent certificates as well.
# When saving a certificate to a file or a string, the caller can choose
# whether to save the parent certificates as well.
# Create a blank X509 certificate and store it in this object.
def create(self, lifeDays=1825):
# Create a blank X509 certificate and store it in this object.
def create(self, lifeDays=1825):
- self.x509.gmtime_adj_notBefore(0) # 0 means now
- self.x509.gmtime_adj_notAfter(lifeDays*60*60*24) # five years is default
- self.x509.set_version(2) # x509v3 so it can have extensions
-
+ self.x509.gmtime_adj_notBefore(0) # 0 means now
+ self.x509.gmtime_adj_notAfter(
+ lifeDays * 60 * 60 * 24) # five years is default
+ self.x509.set_version(2) # x509v3 so it can have extensions
def load_from_string(self, string):
# if it is a chain of multiple certs, then split off the first one and
def load_from_string(self, string):
# if it is a chain of multiple certs, then split off the first one and
# If it's not in proper PEM format, wrap it
if string.count('-----BEGIN CERTIFICATE') == 0:
# If it's not in proper PEM format, wrap it
if string.count('-----BEGIN CERTIFICATE') == 0:
# If there is a PEM cert in there, but there is some other text first
# such as the text of the certificate, skip the text
beg = string.find('-----BEGIN CERTIFICATE')
if beg > 0:
# If there is a PEM cert in there, but there is some other text first
# such as the text of the certificate, skip the text
beg = string.find('-----BEGIN CERTIFICATE')
if beg > 0:
- string.count(Certificate.separator) == 0:
- parts = string.split('-----END CERTIFICATE-----',1)
+ string.count(Certificate.separator) == 0:
+ parts = string.split('-----END CERTIFICATE-----', 1)
parts[0] += '-----END CERTIFICATE-----'
else:
parts = string.split(Certificate.separator, 1)
parts[0] += '-----END CERTIFICATE-----'
else:
parts = string.split(Certificate.separator, 1)
# if there are more certs, then create a parent and let the parent load
# itself from the remainder of the string
# if there are more certs, then create a parent and let the parent load
# itself from the remainder of the string
- string = crypto.dump_certificate(crypto.FILETYPE_PEM, self.x509)
+ string = OpenSSL.crypto.dump_certificate(
+ OpenSSL.crypto.FILETYPE_PEM, self.x509)
+ if PY3 and isinstance(string, bytes):
+ string = string.decode()
# it's a mistake to use subject and cert params at the same time
assert(not cert)
if isinstance(subject, dict) or isinstance(subject, str):
# it's a mistake to use subject and cert params at the same time
assert(not cert)
if isinstance(subject, dict) or isinstance(subject, str):
for key in reqSubject.keys():
setattr(reqSubject, key, subject[key])
else:
for key in reqSubject.keys():
setattr(reqSubject, key, subject[key])
else:
# Set the subject name of the certificate
def set_subject(self, name):
# Set the subject name of the certificate
def set_subject(self, name):
# let's try to make this a little more usable as is makes logs hairy
# FIXME: Consider adding 'urn:publicid' and 'uuid' back for GENI?
pretty_fields = ['email']
# let's try to make this a little more usable as is makes logs hairy
# FIXME: Consider adding 'urn:publicid' and 'uuid' back for GENI?
pretty_fields = ['email']
def filter_chunk(self, chunk):
for field in self.pretty_fields:
if field in chunk:
def filter_chunk(self, chunk):
for field in self.pretty_fields:
if field in chunk:
data = self.get_data(field='subjectAltName')
if data:
message += " SubjectAltName:"
counter = 0
filtered = [self.filter_chunk(chunk) for chunk in data.split()]
data = self.get_data(field='subjectAltName')
if data:
message += " SubjectAltName:"
counter = 0
filtered = [self.filter_chunk(chunk) for chunk in data.split()]
- raise Exception("Cannot set basicConstraints CA:?? more than once. Was %s, trying to set as %s" % (self.isCA, val))
+ raise Exception("Cannot set basicConstraints CA:?? more than once. "
+ "Was {}, trying to set as {}%s".format(self.isCA, val))
##
# Add an X509 extension to the certificate. Add_extension can only be called
# once for a particular extension name, due to limitations in the underlying
##
# Add an X509 extension to the certificate. Add_extension can only be called
# once for a particular extension name, due to limitations in the underlying
# @param value string containing value of the extension
def add_extension(self, name, critical, value):
# @param value string containing value of the extension
def add_extension(self, name, critical, value):
# FIXME: What if they are trying to set with a different value?
# Is this ever OK? Or should we raise an exception?
# elif oldExtVal:
# FIXME: What if they are trying to set with a different value?
# Is this ever OK? Or should we raise an exception?
# elif oldExtVal:
# did not sign the certificate, then an exception will be thrown.
def verify(self, pubkey):
# did not sign the certificate, then an exception will be thrown.
def verify(self, pubkey):
# Verify a chain of certificates. Each certificate must be signed by
# the public key contained in it's parent. The chain is recursed
# until a certificate is found that is signed by a trusted root.
# Verify a chain of certificates. Each certificate must be signed by
# the public key contained in it's parent. The chain is recursed
# until a certificate is found that is signed by a trusted root.
raise CertExpired(self.pretty_cert(), "client cert")
# if this cert is signed by a trusted_cert, then we are set
raise CertExpired(self.pretty_cert(), "client cert")
# if this cert is signed by a trusted_cert, then we are set
- logger.debug("verify_chain: YES. Cert %s signed by trusted cert %s"%(
- self.pretty_cert(), trusted_cert.pretty_cert()))
+ logger.debug("verify_chain: YES. Cert {} signed by trusted cert {}"
+ .format(self.pretty_cert(), trusted_cert.pretty_cert()))
- logger.debug("verify_chain: NO. Cert %s is signed by trusted_cert %s, but that signer is expired..."%(
- self.pretty_cert(),trusted_cert.pretty_cert()))
- raise CertExpired(self.pretty_cert()," signer trusted_cert %s"%trusted_cert.pretty_cert())
+ logger.debug("verify_chain: NO. Cert {} is signed by trusted_cert {}, "
+ "but that signer is expired..."
+ .format(self.pretty_cert(), trusted_cert.pretty_cert()))
+ raise CertExpired("{} signer trusted_cert {}"
+ .format(self.pretty_cert(), trusted_cert.pretty_cert()))
- logger.debug("verify_chain: NO. %s has no parent and issuer %s is not in %d trusted roots"%\
- (self.pretty_cert(), self.get_issuer(), len(trusted_certs)))
- raise CertMissingParent(self.pretty_cert() + \
- ": Issuer %s is not one of the %d trusted roots, and cert has no parent." %\
- (self.get_issuer(), len(trusted_certs)))
+ logger.debug("verify_chain: NO. {} has no parent "
+ "and issuer {} is not in {} trusted roots"
+ .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs)))
+ raise CertMissingParent("{}: Issuer {} is not one of the {} trusted roots, "
+ "and cert has no parent."
+ .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs)))
- logger.debug("verify_chain: NO. %s is not signed by parent %s, but by %s"%\
- (self.pretty_cert(),
- self.parent.pretty_cert(),
- self.get_issuer()))
- raise CertNotSignedByParent("%s: Parent %s, issuer %s"\
- % (self.pretty_cert(),
- self.parent.pretty_cert(),
- self.get_issuer()))
+ logger.debug("verify_chain: NO. {} is not signed by parent {}, but by {}"
+ .format(self.pretty_cert(),
+ self.parent.pretty_cert(),
+ self.get_issuer()))
+ raise CertNotSignedByParent("{}: Parent {}, issuer {}"
+ .format(self.pretty_cert(),
+ self.parent.pretty_cert(),
+ self.get_issuer()))
# Ugly - cert objects aren't parsed so we need to read the
# extension and hope there are no other basicConstraints
if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
# Ugly - cert objects aren't parsed so we need to read the
# extension and hope there are no other basicConstraints
if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
- logger.warn("verify_chain: cert %s's parent %s is not a CA" % \
- (self.pretty_cert(), self.parent.pretty_cert()))
- raise CertNotSignedByParent("%s: Parent %s not a CA" % (self.pretty_cert(),
- self.parent.pretty_cert()))
+ logger.warn("verify_chain: cert {}'s parent {} is not a CA"
+ .format(self.pretty_cert(), self.parent.pretty_cert()))
+ raise CertNotSignedByParent("{}: Parent {} not a CA"
+ .format(self.pretty_cert(), self.parent.pretty_cert()))
- logger.debug("verify_chain: .. %s, -> verifying parent %s"%\
- (self.pretty_cert(),self.parent.pretty_cert()))
+ logger.debug("verify_chain: .. {}, -> verifying parent {}"
+ .format(self.pretty_cert(), self.parent.pretty_cert()))
- ext=m2x509.get_ext_at(i)
- triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) )
+ ext = m2x509.get_ext_at(i)
+ triples.append(
+ (ext.get_name(), ext.get_value(), ext.get_critical(),))
- triples.append( (name,self.get_data(name),'data',) )
+ triples.append((name, self.get_data(name), 'data',))
- result += "CERTIFICATE for %s\n"%self.pretty_cert()
- result += "Issued by %s\n"%self.get_issuer()
- filename=self.get_filename()
- if filename: result += "Filename %s\n"%filename
+ result += "CERTIFICATE for {}\n".format(self.pretty_cert())
+ result += "Issued by {}\n".format(self.get_issuer())
+ filename = self.get_filename()
+ if filename:
+ result += "Filename {}\n".format(filename)
- result += " has %d extensions/data attached"%len(all_datas)
- for (n, v, c) in all_datas:
- if c=='data':
- result += " data: %s=%s\n"%(n,v)
+ result += " has {} extensions/data attached".format(len(all_datas))
+ for n, v, c in all_datas:
+ if c == 'data':
+ result += " data: {}={}\n".format(n, v)