From fd26329a0ce5139c6e4938157fc54fafe431f3bc Mon Sep 17 00:00:00 2001 From: Thierry Parmentelat Date: Wed, 2 Nov 2011 00:48:01 +0100 Subject: [PATCH] another step of moving stuff around where it belongs util/ should be free of networking stuff --- sfa/client/sfascan.py | 4 +- sfa/client/sfi.py | 48 +-- sfa/{util => client}/soapprotocol.py | 2 +- sfa/{util => client}/xmlrpcprotocol.py | 2 +- sfa/managers/aggregate_manager.py | 2 +- sfa/managers/registry_manager.py | 4 +- sfa/managers/slice_manager.py | 21 +- sfa/plc/plcomponentdriver.py | 4 +- sfa/server/interface.py | 10 +- sfa/server/sfa-clean-peer-records.py | 4 +- sfa/server/sfa-start.py | 4 +- sfa/server/sfa_component_setup.py | 14 +- sfa/server/sfaapi.py | 6 +- sfa/{util => server}/threadmanager.py | 0 sfa/util/rspecHelper.py | 418 ------------------------- sfa/util/ssl_socket.py | 76 ----- tests/testInterfaces.py | 13 +- 17 files changed, 69 insertions(+), 563 deletions(-) rename sfa/{util => client}/soapprotocol.py (95%) rename sfa/{util => client}/xmlrpcprotocol.py (98%) rename sfa/{util => server}/threadmanager.py (100%) delete mode 100755 sfa/util/rspecHelper.py delete mode 100644 sfa/util/ssl_socket.py diff --git a/sfa/client/sfascan.py b/sfa/client/sfascan.py index 494a7275..71cc9ac9 100755 --- a/sfa/client/sfascan.py +++ b/sfa/client/sfascan.py @@ -11,7 +11,7 @@ from optparse import OptionParser from sfa.client.sfi import Sfi from sfa.util.sfalogging import logger, DEBUG -import sfa.util.xmlrpcprotocol as xmlrpcprotocol +import sfa.client.xmlrpcprotocol as xmlrpcprotocol def url_hostname_port (url): if url.find("://")<0: @@ -70,7 +70,7 @@ class Interface: url=self.url() logger.info('issuing get version at %s'%url) logger.debug("GetVersion, using timeout=%d"%options.timeout) - server=xmlrpcprotocol.get_server(url, key_file, cert_file, timeout=options.timeout, verbose=options.verbose) + server=xmlrpcprotocol.server_proxy(url, key_file, cert_file, timeout=options.timeout, verbose=options.verbose) self._version=server.GetVersion() except: self._version={} diff --git a/sfa/client/sfi.py b/sfa/client/sfi.py index 49392466..fc6a7b48 100755 --- a/sfa/client/sfi.py +++ b/sfa/client/sfi.py @@ -23,7 +23,7 @@ from sfa.util.record import SfaRecord, UserRecord, SliceRecord, NodeRecord, Auth from sfa.rspecs.rspec import RSpec from sfa.rspecs.rspec_converter import RSpecConverter from sfa.util.xrn import get_leaf, get_authority, hrn_to_urn -import sfa.util.xmlrpcprotocol as xmlrpcprotocol +import sfa.client.xmlrpcprotocol as xmlrpcprotocol from sfa.util.config import Config from sfa.util.version import version_core from sfa.util.cache import Cache @@ -398,9 +398,9 @@ class Sfi: self.cert_file = cert_file self.cert = GID(filename=cert_file) self.logger.info("Contacting Registry at: %s"%self.reg_url) - self.registry = xmlrpcprotocol.get_server(self.reg_url, key_file, cert_file, timeout=self.options.timeout, verbose=self.options.debug) + self.registry = xmlrpcprotocol.server_proxy(self.reg_url, key_file, cert_file, timeout=self.options.timeout, verbose=self.options.debug) self.logger.info("Contacting Slice Manager at: %s"%self.sm_url) - self.slicemgr = xmlrpcprotocol.get_server(self.sm_url, key_file, cert_file, timeout=self.options.timeout, verbose=self.options.debug) + self.slicemgr = xmlrpcprotocol.server_proxy(self.sm_url, key_file, cert_file, timeout=self.options.timeout, verbose=self.options.debug) return def get_cached_server_version(self, server): @@ -493,7 +493,7 @@ class Sfi: self.logger.info("Getting Registry issued cert") self.read_config() # *hack. need to set registyr before _get_gid() is called - self.registry = xmlrpcprotocol.get_server(self.reg_url, key_file, cert_file, timeout=self.options.timeout, verbose=self.options.debug) + self.registry = xmlrpcprotocol.server_proxy(self.reg_url, key_file, cert_file, timeout=self.options.timeout, verbose=self.options.debug) gid = self._get_gid(type='user') self.registry = None self.logger.info("Writing certificate to %s"%cert_file) @@ -646,7 +646,7 @@ class Sfi: return key_string # xxx opts undefined - def get_component_server_from_hrn(self, hrn): + def get_component_proxy_from_hrn(self, hrn): # direct connection to the nodes component manager interface user_cred = self.get_user_cred().save_to_string(save_parents=True) records = self.registry.Resolve(hrn, user_cred) @@ -655,9 +655,9 @@ class Sfi: self.logger.warning("No such component:%r"% opts.component) record = records[0] - return self.get_server(record['hostname'], CM_PORT, self.key_file, self.cert_file) + return self.server_proxy(record['hostname'], CM_PORT, self.key_file, self.cert_file) - def get_server(self, host, port, keyfile, certfile): + def server_proxy(self, host, port, keyfile, certfile): """ Return an instance of an xmlrpc server connection """ @@ -666,10 +666,10 @@ class Sfi: host_parts = host.split('/') host_parts[0] = host_parts[0] + ":" + str(port) url = "http://%s" % "/".join(host_parts) - return xmlrpcprotocol.get_server(url, keyfile, certfile, timeout=self.options.timeout, verbose=self.options.debug) + return xmlrpcprotocol.server_proxy(url, keyfile, certfile, timeout=self.options.timeout, verbose=self.options.debug) # xxx opts could be retrieved in self.options - def get_server_from_opts(self, opts): + def server_proxy_from_opts(self, opts): """ Return instance of an xmlrpc connection to a slice manager, aggregate or component server depending on the specified opts @@ -677,10 +677,10 @@ class Sfi: server = self.slicemgr # direct connection to an aggregate if hasattr(opts, 'aggregate') and opts.aggregate: - server = self.get_server(opts.aggregate, opts.port, self.key_file, self.cert_file) + server = self.server_proxy(opts.aggregate, opts.port, self.key_file, self.cert_file) # direct connection to the nodes component manager interface if hasattr(opts, 'component') and opts.component: - server = self.get_component_server_from_hrn(opts.component) + server = self.get_component_proxy_from_hrn(opts.component) return server #========================================================================== @@ -911,7 +911,7 @@ class Sfi: if opts.version_registry: server=self.registry else: - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) version=server.GetVersion() for (k,v) in version.iteritems(): print "%-20s: %s"%(k,v) @@ -928,7 +928,7 @@ class Sfi: if opts.delegate: delegated_cred = self.delegate_cred(user_cred, get_authority(self.authority)) creds.append(delegated_cred) - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) #results = server.ListSlices(creds, unique_call_id()) results = server.ListSlices(creds) display_list(results) @@ -939,7 +939,7 @@ class Sfi: user_cred = self.get_user_cred().save_to_string(save_parents=True) server = self.slicemgr call_options = {} - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) if args: cred = self.get_slice_cred(args[0]).save_to_string(save_parents=True) @@ -979,7 +979,7 @@ class Sfi: # created named slice with given rspec def create(self, opts, args): - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) server_version = self.get_cached_server_version(server) slice_hrn = args[0] slice_urn = hrn_to_urn(slice_hrn, 'slice') @@ -1044,7 +1044,7 @@ class Sfi: creds.append(delegated_cred) rspec_file = self.get_rspec_file(rspec_path) rspec = open(rspec_file).read() - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) ticket_string = server.GetTicket(slice_urn, creds, rspec, []) file = os.path.join(self.options.sfi_dir, get_leaf(slice_hrn) + ".ticket") self.logger.info("writing ticket to %s"%file) @@ -1075,7 +1075,7 @@ class Sfi: for hostname in hostnames: try: self.logger.info("Calling redeem_ticket at %(hostname)s " % locals()) - server = self.get_server(hostname, CM_PORT, self.key_file, \ + server = self.server_proxy(hostname, CM_PORT, self.key_file, \ self.cert_file, self.options.debug) server.RedeemTicket(ticket.save_to_string(save_parents=True), slice_cred) self.logger.info("Success") @@ -1094,7 +1094,7 @@ class Sfi: if opts.delegate: delegated_cred = self.delegate_cred(slice_cred, get_authority(self.authority)) creds.append(delegated_cred) - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) call_args = [slice_urn, creds] if self.server_supports_call_id_arg(server): @@ -1110,7 +1110,7 @@ class Sfi: if opts.delegate: delegated_cred = self.delegate_cred(slice_cred, get_authority(self.authority)) creds.append(delegated_cred) - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) return server.Start(slice_urn, creds) # stop named slice @@ -1122,14 +1122,14 @@ class Sfi: if opts.delegate: delegated_cred = self.delegate_cred(slice_cred, get_authority(self.authority)) creds.append(delegated_cred) - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) return server.Stop(slice_urn, creds) # reset named slice def reset(self, opts, args): slice_hrn = args[0] slice_urn = hrn_to_urn(slice_hrn, 'slice') - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) slice_cred = self.get_slice_cred(args[0]).save_to_string(save_parents=True) creds = [slice_cred] if opts.delegate: @@ -1140,7 +1140,7 @@ class Sfi: def renew(self, opts, args): slice_hrn = args[0] slice_urn = hrn_to_urn(slice_hrn, 'slice') - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) slice_cred = self.get_slice_cred(args[0]).save_to_string(save_parents=True) creds = [slice_cred] if opts.delegate: @@ -1162,7 +1162,7 @@ class Sfi: if opts.delegate: delegated_cred = self.delegate_cred(slice_cred, get_authority(self.authority)) creds.append(delegated_cred) - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) call_args = [slice_urn, creds] if self.server_supports_call_id_arg(server): call_args.append(unique_call_id()) @@ -1180,7 +1180,7 @@ class Sfi: if opts.delegate: delegated_cred = self.delegate_cred(slice_cred, get_authority(self.authority)) creds.append(delegated_cred) - server = self.get_server_from_opts(opts) + server = self.server_proxy_from_opts(opts) return server.Shutdown(slice_urn, creds) def print_help (self): diff --git a/sfa/util/soapprotocol.py b/sfa/client/soapprotocol.py similarity index 95% rename from sfa/util/soapprotocol.py rename to sfa/client/soapprotocol.py index de3ee965..e714cc42 100644 --- a/sfa/util/soapprotocol.py +++ b/sfa/client/soapprotocol.py @@ -23,7 +23,7 @@ class SFASoapBinding(Binding): return SFACallable(soap_callable) -def get_server(url, key_file, cert_file): +def server_proxy(url, key_file, cert_file): auth = { 'transport' : HTTPSConnection, 'transdict' : {'cert_file' : cert_file, diff --git a/sfa/util/xmlrpcprotocol.py b/sfa/client/xmlrpcprotocol.py similarity index 98% rename from sfa/util/xmlrpcprotocol.py rename to sfa/client/xmlrpcprotocol.py index 2263b286..bd741a42 100644 --- a/sfa/util/xmlrpcprotocol.py +++ b/sfa/client/xmlrpcprotocol.py @@ -87,7 +87,7 @@ class XMLRPCServerProxy(xmlrpclib.ServerProxy): logger.debug ("xml-rpc %s method:%s"%(self.url,attr)) return xmlrpclib.ServerProxy.__getattr__(self, attr) -def get_server(url, key_file, cert_file, timeout=None, verbose=False): +def server_proxy(url, key_file, cert_file, timeout=None, verbose=False): transport = XMLRPCTransport(key_file, cert_file, timeout) return XMLRPCServerProxy(url, transport, allow_none=True, verbose=verbose) diff --git a/sfa/managers/aggregate_manager.py b/sfa/managers/aggregate_manager.py index d57958b3..df97dfc8 100644 --- a/sfa/managers/aggregate_manager.py +++ b/sfa/managers/aggregate_manager.py @@ -335,7 +335,7 @@ def get_ticket(api, xrn, creds, rspec, users): # get the slice record credential = api.getCredential() interface = api.registries[api.hrn] - registry = api.get_server(interface, credential) + registry = api.server_proxy(interface, credential) records = registry.Resolve(xrn, credential) # make sure we get a local slice record diff --git a/sfa/managers/registry_manager.py b/sfa/managers/registry_manager.py index 903a8b82..5888b2b9 100644 --- a/sfa/managers/registry_manager.py +++ b/sfa/managers/registry_manager.py @@ -120,7 +120,7 @@ def resolve(api, xrns, type=None, full=True): if registry_hrn != api.hrn: credential = api.getCredential() interface = api.registries[registry_hrn] - server = api.get_server(interface, credential) + server = api.server_proxy(interface, credential) peer_records = server.Resolve(xrns, credential) records.extend([SfaRecord(dict=record).as_dict() for record in peer_records]) @@ -163,7 +163,7 @@ def list(api, xrn, origin_hrn=None): if registry_hrn != api.hrn: credential = api.getCredential() interface = api.registries[registry_hrn] - server = api.get_server(interface, credential) + server = api.server_proxy(interface, credential) record_list = server.List(xrn, credential) records = [SfaRecord(dict=record).as_dict() for record in record_list] diff --git a/sfa/managers/slice_manager.py b/sfa/managers/slice_manager.py index 29113138..685e67a7 100644 --- a/sfa/managers/slice_manager.py +++ b/sfa/managers/slice_manager.py @@ -9,10 +9,11 @@ from sfa.trust.credential import Credential from sfa.util.sfalogging import logger from sfa.util.xrn import Xrn, urn_to_hrn -from sfa.util.threadmanager import ThreadManager from sfa.util.version import version_core from sfa.util.callids import Callids +from sfa.server.threadmanager import ThreadManager + from sfa.rspecs.rspec_converter import RSpecConverter from sfa.rspecs.version_manager import VersionManager from sfa.rspecs.rspec import RSpec @@ -149,7 +150,7 @@ def ListResources(api, creds, options, call_id): # get the rspec from the aggregate interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) + server = api.server_proxy(interface, cred) threads.run(_ListResources, aggregate, server, [cred], options, call_id) @@ -231,7 +232,7 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): if caller_hrn == aggregate and aggregate != api.hrn: continue interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) + server = api.server_proxy(interface, cred) # Just send entire RSpec to each aggregate threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, call_id) @@ -273,7 +274,7 @@ def RenewSliver(api, xrn, creds, expiration_time, call_id): if caller_hrn == aggregate and aggregate != api.hrn: continue interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) + server = api.server_proxy(interface, cred) threads.run(_RenewSliver, server, xrn, [cred], expiration_time, call_id) # 'and' the results return reduce (lambda x,y: x and y, threads.get_results() , True) @@ -303,7 +304,7 @@ def DeleteSliver(api, xrn, creds, call_id): if caller_hrn == aggregate and aggregate != api.hrn: continue interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) + server = api.server_proxy(interface, cred) threads.run(_DeleteSliver, server, xrn, [cred], call_id) threads.get_results() return 1 @@ -326,7 +327,7 @@ def SliverStatus(api, slice_xrn, creds, call_id): threads = ThreadManager() for aggregate in api.aggregates: interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) + server = api.server_proxy(interface, cred) threads.run (_SliverStatus, server, slice_xrn, [cred], call_id) results = threads.get_results() @@ -385,7 +386,7 @@ def ListSlices(api, creds, call_id): if caller_hrn == aggregate and aggregate != api.hrn: continue interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) + server = api.server_proxy(interface, cred) threads.run(_ListSlices, server, [cred], call_id) # combime results @@ -427,7 +428,7 @@ def get_ticket(api, xrn, creds, rspec, users): continue interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) + server = api.server_proxy(interface, cred) threads.run(server.GetTicket, xrn, [cred], aggregate_rspec, users) results = threads.get_results() @@ -484,7 +485,7 @@ def start_slice(api, xrn, creds): if caller_hrn == aggregate and aggregate != api.hrn: continue interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) + server = api.server_proxy(interface, cred) threads.run(server.Start, xrn, cred) threads.get_results() return 1 @@ -507,7 +508,7 @@ def stop_slice(api, xrn, creds): if caller_hrn == aggregate and aggregate != api.hrn: continue interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) + server = api.server_proxy(interface, cred) threads.run(server.Stop, xrn, cred) threads.get_results() return 1 diff --git a/sfa/plc/plcomponentdriver.py b/sfa/plc/plcomponentdriver.py index 1ddaf27b..c991bd67 100644 --- a/sfa/plc/plcomponentdriver.py +++ b/sfa/plc/plcomponentdriver.py @@ -1,7 +1,7 @@ import os import tempfile -import sfa.util.xmlrpcprotocol as xmlrpcprotocol +import sfa.client.xmlrpcprotocol as xmlrpcprotocol from sfa.plc.nodemanager import NodeManager from sfa.trust.credential import Credential @@ -33,7 +33,7 @@ class PlComponentDriver: addr, port = self.config.SFA_REGISTRY_HOST, self.config.SFA_REGISTRY_PORT url = "http://%(addr)s:%(port)s" % locals() ### xxx this would require access to the api... - server = xmlrpcprotocol.get_server(url, self.key_file, self.cert_file) + server = xmlrpcprotocol.server_proxy(url, self.key_file, self.cert_file) return server def get_node_key(self): diff --git a/sfa/server/interface.py b/sfa/server/interface.py index 94302ecf..3866a452 100644 --- a/sfa/server/interface.py +++ b/sfa/server/interface.py @@ -1,5 +1,5 @@ #from sfa.util.faults import * -import sfa.util.xmlrpcprotocol as xmlrpcprotocol +import sfa.client.xmlrpcprotocol as xmlrpcprotocol from sfa.util.xml import XML # GeniLight client support is optional @@ -25,13 +25,13 @@ class Interface: url = "http://%s" % "/".join(address_parts) return url - def get_server(self, key_file, cert_file, timeout=30): + def server_proxy(self, key_file, cert_file, timeout=30): server = None if self.client_type == 'geniclientlight' and GeniClientLight: # xxx url and self.api are undefined server = GeniClientLight(url, self.api.key_file, self.api.cert_file) else: - server = xmlrpcprotocol.get_server(self.get_url(), key_file, cert_file, timeout) + server = xmlrpcprotocol.server_proxy(self.get_url(), key_file, cert_file, timeout) return server ## @@ -72,5 +72,5 @@ class Interfaces(dict): interface = Interface(hrn, address, port) self[hrn] = interface - def get_server(self, hrn, key_file, cert_file, timeout=30): - return self[hrn].get_server(key_file, cert_file, timeout) + def server_proxy(self, hrn, key_file, cert_file, timeout=30): + return self[hrn].server_proxy(key_file, cert_file, timeout) diff --git a/sfa/server/sfa-clean-peer-records.py b/sfa/server/sfa-clean-peer-records.py index 178ae6e5..0d99e98c 100644 --- a/sfa/server/sfa-clean-peer-records.py +++ b/sfa/server/sfa-clean-peer-records.py @@ -5,7 +5,7 @@ import os import traceback import socket -import sfa.util.xmlrpcprotocol as xmlrpcprotocol +import sfa.client.xmlrpcprotocol as xmlrpcprotocol from sfa.util.table import SfaTable from sfa.util.prefixTree import prefixTree from sfa.util.config import Config @@ -33,7 +33,7 @@ def main(): # and a valid credential authority = config.SFA_INTERFACE_HRN url = 'http://%s:%s/' %(config.SFA_REGISTRY_HOST, config.SFA_REGISTRY_PORT) - registry = xmlrpcprotocol.get_server(url, key_file, cert_file) + registry = xmlrpcprotocol.server_proxy(url, key_file, cert_file) sfa_api = Generic.the_flavour() credential = sfa_api.getCredential() diff --git a/sfa/server/sfa-start.py b/sfa/server/sfa-start.py index b13b6a7a..d4a3131d 100755 --- a/sfa/server/sfa-start.py +++ b/sfa/server/sfa-start.py @@ -39,7 +39,7 @@ from optparse import OptionParser from sfa.util.sfalogging import logger from sfa.util.xrn import get_authority, hrn_to_urn from sfa.util.config import Config -import sfa.util.xmlrpcprotocol as xmlrpcprotocol +import sfa.client.xmlrpcprotocol as xmlrpcprotocol from sfa.trust.certificate import Keypair, Certificate from sfa.trust.hierarchy import Hierarchy @@ -166,7 +166,7 @@ def install_peer_certs(server_key_file, server_cert_file): try: # get gid from the registry url = interfaces[new_hrn].get_url() - interface = interfaces[new_hrn].get_server(server_key_file, server_cert_file, timeout=30) + interface = interfaces[new_hrn].server_proxy(server_key_file, server_cert_file, timeout=30) # skip non sfa aggregates server_version = api.get_cached_server_version(interface) if 'sfa' not in server_version: diff --git a/sfa/server/sfa_component_setup.py b/sfa/server/sfa_component_setup.py index 072f3b6f..cf9fda42 100755 --- a/sfa/server/sfa_component_setup.py +++ b/sfa/server/sfa_component_setup.py @@ -6,7 +6,7 @@ from optparse import OptionParser from sfa.util.faults import ConnectionKeyGIDMismatch from sfa.util.config import Config -import sfa.util.xmlrpcprotocol as xmlrpcprotocol +import sfa.client.xmlrpcprotocol as xmlrpcprotocol from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn from sfa.trust.certificate import Keypair, Certificate @@ -28,7 +28,7 @@ def handle_gid_mismatch_exception(f): return wrapper -def get_server(url=None, port=None, keyfile=None, certfile=None,verbose=False): +def server_proxy(url=None, port=None, keyfile=None, certfile=None,verbose=False): """ returns an xmlrpc connection to the service a the specified address @@ -48,7 +48,7 @@ def get_server(url=None, port=None, keyfile=None, certfile=None,verbose=False): if verbose: print "Contacting registry at: %(url)s" % locals() - server = xmlrpcprotocol.get_server(url, keyfile, certfile) + server = xmlrpcprotocol.server_proxy(url, keyfile, certfile) return server @@ -97,7 +97,7 @@ def get_node_key(registry=None, verbose=False): cert.sign() cert.save_to_file(certfile) - registry = get_server(url = registry, keyfile=keyfile, certfile=certfile) + registry = server_proxy(url = registry, keyfile=keyfile, certfile=certfile) registry.get_key() def create_server_keypair(keyfile=None, certfile=None, hrn="component", verbose=False): @@ -145,7 +145,7 @@ def get_credential(registry=None, force=False, verbose=False): create_server_keypair(keyfile, certfile, hrn, verbose) # get credential from registry - registry = get_server(url=registry, keyfile=keyfile, certfile=certfile) + registry = server_proxy(url=registry, keyfile=keyfile, certfile=certfile) cert = Certificate(filename=certfile) cert_str = cert.save_to_string(save_parents=True) cred = registry.GetSelfCredential(cert_str, 'node', hrn) @@ -172,7 +172,7 @@ def get_trusted_certs(registry=None, verbose=False): cred = get_credential(registry=registry, verbose=verbose) # make sure server key cert pair exists create_server_keypair(keyfile=keyfile, certfile=certfile, hrn=hrn, verbose=verbose) - registry = get_server(url=registry, keyfile=keyfile, certfile=certfile) + registry = server_proxy(url=registry, keyfile=keyfile, certfile=certfile) # get the trusted certs and save them in the right place if verbose: print "Getting trusted certs from registry" @@ -217,7 +217,7 @@ def get_gids(registry=None, verbose=False): cred = get_credential(registry=registry, verbose=verbose) # make sure server key cert pair exists create_server_keypair(keyfile=keyfile, certfile=certfile, hrn=hrn, verbose=verbose) - registry = get_server(url=registry, keyfile=keyfile, certfile=certfile) + registry = server_proxy(url=registry, keyfile=keyfile, certfile=certfile) if verbose: print "Getting current slices on this node" diff --git a/sfa/server/sfaapi.py b/sfa/server/sfaapi.py index 9d22e7f2..fa3b4764 100644 --- a/sfa/server/sfaapi.py +++ b/sfa/server/sfaapi.py @@ -73,7 +73,7 @@ class SfaApi (XmlrpcApi): def get_interface_manager(self, manager_base = 'sfa.managers'): return self.manager - def get_server(self, interface, cred, timeout=30): + def server_proxy(self, interface, cred, timeout=30): """ Returns a connection to the specified interface. Use the specified credential to determine the caller and look for the caller's key/cert @@ -89,7 +89,7 @@ class SfaApi (XmlrpcApi): auth_info = hierarchy.get_auth_info(caller_gid.get_hrn()) key_file = auth_info.get_privkey_filename() cert_file = auth_info.get_gid_filename() - server = interface.get_server(key_file, cert_file, timeout) + server = interface.server_proxy(key_file, cert_file, timeout) return server @@ -142,7 +142,7 @@ class SfaApi (XmlrpcApi): """ from sfa.server.registry import Registries registries = Registries() - registry = registries.get_server(self.hrn, self.key_file, self.cert_file) + registry = registries.server_proxy(self.hrn, self.key_file, self.cert_file) cert_string=self.cert.save_to_string(save_parents=True) # get self credential self_cred = registry.GetSelfCredential(cert_string, self.hrn, 'authority') diff --git a/sfa/util/threadmanager.py b/sfa/server/threadmanager.py similarity index 100% rename from sfa/util/threadmanager.py rename to sfa/server/threadmanager.py diff --git a/sfa/util/rspecHelper.py b/sfa/util/rspecHelper.py deleted file mode 100755 index deaa746a..00000000 --- a/sfa/util/rspecHelper.py +++ /dev/null @@ -1,418 +0,0 @@ -#! /usr/bin/env python - -import sys - -from copy import deepcopy -from lxml import etree -from StringIO import StringIO -from optparse import OptionParser - -from sfa.util.faults import InvalidRSpec -from sfa.util.sfalogging import logger - -def merge_rspecs(rspecs): - """ - Merge merge a list of RSpecs into 1 RSpec, and return the result. - rspecs must be a valid RSpec string or list of RSpec strings. - """ - if not rspecs or not isinstance(rspecs, list): - return rspecs - - # ugly hack to avoid sending the same info twice, when the call graph has dags - known_networks={} - def register_network (network): - try: - known_networks[network.get('name')]=True - except: - logger.error("merge_rspecs: cannot register network with no name in rspec") - pass - def is_registered_network (network): - try: - return network.get('name') in known_networks - except: - logger.error("merge_rspecs: cannot retrieve network with no name in rspec") - return False - - # the resulting tree - rspec = None - for input_rspec in rspecs: - # ignore empty strings as returned with used call_ids - if not input_rspec: continue - try: - tree = etree.parse(StringIO(input_rspec)) - except etree.XMLSyntaxError: - # consider failing silently here - logger.log_exc("merge_rspecs, parse error") - message = str(sys.exc_info()[1]) + ' with ' + input_rspec - raise InvalidRSpec(message) - - root = tree.getroot() - if not root.get("type") in ["SFA"]: - logger.error("merge_rspecs: unexpected type for rspec root, %s"%root.get('type')) - continue - if rspec == None: - # we scan the first input, register all networks - # in addition we remove duplicates - needed until everyone runs 1.0-10 - rspec = root - for network in root.iterfind("./network"): - if not is_registered_network(network): - register_network(network) - else: - # duplicate in the first input - trash it - root.remove(network) - else: - for network in root.iterfind("./network"): - if not is_registered_network(network): - rspec.append(deepcopy(network)) - register_network(network) - for request in root.iterfind("./request"): - rspec.append(deepcopy(request)) - return etree.tostring(rspec, xml_declaration=True, pretty_print=True) - -class RSpec: - def __init__(self, xml): - parser = etree.XMLParser(remove_blank_text=True) - tree = etree.parse(StringIO(xml), parser) - self.rspec = tree.getroot() - - # If there is only one network in the rspec, make it the default - self.network = None - networks = self.get_network_list() - if len(networks) == 1: - self.network = networks[0] - - # Thierry : need this to locate hostname even if several networks - def get_node_element(self, hostname, network=None): - if network == None and self.network: - network = self.network - if network != None: - names = self.rspec.iterfind("./network[@name='%s']/site/node/hostname" % network) - else: - names = self.rspec.iterfind("./network/site/node/hostname") - for name in names: - if name.text == hostname: - return name.getparent() - return None - - # Thierry : need this to return all nodes in all networks - def get_node_list(self, network=None): - if network == None and self.network: - network = self.network - if network != None: - return self.rspec.xpath("./network[@name='%s']/site/node/hostname/text()" % network) - else: - return self.rspec.xpath("./network/site/node/hostname/text()") - - def get_network_list(self): - return self.rspec.xpath("./network[@name]/@name") - - def get_sliver_list(self, network=None): - if network == None: - network = self.network - result = self.rspec.xpath("./network[@name='%s']/site/node[sliver]/hostname/text()" % network) - return result - - def get_available_node_list(self, network=None): - if network == None: - network = self.network - result = self.rspec.xpath("./network[@name='%s']/site/node[not(sliver)]/hostname/text()" % network) - return result - - def add_sliver(self, hostname, network=None): - if network == None: - network = self.network - node = self.get_node_element(hostname, network) - etree.SubElement(node, "sliver") - - def remove_sliver(self, hostname, network=None): - if network == None: - network = self.network - node = self.get_node_element(hostname, network) - node.remove(node.find("sliver")) - - def attributes_list(self, elem): - opts = [] - if elem is not None: - for e in elem: - opts.append((e.tag, e.text)) - return opts - - def get_default_sliver_attributes(self, network=None): - if network == None: - network = self.network - defaults = self.rspec.find("./network[@name='%s']/sliver_defaults" % network) - return self.attributes_list(defaults) - - def get_sliver_attributes(self, hostname, network=None): - if network == None: - network = self.network - node = self.get_node_element(hostname, network) - sliver = node.find("sliver") - return self.attributes_list(sliver) - - def add_attribute(self, elem, name, value): - opt = etree.SubElement(elem, name) - opt.text = value - - def add_default_sliver_attribute(self, name, value, network=None): - if network == None: - network = self.network - defaults = self.rspec.find("./network[@name='%s']/sliver_defaults" % network) - if defaults is None: - defaults = etree.Element("sliver_defaults") - network = self.rspec.find("./network[@name='%s']" % network) - network.insert(0, defaults) - self.add_attribute(defaults, name, value) - - def add_sliver_attribute(self, hostname, name, value, network=None): - if network == None: - network = self.network - node = self.get_node_element(hostname, network) - sliver = node.find("sliver") - self.add_attribute(sliver, name, value) - - def remove_attribute(self, elem, name, value): - if elem is not None: - opts = elem.iterfind(name) - if opts is not None: - for opt in opts: - if opt.text == value: - elem.remove(opt) - - def remove_default_sliver_attribute(self, name, value, network=None): - if network == None: - network = self.network - defaults = self.rspec.find("./network[@name='%s']/sliver_defaults" % network) - self.remove_attribute(defaults, name, value) - - def remove_sliver_attribute(self, hostname, name, value, network=None): - if network == None: - network = self.network - node = self.get_node_element(hostname, network) - sliver = node.find("sliver") - self.remove_attribute(sliver, name, value) - - def get_site_nodes(self, siteid, network=None): - if network == None: - network = self.network - query = './network[@name="%s"]/site[@id="%s"]/node/hostname/text()' % (network, siteid) - result = self.rspec.xpath(query) - return result - - def get_link_list(self, network=None): - if network == None: - network = self.network - linklist = [] - links = self.rspec.iterfind("./network[@name='%s']/link" % network) - for link in links: - (end1, end2) = link.get("endpoints").split() - name = link.find("description") - linklist.append((name.text, - self.get_site_nodes(end1, network), - self.get_site_nodes(end2, network))) - return linklist - - def get_vlink_list(self, network=None): - if network == None: - network = self.network - vlinklist = [] - vlinks = self.rspec.iterfind("./network[@name='%s']//vlink" % network) - for vlink in vlinks: - endpoints = vlink.get("endpoints") - (end1, end2) = endpoints.split() - query = './network[@name="%s"]//node[@id="%s"]/hostname/text()' % network - node1 = self.rspec.xpath(query % end1)[0] - node2 = self.rspec.xpath(query % end2)[0] - desc = "%s <--> %s" % (node1, node2) - kbps = vlink.find("kbps") - vlinklist.append((endpoints, desc, kbps.text)) - return vlinklist - - def query_links(self, fromnode, tonode, network=None): - if network == None: - network = self.network - fromsite = fromnode.getparent() - tosite = tonode.getparent() - fromid = fromsite.get("id") - toid = tosite.get("id") - - query = "./network[@name='%s']/link[@endpoints = '%s %s']" % (network, fromid, toid) - results = self.rspec.xpath(query) - if results == None: - query = "./network[@name='%s']/link[@endpoints = '%s %s']" % (network, toid, fromid) - results = self.rspec.xpath(query) - return results - - def query_vlinks(self, endpoints, network=None): - if network == None: - network = self.network - query = "./network[@name='%s']//vlink[@endpoints = '%s']" % (network, endpoints) - results = self.rspec.xpath(query) - return results - - - def add_vlink(self, fromhost, tohost, kbps, network=None): - if network == None: - network = self.network - fromnode = self.get_node_element(fromhost, network) - tonode = self.get_node_element(tohost, network) - links = self.query_links(fromnode, tonode, network) - - for link in links: - vlink = etree.SubElement(link, "vlink") - fromid = fromnode.get("id") - toid = tonode.get("id") - vlink.set("endpoints", "%s %s" % (fromid, toid)) - self.add_attribute(vlink, "kbps", kbps) - - - def remove_vlink(self, endpoints, network=None): - if network == None: - network = self.network - vlinks = self.query_vlinks(endpoints, network) - for vlink in vlinks: - vlink.getparent().remove(vlink) - - def toxml(self): - return etree.tostring(self.rspec, pretty_print=True, - xml_declaration=True) - - def __str__(self): - return self.toxml() - - def save(self, filename): - f = open(filename, "w") - f.write(self.toxml()) - f.close() - - -class Commands: - def __init__(self, usage, description, epilog=None): - self.parser = OptionParser(usage=usage, description=description, - epilog=epilog) - self.parser.add_option("-i", "", dest="infile", metavar="FILE", - help="read RSpec from FILE (default is stdin)") - self.parser.add_option("-o", "", dest="outfile", metavar="FILE", - help="write output to FILE (default is stdout)") - self.nodefile = False - self.attributes = {} - - def add_nodefile_option(self): - self.nodefile = True - self.parser.add_option("-n", "", dest="nodefile", - metavar="FILE", - help="read node list from FILE"), - - def add_show_attributes_option(self): - self.parser.add_option("-s", "--show-attributes", action="store_true", - dest="showatt", default=False, - help="show sliver attributes") - - def add_attribute_options(self): - self.parser.add_option("", "--capabilities", action="append", - metavar="", - help="Vserver bcapabilities") - self.parser.add_option("", "--codemux", action="append", - metavar="", - help="Demux HTTP between slices using " + - "localhost ports") - self.parser.add_option("", "--cpu-pct", action="append", - metavar="", - help="Reserved CPU percent (e.g., 25)") - self.parser.add_option("", "--cpu-share", action="append", - metavar="", - help="Number of CPU shares (e.g., 5)") - self.parser.add_option("", "--delegations", - metavar="", action="append", - help="List of slices with delegation authority") - self.parser.add_option("", "--disk-max", - metavar="", action="append", - help="Disk quota (1k disk blocks)") - self.parser.add_option("", "--initscript", - metavar="", action="append", - help="Slice initialization script (e.g., stork)") - self.parser.add_option("", "--ip-addresses", action="append", - metavar="", - help="Add an IP address to a sliver") - self.parser.add_option("", "--net-i2-max-kbyte", - metavar="", action="append", - help="Maximum daily network Tx limit " + - "to I2 hosts.") - self.parser.add_option("", "--net-i2-max-rate", - metavar="", action="append", - help="Maximum bandwidth over I2 routes") - self.parser.add_option("", "--net-i2-min-rate", - metavar="", action="append", - help="Minimum bandwidth over I2 routes") - self.parser.add_option("", "--net-i2-share", - metavar="", action="append", - help="Number of bandwidth shares over I2 routes") - self.parser.add_option("", "--net-i2-thresh-kbyte", - metavar="", action="append", - help="Limit sent to I2 hosts before warning, " + - "throttling") - self.parser.add_option("", "--net-max-kbyte", - metavar="", action="append", - help="Maximum daily network Tx limit " + - "to non-I2 hosts.") - self.parser.add_option("", "--net-max-rate", - metavar="", action="append", - help="Maximum bandwidth over non-I2 routes") - self.parser.add_option("", "--net-min-rate", - metavar="", action="append", - help="Minimum bandwidth over non-I2 routes") - self.parser.add_option("", "--net-share", - metavar="", action="append", - help="Number of bandwidth shares over non-I2 " + - "routes") - self.parser.add_option("", "--net-thresh-kbyte", - metavar="", action="append", - help="Limit sent to non-I2 hosts before " + - "warning, throttling") - self.parser.add_option("", "--vsys", - metavar="", action="append", - help="Vsys script (e.g., fd_fusemount)") - self.parser.add_option("", "--vsys-vnet", - metavar="", action="append", - help="Allocate a virtual private network") - - def get_attribute_dict(self): - attrlist = ['capabilities','codemux','cpu_pct','cpu_share', - 'delegations','disk_max','initscript','ip_addresses', - 'net_i2_max_kbyte','net_i2_max_rate','net_i2_min_rate', - 'net_i2_share','net_i2_thresh_kbyte', - 'net_max_kbyte','net_max_rate','net_min_rate', - 'net_share','net_thresh_kbyte', - 'vsys','vsys_vnet'] - attrdict = {} - for attr in attrlist: - value = getattr(self.opts, attr, None) - if value is not None: - attrdict[attr] = value - return attrdict - - def prep(self): - (self.opts, self.args) = self.parser.parse_args() - - if self.opts.infile: - sys.stdin = open(self.opts.infile, "r") - xml = sys.stdin.read() - self.rspec = RSpec(xml) - - if self.nodefile: - if self.opts.nodefile: - f = open(self.opts.nodefile, "r") - self.nodes = f.read().split() - f.close() - else: - self.nodes = self.args - - if self.opts.outfile: - sys.stdout = open(self.opts.outfile, "w") - - - - - - - diff --git a/sfa/util/ssl_socket.py b/sfa/util/ssl_socket.py deleted file mode 100644 index d221da36..00000000 --- a/sfa/util/ssl_socket.py +++ /dev/null @@ -1,76 +0,0 @@ -from ssl import SSLSocket - -import textwrap - -import _ssl # if we can't import it, let the error propagate - -from _ssl import SSLError -from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED -from _ssl import PROTOCOL_SSLv2, PROTOCOL_SSLv3, PROTOCOL_SSLv23, PROTOCOL_TLSv1 -from _ssl import RAND_status, RAND_egd, RAND_add -from _ssl import \ - SSL_ERROR_ZERO_RETURN, \ - SSL_ERROR_WANT_READ, \ - SSL_ERROR_WANT_WRITE, \ - SSL_ERROR_WANT_X509_LOOKUP, \ - SSL_ERROR_SYSCALL, \ - SSL_ERROR_SSL, \ - SSL_ERROR_WANT_CONNECT, \ - SSL_ERROR_EOF, \ - SSL_ERROR_INVALID_ERROR_CODE - -from socket import socket, _fileobject -from socket import getnameinfo as _getnameinfo -import base64 # for DER-to-PEM translation - -class SSLSocket(SSLSocket, socket): - - """This class implements a subtype of socket.socket that wraps - the underlying OS socket in an SSL context when necessary, and - provides read and write methods over that channel.""" - - def __init__(self, sock, keyfile=None, certfile=None, - server_side=False, cert_reqs=CERT_NONE, - ssl_version=PROTOCOL_SSLv23, ca_certs=None, - do_handshake_on_connect=True, - suppress_ragged_eofs=True): - socket.__init__(self, _sock=sock._sock) - # the initializer for socket trashes the methods (tsk, tsk), so... - self.send = lambda data, flags=0: SSLSocket.send(self, data, flags) - self.sendto = lambda data, addr, flags=0: SSLSocket.sendto(self, data, addr, flags) - self.recv = lambda buflen=1024, flags=0: SSLSocket.recv(self, buflen, flags) - self.recvfrom = lambda addr, buflen=1024, flags=0: SSLSocket.recvfrom(self, addr, buflen, flags) - self.recv_into = lambda buffer, nbytes=None, flags=0: SSLSocket.recv_into(self, buffer, nbytes, flags) - self.recvfrom_into = lambda buffer, nbytes=None, flags=0: SSLSocket.recvfrom_into(self, buffer, nbytes, flags) - - if certfile and not keyfile: - keyfile = certfile - # see if it's connected - try: - socket.getpeername(self) - except: - # no, no connection yet - self._sslobj = None - else: - # yes, create the SSL object - self._sslobj = _ssl.sslwrap(self._sock, server_side, - keyfile, certfile, - cert_reqs, ssl_version, ca_certs) - if do_handshake_on_connect: - timeout = self.gettimeout() - try: - if timeout == 0: - self.settimeout(None) - self.do_handshake() - finally: - self.settimeout(timeout) - self.keyfile = keyfile - self.certfile = certfile - self.cert_reqs = cert_reqs - self.ssl_version = ssl_version - self.ca_certs = ca_certs - self.do_handshake_on_connect = do_handshake_on_connect - self.suppress_ragged_eofs = suppress_ragged_eofs - self._makefile_refs = 0 - - diff --git a/tests/testInterfaces.py b/tests/testInterfaces.py index 91606371..ce729367 100755 --- a/tests/testInterfaces.py +++ b/tests/testInterfaces.py @@ -4,10 +4,9 @@ import os import random import string import unittest -import sfa.util.xmlrpcprotocol as xmlrpc +import sfa.util.xmlrpcprotocol as xmlrpcprotocol from unittest import TestCase from optparse import OptionParser -from sfa.util.xmlrpcprotocol import ServerException from sfa.util.xrn import get_authority from sfa.util.config import * from sfa.trust.certificate import * @@ -44,10 +43,10 @@ class Client: self.cert.save_to_file(cert_file) SFI_AGGREGATE = config.SFI_SM.replace('12347', '12346') SFI_CM = 'http://' + options.cm_host + ':12346' - self.registry = xmlrpc.get_server(config.SFI_REGISTRY, key_file, cert_file) - self.aggregate = xmlrpc.get_server(SFI_AGGREGATE, key_file, cert_file) - self.sm = xmlrpc.get_server(config.SFI_SM, key_file, cert_file) - self.cm = xmlrpc.get_server(SFI_CM, key_file, cert_file) + self.registry = xmlrpcprotocol.server_proxy(config.SFI_REGISTRY, key_file, cert_file) + self.aggregate = xmlrpcprotocol.server_proxy(SFI_AGGREGATE, key_file, cert_file) + self.sm = xmlrpcprotocol.server_proxy(config.SFI_SM, key_file, cert_file) + self.cm = xmlrpcprotocol.server_proxy(SFI_CM, key_file, cert_file) self.hrn = config.SFI_USER # XX defaulting to user, but this should be configurable so we can # test from components persepctive @@ -171,7 +170,7 @@ class RegistryTest(BasicTestCase): server_exception = False try: callable(self.credential) - except ServerException: + except xmlrpcprotocol.ServerException: server_exception = True finally: if self.type in ['user'] and not server_exception: -- 2.43.0