From: Thierry Parmentelat Date: Fri, 1 Apr 2016 10:56:21 +0000 (+0200) Subject: prettified certificate, credential and speaksfor_util X-Git-Tag: sfa-3.1-21~12 X-Git-Url: http://git.onelab.eu/?p=sfa.git;a=commitdiff_plain;h=dfafd9825fecddbbd1ea4c7093766c386f660c29 prettified certificate, credential and speaksfor_util prettified speaks_for --- diff --git a/sfa/trust/certificate.py b/sfa/trust/certificate.py index 8e9cf7d9..758198ec 100644 --- a/sfa/trust/certificate.py +++ b/sfa/trust/certificate.py @@ -90,7 +90,7 @@ def test_passphrase(string, passphrase): def convert_public_key(key): keyconvert_path = "/usr/bin/keyconvert.py" if not os.path.isfile(keyconvert_path): - raise IOError("Could not find keyconvert in %s" % keyconvert_path) + raise IOError("Could not find keyconvert in {}".format(keyconvert_path)) # we can only convert rsa keys if "ssh-dss" in key: @@ -137,8 +137,8 @@ class Keypair: # Creates a Keypair object # @param create If create==True, creates a new public/private key and # stores it in the object - # @param string If string!=None, load the keypair from the string (PEM) - # @param filename If filename!=None, load the keypair from the file + # @param string If string != None, load the keypair from the string (PEM) + # @param filename If filename != None, load the keypair from the file def __init__(self, create=False, string=None, filename=None): if create: @@ -161,13 +161,13 @@ class Keypair: def save_to_file(self, filename): open(filename, 'w').write(self.as_pem()) - self.filename=filename + self.filename = filename ## # Load the private key from a file. Implicity the private key includes the public key. def load_from_file(self, filename): - self.filename=filename + self.filename = filename buffer = open(filename, 'r').read() self.load_from_string(buffer) @@ -217,7 +217,7 @@ class Keypair: # get the pyopenssl pkey from the pyopenssl x509 self.key = pyx509.get_pubkey() - self.filename=filename + self.filename = filename ## # Load the public key from a string. No private key is loaded. @@ -281,14 +281,14 @@ class Keypair: def get_filename(self): return getattr(self,'filename',None) - def dump (self, *args, **kwargs): + def dump(self, *args, **kwargs): print(self.dump_string(*args, **kwargs)) - def dump_string (self): - result="" - result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string() - filename=self.get_filename() - if filename: result += "Filename %s\n"%filename + def dump_string(self): + result = "" + result += "KEYPAIR: pubkey={:>40}...".format(self.get_pubkey_string()) + filename = self.get_filename() + if filename: result += "Filename {}\n".format(filename) return result ## @@ -312,7 +312,7 @@ class Certificate: # parent = None isCA = None # will be a boolean once set - separator="-----parent-----" + separator = "-----parent-----" ## # Create a certificate object. @@ -379,7 +379,8 @@ class Certificate: # If it's not in proper PEM format, wrap it if string.count('-----BEGIN CERTIFICATE') == 0: - string = '-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----' % string + string = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'\ + .format(string) # If there is a PEM cert in there, but there is some other text first # such as the text of the certificate, skip the text @@ -400,7 +401,7 @@ class Certificate: self.x509 = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0]) if self.x509 is None: - logger.warn("Loaded from string but cert is None: %s" % string) + logger.warn("Loaded from string but cert is None: {}".format(string)) # if there are more certs, then create a parent and let the parent load # itself from the remainder of the string @@ -415,7 +416,7 @@ class Certificate: file = open(filename) string = file.read() self.load_from_string(string) - self.filename=filename + self.filename = filename ## # Save the certificate to a string. @@ -443,7 +444,7 @@ class Certificate: f = open(filename, 'w') f.write(string) f.close() - self.filename=filename + self.filename = filename ## # Save the certificate to a random file in /tmp/ @@ -468,7 +469,7 @@ class Certificate: if isinstance(subject, dict) or isinstance(subject, str): req = crypto.X509Req() reqSubject = req.get_subject() - if (isinstance(subject, dict)): + if isinstance(subject, dict): for key in reqSubject.keys(): setattr(reqSubject, key, subject[key]) else: @@ -495,7 +496,7 @@ class Certificate: def set_subject(self, name): req = crypto.X509Req() subj = req.get_subject() - if (isinstance(name, dict)): + if isinstance(name, dict): for key in name.keys(): setattr(subj, key, name[key]) else: @@ -532,7 +533,7 @@ class Certificate: counter = 0 filtered = [self.filter_chunk(chunk) for chunk in data.split()] message += " ".join( [f for f in filtered if f]) - omitted = len ([f for f in filtered if not f]) + omitted = len([f for f in filtered if not f]) if omitted: message += "..+{} omitted".format(omitted) message += "]" @@ -570,7 +571,8 @@ class Certificate: if self.isCA != None: # Can't double set properties - raise Exception("Cannot set basicConstraints CA:?? more than once. Was %s, trying to set as %s" % (self.isCA, val)) + raise Exception("Cannot set basicConstraints CA:?? more than once. " + "Was {}, trying to set as {}%s".format(self.isCA, val)) self.isCA = val if val: @@ -606,9 +608,9 @@ class Certificate: # FIXME: What if they are trying to set with a different value? # Is this ever OK? Or should we raise an exception? # elif oldExtVal: -# raise "Cannot add extension %s which had val %s with new val %s" % (name, oldExtVal, value) +# raise "Cannot add extension {} which had val {} with new val {}".format(name, oldExtVal, value) - ext = crypto.X509Extension (name, critical, value) + ext = crypto.X509Extension(name, critical, value) self.x509.add_extensions([ext]) ## @@ -754,7 +756,8 @@ class Certificate: # verify expiration time if self.x509.has_expired(): if debug_verify_chain: - logger.debug("verify_chain: NO, Certificate %s has expired" % self.pretty_cert()) + logger.debug("verify_chain: NO, Certificate {} has expired" + .format(self.pretty_cert())) raise CertExpired(self.pretty_cert(), "client cert") # if this cert is signed by a trusted_cert, then we are set @@ -763,35 +766,38 @@ class Certificate: # verify expiration of trusted_cert ? if not trusted_cert.x509.has_expired(): if debug_verify_chain: - logger.debug("verify_chain: YES. Cert %s signed by trusted cert %s"%( - self.pretty_cert(), trusted_cert.pretty_cert())) + logger.debug("verify_chain: YES. Cert {} signed by trusted cert {}" + .format(self.pretty_cert(), trusted_cert.pretty_cert())) return trusted_cert else: if debug_verify_chain: - logger.debug("verify_chain: NO. Cert %s is signed by trusted_cert %s, but that signer is expired..."%( - self.pretty_cert(),trusted_cert.pretty_cert())) - raise CertExpired(self.pretty_cert()," signer trusted_cert %s"%trusted_cert.pretty_cert()) + logger.debug("verify_chain: NO. Cert {} is signed by trusted_cert {}, " + "but that signer is expired..." + .format(self.pretty_cert(),trusted_cert.pretty_cert())) + raise CertExpired("{} signer trusted_cert {}" + .format(self.pretty_cert(), trusted_cert.pretty_cert())) # if there is no parent, then no way to verify the chain if not self.parent: if debug_verify_chain: - logger.debug("verify_chain: NO. %s has no parent and issuer %s is not in %d trusted roots"%\ - (self.pretty_cert(), self.get_issuer(), len(trusted_certs))) - raise CertMissingParent(self.pretty_cert() + \ - ": Issuer %s is not one of the %d trusted roots, and cert has no parent." %\ - (self.get_issuer(), len(trusted_certs))) + logger.debug("verify_chain: NO. {} has no parent " + "and issuer {} is not in {} trusted roots" + .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs))) + raise CertMissingParent("{}: Issuer {} is not one of the {} trusted roots, " + "and cert has no parent." + .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs))) # if it wasn't signed by the parent... if not self.is_signed_by_cert(self.parent): if debug_verify_chain: - logger.debug("verify_chain: NO. %s is not signed by parent %s, but by %s"%\ - (self.pretty_cert(), - self.parent.pretty_cert(), - self.get_issuer())) - raise CertNotSignedByParent("%s: Parent %s, issuer %s"\ - % (self.pretty_cert(), - self.parent.pretty_cert(), - self.get_issuer())) + logger.debug("verify_chain: NO. {} is not signed by parent {}, but by {}" + .format(self.pretty_cert(), + self.parent.pretty_cert(), + self.get_issuer())) + raise CertNotSignedByParent("{}: Parent {}, issuer {}" + .format(self.pretty_cert(), + self.parent.pretty_cert(), + self.get_issuer())) # Confirm that the parent is a CA. Only CAs can be trusted as # signers. @@ -800,15 +806,15 @@ class Certificate: # Ugly - cert objects aren't parsed so we need to read the # extension and hope there are no other basicConstraints if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'): - logger.warn("verify_chain: cert %s's parent %s is not a CA" % \ - (self.pretty_cert(), self.parent.pretty_cert())) - raise CertNotSignedByParent("%s: Parent %s not a CA" % (self.pretty_cert(), - self.parent.pretty_cert())) + logger.warn("verify_chain: cert {}'s parent {} is not a CA" + .format(self.pretty_cert(), self.parent.pretty_cert())) + raise CertNotSignedByParent("{}: Parent {} not a CA" + .format(self.pretty_cert(), self.parent.pretty_cert())) # if the parent isn't verified... if debug_verify_chain: - logger.debug("verify_chain: .. %s, -> verifying parent %s"%\ - (self.pretty_cert(),self.parent.pretty_cert())) + logger.debug("verify_chain: .. {}, -> verifying parent {}" + .format(self.pretty_cert(),self.parent.pretty_cert())) self.parent.verify_chain(trusted_certs) return @@ -819,16 +825,16 @@ class Certificate: triples = [] m2x509 = X509.load_cert_string(self.save_to_string()) nb_extensions = m2x509.get_ext_count() - logger.debug("X509 had %d extensions"%nb_extensions) + logger.debug("X509 had {} extensions".format(nb_extensions)) for i in range(nb_extensions): - ext=m2x509.get_ext_at(i) + ext = m2x509.get_ext_at(i) triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) ) return triples def get_data_names(self): return self.data.keys() - def get_all_datas (self): + def get_all_datas(self): triples = self.get_extensions() for name in self.get_data_names(): triples.append( (name,self.get_data(name),'data',) ) @@ -838,21 +844,22 @@ class Certificate: def get_filename(self): return getattr(self,'filename',None) - def dump (self, *args, **kwargs): + def dump(self, *args, **kwargs): print(self.dump_string(*args, **kwargs)) - def dump_string (self,show_extensions=False): + def dump_string(self, show_extensions=False): result = "" - result += "CERTIFICATE for %s\n"%self.pretty_cert() - result += "Issued by %s\n"%self.get_issuer() - filename=self.get_filename() - if filename: result += "Filename %s\n"%filename + result += "CERTIFICATE for {}\n".format(self.pretty_cert()) + result += "Issued by {}\n".format(self.get_issuer()) + filename = self.get_filename() + if filename: + result += "Filename {}\n".format(filename) if show_extensions: all_datas = self.get_all_datas() - result += " has %d extensions/data attached"%len(all_datas) - for (n, v, c) in all_datas: - if c=='data': - result += " data: %s=%s\n"%(n,v) + result += " has {} extensions/data attached".format(len(all_datas)) + for n, v, c in all_datas: + if c == 'data': + result += " data: {}={}\n".format(n, v) else: - result += " ext: %s (crit=%s)=<<<%s>>>\n"%(n,c,v) + result += " ext: {} (crit={})=<<<{}>>>\n".format(n, c, v) return result diff --git a/sfa/trust/credential.py b/sfa/trust/credential.py index 6f5336d6..ee3f7325 100644 --- a/sfa/trust/credential.py +++ b/sfa/trust/credential.py @@ -62,13 +62,13 @@ DEFAULT_CREDENTIAL_LIFETIME = 86400 * 31 # . Need to add support for other types of credentials, e.g. tickets # . add namespaces to signed-credential element? -signature_template = \ +signature_format = \ ''' - + - + @@ -88,35 +88,6 @@ signature_template = \ ''' -# PG formats the template (whitespace) slightly differently. -# Note that they don't include the xmlns in the template, but add it later. -# Otherwise the two are equivalent. -#signature_template_as_in_pg = \ -#''' -# -# -# -# -# -# -# -# -# -# -# -# -# -# -# -# -# -# -# -# -# -# -#''' - ## # Convert a string into a bool # used to convert an xsd:boolean to a Python boolean @@ -193,7 +164,7 @@ class Signature(object): try: doc = parseString(self.xml) except ExpatError as e: - logger.log_exc ("Failed to parse credential, %s"%self.xml) + logger.log_exc("Failed to parse credential, {}".format(self.xml)) raise sig = doc.getElementsByTagName("Signature")[0] ## This code until the end of function rewritten by Aaron Helsinger @@ -214,7 +185,7 @@ class Signature(object): if len(cert.childNodes) > 0: szgid = cert.childNodes[0].nodeValue szgid = szgid.strip() - szgid = "-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----" % szgid + szgid = "-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----".format(szgid) if gids is None: gids = szgid else: @@ -224,7 +195,7 @@ class Signature(object): self.set_issuer_gid(GID(string=gids)) def encode(self): - self.xml = signature_template % (self.get_refid(), self.get_refid()) + self.xml = signature_format.format(refid=self.get_refid()) ## # A credential provides a caller gid with privileges to an object gid. @@ -303,8 +274,8 @@ class Credential(object): str = infile.read() # if this is a legacy credential, write error and bail out - if isinstance (str, StringType) and str.strip().startswith("-----"): - logger.error("Legacy credentials not supported any more - giving up with %s..."%str[:10]) + if isinstance(str, StringType) and str.strip().startswith("-----"): + logger.error("Legacy credentials not supported any more - giving up with {}...".format(str[:10])) return else: self.xml = str @@ -415,11 +386,11 @@ class Credential(object): # Expiration: an absolute UTC time of expiration (as either an int or string or datetime) # def set_expiration(self, expiration): - expiration_datetime = utcparse (expiration) + expiration_datetime = utcparse(expiration) if expiration_datetime is not None: self.expiration = expiration_datetime else: - logger.error ("unexpected input %s in Credential.set_expiration"%expiration) + logger.error("unexpected input {} in Credential.set_expiration".format(expiration)) ## # get the lifetime of the credential (always in datetime format) @@ -506,7 +477,7 @@ class Credential(object): append_sub(doc, cred, "target_urn", self.gidObject.get_urn()) append_sub(doc, cred, "uuid", "") if not self.expiration: - logger.debug("Creating credential valid for %s s"%DEFAULT_CREDENTIAL_LIFETIME) + logger.debug("Creating credential valid for {} s".format(DEFAULT_CREDENTIAL_LIFETIME)) self.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=DEFAULT_CREDENTIAL_LIFETIME)) self.expiration = self.expiration.replace(microsecond=0) if self.expiration.tzinfo is not None and self.expiration.tzinfo.utcoffset(self.expiration) is not None: @@ -567,10 +538,12 @@ class Credential(object): # Below throws InUse exception if we forgot to clone the attribute first oldAttr = signed_cred.setAttributeNode(attr.cloneNode(True)) if oldAttr and oldAttr.value != attr.value: - msg = "Delegating cred from owner %s to %s over %s:\n - Replaced attribute %s value '%s' with '%s'" % \ - (self.parent.gidCaller.get_urn(), self.gidCaller.get_urn(), self.gidObject.get_urn(), oldAttr.name, oldAttr.value, attr.value) + msg = "Delegating cred from owner {} to {} over {}:\n" + "- Replaced attribute {} value '{}' with '{}'"\ + .format(self.parent.gidCaller.get_urn(), self.gidCaller.get_urn(), + self.gidObject.get_urn(), oldAttr.name, oldAttr.value, attr.value) logger.warn(msg) - #raise CredentialNotVerifiable("Can't encode new valid delegated credential: %s" % msg) + #raise CredentialNotVerifiable("Can't encode new valid delegated credential: {}".format(msg)) p_cred = doc.importNode(sdoc.getElementsByTagName("credential")[0], True) p = doc.createElement("parent") @@ -647,7 +620,7 @@ class Credential(object): rid = self.get_refid() while rid in refs: val = int(rid[3:]) - rid = "ref%d" % (val + 1) + rid = "ref{}".format(val + 1) # Set the new refid self.set_refid(rid) @@ -700,14 +673,13 @@ class Credential(object): # Call out to xmlsec1 to sign it - ref = 'Sig_%s' % self.get_refid() + ref = 'Sig_{}'.format(self.get_refid()) filename = self.save_to_random_tmp_file() xmlsec1 = self.get_xmlsec1_path() if not xmlsec1: raise Exception("Could not locate required 'xmlsec1' program") - command = '%s --sign --node-id "%s" --privkey-pem %s,%s %s' \ - % (xmlsec1, ref, self.issuer_privkey, ",".join(gid_files), filename) -# print 'command',command + command = '{} --sign --node-id "{}" --privkey-pem {},{} {}' \ + .format(xmlsec1, ref, self.issuer_privkey, ",".join(gid_files), filename) signed = os.popen(command).read() os.remove(filename) @@ -844,7 +816,8 @@ class Credential(object): xmlschema = etree.XMLSchema(schema_doc) if not xmlschema.validate(tree): error = xmlschema.error_log.last_error - message = "%s: %s (line %s)" % (self.pretty_cred(), error.message, error.line) + message = "{}: {} (line {})".format(self.pretty_cred(), + error.message, error.line) raise CredentialNotVerifiable(message) if trusted_certs_required and trusted_certs is None: @@ -863,14 +836,14 @@ class Credential(object): trusted_cert_objects.append(GID(filename=f)) ok_trusted_certs.append(f) except Exception as exc: - logger.error("Failed to load trusted cert from %s: %r"%( f, exc)) + logger.error("Failed to load trusted cert from {}: {}".format(f, exc)) trusted_certs = ok_trusted_certs # make sure it is not expired if self.get_expiration() < datetime.datetime.utcnow(): - raise CredentialNotVerifiable("Credential %s expired at %s" % \ - (self.pretty_cred(), - self.expiration.strftime(SFATIME_FORMAT))) + raise CredentialNotVerifiable("Credential {} expired at {}" \ + .format(self.pretty_cred(), + self.expiration.strftime(SFATIME_FORMAT))) # Verify the signatures filename = self.save_to_random_tmp_file() @@ -884,11 +857,11 @@ class Credential(object): cur_cred.get_gid_caller().verify_chain(trusted_cert_objects) refs = [] - refs.append("Sig_%s" % self.get_refid()) + refs.append("Sig_{}".format(self.get_refid())) parentRefs = self.updateRefID() for ref in parentRefs: - refs.append("Sig_%s" % ref) + refs.append("Sig_{}".format(ref)) for ref in refs: # If caller explicitly passed in None that means skip xmlsec1 validation. @@ -900,7 +873,7 @@ class Credential(object): # up to fedora20 we used os.popen and checked that the output begins with OK # turns out, with fedora21, there is extra input before this 'OK' thing # looks like we're better off just using the exit code - that's what it is made for - #cert_args = " ".join(['--trusted-pem %s' % x for x in trusted_certs]) + #cert_args = " ".join(['--trusted-pem {}'.format(x) for x in trusted_certs]) #command = '{} --verify --node-id "{}" {} {} 2>&1'.\ # format(self.xmlsec_path, ref, cert_args, filename) xmlsec1 = self.get_xmlsec1_path() @@ -926,8 +899,8 @@ class Credential(object): mend = verified.find('\\', mstart) msg = verified[mstart:mend] logger.warning("Credential.verify - failed - xmlsec1 returned {}".format(verified.strip())) - raise CredentialNotVerifiable("xmlsec1 error verifying cred %s using Signature ID %s: %s" % \ - (self.pretty_cred(), ref, msg)) + raise CredentialNotVerifiable("xmlsec1 error verifying cred {} using Signature ID {}: {}"\ + .format(self.pretty_cred(), ref, msg)) os.remove(filename) # Verify the parents (delegation) @@ -964,7 +937,9 @@ class Credential(object): root_target_gid = root_cred.get_gid_object() if root_cred.get_signature() is None: # malformed - raise CredentialNotVerifiable("Could not verify credential owned by %s for object %s. Cred has no signature" % (self.gidCaller.get_urn(), self.gidObject.get_urn())) + raise CredentialNotVerifiable("Could not verify credential owned by {} for object {}. " + "Cred has no signature" \ + .format(self.gidCaller.get_urn(), self.gidObject.get_urn())) root_cred_signer = root_cred.get_signature().get_issuer_gid() @@ -1108,7 +1083,7 @@ class Credential(object): #user_key = Keypair(filename=keyfile) #user_hrn = self.get_gid_caller().get_hrn() - subject_string = "%s delegated to %s" % (object_hrn, delegee_hrn) + subject_string = "{} delegated to {}".format(object_hrn, delegee_hrn) dcred = Credential(subject=subject_string) dcred.set_gid_caller(delegee_gid) dcred.set_gid_object(object_gid) @@ -1127,7 +1102,7 @@ class Credential(object): def get_filename(self): return getattr(self,'filename',None) - def actual_caller_hrn (self): + def actual_caller_hrn(self): """a helper method used by some API calls like e.g. Allocate to try and find out who really is the original caller @@ -1149,26 +1124,26 @@ class Credential(object): # else this looks like a delegated credential, and the real caller is the issuer else: actual_caller_hrn=issuer_hrn - logger.info("actual_caller_hrn: caller_hrn=%s, issuer_hrn=%s, returning %s" - %(caller_hrn,issuer_hrn,actual_caller_hrn)) + logger.info("actual_caller_hrn: caller_hrn={}, issuer_hrn={}, returning {}" + .format(caller_hrn,issuer_hrn,actual_caller_hrn)) return actual_caller_hrn ## # Dump the contents of a credential to stdout in human-readable format # # @param dump_parents If true, also dump the parent certificates - def dump (self, *args, **kwargs): + def dump(self, *args, **kwargs): print(self.dump_string(*args, **kwargs)) # SFA code ignores show_xml and disables printing the cred xml def dump_string(self, dump_parents=False, show_xml=False): result="" - result += "CREDENTIAL %s\n" % self.pretty_subject() + result += "CREDENTIAL {}\n".format(self.pretty_subject()) filename=self.get_filename() - if filename: result += "Filename %s\n"%filename + if filename: result += "Filename {}\n".format(filename) privileges = self.get_privileges() if privileges: - result += " privs: %s\n" % privileges.save_to_string() + result += " privs: {}\n".format(privileges.save_to_string()) else: result += " privs: \n" gidCaller = self.get_gid_caller() diff --git a/sfa/trust/speaksfor_util.py b/sfa/trust/speaksfor_util.py index 83178225..640d5125 100644 --- a/sfa/trust/speaksfor_util.py +++ b/sfa/trust/speaksfor_util.py @@ -35,7 +35,7 @@ from xml.dom.minidom import * from sfa.util.sfatime import SFATIME_FORMAT from sfa.trust.certificate import Certificate -from sfa.trust.credential import Credential, signature_template, HAVELXML +from sfa.trust.credential import Credential, signature_format, HAVELXML from sfa.trust.abac_credential import ABACCredential, ABACElement from sfa.trust.credential_factory import CredentialFactory from sfa.trust.gid import GID @@ -56,7 +56,8 @@ from sfa.util.py23 import StringIO # Find the text associated with first child text node def findTextChildValue(root): child = findChildNamed(root, '#text') - if child: return str(child.nodeValue) + if child: + return str(child.nodeValue) return None # Find first child with given name @@ -85,7 +86,7 @@ def run_subprocess(cmd, stdout, stderr): output = proc.returncode return output except Exception as e: - raise Exception("Failed call to subprocess '%s': %s" % (" ".join(cmd), e)) + raise Exception("Failed call to subprocess '{}': {}".format(" ".join(cmd), e)) def get_cert_keyid(gid): """Extract the subject key identifier from the given certificate. @@ -134,26 +135,28 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, # Credential has not expired if cred.expiration and cred.expiration < datetime.datetime.utcnow(): - return False, None, "ABAC Credential expired at %s (%s)" % (cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred()) + return False, None, "ABAC Credential expired at {} ({})"\ + .format(cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred()) # Must be ABAC if cred.get_cred_type() != ABACCredential.ABAC_CREDENTIAL_TYPE: - return False, None, "Credential not of type ABAC but %s" % cred.get_cred_type + return False, None, "Credential not of type ABAC but {}".format(cred.get_cred_type) if cred.signature is None or cred.signature.gid is None: - return False, None, "Credential malformed: missing signature or signer cert. Cred: %s" % cred.pretty_cred() + return False, None, "Credential malformed: missing signature or signer cert. Cred: {}"\ + .format(cred.pretty_cred()) user_gid = cred.signature.gid user_urn = user_gid.get_urn() # URN of signer from cert must match URN of 'speaking-for' argument if user_urn != speaking_for_urn: - return False, None, "User URN from cred doesn't match speaking_for URN: %s != %s (cred %s)" % \ - (user_urn, speaking_for_urn, cred.pretty_cred()) + return False, None, "User URN from cred doesn't match speaking_for URN: {} != {} (cred {})"\ + .format(user_urn, speaking_for_urn, cred.pretty_cred()) tails = cred.get_tails() if len(tails) != 1: - return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got %d (%s)" % \ - (len(tails), cred.pretty_cred()) + return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got {} ({})"\ + .format(len(tails), cred.pretty_cred()) user_keyid = get_cert_keyid(user_gid) tool_keyid = get_cert_keyid(tool_gid) @@ -188,13 +191,14 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, msg = verified[mstart:mend] if msg == "": msg = output - return False, None, "ABAC credential failed to xmlsec1 verify: %s" % msg + return False, None, "ABAC credential failed to xmlsec1 verify: {}".format(msg) # Must say U.speaks_for(U)<-T if user_keyid != principal_keyid or \ tool_keyid != subject_keyid or \ - role != ('speaks_for_%s' % user_keyid): - return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T (%s)" % cred.pretty_cred() + role != ('speaks_for_{}'.format(user_keyid)): + return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T ({})"\ + .format(cred.pretty_cred()) # If schema provided, validate against schema if HAVELXML and schema and os.path.exists(schema): @@ -204,8 +208,8 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, xmlschema = etree.XMLSchema(schema_doc) if not xmlschema.validate(tree): error = xmlschema.error_log.last_error - message = "%s: %s (line %s)" % (cred.pretty_cred(), error.message, error.line) - return False, None, ("XML Credential schema invalid: %s" % message) + message = "{}: {} (line {})".format(cred.pretty_cred(), error.message, error.line) + return False, None, ("XML Credential schema invalid: {}".format(message)) if trusted_roots: # User certificate must validate against trusted roots @@ -213,14 +217,13 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, user_gid.verify_chain(trusted_roots) except Exception as e: return False, None, \ - "Cred signer (user) cert not trusted: %s" % e + "Cred signer (user) cert not trusted: {}".format(e) # Tool certificate must validate against trusted roots try: tool_gid.verify_chain(trusted_roots) except Exception as e: - return False, None, \ - "Tool cert not trusted: %s" % e + return False, None, "Tool cert not trusted: {}".format(e) return True, user_gid, "" @@ -258,9 +261,9 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus if not isinstance(cred_value, ABACCredential): cred = CredentialFactory.createCred(cred_value) -# print("Got a cred to check speaksfor for: %s" % cred.pretty_cred()) +# print("Got a cred to check speaksfor for: {}".format(cred.pretty_cred())) # #cred.dump(True, True) -# print("Caller: %s" % caller_gid.dump_string(2, True)) +# print("Caller: {}".format(caller_gid.dump_string(2, True))) # See if this is a valid speaks_for is_valid_speaks_for, user_gid, msg = \ verify_speaks_for(cred, @@ -270,7 +273,8 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus if is_valid_speaks_for: return user_gid # speaks-for else: - logger.info("Got speaks-for option but not a valid speaks_for with this credential: %s" % msg) + logger.info("Got speaks-for option but not a valid speaks_for with this credential: {}" + .format(msg)) return caller_gid # Not speaks-for # Create an ABAC Speaks For credential using the ABACCredential object and it's encode&sign methods @@ -294,7 +298,7 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam user_urn = user_gid.get_urn() user_keyid = get_cert_keyid(user_gid) tool_keyid = get_cert_keyid(tool_gid) - cred.head = ABACElement(user_keyid, user_urn, "speaks_for_%s" % user_keyid) + cred.head = ABACElement(user_keyid, user_urn, "speaks_for_{}".format(user_keyid)) cred.tails.append(ABACElement(tool_keyid, tool_urn)) cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(days=dur_days)) cred.expiration = cred.expiration.replace(microsecond=0) @@ -306,8 +310,8 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam cred.sign() # Save it cred.save_to_file(cred_filename) - logger.info("Created ABAC credential: '%s' in file %s" % - (cred.pretty_cred(), cred_filename)) + logger.info("Created ABAC credential: '{}' in file {}" + .format(cred.pretty_cred(), cred_filename)) # FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted def create_speaks_for(tool_gid, user_gid, ma_gid, @@ -315,41 +319,37 @@ def create_speaks_for(tool_gid, user_gid, ma_gid, tool_urn = tool_gid.get_urn() user_urn = user_gid.get_urn() - header = '' - reference = "ref0" - signature_block = \ - '\n' + \ - signature_template + \ - '' - template = header + '\n' + \ - '\n' + \ - 'abac\n' + \ - '\n' +\ - '\n' + \ - '\n' + \ - '\n' + \ - '\n' + \ - '\n' + \ - '%s' +\ - '\n' + \ - '\n' + \ - '%s\n' + \ - '\n' + \ - '%s%s\n' +\ - 'speaks_for_%s\n' + \ - '\n' + \ - '\n' +\ - '%s%s\n' +\ - '\n' +\ - '\n' + \ - '\n' + \ - '\n' + \ - signature_block + \ - '\n' - + refid = "ref0" + + credential_format = """\ + + + + abac + + + + + + + {expiration_str} + + + {version} + + {user_keyid}{user_urn} + speaks_for_{user_keyid} + + + {tool_keyid}/keyid>{tool_urn} + + + + + """ + signature_format + """\ + +\ +""" credential_duration = datetime.timedelta(days=dur_days) expiration = datetime.datetime.utcnow() + credential_duration @@ -358,16 +358,15 @@ def create_speaks_for(tool_gid, user_gid, ma_gid, user_keyid = get_cert_keyid(user_gid) tool_keyid = get_cert_keyid(tool_gid) - unsigned_cred = template % (reference, expiration_str, version, - user_keyid, user_urn, user_keyid, tool_keyid, tool_urn, - reference, reference) + # apply the format - itself uses signature_format which uses 'refid' + unsigned_cred = credential_format.format(**locals()) unsigned_cred_filename = write_to_tempfile(unsigned_cred) # Now sign the file with xmlsec1 # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem # --output signed.xml tosign.xml - pems = "%s,%s,%s" % (user_key_file, user_gid.get_filename(), - ma_gid.get_filename()) + pems = "{},{},{}".format(user_key_file, user_gid.get_filename(), + ma_gid.get_filename()) xmlsec1 = Credential.get_xmlsec1_path() if not xmlsec1: raise Exception("Could not locate required 'xmlsec1' program") @@ -379,8 +378,8 @@ def create_speaks_for(tool_gid, user_gid, ma_gid, if sign_proc_output == None: logger.info("xmlsec1 returns empty output") else: - logger.info("Created ABAC credential: '%s speaks_for %s' in file %s" % - (tool_urn, user_urn, cred_filename)) + logger.info("Created ABAC credential: '{} speaks_for {}' in file {}" + .format(tool_urn, user_urn, cred_filename)) os.unlink(unsigned_cred_filename) @@ -451,5 +450,5 @@ if __name__ == "__main__": trusted_roots) - print('SPEAKS_FOR = %s' % (gid != tool_gid)) - print("CERT URN = %s" % gid.get_urn()) + print('SPEAKS_FOR = {}'.format(gid != tool_gid)) + print("CERT URN = {}".format(gid.get_urn()))