X-Git-Url: http://git.onelab.eu/?p=sfa.git;a=blobdiff_plain;f=tests%2FtestCred.py;h=5a563d652dcb0a03b8508d2584279ed6021dd896;hp=68c9b743c7789f902954e631a179847a8b2ba8c7;hb=HEAD;hpb=f13173726f8382eef380f1e754f24dd2b126a77b diff --git a/tests/testCred.py b/tests/testCred.py index 68c9b743..5a563d65 100755 --- a/tests/testCred.py +++ b/tests/testCred.py @@ -2,6 +2,7 @@ import unittest from sfa.trust.credential import * from sfa.trust.rights import * from sfa.trust.gid import * +from sfa.trust.certificate import * class TestCred(unittest.TestCase): def setUp(self): @@ -23,7 +24,7 @@ class TestCred(unittest.TestCase): gidObject = GID(subject="object", uuid=create_uuid(), hrn="foo.object") lifeTime = 12345 delegate = True - rights = "embed,bind" + rights = "embed:1,bind:1" cred.set_gid_caller(gidCaller) self.assertEqual(cred.get_gid_caller().get_subject(), gidCaller.get_subject()) @@ -31,27 +32,142 @@ class TestCred(unittest.TestCase): cred.set_gid_object(gidObject) self.assertEqual(cred.get_gid_object().get_subject(), gidObject.get_subject()) - cred.set_lifetime(lifeTime) - self.assertEqual(cred.get_lifetime(), lifeTime) - - cred.set_delegate(delegate) - self.assertEqual(cred.get_delegate(), delegate) - + cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=lifeTime)) + cred.set_privileges(rights) self.assertEqual(cred.get_privileges().save_to_string(), rights) + cred.get_privileges().delegate_all_privileges(delegate) + cred.encode() cred_str = cred.save_to_string() - # re-load the credential from a string and make sure it's fields are + # re-load the credential from a string and make sure its fields are # intact cred2 = Credential(string = cred_str) self.assertEqual(cred2.get_gid_caller().get_subject(), gidCaller.get_subject()) self.assertEqual(cred2.get_gid_object().get_subject(), gidObject.get_subject()) - self.assertEqual(cred2.get_lifetime(), lifeTime) - self.assertEqual(cred2.get_delegate(), delegate) + self.assertEqual(cred2.get_privileges().get_all_delegate(), delegate) self.assertEqual(cred2.get_privileges().save_to_string(), rights) + + + def createSignedGID(self, subject, urn, issuer_pkey = None, issuer_gid = None): + gid = GID(subject=subject, uuid=1, urn=urn) + keys = Keypair(create=True) + gid.set_pubkey(keys) + if issuer_pkey: + gid.set_issuer(issuer_pkey, str(issuer_gid.get_issuer())) + else: + gid.set_issuer(keys, subject) + + gid.encode() + gid.sign() + return gid, keys + + + + + def testDelegationAndVerification(self): + gidAuthority, keys = self.createSignedGID("site", "urn:publicid:IDN+plc+authority+site") + gidCaller, ckeys = self.createSignedGID("site.foo", "urn:publicid:IDN+plc:site+user+foo", + keys, gidAuthority) + gidObject, _ = self.createSignedGID("site.slice", "urn:publicid:IDN+plc:site+slice+bar_slice", + keys, gidAuthority) + gidDelegatee, _ = self.createSignedGID("site.delegatee", "urn:publicid:IDN+plc:site+user+delegatee", + keys, gidAuthority) + + cred = Credential() + cred.set_gid_caller(gidCaller) + cred.set_gid_object(gidObject) + cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=3600)) + cred.set_privileges("embed:1, bind:1") + cred.encode() + + gidAuthority.save_to_file("/tmp/auth_gid") + keys.save_to_file("/tmp/auth_key") + cred.set_issuer_keys("/tmp/auth_key", "/tmp/auth_gid") + cred.sign() + + + cred.verify(['/tmp/auth_gid']) + + # Test copying + cred2 = Credential(string=cred.save_to_string()) + cred2.verify(['/tmp/auth_gid']) + + + # Test delegation + delegated = Credential() + delegated.set_gid_caller(gidDelegatee) + delegated.set_gid_object(gidObject) + delegated.set_parent(cred) + delegated.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=600)) + delegated.set_privileges("embed:1, bind:1") + gidCaller.save_to_file("/tmp/caller_gid") + ckeys.save_to_file("/tmp/caller_pkey") + + delegated.set_issuer_keys("/tmp/caller_pkey", "/tmp/caller_gid") + + delegated.encode() + + delegated.sign() + + # This should verify + delegated.verify(['/tmp/auth_gid']) + + backup = Credential(string=delegated.get_xml()) + + # Test that verify catches an incorrect lifetime + delegated.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=6000)) + delegated.encode() + delegated.sign() + try: + delegated.verify(['/tmp/auth_gid']) + assert(1==0) + except CredentialNotVerifiable: + pass + + # Test that verify catches an incorrect signer + delegated = Credential(string=backup.get_xml()) + delegated.set_issuer_keys("/tmp/auth_key", "/tmp/auth_gid") + delegated.encode() + delegated.sign() + + try: + delegated.verify(['/tmp/auth_gid']) + assert(1==0) + except CredentialNotVerifiable: + pass + + + # Test that verify catches a changed gid + delegated = Credential(string=backup.get_xml()) + delegated.set_gid_object(delegated.get_gid_caller()) + delegated.encode() + delegated.sign() + + try: + delegated.verify(['/tmp/auth_gid']) + assert(1==0) + except CredentialNotVerifiable: + pass + + + # Test that verify catches a credential with the wrong authority for the object + test = Credential(string=cred.get_xml()) + test.set_issuer_keys("/tmp/caller_pkey", "/tmp/caller_gid") + test.encode() + test.sign() + + try: + test.verify(['/tmp/auth_gid']) + assert(1==0) + except CredentialNotVerifiable: + pass + + # Test that * gets translated properly + if __name__ == "__main__": unittest.main()