'role_id': randint(1000),
'name': randstr(50)
}
-
+
+def random_message():
+ return {
+ 'subject': randstr(100),
+ 'template': randstr(254),
+ }
+
class api_unit_test(Test):
logfile = Logfile("api-unittest-summary.log")
if hasattr(self, 'KeyTypes'): self.key_types = self.KeyTypes(key_types)
if hasattr(self, 'Keys'): self.key_ids = self.Keys(keys)
if hasattr(self, 'Messages'): self.message_ids = self.Messages(messages)
- if hasattr(self, 'Sessions'): self.Sessions()
+ if hasattr(self, 'Sessions'): self.session_ids = self.Sessions()
# Test misc Get calls
if hasattr(self, 'GenerateNodeConfFile'): self.GenerateNodeConfFile()
utils.header("Deleted nodenetwork setting types: %s " % self.nodenetwork_setting_type_ids)
self.nodenetwork_setting_type_ids = []
+ def Messages(self, n = 2):
+ message_ids = []
+ AddMessage = self.debug(api.AddMessage)
+ GetMessages = self.debug(api.GetMessages)
+ UpdateMessage = self.debug(api.UpdateMessage)
+ for i in range(n):
+ # Add Message
+ message_fields = random_message()
+ message_id = AddMessage(auth, message_fields)
+ if message_id is None: continue
+
+ # Should return a unique id
+ self.isunique(message_id, message_ids, 'AddMessage - isunique')
+ message_ids.append(message_id)
+ messages = GetMessages(auth, [message_id])
+ if messages is None: continue
+ message = messages[0]
+ self.isequal(message, message_fields, 'AddMessage - isequal')
+
+ # Update message
+ message_fields = random_message()
+ result = UpdateMessage(auth, message_id, message_fields)
+
+ # Check again
+ messages = GetMessages(auth, [message_id])
+ if messagess is None: continue
+ message = messages[0]
+ self.isequal(message, message_fields, 'UpdateMessage - isequal')
+
+ messages = GetMessages(auth, message_ids)
+ if messages is not None:
+ self.islistequal(message_ids, [m['message_id'] for m in messages], 'GetMessages - isequal')
+
+ if self.config.verbose:
+ utils.header("Added messages: %s" % message_ids)
+
+ return message_ids
+
+ def DeleteMessages(self):
+ # Delete all messages
+ DeleteMessage = self.debug(api.DeleteMessage)
+ GetMessage = self.debug(api.GetMessages)
+ for message_id in self.message_ids:
+ result = DeleteMessage(auth, message_id)
+
+ # Check if messages are deleted
+ messages = GetMessages(auth, self.message_ids)
+ self.islistequal(messages, [], 'DeleteMessage - check')
+
+ if self.config.verbose:
+ utils.header("Deleted messages: %s" % self.message_ids)
+
+ self.message_ids = []
+
+ def Sessions(self, n = 2):
+ session_ids = []
+ AddSession = self.debug(api.AddBootState)
+ GetSession = self.debug(api.GetSession)
+ GetSessions = self.debug(api.GetSessions)
+ for i in range(n):
+ # Add session
+ person_id = random.sample(self.person_ids, 1)[0]
+ session_id = AddSession(auth, person_id)
+ if session_id is None: continue
+ session_ids.append(session_id)
+
+ # Check session
+ sessions = GetSessions(auth, person_id)
+ if not sessions: continue
+ session = sessions[0]
+ sess_id = session['session_id']
+ self.islistequal([sess_id], [session_id], 'AddSession - isequal')
+
+ # GetSession creates session_id based on auth, so we must use the current auth
+ session_id = GetSession(auth)
+ if session_id is None: continue
+ session_ids.append(session_id)
+
+ # Check session
+ sessions = GetSessions(auth, auth['Username'])
+ if not sessions: continue
+ session = sessions[0]
+ sess_id = session['session_id']
+ self.islistequal([sess_id], [session_id], 'GetSession - isequal')
+
+ # Check all
+ sessions = GetSessions(auth, session_ids)
+ if sessions is not None:
+ sess_ids = [s['session_id'] for s in sessions]
+ self.islistequal(sess_ids, session_ids, 'GetSessions - isequal')
+
+ if self.config.verbose:
+ utils.header("Added sessions: %s" % session_ids)
+
+ return session_ids
+
+ def DeleteSessions(self):
+ DeleteSession = self.debug(api.DeleteSession)
+ GetSessions = self.debug(api.GetSessions)
+
+ # DeleteSession deletes based on auth, so we must create auths for the sessions we delete
+ for session_id in self.session_ids:
+ sessions = GetSessions(auth, [session_id])
+ if not sessions: continue
+ session = sessions[0]
+ tmpauth = {
+ 'AuthString': session['session_id'],
+ 'AuthMethod': 'session'
+ }
+
+ DeleteSession(tmpauth)
+
+ # Check if sessions are deleted
+ sessions = GetSessions(auth, self.session_ids)
+ self.islistequal(sessions, [], 'DeleteBootState check')
+
+ if self.config.verbose:
+ utils.header("Deleted sessions: %s" % self.session_ids)
+
+ self.session_ids = []
+
+ def GenerateNodeConfFile(self):
+ pass
+
+
if __name__ == '__main__':
args = tuple(sys.argv[1:])
api_unit_test()(*args)