From: Tony Mack Date: Mon, 18 Feb 2008 23:07:01 +0000 (+0000) Subject: added more tests X-Git-Tag: tests-4.2-4~233 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=14ee281f51d24ccd98ac6a786e43c5f7178d030c;p=tests.git added more tests --- diff --git a/qaapi/qa/tests/api_unit_test.py b/qaapi/qa/tests/api_unit_test.py index 49fa051..6fd36bd 100755 --- a/qaapi/qa/tests/api_unit_test.py +++ b/qaapi/qa/tests/api_unit_test.py @@ -13,6 +13,7 @@ from traceback import print_exc import base64 import os, sys import socket +import struct import xmlrpclib import time @@ -43,6 +44,9 @@ except: key_types = [u'ssh'] try:types = api.GetNetworkTypes(auth) except: types = [u'ipv4'] +try:attribute_types = api.GetSliceAttributeTypes(auth) +except: attribute_types = range(20) + try: sites = api.GetSites(auth, None, ['login_base']) login_bases = [site['login_base'] for site in sites] @@ -56,6 +60,8 @@ def randfloat(min = 0.0, max = 1.0): def randint(min = 0, max = 1): return int(randfloat(min, max + 1)) +def randbool(): + return random.sample([True, False], 1)[0] # See "2.2 Characters" in the XML specification: # # #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] @@ -122,7 +128,15 @@ def random_site(): 'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0, 'max_slices': 10 } - + +def random_peer(): + bits = 2048 + return { + 'peername': randstr(254), + 'peer_url': "https://" + randhostname() + "/", + 'key': base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))), + 'cacert': base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))) + } def random_address_type(): return { 'name': randstr(20), @@ -198,6 +212,7 @@ def random_nodenetwork(): return nodenetwork_fields + def random_pcu(): return { 'hostname': randhostname(), @@ -231,6 +246,31 @@ def random_attribute_type(): 'min_role_id': random.sample(roles.values(), 1)[0], } +def random_pcu_type(): + return { + 'model': randstr(254), + 'name': randstr(254), + } + +def random_pcu_protocol_type(): + return { + 'port': randint(0, 65535), + 'protocol': randstr(254), + 'supported': randbool() + } + +def random_slice_instantiation(): + return { + 'instantiation': randstr(10) + } +def random_slice_attribute(): + return { + 'attribute_type_id': random.sample(attribute_types, 1)[0], + 'name': randstr(50), + 'description': randstr(100), + 'min_role_id': random.sample([10,20,30,40], 1)[0], + 'value': randstr(20) + } def isequal(object_fields, expected_fields): try: for field in expected_fields: @@ -255,22 +295,42 @@ def isunique(id, id_list): return True class api_unit_test(Test): + + logfile = Logfile("api-unittest-summary.log") - error_log = Logfile('api-unittest-error.log') - def call(self, + boot_states = 2, sites = 2, - boot_sates = 2, - conf_files = 3, - nodes = 4, + peers = 2, address_types = 3, addresses = 2, - persons = 4, - keys = 3, - key_types = 3, + pcu_types = 2, + pcu_protocol_types = 2, + pcus = 2, + network_methods = 2, + network_types = 2, + nodegroups = 3, + nodes = 4, + conf_files = 3, + nodenetworks = 4, + nodenetworksetting_types = 2, + nodenetworksettings = 2, + slice_attribute_types = 2, + slice_instantiations = 2, slices = 4, + slice_attributes = 4, initscripts = 4, + roles = 2, + persons = 4, + key_types = 3, + keys = 3, + messages = 2 ): + # Filter out deprecated (Adm) and boot Methods + current_methods = lambda method: not method.startswith('Adm') or \ + not method.startswith('Slice') or \ + not method.startswith('Boot') or \ + not method.startswith('system') self.all_methods = set(api.system.listMethods()) self.methods_tested = set() self.methods_failed = set() @@ -278,53 +338,52 @@ class api_unit_test(Test): # Begin testing methods try: try: - #self.boot_state_ids = self.BootStates(boot_states) - self.site_ids = self.Sites(sites) - #self.peer_ids = self.Peers(peers) - self.address_type_ids = self.AddressTypes(address_types) - self.address_ids = self.Addresses(addresses) - #self.conf_files = self.ConfFiles(conf_files) - #self.network_method_ids = self.NetworkMethods() - #self.network_type_ids = self.NetworkTypes() - #self.nodegroup_ids = self.NodeGroups() - self.node_ids = self.Nodes(nodes) - #self.node_network_ids = self.NodeNetworks(node_networks) - #self.node_network_setting_type_ids = self.NodeNetworkSettingsTypes(node_network_settings_types) - #self.node_network_setting_ids = self.NodeNetworkSettings(node_network_settings) - #self.pcu_protocol_types_ids = self.PCUProtocolTypes(pcu_protocol_types) - #self.pcus_ids = self.PCUs(pcus) - #self.pcu_types_ids = self.PCUTypes(pcu_types) - #self.role_ids = self.Roles(roles) - #self.key_types = self.KeyTypes(key_types) - #self.slice_attribute_type_ids = self.SliceAttributeTypes(slice_attribute_types) - #self.slice_instantiation_ids = self.SliceInstantiations(slice_instantiations) - self.slice_ids = self.Slices(slices) - #self.slice_attribute_ids = self.SliceAttributes(slice_attributes) - #self.initscript_ids = self.InitScripts(initscripts) - self.person_ids = self.Persons(persons) - self.key_ids = self.Keys(keys) - # self.message_ids = self.Messages(messages) - #self.NotifPersons() + if hasattr(self, 'BootStates'): self.boot_states = self.BootStates(boot_states) + if hasattr(self, 'Sites'): self.site_ids = self.Sites(sites) + if hasattr(self, 'Peers'): self.peer_ids = self.Peers(peers) + if hasattr(self, 'AddressTypes'): self.address_type_ids = self.AddressTypes(address_types) + if hasattr(self, 'Addresses'): self.address_ids = self.Addresses(addresses) + if hasattr(self, 'PCUTypes'): self.pcu_type_ids = self.PCUTypes(pcu_types) + if hasattr(self, 'PCUProtocolTypes'): self.pcu_protocol_type_ids = self.PCUProtocolTypes(pcu_protocol_types) + if hasattr(self, 'PCUs'): self.pcu_ids = self.PCUs(pcus) + if hasattr(self, 'NetworkMethods'): self.network_methods = self.NetworkMethods() + if hasattr(self, 'NetworkTypes'): self.network_types = self.NetworkTypes() + if hasattr(self, 'NodeGroups'): self.nodegroup_ids = self.NodeGroups() + if hasattr(self, 'Nodes'): self.node_ids = self.Nodes(nodes) + if hasattr(self, 'ConfFiles'): self.conf_file_ids = self.ConfFiles(conf_files) + if hasattr(self, 'NodeNetworks'): self.node_network_ids = self.NodeNetworks(node_networks) + if hasattr(self, 'NodeNetworkSettingsTypes'): self.node_network_setting_type_ids = self.NodeNetworkSettingsTypes(node_network_settings_types) + if hasattr(self, 'NodeNetworkSettings'): self.node_network_setting_ids = self.NodeNetworkSettings(node_network_settings) + if hasattr(self, 'SliceAttributeTypes'): self.slice_attribute_types = self.SliceAttributeTypes(slice_attribute_types) + if hasattr(self, 'SliceInstantiations'): self.slice_instantiations = self.SliceInstantiations(slice_instantiations) + if hasattr(self, 'Slices'): self.slice_ids = self.Slices(slices) + if hasattr(self, 'SliceAttributes'): self.slice_attribute_ids = self.SliceAttributes(slice_attributes) + if hasattr(self, 'InitScripts'): self.initscript_ids = self.InitScripts(initscripts) + if hasattr(self, 'Roles'): self.role_ids = self.Roles(roles) + if hasattr(self, 'Persons'): self.person_ids = self.Persons(persons) + if hasattr(self, 'KeyTypes'): self.key_types = self.KeyTypes(key_types) + if hasattr(self, 'Keys'): self.key_ids = self.Keys(keys) + if hasattr(self, 'Messages'): self.message_ids = self.Messages(messages) + if hasattr(self, 'NotifyPersons'): self.NotifPersons() # Test GetEventObject only - #self.event_object_ids = self.GetEventObjects() - #self.event_ids = self.GetEvents() + if hasattr(self, 'GetEventObject'): self.event_object_ids = self.GetEventObjects() + if hasattr(self, 'GetEvents'): self.event_ids = self.GetEvents() except: print_exc() finally: try: self.cleanup() finally: - utils.header("writing api_unitest.log") - logfile = Logfile("api-unittest-summary.log") + utils.header("writing api-unittest-summary.log") methods_ok = list(self.methods_tested.difference(self.methods_failed)) methods_failed = list(self.methods_failed) methods_untested = list(self.all_methods.difference(self.methods_tested)) methods_ok.sort() methods_failed.sort() methods_untested.sort() - print >> logfile, "\n".join([m+": [OK]" for m in methods_ok]) - print >> logfile, "\n".join([m+": [FAILED]" for m in methods_failed]) - print >> logfile, "\n".join([m+": [Not Tested]" for m in methods_untested]) + print >> self.logfile, "\n".join([m+": [OK]" for m in methods_ok]) + print >> self.logfile, "\n".join([m+": [FAILED]" for m in methods_failed]) + print >> self.logfile, "\n".join([m+": [Not Tested]" for m in methods_untested]) def isequal(self, object_fields, expected_fields, method_name): try: @@ -360,7 +419,7 @@ class api_unit_test(Test): return method(*args, **kwds) except: self.methods_failed.update([method_name]) - print >> self.error_log, "%s: %s\n" % (method_name, traceback.format_exc()) + print >> self.logfile, "%s: %s\n" % (method_name, traceback.format_exc()) return None return wrapper @@ -369,21 +428,28 @@ class api_unit_test(Test): if hasattr(self, 'key_type_ids'): self.DeleteKeyTypes() if hasattr(self, 'key_ids'): self.DeleteKeys() if hasattr(self, 'person_ids'): self.DeletePersons() + if hasattr(self, 'role_ids'): self.DeleteRoles() if hasattr(self, 'initscript_ids'): self.DeleteInitScripts() if hasattr(self, 'slice_attribute_ids'): self.DeleteSliceAttributes() if hasattr(self, 'slice_ids'): self.DeleteSlices() - if hasattr(self, 'slice_instantiation_ids'): self.DeleteSliceInstantiations() + if hasattr(self, 'slice_instantiations'): self.DeleteSliceInstantiations() if hasattr(self, 'slice_attribute_type_ids'): self.DeleteSliceAttributeTypes() - if hasattr(self, 'slice_attribute_ids'): self.DeleteSliceAttributes() - if hasattr(self, 'role_ids'): self.DeleteRoles() - if hasattr(self, 'pcu_type_ids'): self.DeletePCUTypes() + if hasattr(self, 'nodenetwork_setting_ids'): self.DeleteNodeNetworkSettings() + if hasattr(self, 'nodenetwork_setting_type_ids'): self.DeleteNodeNetworkSettingTypes() + if hasattr(self, 'nodenetwork_ids'): self.DeleteNodeNetworks() + if hasattr(self, 'conffile_ids'): self.DeleteConfFiles() + if hasattr(self, 'node_ids'): self.DeleteNodes() + if hasattr(self, 'nodegroups_ids'): self.DeleteNodeGroups() + if hasattr(self, 'network_types'): self.DeleteNetworkTypes() + if hasattr(self, 'network_methods'): self.DeleteNetworkMethods() if hasattr(self, 'pcu_ids'): self.DeletePCUs() - if hasattr(self, 'pcu_protocol_type_ids'): self.DeleteProtocolTypes() - if hasattr(self, 'node_network_setting_ids'): self.DeleteNodeNetworkSettings() + if hasattr(self, 'pcu_protocol_type_ids'): self.DeletePCUProtocolTypes() + if hasattr(self, 'pcu_type_ids'): self.DeletePCUTypes() if hasattr(self, 'address_ids'): self.DeleteAddresses() if hasattr(self, 'address_type_ids'): self.DeleteAddressTypes() - if hasattr(self, 'node_ids'): self.DeleteNodes() + if hasattr(self, 'peer_ids'): self.DeletePeers() if hasattr(self, 'site_ids'): self.DeleteSites() + if hasattr(self, 'boot_state_ids'): self.DeleteBootStates() def Sites(self, n=4): @@ -415,7 +481,7 @@ class api_unit_test(Test): site = sites[0] self.isequal(site, site_fields, 'UpdateSite - isequal') - sites = GetSites(site_ids) + sites = GetSites(auth, site_ids) if sites is not None: self.islistequal(site_ids, [site['site_id'] for site in sites], 'GetSites - isequal') @@ -429,7 +495,7 @@ class api_unit_test(Test): # Delete all sites DeleteSite = self.debug(api.DeleteSite) for site_id in self.site_ids: - result = DeleteSite(site_id) + result = DeleteSite(auth, site_id) # Check if sites are deleted GetSites = self.debug(api.GetSites) @@ -439,8 +505,318 @@ class api_unit_test(Test): if self.config.verbose: utils.header("Deleted sites: %s" % self.site_ids) - self.site_ids = [] + self.site_ids = [] + + def NetworkMethods(self, n=2): + methods = [] + AddNetworkMethod = self.debug(api.AddNetworkMethod) + GetNetworkMethods = self.debug(api.GetNetworkMethods) + + for i in range(n): + # Add Network Method + net_method = randstr(10) + AddNetworkMethod(auth, net_method) + if net_method is None: continue + + # Should return a unique id + self.isunique(net_method, methods, 'AddNetworkMethod - isunique') + methods.append(net_method) + net_methods = GetNetworkMethods(auth) + if net_methods is None: continue + net_methods = filter(lambda x: x in [net_method], net_methods) + method = net_methods[0] + self.isequal(method, net_method, 'AddNetworkMethod - isequal') + + + net_methods = GetNetworkMethods(auth) + if net_methods is not None: + net_methods = filter(lambda x: x in methods, net_methods) + self.islistequal(methods, net_methods, 'GetNetworkMethods - isequal') + + if self.config.verbose: + utils.header("Added network methods: %s" % methods) + + return methods + + def DeleteNetworkMethods(self): + DeleteNetworkMethod = self.debug(api.DeleteNetworkMethod) + GetNetworkMethods = self.debug(api.GetNetworkMethods) + for method in self.network_methods: + DeleteNetworkMethod(auth, method) + + # check + network_methods = GetNetworkMethods(auth, self.network_methods) + self.islistequal(network_methods, [], 'DeleteNetworkMethods - check') + + if self.config.verbose: + utils.header("Deleted network methods: %s" % self.network_methods) + self.network_methods = [] + + def NetworkTypes(self, n=2): + net_types = [] + AddNetworkType = self.debug(api.AddNetworkType) + GetNetworkTypes = self.debug(api.GetNetworkTypes) + + for i in range(n): + # Add Network Type + type = randstr(10) + AddNetworkType(auth, type) + + # Should return a unique id + self.isunique(type, net_types, 'AddNetworkType - isunique') + net_types.append(type) + types = GetNetworkTypes(auth) + if types is None: continue + types = filter(lambda x: x in [type], types) + if types is None: continue + net_type = types[0] + self.isequal(net_type, type, 'AddNetworkType - isequal') + + types = GetNetworkTypes(auth, net_types) + if types is not None: + types = filter(lambda x: x in net_types, types) + self.islistequal(types, net_types, 'GetNetworkTypes - isequal') + + if self.config.verbose: + utils.header("Added network types: %s" % net_types) + + return net_types + + def DeleteNetworkTypes(self): + DeleteNetworkType = self.debug(api.DeleteNetworkType) + GetNetworkTypes = self.debug(api.GetNetworkTypes) + for type in self.network_types: + DeleteNetworkType(auth, type) + + # check + network_types = GetNetworkTypes(auth, self.network_types) + self.islistequal(network_types, [], 'DeleteNetworkTypes - check') + + if self.config.verbose: + utils.header("Deleted network types: %s" % self.network_types) + self.network_types = [] + + + def NodeGroups(self, n = 4): + nodegroup_ids = [] + AddNodeGroup = self.debug(api.AddNodeGroup) + UpdateNodeGroup = self.debug(api.UpdateNodeGroup) + GetNodeGroups = self.debug(api.GetNodeGroups) + + for i in range(n): + # Add Nodegroups + nodegroup_fields = random_nodegroup() + nodegroup_id = AddNodeGroup(auth, nodegroup_fields) + if nodegroup_id is None: continue + + # Should return a unique id + self.isunique(nodegroup_id, nodegroup_ids, 'AddNodeGroup - isunique') + nodegroup_ids.append(nodegroup_id) + nodegroups = GetNodeGroups(auth, [nodegroup_id]) + if nodegroups is None: continue + nodegroup = nodegroups[0] + self.isequal(nodegroup, nodegroup_fields, 'AddNodeGroup - isequal') + + # Update NodeGroups + nodegroup_fields = random_nodegroup() + UpdateNodeGroup(auth, nodegroup_id, nodegroup_fields) + + # Check again + nodegroups = GetNodeGroups(auth, [nodegroup_id]) + if nodegroups is None: continue + nodegroup = nodegroups[0] + self.isequal(nodegroup, nodegroup_fields, 'UpdateNodeGroup - isequal') + + nodegroups = GetNodeGroups(auth, nodegroup_ids) + if nodegroups is not None: + self.islistequal(nodegroup_ids, [n['nodegroup_id'] for n in nodegroups], 'GetNodeGroups - isequal') + if self.config.verbose: + utils.header("Added nodegroups: %s" % nodegroup_ids) + + return nodegroup_ids + + def DeleteNodeGroups(self): + # Delete all NodeGroups + GetNodeGroups = self.debug(api.GetNodeGroups) + DeleteNodeGroup = self.debug(api.DeleteNodeGroup) + + for nodegroup_id in self.nodegroup_ids: + result = DeleteNodeGroup(auth, nodegroup_id) + + # Check is nodegroups are deleted + nodegroups = GetNodeGroups(auth, self.nodegroup_ids) + self.islisteqeual(nodegroups, [], 'DeleteNodeGroup - check') + + if self.config.verbose: + utils.header("Deleted nodegroups: %s" % self.nodegroup_ids) + + self.nodegroup_ids = [] + def PCUTypes(self, n=2): + pcu_type_ids = [] + AddPCUType = self.debug(api.AddPCUType) + UpdatePCUType = self.debug(api.UpdatePCUType) + GetPCUTypes = self.debug(api.GetPCUTypes) + + for i in range(n): + # Add PCUType + pcu_type_fields = random_pcu_type() + pcu_type_id = AddPCUType(auth, pcu_type_fields) + if pcu_type_id is None: continue + + # Should return a unique id + self.isunique(pcu_type_id, pcu_type_ids, 'AddPCUType - isunique') + pcu_type_ids.append(pcu_type_id) + + # Check pcu type + pcu_types = GetPCUTypes(auth, [pcu_type_id]) + if pcu_types is None: continue + pcu_type = pcu_types[0] + self.isequal(pcu_type, pcu_type_fields, 'AddPCUType - isequal') + + # Update PCUType + pcu_type_fields = random_pcu_type() + pcu_type_id = UpdatePCUType(auth, pcu_type_id, pcu_type_fields) + + # Check again + pcu_types = GetPCUTypes(auth, [pcu_type_id]) + if pcu_types is None: continue + pcu_type = pcu_types[0] + self.isequal(pcu_type, pcu_type_fields, 'UpdatePCUType - isequal') + + pcu_types = GetPCUTypes(auth, pcu_type_ids) + if pcu_types is not None: + self.islistequal(pcu_type_ids, [p['pcu_type_id'] for p in pcu_types], 'GetPCUTypes - check') + + if self.config.verbose: + utils.header("Added pcu_types: %s " % pcu_type_ids) + return pcu_type_ids + + def DeletePCUTypes(self): + GetPCUTypes = self.debug(api.GetPCUTypes) + DeletePCUType = self.debug(api.DeletePCUType) + + for pcu_type_id in self.pcu_type_ids: + DeletePCUType(auth, pcu_type_id) + + pcu_types = GetPCUTypes(auth, self.pcu_type_ids) + self.islistequal(pcu_types, [], 'DeletePCUType - check') + + def PCUProtocolTypes(self, n=2): + protocol_type_ids = [] + AddPCUProtocolType = self.debug(api.AddPCUProtocolType) + UpdatePCUProtocolType = self.debug(api.UpdatePCUProtocolType) + GetPCUProtocolTypes = self.debug(api.GetPCUProtocolTypes) + + for i in range(n): + # Add PCUProtocolType + protocol_type_fields = random_pcu_protocol_type() + pcu_type_id = random.sample(self.pcu_type_ids, 1)[0] + protocol_type_id = AddPCUProtocolType(auth, pcu_type_id, protocol_type_fields) + if protocol_type_id is None: continue + + # Should return a unique id + self.isunique(protocol_type_id, protocol_type_ids, 'AddPCUProtocolType - isunique') + protocol_type_ids.append(protocol_type_id) + + # Check protocol type + protocol_types = GetPCUProtocolTypes(auth, [protocol_type_id]) + if protocol_types is None: continue + protocol_type = protocol_types[0] + self.isequal(protocol_type, protocol_type_fields, 'AddPCUProtocolType - isequal') + + # Update protocol type + protocol_type_fields = random_pcu_protocol_type() + UpdatePCUProtocolType(auth, protocol_type_id, protocol_type_fields) + + # Check again + protocol_types = GetPCUProtocolTypes(auth, [protocol_type_id]) + if protocol_types is None: continue + protocol_type = protocol_types[0] + self.isequal(protocol_type, protocol_type_fields, 'UpdatePCUProtocolType - isequal') + + protocol_types = GetPCUProtocolTypes(auth, protocol_type_ids) + if protocol_types is not None: + pt_ids = [p['pcu_protocol_type_id'] for p in protocol_types] + self.islistequal(protocol_type_ids, pt_ids, 'GetPCUProtocolTypes - isequal') + + if self.config.verbose: + utils.header('Added pcu_protocol_types: %s' % protocol_type_ids) + + return protocol_type_ids + + def DeletePCUProtocolTypes(self): + GetPCUProtocolTypes = self.debug(api.GetPCUProtocolTypes) + DeletePCUProtocolType = self.debug(api.DeletePCUProtocolType) + + for protocol_type_id in self.pcu_protocol_type_ids: + DeletePCUProtocolType(auth, protocol_type_id) + + # check + protocol_types = GetPCUProtocolTypes(auth, self.pcu_protocol_type_ids) + self.islistequal(protocol_types, [], 'DeletePCUProtocolType - check') + + if self.config.verbose: + utils.header("Deleted pcu_protocol_types: %s" % self.pcu_protocol_type_ids) + self.pcu_protocol_type_ids = [] + + def PCUs(self, n = 4): + pcu_ids = [] + AddPCU = self.debug(api.AddPCU) + UpdatePCU = self.debug(api.UpdatePCU) + GetPCUs = self.debug(api.GetPCUs) + + for i in range(n): + # Add PCU + pcu_fields = random_pcu() + site_id = random.sample(self.site_ids, 1)[0] + pcu_id = AddPCU(auth, site_id, pcu_fields) + if pcu_id is None: continue + + # Should return a unique id + self.isunique(pcu_id, pcu_ids, 'AddPCU - isunique') + pcu_ids.append(pcu_id) + + # check PCU + pcus = GetPCUs(auth, [pcu_id]) + if pcus is None: continue + pcu = pcus[0] + self.isequal(pcu, pcu_fields, 'AddPCU - isequal') + + # Update PCU + pcu_fields = random_pcu() + UpdatePCU(auth, pcu_id, pcu_fields) + + # Check again + pcus = GetPCUs(auth, [pcu_id]) + if pcus is None: continue + pcu = pcus[0] + self.isequal(pcu, pcu_fields, 'UpdatePCU - isequal') + + pcus = GetPCUs(auth, pcu_ids) + if pcus is not None: + self.islistequal(pcu_ids, [p['pcu_id'] for p in pcus], 'GetPCUs - isequal') + + if self.config.verbose: + utils.header('Added pcus: %s' % pcu_ids) + + return pcu_ids + + def DeletePCUs(self): + GetPCUs = self.debug(api.GetPCUs) + DeletePCU = self.debug(api.DeletePCU) + + for pcu_id in self.pcu_ids: + DeletePCU(auth, pcu_id) + + # check + pcus = GetPCUs(auth, self.pcu_ids) + self.islistequal(pcus, [], 'DeletePCU - check') + + if self.config.verbose: + utils.header("Deleted pcus: %s " % self.pcu_ids) + self.pcu_ids = [] + def Nodes(self, n=4): node_ids = [] for i in range(n): @@ -472,6 +848,23 @@ class api_unit_test(Test): if nodes is None: continue node = nodes[0] self.isequal(node, node_fields, 'UpdateNode - isequal') + + # Add node to nodegroup + nodegroup_id = random.sample(self.nodegroup_ids, 1)[0] + AddNodeToNodeGroup = self.debug(api.AddNodeToNodeGroup) + AddNodeToNodeGroup(auth, node_id, nodegroup_id) + + # Add node to PCU + pcu_id = random.sample(self.pcu_ids, 1)[0] + AddNodeToPCU = self.debug(api.AddNodeToPCU) + AddNodeToPCU(auth, node_id, nodegroup_id) + + # check nodegroup, pcu + nodes = GetNodes(auth, [node_id], ['nodegroup_ids', 'pcu_ids']) + if nodes is None or not nodes: continue + node = nodes[0] + self.islistequal([nodegroup_id], node['nodegroup_ids'], 'AddNodeToNodeGroup - check') + self.islistequal([pcu_id], node['pcu_ids'], 'AddNodeToPCU - check') nodes = GetNodes(auth, node_ids) if nodes is not None: @@ -483,6 +876,32 @@ class api_unit_test(Test): return node_ids def DeleteNodes(self): + + # Delete attributes manually for first node + GetNodes = self.debug(api.GetNodes) + nodes = GetNodes(auth, self.node_ids) + if nodes is None or not nodes: return 0 + node = nodes[0] + + if node['nodegroup_ids']: + # Delete NodeGroup + nodegroup_id = random.sample(node['nodegroup_ids'], 1)[0] + DeleteNodeFromNodeGroup = self.debug(api.DeleteNodeFromNodeGroup) + DeleteNodeFromNodeGroup(auth, node['node_id'], nodegroup_id) + + if node['pcu_ids']: + # Delete PCU + pcu_id = random.sample(node['pcu_ids'], 1)[0] + DeleteNodeFromPCU = self.debug(api.DeleteNodeFromPCU) + DeleteNodeFromPCU(auth, node['node_id'], pcu_id) + + # check nodegroup, pcu + nodes = GetNodes(auth, [node['node_id']]) + if nodes is None or not nodes: return 0 + self.islistequal([], node['nodegroup_ids'], 'DeleteNodeGromNodeGroup - check') + self.islistequal([], node['pcu_ids'], 'DeleteNodeFromPCU - check') + + # Delete rest of nodes DeleteNode = self.debug(api.DeleteNode) for node_id in self.node_ids: result = DeleteNode(auth, node_id) @@ -496,7 +915,7 @@ class api_unit_test(Test): utils.header("Deleted nodes: %s" % self.node_ids) self.node_ids = [] - + def AddressTypes(self, n = 3): address_type_ids = [] for i in range(n): @@ -609,19 +1028,122 @@ class api_unit_test(Test): self.address_ids = [] + def SliceAttributeTypes(self, n = 2): + attribute_type_ids = [] + AddSliceAttributeType = self.debug(api.AddSliceAttribute) + GetSliceAttributeTypes = self.debug(api.GetSliceAttributes) + UpdateSliceAttributeType = self.debug(api.UpdateSliceAttribute) + + for i in range(n): + attribute_type_fields = random_attribute_type() + attribute_type_id = AddSliceAttributeType(auth, attribute_type_fields) + if attribute_type_id is None: continue + + # Should return a unique slice_attribute_type_id + self.isunique(attribute_type_id, attribute_type_ids, 'AddSliceAttributeType - isunique') + attribute_type_ids.append(attribute_type_id) + + # Check slice_attribute_type + attribute_types = GeddtSliceAttribute_types(auth, [attribute_type_id]) + if attribute_types is None: continue + attribute_type = attribute_types[0] + self.isequal(attribute_type, attribute_types_fields, 'AddSliceAttributeType - isequal') + + # Update slice_attribute_type + attribute_type_fields = random_attribute_type() + result = UpdateSliceAttributeType(auth, attribute_type_id, attribute_type_fields) + + # Check again + attribute_types = GetSliceAttributeTypes(auth, [attribute_type_id]) + if attribute_types is None: continue + attribute_type = attribute_types[0] + self.isequal(attribute_type, attribute_type_fields, 'UpdateSliceAttributeType - isequal') + + attribute_types = GetSliceAttributeType(auth, attribute_type_ids) + if attribute_types is not None: + at_ids = [at['attribute_type_id'] for at in attribute_types] + self.islistequal(attribute_type_ids, at_ids, 'GetSliceAttributeTypes - isequal') + + if self.config.verbose: + utils.header("Added slice_attribute_types: %s" % attribute_type_ids) + + return attribute_type_ids + + def DeleteSliceAttributeTypes(self): + DeleteSliceAttributeType = self.debug(api.DeleteSliceAttributeType) + GetSliceAttributeTypes = self.debug(api.GetSliceAtttributeTypes) + + # Delete slice_attribute_type + for slice_attribute_type_id in self.slice_attribute_types_ids: + result = DeleteSliceAttributeType(auth, slice_attribute_type_id) + + # Check + slice_attribute_type_ids = GetSliceAttributeTypes(auth, self.slice_attribute_type_id) + self.islistequal(slice_attribute_type_ids, [], 'DeleteSliceAttributeTypes - check') + if self.config.verbose: + utils.header("Deleted slice_attributes: %s" % self.slice_attribute_type_ids) + + self.slice_attribute_type_ids = [] + + def SliceInstantiations(self, n = 2): + insts = [] + AddSliceInstantiation= self.debug(api.AddSliceInstantiation) + GetSliceInstantiations = self.debug(api.GetSliceInstantiaton) + + for i in range(n): + instantiation_fields = random_instantiation() + result = AddSliceInstantiation(auth, instantiation_fields) + if result is None: continue + instantiation = instantiation_fields['instantiation'] + insts.append(instantiation) + + # Check slice instantiaton + instantiations = GetSliceInstantiation(auth, ['instantiation']) + if instantiations is None: continue + instantiation = instantiations[0] + self.isequal(instantiation, instantiation_fields, 'AddSliceInstantiation - isequal') + + + instantiations = GetSliceInstantiations(auth, insts) + if instantiations is not None: + inst_list = [i['instantiaion'] for i in instantiations] + self.islistequal(insts, inst_list, 'GetSliceInstantiations - isequal') + + if self.config.verbose: + utils.header("Added slice instantiations: %s" % insts) + + return insts + + def DeleteSliceInstantiations(self): + DeleteSliceInstantiation = self.debug(api.DeleteSliceInstantiation) + GetSliceInstantiations = self.debug(api.GetSliceInstantiations) + # Delete slice instantiation + for instantiation in self.slice_instantiations: + result = DeleteSliceInstantiation(auth, instantiation) + + # Check + instantiations = GetSliceInstnatiations(auth, self.slice_instantiations) + self.islistequal(instantiations, [], 'DeleteSliceInstantiation - check') + if self.config.verbose: + utils.header("Deleted slice instantiations" % self.slice_instantiations) + + self.slice_instantiations = [] + def Slices(self, n = 3): slice_ids = [] + AddSlice = self.debug(api.AddSlice) + GetSlices = self.debug(api.GetSlices) + UpdateSlice = self.debug(api.UpdateSlice) + AddSliceToNode = self.debug(api.AddSliceToNode) for i in range(n): # Add Site slice_fields = random_slice() - AddSlice = self.debug(api.AddSlice) slice_id = AddSlice(auth, slice_fields) if slice_id is None: continue # Should return a unique id self.isunique(slice_id, slice_ids, 'AddSlicel - isunique') slice_ids.append(slice_id) - GetSlices = self.debug(api.GetSlices) slices = GetSlices(auth, [slice_id]) if slices is None: continue slice = slices[0] @@ -629,7 +1151,6 @@ class api_unit_test(Test): # Update slice slice_fields = random_slice() - UpdateSlice = self.debug(api.UpdateSlice) result = UpdateSlice(auth, slice_id, slice_fields) # Check again @@ -638,9 +1159,15 @@ class api_unit_test(Test): slice = slices[0] self.isequal(slice, slice_fields, 'UpdateSlice - isequal') - # XX Add attribute - - # XX Add node + # Add node + node_id = random.sample(self.node_ids, 1)[0] + AddSliceToNode(auth, slice_id, node_id) + + # check node + slices = GetSlices(auth, [slice_id], ['node_ids']) + if slices is None or not slices: continue + slice = slices[0] + self.islistequal([node_id], slice['node_ids'], 'AddSliceToNode - check') slices = GetSlices(auth, slice_ids) if slices is not None: @@ -653,12 +1180,27 @@ class api_unit_test(Test): def DeleteSlices(self): - # XX manually delete attributes for first slice GetSlices = self.debug(api.GetSlices) - slices = GetSlices(auth, self.slice_ids, ['slice_attribute_ids', 'node_ids']) + DeleteSlice = self.debug(api.DeleteSlice) + DeleteSliceFromNodes = self.debug(api.DeleteSliceFromNodes) + + # manually delete attributes for first slice + slices = GetSlices(auth, self.slice_ids, ['slice_id', 'node_ids']) + if slices is None or not slices: return 0 + slice = slices[0] + + if slice['node_ids']: + # Delete node from slice + node_id = random.sample(slice['node_ids'], 1)[0] + DeleteSliceFromNodes(slice['slice_id'], [node_id]) + + # Check node_ids + slices = GetSlices(auth, [slice['slice_id']], ['node_ids']) + if slices is None or not slices: return 0 + slice = slices[0] + self.islistequal([], slice['node_ids'], 'DeleteSliceFromNode - check') # Have DeleteSlice automatically delete attriubtes for the rest - DeleteSlice = self.debug(api.DeleteSlice) for slice_id in self.slice_ids: # Delete account DeleteSlice(auth, slice_id) @@ -673,6 +1215,63 @@ class api_unit_test(Test): self.slice_ids = [] + def SliceAttributes(self, n = 4): + attribute_ids = [] + AddSliceAttribute = self.debug(api.AddSliceAttribute) + GetSliceAttributes = self.debug(api.GetSliceAttributes) + UpdateSliceAttribute = self.debug(api.UpdateSliceAttribute) + + for i in range(n): + # Add slice attribute + attribute_fields = random_slice_attribute() + slice_id = random.sample(self.slice_ids, 1)[0] + attribute_id = AddSliceAttribute(auth, slice_id, attribute_fields) + if attribute_id is None: continue + + # Should return a unique id + self.isunique(attribute_id, attribute_ids, 'AddSliceAttribute - isunique') + attribute_ids.append(attribute_id) + + # Check attribute + attributes = GetSliceAttributes(auth, [attribute_id]) + if attributes is None: continue + attribute = attributes[0] + self.isequal(attribute, attribute_fields, 'AddSliceAttribute - isequal') + + # Update attribute + attribute_fields = random_attribute() + result = UpdateSliceAttribute(auth, attribute_id, attribute_fields) + + # Check again + attributes = GetSliceAttributes(auth, [attribute_id]) + if attributes is None: continue + attribute = attributes[0] + self.isequal(attribute, attribute_fields, 'UpdateSliceAttribute - isequal') + + attributes = GetSliceAttributes(auth, attribute_ids) + if attributes is not None: + attr_ids = [a['attribute_id'] for a in attributes] + self.islistequal(attribute_ids, attr_ids, 'GetSliceAttributes - isequal') + if self.config.verbose: + utils.header("Added slice attributes: %s" % attribute_ids) + + return attribute_ids + + def DeleteSliceAttributes(self): + DeleteSliceAttribute = self.debug(api.DeleteSliceAttribute) + GetSliceAttributes = self.debug(api.GetSliceAttributes) + + for attribute_id in self.slice_attribute_ids: + DeleteSliceAttribute(auth, attribute_id) + + attributes = GetSliceAttributes(auth, self.slice_attribute_ids) + self.islistequal(attributes, [], 'DeleteSliceAttribute - check') + + if self.config.verbose: + utils.header("Deleted slice attributes: %s" % self.slice_attribute_ids) + + self.slice_attribute_ids = [] + def Persons(self, n = 3): person_ids = [] @@ -861,6 +1460,202 @@ class api_unit_test(Test): utils.header("Deleted keys: %s" % self.key_ids) self.key_ids = [] + + def BootStates(self, n = 3): + boot_states = [] + AddBootState = self.debug(api.AddBootState) + GetBootStates = self.debug(api.GetBootStates) + for i in range(n): + # Add boot state + bootstate_fields = randstr(10) + result = AddBootState(auth, bootstate_fields) + if result is None: continue + + # Check boot states + boot_states.append(bootstate_fields) + bootstates = GetBootStates(auth) + if not bootstates: continue + bootstates = filter(lambda x: x in [bootstate_fields], bootstates) + if not bootstates: continue + bootstate = bootstates[0] + self.isequal(bootstate, bootstate_fields, 'AddBootState - isequal') + + # Check all + bs = GetBootStates(auth) + if bs is not None: + bs = filter(lambda x: x in [boot_states], bs) + self.islistequal(boot_states, bs, 'GetBootStates - isequal') + + if self.config.verbose: + utils.header("Added boot_states: %s" % boot_states) + + return boot_states + + def DeleteBootStates(self): + DeleteBootState = self.debug(api.DeleteBootState) + GetBootStates = self.debug(api.GetBootStates) + for boot_state in self.boot_states: + result = DeleteBootState(auth, boot_state) + + # Check if bootsates are deleted + boot_states = GetBootStates(auth, self.boot_states) + self.islistequa(boot_states, [], 'DeleteBootState check') + + if self.config.verbose: + utils.header("Deleted boot_states: %s" % self.boot_states) + + self.boot_states = [] + + + def Peers(self, n = 2): + peer_ids = [] + for i in range(n): + # Add Peer + peer_fields = random_peer() + AddPeer = self.debug(api.AddPeer) + peer_id = AddPeer(auth, peer_fields) + + # Should return a unique id + self.isunique(peer_id, peer_ids, 'AddPeer - isunique') + peer_ids.append(peer_id) + GetPeers = self.debug(api.GetPeers) + peers = GetPeers(auth, [peer_id]) + if peers is None: continue + peer = peers[0] + self.isequal(peer, peer_fields, 'AddPeer - isequal') + + # Update Peer + peer_fields = random_peer() + UpdatePeer = self.debug(api.UpdatePeer) + result = UpdatePeer(auth, peer_id, peer_fields) + + # Check again + peers = GetPeers(auth, [peer_id]) + if peers is None: continue + peer = peers[0] + self.isequal(peer, peer_fields, 'UpdatePeer - isequal') + + peers = GetPeers(auth, peer_ids) + if peers is not None: + self.islistequal(peer_ids, [peer['peer_id'] for peer in peers], 'GetPeers -isequal') + + if self.config.verbose: + utils.header("Added peers: %s" % peer_ids) + + return peer_ids + + + def DeletePeers(self): + # Delete all peers + DeletePeer = self.debug(api.DeletePeer) + for peer_id in self.peer_ids: + result = DeletePeer(auth, peer_id) + + # Check if peers are deleted + GetPeers = self.debug(api.GetPeers) + peers = GetPeers(auth, self.peer_ids) + self.islistequal(peers, [], 'DeletePeer - check' % self.peer_ids) + + if self.config.verbose: + utils.header("Deleted sites: %s" % self.peer_ids) + self.peer_ids =[] + + def ConfFiles(self, n = 2): + conf_file_ids = [] + for i in range(n): + # Add ConfFile + conf_file_fields = random_conf_file() + AddConfFile = self.debug(api.AddConfFile) + conf_file_id = AddConfFile(auth, conf_file_fields) + if conf_file_id is None: continue + + # Should return a unique id + self.isunique(conf_file_id, conf_file_ids, 'AddConfFile - isunique') + conf_file_ids.append(conf_file_id) + + # Get ConfFiles + GetConfFiles = self.debug(api.GetConfFiles) + conf_files = GetConfFiles(auth, [conf_file_id]) + if conf_files is None: continue + conf_file = conf_files[0] + self.isequal(conf_file, conf_file_fields, 'AddConfFile - isunique') + + # Update ConfFile + conf_file_fields = random_conf_file() + UpdateConfFile = self.debug(api.UpdateConfFile) + result = UpdateConfFile(auth, conf_file_id, conf_file_fields) + + # Check again + conf_files = GetConfFiles(auth, [conf_file_id]) + if conf_files is None: continue + conf_file = conf_files[0] + self.isequal(conf_file, conf_file_fields, 'UpdateConfFile - isunique') + + + # Add this conf file to a random node + node_id = random.sample(self.node_ids, 1)[0] + AddConfFileToNode = self.debug(api.AddConfFileToNode) + AddConfFileToNode(auth, conf_file_id, node_id) + + # Add this conf file to a random node group + nodegroup_id = random.sample(self.nodegroup_ids, 1)[0] + AddConfFileToNodeGroup = self.debug(api.AddConfFileToNodeGroup) + AddConfFileToNodeGroup(auth, conf_file_id, nodegroup_id) + + # Check node, nodegroup + conf_files = GetConfFiles(auth, [conf_file_id], ['node_ids', 'nodegroup_ids']) + if conf_files is None or not conf_files: continue + conf_file = conf_files[0] + self.islistequal([node_id], conf_file['node_ids'], 'AddConfFileToNode - check') + self.islistequal([nodegroup_id], conf_file['nodegroup_ids'], 'AddConfFileToNodeGroup - check') + + + + conf_files = GetConfFiles(auth, conf_file_ids) + if conf_files is not None: + self.islistequal(conf_file_ids, [c['conf_file_id'] for c in conf_files], 'GetConfFiles - isequal') + if self.config.verbose: + utils.header("Added conf_files: %s" % conf_file_ids) + + return conf_file_ids + + def DeleteConfFiles(self): + + GetConfFiles = self.debug(api.GetConfFiles) + conf_files = GetConfFiles(auth, self.conf_file_ids) + if conf_fiels is None or not conf_files: return 0 + conf_file = conf_files[0] + + if conf_file['node_ids']: + node_id = random.sample(conf_file['node_ids'], 1)[0] + DeleteConfFileFromNode = self.debug(api.DeleteConfFileFromNode) + DeleteConfFileFromNode(auth, conf_file['conf_file_id'], node_id) + + if conf_file['nodegroup_ids']: + nodegroup_id = random.sample(conf_file['nodegroup_ids'], 1)[0] + DeleteConfFileFromNodeGroup = self.debug(api.DeleteConfFileFromNodeGroup) + DeleteConfFileFromNode(auth, conf_file['conf_file_id'], nodegroup_id) + + # check + conf_files = GetConfFiles(auth, conf_file['conf_file_id'], ['node_ids', 'nodegroup_ids']) + if conf_files is None or not conf_files: return 0 + conf_file = conf_files[0] + self.islistequal([], conf_file['node_ids'], 'AddConfFileToNode - check') + self.islistequal([], conf_file['nodegroup_ids'], 'AddConfFileToNodeGroup - check') + + DeleteConfFile = self.debug(api.DeleteConfFile) + for conf_file_id in self.conf_file_ids: + DeleteConfFile(auth, conf_file_id) + + # check + conf_files = GetConfFiles(auth, self.conf_file_ids) + self.islistequal(conf_files, [], 'DeleteConfFile - check') + + if self.config.verbose: + utils.header("Deleted conf_files: %s" % self.conf_file_ids) + + self.conf_file_ids = [] + if __name__ == '__main__': args = tuple(sys.argv[1:])