# Mark Huang <mlhuang@cs.princeton.edu>
# Copyright (C) 2006 The Trustees of Princeton University
#
-# $Id: Test.py,v 1.11 2006/10/23 20:35:49 mlhuang Exp $
+# $Id: Test.py,v 1.12 2006/10/25 14:32:31 mlhuang Exp $
#
from pprint import pprint
import re
import socket
import struct
+import base64
+import os
from random import Random
random = Random()
# avoiding
# [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF]
-low_xml_chars = map(unichr, [0x9, 0xA, 0xD])
-low_xml_chars += map(unichr, xrange(0x20, 0x7F - 1))
+ascii_xml_chars = map(unichr, [0x9, 0xA, 0xD])
+ascii_xml_chars += map(unichr, xrange(0x20, 0x7F - 1))
+low_xml_chars = list(ascii_xml_chars)
low_xml_chars += map(unichr, xrange(0x84 + 1, 0x86 - 1))
low_xml_chars += map(unichr, xrange(0x9F + 1, 0xFF))
-valid_xml_chars = list(low_xml_chars)
+valid_xml_chars = list(low_xml_chars)
valid_xml_chars += map(unichr, xrange(0xFF + 1, 0xD7FF))
valid_xml_chars += map(unichr, xrange(0xE000, 0xFDD0 - 1))
valid_xml_chars += map(unichr, xrange(0xFDDF + 1, 0xFFFD))
'c' + randstr(5, letters)
return hostname
+def randpath(length):
+ parts = []
+ for i in range(randint(1, 10)):
+ parts.append(randstr(randint(1, 30), ascii_xml_chars))
+ return os.sep.join(parts)[0:length]
+
+def randemail():
+ return (randstr(100, letters + digits) + "@" + randhostname()).lower()
+
+def randkey(bits = 2048):
+ key_types = ["ssh-dss", "ssh-rsa"]
+ key_type = random.sample(key_types, 1)[0]
+ return ' '.join([key_type,
+ base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))),
+ randemail()])
+
def unicmp(a, b, encoding = "utf-8"):
"""
When connected directly to the DB, values are returned as raw
# Check site
print "GetSites(%d)" % site_id,
site = GetSites(admin, [site_id])[0]
- for key in site_fields:
- assert unicmp(site[key], site_fields[key])
+ for field in site_fields:
+ assert unicmp(site[field], site_fields[field])
print "=> OK"
# Update site
# Check site again
site = GetSites(admin, [site_id])[0]
- for key in site_fields:
- if key != 'login_base':
- assert unicmp(site[key], site_fields[key])
+ for field in site_fields:
+ if field != 'login_base':
+ assert unicmp(site[field], site_fields[field])
print "GetSites",
sites = GetSites(admin, site_ids)
# Check address type
print "GetAddressTypes(%d)" % address_type_id,
address_type = GetAddressTypes(admin, [address_type_id])[0]
- for key in 'name', 'description':
- assert unicmp(address_type[key], address_type_fields[key])
+ for field in 'name', 'description':
+ assert unicmp(address_type[field], address_type_fields[field])
print "=> OK"
# Update address type
# Check address type again
address_type = GetAddressTypes(admin, [address_type_id])[0]
- for key in 'name', 'description':
- assert unicmp(address_type[key], address_type_fields[key])
+ for field in 'name', 'description':
+ assert unicmp(address_type[field], address_type_fields[field])
print "GetAddressTypes",
address_types = GetAddressTypes(admin, address_type_ids)
# Check address
print "GetAddresses(%d)" % address_id,
address = GetAddresses(admin, [address_id])[0]
- for key in address_fields:
- assert unicmp(address[key], address_fields[key])
+ for field in address_fields:
+ assert unicmp(address[field], address_fields[field])
print "=> OK"
# Update address
# Check address again
address = GetAddresses(admin, [address_id])[0]
- for key in address_fields:
- assert unicmp(address[key], address_fields[key])
+ for field in address_fields:
+ assert unicmp(address[field], address_fields[field])
# Add address types
for address_type_id in address_type_ids:
roles = dict(zip(roles, role_ids))
print "=>", role_ids
+print "GetKeyTypes",
+key_types = GetKeyTypes(admin)
+print "=>", key_types
+
# Add users
person_ids = []
+key_ids = []
for auth in user, pi, tech:
def random_person():
global auth
person_fields = {
'first_name': randstr(128),
'last_name': randstr(128),
- # 119 + 1 + 64 + 64 + 6 = 254
- 'email': (randstr(119, letters + digits) + "@" + randhostname()).lower(),
+ 'email': randemail(),
'bio': randstr(254),
# Accounts are disabled by default
'enabled': False,
# Check account
print "GetPersons(%d)" % person_id,
person = GetPersons(admin, [person_id])[0]
- for key in person_fields:
- if key != 'password':
- assert unicmp(person[key], person_fields[key])
+ for field in person_fields:
+ if field != 'password':
+ assert unicmp(person[field], person_fields[field])
print "=> OK"
# Update account
# Check account again
person = GetPersons(admin, [person_id])[0]
- for key in person_fields:
- if key != 'password':
- assert unicmp(person[key], person_fields[key])
+ for field in person_fields:
+ if field != 'password':
+ assert unicmp(person[field], person_fields[field])
# Check that account is really disabled
try:
assert person['site_ids'][0] == person_site_ids[1]
print "=> OK"
+ def random_key():
+ return {
+ 'key_type': random.sample(key_types, 1)[0],
+ 'key': randkey()
+ }
+
+ # Add keys
+ for i in range(3):
+ # Add slice attribute
+ key_fields = random_key()
+ print "AddPersonKey",
+ key_id = AddPersonKey(admin, person_id, key_fields)
+
+ # Should return a unique key_id
+ assert key_id not in key_ids
+ key_ids.append(key_id)
+ print "=>", key_id
+
+ # Check key
+ print "GetKeys(%d)" % key_id,
+ key = GetKeys(admin, [key_id])[0]
+ for field in key_fields:
+ assert unicmp(key[field], key_fields[field])
+ print "=> OK"
+
+ # Update key
+ key_fields = random_key()
+ print "UpdateKey(%d)" % key_id,
+ UpdateKey(admin, key_id, key_fields)
+ key = GetKeys(admin, [key_id])[0]
+ for field in key_fields:
+ assert unicmp(key[field], key_fields[field])
+ print "=> OK"
+
+ # Add and immediately blacklist a key
+ key_fields = random_key()
+ print "AddPersonKey",
+ key_id = AddPersonKey(admin, person_id, key_fields)
+ print "=>", key_id
+
+ print "BlacklistKey(%d)" % key_id,
+ BlacklistKey(admin, key_id)
+
+ # Is effectively deleted
+ assert not GetKeys(admin, [key_id])
+ person = GetPersons(admin, [person_id])[0]
+ assert key_id not in person['key_ids']
+
+ # Cannot be added again
+ try:
+ key_id = AddPersonKey(admin, person_id, key_fields)
+ assert False
+ except Exception, e:
+ pass
+
+ print "=> OK"
+
print "GetPersons",
persons = GetPersons(admin, person_ids)
assert set(person_ids) == set([person['person_id'] for person in persons])
# Check node group
print "GetNodeGroups(%d)" % nodegroup_id,
nodegroup = GetNodeGroups(admin, [nodegroup_id])[0]
- for key in nodegroup_fields:
- assert unicmp(nodegroup[key], nodegroup_fields[key])
+ for field in nodegroup_fields:
+ assert unicmp(nodegroup[field], nodegroup_fields[field])
print "=> OK"
# Update node group, with a readable name
# Check node group again
nodegroup = GetNodeGroups(admin, [nodegroup_id])[0]
- for key in nodegroup_fields:
- assert unicmp(nodegroup[key], nodegroup_fields[key])
+ for field in nodegroup_fields:
+ assert unicmp(nodegroup[field], nodegroup_fields[field])
print "GetNodeGroups",
nodegroups = GetNodeGroups(admin, nodegroup_ids)
# Check node
print "GetNodes(%d)" % node_id,
node = GetNodes(admin, [node_id])[0]
- for key in node_fields:
- assert unicmp(node[key], node_fields[key])
+ for field in node_fields:
+ assert unicmp(node[field], node_fields[field])
print "=> OK"
# Update node
# Check node again
node = GetNodes(admin, [node_id])[0]
- for key in node_fields:
- assert unicmp(node[key], node_fields[key])
+ for field in node_fields:
+ assert unicmp(node[field], node_fields[field])
# Add to node groups
for nodegroup_id in nodegroup_ids:
# Check node network
print "GetNodeNetworks(%d)" % nodenetwork_id,
nodenetwork = GetNodeNetworks(admin, [nodenetwork_id])[0]
- for key in nodenetwork_fields:
- assert unicmp(nodenetwork[key], nodenetwork_fields[key])
+ for field in nodenetwork_fields:
+ assert unicmp(nodenetwork[field], nodenetwork_fields[field])
print "=> OK"
# Update node network
# Check node network again
nodenetwork = GetNodeNetworks(admin, [nodenetwork_id])[0]
- for key in nodenetwork_fields:
- assert unicmp(nodenetwork[key], nodenetwork_fields[key])
+ for field in nodenetwork_fields:
+ assert unicmp(nodenetwork[field], nodenetwork_fields[field])
print "GetNodeNetworks",
nodenetworks = GetNodeNetworks(admin, nodenetwork_ids)
assert set(nodenetwork_ids) == set([nodenetwork['nodenetwork_id'] for nodenetwork in nodenetworks])
print "=>", nodenetwork_ids
+# Add PCUs
+pcu_ids = []
+for site_id in site_ids:
+ def random_pcu():
+ return {
+ 'hostname': randhostname(),
+ 'ip': socket.inet_ntoa(struct.pack('>L', randint(0, 0xffffffff))),
+ 'protocol': randstr(16),
+ 'username': randstr(254),
+ 'password': randstr(254),
+ 'notes': randstr(254),
+ 'model': randstr(32),
+ }
+
+ # Add PCU
+ pcu_fields = random_pcu()
+ print "AddPCU",
+ pcu_id = AddPCU(admin, site_id, pcu_fields)
+
+ # Should return a unique pcu_id
+ assert pcu_id not in pcu_ids
+ pcu_ids.append(pcu_id)
+ print "=>", pcu_id
+
+ # Check PCU
+ print "GetPCUs(%d)" % pcu_id,
+ pcu = GetPCUs(admin, [pcu_id])[0]
+ for field in pcu_fields:
+ assert unicmp(pcu[field], pcu_fields[field])
+ print "=> OK"
+
+ # Update PCU
+ pcu_fields = random_pcu()
+ print "UpdatePCU(%d)" % pcu_id,
+ UpdatePCU(admin, pcu_id, pcu_fields)
+ print "=> OK"
+
+ # Check PCU again
+ pcu = GetPCUs(admin, [pcu_id])[0]
+ for field in pcu_fields:
+ assert unicmp(pcu[field], pcu_fields[field])
+
+ # Add each node at this site to a different port on this PCU
+ site = GetSites(admin, [site_id])[0]
+ port = randint(1, 10)
+ for node_id in site['node_ids']:
+ print "AddNodeToPCU(%d, %d, %d)" % (node_id, pcu_id, port),
+ AddNodeToPCU(admin, node_id, pcu_id, port)
+ print "=> OK"
+ port += 1
+
+print "GetPCUs",
+pcus = GetPCUs(admin, pcu_ids)
+assert set(pcu_ids) == set([pcu['pcu_id'] for pcu in pcus])
+print "=>", pcu_ids
+
+# Add configuration files
+conf_file_ids = []
+for nodegroup_id in nodegroup_ids:
+ def random_conf_file():
+ return {
+ 'enabled': bool(randint()),
+ 'source': randpath(255),
+ 'dest': randpath(255),
+ 'file_permissions': "%#o" % randint(0, 512),
+ 'file_owner': randstr(32, letters + '_' + digits),
+ 'file_group': randstr(32, letters + '_' + digits),
+ 'preinstall_cmd': randpath(100),
+ 'postinstall_cmd': randpath(100),
+ 'error_cmd': randpath(100),
+ 'ignore_cmd_errors': bool(randint()),
+ 'always_update': bool(randint()),
+ }
+
+ # Add configuration file
+ conf_file_fields = random_conf_file()
+ print "AddConfFile",
+ conf_file_id = AddConfFile(admin, conf_file_fields)
+
+ # Should return a unique conf_file_id
+ assert conf_file_id not in conf_file_ids
+ conf_file_ids.append(conf_file_id)
+ print "=>", conf_file_id
+
+ # Check configuration file
+ print "GetConfFiles(%d)" % conf_file_id,
+ conf_file = GetConfFiles(admin, [conf_file_id])[0]
+ for field in conf_file_fields:
+ assert unicmp(conf_file[field], conf_file_fields[field])
+ print "=> OK"
+
+ # Update configuration file
+ conf_file_fields = random_conf_file()
+ print "UpdateConfFile(%d)" % conf_file_id,
+ UpdateConfFile(admin, conf_file_id, conf_file_fields)
+ print "=> OK"
+
+ # Check configuration file
+ conf_file = GetConfFiles(admin, [conf_file_id])[0]
+ for field in conf_file_fields:
+ assert unicmp(conf_file[field], conf_file_fields[field])
+
+ # Add to all node groups
+ for nodegroup_id in nodegroup_ids:
+ print "AddConfFileToNodeGroup(%d, %d)" % (conf_file_id, nodegroup_id),
+ AddConfFileToNodeGroup(admin, conf_file_id, nodegroup_id)
+ print "=> OK"
+
+ # Add to all nodes
+ for node_id in node_ids:
+ print "AddConfFileToNode(%d, %d)" % (conf_file_id, node_id),
+ AddConfFileToNode(admin, conf_file_id, node_id)
+ print "=> OK"
+
+print "GetConfFiles",
+conf_files = GetConfFiles(admin, conf_file_ids)
+assert set(conf_file_ids) == set([conf_file['conf_file_id'] for conf_file in conf_files])
+for conf_file in conf_files:
+ assert set(nodegroup_ids) == set(conf_file['nodegroup_ids'])
+ assert set(node_ids) == set(conf_file['node_ids'])
+print "=>", conf_file_ids
+
# Add slice attribute types
attribute_type_ids = []
for i in range(3):
# Check slice attribute type
print "GetSliceAttributeTypes(%d)" % attribute_type_id,
attribute_type = GetSliceAttributeTypes(admin, [attribute_type_id])[0]
- for key in attribute_type_fields:
- assert unicmp(attribute_type[key], attribute_type_fields[key])
+ for field in attribute_type_fields:
+ assert unicmp(attribute_type[field], attribute_type_fields[field])
print "=> OK"
# Update slice attribute type
# Check slice attribute type again
attribute_type = GetSliceAttributeTypes(admin, [attribute_type_id])[0]
- for key in attribute_type_fields:
- assert unicmp(attribute_type[key], attribute_type_fields[key])
+ for field in attribute_type_fields:
+ assert unicmp(attribute_type[field], attribute_type_fields[field])
# Add slices and slice attributes
slice_ids = []
# Check slice
print "GetSlices(%d)" % slice_id,
slice = GetSlices(admin, [slice_id])[0]
- for key in slice_fields:
- assert unicmp(slice[key], slice_fields[key])
+ for field in slice_fields:
+ assert unicmp(slice[field], slice_fields[field])
print "=> OK"
# Update slice
print "UpdateSlice(%d)" % slice_id,
UpdateSlice(admin, slice_id, slice_fields)
slice = GetSlices(admin, [slice_id])[0]
- for key in slice_fields:
- assert unicmp(slice[key], slice_fields[key])
+ for field in slice_fields:
+ assert unicmp(slice[field], slice_fields[field])
print "=> OK"
# Add slice to all nodes
# Check slice attribute
print "GetSliceAttributes(%d)" % slice_attribute_id,
slice_attribute = GetSliceAttributes(admin, [slice_attribute_id])[0]
- for key in 'attribute_type_id', 'slice_id', 'node_id', 'slice_attribute_id', 'value':
- assert unicmp(slice_attribute[key], locals()[key])
+ for field in 'attribute_type_id', 'slice_id', 'node_id', 'slice_attribute_id', 'value':
+ assert unicmp(slice_attribute[field], locals()[field])
print "=> OK"
# Update slice attribute
print "UpdateSliceAttribute(%d)" % slice_attribute_id,
UpdateSliceAttribute(admin, slice_attribute_id, value)
slice_attribute = GetSliceAttributes(admin, [slice_attribute_id])[0]
- for key in 'attribute_type_id', 'slice_id', 'node_id', 'slice_attribute_id', 'value':
- assert unicmp(slice_attribute[key], locals()[key])
+ for field in 'attribute_type_id', 'slice_id', 'node_id', 'slice_attribute_id', 'value':
+ assert unicmp(slice_attribute[field], locals()[field])
print "=> OK"
# Delete slices
assert not GetSliceAttributeTypes(admin, attribute_type_ids)
print "=> []"
+# Delete configuration files
+for conf_file in conf_files:
+ conf_file_id = conf_file['conf_file_id']
+
+ for node_id in conf_file['node_ids']:
+ print "DeleteConfFileFromNode(%d, %d)" % (conf_file_id, node_id),
+ DeleteConfFileFromNode(admin, conf_file_id, node_id)
+ print "=> OK"
+
+ for nodegroup_id in conf_file['nodegroup_ids']:
+ print "DeleteConfFileFromNodeGroup(%d, %d)" % (conf_file_id, nodegroup_id),
+ DeleteConfFileFromNodeGroup(admin, conf_file_id, nodegroup_id)
+ print "=> OK"
+
+ print "DeleteConfFile(%d)" % conf_file_id,
+ DeleteConfFile(admin, conf_file_id)
+ print "=> OK"
+
+print "GetConfFiles",
+assert not GetConfFiles(admin, conf_file_ids)
+print "=> []"
+
+# Delete PCUs
+for pcu in pcus:
+ pcu_id = pcu['pcu_id']
+
+ for node_id in pcu['node_ids']:
+ print "DeleteNodeFromPCU(%d, %d)" % (node_id, pcu_id),
+ DeleteNodeFromPCU(admin, node_id, pcu_id)
+ print "=> OK"
+
+ print "DeletePCU(%d)" % pcu_id,
+ DeletePCU(admin, pcu_id)
+ print "=> OK"
+
+print "GetPCUs",
+assert not GetPCUs(admin, pcu_ids)
+print "=> []"
+
# Delete node networks
for nodenetwork_id in nodenetwork_ids:
print "DeleteNodeNetwork(%d)" % nodenetwork_id,
# Delete users
for person_id in person_ids:
+ # Delete keys
+ person = GetPersons(admin, [person_id])[0]
+ for key_id in person['key_ids']:
+ print "DeleteKey(%d)" % key_id,
+ DeleteKey(admin, key_id)
+ print "=> OK"
+ person = GetPersons(admin, [person_id])[0]
+ assert not person['key_ids']
+
# Remove from each site
for site_id in site_ids:
print "DeletePersonFromSite(%d, %d)" % (person_id, site_id),