From 62d3e5398e93ddd322b7b9f27184b951f6f83a99 Mon Sep 17 00:00:00 2001 From: Josh Karlin Date: Fri, 9 Apr 2010 21:32:37 +0000 Subject: [PATCH] writing testCred.py --- sfa/trust/credential.py | 39 +++++++++----------- sfa/trust/gid.py | 6 ++- sfa/trust/rights.py | 11 +++--- tests/testCred.py | 81 ++++++++++++++++++++++++++++++++++++++--- tests/testGid.py | 6 +-- 5 files changed, 107 insertions(+), 36 deletions(-) diff --git a/sfa/trust/credential.py b/sfa/trust/credential.py index 9b3e9c41..ef5100eb 100644 --- a/sfa/trust/credential.py +++ b/sfa/trust/credential.py @@ -28,7 +28,7 @@ DEFAULT_CREDENTIAL_LIFETIME = 1051200 # TODO: # . make privs match between PG and PL # . Need to test delegation, xml verification - +# . Need to add support for other types of credentials, e.g. tickets signature_template = \ @@ -210,7 +210,6 @@ class Credential(object): def set_parent(self, cred): self.parent = cred self.updateRefID() - ## # set the GID of the caller @@ -264,7 +263,7 @@ class Credential(object): # get the lifetime of the credential (in minutes) def get_lifetime(self): - if not self.lifeTime: + if not self.expiration: self.decode() return self.expiration @@ -422,6 +421,7 @@ class Credential(object): next_cred = next_cred.parent else: next_cred = None + # Find a unique refid for this credential rid = self.get_refid() @@ -443,7 +443,6 @@ class Credential(object): def sign(self): if not self.issuer_privkey or not self.issuer_gid: return - doc = parseString(self.get_xml()) sigs = doc.getElementsByTagName("signatures")[0] @@ -463,8 +462,8 @@ class Credential(object): % (ref, self.issuer_privkey, self.issuer_gid, filename)).read() os.remove(filename) - self.xml = signed + def getTextNode(self, element, subele): sub = element.getElementsByTagName(subele)[0] 
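Review bot: `get_lifetime` now tests `self.expiration` and returns it, but the comment above it still says "get the lifetime of the credential (in minutes)"; per the decode hunk further down, `self.expiration` is a `datetime` parsed from the `expires` element, not a minute count, so the name, the comment and the return value disagree. Update the comment to `# get the expiration of the credential (a datetime, decoded lazily from the xml)` or have the method return the lifetime it advertises. Note also that the new `decode()` returns silently when `self.xml` is empty, so on a freshly constructed Credential this method returns `None` without error; the test dropped its `get_lifetime` assertions in the same commit, which leaves this path unexercised.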
@@ -479,6 +478,8 @@ class Credential(object): # this class and should not need to be called explicitly. def decode(self): + if not self.xml: + return doc = parseString(self.xml) sigs = None signed_cred = doc.getElementsByTagName("signed-credential") @@ -497,27 +498,21 @@ class Credential(object): self.set_refid(cred.getAttribute("xml:id")) sz_expires = getTextNode(cred, "expires") if sz_expires != '': - self.expiration = datetime.datetime.strptime(sz_expires, '%Y-%m-%dT%H:%M:%S') + self.expiration = datetime.datetime.strptime(sz_expires, '%Y-%m-%dT%H:%M:%S') self.lifeTime = getTextNode(cred, "expires") self.gidCaller = GID(string=getTextNode(cred, "owner_gid")) - self.gidObject = GID(string=getTextNode(cred, "target_gid")) + self.gidObject = GID(string=getTextNode(cred, "target_gid")) + + + # Process privileges privs = cred.getElementsByTagName("privileges")[0] - sz_privs = '' - delegates = [] + rlist = RightList() for priv in privs.getElementsByTagName("privilege"): - sz_privs += getTextNode(priv, "name") - sz_privs += "," - delegates.append(getTextNode(priv, "can_delegate")) - - # Can we delegate? - delegate = False - if "false" not in delegates: - self.delegate = True - - # Make the rights list - sz_privs.rstrip(", ") - self.privileges = RightList(string=sz_privs) - self.delegate + kind = getTextNode(priv, "name") + deleg = bool(getTextNode(priv, "can_delegate")) + rlist.add(Right(kind.strip(), deleg)) + self.set_privileges(rlist) + # Is there a parent? parent = cred.getElementsByTagName("parent") diff --git a/sfa/trust/gid.py b/sfa/trust/gid.py index da35d178..97c6d8cb 100644 --- a/sfa/trust/gid.py +++ b/sfa/trust/gid.py @@ -65,12 +65,16 @@ class GID(Certificate): self.uuid = int(uuid) if hrn: self.hrn = hrn + self.urn = hrn_to_urn(hrn, 'unknown') if urn: self.urn = urn self.hrn, type = urn_to_hrn(urn) def set_uuid(self, uuid): - self.uuid = uuid + if isinstance(uuid, str): + self.uuid = int(uuid) + else: + self.uuid = uuid def get_uuid(self): if not self.uuid: 
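Review bot: The new `if not self.xml: return` at the top of `decode()` turns a missing document into a silent no-op, so every attribute the method would have populated (expiration, gidCaller, gidObject, privileges, parent) stays unset and callers such as `get_lifetime` get `None` back instead of an error. If a Credential without xml is a legitimate state, say so in the comment above the method; otherwise raise, e.g. `if not self.xml: raise ValueError("Credential has no xml to decode")` (replace with whatever exception class this module already uses for malformed credentials). The rest of `decode()` continues past this hunk.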
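Review bot: `deleg = bool(getTextNode(priv, "can_delegate"))` in the privileges loop is true for any non-empty text, including the literal `"false"`; the removed code compared the element text against `"false"`, which suggests the encoder (outside this hunk) writes `"true"`/`"false"` strings. The effect is that a credential whose privileges were marked non-delegable decodes as fully delegable, so a delegatee can re-delegate rights the issuer withheld — a trust-boundary bug rather than a cosmetic one. Parse the text explicitly, e.g. `deleg = getTextNode(priv, "can_delegate").strip().lower() in ("true", "1")`, and add a round-trip test with `can_delegate` set to false. The same coercion pattern appears in `rights.py` `load_from_string` only for the numeric form, which is fine because `int("false")` raises rather than succeeding.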
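Review bot: `set_uuid` now coerces only when `isinstance(uuid, str)`, whereas the constructor a few lines above it does `self.uuid = int(uuid)` unconditionally; any other string-like input (a `unicode` value coming out of the XML parser on the Python 2 interpreter this code appears to target — inferred from the `%` formatting, not visible here) would be stored un-coerced and later compare unequal to the int form `get_uuid` returns elsewhere. Mirroring the constructor with `self.uuid = int(uuid)` removes the type branch and keeps both paths consistent. The `GID.__init__` body starts before this hunk.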
diff --git a/sfa/trust/rights.py b/sfa/trust/rights.py index 1c08a1ec..4ecabb9d 100644 --- a/sfa/trust/rights.py +++ b/sfa/trust/rights.py @@ -11,6 +11,7 @@ ## + ## # privilege_table is a list of priviliges and what operations are allowed # per privilege. @@ -146,7 +147,7 @@ class RightList: def add(self, right, delegate=False): if isinstance(right, str): - right = Right(kind = right, delegate=delegate) + right = Right(right, delegate) self.rights.append(right) ## @@ -163,10 +164,10 @@ class RightList: for part in parts: if ':' in part: spl = part.split(':') - kind = spl[0] - delegate = int(spl[1]) + kind = spl[0].strip() + delegate = bool(int(spl[1])) else: - kind = part + kind = part.strip() delegate = 0 self.rights.append(Right(kind, bool(delegate))) @@ -177,7 +178,7 @@ class RightList: def save_to_string(self): right_names = [] for right in self.rights: - right_names.append('%s:%d' % (right.kind, right.delegate)) + right_names.append('%s:%d' % (right.kind.strip(), right.delegate)) return ",".join(right_names) diff --git a/tests/testCred.py b/tests/testCred.py index 155543cc..b148bb27 100755 --- a/tests/testCred.py +++ b/tests/testCred.py @@ -2,6 +2,7 @@ import unittest from sfa.trust.credential import * from sfa.trust.rights import * from sfa.trust.gid import * +from sfa.trust.certificate import * class TestCred(unittest.TestCase): def setUp(self): 
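Review bot: Whitespace on a right's name is now stripped in three places — `load_from_string` (`kind = spl[0].strip()` / `kind = part.strip()`), `save_to_string` (`right.kind.strip()`), and the privileges loop in `credential.py` — but not in `RightList.add` when it is handed a `Right` object, so a `Right(" embed", True)` built directly is stored with its padding and only looks clean when serialised. Normalising once in `Right.__init__` (outside these hunks) with `self.kind = kind.strip()` would let the three call sites drop their `.strip()` calls and guarantee that whatever compares right names at verification time sees the same form the serialiser writes.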
@@ -32,23 +33,93 @@ class TestCred(unittest.TestCase): self.assertEqual(cred.get_gid_object().get_subject(), gidObject.get_subject()) cred.set_lifetime(lifeTime) - self.assertEqual(cred.get_lifetime(), lifeTime) - + cred.set_privileges(rights) self.assertEqual(cred.get_privileges().save_to_string(), rights) + cred.get_privileges().delegate_all_privileges(delegate) + cred.encode() cred_str = cred.save_to_string() - # re-load the credential from a string and make sure it's fields are + # re-load the credential from a string and make sure its fields are # intact cred2 = Credential(string = cred_str) self.assertEqual(cred2.get_gid_caller().get_subject(), gidCaller.get_subject()) self.assertEqual(cred2.get_gid_object().get_subject(), gidObject.get_subject()) - self.assertEqual(cred2.get_lifetime(), lifeTime) - self.assertEqual(cred2.get_delegate(), delegate) + self.assertEqual(cred2.get_privileges().get_all_delegate(), delegate) self.assertEqual(cred2.get_privileges().save_to_string(), rights) + + def createSignedGID(self, subject, urn, issuer_pkey = None, issuer_gid = None): + gid = GID(subject=subject, uuid=1, urn=urn) + keys = Keypair(create=True) + gid.set_pubkey(keys) + if issuer_pkey: + gid.set_issuer(issuer_pkey, str(issuer_gid.get_issuer())) + else: + gid.set_issuer(keys, subject) + + gid.encode() + gid.sign() + return gid, keys + + def testDelegation(self): + gidAuthority, keys = self.createSignedGID("site", "urn:publicid:IDN+plc+authority+site") + gidCaller, ckeys = self.createSignedGID("foo", "urn:publicid:IDN+plc:site+user+foo", + keys, gidAuthority) + gidObject, _ = self.createSignedGID("bar_slice", "urn:publicid:IDN+plc:site+slice+bar_slice", + keys, gidAuthority) + gidDelegatee, _ = self.createSignedGID("delegatee", "urn:publicid:IDN+plc:site+user+delegatee", + keys, gidAuthority) + + cred = Credential() + cred.set_gid_caller(gidCaller) + cred.set_gid_object(gidObject) + cred.set_lifetime(3600) + cred.set_privileges("embed:1, bind:1") + cred.encode() + + 
gidAuthority.save_to_file("/tmp/auth_gid") + keys.save_to_file("/tmp/auth_key") + cred.set_issuer_keys("/tmp/auth_key", "/tmp/auth_gid") + cred.sign() + + cred.verify(['/tmp/auth_gid']) + + # Test copying + cred2 = Credential(string=cred.save_to_string()) + cred2.verify(['/tmp/auth_gid']) + + # Test delegation + delegated = Credential() + delegated.set_gid_caller(gidDelegatee) + delegated.set_gid_object(gidObject) + delegated.set_parent(cred) + delegated.set_lifetime(600) + delegated.set_privileges("embed:1, bind:1") + gidCaller.save_to_file("/tmp/caller_gid") + ckeys.save_to_file("/tmp/caller_pkey") + + delegated.set_issuer_keys("/tmp/caller_pkey", "/tmp/caller_gid") + + delegated.encode() + delegated.sign() + + # This should verify + delegated.verify(['/tmp/auth_gid']) + delegated.save_to_file("/tmp/dcred") + + + # Test that verify catches an incorrect lifetime + delegated.set_lifetime(6000) + + WHY IS THIS CRASHING?? + delegated.encode() + delegated.sign() + delegated.verify(['/tmp/auth_gid']) + + if __name__ == "__main__": unittest.main() diff --git a/tests/testGid.py b/tests/testGid.py index 33406f51..85f92407 100755 --- a/tests/testGid.py +++ b/tests/testGid.py @@ -16,14 +16,14 @@ class TestGid(unittest.TestCase): def testSetGetUuid(self): gid = GID(subject="test") - u = create_uuid() + u = uuid.uuid4().int gid.set_uuid(u) self.assertEqual(gid.get_uuid(), u) def testEncodeDecode(self): gid = GID(subject="test") - u = str(uuid.uuid4().int) + u = uuid.uuid4().int hrn = "test.hrn" gid.set_uuid(u) @@ -38,7 +38,7 @@ class TestGid(unittest.TestCase): def testSaveAndLoadString(self): gid = GID(subject="test") - u = str(uuid.uuid4().int) + u = uuid.uuid4().int hrn = "test.hrn" gid.set_uuid(u) -- 2.47.0
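Review bot: `testDelegation` contains the bare line `WHY IS THIS CRASHING??` between `delegated.set_lifetime(6000)` and `delegated.encode()`, which is a SyntaxError, so the whole `tests/testCred.py` module fails to import and none of its tests run. Once that line is removed, the final block ("verify catches an incorrect lifetime") calls `delegated.verify(...)` expecting it to fail but never asserts that it does, so a failing verify would error the test and a passing one would silently pass it; wrap it in `self.assertRaises`. The test also writes the authority and caller private keys to fixed, world-guessable paths (`/tmp/auth_key`, `/tmp/caller_pkey`) that are never removed, so a concurrent run or another user on a shared machine can read or pre-create them; a per-test `tempfile.mkdtemp()` removed in `finally` avoids both the collision and the leftover key material (needs `import os, shutil, tempfile` at the top of the file).
```python
    def testDelegation(self):
        workdir = tempfile.mkdtemp()
        try:
            # … unchanged: createSignedGID calls and cred setup …
            auth_gid = os.path.join(workdir, "auth_gid")
            auth_key = os.path.join(workdir, "auth_key")
            gidAuthority.save_to_file(auth_gid)
            keys.save_to_file(auth_key)
            cred.set_issuer_keys(auth_key, auth_gid)
            # … unchanged: sign, verify, copy and delegation steps, using
            #     os.path.join(workdir, ...) in place of the /tmp paths …
            # Test that verify rejects a child lifetime longer than its parent's
            delegated.set_lifetime(6000)
            delegated.encode()
            delegated.sign()
            # TODO: narrow Exception to the class verify() raises on a bad lifetime
            self.assertRaises(Exception, delegated.verify, [auth_gid])
        finally:
            shutil.rmtree(workdir)
```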