From 76a2ef66023da22c7fda68b21fd9d8d49708ecba Mon Sep 17 00:00:00 2001 From: Tony Mack Date: Fri, 22 Feb 2008 18:19:04 +0000 Subject: [PATCH] test misc Get and administrative methods --- qaapi/qa/tests/api_unit_test.py | 222 ++++++++++++++++++++++++++++++-- 1 file changed, 214 insertions(+), 8 deletions(-) diff --git a/qaapi/qa/tests/api_unit_test.py b/qaapi/qa/tests/api_unit_test.py index b2d1c49..dc3a523 100755 --- a/qaapi/qa/tests/api_unit_test.py +++ b/qaapi/qa/tests/api_unit_test.py @@ -194,6 +194,7 @@ def random_node(): 'boot_state': random.sample(boot_states, 1)[0], 'model': randstr(255), 'version': randstr(64), + 'session': randstr(20) } def random_nodenetwork(): @@ -301,6 +302,7 @@ def random_role(): def random_message(): return { + 'message_id': randstr(10), 'subject': randstr(100), 'template': randstr(254), } @@ -467,6 +469,8 @@ class api_unit_test(Test): return wrapper def cleanup(self): + if hasattr(self, 'session_ids'): self.DeleteSessions() + if hasattr(self, 'message_ids'): self.DeleteMessages() if hasattr(self, 'key_types'): self.DeleteKeyTypes() if hasattr(self, 'key_ids'): self.DeleteKeys() if hasattr(self, 'person_ids'): self.DeletePersons() @@ -492,7 +496,7 @@ class api_unit_test(Test): if hasattr(self, 'peer_ids'): self.DeletePeers() if hasattr(self, 'site_ids'): self.DeleteSites() if hasattr(self, 'boot_states'): self.DeleteBootStates() - + def Sites(self, n=4): site_ids = [] @@ -2055,7 +2059,8 @@ class api_unit_test(Test): for i in range(n): # Add Message message_fields = random_message() - message_id = AddMessage(auth, message_fields) + message_id = message_fields['message_id'] + AddMessage(auth, message_fields) if message_id is None: continue # Should return a unique id @@ -2072,7 +2077,7 @@ class api_unit_test(Test): # Check again messages = GetMessages(auth, [message_id]) - if messagess is None: continue + if messages is None: continue message = messages[0] self.isequal(message, message_fields, 'UpdateMessage - isequal') @@ -2088,7 +2093,7 @@ class api_unit_test(Test): def DeleteMessages(self): # Delete all messages DeleteMessage = self.debug(api.DeleteMessage) - GetMessage = self.debug(api.GetMessages) + GetMessages = self.debug(api.GetMessages) for message_id in self.message_ids: result = DeleteMessage(auth, message_id) @@ -2103,7 +2108,7 @@ class api_unit_test(Test): def Sessions(self, n = 2): session_ids = [] - AddSession = self.debug(api.AddBootState) + AddSession = self.debug(api.AddSession) GetSession = self.debug(api.GetSession) GetSessions = self.debug(api.GetSessions) for i in range(n): @@ -2114,7 +2119,7 @@ class api_unit_test(Test): session_ids.append(session_id) # Check session - sessions = GetSessions(auth, person_id) + sessions = GetSessions(auth, [person_id]) if not sessions: continue session = sessions[0] sess_id = session['session_id'] @@ -2126,7 +2131,7 @@ class api_unit_test(Test): session_ids.append(session_id) # Check session - sessions = GetSessions(auth, auth['Username']) + sessions = GetSessions(auth, [auth['Username']]) if not sessions: continue session = sessions[0] sess_id = session['session_id'] @@ -2153,7 +2158,7 @@ class api_unit_test(Test): if not sessions: continue session = sessions[0] tmpauth = { - 'AuthString': session['session_id'], + 'session': session['session_id'], 'AuthMethod': 'session' } @@ -2169,9 +2174,210 @@ class api_unit_test(Test): self.session_ids = [] def GenerateNodeConfFile(self): + GetNodes = self.debug(api.GetNodes) + GetNodeNetworks = self.debug(api.GetNodeNetworks) + GenerateNodeConfFile = self.debug(api.GenerateNodeConfFile) + + nodes = GetNodes(auth, self.node_ids) + nodes = filter(lambda n: n['nodenetwork_ids'], nodes) + if not nodes: return 0 + node = nodes[0] + nodenetworks = GetNodeNetworks(auth, node['nodenetwork_ids']) + nodenetwork = nodenetworks[0] + parts = node['hostname'].split(".", 1) + host = parts[0] + domain = parts[1] + node_config = { + 'NODE_ID': node['node_id'], + 'NODE_KEY': node['key'], + 'IP_METHOD': nodenetwork['method'], + 'IP_ADDRESS': nodenetwork['ip'], + 'IP_GATEWAY': nodenetwork['gateway'], + 'IP_NETMASK': nodenetwork['netmask'], + 'IP_NETADDR': nodenetwork['network'], + 'IP_BROADCASTADDR': nodenetwork['broadcast'], + 'IP_DNS1': nodenetwork['dns1'], + 'IP_DNS2': nodenetwork['dns2'], + 'HOSTNAME': host, + 'DOMAIN_NAME': domain + } + node_config_file = GenerateNodeConfFile(auth, node['node_id']) + self.isequal(node_config_file, node_config, 'GenerateNodeConfFile - isequal') + + if self.config.verbose: + utils.header("GenerateNodeConfFile") + + def GetBootMedium(self): pass + + def GetEventObjects(self): + GetEventObjects = self.debug(api.GetEventObjects) + GetEventObjects(auth) + + if self.config.verbose: + utils.header("GetEventObjects") + + def GetEvents(self): + GetEvents = self.debug(api.GetEvents) + GetEvents(auth) + + if self.config.verbose: + utils.header("GetEvents") + + def GetPeerData(self): + GetPeers = self.debug(api.GetPeers) + GetPeerData = self.debug(api.GetPeerData) + + peers = GetPeers(auth) + if peers is None or not peers: return 0 + peer = peers[0] + peer_data = GetPeerData(auth) + + # Manuall construt peer data + + if self.config.verbose: + utils.header("GetPeerData") + + def GetPeerName(self): + # GetPeerName should return the same as api.config.PLC_NAME + GetPeerName = self.debug(api.GetPeerName) + peer_name = GetPeerName(auth) + self.islistequal([peer_name], [api.config.PLC_NAME], 'GetPeerName - isequal') + + if self.config.verbose: + utils.header("GetPeerName") + def GetPlcRelease(self): + GetPlcRelease = self.debug(api.GetPlcRelease) + plc_release = GetPlcRelease(auth) + + if self.config.verbose: + utils.header("GetPlcRelease") + + def GetSliceKeys(self): + GetSliceKeys = self.debug(api.GetSliceKeys) + GetSlices = self.debug(api.GetSlices) + + slices = GetSlices(auth, self.slice_ids) + if not slices: return 0 + slices = filter(lambda s: s['person_ids'], slices) + if not slices: return 0 + slice = slices[0] + + slice_keys = GetSliceKeys(auth, [slice['slice_id']]) + # XX Manually construct slice_keys for this slice and compare + + if self.config.verbose: + utils.header("GetSliceKeys(%s)" % [slice['slice_id']]) + + def GetSliceTicket(self): + GetSliceTicket = self.debug(api.GetSliceTicket) + + slice_id = random.sample(self.slice_ids, 1)[0] + slice_ticket = GetSliceTicket(auth, slice_id) + + if self.config.verbose: + utils.header("GetSliceTicket(%s)" % slice_id) + + def GetSlicesMD5(self): + GetSlicesMD5 = self.debug(api.GetSlicesMD5) + + slices_md5 = GetSlicesMD5(auth) + + if self.config.verbose: + utils.header("GetSlicesMD5") + + def GetSlivers(self): + GetSlivers = self.debug(api.GetSlivers) + GetNodes = self.debug(api.GetNodes) + nodes = GetNodes(auth, self.node_ids) + if nodes is None or not nodes: return 0 + nodes = filter(lambda n: n['slice_ids'], nodes) + if not nodes: return 0 + node = nodes[0] + + slivers = GetSlivers(auth, node['node_id']) + + # XX manually create slivers object and compare + + if self.config.verbose: + utils.header("GetSlivers(%s)" % node['node_id']) + + def GetWhitelist(self): + GetWhitelist = self.debug(api.GetWhitelist) + GetNodes = self.debug(api.GetNodes) + + whitelists = GetWhitelist(auth, self.node_ids) + nodes = GetNodes(auth, self.node_ids) + if nodes is None or not nodes: return 0 + nodes = filter(lambda n: n['slice_ids_whitelist'], nodes) + self.islistequal(whitelists, nodes, 'GetWhitelist - isequal') + + if self.config.verbose: + utils.header("GetWhitelist") + + def NotifyPersons(self): + NotifyPersons = self.debug(api.NotifyPersons) + person_id = random.sample(self.person_ids, 1)[0] + + NotifyPersons(auth, [person_id], 'QA Test', 'Welcome') + + if self.config.verbose: + utils.header('NotifyPersons(%s)' % [person_id]) + + def NotifySupport(self): + NotifySupport = self.debug(api.NotifySupport) + NotifySupport(auth, 'QA Test', 'Support Request') + + if self.config.verbose: + utils.header('NotifSupport') + + def RebootNode(self): + RebootNode = self.debug(api.RebootNode) + node_id = random.sample(self.node_ids, 1)[0] + RebootNode(auth, node_id) + + if self.config.verbose: + utils.header('RebootNode(%s)' % node_id) + + def ResetPassword(self): + ResetPassword = self.debug(api.ResetPassword) + person_id = random.sample(self.person_ids, 1)[0] + ResetPassword(auth, person_id) + + if self.config.verbose: + utils.header('ResetPassword(%s)' % person_id) + + def SetPersonPrimarySite(self): + SetPersonPrimarySite = self.debug(api.SetPersonPrimarySite) + GetPersons = self.debug(api.GetPersons) + person_id = random.sample(self.person_ids, 1)[0] + persons = GetPersons(auth, person_id) + if not persons: return 0 + person = persons[0] + site_id = random.sample(person['site_ids'], 1)[0] + SetPersonPrimarySite(auth, person_id, site_id) + + if self.config.verbose: + utils.header('SetPersonPrimarySite(%s, %s)' % (person_id, site_id)) + + def VerifyPerson(self): + VerifyPerson = self.debug(api.VerifyPerson) + UpdatePerson = self.debug(api.UpdatePerson) + GetPersons = self.debug(api.GetPersons) + + # can only verify new (disabled) accounts + person_id = random.sample(self.person_ids, 1)[0] + persons = GetPersons(auth, [person_id]) + if persons is None or not persons: return 0 + person = persons[0] + UpdatePerson(auth, person['person_id'], {'enabled': False}) + VerifyPerson(auth, person['person_id']) + + if self.config.verbose: + utils.header('VerifyPerson(%s)' % person_id) + if __name__ == '__main__': args = tuple(sys.argv[1:]) api_unit_test()(*args) -- 2.43.0