from Test import Test
from qa import utils
from qa.Config import Config
-from qa.logger import Logfile, log
+from qa.logger import Logfile, log, logfile
from random import Random
random = Random()
try: boot_states = api.GetBootStates(auth)
except: boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins']
-try: roles = [role['name'] for role in api.GetRoles(auth)]
-except: roles = [u'admin', u'pi', u'user', u'tech']
+try: roles = [role['role_id'] for role in api.GetRoles(auth)]
+except: roles = [10,20,30,40]
try: methods = api.GetNetworkMethods(auth)
except: methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown']
try:types = api.GetNetworkTypes(auth)
except: types = [u'ipv4']
-try:attribute_types = api.GetSliceAttributeTypes(auth)
-except: attribute_types = range(20)
+try:
+ attribute_types = [a['attribute_type_id'] for a in api.GetSliceAttributeTypes(auth)]
+ attribute_types = filter(lambda x: x['name'] != 'initscript', attribute_types)
+
+except:
+ attribute_types = range(6,20)
try:
sites = api.GetSites(auth, None, ['login_base'])
'bwlimit': randint(500000, 10000000),
}
- if method != 'dhcp':
+ if nodenetwork_fields['method'] != 'dhcp':
ip = randint(0, 0xffffffff)
netmask = (0xffffffff << randint(2, 31)) & 0xffffffff
network = ip & netmask
return nodenetwork_fields
+def random_nodenetwork_setting():
+ return {
+ 'value': randstr(20)
+ }
+
+def random_nodenetwork_setting_type():
+ return {
+ 'name': randstr(20),
+ 'description': randstr(50),
+ 'category': randstr(20),
+ 'min_role_id': random.sample(roles, 1)[0]
+ }
def random_pcu():
return {
return {
'name': randstr(100),
'description': randstr(254),
- 'min_role_id': random.sample(roles, 1)[0],
+ 'min_role_id': int(random.sample(roles, 1)[0]),
}
def random_pcu_type():
return {
'attribute_type_id': random.sample(attribute_types, 1)[0],
'name': randstr(50),
- 'description': randstr(100),
- 'min_role_id': random.sample([10,20,30,40], 1)[0],
'value': randstr(20)
- }
-def isequal(object_fields, expected_fields):
- try:
- for field in expected_fields:
- assert field in object_fields
- assert object_fields[field] == expected_fields[field]
- except:
- return False
- return True
-
-def islistequal(list1, list2):
- try:
- assert set(list1) == set(list2)
- except:
- return False
- return True
-
-def isunique(id, id_list):
- try:
- assert id not in id_list
- except:
- return False
- return True
-
+ }
+
+def random_initscript():
+ return {
+ 'name': randstr(20),
+ 'enabled': randbool(),
+ 'script': randstr(200)
+ }
+
+def random_role():
+ return {
+ 'role_id': randint(1000),
+ 'name': randstr(50)
+ }
+
class api_unit_test(Test):
logfile = Logfile("api-unittest-summary.log")
nodes = 4,
conf_files = 3,
nodenetworks = 4,
- nodenetworksetting_types = 2,
- nodenetworksettings = 2,
+ nodenetwork_setting_types = 2,
+ nodenetwork_settings = 2,
slice_attribute_types = 2,
slice_instantiations = 2,
slices = 4,
messages = 2
):
# Filter out deprecated (Adm) and boot Methods
- current_methods = lambda method: not method.startswith('Adm') or \
- not method.startswith('Slice') or \
- not method.startswith('Boot') or \
+ current_methods = lambda method: not method.startswith('Adm') and \
+ not method.startswith('Slice') and \
+ not method.startswith('Boot') and \
+ not method.startswith('Anon') and \
not method.startswith('system')
- self.all_methods = set(api.system.listMethods())
+ self.all_methods = set(filter(current_methods, api.system.listMethods()))
self.methods_tested = set()
self.methods_failed = set()
if hasattr(self, 'NodeGroups'): self.nodegroup_ids = self.NodeGroups()
if hasattr(self, 'Nodes'): self.node_ids = self.Nodes(nodes)
if hasattr(self, 'ConfFiles'): self.conf_file_ids = self.ConfFiles(conf_files)
- if hasattr(self, 'NodeNetworks'): self.node_network_ids = self.NodeNetworks(node_networks)
- if hasattr(self, 'NodeNetworkSettingsTypes'): self.node_network_setting_type_ids = self.NodeNetworkSettingsTypes(node_network_settings_types)
- if hasattr(self, 'NodeNetworkSettings'): self.node_network_setting_ids = self.NodeNetworkSettings(node_network_settings)
- if hasattr(self, 'SliceAttributeTypes'): self.slice_attribute_types = self.SliceAttributeTypes(slice_attribute_types)
+ if hasattr(self, 'NodeNetworks'): self.nodenetwork_ids = self.NodeNetworks(nodenetworks)
+ if hasattr(self, 'NodeNetworkSettingTypes'): self.nodenetwork_setting_type_ids = self.NodeNetworkSettingTypes(nodenetwork_setting_types)
+ if hasattr(self, 'NodeNetworkSettings'): self.nodenetwork_setting_ids = self.NodeNetworkSettings(nodenetwork_settings)
+ if hasattr(self, 'SliceAttributeTypes'): self.slice_attribute_type_ids = self.SliceAttributeTypes(slice_attribute_types)
if hasattr(self, 'SliceInstantiations'): self.slice_instantiations = self.SliceInstantiations(slice_instantiations)
if hasattr(self, 'Slices'): self.slice_ids = self.Slices(slices)
if hasattr(self, 'SliceAttributes'): self.slice_attribute_ids = self.SliceAttributes(slice_attributes)
if hasattr(self, 'Persons'): self.person_ids = self.Persons(persons)
if hasattr(self, 'KeyTypes'): self.key_types = self.KeyTypes(key_types)
if hasattr(self, 'Keys'): self.key_ids = self.Keys(keys)
- if hasattr(self, 'Messages'): self.message_ids = self.Messages(messages)
- if hasattr(self, 'NotifyPersons'): self.NotifPersons()
- # Test GetEventObject only
- if hasattr(self, 'GetEventObject'): self.event_object_ids = self.GetEventObjects()
- if hasattr(self, 'GetEvents'): self.event_ids = self.GetEvents()
+ if hasattr(self, 'Messages'): self.message_ids = self.Messages(messages)
+ if hasattr(self, 'Sessions'): self.Sessions()
+
+ # Test misc Get calls
+ if hasattr(self, 'GenerateNodeConfFile'): self.GenerateNodeConfFile()
+ if hasattr(self, 'GetBootMedium'): self.GetBootMedium()
+ if hasattr(self, 'GetEventObjects'): self.event_object_ids = self.GetEventObjects()
+ if hasattr(self, 'GetEvents'): self.event_ids = self.GetEvents()
+ if hasattr(self, 'GetPeerData'): self.GetPeerData()
+ if hasattr(self, 'GetPeerName'): self.GetPeerName()
+ if hasattr(self, 'GetPlcRelease'): self.GetPlcRelease()
+ if hasattr(self, 'GetSliceKeys'): self.GetSliceKeys()
+ if hasattr(self, 'GetSliceTicket'): self.GetSliceTicket()
+ if hasattr(self, 'GetSlicesMD5'): self.GetSlicesMD5()
+ if hasattr(self, 'GetSlivers'): self.GetSlivers()
+ if hasattr(self, 'GetWhitelist'): self.GetWhitelist()
+
+ # Test various administrative methods
+ if hasattr(self, 'NotifyPersons'): self.NotifyPersons()
+ if hasattr(self, 'NotifySupport'): self.NotifySupport()
+ if hasattr(self, 'RebootNode'): self.RebootNode()
+ if hasattr(self, 'RefrestPeer'): self.RefreshPeer()
+ if hasattr(self, 'ResetPassword'): self.ResetPassword()
+ if hasattr(self, 'SetPersonPrimarySite'): self.SetPersonPrimarySite()
+ if hasattr(self, 'VerifyPerson'): self.VerifyPerson()
+
+
+
+
except:
print_exc()
finally:
self.methods_failed.update([method_name])
return False
return True
-
+
+ def isinlist(self, item, item_list, method_name):
+ try: assert item in item_list
+ except:
+ self.methods_failed.update([method_name])
+ return False
+ return True
def debug(self, method, method_name=None):
if method_name is None:
method_name = method.name
return method(*args, **kwds)
except:
self.methods_failed.update([method_name])
- print >> self.logfile, "%s: %s\n" % (method_name, traceback.format_exc())
+ print >> logfile, "%s%s: %s\n" % (method_name, tuple(args[1:]), traceback.format_exc())
return None
return wrapper
def cleanup(self):
- if hasattr(self, 'key_type_ids'): self.DeleteKeyTypes()
+ if hasattr(self, 'key_types'): self.DeleteKeyTypes()
if hasattr(self, 'key_ids'): self.DeleteKeys()
if hasattr(self, 'person_ids'): self.DeletePersons()
if hasattr(self, 'role_ids'): self.DeleteRoles()
if hasattr(self, 'nodenetwork_setting_ids'): self.DeleteNodeNetworkSettings()
if hasattr(self, 'nodenetwork_setting_type_ids'): self.DeleteNodeNetworkSettingTypes()
if hasattr(self, 'nodenetwork_ids'): self.DeleteNodeNetworks()
- if hasattr(self, 'conffile_ids'): self.DeleteConfFiles()
+ if hasattr(self, 'conf_file_ids'): self.DeleteConfFiles()
if hasattr(self, 'node_ids'): self.DeleteNodes()
- if hasattr(self, 'nodegroups_ids'): self.DeleteNodeGroups()
+ if hasattr(self, 'nodegroup_ids'): self.DeleteNodeGroups()
if hasattr(self, 'network_types'): self.DeleteNetworkTypes()
if hasattr(self, 'network_methods'): self.DeleteNetworkMethods()
if hasattr(self, 'pcu_ids'): self.DeletePCUs()
if hasattr(self, 'address_type_ids'): self.DeleteAddressTypes()
if hasattr(self, 'peer_ids'): self.DeletePeers()
if hasattr(self, 'site_ids'): self.DeleteSites()
- if hasattr(self, 'boot_state_ids'): self.DeleteBootStates()
+ if hasattr(self, 'boot_states'): self.DeleteBootStates()
def Sites(self, n=4):
DeleteNetworkMethod(auth, method)
# check
- network_methods = GetNetworkMethods(auth, self.network_methods)
+ network_methods = GetNetworkMethods(auth)
+ network_methods = filter(lambda x: x in self.network_methods, network_methods)
self.islistequal(network_methods, [], 'DeleteNetworkMethods - check')
if self.config.verbose:
net_type = types[0]
self.isequal(net_type, type, 'AddNetworkType - isequal')
- types = GetNetworkTypes(auth, net_types)
+ types = GetNetworkTypes(auth)
if types is not None:
types = filter(lambda x: x in net_types, types)
self.islistequal(types, net_types, 'GetNetworkTypes - isequal')
DeleteNetworkType(auth, type)
# check
- network_types = GetNetworkTypes(auth, self.network_types)
+ network_types = GetNetworkTypes(auth)
+ network_types = filter(lambda x: x in self.network_types, network_types)
self.islistequal(network_types, [], 'DeleteNetworkTypes - check')
if self.config.verbose:
# Check is nodegroups are deleted
nodegroups = GetNodeGroups(auth, self.nodegroup_ids)
- self.islisteqeual(nodegroups, [], 'DeleteNodeGroup - check')
+ self.islistequal(nodegroups, [], 'DeleteNodeGroup - check')
if self.config.verbose:
utils.header("Deleted nodegroups: %s" % self.nodegroup_ids)
pcu_type_fields = random_pcu_type()
pcu_type_id = AddPCUType(auth, pcu_type_fields)
if pcu_type_id is None: continue
-
# Should return a unique id
self.isunique(pcu_type_id, pcu_type_ids, 'AddPCUType - isunique')
pcu_type_ids.append(pcu_type_id)
# Check pcu type
- pcu_types = GetPCUTypes(auth, [pcu_type_id])
+ pcu_types = GetPCUTypes(auth, [pcu_type_id])
if pcu_types is None: continue
pcu_type = pcu_types[0]
self.isequal(pcu_type, pcu_type_fields, 'AddPCUType - isequal')
# Update PCUType
pcu_type_fields = random_pcu_type()
- pcu_type_id = UpdatePCUType(auth, pcu_type_id, pcu_type_fields)
+ UpdatePCUType(auth, pcu_type_id, pcu_type_fields)
# Check again
pcu_types = GetPCUTypes(auth, [pcu_type_id])
UpdatePCU = self.debug(api.UpdatePCU)
GetPCUs = self.debug(api.GetPCUs)
- for i in range(n):
+ for site_id in self.site_ids:
# Add PCU
pcu_fields = random_pcu()
- site_id = random.sample(self.site_ids, 1)[0]
pcu_id = AddPCU(auth, site_id, pcu_fields)
if pcu_id is None: continue
def Nodes(self, n=4):
node_ids = []
+ AddNode = self.debug(api.AddNode)
+ GetNodes = self.debug(api.GetNodes)
+ UpdateNode = self.debug(api.UpdateNode)
for i in range(n):
# Add Node
node_fields = random_node()
site_id = random.sample(self.site_ids, 1)[0]
- AddNode = self.debug(api.AddNode)
node_id = AddNode(auth, site_id, node_fields)
if node_id is None: continue
node_ids.append(node_id)
# Check nodes
- GetNodes = self.debug(api.GetNodes)
nodes = GetNodes(auth, [node_id])
if nodes is None: continue
node = nodes[0]
# Update node
node_fields = random_node()
- UpdateNode = self.debug(api.UpdateNode)
result = UpdateNode(auth, node_id, node_fields)
# Check again
AddNodeToNodeGroup(auth, node_id, nodegroup_id)
# Add node to PCU
- pcu_id = random.sample(self.pcu_ids, 1)[0]
+ sites = api.GetSites(auth, [node['site_id']], ['pcu_ids'])
+ if not sites: continue
+ site = sites[0]
+ pcu_id = random.sample(site['pcu_ids'], 1)[0]
+ port = random.sample(range(65535), 1)[0]
AddNodeToPCU = self.debug(api.AddNodeToPCU)
- AddNodeToPCU(auth, node_id, nodegroup_id)
+ AddNodeToPCU(auth, node_id, pcu_id, port)
# check nodegroup, pcu
nodes = GetNodes(auth, [node_id], ['nodegroup_ids', 'pcu_ids'])
self.islistequal(address_types, [], 'DeleteAddressType - check')
if self.config.verbose:
- utils.header("Deleted address types: " % self.address_type_ids)
+ utils.header("Deleted address types: %s" % self.address_type_ids)
self.address_type_ids = []
def Addresses(self, n = 3):
address_ids = []
- for i in range(n):
+ AddSiteAddress = self.debug(api.AddSiteAddress)
+ GetAddresses = self.debug(api.GetAddresses)
+ UpdateAddress = self.debug(api.UpdateAddress)
+ AddAddressTypeToAddress = self.debug(api.AddAddressTypeToAddress)
+ for i in range(n):
address_fields = random_address()
site_id = random.sample(self.site_ids, 1)[0]
- AddSiteAddress = self.debug(api.AddSiteAddress)
address_id = AddSiteAddress(auth, site_id, address_fields)
if address_id is None: continue
address_ids.append(address_id)
# Check address
- GetAddresses = self.debug(api.GetAddresses)
addresses = GetAddresses(auth, [address_id])
if addresses is None: continue
address = addresses[0]
# Update address
address_fields = random_address()
- UpdateAddress = self.debug(api.UpdateAddress)
result = UpdateAddress(auth, address_id, address_fields)
# Check again
if addresses is None: continue
address = addresses[0]
self.isequal(address, address_fields, 'UpdateAddress - isequal')
-
+
+ # Add Address Type
+ address_type_id = random.sample(self.address_type_ids, 1)[0]
+ AddAddressTypeToAddress(auth, address_type_id, address_id)
+
+ # check adress type
+ addresses = GetAddresses(auth, [address_id], ['address_type_ids'])
+ if addresses is None or not addresses: continue
+ address = addresses[0]
+ self.islistequal([address_type_id], address['address_type_ids'], 'AddAddressTypeToAddress - check')
+
addresses = GetAddresses(auth, address_ids)
if addresses is not None:
self.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal')
def DeleteAddresses(self):
DeleteAddress = self.debug(api.DeleteAddress)
- # Delete site addresses
+ DeleteAddressTypeFromAddress = self.debug(api.DeleteAddressTypeFromAddress)
+ GetAddresses = self.debug(api.GetAddresses)
+
+ # Delete attributes mananually first
+ addresses = GetAddresses(auth, self.address_ids, ['address_id', 'address_type_ids'])
+ if addresses is None or not addresses: return 0
+ address = addresses[0]
+
+ if address['address_type_ids']:
+ address_type_id = random.sample(address['address_type_ids'], 1)[0]
+ DeleteAddressTypeFromAddress(auth, address_type_id, address['address_id'])
+
+ # check address_type_ids
+ addresses = GetAddresses(auth, [address['address_id']], ['address_type_ids'])
+ if addresses is None or not addresses: return 0
+ address = addresses[0]
+ self.islistequal([], address['address_type_ids'], 'DeleteAddressTypeFromAddress - check')
+
+ # Delete site addresses
for address_id in self.address_ids:
result = DeleteAddress(auth, address_id)
# Check
- GetAddresses = self.debug(api.GetAddresses)
addresses = GetAddresses(auth, self.address_ids)
self.islistequal(addresses, [], 'DeleteAddress - check')
if self.config.verbose:
def SliceAttributeTypes(self, n = 2):
attribute_type_ids = []
- AddSliceAttributeType = self.debug(api.AddSliceAttribute)
- GetSliceAttributeTypes = self.debug(api.GetSliceAttributes)
- UpdateSliceAttributeType = self.debug(api.UpdateSliceAttribute)
+ AddSliceAttributeType = self.debug(api.AddSliceAttributeType)
+ GetSliceAttributeTypes = self.debug(api.GetSliceAttributeTypes)
+ UpdateSliceAttributeType = self.debug(api.UpdateSliceAttributeType)
for i in range(n):
attribute_type_fields = random_attribute_type()
attribute_type_ids.append(attribute_type_id)
# Check slice_attribute_type
- attribute_types = GeddtSliceAttribute_types(auth, [attribute_type_id])
+ attribute_types = GetSliceAttributeTypes(auth, [attribute_type_id])
if attribute_types is None: continue
attribute_type = attribute_types[0]
- self.isequal(attribute_type, attribute_types_fields, 'AddSliceAttributeType - isequal')
+ self.isequal(attribute_type, attribute_type_fields, 'AddSliceAttributeType - isequal')
# Update slice_attribute_type
attribute_type_fields = random_attribute_type()
def DeleteSliceAttributeTypes(self):
DeleteSliceAttributeType = self.debug(api.DeleteSliceAttributeType)
- GetSliceAttributeTypes = self.debug(api.GetSliceAtttributeTypes)
+ GetSliceAttributeTypes = self.debug(api.GetSliceAttributeTypes)
# Delete slice_attribute_type
- for slice_attribute_type_id in self.slice_attribute_types_ids:
+ for slice_attribute_type_id in self.slice_attribute_type_ids:
result = DeleteSliceAttributeType(auth, slice_attribute_type_id)
# Check
- slice_attribute_type_ids = GetSliceAttributeTypes(auth, self.slice_attribute_type_id)
- self.islistequal(slice_attribute_type_ids, [], 'DeleteSliceAttributeTypes - check')
+ slice_attribute_types = GetSliceAttributeTypes(auth, self.slice_attribute_type_ids)
+ self.islistequal(slice_attribute_types, [], 'DeleteSliceAttributeTypes - check')
if self.config.verbose:
utils.header("Deleted slice_attributes: %s" % self.slice_attribute_type_ids)
self.isequal(instantiation, inst, 'AddSliceInstantiation - isequal')
- instantiations = GetSliceInstantiations(auth, insts)
+ instantiations = GetSliceInstantiations(auth)
if instantiations is not None:
+ instantiations = filter(lambda x: x in insts, instantiations)
self.islistequal(insts, instantiations, 'GetSliceInstantiations - isequal')
if self.config.verbose:
instantiations = filter(lambda x: x in self.slice_instantiations, instantiations)
self.islistequal(instantiations, [], 'DeleteSliceInstantiation - check')
if self.config.verbose:
- utils.header("Deleted slice instantiations" % self.slice_instantiations)
+ utils.header("Deleted slice instantiations: %s" % self.slice_instantiations)
self.slice_instantiations = []
AddSlice = self.debug(api.AddSlice)
GetSlices = self.debug(api.GetSlices)
UpdateSlice = self.debug(api.UpdateSlice)
- AddSliceToNode = self.debug(api.AddSliceToNodes)
+ AddSliceToNodes = self.debug(api.AddSliceToNodes)
for i in range(n):
# Add Site
slice_fields = random_slice()
# Add node
node_id = random.sample(self.node_ids, 1)[0]
- AddSliceToNode(auth, slice_id, node_id)
+ AddSliceToNodes(auth, slice_id, [node_id])
# check node
slices = GetSlices(auth, [slice_id], ['node_ids'])
# Add slice attribute
attribute_fields = random_slice_attribute()
slice_id = random.sample(self.slice_ids, 1)[0]
- attribute_id = AddSliceAttribute(auth, slice_id, attribute_fields)
+ type = attribute_fields['attribute_type_id']
+ value = attribute_fields['value']
+ attribute_id = AddSliceAttribute(auth, slice_id, type, value)
if attribute_id is None: continue
# Should return a unique id
self.isequal(attribute, attribute_fields, 'AddSliceAttribute - isequal')
# Update attribute
- attribute_fields = random_attribute()
- result = UpdateSliceAttribute(auth, attribute_id, attribute_fields)
+ attribute_fields = random_slice_attribute()
+ type = attribute_fields['attribute_type_id']
+ value = attribute_fields['value']
+ result = UpdateSliceAttribute(auth, attribute_id, value)
# Check again
attributes = GetSliceAttributes(auth, [attribute_id])
attributes = GetSliceAttributes(auth, attribute_ids)
if attributes is not None:
- attr_ids = [a['attribute_id'] for a in attributes]
+ attr_ids = [a['slice_attribute_id'] for a in attributes]
self.islistequal(attribute_ids, attr_ids, 'GetSliceAttributes - isequal')
if self.config.verbose:
utils.header("Added slice attributes: %s" % attribute_ids)
self.slice_attribute_ids = []
+ def InitScripts(self, n = 2):
+ initscript_ids = []
+ AddInitScript = self.debug(api.AddInitScript)
+ GetInitScripts = self.debug(api.GetInitScripts)
+ UpdateInitScript = self.debug(api.UpdateInitScript)
+ for i in range(n):
+ # Add Peer
+ initscript_fields = random_initscript()
+ initscript_id = AddInitScript(auth, initscript_fields)
+
+ # Should return a unique id
+ self.isunique(initscript_id, initscript_ids, 'AddInitScript - isunique')
+ initscript_ids.append(initscript_id)
+ initscripts = GetInitScripts(auth, [initscript_id])
+ if initscripts is None: continue
+ initscript = initscripts[0]
+ self.isequal(initscript, initscript_fields, 'AddInitScript - isequal')
+
+ # Update Peer
+ initscript_fields = random_initscript()
+ result = UpdateInitScript(auth, initscript_id, initscript_fields)
+
+ # Check again
+ initscripts = GetInitScripts(auth, [initscript_id])
+ if initscripts is None: continue
+ initscript = initscripts[0]
+ self.isequal(initscript, initscript_fields, 'UpdateInitScript - isequal')
+
+ initscripts = GetInitScripts(auth, initscript_ids)
+ if initscripts is not None:
+ self.islistequal(initscript_ids, [i['initscript_id'] for i in initscripts], 'GetInitScripts -isequal')
+
+ if self.config.verbose:
+ utils.header("Added initscripts: %s" % initscript_ids)
+
+ return initscript_ids
+
+ def DeleteInitScripts(self):
+ # Delete all initscripts
+ DeleteInitScript = self.debug(api.DeleteInitScript)
+ GetInitScripts = self.debug(api.GetInitScripts)
+ for initscript_id in self.initscript_ids:
+ result = DeleteInitScript(auth, initscript_id)
+
+ # Check if peers are deleted
+ initscripts = GetInitScripts(auth, self.initscript_ids)
+ self.islistequal(initscripts, [], 'DeletInitScript - check')
+
+ if self.config.verbose:
+ utils.header("Deleted initscripts: %s" % self.initscript_ids)
+ self.initscript_ids =[]
+
+ def Roles(self, n = 2):
+ role_ids = []
+ AddRole = self.debug(api.AddRole)
+ GetRoles = self.debug(api.GetRoles)
+ for i in range(n):
+ # Add Role
+ role_fields = random_role()
+ role_id = role_fields['role_id']
+ name = role_fields['name']
+ AddRole(auth, role_id, name)
+
+ # Should return a unique id
+ self.isunique(role_id, role_ids, 'AddRole - isunique')
+ role_ids.append(role_id)
+ roles = GetRoles(auth)
+ if roles is None: continue
+ roles = filter(lambda x: x['role_id'] in [role_id], roles)
+ role = roles[0]
+ self.isequal(role, role_fields, 'AddRole - isequal')
+
+ roles = GetRoles(auth)
+ if roles is not None:
+ roles = filter(lambda x: x['role_id'] in role_ids, roles)
+ self.islistequal(role_ids, [r['role_id'] for r in roles], 'GetRoles - isequal')
+
+ if self.config.verbose:
+ utils.header("Added roles: %s" % role_ids)
+
+ return role_ids
+
+ def DeleteRoles(self):
+ # Delete all roles
+ DeleteRole = self.debug(api.DeleteRole)
+ GetRoles = self.debug(api.GetRoles)
+ for role_id in self.role_ids:
+ result = DeleteRole(auth, role_id)
+
+ # Check if peers are deleted
+ roles = GetRoles(auth)
+ roles = filter(lambda x: x['role_id'] in self.role_ids, roles)
+ self.islistequal(roles, [], 'DeleteRole - check' % self.role_ids)
+
+ if self.config.verbose:
+ utils.header("Deleted roles: %s" % self.role_ids)
+ self.role_ids =[]
+
def Persons(self, n = 3):
person_ids = []
self.person_ids = []
+ def KeyTypes(self, n = 2):
+ key_types = []
+ AddKeyType = self.debug(api.AddKeyType)
+ GetKeyTypes = self.debug(api.GetKeyTypes)
+ for i in range(n):
+ # Add key type
+ keytype = randstr(10)
+ result = AddKeyType(auth, keytype)
+ if result is None: continue
+
+ # Check key types
+ key_types.append(keytype)
+ keytypes = GetKeyTypes(auth)
+ if not keytypes: continue
+ keytypes = filter(lambda x: x in [keytype], keytypes)
+ if not keytypes: continue
+ self.isequal(keytype, keytypes[0], 'AddKeyType - isequal')
+
+ # Check all
+ keytypes = GetKeyTypes(auth)
+ if keytypes is not None:
+ keytypes = filter(lambda x: x in key_types, keytypes)
+ self.islistequal(key_types, keytypes, 'GetKeyTypes - isequal')
+
+ if self.config.verbose:
+ utils.header("Added key types: %s" % key_types)
+
+ return key_types
+
+ def DeleteKeyTypes(self):
+ DeleteKeyType = self.debug(api.DeleteKeyType)
+ GetKeyTypes = self.debug(api.GetKeyTypes)
+ for key_type in self.key_types:
+ result = DeleteKeyType(auth, key_type)
+
+ # Check if key types are deleted
+ key_types = GetKeyTypes(auth)
+ key_types = filter(lambda x: x in self.key_types, key_types)
+ self.islistequal(key_types, [], 'DeleteKeyType - check')
+
+ if self.config.verbose:
+ utils.header("Deleted key types %s" % self.key_types)
+
+ self.key_types = []
def Keys(self, n = 3):
key_ids = []
# Blacklist first key, Delete rest
GetKeys = self.debug(api.GetKeys)
- keys = GetKeys(auth, self.key_ids)
- if keys is None or not keys: return 0
- key = keys[0]
-
+ DeleteKey = self.debug(api.DeleteKey)
BlacklistKey = self.debug(api.BlacklistKey)
- BlacklistKey(auth, key['key_id'])
- keys = GetKeys(auth, [key['key_id']])
+ key_id = self.key_ids.pop()
+ BlacklistKey(auth, key_id)
+ keys = GetKeys(auth, [key_id])
self.islistequal(keys, [], 'BlacklistKey - check')
if self.config.verbose:
- utils.header("Blacklisted key: %s" % key['key_id'])
+ utils.header("Blacklisted key: %s" % key_id)
- DeleteKey = self.debug(api.DeleteKey)
for key_id in self.key_ids:
DeleteKey(auth, key_id)
result = DeleteBootState(auth, boot_state)
# Check if bootsates are deleted
- boot_states = GetBootStates(auth, self.boot_states)
- self.islistequa(boot_states, [], 'DeleteBootState check')
+ boot_states = GetBootStates(auth)
+ boot_states = filter(lambda x: x in self.boot_states, boot_states)
+ self.islistequal(boot_states, [], 'DeleteBootState check')
if self.config.verbose:
utils.header("Deleted boot_states: %s" % self.boot_states)
def Peers(self, n = 2):
peer_ids = []
+ AddPeer = self.debug(api.AddPeer)
+ GetPeers = self.debug(api.GetPeers)
+ UpdatePeer = self.debug(api.UpdatePeer)
for i in range(n):
# Add Peer
peer_fields = random_peer()
- AddPeer = self.debug(api.AddPeer)
peer_id = AddPeer(auth, peer_fields)
# Should return a unique id
self.isunique(peer_id, peer_ids, 'AddPeer - isunique')
peer_ids.append(peer_id)
- GetPeers = self.debug(api.GetPeers)
peers = GetPeers(auth, [peer_id])
if peers is None: continue
peer = peers[0]
# Update Peer
peer_fields = random_peer()
- UpdatePeer = self.debug(api.UpdatePeer)
result = UpdatePeer(auth, peer_id, peer_fields)
# Check again
def DeletePeers(self):
# Delete all peers
DeletePeer = self.debug(api.DeletePeer)
+ GetPeers = self.debug(api.GetPeers)
for peer_id in self.peer_ids:
result = DeletePeer(auth, peer_id)
# Check if peers are deleted
- GetPeers = self.debug(api.GetPeers)
peers = GetPeers(auth, self.peer_ids)
self.islistequal(peers, [], 'DeletePeer - check' % self.peer_ids)
if self.config.verbose:
- utils.header("Deleted sites: %s" % self.peer_ids)
+ utils.header("Deleted peers: %s" % self.peer_ids)
self.peer_ids =[]
def ConfFiles(self, n = 2):
return conf_file_ids
- def DeleteConfFiles(self):
+ def DeleteConfFiles(self):
GetConfFiles = self.debug(api.GetConfFiles)
+ DeleteConfFile = self.debug(api.DeleteConfFile)
+ DeleteConfFileFromNode = self.debug(api.DeleteConfFileFromNode)
+ DeleteConfFileFromNodeGroup = self.debug(api.DeleteConfFileFromNodeGroup)
+
conf_files = GetConfFiles(auth, self.conf_file_ids)
- if conf_fiels is None or not conf_files: return 0
+ if conf_files is None or not conf_files: return 0
conf_file = conf_files[0]
-
if conf_file['node_ids']:
node_id = random.sample(conf_file['node_ids'], 1)[0]
- DeleteConfFileFromNode = self.debug(api.DeleteConfFileFromNode)
DeleteConfFileFromNode(auth, conf_file['conf_file_id'], node_id)
-
if conf_file['nodegroup_ids']:
nodegroup_id = random.sample(conf_file['nodegroup_ids'], 1)[0]
- DeleteConfFileFromNodeGroup = self.debug(api.DeleteConfFileFromNodeGroup)
- DeleteConfFileFromNode(auth, conf_file['conf_file_id'], nodegroup_id)
+ DeleteConfFileFromNodeGroup(auth, conf_file['conf_file_id'], nodegroup_id)
# check
- conf_files = GetConfFiles(auth, conf_file['conf_file_id'], ['node_ids', 'nodegroup_ids'])
+ conf_files = GetConfFiles(auth, [conf_file['conf_file_id']], ['node_ids', 'nodegroup_ids'])
if conf_files is None or not conf_files: return 0
conf_file = conf_files[0]
self.islistequal([], conf_file['node_ids'], 'AddConfFileToNode - check')
self.islistequal([], conf_file['nodegroup_ids'], 'AddConfFileToNodeGroup - check')
- DeleteConfFile = self.debug(api.DeleteConfFile)
for conf_file_id in self.conf_file_ids:
DeleteConfFile(auth, conf_file_id)
utils.header("Deleted conf_files: %s" % self.conf_file_ids)
self.conf_file_ids = []
+
+ def NodeNetworks(self, n = 4):
+ nodenetwork_ids = []
+ AddNodeNetwork = self.debug(api.AddNodeNetwork)
+ UpdateNodeNetwork = self.debug(api.UpdateNodeNetwork)
+ GetNodeNetworks = self.debug(api.GetNodeNetworks)
+
+ for i in range(n):
+ # Add Node Network
+ nodenetwork_fields = random_nodenetwork()
+ node_id = random.sample(self.node_ids, 1)[0]
+ nodenetwork_id = AddNodeNetwork(auth, node_id, nodenetwork_fields)
+ if nodenetwork_id is None: continue
+
+ # Should return a unique id
+ self.isunique(nodenetwork_ids, nodenetwork_ids, 'AddNodeNetwork - isunique')
+ nodenetwork_ids.append(nodenetwork_id)
+
+ # check Node Network
+ nodenetworks = GetNodeNetworks(auth, [nodenetwork_id])
+ if nodenetworks is None: continue
+ nodenetwork = nodenetworks[0]
+ self.isequal(nodenetwork, nodenetwork_fields, 'AddNodeNetwork - isequal')
+
+ # Update NodeNetwork
+ nodenetwork_fields = random_nodenetwork()
+ UpdateNodeNetwork(auth, nodenetwork_id, nodenetwork_fields)
+
+ # Check again
+ nodenetworks = GetNodeNetworks(auth, [nodenetwork_id])
+ if nodenetworks is None: continue
+ nodenetwork = nodenetworks[0]
+ self.isequal(nodenetwork, nodenetwork_fields, 'UpdateNodeNetwork - isequal')
+
+ nodenetworks = GetNodeNetworks(auth, nodenetwork_ids)
+ if nodenetworks is not None:
+ self.islistequal(nodenetwork_ids, [n['nodenetwork_id'] for n in nodenetworks], 'GetNodeNetworks - isequal')
+
+ if self.config.verbose:
+ utils.header('Added nodenetworks: %s' % nodenetwork_ids)
+
+ return nodenetwork_ids
+
+ def DeleteNodeNetworks(self):
+ GetNodeNetworks = self.debug(api.GetNodeNetworks)
+ DeleteNodeNetwork = self.debug(api.DeleteNodeNetwork)
+
+ for nodenetwork_id in self.nodenetwork_ids:
+ DeleteNodeNetwork(auth, nodenetwork_id)
+
+ # check
+ nodenetworks = GetNodeNetworks(auth, self.nodenetwork_ids)
+ self.islistequal(nodenetworks, [], 'DeleteNodeNetwork - check')
+
+ if self.config.verbose:
+ utils.header("Deleted nodenetworks: %s " % self.nodenetwork_ids)
+ self.nodenetwork_ids = []
+ def NodeNetworkSettings(self, n=2):
+ nodenetwork_setting_ids = []
+ AddNodeNetworkSetting = self.debug(api.AddNodeNetworkSetting)
+ UpdateNodeNetworkSetting = self.debug(api.UpdateNodeNetworkSetting)
+ GetNodeNetworkSettings = self.debug(api.GetNodeNetworkSettings)
+
+ for nodenetwork_id in self.nodenetwork_ids:
+ # Add Node Network
+ nodenetwork_setting_fields = random_nodenetwork_setting()
+ #nodenetwork_id = random.sample(self.nodenetwork_ids, 1)[0]
+ nodenetwork_setting_type_id = random.sample(self.nodenetwork_setting_type_ids, 1)[0]
+ value = nodenetwork_setting_fields['value']
+ nodenetwork_setting_id = AddNodeNetworkSetting(auth, nodenetwork_id, nodenetwork_setting_type_id, value)
+ if nodenetwork_setting_id is None: continue
+
+ # Should return a unique id
+ self.isunique(nodenetwork_setting_ids, nodenetwork_setting_ids, 'AddNodeNetworkSetting - isunique')
+ nodenetwork_setting_ids.append(nodenetwork_setting_id)
+
+ # check Node Network
+ nodenetwork_settings = GetNodeNetworkSettings(auth, [nodenetwork_setting_id])
+ if nodenetwork_settings is None: continue
+ nodenetwork_setting = nodenetwork_settings[0]
+ self.isequal(nodenetwork_setting, nodenetwork_setting_fields, 'AddNodeNetworkSetting - isequal')
+
+ # Update NodeNetworkSetting
+ nodenetwork_setting_fields = random_nodenetwork_setting()
+ value = nodenetwork_setting_fields['value']
+ UpdateNodeNetworkSetting(auth, nodenetwork_setting_id, value)
+
+ # Check again
+ nodenetwork_settings = GetNodeNetworkSettings(auth, [nodenetwork_setting_id])
+ if nodenetwork_settings is None: continue
+ nodenetwork_setting = nodenetwork_settings[0]
+ self.isequal(nodenetwork_setting, nodenetwork_setting_fields, 'UpdateNodeNetworkSetting - isequal')
+
+ nodenetwork_settings = GetNodeNetworkSettings(auth, nodenetwork_setting_ids)
+ if nodenetwork_settings is not None:
+ self.islistequal(nodenetwork_setting_ids, [n['nodenetwork_setting_id'] for n in nodenetwork_settings], 'GetNodeNetworkSettings - isequal')
+
+ if self.config.verbose:
+ utils.header('Added nodenetwork_settings: %s' % nodenetwork_setting_ids)
+
+ return nodenetwork_setting_ids
+
+ def DeleteNodeNetworkSettings(self):
+ GetNodeNetworkSettings = self.debug(api.GetNodeNetworkSettings)
+ DeleteNodeNetworkSetting = self.debug(api.DeleteNodeNetworkSetting)
+
+ for nodenetwork_setting_id in self.nodenetwork_setting_ids:
+ DeleteNodeNetworkSetting(auth, nodenetwork_setting_id)
+
+ # check
+ nodenetwork_settings = GetNodeNetworkSettings(auth, self.nodenetwork_setting_ids)
+ self.islistequal(nodenetwork_settings, [], 'DeleteNodeNetworkSetting - check')
+
+ if self.config.verbose:
+ utils.header("Deleted nodenetwork settings: %s " % self.nodenetwork_setting_ids)
+ self.nodenetwork_setting_ids = []
+ def NodeNetworkSettingTypes(self, n = 2):
+ nodenetwork_setting_type_ids = []
+ AddNodeNetworkSettingType = self.debug(api.AddNodeNetworkSettingType)
+ UpdateNodeNetworkSettingType = self.debug(api.UpdateNodeNetworkSettingType)
+ GetNodeNetworkSettingTypes = self.debug(api.GetNodeNetworkSettingTypes)
+
+ for i in range(n):
+ # Add Node Network Settings Type
+ nodenetwork_setting_type_fields = random_nodenetwork_setting_type()
+ nodenetwork_setting_type_id = AddNodeNetworkSettingType(auth, nodenetwork_setting_type_fields)
+ if nodenetwork_setting_type_id is None: continue
+
+ # Should return a unique id
+ self.isunique(nodenetwork_setting_type_ids, nodenetwork_setting_type_ids, 'AddNodeNetworkSettingType - isunique')
+ nodenetwork_setting_type_ids.append(nodenetwork_setting_type_id)
+
+ # check Node Network Settings Type
+ nodenetwork_setting_types = GetNodeNetworkSettingTypes(auth, [nodenetwork_setting_type_id])
+ if nodenetwork_setting_types is None: continue
+ nodenetwork_setting_type = nodenetwork_setting_types[0]
+ self.isequal(nodenetwork_setting_type, nodenetwork_setting_type_fields, 'AddNodeNetworkSettingType - isequal')
+
+ # Update NodeNetworkSetting
+ nodenetwork_setting_type_fields = random_nodenetwork_setting_type()
+ UpdateNodeNetworkSettingType(auth, nodenetwork_setting_type_id, nodenetwork_setting_type_fields)
+
+ # Check again
+ nodenetwork_setting_types = GetNodeNetworkSettingTypes(auth, [nodenetwork_setting_type_id])
+ if nodenetwork_setting_types is None: continue
+ nodenetwork_setting_type = nodenetwork_setting_types[0]
+ self.isequal(nodenetwork_setting_type, nodenetwork_setting_type_fields, 'UpdateNodeNetworkSettingType - isequal')
+
+ nodenetwork_setting_types = GetNodeNetworkSettingTypes(auth, nodenetwork_setting_type_ids)
+ if nodenetwork_setting_types is not None:
+ self.islistequal(nodenetwork_setting_type_ids, [n['nodenetwork_setting_type_id'] for n in nodenetwork_setting_types], 'GetNodeNetworkSettingTypes - isequal')
+
+ if self.config.verbose:
+ utils.header('Added nodenetwork_setting_types: %s' % nodenetwork_setting_type_ids)
+
+ return nodenetwork_setting_type_ids
+
+ def DeleteNodeNetworkSettingTypes(self):
+ GetNodeNetworkSettingTypes = self.debug(api.GetNodeNetworkSettingTypes)
+ DeleteNodeNetworkSettingType = self.debug(api.DeleteNodeNetworkSettingType)
+
+ for nodenetwork_setting_type_id in self.nodenetwork_setting_type_ids:
+ DeleteNodeNetworkSettingType(auth, nodenetwork_setting_type_id)
+
+ # check
+ nodenetwork_setting_types = GetNodeNetworkSettingTypes(auth, self.nodenetwork_setting_type_ids)
+ self.islistequal(nodenetwork_setting_types, [], 'DeleteNodeNetworkSettingType - check')
+
+ if self.config.verbose:
+ utils.header("Deleted nodenetwork setting types: %s " % self.nodenetwork_setting_type_ids)
+ self.nodenetwork_setting_type_ids = []
+
if __name__ == '__main__':
args = tuple(sys.argv[1:])
api_unit_test()(*args)