-#!/usr/bin/python
+#!/usr/bin/env /usr/share/plc_api/plcsh
#
# Test script example
#
from pprint import pprint
from string import letters, digits, punctuation
+import traceback
from traceback import print_exc
import base64
import os, sys
import socket
+import struct
import xmlrpclib
import time
from Test import Test
from qa import utils
from qa.Config import Config
-from qa.logger import Logfile, log
+from qa.logger import Logfile, log
from random import Random
random = Random()
+boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins']
+roles = [10,20,30,40]
+methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown']
+key_types = [u'ssh']
+types = [u'ipv4']
+attribute_types = range(6,20)
+login_bases = ['pl']
-config = Config()
-auth = config.auth
-
-try: boot_states = config.api.GetBootStates(auth)
-except: boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins']
-
-try: roles = [role['name'] for role in config.api.GetRoles(auth)]
-except: roles = [u'admin', u'pi', u'user', u'tech']
-
-try: methods = config.api.GetNetworkMethods(auth)
-except: methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown']
-
-try:types = config.api.GetNetworkTypes(auth)
-except: types = [u'ipv4']
def randfloat(min = 0.0, max = 1.0):
return float(min) + (random.random() * (float(max) - float(min)))
def randint(min = 0, max = 1):
return int(randfloat(min, max + 1))
+def randbool():
+ return random.sample([True, False], 1)[0]
# See "2.2 Characters" in the XML specification:
#
# #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
'login_base': randstr(20, letters).lower(),
'latitude': int(randfloat(-90.0, 90.0) * 1000) / 1000.0,
'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0,
+ 'max_slices': 10
}
-
+
+def random_peer():
+ bits = 2048
+ return {
+ 'peername': randstr(254),
+ 'peer_url': "https://" + randhostname() + "/",
+ 'key': base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))),
+ 'cacert': base64.b64encode(''.join(randstr(bits / 8).encode("utf-8")))
+ }
def random_address_type():
return {
'name': randstr(20),
def random_slice():
return {
- 'name': site['login_base'] + "_" + randstr(11, letters).lower(),
+ 'name': random.sample(login_bases, 1)[0] + "_" + randstr(11, letters).lower(),
'url': "http://" + randhostname() + "/",
'description': randstr(2048),
}
'boot_state': random.sample(boot_states, 1)[0],
'model': randstr(255),
'version': randstr(64),
+ 'session': randstr(20)
}
def random_nodenetwork():
'bwlimit': randint(500000, 10000000),
}
- if method != 'dhcp':
+ if nodenetwork_fields['method'] != 'dhcp':
ip = randint(0, 0xffffffff)
netmask = (0xffffffff << randint(2, 31)) & 0xffffffff
network = ip & netmask
return nodenetwork_fields
+def random_nodenetwork_setting():
+ return {
+ 'value': randstr(20)
+ }
+
+def random_nodenetwork_setting_type():
+ return {
+ 'name': randstr(20),
+ 'description': randstr(50),
+ 'category': randstr(20),
+ 'min_role_id': random.sample(roles, 1)[0]
+ }
+
def random_pcu():
return {
'hostname': randhostname(),
return {
'name': randstr(100),
'description': randstr(254),
- 'min_role_id': random.sample(roles.values(), 1)[0],
+ 'min_role_id': int(random.sample(roles, 1)[0]),
}
-def isequal(object_fields, expected_fields):
- try:
- for field in expected_fields:
- assert field in object_fields
- assert object_fields[field] == expected_fields[field]
- except:
- return False
- return True
-
-def islistequal(list1, list2):
- try:
- assert set(list1) == set(list2)
- except:
- return False
- return True
-
-def isunique(id, id_list):
- try:
- assert id not in id_list
- except:
- return False
- return True
-
+def random_pcu_type():
+ return {
+ 'model': randstr(254),
+ 'name': randstr(254),
+ }
+
+def random_pcu_protocol_type():
+ return {
+ 'port': randint(0, 65535),
+ 'protocol': randstr(254),
+ 'supported': randbool()
+ }
+
+def random_slice_instantiation():
+ return {
+ 'instantiation': randstr(10)
+ }
+def random_slice_attribute():
+ return {
+ 'attribute_type_id': random.sample(attribute_types, 1)[0],
+ 'name': randstr(50),
+ 'value': randstr(20)
+ }
+
+def random_initscript():
+ return {
+ 'name': randstr(20),
+ 'enabled': randbool(),
+ 'script': randstr(200)
+ }
+
+def random_role():
+ return {
+ 'role_id': randint(1000),
+ 'name': randstr(50)
+ }
+
+def random_message():
+ return {
+ 'message_id': randstr(10),
+ 'subject': randstr(100),
+ 'template': randstr(254),
+ }
+
class api_unit_test(Test):
-
+
+ def __init__(self, config = None, logfile = None):
+ if not config: config = Config()
+ if not logfile: logfile = Logfile(config.logfile.dir + 'api-unittest.log')
+ Test.__init__(self, config, logfile)
+
def call(self,
+ plc_name = None,
+ boot_states = 2,
sites = 2,
- nodes = 4,
+ peers = 2,
address_types = 3,
addresses = 2,
- persons = 10,
- keys = 3
+ pcu_types = 2,
+ pcu_protocol_types = 2,
+ pcus = 2,
+ network_methods = 2,
+ network_types = 2,
+ nodegroups = 3,
+ nodes = 4,
+ conf_files = 3,
+ nodenetworks = 4,
+ nodenetwork_setting_types = 2,
+ nodenetwork_settings = 2,
+ slice_attribute_types = 2,
+ slice_instantiations = 2,
+ slices = 4,
+ slice_attributes = 4,
+ initscripts = 4,
+ roles = 2,
+ persons = 4,
+ key_types = 3,
+ keys = 3,
+ messages = 2
):
- self.api = self.config.api
- self.auth = self.config.auth
- self.all_methods = set(self.api.system.listMethods())
+ plc = self.config.get_plc(plc_name)
+ self.api = plc.config.api
+ self.auth = plc.config.auth
+ #self.logfile = Logfile(self.config.logfile.dir + 'api-unittest.log')
+
+ # Filter out deprecated (Adm) and boot Methods
+ current_methods = lambda method: not method.startswith('Adm') and \
+ not method.startswith('Slice') and \
+ not method.startswith('Boot') and \
+ not method.startswith('Anon') and \
+ not method.startswith('system')
+ self.all_methods = set(filter(current_methods, self.api.system.listMethods()))
self.methods_tested = set()
self.methods_failed = set()
+ # Begin testing methods
try:
try:
- self.site_ids = self.Sites(sites)
- self.node_ids = self.Nodes(nodes)
+ if hasattr(self, 'BootStates'): self.boot_states = self.BootStates(boot_states)
+ if hasattr(self, 'Sites'): self.site_ids = self.Sites(sites)
+ if hasattr(self, 'Peers'): self.peer_ids = self.Peers(peers)
+ if hasattr(self, 'AddressTypes'): self.address_type_ids = self.AddressTypes(address_types)
+ if hasattr(self, 'Addresses'): self.address_ids = self.Addresses(addresses)
+ if hasattr(self, 'PCUTypes'): self.pcu_type_ids = self.PCUTypes(pcu_types)
+ if hasattr(self, 'PCUProtocolTypes'): self.pcu_protocol_type_ids = self.PCUProtocolTypes(pcu_protocol_types)
+ if hasattr(self, 'PCUs'): self.pcu_ids = self.PCUs(pcus)
+ if hasattr(self, 'NetworkMethods'): self.network_methods = self.NetworkMethods()
+ if hasattr(self, 'NetworkTypes'): self.network_types = self.NetworkTypes()
+ if hasattr(self, 'NodeGroups'): self.nodegroup_ids = self.NodeGroups()
+ if hasattr(self, 'Nodes'): self.node_ids = self.Nodes(nodes)
+ if hasattr(self, 'ConfFiles'): self.conf_file_ids = self.ConfFiles(conf_files)
+ if hasattr(self, 'NodeNetworks'): self.nodenetwork_ids = self.NodeNetworks(nodenetworks)
+ if hasattr(self, 'NodeNetworkSettingTypes'): self.nodenetwork_setting_type_ids = self.NodeNetworkSettingTypes(nodenetwork_setting_types)
+ if hasattr(self, 'NodeNetworkSettings'): self.nodenetwork_setting_ids = self.NodeNetworkSettings(nodenetwork_settings)
+ if hasattr(self, 'SliceAttributeTypes'): self.slice_attribute_type_ids = self.SliceAttributeTypes(slice_attribute_types)
+ if hasattr(self, 'SliceInstantiations'): self.slice_instantiations = self.SliceInstantiations(slice_instantiations)
+ if hasattr(self, 'Slices'): self.slice_ids = self.Slices(slices)
+ if hasattr(self, 'SliceAttributes'): self.slice_attribute_ids = self.SliceAttributes(slice_attributes)
+ if hasattr(self, 'InitScripts'): self.initscript_ids = self.InitScripts(initscripts)
+ if hasattr(self, 'Roles'): self.role_ids = self.Roles(roles)
+ if hasattr(self, 'Persons'): self.person_ids = self.Persons(persons)
+ if hasattr(self, 'KeyTypes'): self.key_types = self.KeyTypes(key_types)
+ if hasattr(self, 'Keys'): self.key_ids = self.Keys(keys)
+ if hasattr(self, 'Messages'): self.message_ids = self.Messages(messages)
+ if hasattr(self, 'Sessions'): self.session_ids = self.Sessions()
+
+ # Test misc Get calls
+ if hasattr(self, 'GenerateNodeConfFile'): self.GenerateNodeConfFile()
+ if hasattr(self, 'GetBootMedium'): self.GetBootMedium()
+ if hasattr(self, 'GetEventObjects'): self.event_object_ids = self.GetEventObjects()
+ if hasattr(self, 'GetEvents'): self.event_ids = self.GetEvents()
+ if hasattr(self, 'GetPeerData'): self.GetPeerData()
+ if hasattr(self, 'GetPeerName'): self.GetPeerName()
+ if hasattr(self, 'GetPlcRelease'): self.GetPlcRelease()
+ if hasattr(self, 'GetSliceKeys'): self.GetSliceKeys()
+ if hasattr(self, 'GetSliceTicket'): self.GetSliceTicket()
+ if hasattr(self, 'GetSlicesMD5'): self.GetSlicesMD5()
+ if hasattr(self, 'GetSlivers'): self.GetSlivers()
+ if hasattr(self, 'GetWhitelist'): self.GetWhitelist()
+
+ # Test various administrative methods
+ if hasattr(self, 'NotifyPersons'): self.NotifyPersons()
+ if hasattr(self, 'NotifySupport'): self.NotifySupport()
+ if hasattr(self, 'RebootNode'): self.RebootNode()
+ if hasattr(self, 'RefrestPeer'): self.RefreshPeer()
+ if hasattr(self, 'ResetPassword'): self.ResetPassword()
+ if hasattr(self, 'SetPersonPrimarySite'): self.SetPersonPrimarySite()
+ if hasattr(self, 'VerifyPerson'): self.VerifyPerson()
+
except:
print_exc()
finally:
try:
self.cleanup()
finally:
-
- logfile = Logfile("api-unittest.log")
+ utils.header("writing api-unittest-summary.log")
methods_ok = list(self.methods_tested.difference(self.methods_failed))
methods_failed = list(self.methods_failed)
methods_untested = list(self.all_methods.difference(self.methods_tested))
methods_ok.sort()
methods_failed.sort()
methods_untested.sort()
- print >> logfile, "\n".join([m+": [OK]" for m in methods_ok])
- print >> logfile, "\n".join([m+": [FAILED]" for m in methods_failed])
- print >> logfile, "\n".join([m+": [Not Tested]" for m in methods_untested])
+ print >> self.logfile, "\n".join([m+": [OK]" for m in methods_ok])
+ print >> self.logfile, "\n".join([m+": [FAILED]" for m in methods_failed])
+ print >> self.logfile, "\n".join([m+": [Not Tested]" for m in methods_untested])
def isequal(self, object_fields, expected_fields, method_name):
try:
self.methods_failed.update([method_name])
return False
return True
-
+
+ def isinlist(self, item, item_list, method_name):
+ try: assert item in item_list
+ except:
+ self.methods_failed.update([method_name])
+ return False
+ return True
+
def debug(self, method, method_name=None):
if method_name is None:
- method_name = method._Method__name
-
- self.methods_tested.update([method_name])
+ try:
+ method_name = method.name
+ self.methods_tested.update([method_name])
+ except:
+ method_name = method._Method__name
+ self.methods_tested.update([method_name])
+
def wrapper(*args, **kwds):
try:
return method(*args, **kwds)
except:
self.methods_failed.update([method_name])
+ print >> self.logfile, "%s%s: %s\n" % (method_name, tuple(args[1:]), traceback.format_exc())
return None
return wrapper
def cleanup(self):
- #if self.person_ids: self.DeletePersons()
- #if self.address_ids: self.DeleteAddresses()
- #if self.address_type_ids: self.DeleteAddressTypes()
+ if hasattr(self, 'session_ids'): self.DeleteSessions()
+ if hasattr(self, 'message_ids'): self.DeleteMessages()
+ if hasattr(self, 'key_types'): self.DeleteKeyTypes()
+ if hasattr(self, 'key_ids'): self.DeleteKeys()
+ if hasattr(self, 'person_ids'): self.DeletePersons()
+ if hasattr(self, 'role_ids'): self.DeleteRoles()
+ if hasattr(self, 'initscript_ids'): self.DeleteInitScripts()
+ if hasattr(self, 'slice_attribute_ids'): self.DeleteSliceAttributes()
+ if hasattr(self, 'slice_ids'): self.DeleteSlices()
+ if hasattr(self, 'slice_instantiations'): self.DeleteSliceInstantiations()
+ if hasattr(self, 'slice_attribute_type_ids'): self.DeleteSliceAttributeTypes()
+ if hasattr(self, 'nodenetwork_setting_ids'): self.DeleteNodeNetworkSettings()
+ if hasattr(self, 'nodenetwork_setting_type_ids'): self.DeleteNodeNetworkSettingTypes()
+ if hasattr(self, 'nodenetwork_ids'): self.DeleteNodeNetworks()
+ if hasattr(self, 'conf_file_ids'): self.DeleteConfFiles()
if hasattr(self, 'node_ids'): self.DeleteNodes()
+ if hasattr(self, 'nodegroup_ids'): self.DeleteNodeGroups()
+ if hasattr(self, 'network_types'): self.DeleteNetworkTypes()
+ if hasattr(self, 'network_methods'): self.DeleteNetworkMethods()
+ if hasattr(self, 'pcu_ids'): self.DeletePCUs()
+ if hasattr(self, 'pcu_protocol_type_ids'): self.DeletePCUProtocolTypes()
+ if hasattr(self, 'pcu_type_ids'): self.DeletePCUTypes()
+ if hasattr(self, 'address_ids'): self.DeleteAddresses()
+ if hasattr(self, 'address_type_ids'): self.DeleteAddressTypes()
+ if hasattr(self, 'peer_ids'): self.DeletePeers()
if hasattr(self, 'site_ids'): self.DeleteSites()
-
+ if hasattr(self, 'boot_states'): self.DeleteBootStates()
+
def Sites(self, n=4):
site_ids = []
site_fields = random_site()
UpdateSite = self.debug(self.api.UpdateSite)
result = UpdateSite(self.auth, site_id, site_fields)
+
# Check again
sites = GetSites(self.auth, [site_id])
if self.config.verbose:
utils.header("Deleted sites: %s" % self.site_ids)
- self.site_ids = []
+ self.site_ids = []
+
+ def NetworkMethods(self, n=2):
+ methods = []
+ AddNetworkMethod = self.debug(self.api.AddNetworkMethod)
+ GetNetworkMethods = self.debug(self.api.GetNetworkMethods)
+
+ for i in range(n):
+ # Add Network Method
+ net_method = randstr(10)
+ AddNetworkMethod(self.auth, net_method)
+ if net_method is None: continue
+
+ # Should return a unique id
+ self.isunique(net_method, methods, 'AddNetworkMethod - isunique')
+ methods.append(net_method)
+ net_methods = GetNetworkMethods(self.auth)
+ if net_methods is None: continue
+ net_methods = filter(lambda x: x in [net_method], net_methods)
+ method = net_methods[0]
+ self.isequal(method, net_method, 'AddNetworkMethod - isequal')
+
+
+ net_methods = GetNetworkMethods(self.auth)
+ if net_methods is not None:
+ net_methods = filter(lambda x: x in methods, net_methods)
+ self.islistequal(methods, net_methods, 'GetNetworkMethods - isequal')
+
+ if self.config.verbose:
+ utils.header("Added network methods: %s" % methods)
+
+ return methods
+
+ def DeleteNetworkMethods(self):
+ DeleteNetworkMethod = self.debug(self.api.DeleteNetworkMethod)
+ GetNetworkMethods = self.debug(self.api.GetNetworkMethods)
+ for method in self.network_methods:
+ DeleteNetworkMethod(self.auth, method)
+
+ # check
+ network_methods = GetNetworkMethods(self.auth)
+ network_methods = filter(lambda x: x in self.network_methods, network_methods)
+ self.islistequal(network_methods, [], 'DeleteNetworkMethods - check')
+
+ if self.config.verbose:
+ utils.header("Deleted network methods: %s" % self.network_methods)
+ self.network_methods = []
+
+ def NetworkTypes(self, n=2):
+ net_types = []
+ AddNetworkType = self.debug(self.api.AddNetworkType)
+ GetNetworkTypes = self.debug(self.api.GetNetworkTypes)
+
+ for i in range(n):
+ # Add Network Type
+ type = randstr(10)
+ AddNetworkType(self.auth, type)
+
+ # Should return a unique id
+ self.isunique(type, net_types, 'AddNetworkType - isunique')
+ net_types.append(type)
+ types = GetNetworkTypes(self.auth)
+ if types is None: continue
+ types = filter(lambda x: x in [type], types)
+ if types is None: continue
+ net_type = types[0]
+ self.isequal(net_type, type, 'AddNetworkType - isequal')
+
+ types = GetNetworkTypes(self.auth)
+ if types is not None:
+ types = filter(lambda x: x in net_types, types)
+ self.islistequal(types, net_types, 'GetNetworkTypes - isequal')
+
+ if self.config.verbose:
+ utils.header("Added network types: %s" % net_types)
+
+ return net_types
+
+ def DeleteNetworkTypes(self):
+ DeleteNetworkType = self.debug(self.api.DeleteNetworkType)
+ GetNetworkTypes = self.debug(self.api.GetNetworkTypes)
+ for type in self.network_types:
+ DeleteNetworkType(self.auth, type)
+
+ # check
+ network_types = GetNetworkTypes(self.auth)
+ network_types = filter(lambda x: x in self.network_types, network_types)
+ self.islistequal(network_types, [], 'DeleteNetworkTypes - check')
+
+ if self.config.verbose:
+ utils.header("Deleted network types: %s" % self.network_types)
+ self.network_types = []
+
+
+ def NodeGroups(self, n = 4):
+ nodegroup_ids = []
+ AddNodeGroup = self.debug(self.api.AddNodeGroup)
+ UpdateNodeGroup = self.debug(self.api.UpdateNodeGroup)
+ GetNodeGroups = self.debug(self.api.GetNodeGroups)
+
+ for i in range(n):
+ # Add Nodegroups
+ nodegroup_fields = random_nodegroup()
+ nodegroup_id = AddNodeGroup(self.auth, nodegroup_fields)
+ if nodegroup_id is None: continue
+
+ # Should return a unique id
+ self.isunique(nodegroup_id, nodegroup_ids, 'AddNodeGroup - isunique')
+ nodegroup_ids.append(nodegroup_id)
+ nodegroups = GetNodeGroups(self.auth, [nodegroup_id])
+ if nodegroups is None: continue
+ nodegroup = nodegroups[0]
+ self.isequal(nodegroup, nodegroup_fields, 'AddNodeGroup - isequal')
+
+ # Update NodeGroups
+ nodegroup_fields = random_nodegroup()
+ UpdateNodeGroup(self.auth, nodegroup_id, nodegroup_fields)
+
+ # Check again
+ nodegroups = GetNodeGroups(self.auth, [nodegroup_id])
+ if nodegroups is None: continue
+ nodegroup = nodegroups[0]
+ self.isequal(nodegroup, nodegroup_fields, 'UpdateNodeGroup - isequal')
+
+ nodegroups = GetNodeGroups(self.auth, nodegroup_ids)
+ if nodegroups is not None:
+ self.islistequal(nodegroup_ids, [n['nodegroup_id'] for n in nodegroups], 'GetNodeGroups - isequal')
+ if self.config.verbose:
+ utils.header("Added nodegroups: %s" % nodegroup_ids)
+
+ return nodegroup_ids
+
+ def DeleteNodeGroups(self):
+ # Delete all NodeGroups
+ GetNodeGroups = self.debug(self.api.GetNodeGroups)
+ DeleteNodeGroup = self.debug(self.api.DeleteNodeGroup)
+
+ for nodegroup_id in self.nodegroup_ids:
+ result = DeleteNodeGroup(self.auth, nodegroup_id)
+
+ # Check is nodegroups are deleted
+ nodegroups = GetNodeGroups(self.auth, self.nodegroup_ids)
+ self.islistequal(nodegroups, [], 'DeleteNodeGroup - check')
+
+ if self.config.verbose:
+ utils.header("Deleted nodegroups: %s" % self.nodegroup_ids)
+
+ self.nodegroup_ids = []
+
+ def PCUTypes(self, n=2):
+ pcu_type_ids = []
+ AddPCUType = self.debug(self.api.AddPCUType)
+ UpdatePCUType = self.debug(self.api.UpdatePCUType)
+ GetPCUTypes = self.debug(self.api.GetPCUTypes)
+
+ for i in range(n):
+ # Add PCUType
+ pcu_type_fields = random_pcu_type()
+ pcu_type_id = AddPCUType(self.auth, pcu_type_fields)
+ if pcu_type_id is None: continue
+ # Should return a unique id
+ self.isunique(pcu_type_id, pcu_type_ids, 'AddPCUType - isunique')
+ pcu_type_ids.append(pcu_type_id)
+
+ # Check pcu type
+ pcu_types = GetPCUTypes(self.auth, [pcu_type_id])
+ if pcu_types is None: continue
+ pcu_type = pcu_types[0]
+ self.isequal(pcu_type, pcu_type_fields, 'AddPCUType - isequal')
+
+ # Update PCUType
+ pcu_type_fields = random_pcu_type()
+ UpdatePCUType(self.auth, pcu_type_id, pcu_type_fields)
+
+ # Check again
+ pcu_types = GetPCUTypes(self.auth, [pcu_type_id])
+ if pcu_types is None: continue
+ pcu_type = pcu_types[0]
+ self.isequal(pcu_type, pcu_type_fields, 'UpdatePCUType - isequal')
+
+ pcu_types = GetPCUTypes(self.auth, pcu_type_ids)
+ if pcu_types is not None:
+ self.islistequal(pcu_type_ids, [p['pcu_type_id'] for p in pcu_types], 'GetPCUTypes - check')
+
+ if self.config.verbose:
+ utils.header("Added pcu_types: %s " % pcu_type_ids)
+ return pcu_type_ids
+
+ def DeletePCUTypes(self):
+ GetPCUTypes = self.debug(self.api.GetPCUTypes)
+ DeletePCUType = self.debug(self.api.DeletePCUType)
+
+ for pcu_type_id in self.pcu_type_ids:
+ DeletePCUType(self.auth, pcu_type_id)
+
+ pcu_types = GetPCUTypes(self.auth, self.pcu_type_ids)
+ self.islistequal(pcu_types, [], 'DeletePCUType - check')
+
+ def PCUProtocolTypes(self, n=2):
+ protocol_type_ids = []
+ AddPCUProtocolType = self.debug(self.api.AddPCUProtocolType)
+ UpdatePCUProtocolType = self.debug(self.api.UpdatePCUProtocolType)
+ GetPCUProtocolTypes = self.debug(self.api.GetPCUProtocolTypes)
+
+ for i in range(n):
+ # Add PCUProtocolType
+ protocol_type_fields = random_pcu_protocol_type()
+ pcu_type_id = random.sample(self.pcu_type_ids, 1)[0]
+ protocol_type_id = AddPCUProtocolType(self.auth, pcu_type_id, protocol_type_fields)
+ if protocol_type_id is None: continue
+
+ # Should return a unique id
+ self.isunique(protocol_type_id, protocol_type_ids, 'AddPCUProtocolType - isunique')
+ protocol_type_ids.append(protocol_type_id)
+
+ # Check protocol type
+ protocol_types = GetPCUProtocolTypes(self.auth, [protocol_type_id])
+ if protocol_types is None: continue
+ protocol_type = protocol_types[0]
+ self.isequal(protocol_type, protocol_type_fields, 'AddPCUProtocolType - isequal')
+
+ # Update protocol type
+ protocol_type_fields = random_pcu_protocol_type()
+ UpdatePCUProtocolType(self.auth, protocol_type_id, protocol_type_fields)
+
+ # Check again
+ protocol_types = GetPCUProtocolTypes(self.auth, [protocol_type_id])
+ if protocol_types is None: continue
+ protocol_type = protocol_types[0]
+ self.isequal(protocol_type, protocol_type_fields, 'UpdatePCUProtocolType - isequal')
+
+ protocol_types = GetPCUProtocolTypes(self.auth, protocol_type_ids)
+ if protocol_types is not None:
+ pt_ids = [p['pcu_protocol_type_id'] for p in protocol_types]
+ self.islistequal(protocol_type_ids, pt_ids, 'GetPCUProtocolTypes - isequal')
+
+ if self.config.verbose:
+ utils.header('Added pcu_protocol_types: %s' % protocol_type_ids)
+
+ return protocol_type_ids
+
+ def DeletePCUProtocolTypes(self):
+ GetPCUProtocolTypes = self.debug(self.api.GetPCUProtocolTypes)
+ DeletePCUProtocolType = self.debug(self.api.DeletePCUProtocolType)
+
+ for protocol_type_id in self.pcu_protocol_type_ids:
+ DeletePCUProtocolType(self.auth, protocol_type_id)
+
+ # check
+ protocol_types = GetPCUProtocolTypes(self.auth, self.pcu_protocol_type_ids)
+ self.islistequal(protocol_types, [], 'DeletePCUProtocolType - check')
+
+ if self.config.verbose:
+ utils.header("Deleted pcu_protocol_types: %s" % self.pcu_protocol_type_ids)
+ self.pcu_protocol_type_ids = []
+
+ def PCUs(self, n = 4):
+ pcu_ids = []
+ AddPCU = self.debug(self.api.AddPCU)
+ UpdatePCU = self.debug(self.api.UpdatePCU)
+ GetPCUs = self.debug(self.api.GetPCUs)
+
+ for site_id in self.site_ids:
+ # Add PCU
+ pcu_fields = random_pcu()
+ pcu_id = AddPCU(self.auth, site_id, pcu_fields)
+ if pcu_id is None: continue
+
+ # Should return a unique id
+ self.isunique(pcu_id, pcu_ids, 'AddPCU - isunique')
+ pcu_ids.append(pcu_id)
+
+ # check PCU
+ pcus = GetPCUs(self.auth, [pcu_id])
+ if pcus is None: continue
+ pcu = pcus[0]
+ self.isequal(pcu, pcu_fields, 'AddPCU - isequal')
+
+ # Update PCU
+ pcu_fields = random_pcu()
+ UpdatePCU(self.auth, pcu_id, pcu_fields)
+
+ # Check again
+ pcus = GetPCUs(self.auth, [pcu_id])
+ if pcus is None: continue
+ pcu = pcus[0]
+ self.isequal(pcu, pcu_fields, 'UpdatePCU - isequal')
+
+ pcus = GetPCUs(self.auth, pcu_ids)
+ if pcus is not None:
+ self.islistequal(pcu_ids, [p['pcu_id'] for p in pcus], 'GetPCUs - isequal')
+
+ if self.config.verbose:
+ utils.header('Added pcus: %s' % pcu_ids)
+
+ return pcu_ids
+
+ def DeletePCUs(self):
+ GetPCUs = self.debug(self.api.GetPCUs)
+ DeletePCU = self.debug(self.api.DeletePCU)
+
+ for pcu_id in self.pcu_ids:
+ DeletePCU(self.auth, pcu_id)
+ # check
+ pcus = GetPCUs(self.auth, self.pcu_ids)
+ self.islistequal(pcus, [], 'DeletePCU - check')
+
+ if self.config.verbose:
+ utils.header("Deleted pcus: %s " % self.pcu_ids)
+ self.pcu_ids = []
+
def Nodes(self, n=4):
node_ids = []
+ AddNode = self.debug(self.api.AddNode)
+ GetNodes = self.debug(self.api.GetNodes)
+ UpdateNode = self.debug(self.api.UpdateNode)
for i in range(n):
# Add Node
node_fields = random_node()
site_id = random.sample(self.site_ids, 1)[0]
- AddNode = self.debug(self.api.AddNode)
node_id = AddNode(self.auth, site_id, node_fields)
if node_id is None: continue
node_ids.append(node_id)
# Check nodes
- GetNodes = self.debug(self.api.GetNodes)
nodes = GetNodes(self.auth, [node_id])
if nodes is None: continue
node = nodes[0]
# Update node
node_fields = random_node()
- UpdateNode = self.debug(self.api.UpdateNode)
result = UpdateNode(self.auth, node_id, node_fields)
# Check again
if nodes is None: continue
node = nodes[0]
self.isequal(node, node_fields, 'UpdateNode - isequal')
+
+ # Add node to nodegroup
+ nodegroup_id = random.sample(self.nodegroup_ids, 1)[0]
+ AddNodeToNodeGroup = self.debug(self.api.AddNodeToNodeGroup)
+ AddNodeToNodeGroup(self.auth, node_id, nodegroup_id)
+
+ # Add node to PCU
+ sites = self.api.GetSites(self.auth, [node['site_id']], ['pcu_ids'])
+ if not sites: continue
+ site = sites[0]
+ pcu_id = random.sample(site['pcu_ids'], 1)[0]
+ port = random.sample(range(65535), 1)[0]
+ AddNodeToPCU = self.debug(self.api.AddNodeToPCU)
+ AddNodeToPCU(self.auth, node_id, pcu_id, port)
+
+ # check nodegroup, pcu
+ nodes = GetNodes(self.auth, [node_id], ['nodegroup_ids', 'pcu_ids'])
+ if nodes is None or not nodes: continue
+ node = nodes[0]
+ self.islistequal([nodegroup_id], node['nodegroup_ids'], 'AddNodeToNodeGroup - check')
+ self.islistequal([pcu_id], node['pcu_ids'], 'AddNodeToPCU - check')
nodes = GetNodes(self.auth, node_ids)
if nodes is not None:
return node_ids
def DeleteNodes(self):
+
+ # Delete attributes manually for first node
+ GetNodes = self.debug(self.api.GetNodes)
+ nodes = GetNodes(self.auth, self.node_ids)
+ if nodes is None or not nodes: return 0
+ node = nodes[0]
+
+ if node['nodegroup_ids']:
+ # Delete NodeGroup
+ nodegroup_id = random.sample(node['nodegroup_ids'], 1)[0]
+ DeleteNodeFromNodeGroup = self.debug(self.api.DeleteNodeFromNodeGroup)
+ DeleteNodeFromNodeGroup(self.auth, node['node_id'], nodegroup_id)
+
+ if node['pcu_ids']:
+ # Delete PCU
+ pcu_id = random.sample(node['pcu_ids'], 1)[0]
+ DeleteNodeFromPCU = self.debug(self.api.DeleteNodeFromPCU)
+ DeleteNodeFromPCU(self.auth, node['node_id'], pcu_id)
+
+ # check nodegroup, pcu
+ nodes = GetNodes(self.auth, [node['node_id']])
+ if nodes is None or not nodes: return 0
+ self.islistequal([], node['nodegroup_ids'], 'DeleteNodeGromNodeGroup - check')
+ self.islistequal([], node['pcu_ids'], 'DeleteNodeFromPCU - check')
+
+ # Delete rest of nodes
DeleteNode = self.debug(self.api.DeleteNode)
for node_id in self.node_ids:
result = DeleteNode(self.auth, node_id)
# Check if nodes are deleted
GetNodes = self.debug(self.api.GetNodes)
- nodes = GetNodes(self.api, self.node_ids)
+ nodes = GetNodes(self.auth, self.node_ids)
self.islistequal(nodes, [], 'DeleteNode Check')
if self.config.verbose:
utils.header("Deleted nodes: %s" % self.node_ids)
self.node_ids = []
-
+
def AddressTypes(self, n = 3):
address_type_ids = []
for i in range(n):
self.islistequal(address_type_ids, [address_type['address_type_id'] for address_type in address_types], 'GetAddressTypes - isequal')
if self.config.verbose:
- print "Added address types", address_type_ids
+ utils.header("Added address types: %s " % address_type_ids)
return address_type_ids
DeleteAddressType = self.debug(self.api.DeleteAddressType)
for address_type_id in self.address_type_ids:
- DeleteAddressType(auth, address_type_id)
+ DeleteAddressType(self.auth, address_type_id)
GetAddressTypes = self.debug(self.api.GetAddressTypes)
address_types = GetAddressTypes(self.auth, self.address_type_ids)
self.islistequal(address_types, [], 'DeleteAddressType - check')
if self.config.verbose:
- utils.header("Deleted address types: " % self.address_type_ids)
+ utils.header("Deleted address types: %s" % self.address_type_ids)
self.address_type_ids = []
def Addresses(self, n = 3):
address_ids = []
- for i in range(n):
+ AddSiteAddress = self.debug(self.api.AddSiteAddress)
+ GetAddresses = self.debug(self.api.GetAddresses)
+ UpdateAddress = self.debug(self.api.UpdateAddress)
+ AddAddressTypeToAddress = self.debug(self.api.AddAddressTypeToAddress)
+ for i in range(n):
address_fields = random_address()
site_id = random.sample(self.site_ids, 1)[0]
- AddSiteAddress = self.debug(self.api.AddSiteAddress)
address_id = AddSiteAddress(self.auth, site_id, address_fields)
if address_id is None: continue
address_ids.append(address_id)
# Check address
- GetAddresses = self.debug(self.api.GetAddresses)
addresses = GetAddresses(self.auth, [address_id])
if addresses is None: continue
address = addresses[0]
# Update address
address_fields = random_address()
- UpdateAddress = self.debug(self.api.UpdateAddress)
result = UpdateAddress(self.auth, address_id, address_fields)
# Check again
if addresses is None: continue
address = addresses[0]
self.isequal(address, address_fields, 'UpdateAddress - isequal')
-
- addresses = GetAddress(self.auth, address_ids)
+
+ # Add Address Type
+ address_type_id = random.sample(self.address_type_ids, 1)[0]
+ AddAddressTypeToAddress(self.auth, address_type_id, address_id)
+
+ # check adress type
+ addresses = GetAddresses(self.auth, [address_id], ['address_type_ids'])
+ if addresses is None or not addresses: continue
+ address = addresses[0]
+ self.islistequal([address_type_id], address['address_type_ids'], 'AddAddressTypeToAddress - check')
+
+ addresses = GetAddresses(self.auth, address_ids)
if addresses is not None:
- slef.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal')
+ self.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal')
if self.config.verbose:
- utils.header("Added addresses: %s" % self.address_ids)
+ utils.header("Added addresses: %s" % address_ids)
return address_ids
def DeleteAddresses(self):
DeleteAddress = self.debug(self.api.DeleteAddress)
- # Delete site addresses
+ DeleteAddressTypeFromAddress = self.debug(self.api.DeleteAddressTypeFromAddress)
+ GetAddresses = self.debug(self.api.GetAddresses)
+
+ # Delete attributes mananually first
+ addresses = GetAddresses(self.auth, self.address_ids, ['address_id', 'address_type_ids'])
+ if addresses is None or not addresses: return 0
+ address = addresses[0]
+
+ if address['address_type_ids']:
+ address_type_id = random.sample(address['address_type_ids'], 1)[0]
+ DeleteAddressTypeFromAddress(self.auth, address_type_id, address['address_id'])
+
+ # check address_type_ids
+ addresses = GetAddresses(self.auth, [address['address_id']], ['address_type_ids'])
+ if addresses is None or not addresses: return 0
+ address = addresses[0]
+ self.islistequal([], address['address_type_ids'], 'DeleteAddressTypeFromAddress - check')
+
+ # Delete site addresses
for address_id in self.address_ids:
result = DeleteAddress(self.auth, address_id)
# Check
- GetAddresses = self.debug(self.api.GetAddresses)
- addresses = GetAddresses(self.api, self.address_ids)
+ addresses = GetAddresses(self.auth, self.address_ids)
self.islistequal(addresses, [], 'DeleteAddress - check')
- if self.verbose:
- print "Deleted addresses", self.address_ids
+ if self.config.verbose:
+ utils.header("Deleted addresses: %s" % self.address_ids)
self.address_ids = []
- def AddPersons(self, n = 3):
+ def SliceAttributeTypes(self, n = 2):
+ attribute_type_ids = []
+ AddSliceAttributeType = self.debug(self.api.AddSliceAttributeType)
+ GetSliceAttributeTypes = self.debug(self.api.GetSliceAttributeTypes)
+ UpdateSliceAttributeType = self.debug(self.api.UpdateSliceAttributeType)
+
+ for i in range(n):
+ attribute_type_fields = random_attribute_type()
+ attribute_type_id = AddSliceAttributeType(self.auth, attribute_type_fields)
+ if attribute_type_id is None: continue
+
+ # Should return a unique slice_attribute_type_id
+ self.isunique(attribute_type_id, attribute_type_ids, 'AddSliceAttributeType - isunique')
+ attribute_type_ids.append(attribute_type_id)
+
+ # Check slice_attribute_type
+ attribute_types = GetSliceAttributeTypes(self.auth, [attribute_type_id])
+ if attribute_types is None: continue
+ attribute_type = attribute_types[0]
+ self.isequal(attribute_type, attribute_type_fields, 'AddSliceAttributeType - isequal')
+
+ # Update slice_attribute_type
+ attribute_type_fields = random_attribute_type()
+ result = UpdateSliceAttributeType(self.auth, attribute_type_id, attribute_type_fields)
+
+ # Check again
+ attribute_types = GetSliceAttributeTypes(self.auth, [attribute_type_id])
+ if attribute_types is None: continue
+ attribute_type = attribute_types[0]
+ self.isequal(attribute_type, attribute_type_fields, 'UpdateSliceAttributeType - isequal')
+
+ attribute_types = GetSliceAttributeTypes(self.auth, attribute_type_ids)
+ if attribute_types is not None:
+ at_ids = [at['attribute_type_id'] for at in attribute_types]
+ self.islistequal(attribute_type_ids, at_ids, 'GetSliceAttributeTypes - isequal')
+
+ if self.config.verbose:
+ utils.header("Added slice_attribute_types: %s" % attribute_type_ids)
+
+ return attribute_type_ids
+
+ def DeleteSliceAttributeTypes(self):
+ DeleteSliceAttributeType = self.debug(self.api.DeleteSliceAttributeType)
+ GetSliceAttributeTypes = self.debug(self.api.GetSliceAttributeTypes)
+
+ # Delete slice_attribute_type
+ for slice_attribute_type_id in self.slice_attribute_type_ids:
+ result = DeleteSliceAttributeType(self.auth, slice_attribute_type_id)
- roles = GetRoles()
- role_ids = [role['role_id'] for role in roles]
- roles = [role['name'] for role in roles]
- roles = dict(zip(roles, role_ids))
+ # Check
+ slice_attribute_types = GetSliceAttributeTypes(self.auth, self.slice_attribute_type_ids)
+ self.islistequal(slice_attribute_types, [], 'DeleteSliceAttributeTypes - check')
+ if self.config.verbose:
+ utils.header("Deleted slice_attributes: %s" % self.slice_attribute_type_ids)
+
+ self.slice_attribute_type_ids = []
+
+ def SliceInstantiations(self, n = 2):
+ insts = []
+ AddSliceInstantiation= self.debug(self.api.AddSliceInstantiation)
+ GetSliceInstantiations = self.debug(self.api.GetSliceInstantiations)
for i in range(n):
+ inst = randstr(10)
+ result = AddSliceInstantiation(self.auth, inst)
+ if result is None: continue
+ insts.append(inst)
+
+ # Check slice instantiaton
+ instantiations = GetSliceInstantiations(self.auth)
+ if instantiations is None: continue
+ instantiations = filter(lambda x: x in [inst], instantiations)
+ instantiation = instantiations[0]
+ self.isequal(instantiation, inst, 'AddSliceInstantiation - isequal')
+
+
+ instantiations = GetSliceInstantiations(self.auth)
+ if instantiations is not None:
+ instantiations = filter(lambda x: x in insts, instantiations)
+ self.islistequal(insts, instantiations, 'GetSliceInstantiations - isequal')
+
+ if self.config.verbose:
+ utils.header("Added slice instantiations: %s" % insts)
+
+ return insts
+
+ def DeleteSliceInstantiations(self):
+ DeleteSliceInstantiation = self.debug(self.api.DeleteSliceInstantiation)
+ GetSliceInstantiations = self.debug(self.api.GetSliceInstantiations)
+ # Delete slice instantiation
+ for instantiation in self.slice_instantiations:
+ result = DeleteSliceInstantiation(self.auth, instantiation)
+
+ # Check
+ instantiations = GetSliceInstantiations(self.auth)
+ instantiations = filter(lambda x: x in self.slice_instantiations, instantiations)
+ self.islistequal(instantiations, [], 'DeleteSliceInstantiation - check')
+ if self.config.verbose:
+ utils.header("Deleted slice instantiations: %s" % self.slice_instantiations)
+
+ self.slice_instantiations = []
+
+ def Slices(self, n = 3):
+ slice_ids = []
+ AddSlice = self.debug(self.api.AddSlice)
+ GetSlices = self.debug(self.api.GetSlices)
+ UpdateSlice = self.debug(self.api.UpdateSlice)
+ AddSliceToNodes = self.debug(self.api.AddSliceToNodes)
+ for i in range(n):
+ # Add Site
+ slice_fields = random_slice()
+ slice_id = AddSlice(self.auth, slice_fields)
+ if slice_id is None: continue
+
+ # Should return a unique id
+ self.isunique(slice_id, slice_ids, 'AddSlicel - isunique')
+ slice_ids.append(slice_id)
+ slices = GetSlices(self.auth, [slice_id])
+ if slices is None: continue
+ slice = slices[0]
+ self.isequal(slice, slice_fields, 'AddSlice - isequal')
+
+ # Update slice
+ slice_fields = random_slice()
+ result = UpdateSlice(self.auth, slice_id, slice_fields)
+
+ # Check again
+ slices = GetSlices(self.auth, [slice_id])
+ if slices is None: continue
+ slice = slices[0]
+ self.isequal(slice, slice_fields, 'UpdateSlice - isequal')
+
+ # Add node
+ node_id = random.sample(self.node_ids, 1)[0]
+ AddSliceToNodes(self.auth, slice_id, [node_id])
+
+ # check node
+ slices = GetSlices(self.auth, [slice_id], ['node_ids'])
+ if slices is None or not slices: continue
+ slice = slices[0]
+ self.islistequal([node_id], slice['node_ids'], 'AddSliceToNode - check')
+
+ slices = GetSlices(self.auth, slice_ids)
+ if slices is not None:
+ self.islistequal(slice_ids, [slice['slice_id'] for slice in slices], 'GetSlices - isequal')
+
+ if self.config.verbose:
+ utils.header("Added slices: %s" % slice_ids)
+
+ return slice_ids
+
+ def DeleteSlices(self):
+
+ GetSlices = self.debug(self.api.GetSlices)
+ DeleteSlice = self.debug(self.api.DeleteSlice)
+ DeleteSliceFromNodes = self.debug(self.api.DeleteSliceFromNodes)
+
+ # manually delete attributes for first slice
+ slices = GetSlices(self.auth, self.slice_ids, ['slice_id', 'node_ids'])
+ if slices is None or not slices: return 0
+ slice = slices[0]
+
+ if slice['node_ids']:
+ # Delete node from slice
+ node_id = random.sample(slice['node_ids'], 1)[0]
+ DeleteSliceFromNodes(self.auth, slice['slice_id'], [node_id])
+
+ # Check node_ids
+ slices = GetSlices(self.auth, [slice['slice_id']], ['node_ids'])
+ if slices is None or not slices: return 0
+ slice = slices[0]
+ self.islistequal([], slice['node_ids'], 'DeleteSliceFromNode - check')
+
+ # Have DeleteSlice automatically delete attriubtes for the rest
+ for slice_id in self.slice_ids:
+ # Delete account
+ DeleteSlice(self.auth, slice_id)
+
+ # Check if slices are deleted
+ GetSlices = self.debug(self.api.GetSlices)
+ slices = GetSlices(self.auth, self.slice_ids)
+ self.islistequal(slices, [], 'DeleteSlice - check')
+
+ if self.config.verbose:
+ utils.header("Deleted slices: %s" % self.slice_ids)
+
+ self.slice_ids = []
+
+ def SliceAttributes(self, n = 4):
+ attribute_ids = []
+ AddSliceAttribute = self.debug(self.api.AddSliceAttribute)
+ GetSliceAttributes = self.debug(self.api.GetSliceAttributes)
+ UpdateSliceAttribute = self.debug(self.api.UpdateSliceAttribute)
+
+ for i in range(n):
+ # Add slice attribute
+ attribute_fields = random_slice_attribute()
+ slice_id = random.sample(self.slice_ids, 1)[0]
+ type = attribute_fields['attribute_type_id']
+ value = attribute_fields['value']
+ attribute_id = AddSliceAttribute(self.auth, slice_id, type, value)
+ if attribute_id is None: continue
+
+ # Should return a unique id
+ self.isunique(attribute_id, attribute_ids, 'AddSliceAttribute - isunique')
+ attribute_ids.append(attribute_id)
+
+ # Check attribute
+ attributes = GetSliceAttributes(self.auth, [attribute_id])
+ if attributes is None: continue
+ attribute = attributes[0]
+ self.isequal(attribute, attribute_fields, 'AddSliceAttribute - isequal')
+
+ # Update attribute
+ attribute_fields = random_slice_attribute()
+ type = attribute_fields['attribute_type_id']
+ value = attribute_fields['value']
+ result = UpdateSliceAttribute(self.auth, attribute_id, value)
+
+ # Check again
+ attributes = GetSliceAttributes(self.auth, [attribute_id])
+ if attributes is None: continue
+ attribute = attributes[0]
+ self.isequal(attribute, attribute_fields, 'UpdateSliceAttribute - isequal')
+
+ attributes = GetSliceAttributes(self.auth, attribute_ids)
+ if attributes is not None:
+ attr_ids = [a['slice_attribute_id'] for a in attributes]
+ self.islistequal(attribute_ids, attr_ids, 'GetSliceAttributes - isequal')
+ if self.config.verbose:
+ utils.header("Added slice attributes: %s" % attribute_ids)
+
+ return attribute_ids
+
+ def DeleteSliceAttributes(self):
+ DeleteSliceAttribute = self.debug(self.api.DeleteSliceAttribute)
+ GetSliceAttributes = self.debug(self.api.GetSliceAttributes)
+
+ for attribute_id in self.slice_attribute_ids:
+ DeleteSliceAttribute(self.auth, attribute_id)
+
+ attributes = GetSliceAttributes(self.auth, self.slice_attribute_ids)
+ self.islistequal(attributes, [], 'DeleteSliceAttribute - check')
+
+ if self.config.verbose:
+ utils.header("Deleted slice attributes: %s" % self.slice_attribute_ids)
+
+ self.slice_attribute_ids = []
+
+ def InitScripts(self, n = 2):
+ initscript_ids = []
+ AddInitScript = self.debug(self.api.AddInitScript)
+ GetInitScripts = self.debug(self.api.GetInitScripts)
+ UpdateInitScript = self.debug(self.api.UpdateInitScript)
+ for i in range(n):
+ # Add Peer
+ initscript_fields = random_initscript()
+ initscript_id = AddInitScript(self.auth, initscript_fields)
+
+ # Should return a unique id
+ self.isunique(initscript_id, initscript_ids, 'AddInitScript - isunique')
+ initscript_ids.append(initscript_id)
+ initscripts = GetInitScripts(self.auth, [initscript_id])
+ if initscripts is None: continue
+ initscript = initscripts[0]
+ self.isequal(initscript, initscript_fields, 'AddInitScript - isequal')
+
+ # Update Peer
+ initscript_fields = random_initscript()
+ result = UpdateInitScript(self.auth, initscript_id, initscript_fields)
+
+ # Check again
+ initscripts = GetInitScripts(self.auth, [initscript_id])
+ if initscripts is None: continue
+ initscript = initscripts[0]
+ self.isequal(initscript, initscript_fields, 'UpdateInitScript - isequal')
+
+ initscripts = GetInitScripts(self.auth, initscript_ids)
+ if initscripts is not None:
+ self.islistequal(initscript_ids, [i['initscript_id'] for i in initscripts], 'GetInitScripts -isequal')
+
+ if self.config.verbose:
+ utils.header("Added initscripts: %s" % initscript_ids)
+
+ return initscript_ids
+
+ def DeleteInitScripts(self):
+ # Delete all initscripts
+ DeleteInitScript = self.debug(self.api.DeleteInitScript)
+ GetInitScripts = self.debug(self.api.GetInitScripts)
+ for initscript_id in self.initscript_ids:
+ result = DeleteInitScript(self.auth, initscript_id)
+
+ # Check if peers are deleted
+ initscripts = GetInitScripts(self.auth, self.initscript_ids)
+ self.islistequal(initscripts, [], 'DeletInitScript - check')
+
+ if self.config.verbose:
+ utils.header("Deleted initscripts: %s" % self.initscript_ids)
+ self.initscript_ids =[]
+
+ def Roles(self, n = 2):
+ role_ids = []
+ AddRole = self.debug(self.api.AddRole)
+ GetRoles = self.debug(self.api.GetRoles)
+ for i in range(n):
+ # Add Role
+ role_fields = random_role()
+ role_id = role_fields['role_id']
+ name = role_fields['name']
+ AddRole(self.auth, role_id, name)
+
+ # Should return a unique id
+ self.isunique(role_id, role_ids, 'AddRole - isunique')
+ role_ids.append(role_id)
+ roles = GetRoles(self.auth)
+ if roles is None: continue
+ roles = filter(lambda x: x['role_id'] in [role_id], roles)
+ role = roles[0]
+ self.isequal(role, role_fields, 'AddRole - isequal')
+
+ roles = GetRoles(self.auth)
+ if roles is not None:
+ roles = filter(lambda x: x['role_id'] in role_ids, roles)
+ self.islistequal(role_ids, [r['role_id'] for r in roles], 'GetRoles - isequal')
+
+ if self.config.verbose:
+ utils.header("Added roles: %s" % role_ids)
+
+ return role_ids
+
+ def DeleteRoles(self):
+ # Delete all roles
+ DeleteRole = self.debug(self.api.DeleteRole)
+ GetRoles = self.debug(self.api.GetRoles)
+ for role_id in self.role_ids:
+ result = DeleteRole(self.auth, role_id)
+
+ # Check if peers are deleted
+ roles = GetRoles(self.auth)
+ roles = filter(lambda x: x['role_id'] in self.role_ids, roles)
+ self.islistequal(roles, [], 'DeleteRole - check' % self.role_ids)
+
+ if self.config.verbose:
+ utils.header("Deleted roles: %s" % self.role_ids)
+ self.role_ids =[]
+
+ def Persons(self, n = 3):
+
+ person_ids = []
+ for i in range(n):
# Add account
person_fields = random_person()
- person_id = AddPerson(person_fields)
-
+ AddPerson = self.debug(self.api.AddPerson)
+ person_id = AddPerson(self.auth, person_fields)
+ if person_id is None: continue
+
# Should return a unique person_id
- assert person_id not in self.person_ids
- self.person_ids.append(person_id)
-
- if self.check:
- # Check account
- person = GetPersons([person_id])[0]
- for field in person_fields:
- if field != 'password':
- assert person[field] == person_fields[field]
-
- # Update account
- person_fields = random_person()
- UpdatePerson(person_id, person_fields)
-
- # Check account again
- person = GetPersons([person_id])[0]
- for field in person_fields:
- if field != 'password':
- assert person[field] == person_fields[field]
-
- auth = {'AuthMethod': "password",
- 'Username': person_fields['email'],
- 'AuthString': person_fields['password']}
-
- if self.check:
- # Check that account is disabled
- try:
- assert not AuthCheck(auth)
- except:
- pass
-
- # Add random set of roles
- person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3))
- for person_role in person_roles:
- role_id = roles[person_role]
- AddRoleToPerson(role_id, person_id)
-
- if self.check:
- person = GetPersons([person_id])[0]
- assert set(person_roles) == set(person['roles'])
-
- # Enable account
- UpdatePerson(person_id, {'enabled': True})
-
- if self.check:
- # Check that account is enabled
- assert AuthCheck(auth)
-
- # Associate account with random set of sites
- person_site_ids = []
- for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))):
- AddPersonToSite(person_id, site_id)
- person_site_ids.append(site_id)
-
- if self.check:
- # Make sure it really did it
- person = GetPersons([person_id])[0]
- assert set(person_site_ids) == set(person['site_ids'])
-
- # Set a primary site
- primary_site_id = random.sample(person_site_ids, randint(1, len(person_site_ids)))[0]
- SetPersonPrimarySite(person_id, primary_site_id)
-
- if self.check:
- person = GetPersons([person_id])[0]
- assert person['site_ids'][0] == primary_site_id
-
- if self.verbose:
- print "Added users", self.person_ids
+ self.isunique(person_id, person_ids, 'AddPerson - isunique')
+ person_ids.append(person_id)
+ GetPersons = self.debug(self.api.GetPersons)
+ persons = GetPersons(self.auth, [person_id])
+ if persons is None: continue
+ person = persons[0]
+ self.isequal(person, person_fields, 'AddPerson - isequal')
+
+ # Update account
+ person_fields = random_person()
+ person_fields['enabled'] = True
+ UpdatePerson = self.debug(self.api.UpdatePerson)
+ result = UpdatePerson(self.auth, person_id, person_fields)
+
+ # Add random role
+ AddRoleToPerson = self.debug(self.api.AddRoleToPerson)
+ role = random.sample(roles, 1)[0]
+ result = AddRoleToPerson(self.auth, role, person_id)
+
+ # Add key to person
+ key = random_key()
+ key_id = AddPersonKey = self.debug(self.api.AddPersonKey)
+ AddPersonKey(self.auth, person_id, key)
+
+ # Add person to site
+ site_id = random.sample(self.site_ids, 1)[0]
+ AddPersonToSite = self.debug(self.api.AddPersonToSite)
+ AddPersonToSite(self.auth, person_id, site_id)
+
+ # Add person to slice
+ slice_id = random.sample(self.slice_ids, 1)[0]
+ AddPersonToSlice = self.debug(self.api.AddPersonToSlice)
+ AddPersonToSlice(self.auth, person_id, slice_id)
+
+ # check role, key, site, slice
+ persons = GetPersons(self.auth, [person_id], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
+ if persons is None or not persons: continue
+ person = persons[0]
+ self.islistequal([role], person['roles'], 'AddRoleToPerson - check')
+ self.islistequal([key_id], person['key_ids'], 'AddPersonKey - check')
+ self.islistequal([site_id], person['site_ids'], 'AddPersonToSite - check')
+ self.islistequal([slice_id], person['slice_ids'], 'AddPersonToSlice - check')
+
+ persons = GetPersons(self.auth, person_ids)
+ if persons is not None:
+ self.islistequal(person_ids, [p['person_id'] for p in persons], 'GetPersons - isequal')
+
+ if self.config.verbose:
+ utils.header("Added users: %s" % person_ids)
+
+ return person_ids
def DeletePersons(self):
- # Delete users
+
+ # Delete attributes manually for first person
+ GetPersons = self.debug(self.api.GetPersons)
+ persons = GetPersons(self.auth, self.person_ids, ['person_id' , 'key_ids', 'site_ids', 'slice_ids', 'roles'])
+ if persons is None or not persons: return 0
+ person = persons[0]
+
+ if person['roles']:
+ # Delete role
+ role = random.sample(person['roles'], 1)[0]
+ DeleteRoleFromPerson = self.debug(self.api.DeleteRoleFromPerson)
+ DeleteRoleFromPerson(self.auth, role, person['person_id'])
+
+ if person['key_ids']:
+ # Delete key
+ key_id = random.sample(person['key_ids'], 1)[0]
+ DeleteKey = self.debug(self.api.DeleteKey)
+ DeleteKey(self.auth, key_id)
+
+ if person['site_ids']:
+ # Remove person from site
+ site_id = random.sample(person['site_ids'], 1)[0]
+ DeletePersonFromSite = self.debug(self.api.DeletePersonFromSite)
+ DeletePersonFromSite(self.auth, person['person_id'], site_id)
+
+ if person['slice_ids']:
+ # Remove person from slice
+ slice_id = random.sample(person['slice_ids'], 1)[0]
+ DeletePersonFromSlice = self.debug(self.api.DeletePersonFromSlice)
+ DeletePersonFromSlice(self.auth, person['person_id'], slice_id)
+
+ # check role, key, site, slice
+ persons = GetPersons(self.auth, [person['person_id']], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
+ if persons is None or not persons: return 0
+ person = persons[0]
+ self.islistequal([], person['roles'], 'DeleteRoleFromPerson - check')
+ self.islistequal([], person['key_ids'], 'DeleteKey - check')
+ self.islistequal([], person['site_ids'], 'DeletePersonFromSite - check')
+ self.islistequal([], person['slice_ids'], 'DeletePersonFromSlice - check')
+
+ DeletePerson = self.debug(self.api.DeletePerson)
+ # Have DeletePeson automatically delete attriubtes for all other persons
for person_id in self.person_ids:
- # Remove from each site
- for site_id in self.site_ids:
- DeletePersonFromSite(person_id, site_id)
+ # Delete account
+ DeletePerson(self.auth, person_id)
- if self.check:
- person = GetPersons([person_id])[0]
- assert not person['site_ids']
+ # Check if persons are deleted
+ GetPersons = self.debug(self.api.GetPersons)
+ persons = GetPersons(self.auth, self.person_ids)
+ self.islistequal(persons, [], 'DeletePerson - check')
+
+ if self.config.verbose:
+ utils.header("Deleted users: %s" % self.person_ids)
- # Revoke roles
- person = GetPersons([person_id])[0]
- for role_id in person['role_ids']:
- DeleteRoleFromPerson(role_id, person_id)
+ self.person_ids = []
- if self.check:
- person = GetPersons([person_id])[0]
- assert not person['role_ids']
+ def KeyTypes(self, n = 2):
+ key_types = []
+ AddKeyType = self.debug(self.api.AddKeyType)
+ GetKeyTypes = self.debug(self.api.GetKeyTypes)
+ for i in range(n):
+ # Add key type
+ keytype = randstr(10)
+ result = AddKeyType(self.auth, keytype)
+ if result is None: continue
+
+ # Check key types
+ key_types.append(keytype)
+ keytypes = GetKeyTypes(self.auth)
+ if not keytypes: continue
+ keytypes = filter(lambda x: x in [keytype], keytypes)
+ if not keytypes: continue
+ self.isequal(keytype, keytypes[0], 'AddKeyType - isequal')
+
+ # Check all
+ keytypes = GetKeyTypes(self.auth)
+ if keytypes is not None:
+ keytypes = filter(lambda x: x in key_types, keytypes)
+ self.islistequal(key_types, keytypes, 'GetKeyTypes - isequal')
- # Disable account
- UpdatePerson(person_id, {'enabled': False})
+ if self.config.verbose:
+ utils.header("Added key types: %s" % key_types)
- if self.check:
- person = GetPersons([person_id])[0]
- assert not person['enabled']
+ return key_types
- # Delete account
- DeletePerson(person_id)
+ def DeleteKeyTypes(self):
+ DeleteKeyType = self.debug(self.api.DeleteKeyType)
+ GetKeyTypes = self.debug(self.api.GetKeyTypes)
+ for key_type in self.key_types:
+ result = DeleteKeyType(self.auth, key_type)
- if self.check:
- assert not GetPersons([person_id])
+ # Check if key types are deleted
+ key_types = GetKeyTypes(self.auth)
+ key_types = filter(lambda x: x in self.key_types, key_types)
+ self.islistequal(key_types, [], 'DeleteKeyType - check')
- if self.check:
- assert not GetPersons(self.person_ids)
+ if self.config.verbose:
+ utils.header("Deleted key types %s" % self.key_types)
- if self.verbose:
- print "Deleted users", self.person_ids
+ self.key_types = []
- self.person_ids = []
+ def Keys(self, n = 3):
+ key_ids = []
+ for i in range(n):
+ # Add a key to an account
+ key_fields = random_key()
+ person_id = random.sample(self.person_ids, 1)[0]
+ AddPersonKey = self.debug(self.api.AddPersonKey)
+ key_id = AddPersonKey(self.auth, person_id, key_fields)
+ if key_id is None: continue
+
+ # Should return a unique key_id
+ self.isunique(key_id, key_ids, 'AddPersonKey - isunique')
+ key_ids.append(key_id)
+ GetKeys = self.debug(self.api.GetKeys)
+ keys = GetKeys(self.auth, [key_id])
+ if keys is None: continue
+ key = keys[0]
+ self.isequal(key, key_fields, 'AddPersonKey - isequal')
+
+ # Update Key
+ key_fields = random_key()
+ UpdateKey = self.debug(self.api.UpdateKey)
+ result = UpdateKey(self.auth, key_id, key_fields)
+
+ keys = GetKeys(self.auth, [key_id])
+ if keys is None or not keys: continue
+ key = keys[0]
+ self.isequal(key, key_fields, 'UpdatePersonKey - isequal')
+
+ keys = GetKeys(self.auth, key_ids)
+ if keys is not None:
+ self.islistequal(key_ids, [key['key_id'] for key in keys], 'GetKeys - isequal')
+
+ if self.config.verbose:
+ utils.header("Added keys: %s" % key_ids)
+ return key_ids
+
+
+ def DeleteKeys(self):
+
+ # Blacklist first key, Delete rest
+ GetKeys = self.debug(self.api.GetKeys)
+ DeleteKey = self.debug(self.api.DeleteKey)
+ BlacklistKey = self.debug(self.api.BlacklistKey)
+
+ key_id = self.key_ids.pop()
+ BlacklistKey(self.auth, key_id)
+ keys = GetKeys(self.auth, [key_id])
+ self.islistequal(keys, [], 'BlacklistKey - check')
+
+ if self.config.verbose:
+ utils.header("Blacklisted key: %s" % key_id)
+
+ for key_id in self.key_ids:
+ DeleteKey(self.auth, key_id)
+
+ keys = GetKeys(self.auth, self.key_ids)
+ self.islistequal(keys, [], 'DeleteKey - check')
+
+ if self.config.verbose:
+ utils.header("Deleted keys: %s" % self.key_ids)
+
+ self.key_ids = []
+
+ def BootStates(self, n = 3):
+ boot_states = []
+ AddBootState = self.debug(self.api.AddBootState)
+ GetBootStates = self.debug(self.api.GetBootStates)
+ for i in range(n):
+ # Add boot state
+ bootstate_fields = randstr(10)
+ result = AddBootState(self.auth, bootstate_fields)
+ if result is None: continue
+
+ # Check boot states
+ boot_states.append(bootstate_fields)
+ bootstates = GetBootStates(self.auth)
+ if not bootstates: continue
+ bootstates = filter(lambda x: x in [bootstate_fields], bootstates)
+ if not bootstates: continue
+ bootstate = bootstates[0]
+ self.isequal(bootstate, bootstate_fields, 'AddBootState - isequal')
+
+ # Check all
+ bs = GetBootStates(self.auth)
+ if bs is not None:
+ bs = filter(lambda x: x in [boot_states], bs)
+ self.islistequal(boot_states, bs, 'GetBootStates - isequal')
+
+ if self.config.verbose:
+ utils.header("Added boot_states: %s" % boot_states)
+
+ return boot_states
+
+ def DeleteBootStates(self):
+ DeleteBootState = self.debug(self.api.DeleteBootState)
+ GetBootStates = self.debug(self.api.GetBootStates)
+ for boot_state in self.boot_states:
+ result = DeleteBootState(self.auth, boot_state)
+
+ # Check if bootsates are deleted
+ boot_states = GetBootStates(self.auth)
+ boot_states = filter(lambda x: x in self.boot_states, boot_states)
+ self.islistequal(boot_states, [], 'DeleteBootState check')
+
+ if self.config.verbose:
+ utils.header("Deleted boot_states: %s" % self.boot_states)
+
+ self.boot_states = []
+
+
+ def Peers(self, n = 2):
+ peer_ids = []
+ AddPeer = self.debug(self.api.AddPeer)
+ GetPeers = self.debug(self.api.GetPeers)
+ UpdatePeer = self.debug(self.api.UpdatePeer)
+ for i in range(n):
+ # Add Peer
+ peer_fields = random_peer()
+ peer_id = AddPeer(self.auth, peer_fields)
+
+ # Should return a unique id
+ self.isunique(peer_id, peer_ids, 'AddPeer - isunique')
+ peer_ids.append(peer_id)
+ peers = GetPeers(self.auth, [peer_id])
+ if peers is None: continue
+ peer = peers[0]
+ self.isequal(peer, peer_fields, 'AddPeer - isequal')
+
+ # Update Peer
+ peer_fields = random_peer()
+ result = UpdatePeer(self.auth, peer_id, peer_fields)
+
+ # Check again
+ peers = GetPeers(self.auth, [peer_id])
+ if peers is None: continue
+ peer = peers[0]
+ self.isequal(peer, peer_fields, 'UpdatePeer - isequal')
+
+ peers = GetPeers(self.auth, peer_ids)
+ if peers is not None:
+ self.islistequal(peer_ids, [peer['peer_id'] for peer in peers], 'GetPeers -isequal')
+
+ if self.config.verbose:
+ utils.header("Added peers: %s" % peer_ids)
+
+ return peer_ids
+
+
+ def DeletePeers(self):
+ # Delete all peers
+ DeletePeer = self.debug(self.api.DeletePeer)
+ GetPeers = self.debug(self.api.GetPeers)
+ for peer_id in self.peer_ids:
+ result = DeletePeer(self.auth, peer_id)
+
+ # Check if peers are deleted
+ peers = GetPeers(self.auth, self.peer_ids)
+ self.islistequal(peers, [], 'DeletePeer - check' % self.peer_ids)
+
+ if self.config.verbose:
+ utils.header("Deleted peers: %s" % self.peer_ids)
+ self.peer_ids =[]
+
+ def ConfFiles(self, n = 2):
+ conf_file_ids = []
+ for i in range(n):
+ # Add ConfFile
+ conf_file_fields = random_conf_file()
+ AddConfFile = self.debug(self.api.AddConfFile)
+ conf_file_id = AddConfFile(self.auth, conf_file_fields)
+ if conf_file_id is None: continue
+
+ # Should return a unique id
+ self.isunique(conf_file_id, conf_file_ids, 'AddConfFile - isunique')
+ conf_file_ids.append(conf_file_id)
+
+ # Get ConfFiles
+ GetConfFiles = self.debug(self.api.GetConfFiles)
+ conf_files = GetConfFiles(self.auth, [conf_file_id])
+ if conf_files is None: continue
+ conf_file = conf_files[0]
+ self.isequal(conf_file, conf_file_fields, 'AddConfFile - isunique')
+
+ # Update ConfFile
+ conf_file_fields = random_conf_file()
+ UpdateConfFile = self.debug(self.api.UpdateConfFile)
+ result = UpdateConfFile(self.auth, conf_file_id, conf_file_fields)
+
+ # Check again
+ conf_files = GetConfFiles(self.auth, [conf_file_id])
+ if conf_files is None: continue
+ conf_file = conf_files[0]
+ self.isequal(conf_file, conf_file_fields, 'UpdateConfFile - isunique')
+
+
+ # Add this conf file to a random node
+ node_id = random.sample(self.node_ids, 1)[0]
+ AddConfFileToNode = self.debug(self.api.AddConfFileToNode)
+ AddConfFileToNode(self.auth, conf_file_id, node_id)
+
+ # Add this conf file to a random node group
+ nodegroup_id = random.sample(self.nodegroup_ids, 1)[0]
+ AddConfFileToNodeGroup = self.debug(self.api.AddConfFileToNodeGroup)
+ AddConfFileToNodeGroup(self.auth, conf_file_id, nodegroup_id)
+
+ # Check node, nodegroup
+ conf_files = GetConfFiles(self.auth, [conf_file_id], ['node_ids', 'nodegroup_ids'])
+ if conf_files is None or not conf_files: continue
+ conf_file = conf_files[0]
+ self.islistequal([node_id], conf_file['node_ids'], 'AddConfFileToNode - check')
+ self.islistequal([nodegroup_id], conf_file['nodegroup_ids'], 'AddConfFileToNodeGroup - check')
+
+
+
+ conf_files = GetConfFiles(self.auth, conf_file_ids)
+ if conf_files is not None:
+ self.islistequal(conf_file_ids, [c['conf_file_id'] for c in conf_files], 'GetConfFiles - isequal')
+ if self.config.verbose:
+ utils.header("Added conf_files: %s" % conf_file_ids)
+
+ return conf_file_ids
+
+ def DeleteConfFiles(self):
+
+ GetConfFiles = self.debug(self.api.GetConfFiles)
+ DeleteConfFile = self.debug(self.api.DeleteConfFile)
+ DeleteConfFileFromNode = self.debug(self.api.DeleteConfFileFromNode)
+ DeleteConfFileFromNodeGroup = self.debug(self.api.DeleteConfFileFromNodeGroup)
+
+ conf_files = GetConfFiles(self.auth, self.conf_file_ids)
+ if conf_files is None or not conf_files: return 0
+ conf_file = conf_files[0]
+ if conf_file['node_ids']:
+ node_id = random.sample(conf_file['node_ids'], 1)[0]
+ DeleteConfFileFromNode(self.auth, conf_file['conf_file_id'], node_id)
+ if conf_file['nodegroup_ids']:
+ nodegroup_id = random.sample(conf_file['nodegroup_ids'], 1)[0]
+ DeleteConfFileFromNodeGroup(self.auth, conf_file['conf_file_id'], nodegroup_id)
+
+ # check
+ conf_files = GetConfFiles(self.auth, [conf_file['conf_file_id']], ['node_ids', 'nodegroup_ids'])
+ if conf_files is None or not conf_files: return 0
+ conf_file = conf_files[0]
+ self.islistequal([], conf_file['node_ids'], 'AddConfFileToNode - check')
+ self.islistequal([], conf_file['nodegroup_ids'], 'AddConfFileToNodeGroup - check')
+
+ for conf_file_id in self.conf_file_ids:
+ DeleteConfFile(self.auth, conf_file_id)
+
+ # check
+ conf_files = GetConfFiles(self.auth, self.conf_file_ids)
+ self.islistequal(conf_files, [], 'DeleteConfFile - check')
+
+ if self.config.verbose:
+ utils.header("Deleted conf_files: %s" % self.conf_file_ids)
+
+ self.conf_file_ids = []
+
+ def NodeNetworks(self, n = 4):
+ nodenetwork_ids = []
+ AddNodeNetwork = self.debug(self.api.AddNodeNetwork)
+ UpdateNodeNetwork = self.debug(self.api.UpdateNodeNetwork)
+ GetNodeNetworks = self.debug(self.api.GetNodeNetworks)
+
+ for i in range(n):
+ # Add Node Network
+ nodenetwork_fields = random_nodenetwork()
+ node_id = random.sample(self.node_ids, 1)[0]
+ nodenetwork_id = AddNodeNetwork(self.auth, node_id, nodenetwork_fields)
+ if nodenetwork_id is None: continue
+
+ # Should return a unique id
+ self.isunique(nodenetwork_ids, nodenetwork_ids, 'AddNodeNetwork - isunique')
+ nodenetwork_ids.append(nodenetwork_id)
+
+ # check Node Network
+ nodenetworks = GetNodeNetworks(self.auth, [nodenetwork_id])
+ if nodenetworks is None: continue
+ nodenetwork = nodenetworks[0]
+ self.isequal(nodenetwork, nodenetwork_fields, 'AddNodeNetwork - isequal')
+
+ # Update NodeNetwork
+ nodenetwork_fields = random_nodenetwork()
+ UpdateNodeNetwork(self.auth, nodenetwork_id, nodenetwork_fields)
+
+ # Check again
+ nodenetworks = GetNodeNetworks(self.auth, [nodenetwork_id])
+ if nodenetworks is None: continue
+ nodenetwork = nodenetworks[0]
+ self.isequal(nodenetwork, nodenetwork_fields, 'UpdateNodeNetwork - isequal')
+
+ nodenetworks = GetNodeNetworks(self.auth, nodenetwork_ids)
+ if nodenetworks is not None:
+ self.islistequal(nodenetwork_ids, [n['nodenetwork_id'] for n in nodenetworks], 'GetNodeNetworks - isequal')
+
+ if self.config.verbose:
+ utils.header('Added nodenetworks: %s' % nodenetwork_ids)
+
+ return nodenetwork_ids
+
+ def DeleteNodeNetworks(self):
+ GetNodeNetworks = self.debug(self.api.GetNodeNetworks)
+ DeleteNodeNetwork = self.debug(self.api.DeleteNodeNetwork)
+
+ for nodenetwork_id in self.nodenetwork_ids:
+ DeleteNodeNetwork(self.auth, nodenetwork_id)
+
+ # check
+ nodenetworks = GetNodeNetworks(self.auth, self.nodenetwork_ids)
+ self.islistequal(nodenetworks, [], 'DeleteNodeNetwork - check')
+
+ if self.config.verbose:
+ utils.header("Deleted nodenetworks: %s " % self.nodenetwork_ids)
+ self.nodenetwork_ids = []
+
+ def NodeNetworkSettings(self, n=2):
+ nodenetwork_setting_ids = []
+ AddNodeNetworkSetting = self.debug(self.api.AddNodeNetworkSetting)
+ UpdateNodeNetworkSetting = self.debug(self.api.UpdateNodeNetworkSetting)
+ GetNodeNetworkSettings = self.debug(self.api.GetNodeNetworkSettings)
+
+ for nodenetwork_id in self.nodenetwork_ids:
+ # Add Node Network
+ nodenetwork_setting_fields = random_nodenetwork_setting()
+ #nodenetwork_id = random.sample(self.nodenetwork_ids, 1)[0]
+ nodenetwork_setting_type_id = random.sample(self.nodenetwork_setting_type_ids, 1)[0]
+ value = nodenetwork_setting_fields['value']
+ nodenetwork_setting_id = AddNodeNetworkSetting(self.auth, nodenetwork_id, nodenetwork_setting_type_id, value)
+ if nodenetwork_setting_id is None: continue
+
+ # Should return a unique id
+ self.isunique(nodenetwork_setting_ids, nodenetwork_setting_ids, 'AddNodeNetworkSetting - isunique')
+ nodenetwork_setting_ids.append(nodenetwork_setting_id)
+
+ # check Node Network
+ nodenetwork_settings = GetNodeNetworkSettings(self.auth, [nodenetwork_setting_id])
+ if nodenetwork_settings is None: continue
+ nodenetwork_setting = nodenetwork_settings[0]
+ self.isequal(nodenetwork_setting, nodenetwork_setting_fields, 'AddNodeNetworkSetting - isequal')
+
+ # Update NodeNetworkSetting
+ nodenetwork_setting_fields = random_nodenetwork_setting()
+ value = nodenetwork_setting_fields['value']
+ UpdateNodeNetworkSetting(self.auth, nodenetwork_setting_id, value)
+
+ # Check again
+ nodenetwork_settings = GetNodeNetworkSettings(self.auth, [nodenetwork_setting_id])
+ if nodenetwork_settings is None: continue
+ nodenetwork_setting = nodenetwork_settings[0]
+ self.isequal(nodenetwork_setting, nodenetwork_setting_fields, 'UpdateNodeNetworkSetting - isequal')
+
+ nodenetwork_settings = GetNodeNetworkSettings(self.auth, nodenetwork_setting_ids)
+ if nodenetwork_settings is not None:
+ self.islistequal(nodenetwork_setting_ids, [n['nodenetwork_setting_id'] for n in nodenetwork_settings], 'GetNodeNetworkSettings - isequal')
+
+ if self.config.verbose:
+ utils.header('Added nodenetwork_settings: %s' % nodenetwork_setting_ids)
+
+ return nodenetwork_setting_ids
+
+ def DeleteNodeNetworkSettings(self):
+ GetNodeNetworkSettings = self.debug(self.api.GetNodeNetworkSettings)
+ DeleteNodeNetworkSetting = self.debug(self.api.DeleteNodeNetworkSetting)
+ for nodenetwork_setting_id in self.nodenetwork_setting_ids:
+ DeleteNodeNetworkSetting(self.auth, nodenetwork_setting_id)
+ # check
+ nodenetwork_settings = GetNodeNetworkSettings(self.auth, self.nodenetwork_setting_ids)
+ self.islistequal(nodenetwork_settings, [], 'DeleteNodeNetworkSetting - check')
+
+ if self.config.verbose:
+ utils.header("Deleted nodenetwork settings: %s " % self.nodenetwork_setting_ids)
+ self.nodenetwork_setting_ids = []
+
+ def NodeNetworkSettingTypes(self, n = 2):
+ nodenetwork_setting_type_ids = []
+ AddNodeNetworkSettingType = self.debug(self.api.AddNodeNetworkSettingType)
+ UpdateNodeNetworkSettingType = self.debug(self.api.UpdateNodeNetworkSettingType)
+ GetNodeNetworkSettingTypes = self.debug(self.api.GetNodeNetworkSettingTypes)
+
+ for i in range(n):
+ # Add Node Network Settings Type
+ nodenetwork_setting_type_fields = random_nodenetwork_setting_type()
+ nodenetwork_setting_type_id = AddNodeNetworkSettingType(self.auth, nodenetwork_setting_type_fields)
+ if nodenetwork_setting_type_id is None: continue
+
+ # Should return a unique id
+ self.isunique(nodenetwork_setting_type_ids, nodenetwork_setting_type_ids, 'AddNodeNetworkSettingType - isunique')
+ nodenetwork_setting_type_ids.append(nodenetwork_setting_type_id)
+
+ # check Node Network Settings Type
+ nodenetwork_setting_types = GetNodeNetworkSettingTypes(self.auth, [nodenetwork_setting_type_id])
+ if nodenetwork_setting_types is None: continue
+ nodenetwork_setting_type = nodenetwork_setting_types[0]
+ self.isequal(nodenetwork_setting_type, nodenetwork_setting_type_fields, 'AddNodeNetworkSettingType - isequal')
+
+ # Update NodeNetworkSetting
+ nodenetwork_setting_type_fields = random_nodenetwork_setting_type()
+ UpdateNodeNetworkSettingType(self.auth, nodenetwork_setting_type_id, nodenetwork_setting_type_fields)
+
+ # Check again
+ nodenetwork_setting_types = GetNodeNetworkSettingTypes(self.auth, [nodenetwork_setting_type_id])
+ if nodenetwork_setting_types is None: continue
+ nodenetwork_setting_type = nodenetwork_setting_types[0]
+ self.isequal(nodenetwork_setting_type, nodenetwork_setting_type_fields, 'UpdateNodeNetworkSettingType - isequal')
+
+ nodenetwork_setting_types = GetNodeNetworkSettingTypes(self.auth, nodenetwork_setting_type_ids)
+ if nodenetwork_setting_types is not None:
+ self.islistequal(nodenetwork_setting_type_ids, [n['nodenetwork_setting_type_id'] for n in nodenetwork_setting_types], 'GetNodeNetworkSettingTypes - isequal')
+
+ if self.config.verbose:
+ utils.header('Added nodenetwork_setting_types: %s' % nodenetwork_setting_type_ids)
+
+ return nodenetwork_setting_type_ids
+
+ def DeleteNodeNetworkSettingTypes(self):
+ GetNodeNetworkSettingTypes = self.debug(self.api.GetNodeNetworkSettingTypes)
+ DeleteNodeNetworkSettingType = self.debug(self.api.DeleteNodeNetworkSettingType)
+
+ for nodenetwork_setting_type_id in self.nodenetwork_setting_type_ids:
+ DeleteNodeNetworkSettingType(self.auth, nodenetwork_setting_type_id)
+
+ # check
+ nodenetwork_setting_types = GetNodeNetworkSettingTypes(self.auth, self.nodenetwork_setting_type_ids)
+ self.islistequal(nodenetwork_setting_types, [], 'DeleteNodeNetworkSettingType - check')
+
+ if self.config.verbose:
+ utils.header("Deleted nodenetwork setting types: %s " % self.nodenetwork_setting_type_ids)
+ self.nodenetwork_setting_type_ids = []
+
+ def Messages(self, n = 2):
+ message_ids = []
+ AddMessage = self.debug(self.api.AddMessage)
+ GetMessages = self.debug(self.api.GetMessages)
+ UpdateMessage = self.debug(self.api.UpdateMessage)
+ for i in range(n):
+ # Add Message
+ message_fields = random_message()
+ message_id = message_fields['message_id']
+ AddMessage(self.auth, message_fields)
+ if message_id is None: continue
+
+ # Should return a unique id
+ self.isunique(message_id, message_ids, 'AddMessage - isunique')
+ message_ids.append(message_id)
+ messages = GetMessages(self.auth, [message_id])
+ if messages is None: continue
+ message = messages[0]
+ self.isequal(message, message_fields, 'AddMessage - isequal')
+
+ # Update message
+ message_fields = random_message()
+ result = UpdateMessage(self.auth, message_id, message_fields)
+
+ # Check again
+ messages = GetMessages(self.auth, [message_id])
+ if messages is None: continue
+ message = messages[0]
+ self.isequal(message, message_fields, 'UpdateMessage - isequal')
+
+ messages = GetMessages(self.auth, message_ids)
+ if messages is not None:
+ self.islistequal(message_ids, [m['message_id'] for m in messages], 'GetMessages - isequal')
+
+ if self.config.verbose:
+ utils.header("Added messages: %s" % message_ids)
+
+ return message_ids
+
+ def DeleteMessages(self):
+ # Delete all messages
+ DeleteMessage = self.debug(self.api.DeleteMessage)
+ GetMessages = self.debug(self.api.GetMessages)
+ for message_id in self.message_ids:
+ result = DeleteMessage(self.auth, message_id)
+
+ # Check if messages are deleted
+ messages = GetMessages(self.auth, self.message_ids)
+ self.islistequal(messages, [], 'DeleteMessage - check')
+
+ if self.config.verbose:
+ utils.header("Deleted messages: %s" % self.message_ids)
+
+ self.message_ids = []
+
+ def Sessions(self, n = 2):
+ session_ids = []
+ AddSession = self.debug(self.api.AddSession)
+ GetSession = self.debug(self.api.GetSession)
+ GetSessions = self.debug(self.api.GetSessions)
+ for i in range(n):
+ # Add session
+ person_id = random.sample(self.person_ids, 1)[0]
+ session_id = AddSession(self.auth, person_id)
+ if session_id is None: continue
+ session_ids.append(session_id)
+
+ # Check session
+ sessions = GetSessions(self.auth, [person_id])
+ if not sessions: continue
+ session = sessions[0]
+ sess_id = session['session_id']
+ self.islistequal([sess_id], [session_id], 'AddSession - isequal')
+
+ # GetSession creates session_id based on auth, so we must use the current auth
+ session_id = GetSession(self.auth)
+ if session_id is None: continue
+ session_ids.append(session_id)
+
+ # Check session
+ sessions = GetSessions(self.auth, [self.auth['Username']])
+ if not sessions: continue
+ session = sessions[0]
+ sess_id = session['session_id']
+ self.islistequal([sess_id], [session_id], 'GetSession - isequal')
+
+ # Check all
+ sessions = GetSessions(self.auth, session_ids)
+ if sessions is not None:
+ sess_ids = [s['session_id'] for s in sessions]
+ self.islistequal(sess_ids, session_ids, 'GetSessions - isequal')
+
+ if self.config.verbose:
+ utils.header("Added sessions: %s" % session_ids)
+
+ return session_ids
+
+ def DeleteSessions(self):
+ DeleteSession = self.debug(self.api.DeleteSession)
+ GetSessions = self.debug(self.api.GetSessions)
+
+ # DeleteSession deletes based on auth, so we must create auths for the sessions we delete
+ for session_id in self.session_ids:
+ sessions = GetSessions(self.auth, [session_id])
+ if not sessions: continue
+ session = sessions[0]
+ tmpauth = {
+ 'session': session['session_id'],
+ 'AuthMethod': 'session'
+ }
+
+ DeleteSession(tmpauth)
+
+ # Check if sessions are deleted
+ sessions = GetSessions(self.auth, self.session_ids)
+ self.islistequal(sessions, [], 'DeleteBootState check')
+
+ if self.config.verbose:
+ utils.header("Deleted sessions: %s" % self.session_ids)
+
+ self.session_ids = []
+
+ def GenerateNodeConfFile(self):
+ GetNodes = self.debug(self.api.GetNodes)
+ GetNodeNetworks = self.debug(self.api.GetNodeNetworks)
+ GenerateNodeConfFile = self.debug(self.api.GenerateNodeConfFile)
+
+ nodes = GetNodes(self.auth, self.node_ids)
+ nodes = filter(lambda n: n['nodenetwork_ids'], nodes)
+ if not nodes: return 0
+ node = nodes[0]
+ nodenetworks = GetNodeNetworks(self.auth, node['nodenetwork_ids'])
+ nodenetwork = nodenetworks[0]
+ parts = node['hostname'].split(".", 1)
+ host = parts[0]
+ domain = parts[1]
+ node_config = {
+ 'NODE_ID': node['node_id'],
+ 'NODE_KEY': node['key'],
+ 'IP_METHOD': nodenetwork['method'],
+ 'IP_ADDRESS': nodenetwork['ip'],
+ 'IP_GATEWAY': nodenetwork['gateway'],
+ 'IP_NETMASK': nodenetwork['netmask'],
+ 'IP_NETADDR': nodenetwork['network'],
+ 'IP_BROADCASTADDR': nodenetwork['broadcast'],
+ 'IP_DNS1': nodenetwork['dns1'],
+ 'IP_DNS2': nodenetwork['dns2'],
+ 'HOSTNAME': host,
+ 'DOMAIN_NAME': domain
+ }
+ node_config_file = GenerateNodeConfFile(self.auth, node['node_id'])
+ self.isequal(node_config_file, node_config, 'GenerateNodeConfFile - isequal')
+
+ if self.config.verbose:
+ utils.header("GenerateNodeConfFile")
+
+ def GetBootMedium(self):
+ pass
+
+
+ def GetEventObjects(self):
+ GetEventObjects = self.debug(self.api.GetEventObjects)
+ GetEventObjects(self.auth)
+
+ if self.config.verbose:
+ utils.header("GetEventObjects")
+
+ def GetEvents(self):
+ GetEvents = self.debug(self.api.GetEvents)
+ GetEvents(self.auth)
+
+ if self.config.verbose:
+ utils.header("GetEvents")
+
+ def GetPeerData(self):
+ GetPeers = self.debug(self.api.GetPeers)
+ GetPeerData = self.debug(self.api.GetPeerData)
+
+ peers = GetPeers(self.auth)
+ if peers is None or not peers: return 0
+ peer = peers[0]
+ peer_data = GetPeerData(self.auth)
+
+ # Manuall construt peer data
+
+ if self.config.verbose:
+ utils.header("GetPeerData")
+
+ def GetPeerName(self):
+ # GetPeerName should return the same as api.config.PLC_NAME
+ GetPeerName = self.debug(self.api.GetPeerName)
+ peer_name = GetPeerName(self.auth)
+ self.islistequal([peer_name], [self.api.config.PLC_NAME], 'GetPeerName - isequal')
+
+ if self.config.verbose:
+ utils.header("GetPeerName")
+
+ def GetPlcRelease(self):
+ GetPlcRelease = self.debug(self.api.GetPlcRelease)
+ plc_release = GetPlcRelease(self.auth)
+
+ if self.config.verbose:
+ utils.header("GetPlcRelease")
+
+ def GetSliceKeys(self):
+ GetSliceKeys = self.debug(self.api.GetSliceKeys)
+ GetSlices = self.debug(self.api.GetSlices)
+
+ slices = GetSlices(self.auth, self.slice_ids)
+ if not slices: return 0
+ slices = filter(lambda s: s['person_ids'], slices)
+ if not slices: return 0
+ slice = slices[0]
+
+ slice_keys = GetSliceKeys(self.auth, [slice['slice_id']])
+ # XX Manually construct slice_keys for this slice and compare
+
+ if self.config.verbose:
+ utils.header("GetSliceKeys(%s)" % [slice['slice_id']])
+
+ def GetSliceTicket(self):
+ GetSliceTicket = self.debug(self.api.GetSliceTicket)
+
+ slice_id = random.sample(self.slice_ids, 1)[0]
+ slice_ticket = GetSliceTicket(self.auth, slice_id)
+
+ if self.config.verbose:
+ utils.header("GetSliceTicket(%s)" % slice_id)
+
+ def GetSlicesMD5(self):
+ GetSlicesMD5 = self.debug(self.api.GetSlicesMD5)
+
+ slices_md5 = GetSlicesMD5(self.auth)
+
+ if self.config.verbose:
+ utils.header("GetSlicesMD5")
+
+ def GetSlivers(self):
+ GetSlivers = self.debug(self.api.GetSlivers)
+ GetNodes = self.debug(self.api.GetNodes)
+ nodes = GetNodes(self.auth, self.node_ids)
+ if nodes is None or not nodes: return 0
+ nodes = filter(lambda n: n['slice_ids'], nodes)
+ if not nodes: return 0
+ node = nodes[0]
+
+ slivers = GetSlivers(self.auth, node['node_id'])
+
+ # XX manually create slivers object and compare
+
+ if self.config.verbose:
+ utils.header("GetSlivers(%s)" % node['node_id'])
+
+ def GetWhitelist(self):
+ GetWhitelist = self.debug(self.api.GetWhitelist)
+ GetNodes = self.debug(self.api.GetNodes)
+
+ whitelists = GetWhitelist(self.auth, self.node_ids)
+ nodes = GetNodes(self.auth, self.node_ids)
+ if nodes is None or not nodes: return 0
+ nodes = filter(lambda n: n['slice_ids_whitelist'], nodes)
+ self.islistequal(whitelists, nodes, 'GetWhitelist - isequal')
+
+ if self.config.verbose:
+ utils.header("GetWhitelist")
+
+ def NotifyPersons(self):
+ NotifyPersons = self.debug(self.api.NotifyPersons)
+ person_id = random.sample(self.person_ids, 1)[0]
+
+ NotifyPersons(self.auth, [person_id], 'QA Test', 'Welcome')
+
+ if self.config.verbose:
+ utils.header('NotifyPersons(%s)' % [person_id])
+
+ def NotifySupport(self):
+ NotifySupport = self.debug(self.api.NotifySupport)
+ NotifySupport(self.auth, 'QA Test', 'Support Request')
+
+ if self.config.verbose:
+ utils.header('NotifSupport')
+
+ def RebootNode(self):
+ RebootNode = self.debug(self.api.RebootNode)
+ node_id = random.sample(self.node_ids, 1)[0]
+ RebootNode(self.auth, node_id)
+
+ if self.config.verbose:
+ utils.header('RebootNode(%s)' % node_id)
+
+ def ResetPassword(self):
+ ResetPassword = self.debug(self.api.ResetPassword)
+ person_id = random.sample(self.person_ids, 1)[0]
+ ResetPassword(self.auth, person_id)
+
+ if self.config.verbose:
+ utils.header('ResetPassword(%s)' % person_id)
+
+ def SetPersonPrimarySite(self):
+ SetPersonPrimarySite = self.debug(self.api.SetPersonPrimarySite)
+ GetPersons = self.debug(self.api.GetPersons)
+ person_id = random.sample(self.person_ids, 1)[0]
+ persons = GetPersons(self.auth, person_id)
+ if not persons: return 0
+ person = persons[0]
+ site_id = random.sample(person['site_ids'], 1)[0]
+ SetPersonPrimarySite(self.auth, person_id, site_id)
+
+ if self.config.verbose:
+ utils.header('SetPersonPrimarySite(%s, %s)' % (person_id, site_id))
+
+ def VerifyPerson(self):
+ VerifyPerson = self.debug(self.api.VerifyPerson)
+ UpdatePerson = self.debug(self.api.UpdatePerson)
+ GetPersons = self.debug(self.api.GetPersons)
+
+ # can only verify new (disabled) accounts
+ person_id = random.sample(self.person_ids, 1)[0]
+ persons = GetPersons(self.auth, [person_id])
+ if persons is None or not persons: return 0
+ person = persons[0]
+ UpdatePerson(self.auth, person['person_id'], {'enabled': False})
+ VerifyPerson(self.auth, person['person_id'])
+
+ if self.config.verbose:
+ utils.header('VerifyPerson(%s)' % person_id)
+
if __name__ == '__main__':
args = tuple(sys.argv[1:])
api_unit_test()(*args)