# Copyright (c) 2008 Board of Trustees, Princeton University
#
# Permission is hereby granted, free of charge, to any person obtaining
# Copyright (c) 2008 Board of Trustees, Princeton University
#
# Permission is hereby granted, free of charge, to any person obtaining
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Work.
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Work.
#
-# THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
-# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
+# THE WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
from sfa.util.sfalogging import logger
from sfa.util.sfatime import utcparse, SFATIME_FORMAT
from sfa.trust.rights import Right, Rights, determine_rights
from sfa.trust.gid import GID
from sfa.util.xrn import urn_to_hrn, hrn_authfor_hrn
from sfa.util.sfalogging import logger
from sfa.util.sfatime import utcparse, SFATIME_FORMAT
from sfa.trust.rights import Right, Rights, determine_rights
from sfa.trust.gid import GID
from sfa.util.xrn import urn_to_hrn, hrn_authfor_hrn
<Signature xml:id="Sig_{refid}" xmlns="http://www.w3.org/2000/09/xmldsig#">
<SignedInfo>
<CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>
<Signature xml:id="Sig_{refid}" xmlns="http://www.w3.org/2000/09/xmldsig#">
<SignedInfo>
<CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>
def getTextNode(element, subele):
sub = element.getElementsByTagName(subele)[0]
def getTextNode(element, subele):
sub = element.getElementsByTagName(subele)[0]
##
# Utility function to set the text of an XML element
# It creates the element, adds the text to it,
# and then appends it to the parent.
##
# Utility function to set the text of an XML element
# It creates the element, adds the text to it,
# and then appends it to the parent.
def append_sub(doc, parent, element, text):
ele = doc.createElement(element)
ele.appendChild(doc.createTextNode(text))
def append_sub(doc, parent, element, text):
ele = doc.createElement(element)
ele.appendChild(doc.createTextNode(text))
def remove_prefix(text, prefix):
if text and prefix and text.startswith(prefix):
return text[len(prefix):]
def remove_prefix(text, prefix):
if text and prefix and text.startswith(prefix):
return text[len(prefix):]
logger.log_exc("Failed to parse credential, {}".format(self.xml))
raise
sig = doc.getElementsByTagName("Signature")[0]
logger.log_exc("Failed to parse credential, {}".format(self.xml))
raise
sig = doc.getElementsByTagName("Signature")[0]
# Reference xml:id or Reference UID sub element instead
if not ref_id or ref_id == '':
reference = sig.getElementsByTagName('Reference')[0]
# Reference xml:id or Reference UID sub element instead
if not ref_id or ref_id == '':
reference = sig.getElementsByTagName('Reference')[0]
# A credential provides a caller gid with privileges to an object gid.
# A signed credential is signed by the object's authority.
#
# A credential provides a caller gid with privileges to an object gid.
# A signed credential is signed by the object's authority.
#
-# Credentials are encoded in one of two ways.
-# The legacy style (now unsupported) places it in the subjectAltName of an X509 certificate.
+# Credentials are encoded in one of two ways. The legacy style (now
+# unsupported) places it in the subjectAltName of an X509 certificate.
# The new credentials are placed in signed XML.
#
# WARNING:
# In general, a signed credential obtained externally should
# not be changed else the signature is no longer valid. So, once
# The new credentials are placed in signed XML.
#
# WARNING:
# In general, a signed credential obtained externally should
# not be changed else the signature is no longer valid. So, once
- """
- Returns a list of creds who's gid caller matches the
- specified caller hrn
- """
- if not isinstance(creds, list): creds = [creds]
- if not isinstance(caller_hrn_list, list):
- caller_hrn_list = [caller_hrn_list]
- caller_creds = []
- for cred in creds:
- try:
- tmp_cred = Credential(string=cred)
- if tmp_cred.type != Credential.SFA_CREDENTIAL_TYPE:
- continue
- if tmp_cred.get_gid_caller().get_hrn() in caller_hrn_list:
- caller_creds.append(cred)
- except: pass
- return caller_creds
+ """
+ Returns a list of creds who's gid caller matches the
+ specified caller hrn
+ """
+ if not isinstance(creds, list):
+ creds = [creds]
+ if not isinstance(caller_hrn_list, list):
+ caller_hrn_list = [caller_hrn_list]
+ caller_creds = []
+ for cred in creds:
+ try:
+ tmp_cred = Credential(string=cred)
+ if tmp_cred.type != Credential.SFA_CREDENTIAL_TYPE:
+ continue
+ if tmp_cred.get_gid_caller().get_hrn() in caller_hrn_list:
+ caller_creds.append(cred)
+ except:
+ pass
+ return caller_creds
+
# Create a Credential object
#
# @param create If true, create a blank x509 certificate
# Create a Credential object
#
# @param create If true, create a blank x509 certificate
# @param string If string!=None, load the credential from the string
# @param filename If filename!=None, load the credential from the file
# FIXME: create and subject are ignored!
# @param string If string!=None, load the credential from the string
# @param filename If filename!=None, load the credential from the file
# FIXME: create and subject are ignored!
- def __init__(self, create=False, subject=None, string=None, filename=None, cred=None):
+ def __init__(self, create=False, subject=None, string=None,
+ filename=None, cred=None):
# if this is a legacy credential, write error and bail out
if isinstance(str, StringType) and str.strip().startswith("-----"):
# if this is a legacy credential, write error and bail out
if isinstance(str, StringType) and str.strip().startswith("-----"):
if not getattr(Credential, 'xmlsec1_path', None):
# Find a xmlsec1 binary path
Credential.xmlsec1_path = ''
if not getattr(Credential, 'xmlsec1_path', None):
# Find a xmlsec1 binary path
Credential.xmlsec1_path = ''
- paths = ['/usr/bin', '/usr/local/bin', '/bin', '/opt/bin', '/opt/local/bin']
- try: paths += os.getenv('PATH').split(':')
- except: pass
+ paths = ['/usr/bin', '/usr/local/bin',
+ '/bin', '/opt/bin', '/opt/local/bin']
+ try:
+ paths += os.getenv('PATH').split(':')
+ except:
+ pass
for path in paths:
xmlsec1 = os.path.join(path, 'xmlsec1')
if os.path.isfile(xmlsec1):
Credential.xmlsec1_path = xmlsec1
break
if not Credential.xmlsec1_path:
for path in paths:
xmlsec1 = os.path.join(path, 'xmlsec1')
if os.path.isfile(xmlsec1):
Credential.xmlsec1_path = xmlsec1
break
if not Credential.xmlsec1_path:
caller = self.gidCaller.pretty_cert()
exp = self.get_expiration()
# Summarize the rights too? The issuer?
caller = self.gidCaller.pretty_cert()
exp = self.get_expiration()
# Summarize the rights too? The issuer?
##
# Need the issuer's private key and name
# @param key Keypair object containing the private key of the issuer
##
# Need the issuer's private key and name
# @param key Keypair object containing the private key of the issuer
##
# Set this credential's parent
def set_parent(self, cred):
##
# Set this credential's parent
def set_parent(self, cred):
def set_expiration(self, expiration):
expiration_datetime = utcparse(expiration)
if expiration_datetime is not None:
self.expiration = expiration_datetime
else:
def set_expiration(self, expiration):
expiration_datetime = utcparse(expiration)
if expiration_datetime is not None:
self.expiration = expiration_datetime
else:
def set_privileges(self, privs):
if isinstance(privs, str):
def set_privileges(self, privs):
if isinstance(privs, str):
- # @param op_name string specifying name of operation ("lookup", "update", etc)
+ # @param op_name string specifying name of operation
+ # ("lookup", "update", etc)
def can_perform(self, op_name):
rights = self.get_privileges()
def can_perform(self, op_name):
rights = self.get_privileges()
- # Encode the attributes of the credential into an XML string
- # This should be done immediately before signing the credential.
+ # Encode the attributes of the credential into an XML string
+ # This should be done immediately before signing the credential.
# WARNING:
# In general, a signed credential obtained externally should
# not be changed else the signature is no longer valid. So, once
# WARNING:
# In general, a signed credential obtained externally should
# not be changed else the signature is no longer valid. So, once
# Note that delegation of credentials between the 2 only really works
# cause those schemas are identical.
# Also note these PG schemas talk about PG tickets and CM policies.
# Note that delegation of credentials between the 2 only really works
# cause those schemas are identical.
# Also note these PG schemas talk about PG tickets and CM policies.
- signed_cred.setAttribute("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
- # FIXME: See v2 schema at www.geni.net/resources/credential/2/credential.xsd
- signed_cred.setAttribute("xsi:noNamespaceSchemaLocation", "http://www.planet-lab.org/resources/sfa/credential.xsd")
- signed_cred.setAttribute("xsi:schemaLocation", "http://www.planet-lab.org/resources/sfa/ext/policy/1 http://www.planet-lab.org/resources/sfa/ext/policy/1/policy.xsd")
+ signed_cred.setAttribute(
+ "xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
+ # FIXME: See v2 schema at
+ # www.geni.net/resources/credential/2/credential.xsd
+ signed_cred.setAttribute(
+ "xsi:noNamespaceSchemaLocation",
+ "http://www.planet-lab.org/resources/sfa/credential.xsd")
+ signed_cred.setAttribute(
+ "xsi:schemaLocation",
+ "http://www.planet-lab.org/resources/sfa/ext/policy/1 "
+ "http://www.planet-lab.org/resources/sfa/ext/policy/1/policy.xsd")
- # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation", "http://www.protogeni.net/resources/credential/credential.xsd")
- # signed_cred.setAttribute("xsi:schemaLocation", "http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
+ # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation",
+ # "http://www.protogeni.net/resources/credential/credential.xsd")
+ # signed_cred.setAttribute("xsi:schemaLocation",
+ # "http://www.protogeni.net/resources/credential/ext/policy/1 "
+ # "http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
cred = doc.createElement("credential")
cred.setAttribute("xml:id", self.get_refid())
signed_cred.appendChild(cred)
cred = doc.createElement("credential")
cred.setAttribute("xml:id", self.get_refid())
signed_cred.appendChild(cred)
append_sub(doc, cred, "target_urn", self.gidObject.get_urn())
append_sub(doc, cred, "uuid", "")
if not self.expiration:
append_sub(doc, cred, "target_urn", self.gidObject.get_urn())
append_sub(doc, cred, "uuid", "")
if not self.expiration:
- logger.debug("Creating credential valid for {} s".format(DEFAULT_CREDENTIAL_LIFETIME))
- self.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=DEFAULT_CREDENTIAL_LIFETIME))
+ logger.debug("Creating credential valid for {} s".format(
+ DEFAULT_CREDENTIAL_LIFETIME))
+ self.set_expiration(datetime.datetime.utcnow(
+ ) + datetime.timedelta(seconds=DEFAULT_CREDENTIAL_LIFETIME))
# TZ aware. Make sure it is UTC - by Aaron Helsinger
self.expiration = self.expiration.astimezone(tz.tzutc())
# TZ aware. Make sure it is UTC - by Aaron Helsinger
self.expiration = self.expiration.astimezone(tz.tzutc())
- append_sub(doc, cred, "expires", self.expiration.strftime(SFATIME_FORMAT))
+ append_sub(doc, cred, "expires",
+ self.expiration.strftime(SFATIME_FORMAT))
for right in rights.rights:
priv = doc.createElement("privilege")
append_sub(doc, priv, "name", right.kind)
for right in rights.rights:
priv = doc.createElement("privilege")
append_sub(doc, priv, "name", right.kind)
- append_sub(doc, priv, "can_delegate", str(right.delegate).lower())
+ append_sub(doc, priv, "can_delegate",
+ str(right.delegate).lower())
# If the root node is a signed-credential (it should be), then
# get all its attributes and attach those to our signed_cred
# node.
# If the root node is a signed-credential (it should be), then
# get all its attributes and attach those to our signed_cred
# node.
- # Specifically, PG and PLadd attributes for namespaces (which is reasonable),
+ # Specifically, PG and PL add attributes for namespaces
+ # (which is reasonable),
# and we need to include those again here or else their signature
# no longer matches on the credential.
# We expect three of these, but here we copy them all:
# and we need to include those again here or else their signature
# no longer matches on the credential.
# We expect three of these, but here we copy them all:
- # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation", "http://www.protogeni.net/resources/credential/credential.xsd")
- # signed_cred.setAttribute("xsi:schemaLocation", "http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
+ # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation",
+ # "http://www.protogeni.net/resources/credential/credential.xsd")
+ # signed_cred.setAttribute("xsi:schemaLocation",
+ # "http://www.protogeni.net/resources/credential/ext/policy/1 "
+ # "http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
# the contents of the schemas are the same,
# or something else, but it seems odd. And this works.
parentRoot = sdoc.documentElement
# the contents of the schemas are the same,
# or something else, but it seems odd. And this works.
parentRoot = sdoc.documentElement
for attrIx in range(0, parentRoot.attributes.length):
attr = parentRoot.attributes.item(attrIx)
# returns the old attribute of same name that was
# on the credential
for attrIx in range(0, parentRoot.attributes.length):
attr = parentRoot.attributes.item(attrIx)
# returns the old attribute of same name that was
# on the credential
- # Below throws InUse exception if we forgot to clone the attribute first
- oldAttr = signed_cred.setAttributeNode(attr.cloneNode(True))
+ # Below throws InUse exception if we forgot to clone the
+ # attribute first
+ oldAttr = signed_cred.setAttributeNode(
+ attr.cloneNode(True))
if oldAttr and oldAttr.value != attr.value:
msg = "Delegating cred from owner {} to {} over {}:\n"
"- Replaced attribute {} value '{}' with '{}'"\
if oldAttr and oldAttr.value != attr.value:
msg = "Delegating cred from owner {} to {} over {}:\n"
"- Replaced attribute {} value '{}' with '{}'"\
- .format(self.parent.gidCaller.get_urn(), self.gidCaller.get_urn(),
- self.gidObject.get_urn(), oldAttr.name, oldAttr.value, attr.value)
+ .format(self.parent.gidCaller.get_urn(),
+ self.gidCaller.get_urn(),
+ self.gidObject.get_urn(),
+ oldAttr.name, oldAttr.value, attr.value)
if self.parent:
for cur_cred in self.get_credential_list()[1:]:
sdoc = parseString(cur_cred.get_signature().get_xml())
if self.parent:
for cur_cred in self.get_credential_list()[1:]:
sdoc = parseString(cur_cred.get_signature().get_xml())
fp, filename = mkstemp(suffix='cred', text=True)
fp = os.fdopen(fp, "w")
self.save_to_file(filename, save_parents=True, filep=fp)
return filename
fp, filename = mkstemp(suffix='cred', text=True)
fp = os.fdopen(fp, "w")
self.save_to_file(filename, save_parents=True, filep=fp)
return filename
def save_to_file(self, filename, save_parents=True, filep=None):
if not self.xml:
self.encode()
if filep:
def save_to_file(self, filename, save_parents=True, filep=None):
if not self.xml:
self.encode()
if filep:
# Figure out what refids exist, and update this credential's id
# so that it doesn't clobber the others. Returns the refids of
# the parents.
# Figure out what refids exist, and update this credential's id
# so that it doesn't clobber the others. Returns the refids of
# the parents.
# WARNING:
# In general, a signed credential obtained externally should
# not be changed else the signature is no longer valid. So, once
# WARNING:
# In general, a signed credential obtained externally should
# not be changed else the signature is no longer valid. So, once
- sdoc = parseString(signature.get_xml())
- sig_ele = doc.importNode(sdoc.getElementsByTagName("Signature")[0], True)
+ sdoc = parseString(signature.get_xml())
+ sig_ele = doc.importNode(
+ sdoc.getElementsByTagName("Signature")[0], True)
# Split the issuer GID into multiple certificates if it's a chain
chain = GID(filename=self.issuer_gid)
gid_files = []
# Split the issuer GID into multiple certificates if it's a chain
chain = GID(filename=self.issuer_gid)
gid_files = []
# Call out to xmlsec1 to sign it
ref = 'Sig_{}'.format(self.get_refid())
filename = self.save_to_random_tmp_file()
# Call out to xmlsec1 to sign it
ref = 'Sig_{}'.format(self.get_refid())
filename = self.save_to_random_tmp_file()
if not xmlsec1:
raise Exception("Could not locate required 'xmlsec1' program")
command = '{} --sign --node-id "{}" --privkey-pem {},{} {}' \
if not xmlsec1:
raise Exception("Could not locate required 'xmlsec1' program")
command = '{} --sign --node-id "{}" --privkey-pem {},{} {}' \
- .format(xmlsec1, ref, self.issuer_privkey, ",".join(gid_files), filename)
+ .format(xmlsec1, ref, self.issuer_privkey,
+ ",".join(gid_files), filename)
sigs = signatures[0].getElementsByTagName("Signature")
else:
creds = doc.getElementsByTagName("credential")
sigs = signatures[0].getElementsByTagName("Signature")
else:
creds = doc.getElementsByTagName("credential")
self.gidCaller = GID(string=getTextNode(cred, "owner_gid"))
self.gidObject = GID(string=getTextNode(cred, "target_gid"))
self.gidCaller = GID(string=getTextNode(cred, "owner_gid"))
self.gidObject = GID(string=getTextNode(cred, "target_gid"))
kind = getTextNode(priv, "name")
deleg = str2bool(getTextNode(priv, "can_delegate"))
if kind == '*':
kind = getTextNode(priv, "name")
deleg = str2bool(getTextNode(priv, "can_delegate"))
if kind == '*':
rlist.add(Right(kind.strip(), deleg))
self.set_privileges(rlist)
rlist.add(Right(kind.strip(), deleg))
self.set_privileges(rlist)
# Is there a parent?
parent = cred.getElementsByTagName("parent")
if len(parent) > 0:
parent_doc = parent[0].getElementsByTagName("credential")[0]
parent_xml = parent_doc.toxml("utf-8")
if parent_xml is None or parent_xml.strip() == "":
# Is there a parent?
parent = cred.getElementsByTagName("parent")
if len(parent) > 0:
parent_doc = parent[0].getElementsByTagName("credential")[0]
parent_xml = parent_doc.toxml("utf-8")
if parent_xml is None or parent_xml.strip() == "":
for cur_cred in self.get_credential_list():
if cur_cred.get_refid() == Sig.get_refid():
cur_cred.set_signature(Sig)
for cur_cred in self.get_credential_list():
if cur_cred.get_refid() == Sig.get_refid():
cur_cred.set_signature(Sig)
- # trusted_certs: A list of trusted GID filenames (not GID objects!)
- # Chaining is not supported within the GIDs by xmlsec1.
+ # trusted_certs: A list of trusted GID filenames (not GID objects!)
+ # Chaining is not supported within the GIDs by xmlsec1.
- # empty list of trusted_certs would still let this method pass.
- # It just skips xmlsec1 verification et al. Only used by some utils
- #
+ # empty list of trusted_certs would still let this method pass.
+ # It just skips xmlsec1 verification et al.
+ # Only used by some utils
+ #
# Verify that:
# . All of the signatures are valid and that the issuers trace back
# to trusted roots (performed by xmlsec1)
# Verify that:
# . All of the signatures are valid and that the issuers trace back
# to trusted roots (performed by xmlsec1)
#
# -- For Delegates (credentials with parents)
# . The privileges must be a subset of the parent credentials
#
# -- For Delegates (credentials with parents)
# . The privileges must be a subset of the parent credentials
# . The target gid must be the same between child and parents
# . The expiry time on the child must be no later than the parent
# . The signer of the child must be the owner of the parent
# . The target gid must be the same between child and parents
# . The expiry time on the child must be no later than the parent
# . The signer of the child must be the owner of the parent
- def verify(self, trusted_certs=None, schema=None, trusted_certs_required=True):
+ def verify(self, trusted_certs=None, schema=None,
+ trusted_certs_required=True):
xmlschema = etree.XMLSchema(schema_doc)
if not xmlschema.validate(tree):
error = xmlschema.error_log.last_error
xmlschema = etree.XMLSchema(schema_doc)
if not xmlschema.validate(tree):
error = xmlschema.error_log.last_error
- message = "{}: {} (line {})".format(self.pretty_cred(),
- error.message, error.line)
+ message = "{}: {} (line {})"\
+ .format(self.pretty_cred(),
+ error.message, error.line)
- # If caller explicitly passed in None that means skip cert chain validation.
- # Strange and not typical
+ # If caller explicitly passed in None, that means
+ # skip cert chain validation. Strange and not typical
trusted_certs = ok_trusted_certs
# make sure it is not expired
if self.get_expiration() < datetime.datetime.utcnow():
trusted_certs = ok_trusted_certs
# make sure it is not expired
if self.get_expiration() < datetime.datetime.utcnow():
- raise CredentialNotVerifiable("Credential {} expired at {}" \
- .format(self.pretty_cred(),
- self.expiration.strftime(SFATIME_FORMAT)))
+ raise CredentialNotVerifiable(
+ "Credential {} expired at {}"
+ .format(self.pretty_cred(),
+ self.expiration.strftime(SFATIME_FORMAT)))
- # If caller explicitly passed in None that means skip cert chain validation.
- # - Strange and not typical
+ # If caller explicitly passed in None that means
+ # skip cert chain validation. Strange and not typical
- cur_cred.get_gid_object().verify_chain(trusted_cert_objects)
- cur_cred.get_gid_caller().verify_chain(trusted_cert_objects)
+ # check both the caller and the subject
+ for gid in (cur_cred.get_gid_object(),
+ cur_cred.get_gid_caller()):
+ logger.debug("Credential.verify: verifying chain {}"
+ .format(gid.pretty_cert()))
+ logger.debug("Credential.verify: against trusted {}"
+ .format(" ".join(trusted_certs)))
+ gid.verify_chain(trusted_cert_objects)
- # If caller explicitly passed in None that means skip xmlsec1 validation.
- # Strange and not typical
+ # If caller explicitly passed in None that means
+ # skip xmlsec1 validation. Strange and not typical
- # up to fedora20 we used os.popen and checked that the output begins with OK
- # turns out, with fedora21, there is extra input before this 'OK' thing
- # looks like we're better off just using the exit code - that's what it is made for
- #cert_args = " ".join(['--trusted-pem {}'.format(x) for x in trusted_certs])
- #command = '{} --verify --node-id "{}" {} {} 2>&1'.\
+ # up to fedora20 we used os.popen and checked
+ # that the output begins with OK; turns out, with fedora21,
+ # there is extra input before this 'OK' thing
+ # looks like we're better off just using the exit code
+ # that's what it is made for
+ # cert_args = " ".join(['--trusted-pem {}'.format(x) for x in trusted_certs])
+ # command = '{} --verify --node-id "{}" {} {} 2>&1'.\
# format(self.xmlsec_path, ref, cert_args, filename)
xmlsec1 = self.get_xmlsec1_path()
if not xmlsec1:
raise Exception("Could not locate required 'xmlsec1' program")
# format(self.xmlsec_path, ref, cert_args, filename)
xmlsec1 = self.get_xmlsec1_path()
if not xmlsec1:
raise Exception("Could not locate required 'xmlsec1' program")
- command = [ xmlsec1, '--verify', '--node-id', ref ]
+ command = [xmlsec1, '--verify', '--node-id', ref]
mstart = mstart + 4
mend = verified.find('\\', mstart)
msg = verified[mstart:mend]
mstart = mstart + 4
mend = verified.find('\\', mstart)
msg = verified[mstart:mend]
- logger.warning("Credential.verify - failed - xmlsec1 returned {}".format(verified.strip()))
- raise CredentialNotVerifiable("xmlsec1 error verifying cred {} using Signature ID {}: {}"\
- .format(self.pretty_cred(), ref, msg))
+ logger.warning(
+ "Credential.verify - failed - xmlsec1 returned {}"
+ .format(verified.strip()))
+ raise CredentialNotVerifiable(
+ "xmlsec1 error verifying cred {} using Signature ID {}: {}"
+ .format(self.pretty_cred(), ref, msg))
##
# Make sure the credential's target gid (a) was signed by or (b)
# is the same as the entity that signed the original credential,
##
# Make sure the credential's target gid (a) was signed by or (b)
# is the same as the entity that signed the original credential,
- raise CredentialNotVerifiable("Could not verify credential owned by {} for object {}. "
- "Cred has no signature" \
- .format(self.gidCaller.get_urn(), self.gidObject.get_urn()))
+ raise CredentialNotVerifiable(
+ "Could not verify credential owned by {} for object {}. "
+ "Cred has no signature"
+ .format(self.gidCaller.get_urn(), self.gidObject.get_urn()))
- #root_target_gid_str = root_target_gid.save_to_string()
- #root_cred_signer_str = root_cred_signer.save_to_string()
- #if root_target_gid_str == root_cred_signer_str:
+ # root_target_gid_str = root_target_gid.save_to_string()
+ # root_cred_signer_str = root_cred_signer.save_to_string()
+ # if root_target_gid_str == root_cred_signer_str:
- logger.debug("Cannot verify that cred signer is signed by a trusted authority. "
- "No trusted gids. Skipping that check.")
+ logger.debug(
+ "Cannot verify that cred signer is signed by a trusted authority. "
+ "No trusted gids. Skipping that check.")
# See if the signer is an authority over the domain of the target.
# There are multiple types of authority - accept them all here
# Maybe should be (hrn, type) = urn_to_hrn(root_cred_signer.get_urn())
root_cred_signer_type = root_cred_signer.get_type()
if root_cred_signer_type.find('authority') == 0:
# See if the signer is an authority over the domain of the target.
# There are multiple types of authority - accept them all here
# Maybe should be (hrn, type) = urn_to_hrn(root_cred_signer.get_urn())
root_cred_signer_type = root_cred_signer.get_type()
if root_cred_signer_type.find('authority') == 0:
# signer is an authority, see if target is in authority's domain
signerhrn = root_cred_signer.get_hrn()
if hrn_authfor_hrn(signerhrn, root_target_gid.get_hrn()):
# signer is an authority, see if target is in authority's domain
signerhrn = root_cred_signer.get_hrn()
if hrn_authfor_hrn(signerhrn, root_target_gid.get_hrn()):
# . The privileges must have "can_delegate" set for each delegated privilege
# . The target gid must be the same between child and parents
# . The expiry time on the child must be no later than the parent
# . The privileges must have "can_delegate" set for each delegated privilege
# . The target gid must be the same between child and parents
# . The expiry time on the child must be no later than the parent
def verify_parent(self, parent_cred):
# make sure the rights given to the child are a subset of the
# parents rights (and check delegate bits)
def verify_parent(self, parent_cred):
# make sure the rights given to the child are a subset of the
# parents rights (and check delegate bits)
parent_cred.get_privileges().pretty_rights(),
self.pretty_cred(), self.get_refid(),
self.get_privileges().pretty_rights()))
logger.error(message)
parent_cred.get_privileges().pretty_rights(),
self.pretty_cred(), self.get_refid(),
self.get_privileges().pretty_rights()))
logger.error(message)
- logger.error("parent details {}".format(parent_cred.get_privileges().save_to_string()))
- logger.error("self details {}".format(self.get_privileges().save_to_string()))
+ logger.error("parent details {}".format(
+ parent_cred.get_privileges().save_to_string()))
+ logger.error("self details {}".format(
+ self.get_privileges().save_to_string()))
"Delegated cred {}: Target gid not equal between parent and child. Parent {}"
.format(self.pretty_cred(), parent_cred.pretty_cred()))
logger.error(message)
"Delegated cred {}: Target gid not equal between parent and child. Parent {}"
.format(self.pretty_cred(), parent_cred.pretty_cred()))
logger.error(message)
message = "Delegated credential {} not signed by parent {}'s caller"\
.format(self.pretty_cred(), parent_cred.pretty_cred())
logger.error(message)
message = "Delegated credential {} not signed by parent {}'s caller"\
.format(self.pretty_cred(), parent_cred.pretty_cred())
logger.error(message)
- logger.error("compare1 parent {}".format(parent_cred.get_gid_caller().pretty_cert()))
- logger.error("compare1 parent details {}".format(parent_cred.get_gid_caller().save_to_string()))
- logger.error("compare2 self {}".format(self.get_signature().get_issuer_gid().pretty_crert()))
- logger.error("compare2 self details {}".format(self.get_signature().get_issuer_gid().save_to_string()))
+ logger.error("compare1 parent {}".format(
+ parent_cred.get_gid_caller().pretty_cert()))
+ logger.error("compare1 parent details {}".format(
+ parent_cred.get_gid_caller().save_to_string()))
+ logger.error("compare2 self {}".format(
+ self.get_signature().get_issuer_gid().pretty_crert()))
+ logger.error("compare2 self details {}".format(
+ self.get_signature().get_issuer_gid().save_to_string()))
def delegate(self, delegee_gidfile, caller_keyfile, caller_gidfile):
"""
def delegate(self, delegee_gidfile, caller_keyfile, caller_gidfile):
"""
# the hrn of the user who will be delegated to
delegee_gid = GID(filename=delegee_gidfile)
delegee_hrn = delegee_gid.get_hrn()
# the hrn of the user who will be delegated to
delegee_gid = GID(filename=delegee_gidfile)
delegee_hrn = delegee_gid.get_hrn()
subject_string = "{} delegated to {}".format(object_hrn, delegee_hrn)
dcred = Credential(subject=subject_string)
dcred.set_gid_caller(delegee_gid)
subject_string = "{} delegated to {}".format(object_hrn, delegee_hrn)
dcred = Credential(subject=subject_string)
dcred.set_gid_caller(delegee_gid)
dcred.set_expiration(self.get_expiration())
dcred.set_privileges(self.get_privileges())
dcred.get_privileges().delegate_all_privileges(True)
dcred.set_expiration(self.get_expiration())
dcred.set_privileges(self.get_privileges())
dcred.get_privileges().delegate_all_privileges(True)
subject_hrn = self.get_gid_object().get_hrn()
# if the caller is a user and the issuer is not
# it's probably the former
if caller_type == "user" and issuer_type != "user":
actual_caller_hrn = caller_hrn
subject_hrn = self.get_gid_object().get_hrn()
# if the caller is a user and the issuer is not
# it's probably the former
if caller_type == "user" and issuer_type != "user":
actual_caller_hrn = caller_hrn
- # if we find that the caller_hrn is an immediate descendant of the issuer, then
- # this seems to be a 'regular' credential
+ # if we find that the caller_hrn is an immediate descendant
+ # of the issuer, then this seems to be a 'regular' credential
- logger.info("actual_caller_hrn: caller_hrn={}, issuer_hrn={}, returning {}"
- .format(caller_hrn, issuer_hrn, actual_caller_hrn))
+ logger.info(
+ "actual_caller_hrn: caller_hrn={}, issuer_hrn={}, returning {}"
+ .format(caller_hrn, issuer_hrn, actual_caller_hrn))
# SFA code ignores show_xml and disables printing the cred xml
def dump_string(self, dump_parents=False, show_xml=False):
# SFA code ignores show_xml and disables printing the cred xml
def dump_string(self, dump_parents=False, show_xml=False):