### $URL$
#
-import time
from sfa.trust.credential import Credential
from sfa.trust.trustedroot import TrustedRootList
-from sfa.trust.rights import RightList
from sfa.util.faults import *
from sfa.trust.hierarchy import Hierarchy
from sfa.util.config import *
from sfa.util.namespace import *
-from sfa.trust.gid import GID
from sfa.util.sfaticket import *
+from sfa.util.sfalogging import logger
+
+import sys
class Auth:
"""
if not config:
self.config = Config()
self.load_trusted_certs()
- self.trusted_cert_file_list = TrustedRootList(self.config.get_trustedroots_dir()).get_file_list()
def load_trusted_certs(self):
self.trusted_cert_list = TrustedRootList(self.config.get_trustedroots_dir()).get_list()
+ self.trusted_cert_file_list = TrustedRootList(self.config.get_trustedroots_dir()).get_file_list()
+
+
- def check(self, cred, operation):
+ def checkCredentials(self, creds, operation, hrn = None):
+ valid = []
+ for cred in creds:
+ try:
+ self.check(cred, operation, hrn)
+ valid.append(cred)
+ except:
+ error = sys.exc_info()[:2]
+ continue
+
+ if not len(valid):
+ raise InsufficientRights('Access denied: %s -- %s' % (error[0],error[1]))
+
+ return valid
+
+
+ def check(self, cred, operation, hrn = None):
"""
Check the credential against the peer cert (callerGID included
in the credential matches the caller that is connected to the
self.client_cred.verify(self.trusted_cert_file_list)
else:
raise MissingTrustedRoots(self.config.get_trustedroots_dir())
-
+
+ # Make sure the credential's target matches the specified hrn.
+ # This check does not apply to trusted peers
+ trusted_peers = [gid.get_hrn() for gid in self.trusted_cert_list]
+ if hrn and self.client_gid.get_hrn() not in trusted_peers:
+ if not hrn == self.object_gid.get_hrn():
+ raise PermissionError("Target hrn: %s doesn't match specified hrn: %s " % \
+ (self.object_gid.get_hrn(), hrn) )
return True
def check_ticket(self, ticket):