# This module exports two classes: Keypair and Certificate.
##
#
-### $Id$
-### $URL$
-#
import os
import tempfile
from M2Crypto import X509
from sfa.util.sfalogging import sfa_logger
-from sfa.util.namespace import urn_to_hrn
+from sfa.util.xrn import urn_to_hrn
from sfa.util.faults import *
def convert_public_key(key):
try:
k.load_pubkey_from_file(ssl_fn)
except:
- sfa_logger.log_exc("convert_public_key caught exception")
+ sfa_logger().log_exc("convert_public_key caught exception")
k = None
# remove the temporary files
def save_to_file(self, filename):
open(filename, 'w').write(self.as_pem())
+ self.filename=filename
##
# Load the private key from a file. Implicity the private key includes the public key.
def load_from_file(self, filename):
buffer = open(filename, 'r').read()
self.load_from_string(buffer)
+ self.filename=filename
##
# Load the private key from a string. Implicitly the private key includes the public key.
# get the pyopenssl pkey from the pyopenssl x509
self.key = pyx509.get_pubkey()
+ self.filename=filename
##
# Load the public key from a string. No private key is loaded.
def get_openssl_pkey(self):
return self.key
-
##
# Given another Keypair object, return TRUE if the two keys are the same.
def compute_hash(self, value):
return self.sign_string(str(value))
+ # only informative
+ def get_filename(self):
+ return getattr(self,'filename',None)
+
+ def dump (self, *args, **kwargs):
+ print self.dump_string(*args, **kwargs)
+
+ def dump_string (self):
+ result=""
+ result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string()
+ filename=self.get_filename()
+ if filename: result += "Filename %s\n"%filename
+ return result
+
##
# The certificate class implements a general purpose X509 certificate, making
# use of the appropriate pyOpenSSL or M2Crypto abstractions. It also adds
# load it (support for the ---parent--- tag as well as normal chained certs)
string = string.strip()
-
-
- if not string.startswith('-----'):
+
+ # If it's not in proper PEM format, wrap it
+ if string.count('-----BEGIN CERTIFICATE') == 0:
string = '-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----' % string
+ # If there is a PEM cert in there, but there is some other text first
+ # such as the text of the certificate, skip the text
+ beg = string.find('-----BEGIN CERTIFICATE')
+ if beg > 0:
+ # skipping over non cert beginning
+ string = string[beg:]
+
parts = []
if string.count('-----BEGIN CERTIFICATE-----') > 1 and \
file = open(filename)
string = file.read()
self.load_from_string(string)
+ self.filename=filename
##
# Save the certificate to a string.
f = open(filename, 'w')
f.write(string)
f.close()
+ self.filename=filename
##
# Save the certificate to a random file in /tmp/
# Get an X509 extension from the certificate
def get_extension(self, name):
+
# pyOpenSSL does not have a way to get extensions
m2x509 = X509.load_cert_string(self.save_to_string())
value = m2x509.get_ext(name).get_value()
+
return value
##
# Sign the certificate using the issuer private key and issuer subject previous set with set_issuer().
def sign(self):
+ sfa_logger().debug('certificate.sign')
assert self.cert != None
assert self.issuerSubject != None
assert self.issuerKey != None
# verify expiration time
if self.cert.has_expired():
+ sfa_logger().debug("verify_chain: NO our certificate has expired")
raise CertExpired(self.get_subject(), "client cert")
# if this cert is signed by a trusted_cert, then we are set
for trusted_cert in trusted_certs:
if self.is_signed_by_cert(trusted_cert):
- sfa_logger.debug("Cert %s signed by trusted cert %s", self.get_subject(), trusted_cert.get_subject())
# verify expiration of trusted_cert ?
if not trusted_cert.cert.has_expired():
+ sfa_logger().debug("verify_chain: YES cert %s signed by trusted cert %s"%(
+ self.get_subject(), trusted_cert.get_subject()))
return trusted_cert
else:
- sfa_logger.debug("Trusted cert %s is expired", trusted_cert.get_subject())
+ sfa_logger().debug("verify_chain: NO cert %s is signed by trusted_cert %s, but this is expired..."%(
+ self.get_subject(),trusted_cert.get_subject()))
+ raise CertExpired(self.get_subject(),"trusted_cert %s"%trusted_cert.get_subject())
# if there is no parent, then no way to verify the chain
if not self.parent:
- sfa_logger.debug("%r has no parent"%self.get_subject())
+ sfa_logger().debug("verify_chain: NO %s has no parent and is not in trusted roots"%self.get_subject())
raise CertMissingParent(self.get_subject())
# if it wasn't signed by the parent...
if not self.is_signed_by_cert(self.parent):
- sfa_logger.debug("%r is not signed by parent"%self.get_subject())
+ sfa_logger().debug("verify_chain: NO %s is not signed by parent"%self.get_subject())
return CertNotSignedByParent(self.get_subject())
# if the parent isn't verified...
+ sfa_logger().debug("verify_chain: .. %s, -> verifying parent %s"%(self.get_subject(),self.parent.get_subject()))
self.parent.verify_chain(trusted_certs)
return
+
+ ### more introspection
+ def get_extensions(self):
+ # pyOpenSSL does not have a way to get extensions
+ triples=[]
+ m2x509 = X509.load_cert_string(self.save_to_string())
+ nb_extensions=m2x509.get_ext_count()
+ sfa_logger().debug("X509 had %d extensions"%nb_extensions)
+ for i in range(nb_extensions):
+ ext=m2x509.get_ext_at(i)
+ triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) )
+ return triples
+
+ def get_data_names(self):
+ return self.data.keys()
+
+ def get_all_datas (self):
+ triples=self.get_extensions()
+ for name in self.get_data_names():
+ triples.append( (name,self.get_data(name),'data',) )
+ return triples
+
+ # only informative
+ def get_filename(self):
+ return getattr(self,'filename',None)
+
+ def dump (self, *args, **kwargs):
+ print self.dump_string(*args, **kwargs)
+
+ def dump_string (self,show_extensions=False):
+ result = ""
+ result += "CERTIFICATE for %s\n"%self.get_subject()
+ result += "Issued by %s\n"%self.get_issuer()
+ filename=self.get_filename()
+ if filename: result += "Filename %s\n"%filename
+ if show_extensions:
+ all_datas=self.get_all_datas()
+ result += " has %d extensions/data attached"%len(all_datas)
+ for (n,v,c) in all_datas:
+ if c=='data':
+ result += " data: %s=%s\n"%(n,v)
+ else:
+ result += " ext: %s (crit=%s)=<<<%s>>>\n"%(n,c,v)
+ return result