import sys
import tempfile
from xml.dom.minidom import *
-from StringIO import StringIO
from sfa.util.sfatime import SFATIME_FORMAT
from sfa.trust.certificate import Certificate
-from sfa.trust.credential import Credential, signature_template, HAVELXML
+from sfa.trust.credential import Credential, signature_format, HAVELXML
from sfa.trust.abac_credential import ABACCredential, ABACElement
from sfa.trust.credential_factory import CredentialFactory
from sfa.trust.gid import GID
from sfa.util.sfalogging import logger
+from sfa.util.py23 import StringIO
-# Routine to validate that a speaks-for credential
+# Routine to validate that a speaks-for credential
# says what it claims to say:
# It is a signed credential wherein the signer S is attesting to the
# ABAC statement:
# Simple XML helper functions
# Find the text associated with first child text node
+
+
def findTextChildValue(root):
child = findChildNamed(root, '#text')
- if child: return str(child.nodeValue)
+ if child:
+ return str(child.nodeValue)
return None
# Find first child with given name
+
+
def findChildNamed(root, name):
for child in root.childNodes:
if child.nodeName == name:
return None
# Write a string to a tempfile, returning name of tempfile
+
+
def write_to_tempfile(str):
str_fd, str_file = tempfile.mkstemp()
if str:
return str_file
# Run a subprocess and return output
+
+
def run_subprocess(cmd, stdout, stderr):
try:
proc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr)
output = proc.returncode
return output
except Exception as e:
- raise Exception("Failed call to subprocess '%s': %s" % (" ".join(cmd), e))
+ raise Exception(
+ "Failed call to subprocess '{}': {}".format(" ".join(cmd), e))
+
def get_cert_keyid(gid):
"""Extract the subject key identifier from the given certificate.
return keyid
# Pull the cert out of a list of certs in a PEM formatted cert string
+
+
def grab_toplevel_cert(cert):
start_label = '-----BEGIN CERTIFICATE-----'
if cert.find(start_label) > -1:
# Validate that the given speaks-for credential represents the
# statement User.speaks_for(User)<-Tool for the given user and tool certs
# and was signed by the user
-# Return:
-# Boolean indicating whether the given credential
-# is not expired
+# Return:
+# Boolean indicating whether the given credential
+# is not expired
# is an ABAC credential
# was signed by the user associated with the speaking_for_urn
# is verified by xmlsec1
# String user certificate of speaking_for user if the above tests succeed
# (None otherwise)
# Error message indicating why the speaks_for call failed ("" otherwise)
+
+
def verify_speaks_for(cred, tool_gid, speaking_for_urn,
trusted_roots, schema=None, logger=None):
# Credential has not expired
if cred.expiration and cred.expiration < datetime.datetime.utcnow():
- return False, None, "ABAC Credential expired at %s (%s)" % (cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred())
+ return False, None, "ABAC Credential expired at {} ({})"\
+ .format(cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred())
# Must be ABAC
if cred.get_cred_type() != ABACCredential.ABAC_CREDENTIAL_TYPE:
- return False, None, "Credential not of type ABAC but %s" % cred.get_cred_type
+ return False, None, "Credential not of type ABAC but {}".format(cred.get_cred_type)
if cred.signature is None or cred.signature.gid is None:
- return False, None, "Credential malformed: missing signature or signer cert. Cred: %s" % cred.pretty_cred()
+ return False, None, "Credential malformed: missing signature or signer cert. Cred: {}"\
+ .format(cred.pretty_cred())
user_gid = cred.signature.gid
user_urn = user_gid.get_urn()
# URN of signer from cert must match URN of 'speaking-for' argument
if user_urn != speaking_for_urn:
- return False, None, "User URN from cred doesn't match speaking_for URN: %s != %s (cred %s)" % \
- (user_urn, speaking_for_urn, cred.pretty_cred())
+ return False, None, "User URN from cred doesn't match speaking_for URN: {} != {} (cred {})"\
+ .format(user_urn, speaking_for_urn, cred.pretty_cred())
tails = cred.get_tails()
- if len(tails) != 1:
- return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got %d (%s)" % \
- (len(tails), cred.pretty_cred())
+ if len(tails) != 1:
+ return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got {} ({})"\
+ .format(len(tails), cred.pretty_cred())
user_keyid = get_cert_keyid(user_gid)
tool_keyid = get_cert_keyid(tool_gid)
if trusted_roots:
for x in trusted_roots:
cert_args += ['--trusted-pem', x.filename]
- # FIXME: Why do we not need to specify the --node-id option as credential.py does?
+ # FIXME: Why do we not need to specify the --node-id option as
+ # credential.py does?
xmlsec1 = cred.get_xmlsec1_path()
if not xmlsec1:
raise Exception("Could not locate required 'xmlsec1' program")
- xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [ cred_file]
+ xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [cred_file]
output = run_subprocess(xmlsec1_args, stdout=None, stderr=subprocess.PIPE)
os.unlink(cred_file)
if output != 0:
msg = verified[mstart:mend]
if msg == "":
msg = output
- return False, None, "ABAC credential failed to xmlsec1 verify: %s" % msg
+ return False, None, "ABAC credential failed to xmlsec1 verify: {}".format(msg)
# Must say U.speaks_for(U)<-T
if user_keyid != principal_keyid or \
tool_keyid != subject_keyid or \
- role != ('speaks_for_%s' % user_keyid):
- return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T (%s)" % cred.pretty_cred()
+ role != ('speaks_for_{}'.format(user_keyid)):
+ return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T ({})"\
+ .format(cred.pretty_cred())
# If schema provided, validate against schema
if HAVELXML and schema and os.path.exists(schema):
xmlschema = etree.XMLSchema(schema_doc)
if not xmlschema.validate(tree):
error = xmlschema.error_log.last_error
- message = "%s: %s (line %s)" % (cred.pretty_cred(), error.message, error.line)
- return False, None, ("XML Credential schema invalid: %s" % message)
+ message = "{}: {} (line {})".format(
+ cred.pretty_cred(), error.message, error.line)
+ return False, None, ("XML Credential schema invalid: {}".format(message))
if trusted_roots:
# User certificate must validate against trusted roots
user_gid.verify_chain(trusted_roots)
except Exception as e:
return False, None, \
- "Cred signer (user) cert not trusted: %s" % e
+ "Cred signer (user) cert not trusted: {}".format(e)
# Tool certificate must validate against trusted roots
try:
tool_gid.verify_chain(trusted_roots)
except Exception as e:
- return False, None, \
- "Tool cert not trusted: %s" % e
+ return False, None, "Tool cert not trusted: {}".format(e)
return True, user_gid, ""
#
# credentials is a list of GENI-style credentials:
# Either a cred string xml string, or Credential object of a tuple
-# [{'geni_type' : geni_type, 'geni_value : cred_value,
+# [{'geni_type' : geni_type, 'geni_value : cred_value,
# 'geni_version' : version}]
# caller_gid is the raw X509 cert gid
# options is the dictionary of API-provided options
# trusted_roots is a list of Certificate objects from the system
# trusted_root directory
# Optionally, provide an XML schema against which to validate the credential
+
+
def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trusted_roots, schema=None):
if speaking_for_xrn:
- speaking_for_urn = Xrn (speaking_for_xrn.strip()).get_urn()
+ speaking_for_urn = Xrn(speaking_for_xrn.strip()).get_urn()
for cred in credentials:
# Skip things that aren't ABAC credentials
if type(cred) == dict:
- if cred['geni_type'] != ABACCredential.ABAC_CREDENTIAL_TYPE: continue
+ if cred['geni_type'] != ABACCredential.ABAC_CREDENTIAL_TYPE:
+ continue
cred_value = cred['geni_value']
elif isinstance(cred, Credential):
if not isinstance(cred, ABACCredential):
else:
cred_value = cred
else:
- if CredentialFactory.getType(cred) != ABACCredential.ABAC_CREDENTIAL_TYPE: continue
+ if CredentialFactory.getType(cred) != ABACCredential.ABAC_CREDENTIAL_TYPE:
+ continue
cred_value = cred
# If the cred_value is xml, create the object
if not isinstance(cred_value, ABACCredential):
cred = CredentialFactory.createCred(cred_value)
-# print("Got a cred to check speaksfor for: %s" % cred.pretty_cred())
+# print("Got a cred to check speaksfor for: {}".format(cred.pretty_cred()))
# #cred.dump(True, True)
-# print("Caller: %s" % caller_gid.dump_string(2, True))
+# print("Caller: {}".format(caller_gid.dump_string(2, True)))
# See if this is a valid speaks_for
is_valid_speaks_for, user_gid, msg = \
verify_speaks_for(cred,
trusted_roots, schema, logger=logger)
logger.info(msg)
if is_valid_speaks_for:
- return user_gid # speaks-for
+ return user_gid # speaks-for
else:
- logger.info("Got speaks-for option but not a valid speaks_for with this credential: %s" % msg)
- return caller_gid # Not speaks-for
+ logger.info("Got speaks-for option but not a valid speaks_for with this credential: {}"
+ .format(msg))
+ return caller_gid # Not speaks-for
+
+# Create an ABAC Speaks For credential using the ABACCredential object and
+# it's encode&sign methods
+
-# Create an ABAC Speaks For credential using the ABACCredential object and it's encode&sign methods
def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filename, dur_days=365):
logger.info("Creating ABAC SpeaksFor using ABACCredential...\n")
# Write out the user cert
user_urn = user_gid.get_urn()
user_keyid = get_cert_keyid(user_gid)
tool_keyid = get_cert_keyid(tool_gid)
- cred.head = ABACElement(user_keyid, user_urn, "speaks_for_%s" % user_keyid)
+ cred.head = ABACElement(user_keyid, user_urn,
+ "speaks_for_{}".format(user_keyid))
cred.tails.append(ABACElement(tool_keyid, tool_urn))
- cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(days=dur_days))
+ cred.set_expiration(datetime.datetime.utcnow() +
+ datetime.timedelta(days=dur_days))
cred.expiration = cred.expiration.replace(microsecond=0)
# Produce the cred XML
cred.sign()
# Save it
cred.save_to_file(cred_filename)
- logger.info("Created ABAC credential: '%s' in file %s" %
- (cred.pretty_cred(), cred_filename))
+ logger.info("Created ABAC credential: '{}' in file {}"
+ .format(cred.pretty_cred(), cred_filename))
# FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted
-def create_speaks_for(tool_gid, user_gid, ma_gid,
+
+
+def create_speaks_for(tool_gid, user_gid, ma_gid,
user_key_file, cred_filename, dur_days=365):
tool_urn = tool_gid.get_urn()
user_urn = user_gid.get_urn()
- header = '<?xml version="1.0" encoding="UTF-8"?>'
- reference = "ref0"
- signature_block = \
- '<signatures>\n' + \
- signature_template + \
- '</signatures>'
- template = header + '\n' + \
- '<signed-credential '
- template += 'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://www.geni.net/resources/credential/2/credential.xsd" xsi:schemaLocation="http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd"'
- template += '>\n' + \
- '<credential xml:id="%s">\n' + \
- '<type>abac</type>\n' + \
- '<serial/>\n' +\
- '<owner_gid/>\n' + \
- '<owner_urn/>\n' + \
- '<target_gid/>\n' + \
- '<target_urn/>\n' + \
- '<uuid/>\n' + \
- '<expires>%s</expires>' +\
- '<abac>\n' + \
- '<rt0>\n' + \
- '<version>%s</version>\n' + \
- '<head>\n' + \
- '<ABACprincipal><keyid>%s</keyid><mnemonic>%s</mnemonic></ABACprincipal>\n' +\
- '<role>speaks_for_%s</role>\n' + \
- '</head>\n' + \
- '<tail>\n' +\
- '<ABACprincipal><keyid>%s</keyid><mnemonic>%s</mnemonic></ABACprincipal>\n' +\
- '</tail>\n' +\
- '</rt0>\n' + \
- '</abac>\n' + \
- '</credential>\n' + \
- signature_block + \
- '</signed-credential>\n'
-
+ refid = "ref0"
+
+ credential_format = """\
+<?xml version="1.0" encoding="UTF-8"?>
+<signed-credential xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://www.geni.net/resources/credential/2/credential.xsd" xsi:schemaLocation="http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd">
+ <credential xml:id="{refid}">
+ <type>abac</type>
+ <serial/>
+ <owner_gid/>
+ <owner_urn/>
+ <target_gid/>
+ <target_urn/>
+ <uuid/>
+ <expires>{expiration_str}</expires>
+ <abac>
+ <rt0>
+ <version>{version}</version>
+ <head>
+ <ABACprincipal><keyid>{user_keyid}</keyid><mnemonic>{user_urn}</mnemonic></ABACprincipal>
+ <role>speaks_for_{user_keyid}</role>
+ </head>
+ <tail>
+ <ABACprincipal><keyid>{tool_keyid}/keyid><mnemonic>{tool_urn}</mnemonic></ABACprincipal>
+ </tail>
+ </rt0>
+ </abac>
+ </credential>
+ <signatures>""" + signature_format + """\
+ </signatures>
+</signed-credential>\
+"""
credential_duration = datetime.timedelta(days=dur_days)
expiration = datetime.datetime.utcnow() + credential_duration
user_keyid = get_cert_keyid(user_gid)
tool_keyid = get_cert_keyid(tool_gid)
- unsigned_cred = template % (reference, expiration_str, version,
- user_keyid, user_urn, user_keyid, tool_keyid, tool_urn,
- reference, reference)
+ # apply the format - itself uses signature_format which uses 'refid'
+ unsigned_cred = credential_format.format(**locals())
unsigned_cred_filename = write_to_tempfile(unsigned_cred)
# Now sign the file with xmlsec1
- # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem
+ # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem
# --output signed.xml tosign.xml
- pems = "%s,%s,%s" % (user_key_file, user_gid.get_filename(),
- ma_gid.get_filename())
+ pems = "{},{},{}".format(user_key_file, user_gid.get_filename(),
+ ma_gid.get_filename())
xmlsec1 = Credential.get_xmlsec1_path()
if not xmlsec1:
raise Exception("Could not locate required 'xmlsec1' program")
- cmd = [ xmlsec1, '--sign', '--privkey-pem', pems,
+ cmd = [xmlsec1, '--sign', '--privkey-pem', pems,
'--output', cred_filename, unsigned_cred_filename]
# print(" ".join(cmd))
if sign_proc_output == None:
logger.info("xmlsec1 returns empty output")
else:
- logger.info("Created ABAC credential: '%s speaks_for %s' in file %s" %
- (tool_urn, user_urn, cred_filename))
+ logger.info("Created ABAC credential: '{} speaks_for {}' in file {}"
+ .format(tool_urn, user_urn, cred_filename))
os.unlink(unsigned_cred_filename)
if __name__ == "__main__":
parser = optparse.OptionParser()
- parser.add_option('--cred_file',
+ parser.add_option('--cred_file',
help='Name of credential file')
- parser.add_option('--tool_cert_file',
+ parser.add_option('--tool_cert_file',
help='Name of file containing tool certificate')
- parser.add_option('--user_urn',
+ parser.add_option('--user_urn',
help='URN of speaks-for user')
- parser.add_option('--user_cert_file',
+ parser.add_option('--user_cert_file',
help="filename of x509 certificate of signing user")
- parser.add_option('--ma_cert_file',
+ parser.add_option('--ma_cert_file',
help="filename of x509 cert of MA that signed user cert")
- parser.add_option('--user_key_file',
+ parser.add_option('--user_key_file',
help="filename of private key of signing user")
- parser.add_option('--trusted_roots_directory',
+ parser.add_option('--trusted_roots_directory',
help='Directory of trusted root certs')
parser.add_option('--create',
help="name of file of ABAC speaksfor cred to create")
if options.create:
if options.user_cert_file and options.user_key_file \
- and options.ma_cert_file:
+ and options.ma_cert_file:
user_gid = GID(filename=options.user_cert_file)
ma_gid = GID(filename=options.ma_cert_file)
if options.useObject:
options.user_key_file,
options.create)
else:
- print("Usage: --create cred_file " +
- "--user_cert_file user_cert_file" +
+ print("Usage: --create cred_file " +
+ "--user_cert_file user_cert_file" +
" --user_key_file user_key_file --ma_cert_file ma_cert_file")
sys.exit()
trusted_roots_directory = options.trusted_roots_directory
trusted_roots = \
- [Certificate(filename=os.path.join(trusted_roots_directory, file))
- for file in os.listdir(trusted_roots_directory)
- if file.endswith('.pem') and file != 'CATedCACerts.pem']
+ [Certificate(filename=os.path.join(trusted_roots_directory, file))
+ for file in os.listdir(trusted_roots_directory)
+ if file.endswith('.pem') and file != 'CATedCACerts.pem']
cred = open(options.cred_file).read()
- creds = [{'geni_type' : ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value' : cred,
- 'geni_version' : '1'}]
+ creds = [{'geni_type': ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value': cred,
+ 'geni_version': '1'}]
gid = determine_speaks_for(None, creds, tool_gid,
- {'geni_speaking_for' : user_urn},
+ {'geni_speaking_for': user_urn},
trusted_roots)
-
- print('SPEAKS_FOR = %s' % (gid != tool_gid))
- print("CERT URN = %s" % gid.get_urn())
+ print('SPEAKS_FOR = {}'.format(gid != tool_gid))
+ print("CERT URN = {}".format(gid.get_urn()))