From da24c42e31aa334a1b7a1ada00964a20b631cf6f Mon Sep 17 00:00:00 2001 From: Tony Mack Date: Fri, 25 Jan 2008 19:36:30 +0000 Subject: [PATCH] deleting --- qaapi/qa/modules/api/__init__.py | 0 qaapi/qa/modules/api/add_test_data.py | 168 ------ qaapi/qa/modules/api/delete_test_data.py | 24 - qaapi/qa/modules/api/sync_user_key.py | 66 --- qaapi/qa/modules/api/unit_test.py | 676 ----------------------- 5 files changed, 934 deletions(-) delete mode 100644 qaapi/qa/modules/api/__init__.py delete mode 100644 qaapi/qa/modules/api/add_test_data.py delete mode 100644 qaapi/qa/modules/api/delete_test_data.py delete mode 100644 qaapi/qa/modules/api/sync_user_key.py delete mode 100755 qaapi/qa/modules/api/unit_test.py diff --git a/qaapi/qa/modules/api/__init__.py b/qaapi/qa/modules/api/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/qaapi/qa/modules/api/add_test_data.py b/qaapi/qa/modules/api/add_test_data.py deleted file mode 100644 index 262fb43..0000000 --- a/qaapi/qa/modules/api/add_test_data.py +++ /dev/null @@ -1,168 +0,0 @@ -import os,sys -from qa.Test import Test -from qa import utils - -class add_test_data(Test): - """ - Adds the test data found in config to the plc db - """ - def call(self): - - api = self.config.api - auth = self.config.auth - - # Make sure some required fields are in config - required_fields = ['TEST_SITE_NAME', 'TEST_SITE_LOGIN_BASE', 'TEST_SLICE_NAME', 'TEST_PERSON_EMAIL'] - required_node_fields = ['TEST_NODE_TYPE', 'TEST_NODE_METHOD', 'TEST_NODE_HOSTNAME', 'TEST_NODE_IP', - 'TEST_NODE_GATEWAY', 'TEST_NODE_DNS', 'TEST_NODE_NETWORK', 'TEST_NODE_BROADCAST', - 'TEST_NODE_NETMASK'] - - for field in required_fields: - if not hasattr(self.config, field) or \ - len(getattr(self.config, field).strip()) < 1: - raise Exception, "%(field)s must be set and cannot be blank" % locals() - - # Look for node configurations - node_params = {} - for attr in dir(self.config): - if attr.find("NODE") > 0: - parts = attr.split('_') - node_prefix = parts[1] +"_"+ parts[3] - name = "_".join(parts[:3]) - value = getattr(self.config, attr) - # start a new node dictionary - if node_prefix not in node_params: - node_params[node_prefix] = {'prefix': node_prefix} - node_params[node_prefix][name] = value - - node_configs = node_params.values() - node_list = [] - - # Make sure required node fields are preset for each node config - for node_config in node_configs: - for field in required_node_fields: - if field not in node_config or len(node_config[field].strip()) < 1: - raise Exception, "%s must be set for %s and cannot be blank" % (field, node_config['prefix']) - node = {'type': node_config['TEST_NODE_TYPE'], - 'method': node_config['TEST_NODE_METHOD'], - 'hostname': node_config['TEST_NODE_HOSTNAME'], - 'ip': node_config['TEST_NODE_IP'], - 'gateway': node_config['TEST_NODE_GATEWAY'], - 'dns1': node_config['TEST_NODE_DNS'], - 'broadcast': node_config['TEST_NODE_BROADCAST'], - 'network': node_config['TEST_NODE_NETWORK'], - 'netmask': node_config['TEST_NODE_NETMASK'], - 'slice_ids': [], - 'nodenetwork_ids': []} - node_list.append(node) - - - # Define test objects - site_fields = {'name': self.config.TEST_SITE_NAME, 'login_base': self.config.TEST_SITE_LOGIN_BASE, - 'url': 'http://google.com', 'enabled': True, 'max_slices': 1000, - 'max_slivers': 1000, 'is_public': True, 'abbreviated_name': 'Test', - 'person_ids': []} - - slice_fields = {'name': self.config.TEST_SLICE_NAME, 'instantiation': 'plc-instantiated', - 'max_nodes': 1000, 'description': 'blank', 'person_ids': [], 'node_ids': []} - - person_fields = {'first_name': 'fname', 'last_name': 'lname', 'password': 'password', - 'email': self.config.TEST_PERSON_EMAIL, 'site_ids': [], 'slice_ids': []} - - - # Add Test site - sites = api.GetSites(auth, {'login_base': site_fields['login_base']}) - if not sites: - site_id = api.AddSite(auth, site_fields) - site_fields['site_id'] = site_id - site = site_fields - if self.config.verbose: - utils.header("Added test site") - else: - site = sites[0] - if self.config.verbose: - utils.header("Test site found") - - # Add Test nodes - for node_fields in node_list: - nodes = api.GetNodes(auth, [node_fields['hostname']]) - if not nodes: - node_id = api.AddNode(auth, site_fields['login_base'], node_fields) - node_fields['node_id'] = node_id - node = node_fields - nodes.append(node_fields) - if self.config.verbose: - utils.header("Added test node") - else: - node = nodes[0] - if self.config.verbose: - utils.header("Test node found") - - # Add node network - if not node['nodenetwork_ids']: - nodenetwork_id = api.AddNodeNetwork(auth, node_fields['hostname'], node_fields) - if self.config.verbose: - utils.header("Added test nodenetwork") - else: - if self.config.verbose: - utils.header("Nodenetwork found") - - # Add Test slice - slices = api.GetSlices(auth, [slice_fields['name']]) - if not slices: - slice_id = api.AddSlice(auth, slice_fields) - slice_fields['slice_id'] = slice_id - slice = slice_fields - if self.config.verbose: - utils.header("Added test slice") - else: - slice = slices[0] - if self.config.verbose: - utils.header("Test slice found") - - # Add slice to nodes - node_ids = [n['node_id'] for n in nodes] - node_ids = filter(lambda node_id: node_id not in slice['node_ids'], node_ids) - if node_ids: - api.AddSliceToNodes(auth, slice['name'], node_ids) - if self.config.verbose: - utils.header("Added test slice to test nodes") - else: - if self.config.verbose: - utils.header("Test slice found on test nodes") - - # Add test person - persons = api.GetPersons(auth, [person_fields['email']]) - if not persons: - person_id = api.AddPerson(auth, person_fields) - person_fields['person_id'] = person_id - person = person_fields - if self.config.verbose: - utils.header("Added test person") - else: - person = persons[0] - if self.config.verbose: - utils.header("Test person found") - - # Add roles to person - api.AddRoleToPerson(auth, 'user', person['email']) - - # Add person to site - if site['site_id'] not in person['site_ids']: - api.AddPersonToSite(auth, person['email'], site['login_base']) - if self.config.verbose: - utils.header("Added test person to test site") - else: - if self.config.verbose: - utils.header("Test person found on test site") - - # Add person to slice - if slice['slice_id'] not in person['slice_ids']: - api.AddPersonToSlice(auth, person_fields['email'], slice_fields['name']) - if self.config.verbose: - utils.header("Added test person to slice") - else: - if self.config.verbose: - utils.header("Test person found on test slice") - - diff --git a/qaapi/qa/modules/api/delete_test_data.py b/qaapi/qa/modules/api/delete_test_data.py deleted file mode 100644 index afa5be7..0000000 --- a/qaapi/qa/modules/api/delete_test_data.py +++ /dev/null @@ -1,24 +0,0 @@ -from qa.Test import Test -from qa import utils - - -class delete_test_data(Test): - """ - Removes the test data found in config from the plc db - """ - - def call(self): - - api = self.config.api - auth = self.config.auth - - site_login_base = self.config.TEST_SITE_LOGIN_BASE - - # Deleting the site should delete everything associated with it - api.DeleteSite(auth, site_login_base) - if self.config.verbose: - utils.header("Test data deleted") - - return 1 - - diff --git a/qaapi/qa/modules/api/sync_user_key.py b/qaapi/qa/modules/api/sync_user_key.py deleted file mode 100644 index 436b58f..0000000 --- a/qaapi/qa/modules/api/sync_user_key.py +++ /dev/null @@ -1,66 +0,0 @@ -import os, sys - -from qa import utils -from qa.Test import Test - -class sync_person_key(Test): - """ - Make sure specified users public key on file matches whats - recorded at plc. Create a public/private keypair for the - specified user if one doesnt exist already. - """ - - def make_keys(path, name): - if not os.path.isdir(path): - os.mkdir(path) - key_path = path + os.sep + name - command = "ssh-keygen -f %(key_path)s -t rsa -N ''" % locals() - (stdout, stderr) = utils.popen(command) - - def call(self, email): - api = self.config.api - auth = self.config.auth - email_parts = email.split("@") - keys_filename = email_parts[0] - keys_path = self.config.KEYS_PATH - private_key_path = keys_path + os.sep + keys_filename - public_key_path = private_key_path + ".pub" - - # Validate person - persons = api.GetPersons(auth, [email], ['person_id', 'key_ids']) - if not persons: - raise Exception, "No such person %(email)s" - person = persons[0] - - # make keys if they dont already exist - if not os.path.isfile(private_key_path) or \ - not os.path.isfile(public_key_path): - # Make new keys - self.make_keys(keys_path, keys_filename) - if self.config.verbose: - utils.header("Made new key pair %(private_key_path)s %(public_key_path)s " %\ - locals()) - - # sync public key - public_key_file = open(public_key_path, 'r') - public_key = public_key_file.readline() - - keys = api.GetKeys(auth, person['key_ids']) - if not keys: - # Add current key to db - key_fields = {'type': 'rsa', - 'key': public_key} - api.AddPersonKey(auth, person['person_id'], key_fields) - if self.config.verbose: - utils.header("Added public key in %(public_key_path)s to db" % locals() ) - else: - # keys need to be checked and possibly updated - key = keys[0] - if key['key'] != public_key: - api.UpdateKey(auth, key['key_id'], public_key) - if self.config.verbose: - utils.header("Updated plc with new public key in %(public_key_path)s " % locals()) - else: - if self.config.verbose: - utils.header("Key in %(public_key_path)s matchs public key in plc" % locals()) - diff --git a/qaapi/qa/modules/api/unit_test.py b/qaapi/qa/modules/api/unit_test.py deleted file mode 100755 index 0168764..0000000 --- a/qaapi/qa/modules/api/unit_test.py +++ /dev/null @@ -1,676 +0,0 @@ -#!/usr/bin/python -# -# Test script example -# -# Copyright (C) 2006 The Trustees of Princeton University -# -# - -from pprint import pprint -from string import letters, digits, punctuation -from traceback import print_exc -from optparse import OptionParser -import base64 -import os -import socket -import xmlrpclib -import time - -from qa import utils -from qa.Test import Test -from qa.Config import Config -from qa.logger import Logfile, log -from random import Random - -random = Random() - -config = Config() -auth = config.auth - -try: boot_states = config.api.GetBootStates(auth) -except: boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins'] - -try: roles = [role['name'] for role in config.api.GetRoles(auth)] -except: roles = [u'admin', u'pi', u'user', u'tech'] - -try: methods = config.api.GetNetworkMethods(auth) -except: methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown'] - -try:types = config.api.GetNetworkTypes(auth) -except: types = [u'ipv4'] - -def randfloat(min = 0.0, max = 1.0): - return float(min) + (random.random() * (float(max) - float(min))) - -def randint(min = 0, max = 1): - return int(randfloat(min, max + 1)) - -# See "2.2 Characters" in the XML specification: -# -# #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] -# avoiding -# [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF] - -ascii_xml_chars = map(unichr, [0x9, 0xA, 0xD]) -ascii_xml_chars += map(unichr, xrange(0x20, 0x7F - 1)) -low_xml_chars = list(ascii_xml_chars) -low_xml_chars += map(unichr, xrange(0x84 + 1, 0x86 - 1)) -low_xml_chars += map(unichr, xrange(0x9F + 1, 0xFF)) -valid_xml_chars = list(low_xml_chars) -valid_xml_chars += map(unichr, xrange(0xFF + 1, 0xD7FF)) -valid_xml_chars += map(unichr, xrange(0xE000, 0xFDD0 - 1)) -valid_xml_chars += map(unichr, xrange(0xFDDF + 1, 0xFFFD)) - -def randstr(length, pool = valid_xml_chars, encoding = "utf-8"): - sample = random.sample(pool, min(length, len(pool))) - while True: - s = u''.join(sample) - bytes = len(s.encode(encoding)) - if bytes > length: - sample.pop() - elif bytes < length: - sample += random.sample(pool, min(length - bytes, len(pool))) - random.shuffle(sample) - else: - break - return s - -def randhostname(): - # 1. Each part begins and ends with a letter or number. - # 2. Each part except the last can contain letters, numbers, or hyphens. - # 3. Each part is between 1 and 64 characters, including the trailing dot. - # 4. At least two parts. - # 5. Last part can only contain between 2 and 6 letters. - hostname = 'a' + randstr(61, letters + digits + '-') + '1.' + \ - 'b' + randstr(61, letters + digits + '-') + '2.' + \ - 'c' + randstr(5, letters) - return hostname - -def randpath(length): - parts = [] - for i in range(randint(1, 10)): - parts.append(randstr(randint(1, 30), ascii_xml_chars)) - return os.sep.join(parts)[0:length] - -def randemail(): - return (randstr(100, letters + digits) + "@" + randhostname()).lower() - -def randkey(bits = 2048): - key_types = ["ssh-dss", "ssh-rsa"] - key_type = random.sample(key_types, 1)[0] - return ' '.join([key_type, - base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))), - randemail()]) - -def random_site(): - return { - 'name': randstr(254), - 'abbreviated_name': randstr(50), - 'login_base': randstr(20, letters).lower(), - 'latitude': int(randfloat(-90.0, 90.0) * 1000) / 1000.0, - 'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0, - } - -def random_address_type(): - return { - 'name': randstr(20), - 'description': randstr(254), - } - -def random_address(): - return { - 'line1': randstr(254), - 'line2': randstr(254), - 'line3': randstr(254), - 'city': randstr(254), - 'state': randstr(254), - 'postalcode': randstr(64), - 'country': randstr(128), - } - -def random_person(): - return { - 'first_name': randstr(128), - 'last_name': randstr(128), - 'email': randemail(), - 'bio': randstr(254), - # Accounts are disabled by default - 'enabled': False, - 'password': randstr(254), - } - -def random_key(): - return { - 'key_type': random.sample(key_types, 1)[0], - 'key': randkey() - } - -def random_slice(): - return { - 'name': site['login_base'] + "_" + randstr(11, letters).lower(), - 'url': "http://" + randhostname() + "/", - 'description': randstr(2048), - } - -def random_nodegroup(): - return { - 'name': randstr(50), - 'description': randstr(200), - } - -def random_node(): - return { - 'hostname': randhostname(), - 'boot_state': random.sample(boot_states, 1)[0], - 'model': randstr(255), - 'version': randstr(64), - } - -def random_nodenetwork(): - nodenetwork_fields = { - 'method': random.sample(methods, 1)[0], - 'type': random.sample(types, 1)[0], - 'bwlimit': randint(500000, 10000000), - } - - if method != 'dhcp': - ip = randint(0, 0xffffffff) - netmask = (0xffffffff << randint(2, 31)) & 0xffffffff - network = ip & netmask - broadcast = ((ip & netmask) | ~netmask) & 0xffffffff - gateway = randint(network + 1, broadcast - 1) - dns1 = randint(0, 0xffffffff) - - for field in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1': - nodenetwork_fields[field] = socket.inet_ntoa(struct.pack('>L', locals()[field])) - - return nodenetwork_fields - -def random_pcu(): - return { - 'hostname': randhostname(), - 'ip': socket.inet_ntoa(struct.pack('>L', randint(0, 0xffffffff))), - 'protocol': randstr(16), - 'username': randstr(254), - 'password': randstr(254), - 'notes': randstr(254), - 'model': randstr(32), - } - -def random_conf_file(): - return { - 'enabled': bool(randint()), - 'source': randpath(255), - 'dest': randpath(255), - 'file_permissions': "%#o" % randint(0, 512), - 'file_owner': randstr(32, letters + '_' + digits), - 'file_group': randstr(32, letters + '_' + digits), - 'preinstall_cmd': randpath(100), - 'postinstall_cmd': randpath(100), - 'error_cmd': randpath(100), - 'ignore_cmd_errors': bool(randint()), - 'always_update': bool(randint()), - } - -def random_attribute_type(): - return { - 'name': randstr(100), - 'description': randstr(254), - 'min_role_id': random.sample(roles.values(), 1)[0], - } - -def isequal(object_fields, expected_fields): - try: - for field in expected_fields: - assert field in object_fields - assert object_fields[field] == expected_fields[field] - except: - return False - return True - -def islistequal(list1, list2): - try: - assert set(list1) == set(list2) - except: - return False - return True - -def isunique(id, id_list): - try: - assert id not in id_list - except: - return False - return True - -class unit_test(Test): - - def call(self, - sites = 2, - nodes = 4, - address_types = 3, - addresses = 2, - persons = 10, - keys = 3 - ): - self.api = self.config.api - self.auth = self.config.auth - self.all_methods = set(self.api.system.listMethods()) - self.methods_tested = set() - self.methods_failed = set() - - try: - try: - self.site_ids = self.Sites(sites) - self.node_ids = self.Nodes(nodes) - except: - print_exc() - finally: - try: - self.cleanup() - finally: - - logfile = Logfile("api-unittest.log") - methods_ok = list(self.methods_tested.difference(self.methods_failed)) - methods_failed = list(self.methods_failed) - methods_untested = list(self.all_methods.difference(self.methods_tested)) - methods_ok.sort() - methods_failed.sort() - methods_untested.sort() - print >> logfile, "\n".join([m+": [OK]" for m in methods_ok]) - print >> logfile, "\n".join([m+": [FAILED]" for m in methods_failed]) - print >> logfile, "\n".join([m+": [Not Tested]" for m in methods_untested]) - - def isequal(self, object_fields, expected_fields, method_name): - try: - for field in expected_fields: - assert field in object_fields - assert object_fields[field] == expected_fields[field] - except: - self.methods_failed.update([method_name]) - return False - return True - - def islistequal(self, list1, list2, method_name): - try: assert set(list1) == set(list2) - except: - self.methods_failed.update([method_name]) - return False - return True - - def isunique(self, id, id_list, method_name): - try: assert id not in id_list - except: - self.methods_failed.update([method_name]) - return False - return True - - def debug(self, method, method_name=None): - if method_name is None: - method_name = method._Method__name - - self.methods_tested.update([method_name]) - def wrapper(*args, **kwds): - try: - return method(*args, **kwds) - except: - self.methods_failed.update([method_name]) - return None - - return wrapper - - def cleanup(self): - #if self.person_ids: self.DeletePersons() - #if self.address_ids: self.DeleteAddresses() - #if self.address_type_ids: self.DeleteAddressTypes() - if hasattr(self, 'node_ids'): self.DeleteNodes() - if hasattr(self, 'site_ids'): self.DeleteSites() - - - def Sites(self, n=4): - site_ids = [] - for i in range(n): - # Add Site - site_fields = random_site() - AddSite = self.debug(self.api.AddSite) - site_id = AddSite(self.auth, site_fields) - if site_id is None: continue - - # Should return a unique id - self.isunique(site_id, site_ids, 'AddSite - isunique') - site_ids.append(site_id) - GetSites = self.debug(self.api.GetSites) - sites = GetSites(self.auth, [site_id]) - if sites is None: continue - site = sites[0] - self.isequal(site, site_fields, 'AddSite - isequal') - - # Update site - site_fields = random_site() - UpdateSite = self.debug(self.api.UpdateSite) - result = UpdateSite(self.auth, site_id, site_fields) - - # Check again - sites = GetSites(self.auth, [site_id]) - if sites is None: continue - site = sites[0] - self.isequal(site, site_fields, 'UpdateSite - isequal') - - sites = GetSites(self.auth, site_ids) - if sites is not None: - self.islistequal(site_ids, [site['site_id'] for site in sites], 'GetSites - isequal') - - if self.config.verbose: - utils.header("Added sites: %s" % site_ids) - - return site_ids - - - def DeleteSites(self): - # Delete all sites - DeleteSite = self.debug(self.api.DeleteSite) - for site_id in self.site_ids: - result = DeleteSite(self.auth, site_id) - - # Check if sites are deleted - GetSites = self.debug(self.api.GetSites) - sites = GetSites(self.auth, self.site_ids) - self.islistequal(sites, [], 'DeleteSite - check') - - if self.config.verbose: - utils.header("Deleted sites: %s" % self.site_ids) - - self.site_ids = [] - - def Nodes(self, n=4): - node_ids = [] - for i in range(n): - # Add Node - node_fields = random_node() - site_id = random.sample(self.site_ids, 1)[0] - AddNode = self.debug(self.api.AddNode) - node_id = AddNode(self.auth, site_id, node_fields) - if node_id is None: continue - - # Should return a unique id - self.isunique(node_id, node_ids, 'AddNode - isunique') - node_ids.append(node_id) - - # Check nodes - GetNodes = self.debug(self.api.GetNodes) - nodes = GetNodes(self.auth, [node_id]) - if nodes is None: continue - node = nodes[0] - self.isequal(node, node_fields, 'AddNode - isequal') - - # Update node - node_fields = random_node() - UpdateNode = self.debug(self.api.UpdateNode) - result = UpdateNode(self.auth, node_id, node_fields) - - # Check again - nodes = GetNodes(self.auth, [node_id]) - if nodes is None: continue - node = nodes[0] - self.isequal(node, node_fields, 'UpdateNode - isequal') - - nodes = GetNodes(self.auth, node_ids) - if nodes is not None: - self.islistequal(node_ids, [node['node_id'] for node in nodes], 'GetNodes - isequal') - - if self.config.verbose: - utils.header("Added nodes: %s" % node_ids) - - return node_ids - - def DeleteNodes(self): - DeleteNode = self.debug(self.api.DeleteNode) - for node_id in self.node_ids: - result = DeleteNode(self.auth, node_id) - - # Check if nodes are deleted - GetNodes = self.debug(self.api.GetNodes) - nodes = GetNodes(self.api, self.node_ids) - self.islistequal(nodes, [], 'DeleteNode Check') - - if self.config.verbose: - utils.header("Deleted nodes: %s" % self.node_ids) - - self.node_ids = [] - - def AddressTypes(self, n = 3): - address_type_ids = [] - for i in range(n): - address_type_fields = random_address_type() - AddAddressType = self.debug(self.api.AddAddressType) - address_type_id = AddAddressType(self.auth, address_type_fields) - if address_type_id is None: continue - - # Should return a unique address_type_id - self.isunique(address_type_id, address_type_ids, 'AddAddressType - isunique') - address_type_ids.append(address_type_id) - - # Check address type - GetAddressTypes = self.debug(self.api.GetAddressTypes) - address_types = GetAddressTypes(self.auth, [address_type_id]) - if address_types is None: continue - address_type = address_types[0] - self.isequal(address_type, address_type_fields, 'AddAddressType - isequal') - - # Update address type - address_type_fields = random_address_type() - UpdateAddressType = self.debug(self.api.UpdateAddressType) - result = UpdateAddressType(self.auth, address_type_id, address_type_fields) - if result is None: continue - - # Check address type again - address_types = GetAddressTypes(self.auth, [address_type_id]) - if address_types is None: continue - address_type = address_types[0] - self.isequal(address_type, address_type_fields, 'UpdateAddressType - isequal') - - # Check get all address types - address_types = GetAddressTypes(self.auth, address_type_ids) - if address_types is not None: - self.islistequal(address_type_ids, [address_type['address_type_id'] for address_type in address_types], 'GetAddressTypes - isequal') - - if self.config.verbose: - print "Added address types", address_type_ids - - return address_type_ids - - def DeleteAddressTypes(self): - - DeleteAddressType = self.debug(self.api.DeleteAddressType) - for address_type_id in self.address_type_ids: - DeleteAddressType(auth, address_type_id) - - GetAddressTypes = self.debug(self.api.GetAddressTypes) - address_types = GetAddressTypes(self.auth, self.address_type_ids) - self.islistequal(address_types, [], 'DeleteAddressType - check') - - if self.config.verbose: - utils.header("Deleted address types: " % self.address_type_ids) - - self.address_type_ids = [] - - def Addresses(self, n = 3): - address_ids = [] - for i in range(n): - address_fields = random_address() - site_id = random.sample(self.site_ids, 1)[0] - AddSiteAddress = self.debug(self.api.AddSiteAddress) - address_id = AddSiteAddress(self.auth, site_id, address_fields) - if address_id is None: continue - - # Should return a unique address_id - self.isunique(address_id, address_ids, 'AddSiteAddress - isunique') - address_ids.append(address_id) - - # Check address - GetAddresses = self.debug(self.api.GetAddresses) - addresses = GetAddresses(self.auth, [address_id]) - if addresses is None: continue - address = addresses[0] - self.isequal(address, address_fields, 'AddSiteAddress - isequal') - - # Update address - address_fields = random_address() - UpdateAddress = self.debug(self.api.UpdateAddress) - result = UpdateAddress(self.auth, address_id, address_fields) - - # Check again - addresses = GetAddresses(self.auth, [address_id]) - if addresses is None: continue - address = addresses[0] - self.isequal(address, address_fields, 'UpdateAddress - isequal') - - addresses = GetAddress(self.auth, address_ids) - if addresses is not None: - slef.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal') - - if self.config.verbose: - utils.header("Added addresses: %s" % self.address_ids) - - return address_ids - - def DeleteAddresses(self): - - DeleteAddress = self.debug(self.api.DeleteAddress) - # Delete site addresses - for address_id in self.address_ids: - result = DeleteAddress(self.auth, address_id) - - # Check - GetAddresses = self.debug(self.api.GetAddresses) - addresses = GetAddresses(self.api, self.address_ids) - self.islistequal(addresses, [], 'DeleteAddress - check') - if self.verbose: - print "Deleted addresses", self.address_ids - - self.address_ids = [] - - def AddPersons(self, n = 3): - - roles = GetRoles() - role_ids = [role['role_id'] for role in roles] - roles = [role['name'] for role in roles] - roles = dict(zip(roles, role_ids)) - - for i in range(n): - - # Add account - person_fields = random_person() - person_id = AddPerson(person_fields) - - # Should return a unique person_id - assert person_id not in self.person_ids - self.person_ids.append(person_id) - - if self.check: - # Check account - person = GetPersons([person_id])[0] - for field in person_fields: - if field != 'password': - assert person[field] == person_fields[field] - - # Update account - person_fields = random_person() - UpdatePerson(person_id, person_fields) - - # Check account again - person = GetPersons([person_id])[0] - for field in person_fields: - if field != 'password': - assert person[field] == person_fields[field] - - auth = {'AuthMethod': "password", - 'Username': person_fields['email'], - 'AuthString': person_fields['password']} - - if self.check: - # Check that account is disabled - try: - assert not AuthCheck(auth) - except: - pass - - # Add random set of roles - person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3)) - for person_role in person_roles: - role_id = roles[person_role] - AddRoleToPerson(role_id, person_id) - - if self.check: - person = GetPersons([person_id])[0] - assert set(person_roles) == set(person['roles']) - - # Enable account - UpdatePerson(person_id, {'enabled': True}) - - if self.check: - # Check that account is enabled - assert AuthCheck(auth) - - # Associate account with random set of sites - person_site_ids = [] - for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))): - AddPersonToSite(person_id, site_id) - person_site_ids.append(site_id) - - if self.check: - # Make sure it really did it - person = GetPersons([person_id])[0] - assert set(person_site_ids) == set(person['site_ids']) - - # Set a primary site - primary_site_id = random.sample(person_site_ids, randint(1, len(person_site_ids)))[0] - SetPersonPrimarySite(person_id, primary_site_id) - - if self.check: - person = GetPersons([person_id])[0] - assert person['site_ids'][0] == primary_site_id - - if self.verbose: - print "Added users", self.person_ids - - def DeletePersons(self): - # Delete users - for person_id in self.person_ids: - # Remove from each site - for site_id in self.site_ids: - DeletePersonFromSite(person_id, site_id) - - if self.check: - person = GetPersons([person_id])[0] - assert not person['site_ids'] - - # Revoke roles - person = GetPersons([person_id])[0] - for role_id in person['role_ids']: - DeleteRoleFromPerson(role_id, person_id) - - if self.check: - person = GetPersons([person_id])[0] - assert not person['role_ids'] - - # Disable account - UpdatePerson(person_id, {'enabled': False}) - - if self.check: - person = GetPersons([person_id])[0] - assert not person['enabled'] - - # Delete account - DeletePerson(person_id) - - if self.check: - assert not GetPersons([person_id]) - - if self.check: - assert not GetPersons(self.person_ids) - - if self.verbose: - print "Deleted users", self.person_ids - - self.person_ids = [] - -- 2.43.0