##
# Implements SFA Credentials
#
# Credentials are layered on top of certificates, and are essentially a
# certificate that stores a tuple of parameters.
##
### $Id$
### $URL$
import os
import random
import subprocess
import sys
import tempfile
import xml.dom.minidom
import xmlrpclib
from xml.dom.minidom import Document

from sfa.trust.credential_legacy import CredentialLegacy
from sfa.trust.certificate import Certificate
from sfa.trust.rights import *
from sfa.trust.gid import *
from sfa.util.faults import *
from sfa.util.sfalogging import *
# TODO:
# . Need to verify credentials
# . Need to add privileges (make PG and PL privs work together and add delegation per privilege instead of global)
# . Need to fix lifetime
# . Need to make sure delegation is fully supported
# . Need to test
# Skeleton XML-DSig <Signature> element that Credential.encode() fills in
# with two values: the id suffix of the signature itself and the xml:id of
# the <credential> element being signed (referenced as "#<id>").
# The digest, signature value and key info are left empty here; they are
# expected to be filled in by the external signing tool.
#
# NOTE(review): the template was empty in this file, which makes
# `signature_template % (refid, refid)` raise TypeError and leaves no
# <Signature> element for encode() to import. It is reconstructed here as a
# standard enveloped-signature template (C14N, RSA-SHA1, SHA1 digest) --
# confirm the algorithm URIs against what the deployment expects.
signature_template = \
'''
<Signature xml:id="Sig_%s" xmlns="http://www.w3.org/2000/09/xmldsig#">
  <SignedInfo>
    <CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>
    <SignatureMethod Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1"/>
    <Reference URI="#%s">
      <Transforms>
        <Transform Algorithm="http://www.w3.org/2000/09/xmldsig#enveloped-signature"/>
      </Transforms>
      <DigestMethod Algorithm="http://www.w3.org/2000/09/xmldsig#sha1"/>
      <DigestValue></DigestValue>
    </Reference>
  </SignedInfo>
  <SignatureValue/>
  <KeyInfo>
    <X509Data>
      <X509SubjectName/>
      <X509IssuerSerial/>
      <X509Certificate/>
    </X509Data>
    <KeyValue/>
  </KeyInfo>
</Signature>
'''
##
# Credential is a tuple:
# (GIDCaller, GIDObject, LifeTime, Privileges, DelegateBit)
#
# These fields are encoded in one of two ways. The legacy style places
# it in the subjectAltName of an X509 certificate. The new credentials
# are placed in signed XML.
class Credential(object):
    """An SFA credential: (GIDCaller, GIDObject, LifeTime, Privileges, DelegateBit).

    The tuple is stored either in a legacy credential (text starting with
    "-----", translated on load through CredentialLegacy) or in a
    "signed-credential" XML document that is built by encode(), signed by
    sign() and parsed back by decode().
    """

    # GID of the caller, and the caller the credential originated from
    gidCaller = None
    gidOriginCaller = None
    # GID of the object the credential applies to
    gidObject = None
    # Expiry value written to / read from the <expires> element
    lifeTime = None
    # RightList (or compatible object) holding the granted privileges
    privileges = None
    # True if the privileges may be delegated
    delegate = False
    # Signing material handed to xmlsec1 by sign()
    issuer_privkey = None
    issuer_gid = None
    issuer_pubkey = None
    # Parent credential this one was delegated from, if any
    parent = None
    # Cached XML form of the credential
    xml = None

    def __init__(self, create=False, subject=None, string=None, filename=None):
        """Create a Credential object, optionally loading it from text.

        @param create accepted for compatibility; not used by this class
        @param subject accepted for compatibility; not used by this class
        @param string If set, load the credential from this string
        @param filename If set (and string is not), load the credential
            from this file
        """
        if not (string or filename):
            return

        if string:
            text = string
        else:
            cred_file = open(filename)
            try:
                text = cred_file.read()
            finally:
                cred_file.close()

        # Legacy credentials start with a "-----" marker line; translate them.
        if text.strip().startswith("-----"):
            self.translate_legacy(text)
        else:
            self.xml = text
            # Let's not mess around with invalid credentials
            self.verify_chain()

    def translate_legacy(self, str):
        """Populate this credential from a legacy credential.

        @param str String of the legacy credential
        """
        legacy = CredentialLegacy(False, string=str)
        self.gidCaller = legacy.get_gid_caller()
        self.gidObject = legacy.get_gid_object()
        self.lifeTime = legacy.get_lifetime()
        self.privileges = legacy.get_privileges()
        self.delegate = legacy.get_delegate()

    def set_issuer_keys(self, privkey, gid):
        """Record the issuer's signing material used by sign().

        Both values are passed to xmlsec1 as "--privkey-pem privkey,gid".

        @param privkey private key of the issuer
        @param gid GID of the issuing authority
        """
        self.issuer_privkey = privkey
        self.issuer_gid = gid

    def set_pubkey(self, pubkey):
        """Record the issuer's public key.

        @param pubkey public key of the issuer
        """
        self.issuer_pubkey = pubkey

    def set_parent(self, cred):
        """Set the parent credential this credential was delegated from.

        verify_chain() and dump() treat the parent as a Credential object;
        encode() additionally accepts the parent's signed XML as a string.

        @param cred the parent credential
        """
        self.parent = cred

    def _decode_if_loaded(self):
        """Decode the XML form if there is one.

        A credential that was built through the setters has no XML yet, so
        there is nothing to decode and the getters return the stored value.
        """
        if self.xml:
            self.decode()

    def set_gid_caller(self, gid):
        """Set the GID of the caller.

        @param gid GID object of the caller
        """
        self.gidCaller = gid
        # gid origin caller is the caller's gid by default
        self.gidOriginCaller = gid

    def get_gid_caller(self):
        """Return the GID of the caller, decoding the XML if needed."""
        if not self.gidCaller:
            self._decode_if_loaded()
        return self.gidCaller

    def set_gid_object(self, gid):
        """Set the GID of the object.

        @param gid GID object of the object
        """
        self.gidObject = gid

    def get_gid_object(self):
        """Return the GID of the object, decoding the XML if needed."""
        if not self.gidObject:
            self._decode_if_loaded()
        return self.gidObject

    def get_subject(self):
        """Return a printable identifier for this credential.

        This class has no certificate subject of its own, so the URN of the
        object GID is used as the label in dump() and in the error messages
        raised by verify_chain().

        @return the object GID's URN, or None if there is no object GID
        """
        gid = self.get_gid_object()
        if not gid:
            return None
        return gid.get_urn()

    def set_lifetime(self, lifeTime):
        """Set the lifetime of this credential.

        @param lifeTime lifetime of credential
        """
        self.lifeTime = lifeTime

    def get_lifetime(self):
        """Return the lifetime of the credential, decoding the XML if needed."""
        if not self.lifeTime:
            self._decode_if_loaded()
        return self.lifeTime

    def set_delegate(self, delegate):
        """Set the delegate bit.

        @param delegate boolean (True or False)
        """
        self.delegate = delegate

    def get_delegate(self):
        """Return the delegate bit, decoding the XML if needed."""
        if not self.delegate:
            self._decode_if_loaded()
        return self.delegate

    def set_privileges(self, privs):
        """Set the privileges.

        @param privs either a comma-separated string of privileges or a
            RightList object
        """
        if isinstance(privs, str):
            self.privileges = RightList(string=privs)
        else:
            self.privileges = privs

    def get_privileges(self):
        """Return the privileges as a RightList object (None if unset)."""
        if not self.privileges:
            self._decode_if_loaded()
        return self.privileges

    def can_perform(self, op_name):
        """Determine whether the credential allows an operation.

        @param op_name string specifying name of operation
            ("lookup", "update", etc)
        @return False when the credential has no privileges, otherwise the
            result of the privileges' own can_perform()
        """
        rights = self.get_privileges()
        if not rights:
            return False
        return rights.can_perform(op_name)

    def append_sub(self, doc, parent, element, text):
        """Append <element>text</element> as the last child of parent.

        @param doc DOM document used to create the nodes
        @param parent DOM element receiving the new child
        @param element tag name of the new child
        @param text text content of the new child
        """
        ele = doc.createElement(element)
        ele.appendChild(doc.createTextNode(text))
        parent.appendChild(ele)

    def _append_privileges(self, doc, cred):
        """Append the <privileges> element for this credential to cred.

        One <privilege> is written per comma-separated right, each carrying
        the credential-wide delegate bit as a lowercase "true"/"false" so
        that decode() can read it back.
        """
        privileges_ele = doc.createElement("privileges")
        cred.appendChild(privileges_ele)
        if not self.privileges:
            return

        if self.delegate:
            can_delegate = "true"
        else:
            can_delegate = "false"

        for right in self.privileges.save_to_string().split(","):
            priv = doc.createElement("privilege")
            self.append_sub(doc, priv, "name", right.strip())
            self.append_sub(doc, priv, "can_delegate", can_delegate)
            privileges_ele.appendChild(priv)

    def encode(self):
        """Encode the attributes of the credential into an XML string.

        This should be done immediately before signing the credential. The
        result is stored in self.xml.
        """
        # Get information from the parent credential
        parent_cred = None
        parent_sigs = []
        if self.parent:
            parent_xml = self.parent
            if hasattr(parent_xml, "save_to_string"):
                # A Credential object rather than its XML text
                parent_xml = parent_xml.save_to_string()
            p_doc = xml.dom.minidom.parseString(parent_xml)
            p_signed_cred = p_doc.getElementsByTagName("signed-credential")[0]
            parent_cred = p_signed_cred.getElementsByTagName("credential")[0]
            p_signatures = p_signed_cred.getElementsByTagName("signatures")[0]
            parent_sigs = p_signatures.getElementsByTagName("Signature")

        # Create the XML document
        doc = Document()
        signed_cred = doc.createElement("signed-credential")
        doc.appendChild(signed_cred)

        # Fill in the credential element; refid is what the signature
        # template and sign() refer to.
        refid = "ref1"
        cred = doc.createElement("credential")
        cred.setAttribute("xml:id", refid)
        signed_cred.appendChild(cred)
        self.append_sub(doc, cred, "type", "privilege")
        self.append_sub(doc, cred, "serial", "8")
        self.append_sub(doc, cred, "owner_gid", self.gidCaller.save_to_string())
        self.append_sub(doc, cred, "owner_urn", self.gidCaller.get_urn())
        self.append_sub(doc, cred, "target_gid", self.gidObject.save_to_string())
        self.append_sub(doc, cred, "target_urn", self.gidObject.get_urn())
        self.append_sub(doc, cred, "uuid", "")
        self.append_sub(doc, cred, "expires", str(self.lifeTime))
        self._append_privileges(doc, cred)

        # Add the parent credential if it exists. The node comes from
        # another document, so it is imported rather than moved.
        # NOTE(review): the parent credential carries the same xml:id
        # ("ref1") as this one -- confirm the signer picks the intended node.
        if parent_cred is not None:
            parent_ele = doc.createElement("parent")
            parent_ele.appendChild(doc.importNode(parent_cred, True))
            cred.appendChild(parent_ele)

        # Fill in the signatures element, starting with an empty signature
        # for this credential.
        signatures = doc.createElement("signatures")
        sz_sig = signature_template % (refid, refid)
        sdoc = xml.dom.minidom.parseString(sz_sig)
        sig_ele = doc.importNode(sdoc.getElementsByTagName("Signature")[0], True)
        signatures.appendChild(sig_ele)

        # Add any parent signatures
        for sig in parent_sigs:
            signatures.appendChild(doc.importNode(sig, True))
        signed_cred.appendChild(signatures)

        # Get the finished product
        self.xml = doc.toxml()

    def save_to_string(self):
        """Return the XML form of the credential, encoding it if needed."""
        if not self.xml:
            self.encode()
        return self.xml

    def sign(self):
        """Sign the credential by calling out to xmlsec1.

        The XML is written to a private temporary file, signed with the
        issuer key material from set_issuer_keys(), and self.xml is replaced
        by the signed document.

        @raise OSError if xmlsec1 cannot be executed
        @raise RuntimeError if xmlsec1 fails or produces no output; self.xml
            is left untouched in that case
        """
        if not self.xml:
            self.encode()

        xmlsec = '/usr/bin/xmlsec1'
        # mkstemp creates the file atomically with an unpredictable name,
        # unlike a name built from random.randint().
        fd, filename = tempfile.mkstemp(prefix="cred_")
        try:
            tmp = os.fdopen(fd, "w")
            try:
                tmp.write(self.xml)
            finally:
                tmp.close()
            # An argument list avoids shell interpretation of the key paths.
            proc = subprocess.Popen(
                [xmlsec, '--sign', '--node-id', 'ref1',
                 '--privkey-pem',
                 '%s,%s' % (self.issuer_privkey, self.issuer_gid),
                 filename],
                stdout=subprocess.PIPE, universal_newlines=True)
            signed = proc.communicate()[0]
        finally:
            os.remove(filename)

        if proc.returncode != 0 or not signed:
            raise RuntimeError("xmlsec1 failed to sign credential (exit status %s)"
                               % proc.returncode)
        self.xml = signed

    def getTextNode(self, element, subele):
        """Return the text of the first descendant of element named subele.

        @param element DOM element to search
        @param subele tag name to look for
        @return the concatenated text children ("" for an empty element)
        @raise IndexError if there is no such descendant
        """
        sub = element.getElementsByTagName(subele)[0]
        return "".join([child.nodeValue for child in sub.childNodes
                        if child.nodeType in (child.TEXT_NODE,
                                              child.CDATA_SECTION_NODE)])

    def _read_privileges(self, cred_ele):
        """Extract the rights string and delegate bit from a credential element.

        The misspelled tag names "priveleges"/"privelege" are accepted as a
        fallback for documents written with that spelling.

        @param cred_ele the <credential> DOM element
        @return (comma-separated privilege names, can_delegate); delegation
            is allowed only if there is at least one privilege and every
            privilege says "true", "1" or "yes" (case-insensitive)
        """
        containers = (cred_ele.getElementsByTagName("privileges")
                      or cred_ele.getElementsByTagName("priveleges"))
        if not containers:
            return "", False
        entries = (containers[0].getElementsByTagName("privilege")
                   or containers[0].getElementsByTagName("privelege"))

        names = []
        can_delegate = len(entries) > 0
        for entry in entries:
            names.append(self.getTextNode(entry, "name").strip())
            flag = self.getTextNode(entry, "can_delegate").strip().lower()
            if flag not in ("true", "1", "yes"):
                can_delegate = False
        return ",".join(names), can_delegate

    def decode(self):
        """Retrieve the attributes of the credential from the XML.

        This is automatically called by the various get_* methods of this
        class and should not need to be called explicitly.
        """
        p_doc = xml.dom.minidom.parseString(self.xml)
        p_signed_cred = p_doc.getElementsByTagName("signed-credential")[0]
        p_cred = p_signed_cred.getElementsByTagName("credential")[0]

        self.lifeTime = self.getTextNode(p_cred, "expires")
        self.gidCaller = GID(string=self.getTextNode(p_cred, "owner_gid"))
        self.gidObject = GID(string=self.getTextNode(p_cred, "target_gid"))

        # Make the rights list and work out whether we can delegate
        sz_privs, self.delegate = self._read_privileges(p_cred)
        self.privileges = RightList(string=sz_privs)

    # The old decode() read these attributes from the alt-subject-name field
    # of the X509 certificate; legacy credentials are now loaded through
    # translate_legacy() instead.

    # TODO: verify(self, trusted_certs = None)
    # Verify for the initial credential:
    #   1. That the signature is valid
    #   2. That the xml signer's certificate matches the object's certificate
    #   3. That the urns match those in the gids
    # Verify for the delegated credentials:
    #   1. That the signature is valid
    #   (the remaining checks were left unfinished in the original notes)

    def verify_chain(self, trusted_certs = None):
        """Verify that a chain of credentials is valid (see cert.py:verify).

        In addition to the checks for ordinary certificates, verification
        also ensures that the delegate bit was set by the parent and that
        this credential's rights are a subset of the parent's rights.

        @param trusted_certs passed through to Certificate.verify_chain
        @raise MissingDelegateBit if the parent did not allow delegation
        @raise ChildRightsNotSubsetOfParent if this credential holds rights
            the parent does not
        """
        # do the normal certificate verification stuff
        # NOTE(review): Credential does not derive from Certificate, so this
        # unbound call looks like it would raise TypeError under Python 2 --
        # confirm and replace with real XML signature verification.
        Certificate.verify_chain(self, trusted_certs)

        if not self.parent:
            return

        # make sure the parent delegated rights to the child
        if not self.parent.get_delegate():
            raise MissingDelegateBit(self.parent.get_subject())

        # make sure the rights given to the child are a subset of the
        # parents rights
        parent_rights = self.parent.get_privileges()
        own_rights = self.get_privileges()
        if not parent_rights.is_superset(own_rights):
            raise ChildRightsNotSubsetOfParent(
                "%s %s %s" % (self.get_subject(),
                              parent_rights.save_to_string(),
                              own_rights.save_to_string()))

    def dump(self, dump_parents=False):
        """Dump the contents of a credential to stdout in human-readable format.

        @param dump_parents If true, also dump the parent certificates
        """
        # Single-argument print() emits the same text on Python 2 and 3.
        print("CREDENTIAL %s" % self.get_subject())

        rights = self.get_privileges()
        if rights:
            rights_text = rights.save_to_string()
        else:
            rights_text = ""
        print(" privs: %s" % rights_text)

        print(" gidCaller:")
        gidCaller = self.get_gid_caller()
        if gidCaller:
            gidCaller.dump(8, dump_parents)

        print(" gidObject:")
        gidObject = self.get_gid_object()
        if gidObject:
            gidObject.dump(8, dump_parents)

        print(" delegate: %s" % self.get_delegate())

        if self.parent and dump_parents:
            # No newline: the parent's own "CREDENTIAL ..." line follows.
            sys.stdout.write("PARENT ")
            self.parent.dump(dump_parents)