prettified certificate, credential and speaksfor_util
[sfa.git] / sfa / trust / speaksfor_util.py
index 8317822..640d512 100644 (file)
@@ -35,7 +35,7 @@ from xml.dom.minidom import *
 from sfa.util.sfatime import SFATIME_FORMAT
 
 from sfa.trust.certificate import Certificate
-from sfa.trust.credential import Credential, signature_template, HAVELXML
+from sfa.trust.credential import Credential, signature_format, HAVELXML
 from sfa.trust.abac_credential import ABACCredential, ABACElement
 from sfa.trust.credential_factory import CredentialFactory
 from sfa.trust.gid import GID
@@ -56,7 +56,8 @@ from sfa.util.py23 import StringIO
 # Find the text associated with first child text node
 def findTextChildValue(root):
     child = findChildNamed(root, '#text')
-    if child: return str(child.nodeValue)
+    if child:
+        return str(child.nodeValue)
     return None
 
 # Find first child with given name
@@ -85,7 +86,7 @@ def run_subprocess(cmd, stdout, stderr):
             output = proc.returncode
         return output
     except Exception as e:
-        raise Exception("Failed call to subprocess '%s': %s" % (" ".join(cmd), e))
+        raise Exception("Failed call to subprocess '{}': {}".format(" ".join(cmd), e))
 
 def get_cert_keyid(gid):
     """Extract the subject key identifier from the given certificate.
@@ -134,26 +135,28 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
 
     # Credential has not expired
     if cred.expiration and cred.expiration < datetime.datetime.utcnow():
-        return False, None, "ABAC Credential expired at %s (%s)" % (cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred())
+        return False, None, "ABAC Credential expired at {} ({})"\
+            .format(cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred())
 
     # Must be ABAC
     if cred.get_cred_type() != ABACCredential.ABAC_CREDENTIAL_TYPE:
-        return False, None, "Credential not of type ABAC but %s" % cred.get_cred_type
+        return False, None, "Credential not of type ABAC but {}".format(cred.get_cred_type)
 
     if cred.signature is None or cred.signature.gid is None:
-        return False, None, "Credential malformed: missing signature or signer cert. Cred: %s" % cred.pretty_cred()
+        return False, None, "Credential malformed: missing signature or signer cert. Cred: {}"\
+            .format(cred.pretty_cred())
     user_gid = cred.signature.gid
     user_urn = user_gid.get_urn()
 
     # URN of signer from cert must match URN of 'speaking-for' argument
     if user_urn != speaking_for_urn:
-        return False, None, "User URN from cred doesn't match speaking_for URN: %s != %s (cred %s)" % \
-            (user_urn, speaking_for_urn, cred.pretty_cred())
+        return False, None, "User URN from cred doesn't match speaking_for URN: {} != {} (cred {})"\
+            .format(user_urn, speaking_for_urn, cred.pretty_cred())
 
     tails = cred.get_tails()
     if len(tails) != 1: 
-        return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got %d (%s)" % \
-            (len(tails), cred.pretty_cred())
+        return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got {} ({})"\
+            .format(len(tails), cred.pretty_cred())
 
     user_keyid = get_cert_keyid(user_gid)
     tool_keyid = get_cert_keyid(tool_gid)
@@ -188,13 +191,14 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
             msg = verified[mstart:mend]
         if msg == "":
             msg = output
-        return False, None, "ABAC credential failed to xmlsec1 verify: %s" % msg
+        return False, None, "ABAC credential failed to xmlsec1 verify: {}".format(msg)
 
     # Must say U.speaks_for(U)<-T
     if user_keyid != principal_keyid or \
             tool_keyid != subject_keyid or \
-            role != ('speaks_for_%s' % user_keyid):
-        return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T (%s)" % cred.pretty_cred()
+            role != ('speaks_for_{}'.format(user_keyid)):
+        return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T ({})"\
+            .format(cred.pretty_cred())
 
     # If schema provided, validate against schema
     if HAVELXML and schema and os.path.exists(schema):
@@ -204,8 +208,8 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
         xmlschema = etree.XMLSchema(schema_doc)
         if not xmlschema.validate(tree):
             error = xmlschema.error_log.last_error
-            message = "%s: %s (line %s)" % (cred.pretty_cred(), error.message, error.line)
-            return False, None, ("XML Credential schema invalid: %s" % message)
+            message = "{}: {} (line {})".format(cred.pretty_cred(), error.message, error.line)
+            return False, None, ("XML Credential schema invalid: {}".format(message))
 
     if trusted_roots:
         # User certificate must validate against trusted roots
@@ -213,14 +217,13 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
             user_gid.verify_chain(trusted_roots)
         except Exception as e:
             return False, None, \
-                "Cred signer (user) cert not trusted: %s" % e
+                "Cred signer (user) cert not trusted: {}".format(e)
 
         # Tool certificate must validate against trusted roots
         try:
             tool_gid.verify_chain(trusted_roots)
         except Exception as e:
-            return False, None, \
-                "Tool cert not trusted: %s" % e
+            return False, None, "Tool cert not trusted: {}".format(e)
 
     return True, user_gid, ""
 
@@ -258,9 +261,9 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus
             if not isinstance(cred_value, ABACCredential):
                 cred = CredentialFactory.createCred(cred_value)
 
-#            print("Got a cred to check speaksfor for: %s" % cred.pretty_cred())
+#            print("Got a cred to check speaksfor for: {}".format(cred.pretty_cred()))
 #            #cred.dump(True, True)
-#            print("Caller: %s" % caller_gid.dump_string(2, True))
+#            print("Caller: {}".format(caller_gid.dump_string(2, True)))
             # See if this is a valid speaks_for
             is_valid_speaks_for, user_gid, msg = \
                 verify_speaks_for(cred,
@@ -270,7 +273,8 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus
             if is_valid_speaks_for:
                 return user_gid # speaks-for
             else:
-                logger.info("Got speaks-for option but not a valid speaks_for with this credential: %s" % msg)
+                logger.info("Got speaks-for option but not a valid speaks_for with this credential: {}"
+                            .format(msg))
     return caller_gid # Not speaks-for
 
 # Create an ABAC Speaks For credential using the ABACCredential object and it's encode&sign methods
@@ -294,7 +298,7 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam
     user_urn = user_gid.get_urn()
     user_keyid = get_cert_keyid(user_gid)
     tool_keyid = get_cert_keyid(tool_gid)
-    cred.head = ABACElement(user_keyid, user_urn, "speaks_for_%s" % user_keyid)
+    cred.head = ABACElement(user_keyid, user_urn, "speaks_for_{}".format(user_keyid))
     cred.tails.append(ABACElement(tool_keyid, tool_urn))
     cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(days=dur_days))
     cred.expiration = cred.expiration.replace(microsecond=0)
@@ -306,8 +310,8 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam
     cred.sign()
     # Save it
     cred.save_to_file(cred_filename)
-    logger.info("Created ABAC credential: '%s' in file %s" % 
-                (cred.pretty_cred(), cred_filename))
+    logger.info("Created ABAC credential: '{}' in file {}"
+                .format(cred.pretty_cred(), cred_filename))
 
 # FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted
 def create_speaks_for(tool_gid, user_gid, ma_gid, 
@@ -315,41 +319,37 @@ def create_speaks_for(tool_gid, user_gid, ma_gid,
     tool_urn = tool_gid.get_urn()
     user_urn = user_gid.get_urn()
 
-    header = '<?xml version="1.0" encoding="UTF-8"?>'
-    reference = "ref0"
-    signature_block = \
-        '<signatures>\n' + \
-        signature_template + \
-        '</signatures>'
-    template = header + '\n' + \
-        '<signed-credential '
-    template += 'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://www.geni.net/resources/credential/2/credential.xsd" xsi:schemaLocation="http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd"'
-    template += '>\n' + \
-        '<credential xml:id="%s">\n' + \
-        '<type>abac</type>\n' + \
-        '<serial/>\n' +\
-        '<owner_gid/>\n' + \
-        '<owner_urn/>\n' + \
-        '<target_gid/>\n' + \
-        '<target_urn/>\n' + \
-        '<uuid/>\n' + \
-        '<expires>%s</expires>' +\
-        '<abac>\n' + \
-        '<rt0>\n' + \
-        '<version>%s</version>\n' + \
-        '<head>\n' + \
-        '<ABACprincipal><keyid>%s</keyid><mnemonic>%s</mnemonic></ABACprincipal>\n' +\
-        '<role>speaks_for_%s</role>\n' + \
-        '</head>\n' + \
-        '<tail>\n' +\
-        '<ABACprincipal><keyid>%s</keyid><mnemonic>%s</mnemonic></ABACprincipal>\n' +\
-        '</tail>\n' +\
-        '</rt0>\n' + \
-        '</abac>\n' + \
-        '</credential>\n' + \
-        signature_block + \
-        '</signed-credential>\n'
-
+    refid = "ref0"
+
+    credential_format = """\
+<?xml version="1.0" encoding="UTF-8"?>
+<signed-credential xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://www.geni.net/resources/credential/2/credential.xsd" xsi:schemaLocation="http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd">
+ <credential xml:id="{refid}">
+  <type>abac</type>
+  <serial/>
+  <owner_gid/>
+  <owner_urn/>
+  <target_gid/>
+  <target_urn/>
+  <uuid/>
+  <expires>{expiration_str}</expires>
+  <abac>
+   <rt0>
+    <version>{version}</version>
+    <head>
+     <ABACprincipal><keyid>{user_keyid}</keyid><mnemonic>{user_urn}</mnemonic></ABACprincipal>
+     <role>speaks_for_{user_keyid}</role>
+    </head>
+    <tail>
+     <ABACprincipal><keyid>{tool_keyid}/keyid><mnemonic>{tool_urn}</mnemonic></ABACprincipal>
+    </tail>
+   </rt0>
+  </abac>
+ </credential>
+ <signatures>""" + signature_format + """\
+ </signatures>
+</signed-credential>\
+"""
 
     credential_duration = datetime.timedelta(days=dur_days)
     expiration = datetime.datetime.utcnow() + credential_duration
@@ -358,16 +358,15 @@ def create_speaks_for(tool_gid, user_gid, ma_gid,
 
     user_keyid = get_cert_keyid(user_gid)
     tool_keyid = get_cert_keyid(tool_gid)
-    unsigned_cred = template % (reference, expiration_str, version, 
-                                user_keyid, user_urn, user_keyid, tool_keyid, tool_urn,
-                                reference, reference)
+    # apply the format - itself uses signature_format which uses 'refid'
+    unsigned_cred = credential_format.format(**locals())
     unsigned_cred_filename = write_to_tempfile(unsigned_cred)
 
     # Now sign the file with xmlsec1
     # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem 
     # --output signed.xml tosign.xml
-    pems = "%s,%s,%s" % (user_key_file, user_gid.get_filename(),
-                         ma_gid.get_filename())
+    pems = "{},{},{}".format(user_key_file, user_gid.get_filename(),
+                             ma_gid.get_filename())
     xmlsec1 = Credential.get_xmlsec1_path()
     if not xmlsec1:
         raise Exception("Could not locate required 'xmlsec1' program")
@@ -379,8 +378,8 @@ def create_speaks_for(tool_gid, user_gid, ma_gid,
     if sign_proc_output == None:
         logger.info("xmlsec1 returns empty output")
     else:
-        logger.info("Created ABAC credential: '%s speaks_for %s' in file %s" % 
-                    (tool_urn, user_urn, cred_filename))
+        logger.info("Created ABAC credential: '{} speaks_for {}' in file {}"
+                    .format(tool_urn, user_urn, cred_filename))
     os.unlink(unsigned_cred_filename)
 
 
@@ -451,5 +450,5 @@ if __name__ == "__main__":
                                trusted_roots)
 
 
-    print('SPEAKS_FOR = %s' % (gid != tool_gid))
-    print("CERT URN = %s" % gid.get_urn())
+    print('SPEAKS_FOR = {}'.format(gid != tool_gid))
+    print("CERT URN = {}".format(gid.get_urn()))