logger.error ("checkCredentialsSpeaksFor was not passed options=options")
return
# remove the options arg
- options=kwds['options']; del kwds['options']
+ options = kwds['options']; del kwds['options']
# compute the speaking_for_xrn arg and pass it to checkCredentials
- if options is None: speaking_for_xrn=None
- else: speaking_for_xrn=options.get('geni_speaking_for',None)
- kwds['speaking_for_xrn']=speaking_for_xrn
- return self.checkCredentials (*args, **kwds)
+ if options is None: speaking_for_xrn = None
+ else: speaking_for_xrn = options.get('geni_speaking_for', None)
+ kwds['speaking_for_xrn'] = speaking_for_xrn
+ return self.checkCredentials(*args, **kwds)
# do not use mutable as default argument
# http://docs.python-guide.org/en/latest/writing/gotchas/#mutable-default-arguments
def checkCredentials(self, creds, operation, xrns=None,
check_sliver_callback=None,
speaking_for_xrn=None):
- if xrns is None: xrns=[]
+ if xrns is None: xrns = []
+ error = (None, None)
def log_invalid_cred(cred):
if not isinstance (cred, StringTypes):
logger.info("cannot validate credential %s - expecting a string"%cred)
- error="checkCredentials: expected a string, received %s"%(type(cred))
+ error = ('TypeMismatch',
+ "checkCredentials: expected a string, received {} -- {}"
+ .format(type(cred), cred))
else:
- cred_obj=Credential(string=cred)
+ cred_obj = Credential(string=cred)
logger.info("failed to validate credential - dump=%s"%\
cred_obj.dump_string(dump_parents=True))
error = sys.exc_info()[:2]
# won't work if either creds or hrns is empty - let's make it more explicit
if not creds: raise Forbidden("no credential provided")
if not hrns: hrns = [None]
- error=[None,None]
speaks_for_gid = determine_speaks_for(logger, creds, self.peer_cert,
speaking_for_xrn, self.trusted_cert_list)
return valid
-
def check(self, credential, operation, hrn = None):
"""
Check the credential against the peer cert (callerGID) included
# researchers in the slice are in the DB as-is
researcher_hrns = [ user.hrn for user in reg_record.reg_researchers ]
# locating PIs attached to that slice
- slice_pis=reg_record.get_pis()
+ slice_pis = reg_record.get_pis()
pi_hrns = [ user.hrn for user in slice_pis ]
if (caller_hrn in researcher_hrns + pi_hrns):
rl.add('refresh')
def load_from_string(self, string):
if glo_passphrase_callback:
- self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string) )
- self.m2key = M2Crypto.EVP.load_key_string(string, functools.partial(glo_passphrase_callback, self, string) )
+ self.key = crypto.load_privatekey(
+ crypto.FILETYPE_PEM, string, functools.partial(glo_passphrase_callback, self, string))
+ self.m2key = M2Crypto.EVP.load_key_string(
+ string, functools.partial(glo_passphrase_callback, self, string))
else:
self.key = crypto.load_privatekey(crypto.FILETYPE_PEM, string)
self.m2key = M2Crypto.EVP.load_key_string(string)
def get_file_list(self):
file_list = []
- pattern=os.path.join(self.basedir,"*")
+ pattern = os.path.join(self.basedir,"*")
for cert_file in glob.glob(pattern):
if os.path.isfile(cert_file):
if self.has_supported_extension(cert_file):
file_list.append(cert_file)
else:
- logger.warning("File %s ignored - supported extensions are %r"%\
- (cert_file,TrustedRoots.supported_extensions))
+ logger.warning("File {} ignored - supported extensions are {}"
+ .format(cert_file, TrustedRoots.supported_extensions))
return file_list
def has_supported_extension (self,path):