from M2Crypto import X509
from tempfile import mkstemp
from sfa.util.sfalogging import logger
-
+from sfa.util.namespace import urn_to_hrn
from sfa.util.faults import *
def convert_public_key(key):
def get_openssl_pkey(self):
return self.key
+
##
# Given another Keypair object, return TRUE if the two keys are the same.
# @param string If string!=None, load the certficate from the string.
# @param filename If filename!=None, load the certficiate from the file.
- def __init__(self, create=False, subject=None, string=None, filename=None):
+ def __init__(self, create=False, subject=None, string=None, filename=None, intermediate=None):
self.data = {}
if create or subject:
self.create()
if filename:
self.load_from_file(filename)
+ if intermediate:
+ self.set_intermediate_ca(intermediate)
+ else:
+ self.set_intermediate_ca(False)
+
##
# Create a blank X509 certificate and store it in this object.
def create(self):
self.cert = crypto.X509()
- self.cert.set_serial_number(1)
+ self.cert.set_serial_number(3)
self.cert.gmtime_adj_notBefore(0)
self.cert.gmtime_adj_notAfter(60*60*24*365*5) # five years
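Review bot: Every certificate produced by create() still gets a constant serial number (`self.cert.set_serial_number(3)`), so all certificates signed by the same issuer collide on the (issuer, serial) pair that CRLs and many verifiers use to identify a certificate; changing the constant from 1 to 3 does not address that. A per-certificate random value avoids it, e.g. `self.cert.set_serial_number(int(binascii.hexlify(os.urandom(8)), 16))` with `os` and `binascii` imported at the top of the file, and if the constant is meant as a format marker that the chain verifier relies on, say so in a comment rather than leaving a bare number. Related, in the __init__ hunk `self.set_intermediate_ca(intermediate)` runs after `self.load_from_file(filename)`, so a certificate read from disk may have an extension appended; if add_extension's precondition (its doc comment is cut at line 46) is that the certificate is not yet signed, that call should probably apply only to freshly created certificates. The doc comment above __init__ (`Given another Keypair object, return TRUE if the two keys are the same`) describes a different method and does not mention the new parameter; `# @param intermediate If true, mark this certificate as a CA (basicConstraints CA:TRUE)` would fix it.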
pkey.key = self.cert.get_pubkey()
pkey.m2key = m2x509.get_pubkey()
return pkey
+
+ def set_intermediate_ca(self, val):
+ self.intermediate = val
+ if val:
+ self.add_extension('basicConstraints', 1, 'CA:TRUE')
+
+
##
# Add an X509 extension to the certificate. Add_extension can only be called
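Review bot: set_intermediate_ca only ever adds `CA:TRUE`; when `val` is false it records `self.intermediate = val` but emits no basicConstraints at all, so a non-CA certificate carries no constraint a verifier could enforce, and a certificate marked intermediate and later passed `False` keeps its `CA:TRUE` extension while the attribute says otherwise. Because __init__ calls this with `False` on every certificate, including ones loaded from a file, simply adding `CA:FALSE` in an else branch here would also modify loaded certificates; the `CA:FALSE` case belongs where fresh certificates are built (create(), before signing), and this method should at least state that it is one-way: `# NOTE: only sets CA:TRUE; does not remove an existing basicConstraints extension`. Whether verify_chain checks basicConstraints is not visible in this diff; if it does not, the extension is informational only, which is worth saying in the method's comment.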
if self.is_signed_by_cert(trusted_cert):
# make sure sure the trusted cert's hrn is a prefix of the
# signed cert's hrn
- if not self.get_subject().startswith(trusted_cert.get_subject()):
- raise GidParentHrn(trusted_cert.get_subject())
+ trusted_hrn, _ = urn_to_hrn(trusted_cert.get_subject())
+ cur_hrn, _ = urn_to_hrn(self.get_subject())
+ if not cur_hrn.startswith(trusted_hrn):
+ raise GidParentHrn(trusted_cert.get_subject() + " " + self.get_subject())
#print self.get_subject(), "is signed by a root"
return
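Review bot: `cur_hrn.startswith(trusted_hrn)` is a plain string prefix, not a hierarchy check: an authority with hrn `plc.site` is accepted as the parent of `plc.siteX.user` because nothing requires a `.` at the boundary, so a certificate signed by `plc.site` could vouch for names outside its subtree. The switch from URNs to hrns fixes the old comparison never matching, but the check is still not boundary-aware. Use `if cur_hrn != trusted_hrn and not cur_hrn.startswith(trusted_hrn + "."):`, which passes only the authority itself and its dotted descendants, assuming hrn components are dot-separated as the test names (`site.foo`) suggest. verify_chain starts before this hunk, and the loop that reaches this branch is outside the visible lines.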
def verify(self, trusted_certs):
if not self.xml:
self.decode()
-
trusted_cert_objects = [GID(filename=f) for f in trusted_certs]
# Use legacy verification if this is a legacy credential
# Verify the gids of this cred and of its parents
+
+
for cur_cred in self.get_credential_list():
cur_cred.get_gid_object().verify_chain(trusted_cert_objects)
cur_cred.get_gid_caller().verify_chain(trusted_cert_objects)
-
+
refs = []
refs.append("Sig_%s" % self.get_refid())
def testDelegationAndVerification(self):
gidAuthority, keys = self.createSignedGID("site", "urn:publicid:IDN+plc+authority+site")
- gidCaller, ckeys = self.createSignedGID("foo", "urn:publicid:IDN+plc:site+user+foo",
+ gidCaller, ckeys = self.createSignedGID("site.foo", "urn:publicid:IDN+plc:site+user+foo",
keys, gidAuthority)
- gidObject, _ = self.createSignedGID("bar_slice", "urn:publicid:IDN+plc:site+slice+bar_slice",
+ gidObject, _ = self.createSignedGID("site.slice", "urn:publicid:IDN+plc:site+slice+bar_slice",
keys, gidAuthority)
- gidDelegatee, _ = self.createSignedGID("delegatee", "urn:publicid:IDN+plc:site+user+delegatee",
+ gidDelegatee, _ = self.createSignedGID("site.delegatee", "urn:publicid:IDN+plc:site+user+delegatee",
keys, gidAuthority)
-
+
cred = Credential()
cred.set_gid_caller(gidCaller)
cred.set_gid_object(gidObject)
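Review bot: The test only exercises the positive path of the prefix check that this commit changes in verify_chain; no GID here has an hrn outside the authority's subtree, so a regression that accepts any prefix (or everything) would still pass. Add a sibling case that builds a GID such as `self.createSignedGID("other.user", "urn:publicid:IDN+plc:other+user+bad", keys, gidAuthority)` and asserts that verifying it against the same trusted certificates raises `GidParentHrn`. Also, the hrn given for gidObject (`site.slice`) does not correspond to its URN's leaf name `bar_slice`, while the other two arguments do match their URNs (`site.foo`, `site.delegatee`); if createSignedGID derives the subject from the first argument, the object GID's hrn and URN disagree and the test may pass for the wrong reason, so `site.bar_slice` would be the consistent choice. The body of testDelegationAndVerification continues past the hunk.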