# Mark Huang <mlhuang@cs.princeton.edu>
# Copyright (C) 2006 The Trustees of Princeton University
#
-# $Id: Test.py,v 1.10 2006/10/16 21:57:28 mlhuang Exp $
+# $Id: Test.py,v 1.11 2006/10/23 20:35:49 mlhuang Exp $
#
from pprint import pprint
# Add sites
site_ids = []
for i in range(3):
- name = randstr(254)
- abbreviated_name = randstr(50)
- login_base = randstr(20, letters).lower()
- latitude = int(randfloat(-90.0, 90.0) * 1000) / 1000.0
- longitude = int(randfloat(-180.0, 180.0) * 1000) / 1000.0
-
+ def random_site():
+ return {
+ 'name': randstr(254),
+ 'abbreviated_name': randstr(50),
+ 'login_base': randstr(20, letters).lower(),
+ 'latitude': int(randfloat(-90.0, 90.0) * 1000) / 1000.0,
+ 'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0,
+ }
+
# Add site
- print "AddSite(%s)" % login_base,
- site_id = AddSite(admin, name, abbreviated_name, login_base,
- {'latitude': latitude, 'longitude': longitude})
+ site_fields = random_site()
+ print "AddSite",
+ site_id = AddSite(admin, site_fields)
# Should return a unique site_id
assert site_id not in site_ids
# Check site
print "GetSites(%d)" % site_id,
site = GetSites(admin, [site_id])[0]
- for key in 'name', 'abbreviated_name', 'login_base', 'latitude', 'longitude', 'site_id':
- assert unicmp(site[key], locals()[key])
+ for key in site_fields:
+ assert unicmp(site[key], site_fields[key])
print "=> OK"
# Update site
- name = randstr(254)
- abbreviated_name = randstr(50)
- latitude = int(randfloat(-90.0, 90.0) * 1000) / 1000.0
- longitude = int(randfloat(-180.0, 180.0) * 1000) / 1000.0
- max_slices = 10
- print "UpdateSite(%s)" % login_base,
- UpdateSite(admin, site_id, {'name': name, 'abbreviated_name': abbreviated_name,
- 'latitude': latitude, 'longitude': longitude,
- 'max_slices': max_slices})
- site = GetSites(admin, [site_id])[0]
- for key in 'name', 'abbreviated_name', 'latitude', 'longitude', 'max_slices':
- assert unicmp(site[key], locals()[key])
+ site_fields = random_site()
+ # Currently cannot change login_base
+ del site_fields['login_base']
+ site_fields['max_slices'] = randint(1, 10)
+ print "UpdateSite(%d)" % site_id,
+ UpdateSite(admin, site_id, site_fields)
print "=> OK"
+ # Check site again
+ site = GetSites(admin, [site_id])[0]
+ for key in site_fields:
+ if key != 'login_base':
+ assert unicmp(site[key], site_fields[key])
+
print "GetSites",
sites = GetSites(admin, site_ids)
assert set(site_ids) == set([site['site_id'] for site in sites])
print "=>", site_ids
+# Add address types
+address_type_ids = []
+for i in range(3):
+ def random_address_type():
+ return {
+ 'name': randstr(20),
+ 'description': randstr(254),
+ }
+
+ print "AddAddressType",
+ address_type_fields = random_address_type()
+ address_type_id = AddAddressType(admin, address_type_fields)
+
+ # Should return a unique address_type_id
+ assert address_type_id not in address_type_ids
+ address_type_ids.append(address_type_id)
+ print "=>", address_type_id
+
+ # Check address type
+ print "GetAddressTypes(%d)" % address_type_id,
+ address_type = GetAddressTypes(admin, [address_type_id])[0]
+ for key in 'name', 'description':
+ assert unicmp(address_type[key], address_type_fields[key])
+ print "=> OK"
+
+ # Update address type
+ address_type_fields = random_address_type()
+ print "UpdateAddressType(%d)" % address_type_id,
+ UpdateAddressType(admin, address_type_id, address_type_fields)
+ print "=> OK"
+
+ # Check address type again
+ address_type = GetAddressTypes(admin, [address_type_id])[0]
+ for key in 'name', 'description':
+ assert unicmp(address_type[key], address_type_fields[key])
+
+print "GetAddressTypes",
+address_types = GetAddressTypes(admin, address_type_ids)
+assert set(address_type_ids) == set([address_type['address_type_id'] for address_type in address_types])
+print "=>", address_type_ids
+
+# Add site addresses
+address_ids = []
+for site_id in site_ids:
+ for i in range(3):
+ def random_address():
+ return {
+ 'line1': randstr(254),
+ 'line2': randstr(254),
+ 'line3': randstr(254),
+ 'city': randstr(254),
+ 'state': randstr(254),
+ 'postalcode': randstr(64),
+ 'country': randstr(128),
+ }
+
+ print "AddSiteAddress",
+ address_fields = random_address()
+ address_id = AddSiteAddress(admin, site_id, address_fields)
+
+ # Should return a unique address_id
+ assert address_id not in address_ids
+ address_ids.append(address_id)
+ print "=>", address_id
+
+ # Check address
+ print "GetAddresses(%d)" % address_id,
+ address = GetAddresses(admin, [address_id])[0]
+ for key in address_fields:
+ assert unicmp(address[key], address_fields[key])
+ print "=> OK"
+
+ # Update address
+ address_fields = random_address()
+ print "UpdateAddress(%d)" % address_id,
+ UpdateAddress(admin, address_id, address_fields)
+ print "=> OK"
+
+ # Check address again
+ address = GetAddresses(admin, [address_id])[0]
+ for key in address_fields:
+ assert unicmp(address[key], address_fields[key])
+
+ # Add address types
+ for address_type_id in address_type_ids:
+ print "AddAddressTypeToAddress(%d, %d)" % (address_type_id, address_id),
+ AddAddressTypeToAddress(admin, address_type_id, address_id)
+ print "=> OK"
+
+print "GetAddresses",
+addresses = GetAddresses(admin, address_ids)
+assert set(address_ids) == set([address['address_id'] for address in addresses])
+for address in addresses:
+ assert set(address_type_ids) == set(address['address_type_ids'])
+print "=>", address_ids
+
print "GetRoles",
roles = GetRoles(admin)
role_ids = [role['role_id'] for role in roles]
# Add users
person_ids = []
for auth in user, pi, tech:
- first_name = randstr(128)
- last_name = randstr(128)
- # 119 + 1 + 64 + 64 + 6 = 254
- email = (randstr(119, letters + digits) + "@" + randhostname()).lower()
- bio = randstr(254)
- # Accounts are disabled by default
- enabled = False
+ def random_person():
+ global auth
+
+ person_fields = {
+ 'first_name': randstr(128),
+ 'last_name': randstr(128),
+ # 119 + 1 + 64 + 64 + 6 = 254
+ 'email': (randstr(119, letters + digits) + "@" + randhostname()).lower(),
+ 'bio': randstr(254),
+ # Accounts are disabled by default
+ 'enabled': False,
+ 'password': randstr(254),
+ }
+
+ auth['Username'] = person_fields['email']
+ auth['AuthString'] = person_fields['password']
- auth['Username'] = email
- auth['AuthString'] = randstr(254)
+ return person_fields
# Add account
- print "AddPerson(%s)" % email,
- person_id = AddPerson(admin, first_name, last_name,
- {'email': email, 'bio': bio,
- 'password': auth['AuthString']})
+ person_fields = random_person()
+ print "AddPerson",
+ person_id = AddPerson(admin, person_fields)
# Should return a unique person_id
assert person_id not in person_ids
# Check account
print "GetPersons(%d)" % person_id,
person = GetPersons(admin, [person_id])[0]
- for key in 'first_name', 'last_name', 'email', 'bio', 'person_id', 'enabled':
- assert unicmp(person[key], locals()[key])
+ for key in person_fields:
+ if key != 'password':
+ assert unicmp(person[key], person_fields[key])
print "=> OK"
# Update account
- first_name = randstr(128)
- last_name = randstr(128)
- bio = randstr(254)
+ person_fields = random_person()
print "UpdatePerson(%d)" % person_id,
- UpdatePerson(admin, person_id, {'first_name': first_name,
- 'last_name': last_name,
- 'bio': bio})
+ UpdatePerson(admin, person_id, person_fields)
print "=> OK"
# Check account again
person = GetPersons(admin, [person_id])[0]
- for key in 'first_name', 'last_name', 'email', 'bio':
- assert unicmp(person[key], locals()[key])
+ for key in person_fields:
+ if key != 'password':
+ assert unicmp(person[key], person_fields[key])
# Check that account is really disabled
try:
# Add node groups
nodegroup_ids = []
for i in range(3):
- name = randstr(50)
- description = randstr(200)
+ def random_nodegroup():
+ return {
+ 'name': randstr(50),
+ 'description': randstr(200),
+ }
# Add node group
print "AddNodeGroup",
- nodegroup_id = AddNodeGroup(admin, name, {'description': description})
+ nodegroup_fields = random_nodegroup()
+ nodegroup_id = AddNodeGroup(admin, nodegroup_fields)
# Should return a unique nodegroup_id
assert nodegroup_id not in nodegroup_ids
# Check node group
print "GetNodeGroups(%d)" % nodegroup_id,
nodegroup = GetNodeGroups(admin, [nodegroup_id])[0]
- for key in 'name', 'description', 'nodegroup_id':
- assert unicmp(nodegroup[key], locals()[key])
+ for key in nodegroup_fields:
+ assert unicmp(nodegroup[key], nodegroup_fields[key])
print "=> OK"
- # Update node group
- name = randstr(16, letters + ' ' + digits)
- description = randstr(200)
+ # Update node group, with a readable name
+ nodegroup_fields = random_nodegroup()
+ nodegroup_fields['name'] = randstr(16, letters + ' ' + digits)
print "UpdateNodeGroup",
- UpdateNodeGroup(admin, nodegroup_id, {'name': name, 'description': description})
+ UpdateNodeGroup(admin, nodegroup_id, nodegroup_fields)
print "=> OK"
# Check node group again
nodegroup = GetNodeGroups(admin, [nodegroup_id])[0]
- for key in 'name', 'description', 'nodegroup_id':
- assert unicmp(nodegroup[key], locals()[key])
+ for key in nodegroup_fields:
+ assert unicmp(nodegroup[key], nodegroup_fields[key])
print "GetNodeGroups",
nodegroups = GetNodeGroups(admin, nodegroup_ids)
assert set(nodegroup_ids) == set([nodegroup['nodegroup_id'] for nodegroup in nodegroups])
print "=>", nodegroup_ids
+print "GetBootStates",
+boot_states = GetBootStates(admin)
+print "=>", boot_states
+
# Add nodes
node_ids = []
for site_id in site_ids:
for i in range(3):
- hostname = randhostname()
- boot_state = 'inst'
- model = randstr(255)
+ def random_node():
+ return {
+ 'hostname': randhostname(),
+ 'boot_state': random.sample(boot_states, 1)[0],
+ 'model': randstr(255),
+ 'version': randstr(64),
+ }
# Add node
- print "AddNode(%s)" % hostname,
- node_id = AddNode(admin, site_id, hostname,
- {'boot_state': boot_state, 'model': model})
+ node_fields = random_node()
+ print "AddNode",
+ node_id = AddNode(admin, site_id, node_fields)
# Should return a unique node_id
assert node_id not in node_ids
# Check node
print "GetNodes(%d)" % node_id,
node = GetNodes(admin, [node_id])[0]
- for key in 'hostname', 'boot_state', 'model', 'node_id':
- assert unicmp(node[key], locals()[key])
+ for key in node_fields:
+ assert unicmp(node[key], node_fields[key])
print "=> OK"
# Update node
- hostname = randhostname()
- model = randstr(255)
+ node_fields = random_node()
print "UpdateNode(%d)" % node_id,
- UpdateNode(admin, node_id, {'hostname': hostname, 'model': model})
+ UpdateNode(admin, node_id, node_fields)
print "=> OK"
# Check node again
node = GetNodes(admin, [node_id])[0]
- for key in 'hostname', 'boot_state', 'model', 'node_id':
- assert unicmp(node[key], locals()[key])
+ for key in node_fields:
+ assert unicmp(node[key], node_fields[key])
# Add to node groups
for nodegroup_id in nodegroup_ids:
assert set(nodegroup['node_ids']) == set(node_ids)
print "=> OK"
+print "GetNetworkMethods",
+network_methods = GetNetworkMethods(admin)
+print "=>", network_methods
+
+print "GetNetworkTypes",
+network_types = GetNetworkTypes(admin)
+print "=>", network_types
+
# Add node networks
nodenetwork_ids = []
for node_id in node_ids:
- ip = randint(0, 0xffffffff)
- netmask = (0xffffffff << randint(2, 31)) & 0xffffffff
- network = ip & netmask
- broadcast = ((ip & netmask) | ~netmask) & 0xffffffff
- gateway = randint(network + 1, broadcast - 1)
- dns1 = randint(0, 0xffffffff)
- bwlimit = randint(500000, 10000000)
-
- for method in 'static', 'dhcp':
- optional = {'bwlimit': bwlimit}
- if method == 'static':
- for key in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1':
- optional[key] = socket.inet_ntoa(struct.pack('>L', locals()[key]))
-
- # Add node network
- print "AddNodeNetwork(%s)" % method,
- nodenetwork_id = AddNodeNetwork(admin, node_id, method, 'ipv4', optional)
-
- # Should return a unique nodenetwork_id
- assert nodenetwork_id not in nodenetwork_ids
- nodenetwork_ids.append(nodenetwork_id)
- print "=>", nodenetwork_id
-
- # Check node network
- print "GetNodeNetworks(%d)" % nodenetwork_id,
- nodenetwork = GetNodeNetworks(admin, [nodenetwork_id])[0]
- if method == 'static':
- for key in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1':
- address = struct.unpack('>L', socket.inet_aton(nodenetwork[key]))[0]
- assert address == locals()[key]
- print "=> OK"
-
- # Update node network
- optional = {'bwlimit': bwlimit}
- if nodenetwork['method'] == 'static':
- for key in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1':
- optional[key] = socket.inet_ntoa(struct.pack('>L', locals()[key]))
+ def random_nodenetwork(method, type):
+ nodenetwork_fields = {
+ 'method': method,
+ 'type': type,
+ 'bwlimit': randint(500000, 10000000),
+ }
+
+ if method != 'dhcp':
+ ip = randint(0, 0xffffffff)
+ netmask = (0xffffffff << randint(2, 31)) & 0xffffffff
+ network = ip & netmask
+ broadcast = ((ip & netmask) | ~netmask) & 0xffffffff
+ gateway = randint(network + 1, broadcast - 1)
+ dns1 = randint(0, 0xffffffff)
+
+ for field in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1':
+ nodenetwork_fields[field] = socket.inet_ntoa(struct.pack('>L', locals()[field]))
+
+ return nodenetwork_fields
+
+ for method in network_methods:
+ for type in network_types:
+ # Add node network
+ print "AddNodeNetwork",
+ nodenetwork_fields = random_nodenetwork(method, type)
+ nodenetwork_id = AddNodeNetwork(admin, node_id, nodenetwork_fields)
+
+ # Should return a unique nodenetwork_id
+ assert nodenetwork_id not in nodenetwork_ids
+ nodenetwork_ids.append(nodenetwork_id)
+ print "=>", nodenetwork_id
+
+ # Check node network
+ print "GetNodeNetworks(%d)" % nodenetwork_id,
+ nodenetwork = GetNodeNetworks(admin, [nodenetwork_id])[0]
+ for key in nodenetwork_fields:
+ assert unicmp(nodenetwork[key], nodenetwork_fields[key])
+ print "=> OK"
- print "UpdateNodeNetwork(%d)" % nodenetwork_id,
- UpdateNodeNetwork(admin, nodenetwork['nodenetwork_id'], optional)
- print "=> OK"
+ # Update node network
+ nodenetwork_fields = random_nodenetwork(method, type)
+ print "UpdateNodeNetwork(%d)" % nodenetwork_id,
+ UpdateNodeNetwork(admin, nodenetwork_id, nodenetwork_fields)
+ print "=> OK"
- # Check node network again
- nodenetwork = GetNodeNetworks(admin, [nodenetwork_id])[0]
- if nodenetwork['method'] == 'static':
- for key in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1':
- address = struct.unpack('>L', socket.inet_aton(nodenetwork[key]))[0]
- assert address == locals()[key]
+ # Check node network again
+ nodenetwork = GetNodeNetworks(admin, [nodenetwork_id])[0]
+ for key in nodenetwork_fields:
+ assert unicmp(nodenetwork[key], nodenetwork_fields[key])
print "GetNodeNetworks",
nodenetworks = GetNodeNetworks(admin, nodenetwork_ids)
# Add slice attribute types
attribute_type_ids = []
for i in range(3):
- name = randstr(100)
- description = randstr(254)
- min_role_id = random.sample(roles.values(), 1)[0]
+ def random_attribute_type():
+ return {
+ 'name': randstr(100),
+ 'description': randstr(254),
+ 'min_role_id': random.sample(roles.values(), 1)[0],
+ }
# Add slice attribute type
+ attribute_type_fields = random_attribute_type()
print "AddSliceAttributeType",
- attribute_type_id = AddSliceAttributeType(admin, name,
- {'description': description,
- 'min_role_id': min_role_id})
+ attribute_type_id = AddSliceAttributeType(admin, attribute_type_fields)
# Should return a unique attribute_type_id
assert attribute_type_id not in attribute_type_ids
# Check slice attribute type
print "GetSliceAttributeTypes(%d)" % attribute_type_id,
attribute_type = GetSliceAttributeTypes(admin, [attribute_type_id])[0]
- for key in 'name', 'min_role_id', 'description':
- assert unicmp(attribute_type[key], locals()[key])
+ for key in attribute_type_fields:
+ assert unicmp(attribute_type[key], attribute_type_fields[key])
print "=> OK"
# Update slice attribute type
- name = "attribute_" + randstr(10, letters + '_')
- description = randstr(254)
- min_role_id = random.sample(roles.values(), 1)[0]
+ attribute_type_fields = random_attribute_type()
print "UpdateSliceAttributeType(%d)" % attribute_type_id,
- UpdateSliceAttributeType(admin, attribute_type_id,
- {'name': name,
- 'description': description,
- 'min_role_id': min_role_id})
- attribute_type = GetSliceAttributeTypes(admin, [attribute_type_id])[0]
- for key in 'name', 'min_role_id', 'description':
- assert unicmp(attribute_type[key], locals()[key])
+ UpdateSliceAttributeType(admin, attribute_type_id, attribute_type_fields)
print "=> OK"
+ # Check slice attribute type again
+ attribute_type = GetSliceAttributeTypes(admin, [attribute_type_id])[0]
+ for key in attribute_type_fields:
+ assert unicmp(attribute_type[key], attribute_type_fields[key])
+
# Add slices and slice attributes
slice_ids = []
slice_attribute_ids = []
for site in sites:
- for i in range(10):
- name = site['login_base'] + "_" + randstr(11, letters).lower()
- url = "http://" + randhostname() + "/"
- description = randstr(2048)
+ for i in range(site['max_slices']):
+ def random_slice():
+ return {
+ 'name': site['login_base'] + "_" + randstr(11, letters).lower(),
+ 'url': "http://" + randhostname() + "/",
+ 'description': randstr(2048),
+ }
# Add slice
- print "AddSlice(%s)" % name,
- slice_id = AddSlice(admin, name, {'url': url, 'description': description})
+ slice_fields = random_slice()
+ print "AddSlice",
+ slice_id = AddSlice(admin, slice_fields)
# Should return a unique slice_id
assert slice_id not in slice_ids
# Check slice
print "GetSlices(%d)" % slice_id,
slice = GetSlices(admin, [slice_id])[0]
- for key in 'name', 'url', 'description', 'slice_id':
- assert unicmp(slice[key], locals()[key])
+ for key in slice_fields:
+ assert unicmp(slice[key], slice_fields[key])
print "=> OK"
# Update slice
- url = "http://" + randhostname() + "/"
- description = randstr(2048)
+ slice_fields = random_slice()
+ # Cannot change slice name
+ del slice_fields['name']
print "UpdateSlice(%d)" % slice_id,
- UpdateSlice(admin, slice_id, {'url': url, 'description': description})
+ UpdateSlice(admin, slice_id, slice_fields)
slice = GetSlices(admin, [slice_id])[0]
- for key in 'name', 'url', 'description', 'slice_id':
- assert unicmp(slice[key], locals()[key])
+ for key in slice_fields:
+ assert unicmp(slice[key], slice_fields[key])
print "=> OK"
# Add slice to all nodes
print "AddSliceToNodes(%d, %s)" % (slice_id, str(node_ids)),
- AddSliceToNodes(admin, name, node_ids)
+ AddSliceToNodes(admin, slice_id, node_ids)
slice = GetSlices(admin, [slice_id])[0]
assert set(node_ids) == set(slice['node_ids'])
print "=> OK"
print "=> OK"
# Update slice attribute
- url = "http://" + randhostname() + "/"
- description = randstr(2048)
+ value = randstr(16, letters + '_' + digits)
print "UpdateSliceAttribute(%d)" % slice_attribute_id,
UpdateSliceAttribute(admin, slice_attribute_id, value)
slice_attribute = GetSliceAttributes(admin, [slice_attribute_id])[0]
assert not GetNodeGroups(admin, nodegroup_ids)
print "=> []"
+# Delete site addresses
+for address_id in address_ids:
+ # Remove address types
+ for address_type_id in address_type_ids:
+ print "DeleteAddressTypeFromAddress(%d, %d)" % (address_type_id, address_id),
+ DeleteAddressTypeFromAddress(admin, address_type_id, address_id)
+ print "=> OK"
+ address = GetAddresses(admin, [address_id])[0]
+ assert not address['address_type_ids']
+
+ print "DeleteAddress(%d)" % address_id,
+ DeleteAddress(admin, address_id)
+ assert not GetAddresses(admin, [address_id])
+ print "=> OK"
+
+print "GetAddresss",
+assert not GetAddresses(admin, address_ids)
+print "=> []"
+
+# Delete address types
+for address_type_id in address_type_ids:
+ print "DeleteAddressType(%d)" % address_type_id,
+ DeleteAddressType(admin, address_type_id)
+ assert not GetAddressTypes(admin, [address_type_id])
+ print "=> OK"
+
+print "GetAddressTypes",
+assert not GetAddressTypes(admin, address_type_ids)
+print "=> []"
+
# Delete sites
for site_id in site_ids:
print "DeleteSite(%d)" % site_id,