-#----------------------------------------------------------------------
+# ----------------------------------------------------------------------
# Copyright (c) 2008 Board of Trustees, Princeton University
#
# Permission is hereby granted, free of charge, to any person obtaining
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
# IN THE WORK.
-#----------------------------------------------------------------------
+# ----------------------------------------------------------------------
-##
+#
# SFA uses two crypto libraries: pyOpenSSL and M2Crypto to implement
# the necessary crypto functionality. Ideally just one of these libraries
# would be used, but unfortunately each of these libraries is independently
# Notes on using the openssl command line
#
-# for verifying the chain in a gid, assuming it is split into pieces p1.pem p2.pem p3.pem
+# for verifying the chain in a gid,
+# assuming it is split into pieces p1.pem p2.pem p3.pem
# you can use openssl to verify the chain using this command
# openssl verify -verbose -CAfile <(cat p2.pem p3.pem) p1.pem
# also you can use sfax509 to invoke openssl x509 on all parts of the gid
import OpenSSL
# M2Crypto is imported on the fly to minimize crashes
-#import M2Crypto
+# import M2Crypto
from sfa.util.py23 import PY3
-from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent
+from sfa.util.faults import (CertExpired, CertMissingParent,
+ CertNotSignedByParent)
from sfa.util.sfalogging import logger
# this tends to generate quite some logs for little or no value
# TODO: for production, cleanup the temporary files
if not os.path.exists(ssl_fn):
raise Exception(
- "keyconvert: generated certificate not found. keyconvert may have failed.")
+ "generated certificate not found. keyconvert may have failed.")
k = Keypair()
try:
import M2Crypto
if glo_passphrase_callback:
self.key = OpenSSL.crypto.load_privatekey(
- OpenSSL.crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string))
+ OpenSSL.crypto.FILETYPE_PEM, string,
+ functools.partial(glo_passphrase_callback, self, string))
self.m2key = M2Crypto.EVP.load_key_string(
- string, functools.partial(glo_passphrase_callback, self, string))
+ string, functools.partial(glo_passphrase_callback,
+ self, string))
else:
self.key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, string)
# Return the private key in PEM format.
def as_pem(self):
- return OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, self.key)
+ return OpenSSL.crypto.dump_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, self.key)
##
# Return an M2Crypto key object
# @param filename If filename!=None, load the certficiate from the file.
# @param isCA If !=None, set whether this cert is for a CA
- def __init__(self, lifeDays=1825, create=False, subject=None, string=None, filename=None, isCA=None):
+ def __init__(self, lifeDays=1825, create=False, subject=None, string=None,
+ filename=None, isCA=None):
# these used to be defined in the class !
self.x509 = None
self.issuerKey = None
self.load_from_file(filename)
# Set the CA bit if a value was supplied
- if isCA != None:
+ if isCA is not None:
self.set_is_ca(isCA)
# Create a blank X509 certificate and store it in this object.
# If it's not in proper PEM format, wrap it
if string.count('-----BEGIN CERTIFICATE') == 0:
- string = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'\
+ string = '-----BEGIN CERTIFICATE-----'\
+ '\n{}\n-----END CERTIFICATE-----'\
.format(string)
# If there is a PEM cert in there, but there is some other text first
##
# Save the certificate to a string.
#
- # @param save_parents If save_parents==True, then also save the parent certificates.
+ # @param save_parents If save_parents==True,
+ # then also save the parent certificates.
def save_to_string(self, save_parents=True):
if self.x509 is None:
##
# Save the certificate to a file.
- # @param save_parents If save_parents==True, then also save the parent certificates.
+ # @param save_parents If save_parents==True,
+ # then also save the parent certificates.
def save_to_file(self, filename, save_parents=True, filep=None):
string = self.save_to_string(save_parents=save_parents)
##
# Save the certificate to a random file in /tmp/
- # @param save_parents If save_parents==True, then also save the parent certificates.
+ # @param save_parents If save_parents==True,
+ # then also save the parent certificates.
def save_to_random_tmp_file(self, save_parents=True):
fp, filename = mkstemp(suffix='cert', text=True)
fp = os.fdopen(fp, "w")
# Sets the issuer private key and name
# @param key Keypair object containing the private key of the issuer
# @param subject String containing the name of the issuer
- # @param cert (optional) Certificate object containing the name of the issuer
+ # @param cert (optional)
+ # Certificate object containing the name of the issuer
def set_issuer(self, key, subject=None, cert=None):
self.issuerKey = key
def set_intermediate_ca(self, val):
return self.set_is_ca(val)
- # Set whether this cert is for a CA. All signers and only signers should be CAs.
+ # Set whether this cert is for a CA.
+ # All signers and only signers should be CAs.
# The local member starts unset, letting us check that you only set it once
# @param val Boolean indicating whether this cert is for a CA
def set_is_ca(self, val):
if val is None:
return
- if self.isCA != None:
+ if self.isCA is not None:
# Can't double set properties
- raise Exception("Cannot set basicConstraints CA:?? more than once. "
- "Was {}, trying to set as {}%s".format(self.isCA, val))
+ raise Exception(
+ "Cannot set basicConstraints CA:?? more than once. "
+ "Was {}, trying to set as {}".format(self.isCA, val))
self.isCA = val
if val:
self.add_extension('basicConstraints', 1, 'CA:FALSE')
##
- # Add an X509 extension to the certificate. Add_extension can only be called
- # once for a particular extension name, due to limitations in the underlying
- # library.
+ # Add an X509 extension to the certificate. Add_extension can only
+ # be called once for a particular extension name, due to
+ # limitations in the underlying library.
#
# @param name string containing name of extension
# @param value string containing value of the extension
# FIXME: What if they are trying to set with a different value?
# Is this ever OK? Or should we raise an exception?
# elif oldExtVal:
-# raise "Cannot add extension {} which had val {} with new val {}".format(name, oldExtVal, value)
+# raise "Cannot add extension {} which had val {} with new val {}"\
+# .format(name, oldExtVal, value)
ext = OpenSSL.crypto.X509Extension(name, critical, value)
self.x509.add_extensions([ext])
return value
##
- # Set_data is a wrapper around add_extension. It stores the parameter str in
- # the X509 subject_alt_name extension. Set_data can only be called once, due
- # to limitations in the underlying library.
+ # Set_data is a wrapper around add_extension. It stores the
+ # parameter str in the X509 subject_alt_name extension. Set_data
+ # can only be called once, due to limitations in the underlying
+ # library.
def set_data(self, string, field='subjectAltName'):
# pyOpenSSL only allows us to add extensions, so if we try to set the
def sign(self):
logger.debug('certificate.sign')
- assert self.x509 != None
- assert self.issuerSubject != None
- assert self.issuerKey != None
+ assert self.x509 is not None
+ assert self.issuerSubject is not None
+ assert self.issuerKey is not None
self.x509.set_issuer(self.issuerSubject)
self.x509.sign(self.issuerKey.get_openssl_pkey(), self.digest)
result = m2result == 1
if debug_verify_chain:
logger.debug("Certificate.verify: <- {} (m2={}) ({} x {})"
- .format(result, m2result, self.pretty_cert(), m2pubkey))
+ .format(result, m2result,
+ self.pretty_cert(), m2pubkey))
return result
# XXX alternatively, if openssl has been patched, do the much simpler:
# return 0
##
- # Return True if pkey is identical to the public key that is contained in the certificate.
+ # Return True if pkey is identical to the public key that is
+ # contained in the certificate.
# @param pkey Keypair object
def is_pubkey(self, pkey):
# verify expiration of trusted_cert ?
if not trusted_cert.x509.has_expired():
if debug_verify_chain:
- logger.debug("verify_chain: YES. Cert {} signed by trusted cert {}"
- .format(self.pretty_name(), trusted_cert.pretty_name()))
+ logger.debug("verify_chain: YES."
+ " Cert {} signed by trusted cert {}"
+ .format(self.pretty_name(),
+ trusted_cert.pretty_name()))
return trusted_cert
else:
if debug_verify_chain:
- logger.debug("verify_chain: NO. Cert {} is signed by trusted_cert {}, "
+ logger.debug("verify_chain: NO. Cert {} "
+ "is signed by trusted_cert {}, "
"but that signer is expired..."
- .format(self.pretty_cert(), trusted_cert.pretty_cert()))
+ .format(self.pretty_cert(),
+ trusted_cert.pretty_cert()))
raise CertExpired("{} signer trusted_cert {}"
- .format(self.pretty_name(), trusted_cert.pretty_name()))
+ .format(self.pretty_name(),
+ trusted_cert.pretty_name()))
else:
- logger.debug("verify_chain: not a direct descendant of a trusted root".
- format(self.pretty_name(), trusted_cert))
+ logger.debug("verify_chain: not a direct"
+ " descendant of a trusted root")
# if there is no parent, then no way to verify the chain
if not self.parent:
if debug_verify_chain:
logger.debug("verify_chain: NO. {} has no parent "
"and issuer {} is not in {} trusted roots"
- .format(self.pretty_name(), self.get_issuer(), len(trusted_certs)))
- raise CertMissingParent("{}: Issuer {} is not one of the {} trusted roots, "
+ .format(self.pretty_name(), self.get_issuer(),
+ len(trusted_certs)))
+ raise CertMissingParent("{}: Issuer {} is not "
+ "one of the {} trusted roots, "
"and cert has no parent."
- .format(self.pretty_name(), self.get_issuer(), len(trusted_certs)))
+ .format(self.pretty_name(),
+ self.get_issuer(),
+ len(trusted_certs)))
# if it wasn't signed by the parent...
if not self.is_signed_by_cert(self.parent):
# CAs.
# Ugly - cert objects aren't parsed so we need to read the
# extension and hope there are no other basicConstraints
- if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
+ if not self.parent.isCA and not (
+ self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
logger.warn("verify_chain: cert {}'s parent {} is not a CA"
.format(self.pretty_name(), self.parent.pretty_name()))
raise CertNotSignedByParent("{}: Parent {} not a CA"
- .format(self.pretty_name(), self.parent.pretty_name()))
+ .format(self.pretty_name(),
+ self.parent.pretty_name()))
# if the parent isn't verified...
if debug_verify_chain:
logger.debug("verify_chain: .. {}, -> verifying parent {}"
- .format(self.pretty_name(),self.parent.pretty_name()))
+ .format(self.pretty_name(),
+ self.parent.pretty_name()))
self.parent.verify_chain(trusted_certs)
return
-#----------------------------------------------------------------------
+# ----------------------------------------------------------------------
# Copyright (c) 2008 Board of Trustees, Princeton University
#
# Permission is hereby granted, free of charge, to any person obtaining
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
# IN THE WORK.
-#----------------------------------------------------------------------
+# ----------------------------------------------------------------------
##
# Implements SFA Credentials
#
-# Credentials are signed XML files that assign a subject gid privileges to an object gid
+# Credentials are signed XML files that assign a subject gid
+# privileges to an object gid
##
from __future__ import print_function
from sfa.util.py23 import PY3, StringType, StringIO
-HAVELXML = False
-try:
- from lxml import etree
- HAVELXML = True
-except:
- pass
-
from xml.parsers.expat import ExpatError
-from sfa.util.faults import CredentialNotVerifiable, ChildRightsNotSubsetOfParent
+from sfa.util.faults import (CredentialNotVerifiable,
+ ChildRightsNotSubsetOfParent)
from sfa.util.sfalogging import logger
from sfa.util.sfatime import utcparse, SFATIME_FORMAT
from sfa.trust.rights import Right, Rights, determine_rights
from sfa.trust.gid import GID
from sfa.util.xrn import urn_to_hrn, hrn_authfor_hrn
+HAVELXML = False
+try:
+ from lxml import etree
+ HAVELXML = True
+except:
+ pass
+
+
# 31 days, in seconds
DEFAULT_CREDENTIAL_LIFETIME = 86400 * 31
if len(cert.childNodes) > 0:
szgid = cert.childNodes[0].nodeValue
szgid = szgid.strip()
- szgid = "-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----".format(
+ szgid = "-----BEGIN CERTIFICATE-----\n"\
+ "{}\n-----END CERTIFICATE-----".format(
szgid)
if gids is None:
gids = szgid
# A credential provides a caller gid with privileges to an object gid.
# A signed credential is signed by the object's authority.
#
-# Credentials are encoded in one of two ways.
-# The legacy style (now unsupported) places it in the subjectAltName of an X509 certificate.
+# Credentials are encoded in one of two ways. The legacy style (now
+# unsupported) places it in the subjectAltName of an X509 certificate.
# The new credentials are placed in signed XML.
#
# WARNING:
# Create a Credential object
#
# @param create If true, create a blank x509 certificate
- # @param subject If subject!=None, create an x509 cert with the subject name
+ # @param subject If subject!=None,
+ # create an x509 cert with the subject name
# @param string If string!=None, load the credential from the string
# @param filename If filename!=None, load the credential from the file
# FIXME: create and subject are ignored!
- def __init__(self, create=False, subject=None, string=None, filename=None, cred=None):
+ def __init__(self, create=False, subject=None, string=None,
+ filename=None, cred=None):
self.gidCaller = None
self.gidObject = None
self.expiration = None
# if this is a legacy credential, write error and bail out
if isinstance(str, StringType) and str.strip().startswith("-----"):
logger.error(
- "Legacy credentials not supported any more - giving up with {}...".format(str[:10]))
+ "Legacy credentials not supported any more "
+ "- giving up with {}..."
+ .format(str[:10]))
return
else:
self.xml = str
break
if not Credential.xmlsec1_path:
logger.error(
- "Could not locate required binary 'xmlsec1' - SFA will be unable to sign stuff !!")
+ "Could not locate required binary 'xmlsec1' -"
+ "SFA will be unable to sign stuff !!")
return Credential.xmlsec1_path
def get_subject(self):
caller = self.gidCaller.pretty_cert()
exp = self.get_expiration()
# Summarize the rights too? The issuer?
- return "[Cred. for {caller} rights on {obj} until {exp} ]".format(**locals())
+ return "[Cred. for {caller} rights on {obj} until {exp} ]"\
+ .format(**locals())
def get_signature(self):
if not self.signature:
self.decode()
return self.gidObject
- ##
- # Expiration: an absolute UTC time of expiration (as either an int or string or datetime)
+ #
+ # Expiration: an absolute UTC time of expiration (as either an int
+ # or string or datetime)
#
def set_expiration(self, expiration):
expiration_datetime = utcparse(expiration)
self.expiration = expiration_datetime
else:
logger.error(
- "unexpected input {} in Credential.set_expiration".format(expiration))
+ "unexpected input {} in Credential.set_expiration"
+ .format(expiration))
##
# get the lifetime of the credential (always in datetime format)
##
# set the privileges
#
- # @param privs either a comma-separated list of privileges of a Rights object
+ # @param privs either a comma-separated list of privileges of a
+ # Rights object
def set_privileges(self, privs):
if isinstance(privs, str):
# determine whether the credential allows a particular operation to be
# performed
#
- # @param op_name string specifying name of operation ("lookup", "update", etc)
+ # @param op_name string specifying name of operation
+ # ("lookup", "update", etc)
def can_perform(self, op_name):
rights = self.get_privileges()
"xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
# FIXME: See v2 schema at
# www.geni.net/resources/credential/2/credential.xsd
- signed_cred.setAttribute("xsi:noNamespaceSchemaLocation",
- "http://www.planet-lab.org/resources/sfa/credential.xsd")
signed_cred.setAttribute(
- "xsi:schemaLocation", "http://www.planet-lab.org/resources/sfa/ext/policy/1 http://www.planet-lab.org/resources/sfa/ext/policy/1/policy.xsd")
+ "xsi:noNamespaceSchemaLocation",
+ "http://www.planet-lab.org/resources/sfa/credential.xsd")
+ signed_cred.setAttribute(
+ "xsi:schemaLocation",
+ "http://www.planet-lab.org/resources/sfa/ext/policy/1 "
+ "http://www.planet-lab.org/resources/sfa/ext/policy/1/policy.xsd")
# PG says for those last 2:
- # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation", "http://www.protogeni.net/resources/credential/credential.xsd")
- # signed_cred.setAttribute("xsi:schemaLocation", "http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
+ # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation",
+ # "http://www.protogeni.net/resources/credential/credential.xsd")
+ # signed_cred.setAttribute("xsi:schemaLocation",
+ # "http://www.protogeni.net/resources/credential/ext/policy/1 "
+ # "http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
doc.appendChild(signed_cred)
self.set_expiration(datetime.datetime.utcnow(
) + datetime.timedelta(seconds=DEFAULT_CREDENTIAL_LIFETIME))
self.expiration = self.expiration.replace(microsecond=0)
- if self.expiration.tzinfo is not None and self.expiration.tzinfo.utcoffset(self.expiration) is not None:
+ if self.expiration.tzinfo is not None \
+ and self.expiration.tzinfo.utcoffset(self.expiration) is not None:
# TZ aware. Make sure it is UTC - by Aaron Helsinger
self.expiration = self.expiration.astimezone(tz.tzutc())
append_sub(doc, cred, "expires",
# If the root node is a signed-credential (it should be), then
# get all its attributes and attach those to our signed_cred
# node.
- # Specifically, PG and PLadd attributes for namespaces (which is reasonable),
+ # Specifically, PG and PL add attributes for namespaces
+ # (which is reasonable),
# and we need to include those again here or else their signature
# no longer matches on the credential.
# We expect three of these, but here we copy them all:
- # signed_cred.setAttribute("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
+ # signed_cred.setAttribute("xmlns:xsi",
+ # "http://www.w3.org/2001/XMLSchema-instance")
# and from PG (PL is equivalent, as shown above):
- # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation", "http://www.protogeni.net/resources/credential/credential.xsd")
- # signed_cred.setAttribute("xsi:schemaLocation", "http://www.protogeni.net/resources/credential/ext/policy/1 http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
+ # signed_cred.setAttribute("xsi:noNamespaceSchemaLocation",
+ # "http://www.protogeni.net/resources/credential/credential.xsd")
+ # signed_cred.setAttribute("xsi:schemaLocation",
+ # "http://www.protogeni.net/resources/credential/ext/policy/1 "
+ # "http://www.protogeni.net/resources/credential/ext/policy/1/policy.xsd")
# HOWEVER!
# PL now also declares these, with different URLs, so
# the contents of the schemas are the same,
# or something else, but it seems odd. And this works.
parentRoot = sdoc.documentElement
- if parentRoot.tagName == "signed-credential" and parentRoot.hasAttributes():
+ if parentRoot.tagName == "signed-credential" and \
+ parentRoot.hasAttributes():
for attrIx in range(0, parentRoot.attributes.length):
attr = parentRoot.attributes.item(attrIx)
# returns the old attribute of same name that was
if oldAttr and oldAttr.value != attr.value:
msg = "Delegating cred from owner {} to {} over {}:\n"
"- Replaced attribute {} value '{}' with '{}'"\
- .format(self.parent.gidCaller.get_urn(), self.gidCaller.get_urn(),
- self.gidObject.get_urn(), oldAttr.name, oldAttr.value, attr.value)
+ .format(self.parent.gidCaller.get_urn(),
+ self.gidCaller.get_urn(),
+ self.gidObject.get_urn(),
+ oldAttr.name, oldAttr.value, attr.value)
logger.warn(msg)
- #raise CredentialNotVerifiable("Can't encode new valid delegated credential: {}".format(msg))
+ # raise CredentialNotVerifiable(
+ # "Can't encode new valid delegated credential: {}"
+ # .format(msg))
p_cred = doc.importNode(
sdoc.getElementsByTagName("credential")[0], True)
if not xmlsec1:
raise Exception("Could not locate required 'xmlsec1' program")
command = '{} --sign --node-id "{}" --privkey-pem {},{} {}' \
- .format(xmlsec1, ref, self.issuer_privkey, ",".join(gid_files), filename)
+ .format(xmlsec1, ref, self.issuer_privkey,
+ ",".join(gid_files), filename)
signed = os.popen(command).read()
os.remove(filename)
kind = getTextNode(priv, "name")
deleg = str2bool(getTextNode(priv, "can_delegate"))
if kind == '*':
- # Convert * into the default privileges for the credential's type
+ # Convert * into the default privileges
+ # for the credential's type
# Each inherits the delegatability from the * above
_, type = urn_to_hrn(self.gidObject.get_urn())
rl = determine_rights(type, self.gidObject.get_urn())
##
# Verify
# trusted_certs: A list of trusted GID filenames (not GID objects!)
- # Chaining is not supported within the GIDs by xmlsec1.
+ # Chaining is not supported within the GIDs by xmlsec1.
#
# trusted_certs_required: Should usually be true. Set False means an
- # empty list of trusted_certs would still let this method pass.
- # It just skips xmlsec1 verification et al. Only used by some utils
+ # empty list of trusted_certs would still let this method pass.
+ # It just skips xmlsec1 verification et al.
+ # Only used by some utils
#
# Verify that:
# . All of the signatures are valid and that the issuers trace back
#
# -- For Delegates (credentials with parents)
# . The privileges must be a subset of the parent credentials
- # . The privileges must have "can_delegate" set for each delegated privilege
+ # . The privileges must have "can_delegate"
+ # set for each delegated privilege
# . The target gid must be the same between child and parents
# . The expiry time on the child must be no later than the parent
# . The signer of the child must be the owner of the parent
# must be done elsewhere
#
# @param trusted_certs: The certificates of trusted CA certificates
- def verify(self, trusted_certs=None, schema=None, trusted_certs_required=True):
+ def verify(self, trusted_certs=None, schema=None,
+ trusted_certs_required=True):
if not self.xml:
self.decode()
xmlschema = etree.XMLSchema(schema_doc)
if not xmlschema.validate(tree):
error = xmlschema.error_log.last_error
- message = "{}: {} (line {})".format(self.pretty_cred(),
- error.message, error.line)
+ message = "{}: {} (line {})"\
+ .format(self.pretty_cred(),
+ error.message, error.line)
raise CredentialNotVerifiable(message)
if trusted_certs_required and trusted_certs is None:
# trusted_cert_objects = [GID(filename=f) for f in trusted_certs]
trusted_cert_objects = []
ok_trusted_certs = []
- # If caller explicitly passed in None that means skip cert chain validation.
- # Strange and not typical
+ # If caller explicitly passed in None, that means
+ # skip cert chain validation. Strange and not typical
if trusted_certs is not None:
for f in trusted_certs:
try:
trusted_cert_objects.append(GID(filename=f))
ok_trusted_certs.append(f)
except Exception as exc:
- logger.error(
- "Failed to load trusted cert from {}: {}".format(f, exc))
+ logger.exception(
+ "Failed to load trusted cert from {}".format(f))
trusted_certs = ok_trusted_certs
# make sure it is not expired
if self.get_expiration() < datetime.datetime.utcnow():
- raise CredentialNotVerifiable("Credential {} expired at {}"
- .format(self.pretty_cred(),
- self.expiration.strftime(SFATIME_FORMAT)))
+ raise CredentialNotVerifiable(
+ "Credential {} expired at {}"
+ .format(self.pretty_cred(),
+ self.expiration.strftime(SFATIME_FORMAT)))
# Verify the signatures
filename = self.save_to_random_tmp_file()
- # If caller explicitly passed in None that means skip cert chain validation.
- # - Strange and not typical
+ # If caller explicitly passed in None that means
+ # skip cert chain validation. Strange and not typical
if trusted_certs is not None:
# Verify the caller and object gids of this cred and of its parents
for cur_cred in self.get_credential_list():
- # check both the caller and the subject
- for gid in cur_cred.get_gid_object(), cur_cred.get_gid_caller():
+ # check both the caller and the subject
+ for gid in (cur_cred.get_gid_object(),
+ cur_cred.get_gid_caller()):
logger.debug("Credential.verify: verifying chain {}"
.format(gid.pretty_cert()))
logger.debug("Credential.verify: against trusted {}"
.format(" ".join(trusted_certs)))
gid.verify_chain(trusted_cert_objects)
-
+
refs = []
refs.append("Sig_{}".format(self.get_refid()))
refs.append("Sig_{}".format(ref))
for ref in refs:
- # If caller explicitly passed in None that means skip xmlsec1 validation.
- # Strange and not typical
+ # If caller explicitly passed in None that means
+ # skip xmlsec1 validation. Strange and not typical
if trusted_certs is None:
break
# Thierry - jan 2015
- # up to fedora20 we used os.popen and checked that the output begins with OK
- # turns out, with fedora21, there is extra input before this 'OK' thing
- # looks like we're better off just using the exit code - that's what it is made for
- #cert_args = " ".join(['--trusted-pem {}'.format(x) for x in trusted_certs])
+ # up to fedora20 we used os.popen and checked
+ # that the output begins with OK; turns out, with fedora21,
+ # there is extra input before this 'OK' thing
+ # looks like we're better off just using the exit code
+ # that's what it is made for
+ # cert_args = " ".join(['--trusted-pem {}'.format(x) for x in trusted_certs])
# command = '{} --verify --node-id "{}" {} {} 2>&1'.\
# format(self.xmlsec_path, ref, cert_args, filename)
xmlsec1 = self.get_xmlsec1_path()
mend = verified.find('\\', mstart)
msg = verified[mstart:mend]
logger.warning(
- "Credential.verify - failed - xmlsec1 returned {}".format(verified.strip()))
- raise CredentialNotVerifiable("xmlsec1 error verifying cred {} using Signature ID {}: {}"
- .format(self.pretty_cred(), ref, msg))
+ "Credential.verify - failed - xmlsec1 returned {}"
+ .format(verified.strip()))
+ raise CredentialNotVerifiable(
+ "xmlsec1 error verifying cred {} using Signature ID {}: {}"
+ .format(self.pretty_cred(), ref, msg))
os.remove(filename)
# Verify the parents (delegation)
root_target_gid = root_cred.get_gid_object()
if root_cred.get_signature() is None:
# malformed
- raise CredentialNotVerifiable("Could not verify credential owned by {} for object {}. "
- "Cred has no signature"
- .format(self.gidCaller.get_urn(), self.gidObject.get_urn()))
+ raise CredentialNotVerifiable(
+ "Could not verify credential owned by {} for object {}. "
+ "Cred has no signature"
+ .format(self.gidCaller.get_urn(), self.gidObject.get_urn()))
root_cred_signer = root_cred.get_signature().get_issuer_gid()
# Case 2:
# Allow someone to sign credential about themeselves. Used?
# If not, remove this.
- #root_target_gid_str = root_target_gid.save_to_string()
- #root_cred_signer_str = root_cred_signer.save_to_string()
+ # root_target_gid_str = root_target_gid.save_to_string()
+ # root_cred_signer_str = root_cred_signer.save_to_string()
# if root_target_gid_str == root_cred_signer_str:
# # cred signer is target, return success
# return
if trusted_gids and len(trusted_gids) > 0:
root_cred_signer.verify_chain(trusted_gids)
else:
- logger.debug("Cannot verify that cred signer is signed by a trusted authority. "
- "No trusted gids. Skipping that check.")
+ logger.debug(
+ "Cannot verify that cred signer is signed by a trusted authority. "
+ "No trusted gids. Skipping that check.")
# See if the signer is an authority over the domain of the target.
# There are multiple types of authority - accept them all here
# Maybe should be (hrn, type) = urn_to_hrn(root_cred_signer.get_urn())
root_cred_signer_type = root_cred_signer.get_type()
if root_cred_signer_type.find('authority') == 0:
- #logger.debug('Cred signer is an authority')
+ # logger.debug('Cred signer is an authority')
# signer is an authority, see if target is in authority's domain
signerhrn = root_cred_signer.get_hrn()
if hrn_authfor_hrn(signerhrn, root_target_gid.get_hrn()):
def delegate(self, delegee_gidfile, caller_keyfile, caller_gidfile):
"""
- Return a delegated copy of this credential, delegated to the
- specified gid's user.
+ Return a delegated copy of this credential, delegated to the
+ specified gid's user.
"""
# get the gid of the object we are delegating
object_gid = self.get_gid_object()
delegee_gid = GID(filename=delegee_gidfile)
delegee_hrn = delegee_gid.get_hrn()
- #user_key = Keypair(filename=keyfile)
- #user_hrn = self.get_gid_caller().get_hrn()
+ # user_key = Keypair(filename=keyfile)
+ # user_hrn = self.get_gid_caller().get_hrn()
subject_string = "{} delegated to {}".format(object_hrn, delegee_hrn)
dcred = Credential(subject=subject_string)
dcred.set_gid_caller(delegee_gid)
dcred.set_expiration(self.get_expiration())
dcred.set_privileges(self.get_privileges())
dcred.get_privileges().delegate_all_privileges(True)
- #dcred.set_issuer_keys(keyfile, delegee_gidfile)
+ # dcred.set_issuer_keys(keyfile, delegee_gidfile)
dcred.set_issuer_keys(caller_keyfile, caller_gidfile)
dcred.encode()
dcred.sign()
# it's probably the former
if caller_type == "user" and issuer_type != "user":
actual_caller_hrn = caller_hrn
- # if we find that the caller_hrn is an immediate descendant of the issuer, then
- # this seems to be a 'regular' credential
+ # if we find that the caller_hrn is an immediate descendant
+ # of the issuer, then this seems to be a 'regular' credential
elif caller_hrn.startswith(issuer_hrn):
actual_caller_hrn = caller_hrn
# else this looks like a delegated credential, and the real caller is
# the issuer
else:
actual_caller_hrn = issuer_hrn
- logger.info("actual_caller_hrn: caller_hrn={}, issuer_hrn={}, returning {}"
- .format(caller_hrn, issuer_hrn, actual_caller_hrn))
+ logger.info(
+ "actual_caller_hrn: caller_hrn={}, issuer_hrn={}, returning {}"
+ .format(caller_hrn, issuer_hrn, actual_caller_hrn))
return actual_caller_hrn
##
if self.get_signature():
result += " gidIssuer:\n"
- result += self.get_signature().get_issuer_gid().dump_string(8, dump_parents)
+ result += self.get_signature().get_issuer_gid()\
+ .dump_string(8, dump_parents)
if self.expiration:
result += " expiration: " + \
from sfa.trust.sfaticket import SfaTicket
##
-# The AuthInfo class contains the information for an authority. This information
-# includes the GID, private key, and database connection information.
+# The AuthInfo class contains the information for an authority. This
+# information includes the GID, private key, and database connection
+# information.
class AuthInfo:
##
# Initialize and authority object.
#
- # @param xrn the human readable name of the authority (urn will be converted to hrn)
+ # @param xrn the human readable name of the authority
+ # (urn will be converted to hrn)
# @param gid_filename the filename containing the GID
# @param privkey_filename the filename containing the private key
# The Hierarchy class is responsible for managing the tree of authorities.
# Each authority is a node in the tree and exists as an AuthInfo object.
#
-# The tree is stored on disk in a hierarchical manner than reflects the
-# structure of the tree. Each authority is a subdirectory, and each subdirectory
-# contains the GID and pkey files for that authority (as well as
-# subdirectories for each sub-authority)
+# The tree is stored on disk in a hierarchical manner than reflects
+# the structure of the tree. Each authority is a subdirectory, and
+# each subdirectory contains the GID and pkey files for that authority
+# (as well as subdirectories for each sub-authority)
class Hierarchy:
# Given a hrn, return the filenames of the GID, private key
# files.
#
- # @param xrn the human readable name of the authority (urn will be convertd to hrn)
+ # @param xrn the human readable name of the authority
+ # (urn will be convertd to hrn)
def get_auth_filenames(self, xrn):
hrn, type = urn_to_hrn(xrn)
(directory, gid_filename, privkey_filename) = \
self.get_auth_filenames(hrn)
- return os.path.exists(gid_filename) and os.path.exists(privkey_filename)
+ return os.path.exists(gid_filename) \
+ and os.path.exists(privkey_filename)
##
# Create an authority. A private key for the authority and the associated
# GID are created and signed by the parent authority.
#
- # @param xrn the human readable name of the authority to create (urn will be converted to hrn)
- # @param create_parents if true, also create the parents if they do not exist
+ # @param xrn the human readable name of the authority to create
+ # (urn will be converted to hrn)
+ # @param create_parents
+ # if true, also create the parents if they do not exist
def create_auth(self, xrn, create_parents=False):
hrn, type = urn_to_hrn(str(xrn))
- logger.debug("Hierarchy: creating authority: %s" % hrn)
+ logger.debug("Hierarchy: creating authority: {}".format(hrn))
# create the parent authority if necessary
parent_hrn = get_authority(hrn)
parent_urn = hrn_to_urn(parent_hrn, 'authority')
- if (parent_hrn) and (not self.auth_exists(parent_urn)) and (create_parents):
+ if (parent_hrn) and (not self.auth_exists(parent_urn)) \
+ and (create_parents):
self.create_auth(parent_urn, create_parents)
directory, gid_filename, privkey_filename = self.get_auth_filenames(
hrn)
pass
if os.path.exists(privkey_filename):
- logger.debug("using existing key %r for authority %r"
- % (privkey_filename, hrn))
+ logger.debug("using existing key {} for authority {}"
+ .format(privkey_filename, hrn))
pkey = Keypair(filename=privkey_filename)
else:
pkey = Keypair(create=True)
def create_top_level_auth(self, hrn=None):
"""
- Create top level records (includes root and sub authorities (local/remote)
+ Create top level records
+ (includes root and sub authorities (local/remote)
"""
# create the authority if it doesnt alrady exist
if not self.auth_exists(hrn):
def get_interface_auth_info(self, create=True):
hrn = self.config.SFA_INTERFACE_HRN
if not self.auth_exists(hrn):
- if create == True:
+ if create:
self.create_top_level_auth(hrn)
else:
raise MissingAuthority(hrn)
# does not exist, then an exception is thrown. As a side effect, disk files
# and a subdirectory may be created to store the authority.
#
- # @param xrn the human readable name of the authority to create (urn will be converted to hrn).
+ # @param xrn the human readable name of the authority to create
+ # (urn will be converted to hrn).
def get_auth_info(self, xrn):
hrn, type = urn_to_hrn(xrn)
if not self.auth_exists(hrn):
logger.warning(
- "Hierarchy: missing authority - xrn=%s, hrn=%s" % (xrn, hrn))
+ "Hierarchy: missing authority - xrn={}, hrn={}"
+ .format(xrn, hrn))
raise MissingAuthority(hrn)
(directory, gid_filename, privkey_filename, ) = \
# @param uuid the unique identifier to store in the GID
# @param pkey the public key to store in the GID
- def create_gid(self, xrn, uuid, pkey, CA=False, email=None, force_parent=None):
+ def create_gid(self, xrn, uuid, pkey,
+ CA=False, email=None, force_parent=None):
hrn, type = urn_to_hrn(xrn)
if not type:
type = 'authority'
# credential will contain the authority privilege and will be signed by
# the authority's parent.
#
- # @param hrn the human readable name of the authority (urn is converted to hrn)
+ # @param hrn the human readable name of the authority
+ # (urn is converted to hrn)
# @param authority type of credential to return (authority | sa | ma)
def get_auth_cred(self, xrn, kind="authority"):
# This looks almost the same as get_auth_cred, but works for tickets
# XXX does similarity imply there should be more code re-use?
#
- # @param xrn the human readable name of the authority (urn is converted to hrn)
+ # @param xrn the human readable name of the authority
+ # (urn is converted to hrn)
def get_auth_ticket(self, xrn):
hrn, type = urn_to_hrn(xrn)