From 834a68eaaebcacf94440cad5fbfe0bb2c0ac78a7 Mon Sep 17 00:00:00 2001 From: Tony Mack Date: Thu, 24 Jan 2008 18:16:13 +0000 Subject: [PATCH] move files to tests directory. now each file is callable from bash --- qaapi/qa/tests/Test.py | 76 ++++ qaapi/qa/tests/access_slice.py | 73 ++++ qaapi/qa/tests/add_test_data.py | 170 ++++++++ qaapi/qa/tests/api_unit_test.py | 679 +++++++++++++++++++++++++++++ qaapi/qa/tests/boot_node.py | 71 +++ qaapi/qa/tests/check_boot_state.py | 58 +++ qaapi/qa/tests/delete_test_data.py | 27 ++ qaapi/qa/tests/get_boot_state.py | 58 +++ qaapi/qa/tests/misc.py | 23 + qaapi/qa/tests/plc_configure.py | 51 +++ qaapi/qa/tests/plc_install.py | 60 +++ qaapi/qa/tests/plc_remote_call.py | 28 ++ qaapi/qa/tests/plc_start.py | 37 ++ qaapi/qa/tests/plc_stop.py | 36 ++ qaapi/qa/tests/plc_uninstall.py | 43 ++ qaapi/qa/tests/sync_user_key.py | 68 +++ 16 files changed, 1558 insertions(+) create mode 100644 qaapi/qa/tests/Test.py create mode 100644 qaapi/qa/tests/access_slice.py create mode 100644 qaapi/qa/tests/add_test_data.py create mode 100755 qaapi/qa/tests/api_unit_test.py create mode 100644 qaapi/qa/tests/boot_node.py create mode 100644 qaapi/qa/tests/check_boot_state.py create mode 100644 qaapi/qa/tests/delete_test_data.py create mode 100644 qaapi/qa/tests/get_boot_state.py create mode 100644 qaapi/qa/tests/misc.py create mode 100644 qaapi/qa/tests/plc_configure.py create mode 100644 qaapi/qa/tests/plc_install.py create mode 100644 qaapi/qa/tests/plc_remote_call.py create mode 100644 qaapi/qa/tests/plc_start.py create mode 100644 qaapi/qa/tests/plc_stop.py create mode 100644 qaapi/qa/tests/plc_uninstall.py create mode 100644 qaapi/qa/tests/sync_user_key.py diff --git a/qaapi/qa/tests/Test.py b/qaapi/qa/tests/Test.py new file mode 100644 index 0000000..d7ab1e3 --- /dev/null +++ b/qaapi/qa/tests/Test.py @@ -0,0 +1,76 @@ +import sys, os +from types import * +from pprint import pprint + +# Add qa root directory to the path to make +# imports easier +path = os.path.dirname(os.path.abspath(__file__)) +path_parts = path.split(os.sep) +sys.path.append(os.sep.join(path_parts[:-2])) + +from qa import utils +from qa.logger import log +from qa.Config import Config + +class Test: + """ + Base class for all QA test functions. At a minimum all tests + must define: + + call(arg1, arg2, ...): method body + """ + + accepts = [] + returns = bool + status = "current" + + def call(self, *args): + """ + Method body for test functions. Must override. + """ + return True + + def __init__(self, config = Config()): + self.name = self.__class__.__name__ + self.path=os.path.dirname(sys.argv[0]) + self.config = config + self.errors = [] + + + def __call__(self, *args, **kwds): + """ + Main entry point for test functions. logs methods + """ + + #(min_args, max_args, defaults) = self.args() + + # Check that the right number of arguments were passed in + #if len(args) < len(min_args) or len(args) > len(max_args): + # raise Exception#, (len(args), len(min_args), len(max_args)) + + result = self.call(*args, **kwds) + return result + + + def args(self): + """ + Returns a tuple: + + ((arg1_name, arg2_name, ...), + (arg1_name, arg2_name, ..., optional1_name, optional2_name, ...), + (None, None, ..., optional1_default, optional2_default, ...)) + + That represents the minimum and maximum sets of arguments that + this function accepts and the defaults for the optional arguments. + """ + + # Inspect call. Remove self from the argument list. + max_args = self.call.func_code.co_varnames[1:self.call.func_code.co_argcount] + defaults = self.call.func_defaults + if defaults is None: + defaults = () + + min_args = max_args[0:len(max_args) - len(defaults)] + defaults = tuple([None for arg in min_args]) + defaults + + return (min_args, max_args, defaults) diff --git a/qaapi/qa/tests/access_slice.py b/qaapi/qa/tests/access_slice.py new file mode 100644 index 0000000..5229520 --- /dev/null +++ b/qaapi/qa/tests/access_slice.py @@ -0,0 +1,73 @@ +import os, sys +import traceback +import time +from Test import Test +from qa import utils + +class access_slice(Test): + """ + Repeatedly attempt to use the specified users credentials to + access the specified node on the specified slice. + """ + + def call(self, email, slice_name, hostname, timeout=3): + api = self.config.api + auth = self.config.auth + email_parts = email.split("@") + keys_filename = email_parts[0] + keys_path = self.config.KEYS_PATH + private_key_path = keys_path + os.sep + keys_filename + public_key_path = private_key_path + ".pub" + + # Validate slice + slices = api.GetSlices(auth, [slice_name], ['name', 'slice_id', 'node_ids']) + if not slices: + raise Exception, "No such slice %(slice_name)s" % locals() + slice = slices[0] + + # Validate node + nodes = api.GetNodes(auth, [hostname], ['hostname', 'node_id', 'slice_ids']) + if not nodes: + raise Exception, "No such node %(hostname)s" % locals() + node = nodes[0] + if slice['slice_id'] not in node['slice_ids']: + raise Exception, "%(slice_name)s not on %(hostname)s" % locals() + + # Validate user + persons = api.GetPersons(auth, ['email'], ['person_id', 'key_ids', 'slice_ids']) + if not persons: + raise Exception, "No such person %(email)s" % locals() + person = persons[0] + if slice['slice_id'] not in person['slice_ids']: + raise Exception, "%(email)s not in slice %(slice_name)s" % locals() + + # get keys + if not os.path.isfile(private_key_path) or \ + not os.path.isfile(public_key_path): + # keys dont exist, call api.sync_user_key() + from qa.modules.api.sync_user_key import sync_user_key + sync_user_key()(email) + + # attempt to access slice + start_time = time.time() + end_time = start_time + timeout*60 + sleep = 30 + while time.time() < endtime: + if self.config.verbose: + utils.header("Trying to connect to %(slice_name)s@%(hostname)s" % locals()) + ssh_command = "ssh -i %(private_key_path)s %(slice_name)s@%(hostname)s" % locals() + host_check = os.system(ssh_command + " hostname ") + if host_check == 0: + if self.config.verbose: + utils.header("connecteed to %(slice_name)s@%(hostname)s" % locals()) + return 1 + else: + if self.config.verbose: + utils.header("failed to connect to %(slice_name)s@%(hostname)s" % locals()) + time.sleep(sleep) + + return 0 + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + access_slice()(*args) diff --git a/qaapi/qa/tests/add_test_data.py b/qaapi/qa/tests/add_test_data.py new file mode 100644 index 0000000..e334e61 --- /dev/null +++ b/qaapi/qa/tests/add_test_data.py @@ -0,0 +1,170 @@ +import os,sys +from Test import Test +from qa import utils + +class add_test_data(Test): + """ + Adds the test data found in config to the plc db + """ + def call(self): + + api = self.config.api + auth = self.config.auth + + # Make sure some required fields are in config + required_fields = ['TEST_SITE_NAME', 'TEST_SITE_LOGIN_BASE', 'TEST_SLICE_NAME', 'TEST_PERSON_EMAIL'] + required_node_fields = ['TEST_NODE_TYPE', 'TEST_NODE_METHOD', 'TEST_NODE_HOSTNAME', 'TEST_NODE_IP', + 'TEST_NODE_GATEWAY', 'TEST_NODE_DNS', 'TEST_NODE_NETWORK', 'TEST_NODE_BROADCAST', + 'TEST_NODE_NETMASK'] + + for field in required_fields: + if not hasattr(self.config, field) or \ + len(getattr(self.config, field).strip()) < 1: + raise Exception, "%(field)s must be set and cannot be blank" % locals() + + # Look for node configurations + node_params = {} + for attr in dir(self.config): + if attr.find("NODE") > 0: + parts = attr.split('_') + node_prefix = parts[1] +"_"+ parts[3] + name = "_".join(parts[:3]) + value = getattr(self.config, attr) + # start a new node dictionary + if node_prefix not in node_params: + node_params[node_prefix] = {'prefix': node_prefix} + node_params[node_prefix][name] = value + + node_configs = node_params.values() + node_list = [] + + # Make sure required node fields are preset for each node config + for node_config in node_configs: + for field in required_node_fields: + if field not in node_config or len(node_config[field].strip()) < 1: + raise Exception, "%s must be set for %s and cannot be blank" % (field, node_config['prefix']) + node = {'type': node_config['TEST_NODE_TYPE'], + 'method': node_config['TEST_NODE_METHOD'], + 'hostname': node_config['TEST_NODE_HOSTNAME'], + 'ip': node_config['TEST_NODE_IP'], + 'gateway': node_config['TEST_NODE_GATEWAY'], + 'dns1': node_config['TEST_NODE_DNS'], + 'broadcast': node_config['TEST_NODE_BROADCAST'], + 'network': node_config['TEST_NODE_NETWORK'], + 'netmask': node_config['TEST_NODE_NETMASK'], + 'slice_ids': [], + 'nodenetwork_ids': []} + node_list.append(node) + + + # Define test objects + site_fields = {'name': self.config.TEST_SITE_NAME, 'login_base': self.config.TEST_SITE_LOGIN_BASE, + 'url': 'http://google.com', 'enabled': True, 'max_slices': 1000, + 'max_slivers': 1000, 'is_public': True, 'abbreviated_name': 'Test', + 'person_ids': []} + + slice_fields = {'name': self.config.TEST_SLICE_NAME, 'instantiation': 'plc-instantiated', + 'max_nodes': 1000, 'description': 'blank', 'person_ids': [], 'node_ids': []} + + person_fields = {'first_name': 'fname', 'last_name': 'lname', 'password': 'password', + 'email': self.config.TEST_PERSON_EMAIL, 'site_ids': [], 'slice_ids': []} + + + # Add Test site + sites = api.GetSites(auth, {'login_base': site_fields['login_base']}) + if not sites: + site_id = api.AddSite(auth, site_fields) + site_fields['site_id'] = site_id + site = site_fields + if self.config.verbose: + utils.header("Added test site") + else: + site = sites[0] + if self.config.verbose: + utils.header("Test site found") + + # Add Test nodes + for node_fields in node_list: + nodes = api.GetNodes(auth, [node_fields['hostname']]) + if not nodes: + node_id = api.AddNode(auth, site_fields['login_base'], node_fields) + node_fields['node_id'] = node_id + node = node_fields + nodes.append(node_fields) + if self.config.verbose: + utils.header("Added test node") + else: + node = nodes[0] + if self.config.verbose: + utils.header("Test node found") + + # Add node network + if not node['nodenetwork_ids']: + nodenetwork_id = api.AddNodeNetwork(auth, node_fields['hostname'], node_fields) + if self.config.verbose: + utils.header("Added test nodenetwork") + else: + if self.config.verbose: + utils.header("Nodenetwork found") + + # Add Test slice + slices = api.GetSlices(auth, [slice_fields['name']]) + if not slices: + slice_id = api.AddSlice(auth, slice_fields) + slice_fields['slice_id'] = slice_id + slice = slice_fields + if self.config.verbose: + utils.header("Added test slice") + else: + slice = slices[0] + if self.config.verbose: + utils.header("Test slice found") + + # Add slice to nodes + node_ids = [n['node_id'] for n in nodes] + node_ids = filter(lambda node_id: node_id not in slice['node_ids'], node_ids) + if node_ids: + api.AddSliceToNodes(auth, slice['name'], node_ids) + if self.config.verbose: + utils.header("Added test slice to test nodes") + else: + if self.config.verbose: + utils.header("Test slice found on test nodes") + + # Add test person + persons = api.GetPersons(auth, [person_fields['email']]) + if not persons: + person_id = api.AddPerson(auth, person_fields) + person_fields['person_id'] = person_id + person = person_fields + if self.config.verbose: + utils.header("Added test person") + else: + person = persons[0] + if self.config.verbose: + utils.header("Test person found") + + # Add roles to person + api.AddRoleToPerson(auth, 'user', person['email']) + + # Add person to site + if site['site_id'] not in person['site_ids']: + api.AddPersonToSite(auth, person['email'], site['login_base']) + if self.config.verbose: + utils.header("Added test person to test site") + else: + if self.config.verbose: + utils.header("Test person found on test site") + + # Add person to slice + if slice['slice_id'] not in person['slice_ids']: + api.AddPersonToSlice(auth, person_fields['email'], slice_fields['name']) + if self.config.verbose: + utils.header("Added test person to slice") + else: + if self.config.verbose: + utils.header("Test person found on test slice") + +if __name__ == '__main__': + args = typle(sys.argv[1:]) + add_test_data()(*args) diff --git a/qaapi/qa/tests/api_unit_test.py b/qaapi/qa/tests/api_unit_test.py new file mode 100755 index 0000000..7bc181d --- /dev/null +++ b/qaapi/qa/tests/api_unit_test.py @@ -0,0 +1,679 @@ +#!/usr/bin/python +# +# Test script example +# +# Copyright (C) 2006 The Trustees of Princeton University +# +# + +from pprint import pprint +from string import letters, digits, punctuation +from traceback import print_exc +import base64 +import os, sys +import socket +import xmlrpclib +import time + +from Test import Test +from qa import utils +from qa.Config import Config +from qa.logger import Logfile, log +from random import Random + +random = Random() + +config = Config() +auth = config.auth + +try: boot_states = config.api.GetBootStates(auth) +except: boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins'] + +try: roles = [role['name'] for role in config.api.GetRoles(auth)] +except: roles = [u'admin', u'pi', u'user', u'tech'] + +try: methods = config.api.GetNetworkMethods(auth) +except: methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown'] + +try:types = config.api.GetNetworkTypes(auth) +except: types = [u'ipv4'] + +def randfloat(min = 0.0, max = 1.0): + return float(min) + (random.random() * (float(max) - float(min))) + +def randint(min = 0, max = 1): + return int(randfloat(min, max + 1)) + +# See "2.2 Characters" in the XML specification: +# +# #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] +# avoiding +# [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF] + +ascii_xml_chars = map(unichr, [0x9, 0xA, 0xD]) +ascii_xml_chars += map(unichr, xrange(0x20, 0x7F - 1)) +low_xml_chars = list(ascii_xml_chars) +low_xml_chars += map(unichr, xrange(0x84 + 1, 0x86 - 1)) +low_xml_chars += map(unichr, xrange(0x9F + 1, 0xFF)) +valid_xml_chars = list(low_xml_chars) +valid_xml_chars += map(unichr, xrange(0xFF + 1, 0xD7FF)) +valid_xml_chars += map(unichr, xrange(0xE000, 0xFDD0 - 1)) +valid_xml_chars += map(unichr, xrange(0xFDDF + 1, 0xFFFD)) + +def randstr(length, pool = valid_xml_chars, encoding = "utf-8"): + sample = random.sample(pool, min(length, len(pool))) + while True: + s = u''.join(sample) + bytes = len(s.encode(encoding)) + if bytes > length: + sample.pop() + elif bytes < length: + sample += random.sample(pool, min(length - bytes, len(pool))) + random.shuffle(sample) + else: + break + return s + +def randhostname(): + # 1. Each part begins and ends with a letter or number. + # 2. Each part except the last can contain letters, numbers, or hyphens. + # 3. Each part is between 1 and 64 characters, including the trailing dot. + # 4. At least two parts. + # 5. Last part can only contain between 2 and 6 letters. + hostname = 'a' + randstr(61, letters + digits + '-') + '1.' + \ + 'b' + randstr(61, letters + digits + '-') + '2.' + \ + 'c' + randstr(5, letters) + return hostname + +def randpath(length): + parts = [] + for i in range(randint(1, 10)): + parts.append(randstr(randint(1, 30), ascii_xml_chars)) + return os.sep.join(parts)[0:length] + +def randemail(): + return (randstr(100, letters + digits) + "@" + randhostname()).lower() + +def randkey(bits = 2048): + key_types = ["ssh-dss", "ssh-rsa"] + key_type = random.sample(key_types, 1)[0] + return ' '.join([key_type, + base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))), + randemail()]) + +def random_site(): + return { + 'name': randstr(254), + 'abbreviated_name': randstr(50), + 'login_base': randstr(20, letters).lower(), + 'latitude': int(randfloat(-90.0, 90.0) * 1000) / 1000.0, + 'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0, + } + +def random_address_type(): + return { + 'name': randstr(20), + 'description': randstr(254), + } + +def random_address(): + return { + 'line1': randstr(254), + 'line2': randstr(254), + 'line3': randstr(254), + 'city': randstr(254), + 'state': randstr(254), + 'postalcode': randstr(64), + 'country': randstr(128), + } + +def random_person(): + return { + 'first_name': randstr(128), + 'last_name': randstr(128), + 'email': randemail(), + 'bio': randstr(254), + # Accounts are disabled by default + 'enabled': False, + 'password': randstr(254), + } + +def random_key(): + return { + 'key_type': random.sample(key_types, 1)[0], + 'key': randkey() + } + +def random_slice(): + return { + 'name': site['login_base'] + "_" + randstr(11, letters).lower(), + 'url': "http://" + randhostname() + "/", + 'description': randstr(2048), + } + +def random_nodegroup(): + return { + 'name': randstr(50), + 'description': randstr(200), + } + +def random_node(): + return { + 'hostname': randhostname(), + 'boot_state': random.sample(boot_states, 1)[0], + 'model': randstr(255), + 'version': randstr(64), + } + +def random_nodenetwork(): + nodenetwork_fields = { + 'method': random.sample(methods, 1)[0], + 'type': random.sample(types, 1)[0], + 'bwlimit': randint(500000, 10000000), + } + + if method != 'dhcp': + ip = randint(0, 0xffffffff) + netmask = (0xffffffff << randint(2, 31)) & 0xffffffff + network = ip & netmask + broadcast = ((ip & netmask) | ~netmask) & 0xffffffff + gateway = randint(network + 1, broadcast - 1) + dns1 = randint(0, 0xffffffff) + + for field in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1': + nodenetwork_fields[field] = socket.inet_ntoa(struct.pack('>L', locals()[field])) + + return nodenetwork_fields + +def random_pcu(): + return { + 'hostname': randhostname(), + 'ip': socket.inet_ntoa(struct.pack('>L', randint(0, 0xffffffff))), + 'protocol': randstr(16), + 'username': randstr(254), + 'password': randstr(254), + 'notes': randstr(254), + 'model': randstr(32), + } + +def random_conf_file(): + return { + 'enabled': bool(randint()), + 'source': randpath(255), + 'dest': randpath(255), + 'file_permissions': "%#o" % randint(0, 512), + 'file_owner': randstr(32, letters + '_' + digits), + 'file_group': randstr(32, letters + '_' + digits), + 'preinstall_cmd': randpath(100), + 'postinstall_cmd': randpath(100), + 'error_cmd': randpath(100), + 'ignore_cmd_errors': bool(randint()), + 'always_update': bool(randint()), + } + +def random_attribute_type(): + return { + 'name': randstr(100), + 'description': randstr(254), + 'min_role_id': random.sample(roles.values(), 1)[0], + } + +def isequal(object_fields, expected_fields): + try: + for field in expected_fields: + assert field in object_fields + assert object_fields[field] == expected_fields[field] + except: + return False + return True + +def islistequal(list1, list2): + try: + assert set(list1) == set(list2) + except: + return False + return True + +def isunique(id, id_list): + try: + assert id not in id_list + except: + return False + return True + +class api_unit_test(Test): + + def call(self, + sites = 2, + nodes = 4, + address_types = 3, + addresses = 2, + persons = 10, + keys = 3 + ): + self.api = self.config.api + self.auth = self.config.auth + self.all_methods = set(self.api.system.listMethods()) + self.methods_tested = set() + self.methods_failed = set() + + try: + try: + self.site_ids = self.Sites(sites) + self.node_ids = self.Nodes(nodes) + except: + print_exc() + finally: + try: + self.cleanup() + finally: + + logfile = Logfile("api-unittest.log") + methods_ok = list(self.methods_tested.difference(self.methods_failed)) + methods_failed = list(self.methods_failed) + methods_untested = list(self.all_methods.difference(self.methods_tested)) + methods_ok.sort() + methods_failed.sort() + methods_untested.sort() + print >> logfile, "\n".join([m+": [OK]" for m in methods_ok]) + print >> logfile, "\n".join([m+": [FAILED]" for m in methods_failed]) + print >> logfile, "\n".join([m+": [Not Tested]" for m in methods_untested]) + + def isequal(self, object_fields, expected_fields, method_name): + try: + for field in expected_fields: + assert field in object_fields + assert object_fields[field] == expected_fields[field] + except: + self.methods_failed.update([method_name]) + return False + return True + + def islistequal(self, list1, list2, method_name): + try: assert set(list1) == set(list2) + except: + self.methods_failed.update([method_name]) + return False + return True + + def isunique(self, id, id_list, method_name): + try: assert id not in id_list + except: + self.methods_failed.update([method_name]) + return False + return True + + def debug(self, method, method_name=None): + if method_name is None: + method_name = method._Method__name + + self.methods_tested.update([method_name]) + def wrapper(*args, **kwds): + try: + return method(*args, **kwds) + except: + self.methods_failed.update([method_name]) + return None + + return wrapper + + def cleanup(self): + #if self.person_ids: self.DeletePersons() + #if self.address_ids: self.DeleteAddresses() + #if self.address_type_ids: self.DeleteAddressTypes() + if hasattr(self, 'node_ids'): self.DeleteNodes() + if hasattr(self, 'site_ids'): self.DeleteSites() + + + def Sites(self, n=4): + site_ids = [] + for i in range(n): + # Add Site + site_fields = random_site() + AddSite = self.debug(self.api.AddSite) + site_id = AddSite(self.auth, site_fields) + if site_id is None: continue + + # Should return a unique id + self.isunique(site_id, site_ids, 'AddSite - isunique') + site_ids.append(site_id) + GetSites = self.debug(self.api.GetSites) + sites = GetSites(self.auth, [site_id]) + if sites is None: continue + site = sites[0] + self.isequal(site, site_fields, 'AddSite - isequal') + + # Update site + site_fields = random_site() + UpdateSite = self.debug(self.api.UpdateSite) + result = UpdateSite(self.auth, site_id, site_fields) + + # Check again + sites = GetSites(self.auth, [site_id]) + if sites is None: continue + site = sites[0] + self.isequal(site, site_fields, 'UpdateSite - isequal') + + sites = GetSites(self.auth, site_ids) + if sites is not None: + self.islistequal(site_ids, [site['site_id'] for site in sites], 'GetSites - isequal') + + if self.config.verbose: + utils.header("Added sites: %s" % site_ids) + + return site_ids + + + def DeleteSites(self): + # Delete all sites + DeleteSite = self.debug(self.api.DeleteSite) + for site_id in self.site_ids: + result = DeleteSite(self.auth, site_id) + + # Check if sites are deleted + GetSites = self.debug(self.api.GetSites) + sites = GetSites(self.auth, self.site_ids) + self.islistequal(sites, [], 'DeleteSite - check') + + if self.config.verbose: + utils.header("Deleted sites: %s" % self.site_ids) + + self.site_ids = [] + + def Nodes(self, n=4): + node_ids = [] + for i in range(n): + # Add Node + node_fields = random_node() + site_id = random.sample(self.site_ids, 1)[0] + AddNode = self.debug(self.api.AddNode) + node_id = AddNode(self.auth, site_id, node_fields) + if node_id is None: continue + + # Should return a unique id + self.isunique(node_id, node_ids, 'AddNode - isunique') + node_ids.append(node_id) + + # Check nodes + GetNodes = self.debug(self.api.GetNodes) + nodes = GetNodes(self.auth, [node_id]) + if nodes is None: continue + node = nodes[0] + self.isequal(node, node_fields, 'AddNode - isequal') + + # Update node + node_fields = random_node() + UpdateNode = self.debug(self.api.UpdateNode) + result = UpdateNode(self.auth, node_id, node_fields) + + # Check again + nodes = GetNodes(self.auth, [node_id]) + if nodes is None: continue + node = nodes[0] + self.isequal(node, node_fields, 'UpdateNode - isequal') + + nodes = GetNodes(self.auth, node_ids) + if nodes is not None: + self.islistequal(node_ids, [node['node_id'] for node in nodes], 'GetNodes - isequal') + + if self.config.verbose: + utils.header("Added nodes: %s" % node_ids) + + return node_ids + + def DeleteNodes(self): + DeleteNode = self.debug(self.api.DeleteNode) + for node_id in self.node_ids: + result = DeleteNode(self.auth, node_id) + + # Check if nodes are deleted + GetNodes = self.debug(self.api.GetNodes) + nodes = GetNodes(self.api, self.node_ids) + self.islistequal(nodes, [], 'DeleteNode Check') + + if self.config.verbose: + utils.header("Deleted nodes: %s" % self.node_ids) + + self.node_ids = [] + + def AddressTypes(self, n = 3): + address_type_ids = [] + for i in range(n): + address_type_fields = random_address_type() + AddAddressType = self.debug(self.api.AddAddressType) + address_type_id = AddAddressType(self.auth, address_type_fields) + if address_type_id is None: continue + + # Should return a unique address_type_id + self.isunique(address_type_id, address_type_ids, 'AddAddressType - isunique') + address_type_ids.append(address_type_id) + + # Check address type + GetAddressTypes = self.debug(self.api.GetAddressTypes) + address_types = GetAddressTypes(self.auth, [address_type_id]) + if address_types is None: continue + address_type = address_types[0] + self.isequal(address_type, address_type_fields, 'AddAddressType - isequal') + + # Update address type + address_type_fields = random_address_type() + UpdateAddressType = self.debug(self.api.UpdateAddressType) + result = UpdateAddressType(self.auth, address_type_id, address_type_fields) + if result is None: continue + + # Check address type again + address_types = GetAddressTypes(self.auth, [address_type_id]) + if address_types is None: continue + address_type = address_types[0] + self.isequal(address_type, address_type_fields, 'UpdateAddressType - isequal') + + # Check get all address types + address_types = GetAddressTypes(self.auth, address_type_ids) + if address_types is not None: + self.islistequal(address_type_ids, [address_type['address_type_id'] for address_type in address_types], 'GetAddressTypes - isequal') + + if self.config.verbose: + print "Added address types", address_type_ids + + return address_type_ids + + def DeleteAddressTypes(self): + + DeleteAddressType = self.debug(self.api.DeleteAddressType) + for address_type_id in self.address_type_ids: + DeleteAddressType(auth, address_type_id) + + GetAddressTypes = self.debug(self.api.GetAddressTypes) + address_types = GetAddressTypes(self.auth, self.address_type_ids) + self.islistequal(address_types, [], 'DeleteAddressType - check') + + if self.config.verbose: + utils.header("Deleted address types: " % self.address_type_ids) + + self.address_type_ids = [] + + def Addresses(self, n = 3): + address_ids = [] + for i in range(n): + address_fields = random_address() + site_id = random.sample(self.site_ids, 1)[0] + AddSiteAddress = self.debug(self.api.AddSiteAddress) + address_id = AddSiteAddress(self.auth, site_id, address_fields) + if address_id is None: continue + + # Should return a unique address_id + self.isunique(address_id, address_ids, 'AddSiteAddress - isunique') + address_ids.append(address_id) + + # Check address + GetAddresses = self.debug(self.api.GetAddresses) + addresses = GetAddresses(self.auth, [address_id]) + if addresses is None: continue + address = addresses[0] + self.isequal(address, address_fields, 'AddSiteAddress - isequal') + + # Update address + address_fields = random_address() + UpdateAddress = self.debug(self.api.UpdateAddress) + result = UpdateAddress(self.auth, address_id, address_fields) + + # Check again + addresses = GetAddresses(self.auth, [address_id]) + if addresses is None: continue + address = addresses[0] + self.isequal(address, address_fields, 'UpdateAddress - isequal') + + addresses = GetAddress(self.auth, address_ids) + if addresses is not None: + slef.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal') + + if self.config.verbose: + utils.header("Added addresses: %s" % self.address_ids) + + return address_ids + + def DeleteAddresses(self): + + DeleteAddress = self.debug(self.api.DeleteAddress) + # Delete site addresses + for address_id in self.address_ids: + result = DeleteAddress(self.auth, address_id) + + # Check + GetAddresses = self.debug(self.api.GetAddresses) + addresses = GetAddresses(self.api, self.address_ids) + self.islistequal(addresses, [], 'DeleteAddress - check') + if self.verbose: + print "Deleted addresses", self.address_ids + + self.address_ids = [] + + def AddPersons(self, n = 3): + + roles = GetRoles() + role_ids = [role['role_id'] for role in roles] + roles = [role['name'] for role in roles] + roles = dict(zip(roles, role_ids)) + + for i in range(n): + + # Add account + person_fields = random_person() + person_id = AddPerson(person_fields) + + # Should return a unique person_id + assert person_id not in self.person_ids + self.person_ids.append(person_id) + + if self.check: + # Check account + person = GetPersons([person_id])[0] + for field in person_fields: + if field != 'password': + assert person[field] == person_fields[field] + + # Update account + person_fields = random_person() + UpdatePerson(person_id, person_fields) + + # Check account again + person = GetPersons([person_id])[0] + for field in person_fields: + if field != 'password': + assert person[field] == person_fields[field] + + auth = {'AuthMethod': "password", + 'Username': person_fields['email'], + 'AuthString': person_fields['password']} + + if self.check: + # Check that account is disabled + try: + assert not AuthCheck(auth) + except: + pass + + # Add random set of roles + person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3)) + for person_role in person_roles: + role_id = roles[person_role] + AddRoleToPerson(role_id, person_id) + + if self.check: + person = GetPersons([person_id])[0] + assert set(person_roles) == set(person['roles']) + + # Enable account + UpdatePerson(person_id, {'enabled': True}) + + if self.check: + # Check that account is enabled + assert AuthCheck(auth) + + # Associate account with random set of sites + person_site_ids = [] + for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))): + AddPersonToSite(person_id, site_id) + person_site_ids.append(site_id) + + if self.check: + # Make sure it really did it + person = GetPersons([person_id])[0] + assert set(person_site_ids) == set(person['site_ids']) + + # Set a primary site + primary_site_id = random.sample(person_site_ids, randint(1, len(person_site_ids)))[0] + SetPersonPrimarySite(person_id, primary_site_id) + + if self.check: + person = GetPersons([person_id])[0] + assert person['site_ids'][0] == primary_site_id + + if self.verbose: + print "Added users", self.person_ids + + def DeletePersons(self): + # Delete users + for person_id in self.person_ids: + # Remove from each site + for site_id in self.site_ids: + DeletePersonFromSite(person_id, site_id) + + if self.check: + person = GetPersons([person_id])[0] + assert not person['site_ids'] + + # Revoke roles + person = GetPersons([person_id])[0] + for role_id in person['role_ids']: + DeleteRoleFromPerson(role_id, person_id) + + if self.check: + person = GetPersons([person_id])[0] + assert not person['role_ids'] + + # Disable account + UpdatePerson(person_id, {'enabled': False}) + + if self.check: + person = GetPersons([person_id])[0] + assert not person['enabled'] + + # Delete account + DeletePerson(person_id) + + if self.check: + assert not GetPersons([person_id]) + + if self.check: + assert not GetPersons(self.person_ids) + + if self.verbose: + print "Deleted users", self.person_ids + + self.person_ids = [] + + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + api_unit_test()(*args) diff --git a/qaapi/qa/tests/boot_node.py b/qaapi/qa/tests/boot_node.py new file mode 100644 index 0000000..8e7c6c4 --- /dev/null +++ b/qaapi/qa/tests/boot_node.py @@ -0,0 +1,71 @@ +import os,sys +import base64 +from Test import Test +from qa import utils + +image_types = ['node-iso', 'node-usb', 'generic-iso', 'generic-usb'] + +class boot_node(Test): + """ + Attempts to boot the specified node using qemu. + """ + + def call(self, hostname, image_type = 'node-iso', disk_size="4G"): + api = self.config.api + auth = self.config.auth + tdir = "/tmp/" + + # validate hostname + nodes = api.GetNodes(auth, [hostname], ['hostname']) + if not nodes: + raise Exception, "No such node %(hostname)s" % locals() + + bootimage = api.GetBootMedium(auth, hostname, image_type, '') + bootimage_path = '/%(tdir)s/%(hostname)s-bootcd.iso' % locals() + + if self.config.verbose: + utils.header("Creating bootcd for %(hostname)s at %(bootimage_path)s" % locals()) + # Create a temporary bootcd file + file = open(bootimage_path, 'w') + file.write(base64.b64decode(bootimage)) + file.close() + + # Create a temporary disk image + diskimage_path = "/%(tdir)s/%(hostname)s-hda.img" % locals() + qemu_img_cmd = "qemu-img create -f qcow2 %(diskimage_path)s %(disk_size)s" % locals() + (stdin, stdout, stderr) = os.popen3(qemu_img_cmd) + self.errors = stderr.readlines() + if self.errors: + raise Exception, "Unable to create disk image\n" + \ + "\n".join(self.errors) + + if self.config.verbose: + utils.header("Booting %(hostname)s" % locals()) + # Attempt to boot this node image + bootcmd = "qemu -hda %(diskimage_path)s -cdrom %(bootimage_path)s -smp 1 -m 256 -monitor stdio" % \ + locals() + (stdin, stdout, stderr) = os.popen3(bootcmd) + self.errors = stderr.readlines() + if self.errors: + raise Exception, "Unable to boot node image\n" + \ + "\n".join(self.errors) + + return 1 + + def get_image_medium(self, hostname, image_type, path): + api = self.config.api + auth = self.config.auth + + file = open(path, 'w') + if image-type in ['node-floppy', 'node-iso', 'node-usb', 'generic-iso', 'generic-usb']: + image = api.GetBootMedium(auth, hostname, image_type, '') + image = base64.b64.decode(image) + + file.write(image) + file.close() + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + boot_node()(*args) + + diff --git a/qaapi/qa/tests/check_boot_state.py b/qaapi/qa/tests/check_boot_state.py new file mode 100644 index 0000000..bb7f9ec --- /dev/null +++ b/qaapi/qa/tests/check_boot_state.py @@ -0,0 +1,58 @@ +import os, sys +import time +from qa import utils +from Test import Test + +class check_boot_state(Test): + """ + Continually checks the boot_state of the specified node until + either the node reaches boot, the node reaches debug or the + timeout is reached. + + Timeout represents the ammout of time (in minutes) we should + continue trying before quitting. + + Sleep represnet the ammount of time (in seconds) to wait in + between checks. + + Returns the boot state of the node. + """ + def call(self, hostname, timeout = 5, sleep = 30): + exit = False + api = self.config.api + auth = self.config.auth + + # Validate hostname + nodes = api.GetNodes(auth, [hostname], ['hostname']) + if not nodes: + raise Exception, "No such hostname %(hostname)s" % locals() + + start_time = time.time() + end_time = start_time + (timeout * 60) + + while not exit: + nodes = api.GetNodes(auth, [hostname], ['boot_state']) + node = nodes[0] + boot_state = node['boot_state'] + if self.config.verbose: + utils.header("%(hostname)s boot_state is %(boot_state)s" % locals()) + + if boot_state in ['boot', 'debug']: + exit = True + elif time.time() < end_time: + time.sleep(sleep) + else: + exit = True + + + if self.config.verbose: + if boot_state in ['boot']: + utils.header("%(hostname)s correctly installed and booted" % locals()) + else: + utils.header("%(hostname)s not fully booted" % locals()) + + return boot_state + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + check_boot_state()(*args) diff --git a/qaapi/qa/tests/delete_test_data.py b/qaapi/qa/tests/delete_test_data.py new file mode 100644 index 0000000..67e16fd --- /dev/null +++ b/qaapi/qa/tests/delete_test_data.py @@ -0,0 +1,27 @@ +import os, sys +from Test import Test +from qa import utils + + +class delete_test_data(Test): + """ + Removes the test data found in config from the plc db + """ + + def call(self): + + api = self.config.api + auth = self.config.auth + + site_login_base = self.config.TEST_SITE_LOGIN_BASE + + # Deleting the site should delete everything associated with it + api.DeleteSite(auth, site_login_base) + if self.config.verbose: + utils.header("Test data deleted") + + return 1 + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + delete_test_data(*args) diff --git a/qaapi/qa/tests/get_boot_state.py b/qaapi/qa/tests/get_boot_state.py new file mode 100644 index 0000000..95dd7c3 --- /dev/null +++ b/qaapi/qa/tests/get_boot_state.py @@ -0,0 +1,58 @@ +import os, sys +import time +from qa import utils +from Test import Test + +class get_boot_state(Test): + """ + Continually checks the boot_state of the specified node until + either the node reaches boot, the node reaches debug or the + timeout is reached. + + Timeout represents the ammout of time (in minutes) we should + continue trying before quitting. + + Sleep represnet the ammount of time (in seconds) to wait in + between checks. + + Returns the boot state of the node. + """ + def call(self, hostname, timeout = 5, sleep = 30): + exit = False + api = self.config.api + auth = self.config.auth + + # Validate hostname + nodes = api.GetNodes(auth, [hostname], ['hostname']) + if not nodes: + raise Exception, "No such hostname %(hostname)s" % locals() + + start_time = time.time() + end_time = start_time + (timeout * 60) + + while not exit: + nodes = api.GetNodes(auth, [hostname], ['boot_state']) + node = nodes[0] + boot_state = node['boot_state'] + if self.config.verbose: + utils.header("%(hostname)s boot_state is %(boot_state)s" % locals()) + + if boot_state in ['boot', 'debug']: + exit = True + elif time.time() < end_time: + time.sleep(sleep) + else: + exit = True + + + if self.config.verbose: + if boot_state in ['boot']: + utils.header("%(hostname)s correctly installed and booted" % locals()) + else: + utils.header("%(hostname)s not fully booted" % locals()) + + return boot_state + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + get_boot_state()(*args) diff --git a/qaapi/qa/tests/misc.py b/qaapi/qa/tests/misc.py new file mode 100644 index 0000000..6a7b3cb --- /dev/null +++ b/qaapi/qa/tests/misc.py @@ -0,0 +1,23 @@ +import os, sys +import traceback +from new import classobj +from Test import Test + + +class AddPerson(Test): pass + +class AddSite(Test): pass + +class GetNodes(Test): + + def call(self, hostname): + return self.config.api.GetNodes(self.config.auth, hostname) + +for name in ['Get', 'Update' ,'Delete']: + tc = classobj(name, (Test,), {}) + setattr(tc, 'call', lambda x: 1) + globals()[name] = tc + del(tc) + + + diff --git a/qaapi/qa/tests/plc_configure.py b/qaapi/qa/tests/plc_configure.py new file mode 100644 index 0000000..4cc1395 --- /dev/null +++ b/qaapi/qa/tests/plc_configure.py @@ -0,0 +1,51 @@ + +import os, sys +import traceback +from Test import Test +from qa import utils + +class plc_configure(Test): + """ + Configure the myplc from config options in config file + """ + + def call(self, system_type, root_dir): + tmpname = '/tmp/plc-cinfig-tty-%d' % os.getpid() + fileconf = open(tmpname, 'w') + for var in [ 'PLC_NAME', + 'PLC_ROOT_PASSWORD', + 'PLC_ROOT_USER', + 'PLC_MAIL_ENABLED', + 'PLC_MAIL_SUPPORT_ADDRESS', + 'PLC_DB_HOST', + 'PLC_API_HOST', + 'PLC_WWW_HOST', + 'PLC_BOOT_HOST', + 'PLC_NET_DNS1', + 'PLC_NET_DNS2']: + fileconf.write('e %s\n%s\n' % (var, getattr(self.config, var))) + fileconf.write('w\nq\n') + fileconf.close() + + mount_command = "/sbin/service plc mount" + full_command = "" + if system_type in ['vserv', 'vserver']: + full_command += " vserver %(root_dir)s exec " % locals() + elif system_type in ['chroot']: + full_command += " chroot %(root_dir)s " % locals() + else: + raise Exception, "Invalid system type %(sytem_type)s" % locals() + + full_command += " plc-config-tty < %(tmpname)s" % locals() + commands = [mount_command, full_command] + for command in commands: + if self.config.verbose: + utils.header(command) + (stdout, stderr) = utils.popen(command) + (stdout, stderr) = utils.popen("rm %s" % tmpname) + + return 1 + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + plc_configure()(*args) diff --git a/qaapi/qa/tests/plc_install.py b/qaapi/qa/tests/plc_install.py new file mode 100644 index 0000000..caf0336 --- /dev/null +++ b/qaapi/qa/tests/plc_install.py @@ -0,0 +1,60 @@ + +import os, sys +import traceback +from qa import utils +from Test import Test + +class plc_install(Test): + """ + Installs a myplc + """ + + def call(self, system_type, root_dir, url=None): + + url_path = self.config.path + # Determine url + if not url: + try: + + url_file = open("%s/URL" % url_path) + url = url_file.read().strip() + url_file.close() + except IOError: + pass + if not url: + print "URL not specified" + sys.exit(1) + + # Save url + if self.config.verbose: + utils.header('Saving current myplc url into %s/URL' % url_path) + fsave=open('%s/URL' % url_path, "w") + fsave.write(url+'\n') + fsave.close() + + # Instal myplc from url + if self.config.verbose: + utils.header('Installing myplc from url %s' % url) + + # build command + full_command = "" + install_command = " rpm -Uvh %(url)s " + if system_type in ['vserv', 'vserver']: + full_command += " vserver %(root_dir)s exec " + elif system_type in ['chroot']: + pass + else: + raise Exception, "Invalid system type %(system_type)s" % locals() + + full_command += install_command % locals() + try: (stdout, stderr) = utils.popen(full_command) + except: (stdout, stderr) = utils.popen("yum localupdate %(url)s") + + if self.config.verbose: + utils.header("\n".join(stdout)) + + return 1 + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + plc_install()(*args) diff --git a/qaapi/qa/tests/plc_remote_call.py b/qaapi/qa/tests/plc_remote_call.py new file mode 100644 index 0000000..f6fe3af --- /dev/null +++ b/qaapi/qa/tests/plc_remote_call.py @@ -0,0 +1,28 @@ +import os, sys +from Test import Test +from qa import utils + +class plc_remote_call(Test): + """ + Attempt to connect to a node using the plc root key and + issue a command. + """ + + def call(self, root_key_path, hostname, command): + if not os.path.isfile(root_key_path): + raise Exception, "no such private key file %(root_key_path)s" % locals() + + full_command = "ssh -i %(root_key_path)s root@%(hostname)s %(command)s" % locals() + if self.config.verbose: + utils.header(full_command) + (stdout, stderr) = utils.popen(full_command) + + if self.config.verbose: + utils.header("\n".join(stdout)) + + + return 1 + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + plc_remote_call()(*args) diff --git a/qaapi/qa/tests/plc_start.py b/qaapi/qa/tests/plc_start.py new file mode 100644 index 0000000..a55e7e7 --- /dev/null +++ b/qaapi/qa/tests/plc_start.py @@ -0,0 +1,37 @@ +import traceback +import sys +from Test import Test +from qa import utils + +class plc_start(Test): + """ + Starts the myplc service + """ + + def call(self, system_type, root_dir): + + start_command = " /sbin/service plc start " + full_command = "" + + if system_type in ['vserv', 'vserver']: + full_command += " vserver %(root_dir)s exec " + elif system_type in ['chroot']: + pass + else: + raise Exception, "Invalid system type %(system_type)s" % locals() + + full_command += start_command % locals() + + if self.config.verbose: + utils.header(full_command) + + (stdout, stderr) = utils.popen(full_command) + + if self.config.verbose: + utils.header("".join(stdout)) + + return 1 + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + plc_start()(*args) diff --git a/qaapi/qa/tests/plc_stop.py b/qaapi/qa/tests/plc_stop.py new file mode 100644 index 0000000..a3b7a2f --- /dev/null +++ b/qaapi/qa/tests/plc_stop.py @@ -0,0 +1,36 @@ +import os, sys +import traceback +from Test import Test +from qa import utils + +class plc_stop(Test): + """ + Installs a myplc + """ + + def call(self, system_type, root_dir): + + stop_command = " /sbin/service plc stop " + full_command = "" + if system_type in ['vserv', 'vserver']: + full_command += " vserver %(root_dir)s exec " + elif system_type in ['chroot']: + pass + else: + raise Exception, "Invalid system type %(system_type)s" % locals() + + full_command += stop_command % locals() + + if self.config.verbose: + utils.header(full_command) + + (stdout, stderr) = utils.popen(full_command) + + if self.config.verbose: + utils.header("\n".join(stdout)) + + return 1 + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + plc_stop()(*args) diff --git a/qaapi/qa/tests/plc_uninstall.py b/qaapi/qa/tests/plc_uninstall.py new file mode 100644 index 0000000..b623937 --- /dev/null +++ b/qaapi/qa/tests/plc_uninstall.py @@ -0,0 +1,43 @@ +import os, sys +import traceback +from Test import Test +from qa import utils + +class plc_uninstall(Test): + """ + Completely removes the installed myplc + """ + + def call(self, system_type, root_dir): + + remove_command = " rpm -e myplc " + full_command = "" + + if system_type in ['vserv', 'vserver']: + full_command += " vserver %(root_dir)s exec " + elif system_type in ['chroot']: + pass + else: + raise Exception, "Invalid system type %(system_type)s" % locals() + + if self.config.verbose: + utils.header("Removing myplc") + + full_command = full_command % locals() + (stdout, stderr) = utils.popen(full_command + "/sbin/service plc safestop") + if self.config.verbose: + utils.header("\n".join(stdout)) + + (stdout, stderr) = utils.popen(full_command + remove_command) + if self.config.verbose: + utils.header("\n".join(stdout)) + + (stdout, stderr) = utils.popen(full_command + " rm -rf /plc/data") + if self.config.verbose: + utiils.header("\n".join(stdout)) + + return 1 + +if __name__ == '__main__': + args = tuple(sys.argv[1:]) + plc_unistall()(*args) diff --git a/qaapi/qa/tests/sync_user_key.py b/qaapi/qa/tests/sync_user_key.py new file mode 100644 index 0000000..c95a92e --- /dev/null +++ b/qaapi/qa/tests/sync_user_key.py @@ -0,0 +1,68 @@ +import os, sys +from qa import utils +from Test import Test + +class sync_person_key(Test): + """ + Make sure specified users public key on file matches whats + recorded at plc. Create a public/private keypair for the + specified user if one doesnt exist already. + """ + + def make_keys(path, name): + if not os.path.isdir(path): + os.mkdir(path) + key_path = path + os.sep + name + command = "ssh-keygen -f %(key_path)s -t rsa -N ''" % locals() + (stdout, stderr) = utils.popen(command) + + def call(self, email): + api = self.config.api + auth = self.config.auth + email_parts = email.split("@") + keys_filename = email_parts[0] + keys_path = self.config.KEYS_PATH + private_key_path = keys_path + os.sep + keys_filename + public_key_path = private_key_path + ".pub" + + # Validate person + persons = api.GetPersons(auth, [email], ['person_id', 'key_ids']) + if not persons: + raise Exception, "No such person %(email)s" + person = persons[0] + + # make keys if they dont already exist + if not os.path.isfile(private_key_path) or \ + not os.path.isfile(public_key_path): + # Make new keys + self.make_keys(keys_path, keys_filename) + if self.config.verbose: + utils.header("Made new key pair %(private_key_path)s %(public_key_path)s " %\ + locals()) + + # sync public key + public_key_file = open(public_key_path, 'r') + public_key = public_key_file.readline() + + keys = api.GetKeys(auth, person['key_ids']) + if not keys: + # Add current key to db + key_fields = {'type': 'rsa', + 'key': public_key} + api.AddPersonKey(auth, person['person_id'], key_fields) + if self.config.verbose: + utils.header("Added public key in %(public_key_path)s to db" % locals() ) + else: + # keys need to be checked and possibly updated + key = keys[0] + if key['key'] != public_key: + api.UpdateKey(auth, key['key_id'], public_key) + if self.config.verbose: + utils.header("Updated plc with new public key in %(public_key_path)s " % locals()) + else: + if self.config.verbose: + utils.header("Key in %(public_key_path)s matchs public key in plc" % locals()) + +if __name__ == '__main__': + args = typle(sys.argv[1:]) + sync_user_key()(*args) -- 2.47.0