X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Ftrust%2Fcertificate.py;h=758198ecae651a7c5ccb1dc845e5be6cbec7d2e7;hb=dfafd9825fecddbbd1ea4c7093766c386f660c29;hp=ff6534e083fc23537469f9cd483ab743deb25e5b;hpb=d0aed7cce376986611204f90dd2b61b4648532e1;p=sfa.git diff --git a/sfa/trust/certificate.py b/sfa/trust/certificate.py index ff6534e0..758198ec 100644 --- a/sfa/trust/certificate.py +++ b/sfa/trust/certificate.py @@ -35,6 +35,8 @@ ## # +from __future__ import print_function + import functools import os import tempfile @@ -88,11 +90,11 @@ def test_passphrase(string, passphrase): def convert_public_key(key): keyconvert_path = "/usr/bin/keyconvert.py" if not os.path.isfile(keyconvert_path): - raise IOError, "Could not find keyconvert in %s" % keyconvert_path + raise IOError("Could not find keyconvert in {}".format(keyconvert_path)) # we can only convert rsa keys if "ssh-dss" in key: - raise Exception, "keyconvert: dss keys are not supported" + raise Exception("keyconvert: dss keys are not supported") (ssh_f, ssh_fn) = tempfile.mkstemp() ssl_fn = tempfile.mktemp() @@ -106,7 +108,7 @@ def convert_public_key(key): # that it can be expected to see why it failed. # TODO: for production, cleanup the temporary files if not os.path.exists(ssl_fn): - raise Exception, "keyconvert: generated certificate not found. keyconvert may have failed." + raise Exception("keyconvert: generated certificate not found. keyconvert may have failed.") k = Keypair() try: @@ -135,8 +137,8 @@ class Keypair: # Creates a Keypair object # @param create If create==True, creates a new public/private key and # stores it in the object - # @param string If string!=None, load the keypair from the string (PEM) - # @param filename If filename!=None, load the keypair from the file + # @param string If string != None, load the keypair from the string (PEM) + # @param filename If filename != None, load the keypair from the file def __init__(self, create=False, string=None, filename=None): if create: @@ -159,13 +161,13 @@ class Keypair: def save_to_file(self, filename): open(filename, 'w').write(self.as_pem()) - self.filename=filename + self.filename = filename ## # Load the private key from a file. Implicity the private key includes the public key. def load_from_file(self, filename): - self.filename=filename + self.filename = filename buffer = open(filename, 'r').read() self.load_from_string(buffer) @@ -215,7 +217,7 @@ class Keypair: # get the pyopenssl pkey from the pyopenssl x509 self.key = pyx509.get_pubkey() - self.filename=filename + self.filename = filename ## # Load the public key from a string. No private key is loaded. @@ -279,14 +281,14 @@ class Keypair: def get_filename(self): return getattr(self,'filename',None) - def dump (self, *args, **kwargs): - print self.dump_string(*args, **kwargs) + def dump(self, *args, **kwargs): + print(self.dump_string(*args, **kwargs)) - def dump_string (self): - result="" - result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string() - filename=self.get_filename() - if filename: result += "Filename %s\n"%filename + def dump_string(self): + result = "" + result += "KEYPAIR: pubkey={:>40}...".format(self.get_pubkey_string()) + filename = self.get_filename() + if filename: result += "Filename {}\n".format(filename) return result ## @@ -310,7 +312,7 @@ class Certificate: # parent = None isCA = None # will be a boolean once set - separator="-----parent-----" + separator = "-----parent-----" ## # Create a certificate object. @@ -377,7 +379,8 @@ class Certificate: # If it's not in proper PEM format, wrap it if string.count('-----BEGIN CERTIFICATE') == 0: - string = '-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----' % string + string = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'\ + .format(string) # If there is a PEM cert in there, but there is some other text first # such as the text of the certificate, skip the text @@ -398,7 +401,7 @@ class Certificate: self.x509 = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0]) if self.x509 is None: - logger.warn("Loaded from string but cert is None: %s" % string) + logger.warn("Loaded from string but cert is None: {}".format(string)) # if there are more certs, then create a parent and let the parent load # itself from the remainder of the string @@ -413,7 +416,7 @@ class Certificate: file = open(filename) string = file.read() self.load_from_string(string) - self.filename=filename + self.filename = filename ## # Save the certificate to a string. @@ -441,7 +444,7 @@ class Certificate: f = open(filename, 'w') f.write(string) f.close() - self.filename=filename + self.filename = filename ## # Save the certificate to a random file in /tmp/ @@ -466,7 +469,7 @@ class Certificate: if isinstance(subject, dict) or isinstance(subject, str): req = crypto.X509Req() reqSubject = req.get_subject() - if (isinstance(subject, dict)): + if isinstance(subject, dict): for key in reqSubject.keys(): setattr(reqSubject, key, subject[key]) else: @@ -493,7 +496,7 @@ class Certificate: def set_subject(self, name): req = crypto.X509Req() subj = req.get_subject() - if (isinstance(name, dict)): + if isinstance(name, dict): for key in name.keys(): setattr(subj, key, name[key]) else: @@ -530,7 +533,7 @@ class Certificate: counter = 0 filtered = [self.filter_chunk(chunk) for chunk in data.split()] message += " ".join( [f for f in filtered if f]) - omitted = len ([f for f in filtered if not f]) + omitted = len([f for f in filtered if not f]) if omitted: message += "..+{} omitted".format(omitted) message += "]" @@ -568,7 +571,8 @@ class Certificate: if self.isCA != None: # Can't double set properties - raise Exception, "Cannot set basicConstraints CA:?? more than once. Was %s, trying to set as %s" % (self.isCA, val) + raise Exception("Cannot set basicConstraints CA:?? more than once. " + "Was {}, trying to set as {}%s".format(self.isCA, val)) self.isCA = val if val: @@ -604,9 +608,9 @@ class Certificate: # FIXME: What if they are trying to set with a different value? # Is this ever OK? Or should we raise an exception? # elif oldExtVal: -# raise "Cannot add extension %s which had val %s with new val %s" % (name, oldExtVal, value) +# raise "Cannot add extension {} which had val {} with new val {}".format(name, oldExtVal, value) - ext = crypto.X509Extension (name, critical, value) + ext = crypto.X509Extension(name, critical, value) self.x509.add_extensions([ext]) ## @@ -639,8 +643,8 @@ class Certificate: def set_data(self, str, field='subjectAltName'): # pyOpenSSL only allows us to add extensions, so if we try to set the # same extension more than once, it will not work - if self.data.has_key(field): - raise "Cannot set ", field, " more than once" + if field in self.data: + raise Exception("Cannot set {} more than once".format(field)) self.data[field] = str self.add_extension(field, 0, str) @@ -648,7 +652,7 @@ class Certificate: # Return the data string that was previously set with set_data def get_data(self, field='subjectAltName'): - if self.data.has_key(field): + if field in self.data: return self.data[field] try: @@ -752,7 +756,8 @@ class Certificate: # verify expiration time if self.x509.has_expired(): if debug_verify_chain: - logger.debug("verify_chain: NO, Certificate %s has expired" % self.pretty_cert()) + logger.debug("verify_chain: NO, Certificate {} has expired" + .format(self.pretty_cert())) raise CertExpired(self.pretty_cert(), "client cert") # if this cert is signed by a trusted_cert, then we are set @@ -761,35 +766,38 @@ class Certificate: # verify expiration of trusted_cert ? if not trusted_cert.x509.has_expired(): if debug_verify_chain: - logger.debug("verify_chain: YES. Cert %s signed by trusted cert %s"%( - self.pretty_cert(), trusted_cert.pretty_cert())) + logger.debug("verify_chain: YES. Cert {} signed by trusted cert {}" + .format(self.pretty_cert(), trusted_cert.pretty_cert())) return trusted_cert else: if debug_verify_chain: - logger.debug("verify_chain: NO. Cert %s is signed by trusted_cert %s, but that signer is expired..."%( - self.pretty_cert(),trusted_cert.pretty_cert())) - raise CertExpired(self.pretty_cert()," signer trusted_cert %s"%trusted_cert.pretty_cert()) + logger.debug("verify_chain: NO. Cert {} is signed by trusted_cert {}, " + "but that signer is expired..." + .format(self.pretty_cert(),trusted_cert.pretty_cert())) + raise CertExpired("{} signer trusted_cert {}" + .format(self.pretty_cert(), trusted_cert.pretty_cert())) # if there is no parent, then no way to verify the chain if not self.parent: if debug_verify_chain: - logger.debug("verify_chain: NO. %s has no parent and issuer %s is not in %d trusted roots"%\ - (self.pretty_cert(), self.get_issuer(), len(trusted_certs))) - raise CertMissingParent(self.pretty_cert() + \ - ": Issuer %s is not one of the %d trusted roots, and cert has no parent." %\ - (self.get_issuer(), len(trusted_certs))) + logger.debug("verify_chain: NO. {} has no parent " + "and issuer {} is not in {} trusted roots" + .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs))) + raise CertMissingParent("{}: Issuer {} is not one of the {} trusted roots, " + "and cert has no parent." + .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs))) # if it wasn't signed by the parent... if not self.is_signed_by_cert(self.parent): if debug_verify_chain: - logger.debug("verify_chain: NO. %s is not signed by parent %s, but by %s"%\ - (self.pretty_cert(), - self.parent.pretty_cert(), - self.get_issuer())) - raise CertNotSignedByParent("%s: Parent %s, issuer %s"\ - % (self.pretty_cert(), - self.parent.pretty_cert(), - self.get_issuer())) + logger.debug("verify_chain: NO. {} is not signed by parent {}, but by {}" + .format(self.pretty_cert(), + self.parent.pretty_cert(), + self.get_issuer())) + raise CertNotSignedByParent("{}: Parent {}, issuer {}" + .format(self.pretty_cert(), + self.parent.pretty_cert(), + self.get_issuer())) # Confirm that the parent is a CA. Only CAs can be trusted as # signers. @@ -798,15 +806,15 @@ class Certificate: # Ugly - cert objects aren't parsed so we need to read the # extension and hope there are no other basicConstraints if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'): - logger.warn("verify_chain: cert %s's parent %s is not a CA" % \ - (self.pretty_cert(), self.parent.pretty_cert())) - raise CertNotSignedByParent("%s: Parent %s not a CA" % (self.pretty_cert(), - self.parent.pretty_cert())) + logger.warn("verify_chain: cert {}'s parent {} is not a CA" + .format(self.pretty_cert(), self.parent.pretty_cert())) + raise CertNotSignedByParent("{}: Parent {} not a CA" + .format(self.pretty_cert(), self.parent.pretty_cert())) # if the parent isn't verified... if debug_verify_chain: - logger.debug("verify_chain: .. %s, -> verifying parent %s"%\ - (self.pretty_cert(),self.parent.pretty_cert())) + logger.debug("verify_chain: .. {}, -> verifying parent {}" + .format(self.pretty_cert(),self.parent.pretty_cert())) self.parent.verify_chain(trusted_certs) return @@ -817,16 +825,16 @@ class Certificate: triples = [] m2x509 = X509.load_cert_string(self.save_to_string()) nb_extensions = m2x509.get_ext_count() - logger.debug("X509 had %d extensions"%nb_extensions) + logger.debug("X509 had {} extensions".format(nb_extensions)) for i in range(nb_extensions): - ext=m2x509.get_ext_at(i) + ext = m2x509.get_ext_at(i) triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) ) return triples def get_data_names(self): return self.data.keys() - def get_all_datas (self): + def get_all_datas(self): triples = self.get_extensions() for name in self.get_data_names(): triples.append( (name,self.get_data(name),'data',) ) @@ -836,21 +844,22 @@ class Certificate: def get_filename(self): return getattr(self,'filename',None) - def dump (self, *args, **kwargs): - print self.dump_string(*args, **kwargs) + def dump(self, *args, **kwargs): + print(self.dump_string(*args, **kwargs)) - def dump_string (self,show_extensions=False): + def dump_string(self, show_extensions=False): result = "" - result += "CERTIFICATE for %s\n"%self.pretty_cert() - result += "Issued by %s\n"%self.get_issuer() - filename=self.get_filename() - if filename: result += "Filename %s\n"%filename + result += "CERTIFICATE for {}\n".format(self.pretty_cert()) + result += "Issued by {}\n".format(self.get_issuer()) + filename = self.get_filename() + if filename: + result += "Filename {}\n".format(filename) if show_extensions: all_datas = self.get_all_datas() - result += " has %d extensions/data attached"%len(all_datas) - for (n, v, c) in all_datas: - if c=='data': - result += " data: %s=%s\n"%(n,v) + result += " has {} extensions/data attached".format(len(all_datas)) + for n, v, c in all_datas: + if c == 'data': + result += " data: {}={}\n".format(n, v) else: - result += " ext: %s (crit=%s)=<<<%s>>>\n"%(n,c,v) + result += " ext: {} (crit={})=<<<{}>>>\n".format(n, c, v) return result