From: Tony Mack Date: Tue, 29 Jan 2008 22:04:08 +0000 (+0000) Subject: fix a lot of errors. use plcsh instead of xmlrpc X-Git-Tag: 2008-02-11-last-vmware-support~57 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=b3366690d013911dc2ded26a96b511a2fba5df68;p=tests.git fix a lot of errors. use plcsh instead of xmlrpc --- diff --git a/qaapi/qa/tests/api_unit_test.py b/qaapi/qa/tests/api_unit_test.py index e331e15..b7878af 100755 --- a/qaapi/qa/tests/api_unit_test.py +++ b/qaapi/qa/tests/api_unit_test.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env /usr/share/plc_api/plcsh # # Test script example # @@ -24,18 +24,20 @@ from random import Random random = Random() config = Config() -auth = config.auth -try: boot_states = config.api.GetBootStates(auth) +try: boot_states = GetBootStates() except: boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins'] -try: roles = [role['name'] for role in config.api.GetRoles(auth)] +try: roles = [role['name'] for role in GetRoles()] except: roles = [u'admin', u'pi', u'user', u'tech'] -try: methods = config.api.GetNetworkMethods(auth) +try: methods = GetNetworkMethods() except: methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown'] -try:types = config.api.GetNetworkTypes(auth) +try: key_types = GetKeyTypes() +except: key_types = [u'ssh'] + +try:types = GetNetworkTypes() except: types = [u'ipv4'] def randfloat(min = 0.0, max = 1.0): @@ -256,9 +258,7 @@ class api_unit_test(Test): slices = 4, initscripts = 4, ): - self.api = self.config.api - self.auth = self.config.auth - self.all_methods = set(self.api.system.listMethods()) + self.all_methods = set(system.listMethods()) self.methods_tested = set() self.methods_failed = set() @@ -269,7 +269,7 @@ class api_unit_test(Test): self.site_ids = self.Sites(sites) #self.peer_ids = self.Peers(peers) self.address_type_ids = self.AddressTypes(address_types) - self.addresse_ids = self.Addresses(addresses) + self.address_ids = self.Addresses(addresses) #self.conf_files = self.ConfFiles(conf_files) #self.network_method_ids = self.NetworkMethods() #self.network_type_ids = self.NetworkTypes() @@ -282,15 +282,14 @@ class api_unit_test(Test): #self.pcus_ids = self.PCUs(pcus) #self.pcu_types_ids = self.PCUTypes(pcu_types) #self.role_ids = self.Roles(roles) - self.person_ids = self.Persons(persons) - self.key_ids = self.Keys(keys) #self.key_types = self.KeyTypes(key_types) #self.slice_attribute_type_ids = self.SliceAttributeTypes(slice_attribute_types) #self.slice_instantiation_ids = self.SliceInstantiations(slice_instantiations) - #self.slice_ids = self.Slices(slices) + self.slice_ids = self.Slices(slices) #self.slice_attribute_ids = self.SliceAttributes(slice_attributes) #self.initscript_ids = self.InitScripts(initscripts) - + self.person_ids = self.Persons(persons) + self.key_ids = self.Keys(keys) # self.message_ids = self.Messages(messages) #self.NotifPersons() # Test GetEventObject only @@ -340,7 +339,7 @@ class api_unit_test(Test): def debug(self, method, method_name=None): if method_name is None: - method_name = method._Method__name + method_name = method.name self.methods_tested.update([method_name]) def wrapper(*args, **kwds): @@ -367,7 +366,7 @@ class api_unit_test(Test): if hasattr(self, 'pcu_ids'): self.DeletePCUs() if hasattr(self, 'pcu_protocol_type_ids'): self.DeleteProtocolTypes() if hasattr(self, 'node_network_setting_ids'): self.DeleteNodeNetworkSettings() - if hasattr(self, 'attress_ids'): self.DeleteAddresses() + if hasattr(self, 'address_ids'): self.DeleteAddresses() if hasattr(self, 'attress_type_ids'): self.DeleteAddressTypes() if hasattr(self, 'node_ids'): self.DeleteNodes() if hasattr(self, 'site_ids'): self.DeleteSites() @@ -378,31 +377,31 @@ class api_unit_test(Test): for i in range(n): # Add Site site_fields = random_site() - AddSite = self.debug(self.api.AddSite) - site_id = AddSite(self.auth, site_fields) + AddSite = self.debug(shell.AddSite) + site_id = AddSite(site_fields) if site_id is None: continue # Should return a unique id self.isunique(site_id, site_ids, 'AddSite - isunique') site_ids.append(site_id) - GetSites = self.debug(self.api.GetSites) - sites = GetSites(self.auth, [site_id]) + GetSites = self.debug(shell.GetSites) + sites = GetSites([site_id]) if sites is None: continue site = sites[0] self.isequal(site, site_fields, 'AddSite - isequal') # Update site site_fields = random_site() - UpdateSite = self.debug(self.api.UpdateSite) - result = UpdateSite(self.auth, site_id, site_fields) + UpdateSite = self.debug(shell.UpdateSite) + result = UpdateSite(site_id, site_fields) # Check again - sites = GetSites(self.auth, [site_id]) + sites = GetSites([site_id]) if sites is None: continue site = sites[0] self.isequal(site, site_fields, 'UpdateSite - isequal') - sites = GetSites(self.auth, site_ids) + sites = GetSites(site_ids) if sites is not None: self.islistequal(site_ids, [site['site_id'] for site in sites], 'GetSites - isequal') @@ -414,13 +413,13 @@ class api_unit_test(Test): def DeleteSites(self): # Delete all sites - DeleteSite = self.debug(self.api.DeleteSite) + DeleteSite = self.debug(shell.DeleteSite) for site_id in self.site_ids: - result = DeleteSite(self.auth, site_id) + result = DeleteSite(site_id) # Check if sites are deleted - GetSites = self.debug(self.api.GetSites) - sites = GetSites(self.auth, self.site_ids) + GetSites = self.debug(shell.GetSites) + sites = GetSites(self.site_ids) self.islistequal(sites, [], 'DeleteSite - check') if self.config.verbose: @@ -434,8 +433,8 @@ class api_unit_test(Test): # Add Node node_fields = random_node() site_id = random.sample(self.site_ids, 1)[0] - AddNode = self.debug(self.api.AddNode) - node_id = AddNode(self.auth, site_id, node_fields) + AddNode = self.debug(shell.AddNode) + node_id = AddNode(site_id, node_fields) if node_id is None: continue # Should return a unique id @@ -443,24 +442,24 @@ class api_unit_test(Test): node_ids.append(node_id) # Check nodes - GetNodes = self.debug(self.api.GetNodes) - nodes = GetNodes(self.auth, [node_id]) + GetNodes = self.debug(shell.GetNodes) + nodes = GetNodes([node_id]) if nodes is None: continue node = nodes[0] self.isequal(node, node_fields, 'AddNode - isequal') # Update node node_fields = random_node() - UpdateNode = self.debug(self.api.UpdateNode) - result = UpdateNode(self.auth, node_id, node_fields) + UpdateNode = self.debug(shell.UpdateNode) + result = UpdateNode(node_id, node_fields) # Check again - nodes = GetNodes(self.auth, [node_id]) + nodes = GetNodes([node_id]) if nodes is None: continue node = nodes[0] self.isequal(node, node_fields, 'UpdateNode - isequal') - nodes = GetNodes(self.auth, node_ids) + nodes = GetNodes(node_ids) if nodes is not None: self.islistequal(node_ids, [node['node_id'] for node in nodes], 'GetNodes - isequal') @@ -470,13 +469,13 @@ class api_unit_test(Test): return node_ids def DeleteNodes(self): - DeleteNode = self.debug(self.api.DeleteNode) + DeleteNode = self.debug(shell.DeleteNode) for node_id in self.node_ids: - result = DeleteNode(self.auth, node_id) + result = DeleteNode(node_id) # Check if nodes are deleted - GetNodes = self.debug(self.api.GetNodes) - nodes = GetNodes(self.api, self.node_ids) + GetNodes = self.debug(shell.GetNodes) + nodes = GetNodes(self.node_ids) self.islistequal(nodes, [], 'DeleteNode Check') if self.config.verbose: @@ -488,8 +487,8 @@ class api_unit_test(Test): address_type_ids = [] for i in range(n): address_type_fields = random_address_type() - AddAddressType = self.debug(self.api.AddAddressType) - address_type_id = AddAddressType(self.auth, address_type_fields) + AddAddressType = self.debug(shell.AddAddressType) + address_type_id = AddAddressType(address_type_fields) if address_type_id is None: continue # Should return a unique address_type_id @@ -497,42 +496,42 @@ class api_unit_test(Test): address_type_ids.append(address_type_id) # Check address type - GetAddressTypes = self.debug(self.api.GetAddressTypes) - address_types = GetAddressTypes(self.auth, [address_type_id]) + GetAddressTypes = self.debug(shell.GetAddressTypes) + address_types = GetAddressTypes([address_type_id]) if address_types is None: continue address_type = address_types[0] self.isequal(address_type, address_type_fields, 'AddAddressType - isequal') # Update address type address_type_fields = random_address_type() - UpdateAddressType = self.debug(self.api.UpdateAddressType) - result = UpdateAddressType(self.auth, address_type_id, address_type_fields) + UpdateAddressType = self.debug(shell.UpdateAddressType) + result = UpdateAddressType(address_type_id, address_type_fields) if result is None: continue # Check address type again - address_types = GetAddressTypes(self.auth, [address_type_id]) + address_types = GetAddressTypes([address_type_id]) if address_types is None: continue address_type = address_types[0] self.isequal(address_type, address_type_fields, 'UpdateAddressType - isequal') # Check get all address types - address_types = GetAddressTypes(self.auth, address_type_ids) + address_types = GetAddressTypes(address_type_ids) if address_types is not None: self.islistequal(address_type_ids, [address_type['address_type_id'] for address_type in address_types], 'GetAddressTypes - isequal') if self.config.verbose: - print "Added address types", address_type_ids + utils.header("Added address types: %s " % address_type_ids) return address_type_ids def DeleteAddressTypes(self): - DeleteAddressType = self.debug(self.api.DeleteAddressType) + DeleteAddressType = self.debug(shell.DeleteAddressType) for address_type_id in self.address_type_ids: - DeleteAddressType(auth, address_type_id) + DeleteAddressType(ddress_type_id) - GetAddressTypes = self.debug(self.api.GetAddressTypes) - address_types = GetAddressTypes(self.auth, self.address_type_ids) + GetAddressTypes = self.debug(shell.GetAddressTypes) + address_types = GetAddressTypes(self.address_type_ids) self.islistequal(address_types, [], 'DeleteAddressType - check') if self.config.verbose: @@ -545,8 +544,8 @@ class api_unit_test(Test): for i in range(n): address_fields = random_address() site_id = random.sample(self.site_ids, 1)[0] - AddSiteAddress = self.debug(self.api.AddSiteAddress) - address_id = AddSiteAddress(self.auth, site_id, address_fields) + AddSiteAddress = self.debug(shell.AddSiteAddress) + address_id = AddSiteAddress(site_id, address_fields) if address_id is None: continue # Should return a unique address_id @@ -554,170 +553,220 @@ class api_unit_test(Test): address_ids.append(address_id) # Check address - GetAddresses = self.debug(self.api.GetAddresses) - addresses = GetAddresses(self.auth, [address_id]) + GetAddresses = self.debug(shell.GetAddresses) + addresses = GetAddresses([address_id]) if addresses is None: continue address = addresses[0] self.isequal(address, address_fields, 'AddSiteAddress - isequal') # Update address address_fields = random_address() - UpdateAddress = self.debug(self.api.UpdateAddress) - result = UpdateAddress(self.auth, address_id, address_fields) + UpdateAddress = self.debug(shell.UpdateAddress) + result = UpdateAddress(address_id, address_fields) # Check again - addresses = GetAddresses(self.auth, [address_id]) + addresses = GetAddresses([address_id]) if addresses is None: continue address = addresses[0] self.isequal(address, address_fields, 'UpdateAddress - isequal') - addresses = GetAddress(self.auth, address_ids) + addresses = GetAddresses(address_ids) if addresses is not None: - slef.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal') + self.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal') if self.config.verbose: - utils.header("Added addresses: %s" % self.address_ids) + utils.header("Added addresses: %s" % address_ids) return address_ids def DeleteAddresses(self): - DeleteAddress = self.debug(self.api.DeleteAddress) + DeleteAddress = self.debug(shell.DeleteAddress) # Delete site addresses for address_id in self.address_ids: - result = DeleteAddress(self.auth, address_id) + result = DeleteAddress(address_id) # Check - GetAddresses = self.debug(self.api.GetAddresses) - addresses = GetAddresses(self.api, self.address_ids) + GetAddresses = self.debug(shell.GetAddresses) + addresses = GetAddresses(self.address_ids) self.islistequal(addresses, [], 'DeleteAddress - check') - if self.verbose: - print "Deleted addresses", self.address_ids + if self.config.verbose: + utils.header("Deleted addresses: %s" % self.address_ids) self.address_ids = [] - def AddPersons(self, n = 3): + def Slices(self, n = 3): + slice_ids = [] + for i in range(n): + # Add Site + slice_fields = random_slice() + AddSlice = self.debug(shell.Slice) + slice_id = AddSlice(slice_fields) + if slice_id is None: continue + + # Should return a unique id + self.isunique(slice_id, slice_ids, 'AddSlicel - isunique') + slice_ids.append(slice_id) + GetSlices = self.debug(shell.GetSlices) + slices = GetSlices([slice_id]) + if slices is None: continue + slice = slices[0] + self.isequal(slice, slice_fields, 'AddSlice - isequal') + + # Update slice + slice_fields = random_slice() + UpdateSite = self.debug(shell.UpdateSlice) + result = UpdateSlice(slice_id, slice_fields) + + # Check again + slices = GetSites([slice_id]) + if slices is None: continue + slice = slices[0] + self.isequal(slice, slice_fields, 'UpdateSlice - isequal') + + # XX Add attribute + + # XX Add node + + slices = GetSites(slice_ids) + if slices is not None: + self.islistequal(slice_ids, [slice['slice_id'] for slice in slices], 'GetSlices - isequal') - roles = GetRoles() - role_ids = [role['role_id'] for role in roles] - roles = [role['name'] for role in roles] - roles = dict(zip(roles, role_ids)) + if self.config.verbose: + utils.header("Added slices: %s" % slice_ids) - for i in range(n): + return slice_ids - # Add account - person_fields = random_person() - person_id = AddPerson(person_fields) + def DeleteSlices(self): + + # XX manually delete attributes for first slice + slices = GetSlices(self.slice_ids, ['slice_attribute_ids', 'node_ids']) - # Should return a unique person_id - assert person_id not in self.person_ids - self.person_ids.append(person_id) - - if self.check: - # Check account - person = GetPersons([person_id])[0] - for field in person_fields: - if field != 'password': - assert person[field] == person_fields[field] - - # Update account - person_fields = random_person() - UpdatePerson(person_id, person_fields) - - # Check account again - person = GetPersons([person_id])[0] - for field in person_fields: - if field != 'password': - assert person[field] == person_fields[field] - - auth = {'AuthMethod': "password", - 'Username': person_fields['email'], - 'AuthString': person_fields['password']} - - if self.check: - # Check that account is disabled - try: - assert not AuthCheck(auth) - except: - pass - - # Add random set of roles - person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3)) - for person_role in person_roles: - role_id = roles[person_role] - AddRoleToPerson(role_id, person_id) - - if self.check: - person = GetPersons([person_id])[0] - assert set(person_roles) == set(person['roles']) - - # Enable account - UpdatePerson(person_id, {'enabled': True}) - - if self.check: - # Check that account is enabled - assert AuthCheck(auth) - - # Associate account with random set of sites - person_site_ids = [] - for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))): - AddPersonToSite(person_id, site_id) - person_site_ids.append(site_id) - - if self.check: - # Make sure it really did it - person = GetPersons([person_id])[0] - assert set(person_site_ids) == set(person['site_ids']) - - # Set a primary site - primary_site_id = random.sample(person_site_ids, randint(1, len(person_site_ids)))[0] - SetPersonPrimarySite(person_id, primary_site_id) - - if self.check: - person = GetPersons([person_id])[0] - assert person['site_ids'][0] == primary_site_id + + DeleteSlice = self.debug(shell.DeleteSlice) + # Have DeleteSlice automatically delete attriubtes for the rest + for slice_id in self.slice_ids: + # Delete account + DeleteSlice(slice_id) + + # Check if persons are deleted + GetSlices = self.debug(shell.GetSlices) + slices = GetSlices(self.slice_ids) + self.islistequal(slices, [], 'DeleteSlice - check') if self.verbose: - print "Added users", self.person_ids + utils.header("Deleted slices: %s" % self.slice_ids) - def DeletePersons(self): - # Delete users - for person_id in self.person_ids: - # Remove from each site - for site_id in self.site_ids: - DeletePersonFromSite(person_id, site_id) + self.slice_ids = [] - if self.check: - person = GetPersons([person_id])[0] - assert not person['site_ids'] - # Revoke roles - person = GetPersons([person_id])[0] - for role_id in person['role_ids']: - DeleteRoleFromPerson(role_id, person_id) + def Persons(self, n = 3): - if self.check: - person = GetPersons([person_id])[0] - assert not person['role_ids'] + person_ids = [] + for i in range(n): - # Disable account - UpdatePerson(person_id, {'enabled': False}) + # Add account + person_fields = random_person() + AddPerson = self.debug(shell.AddPerson) + person_id = AddPerson(person_fields) + if person_id is None: continue + + # Should return a unique person_id + self.isunique(person_id, person_ids, 'AddPerson - isunique') + person_ids.append(person_id) + GetPersons = self.debug(shell.GetPersons) + persons = GetPersons([person_id]) + if persons is None: continue + person = persons[0] + self.isequal(person, person_fields, 'AddPerson - isequal') + + # Update account + person_fields = random_person() + person_fields['enabled'] = True + UpdatePerson = self.debug(shell.UpdatePerson) + result = UpdatePerson(person_id, person_fields) + + # Add random role + AddRoleToPerson = self.debug(shell.AddRoleToPerson) + role = random.sample(roles, 1)[0] + result = AddRoleToPerson(role, person_id) + + # Add key to person + key = random_key() + key_id = AddPersonKey = self.debug(shell.AddPersonKey) + AddPersonKey(person_id, key) + + # Add person to site + site_id = random.sample(self.site_ids, 1)[0] + AddPersonToSite = self.debug(shell.AddPersonToSite) + AddPersonToSite(person_id, site_id) + + # Add person to slice + slice_id = random.sample(self.slice_ids, 1)[0] + AddPersonToSlice = self.debug(self.AddPersonToSlice) + AddPersonToSlice(person_id, slice_id) + + # check role, key, site, slice + persons = GetPersons([person_id], ['roles', 'key_ids', 'site_ids', 'slice_ids']) + if persons is None or not persons: continue + person = persons[0] + self.islistequal([role], person['roles'], 'AddRoleToPerson - check') + self.islistequal([key_id], person['key_ids'], 'AddPersonKey - check') + self.islistequal([site_id], person['site_ids'], 'AddPersonToSite - check') + self.islistequal([slice_id], person['slice_ids'], 'AddPersonToSlice - check') + + persons = GetPersons(person_ids) + if persons is not None: + self.islistequal(person_ids, [p['person_id'] for p in persons], 'GetPersons - isequal') - if self.check: - person = GetPersons([person_id])[0] - assert not person['enabled'] + if self.verbose: + utils.header("Added users: %s" % self.person_ids) + def DeletePersons(self): + + # Delete attributes manually for first person + persons = GetPersons(self.person_ids, ['person_id' , 'key_ids', 'site_ids', 'slice_ids']) + if persons is None or not persons: return 0 + person = persons[0] + + # Delete role + DeleteRoleFromPerson = self.debug(shell.DeleteRoleFromPerson) + DeleteRoleFromPerson(person['role'], person['person_id']) + + # Delete key + DeleteKey = self.debug(shell.DeleteKey) + DeleteKey(person['key_id']) + + # Remove person from site + DeletePersonFromSite = self.debug(shell.DeletePersonFromSite) + DeletePersonFromSite(person['person_id'], person['site_id']) + + # Remove person from slice + DeletePersonFromSlice = self.debug(shell.DeletePersonFromSlice) + DeletePersonFromSlice(person['person_id'], person['slice_id']) + + # check role, key, site, slice + persons = GetPersons([person['person_id']], ['roles', 'key_ids', 'site_ids', 'slice_ids']) + if persons is None or not persons: return 0 + person = persons[0] + self.islistequal([], person['roles'], 'DeleteRoleFromPerson - check') + self.islistequal([], person['key_ids'], 'DeleteKey - check') + self.islistequal([], person['site_ids'], 'DeletePersonFromSite - check') + self.islistequal([], person['slice_ids'], 'DeletePersonFromSlice - check') + + DeletePerson = self.debug(shell.DeletePerson) + # Have DeletePeson automatically delete attriubtes for all other persons + for person_id in self.person_ids: # Delete account DeletePerson(person_id) - if self.check: - assert not GetPersons([person_id]) - - if self.check: - assert not GetPersons(self.person_ids) - - if self.verbose: - print "Deleted users", self.person_ids + # Check if persons are deleted + GetPersons = self.debug(shell.GetPersons) + persons = GetPersons(self.person_ids) + self.islistequal(persons, [], 'DeletePerson - check') + + if self.verbose: + utils.header("Deleted users: %s" % self.person_ids) self.person_ids = []