a little nicer wrt pep8
[sfa.git] / tests / testCred.py
old mode 100644 (file)
new mode 100755 (executable)
index b1d97b7..5a563d6
@@ -1,7 +1,8 @@
 import unittest
-from credential import *
-from rights import *
-from gid import *
+from sfa.trust.credential import *
+from sfa.trust.rights import *
+from sfa.trust.gid import *
+from sfa.trust.certificate import *
 
 class TestCred(unittest.TestCase):
    def setUp(self):
@@ -23,7 +24,7 @@ class TestCred(unittest.TestCase):
       gidObject = GID(subject="object", uuid=create_uuid(), hrn="foo.object")
       lifeTime = 12345
       delegate = True
-      rights = "embed,bind"
+      rights = "embed:1,bind:1"
 
       cred.set_gid_caller(gidCaller)
       self.assertEqual(cred.get_gid_caller().get_subject(), gidCaller.get_subject())
@@ -31,27 +32,142 @@ class TestCred(unittest.TestCase):
       cred.set_gid_object(gidObject)
       self.assertEqual(cred.get_gid_object().get_subject(), gidObject.get_subject())
 
-      cred.set_lifetime(lifeTime)
-      self.assertEqual(cred.get_lifetime(), lifeTime)
-
-      cred.set_delegate(delegate)
-      self.assertEqual(cred.get_delegate(), delegate)
-
+      cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=lifeTime))
+      
       cred.set_privileges(rights)
       self.assertEqual(cred.get_privileges().save_to_string(), rights)
 
+      cred.get_privileges().delegate_all_privileges(delegate)
+
       cred.encode()
 
       cred_str = cred.save_to_string()
 
-      # re-load the credential from a string and make sure it's fields are
+      # re-load the credential from a string and make sure its fields are
       # intact
       cred2 = Credential(string = cred_str)
       self.assertEqual(cred2.get_gid_caller().get_subject(), gidCaller.get_subject())
       self.assertEqual(cred2.get_gid_object().get_subject(), gidObject.get_subject())
-      self.assertEqual(cred2.get_lifetime(), lifeTime)
-      self.assertEqual(cred2.get_delegate(), delegate)
+      self.assertEqual(cred2.get_privileges().get_all_delegate(), delegate)
       self.assertEqual(cred2.get_privileges().save_to_string(), rights)
 
+
+
+   def createSignedGID(self, subject, urn, issuer_pkey = None, issuer_gid = None):
+      gid = GID(subject=subject, uuid=1, urn=urn)
+      keys = Keypair(create=True)
+      gid.set_pubkey(keys)
+      if issuer_pkey:
+         gid.set_issuer(issuer_pkey, str(issuer_gid.get_issuer()))
+      else:
+         gid.set_issuer(keys, subject)
+
+      gid.encode()
+      gid.sign()
+      return gid, keys
+
+   
+   
+
+   def testDelegationAndVerification(self):
+      gidAuthority, keys = self.createSignedGID("site", "urn:publicid:IDN+plc+authority+site")
+      gidCaller, ckeys = self.createSignedGID("site.foo", "urn:publicid:IDN+plc:site+user+foo",
+                                          keys, gidAuthority)
+      gidObject, _ = self.createSignedGID("site.slice", "urn:publicid:IDN+plc:site+slice+bar_slice",
+                                          keys, gidAuthority)
+      gidDelegatee, _ = self.createSignedGID("site.delegatee", "urn:publicid:IDN+plc:site+user+delegatee",
+                                             keys, gidAuthority)
+
+      cred = Credential()
+      cred.set_gid_caller(gidCaller)
+      cred.set_gid_object(gidObject)
+      cred.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=3600))
+      cred.set_privileges("embed:1, bind:1")
+      cred.encode()
+
+      gidAuthority.save_to_file("/tmp/auth_gid")
+      keys.save_to_file("/tmp/auth_key")
+      cred.set_issuer_keys("/tmp/auth_key", "/tmp/auth_gid")
+      cred.sign()
+
+
+      cred.verify(['/tmp/auth_gid'])
+
+      # Test copying
+      cred2 = Credential(string=cred.save_to_string())
+      cred2.verify(['/tmp/auth_gid'])
+
+
+      # Test delegation
+      delegated = Credential()
+      delegated.set_gid_caller(gidDelegatee)
+      delegated.set_gid_object(gidObject)      
+      delegated.set_parent(cred)
+      delegated.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=600))
+      delegated.set_privileges("embed:1, bind:1")
+      gidCaller.save_to_file("/tmp/caller_gid")
+      ckeys.save_to_file("/tmp/caller_pkey")      
+      
+      delegated.set_issuer_keys("/tmp/caller_pkey", "/tmp/caller_gid")
+
+      delegated.encode()
+
+      delegated.sign()
+      
+      # This should verify
+      delegated.verify(['/tmp/auth_gid'])
+
+      backup = Credential(string=delegated.get_xml())
+
+      # Test that verify catches an incorrect lifetime      
+      delegated.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=6000))
+      delegated.encode()
+      delegated.sign()
+      try:
+         delegated.verify(['/tmp/auth_gid'])
+         assert(1==0)
+      except CredentialNotVerifiable:
+         pass
+
+      # Test that verify catches an incorrect signer
+      delegated = Credential(string=backup.get_xml())
+      delegated.set_issuer_keys("/tmp/auth_key", "/tmp/auth_gid")
+      delegated.encode()
+      delegated.sign()
+
+      try:
+         delegated.verify(['/tmp/auth_gid'])
+         assert(1==0)
+      except CredentialNotVerifiable:
+         pass
+
+
+      # Test that verify catches a changed gid
+      delegated = Credential(string=backup.get_xml())
+      delegated.set_gid_object(delegated.get_gid_caller())
+      delegated.encode()
+      delegated.sign()
+
+      try:
+         delegated.verify(['/tmp/auth_gid'])
+         assert(1==0)
+      except CredentialNotVerifiable:
+         pass
+
+
+      # Test that verify catches a credential with the wrong authority for the object
+      test = Credential(string=cred.get_xml())
+      test.set_issuer_keys("/tmp/caller_pkey", "/tmp/caller_gid")
+      test.encode()
+      test.sign()
+
+      try:
+         test.verify(['/tmp/auth_gid'])
+         assert(1==0)
+      except CredentialNotVerifiable:
+         pass      
+      
+      # Test that * gets translated properly
+
 if __name__ == "__main__":
     unittest.main()