git://git.onelab.eu
/
sfa.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
avoid publishing non-relevant entries in GetVersion
[sfa.git]
/
sfa
/
trust
/
certificate.py
diff --git
a/sfa/trust/certificate.py
b/sfa/trust/certificate.py
index
960a387
..
936352f
100644
(file)
--- a/
sfa/trust/certificate.py
+++ b/
sfa/trust/certificate.py
@@
-1,4
+1,4
@@
-#----------------------------------------------------------------------
+#
----------------------------------------------------------------------
# Copyright (c) 2008 Board of Trustees, Princeton University
#
# Permission is hereby granted, free of charge, to any person obtaining
# Copyright (c) 2008 Board of Trustees, Princeton University
#
# Permission is hereby granted, free of charge, to any person obtaining
@@
-19,9
+19,9
@@
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
# IN THE WORK.
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS
# IN THE WORK.
-#----------------------------------------------------------------------
+#
----------------------------------------------------------------------
-#
#
+#
# SFA uses two crypto libraries: pyOpenSSL and M2Crypto to implement
# the necessary crypto functionality. Ideally just one of these libraries
# would be used, but unfortunately each of these libraries is independently
# SFA uses two crypto libraries: pyOpenSSL and M2Crypto to implement
# the necessary crypto functionality. Ideally just one of these libraries
# would be used, but unfortunately each of these libraries is independently
@@
-37,7
+37,8
@@
# Notes on using the openssl command line
#
# Notes on using the openssl command line
#
-# for verifying the chain in a gid, assuming it is split into pieces p1.pem p2.pem p3.pem
+# for verifying the chain in a gid,
+# assuming it is split into pieces p1.pem p2.pem p3.pem
# you can use openssl to verify the chain using this command
# openssl verify -verbose -CAfile <(cat p2.pem p3.pem) p1.pem
# also you can use sfax509 to invoke openssl x509 on all parts of the gid
# you can use openssl to verify the chain using this command
# openssl verify -verbose -CAfile <(cat p2.pem p3.pem) p1.pem
# also you can use sfax509 to invoke openssl x509 on all parts of the gid
@@
-54,11
+55,12
@@
from tempfile import mkstemp
import OpenSSL
# M2Crypto is imported on the fly to minimize crashes
import OpenSSL
# M2Crypto is imported on the fly to minimize crashes
-#import M2Crypto
+#
import M2Crypto
from sfa.util.py23 import PY3
from sfa.util.py23 import PY3
-from sfa.util.faults import CertExpired, CertMissingParent, CertNotSignedByParent
+from sfa.util.faults import (CertExpired, CertMissingParent,
+ CertNotSignedByParent)
from sfa.util.sfalogging import logger
# this tends to generate quite some logs for little or no value
from sfa.util.sfalogging import logger
# this tends to generate quite some logs for little or no value
@@
-126,7
+128,7
@@
def convert_public_key(key):
# TODO: for production, cleanup the temporary files
if not os.path.exists(ssl_fn):
raise Exception(
# TODO: for production, cleanup the temporary files
if not os.path.exists(ssl_fn):
raise Exception(
- "
keyconvert:
generated certificate not found. keyconvert may have failed.")
+ "generated certificate not found. keyconvert may have failed.")
k = Keypair()
try:
k = Keypair()
try:
@@
-200,9
+202,11
@@
class Keypair:
import M2Crypto
if glo_passphrase_callback:
self.key = OpenSSL.crypto.load_privatekey(
import M2Crypto
if glo_passphrase_callback:
self.key = OpenSSL.crypto.load_privatekey(
- OpenSSL.crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string))
+ OpenSSL.crypto.FILETYPE_PEM, string,
+ functools.partial(glo_passphrase_callback, self, string))
self.m2key = M2Crypto.EVP.load_key_string(
self.m2key = M2Crypto.EVP.load_key_string(
- string, functools.partial(glo_passphrase_callback, self, string))
+ string, functools.partial(glo_passphrase_callback,
+ self, string))
else:
self.key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, string)
else:
self.key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, string)
@@
-260,7
+264,8
@@
class Keypair:
# Return the private key in PEM format.
def as_pem(self):
# Return the private key in PEM format.
def as_pem(self):
- return OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, self.key)
+ return OpenSSL.crypto.dump_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, self.key)
##
# Return an M2Crypto key object
##
# Return an M2Crypto key object
@@
-356,7
+361,8
@@
class Certificate:
# @param filename If filename!=None, load the certficiate from the file.
# @param isCA If !=None, set whether this cert is for a CA
# @param filename If filename!=None, load the certficiate from the file.
# @param isCA If !=None, set whether this cert is for a CA
- def __init__(self, lifeDays=1825, create=False, subject=None, string=None, filename=None, isCA=None):
+ def __init__(self, lifeDays=1825, create=False, subject=None, string=None,
+ filename=None, isCA=None):
# these used to be defined in the class !
self.x509 = None
self.issuerKey = None
# these used to be defined in the class !
self.x509 = None
self.issuerKey = None
@@
-374,7
+380,7
@@
class Certificate:
self.load_from_file(filename)
# Set the CA bit if a value was supplied
self.load_from_file(filename)
# Set the CA bit if a value was supplied
- if isCA
!=
None:
+ if isCA
is not
None:
self.set_is_ca(isCA)
# Create a blank X509 certificate and store it in this object.
self.set_is_ca(isCA)
# Create a blank X509 certificate and store it in this object.
@@
-411,7
+417,8
@@
class Certificate:
# If it's not in proper PEM format, wrap it
if string.count('-----BEGIN CERTIFICATE') == 0:
# If it's not in proper PEM format, wrap it
if string.count('-----BEGIN CERTIFICATE') == 0:
- string = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'\
+ string = '-----BEGIN CERTIFICATE-----'\
+ '\n{}\n-----END CERTIFICATE-----'\
.format(string)
# If there is a PEM cert in there, but there is some other text first
.format(string)
# If there is a PEM cert in there, but there is some other text first
@@
-455,7
+462,8
@@
class Certificate:
##
# Save the certificate to a string.
#
##
# Save the certificate to a string.
#
- # @param save_parents If save_parents==True, then also save the parent certificates.
+ # @param save_parents If save_parents==True,
+ # then also save the parent certificates.
def save_to_string(self, save_parents=True):
if self.x509 is None:
def save_to_string(self, save_parents=True):
if self.x509 is None:
@@
-471,7
+479,8
@@
class Certificate:
##
# Save the certificate to a file.
##
# Save the certificate to a file.
- # @param save_parents If save_parents==True, then also save the parent certificates.
+ # @param save_parents If save_parents==True,
+ # then also save the parent certificates.
def save_to_file(self, filename, save_parents=True, filep=None):
string = self.save_to_string(save_parents=save_parents)
def save_to_file(self, filename, save_parents=True, filep=None):
string = self.save_to_string(save_parents=save_parents)
@@
-487,7
+496,8
@@
class Certificate:
##
# Save the certificate to a random file in /tmp/
##
# Save the certificate to a random file in /tmp/
- # @param save_parents If save_parents==True, then also save the parent certificates.
+ # @param save_parents If save_parents==True,
+ # then also save the parent certificates.
def save_to_random_tmp_file(self, save_parents=True):
fp, filename = mkstemp(suffix='cert', text=True)
fp = os.fdopen(fp, "w")
def save_to_random_tmp_file(self, save_parents=True):
fp, filename = mkstemp(suffix='cert', text=True)
fp = os.fdopen(fp, "w")
@@
-498,7
+508,8
@@
class Certificate:
# Sets the issuer private key and name
# @param key Keypair object containing the private key of the issuer
# @param subject String containing the name of the issuer
# Sets the issuer private key and name
# @param key Keypair object containing the private key of the issuer
# @param subject String containing the name of the issuer
- # @param cert (optional) Certificate object containing the name of the issuer
+ # @param cert (optional)
+ # Certificate object containing the name of the issuer
def set_issuer(self, key, subject=None, cert=None):
self.issuerKey = key
def set_issuer(self, key, subject=None, cert=None):
self.issuerKey = key
@@
-616,17
+627,19
@@
class Certificate:
def set_intermediate_ca(self, val):
return self.set_is_ca(val)
def set_intermediate_ca(self, val):
return self.set_is_ca(val)
- # Set whether this cert is for a CA. All signers and only signers should be CAs.
+ # Set whether this cert is for a CA.
+ # All signers and only signers should be CAs.
# The local member starts unset, letting us check that you only set it once
# @param val Boolean indicating whether this cert is for a CA
def set_is_ca(self, val):
if val is None:
return
# The local member starts unset, letting us check that you only set it once
# @param val Boolean indicating whether this cert is for a CA
def set_is_ca(self, val):
if val is None:
return
- if self.isCA
!=
None:
+ if self.isCA
is not
None:
# Can't double set properties
# Can't double set properties
- raise Exception("Cannot set basicConstraints CA:?? more than once. "
- "Was {}, trying to set as {}%s".format(self.isCA, val))
+ raise Exception(
+ "Cannot set basicConstraints CA:?? more than once. "
+ "Was {}, trying to set as {}".format(self.isCA, val))
self.isCA = val
if val:
self.isCA = val
if val:
@@
-635,9
+648,9
@@
class Certificate:
self.add_extension('basicConstraints', 1, 'CA:FALSE')
##
self.add_extension('basicConstraints', 1, 'CA:FALSE')
##
- # Add an X509 extension to the certificate. Add_extension can only
be called
- #
once for a particular extension name, due to limitations in the underlying
- # library.
+ # Add an X509 extension to the certificate. Add_extension can only
+ #
be called once for a particular extension name, due to
+ # li
mitations in the underlying li
brary.
#
# @param name string containing name of extension
# @param value string containing value of the extension
#
# @param name string containing name of extension
# @param value string containing value of the extension
@@
-660,7
+673,8
@@
class Certificate:
# FIXME: What if they are trying to set with a different value?
# Is this ever OK? Or should we raise an exception?
# elif oldExtVal:
# FIXME: What if they are trying to set with a different value?
# Is this ever OK? Or should we raise an exception?
# elif oldExtVal:
-# raise "Cannot add extension {} which had val {} with new val {}".format(name, oldExtVal, value)
+# raise "Cannot add extension {} which had val {} with new val {}"\
+# .format(name, oldExtVal, value)
ext = OpenSSL.crypto.X509Extension(name, critical, value)
self.x509.add_extensions([ext])
ext = OpenSSL.crypto.X509Extension(name, critical, value)
self.x509.add_extensions([ext])
@@
-689,17
+703,18
@@
class Certificate:
return value
##
return value
##
- # Set_data is a wrapper around add_extension. It stores the parameter str in
- # the X509 subject_alt_name extension. Set_data can only be called once, due
- # to limitations in the underlying library.
+ # Set_data is a wrapper around add_extension. It stores the
+ # parameter str in the X509 subject_alt_name extension. Set_data
+ # can only be called once, due to limitations in the underlying
+ # library.
- def set_data(self, str, field='subjectAltName'):
+ def set_data(self, str
ing
, field='subjectAltName'):
# pyOpenSSL only allows us to add extensions, so if we try to set the
# same extension more than once, it will not work
if field in self.data:
raise Exception("Cannot set {} more than once".format(field))
# pyOpenSSL only allows us to add extensions, so if we try to set the
# same extension more than once, it will not work
if field in self.data:
raise Exception("Cannot set {} more than once".format(field))
- self.data[field] = str
- self.add_extension(field, 0, str)
+ self.data[field] = str
ing
+ self.add_extension(field, 0, str
ing
)
##
# Return the data string that was previously set with set_data
##
# Return the data string that was previously set with set_data
@@
-722,9
+737,9
@@
class Certificate:
def sign(self):
logger.debug('certificate.sign')
def sign(self):
logger.debug('certificate.sign')
- assert self.x509
!=
None
- assert self.issuerSubject
!=
None
- assert self.issuerKey
!=
None
+ assert self.x509
is not
None
+ assert self.issuerSubject
is not
None
+ assert self.issuerKey
is not
None
self.x509.set_issuer(self.issuerSubject)
self.x509.sign(self.issuerKey.get_openssl_pkey(), self.digest)
self.x509.set_issuer(self.issuerSubject)
self.x509.sign(self.issuerKey.get_openssl_pkey(), self.digest)
@@
-748,7
+763,8
@@
class Certificate:
result = m2result == 1
if debug_verify_chain:
logger.debug("Certificate.verify: <- {} (m2={}) ({} x {})"
result = m2result == 1
if debug_verify_chain:
logger.debug("Certificate.verify: <- {} (m2={}) ({} x {})"
- .format(result, m2result, self.pretty_cert(), m2pubkey))
+ .format(result, m2result,
+ self.pretty_cert(), m2pubkey))
return result
# XXX alternatively, if openssl has been patched, do the much simpler:
return result
# XXX alternatively, if openssl has been patched, do the much simpler:
@@
-759,7
+775,8
@@
class Certificate:
# return 0
##
# return 0
##
- # Return True if pkey is identical to the public key that is contained in the certificate.
+ # Return True if pkey is identical to the public key that is
+ # contained in the certificate.
# @param pkey Keypair object
def is_pubkey(self, pkey):
# @param pkey Keypair object
def is_pubkey(self, pkey):
@@
-833,29
+850,38
@@
class Certificate:
# verify expiration of trusted_cert ?
if not trusted_cert.x509.has_expired():
if debug_verify_chain:
# verify expiration of trusted_cert ?
if not trusted_cert.x509.has_expired():
if debug_verify_chain:
- logger.debug("verify_chain: YES. Cert {} signed by trusted cert {}"
- .format(self.pretty_name(), trusted_cert.pretty_name()))
+ logger.debug("verify_chain: YES."
+ " Cert {} signed by trusted cert {}"
+ .format(self.pretty_name(),
+ trusted_cert.pretty_name()))
return trusted_cert
else:
if debug_verify_chain:
return trusted_cert
else:
if debug_verify_chain:
- logger.debug("verify_chain: NO. Cert {} is signed by trusted_cert {}, "
+ logger.debug("verify_chain: NO. Cert {} "
+ "is signed by trusted_cert {}, "
"but that signer is expired..."
"but that signer is expired..."
- .format(self.pretty_cert(), trusted_cert.pretty_cert()))
+ .format(self.pretty_cert(),
+ trusted_cert.pretty_cert()))
raise CertExpired("{} signer trusted_cert {}"
raise CertExpired("{} signer trusted_cert {}"
- .format(self.pretty_name(), trusted_cert.pretty_name()))
+ .format(self.pretty_name(),
+ trusted_cert.pretty_name()))
else:
else:
- logger.debug("verify_chain: not a direct
descendant of a trusted root".
-
format(self.pretty_name(), trusted_cert)
)
+ logger.debug("verify_chain: not a direct
"
+
" descendant of a trusted root"
)
# if there is no parent, then no way to verify the chain
if not self.parent:
if debug_verify_chain:
logger.debug("verify_chain: NO. {} has no parent "
"and issuer {} is not in {} trusted roots"
# if there is no parent, then no way to verify the chain
if not self.parent:
if debug_verify_chain:
logger.debug("verify_chain: NO. {} has no parent "
"and issuer {} is not in {} trusted roots"
- .format(self.pretty_name(), self.get_issuer(), len(trusted_certs)))
- raise CertMissingParent("{}: Issuer {} is not one of the {} trusted roots, "
+ .format(self.pretty_name(), self.get_issuer(),
+ len(trusted_certs)))
+ raise CertMissingParent("{}: Issuer {} is not "
+ "one of the {} trusted roots, "
"and cert has no parent."
"and cert has no parent."
- .format(self.pretty_name(), self.get_issuer(), len(trusted_certs)))
+ .format(self.pretty_name(),
+ self.get_issuer(),
+ len(trusted_certs)))
# if it wasn't signed by the parent...
if not self.is_signed_by_cert(self.parent):
# if it wasn't signed by the parent...
if not self.is_signed_by_cert(self.parent):
@@
-875,16
+901,19
@@
class Certificate:
# CAs.
# Ugly - cert objects aren't parsed so we need to read the
# extension and hope there are no other basicConstraints
# CAs.
# Ugly - cert objects aren't parsed so we need to read the
# extension and hope there are no other basicConstraints
- if not self.parent.isCA and not (self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
+ if not self.parent.isCA and not (
+ self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
logger.warn("verify_chain: cert {}'s parent {} is not a CA"
.format(self.pretty_name(), self.parent.pretty_name()))
raise CertNotSignedByParent("{}: Parent {} not a CA"
logger.warn("verify_chain: cert {}'s parent {} is not a CA"
.format(self.pretty_name(), self.parent.pretty_name()))
raise CertNotSignedByParent("{}: Parent {} not a CA"
- .format(self.pretty_name(), self.parent.pretty_name()))
+ .format(self.pretty_name(),
+ self.parent.pretty_name()))
# if the parent isn't verified...
if debug_verify_chain:
logger.debug("verify_chain: .. {}, -> verifying parent {}"
# if the parent isn't verified...
if debug_verify_chain:
logger.debug("verify_chain: .. {}, -> verifying parent {}"
- .format(self.pretty_name(),self.parent.pretty_name()))
+ .format(self.pretty_name(),
+ self.parent.pretty_name()))
self.parent.verify_chain(trusted_certs)
return
self.parent.verify_chain(trusted_certs)
return