X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;ds=sidebyside;f=sfa%2Ftrust%2Fspeaksfor_util.py;h=12ee8d042bddbb4ee4b1e9c8b6e166f71917c17c;hb=04a3f20dc71bf8b3f96b1e3172623aa346a638a7;hp=640d5125714609ae24b10ea74458935e92414352;hpb=c1c136b3042a24604823c6da135308b7c031c234;p=sfa.git diff --git a/sfa/trust/speaksfor_util.py b/sfa/trust/speaksfor_util.py index 640d5125..12ee8d04 100644 --- a/sfa/trust/speaksfor_util.py +++ b/sfa/trust/speaksfor_util.py @@ -42,7 +42,7 @@ from sfa.trust.gid import GID from sfa.util.sfalogging import logger from sfa.util.py23 import StringIO -# Routine to validate that a speaks-for credential +# Routine to validate that a speaks-for credential # says what it claims to say: # It is a signed credential wherein the signer S is attesting to the # ABAC statement: @@ -54,6 +54,8 @@ from sfa.util.py23 import StringIO # Simple XML helper functions # Find the text associated with first child text node + + def findTextChildValue(root): child = findChildNamed(root, '#text') if child: @@ -61,6 +63,8 @@ def findTextChildValue(root): return None # Find first child with given name + + def findChildNamed(root, name): for child in root.childNodes: if child.nodeName == name: @@ -68,6 +72,8 @@ def findChildNamed(root, name): return None # Write a string to a tempfile, returning name of tempfile + + def write_to_tempfile(str): str_fd, str_file = tempfile.mkstemp() if str: @@ -76,6 +82,8 @@ def write_to_tempfile(str): return str_file # Run a subprocess and return output + + def run_subprocess(cmd, stdout, stderr): try: proc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr) @@ -86,7 +94,9 @@ def run_subprocess(cmd, stdout, stderr): output = proc.returncode return output except Exception as e: - raise Exception("Failed call to subprocess '{}': {}".format(" ".join(cmd), e)) + raise Exception( + "Failed call to subprocess '{}': {}".format(" ".join(cmd), e)) + def get_cert_keyid(gid): """Extract the subject key identifier from the given certificate. @@ -102,6 +112,8 @@ def get_cert_keyid(gid): return keyid # Pull the cert out of a list of certs in a PEM formatted cert string + + def grab_toplevel_cert(cert): start_label = '-----BEGIN CERTIFICATE-----' if cert.find(start_label) > -1: @@ -118,9 +130,9 @@ def grab_toplevel_cert(cert): # Validate that the given speaks-for credential represents the # statement User.speaks_for(User)<-Tool for the given user and tool certs # and was signed by the user -# Return: -# Boolean indicating whether the given credential -# is not expired +# Return: +# Boolean indicating whether the given credential +# is not expired # is an ABAC credential # was signed by the user associated with the speaking_for_urn # is verified by xmlsec1 @@ -130,6 +142,8 @@ def grab_toplevel_cert(cert): # String user certificate of speaking_for user if the above tests succeed # (None otherwise) # Error message indicating why the speaks_for call failed ("" otherwise) + + def verify_speaks_for(cred, tool_gid, speaking_for_urn, trusted_roots, schema=None, logger=None): @@ -154,7 +168,7 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, .format(user_urn, speaking_for_urn, cred.pretty_cred()) tails = cred.get_tails() - if len(tails) != 1: + if len(tails) != 1: return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got {} ({})"\ .format(len(tails), cred.pretty_cred()) @@ -172,11 +186,12 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, if trusted_roots: for x in trusted_roots: cert_args += ['--trusted-pem', x.filename] - # FIXME: Why do we not need to specify the --node-id option as credential.py does? + # FIXME: Why do we not need to specify the --node-id option as + # credential.py does? xmlsec1 = cred.get_xmlsec1_path() if not xmlsec1: raise Exception("Could not locate required 'xmlsec1' program") - xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [ cred_file] + xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [cred_file] output = run_subprocess(xmlsec1_args, stdout=None, stderr=subprocess.PIPE) os.unlink(cred_file) if output != 0: @@ -208,7 +223,8 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, xmlschema = etree.XMLSchema(schema_doc) if not xmlschema.validate(tree): error = xmlschema.error_log.last_error - message = "{}: {} (line {})".format(cred.pretty_cred(), error.message, error.line) + message = "{}: {} (line {})".format( + cred.pretty_cred(), error.message, error.line) return False, None, ("XML Credential schema invalid: {}".format(message)) if trusted_roots: @@ -233,20 +249,23 @@ def verify_speaks_for(cred, tool_gid, speaking_for_urn, # # credentials is a list of GENI-style credentials: # Either a cred string xml string, or Credential object of a tuple -# [{'geni_type' : geni_type, 'geni_value : cred_value, +# [{'geni_type' : geni_type, 'geni_value : cred_value, # 'geni_version' : version}] # caller_gid is the raw X509 cert gid # options is the dictionary of API-provided options # trusted_roots is a list of Certificate objects from the system # trusted_root directory # Optionally, provide an XML schema against which to validate the credential + + def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trusted_roots, schema=None): if speaking_for_xrn: - speaking_for_urn = Xrn (speaking_for_xrn.strip()).get_urn() + speaking_for_urn = Xrn(speaking_for_xrn.strip()).get_urn() for cred in credentials: # Skip things that aren't ABAC credentials if type(cred) == dict: - if cred['geni_type'] != ABACCredential.ABAC_CREDENTIAL_TYPE: continue + if cred['geni_type'] != ABACCredential.ABAC_CREDENTIAL_TYPE: + continue cred_value = cred['geni_value'] elif isinstance(cred, Credential): if not isinstance(cred, ABACCredential): @@ -254,7 +273,8 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus else: cred_value = cred else: - if CredentialFactory.getType(cred) != ABACCredential.ABAC_CREDENTIAL_TYPE: continue + if CredentialFactory.getType(cred) != ABACCredential.ABAC_CREDENTIAL_TYPE: + continue cred_value = cred # If the cred_value is xml, create the object @@ -271,13 +291,16 @@ def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trus trusted_roots, schema, logger=logger) logger.info(msg) if is_valid_speaks_for: - return user_gid # speaks-for + return user_gid # speaks-for else: logger.info("Got speaks-for option but not a valid speaks_for with this credential: {}" .format(msg)) - return caller_gid # Not speaks-for + return caller_gid # Not speaks-for + +# Create an ABAC Speaks For credential using the ABACCredential object and +# it's encode&sign methods + -# Create an ABAC Speaks For credential using the ABACCredential object and it's encode&sign methods def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filename, dur_days=365): logger.info("Creating ABAC SpeaksFor using ABACCredential...\n") # Write out the user cert @@ -298,9 +321,11 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam user_urn = user_gid.get_urn() user_keyid = get_cert_keyid(user_gid) tool_keyid = get_cert_keyid(tool_gid) - cred.head = ABACElement(user_keyid, user_urn, "speaks_for_{}".format(user_keyid)) + cred.head = ABACElement(user_keyid, user_urn, + "speaks_for_{}".format(user_keyid)) cred.tails.append(ABACElement(tool_keyid, tool_urn)) - cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(days=dur_days)) + cred.set_expiration(datetime.datetime.utcnow() + + datetime.timedelta(days=dur_days)) cred.expiration = cred.expiration.replace(microsecond=0) # Produce the cred XML @@ -314,7 +339,9 @@ def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filenam .format(cred.pretty_cred(), cred_filename)) # FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted -def create_speaks_for(tool_gid, user_gid, ma_gid, + + +def create_speaks_for(tool_gid, user_gid, ma_gid, user_key_file, cred_filename, dur_days=365): tool_urn = tool_gid.get_urn() user_urn = user_gid.get_urn() @@ -363,14 +390,14 @@ def create_speaks_for(tool_gid, user_gid, ma_gid, unsigned_cred_filename = write_to_tempfile(unsigned_cred) # Now sign the file with xmlsec1 - # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem + # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem # --output signed.xml tosign.xml pems = "{},{},{}".format(user_key_file, user_gid.get_filename(), ma_gid.get_filename()) xmlsec1 = Credential.get_xmlsec1_path() if not xmlsec1: raise Exception("Could not locate required 'xmlsec1' program") - cmd = [ xmlsec1, '--sign', '--privkey-pem', pems, + cmd = [xmlsec1, '--sign', '--privkey-pem', pems, '--output', cred_filename, unsigned_cred_filename] # print(" ".join(cmd)) @@ -387,19 +414,19 @@ def create_speaks_for(tool_gid, user_gid, ma_gid, if __name__ == "__main__": parser = optparse.OptionParser() - parser.add_option('--cred_file', + parser.add_option('--cred_file', help='Name of credential file') - parser.add_option('--tool_cert_file', + parser.add_option('--tool_cert_file', help='Name of file containing tool certificate') - parser.add_option('--user_urn', + parser.add_option('--user_urn', help='URN of speaks-for user') - parser.add_option('--user_cert_file', + parser.add_option('--user_cert_file', help="filename of x509 certificate of signing user") - parser.add_option('--ma_cert_file', + parser.add_option('--ma_cert_file', help="filename of x509 cert of MA that signed user cert") - parser.add_option('--user_key_file', + parser.add_option('--user_key_file', help="filename of private key of signing user") - parser.add_option('--trusted_roots_directory', + parser.add_option('--trusted_roots_directory', help='Directory of trusted root certs') parser.add_option('--create', help="name of file of ABAC speaksfor cred to create") @@ -412,7 +439,7 @@ if __name__ == "__main__": if options.create: if options.user_cert_file and options.user_key_file \ - and options.ma_cert_file: + and options.ma_cert_file: user_gid = GID(filename=options.user_cert_file) ma_gid = GID(filename=options.ma_cert_file) if options.useObject: @@ -424,8 +451,8 @@ if __name__ == "__main__": options.user_key_file, options.create) else: - print("Usage: --create cred_file " + - "--user_cert_file user_cert_file" + + print("Usage: --create cred_file " + + "--user_cert_file user_cert_file" + " --user_key_file user_key_file --ma_cert_file ma_cert_file") sys.exit() @@ -437,18 +464,17 @@ if __name__ == "__main__": trusted_roots_directory = options.trusted_roots_directory trusted_roots = \ - [Certificate(filename=os.path.join(trusted_roots_directory, file)) - for file in os.listdir(trusted_roots_directory) - if file.endswith('.pem') and file != 'CATedCACerts.pem'] + [Certificate(filename=os.path.join(trusted_roots_directory, file)) + for file in os.listdir(trusted_roots_directory) + if file.endswith('.pem') and file != 'CATedCACerts.pem'] cred = open(options.cred_file).read() - creds = [{'geni_type' : ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value' : cred, - 'geni_version' : '1'}] + creds = [{'geni_type': ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value': cred, + 'geni_version': '1'}] gid = determine_speaks_for(None, creds, tool_gid, - {'geni_speaking_for' : user_urn}, + {'geni_speaking_for': user_urn}, trusted_roots) - print('SPEAKS_FOR = {}'.format(gid != tool_gid)) print("CERT URN = {}".format(gid.get_urn()))