prettified certificate, credential and speaksfor_util
authorThierry Parmentelat <thierry.parmentelat@inria.fr>
Fri, 1 Apr 2016 10:56:21 +0000 (12:56 +0200)
committerThierry Parmentelat <thierry.parmentelat@inria.fr>
Fri, 1 Apr 2016 12:46:42 +0000 (14:46 +0200)
prettified speaks_for

sfa/trust/certificate.py
sfa/trust/credential.py
sfa/trust/speaksfor_util.py

index 8e9cf7d..758198e 100644 (file)
@@ -90,7 +90,7 @@ def test_passphrase(string, passphrase):
 def convert_public_key(key):
     keyconvert_path = "/usr/bin/keyconvert.py"
     if not os.path.isfile(keyconvert_path):
-        raise IOError("Could not find keyconvert in %s" % keyconvert_path)
+        raise IOError("Could not find keyconvert in {}".format(keyconvert_path))
 
     # we can only convert rsa keys
     if "ssh-dss" in key:
@@ -137,8 +137,8 @@ class Keypair:
     # Creates a Keypair object
     # @param create If create==True, creates a new public/private key and
     #     stores it in the object
-    # @param string If string!=None, load the keypair from the string (PEM)
-    # @param filename If filename!=None, load the keypair from the file
+    # @param string If string != None, load the keypair from the string (PEM)
+    # @param filename If filename != None, load the keypair from the file
 
     def __init__(self, create=False, string=None, filename=None):
         if create:
@@ -161,13 +161,13 @@ class Keypair:
 
     def save_to_file(self, filename):
         open(filename, 'w').write(self.as_pem())
-        self.filename=filename
+        self.filename = filename
 
     ##
     # Load the private key from a file. Implicity the private key includes the public key.
 
     def load_from_file(self, filename):
-        self.filename=filename
+        self.filename = filename
         buffer = open(filename, 'r').read()
         self.load_from_string(buffer)
 
@@ -217,7 +217,7 @@ class Keypair:
 
         # get the pyopenssl pkey from the pyopenssl x509
         self.key = pyx509.get_pubkey()
-        self.filename=filename
+        self.filename = filename
 
     ##
     # Load the public key from a string. No private key is loaded.
@@ -281,14 +281,14 @@ class Keypair:
     def get_filename(self):
         return getattr(self,'filename',None)
 
-    def dump (self, *args, **kwargs):
+    def dump(self, *args, **kwargs):
         print(self.dump_string(*args, **kwargs))
 
-    def dump_string (self):
-        result=""
-        result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string()
-        filename=self.get_filename()
-        if filename: result += "Filename %s\n"%filename
+    def dump_string(self):
+        result =  ""
+        result += "KEYPAIR: pubkey={:>40}...".format(self.get_pubkey_string())
+        filename = self.get_filename()
+        if filename: result += "Filename {}\n".format(filename)
         return result
 
 ##
@@ -312,7 +312,7 @@ class Certificate:
 #    parent = None
     isCA = None # will be a boolean once set
 
-    separator="-----parent-----"
+    separator = "-----parent-----"
 
     ##
     # Create a certificate object.
@@ -379,7 +379,8 @@ class Certificate:
         
         # If it's not in proper PEM format, wrap it
         if string.count('-----BEGIN CERTIFICATE') == 0:
-            string = '-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----' % string
+            string = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'\
+                     .format(string)
 
         # If there is a PEM cert in there, but there is some other text first
         # such as the text of the certificate, skip the text
@@ -400,7 +401,7 @@ class Certificate:
         self.x509 = crypto.load_certificate(crypto.FILETYPE_PEM, parts[0])
 
         if self.x509 is None:
-            logger.warn("Loaded from string but cert is None: %s" % string)
+            logger.warn("Loaded from string but cert is None: {}".format(string))
 
         # if there are more certs, then create a parent and let the parent load
         # itself from the remainder of the string
@@ -415,7 +416,7 @@ class Certificate:
         file = open(filename)
         string = file.read()
         self.load_from_string(string)
-        self.filename=filename
+        self.filename = filename
 
     ##
     # Save the certificate to a string.
@@ -443,7 +444,7 @@ class Certificate:
             f = open(filename, 'w')
         f.write(string)
         f.close()
-        self.filename=filename
+        self.filename = filename
 
     ##
     # Save the certificate to a random file in /tmp/
@@ -468,7 +469,7 @@ class Certificate:
             if isinstance(subject, dict) or isinstance(subject, str):
                 req = crypto.X509Req()
                 reqSubject = req.get_subject()
-                if (isinstance(subject, dict)):
+                if isinstance(subject, dict):
                     for key in reqSubject.keys():
                         setattr(reqSubject, key, subject[key])
                 else:
@@ -495,7 +496,7 @@ class Certificate:
     def set_subject(self, name):
         req = crypto.X509Req()
         subj = req.get_subject()
-        if (isinstance(name, dict)):
+        if isinstance(name, dict):
             for key in name.keys():
                 setattr(subj, key, name[key])
         else:
@@ -532,7 +533,7 @@ class Certificate:
             counter = 0
             filtered = [self.filter_chunk(chunk) for chunk in data.split()]
             message += " ".join( [f for f in filtered if f])
-            omitted = len ([f for f in filtered if not f])
+            omitted = len([f for f in filtered if not f])
             if omitted:
                 message += "..+{} omitted".format(omitted)
         message += "]"
@@ -570,7 +571,8 @@ class Certificate:
 
         if self.isCA != None:
             # Can't double set properties
-            raise Exception("Cannot set basicConstraints CA:?? more than once. Was %s, trying to set as %s" % (self.isCA, val))
+            raise Exception("Cannot set basicConstraints CA:?? more than once. "
+                            "Was {}, trying to set as {}%s".format(self.isCA, val))
 
         self.isCA = val
         if val:
@@ -606,9 +608,9 @@ class Certificate:
         # FIXME: What if they are trying to set with a different value?
         # Is this ever OK? Or should we raise an exception?
 #        elif oldExtVal:
-#            raise "Cannot add extension %s which had val %s with new val %s" % (name, oldExtVal, value)
+#            raise "Cannot add extension {} which had val {} with new val {}".format(name, oldExtVal, value)
 
-        ext = crypto.X509Extension (name, critical, value)
+        ext = crypto.X509Extension(name, critical, value)
         self.x509.add_extensions([ext])
 
     ##
@@ -754,7 +756,8 @@ class Certificate:
         # verify expiration time
         if self.x509.has_expired():
             if debug_verify_chain:
-                logger.debug("verify_chain: NO, Certificate %s has expired" % self.pretty_cert())
+                logger.debug("verify_chain: NO, Certificate {} has expired"
+                             .format(self.pretty_cert()))
             raise CertExpired(self.pretty_cert(), "client cert")
 
         # if this cert is signed by a trusted_cert, then we are set
@@ -763,35 +766,38 @@ class Certificate:
                 # verify expiration of trusted_cert ?
                 if not trusted_cert.x509.has_expired():
                     if debug_verify_chain:
-                        logger.debug("verify_chain: YES. Cert %s signed by trusted cert %s"%(
-                            self.pretty_cert(), trusted_cert.pretty_cert()))
+                        logger.debug("verify_chain: YES. Cert {} signed by trusted cert {}"
+                                     .format(self.pretty_cert(), trusted_cert.pretty_cert()))
                     return trusted_cert
                 else:
                     if debug_verify_chain:
-                        logger.debug("verify_chain: NO. Cert %s is signed by trusted_cert %s, but that signer is expired..."%(
-                            self.pretty_cert(),trusted_cert.pretty_cert()))
-                    raise CertExpired(self.pretty_cert()," signer trusted_cert %s"%trusted_cert.pretty_cert())
+                        logger.debug("verify_chain: NO. Cert {} is signed by trusted_cert {}, "
+                                     "but that signer is expired..."
+                                     .format(self.pretty_cert(),trusted_cert.pretty_cert()))
+                    raise CertExpired("{} signer trusted_cert {}"
+                                      .format(self.pretty_cert(), trusted_cert.pretty_cert()))
 
         # if there is no parent, then no way to verify the chain
         if not self.parent:
             if debug_verify_chain:
-                logger.debug("verify_chain: NO. %s has no parent and issuer %s is not in %d trusted roots"%\
-                             (self.pretty_cert(), self.get_issuer(), len(trusted_certs)))
-            raise CertMissingParent(self.pretty_cert() + \
-                                    ": Issuer %s is not one of the %d trusted roots, and cert has no parent." %\
-                                    (self.get_issuer(), len(trusted_certs)))
+                logger.debug("verify_chain: NO. {} has no parent "
+                             "and issuer {} is not in {} trusted roots"
+                             .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs)))
+            raise CertMissingParent("{}: Issuer {} is not one of the {} trusted roots, "
+                                    "and cert has no parent."
+                                    .format(self.pretty_cert(), self.get_issuer(), len(trusted_certs)))
 
         # if it wasn't signed by the parent...
         if not self.is_signed_by_cert(self.parent):
             if debug_verify_chain:
-                logger.debug("verify_chain: NO. %s is not signed by parent %s, but by %s"%\
-                             (self.pretty_cert(),
-                              self.parent.pretty_cert(),
-                              self.get_issuer()))
-            raise CertNotSignedByParent("%s: Parent %s, issuer %s"\
-                                            % (self.pretty_cert(),
-                                               self.parent.pretty_cert(),
-                                               self.get_issuer()))
+                logger.debug("verify_chain: NO. {} is not signed by parent {}, but by {}"
+                             .format(self.pretty_cert(),
+                                     self.parent.pretty_cert(),
+                                     self.get_issuer()))
+            raise CertNotSignedByParent("{}: Parent {}, issuer {}"
+                                        .format(self.pretty_cert(),
+                                                self.parent.pretty_cert(),
+                                                self.get_issuer()))
 
         # Confirm that the parent is a CA. Only CAs can be trusted as
         # signers.
@@ -800,15 +806,15 @@ class Certificate:
         # Ugly - cert objects aren't parsed so we need to read the
         # extension and hope there are no other basicConstraints
         if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
-            logger.warn("verify_chain: cert %s's parent %s is not a CA" % \
-                            (self.pretty_cert(), self.parent.pretty_cert()))
-            raise CertNotSignedByParent("%s: Parent %s not a CA" % (self.pretty_cert(),
-                                                                    self.parent.pretty_cert()))
+            logger.warn("verify_chain: cert {}'s parent {} is not a CA"
+                        .format(self.pretty_cert(), self.parent.pretty_cert()))
+            raise CertNotSignedByParent("{}: Parent {} not a CA"
+                                        .format(self.pretty_cert(), self.parent.pretty_cert()))
 
         # if the parent isn't verified...
         if debug_verify_chain:
-            logger.debug("verify_chain: .. %s, -> verifying parent %s"%\
-                         (self.pretty_cert(),self.parent.pretty_cert()))
+            logger.debug("verify_chain: .. {}, -> verifying parent {}"
+                         .format(self.pretty_cert(),self.parent.pretty_cert()))
         self.parent.verify_chain(trusted_certs)
 
         return
@@ -819,16 +825,16 @@ class Certificate:
         triples = []
         m2x509 = X509.load_cert_string(self.save_to_string())
         nb_extensions = m2x509.get_ext_count()
-        logger.debug("X509 had %d extensions"%nb_extensions)
+        logger.debug("X509 had {} extensions".format(nb_extensions))
         for i in range(nb_extensions):
-            ext=m2x509.get_ext_at(i)
+            ext = m2x509.get_ext_at(i)
             triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) )
         return triples
 
     def get_data_names(self):
         return self.data.keys()
 
-    def get_all_datas (self):
+    def get_all_datas(self):
         triples = self.get_extensions()
         for name in self.get_data_names():
             triples.append( (name,self.get_data(name),'data',) )
@@ -838,21 +844,22 @@ class Certificate:
     def get_filename(self):
         return getattr(self,'filename',None)
 
-    def dump (self, *args, **kwargs):
+    def dump(self, *args, **kwargs):
         print(self.dump_string(*args, **kwargs))
 
-    def dump_string (self,show_extensions=False):
+    def dump_string(self, show_extensions=False):
         result = ""
-        result += "CERTIFICATE for %s\n"%self.pretty_cert()
-        result += "Issued by %s\n"%self.get_issuer()
-        filename=self.get_filename()
-        if filename: result += "Filename %s\n"%filename
+        result += "CERTIFICATE for {}\n".format(self.pretty_cert())
+        result += "Issued by {}\n".format(self.get_issuer())
+        filename = self.get_filename()
+        if filename:
+            result += "Filename {}\n".format(filename)
         if show_extensions:
             all_datas = self.get_all_datas()
-            result += " has %d extensions/data attached"%len(all_datas)
-            for (n, v, c) in all_datas:
-                if c=='data':
-                    result += "   data: %s=%s\n"%(n,v)
+            result += " has {} extensions/data attached".format(len(all_datas))
+            for n, v, c in all_datas:
+                if c == 'data':
+                    result += "   data: {}={}\n".format(n, v)
                 else:
-                    result += "    ext: %s (crit=%s)=<<<%s>>>\n"%(n,c,v)
+                    result += "    ext: {} (crit={})=<<<{}>>>\n".format(n, c, v)
         return result
index 6f5336d..ee3f732 100644 (file)
@@ -62,13 +62,13 @@ DEFAULT_CREDENTIAL_LIFETIME = 86400 * 31
 # . Need to add support for other types of credentials, e.g. tickets
 # . add namespaces to signed-credential element?
 
-signature_template = \
+signature_format = \
 '''
-<Signature xml:id="Sig_%s" xmlns="http://www.w3.org/2000/09/xmldsig#">
+<Signature xml:id="Sig_{refid}" xmlns="http://www.w3.org/2000/09/xmldsig#">
   <SignedInfo>
     <CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>
     <SignatureMethod Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1"/>
-    <Reference URI="#%s">
+    <Reference URI="#{refid}">
       <Transforms>
         <Transform Algorithm="http://www.w3.org/2000/09/xmldsig#enveloped-signature" />
       </Transforms>
@@ -88,35 +88,6 @@ signature_template = \
 </Signature>
 '''
 
-# PG formats the template (whitespace) slightly differently.
-# Note that they don't include the xmlns in the template, but add it later.
-# Otherwise the two are equivalent.
-#signature_template_as_in_pg = \
-#'''
-#<Signature xml:id="Sig_%s" >
-# <SignedInfo>
-#  <CanonicalizationMethod      Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>
-#  <SignatureMethod      Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1"/>
-#  <Reference URI="#%s">
-#    <Transforms>
-#      <Transform         Algorithm="http://www.w3.org/2000/09/xmldsig#enveloped-signature" />
-#    </Transforms>
-#    <DigestMethod        Algorithm="http://www.w3.org/2000/09/xmldsig#sha1"/>
-#    <DigestValue></DigestValue>
-#    </Reference>
-# </SignedInfo>
-# <SignatureValue />
-# <KeyInfo>
-#  <X509Data >
-#   <X509SubjectName/>
-#   <X509IssuerSerial/>
-#   <X509Certificate/>
-#  </X509Data>
-#  <KeyValue />
-# </KeyInfo>
-#</Signature>
-#'''
-
 ##
 # Convert a string into a bool
 # used to convert an xsd:boolean to a Python boolean
@@ -193,7 +164,7 @@ class Signature(object):
         try:
             doc = parseString(self.xml)
         except ExpatError as e:
-            logger.log_exc ("Failed to parse credential, %s"%self.xml)
+            logger.log_exc("Failed to parse credential, {}".format(self.xml))
             raise
         sig = doc.getElementsByTagName("Signature")[0]
         ## This code until the end of function rewritten by Aaron Helsinger
@@ -214,7 +185,7 @@ class Signature(object):
                 if len(cert.childNodes) > 0:
                     szgid = cert.childNodes[0].nodeValue
                     szgid = szgid.strip()
-                    szgid = "-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----" % szgid
+                    szgid = "-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----".format(szgid)
                     if gids is None:
                         gids = szgid
                     else:
@@ -224,7 +195,7 @@ class Signature(object):
         self.set_issuer_gid(GID(string=gids))
         
     def encode(self):
-        self.xml = signature_template % (self.get_refid(), self.get_refid())
+        self.xml = signature_format.format(refid=self.get_refid())
 
 ##
 # A credential provides a caller gid with privileges to an object gid.
@@ -303,8 +274,8 @@ class Credential(object):
                     str = infile.read()
                 
             # if this is a legacy credential, write error and bail out
-            if isinstance (str, StringType) and str.strip().startswith("-----"):
-                logger.error("Legacy credentials not supported any more - giving up with %s..."%str[:10])
+            if isinstance(str, StringType) and str.strip().startswith("-----"):
+                logger.error("Legacy credentials not supported any more - giving up with {}...".format(str[:10]))
                 return
             else:
                 self.xml = str
@@ -415,11 +386,11 @@ class Credential(object):
     # Expiration: an absolute UTC time of expiration (as either an int or string or datetime)
     # 
     def set_expiration(self, expiration):
-        expiration_datetime = utcparse (expiration)
+        expiration_datetime = utcparse(expiration)
         if expiration_datetime is not None:
             self.expiration = expiration_datetime
         else:
-            logger.error ("unexpected input %s in Credential.set_expiration"%expiration)
+            logger.error("unexpected input {} in Credential.set_expiration".format(expiration))
 
     ##
     # get the lifetime of the credential (always in datetime format)
@@ -506,7 +477,7 @@ class Credential(object):
         append_sub(doc, cred, "target_urn", self.gidObject.get_urn())
         append_sub(doc, cred, "uuid", "")
         if not self.expiration:
-            logger.debug("Creating credential valid for %s s"%DEFAULT_CREDENTIAL_LIFETIME)
+            logger.debug("Creating credential valid for {} s".format(DEFAULT_CREDENTIAL_LIFETIME))
             self.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=DEFAULT_CREDENTIAL_LIFETIME))
         self.expiration = self.expiration.replace(microsecond=0)
         if self.expiration.tzinfo is not None and self.expiration.tzinfo.utcoffset(self.expiration) is not None:
@@ -567,10 +538,12 @@ class Credential(object):
                     # Below throws InUse exception if we forgot to clone the attribute first
                     oldAttr = signed_cred.setAttributeNode(attr.cloneNode(True))
                     if oldAttr and oldAttr.value != attr.value:
-                        msg = "Delegating cred from owner %s to %s over %s:\n - Replaced attribute %s value '%s' with '%s'" % \
-                              (self.parent.gidCaller.get_urn(), self.gidCaller.get_urn(), self.gidObject.get_urn(), oldAttr.name, oldAttr.value, attr.value)
+                        msg = "Delegating cred from owner {} to {} over {}:\n"
+                        "- Replaced attribute {} value '{}' with '{}'"\
+                            .format(self.parent.gidCaller.get_urn(), self.gidCaller.get_urn(),
+                                    self.gidObject.get_urn(), oldAttr.name, oldAttr.value, attr.value)
                         logger.warn(msg)
-                        #raise CredentialNotVerifiable("Can't encode new valid delegated credential: %s" % msg)
+                        #raise CredentialNotVerifiable("Can't encode new valid delegated credential: {}".format(msg))
 
             p_cred = doc.importNode(sdoc.getElementsByTagName("credential")[0], True)
             p = doc.createElement("parent")
@@ -647,7 +620,7 @@ class Credential(object):
         rid = self.get_refid()
         while rid in refs:
             val = int(rid[3:])
-            rid = "ref%d" % (val + 1)
+            rid = "ref{}".format(val + 1)
 
         # Set the new refid
         self.set_refid(rid)
@@ -700,14 +673,13 @@ class Credential(object):
 
 
         # Call out to xmlsec1 to sign it
-        ref = 'Sig_%s' % self.get_refid()
+        ref = 'Sig_{}'.format(self.get_refid())
         filename = self.save_to_random_tmp_file()
         xmlsec1 = self.get_xmlsec1_path()
         if not xmlsec1:
             raise Exception("Could not locate required 'xmlsec1' program")
-        command = '%s --sign --node-id "%s" --privkey-pem %s,%s %s' \
-            % (xmlsec1, ref, self.issuer_privkey, ",".join(gid_files), filename)
-#        print 'command',command
+        command = '{} --sign --node-id "{}" --privkey-pem {},{} {}' \
+                  .format(xmlsec1, ref, self.issuer_privkey, ",".join(gid_files), filename)
         signed = os.popen(command).read()
         os.remove(filename)
 
@@ -844,7 +816,8 @@ class Credential(object):
                 xmlschema = etree.XMLSchema(schema_doc)
                 if not xmlschema.validate(tree):
                     error = xmlschema.error_log.last_error
-                    message = "%s: %s (line %s)" % (self.pretty_cred(), error.message, error.line)
+                    message = "{}: {} (line {})".format(self.pretty_cred(),
+                                                        error.message, error.line)
                     raise CredentialNotVerifiable(message)
 
         if trusted_certs_required and trusted_certs is None:
@@ -863,14 +836,14 @@ class Credential(object):
                     trusted_cert_objects.append(GID(filename=f))
                     ok_trusted_certs.append(f)
                 except Exception as exc:
-                    logger.error("Failed to load trusted cert from %s: %r"%( f, exc))
+                    logger.error("Failed to load trusted cert from {}: {}".format(f, exc))
             trusted_certs = ok_trusted_certs
 
         # make sure it is not expired
         if self.get_expiration() < datetime.datetime.utcnow():
-            raise CredentialNotVerifiable("Credential %s expired at %s" % \
-                                          (self.pretty_cred(),
-                                           self.expiration.strftime(SFATIME_FORMAT)))
+            raise CredentialNotVerifiable("Credential {} expired at {}" \
+                                          .format(self.pretty_cred(),
+                                                  self.expiration.strftime(SFATIME_FORMAT)))
 
         # Verify the signatures
         filename = self.save_to_random_tmp_file()
@@ -884,11 +857,11 @@ class Credential(object):
                 cur_cred.get_gid_caller().verify_chain(trusted_cert_objects)
 
         refs = []
-        refs.append("Sig_%s" % self.get_refid())
+        refs.append("Sig_{}".format(self.get_refid()))
 
         parentRefs = self.updateRefID()
         for ref in parentRefs:
-            refs.append("Sig_%s" % ref)
+            refs.append("Sig_{}".format(ref))
 
         for ref in refs:
             # If caller explicitly passed in None that means skip xmlsec1 validation.
@@ -900,7 +873,7 @@ class Credential(object):
             # up to fedora20 we used os.popen and checked that the output begins with OK
             # turns out, with fedora21, there is extra input before this 'OK' thing
             # looks like we're better off just using the exit code - that's what it is made for
-            #cert_args = " ".join(['--trusted-pem %s' % x for x in trusted_certs])
+            #cert_args = " ".join(['--trusted-pem {}'.format(x) for x in trusted_certs])
             #command = '{} --verify --node-id "{}" {} {} 2>&1'.\
             #          format(self.xmlsec_path, ref, cert_args, filename)
             xmlsec1 = self.get_xmlsec1_path()
@@ -926,8 +899,8 @@ class Credential(object):
                     mend = verified.find('\\', mstart)
                     msg = verified[mstart:mend]
                 logger.warning("Credential.verify - failed - xmlsec1 returned {}".format(verified.strip()))
-                raise CredentialNotVerifiable("xmlsec1 error verifying cred %s using Signature ID %s: %s" % \
-                                              (self.pretty_cred(), ref, msg))
+                raise CredentialNotVerifiable("xmlsec1 error verifying cred {} using Signature ID {}: {}"\
+                                              .format(self.pretty_cred(), ref, msg))
         os.remove(filename)
 
         # Verify the parents (delegation)
@@ -964,7 +937,9 @@ class Credential(object):
         root_target_gid = root_cred.get_gid_object()
         if root_cred.get_signature() is None:
             # malformed
-            raise CredentialNotVerifiable("Could not verify credential owned by %s for object %s. Cred has no signature" % (self.gidCaller.get_urn(), self.gidObject.get_urn()))
+            raise CredentialNotVerifiable("Could not verify credential owned by {} for object {}. "
+                                          "Cred has no signature" \
+                                          .format(self.gidCaller.get_urn(), self.gidObject.get_urn()))
 
         root_cred_signer = root_cred.get_signature().get_issuer_gid()
 
@@ -1108,7 +1083,7 @@ class Credential(object):
   
         #user_key = Keypair(filename=keyfile)
         #user_hrn = self.get_gid_caller().get_hrn()
-        subject_string = "%s delegated to %s" % (object_hrn, delegee_hrn)
+        subject_string = "{} delegated to {}".format(object_hrn, delegee_hrn)
         dcred = Credential(subject=subject_string)
         dcred.set_gid_caller(delegee_gid)
         dcred.set_gid_object(object_gid)
@@ -1127,7 +1102,7 @@ class Credential(object):
     def get_filename(self):
         return getattr(self,'filename',None)
 
-    def actual_caller_hrn (self):
+    def actual_caller_hrn(self):
         """a helper method used by some API calls like e.g. Allocate
         to try and find out who really is the original caller
 
@@ -1149,26 +1124,26 @@ class Credential(object):
         # else this looks like a delegated credential, and the real caller is the issuer
         else:
             actual_caller_hrn=issuer_hrn
-        logger.info("actual_caller_hrn: caller_hrn=%s, issuer_hrn=%s, returning %s"
-                    %(caller_hrn,issuer_hrn,actual_caller_hrn))
+        logger.info("actual_caller_hrn: caller_hrn={}, issuer_hrn={}, returning {}"
+                    .format(caller_hrn,issuer_hrn,actual_caller_hrn))
         return actual_caller_hrn
 
     ##
     # Dump the contents of a credential to stdout in human-readable format
     #
     # @param dump_parents If true, also dump the parent certificates
-    def dump (self, *args, **kwargs):
+    def dump(self, *args, **kwargs):
         print(self.dump_string(*args, **kwargs))
 
     # SFA code ignores show_xml and disables printing the cred xml
     def dump_string(self, dump_parents=False, show_xml=False):
         result=""
-        result += "CREDENTIAL %s\n" % self.pretty_subject()
+        result += "CREDENTIAL {}\n".format(self.pretty_subject())
         filename=self.get_filename()
-        if filename: result += "Filename %s\n"%filename
+        if filename: result += "Filename {}\n".format(filename)
         privileges = self.get_privileges()
         if privileges:
-            result += "      privs: %s\n" % privileges.save_to_string()
+            result += "      privs: {}\n".format(privileges.save_to_string())
         else:
             result += "      privs: \n"
         gidCaller = self.get_gid_caller()
index 8317822..640d512 100644 (file)
@@ -35,7 +35,7 @@ from xml.dom.minidom import *
 from sfa.util.sfatime import SFATIME_FORMAT
 
 from sfa.trust.certificate import Certificate
-from sfa.trust.credential import Credential, signature_template, HAVELXML
+from sfa.trust.credential import Credential, signature_format, HAVELXML
 from sfa.trust.abac_credential import ABACCredential, ABACElement
 from sfa.trust.credential_factory import CredentialFactory
 from sfa.trust.gid import GID
@@ -56,7 +56,8 @@ from sfa.util.py23 import StringIO
 # Find the text associated with first child text node
 def findTextChildValue(root):
     child = findChildNamed(root, '#text')
-    if child: return str(child.nodeValue)
+    if child:
+        return str(child.nodeValue)
     return None
 
 # Find first child with given name
@@ -85,7 +86,7 @@ def run_subprocess(cmd, stdout, stderr):
             output = proc.returncode
         return output
     except Exception as e:
-        raise Exception("Failed call to subprocess '%s': %s" % (" ".join(cmd), e))
+        raise Exception("Failed call to subprocess '{}': {}".format(" ".join(cmd), e))
 
 def get_cert_keyid(gid):
     """Extract the subject key identifier from the given certificate.
@@ -134,26 +135,28 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
 
     # Credential has not expired
     if cred.expiration and cred.expiration < datetime.datetime.utcnow():
-        return False, None, "ABAC Credential expired at %s (%s)" % (cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred())
+        return False, None, "ABAC Credential expired at {} ({})"\
+            .format(cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred())
 
     # Must be ABAC
     if cred.get_cred_type() != ABACCredential.ABAC_CREDENTIAL_TYPE:
-        return False, None, "Credential not of type ABAC but %s" % cred.get_cred_type
+        return False, None, "Credential not of type ABAC but {}".format(cred.get_cred_type)
 
     if cred.signature is None or cred.signature.gid is None:
-        return False, None, "Credential malformed: missing signature or signer cert. Cred: %s" % cred.pretty_cred()
+        return False, None, "Credential malformed: missing signature or signer cert. Cred: {}"\
+            .format(cred.pretty_cred())
     user_gid = cred.signature.gid
     user_urn = user_gid.get_urn()
 
     # URN of signer from cert must match URN of 'speaking-for' argument
     if user_urn != speaking_for_urn:
-        return False, None, "User URN from cred doesn't match speaking_for URN: %s != %s (cred %s)" % \
-            (user_urn, speaking_for_urn, cred.pretty_cred())
+        return False, None, "User URN from cred doesn't match speaking_for URN: {} != {} (cred {})"\
+            .format(user_urn, speaking_for_urn, cred.pretty_cred())
 
     tails = cred.get_tails()
     if len(tails) != 1: 
-        return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got %d (%s)" % \
-            (len(tails), cred.pretty_cred())
+        return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got {} ({})"\
+            .format(len(tails), cred.pretty_cred())
 
     user_keyid = get_cert_keyid(user_gid)
     tool_keyid = get_cert_keyid(tool_gid)
@@ -188,13 +191,14 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
             msg = verified[mstart:mend]
         if msg == "":
             msg = output
-        return False, None, "ABAC credential failed to xmlsec1 verify: %s" % msg
+        return False, None, "ABAC credential failed to xmlsec1 verify: {}".format(msg)
 
     # Must say U.speaks_for(U)<-T
     if user_keyid != principal_keyid or \
             tool_keyid != subject_keyid or \
-            role != ('speaks_for_%s' % user_keyid):
-        return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T (%s)" % cred.pretty_cred()
+            role != ('speaks_for_{}'.format(user_keyid)):
+        return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T ({})"\
+            .format(cred.pretty_cred())
 
     # If schema provided, validate against schema
     if HAVELXML and schema and os.path.exists(schema):
@@ -204,8 +208,8 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
         xmlschema = etree.XMLSchema(schema_doc)
         if not xmlschema.validate(tree):
             error = xmlschema.error_log.last_error
-            message = "%s: %s (line %s)" % (cred.pretty_cred(), error.message, error.line)
-            return False, None, ("XML Credential schema invalid: %s" % message)
+            message = "{}: {} (line {})".format(cred.pretty_cred(), error.message, error.line)
+            return False, None, ("XML Credential schema invalid: {}".format(message))
 
     if trusted_roots:
         # User certificate must validate against trusted roots
@@ -213,14 +217,13 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
             user_gid.verify_chain(trusted_roots)
         except Exception as e:
             return False, None, \
-                "Cred signer (user) cert not trusted: %s" % e
+                "Cred signer (user) cert not trusted: {}".format(e)
 
         # Tool certificate must validate against trusted roots
         try:
             tool_gid.verify_chain(trusted_roots)
         except Exception as e:
-            return False, None, \
-                "Tool cert not trusted: %s" % e
+            return False, None, "Tool cert not trusted: {}".format(e)
 
     return True, user_gid, ""
 
@@ -258,9 +261,9 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus
             if not isinstance(cred_value, ABACCredential):
                 cred = CredentialFactory.createCred(cred_value)
 
-#            print("Got a cred to check speaksfor for: %s" % cred.pretty_cred())
+#            print("Got a cred to check speaksfor for: {}".format(cred.pretty_cred()))
 #            #cred.dump(True, True)
-#            print("Caller: %s" % caller_gid.dump_string(2, True))
+#            print("Caller: {}".format(caller_gid.dump_string(2, True)))
             # See if this is a valid speaks_for
             is_valid_speaks_for, user_gid, msg = \
                 verify_speaks_for(cred,
@@ -270,7 +273,8 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus
             if is_valid_speaks_for:
                 return user_gid # speaks-for
             else:
-                logger.info("Got speaks-for option but not a valid speaks_for with this credential: %s" % msg)
+                logger.info("Got speaks-for option but not a valid speaks_for with this credential: {}"
+                            .format(msg))
     return caller_gid # Not speaks-for
 
 # Create an ABAC Speaks For credential using the ABACCredential object and it's encode&sign methods
@@ -294,7 +298,7 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam
     user_urn = user_gid.get_urn()
     user_keyid = get_cert_keyid(user_gid)
     tool_keyid = get_cert_keyid(tool_gid)
-    cred.head = ABACElement(user_keyid, user_urn, "speaks_for_%s" % user_keyid)
+    cred.head = ABACElement(user_keyid, user_urn, "speaks_for_{}".format(user_keyid))
     cred.tails.append(ABACElement(tool_keyid, tool_urn))
     cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(days=dur_days))
     cred.expiration = cred.expiration.replace(microsecond=0)
@@ -306,8 +310,8 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam
     cred.sign()
     # Save it
     cred.save_to_file(cred_filename)
-    logger.info("Created ABAC credential: '%s' in file %s" % 
-                (cred.pretty_cred(), cred_filename))
+    logger.info("Created ABAC credential: '{}' in file {}"
+                .format(cred.pretty_cred(), cred_filename))
 
 # FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted
 def create_speaks_for(tool_gid, user_gid, ma_gid, 
@@ -315,41 +319,37 @@ def create_speaks_for(tool_gid, user_gid, ma_gid,
     tool_urn = tool_gid.get_urn()
     user_urn = user_gid.get_urn()
 
-    header = '<?xml version="1.0" encoding="UTF-8"?>'
-    reference = "ref0"
-    signature_block = \
-        '<signatures>\n' + \
-        signature_template + \
-        '</signatures>'
-    template = header + '\n' + \
-        '<signed-credential '
-    template += 'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://www.geni.net/resources/credential/2/credential.xsd" xsi:schemaLocation="http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd"'
-    template += '>\n' + \
-        '<credential xml:id="%s">\n' + \
-        '<type>abac</type>\n' + \
-        '<serial/>\n' +\
-        '<owner_gid/>\n' + \
-        '<owner_urn/>\n' + \
-        '<target_gid/>\n' + \
-        '<target_urn/>\n' + \
-        '<uuid/>\n' + \
-        '<expires>%s</expires>' +\
-        '<abac>\n' + \
-        '<rt0>\n' + \
-        '<version>%s</version>\n' + \
-        '<head>\n' + \
-        '<ABACprincipal><keyid>%s</keyid><mnemonic>%s</mnemonic></ABACprincipal>\n' +\
-        '<role>speaks_for_%s</role>\n' + \
-        '</head>\n' + \
-        '<tail>\n' +\
-        '<ABACprincipal><keyid>%s</keyid><mnemonic>%s</mnemonic></ABACprincipal>\n' +\
-        '</tail>\n' +\
-        '</rt0>\n' + \
-        '</abac>\n' + \
-        '</credential>\n' + \
-        signature_block + \
-        '</signed-credential>\n'
-
+    refid = "ref0"
+
+    credential_format = """\
+<?xml version="1.0" encoding="UTF-8"?>
+<signed-credential xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://www.geni.net/resources/credential/2/credential.xsd" xsi:schemaLocation="http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd">
+ <credential xml:id="{refid}">
+  <type>abac</type>
+  <serial/>
+  <owner_gid/>
+  <owner_urn/>
+  <target_gid/>
+  <target_urn/>
+  <uuid/>
+  <expires>{expiration_str}</expires>
+  <abac>
+   <rt0>
+    <version>{version}</version>
+    <head>
+     <ABACprincipal><keyid>{user_keyid}</keyid><mnemonic>{user_urn}</mnemonic></ABACprincipal>
+     <role>speaks_for_{user_keyid}</role>
+    </head>
+    <tail>
+     <ABACprincipal><keyid>{tool_keyid}/keyid><mnemonic>{tool_urn}</mnemonic></ABACprincipal>
+    </tail>
+   </rt0>
+  </abac>
+ </credential>
+ <signatures>""" + signature_format + """\
+ </signatures>
+</signed-credential>\
+"""
 
     credential_duration = datetime.timedelta(days=dur_days)
     expiration = datetime.datetime.utcnow() + credential_duration
@@ -358,16 +358,15 @@ def create_speaks_for(tool_gid, user_gid, ma_gid,
 
     user_keyid = get_cert_keyid(user_gid)
     tool_keyid = get_cert_keyid(tool_gid)
-    unsigned_cred = template % (reference, expiration_str, version, 
-                                user_keyid, user_urn, user_keyid, tool_keyid, tool_urn,
-                                reference, reference)
+    # apply the format - itself uses signature_format which uses 'refid'
+    unsigned_cred = credential_format.format(**locals())
     unsigned_cred_filename = write_to_tempfile(unsigned_cred)
 
     # Now sign the file with xmlsec1
     # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem 
     # --output signed.xml tosign.xml
-    pems = "%s,%s,%s" % (user_key_file, user_gid.get_filename(),
-                         ma_gid.get_filename())
+    pems = "{},{},{}".format(user_key_file, user_gid.get_filename(),
+                             ma_gid.get_filename())
     xmlsec1 = Credential.get_xmlsec1_path()
     if not xmlsec1:
         raise Exception("Could not locate required 'xmlsec1' program")
@@ -379,8 +378,8 @@ def create_speaks_for(tool_gid, user_gid, ma_gid,
     if sign_proc_output == None:
         logger.info("xmlsec1 returns empty output")
     else:
-        logger.info("Created ABAC credential: '%s speaks_for %s' in file %s" % 
-                    (tool_urn, user_urn, cred_filename))
+        logger.info("Created ABAC credential: '{} speaks_for {}' in file {}"
+                    .format(tool_urn, user_urn, cred_filename))
     os.unlink(unsigned_cred_filename)
 
 
@@ -451,5 +450,5 @@ if __name__ == "__main__":
                                trusted_roots)
 
 
-    print('SPEAKS_FOR = %s' % (gid != tool_gid))
-    print("CERT URN = %s" % gid.get_urn())
+    print('SPEAKS_FOR = {}'.format(gid != tool_gid))
+    print("CERT URN = {}".format(gid.get_urn()))