BadRequestHash, ConnectionKeyGIDMismatch, SfaPermissionDenied
from sfa.util.sfalogging import logger
from sfa.util.config import Config
-from sfa.util.xrn import get_authority
+from sfa.util.xrn import get_authority, Xrn
from sfa.trust.gid import GID
from sfa.trust.rights import Rights
from sfa.trust.trustedroots import TrustedRoots
from sfa.trust.hierarchy import Hierarchy
from sfa.trust.sfaticket import SfaTicket
+from sfa.trust.speaksfor_util import determine_speaks_for
class Auth:
self.trusted_cert_list = TrustedRoots(self.config.get_trustedroots_dir()).get_list()
self.trusted_cert_file_list = TrustedRoots(self.config.get_trustedroots_dir()).get_file_list()
-
-
- def checkCredentials(self, creds, operation, hrn = None):
+
+ def checkCredentials(self, creds, operation, hrn = None, options = {}):
+
+ def log_invalid_cred(cred):
+ #cred_obj=Credential(string=cred)
+ #logger.debug("failed to validate credential - dump=%s"%cred_obj.dump_string(dump_parents=True))
+ error = sys.exc_info()[:2]
+ return error
+
valid = []
if not isinstance(creds, list):
creds = [creds]
- logger.debug("Auth.checkCredentials with %d creds"%len(creds))
- for cred in creds:
- try:
- self.check(cred, operation, hrn)
- valid.append(cred)
- except:
- cred_obj=Credential(string=cred)
- logger.debug("failed to validate credential - dump=%s"%cred_obj.dump_string(dump_parents=True))
- error = sys.exc_info()[:2]
- continue
+
+ # if speaks for gid matches caller cert then we've found a valid
+ # speaks for credential
+ speaks_for_gid = determine_speaks_for(logger, creds, self.peer_cert, \
+ options, self.trusted_cert_list)
+ if self.peer_cert and \
+ self.peer_cert.is_pubkey(speaks_for_gid.get_pubkey()):
+ valid = creds
+ else:
+ for cred in creds:
+ try:
+ self.check(cred, operation, hrn)
+ valid.append(cred)
+ except:
+ error = log_invalid_cred(cred)
+
+ if not len(valid):
+ raise InsufficientRights('Access denied: %s -- %s' % (error[0],error[1]))
- if not len(valid):
- raise InsufficientRights('Access denied: %s -- %s' % (error[0],error[1]))
-
return valid
self.client_cred = Credential(string = cred)
self.client_gid = self.client_cred.get_gid_caller()
self.object_gid = self.client_cred.get_gid_object()
+
# make sure the client_gid is not blank
if not self.client_gid:
raise MissingCallerGID(self.client_cred.get_subject())
self.verifyPeerCert(self.peer_cert, self.client_gid)
# make sure the client is allowed to perform the operation
- if operation:
+ if operation:
if not self.client_cred.can_perform(operation):
raise InsufficientRights(operation)
if self.trusted_cert_list:
self.client_cred.verify(self.trusted_cert_file_list, self.config.SFA_CREDENTIAL_SCHEMA)
-
else:
raise MissingTrustedRoots(self.config.get_trustedroots_dir())
# This check does not apply to trusted peers
trusted_peers = [gid.get_hrn() for gid in self.trusted_cert_list]
if hrn and self.client_gid.get_hrn() not in trusted_peers:
-
target_hrn = self.object_gid.get_hrn()
if not hrn == target_hrn:
raise PermissionError("Target hrn: %s doesn't match specified hrn: %s " % \
@param name human readable name to test
"""
object_hrn = self.object_gid.get_hrn()
- #strname = str(name).strip("['']")
- if object_hrn == name:
- #if object_hrn == strname:
- return
- if name.startswith(object_hrn + ".") :
- #if strname.startswith((object_hrn + ".")) is True:
+ if object_hrn == name:
+ return
+ if name.startswith(object_hrn + "."):
return
#if name.startswith(get_authority(name)):
#return
-
+
raise PermissionError(name)
def determine_user_rights(self, caller_hrn, reg_record):