from pprint import pprint
from string import letters, digits, punctuation
+import traceback
from traceback import print_exc
import base64
import os, sys
random = Random()
config = Config()
+api = config.api
+auth = api.auth
-try: boot_states = GetBootStates()
+try: boot_states = api.GetBootStates(auth)
except: boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins']
-try: roles = [role['name'] for role in GetRoles()]
+try: roles = [role['name'] for role in api.GetRoles(auth)]
except: roles = [u'admin', u'pi', u'user', u'tech']
-try: methods = GetNetworkMethods()
+try: methods = api.GetNetworkMethods(auth)
except: methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown']
-try: key_types = GetKeyTypes()
+try: key_types = api.GetKeyTypes(auth)
except: key_types = [u'ssh']
-try:types = GetNetworkTypes()
+try:types = api.GetNetworkTypes(auth)
except: types = [u'ipv4']
+try:
+ sites = api.GetSites(auth, None, ['login_base'])
+ login_bases = [site['login_base'] for site in sites]
+except:
+ login_bases = ['pl']
+
+
def randfloat(min = 0.0, max = 1.0):
return float(min) + (random.random() * (float(max) - float(min)))
'login_base': randstr(20, letters).lower(),
'latitude': int(randfloat(-90.0, 90.0) * 1000) / 1000.0,
'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0,
+ 'max_slices': 10
}
def random_address_type():
def random_slice():
return {
- 'name': site['login_base'] + "_" + randstr(11, letters).lower(),
+ 'name': random.sample(login_bases, 1)[0] + "_" + randstr(11, letters).lower(),
'url': "http://" + randhostname() + "/",
'description': randstr(2048),
}
return True
class api_unit_test(Test):
+
+ error_log = Logfile('api-unittest-error.log')
def call(self,
sites = 2,
slices = 4,
initscripts = 4,
):
- self.all_methods = set(system.listMethods())
+ self.all_methods = set(api.system.listMethods())
self.methods_tested = set()
self.methods_failed = set()
try:
self.cleanup()
finally:
-
- logfile = Logfile("api-unittest.log")
+ utils.header("writing api_unitest.log")
+ logfile = Logfile("api-unittest-summary.log")
methods_ok = list(self.methods_tested.difference(self.methods_failed))
methods_failed = list(self.methods_failed)
methods_untested = list(self.all_methods.difference(self.methods_tested))
return method(*args, **kwds)
except:
self.methods_failed.update([method_name])
+ print >> self.error_log, "%s: %s\n" % (method_name, traceback.format_exc())
return None
return wrapper
def cleanup(self):
+ if hasattr(self, 'key_type_ids'): self.DeleteKeyTypes()
+ if hasattr(self, 'key_ids'): self.DeleteKeys()
+ if hasattr(self, 'person_ids'): self.DeletePersons()
if hasattr(self, 'initscript_ids'): self.DeleteInitScripts()
if hasattr(self, 'slice_attribute_ids'): self.DeleteSliceAttributes()
if hasattr(self, 'slice_ids'): self.DeleteSlices()
if hasattr(self, 'slice_instantiation_ids'): self.DeleteSliceInstantiations()
if hasattr(self, 'slice_attribute_type_ids'): self.DeleteSliceAttributeTypes()
if hasattr(self, 'slice_attribute_ids'): self.DeleteSliceAttributes()
- if hasattr(self, 'key_type_ids'): self.DeleteKeyTypes()
- if hasattr(self, 'key_ids'): self.DeleteKeys()
- if hasattr(self, 'person_ids'): self.DeletePersons()
if hasattr(self, 'role_ids'): self.DeleteRoles()
if hasattr(self, 'pcu_type_ids'): self.DeletePCUTypes()
if hasattr(self, 'pcu_ids'): self.DeletePCUs()
if hasattr(self, 'pcu_protocol_type_ids'): self.DeleteProtocolTypes()
if hasattr(self, 'node_network_setting_ids'): self.DeleteNodeNetworkSettings()
if hasattr(self, 'address_ids'): self.DeleteAddresses()
- if hasattr(self, 'attress_type_ids'): self.DeleteAddressTypes()
+ if hasattr(self, 'address_type_ids'): self.DeleteAddressTypes()
if hasattr(self, 'node_ids'): self.DeleteNodes()
if hasattr(self, 'site_ids'): self.DeleteSites()
for i in range(n):
# Add Site
site_fields = random_site()
- AddSite = self.debug(shell.AddSite)
- site_id = AddSite(site_fields)
+ AddSite = self.debug(api.AddSite)
+ site_id = AddSite(auth, site_fields)
if site_id is None: continue
# Should return a unique id
self.isunique(site_id, site_ids, 'AddSite - isunique')
site_ids.append(site_id)
- GetSites = self.debug(shell.GetSites)
- sites = GetSites([site_id])
+ GetSites = self.debug(api.GetSites)
+ sites = GetSites(auth, [site_id])
if sites is None: continue
site = sites[0]
self.isequal(site, site_fields, 'AddSite - isequal')
# Update site
site_fields = random_site()
- UpdateSite = self.debug(shell.UpdateSite)
- result = UpdateSite(site_id, site_fields)
+ UpdateSite = self.debug(api.UpdateSite)
+ result = UpdateSite(auth, site_id, site_fields)
# Check again
- sites = GetSites([site_id])
+ sites = GetSites(auth, [site_id])
if sites is None: continue
site = sites[0]
self.isequal(site, site_fields, 'UpdateSite - isequal')
def DeleteSites(self):
# Delete all sites
- DeleteSite = self.debug(shell.DeleteSite)
+ DeleteSite = self.debug(api.DeleteSite)
for site_id in self.site_ids:
result = DeleteSite(site_id)
# Check if sites are deleted
- GetSites = self.debug(shell.GetSites)
- sites = GetSites(self.site_ids)
+ GetSites = self.debug(api.GetSites)
+ sites = GetSites(auth, self.site_ids)
self.islistequal(sites, [], 'DeleteSite - check')
if self.config.verbose:
# Add Node
node_fields = random_node()
site_id = random.sample(self.site_ids, 1)[0]
- AddNode = self.debug(shell.AddNode)
- node_id = AddNode(site_id, node_fields)
+ AddNode = self.debug(api.AddNode)
+ node_id = AddNode(auth, site_id, node_fields)
if node_id is None: continue
# Should return a unique id
node_ids.append(node_id)
# Check nodes
- GetNodes = self.debug(shell.GetNodes)
- nodes = GetNodes([node_id])
+ GetNodes = self.debug(api.GetNodes)
+ nodes = GetNodes(auth, [node_id])
if nodes is None: continue
node = nodes[0]
self.isequal(node, node_fields, 'AddNode - isequal')
# Update node
node_fields = random_node()
- UpdateNode = self.debug(shell.UpdateNode)
- result = UpdateNode(node_id, node_fields)
+ UpdateNode = self.debug(api.UpdateNode)
+ result = UpdateNode(auth, node_id, node_fields)
# Check again
- nodes = GetNodes([node_id])
+ nodes = GetNodes(auth, [node_id])
if nodes is None: continue
node = nodes[0]
self.isequal(node, node_fields, 'UpdateNode - isequal')
- nodes = GetNodes(node_ids)
+ nodes = GetNodes(auth, node_ids)
if nodes is not None:
self.islistequal(node_ids, [node['node_id'] for node in nodes], 'GetNodes - isequal')
return node_ids
def DeleteNodes(self):
- DeleteNode = self.debug(shell.DeleteNode)
+ DeleteNode = self.debug(api.DeleteNode)
for node_id in self.node_ids:
- result = DeleteNode(node_id)
+ result = DeleteNode(auth, node_id)
# Check if nodes are deleted
- GetNodes = self.debug(shell.GetNodes)
- nodes = GetNodes(self.node_ids)
+ GetNodes = self.debug(api.GetNodes)
+ nodes = GetNodes(auth, self.node_ids)
self.islistequal(nodes, [], 'DeleteNode Check')
if self.config.verbose:
address_type_ids = []
for i in range(n):
address_type_fields = random_address_type()
- AddAddressType = self.debug(shell.AddAddressType)
- address_type_id = AddAddressType(address_type_fields)
+ AddAddressType = self.debug(api.AddAddressType)
+ address_type_id = AddAddressType(auth, address_type_fields)
if address_type_id is None: continue
# Should return a unique address_type_id
address_type_ids.append(address_type_id)
# Check address type
- GetAddressTypes = self.debug(shell.GetAddressTypes)
- address_types = GetAddressTypes([address_type_id])
+ GetAddressTypes = self.debug(api.GetAddressTypes)
+ address_types = GetAddressTypes(auth, [address_type_id])
if address_types is None: continue
address_type = address_types[0]
self.isequal(address_type, address_type_fields, 'AddAddressType - isequal')
# Update address type
address_type_fields = random_address_type()
- UpdateAddressType = self.debug(shell.UpdateAddressType)
- result = UpdateAddressType(address_type_id, address_type_fields)
+ UpdateAddressType = self.debug(api.UpdateAddressType)
+ result = UpdateAddressType(auth, address_type_id, address_type_fields)
if result is None: continue
# Check address type again
- address_types = GetAddressTypes([address_type_id])
+ address_types = GetAddressTypes(auth, [address_type_id])
if address_types is None: continue
address_type = address_types[0]
self.isequal(address_type, address_type_fields, 'UpdateAddressType - isequal')
# Check get all address types
- address_types = GetAddressTypes(address_type_ids)
+ address_types = GetAddressTypes(auth, address_type_ids)
if address_types is not None:
self.islistequal(address_type_ids, [address_type['address_type_id'] for address_type in address_types], 'GetAddressTypes - isequal')
def DeleteAddressTypes(self):
- DeleteAddressType = self.debug(shell.DeleteAddressType)
+ DeleteAddressType = self.debug(api.DeleteAddressType)
for address_type_id in self.address_type_ids:
- DeleteAddressType(ddress_type_id)
+ DeleteAddressType(auth, address_type_id)
- GetAddressTypes = self.debug(shell.GetAddressTypes)
- address_types = GetAddressTypes(self.address_type_ids)
+ GetAddressTypes = self.debug(api.GetAddressTypes)
+ address_types = GetAddressTypes(auth, self.address_type_ids)
self.islistequal(address_types, [], 'DeleteAddressType - check')
if self.config.verbose:
for i in range(n):
address_fields = random_address()
site_id = random.sample(self.site_ids, 1)[0]
- AddSiteAddress = self.debug(shell.AddSiteAddress)
- address_id = AddSiteAddress(site_id, address_fields)
+ AddSiteAddress = self.debug(api.AddSiteAddress)
+ address_id = AddSiteAddress(auth, site_id, address_fields)
if address_id is None: continue
# Should return a unique address_id
address_ids.append(address_id)
# Check address
- GetAddresses = self.debug(shell.GetAddresses)
- addresses = GetAddresses([address_id])
+ GetAddresses = self.debug(api.GetAddresses)
+ addresses = GetAddresses(auth, [address_id])
if addresses is None: continue
address = addresses[0]
self.isequal(address, address_fields, 'AddSiteAddress - isequal')
# Update address
address_fields = random_address()
- UpdateAddress = self.debug(shell.UpdateAddress)
- result = UpdateAddress(address_id, address_fields)
+ UpdateAddress = self.debug(api.UpdateAddress)
+ result = UpdateAddress(auth, address_id, address_fields)
# Check again
- addresses = GetAddresses([address_id])
+ addresses = GetAddresses(auth, [address_id])
if addresses is None: continue
address = addresses[0]
self.isequal(address, address_fields, 'UpdateAddress - isequal')
- addresses = GetAddresses(address_ids)
+ addresses = GetAddresses(auth, address_ids)
if addresses is not None:
self.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal')
def DeleteAddresses(self):
- DeleteAddress = self.debug(shell.DeleteAddress)
+ DeleteAddress = self.debug(api.DeleteAddress)
# Delete site addresses
for address_id in self.address_ids:
- result = DeleteAddress(address_id)
+ result = DeleteAddress(auth, address_id)
# Check
- GetAddresses = self.debug(shell.GetAddresses)
- addresses = GetAddresses(self.address_ids)
+ GetAddresses = self.debug(api.GetAddresses)
+ addresses = GetAddresses(auth, self.address_ids)
self.islistequal(addresses, [], 'DeleteAddress - check')
if self.config.verbose:
utils.header("Deleted addresses: %s" % self.address_ids)
for i in range(n):
# Add Site
slice_fields = random_slice()
- AddSlice = self.debug(shell.Slice)
- slice_id = AddSlice(slice_fields)
+ AddSlice = self.debug(api.AddSlice)
+ slice_id = AddSlice(auth, slice_fields)
if slice_id is None: continue
# Should return a unique id
self.isunique(slice_id, slice_ids, 'AddSlicel - isunique')
slice_ids.append(slice_id)
- GetSlices = self.debug(shell.GetSlices)
- slices = GetSlices([slice_id])
+ GetSlices = self.debug(api.GetSlices)
+ slices = GetSlices(auth, [slice_id])
if slices is None: continue
slice = slices[0]
self.isequal(slice, slice_fields, 'AddSlice - isequal')
# Update slice
slice_fields = random_slice()
- UpdateSite = self.debug(shell.UpdateSlice)
- result = UpdateSlice(slice_id, slice_fields)
+ UpdateSlice = self.debug(api.UpdateSlice)
+ result = UpdateSlice(auth, slice_id, slice_fields)
# Check again
- slices = GetSites([slice_id])
+ slices = GetSlices(auth, [slice_id])
if slices is None: continue
slice = slices[0]
self.isequal(slice, slice_fields, 'UpdateSlice - isequal')
# XX Add node
- slices = GetSites(slice_ids)
+ slices = GetSlices(auth, slice_ids)
if slices is not None:
self.islistequal(slice_ids, [slice['slice_id'] for slice in slices], 'GetSlices - isequal')
def DeleteSlices(self):
# XX manually delete attributes for first slice
- slices = GetSlices(self.slice_ids, ['slice_attribute_ids', 'node_ids'])
-
+ GetSlices = self.debug(api.GetSlices)
+ slices = GetSlices(auth, self.slice_ids, ['slice_attribute_ids', 'node_ids'])
- DeleteSlice = self.debug(shell.DeleteSlice)
# Have DeleteSlice automatically delete attriubtes for the rest
+ DeleteSlice = self.debug(api.DeleteSlice)
for slice_id in self.slice_ids:
# Delete account
- DeleteSlice(slice_id)
+ DeleteSlice(auth, slice_id)
- # Check if persons are deleted
- GetSlices = self.debug(shell.GetSlices)
- slices = GetSlices(self.slice_ids)
+ # Check if slices are deleted
+ GetSlices = self.debug(api.GetSlices)
+ slices = GetSlices(auth, self.slice_ids)
self.islistequal(slices, [], 'DeleteSlice - check')
- if self.verbose:
+ if self.config.verbose:
utils.header("Deleted slices: %s" % self.slice_ids)
self.slice_ids = []
-
def Persons(self, n = 3):
person_ids = []
# Add account
person_fields = random_person()
- AddPerson = self.debug(shell.AddPerson)
- person_id = AddPerson(person_fields)
+ AddPerson = self.debug(api.AddPerson)
+ person_id = AddPerson(auth, person_fields)
if person_id is None: continue
# Should return a unique person_id
self.isunique(person_id, person_ids, 'AddPerson - isunique')
person_ids.append(person_id)
- GetPersons = self.debug(shell.GetPersons)
- persons = GetPersons([person_id])
+ GetPersons = self.debug(api.GetPersons)
+ persons = GetPersons(auth, [person_id])
if persons is None: continue
person = persons[0]
self.isequal(person, person_fields, 'AddPerson - isequal')
# Update account
person_fields = random_person()
person_fields['enabled'] = True
- UpdatePerson = self.debug(shell.UpdatePerson)
- result = UpdatePerson(person_id, person_fields)
+ UpdatePerson = self.debug(api.UpdatePerson)
+ result = UpdatePerson(auth, person_id, person_fields)
# Add random role
- AddRoleToPerson = self.debug(shell.AddRoleToPerson)
+ AddRoleToPerson = self.debug(api.AddRoleToPerson)
role = random.sample(roles, 1)[0]
- result = AddRoleToPerson(role, person_id)
+ result = AddRoleToPerson(auth, role, person_id)
# Add key to person
key = random_key()
- key_id = AddPersonKey = self.debug(shell.AddPersonKey)
- AddPersonKey(person_id, key)
+ key_id = AddPersonKey = self.debug(api.AddPersonKey)
+ AddPersonKey(auth, person_id, key)
# Add person to site
site_id = random.sample(self.site_ids, 1)[0]
- AddPersonToSite = self.debug(shell.AddPersonToSite)
- AddPersonToSite(person_id, site_id)
+ AddPersonToSite = self.debug(api.AddPersonToSite)
+ AddPersonToSite(auth, person_id, site_id)
# Add person to slice
slice_id = random.sample(self.slice_ids, 1)[0]
- AddPersonToSlice = self.debug(self.AddPersonToSlice)
- AddPersonToSlice(person_id, slice_id)
+ AddPersonToSlice = self.debug(api.AddPersonToSlice)
+ AddPersonToSlice(auth, person_id, slice_id)
# check role, key, site, slice
- persons = GetPersons([person_id], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
+ persons = GetPersons(auth, [person_id], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
if persons is None or not persons: continue
person = persons[0]
self.islistequal([role], person['roles'], 'AddRoleToPerson - check')
self.islistequal([site_id], person['site_ids'], 'AddPersonToSite - check')
self.islistequal([slice_id], person['slice_ids'], 'AddPersonToSlice - check')
- persons = GetPersons(person_ids)
+ persons = GetPersons(auth, person_ids)
if persons is not None:
self.islistequal(person_ids, [p['person_id'] for p in persons], 'GetPersons - isequal')
- if self.verbose:
- utils.header("Added users: %s" % self.person_ids)
+ if self.config.verbose:
+ utils.header("Added users: %s" % person_ids)
+
+ return person_ids
def DeletePersons(self):
# Delete attributes manually for first person
- persons = GetPersons(self.person_ids, ['person_id' , 'key_ids', 'site_ids', 'slice_ids'])
+ GetPersons = self.debug(api.GetPersons)
+ persons = GetPersons(auth, self.person_ids, ['person_id' , 'key_ids', 'site_ids', 'slice_ids', 'roles'])
if persons is None or not persons: return 0
person = persons[0]
-
- # Delete role
- DeleteRoleFromPerson = self.debug(shell.DeleteRoleFromPerson)
- DeleteRoleFromPerson(person['role'], person['person_id'])
- # Delete key
- DeleteKey = self.debug(shell.DeleteKey)
- DeleteKey(person['key_id'])
-
- # Remove person from site
- DeletePersonFromSite = self.debug(shell.DeletePersonFromSite)
- DeletePersonFromSite(person['person_id'], person['site_id'])
-
- # Remove person from slice
- DeletePersonFromSlice = self.debug(shell.DeletePersonFromSlice)
- DeletePersonFromSlice(person['person_id'], person['slice_id'])
+ if person['roles']:
+ # Delete role
+ role = random.sample(person['roles'], 1)[0]
+ DeleteRoleFromPerson = self.debug(api.DeleteRoleFromPerson)
+ DeleteRoleFromPerson(auth, role, person['person_id'])
+
+ if person['key_ids']:
+ # Delete key
+ key_id = random.sample(person['key_ids'], 1)[0]
+ DeleteKey = self.debug(api.DeleteKey)
+ DeleteKey(auth, key_id)
+
+ if person['site_ids']:
+ # Remove person from site
+ site_id = random.sample(person['site_ids'], 1)[0]
+ DeletePersonFromSite = self.debug(api.DeletePersonFromSite)
+ DeletePersonFromSite(auth, person['person_id'], site_id)
+
+ if person['slice_ids']:
+ # Remove person from slice
+ slice_id = random.sample(person['slice_ids'], 1)[0]
+ DeletePersonFromSlice = self.debug(api.DeletePersonFromSlice)
+ DeletePersonFromSlice(auth, person['person_id'], slice_id)
# check role, key, site, slice
- persons = GetPersons([person['person_id']], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
+ persons = GetPersons(auth, [person['person_id']], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
if persons is None or not persons: return 0
person = persons[0]
self.islistequal([], person['roles'], 'DeleteRoleFromPerson - check')
self.islistequal([], person['site_ids'], 'DeletePersonFromSite - check')
self.islistequal([], person['slice_ids'], 'DeletePersonFromSlice - check')
- DeletePerson = self.debug(shell.DeletePerson)
+ DeletePerson = self.debug(api.DeletePerson)
# Have DeletePeson automatically delete attriubtes for all other persons
for person_id in self.person_ids:
# Delete account
- DeletePerson(person_id)
+ DeletePerson(auth, person_id)
# Check if persons are deleted
- GetPersons = self.debug(shell.GetPersons)
- persons = GetPersons(self.person_ids)
+ GetPersons = self.debug(api.GetPersons)
+ persons = GetPersons(auth, self.person_ids)
self.islistequal(persons, [], 'DeletePerson - check')
- if self.verbose:
+ if self.config.verbose:
utils.header("Deleted users: %s" % self.person_ids)
self.person_ids = []
+ def Keys(self, n = 3):
+ key_ids = []
+ for i in range(n):
+ # Add a key to an account
+ key_fields = random_key()
+ person_id = random.sample(self.person_ids, 1)[0]
+ AddPersonKey = self.debug(api.AddPersonKey)
+ key_id = AddPersonKey(auth, person_id, key_fields)
+ if key_id is None: continue
+
+ # Should return a unique key_id
+ self.isunique(key_id, key_ids, 'AddPersonKey - isunique')
+ key_ids.append(key_id)
+ GetKeys = self.debug(api.GetKeys)
+ keys = GetKeys(auth, [key_id])
+ if keys is None: continue
+ key = keys[0]
+ self.isequal(key, key_fields, 'AddPersonKey - isequal')
+
+ # Update Key
+ key_fields = random_key()
+ UpdateKey = self.debug(api.UpdateKey)
+ result = UpdateKey(auth, key_id, key_fields)
+
+ keys = GetKeys(auth, [key_id])
+ if keys is None or not keys: continue
+ key = keys[0]
+ self.isequal(key, key_fields, 'UpdatePersonKey - isequal')
+
+ keys = GetKeys(auth, key_ids)
+ if keys is not None:
+ self.islistequal(key_ids, [key['key_id'] for key in keys], 'GetKeys - isequal')
+
+ if self.config.verbose:
+ utils.header("Added keys: %s" % key_ids)
+ return key_ids
+
+
+ def DeleteKeys(self):
+
+ # Blacklist first key, Delete rest
+ GetKeys = self.debug(api.GetKeys)
+ keys = GetKeys(auth, self.key_ids)
+ if keys is None or not keys: return 0
+ key = keys[0]
+
+ BlacklistKey = self.debug(api.BlacklistKey)
+ BlacklistKey(auth, key['key_id'])
+
+ keys = GetKeys(auth, [key['key_id']])
+ self.islistequal(keys, [], 'BlacklistKey - check')
+
+ if self.config.verbose:
+ utils.header("Blacklisted key: %s" % key['key_id'])
+
+ DeleteKey = self.debug(api.DeleteKey)
+ for key_id in self.key_ids:
+ DeleteKey(auth, key_id)
+
+ keys = GetKeys(auth, self.key_ids)
+ self.islistequal(keys, [], 'DeleteKey - check')
+
+ if self.config.verbose:
+ utils.header("Deleted keys: %s" % self.key_ids)
+
+ self.key_ids = []
+
if __name__ == '__main__':
args = tuple(sys.argv[1:])
api_unit_test()(*args)