import unittest
-from credential import *
-from rights import *
-from gid import *
+from sfa.trust.credential import *
+from sfa.trust.rights import *
+from sfa.trust.gid import *
+from sfa.trust.certificate import *
class TestCred(unittest.TestCase):
def setUp(self):
gidObject = GID(subject="object", uuid=create_uuid(), hrn="foo.object")
lifeTime = 12345
delegate = True
- rights = "embed,bind"
+ rights = "embed:1,bind:1"
cred.set_gid_caller(gidCaller)
self.assertEqual(cred.get_gid_caller().get_subject(), gidCaller.get_subject())
cred.set_gid_object(gidObject)
self.assertEqual(cred.get_gid_object().get_subject(), gidObject.get_subject())
- cred.set_lifetime(lifeTime)
- self.assertEqual(cred.get_lifetime(), lifeTime)
-
- cred.set_delegate(delegate)
- self.assertEqual(cred.get_delegate(), delegate)
-
+ cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=lifeTime))
+
cred.set_privileges(rights)
self.assertEqual(cred.get_privileges().save_to_string(), rights)
+ cred.get_privileges().delegate_all_privileges(delegate)
+
cred.encode()
cred_str = cred.save_to_string()
- # re-load the credential from a string and make sure it's fields are
+ # re-load the credential from a string and make sure its fields are
# intact
cred2 = Credential(string = cred_str)
self.assertEqual(cred2.get_gid_caller().get_subject(), gidCaller.get_subject())
self.assertEqual(cred2.get_gid_object().get_subject(), gidObject.get_subject())
- self.assertEqual(cred2.get_lifetime(), lifeTime)
- self.assertEqual(cred2.get_delegate(), delegate)
+ self.assertEqual(cred2.get_privileges().get_all_delegate(), delegate)
self.assertEqual(cred2.get_privileges().save_to_string(), rights)
+
+
+ def createSignedGID(self, subject, urn, issuer_pkey = None, issuer_gid = None):
+ gid = GID(subject=subject, uuid=1, urn=urn)
+ keys = Keypair(create=True)
+ gid.set_pubkey(keys)
+ if issuer_pkey:
+ gid.set_issuer(issuer_pkey, str(issuer_gid.get_issuer()))
+ else:
+ gid.set_issuer(keys, subject)
+
+ gid.encode()
+ gid.sign()
+ return gid, keys
+
+
+
+
+ def testDelegationAndVerification(self):
+ gidAuthority, keys = self.createSignedGID("site", "urn:publicid:IDN+plc+authority+site")
+ gidCaller, ckeys = self.createSignedGID("site.foo", "urn:publicid:IDN+plc:site+user+foo",
+ keys, gidAuthority)
+ gidObject, _ = self.createSignedGID("site.slice", "urn:publicid:IDN+plc:site+slice+bar_slice",
+ keys, gidAuthority)
+ gidDelegatee, _ = self.createSignedGID("site.delegatee", "urn:publicid:IDN+plc:site+user+delegatee",
+ keys, gidAuthority)
+
+ cred = Credential()
+ cred.set_gid_caller(gidCaller)
+ cred.set_gid_object(gidObject)
+ cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=3600))
+ cred.set_privileges("embed:1, bind:1")
+ cred.encode()
+
+ gidAuthority.save_to_file("/tmp/auth_gid")
+ keys.save_to_file("/tmp/auth_key")
+ cred.set_issuer_keys("/tmp/auth_key", "/tmp/auth_gid")
+ cred.sign()
+
+
+ cred.verify(['/tmp/auth_gid'])
+
+ # Test copying
+ cred2 = Credential(string=cred.save_to_string())
+ cred2.verify(['/tmp/auth_gid'])
+
+
+ # Test delegation
+ delegated = Credential()
+ delegated.set_gid_caller(gidDelegatee)
+ delegated.set_gid_object(gidObject)
+ delegated.set_parent(cred)
+ delegated.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=600))
+ delegated.set_privileges("embed:1, bind:1")
+ gidCaller.save_to_file("/tmp/caller_gid")
+ ckeys.save_to_file("/tmp/caller_pkey")
+
+ delegated.set_issuer_keys("/tmp/caller_pkey", "/tmp/caller_gid")
+
+ delegated.encode()
+
+ delegated.sign()
+
+ # This should verify
+ delegated.verify(['/tmp/auth_gid'])
+
+ backup = Credential(string=delegated.get_xml())
+
+ # Test that verify catches an incorrect lifetime
+ delegated.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=6000))
+ delegated.encode()
+ delegated.sign()
+ try:
+ delegated.verify(['/tmp/auth_gid'])
+ assert(1==0)
+ except CredentialNotVerifiable:
+ pass
+
+ # Test that verify catches an incorrect signer
+ delegated = Credential(string=backup.get_xml())
+ delegated.set_issuer_keys("/tmp/auth_key", "/tmp/auth_gid")
+ delegated.encode()
+ delegated.sign()
+
+ try:
+ delegated.verify(['/tmp/auth_gid'])
+ assert(1==0)
+ except CredentialNotVerifiable:
+ pass
+
+
+ # Test that verify catches a changed gid
+ delegated = Credential(string=backup.get_xml())
+ delegated.set_gid_object(delegated.get_gid_caller())
+ delegated.encode()
+ delegated.sign()
+
+ try:
+ delegated.verify(['/tmp/auth_gid'])
+ assert(1==0)
+ except CredentialNotVerifiable:
+ pass
+
+
+ # Test that verify catches a credential with the wrong authority for the object
+ test = Credential(string=cred.get_xml())
+ test.set_issuer_keys("/tmp/caller_pkey", "/tmp/caller_gid")
+ test.encode()
+ test.sign()
+
+ try:
+ test.verify(['/tmp/auth_gid'])
+ assert(1==0)
+ except CredentialNotVerifiable:
+ pass
+
+ # Test that * gets translated properly
+
if __name__ == "__main__":
unittest.main()