X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Ftrust%2Fauth.py;h=d217b1c8abc327d5445fb627706ce357461c5260;hb=b921336b2495d77f494f4e106e29dd567d79fea8;hp=41f42803b2123a41cdf09acdb42d6bd0c18d7b4e;hpb=bcddcccb27539888e5ff85da4601c3eeadcfcf4f;p=sfa.git diff --git a/sfa/trust/auth.py b/sfa/trust/auth.py index 41f42803..d217b1c8 100644 --- a/sfa/trust/auth.py +++ b/sfa/trust/auth.py @@ -4,10 +4,11 @@ import sys from sfa.util.faults import InsufficientRights, MissingCallerGID, MissingTrustedRoots, PermissionError, \ - BadRequestHash, ConnectionKeyGIDMismatch, SfaPermissionDenied + BadRequestHash, ConnectionKeyGIDMismatch, SfaPermissionDenied, CredentialNotVerifiable, Forbidden, \ + BadArgs from sfa.util.sfalogging import logger from sfa.util.config import Config -from sfa.util.xrn import get_authority +from sfa.util.xrn import Xrn, get_authority from sfa.trust.gid import GID from sfa.trust.rights import Rights @@ -34,30 +35,56 @@ class Auth: self.trusted_cert_list = TrustedRoots(self.config.get_trustedroots_dir()).get_list() self.trusted_cert_file_list = TrustedRoots(self.config.get_trustedroots_dir()).get_file_list() + def checkCredentials(self, creds, operation, xrns=[], check_sliver_callback=None): + # if xrns are specified they cannot be None or empty string + if xrns: + for xrn in xrns: + if not xrn: + raise BadArgs("Invalid urn or hrn") + - - def checkCredentials(self, creds, operation, hrn = None): + if not isinstance(xrns, list): + xrns = [xrns] + + slice_xrns = Xrn.filter_type(xrns, 'slice') + sliver_xrns = Xrn.filter_type(xrns, 'sliver') + + # we are not able to validate slivers in the traditional way so + # we make sure not to include sliver urns/hrns in the core validation loop + hrns = [Xrn(xrn).hrn for xrn in xrns if xrn not in sliver_xrns] valid = [] if not isinstance(creds, list): creds = [creds] - logger.debug("Auth.checkCredentials with %d creds"%len(creds)) + logger.debug("Auth.checkCredentials with %d creds on hrns=%s"%(len(creds),hrns)) + # won't work if either creds or hrns is empty - let's make it more explicit + if not creds: raise Forbidden("no credential provided") + if not hrns: hrns = [None] for cred in creds: - try: - self.check(cred, operation, hrn) - valid.append(cred) - except: - cred_obj=Credential(string=cred) - logger.debug("failed to validate credential - dump=%s"%cred_obj.dump_string(dump_parents=True)) - error = sys.exc_info()[:2] - continue - + for hrn in hrns: + try: + self.check(cred, operation, hrn) + valid.append(cred) + except: + cred_obj=Credential(cred=cred) + logger.debug("failed to validate credential - dump=%s"%cred_obj.dump_string(dump_parents=True)) + error = sys.exc_info()[:2] + continue + + # make sure all sliver xrns are validated against the valid credentials + if sliver_xrns: + if not check_sliver_callback: + msg = "sliver verification callback method not found." + msg += " Unable to validate sliver xrns: %s" % sliver_xrns + raise Forbidden(msg) + check_sliver_callback(valid, sliver_xrns) + if not len(valid): - raise InsufficientRights('Access denied: %s -- %s' % (error[0],error[1])) + raise Forbidden("Invalid credential") return valid - def check(self, cred, operation, hrn = None): + def check(self, credential, operation, hrn = None): """ Check the credential against the peer cert (callerGID included in the credential matches the caller that is connected to the @@ -65,9 +92,16 @@ class Auth: trusted cert and check if the credential is allowed to perform the specified operation. """ - self.client_cred = Credential(string = cred) + cred = Credential(cred=credential) + self.client_cred = cred + logger.debug("Auth.check: handling hrn=%s and credential=%s"%\ + (hrn,cred.get_summary_tostring())) + + if cred.type not in ['geni_sfa']: + raise CredentialNotVerifiable(cred.type, "%s not supported" % cred.type) self.client_gid = self.client_cred.get_gid_caller() self.object_gid = self.client_cred.get_gid_object() + # make sure the client_gid is not blank if not self.client_gid: raise MissingCallerGID(self.client_cred.get_subject()) @@ -77,13 +111,12 @@ class Auth: self.verifyPeerCert(self.peer_cert, self.client_gid) # make sure the client is allowed to perform the operation - if operation: + if operation: if not self.client_cred.can_perform(operation): raise InsufficientRights(operation) if self.trusted_cert_list: self.client_cred.verify(self.trusted_cert_file_list, self.config.SFA_CREDENTIAL_SCHEMA) - else: raise MissingTrustedRoots(self.config.get_trustedroots_dir()) @@ -91,7 +124,6 @@ class Auth: # This check does not apply to trusted peers trusted_peers = [gid.get_hrn() for gid in self.trusted_cert_list] if hrn and self.client_gid.get_hrn() not in trusted_peers: - target_hrn = self.object_gid.get_hrn() if not hrn == target_hrn: raise PermissionError("Target hrn: %s doesn't match specified hrn: %s " % \ @@ -226,16 +258,13 @@ class Auth: @param name human readable name to test """ object_hrn = self.object_gid.get_hrn() - #strname = str(name).strip("['']") - if object_hrn == name: - #if object_hrn == strname: - return - if name.startswith(object_hrn + ".") : - #if strname.startswith((object_hrn + ".")) is True: + if object_hrn == name: + return + if name.startswith(object_hrn + "."): return #if name.startswith(get_authority(name)): #return - + raise PermissionError(name) def determine_user_rights(self, caller_hrn, reg_record):