--- /dev/null
+import sys, os
+from types import *
+from pprint import pprint
+
+# Add qa root directory to the path to make
+# imports easier
+path = os.path.dirname(os.path.abspath(__file__))
+path_parts = path.split(os.sep)
+sys.path.append(os.sep.join(path_parts[:-2]))
+
+from qa import utils
+from qa.logger import log
+from qa.Config import Config
+
+class Test:
+ """
+ Base class for all QA test functions. At a minimum all tests
+ must define:
+
+ call(arg1, arg2, ...): method body
+ """
+
+ accepts = []
+ returns = bool
+ status = "current"
+
+ def call(self, *args):
+ """
+ Method body for test functions. Must override.
+ """
+ return True
+
+ def __init__(self, config = Config()):
+ self.name = self.__class__.__name__
+ self.path=os.path.dirname(sys.argv[0])
+ self.config = config
+ self.errors = []
+
+
+ def __call__(self, *args, **kwds):
+ """
+ Main entry point for test functions. logs methods
+ """
+
+ #(min_args, max_args, defaults) = self.args()
+
+ # Check that the right number of arguments were passed in
+ #if len(args) < len(min_args) or len(args) > len(max_args):
+ # raise Exception#, (len(args), len(min_args), len(max_args))
+
+ result = self.call(*args, **kwds)
+ return result
+
+
+ def args(self):
+ """
+ Returns a tuple:
+
+ ((arg1_name, arg2_name, ...),
+ (arg1_name, arg2_name, ..., optional1_name, optional2_name, ...),
+ (None, None, ..., optional1_default, optional2_default, ...))
+
+ That represents the minimum and maximum sets of arguments that
+ this function accepts and the defaults for the optional arguments.
+ """
+
+ # Inspect call. Remove self from the argument list.
+ max_args = self.call.func_code.co_varnames[1:self.call.func_code.co_argcount]
+ defaults = self.call.func_defaults
+ if defaults is None:
+ defaults = ()
+
+ min_args = max_args[0:len(max_args) - len(defaults)]
+ defaults = tuple([None for arg in min_args]) + defaults
+
+ return (min_args, max_args, defaults)
--- /dev/null
+import os, sys
+import traceback
+import time
+from Test import Test
+from qa import utils
+
+class access_slice(Test):
+ """
+ Repeatedly attempt to use the specified users credentials to
+ access the specified node on the specified slice.
+ """
+
+ def call(self, email, slice_name, hostname, timeout=3):
+ api = self.config.api
+ auth = self.config.auth
+ email_parts = email.split("@")
+ keys_filename = email_parts[0]
+ keys_path = self.config.KEYS_PATH
+ private_key_path = keys_path + os.sep + keys_filename
+ public_key_path = private_key_path + ".pub"
+
+ # Validate slice
+ slices = api.GetSlices(auth, [slice_name], ['name', 'slice_id', 'node_ids'])
+ if not slices:
+ raise Exception, "No such slice %(slice_name)s" % locals()
+ slice = slices[0]
+
+ # Validate node
+ nodes = api.GetNodes(auth, [hostname], ['hostname', 'node_id', 'slice_ids'])
+ if not nodes:
+ raise Exception, "No such node %(hostname)s" % locals()
+ node = nodes[0]
+ if slice['slice_id'] not in node['slice_ids']:
+ raise Exception, "%(slice_name)s not on %(hostname)s" % locals()
+
+ # Validate user
+ persons = api.GetPersons(auth, ['email'], ['person_id', 'key_ids', 'slice_ids'])
+ if not persons:
+ raise Exception, "No such person %(email)s" % locals()
+ person = persons[0]
+ if slice['slice_id'] not in person['slice_ids']:
+ raise Exception, "%(email)s not in slice %(slice_name)s" % locals()
+
+ # get keys
+ if not os.path.isfile(private_key_path) or \
+ not os.path.isfile(public_key_path):
+ # keys dont exist, call api.sync_user_key()
+ from qa.modules.api.sync_user_key import sync_user_key
+ sync_user_key()(email)
+
+ # attempt to access slice
+ start_time = time.time()
+ end_time = start_time + timeout*60
+ sleep = 30
+ while time.time() < endtime:
+ if self.config.verbose:
+ utils.header("Trying to connect to %(slice_name)s@%(hostname)s" % locals())
+ ssh_command = "ssh -i %(private_key_path)s %(slice_name)s@%(hostname)s" % locals()
+ host_check = os.system(ssh_command + " hostname ")
+ if host_check == 0:
+ if self.config.verbose:
+ utils.header("connecteed to %(slice_name)s@%(hostname)s" % locals())
+ return 1
+ else:
+ if self.config.verbose:
+ utils.header("failed to connect to %(slice_name)s@%(hostname)s" % locals())
+ time.sleep(sleep)
+
+ return 0
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ access_slice()(*args)
--- /dev/null
+import os,sys
+from Test import Test
+from qa import utils
+
+class add_test_data(Test):
+ """
+ Adds the test data found in config to the plc db
+ """
+ def call(self):
+
+ api = self.config.api
+ auth = self.config.auth
+
+ # Make sure some required fields are in config
+ required_fields = ['TEST_SITE_NAME', 'TEST_SITE_LOGIN_BASE', 'TEST_SLICE_NAME', 'TEST_PERSON_EMAIL']
+ required_node_fields = ['TEST_NODE_TYPE', 'TEST_NODE_METHOD', 'TEST_NODE_HOSTNAME', 'TEST_NODE_IP',
+ 'TEST_NODE_GATEWAY', 'TEST_NODE_DNS', 'TEST_NODE_NETWORK', 'TEST_NODE_BROADCAST',
+ 'TEST_NODE_NETMASK']
+
+ for field in required_fields:
+ if not hasattr(self.config, field) or \
+ len(getattr(self.config, field).strip()) < 1:
+ raise Exception, "%(field)s must be set and cannot be blank" % locals()
+
+ # Look for node configurations
+ node_params = {}
+ for attr in dir(self.config):
+ if attr.find("NODE") > 0:
+ parts = attr.split('_')
+ node_prefix = parts[1] +"_"+ parts[3]
+ name = "_".join(parts[:3])
+ value = getattr(self.config, attr)
+ # start a new node dictionary
+ if node_prefix not in node_params:
+ node_params[node_prefix] = {'prefix': node_prefix}
+ node_params[node_prefix][name] = value
+
+ node_configs = node_params.values()
+ node_list = []
+
+ # Make sure required node fields are preset for each node config
+ for node_config in node_configs:
+ for field in required_node_fields:
+ if field not in node_config or len(node_config[field].strip()) < 1:
+ raise Exception, "%s must be set for %s and cannot be blank" % (field, node_config['prefix'])
+ node = {'type': node_config['TEST_NODE_TYPE'],
+ 'method': node_config['TEST_NODE_METHOD'],
+ 'hostname': node_config['TEST_NODE_HOSTNAME'],
+ 'ip': node_config['TEST_NODE_IP'],
+ 'gateway': node_config['TEST_NODE_GATEWAY'],
+ 'dns1': node_config['TEST_NODE_DNS'],
+ 'broadcast': node_config['TEST_NODE_BROADCAST'],
+ 'network': node_config['TEST_NODE_NETWORK'],
+ 'netmask': node_config['TEST_NODE_NETMASK'],
+ 'slice_ids': [],
+ 'nodenetwork_ids': []}
+ node_list.append(node)
+
+
+ # Define test objects
+ site_fields = {'name': self.config.TEST_SITE_NAME, 'login_base': self.config.TEST_SITE_LOGIN_BASE,
+ 'url': 'http://google.com', 'enabled': True, 'max_slices': 1000,
+ 'max_slivers': 1000, 'is_public': True, 'abbreviated_name': 'Test',
+ 'person_ids': []}
+
+ slice_fields = {'name': self.config.TEST_SLICE_NAME, 'instantiation': 'plc-instantiated',
+ 'max_nodes': 1000, 'description': 'blank', 'person_ids': [], 'node_ids': []}
+
+ person_fields = {'first_name': 'fname', 'last_name': 'lname', 'password': 'password',
+ 'email': self.config.TEST_PERSON_EMAIL, 'site_ids': [], 'slice_ids': []}
+
+
+ # Add Test site
+ sites = api.GetSites(auth, {'login_base': site_fields['login_base']})
+ if not sites:
+ site_id = api.AddSite(auth, site_fields)
+ site_fields['site_id'] = site_id
+ site = site_fields
+ if self.config.verbose:
+ utils.header("Added test site")
+ else:
+ site = sites[0]
+ if self.config.verbose:
+ utils.header("Test site found")
+
+ # Add Test nodes
+ for node_fields in node_list:
+ nodes = api.GetNodes(auth, [node_fields['hostname']])
+ if not nodes:
+ node_id = api.AddNode(auth, site_fields['login_base'], node_fields)
+ node_fields['node_id'] = node_id
+ node = node_fields
+ nodes.append(node_fields)
+ if self.config.verbose:
+ utils.header("Added test node")
+ else:
+ node = nodes[0]
+ if self.config.verbose:
+ utils.header("Test node found")
+
+ # Add node network
+ if not node['nodenetwork_ids']:
+ nodenetwork_id = api.AddNodeNetwork(auth, node_fields['hostname'], node_fields)
+ if self.config.verbose:
+ utils.header("Added test nodenetwork")
+ else:
+ if self.config.verbose:
+ utils.header("Nodenetwork found")
+
+ # Add Test slice
+ slices = api.GetSlices(auth, [slice_fields['name']])
+ if not slices:
+ slice_id = api.AddSlice(auth, slice_fields)
+ slice_fields['slice_id'] = slice_id
+ slice = slice_fields
+ if self.config.verbose:
+ utils.header("Added test slice")
+ else:
+ slice = slices[0]
+ if self.config.verbose:
+ utils.header("Test slice found")
+
+ # Add slice to nodes
+ node_ids = [n['node_id'] for n in nodes]
+ node_ids = filter(lambda node_id: node_id not in slice['node_ids'], node_ids)
+ if node_ids:
+ api.AddSliceToNodes(auth, slice['name'], node_ids)
+ if self.config.verbose:
+ utils.header("Added test slice to test nodes")
+ else:
+ if self.config.verbose:
+ utils.header("Test slice found on test nodes")
+
+ # Add test person
+ persons = api.GetPersons(auth, [person_fields['email']])
+ if not persons:
+ person_id = api.AddPerson(auth, person_fields)
+ person_fields['person_id'] = person_id
+ person = person_fields
+ if self.config.verbose:
+ utils.header("Added test person")
+ else:
+ person = persons[0]
+ if self.config.verbose:
+ utils.header("Test person found")
+
+ # Add roles to person
+ api.AddRoleToPerson(auth, 'user', person['email'])
+
+ # Add person to site
+ if site['site_id'] not in person['site_ids']:
+ api.AddPersonToSite(auth, person['email'], site['login_base'])
+ if self.config.verbose:
+ utils.header("Added test person to test site")
+ else:
+ if self.config.verbose:
+ utils.header("Test person found on test site")
+
+ # Add person to slice
+ if slice['slice_id'] not in person['slice_ids']:
+ api.AddPersonToSlice(auth, person_fields['email'], slice_fields['name'])
+ if self.config.verbose:
+ utils.header("Added test person to slice")
+ else:
+ if self.config.verbose:
+ utils.header("Test person found on test slice")
+
+if __name__ == '__main__':
+ args = typle(sys.argv[1:])
+ add_test_data()(*args)
--- /dev/null
+#!/usr/bin/python
+#
+# Test script example
+#
+# Copyright (C) 2006 The Trustees of Princeton University
+#
+#
+
+from pprint import pprint
+from string import letters, digits, punctuation
+from traceback import print_exc
+import base64
+import os, sys
+import socket
+import xmlrpclib
+import time
+
+from Test import Test
+from qa import utils
+from qa.Config import Config
+from qa.logger import Logfile, log
+from random import Random
+
+random = Random()
+
+config = Config()
+auth = config.auth
+
+try: boot_states = config.api.GetBootStates(auth)
+except: boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins']
+
+try: roles = [role['name'] for role in config.api.GetRoles(auth)]
+except: roles = [u'admin', u'pi', u'user', u'tech']
+
+try: methods = config.api.GetNetworkMethods(auth)
+except: methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown']
+
+try:types = config.api.GetNetworkTypes(auth)
+except: types = [u'ipv4']
+
+def randfloat(min = 0.0, max = 1.0):
+ return float(min) + (random.random() * (float(max) - float(min)))
+
+def randint(min = 0, max = 1):
+ return int(randfloat(min, max + 1))
+
+# See "2.2 Characters" in the XML specification:
+#
+# #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
+# avoiding
+# [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF]
+
+ascii_xml_chars = map(unichr, [0x9, 0xA, 0xD])
+ascii_xml_chars += map(unichr, xrange(0x20, 0x7F - 1))
+low_xml_chars = list(ascii_xml_chars)
+low_xml_chars += map(unichr, xrange(0x84 + 1, 0x86 - 1))
+low_xml_chars += map(unichr, xrange(0x9F + 1, 0xFF))
+valid_xml_chars = list(low_xml_chars)
+valid_xml_chars += map(unichr, xrange(0xFF + 1, 0xD7FF))
+valid_xml_chars += map(unichr, xrange(0xE000, 0xFDD0 - 1))
+valid_xml_chars += map(unichr, xrange(0xFDDF + 1, 0xFFFD))
+
+def randstr(length, pool = valid_xml_chars, encoding = "utf-8"):
+ sample = random.sample(pool, min(length, len(pool)))
+ while True:
+ s = u''.join(sample)
+ bytes = len(s.encode(encoding))
+ if bytes > length:
+ sample.pop()
+ elif bytes < length:
+ sample += random.sample(pool, min(length - bytes, len(pool)))
+ random.shuffle(sample)
+ else:
+ break
+ return s
+
+def randhostname():
+ # 1. Each part begins and ends with a letter or number.
+ # 2. Each part except the last can contain letters, numbers, or hyphens.
+ # 3. Each part is between 1 and 64 characters, including the trailing dot.
+ # 4. At least two parts.
+ # 5. Last part can only contain between 2 and 6 letters.
+ hostname = 'a' + randstr(61, letters + digits + '-') + '1.' + \
+ 'b' + randstr(61, letters + digits + '-') + '2.' + \
+ 'c' + randstr(5, letters)
+ return hostname
+
+def randpath(length):
+ parts = []
+ for i in range(randint(1, 10)):
+ parts.append(randstr(randint(1, 30), ascii_xml_chars))
+ return os.sep.join(parts)[0:length]
+
+def randemail():
+ return (randstr(100, letters + digits) + "@" + randhostname()).lower()
+
+def randkey(bits = 2048):
+ key_types = ["ssh-dss", "ssh-rsa"]
+ key_type = random.sample(key_types, 1)[0]
+ return ' '.join([key_type,
+ base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))),
+ randemail()])
+
+def random_site():
+ return {
+ 'name': randstr(254),
+ 'abbreviated_name': randstr(50),
+ 'login_base': randstr(20, letters).lower(),
+ 'latitude': int(randfloat(-90.0, 90.0) * 1000) / 1000.0,
+ 'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0,
+ }
+
+def random_address_type():
+ return {
+ 'name': randstr(20),
+ 'description': randstr(254),
+ }
+
+def random_address():
+ return {
+ 'line1': randstr(254),
+ 'line2': randstr(254),
+ 'line3': randstr(254),
+ 'city': randstr(254),
+ 'state': randstr(254),
+ 'postalcode': randstr(64),
+ 'country': randstr(128),
+ }
+
+def random_person():
+ return {
+ 'first_name': randstr(128),
+ 'last_name': randstr(128),
+ 'email': randemail(),
+ 'bio': randstr(254),
+ # Accounts are disabled by default
+ 'enabled': False,
+ 'password': randstr(254),
+ }
+
+def random_key():
+ return {
+ 'key_type': random.sample(key_types, 1)[0],
+ 'key': randkey()
+ }
+
+def random_slice():
+ return {
+ 'name': site['login_base'] + "_" + randstr(11, letters).lower(),
+ 'url': "http://" + randhostname() + "/",
+ 'description': randstr(2048),
+ }
+
+def random_nodegroup():
+ return {
+ 'name': randstr(50),
+ 'description': randstr(200),
+ }
+
+def random_node():
+ return {
+ 'hostname': randhostname(),
+ 'boot_state': random.sample(boot_states, 1)[0],
+ 'model': randstr(255),
+ 'version': randstr(64),
+ }
+
+def random_nodenetwork():
+ nodenetwork_fields = {
+ 'method': random.sample(methods, 1)[0],
+ 'type': random.sample(types, 1)[0],
+ 'bwlimit': randint(500000, 10000000),
+ }
+
+ if method != 'dhcp':
+ ip = randint(0, 0xffffffff)
+ netmask = (0xffffffff << randint(2, 31)) & 0xffffffff
+ network = ip & netmask
+ broadcast = ((ip & netmask) | ~netmask) & 0xffffffff
+ gateway = randint(network + 1, broadcast - 1)
+ dns1 = randint(0, 0xffffffff)
+
+ for field in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1':
+ nodenetwork_fields[field] = socket.inet_ntoa(struct.pack('>L', locals()[field]))
+
+ return nodenetwork_fields
+
+def random_pcu():
+ return {
+ 'hostname': randhostname(),
+ 'ip': socket.inet_ntoa(struct.pack('>L', randint(0, 0xffffffff))),
+ 'protocol': randstr(16),
+ 'username': randstr(254),
+ 'password': randstr(254),
+ 'notes': randstr(254),
+ 'model': randstr(32),
+ }
+
+def random_conf_file():
+ return {
+ 'enabled': bool(randint()),
+ 'source': randpath(255),
+ 'dest': randpath(255),
+ 'file_permissions': "%#o" % randint(0, 512),
+ 'file_owner': randstr(32, letters + '_' + digits),
+ 'file_group': randstr(32, letters + '_' + digits),
+ 'preinstall_cmd': randpath(100),
+ 'postinstall_cmd': randpath(100),
+ 'error_cmd': randpath(100),
+ 'ignore_cmd_errors': bool(randint()),
+ 'always_update': bool(randint()),
+ }
+
+def random_attribute_type():
+ return {
+ 'name': randstr(100),
+ 'description': randstr(254),
+ 'min_role_id': random.sample(roles.values(), 1)[0],
+ }
+
+def isequal(object_fields, expected_fields):
+ try:
+ for field in expected_fields:
+ assert field in object_fields
+ assert object_fields[field] == expected_fields[field]
+ except:
+ return False
+ return True
+
+def islistequal(list1, list2):
+ try:
+ assert set(list1) == set(list2)
+ except:
+ return False
+ return True
+
+def isunique(id, id_list):
+ try:
+ assert id not in id_list
+ except:
+ return False
+ return True
+
+class api_unit_test(Test):
+
+ def call(self,
+ sites = 2,
+ nodes = 4,
+ address_types = 3,
+ addresses = 2,
+ persons = 10,
+ keys = 3
+ ):
+ self.api = self.config.api
+ self.auth = self.config.auth
+ self.all_methods = set(self.api.system.listMethods())
+ self.methods_tested = set()
+ self.methods_failed = set()
+
+ try:
+ try:
+ self.site_ids = self.Sites(sites)
+ self.node_ids = self.Nodes(nodes)
+ except:
+ print_exc()
+ finally:
+ try:
+ self.cleanup()
+ finally:
+
+ logfile = Logfile("api-unittest.log")
+ methods_ok = list(self.methods_tested.difference(self.methods_failed))
+ methods_failed = list(self.methods_failed)
+ methods_untested = list(self.all_methods.difference(self.methods_tested))
+ methods_ok.sort()
+ methods_failed.sort()
+ methods_untested.sort()
+ print >> logfile, "\n".join([m+": [OK]" for m in methods_ok])
+ print >> logfile, "\n".join([m+": [FAILED]" for m in methods_failed])
+ print >> logfile, "\n".join([m+": [Not Tested]" for m in methods_untested])
+
+ def isequal(self, object_fields, expected_fields, method_name):
+ try:
+ for field in expected_fields:
+ assert field in object_fields
+ assert object_fields[field] == expected_fields[field]
+ except:
+ self.methods_failed.update([method_name])
+ return False
+ return True
+
+ def islistequal(self, list1, list2, method_name):
+ try: assert set(list1) == set(list2)
+ except:
+ self.methods_failed.update([method_name])
+ return False
+ return True
+
+ def isunique(self, id, id_list, method_name):
+ try: assert id not in id_list
+ except:
+ self.methods_failed.update([method_name])
+ return False
+ return True
+
+ def debug(self, method, method_name=None):
+ if method_name is None:
+ method_name = method._Method__name
+
+ self.methods_tested.update([method_name])
+ def wrapper(*args, **kwds):
+ try:
+ return method(*args, **kwds)
+ except:
+ self.methods_failed.update([method_name])
+ return None
+
+ return wrapper
+
+ def cleanup(self):
+ #if self.person_ids: self.DeletePersons()
+ #if self.address_ids: self.DeleteAddresses()
+ #if self.address_type_ids: self.DeleteAddressTypes()
+ if hasattr(self, 'node_ids'): self.DeleteNodes()
+ if hasattr(self, 'site_ids'): self.DeleteSites()
+
+
+ def Sites(self, n=4):
+ site_ids = []
+ for i in range(n):
+ # Add Site
+ site_fields = random_site()
+ AddSite = self.debug(self.api.AddSite)
+ site_id = AddSite(self.auth, site_fields)
+ if site_id is None: continue
+
+ # Should return a unique id
+ self.isunique(site_id, site_ids, 'AddSite - isunique')
+ site_ids.append(site_id)
+ GetSites = self.debug(self.api.GetSites)
+ sites = GetSites(self.auth, [site_id])
+ if sites is None: continue
+ site = sites[0]
+ self.isequal(site, site_fields, 'AddSite - isequal')
+
+ # Update site
+ site_fields = random_site()
+ UpdateSite = self.debug(self.api.UpdateSite)
+ result = UpdateSite(self.auth, site_id, site_fields)
+
+ # Check again
+ sites = GetSites(self.auth, [site_id])
+ if sites is None: continue
+ site = sites[0]
+ self.isequal(site, site_fields, 'UpdateSite - isequal')
+
+ sites = GetSites(self.auth, site_ids)
+ if sites is not None:
+ self.islistequal(site_ids, [site['site_id'] for site in sites], 'GetSites - isequal')
+
+ if self.config.verbose:
+ utils.header("Added sites: %s" % site_ids)
+
+ return site_ids
+
+
+ def DeleteSites(self):
+ # Delete all sites
+ DeleteSite = self.debug(self.api.DeleteSite)
+ for site_id in self.site_ids:
+ result = DeleteSite(self.auth, site_id)
+
+ # Check if sites are deleted
+ GetSites = self.debug(self.api.GetSites)
+ sites = GetSites(self.auth, self.site_ids)
+ self.islistequal(sites, [], 'DeleteSite - check')
+
+ if self.config.verbose:
+ utils.header("Deleted sites: %s" % self.site_ids)
+
+ self.site_ids = []
+
+ def Nodes(self, n=4):
+ node_ids = []
+ for i in range(n):
+ # Add Node
+ node_fields = random_node()
+ site_id = random.sample(self.site_ids, 1)[0]
+ AddNode = self.debug(self.api.AddNode)
+ node_id = AddNode(self.auth, site_id, node_fields)
+ if node_id is None: continue
+
+ # Should return a unique id
+ self.isunique(node_id, node_ids, 'AddNode - isunique')
+ node_ids.append(node_id)
+
+ # Check nodes
+ GetNodes = self.debug(self.api.GetNodes)
+ nodes = GetNodes(self.auth, [node_id])
+ if nodes is None: continue
+ node = nodes[0]
+ self.isequal(node, node_fields, 'AddNode - isequal')
+
+ # Update node
+ node_fields = random_node()
+ UpdateNode = self.debug(self.api.UpdateNode)
+ result = UpdateNode(self.auth, node_id, node_fields)
+
+ # Check again
+ nodes = GetNodes(self.auth, [node_id])
+ if nodes is None: continue
+ node = nodes[0]
+ self.isequal(node, node_fields, 'UpdateNode - isequal')
+
+ nodes = GetNodes(self.auth, node_ids)
+ if nodes is not None:
+ self.islistequal(node_ids, [node['node_id'] for node in nodes], 'GetNodes - isequal')
+
+ if self.config.verbose:
+ utils.header("Added nodes: %s" % node_ids)
+
+ return node_ids
+
+ def DeleteNodes(self):
+ DeleteNode = self.debug(self.api.DeleteNode)
+ for node_id in self.node_ids:
+ result = DeleteNode(self.auth, node_id)
+
+ # Check if nodes are deleted
+ GetNodes = self.debug(self.api.GetNodes)
+ nodes = GetNodes(self.api, self.node_ids)
+ self.islistequal(nodes, [], 'DeleteNode Check')
+
+ if self.config.verbose:
+ utils.header("Deleted nodes: %s" % self.node_ids)
+
+ self.node_ids = []
+
+ def AddressTypes(self, n = 3):
+ address_type_ids = []
+ for i in range(n):
+ address_type_fields = random_address_type()
+ AddAddressType = self.debug(self.api.AddAddressType)
+ address_type_id = AddAddressType(self.auth, address_type_fields)
+ if address_type_id is None: continue
+
+ # Should return a unique address_type_id
+ self.isunique(address_type_id, address_type_ids, 'AddAddressType - isunique')
+ address_type_ids.append(address_type_id)
+
+ # Check address type
+ GetAddressTypes = self.debug(self.api.GetAddressTypes)
+ address_types = GetAddressTypes(self.auth, [address_type_id])
+ if address_types is None: continue
+ address_type = address_types[0]
+ self.isequal(address_type, address_type_fields, 'AddAddressType - isequal')
+
+ # Update address type
+ address_type_fields = random_address_type()
+ UpdateAddressType = self.debug(self.api.UpdateAddressType)
+ result = UpdateAddressType(self.auth, address_type_id, address_type_fields)
+ if result is None: continue
+
+ # Check address type again
+ address_types = GetAddressTypes(self.auth, [address_type_id])
+ if address_types is None: continue
+ address_type = address_types[0]
+ self.isequal(address_type, address_type_fields, 'UpdateAddressType - isequal')
+
+ # Check get all address types
+ address_types = GetAddressTypes(self.auth, address_type_ids)
+ if address_types is not None:
+ self.islistequal(address_type_ids, [address_type['address_type_id'] for address_type in address_types], 'GetAddressTypes - isequal')
+
+ if self.config.verbose:
+ print "Added address types", address_type_ids
+
+ return address_type_ids
+
+ def DeleteAddressTypes(self):
+
+ DeleteAddressType = self.debug(self.api.DeleteAddressType)
+ for address_type_id in self.address_type_ids:
+ DeleteAddressType(auth, address_type_id)
+
+ GetAddressTypes = self.debug(self.api.GetAddressTypes)
+ address_types = GetAddressTypes(self.auth, self.address_type_ids)
+ self.islistequal(address_types, [], 'DeleteAddressType - check')
+
+ if self.config.verbose:
+ utils.header("Deleted address types: " % self.address_type_ids)
+
+ self.address_type_ids = []
+
+ def Addresses(self, n = 3):
+ address_ids = []
+ for i in range(n):
+ address_fields = random_address()
+ site_id = random.sample(self.site_ids, 1)[0]
+ AddSiteAddress = self.debug(self.api.AddSiteAddress)
+ address_id = AddSiteAddress(self.auth, site_id, address_fields)
+ if address_id is None: continue
+
+ # Should return a unique address_id
+ self.isunique(address_id, address_ids, 'AddSiteAddress - isunique')
+ address_ids.append(address_id)
+
+ # Check address
+ GetAddresses = self.debug(self.api.GetAddresses)
+ addresses = GetAddresses(self.auth, [address_id])
+ if addresses is None: continue
+ address = addresses[0]
+ self.isequal(address, address_fields, 'AddSiteAddress - isequal')
+
+ # Update address
+ address_fields = random_address()
+ UpdateAddress = self.debug(self.api.UpdateAddress)
+ result = UpdateAddress(self.auth, address_id, address_fields)
+
+ # Check again
+ addresses = GetAddresses(self.auth, [address_id])
+ if addresses is None: continue
+ address = addresses[0]
+ self.isequal(address, address_fields, 'UpdateAddress - isequal')
+
+ addresses = GetAddress(self.auth, address_ids)
+ if addresses is not None:
+ slef.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal')
+
+ if self.config.verbose:
+ utils.header("Added addresses: %s" % self.address_ids)
+
+ return address_ids
+
+ def DeleteAddresses(self):
+
+ DeleteAddress = self.debug(self.api.DeleteAddress)
+ # Delete site addresses
+ for address_id in self.address_ids:
+ result = DeleteAddress(self.auth, address_id)
+
+ # Check
+ GetAddresses = self.debug(self.api.GetAddresses)
+ addresses = GetAddresses(self.api, self.address_ids)
+ self.islistequal(addresses, [], 'DeleteAddress - check')
+ if self.verbose:
+ print "Deleted addresses", self.address_ids
+
+ self.address_ids = []
+
+ def AddPersons(self, n = 3):
+
+ roles = GetRoles()
+ role_ids = [role['role_id'] for role in roles]
+ roles = [role['name'] for role in roles]
+ roles = dict(zip(roles, role_ids))
+
+ for i in range(n):
+
+ # Add account
+ person_fields = random_person()
+ person_id = AddPerson(person_fields)
+
+ # Should return a unique person_id
+ assert person_id not in self.person_ids
+ self.person_ids.append(person_id)
+
+ if self.check:
+ # Check account
+ person = GetPersons([person_id])[0]
+ for field in person_fields:
+ if field != 'password':
+ assert person[field] == person_fields[field]
+
+ # Update account
+ person_fields = random_person()
+ UpdatePerson(person_id, person_fields)
+
+ # Check account again
+ person = GetPersons([person_id])[0]
+ for field in person_fields:
+ if field != 'password':
+ assert person[field] == person_fields[field]
+
+ auth = {'AuthMethod': "password",
+ 'Username': person_fields['email'],
+ 'AuthString': person_fields['password']}
+
+ if self.check:
+ # Check that account is disabled
+ try:
+ assert not AuthCheck(auth)
+ except:
+ pass
+
+ # Add random set of roles
+ person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3))
+ for person_role in person_roles:
+ role_id = roles[person_role]
+ AddRoleToPerson(role_id, person_id)
+
+ if self.check:
+ person = GetPersons([person_id])[0]
+ assert set(person_roles) == set(person['roles'])
+
+ # Enable account
+ UpdatePerson(person_id, {'enabled': True})
+
+ if self.check:
+ # Check that account is enabled
+ assert AuthCheck(auth)
+
+ # Associate account with random set of sites
+ person_site_ids = []
+ for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))):
+ AddPersonToSite(person_id, site_id)
+ person_site_ids.append(site_id)
+
+ if self.check:
+ # Make sure it really did it
+ person = GetPersons([person_id])[0]
+ assert set(person_site_ids) == set(person['site_ids'])
+
+ # Set a primary site
+ primary_site_id = random.sample(person_site_ids, randint(1, len(person_site_ids)))[0]
+ SetPersonPrimarySite(person_id, primary_site_id)
+
+ if self.check:
+ person = GetPersons([person_id])[0]
+ assert person['site_ids'][0] == primary_site_id
+
+ if self.verbose:
+ print "Added users", self.person_ids
+
+ def DeletePersons(self):
+ # Delete users
+ for person_id in self.person_ids:
+ # Remove from each site
+ for site_id in self.site_ids:
+ DeletePersonFromSite(person_id, site_id)
+
+ if self.check:
+ person = GetPersons([person_id])[0]
+ assert not person['site_ids']
+
+ # Revoke roles
+ person = GetPersons([person_id])[0]
+ for role_id in person['role_ids']:
+ DeleteRoleFromPerson(role_id, person_id)
+
+ if self.check:
+ person = GetPersons([person_id])[0]
+ assert not person['role_ids']
+
+ # Disable account
+ UpdatePerson(person_id, {'enabled': False})
+
+ if self.check:
+ person = GetPersons([person_id])[0]
+ assert not person['enabled']
+
+ # Delete account
+ DeletePerson(person_id)
+
+ if self.check:
+ assert not GetPersons([person_id])
+
+ if self.check:
+ assert not GetPersons(self.person_ids)
+
+ if self.verbose:
+ print "Deleted users", self.person_ids
+
+ self.person_ids = []
+
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ api_unit_test()(*args)
--- /dev/null
+import os,sys
+import base64
+from Test import Test
+from qa import utils
+
+image_types = ['node-iso', 'node-usb', 'generic-iso', 'generic-usb']
+
+class boot_node(Test):
+ """
+ Attempts to boot the specified node using qemu.
+ """
+
+ def call(self, hostname, image_type = 'node-iso', disk_size="4G"):
+ api = self.config.api
+ auth = self.config.auth
+ tdir = "/tmp/"
+
+ # validate hostname
+ nodes = api.GetNodes(auth, [hostname], ['hostname'])
+ if not nodes:
+ raise Exception, "No such node %(hostname)s" % locals()
+
+ bootimage = api.GetBootMedium(auth, hostname, image_type, '')
+ bootimage_path = '/%(tdir)s/%(hostname)s-bootcd.iso' % locals()
+
+ if self.config.verbose:
+ utils.header("Creating bootcd for %(hostname)s at %(bootimage_path)s" % locals())
+ # Create a temporary bootcd file
+ file = open(bootimage_path, 'w')
+ file.write(base64.b64decode(bootimage))
+ file.close()
+
+ # Create a temporary disk image
+ diskimage_path = "/%(tdir)s/%(hostname)s-hda.img" % locals()
+ qemu_img_cmd = "qemu-img create -f qcow2 %(diskimage_path)s %(disk_size)s" % locals()
+ (stdin, stdout, stderr) = os.popen3(qemu_img_cmd)
+ self.errors = stderr.readlines()
+ if self.errors:
+ raise Exception, "Unable to create disk image\n" + \
+ "\n".join(self.errors)
+
+ if self.config.verbose:
+ utils.header("Booting %(hostname)s" % locals())
+ # Attempt to boot this node image
+ bootcmd = "qemu -hda %(diskimage_path)s -cdrom %(bootimage_path)s -smp 1 -m 256 -monitor stdio" % \
+ locals()
+ (stdin, stdout, stderr) = os.popen3(bootcmd)
+ self.errors = stderr.readlines()
+ if self.errors:
+ raise Exception, "Unable to boot node image\n" + \
+ "\n".join(self.errors)
+
+ return 1
+
+ def get_image_medium(self, hostname, image_type, path):
+ api = self.config.api
+ auth = self.config.auth
+
+ file = open(path, 'w')
+ if image-type in ['node-floppy', 'node-iso', 'node-usb', 'generic-iso', 'generic-usb']:
+ image = api.GetBootMedium(auth, hostname, image_type, '')
+ image = base64.b64.decode(image)
+
+ file.write(image)
+ file.close()
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ boot_node()(*args)
+
+
--- /dev/null
+import os, sys
+import time
+from qa import utils
+from Test import Test
+
+class check_boot_state(Test):
+ """
+ Continually checks the boot_state of the specified node until
+ either the node reaches boot, the node reaches debug or the
+ timeout is reached.
+
+ Timeout represents the ammout of time (in minutes) we should
+ continue trying before quitting.
+
+ Sleep represnet the ammount of time (in seconds) to wait in
+ between checks.
+
+ Returns the boot state of the node.
+ """
+ def call(self, hostname, timeout = 5, sleep = 30):
+ exit = False
+ api = self.config.api
+ auth = self.config.auth
+
+ # Validate hostname
+ nodes = api.GetNodes(auth, [hostname], ['hostname'])
+ if not nodes:
+ raise Exception, "No such hostname %(hostname)s" % locals()
+
+ start_time = time.time()
+ end_time = start_time + (timeout * 60)
+
+ while not exit:
+ nodes = api.GetNodes(auth, [hostname], ['boot_state'])
+ node = nodes[0]
+ boot_state = node['boot_state']
+ if self.config.verbose:
+ utils.header("%(hostname)s boot_state is %(boot_state)s" % locals())
+
+ if boot_state in ['boot', 'debug']:
+ exit = True
+ elif time.time() < end_time:
+ time.sleep(sleep)
+ else:
+ exit = True
+
+
+ if self.config.verbose:
+ if boot_state in ['boot']:
+ utils.header("%(hostname)s correctly installed and booted" % locals())
+ else:
+ utils.header("%(hostname)s not fully booted" % locals())
+
+ return boot_state
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ check_boot_state()(*args)
--- /dev/null
+import os, sys
+from Test import Test
+from qa import utils
+
+
+class delete_test_data(Test):
+ """
+ Removes the test data found in config from the plc db
+ """
+
+ def call(self):
+
+ api = self.config.api
+ auth = self.config.auth
+
+ site_login_base = self.config.TEST_SITE_LOGIN_BASE
+
+ # Deleting the site should delete everything associated with it
+ api.DeleteSite(auth, site_login_base)
+ if self.config.verbose:
+ utils.header("Test data deleted")
+
+ return 1
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ delete_test_data(*args)
--- /dev/null
+import os, sys
+import time
+from qa import utils
+from Test import Test
+
+class get_boot_state(Test):
+ """
+ Continually checks the boot_state of the specified node until
+ either the node reaches boot, the node reaches debug or the
+ timeout is reached.
+
+ Timeout represents the ammout of time (in minutes) we should
+ continue trying before quitting.
+
+ Sleep represnet the ammount of time (in seconds) to wait in
+ between checks.
+
+ Returns the boot state of the node.
+ """
+ def call(self, hostname, timeout = 5, sleep = 30):
+ exit = False
+ api = self.config.api
+ auth = self.config.auth
+
+ # Validate hostname
+ nodes = api.GetNodes(auth, [hostname], ['hostname'])
+ if not nodes:
+ raise Exception, "No such hostname %(hostname)s" % locals()
+
+ start_time = time.time()
+ end_time = start_time + (timeout * 60)
+
+ while not exit:
+ nodes = api.GetNodes(auth, [hostname], ['boot_state'])
+ node = nodes[0]
+ boot_state = node['boot_state']
+ if self.config.verbose:
+ utils.header("%(hostname)s boot_state is %(boot_state)s" % locals())
+
+ if boot_state in ['boot', 'debug']:
+ exit = True
+ elif time.time() < end_time:
+ time.sleep(sleep)
+ else:
+ exit = True
+
+
+ if self.config.verbose:
+ if boot_state in ['boot']:
+ utils.header("%(hostname)s correctly installed and booted" % locals())
+ else:
+ utils.header("%(hostname)s not fully booted" % locals())
+
+ return boot_state
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ get_boot_state()(*args)
--- /dev/null
+import os, sys
+import traceback
+from new import classobj
+from Test import Test
+
+
+class AddPerson(Test): pass
+
+class AddSite(Test): pass
+
+class GetNodes(Test):
+
+ def call(self, hostname):
+ return self.config.api.GetNodes(self.config.auth, hostname)
+
+for name in ['Get', 'Update' ,'Delete']:
+ tc = classobj(name, (Test,), {})
+ setattr(tc, 'call', lambda x: 1)
+ globals()[name] = tc
+ del(tc)
+
+
+
--- /dev/null
+
+import os, sys
+import traceback
+from Test import Test
+from qa import utils
+
+class plc_configure(Test):
+ """
+ Configure the myplc from config options in config file
+ """
+
+ def call(self, system_type, root_dir):
+ tmpname = '/tmp/plc-cinfig-tty-%d' % os.getpid()
+ fileconf = open(tmpname, 'w')
+ for var in [ 'PLC_NAME',
+ 'PLC_ROOT_PASSWORD',
+ 'PLC_ROOT_USER',
+ 'PLC_MAIL_ENABLED',
+ 'PLC_MAIL_SUPPORT_ADDRESS',
+ 'PLC_DB_HOST',
+ 'PLC_API_HOST',
+ 'PLC_WWW_HOST',
+ 'PLC_BOOT_HOST',
+ 'PLC_NET_DNS1',
+ 'PLC_NET_DNS2']:
+ fileconf.write('e %s\n%s\n' % (var, getattr(self.config, var)))
+ fileconf.write('w\nq\n')
+ fileconf.close()
+
+ mount_command = "/sbin/service plc mount"
+ full_command = ""
+ if system_type in ['vserv', 'vserver']:
+ full_command += " vserver %(root_dir)s exec " % locals()
+ elif system_type in ['chroot']:
+ full_command += " chroot %(root_dir)s " % locals()
+ else:
+ raise Exception, "Invalid system type %(sytem_type)s" % locals()
+
+ full_command += " plc-config-tty < %(tmpname)s" % locals()
+ commands = [mount_command, full_command]
+ for command in commands:
+ if self.config.verbose:
+ utils.header(command)
+ (stdout, stderr) = utils.popen(command)
+ (stdout, stderr) = utils.popen("rm %s" % tmpname)
+
+ return 1
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ plc_configure()(*args)
--- /dev/null
+
+import os, sys
+import traceback
+from qa import utils
+from Test import Test
+
+class plc_install(Test):
+ """
+ Installs a myplc
+ """
+
+ def call(self, system_type, root_dir, url=None):
+
+ url_path = self.config.path
+ # Determine url
+ if not url:
+ try:
+
+ url_file = open("%s/URL" % url_path)
+ url = url_file.read().strip()
+ url_file.close()
+ except IOError:
+ pass
+ if not url:
+ print "URL not specified"
+ sys.exit(1)
+
+ # Save url
+ if self.config.verbose:
+ utils.header('Saving current myplc url into %s/URL' % url_path)
+ fsave=open('%s/URL' % url_path, "w")
+ fsave.write(url+'\n')
+ fsave.close()
+
+ # Instal myplc from url
+ if self.config.verbose:
+ utils.header('Installing myplc from url %s' % url)
+
+ # build command
+ full_command = ""
+ install_command = " rpm -Uvh %(url)s "
+ if system_type in ['vserv', 'vserver']:
+ full_command += " vserver %(root_dir)s exec "
+ elif system_type in ['chroot']:
+ pass
+ else:
+ raise Exception, "Invalid system type %(system_type)s" % locals()
+
+ full_command += install_command % locals()
+ try: (stdout, stderr) = utils.popen(full_command)
+ except: (stdout, stderr) = utils.popen("yum localupdate %(url)s")
+
+ if self.config.verbose:
+ utils.header("\n".join(stdout))
+
+ return 1
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ plc_install()(*args)
--- /dev/null
+import os, sys
+from Test import Test
+from qa import utils
+
+class plc_remote_call(Test):
+ """
+ Attempt to connect to a node using the plc root key and
+ issue a command.
+ """
+
+ def call(self, root_key_path, hostname, command):
+ if not os.path.isfile(root_key_path):
+ raise Exception, "no such private key file %(root_key_path)s" % locals()
+
+ full_command = "ssh -i %(root_key_path)s root@%(hostname)s %(command)s" % locals()
+ if self.config.verbose:
+ utils.header(full_command)
+ (stdout, stderr) = utils.popen(full_command)
+
+ if self.config.verbose:
+ utils.header("\n".join(stdout))
+
+
+ return 1
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ plc_remote_call()(*args)
--- /dev/null
+import traceback
+import sys
+from Test import Test
+from qa import utils
+
+class plc_start(Test):
+ """
+ Starts the myplc service
+ """
+
+ def call(self, system_type, root_dir):
+
+ start_command = " /sbin/service plc start "
+ full_command = ""
+
+ if system_type in ['vserv', 'vserver']:
+ full_command += " vserver %(root_dir)s exec "
+ elif system_type in ['chroot']:
+ pass
+ else:
+ raise Exception, "Invalid system type %(system_type)s" % locals()
+
+ full_command += start_command % locals()
+
+ if self.config.verbose:
+ utils.header(full_command)
+
+ (stdout, stderr) = utils.popen(full_command)
+
+ if self.config.verbose:
+ utils.header("".join(stdout))
+
+ return 1
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ plc_start()(*args)
--- /dev/null
+import os, sys
+import traceback
+from Test import Test
+from qa import utils
+
+class plc_stop(Test):
+ """
+ Installs a myplc
+ """
+
+ def call(self, system_type, root_dir):
+
+ stop_command = " /sbin/service plc stop "
+ full_command = ""
+ if system_type in ['vserv', 'vserver']:
+ full_command += " vserver %(root_dir)s exec "
+ elif system_type in ['chroot']:
+ pass
+ else:
+ raise Exception, "Invalid system type %(system_type)s" % locals()
+
+ full_command += stop_command % locals()
+
+ if self.config.verbose:
+ utils.header(full_command)
+
+ (stdout, stderr) = utils.popen(full_command)
+
+ if self.config.verbose:
+ utils.header("\n".join(stdout))
+
+ return 1
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ plc_stop()(*args)
--- /dev/null
+import os, sys
+import traceback
+from Test import Test
+from qa import utils
+
+class plc_uninstall(Test):
+ """
+ Completely removes the installed myplc
+ """
+
+ def call(self, system_type, root_dir):
+
+ remove_command = " rpm -e myplc "
+ full_command = ""
+
+ if system_type in ['vserv', 'vserver']:
+ full_command += " vserver %(root_dir)s exec "
+ elif system_type in ['chroot']:
+ pass
+ else:
+ raise Exception, "Invalid system type %(system_type)s" % locals()
+
+ if self.config.verbose:
+ utils.header("Removing myplc")
+
+ full_command = full_command % locals()
+ (stdout, stderr) = utils.popen(full_command + "/sbin/service plc safestop")
+ if self.config.verbose:
+ utils.header("\n".join(stdout))
+
+ (stdout, stderr) = utils.popen(full_command + remove_command)
+ if self.config.verbose:
+ utils.header("\n".join(stdout))
+
+ (stdout, stderr) = utils.popen(full_command + " rm -rf /plc/data")
+ if self.config.verbose:
+ utiils.header("\n".join(stdout))
+
+ return 1
+
+if __name__ == '__main__':
+ args = tuple(sys.argv[1:])
+ plc_unistall()(*args)
--- /dev/null
+import os, sys
+from qa import utils
+from Test import Test
+
+class sync_person_key(Test):
+ """
+ Make sure specified users public key on file matches whats
+ recorded at plc. Create a public/private keypair for the
+ specified user if one doesnt exist already.
+ """
+
+ def make_keys(path, name):
+ if not os.path.isdir(path):
+ os.mkdir(path)
+ key_path = path + os.sep + name
+ command = "ssh-keygen -f %(key_path)s -t rsa -N ''" % locals()
+ (stdout, stderr) = utils.popen(command)
+
+ def call(self, email):
+ api = self.config.api
+ auth = self.config.auth
+ email_parts = email.split("@")
+ keys_filename = email_parts[0]
+ keys_path = self.config.KEYS_PATH
+ private_key_path = keys_path + os.sep + keys_filename
+ public_key_path = private_key_path + ".pub"
+
+ # Validate person
+ persons = api.GetPersons(auth, [email], ['person_id', 'key_ids'])
+ if not persons:
+ raise Exception, "No such person %(email)s"
+ person = persons[0]
+
+ # make keys if they dont already exist
+ if not os.path.isfile(private_key_path) or \
+ not os.path.isfile(public_key_path):
+ # Make new keys
+ self.make_keys(keys_path, keys_filename)
+ if self.config.verbose:
+ utils.header("Made new key pair %(private_key_path)s %(public_key_path)s " %\
+ locals())
+
+ # sync public key
+ public_key_file = open(public_key_path, 'r')
+ public_key = public_key_file.readline()
+
+ keys = api.GetKeys(auth, person['key_ids'])
+ if not keys:
+ # Add current key to db
+ key_fields = {'type': 'rsa',
+ 'key': public_key}
+ api.AddPersonKey(auth, person['person_id'], key_fields)
+ if self.config.verbose:
+ utils.header("Added public key in %(public_key_path)s to db" % locals() )
+ else:
+ # keys need to be checked and possibly updated
+ key = keys[0]
+ if key['key'] != public_key:
+ api.UpdateKey(auth, key['key_id'], public_key)
+ if self.config.verbose:
+ utils.header("Updated plc with new public key in %(public_key_path)s " % locals())
+ else:
+ if self.config.verbose:
+ utils.header("Key in %(public_key_path)s matchs public key in plc" % locals())
+
+if __name__ == '__main__':
+ args = typle(sys.argv[1:])
+ sync_user_key()(*args)