--- /dev/null
+-----BEGIN RSA PRIVATE KEY-----
+MIICXQIBAAKBgQDeWX/s21UpBlK0r1AwV8m+sUM7XAcz/X0SMChEwWaqwR0IQF3M
+MuY583stGIiVJtVcMkGoEnz27xMGqeeILgP7R7GBH28RFJWw8dud7P0H2h0OiIoW
+PTyjcYotNsPxzcoK9wns6gH6tNKu8Sy+dcZ+Zbp8SfYfVSCIBR7+VJGrMQIDAQAB
+AoGAaQTIwSyPOoM8CQ9MxqiQJzs+UAkVdd7idfa87kySm0/+0GjmZI4d4302zoPp
+7tiaqaMrE6E32MHusJULQGzLf8AepxiK/KX04lA7CnBlbAjZAHy0XvUpZLvKGx+j
+hwb6VBR72WLUhYKmcjuUgVxfBlWDxSWV5OVC9x/IS4Vs3gkCQQD7LtGepxTlgbKM
+ZT8/MgfhOAyqtaW9CV98TgAlpjUemXrE7nFgJUbtXkWOe5WEG5cutyS0E3h0+Agi
+BVIZQ0JjAkEA4p0fwTTYkGQIJWfbrnBvpxNozTMJ0fU45iSdVJz+18Tw3oclmqbT
+k7+7B7IYUDo456ItPydsiyxiKEc3CxZGWwJBAO5KK0jEnzlfgmBYiNjOANWzk6i7
+bMFaTACkCtWsyQX/eo2q21nn41K6tWgHa/+JITKLQkGnmwX0a9rCi3E18psCQBcS
+B/mTd7i7dljYifToXXWU9EZvu8A0RuanM505nng5BIyjFaD4+vE/w7q01uTUCJcM
+W67iRJqmmhefqGroiaECQQDseGdWAPeQAbZ4XCHVXKVnWLi3PV69yhPOh4U7RgVZ
+F4pumzfgtwHL6347yH0D7hs3NXBti2x4iNDxRFIIgvEB
+-----END RSA PRIVATE KEY-----
--- /dev/null
+from testRights import *
+from testCert import *
+from testGid import *
+from testCred import *
+from testKeypair import *
+#from testRecord import *
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+import xmlrpclib
+from cert import *
+
+class TestCert(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def testCreate(self):
+ cert = Certificate()
+ cert.create()
+
+ def testSetAndGetSubject(self):
+ cert = Certificate()
+ cert.create()
+ cert.set_subject("test")
+ subj = cert.get_subject()
+ self.assertEqual(subj, "test")
+
+ def testSign(self):
+ cert = Certificate(subject="test")
+
+ # create an issuer and sign the certificate
+ issuerKey = Keypair(create=True)
+ issuerSubject = "testissuer"
+ cert.set_issuer(issuerKey, issuerSubject)
+ cert.sign()
+
+ def testAddExtension(self):
+ cert = Certificate(subject="test")
+ cert.add_extension("subjectAltName", 0, "URI:http://foovalue")
+
+ self.assertEqual(cert.get_extension("subjectAltName"),
+ "URI:http://foovalue")
+
+ def testSetData(self):
+ cert = Certificate(subject="test")
+ data = "this is a test"
+ cert.set_data(data)
+ self.assertEqual(cert.get_data(), data)
+
+ # try something a bit more complicated, like an xmlrpc encoding of
+ # some parameters
+ cert = Certificate(subject="test")
+ data = xmlrpclib.dumps((1, "foo", ["a", "b"], {"c": "d", "e": "f"}, True))
+ cert.set_data(data)
+ self.assertEqual(cert.get_data(), data)
+
+
+ def testSaveAndLoadString(self):
+ cert = Certificate(subject="test")
+ cert.add_extension("subjectAltName", 0, "URI:http://foovalue")
+
+ # create an issuer and sign the certificate
+ issuerKey = Keypair(create=True)
+ issuerSubject = "testissuer"
+ cert.set_issuer(issuerKey, issuerSubject)
+ cert.sign()
+
+ certstr = cert.save_to_string()
+
+ #print certstr
+
+ cert2 = Certificate()
+ cert2.load_from_string(certstr)
+
+ # read back the subject and make sure it is correct
+ subj = cert2.get_subject()
+ self.assertEqual(subj, "test")
+
+ # read back the issuer and make sure it is correct
+ issuerName = cert2.get_issuer()
+ self.assertEqual(issuerName, "testissuer")
+
+ # read back the extension and make sure it is correct
+ self.assertEqual(cert2.get_extension("subjectAltName"),
+ "URI:http://foovalue")
+
+ def testLongExtension(self):
+ cert = Certificate(subject="test")
+
+ # should produce something around 256 KB
+ veryLongString = "URI:http://"
+ shortString = ""
+ for i in range(1, 80):
+ shortString = shortString + "abcdefghijklmnopqrstuvwxyz012345"
+ for i in range(1, 100):
+ veryLongString = veryLongString + shortString + str(i)
+
+ cert.add_extension("subjectAltName", 0, veryLongString)
+
+ # create an issuer and sign the certificate
+ issuerKey = Keypair(create=True)
+ issuerSubject = "testissuer"
+ cert.set_issuer(issuerKey, issuerSubject)
+ cert.sign()
+
+ certstr = cert.save_to_string()
+
+ cert2 = Certificate()
+ cert2.load_from_string(certstr)
+ val = cert2.get_extension("subjectAltName")
+ self.assertEqual(val, veryLongString)
+
+ def testVerify(self):
+ cert = Certificate(subject="test")
+
+ # create an issuer and sign the certificate
+ issuerKey = Keypair(create=True)
+ issuerSubject = "testissuer"
+ cert.set_issuer(issuerKey, issuerSubject)
+ cert.sign()
+
+ result = cert.verify(issuerKey)
+ self.assert_(result)
+
+ # create another key
+ issuerKey2 = Keypair(create=True)
+ issuerSubject2 = "wrongissuer"
+
+ # and make sure it doesn't verify
+ result = cert.verify(issuerKey2)
+ self.assert_(not result)
+
+ # load the cert from a string, and verify again
+ cert2 = Certificate(string = cert.save_to_string())
+ result = cert2.verify(issuerKey)
+ self.assert_(result)
+ result = cert2.verify(issuerKey2)
+ self.assert_(not result)
+
+ def test_is_signed_by(self):
+ cert1 = Certificate(subject="one")
+
+ key1 = Keypair()
+ key1.create()
+ cert1.set_pubkey(key1)
+
+ # create an issuer and sign the certificate
+ issuerKey = Keypair(create=True)
+ issuerSubject = "testissuer"
+ cert1.set_issuer(issuerKey, issuerSubject)
+ cert1.sign()
+
+ cert2 = Certificate(subject="two")
+
+ key2 = Keypair(create=True)
+ cert2.set_pubkey(key2)
+
+ cert2.set_issuer(key1, cert=cert1)
+
+ # cert2 is signed by cert1
+ self.assert_(cert2.is_signed_by_cert(cert1))
+ # cert1 is not signed by cert2
+ self.assert_(not cert1.is_signed_by_cert(cert2))
+
+ def test_parents(self):
+ cert_root = Certificate(subject="root")
+ key_root = Keypair(create=True)
+ cert_root.set_pubkey(key_root)
+ cert_root.set_issuer(key_root, "root")
+ cert_root.sign()
+
+ cert1 = Certificate(subject="one")
+ key1 = Keypair(create=True)
+ cert1.set_pubkey(key1)
+ cert1.set_issuer(key_root, "root")
+ cert1.sign()
+
+ cert2 = Certificate(subject="two")
+ key2 = Keypair(create=True)
+ cert2.set_pubkey(key2)
+ cert2.set_issuer(key1, cert=cert1)
+ cert2.set_parent(cert1)
+ cert2.sign()
+
+ cert3 = Certificate(subject="three")
+ key3 = Keypair(create=True)
+ cert3.set_pubkey(key3)
+ cert3.set_issuer(key2, cert=cert2)
+ cert3.set_parent(cert2)
+ cert3.sign()
+
+ self.assert_(cert1.verify(key_root))
+ self.assert_(cert2.is_signed_by_cert(cert1))
+ self.assert_(cert3.is_signed_by_cert(cert2))
+
+ self.assert_(cert3.verify_chain([cert_root]))
+
+ # now save the chain to a string and load it into a new certificate
+ str_chain = cert3.save_to_string(save_parents=True)
+ cert4 = Certificate(string = str_chain)
+
+ # verify the newly loaded chain still verifies
+ self.assert_(cert4.verify_chain([cert_root]))
+
+ # verify the parentage
+ self.assertEqual(cert4.get_parent().get_subject(), "two")
+ self.assertEqual(cert4.get_parent().get_parent().get_subject(), "one")
+
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+from credential import *
+from rights import *
+from gid import *
+
+class TestCred(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def testCreate(self):
+ cred = Credential(create=True)
+
+ def testDefaults(self):
+ cred = Credential(subject="testCredential")
+
+ self.assertEqual(cred.get_gid_caller(), None)
+ self.assertEqual(cred.get_gid_object(), None)
+
+ def testLoadSave(self):
+ cred = Credential(subject="testCredential")
+
+ gidCaller = GID(subject="caller", uuid=create_uuid(), hrn="foo.caller")
+ gidObject = GID(subject="object", uuid=create_uuid(), hrn="foo.object")
+ lifeTime = 12345
+ delegate = True
+ rights = "embed,bind"
+
+ cred.set_gid_caller(gidCaller)
+ self.assertEqual(cred.get_gid_caller().get_subject(), gidCaller.get_subject())
+
+ cred.set_gid_object(gidObject)
+ self.assertEqual(cred.get_gid_object().get_subject(), gidObject.get_subject())
+
+ cred.set_lifetime(lifeTime)
+ self.assertEqual(cred.get_lifetime(), lifeTime)
+
+ cred.set_delegate(delegate)
+ self.assertEqual(cred.get_delegate(), delegate)
+
+ cred.set_privileges(rights)
+ self.assertEqual(cred.get_privileges().save_to_string(), rights)
+
+ cred.encode()
+
+ cred_str = cred.save_to_string()
+
+ # re-load the credential from a string and make sure it's fields are
+ # intact
+ cred2 = Credential(string = cred_str)
+ self.assertEqual(cred2.get_gid_caller().get_subject(), gidCaller.get_subject())
+ self.assertEqual(cred2.get_gid_object().get_subject(), gidObject.get_subject())
+ self.assertEqual(cred2.get_lifetime(), lifeTime)
+ self.assertEqual(cred2.get_delegate(), delegate)
+ self.assertEqual(cred2.get_privileges().save_to_string(), rights)
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+import xmlrpclib
+from cert import *
+from gid import *
+
+class TestGid(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def testSetGetHrn(self):
+ gid = GID(subject="test")
+ hrn = "test.hrn"
+
+ gid.set_hrn(hrn)
+ self.assertEqual(gid.get_hrn(), hrn)
+
+ def testSetGetUuid(self):
+ gid = GID(subject="test")
+ u = create_uuid()
+
+ gid.set_uuid(u)
+ self.assertEqual(gid.get_uuid(), u)
+
+ def testEncodeDecode(self):
+ gid = GID(subject="test")
+ u = str(uuid.uuid4().int)
+ hrn = "test.hrn"
+
+ gid.set_uuid(u)
+ gid.set_hrn(hrn)
+
+ gid.encode()
+ gid.decode()
+
+ self.assertEqual(gid.get_hrn(), hrn)
+ self.assertEqual(gid.get_uuid(), u)
+
+ def testSaveAndLoadString(self):
+ gid = GID(subject="test")
+
+ u = str(uuid.uuid4().int)
+ hrn = "test.hrn"
+
+ gid.set_uuid(u)
+ gid.set_hrn(hrn)
+
+ # create an issuer and sign the certificate
+ issuerKey = Keypair(create = True)
+ issuerSubject = "testissuer"
+ gid.set_issuer(issuerKey, issuerSubject)
+ gid.sign()
+
+ certstr = gid.save_to_string()
+
+ #print certstr
+
+ gid2 = GID(string = certstr)
+
+ self.assertEqual(gid.get_hrn(), hrn)
+ self.assertEqual(gid.get_uuid(), u)
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+import xmlrpclib
+from cert import *
+
+class TestCert(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def testCreate(self):
+ k = Keypair()
+ k.create()
+
+ def testSaveLoadFile(self):
+ k = Keypair()
+ k.create()
+
+ k.save_to_file("test.key")
+
+ k2 = Keypair()
+ k2.load_from_file("test.key")
+
+ self.assertEqual(k.as_pem(), k2.as_pem())
+
+ def test_get_m2_pkey(self):
+ k = Keypair()
+ k.create()
+
+ m2 = k.get_m2_pkey()
+ self.assert_(m2 != None)
+
+ def test_get_openssl_pkey(self):
+ k = Keypair()
+ k.create()
+
+ pk = k.get_openssl_pkey()
+ self.assert_(pk != None)
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+import xmlrpclib
+from record import *
+from cert import *
+from gid import *
+
+class TestRecord(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def testCreate(self):
+ r = GeniRecord()
+
+class TestTable(unittest.TestCase):
+
+ def setUp(self):
+ self.reg_hrn = "test.table"
+ self.rec_hrn = self.reg_hrn + "." + "record"
+ pass
+
+ def testCreate(self):
+ t = GeniTable(hrn = self.reg_hrn)
+ t.create()
+
+ def testInsert(self):
+ t = GeniTable(hrn = self.reg_hrn)
+
+ k = Keypair(create=True)
+ gid = GID(subject="scott.foo", uuid=create_uuid(), hrn=self.rec_hrn)
+ gid.set_pubkey(k)
+ gid.set_issuer(key=k, subject=self.rec_hrn)
+ gid.encode()
+ gid.sign()
+
+ r = GeniRecord(name=self.rec_hrn, gid=gid.save_to_string(), type="user", pointer=3)
+ t.insert(r)
+
+ def testLookup(self):
+ t = GeniTable(hrn = self.reg_hrn)
+
+ rec_list = t.lookup(self.rec_hrn)
+ self.assertEqual(len(rec_list), 1)
+ r = rec_list[0]
+ self.assertEqual(r.name, self.rec_hrn)
+ self.assertEqual(r.pointer, 3)
+
+ def testUpdate(self):
+ t = GeniTable(hrn = self.reg_hrn)
+
+ rec_list = t.lookup(self.rec_hrn)
+ r = rec_list[0]
+
+ r.set_pointer(4)
+ t.update(r)
+
+ rec_list = t.lookup(self.rec_hrn)
+ self.assertEqual(len(rec_list), 1)
+ r = rec_list[0]
+ self.assertEqual(r.name, self.rec_hrn)
+ self.assertEqual(r.pointer, 4)
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import unittest
+from rights import *
+
+class TestRight(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def testRightInit(self):
+ right = Right("embed")
+ self.assertEqual(right.kind, "embed")
+
+ def testRightCanPerform(self):
+ right = Right("embed")
+ self.assert_(right.can_perform("getticket"))
+ self.assert_(not right.can_perform("resolve"))
+
+class TestRightList(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def testInit(self):
+ # create a blank right list
+ rightList = RightList()
+
+ # create a right list with "embed" in it
+ rightList = RightList(string="embed")
+
+ def testAsString(self):
+ rightList = RightList()
+ self.assertEqual(rightList.save_to_string(), "")
+
+ rightList = RightList(string="embed")
+ self.assertEqual(rightList.save_to_string(), "embed")
+
+ rightList = RightList(string="embed,resolve")
+ self.assertEqual(rightList.save_to_string(), "embed,resolve")
+
+ def testCanPerform(self):
+ rightList = RightList(string="embed")
+ self.assert_(rightList.can_perform("getticket"))
+ self.assert_(not rightList.can_perform("resolve"))
+
+ rightList = RightList(string="embed,resolve")
+ self.assert_(rightList.can_perform("getticket"))
+ self.assert_(rightList.can_perform("resolve"))
+
+
+if __name__ == "__main__":
+ unittest.main()