X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=tests%2FtestInterfaces.py;h=520d94ad4e61a0f4dbed19b5a08004483b830390;hb=4a9e6751f9f396f463932133b9d62fc925a99ef6;hp=a51799b858a07c1820e0878e6b3c64f5b6e915dd;hpb=935de64c2ed3a68566da471c68447a1ac384455d;p=sfa.git diff --git a/tests/testInterfaces.py b/tests/testInterfaces.py index a51799b8..520d94ad 100755 --- a/tests/testInterfaces.py +++ b/tests/testInterfaces.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python3 import sys import os import random @@ -26,11 +26,11 @@ class Client: key = None cert = None credential = None - type = None + type = None def __init__(self, options): try: self.config = config = Config(options.config_file) except: - print "failed to read config_file %s" % options.config_file + print("failed to read config_file %s" % options.config_file) sys.exit(1) key_path = os.path.dirname(options.config_file) user_name = self.config.SFI_USER.split('.')[-1:][0] @@ -41,7 +41,7 @@ class Client: self.cert.set_pubkey(self.key) self.cert.set_issuer(self.key, self.config.SFI_USER) self.cert.sign() - self.cert.save_to_file(cert_file) + self.cert.save_to_file(cert_file) SFI_AGGREGATE = config.SFI_SM.replace('12347', '12346') SFI_CM = 'http://' + options.cm_host + ':12346' self.registry = SfaServerProxy(config.SFI_REGISTRY, key_file, cert_file) @@ -53,9 +53,9 @@ class Client: # test from components persepctive self.type = 'user' self.credential = self.GetCredential(self.hrn) - + def GetCredential(self, hrn = None, type = 'user'): - if not hrn: hrn = self.hrn + if not hrn: hrn = self.hrn if hrn == self.hrn: cert = self.cert.save_to_string(save_parents=True) request_hash = self.key.compute_hash([cert, 'user', hrn]) @@ -64,14 +64,14 @@ class Client: else: if not self.credential: self.credential = self.GetCredential(self.hrn, 'user') - return self.registry.GetCredential(self.credential, type, hrn) + return self.registry.GetCredential(self.credential, type, hrn) class BasicTestCase(unittest.TestCase): def __init__(self, testname, client, test_slice=None): unittest.TestCase.__init__(self, testname) self.client = client self.slice = test_slice - + def setUp(self): self.registry = self.client.registry self.aggregate = self.client.aggregate @@ -79,8 +79,8 @@ class BasicTestCase(unittest.TestCase): self.cm = self.client.cm self.credential = self.client.credential self.hrn = self.client.hrn - self.type = self.client.type - + self.type = self.client.type + # Registry tests class RegistryTest(BasicTestCase): @@ -122,22 +122,22 @@ class RegistryTest(BasicTestCase): try: self.registry.Remove(auth_cred, record['type'], record['hrn']) except: pass - + def testRegisterPeerObject(self): assert True - + def testUpdate(self): authority = get_authority(self.hrn) auth_cred = self.client.GetCredential(authority, 'authority') records = self.registry.Resolve(self.credential, self.hrn) if not records: assert False record = records[0] - self.registry.update(auth_cred, record) + self.registry.update(auth_cred, record) def testResolve(self): authority = get_authority(self.hrn) self.registry.Resolve(self.credential, self.hrn) - + def testRemove(self): authority = get_authority(self.hrn) auth_cred = self.client.GetCredential(authority, 'authority') @@ -149,26 +149,26 @@ class RegistryTest(BasicTestCase): try: self.registry.Resolve(self.credential, record['hrn']) assert False - except: + except: assert True - + def testRemovePeerObject(self): assert True def testList(self): authority = get_authority(self.client.hrn) self.registry.List(self.credential, authority) - + def testGetRegistries(self): self.registry.get_registries(self.credential) - + def testGetAggregates(self): self.registry.get_aggregates(self.credential) def testGetTrustedCerts(self): # this should fail unless we are a node callable = self.registry.get_trusted_certs - server_exception = False + server_exception = False try: callable(self.credential) except ServerException: @@ -176,12 +176,12 @@ class RegistryTest(BasicTestCase): finally: if self.type in ['user'] and not server_exception: assert False - + class AggregateTest(BasicTestCase): def setUp(self): BasicTestCase.setUp(self) - + def testGetSlices(self): self.aggregate.ListSlices(self.credential) @@ -195,7 +195,7 @@ class AggregateTest(BasicTestCase): RSpec(xml=slice_rspec) def testCreateSlice(self): - # get availabel resources + # get availabel resources rspec = self.aggregate.get_resources(self.credential) slice_credential = self.client.GetCredential(self.slice['hrn'], 'slice') self.aggregate.CreateSliver(slice_credential, self.slice['hrn'], rspec) @@ -209,43 +209,7 @@ class AggregateTest(BasicTestCase): rspec = self.aggregate.get_resources(self.credential) ticket = self.aggregate.GetTicket(slice_credential, self.slice['hrn'], rspec) # will raise an exception if the ticket inst valid - SfaTicket(string=ticket) - -class SlicemgrTest(AggregateTest): - def setUp(self): - AggregateTest.setUp(self) - - # force calls to go through slice manager - self.aggregate = self.sm - - # get the slice credential - - -class ComponentTest(BasicTestCase): - def setUp(self): - BasicTestCase.setUp(self) - self.slice_cred = self.client.GetCredential(self.slice['hrn'], 'slice') - - def testStartSlice(self): - self.cm.start_slice(self.slice_cred, self.slice['hrn']) - - def testStopSlice(self): - self.cm.stop_slice(self.slice_cred, self.slice['hrn']) - - def testDeleteSlice(self): - self.cm.DeleteSliver(self.slice_cred, self.slice['hrn'],"call-id-delete-slice-cm") - - def testRestartSlice(self): - self.cm.restart_slice(self.slice_cred, self.slice['hrn']) - - def testGetSlices(self): - self.cm.ListSlices(self.slice_cred, self.slice['hrn']) - - def testRedeemTicket(self): - rspec = self.aggregate.get_resources(self.credential) - ticket = self.aggregate.GetTicket(slice_cred, self.slice['hrn'], rspec) - self.cm.redeem_ticket(slice_cred, ticket) - + SfaTicket(string=ticket) def test_names(testcase): return [name for name in dir(testcase) if name.startswith('test')] @@ -258,13 +222,13 @@ def CreateSliver(client): 'type': 'slice', 'researcher': [client.hrn]} client.registry.Register(auth_cred, slice_record) return slice_record - + def DeleteSliver(client, slice): authority = get_authority(client.hrn) auth_cred = client.GetCredential(authority, 'authority') if slice: client.registry.Remove(auth_cred, 'slice', slice['hrn']) - + if __name__ == '__main__': args = sys.argv @@ -278,42 +242,35 @@ if __name__ == '__main__': default=False, help='run registry tests') parser.add_option('-a', '--aggregate', dest='aggregate', action='store_true', default=False, help='run aggregate tests') - parser.add_option('-s', '--slicemgr', dest='slicemgr', action='store_true', - default=False, help='run slicemgr tests') parser.add_option('-c', '--component', dest='component', action='store_true', default=False, help='run component tests') - parser.add_option('-d', '--cm_host', dest='cm_host', default=default_cm, + parser.add_option('-d', '--cm_host', dest='cm_host', default=default_cm, help='dns name of component to test. default is %s' % default_cm) parser.add_option('-A', '--all', dest='all', action='store_true', default=False, help='run component tests') - + options, args = parser.parse_args() suite = unittest.TestSuite() client = Client(options) test_slice = {} - + # create the test slice if necessary - if options.all or options.slicemgr or options.aggregate \ - or options.component: + if options.all or options.aggregate or options.component: test_slice = CreateSliver(client) if options.registry or options.all: for name in test_names(RegistryTest): suite.addTest(RegistryTest(name, client)) - if options.aggregate or options.all: + if options.aggregate or options.all: for name in test_names(AggregateTest): suite.addTest(AggregateTest(name, client, test_slice)) - if options.slicemgr or options.all: - for name in test_names(SlicemgrTest): - suite.addTest(SlicemgrTest(name, client, test_slice)) - - if options.component or options.all: + if options.component or options.all: for name in test_names(ComponentTest): suite.addTest(ComponentTest(name, client, test_slice)) - - # run tests + + # run tests unittest.TextTestRunner(verbosity=2).run(suite) # remove teset slice