X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Ftrust%2Fspeaksfor_util.py;h=7019e3c3a3bf4b34367511593a6b882e2a5fced7;hb=4a9e6751f9f396f463932133b9d62fc925a99ef6;hp=af2f8b542aa542ecf823a5aa080d2e25c9c04dc4;hpb=73e68cb7d9d0a0966832d610f516e390fbf534c6;p=sfa.git diff --git a/sfa/trust/speaksfor_util.py b/sfa/trust/speaksfor_util.py index af2f8b54..7019e3c3 100644 --- a/sfa/trust/speaksfor_util.py +++ b/sfa/trust/speaksfor_util.py @@ -21,6 +21,8 @@ # IN THE WORK. #---------------------------------------------------------------------- + + import datetime from dateutil import parser as du_parser, tz as du_tz import optparse @@ -29,17 +31,18 @@ import subprocess import sys import tempfile from xml.dom.minidom import * -from StringIO import StringIO from sfa.util.sfatime import SFATIME_FORMAT from sfa.trust.certificate import Certificate -from sfa.trust.credential import Credential, signature_template, HAVELXML +from sfa.trust.credential import Credential, signature_format, HAVELXML from sfa.trust.abac_credential import ABACCredential, ABACElement from sfa.trust.credential_factory import CredentialFactory from sfa.trust.gid import GID +from sfa.util.sfalogging import logger +from sfa.util.py23 import StringIO -# Routine to validate that a speaks-for credential +# Routine to validate that a speaks-for credential # says what it claims to say: # It is a signed credential wherein the signer S is attesting to the # ABAC statement: @@ -51,12 +54,17 @@ from sfa.trust.gid import GID # Simple XML helper functions # Find the text associated with first child text node + + def findTextChildValue(root): child = findChildNamed(root, '#text') - if child: return str(child.nodeValue) + if child: + return str(child.nodeValue) return None # Find first child with given name + + def findChildNamed(root, name): for child in root.childNodes: if child.nodeName == name: @@ -64,6 +72,8 @@ def findChildNamed(root, name): return None # Write a string to a tempfile, returning name of tempfile + + def write_to_tempfile(str): str_fd, str_file = tempfile.mkstemp() if str: @@ -72,6 +82,8 @@ def write_to_tempfile(str): return str_file # Run a subprocess and return output + + def run_subprocess(cmd, stdout, stderr): try: proc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr) @@ -82,7 +94,9 @@ def run_subprocess(cmd, stdout, stderr): output = proc.returncode return output except Exception as e: - raise Exception("Failed call to subprocess '%s': %s" % (" ".join(cmd), e)) + raise Exception( + "Failed call to subprocess '{}': {}".format(" ".join(cmd), e)) + def get_cert_keyid(gid): """Extract the subject key identifier from the given certificate. @@ -98,6 +112,8 @@ def get_cert_keyid(gid): return keyid # Pull the cert out of a list of certs in a PEM formatted cert string + + def grab_toplevel_cert(cert): start_label = '-----BEGIN CERTIFICATE-----' if cert.find(start_label) > -1: @@ -114,9 +130,9 @@ def grab_toplevel_cert(cert): # Validate that the given speaks-for credential represents the # statement User.speaks_for(User)<-Tool for the given user and tool certs # and was signed by the user -# Return: -# Boolean indicating whether the given credential -# is not expired +# Return: +# Boolean indicating whether the given credential +# is not expired # is an ABAC credential # was signed by the user associated with the speaking_for_urn # is verified by xmlsec1 @@ -126,31 +142,35 @@ def grab_toplevel_cert(cert): # String user certificate of speaking_for user if the above tests succeed # (None otherwise) # Error message indicating why the speaks_for call failed ("" otherwise) + + def verify_speaks_for(cred, tool_gid, speaking_for_urn, trusted_roots, schema=None, logger=None): # Credential has not expired if cred.expiration and cred.expiration < datetime.datetime.utcnow(): - return False, None, "ABAC Credential expired at %s (%s)" % (cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred()) + return False, None, "ABAC Credential expired at {} ({})"\ + .format(cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred()) # Must be ABAC if cred.get_cred_type() != ABACCredential.ABAC_CREDENTIAL_TYPE: - return False, None, "Credential not of type ABAC but %s" % cred.get_cred_type + return False, None, "Credential not of type ABAC but {}".format(cred.get_cred_type) if cred.signature is None or cred.signature.gid is None: - return False, None, "Credential malformed: missing signature or signer cert. Cred: %s" % cred.pretty_cred() + return False, None, "Credential malformed: missing signature or signer cert. Cred: {}"\ + .format(cred.pretty_cred()) user_gid = cred.signature.gid user_urn = user_gid.get_urn() # URN of signer from cert must match URN of 'speaking-for' argument if user_urn != speaking_for_urn: - return False, None, "User URN from cred doesn't match speaking_for URN: %s != %s (cred %s)" % \ - (user_urn, speaking_for_urn, cred.pretty_cred()) + return False, None, "User URN from cred doesn't match speaking_for URN: {} != {} (cred {})"\ + .format(user_urn, speaking_for_urn, cred.pretty_cred()) tails = cred.get_tails() - if len(tails) != 1: - return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got %d (%s)" % \ - (len(tails), cred.pretty_cred()) + if len(tails) != 1: + return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got {} ({})"\ + .format(len(tails), cred.pretty_cred()) user_keyid = get_cert_keyid(user_gid) tool_keyid = get_cert_keyid(tool_gid) @@ -166,11 +186,12 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, if trusted_roots: for x in trusted_roots: cert_args += ['--trusted-pem', x.filename] - # FIXME: Why do we not need to specify the --node-id option as credential.py does? + # FIXME: Why do we not need to specify the --node-id option as + # credential.py does? xmlsec1 = cred.get_xmlsec1_path() if not xmlsec1: raise Exception("Could not locate required 'xmlsec1' program") - xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [ cred_file] + xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [cred_file] output = run_subprocess(xmlsec1_args, stdout=None, stderr=subprocess.PIPE) os.unlink(cred_file) if output != 0: @@ -185,13 +206,14 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, msg = verified[mstart:mend] if msg == "": msg = output - return False, None, "ABAC credential failed to xmlsec1 verify: %s" % msg + return False, None, "ABAC credential failed to xmlsec1 verify: {}".format(msg) # Must say U.speaks_for(U)<-T if user_keyid != principal_keyid or \ tool_keyid != subject_keyid or \ - role != ('speaks_for_%s' % user_keyid): - return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T (%s)" % cred.pretty_cred() + role != ('speaks_for_{}'.format(user_keyid)): + return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T ({})"\ + .format(cred.pretty_cred()) # If schema provided, validate against schema if HAVELXML and schema and os.path.exists(schema): @@ -201,23 +223,23 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, xmlschema = etree.XMLSchema(schema_doc) if not xmlschema.validate(tree): error = xmlschema.error_log.last_error - message = "%s: %s (line %s)" % (cred.pretty_cred(), error.message, error.line) - return False, None, ("XML Credential schema invalid: %s" % message) + message = "{}: {} (line {})".format( + cred.pretty_cred(), error.message, error.line) + return False, None, ("XML Credential schema invalid: {}".format(message)) if trusted_roots: # User certificate must validate against trusted roots try: user_gid.verify_chain(trusted_roots) - except Exception, e: + except Exception as e: return False, None, \ - "Cred signer (user) cert not trusted: %s" % e + "Cred signer (user) cert not trusted: {}".format(e) # Tool certificate must validate against trusted roots try: tool_gid.verify_chain(trusted_roots) - except Exception, e: - return False, None, \ - "Tool cert not trusted: %s" % e + except Exception as e: + return False, None, "Tool cert not trusted: {}".format(e) return True, user_gid, "" @@ -227,20 +249,23 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, # # credentials is a list of GENI-style credentials: # Either a cred string xml string, or Credential object of a tuple -# [{'geni_type' : geni_type, 'geni_value : cred_value, +# [{'geni_type' : geni_type, 'geni_value : cred_value, # 'geni_version' : version}] # caller_gid is the raw X509 cert gid # options is the dictionary of API-provided options # trusted_roots is a list of Certificate objects from the system # trusted_root directory # Optionally, provide an XML schema against which to validate the credential + + def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trusted_roots, schema=None): if speaking_for_xrn: - speaking_for_urn = Xrn (speaking_for_xrn.strip()).get_urn() + speaking_for_urn = Xrn(speaking_for_xrn.strip()).get_urn() for cred in credentials: # Skip things that aren't ABAC credentials if type(cred) == dict: - if cred['geni_type'] != ABACCredential.ABAC_CREDENTIAL_TYPE: continue + if cred['geni_type'] != ABACCredential.ABAC_CREDENTIAL_TYPE: + continue cred_value = cred['geni_value'] elif isinstance(cred, Credential): if not isinstance(cred, ABACCredential): @@ -248,34 +273,36 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus else: cred_value = cred else: - if CredentialFactory.getType(cred) != ABACCredential.ABAC_CREDENTIAL_TYPE: continue + if CredentialFactory.getType(cred) != ABACCredential.ABAC_CREDENTIAL_TYPE: + continue cred_value = cred # If the cred_value is xml, create the object if not isinstance(cred_value, ABACCredential): cred = CredentialFactory.createCred(cred_value) -# print "Got a cred to check speaksfor for: %s" % cred.pretty_cred() +# print("Got a cred to check speaksfor for: {}".format(cred.pretty_cred())) # #cred.dump(True, True) -# print "Caller: %s" % caller_gid.dump_string(2, True) +# print("Caller: {}".format(caller_gid.dump_string(2, True))) # See if this is a valid speaks_for is_valid_speaks_for, user_gid, msg = \ verify_speaks_for(cred, - caller_gid, speaking_for_urn, \ - trusted_roots, schema, logger=logger) + caller_gid, speaking_for_urn, + trusted_roots, schema, logger=logger) logger.info(msg) if is_valid_speaks_for: - return user_gid # speaks-for + return user_gid # speaks-for else: - if logger: - logger.info("Got speaks-for option but not a valid speaks_for with this credential: %s" % msg) - else: - print "Got a speaks-for option but not a valid speaks_for with this credential: " + msg - return caller_gid # Not speaks-for + logger.info("Got speaks-for option but not a valid speaks_for with this credential: {}" + .format(msg)) + return caller_gid # Not speaks-for + +# Create an ABAC Speaks For credential using the ABACCredential object and +# it's encode&sign methods + -# Create an ABAC Speaks For credential using the ABACCredential object and it's encode&sign methods def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filename, dur_days=365): - print "Creating ABAC SpeaksFor using ABACCredential...\n" + logger.info("Creating ABAC SpeaksFor using ABACCredential...\n") # Write out the user cert from tempfile import mkstemp ma_str = ma_gid.save_to_string() @@ -294,9 +321,11 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam user_urn = user_gid.get_urn() user_keyid = get_cert_keyid(user_gid) tool_keyid = get_cert_keyid(tool_gid) - cred.head = ABACElement(user_keyid, user_urn, "speaks_for_%s" % user_keyid) + cred.head = ABACElement(user_keyid, user_urn, + "speaks_for_{}".format(user_keyid)) cred.tails.append(ABACElement(tool_keyid, tool_urn)) - cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(days=dur_days)) + cred.set_expiration(datetime.datetime.utcnow() + + datetime.timedelta(days=dur_days)) cred.expiration = cred.expiration.replace(microsecond=0) # Produce the cred XML @@ -306,50 +335,48 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam cred.sign() # Save it cred.save_to_file(cred_filename) - print "Created ABAC credential: '%s' in file %s" % \ - (cred.pretty_cred(), cred_filename) + logger.info("Created ABAC credential: '{}' in file {}" + .format(cred.pretty_cred(), cred_filename)) # FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted -def create_speaks_for(tool_gid, user_gid, ma_gid, \ - user_key_file, cred_filename, dur_days=365): + + +def create_speaks_for(tool_gid, user_gid, ma_gid, + user_key_file, cred_filename, dur_days=365): tool_urn = tool_gid.get_urn() user_urn = user_gid.get_urn() - header = '' - reference = "ref0" - signature_block = \ - '\n' + \ - signature_template + \ - '' - template = header + '\n' + \ - '\n' + \ - 'abac\n' + \ - '\n' +\ - '\n' + \ - '\n' + \ - '\n' + \ - '\n' + \ - '\n' + \ - '%s' +\ - '\n' + \ - '\n' + \ - '%s\n' + \ - '\n' + \ - '%s%s\n' +\ - 'speaks_for_%s\n' + \ - '\n' + \ - '\n' +\ - '%s%s\n' +\ - '\n' +\ - '\n' + \ - '\n' + \ - '\n' + \ - signature_block + \ - '\n' - + refid = "ref0" + + credential_format = """\ + + + + abac + + + + + + + {expiration_str} + + + {version} + + {user_keyid}{user_urn} + speaks_for_{user_keyid} + + + {tool_keyid}/keyid>{tool_urn} + + + + + """ + signature_format + """\ + +\ +""" credential_duration = datetime.timedelta(days=dur_days) expiration = datetime.datetime.utcnow() + credential_duration @@ -358,29 +385,28 @@ def create_speaks_for(tool_gid, user_gid, ma_gid, \ user_keyid = get_cert_keyid(user_gid) tool_keyid = get_cert_keyid(tool_gid) - unsigned_cred = template % (reference, expiration_str, version, \ - user_keyid, user_urn, user_keyid, tool_keyid, tool_urn, \ - reference, reference) + # apply the format - itself uses signature_format which uses 'refid' + unsigned_cred = credential_format.format(**locals()) unsigned_cred_filename = write_to_tempfile(unsigned_cred) # Now sign the file with xmlsec1 - # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem + # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem # --output signed.xml tosign.xml - pems = "%s,%s,%s" % (user_key_file, user_gid.get_filename(), - ma_gid.get_filename()) - xmlsec1 = cred.get_xmlsec1_path() + pems = "{},{},{}".format(user_key_file, user_gid.get_filename(), + ma_gid.get_filename()) + xmlsec1 = Credential.get_xmlsec1_path() if not xmlsec1: raise Exception("Could not locate required 'xmlsec1' program") - cmd = [ xmlsec1, '--sign', '--privkey-pem', pems, + cmd = [xmlsec1, '--sign', '--privkey-pem', pems, '--output', cred_filename, unsigned_cred_filename] -# print " ".join(cmd) +# print(" ".join(cmd)) sign_proc_output = run_subprocess(cmd, stdout=subprocess.PIPE, stderr=None) if sign_proc_output == None: - print "OUTPUT = %s" % sign_proc_output + logger.info("xmlsec1 returns empty output") else: - print "Created ABAC credential: '%s speaks_for %s' in file %s" % \ - (tool_urn, user_urn, cred_filename) + logger.info("Created ABAC credential: '{} speaks_for {}' in file {}" + .format(tool_urn, user_urn, cred_filename)) os.unlink(unsigned_cred_filename) @@ -388,19 +414,19 @@ def create_speaks_for(tool_gid, user_gid, ma_gid, \ if __name__ == "__main__": parser = optparse.OptionParser() - parser.add_option('--cred_file', + parser.add_option('--cred_file', help='Name of credential file') - parser.add_option('--tool_cert_file', + parser.add_option('--tool_cert_file', help='Name of file containing tool certificate') - parser.add_option('--user_urn', + parser.add_option('--user_urn', help='URN of speaks-for user') - parser.add_option('--user_cert_file', + parser.add_option('--user_cert_file', help="filename of x509 certificate of signing user") - parser.add_option('--ma_cert_file', + parser.add_option('--ma_cert_file', help="filename of x509 cert of MA that signed user cert") - parser.add_option('--user_key_file', + parser.add_option('--user_key_file', help="filename of private key of signing user") - parser.add_option('--trusted_roots_directory', + parser.add_option('--trusted_roots_directory', help='Directory of trusted root certs') parser.add_option('--create', help="name of file of ABAC speaksfor cred to create") @@ -413,21 +439,21 @@ if __name__ == "__main__": if options.create: if options.user_cert_file and options.user_key_file \ - and options.ma_cert_file: + and options.ma_cert_file: user_gid = GID(filename=options.user_cert_file) ma_gid = GID(filename=options.ma_cert_file) if options.useObject: - create_sign_abaccred(tool_gid, user_gid, ma_gid, \ - options.user_key_file, \ - options.create) + create_sign_abaccred(tool_gid, user_gid, ma_gid, + options.user_key_file, + options.create) else: - create_speaks_for(tool_gid, user_gid, ma_gid, \ - options.user_key_file, \ - options.create) + create_speaks_for(tool_gid, user_gid, ma_gid, + options.user_key_file, + options.create) else: - print "Usage: --create cred_file " + \ - "--user_cert_file user_cert_file" + \ - " --user_key_file user_key_file --ma_cert_file ma_cert_file" + print("Usage: --create cred_file " + + "--user_cert_file user_cert_file" + + " --user_key_file user_key_file --ma_cert_file ma_cert_file") sys.exit() user_urn = options.user_urn @@ -438,18 +464,17 @@ if __name__ == "__main__": trusted_roots_directory = options.trusted_roots_directory trusted_roots = \ - [Certificate(filename=os.path.join(trusted_roots_directory, file)) \ - for file in os.listdir(trusted_roots_directory) \ - if file.endswith('.pem') and file != 'CATedCACerts.pem'] + [Certificate(filename=os.path.join(trusted_roots_directory, file)) + for file in os.listdir(trusted_roots_directory) + if file.endswith('.pem') and file != 'CATedCACerts.pem'] cred = open(options.cred_file).read() - creds = [{'geni_type' : ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value' : cred, - 'geni_version' : '1'}] - gid = determine_speaks_for(None, creds, tool_gid, \ - {'geni_speaking_for' : user_urn}, \ - trusted_roots) - + creds = [{'geni_type': ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value': cred, + 'geni_version': '1'}] + gid = determine_speaks_for(None, creds, tool_gid, + {'geni_speaking_for': user_urn}, + trusted_roots) - print 'SPEAKS_FOR = %s' % (gid != tool_gid) - print "CERT URN = %s" % gid.get_urn() + print('SPEAKS_FOR = {}'.format(gid != tool_gid)) + print("CERT URN = {}".format(gid.get_urn()))