# TODO:
# . make privs match between PG and PL
# . Need to test delegation, xml verification
-
+# . Need to add support for other types of credentials, e.g. tickets
signature_template = \
def set_parent(self, cred):
self.parent = cred
self.updateRefID()
-
##
# set the GID of the caller
# get the lifetime of the credential (in minutes)
def get_lifetime(self):
- if not self.lifeTime:
+ if not self.expiration:
self.decode()
return self.expiration
next_cred = next_cred.parent
else:
next_cred = None
+
# Find a unique refid for this credential
rid = self.get_refid()
def sign(self):
if not self.issuer_privkey or not self.issuer_gid:
return
-
doc = parseString(self.get_xml())
sigs = doc.getElementsByTagName("signatures")[0]
% (ref, self.issuer_privkey, self.issuer_gid, filename)).read()
os.remove(filename)
-
self.xml = signed
+
def getTextNode(self, element, subele):
sub = element.getElementsByTagName(subele)[0]
# this class and should not need to be called explicitly.
def decode(self):
+ if not self.xml:
+ return
doc = parseString(self.xml)
sigs = None
signed_cred = doc.getElementsByTagName("signed-credential")
self.set_refid(cred.getAttribute("xml:id"))
sz_expires = getTextNode(cred, "expires")
if sz_expires != '':
- self.expiration = datetime.datetime.strptime(sz_expires, '%Y-%m-%dT%H:%M:%S')
+ self.expiration = datetime.datetime.strptime(sz_expires, '%Y-%m-%dT%H:%M:%S')
self.lifeTime = getTextNode(cred, "expires")
self.gidCaller = GID(string=getTextNode(cred, "owner_gid"))
- self.gidObject = GID(string=getTextNode(cred, "target_gid"))
+ self.gidObject = GID(string=getTextNode(cred, "target_gid"))
+
+
+ # Process privileges
privs = cred.getElementsByTagName("privileges")[0]
- sz_privs = ''
- delegates = []
+ rlist = RightList()
for priv in privs.getElementsByTagName("privilege"):
- sz_privs += getTextNode(priv, "name")
- sz_privs += ","
- delegates.append(getTextNode(priv, "can_delegate"))
-
- # Can we delegate?
- delegate = False
- if "false" not in delegates:
- self.delegate = True
-
- # Make the rights list
- sz_privs.rstrip(", ")
- self.privileges = RightList(string=sz_privs)
- self.delegate
+ kind = getTextNode(priv, "name")
+ deleg = bool(getTextNode(priv, "can_delegate"))
+ rlist.add(Right(kind.strip(), deleg))
+ self.set_privileges(rlist)
+
# Is there a parent?
parent = cred.getElementsByTagName("parent")
self.uuid = int(uuid)
if hrn:
self.hrn = hrn
+ self.urn = hrn_to_urn(hrn, 'unknown')
if urn:
self.urn = urn
self.hrn, type = urn_to_hrn(urn)
def set_uuid(self, uuid):
- self.uuid = uuid
+ if isinstance(uuid, str):
+ self.uuid = int(uuid)
+ else:
+ self.uuid = uuid
def get_uuid(self):
if not self.uuid:
##
+
##
# privilege_table is a list of priviliges and what operations are allowed
# per privilege.
def add(self, right, delegate=False):
if isinstance(right, str):
- right = Right(kind = right, delegate=delegate)
+ right = Right(right, delegate)
self.rights.append(right)
##
for part in parts:
if ':' in part:
spl = part.split(':')
- kind = spl[0]
- delegate = int(spl[1])
+ kind = spl[0].strip()
+ delegate = bool(int(spl[1]))
else:
- kind = part
+ kind = part.strip()
delegate = 0
self.rights.append(Right(kind, bool(delegate)))
def save_to_string(self):
right_names = []
for right in self.rights:
- right_names.append('%s:%d' % (right.kind, right.delegate))
+ right_names.append('%s:%d' % (right.kind.strip(), right.delegate))
return ",".join(right_names)
from sfa.trust.credential import *
from sfa.trust.rights import *
from sfa.trust.gid import *
+from sfa.trust.certificate import *
class TestCred(unittest.TestCase):
def setUp(self):
self.assertEqual(cred.get_gid_object().get_subject(), gidObject.get_subject())
cred.set_lifetime(lifeTime)
- self.assertEqual(cred.get_lifetime(), lifeTime)
-
+
cred.set_privileges(rights)
self.assertEqual(cred.get_privileges().save_to_string(), rights)
+ cred.get_privileges().delegate_all_privileges(delegate)
+
cred.encode()
cred_str = cred.save_to_string()
- # re-load the credential from a string and make sure it's fields are
+ # re-load the credential from a string and make sure its fields are
# intact
cred2 = Credential(string = cred_str)
self.assertEqual(cred2.get_gid_caller().get_subject(), gidCaller.get_subject())
self.assertEqual(cred2.get_gid_object().get_subject(), gidObject.get_subject())
- self.assertEqual(cred2.get_lifetime(), lifeTime)
- self.assertEqual(cred2.get_delegate(), delegate)
+ self.assertEqual(cred2.get_privileges().get_all_delegate(), delegate)
self.assertEqual(cred2.get_privileges().save_to_string(), rights)
+
+ def createSignedGID(self, subject, urn, issuer_pkey = None, issuer_gid = None):
+ gid = GID(subject=subject, uuid=1, urn=urn)
+ keys = Keypair(create=True)
+ gid.set_pubkey(keys)
+ if issuer_pkey:
+ gid.set_issuer(issuer_pkey, str(issuer_gid.get_issuer()))
+ else:
+ gid.set_issuer(keys, subject)
+
+ gid.encode()
+ gid.sign()
+ return gid, keys
+
+ def testDelegation(self):
+ gidAuthority, keys = self.createSignedGID("site", "urn:publicid:IDN+plc+authority+site")
+ gidCaller, ckeys = self.createSignedGID("foo", "urn:publicid:IDN+plc:site+user+foo",
+ keys, gidAuthority)
+ gidObject, _ = self.createSignedGID("bar_slice", "urn:publicid:IDN+plc:site+slice+bar_slice",
+ keys, gidAuthority)
+ gidDelegatee, _ = self.createSignedGID("delegatee", "urn:publicid:IDN+plc:site+user+delegatee",
+ keys, gidAuthority)
+
+ cred = Credential()
+ cred.set_gid_caller(gidCaller)
+ cred.set_gid_object(gidObject)
+ cred.set_lifetime(3600)
+ cred.set_privileges("embed:1, bind:1")
+ cred.encode()
+
+ gidAuthority.save_to_file("/tmp/auth_gid")
+ keys.save_to_file("/tmp/auth_key")
+ cred.set_issuer_keys("/tmp/auth_key", "/tmp/auth_gid")
+ cred.sign()
+
+ cred.verify(['/tmp/auth_gid'])
+
+ # Test copying
+ cred2 = Credential(string=cred.save_to_string())
+ cred2.verify(['/tmp/auth_gid'])
+
+ # Test delegation
+ delegated = Credential()
+ delegated.set_gid_caller(gidDelegatee)
+ delegated.set_gid_object(gidObject)
+ delegated.set_parent(cred)
+ delegated.set_lifetime(600)
+ delegated.set_privileges("embed:1, bind:1")
+ gidCaller.save_to_file("/tmp/caller_gid")
+ ckeys.save_to_file("/tmp/caller_pkey")
+
+ delegated.set_issuer_keys("/tmp/caller_pkey", "/tmp/caller_gid")
+
+ delegated.encode()
+ delegated.sign()
+
+ # This should verify
+ delegated.verify(['/tmp/auth_gid'])
+ delegated.save_to_file("/tmp/dcred")
+
+
+ # Test that verify catches an incorrect lifetime
+ delegated.set_lifetime(6000)
+
+ WHY IS THIS CRASHING??
+ delegated.encode()
+ delegated.sign()
+ delegated.verify(['/tmp/auth_gid'])
+
+
if __name__ == "__main__":
unittest.main()
def testSetGetUuid(self):
gid = GID(subject="test")
- u = create_uuid()
+ u = uuid.uuid4().int
gid.set_uuid(u)
self.assertEqual(gid.get_uuid(), u)
def testEncodeDecode(self):
gid = GID(subject="test")
- u = str(uuid.uuid4().int)
+ u = uuid.uuid4().int
hrn = "test.hrn"
gid.set_uuid(u)
def testSaveAndLoadString(self):
gid = GID(subject="test")
- u = str(uuid.uuid4().int)
+ u = uuid.uuid4().int
hrn = "test.hrn"
gid.set_uuid(u)