-#----------------------------------------------------------------------
+# ----------------------------------------------------------------------
# Copyright (c) 2008 Board of Trustees, Princeton University
#
# Permission is hereby granted, free of charge, to any person obtaining
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
# IN THE WORK.
-#----------------------------------------------------------------------
+# ----------------------------------------------------------------------
##
# Implements SFA Credentials
#
-# Credentials are signed XML files that assign a subject gid privileges to an object gid
+# Credentials are signed XML files that assign a subject gid
+# privileges to an object gid
##
from __future__ import print_function
from sfa.util.py23 import PY3, StringType, StringIO
-HAVELXML = False
-try:
- from lxml import etree
- HAVELXML = True
-except:
- pass
-
from xml.parsers.expat import ExpatError
-from sfa.util.faults import CredentialNotVerifiable, ChildRightsNotSubsetOfParent
+from sfa.util.faults import (CredentialNotVerifiable,
+ ChildRightsNotSubsetOfParent)
from sfa.util.sfalogging import logger
from sfa.util.sfatime import utcparse, SFATIME_FORMAT
from sfa.trust.rights import Right, Rights, determine_rights
from sfa.trust.gid import GID
from sfa.util.xrn import urn_to_hrn, hrn_authfor_hrn
+HAVELXML = False
+try:
+ from lxml import etree
+ HAVELXML = True
+except:
+ pass
+
+
# 31 days, in seconds
DEFAULT_CREDENTIAL_LIFETIME = 86400 * 31
if len(cert.childNodes) > 0:
szgid = cert.childNodes[0].nodeValue
szgid = szgid.strip()
- szgid = "-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----".format(
+ szgid = "-----BEGIN CERTIFICATE-----\n"\
+ "{}\n-----END CERTIFICATE-----".format(
szgid)
if gids is None:
gids = szgid
# A credential provides a caller gid with privileges to an object gid.
# A signed credential is signed by the object's authority.
#
-# Credentials are encoded in one of two ways.
-# The legacy style (now unsupported) places it in the subjectAltName of an X509 certificate.
+# Credentials are encoded in one of two ways. The legacy style (now
+# unsupported) places it in the subjectAltName of an X509 certificate.
# The new credentials are placed in signed XML.
#
# WARNING:
# Create a Credential object
#
# @param create If true, create a blank x509 certificate
- # @param subject If subject!=None, create an x509 cert with the subject name
+ # @param subject If subject!=None,
+ # create an x509 cert with the subject name
# @param string If string!=None, load the credential from the string
# @param filename If filename!=None, load the credential from the file
# FIXME: create and subject are ignored!
- def __init__(self, create=False, subject=None, string=None, filename=None, cred=None):
+ def __init__(self, create=False, subject=None, string=None,
+ filename=None, cred=None):
self.gidCaller = None
self.gidObject = None
self.expiration = None
# if this is a legacy credential, write error and bail out
if isinstance(str, StringType) and str.strip().startswith("-----"):
logger.error(
- "Legacy credentials not supported any more - giving up with {}...".format(str[:10]))
+ "Legacy credentials not supported any more "
+ "- giving up with {}..."
+ .format(str[:10]))
return
else:
self.xml = str
break
if not Credential.xmlsec1_path:
logger.error(
- "Could not locate required binary 'xmlsec1' - SFA will be unable to sign stuff !!")
+ "Could not locate required binary 'xmlsec1' -"
+ "SFA will be unable to sign stuff !!")
return Credential.xmlsec1_path
def get_subject(self):
caller = self.gidCaller.pretty_cert()
exp = self.get_expiration()
# Summarize the rights too? The issuer?
- return "[Cred. for {caller} rights on {obj} until {exp} ]".format(**locals())
+ return "[Cred. for {caller} rights on {obj} until {exp} ]"\
+ .format(**locals())
def get_signature(self):
if not self.signature:
self.decode()
return self.gidObject
- ##
- # Expiration: an absolute UTC time of expiration (as either an int or string or datetime)
+ #
+ # Expiration: an absolute UTC time of expiration (as either an int
+ # or string or datetime)
#
def set_expiration(self, expiration):
expiration_datetime = utcparse(expiration)
self.expiration = expiration_datetime
else:
logger.error(
- "unexpected input {} in Credential.set_expiration".format(expiration))
+ "unexpected input {} in Credential.set_expiration"
+ .format(expiration))
##
# get the lifetime of the credential (always in datetime format)
##
# set the privileges
#
- # @param privs either a comma-separated list of privileges of a Rights object
+ # @param privs either a comma-separated list of privileges of a
+ # Rights object
def set_privileges(self, privs):
if isinstance(privs, str):
# determine whether the credential allows a particular operation to be
# performed
#
- # @param op_name string specifying name of operation ("lookup", "update", etc)
+ # @param op_name string specifying name of operation
+ # ("lookup", "update", etc)
def can_perform(self, op_name):
rights = self.get_privileges()
"xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
# FIXME: See v2 schema at
# www.geni.net/resources/credential/2/credential.xsd
- signed_cred.setAttribute("xsi:noNamespaceSchemaLocation",
- "http://www.planet-lab.org/resources/sfa/credential.xsd")
signed_cred.setAttribute(
- "xsi:schemaLocation", "http://www.planet-lab.org/resources/sfa/ext/policy/1 http://www.planet-lab.org/resources/sfa/ext/policy/1/policy.xsd")
+ "xsi:noNamespaceSchemaLocation",
+ "http://www.planet-lab.org/resources/sfa/credential.xsd")
+ signed_cred.setAttribute(
+ "xsi:schemaLocation",
+ "http://www.planet-lab.org/resources/sfa/ext/policy/1 "
+ "http://www.planet-lab.org/resources/sfa/ext/policy/1/policy.xsd")
# PG says for those last 2:
- # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation", "http://www.protogeni.net/resources/credential/credential.xsd")
- # signed_cred.setAttribute("xsi:schemaLocation", "http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
+ # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation",
+ # "http://www.protogeni.net/resources/credential/credential.xsd")
+ # signed_cred.setAttribute("xsi:schemaLocation",
+ # "http://www.protogeni.net/resources/credential/ext/policy/1 "
+ # "http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
doc.appendChild(signed_cred)
self.set_expiration(datetime.datetime.utcnow(
) + datetime.timedelta(seconds=DEFAULT_CREDENTIAL_LIFETIME))
self.expiration = self.expiration.replace(microsecond=0)
- if self.expiration.tzinfo is not None and self.expiration.tzinfo.utcoffset(self.expiration) is not None:
+ if self.expiration.tzinfo is not None \
+ and self.expiration.tzinfo.utcoffset(self.expiration) is not None:
# TZ aware. Make sure it is UTC - by Aaron Helsinger
self.expiration = self.expiration.astimezone(tz.tzutc())
append_sub(doc, cred, "expires",
# If the root node is a signed-credential (it should be), then
# get all its attributes and attach those to our signed_cred
# node.
- # Specifically, PG and PLadd attributes for namespaces (which is reasonable),
+ # Specifically, PG and PL add attributes for namespaces
+ # (which is reasonable),
# and we need to include those again here or else their signature
# no longer matches on the credential.
# We expect three of these, but here we copy them all:
- # signed_cred.setAttribute("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
+ # signed_cred.setAttribute("xmlns:xsi",
+ # "http://www.w3.org/2001/XMLSchema-instance")
# and from PG (PL is equivalent, as shown above):
- # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation", "http://www.protogeni.net/resources/credential/credential.xsd")
- # signed_cred.setAttribute("xsi:schemaLocation", "http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
+ # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation",
+ # "http://www.protogeni.net/resources/credential/credential.xsd")
+ # signed_cred.setAttribute("xsi:schemaLocation",
+ # "http://www.protogeni.net/resources/credential/ext/policy/1 "
+ # "http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
# HOWEVER!
# PL now also declares these, with different URLs, so
# the contents of the schemas are the same,
# or something else, but it seems odd. And this works.
parentRoot = sdoc.documentElement
- if parentRoot.tagName == "signed-credential" and parentRoot.hasAttributes():
+ if parentRoot.tagName == "signed-credential" and \
+ parentRoot.hasAttributes():
for attrIx in range(0, parentRoot.attributes.length):
attr = parentRoot.attributes.item(attrIx)
# returns the old attribute of same name that was
if oldAttr and oldAttr.value != attr.value:
msg = "Delegating cred from owner {} to {} over {}:\n"
"- Replaced attribute {} value '{}' with '{}'"\
- .format(self.parent.gidCaller.get_urn(), self.gidCaller.get_urn(),
- self.gidObject.get_urn(), oldAttr.name, oldAttr.value, attr.value)
- logger.warn(msg)
- #raise CredentialNotVerifiable("Can't encode new valid delegated credential: {}".format(msg))
+ .format(self.parent.gidCaller.get_urn(),
+ self.gidCaller.get_urn(),
+ self.gidObject.get_urn(),
+ oldAttr.name, oldAttr.value, attr.value)
+ logger.warning(msg)
+ # raise CredentialNotVerifiable(
+ # "Can't encode new valid delegated credential: {}"
+ # .format(msg))
p_cred = doc.importNode(
sdoc.getElementsByTagName("credential")[0], True)
def sign(self):
if not self.issuer_privkey:
- logger.warn("Cannot sign credential (no private key)")
+ logger.warning("Cannot sign credential (no private key)")
return
if not self.issuer_gid:
- logger.warn("Cannot sign credential (no issuer gid)")
+ logger.warning("Cannot sign credential (no issuer gid)")
return
doc = parseString(self.get_xml())
sigs = doc.getElementsByTagName("signatures")[0]
if not xmlsec1:
raise Exception("Could not locate required 'xmlsec1' program")
command = '{} --sign --node-id "{}" --privkey-pem {},{} {}' \
- .format(xmlsec1, ref, self.issuer_privkey, ",".join(gid_files), filename)
+ .format(xmlsec1, ref, self.issuer_privkey,
+ ",".join(gid_files), filename)
signed = os.popen(command).read()
os.remove(filename)
kind = getTextNode(priv, "name")
deleg = str2bool(getTextNode(priv, "can_delegate"))
if kind == '*':
- # Convert * into the default privileges for the credential's type
+ # Convert * into the default privileges
+ # for the credential's type
# Each inherits the delegatability from the * above
_, type = urn_to_hrn(self.gidObject.get_urn())
rl = determine_rights(type, self.gidObject.get_urn())
##
# Verify
# trusted_certs: A list of trusted GID filenames (not GID objects!)
- # Chaining is not supported within the GIDs by xmlsec1.
+ # Chaining is not supported within the GIDs by xmlsec1.
#
# trusted_certs_required: Should usually be true. Set False means an
- # empty list of trusted_certs would still let this method pass.
- # It just skips xmlsec1 verification et al. Only used by some utils
+ # empty list of trusted_certs would still let this method pass.
+ # It just skips xmlsec1 verification et al.
+ # Only used by some utils
#
# Verify that:
# . All of the signatures are valid and that the issuers trace back
#
# -- For Delegates (credentials with parents)
# . The privileges must be a subset of the parent credentials
- # . The privileges must have "can_delegate" set for each delegated privilege
+ # . The privileges must have "can_delegate"
+ # set for each delegated privilege
# . The target gid must be the same between child and parents
# . The expiry time on the child must be no later than the parent
# . The signer of the child must be the owner of the parent
# must be done elsewhere
#
# @param trusted_certs: The certificates of trusted CA certificates
- def verify(self, trusted_certs=None, schema=None, trusted_certs_required=True):
+ def verify(self, trusted_certs=None, schema=None,
+ trusted_certs_required=True):
if not self.xml:
self.decode()
xmlschema = etree.XMLSchema(schema_doc)
if not xmlschema.validate(tree):
error = xmlschema.error_log.last_error
- message = "{}: {} (line {})".format(self.pretty_cred(),
- error.message, error.line)
+ message = "{}: {} (line {})"\
+ .format(self.pretty_cred(),
+ error.message, error.line)
raise CredentialNotVerifiable(message)
if trusted_certs_required and trusted_certs is None:
# trusted_cert_objects = [GID(filename=f) for f in trusted_certs]
trusted_cert_objects = []
ok_trusted_certs = []
- # If caller explicitly passed in None that means skip cert chain validation.
- # Strange and not typical
+ # If caller explicitly passed in None, that means
+ # skip cert chain validation. Strange and not typical
if trusted_certs is not None:
for f in trusted_certs:
try:
trusted_cert_objects.append(GID(filename=f))
ok_trusted_certs.append(f)
except Exception as exc:
- logger.error(
- "Failed to load trusted cert from {}: {}".format(f, exc))
+ logger.exception(
+ "Failed to load trusted cert from {}".format(f))
trusted_certs = ok_trusted_certs
# make sure it is not expired
if self.get_expiration() < datetime.datetime.utcnow():
- raise CredentialNotVerifiable("Credential {} expired at {}"
- .format(self.pretty_cred(),
- self.expiration.strftime(SFATIME_FORMAT)))
+ raise CredentialNotVerifiable(
+ "Credential {} expired at {}"
+ .format(self.pretty_cred(),
+ self.expiration.strftime(SFATIME_FORMAT)))
# Verify the signatures
filename = self.save_to_random_tmp_file()
- # If caller explicitly passed in None that means skip cert chain validation.
- # - Strange and not typical
+ # If caller explicitly passed in None that means
+ # skip cert chain validation. Strange and not typical
if trusted_certs is not None:
# Verify the caller and object gids of this cred and of its parents
for cur_cred in self.get_credential_list():
- # check both the caller and the subject
- for gid in cur_cred.get_gid_object(), cur_cred.get_gid_caller():
+ # check both the caller and the subject
+ for gid in (cur_cred.get_gid_object(),
+ cur_cred.get_gid_caller()):
logger.debug("Credential.verify: verifying chain {}"
.format(gid.pretty_cert()))
logger.debug("Credential.verify: against trusted {}"
.format(" ".join(trusted_certs)))
gid.verify_chain(trusted_cert_objects)
-
+
refs = []
refs.append("Sig_{}".format(self.get_refid()))
refs.append("Sig_{}".format(ref))
for ref in refs:
- # If caller explicitly passed in None that means skip xmlsec1 validation.
- # Strange and not typical
+ # If caller explicitly passed in None that means
+ # skip xmlsec1 validation. Strange and not typical
if trusted_certs is None:
break
# Thierry - jan 2015
- # up to fedora20 we used os.popen and checked that the output begins with OK
- # turns out, with fedora21, there is extra input before this 'OK' thing
- # looks like we're better off just using the exit code - that's what it is made for
- #cert_args = " ".join(['--trusted-pem {}'.format(x) for x in trusted_certs])
+ # up to fedora20 we used os.popen and checked
+ # that the output begins with OK; turns out, with fedora21,
+ # there is extra input before this 'OK' thing
+ # looks like we're better off just using the exit code
+ # that's what it is made for
+ # cert_args = " ".join(['--trusted-pem {}'.format(x) for x in trusted_certs])
# command = '{} --verify --node-id "{}" {} {} 2>&1'.\
# format(self.xmlsec_path, ref, cert_args, filename)
xmlsec1 = self.get_xmlsec1_path()
mend = verified.find('\\', mstart)
msg = verified[mstart:mend]
logger.warning(
- "Credential.verify - failed - xmlsec1 returned {}".format(verified.strip()))
- raise CredentialNotVerifiable("xmlsec1 error verifying cred {} using Signature ID {}: {}"
- .format(self.pretty_cred(), ref, msg))
+ "Credential.verify - failed - xmlsec1 returned {}"
+ .format(verified.strip()))
+ raise CredentialNotVerifiable(
+ "xmlsec1 error verifying cred {} using Signature ID {}: {}"
+ .format(self.pretty_cred(), ref, msg))
os.remove(filename)
# Verify the parents (delegation)
root_target_gid = root_cred.get_gid_object()
if root_cred.get_signature() is None:
# malformed
- raise CredentialNotVerifiable("Could not verify credential owned by {} for object {}. "
- "Cred has no signature"
- .format(self.gidCaller.get_urn(), self.gidObject.get_urn()))
+ raise CredentialNotVerifiable(
+ "Could not verify credential owned by {} for object {}. "
+ "Cred has no signature"
+ .format(self.gidCaller.get_urn(), self.gidObject.get_urn()))
root_cred_signer = root_cred.get_signature().get_issuer_gid()
# Case 2:
# Allow someone to sign credential about themeselves. Used?
# If not, remove this.
- #root_target_gid_str = root_target_gid.save_to_string()
- #root_cred_signer_str = root_cred_signer.save_to_string()
+ # root_target_gid_str = root_target_gid.save_to_string()
+ # root_cred_signer_str = root_cred_signer.save_to_string()
# if root_target_gid_str == root_cred_signer_str:
# # cred signer is target, return success
# return
if trusted_gids and len(trusted_gids) > 0:
root_cred_signer.verify_chain(trusted_gids)
else:
- logger.debug("Cannot verify that cred signer is signed by a trusted authority. "
- "No trusted gids. Skipping that check.")
+ logger.debug(
+ "Cannot verify that cred signer is signed by a trusted authority. "
+ "No trusted gids. Skipping that check.")
# See if the signer is an authority over the domain of the target.
# There are multiple types of authority - accept them all here
# Maybe should be (hrn, type) = urn_to_hrn(root_cred_signer.get_urn())
root_cred_signer_type = root_cred_signer.get_type()
if root_cred_signer_type.find('authority') == 0:
- #logger.debug('Cred signer is an authority')
+ # logger.debug('Cred signer is an authority')
# signer is an authority, see if target is in authority's domain
signerhrn = root_cred_signer.get_hrn()
if hrn_authfor_hrn(signerhrn, root_target_gid.get_hrn()):
def delegate(self, delegee_gidfile, caller_keyfile, caller_gidfile):
"""
- Return a delegated copy of this credential, delegated to the
- specified gid's user.
+ Return a delegated copy of this credential, delegated to the
+ specified gid's user.
"""
# get the gid of the object we are delegating
object_gid = self.get_gid_object()
delegee_gid = GID(filename=delegee_gidfile)
delegee_hrn = delegee_gid.get_hrn()
- #user_key = Keypair(filename=keyfile)
- #user_hrn = self.get_gid_caller().get_hrn()
+ # user_key = Keypair(filename=keyfile)
+ # user_hrn = self.get_gid_caller().get_hrn()
subject_string = "{} delegated to {}".format(object_hrn, delegee_hrn)
dcred = Credential(subject=subject_string)
dcred.set_gid_caller(delegee_gid)
dcred.set_expiration(self.get_expiration())
dcred.set_privileges(self.get_privileges())
dcred.get_privileges().delegate_all_privileges(True)
- #dcred.set_issuer_keys(keyfile, delegee_gidfile)
+ # dcred.set_issuer_keys(keyfile, delegee_gidfile)
dcred.set_issuer_keys(caller_keyfile, caller_gidfile)
dcred.encode()
dcred.sign()
# it's probably the former
if caller_type == "user" and issuer_type != "user":
actual_caller_hrn = caller_hrn
- # if we find that the caller_hrn is an immediate descendant of the issuer, then
- # this seems to be a 'regular' credential
+ # if we find that the caller_hrn is an immediate descendant
+ # of the issuer, then this seems to be a 'regular' credential
elif caller_hrn.startswith(issuer_hrn):
actual_caller_hrn = caller_hrn
# else this looks like a delegated credential, and the real caller is
# the issuer
else:
actual_caller_hrn = issuer_hrn
- logger.info("actual_caller_hrn: caller_hrn={}, issuer_hrn={}, returning {}"
- .format(caller_hrn, issuer_hrn, actual_caller_hrn))
+ logger.info(
+ "actual_caller_hrn: caller_hrn={}, issuer_hrn={}, returning {}"
+ .format(caller_hrn, issuer_hrn, actual_caller_hrn))
return actual_caller_hrn
##
if self.get_signature():
result += " gidIssuer:\n"
- result += self.get_signature().get_issuer_gid().dump_string(8, dump_parents)
+ result += self.get_signature().get_issuer_gid()\
+ .dump_string(8, dump_parents)
if self.expiration:
result += " expiration: " + \