writing testCred.py
authorJosh Karlin <jkarlin@bbn.com>
Fri, 9 Apr 2010 21:32:37 +0000 (21:32 +0000)
committerJosh Karlin <jkarlin@bbn.com>
Fri, 9 Apr 2010 21:32:37 +0000 (21:32 +0000)
sfa/trust/credential.py
sfa/trust/gid.py
sfa/trust/rights.py
tests/testCred.py
tests/testGid.py

index 9b3e9c4..ef5100e 100644 (file)
@@ -28,7 +28,7 @@ DEFAULT_CREDENTIAL_LIFETIME = 1051200
 # TODO:
 # . make privs match between PG and PL
 # . Need to test delegation, xml verification
-
+# . Need to add support for other types of credentials, e.g. tickets
 
 
 signature_template = \
@@ -210,7 +210,6 @@ class Credential(object):
     def set_parent(self, cred):
         self.parent = cred
         self.updateRefID()
-        
 
     ##
     # set the GID of the caller
@@ -264,7 +263,7 @@ class Credential(object):
     # get the lifetime of the credential (in minutes)
 
     def get_lifetime(self):
-        if not self.lifeTime:
+        if not self.expiration:
             self.decode()
         return self.expiration
 
@@ -422,6 +421,7 @@ class Credential(object):
                 next_cred = next_cred.parent
             else:
                 next_cred = None
+
         
         # Find a unique refid for this credential
         rid = self.get_refid()
@@ -443,7 +443,6 @@ class Credential(object):
     def sign(self):
         if not self.issuer_privkey or not self.issuer_gid:
             return
-        
         doc = parseString(self.get_xml())
         sigs = doc.getElementsByTagName("signatures")[0]
 
@@ -463,8 +462,8 @@ class Credential(object):
                  % (ref, self.issuer_privkey, self.issuer_gid, filename)).read()
         os.remove(filename)
 
-
         self.xml = signed
+        
 
     def getTextNode(self, element, subele):
         sub = element.getElementsByTagName(subele)[0]
@@ -479,6 +478,8 @@ class Credential(object):
     # this class and should not need to be called explicitly.
 
     def decode(self):
+        if not self.xml:
+            return
         doc = parseString(self.xml)
         sigs = None
         signed_cred = doc.getElementsByTagName("signed-credential")
@@ -497,27 +498,21 @@ class Credential(object):
         self.set_refid(cred.getAttribute("xml:id"))
         sz_expires = getTextNode(cred, "expires")
         if sz_expires != '':
-            self.expiration = datetime.datetime.strptime(sz_expires, '%Y-%m-%dT%H:%M:%S')            
+            self.expiration = datetime.datetime.strptime(sz_expires, '%Y-%m-%dT%H:%M:%S')        
         self.lifeTime = getTextNode(cred, "expires")
         self.gidCaller = GID(string=getTextNode(cred, "owner_gid"))
-        self.gidObject = GID(string=getTextNode(cred, "target_gid"))
+        self.gidObject = GID(string=getTextNode(cred, "target_gid"))   
+
+
+        # Process privileges
         privs = cred.getElementsByTagName("privileges")[0]
-        sz_privs = ''
-        delegates = []
+        rlist = RightList()
         for priv in privs.getElementsByTagName("privilege"):
-            sz_privs += getTextNode(priv, "name")
-            sz_privs += ","
-            delegates.append(getTextNode(priv, "can_delegate"))
-
-        # Can we delegate?
-        delegate = False
-        if "false" not in delegates:
-            self.delegate = True
-
-        # Make the rights list
-        sz_privs.rstrip(", ")
-        self.privileges = RightList(string=sz_privs)
-        self.delegate
+            kind = getTextNode(priv, "name")
+            deleg = bool(getTextNode(priv, "can_delegate"))
+            rlist.add(Right(kind.strip(), deleg))
+        self.set_privileges(rlist)
+
 
         # Is there a parent?
         parent = cred.getElementsByTagName("parent")
index da35d17..97c6d8c 100644 (file)
@@ -65,12 +65,16 @@ class GID(Certificate):
             self.uuid = int(uuid)
         if hrn:
             self.hrn = hrn
+            self.urn = hrn_to_urn(hrn, 'unknown')
         if urn:
             self.urn = urn
             self.hrn, type = urn_to_hrn(urn)
 
     def set_uuid(self, uuid):
-        self.uuid = uuid
+        if isinstance(uuid, str):
+            self.uuid = int(uuid)
+        else:
+            self.uuid = uuid
 
     def get_uuid(self):
         if not self.uuid:
index 1c08a1e..4ecabb9 100644 (file)
@@ -11,6 +11,7 @@
 ##
 
 
+
 ##
 # privilege_table is a list of priviliges and what operations are allowed
 # per privilege.
@@ -146,7 +147,7 @@ class RightList:
 
     def add(self, right, delegate=False):
         if isinstance(right, str):
-            right = Right(kind = right, delegate=delegate)
+            right = Right(right, delegate)
         self.rights.append(right)
 
     ##
@@ -163,10 +164,10 @@ class RightList:
         for part in parts:
             if ':' in part:
                 spl = part.split(':')
-                kind = spl[0]
-                delegate = int(spl[1])
+                kind = spl[0].strip()
+                delegate = bool(int(spl[1]))
             else:
-                kind = part
+                kind = part.strip()
                 delegate = 0
             self.rights.append(Right(kind, bool(delegate)))
 
@@ -177,7 +178,7 @@ class RightList:
     def save_to_string(self):        
         right_names = []
         for right in self.rights:
-            right_names.append('%s:%d' % (right.kind, right.delegate))
+            right_names.append('%s:%d' % (right.kind.strip(), right.delegate))
 
         return ",".join(right_names)
 
index 155543c..b148bb2 100755 (executable)
@@ -2,6 +2,7 @@ import unittest
 from sfa.trust.credential import *
 from sfa.trust.rights import *
 from sfa.trust.gid import *
+from sfa.trust.certificate import *
 
 class TestCred(unittest.TestCase):
    def setUp(self):
@@ -32,23 +33,93 @@ class TestCred(unittest.TestCase):
       self.assertEqual(cred.get_gid_object().get_subject(), gidObject.get_subject())
 
       cred.set_lifetime(lifeTime)
-      self.assertEqual(cred.get_lifetime(), lifeTime)
-
+      
       cred.set_privileges(rights)
       self.assertEqual(cred.get_privileges().save_to_string(), rights)
 
+      cred.get_privileges().delegate_all_privileges(delegate)
+
       cred.encode()
 
       cred_str = cred.save_to_string()
 
-      # re-load the credential from a string and make sure it's fields are
+      # re-load the credential from a string and make sure its fields are
       # intact
       cred2 = Credential(string = cred_str)
       self.assertEqual(cred2.get_gid_caller().get_subject(), gidCaller.get_subject())
       self.assertEqual(cred2.get_gid_object().get_subject(), gidObject.get_subject())
-      self.assertEqual(cred2.get_lifetime(), lifeTime)
-      self.assertEqual(cred2.get_delegate(), delegate)
+      self.assertEqual(cred2.get_privileges().get_all_delegate(), delegate)
       self.assertEqual(cred2.get_privileges().save_to_string(), rights)
 
+
+   def createSignedGID(self, subject, urn, issuer_pkey = None, issuer_gid = None):
+      gid = GID(subject=subject, uuid=1, urn=urn)
+      keys = Keypair(create=True)
+      gid.set_pubkey(keys)
+      if issuer_pkey:
+         gid.set_issuer(issuer_pkey, str(issuer_gid.get_issuer()))
+      else:
+         gid.set_issuer(keys, subject)
+
+      gid.encode()
+      gid.sign()
+      return gid, keys
+   
+   def testDelegation(self):
+      gidAuthority, keys = self.createSignedGID("site", "urn:publicid:IDN+plc+authority+site")
+      gidCaller, ckeys = self.createSignedGID("foo", "urn:publicid:IDN+plc:site+user+foo",
+                                          keys, gidAuthority)
+      gidObject, _ = self.createSignedGID("bar_slice", "urn:publicid:IDN+plc:site+slice+bar_slice",
+                                          keys, gidAuthority)
+      gidDelegatee, _ = self.createSignedGID("delegatee", "urn:publicid:IDN+plc:site+user+delegatee",
+                                             keys, gidAuthority)
+      
+      cred = Credential()
+      cred.set_gid_caller(gidCaller)
+      cred.set_gid_object(gidObject)
+      cred.set_lifetime(3600)
+      cred.set_privileges("embed:1, bind:1")
+      cred.encode()
+
+      gidAuthority.save_to_file("/tmp/auth_gid")
+      keys.save_to_file("/tmp/auth_key")
+      cred.set_issuer_keys("/tmp/auth_key", "/tmp/auth_gid")
+      cred.sign()
+
+      cred.verify(['/tmp/auth_gid'])
+
+      # Test copying
+      cred2 = Credential(string=cred.save_to_string())
+      cred2.verify(['/tmp/auth_gid'])
+
+      # Test delegation
+      delegated = Credential()
+      delegated.set_gid_caller(gidDelegatee)
+      delegated.set_gid_object(gidObject)      
+      delegated.set_parent(cred)
+      delegated.set_lifetime(600)
+      delegated.set_privileges("embed:1, bind:1")
+      gidCaller.save_to_file("/tmp/caller_gid")
+      ckeys.save_to_file("/tmp/caller_pkey")      
+      
+      delegated.set_issuer_keys("/tmp/caller_pkey", "/tmp/caller_gid")
+
+      delegated.encode()
+      delegated.sign()
+      
+      # This should verify
+      delegated.verify(['/tmp/auth_gid'])
+      delegated.save_to_file("/tmp/dcred")
+
+
+      # Test that verify catches an incorrect lifetime      
+      delegated.set_lifetime(6000)
+
+      WHY IS THIS CRASHING??  
+      delegated.encode()
+      delegated.sign()
+      delegated.verify(['/tmp/auth_gid'])
+      
+
 if __name__ == "__main__":
     unittest.main()
index 33406f5..85f9240 100755 (executable)
@@ -16,14 +16,14 @@ class TestGid(unittest.TestCase):
 
    def testSetGetUuid(self):
       gid = GID(subject="test")
-      u = create_uuid()
+      u = uuid.uuid4().int
 
       gid.set_uuid(u)
       self.assertEqual(gid.get_uuid(), u)
 
    def testEncodeDecode(self):
       gid = GID(subject="test")
-      u = str(uuid.uuid4().int)
+      u = uuid.uuid4().int
       hrn = "test.hrn"
 
       gid.set_uuid(u)
@@ -38,7 +38,7 @@ class TestGid(unittest.TestCase):
    def testSaveAndLoadString(self):
       gid = GID(subject="test")
 
-      u = str(uuid.uuid4().int)
+      u = uuid.uuid4().int
       hrn = "test.hrn"
 
       gid.set_uuid(u)