X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Ftrust%2Fspeaksfor_util.py;h=640d5125714609ae24b10ea74458935e92414352;hb=dfafd9825fecddbbd1ea4c7093766c386f660c29;hp=83178225bc52aacd8de42c32f66cdf458e06eee8;hpb=24e34365889f45eda036c974816ec6e9f5ebab09;p=sfa.git
diff --git a/sfa/trust/speaksfor_util.py b/sfa/trust/speaksfor_util.py
index 83178225..640d5125 100644
--- a/sfa/trust/speaksfor_util.py
+++ b/sfa/trust/speaksfor_util.py
@@ -35,7 +35,7 @@ from xml.dom.minidom import *
from sfa.util.sfatime import SFATIME_FORMAT
from sfa.trust.certificate import Certificate
-from sfa.trust.credential import Credential, signature_template, HAVELXML
+from sfa.trust.credential import Credential, signature_format, HAVELXML
from sfa.trust.abac_credential import ABACCredential, ABACElement
from sfa.trust.credential_factory import CredentialFactory
from sfa.trust.gid import GID
@@ -56,7 +56,8 @@ from sfa.util.py23 import StringIO
# Find the text associated with first child text node
def findTextChildValue(root):
child = findChildNamed(root, '#text')
- if child: return str(child.nodeValue)
+ if child:
+ return str(child.nodeValue)
return None
# Find first child with given name
@@ -85,7 +86,7 @@ def run_subprocess(cmd, stdout, stderr):
output = proc.returncode
return output
except Exception as e:
- raise Exception("Failed call to subprocess '%s': %s" % (" ".join(cmd), e))
+ raise Exception("Failed call to subprocess '{}': {}".format(" ".join(cmd), e))
def get_cert_keyid(gid):
"""Extract the subject key identifier from the given certificate.
@@ -134,26 +135,28 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
# Credential has not expired
if cred.expiration and cred.expiration < datetime.datetime.utcnow():
- return False, None, "ABAC Credential expired at %s (%s)" % (cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred())
+ return False, None, "ABAC Credential expired at {} ({})"\
+ .format(cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred())
# Must be ABAC
if cred.get_cred_type() != ABACCredential.ABAC_CREDENTIAL_TYPE:
- return False, None, "Credential not of type ABAC but %s" % cred.get_cred_type
+ return False, None, "Credential not of type ABAC but {}".format(cred.get_cred_type)
if cred.signature is None or cred.signature.gid is None:
- return False, None, "Credential malformed: missing signature or signer cert. Cred: %s" % cred.pretty_cred()
+ return False, None, "Credential malformed: missing signature or signer cert. Cred: {}"\
+ .format(cred.pretty_cred())
user_gid = cred.signature.gid
user_urn = user_gid.get_urn()
# URN of signer from cert must match URN of 'speaking-for' argument
if user_urn != speaking_for_urn:
- return False, None, "User URN from cred doesn't match speaking_for URN: %s != %s (cred %s)" % \
- (user_urn, speaking_for_urn, cred.pretty_cred())
+ return False, None, "User URN from cred doesn't match speaking_for URN: {} != {} (cred {})"\
+ .format(user_urn, speaking_for_urn, cred.pretty_cred())
tails = cred.get_tails()
if len(tails) != 1:
- return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got %d (%s)" % \
- (len(tails), cred.pretty_cred())
+ return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got {} ({})"\
+ .format(len(tails), cred.pretty_cred())
user_keyid = get_cert_keyid(user_gid)
tool_keyid = get_cert_keyid(tool_gid)
@@ -188,13 +191,14 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
msg = verified[mstart:mend]
if msg == "":
msg = output
- return False, None, "ABAC credential failed to xmlsec1 verify: %s" % msg
+ return False, None, "ABAC credential failed to xmlsec1 verify: {}".format(msg)
# Must say U.speaks_for(U)<-T
if user_keyid != principal_keyid or \
tool_keyid != subject_keyid or \
- role != ('speaks_for_%s' % user_keyid):
- return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T (%s)" % cred.pretty_cred()
+ role != ('speaks_for_{}'.format(user_keyid)):
+ return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T ({})"\
+ .format(cred.pretty_cred())
# If schema provided, validate against schema
if HAVELXML and schema and os.path.exists(schema):
@@ -204,8 +208,8 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
xmlschema = etree.XMLSchema(schema_doc)
if not xmlschema.validate(tree):
error = xmlschema.error_log.last_error
- message = "%s: %s (line %s)" % (cred.pretty_cred(), error.message, error.line)
- return False, None, ("XML Credential schema invalid: %s" % message)
+ message = "{}: {} (line {})".format(cred.pretty_cred(), error.message, error.line)
+ return False, None, ("XML Credential schema invalid: {}".format(message))
if trusted_roots:
# User certificate must validate against trusted roots
@@ -213,14 +217,13 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn,
user_gid.verify_chain(trusted_roots)
except Exception as e:
return False, None, \
- "Cred signer (user) cert not trusted: %s" % e
+ "Cred signer (user) cert not trusted: {}".format(e)
# Tool certificate must validate against trusted roots
try:
tool_gid.verify_chain(trusted_roots)
except Exception as e:
- return False, None, \
- "Tool cert not trusted: %s" % e
+ return False, None, "Tool cert not trusted: {}".format(e)
return True, user_gid, ""
@@ -258,9 +261,9 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus
if not isinstance(cred_value, ABACCredential):
cred = CredentialFactory.createCred(cred_value)
-# print("Got a cred to check speaksfor for: %s" % cred.pretty_cred())
+# print("Got a cred to check speaksfor for: {}".format(cred.pretty_cred()))
# #cred.dump(True, True)
-# print("Caller: %s" % caller_gid.dump_string(2, True))
+# print("Caller: {}".format(caller_gid.dump_string(2, True)))
# See if this is a valid speaks_for
is_valid_speaks_for, user_gid, msg = \
verify_speaks_for(cred,
@@ -270,7 +273,8 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus
if is_valid_speaks_for:
return user_gid # speaks-for
else:
- logger.info("Got speaks-for option but not a valid speaks_for with this credential: %s" % msg)
+ logger.info("Got speaks-for option but not a valid speaks_for with this credential: {}"
+ .format(msg))
return caller_gid # Not speaks-for
# Create an ABAC Speaks For credential using the ABACCredential object and it's encode&sign methods
@@ -294,7 +298,7 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam
user_urn = user_gid.get_urn()
user_keyid = get_cert_keyid(user_gid)
tool_keyid = get_cert_keyid(tool_gid)
- cred.head = ABACElement(user_keyid, user_urn, "speaks_for_%s" % user_keyid)
+ cred.head = ABACElement(user_keyid, user_urn, "speaks_for_{}".format(user_keyid))
cred.tails.append(ABACElement(tool_keyid, tool_urn))
cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(days=dur_days))
cred.expiration = cred.expiration.replace(microsecond=0)
@@ -306,8 +310,8 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam
cred.sign()
# Save it
cred.save_to_file(cred_filename)
- logger.info("Created ABAC credential: '%s' in file %s" %
- (cred.pretty_cred(), cred_filename))
+ logger.info("Created ABAC credential: '{}' in file {}"
+ .format(cred.pretty_cred(), cred_filename))
# FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted
def create_speaks_for(tool_gid, user_gid, ma_gid,
@@ -315,41 +319,37 @@ def create_speaks_for(tool_gid, user_gid, ma_gid,
tool_urn = tool_gid.get_urn()
user_urn = user_gid.get_urn()
- header = ''
- reference = "ref0"
- signature_block = \
- '\n' + \
- signature_template + \
- ''
- template = header + '\n' + \
- '\n' + \
- '\n' + \
- 'abac\n' + \
- '\n' +\
- '\n' + \
- '\n' + \
- '\n' + \
- '\n' + \
- '\n' + \
- '%s' +\
- '\n' + \
- '\n' + \
- '%s\n' + \
- '\n' + \
- '%s%s\n' +\
- 'speaks_for_%s\n' + \
- '\n' + \
- '\n' +\
- '%s%s\n' +\
- '\n' +\
- '\n' + \
- '\n' + \
- '\n' + \
- signature_block + \
- '\n'
-
+ refid = "ref0"
+
+ credential_format = """\
+
+
+
+ abac
+
+
+
+
+
+
+ {expiration_str}
+
+
+ {version}
+
+ {user_keyid}{user_urn}
+ speaks_for_{user_keyid}
+
+
+ {tool_keyid}/keyid>{tool_urn}
+
+
+
+
+ """ + signature_format + """\
+
+\
+"""
credential_duration = datetime.timedelta(days=dur_days)
expiration = datetime.datetime.utcnow() + credential_duration
@@ -358,16 +358,15 @@ def create_speaks_for(tool_gid, user_gid, ma_gid,
user_keyid = get_cert_keyid(user_gid)
tool_keyid = get_cert_keyid(tool_gid)
- unsigned_cred = template % (reference, expiration_str, version,
- user_keyid, user_urn, user_keyid, tool_keyid, tool_urn,
- reference, reference)
+ # apply the format - itself uses signature_format which uses 'refid'
+ unsigned_cred = credential_format.format(**locals())
unsigned_cred_filename = write_to_tempfile(unsigned_cred)
# Now sign the file with xmlsec1
# xmlsec1 --sign --privkey-pem privkey.pem,cert.pem
# --output signed.xml tosign.xml
- pems = "%s,%s,%s" % (user_key_file, user_gid.get_filename(),
- ma_gid.get_filename())
+ pems = "{},{},{}".format(user_key_file, user_gid.get_filename(),
+ ma_gid.get_filename())
xmlsec1 = Credential.get_xmlsec1_path()
if not xmlsec1:
raise Exception("Could not locate required 'xmlsec1' program")
@@ -379,8 +378,8 @@ def create_speaks_for(tool_gid, user_gid, ma_gid,
if sign_proc_output == None:
logger.info("xmlsec1 returns empty output")
else:
- logger.info("Created ABAC credential: '%s speaks_for %s' in file %s" %
- (tool_urn, user_urn, cred_filename))
+ logger.info("Created ABAC credential: '{} speaks_for {}' in file {}"
+ .format(tool_urn, user_urn, cred_filename))
os.unlink(unsigned_cred_filename)
@@ -451,5 +450,5 @@ if __name__ == "__main__":
trusted_roots)
- print('SPEAKS_FOR = %s' % (gid != tool_gid))
- print("CERT URN = %s" % gid.get_urn())
+ print('SPEAKS_FOR = {}'.format(gid != tool_gid))
+ print("CERT URN = {}".format(gid.get_urn()))