from sfa.util.sfalogging import logger
from sfa.util.py23 import StringIO
-# Routine to validate that a speaks-for credential
+# Routine to validate that a speaks-for credential
# says what it claims to say:
# It is a signed credential wherein the signer S is attesting to the
# ABAC statement:
# Simple XML helper functions
# Find the text associated with first child text node
+
+
def findTextChildValue(root):
child = findChildNamed(root, '#text')
if child:
return None
# Find first child with given name
+
+
def findChildNamed(root, name):
for child in root.childNodes:
if child.nodeName == name:
return None
# Write a string to a tempfile, returning name of tempfile
+
+
def write_to_tempfile(str):
str_fd, str_file = tempfile.mkstemp()
if str:
return str_file
# Run a subprocess and return output
+
+
def run_subprocess(cmd, stdout, stderr):
try:
proc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr)
output = proc.returncode
return output
except Exception as e:
- raise Exception("Failed call to subprocess '{}': {}".format(" ".join(cmd), e))
+ raise Exception(
+ "Failed call to subprocess '{}': {}".format(" ".join(cmd), e))
+
def get_cert_keyid(gid):
"""Extract the subject key identifier from the given certificate.
return keyid
# Pull the cert out of a list of certs in a PEM formatted cert string
+
+
def grab_toplevel_cert(cert):
start_label = '-----BEGIN CERTIFICATE-----'
if cert.find(start_label) > -1:
# Validate that the given speaks-for credential represents the
# statement User.speaks_for(User)<-Tool for the given user and tool certs
# and was signed by the user
-# Return:
-# Boolean indicating whether the given credential
-# is not expired
+# Return:
+# Boolean indicating whether the given credential
+# is not expired
# is an ABAC credential
# was signed by the user associated with the speaking_for_urn
# is verified by xmlsec1
# String user certificate of speaking_for user if the above tests succeed
# (None otherwise)
# Error message indicating why the speaks_for call failed ("" otherwise)
+
+
def verify_speaks_for(cred, tool_gid, speaking_for_urn,
trusted_roots, schema=None, logger=None):
.format(user_urn, speaking_for_urn, cred.pretty_cred())
tails = cred.get_tails()
- if len(tails) != 1:
+ if len(tails) != 1:
return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got {} ({})"\
.format(len(tails), cred.pretty_cred())
if trusted_roots:
for x in trusted_roots:
cert_args += ['--trusted-pem', x.filename]
- # FIXME: Why do we not need to specify the --node-id option as credential.py does?
+ # FIXME: Why do we not need to specify the --node-id option as
+ # credential.py does?
xmlsec1 = cred.get_xmlsec1_path()
if not xmlsec1:
raise Exception("Could not locate required 'xmlsec1' program")
- xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [ cred_file]
+ xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [cred_file]
output = run_subprocess(xmlsec1_args, stdout=None, stderr=subprocess.PIPE)
os.unlink(cred_file)
if output != 0:
xmlschema = etree.XMLSchema(schema_doc)
if not xmlschema.validate(tree):
error = xmlschema.error_log.last_error
- message = "{}: {} (line {})".format(cred.pretty_cred(), error.message, error.line)
+ message = "{}: {} (line {})".format(
+ cred.pretty_cred(), error.message, error.line)
return False, None, ("XML Credential schema invalid: {}".format(message))
if trusted_roots:
#
# credentials is a list of GENI-style credentials:
# Either a cred string xml string, or Credential object of a tuple
-# [{'geni_type' : geni_type, 'geni_value : cred_value,
+# [{'geni_type' : geni_type, 'geni_value : cred_value,
# 'geni_version' : version}]
# caller_gid is the raw X509 cert gid
# options is the dictionary of API-provided options
# trusted_roots is a list of Certificate objects from the system
# trusted_root directory
# Optionally, provide an XML schema against which to validate the credential
+
+
def determine_speaks_for(logger, credentials, caller_gid, speaking_for_xrn, trusted_roots, schema=None):
if speaking_for_xrn:
- speaking_for_urn = Xrn (speaking_for_xrn.strip()).get_urn()
+ speaking_for_urn = Xrn(speaking_for_xrn.strip()).get_urn()
for cred in credentials:
# Skip things that aren't ABAC credentials
if type(cred) == dict:
- if cred['geni_type'] != ABACCredential.ABAC_CREDENTIAL_TYPE: continue
+ if cred['geni_type'] != ABACCredential.ABAC_CREDENTIAL_TYPE:
+ continue
cred_value = cred['geni_value']
elif isinstance(cred, Credential):
if not isinstance(cred, ABACCredential):
else:
cred_value = cred
else:
- if CredentialFactory.getType(cred) != ABACCredential.ABAC_CREDENTIAL_TYPE: continue
+ if CredentialFactory.getType(cred) != ABACCredential.ABAC_CREDENTIAL_TYPE:
+ continue
cred_value = cred
# If the cred_value is xml, create the object
trusted_roots, schema, logger=logger)
logger.info(msg)
if is_valid_speaks_for:
- return user_gid # speaks-for
+ return user_gid # speaks-for
else:
logger.info("Got speaks-for option but not a valid speaks_for with this credential: {}"
.format(msg))
- return caller_gid # Not speaks-for
+ return caller_gid # Not speaks-for
+
+# Create an ABAC Speaks For credential using the ABACCredential object and
+# it's encode&sign methods
+
-# Create an ABAC Speaks For credential using the ABACCredential object and it's encode&sign methods
def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filename, dur_days=365):
logger.info("Creating ABAC SpeaksFor using ABACCredential...\n")
# Write out the user cert
user_urn = user_gid.get_urn()
user_keyid = get_cert_keyid(user_gid)
tool_keyid = get_cert_keyid(tool_gid)
- cred.head = ABACElement(user_keyid, user_urn, "speaks_for_{}".format(user_keyid))
+ cred.head = ABACElement(user_keyid, user_urn,
+ "speaks_for_{}".format(user_keyid))
cred.tails.append(ABACElement(tool_keyid, tool_urn))
- cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(days=dur_days))
+ cred.set_expiration(datetime.datetime.utcnow() +
+ datetime.timedelta(days=dur_days))
cred.expiration = cred.expiration.replace(microsecond=0)
# Produce the cred XML
.format(cred.pretty_cred(), cred_filename))
# FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted
-def create_speaks_for(tool_gid, user_gid, ma_gid,
+
+
+def create_speaks_for(tool_gid, user_gid, ma_gid,
user_key_file, cred_filename, dur_days=365):
tool_urn = tool_gid.get_urn()
user_urn = user_gid.get_urn()
unsigned_cred_filename = write_to_tempfile(unsigned_cred)
# Now sign the file with xmlsec1
- # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem
+ # xmlsec1 --sign --privkey-pem privkey.pem,cert.pem
# --output signed.xml tosign.xml
pems = "{},{},{}".format(user_key_file, user_gid.get_filename(),
ma_gid.get_filename())
xmlsec1 = Credential.get_xmlsec1_path()
if not xmlsec1:
raise Exception("Could not locate required 'xmlsec1' program")
- cmd = [ xmlsec1, '--sign', '--privkey-pem', pems,
+ cmd = [xmlsec1, '--sign', '--privkey-pem', pems,
'--output', cred_filename, unsigned_cred_filename]
# print(" ".join(cmd))
if __name__ == "__main__":
parser = optparse.OptionParser()
- parser.add_option('--cred_file',
+ parser.add_option('--cred_file',
help='Name of credential file')
- parser.add_option('--tool_cert_file',
+ parser.add_option('--tool_cert_file',
help='Name of file containing tool certificate')
- parser.add_option('--user_urn',
+ parser.add_option('--user_urn',
help='URN of speaks-for user')
- parser.add_option('--user_cert_file',
+ parser.add_option('--user_cert_file',
help="filename of x509 certificate of signing user")
- parser.add_option('--ma_cert_file',
+ parser.add_option('--ma_cert_file',
help="filename of x509 cert of MA that signed user cert")
- parser.add_option('--user_key_file',
+ parser.add_option('--user_key_file',
help="filename of private key of signing user")
- parser.add_option('--trusted_roots_directory',
+ parser.add_option('--trusted_roots_directory',
help='Directory of trusted root certs')
parser.add_option('--create',
help="name of file of ABAC speaksfor cred to create")
if options.create:
if options.user_cert_file and options.user_key_file \
- and options.ma_cert_file:
+ and options.ma_cert_file:
user_gid = GID(filename=options.user_cert_file)
ma_gid = GID(filename=options.ma_cert_file)
if options.useObject:
options.user_key_file,
options.create)
else:
- print("Usage: --create cred_file " +
- "--user_cert_file user_cert_file" +
+ print("Usage: --create cred_file " +
+ "--user_cert_file user_cert_file" +
" --user_key_file user_key_file --ma_cert_file ma_cert_file")
sys.exit()
trusted_roots_directory = options.trusted_roots_directory
trusted_roots = \
- [Certificate(filename=os.path.join(trusted_roots_directory, file))
- for file in os.listdir(trusted_roots_directory)
- if file.endswith('.pem') and file != 'CATedCACerts.pem']
+ [Certificate(filename=os.path.join(trusted_roots_directory, file))
+ for file in os.listdir(trusted_roots_directory)
+ if file.endswith('.pem') and file != 'CATedCACerts.pem']
cred = open(options.cred_file).read()
- creds = [{'geni_type' : ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value' : cred,
- 'geni_version' : '1'}]
+ creds = [{'geni_type': ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value': cred,
+ 'geni_version': '1'}]
gid = determine_speaks_for(None, creds, tool_gid,
- {'geni_speaking_for' : user_urn},
+ {'geni_speaking_for': user_urn},
trusted_roots)
-
print('SPEAKS_FOR = {}'.format(gid != tool_gid))
print("CERT URN = {}".format(gid.get_urn()))