#
-from __future__ import print_function
+
import functools
import os
from sfa.util.sfalogging import logger
# this tends to generate quite some logs for little or no value
-debug_verify_chain = False
+debug_verify_chain = True
glo_passphrase_callback = None
# @param filename name of file to store the keypair in
def save_to_file(self, filename):
- open(filename, 'w').write(self.as_pem())
+ with open(filename, 'w') as output:
+ output.write(self.as_pem())
self.filename = filename
##
OpenSSL.crypto.FILETYPE_PEM, string,
functools.partial(glo_passphrase_callback, self, string))
self.m2key = M2Crypto.EVP.load_key_string(
- string, functools.partial(glo_passphrase_callback,
- self, string))
+ string.encode(encoding="utf-8"),
+ functools.partial(glo_passphrase_callback, self, string))
else:
self.key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, string)
- self.m2key = M2Crypto.EVP.load_key_string(string)
+ self.m2key = M2Crypto.EVP.load_key_string(
+ string.encode(encoding="utf-8"))
##
# Load the public key from a string. No private key is loaded.
def get_m2_pubkey(self):
import M2Crypto
if not self.m2key:
- self.m2key = M2Crypto.EVP.load_key_string(self.as_pem())
+ self.m2key = M2Crypto.EVP.load_key_string(
+ self.as_pem().encode(encoding="utf-8"))
return self.m2key
##
# certs)
if string is None or string.strip() == "":
- logger.warn("Empty string in load_from_string")
+ logger.warning("Empty string in load_from_string")
return
string = string.strip()
OpenSSL.crypto.FILETYPE_PEM, parts[0])
if self.x509 is None:
- logger.warn(
+ logger.warning(
"Loaded from string but cert is None: {}".format(string))
# if there are more certs, then create a parent and let the parent load
def save_to_string(self, save_parents=True):
if self.x509 is None:
- logger.warn("None cert in certificate.save_to_string")
+ logger.warning("None cert in certificate.save_to_string")
return ""
string = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, self.x509)
req = OpenSSL.crypto.X509Req()
reqSubject = req.get_subject()
if isinstance(subject, dict):
- for key in reqSubject.keys():
+ for key in list(reqSubject.keys()):
setattr(reqSubject, key, subject[key])
else:
setattr(reqSubject, "CN", subject)
req = OpenSSL.crypto.X509Req()
subj = req.get_subject()
if isinstance(name, dict):
- for key in name.keys():
+ for key in list(name.keys()):
setattr(subj, key, name[key])
else:
setattr(subj, "CN", name)
data = self.get_data(field='subjectAltName')
if data:
message += " SubjectAltName:"
- counter = 0
filtered = [self.filter_chunk(chunk) for chunk in data.split()]
message += " ".join([f for f in filtered if f])
omitted = len([f for f in filtered if not f])
# pyOpenSSL does not have a way to get extensions
m2x509 = M2Crypto.X509.load_cert_string(certstr)
if m2x509 is None:
- logger.warn("No cert loaded in get_extension")
+ logger.warning("No cert loaded in get_extension")
return None
if m2x509.get_ext(name) is None:
return None
if field in self.data:
raise Exception("Cannot set {} more than once".format(field))
self.data[field] = string
- self.add_extension(field, 0, string)
+ # call str() because we've seen unicode there
+ # and the underlying C code doesn't like it
+ self.add_extension(field, 0, str(string))
##
# Return the data string that was previously set with set_data
# @param cert certificate object
def is_signed_by_cert(self, cert):
- logger.debug("Certificate.is_signed_by_cert -> invoking verify")
- k = cert.get_pubkey()
- result = self.verify(k)
+ key = cert.get_pubkey()
+ logger.debug("Certificate.is_signed_by_cert -> verify on {}\n"
+ "with pubkey {}"
+ .format(self, key))
+ result = self.verify(key)
return result
##
# the public key contained in it's parent. The chain is recursed
# until a certificate is found that is signed by a trusted root.
- logger.debug("Certificate.verify_chain {}".format(self.pretty_name()))
# verify expiration time
if self.x509.has_expired():
if debug_verify_chain:
# if this cert is signed by a trusted_cert, then we are set
for i, trusted_cert in enumerate(trusted_certs, 1):
- logger.debug("Certificate.verify_chain - trying trusted #{} : {}"
+ logger.debug(5*'-' +
+ " Certificate.verify_chain - trying trusted #{} : {}"
.format(i, trusted_cert.pretty_name()))
if self.is_signed_by_cert(trusted_cert):
# verify expiration of trusted_cert ?
trusted_cert.pretty_name()))
else:
logger.debug("verify_chain: not a direct"
- " descendant of a trusted root")
+ " descendant of trusted root #{}".format(i))
# if there is no parent, then no way to verify the chain
if not self.parent:
# extension and hope there are no other basicConstraints
if not self.parent.isCA and not (
self.parent.get_extension('basicConstraints') == 'CA:TRUE'):
- logger.warn("verify_chain: cert {}'s parent {} is not a CA"
- .format(self.pretty_name(), self.parent.pretty_name()))
+ logger.warning("verify_chain: cert {}'s parent {} is not a CA"
+ .format(self.pretty_name(), self.parent.pretty_name()))
raise CertNotSignedByParent("{}: Parent {} not a CA"
.format(self.pretty_name(),
self.parent.pretty_name()))
return triples
def get_data_names(self):
- return self.data.keys()
+ return list(self.data.keys())
def get_all_datas(self):
triples = self.get_extensions()