'boot_state': random.sample(boot_states, 1)[0],
'model': randstr(255),
'version': randstr(64),
+ 'session': randstr(20)
}
def random_nodenetwork():
def random_message():
return {
+ 'message_id': randstr(10),
'subject': randstr(100),
'template': randstr(254),
}
return wrapper
def cleanup(self):
+ if hasattr(self, 'session_ids'): self.DeleteSessions()
+ if hasattr(self, 'message_ids'): self.DeleteMessages()
if hasattr(self, 'key_types'): self.DeleteKeyTypes()
if hasattr(self, 'key_ids'): self.DeleteKeys()
if hasattr(self, 'person_ids'): self.DeletePersons()
if hasattr(self, 'peer_ids'): self.DeletePeers()
if hasattr(self, 'site_ids'): self.DeleteSites()
if hasattr(self, 'boot_states'): self.DeleteBootStates()
-
+
def Sites(self, n=4):
site_ids = []
for i in range(n):
# Add Message
message_fields = random_message()
- message_id = AddMessage(auth, message_fields)
+ message_id = message_fields['message_id']
+ AddMessage(auth, message_fields)
if message_id is None: continue
# Should return a unique id
# Check again
messages = GetMessages(auth, [message_id])
- if messagess is None: continue
+ if messages is None: continue
message = messages[0]
self.isequal(message, message_fields, 'UpdateMessage - isequal')
def DeleteMessages(self):
# Delete all messages
DeleteMessage = self.debug(api.DeleteMessage)
- GetMessage = self.debug(api.GetMessages)
+ GetMessages = self.debug(api.GetMessages)
for message_id in self.message_ids:
result = DeleteMessage(auth, message_id)
def Sessions(self, n = 2):
session_ids = []
- AddSession = self.debug(api.AddBootState)
+ AddSession = self.debug(api.AddSession)
GetSession = self.debug(api.GetSession)
GetSessions = self.debug(api.GetSessions)
for i in range(n):
session_ids.append(session_id)
# Check session
- sessions = GetSessions(auth, person_id)
+ sessions = GetSessions(auth, [person_id])
if not sessions: continue
session = sessions[0]
sess_id = session['session_id']
session_ids.append(session_id)
# Check session
- sessions = GetSessions(auth, auth['Username'])
+ sessions = GetSessions(auth, [auth['Username']])
if not sessions: continue
session = sessions[0]
sess_id = session['session_id']
if not sessions: continue
session = sessions[0]
tmpauth = {
- 'AuthString': session['session_id'],
+ 'session': session['session_id'],
'AuthMethod': 'session'
}
self.session_ids = []
def GenerateNodeConfFile(self):
+ GetNodes = self.debug(api.GetNodes)
+ GetNodeNetworks = self.debug(api.GetNodeNetworks)
+ GenerateNodeConfFile = self.debug(api.GenerateNodeConfFile)
+
+ nodes = GetNodes(auth, self.node_ids)
+ nodes = filter(lambda n: n['nodenetwork_ids'], nodes)
+ if not nodes: return 0
+ node = nodes[0]
+ nodenetworks = GetNodeNetworks(auth, node['nodenetwork_ids'])
+ nodenetwork = nodenetworks[0]
+ parts = node['hostname'].split(".", 1)
+ host = parts[0]
+ domain = parts[1]
+ node_config = {
+ 'NODE_ID': node['node_id'],
+ 'NODE_KEY': node['key'],
+ 'IP_METHOD': nodenetwork['method'],
+ 'IP_ADDRESS': nodenetwork['ip'],
+ 'IP_GATEWAY': nodenetwork['gateway'],
+ 'IP_NETMASK': nodenetwork['netmask'],
+ 'IP_NETADDR': nodenetwork['network'],
+ 'IP_BROADCASTADDR': nodenetwork['broadcast'],
+ 'IP_DNS1': nodenetwork['dns1'],
+ 'IP_DNS2': nodenetwork['dns2'],
+ 'HOSTNAME': host,
+ 'DOMAIN_NAME': domain
+ }
+ node_config_file = GenerateNodeConfFile(auth, node['node_id'])
+ self.isequal(node_config_file, node_config, 'GenerateNodeConfFile - isequal')
+
+ if self.config.verbose:
+ utils.header("GenerateNodeConfFile")
+
+ def GetBootMedium(self):
pass
+
+ def GetEventObjects(self):
+ GetEventObjects = self.debug(api.GetEventObjects)
+ GetEventObjects(auth)
+
+ if self.config.verbose:
+ utils.header("GetEventObjects")
+
+ def GetEvents(self):
+ GetEvents = self.debug(api.GetEvents)
+ GetEvents(auth)
+
+ if self.config.verbose:
+ utils.header("GetEvents")
+
+ def GetPeerData(self):
+ GetPeers = self.debug(api.GetPeers)
+ GetPeerData = self.debug(api.GetPeerData)
+
+ peers = GetPeers(auth)
+ if peers is None or not peers: return 0
+ peer = peers[0]
+ peer_data = GetPeerData(auth)
+
+ # Manuall construt peer data
+
+ if self.config.verbose:
+ utils.header("GetPeerData")
+
+ def GetPeerName(self):
+ # GetPeerName should return the same as api.config.PLC_NAME
+ GetPeerName = self.debug(api.GetPeerName)
+ peer_name = GetPeerName(auth)
+ self.islistequal([peer_name], [api.config.PLC_NAME], 'GetPeerName - isequal')
+
+ if self.config.verbose:
+ utils.header("GetPeerName")
+ def GetPlcRelease(self):
+ GetPlcRelease = self.debug(api.GetPlcRelease)
+ plc_release = GetPlcRelease(auth)
+
+ if self.config.verbose:
+ utils.header("GetPlcRelease")
+
+ def GetSliceKeys(self):
+ GetSliceKeys = self.debug(api.GetSliceKeys)
+ GetSlices = self.debug(api.GetSlices)
+
+ slices = GetSlices(auth, self.slice_ids)
+ if not slices: return 0
+ slices = filter(lambda s: s['person_ids'], slices)
+ if not slices: return 0
+ slice = slices[0]
+
+ slice_keys = GetSliceKeys(auth, [slice['slice_id']])
+ # XX Manually construct slice_keys for this slice and compare
+
+ if self.config.verbose:
+ utils.header("GetSliceKeys(%s)" % [slice['slice_id']])
+
+ def GetSliceTicket(self):
+ GetSliceTicket = self.debug(api.GetSliceTicket)
+
+ slice_id = random.sample(self.slice_ids, 1)[0]
+ slice_ticket = GetSliceTicket(auth, slice_id)
+
+ if self.config.verbose:
+ utils.header("GetSliceTicket(%s)" % slice_id)
+
+ def GetSlicesMD5(self):
+ GetSlicesMD5 = self.debug(api.GetSlicesMD5)
+
+ slices_md5 = GetSlicesMD5(auth)
+
+ if self.config.verbose:
+ utils.header("GetSlicesMD5")
+
+ def GetSlivers(self):
+ GetSlivers = self.debug(api.GetSlivers)
+ GetNodes = self.debug(api.GetNodes)
+ nodes = GetNodes(auth, self.node_ids)
+ if nodes is None or not nodes: return 0
+ nodes = filter(lambda n: n['slice_ids'], nodes)
+ if not nodes: return 0
+ node = nodes[0]
+
+ slivers = GetSlivers(auth, node['node_id'])
+
+ # XX manually create slivers object and compare
+
+ if self.config.verbose:
+ utils.header("GetSlivers(%s)" % node['node_id'])
+
+ def GetWhitelist(self):
+ GetWhitelist = self.debug(api.GetWhitelist)
+ GetNodes = self.debug(api.GetNodes)
+
+ whitelists = GetWhitelist(auth, self.node_ids)
+ nodes = GetNodes(auth, self.node_ids)
+ if nodes is None or not nodes: return 0
+ nodes = filter(lambda n: n['slice_ids_whitelist'], nodes)
+ self.islistequal(whitelists, nodes, 'GetWhitelist - isequal')
+
+ if self.config.verbose:
+ utils.header("GetWhitelist")
+
+ def NotifyPersons(self):
+ NotifyPersons = self.debug(api.NotifyPersons)
+ person_id = random.sample(self.person_ids, 1)[0]
+
+ NotifyPersons(auth, [person_id], 'QA Test', 'Welcome')
+
+ if self.config.verbose:
+ utils.header('NotifyPersons(%s)' % [person_id])
+
+ def NotifySupport(self):
+ NotifySupport = self.debug(api.NotifySupport)
+ NotifySupport(auth, 'QA Test', 'Support Request')
+
+ if self.config.verbose:
+ utils.header('NotifSupport')
+
+ def RebootNode(self):
+ RebootNode = self.debug(api.RebootNode)
+ node_id = random.sample(self.node_ids, 1)[0]
+ RebootNode(auth, node_id)
+
+ if self.config.verbose:
+ utils.header('RebootNode(%s)' % node_id)
+
+ def ResetPassword(self):
+ ResetPassword = self.debug(api.ResetPassword)
+ person_id = random.sample(self.person_ids, 1)[0]
+ ResetPassword(auth, person_id)
+
+ if self.config.verbose:
+ utils.header('ResetPassword(%s)' % person_id)
+
+ def SetPersonPrimarySite(self):
+ SetPersonPrimarySite = self.debug(api.SetPersonPrimarySite)
+ GetPersons = self.debug(api.GetPersons)
+ person_id = random.sample(self.person_ids, 1)[0]
+ persons = GetPersons(auth, person_id)
+ if not persons: return 0
+ person = persons[0]
+ site_id = random.sample(person['site_ids'], 1)[0]
+ SetPersonPrimarySite(auth, person_id, site_id)
+
+ if self.config.verbose:
+ utils.header('SetPersonPrimarySite(%s, %s)' % (person_id, site_id))
+
+ def VerifyPerson(self):
+ VerifyPerson = self.debug(api.VerifyPerson)
+ UpdatePerson = self.debug(api.UpdatePerson)
+ GetPersons = self.debug(api.GetPersons)
+
+ # can only verify new (disabled) accounts
+ person_id = random.sample(self.person_ids, 1)[0]
+ persons = GetPersons(auth, [person_id])
+ if persons is None or not persons: return 0
+ person = persons[0]
+ UpdatePerson(auth, person['person_id'], {'enabled': False})
+ VerifyPerson(auth, person['person_id'])
+
+ if self.config.verbose:
+ utils.header('VerifyPerson(%s)' % person_id)
+
if __name__ == '__main__':
args = tuple(sys.argv[1:])
api_unit_test()(*args)