# IN THE WORK.
#----------------------------------------------------------------------
+from __future__ import print_function
+
import datetime
from dateutil import parser as du_parser, tz as du_tz
import optparse
from xml.dom.minidom import *
from StringIO import StringIO
+from sfa.util.sfatime import SFATIME_FORMAT
+
from sfa.trust.certificate import Certificate
from sfa.trust.credential import Credential, signature_template, HAVELXML
from sfa.trust.abac_credential import ABACCredential, ABACElement
from sfa.trust.credential_factory import CredentialFactory
from sfa.trust.gid import GID
+from sfa.util.sfalogging import logger
# Routine to validate that a speaks-for credential
# says what it claims to say:
# Credential has not expired
if cred.expiration and cred.expiration < datetime.datetime.utcnow():
- return False, None, "ABAC Credential expired at %s (%s)" % (cred.expiration.isoformat(), cred.get_summary_tostring())
+ return False, None, "ABAC Credential expired at %s (%s)" % (cred.expiration.strftime(SFATIME_FORMAT), cred.pretty_cred())
# Must be ABAC
if cred.get_cred_type() != ABACCredential.ABAC_CREDENTIAL_TYPE:
return False, None, "Credential not of type ABAC but %s" % cred.get_cred_type
if cred.signature is None or cred.signature.gid is None:
- return False, None, "Credential malformed: missing signature or signer cert. Cred: %s" % cred.get_summary_tostring()
+ return False, None, "Credential malformed: missing signature or signer cert. Cred: %s" % cred.pretty_cred()
user_gid = cred.signature.gid
user_urn = user_gid.get_urn()
# URN of signer from cert must match URN of 'speaking-for' argument
if user_urn != speaking_for_urn:
return False, None, "User URN from cred doesn't match speaking_for URN: %s != %s (cred %s)" % \
- (user_urn, speaking_for_urn, cred.get_summary_tostring())
+ (user_urn, speaking_for_urn, cred.pretty_cred())
tails = cred.get_tails()
if len(tails) != 1:
return False, None, "Invalid ABAC-SF credential: Need exactly 1 tail element, got %d (%s)" % \
- (len(tails), cred.get_summary_tostring())
+ (len(tails), cred.pretty_cred())
user_keyid = get_cert_keyid(user_gid)
tool_keyid = get_cert_keyid(tool_gid)
for x in trusted_roots:
cert_args += ['--trusted-pem', x.filename]
# FIXME: Why do we not need to specify the --node-id option as credential.py does?
- xmlsec1_args = [cred.xmlsec_path, '--verify'] + cert_args + [ cred_file]
+ xmlsec1 = cred.get_xmlsec1_path()
+ if not xmlsec1:
+ raise Exception("Could not locate required 'xmlsec1' program")
+ xmlsec1_args = [xmlsec1, '--verify'] + cert_args + [ cred_file]
output = run_subprocess(xmlsec1_args, stdout=None, stderr=subprocess.PIPE)
os.unlink(cred_file)
if output != 0:
if user_keyid != principal_keyid or \
tool_keyid != subject_keyid or \
role != ('speaks_for_%s' % user_keyid):
- return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T (%s)" % cred.get_summary_tostring()
+ return False, None, "ABAC statement doesn't assert U.speaks_for(U)<-T (%s)" % cred.pretty_cred()
# If schema provided, validate against schema
if HAVELXML and schema and os.path.exists(schema):
xmlschema = etree.XMLSchema(schema_doc)
if not xmlschema.validate(tree):
error = xmlschema.error_log.last_error
- message = "%s: %s (line %s)" % (cred.get_summary_tostring(), error.message, error.line)
+ message = "%s: %s (line %s)" % (cred.pretty_cred(), error.message, error.line)
return False, None, ("XML Credential schema invalid: %s" % message)
if trusted_roots:
if not isinstance(cred_value, ABACCredential):
cred = CredentialFactory.createCred(cred_value)
-# print "Got a cred to check speaksfor for: %s" % cred.get_summary_tostring()
+# print("Got a cred to check speaksfor for: %s" % cred.pretty_cred())
# #cred.dump(True, True)
-# print "Caller: %s" % caller_gid.dump_string(2, True)
+# print("Caller: %s" % caller_gid.dump_string(2, True))
# See if this is a valid speaks_for
is_valid_speaks_for, user_gid, msg = \
verify_speaks_for(cred,
- caller_gid, speaking_for_urn, \
- trusted_roots, schema, logger=logger)
+ caller_gid, speaking_for_urn,
+ trusted_roots, schema, logger=logger)
logger.info(msg)
if is_valid_speaks_for:
return user_gid # speaks-for
else:
- if logger:
- logger.info("Got speaks-for option but not a valid speaks_for with this credential: %s" % msg)
- else:
- print "Got a speaks-for option but not a valid speaks_for with this credential: " + msg
+ logger.info("Got speaks-for option but not a valid speaks_for with this credential: %s" % msg)
return caller_gid # Not speaks-for
# Create an ABAC Speaks For credential using the ABACCredential object and it's encode&sign methods
def create_sign_abaccred(tool_gid, user_gid, ma_gid, user_key_file, cred_filename, dur_days=365):
- print "Creating ABAC SpeaksFor using ABACCredential...\n"
+ logger.info("Creating ABAC SpeaksFor using ABACCredential...\n")
# Write out the user cert
from tempfile import mkstemp
ma_str = ma_gid.save_to_string()
cred.sign()
# Save it
cred.save_to_file(cred_filename)
- print "Created ABAC credential: '%s' in file %s" % \
- (cred.get_summary_tostring(), cred_filename)
+ logger.info("Created ABAC credential: '%s' in file %s" %
+ (cred.pretty_cred(), cred_filename))
-# FIXME: Assumes xmlsec1 is on path
# FIXME: Assumes signer is itself signed by an 'ma_gid' that can be trusted
-def create_speaks_for(tool_gid, user_gid, ma_gid, \
- user_key_file, cred_filename, dur_days=365):
+def create_speaks_for(tool_gid, user_gid, ma_gid,
+ user_key_file, cred_filename, dur_days=365):
tool_urn = tool_gid.get_urn()
user_urn = user_gid.get_urn()
credential_duration = datetime.timedelta(days=dur_days)
expiration = datetime.datetime.utcnow() + credential_duration
- expiration_str = expiration.strftime('%Y-%m-%dT%H:%M:%SZ') # FIXME: libabac can't handle .isoformat()
+ expiration_str = expiration.strftime(SFATIME_FORMAT)
version = "1.1"
user_keyid = get_cert_keyid(user_gid)
tool_keyid = get_cert_keyid(tool_gid)
- unsigned_cred = template % (reference, expiration_str, version, \
- user_keyid, user_urn, user_keyid, tool_keyid, tool_urn, \
- reference, reference)
+ unsigned_cred = template % (reference, expiration_str, version,
+ user_keyid, user_urn, user_keyid, tool_keyid, tool_urn,
+ reference, reference)
unsigned_cred_filename = write_to_tempfile(unsigned_cred)
# Now sign the file with xmlsec1
# --output signed.xml tosign.xml
pems = "%s,%s,%s" % (user_key_file, user_gid.get_filename(),
ma_gid.get_filename())
- # FIXME: assumes xmlsec1 is on path
- cmd = ['xmlsec1', '--sign', '--privkey-pem', pems,
+ xmlsec1 = Credential.get_xmlsec1_path()
+ if not xmlsec1:
+ raise Exception("Could not locate required 'xmlsec1' program")
+ cmd = [ xmlsec1, '--sign', '--privkey-pem', pems,
'--output', cred_filename, unsigned_cred_filename]
-# print " ".join(cmd)
+# print(" ".join(cmd))
sign_proc_output = run_subprocess(cmd, stdout=subprocess.PIPE, stderr=None)
if sign_proc_output == None:
- print "OUTPUT = %s" % sign_proc_output
+ logger.info("xmlsec1 returns empty output")
else:
- print "Created ABAC credential: '%s speaks_for %s' in file %s" % \
- (tool_urn, user_urn, cred_filename)
+ logger.info("Created ABAC credential: '%s speaks_for %s' in file %s" %
+ (tool_urn, user_urn, cred_filename))
os.unlink(unsigned_cred_filename)
user_gid = GID(filename=options.user_cert_file)
ma_gid = GID(filename=options.ma_cert_file)
if options.useObject:
- create_sign_abaccred(tool_gid, user_gid, ma_gid, \
- options.user_key_file, \
- options.create)
+ create_sign_abaccred(tool_gid, user_gid, ma_gid,
+ options.user_key_file,
+ options.create)
else:
- create_speaks_for(tool_gid, user_gid, ma_gid, \
- options.user_key_file, \
- options.create)
+ create_speaks_for(tool_gid, user_gid, ma_gid,
+ options.user_key_file,
+ options.create)
else:
- print "Usage: --create cred_file " + \
- "--user_cert_file user_cert_file" + \
- " --user_key_file user_key_file --ma_cert_file ma_cert_file"
+ print("Usage: --create cred_file " +
+ "--user_cert_file user_cert_file" +
+ " --user_key_file user_key_file --ma_cert_file ma_cert_file")
sys.exit()
user_urn = options.user_urn
trusted_roots_directory = options.trusted_roots_directory
trusted_roots = \
- [Certificate(filename=os.path.join(trusted_roots_directory, file)) \
- for file in os.listdir(trusted_roots_directory) \
+ [Certificate(filename=os.path.join(trusted_roots_directory, file))
+ for file in os.listdir(trusted_roots_directory)
if file.endswith('.pem') and file != 'CATedCACerts.pem']
cred = open(options.cred_file).read()
creds = [{'geni_type' : ABACCredential.ABAC_CREDENTIAL_TYPE, 'geni_value' : cred,
'geni_version' : '1'}]
- gid = determine_speaks_for(None, creds, tool_gid, \
- {'geni_speaking_for' : user_urn}, \
- trusted_roots)
+ gid = determine_speaks_for(None, creds, tool_gid,
+ {'geni_speaking_for' : user_urn},
+ trusted_roots)
- print 'SPEAKS_FOR = %s' % (gid != tool_gid)
- print "CERT URN = %s" % gid.get_urn()
+ print('SPEAKS_FOR = %s' % (gid != tool_gid))
+ print("CERT URN = %s" % gid.get_urn())