From 033dd3ea973779b4ea5e5510a6eec9f9a3a44fa3 Mon Sep 17 00:00:00 2001 From: Thierry Parmentelat Date: Fri, 1 Apr 2016 10:49:59 +0200 Subject: [PATCH] purely cosmetic pass on sfaclientlib --- sfa/client/sfaclientlib.py | 297 +++++++++++++++++++------------------ 1 file changed, 150 insertions(+), 147 deletions(-) diff --git a/sfa/client/sfaclientlib.py b/sfa/client/sfaclientlib.py index 7736e52a..ebadb013 100644 --- a/sfa/client/sfaclientlib.py +++ b/sfa/client/sfaclientlib.py @@ -10,7 +10,7 @@ from __future__ import print_function # certificates and automatically retrieve fresh ones when expired import sys -import os,os.path +import os, os.path import subprocess from datetime import datetime from sfa.util.xrn import Xrn @@ -102,90 +102,91 @@ from sfa.trust.gid import GID # #################### -class SfaClientException (Exception): pass +class SfaClientException(Exception): pass class SfaClientBootstrap: # dir is mandatory but defaults to '.' - def __init__ (self, user_hrn, registry_url, dir=None, - verbose=False, timeout=None, logger=None): - self.hrn=user_hrn - self.registry_url=registry_url - if dir is None: dir="." - self.dir=dir - self.verbose=verbose - self.timeout=timeout + def __init__(self, user_hrn, registry_url, dir=None, + verbose=False, timeout=None, logger=None): + self.hrn = user_hrn + self.registry_url = registry_url + if dir is None: + dir="." + self.dir = dir + self.verbose = verbose + self.timeout = timeout # default for the logger is to use the global sfa logger if logger is None: logger = sfa.util.sfalogging.logger - self.logger=logger + self.logger = logger ######################################## *_produce methods ### step1 # unconditionnally create a self-signed certificate - def self_signed_cert_produce (self, output): + def self_signed_cert_produce(self, output): self.assert_private_key() private_key_filename = self.private_key_filename() - keypair=Keypair(filename=private_key_filename) - self_signed = Certificate (subject = self.hrn) - self_signed.set_pubkey (keypair) - self_signed.set_issuer (keypair, self.hrn) - self_signed.sign () - self_signed.save_to_file (output) - self.logger.debug("SfaClientBootstrap: Created self-signed certificate for %s in %s"%\ - (self.hrn, output)) + keypair = Keypair(filename=private_key_filename) + self_signed = Certificate(subject = self.hrn) + self_signed.set_pubkey(keypair) + self_signed.set_issuer(keypair, self.hrn) + self_signed.sign() + self_signed.save_to_file(output) + self.logger.debug("SfaClientBootstrap: Created self-signed certificate for {} in {}" + .format(self.hrn, output)) return output ### step2 # unconditionnally retrieve my credential (GetSelfCredential) # we always use the self-signed-cert as the SSL cert - def my_credential_produce (self, output): + def my_credential_produce(self, output): self.assert_self_signed_cert() certificate_filename = self.self_signed_cert_filename() - certificate_string = self.plain_read (certificate_filename) + certificate_string = self.plain_read(certificate_filename) self.assert_private_key() - registry_proxy = SfaServerProxy (self.registry_url, - self.private_key_filename(), - certificate_filename) + registry_proxy = SfaServerProxy(self.registry_url, + self.private_key_filename(), + certificate_filename) try: - credential_string=registry_proxy.GetSelfCredential (certificate_string, self.hrn, "user") + credential_string = registry_proxy.GetSelfCredential(certificate_string, self.hrn, "user") except: # some urns hrns may replace non hierarchy delimiters '.' with an '_' instead of escaping the '.' hrn = Xrn(self.hrn).get_hrn().replace('\.', '_') - credential_string=registry_proxy.GetSelfCredential (certificate_string, hrn, "user") - self.plain_write (output, credential_string) - self.logger.debug("SfaClientBootstrap: Wrote result of GetSelfCredential in %s"%output) + credential_string = registry_proxy.GetSelfCredential(certificate_string, hrn, "user") + self.plain_write(output, credential_string) + self.logger.debug("SfaClientBootstrap: Wrote result of GetSelfCredential in {}".format(output)) return output ### step3 # unconditionnally retrieve my GID - use the general form - def my_gid_produce (self,output): - return self.gid_produce (output, self.hrn, "user") + def my_gid_produce(self, output): + return self.gid_produce(output, self.hrn, "user") ### retrieve any credential (GetCredential) unconditionnal form # we always use the GID as the SSL cert - def credential_produce (self, output, hrn, type): + def credential_produce(self, output, hrn, type): self.assert_my_gid() certificate_filename = self.my_gid_filename() self.assert_private_key() - registry_proxy = SfaServerProxy (self.registry_url, self.private_key_filename(), - certificate_filename) + registry_proxy = SfaServerProxy(self.registry_url, self.private_key_filename(), + certificate_filename) self.assert_my_credential() my_credential_string = self.my_credential_string() - credential_string=registry_proxy.GetCredential (my_credential_string, hrn, type) - self.plain_write (output, credential_string) - self.logger.debug("SfaClientBootstrap: Wrote result of GetCredential in %s"%output) + credential_string = registry_proxy.GetCredential(my_credential_string, hrn, type) + self.plain_write(output, credential_string) + self.logger.debug("SfaClientBootstrap: Wrote result of GetCredential in {}".format(output)) return output - def slice_credential_produce (self, output, hrn): - return self.credential_produce (output, hrn, "slice") + def slice_credential_produce(self, output, hrn): + return self.credential_produce(output, hrn, "slice") - def authority_credential_produce (self, output, hrn): - return self.credential_produce (output, hrn, "authority") + def authority_credential_produce(self, output, hrn): + return self.credential_produce(output, hrn, "authority") - ### retrieve any gid (Resolve) - unconditionnal form + ### retrieve any gid(Resolve) - unconditionnal form # use my GID when available as the SSL cert, otherwise the self-signed - def gid_produce (self, output, hrn, type ): + def gid_produce(self, output, hrn, type ): try: self.assert_my_gid() certificate_filename = self.my_gid_filename() @@ -194,16 +195,16 @@ class SfaClientBootstrap: certificate_filename = self.self_signed_cert_filename() self.assert_private_key() - registry_proxy = SfaServerProxy (self.registry_url, self.private_key_filename(), - certificate_filename) - credential_string=self.plain_read (self.my_credential()) - records = registry_proxy.Resolve (hrn, credential_string) - records=[record for record in records if record['type']==type] + registry_proxy = SfaServerProxy(self.registry_url, self.private_key_filename(), + certificate_filename) + credential_string = self.plain_read(self.my_credential()) + records = registry_proxy.Resolve(hrn, credential_string) + records = [record for record in records if record['type'] == type] if not records: - raise RecordNotFound("hrn %s (%s) unknown to registry %s"%(hrn,type,self.registry_url)) - record=records[0] - self.plain_write (output, record['gid']) - self.logger.debug("SfaClientBootstrap: Wrote GID for %s (%s) in %s"% (hrn,type,output)) + raise RecordNotFound("hrn {} ({}) unknown to registry {}".format(hrn, type, self.registry_url)) + record = records[0] + self.plain_write(output, record['gid']) + self.logger.debug("SfaClientBootstrap: Wrote GID for {} ({}) in {}".format(hrn, type, output)) return output @@ -211,17 +212,17 @@ class SfaClientBootstrap: ### produce a pkcs12 bundled certificate from GID and private key # xxx for now we put a hard-wired password that's just, well, 'password' # when leaving this empty on the mac, result can't seem to be loaded in keychain.. - def my_pkcs12_produce (self, filename): - password=raw_input("Enter password for p12 certificate: ") - openssl_command=['openssl', 'pkcs12', "-export"] - openssl_command += [ "-password", "pass:%s"%password ] + def my_pkcs12_produce(self, filename): + password = raw_input("Enter password for p12 certificate: ") + openssl_command = ['openssl', 'pkcs12', "-export"] + openssl_command += [ "-password", "pass:{}".format(password) ] openssl_command += [ "-inkey", self.private_key_filename()] openssl_command += [ "-in", self.my_gid_filename()] openssl_command += [ "-out", filename ] - if subprocess.call(openssl_command) ==0: - print("Successfully created %s"%filename) + if subprocess.call(openssl_command) == 0: + print("Successfully created {}".format(filename)) else: - print("Failed to create %s"%filename) + print("Failed to create {}".format(filename)) # Returns True if credential file is valid. Otherwise return false. def validate_credential(self, filename): @@ -236,170 +237,171 @@ class SfaClientBootstrap: #################### public interface # return my_gid, run all missing steps in the bootstrap sequence - def bootstrap_my_gid (self): + def bootstrap_my_gid(self): self.self_signed_cert() self.my_credential() return self.my_gid() # once we've bootstrapped we can use this object to issue any other SFA call # always use my gid - def server_proxy (self, url): + def server_proxy(self, url): self.assert_my_gid() - return SfaServerProxy (url, self.private_key_filename(), self.my_gid_filename(), - verbose=self.verbose, timeout=self.timeout) + return SfaServerProxy(url, self.private_key_filename(), self.my_gid_filename(), + verbose=self.verbose, timeout=self.timeout) # now in some cases the self-signed is enough - def server_proxy_simple (self, url): + def server_proxy_simple(self, url): self.assert_self_signed_cert() - return SfaServerProxy (url, self.private_key_filename(), self.self_signed_cert_filename(), - verbose=self.verbose, timeout=self.timeout) + return SfaServerProxy(url, self.private_key_filename(), self.self_signed_cert_filename(), + verbose=self.verbose, timeout=self.timeout) # this method can optionnally be invoked to ensure proper # installation of the private key that belongs to this user # installs private_key in working dir with expected name -- preserve mode # typically user_private_key would be ~/.ssh/id_rsa # xxx should probably check the 2 files are identical - def init_private_key_if_missing (self, user_private_key): - private_key_filename=self.private_key_filename() - if not os.path.isfile (private_key_filename): - key=self.plain_read(user_private_key) + def init_private_key_if_missing(self, user_private_key): + private_key_filename = self.private_key_filename() + if not os.path.isfile(private_key_filename): + key = self.plain_read(user_private_key) self.plain_write(private_key_filename, key) - os.chmod(private_key_filename,os.stat(user_private_key).st_mode) - self.logger.debug("SfaClientBootstrap: Copied private key from %s into %s"%\ - (user_private_key,private_key_filename)) + os.chmod(private_key_filename, os.stat(user_private_key).st_mode) + self.logger.debug("SfaClientBootstrap: Copied private key from {} into {}" + .format(user_private_key, private_key_filename)) #################### private details # stupid stuff - def fullpath (self, file): return os.path.join (self.dir,file) + def fullpath(self, file): + return os.path.join(self.dir, file) # the expected filenames for the various pieces - def private_key_filename (self): - return self.fullpath ("%s.pkey" % Xrn.unescape(self.hrn)) - def self_signed_cert_filename (self): - return self.fullpath ("%s.sscert"%self.hrn) - def my_credential_filename (self): - return self.credential_filename (self.hrn, "user") + def private_key_filename(self): + return self.fullpath("{}.pkey".format(Xrn.unescape(self.hrn))) + def self_signed_cert_filename(self): + return self.fullpath("{}.sscert".format(self.hrn)) + def my_credential_filename(self): + return self.credential_filename(self.hrn, "user") # the tests use sfi -u ; meaning that the slice credential filename # needs to keep track of the user too - def credential_filename (self, hrn, type): + def credential_filename(self, hrn, type): if type in ['user']: - basename="%s.%s.cred"%(hrn,type) + basename = "{}.{}.cred".format(hrn, type) else: - basename="%s-%s.%s.cred"%(self.hrn,hrn,type) - return self.fullpath (basename) - def slice_credential_filename (self, hrn): - return self.credential_filename(hrn,'slice') - def authority_credential_filename (self, hrn): - return self.credential_filename(hrn,'authority') - def my_gid_filename (self): - return self.gid_filename (self.hrn, "user") - def gid_filename (self, hrn, type): - return self.fullpath ("%s.%s.gid"%(hrn,type)) - def my_pkcs12_filename (self): - return self.fullpath ("%s.p12"%self.hrn) + basename = "{}-{}.{}.cred".format(self.hrn, hrn, type) + return self.fullpath(basename) + def slice_credential_filename(self, hrn): + return self.credential_filename(hrn, 'slice') + def authority_credential_filename(self, hrn): + return self.credential_filename(hrn, 'authority') + def my_gid_filename(self): + return self.gid_filename(self.hrn, "user") + def gid_filename(self, hrn, type): + return self.fullpath("{}.{}.gid".format(hrn, type)) + def my_pkcs12_filename(self): + return self.fullpath("{}.p12".format(self.hrn)) # optimizing dependencies # originally we used classes GID or Credential or Certificate # like e.g. # return Credential(filename=self.my_credential()).save_to_string() # but in order to make it simpler to other implementations/languages.. - def plain_read (self, filename): - infile=file(filename,"r") - result=infile.read() + def plain_read(self, filename): + infile = file(filename, "r") + result = infile.read() infile.close() return result - def plain_write (self, filename, contents): - outfile=file(filename,"w") - result=outfile.write(contents) + def plain_write(self, filename, contents): + outfile = file(filename, "w") + result = outfile.write(contents) outfile.close() - def assert_filename (self, filename, kind): - if not os.path.isfile (filename): - raise IOError("Missing %s file %s"%(kind,filename)) + def assert_filename(self, filename, kind): + if not os.path.isfile(filename): + raise IOError("Missing {} file {}".format(kind, filename)) return True - def assert_private_key (self): - return self.assert_filename (self.private_key_filename(),"private key") - def assert_self_signed_cert (self): - return self.assert_filename (self.self_signed_cert_filename(),"self-signed certificate") - def assert_my_credential (self): - return self.assert_filename (self.my_credential_filename(),"user's credential") - def assert_my_gid (self): - return self.assert_filename (self.my_gid_filename(),"user's GID") + def assert_private_key(self): + return self.assert_filename(self.private_key_filename(), "private key") + def assert_self_signed_cert(self): + return self.assert_filename(self.self_signed_cert_filename(), "self-signed certificate") + def assert_my_credential(self): + return self.assert_filename(self.my_credential_filename(), "user's credential") + def assert_my_gid(self): + return self.assert_filename(self.my_gid_filename(), "user's GID") # decorator to make up the other methods - def get_or_produce (filename_method, produce_method, validate_method=None): + def get_or_produce(filename_method, produce_method, validate_method=None): # default validator returns true - def wrap (f): - def wrapped (self, *args, **kw): - filename=filename_method (self, *args, **kw) - if os.path.isfile ( filename ): + def wrap(f): + def wrapped(self, *args, **kw): + filename = filename_method(self, *args, **kw) + if os.path.isfile( filename ): if not validate_method: return filename elif validate_method(self, filename): return filename else: # remove invalid file - self.logger.warning ("Removing %s - has expired"%filename) + self.logger.warning("Removing {} - has expired".format(filename)) os.unlink(filename) try: - produce_method (self, filename, *args, **kw) + produce_method(self, filename, *args, **kw) return filename except IOError: raise except : error = sys.exc_info()[:2] - message="Could not produce/retrieve %s (%s -- %s)"%\ - (filename,error[0],error[1]) + message = "Could not produce/retrieve {} ({} -- {})"\ + .format(filename, error[0], error[1]) self.logger.log_exc(message) raise Exception(message) return wrapped return wrap - @get_or_produce (self_signed_cert_filename, self_signed_cert_produce) - def self_signed_cert (self): pass + @get_or_produce(self_signed_cert_filename, self_signed_cert_produce) + def self_signed_cert(self): pass - @get_or_produce (my_credential_filename, my_credential_produce, validate_credential) - def my_credential (self): pass + @get_or_produce(my_credential_filename, my_credential_produce, validate_credential) + def my_credential(self): pass - @get_or_produce (my_gid_filename, my_gid_produce) - def my_gid (self): pass + @get_or_produce(my_gid_filename, my_gid_produce) + def my_gid(self): pass - @get_or_produce (my_pkcs12_filename, my_pkcs12_produce) - def my_pkcs12 (self): pass + @get_or_produce(my_pkcs12_filename, my_pkcs12_produce) + def my_pkcs12(self): pass - @get_or_produce (credential_filename, credential_produce, validate_credential) - def credential (self, hrn, type): pass + @get_or_produce(credential_filename, credential_produce, validate_credential) + def credential(self, hrn, type): pass - @get_or_produce (slice_credential_filename, slice_credential_produce, validate_credential) - def slice_credential (self, hrn): pass + @get_or_produce(slice_credential_filename, slice_credential_produce, validate_credential) + def slice_credential(self, hrn): pass - @get_or_produce (authority_credential_filename, authority_credential_produce, validate_credential) - def authority_credential (self, hrn): pass + @get_or_produce(authority_credential_filename, authority_credential_produce, validate_credential) + def authority_credential(self, hrn): pass - @get_or_produce (gid_filename, gid_produce) - def gid (self, hrn, type ): pass + @get_or_produce(gid_filename, gid_produce) + def gid(self, hrn, type ): pass # get the credentials as strings, for inserting as API arguments - def my_credential_string (self): + def my_credential_string(self): self.my_credential() return self.plain_read(self.my_credential_filename()) - def slice_credential_string (self, hrn): + def slice_credential_string(self, hrn): self.slice_credential(hrn) return self.plain_read(self.slice_credential_filename(hrn)) - def authority_credential_string (self, hrn): + def authority_credential_string(self, hrn): self.authority_credential(hrn) return self.plain_read(self.authority_credential_filename(hrn)) # for consistency - def private_key (self): + def private_key(self): self.assert_private_key() return self.private_key_filename() - def delegate_credential_string (self, original_credential, to_hrn, to_type='authority'): + def delegate_credential_string(self, original_credential, to_hrn, to_type='authority'): """ sign a delegation credential to someone else @@ -414,22 +416,23 @@ class SfaClientBootstrap: and uses Credential.delegate()""" # the gid and hrn of the object we are delegating - if isinstance (original_credential, str): - original_credential = Credential (string=original_credential) + if isinstance(original_credential, str): + original_credential = Credential(string=original_credential) original_gid = original_credential.get_gid_object() original_hrn = original_gid.get_hrn() if not original_credential.get_privileges().get_all_delegate(): - self.logger.error("delegate_credential_string: original credential %s does not have delegate bit set"%original_hrn) + self.logger.error("delegate_credential_string: original credential {} does not have delegate bit set" + .format(original_hrn)) return # the delegating user's gid my_gid = self.my_gid() # retrieve the GID for the entity that we're delegating to - to_gidfile = self.gid (to_hrn,to_type) -# to_gid = GID ( to_gidfile ) + to_gidfile = self.gid(to_hrn, to_type) +# to_gid = GID(to_gidfile ) # to_hrn = delegee_gid.get_hrn() -# print 'to_hrn',to_hrn +# print 'to_hrn', to_hrn delegated_credential = original_credential.delegate(to_gidfile, self.private_key(), my_gid) return delegated_credential.save_to_string(save_parents=True) -- 2.43.0